
Native RonDB APIs#

The most important API in RonDB is the C++ NDB API. Currently all access to the NDB data nodes goes through this interface.

The MySQL Server accesses NDB data nodes through the MySQL storage engine API. This API is implemented using the NDB API. Through the MySQL Server it is possible to access the NDB data nodes through any MySQL API. There are so many MySQL APIs that I don't even know all of them myself; there is an API for most programming languages.

There are three direct NDB APIs. This means APIs that do not need to pass through a MySQL Server to get to the data. Quite a few applications have been written on top of the C++ NDB API. Some of those have implemented an LDAP server that can be accessed through an LDAP API.

There are also numerous applications developed on top of ClusterJ. ClusterJ is a very nice Java API that implements access to NDB rows as Java objects. This API is used, among others, by HopsFS, which implements the metadata service of the Hadoop file system (HDFS).

There is also a direct NDB API available from the Node.js programming language.

We will go through the three direct APIs to the NDB data nodes, one in each chapter, with most focus on the C++ NDB API.


Now we have completed the view of RonDB from the SQL point of view. The SQL view always depends on lower level constructs in its implementation. All accesses to data in tables, metadata about tables and retrieval of configuration data go through the NDB API.

In MySQL there is a storage engine concept to allow many different implementations of the database engine. We have mapped the storage engine interface to our NDB API interface in the NDB storage engine.

The NDB API is implemented in C++, it is possible to write applications directly towards this C++ API. We have implemented special interfaces on top of the NDB API for Java (ClusterJ) and for JavaScript (Database Jones, a Node.js API towards NDB) that we will describe in subsequent chapters.

The aim of this chapter is to describe the NDB API from a high level to make it easier to understand how to write NDB API programs. There is a large set of example programs available in the directory storage/ndb/ndbapi-examples in the RonDB source tree. In this chapter we focus all attention on using the NdbRecord variant of the NDB API; some of the examples are old and use the column format where one column at a time is provided. The new record format is faster, but the old column format is still supported and can thus still be used.

There is documentation of the NDB API available that is for the most part retrieved from the header files using Doxygen. Some parts are skipped in the documentation since they are treated as internal parts of the NDB API. These are safe to use, but have a higher risk of being changed in future versions since they are deemed as internal parts of the API.

Most of the early research work on NDB was centered around the NDB data nodes and how to implement those. In early 1998 the implementation of the data node parts was nearing completion and we started considering how the application code would interface the data nodes.

The NDB data node was written to handle requests asynchronously. We did not expect that application developers wanted to work with an asynchronous API towards NDB. Thus we decided to make a synchronous API. For the most part this decision has been a correct decision, implementing the NDB storage engine requires a synchronous API and using a synchronous API is a lot simpler compared to an asynchronous API.

At the same time the asynchronous API allows for fairly elegant implementations and the code is a lot easier to optimise. We ended up handling key operations using an asynchronous API that is used by a few users and by a number of our internal tools. It has also been used in a number of benchmarks we have developed to showcase the throughput possibilities of NDB.

At this point scan operations can only be accessed from the synchronous NDB API part.

Meta data operations#

The NDB API contains code to handle all interactions with the NDB data nodes including normal user transactions as well as event handling and meta data operations.

It is not recommended to use the meta data part of the NDB API directly in a user application. It is recommended to create all tables using the MySQL Server. Creating a table through the NDB API means that it isn't possible to use it from the MySQL Server. It is a lot simpler to use the SQL syntax to create tables, drop tables, create indexes, foreign keys and so forth.

The MySQL server will also take care of synchronising table changes with slave clusters.

Thus a normal NDB API application will contain code to both connect to a MySQL Server to create the necessary tables and other meta data for the application and the NDB API part to work with those tables.

We will not go through the meta data part of the NDB API in this book and will instead go through the specific syntax required for handling table creation and so forth of NDB tables in the MySQL Server.

Thus it is recommended to always have at least one MySQL Server in an NDB Cluster even if the MySQL Server isn't used at all for normal transactions or queries.

Initialising the NDB API and MySQL client API#

Before any interactions with the NDB API it is necessary to initialise the NDB API first. This happens through a call to the method ndb_init().

Similarly it is necessary to initialise the MySQL client API before using it by calling mysql_init().
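As a minimal sketch (error handling omitted; the header names assume a standard RonDB installation), the initialisation of both APIs could look like this:

```cpp
#include <mysql.h>      // MySQL client API
#include <NdbApi.hpp>   // NDB API

int main()
{
  ndb_init();            // initialise the NDB API once per process
  MYSQL mysql;
  mysql_init(&mysql);    // initialise a MySQL client handle
  /* ... create tables via the MySQL Server, run NDB API code ... */
  mysql_close(&mysql);
  ndb_end(0);            // release NDB API resources at process end
  return 0;
}
```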

Concurrency in the NDB API#

In 1998 most computers were using a single CPU or possibly two CPUs. Concurrency was limited, however it was necessary to handle many concurrent threads accessing the NDB API.

We decided to create a C++ class called Ndb that is used by one thread. There can be many Ndb objects operated on from many different threads at the same time, but one Ndb object can only be operated on by one thread at a time. Thus this object must be handled with care: it can be used from different threads, but only from one at a time, and moving an Ndb object from one thread to another must be properly synchronised.

The Ndb object uses a set of classes that we call the Transporter classes. These classes are responsible for taking the information from one Ndb object and converting it to the NDB protocol and later send this information to the NDB data nodes. It is responsible to receive data back from the NDB data nodes and read the NDB protocol and place the results back into the objects handled by the Ndb object.

In the first implementation of the NDB API and all the way up to MySQL Cluster version 7.2 the Transporter layer used a very simple concurrency control by implementing a Transporter mutex that ensured that only one thread at a time was able to interact with the transporter parts.

After a few years this became a bottleneck and to avoid it we made it possible to have several API nodes within one API program. In the MySQL Server this is controlled by the configuration variable --ndb-cluster-connection-pool. In 7.2 and earlier versions each API node scaled to about 1.5 CPUs when running a simple benchmark using the NDB API directly (using the MySQL Server means it scales to more CPUs since the MySQL Server code will use quite a few CPUs to handle query processing). So e.g. running a benchmark in an 8-CPU environment was made possible by using 6 API nodes (or cluster connections as they are called in the NDB API). Each cluster connection uses its own node id and cluster connections can be seen as independent nodes in most management views from NDB.

Now with the multi-core revolution that started around 2006-2007 the number of CPUs per machine escalated very quickly. Thus it was necessary to implement a more scalable concurrency control of the Transporter part. This was implemented in MySQL Cluster 7.3 and makes each API node or cluster connection scale a lot more. So e.g. for the Sysbench benchmark each cluster connection scales to more than three times as many CPUs as before. For DBT2 it was even more where it scaled to more than seven times more CPUs.

Now it is possible to use only four cluster connections even when running a MySQL Server using 32 CPUs when executing the Sysbench benchmark.

The major limiting factor now in the Transporter part is that there is only one thread at a time that can execute the receive part of the Transporter part.

Ndb cluster connection#

The Ndb_cluster_connection object is the starting point for all NDB API programs. When creating this object it is necessary to specify the NDB connect string. The NDB connect string is used to fetch the cluster configuration from the NDB management server. It is possible to specify multiple NDB management servers in the connect string to increase the chance for a successful configuration retrieval. It is possible to specify a specific node id to use by this cluster connection. If not specified the management server will pick a node id that is available currently in the configuration.

Using specific node ids is not absolutely required, but in my view it is recommended to use them as much as possible. For management servers and data nodes I recommend always using them since the node id is connected to state saved on disk. It will simplify management of the cluster if all processes start up with a specific node id. This requires setting the node id in the configuration for these node types.

For MySQL Servers and long running API programs it is a good idea to use specific node ids.

The normal procedure for the Ndb_cluster_connection object is to first create the object specifying the NDB connect string and possibly node id as well. Next a connect call will fetch the NDB configuration from a management server. Next a call to wait_until_ready will wait until we are connected to at least one NDB data node.

After these calls our API program is now a part of the cluster and will show up when we list the nodes in the current cluster.

Auto reconnect#

Once the NDB API has established communication to the data nodes it will automatically balance the load on the connected data nodes. If a data node is no longer connected we will automatically try to reconnect to this node without any need for the user of the API to interact. It is possible to disable this behaviour if desirable.

Selection of data node to communicate with#

Each transaction is handled by one specific data node. There are a number of things impacting the decision of which data node to choose. The first consideration is whether the user has provided a hint when starting the transaction; if so, we will use this hint to connect to the data node if possible.

The hint doesn't say which data node to connect to, it rather specifies a primary key in a specific table, this primary key is mapped to a node using a hashing function and some data distribution hints provided to us from the data nodes when we started using the table. These are hints that are very stable, but they can change e.g. during an add node operation. There is no guarantee that they will be 100% accurate. If they are incorrect, it is not a problem since the data node will get the correct nodes to read and write data from, independent of which data node the transaction is started from. The data nodes always have a correct view of the data node distribution of each table.

If no hint is provided we use a round-robin scheme to ensure that we spread the load on all data nodes.

When using SQL we always use the first SQL statement of a new transaction as the way to provide the hint. If the first SQL statement specifies a primary key to use for a table we will choose this as the hint for the transaction we start.

Setting data node neighbour#

In case we haven't specified a hint about where to place the transaction coordinator we can set the node id of the closest data node in the configuration. This will be used in those cases.

If a MySQL Server has the same IP address as a data node it will automatically become the neighbour of this data node and will prefer transactions to use this data node unless a hint has been provided.

However, in some situations we know that two nodes are close to each other even though they use different IP addresses. This could happen e.g. in a Docker network setup where each node has its own IP address but some IP addresses are actually located on the same physical server or even the same virtual machine.

In this case one can provide the data node neighbour through the set_data_node_neighbour call on the Ndb_cluster_connection object.
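Assuming cc is the Ndb_cluster_connection object and the co-located data node has node id 1 (a hypothetical value), the call is a one-liner:

```cpp
// Tell the API that data node 1 is our neighbour, so transactions
// started without a hint prefer it as transaction coordinator.
cc->set_data_node_neighbour(1);
```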

In the MySQL Server one can set this node id through the ndb-data-node-neighbour configuration variable.

The main use of the data node neighbour concept is for query processing where we are issuing complex SQL queries. In this case most likely no primary key is provided, round robin would be used unless a data node neighbour is selected.

Using a data node neighbour is important if the network between the computers has limited bandwidth. Using a node local to the computer means that we don't have to use the limited bandwidth available for connecting between computers in the cluster.

Combining this feature with the possibility to read from backup replicas, and possibly even using fully replicated tables, ties the API node strongly to its neighbour data nodes for all query processing. This makes it possible to build query processing that scales in an excellent manner when adding more and more MySQL Servers and NDB data nodes.

Locking receive thread to specific CPU#

It is possible on Linux, Solaris and Windows to lock threads to specific CPUs. The NDB API exposes this feature through the function set_recv_thread_cpu on the Ndb_cluster_connection object. The call is prepared for multiple receive threads per NDB cluster connection, but currently only one is supported, so the array passed in this call must always have length 1.

It is possible to specify the CPUs to use for NDB cluster connections in the MySQL Server configuration. For this one uses the ndb-recv-thread-cpu-mask configuration variable. In this variable one can list e.g. 0,2-4 to specify that CPU 0 is used for the first cluster connection, 2 for the second, 3 for the third and 4 for the fourth. The number of CPUs here must be at least the number of cluster connections specified in the configuration variable ndb-cluster-connection-pool.

There is a call set_recv_thread_activation_threshold that specifies how many API threads must be concurrently active before the receive thread takes over receive processing. When CPU locking is used it can be a good idea to set this to 1 to ensure that the receive thread handles all receive processing when multiple threads are active. There is a corresponding MySQL configuration parameter called ndb-recv-thread-activation-threshold for this as well.

By default there is no CPU locking for receive thread and the activation threshold is set to 8.
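As a sketch, assuming cc is the Ndb_cluster_connection object, locking its single receive thread to CPU 10 (a hypothetical CPU number) and activating it as soon as one API thread is active could look like:

```cpp
Uint16 recv_cpus[1] = { 10 };          // exactly one entry: only one
                                       // receive thread is supported
cc->set_recv_thread_cpu(recv_cpus, 1); // array length must be 1
cc->set_recv_thread_activation_threshold(1);
```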

Locking the receive thread to a specific CPU improves performance if it is combined with locking the MySQL server process or the NDB API program to other CPUs than those used by the receive threads. It is also necessary to control other programs in the computer that use large CPU resources, and interrupt handling should be steered away from those CPUs as well. If this cannot be done one should not use CPU locking of receive threads.

Ndb object#

The Ndb object is the starting point for all NDB interactions. As mentioned this object should not be used from more than one thread at a time. When waiting for a response back from the NDB data nodes it is important to not use this object while waiting.

Concurrency in the NDB API comes from three places. First it is possible to use multiple cluster connections, second it is possible to use multiple threads per cluster connection using separate Ndb objects and third it is possible to handle multiple operations and even multiple transactions per interaction using the Ndb object.

The Ndb object is a placeholder for transaction objects, operation objects and many other objects used by the Ndb object. Objects are created through the Ndb object and destroyed by the Ndb object.

Before starting to use the Ndb object it has to be created and it is necessary to call the method init().

When creating the Ndb object one specifies two things, first the Ndb_cluster_connection object used and second a string containing the database name used by the Ndb object. It is possible to change the database name used later on.

The init() method has one parameter that is set to 4 by default, the parameter specifies the maximum number of transactions that can be active in parallel on the Ndb object. Using the asynchronous API or using batching with one Ndb object it is possible to handle up to 1024 transactions in parallel from one Ndb object.

Here is a simple code example skipping all error checks for creating the cluster connection and the first Ndb object.

  Ndb_cluster_connection *cc =
    new Ndb_cluster_connection(connectstring, node_id);
  cc->connect();
  cc->wait_until_ready(timeout, wait_after_first_node_connected);
  Ndb *ndb_obj = new Ndb(cc, "database_name");
  ndb_obj->init();

Setting up a row#

Before a user transaction can be executed we need to set up a row data structure. The NDB API also contains a variant where one takes care of one column at a time; I will only describe how to handle things using NdbRecord since this is nowadays the recommended use of the NDB API.

To set up a row we need a reference to the table and the columns we want to operate on.

The first step is to get an object from Ndb of the type NdbDictionary::Dictionary that is retrieved using the call getDictionary() on the Ndb object.

Next this dictionary object is used to get a table object of type NdbDictionary::Table. This is retrieved by using the call getTable(tableName) where tableName is a string with the table name.

Next we use the table object to retrieve one or more column objects. This is retrieved by using a set of calls of the type getColumn(colName) where colName is a string containing the column name.

Next we create a record specification object that maps each column to an offset in a row buffer; data to write is read from this buffer and data read from NDB is placed into it.

This is an object of the type NdbDictionary::RecordSpecification. We create an array of such objects with as many members in the array as the number of columns we want to access.

We create an Ndb record by calling createRecord on the dictionary object. This method has as input the table object, the record specification array, the number of entries in the record specification array and the size of the NdbDictionary::RecordSpecification object (needed for backwards compatibility since there used to be an older variant of the record specification object).

This call returns an object of the type NdbRecord that can now be used in user transactions. It is also possible to set up a row for access through an ordered index or a unique index. In this case we need both the index object and the table object in the call to createRecord.

Here is a very brief example of the above.

  struct row_buffer
  {
    int col1;
    int col2;
  };

  NdbDictionary::Dictionary *dict_obj = ndb_obj->getDictionary();
  const NdbDictionary::Table *tab_obj = dict_obj->getTable("table_name");
  const NdbDictionary::Column *col1 = tab_obj->getColumn("col1_name");
  const NdbDictionary::Column *col2 = tab_obj->getColumn("col2_name");
  NdbDictionary::RecordSpecification spec[2];
  spec[0].column = col1;
  spec[0].offset = offsetof(struct row_buffer, col1);
  spec[0].nullbit_byte_offset = 0;
  spec[0].nullbit_bit_in_byte = 0;
  spec[1].column = col2;
  spec[1].offset = offsetof(struct row_buffer, col2);
  spec[1].nullbit_byte_offset = 0;
  spec[1].nullbit_bit_in_byte = 0;
  NdbRecord *pk_record =
    dict_obj->createRecord(tab_obj, spec, 2,
                           sizeof(NdbDictionary::RecordSpecification));

For each element in the column array it is necessary to set the column object retrieved previously in the variable column and an offset to the column in the buffer provided in a variable called offset. There are variables called nullbit_byte_offset and nullbit_bit_in_byte. These two numbers point out which bit in the null bits of the record that is used to represent the NULL state of the column. These are ignored for columns that are declared as NOT NULL. Thus nullbit_byte_offset is the number of bytes from the start of the row.

Thus the row buffer must have space allocated both for the column values and for the NULL bits. It is recommended to reserve at least 32 bits for the NULL bits to avoid misaligned column values. Similarly it is recommended that all column values are aligned on 32-bit boundaries for efficient access without misalignment traps. On SPARC this alignment is mandatory.


Every interaction involving reading or writing user tables in NDB uses transactions. A transaction goes through a number of steps: first it is created, then it is executed (it can be executed multiple times before abort/commit, and the final execute specifies either commit or abort) and finally it is closed.

Start a transaction#

Starting a transaction means that we allocate one connection record in one of the DBTC threads in the cluster. Once such a record has been allocated by an API node, the API node keeps the connection record. This means that only the first start of a transaction involves a distributed interaction with the NDB data nodes. Most startTransaction calls are local calls that grab a connection record from a linked list of free connection records; there is one such linked list per data node in the cluster.

A transaction can be started without any parameters using startTransaction(). In this case a round-robin mechanism is used to decide which data node to start the transaction in; if we have a neighbour data node (either one with the same IP address as the API node or one set through data_node_neighbour on the cluster connection object), we use the neighbour node.

Start transaction with hint#

To start a transaction with a hint on where to start the transaction is a very important feature of RonDB. This is important to write scalable applications using RonDB. As an example HopsFS mapped a hierarchical file system on top of a 12-node RonDB cluster. In order to get this to scale it was important to ensure that operations such as ls (list files in a directory) could execute using a scan on just one partition of the table. HopsFS solved this by using the parent directory as the partition key for the table.

Similarly TPC-C has a warehouse id that is part of all tables; it can be used to make TPC-C scale perfectly if the warehouse id is used as a partition key. Selecting the proper partition key is important when designing scalable applications in NDB.

There are two reasons for this. The first is that if one makes a transaction that updates many rows that have some connection (this is what happens in TPC-C and in HopsFS and in almost all applications), it is very valuable to place the transaction coordinator on the same node as the primary replica location of the rows to be updated. This will significantly decrease the overhead of communication within the cluster during the transaction.

The second is that we can perform ordered index scans that have an equality bound on the partition key, or a full table scan with such an equality bound. These scans only need to scan one partition of the table since all rows sought will be housed in the same node. By placing the transaction coordinator on this node, we avoid communication to get to this partition since the partition is placed on the same node.

Decreasing the number of partitions required to scan gives us a lower startup cost for the scan. Each additional partition to scan brings an additional small cost to the query execution.

Decreasing communication needs is always important to create scalable distributed applications whether it is a NoSQL database, a shared disk relational DBMS, or a shared-nothing DBMS such as RonDB. Modern hardware makes communication cheaper and makes it easier to write distributed applications, but it will always be beneficial to write your application to minimise the communication needs.

To start a transaction with a hint we use a variant of the startTransaction call that takes a few parameters. It uses a table object of the type NdbDictionary::Table as input, created by calling the method getTable(tableName) on a NdbDictionary::Dictionary object. The second parameter is a NULL-terminated array of objects of the type Key_part_ptr. The Key_part_ptr object is a simple struct containing a void pointer and the length of the data it points to. This data represents the distribution key (partition key) of the table defined by the table object, and the key parts are checked for consistency against the table definition.
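A sketch of this public variant, hinting the transaction on a single-column partition key (the table name and the int-typed key value are hypothetical):

```cpp
const NdbDictionary::Table *tab_obj = dict_obj->getTable("table_name");
int warehouse_id = 17;                    // value of the partition key
Ndb::Key_part_ptr key_parts[2];
key_parts[0].ptr = &warehouse_id;
key_parts[0].len = sizeof(warehouse_id);
key_parts[1].ptr = NULL;                  // NULL-terminates the array
NdbTransaction *trans_obj =
  ndb_obj->startTransaction(tab_obj, key_parts);
```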

There is also a method that uses a NdbRecord, this is not part of the public NDB API, but is used by the NDB storage engine. The NdbRecord here is a record of the primary key of the table used. Here is a simple example to start a transaction with a hint given this interface.

  NdbRecord *ndb_rec = ....
  Ndb *ndb_obj = ....
  const char *key_row = ....

  NdbTransaction *trans_obj =
    ndb_obj->startTransaction(ndb_rec, key_row, NULL, 0);
The two last parameters specify a buffer to be used for hashing. By specifying NULL as the buffer pointer we ensure that malloc is used; otherwise one has to ensure that the buffer is large enough for the hash function call. In the public function described above these two parameters default to NULL and 0, but they can also be specified explicitly.

If a buffer is provided it must be large enough to contain all columns in their maximum size and the buffer must be aligned to a 64-bit boundary to be used in these calls. So the buffer should be fairly large if provided. The hash function must work on an expanded row to ensure that different representations of the same word uses the same hash value (there are many character sets that have multiple ways to write the same word).

If no such pointer is passed the NDB API will have to use malloc and free to allocate and release the memory needed for executing the hash function.

It is also possible to start a transaction on a specific node id and instance id. However, the application normally has no knowledge of the table distribution that would make this useful, whereas using the partition key is very useful.

Define operations#

After starting a transaction we define a set of operations for the first interaction with the NDB data nodes. We will go through in detail how this is done below. These could be operations on a specific record using either a primary key or a unique key, or scan operations that scan a part of a table using either a full table scan or an ordered index scan. Scans can also be used to write rows, where we find the rows to write through a scan.

Executing a transaction#

After defining one or more operations it is now time to execute the operations. When executing key operations one can decide whether it is time to commit the transaction while executing it. It is possible to commit with a set of operations, it is also possible to commit with an empty set of operations. Committing an empty set of operations will commit all operations executed since the start transaction call was made.

When a transaction has completed executing a commit it is no longer possible to use the transaction in any other manner than to close it.

The call to execute is made on the NdbTransaction object received in the startTransaction call. The method is called execute and the only required parameter is whether to commit, not commit or abort. The value sent in to commit is NdbTransaction::Commit, to execute but not commit yet one uses NdbTransaction::NoCommit and to abort a transaction one uses NdbTransaction::Rollback. We can provide a second parameter that specifies how to handle errors during transaction execution.

Setting this second parameter to AbortOnError means that no errors at all are allowed. AO_IgnoreError means that we will commit as much as is possible to commit, thus for most errors we will continue the transaction even if one part of the transaction fails. The default setting for the abort option is that it can be set on a per operation basis. However explicitly setting it in the execute call overrides the operation setting.

It is possible to set a third parameter with a force send flag. By default this is turned off. In the MySQL Server this is controlled by a configuration parameter in the MySQL Server.
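A typical execute call committing the transaction, with the abort option given explicitly, might look like this sketch (trans_obj is the NdbTransaction object):

```cpp
if (trans_obj->execute(NdbTransaction::Commit,
                       NdbOperation::AbortOnError) == -1)
{
  // The transaction failed; inspect the error before closing it
  const NdbError &err = trans_obj->getNdbError();
  fprintf(stderr, "execute failed: %d %s\n", err.code, err.message);
}
```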

Scan operations use execute as part of starting up a scan. The continuation is however different after the startup of the scan. A scan can handle any number of rows so there is an API to control the fetching of rows to the API that we will describe below.

Close a transaction#

After committing a transaction one needs to close the transaction to make all allocated resources available again. If the transaction had not been committed yet and the NDB data node is waiting for more input the NDB API will abort the transaction as part of the closeTransaction() call.

A call to start a transaction with a hint immediately followed by a call to get the node id of the transaction coordinator (using getConnectedNodeId()) followed by a call to close the transaction is a quick way of discovering which node the primary replica record belongs to. This method is used by the flexAsynch benchmark program and the ndb_import program (introduced in 7.6) to distribute queries to different threads where each thread is handling one data node.
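The node-discovery trick described above can be sketched as follows, assuming pk_record and a filled-in key_row buffer were set up through the NdbRecord interface:

```cpp
NdbTransaction *t = ndb_obj->startTransaction(pk_record, key_row, NULL, 0);
Uint32 tc_node_id = t->getConnectedNodeId(); // node of the primary replica
ndb_obj->closeTransaction(t);                // release resources again
```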

Key operations#

To define a key operation is straightforward after setting up the row. One calls a method on the NdbTransaction object and it returns an NdbOperation object.

Insert tuple#

To insert a row we call the method insertTuple that requires four parameters. The first is the NdbRecord object for the primary key and the second is a pointer to the memory buffer for the primary key columns. The third parameter is the NdbRecord for the rest of the columns and the fourth is the memory buffer for these columns. The memory buffers must contain the data to be inserted in all columns before the call.

In a sense one can see the NdbRecord objects as the meta data descriptions of a row object. The memory buffers contains the actual data that is interpreted according to the NdbRecord object.

There is a variant where only two parameters are needed where the primary key columns are combined with the rest of the columns. This only exists for the insertTuple call and not for other types of calls.

The buffer can be reused for other purposes immediately after the call since the data have been saved in the NDB API and no data is to be returned in this case.
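A hedged sketch of an insert, assuming pk_record, attr_record and the matching buffers key_row and attr_row were set up as in the previous section:

```cpp
const NdbOperation *ins_op =
  trans_obj->insertTuple(pk_record, (const char *)&key_row,
                         attr_record, (const char *)&attr_row);
if (ins_op == NULL) { /* check trans_obj->getNdbError() */ }
trans_obj->execute(NdbTransaction::Commit);
```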

Update tuple#

This is exactly the same as inserting a row except that we use the call updateTuple with the same parameters, and we still need to set all columns before the call. Also here the buffers can be immediately reused.

Write tuple#

The writeTuple call will perform an insert if the row didn't exist before or update if the row previously did exist. All required columns need to be part of the record specification since it can become an insert. Also here the buffer can be immediately reused.

Delete tuple#

The call deleteTuple uses the same parameters, but the fourth parameter can be skipped if no read is performed while deleting the row. The record specification for the entire row is still required in this call.

Read tuple#

The call readTuple will perform a read of one tuple. The parameters are still the same as when calling insert, update and write of a tuple. It is not ok to reuse the second memory buffer provided in the fourth parameter until the read has been completed. The memory buffer is here used to return the data read from NDB. It is important to still set the primary key fields before calling readTuple. The first memory buffer can be immediately reused after the call.

It is possible to set lock mode in this call, more on this below in the section on scans.

Operation options#

In addition, the above calls can all set a result mask, a bit mask indicating which parts of the record specification to use. It is an array of bytes where bits are set in column id order. If a bit is set in this array it indicates that the column is used for either update or read, depending on the operation type.
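To illustrate the layout of such a mask (this helper is our own illustration, not part of the NDB API), the bit for a column id lands in byte id/8, bit id%8:

```cpp
#include <cstdint>

/* Set the bit for one column id in a result mask that is laid
   out in column id order: byte (id / 8), bit (id % 8). */
static void mask_column(uint8_t *mask, unsigned col_id) {
  mask[col_id >> 3] |= (uint8_t)(1u << (col_id & 7));
}
```

A mask covering columns 0, 3 and 9 thus occupies two bytes holding the values 0x09 and 0x02.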

There is a possibility to set special options for the reads and writes, more on such options below. There are two parameters for this, one that is a pointer to the options object and a second that contains the length of this options object.

Scan operations#

There are two types of scan operations, full table scans and ordered index scans.

Ordered index scans#

Ordered index scans normally use a range scan that specifies a start position and an end position. These ranges are defined on the columns of the ordered index. It is possible to specify multiple ranges in one scan operation.

To specify an ordered index scan we call scanIndex on the transaction object, which returns an NdbIndexScanOperation object. The parameters passed in this call define the record specification of the ordered index and another record specification for the data returned by the scan. Optionally one defines the lock mode, a bitmask of the columns in the record specification that are to be read, a bound object of the type NdbIndexScanOperation::IndexBound and finally an option object with its length.

The bound object can also be defined after this call, in a separate call on the scan object returned from scanIndex, using the method setBound, where the record specification of the ordered index and the bound object are the parameters.

Multiple index ranges and various sort orders can be defined. These are specified in the options parameter described below.

Full table scan#

A full table scan will scan all rows in a table. There are three ways to do this: scan through the hash index, scan through the main memory rows or scan through the disk data records.

After starting the transaction one can start a full table scan by calling scanTable on the transaction object. This call will return an object of the type NdbScanOperation.

This call requires at least one parameter: the record specification of the row to be retrieved from the table. Not all columns need to be part of this specification, only the columns of interest.

The second parameter is the lock mode of the scan. The default lock mode is LM_Read, which means that a row lock is held on the row from when it is scanned until we move on to the next row. LM_Exclusive also means that we hold a row lock, but an exclusive lock instead.

LM_CommittedRead means that no lock is held at all while scanning; the last committed value of the row is read instead.

LM_CommittedRead is the default setting for SELECT queries in MySQL for the NDB storage engine. The NDB storage engine only supports the setting READ_COMMITTED.

To use LM_Read from SQL one uses SELECT ... LOCK IN SHARE MODE and to use LM_Exclusive from SQL one uses the syntax SELECT ... FOR UPDATE. The final setting that one can use is LM_SimpleRead: the row is locked while reading it, but the lock is released before sending the row to the API. This behaviour is the default when the MySQL Server reads BLOB tables. BLOB tables are a bit special in how they handle these lock modes, see the chapter on concurrency control in RonDB.

Lock mode can be set on the readTuple calls above when defining a read operation using a primary key or unique key. It cannot be set on write operations since these always use an exclusive lock.

The third parameter is the result mask. This parameter can be used to define the set of columns (in column id order) to be used in the scan. This is useful if we are using a standard record specification that contains all columns and we want to specifically set a different set of columns for this scan operation.

The fourth parameter is the scan options. There are many different ways to perform the scan; we will go through those options below.
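Putting the four parameters together, a full table scan definition could look like this sketch (the object names are ours, and columns_record, result_mask and options are assumed to have been prepared earlier):

```cpp
/* Sketch only, error handling omitted. */
NdbScanOperation *scan_obj =
  trans_obj->scanTable(columns_record,
                       NdbOperation::LM_CommittedRead,
                       result_mask,
                       &options,
                       sizeof(NdbScanOperation::ScanOptions));
```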

The actual scan phase#

The scanning is started by calling execute on the transaction object with NdbTransaction::NoCommit as parameter.

Next the actual scanning happens in a loop where we constantly call the method nextResult on the NdbScanOperation object. This method requires a pointer to a memory buffer from where the data can be read. Thus the NDB API uses its own memory and returns a pointer to this memory.

The second parameter is important: if true, it states that a fetch is allowed; if false, we will only return rows that have already been fetched from the NDB data node. By looping with this parameter set to false we know that we won't release locks on rows that we have scanned with nextResult, and that the memory buffers returned from previous nextResult calls are still valid. This gives us an opportunity to perform write operations on these rows before releasing their locks. It makes it possible to work in a batching mode where up to almost one thousand rows can be scanned locally before the next interaction with the NDB data nodes needs to happen.

The third parameter states whether to use the force send flag when starting a new fetch. If we are ok to move on to the next row immediately we set fetch allowed to true; this is done after completing the actions required on the rows that we have scanned without starting a new fetch.

For read queries we can simply call nextResult in a loop until we get the result that no more rows exist.
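A plain read scan loop can thus be sketched as follows, using the NdbRecord variant of nextResult that returns a row pointer (variable names are ours):

```cpp
/* Sketch only, error handling omitted. */
const char *row_ptr;
int rc;
trans_obj->execute(NdbTransaction::NoCommit);
while ((rc = scan_obj->nextResult(&row_ptr, true, false)) == 0) {
  /* Interpret row_ptr according to the record specification */
}
/* rc == 1 means the scan is complete, a negative value means an error */
```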

There is a variant of nextResult called nextResultCopyOut that, instead of returning a pointer to a memory buffer, takes a buffer in which it asks the NDB API to store the fetched row. Other than that it works exactly the same way as nextResult.

Scan with takeover#

Scan with takeover means that we use the row fetched in a scan to either update or delete the row. This should only be used if the scan uses the lock mode LM_Exclusive. If used with only LM_Read it can cause a deadlock to occur since the lock must be upgraded.

When using scan with takeover it is necessary to use a double loop, an outer loop that uses nextResult with fetch allowed set to true and an inner loop that uses nextResult with fetch allowed set to false. When the inner loop exits or as part of the inner loop we need to handle all the updates and deletes performed as part of the scan.

Deleting the row reported by the scan is done through the call deleteCurrentTuple; updates use the call updateCurrentTuple. The takeover can be made by any transaction; it need not be the same transaction that the scan is executed in.


There is a whole range of options that can be set for both scan and key operations and that affect the behaviour of the key lookups and scans. We will cover these here. We explain them in the context of key lookups, but they can also be used for scan operations. The difference is only that the options class is called NdbOperation::OperationOptions for key lookups and NdbScanOperation::ScanOptions for scan operations.

The option identifiers are called OO_* for key lookups and they are called SO_* for scan operations.

Scan flags#

Scan flags are an integral part of a scan. They are set through the option SO_SCANFLAGS by setting the option variable scan_flags to a proper value. This option is only available for scan operations.

Several flags can be set by OR:ing them together. The following flags are supported:


SF_TupScan#

This option is only applicable to a full table scan. It scans the tuples in row id order instead of the default order imposed by the hash index.


SF_DiskScan#

This option is only applicable to a full table scan on a table with disk columns. In this case the scan can be done in the row order on the disk pages. This can be beneficial since it can decrease the number of disk page fetches for a table scan.


SF_OrderBy#

Given that an ordered index scan is parallelised over a number of partitions, the rows can be returned in non-sorted order. Each partition returns rows in the order sorted by the index.

The NDB API has the capability to sort the rows before returning them to the application. This requires running with full parallelism, thus this flag cannot be combined with SO_PARALLEL. It requires all partitions to return data such that the sort can be performed before returning data to the application. This will have an impact on scan throughput.


SF_OrderByFull#

Same flag as SF_OrderBy except that this flag implies that all index columns are added to the read set of the scan.


SF_Descending#

This is only applicable to an ordered index scan. By default ordered index scans are done in ascending order; with this flag the order is instead descending.


SF_MultiRange#

This applies to ordered index scans where multiple index ranges are scanned. In this case this flag must be set.


SF_ReadRangeNo#

If multiple ranges are scanned, this flag enables reading the range number for a row that has been delivered through the API.


SF_KeyInfo#

If a scan is performed using the lock mode LM_Read one can specify this flag. It sends back key information to the API, which enables a takeover operation to be defined using the call lockCurrentTuple. This is necessary if we want to retain a read lock on a row already scanned.

This flag is enabled by default when using the lock mode LM_Exclusive.

Scan batching#

Normally the size of batches sent back to the API is limited by configuration parameters on the API node. It is possible to override these defaults by setting the option SO_BATCH and setting the option variable batch to the desired number of rows per batch.

This option is only available for scan operations.

Scan parallelism#

By default a scan will use the maximum parallelism available. This means scanning all partitions in parallel, up to a limit on how many partitions can be scanned in parallel.

By setting the option SO_PARALLEL we can set an explicit limit on the parallelism for a scan. We set the limit in the variable parallel on the options object.

This option is only available for scan operations.
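Both of the last two options are set on the scan options object; a sketch (the literal values here are arbitrary examples):

```cpp
NdbScanOperation::ScanOptions options;
options.optionsPresent =
  NdbScanOperation::ScanOptions::SO_BATCH |
  NdbScanOperation::ScanOptions::SO_PARALLEL;
options.batch = 64;    /* desired number of rows per batch */
options.parallel = 4;  /* scan at most 4 partitions in parallel */
```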

Scan partitioning info#

There is an option SO_PART_INFO available that is never used. Its use is related to multiple ranges defined in a query where all ranges use the same partition. This relates to the user defined partitioning scheme, which is not supported but still implemented. The MySQL Server implements this without using any option.

Only available in scan operations.

REDO log queueing#

All write operations are written into a REDO log. As part of the prepare phase we write them into a REDO log buffer. This buffer could be full. If this happens we have different choices. One choice is to abort the transaction. The other is to queue the operation before executing it and wait for REDO log buffer space to become available.

The default behaviour for this is set as part of the cluster configuration of the API node. Through setting the option OO_QUEUABLE or OO_NOTQUEUABLE (note the spelling error here) we can change to either queue or not queue for a specific transaction. The same option should be used for all operations in a transaction.

This option is only available for writing key lookups.

Early unlock#

The normal behaviour for a transaction is to release locks at commit. BLOBs in NDB are implemented as separate tables.

For a normal table, using READ COMMITTED mode requires no locks. For a BLOB table we first need to take a lock on the base table, followed by a read of the BLOB table using a simple lock.

For BLOB tables this lock on the base table is unlocked immediately after completing the read of the BLOB table.

The mechanism to unlock rows before commit is available to an application. Such behaviour has an impact on the transactional correctness of the application, so if used, it should be used with care.

When the operation is defined one uses the flag OO_LOCKHANDLE. Later when we want to unlock the row we call getLockHandle on the operation object followed by a call to unlock on the transaction object using this handle. The unlock row operation will be sent the next time we call execute.

  NdbTransaction *ndb_trans = .....
  NdbOperation *ndb_op = ....
  NdbOperation::OperationOptions options;
  options.optionsPresent =
    NdbOperation::OperationOptions::OO_LOCKHANDLE;
  /* Define and execute the ndb_op operation towards the NDB data node */
  const NdbLockHandle *lock_handle = ndb_op->getLockHandle();
  ndb_trans->unlock(lock_handle);
  /* The unlock operation is sent on the next call to execute */

This operation is only available to key lookups.

Disable Foreign Key constraints#

The MySQL Server provides the capability to temporarily disable foreign key constraints. This is implemented as an option in the NDB API and is achieved by setting the option OO_DISABLE_FK. If it is set on an operation it is set on the transaction in the NDB data node; it is always a transaction property.

Use of this feature means that it is possible to create an inconsistent database. It is important that you understand what you are doing when using this feature.

This feature is only available to key lookups.

Deferred constraints#

In the SQL standard constraints are checked at commit time; deferred constraints are the default behaviour. In MySQL the constraints are checked immediately by default, and InnoDB only supports immediate checking of foreign key constraints.

NDB supports deferring constraint checks (unique key constraints and foreign key constraints) to commit time. This is controlled in the MySQL Server by setting the MySQL configuration parameter --ndb-deferred-constraints to 1. It is always used in the slave applier for RonDB Replication since this packs a number of transactions together, only the result at commit time is certain to pass the constraint checks.

It is possible to use deferred constraints in the NDB API as well by setting the option flag OO_DEFERRED_CONSTAINTS (note spelling error here).

This feature is only available to key lookups.

Save data in operation object#

It is possible to save a reference to any data structure in the operation object. This is done by setting the option OO_CUSTOMDATA and setting the variable optionData on the options object.

This is used by NDB for handling conflict detection in the slave applier. But it could be used for anything really.

Later on you can retrieve this pointer through the call getCustomData() on the same operation object.

This option is available to both scan operations and key lookups.

Pass a value to the event API#

It is possible to pass a 32-bit value from the NDB API to the NDB Event API. This is passed in any update, insert or delete operation. When this operation triggers an event that is sent to the node receiving the events, this value is passed and can be read through the call getAnyValue on the event operation object.

To set this value use the flag OO_ANYVALUE and the variable anyValue on the options object.

This is used as part of the RonDB Replication implementation to pass information from the NDB API that issued the change to the slave applier. For more information on the content in the any value data see the code files sql/ndb_anyvalue.h and sql/

If the cluster isn't using replication to another cluster and the application needs to transport some value to the event API it is possible to use this value.
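Setting the value is a matter of two fields on the options object; a sketch (the value 42 is an arbitrary example):

```cpp
NdbOperation::OperationOptions options;
options.optionsPresent = NdbOperation::OperationOptions::OO_ANYVALUE;
options.anyValue = 42;  /* arbitrary 32-bit application value */
```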

This feature is only available to key lookups.

Operation bound to specific partition#

It is possible to bind an operation to only be executed by one partition. For example, if one needs an estimate of the number of rows, one can scan one partition and assume that the other partitions have roughly the same data distribution.

In this case one sets the flag OO_PARTITION_ID and sets the partition id that is scanned or used in the variable partitionId on the options object.

This option is available to both scan operations and key lookups.

Get special values#

Normally an operation retrieves column values from the row. It is also possible to retrieve metadata values about the row and its partition. These columns are called pseudo columns internally in NDB. These pseudo columns all relate to the current row read or written in a key lookup or in a scan operation. By performing a scan operation on all partitions of a table it is possible to get table values for many of those items.

A few of those pseudo columns are only intended for internal use cases.

This option is available to both scan operations and key lookups.

  Uint64 row_gci;
  Uint32 row_size;
  NdbOperation::OperationOptions options;
  NdbOperation::GetValueSpec extra_gets[2];
  extra_gets[0].column = NdbDictionary::Column::ROW_GCI;
  extra_gets[0].appStorage = &row_gci;
  extra_gets[1].column = NdbDictionary::Column::ROW_SIZE;
  extra_gets[1].appStorage = &row_size;
  options.optionsPresent =
    NdbOperation::OperationOptions::OO_GETVALUE;
  options.extraGetValues = &extra_gets[0];
  options.numExtraGetValues = 2;
  /* Pass a reference to options in the call to readTuple/insertTuple/... */

In the example above we perform a normal key lookup operation where we read a number of normal row columns (not shown in the example) and in addition we want to know the last GCI in which the row was updated and the size of the row.

These extra columns are added to the columns read by this operation as shown above and the same pattern applies to a whole range of metadata columns.

These columns are also accessible from SQL; their names in this case are as below with NDB$ prepended, e.g. NDB$ROW_GCI.


FRAGMENT#

This will return the id of the partition of the row returned. It is a 32-bit column.


FRAGMENT_FIXED_MEMORY#

This provides the total amount of memory used for the fixed part of the rows in this partition (we use the term fragment internally in NDB as a synonym for partition in most cases). In reality we are reading the information from the specific fragment replica where the operation was executed. It is a 64-bit column.


FRAGMENT_VARSIZED_MEMORY#

This provides the amount of variable sized memory for the partition. A row has a fixed memory part, an optional variable sized part and an optional disk part. The partition also has a primary key hash index and possibly a set of ordered indexes. It is a 64-bit column.


ROW_COUNT#

This returns the current number of rows in the partition. A row is seen in this view when the row insert has committed. It is a 64-bit column.


COMMIT_COUNT#

The number of commits in this partition since the node was started (or the partition was created) is reported in this variable. It is a 64-bit column.


ROW_SIZE#

Returns the size of the row read. It is a 32-bit column.


RANGE_NO#

This is only applicable to ordered index scans. Ordered index scans can use multiple ranges; in this case this column returns the number of the range that the row is returned from. It is a 32-bit column.


DISK_REF#

This is the reference to the disk part. It contains a file id, a page id and a page index. It is a 64-bit column.


RECORDS_IN_RANGE#

When executing an ordered index scan this column will calculate an estimate of how many rows exist in the range specified in the scan. The intended use for this is to calculate index statistics used to optimise queries in the MySQL Server. The column type is an array of 4 32-bit values.

The first value is the number of entries in the index (all rows in the partition except rows that have a NULL value in an index column). The second is the number of rows in the range, the third value is the number of rows before the range and the last value is the number of rows after the range. All these values are estimates.


ROWID#

The column type is an array of 2 32-bit values. The first 32-bit value is the fragment page id and the second is the page index. This number must be the same in the primary replica and the backup replicas.


ROW_GCI#

This value is the GCI identifier of the transaction that last updated this row. It is a 64-bit column, although the value will always fit in a 32-bit variable.


ROW_GCI64#

It is a 64-bit column. The higher 32 bits are the same as returned in ROW_GCI. The lower 32 bits are the extra GCI bits used in the conflict handling of RonDB Replication. More on this later.


ROW_AUTHOR#

When a table is involved in conflict detection handling we add this extra flag that indicates whether the row was last written by the slave applier or not. It is a 32-bit column.


ANY_VALUE#

This reads the any value set by OO_ANYVALUE above. It is a 32-bit column.


COPY_ROWID#

The column type is an array of 2 32-bit values. It is the same as ROWID except that this is the row id for the copy row; this row id might differ between the replicas.


LOCK_REF#

This provides a reference to the lock object. The highest 16 bits of the first 32-bit value is the node id where the lock resides. The lowest 16 bits of the first value is the fragment id; the second word is the operation record reference in the LQH block and the third word is a part of the unique id of the operation in the LQH block.

This is used under the hood for unlocking a row before commit. It is intended only for internal use of the NDB API.


OP_ID#

This returns the unique id inside the LQH block of the operation currently executed. It is currently not used for anything. It is a 64-bit column.


FRAGMENT_EXTENT_SPACE#

The column type is an array of 2 32-bit values. These two numbers together form a 64-bit value. This value represents the number of free extents the partition has.


FRAGMENT_FREE_EXTENT_SPACE#

The column type is an array of 2 32-bit values. These two numbers together form a 64-bit value. This value represents the number of free rows available in the free extents already allocated to this partition.

Set special values#

This is very similar to getting special values. The difference is that the option flag is OO_SETVALUE, the object used to set the values are NdbOperation::SetValueSpec instead of NdbOperation::GetValueSpec. The number of set values is set in numExtraSetValues and the set value object is stored in extraSetValues as shown below.

  NdbOperation::OperationOptions options;
  NdbOperation::SetValueSpec extra_sets[2];
  /* Fill in extra_sets[i].column and extra_sets[i].value here */
  options.optionsPresent =
    NdbOperation::OperationOptions::OO_SETVALUE;
  options.extraSetValues = &extra_sets[0];
  options.numExtraSetValues = 2;

This feature is only available to key lookups. As an example it can be used to set the ROW_AUTHOR field above, see the NDB storage engine code for an example.

Hidden key column#

When a table is created without a primary key, the NDB storage engine will add an 8-byte column where we generate a unique id for the row when inserting it. This column is always placed last in the set of columns.

Partition id for user defined partitions#

For tables with user defined partitioning the MySQL Server calculates the partition id and sets it in the first column id after the last column (or after the hidden key column).


ROW_AUTHOR#

ROW_AUTHOR can be set by the slave applier thread for tables involved in conflict detection.


OPTIMIZE#

This column is used as part of the implementation of OPTIMIZE TABLE. By setting this column to a proper value the NDB storage engine instructs the NDB data node to move the variable sized part of the row to a prepared place, to remove fragmentation of the variable sized memory. It is not intended for normal use cases. It is a 32-bit column.

Interpreter commands as part of NDB API operations#

The final option that one can use is so advanced that it merits its own section. This is the execution of interpreted programs. All read and write operations can run an interpreted program as part of their operation.

The most common use of this feature is to push down scan filters to the NDB data nodes. The MySQL Server has a condition it wants to push down to the data node; this condition is translated into an interpreted program.

This interpreted program is sent as part of the read/write operation down to the data node owning the data. When reading or writing the row this interpreted program is executed and for scan filters it decides whether to read the row or to skip the row and move on.

For write operations it could be used e.g. to increment an integer field rather than performing a read followed by a write. There is currently no way to get this optimisation from the MySQL Server, but it is possible to write an NDB API program that builds an interpreted program that does this.

The interpreter is a very simple register machine with 8 registers. It is possible to read a column value into a register, load constants into registers, and add and subtract registers. It is possible to define labels and to branch to a label dependent on a register value. It is also possible to define a subroutine and call it.

To enable pushdown of LIKE constructs, LIKE is also available as an interpreter instruction.

To use this one creates an NdbInterpretedCode object. For use of things like increment column one should use the functions of the NdbInterpretedCode object directly. For definition of condition filters one should use the NdbScanFilter class.

  NdbInterpretedCode code(...);
  /* Define the interpreted program here */
  NdbOperation::OperationOptions options;
  options.optionsPresent =
    NdbOperation::OperationOptions::OO_INTERPRETED;
  options.interpretedCode = &code;


The NDB storage engine code contains many uses of the NdbScanFilter functions. Also read NdbScanFilter.hpp to see more about how one can use this object.

The general idea of using the NdbScanFilter is that you define groups of conditions. The start of this filter group is started by calling the function begin. The parameter to this specifies whether the conditions inside the group should use AND, OR, NOR, or NAND. One condition inside this group could then be a new group of conditions. One could view begin and end calls as left parenthesis (begin) and right parenthesis (end) in a textual condition.

The actual conditions are defined by a call to the function cmp. This starts by defining the comparator type (e.g. larger than, equal and so forth), followed by the column to compare with and the value to compare with. This value is represented by a void* pointer with a length. No length bytes are expected in the value representation.

The comparator could also be LIKE and NOT LIKE. In this case the value could contain % and _ wildcards.

There are also methods to check for NULL and for NOT NULL.
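A filter group could thus be sketched as follows (the column ids and the value variable are hypothetical):

```cpp
/* WHERE col_3 = value AND col_5 IS NOT NULL, as a scan filter */
NdbScanFilter scan_filter(scan_obj);
scan_filter.begin(NdbScanFilter::AND);   /* '(' with AND semantics */
scan_filter.cmp(NdbScanFilter::COND_EQ,  /* equality condition */
                3, &value, sizeof(value));
scan_filter.isnotnull(5);                /* IS NOT NULL check */
scan_filter.end();                       /* ')' */
```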

Scan example#

Here is an example where we delete all rows using an ordered index scan on the primary key and in addition a scan filter. All rows that are part of the index range and match the scan filter are returned and deleted.

First we start a transaction and create a scan object that contains the record specification of the index and a record specification of the columns to read. We use an exclusive lock since we are going to delete the rows as part of the scan.

We define the lower bound to be 1 and we include 1 in the range. We define the upper bound to be 100 and we don't include the 100 in the range.

Next we also define a scan filter using an equal condition on one of the columns in the table. This scan filter will generate an interpreted program that will skip the row unless the scan filter is passed.

Using the above mentioned option SO_SCANFLAGS it is possible to specify that rows are returned in various sorted orders, it is possible to define multiple ranges and to get the range number of returned rows.

We have a double loop where the outer loop is making calls to the NDB data node and the inner loops handles the rows returned from this call to the NDB data node.

We execute the deletes once per outer loop, but we wait with committing the transaction until the scan is completed.

  NdbRecord *primary_key_record = ...
  NdbRecord *columns_record = ...
  trans_obj = ndb_obj->startTransaction();
  scan_obj = trans_obj->scanIndex(primary_key_record,
                                  columns_record,
                                  NdbOperation::LM_Exclusive);
  unsigned lower_bound = 1;
  unsigned upper_bound = 100;

  NdbIndexScanOperation::IndexBound key_range;
  key_range.low_key = (char*)&lower_bound;
  key_range.low_key_count = 1;
  key_range.low_inclusive = true;
  key_range.high_key = (char*)&upper_bound;
  key_range.high_key_count = 1;
  key_range.high_inclusive = false;
  key_range.range_no = 0;

  scan_obj->setBound(primary_key_record, key_range);

  NdbScanFilter scan_filter(scan_obj);
  /* Define an equality condition on one of the columns here */

  trans_obj->execute(NdbTransaction::NoCommit);
  while (scan_obj->nextResult(true) == 0) {
    do {
      scan_obj->deleteCurrentTuple();
    } while (scan_obj->nextResult(false) == 0);
    trans_obj->execute(NdbTransaction::NoCommit);
  }
  trans_obj->execute(NdbTransaction::Commit);
  ndb_obj->closeTransaction(trans_obj);

NDB Event API#

The NDB API contains an Event API that makes it possible to subscribe to all changes of a specific table. Each time a transaction is committed an event can be generated for each row that has been changed. These events are collected together for a specific global checkpoint and sent to the API node(s) that are listening to the events. There can be many listeners to the same event. This is used by NDB to inject events into a MySQL binlog to get the changes into the MySQL Replication infrastructure. It can be used by applications to wait for certain events and create actions based on those events. There are countless ways to use events.

The events are asynchronous events; they will arrive as soon as possible, but there is no specific guarantee on when they will arrive. Normally a cluster uses micro GCPs that are created with 100 millisecond delay. Therefore it is expected that events are delayed at least on the order of a hundred milliseconds, and in overloaded systems it can take longer.

The data node will buffer events; if the API node is not fast enough to consume them, the event reporting will eventually fail. In this case the API node will be informed that the event reporting failed.

Create Event#

The first step is to create an event of type NdbDictionary::Event. To create such an event it needs a name and it needs a table object of type NdbDictionary::Table, the table object is retrieved by name from the NdbDictionary::Dictionary object.

Next one adds one or more table events, one can set the event to wait for inserts, deletes, updates or all of those. This uses the method addTableEvent on the event object.

The event specifies which columns will be sent to the event listener as part of each event. This uses the method addEventColumns on the event object.

For insert events the after value is sent, for delete events the before value is sent and for update events both the before value and the after value of those columns are sent.

If the table contains BLOBs for which one wants to hear when they are updated, it is necessary to call the method mergeEvents on the event object before creating it in the NDB data nodes. This method can also be called with the flag set to not merge events from the BLOB tables.

Setting merge events to true has the effect that several writes on the same row within one global checkpoint will be merged into one event.

BLOB events are reported as mentioned above when merge events have been set, but it is necessary to read the BLOB fields using the NdbBlob object to see their value.

The next step is to create the event in the NDB data nodes. This happens through a call to the method createEvent on the NdbDictionary::Dictionary object using the event object as parameter.

Now the event is created in the NDB data node and can be used to listen to events.

Drop Event#

Events are dropped from the NDB data nodes by calling the method dropEvent on the NdbDictionary::Dictionary object using the event name as the parameter.

Create Event Operation#

Now that the event exists in the NDB data nodes we can create an event operation to listen to the events. This is done through a call to the method createEventOperation on the Ndb object using the event name as parameter.

It is necessary to call the method mergeEvents on the event operation object to specify whether it is desirable to get merged events or not. This merge setting activates the merging, but the merge flag needs to be set also on the event object when the event is created in the data nodes. If it is not also set there, events from the data node will be missing, making it impossible to merge events properly.

As part of setting up the event operation object we need to define objects of type NdbRecAttr to provide the before value (using getPreValue) and after value (using getValue) for each column we are interested in.

Finally we need to call execute on the event operation object.

Wait for events#

To listen to events one now calls the method pollEvents on the Ndb object with the number of milliseconds to wait before waking up if no events occur. This method returns a positive number if events occur.

If events occur, one calls the method nextEvent on the Ndb object to retrieve one event. This method returns the event operation previously defined. This object is now filled with information from the event, and the user can use methods on this object to retrieve information about the event type (insert, delete or update) and which columns changed, with their before and after values.

The NdbRecAttr objects can be used to discover whether attributes have changed, whether they are set to NULL, and to retrieve the column value.

When the program is no longer interested in listening to events it drops the event operation by calling dropEventOperation on the Ndb object using the event operation object as parameter.

Node failure handling of events#

Events are generated in all data nodes. Only the node that has been selected as primary event sender will send its events to the API node. If the data node that was primary event sender fails, the other nodes ensure that the API node gets the information from one of the remaining live nodes instead. Thus all nodes keep the buffers for the events of a global checkpoint until the API node has confirmed that it has received those events. All this happens automatically without any user interaction.

NDB API Event example#

Here is an example of how to write an event handler loop for a specific table and a number of specific columns.

  const char *col_names[2] = { "col1", "col2" };
  NdbDictionary::Dictionary *dict_obj = ndb_obj->getDictionary();
  const NdbDictionary::Table *tab_obj = dict_obj->getTable("table_name");
  NdbDictionary::Event event_obj("event_name", *tab_obj);
  event_obj.addTableEvent(NdbDictionary::Event::TE_ALL);
  event_obj.addEventColumns(2, col_names);
  dict_obj->createEvent(event_obj);
  NdbEventOperation *op = ndb_obj->createEventOperation("event_name");
  NdbRecAttr *col_after[2];
  NdbRecAttr *col_before[2];
  col_after[0] = op->getValue(col_names[0]);
  col_before[0] = op->getPreValue(col_names[0]);
  col_after[1] = op->getValue(col_names[1]);
  col_before[1] = op->getPreValue(col_names[1]);
  op->execute();
  while (true) {
    int r = ndb_obj->pollEvents(1000);
    while (r > 0 && (op = ndb_obj->nextEvent())) {
      /* ... Handle event ... */
    }
  }

Pushdown Join#

Part of the NDB API is the handling of complex query operations. It is possible to push parts of a join query down to the NDB data nodes. This makes use of a form of linked table scans.

The first table is either using a primary key operation, unique key operation or a scan operation. Each row that is found in this first table is sent to the second table in the join order. Normally the first table is a scan operation in a pushdown join operation.

For each row found in the first table, the result is sent to the NDB API; at the same time, the columns needed further down in the join evaluation are sent along in a new join that involves all tables except the first one.

In this manner the query is recursively executed. If the join evaluation contains filtering parts this can lead to a very high parallelisation. If very little filtering is performed the NDB API will still be a major bottleneck since there is a limit to how fast the NDB API can process queries.
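The linked-scan idea can be illustrated with a small in-memory sketch (conceptual only; the real execution pushes these lookups into the data nodes rather than running them in the client):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch: each row from the first table drives a key lookup
// in the next table of the join order, and misses are filtered out.
public class LinkedScanDemo {
    public static void main(String[] args) {
        List<int[]> t1 = Arrays.asList(new int[]{1, 10}, new int[]{2, 20});
        Map<Integer, String> t2 = new HashMap<>();
        t2.put(10, "x");
        t2.put(20, "y");
        List<String> result = new ArrayList<>();
        for (int[] row : t1) {               // scan of the first table
            String joined = t2.get(row[1]);  // lookup driven by that row
            if (joined != null) {            // filtering drops non-matches
                result.add(row[0] + ":" + joined);
            }
        }
        System.out.println(result);
    }
}
```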

This pushdown join interface to the NDB API is used by the NDB storage engine; it is not intended for use by normal NDB API applications. I won't go through any more details about how this part of the NDB API works.

Asynchronous API#

The asynchronous NDB API is currently only applicable to the key operations. It is not possible to use it for scan operations.

It differs only in how transactions are executed. Instead of a single execute call there are three calls. These must be preceded by a call to executeAsynchPrepare, which prepares the defined operations for execution.

A normal execute(...) call will normally perform three actions:

  1. Prepare defined operations for execute

  2. Send prepared operations

  3. Poll for completion of sent operations

With the normal synchronous API it is possible to execute multiple transactions in parallel, but the normal use case is to use one transaction at a time. The above poll will always wait for all sent operations to complete.

Using the asynchronous API it is expected to have many concurrent transactions outstanding at any point in time and the poll doesn't have to wait for all actions to complete, it is sufficient to wait for a few actions (by default one) to complete.
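The division into prepare, send and poll phases can be sketched with a hypothetical mock (the method names mirror the calls described in the text, but nothing here talks to a real cluster, and every sent transaction is simulated as completing immediately):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical mock of the asynchronous pipeline described above.
public class AsyncSketch {
    private final List<Integer> prepared = new ArrayList<>();
    private final Deque<Integer> inFlight = new ArrayDeque<>();
    private int sends = 0;

    void executeAsynchPrepare(int txnId) {  // phase 1: prepare operations
        prepared.add(txnId);
    }

    void sendPreparedTransactions() {       // phase 2: send without waiting
        inFlight.addAll(prepared);
        prepared.clear();
        sends++;
    }

    int pollNdb(int minCompleted) {         // phase 3: wait for >= minCompleted
        // In this mock everything sent has already completed, so the poll
        // harvests all of it; a real poll would block until at least
        // minCompleted transactions were done.
        int done = 0;
        while (!inFlight.isEmpty()) {
            inFlight.poll();
            done++;
        }
        return done;
    }

    public static void main(String[] args) {
        AsyncSketch ndb = new AsyncSketch();
        for (int i = 0; i < 100; i++) ndb.executeAsynchPrepare(i);
        ndb.sendPreparedTransactions();     // one network interaction for all 100
        int completed = ndb.pollNdb(1);     // may return more than the minimum
        System.out.println("completed=" + completed + " sends=" + ndb.sends);
    }
}
```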


sendPollNdb is more or less the same as calling execute, with one major difference: sendPollNdb can be asked to wake up before all transactions have completed. Thus the asynchronous interface can be used to work with hundreds of interactions in parallel per thread. Fewer threads are needed to handle the interactions, and these fewer threads can use more batching, achieving higher throughput.


If the thread needs to take care of other business while NDB is working, it can use the sendPreparedTransactions call to send the prepared transactions without waiting for their completion.


The poll call can be used in conjunction with sendPreparedTransactions. When returning from other activities it is possible to poll for the transactions already sent to be completed.

flexAsynch Architecture#

The flexAsynch program is a benchmark program we've used to showcase hundreds of millions of reads and tens of millions of write operations per second. It uses a special model for achieving extremely high throughput. This model can be reused in other programs that only require access to key operations.

ClusterJ, API for Java and Scala#

ClusterJ is the name of our direct access to the NDB data nodes for Java programs. ClusterJ is implemented on top of the C++ NDB API. The first layer on top of the NDB API is a JNI (Java Native Interface) layer that maps the C++ NDB API onto a Java NDB API.

Thus it is possible to program directly against most of the NDB API even from Java if desired (this layer is called ndbjtie internally). ClusterJ adds another layer that makes it considerably easier to program against NDB.

I am personally quite fond of the ClusterJ interface even though I rarely program in Java myself. Interestingly ClusterJ can also be used from the Scala programming language. All Java libraries can be called from Scala.

The ClusterJ layer provides an object-relational mapping that is similar to Hibernate.

To explain how to use ClusterJ we will start with the initialisation methods followed by the classes created to map tables or parts of tables into Java objects. The next step is to go through the use of the Session object that controls accesses to NDB. We will go through some details on how to control searches in NDB from the Session object. We will also provide a number of examples of how to use the ClusterJ API.

There are plenty of examples provided in the MySQL source tarball. The test suite for ClusterJ contains a large set of examples of how to use the ClusterJ API.

A fairly sizable application that uses ClusterJ is HopsFS, which we covered a bit earlier. It implements a Hadoop file system on top of NDB, replacing the name nodes of HDFS (the file system in Hadoop) with a set of name nodes that use ClusterJ and store their data in NDB data nodes. This code can also be used as an example of how to write a ClusterJ application.

This code is available on and can be downloaded from there using the following command:

git clone

Installing ClusterJ#

The simplest method of installing ClusterJ is by using the Linux repos as described in an earlier chapter. The name of the repo to install is java. Installing a full RonDB application will contain ClusterJ as well.

Currently there is no prepared Docker file ready for ClusterJ.

Compiling ClusterJ applications#

When compiling ClusterJ applications we need the jar file clusterj-api.jar in the class path, provided using the option -classpath. There are three main places where the Java jar files for RonDB are installed. The first is the share/mysql/java directory under the MySQL install directory; this is where older versions of MySQL Cluster placed it.

RonDB places it in share/java under the RonDB installation directory.

The MySQL Linux repos install the Java jar files for MySQL Cluster under the directory /usr/share/mysql/java.


In addition, in newer versions the file names are tagged with a version number. Thus the jar file is called clusterj-api-7.5.4.jar in MySQL Cluster 7.5.4. Searching for clusterj-api-CLUSTER_VER.jar is the most appropriate approach on recent MySQL Cluster versions.

The program used to compile Java code is javac, which requires the proper Java JDK to be installed. ClusterJ requires Java version 7 or version 8.

Here is the command used to compile my small test program.

javac \
  -classpath $CLUSTERJ_INSTALL_DIR/clusterj-api-$CLUSTER_VER.jar:. \
  TestClusterJ.java

The extra . in the classpath ensures that the compiler looks for a Customer.java or Customer.class in the same directory as my main program.

Executing a ClusterJ application#

Executing a Java application using ClusterJ requires access to at least two things. One is the clusterj.jar that is located in the same directory as the clusterj-api.jar described in the previous section.

The second thing we need access to is the NDB API library. We provide the path to this library in java.library.path. This library is normally found in the lib directory in the MySQL installation directory in newer versions. In older versions it can be under lib/mysql instead.

The following command will run your new ClusterJ application. The extra . in the classpath ensures that classes already compiled in your local directory are included in the execution of your ClusterJ application. The command to start your ClusterJ application becomes more complex for bigger projects. Java code is executed by the java binary.

java \
  -classpath $CLUSTERJ_INSTALL_DIR/clusterj-$CLUSTER_VER.jar:. \
  -Djava.library.path=$MYSQL_INSTALL_DIR/lib \
  TestClusterJ

Properties file#

All the properties that can be changed in the Properties object are documented in the MySQL Cluster documentation.

We will go through the most interesting ones in this section. When creating a session factory (one such per cluster connection) we need a Properties object as input. The recommended manner of filling in this object is by using a properties file. Normally this file would be placed in the same directory where your application resides, but given that this is under the control of the application developer, it could be named differently and placed elsewhere as well.

After reading the file and filling in the properties it is also possible to change those settings programmatically in the Properties object.


The connect string to the NDB management server(s) is a necessary setting that most installations are required to set. It can be used to set the hostname(s) of the management server(s) (defaults to localhost) and the port number (defaults to 1186).


This parameter defines the default database to use when working against NDB. Defaults to test.


If more than one cluster connection is wanted, it can be set here. One cluster connection scales to some level, but if the program needs to make use of a full server, a few more cluster connections may be required to scale. Defaults to 1.


To improve management of the cluster it is a good idea to have stable node ids for NDB API programs that execute for long periods. For short-lived programs it can be ok to get any node id.

This parameter can list a number of node ids to use for the cluster connections. The number of node ids should be the same as the size of the pool of cluster connections. It is also possible to set only one node id; in this case the node ids used start at this node id. For example, if it is set to 55 and the pool size is 4, we will use the node ids 55, 56, 57 and 58.

It is not set by default.
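The node id expansion rule described above is simple enough to sketch (illustration only; the real allocation is negotiated with the management server):

```java
import java.util.Arrays;

// Sketch of the rule above: a single start node id plus the pool size
// expands to one node id per cluster connection.
public class NodeIdDemo {
    static int[] nodeIds(int startNodeId, int poolSize) {
        int[] ids = new int[poolSize];
        for (int i = 0; i < poolSize; i++) {
            ids[i] = startNodeId + i; // consecutive ids from the start id
        }
        return ids;
    }

    public static void main(String[] args) {
        // start id 55 and pool size 4, as in the text
        System.out.println(Arrays.toString(nodeIds(55, 4)));
    }
}
```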


When using tables with autoincrement columns this parameter specifies how much we will step the autoincrement between each insert. Defaults to 1.


This represents the starting value for the autoincrements. Defaults to 1. The intended use case for non-default settings of autoincrement increment and offset is RonDB Replication, where both clusters can be used to insert new rows at the same time. By ensuring that the clusters use the same increment but different start values (= offsets), we can ensure that they won't generate the same autoincrement value.
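The replication use case can be checked with a small sketch (illustration only): two clusters using the same increment but different offsets generate disjoint value sequences:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of the offset/increment scheme: same increment, different
// offsets, therefore no common values between the two clusters.
public class AutoIncDemo {
    static List<Long> values(long offset, long increment, int count) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(offset + i * increment); // offset, offset+inc, offset+2*inc, ...
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> clusterA = values(1, 2, 4); // offset 1, increment 2
        List<Long> clusterB = values(2, 2, 4); // offset 2, increment 2
        boolean overlap = clusterA.stream().anyMatch(clusterB::contains);
        System.out.println(clusterA + " " + clusterB + " overlap=" + overlap);
    }
}
```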


An API node allocates a batch of autoincrement values at a time. This parameter specifies how many values we retrieve per batch. Set by default to 10.


Specifies the number of retries before we give up on connecting to a management server. A negative value means that we will never give up. We will continue until we succeed with the connection setup. Set by default to 4.


Specifies the delay after an unsuccessful attempt until we make another attempt to connect to the management server(s). Defaults to 5 seconds.


This will produce a bit more verbose printouts during the connect process if set. Defaults to not set (= 0).


The NDB API is designed for automatic reconnect at failures. This parameter specifies how many seconds to wait until we reconnect after a network failure. The default value is 0, thus we won't attempt to reconnect. It is worth considering changing this value. The timeout is used to wait for all Session objects to be closed before a reconnect is performed.

There is a method reconnect on the SessionFactory object that can be used to change this value programmatically.


The maximum number of transactions that can be handled by one Session object. This is an important parameter to increase if using the asynchronous NDB API, but that API is not accessible from ClusterJ. Thus in this case the only way to use many transaction objects is through multiple open scans: by default we can have at most 3 open scans in parallel. If more are needed this parameter must be increased; there is no reason to decrease it. It is set to 4 by default.


The time we will block waiting for a connection from a data node. When we time out here, we check whether the before and after timeouts have expired; if not, we retry the blocking connect attempt. Defaults to 30000 milliseconds; only multiples of 1000 are allowed.


This represents the time to wait for the first data node to connect before giving up. Defaults to 30 seconds.


After the first data node has connected, this is how long we will wait for all data nodes to connect. Defaults to 20 seconds.


By default, memory blocks used to store BLOB parts are pooled within ClusterJ. To save memory at the expense of more processing, one can set this value to 1. This means that buffers for BLOB objects will use normal malloc and free calls during their usage.


Each cluster connection (SessionFactory) has one receive thread. This receive thread executes all signals arriving from the NDB data nodes and is thus a potential scalability bottleneck for the NDB APIs, since only one CPU can be used to execute these signals. The way to scale this is to use multiple API nodes, which gives access to multiple receive threads.

Given that this receive thread executes so much work, it can be beneficial to execute it on the same CPU all the time. This parameter specifies a list of CPU ids; the number of CPUs listed must equal the pool size.

The default setting is that no CPUs are provided, thus no CPU locking is used.

It is possible to set/unset this programmatically on the SessionFactory object using the call setRecvThreadCPUids. The input to this call is an array of CPU ids of short type. The size of the array must be the same as the number of cluster connections used (pool size above). The CPU locking is unset by using the CPU id -1. It is set by setting a CPU id recognized by the OS in the array.


This parameter specifies the threshold for how many Session objects (Ndb objects) must be active in a cluster connection before the receive thread is activated to take over execution of signals. Below this threshold the user thread takes care of the receive thread work (still only one thread at a time can handle it). By default this is set to 8. If we lock CPUs it might be useful to set this to 1 instead.

It is possible to set this programmatically on the SessionFactory object using the call setRecvThreadActivationThreshold.

ClusterJ annotations#

One of the first steps in defining your ClusterJ application is to map the table objects in your database to a set of Java classes. These classes are mapped to the table definition using annotations. Normally such a class maps all columns of the table, but it is also possible to map only a subset of the columns and indexes to the class.

It is possible to define several classes that map to the same table. One can see these classes as different views on the table.

Each column of the table has a get-method that returns the value of the field of the Java type that maps to the SQL type stored in NDB. Similarly there is a set-method for setting the value of this column.

The following attributes are always preceded by an at-sign as shown in the below examples.


Before the class is defined we declare that the class is a persistent class using the PersistenceCapable annotation. The only attribute of interest to this annotation is the name of the table in NDB.


One can define the entire primary key with all its columns before the class, but it is easier to simply add a PrimaryKey annotation before defining the get and set methods of the column. No attributes are needed on the annotation if it is provided inside the class.


In many cases it is not necessary to provide any Column annotation. The default name of the column is the name found in the get and set call.

Now we have enough to give a very simple example. As shown in this example it is necessary to import all annotation types used in the java file.

The example table can be created using the following MySQL command:

CREATE TABLE customer (
  customer_id INT PRIMARY KEY,
  name VARCHAR(255) CHARACTER SET utf8,
  KEY name_index(name)
) ENGINE=NDB;

In the example below we have a table customer with two columns. The first one customer_id is an integer that is also the primary key. The second field name contains the name of the customer. There is an index on this column.

Since we want to use the method names getId and setId for getting/setting the customer id, we describe the column using the Column annotation. This is required when the name of the column in NDB is different from the names of the methods.

import com.mysql.clusterj.annotation.PersistenceCapable;
import com.mysql.clusterj.annotation.PrimaryKey;
import com.mysql.clusterj.annotation.Column;

@PersistenceCapable(table="customer")
public interface Customer {
  @PrimaryKey
  @Column(name="customer_id")
  int getId();
  void setId(int id);

  String getName();
  void setName(String name);
}

We add the PrimaryKey annotation before the customer_id column to indicate that this is the primary key of the table.

The column name requires no mapping since it uses the same name on the methods as the name of the column in the database. There is an index on the column, this is discovered automatically by ClusterJ.

ClusterJ will automatically discover the partition key of a table to ensure that scans can avoid scanning all partitions when the partition key is provided.


BLOB columns are mapped using the Lob annotation type. This is used both for text BLOBs, which can use the String type in Java, and binary BLOBs, which can use the byte[] type. The Lob annotation is provided as part of the class definition right before the BLOB column.


One table can be mapped to more than one class. If a class does not contain all columns, one should provide the Projection annotation before the persistence definition. At a minimum a class must contain the primary key or possibly a unique key. Without this information in the object it isn't possible to persist the object in the database.


It is possible to add variables in the Java class that are not persistent and thus not mapped to any database column. In this case one adds the annotation NotPersistent before the attribute is declared with its access methods.

Null values#

Columns that can contain NULL values can be handled in different ways. One method is to signal with an exception if the column isn't set before the object is flushed to the database. Another method is to set the column to its default value if no value has been provided. Here are two examples of this.

  @Persistent(nullValue = NullValue.EXCEPTION)
  public Integer getX();
  public void setX(Integer int_val);

  @Persistent(nullValue = NullValue.DEFAULT)
  public Integer getY();
  public void setY(Integer int_val);

Startup example code#

import com.mysql.clusterj.ClusterJHelper;
import com.mysql.clusterj.SessionFactory;
import com.mysql.clusterj.Session;
import com.mysql.clusterj.Query;
import com.mysql.clusterj.LockMode;
import com.mysql.clusterj.query.QueryBuilder;
import com.mysql.clusterj.query.QueryDomainType;
import com.mysql.clusterj.query.Predicate;
import com.mysql.clusterj.query.PredicateOperand;
import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
import java.io.File;
import java.io.InputStream;
import java.io.FileInputStream;

public class TestClusterJ {
  // ... Implementation of test methods
  public static void main(String[] args) throws Exception {
    File propertiesFile = new File("");
    InputStream inputStream = new FileInputStream(propertiesFile);
    Properties properties = new Properties();
    properties.load(inputStream);
    SessionFactory sessionFactory =
      ClusterJHelper.getSessionFactory(properties);
    Session session = sessionFactory.getSession();
    // ... Call test methods
  }
}

To start up a ClusterJ application it is necessary to create one SessionFactory object. From this object we create Session objects that are used to interact with NDB. The Session object is mapped directly to the Ndb object in the C++ NDB API as explained in the previous chapter. Ndb objects are mapped to cluster connections in a round-robin fashion by the SessionFactory object.

Creating a SessionFactory requires that we prepare a Properties object. We assume this object comes from reading the properties file.

Equipped with a Session object we are ready to work with data in NDB.

Session Object#

We get a session object from the session factory object. The session object is mapped to the Ndb object in the NDB API. It can handle one transaction at a time and multiple rows can be fetched and updated in parallel when interacting with NDB data nodes.

Inserting a row#

We use a simple example based on the Customer class that we defined in the previous section. To insert a new object we create a new object of type Customer, then we assign values to all columns. Finally we call the persist function (the function makePersistent is equivalent to persist).

  Customer newCust = session.newInstance(Customer.class);
  newCust.setId(100);
  newCust.setName("Mikael");
  session.persist(newCust);

If the session has no transaction started yet, the persist call will perform the actual transaction in an NDB data node that inserts the new customer row into the database.

Thus if no transaction is started in the Session object we execute in autocommit mode where each operation is executed as a separate transaction.

Updating a row#

There are two variants of how to update a row. The first method is a so called blind update. This means that we don't read the row before we update it. We simply overwrite all values in the updating class.

This example shows that the only difference to an insert is that we call updatePersistent instead of makePersistent.

  Customer newCust = session.newInstance(Customer.class);
  newCust.setId(100);
  newCust.setName("Mikael");
  session.updatePersistent(newCust);

The other variant is a read before the update. In this case we perform the read by using the find call, and we only need to update the columns that will change; all the other ones are persisted with the same column values as were read. Given that we are planning to update the row after reading it, we use exclusive lock mode already at the read to avoid deadlocks. In addition, we have to explicitly use a transaction to ensure that the lock is held also after returning from find. The default mode is autocommit, thus find will by default commit before returning (so no locks are held when it returns).

  session.currentTransaction().begin();
  session.setLockMode(LockMode.EXCLUSIVE);
  Customer newCust = session.find(Customer.class, 100);
  newCust.setName("Mikael");
  session.updatePersistent(newCust);
  session.currentTransaction().commit();

The find call will always go immediately to the NDB data node and fetch the row.

One more variant is to issue multiple reads before we go towards the data node to fetch the rows. In the below example we use the load method (only prepares for a read) several times followed by a flush call that performs the prepared operation. The load method can only be used when an active transaction is ongoing.

Since we have a transaction started, the calls to updatePersistent will not take effect until we call commit, at commit both updates will be sent in one batch to the data nodes.

When a transaction is started only find, flush and commit will start communication with the data nodes.

  session.currentTransaction().begin();
  Customer newCust1 = session.newInstance(Customer.class);
  newCust1.setId(100);
  Customer newCust2 = session.newInstance(Customer.class);
  newCust2.setId(101);
  session.load(newCust1);
  session.load(newCust2);
  session.flush();
  newCust1.setName("Mikael");
  newCust2.setName("Patrick");
  session.updatePersistent(newCust1);
  session.updatePersistent(newCust2);
  session.currentTransaction().commit();

Hopefully these examples provide some insight into how one can batch interactions with NDB data nodes when designing ClusterJ applications.

We will show an alternative to using the transaction interface to batch updates. This uses the updatePersistentAll interface that takes a set of rows to persist through updating.

  List<Customer> updatedInstances = new ArrayList<Customer>();
  Customer newCust1 = session.newInstance(Customer.class);
  newCust1.setId(100);
  newCust1.setName("Mikael");
  Customer newCust2 = session.newInstance(Customer.class);
  newCust2.setId(101);
  newCust2.setName("Patrick");
  updatedInstances.add(newCust1);
  updatedInstances.add(newCust2);
  session.updatePersistentAll(updatedInstances);

The updatePersistentAll takes a list of objects to update instead of a single object. In this case we don't use explicit transactions, instead we use the autocommit mode. Thus the updatePersistentAll will perform the updates and commit in one interaction.

Deleting a row#

Deleting a single row using autocommit is very similar to an update, except that it is only necessary to set the primary key of the object and use the call deletePersistent.

  Customer newCust1 = session.newInstance(Customer.class);
  newCust1.setId(100);
  session.deletePersistent(newCust1);

We can use the read before delete in the same fashion as for updates and similarly we can use the batch interface to perform multiple deletes in a batched operation as shown in example below.

  List<Customer> deletedInstances = new ArrayList<Customer>();
  Customer newCust1 = session.newInstance(Customer.class);
  newCust1.setId(100);
  Customer newCust2 = session.newInstance(Customer.class);
  newCust2.setId(101);
  deletedInstances.add(newCust1);
  deletedInstances.add(newCust2);
  session.deletePersistentAll(deletedInstances);

Reading a row#

To read a row we can either use the find call or the load call. Both read an object based on the primary key and load it into a Java object.

find will execute immediately in the NDB data node whereas load will execute asynchronously at the next interaction with an NDB data node (find, flush, query or commit calls). As mentioned above load can only be used when an active transaction is ongoing, it cannot be used in autocommit mode.

Thus the load call is easy to use for batching a set of reads whereas find might be a bit easier to program against given that the interaction is immediate.

load uses an object where at least the primary key has been filled in by a newInstance call, or an object that has been returned by a previous find call or from a query.


Each Session object can have at most one current transaction. This transaction can be found by calling currentTransaction() on the Session object. The transaction object can be used to start a transaction using the begin() call, to commit a transaction using the commit() call, and to abort a transaction using the rollback() call.

It is also possible to set the transaction into a mode where it can only be rolled back by calling the method setRollbackOnly() on the transaction object. The status of this flag can be checked with a call to getRollbackOnly(). If this rollback only flag is set when commit() is called, the transaction will be rolled back.

One can check if a transaction is currently active by calling isActive() on the transaction object.

Hints where to place the transaction#

Normally the hint to place the transaction coordinator is automatically derived from the first operation in the transaction. If for some reason we want something different we can set the hint through a call to the method setPartitionKey on the Session object.

This call needs two parameters, the first is a class that is a mapping towards an NDB table. The second is an object that maps to the primary key of this table. In our examples the primary key is an integer, a number works fine as object.

For multi-column primary keys we need to create an Object[] type with one entry for each primary key column and the type for each column should correspond to the primary key column used. The order of the columns is the order defined when annotating the mapped object.

  session.setPartitionKey(Customer.class, 100);

Creating a query#

Queries scan a table, either using an ordered index or through a full table scan. The interface in ClusterJ doesn't explicitly specify whether to use a table scan or an index scan; it will use an index scan if possible and otherwise a table scan. ClusterJ doesn't support queries against multiple tables in the same query.

Thus ClusterJ contains a simple query optimiser that decides which index to use by analysing the conditions.

If it is necessary to execute complex queries against NDB from a Java application, one should simply use SQL through the MySQL JDBC connector. It is perfectly possible to mix ClusterJ and JDBC; some of the ClusterJ test programs do exactly that. Using the MySQL JDBC connector for NDB tables works perfectly fine.

The execution of a query in ClusterJ goes through a number of steps.

  1. Create a QueryBuilder object

  2. Create a QueryDomainType object for the persistent class

  3. Create parameters needed by query (optional)

  4. Specify query condition (optional)

  5. Create Query object

  6. Set parameter values (optional)

  7. Set skip and limit (optional)

  8. Set scan ordering (optional)

  9. Execute the query

  10. Handle query result

We start with a simple example based on our usual Customer table.

  QueryBuilder qb = session.getQueryBuilder();
  QueryDomainType<Customer> qdc =
    qb.createQueryDefinition(Customer.class);
  PredicateOperand id_low = qdc.param("id_low");
  PredicateOperand id_high = qdc.param("id_high");
  PredicateOperand searched_name = qdc.param("searched_name");
  Predicate left = qdc.get("id").between(id_low, id_high);
  Predicate right = qdc.get("name").equal(searched_name);
  qdc.where(left.and(right));
  Query<Customer> qc = session.createQuery(qdc);
  qc.setParameter("id_low", 100);
  qc.setParameter("id_high", 102);
  qc.setParameter("searched_name", "Mikael");
  List<Customer> results = qc.getResultList();

In this example we created a QueryBuilder object and based on this we created a QueryDomainType for our Customer class that is a persistence class for the customer table.

We created one condition to return all Customers with id between 100 and 102. We use this as the left predicate. We add another predicate that we only want to return Customers with the name Mikael. We use AND between those predicates and create the query object.

100, 102 and Mikael were all parameters that could have been instantiated using input variables to a function, so this could easily be extended into a function.

After setting the parameters we are ready to execute the query. getResultList will execute the entire query at once. Thus when using this interface it is important to ensure that sufficient amount of memory is available to handle the full result of the query.

The name used for columns is the name of the attribute in the class, thus not necessarily the column name used in the table. Here we use id and not customer_id as the column name is in the table.

QueryDomainType class#


get uses the name of the column attribute in the class to represent that column in a query. The actual value is in the database and thus different for each row. It returns a PredicateOperand object.


We can introduce parameters in our query. Before executing the query these parameters must get a value set through the Query object. It returns a PredicateOperand object.


The where call sets the search condition of the query. Its Predicate argument is a boolean expression of predicates, where each leaf predicate represents a search condition. It returns the QueryDomainType object itself.

PredicateOperand class#

Many different search conditions can be applied here. Normally the PredicateOperand returned from the get call on the QueryDomainType object is used as the base object, and a number of methods are available to compare it with one or more other PredicateOperands. Mostly these will be PredicateOperands returned from the param call, but they could also be other PredicateOperands returned from the get call.

The conditions that can be applied are equal, greaterThan, greaterEqual, lessThan, lessEqual, between (have low and high PredicateOperand), in and like.

in is used to represent a list of values; it is equivalent to a number of equality conditions ORed together. In this context in gives the same result as calling equal for each value, but calling in provides more context and makes it easier to choose the right execution plan for the query.

like has a parameter that is a string that can contain % and ? to form a LIKE expression.
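As a hedged illustration of these wildcard semantics, the helper below (invented for this sketch, not part of ClusterJ) translates a pattern with % (any sequence) and ? (a single character, as described above) into a regular expression:

```javascript
// Translate a LIKE-style pattern into an anchored RegExp.
// % matches any sequence of characters, ? matches exactly one character.
function likeToRegExp(pattern) {
  // Escape regex metacharacters, but leave the wildcards % and ? alone
  var escaped = pattern.replace(/[.*+^${}()|[\]\\]/g, "\\$&");
  return new RegExp("^" + escaped.replace(/%/g, ".*").replace(/\?/g, ".") + "$");
}

var prefixMatch = likeToRegExp("Mik%").test("Mikael"); // prefix pattern
```

The real predicate is evaluated inside the database; this sketch only shows what the pattern means.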

We also have methods to check for NULL values, isNull and isNotNull.

Predicate class#

A predicate can be combined with other predicates to form a new predicate and also a predicate can be negated. We support and, or and not. Given these we can form any boolean expression since all other boolean expressions can be converted to a set of those.

Query class#

Before moving on to executing the query we need to set the parameter values defined in the query.


The parameter is identified by a string name, and the value can be of any type, but it must be of a type that can be compared with its other operands.


The setLimits call sets the skip parameter, which specifies how many rows to skip before considering the result rows, and the limit parameter, which specifies the maximum number of rows that will be returned to the application.

setLimits must be called before executing the query.
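The semantics of skip and limit can be sketched on an in-memory list (plain JavaScript, not the ClusterJ API):

```javascript
// skip rows first, then return at most limit rows
function applyLimits(rows, skip, limit) {
  return rows.slice(skip, skip + limit);
}

var limited = applyLimits([1, 2, 3, 4, 5], 1, 2); // skip 1 row, return 2
```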


This call corresponds to ORDER BY in SQL. The first parameter defines whether we use ascending order (Ordering.ASCENDING, the default) or descending order (Ordering.DESCENDING).

After that the columns are listed in the order that we sort them on, we first sort on the first column and so forth.

The columns listed must be the columns in an ordered index. It is not necessary to specify all of the columns, but the order of the columns must be the same as the order specified in the index. Thus we don't support general sorting of rows, but we support ordering of results from many index partitions that are scanned in parallel. In our example we could have used this method.

In our example above both the ordered index on the primary key and the index on the name column could have been used. With the call below we force the index on the name column to be used.

  qc.setOrdering(Ordering.DESCENDING, "name");

getResultList is a quick way of executing the entire query in one call without having to iterate the results. The returned result is a list of Customer objects.


deletePersistentAll is another method of executing the entire query in one call. In this case all rows found by the query are deleted. We should ensure that we call setLockMode to set the EXCLUSIVE mode before executing this query, otherwise deadlocks can easily occur.

When switching to EXCLUSIVE lock mode, remember to switch the lock mode back after completing the transaction.


In case the result is too big to handle in one call we can call execute instead. In this case we set the parameters in the call to execute in the order they were defined in the query definition.

This call returns a Results object. By calling iterator() on this object you will get an Iterator on Customers (in our example) that can be used to iterate through the results of the query.


Given that ClusterJ has a simple query optimiser, it is useful to show the user how a query is going to be executed. Just before executing the query we can call explain to discover how the query will be executed. The result is of type Map<String, Object>; calling toString on this object gives a textual representation of how the query is executed: which index is used, what type of index it is and what scan type is used.

Column type mappings#

Mappings between MySQL data types and Java data types are described in the ClusterJ documentation. For most data types there is a natural mapping to a Java data type. However, Java has no unsigned data types, so mapping MySQL unsigned data types to Java isn't straightforward.
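As an illustration of the pitfall (a language-neutral JavaScript sketch; in Java the usual remedy is to map the value to a wider signed type): a large MySQL INT UNSIGNED value appears negative when its 32 bits are read as a signed value, and reinterpreting the same bits as unsigned recovers it.

```javascript
// 4294967295 stored in an unsigned 32-bit column reads back as -1
// when the same 32 bits are interpreted as a signed value.
function reinterpretAsUnsigned32(signedValue) {
  // Zero-fill right shift by 0 treats the bits as unsigned
  return signedValue >>> 0;
}

var recovered = reinterpretAsUnsigned32(-1);
```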

ClusterJ reconnect#

When a cluster connection loses the connection to the cluster, we have to close all open sessions before we can reconnect. The reason is that when we reconnect to the cluster it could have restarted, so there is no guarantee that we connect to a cluster in the same state as before. We discover this condition by calling getState() on the SessionFactory object; if the state is RECONNECTING or CLOSED we have lost the connection to the cluster and should start closing Sessions.

Losing connection to the cluster means losing connection to all data nodes. As long as we are connected to at least one data node the cluster connection is still ok.

If we discover that we lost connection to the cluster we can call reconnect on the SessionFactory object. This initiates a reconnect independent of the settings used in the ClusterJ properties.

It is important to close Sessions before reconnecting since a cluster restart could potentially return the cluster with changes in meta data and data. Thus we need to start from a clean slate by closing all Session objects first. Otherwise we might use incorrect cached data in the Session object.

It is even possible that an initial cluster start was performed while we were disconnected; no cached data in ClusterJ can be trusted if we lose all connections to the cluster.

Error code 4009 means that we lost the cluster connection. 4010 indicates a node failure, in which case the cluster connection is most likely still ok.

Dynamic mapping of rows#

The description of ClusterJ so far has only covered the case where the programmer knows the tables in the database; in that case one can use static mappings. It is also possible to use dynamic mappings in ClusterJ to access tables that weren't known at the time the program was developed.

Node.js API#

The NDB API for use in Node.js applications is called Database Jones. It is a direct NDB API that can be used to develop real-time Node.js applications on top of NDB.

Node.js is a server-side implementation of the JavaScript language. JavaScript was developed to handle interactive web pages. Since so many people learned this language, it is now also used in web server development.

Database Jones is aimed at supporting web server applications that want to provide a scalable RonDB solution.

With Database Jones it is possible to have hundreds of web servers connected to the same cluster, using RonDB Replication it is possible to scale this even further by having many read clusters.

Given that NDB has very good write scalability, it is particularly interesting for applications that put a heavy write load on the database.

With this in mind a sample application was developed that mimics a simple tweet application. The sample application is a very simple web server that accepts simple HTTP requests; it shows how to program such an application on top of NDB using Database Jones, the NDB adapter for Node.js.

Database Jones is developed in a separate git tree named mysql/mysql-js.

This tree contains the NDB adapter called ndb. It also contains an extension of the mysql adapter. The description of the API applies both to the mysql adapter and to the ndb adapter. The main interest in this book is the ndb adapter, which connects directly to NDB using the C++ NDB API. Node.js can map C++ calls to Node.js calls in a layer similar to the JNI layer for Java.

In this chapter we will go through how to install Database Jones, how to get the sample application up and running and go through some very basic use cases for the API.

Programming in Node.js is asynchronous; this works very well with NDB since NDB is designed as an asynchronous engine under the hood. At the same time it is a bit more complex to reason about asynchronous programs, and it can take some time to get used to for someone accustomed to sequential programming.

Installing Database Jones#

We will describe here how to install the Node.js NDB adapter from where the development tree resides.

To install from the development tree we first need git. We also need an installation of RonDB; what we need from it is the NDB API library used by the ndb adapter.

Next step is to install Node.js. On my Mac OS X I simply wrote brew install node to do this. It should be easy enough to find out how to do this on your machine and operating system. Database Jones is working on Linux and Mac OS X primarily.

Now we have the npm program that we can use to install the mysql adapter, if we want it installed. This is optional since all data operations can be performed using the ndb adapter. However, the ndb adapter cannot be used to create, drop or alter tables, so the mysql adapter (or a MySQL client) is needed for schema operations.

To install the mysql adapter use the following command:

npm install mysql

Preparing to install the ndb adapter#

To install the ndb adapter we need to compile the C++ code that maps Node.js calls to C++ NDB API calls. This is done using a Node.js module called node-gyp. To install this use the command:

npm install -g node-gyp

node-gyp requires Python 2.7. On Linux we need a make tool and a compiler such as gcc.

On Mac OS X we need to install the XCode environment, this contains both a compiler and make tools.

Installing ndb adapter#

We start by configuring git to use symlinks; next we clone the mysql/mysql-js tree and prepare to install the ndb adapter using the following commands.

git config --global --add core.symlinks true
git clone
cd mysql-js/jones-ndb

First we need to configure the ndb adapter using the command:

node configure

During execution of this command we need to point to the RonDB installation such that we find the NDB API library.

Now we are ready to build the ndb adapter using the command:

node-gyp configure build -d

It is normal to get a warning that $MYSQL_INSTALL_DIR/lib/mysql doesn't exist; in newer versions of MySQL this directory no longer exists.

The Tweet sample application#

With Database Jones installed we can now experiment with how to use it. JavaScript was developed for interactive web pages, and Node.js brings it to web servers, so a natural example application is a tweet application.

The sample application is available in the samples/tweet directory.

For simplicity we will start a cluster using the MySQL test tool MTR. Go to the mysql-test directory of your RonDB installation and run the commands:

cd $MYSQL_INSTALL_DIR/mysql-test
./mtr --start ndb.ndb_basic

This starts up a MySQL Server on port 13001 and an NDB management server on port 13000, together with two data nodes.

First we need to create the tables. We run the following command in the samples/tweet directory:

mysql --protocol=tcp --port=13001 -uroot < create_tweet_tables.sql

The tweet.js program contains a web server that we will start on port 7800.

Before we do this we need to set the environment to use the MTR setup. To do this we perform the following commands:

export JONES_ADAPTER="ndb"
node tweet start server 7800

Now we have a web server running on port 7800. On Mac OS X we need to use DYLD_LIBRARY_PATH instead of LD_LIBRARY_PATH.

In the last line, node is the Node.js execution environment, tweet means that the file tweet.js is executed, and start server 7800 are the three startup parameters to the tweet program.

export needs to be replaced by setenv on some shell implementations.

This is the general method of executing a Node.js program.

node my_program_name program_parameters

This will execute the program in my_program_name.js. Since Node.js is an interpreted language, there is no need to compile the program before executing it. The compilation happens on the fly.

The next step is to populate the database with the initial data. This is performed using a setup script in the sample directory. This script is for some reason hardcoded to use the mysql adapter; if you haven't installed the mysql adapter you need to manually change one of the first lines in this script (the export command) to use the ndb adapter.

Now we run the demo script that issues some HTTP requests, such as this one:

curl http://localhost:7800/tweets-about/carthage

The variable JONES_DEPLOYMENT selects which deployment is chosen from the file jones_deployment.js in the top directory for this test run.


The first part of the program always calls require with database-jones as parameter. The next step is to create an object of type ConnectionProperties. This object decides how to connect to RonDB.

mysql_host, mysql_user, mysql_port and mysql_password contain the parameters needed to connect to the MySQL Server using the mysql adapter.

None of these are needed for the ndb adapter, but they can be used to create tables through a MySQL Server while otherwise running the ndb adapter.

It is also possible to set mysql_charset, mysql_socket, mysql_sql_mode, mysql_pool_size and a few debug parameters on the mysql adapter. These are not possible to set on the ndb adapter.

Common ConnectionProperties#

There is only one ConnectionProperties property common to the mysql and ndb adapters: the database property. It defaults to test for both adapters.

ndb ConnectionProperties#


ndb_connect_retries defaults to 4. This is the number of retries we perform to connect to the NDB management server before giving up. A negative value means there is no limit on the number of retries; it will retry indefinitely.


ndb_connect_delay represents the delay between attempts to connect to the cluster. By default it is set to 1 second.


If ndb_connect_verbose is set, progress on cluster connection setup is printed to the console. By default it is set.

ndb_connect_retries, ndb_connect_delay and ndb_connect_verbose are all passed to the call where the cluster connection is made.


This parameter sets a timeout for how long we wait before actually closing the cluster connection after it has been released. By default it is set to 500 milliseconds.


Setting this to true means that we are going to use the asynchronous NDB API functionality to increase the parallelism in using the NDB API. By default this is set to false.


Setting this to true means that we keep results fetched from NDB in NDB API buffers, otherwise they are copied out to JavaScript objects. Defaults to true.


Each cluster connection uses a pool of NDB objects (each Session object in the ndb adapter maps to one NDB object). This parameter sets the minimum number of objects in this pool. It defaults to 4.


This parameter sets the maximum number of objects in the NDB object pool. By default it is set to 100.


The maximum number of transactions in an NDB session; only one is visible to the user, but multiple may be started before the current one has finished. Defaults to 4. If the asynchronous API is used it is recommended to increase this parameter to 1024, since the asynchronous API can run many transactions in parallel.

Creating a ConnectionProperties object#

This object is created by calling ConnectionProperties with the first parameter set to a string, either ndb or mysql depending on which adapter is used.

The second parameter is optional; if it is not set, a default object is returned. The application can then change this object programmatically as desired before proceeding.

The second parameter can also be a function in which case this function is used to fill in the variables in this object.

The second argument can be a string pointing to which deployment to use (several can be defined).

The deployment is found in a file called jones_deployment.js. An example file of this is found in the top directory of the mysql-js installation.

The search for this file starts in the directory containing the currently executing main source file. Next it tries the directory above and continues until it reaches the root of the file system. Finally it attempts to find the file in the current working directory.

Alternative 1:
  var con_prop = new db_con.ConnectionProperties("ndb");
  con_prop.database = "new_db";

Alternative 2:
  function set_prop(props) {
    props.database = "new_db";
  }
  var con_prop = new db_con.ConnectionProperties("ndb", set_prop);

Alternative 3:
  var con_prop = new db_con.ConnectionProperties("ndb", "test");

Cluster connection object#

After calling require on Database Jones and creating a ConnectionProperties object, we are ready to create a cluster connection object.

For the mysql adapter this object is simply a pool of connections to the MySQL Server. Each session is represented by a connection to the MySQL Server.

For the ndb adapter this object creates a cluster connection. Thus we can use multiple such objects for one Node.js server since NDB supports having multiple cluster connections from one program.

Each cluster connection will have its own SessionFactory returned and when a Session object is created it will use an NDB object to map the Session calls to NDB API calls.

The SessionFactory object is created using two parameters: the ConnectionProperties object and a mappings object. The mappings object can be an array of table mappings to be used by the cluster connection.

Table mappings#

Table mappings in Database Jones are very similar to the annotations used in ClusterJ. In our example we will use the Customer table. First we need to create a constructor function for the Customer object. In this example the JavaScript object has two fields, id and name. The table uses the column names customer_id and name.

This object is created without parameters when a row is read from the database; it can also be called from application logic with proper parameters. By checking whether the first parameter is undefined we can decide whether to set the variables.

  function Customer(customerId, customerName) {
    if (customerId != undefined) {
      this.id = customerId;
      this.name = customerName;
    }
  }

The simplest method to map this table would be the following code.

  var db_con = require("database-jones");
  var cust_map = new db_con.TableMapping('customer').applyToClass(Customer);

Now this won't work, since the id is stored in a column called customer_id. We also need to define the field mappings, like this:

  var db_con = require("database-jones");
  var cust_map = new db_con.TableMapping('customer');
  cust_map.mapField("id", "customer_id");
  cust_map.applyToClass(Customer);

Here we provide a mapping between the id used in the JavaScript object and the customer_id used in the table.

It is also possible to specify a Converter object that converts between database types and JavaScript types. It is also possible to set a field to not be persistent. For more details on this see the documentation in the database-jones/API-documentation directory.

It is not necessary to specify indexes in the table mapping. For all key lookup operations (both read and write), all primary key columns must be specified in the object. Alternatively, all columns of a unique key must be specified. Otherwise an exception is generated on the call on the Session object.

The table mappings will use all columns of the table. If the table changes, such that a new column is added while the cluster connection is up and running, the new column will not be part of the mapping. Thus schema modifications and software upgrades must be closely aligned.

Startup example#

  var db_con = require("database-jones");
  var con_prop = new db_con.ConnectionProperties("ndb", "test");
  function Customer(customerId, customerName) {
    if (customerId != undefined) {
      this.id = customerId;
      this.name = customerName;
    }
  }
  var table_maps = [];
  var cust_map = new db_con.TableMapping('customer');
  cust_map.mapField("id", "customer_id");
  cust_map.applyToClass(Customer);
  table_maps.push(cust_map);
  function successFunction(result) { console.log("Success"); }
  function failFunction(error) { console.log("Failure: " + error); }
  function exampleOperation(session) { /* replaced in the coming sections */ }
  db_con.connect(con_prop, table_maps).
    then(function(sessionFactory) { return sessionFactory.openSession(); }).
    then(exampleOperation, failFunction);

The Node.js API has a lot of similarities to the ClusterJ API. This isn't surprising since the ClusterJ architect was involved also in designing the Node.js API.

There is one main difference though: Node.js is a language designed for asynchronous operation. Each time we call a function that accesses the NDB data nodes we can use callbacks to continue execution. This makes it possible to perform rather complex interactions, with several steps, in a few lines of simple code.

We use the standard promises design pattern in some of those examples. The reason is that most of the asynchronous calls in Database Jones return a promise that is Promises/A+ compatible.

The idea with promises is that they either complete with success plus a return value, or they complete with a failure and a reason. The return value can be multiple objects.

Putting these things together, our examples will always use the startup code shown above. exampleOperation is replaced by the operation we want to demonstrate in the coming sections. We use the SessionFactory returned by the connect call to create a Session object to use in our examples. We ignore the error handling function and the success function here.

Session operations#

Inserting a row#

The example below shows a very simple insert of a row. We create a new Customer object and call the persist method on the Session object; this performs an insert of the provided object.

  function insertOperation(session) {
    var newCust = new Customer(100, "Mikael");
    session.persist(newCust).
      then(successFunction, failFunction);
  }

Updating a row#

To update the row instead of inserting we simply replace the persist function with the update function.

  function updateOperation(session) {
    var newCust = new Customer(100, "Mikael");
    session.update(newCust).
      then(successFunction, failFunction);
  }

Delete a row#

To delete a row we only need to specify the primary key and we use the remove function to perform the delete operation.

  function deleteOperation(session) {
    var newCust = new Customer(100);
    session.remove(newCust).
      then(successFunction, failFunction);
  }

Write a row#

To write a row (insert if row with same primary key doesn't exist and update if it previously existed), one uses the save method.

  function writeOperation(session) {
    var newCust = new Customer(100, "Mikael");
    session.save(newCust).
      then(successFunction, failFunction);
  }

Read and update a row#

Now we move on to a slightly more advanced example. In the previous examples we executed in autocommit mode; only one operation was executed in the transaction, so there was no need to introduce any transactions. Now we move on to a transaction where we first read the row, followed by an update where we want to change the name of the customer.

In this case we create a new function changeCustomer to handle the change of the customer name.

We introduce a transaction that we begin before reading the row and commit after updating it. The method currentTransaction returns the transaction, on which we can call begin(), commit() and rollback(). We can also mark the transaction as abortable-only with setRollbackOnly() and check this with isRollbackOnly().

We set the Session to use exclusive locks on reads, to ensure that the transaction doesn't deadlock while attempting a lock upgrade from shared to exclusive lock. The lock modes that can be set are EXCLUSIVE, SHARED and NONE, where NONE corresponds to READ COMMITTED.

When a transaction is started on the Session object, the call to update returns immediately; the update is executed by the next operation that requires an interaction with the NDB data nodes (calls to flush, find, commit or rollback).

  function readOperation(session) {
    var customerName = "Mikael";
    var customerId = 100;
    function changeCustomer(customer, name) {
      customer.name = name;
    }
    var customer = new Customer(customerId);
    session.currentTransaction().begin();
    session.find("customer", customer).
      then(function(foundCustomer) {
        changeCustomer(foundCustomer, customerName);
        return session.update(foundCustomer);
      }).
      then(function() { return session.currentTransaction().commit(); }).
      then(successFunction, failFunction);
  }

Batching multiple operations#

In this example we show how to batch two insert operations into one execution towards the NDB data nodes.

To do this we introduce a batch object by calling the method createBatch on the Session object. This object has more or less the same interface as the Session object, except that the calls are batched for later execution instead of being executed immediately.

To execute the batched operations we need to call execute on the batch object.

  function insertMultipleRows(session) {
    var batch = session.createBatch();
    var newCust1 = new Customer(101, "Mikael jr");
    var newCust2 = new Customer(102, "Mikaela");
    batch.persist(newCust1);
    batch.persist(newCust2);
    batch.execute().
      then(successFunction, failFunction);
  }
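The deferred-execution idea behind the batch object can be illustrated with a small standalone sketch (an invented demo class, not the Database Jones implementation): operations are queued when added and only run when execute is called.

```javascript
// Minimal sketch of the batching pattern: queue now, run on execute().
function DemoBatch() {
  this.queued = [];
}
DemoBatch.prototype.add = function (op) { this.queued.push(op); };
DemoBatch.prototype.execute = function () {
  var results = this.queued.map(function (op) { return op(); });
  this.queued = [];
  return results;
};

var batch = new DemoBatch();
var store = [];
batch.add(function () { store.push({ id: 101, name: "Mikael jr" }); return "ok"; });
batch.add(function () { store.push({ id: 102, name: "Mikaela" }); return "ok"; });
var beforeExecute = store.length; // still 0: nothing has run yet
var results = batch.execute();    // both operations run together
```

In the real adapter the payoff is that all batched operations travel to the NDB data nodes in one round trip.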

We can also batch multiple keyed reads using the same batch interface, as shown here. After reading the rows we update them again with changed customer names.

  function insertMultipleOps(session, cust1, cust2) {
    cust1.name = "Michael jr";
    cust2.name = "Michaela";
    var batch = session.createBatch();
    batch.update(cust1);
    batch.update(cust2);
    batch.execute().
      then(successFunction, failFunction);
  }
  function readAndInsertMultipleRows(session) {
    var batch = session.createBatch();
    var newCust1 = new Customer(101);
    var newCust2 = new Customer(102);
    batch.find("customer", newCust1);
    batch.find("customer", newCust2);
    batch.execute().
      then(function() { insertMultipleOps(session, newCust1, newCust2); },
           failFunction);
  }

Scanning rows#

There is also an interface to scan tables through an ordered index. To perform this we first create a query object by calling createQuery on the Session object; when creating the query object we can supply the table name or a table mapping object.

This call returns a Query object. Through this query object we define the query search condition, one predicate per call. All functions that define predicates return a query predicate that can be passed into a new predicate function.

The conditions supported are eq (equality condition), ne (not equal), gt (greater than), ge (greater than or equal), lt (less than), le (less than or equal), isNull, isNotNull.

There are numerous functions to join two predicates.

and(predicate) (if both predicates are true), or, andNot (this query predicate with the negation of the parameter predicate), orNot, not.

  function handle_results(err, results) {
    if (err) { /* ... */ } // Error code
    else     { /* ... */ } // Successful code
  }
  function query_definition(query) {
    // query.name refers to the name column of the customer table
    query.where(query.name.eq("Mikael"));
    return query.execute({ "order" : "desc" });
  }
  function scanCustomers(session) {
    session.createQuery("customer").
      then(query_definition).
      then(function(results) { handle_results(null, results); },
           failFunction);
  }

In the example we use the short form where query.name refers to the name column. In the execute call we define the query to be executed in descending order. It is also possible to set a limit in the same manner, limiting the number of rows to return, and a skip, which skips a number of rows before starting to report them.

The default is no particular order, the default limit is effectively unlimited (several billion rows) and skip defaults to 0.

The result of the execute method is returned with an error condition and an array of result objects. The result objects are in this case a set of Customer objects.

Join operations#

There is an interface to define join operations in Database Jones, using the Projection interface. This code is a bit less stable than the rest, so I will skip describing it here; it is documented in the same place as the rest of the API.

We recommend using SQL to execute complex queries such as joins between tables, as this is the most well tested method of executing complex queries. If the query is complex it makes sense to pay the little extra cost in shipping it over to the MySQL Server for execution.

Final comments#

The above examples were developed to show one way of using Database Jones to develop a web application. More complete examples are found in the samples/tweet directory.

The API that Database Jones supports makes it possible to access NDB in many more ways than I have shown here. These examples are there to get you started developing more interesting applications using Database Jones.

The complete API documentation is found in the database-jones/API-documentation directory.