C++ NDB API#
We have now completed the view of RonDB from the SQL point of view. The SQL view always depends on lower-level constructs in its implementation. All access to data in tables, metadata about tables and retrieval of configuration data goes through the NDB API.
In MySQL, there is a storage engine concept to allow many different implementations of the database engine. We have mapped the storage engine interface to our NDB API interface in the NDB storage engine.
The NDB API is implemented in C++. It is possible to write applications directly towards this C++ API. We have implemented special interfaces on top of the NDB API for Java (ClusterJ) and for JavaScript (Database Jones, a Node.js API towards NDB) that we will describe in subsequent chapters.
This chapter aims to describe the NDB API from a high level to make it easier to understand how to write NDB API programs. There is a large set of example programs available in the directory storage/ndb/ndbapi-examples in the RonDB source tree. In this chapter we focus all attention on using the NdbRecord variant of the NDB API; some of the examples are old and use the column format where one column at a time is provided. The new record format is faster, but the old column format is still supported and can thus still be used.
There is documentation of the NDB API available that is for the most part retrieved from the header files using Doxygen. Some parts are skipped in the documentation since they are treated as internal parts of the NDB API. These are safe to use, but have a higher risk of being changed in future versions since they are deemed as internal parts of the API.
Most of the early research work on NDB was centered around the NDB data nodes and how to implement those. In early 1998 the implementation of the data node parts was nearing completion and we started considering how the application code would interface the data nodes.
The NDB data node was written to handle requests asynchronously. We did not expect that application developers wanted to work with an asynchronous API towards NDB. Thus we decided to make a synchronous API. For the most part, this decision has been correct. Implementing the NDB storage engine requires a synchronous API and using a synchronous API is a lot simpler compared to an asynchronous API.
At the same time the asynchronous API can be used to create fairly elegant solutions and the code will be a lot easier to optimise. We ended up handling key operations using an asynchronous API that is used by a few users and by a number of our internal tools. It has also been used in a number of benchmarks we have developed to showcase the throughput possibilities of NDB.
At this point, scan operations can only be accessed from the synchronous NDB API part.
Meta data operations#
The NDB API contains code to handle all interactions with the NDB data nodes including normal user transactions as well as event handling and metadata operations.
It is not recommended to use the metadata part of the NDB API directly in a user application. It is recommended to create all tables using the MySQL Server. Creating a table through the NDB API means that it isn’t possible to use it from the MySQL Server. It is a lot simpler to use the SQL syntax to create tables, drop tables, create indexes, foreign keys and so forth.
The MySQL server will also take care of synchronizing table changes with backup clusters.
Thus a normal NDB API application will contain code to both connect to a MySQL Server to create the necessary tables and other metadata for the application and the NDB API part to work with those tables.
We will not go through the metadata part of the NDB API in this book and will instead go through the specific syntax required for handling table creation and so forth of NDB tables in the MySQL Server.
Thus it is recommended to always have at least one MySQL Server in an NDB Cluster even if the MySQL Server isn’t used at all for normal transactions or queries.
Initialising the NDB API and MySQL client API#
Before any interaction with the NDB API it is necessary to initialize the NDB API. This happens through a call to the method ndb_init().
Similarly, it is necessary to initialize the MySQL client API before using it by calling mysql_init().
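Here is a minimal sketch of these calls (error checks skipped, assuming NdbApi.hpp and mysql.h are included):
int main(int argc, char** argv)
{
  /* Initialize the NDB API once per process before any other NDB call */
  ndb_init();
  /* Initialize a MySQL client handle if the program also issues SQL,
     e.g. to create the tables used by the NDB API part */
  MYSQL mysql;
  mysql_init(&mysql);
  /* ... application code using the NDB API and the MySQL client ... */
  /* Release NDB API resources at process exit */
  ndb_end(0);
  return 0;
}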
Concurrency in the NDB API#
In 1998 most computers were using a single CPU or possibly two CPUs. Concurrency was limited; however, it was still necessary to handle many concurrent threads accessing the NDB API.
We decided to create a C++ class called Ndb that is used by one thread. There can be many Ndb objects that can be operated on from many different threads at the same time. But one Ndb object can only be operated on by one thread at a time. Thus this object must be handled with care. It is possible to use it in different threads, but only in one thread at a time. If using it in many threads, it is necessary to protect the move of the Ndb object to another thread.
The Ndb object uses a set of classes that we call the Transporter classes. These classes are responsible for taking the information from one Ndb object, converting it to the NDB protocol and sending this information to the NDB data nodes. They are also responsible for receiving data back from the NDB data nodes, decoding the NDB protocol and placing the results back into the objects handled by the Ndb object.
In the first implementation of the NDB API and up to MySQL Cluster version 7.2 the Transporter layer used a very simple concurrency control by implementing a Transporter mutex that ensured that only one thread at a time was able to interact with the transporter parts.
After a few years this became a bottleneck. To avoid this bottleneck we made it possible to have several API nodes within one API program. In the MySQL Server, this is controlled by the configuration variable --ndb-cluster-connection-pool. In 7.2 and earlier versions each API node scaled to about 1.5 CPUs when running a simple benchmark using the NDB API directly (using the MySQL Server means it scales to more CPUs since the MySQL server code will use quite a few CPUs to handle query processing). Thus running a benchmark in an 8-CPU environment was made possible by using 6 API nodes (or cluster connections as they are called in the NDB API). Each cluster connection uses its own node ID and can be seen as an independent node in most management views of NDB.
Now with the multi-core revolution that started around 2006-2007 the number of CPUs per machine escalated very quickly. Thus it was necessary to implement more scalable concurrency control of the Transporter part. This was implemented in MySQL Cluster 7.3 and makes each API node or cluster connection scale a lot more. For the Sysbench benchmark each cluster connection now scales to more than three times as many CPUs as before. For DBT2 the improvement was even larger, scaling to more than seven times as many CPUs.
Now it is possible to use only four cluster connections even when running a MySQL Server using 32 CPUs when executing the Sysbench benchmark.
The major limiting factor now in the Transporter part is that there is only one thread at a time that can execute the receive part of the Transporter part.
Ndb cluster connection#
The Ndb_cluster_connection object is the starting point for all NDB API programs. When creating this object it is necessary to specify the NDB connect string. The NDB connect string is used to fetch the cluster configuration from the NDB management server. It is possible to specify multiple NDB management servers in the connect string to increase the chance of a successful configuration retrieval. It is possible to specify a specific node ID to be used by this cluster connection. If not specified, the management server will pick a node ID that is currently available in the configuration.
Using specific node IDs is not absolutely required, but in my view it is recommended to use them as much as possible. For management servers and data nodes I recommend always using them since the node ID is connected to state saved on disk. It will simplify management of the cluster if all processes start up with a specific node ID. This requires setting the node ID in the configuration for these node types.
For MySQL Servers and long-running API programs, it is a good idea to use specific node IDs.
The normal procedure for the Ndb_cluster_connection object is to first create the object, specifying the NDB connect string and possibly the node ID as well. Next, a connect call will fetch the NDB configuration from a management server. Finally, a call to wait_until_ready will wait until we are connected to at least one NDB data node.
After these calls our API program is now a part of the cluster and will show up when we list the nodes in the current cluster.
Auto reconnect#
Once the NDB API has established communication to the data nodes it will automatically balance the load on the connected data nodes. If a data node is no longer connected we will automatically try to reconnect to this node without any need for the user of the API to interact. It is possible to disable this behavior if desirable.
Selection of data node to communicate with#
Each transaction is handled by one specific data node. There are a number of things impacting the decision of which data node to choose. The first consideration is whether the user has provided a hint when starting the transaction; if so, we will use this hint to connect to the data node if possible.
The hint doesn’t say which data node to connect to, it rather specifies a primary key in a specific table, this primary key is mapped to a node using a hashing function and some data distribution hints provided to us from the data nodes when we started using the table. These are hints that are very stable, but they can change e.g. during an add node operation. There is no guarantee that they will be 100% accurate. If they are incorrect, it is not a problem since the data node will get the correct nodes to read and write data from, independent of which data node the transaction is started from. The data nodes always have a correct view of the data node distribution of each table.
If no hint is provided we use a round-robin scheme to ensure that we spread the load on all data nodes.
When using SQL we always use the first SQL statement of a new transaction as the way to provide the hint. If the first SQL statement specifies a primary key to use for a table we will choose this as the hint for the transaction we start.
Setting data node neighbor#
In case we haven’t specified a hint about where to place the transaction coordinator, we can set the node ID of the closest data node in the configuration. This will be used in those cases.
If a MySQL Server has the same IP address as a data node it will automatically become the neighbor of this data node and will prefer transactions to use this data node unless a hint has been provided.
However, in some situations, we know that two nodes are close to each other but they use different IP addresses. This could happen e.g. when we use a Docker network setup where each node has its own IP address but some IP addresses are actually located on the same physical server or even the same virtual machine.
In this case, one can provide the data node neighbor through the set_data_node_neighbour call on the Ndb_cluster_connection object. In the MySQL Server, one can set this node ID through the ndb-data-node-neighbour configuration variable.
The main use of the data node neighbor concept is for query processing where we are issuing complex SQL queries. In this case, most likely no primary key is provided, round robin would be used unless a data node neighbor is selected.
Using a data node neighbor is important if the network between the computers has limited bandwidth. Using a node local to the computer means that we don’t have to use limited bandwidth available in connecting between computers in the cluster.
Combining this feature with the possibility of reading from backup replicas, and possibly even using fully replicated tables, ties the API node strongly to its neighbor data node for all query processing. This makes it possible to build query processing that scales in an excellent manner when adding more and more MySQL Servers and NDB data nodes.
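Here is a small sketch of how this could look in the NDB API (node ID 3 is just an example):
Ndb_cluster_connection *cc = ....
/* Data node 3 runs on the same host as this API program,
   but with a different IP address */
cc->set_data_node_neighbour(3);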
Locking receive thread to specific CPU#
It is possible on Linux, Solaris and Windows to lock threads to specific CPUs. In the NDB API, we have the possibility to use this feature by calling the function set_recv_thread_cpu on the Ndb_cluster_connection object. The API call is prepared for multiple receive threads per NDB cluster connection. Currently, we only support one, so the CPU array should always have length 1 in this call.
It is possible to specify the CPUs to use for NDB cluster connections in the MySQL Server configuration. For this one uses the ndb-recv-thread-cpu-mask configuration variable. In this variable one can list e.g. 0,2-4 to specify that CPU 0 is used for the first cluster connection, CPU 2 for the second, CPU 3 for the third and CPU 4 for the fourth. The number of CPUs here must be at least the number of cluster connections specified in the configuration variable ndb-cluster-connection-pool.
There is a call set_recv_thread_activation_threshold that specifies how many API threads should be concurrently active for the receive thread to take over receive processing. When CPU locking is used it can be a good idea to set this to 1 to ensure that the receive thread handles all receive processing when multiple threads are active. There is a corresponding MySQL configuration parameter called ndb-recv-thread-activation-threshold for this as well.
By default, there is no CPU locking for the receive thread and the activation threshold is set to 8.
Locking the receive thread to a specific CPU improves performance if it is combined with locking the MySQL server process or the NDB API program to other CPUs than those used by the receive threads. It is also necessary to control other programs in the computer that use large amounts of CPU and to steer interrupt handling away from those CPUs. If this cannot be done, one should not use CPU locking of receive threads.
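Here is a sketch of how these calls could be used on the cluster connection object (CPU 4 and the threshold of 1 are just examples):
Ndb_cluster_connection *cc = ....
/* Currently only one receive thread per cluster connection,
   thus the CPU array has length 1 */
Uint16 recv_thread_cpu[1] = { 4 };
cc->set_recv_thread_cpu(recv_thread_cpu, 1);
/* Let the receive thread take over receive processing as soon as
   at least one application thread is active */
cc->set_recv_thread_activation_threshold(1);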
Ndb object#
The Ndb object is the starting point for all NDB interactions. As mentioned this object should not be used from more than one thread at a time. When waiting for a response back from the NDB data nodes it is important to not use this object while waiting.
Concurrency in the NDB API comes from three places. First, it is possible to use multiple cluster connections. Second, it is possible to use multiple threads per cluster connection using separate Ndb objects. Third, it is possible to handle multiple operations and even multiple transactions per interaction using the Ndb object.
The Ndb object is a placeholder for transaction objects, operation objects and many other objects used by the Ndb object. Objects are created through the Ndb object and destroyed by the Ndb object.
Before starting to use the Ndb object it has to be created and it is necessary to call the method init().
When creating the Ndb object one specifies two things: first the Ndb_cluster_connection object to use and second a string containing the database name used by the Ndb object. It is possible to change the database name used later on.
The init() method has one parameter that is set to 4 by default; this parameter specifies the maximum number of transactions that can be active in parallel on the Ndb object. Using the asynchronous API or using batching with one Ndb object it is possible to handle up to 1024 transactions in parallel from one Ndb object.
Here is a simple code example skipping all error checks for creating the cluster connection and the first Ndb object.
Ndb_cluster_connection *cc =
new Ndb_cluster_connection(connectstring, node_id);
cc->connect();
cc->wait_until_ready(timeout, wait_after_first_node_connected);
Ndb* ndb_obj = new Ndb(cc, "database_name" );
ndb_obj->init();
Setting up a row#
Before a user transaction can be executed we need to set up a row data structure. The NDB API also contains a variant where one takes care of one column at a time. I will only describe how to handle things using NdbRecord columns since this is nowadays the recommended use of the NDB API.
To set up a row we need a reference to the table and the columns we want to operate on.
The first step is to get an object from Ndb of the type NdbDictionary::Dictionary that is retrieved using the call getDictionary() on the Ndb object.
Next, this dictionary object is used to get a table object of type NdbDictionary::Table. This is retrieved by using the call getTable(tableName) where tableName is a string with the table name.
Next, we use the table object to retrieve one or more column objects. These are retrieved by using a set of calls of the type getColumn(colName) where colName is a string containing the column name.
Next, we create a record specification object that maps each column into a buffer from where data to set will be read or data read will be inserted when read from NDB.
This is an object of the type NdbDictionary::RecordSpecification. We create an array of such objects with as many members as the number of columns we want to access.
We create an NdbRecord by calling createRecord on the dictionary object. This method has as input the table object, the record specification array, the number of entries in the record specification array and the size of one NdbDictionary::RecordSpecification object (needed for backward compatibility since there used to be an older variant of the record specification object).
This call returns an object of the type NdbRecord that can now be used in user transactions. It is also possible to set up a row for access through an ordered index or a unique index. In this case, we need both the index object and the table object in the call to createRecord.
Here is a very brief example of the above.
struct row_buffer
{
  int col1;
  int col2;
};
....
NdbDictionary::Dictionary* dict_obj = ndb_obj->getDictionary();
const NdbDictionary::Table *tab_obj = dict_obj->getTable("table_name");
const NdbDictionary::Column *col1 = tab_obj->getColumn("col1_name");
const NdbDictionary::Column *col2 = tab_obj->getColumn("col2_name");
NdbDictionary::RecordSpecification spec[2];
spec[0].column = col1;
spec[0].offset = offsetof(struct row_buffer, col1);
spec[0].nullbit_byte_offset = 0;
spec[0].nullbit_bit_in_byte = 0;
spec[1].column = col2;
spec[1].offset = offsetof(struct row_buffer, col2);
spec[1].nullbit_byte_offset = 0;
spec[1].nullbit_bit_in_byte = 0;
NdbRecord *pk_record =
  dict_obj->createRecord(tab_obj, spec, 2, sizeof(spec[0]));
For each element in the record specification array, it is necessary to set the column object retrieved previously in the variable column and the offset to the column in the row buffer in a variable called offset. There are also variables called nullbit_byte_offset and nullbit_bit_in_byte. These two numbers point out which bit in the null bits of the record is used to represent the NULL state of the column: nullbit_byte_offset is the number of bytes from the start of the row to the byte holding the NULL bit and nullbit_bit_in_byte is the bit within that byte. These are ignored for columns that are declared as NOT NULL.
Thus the row buffer must have space allocated for both the column values as well as for NULL bits. It is recommended to use at least 32 bits for the NULL bits to avoid misaligned column values. Similarly for all column values it is recommended that they are aligned on 32-bit boundaries for efficient access without misaligned traps. On a SPARC it is necessary to use this alignment.
Transactions#
Every interaction involving reading or writing user tables in NDB uses transactions. A transaction goes through a number of steps: first it is created, next it is executed (it can be executed multiple times before abort/commit), the final execution specifies either commit or abort, and finally the transaction is closed.
Start a transaction#
Starting a transaction means that we allocate one connection record in one of the DBTC threads in the cluster. Once such a record has been allocated by an API node the API node keeps the connection record. This means that only the first time the start transaction involves a distributed interaction with the NDB data nodes. Most startTransaction calls are local calls that grab a connection record from a linked list of free connection records and there is one such linked list per data node in the cluster.
The start transaction call can be made without any parameter: startTransaction(). In this case, a round-robin mechanism is used to decide which data node to start the transaction in. If we have a neighbor data node (either with the same IP address as the API node or set through data_node_neighbour in the cluster connection object), we use the neighbor node.
Start transaction with hint#
Starting a transaction with a hint on where to place the transaction coordinator is a very important feature of RonDB and is key to writing scalable applications. As an example, HopsFS mapped a hierarchical file system on top of a 12-node RonDB cluster. In order to get this to scale it was important to ensure that operations such as ls (listing the files in a directory) could execute using a scan on just one partition of the table. HopsFS solved this by using the parent directory as the partition key for the table.
Similarly, TPC-C has a warehouse ID that is part of all tables that can be used to make TPC-C scale perfectly if the warehouse ID is used as a partition key. Selecting the proper partition key is important to design scalable applications in NDB.
There are two reasons for this. The first is that if one makes a transaction that updates many rows that have some connection (this is what happens in TPC-C and in HopsFS and in almost all applications), it is very valuable to place the transaction coordinator on the same node as the primary replica location of the rows to be updated. This will significantly decrease the overhead of communication within the cluster during the transaction.
The second is that we can perform ordered index scans that have an equality bound on the partition key, or a full table scan with such an equality bound. These scans only need to scan one partition of the table since all rows sought for will be housed in the same node. By placing the transaction coordinator on this node, we can avoid communication to get to this partition since the partition is placed on the same node.
Decreasing the number of partitions that are required to scan gives us a lower startup cost for the scan. Each additional partition to scan brings an additional small cost to the query execution.
Decreasing communication needs is always important to create scalable distributed applications whether it is a NoSQL database, a shared disk relational DBMS, or a shared-nothing DBMS such as RonDB. Modern hardware makes communication cheaper and makes it easier to write distributed applications, but it will always be beneficial to write your application to minimize the communication needs.
To start a transaction with a hint we use a variant of the startTransaction call that takes a few parameters. It takes a table object of the type NdbDictionary::Table as input, created by calling the method getTable(tableName) on an NdbDictionary::Dictionary object. The second parameter is a NULL-terminated array of objects of the type Key_part_ptr. The Key_part_ptr object is a simple struct containing a void pointer and the length of the data it points to. This data represents the distribution key or partition key of the table defined by the table object. The key parts are checked against the table definition to ensure that they are consistent.
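Here is a hedged sketch of starting a transaction with a hint through this public interface, assuming a table whose partition key is a single integer column (all names are hypothetical):
Ndb *ndb_obj = ....
NdbDictionary::Dictionary *dict_obj = ndb_obj->getDictionary();
const NdbDictionary::Table *tab_obj = dict_obj->getTable("warehouse_table");
int warehouse_id = 17;
Ndb::Key_part_ptr key_parts[2];
key_parts[0].ptr = &warehouse_id;
key_parts[0].len = sizeof(warehouse_id);
key_parts[1].ptr = NULL; /* NULL-terminated array */
NdbTransaction *trans_obj = ndb_obj->startTransaction(tab_obj, key_parts);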
There is also a method that uses an NdbRecord; this is not part of the public NDB API, but is used by the NDB storage engine. The NdbRecord here is a record of the primary key of the table used. Here is a simple example of starting a transaction with a hint given this interface.
NdbRecord *ndb_rec = ....
Ndb *ndb_obj = ....
NdbTransaction *trans_obj = ndb_obj->startTransaction(ndb_rec,
row_buffer,
NULL,
0);
The two last parameters specify a buffer to be used for hashing. By specifying NULL as the buffer pointer, we ensure that malloc is used; otherwise one has to ensure that the buffer is large enough for the hash function call. In the public function described above these two parameters default to NULL and 0, but they can also be specified explicitly in this call.
If a buffer is provided it must be large enough to contain all columns in their maximum size and the buffer must be aligned to a 64-bit boundary to be used in these calls. So the buffer should be fairly large if provided. The hash function must work on an expanded row to ensure that different representations of the same word use the same hash value (there are many character sets that have multiple ways to write the same word).
If no such pointer is passed the NDB API will have to use malloc and free to allocate and release the memory needed for executing the hash function.
It is also possible to start a transaction on a specific node ID and instance ID, but normally the application has no knowledge of the table distribution that would make this useful, whereas using the partition key is very useful.
Define operations#
After starting a transaction we define a set of operations for the first interaction with the NDB data nodes. We will go through in detail how this is done below. These could be operations on a specific record using either a primary key or a unique key, or scan operations that scan a part of a table using either a full table scan or an ordered index scan. Scans can also be used to write rows, where we find the rows to write through the scan.
Executing a transaction#
After defining one or more operations it is now time to execute the operations. When executing key operations one can decide whether it is time to commit the transaction while executing it. It is possible to commit with a set of operations, it is also possible to commit with an empty set of operations. Committing an empty set of operations will commit all operations executed since the start transaction call was made.
When a transaction has completed by executing a commit, it is no longer possible to continue using the transaction in any other manner than to close it.
The call to execute is made on the NdbTransaction object received in the startTransaction call. The method is called execute and the only required parameter specifies whether to commit, not commit or abort. The value sent in to commit is NdbTransaction::Commit, to execute but not yet commit one uses NdbTransaction::NoCommit and to abort a transaction one uses NdbTransaction::Rollback. We can provide a second parameter that specifies how to handle errors during transaction execution.
Setting this second parameter to AbortOnError means that no errors at all are allowed. AO_IgnoreError means that we will commit as much as possible; thus for most errors, we will continue the transaction even if one part of the transaction fails. The default setting for the abort option is that it can be set on a per-operation basis. However, explicitly setting it in the execute call overrides the operation setting.
It is possible to set a third parameter with a force send flag. By default, this is turned off. In the MySQL Server, this is controlled by a configuration parameter.
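Here is a sketch of an execute call that commits the transaction and aborts on any error:
NdbTransaction *trans_obj = ....
int ret = trans_obj->execute(NdbTransaction::Commit,
                             NdbOperation::AbortOnError);
if (ret == -1)
{
  /* The transaction failed, details are found in the NdbError object */
  const NdbError &error = trans_obj->getNdbError();
}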
Scan operations use execute as part of starting up a scan. The continuation is however different after the startup of the scan. A scan can handle any number of rows, so there is an API to control the fetching of rows to the API that we will describe below.
Close a transaction#
After committing a transaction one needs to close the transaction to make all allocated resources available again. If the transaction has not yet been committed and the NDB data node is waiting for more input, the NDB API will abort the transaction as part of the closeTransaction() call.
A call to start a transaction with a hint, immediately followed by a call to get the node ID of the transaction coordinator (using getConnectedNodeId()), followed by a call to close the transaction, is a quick way of discovering which node the primary replica of a record belongs to. This method is used by the flexAsynch benchmark program and the ndb_import program (introduced in 7.6) to distribute queries to different threads where each thread is handling one data node.
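Here is a sketch of this discovery trick, reusing the hypothetical table object and key parts from the earlier hint example:
NdbTransaction *trans_obj = ndb_obj->startTransaction(tab_obj, key_parts);
Uint32 primary_replica_node_id = trans_obj->getConnectedNodeId();
ndb_obj->closeTransaction(trans_obj);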
Key operations#
Defining a key operation is straightforward after setting up the row. One calls a method on the NdbTransaction object and it returns an NdbOperation object.
Insert tuple#
To insert a row we call the method insertTuple, which requires four parameters. The first is the NdbRecord object for the primary key and the second is a pointer to the memory buffer for the primary key columns. The third parameter is the NdbRecord for the rest of the columns and the fourth is the memory buffer for these columns. The memory buffers must contain the data to be inserted in all columns before the call.
In a sense, one can see the NdbRecord objects as the metadata descriptions of a row object. The memory buffers contain the actual data that is interpreted according to the NdbRecord object.
There is a variant where only two parameters are needed, where the primary key columns are combined with the rest of the columns. This only exists for the insertTuple call and not for other types of calls.
The buffer can be reused for other purposes immediately after the call since the data have been saved in the NDB API and no data is to be returned in this case.
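Here is a hedged sketch of an insert, reusing the pk_record from the row setup example for the primary key and assuming a hypothetical all_columns_record NdbRecord that covers all columns; both records map into the same row_buffer struct:
struct row_buffer insert_row;
insert_row.col1 = 1;
insert_row.col2 = 2;
const NdbOperation *insert_op =
  trans_obj->insertTuple(pk_record, (const char*)&insert_row,
                         all_columns_record, (const char*)&insert_row);
trans_obj->execute(NdbTransaction::Commit);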
Update tuple#
This is the same as inserting a row except that we use the call updateTuple with the same parameters, and we still need to set all columns before the call. Also here the buffers can be immediately reused.
Write tuple#
The writeTuple call will perform an insert if the row didn't exist before or an update if the row previously did exist. All required columns need to be part of the record specification since it can become an insert. Also here the buffer can be immediately reused.
Delete tuple#
The call deleteTuple uses the same parameters, but the fourth parameter can be skipped if no read is performed while deleting the row. The record specification for the entire row is still required in this call.
Read tuple#
The call readTuple will perform a read of one tuple. The parameters are still the same as when calling insert, update and write of a tuple. It is not ok to reuse the second memory buffer provided in the fourth parameter until the read has been completed. This memory buffer is used to return the data read from NDB. It is important to still set the primary key fields before calling readTuple. The first memory buffer can be immediately reused after the call.
It is possible to set lock mode in this call, more on this below in the section on scans.
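Here is a hedged sketch of a primary key read using the committed read lock mode, with the same hypothetical records as in the insert example:
struct row_buffer read_row;
read_row.col1 = 1; /* set the primary key columns before the call */
const NdbOperation *read_op =
  trans_obj->readTuple(pk_record, (const char*)&read_row,
                       all_columns_record, (char*)&read_row,
                       NdbOperation::LM_CommittedRead);
trans_obj->execute(NdbTransaction::NoCommit);
/* read_row now contains the columns read */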
Operation options#
In addition the above calls can all set a result mask which is a bit mask indicating which parts of the record specification to use. It is an array of bytes where bits are set in column ID order. If a bit is set in this array of bytes it indicates that the column is used for either update or read dependent on the operation type.
There is a possibility to set special options for the reads and writes, more on such options below. There are two parameters for this, one that is a pointer to the options object and a second that contains the length of this options object.
Scan operations#
There are two types of scan operations, full table scans and ordered index scans.
Ordered index scans#
Ordered index scans normally use a range scan that specifies a start position and an end position for the range scan. These ranges are defined on the columns defined in the ordered index. Multiple ranges are possible to specify in a scan operation.
To specify an ordered index scan we use a call to scanIndex on the transaction object that returns an NdbIndexScanOperation object. The parameters passed in this call define the ordered index record specification and another record specification for the data returned in the scan. Optionally one defines the lock mode, a bitmask of the columns in the record specification that are to be read, a bound object of the type NdbIndexScanOperation::IndexBound and finally an option object with its length.
The bound object can also be defined after this call in a separate call on the scan object returned from scanIndex, using the method setBound where the record specification of the ordered index and the bound object are the parameters.
Multiple index ranges and various sort orders can be defined. These are specified in the options parameter described below.
Full table scan#
A full table scan will scan all rows in a table and there are three ways to do this, scan through hash index, scan through main memory rows and scan through disk data records.
After starting the transaction one can start a full table scan by calling scanTable on the transaction object. This call will return an object of the type NdbScanOperation.
This requires at least one parameter, which is the record specification of the row to be retrieved from the table; not all columns need to be part of this specification, only the columns of interest.
The second parameter is the lock mode of the scan. The default lock mode is LM_Read, which means that a row lock will be held on the row from when it is scanned until we move on to the next row. LM_Exclusive also means that we hold a row lock, but an exclusive lock instead.
LM_CommittedRead means that no lock is held at all while scanning; the last committed value of the row is read instead.
LM_CommittedRead is the default setting for SELECT queries in MySQL for the NDB storage engine. The NDB storage engine only supports the setting READ_COMMITTED.
To use LM_Read from SQL one uses SELECT ... LOCK IN SHARE MODE and to use LM_Exclusive from SQL one uses the syntax SELECT ... FOR UPDATE. The final setting that one can use is LM_SimpleRead; the row is locked while reading it, but the lock is released before sending the row to the API. This behavior is the default for reading BLOB tables from the MySQL Server. BLOB tables are a bit special in how they handle these lock modes, see the chapter on concurrency control in RonDB.
Lock mode can be set on the readTuple calls above when defining a read operation using a primary key or unique key. It cannot be set on write operations since these operations always use an exclusive lock.
The third parameter is the result mask. This parameter can be used to define the set of columns (in column ID order) to be used in the scan. This is useful if we are using a standard record specification that contains all columns and we want to specifically set a different set of columns for this scan operation.
The fourth parameter is the scan options, there are many different ways to perform the scan, we will go through those options below.
The actual scan phase#
The scanning is started by calling execute on the transaction object with NdbTransaction::NoCommit as a parameter.
Next, the actual scanning happens in a loop where we repeatedly call the method nextResult on the NdbScanOperation object. This method requires a pointer to a memory buffer from which the data can be read. Thus the NDB API uses its own memory and returns a pointer to this memory.
The second parameter is important: if true it states that a fetch is allowed. Thus if it is false we will only return rows that have already been fetched from the NDB data node. By looping with this set to false we know that we won't release locks on rows that we have scanned with nextResult, and we know that the memory buffers that were returned from previous nextResult calls are still valid. This allows us to perform write operations on these rows before releasing the lock on them. It makes it possible to get this done in a batching mode where up to almost one thousand rows can be scanned locally before the next interaction with the NDB data nodes needs to happen.
The third parameter specifies whether to use the force send flag when starting a new fetch. We set fetch allowed to true when we are ok to move on to the next rows, that is after completing the actions required on the rows that we have already scanned.
For read queries, we can simply call nextResult in a simple loop until we get the result that no more rows exist.
There is a variant of nextResult called nextResultCopyOut that, instead of returning a pointer to a memory buffer, takes a buffer where it asks the NDB API to store the fetched row. Other than that it works exactly the same way as nextResult.
Scan with takeover#
Scan with takeover means that we use the row fetched in a scan to either update or delete the row. This should only be used if the scan uses the lock mode LM_Exclusive. If used with only LM_Read it can cause a deadlock to occur since the lock must be upgraded.
When using scan with takeover it is necessary to use a double loop: an outer loop that uses nextResult with fetch allowed set to true and an inner loop that uses nextResult with fetch allowed set to false. When the inner loop exits, or as part of the inner loop, we need to handle all the updates and deletes performed as part of the scan.
Deleting the row that was reported in the scan operation is done through the call deleteCurrentTuple; updates use the call updateCurrentTuple. The take-over can be made by any transaction, it need not be the same transaction as the one the scan is executed in.
Options#
There is a whole range of options that can be set for both scan and key operations and that affect the behavior of the key lookups and scans. We will cover these here and explain them in the context of key lookups, but they can also be used for scan operations. The only difference is that the options class is called NdbOperation::OperationOptions for key lookups and NdbScanOperation::ScanOptions for scan operations.
The option identifiers are called OO_* for key lookups and SO_* for scan operations.
Scan flags#
Scan flags are an integral part of a scan. They are set by using the option SO_SCANFLAGS and setting the option variable scan_flags to a proper value. This option is only available for scan operations. A sketch of how flags are combined is shown after the list of flags below.
Several flags can be set by OR:ing them together. The following flags are supported:
SF_TupScan#
This option is only applicable to a full table scan. It scans the tuples in row ID order instead of the default order imposed by the hash index.
SF_DiskScan#
This option is only applicable to a full table scan on a table with disk columns. In this case, the scan can be done in the row order on disk pages. This can be beneficial since it can decrease the amount of fetching disk pages for a table scan.
SF_OrderBy#
Given that an ordered index scan is parallelized on a number of partitions the rows can be returned in non-sorted order. Each partition will return rows in the order sorted by the index.
The NDB API can sort the rows before returning them to the application.
This requires running with full parallelism, thus this flag cannot be combined with SO_PARALLEL. It requires all partitions to return data such that the sort can be performed before returning data to the application. This will have an impact on scan throughput.
SF_OrderByFull#
Same flag as SF_OrderBy
except that the flag implies that all index
columns are added to the read set of the scan.
SF_Descending#
This is only applicable to an ordered index scan. By default ordered index scans are done in ascending order. With this flag, the order is instead descending.
SF_MultiRange#
This applies to ordered index scans where multiple index ranges are scanned. In this case, this flag must be set.
SF_ReadRangeNo#
If multiple ranges are scanned this flag enables reading the range number for a row that has been delivered through the API.
SF_KeyInfo#
If a scan is performed using the lock mode LM_Read one can specify
this flag. It will send back key information to the API that enables a
take-over operation to be defined using the call lockCurrentTuple
.
This is necessary if we want to retain a read lock on a row already
scanned.
This flag is enabled by default when using the lock mode LM_Exclusive.
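Here is a sketch of how scan flags from the list above can be OR:ed together and passed to scanIndex (the record names are hypothetical):
NdbScanOperation::ScanOptions options;
options.optionsPresent = NdbScanOperation::ScanOptions::SO_SCANFLAGS;
options.scan_flags = NdbScanOperation::SF_OrderBy |
                     NdbScanOperation::SF_MultiRange |
                     NdbScanOperation::SF_ReadRangeNo;
NdbIndexScanOperation *scan_obj =
  trans_obj->scanIndex(index_record,
                       columns_record,
                       NdbOperation::LM_Read,
                       NULL,      /* read all columns in the record */
                       NULL,      /* bounds are set later with setBound */
                       &options,
                       sizeof(options));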
Scan batching#
Normally the size of batches sent back to the API is limited by configuration parameters on the API node. It is possible to override these defaults by setting the option SO_BATCH and setting the option variable batch to the desired number of rows per batch.
This option is only available for scan operations.
Scan parallelism#
By default, a scan will use the maximum parallelism available. This means to scan all partitions in parallel up to a limit of how many partitions can be scanned in parallel.
By setting the option SO_PARALLEL we can set an explicit limit on the parallelism for a scan. We set the limit in the variable parallel on the options object.
This option is only available for scan operations.
Scan partitioning info#
There is an option SO_PART_INFO available that is never used. Its use is related to multiple ranges defined in a query where all ranges use the same partition. This is a feature used by the user-defined partitioning scheme that is not supported but still implemented. The MySQL Server implements this without using any option.
Only available in scan operations.
REDO log queueing#
All write operations are written into a REDO log. As part of the prepare phase we write them into a REDO log buffer. This buffer could be full. If this happens we have different choices. One choice is to abort the transaction. The other option is to queue the operation before executing it and wait for REDO log buffer space to become available.
The default behavior for this is set as part of the cluster configuration of the API node. By setting the option OO_QUEUABLE or OO_NOTQUEUABLE (note the spelling error here) we can choose to either queue or not queue for a specific transaction. The same option should be used for all operations in a transaction.
This option is only available for writing key lookups.
Early unlock#
The normal behavior for a transaction is to release locks at commit. BLOBs in NDB are implemented as separate tables.
For a normal table using READ COMMITTED mode requires no locks. For a BLOB table, we need to first get a lock on the base table followed by a read on the BLOB table where we read with a simple lock.
For BLOB tables this lock on the base table is unlocked immediately after completing the read of the BLOB table.
The mechanism to unlock rows before commit is available to an application. Such behavior has an impact on the transactional correctness of the application, so if used, it should be used with care.
When the operation is defined one uses the flag OO_LOCKHANDLE. Later, when we want to unlock the row, we call getLockHandle on the operation object followed by a call to unlock on the transaction object using this handle. The unlock row operation will be sent the next time we call execute.
NdbTransaction *ndb_trans = .....
NdbOperation *ndb_op = ....
NdbOperation::OperationOptions options;
options.optionsPresent =
  NdbOperation::OperationOptions::OO_LOCKHANDLE;
...
Execute ndb_op operation towards NDB data node
...
const NdbLockHandle *lock_handle = ndb_op->getLockHandle();
ndb_trans->unlock(lock_handle);
This operation is only available to key lookups.
Disable Foreign Key constraints#
The MySQL Server provides the capability to temporarily disable foreign key constraints. This is implemented as an option in the NDB API. It is achieved by setting the option OO_DISABLE_FK; if it is set on an operation it is set on the transaction in the NDB data node, it is always a transaction property.
Use of this feature means that it is possible to create an inconsistent database, so it is important that you understand what you are doing when using it.
This feature is only available to key lookups.
Deferred constraints#
In the SQL standard, constraints are checked at commit time, and deferred constraints are the default behavior. In MySQL, the constraints are checked immediately by default and InnoDB only supports immediate check of foreign key constraints.
NDB supports deferring constraint checks (unique key constraints and foreign key constraints) to commit time. This is controlled in the MySQL Server by setting the MySQL configuration parameter --ndb-deferred-constraints to 1. It is always used in the replica applier for Global Replication since this packs a number of transactions together; only the result at commit time is certain to pass the constraint checks.
It is possible to use deferred constraints in the NDB API as well by setting the option flag OO_DEFERRED_CONSTAINTS (note the spelling error here).
This feature is only available to key lookups.
Save data in operation object#
It is possible to save a reference to any data structure in the operation object. This is done by setting the option OO_CUSTOMDATA and setting the variable optionData on the options object.
This is used by RonDB for handling conflict detection in the replica applier. But it could be used for anything.
Later on, you can retrieve this pointer through the call getCustomData() on the same operation object.
This option is available for both scan operations and key lookups.
Pass a value to the event API#
It is possible to pass a 32-bit value from the NDB API to the NDB Event API. This can be done in any update, insert or delete operation. When the operation triggers an event that is sent to the node receiving the events, this value is passed along and can be read through the call getAnyValue on the event operation object.
To set this value use the flag OO_ANYVALUE and the variable anyValue on the options object.
This is used as part of the Global Replication implementation to pass information from the NDB API that issued the change to the replica applier. For more information on the content in the any value data see the code files sql/ndb_anyvalue.h and sql/ndb_anyvalue.cc.
If the cluster isn’t using replication to another cluster and the application needs to transport some value to the event API it is possible to use this value.
This feature is only available to key lookups.
Operation bound to specific partition#
It is possible to bind an operation to only be executed by one partition. For example if one needs an estimate of the number of rows one can scan one partition and assume that the other partitions have roughly the same data distribution.
In this case, one sets the flag OO_PARTITION_ID and sets the partition ID to be scanned or used in the variable partitionId on the options object.
This option is available for both scan operations and key lookups.
Get special values#
Normally an operation retrieves column values from the row. It is also possible to retrieve metadata values about the row and its partition. These columns are called pseudo columns internally in NDB. These pseudo columns all relate to the current row read or written in a key lookup or in a scan operation. By performing a scan operation on all partitions of a table it is possible to get table values for many of those items.
A few of those pseudo columns are only intended for internal use cases.
This option is available to both scan operations and key lookups.
Uint64 row_gci;
Uint32 row_size;
....
NdbOperation::OperationOptions options;
NdbOperation::GetValueSpec extra_gets[2];
extra_gets[0].column = NdbDictionary::Column::ROW_GCI;
extra_gets[0].appStorage = &row_gci;
extra_gets[1].column = NdbDictionary::Column::ROW_SIZE;
extra_gets[1].appStorage = &row_size;
options.optionsPresent =
  NdbOperation::OperationOptions::OO_GETVALUE;
options.extraGetValues = &extra_gets[0];
options.numExtraGetValues =
  sizeof(extra_gets)/sizeof(extra_gets[0]);
...
Pass a reference to the options in the call to readTuple/insertTuple/...
In the example above we perform a normal key lookup operation where we are reading a number of normal row columns (not shown in the example above). In addition, we want to know the last GCI the row was updated and we want to know the size of the row.
These extra columns are added to the columns read by this operation as shown above and the same pattern applies to a whole range of metadata columns.
These columns are also accessible from SQL. In this case, their name is as below with NDB$ prepended, e.g. NDB$ROW_GCI.
FRAGMENT#
This will return the ID of the partition of the row returned. It is a 32-bit column.
FRAGMENT_FIXED_MEMORY#
This provides the total amount of memory used for the fixed part of the rows in this partition (we use the term fragment internally in NDB as synonym for partition in most cases). In reality, we are reading the information on the specific fragment replica where the operation was executed. It is a 64-bit column.
FRAGMENT_VARSIZED_MEMORY#
This provides the amount of variable-sized memory for the partition. A row has a fixed memory part, an optional variable-sized part and an optional disk part. The partition also has a primary key hash index and possibly a set of ordered indexes. It is a 64-bit column.
ROW_COUNT#
This returns the current number of rows in the partition. A row is seen in this view when the row insert has committed. It is a 64-bit column.
COMMIT_COUNT#
The number of commits in this partition since the node was started (or partition was created) is reported in this variable. It is a 64-bit column.
ROW_SIZE#
Returns the size of the row read. It is a 32-bit column.
RANGE_NO#
This is only applicable to ordered index scans. Ordered index scans can use multiple ranges. In this case, this column returns the number of the range that the row is returned from. It is a 32-bit column.
DISK_REF#
This is the reference to the disk part. It contains a file ID, a page ID and a page index. It is a 64-bit column.
RECORDS_IN_RANGE#
When executing an ordered index scan this column will calculate an estimate of how many rows exist in the range specified in the scan. The intended use for this is to calculate index statistics used to optimize queries in the MySQL Server. The column type is an array of 4 32-bit values.
The first value is the number of entries in the index (all rows in the partition except rows that have a NULL value in an index column). The second is the number of rows in the range, the third value is the number of rows before the range and the last value is the number of rows after the range. All these values are estimates.
ROWID#
The column type is an array of 2 32-bit values. The first 32-bit value is the fragment page ID and the second is the page index. This number must be the same in the primary replica and the backup replicas.
ROW_GCI#
This value is the GCI identifier of the transaction that last updated this row. It is a 64-bit column, although the value will always fit in a 32-bit variable.
ROW_GCI64#
It is a 64-bit column. The higher 32 bits are the same as returned in ROW_GCI. The lower 32 bits are the extra GCI bits used in the conflict handling of Global Replication. More on this later.
ROW_AUTHOR#
When a table is involved in conflict detection handling we add this extra flag that indicates if the row was written last by the replica applier or not. It is a 32-bit column.
ANY_VALUE#
This reads the any value set by the OO_ANYVALUE option above. It is a 32-bit column.
COPY_ROWID#
The column type is an array of 2 32-bit values. It is the same as ROWID except that this is the row ID for the copy row, this row ID might differ in the replicas.
LOCK_REF#
This provides a reference to the lock object. The highest 16 bits of the first 32-bit value is the node ID where the lock resides. The lowest 16 bits of the first is the fragment ID, the second word is the operation record reference in the LQH block and the third word is a part of the unique ID in the LQH block of the operation.
This is used under the hood for unlocking a row before commit. It is intended only for internal use of the NDB API.
OP_ID#
This returns the unique ID inside the LQH block of the operation currently executed. It is currently not used for anything. It is a 64-bit column.
FRAGMENT_EXTENT_SPACE#
The column type is an array of 2 32-bit values. These two numbers together form a 64-bit value. This value represents the number of extents the partition has that are free.
FRAGMENT_FREE_EXTENT_SPACE#
The column type is an array of 2 32-bit values. These two numbers together form a 64-bit value. This value represents the number of free rows available in the free extents already allocated to this partition.
Set special values#
This is very similar to getting special values. The difference is that the option flag is OO_SETVALUE and the object used to set the values is NdbOperation::SetValueSpec instead of NdbOperation::GetValueSpec. The number of set values is set in numExtraSetValues and the set value objects are stored in extraSetValues as shown below.
NdbOperation::OperationOptions options;
NdbOperation::SetValueSpec extra_sets[2];
....
options.optionsPresent =
NdbOperation::OperationOptions::OO_SETVALUE;
options.extraSetValues = &extra_sets[0];
options.numExtraSetValues =
sizeof(extra_sets)/sizeof(extra_sets[0]);
This feature is only available to key lookups. As an example, it can be used to set the ROW_AUTHOR field above, see the NDB storage engine code for an example.
Hidden key column#
When a table is created without a primary key, the NDB storage engine will add a column that is an 8-byte column where we generate a unique ID of the row when inserting it. This column is always placed last in the set of columns.
Partition ID for user-defined partitions#
For tables with user-defined partitioning, the MySQL Server calculates the partition ID and sets it in the first column ID after the last column (or after the hidden key column).
ROW_AUTHOR#
ROW_AUTHOR can be set by the replica applier thread for tables involved in conflict detection.
OPTIMIZE#
This column is used as part of the implementation of OPTIMIZE TABLE. By setting this column to a proper value the NDB storage engine instructs the NDB data node to move the variable-sized part to a prepared place to remove fragmentation of the variable-sized memory. It is not intended for normal use cases. It is a 32-bit column.
Interpreter commands as part of NDB API operations#
The final option that one can use is so advanced that it merits its own section. This is the execution of interpreted programs. All read- and write-operations can run an interpreted program as part of their operation.
The most common use of this feature is to push down scan filters to NDB data nodes. Thus the MySQL Server has a condition it wants to push down to the data node. This is handled such that the conditions are translated into an interpreted program.
This interpreted program is sent as part of the read/write operation down to the data node owning the data. When reading or writing the row this interpreted program is executed and for scan filters it decides whether to read the row or to skip the row and move on.
For write operations it could be used e.g. to increment an integer field rather than performing a read followed by a write. There is no way currently to get this optimization from the MySQL Server, but it is possible to write an NDB API program that builds an interpreted program that does this.
The interpreter is a very simple register machine with 8 registers. It is possible to read a column value into a register, to load constants into registers and to add and subtract registers. It is possible to define labels and to branch to a label depending on a register value. It is also possible to define a subroutine and call it.
To enable pushdown of LIKE constructs, these are also available as interpreter instructions.
To use this, one creates an NdbInterpretedCode object. To use things such as incrementing a column one should use the functions of the NdbInterpretedCode object directly. For the definition of condition filters, one should use the NdbScanFilter class.
NdbInterpretedCode code(...);
... Define interpreted program
options.optionsPresent =
NdbOperation::OperationOptions::OO_INTERPRETED;
options.interpretedCode = &code;
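As a hedged sketch of the increment use case mentioned earlier, an interpreted program that adds 10 to an integer column (the column name is hypothetical) could be built like this and then attached to an update operation through the options object as shown above:
const NdbDictionary::Column *counter_col = tab_obj->getColumn("counter");
NdbInterpretedCode code(tab_obj);
code.add_val(counter_col->getColumnNo(), (Uint32)10);
code.interpret_exit_ok();
code.finalise();
/* Attach &code to a key operation such as updateTuple through the
   OO_INTERPRETED option shown above */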
NdbScanFilter#
The file ha_ndbcluster_cond.cc contains lots of uses of the NdbScanFilter functions. Also, read NdbScanFilter.hpp to see more about how one can use this object.
The general idea of using the NdbScanFilter is that you define groups of conditions. The start of a filter group is marked by calling the function begin. The parameter to this call specifies whether the conditions inside the group should be combined using AND, OR, NOR, or NAND. One condition inside this group could then be a new group of conditions. One can view the begin and end calls as a left parenthesis (begin) and a right parenthesis (end) in a textual condition.
The actual conditions are defined by a call to the function cmp. This call starts by defining the comparator type (e.g. larger than, equal and so forth), followed by the column to compare and then the value to compare with. This value is represented by a void* pointer with a length. No length bytes are expected in the value representation.
The comparator could also be LIKE and NOT LIKE. In this case, the value could contain % and ? wildcards.
There are also methods to check for NULL and for NOT NULL.
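To make the begin/end grouping concrete, here is a minimal sketch of a filter corresponding to the condition (a = 10 AND (b = 5 OR b IS NULL)). The scan_obj variable and the column ids col_a_id and col_b_id for two 32-bit unsigned columns are assumptions made for this sketch.
Uint32 val_a = 10;
Uint32 val_b = 5;
NdbScanFilter filter(scan_obj);
filter.begin(NdbScanFilter::AND);                  // (
filter.cmp(NdbScanFilter::COND_EQ, col_a_id,       //   a = 10
           (void*)&val_a, sizeof(val_a));
filter.begin(NdbScanFilter::OR);                   //   AND (
filter.cmp(NdbScanFilter::COND_EQ, col_b_id,       //     b = 5
           (void*)&val_b, sizeof(val_b));
filter.isnull(col_b_id);                           //     OR b IS NULL
filter.end();                                      //   )
filter.end();                                      // )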
Scan example#
Here is an example where we delete rows using an ordered index on the primary key combined with a scan filter. All rows that are within the index range and match the scan filter are returned and deleted.
At first, we start a transaction and create a scan object that contains the record specification of the index and a record specification of the columns to read. We use an exclusive lock since we are going to delete the rows as part of the scan.
We define the lower bound to be 1 and we include 1 in the range. We define the upper bound to be 100 and we don’t include the 100 in the range.
Next, we also define a scan filter using an equal condition on one of the columns in the table. This scan filter will generate an interpreted program that will skip the row unless the scan filter is passed.
Using the above mentioned option SO_SCANFLAGS it is possible to specify that rows are returned in various sorted orders, to define multiple ranges and to get the range number of returned rows.
We have a double loop where the outer loop makes calls to the NDB data node and the inner loop handles the rows returned from each such call.
We execute the deletes once per outer loop, but we wait with committing the transaction until the scan is completed.
NdbRecord *primary_key_record = ...
NdbRecord *columns_record = ...
...
trans_obj = ndb_obj->startTransaction();
scan_obj = trans_obj->scanIndex(primary_key_record,
                                columns_record,
                                NdbOperation::LM_Exclusive);
unsigned lower_bound = 1;
unsigned upper_bound = 100;
NdbIndexScanOperation::IndexBound key_range;
key_range.low_key = (char*)&lower_bound;
key_range.low_key_count = 1;     // number of key columns in the lower bound
key_range.low_inclusive = true;
key_range.high_key = (char*)&upper_bound;
key_range.high_key_count = 1;    // number of key columns in the upper bound
key_range.high_inclusive = false;
key_range.range_no = 0;
scan_obj->setBound(primary_key_record, key_range);
NdbScanFilter scan_filter(scan_obj);
scan_filter.begin(NdbScanFilter::AND);
scan_filter.cmp(NdbScanFilter::COND_EQ,
                column_id,
                (void*)column_value,
                column_length);
scan_filter.end();
trans_obj->execute(NdbTransaction::NoCommit);
while (scan_obj->nextResult(true) == 0)
{
  do
  {
    scan_obj->deleteCurrentTuple();
  } while (scan_obj->nextResult(false) == 0);
  trans_obj->execute(NdbTransaction::NoCommit);
}
trans_obj->execute(NdbTransaction::Commit);
ndb_obj->closeTransaction(trans_obj);
NDB Event API#
The NDB API contains an Event API that makes it possible to subscribe to all changes of a specific table. Each time a transaction is committed an event can be generated for each row that has been changed. These events are collected together for a specific global checkpoint and sent to the API node(s) that are listening to the events. There can be many listeners to the same event. This is used by NDB to inject events into a MySQL binlog to get the changes into the MySQL Replication infrastructure. It can also be used by applications to wait for certain events and create actions based on those events. There are countless ways to use events.
The events are asynchronous events, they will arrive as soon as possible, but there is no specific guarantee on when they will arrive. Normally a cluster uses micro GCPs that are created with a 100-millisecond delay. Therefore it is expected that events are delayed for at least hundreds of milliseconds and in overloaded systems it can take longer.
The data node buffers events; if the API node is not fast enough to consume them, event delivery will eventually fail. In this case, the API node will be informed that the event reporting failed.
Create Event#
The first step is to create an event of type NdbDictionary::Event. To create such an event it needs a name and a table object of type NdbDictionary::Table; the table object is retrieved by name from the NdbDictionary::Dictionary object.
Next one adds one or more table events; one can set the event to wait for inserts, deletes, updates or all of those. This uses the method addTableEvent on the event object.
The event specifies which columns will be sent to the event listener as part of each event. This uses the method addEventColumns on the event object.
For insert events, the after value is sent, for delete events the before value is sent and for update events both the before and after value of those columns are sent.
If the table contains BLOBs and one wants to be notified when they are updated, it is necessary to call the method mergeEvents on the event object before creating it in the NDB data nodes. This method can also be called with the flag set to not merge events from the BLOB tables.
Setting merge events to true has the impact that several writes on the same row within one global checkpoint will be merged into one event.
BLOB events are reported as mentioned above when merge events have been set, but it is necessary to read the BLOB fields using the NdbBlob object to see their value.
The next step is to create the event in the NDB data nodes. This happens through a call to the method createEvent on the NdbDictionary::Dictionary object using the event object as a parameter.
Now the event is created in the NDB data node and can be used to listen to events.
Drop Event#
Events are dropped from the NDB data nodes by calling the method dropEvent on the NdbDictionary::Dictionary object using the event name as the parameter.
Create Event Operation#
Now that the event exists in the NDB data nodes we can create an event operation to listen to the events. This is done through a call to the method createEventOperation on the Ndb object using the event name as a parameter.
It is necessary to call the method mergeEvents on the event operation object to specify whether it is desirable to get events merged or not. This merge setting activates the merging, but the merge flag also needs to be set on the event object when the event is created in the data nodes. If it is not set there as well, events will be missing from the data node, making it impossible to merge events properly.
As part of setting up the event operation object we need to define objects of type NdbRecAttr to provide the before value (using getPreValue) and the after value (using getValue) for each column we are interested in.
Finally, we need to call execute on the event operation object.
Wait for events#
To listen to events one now calls the method pollEvents on the Ndb object, passing the number of milliseconds to wait before waking up if no events occur. This method returns a positive number if events have occurred.
If events occur one calls the method nextEvent on the Ndb object to retrieve one event. This method returns the event operation previously defined. This object is now filled with information from the event and the user can use methods in this object to retrieve information about the event type (insert, delete or update) and what columns changed, with their before value and after value.
The NdbRecAttr objects can be used to discover whether attributes have changed, whether they are set to NULL, and to retrieve the column value.
When the program is no longer interested in listening to events it drops the event operation by calling dropEventOperation on the Ndb object using the event operation object as a parameter.
Node failure handling of events#
Events are generated in all data nodes. Only the node that has been selected as the primary event sender will send its events to the API node. If the data node that was the primary event sender fails, the other nodes ensure that the API node gets the information from one of the remaining live nodes instead. Thus all nodes keep the buffers for the events of a global checkpoint until the API node has confirmed that it has received those events. All this happens automatically without any user interaction.
NDB API Event example#
Here is an example of how to write an event handler loop for a specific table and a number of specific columns.
const char *col_names[2] = { "col1", "col2" };
NdbDictionary::Dictionary *dict_obj = ndb_obj->getDictionary();
const NdbDictionary::Table *tab_obj = dict_obj->getTable("table_name");
NdbDictionary::Event event_obj("event_name", *tab_obj);
event_obj.addTableEvent(NdbDictionary::Event::TE_ALL);
event_obj.addEventColumns(2, col_names);
event_obj.mergeEvents(false);
dict_obj->createEvent(event_obj);
NdbEventOperation *op = ndb_obj->createEventOperation("event_name");
op->mergeEvents(false);
NdbRecAttr *col_after[2];
NdbRecAttr *col_before[2];
col_after[0] = op->getValue(col_names[0]);
col_before[0] = op->getPreValue(col_names[0]);
col_after[1] = op->getValue(col_names[1]);
col_before[1] = op->getPreValue(col_names[1]);
op->execute();
while (true) {
  int r = ndb_obj->pollEvents(1000);   // wait up to 1000 ms for events
  while (r > 0 && (op = ndb_obj->nextEvent())) {
    ... Handle event
  }
}
ndb_obj->dropEventOperation(op);
dict_obj->dropEvent("event_name");
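The "Handle event" part above could, for example, look as in the following sketch, assuming col1 is a 32-bit unsigned integer column; error handling is omitted.
switch (op->getEventType()) {
case NdbDictionary::Event::TE_INSERT:
  // For inserts only the after value is meaningful.
  if (col_after[0]->isNULL() == 0)
    printf("insert: col1 = %u\n", col_after[0]->u_32_value());
  break;
case NdbDictionary::Event::TE_DELETE:
  // For deletes only the before value is meaningful.
  if (col_before[0]->isNULL() == 0)
    printf("delete: col1 was %u\n", col_before[0]->u_32_value());
  break;
case NdbDictionary::Event::TE_UPDATE:
  // For updates both the before and after values are available.
  if (col_before[0]->isNULL() == 0 && col_after[0]->isNULL() == 0)
    printf("update: col1 %u -> %u\n",
           col_before[0]->u_32_value(), col_after[0]->u_32_value());
  break;
default:
  break;
}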
Pushdown Join#
A part of the NDB API is the handling of complex query operations. It is possible to push parts of a join query down to the NDB data nodes. This makes use of a form of linked table scans.
The first table either uses a primary key operation, unique key operation or a scan operation. Each row that is found in this first table is sent to the second table in the join order. Normally the first table is a scan operation in a pushdown join operation.
For each row found in the first table, the results are sent to the NDB API. At the same time, the columns needed for evaluation further down the join are sent along in a new join that involves all tables except the first table.
In this manner, the query is recursively executed. If the join evaluation contains filtering parts, this can lead to a very high parallelization. If very little filtering is performed the NDB API will still be a major bottleneck since there is a limit to how fast the NDB API can process queries.
This pushdown join interface to the NDB API is used by the NDB storage engine; it is not intended for use by normal NDB API applications and is not described here.
Asynchronous API#
The asynchronous NDB API is currently only applicable to the key operations. It is not possible to use it for scan operations.
It only differs in how transactions are executed. Instead of a single execute call, there are three calls. These calls must be preceded by a call to executeAsynchPrepare that prepares the defined operations for execution.
A normal execute(...) call will normally perform three actions:
- Prepare the defined operations to execute
- Send the prepared operations
- Poll for completion of the sent operations
With the normal synchronous API, it is possible to execute multiple transactions in parallel, but the normal use case is to use one transaction at a time. The above poll will always wait for all sent operations to complete.
Using the asynchronous API it is expected to have many concurrent transactions outstanding at any point in time and the poll doesn’t have to wait for all actions to complete, it is sufficient to wait for a few actions (by default one) to complete.
sendPollNdb#
sendPollNdb is more or less the same as calling execute, with one major difference: sendPollNdb can be asked to wake up before all transactions have completed. Thus the asynchronous interface can be used to work with hundreds of interactions in parallel per thread. Fewer threads are needed to handle the interactions, and these fewer threads can use more batching, so higher throughput can be achieved.
sendPreparedTransactions#
If the thread will take care of other business while NDB is working, it can use the sendPreparedTransactions call to send the prepared transactions without waiting for their completion.
pollNdb#
This can be used in conjunction with sendPreparedTransactions. When returning from other activities it is possible to poll for the transactions already sent to complete.
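Putting the calls together, here is a minimal sketch of an asynchronous batch of key operations. The define_key_operation helper, the batch size of 100 transactions and the callback name are assumptions made for this sketch; ndb_obj is an Ndb object as in the earlier examples.
static void transaction_callback(int result, NdbTransaction *trans, void *ndb_ptr)
{
  Ndb *ndb_obj = (Ndb*)ndb_ptr;
  if (result < 0) {
    // The transaction failed; trans->getNdbError() gives the reason.
  }
  ndb_obj->closeTransaction(trans);  // release the transaction object
}
...
for (int i = 0; i < 100; i++) {
  NdbTransaction *trans = ndb_obj->startTransaction();
  define_key_operation(trans);       // assumed helper defining one key operation
  trans->executeAsynchPrepare(NdbTransaction::Commit,
                              transaction_callback,
                              (void*)ndb_obj);
}
// Send all prepared transactions and wake up when at least 10 of them have
// completed or after at most 1000 milliseconds. The return value is the
// number of transactions that completed during the poll.
int completed = ndb_obj->sendPollNdb(1000, 10);
The same batch could instead be sent with sendPreparedTransactions and completed later with pollNdb.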
flexAsynch Architecture#
The flexAsynch program is a benchmark program we have used to showcase hundreds of millions of reads and tens of millions of write operations per second. It uses a special model to achieve extremely high throughput. The same model can be used in other programs that only require access to key operations.