Internals of Global Replication#

Before we move into a description of how Global Replication works, it is important to understand the internals of asynchronous replication in RonDB.

Global Replication is built on top of MySQL Replication. There are some major differences between how RonDB uses MySQL Replication and how other storage engines use it. With other storage engines, the events are written into the binlog (the MySQL replication log) in the same MySQL Server where the write occurs.

This is not possible with RonDB, since API nodes can write into RonDB without going through a MySQL Server. In addition, there can be tens or even hundreds of MySQL Servers in one cluster, so it isn’t possible to write the log only in the MySQL Server where the change occurs.

Instead we use specialised MySQL Servers that I will call MySQL replication servers. To ensure high availability we strongly recommend always having at least two such MySQL replication servers in each cluster.

These servers receive events from data nodes and write them into the binlog.

There might even be more pairs to accommodate replication to different clusters, or to replicate different parts of the database in different directions.

MySQL Replication uses a binlog server that gathers all logs of writes in the database. In the backup cluster a replica server retrieves log records from the binlog server and writes them into the backup cluster.

In this chapter we will focus on how MySQL replication servers communicate with the RonDB data nodes. The communication between MySQL replication servers and setup of those will be covered in the next set of chapters.

Data node triggers#

In the RonDB data node we have a trigger mechanism that is used for a diverse set of purposes: maintaining local T-tree indexes, maintaining global unique indexes, maintaining foreign keys, and handling online reorganisation of table partitions.

For Global Replication we have asynchronous triggers that gather and send information about committed changes in RonDB data nodes.

Each time a write of a row (update, insert, delete) is committed it can trigger an asynchronous write of the change to a local module called SUMA (SUbscriber MAnagement).

There are multiple ldm threads that can commit row writes. In the RonDB architecture these log messages of a committed row write are called events and the user can get access to those events from an NDB Event API.

The SUMA block is executing in one thread, the rep thread.


As discussed in an earlier chapter, operations are committed in groups to durable media (files on top of hard drives, SSDs or other persistent media).

These groups are called global checkpoints and occur at configurable intervals, by default 2 seconds. For Global Replication we have one more grouping of transactions into something internally called micro GCPs. The name we use in documentation is epochs. Epochs is a term that was described in the research literature on replication between databases already more than 25 years ago.

By default we have 20 epochs for every global checkpoint. Thus they arrive at 100 millisecond intervals.
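The relationship between the two intervals is simple arithmetic. A minimal sketch using the default values quoted above; the constant and function names are illustrative only and are not RonDB configuration parameter names.

```python
# Default values as stated in the text; the names are illustrative only.
GLOBAL_CHECKPOINT_INTERVAL_MS = 2000  # one global checkpoint every 2 seconds
EPOCHS_PER_GLOBAL_CHECKPOINT = 20     # epochs (micro GCPs) per global checkpoint


def epoch_interval_ms(gcp_interval_ms: int, epochs_per_gcp: int) -> float:
    """Return the time between two consecutive epochs in milliseconds."""
    return gcp_interval_ms / epochs_per_gcp


interval = epoch_interval_ms(GLOBAL_CHECKPOINT_INTERVAL_MS, EPOCHS_PER_GLOBAL_CHECKPOINT)
print(interval)  # 100.0
```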

Epoch Buffers#

The SUMA block in each node organises the events in a number of Epoch Buffers. We can have multiple epoch buffers. Epoch buffers are maintained only in memory.

As soon as an epoch is completed we can send the epoch over to the requester of the events. In this case the requester is a MySQL replication server, but it could be any other application that uses the NDB Event API. As an example, HopsFS uses the NDB Event API to maintain a searchable index in ElasticSearch of the files stored in HopsFS.

We work together with the other data nodes in the same node group to send an epoch over to the MySQL replication servers.

Once all the MySQL replication servers have acknowledged receiving the epoch from the RonDB data nodes, and the other RonDB data nodes in the node group have acknowledged that they have completed their transfer to the MySQL replication servers, the buffers used by the epoch can be released in the data nodes.

The data nodes in a node group share the responsibility to send the epochs to the MySQL replication servers. If a data node fails the other data node(s) in the same node group must complete the sending of the buffer.

In the figure below we see the protocol to transfer epochs from the data nodes to the MySQL replication servers. In this figure we have two MySQL replication servers and two data nodes in the node group. There could be up to 4 data nodes in one node group (max 4 replicas in RonDB). There can be any number of MySQL replication servers, although normally there would be two.


In this figure we only show how one node group transfers its epoch data to the MySQL replication servers. All node groups must do this, and each node group does it independently of the others. Thus as soon as one node group has completed the transfer of epoch n we can release the epoch buffers for this epoch in the nodes in that node group.

As we see in this picture, both data nodes in the node group gather all data for the epoch in their epoch buffers. When the buffer is complete and the transfer of the previous epoch is completed, we start sending our part of the epoch buffers to the MySQL replication servers. We send our part to all the MySQL replication servers. The other data node also sends its part to all the MySQL replication servers.

When a MySQL replication server has received an epoch buffer from one node group it acknowledges this to the data nodes. The MySQL replication server will send this acknowledgement when it has received the epoch from the entire cluster.

As soon as a data node receives the confirmation from all MySQL replication servers about receiving all epoch data it can release the epoch buffers connected to this epoch.
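The release rule described above can be sketched as a small bookkeeping structure. This is a hypothetical model of one data node, not RonDB's SUMA implementation: the class name, method names and server ids are all assumptions made for illustration, and the takeover by another data node after a failure is not modelled.

```python
from dataclasses import dataclass, field


@dataclass
class EpochBufferTracker:
    """Tracks, per epoch, which subscribers still owe an acknowledgement."""

    subscribers: frozenset                              # ids of the MySQL replication servers
    buffers: dict = field(default_factory=dict)         # epoch -> list of buffered events
    pending_acks: dict = field(default_factory=dict)    # epoch -> set of subscriber ids

    def add_event(self, epoch: int, event: str) -> None:
        """Buffer one committed row change under its epoch."""
        self.buffers.setdefault(epoch, []).append(event)

    def complete_epoch(self, epoch: int) -> list:
        """Mark the epoch complete and return the events to send to every subscriber."""
        self.pending_acks[epoch] = set(self.subscribers)
        return list(self.buffers.get(epoch, []))

    def acknowledge(self, epoch: int, subscriber: str) -> bool:
        """Record an acknowledgement; return True if this call released the buffer."""
        waiting = self.pending_acks.get(epoch)
        if waiting is None:
            return False
        waiting.discard(subscriber)
        if waiting:
            return False  # at least one subscriber has not acknowledged yet
        del self.pending_acks[epoch]
        self.buffers.pop(epoch, None)
        return True


tracker = EpochBufferTracker(subscribers=frozenset({"mysqld_a", "mysqld_b"}))
tracker.add_event(7, "update t1 pk=1")
tracker.complete_epoch(7)
first = tracker.acknowledge(7, "mysqld_a")   # still waiting for mysqld_b
second = tracker.acknowledge(7, "mysqld_b")  # last acknowledgement releases the buffer
print(first, second, 7 in tracker.buffers)   # False True False
```

The point of the sketch is that memory for an epoch is held until the slowest subscriber has acknowledged it, which is why a slow or disconnected MySQL replication server translates directly into epoch buffer usage.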

We need sufficiently large epoch buffers to gather epochs even when node crashes slow things down, when there are short bursts of high update rates, and when a few larger transactions occur.

These limits are configurable. MaxBufferedEpochs limits the number of epochs we will buffer. By default it is set to 100, which corresponds to about 10 seconds of buffering. MaxBufferedEpochBytes sets the limit on the total size of the epoch buffers. By default this is set to 25 MBytes. These are important parameters to configure. A cluster that handles a hundred thousand transactions per second could write 50 MBytes per second or more, so in this case it would be a good idea to extend the buffer size to 500 MBytes.
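The sizing argument can be checked with a few lines of arithmetic. This is a rough sketch based only on the figures quoted in the text (100 ms epochs, 100 buffered epochs, 50 MBytes written per second); the function name is illustrative and 1 MByte is taken as 1,000,000 bytes for simplicity.

```python
MB = 1_000_000               # simplification: 1 MByte = 10**6 bytes
EPOCH_INTERVAL_MS = 100      # default epoch interval from the text
MAX_BUFFERED_EPOCHS = 100    # default value of MaxBufferedEpochs from the text


def required_epoch_buffer_bytes(write_rate_bytes_per_s: float, seconds_to_buffer: float) -> int:
    """Bytes needed to hold `seconds_to_buffer` seconds of change data."""
    return int(write_rate_bytes_per_s * seconds_to_buffer)


# How many seconds the epoch count limit allows us to buffer.
seconds = MAX_BUFFERED_EPOCHS * EPOCH_INTERVAL_MS / 1000
# Memory needed to actually hold that many seconds at 50 MBytes per second.
needed = required_epoch_buffer_bytes(50 * MB, seconds)
# How long the default 25 MBytes would last at the same write rate.
default_lasts_s = (25 * MB) / (50 * MB)

print(seconds, needed // MB, default_lasts_s)  # 10.0 500 0.5
```

At this write rate the default byte limit is reached after half a second, long before the limit on the number of epochs, which is why the byte limit is the one to raise.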

Running out of epoch buffers means that the replication channel is at risk. Running out of epoch buffers in all data nodes in a node group means that the MySQL replication servers cannot continue replicating and thus the entire replication channel will be lost. In this case the replication channel must be restarted.

Thus it is very important to ensure that the epoch buffers are large enough. Failure of a replication channel will not stop transactions in a cluster from proceeding. Thus operation of the local cluster has higher priority than the event handling.

Send buffers#

There is a lot of data sent from the data nodes to the MySQL replication servers that act as binlog servers. Similarly there is a lot of data sent from MySQL replication servers to the RonDB data nodes when they act as replica appliers.

The links between MySQL replication servers and RonDB data nodes are therefore more likely than other links to run out of send buffer memory.

Send buffer memory can be configured per link between nodes, which makes it possible to give those links a larger send buffer than the links to other nodes.

MySQL replication server internals#

We will skip most of the details on how MySQL replication works. There is a lot of detail about this that can be gathered from the MySQL manual and from many books on MySQL.

RonDB uses a special thread called the binlog injector thread that writes into the binlog. It runs an endless loop that reads NDB events and inserts them into the binlog files.


When writing into the binlog files a memory buffer is used. To ensure that this writing doesn’t become a bottleneck it is possible to set the binlog cache size through the MySQL option binlog-cache-size. By default this is set to 32 kBytes. It is more or less mandatory to set it higher. Another important parameter to consider is the size of the binlog files. If the file size is too small there will be too many hiccups when switching files. At the same time very large files are not good either, since they make it difficult to purge the binlog files. The file size is set through the MySQL option max-binlog-size, which defaults to 1 GByte. This should be fine for most use cases. The MySQL binlog files must be purged manually.

The replica applier uses two main threads. The replica IO thread reads over the network from the MySQL replication server in the other cluster and stores the information in relay log files. The replica SQL thread reads from the relay log and creates RonDB transactions to insert one epoch at a time.

Each epoch is written into the replica cluster as one transaction. The reason is that epoch boundaries are the only consistency points. In the middle of applying an epoch the database isn’t consistent, so to avoid inconsistency problems the entire epoch is written as one transaction.
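The all-or-nothing behaviour can be illustrated with a toy key-value store. This is only a conceptual sketch: the function name, the event tuples and the copy-then-swap approach are assumptions for illustration, and the real replica applier uses RonDB transactions rather than copying data.

```python
import copy


def apply_epoch(database: dict, epoch_events: list) -> dict:
    """Apply all row changes of one epoch atomically to a toy key-value store.

    The changes are staged on a copy, which is returned only if every change
    succeeds, so `database` itself is never left in a half-applied state.
    """
    staged = copy.deepcopy(database)
    for op, key, value in epoch_events:
        if op in ("insert", "update"):
            staged[key] = value
        elif op == "delete":
            del staged[key]  # raises KeyError if the row does not exist
        else:
            raise ValueError(f"unknown operation: {op}")
    return staged


# A transfer between two accounts: the total is 100 before and after the
# epoch, but it would be 60 if only the first change were visible.
db = {"account:1": 100, "account:2": 0}
epoch_42 = [("update", "account:1", 60), ("update", "account:2", 40)]
db = apply_epoch(db, epoch_42)
print(db)  # {'account:1': 60, 'account:2': 40}
```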

The relay log files are automatically purged as soon as they have been used.

Limitations of Global Replication#

Many parts of the replication between clusters are single threaded. This means that there are limits to how many transactions per second can be handled. The most demanding write applications can run into problems when using Global Replication.

With a proper setup of Global Replication it should be possible to reach well beyond 100,000 row writes per second.

At the same time many of our users, even the most demanding ones, have been able to use Global Replication. Many have found good ways to shard their applications such that they have one cluster per shard.

Scaling Global Replication#

One approach to scaling Global Replication is to have separate databases use different replication channels. In Hopsworks we have multi-tenant clusters with one database per user. This makes it fairly easy to scale replication by assigning databases to a set of replication channels instead of using a single replication channel. In this manner Global Replication can handle millions of row changes per second.
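One way to picture the assignment of databases to channels is a stable hash from database name to channel index. This is a hypothetical sketch and not how Hopsworks performs the assignment; the function names and tenant database names are invented, and how each channel is then restricted to its databases is outside the scope of the sketch.

```python
import zlib


def channel_for_database(database_name: str, num_channels: int) -> int:
    """Map a database name to a replication channel index in a stable way."""
    return zlib.crc32(database_name.encode("utf-8")) % num_channels


def assign_databases(database_names, num_channels: int) -> dict:
    """Return a dict of channel index -> sorted list of databases on that channel."""
    channels = {index: [] for index in range(num_channels)}
    for name in sorted(database_names):
        channels[channel_for_database(name, num_channels)].append(name)
    return channels


tenants = ["tenant_alice", "tenant_bob", "tenant_carol", "tenant_dave"]
for index, names in assign_databases(tenants, 2).items():
    print(index, names)
```

A hash keeps the assignment of an existing database unchanged when new tenants are added, at the cost of a possibly uneven load. An explicit table that places heavy writers on their own channel would be an equally valid choice.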

Bottlenecks in Global Replication#

The first potential bottleneck is the rep thread. This thread receives change information from the ldm threads and buffers this information to send it on to the API nodes that have subscribed to those changes. We’ve never seen this become a bottleneck, but to ensure it doesn’t become a bottleneck one can lock this thread to a CPU no one else uses.

The next step in the chain is the MySQL replication server acting as a binlog server. Two main threads perform a lot of work in this part: the receive thread for the NDB API and the injector thread. To achieve the best possible throughput here one should use a dedicated MySQL Server that uses one cluster connection, and the MySQL Server should be locked to 4 CPUs such that the receive thread and the injector thread can each use their own CPU. The other two CPUs are used to send data to the replica MySQL Server(s).

To ensure that the binlog injector thread doesn’t perform any of the receive thread activity one should set ndb-recv-thread-activation-threshold=0. This ensures that all receive activity is handled by the NDB API receive thread and not by any other thread at all.

On the replica side it is important to set replica-allow-batching, since without this setting the throughput of the replica applier decreases by an order of magnitude. Similarly, on the replica side it is important to use a dedicated MySQL Server to achieve optimal performance. This server should be locked to 4 CPUs to ensure that the replica IO thread, the replica SQL thread, the NDB API receive thread and send activity can work in parallel. Also here one should set ndb-recv-thread-activation-threshold=0 to ensure that the replica SQL thread doesn’t have to handle any NDB API receive thread activity.
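To make the two replica-side settings concrete, the small helper below renders them as a `[mysqld]` option file section. Only the two option names come from the text; the helper itself, and the choice of writing the batching option as `ON`, are illustrative assumptions rather than a recommended complete configuration.

```python
def render_mysqld_section(options: dict) -> str:
    """Render option name/value pairs as a [mysqld] section of an option file."""
    lines = ["[mysqld]"]
    for name, value in options.items():
        lines.append(f"{name}={value}")
    return "\n".join(lines) + "\n"


# Settings named in the text for a MySQL Server acting as replica applier.
replica_options = {
    "replica-allow-batching": "ON",
    "ndb-recv-thread-activation-threshold": 0,
}

print(render_mysqld_section(replica_options), end="")
# [mysqld]
# replica-allow-batching=ON
# ndb-recv-thread-activation-threshold=0
```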

If one uses one VM per MySQL Server in a cloud setup one should use a VM with 2 CPU cores for the MySQL replication servers to achieve optimal throughput. If they act as both source and replica replication servers we should use 4 CPU cores for optimal throughput.

Using a dedicated VM is a way to ensure that the threads get their own CPUs without having to lock the MySQL Server to specific CPUs when starting it. Otherwise one can use taskset or numactl on Linux.
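As a sketch of the taskset alternative, the helper below builds a command line that restricts a process to a list of CPUs using `taskset -c`. The CPU list `0-3` and the mysqld arguments are placeholders chosen for illustration, not a recommended command line.

```python
import shlex


def pinned_command(cpu_list: str, command: list) -> list:
    """Prefix a command with `taskset -c <cpu_list>` to restrict it to those CPUs."""
    return ["taskset", "-c", cpu_list, *command]


# Placeholder arguments: adjust the CPU list and the option file path.
cmd = pinned_command("0-3", ["mysqld", "--defaults-file=/path/to/my.cnf"])
print(shlex.join(cmd))  # taskset -c 0-3 mysqld --defaults-file=/path/to/my.cnf
```

The resulting list can be passed to `subprocess.run` on a Linux host where taskset is installed.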

In the RonDB data node it is important that there is a sufficient number of tc threads, such that the tc thread used to execute the epoch transactions does not have to share its time with too many other activities. This is the least likely performance bottleneck.