
Internals of RonDB#

In the next few chapters we will go through the internal architecture of RonDB, both of the data nodes and of the NDB API.

RonDB uses a message passing architecture implemented in C++ using classes. Most of those classes are fairly big, and we call them blocks. The messages are called signals and are addressed with a 32-bit address label. Blocks and signals are the most essential part of RonDB. We start by describing how blocks and signals are mapped in the data nodes. Next we describe them in the API nodes.

To understand the presentation of the most important signal flows, we next introduce the blocks in the data nodes.

We present the most important signal flows to provide an understanding of what happens in RonDB when a query is executed towards RonDB.

RonDB is a database, so it is important to present the most important data structures we use. This includes the data structure of our rows, our hash index, our ordered index and our data structures for row locks.

To implement this architecture we have implemented a virtual machine in steps. The first was optimised for machines with 2 CPUs and the current one is scalable to more than 1000 CPUs.

We will discuss some other important internal architecture components used in many parts of RonDB such as internal triggers, the transporter architecture used for communication and our principles for memory allocation.

Data Node Architecture#

The data nodes in RonDB are heavily inspired by the AXE architecture at Ericsson. The architecture is a message passing architecture where messages (called signals) are transported from one module (called block) to another module.

Messages have a destination address. This is a 32-bit number that, similar to an IP address, is used for routing the message to its destination. The message also carries a source address and data.

The address consists of a node id, a thread id and a block id. This makes it possible to scale to 65535 nodes with 1024 threads in each node and up to 64 blocks in each thread.

Block and Signal Architecture#


The idea with a block is that it has its own data and its own code. In AXE the blocks were implemented in PLEX, this programming language did not allow for any type of code sharing and data sharing. Our blocks are implemented as C++ classes and they can share some common base classes to handle various common data structures such as hash tables, memory pools and so forth.

For the most part though the blocks contain their own code and their own data. This has the advantage that errors are quite isolated from each other. Given that the interface to a block is intended to use messages all the time, a bug can only affect other blocks by sending erroneous data. This makes it easier to find bugs in RonDB.

Reading the code requires an understanding of how messages are routed. In the code a signal is passed using a block reference. There is very little in the code that indicates where this signal goes. It is important to document signal flows to make it easier to understand the code. At the same time there are many ways to create signal flows to assist new developers to understand the signal flows.

Block references#

A signal is sent using one of a number of methods, most of them named something like sendSignal. A signal uses a 32-bit block reference as its destination address. This address is divided into a number of parts. The lowest 16 bits are the node id. At the moment we can only use 8 of those bits, but the architecture is designed such that it can grow to clusters with 65535 nodes.

The 16 upper bits are a block address inside a node. They are interpreted a bit differently in data nodes and in API nodes.

In an API node the block address is used to get the Ndb object from an array; the block address is the index into this array. This array starts with index 32768. There can be at most 4711 Ndb objects in one cluster connection. Since Ndb objects are designed to be used in one thread, this is the limit on the number of parallel threads to be used per cluster connection. At the same time the practical limit is much smaller. There is very little reason to use thousands of threads per cluster connection.

There are three special API blocks. Block number 2047 is used for sending packed signals. Block number 4002 is used for sending signals to the cluster manager and 4003 is used for configuration change signals between management servers.

In a data node the lowest 9 bits of the block address are the block number; for example, DBLQH has the block number 245. We can have up to 512 blocks and currently we have 24 blocks, so there is lots of space in this part of the address space.

The upper 7 bits constitute a thread id of the block. Each thread can contain its own DBLQH instance, for example. Since we are not likely to use more than 64 blocks (we currently have a bit more than 20), RonDB uses a mathematical transformation of block numbers and thread ids such that we support only 64 blocks and instead support up to 1023 threads.
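The basic layout described above (before the transformation that trades block bits for thread bits) can be sketched as a pair of pack and unpack helpers. This is a minimal illustration, not the RonDB source: the names `BlockRef`, `make_block_ref` and `split_block_ref` are hypothetical, and only the bit positions come from the text.

```cpp
#include <cstdint>

// Hypothetical helper names. The layout follows the description above:
// bits 0-15 node id, bits 16-24 block number, bits 25-31 thread id.
struct BlockRef {
  std::uint32_t node_id;    // 16 bits
  std::uint32_t block_no;   // 9 bits
  std::uint32_t thread_id;  // 7 bits
};

// Packs the three parts into one 32-bit block reference.
inline std::uint32_t make_block_ref(const BlockRef& r) {
  return (r.node_id & 0xFFFFu) |
         ((r.block_no & 0x1FFu) << 16) |
         ((r.thread_id & 0x7Fu) << 25);
}

// Splits a 32-bit block reference back into its parts.
inline BlockRef split_block_ref(std::uint32_t ref) {
  return BlockRef{ref & 0xFFFFu, (ref >> 16) & 0x1FFu, (ref >> 25) & 0x7Fu};
}
```

Packing node 2, block number 245 and thread 3 and splitting the result again returns the same three values.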

When a signal is sent the software looks at the destination node. If it isn't our own node we pass the signal on to the code that sends distributed signals. If the destination node is our own node we check the thread id combined with the block number to get the destination thread number. Next the signal is passed into a signal queue of the destination thread.

Asynchronous signals#

Most signals sent are asynchronous signals. These signals are not executed immediately; they are put into a signal queue of the thread. This signal queue is called the job buffer.

Asynchronous signals can be sent on two priority levels: A is high priority and B is normal priority. The priority of a signal affects the scheduling of the signal execution in a data node. It doesn't affect how the signal is put into buffers for sending to other nodes and it doesn't affect scheduling on an API node.
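A minimal sketch of two priority levels in a job buffer follows. It assumes strict priority (every waiting A signal runs before any B signal), which is a simplification chosen for illustration; the text above only states that priority affects scheduling. The class `JobBuffers` and its methods are hypothetical names.

```cpp
#include <deque>
#include <string>
#include <utility>

enum class Prio { A, B };  // A = high priority, B = normal priority

// Hypothetical two-level job buffer; signals are represented by their names.
class JobBuffers {
 public:
  void send(Prio prio, std::string signal_name) {
    (prio == Prio::A ? prio_a_ : prio_b_).push_back(std::move(signal_name));
  }

  // Fetches the next signal to execute. Assumption: all waiting A signals are
  // served before the oldest B signal. Returns false when nothing is queued.
  bool next(std::string& out) {
    std::deque<std::string>& q = !prio_a_.empty() ? prio_a_ : prio_b_;
    if (q.empty()) return false;
    out = std::move(q.front());
    q.pop_front();
    return true;
  }

 private:
  std::deque<std::string> prio_a_;
  std::deque<std::string> prio_b_;
};
```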

Synchronous signals#

We can also send synchronous signals. In practice those are function calls that use the same protocols as signals. They are always executed in the same thread, but might in a few situations execute on blocks that belong to another thread (mostly true when looking up the distribution information from a transaction coordinator).

In some cases we can also use direct function calls on other blocks in the same thread. This is mainly used for the most optimised code paths and mostly in the ldm thread.

Scheduling rules#

A thread executing signals on blocks uses a scheduler to handle execution and communication with other threads. When a signal is executed in a block it always calls a method that looks like this:

BlockName::executeSIGNAL_NAME(Signal *signal)

The scheduler is not using a time sharing model here. It is the responsibility of the block to not execute for too long. There are coding rules that a signal should not execute for more than at most 10 microseconds and almost all signals should be executed within 1-2 microseconds.

Thus even if hundreds of signals are waiting in the job buffer, they will be executed within a few hundred microseconds.

This scheduling model has advantages when combining primary key queries with long running scan operations. Scan operations handle a few rows at a time when being scheduled; after that they are put last in the queue again. This means that they will not block the execution of key lookups for very long. This architecture is an important part of our solution for predictable latency in RonDB.

To handle background activities that require a certain percentage of the CPU resources we have implemented methods to check job buffer levels and make use of high priority signals in cases where it is necessary to run for more extended times. This is currently used by our local checkpoint algorithms to guarantee a certain progress even when the load from traffic queries is very high.
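The interleaving of scans and key lookups can be illustrated with a toy model of the job buffer. This is a sketch under stated assumptions, not RonDB code: `Job`, `run_jobs` and the `rows_per_signal` batch size are hypothetical, and a key lookup is modelled as a job with a single row.

```cpp
#include <algorithm>
#include <deque>
#include <string>
#include <vector>

struct Job {
  std::string name;
  int rows_left;  // 1 for a key lookup, many for a scan
};

// Executes jobs in FIFO order without time sharing. Each signal execution
// handles at most rows_per_signal rows; an unfinished job is put last in the
// queue again. Returns the order in which jobs got to execute.
std::vector<std::string> run_jobs(std::deque<Job> queue, int rows_per_signal) {
  std::vector<std::string> trace;
  while (!queue.empty()) {
    Job job = queue.front();
    queue.pop_front();
    job.rows_left -= std::min(job.rows_left, rows_per_signal);
    trace.push_back(job.name);
    if (job.rows_left > 0) queue.push_back(job);  // yield to waiting signals
  }
  return trace;
}
```

With a 10-row scan queued ahead of two key lookups and a batch size of 4, the scan runs once, both lookups run, and only then does the scan continue, so the lookups wait for one short batch rather than the whole scan.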

Parallelising signal execution#

Given that each signal starts a new thread of execution, it is extremely easy to parallelise activities in RonDB. If a signal sends one signal, there is still one thread of execution, but if a signal sends two signals, it has in principle created a new thread of execution.

Thus creating a new thread of execution has a cost measured in nanoseconds. We use this when executing a scan operation. If we have 4 partitions in a table, a scan will send off parallel scan operations towards all partitions from the transaction coordinator.

It is important to not start new threads of execution in an unbounded manner. This would overload the job buffers. We have design principles that state that a signal cannot start more than 4 new signals in the same thread. One also has to be careful with signals to other threads. For the above scans we limit the number of scans that can execute in parallel with the MaxNoOfScans configuration parameter, which cannot be set higher than 500.

Signal data structure#

Signals come in a number of flavors. The simplest signals carry from 1 to 25 32-bit unsigned integer values, and the signal has a specific length. These are sometimes called short signals.

Many years ago we added long signals. They can come with up to 3 extra parts. The total size of a long signal can be up to around 32 kBytes.

In some cases it is necessary to send even longer signals. In this case we use a concept called fragmented signals. This is a signal that is sent through many long signals. Each such long signal is received and handled and the memory is stored in a buffer waiting for all parts to arrive before the signal is executed.

These fragmented signals are fairly rare and mainly used in various meta data operations. Normal traffic signals are carried in long signals in almost all cases.
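The buffering of a fragmented signal can be sketched as follows. The fragment info values (0 unfragmented, 1 first, 2 middle, 3 last) are the ones described in the signal header definition; everything else is an assumption made for illustration, in particular the class name `FragmentAssembler` and the `fragment_id` key used to tell concurrent trains of fragments apart.

```cpp
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical reassembly buffer for fragmented signals.
class FragmentAssembler {
 public:
  // Feeds the payload of one received long signal. Returns true and fills
  // `out` once the complete signal is available for execution.
  bool add(std::uint32_t fragment_id, unsigned frag_info,
           const std::vector<std::uint32_t>& words,
           std::vector<std::uint32_t>& out) {
    if (frag_info == 0) {  // not fragmented: deliver directly
      out = words;
      return true;
    }
    std::vector<std::uint32_t>& buf = pending_[fragment_id];
    if (frag_info == 1) buf.clear();  // first fragment starts a new train
    buf.insert(buf.end(), words.begin(), words.end());
    if (frag_info != 3) return false;  // wait for the remaining fragments
    out = buf;                         // last fragment: hand over everything
    pending_.erase(fragment_id);
    return true;
  }

 private:
  std::map<std::uint32_t, std::vector<std::uint32_t>> pending_;
};
```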

Receive handling in Data Node#

In a data node we have one or more special threads that handle the sockets communicating with other nodes (more on single-threaded data nodes later). Signals are received on a socket, and we use a special connection setup protocol to get started. After that each signal carries a fixed 12 byte header that ensures that we can easily find where the signal ends and where the next signal starts.

Any signal can be destined for any thread and any block in the data node.

Each thread has one job buffer for each thread that can communicate with it. The receiver thread has one job buffer to write its signals into for each thread handling signals in the data node. We call these threads block threads.

These job buffers can be written into without using any mutexes. They are mapped such that each job buffer has one thread writing into it and one thread reading from it. In this case it is possible to design algorithms for buffer read and write that only use memory barriers to secure safe communication between the threads. Only high priority signals are sent to a shared job buffer protected by a mutex.
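The idea of a buffer with exactly one writer and one reader that needs only memory barriers can be sketched with a small ring buffer built on `std::atomic`. This is a generic single-producer single-consumer queue, not the RonDB job buffer implementation; the name `SpscJobBuffer` and the choice of one 32-bit word per slot are assumptions for illustration.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Exactly one thread may call push() and exactly one thread may call pop().
// Holds at most N - 1 words.
template <std::size_t N>
class SpscJobBuffer {
 public:
  bool push(std::uint32_t word) {
    const std::size_t w = write_.load(std::memory_order_relaxed);
    const std::size_t next = (w + 1) % N;
    if (next == read_.load(std::memory_order_acquire)) return false;  // full
    slots_[w] = word;
    write_.store(next, std::memory_order_release);  // publish the new slot
    return true;
  }

  bool pop(std::uint32_t& word) {
    const std::size_t r = read_.load(std::memory_order_relaxed);
    if (r == write_.load(std::memory_order_acquire)) return false;  // empty
    word = slots_[r];
    read_.store((r + 1) % N, std::memory_order_release);  // free the slot
    return true;
  }

 private:
  std::uint32_t slots_[N] = {};
  std::atomic<std::size_t> write_{0};  // only modified by the writer
  std::atomic<std::size_t> read_{0};   // only modified by the reader
};
```

The release store on one index paired with the acquire load on the other side is what replaces the mutex: the reader never sees an index that is ahead of the data it guards.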

In very large data nodes with more than 32 block threads we use 32 job buffers that other threads access using a mutex per job buffer.

The recv threads sleep on an epoll_wait call on Linux and similar constructs on other OSs. Thus as soon as a message arrives from a node the recv thread will wake up and handle it.

It is possible to specify that the recv thread should spin for a while before going to sleep again. This can decrease latency in low load scenarios. It is also possible to set the thread priority of the recv thread a bit higher to ensure it isn't forced to go to sleep when it still has a job to do in high load scenarios.

CPU spinning is the default in the receive threads and in the block threads. The CPU spinning is using an adaptive algorithm that ensures that we only spin when it is likely that we can gain from it.

Send handling in Data Node#

Sending signals to another node is handled through send buffers. These buffers are configurable in size. A block thread that sends a signal while executing messages will simply copy the signal into one of those send buffers and continue execution.

After executing a number of signals the scheduler will ensure that the send buffers local to its thread are made available for sending. This includes signals sent to other threads in the same node. The number of signals before this happens is controlled by the configuration parameter SchedulerResponsiveness. Setting this value higher means that we will execute for a longer time before we make them available.

After executing even more signals we will return to the scheduler loop. The number of signals before this happens is controlled by the SchedulerResponsiveness parameter. If the job buffer is empty we will always return to the scheduler loop (this is the most common reason of course).

The scheduler loop checks for delayed signals (signals that are sent with a delay of a specific number of milliseconds). In addition it handles packing of send buffers when necessary. It also handles sending of packed signals. These are signals where several small signals are packed into one signal. They avoid the cost of signal scheduling for a number of very common signals and avoid the cost of the 12 byte overhead for each signal sent to another node over the network.

The scheduler of a block thread calls a method do_send to activate sending of the messages. The actual sending of a message is normally configured to be done by one or more send threads. If no send thread is configured, the sending will be handled by the block threads.

In MySQL NDB Cluster 7.5 we added the possibility for sending to be done by block threads even when a send thread is configured. This is handled by an adaptive algorithm. If a thread is very lightly loaded it will wake up once per millisecond to see if it can assist in sending messages. If a thread is in an overloaded state (roughly above 75% load) it will offload all sending to the send threads. If the thread is not overloaded, it will send to as many nodes as the thread itself was attempting to send to.

In practice this algorithm usually means that the tc threads give a lot of assistance to the send threads, since they go through the scheduler loop quite frequently. The main and rep threads are usually lightly loaded and wake up every now and then to ensure that we keep up with the load on the send threads. This ensures that the node can keep up with the send load even when configured with a single send thread and lots of block threads. The ldm threads usually assist the send threads a bit while not overloaded; if any thread is overloaded, it is usually an ldm thread. The recv threads do not assist the send threads.

When a node wants assistance from send threads to execute, it will wake a send thread up. Each connection has a responsible send thread. Each send thread has its own mutex to ensure that this mutex isn't becoming a bottleneck.

The send threads have as their sole purpose to send signals. They grab the send thread mutex to see if any nodes are scheduled for signal sending. If so they grab the node from the list, release the mutex and start sending to the node. If no nodes are available to send to, the send thread will go to sleep.
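One iteration of such a send thread can be sketched as below: the mutex only protects the list of nodes, and the actual send happens after the mutex is released. The class `SendThreadQueue`, its methods and the callback parameter are hypothetical names used for illustration only.

```cpp
#include <deque>
#include <mutex>

// Hypothetical per-send-thread list of nodes that have data waiting.
class SendThreadQueue {
 public:
  // Called by a block thread that wants help sending to node_id.
  void schedule(int node_id) {
    std::lock_guard<std::mutex> guard(mutex_);
    nodes_.push_back(node_id);
  }

  // One iteration of the send thread: take a node while holding the mutex,
  // then send with the mutex released. Returns false when no node is
  // scheduled, which is when the send thread would go to sleep.
  template <typename SendFn>
  bool send_one(SendFn send_to_node) {
    int node_id;
    {
      std::lock_guard<std::mutex> guard(mutex_);
      if (nodes_.empty()) return false;
      node_id = nodes_.front();
      nodes_.pop_front();
    }
    send_to_node(node_id);  // the mutex is not held during the actual send
    return true;
  }

 private:
  std::mutex mutex_;
  std::deque<int> nodes_;
};
```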

RonDB connection setup protocol#

After a socket is set up between a data node and an API node or another data node, the following connection setup protocol is used to ensure that the other side is a RonDB node as well.

First the client side (the API node or the data node with the highest node id) sends an ndbd message and an ndbd passwd message and waits for a response from the server side. The server responds to these messages with an ok message. After this the client side sends 2 1, where 2 is the node id of the client side and 1 means that a TCP/IP socket is used for communication. The server side responds with the same message with its node id and a 1.

Client: ndbd<CR>
Client: ndbd passwd<CR>
Server: ok<CR>
Client: 2 1<CR>
Server: 3 1<CR>
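Parsing the node id message of this exchange (such as `2 1`) can be sketched as follows. This is only an illustration of the message format shown above; the names `NodeHello` and `parse_node_hello` are hypothetical, and the sketch assumes the line terminator has already been stripped.

```cpp
#include <sstream>
#include <string>

struct NodeHello {
  int node_id;
  int transporter_type;  // 1 means TCP/IP in the exchange above
};

// Parses a line such as "2 1". Returns false on malformed input, in which
// case the caller would close the connection.
bool parse_node_hello(const std::string& line, NodeHello& out) {
  std::istringstream in(line);
  NodeHello h{};
  std::string extra;
  if (!(in >> h.node_id >> h.transporter_type)) return false;
  if (in >> extra) return false;     // unexpected trailing text
  if (h.node_id <= 0) return false;  // assumption: node ids are positive
  out = h;
  return true;
}
```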

If any side finds a problem with this setup protocol it will close the connection. If both sides are ok with the setup protocol they will now enter into a full duplex mode where both can send signals at full speed. In most cases the receive side of the socket is handled by some receive thread and the send side of the socket is handled by some send thread.

RonDB signal header definition#

When the connection between two nodes is set up, the only thing carried on the sockets is signals. These signals are sent using the following format.

The signal always starts with 3 words (a word is 32 bits) that form a header. If the configuration specifies that the connection uses signal ids, the header will be four words instead, where the fourth word is an id of the signal sent. Each signal sent to the other node increments the id. This signal id can be used to improve signal logging capabilities. It is not on by default in a cluster.

After the header the normal signal data comes. This can be up to 25 words. The number of words is specified in the header part.

After the normal signal data we have the lengths of the optional 3 segments that the signal can carry, each length is a word. The number of segments is specified in the header part. There can be zero segments.

Next the segment data comes, this is always a number of words.

After the segment data there is an optional checksum word. By default this is not on, but it can be activated through a configuration parameter on the communication section.
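Adding up the parts listed above gives the number of words one signal occupies on the socket. The helper below is a sketch of that arithmetic only; `wire_words` and its parameters are hypothetical names, not part of the RonDB source.

```cpp
#include <cstdint>
#include <numeric>
#include <vector>

// Number of 32-bit words for one signal, following the layout above:
// header, optional signal id, main signal data, one length word per segment,
// the segment data, and an optional checksum word.
std::uint32_t wire_words(std::uint32_t main_words,
                         const std::vector<std::uint32_t>& segment_words,
                         bool signal_id, bool checksum) {
  std::uint32_t total = 3;     // fixed header words
  if (signal_id) total += 1;   // fourth header word
  total += main_words;         // up to 25 words
  total += static_cast<std::uint32_t>(segment_words.size());  // length words
  total += std::accumulate(segment_words.begin(), segment_words.end(),
                           std::uint32_t{0});                 // segment data
  if (checksum) total += 1;
  return total;
}
```

A short signal with 5 data words and no options takes 3 + 5 = 8 words. A long signal with 25 data words, segments of 10 and 20 words, a signal id and a checksum takes 3 + 1 + 25 + 2 + 30 + 1 = 62 words.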

Header word 1:

  1. Bit 0, 7, 24, 31 Endian bits

  2. Bit 1,25 Fragmented messages indicator

  3. Bit 2 Signal id used flag

  4. Bit 3 Not used

  5. Bit 4 Checksum used flag

  6. Bit 5-6 Signal priority

  7. Bit 8-23 Total signal size

  8. Bit 26-30 Main signal size (max 25)

Bit 0,7,24 and 31 in the first header word are all set if the communication link is using big endian, otherwise they are 0. Currently it is not possible to mix little endian machines with big endian machines in the cluster.

Fragmented messages are multiple messages that should be delivered as one message to the receiver of the message. In this case bit 1 and 25 represent a number between 0 and 3.

0 means there is no fragmentation, or in terms of fragments, this fragment is the first and the last fragment.

1 means it is the first fragment in a train of fragments.

2 means it is not the first fragment, but also isn't the last fragment.

3 means it isn't the first, but it is the last fragment.

Header word 2:

  1. Bit 0-19 Signal number (e.g. API_HBREQ)

  2. Bit 20-25 Trace number

  3. Bit 26-27 Number of segments used

  4. Bit 28-31

The signal number is the id of the signal sent. This will be used to ensure that the correct function is called in the data node and in the API node.

Trace number is a feature that can be used to trace signals for a specific train of signals. Each signal sent carries the trace number of the signal executed currently. It can be used to follow a special event like a scan operation through the data nodes. It has been used in special implementations, but there is no support for this type of tracing in the current source trees.

The number of segments is used to discover long signals and also segments in fragmented signals. It gives the number of segment sizes after the short signal data and how many segments of data to read in at the end of the signal.

Header word 3:

  1. Bit 0-15 Sender block id

  2. Bit 16-31 Receiver block id

The receiver block id is used to direct the signal to the proper Ndb object in the API nodes and the proper thread and block in the data nodes. The sender block id is occasionally used by blocks when executing a signal.
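The three header words can be decoded with plain shifts and masks. The sketch below follows the bit positions listed above; the struct and function names are hypothetical, and the way the two fragment bits are combined (bit 1 as the low bit, bit 25 as the high bit) is an assumption, since the text only says that together they represent a number between 0 and 3.

```cpp
#include <cstdint>

struct SignalHeader {
  bool big_endian;
  unsigned fragment_info;  // 0..3
  bool has_signal_id;
  bool has_checksum;
  unsigned priority;
  unsigned total_size;     // word 1, bits 8-23
  unsigned main_size;      // word 1, bits 26-30 (max 25)
  unsigned signal_number;  // word 2, bits 0-19
  unsigned trace_number;   // word 2, bits 20-25
  unsigned num_segments;   // word 2, bits 26-27
  unsigned sender_block;   // word 3, bits 0-15
  unsigned receiver_block; // word 3, bits 16-31
};

// Extracts `count` bits starting at bit `shift` (count must be below 32).
inline std::uint32_t bits(std::uint32_t w, unsigned shift, unsigned count) {
  return (w >> shift) & ((1u << count) - 1u);
}

SignalHeader decode_header(std::uint32_t w1, std::uint32_t w2,
                           std::uint32_t w3) {
  SignalHeader h{};
  const std::uint32_t endian_mask = 0x81000081u;  // bits 0, 7, 24 and 31
  h.big_endian = (w1 & endian_mask) == endian_mask;
  // Assumption: bit 1 is the low bit and bit 25 the high bit.
  h.fragment_info = bits(w1, 1, 1) | (bits(w1, 25, 1) << 1);
  h.has_signal_id = bits(w1, 2, 1) != 0;
  h.has_checksum = bits(w1, 4, 1) != 0;
  h.priority = bits(w1, 5, 2);
  h.total_size = bits(w1, 8, 16);
  h.main_size = bits(w1, 26, 5);
  h.signal_number = bits(w2, 0, 20);
  h.trace_number = bits(w2, 20, 6);
  h.num_segments = bits(w2, 26, 2);
  h.sender_block = bits(w3, 0, 16);
  h.receiver_block = bits(w3, 16, 16);
  return h;
}
```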

API Node Architecture#

In a previous chapter we went through the NDB API. This is the C++ API used to access RonDB. In this chapter we will look at the implementation aspects of this API. In particular what we have done to map the block and signal architecture to a synchronous API. We have an asynchronous API part as well that fits very well with the block and signal model.

The C++ NDB API is a fairly normal object-oriented library. It is using blocks and signals as a way to route messages to the appropriate thread and software unit in the data nodes.

The original NDB API used a structure where we had one mutex that protected the entire send and receive part of the NDB API. When we fixed this in MySQL NDB Cluster 7.3 we had a number of choices.

We needed to separate the send and receive logic and ensure that they could execute without interfering with each other. We had to make a number of choices on where to execute the actual signals. The signals have as destination the Ndb objects or some other object linked to this object. It is possible to let user threads execute the signals.

We decided to let the signals be executed by the receive logic. The reason is that we could improve our latency in this case. We could see that the alternative approach would increase latency of the NDB API. At the same time the scalability of the approach where user threads execute the signals is better, so it is a trade off.

Thus there are two main scalability limitations of the current NDB API. The first is that all signals arriving have to be received and executed by a single thread that handles both receiving on the socket and executing the signal itself.

The second limitation is that one API node is using one TCP/IP socket. One socket has a fair amount of state where only one or two CPUs at a time can work on the socket. Thus one socket has limitations on the number of packets received per second and the bandwidth that such a socket can maintain.

The solution to both these problems is to use multiple API nodes for one program. E.g. the MySQL Server can define any number of cluster connections that will work independent of each other.

In the managed version of RonDB each MySQL Server is assigned 4 API nodes, which ensures that it scales to at least 32 VCPUs.

Cluster connection#

The cluster connection is maintaining one API node and as mentioned already, there can be multiple API nodes in one program.

One cluster connection contains all the structures required to communicate with each data node in the cluster and each management server.

Thus each cluster connection has one receive thread handling signals arriving to the API node. It has one send thread that can take over send handling when the socket is overloaded at the time when the user thread tries to send. Finally it has a thread to handle cluster management activities, such as heartbeats.

User threads#

User threads are not under our control in most cases. The MySQL Server is an exception where one thread is created per connection to the MySQL Server (except when the thread pool is used). These threads execute all the normal NDB API calls and we wake those threads up when we have completed executing all signals that we were requested to execute to handle the NDB API calls. User threads handle most of the send and receive handling in cases of low loads. The higher the load becomes, the more the receive thread will jump in to assist in executing the signals received.

NDB API send threads#

Send threads in the NDB API only send when the socket cannot keep up with the amount of signals we attempt to execute. Normally the send threads are sleeping, but in high load cases they can be quite busy sending the signals they were assigned to handle.

NDB API receive threads#

The receive thread is the heart of the NDB API implementation. Receive handling is a responsibility that is controlled by a mutex. Any user thread can take this responsibility if no other thread has already grabbed it. This improves latency in single-threaded use cases.

When many user threads are active at the same time, the receive thread becomes active. The threshold for this is set by the MySQL Server variable --ndb_recv_thread_activation_threshold. By default this is set to 8. One problem that we can get with the receive thread is that it is a thread that uses much more CPU compared to the other MySQL Server threads.
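The pattern where whichever thread gets there first takes the receive responsibility can be sketched as below. For brevity the sketch uses an atomic flag in place of the mutex mentioned above, so it shows the ownership idea rather than the actual NDB API locking; `ReceiveOwnership`, `try_receive` and the poll callback are hypothetical names.

```cpp
#include <atomic>

// Hypothetical guard for the "who is receiving right now" responsibility.
class ReceiveOwnership {
 public:
  // Called by a user thread that waits for replies. If no other thread is
  // receiving, this thread polls the sockets itself and returns true.
  // Otherwise it returns false and the caller waits to be woken up.
  template <typename PollFn>
  bool try_receive(PollFn poll_sockets) {
    if (busy_.exchange(true, std::memory_order_acquire)) return false;
    poll_sockets();
    busy_.store(false, std::memory_order_release);
    return true;
  }

 private:
  std::atomic<bool> busy_{false};
};
```

In a single-threaded application the calling thread always wins the responsibility, so replies are handled without waking any other thread.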

This means that the normal Linux scheduler will give it a lower priority compared to the rest of the threads. This is not beneficial to the other threads using this cluster connection since it will delay them getting woken up to serve the replies from the data nodes.

To ensure that the receive thread gets a higher priority we set the nice level of the receive thread to -20 if possible. As mentioned in the chapter on Installing RonDB, in the section on adding a new mysql user, this requires that the user is allowed to set the highest nice level. For this the mysql user must have the CAP_SYS_NICE capability; that chapter shows how to set it.

Using a receive thread that is locked to a CPU and that gets activated as soon as more than one user thread is active is the most optimal solution for latency using the NDB API. But it requires that the mysql user can set the nice level higher and that it can lock CPUs in a safe way without interfering with user threads or other processes.

The default manner where the user threads take care of everything has slightly worse latency, but it still scales very nicely.

NDB API cluster manager threads#

There is a thread taking care of heartbeats and of registering as a new node with the data node. This thread wakes up every 100 milliseconds and sends a heartbeat signal if needed.

NDB API wakeup thread#

In some situations the NDB API is bottlenecked by the need to wake up threads receiving data. It takes a few microseconds to wake a thread from sleep. To ensure that the current receiving thread isn't consumed by wakeup activity it can offload the wakeup processing to a specialised wakeup thread, thus waking a single thread to wake possibly hundreds of user threads.

Blocks in API nodes#

A block in the API node is simply an Ndb object. When referring to a block, it is referring to a pointer in an array that in turn points to an Ndb object. Each Ndb object has to be handled by one thread at a time, thus it is easy to handle signals to blocks in the NDB API.

Blocks in a Data Node#

In order to understand the signalling diagrams that we will present in a later chapter on common use cases, it is necessary to at least understand the various blocks in RonDB at a high level.

The code also documents some of the most important signalling flows in RonDB. As an example, the block Ndbcntr contains a long description of how a restart works in the code file NdbcntrMain.cpp. In the block Qmgr, in the file QmgrMain.cpp, we have a signalling diagram for how the registration of new nodes into the cluster at startup happens and how this is the base for heartbeat signals sent later. In the block Dbdih, in the file DbdihMain.cpp, there is a long description of how ALTER TABLE REORGANIZE PARTITIONS works with a detailed signalling flow. Hopefully this explanation of the internals makes those descriptions in the code easier to follow for anyone interested in understanding at a detailed level what goes on inside RonDB to handle the complex online meta data changes.

In the description we will describe the execution modules inside the RonDB data nodes. An execution module must execute within one thread, but different execution modules can be colocated within one thread in a variety of manners. The configuration variable ThreadConfig provides a great deal of flexibility in configuring the placement of execution modules into threads.

LDM Blocks#

LDM stands for Local Data Manager. It is a set of blocks that cooperate to deliver the local database service that is a very central part of RonDB. In ndbmtd these blocks all operate inside the same thread, the ldm thread. It is not possible to break those blocks apart and execute them in different threads. It is however possible to have several instances of the ldm threads. Each ldm thread handles one instance of each of the below blocks. In addition it can take care of log parts of the REDO log. In RonDB we can have fewer REDO log parts than the number of LDM instances. In this case the REDO log parts will be handled by the first LDM instances, but all LDM instances will write to the REDO log, controlled through a mutex on the REDO log part.


DBLQH stands for DataBase Local Query Handler. This block is where most interactions with the LDM blocks starts. It does handle the REDO log, but its main responsibility is to interact with the other blocks operating the tuple storage, hash index, ordered index, page cache, restore operations, backup and local checkpoint services.

The most common signal DBLQH receives is LQHKEYREQ, which is used for reading or writing a row using a primary key of a table (either a user table, a unique index table or a BLOB table). To serve this query it uses DBACC to look up the row in the hash table and DBTUP, which has the responsibility for the tuple storage and performs the operation requested by the user.

Scan operations are the second most common operation. A scan arrives as a SCAN_FRAGREQ signal that orders either a full table scan (implemented by either DBACC or DBTUP) or a range scan (implemented by DBTUX).

DBLQH also controls the execution of local checkpoints together with the BACKUP block.

One can say that DBLQH is the service provider for the LDM blocks that makes use of internal services delivered by the other blocks.


DBACC has two main functions. It controls the local part of the distributed hash index that every primary key uses, and every table in RonDB, including unique index tables and BLOB tables, has a primary key. Even tables without a primary key use a hidden primary key.

The hash table in DBACC also acts as the locking data structure to ensure that rows are locked in the proper fashion by user operations.

DBACC stands for DataBase ACCess manager.


DBTUP means DataBase TUPle manager. DBTUP is where the actual in-memory rows are stored using the fixed row parts and the variable sized row parts. DBTUP has the ability to execute all sorts of operations requested by the higher levels of software in RonDB. Thus this is where the actual reads and writes of data happens. DBTUP also contains a fairly simple interpreter that can execute a number of simple statements that can be used to push down condition evaluation to RonDB. It also reads and writes the disk columns delivered by PGMAN.


DBTUX means DataBase TUple IndeX. It contains an implementation of a T-tree. The T-tree is an index specifically developed to support ordered indexes in main memory. Every update of a row involving an ordered index will perform changes of the T-tree and it is used heavily during restart when the T-trees are rebuilt using rows recovered.

The DBTUX software has been very stable for a long time. It does what it is supposed to do and there is very little reason to change it. Most new features developed are done at a higher service level in RonDB and DBTUP has lots of capabilities that interact with higher service levels in RonDB.


PGMAN stands for PaGe MANager and it handles the page cache for disk pages used for disk columns. Each ldm thread has a part of the page cache. It contains a state-of-the-art page caching algorithm that is explained in comments for the get_page function in this block.


The backup block is responsible to write local checkpoint and backup information to files. It does so by using a full table scan using the DBTUP block to ensure that all rows are analysed to see if they should be written into this local checkpoint or not. The local checkpoints in RonDB uses a mechanism called Partial Local Checkpoint (Partial LCP). This mechanism was introduced in MySQL Cluster 7.6 and provides the ability to write checkpoints that only write a part of the in-memory data. This is an extremely important feature to enable use of RonDB with large memories. The previous implementation made every checkpoint a full checkpoint. With terabytes of data in memory, this is no longer a feasible approach.

The Partial LCP uses an adaptive algorithm to decide on the size of each checkpoint of each table fragment. The full table scan in DBTUP for checkpoints uses a special mechanism to quickly go through the database and find those rows that have changed since the last checkpoint. Pages that have not been written to since the last checkpoint are quickly discovered and skipped. The partial checkpoint means that a subset of the pages will be written fully, while for the remaining pages only the changed rows are written. The number of pages to write fully depends on how much write activity has occurred in the table fragment since the last checkpoint.
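The page selection described above can be illustrated with a minimal sketch. It is not the actual DBTUP or BACKUP code: the `Page` class, the `partial_lcp` function and the way the set of fully written pages is passed in are all assumptions made for illustration, and the adaptive choice of how many pages to write fully is left out.

```python
from dataclasses import dataclass, field


@dataclass
class Page:
    page_id: int
    rows: dict                                   # row id -> row value
    changed: set = field(default_factory=set)    # row ids changed since the last LCP


def partial_lcp(pages, full_page_ids):
    """Return the (page_id, row_id, value) records one partial checkpoint writes.

    Hypothetical model: pages in full_page_ids are written completely,
    other pages only contribute their changed rows, and pages without
    any change are skipped. Changed rows are assumed to still exist.
    """
    records = []
    for page in pages:
        if page.page_id in full_page_ids:
            row_ids = sorted(page.rows)          # write every row on the page
        elif page.changed:
            row_ids = sorted(page.changed)       # write only the changed rows
        else:
            continue                             # untouched page, skip it
        records.extend((page.page_id, r, page.rows[r]) for r in row_ids)
    return records
```

With three pages where only page 0 has a changed row and page 2 is selected for a full write, the checkpoint contains the single changed row of page 0 and all rows of page 2, while page 1 is skipped.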

The Partial LCP has to handle dropped pages and new pages in a proper manner. The code in DbtupScan.cpp contains a long description of how this works together with a proof that the implementation actually works.


The RESTORE block is only used during the early phases of a restart when it is used to restore a local checkpoint. The local checkpoint contains saved in-memory rows that are written back into DBTUP at recovery using more or less the same code path as a normal insert or write operation would use in RonDB.

With Partial LCP we have to restore several checkpoints, going back in time, to ensure that we have restored the checkpoint properly. Thus this phase of the recovery takes slightly longer. However, the Partial LCP speeds up the checkpoints so much that the cost of executing the REDO log decreases substantially. Thus recovery with Partial LCP can be up to 5x faster compared to using full checkpoints.

Query thread blocks#

Each query thread contains the blocks DBQLQH, DBQACC, DBQTUP, DBQTUX, QBACKUP and QRESTORE. These are using exactly the same code except that they have a variable m_is_query_thread that is set to true. A query thread doesn't have a PGMAN instance since the query threads currently don't handle queries using disk columns.

Query threads only handle Read Committed queries. Any query thread can handle queries to any table fragment. However when routing the requests to query threads we ensure that query threads only execute queries for the same Round Robin Group. Round Robin groups strive to share a common L3 cache to ensure that we make good use of CPU caches and TLB data.

TC blocks#

TC stands for Transaction Coordinator. These blocks handle a small but important part of the RonDB architecture. There are only two blocks here: DBTC and DBSPJ. All transactions pass through the TC block and for pushdown joins the DBSPJ block is used as the join processor.


The most common signalling interfaces to DBTC are TCKEYREQ and TCINDXREQ. TCKEYREQ is used for primary key operations towards all tables and TCINDXREQ handles unique key requests. DBTC works intimately with the DBDIH block, which contains all distribution information. DBTC queries this information for all queries to ensure that we work with an up-to-date version of the distribution information.

The distribution information in DBDIH is used by all TC blocks, so it requires special handling of concurrency. Since it is read extremely often and updates to it are rare, we use a special mechanism: we read an atomic variable before accessing the data and read the same variable again after accessing the data. If the variable has changed since we started reading, we retry and read the data once again. Obviously this requires special care in handling errors in the read processing.

DBTC uses the DBLQH service provider interface to implement transactional changes. DBTC ensures that transactional changes are executed in the correct order. This includes handling triggers fired when executing change operations. These triggers are fired to handle unique indexes, foreign keys and table reorganisations. The triggers are often fired in DBTUP and sent to DBTC for processing as part of the transaction that fired the change.

Scan operations and pushdown join operations are started through the SCAN_TABREQ signal. Scan operations on a single table are handled by DBTC and pushdown joins are handled by DBSPJ.

DBTC handles timeouts to ensure that we get progress even in the presence of deadlocks.

If a data node fails, a lot of ongoing transactions will lose their transaction state. To be able to complete those transactions (by either aborting or committing them) DBTC can build up the state again by asking DBLQH for information about ongoing transactions from the failed node. It is the first tc thread in the master data node that will take care of this.


DBSPJ stands for DataBase Select-Project-Join. SPJ is a popular phrase in discussing join queries in databases. The DBSPJ block implements the linked join operations that are used in RonDB to implement a form of parallel query. DBSPJ has no function for anything apart from join queries.

It implements the joins by sending SCAN_FRAGREQ to DBLQH for scans and LQHKEYREQ to DBLQH for primary key lookups. The results of the scans and key lookups are sent directly to the NDB API, which also gets some keys that make it possible for the NDB API to put together the result rows. Some parts of the reads can also be sent back to DBSPJ when the result from one table is required as input to a later table in the join processing.

Main thread blocks#

There are two execution modules used for various things: the main and the rep execution modules. Here we will first go through the blocks found in the main execution module. These blocks all have only one instance. There are also a few proxy blocks. Proxy blocks are a special kind of block that was introduced to handle multiple block instances. A proxy block implements functionality that makes it possible to send a signal to a block and have it executed on all block instances of e.g. the DBLQH block. This was helpful in changing the code from the single-threaded version to the multi-threaded version.
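The fan-out role of a proxy block can be sketched as follows. This is only an illustration of the idea: the class names, the `execute` method and the "CONF" reply are hypothetical and do not mirror the real proxy block classes in the RonDB source.

```python
class BlockInstance:
    """Hypothetical stand-in for one instance of a block such as DBLQH."""

    def __init__(self, instance_no):
        self.instance_no = instance_no
        self.received = []

    def execute(self, signal):
        self.received.append(signal)
        return (self.instance_no, "CONF")


class ProxyBlock:
    """Forwards one incoming signal to every instance and waits for all replies."""

    def __init__(self, instances):
        self.instances = instances

    def execute(self, signal):
        replies = [inst.execute(signal) for inst in self.instances]
        # The sender sees one combined answer, as if there were a single block.
        return all(status == "CONF" for _, status in replies)
```

The sender only addresses the proxy, so code written for a single block instance does not need to know how many instances exist.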


DBDICT stands for DataBase DICTionary. It contains meta data about tables, columns, indexes, foreign keys and various internal meta data objects. It implements a framework for implementing any type of schema transaction. The framework tracks the progress of each schema transaction. If a failure happens we will either roll the schema transaction backwards or forwards. If it has passed a point where it can no longer be rolled back, we will ensure that it will be completed even in the presence of node crashes and even in the presence of a cluster crash. Schema transactions cannot contain any normal user transactions.

DBDICT implements this by executing a careful stepwise change of each schema transaction. Thus the schema transaction is divided into many smaller steps, and each of those steps is applied in all nodes in parallel. After completing a step we record information about the completed step in a file flushed to disk. This means that schema transactions in RonDB are not so fast, but they can handle very complex changes. It is possible to use this framework for quite advanced schema changes. But introducing new types of schema changes still requires writing a few new functions and verifying that the new schema change variant is working well.
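The roll-backward or roll-forward decision can be sketched in a few lines. This is a simplified model and not DBDICT code: the function name, the `commit_point` parameter and the in-memory `log` list (standing in for the file flushed to disk) are assumptions, and the failure is modelled as a single interruption before a given step.

```python
def run_schema_transaction(steps, commit_point, fail_at=None):
    """Apply steps one by one; on failure roll backward or forward.

    steps        : list of step names
    commit_point : number of completed steps after which rollback is
                   no longer allowed (hypothetical parameter)
    fail_at      : index of the step before which a failure occurs, or None
    """
    log = []                      # stands in for the progress file on disk
    done = 0
    for i, step in enumerate(steps):
        if i == fail_at:
            break                 # simulated failure before this step
        log.append(("apply", step))
        done = i + 1
    else:
        return "completed", log   # no failure happened

    if done < commit_point:
        # Not yet past the point of no return: undo in reverse order.
        for step in reversed(steps[:done]):
            log.append(("undo", step))
        return "rolled_back", log

    # Past the point of no return: complete the remaining steps.
    for step in steps[done:]:
        log.append(("apply", step))
    return "completed", log
```

A failure after one of three steps with `commit_point=2` undoes the first step, while a failure after two steps completes the third one.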

DBDICT has an important role in the startup of a node where it decides which tables to restore. It asks DBDIH to perform the actual recovery and will assist DBDIH in some of those recovery steps.


DBDIH stands for DataBase DIstribution Handler. This is one of the most important parts of RonDB. In DBDIH we maintain all the distribution information about all tables in the cluster. It contains algorithms to ensure that this information is synchronised with all other nodes in the cluster.

This distribution information is updated by schema changes, by node starts and by node failures. Thus for a specific table the information is very stable. Days and even years can pass without it being updated. To handle this we have used a mechanism similar to the RCU mechanism used in the Linux kernel. The idea is that a reader performs the following steps. Before starting to read, we check if the data is currently being changed (by reading a flag on the table). If it is currently being changed, we simply loop until the data is no longer being updated. Otherwise we go on and read the information, but we record the flags we read before reading it. After reading the data we check if the data has changed since we started reading (by again reading the table flags). If the data didn't change we're done; if it changed, we restart the read once again.

This procedure is used to protect a number of variables regarding data distribution maintained by DBDIH. All the code to handle these protected regions is assembled together in DBDIH to make it easier to follow this part of the code.
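The read procedure described above can be sketched as an optimistic read with retry. This is a single-threaded illustration only: the class and method names are hypothetical, a version counter (odd while an update is in progress) stands in for the table flags, and the `on_read` hook exists purely to simulate a concurrent writer. The real code relies on atomic variables and memory barriers, which plain Python does not express.

```python
class ProtectedTableInfo:
    """Hypothetical model of distribution data protected by a version counter."""

    def __init__(self, data):
        self._version = 0          # even: stable, odd: update in progress
        self._data = dict(data)

    def update(self, new_data):
        self._version += 1         # mark "being changed"
        self._data = dict(new_data)
        self._version += 1         # mark "stable" again

    def read(self, on_read=None):
        """Return (snapshot, retries). on_read simulates a concurrent writer."""
        retries = 0
        while True:
            before = self._version
            if before % 2 == 1:
                continue           # update in progress, loop until it is done
            snapshot = dict(self._data)
            if on_read is not None:
                on_read(retries)   # a writer may interfere here
            if self._version == before:
                return snapshot, retries
            retries += 1           # data changed while reading, read again
```

Readers never block writers in this scheme, which fits data that is read extremely often and updated rarely.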

DBDIH runs the global checkpoint protocol. Global checkpoints were described in an earlier chapter and represent recoverable checkpoints on a cluster level. DBDIH also runs the local checkpoint protocol. DBDIH gathers information during the local checkpoint process that is used in restarts. With the implementation of Partial LCP most of the handling of checkpoints is handled locally in the DBLQH block. However DBDIH is still used to control the flow of checkpoints although some information it retrieves from DBLQH isn't really used any more.


TRIX stands for TRIgger eXecution. It is involved in handling online index builds, copying data during table reorganisations and online foreign key builds. It is a block that assists DBDICT by performing a number of important operations to move data around to implement complex online schema changes.


QMGR stands for Cluster Manager with a little play on pronunciation. This block is responsible for the heartbeat protocol in RonDB. It is the lowest level of the cluster management in RonDB. It is involved in early start phases to register a new node into the cluster and maintains the order used to decide which node is the master node (it is the oldest node registered). Nodes are registered one at a time and all nodes are involved in this registration, thus all nodes agree on the age of a node. The oldest node is always chosen as the new master node whenever a master node fails.
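The master selection rule can be sketched as follows. The `Membership` class and its methods are hypothetical names for illustration; the only rule taken from the text is that nodes are registered one at a time and the oldest registered node that is still alive is the master.

```python
class Membership:
    """Hypothetical model of the node registration order kept by QMGR."""

    def __init__(self):
        self._next_order = 0
        self._order = {}               # node id -> registration order

    def register(self, node_id):
        # Nodes are registered one at a time, so the order is unambiguous.
        self._order[node_id] = self._next_order
        self._next_order += 1

    def fail(self, node_id):
        self._order.pop(node_id, None)

    def master(self):
        # The oldest registered node that is still alive is the master.
        return min(self._order, key=self._order.get)
```

Note that a node that fails and later rejoins is registered again and therefore becomes the youngest node.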


NDBCNTR stands for Network DataBase CoNTRoller. From the beginning this block was the block responsible for the database start of the data node. This is still true to a great extent.

The original idea with the design was that QMGR was a generic block that handled being part of the cluster. The NDBCNTR block was specifically responsible for the database module. The idea in the early design was that there could be other modules as well. Nowadays QMGR and NDBCNTR assist each other in performing important parts of the cluster management and parts of the restart control. Mostly NDBCNTR is used for new functionality and QMGR takes care of the lower level functionality which among other things relates to setting up connections between nodes in the cluster.


CMVMI stands for Cluster Manager Virtual Machine Interface. This block was originally used as a gateway between blocks implemented in PLEX and blocks implemented in C++. For the last 21 years all blocks have been C++ blocks. It now implements a few support functions for other blocks.


DBUTIL stands for DataBase UTILity. This block implements a signal-based API towards DBTC and DBLQH. It is a support block that lets other blocks like NDBCNTR, TRIX and so forth execute database operations from the block code.


NDBFS stands for Network DataBase File System. It implements a file system API using signals. It was developed to ensure that file system accesses could use the normal signalling protocols. Given that all the rest of the code in the RonDB data nodes is asynchronous and uses signals between blocks, it was important to move also the RonDB file system accesses into this structure. Not every OS has an asynchronous API towards its file system, so we implemented the interface to the file system through many small threads that each can do blocking file system calls. These threads interact with the main execution module whenever they are requested to perform a file operation, and they send back information to the main execution module when the file operation is completed.
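The pattern of small threads doing blocking file calls on behalf of an asynchronous caller can be sketched with Python's standard `threading` and `queue` modules. This is not the NDBFS implementation: the function names, the request tuple layout and the "CONF" status string are assumptions made for illustration.

```python
import os
import queue
import tempfile
import threading


def file_worker(requests, completions):
    """Worker thread: performs blocking file writes, reports back by message."""
    while True:
        req = requests.get()
        if req is None:                  # shutdown marker
            break
        req_id, path, data = req
        with open(path, "wb") as f:      # blocking calls stay in this thread
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        completions.put((req_id, "CONF", len(data)))


def write_files(jobs, num_threads=2):
    """Send one write request per job and collect the completion messages."""
    requests, completions = queue.Queue(), queue.Queue()
    workers = [threading.Thread(target=file_worker, args=(requests, completions))
               for _ in range(num_threads)]
    for w in workers:
        w.start()
    for req_id, (path, data) in enumerate(jobs):
        requests.put((req_id, path, data))
    results = {}
    for _ in jobs:                       # the requester never touches the files
        req_id, status, nbytes = completions.get()
        results[req_id] = (status, nbytes)
    for _ in workers:
        requests.put(None)
    for w in workers:
        w.join()
    return results


def demo():
    with tempfile.TemporaryDirectory() as d:
        jobs = [(os.path.join(d, "a.dat"), b"hello"),
                (os.path.join(d, "b.dat"), b"abc")]
        results = write_files(jobs)
        sizes = [os.path.getsize(path) for path, _ in jobs]
    return results, sizes
```

The requesting side only exchanges messages, so it can keep executing other work while the worker threads block in the OS.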

The APIs implemented towards NDBFS support a number of different models. The various uses of a file system are rather different for the functions in RonDB. Some functions simply need to write a file of arbitrary size, others work with strict page sizes and so forth.

Proxy blocks#

The main execution module handles the proxy blocks for DBTC and DBSPJ.


The main execution module handles a variety of things. For the most part it is involved heavily in restarts and in meta data operations. NDBFS operations and checkpoint operations in DBDIH are the most common operations it performs during normal traffic load.

Rep blocks#

The blocks that execute in the rep execution module are used for a variety of purposes. The name rep is short for replication and stems from the fact that the SUMA block is used to distribute events from the data node to the MySQL replication servers.


SUMA stands for SUbscription MAnager. Events about changes on rows are sent from the SUMA block to the NDB API nodes that have subscribed to changes on the changed tables. This is mainly used for RonDB Replication, but can also be used for event processing such as in HopsFS where it is used to update ElasticSearch to enable generic searches in HopsFS for files.


LGMAN stands for LoG MANager. There is only one LGMAN block in the data node. But this block can be accessed from any ldm execution module. In this case the Logfile_client class is used. When creating such an object a mutex is grabbed to ensure that only one thread at a time is accessing the UNDO log. These accesses from the ldm execution modules are only writing into the UNDO log buffer. The actual file writes are executed in the LGMAN block in the rep execution module.
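The client-object pattern, where creating the object grabs the mutex and destroying it releases the mutex, can be sketched with a Python context manager. The classes below are hypothetical analogues for illustration and do not reproduce the real Logfile_client interface.

```python
import threading


class LogManager:
    """Hypothetical stand-in for the single LGMAN block and its UNDO log buffer."""

    def __init__(self):
        self.mutex = threading.Lock()
        self.undo_buffer = []


class LogfileClient:
    """Holds the log manager mutex for as long as the client object is in use."""

    def __init__(self, lgman):
        self._lgman = lgman

    def __enter__(self):
        self._lgman.mutex.acquire()      # grabbed when the client is created
        return self

    def __exit__(self, exc_type, exc, tb):
        self._lgman.mutex.release()      # released when the client goes away
        return False

    def add_entry(self, entry):
        # Only writes into the buffer; file writes happen elsewhere.
        self._lgman.undo_buffer.append(entry)


def demo(num_threads=4, entries_per_thread=100):
    lgman = LogManager()

    def ldm_worker(thread_no):
        for i in range(entries_per_thread):
            with LogfileClient(lgman) as client:
                client.add_entry((thread_no, i))

    threads = [threading.Thread(target=ldm_worker, args=(t,))
               for t in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(lgman.undo_buffer), lgman.mutex.locked()
```

Tying the mutex to the lifetime of the client object makes it hard to forget the release, which matters when many threads share one log buffer.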


TSMAN stands for TableSpace MANager. The creation and opening of tablespace files happens in the TSMAN block. The actual writes to the tablespace files are however handled by the PGMAN blocks in the ldm threads and the extra PGMAN block described below.

Any writes to extent information are handled by TSMAN. These accesses are executed from the ldm execution modules. Before entering the TSMAN block these accesses have to create a Tablespace_client object. This object will grab a mutex to ensure that only one ldm execution module at a time reads and writes into the extent information.


DBINFO is a block used to implement scans used to get information that is presented in the ndbinfo tables. There are lots of ndbinfo tables, those are used to present information about what goes on in RonDB. In a managed RonDB setup this information is presented in lots of graphs using Grafana.

Proxy blocks#

The proxy blocks for DBLQH, DBACC, DBTUP, DBTUX, BACKUP, RESTORE and PGMAN are handled in the rep execution module.

PGMAN extra#

The PGMAN block has one instance in each ldm execution module. There is also an extra PGMAN instance. The main responsibility of this instance is to handle checkpointing the extent information in tablespaces. The tablespace data used by normal tables is handled by the PGMAN instances in the ldm execution module.

The extent information is a few pages in each tablespace data file that are locked into the page cache. The checkpointing of those pages is handled by this extra PGMAN block instance. This block instance also executes in the rep execution module.


THRMAN stands for THRead MANager. This block has one instance in each thread. It handles a few things such as tracking CPU usage in the thread and anything that requires access to a specific thread in the data node.


TRPMAN stands for TRansPorter MANager. Transporters are used to implement communication between nodes in RonDB. There is a TRPMAN block in each recv thread. This block can be used to open and close communication between nodes at request of other blocks.

Virtual Machine in Data Nodes#

In this chapter we will discuss a few more things regarding the virtual machine and its threads in ndbmtd.

Each execution module is placed into one thread. It could be a thread that only handles one execution module, but we could also have threads that handle many execution modules. One execution module cannot be split across different threads, but multiple execution modules can be joined together in a thread.

In the presentation below we will describe things as if each execution module had its own thread.

Thread types in ndbmtd#

ldm threads#

An ldm thread cannot communicate with any other ldm thread in the same node, but it can communicate with all tc threads and with the main thread. It can communicate with itself as can every other thread. The first ldm thread was a bit special in earlier versions when backup processing wasn't parallelised. This is no longer the case.

The ldm threads have an important characteristic regarding their CPU usage. They are extremely efficient in using the CPU. The instructions per cycle (IPC) can be as high as 1.5 when executing a standard Sysbench OLTP benchmark and even above 2 when executing only range scans that perform filtering of rows. This is one of the reasons why it doesn't pay off so much to use two ldm threads per CPU core. There aren't enough execution units in a CPU core to keep two threads moving in parallel. The thread has a branch prediction miss rate of less than 2% and L1 cache misses are also only about 2% of the accesses. Misses in the last level cache are however about 15%. This is where the actual accesses to the data parts come into play.

ldm threads benefit a lot if they stay on the same CPU all the time and their CPU caches are not influenced by other threads executing on the machine.

This is why CPU locking is an important method to improve performance of RonDB data nodes.

query threads#

Query threads execute key lookup and scan queries using the READ COMMITTED mode. This means that several threads can access the same data concurrently. The aim is that query threads execute on the same CPU core as an ldm thread.

With the introduction of query threads it was possible to decrease the number of table partitions. This means that in a large data node, not all ldm threads are used to a full extent. This provides the possibility for the query thread in this CPU core to be used to a greater extent, thus providing the capability to distribute load evenly.

tc threads#

tc threads communicate with all other threads. tc threads have very different characteristics compared to ldm threads. The signals executed are very short and there are a lot of those small messages. So the tc threads are up and running for a short time and then go to sleep for a very short time.

tc threads benefit a lot from hyperthreading; we get 40% extra performance for each CPU core by using hyperthreading. The IPC for tc threads is also a lot lower.

tc threads play an important role in handling sends to other nodes, both when using send threads and without specific send threads. Thus you can see a lot of CPU usage by the OS kernel from this thread. The send assistance by tc threads is important to decrease the response time for requests towards RonDB.

Having too many tc threads can easily lower total performance of the node. Most likely the reason is that we end up sending too many small packets to the API node, which has a negative impact on API node performance.

main threads#

main threads are normally not very busy at all. They are mainly used for file system signals during normal operation. They can be fairly busy during ALTER TABLE commands that reorganize data, build indexes and build foreign keys. Most of the CPU resources for this thread type can be colocated with the rep thread, but in larger configurations it can be useful to separate those threads on their own CPUs.

rep threads#

The rep thread is used to send event data to the NDB APIs that have subscribed to change events. It is also used for ALTER TABLE operations that build indexes, foreign keys and reorganise tables.

Other functions should have very small impact on CPU usage.

The main thing to consider for the main and rep thread is that they normally will use very little CPU resources, but at times they can spike when running a heavy ALTER TABLE operation.

Both main and rep threads can communicate with all other thread types.

recv threads#

recv threads benefit greatly from hyperthreading. One important thing to consider for recv threads is that they benefit from not using a full CPU. When locking recv threads to a CPU, one should strive to not use the CPU to more than 60%. The reason is that it is very important for the recv thread to quickly wake up and take care of any actions and similarly to ensure that other threads can quickly wake up to handle the received signals.

If the recv thread uses more CPU than 60%, it is a good idea to add more recv threads. The ndb_top tool is a good tool to check how much CPU a thread uses if the threads are not locked to their own CPUs, in this case the top tool can be used as well.

send threads#

There are many other thread types that can assist send threads. Using a single send thread is mostly ok and having more than one might even have negative performance impact.

Still there are cases when more send threads are needed. Often performance can increase a bit by using all the way up to 4 send threads. HopsFS is a good example of an application that requires much CPU resources for sending.

send threads benefit from hyperthreading.

io threads#

When using RonDB as an in-memory engine without compressing backups and local checkpoints, the CPU usage in io threads is very small. Adding compression to backups and/or local checkpoints using the CompressedBackup and CompressedLCP configuration parameters quickly increases the CPU usage in the io threads significantly. Similarly using disk data means that io threads get a lot more work.

The io threads can use many CPUs concurrently.

Thread setup#

Automatic Thread Configuration#

For almost all users we recommend setting the configuration parameter AutomaticThreadConfig to 1. This means that RonDB will automatically calculate the number of threads of various types.

ndbmtd will use OS calls to figure out how many CPUs are available to the RonDB data node. By default this is all CPUs in the server/VM. However if ndbmtd was started using e.g. numactl one can limit the number of CPUs that are accessible to ndbmtd. ndbmtd will respect this limitation and not use all the CPUs in the server/VM.

Another way is to set the configuration parameter NumCPUs, which limits the number of CPUs to the number set in this parameter. However no CPU locking will occur if this is set since it is assumed that ndbmtd is executing in a shared environment with other programs on the same server/VM. This is heavily used for RonDB testing.
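How a process can discover the CPUs it is allowed to use can be sketched with the Python standard library. This is not how ndbmtd is implemented: the function name and its `num_cpus` argument (modelling the NumCPUs parameter) are assumptions, and `os.sched_getaffinity` is only available on some platforms such as Linux, so the sketch falls back to `os.cpu_count`.

```python
import os


def available_cpus(num_cpus=None):
    """Return (cpu_count, lock_to_cpus) for a hypothetical data node start.

    num_cpus models the NumCPUs parameter: when it is set, that number
    is used and no CPU locking is done.
    """
    if num_cpus:
        return num_cpus, False
    if hasattr(os, "sched_getaffinity"):
        # Respects restrictions set by tools such as numactl or taskset.
        count = len(os.sched_getaffinity(0))
    else:
        count = os.cpu_count() or 1
    return count, True
```

Reading the affinity mask rather than the total CPU count is what makes a restriction imposed at process start visible to the program.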

Thread setups using ThreadConfig#

Actually the ldm, query, tc, main, rep and send threads are all optional. At least one recv thread is always present, and the io threads are always there as well.

There are a few common ways to set up a RonDB data node.

Data node up to 4 CPUs#

In a small data node there is no room for send threads, main, query and rep threads.

In the very smallest data nodes it is also a good idea to combine the tc into the recv thread. Support for this was added in RonDB 21.10.0.

In earlier versions of RonDB it was possible to use a recv thread and a main thread that combined ldm, tc, main functionality. However most of the processing happens in the ldm, thus it is good to have a separate ldm thread.

Already at 4 CPUs it becomes a good idea to add a tc thread, thus using 1 recv, 1 tc and 2 ldm threads.

Alternative setups#

RonDB data nodes also support running all threads in the recv threads. However comparing 2 recv threads with a setup using 1 recv thread and 1 ldm shows that the separated thread setup is more efficient in most cases. The only case when the use of 2 recv threads wins is when the recv and tc functionality uses very small amounts of CPU. In this case the 2 recv thread setup simply wins because it has a balanced use of CPUs although the use is still more inefficient.

Communication between threads in ndbmtd#

Communication between threads uses highly optimised code paths in RonDB. We use lock-free communication where each communication buffer is only used for communication between two threads. This means that we can use single-reader, single-writer optimisations that avoid using any locks when communicating; memory reads and writes combined with memory barrier operations are sufficient. This part of the code is the main portability hurdle for RonDB. This code is currently working on x86 servers and SPARC servers. Some attempts have been made to make it work on POWER servers, but it has not been completed. It currently works also on ARM servers although it isn't tested at the same level as x86 is tested.
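The single-reader, single-writer idea can be illustrated with a small ring buffer in which each index is written by exactly one side. This is a conceptual sketch and not the RonDB job buffer code: the class name and layout are assumptions, and Python cannot express the memory barriers a real implementation needs, so their positions are only marked in comments.

```python
class SpscRing:
    """Bounded queue for exactly one producer thread and one consumer thread."""

    def __init__(self, capacity):
        # One slot is kept empty to tell a full buffer from an empty one.
        self._buf = [None] * (capacity + 1)
        self._head = 0     # next slot to read, written only by the consumer
        self._tail = 0     # next slot to write, written only by the producer

    def try_push(self, item):
        nxt = (self._tail + 1) % len(self._buf)
        if nxt == self._head:
            return False                   # buffer is full
        self._buf[self._tail] = item
        # A real implementation needs a write barrier here, so the item is
        # visible before the new tail value is.
        self._tail = nxt
        return True

    def try_pop(self):
        if self._head == self._tail:
            return False, None             # buffer is empty
        # A real implementation needs a read barrier here, so the item is
        # read after the tail value that announced it.
        item = self._buf[self._head]
        self._head = (self._head + 1) % len(self._buf)
        return True, item
```

Because the producer only writes the tail index and the consumer only writes the head index, neither side needs a lock to stay consistent.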

To wake up other threads we use futexes on Linux that are combined with special x86 instructions. On other OSs and other HW we use normal mutexes and condition variables to wake up other threads.

In a configuration with many threads in the data node, there will be a significant amount of memory allocated to the communication between threads.

In RonDB we change to using a mutex on each job buffer when going beyond 64 threads. With more threads the cost of scanning all memory buffers becomes higher than the cost of using a mutex.

Scheduler in ndbmtd#

Each block thread (ldm, tc, main, rep and recv threads) has a scheduler that executes signals. It has settings that can be changed through the configuration parameter SchedulerResponsiveness. These settings define how often we will flush signals to other threads and nodes. Flushing often has a negative impact on throughput, but can have a positive impact on latency. Flushing seldom means that latency increases and throughput can increase. The scheduler supports delayed signals through a special time queue.
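A time queue for delayed signals can be sketched with a binary heap ordered on the time at which each signal becomes due. The data structure used inside the RonDB scheduler is not described here, so the heap, the class name and the millisecond clock passed in by the caller are all assumptions.

```python
import heapq
import itertools


class TimeQueue:
    """Hypothetical time queue: signals become executable once their delay has passed."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()    # keeps insertion order for equal times

    def send_delayed(self, now_ms, delay_ms, signal):
        heapq.heappush(self._heap, (now_ms + delay_ms, next(self._seq), signal))

    def pop_due(self, now_ms):
        """Return all signals whose delay has expired, oldest first."""
        due = []
        while self._heap and self._heap[0][0] <= now_ms:
            due.append(heapq.heappop(self._heap)[2])
        return due
```

A scheduler loop would call `pop_due` with the current time on each iteration and execute the returned signals together with the normal job buffers.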

What one always finds when running various benchmarks is that RonDB as a distributed system is a highly coupled system. So to get best performance and latency one has to find the right balance of resources.

MaxSendDelay is a parameter that works well in very large clusters where we need to constrain the nodes from sending too often. Sending too often will use too much resources in other nodes. To get good performance it is important that a node isn't running so fast that it sends smaller packets that give other nodes more work to do. This will have a negative impact on system performance.

In RonDB the MaxSendDelay configuration parameter has been removed. The functionality is now automated, thus the above delays on sending will happen automatically as the cluster load goes up.

Single-threaded Data Nodes, ndbd#

Single-threaded data nodes are no longer supported by RonDB and no longer work in RonDB. All the functionality of ndbd can be achieved using ndbmtd.

The ndbd was the original data node program that was developed for NDB. In those days a large server had two CPUs and about 1 GByte of memory. Machines of today have a lot more CPUs, which meant that a multithreaded approach was required.

Detailed RonDB Internals#

This chapter mentions a few basic foundations for the software design in RonDB. We have already touched upon that we use the fail fast model for software development. This is natural when you expect that your nodes are always set up in a replicated fashion. It is not so natural when you have downtime associated with each node failure.

Internal triggers in RonDB#

In Mikael Ronström's Ph.D. thesis he developed quite a few algorithms for node recovery, for online schema changes and so forth. Most of those algorithms are based on a trigger approach.

In RonDB we make use of triggers for a wide range of topics. We use triggers to maintain our ordered indexes (T-trees). Thus every change on a column that is part of an ordered index will generate an insert trigger and a delete trigger on the ordered index.
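Index maintenance through triggers can be sketched as follows. This is a toy model and not DBTUP or DBTUX code: a sorted list stands in for the T-tree, the class names are hypothetical, and the order in which the delete and insert triggers fire is an assumption.

```python
import bisect


class OrderedIndex:
    """Sorted list of (key, row_id) pairs standing in for a T-tree."""

    def __init__(self):
        self.entries = []

    def insert(self, key, row_id):
        bisect.insort(self.entries, (key, row_id))

    def delete(self, key, row_id):
        self.entries.remove((key, row_id))


class Table:
    def __init__(self, indexed_column):
        self.rows = {}
        self.indexed_column = indexed_column
        self.index = OrderedIndex()
        self.fired = []                       # log of fired triggers

    def _fire(self, kind, key, row_id):
        self.fired.append((kind, key, row_id))
        getattr(self.index, kind)(key, row_id)

    def insert(self, row_id, row):
        self.rows[row_id] = dict(row)
        self._fire("insert", row[self.indexed_column], row_id)

    def update(self, row_id, changes):
        old = self.rows[row_id]
        new = {**old, **changes}
        self.rows[row_id] = new
        col = self.indexed_column
        if old[col] != new[col]:
            # A change of an indexed column removes the old index entry
            # and adds a new one.
            self._fire("delete", old[col], row_id)
            self._fire("insert", new[col], row_id)
```

An update that does not touch the indexed column fires no index trigger at all in this model.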

We use triggers to maintain our unique indexes. We use triggers to maintain our foreign keys. We use triggers for various meta data operations such as when building an index as an online operation.

When adding nodes we dynamically add more partitions to parts of the table. These partitions are built as an online operation and we use triggers to maintain these new partitions. We use triggers to maintain fully replicated tables.

The nice thing about our triggers is that they are very well integrated with our transactional changes. RonDB does almost every change in a transactional context. This means that using triggers makes it easy for us to perform additional changes required by online meta data operations and maintaining indexes and so forth.

There are asynchronous triggers as well that are used to maintain our event reporting APIs that are used for replicating from one cluster to another.

Transporter Model#

All our communication methods are located in a transporter. Transporters interact with the remaining parts of the virtual machine in the data nodes. The virtual machine need not worry about what technique we are using to transport the data.

Over the years we have implemented transporters for Dolphin SCI technology, shared memory, TCP/IP and a special transporter for OSE/Delta messages.

The SCI transporter is no longer needed since Dolphin technology can be used with Dolphin SuperSockets. These provide communication to another node in less than a microsecond and are even better integrated with the Linux OS compared to using a native SCI transporter.

The shared memory transporter was revived in MySQL Cluster 7.6. This greatly improves latency when colocating the NDB API nodes and the RonDB data nodes.

RonDB runs well on InfiniBand as well. Previously we used SDP in the same fashion as for Dolphin SuperSockets, but now it is more common to use IPoIB (IP over InfiniBand).

Memory Allocation principles#

Historically memory allocation was a very expensive operation. Thus to get the optimal performance it was a good idea to avoid allocating small amounts of memory.

This reason for managing your own memory is smaller with modern malloc implementations that scale very well and have little overhead. However using malloc to extend the memory in a process can still cause extra latency.

Managing your own memory is however still a very good idea to ensure that you can get the best possible real-time experience. It also greatly improves your control over the execution of the RonDB data nodes. Since RonDB data nodes allocate all their memory at startup and can even lock this memory, other programs executing on the same machine cannot destroy the setup of the RonDB data node.

The preferred method for using RonDB is to set the LockInMainMemory parameter to 1 such that the memory is locked once allocated. RonDB data nodes allocate all their memory in the startup phase. Once a data node has allocated the memory it maintains the memory itself.

Allocating memory dynamically provides a more flexible environment. It is much harder to provide the highest availability in a flexible environment. Almost every program in the world can get into hang situations when overallocating memory. Overallocation of memory leads to swapping that slows the process down to a grinding halt. So even if the process still is considered up and running, it is not delivering the expected service.

In contrast the RonDB data nodes take the pain at startup. Once the node is up and running, it won't release its memory, thus ensuring continued operation. Our flexibility instead comes from the fact that data nodes can be restarted with a new configuration without causing any downtime in the system.

The MySQL Server and the NDB API use a much more traditional memory allocation principle where memory is allocated and released as needs arise.

Automatic Memory Configuration#

In RonDB we implemented automatic memory configuration. This means that we use OS information to retrieve the amount of memory available in the machine. We will use almost all of the available memory. RonDB decides how much memory is used for various purposes. RonDB ensures that it is possible to create a large number of tables, columns, indexes and other schema objects. It ensures that it is possible to handle fairly large transactions and many parallel transactions. The remainder of the memory is divided between the memory of the page cache for disk columns and the memory used to store the actual in-memory rows (90% of the remaining memory is used for this purpose).
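As a rough illustration of the final split, the sketch below divides whatever memory remains after the fixed reservations. The function name, the reservation sizes and the reading of the 90% figure as the share given to in-memory rows are assumptions made for this example, not RonDB's actual calculation.

```python
def split_remaining_memory(total_bytes, schema_bytes, transaction_bytes,
                           data_percent=90):
    """Divide the memory left after fixed reservations (inputs hypothetical)."""
    remaining = total_bytes - schema_bytes - transaction_bytes
    if remaining <= 0:
        raise ValueError("reservations exceed the available memory")
    data_memory = remaining * data_percent // 100   # in-memory rows
    page_cache = remaining - data_memory            # page cache for disk columns
    return data_memory, page_cache

GIB = 1024 ** 3
# Hypothetical machine: 64 GiB, with 2 GiB each for schema and transactions.
data_memory, page_cache = split_remaining_memory(64 * GIB, 2 * GIB, 2 * GIB)
```

With these made-up inputs, 60 GiB remain and are split 54 GiB to 6 GiB.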

Angel process#

Automated responses to failures are important in RonDB. Our approach to solving this problem is to ensure that any node failure is followed by a node restart. We implement this by using an angel process when starting a data node.

When starting the data node it is the angel process that is started. After reading the configuration from the management server the angel process forks. The angel process itself simply waits for the running data node process to fail, while the actual work in the data node is executed by the forked process.

When the forked process stops, the angel process discovers it and restarts the process and thus a node restart is immediately started after the node failure.
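The restart loop can be sketched in a few lines. This is a minimal model rather than the RonDB implementation: the function names are invented, `subprocess.Popen` stands in for the fork, and `max_restarts` exists only to keep the sketch finite.

```python
import subprocess
import sys

def angel(start_node, max_restarts):
    """Start the node, wait for it to stop, then start it again.

    start_node is any callable returning an object with wait(), such as
    subprocess.Popen. Returns the exit code of every run.
    """
    exit_codes = []
    for _ in range(max_restarts + 1):
        child = start_node()
        exit_codes.append(child.wait())   # blocks until the node process stops
    return exit_codes

def start_failing_node():
    # Hypothetical stand-in for the data node: a process that exits with 1.
    return subprocess.Popen([sys.executable, "-c", "raise SystemExit(1)"])
```

Calling `angel(start_failing_node, max_restarts=2)` runs the stand-in node three times, each start following directly after the previous failure.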

It is possible to configure the cluster such that we don't get those automatic restarts, but there is very little reason to use this configuration option.

Signal flows in RonDB#

We will go through some of the most important signalling flows to give an idea about what happens when a query is executed against RonDB.

For queries against the database there are three main flows that we use. The key lookup flow is used to read and write data through primary keys or unique keys. The scan flow is used to perform full table scans, range scans and pruned versions of these. Pushdown joins are essentially linked key lookups and scan operations, and they build entirely on top of the key lookup and scan flows.

In addition there are a number of protocols implemented in RonDB to handle all sorts of node failures, node restarts, schema changes and so forth. We will go through a few of those and in some cases simply point out where one can find descriptions of these in the source code.

To maintain various secondary data structures we use triggers and these are integrated into the key lookup protocol and we will describe how this happens.

Key operations#

RonDB was designed with efficient key lookup transactions in mind. Almost all applications that were analysed before implementing RonDB had a major portion of key lookups. Most of them had a high percentage of writes as well.

One of the problems with DBMSs in the 90s was that the overhead of transactions was very high. This is still to a great extent true. For example, in other MySQL clustering solutions there are limitations on how many transactions can commit in parallel.

With RonDB each row operation is committed in exactly the same fashion whether it is a single row transaction or a transaction with 1000 rows being updated. We have worked on streamlining this implementation and there is no built-in limitation on how many transactions can commit in parallel. The only limits are the CPU resources, memory resources, networking resources and disk resources.

Basic write transaction#

We send one signal from the NDB API for each row to be read, updated, deleted, written or inserted. The same signal is used for all types of key operations.

The figure below shows the signal flow for a single row transaction.


The signal used is called TCKEYREQ and this signal is sent to any tc thread based on some hinting techniques described in the chapter on the NDB API.

DBTC uses DBDIH to discover the placement of all the live replicas in the cluster of the row. Next it sends LQHKEYREQ to DBLQH in the node where the primary replica belongs. The signal is sent directly to the proper ldm thread owning the partition where this primary row resides.

The idea with sending it to the primary replica first is to avoid deadlocks which would easily happen if the writes are sent to replicas in any order.

There is no special signal to start a transaction. Start of a transaction is indicated by one bit in the TCKEYREQ signal.

There is a commit signal that can be used called TC_COMMITREQ. In the signal diagram above it is used. It is not necessary to use this signal. One can also send a TCKEYREQ with the commit bit set. In this case the transaction is committed immediately after completing the last operation and no more TCKEYREQs will be accepted in this transaction after receiving this bit in a TCKEYREQ signal.
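The idea of carrying transaction start and commit as bits in the request can be sketched as follows. The bit positions and function names are hypothetical; the real TCKEYREQ layout is defined in the RonDB source code.

```python
# Bit positions are assumptions made for this sketch only.
START_BIT = 1 << 0
COMMIT_BIT = 1 << 1

def request_info(start=False, commit=False):
    """Pack the transaction flags into one 32-bit request word."""
    word = 0
    if start:
        word |= START_BIT
    if commit:
        word |= COMMIT_BIT
    return word

def is_single_operation_transaction(word):
    """True when one request both starts and commits the transaction."""
    return bool(word & START_BIT) and bool(word & COMMIT_BIT)
```

A single row transaction sets both bits in its only request, so no separate start or commit signal is needed.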

The primary replica then sends a LQHKEYREQ to the backup replica; the first LQHKEYREQ contained the list of nodes to update. The code executed in the backup replica is almost exactly the same as the code executed in the primary replica, and both replicas update the row in the prepare phase. The new row is stored in a copy row and the old row is kept until the commit point. This means that readers that want to see the latest committed row state can always proceed and perform the read.

When all replicas have been updated, the last replica sends LQHKEYCONF back to the DBTC that handles the transaction. In this case the TCKEYREQ had both a start bit and a commit bit set, thus the transaction will immediately be committed. DBTC gets the GCI (global checkpoint identifier) of the transaction from DBDIH and sends COMMIT to the backup replica. Backup replicas simply record the commit state and send COMMIT to the next replica in the chain until it reaches the primary replica.

At the primary replica it will release the lock and commit the row change by installing the copy row into the row state and by writing the commit message into the REDO log buffer. After this any reader will see the new row state. Next the primary replica will send COMMITTED to DBTC.

In RonDB all tables default to use the Read Backup feature added in MySQL Cluster 7.5. For those tables the primary replica will not release the locks in the Commit phase, instead it will wait with releasing locks until the Complete phase.

When receiving the COMMITTED signal in DBTC, the behaviour depends on whether the table is a READ BACKUP table or not. If it isn't, DBTC will send an ack to the API using the TCKEYCONF signal. In parallel it will send the COMPLETE signal to the last backup replica. In the complete phase the backup replicas release the locks and commit the row changes. They also release operation records and any other memory attached to the operation.

The primary replica will send COMPLETED back to DBTC. At this point DBTC sends TCKEYCONF for READ BACKUP tables. After executing the COMPLETED in DBTC the transaction is committed and we have released all records belonging to the transaction.
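The order of signals described so far can be written down as a small trace generator. This is a simplified sketch: the function name and replica labels are invented, and it leaves out TC_COMMITREQ, the GCI request to DBDIH and the background cleanup.

```python
def write_signal_trace(replicas, read_backup):
    """List (signal, sender, receiver) for one single-row write transaction.

    replicas[0] is the primary replica, the rest are backups in chain order.
    """
    primary = replicas[0]
    reverse_chain = list(reversed(replicas))      # last backup ... primary
    trace = [("TCKEYREQ", "API", "DBTC"), ("LQHKEYREQ", "DBTC", primary)]
    # Prepare phase: the request is passed along the chain from the primary.
    trace += [("LQHKEYREQ", a, b) for a, b in zip(replicas, replicas[1:])]
    trace.append(("LQHKEYCONF", replicas[-1], "DBTC"))
    for request, reply in (("COMMIT", "COMMITTED"), ("COMPLETE", "COMPLETED")):
        # Commit and complete both travel from the last backup to the primary.
        trace.append((request, "DBTC", reverse_chain[0]))
        trace += [(request, a, b)
                  for a, b in zip(reverse_chain, reverse_chain[1:])]
        trace.append((reply, primary, "DBTC"))
        # READ BACKUP tables are acknowledged only after the complete phase.
        if (reply == "COMPLETED") == read_backup:
            trace.append(("TCKEYCONF", "DBTC", "API"))
    return trace
```

For two replicas, `write_signal_trace(["primary", "backup"], read_backup=False)` places TCKEYCONF directly after COMMITTED, while `read_backup=True` moves it to after COMPLETED.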

A final part is happening in the background. The NDB API when receiving TCKEYCONF indicating transaction commit will send TC_COMMIT_ACK to the DBTC. This will generate a number of REMOVE_MARKER_ORD signals. These signals clear transaction state that we needed to keep around when for some reason the NDB API didn't receive the confirmation of the commit decision.

A final note is that most of these signals are sent as packed signals. This is true for LQHKEYCONF, COMMIT, COMMITTED, COMPLETE, COMPLETED and REMOVE_MARKER_ORD. This saves 12 bytes of overhead for each of those signals in the transporter part. Cost of sending data over TCP/IP sockets is a large part of the cost of executing distributed transactions.

One more important lesson that I learned through a master's thesis project more than 20 years ago is the cost of flexible protocols. The original idea for the NDB protocol was to use a standard telecom protocol using BER encoding. This turned out to be much too expensive. Thus we invented a streamlined protocol where all parts of the protocol are 32 bits in size. Thus we don't have to process one byte at a time. Instead we can process 4 bytes at a time, which uses the most efficient CPU instructions.

Most signals are fairly static, thus the code that reads the signal data knows exactly where to find each item. This makes the code very efficient.
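To illustrate why a static layout of 32-bit words is cheap to decode, here is a small sketch. The field names, their order and the little-endian byte order are assumptions made for the example and do not describe a real RonDB signal.

```python
import struct

# Hypothetical fixed layout: every field is one 32-bit word at a known index.
FIELDS = ("table_id", "fragment_id", "request_info", "transaction_id")
LAYOUT = struct.Struct("<4I")   # four unsigned 32-bit words, little-endian

def encode_signal(**values):
    """Pack the named fields into their fixed word positions."""
    return LAYOUT.pack(*(values[name] for name in FIELDS))

def decode_signal(payload):
    # No tag or length parsing: each field sits at a fixed word offset.
    return dict(zip(FIELDS, LAYOUT.unpack(payload)))

payload = encode_signal(table_id=7, fragment_id=3, request_info=1,
                        transaction_id=0xCAFE)
```

Contrast this with a tag-length-value encoding such as BER, where the reader must inspect each tag and length before it knows where the next item starts.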

A special note on key writes is that they can also read data while updating it. For example it is possible to send a delete operation that reads the record while deleting it. These operations can read either the values before the update or after and it is even possible to send back calculated values.

Basic read transaction#

Reads through a primary key use TCKEYREQ as well. In this case the signal goes to DBTC and from there it is sent to one of the DBLQH blocks. For default tables it will always be sent to the DBLQH of the primary replica.

For READ BACKUP tables and fully replicated tables it depends on the type of the read. READ COMMITTED can be sent to any node with a replica. For reads using read locks or exclusive locks it is sent to the primary replica.
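The routing rule for key reads can be summarised as a small function. The function name and the lock mode strings are chosen for this sketch, and which of the eligible replicas is actually picked is not modelled here.

```python
def candidate_replicas(replicas, read_backup_or_fully_replicated, lock_mode):
    """Return the replicas a key read may be sent to.

    replicas[0] is the primary replica. lock_mode is one of
    "committed", "shared" or "exclusive" (names chosen for this sketch).
    """
    if read_backup_or_fully_replicated and lock_mode == "committed":
        return list(replicas)          # any node with a replica will do
    return [replicas[0]]               # default tables and locking reads
```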


DBLQH sends the data immediately from the ldm thread back to the API using the signal TRANSID_AI. For READ COMMITTED there is no other signal sent to the API. I've highlighted the signals that are sent in this specific case in the figure above.

For all other reads there is a signal going back to DBTC called LQHKEYCONF to ensure that the read can be integrated in writing transactions, this signal will lead to a TCKEYCONF signal sent back to the API. TCKEYCONF can contain responses to multiple TCKEYREQ signals in one message.

Updates in fully replicated table#

Now let's look at how updates happen in a fully replicated table. The protocol is the same as for a normal write transaction. The difference lies in that a trigger is fired on the primary replica when updating the row. This trigger sends a signal called FIRE_TRIG_REQ to DBTC. The trigger contains the information to change in the rows. This information is packed into a number of LQHKEYREQ signals, one LQHKEYREQ signal per extra node group. The first LQHKEYREQ for fully replicated tables is always sent to the first node group. Thus in a single node group cluster the only difference for fully replicated tables is that the trigger signal is sent to DBTC from DBTUP.

The updates in the base fragment and the replicated fragments will then proceed in parallel, and the commit of each such replicated fragment will happen as if they were independent writes. But since they are part of a transaction the changes will be atomic, thus either all of them happen or none of them.


Using this trigger mechanism integrates the changes to the other fragments naturally into the transaction.

Maintaining a unique index#

Unique indexes are maintained using exactly the same technique as updates in a fully replicated table. In this case there will be only one LQHKEYREQ to update the fragment where the unique key resides. A special case is a unique index in a fully replicated table. In this case the unique index trigger fires a transaction similar to the update in a fully replicated table.

Building new partitions#

When executing the SQL statement ALTER TABLE t1 REORGANIZE PARTITIONS we will reorganise the table to use the new nodes in the cluster.

While building the new partitions we will have triggers that ensure that operations on the table partitions that are reorganised will be sent to the new table fragments. These fragments are normally present in new nodes in the cluster.

This follows the same type of protocol as for fully replicated tables. In this case only rows that are moved to the new table fragments will execute those triggers.

Asynchronous operations#

Asynchronous operations are not the standard method of using the NDB API. But it is a very efficient method that at times has been shown to outperform other methods by as much as 10x.

The reason for a higher efficiency can easily be seen in the below figure showing the protocol at a higher level.


In this mode of operation we receive multiple operations on the table in a small number of TCP/IP packets. Processing those signals in turn sends a batch of LQHKEYREQ signals. In principle the protocol is the same as before, but now everything is heavily parallelised. By sending 200 updates in a batch instead of 1-2 updates at a time, we can gain up to 10x higher throughput.

Asynchronous operations can only be key operations at the moment.

Unique index read operations#

Unique indexes can be used for both updates and reads. As mentioned before they constitute a challenge, since reads of the index followed by updates of the base table can interact with primary key updates of the base table in sequences that cause a deadlock.


The signalling is easy, it uses a normal read of the unique index table to get the primary key. This key is read back to DBTC that will use the key to start up a normal TCKEYREQ to read/write the row.
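The two-step lookup can be modelled with two dictionaries, one for the unique index table and one for the base table. The table contents and names are made up for the example.

```python
def unique_key_read(index_table, base_table, unique_key):
    """Two-step read: unique key -> primary key -> row."""
    primary_key = index_table.get(unique_key)   # step 1: read the index table
    if primary_key is None:
        return None                             # no row has this unique key
    return base_table[primary_key]              # step 2: normal key read

# Hypothetical data: the index table is keyed by email, the base table by id.
base_table = {1: {"id": 1, "email": "a@example.com"},
              2: {"id": 2, "email": "b@example.com"}}
index_table = {row["email"]: pk for pk, row in base_table.items()}
```

Here `unique_key_read(index_table, base_table, "b@example.com")` first finds primary key 2 and then reads the row with that key.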

Table scan operations#

The second class of query operations that RonDB handles are scans of a table. Scans can use a full table scan if no indexes are present. Scans can use ordered indexes to get data in index order. There are three types of full table scans, one uses the hash index in DBACC, a second one scans in rowid order, a third one scans in disk order in DBTUP. Currently the scan in disk order isn't enabled.

Basic scan operation#

All scan operations use the same base protocol. It starts by sending SCAN_TABREQ from the API to the chosen DBTC. DBTC discovers where the partitions of the table reside. Next it sends SCAN_FRAGREQ in parallel to all table partitions. An exception is pruned scans, which only scan one partition. It is possible to limit the amount of parallelism using a parameter in the SCAN_TABREQ signal.

SCAN_FRAGREQs are sent in parallel to a set of DBLQHs that house the partitions. DBLQH has a local scan protocol to gather the data using DBACC, DBTUP, and DBTUX blocks. This protocol is local to one thread in the data node.


Each signal execution in DBLQH will gather up to five rows. This takes roughly from 2-3 microseconds to 20-30 microseconds or more. Thus even if a scan goes through 1 million rows, it will not cause real-time issues for other scans or key lookup operations. Such a scan will be divided up into hundreds of thousands of signal executions automatically. Thus millions of operations can execute on the same data in parallel with the scan operation even in this case.
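The arithmetic behind this can be checked with a small generator that hands out rows in groups of at most five, one group per signal execution. The function name is invented for the sketch.

```python
def scan_in_batches(rows, max_rows_per_signal=5):
    """Yield the rows gathered by each signal execution, at most five each."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == max_rows_per_signal:
            yield batch     # other signals can run before the next batch
            batch = []
    if batch:
        yield batch

# A scan of 1 million rows becomes 200,000 separate signal executions.
executions = sum(1 for _ in scan_in_batches(range(1_000_000)))
```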

DBTC serialises communication with the API. The communication between DBTC and the NDB API has one talker at a time. When one of the DBLQH blocks sends back SCAN_FRAGCONF to DBTC, two things can happen. If the NDB API is still waiting for input from DBTC, we send a SCAN_TABCONF signal back to the API immediately. If we have sent SCAN_TABCONF and are waiting for the SCAN_NEXTREQ signal to proceed, we queue the scan results in DBTC. When SCAN_NEXTREQ arrives we immediately send a new SCAN_TABCONF to the API if any results were queued up. At the same time we send SCAN_NEXTREQ to the DBLQH blocks whose results have already been reported to the API.
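A toy state machine makes the one-talker-at-a-time rule concrete. The class and attribute names are invented for this sketch, and the model tracks only which fragments have reported, not the rows themselves.

```python
class ScanCoordinator:
    """Toy model of DBTC allowing one outstanding SCAN_TABCONF at a time."""

    def __init__(self):
        self.api_waiting = True    # the API is waiting for input from DBTC
        self.queued = []           # fragments reported while the API is busy
        self.reported = []         # fragments in the last SCAN_TABCONF
        self.sent = []             # log of signals sent by this model

    def on_scan_fragconf(self, fragment):
        if self.api_waiting:
            self._send_tabconf([fragment])
        else:
            self.queued.append(fragment)

    def on_scan_nextreq(self):
        acknowledged, self.reported = self.reported, []
        self.api_waiting = True
        if self.queued:
            queued, self.queued = self.queued, []
            self._send_tabconf(queued)
        # Fragments already reported to the API may continue scanning.
        for fragment in acknowledged:
            self.sent.append(("SCAN_NEXTREQ", fragment))

    def _send_tabconf(self, fragments):
        self.sent.append(("SCAN_TABCONF", tuple(fragments)))
        self.reported = list(fragments)
        self.api_waiting = False
```

If fragment 0 reports first and fragments 1 and 2 report while the API is busy, the next SCAN_NEXTREQ produces one SCAN_TABCONF covering both queued fragments and a SCAN_NEXTREQ to fragment 0.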

SCAN_TABCONF will normally report back rows read from multiple DBLQHs at a time. How many is completely dependent on current system load.

SCAN_TABREQ carries two signal parts. One part is the interpreted program to execute on each row scanned. The second is an index bound (only used by range scans).

The interpreted program carries out filtering and will report back whether the row read was sent back to the API or if it was skipped. The actual row data is sent directly from the ldm thread to the API using the TRANSID_AI signal.

The API can always send SCAN_NEXTREQ with a close flag if it decides that it has seen enough. This will start a close protocol of all the outstanding fragment scans.

Similarly DBTC can close a scan in DBLQH by sending a SCAN_NEXTREQ signal with the close flag set.

Range scans#

Range scans are by far the most common scan operations. A range scan carries an index bound that can contain more than one range. All rows that lie within those index bounds will be scanned.

In executing pushdown joins it is very common that DBSPJ creates scans with multiple ranges to execute for one scan.

Pruned Scan operations#

Scans have two options, either they can scan all partitions of a table, or they can scan just one partition of the table. The scans that only scan a single partition are called pruned scans. The reason that they only need to scan one partition is that they have knowledge of the partition key.

This is similar to the design in sharded databases. In a sharded database it is necessary to know the sharding key to be able to query the data. Otherwise all shards must be queried.

This is exactly how RonDB works as well. A shard in RonDB is here a table partition and the sharding key is the partitioning key.

Designing a scalable solution with many data nodes and many ldm threads requires use of a carefully selected partition key on the tables.
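The effect of knowing the partition key can be sketched as follows. CRC32 is only a stand-in hash chosen for the example, not the function RonDB uses, and the function names are invented.

```python
import zlib

def partition_for(partition_key, num_partitions):
    """Map a partition key to a partition id (CRC32 is a stand-in hash)."""
    return zlib.crc32(repr(partition_key).encode()) % num_partitions

def partitions_to_scan(num_partitions, partition_key=None):
    """A scan that knows the partition key is pruned to a single partition."""
    if partition_key is None:
        return list(range(num_partitions))
    return [partition_for(partition_key, num_partitions)]
```

A scan without the partition key touches every partition, while a scan with it touches exactly one, which is why the choice of partition key decides how well a workload spreads over data nodes and ldm threads.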

Full Table scans#

Full table scans are necessary when no keys are present. Even in this case RonDB can scan fast. Since scans are automatically parallelised we can easily scan many millions of rows per second if the query uses a filter that ignores most rows. RonDB is capable of shipping many millions of rows per second to the NDB API as well for further analysis.

Pushdown joins#

Pushdown joins are implemented on top of the key lookup protocol and scan protocol. They use a linked join algorithm where keys for the tables down in the join are sent to the DBSPJ block for use with later scans and key lookups. Columns that are read are immediately sent to the NDB API for storage in memory there waiting for the rest of the queried parts that are fetched further down in the join processing.

Foreign Keys#

Foreign keys have two parts. The first part is the consistency checks that the foreign key defines. The checks are executed in a special prepare-to-commit phase executed when starting to commit. If this phase is successful the transaction will be committed, otherwise it will be aborted.

The second part is the cascading actions. These are executed exactly as the other triggering operations. One special thing to consider about foreign keys is however that triggers can fire recursively. One small change is capable of triggering very massive changes. This is not desirable, it is important to consider this when defining your foreign key relations.
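How quickly a cascading delete can grow is easy to see in a small model, where each deleted row triggers deletes of the rows referencing it. The data and names below are made up for illustration.

```python
def cascade_delete(children, root):
    """Return every row deleted when root is deleted with cascading actions.

    children maps a row to the rows that reference it through a foreign key.
    """
    deleted, seen, pending = [], set(), [root]
    while pending:
        row = pending.pop()
        if row in seen:
            continue                      # guards against reference cycles
        seen.add(row)
        deleted.append(row)
        pending.extend(children.get(row, ()))   # each delete fires more triggers
    return deleted

# Hypothetical data: one customer, 3 orders, 4 order lines per order.
children = {"customer": [f"order{i}" for i in range(3)]}
for i in range(3):
    children[f"order{i}"] = [f"line{i}.{j}" for j in range(4)]
```

Deleting the single customer row in this example removes 16 rows in total.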

Table Reorganisation#

In the block DBDIH in the file DbdihMain.cpp, there is a section called Transaction Handling module. This describes exactly how the global data in DBDIH used for query processing is maintained. In particular it describes how table reorganisations are executed in RonDB. It contains a description of the signal flows for this algorithm.

Meta data operations#

Meta data operations start with a phase where the meta data transaction is defined by using signals such as CREATE_TABLE_REQ, DROP_TABLE_REQ, ALTER_TABLE_REQ, CREATE_FILE_REQ and so forth. These signals are spread to all data nodes and are processed by the DBDICT block.

After defining the meta data transaction, each of those meta data parts goes through a number of phases that step the schema change forward. There is also a set of steps defined to step backwards when the schema change didn't work.

These steps are implemented by a signal SCHEMA_TRANS_IMPL_REQ and its response SCHEMA_TRANS_IMPL_CONF. All the nodes step forward in lockstep until all phases have been completed. If something goes wrong in a specific phase, there is a well defined set of phases to abort the schema transaction. Obviously aborting a meta data operation is a lot more complex than aborting a normal transaction.
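The lockstep idea with a backwards path can be sketched like this. The phase names, the failure injection and the choice to undo the failing phase on all nodes are assumptions of the sketch, not a description of the actual DBDICT phases.

```python
def run_schema_transaction(nodes, phases, fails_at=None):
    """Step all nodes through the phases in lockstep, undoing on failure.

    fails_at is an optional (node, phase) pair used to inject a failure.
    Returns the outcome and the log of (action, phase, node) steps.
    """
    log, completed = [], []
    for phase in phases:
        for node in nodes:                       # SCHEMA_TRANS_IMPL_REQ
            if (node, phase) == fails_at:
                # Step backwards: undo the failing phase, then earlier ones.
                for done in reversed(completed + [phase]):
                    log += [("abort", done, n) for n in nodes]
                return "aborted", log
            log.append(("run", phase, node))     # SCHEMA_TRANS_IMPL_CONF
        completed.append(phase)                  # every node has confirmed
    return "committed", log
```

No node starts a phase before every node has confirmed the previous one, and a failure unwinds the phases in reverse order.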

Cluster Manager operations#

The node registration protocol and the heartbeat protocol following it are described in some detail in the Qmgr block in the QmgrMain.cpp file in the RonDB source code. It contains signal flows for this algorithm.

Local checkpoint protocol#

The local checkpoint protocol is actually a number of protocols. The first step is a protocol using signals TC_GETOPSIZEREQ/CONF, TC_CLOPSIZEREQ/CONF. These are used to decide when to start a new LCP based on the amount of changes in the data nodes.

We also have to acquire a schema lock for a short time to decide on which nodes to integrate into the LCP. Only the tables defined at the start of the LCP will be checkpointed. New tables created after starting a local checkpoint will have to wait until the next local checkpoint.

One important part of the LCP protocol added in 7.4 is the pause LCP protocol. This ensures that we can pause the LCP execution when a node is starting up to copy the meta data in the cluster. There is a long description of this protocol and the LCP protocol itself in the block DBDIH, in the source code file DbdihMain.cpp. The aim of this protocol is to ensure that DBLQH stops sending LCP_FRAG_REP signals that will change the state of distribution information in DBDIH. This protocol is only required during restart of a node.


The above figure shows the execution phase protocol of a local checkpoint. It starts by sending START_LCP_REQ to all nodes. Before sending this signal we acquired a mutex on the distribution information in DBDIH. The aim of this signal is to set the flags on which table fragments will perform this local checkpoint. Since we have locked the distribution information all nodes will calculate this locally. After completing this preparatory step of a local checkpoint each node will respond to the DBDIH block in the master node with the START_LCP_CONF signal.

After receiving this signal from all nodes we have finished the preparation of the local checkpoint. We can now release the mutex.

The next step is to start sending the signal LCP_FRAG_ORD to DBLQH. Each such signal asks DBLQH to checkpoint a specific table fragment. Many such signals can be outstanding at a time, they are queued up if DBLQH isn't ready to execute them. DBDIH will send lots of those signals to give DBLQH the chance to control the speed of executing the local checkpoint.

Whenever DBLQH finishes execution of a local checkpoint for a table fragment it will send the signal LCP_FRAG_REP to report about the completed checkpoint. This signal is sent to all DBDIHs (one per node).

When the master DBDIH has completed sending all LCP_FRAG_ORD to all nodes, it will send a final LCP_FRAG_ORD with the last fragment flag set to true. This indicates to all DBLQHs that no more checkpoint requests will be sent. Thus when DBLQH finishes executing the last checkpoint in the queue and this signal has been received, it knows that its part of the local checkpoint execution is completed.
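The completion rule in DBLQH, an empty queue together with a received last fragment flag, can be modelled in a few lines. The class and method names are invented for this sketch.

```python
from collections import deque

class LqhCheckpointQueue:
    """Toy model of how DBLQH detects that its part of an LCP is done."""

    def __init__(self):
        self.queue = deque()
        self.last_fragment_seen = False
        self.reports = []                 # signals this model would send

    def on_lcp_frag_ord(self, fragment=None, last_fragment=False):
        if last_fragment:
            self.last_fragment_seen = True    # no more requests will arrive
        else:
            self.queue.append(fragment)
        self._check_complete()

    def checkpoint_next(self):
        fragment = self.queue.popleft()
        self.reports.append(("LCP_FRAG_REP", fragment))
        self._check_complete()

    def _check_complete(self):
        done = ("LCP_COMPLETE_REP", None)
        if (self.last_fragment_seen and not self.queue
                and done not in self.reports):
            self.reports.append(done)
```

The completion report is produced only when both conditions hold, regardless of whether the last fragment flag arrives before or after the queue drains.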

At completion of local checkpoint execution in a DBLQH it will send the signal LCP_COMPLETE_REP to all DBDIH blocks. This signal first goes to the DBLQH proxy block. Only when all local DBLQH instances have sent this signal will the proxy block send it to the local DBDIH block, which in turn will broadcast it to all DBDIH blocks. This signal is marked to indicate that DBLQH has completed its part of the local checkpoint execution.

When a DBDIH block has heard this signal from all nodes and it has completed saving the changes to the distribution information from local checkpoint executions, it will send LCP_COMPLETE_REP to the master DBDIH block. This signal is marked to indicate that DBDIH has completed its part of the local checkpoint execution.

When the master has received this signal from all DBDIH blocks, it will send LCP_COMPLETE_REP to all DBDIH blocks with a 0 indicating that now the entire local checkpoint execution is completed in all nodes.