C++ NDB API#

We have now completed the view of RonDB from the SQL point of view. The SQL view always depends on lower-level constructs in its implementation. All accesses to data in tables, metadata about tables and retrieval of configuration data go through the NDB API.

MySQL has a storage engine concept that allows many different implementations of the database engine. In the NDB storage engine we have mapped the storage engine interface onto our NDB API interface.

The NDB API is implemented in C++ and it is possible to write applications directly against this C++ API. On top of the NDB API we have implemented special interfaces for Java (ClusterJ) and for JavaScript (Database Jones, a Node.js API towards NDB) that we will describe in subsequent chapters.

The aim of this chapter is to describe the NDB API at a high level to make it easier to understand how to write NDB API programs. There is a large set of example programs available in the directory storage/ndb/ndbapi-examples in the RonDB source tree. In this chapter we focus all attention on using the NdbRecord variant of the NDB API; some of the examples are old and use the column format where one column at a time is provided. The new record format is faster, but the old column format is still supported and can thus still be used.

There is documentation of the NDB API available that is for the most part generated from the header files using Doxygen. Some parts are skipped in the documentation since they are treated as internal parts of the NDB API. These are safe to use, but have a higher risk of being changed in future versions since they are deemed internal parts of the API.

Most of the early research work on NDB was centered around the NDB data nodes and how to implement those. In early 1998 the implementation of the data node parts was nearing completion and we started considering how the application code would interface with the data nodes.

The NDB data node was written to handle requests asynchronously. We did not expect that application developers wanted to work with an asynchronous API towards NDB, so we decided to make a synchronous API. For the most part this decision has been correct: implementing the NDB storage engine requires a synchronous API, and using a synchronous API is a lot simpler than using an asynchronous one.

At the same time the asynchronous API allows for fairly elegant implementations and the code is a lot easier to optimise. We ended up handling key operations through an asynchronous API that is used by a few users and by a number of our internal tools. It has also been used in a number of benchmarks we have developed to showcase the throughput possibilities of NDB.

At this point scan operations can only be accessed through the synchronous part of the NDB API.

Meta data operations#

The NDB API contains code to handle all interactions with the NDB data nodes including normal user transactions as well as event handling and meta data operations.

It is not recommended to use the meta data part of the NDB API directly in a user application. It is recommended to create all tables using the MySQL Server. Creating a table through the NDB API means that it isn't possible to use it from the MySQL Server. It is a lot simpler to use the SQL syntax to create tables, drop tables, create indexes, foreign keys and so forth.

The MySQL server will also take care of synchronising table changes with slave clusters.

Thus a normal NDB API application will contain code to both connect to a MySQL Server to create the necessary tables and other meta data for the application and the NDB API part to work with those tables.

We will not go through the meta data part of the NDB API in this book. Instead we will go through the specific syntax required in the MySQL Server for creating NDB tables, indexes and so forth.

Thus it is recommended to always have at least one MySQL Server in an NDB Cluster even if the MySQL Server isn't used at all for normal transactions or queries.

Initialising the NDB API and MySQL client API#

Before any interactions with the NDB API it is necessary to initialise the NDB API first. This happens through a call to the method ndb_init().

Similarly it is necessary to initialise the MySQL client API before using it by calling mysql_init().
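As a minimal sketch (error handling omitted), an NDB API program that also uses the MySQL client API starts out roughly like this; ndb_end() and mysql_close() are the matching cleanup calls:

```cpp
#include <NdbApi.hpp>   // NDB API
#include <mysql.h>      // MySQL client API

int main(int argc, char **argv)
{
  ndb_init();                       // must precede any other NDB API call
  MYSQL *mysql = mysql_init(NULL);  // initialise a MySQL client handle
  /* ... connect to MySQL to create tables, then use the NDB API ... */
  mysql_close(mysql);
  ndb_end(0);                       // release NDB API resources at shutdown
  return 0;
}
```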

Concurrency in the NDB API#

In 1998 most computers used a single CPU or possibly two CPUs. Concurrency was limited; however, it was necessary to handle many concurrent threads accessing the NDB API.

We decided to create a C++ class called Ndb that is used by one thread. There can be many Ndb objects operated on from many different threads at the same time, but one Ndb object can only be operated on by one thread at a time. Thus this object must be handled with care. It is possible to use it from different threads, but only from one thread at a time; when handing an Ndb object over to another thread, the move must be properly protected.

The Ndb object uses a set of classes that we call the Transporter classes. These classes are responsible for taking the information from one Ndb object, converting it to the NDB protocol and sending it to the NDB data nodes. They are also responsible for receiving data back from the NDB data nodes, decoding the NDB protocol and placing the results back into the objects handled by the Ndb object.

In the first implementation of the NDB API, and all the way up to MySQL Cluster version 7.2, the Transporter layer used a very simple concurrency control: a Transporter mutex that ensured that only one thread at a time was able to interact with the transporter parts.

After a few years this became a bottleneck and to avoid it we made it possible to have several API nodes within one API program. In the MySQL Server this is controlled by the configuration variable --ndb-cluster-connection-pool. In 7.2 and earlier versions each API node scaled to about 1.5 CPUs when running a simple benchmark using the NDB API directly (using the MySQL Server means it scales to more CPUs since the MySQL Server code uses quite a few CPUs to handle query processing). Thus running a benchmark in an 8-CPU environment was made possible by using 6 API nodes (or cluster connections as they are called in the NDB API). Each cluster connection uses its own node id and can be seen as an independent node in most management views in NDB.

With the multi-core revolution that started around 2006-2007 the number of CPUs per machine escalated very quickly. Thus it was necessary to implement more scalable concurrency control in the Transporter part. This was implemented in MySQL Cluster 7.3 and makes each API node or cluster connection scale a lot further. For the Sysbench benchmark each cluster connection now scales to more than three times as many CPUs as before; for DBT2 the improvement was even larger, scaling to more than seven times as many CPUs.

Now it is possible to use only four cluster connections even when running a MySQL Server using 32 CPUs when executing the Sysbench benchmark.

The major limiting factor now in the Transporter part is that only one thread at a time can execute its receive part.

Ndb cluster connection#

The Ndb_cluster_connection object is the starting point for all NDB API programs. When creating this object it is necessary to specify the NDB connect string. The NDB connect string is used to fetch the cluster configuration from the NDB management server. It is possible to specify multiple NDB management servers in the connect string to increase the chance for a successful configuration retrieval. It is possible to specify a specific node id to use by this cluster connection. If not specified the management server will pick a node id that is available currently in the configuration.

Using specific node ids is not absolutely required, but in my view it is recommended to use them as much as possible. For management servers and data nodes I recommend always doing so, since the node id is connected to state saved on disk. It will simplify management of the cluster if all processes start up with a specific node id. This requires setting the node id in the configuration for these node types.

For MySQL Servers and long running API programs it is a good idea to use specific node ids.

The normal procedure for the Ndb_cluster_connection object is to first create the object specifying the NDB connect string and possibly node id as well. Next a connect call will fetch the NDB configuration from a management server. Next a call to wait_until_ready will wait until we are connected to at least one NDB data node.

After these calls our API program is now a part of the cluster and will show up when we list the nodes in the current cluster.

Auto reconnect#

Once the NDB API has established communication to the data nodes it will automatically balance the load on the connected data nodes. If a data node is no longer connected we will automatically try to reconnect to this node without any need for the user of the API to interact. It is possible to disable this behaviour if desirable.

Selection of data node to communicate with#

Each transaction is handled by one specific data node. A number of things impact the decision of which data node to choose. The first consideration is whether the user has provided a hint when starting the transaction; if so, we will use this hint to connect to the data node if possible.

The hint doesn't say which data node to connect to; rather it specifies a primary key in a specific table. This primary key is mapped to a node using a hash function and some data distribution hints provided to us from the data nodes when we started using the table. These hints are very stable, but they can change e.g. during an add node operation. There is no guarantee that they will be 100% accurate. If they are incorrect, it is not a problem since the data node will get the correct nodes to read and write data from, independent of which data node the transaction is started from. The data nodes always have a correct view of the data distribution of each table.

If no hint is provided we use a round-robin scheme to ensure that we spread the load on all data nodes.

When using SQL we always use the first SQL statement of a new transaction as the way to provide the hint. If the first SQL statement specifies a primary key to use for a table we will choose this as the hint for the transaction we start.

Setting data node neighbour#

In case we haven't specified a hint about where to place the transaction coordinator, we can set the node id of the closest data node in the configuration. This node will be used in those cases.

If a MySQL Server has the same IP address as a data node it will automatically become the neighbour of this data node and will prefer transactions to use this data node unless a hint has been provided.

However, in some situations we know that two nodes are close to each other even though they use different IP addresses. This can happen e.g. in a Docker network setup where each node has its own IP address but some IP addresses are actually located on the same physical server or even the same virtual machine.

In this case one can provide the data node neighbour through the set_data_node_neighbour call on the Ndb_cluster_connection object.

In the MySQL Server one can set this node id through the ndb-data-node-neighbour configuration variable.

The main use of the data node neighbour concept is for query processing where we are issuing complex SQL queries. In this case most likely no primary key is provided, so round robin would be used unless a data node neighbour is selected.

Using a data node neighbour is important if the network between the computers has limited bandwidth. Using a node local to the computer means that we don't have to consume the limited bandwidth available between computers in the cluster.

Combining this feature with the possibility to read from backup replicas, and possibly even using fully replicated tables, ties the API node strongly to its neighbour data nodes for all query processing. This makes it possible to build query processing that scales in an excellent manner when adding more and more MySQL Servers and NDB data nodes.

Locking receive thread to specific CPU#

On Linux, Solaris and Windows it is possible to lock threads to specific CPUs. In the NDB API we can use this feature by calling the function set_recv_thread_cpu on the Ndb_cluster_connection object. The API call is prepared for multiple receive threads per NDB cluster connection, but currently only one is supported, so the array in this call should always have length 1.

It is possible to specify the CPUs to use for NDB cluster connections in the MySQL Server configuration. For this one uses the ndb-recv-thread-cpu-mask configuration variable. In this variable one can list e.g. 0,2-4 to specify that CPU 0 is used for the first cluster connection, CPU 2 for the second, CPU 3 for the third and CPU 4 for the fourth. The number of CPUs listed must be at least the number of cluster connections specified in the configuration variable ndb-cluster-connection-pool.

There is a call set_recv_thread_activation_threshold that specifies how many API threads must be concurrently active before the receive thread takes over receive processing. When using CPU locking it can be a good idea to set this to 1 to ensure that the receive thread handles all receive processing as soon as multiple threads are active. There is a corresponding MySQL configuration parameter called ndb-recv-thread-activation-threshold for this as well.

By default there is no CPU locking for receive thread and the activation threshold is set to 8.

Locking the receive thread to a specific CPU improves performance if it is combined with locking the MySQL Server process or the NDB API program to other CPUs, avoiding the CPUs used by receive threads. It is also necessary to control other programs on the computer that use large amounts of CPU, and interrupt handling should be steered away from those CPUs as well. If this cannot be done, one should not use CPU locking of receive threads.
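As a sketch, these calls on the Ndb_cluster_connection object could look as follows (the CPU number is an assumption for illustration):

```cpp
  Uint16 recv_cpu[1] = {4};  // CPU chosen for the receive thread (example)
  cc->set_recv_thread_cpu(recv_cpu, 1);  // array length must currently be 1
  cc->set_recv_thread_activation_threshold(1);
```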

Ndb object#

The Ndb object is the starting point for all NDB interactions. As mentioned this object should not be used from more than one thread at a time. When waiting for a response back from the NDB data nodes it is important to not use this object while waiting.

Concurrency in the NDB API comes from three places. First, it is possible to use multiple cluster connections; second, it is possible to use multiple threads per cluster connection through separate Ndb objects; third, it is possible to handle multiple operations and even multiple transactions per interaction using the Ndb object.

The Ndb object is a placeholder for transaction objects, operation objects and many other objects used by the Ndb object. Objects are created through the Ndb object and destroyed by the Ndb object.

Before starting to use the Ndb object it has to be created and it is necessary to call the method init().

When creating the Ndb object one specifies two things, first the Ndb_cluster_connection object used and second a string containing the database name used by the Ndb object. It is possible to change the database name used later on.

The init() method has one parameter, set to 4 by default, that specifies the maximum number of transactions that can be active in parallel on the Ndb object. Using the asynchronous API, or using batching with one Ndb object, it is possible to handle up to 1024 transactions in parallel from one Ndb object.

Here is a simple code example skipping all error checks for creating the cluster connection and the first Ndb object.

  Ndb_cluster_connection *cc =
    new Ndb_cluster_connection(connectstring, node_id);
  cc->connect();
  cc->wait_until_ready(timeout, wait_after_first_node_connected);
  Ndb *ndb_obj = new Ndb(cc, "database_name");
  ndb_obj->init();

Setting up a row#

Before a user transaction can be executed we need to set up a row data structure. The NDB API also contains a variant where one handles one column at a time. I will only describe handling rows through NdbRecord since this is nowadays the recommended way to use the NDB API.

To setup a row we need a reference to the table and the columns we want to operate on.

The first step is to get an object from Ndb of the type NdbDictionary::Dictionary that is retrieved using the call getDictionary() on the Ndb object.

Next this dictionary object is used to get a table object of type NdbDictionary::Table. This is retrieved by using the call getTable(tableName) where tableName is a string with the table name.

Next we use the table object to retrieve one or more column objects. These are retrieved using a set of calls of the type getColumn(colName) where colName is a string containing the column name.

Next we create a record specification object that maps each column into a buffer from where data to set will be read or data read will be inserted when read from NDB.

This is an object of the type NdbDictionary::RecordSpecification. We create an array of such objects with as many members in the array as the number of columns we want to access.

We create an Ndb record by calling createRecord on the dictionary object. This method has as input the table object, the record specification array, the number of entries in the record specification array and the size of the NdbDictionary::RecordSpecification object (needed for backwards compatibility since there used to be an older variant of the record specification object).

This call returns an object of the type NdbRecord that can now be used in user transactions. It is also possible to set up a row for access through an ordered index or a unique index. In this case we need both the index object and the table object in the call to createRecord.

Here is a very brief example of the above.

  struct row_buffer
  {
    int col1;
    int col2;
  };

  ....

  NdbDictionary::Dictionary *dict_obj = ndb_obj->getDictionary();
  const NdbDictionary::Table *tab_obj = dict_obj->getTable("table_name");
  const NdbDictionary::Column *col1 = tab_obj->getColumn("col1_name");
  const NdbDictionary::Column *col2 = tab_obj->getColumn("col2_name");
  NdbDictionary::RecordSpecification spec[2];
  spec[0].column = col1;
  spec[0].offset = offsetof(row_buffer, col1);
  spec[0].nullbit_byte_offset = 0;
  spec[0].nullbit_bit_in_byte = 0;
  spec[1].column = col2;
  spec[1].offset = offsetof(row_buffer, col2);
  spec[1].nullbit_byte_offset = 0;
  spec[1].nullbit_bit_in_byte = 0;
  NdbRecord *pk_record =
    dict_obj->createRecord(tab_obj, spec, 2, sizeof(spec[0]));

For each element in the record specification array it is necessary to set the column object retrieved previously in the variable column, and an offset to the column in the buffer in the variable offset. There are also variables called nullbit_byte_offset and nullbit_bit_in_byte. These two numbers point out which bit among the null bits of the record is used to represent the NULL state of the column: nullbit_byte_offset is the number of bytes from the start of the row buffer and nullbit_bit_in_byte is the bit within that byte. They are ignored for columns that are declared as NOT NULL.

Thus the row buffer must have space allocated both for the column values and for the NULL bits. It is recommended to use at least 32 bits for the NULL bits to avoid misaligned column values. Similarly, it is recommended that all column values are aligned on 32-bit boundaries for efficient access without misalignment traps. On SPARC this alignment is necessary.

Transactions#

Every interaction involving reading or writing user tables in NDB uses transactions. A transaction goes through a number of steps: first it is created, then it is executed, where the final execute specifies either commit or abort (it can be executed multiple times before the commit/abort), and finally it is closed.

Start a transaction#

Starting a transaction means that we allocate one connection record in one of the DBTC threads in the cluster. Once such a record has been allocated by an API node, the API node keeps the connection record. This means that only the first start transaction involves a distributed interaction with the NDB data nodes. Most startTransaction calls are local calls that grab a connection record from a linked list of free connection records; there is one such linked list per data node in the cluster.

startTransaction() can be called without any parameters. In this case a round-robin mechanism is used to decide which data node to start the transaction in; if we have a neighbour data node (either one with the same IP address as the API node, or one set through data_node_neighbour on the cluster connection object), we use the neighbour node.

Start transaction with hint#

Starting a transaction with a hint on where to place it is a very important feature of RonDB and is key to writing scalable applications. As an example, HopsFS mapped a hierarchical file system on top of a 12-node RonDB cluster. In order to get this to scale it was important to ensure that operations such as ls (list files in a directory) could execute using a scan on just one partition of the table. HopsFS solved this by using the parent directory as the partition key for the table.

Similarly, TPC-C has a warehouse id that is part of all tables, which can be used to make TPC-C scale perfectly if the warehouse id is used as the partition key. Selecting the proper partition key is important when designing scalable applications in NDB.

There are two reasons for this. The first is that if one makes a transaction that updates many rows that have some connection (this is what happens in TPC-C and in HopsFS and in almost all applications), it is very valuable to place the transaction coordinator on the same node as the primary replica location of the rows to be updated. This will significantly decrease the overhead of communication within the cluster during the transaction.

The second is that we can perform ordered index scans that have an equality bound on the partition key, or full table scans with such an equality bound. These scans only need to scan one partition of the table since all rows sought will be housed in the same node. By placing the transaction coordinator on this node, we avoid communication to get to this partition since the partition is placed on the same node.

Decreasing the number of partitions that need to be scanned gives us a lower startup cost for the scan. Each additional partition to scan brings an additional small cost to the query execution.

Decreasing communication needs is always important to create scalable distributed applications whether it is a NoSQL database, a shared disk relational DBMS, or a shared-nothing DBMS such as RonDB. Modern hardware makes communication cheaper and makes it easier to write distributed applications, but it will always be beneficial to write your application to minimise the communication needs.

To start a transaction with a hint we use a variant of the startTransaction call that takes a few parameters. It takes a table object of the type NdbDictionary::Table as input, created by calling the method getTable(tableName) on an NdbDictionary::Dictionary object. The second parameter is a NULL-terminated array of objects of the type Key_part_ptr. The Key_part_ptr object is a simple struct containing a void pointer and the length of the data it points to. This data represents the distribution key (partition key) values of the table defined by the table object, and it is checked for consistency against the table's definition.
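A sketch of this public variant (warehouse_id standing in for a partition key column; tab_obj and ndb_obj assumed set up as described earlier):

```cpp
  int warehouse_id = 17;  // example partition key value
  Ndb::Key_part_ptr key_parts[2];
  key_parts[0].ptr = &warehouse_id;
  key_parts[0].len = sizeof(warehouse_id);
  key_parts[1].ptr = NULL;  // NULL pointer terminates the array
  NdbTransaction *trans_obj =
    ndb_obj->startTransaction(tab_obj, key_parts);
```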

There is also a method that uses an NdbRecord; this is not part of the public NDB API, but is used by the NDB storage engine. The NdbRecord here is a record of the primary key of the table used. Here is a simple example of starting a transaction with a hint given this interface.

  NdbRecord *ndb_rec = ....
  Ndb *ndb_obj = ....

  NdbTransaction *trans_obj = ndb_obj->startTransaction(ndb_rec,
                                                        row_buffer,
                                                        NULL,
                                                        0);

The last two parameters specify a buffer to be used for hashing. By passing NULL as the buffer pointer we ensure that malloc is used; otherwise one has to ensure that the buffer is large enough for the hash function call. In the public function described above these two parameters default to NULL and 0, but they can also be specified explicitly.

If a buffer is provided it must be large enough to contain all columns at their maximum size, and it must be aligned to a 64-bit boundary to be used in these calls. So the buffer should be fairly large if provided. The hash function must work on an expanded row to ensure that different representations of the same word use the same hash value (many character sets have multiple ways to write the same word).

If no such pointer is passed the NDB API will have to use malloc and free to allocate and release the memory needed for executing the hash function.

It is also possible to start a transaction on a specific node id and instance id, but normally the application has no knowledge of the table distribution that would make this useful, whereas using the partition key is very useful.

Define operations#

After starting a transaction we define a set of operations for the first interaction with the NDB data nodes. We will go through in detail how this is done below. These can be operations on a specific record using either a primary key or a unique key, or scan operations that scan a part of a table using either a full table scan or an ordered index scan. Scans can also be used to write rows, where the rows to write are found through the scan.

Executing a transaction#

After defining one or more operations it is now time to execute the operations. When executing key operations one can decide whether it is time to commit the transaction while executing it. It is possible to commit with a set of operations, it is also possible to commit with an empty set of operations. Committing an empty set of operations will commit all operations executed since the start transaction call was made.

When a transaction has completed executing a commit it is no longer possible to continue using the transaction in any other manner than to close it.

The call to execute is made on the NdbTransaction object received in the startTransaction call. The method is called execute and the only required parameter is whether to commit, not commit or abort. The value sent in to commit is NdbTransaction::Commit, to execute but not commit yet one uses NdbTransaction::NoCommit and to abort a transaction one uses NdbTransaction::Rollback. We can provide a second parameter that specifies how to handle errors during transaction execution.

Setting this second parameter to AbortOnError means that no errors at all are allowed. AO_IgnoreError means that we will commit as much as is possible to commit, thus for most errors we will continue the transaction even if one part of the transaction fails. The default setting for the abort option is that it can be set on a per operation basis. However explicitly setting it in the execute call overrides the operation setting.

It is possible to set a third parameter with a force send flag. By default this is turned off. In the MySQL Server this is controlled by a configuration parameter in the MySQL Server.
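A minimal sketch of executing and committing a transaction (trans_obj as returned from startTransaction; real code must check errors):

```cpp
  // Execute all defined operations and commit; execute() returns -1
  // on failure and getNdbError() then describes the failure.
  if (trans_obj->execute(NdbTransaction::Commit) == -1)
  {
    const NdbError &err = trans_obj->getNdbError();
    // err.code and err.message describe what went wrong
  }
  ndb_obj->closeTransaction(trans_obj);
```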

Scan operations use execute as part of starting up a scan. The continuation is however different after the startup of the scan. A scan can handle any number of rows so there is an API to control the fetching of rows to the API that we will describe below.

Close a transaction#

After committing a transaction one needs to close the transaction to make all allocated resources available again. If the transaction had not been committed yet and the NDB data node is waiting for more input the NDB API will abort the transaction as part of the closeTransaction() call.

A call to start a transaction with a hint, immediately followed by a call to get the node id of the transaction coordinator (using getConnectedNodeId()), followed by a call to close the transaction, is a quick way of discovering which node the primary replica of a record belongs to. This method is used by the flexAsynch benchmark program and the ndb_import program (introduced in 7.6) to distribute queries to different threads where each thread is handling one data node.
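A sketch of this trick, starting a transaction with a hint as described earlier (tab_obj is an assumed table object and key_parts an assumed NULL-terminated Key_part_ptr array):

```cpp
  // Start a transaction hinted by the primary key, ask which data node
  // became transaction coordinator, then close without executing.
  NdbTransaction *t = ndb_obj->startTransaction(tab_obj, key_parts);
  Uint32 primary_node_id = t->getConnectedNodeId();
  ndb_obj->closeTransaction(t);  // no operations were defined or executed
```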

Key operations#

To define a key operation is straightforward after setting up the row. One calls a method on the NdbTransaction object and it returns an NdbOperation object.

Insert tuple#

To insert a row we call the method insertTuple that requires four parameters. The first is the NdbRecord object for the primary key and the second is a pointer to the memory buffer for the primary key columns. The third parameter is the NdbRecord for the rest of the columns and the fourth is the memory buffer for these columns. The memory buffers must contain the data to be inserted in all columns before the call.

In a sense one can see the NdbRecord objects as the metadata descriptions of a row object. The memory buffers contain the actual data, interpreted according to the NdbRecord object.

There is a variant where only two parameters are needed where the primary key columns are combined with the rest of the columns. This only exists for the insertTuple call and not for other types of calls.

The buffer can be reused for other purposes immediately after the call since the data has been copied into the NDB API and no data is to be returned in this case.
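A sketch of an insert (pk_record and row_record are assumed NdbRecord objects for the primary key and the remaining columns, row a fully filled-in row buffer; all names are illustrative):

```cpp
  // Both records can map into the same row buffer.
  const NdbOperation *ins_op =
    trans_obj->insertTuple(pk_record, (const char *)&row,
                           row_record, (const char *)&row);
  trans_obj->execute(NdbTransaction::Commit);  // check errors in real code
```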

Update tuple#

Updating a row works exactly like inserting one, except that we use the call updateTuple with the same parameters; we still need to set all columns before the call. Also here the buffers can be immediately reused.

Write tuple#

The writeTuple call will perform an insert if the row didn't exist before or update if the row previously did exist. All required columns need to be part of the record specification since it can become an insert. Also here the buffer can be immediately reused.

Delete tuple#

The call deleteTuple uses the same parameters, but the fourth parameter can be skipped if no read is performed while deleting the row. The record specification for the entire row is still required in this call.

Read tuple#

The call readTuple performs a read of one tuple. The parameters are the same as when calling insert, update and write of a tuple. It is not OK to reuse the second memory buffer provided in the fourth parameter until the read has completed, since this buffer is used to return the data read from NDB. It is still important to set the primary key fields before calling readTuple. The first memory buffer can be reused immediately after the call.

It is possible to set lock mode in this call, more on this below in the section on scans.
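A sketch of a read (pk_record and row_record are assumed NdbRecord objects for the primary key and the columns to read; the primary key fields in row are set before the call and the result columns are filled in by execute):

```cpp
  const NdbOperation *read_op =
    trans_obj->readTuple(pk_record, (const char *)&row,
                         row_record, (char *)&row);
  trans_obj->execute(NdbTransaction::NoCommit);
  // row now contains the columns read (check errors in real code)
```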

Operation options#

In addition, the above calls can all set a result mask, which is a bit mask indicating which parts of the record specification to use. It is an array of bytes where bits are set in column id order. If a bit is set, the corresponding column is used, for either update or read depending on the operation type.

There is a possibility to set special options for the reads and writes, more on such options below. There are two parameters for this, one that is a pointer to the options object and a second that contains the length of this options object.

Scan operations#

There are two types of scan operations, full table scans and ordered index scans.

Ordered index scans#

Ordered index scans normally use a range scan that specifies a start position and an end position for the scan. These ranges are defined on the columns of the ordered index. It is possible to specify multiple ranges in one scan operation.

To specify an ordered index scan we use a call to scanIndex on the transaction object that returns an NdbIndexScanOperation object. The parameters passed in this call define the record specification of the ordered index and another record specification for the data returned by the scan. Optionally one defines the lock mode, a bitmask of the columns in the record specification that are to be read, a bound object of the type NdbIndexScanOperation::IndexBound and finally an options object with its length.

The bound object can also be defined after this call in a separate call on the scan object returned from scanIndex, using the method setBound where the record specification of the ordered index and the bound object are the parameters.

Multiple index ranges and various sort orders can be defined. These are specified through the options parameter described below.

Full table scan#

A full table scan will scan all rows in a table and there are three ways to do this: scan through the hash index, scan through the main memory rows, or scan through the disk data records.

After starting the transaction one can start a full table scan by calling scanTable on the transaction object. This call will return an object of the type NdbScanOperation.

This call requires at least one parameter: the record specification of the row to be retrieved from the table. Not all columns need to be part of this specification, only the columns of interest.

The second parameter is the lock mode of the scan. The default lock mode is LM_Read, which means that a row lock will be held on the row from when it is scanned until we move on to the next row. LM_Exclusive also means a row lock, but an exclusive lock instead.

LM_CommittedRead means that no lock is held at all while scanning, the last committed value of the row is read instead.

LM_CommittedRead is the default setting for SELECT queries in MySQL for the NDB storage engine. The NDB storage engine only supports the setting READ_COMMITTED.

To use LM_Read from SQL one uses SELECT ... LOCK IN SHARE MODE and to use LM_Exclusive from SQL one uses the syntax SELECT ... FOR UPDATE. The final setting that one can use is LM_SimpleRead: the row is locked while reading it, but the lock is released before sending the result to the API. This behaviour is the default for reading BLOB tables from the MySQL Server. BLOB tables are a bit special in how they handle these lock modes, see the chapter on concurrency control in RonDB.

Lock mode can be set on the readTuple calls above when defining a read operation using primary key or unique key. It cannot be set on write operations since these operations are always using an exclusive lock.

The third parameter is the result mask. This parameter can be used to define the set of columns (in column id order) to be used in the scan. This is useful if we are using a standard record specification that contains all columns and we want to specifically set a different set of columns for this scan operation.

The fourth parameter is the scan options, there are many different ways to perform the scan, we will go through those options below.

The actual scan phase#

The scanning is started by calling execute on the transaction object with NdbTransaction::NoCommit as parameter.

Next the actual scanning happens in a loop where we repeatedly call the method nextResult on the NdbScanOperation object. This method provides a pointer to a memory buffer from which the data can be read; the NDB API uses its own memory and returns a pointer to this memory.

The second parameter is important: if true it states that a fetch is allowed, if false we will only return rows that have already been fetched from the NDB data node. By looping with it set to false we know that we won't release locks on rows already scanned and that the memory buffers returned from previous nextResult calls are still valid. This gives us an opportunity to perform write operations on these rows before releasing their locks. It makes batching possible, where up to almost one thousand rows can be scanned locally before the next interaction with the NDB data nodes needs to happen.

The third parameter is whether to use the force send flag when starting a new fetch. When we have completed the actions required on the rows scanned so far and are ok to move on to the next rows, we call nextResult with fetch allowed set to true.

For read queries we can simply call nextResult in a loop until we get the result that no more rows exist.

There is a variant of nextResult called nextResultCopyOut that, instead of returning a pointer to a memory buffer, takes a buffer into which it asks the NDB API to store the fetched row. Other than that it works exactly the same way as nextResult.

Scan with takeover#

Scan with takeover means that we use the row fetched in a scan to either update or delete the row. This should only be used if the scan uses the lock mode LM_Exclusive. If used with only LM_Read it can cause a deadlock since the lock must be upgraded.

When using scan with takeover it is necessary to use a double loop, an outer loop that uses nextResult with fetch allowed set to true and an inner loop that uses nextResult with fetch allowed set to false. When the inner loop exits or as part of the inner loop we need to handle all the updates and deletes performed as part of the scan.

Deleting the row reported in the scan is done through the call deleteCurrentTuple; updates use the call updateCurrentTuple. The takeover can be made by any transaction, it need not be the transaction the scan is executed in.

Options#

There is a whole range of options that can be set for both scan and key operations that affect the behaviour of the key lookups and scans. We will cover these here, explaining them in the context of key lookups, but they can also be used for scan operations. The difference is only that the options class is called NdbOperation::OperationOptions for key lookups and NdbScanOperation::ScanOptions for scan operations.

The option identifiers are called OO_* for key lookups and they are called SO_* for scan operations.

Scan flags#

Scan flags are an integral part of a scan. It is set by the option SO_SCANFLAGS and by setting the option variable scan_flags to a proper value. This option is only available for scan operations.

Several flags can be set by OR-ing them together. The following flags are supported:

SF_TupScan#

This option is only applicable to a full table scan. It scans the tuples in row id order instead of the default order imposed by the hash index.

SF_DiskScan#

This option is only applicable to a full table scan on a table with disk columns. In this case the scan can be done in row order on the disk pages. This can be beneficial since it can decrease the number of disk page fetches during the scan.

SF_OrderBy#

Given that an ordered index scan is parallelised over a number of partitions, the rows can be returned in non-sorted order, even though each partition returns its rows in the order sorted by the index.

The NDB API has the capability to sort the rows before returning them to the application. This requires running with full parallelism, thus this flag cannot be combined with SO_PARALLEL. It requires all partitions to return data such that the sort can be performed before returning data to the application. This will have an impact on scan throughput.

SF_OrderByFull#

Same flag as SF_OrderBy except that the flag implies that all index columns are added to the read set of the scan.

SF_Descending#

This is only applicable to an ordered index scan. By default ordered index scans are done in ascending order, with this flag the order is instead descending.

SF_MultiRange#

This applies to ordered index scans where multiple index ranges are scanned. In this case this flag must be set.

SF_ReadRangeNo#

If multiple ranges are scanned this flag enables reading the range number for a row that has been delivered through the API.

SF_KeyInfo#

If a scan is performed using the lock mode LM_Read one can specify this flag. It will send back key information to the API that enables a take over operation to be defined using the call lockCurrentTuple. This is necessary if we want to retain a read lock on a row already scanned.

This flag is enabled by default when using the lock mode LM_Exclusive.

Scan batching#

Normally the size of batches sent back to the API is limited by configuration parameters on the API node. It is possible to override these defaults by setting the option SO_BATCH and set the option variable batch to the number of rows per batch that is desired.

This option is only available for scan operations.

Scan parallelism#

By default a scan will use the maximum parallelism available. This means scanning all partitions in parallel, up to a limit on how many partitions can be scanned in parallel.

By setting the option SO_PARALLEL we can set an explicit limit on the parallelism for a scan. We set the limit in the variable parallel on the options object.

This option is only available for scan operations.

Scan partitioning info#

There is an option SO_PART_INFO available that is practically never used. It relates to multiple ranges defined in a query where all ranges use the same partition, a feature of the user defined partitioning scheme which is not officially supported but still implemented. The MySQL Server implements this without using any option.

Only available in scan operations.

REDO log queueing#

All write operations are written into a REDO log. As part of the prepare phase we write them into a REDO log buffer. This buffer could be full, in which case we have two choices. One choice is to abort the transaction. The other is to queue the operation before executing it and wait for REDO log buffer space to become available.

The default behaviour for this is set as part of the cluster configuration of the API node. Through the options OO_QUEUABLE and OO_NOTQUEUABLE (note the spelling error here) we can change to either queue or not queue for a specific transaction. The same option should be used for all operations in a transaction.

This option is only available for writing key lookups.

Early unlock#

The normal behaviour for a transaction is to release locks at commit. BLOBs in NDB are implemented as separate tables.

For a normal table the READ COMMITTED mode requires no locks. For a BLOB table we need to first take a lock on the base table row, followed by a read of the BLOB table where we read with a simple lock.

For BLOB tables this lock on the base table is unlocked immediately after completing the read of the BLOB table.

The mechanism to unlock rows before commit is available to an application. Such behaviour has an impact on the transactional correctness of the application, so if used, it should be used with care.

When the operation is defined one uses the flag OO_LOCKHANDLE. Later when we want to unlock the row we call getLockHandle on the operation object followed by a call to unlock on the transaction object using this handle. The unlock row operation will be sent the next time we call execute.

  NdbTransaction *ndb_trans = .....
  NdbOperation *ndb_op = ....
  NdbOperation::OperationOptions options;
  options.optionsPresent =
    NdbOperation::OperationOptions::OO_LOCKHANDLE;
  ...
  /* Execute the ndb_op operation towards the NDB data node */
  ...
  const NdbLockHandle *lock_handle = ndb_op->getLockHandle();
  ndb_trans->unlock(lock_handle);

This operation is only available to key lookups.

Disable Foreign Key constraints#

The MySQL Server provides the capability to temporarily disable foreign key constraints. This is implemented as an option in the NDB API and is achieved by setting the option OO_DISABLE_FK. If it is set on one operation it is set on the whole transaction in the NDB data node; it is always a transaction property.

Use of this feature means that it is possible to create an inconsistent database, it is important that you understand what you are doing when using this feature.

This feature is only available to key lookups.

Deferred constraints#

The SQL standard allows constraint checks to be deferred until commit time. In MySQL the constraints are checked immediately by default and InnoDB only supports immediate checking of foreign key constraints.

NDB supports deferring constraint checks (unique key constraints and foreign key constraints) to commit time. This is controlled in the MySQL Server by setting the MySQL configuration parameter --ndb-deferred-constraints to 1. It is always used in the slave applier for RonDB Replication since the applier packs a number of transactions together and only the result at commit time is certain to pass the constraint checks.

It is possible to use deferred constraints in the NDB API as well by setting the option flag OO_DEFERRED_CONSTAINTS (note spelling error here).

This feature is only available to key lookups.

Save data in operation object#

It is possible to save a reference to any data structure in the operation object. This is done by setting the option OO_CUSTOMDATA and setting the variable optionData on the options object.

This is used by NDB for handling conflict detection in the slave applier. But it could be used for anything really.

Later on you can retrieve this pointer through the call getCustomData() on the same operation object.

This option is available to both scan operations and key lookups.

Pass a value to the event API#

It is possible to pass a 32-bit value from the NDB API to the NDB Event API. This is passed in any update, insert or delete operation. When this operation triggers an event that is sent to the node receiving the events, this value is passed and can be read through the call getAnyValue on the event operation object.

To set this value use the flag OO_ANYVALUE and the variable anyValue on the options object.

This is used as part of the RonDB Replication implementation to pass information from the NDB API that issued the change to the slave applier. For more information on the content in the any value data see the code files sql/ndb_anyvalue.h and sql/ndb_anyvalue.cc.

If the cluster isn't using replication to another cluster and the application needs to transport some value to the event API it is possible to use this value.

This feature is only available to key lookups.

Operation bound to specific partition#

It is possible to bind an operation to only be executed by one partition. For example, if one needs an estimate of the number of rows one can scan one partition and assume that the other partitions have roughly the same data distribution.

In this case one sets the flag OO_PARTITION_ID and sets the partition id that is scanned or used in the variable partitionId on the options object.

This option is available to both scan operations and key lookups.

Get special values#

Normally an operation retrieves column values from the row. It is also possible to retrieve metadata values about the row and its partition. These columns are called pseudo columns internally in NDB. These pseudo columns all relate to the current row read or written in a key lookup or in a scan operation. By performing a scan operation on all partitions of a table it is possible to get table values for many of those items.

A few of those pseudo columns are only intended for internal use cases.

This option is available to both scan operations and key lookups.

  Uint64 row_gci;
  Uint32 row_size;
  ....
  NdbOperation::OperationOptions options;
  NdbOperation::GetValueSpec extra_gets[2];
  extra_gets[0].column = NdbDictionary::Column::ROW_GCI;
  extra_gets[0].appStorage = &row_gci;
  extra_gets[1].column = NdbDictionary::Column::ROW_SIZE;
  extra_gets[1].appStorage = &row_size;
  options.optionsPresent =
    NdbOperation::OperationOptions::OO_GETVALUE;
  options.extraGetValues = &extra_gets[0];
  options.numExtraGetValues =
    sizeof(extra_gets)/sizeof(extra_gets[0]);
  ...
  /* Pass a reference to the options in the call to readTuple/insertTuple/... */

In the example above we perform a normal key lookup operation where we read a number of normal row columns (not shown in the example) and in addition we want to know the last GCI in which the row was updated and the size of the row.

These extra columns are added to the columns read by this operation as shown above and the same pattern applies to a whole range of metadata columns.

These columns are also accessible from SQL, their name in this case is as below with NDB$ prepended, e.g. NDB$ROW_GCI.

FRAGMENT#

This will return the id of the partition of the row returned. It is a 32-bit column.

FRAGMENT_FIXED_MEMORY#

This provides the total amount of memory used for the fixed part of the rows in this partition (we use the term fragment internally in NDB as synonym for partition in most cases). In reality we are reading the information on the specific fragment replica where the operation was executed. It is a 64-bit column.

FRAGMENT_VARSIZED_MEMORY#

This provides the amount of variable sized memory for the partition. A row has a fixed memory part, an optional variable sized part and an optional disk part. The partition also has a primary key hash index and possibly a set of ordered indexes. It is a 64-bit column.

ROW_COUNT#

This returns the current number of rows in the partition. A row is seen in this view when the row insert has committed. It is a 64-bit column.

COMMIT_COUNT#

The number of commits in this partition since the node was started (or partition was created) is reported in this variable. It is a 64-bit column.

ROW_SIZE#

Returns the size of the row read. It is a 32-bit column.

RANGE_NO#

This is only applicable to ordered index scans. Ordered index scans can use multiple ranges, in this case this column returns the number of the range that the row is returned from. It is a 32-bit column.

DISK_REF#

This is the reference to the disk part. It contains a file id, a page id and a page index. It is a 64-bit column.

RECORDS_IN_RANGE#

When executing an ordered index scan this column will calculate an estimate of how many rows exist in the range specified by the scan. The intended use for this is to calculate index statistics used to optimise queries in the MySQL Server. The column type is an array of 4 32-bit values.

The first value is the number of entries in the index (all rows in the partition except rows that have a NULL value in an index column). The second is the number of rows in the range, the third value is the number of rows before the range and the last value is the number of rows after the range. All these values are estimates.

ROWID#

The column type is an array of 2 32-bit values. The first 32-bit value is the fragment page id and the second is the page index. This number must be the same in the primary replica and the backup replicas.

ROW_GCI#

This value is the GCI identifier of the transaction that last updated this row. It is a 64-bit column, although the value will always fit in a 32-bit variable.

ROW_GCI64#

It is a 64-bit column. The higher 32 bits are the same as returned in ROW_GCI. The lower 32 bits are the extra GCI bits used in the conflict handling of RonDB Replication. More on this later.

ROW_AUTHOR#

When a table is involved in conflict detection handling we add this extra flag that indicates whether the row was last written by the slave applier or not. It is a 32-bit column.

ANY_VALUE#

This reads the any value set by the OO_ANYVALUE above. It is a 32-bit column.

COPY_ROWID#

The column type is an array of 2 32-bit values. It is the same as ROWID except that this is the row id for the copy row, this row id might differ in the replicas.

LOCK_REF#

This provides a reference to the lock object. The highest 16 bits of the first 32-bit value is the node id where the lock resides. The lowest 16 bits of the first 32-bit value is the fragment id, the second word is the operation record reference in the LQH block and the third word is a part of the unique id of the operation in the LQH block.

This is used under the hood for unlocking a row before commit. It is intended only for internal use of the NDB API.

OP_ID#

This returns the unique id inside the LQH block of the operation currently executed. It is currently not used for anything. It is a 64-bit column.

FRAGMENT_EXTENT_SPACE#

The column type is an array of 2 32-bit values. These two numbers together form a 64-bit value. This value represents the number of free extents available to the partition.

FRAGMENT_FREE_EXTENT_SPACE#

The column type is an array of 2 32-bit values. These two numbers together form a 64-bit value. This value represents the number of free rows available in the free extents already allocated to this partition.

Set special values#

This is very similar to getting special values. The difference is that the option flag is OO_SETVALUE and the objects used to set the values are of type NdbOperation::SetValueSpec instead of NdbOperation::GetValueSpec. The number of set values is set in numExtraSetValues and the set value objects are stored in extraSetValues as shown below.

  NdbOperation::OperationOptions options;
  NdbOperation::SetValueSpec extra_sets[2];
  ....
  options.optionsPresent =
    NdbOperation::OperationOptions::OO_SETVALUE;
  options.extraSetValues = &extra_sets[0];
  options.numExtraSetValues =
    sizeof(extra_sets)/sizeof(extra_sets[0]);

This feature is only available to key lookups. As an example it can be used to set the ROW_AUTHOR field above, see the NDB storage engine code for an example.

Hidden key column#

When a table is created without a primary key, the NDB storage engine will add an 8-byte column in which a unique row id is generated when the row is inserted. This column is always placed last in the set of columns.

Partition id for user defined partitions#

For tables with user defined partitioning the MySQL Server calculates the partition id and sets it in the first column id after the last column (or after the hidden key column).

ROW_AUTHOR#

ROW_AUTHOR can be set by the slave applier thread for tables involved in conflict detection.

OPTIMIZE#

This column is used as part of implementation of OPTIMIZE TABLE. By setting this column to a proper value the NDB storage engine instructs the NDB data node to move the variable sized part to a prepared place to remove fragmentation of the variable sized memory. It is not intended for normal use cases. It is a 32-bit column.

Interpreter commands as part of NDB API operations#

The final option that one can use is so advanced that it merits its own section. This is the execution of interpreted programs. All read and write operations can run an interpreted program as part of their operation.

The most common use of this feature is to push down scan filters to the NDB data nodes. The MySQL Server has a condition it wants to push down to the data node; the condition is translated into an interpreted program.

This interpreted program is sent as part of the read/write operation down to the data node owning the data. When reading or writing the row this interpreted program is executed and for scan filters it decides whether to read the row or to skip the row and move on.

For write operations it could be used e.g. to increment an integer field rather than performing a read followed by a write. There is no way currently to get this optimisation from the MySQL Server, but it is possible to write an NDB API program that builds an interpreted program that does this.

The interpreter is a very simple register machine with 8 registers. It is possible to read a column value into a register, load constants into registers, and add and subtract registers. It is possible to define labels and to branch to a label depending on a register value. It is also possible to define a subroutine and call it.

To enable pushdown of LIKE constructs, LIKE is also available as an interpreter instruction.

To use this one creates an NdbInterpretedCode object. For things like incrementing a column one should use the functions of the NdbInterpretedCode object directly. For defining condition filters one should use the NdbScanFilter class.

  NdbInterpretedCode code(...);
  ... Define interpreted program
  NdbOperation::OperationOptions options;
  options.optionsPresent =
    NdbOperation::OperationOptions::OO_INTERPRETED;
  options.interpretedCode = &code;

NdbScanFilter#

The file ha_ndbcluster_cond.cc contains many uses of the NdbScanFilter functions. Also read NdbScanFilter.hpp to see more about how to use this object.

The general idea of using the NdbScanFilter is that you define groups of conditions. The start of a filter group is marked by calling the function begin. The parameter to this specifies whether the conditions inside the group should be combined with AND, OR, NOR, or NAND. One condition inside a group can itself be a new group of conditions. One can view the begin and end calls as a left parenthesis (begin) and a right parenthesis (end) in a textual condition.

The actual conditions are defined by calls to the function cmp. The first argument defines the comparator type (e.g. larger than, equal and so forth), the next the column to compare, and the next the value to compare with. This value is represented by a void* pointer with a length; no length bytes are expected in the value representation.

The comparator could also be LIKE and NOT LIKE. In this case the value could contain % and ? wildcards.

There are also methods to check for NULL and for NOT NULL.

Scan example#

Here is an example where we delete rows using an ordered index scan on the primary key together with a scan filter. All rows that are part of the index scan and match the scan filter are returned and deleted.

First we start a transaction and create a scan object with the record specification of the index and one record specification of the columns to read. We use an exclusive lock since we are going to delete the rows as part of the scan.

We define the lower bound to be 1 and we include 1 in the range. We define the upper bound to be 100 and we don't include the 100 in the range.

Next we also define a scan filter using an equal condition on one of the columns in the table. This scan filter will generate an interpreted program that will skip the row unless the scan filter is passed.

Using the above mentioned option SO_SCANFLAGS it is possible to specify that rows are returned in various sorted orders, it is possible to define multiple ranges and to get the range number of returned rows.

We have a double loop where the outer loop is making calls to the NDB data node and the inner loops handles the rows returned from this call to the NDB data node.

We execute the deletes once per outer loop iteration, but we wait with committing the transaction until the scan is completed.

  NdbRecord *primary_key_record = ...
  NdbRecord *columns_record = ...
  ...
  trans_obj = ndb_obj->startTransaction();
  scan_obj = trans_obj->scanIndex(primary_key_record,
                                  columns_record,
                                  NdbOperation::LM_Exclusive);
  unsigned lower_bound = 1;
  unsigned upper_bound = 100;

  NdbIndexScanOperation::IndexBound key_range;
  key_range.low_key = (char*)&lower_bound;
  key_range.low_key_count = 1;
  key_range.low_inclusive = true;

  key_range.high_key = (char*)&upper_bound;
  key_range.high_key_count = 1;
  key_range.high_inclusive = false;

  key_range.range_no = 0;
  scan_obj->setBound(primary_key_record, key_range);

  NdbScanFilter scan_filter(scan_obj);
  scan_filter.begin(NdbScanFilter::AND);
  scan_filter.cmp(NdbScanFilter::COND_EQ,
                  column_id,
                  (void*)column_value,
                  column_length);
  scan_filter.end();

  trans_obj->execute(NdbTransaction::NoCommit);
  while (scan_obj->nextResult(true) == 0)
  {
    do
    {
      scan_obj->deleteCurrentTuple();
    } while (scan_obj->nextResult(false) == 0);
    trans_obj->execute(NdbTransaction::NoCommit);
  }
  trans_obj->execute(NdbTransaction::Commit);
  ndb_obj->closeTransaction(trans_obj);

NDB Event API#

The NDB API contains an Event API that makes it possible to subscribe to all changes of a specific table. Each time a transaction is committed an event can be generated for each row that has been changed. These events are collected together for a specific global checkpoint and sent to the API node(s) listening for the events. There can be many listeners to the same event. This is used by NDB to inject events into a MySQL binlog to get the changes into the MySQL Replication infrastructure. It can be used by applications to wait for certain events and create actions based on those events. There are countless applications of how to use events.

The events are asynchronous: they arrive as soon as possible, but there is no specific guarantee of when they will arrive. Normally a cluster uses micro GCPs that are created with 100 millisecond delay. Therefore events are expected to be delayed at least on the order of a hundred milliseconds, and in overloaded systems it can take longer.

The data node buffers events; if the API node is not fast enough to consume them, event delivery will eventually fail. In this case the API node is informed that the event reporting failed.

Create Event#

The first step is to create an event of type NdbDictionary::Event. To create such an event it needs a name and it needs a table object of type NdbDictionary::Table, the table object is retrieved by name from the NdbDictionary::Dictionary object.

Next one adds one or more table events: one can set the event to listen for inserts, deletes, updates or all of those. This uses the method addTableEvent on the event object.

The event specifies which columns will be sent to the event listener as part of each event. This uses the method addEventColumns on the event object.

For insert events the after value is sent, for delete events the before value is sent and for update events both the before value and the after value of those columns are sent.

If the table contains BLOBs and one wants to be notified when they are updated, it is necessary to call the method mergeEvents on the event object before creating it in the NDB data nodes. This method can also be called with the flag set to not merge events from the BLOB tables.

Setting merge events to true has the effect that several writes on the same row within one global checkpoint are merged into one event.

BLOB events are reported as mentioned above when merge events has been set, but it is necessary to read the BLOB columns using the NdbBlob object to see their values.

The next step is to create the event in the NDB data nodes. This happens through a call to the method createEvent on the NdbDictionary::Dictionary object using the event object as parameter.

Now the event is created in the NDB data node and can be used to listen to events.

Drop Event#

Events are dropped from the NDB data nodes by calling the method dropEvent on the NdbDictionary::Dictionary object using the event name as the parameter.

Create Event Operation#

Now that the event exists in the NDB data nodes we can create an event operation to listen to the events. This is done through a call to the method createEventOperation on the Ndb object using the event name as parameter.

It is necessary to call the method mergeEvents on the event operation object to specify whether it is desirable to get events merged or not. This merge setting activates the merging, but the merge flag needs to be set also on the event object when the event is created in the data nodes. If it is not set there as well, events will be missing from the data node, making it impossible to merge events properly.

As part of setting up the event operation object we need to define objects of type NdbRecAttr to provide the before value (using getPreValue) and after value (using getValue) for each column we are interested in.

Finally we need to call execute on the event operation object.

Wait for events#

To listen to events one now calls the method pollEvents on the Ndb object using the number of milliseconds to wait for wakeup if no events occur. This method returns a positive number if events occur.

If events occur, one calls the method nextEvent on the Ndb object to retrieve one event. This method returns the event operation previously defined. This object is now filled with information from the event, and the user can use its methods to retrieve the event type (insert, delete or update) and which columns changed, with their before and after values.

The NdbRecAttr objects can be used to discover whether attributes have changed, whether they are set to NULL, and to read the column values.

When the program is no longer interested in listening to events it drops the event operation by calling dropEventOperation on the Ndb object using the event operation object as parameter.

Node failure handling of events#

Events are generated in all data nodes. Only the node that has been selected as primary event sender will send its events to the API node. If the data node that was primary event sender fails, the other nodes ensure that the API node gets the information from one of the remaining live nodes instead. Thus all nodes keep the event buffers for a global checkpoint until the API node has confirmed that it has received those events. All this happens automatically without any user interaction.

NDB API Event example#

Here is an example of how to write an event handler loop for a specific table and a number of specific columns.

  const char *col_names[2] = { "col1", "col2" };
  NdbDictionary::Dictionary *dict_obj = ndb_obj->getDictionary();
  const NdbDictionary::Table *tab_obj = dict_obj->getTable("table_name");
  NdbDictionary::Event event_obj("event_name", *tab_obj);
  event_obj.addTableEvent(NdbDictionary::Event::TE_ALL);
  event_obj.addEventColumns(2, col_names);
  event_obj.mergeEvents(false);
  dict_obj->createEvent(event_obj);
  NdbEventOperation *op = ndb_obj->createEventOperation("event_name");
  op->mergeEvents(false);
  NdbRecAttr *col_after[2];
  NdbRecAttr *col_before[2];
  col_after[0] = op->getValue(col_names[0]);
  col_before[0] = op->getPreValue(col_names[0]);
  col_after[1] = op->getValue(col_names[1]);
  col_before[1] = op->getPreValue(col_names[1]);
  op->execute();
  while (true) {
    int r = ndb_obj->pollEvents(1000);
    NdbEventOperation *ev_op;
    while (r > 0 && (ev_op = ndb_obj->nextEvent())) {
      ... Handle event
    }
  }
  ndb_obj->dropEventOperation(op);
  dict_obj->dropEvent("event_name");
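As a sketch, the "Handle event" part of the loop could inspect the event type and the column values like this. It assumes the col_after and col_before NdbRecAttr arrays from the example above, that the event operation returned by nextEvent is called ev_op, and that col1 is an unsigned integer column (so u_32_value applies); adapt the value accessors to the actual column types.

```cpp
  // Dispatch on the event type reported by the event operation.
  switch (ev_op->getEventType()) {
  case NdbDictionary::Event::TE_INSERT:
    // For inserts only the after values are meaningful.
    // isNULL() returns 0 when the attribute is set and not NULL.
    if (col_after[0]->isNULL() == 0)
      printf("col1 after insert: %u\n", col_after[0]->u_32_value());
    break;
  case NdbDictionary::Event::TE_DELETE:
    // For deletes only the before values are meaningful.
    if (col_before[0]->isNULL() == 0)
      printf("col1 before delete: %u\n", col_before[0]->u_32_value());
    break;
  case NdbDictionary::Event::TE_UPDATE:
    // For updates both before and after values are available.
    if (col_before[0]->isNULL() == 0 && col_after[0]->isNULL() == 0)
      printf("col1 changed from %u to %u\n",
             col_before[0]->u_32_value(),
             col_after[0]->u_32_value());
    break;
  default:
    break;
  }
```

Note that isNULL() returns -1 when the attribute was not part of the event, which is how one distinguishes an unchanged column from a column set to NULL.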

Pushdown Join#

A part of the NDB API is the handling of the complex query operations. It is possible to push parts of a join query down to the NDB data nodes. This makes use of a form of linked table scans.

The first table access is either a primary key operation, a unique key operation or a scan operation. Each row found in this first table is sent to the second table in the join order. Normally the first table access in a pushdown join is a scan operation.

For each row found in the first table, the result is sent to the NDB API; at the same time, the columns needed further down in the join evaluation are sent along in a new join that involves all tables except the first table.

In this manner the query is recursively executed. If the join evaluation contains filtering parts, this can lead to a very high degree of parallelisation. If very little filtering is performed, the NDB API will still be a major bottleneck since there is a limit to how fast the NDB API can process queries.

This pushdown join interface to the NDB API is used by the NDB storage engine; it is not intended for use by normal NDB API applications. I won't go through any more details of how this part of the NDB API works.

Asynchronous API#

The asynchronous NDB API is currently only applicable to the key operations. It is not possible to use it for scan operations.

It only differs in how transactions are executed. Instead of a single execute call there are three calls. These calls must be preceded by a call to executeAsynchPrepare that prepares the defined operations for execution.

A normal execute(...) call will normally perform three actions:

  1. Prepare defined operations for execute

  2. Send prepared operations

  3. Poll for completion of sent operations

With the normal synchronous API it is possible to execute multiple transactions in parallel, but the normal use case is to use one transaction at a time. The above poll will always wait for all sent operations to complete.

Using the asynchronous API it is expected to have many concurrent transactions outstanding at any point in time and the poll doesn't have to wait for all actions to complete, it is sufficient to wait for a few actions (by default one) to complete.

sendPollNdb#

sendPollNdb is more or less the same as calling execute, with one major difference: sendPollNdb can be asked to wake up before all transactions have completed. Thus the asynchronous interface can be used to work with hundreds of interactions in parallel per thread. Fewer threads are needed to handle the interactions, and these fewer threads can use more batching, achieving higher throughput.

sendPreparedTransactions#

If the thread needs to take care of other business while NDB is working, it can use the sendPreparedTransactions call to send the prepared transactions without waiting for their completion.

pollNdb#

This is used in conjunction with sendPreparedTransactions. When returning from other activities, it is possible to poll for completion of the transactions already sent.
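The asynchronous pattern described above can be sketched as follows. This is a minimal sketch, not a complete program: it assumes an open Ndb object called ndb_obj, and prepare_transaction is a hypothetical placeholder for code that defines key operations on the transaction. The calls executeAsynchPrepare, sendPollNdb and startTransaction are the documented NDB API entry points.

```cpp
// Callback invoked from within sendPollNdb/pollNdb when a
// transaction completes; the NdbAsynchCallback signature takes the
// result code, the transaction and a user-supplied pointer.
static void transaction_callback(int result,
                                 NdbTransaction *trans,
                                 void *user_data)
{
  if (result == -1) {
    // Inspect trans->getNdbError() and handle the failure here.
  }
}

void run_async_batch(Ndb *ndb_obj, int num_concurrent)
{
  // Prepare a batch of transactions without sending anything yet.
  for (int i = 0; i < num_concurrent; i++) {
    NdbTransaction *trans = ndb_obj->startTransaction();
    prepare_transaction(trans);  // placeholder: define key operations
    trans->executeAsynchPrepare(NdbTransaction::Commit,
                                transaction_callback,
                                nullptr);
  }
  // Send all prepared transactions and wait until at least one has
  // completed; the callbacks run from within this call. Alternatively,
  // use sendPreparedTransactions and a later pollNdb to decouple
  // sending from waiting.
  ndb_obj->sendPollNdb(3000 /* ms timeout */, 1 /* min completions */);
}
```

The key design point is that the poll returns as soon as the requested minimum number of transactions has completed, so a single thread can keep many transactions in flight and batch their network traffic.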

flexAsynch Architecture#

The flexAsynch program is a benchmark program we've used to showcase hundreds of millions of reads and tens of millions of write operations per second. It uses a special model to achieve extremely high throughput. This model can be reused in other programs that only require access to key operations.