Concepts in RonDB#

In the next set of chapters we will cover a number of very important concepts in RonDB. They are not absolutely necessary to understand RonDB from an SQL view, but it certainly helps to understand those matters when designing scalable applications using RonDB.

The first chapter is the replication protocol used in RonDB. It is based on the traditional two-phase commit protocol where we have made it a non-blocking protocol. It is also designed to handle predictable latency while at the same time providing a consistent recovery from cluster crashes.

Many new implementations of DBMSs have opted to use various replication protocols based on quorum and various forms of eventual consistency. In RonDB we provide strong consistency at scale to ensure that application developers do not have to worry about where to send their requests.

The next chapter shows how RonDB routes read and write requests. This is performed in a manner that is transparent to the application. But to design applications that scale to millions of transactions per second it is essential to understand this chapter.

The development of the cloud in various forms has streamlined data center implementations such that clouds have several data centers per cloud region. We have designed features in RonDB that take into account the placement of nodes within a region in a cloud. This ensures that we can improve scalability of cloud installations and also provide the optimal latency of queries executed in a cloud.

Any scalable DBMS will have to consider how the consistency of the data is handled. In RonDB we have designed for easy application development where applications can see their own writes and the data is immediately visible. All consistency models have different advantages and disadvantages and in the chapter on Concurrency Control we describe how we handle this in RonDB.

The final chapter in this part goes through a feature that is currently unique in the MySQL world, the ability to parallelise query execution. This is a feature with many names, but it is about pushing down joins from the MySQL Server to the data nodes. In this chapter we describe how this is done and the advantages this provides and also the current set of limitations of this feature.

Non-blocking 2PC#

A popular protocol to replicate changes between nodes in a distributed database is the two-phase commit protocol. Recently many systems have implemented other protocols that are more complex and most of them require three phases. The idea with the third phase is to keep the protocol from blocking.

In the two-phase commit protocol a node is blocked in the prepare phase until the transaction coordinator has spread its decision on whether the transaction was committed or not.


The block here could mean that nodes have to keep locks for a long time waiting for a node to recover. This is not a desirable feature of a replication protocol.

We have also added a third phase, the complete phase. It is possible to reply to the application already after the second phase. We discussed this in an earlier chapter where we described the Read Backup feature.

Idea behind new two-phase commit protocol#

In RonDB we kept some of the ideas of the two-phase commit protocol, but we also changed a lot of important details in it.

Distributed commit combined with linear commit#

One variant of the two-phase commit protocol is called linear commit protocol. This protocol sends the prepare message from first participant to last participant in serial order. Instead of returning to the transaction coordinator to commit the transaction, the transaction is committed in the last participant. Next the commit message is sent in serial order back to the first participant.


The original two-phase commit protocol uses 4 * n messages to complete a transaction (where n is the number of participant nodes in the transaction). The linear two-phase commit protocol uses 2 * (n - 1) instead. Thus the linear commit protocol saves more than half of the messages. This is very important for throughput.

At the same time latency of the original two-phase commit protocol is shorter. The latency of the original two-phase commit protocol is the latency of two rounds of parallel messages whereas the linear commit protocol gets longer as the number of nodes increases.

Given that we want to achieve scalability AND predictable latency it would be beneficial to use a combination of the original protocol and the linear commit protocol.

Before presenting our new protocol, we mention that we use primary copy locking. Thus we need to lock the primary replica row before we lock the backup replica row. Failure to use this order can easily lead to deadlocks that could have been avoided.

Our idea is to use linear commit for one row, but to use the original commit protocol on each row. Thus the number of messages we will send will be 2 * m * (r + 1) (where m is the number of rows in the transaction set, and r is the number of replicas). If we were to use the original commit protocol only we would send m * 4 * r.

An example with the number of rows (m) set to 10 and the number of replicas (r) set to 2 gives 80 messages for the original protocol and 60 messages for our new protocol. The latency of our protocol is much better than that of the linear commit protocol and slightly worse than that of the original commit protocol. Given that we decreased the cost of communication we expect to get this latency loss back at high loads. We use a lot of piggybacking to ensure that the real number of messages is a lot lower.
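The arithmetic above can be checked with a few lines of Python. This is only a sketch of the formulas quoted in the text; the function names are ours and are not part of RonDB.

```python
def original_2pc_messages(n: int) -> int:
    """Messages used by the original two-phase commit with n participants."""
    return 4 * n


def linear_commit_messages(n: int) -> int:
    """Messages used by the linear commit protocol with n participants."""
    return 2 * (n - 1)


def original_per_row_messages(m: int, r: int) -> int:
    """Original protocol applied to m rows with r replicas each."""
    return m * 4 * r


def combined_protocol_messages(m: int, r: int) -> int:
    """Linear commit within each row's replicas, original protocol across rows."""
    return 2 * m * (r + 1)


if __name__ == "__main__":
    m, r = 10, 2
    print(original_per_row_messages(m, r))   # 80
    print(combined_protocol_messages(m, r))  # 60
```

These counts ignore piggybacking, so the number of messages actually sent is lower.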

Description of new protocol#

The figure below shows the new transaction protocol.


The idea is to start the prepare phase by going to the primary replica. This ensures that we minimise the risk of deadlocks. The risk of deadlocks is high in the original two-phase commit protocol since we can lock the replicas in different order for different transactions. This can lead to a deadlock even when the application has ensured a transaction ordering that is free from deadlocks.

The use of linear commit is important not only to minimise number of messages, but also to ensure that we avoid distributed deadlocks that make it very difficult to scale the application even if properly designed.

Next we move to the backup replicas and from there back to the transaction coordinator.

Now when it is time to commit the transaction we use the reverse linear order. The idea with this order is that it makes it possible to release the locks on the primary replica already in the commit phase. This has the advantage that if we route reads to the primary replica we can report transaction commit to the application already after the commit phase and still see our own updates. If we want to read any replica we still have to wait for the complete phase to complete before reporting the commit to the application.
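The message order for one row in the prepare and commit phases can be sketched as a list of hops. This is a toy model under our own naming (`prepare_path`, `commit_path`, the node labels); it leaves out the complete phase and all piggybacking.

```python
def prepare_path(replicas: list[str], tc: str = "TC") -> list[tuple[str, str]]:
    """Hops of the prepare phase: TC -> primary -> backups -> TC.

    `replicas` lists the primary replica first, then the backups in order.
    """
    chain = [tc, *replicas, tc]
    return list(zip(chain, chain[1:]))


def commit_path(replicas: list[str], tc: str = "TC") -> list[tuple[str, str]]:
    """Hops of the commit phase: the same chain in reverse linear order."""
    chain = [tc, *reversed(replicas), tc]
    return list(zip(chain, chain[1:]))


if __name__ == "__main__":
    row_replicas = ["primary", "backup"]
    for sender, receiver in prepare_path(row_replicas):
        print("PREPARE", sender, "->", receiver)
    for sender, receiver in commit_path(row_replicas):
        print("COMMIT ", sender, "->", receiver)
```

With r replicas each phase takes r + 1 hops per row, which matches the 2 * m * (r + 1) message count given earlier for m rows.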

The complete phase releases the locks in the backup replica. It also releases the memory attached to the transaction in the form of copy rows and operation records used to contain transaction state. A certain level of parallelism is used in the transaction commit where several rows can be committed in parallel and likewise for the complete phase.

Achieving a non-blocking commit protocol#

Before we move on to prove that the new protocol is non-blocking we need to make a few observations.

First, the transaction coordinator is no longer the commit point in the transaction. Instead the commit point is when the first primary replica decides to commit the transaction. Before the commit message arrives at the primary replica it has passed through all backup replicas. Thus at the commit point all replicas have decided on the commit.

All replicas reside within one node group in RonDB. If all nodes within a node group fail, the cluster will also fail. Thus at a node failure that doesn't bring down the cluster we are certain that there is information in the surviving nodes to find out whether the transaction should commit or abort.

Thus we are never blocked by any node failure that doesn't cause a crash of the cluster.

If the cluster crashes completely we will always need recovery before transactions can start again. We can never become blocked in this case. Thus our commit protocol is a non-blocking protocol if we can make use of the information in the transaction participants to complete transactions after node failures.

We implement a take-over protocol to rebuild the transaction state of transactions that lost their transaction coordinator to ensure that we can always decide on commit or abort of any ongoing transactions in the cluster. More on this protocol below.

We have shown that our new two-phase commit protocol is a non-blocking protocol. There are no node failures that can cause transactions to hang waiting for nodes to recover before the transaction can complete.

If nodes needed to complete the transaction are missing, it also means that there is an insufficient number of nodes to keep the cluster up and running. If the cluster crashes it will perform recovery and recover those transactions that are durable also on disk.

Take over protocol#

The take-over protocol asks all nodes in the cluster to send information about any ongoing transaction where the failed node is transaction coordinator. The node asking is the master node that will take over the completion of these transactions.

After receiving all information about a set of transactions the new transaction coordinator will decide whether the transaction should be committed or aborted based on how far it got in the process. If any node heard a commit decision it will be committed. The new transaction coordinator will also inform the API of the transaction outcome.
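The decision rule can be sketched as follows. The text only states that a transaction is committed if any node heard a commit decision; treating every other case as an abort is our simplifying assumption, and the type and function names are hypothetical.

```python
from enum import Enum


class ReplicaState(Enum):
    """How far a surviving participant got with the transaction (toy model)."""
    PREPARED = "prepared"
    COMMITTED = "committed"


class Outcome(Enum):
    COMMIT = "commit"
    ABORT = "abort"


def decide_outcome(reported_states: list[ReplicaState]) -> Outcome:
    """Decide the fate of one transaction from the states the survivors report.

    If any participant heard the commit decision, the transaction commits.
    Assumption: otherwise the new coordinator aborts it.
    """
    if any(state is ReplicaState.COMMITTED for state in reported_states):
        return Outcome.COMMIT
    return Outcome.ABORT
```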

This protocol will ensure that the transactions ongoing in the failed node, at the time of the crash, are completed before we report the node failure handling as completed. When node failure handling is completed we thus have none of the transactions still lingering in the surviving data nodes; they are all committed or aborted.

Similarly the API nodes have been informed of the transaction outcome with one exception. The exception is when using the read backup feature; in this case the transaction outcome is reported at complete. This means that the only remaining part of the transaction is the transaction coordinator when going to send the commit message. This transaction is handled by the message about the node failure handling being complete. If the transaction coordinator has failed, the node failure handling is completed, and we haven't heard anything about the transaction outcome, we know that the transaction is committed since this is the only reason for the transaction outcome message to not arrive.

We keep special commit ack information in the data nodes to ensure that the API can always be informed of the transaction outcome if the API itself survives.

Global checkpoint protocol#

RonDB is designed for applications that require latencies that cannot be achieved when writing to a hard drive as part of the commit protocol. SSDs weren't around at the time NDB was designed.

RonDB uses Network Durability to implement the D in the term ACID (Atomic, Consistent, Isolated, Durable). At commit time the transaction is committed in the memory of multiple computers, so it can survive any single point of failure of an entire computer and in many cases even multiple node failures.

First phase of global checkpoint protocol#

Some mechanism is also required to ensure that the cluster recovers to a consistent state also after a complete cluster crash. It is not sufficient to simply write out the transaction logs to disk as soon as possible. To recover a consistent state of the database we need something more as well.

What we implemented here is a sort of group commit transaction. We call this group commit a global checkpoint (GCP). Each GCP is a consistent recoverable point at a cluster crash. We also use micro GCPs. These are not recoverable after a crash; they form epochs used to send batches of transactions over to a slave cluster.

We use global checkpoints for many things in our recovery mechanisms. Each time a transaction is about to commit it asks for a global checkpoint id (GCI). Normally it gets one of those immediately. If we are in the process of starting a new GCP there could be a small wait.

The process to start a new global checkpoint is the following. The master data node decides to create a new global checkpoint. It sends a message called GCP_PREPARE to all nodes in the cluster (including itself). Each node responds with a GCP_PREPARECONF message. Next the master data node sends GCP_COMMIT to all nodes.

At reception of the GCP_PREPARE message the nodes start blocking committers from proceeding. All the prepare phase activities are allowed to continue which means that the impact on throughput and latency is minimal. Impact on throughput is low since there will be more room for prepare messages, thus we can still keep the CPUs busy. Impact on latency is low since it only introduces an extra delay of a two-phase message and given that committers are blocked a little bit more CPU is available to handle this message faster. After 15 years of operation of the cluster we still have had no issues with this initial phase of the global checkpoint protocol. One more reason is that queueing the commits for a short time introduces a sort of batching of commits that improves throughput.

At reception of GCP_COMMIT the nodes will immediately resume the commit of the stalled transactions.

Now that we have created a new global checkpoint we have also at the same time completed the previous one. Given the two-phased approach we are certain that we get a proper serialisation point between the global checkpoints. We know that a global checkpoint contains a set of transactions and all these will either be recovered completely or not at all.
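The blocking behaviour of the first phase can be illustrated with a toy data node. The class and method names are ours, and we assume that a commit stalled between GCP_PREPARE and GCP_COMMIT is tagged with the new GCI; this is a sketch of the idea, not RonDB code.

```python
from dataclasses import dataclass, field


@dataclass
class ToyDataNode:
    """Toy model of how one node treats committers during a GCP switch."""
    current_gci: int = 1
    blocking: bool = False
    stalled: list[str] = field(default_factory=list)
    committed: dict[str, int] = field(default_factory=dict)  # transaction -> GCI

    def on_gcp_prepare(self) -> str:
        # Committers are blocked; prepare-phase work is not modelled and continues.
        self.blocking = True
        return "GCP_PREPARECONF"

    def on_gcp_commit(self, new_gci: int) -> None:
        # The new GCP starts and stalled committers resume immediately.
        self.current_gci = new_gci
        self.blocking = False
        for txn in self.stalled:
            self.committed[txn] = self.current_gci
        self.stalled.clear()

    def commit(self, txn: str) -> None:
        # A committer asks for a GCI; it only waits while a new GCP is starting.
        if self.blocking:
            self.stalled.append(txn)
        else:
            self.committed[txn] = self.current_gci
```

Every transaction ends up in exactly one GCI, which is what gives the serialisation point between consecutive global checkpoints.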

This means that when we recover a node we use the GCI to know which transactions to restore.

Each COMMIT record in the transaction log (in our case only a REDO log) is tagged with the GCI of its transaction. We also use the GCI to make synchronisation of a starting node and a living node easy. We recover the starting node to a certain GCI, after that only the rows in the living node with a higher GCI need to be sent to the starting node to synchronize the rows in the nodes.
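The synchronisation rule amounts to a filter on the row GCI. A minimal sketch, assuming each row in the living node is stored as a `(value, gci)` pair keyed by primary key (a representation we made up for illustration):

```python
def rows_to_send(live_rows: dict[str, tuple[str, int]], restored_gci: int) -> dict[str, tuple[str, int]]:
    """Return the rows the living node must send to the starting node.

    Only rows changed after the GCI that the starting node recovered to
    (row GCI higher than restored_gci) need to be sent.
    """
    return {
        key: (value, gci)
        for key, (value, gci) in live_rows.items()
        if gci > restored_gci
    }
```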

Second phase of global checkpoint protocol#

The second phase makes the completed GCI durable on disk. This phase starts already when receiving GCP_COMMIT.

The first step is that each node needs to wait until the transaction coordinators have completed the processing of all transactions belonging to the completed GCI. Normally this is fast since it mostly involves networking messages and memory operations. Large transactions have the possibility to increase the time for this phase. Commits of transactions changing many rows with disk columns can represent a challenge.

When all transactions in the GCP in the node are completed we will send the message GCP_NODEFINISH back to the master data node.

When all nodes have finished the commit phase of the global checkpoint protocol the next phase is to flush the REDO logs in all nodes. The message GCP_SAVEREQ is sent to all nodes to ask them to flush the REDO log up to the GCI. The REDO log implementation needs to keep track of where the last commit message for this GCI is stored to know how much REDO log needs to be flushed.

When the flush of the REDO logs is complete the nodes respond with GCP_SAVECONF.

At this point there is enough information to recover this GCI. To simplify the recovery we use one more phase where we write a small system file that contains the recoverable GCI. This is accomplished by sending COPY_GCIREQ to all nodes that will respond with COPY_GCICONF when done.
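The order of the steps in the second phase can be summarised in a small simulation. The message names come from the text; the `ToyNode` class, its fields, and the function name are hypothetical, and real nodes reply asynchronously rather than in a simple loop.

```python
class ToyNode:
    """Hypothetical stand-in for a data node; tracks only what it made durable."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.flushed_gci = 0   # REDO log flushed up to this GCI
        self.sysfile_gci = 0   # recoverable GCI written to the system file

    def wait_for_transactions(self, gci: int) -> str:
        # A real node waits here until all transactions in the GCI are completed.
        return "GCP_NODEFINISH"

    def on_gcp_savereq(self, gci: int) -> str:
        self.flushed_gci = gci
        return "GCP_SAVECONF"

    def on_copy_gcireq(self, gci: int) -> str:
        self.sysfile_gci = gci
        return "COPY_GCICONF"


def run_second_phase(nodes: list[ToyNode], gci: int) -> list[str]:
    """Run the three steps in order and return the replies the master collects."""
    replies = [node.wait_for_transactions(gci) for node in nodes]
    replies += [node.on_gcp_savereq(gci) for node in nodes]
    replies += [node.on_copy_gcireq(gci) for node in nodes]
    return replies
```

Each step starts only after every node has answered the previous one, so the system file never names a GCI whose REDO log is not yet flushed on all nodes.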


Handling cluster crash#

At a cluster crash we will use the information from the system file to discover which GCI to restore. To restore the data we will first install a local checkpoint. This local checkpoint will know how far back in the REDO log we need to start to ensure that we recover the data. More on the details of this in a later chapter.

Next we execute the REDO log, executing all log records that belong to restored tables and have a GCI smaller than or equal to the restored GCI.

After that we have restored a consistent database. Before the recovery is completed we execute a local checkpoint to avoid strange issues with multiple node crashes close to each other.
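The selection of REDO log records can be sketched as a filter. The record layout below is invented for illustration and does not reflect the real REDO log format.

```python
from typing import NamedTuple


class RedoRecord(NamedTuple):
    """Toy REDO log record: the table it belongs to, its GCI, and a payload."""
    table: str
    gci: int
    payload: str


def records_to_execute(
    redo_log: list[RedoRecord], restored_tables: set[str], restored_gci: int
) -> list[RedoRecord]:
    """Keep log order, skip unrestored tables and records newer than restored_gci."""
    return [
        record
        for record in redo_log
        if record.table in restored_tables and record.gci <= restored_gci
    ]
```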

Impact of Read Backup feature#

As mentioned the Read Backup feature changes the protocol slightly in that the response to the API is done after the complete phase instead of after the commit phase, thus making the protocol into a true three-phase commit protocol. Only when reading primary replicas can we use the optimisation to respond already after the second phase of the commit protocol.


The RonDB transaction protocol is a bit involved since we have many different requirements on it. The requirements are:

  1. Avoid deadlocks using primary copy locking

  2. The protocol must be non-blocking

  3. It must be able to handle multiple node failures in all protocols

  4. It must be able to handle multiple node failures in node failure handling

  5. Minimise overhead of transaction protocol by decreasing amount of messages

  6. Avoid writing to disk as part of commit phase to achieve predictable latency

  7. Restore a consistent database after a cluster crash

  8. Handle node restarts

It is important to understand here that the transaction protocol and the global checkpoint protocol are both parts of the implementation of transactions.

What we have done is that we first use a special two-phase commit protocol to ensure that we achieve Network Durability (committed in memory of several computers before sending commit message to the application).

Next we perform a group commit of a set of transactions and make those transactions durable.

Interestingly although I invented the ideas, it is really useful to describe those protocols to fully understand how the protocols are interconnected. The ideas evolved over a period of 25 years, so even for me it is good to go back and revisit the basic requirements to understand why we have the set of protocols we have.

Most DBMSs integrate everything into one commit protocol and possibly do some group commit optimisations. For RonDB this wasn't an option since the telecom database requirements meant that we could not wait for disk during the commit processing. To provide the durability without using disk we made use of the Network Durability concept instead that commits in memory of several computers.

At the same time we still needed to recover something also after a complete cluster crash. We needed a protocol to make the transactions durable also on disk. Also this disk durable state needed to be consistent and thus the global checkpoint protocol was invented.

Routing of reads and writes#

An important part of a distributed DBMS is how to route reads and writes within the various nodes in the distributed DBMS.

In RonDB we distribute data using a hash algorithm using a hash map. Each partition of each table is fully replicated within one node group of the cluster. Each partition is also located within one ldm thread within the nodes in this node group. The location of data is decided once we have created a table.
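The two-step mapping from key to partition can be sketched like this. The hash function (MD5 here), the number of hash map buckets, and the round-robin assignment of buckets to partitions are all assumptions made for illustration; the text only states that a hash algorithm and a hash map are used.

```python
import hashlib


def build_hash_map(num_buckets: int, num_partitions: int) -> list[int]:
    """Assign hash map buckets to partitions round robin (assumed layout)."""
    return [bucket % num_partitions for bucket in range(num_buckets)]


def partition_for_key(key: bytes, hash_map: list[int]) -> int:
    """Hash the key, pick a bucket in the hash map, and read off the partition."""
    digest = hashlib.md5(key).digest()
    hash_value = int.from_bytes(digest[:4], "little")
    return hash_map[hash_value % len(hash_map)]


if __name__ == "__main__":
    hash_map = build_hash_map(num_buckets=240, num_partitions=8)
    print(partition_for_key(b"user:42", hash_map))
```

The point of the extra indirection is that buckets, rather than individual rows, can be reassigned when the number of partitions changes.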

However reads can be routed differently if we are using the Read Backup feature or if we are using the fully replicated feature.

The transaction coordinator can be placed in any node in the cluster. Any MySQL Server can be chosen to execute a query in RonDB.

Choosing the MySQL Server#

With RonDB every MySQL Server is capable of handling each transaction. There is no specific need to choose a special MySQL Server for read and write transactions. We have special MySQL Servers for replicating between clusters and it is possible to have special MySQL Servers for handling meta data changes (although not required). But there is no need to use a special MySQL Server for normal read and write transactions.

The natural choice here is some sort of normal round robin scheme. There are many methods to implement the handling of this round robin scheme. One method is to use methods in the MySQL APIs. For example the JDBC API towards MySQL supports listing many different MySQL Servers and performing a round robin on them.

The second option is to use the MySQL Router.

One more option is to use a standard load balancer in front of the MySQL Server. Thus the set of MySQL Servers used have a single IP address and a single port to access. In e.g. the Oracle Infrastructure Cloud this is a normal service that you can connect to your infrastructure. It is possible to set this load balancer up such that it is highly available.

If you are developing an application that uses the NDB API, it is necessary to have a similar mechanism on top of your application nodes.

From a MySQL client we have no way of guessing the best MySQL Server to use. This requires knowledge only available through the NDB API.

The principle to use to get best scaling is to spread the load among the MySQL Servers in the cluster.

Choosing the node of the transaction coordinator#

It is possible to choose any node as transaction coordinator. But there is great value in choosing a transaction coordinator that is close to the data of the transaction. The principle of choosing transaction coordinator based on locality means that we are minimising the latency of the transaction and avoiding as much as possible oversubscribed network links that can cause bottlenecks.

Choosing at NDB API level#

Using the NDB API it is possible to provide a hint about where to place the transaction coordinator. The hint we provide is the table object and the primary key to use for access. Provided the primary key and the table object we can calculate the partition identity and through this and the table object we can deduce the nodes where the replicas are stored. This is not exact knowledge since e.g. an ALTER TABLE REORGANIZE can change this on the fly without immediately informing the NDB API. It is only a hint; the impact of an erroneous hint is that the transaction will take slightly longer to execute.

A key hint provided to start the transaction#

We will select a node from one of the data nodes that have a replica. The selection depends on whether we are using the read backup feature and whether we are using the fully replicated feature.

If the table has the read backup feature we can select any of the data nodes with a replica. We will select the one that is considered closest. This uses a configuration parameter called Group on each transporter (this parameter should normally not be set by the user). By default on a TCP transporter this is set to 55. When we start an API and the transporter is a TCP transporter we will check if the TCP transporter can use a localhost connection. If it can, it drops the Group down to 54. We can change the Group all the way down to 5 if we have set the node to be our neighbour. This is done by setting the configuration parameter for the MySQL Server --ndb-data-node-neighbour to the node id of our closest neighbour.

All of this assumes that we have set --ndb-optimized-node-selection to 3 which is the default value. This parameter is there to be able to get backwards compatible behaviour with older versions. This parameter should normally not be changed.

In short we will select the node among the replicas and we will prioritize first our data node neighbour, next any node that can connect using localhost, and finally any other node.
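The priority order can be sketched by giving each candidate node its Group value and picking the lowest. The Group values 55, 54 and 5 are the ones named above; the function names and the tie-break (first candidate wins) are our assumptions and not necessarily how the NDB API breaks ties.

```python
from typing import Optional

DEFAULT_GROUP = 55    # default for a TCP transporter
LOCALHOST_GROUP = 54  # node reachable over a localhost connection
NEIGHBOUR_GROUP = 5   # node configured via --ndb-data-node-neighbour


def group_value(node_id: int, localhost_nodes: frozenset, neighbour_id: Optional[int]) -> int:
    """Proximity of a data node as seen from this API node (lower is closer)."""
    if node_id == neighbour_id:
        return NEIGHBOUR_GROUP
    if node_id in localhost_nodes:
        return LOCALHOST_GROUP
    return DEFAULT_GROUP


def select_tc_node(
    candidates: list[int],
    localhost_nodes: frozenset = frozenset(),
    neighbour_id: Optional[int] = None,
) -> int:
    """Pick the closest candidate; on a tie the first candidate wins (assumed)."""
    return min(candidates, key=lambda n: group_value(n, localhost_nodes, neighbour_id))
```

For a key hint the candidates are the nodes holding a replica of the row; without a key, or for a fully replicated table, the candidate list simply grows to more (normally all) data nodes.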

If the table is fully replicated we will perform the same algorithm, but this time using all nodes that have a replica (normally all data nodes in the cluster).

No key was provided#

When no key is provided we have no hints, so we will select a node based on the closeness of the node as described above. This time the nodes to choose among are the full set of data nodes in the cluster.

Choosing at the MySQL Server level#

From a MySQL Server we have integrated the hint mechanism of the NDB API in the following manner. When starting a transaction the first query will decide the key to provide in the hint. If the primary key of a table is given in the first query we will use this as the key in the hint mechanism. For range scans, full table scans and pushdown joins we will not have any key provided.

Choosing the tc thread#

After the node has been chosen, the tc thread is always chosen using a round robin scheme.

Choosing the data node for reading#

For tables without the read backup feature we always use the primary replica to read with the exception for some BLOB reads as explained in the BLOB chapter.

For tables with the read backup feature we first attempt to read from a replica residing in the same data node as the transaction coordinator is placed in. Otherwise we always read the primary replica.
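The read routing rule fits in a few lines. This sketch uses our own function name, ignores the BLOB exception mentioned above, and assumes the replica list is ordered with the primary replica first.

```python
def choose_read_node(replicas: list[int], tc_node: int, read_backup: bool) -> int:
    """Return the data node to read a row from.

    `replicas` holds the node ids with a replica of the row, primary first.
    With read backup we prefer a replica on the transaction coordinator's
    own node; in every other case we read the primary replica.
    """
    if read_backup and tc_node in replicas:
        return tc_node
    return replicas[0]
```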

Choosing the ldm thread#

The ldm thread is decided by the row used. We will use the same ldm in all nodes in the node group for a certain partition. Thus there is nothing to choose here.

Choosing the data nodes for writing#

This is completely determined by the row to write. We will always write all available replicas and these replicas have a strict order where one replica is primary replica and the other are backup replicas that are organised in a certain order. Thus there is nothing to choose here.

Concurrency Control#

For an application developer it is important to understand the concurrency control model used by a DBMS. There are two approaches to concurrency control: an optimistic approach and a pessimistic approach. The optimistic approach is used when a low number of conflicts between changes is expected. The optimistic approach goes ahead and does the updates without checking for other updates. At commit time the transactions are checked for conflicts; if any two transactions are in conflict one of them has to be aborted.

The pessimistic approach tries to ensure, already before commit, that we don't cause any concurrent actions on the same object to occur. Most commonly this employs a method called strict two-phase locking. This means that there is first a lock phase where the application grabs its locks. After getting all locks the transaction is committed (or aborted), whereafter all locks are released. After that, no more rows may be locked in this particular transaction.

The rule of strict two-phase locking is that no locks are taken after the first row has been unlocked.
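The rule can be made concrete with a toy transaction object that refuses new locks once it has started releasing them. The class and exception names are ours; this sketches the rule only and is not a lock manager.

```python
class TwoPhaseLockingError(Exception):
    """Raised when a lock is requested after the transaction released its locks."""


class ToyTransaction:
    def __init__(self) -> None:
        self.held: set[str] = set()
        self.finished = False

    def lock(self, row: str) -> None:
        # Lock phase: allowed only while no lock has been released yet.
        if self.finished:
            raise TwoPhaseLockingError(f"cannot lock {row!r} after unlocking")
        self.held.add(row)

    def finish(self) -> set[str]:
        # Commit or abort: release every lock at once and end the lock phase.
        self.finished = True
        released = set(self.held)
        self.held.clear()
        return released
```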

The pessimistic approach has a much higher chance of handling a highly concurrent system well.

There is a great variety of different methods for both optimistic and pessimistic algorithms. Most of them are based on either locking or some timestamp method. It is one of the most well researched topics in the area of DBMSs.

So what to use? We opted for the pessimistic approach, for a number of reasons. I read many, many research articles on the topic of concurrency control and there are tons of research papers in this area. The concurrency control mechanism used in most DBMSs is strict two-phase locking.

The optimistic approach has a serious impact on application development. Any DBMS using the optimistic approach has to handle many more aborted transactions. In the pessimistic approach the only aborted transactions that come about from concurrency control are due to deadlocks. These are much less frequent than conflicts. A conflict using the pessimistic approach causes a delay, but not an aborted transaction. Applications using the optimistic approach have to be written with the understanding that aborts are common. This makes it even more difficult to port an application to a new DBMS than would otherwise be the case (it is difficult enough as it is).

In addition the optimistic approach has the disadvantage that the CPU cost per transaction increases as load increases and the chance of conflicts increases.

Using the optimistic approach would have had an impact on all recovery algorithms. It is a lot easier to reason about recovery algorithms when using locks compared to when using the optimistic approach. Given that our focus is online algorithms for all metadata changes, it made a lot of sense to try to use simpler algorithms. As an example the ability to lock a row for a short time is used in the node restart algorithm when synchronising the live node with the starting node. This short lock makes it very easy to reason about the node restart algorithm.

For these reasons we opted to use the standard strict two-phase locking approach that acquires locks in the prepare phase and releases the locks in the commit phase and continues to release the locks in the complete phase.

Row locking#

The approach in RonDB is to use row locking. We always access records using a hash index when any type of lock is needed. We can scan using the hash index, but we can also scan in row order and in disk data order. In the two latter cases we have to ask the hash index for a lock on the row for each row we scan when a lock is needed.

In the hash index we implemented our row locks. If a scan comes from an ordered index and needs to lock the row it will use the hash index to get the lock. Most DBMSs use a hash index to implement the lock data structure; in our case this hash index is also used as an access data structure, not just to handle locks.

RonDB has no support for range locks. Range locks are required to fully support serialisability. Thus we support serialisability on everything except range queries. Our analysis of the applications that RonDB focused on suggested that serialisable mode for range queries was of limited use in the analysed applications.

Supporting range locks in a distributed database imposes serious scalability problems. Locking a range using hash partitioning requires locking the range in all nodes and the alternative to use range partitioning only supports range locks on the columns in the partition key. Range partitioning is much harder to make scalable in a distributed system. This is a major reason why most classic SQL DBMSs use a shared disk implementation to scale the DBMS. Shared nothing has a number of algorithms that are harder to implement such as range locking, deadlock handling and so forth.

The choice between shared nothing and shared disk is a trade off where you have to select what matters most to you. When implementing RonDB what mattered most was the time to handle a node failure. With shared nothing we were able to get this down to a small number of seconds and even below a second in a real-time environment. For shared disk it is hard to get this number down this far given that all nodes do not have access to the latest updates; the latest updates can only be retrieved from the logs. Thus shared disk can never be as fast to recover as shared nothing.

Another benefit of shared nothing is that it doesn't require a shared file system. Interestingly all shared disk implementations are implemented on top of a file system that is more or less a shared nothing database. This is why it makes sense to build a file system on top of RonDB.

When developing RonDB we decided to go for shared nothing and thus decided that we will not support range locks in all situations.

If the application has special needs to lock certain parts of the database, it is possible to implement this by ensuring that the access to those parts always goes through a specific record. Thus special records can be used in the same fashion as a read-write mutex is used in lower-level designs.

In essence it is possible to handle all locking needs using RonDB, but it might require a bit more work on the application level to get the most fancy lock variants.

A good example of this is HopsFS, which had a special problem: it needed to support the mv command, which moves a whole hierarchy of directories. This was implemented by performing the move step by step and using application level locks while doing so.

Every DBMS is implemented for a sweet spot and the engineers have to decide which requirement is the most important for them. The aim of RonDB is to meet the highest availability requirements and at the same time extreme performance requirements for key lookups. We are aiming long-term to provide a very good parallelisation of complex queries since this is such a good fit in the RonDB architecture.

For the same reasons no sharded NoSQL system supports range locks. Since RonDB is a distributed system that automatically supports up to 24 shards using 2 replicas, it is a natural choice to not support range locks in RonDB.

In conclusion all rows in RonDB can be locked either exclusively (when updating or when using exclusive lock mode for reads) or locked for reads. It is possible to read the latest committed value without using locks.

Consistency Models#

DBMSs mainly use two different consistency models. The first is the one implemented in RonDB, which performs reads using read committed mode. The second popular consistency model is repeatable read mode. We will explain both variants; they both provide a consistency level that uses transactions to update the data.

Most NoSQL systems employ something called eventual consistency. Eventual consistency means that updates are not transactional; it is more about replicating changes. In NoSQL systems all updates are first applied on a master node, after which various methods are used to replicate those changes to the other nodes in the system.

Eventual consistency cannot be combined with distributed transactions. One can keep the locks in the master node for so long that the changes are also applied in the nodes replicated to. But the term eventual consistency only promises that each node replicated to will eventually be consistent with the master node. Thus the only node where you can see the latest changes is the master node.

InnoDB Cluster, Galera Cluster and MongoDB are examples of systems that use eventual consistency. In InnoDB Cluster the other nodes have received the logs of the changes when the transaction is committed, but the other nodes have not yet updated the actual data at commit time. Reads that are sent to other nodes than the master node will not see the latest writes.

Thus a major difference with systems employing eventual consistency is that the application has to know in detail what the concurrency implications of the eventual consistency are. This makes it a lot harder to implement a scalable application using NoSQL systems compared to using RonDB.

We will now try to explain how read committed differs from repeatable read and why read committed mode was a natural choice for RonDB.

Repeatable read means that the query result is what it would be if the database was frozen at the start time of the query. This means that the query can be repeated again and again and the result remains the same. During the query execution there are no locks on the database; instead the DBMS has to save the old row versions until the query is completed.

This has two consequences. The first consequence is that the database size will grow, since old rows have to be kept until all queries that were running when the row change happened have completed. The combination of long-running queries and high update rates will cause a large increase in storage needs. With RonDB we have a focus on in-memory data, thus we would use too much RAM for old rows.

Additionally, using repeatable read requires adding a timestamp to each row, which uses even more memory. It also adds CPU overhead to handle all those extra old rows, although this is not a major issue.

The second consequence is that repeatable read will not deliver results that are based on the latest changes. Thus repeatable read is completely focused on delivering consistency at the cost of recentness. The read committed mode focuses on delivering the most recent changes at the cost of complete consistency of the result.

Delivering a result that is both completely consistent and up-to-date can only be achieved with massive locks in the database; thus this is not an alternative for any scalable application.

So which mode is most appropriate is dependent on the application. But for the type of applications that RonDB was developed for, the read committed mode is the preferred one.

Read Committed mode#

The model used in RonDB is called read committed mode. Thus we will read the latest committed row. There could be one or two row versions at any point in time (except for transactions that update the same row many times). There is one row version when the row isn't currently being updated and there are two row versions when a transaction is currently changing the row.

A read committed read will read the old committed row version in cases where the row is currently being updated. These reads are lock-free.

When performing a read of the database in read committed mode we will always read the latest committed row, so even when running a long-running query the result of the query will be impacted by changes in the database.
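The difference between the two modes can be illustrated with a toy model. The sketch below is not how RonDB stores rows; `VersionedTable` and its methods are hypothetical and keep every committed version only so that both read modes can be compared side by side. A long-running query reads row `a`, a transfer from `a` to `b` commits, and the query then reads row `b`.

```python
class VersionedTable:
    """Toy table keeping every committed version of each row."""

    def __init__(self):
        self._versions = {}  # key -> list of (commit_time, value), oldest first
        self.clock = 0

    def commit(self, key, value):
        self.clock += 1
        self._versions.setdefault(key, []).append((self.clock, value))

    def read_committed(self, key):
        """Latest committed value at the moment the row is read."""
        return self._versions[key][-1][1]

    def read_snapshot(self, key, start_time):
        """Value as of start_time (repeatable read); needs the old versions kept."""
        visible = [v for t, v in self._versions[key] if t <= start_time]
        return visible[-1]


table = VersionedTable()
table.commit("a", 100)
table.commit("b", 100)

query_start = table.clock          # a long-running query starts here
rc_a = table.read_committed("a")   # the query reads row a first
rr_a = table.read_snapshot("a", query_start)

table.commit("a", 50)              # a transfer of 50 from a to b commits
table.commit("b", 150)             # (modelled as two commits for brevity)

rc_b = table.read_committed("b")   # the query now reaches row b
rr_b = table.read_snapshot("b", query_start)

read_committed_sum = rc_a + rc_b   # 100 + 150: old a mixed with new b
repeatable_read_sum = rr_a + rr_b  # 100 + 100: consistent, but not the latest state
```

The read committed query sees the most recent value of `b` but a sum that never existed at any single point in time, while the snapshot query returns a consistent sum at the price of keeping the old versions around.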

Locking and Unique indexes#

A unique index is simply a special table that has the unique key columns as primary key and the primary key of the main table as the remaining columns. Thus we can with one key lookup translate the unique key to the primary key.

When we read the unique key we use a shared lock to ensure that no one is allowed to delete or insert the row we are looking for while we are performing this translation. As soon as we have found or completed the read of the main table we can release the lock on the unique index row. The NDB storage engine has special code to ensure that we release those unique key locks immediately after returning from the read/write of the main table.

We need to use this shared lock also when the read of the main table is using the read committed mode.

Reading through the unique index and updating the main table can happen in parallel. Given that those come from different directions it is possible for them to cause a deadlock. These are normally rare, but frequent inserts and deletes of rows in a table using a unique key and at the same time reading this table using the unique key can cause deadlocks to occur. Deadlocks cause no harm other than timing out, but constitute a problem to consider to achieve predictable latency of an application.
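The deadlock risk can be pictured as a cycle in a wait-for graph. The sketch below is a toy model, and the lock order it uses is an assumption made for illustration: the reader locks the unique index row first and then the main row, while the writer locks the main row first and then the unique index row. The function `deadlocked` and all names in it are hypothetical.

```python
def deadlocked(holds, wants):
    """Detect a cycle in the wait-for graph.

    holds: lock name -> transaction currently owning it
    wants: transaction -> lock name it is waiting for
    """
    for start in wants:
        txn, visited = start, set()
        while txn is not None and txn not in visited:
            visited.add(txn)
            lock = wants.get(txn)
            txn = holds.get(lock) if lock is not None else None
        if txn is not None:  # we reached an already visited transaction: a cycle
            return True
    return False


# Reader came in via the unique key, writer came in via the primary key.
holds = {"unique_index_row": "reader", "main_row": "writer"}
wants = {"reader": "main_row", "writer": "unique_index_row"}
cycle = deadlocked(holds, wants)

# Without reads through the unique key the writer never waits for a reader.
no_cycle = deadlocked({"main_row": "writer"}, {"writer": "unique_index_row"})
```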

The best use of unique keys is to use them primarily for constraint checks. In this case they are only used during inserts and updates to ensure that only one row with a given unique key exists. In this case we cannot have deadlocks due to reads using the unique key since we never use such reads.
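The unique index table described in this section can be modelled with two dictionaries. This is only a sketch of the data layout, with hypothetical table and column names (`users`, `email`), and it leaves out the locking entirely.

```python
# Main table: primary key (user_id) -> remaining columns.
users = {
    1: {"email": "ann@example.com", "name": "Ann"},
    2: {"email": "bob@example.com", "name": "Bob"},
}

# Unique index on email, modelled as its own table:
# unique key (email) -> primary key of the main table (user_id).
users_email_idx = {row["email"]: pk for pk, row in users.items()}


def read_by_unique_key(email):
    """Two key lookups: unique key -> primary key, then primary key -> row."""
    pk = users_email_idx.get(email)
    if pk is None:
        return None
    return users[pk]


def insert_user(pk, email, name):
    """Insert that uses the index table purely as a constraint check."""
    if pk in users or email in users_email_idx:
        raise ValueError("duplicate key")
    users_email_idx[email] = pk
    users[pk] = {"email": email, "name": name}


found = read_by_unique_key("bob@example.com")
missing = read_by_unique_key("eve@example.com")
```

An application that only relies on `insert_user` style constraint checks, and reads through the primary key, never performs the index-first read that can collide with a concurrent update.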

Locking and BLOB tables#

Locking for BLOB tables is a bit special since the BLOB consists of multiple rows in RonDB. The first row accessed is always the main row. Any change to a BLOB requires holding an exclusive lock on both the main row (even if no part of it is updated) and on the BLOB parts that are updated.

Reading of a BLOB is a bit special, especially when using the READ COMMITTED mode. Normally this mode requires no locks for reading since we always ensure that the read is reading the latest committed values. For BLOBs this method doesn't work since reading rows with BLOBs requires reading multiple rows.

The manner used here is that we take a shared lock on the main row of the BLOB table. Next we read the BLOB parts one by one. By holding the shared lock on the main row we ensure that no one is allowed to change any BLOB parts while we are reading the BLOB parts.

In addition the reads of the BLOB parts use a special lock mode. This mode takes a lock while reading the row, but releases the lock before sending the result back to the NDB API. This ensures that the read is consistent; if we had read the BLOB parts using the latest committed value we could have read an old version of the BLOB through a race condition.

The short-lived lock is almost certain to be successfully acquired immediately, but in rare situations it might have to wait for some commit messages from an old transaction to arrive before it can start the read.

The effect of this is that BLOB tables have less concurrency than ordinary tables, but they will still be consistently read and updated.

Another positive effect of the above is that reads of BLOB tables are always done using any replica independent of the table mode for READ_BACKUP.

Parallel Query#

Many SQL queries are performed with some amount of parallelism in RonDB. It happens when the SQL queries use scans that are not limited to scanning a single partition.

It can happen as part of table scans, it can also happen as part of more complex scans that are linked together. These linked scans are a variant of a parallel join algorithm. The scanning of data in the data nodes can be parallelised to the same level as the number of ldm threads in the cluster. The result rows are sent to the single MySQL Server that has a single user thread that is executing the query, but much of the filtering of the rows can be parallelised.

This parallel join algorithm is a bit more advanced compared to a parallel nested loop join. The best descriptive name for it is to call it a parallel linked scan algorithm.

Full table scan in RonDB#

Scans come in a number of flavors, the first one is the full table scan. It will read each and every row in the table and apply a search filter on each row to see if it should be returned to the application. These scans are automatically parallelised such that they execute on a set of partitions in parallel. For example if the table contains 1 million rows and is divided into 8 partitions, eight concurrent scans will be started that each will scan around 125k rows. Conditions from the SQL query can be pushed to this scan such that only a subset of the rows is returned to the MySQL Server. When a row returns to the MySQL Server it has to be evaluated by the single thread that the SQL query is executed within, but during the table scan we can have up to eight CPUs running in parallel on filtering out the rows to be returned. This happens automatically, no special action is needed to make this happen, it is the default behaviour.

We will go through in more detail later which conditions can be pushed down as part of a scan (the same condition pushdown can be applied on a primary key or unique key request to flag if the read row is to be returned to the application).
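The structure of such a scan can be sketched as follows. This is a toy model rather than RonDB code: the table layout, `scan_partition` and `full_table_scan` are hypothetical, the modulo operation stands in for the hash of the partition key, and the thread pool only mirrors the shape of the execution since Python threads do not provide real CPU parallelism for this kind of work.

```python
from concurrent.futures import ThreadPoolExecutor

NUM_PARTITIONS = 8
NUM_ROWS = 1_000_000

# Toy table: rows are (id, amount); id modulo NUM_PARTITIONS stands in for
# the hash of the partition key.
partitions = [[] for _ in range(NUM_PARTITIONS)]
for row_id in range(NUM_ROWS):
    partitions[row_id % NUM_PARTITIONS].append((row_id, row_id % 1000))


def scan_partition(rows, condition):
    """Scan one partition and apply the pushed-down condition locally."""
    return [row for row in rows if condition(row)]


def full_table_scan(condition):
    """Start one scan per partition and merge what each scan returns."""
    with ThreadPoolExecutor(max_workers=NUM_PARTITIONS) as pool:
        parts = pool.map(lambda rows: scan_partition(rows, condition), partitions)
        return [row for part in parts for row in part]


rows_per_partition = [len(p) for p in partitions]    # 125,000 rows each
result = full_table_scan(lambda row: row[1] == 999)  # only matching rows come back
```

Only the rows that pass the condition are handed to the single consumer, which corresponds to the MySQL Server thread in the description above.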

Range scan in RonDB#

Next the same thing applies to range scans. Ordered indexes in RonDB are implemented per partition. Thus in order to perform a range scan the same scan algorithm is used as for full table scan except that we scan in index order between two boundaries. RonDB can also start multi-range scans using one scan operation. These scans can also have conditions pushed down and the scan can be parallelised.

Partition pruned range scans#

We have a special case which is very important, this is the partition pruned range scan. In this case the range scan is not performed on all partitions, it is rather performed only on one partition. There is an easy condition for when this pruned partition scan happens. As part of the condition the partition key must be fully set. For example in TPC-C there are lots of tables that contain a warehouse id. One manner to set up the tables in TPC-C is that all tables use the warehouse id as the partition key. Any query that has a condition that contains warehouse_id = variable/constant will now be able to perform a partition pruned range scan. By retrieving the warehouse_id from the query we can figure out which partition to scan. We know that the data cannot be in any other partition and we can thus prune away all the other partitions.
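The pruning decision itself is simple and can be sketched like this. The hash function is an assumption (CRC32 is used purely as a stand-in, not the hash RonDB uses), and `partition_of` and `partitions_to_scan` are hypothetical helper names.

```python
import zlib

NUM_PARTITIONS = 8


def partition_of(warehouse_id):
    """Map a partition key value to a partition (CRC32 is only a stand-in hash)."""
    return zlib.crc32(str(warehouse_id).encode()) % NUM_PARTITIONS


def partitions_to_scan(conditions):
    """Return the partitions a range scan must visit for equality conditions.

    `conditions` maps column name -> required value. If the partition key
    (warehouse_id here) is fully set, every partition but one is pruned.
    """
    if "warehouse_id" in conditions:
        return [partition_of(conditions["warehouse_id"])]
    return list(range(NUM_PARTITIONS))


pruned = partitions_to_scan({"warehouse_id": 17, "district_id": 3})
not_pruned = partitions_to_scan({"district_id": 3})
```

With a multi-column partition key every column of the key would have to appear in the equality conditions before the scan can be pruned.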

This technique is what almost all Big Data applications are using when they are sharding. With RonDB it is possible to set up the cluster tables to be easily sharded. If all or at least most of the tables in a database or application use a common partition key this becomes a sharding key of the application inside RonDB.

As an example, in HopsFS, where a file system metadata layer was implemented, a clever partition key was selected that ensured that almost all scan queries, e.g. the scan used to perform ls (list the files in a directory in Unix), were directed to only one partition. Thus no extra overhead was incurred in starting up many parallel scans, since each scan only accessed one partition.

Pushdown joins#

In a number of situations we want to perform queries that span the entire data set. In this case we are most likely analysing the data and we are scanning major portions of the data to find some specific property of the data. In this case the fact that scans are automatically parallelised is good and decreases the latency significantly in processing complex queries.

In the descriptions below we use the term scan, but often a scan is a key lookup. In the presentation below this would simply be a special scan that can return 0 or 1 rows.

Now in RonDB we have made another step forward. We implement a technique where these scans can be linked. Say that we start with a scan of table t1 and that this scan provides 10,000 rows. Now we want to join the table t1 with the table t2. From each of those 10,000 rows we get a key that is used in the condition for table t2 and we perform a new index scan for each of those 10,000 rows. This index scan can also be a key lookup, it can also be a full table scan and the scan can also be partition pruned such that it only scans one partition. Let's say that each of those 10,000 scans now gives us 3 more rows back, thus we now have 30,000 rows from t2. Now we can continue and use data from table t2 to join it with a table t3 in exactly the same manner.
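The row counts in this walk-through can be reproduced with a small sequential model of linked scans. The tables, column names and the `linked_scan` generator are hypothetical, and the sketch ignores the parallelism and batching that the data nodes add; it only shows how each row from one scan feeds the key of the next.

```python
from collections import defaultdict

# Toy tables. t1 has 10,000 rows; each t1 row matches 3 rows in t2,
# and each t2 row matches exactly one row in t3.
t1 = [{"id": i, "t2_key": i} for i in range(10_000)]
t2 = [
    {"key": i, "seq": s, "t3_id": (i + s) % 100}
    for i in range(10_000)
    for s in range(3)
]
t3 = {i: {"id": i, "label": f"label-{i}"} for i in range(100)}

# Index on t2.key so each linked scan is an index scan rather than a full scan.
t2_index = defaultdict(list)
for row in t2:
    t2_index[row["key"]].append(row)


def linked_scan():
    """Scan t1, feed each key into a scan of t2, then into a lookup on t3."""
    for r1 in t1:                          # scan of t1
        for r2 in t2_index[r1["t2_key"]]:  # index scan on t2 per t1 row
            r3 = t3.get(r2["t3_id"])       # key lookup on t3 (0 or 1 rows)
            if r3 is not None:
                yield r1["id"], r2["seq"], r3["label"]


joined = list(linked_scan())  # 10,000 t1 rows x 3 t2 rows each
```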

Using this technique we can execute arbitrarily complex queries. In our testing of this we have heavily used a query that we got from an early user of RonDB. In the early days of RonDB this query took something like 6 seconds to execute (in those days MySQL had almost no batching techniques, the query was more or less executed one row at a time). Now in RonDB using the normal technique of single table scans performed in sequence and always returning to the MySQL Server the query takes 1.2 seconds to execute in a very small cluster with one data node and one LDM instance. Using condition pushdown we are able to avoid much of the interaction with the MySQL Server and most of the filtering is done before sending it to the MySQL Server. Using these techniques the query can be executed in 0.2 seconds instead. With a few more ldm threads, e.g. 4 of them, the query is executed in 50 milliseconds. This query scans and analyses many tens of thousands of rows as part of the query execution, mostly in the data node.

One can perform more unusual variants of joins using this technique. For example, one can start by reading table t1 and, based on the data retrieved from table t1, immediately start fetching the data in table t2, table t3 and so forth.

Limitations of pushdown joins#

The conditions used in joining tables that are part of the pushed down query must use equality conditions. The columns used in those equality conditions must be of exactly the same data type. It is currently not possible to use pushdown joins if any of the columns retrieved by the query is a BLOB column. Subqueries can sometimes inhibit the pushdown of the join from happening. One can use SHOW WARNINGS after executing the query to discover any problems in handling the pushdown of the join. Likewise the EXPLAIN command can be used to discover exactly how the pushdown join is handled.

Pushdown of joins can only occur in read only queries that are not using any locking constructs such as LOCK IN SHARE MODE or other similar constructs. The join execution must be lock-free, so it cannot lock rows as part of the join execution, which would be required if it is part of an updating transaction where the result can be used to update rows in a scan takeover.

Details of pushdown joins#

A complex join can be divided into multiple parts that are pushed down to RonDB. As an example assume that the MySQL Server comes up with a plan that does a 5-way join, for each row in the inner join of this 5-way join we have a subquery that is in itself a 4-way join. Now the 5-way join can be handled with a scan on the first table and for each row in the first table we execute a pushed down 4-way join to the NDB API.

Thus even if there are specific parts that make it impossible to handle the entire query in the RonDB data nodes, we can still send parts of the query at a time down to the RonDB data nodes.

The MySQL Server does the query optimisation before the joins are pushed down to the RonDB data nodes. The query optimisation doesn't take the distribution aspects into account. However once RonDB decides to push a join down to RonDB data nodes it can decide using some heuristics to reorder the join order of the tables.

Aggregation, group by and order by processing is currently not handled by pushdown joins. Thus all the rows to be put into the group by processing must be returned to the MySQL Server as part of the join processing. This makes the MySQL Server user thread a bottleneck for complex queries using group by, at least if many rows are used to calculate the aggregates. The parallel filtering can still make those queries go significantly faster using pushdown join.

Pushdown of joins and fully replicated tables#

Fully replicated tables are a bit special, these tables are replicated in all data nodes. Thus a join where all tables are fully replicated can be handled in a completely local execution where the entire query execution is processed within one data node.

The same is true for a table using the read backup feature and executing in a cluster with only one node group (e.g. 2 nodes with 2 replicas).

This means that we can execute the entire query without performing any communication with other computers if the MySQL Server and data node are colocated.

A normal join execution starts up a number of parallel join execution parts that each take care of a subset of the query. This is handled by limiting each subquery to only scan a subset of the partitions of the first table. The result of the query in this case is a union of the subqueries and can thus easily be merged back again. E.g. the first query part could take the first partition of the first joined table and join this partition with the full tables of the remainder of the joined tables.

The join execution for each query part will be controlled from the node where the primary replica of the partitions used in the first table resides. This is independent of whether this first table is fully replicated or not. The remainder of the reads will always go to the same node if a replica is stored on the node, otherwise it will go to the nearest replica available.
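That the union of the query parts equals the full join can be checked with a small model. The tables and the `join_part` function are hypothetical, the modulo operation again stands in for hash partitioning, and the parts run one after the other here although in the cluster they would execute in parallel.

```python
NUM_PARTITIONS = 4

# First table, hash partitioned on id; second table held as a lookup by key.
orders = [{"id": i, "customer": i % 5} for i in range(20)]
customers = {c: f"customer-{c}" for c in range(5)}

order_partitions = [
    [o for o in orders if o["id"] % NUM_PARTITIONS == p]
    for p in range(NUM_PARTITIONS)
]


def join_part(first_table_rows):
    """One query part: a subset of the first table joined with the full second table."""
    return [(o["id"], customers[o["customer"]]) for o in first_table_rows]


# Each part could run on a different node; the final result is their union.
parts = [join_part(rows) for rows in order_partitions]
merged = sorted(row for part in parts for row in part)

# Reference: the same join executed as a single part over the whole first table.
reference = sorted(join_part(orders))
```

Because every row of the first table belongs to exactly one partition, no row is produced twice and the merge needs no duplicate elimination.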

Execution of pushdown joins will often spread over all nodes to make use of all nodes and ldm threads in the cluster.

RonDB can be limited by Gigabit Ethernet very easily and even with 10G Ethernet it is fairly easy to overload the network in a dual socket server. Thus it makes a lot of sense to try to ensure that queries are executed as much as possible in the same node as the query was issued from.

Condition pushdown#

Pushdown of conditions to the RonDB data node can happen in cases where we compare columns with constants, whether for equality, greater than and so forth, or ranges of the same. We can check whether a column is NULL or not, we can perform LIKE conditions and we can compare string values if they are of the same collation.

This can be used for full table scans, index scans and key lookups and it can be part of a pushdown join execution. It is a very powerful concept when it gets used.
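The effect of condition pushdown is that fewer rows leave the data node. The sketch below models the three kinds of conditions mentioned above in plain Python; `like`, `data_node_scan` and the sample rows are hypothetical, and the LIKE helper handles only the `%` and `_` wildcards without escape characters or collations.

```python
import re


def like(pattern):
    """Compile a SQL LIKE pattern (% and _ wildcards only) into a predicate."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    compiled = re.compile(regex, re.DOTALL)
    return lambda value: value is not None and compiled.fullmatch(value) is not None


rows = [
    {"id": 1, "city": "Stockholm", "balance": 120},
    {"id": 2, "city": "Stuttgart", "balance": 80},
    {"id": 3, "city": None, "balance": 300},
    {"id": 4, "city": "Oslo", "balance": 500},
]


def data_node_scan(rows, conditions=()):
    """Return only the rows that pass every pushed-down condition."""
    return [r for r in rows if all(cond(r) for cond in conditions)]


starts_with_st = like("St%")
pushed = [
    lambda r: r["balance"] > 100,       # comparison with a constant
    lambda r: r["city"] is not None,    # NULL check
    lambda r: starts_with_st(r["city"]),  # LIKE condition
]

shipped_without_pushdown = data_node_scan(rows)       # every row is sent
shipped_with_pushdown = data_node_scan(rows, pushed)  # only matching rows are sent
```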

Specialised MySQL Servers#

When configuring a cluster it is useful to have a set of specialised MySQL Servers. The first is a specialised MySQL Server for metadata operations. Most of the operations in RonDB are possible to perform as online operations, such as adding a column, adding nodes to the cluster, reorganising data partitions for a table, adding and dropping indexes and tables.

The MySQL Server code is not designed for truly online operations. This means that when performing an ALTER TABLE on a table, the table is read only in the MySQL Server performing the ALTER TABLE operation while the table can be both read and written in all other MySQL Servers and API nodes.

Thus it makes sense to use a specialised MySQL Server for metadata operations. For high availability it makes sense to have two of those in different machines.

The next is RonDB Replication. Normally the binlog records are generated in the MySQL Server from where the update operation was issued. This is not the case for RonDB. In RonDB the operations are executed in the data nodes and after that a change log is shipped to the MySQL Server that has defined replication to be set up.

In almost all cases it is desirable to have at least two MySQL Servers used for replication to make sure we can handle failover to another MySQL Replication server in cases of node failures.

When metadata operations are performed in a MySQL Server the binlog of those events is shipped in a special manner between the MySQL Servers. We support sharing the binlogging MySQL Server and the MySQL Server performing the metadata operation.

The recommended minimum configuration for RonDB is to have two data nodes on separate machines, two MySQL Servers on two different machines (could be the same as the data node machines) and two RonDB management servers, all on different machines. In addition a few API node slots should be configured to enable the various RonDB tools to also access the cluster concurrently with the MySQL Servers.

The MySQL Servers used for metadata operations and replication will use very little memory and CPU resources, so it is not a problem to have those colocated on the same machine with a MySQL Server used for normal query processing.

This type of configuration avoids many hiccups that could otherwise happen in combination with metadata changes.

For bigger clusters it is not necessary to add any more specialised MySQL Servers nor any more RonDB management servers, it is enough to add new servers with data nodes and MySQL Servers used for query services.