Wednesday, April 01, 2015

Benchmarking MySQL Cluster 7.4 on an Intel NUC

I have done a lot of benchmarks of MySQL Cluster on large servers which
is obviously very interesting. As mentioned in a previous blog I have
an Intel NUC machine now easily accessible. So I thought it would be
fun to make some benchmarks on this machine to see how fast MySQL Cluster
runs on small HW.

First a little description of the HW. The CPU is an Intel Core i5-4250
CPU. It runs at 1.3 GHz and has a turbo frequency of 2.3 GHz. The CPU
has two cores and each core can run two threads simultaneously
(called hyperthreading in Intel CPUs). The NUC comes as a box containing
the motherboard and the CPU. You then buy one or two DRAM modules and an
SSD drive for it. I installed two DDR3L DRAM modules, which gives me a
total of 16 GByte of memory in the machine. In addition I installed a
256 GByte SSD drive. The box fits nicely into the palm of your hand.

On top of this HW I installed Oracle Linux 7.

The configuration of the benchmark uses my laptop as benchmark driver
and uses the Intel NUC to run the data node. Since I only have one
Intel NUC available the benchmarks focused on a setup with only one

To start with I will focus this blog on the performance of a pure main
memory table, where after a restart the table still exists but is
empty. This kind of table is useful to keep track of live state
where it doesn't make sense to recover the table, since the
live state changes so often that the restored data would be useless.

When recoverability is active on the table, it adds lots of log and
checkpoint writes, which add an extra overhead of about
30-50% for inserts, deletes and updates. Reads are about as fast
with recoverability as without. I'll dig into this type of benchmark
in a later blog.

So to the results of the benchmark run. Inserts were processed at a
rate of 400k per second, updates at 402k per second and deletes at
433k per second. For reads I got up to 589k per second. For both writes
and reads we're operating here at full CPU speed and also at full
network speed (Gigabit Ethernet).

This benchmark was executed with 1 primary key of 8 bytes and one
field with 100 bytes in it.
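
To get a feeling for how close the read rate is to the network limit, the record payload can be multiplied out. This is only a back-of-the-envelope sketch: it counts the 8-byte key plus the 100-byte field and nothing else, so protocol headers and acknowledgements (which were not measured here) come on top of the figure it prints.

```python
# Payload bandwidth for the read test with 108-byte records.
# ASSUMPTION: only key + field bytes are counted; protocol overhead is ignored.

GIGABIT_BYTES_PER_SEC = 1_000_000_000 / 8  # raw line rate, 125 MByte/s


def payload_mbyte_per_sec(ops_per_sec, record_bytes):
    """Return the record payload moved per second in MByte (10^6 bytes)."""
    return ops_per_sec * record_bytes / 1_000_000


record_bytes = 8 + 100          # primary key + one 100-byte field
reads_per_sec = 589_000         # read rate reported above

mbyte = payload_mbyte_per_sec(reads_per_sec, record_bytes)
share = mbyte * 1_000_000 / GIGABIT_BYTES_PER_SEC
print(f"reads: {mbyte:.1f} MByte/s of payload, {share:.0%} of the raw link")
```

The payload alone already takes roughly half of the raw Gigabit line rate before any per-message overhead is added.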

I was curious to see what the top rate of key lookups was with very
small records, where network speed isn't a limiting factor.
So I changed the field from 100 bytes to 8 bytes and reran the

Now I got inserts at 476k per second, updates at 479k per second,
deletes at 474k per second and finally reads at 925k per second.
So close to a million reads per second on this tiny machine alone.

Given that I ran without logging to disk, most of the data node
configuration was pretty straightforward. The most important
part is obviously how to make use of the 4 CPU threads in the 2
available cores.

As usual the LDM thread, which is where the actual database resides, is
the thread that requires the most CPU. In this particular benchmark
the TC thread also requires a lot of CPU since it is involved
in each and every key lookup. For applications with scans, for example,
the TC thread is a lot less involved. There is also a send thread
and a receive thread which are important to configure correctly.
These four thread types are the most important to configure when
running without recoverability. When adding recoverability the
configuration of the file IO threads is also important. There is
also a main thread, a replication thread and some connection
threads. These have no impact on the performance of this
benchmark, but they were configured to be on the CPU with lowest

The best results were achieved with only one thread per thread
type. The best config of those was to allow the LDM thread to
use its own CPU core and thus not use hyperthreading for this
thread to any great extent. Then one CPU thread in the other
core was used for the TC thread and the other CPU thread was
used for the send and the receive thread. This CPU thread also
handled the other threads.
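
The layout described above can be written down as a ThreadConfig value for the data node. The sketch below only builds the string. The CPU numbering is an assumption (CPUs 0 and 2 taken to be the two hyperthreads of core 0, CPUs 1 and 3 those of core 1), so check the real topology of your machine before copying it. The helper name thread_config is made up for this example.

```python
# Build a ThreadConfig string for: LDM alone on one core, TC on one
# hyperthread of the other core, everything else on the remaining hyperthread.
# ASSUMPTION: CPU 0/2 share core 0 and CPU 1/3 share core 1 on this machine.

def thread_config(layout):
    """Format (thread_type, cpu, with_count) tuples as a ThreadConfig value."""
    parts = []
    for thread_type, cpu, with_count in layout:
        spec = f"count=1,cpubind={cpu}" if with_count else f"cpubind={cpu}"
        parts.append(f"{thread_type}={{{spec}}}")
    return ",".join(parts)


LAYOUT = [
    ("ldm", 0, True),    # LDM gets core 0; its sibling CPU 2 is left unused
    ("tc", 1, True),     # TC gets one hyperthread of core 1
    ("send", 3, True),   # send, receive and the other threads share CPU 3
    ("recv", 3, True),
    ("main", 3, False),
    ("rep", 3, False),
]

print("ThreadConfig=" + thread_config(LAYOUT))
```

The printed line would go into the data node section of the cluster configuration file.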

When running with 2 LDM threads the number of threads increased
and the OS had to do more scheduling. Most likely this config
could provide better results under a more bursty load, but for
sustained good performance it was better to use only a few
threads. The performance difference between using 1 LDM and
1 TC and using 2 LDMs and 2 TCs was quite small, though.

The benchmark application was executed on my laptop connected
to the Intel NUC machine through a gigabit ethernet switch.

Monday, March 23, 2015

Quick setup to run sysbench using MySQL Cluster 7.4

In developing MySQL Server and MySQL Cluster we use four types of testing.
We use unit testing, we use functional testing, we use system testing and
we use benchmark testing.

For the unit tests and functional tests we use the MTR test framework. This
is dealing with most issues of how to use SQL to query the MySQL database

For system testing the InnoDB team has a test framework that is used to ensure
that InnoDB will survive crashes of many sorts. In a similar fashion MySQL
Cluster uses the autotest framework that I described in an earlier blog:
Link to earlier blog.

For benchmark testing we have a few frameworks. I am going to describe the one
that I use on a regular basis. This framework can be downloaded from:
Link to download.

I started developing this framework in 2006. So it has been extended for quite
some time and it is a lot simpler to run it now than it ever was before.

In this blog I am going to describe a very minimal setup needed to run a
sysbench benchmark against MySQL Cluster 7.4.

These benchmarks can be used to run the following types of tests:
1) Sysbench on a single server with InnoDB as storage engine
2) Sysbench on a single server with NDB as storage engine
3) Sysbench against InnoDB with InnoDB and sysbench on different servers
4) Sysbench against NDB with different programs on different servers
5) flexAsynch for MySQL Cluster testing on a set of servers
6) DBT2 on a single server with InnoDB as storage engine
7) DBT2 on a single server with NDB as storage engine
8) DBT2 using NDB as a storage engine against a lot of servers

Running against NDB as a storage engine means to use MySQL Cluster.
I mostly focus on 1) above for InnoDB testing. For MySQL Cluster I mostly
focus on 2) and 5). But occasionally I also test 8). 3), 4), 6) and
7) are perfectly possible setups and have been tested as well, but I don't
personally use those so much.

I will write a few blogs about how to set up a benchmark using these benchmark
scripts. I will first describe the basics of setting up a sysbench benchmark
against MySQL Cluster in this blog, then a basic flexAsynch benchmark and
finally a basic DBT2 benchmark on a single server.

After describing the basic configuration setup, I will describe in detail the
various parts to configure. There are numerous things to configure:

Mandatory items:
1) Which benchmark to run

2) Pointer to file locations for tarballs, data directory and install directory

3) Pointer to tarballs for MySQL, Sysbench and DBT2 benchmarks, also the
   benchmark scripts which are located in the DBT2 tarball.

4) Definition on which servers the various programs under test will be

Optional parts of configuring SUT (System Under Test)
5) Configuration of MySQL Server. Started also in flexAsynch, but not really
   used in flexAsynch benchmark.

6) Configuration of InnoDB if running test against InnoDB.

7) Configuration of NDB storage engine and NDB nodes if running against NDB
   storage engine.

Optional parts of configuring the actual benchmark
8) Sysbench setup. Setting up how to run the actual sysbench benchmark, for how
long to run, what sysbench variant to use, how many tables and a lot of other
things can be configured about how sysbench is to run. This part is obviously
only interesting when running a sysbench test.

9) DBT2 setup. There are numerous ways to adapt the basic DBT2 benchmark
   although the benchmark transactions are always the same.

10) flexAsynch benchmark setup. Also in flexAsynch there are multiple ways to
    adapt the benchmark.

11) In connection with the flexAsynch benchmark one can also benchmark restarts
    in MySQL Cluster. First the scripts use flexAsynch to load the database and
    then various forms of restarts can be tested.

The full capabilities of those benchmark scripts are quite extensive. They are
the result of 9 years of incremental design where we have tried to extend the
use cases of these benchmarks to support as many environments as possible, but
also to automate as much as possible.

Obviously understanding all of this in one go is a tad difficult. So we will
start by describing how to set up a very simple sysbench benchmark using
one machine running sysbench against the NDB storage engine using the latest
development version of MySQL Cluster 7.4.

This is a good first step to get acquainted with the benchmark scripts. I will
add more details in later blogs about running more advanced setups.

The scripts have been used mostly on Linux, with a fair amount of use on
Solaris as well. When preparing this blog I ran it on Mac OS X. This requires some
changes that will soon arrive in the dbt2- updated version of the
benchmark scripts. So preferably use Linux when trying this out for the
first time.

The first step in running this test is to create a directory where the
tests will be located on disk. I usually use some home directory and create a
directory called bench there. So on Mac OS X that would be /Users/mikael/bench.
On a Linux machine I would normally use /home/mikael/bench.

In this directory I create four directories. I create one directory called
tarballs, this is where I will place the DBT2, sysbench and MySQL tarballs.
I create one directory called ndb, this will be used as the data directory
and will be automatically filled by the scripts. Next I will create a
directory mysql, this directory is where I will place the binary installation.
Finally I create a directory called sysbench.
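
For readers who prefer to script this step, the four directories can be created in one go. This is just a small sketch and not part of the benchmark scripts. The function name create_bench_layout is made up here, and the demo at the bottom uses a temporary directory so that it doesn't touch your home directory. Pass your own bench path instead.

```python
import tempfile
from pathlib import Path


def create_bench_layout(bench_dir):
    """Create the bench directory with the four subdirectories described above."""
    bench = Path(bench_dir)
    for name in ("tarballs", "ndb", "mysql", "sysbench"):
        # parents=True also creates the bench directory itself if needed
        (bench / name).mkdir(parents=True, exist_ok=True)
    return sorted(p.name for p in bench.iterdir() if p.is_dir())


# Demo in a throwaway location; use e.g. /home/<user>/bench for a real run.
with tempfile.TemporaryDirectory() as tmp:
    created = create_bench_layout(Path(tmp) / "bench")
    print(created)
```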

Next step is to download the DBT2 tarball and the sysbench tarball from the

Copy those tarballs to the tarballs directory created above.

Next step is to get the MySQL tarball that you want to test. It's ok to use
both source tarballs and binary tarballs. Personally I obviously mostly work
with source tarballs since I want to have full control over everything, but
it's ok to use a binary tarball as well.

Place this tarball also in the tarballs directory.

Now in order to be able to run the benchmark scripts it is necessary to
get the top-level script copied from the DBT2 tarball. To do this perform
the following commands:

cd /Users/mikael/bench #use your bench directory here
cd tarballs
tar xfz dbt2-
cp dbt2-037.50.6/scripts/ ..
rm -rf dbt2-

Now ls from /Users/mikael/bench should look like this:
Mikaels-MacBook-Pro:bench mikael$ ls ndb tarballs
mysql sysbench
Mikaels-MacBook-Pro:bench mikael$ pwd

and ls from tarballs should look something like this:
Mikaels-MacBook-Pro:bench mikael$ cd tarballs
Mikaels-MacBook-Pro:tarballs mikael$ ls
dbt2- mysql-cluster-gpl-7.4.5.tar.gz sysbench-

Now it's time to create the config file.
This file is always located in an almost empty directory under the bench directory with only
one file in it. This file is always called autobench.conf.

Here is the content of the file in my simplistic test run.
#Define benchmark to run
#Define file locations of datadir, installdir and tarballs
#Define tarball versions
#Define engine to use
#Define way to build
#Define servers to use
#Configure MySQL Server
#Configure NDB part
#Configure sysbench

At first we need to define the benchmark to run, this is sysbench in our case.
We need to point out the tarball directory, we need to point out the directory
where to place the installation (REMOTE_BIN_INSTALL_DIR), we also need to point
out the data directory.

Next step is to provide the names of the tarballs, this is done in
have to adapt to different MySQL versions we also need to specify the base version
of MySQL which in the case of MySQL Cluster 7.4 is 5.6. The scripts currently
support 5.1, 5.5 and 5.6. Some adaptation is needed for each new MySQL
version, so 5.7 is currently not supported, but work is under way for this.

Next we define if we want to use innodb or if we want to use ndb as the
storage engine in this test run.

We also define in the variable USE_BINARY_MYSQL_TARBALL whether we use a
source tarball or a binary tarball. In this case I use a source tarball.

We need to define the servers, there is in the case of sysbench always
one and only one MySQL Server, there can be one or many NDB management
servers. There can also be one or many NDB data nodes. But in this case
we wanted a simple setup on one machine so here there is one of each
sort and they are all located on which is the local host.

We always configure the MySQL server port to ensure we don't have a
problem running the benchmark if another MySQL Server is running on
the host. By default we use libtcmalloc instead of libc malloc, we avoid
this default since there are better malloc libraries and this variable should
be set purposely and not by default in most cases. But setting it to not use
a special malloc library will definitely work, so we do that here as a first

Since we run with NDB storage engine we need to set up a few configuration
variables. The NDB configuration can be quite extensive, but here we strive
for a simplistic one. We want only one replica which we set using NDB_REPLICAS.
We need to set the size of the NDB data memory and the NDB index memory.
Finally we set the number of execution threads to 8 which is pretty standard.
This should be sufficient for a simple benchmark run of sysbench.

The final section sets up a minimalistic set of parameters to run sysbench.
We run the OLTP complex RW case. We insert 100,000 rows into the table and we
run two tests, one with 16 MySQL connections and one with 32 connections.
Each test will run for 1 minute and we will only run each test once.

Now that is a minimal configuration, it is possible to make it even smaller
but then it's hard to understand what the test actually does since one
needs to be an expert in the default settings which even I don't remember
for all variables.

Now this file autobench.conf is placed in
in my case.

Now it is time to run the test case.

The test is executed by the following commands:
cd /Users/mikael/bench
./ --default-directory /Users/mikael/bench/sysbench

The test will run everything until completed. I more or less always use
one more parameter, --skip-cleanup. This means that I don't have to repeat the
build of the source code if I want to rerun the test. If I then run it a
second time, I should run with both --skip-cleanup and --skip-build to ensure
that I skip rebuilding the source and skip cleaning away the built source code
at the end of the test run.

The test result is located in the file:

The output from the sysbench program in this particular case can be found
in the file:

Good luck in benchmarking.

My favourite tool for following the benchmarks is top. I usually lock the various
threads onto different CPUs to ensure that I can understand how the different
threads behave. I haven't done this in this setup since this is just a basic
setup. But top is still a good tool to follow what is going on in the server.

One final note is that running this basic setup will require 20-30 GByte of
disk space and also during the test run there will be a fairly high load on
the disk. As many developer machines are not suitable for benchmarks of
database engines, one way to adapt the benchmark for a developer
environment is to set the config parameter NDB_DISKLESS="yes".
This essentially makes the file system behave as if it was located on

Friday, March 20, 2015

Controlling checkpoint speed in MySQL Cluster 7.4

A question from digitalpoint on a previous blog about 7.4 restarts requires
an explanation of how we control checkpoint speed in MySQL Cluster 7.4.
Since the explanation is fairly lengthy I will do it in this blog instead
of as a reply to a blog comment.

First of all some introduction to checkpoints in MySQL Cluster 7.4. We
actually use the term checkpoint for 2 things. We have LCPs (local
checkpoints) and GCPs (global checkpoints). LCPs are the traditional
checkpoints and the ones that will be described in this blog. GCPs are
about forming groups of transactions that will be durable after a
restart. GCPs happen very often whereas an LCP is a fairly long process.

So first I'll introduce why we're doing checkpoints (LCPs) in the first
place. There are two reasons for doing LCPs. The first is that MySQL
Cluster uses a log-based approach for recovery. Every update, insert,
delete and write is logged into the REDO log in MySQL Cluster. We also
have an UNDO log for disk data. To keep the logs from growing without bound
we use LCPs to cut the log tail.

The logs have a fixed size. The REDO log cannot be changed in size, whereas
for the UNDO log you can add new files to a logfile group. When a log is full
no updates are allowed anymore, since we could no longer recover the database
if we allowed non-logged updates.

All the above is true for tables that require durability. MySQL Cluster
actually also supports non-durable tables. These tables will be available
as long as the cluster survives, if the entire cluster fails, then those
tables will be restored to an empty state. Most applications need durable
tables, but there are also applications where durability doesn't make
sense and therefore it is better to run those tables without durability.

Checkpointing is obviously only relevant to durable tables where we recover
based on a combination of logs and checkpoints.

An LCP is used to cut the log tail. This means that we always need to write
the checkpoint at such a speed that we never reach a point where we
run out of log (both REDO log and UNDO log). As an example assume that we
can execute 100,000 updates per second at top speed, each updating 20 fields
with a total size of 200 bytes. The REDO log record in this case will be about
200 bytes of data plus 20 * 4 bytes of field info and around 70 bytes of fixed
overhead, so about 350 bytes. At 100,000 records per second this creates
35 MByte of REDO log per second. Assume also that the data size is
50 GByte. We can either calculate the REDO log size based on the choice
of checkpoint write speed, or the other way around calculate the
checkpoint write speed based on the REDO log size. If we start with a
checkpoint write speed of 100 MByte per second then it will take about
500 seconds to write an LCP. We need to have enough REDO log for at least
2 checkpoints and it is good to have at least a 100% safety margin. So
the REDO log needs to be big enough for 2000 seconds of log generation
(500 seconds * 2 checkpoints * 2 for the safety margin). At 35 MByte per
second, 70 GByte of REDO log should be sufficient. The size of the REDO log is
a multiplication of number of log parts, number of log files and log file

So being able to cut the log tail is the only reason for running LCPs in
normal operation when there are no restarts ongoing. Obviously cutting the
log to be short also means shorter restart times since there is less
log to execute. So increasing checkpoint write speed means faster restart
times at the expense of more work in normal operation.
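
The REDO log sizing example from earlier in this post can be written out as a few lines of arithmetic, which makes it easy to plug in your own numbers. This is a sketch of the calculation in the text, not code from MySQL Cluster. The function names are made up, and the per-field info (4 bytes) and fixed overhead (70 bytes) are simply the approximate figures quoted above. The last function goes the other way and derives the checkpoint write speed from a given REDO log size.

```python
MB = 10**6
GB = 10**9


def redo_bytes_per_update(data_bytes, fields, field_info_bytes=4, fixed_overhead=70):
    """Approximate REDO record size: data + per-field info + fixed overhead."""
    return data_bytes + fields * field_info_bytes + fixed_overhead


def lcp_seconds(data_size, checkpoint_speed):
    """Time to write one LCP of the whole data set at the given speed."""
    return data_size / checkpoint_speed


def redo_log_size_needed(redo_rate, lcp_time, checkpoints=2, safety_margin=1.0):
    """REDO log needed to cover `checkpoints` LCPs plus a safety margin."""
    return redo_rate * lcp_time * checkpoints * (1 + safety_margin)


def checkpoint_speed_needed(data_size, redo_rate, redo_log_size,
                            checkpoints=2, safety_margin=1.0):
    """Inverse calculation: write speed required for a given REDO log size."""
    return data_size * redo_rate * checkpoints * (1 + safety_margin) / redo_log_size


record = redo_bytes_per_update(200, 20)        # 350 bytes
redo_rate = record * 100_000                   # 35 MByte per second
lcp_time = lcp_seconds(50 * GB, 100 * MB)      # 500 seconds
redo_size = redo_log_size_needed(redo_rate, lcp_time)

print(f"REDO record: {record} bytes, REDO rate: {redo_rate / MB:.0f} MByte/s")
print(f"LCP time: {lcp_time:.0f} s, REDO log needed: {redo_size / GB:.0f} GByte")
print(f"Speed needed for a 70 GByte REDO log: "
      f"{checkpoint_speed_needed(50 * GB, redo_rate, 70 * GB) / MB:.0f} MByte/s")
```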

We also use the LCPs as part of restarts. We need to run an entire LCP where
the starting node participates to ensure that the starting node is
recoverable. In this scenario we simply want the LCP to run as fast as
possible to complete the restart as soon as possible.

So this means that we have two different speeds for checkpoint write speeds,
we have the speed needed during normal operation and we have the speed needed
for restarts. Actually we can even separate out a total cluster restart
(we call this system restart) from node restarts. The reason is that during
an LCP in a system restart there is no activity other than performing
the LCP. Thus we can set the checkpoint write speed to the absolute maximum
possible. During another node's restart we also need to execute user
transactions and so we want to find a balance between speeding up the restart
and being able to service current users of the cluster.

So this means that we need three config parameters. We call them
MaxDiskWriteSpeed (speed during normal operation),
MaxDiskWriteSpeedOtherNodeRestart (speed while another node is performing
a node restart) and MaxDiskWriteSpeedOwnRestart (speed during our own node
restart, but more importantly also of all nodes during a system restart).

Obviously there is a cap to the write speed we can handle, there is both a
limit to what the disk drives can handle and there is also a limit to how
much each CPU running an LDM thread can output.

My experiments show that one CPU that runs an LDM thread can output about
100 MByte per second if used only for generation of checkpoint writes.
The checkpoint format is essentially packed rows in a format that can be
restored by simply running normal inserts. We have changed back and
forth a bit between different checkpoint formats. In earlier versions of
MySQL Cluster we used page-based schemes that required UNDO logging also
of memory tables. The benefit of the current scheme is that it enables us to
avoid UNDO logs entirely for memory tables.

Modern disks can write a few hundred MBytes per second. Even cheap SSDs can
handle up to about 500 MBytes per second of writes. The disk has to handle
both writing of checkpoints and writes of the REDO log (and also of the
UNDO log for disk data).

In 7.3 and earlier the above config parameters were named differently.
They were named DiskCheckpointSpeed (more or less equal to MaxDiskWriteSpeed
in 7.4) and DiskCheckpointSpeedRestart (more or less equal to
MaxDiskWriteSpeedOwnRestart). MaxDiskWriteSpeedOtherNodeRestart has no
counterpart in 7.3 and earlier versions, where DiskCheckpointSpeed is also used
when another node is restarting. This new parameter is one major factor in how
we can speed up node restarts and rolling upgrades.

One more thing is that the execution of LCPs has been improved in 7.4.
In 7.4 all LDM threads can write checkpoints in parallel. The config
parameter specifies the total speed of the node, thus each LDM thread will
use a part of this speed. So with the write speed set to 100 MByte per second
and 4 LDM threads, each LDM thread will write at about 25 MByte
per second and thus use about 25% of the available CPU capacity in the
LDM thread for checkpoints. In earlier versions up to 2 LDM threads
could write in parallel and each such LDM thread used the config
parameter DiskCheckpointSpeed/DiskCheckpointSpeedRestart. Thus the
write speed of the node was usually twice as much as the config
parameter was set to.
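
The per-thread arithmetic can be sketched as follows. The function names are made up for illustration, the 100 MByte per second ceiling per LDM thread is the rough experimental figure mentioned above, and the value 10 used for the pre-7.4 case is only an example setting.

```python
def per_ldm_checkpoint_load(node_write_speed, ldm_threads, ldm_max_output=100.0):
    """7.4 model: the node-wide speed is shared evenly between all LDM threads.

    Returns (MByte/s written per LDM thread, share of one LDM CPU used for it).
    """
    per_ldm = node_write_speed / ldm_threads
    return per_ldm, per_ldm / ldm_max_output


def pre_74_node_write_speed(disk_checkpoint_speed, parallel_ldms=2):
    """Pre-7.4 model: up to 2 LDM threads each write at the configured speed."""
    return disk_checkpoint_speed * parallel_ldms


speed, cpu_share = per_ldm_checkpoint_load(100, 4)
print(f"7.4: {speed:.0f} MByte/s per LDM thread, {cpu_share:.0%} of its CPU")
print(f"pre-7.4: a setting of 10 gives about {pre_74_node_write_speed(10)} per node")
```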

The change in 7.4 to enable all LDM threads to write in parallel means that
we can set the write speed higher in 7.4 than earlier. The CPU overhead of
checkpointing is spread on all LDM threads and this ensures that we are
able to increase the speed without harming normal user operation.

There is also one more innovation in 7.4. This is seen from the config
parameter MinDiskWriteSpeed.

The problem here is that checkpoint writes compete for CPU capacity with
normal user operations. So if the user needs to run transactions at the
very highest speed, it is a good idea to reduce the CPU usage from
writing checkpoints.

Additionally if we experience an IO lag where the disk gets overloaded
we also want to slow down the writing of checkpoints.

What we have done here is that we have implemented an adaptive algorithm
that will slow down the checkpoint speed when we either have a CPU
overload situation or we have an IO overload situation. To get a balanced
adaptive algorithm we decrease speed faster than we increase the speed.

For CPU overload we start reacting when the CPU usage reaches 95% or more.
We use a mechanism to see how much CPU time the LDM thread has had each
second. This means that this algorithm will work best in an environment
where the LDM threads always have access to CPU time if they need to.
If the LDM threads are sharing CPU time with many other threads and processes
the data node will still function but this particular adaptive algorithm
won't work very well.

Personally I always set up my data nodes using CPU locking with the
ThreadConfig config parameter to get the optimal stable performance from
the data nodes.

It is actually even possible to setup a real-time environment with MySQL
Cluster, more on the how-tos of that in a separate blog.

IO lagging is discovered by analysing how many seconds of REDO log are
on their way to the disk but haven't yet been written to disk.

The adaptive algorithm will always ensure that we never go below
MinDiskWriteSpeed, independent of CPU load and IO load. This parameter should
be set at a level that never jeopardizes the system, so that we don't risk
the log tail meeting the log head because LCPs don't complete quickly enough.

The maximum checkpoint write speed is then based on whether we are in normal
operation, whether another node is currently restarting or whether we are
restarting our own node.
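
The behaviour described above can be illustrated with a toy version of such a controller. This is NOT the algorithm in the data node. It only shows the shape of the idea: decrease fast, increase slowly, and always stay between the minimum and the maximum speed. The 95% CPU threshold comes from the text. The IO lag threshold, the halving on overload and the increase step of 5 MByte per second are invented for the example.

```python
def next_write_speed(current, min_speed, max_speed, cpu_usage, io_lag_seconds,
                     cpu_threshold=0.95,      # from the text
                     io_lag_threshold=2.0,    # hypothetical
                     decrease_factor=0.5,     # hypothetical: halve on overload
                     increase_step=5.0):      # hypothetical: MByte/s per period
    """Return the checkpoint write speed (MByte/s) for the next period."""
    overloaded = cpu_usage >= cpu_threshold or io_lag_seconds >= io_lag_threshold
    if overloaded:
        candidate = current * decrease_factor   # back off quickly
    else:
        candidate = current + increase_step     # recover slowly
    # Never below the minimum speed, never above the current maximum.
    return max(min_speed, min(max_speed, candidate))


speed = 100.0
samples = [(0.97, 0.0), (0.99, 0.0), (0.98, 3.0), (0.60, 0.0), (0.60, 0.0)]
for cpu, lag in samples:
    speed = next_write_speed(speed, 20.0, 100.0, cpu, lag)
    print(f"cpu={cpu:.0%} io_lag={lag:.0f}s -> {speed:.0f} MByte/s")
```

Here max_speed stands for whichever of the three maximum parameters applies at the moment, and min_speed for MinDiskWriteSpeed.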

So by this thorough introduction to execution of LCPs in MySQL Cluster 7.4
it is possible to answer digitalpoint's question.

In normal operation, when there is no high load on the node and no
overload of the IO subsystem, we will write LCPs at a constant
speed of MaxDiskWriteSpeed. We will reach MinDiskWriteSpeed in a
scenario with overload of either the CPUs or the IO subsystem due to
many transactions being executed by the users of MySQL Cluster.

So MinDiskWriteSpeed should be set at the minimum level required to ensure
that we don't run out of REDO log space. MaxDiskWriteSpeed should
be set to a level dependent on how fast we want restarts to be. Since this
is the speed used for the most part, one should also take into account the
possibility of wearing out the disk when setting this parameter.

MaxDiskWriteSpeedOtherNodeRestart should be set quite high to enable fast
restarts. We should still not set it too high, to avoid running
into unnecessary overload situations during the node restart.

MaxDiskWriteSpeedOwnRestart can be set to a level that can be handled by
the IO subsystem and CPU subsystem since it has very little other
activity to handle during a restart.

The adaptive algorithm makes it possible to be fairly aggressive in setting
the last two config parameters since the adaptive algorithm will ensure
that we still maintain a healthy level of CPU usage for user transactions.

A final word is that it is possible to change those parameters in any restart.
Actually we even implemented an undocumented feature whereby you can change
these parameters in live nodes. The reason we haven't documented this is
that it is intended for people who completely understand how their
system behaves, and thus we consider them able to read the code of
Backup.cpp to understand what to do to change these parameters. A change
done through this interface won't survive a restart; for that one needs to
change the configuration.

We have also added a new NDBINFO table making it possible to follow how
the checkpoint writes and other disk writes are occurring in your own
cluster. More on that in a later blog.

Wednesday, March 11, 2015

Methods of optimised module interaction in MySQL Cluster 7.4

One of the important optimisations we did in MySQL Cluster 7.4 was to
use more direct function calls as part of SCAN processing instead of
relying on asynchronous messages.

We want each message execution to be short. We should strive to
keep it below around 10 microseconds of execution per message.
However, if we decrease the time spent executing a message
down to a microsecond or less, then we spend too
much time in the message scheduler. So it is important to strike a
balance here.

For primary key lookups we do most of the work in one message
execution, so here we already were doing the optimal behaviour
in our model of execution.

For scans however we often scan hundreds or even thousands and
in some cases even millions of rows, so here there is a lot of
opportunity to decide how to execute the scan.

Previously the scan model used a number of messages per row
scanned. This was quite obviously a bit too conservative. We have
now made optimisations such that we can scan up to 5 rows within
the execution of one message.
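
The balance between short messages and scheduler overhead can be illustrated with a bit of arithmetic. The two cost figures below are purely hypothetical and chosen only to show the shape of the trade-off. They are not measurements from MySQL Cluster.

```python
def message_profile(rows_per_message, per_row_us, scheduling_us):
    """Return (execution time per message in us, share of time spent scheduling)."""
    execution_us = rows_per_message * per_row_us
    scheduling_share = scheduling_us / (scheduling_us + execution_us)
    return execution_us, scheduling_share


PER_ROW_US = 1.5      # HYPOTHETICAL cost of scanning one row
SCHEDULING_US = 0.5   # HYPOTHETICAL cost of scheduling one message

for rows in (1, 5):
    execution_us, share = message_profile(rows, PER_ROW_US, SCHEDULING_US)
    print(f"{rows} row(s) per message: {execution_us:.1f} us executing, "
          f"{share:.1%} of the time in the scheduler")
```

With these made-up numbers, handling 5 rows per message keeps the execution time below the 10 microsecond target while cutting the scheduling share substantially.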

When converting an asynchronous message into a function call
there is more than one way to achieve this. There are ways that
are extremely fast, but these leave no extra opportunity of
tracing. So for each such call we have to make a decision.

The excerpt below is taken from the code in MySQL Cluster 7.4
and explains the different ways to convert asynchronous messages
into function calls.

When calling another block immediately there is a set of ways to
do this.

Standard, non-optimised manner:
Use EXECUTE_DIRECT with four parameters that specify block reference
of receiver, the signal object, the global signal number and the
length of the signal (There is also a variant used when sending such
a signal to a different instance). This method is fairly optimised
but has quite a lot of potential for performance improvement.

Standard, optimised manner:
In this case we optimise things by translating to block object and
retrieving the function pointer in the block call. This gives
the compiler assistance to separate the loads and stores more from
each other.
The call will be seen as:
block->EXECUTE_DIRECT(signal, f);

This manner optimises the code but retains the flexibility and also
the possibility to trace signal execution between blocks.

Non-standard, optimised manner:
In this case we remove some of the flexibility of the call to enhance
the performance yet a bit more. In this case we remove the possibility
for a flexible receiver of the signal, it is directed to a certain
block and we also call the method directly without any indirection.
The call will be seen as e.g.:

Both standard manners of calling EXECUTE_DIRECT always call
functions with one parameter being the signal object and no return
value. There are however two ways of sending signals back. In some
cases a signal is always sent back when returning from the
EXECUTE_DIRECT call, in this case the signal object returned contains
the data for the return signal. In some cases
one gets a return signal in this manner depending on the combination of
the signal number and the parameters. In the example below with NEXT_SCANREQ
we always get a return signal when specifying ZSCAN_COMMIT
but not in other cases.

The other manner of sending a return signal is to perform a new
EXECUTE_DIRECT signal. In those cases one needs to ensure that the
call chain is bounded to not run out of stack. It is also a good
idea to try and ensure that the EXECUTE_DIRECT can use the
tail-call optimisation to avoid using too much stack which is bad
for CPU caching. This means returning immediately after
EXECUTE_DIRECT but also avoiding having objects that need
destruction after return and also avoiding taking the reference of
stack variables.

Using the non-standard manner one can obviously also change more
things, one can return a bool for example as in the example with
execTUPKEYREQ, one can add parameters and one can even change the
name to a different name not according to the standard naming
conventions. Obviously doing this removes flexibility of using
blocks in a flexible manner.
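
The three calling styles can be mimicked in a small toy model. This is an analogy only and not NDB code: the classes, the block reference, the signal number and the handler bodies are all invented, and the real implementation is C++. The point is the difference between looking up the receiver and handler on every call, passing a handler that has already been resolved, and calling a method directly with a changed signature.

```python
TUP_REF = 1          # hypothetical block reference
GSN_TUPKEYREQ = 1    # hypothetical global signal number


class Signal:
    def __init__(self, words):
        self.words = list(words)


class Block:
    registry = {}  # block reference -> block object

    def __init__(self, ref):
        self.handlers = {}   # signal number -> handler
        self.trace = []      # names of handlers run via execute_direct
        Block.registry[ref] = self

    def execute_direct(self, signal, handler):
        """Standard, optimised: block object and handler already resolved."""
        self.trace.append(handler.__name__)   # tracing is still possible
        handler(signal)


class Tup(Block):
    def __init__(self, ref):
        super().__init__(ref)
        self.handlers[GSN_TUPKEYREQ] = self.exec_tupkeyreq_signal

    def exec_tupkeyreq_signal(self, signal):
        # Standard signature: one signal argument, no return value.
        # The reply is written back into the signal object.
        signal.words = [sum(signal.words)]

    def exec_tupkeyreq(self, signal):
        # Non-standard: fixed receiver, changed signature (returns a bool),
        # and no hook for tracing.
        signal.words = [sum(signal.words)]
        return True


def execute_direct(block_ref, gsn, signal, length):
    """Standard, non-optimised: resolve block and handler on every call."""
    block = Block.registry[block_ref]
    handler = block.handlers[gsn]
    signal.words = signal.words[:length]
    block.execute_direct(signal, handler)


tup = Tup(TUP_REF)
s1, s2, s3 = Signal([1, 2, 3]), Signal([1, 2, 3]), Signal([1, 2, 3])
execute_direct(TUP_REF, GSN_TUPKEYREQ, s1, 3)        # generic lookup
tup.execute_direct(s2, tup.exec_tupkeyreq_signal)    # pre-resolved handler
ok = tup.exec_tupkeyreq(s3)                          # direct call
print(s1.words, s2.words, s3.words, ok, tup.trace)
```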

Tuesday, March 10, 2015

LCP Pausing module in MySQL Cluster 7.4

A new feature that assists in making node restart much faster is
the new PAUSE LCP protocol. This is an excerpt from the MySQL
Cluster 7.4 source code. There is also a fair amount of new
comments in the 7.4 source code which are only valid in the code

This module contains code that executes for the purpose of pausing
LCP reporting to our meta data for a short time while we are copying the
meta data to a new starting node.

In order to better understand the handling of the LCP protocol we will
describe the LCP protocol, this includes both the old and the new protocol.

The LCP protocol is controlled by the DIH in the master node.
When an LCP has been completed we will immediately start checking for
the need for a new LCP to be started.

The first step here is to ensure that we have had sufficient activity in
the cluster to necessitate an LCP to be executed again.

To check this we send TCGETOPSIZEREQ to all DBTCs in the cluster. This
will gather an estimate of how many writes we've had in the cluster
since the last LCP was started. There are also various ways to ensure
that we start an LCP immediately if so needed.

If the activity was sufficient we will start the LCP. Before starting the LCP
we will calculate a number of GCI values that are important, oldest restorable
GCI and so forth. Next we will send TC_CLOPSIZEREQ to all DBTCs in
the cluster to clear the activity counter in DBTC as preparation for the next
LCP start.
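
The activity check can be sketched as follows. This is a simplified model
in Python, not the DBDIH code: the Dbtc class, the threshold parameter and
the force flag are assumptions standing in for the real counters and for
the "various ways" of forcing an immediate LCP mentioned above.

```python
# Hypothetical model of the "do we need a new LCP?" decision.
class Dbtc:
    def __init__(self, op_size):
        self.op_size = op_size      # estimate of writes since the last LCP

    def tcgetopsizereq(self):
        # Models TCGETOPSIZEREQ: report the activity counter.
        return self.op_size

    def tc_clopsizereq(self):
        # Models TC_CLOPSIZEREQ: clear the counter for the next LCP.
        self.op_size = 0


def maybe_start_lcp(dbtcs, threshold, force=False):
    """Return True if an LCP is started, clearing the counters first."""
    total = sum(tc.tcgetopsizereq() for tc in dbtcs)
    if not force and total < threshold:
        return False                # not enough activity, check again later
    for tc in dbtcs:
        tc.tc_clopsizereq()
    return True
```

Note that the counters are only cleared when an LCP is actually started, so
activity keeps accumulating across checks that decide not to start one.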

In the old way we will then grab a mutex on the fragment info, this
mutex will be held until the LCP is completed. The mutex is held in
the master node, in a master takeover the mutex needs to be taken
also in the new master node. Since all LCPs go through the master
node this has the same effect as a distributed mutex on the fragment

In the new way we will start the LCP immediately without grabbing
the mutex.

The first step in starting is to calculate the set of LQHs involved in
the LCP and the set of DIHs involved in the LCP. A node is involved in
the LCP in DIH if it has had the meta data copied to it. It will
participate in an LCP in LQH if the data has been restored and we're
ready to perform a full LCP.

Next we update to the new LCP id of the new LCP.

The next step is performed in the master node by walking through all
fragment replicas of all active tables to see how much of the REDO log
we can cut away when starting the new LCP. At the first order of an
LCP of a fragment in an LDM instance we will set the new log tail in
that LDM instance.

After calculating the new GCI values and setting the LCP id we will
synchronize this information with all other nodes in the cluster.
This information will also be synchronized to the file system in
the Sysfile. This file is where all restarts start by looking at
the state of our database on files.
The COPY_GCIREQ signal is used to distribute this message.

When all nodes have synchronized this information to disk and confirmed
this to the master then we are ready to start sending orders to perform
the individual checkpoints of the fragment replicas.

The next step is that we want to set the tables to be involved in the
LCP. At this point we want to ensure that the same set of tables is
calculated in all nodes. To ensure this we grab the mutex that ensures
no tables are able to commit their CREATE TABLE statements until we are
done with this step.
This is started by the signal START_LCP_REQ. This signal also contains
the list of nodes involved in the LCP both for LQH and DIH.

CREATE TABLE can create new tables prior to this point which we will
include, and that's ok as they cannot possibly affect the new redo tail
position. DROP TABLE can drop tables prior to this point, which could
remove the need to maintain some old redo, but that will be handled in
the following LCP.

Each table to execute the LCP on is marked with a proper state in the
variable tabLcpStatus. Also each fragment replica to execute the LCP
on is marked with true in the lcpOngoingFlag and we set the number of
replicas to perform LCP on per fragment as well.

These preparatory steps are done in a synchronized manner, so all nodes
have received information about the COPY_GCIREQ and now all nodes have
heard the START_LCP_REQ signals. So in a master takeover we can ask all
nodes about their LCP state and we can derive if we sent the COPY_GCIREQ
to all nodes and similarly we can derive if we sent and completed the
START_LCP_REQ step. To derive this requires all nodes to have heard of
those signals, not just one of them since a crash can occur in the
middle of signal sending.

In a master takeover if we haven't completed the COPY_GCIREQ step then
we can start the next LCP from the beginning again. If COPY_GCIREQ has
been completed but not the START_LCP_REQ, then we can restart the
START_LCP_REQ step. Finally if the START_LCP_REQ has been completed
then we know that the execution of checkpoints on individual fragment
replicas is ongoing. Obviously in a master takeover we should ensure
that the processing of START_LCP_REQ is completed before we report
back our state to the master node to ensure that we make the master
takeover handling as simple as possible.
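
The decision the new master makes can be sketched like this. The function
and the dictionary keys are made up for the illustration (the real state is
reported through the master takeover signals), but the rule is the one
described above: a step only counts as completed if all nodes have heard
of it.

```python
# Hypothetical sketch of how a new master derives where to resume the LCP.
RESTART_LCP = "restart the LCP from the beginning"
RESTART_START_LCP_REQ = "restart the START_LCP_REQ step"
CONTINUE_FRAG_ORD = "continue with LCP_FRAG_ORD sending"


def takeover_action(node_states):
    """node_states: one dict per surviving node with two booleans,
    'copy_gci_done' and 'start_lcp_done' (assumed field names)."""
    if not all(s["copy_gci_done"] for s in node_states):
        # A crash in the middle of sending COPY_GCIREQ leaves some nodes
        # without it, so the step cannot be treated as completed.
        return RESTART_LCP
    if not all(s["start_lcp_done"] for s in node_states):
        return RESTART_START_LCP_REQ
    return CONTINUE_FRAG_ORD
```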

So now that we know exactly which tables and fragment replicas to
checkpoint it is time to start the actual checkpoint phase.

The master node will send LCP_FRAG_ORD to DBLQH for each of the fragment
replicas to execute the LCP on.

In the old way there was a queue of such LCP_FRAG_ORD with limited size in
DBDIH (queue size was 2 in 7.3 and earlier and 128 in 7.4 versions).
Also DBLQH had a queue for LCP_FRAG_ORDs, in 7.3 this was 2 in size and
in early versions of 7.4 it was 64.

In the new version we can send LCP_FRAG_ORD to LQH as before, LQH has an
infinite queue size (it simply stores the LCP_FRAG_ORD on the fragment
record, so there is no limit to the queue size since all fragments can
be in the queue). In addition at master takeover we also support receiving
the same order two or more times. By ensuring that we keep track of that
we already received an LCP_FRAG_ORD on a fragment we can also easily discard
LCP_FRAG_ORDs that we already received.
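
A minimal sketch of this idea, assuming the order is remembered as an LCP
id on each fragment record (the class and field names are invented for the
example and are not the DBLQH data structures):

```python
# Hypothetical model: the "queue" is just a marker on each fragment record,
# so its size is bounded only by the number of fragments.
class Lqh:
    def __init__(self, fragment_ids):
        # fragment id -> LCP id of the last order received (None = no order)
        self.ordered_lcp_id = {frag_id: None for frag_id in fragment_ids}
        self.to_checkpoint = []     # fragments in the order they were ordered

    def lcp_frag_ord(self, frag_id, lcp_id):
        """Return True if the order was accepted, False if it was a repeat."""
        if self.ordered_lcp_id[frag_id] == lcp_id:
            return False            # already received for this LCP, discard
        self.ordered_lcp_id[frag_id] = lcp_id
        self.to_checkpoint.append(frag_id)
        return True
```

With this in place a new master can resend every LCP_FRAG_ORD after a
takeover without any risk of a fragment being checkpointed twice in the
same LCP.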

These features mean that LQH can process a Local Checkpoint without much
interaction with DIH / DIH Master, which enables simplifications at DIH
and DIH Master in later versions. In principle we could send off all
LCP_FRAG_ORDs immediately if we like and more or less turn the LDM
instances into independent LCP execution engines. This is a step in the
direction of more local control in LQH over LCP execution.

When all LCP_FRAG_ORD have been sent, then a special LCP_FRAG_ORD
is sent to all participating LQH nodes. This signal has the flag lastFragmentFlag
set, it doesn't contain any fragment to checkpoint, it is only a flag that
indicates that we've sent the last LCP_FRAG_ORD.

LQH will execute orders to execute LCP on a fragment in the order they are
received. As a fragment is completing its LCP it will generate a new message
LCP_FRAG_REP. This message is broadcast to all participating DIHs. First
the message is sent from DBLQH to the local DIH. Then the local DIH will
broadcast it to all participating DIHs.

This new Pausing LCP module is involved here by being able to queue also
LCP_FRAG_REP before they are broadcast to the participating DIHs. They are
queued on the fragment replica records in the local DIH and thus we have
no limits on the queue size.

This allows the DIH Master state to be stabilised as necessary during an
LCP, removing the need in some cases to wait for an LCP to complete before
performing some other activity.
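
The queueing behaviour can be sketched as below. This is only a model with
invented names: in the real code the queued reports live on the fragment
replica records, while here a plain list plays that role.

```python
# Hypothetical model of the local DIH queueing LCP_FRAG_REP while paused.
class LocalDih:
    def __init__(self, participants):
        self.participants = list(participants)
        self.paused = False
        self.queued = []    # stands in for flags on fragment replica records
        self.sent = []      # (receiving node, report) pairs, for inspection

    def lcp_frag_rep(self, report):
        if self.paused:
            self.queued.append(report)   # hold it back, nothing is lost
        else:
            self._broadcast(report)

    def pause(self):
        self.paused = True

    def unpause(self):
        self.paused = False
        queued, self.queued = self.queued, []
        for report in queued:            # flush in arrival order
            self._broadcast(report)

    def _broadcast(self, report):
        for node in self.participants:
            self.sent.append((node, report))
```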

When LQH has executed all the LCP_FRAG_ORDs and has received the
last fragment flag, then the LDM will perform a number of activities to
complete the local checkpoint. These activities are mostly used by the
disk data tables.

After all these activities have completed the LQH will send
LCP_COMPLETE_REP to the local DIH. The local DIH will broadcast it to all
participating DIHs.

When all LQHs have sent all LCP_FRAG_REP and have also sent the
LCP_COMPLETE_REP, then the LCP is completed. So a node that has seen
LCP_COMPLETE_REP from all nodes participating in the LCP knows that
it has received all the LCP_FRAG_REP for the LCP.
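
The completion rule amounts to a small piece of bookkeeping in each DIH,
sketched here with invented names:

```python
# Hypothetical model of a DIH tracking LCP completion.
class LcpCompletionTracker:
    def __init__(self, participating_nodes):
        self.waiting_for = set(participating_nodes)

    def lcp_complete_rep(self, node):
        """Record LCP_COMPLETE_REP from a node, return True when the LCP
        is known to be completed."""
        self.waiting_for.discard(node)
        return not self.waiting_for
```

This relies on the ordering described above: a node sends its
LCP_COMPLETE_REP only after all of its LCP_FRAG_REP, so no per-fragment
counting is needed to detect completion.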

In a master takeover in the old way we could not resend the LCP_FRAG_ORD
to the LQH again. To avoid this we used an extra master takeover
protocol EMPTY_LCP_REQ. This protocol ensures that all LQHs have completed
the queues and that all LCP_FRAG_REPs have been sent to all participating
DIHs and likewise with the LCP_COMPLETE_REP such that the new master has
a precise view of which fragment replicas have completed the LCP execution
so far.

Thus when the master takeover is completed we know that each DIH has received
all the LCP_FRAG_REP for which an LCP_FRAG_ORD has been sent and also
all LCP_COMPLETE_REP that have been produced. This means that we are now
ready to restart the process of sending LCP_FRAG_ORD again.

The problem with this approach is that it can consume a very long time to
execute the entire LCP fragment queue in LQH if the queue size increases
(increased from 2 to 64 going from 7.3 to 7.4) and the size of the
fragments also increases. So the master takeover can take a substantial
time in this case.

So the new manner is to allow for the LQH to get LCP_FRAG_ORD and also
the special last LCP_FRAG_ORD several times with the same LCP id and
discard those that it receives for a second time. In this manner we can
simply restart sending the LCP_FRAG_ORD from the beginning. When we are
done with this we can start checking for completion of the LCP in the
normal way.

When the master has sent the last special LCP_FRAG_ORD and these have been
received by the receiving nodes, then the master will actually itself not
do anything more to execute the LCP. The non-master nodes will however send
LCP_COMPLETE_REP to the master node. So this means that a new LCP won't
start until all participating DIHs have completed the processing of the
last LCP.

So effectively taking over as master in this phase doesn't really require
any specific work other than redirecting the LCP_COMPLETE_REP from the
non-masters to the new master. If it has already been sent it should be
seen in the response to the MASTER_LCPREQ from the node. So after
receiving the last MASTER_LCPCONF we have information enough about whether
we need to send more LCP_FRAG_ORDs or not.

We can still optimise the sending of LCP_FRAG_ORD a little bit by not
sending LCP_FRAG_ORD to a fragment replica for which we have already
received an LCP_FRAG_REP. It would be possible to avoid sending extra
LCP_FRAG_ORDs in various other ways, but it doesn't really cost much,
LCP_FRAG_ORD is a small signal and the number of signals sent is limited to
the number of fragment replicas. So this would only make sense if we have to
support extremely large clusters and extremely many tables in combination.

This description shows some interesting places to test master failures:
1) Master failure while clearing TC counters (TC_CLOPSIZEREQ).
2) Master failure while distributing COPY_GCIREQ.
3) Master failure while distributing START_LCP_REQ.
4) Master failure while processing the LCP and sending LCP_FRAG_ORDs.
4.1) Before any LCP_FRAG_REP received.
4.2) After receiving many LCP_FRAG_REPs, but not all.
4.3) After receiving all LCP_FRAG_REPs, but not all LCP_COMPLETE_REPs.
4.4) After receiving all LCP_FRAG_REPs, and all LCP_COMPLETE_REPs.

"While distributing" above can be interpreted as three test cases: one
before distributing, one in the middle of distributing and one when all
responses have been received.

It is also important to similarly test PAUSE_LCP_REQ handling in all of
the above states. This can be handled by inserting an ERROR_INSERT that
effectively stops the process to copy meta data at some point and then
setting some variable that triggers the copying of meta data to continue
at a state that we wanted to accomplish.

Monday, March 09, 2015

Restart Phases in MySQL Cluster

Here is one more documentation effort in the MySQL Cluster 7.4
source code. This describes a fairly detailed view of what is performed in
the various restart phases of MySQL Cluster and in particular for node

In MySQL Cluster the restart of a node is driven by a set of phases. In
addition a node restart is also synchronised with already started nodes and
other nodes that are starting up in parallel with our node. This comment
will describe the various phases used.

Data node process startup

The first step in starting a node is to create the data node run-time
environment. The data node process is normally running with an angel process,
this angel process ensures that the data node is automatically restarted in
cases of failures. So the only reason to start the data node manually again
is after an OS crash, after a shutdown by an operator or as part of a
software upgrade.

When starting up the data node, the data node needs a node id, this is either
assigned through setting the parameter --ndb-nodeid when starting the data
node, or it is assigned by the management server when retrieving the
configuration. The angel process will ensure that the assigned node id will be
the same for all restarts of the data node.

After forking the data node process, the starting process stays as the angel
process and the new process becomes the actual data node process. The actual
data node process starts by retrieving the configuration from the management

At this stage we have read the options, we have allocated a node id, we have
the configuration loaded from the management server. We will print some
important information to the data node log about our thread configuration and
some other things. To ensure that we find the correct files and create files
in the correct place we set the datadir of our data node process.

Next we have to start the watch-dog thread since we are now starting to do
activities where we want to ensure that we don't get stuck due to some
software error.

Next we will allocate the memory of the global memory pools, this is where
most memory is allocated, we still have a fair amount of memory allocated as
part of the initialisation of the various software modules in the NDB kernel,
but step by step we're moving towards usage of the global memory pools.

Allocating memory can be a fairly time-consuming process where the OS can
require up to one second for each GByte of memory allocated (naturally OS
dependent and will change over time). What consumes the time here is
that we also touch each page to ensure that the allocated memory is
also mapped to real physical memory to avoid page misses while we're running
the process. To speed up this process we have made the touching of memory

The point at which most memory is allocated is actually configurable, the
configuration variable LateAlloc can be used to delay the allocation of most
memory to early phases of the restart.

The only memory that is required to allocate in the early phase is the job
buffer, memory for sending messages over the network and finally memory for
messages to and from the file system threads. So allocation of e.g.
DataMemory, IndexMemory and DiskPageBufferMemory can be delayed until the
early start phases.

After allocating the global memory pool we initialise all data used by the
run-time environment. This ensures that we're ready to send and receive data
between the threads used in the data node process as soon as they are started.

At this point we've only started the watch-dog thread and the thread started
as part of creating the process (this thread will later be converted to the
first receive thread if we're running ndbmtd and the only execution thread if
we are running ndbd). Next step is to load all software modules and initialise
those to ensure they're properly set-up when the messages start arriving for

Before we start the run-time environment we also need to activate the send
and receive services. This involves creating a socket client thread that
attempts to connect to socket server parts of other nodes in the cluster and
a thread to listen to the socket server used for those data nodes we
communicate with as the socket server.

The default behaviour is that the node with the lowest nodeid is the socket
server in the communication setup. This can be changed in the data node

Before we proceed and start the data node environment we will place the start
signals of the run-time environment in its proper job buffer. Actually to
start the system one needs to place two equal signals in the job buffer. The
first start signal starts the communication to other nodes and sets the state
to wait for the next signal to actually start the system. The second one will
start running the start phases. There is support to set the node in such a
state that it has received the first of these signals and then wait for the
NDB management server to send the second of those signals on command from a
user of the NDB management client.

Finally we start all the threads of the run-time environment. These can
currently include a main thread, a rep thread, a number of tc threads,
a number of send threads, a number of receive threads and a number of
ldm threads. Given that communication buffers for all threads have been
preallocated, we can start sending signals immediately as those threads
start up. The receiving thread will start to take care of its received signals
as soon as it has come to that point in its thread startup code.

There are two identical start signals, the first starts a recurring signal
that is sent on a regular basis to keep track of time in the data node.
Only the second one starts performing the various start phases.
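
The effect of the two identical start signals can be pictured as a tiny
state machine. The sketch below uses invented state names and is only a
model of the behaviour described above, not the run-time environment code.

```python
# Hypothetical state machine for the two identical start signals.
class StartHandling:
    def __init__(self):
        self.state = "not started"
        self.actions = []

    def start_signal(self):
        if self.state == "not started":
            # First signal: open communication and start the recurring
            # time-keeping signal, then wait.
            self.actions.append("start communication and time signal")
            self.state = "waiting for second start signal"
        elif self.state == "waiting for second start signal":
            # Second signal: begin running the start phases.
            self.actions.append("run start phases")
            self.state = "running start phases"
        # Any further start signal is ignored in this sketch.
```

Because the two signals are identical, the node can be left parked in the
waiting state until the second one arrives, for example from the management
server.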

Signal driven start phases

A startup of a data node is handled in a set of phases. The first phase is
to send the signal READ_CONFIG_REQ to all software modules in the kernel,
then STTOR is similarly sent to all software modules in 256 phases numbered
from 0 to 255. We don't use all of those phases, but the code is flexible
such that any of those phases could be used now or sometime in the future.

In addition we have 6 modules that are involved in one more set of start
phases. The signal sent in these phases is called NDB_STTOR. The original
idea was to view this message as the local start of the NDB subsystem.
These signals are sent and handled by NDBCNTR and sent as part of the STTOR
handling in NDBCNTR. This means that it becomes a sequential part of the
startup phases.

Before starting the phases we ensure that any management node can connect
to our node and that all other nodes are disconnected and that they can only
send messages to the QMGR module. The management server receives reports
about various events in the data node and the QMGR module is taking care of
the inclusion of the data node into the cluster. Before we're included in
the cluster we cannot communicate with other nodes in any manner.
(This inclusion protocol was described in an earlier blog).

The start always starts in the main thread where each software module is
represented by at least a proxy module that all multithreaded modules contain.
The proxy module makes it possible to easily send and receive messages to a
set of modules of the same type using one message and one reply.

The READ_CONFIG_REQ signals are always sent in the same order. It starts by
sending to CMVMI (originally was the interface to the data node internals,
stands for Cluster Manager Virtual Machine Interface), this is the block
that receives the start order and it performs a number of functions from
where the software modules can affect the run-time environment. It normally
allocates most memory of the process and touches all of this memory. It is
part of the main thread.

The next module receiving READ_CONFIG_REQ is NDBFS, this is the module that
controls the file system threads, this module is found in the main thread.

Next module is DBINFO, this module supports the ndbinfo database used to get
information about the data node internals in table format, this module is
found in the main thread.

Next is DBTUP, this is the module where the actual data is stored. Next DBACC,
the module where primary key and unique key hash indexes are stored and where
we control row locks from. Both those blocks are contained in the ldm threads.

Next is DBTC, the module where transaction coordination is managed from,
this module is part of the tc thread. Next is DBLQH, the module that controls
the actions on data through key operations and scans and also handles the
REDO logs. This is the main module of the ldm thread.

Next is DBTUX that operates ordered indexes, reusing pages used to store rows
in DBTUP, also part of the ldm thread. Next is DBDICT, the dictionary module
used to store and handle all metadata information about tables and columns,
tablespaces, log files and so forth. DICT is part of the main thread.

Next is DBDIH, the module to store and handle distribution information about
all tables, the table partitions and all replicas of each partition. It
controls the local checkpoint process, the global checkpoint process and
controls a major part of the restart processing. The DIH module is a part of
the main thread.

Next is NDBCNTR that controls the restart phases, it's part of the main
thread. Next is QMGR which takes care of the heartbeat protocol and inclusion
and exclusion of nodes in the cluster. It's part of the main thread.

Next is TRIX that performs a few services related to ordered indexes and other
trigger-based services. It's part of the tc thread. Next is BACKUP, this is
used for backups and local checkpoints and is part of the ldm thread.

Next is DBUTIL that provides a number of services such as performing key
operations on behalf of code in the modules. It's part of the main thread.
Next is the SUMA module that takes care of replication events, this is the
module handled by the rep thread.

Next is TSMAN, then LGMAN, and then PGMAN that are all part of the disk data
handling taking care of tablespace, UNDO logging and page management. They
are all part of the ldm thread.

RESTORE is a module used to restore local checkpoints as part of a startup.
This module is also part of the ldm thread.

Finally we have the DBSPJ module that takes care of join queries pushed down
to the data node, it executes as part of the tc thread.

DBTUP, DBACC, DBLQH, DBTUX, BACKUP, TSMAN, LGMAN, PGMAN and RESTORE are
all tightly integrated modules that take care of the data and indexes
locally in each node. This set of modules forms an
LDM instance, each node can have multiple LDM instances and these
can be spread over a set of threads. Each LDM instance owns its own
partition of the data and has its own set of REDO log files.

We also have two modules that are not a part of restart handling. The first
is the TRPMAN module that performs a number of transport-related functions
(communication with other nodes). It executes in the receive threads. The
second is THRMAN, which executes in every thread and does some thread
management functionality.

All modules receive READ_CONFIG_REQ, all modules also receive STTOR for
phase 0 and phase 1. In phase 1 they report back which further start phases
they want to be informed about.

During the READ_CONFIG_REQ the threads can execute for a very long time in
a module since we can be allocating and touching memory of large sizes. This
means that our watchdog thread has a special timeout for this phase to
ensure that we don't crash the process simply due to a long time of
initialising our memory. In normal operations each signal should execute only
for a small number of microseconds.

The start phases are synchronized by sending the message STTOR to all modules,
logically each module gets this signal for each start phase from 0 to 255.
However the response message STTORRY contains the list of start phases the
module really is interested in.
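
A small sketch of how the list of interesting phases lets the sender skip
signals. The function and its input format are assumptions made for the
illustration, the module names in the usage are only examples.

```python
# Hypothetical model of STTOR delivery with skipping of unwanted phases.
MAX_START_PHASE = 255


def run_sttor(modules):
    """modules: list of (name, wanted_phases) in the fixed send order.
    Returns the (phase, name) pairs for which STTOR is actually sent."""
    deliveries = []
    interest = {}
    # Phases 0 and 1 go to every module.
    for phase in (0, 1):
        for name, wanted in modules:
            deliveries.append((phase, name))
            if phase == 1:
                # Models the phase list carried back in STTORRY.
                interest[name] = set(wanted)
    # Later phases are only sent to modules that asked for them.
    for phase in range(2, MAX_START_PHASE + 1):
        for name, _ in modules:
            if phase in interest[name]:
                deliveries.append((phase, name))
    return deliveries
```

For example, run_sttor([("NDBCNTR", {2, 3}), ("QMGR", {7})]) delivers
phases 0 and 1 to both modules, phases 2 and 3 only to NDBCNTR and phase 7
only to QMGR, always keeping the same module order within a phase.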

The NDBCNTR module that handles the start phase signals can optimise away
any signals not needed. The order in which modules receive the STTOR message
is the same for all phases:


In addition there is a special start phase handling controlled by NDBCNTR,
so when NDBCNTR receives its own STTOR message it starts a local start phase
handling involving the modules, DBLQH, DBDICT, DBTUP, DBACC,

This happens for phases 2 through 8. The messages sent in these start phases
are NDB_STTOR and NDB_STTORRY, they are handled in a similar manner
to STTOR and STTORRY. The modules also receive those start phases in the
same order for all phases and this order is:


For those modules that are multithreaded, the STTOR and NDB_STTOR messages
always are received by the Proxy module that executes in the main thread.
The Proxy module will then send the STTOR and NDB_STTOR messages to each
individual instance of the module (the number of instances is normally the
same as the number of threads, but could sometimes be different). It does
so in parallel, so all instances execute STTOR in parallel.
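
The fan-out and the single reply can be sketched as follows. The classes
are invented for the illustration, and since this is plain sequential
Python the instances are merely called one after another, where the real
instances run in their own threads.

```python
# Hypothetical model of a proxy fanning STTOR out to its instances.
class Instance:
    def __init__(self, number):
        self.number = number
        self.phases_seen = []

    def sttor(self, phase):
        self.phases_seen.append(phase)


class Proxy:
    def __init__(self, instances):
        self.instances = list(instances)
        self.outstanding = 0
        self.replies_sent = 0       # STTORRY signals sent back to NDBCNTR

    def sttor(self, phase):
        # One incoming STTOR becomes one STTOR per instance.
        self.outstanding = len(self.instances)
        for instance in self.instances:
            instance.sttor(phase)

    def sttorry_from_instance(self):
        # Only when the last instance has replied do we reply upwards.
        self.outstanding -= 1
        if self.outstanding == 0:
            self.replies_sent += 1
```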

So effectively each instance of a module will logically first receive
READ_CONFIG_REQ, then a set of STTOR messages for each start phase and some
modules will also receive NDB_STTOR in a certain order. All these messages
are sent in a specific order and sequentially. So this means that we have the
ability to control when things are done by performing it in the correct start

Next we will describe step-by-step what happens in a node restart (or a node
start as part of a cluster start/restart). The startup is currently a
sequential process except where it is stated that it happens in parallel.
The below description thus describes the order things actually happen


The READ_CONFIG_REQ does more or less the same for all software modules. It
allocates the memory required by the software module and initialises the
memory (creates various free lists and so forth). It also reads the various
configuration parameters which are of interest to the module (these often
affect the size of the memory we allocate).

It starts in CMVMI that allocates most of the global memory pool, next we
have NDBFS that creates the necessary file directories for disk data, it
also creates the bound IO threads that can be used by one file at a time
(initial number of threads configurable through InitialNoOfOpenFiles), then
it creates a number of free threads (number of them configurable through
DiskIOThreadPool) used by disk data files (all files used to handle disk data),
each such thread can be used to open/read/write/close a disk data file.
Finally NDBFS also creates the communication channel from the file system
threads back to the other threads.

All other modules follow the same standard, they calculate a number of sizes
based on hard coded defines or through configuration variables, they allocate
memory for those variables, finally they initialise those allocated memory

STTOR Phase 0

First STTOR phase executed is STTOR phase 0. The only modules doing anything
in this phase are NDBCNTR, which clears the file system if the start is an
initial start, and CMVMI, which creates the file system directory.

STTOR Phase 1

Next phase executed is STTOR phase 1, in this phase most modules initialise
some more data, references to neighbour modules are set up if necessary. In
addition DBDIH creates some special mutexes that ensure that only one process
is involved in certain parts of the code at a time.

NDBCNTR initialises some data related to running NDB_STTOR starting in
phase 2. CMVMI locks memory if configured to do so, after this it installs the
normal watchdog timeout since now all large memory allocations are performed.
CMVMI also starts regular memory reporting.

QMGR is the most active module in this phase. It initialises some data, it
gets the restart type (initial start or normal start) from DBDIH, it opens
communication to all nodes in the cluster, it starts checking for node
failures during the include node handling. Finally it runs the protocol to
include the new node into the heartbeat protocol. This could take a while
since the node inclusion process can only bring in one node at a time and
the protocol contains some delays.

The BACKUP module then starts the disk speed check loop which will run as
long as the node is up and running.

STTOR Phase 2

Next step is to execute STTOR phase 2. The only module that does anything in
STTOR phase 2 is NDBCNTR, it asks DIH for the restart type, it reads the nodes
from the configuration, it initialises the partial timeout variables that
control for how long to wait before we perform a partial start.

NDBCNTR sends the signal CNTR_START_REQ to the NDBCNTR in the current
master node, this signal enables the master node to delay the start of this node if
necessary due to other starting nodes or some other condition. For cluster
starts/restarts it also gives the master node the chance to ensure we wait
for enough nodes to start up before we start the nodes.

The master only accepts one node at a time that has received CNTR_START_CONF,
the next node can only receive CNTR_START_CONF after the previous starting
node has completed copying the metadata and releasing the metadata locks and
locks on DIH info, that happens below in STTOR phase 5.
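
The one-node-at-a-time rule in the master can be sketched as a simple
admission queue. The class and method names are invented, and the real
master applies more conditions than this before replying.

```python
# Hypothetical model of the master admitting one starting node at a time.
from collections import deque


class MasterNdbcntr:
    def __init__(self):
        self.starting_node = None   # node that currently holds the slot
        self.waiting = deque()      # nodes delayed behind it

    def cntr_start_req(self, node):
        """Return True if CNTR_START_CONF is sent now, False if delayed."""
        if self.starting_node is None:
            self.starting_node = node
            return True
        self.waiting.append(node)
        return False

    def metadata_copy_done(self, node):
        """Called when the starting node has copied the metadata and
        released its locks. Returns the next node to get the CONF, if any."""
        if node != self.starting_node:
            raise ValueError("node does not hold the start slot")
        self.starting_node = self.waiting.popleft() if self.waiting else None
        return self.starting_node
```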

So in a rolling restart it is quite common that the first node will get
CNTR_START_CONF and then instead get blocked on the DICT lock waiting for
an LCP to complete. The other nodes starting up in parallel will instead
wait on CNTR_START_CONF since only one node at a time can pass this.
In 7.4 a new PAUSE LCP protocol has been implemented that will minimise this
wait for an LCP to complete, this will be described in more detail in a coming

After receiving CNTR_START_CONF, NDBCNTR continues by running NDB_STTOR
phase 1. Here DBLQH initialises the node records, it starts a reporting
service. It also initialises the data about the REDO log, this also
includes initialising the REDO log on disk for all types of initial start
(can be quite time consuming unless logs are configured to be initialised
in a sparse mode).

DBDICT initialises the schema file (contains the tables that have been created
in the cluster and other metadata objects). DBTUP initialises a default value
fragment and DBTC and DBDIH initialise some data variables. After completing
the NDB_STTOR phase in NDBCNTR there is no more work done in STTOR phase 2.

STTOR Phase 3

Next step is to run the STTOR phase 3. Most modules that need the list of
nodes in the cluster read this in this phase. DBDIH reads the nodes in this
phase, DBDICT sets the restart type. Next NDBCNTR receives this phase and
starts NDB_STTOR phase 2. In this phase DBLQH sets up connections from its
operation records to the operation records in DBACC and DBTUP. This is done
in parallel for all DBLQH module instances.

DBDIH now prepares the node restart process by locking the meta data. This
means that we will wait until any ongoing meta data operation is completed
and when it is completed we will lock the meta data such that no meta data
changes can be done until we're done with the phase where we are copying the
metadata information.

The reason for locking is that all meta data and distribution info is fully
replicated. So we need to lock this information while we are copying the data
from the master node to the starting node. While we retain this lock we cannot
change meta data through meta data transactions. Before copying the meta data
later we also need to ensure no local checkpoint is running since this also
updates the distribution information. In 7.4 it is sufficient to instead
use the PAUSE LCP protocol which contains a lot less waiting.

After locking this we need to request permission to start the node from the
master node. The request for permission to start the node is handled by the
starting node sending START_PERMREQ to the master node. This could receive a
negative reply if another node is already processing a node restart, it could
fail if an initial start is required. If another node is already starting we
will wait 3 seconds and try again. This is executed in DBDIH as part of
NDB_STTOR phase 2.
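
The retry behaviour can be sketched as a loop around the request. The
function, the reply strings and the attempt limit are assumptions made for
the example (the text above only states the 3 second wait), and the send
and sleep functions are passed in so that the sketch can be run without a
cluster.

```python
# Hypothetical sketch of requesting start permission with retries.
def request_start_permission(send_start_permreq, sleep,
                             retry_delay=3.0, max_attempts=10):
    """send_start_permreq() returns "CONF", "BUSY" or "INITIAL_REQUIRED"
    (assumed reply values). max_attempts is an assumption that keeps the
    example finite."""
    for _ in range(max_attempts):
        reply = send_start_permreq()
        if reply == "CONF":
            return True
        if reply == "INITIAL_REQUIRED":
            # Retrying does not help, an initial start is needed.
            raise RuntimeError("initial start required")
        sleep(retry_delay)          # another node is starting, try later
    return False
```

In normal use one would pass time.sleep as the sleep function, a test can
pass a stub that records the requested delays instead.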

After completing the NDB_STTOR phase 2 the STTOR phase 3 continues by the
CMVMI module activating the checks of send packed data which is used by scan
and key operations to decrease signalling overhead.

Next the BACKUP module reads the configured nodes. Next the SUMA module sets
the reference to the Page Pool such that it can reuse pages from this global
memory pool, next DBTUX sets the restart type. Finally PGMAN starts a stats
loop and a cleanup loop that will run as long as the node is up and running.

We could crash the node if our node is still involved in some processes
ongoing in the master node. This is fairly normal and will simply trigger a
crash followed by a normal new start up by the angel process. The request
for permission is handled by the master sending the information to all nodes.

For initial starts the request for permission can be quite time consuming
since we have to invalidate all local checkpoints from all tables in the
meta data on all nodes. There is no parallelisation of this invalidation
process currently, so it will invalidate one table at a time. If the node
fails during this process the next time it tries to start it will be
blocked by START_PERMREQ until this invalidation process is completed.

STTOR Phase 4

After completing STTOR phase 3 we move onto STTOR phase 4. This phase starts
by DBLQH acquiring a backup record in the BACKUP module that will be used
for local checkpoint processing.

Next NDBCNTR starts NDB_STTOR phase 3. This starts also in DBLQH where we
read the configured nodes. Then we start reading the REDO log to get it
set-up (we will set this up in the background, it will be synchronised by
another part of cluster restart/node restart later described). For all types
of initial starts we will wait until the initialisation of the REDO log has
been completed before reporting this phase as completed.

Next DBDICT will read the configured nodes whereafter also DBTC reads the
configured nodes and starts transaction counters reporting. Next in
NDB_STTOR phase 3 is that DBDIH initialises restart data for initial starts.

Before completing its work in STTOR phase 4, NDBCNTR will set-up a waiting
point such that all starting nodes have reached this point before
proceeding. This is only done for cluster starts/restarts, so not for node
restarts.

The master node controls this waitpoint and will send the signal
NDB_STARTREQ to DBDIH when all nodes of the cluster restart have reached
this point. More on this signal later.

The final thing happening in STTOR phase 4 is that DBSPJ reads the configured
nodes.

STTOR Phase 5

We now move onto STTOR phase 5. The first thing done here is to run NDB_STTOR
phase 4. Only DBDIH does some work here and it only does something in node
restarts. In this case it asks the current master node to start it up by
sending the START_MEREQ signal to it.

START_MEREQ works by copying distribution information from master DBDIH node
and then also meta data information from master DBDICT. It copies one table
of distribution information at a time which makes the process a bit slow
since it includes writing the table to disk in the starting node.

The only manner to trace this event is when writing the table distribution
information per table in DBDIH in the starting node. We can trace the
reception of DICTSTARTREQ that is received in the starting node's DBDICT.

When DBDIH and DBDICT information is copied then we need to block the global
checkpoint in order to include the new node in all changes of meta data and
distribution information from now on. This is performed by sending
INCL_NODEREQ to all nodes. After this we can release the meta data lock that
was set by DBDIH already in NDB_STTOR phase 2.

After completing NDB_STTOR phase 4, NDBCNTR synchronises the start again in
the following manner:

If initial cluster start and master then create system tables
If cluster start/restart then wait for all nodes to reach this point.
After waiting for nodes in a cluster start/restart then run NDB_STTOR
phase 5 in master node (only sent to DBDIH).
If node restart then run NDB_STTOR phase 5 (only sent to DBDIH).
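
As a rough illustration, the synchronisation rules above can be written as a
small decision function. This is a sketch in Python, not the NDBCNTR code: the
function name, the start type strings and the action strings are all made up
for the example.

```python
CREATE_SYSTEM_TABLES = "create system tables"
WAIT_FOR_ALL_NODES = "wait for all nodes"
RUN_PHASE_5 = "run NDB_STTOR phase 5 in DBDIH"


def after_ndb_sttor_phase_4(start_type, is_master):
    """Return the ordered actions a node takes after NDB_STTOR phase 4.

    start_type is one of the hypothetical labels "initial_cluster_start",
    "cluster_restart" or "node_restart".
    """
    actions = []
    if start_type == "initial_cluster_start" and is_master:
        actions.append(CREATE_SYSTEM_TABLES)
    if start_type in ("initial_cluster_start", "cluster_restart"):
        # Every node in a cluster start/restart stops at the waitpoint.
        actions.append(WAIT_FOR_ALL_NODES)
        if is_master:
            # After the waitpoint only the master runs phase 5.
            actions.append(RUN_PHASE_5)
    elif start_type == "node_restart":
        actions.append(RUN_PHASE_5)
    else:
        raise ValueError("unknown start type: %r" % (start_type,))
    return actions
```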

NDB_STTOR phase 5 in DBDIH is waiting for completion of a local checkpoint
if it is a master and we are running a cluster start/restart. For node
restarts we send the signal START_COPYREQ to the starting node to ask for
copying of data to our node.


We start with explaining a number of terms used

LCP: Local checkpoint, in NDB this means that all data in main memory is
written to disk and we also write changed disk pages to disk to ensure
that all changes before a certain point is available on disk.

Execute REDO log: This means that we're reading the REDO log one REDO log
record at a time and executing the action if needed that is found in the
REDO log record.

Apply the REDO log: Synonym of execute the REDO log.

Prepare REDO log record: This is a REDO log record that contains the
information about a change in the database (insert/delete/update/write).

COMMIT REDO log record: This is a REDO log record that specifies that a
Prepare REDO log record is to be actually executed. The COMMIT REDO log
record contains a back reference to the Prepare REDO log record.

ABORT REDO log record: Similarly to the COMMIT REDO log record but here
the transaction was aborted so there is no need to apply the REDO log
record.

Database: Means in this context all the data residing in the cluster or
in the node when there is a node restart.

Off-line Database: Means that our database in our node is not on-line
and thus cannot be used for reading. This is the state of the database
after restoring a LCP, but before applying the REDO log.

Off-line Consistent database: This is a database state which is not
up-to-date with the most recent changes, but it represents an old state
in the database that existed previously. This state is achieved after
restoring an LCP and executing the REDO log.

On-line Database: This is a database state which is up-to-date, any node
that can be used to read data has its database on-line (actually
fragments are brought on-line one by one).

On-line Recoverable Database: This is an on-line database that is also
recoverable. In a node restart we reach the state on-line database first,
but we need to run an LCP before the database can also be recovered to
its current state. A recoverable database is also durable so this means
that we're adding the D in ACID to the database when we reach this state.

Node: There are API nodes, data nodes and management server nodes. A data
node is a ndbd/ndbmtd process that runs all the database logic and
contains the database data. The management server node is a process that
runs ndb_mgmd that contains the cluster configuration and also performs
a number of management services. API nodes are part of application processes
and within mysqld's. There can be more than one API node per application
process. Each API node is connected through a socket (or other
communication media) to each of the data nodes and management server nodes.
When one refers to nodes in this text it's mostly implied that we're
talking about a data node.

Node Group: A set of data nodes that all contain the same data. The number
of nodes in a node group is equal to the number of replicas we use in the
cluster.

Fragment: A part of a table that is fully stored on one node group.

Partition: Synonym of fragment.

Fragment replica: This is one fragment in one node. There can be up
to 4 replicas of a fragment (so thus a node group can have up to
4 nodes in it).

Distribution information: This is information about the partitions
(synonym of fragments) of the tables and on which nodes they reside
and information about LCPs that have been executed on each fragment
replica.

Metadata: This is the information about tables, indexes, triggers,
foreign keys, hash maps, files, log file groups, table spaces.

Dictionary information: Synonym to metadata.

LDM: Stands for Local Data Manager, these are the blocks that execute
the code that handles the data handled within one data node. It contains
blocks that handles the tuple storage, the primary key hash index, the
T-tree index, the page buffer manager, the tablespace manager, a block
that writes LCPs and a block that restores LCPs, a log manager for
disk data.


What happens as part of START_COPYREQ is the real database restore
process. Here most of the important database recovery algorithms are
executed to bring the database online again. The earlier phases were still
needed to restore the metadata, set up communication, set up memory and
bring in the starting node as a full citizen in the cluster of data
nodes.

START_COPYREQ goes through all distribution information and sends
START_FRAGREQ to the owning DBLQH module instance for each fragment replica
to be restored on the node. DBLQH will start restoring those fragment
replicas immediately; it queues the fragment replicas and restores one at a
time. This happens in two phases: first, all fragment replicas that require
restore of a local checkpoint start doing that.

After all fragment replicas to restore have been sent and we have restored all
fragments from a local checkpoint stored on our disk (or sometimes by getting
the entire fragment from an alive node) then it is time to run the disk data
UNDO log. Finally after running this UNDO log we're ready to get the fragment
replicas restored to latest disk-durable state by applying the REDO log.

DBDIH will send all required information for all fragment replicas to DBLQH
whereafter it sends START_RECREQ to DBLQH to indicate all fragment replica
information has now been sent.

START_RECREQ is sent through the DBLQH proxy module and this part is
parallelised such that all LDM instances are performing the below parts in
parallel.

If we're doing an initial node restart we don't need to restore any local
checkpoints since initial node restart means that we start without a file
system. So this means that we have to restore all data from other nodes in
the node group. In this case we start applying the copying of fragment
replicas immediately in DBLQH when we receive START_FRAGREQ. In this case
we don't need to run any Undo or Redo log since there is no local checkpoint
to restore the fragment.

When this is completed and DBDIH has reported that all fragment replicas to
start have been sent by sending START_RECREQ to DBLQH we will send
START_RECREQ to TSMAN whereafter we are done with the restore of the data.

We will specify all fragment replicas to restore as part of REDO log
execution. This is done through the signal EXEC_FRAGREQ. When all such signals
have been sent we send EXEC_SRREQ to indicate we have prepared for the next
phase of REDO log execution in DBLQH.

When all such signals are sent we have completed what is termed as phase 2
of DBLQH, the phase 1 in DBLQH is what started in NDB_STTOR phase 3 to prepare
the REDO log for reading it. So when both those phases are complete we're ready
to start what is termed phase 3 in DBLQH.

These DBLQH phases are not related to the start phases, these are internal
stages of startup in the DBLQH module.

Phase 3 in DBLQH is the reading of the REDO log and applying it on fragment
replicas restored from the local checkpoint. This is required to create a
database state which is synchronised on a specific global checkpoint. So we
first install a local checkpoint for all fragments, next we apply the REDO
log to synchronise the fragment replica with a certain global checkpoint.

Before executing the REDO log we need to calculate the start GCI and the last
GCI to apply in the REDO log by checking the limits on all fragment replicas
we will restore to the desired global checkpoint.

For each local checkpoint of a fragment replica, DBDIH has stored which
global checkpoint ranges must be run from the REDO log in order to bring it
to the state of a certain global checkpoint. This information was sent in
the START_FRAGREQ signal. DBLQH will merge all of
those limits per fragment replica to a global range of global checkpoints to
run for this LDM instance. So each fragment replica has its own GCP id range
to execute and this means that the minimum of all those start ranges and
maximum of all the end ranges is the global range of GCP ids that we need
to execute in the REDO log to bring the cluster on-line again.
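
The merge of per-replica limits is simple enough to show in a few lines. This
is a minimal sketch in Python rather than the DBLQH code; the function name
and the representation of a range as a (start GCI, last GCI) tuple are
assumptions made for the example.

```python
def merge_gci_ranges(fragment_ranges):
    """Merge per-fragment-replica GCI ranges into one range for the LDM.

    fragment_ranges is a list of (start_gci, last_gci) tuples, one per
    fragment replica. The result is the smallest range covering them all.
    """
    if not fragment_ranges:
        raise ValueError("no fragment replicas to restore")
    start_gci = min(start for start, _ in fragment_ranges)
    last_gci = max(last for _, last in fragment_ranges)
    return start_gci, last_gci
```

For three replicas with ranges (100, 120), (95, 118) and (110, 125) the merged
range to execute is (95, 125).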

The next step is to calculate the start and stop megabyte in the REDO log for
each log part by using the start and stop global checkpoint id. All the
information required to calculate this is already in memory, so it's a pure
calculation.

When we execute the REDO log we actually only apply the COMMIT records in the
correct global checkpoint range. The COMMIT record and the actual change
records are in different places in the REDO log, so for each Megabyte of
REDO log we record how far back in the REDO log we have to go to find the
change records.
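
To make the relation between COMMIT records and change records concrete, here
is a toy model in Python. It is not the NDB REDO log format: the record
classes, the use of a list index as back reference and the dictionary standing
in for a fragment are all assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass
class Prepare:
    key: str
    value: object  # None models a delete in this toy example


@dataclass
class Commit:
    gci: int
    prepare_index: int  # back reference to the Prepare record


def execute_redo_log(log, start_gci, last_gci, rows):
    """Apply only committed changes whose GCI lies in [start_gci, last_gci]."""
    for record in log:
        if not isinstance(record, Commit):
            continue  # Prepare records are only applied via their COMMIT
        if not (start_gci <= record.gci <= last_gci):
            continue
        prepare = log[record.prepare_index]  # follow the back reference
        if prepare.value is None:
            rows.pop(prepare.key, None)
        else:
            rows[prepare.key] = prepare.value
    return rows


log = [
    Prepare("a", 1),     # 0
    Prepare("b", 2),     # 1
    Prepare("c", 3),     # 2: never committed, so never applied
    Commit(10, 0),       # 3
    Commit(11, 1),       # 4
    Prepare("a", None),  # 5
    Commit(13, 5),       # 6: outside the range used below
]
restored = execute_redo_log(log, 10, 12, {})
```

In this model a long-running transaction shows up as a large distance between
a Commit and the Prepare it points back to.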

While running the REDO log we maintain a fairly large cache of the REDO log
to avoid that we have to do disk reads in those cases where the transaction
ran for a long time.

This means that long-running and large transactions can have a negative effect
on restart times.

After all log parts have completed this calculation we're now ready to start
executing the REDO log. After executing the REDO log to completion we also
write some stuff into the REDO log to indicate that any information beyond
what we used here won't be used at any later time.

We now need to wait for all other log parts to also complete execution of
their parts of the REDO log. The REDO log execution is designed such that we
can execute the REDO log in more than one phase, this is intended for cases
where we can rebuild a node from more than one live node. Currently this code
should never be used.

So the next step is to check for the new head and tail of the REDO log parts.
This is done through the same code that uses start and stop global
checkpoints to calculate this number. This phase of the code also prepares
the REDO log parts for writing new REDO log records by ensuring that the
proper REDO log files are open. It also involves some rather tricky code to
ensure that pages that have been made dirty are properly handled.


After completing restoring fragment replicas to a consistent global
checkpoint, we will now start rebuilding the ordered indexes based on the
data restored. After rebuilding the ordered indexes we are ready to send
START_RECCONF to the starting DBDIH. START_RECCONF is sent through the
DBLQH proxy, so it won't be passed onto DBDIH until all DBLQH instances
have completed this phase and responded with START_RECCONF.

At this point in the DBLQH instances we have restored a consistent but old
variant of all data in the node. There are still no ordered indexes and there
is still much work remaining to get the node synchronised with the other nodes
again. For cluster restarts it might be that the node is fully ready to go,
it's however likely that some nodes still require being synchronised with
nodes that have restored a more recent global checkpoint.

The DBDIH of the starting node will then start the take over process now
that the starting node has consistent fragment replicas. We will prepare the
starting node's DBLQH for the copying phase by sending PREPARE_COPY_FRAG_REQ
for each fragment replica we will copy over. This is a sequential process that
could be parallelised a bit.

The process to take over a fragment replica is quite involved. It starts by
sending PREPARE_COPY_FRAGREQ/CONF to the starting DBLQH, then we send
UPDATE_TOREQ/CONF to the master DBDIH to ensure we lock the fragment
information before the take over starts. After receiving confirmation of this
fragment lock, the starting node sends UPDATE_FRAG_STATEREQ/CONF to all nodes to
include the new node into all operations on the fragment.

After completing this we again send UPDATE_TOREQ/CONF to the master node to
inform of the new status and unlock the lock on the fragment information. Then
we're ready to perform the actual copying of the fragment. This is done by
sending COPY_FRAGREQ/CONF to the node that will copy the data. When this
copying is done we send COPY_ACTIVEREQ/CONF to the starting node to activate
the fragment replica.

Next we again send UPDATE_TOREQ/CONF to the master informing it that we're
about to commit the take over of the new fragment replica. Next we
commit the new fragment replica by sending UPDATE_FRAG_STATEREQ/CONF to all
nodes informing them about completion of the copying of the fragment replica.
Finally we send another update to the master node with UPDATE_TOREQ/CONF.
Now we're finally complete with copying of this fragment.

The idea with this scheme is that the first UPDATE_FRAG_STATEREQ ensures that
we're a part of all transactions on the fragment. After doing the COPY_FRAGREQ
that synchronises the starting node's fragment replica with the alive node's
fragment replica on a row by row basis, we're sure that the two fragment
replicas are entirely synchronised and we can do a new UPDATE_FRAG_STATEREQ to
ensure all nodes know that we're done with the synchronisation.
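
The signal sequence for taking over one fragment replica, as described above,
can be listed in order as follows. This is just the text restated as data in
Python; the function name and the short descriptions are mine, and the CONF
replies are left out.

```python
def take_over_signals():
    """Return (signal, receiver, purpose) for one fragment replica take over."""
    return [
        ("PREPARE_COPY_FRAGREQ", "starting DBLQH", "prepare for copying"),
        ("UPDATE_TOREQ", "master DBDIH", "lock fragment information"),
        ("UPDATE_FRAG_STATEREQ", "all nodes",
         "include starting node in all operations"),
        ("UPDATE_TOREQ", "master DBDIH", "report status and unlock"),
        ("COPY_FRAGREQ", "copying node", "copy the fragment row by row"),
        ("COPY_ACTIVEREQ", "starting node", "activate the fragment replica"),
        ("UPDATE_TOREQ", "master DBDIH", "about to commit the take over"),
        ("UPDATE_FRAG_STATEREQ", "all nodes", "copying is complete"),
        ("UPDATE_TOREQ", "master DBDIH", "take over completed"),
    ]


names = [signal for signal, _, _ in take_over_signals()]
first_state = names.index("UPDATE_FRAG_STATEREQ")
last_state = len(names) - 1 - names[::-1].index("UPDATE_FRAG_STATEREQ")
copy_step = names.index("COPY_FRAGREQ")
```

The copy step sits between the two UPDATE_FRAG_STATEREQ rounds, which is what
guarantees that the starting node sees every transaction while the rows are
being copied.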

In 7.4 this process is parallelised such that each LDM thread can work on
synchronizing one fragment.


At this point we have restored an online variant of the database by
bringing one fragment at a time online. The database is still not
recoverable since we haven't enabled logging yet and there is no local
checkpoint of the data in the starting node.

Next step is to enable REDO logging on all fragments, after completing this step
we will send END_TOREQ to the master DBDIH. At this point we will wait until a
local checkpoint is completed where this node has been involved. Finally when
the local checkpoint has been completed we will send END_TOCONF to the
starting node and then we will send START_COPYCONF and that will complete
this phase of the restart.


At this point we have managed to restore all data and we have brought it
online and now we have also executed a local checkpoint after enabling
logging and so now data in the starting node is also recoverable. So this
means that the database is now fully online again.

After completing NDB_STTOR phase 5 then all nodes that have been synchronised
in a waitpoint here are started again and NDBCNTR continues by running
phase 6 of NDB_STTOR.

In this phase DBLQH, DBDICT and DBTC set some status variables indicating
that now the start has completed (it's not fully completed yet, but all
services required for those modules to operate are completed). DBDIH also
starts global checkpoint protocol for cluster start/restarts where it has
become the master node.

Yet one more waiting point for all nodes is now done in the case of a cluster
start/restart.

The final step in STTOR phase 5 is SUMA that reads the configured nodes,
gets the node group members and if it is a node restart it asks another
node to recreate subscriptions for it.

STTOR Phase 6

We now move onto STTOR phase 6. In this phase NDBCNTR gets the node group of
the node, DBUTIL gets the systable id, prepares a set of operations for later
use and connects to TC to enable it to run key operations on behalf of other
modules later on.

STTOR Phase 7

Next we move onto STTOR phase 7. DBDICT now starts the index statistics loop
that will run as long as the node lives.

QMGR will start arbitration handling to handle a case where we are at risk of
network partitioning.

BACKUP will update the disk checkpoint speed (there is one config variable
for speed during restarts and one for normal operation, here we install the
normal operation speed). If initial start BACKUP will also create a backup
sequence through DBUTIL.

SUMA will create a sequence if it's running in a master node and it's an
initial start. SUMA will also always calculate which buckets it is
responsible to handle. Finally DBTUX will start monitoring of ordered indexes.

STTOR Phase 8

We then move onto STTOR phase 8. First thing here is to run phase 7 of
NDB_STTOR in which DBDICT enables foreign keys. Next NDBCNTR will also wait
for all nodes to come here if we're doing a cluster start/restart.

Next CMVMI will set state to STARTED and QMGR will enable communication to
all API nodes.

STTOR Phase 101

After this phase the only remaining phase is STTOR phase 101 in which SUMA
takes over responsibility of the buckets it is responsible for in the
asynchronous replication handling.

Major potential consumers of time so far:

All steps in the memory allocation (all steps of the READ_CONFIG_REQ).
CMVMI STTOR phase 1 that could lock memory. QMGR phase 1 that runs the
node inclusion protocol.

NDBCNTR STTOR phase 2 that waits for CNTR_START_REQ, DBLQH REDO log
initialisation for initial start types that happens in STTOR phase 2.
Given that only one node can be in this phase at a time, this can be
stalled by a local checkpoint wait of another node starting. So this
wait can be fairly long.

DBLQH sets up connections to DBACC and DBTUP, this is NDB_STTOR phase 2.
DBDIH in NDB_STTOR phase 2 also can wait for the meta data to be locked
and it can wait for response to START_PERMREQ.

For initial starts waiting for DBLQH to complete NDB_STTOR phase 3 where
it initialises set-up of the REDO logs. NDBCNTR for cluster start/restarts
in STTOR phase 4 after completing NDB_STTOR phase 3 have to wait for all
nodes to reach this point and then it has to wait for NDB_STARTREQ to
complete.

For node restarts we have delays in waiting for response to START_MEREQ
signal and START_COPYREQ, this is actually where most of the real work of
the restart is done. SUMA STTOR phase 5 where subscriptions are recreated
is another potential time consumer.

All waitpoints are obvious potential consumers of time. Those are mainly
located in NDBCNTR (waitpoint 5.2, 5.1 and 6).

Historical anecdotes

1) The NDB kernel run-time environment was originally designed for an
AXE virtual machine. In AXE the starts were using the module MISSRA to
drive the STTOR/STTORRY signals for the various startup phases.
The MISSRA was later merged into NDBCNTR and is a submodule of NDBCNTR
nowadays. The name of STTOR and STTORRY has some basis in the AXE systems
way of naming signals in early days but has been forgotten now. At least
the ST had something to do with Start/Restart.

2) The reason for introducing the NDB_STTOR was since we envisioned a system
where the NDB kernel was just one subsystem within the run-time environment.
So therefore we introduced separate start phases for the NDB subsystem.
Over time the need for such subsystem startup phases is no longer there,
but the software is already engineered for this and thus it's been kept in
this manner.

3) Also the responsibility for the distributed parts of the database start
is divided. QMGR is responsible for discovering when nodes are up and down.
NDBCNTR maintains the protocols for failure handling and other changes of the
node configuration. Finally DBDIH is responsible for the distributed start of
the database parts. It interacts a lot with DBLQH that has the local
responsibility of starting one node's database part as directed by DBDIH.

Local checkpoint processing in MySQL Cluster

This comment attempts to describe the processing of checkpoints as it happens
in MySQL Cluster. It also clarifies where potential bottlenecks are. This
comment is mainly intended as internal documentation of the open source code
of MySQL Cluster.

The reason for local checkpoints in MySQL Cluster is to ensure that we have a
copy of the data on disk against which the REDO log can be run to restore
the data in MySQL Cluster after a crash.

We start by introducing different restart variants in MySQL Cluster. The first
variant is a normal node restart, this means that the node has been missing
for a short time, but is now back on line again. We start by installing a
checkpointed version of all tables (including executing proper parts of the
REDO log against it). Next step is to use the replicas which are still online
to make the checkpointed version up to date. Replicas are always organised in
node groups, the most common size of a node group is two nodes. So when a
node starts up, it uses the other node in the same node group to get an
online version of the tables back online. In a normal node restart we have
first restored a somewhat old version of all tables before using the other
node to synchronize it. This means that we only need to ship the latest
version of the rows that have been updated since the node failed before the
node restart. We also have the case of initial node restarts where all data
have to be restored from the other node since the checkpoint in the starting
node is either too old to be reused or it's not there at all when a completely
new node is started up.

The third variant of restart is a so called system restart, this means that
the entire cluster is starting up after a cluster crash or after a controlled
stop of the cluster. In this restart type we first restore a checkpoint on all
nodes before running the REDO log to get the system in a consistent and
up-to-date state. If any node was restored to an older global checkpoint than
the one to restart from, then it is necessary to use the same code used in
node restarts to bring those nodes to an online state.

The system restart will restore a so called global checkpoint. A set of
transactions are grouped together into a global checkpoint, when this global
checkpoint has been completed the transactions belonging to it are safe and
will survive a cluster crash. Global checkpoints run at intervals on the
order of seconds, whereas a local checkpoint writes the entire data set to
disk and is a longer process taking at least minutes.

Before a starting node can be declared as fully restored it has to participate
in a local checkpoint. The crashing node misses a set of REDO log records
needed to restore the cluster, thus the node isn't fully restored until it can
be used to restore all data it owns in a system restart.

So when performing a rolling node restart where all nodes in the cluster are
restarted (e.g. to upgrade the software in MySQL Cluster), it makes sense to
restart a set of nodes at a time since we can only have one set of nodes
restarted at a time.

This was a bit of prerequisite to understand the need for local checkpoints.
We now move to the description of how a local checkpoint is processed.

The local checkpoint is a distributed process. It is controlled by a
software module called DBDIH (or DIH for short, DIstribution Handler).
DIH contains all the information about where various replicas of each fragment
(synonym with partition) are placed and various data on these replicas.
DIH stores distribution information in one file per table. This file is
actually two files, this is to ensure that we can do careful writing of the
file. We first write file 0, when this is completed, we write file 1,
in this manner we can easily handle any crashes while writing the table
description.

When a local checkpoint has been completed, DIH immediately starts the
process to start the next checkpoint. At least one global checkpoint has
to be completed since starting the local checkpoint before we will start a
new local checkpoint.

The first step in the next local checkpoint is to check if we're ready to
run it yet. This is performed by sending the message TCGETOPSIZEREQ to all
TC's in the cluster. This will report back the amount of REDO log information
generated by checking the information received in TC for all write
transactions. The message will be sent by the master DIH. The role of the
master is assigned to the oldest surviving data node, this makes it easy to
select a new master whenever a data node currently acting as master dies.
All nodes agree on the order of nodes entering the cluster, so the age of
a node is consistent in all nodes in the cluster.
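
Since all nodes agree on the join order, picking the master needs no voting.
A minimal sketch in Python of the rule described above, with a hypothetical
function name and node ids chosen only for the example:

```python
def select_master(join_order, alive):
    """Return the oldest surviving data node.

    join_order lists node ids in the order they entered the cluster (oldest
    first); alive is the set of node ids currently up.
    """
    for node_id in join_order:
        if node_id in alive:
            return node_id
    raise RuntimeError("no surviving data node")


join_order = [3, 1, 4, 2]
master = select_master(join_order, {1, 2, 4})            # node 3 has failed
next_master = select_master(join_order, {2, 4})          # node 1 fails too
```

Every node that runs this on the same join order and the same view of which
nodes are alive arrives at the same answer.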

When all messages have returned the REDO log write size to the master
DIH we will compare it to the config variable TimeBetweenLocalCheckpoints
(this variable is set in logarithm of size, so e.g. 25 means we wait until
2^25 words of REDO log have been created in the cluster, which is 128 MByte
of REDO log info).
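
The arithmetic behind the example value can be checked with a few lines of
Python. The 4-byte word size follows from 2^25 words being 128 MByte; the
function names are hypothetical, and summing the sizes reported by all TC's
before comparing is my reading of "created in the cluster".

```python
WORD_SIZE_BYTES = 4  # 2^25 words = 128 MByte implies 4-byte words


def lcp_threshold_bytes(time_between_local_checkpoints):
    """REDO log volume in bytes that must be generated before the next LCP."""
    return (1 << time_between_local_checkpoints) * WORD_SIZE_BYTES


def should_start_lcp(redo_words_per_tc, time_between_local_checkpoints):
    """Compare the total REDO words reported by all TC's to the threshold."""
    return sum(redo_words_per_tc) >= (1 << time_between_local_checkpoints)
```

With the value 25, lcp_threshold_bytes(25) is 134217728 bytes, that is
128 MByte.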

When sufficient amount of REDO log is generated, then we start the next local
checkpoint, the first step is to clear all TC counters, this is done by
sending TC_CLOPSIZEREQ to all TC's in the cluster.

The next step is to calculate the keep GCI (this is the oldest global
checkpoint id that needs to be retained in the REDO log). This number is very
important since it's the point where we can move the tail of the REDO log
forward. If we run out of REDO log space we will not be able to run any
writing transactions until we have started the next local checkpoint and
thereby moved the REDO log tail forward.

We calculate this number by checking for each fragment which GCI it needs in
order to be restored. We currently keep two old local checkpoints still
valid, so we won't move the GCI back to invalidate the two oldest local
checkpoints per fragment. The GCI that will be restorable after completing
this calculation is the minimum GCI found on all fragments when looping over
them.

Next we write this number and the new local checkpoint id and some other
information in the Sysfile of all nodes in the cluster. This Sysfile is the
first thing we look at when starting a restore of the cluster in a system
restart, so it's important to have this type of information correct in this
file.

When this is done we will calculate which nodes will participate in the
local checkpoint (nodes currently performing the early parts of a restart are
not part of the local checkpoint and obviously also not dead nodes).

We send the information about the starting local checkpoint to all other DIH's
in the system. We must keep all other DIH's up-to-date all the time to ensure
it is easy to continue the local checkpoint also when the master DIH crashes
or is stopped in the middle of the local checkpoint process. Each DIH records
the set of nodes participating in the local checkpoint. They also set a flag
on each replica record indicating a local checkpoint is ongoing, on each
fragment record we also set the number of replicas that are part of this local
checkpoint.

Now we have completed the preparations for the local checkpoint, it is now
time to start doing the actual checkpoint writing of the actual data. The
master DIH controls this process by sending off a LCP_FRAG_ORD for each
fragment replica that should be checkpointed. DIH can currently have 64 such
LCP_FRAG_ORD outstanding per node and 64 fragment replicas queued. Each LDM
thread can process writing of one fragment replica at a time and it can
have one request for the next fragment replica queued. It's fairly
straightforward to extend this number such that more fragment replicas can
be written in parallel and more can be queued.

LCP_FRAG_REP is sent to all DIH's when the local checkpoint for a fragment
replica is completed. When a DIH discovers that all fragment replicas of a
table have completed the local checkpoint, then it's time to write the table
description to the file system. This will record the interesting local
checkpoint information for all of the fragment replicas. There are two things
that can cause this to wait. First writing and reading of the entire table
description is something that can only happen one at a time, this mainly
happens when there is some node failure handling ongoing while the local
checkpoint is being processed.

The second thing that can block the writing of a table description is that
currently a maximum of 4 table descriptions can be written in parallel. This
could easily become a bottleneck since each file write can take on the order
of fifty milliseconds. So this means we can currently only write about 80 such
tables per second. In a system with many tables and little data this could
become a bottleneck. It should however not be a difficult bottleneck to remove.
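
The figure of 80 tables per second follows directly from the two numbers
above. A small back-of-the-envelope calculation in Python, where the function
names and the example table count of 8000 are mine and the 50 millisecond
latency is the rough estimate from the text:

```python
def table_descriptions_per_second(parallel_writes=4, write_latency_ms=50):
    """Upper bound on table descriptions written per second."""
    return parallel_writes * 1000 / write_latency_ms


def seconds_to_write(num_tables, parallel_writes=4, write_latency_ms=50):
    """Time spent writing table descriptions for num_tables tables."""
    rate = table_descriptions_per_second(parallel_writes, write_latency_ms)
    return num_tables / rate


rate = table_descriptions_per_second()   # 4 * 1000 / 50
wait = seconds_to_write(8000)            # hypothetical cluster with 8000 tables
```

So under these assumptions a cluster with 8000 small tables would spend at
least 100 seconds of each local checkpoint just writing table descriptions.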

When the master DIH has sent all requests to checkpoint all fragment replicas
it will send a special LCP_FRAG_ORD to all nodes indicating that no more
fragment replicas will be sent out.