Tuesday, April 01, 2014

MySQL Fabric 1.4.2 Released

As you saw in the press release, MySQL Fabric 1.4.2 is now released! If you're interested in learning more about MySQL Fabric, there is a session titled Sharding and Scale-out using MySQL Fabric in Ballroom G. MySQL Fabric is a relatively new project in the MySQL ecosystem that focuses on building a framework for working with large deployments of MySQL servers. The architecture of MySQL Fabric allows extensions to be added, and the first two extensions we added were support for high availability using High-Availability groups (HA groups) and sharding to manage very large databases. The first version of sharding implements hash and range sharding as well as procedures for moving and splitting shards.
A critical part of working with a collection of servers is the ability to route transactions to the correct servers, and for efficiency reasons we decided quite early to put this routing logic into the connectors. This avoids one extra network hop and hence improves performance by reducing latency, but it does require that the connectors contain routing logic, caches, and support for fetching data from MySQL Fabric. Putting the routing logic into the connector also makes it easy to extend the API with new features that applications may require.
MySQL Fabric 1.4.2 is distributed as part of MySQL Utilities 1.4.2. To avoid confusion, we have changed the version numbering to match the version of MySQL Utilities it is distributed in.
We have done only a few public releases so far, although there have been a few internal releases as well. A brief history of our public releases this far:
  • MySQL Fabric 1.4.0
    • First public release
    • High-Availability groups for modeling farms
    • Event-driven Executor for execution of management procedures.
    • Simple failure detector with fail-over procedures.
    • Hash and Range sharding allowing management of large databases.
    • Shard move and shard split to support management of a sharded database.
    • Connector interfaces to support federated database systems.
    • Fabric-aware Connector/Python (labs)
    • Fabric-aware Connector/J (labs)
    • Fabric-aware Connector/PHP (labs)
  • MySQL Fabric 1.4.1
    • More solid scale-out support in connectors and MySQL Fabric
    • Improvements to the Executor to avoid stalling reads
    • Connector/Python 1.2.0 containing:
      • Range and Hash sharding
      • Load-balancing support
    • Labs release of Connector/J with Fabric-support
  • MySQL Fabric 1.4.2
    • Credentials in MySQL Fabric
    • External failure reporting interfaces supporting external failure detectors
    • Support for unreliable failure detectors in MySQL Fabric
    • Credentials support in Connector/Python
    • Connector/Python 1.2.1 containing:
      • Failure reporting
      • Credentials Support
    • Connector/J 5.1.30 containing Fabric support

  • Do you want to participate?

    There is a lot you can do if you want to help improve MySQL Fabric.

    Blogs about MySQL Fabric

    Monday, October 21, 2013

    MySQL Fabric: High Availability Groups

    As you might have noticed, we have released a framework for managing farms (or grids, as Justin suggested) of MySQL servers called MySQL Fabric. MySQL Fabric is focused on being easy to use and extensible, and two extensions are currently part of the framework: one to manage high-availability and one to implement sharding.

    High-Availability Groups

    One of the central concepts used to construct a farm is the high-availability group (or just group when there is no risk of confusion), which is introduced by the high-availability extension. As mentioned in the previous post, the group concept does not really represent anything new but is rather a formalization of how we think about and work with the structure of the farm. The key to supporting high-availability is to have redundancy in the system: if one component fails, another one should be ready to pick up the job of the failing component. Hardening the systems (by using hardware less prone to fail or hardware with built-in redundancy) can help reduce the chance of a component failing, but not completely eliminate it. Even a hardened system is susceptible to failure in a power outage or an earthquake. With this in mind, we introduced the group concept for managing pieces of data in our farm:

    each group consists of several machines that are responsible for managing the same piece of data.

    The concept of a group is an abstraction to model the basic concept that we're after, but does not really say anything about how it is implemented. This is intentional: it should be concrete enough to support all the operations we need, but abstract enough to not restrict how it is implemented. This is important because connectors (or any other "outside" observer) that work with groups should not have to be updated whenever new implementations are added. For example, it should not make a difference to a connector if the group is implemented using a traditional Master-Slave setup, a MySQL Cluster, or using replicated storage such as DRBD.

    Server properties in groups

    There are a few key properties that we assume for groups:
    • A server belongs to (at most) one group.
    • At any time, each server in the group has a designated status.
    • At any time, each server has a mode indicating if it accepts reads, writes, both, or neither.
    • Each server also has a weight, which is the relative power of the server and is used to balance the load.
    Note that these properties might change over time, depending on events that happen.

    For handling load-balancing and high-availability, the properties Status, Mode, and Weight were introduced. The mode and weight properties are used by a connector when it comes to deciding where to send a transaction, while the status property is used by the Fabric to keep track of the status of the server. Let's take a closer look at the properties.

    Figure 1. Server Status
    Server Status (or Role). The status of the server provides information about what the server is currently doing in the group. The status of a server is Fabric's view of the status of the server and changes as time passes and Fabric notices changes. A primary server accepts both a write and a read load, and sending high-priority read transactions here means that they get current data. A secondary server can handle reads but, in the case of a master-slave configuration, it should not accept writes since that would lead to a split-brain situation. Secondary servers are servers waiting to pick up the job of the primary if it fails. Spare servers accept neither reads nor writes, but are ready and running and can therefore change status in the group to replace other servers in the event of failures. In addition, spare servers can be used to handle reads.

    In Figure 1 you can see an example of how servers could change status, but note that at this time, we do not track all states. For example, we are considering how to handle the provisioning of new servers in a flexible and extensible way, but more about that in a separate post.

    Server Mode. The mode of the server tells whether it can be read from or written to, and gives the connector the information it needs to decide where to send queries. For now, we only have three modes: Offline, Read-only, and Read-Write. Offline servers cannot be read from or written to, and usually do not accept connections. Read-only servers can only be read from and write transactions should not be sent to these. Read-Write servers are usually primaries of the group. They can accept writes and will propagate them correctly to other servers in the group.

    Server Weight. The weight of a server is used to balance the load between servers. The weight represents the relative power of the server. When balancing the load between servers, the connector will figure out which servers are eligible for accepting a transaction and then pick one of the servers in such a way that the distribution over time will be proportional to the weight of the server.

    Transaction properties

    As mentioned before, one of the goals is to support sharding in the presence of transactions, and to make that work correctly it is necessary to declare up-front what the transaction will contain. Not everything, but the key elements of the transaction: what tables it will access, what sharding key is used, and if it is a read-only or read-write transaction. The first two properties are only necessary if you are working with a sharded system, so we skip those for now; the last one, however, is important for handling load-balancing in the connector.

    When executing transactions using a Fabric-aware connector, you provide the information about the transaction using transaction properties. There are several properties available, but we will focus on the ones related to group handling: group and type. The group property is used to provide the name of the group you want to connect to (you can have several), and the type property is used to tell if this is a read-only or read-write transaction. In the future, we might add more properties such as priority to indicate that this is an urgent transaction and a prompt reply is needed. For example, the following code uses a Fabric-aware connector to promote an employee.

    from mysql.connector.fabric import TYPE_READWRITE

    def promote_employee(conn, emp_no):
        # Each statement takes the employee number as its single parameter.
        stmts = [
            ("SELECT salary INTO @salary FROM salaries"
             " WHERE emp_no = %s AND to_date = DATE('9999-01-01')"),
            ("UPDATE titles SET to_date = CURRENT_DATE()"
             " WHERE emp_no = %s and to_date = DATE('9999-01-01')"),
            ("UPDATE salaries SET to_date = CURRENT_DATE()"
             " WHERE emp_no = %s and to_date = DATE('9999-01-01')"),
            ("INSERT INTO titles VALUES"
             " (%s, 'Master of the Universe', CURRENT_DATE(), DATE('9999-01-01'))"),
            ("INSERT INTO salaries VALUES"
             " (%s, 10 * @salary, CURRENT_DATE(), DATE('9999-01-01'))"),
        ]
        # Use the group for the ACME company
        conn.set_property('group', 'ACME')
        # Declare that this is a read-write transaction
        conn.set_property('type', TYPE_READWRITE)
        cur = conn.cursor()
        for stmt in stmts:
            print("Executing:", stmt % (emp_no,))
            cur.execute(stmt, (emp_no,))
    The two conn.set_property calls show how the properties of the transaction are set. In this case, we declare the group that we will access (for example, a fictional company "ACME") and also the type of the transaction. After that, a transaction is started as normal and executed. The Fabric-aware connector will pick the right server to send the transaction to and you will get the result back in the normal fashion.

    Note that the property type is not yet implemented in Connector/Python; some work remains to make it support load-balancing fully.

    Picking a server

    But are these server and transaction properties sufficient for a connector to make a decision on what to do with a transaction? Let's take a look and see how the server can be selected.

    A server can be chosen by first selecting a set of candidates and then picking one of the candidates based on the weight of the server. Picking the candidates is done by matching the transaction properties against the server properties to find all servers that are eligible for accepting the transaction. When a list of candidates is available you can, for example, pick one at random based on the weight of the servers. The example Python code below illustrates how this could be done. The first function, find_candidates, computes the set of candidates from the set of all servers SERVERS, while the second function, pick_server, picks one of the servers at random based on the weight of the server.

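    from random import random

    def find_candidates(props):
       # Collect every server in the requested group whose mode
       # overlaps with the mode required by the transaction.
       candidates = []
       for srv in SERVERS:
          if props.group == srv.group and (props.mode & srv.mode):
             candidates.append(srv)
       return candidates

    def pick_server(servers):
       # Pick a point in [0, total weight) and return the server whose
       # cumulative weight interval contains it.
       random_weight = random() * sum(srv.weight for srv in servers)
       sum_weight = 0.0
       for srv in servers:
          sum_weight += srv.weight
          if sum_weight > random_weight:
             return srv
       return servers[-1]    # Last server in list
    # Example code for picking a server based on transaction properties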
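
To get a feeling for how candidate selection and weighted picking behave together, here is a small self-contained sketch. The Server and Props tuples, the READ and WRITE flag values, the server names, and the sample_counts helper are all made up for this illustration and are not part of MySQL Fabric or of any connector API. It only demonstrates the idea that, over many draws, each candidate is chosen in proportion to its weight.

```python
import random
from collections import namedtuple

# Hypothetical mode flags and record types, for illustration only.
READ, WRITE = 1, 2
Server = namedtuple("Server", "name group mode weight")
Props = namedtuple("Props", "group mode")

SERVERS = [
    Server("db1", "ACME", READ | WRITE, 1.0),   # primary
    Server("db2", "ACME", READ, 2.0),           # read-only, twice the weight
    Server("db3", "ACME", 0, 1.0),              # offline: no mode bits set
    Server("db4", "OTHER", READ, 1.0),          # belongs to another group
]


def find_candidates(props, servers=SERVERS):
    """Servers in the requested group whose mode overlaps the requested mode."""
    return [s for s in servers
            if s.group == props.group and (props.mode & s.mode)]


def pick_server(servers, rng=random):
    """Pick one server at random, proportionally to its weight."""
    threshold = rng.random() * sum(s.weight for s in servers)
    running = 0.0
    for srv in servers:
        running += srv.weight
        if running > threshold:
            return srv
    return servers[-1]


def sample_counts(props, draws=30000, seed=1):
    """Count how often each candidate is picked over a number of draws."""
    rng = random.Random(seed)
    candidates = find_candidates(props)
    counts = {s.name: 0 for s in candidates}
    for _ in range(draws):
        counts[pick_server(candidates, rng).name] += 1
    return counts
```

Calling sample_counts(Props("ACME", READ)) only ever returns counts for db1 and db2, with db2 picked roughly twice as often as db1, while a write transaction, Props("ACME", WRITE), has db1 as its only candidate.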

    Implementation of groups

    The reason we introduced the group concept in this manner is to be able to vary the implementation of a group, so the question is then: does it work? To see if it works, it is good to consider some sample implementations of high-availability groups and see if they can be described in this manner, so let's do that. Note that the only version that is currently implemented is the primary-secondary approach: the other ones are just food for thought (at this point).

    The primary-secondary approach (also known as primary-backup or master-slave) is the traditional way to set up MySQL servers for high-availability. The idea is that there is a single primary managing the data and one or more secondaries that replicate the data from the primary and are ready to become primary in the event that the primary dies. In addition, there can be a number of pure read slaves that are used to scale out reads.

    In this approach, the primary would be in read-write mode, and the secondaries could either be offline or in read-only mode; they cannot accept writes since that might cause a split-brain situation. Not loading the secondaries with read-only transactions can make it easier for them to stay up to date with the primary, but this depends on the general load on the system. Scale-out slaves would then, of course, be pure read-only servers, and they cannot be promoted to be masters because they do not have the binary log enabled. However, if the primary fails, they still need to fail over to the new primary.

    If (when?) the primary fails, MySQL Fabric will detect the failure and start executing a procedure to promote one of the secondaries to be primary instead of the one that failed. MySQL Fabric has to do this because the servers do not know how to handle the fail-over themselves, and in addition it is necessary to inform the connectors about the new topology. In this procedure, the scale-out servers have to be moved over to the new primary as well.

    Another popular solution for high-availability is shared storage (for example, using shared network disks) or replicated storage (for example, using DRBD to replicate the block device). In this case, one of the servers will be on-line, but the other will be on standby. For both DRBD and shared storage, it is necessary that the standby is completely offline, and in the case of DRBD the server should not even be running on the standby machine. In addition to the primary and the secondary, you could have read slaves attached to the primary.

    In this setup, the primary would then be a read-write server, while the standby server would be in offline mode. Scale-out servers would be in read-only mode, in this case attached to the primary.

    Another approach is to use MySQL Cluster as a group. The cluster consists of several data nodes and employs a shared-nothing architecture to ensure high availability. In this case, all the servers will be both write and read servers, and all might be primaries. In the event that an NDB data node fails, the other nodes are always ready to pick up the job, so a MySQL Cluster group is self-managing. (There is an excellent overview of MySQL Cluster at http://dev.mysql.com/doc/refman/5.6/en/mysql-cluster-overview.html.)

    The two solutions above employ different fail-over procedures that are executed by the Fabric node when it notices the failure. In contrast with the solutions above, MySQL Cluster is self-governing and does not require any fail-over handling implemented in the Fabric node.

    Summary and considerations for the future

    For the examples above, the properties we have outlined are definitely sufficient, but there might be other cases where more information is needed.

    One property that is missing in the current implementation is a way to select a server based on the proximity to the connector. For example, it could be possible to put the primaries and secondaries of a group in different data centers to ensure that the group can handle a catastrophic failure. This, however, opens two issues:

    1. There will be a set of read servers in each data center that should be connected to the primary or secondary in the same data center.
    2. When the connector picks one of the candidates, it should prefer to use those in the same data center.
    Both these issues mean that we need some measure of the distance between the servers and connectors, so that new scale-out servers are not added in such a way that the same data is shipped several times between data centers, and so that a connector does not connect to a server in a different data center when a local one is available. Adding a complete matrix with distances between each and every server would not really work well, so it is likely that some other way to model the proximity is needed.

    Another case that might require some additional information is if the Fabric node fails or is temporarily unavailable (for instance, is restarting). Such an event should not block the entire system, and since the connectors have cached information it would be possible to "run on the cache" for a brief period. The key issue here is that nothing can be updated, so each connector needs to have a fallback plan in the event that a server fails. For example, if a master fails, the fail-over will not be executed, but it would still be possible to read information from the slaves.
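
Returning to the proximity question: one simple way to approximate the second issue without a full distance matrix would be to tag each server with a data center label and let the connector prefer candidates carrying its own label, falling back to all candidates otherwise. The sketch below is only an illustration of that idea. The datacenter field, the prefer_local function, and the server and data center names are hypothetical; nothing like them exists in MySQL Fabric at this point.

```python
from collections import namedtuple

# Hypothetical server record with a data center label.
Server = namedtuple("Server", "name datacenter weight")


def prefer_local(candidates, local_dc):
    """Return the candidates in local_dc if there are any, else all of them."""
    local = [s for s in candidates if s.datacenter == local_dc]
    return local or list(candidates)


candidates = [
    Server("db1", "stockholm", 1.0),
    Server("db2", "stockholm", 2.0),
    Server("db3", "dublin", 1.0),
]
```

A connector running in "dublin" would then only consider db3, while a connector in a data center without any servers would still see all three candidates and could pick among them by weight as before.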

    Tuesday, October 08, 2013

    MySQL Connect presentations on MySQL Fabric available on SlideShare

    Going to MySQL Connect was truly a blast. We got a lot of good questions and feedback in the sessions and there was a lot of interest in both MySQL Fabric and the MySQL Applier for Hadoop.

    A big thank you to all who attended the talks. I got a lot of good questions and comments that will help us build good solutions.

    The talks are available on SlideShare:

    Saturday, September 21, 2013

    A Brief Introduction to MySQL Fabric

    As you saw in the keynote, we are introducing an integrated framework for managing farms of MySQL servers with support for both high-availability and sharding. It should be noted that this is a very early alpha and that it is not ready for production use at this point.

    MySQL Fabric is an integrated system for managing a collection of MySQL servers and is the framework on which high-availability and sharding are built. MySQL Fabric is open-source and is intended to be extensible, easy to use, and support procedure execution even in the presence of failure, an execution model we call resilient execution.

    To ensure high-availability, it is necessary to have redundancy in the system. For database systems, the redundancy traditionally takes the form of having a primary server acting as a master and using replication to keep secondaries available to take over in case the primary fails. This means that the "server" that the application connects to is in reality a collection of servers, not a single server. In a similar manner, if the application is using a sharded database, it is in reality working with a collection of servers, not a single server. In this case, we refer to a collection of servers as a farm.

    Now, just having a collection of servers does not really help us that much: it is necessary to have some structure imposed on the farm as well as an API to work with the farm, and this is where MySQL Fabric comes in.

    Before going over the concepts, have a look at the farm below. In the figure, there is an application that wants to connect to the farm and there is a set of database servers at the bottom organized into groups called high-availability groups. To manage the structure of the farm, there is a MySQL Fabric Node available that, among other things, keeps track of the meta-data and handles procedure execution. Both the application and an operator can connect to the Fabric node to get information about the farm being managed.

    The MySQL Fabric nodes are responsible for keeping track of the structure of the farm as well as all information about the status of the servers, and are also where any management procedures are executed. If a server fails, the Fabric node will handle the procedure of promoting one of the slaves to be the new master, but it also contains the logic for handling shard moving and shard splitting.

    High-availability Groups

    The central concept for handling high-availability in Fabric is the high-availability group. High-availability groups are collections of servers that work together to deliver a database service to the application that connects. The groups are introduced to give structure to the farm and allow you to describe how the servers are organized to support the redundancy necessary to ensure high-availability.

    Inside the group, all the servers manage the same data and hence have the same schema. In addition, each server has a distinct role and no server can belong to more than one group. The group concept does not really represent anything new: it is just a way to structure the farm in such a manner that managing it is easy to understand and the roles of servers are clear. To manage redundancy, a traditional Master-Slave Setup is used, a topology that we all are familiar with (it is often called the Primary-Secondary Approach, hence the names that follow). Each group has a primary that is the master for all the data. Any queries that update data are sent here and that data is propagated to the other servers in the group. Redundancy is achieved by keeping one or more secondaries in the group that receive changes from the primary and are ready to take over the role as primary should the primary disappear. To handle scale-out, the group also contains scale-out servers, which are servers that receive changes from the primary but are not eligible for being promoted to primary. In the group, there are also spares, which are servers that are available for use but which are not assigned any active role yet.


    In addition to high-availability support, MySQL Fabric also offers support for sharding, which is a technique for handling very large databases and/or very high write loads. The database is split into a large number of shards, where each shard contains a fragment of the data in the database. Each shard is stored on a separate server (or a separate set of servers if you want to ensure high-availability) and the transactions are directed to each shard based on the shard key. Splitting the database in this way allows you to manage a larger database by spreading it over more servers, and it also scales the write traffic because you can execute writes independently on each shard.
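
As a rough illustration of how a transaction could be directed to a shard based on the shard key, the sketch below maps keys to shards using a range mapping and a hash mapping. The shard names, the range boundaries, and the use of MD5 are assumptions made for the example; this is not a description of how MySQL Fabric stores or computes its shard mappings.

```python
import bisect
import hashlib

# Hypothetical range mapping: each shard holds keys from its lower bound
# up to (but not including) the next lower bound.
RANGE_BOUNDS = [1, 10000, 20000]
RANGE_SHARDS = ["shard-1", "shard-2", "shard-3"]


def range_shard(key):
    """Return the shard whose lower bound is the largest one <= key."""
    idx = bisect.bisect_right(RANGE_BOUNDS, key) - 1
    if idx < 0:
        raise ValueError("key %r is below the lowest bound" % (key,))
    return RANGE_SHARDS[idx]


# Hypothetical hash mapping over four shards.
HASH_SHARDS = ["shard-1", "shard-2", "shard-3", "shard-4"]


def hash_shard(key):
    """Hash the key and use the digest to pick one of the shards."""
    digest = hashlib.md5(str(key).encode("utf-8")).hexdigest()
    return HASH_SHARDS[int(digest, 16) % len(HASH_SHARDS)]
```

With the range mapping, neighbouring keys end up on the same shard (keys 1 through 9999 all go to shard-1), which suits range scans; the hash mapping spreads keys more evenly but gives up that locality.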

    When using sharding, MySQL Fabric separates tables into sharded tables and global tables. MySQL Fabric allows you to shard just some of the tables in the database and keep the other tables available on all shards, which we call global tables. Since databases usually have multiple tables with foreign key relationships between them, it is critical to be able to shard several tables the same way (but possibly on different columns), which is something that MySQL Fabric supports. Using this support, you specify all the tables that are to be sharded and which column should be used as the sharding key for each, and MySQL Fabric will shard all those tables and distribute the rows on the shards. Tables that are not sharded are the global tables and they will be available on all shards.

    If you want to know more about how Fabric supports sharding, or about sharding in general, you should come to the session MySQL Sharding: Tools and Best Practices for Horizontal Scaling, September 21, 4:00pm-5:00pm in Imperial Ballroom B.

    Connecting to a MySQL Farm

    To provide better control when working with a MySQL farm we have extended the connector API in such a manner that it hides the complexities of handling fail-over in the event of a server failure as well as dispatching transactions to shards correctly. There is currently support for Fabric-aware versions of Connector/J, Connector/PHP, Connector/Python as well as some rudimentary support for Hibernate and Doctrine. If you are interested in how the extensions to the interface look and how you can use them to scale your application, you should come to Scaling PHP Applications, September 22, 10:00am-11:00am in Union Square Room 3/4.

    More information about MySQL Fabric

    There are several blogs being published on high-availability and sharding from the developers working on the Fabric system or the connectors.

    If you are interested in discussing and asking questions about MySQL Fabric, or sharding and high-availability in general, the forum on Fabric, Sharding, HA, Utilities is an excellent place for discussions. Also, if you are at MySQL Connect, going to MySQL Sharding, Replication, and HA (September 21, 5:30-6:30pm in Imperial Ballroom B) is an excellent opportunity to learn more about the project, meet us developers and team leads, and provide feedback to us. The BOF will cover several areas, some overlapping, but the discussion is likely to cover MySQL Fabric and MySQL replication.

    Thursday, August 29, 2013

    Going to MySQL Connect 2013

    MySQL Connect 2013 is coming up with several interesting new sessions. Some sessions that I am participating in got accepted for the conference, so if you are going there, you might find the following sessions interesting. For your convenience, the sessions have hCalendar markup, so it should be easier to add them to your calendar.

    MySQL Sharding, Replication, and HA (September 21, 5:30-6:30pm in Imperial Ballroom B)

    This session is an opportunity for you to meet the MySQL engineering team and discuss the latest tools and best practices for sharding MySQL across distributed server farms while maintaining high availability.

    Come here and meet Lars, Luis, Johannes, and me, and bring up any questions or comments you have regarding sharding, high-availability, and replication. We might have some informal presentations available to discuss sharding and high-availability issues.

    MySQL Sharding: Tools and Best Practices for Horizontal Scaling (September 21, 4:00pm-5:00pm in Imperial Ballroom B)

    In this session, Alfranio and I will discuss how to create and manage a sharded MySQL database system. The session covers tools, techniques, and best practices for handling various aspects of sharding such as:

    • Hybrid approaches mixing sharding and global tables.
    • Sharding in the presence of transactions and how that affect
    • Turning an unsharded database into a sharded database
    • Handling schema changes to a sharded database.

    MySQL and Hadoop: Big Data Integration—Unlocking New Insights (September 22,
    -12:30pm in Taylor)

    Hadoop enables organizations to gain deeper insight into their customers, partners, and processes. As the world's most popular open source database, MySQL is a key component of many big data platforms. This session discusses technologies that enable integration between MySQL and Hadoop, exploring the lifecycle of big data, from acquisition via SQL and NoSQL APIs through to delivering operational insight and tools such as Sqoop and the Binlog API with Hadoop Applier enabling both batch and real-time integration.

    MySQL High Availability: Managing Farms of Distributed Servers (September 22, 5:30pm-6:30pm in Imperial Ballroom B)

    In this session, Alfranio and I will discuss tools, best practices, and frameworks for delivering high availability using MySQL, covering issues such as ensuring server redundancy, handling failure detection, executing recovery, and redirecting clients in the event of failure.

    Scaling PHP Applications (September 22, 10:00am-11:00am in Union Square Room 3/4)

    When building a PHP application, it is necessary to consider both scaling and high-availability issues. In this session, Johannes and I will discuss how to ensure that your PHP application scales well and can work in a high-availability environment.

    Thursday, October 11, 2012

    Round Robin Replication using GTID

    In a previous post I showed how to implement multi-source round-robin replication in pure SQL using the tables that are needed for crash-safe replication. I also outlined a revised version of this approach in the Replication Tips & Tricks presentation I gave at MySQL Connect. This was, however, before the GTID (Global Transaction ID) implementation was done. Now that GTIDs have been introduced, multi-source replication is even easier since you no longer have to keep track of the positions.

    Figure 1. Tables for storing information about masters
    CREATE TABLE my_masters (
        idx INT AUTO_INCREMENT,
        host CHAR(50) NOT NULL,
        port INT NOT NULL DEFAULT 3306,
        PRIMARY KEY (idx),
        UNIQUE INDEX (host,port)
    );
    CREATE TABLE current_master (
        idx INT
    );
    CREATE TABLE replication_user(
        name CHAR(40),
        passwd CHAR(40)
    );

    One caveat is that this only works if you are replicating from servers that have GTID enabled, so if you are trying to replicate from a pre-5.6 server, you can use the original implementation. I have added a re-factored version of the code last in this post, and you can also find some utility procedures that I use in the version described here.

    For the version that uses GTID, we still keep the two tables we were using in the original implementation around, but we remove the file and position from the tables, giving the definitions seen in Figure 1. Also, the user and password are stored in a separate table and are assumed to be identical for all machines.

    To fetch the new master, I created a fetch_next_master procedure that fetches the next master in turn and then advances current_master to the next master. The second select in the code below is used to handle the case that you have a table with masters defined as in Table 1.

    delimiter $$
    CREATE PROCEDURE fetch_next_master(
        OUT p_host CHAR(50), OUT p_port INT UNSIGNED)
    BEGIN
       DECLARE l_next_idx INT DEFAULT 1;
       SELECT idx INTO l_next_idx FROM my_masters
        WHERE idx > (SELECT idx FROM current_master)
        ORDER BY idx LIMIT 1;
       SELECT idx INTO l_next_idx FROM my_masters
        WHERE idx >= l_next_idx
        ORDER BY idx LIMIT 1;
       UPDATE current_master SET idx = l_next_idx;
       SELECT host, port INTO p_host, p_port
         FROM my_masters WHERE idx = l_next_idx;
    END $$
    delimiter ;
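
The selection logic in fetch_next_master can be easier to follow outside SQL. The following Python sketch mimics the two SELECT statements on a plain list of idx values; the function name next_master_idx is made up for this illustration. The first lookup finds the smallest idx greater than the current one. If there is none, the default of 1 is kept, and the second lookup then picks the smallest idx that is at least that value, which wraps around to the first master even when the idx values do not start at 1.

```python
def next_master_idx(master_ids, current):
    """Return the idx of the next master in round-robin order.

    master_ids: the idx values present in my_masters (any order).
    current:    the idx stored in current_master, or None if it is unset.
    """
    ids = sorted(master_ids)
    next_idx = 1  # same default as DECLARE l_next_idx INT DEFAULT 1

    # First SELECT: smallest idx strictly greater than the current one.
    later = [i for i in ids if current is not None and i > current]
    if later:
        next_idx = later[0]

    # Second SELECT: smallest idx that is >= next_idx. This matters when
    # the first SELECT found nothing and the smallest idx is not 1.
    at_or_after = [i for i in ids if i >= next_idx]
    if at_or_after:
        next_idx = at_or_after[0]
    return next_idx
```

For masters with idx values 2, 5, and 9, starting from 2 the function yields 5, then 9, and then wraps around to 2 again.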

    Since we no longer need to save the position, the code for the multi_source event is significantly simpler. All that is necessary is to change master to the next master in turn: the server remembers what transactions are missing automatically and will start replicating from the correct position.

    delimiter $$
    CREATE EVENT multi_source
        ON SCHEDULE EVERY 1 MINUTE DO  -- example interval; adjust as needed
    BEGIN
       DECLARE l_host CHAR(50);
       DECLARE l_port INT UNSIGNED;
       DECLARE l_user CHAR(40);
       DECLARE l_passwd CHAR(40);
       SET SQL_LOG_BIN = 0;
       CALL stop_slave_gracefully();
       CALL fetch_next_master(l_host, l_port);
       SELECT name, passwd INTO l_user, l_passwd FROM replication_user;
       CALL change_master(l_host, l_port, l_user, l_passwd);
    END $$
    delimiter ;

    Full code for original implementation

    Here is the code for replicating from pre-5.6 to 5.6 using the replication tables added for implementing crash-safe slaves.

    Compared to the version described in the earlier post, I have added a few utility procedures, such as a procedure to stop the slave gracefully. The procedure first stops the I/O thread and then empties the relay log before stopping the SQL thread, mainly to avoid having to re-transfer a lot of events from the master. I have also factored out some separate procedures. You can see the re-factored version last in the post.

    delimiter $$
    CREATE PROCEDURE change_master(
        p_host CHAR(40), p_port INT,
        p_user CHAR(40), p_passwd CHAR(40),
        p_file CHAR(40), p_pos BIGINT)
    BEGIN
       SET @cmd = CONCAT('CHANGE MASTER TO ',
                         CONCAT('MASTER_HOST = "', p_host, '", '),
                         CONCAT('MASTER_PORT = ', p_port, ', '),
                         CONCAT('MASTER_USER = "', p_user, '", '),
                         CONCAT('MASTER_PASSWORD = "', p_passwd, '"'));
       IF p_file IS NOT NULL AND p_pos IS NOT NULL THEN
         SET @cmd = CONCAT(@cmd,
                           CONCAT(', MASTER_LOG_FILE = "', p_file, '"'),
                           CONCAT(', MASTER_LOG_POS = ', p_pos));
       END IF;
       PREPARE change_master FROM @cmd;
       EXECUTE change_master;
       DEALLOCATE PREPARE change_master;
    END $$
    delimiter ;
    delimiter $$
    CREATE PROCEDURE save_position()
    BEGIN
       DECLARE l_msg CHAR(60);
       UPDATE my_masters AS m,
              mysql.slave_relay_log_info AS rli
          SET m.log_pos = rli.master_log_pos,
              m.log_file = rli.master_log_name
        WHERE idx = (SELECT idx FROM current_master);
    END $$
    delimiter ;
    delimiter $$
    CREATE PROCEDURE fetch_next_master(
        OUT p_host CHAR(40), OUT p_port INT UNSIGNED,
        OUT p_file CHAR(40), OUT p_pos BIGINT)
    BEGIN
       DECLARE l_next_idx INT DEFAULT 1;
       SELECT idx INTO l_next_idx FROM my_masters
        WHERE idx > (SELECT idx FROM current_master)
        ORDER BY idx LIMIT 1;
       SELECT idx INTO l_next_idx FROM my_masters
        WHERE idx >= l_next_idx
        ORDER BY idx LIMIT 1;
       UPDATE current_master SET idx = l_next_idx;
       SELECT host, port, log_pos, log_file
         INTO p_host, p_port, p_pos, p_file
         FROM my_masters
        WHERE idx = l_next_idx;
    END $$
    delimiter ;
    delimiter $$
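    CREATE EVENT multi_source
        ON SCHEDULE EVERY 1 MINUTE DO  -- interval is an assumption; pick what suits your setup
    BEGIN
       DECLARE l_host CHAR(40);
       DECLARE l_port INT UNSIGNED;
       DECLARE l_user CHAR(40);
       DECLARE l_passwd CHAR(40);
       DECLARE l_file CHAR(40);
       DECLARE l_pos BIGINT;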
       SET SQL_LOG_BIN = 0;
       STOP SLAVE;
       CALL save_position();
       CALL fetch_next_master(l_host, l_port, l_file, l_pos);
       SELECT name, passwd INTO l_user, l_passwd FROM replication_user;
       CALL change_master(l_host, l_port, l_user, l_passwd, l_file, l_pos);
    END $$
    delimiter ;

    Tuesday, June 05, 2012

    Binary Log Group Commit in MySQL 5.6

    With the release of MySQL 5.6, binary log group commit is included, a feature focused on improving the performance of a server when the binary log is enabled. In short, binary log group commit improves performance by grouping several writes to the binary log instead of writing them one by one, but let me digress a little on how transactions are logged to the binary log before going into the details of the problem and the implementation. First, let's look at what you do to turn it on.


    Well... we actually have a few options to tweak it, but nothing is required to turn it on. It even works for existing engines since we did not have to extend the handlerton interface to implement the binary log group commit. However, InnoDB has some optimizations to take advantage of the binary log group commit implementation.

    binlog_order_commits={0|1}

    This is a global variable that can be set without stopping the server.

    If this is off (0), transactions may be committed in parallel. In some circumstances, this might offer some performance boost. For the measurements we did, there was no significant improvement in throughput, but we decided to keep the option anyway since there are special cases where it can offer improvements.

    binlog_max_flush_queue_time=microseconds

    This variable controls when to stop skimming the flush queue (more about that below) and move on as soon as possible. Note that this is not a timeout on how often the binary log should be written to disk since grabbing the queue and writing it to disk takes time.

    Transactions galore...

    As the server executes transactions, it collects the changes done by each transaction in a per-connection transaction cache. If statement-based replication is used, the statements will be written to the transaction cache, and if row-based replication is used, the actual rows changed will be written to the transaction cache. Once the transaction commits, the transaction cache is written to the binary log as one single block. This allows each session to execute independently of the others; a session only needs to take a lock on the binary log when writing the transaction data to it. Since transactions are isolated from each other, it is enough to serialize the transactions on commit. (Of course, this is in an ideal world. Transactions can see other transactions' changes if you set a different transaction isolation level. You would never do that unless you knew exactly what you're doing... right?)
    Figure 1. Two-Phase Commit Protocol (2PC)
    In order to keep the storage engine and the binary log in sync, the server employs a two-phase commit protocol (or just 2PC) that you can see in Figure 1. As you can see, there is a call to write() and one call to fsync() in the diagram: I'll get to that in just a moment, so stay tuned.

    The entire point of using a two-phase commit protocol is to be able to guarantee that the transaction is either in both the engine and the binary log or in neither, even in the event that the server crashes after the prepare and subsequently recovers. That is, it should not be possible that the transaction is in the engine but not in the binary log, or vice versa. Two-phase commit solves this by requiring that once a transaction is prepared in the engine, it can be either fully committed or fully rolled back even if the server crashes and recovers. So, on recovery, the storage engine will provide the server with all the transactions that are prepared but not yet committed, and the server will then commit each transaction if it can be found in the binary log, and roll it back otherwise.
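
    The recovery rule just described can be sketched in a few lines of Python. This is a simplified model, not the server's implementation: the function name recover and the transaction identifiers are invented for the illustration, and both the engine's list of prepared transactions and the binary log are represented as plain lists.

```python
def recover(prepared_in_engine, binlog):
    """Decide the fate of every prepared-but-uncommitted transaction.

    A prepared transaction is committed if it made it into the binary
    log before the crash, and rolled back otherwise.
    """
    logged = set(binlog)
    committed, rolled_back = [], []
    for xid in prepared_in_engine:
        if xid in logged:
            committed.append(xid)
        else:
            rolled_back.append(xid)
    return committed, rolled_back


# Hypothetical state after a crash: t9 was prepared in the engine but
# the server went down before it reached the binary log.
committed, rolled_back = recover(["t7", "t8", "t9"], ["t5", "t6", "t7", "t8"])
print(committed, rolled_back)
```

    After this pass the engine and the binary log agree: every transaction is either in both or in neither.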

    This is, however, only possible if the transaction can be guaranteed to be persistent in the binary log before committing the transaction in the engine. Since disks are slow and memory fast, the operating system tries to improve performance by keeping part of the file in memory instead of writing directly to disk. Once enough changes have been written, or the memory is needed for something else, the changes are written to disk. This is good for the operating system (and also for anybody using the computer), but causes a problem for the database server since if the server crashes, it is possible that the transaction is committed in the storage engine, but there is no trace of it in the binary log.

    For recovery to work properly, it is therefore necessary to ensure that the file is really on disk, which is why there is a call to fsync() in Figure 1: it forces the in-memory part of the file to be written to disk.
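
    The difference between write() and fsync() can be seen from any program, not only the server. The following Python sketch appends a record to a file and then forces it to disk; the helper name append_durably, the file name and the payload are assumptions made up for the example.

```python
import os
import tempfile


def append_durably(fd, payload):
    """Append payload and block until the OS reports it is on disk."""
    written = os.write(fd, payload)  # data lands in the OS page cache
    os.fsync(fd)                     # force the cached pages to storage
    return written


payload = b"BEGIN; ...; COMMIT;\n"
path = os.path.join(tempfile.mkdtemp(), "binlog.000001")
fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o600)
try:
    n = append_durably(fd, payload)
finally:
    os.close(fd)
```

    Without the os.fsync() call, the write would normally still succeed, but a crash shortly afterwards could leave no trace of the record on disk.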

    The Infamous prepare_commit_mutex

    Figure 2. Missing un-committed transactions
    When the server recovers, it has access to the binary log and can therefore decide what to commit and what to rollback, but what if there is no binary log?

    In the general case, a recovery can just roll back all prepared transactions and start again. After all, the transactions that were just prepared but not committed are safe to roll back. They just move the database to the state it had just before starting those transactions. Connected clients have not received any indication that the transaction is committed, so they will realize that the transactions have to be re-executed.

    There is another case where recovery is being used in this way and that is when using on-line backup methods such as InnoDB Hot Backup (which is used in the MySQL Enterprise Backup). These tools take a copy of the database files and InnoDB transaction logs directly—which is an easy way to take a backup—but it means that the transaction logs contain transactions that have just been prepared. On recovery, they roll back all the transactions and have a database in a consistent state.

    Since these on-line backup methods are often used to bootstrap new slaves, the binary log position of the last committed transaction is written in the header of the InnoDB redo log. On recovery, the recovery program prints the binary log position of the last committed transaction and you can use this information with the CHANGE MASTER command to start replicating from the correct position. For this to work correctly, it is necessary that all the transactions are committed in the same order as they are written to the binary log. If they are not, there can be "holes" where some transactions are written to the binary log, but not yet committed, which causes the slave to miss transactions that were not committed. The problematic case that can arise is what you see in Figure 3 below.

    Figure 3. Committing in parallel
    You can see an example of this in Figure 2, where replication will start from the last committed position, but there is a transaction that was just prepared and hence was rolled back when the backup was restored on the slave.
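
    To make the "hole" concrete, here is a small Python model of the situation. It is an illustration only: the function name missing_on_slave and the transaction names are hypothetical, the binary log is a list in write order, and the backup is represented by the set of transactions that were committed in the engine when it was taken.

```python
def missing_on_slave(binlog_order, committed_in_backup):
    """Transactions the restored slave neither has nor will receive."""
    # Position recorded by the backup: just after the last committed
    # transaction, measured in binary log order.
    last = max(binlog_order.index(t) for t in committed_in_backup)
    will_be_replicated = binlog_order[last + 1:]
    return [t for t in binlog_order
            if t not in committed_in_backup and t not in will_be_replicated]


binlog = ["T1", "T2", "T3", "T4"]

# T3 committed before T2: T2 is rolled back on restore, and replication
# starts after T3, so T2 is lost.
out_of_order = missing_on_slave(binlog, {"T1", "T3"})

# Commits in binary log order: nothing is lost.
in_order = missing_on_slave(binlog, {"T1", "T2"})
print(out_of_order, in_order)
```

    In the first case the slave ends up without T2 even though the master has it, which is exactly what committing in binary log order prevents.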

    To solve this, InnoDB added a mutex called the prepare_commit_mutex that was taken when preparing a transaction and released when committing the transaction. This is a simple solution to the problem, but causes some problems that we will get to in a minute. Basically, the prepare_commit_mutex solves the problem by forcing the call sequence to be as in Figure 4.

    Figure 4. Sequencing transactions

    Steady as she goes... NOT!

    Since disk writes are slow, writing every transaction to disk will affect performance quite a lot... well, actually very much...

    To try to handle that, there is a server option sync_binlog that can be set to how often the binary log should be written to disk. If it is set to 0, the operating system will decide when the file pages should be written to disk; if it is set to 1, then fsync() will be called after every transaction written to the binary log. In general, if you set sync_binlog to N, you can at most lose N-1 transactions, so in practice there are just two useful settings: sync_binlog=0 means that you accept that some transactions can be lost and handle it some other way, and sync_binlog=1 means that you do not accept losing any transactions at all. You could of course set it to some other value to get something in between, but in reality you can either handle transaction loss or not.

    To improve performance, the common case is to bundle many writes with each sync: this is what the operating system does, and that is what we should be able to do. However, if you look at Figure 4 you see that there is no way to place an fsync() call in that sequence so that several transactions are written to disk at the same time. Why? Because at any point in that sequence there is at most one prepared and written transaction. However, if you go back to Figure 3, you can see that it would be possible to place an fsync() as shown and write several transactions to disk at the same time. If that were possible, then all transactions written to the binary log before the fsync() call would be written to disk at once. But this means that it is necessary to order the commits in the same order as the writes without using the prepare_commit_mutex.

    So, how does all this work then...

    The binary log group commit implementation splits the commit procedure into several stages, as you can see in Figure 5. The stages are entirely internal to the binary log commit procedure and do not affect anything else. In theory, it would be possible to have another replication implementation with another policy for ordering commits. Since the commit procedure is separated into stages, there can be several threads processing transactions at the same time, which also improves throughput.
    Figure 5. Commit Procedure Stages
    For each stage, there is an input queue where sessions queue up for processing. If a thread registers in an empty queue, it is considered the stage leader; otherwise, the session is a follower. Stage leaders will bring all the threads in the queue through the stage and then register the leader and all followers for the next stage. Followers will move off to the side and wait for a leader to signal that the entire commit is done. Since it is possible that a leader registers to a non-empty queue, a leader can decide to become a follower and go off waiting as well, but a follower can never become a leader.

    When a leader enters a stage, it will grab the entire queue in one go and process it in order according to the stage. After the queue is grabbed, other sessions can register for the stage while the leader processes the old queue.

    In the flush stage, all the threads that registered will have their caches written to the binary log. Since all the transactions are written to an internal I/O cache, the last part of the stage is writing the memory cache to disk (which means it is written to the file pages, also in memory).

    In the sync stage, the binary log is synced to disk according to the settings of sync_binlog. If sync_binlog=1 all sessions that were flushed in the flush stage will be synced to disk each time.

    In the commit stage, the sessions that registered for the stage will be committed in the engine in the order they registered. All work here is done by the stage leader. Since order is preserved in each stage of the commit procedure, the writes and the commits will be made in the same order.

    After the commit stage has finished executing, all threads that were in the queue for the commit stage will be marked as done and will be signaled that they can continue. Each session will then return from the commit procedure and continue executing.
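
    The stage mechanics above can be modelled with a short, single-threaded Python sketch. It is not the server code: the names Stage, enroll, fetch_all and group_commit are chosen for this illustration, sessions are plain strings, and the binary log and the engine are lists. What it does show is the two properties that matter: a session that registers in an empty queue becomes the leader, and the order of the batch is preserved from the write to the commit with a single sync for the whole group.

```python
from collections import deque


class Stage:
    """Input queue for one stage of the commit procedure."""

    def __init__(self, name):
        self.name = name
        self.queue = deque()

    def enroll(self, sessions):
        """Queue sessions; return True if the caller becomes stage leader."""
        is_leader = not self.queue  # first into an empty queue leads
        self.queue.extend(sessions)
        return is_leader

    def fetch_all(self):
        """Grab the whole queue in one go, preserving order."""
        batch = list(self.queue)
        self.queue.clear()
        return batch


def group_commit(sessions):
    """Drive one batch through the flush, sync and commit stages."""
    flush, sync, commit = Stage("flush"), Stage("sync"), Stage("commit")
    binlog, engine, fsyncs = [], [], 0

    flush.enroll(sessions)
    batch = flush.fetch_all()
    binlog.extend(batch)   # flush: write transaction caches to the log

    sync.enroll(batch)
    batch = sync.fetch_all()
    fsyncs += 1            # sync: one fsync() covers the whole batch

    commit.enroll(batch)
    batch = commit.fetch_all()
    engine.extend(batch)   # commit: leader commits in queue order
    return binlog, engine, fsyncs


stage = Stage("flush")
first_is_leader = stage.enroll(["s1"])
second_is_leader = stage.enroll(["s2"])
binlog, engine, fsyncs = group_commit(["s1", "s2", "s3"])
```

    In the real server the stages run concurrently and new sessions can register while a leader is processing the batch it grabbed; the sketch leaves that out to keep the ordering argument visible.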

    Thanks to the fact that the leader registers for the next queue and is ready to become a follower, the stage that is slowest will accumulate the most work. This is typically the sync stage, at least for normal hard disks. However, it is critical to fill the flush stage with as many transactions as possible, so the flush stage is treated a little specially.

    In the flush stage, the leader will skim the sessions one by one from the flush queue (the input queue for the flush stage). This process continues as long as the last session has not been removed from the queue and the first session was unqueued less than binlog_max_flush_queue_time microseconds ago. There are two different conditions that can stop the process:

    • If the queue is empty, the leader immediately advances to the next stage and registers all sessions processed to the sync stage queue.
    • If the timeout was reached, the entire queue is grabbed and the sessions' transaction caches are flushed (as before). The leader then advances to the sync stage.
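
    The two stop conditions can be sketched as a loop in Python. This is a simplified model under assumptions: the names skim_flush_queue and fake_clock are invented for the example, the clock is a deterministic stand-in that advances by a fixed step on every call, and no new sessions arrive while the leader is skimming (in the server they can).

```python
from collections import deque


def skim_flush_queue(queue, max_flush_queue_time_us, clock):
    """Flush sessions one by one until the queue empties or time runs out.

    Returns the sessions flushed, in order, and the reason the loop stopped.
    """
    flushed = []
    start = clock()
    while queue:
        if clock() - start >= max_flush_queue_time_us:
            # Timeout: grab whatever is left in one go and flush it too.
            flushed.extend(queue)
            queue.clear()
            return flushed, "timeout"
        flushed.append(queue.popleft())  # flush this session's cache
    return flushed, "empty"


def fake_clock(step_us):
    """Deterministic clock for the example: advances step_us per call."""
    now = [0]

    def tick():
        now[0] += step_us
        return now[0]

    return tick


sessions = ["a", "b", "c", "d"]
drained = skim_flush_queue(deque(sessions), 1000, fake_clock(100))
timed_out = skim_flush_queue(deque(sessions), 250, fake_clock(100))
print(drained, timed_out)
```

    In both cases every session that was in the queue ends up flushed in order; the timeout only bounds how long the leader keeps picking up sessions one at a time before moving on to the sync stage.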
    Figure 6. Comparing 5.6.5 and 5.6 June labs release

    Performance, performance, performance...

    I'm sure you all wonder what the improvements are, so without further delay, let's have a look at the results of some benchmarks we have done on the labs tree. There have been several improvements that are not related to the binary log, so I will just focus on the results involving the binary log. In Figure 6 you see a benchmark comparing the 5.6.5 release with the 5.6 June labs release using the binary log. These benchmarks were executed on a 2.00 GHz 48-core Intel® Xeon® 7540 with 512 GiB memory and using SSD disks.

    As you can see, the throughput has increased tremendously, with increases ranging from a little less than 2 to nearly 4 times that of 5.6.5. To a large extent, the improvements are in the server itself, but what is interesting is that with binary log group commit, the server is able to keep pace. Even with sync_binlog=0 on 5.6.5 and sync_binlog=1 on the 5.6 labs release, the 5.6 labs release outperforms 5.6.5 by a big margin.

    Another interesting aspect is that even with sync_binlog=1 the server performs nearly as well as when using sync_binlog=0. With a higher number of connections (roughly more than 50), the difference in throughput is varying between 0% [sic] and with a more typical throughput between 5% and 10%. However, there is a drop of roughly 20% in the lower range. This looks very strange, especially in light of the fact that the performance is almost equal in the higher range, so what is causing that drop and is there anything that can be done about it?

    Figure 7. Benchmark of Binary Log Group Commit
    The answer comes from some internal benchmarks done while developing the feature. For these tests we were using Sysbench on a 64-core Intel® Xeon® X7560 running at 2.27GHz with 126 GB memory and a HDD.

    In the benchmarks that you can see in Figure 7, the enhanced version of 5.6 with and without the binary log group commit is compared. The enhanced version of 5.6 includes some optimizations to improve performance that are not available in the latest 5.6 DMR, but most are available in the labs tree. However, these are benchmarks done while developing, so it is not really possible to compare them with Figure 6 above, but they will help to understand why we have a 20% drop at the lower number of connections.

    The bottom line in Figure 7 is the enhanced 5.6 branch without binary log group commit and using sync_binlog=1, which does not scale very well. (This is nothing new, and is why implementing binary log group commit is a good idea.) Note that even at sync_binlog=0 the staged commit architecture scales better than the old implementation. If you look at the other lines in the figure, you can see that even when the enhanced 5.6 server is running with sync_binlog=0, the binary log group commit implementation running with sync_binlog=1 outperforms it at roughly 105 simultaneous threads.

    Also note that the difference between sync_binlog=1 and sync_binlog=0 diminishes as the number of simultaneous connections increases, and vanishes completely at roughly 160 simultaneous connections. We haven't made a deep analysis of this, but while using the performance schema to analyze the performance of each individual stage, we noted that the sync time was completely dominating the performance (no surprise there, just giving the background), and that all available transactions "piled up" in the sync stage queue. Since each connection can have at most one ongoing transaction, it means that at 32 connections, there can never be more than 32 transactions in the queue. As a matter of fact, one can expect that over a long run, roughly half of the connections are in the queue and half of the connections are inside the sync stage (this was also confirmed in the measurements mentioned above), so with a low number of connections it is just not possible to fill the queue enough to utilize the system efficiently.

    The conclusion is that reducing the sync time would probably make the difference between sync_binlog=0 and sync_binlog=1 smaller even with a low number of connections. We didn't do any benchmarks using disks with battery-backed caches (which should reduce the sync time significantly, if not eliminate it entirely), but it would be really interesting to see the effect of that on performance.

    Summary and closing remarks

    • The binary logging code has been simplified and optimized, leading to improved performance even when using sync_binlog=0.
    • The prepare_commit_mutex is removed from the code and instead the server orders transactions correctly.
    • Transactions can be written and committed as groups without losing any transactions, giving around 3 times improvement in performance on both sync_binlog=1 and sync_binlog=0.
    • The difference between sync_binlog=0 and sync_binlog=1 is small and reduces as the load increases on the system.
    • Existing storage engines benefit from binary log group commit since there are no changes to the handlerton interface.
    Binary log group commit is one of a range of important new enhancements to replication in MySQL 5.6, including global transaction IDs (GTIDs), multi-threaded slave, crash safe slave and binary log, replication event checksums, and some more. You can learn more about all of these from our DevZone article:

    You can also try out binary log group commit today by downloading the latest MySQL 5.6 build that is available on labs.mysql.com

    Related links

    • It all started with this post where Mark points out the problem and shows some results of their implementation.
    • The next year, Kristian Nielsen implemented binary log group commit for MariaDB and has a lot of good posts on the technical challenges in implementing it. This implementation is using an atomic queue and does flushing and syncing of the transactions as a batch, after which the sessions are signalled in order and commit their transactions.