RonDB Recovery Architecture#

The RonDB recovery architecture is an important part of the RonDB internal architecture. It is what makes it possible for RonDB to deliver availability of 99.9999% (Class 6 availability).

There are a number of parts of the architecture that work collectively to achieve this. To start with there is a transaction protocol. It is based on a combination of a standard two-phase commit protocol and a linear commit protocol, which minimizes the number of messages sent during the commit phase. We piggyback many of these messages to ensure that the packets sent are as big as possible.

The standard two-phase commit protocol is blocking at node failures, which is clearly not a desirable feature of a highly available DBMS. To avoid this we have added a transaction discovery protocol at node failures. When a transaction coordinator fails, the information about the transaction is lost. To rebuild this information a new take-over transaction coordinator is selected. It is always the master data node (the oldest living data node gets this responsibility, more specifically the first tc thread instance in this data node). This new transaction coordinator retrieves information from all Local Data Managers (LDMs) about any transactions they had ongoing with the failed coordinator. A part of this information is the state of the transaction. Using this information the new transaction coordinator is able to complete all transactions that were ongoing in the failed transaction coordinator. In addition it will inform the API about the outcome of the transactions.

RonDB has a special signal called TC_COMMIT_ACK. This signal is used to ensure that we keep information about the outcome of the transaction until we are sure that the API has heard about it. Thus the API should always hear about the transaction outcome if it was positive. An absence of any message about its outcome means that the failure happened before the transaction had completed and is thus an indication of an abort.

If the API node fails during the transaction there will be no one to report the outcome of the transaction to. It is possible to solve this as well, but it requires an application level solution where each transaction writes e.g. the transaction id into a special transaction table. Every transaction would have to insert a row with the transaction id as key. After completing the transaction and after ensuring that all parties have been informed of its success, the row must be deleted in a separate transaction.
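The application level solution described above can be sketched as follows. This is only an illustration under stated assumptions: `FakeDatabase`, the table name `txn_outcome` and all function names are hypothetical stand-ins, not part of any RonDB API.

```python
class FakeDatabase:
    """Hypothetical in-memory stand-in for a transactional store."""

    def __init__(self):
        self.tables = {"txn_outcome": {}, "accounts": {}}

    def commit(self, writes):
        # Apply a list of (table, key, value) writes as one unit.
        # A value of None means "delete the row".
        for table, key, value in writes:
            if value is None:
                self.tables[table].pop(key, None)
            else:
                self.tables[table][key] = value


def run_transaction(db, txn_id, writes):
    # The marker row is inserted in the same transaction as the real writes.
    db.commit(list(writes) + [("txn_outcome", txn_id, "committed")])


def outcome_after_api_failure(db, txn_id):
    # A restarted application checks the marker row to learn the outcome.
    return "committed" if txn_id in db.tables["txn_outcome"] else "aborted"


def acknowledge(db, txn_id):
    # Separate transaction: delete the marker once all parties are informed.
    db.commit([("txn_outcome", txn_id, None)])
```

The lookup is only meaningful before `acknowledge` has removed the marker row, which is why the row must not be deleted until every interested party knows the outcome.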

The handling of failed transaction coordinators can be divided up into multiple phases if very many transactions were ongoing at the time of the node crash.

Given the requirements on response times on the order of hundreds of microseconds it was clear that committing write transactions to disk in the commit phase isn't desirable. Rather we ensure that a transaction is safe by committing it in several computers (nodes). This is called Network Durability. Rather than committing to persistent media we commit to multiple network locations.

At the same time it is necessary to restore a consistent database after a complete cluster crash. To ensure this, we added a global checkpoint protocol. This can be seen as a type of group commit protocol. The global checkpoint protocol ensures that transactions are committed to durable media. This is a phase that happens in the background after the user has received the commit acknowledgement. The application can wait for a certain global checkpoint to complete if desirable.

We create a new global checkpoint by using a two-phase commit protocol. First we prepare all nodes for a new global checkpoint. At this point no transaction is allowed to pass the commit point. In the second phase the new global checkpoint is installed and transactions can start committing again. Thus each transaction belongs to a global checkpoint, and each transaction that is serialised after another transaction belongs to the same global checkpoint or a later one. This ensures that any recoverable global checkpoint is always a transaction consistent point.

As part of the global checkpoint we ensure that it is recoverable. This happens by forcing the REDO logs written as part of this global checkpoint to disk in all data nodes, and later updating a system file that contains a recoverable global checkpoint identity, such that we know which global checkpoint is recoverable.

The recovery of RonDB is very much tied to the transaction protocol, the handling of failed transaction coordinators and the global checkpoint protocol. If we had access to eternal REDO logs this would be sufficient, but we don't, so we need to perform checkpoints (we call those local checkpoints or LCPs). These checkpoints ensure that a version of each row is written to disk such that we can cut the REDO log tail and ensure that we don't run out of REDO log.

The local checkpoint protocol has to take special precautions to handle the fact that a row could consist of both main memory parts and parts that reside on disk. These four protocols (transaction protocol, failed coordinator protocol, the GCP protocol and the LCP protocol) take us a long way towards a recoverable RonDB. However more is still needed. First of all we need to know which tables, indexes, partitions, foreign keys and other meta data objects are to be restored.

For this purpose RonDB has a schema transaction protocol. This allows for almost arbitrarily complex schema changes to be performed as online operations by using a technique based on internal triggers.

In addition we need a heartbeat protocol and a node failure protocol. One important part of the distributed transaction theory is that node failures have to be handled as transactions. This means that any transaction has to be serialised in relation to the node failure. To achieve this we use the heartbeat protocol to quickly discover nodes that have failed, and we use the node failure protocol to collectively decide which set of nodes failed together, using a two-phase protocol. We detect node failures in many other ways, such that most node failures detected by the heartbeat mechanism are either due to real hardware failures, due to some sort of overload, or due to an operating system that has stalled our process (this was fairly common in early versions of virtual machine implementations).

These seven protocols take us even further, but even more is needed. All recovery protocols contain a master role. This master role must be quickly resumed by some other node when a failure happens. All nodes must select the same master node. For this to always happen we have a node registration protocol where nodes always enter the cluster in a specific order that everyone knows about. This node registration protocol is used to always select the oldest member in the cluster as the new master. This registration protocol is part of the heartbeat protocol. An important part of all protocols except the registration protocol is that they all need to handle the following cases.

  1. Single node failure of participant

  2. Multiple node failures of participants

  3. Single node failure of master node

  4. Multiple node failures, including a failure of the master node

  5. Failure of new master in master take-over handling

When nodes fail and the master is still alive we need to maintain information in the master about ongoing state changes. If a node fails we will normally fake a response signal from the failed node to ensure that the protocol doesn't stop.
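A minimal sketch of the faked response idea, assuming the master simply tracks the set of nodes it is still waiting for in one protocol round. The class and method names are hypothetical and do not come from the RonDB source code.

```python
class MasterRound:
    """Tracks which participants still owe the master a reply in one round."""

    def __init__(self, participants):
        self.waiting_for = set(participants)

    def is_complete(self):
        return not self.waiting_for

    def on_reply(self, node):
        # A real reply removes the node from the outstanding set.
        self.waiting_for.discard(node)
        return self.is_complete()

    def on_node_failure(self, node):
        # Fake a reply on behalf of the failed node so the round can finish.
        return self.on_reply(node)
```

Treating the failure exactly like a reply keeps the completion logic in one place, so the protocol cannot hang waiting for a node that will never answer.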

For all protocols where the master fails, we start by asking all nodes for information about the protocol state for this protocol in a discovery protocol. Thus the new master can build up the protocol state such that it can proceed with handling the failed nodes as part of the master takeover protocols.
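The master selection rule (oldest member according to the registration order) and the discovery step can be sketched as below. The function names and the shape of `local_state` are assumptions made for illustration only.

```python
def select_master(registration_order, alive):
    """Return the oldest surviving node, following the registration order."""
    for node in registration_order:
        if node in alive:
            return node
    raise RuntimeError("no live node left in the cluster")


def discover_protocol_state(alive, local_state):
    """Collect every survivor's view of one protocol (hypothetical exchange)."""
    return {node: local_state[node] for node in sorted(alive)}
```

Because every node knows the same registration order, every survivor computes the same new master without any extra negotiation.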

These eight protocols are the ones that ensure that the protocol to start up nodes can work. We will also describe two additional parts of the recovery architecture: the first is the graceful node shutdown protocol and the second is the watchdog mechanism.

The startup as such is in a sense a protocol as well. It is a complex set of operations that are linked together, and we will describe it in a separate chapter.

Now we will move on to describing the ten protocols that form the foundation of the RonDB recovery architecture. Each of them involves some complexity.

The code itself has fairly extensive documentation of the restart process. That documentation describes the actual implementation, whereas this description covers the workings on a functional level, fairly independent of the implementation. The restart process is described in the source file storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp.

Transaction protocol#

As mentioned we use a variant of the two-phase commit protocol with a special protocol to handle failure of transaction coordinators.

The protocol relies on always going to the primary replica first, before a write operation enters any backup replica. This has the advantage that the transactions are serialised on the primary replica rows. Going to all nodes in parallel means that the risk of deadlock is much higher, since two transactions performing a write operation on the same row could arrive at the replicas in different order. Each might then succeed in locking the row in the first replica it reaches, and both would face a deadlock when arriving at the second replica. This is avoided by all operations first going to the primary replica. This is a well known technique in distributed transaction theory and is called primary copy locking. Using this technique it is possible to prove that serialisable transactions can be created. To create fully serialisable transactions one needs to implement range locks, which RonDB doesn't. RonDB will perform serialisable transactions as long as no updates of ranges are involved in the transaction.

The figure below shows the normal transaction protocol.


As one can see from this figure the order at commit time is the opposite. Here we commit first on the backup replicas whereafter we commit on the primary replica.

The idea with this order is that the commit point is reached when we arrive at the last replica for commit. Thus not until then are we free to release any locks on the rows in the transaction.
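The two message orders can be written down as a small sketch. The order among several backup replicas in the commit phase (reverse of the prepare order) is an assumption made here; the text only states that backups commit before the primary.

```python
def prepare_order(primary, backups):
    # Writes lock the primary replica first, then the backups.
    return [primary] + list(backups)


def commit_order(primary, backups):
    # Commit visits the backups first and reaches the primary last.
    # Reversing the backups is an assumption, not taken from the text.
    return list(reversed(backups)) + [primary]


def lock_release_point(primary, backups):
    # Locks may be released once the last replica in the commit order commits.
    return commit_order(primary, backups)[-1]
```

With one primary `A` and backups `B` and `C`, the commit point is reached when the commit arrives at `A`, the last replica in the commit order.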

An alternative manner to handle commit is to send the commit messages to all replicas in parallel, in this case we need to wait with releasing locks until we get to the complete phase. This manner could decrease latency with more than 2 replicas.

REDO logging in transactions#

We get durability of transactions from the REDO log. At the time a transaction has committed all replicas have been updated, thus all reads after this commit will see the new data. Thus we are Network Durable after commit by relying on multiple computers to have the transaction stored in memory. The REDO log will be synchronized as part of the global checkpoint protocol. In order to make a transaction durable both on the network and on multiple persistent media, we rely on writing the REDO logs to disk as part of the global checkpoint protocol.

Transaction coordinator failure protocol#

When developing the transaction protocol it was obvious that the two-phase commit protocol as such is blocking. One variant used in another distributed telecom database developed in the 1990s was to use a backup transaction coordinator.

We weren't satisfied with this solution. This solution would still fail if we had two simultaneous node failures.

Instead we opted for designing a special failure protocol to rebuild the transaction state at failures. This is possible since the RonDB data nodes are part of an integrated system. This is not possible in most modern database solutions where each data server is an independent database as well. E.g. there is no way to rebuild a transaction state in MySQL InnoDB Cluster, in Galera Cluster, in MongoDB or in any other database technology that builds a distributed database from a number of independent database servers. This technique was called federated databases in the past.

However in RonDB we can ask each transaction participant about its transaction state. The signal LQH_TRANSREQ is sent from the take-over tc thread to all ldm threads in the cluster. Each ldm thread (or more accurately each LQH block instance) will go through all transactions that were ongoing. For each operation it finds that used the failed node as transaction coordinator, it will send the information about this transaction to the take-over tc thread.

When all operations have been found it will inform the take over tc that it has completed its scan of transaction operations.

Using this information it is possible to rebuild the transaction state and ensure that the transaction is either committed or aborted. It is not possible to continue the transaction after building up this state. The transaction must complete as part of the node failure handling, either aborting it or committing it.
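A minimal sketch of how the take-over coordinator could turn the collected operation reports into an outcome per transaction. The decision rule used here (commit if any operation is already committed, otherwise abort) is an assumption for illustration; the text only states that each transaction ends up either committed or aborted. The report format and state names are hypothetical too.

```python
from collections import defaultdict


def rebuild_outcomes(reports, failed_tc):
    """reports: iterable of (tc_node, txn_id, operation_state) from ldm threads."""
    states = defaultdict(set)
    for tc_node, txn_id, op_state in reports:
        # Only operations coordinated by the failed node are of interest.
        if tc_node == failed_tc:
            states[txn_id].add(op_state)
    # Assumed rule: one committed operation shows the commit decision was taken.
    return {
        txn_id: ("commit" if "committed" in seen else "abort")
        for txn_id, seen in states.items()
    }
```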

The protocol is designed such that it can handle a subset of the transactions at a time if the take over tc doesn't have sufficient memory to handle all at the same time.

Global Checkpoint protocol#

When a transaction has committed it is necessary to ensure that it is still durable when all nodes of the cluster fail. The solution to this comes from using logs that are flushed to durable disk media. In RonDB it is sufficient to flush the REDO logs to disk to make the transactions durable.

However in flushing the REDO log records to disk it is important that we ensure that the restored data is transaction consistent. To recover a transaction consistent point it is not sufficient to randomly flush REDO log records. It is necessary to ensure that all transactions up to a certain point are restored and no transaction after this point.

RonDB creates such a point using the global checkpoint protocol. In RonDB we create a global checkpoint (identified by a GCI, global checkpoint identifier) regularly. By default a new global checkpoint is created once per 1-2 seconds. It is possible to create them more or less often. If we create them more often, fewer transactions are lost in a cluster crash, but at the same time we have to perform more disk flushes and execute the global checkpoint protocol more often.

The global checkpoint protocol is controlled by the master node in the cluster (the oldest node in the cluster). It starts by sending a GCP_PREPARE message to all nodes in the cluster. At reception of this message the node stops committing transactions. It is still ok to continue preparing transactions and it is still ok to continue committing transactions that have already passed the commit point. Each node replies to the master indicating that they have completed the GCP prepare phase.

The master node waits for all live nodes to respond. When they have, the master node immediately sends a GCP_COMMIT message to all live nodes. At reception of this message the queued transactions can start up again. They will get the new GCI, as will any transactions committed after receiving this message.
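The prepare and commit phases can be sketched as a small simulation. `DataNode`, `run_gcp` and the string reply are hypothetical names chosen for this sketch; only GCP_PREPARE and GCP_COMMIT come from the text.

```python
class DataNode:
    def __init__(self, gci):
        self.current_gci = gci
        self.commits_blocked = False
        self.queued = []        # transactions waiting to pass the commit point
        self.committed = {}     # transaction id -> GCI it belongs to

    def on_gcp_prepare(self):
        # GCP_PREPARE: stop letting transactions pass the commit point.
        self.commits_blocked = True
        return "prepared"

    def try_commit(self, txn):
        if self.commits_blocked:
            self.queued.append(txn)
        else:
            self.committed[txn] = self.current_gci

    def on_gcp_commit(self, new_gci):
        # GCP_COMMIT: install the new GCI and release the queued transactions.
        self.current_gci = new_gci
        self.commits_blocked = False
        queued, self.queued = self.queued, []
        for txn in queued:
            self.try_commit(txn)


def run_gcp(nodes, new_gci):
    # The master waits for every live node before sending GCP_COMMIT.
    replies = [node.on_gcp_prepare() for node in nodes]
    if any(reply != "prepared" for reply in replies):
        raise RuntimeError("a node did not complete the prepare phase")
    for node in nodes:
        node.on_gcp_commit(new_gci)
```

A transaction that tries to commit between the two messages is queued and ends up in the new GCI, which is what gives the global checkpoints a serial order.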

Using this protocol we know that any transaction in GCI n+1 is serialised after all transactions belonging to GCI n and earlier GCIs. The reason is that when a transaction in GCI n+1 reached the commit point (at this point it has acquired all necessary locks), every transaction belonging to GCI n is either before it in serialisation order or independent of it. If the two are independent, the GCI n+1 transaction can be placed either before or after the other one, and thus it can surely be placed after. If there is a dependency, the transaction belonging to GCI n is certain to come first, since it definitely acquired the lock before any transaction belonging to GCI n+1. This follows from the global checkpoint protocol, where all nodes freeze commits for a short time to ensure that we get a serial order of global checkpoints.

The prepare and commit phases have done the job of ensuring that we have a transaction consistent point available for recovery.

Before a global checkpoint can be used in recovery it has to be flushed to disk first.

The flushing happens in a second phase of the global checkpoint protocol. At first we send a message to all nodes instructing each node to send a response back when all transactions belonging to the given GCI have completed. When all nodes have responded to this message we are sure that all transactions belonging to the GCI have reached either the REDO log or its buffers in all nodes. This phase is combined with the GCP_COMMIT phase. There is no need to immediately respond to GCP_COMMIT, so we respond when all transactions belonging to the GCI have completed.

If a large transaction is committed as part of this GCI, this wait can take some time since a lot of commit and complete messages have to be sent as part of this phase. Normally it will be very quick. The only impact of this delay is that it will take a bit longer to make the GCP durable on disk.

The next step is to send GCP_SAVEREQ to all nodes instructing them to flush all log parts up to the GCI. When all nodes have responded to this message we know that the GCI is recoverable.

It is perfectly possible that some log records belonging to GCI n+1 are flushed to disk as part of this. There is no serial order of GCPs in one specific REDO log. This is not a problem. At recovery we know which GCI we are recovering and we will skip any log records with a higher GCI than the one we are recovering.

The final problem is to ensure that we can easily discover which GCI is recoverable. In this phase a COPY_GCIREQ message is sent to all nodes, and as a result a special file called P0.sysfile is written. It is written in a safe manner in two places to ensure that it is always available at recovery. This last phase isn't absolutely necessary, but avoiding it would require scanning all REDO logs before we start recovery, so we opted for this simpler variant instead.
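The rule for skipping log records with a higher GCI can be sketched as a simple filter. The record layout (a dict with a `gci` key) is a hypothetical simplification of a REDO log record.

```python
def records_to_replay(redo_log, recover_gci):
    """Keep log order, but skip records newer than the GCI being recovered."""
    return [record for record in redo_log if record["gci"] <= recover_gci]
```

Since one REDO log has no serial order of GCPs, the filter has to look at every record rather than stopping at the first record with a higher GCI.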

These four messages back and forth from the master node will ensure that a global checkpoint is recoverable.

The global checkpoint protocol is a cornerstone of the recovery architecture in RonDB.

The first two phases are used for creating micro GCIs. These micro GCIs are used for creating epochs in RonDB Replication. Thus we replicate one micro GCI at a time over to the replica cluster.

Local checkpoint protocol#

Up until version 7.5 of NDB the local checkpoint protocol used full checkpoints. When NDB was originally designed, 1 GByte of DRAM was a large memory size for a machine. Thus handling checkpoints of a few GBytes wasn't seen as a problem. As memory sizes started to grow into hundreds of GBytes this started to become an issue.

Thus it was deemed absolutely necessary to implement a local checkpoint protocol that was based on partial checkpoints instead. At the same time it was desirable to make the checkpoint speed adapt to the environment it is running in. Also instead of only being able to restore from the start of the checkpoint, the checkpoints are restorable as soon as a checkpoint of a table partition has completed.

One more issue that needed to be handled was that checkpointing was a distributed protocol where all nodes participate. This is ok when all nodes are in the same situation. But a starting node usually has to checkpoint a lot more data than the live nodes, since it hasn't been participating in a number of checkpoints while it was down.

Thus there was a need to support checkpoints only handled locally during node restarts and most particularly during initial node restarts. These checkpoints enable the starting node to catch up with checkpoints that it wasn't part of while being down. Normally one such extra checkpoint is enough, but in rare cases multiple ones might be required.

This also ensures that our restarts don't fail because we run out of UNDO log for the disk data columns.

All these things were implemented in NDB version 7.6 and are thus part of RonDB today.


Considerations for Partial checkpoints#

The design requirement was to support DataMemory sizes up to 16 TBytes. Going beyond 16 TBytes will be possible, but will require dividing the checkpoints into even more parts than the maximum of 2048 implemented in RonDB. Increasing the number of parts will lead to larger control files, but in reality this is not a major concern.

Thus the implemented algorithms for checkpoints should scale far beyond 16 TBytes as well when this limit becomes an issue.

Another important requirement was to decrease the size of checkpoints on disk. In a full checkpoint it is necessary to always keep 2 versions of the checkpoint. Thus the size on disk is always at least two times more than the size of data.

It is desirable to get this down to around 1.5 times the size of data. If the checkpoints are also compressed, the size can decrease even further.

With full checkpoints the REDO log has to be very large to accommodate the case when the application is loading the database. With partial checkpoints the requirement is to always be able to handle the situation with a fixed size REDO log. We have found that a REDO log of 64 GByte can handle more or less any load situation.

We tested with loading 53000 warehouses in TPC-C (5 TByte of data) into RonDB using a REDO log of 64 GByte with very high speed in the loading. This worked like a charm. With full checkpoints this would have required a REDO log of almost 10 TByte since eventually the checkpoints won't be finishing during the load process since data keeps growing faster than we can checkpoint it.

Thus the disk space requirement with compression goes below the data size, and without compression it stays below two times the data size.

Even for a DBMS that primarily stores data in-memory it is important to save disk space. Most VMs in the cloud that use NVMe drives have a size of the NVMe drives that is close to the memory size. Block storage in the cloud is fairly expensive, thus it makes a lot of sense to minimise the required disk space.

Thus although SSD sizes keep growing, and grow faster than memory sizes, most servers keep the same ratio between memory and storage.

Row-based vs Page-based checkpoints#

When implementing a partial checkpoint one alternative is to use a traditional checkpoint using pages and only checkpoint those pages that have changed. This was a clear alternative that would be simple and RonDB already has this type of checkpoint available for the disk columns.

However some math on the impact of page-based checkpoints showed that this approach wasn't a good idea.

Let's assume that database rows are 300 bytes each, the data size is 1 TByte and page size is 32 kBytes (this is the standard page size we use in RonDB).

Assume that a checkpoint takes about 10 seconds and that we execute 1 million updates per second. The data fits in 32 million pages, thus most pages will only be updated once. Let's say we thus update around 8 million pages in 10 seconds.

All these 8 million pages need to be written to disk during a checkpoint. This means that a checkpoint will need to write 256 GByte per checkpoint and thus around 25 GByte per second. There are almost no servers existing that can write even to NVMe drives at this speed.

Assuming that we cut the page size to 8 kBytes we would still arrive at a write speed of more than 6 GByte per second.

Now the same calculation using a Row-based checkpoint shows that we need to write all changed rows, thus 3000 MByte of data. On top of this we also need to checkpoint a small part of the data fully. In this type of test we expect it to use the smallest possible size here, which is 1 out of 2048 parts per checkpoint. This means an additional 500 MByte of data.

Thus we arrive at a total of 3500 MByte to checkpoint, thus 350 MByte per second. This is definitely within reach even for a block storage using gp3 (general purpose block storage version 3) in AWS that maxes out at 1000 MByte per second when paying a bit extra. We also need to write the REDO log, but we are still within the limits of this type of block storage.

Thus we have shown that this workload can easily be handled without requiring specialised storage solutions in a cloud environment.

From these calculations one could quickly derive that it was a very good idea to pay the extra cost of the complexity of a row-based partial checkpoint. We showed that in this example the row-based checkpoint is roughly 20x more efficient than a page-based checkpoint using 8 kByte pages, and roughly 70x more efficient than one using 32 kByte pages.
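The arithmetic of this example can be reproduced with a few lines of Python. The sketch assumes decimal units (1 kByte = 1000 bytes) and reuses the estimate of 8 million dirty pages from the text for both page sizes; all constant and function names are chosen for this illustration.

```python
ROW_BYTES = 300
DATA_BYTES = 10**12            # 1 TByte of data
UPDATES_PER_SEC = 1_000_000
CHECKPOINT_SECS = 10
DIRTY_PAGES = 8_000_000        # estimate taken from the text
PARTS = 2048


def page_based_rate(page_bytes):
    """Bytes per second written when every dirty page is checkpointed."""
    return DIRTY_PAGES * page_bytes / CHECKPOINT_SECS


def row_based_rate():
    """Bytes per second written for changed rows plus 1 of 2048 parts."""
    changed_rows = UPDATES_PER_SEC * CHECKPOINT_SECS * ROW_BYTES
    full_part = DATA_BYTES / PARTS
    return (changed_rows + full_part) / CHECKPOINT_SECS


def speedup(page_bytes):
    return page_based_rate(page_bytes) / row_based_rate()
```

Under these assumptions `page_based_rate(32_000)` is 25.6 GByte per second, `row_based_rate()` is just under 350 MByte per second, and the ratio is a little above 70 for 32 kByte pages and a little above 18 for 8 kByte pages.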

Partial checkpoints#

Size vs checkpoint speed of partial checkpoints#

Each partial checkpoint operates on one table partition at a time. The actual checkpoint processing happens in the data owning ldm thread. Thus it is important to ensure that we regulate checkpoint speed based on the current CPU usage of the ldm thread. Thus the higher the CPU usage, the smaller the amount of checkpoint processing.

At the same time we cannot stop checkpoint processing, since this could create a situation where we run out of REDO log and have to write the checkpoint at very high speed. Thus we need to maintain a balanced checkpoint speed that takes into account the amount of REDO log available, the amount of CPU resources available and even the current load on the disks used for checkpoint writes.

The checkpoint scan needs to write all rows that have changed since the last checkpoint plus a subset of all rows. The subset of all rows can be 1 part of 2048 parts up to all 2048 parts. The calculation of how many parts to write fully uses a mathematical formula that tries to ensure that the total size of checkpoints is no more than 1.6 times the data size. This number can be adapted through the configuration variable RecoveryWork. It is set to 60 by default, meaning that we strive to have 60% overhead on the data size. It can be set to a value between 25 and 100. Obviously decreasing this value means that we save disk space. However it also means that we need to write more parts of all rows and thus it increases the write speed of checkpoints.

Thus it isn't recommended to adjust this parameter very much unless one is very certain that one can allow higher checkpoint speeds or can allow higher use of disk space. The mathematical functions are of the form 1 / (1 - x). Thus the increase isn't linear, it is much faster than this.
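A small sketch of what RecoveryWork means for the disk space target, together with an illustration of how a function of the form 1 / (1 - x) grows. The exact formula RonDB uses is not given in this text, so `growth` only shows the general shape; `x` is a hypothetical normalised input, and both function names are assumptions.

```python
def target_checkpoint_bytes(data_bytes, recovery_work=60):
    """Disk space target: data size plus RecoveryWork percent overhead."""
    if not 25 <= recovery_work <= 100:
        raise ValueError("RecoveryWork must be between 25 and 100")
    return data_bytes * (100 + recovery_work) // 100


def growth(x):
    """Shape of a 1 / (1 - x) function for 0 <= x < 1 (illustration only)."""
    if not 0 <= x < 1:
        raise ValueError("x must be in [0, 1)")
    return 1 / (1 - x)
```

Going from `x = 0.5` to `x = 0.9` moves `growth` from 2 to about 10, which illustrates why small changes of the parameter can have a large effect on checkpoint write speed.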

One particular time when the disk subsystem is heavily used is during load operations, such as when the entire database is reloaded from a backup or from another database. In this case we have a configuration parameter called InsertRecoveryWork that can be set higher to decrease the load on the disk subsystem at the expense of temporarily using more disk space. It is set to 40 by default. It can be set lower as well to decrease disk space usage, but this will further increase the load on the disk subsystem.

The writes to the checkpoint files use fairly large buffers and will write normally 256 kByte at a time which is the maximum size of an IO operation in many clouds. The REDO log is normally written with the same size although this requires a fairly high load to be achieved.

Partial checkpoint scans#

A partial checkpoint is basically a diff between what existed in the table partition at the time of the previous checkpoint and its current content. We have to take into account completely new pages and pages that are no longer in use. To ensure that we don't lose any vital information we retain the page map information also for dropped pages. The page map contains 4 bits of vital information for checkpoints.

The normal case is obviously that the page existed before and that the page still exists. In this case we scan each row id in the page and check whether the row has been changed since the last checkpoint by checking the timestamp on the row. This timestamp is implemented as a GCI (global checkpoint identifier). We know the GCI which was fully incorporated in the previous checkpoint. All rows with a GCI that is higher than this will be recorded in this checkpoint.

To make the scanning more efficient we record change bits in the page header. This ensures that for example a page that hasn't been touched since the last checkpoint scan will be quickly taken care of. This improves the speed of checkpoints up to a hundredfold.
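The row scan for a page that exists in both the old and the current checkpoint can be sketched as below. The page layout (a dict with a `changed` flag and a list of `(row_id, gci)` pairs) is a hypothetical simplification, and the sketch leaves out the parts that are written fully regardless of changes.

```python
def rows_to_checkpoint(pages, last_checkpoint_gci):
    """Return ids of rows changed since the previous checkpoint."""
    changed_rows = []
    for page in pages:
        if not page["changed"]:
            # Change bit not set: skip the page without looking at its rows.
            continue
        for row_id, row_gci in page["rows"]:
            # Rows stamped with a newer GCI were changed after the last checkpoint.
            if row_gci > last_checkpoint_gci:
                changed_rows.append(row_id)
    return changed_rows
```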


Thus in the figure above we can see that the above method is used when both old and current have existing pages.

In the figure above we are missing a page in the old checkpoint, all the rows in this page must thus be written as part of the current checkpoint.

We have two pages that are missing in the current checkpoint that were present in the old checkpoint. In this case we issue Delete Page instead of a set of Delete Row commands.

The final point here is that we have more pages in this checkpoint than in the previous checkpoint. Again these pages must be fully written in the checkpoint since all rows in those pages are new rows.
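The page cases described above can be summarised as a small decision function. The returned strings are descriptive labels chosen for this sketch, not names used in the RonDB code.

```python
def page_action(in_old, in_current):
    """Decide how a page is handled, given its presence in each checkpoint."""
    if in_old and in_current:
        return "scan rows for changes"
    if in_current:
        # New page: every row in it is new and must be written.
        return "write all rows"
    if in_old:
        # Dropped page: one Delete Page replaces many Delete Row commands.
        return "delete page"
    return "nothing to do"
```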

There is one thing more to consider. If a transaction writes a row before it has been scanned, then we need to write the row to the checkpoint before the old row data is dropped. This includes setting a bit to ensure that the row is skipped later when the row is scanned.

The reason for this is that a checkpoint consists of the rows as they existed at one exact point in time.

Partial checkpoint disk files#

Checkpoint files are stored in the RonDB file system. The RonDB file system can be divided into a number of parts, but checkpoints are always stored in the main RonDB file system as set in the FileSystemPath configuration variable. In the top RonDB file system directory there is a directory called LCP. In this directory we have a number of directories, each named by a single number from 0 up to 2064.

Directory 0 and 1 are used to store the control files for each table partition. These control files are called e.g. T13F3.ctl. This means that the file is a control file for table 13 and partition 3. There can be one such file in directory 0 and one in directory 1. Two files exist only temporarily: as soon as the new one is fully safe, the old one is removed. There is no mapping from table name to table id in these files and similarly there is no column information in those files. Thus the files are completely binary and require other information to be useful.

The data files are named T13F3.Data. These files are stored in increasing directory numbers. Thus the first checkpoint is stored in directory 0, the next in directory 1 and so forth. As soon as a data file is no longer needed, the file is deleted from the file system. The number of data files can be 0 immediately after the table was created and between 1 and 2048 when the table has been in use for a while.
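The naming scheme can be illustrated with a pair of helper functions. The helpers and the `fs_top` argument (standing for the top RonDB file system directory) are hypothetical; only the LCP directory, the numbered subdirectories and the T13F3.ctl and T13F3.Data name patterns come from the text.

```python
from pathlib import PurePosixPath


def control_file(fs_top, table_id, partition_id, slot):
    """Path of a control file; control files live in directory 0 or 1."""
    if slot not in (0, 1):
        raise ValueError("control files are stored in directory 0 or 1")
    name = f"T{table_id}F{partition_id}.ctl"
    return PurePosixPath(fs_top) / "LCP" / str(slot) / name


def data_file(fs_top, table_id, partition_id, directory):
    """Path of a data file in one of the numbered LCP directories."""
    name = f"T{table_id}F{partition_id}.Data"
    return PurePosixPath(fs_top) / "LCP" / str(directory) / name
```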

Starting point of partial checkpoint#

The starting point of a checkpoint is an important point. The data in the database at the start of the checkpoint must be retained in a fuzzy manner in the checkpoint on disk. In this case this means that exactly the rows that are present at the start of the checkpoint of a fragment must exist in the fragment checkpoint on disk.

If a row doesn't exist at the time of the start of the checkpoint it cannot exist in the checkpoint on disk either. The actual row data might be different. After restoring the checkpoint we will execute the REDO log to bring the data content to the global checkpoint that the data node had written before it crashed.

In a system restart this is the data recovered, in a node restart we also need to synchronize the data with a node that is alive with the most up to date version of the data.

Partial checkpoints also write the disk data parts. The references from the main memory row at start of the checkpoint and the reference from the disk row back to the main memory part of the row must be retained in the checkpoint on disk. The disk data reference from main memory row doesn't change unless the row is deleted. If a row is deleted before it was sent to the checkpoint, the row will be copied and queued up for immediate sending to the checkpoint on disk.

The disk data parts use an UNDO log to ensure that the data pages of the table partition have exactly the rows that existed at the start of the LCP. This is accomplished by writing an UNDO log record representing the start of the checkpoint into the UNDO log.

Next each change of the row is UNDO logged. Each time we make an update to a disk data page we will update the page LSN (Log Sequence Number). This ensures that when the page is written to disk we know which UNDO log records need to be applied and which can be ignored.
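The role of the page LSN can be sketched as a filter over the UNDO log. The sketch below assumes conventional undo semantics, which the text does not spell out: a record is relevant only if it was logged after the checkpoint start marker and its change had already reached the page image on disk (record LSN not greater than the page LSN). The record layout and all names are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class UndoRecord:
    lsn: int           # log sequence number of this UNDO record
    page_id: int       # disk data page the record belongs to
    before_image: str  # content to put back when the record is applied


def undo_records_to_apply(
    undo_log: List[UndoRecord], page_id: int, page_lsn: int, lcp_start_lsn: int
) -> List[UndoRecord]:
    """Pick the UNDO records needed to roll one page back to the LCP start.

    Assumption: records newer than the page LSN describe changes that never
    reached the on-disk page and can be ignored; records at or before the
    checkpoint start marker are older than the state we roll back to.
    """
    selected = [
        rec
        for rec in undo_log
        if rec.page_id == page_id and lcp_start_lsn < rec.lsn <= page_lsn
    ]
    # UNDO is applied backwards, newest change first.
    return sorted(selected, key=lambda rec: rec.lsn, reverse=True)
```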

Write phase#

We perform a full table partition scan such that all rows will be retrieved. We have some special mechanisms to ensure that checkpoint writing has higher priority at very high loads to ensure that checkpoint writing is done at the rate we need it to be. If we have filled our quota for checkpoint writing we will back off from writing until we get more quota to write again. We get a new quota once per 100 milliseconds.
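The back-off behaviour can be pictured as a budget that is refilled once per 100 milliseconds. The following is an illustrative sketch only: the class name, the byte-based quota and the rule that unused quota is not carried over are assumptions, and the time is passed in explicitly to keep the example deterministic.

```python
class CheckpointWriteQuota:
    """Toy model of a checkpoint write budget refilled every 100 ms."""

    PERIOD_MS = 100  # the refill interval named in the text

    def __init__(self, quota_per_period: int) -> None:
        self.quota_per_period = quota_per_period
        self.remaining = quota_per_period
        self.period_start_ms = 0

    def try_write(self, nbytes: int, now_ms: int) -> bool:
        """Return True if nbytes may be written now, False to back off."""
        elapsed = now_ms - self.period_start_ms
        if elapsed >= self.PERIOD_MS:
            # A new period has begun: align its start and hand out a fresh
            # quota (assumption: unused quota is not carried over).
            self.period_start_ms = now_ms - elapsed % self.PERIOD_MS
            self.remaining = self.quota_per_period
        if nbytes > self.remaining:
            return False
        self.remaining -= nbytes
        return True
```

A caller that receives False would simply retry the same write in a later period, which mirrors the description of backing off until more quota is available.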

The local checkpoint simply takes the row format as it is and copies it to disk. The operation to write the row is very simple. It must copy the fixed size part of the row first and next the variable sized part of the row.

When the writing of the main memory parts is done we record the maximum global checkpoint identifier that was written during this write phase. The checkpoint is not usable for restore until this global checkpoint is restorable. We cannot remove the checkpoint files from the previous checkpoint of this table partition until we are sure that this global checkpoint is restorable.

Next step is to write the disk data pages that are dirty to disk if such exist in the table partition. To ensure that this writing is not too bursty we start by writing some pages already before the actual checkpoint starts.

The UNDO log ensures that those pages are rolled back to the start of the checkpoint at recovery. We need not care about what is written on those pages while we are checkpointing; the UNDO logger will handle that.

Long-running transactions#

Each operation in the REDO log consists of two parts: the prepare part and the commit part. We execute the REDO log operation when we find a commit entry. The commit entry refers to the prepare entry. The prepare entry is written when the write operation is prepared. If a long time passes between preparing and committing the transaction, we have to keep the change records in memory, and we cannot cut the REDO log tail beyond the prepare records written for prepared transactions. Long-running transactions can also mean that we fail to find prepare entries in the REDO log buffer when executing REDO logs, which can slow down restart execution.
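The prepare/commit split can be sketched as a replay loop that only applies a change when its commit entry is found. The entry layout used here (dictionaries with `type`, `op_id`, `prepare_ref`, `key` and `value`) is invented for illustration and is not the RonDB log format.

```python
from typing import Dict, List, Optional, Set, Tuple


def execute_redo_log(
    entries: List[dict],
) -> Tuple[Dict[str, Optional[str]], Set[int]]:
    """Replay a toy REDO log.

    Prepare entries are only remembered; the change is applied when the
    commit entry that refers to the prepare entry is reached. Returns the
    restored rows and the ids of operations that never committed.
    """
    prepared: Dict[int, dict] = {}
    rows: Dict[str, Optional[str]] = {}
    for entry in entries:
        if entry["type"] == "prepare":
            # Must be kept in memory until the matching commit shows up,
            # which is why long-running transactions are costly.
            prepared[entry["op_id"]] = entry
        elif entry["type"] == "commit":
            op = prepared.pop(entry["prepare_ref"])
            if op["value"] is None:
                rows.pop(op["key"], None)   # committed delete
            else:
                rows[op["key"]] = op["value"]  # committed insert or update
    return rows, set(prepared)
```

Operations left in the second return value were prepared but never committed, so they are not applied at restore.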

RonDB isn't intended for long-running transaction applications. A certain amount of long-running transactions can be handled by proper configuration, but it isn't intended to perform transactions that take several minutes to complete or that change more than 10% of the database in one set of transactions running in parallel. Again it is possible to configure it for that, but it will use more memory and disk resources. A better approach is to divide your changes into appropriately sized transactions (which can still be fairly large) that execute without awaiting user input.

RonDB 21.04.3 ensures that RonDB will be able to handle transactions with millions of rows involved. Again this is not what RonDB is optimised for, and thus even if it will work it isn't recommended to execute such large transactions.

General thoughts on Partial checkpoints#

The execution of checkpoints is controlled by the master node. The nodes will send a report about completed checkpoints to all data nodes. We retain this information in the DIH block, and use it at recovery to decide which checkpoint to use in restore. In reality the ldm thread has enough information to work this out from the checkpoint files. Thus we are really only interested in knowing which table partitions to restore and which global checkpoint identifier to restore the data to.

A checkpoint only writes down the row content, no information about indexes is written to checkpoints, instead the indexes are rebuilt during restart.

The whole purpose of the checkpoint protocol is to cut the REDO and UNDO logs. This happens after completing a partial checkpoint on all table partitions. The actual point in time when this cut is performed is at the start of the next partial checkpoint. At this point we move both the REDO log tail and the UNDO log tail forward.

Schema transaction protocol#

It is recommended that all tables are created from a MySQL Server. If they are created from the NDB API they cannot be used from the MySQL Server, since the MySQL Server creates files that need to exist for a table to be accessible.

In RonDB, schema operations are smaller than SQL statements. For example when adding a new table it might consist of a new table, a new hash map, a new set of indexes (hashed and ordered). It might also add foreign keys to the table.

In RonDB we have implemented the possibility to declare a set of those schema operations as a schema transaction. The schema transaction is executed in a number of phases, and each phase has an abort operation as well, which makes it possible to roll back a schema transaction.

Actually a schema transaction has a point of no return from where we will rather roll forward in case of a failure.

The implementation is fairly general and we can probably some time in the future even move towards schema transactions involving multiple tables. However for the moment it is limited to supporting the subset of DDL operations that MySQL supports for RonDB tables.

This schema transaction implementation is something we have worked hard on in many versions of NDB. The current implementation of schema transactions is the fourth generation of the meta data implementation.

The first generation was very simple and had most of the data hard coded; this implementation had severe issues already early on. We substituted it with a new implementation that used a serialised format for sending over schema information. The third generation of the meta data implementation standardised the handling of schema operations, where each operation had a set of methods and each of those methods had a callback used to execute it.

The fourth generation implemented a protocol that executes each step of the schema transaction in lock-step mode. This means that a schema transaction is defined in a specific order and we keep track of exactly which steps in this order have completed and which haven't. This makes it possible at recovery to see exactly which steps have already executed. Many of those steps also have to be synchronised to disk in the cluster.

Schema transactions are not super fast, but they are very reliable and can handle arbitrarily complex schema transactions.

A schema transaction has a commit point. Once it has passed this point the transaction will be rolled forward even in the case of node restarts and system restarts.

The most complex schema transaction we support is most likely the operation to reorganize a table to make use of new node groups added since the table was created.

This involves creating a set of new fragments on the new node groups. It also means that we have to copy over the data from the old table partitions to the new table partitions.

Node registration protocol#

The node registration protocol is an important part of the node recovery handling. We can only add one node at a time to the cluster. This protocol is described in some more detail in the source file storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp.

The nodes currently in the cluster must agree on the node to add next. The reason is that nodes have a dynamic order id. Thus we know which node is currently the oldest node in the cluster. We will always select the oldest node in the cluster as the master node.

Heartbeat protocol#

The heartbeat protocol uses the order of nodes acquired by the node registration protocol and always sends heartbeats to the next node in the dynamic order. Similarly it expects to receive heartbeats from the node before it in dynamic order. The newest node will send heartbeats to the oldest node and vice versa the oldest node will expect heartbeats from the newest node.
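The heartbeat ring can be sketched as simple modular arithmetic over the dynamic order. The function below is illustrative only; it assumes the dynamic order is available as a list of node ids sorted from oldest to newest, and the name `heartbeat_neighbours` is hypothetical.

```python
from typing import List, Tuple


def heartbeat_neighbours(dynamic_order: List[int], node_id: int) -> Tuple[int, int]:
    """Return (sends_to, expects_from) for node_id.

    dynamic_order lists node ids from oldest to newest. Each node sends
    heartbeats to the next node in that order and expects them from the
    previous one; the ring wraps so the newest node sends to the oldest.
    """
    idx = dynamic_order.index(node_id)
    count = len(dynamic_order)
    sends_to = dynamic_order[(idx + 1) % count]
    expects_from = dynamic_order[(idx - 1) % count]
    return sends_to, expects_from
```

With the order `[3, 1, 4, 2]` (node 3 oldest, node 2 newest), node 2 sends heartbeats to node 3, and node 3 expects heartbeats from node 2.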

Heartbeats have a special signal. Normally this signal isn't used. Every signal received from a node is treated as a heartbeat, so the transport layer assists in the heartbeat protocol. The reason was that in high load cases the heartbeat signal sometimes got so far into the send buffer that the heartbeat timeout happened before it got to the other node.

Node failure protocol#

Node failures are discovered in numerous ways. If a data node closes the TCP/IP socket to another data node, the other node will treat this as a node failure. Missing heartbeats for more than the heartbeat period (four times the heartbeat timeout) will trigger a node failure handling protocol.

Sometimes a node behaves in a weird manner. In this case the node might be shut down by another node sending a shutdown signal to it.

When the node failure protocol starts, it sends a prepare message to all nodes with the list of failed nodes. A node receiving this message will be able to add to the list of failed nodes. It will not be able to remove nodes from the list.

When all nodes have replied to this we might have a new set of failed nodes. If so we send another round of prepare messages. Eventually the nodes will agree on the set of failed nodes. After that we commit the node failures. Thus each node failure set is agreed upon by all surviving nodes in the cluster, and we can trust that all nodes have the same view of the list of nodes alive after the node failure transaction is completed.
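The prepare rounds can be sketched as a loop that repeats until no surviving node adds to the failed set. This is a simplified model with invented names: `local_views` maps a node id to the set of nodes that node itself considers failed, and message passing is replaced by direct lookups.

```python
from typing import Dict, Set, Tuple


def agree_on_failed_nodes(
    initial_failed: Set[int],
    local_views: Dict[int, Set[int]],
    all_nodes: Set[int],
) -> Tuple[Set[int], int]:
    """Run prepare rounds until the surviving nodes agree on the failed set.

    A node may add to the proposed set but never remove from it, so the set
    only grows and the loop terminates. Returns the agreed set and the
    number of prepare rounds used.
    """
    failed = set(initial_failed)
    rounds = 0
    while True:
        rounds += 1
        survivors = all_nodes - failed
        proposed = set(failed)
        for node in survivors:
            # Each reply may extend the list of failed nodes.
            proposed |= local_views.get(node, set())
        if proposed == failed:
            return failed, rounds  # everyone agrees: ready to commit
        failed = proposed
```

In a five-node cluster where node 5 is reported as failed and node 2 has also lost contact with node 4, the first round grows the set to {4, 5} and the second round confirms it.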

Now that we have decided on the set of failed nodes we will start handling the node failures. This includes handling each and every distributed protocol.

In particular it involves handling the failure of the transaction coordinator in many transactions as described in the section above.

There is an extension available to the node failure protocol that starts by performing a full check of connectivity among the nodes in the cluster before initiating the node failure protocol. To activate this one needs to set the configuration variable ConnectionCheckIntervalDelay to a non-zero value.

Graceful shutdown protocol#

A crash shutdown works just fine for many cases. However we have implemented a graceful shutdown to ensure that no transactions are aborted during a planned shutdown of a node.

A graceful shutdown is initiated from the NDB MGM client (ndb_mgm binary). It is also initiated by kill -TERM on the data node process. This is used in managed RonDB when stopping a node.

A graceful shutdown will ensure that no more transactions are started in the node. It will allow for already started transactions to complete before it shuts down the node. If transactions are still running after a few seconds they will be aborted.

Watchdog handling#

Every thread in the RonDB data nodes is protected by a watchdog thread. Most of the time the watchdog does not show up at all. As soon as a thread is non-responsive for more than 100 milliseconds we will get printouts of this in RonDB. Watchdog printouts can come from bugs in RonDB code, but they can also appear if the OS has problems delivering a real-time experience. During early phases of restart it is fairly normal to get some watchdog printouts during memory allocation.

If the watchdog doesn't see any progress in a thread for the watchdog timeout (about one minute) it will crash the data node. If this happens it is usually due to a RonDB bug where we end up in some loop that we don't get out of. The watchdog thread is there to ensure that we will come out with a crash even if there is no crashing code.
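The two watchdog thresholds can be summarised in a small decision function. This is only a sketch: the function and its return values are hypothetical, and the crash threshold of 60,000 ms is a stand-in for the "about one minute" mentioned above rather than an exact RonDB value.

```python
def watchdog_action(
    silent_ms: int, warn_after_ms: int = 100, crash_after_ms: int = 60_000
) -> str:
    """Classify how the watchdog reacts to a thread that has been silent.

    Assumed thresholds: a printout after more than 100 ms without progress,
    a forced crash once the watchdog timeout (roughly one minute) is reached.
    """
    if silent_ms >= crash_after_ms:
        return "crash"
    if silent_ms > warn_after_ms:
        return "warn"
    return "ok"
```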

Node restart activities#

Before a node can restart the node failure handling of the crash must have been completed first. Next the node must succeed in acquiring its node id. After this it will use the node registration protocol. Before coming to this point however the node will allocate all the required memory.

During restart we use the LCP to restore the rows. We bring the disk data parts back to their starting point by applying the UNDO log. Next we run the REDO log to recover the node to a specific GCP. After this we rebuild the ordered indexes. At this point the in-memory representation and the disk columns are exactly as they were at the restored global checkpoint.

After this we come to a very important part of the recovery. We have a node that is recovered to an old but consistent GCP. At the same time we have a live node that has continued to update the rows.

The next step is to synchronize the live node with the starting node. We didn't want to rely on any logs for this part. This would require us to keep the logs for a very long time, and it would complicate the REDO logs since we would need to read and write them at the same time. In addition any synchronization using logs requires a point in time where we have to stop transactions to allow the node to participate in transactions again.

To avoid all of this we instead rely on synchronising the starting node using the data in the live node. In MySQL Cluster 5.1 we implemented a mechanism that synchronizes using row ids. We ensure that rows in all replicas share the same row id. Each row has a GCI that specifies the GCI at which it was last written. This makes it easy to scan all rows in the live node in row id order.

For each row in the live node that is stamped with a GCI that is higher than the restored GCI in the starting node we start a synchronization of the row. We must handle deleted rows as well, thus even deleted rows must be synchronized with the starting node. If an entire page of row ids has been deleted, the row ids of this page will be synchronized with the live node.

When we are about to synchronize the rows we start by taking a shared lock on the row in the live node. Thus we also need to synchronize all rows that are currently involved in a transaction. After this we send a write operation to the starting node with the row content (or a delete operation). We rely on signal ordering such that we can release the row lock immediately after reading the row and not wait for the write operation to complete.
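The scan over the live node can be sketched as follows. This is a simplified, single-threaded model with hypothetical names: row locks and signal ordering are left out, the starting node is a plain dictionary, and a deleted row is represented by a `LiveRow` whose content is `None`.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class LiveRow:
    row_id: int
    gci: int                # GCI at which the row was last written
    content: Optional[str]  # None marks a deleted row


def synchronize(
    live_rows: List[LiveRow], starting_rows: Dict[int, str], restored_gci: int
) -> List[Tuple[str, int]]:
    """Bring starting_rows up to date with the live node.

    Rows are visited in row id order; only rows stamped with a GCI higher
    than the restored GCI are sent, as a write or as a delete. Returns the
    operations sent to the starting node.
    """
    sent: List[Tuple[str, int]] = []
    for row in sorted(live_rows, key=lambda r: r.row_id):
        if row.gci <= restored_gci:
            continue  # unchanged since the restored global checkpoint
        if row.content is None:
            starting_rows.pop(row.row_id, None)
            sent.append(("delete", row.row_id))
        else:
            starting_rows[row.row_id] = row.content
            sent.append(("write", row.row_id))
    return sent
```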

In RonDB 22.10 we have implemented an optimisation such that if the GCI on the page is lower than the restored GCI, the page is not a new page and there are no ongoing transactions on the page, then we can skip checking all rows on the page. This makes node recovery go faster for tables that have had few writes since the node failure.
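The page-level check reduces to a conjunction of the three conditions stated above. The predicate below is a sketch with hypothetical parameter names; how RonDB tracks each condition internally is not covered here.

```python
def can_skip_page(
    page_gci: int,
    restored_gci: int,
    is_new_page: bool,
    has_ongoing_transactions: bool,
) -> bool:
    """Return True if no row on the page needs to be checked during sync."""
    return (
        page_gci < restored_gci
        and not is_new_page
        and not has_ongoing_transactions
    )
```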

During the synchronization phase the starting node participates in each transaction. If a transaction handles a row that has not yet been synchronized we will act as if we successfully performed the change. If the transaction happens after the synchronization of the row, we will change the row as for a normal write operation.
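How the starting node treats a transactional write during synchronization can be sketched like this. Tracking progress as a single scan position in row id order is an assumption made for illustration, and the function name and return values are hypothetical.

```python
from typing import Dict, Optional


def apply_during_sync(
    starting_rows: Dict[int, str],
    scan_position: int,
    row_id: int,
    new_content: Optional[str],
) -> str:
    """Handle one transactional write on the starting node.

    Assumption: rows with row_id <= scan_position have been synchronized.
    For rows beyond that point the node reports success without changing
    anything, since the later synchronization will bring the row up to date.
    """
    if row_id > scan_position:
        return "acknowledged"
    if new_content is None:
        starting_rows.pop(row_id, None)  # delete of a synchronized row
    else:
        starting_rows[row_id] = new_content
    return "applied"
```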

In the Ph.D. thesis of Mikael Ronström, this mechanism was proven to be correct by reasoning about the various operations that can occur in a transaction.

The number of rows synchronised at any time is limited by a hardcoded constant. This ensures that transaction throughput stays high even at very high loads during node recovery. Experiments using YCSB with Workload B show that we retain 85-90% of the throughput in all phases of the node recovery even when the cluster is operating at close to 100% of its capacity.

During the synchronization phase the REDO log is not active for any fragment. When all fragments have been synchronized we activate the REDO log for all fragments.

The final step in ensuring that data is recoverable in the starting node is to wait until the starting node has participated in a local checkpoint. After this has happened the node is recoverable again and can survive even if the live node dies.

Given that performing a local checkpoint is a distributed protocol that involves all live nodes, we perform a local checkpoint in the starting node without involving other nodes before it takes part in a distributed local checkpoint. This ensures that distributed local checkpoints can be fast, which in turn ensures fast recovery times.

Initial node restart activities#

The default behaviour of an initial node restart is very similar to a node restart. The only difference is that it restores GCI 0, thus all fragments are empty.

This means that the synchronization phase has to synchronize all rows.

By setting the configuration parameter TwoPassInitialNodeRestartCopy to 1 we change the recovery for initial node restarts. This is the recommended setting and is the default in RonDB.

With this setting the synchronization happens in two phases. The first phase synchronizes all rows from the live node. There is no ordered index created in this first phase. Thus this synchronization phase is faster. In this phase the node is not involved in any transactions, we simply retrieve the most recent rows from the live node.

Next we execute a parallel ordered index build. It is much faster to build indexes in parallel compared to building them while inserting rows.

The last phase is a normal synchronization that uses the GCI that happened immediately before the first synchronization phase started.

In this phase the new node is involved in all transactions, thus ensuring that rows that have been synchronised are kept in sync even with new transactions.