NEO Protocol Specification

Agenda

  • Architecture
  • Network Layer
  • Operating Data
  • Start & Stop
  • Read & Write
  • Replication
  • Deadlock Avoidance
  • Maintenance
  • Storage Backends

(this document is being written: see below for its state)

This document first introduces the NEO architecture and the role of each node in a NEO cluster. It then precisely defines the communication protocol and the semantics of the workflows involved:

In Network Layer, we explain how data is exchanged between nodes: serialization format and link establishment. We do not explain there the semantics of the messages exchanged between nodes.

In Operating Data, we define the data that describe the state of the cluster, and how they are exchanged between nodes.

In Start & Stop, we detail how a cluster becomes operational and shuts down cleanly.

In Read & Write, we describe the different workflows while providing service to clients.

In Replication, we explain how we get High Availability and online backups.

Deadlock Avoidance is part of Read & Write but easier to document after Replication.

In Maintenance, we see various tasks like ZODB Pack and add/replace/remove storage nodes.

In Storage Backends, we describe the typical architecture of a possible storage backend implementation.

Notice to implementers: A NEO deployment can be seen as a single application, in that a node can trust peers to implement the same algorithm. However, failures like a node being stopped or an unpredictable network may happen at any time and such misbehaviour should be handled gracefully.

XXX The following sections are not finished:

  • Start & Stop: election
  • Read & Write: undo
  • Replication: backup
  • Deadlock Avoidance
  • Maintenance
  • Storage Backends
  • Appendix, References

Architecture

NEO is a distributed, redundant and scalable implementation of the ZODB API, which means it is a transactional NoSQL database that ZODB clients can use. More technically, NEO is a ZODB storage backend and this document assumes you have read the ZODB book.

Let us start by explaining the architecture of NEO. This section is inspired by the Linux Solutions Presentation and summarizes it with fewer technical details.

Cluster Overview

[Figure: cluster overview. Admin nodes (A1...Ai), client nodes (C1...Cj), master nodes (one primary, the others secondary), storage nodes (S1...Sk) and neoctl. Legend: ⇢ control, → data.]

NEO consists of three components: master nodes (M), storage nodes (S) and client nodes (C). Here, node means a software component which runs on a computer: it can either be a process or a set of threads inside another process. They can run in the same computer or different computers in a network.

At the level of an application accessing a NEO database, there's the client part of NEO, which is used through ZODB. Client nodes interact with the primary master node and storage nodes to perform transactions and retrieve data.

Among master nodes, one is the primary: as the arbitrator of the cluster, its tasks are to generate new identifiers for objects (OID) and transactions (TID), broadcast invalidations, control transactions, take decisions about topology changes. It does the minimum to guarantee the consistency of the database and in particular, it never sees any application data.

Other master nodes are called secondary. They are spare master nodes. One of them can take over the role of primary master if the current one fails. A master node becomes primary after an election.

Storage nodes are the only ones storing data persistently: mainly application data, i.e. objects and transactions that form the database, but also all the cluster metadata that the primary master needs to recover.

A fourth node type is optional. The administration of a NEO cluster is done via an admin node (A), which issues commands to the primary master. It comes with a command-line tool called neoctl.

Partitioning

A NEO database is split into partitions and each storage node is assigned to several partitions. These assignments form the partition table: each assignment is called a cell.

The partition an OID belongs to is found by computing OID % NP, where NP is the number of partitions. Then, the partition table is queried to know which storage node(s) contain(s) the object.
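The lookup described above can be sketched as follows. This is only an illustration: it assumes OIDs are 8-byte big-endian values (consistent with the constants listed later in this document), models the partition table as a plain list of rows, and uses hypothetical function names that are not part of the protocol.

```python
def partition_of(oid: bytes, num_partitions: int) -> int:
    """Return the partition an OID belongs to, i.e. OID % NP."""
    return int.from_bytes(oid, "big") % num_partitions


def storages_for(oid: bytes, partition_table: list) -> list:
    """Return the node ids of all cells assigned to the OID's partition."""
    row = partition_table[partition_of(oid, len(partition_table))]
    return [nid for nid, _state in row]


# Hypothetical table with NP = 3 partitions and NR = 1 replica:
# each row lists the cells (node id, cell state) of one partition.
table = [
    [(1, "UP_TO_DATE"), (2, "UP_TO_DATE")],
    [(2, "UP_TO_DATE"), (3, "UP_TO_DATE")],
    [(3, "UP_TO_DATE"), (1, "UP_TO_DATE")],
]
oid = (7).to_bytes(8, "big")
print(partition_of(oid, len(table)), storages_for(oid, table))
```

A real client would additionally filter cells by cell state and node state before choosing where to load or store data.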

Load balancing is achieved by having several storage nodes, similarly to RAID0. For high availability, each partition can also be assigned to NR+1 storage nodes, similarly to RAID1+0: NR is the number of replicas.

Each cell has a state: for example, it is not readable if it does not contain all the partition data. By also looking at node states, a client knows where to load or store data.

The primary node broadcasts all changes to the partition table to all other nodes, and it also notifies of all node state changes.

Transactions

Common ZODB implementations do database-level locking, which means that the processing of a tpc_begin must wait until any other transaction being committed has finished. In some cases, like ZEO with 2 distinct server connections, the database is locked during vote, as described in the above schema.

For better scalability, NEO implements object-level locking, so that transactions modifying disjoint sets of oids can be committed simultaneously. Concurrent tpc_begin and tpc_finish can even happen in a different order. This implies the use of a temporary TID (TTID) during the commit as a way for different nodes to refer to the transaction, and the resolution of possible deadlocks.

During a commit, modified oids are write-locked. On tpc_finish, they are also read-locked because storage nodes may not be ready yet when the clients are notified of the new transaction.

NEO also aims to efficiently handle transactions of any size, in number of bytes or number of changed OIDs. For this, the protocol is designed in such a way that the client and storage nodes can process a stream of data even on systems with limited RAM.

Split Brain

[Figure: a cluster with NR = 1 split in two by a network cut, each part having a master, a storage and a client.]

The above example of cluster is the simplest case of split brain, when 2 master nodes don't see each other, usually due to a network cut.

NEO makes sure that at most one part is operational, even if several parts have all data (NR>0) to serve clients. At most one part must be writable by clients, so that other parts are only out of date and don't diverge.

This is done by only starting operation automatically if it's safe to do so. Nothing prevents a user from forcing that if all data are there.

Network Layer

Communication between two nodes is done by exchanging messages over a streaming point-to-point protocol that supports guaranteed order and delivery of messages. Only TCP and SSL over TCP are implemented for now (over IPv4 and IPv6), but other protocols like UNIX sockets or QUIC would be possible. For TCP, TCP_NODELAY should be used.

Connection failures shall also be detected. The NEO protocol has no mechanism for that, so it has to be done at the level of the underlying protocol, e.g. by enabling TCP keep-alives.

We start by describing how these messages are serialized; their semantics will be explained in the sections describing workflows. MessagePack is used for all packets. The handshake packet is an array with the following 2 fields:

  • Magic: "NEO" (3 bytes)
  • Version: positive integer

All other packets are arrays of 3 fields, as described in the next section:

  • Message ID: positive integer
  • Message Code: 16-bit positive integer
  • Arguments: array

Messages

The message code encodes the type of the message. The following table lists the 70 different types that can be exchanged. 44 of them are requests with response packets. 1 of them is a generic response packet for error handling. The remaining 25 are notification packets.

The code (#) of a response packet is the same as the corresponding request one, with the highest order bit set. Using Python language, it translates as follows:

response_code = request_code | 0x8000

Message IDs are used to identify response packets: each node sends a request with a unique value and the peer replies using the same id as the request. Notification packets normally follow the same rule as request packets, for debugging purpose. In some complex cases where replying is done with several notification packets followed by a response one (e.g. replication), the notification packets must have the same id as the response.

Notice to implementers:

  • A 32-bit counter can be used for Message IDs, 0 being the value of the first sent message, and the value is reset to 0 after 0xffffffff.
  • On error, the implementer is free to answer with an Error packet before aborting the connection, so that the requesting node logs debugging information. We will only document other uses of Error.
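
The numbering rules above can be illustrated with a small sketch. The class and function names are hypothetical; only the wrap-around behaviour of the 32-bit counter and the `| 0x8000` rule come from this section.

```python
RESPONSE_BIT = 0x8000  # highest-order bit of the 16-bit message code


class MessageIdCounter:
    """32-bit counter: the first id is 0 and it wraps to 0 after 0xffffffff."""

    def __init__(self) -> None:
        self._next = 0

    def next_id(self) -> int:
        msg_id = self._next
        self._next = (self._next + 1) & 0xFFFFFFFF
        return msg_id


def response_code(request_code: int) -> int:
    """Code of the response packet matching a request code."""
    return request_code | RESPONSE_BIT


counter = MessageIdCounter()
ask_object = 32  # code of AskObject in the table of message types
print(counter.next_id(), hex(response_code(ask_object)))
```

The peer copies the message id of the request into its response, so the pair (id, code) is enough for the requesting node to match an answer with its pending request.
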
Message Types
# Message Description Workflow Nodes
0 Error Error is a special type of message, because this can be sent against any other message, even if such a message does not expect a reply usually.   * → *
1 RequestIdentification Request a node identification. This must be the first packet for any connection.   * ⇄ *
2 Ping Empty request used as network barrier.   * ⇄ *
3 CloseClient Tell peer that it can close the connection if it has finished with us.   * → *
4 AskPrimary Ask node identifier of the current primary master.   ctl ⇄ A
5 NotPrimaryMaster Notify peer that I'm not the primary master. Attach any extra information to help the peer joining the cluster.   SM → *
6 NotifyNodeInformation Notify information about one or more nodes.   M → *
7 AskRecovery Ask storage nodes data needed by master to recover. Reused by `neoctl print ids`.   M ⇄ S
ctl ⇄ A ⇄ M
8 AskLastIDs Ask the last OID/TID so that a master can initialize its TransactionManager. Reused by `neoctl print ids`.   M ⇄ S
ctl ⇄ A ⇄ M
9 AskPartitionTable Ask storage node the remaining data needed by master to recover.   M ⇄ S
10 SendPartitionTable Send the full partition table to admin/client/storage nodes on connection.   M → A, C, S
11 NotifyPartitionChanges Notify about changes in the partition table.   M → *
12 StartOperation Tell a storage node to start operation. Before this message, it must only communicate with the primary master.   M → S
13 StopOperation Notify that the cluster is not operational anymore. Any operation between nodes must be aborted.   M → S, C
14 AskUnfinishedTransactions Ask unfinished transactions, which will be replicated when they're finished.   S ⇄ M
15 AskLockedTransactions Ask locked transactions to replay committed transactions that haven't been unlocked.   M ⇄ S
16 AskFinalTID Return final tid if ttid has been committed, to recover from certain failures during tpc_finish.   M ⇄ S
C ⇄ M, S
17 ValidateTransaction Do replay a committed transaction that was not unlocked.   M → S
18 AskBeginTransaction Ask to begin a new transaction. This maps to `tpc_begin`.   C ⇄ M
19 FailedVote Report storage nodes for which vote failed. True is returned if it's still possible to finish the transaction.   C ⇄ M
20 AskFinishTransaction Finish a transaction. Return the TID of the committed transaction. This maps to `tpc_finish`.   C ⇄ M
21 AskLockInformation Commit a transaction. The new data is read-locked.   M ⇄ S
22 InvalidateObjects Notify about a new transaction modifying objects, invalidating client caches.   M → C
23 NotifyUnlockInformation Notify about a successfully committed transaction. The new data can be unlocked.   M → S
24 AskNewOIDs Ask new OIDs to create objects.   C ⇄ M
25 NotifyDeadlock Ask master to generate a new TTID that will be used by the client to solve a deadlock by rebasing the transaction on top of concurrent changes.   S → M → C
26 AskRebaseTransaction Rebase a transaction to solve a deadlock.   C ⇄ S
27 AskRebaseObject Rebase an object change to solve a deadlock.   C ⇄ S
28 AskStoreObject Ask to create/modify an object. This maps to `store`.   C ⇄ S
29 AbortTransaction Abort a transaction. This maps to `tpc_abort`.   C → S
C → M → S
30 AskStoreTransaction Ask to store a transaction. Implies vote.   C ⇄ S
31 AskVoteTransaction Ask to vote a transaction.   C ⇄ S
32 AskObject Ask a stored object by its OID, optionally at/before a specific tid. This maps to `load/loadBefore/loadSerial`.   C ⇄ S
33 AskTIDs Ask for TIDs between a range of offsets. The order of TIDs is descending, and the range is [first, last). This maps to `undoLog`.   C ⇄ S
34 AskTransactionInformation Ask for transaction metadata.   C ⇄ S
35 AskObjectHistory Ask history information for a given object. The order of serials is descending, and the range is [first, last]. This maps to `history`.   C ⇄ S
36 AskPartitionList Ask information about partitions.   ctl ⇄ A
37 AskNodeList Ask information about nodes.   ctl ⇄ A
38 SetNodeState Change the state of a node.   ctl ⇄ A ⇄ M
39 AddPendingNodes Mark given pending nodes as running, for future inclusion when tweaking the partition table.   ctl ⇄ A ⇄ M
40 TweakPartitionTable Ask the master to balance the partition table, optionally excluding specific nodes in anticipation of removing them.   ctl ⇄ A ⇄ M
41 SetNumReplicas Set the number of replicas.   ctl ⇄ A ⇄ M
42 SetClusterState Set the cluster state.   ctl ⇄ A ⇄ M
43 Repair Ask storage nodes to repair their databases.   ctl ⇄ A ⇄ M
44 NotifyRepair Repair is translated to this message, asking a specific storage node to repair its database.   M → S
45 NotifyClusterInformation Notify about a cluster state change.   M → *
46 AskClusterState Ask the state of the cluster   ctl ⇄ A
A ⇄ M
47 AskObjectUndoSerial Ask storage the serial where object data is when undoing given transaction, for a list of OIDs.   C ⇄ S
48 AskTIDsFrom Ask for length TIDs starting at min_tid. The order of TIDs is ascending. Used by `iterator`.   C ⇄ S
49 AskPack Request a pack at given TID.   C ⇄ M ⇄ S
50 CheckReplicas Ask the cluster to search for mismatches between replicas, metadata only, and optionally within a specific range. Reference nodes can be specified.   ctl ⇄ A ⇄ M
51 CheckPartition Ask a storage node to compare a partition with all other nodes. Like for CheckReplicas, only metadata are checked, optionally within a specific range. A reference node can be specified.   M → S
52 AskCheckTIDRange Ask some stats about a range of transactions. Used to know if there are differences between a replicating node and reference node.   S ⇄ S
53 AskCheckSerialRange Ask some stats about a range of object history. Used to know if there are differences between a replicating node and reference node.   S ⇄ S
54 NotifyPartitionCorrupted Notify that mismatches were found while check replicas for a partition.   S → M
55 NotifyReady Notify that we're ready to serve requests.   S → M
56 AskLastTransaction Ask last committed TID.   C ⇄ M
ctl ⇄ A ⇄ M
57 AskCheckCurrentSerial Check if given serial is current for the given oid, and lock it so that this state is not altered until transaction ends. This maps to `checkCurrentSerialInTransaction`.   C ⇄ S
58 NotifyTransactionFinished Notify that a transaction blocking a replication is now finished.   M → S
59 Replicate Notify a storage node to replicate partitions up to given 'tid' and from given sources.   M → S
60 NotifyReplicationDone Notify the master node that a partition has been successfully replicated from a storage to another.   S → M
61 AskFetchTransactions Ask a storage node to send all transaction data we don't have, and reply with the list of transactions we should not have.   S ⇄ S
62 AskFetchObjects Ask a storage node to send object records we don't have, and reply with the list of records we should not have.   S ⇄ S
63 AddTransaction Send metadata of a transaction to a node that does not have them.   S → S
64 AddObject Send an object record to a node that does not have it.   S → S
65 Truncate Request DB to be truncated. Also used to leave backup mode.   ctl ⇄ A ⇄ M
M ⇄ S
66 FlushLog Request all nodes to flush their logs.   ctl → A → M → *
67 AskMonitorInformation     ctl ⇄ A
68 NotifyMonitorInformation     A → A
69 NotifyUpstreamAdmin     M → A
Enum Types
  1. CellStates
    1. OUT_OF_DATE
    2. UP_TO_DATE
    3. FEEDING
    4. CORRUPTED
    5. DISCARDED
  2. ClusterStates
    1. RECOVERING
    2. VERIFYING
    3. RUNNING
    4. STOPPING
    5. STARTING_BACKUP
    6. BACKINGUP
    7. STOPPING_BACKUP
  3. ErrorCodes
    1. ACK
    2. DENIED
    3. NOT_READY
    4. OID_NOT_FOUND
    5. TID_NOT_FOUND
    6. OID_DOES_NOT_EXIST
    7. PROTOCOL_ERROR
    8. REPLICATION_ERROR
    9. CHECKING_ERROR
    10. BACKEND_NOT_IMPLEMENTED
    11. NON_READABLE_CELL
    12. READ_ONLY_ACCESS
    13. INCOMPLETE_TRANSACTION
  4. NodeStates
    1. UNKNOWN
    2. DOWN
    3. RUNNING
    4. PENDING
  5. NodeTypes
    1. MASTER
    2. STORAGE
    3. CLIENT
    4. ADMIN

Enum values are serialized using the MessagePack Extension mechanism: type is the zero-based position of the Enum type in the list above, and data is the MessagePack serialization of the zero-based Enum value (i.e. a non-negative integer). For example, NodeStates.RUNNING is encoded as \xd4\x03\x02: NodeStates is the fourth type (3) and RUNNING is its third value (2).
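
As a sketch of this encoding, the following builds the bytes by hand rather than through a MessagePack library. It assumes zero-based numbering of Enum types and values, which is inferred from the example above, and it only handles values small enough to fit a single MessagePack byte; the helper name is hypothetical.

```python
ENUM_TYPES = ["CellStates", "ClusterStates", "ErrorCodes", "NodeStates", "NodeTypes"]
NODE_STATES = ["UNKNOWN", "DOWN", "RUNNING", "PENDING"]

FIXEXT1 = 0xD4  # MessagePack "fixext 1": one type byte followed by one data byte


def encode_enum(type_index: int, value: int) -> bytes:
    """Encode an Enum value as a MessagePack extension (sketch)."""
    # An integer in 0..127 is a MessagePack "positive fixint": it is
    # serialized as the single byte holding its value.
    if not 0 <= value < 0x80:
        raise ValueError("this sketch only handles single-byte enum values")
    return bytes([FIXEXT1, type_index, value])


encoded = encode_enum(ENUM_TYPES.index("NodeStates"), NODE_STATES.index("RUNNING"))
print(encoded)
```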

Naming choice: For cell states, node states and node types, names are chosen to have unambiguous initials, which is useful to produce shorter logs or have a more compact user interface. This explains for example why RUNNING was preferred over UP.

Other Constants
INVALID_TID '\xff\xff\xff\xff\xff\xff\xff\xff'
INVALID_OID '\xff\xff\xff\xff\xff\xff\xff\xff'
INVALID_PARTITION 0xffffffff
ZERO_HASH '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
ZERO_TID '\x00\x00\x00\x00\x00\x00\x00\x00'
ZERO_OID '\x00\x00\x00\x00\x00\x00\x00\x00'
MAX_TID '\x7f\xff\xff\xff\xff\xff\xff\xff'

MAX_TID could be bigger but in the Python implementation, TIDs are stored as integers and some storage backends may have no support for values above 2⁶³-1 (e.g. SQLite).
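
The relation between MAX_TID and the 2⁶³-1 limit can be checked with a few lines. This sketch assumes TIDs are interpreted as big-endian unsigned integers; the helper name is illustrative.

```python
ZERO_TID = b"\x00" * 8
MAX_TID = b"\x7f" + b"\xff" * 7
INVALID_TID = b"\xff" * 8


def tid_to_int(tid: bytes) -> int:
    """Interpret an 8-byte TID as an unsigned big-endian integer."""
    return int.from_bytes(tid, "big")


SIGNED_64_MAX = 2**63 - 1
# MAX_TID is exactly the largest signed 64-bit integer...
print(tid_to_int(MAX_TID) == SIGNED_64_MAX)
# ...whereas INVALID_TID would not fit in a signed 64-bit column.
print(tid_to_int(INVALID_TID) > SIGNED_64_MAX)
```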

Node IDs are 32-bit integers. NID namespaces are required to prevent conflicts when the master generates new ids before it knows those of existing storage nodes. The high-order byte of node ids is one of the following values:

Storage 0x00
Master -0x10
Client -0x20
Admin -0x30

Actually, only the high-order bit is really important and the 31 other bits could be random, but the extra namespace information and the non-randomness of the 3 low-order bytes (LOB) help to read logs.
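
A possible way to build and decode such ids is sketched below. It assumes node ids are signed 32-bit integers whose 3 low-order bytes hold a counter; this layout and the function names are assumptions made for illustration, only the namespace values come from the table above.

```python
NAMESPACES = {"STORAGE": 0x00, "MASTER": -0x10, "CLIENT": -0x20, "ADMIN": -0x30}
TYPE_BY_NAMESPACE = {value: name for name, value in NAMESPACES.items()}


def make_nid(node_type: str, counter: int) -> int:
    """Combine a namespace byte with a 24-bit counter (assumed layout)."""
    if not 0 <= counter < 1 << 24:
        raise ValueError("counter must fit in 3 bytes")
    return (NAMESPACES[node_type] << 24) | counter


def node_type_of(nid: int) -> str:
    """Recover the node type from the high-order byte of a node id."""
    # Python's >> is an arithmetic shift, so negative ids keep their sign.
    return TYPE_BY_NAMESPACE[nid >> 24]


master = make_nid("MASTER", 1)
storage = make_nid("STORAGE", 5)
print(master, node_type_of(master), storage, node_type_of(storage))
```

With this layout, storage ids are non-negative and all other ids are negative, which matches the remark that only the high-order bit really matters.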

Link Establishment

[Figure: link establishment between a dialing node and a server node: underlying (e.g. TCP) connection established, handshake, then RequestIdentification answered by AcceptIdentification.]

Before two nodes can exchange messages, they first need to connect to each other by way of the underlying network.

For a given connection, one node was listening and the other dialed the address of the listening node. How the address a node listens on is configured is outside the scope of this specification. Nodes publish their addresses to the primary master node during identification (see the RequestIdentification message). Node information is then broadcast using NotifyNodeInformation.

In the context of an established link, the node that dialed the listener is called the "dialing node", and the node that was listening is called the "server node".

When the underlying connection fails, all pending incoming packets must still be processed, while any further packet that would be sent via this connection is dropped. Once connected to a peer (i.e. not while still connecting), a node can initiate a disconnection in 2 ways, depending on whether pending outgoing packets must be sent before actually closing the underlying connection. If they must, the connection is said to be aborted and the node must not process any further incoming packet from it. Aborting a connection is often used to explain to a peer why it is being disconnected (e.g. with an Error message).

Handshake

The NEO-specific handshake follows once the underlying network connection is established, to make sure nodes talk the same protocol. Handshake transmissions are not ordered with respect to each other and can go in parallel.

To avoid accidental DoS or stuck communication in case a connection is made with a non-NEO node, MessagePack should not be used to decode this first packet. Comparison should be done:

  • at the lowest level against the expected sequence of bytes;
  • as soon as bytes are received, so that the connection is rejected if the beginning does not match.

To simplify the implementation, the MessagePack encoding must be as small as possible, using bin format for the magic, so '\x92\xa3NEO\x01' for the current version.

Handshake succeeds if the comparison matches exactly. Else, the underlying network link is closed, thus cancelling link establishment.
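
The byte-level comparison described above could look like the following sketch. It reuses the handshake bytes quoted in this section verbatim and assumes the version fits in the single trailing byte; the function name and the returned labels are hypothetical.

```python
EXPECTED = b"\x92\xa3NEO\x01"  # handshake bytes as quoted in this section
MAGIC = EXPECTED[:-1]          # everything except the trailing version byte


def check_handshake(received: bytes) -> str:
    """Classify the bytes received so far, without decoding MessagePack.

    Returns "ok", "need_more", "bad_version" or "bad_magic".
    """
    n = min(len(received), len(EXPECTED))
    if received[:n] == EXPECTED[:n]:
        # Everything received so far matches: wait for the rest if needed.
        return "ok" if n == len(EXPECTED) else "need_more"
    m = min(n, len(MAGIC))
    if n > len(MAGIC) and received[:m] == MAGIC:
        return "bad_version"  # a NEO peer, but with another protocol version
    return "bad_magic"        # not a NEO peer: reject immediately


for data in (b"\x92\xa3", b"\x92\xa3NEO\x01", b"\x92\xa3NEO\x02", b"GET / HTTP/1.1"):
    print(data, check_handshake(data))
```

Calling such a function each time bytes arrive lets a node reject a non-NEO peer on the first mismatching byte instead of waiting for a full packet.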

The version is increased whenever upgrading a node may require upgrading other nodes.

Notice to implementers:

  • To avoid log flooding, a node should wait a while before retrying to establish a link, e.g. 1 second.
  • Bad SSL implementations like OpenSSL 1.0.2 do not try to reject non-SSL connections as soon as possible. MSG_PEEK can be used to check that the first byte is \x16 (SSL handshake).
  • Handshake:
    • When only the version differs, a different error should be logged.
    • Messages can be sent without waiting for the handshake phase to complete, even within the same network packet as the handshake.
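
The MSG_PEEK hint above can be sketched with the standard socket module. The function name is hypothetical and the demonstration uses a local socket pair instead of a real SSL peer.

```python
import socket

TLS_HANDSHAKE = 0x16  # first byte of an SSL/TLS handshake record


def looks_like_ssl(conn: socket.socket) -> bool:
    """Peek at the first byte of the stream without consuming it."""
    first = conn.recv(1, socket.MSG_PEEK)
    return first == bytes([TLS_HANDSHAKE])


# Demonstration: a plain NEO handshake is not mistaken for SSL,
# and the peeked byte is still available for the normal read.
a, b = socket.socketpair()
a.sendall(b"\x92\xa3NEO\x01")
print(looks_like_ssl(b))
print(b.recv(1))
a.close()
b.close()
```
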
Identification

After a successful handshake, any node except tools connecting to the admin node to send commands must identify with the server node, which either accepts or rejects it. On success, the link becomes established and further message exchange on it can happen depending on peer types, workflow and current cluster state.

RequestIdentification
node_type NodeTypes Type of the requesting node.
nid int | nil Node ID of the requesting node if one was already assigned.
address [host: bin, port: int] | nil Address the requesting node itself listens on.
name bin Name of the NEO cluster, to catch misconfiguration mistakes.
id_timestamp float | nil Time when the requesting node was identified by the master.
extra {key: bin, value: any} Extra node properties.
AcceptIdentification
node_type NodeTypes Type of the server node
nid int | nil Node ID of the server node.
your_nid int | nil Node ID assigned to requesting node if server is master, otherwise echo of nid parameter.
Identification to the Primary Master

Once the cluster name is checked, the master should check nid and address and reject the node with PROTOCOL_ERROR if they conflict with an already identified node. In particular, it should protect against misconfiguration when cloning a node. For example, address must not be the same as that of the master. The wanted nid can be ignored for all nodes except storages (a negative nid is considered temporary).

See the beginning of IdentificationHandler.requestIdentification for an example of implementation (note: the uuid variable should be renamed into nid).

Then, processing depends on node_type. Unknown values result in PROTOCOL_ERROR. The master rejects with NOT_READY if the connecting node can't be serviced for the current cluster state (an untested idea is to suspend the identification until the cluster enters into an appropriate state). If accepted:

  1. A new id_timestamp is generated for the node using a strictly increasing function. Although a counter would work, an epoch value (with a protection against date jumps in the past) is more useful because the user can easily know when a node has joined the cluster.
  2. Information about the new node is broadcast (NotifyNodeInformation) to other nodes.
  3. Optionally, storage node cloning.
  4. The node is marked as identified.
  5. AcceptIdentification is answered, immediately followed by node table and the partition table, in this order. A node knows its id_timestamp via the NotifyNodeInformation packet.
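
Step 1 of the list above asks for a strictly increasing, epoch-based id_timestamp. One way to get it is sketched below; the class name, the injectable clock and the size of the minimal step are assumptions chosen for illustration.

```python
import time


class IdTimestampGenerator:
    """Epoch-based values that stay strictly increasing even if the
    system clock jumps back in time."""

    STEP = 1e-6  # hypothetical minimal increment, in seconds

    def __init__(self, clock=time.time) -> None:
        self._clock = clock
        self._last = 0.0

    def generate(self) -> float:
        # Use the current time, unless it is not greater than the last
        # value returned, in which case move slightly past that value.
        self._last = max(self._clock(), self._last + self.STEP)
        return self._last


# Simulated clock that jumps back from 100 to 50 seconds.
fake_times = iter([100.0, 50.0, 100.0, 200.0])
generator = IdTimestampGenerator(clock=lambda: next(fake_times))
print([generator.generate() for _ in range(4)])
```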

XXX There's currently a duplicate nid issue at cluster creation when storage nodes are identified before all masters see each other and elect a primary. Do storage nodes need an id before the cluster is created? Another solution is that they treat their ids as temporary as long as the partition table is empty.

Extra Node Properties
  • Admin
    • backup: List of backup cluster names, for monitoring.
  • Client
    • backup: Backup cluster name. The client node is actually the primary master of a backup cluster.
  • Storage
    • new_nid: Storage node cloning, nid and address must be nil.
    • devpath: A list of strings, e.g. [room, machine, disk]. For maximum resiliency, cells of each partition are assigned as far as possible from each other, by checking the topology path of nodes.
Identification to Other Nodes

The general case is to only accept known nodes (from the master), with the same cluster name, and with an id_timestamp that matches. It can mismatch in 2 ways:

  • if the id_timestamp value from master is smaller, the identification can be delayed, waiting for a NotifyNodeInformation packet;
  • else, the connecting node got disconnected from the master and NOT_READY must be returned.
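
The decision described by the two cases above can be summarized in a small function. This sketch assumes the connecting node is already known from the master and that both timestamps are set; the function name and the returned labels are hypothetical.

```python
def check_id_timestamp(from_master: float, from_peer: float) -> str:
    """Decide what to do with a RequestIdentification from a known node.

    from_master: id_timestamp of the node as last notified by the master.
    from_peer: id_timestamp sent by the connecting node.
    """
    if from_master == from_peer:
        return "accept"
    if from_master < from_peer:
        # Our node table is late: wait for NotifyNodeInformation.
        return "delay"
    # The peer got disconnected from the master since it was identified.
    return "NOT_READY"


print(check_id_timestamp(10.0, 10.0))
print(check_id_timestamp(10.0, 12.5))
print(check_id_timestamp(12.5, 10.0))
```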

Admin and storage nodes also accept connections from nodes of same type in a related backup cluster. In such case, nid, address and id_timestamp are nil. For storage, the cluster name must match as usual, whereas backup admins specify their own cluster name.

There's no identification between neoctl and an admin node.

Simultaneous Connection between 2 Nodes

As part of replication, it is possible that 2 storage nodes of the same cluster connect to each other at the same time. The race is solved by comparing nid: if the peer id is greater, identification succeeds, replacing the current outgoing link establishment, else it is rejected with PROTOCOL_ERROR.

It implies that the established link is used for requests from both sides. To avoid that storages remain connected with each other forever, the CloseClient packet (no argument) can be used to notify the peer that it's free to close the connection.
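
The race resolution described above can be sketched as a single comparison, evaluated by each storage node when it receives an incoming identification while it is itself dialing the same peer. The function name and the returned labels are hypothetical.

```python
def on_incoming_while_dialing(my_nid: int, peer_nid: int) -> str:
    """Resolve a simultaneous connection between 2 storage nodes."""
    if peer_nid > my_nid:
        # Accept the incoming link and give up our own outgoing attempt.
        return "accept_and_drop_outgoing"
    return "PROTOCOL_ERROR"


# Nodes 1 and 2 dial each other at the same time: node 1 accepts the
# link dialed by node 2, and node 2 rejects the link dialed by node 1.
print(on_incoming_while_dialing(my_nid=1, peer_nid=2))
print(on_incoming_while_dialing(my_nid=2, peer_nid=1))
```

Both nodes reach the same conclusion without any extra exchange: exactly one link survives, the one dialed by the node with the greater nid.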

XXX Reusing a connection as client whereas CloseClient was already sent is unlikely to work reliably, because the peer may close at any time. The packet should be changed into AsClient(bool).

Operating Data

Besides the data actually stored in the database, nodes also maintain state that represents e.g. the current set of nodes, the partition table and the master's idea of what is currently going on in the cluster. The primary master maintains the main version of this state, makes changes to it and shares it with all its peers.

These data consist of:

  • Node table
  • Partition table
  • Cluster state

Whenever some information is shared with a node, it is sent in whole just after identification; then incremental updates are sent after each change. Note however this rule does not apply to the cluster state, but it may change in the future.

The next sections describe them.

These data can also serve to request nodes to perform actions, such as shutting down a node by telling it that it is DOWN.

Nodes

Possible states for each type:

  ADMIN CLIENT SM STORAGE
UNKNOWN notification only
DOWN    
RUNNING
PENDING      

Notification matrix:

  ADMIN CLIENT MASTER STORAGE
ADMIN (backup)
CLIENT (backup)  
SM      
STORAGE  
(e.g. clients only know the states
of master and storage nodes)

All nodes have a list of nodes in volatile memory, with following information:

  • type (NodeTypes)
  • nid
  • address
  • status (NodeStates)
  • identified
  • id_timestamp
  • extra properties...

This table is maintained by the primary master, which broadcasts it to interested nodes (see notification matrix). The same packet is used for full data upon connection to the master and incremental changes afterwards:

NotifyNodeInformation
timestamp float Strictly monotonic counter. Any node that requests identification with a greater timestamp shall be delayed.
node_list [node_type: NodeTypes, addr: [host: bin, port: int] | nil, nid: int | nil, state: NodeStates, id_timestamp: float | nil] List of nodes.

Node states describe the current status of nodes from primary master point of view. By definition, the primary master node is therefore always running and the above matrices use SM to only refer to secondary master nodes. Admin and client nodes are also either running or forgotten, so node states are somehow only useful for other types.

UNKNOWN
Not really a state: it is used to notify that a node can be forgotten.
DOWN
The node is not connected to the primary master and other nodes must not (try to) communicate with it.
RUNNING
The node is up and running normally as part of the cluster.
PENDING
The storage node is connected to the cluster but it's still unused. It is an intermediate state before adding it to the partition table.
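
How a node might maintain its node table from NotifyNodeInformation packets is sketched below. The table is modelled as a dictionary keyed by nid, which assumes every listed node has a nid; the function name and the sample addresses are hypothetical, while the field order follows the node_list description above.

```python
def apply_node_information(node_table: dict, node_list: list) -> None:
    """Update node_table (nid -> properties) from a node_list."""
    for node_type, addr, nid, state, id_timestamp in node_list:
        if state == "UNKNOWN":
            # Not really a state: the node can be forgotten.
            node_table.pop(nid, None)
        else:
            node_table[nid] = {
                "type": node_type,
                "address": addr,
                "state": state,
                "id_timestamp": id_timestamp,
            }


nodes = {}
# Full data received upon connection to the master...
apply_node_information(nodes, [
    ("STORAGE", ("10.0.0.1", 2000), 1, "RUNNING", 1700000000.0),
    ("CLIENT", None, -0x20000000 + 1, "RUNNING", 1700000001.0),
])
# ...followed by an incremental change: the client is forgotten.
apply_node_information(nodes, [("CLIENT", None, -0x20000000 + 1, "UNKNOWN", None)])
print(sorted(nodes))
```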

If the primary master node fails, all nodes must be able to find the newly elected one, which explains why any change to the list of master nodes is broadcast to all nodes without exception.

For the monitoring of backup clusters, their admin nodes must be able to connect to the admin of the main cluster. Information about the latter is propagated via backup master nodes (seen as clients). Hence (backup) in the notification matrix, using the backup extra node property as described in the Link Establishment section.

Apart from that, the following changes are considered:

  • The current implementation of the admin node is minimal and we don't know yet if notifying admins about the state of other admins in the same cluster would be useful.
  • The PENDING state may be removed. There's at least the following minor issue: a RUNNING storage node without any assigned partition is restarted as PENDING.

Partition Table

  • Partition Table ID
  • Number of replicas
  • Table: NP lists of cells, from partition 0 to NP-1

Cell States:

[Figure: cell state transitions between DISCARDED, OUT_OF_DATE, UP_TO_DATE, FEEDING and CORRUPTED. Edge labels: replicated, node lost, tweak, check.]
Cell
Node ID, Cell State
Operational
Means that for every partition, at least one cell is readable and the associated node is running.
Messages

Full table:

AskPartitionTable
(no parameter)
AnswerPartitionTable
ptid int | nil Partition Table ID.
num_replicas int Number of replicas.
row_list [[(nid: int, state: CellStates)]] NP lists of cells, from partition 0 to NP-1.
SendPartitionTable
same as AnswerPartitionTable

Updates:

NotifyPartitionChanges
ptid int Partition Table ID.
num_replicas int Number of replicas.
cell_list [(partition: int, nid: int, state: CellStates)] List of cells.
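
Applying a NotifyPartitionChanges packet to a local copy of the table can be sketched as follows. The in-memory layout (a dictionary holding ptid, num_replicas and one nid-to-state mapping per partition) and the function name are assumptions; no validation of ptid is attempted here.

```python
def apply_partition_changes(pt: dict, ptid: int, num_replicas: int,
                            cell_list: list) -> None:
    """Apply incremental changes to a partition table (sketch)."""
    pt["ptid"] = ptid
    pt["num_replicas"] = num_replicas
    for partition, nid, state in cell_list:
        cells = pt["rows"][partition]
        if state == "DISCARDED":
            # Only used in packets: the cell is dropped from the table.
            cells.pop(nid, None)
        else:
            cells[nid] = state


# Hypothetical table with NP = 2, as received via SendPartitionTable.
pt = {
    "ptid": 1,
    "num_replicas": 0,
    "rows": [{1: "UP_TO_DATE"}, {1: "UP_TO_DATE"}],
}
# Partition 1 moves from node 1 to node 2.
apply_partition_changes(pt, 2, 0, [(1, 1, "FEEDING"), (1, 2, "OUT_OF_DATE")])
apply_partition_changes(pt, 3, 0, [(1, 1, "DISCARDED"), (1, 2, "UP_TO_DATE")])
print(pt)
```
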
Cell States
OUT_OF_DATE
Write-only cell. Last transactions are missing because storage is/was down for a while, or because it is new for the partition. It usually becomes UP_TO_DATE when replication is done.
UP_TO_DATE
Normal state: cell is writable/readable, and it isn't planned to drop it.
FEEDING
Same as UP_TO_DATE, except that it will be discarded as soon as another node finishes replicating it. It means a partition is being moved from 1 node to another. It is also discarded immediately if out-of-date.
CORRUPTED
A check revealed that data differs from other replicas. Cell is neither readable nor writable.
DISCARDED
Not really a state: only used in network packets to tell storages to drop partitions.

UP_TO_DATE is the initial cell state when the database is created. Then, i.e. after a tweak, new cells always start in OUT_OF_DATE state.

Cell States readable/writable:

            OUT_OF_DATE  UP_TO_DATE  FEEDING  CORRUPTED  DISCARDED
readable    no           yes         yes      no         no
writable    yes          yes         yes      no         no

The partition table is said to be operational when there's at least 1 readable cell for every partition. When it isn't, the master isn't authoritative about it anymore and therefore doesn't send it to storage nodes: we'll see later that this is one of the reasons causing the master to go back to the RECOVERING cluster state. The only case in which a storage node stores a PT that isn't operational is when a cell of a backup cluster is CORRUPTED (after comparison against an upstream cluster). The master can always notify admin nodes, so that the user is informed about what it knows.
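
The operational check can be sketched as follows, combining the readable-cell condition with the requirement that the associated node is running. The set of readable states is derived from the cell state descriptions above, treating an empty table as non-operational is an assumption, and the function name is hypothetical.

```python
READABLE_STATES = {"UP_TO_DATE", "FEEDING"}  # per the cell state descriptions


def is_operational(rows: list, node_states: dict) -> bool:
    """True if every partition has a readable cell on a running node."""
    if not rows:
        return False  # assumption: an empty table is not operational
    return all(
        any(
            state in READABLE_STATES and node_states.get(nid) == "RUNNING"
            for nid, state in cells
        )
        for cells in rows
    )


rows = [
    [(1, "UP_TO_DATE"), (2, "OUT_OF_DATE")],
    [(2, "FEEDING")],
]
print(is_operational(rows, {1: "RUNNING", 2: "RUNNING"}))
print(is_operational(rows, {1: "DOWN", 2: "RUNNING"}))
```

In the second call, partition 0 has no readable cell left on a running node, so the table is no longer operational.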

Cluster State

Allowed states for each type:

  CLIENT STORAGE other
RUNNING/BACKINGUP
STOPPING_BACKUP   [1]
other   [2]
  1. Only nodes that are already identified before switching to this state.
  2. Cloning a storage node is forbidden in RECOVERING state.

Cluster States:

[Figure: cluster state transitions between RECOVERING, VERIFYING, RUNNING, STOPPING, STARTING_BACKUP, BACKINGUP and STOPPING_BACKUP.]

The primary master indicates to other nodes its idea about what is currently going on with the cluster via the cluster state value.

NotifyClusterInformation
state ClusterStates Cluster state.

It is only sent on state change, not after identification. Admin nodes always want to know the state and therefore reuse the following ctl ⇄ A packet:

AskClusterState
(no parameter)
AnswerClusterState
same as NotifyClusterInformation

During identification, the master must reject with NOT_READY nodes that are not allowed to join the cluster in the current state. And reciprocally, on state change, it must disconnect nodes that aren't allowed in the new state.

Before going back to RECOVERING:

  • StopOperation (no parameter) is sent to identified storage/client nodes (for clients, it's done just before disconnecting them). XXX This is probably redundant with NotifyClusterInformation.
  • Identified storage nodes are changed to PENDING state.

Start & Stop

The main task of a NEO cluster is to provide read/write service to clients. However, before this service can be provided, the cluster has to go through several startup states whose purpose is to make sure the cluster is indeed ready.

These cluster states are detailed in the next sections. Most logic is located inside the master node. We'll end by detailing how the cluster can be properly shut down, and how the storage nodes must behave depending on the state of the cluster.

RECOVERING

[Diagram: RECOVERING. On the master, election and recovery run in parallel with network I/O. Recovery, between master (M) and each identified PENDING storage node (S): loop of AskRecovery / AnswerRecovery and AskPartitionTable / AnswerPartitionTable; the loop breaks when startup is allowed, unless truncation is required (Truncate). Node table changes: all storage nodes that will immediately serve cells must be set RUNNING; all unidentified master nodes must be, if necessary, disconnected and set DOWN. Partition table changes: if a PT exists, use it and outdate cells if necessary; otherwise create a new one. Then switch to VERIFYING.]

The cluster is initially in the RECOVERING state, and it goes back to this state whenever the partition table becomes non-operational again. In parallel with recovering its own data by reading it from storage nodes, an election of the primary master always happens.

Both processes end if the master is still primary and allowed to become operational, in which case the node and partition tables are updated (which implies broadcast of changes to other nodes) and the cluster switches to VERIFYING. Reciprocally, the master will have to go back to RECOVERING for any of the following reasons:

  • the partition table is not operational anymore
  • the user requests to truncate the database
  • a storage is disconnected whereas truncation is not complete

The changes in the node table also mean that whenever the cluster is not in RECOVERING state, a newly-identified storage node is set RUNNING if it serves at least 1 cell, PENDING otherwise.

Truncate
tid bin All data after the given TID will have to be deleted.
AskRecovery
(no parameter)
AnswerRecovery
ptid int | nil Partition Table ID.
backup_tid bin | nil If not nil, the cluster is replicating another cluster and the storage node has all data up to the given TID.
truncate_tid bin | nil All data after the given TID shall be deleted.

Storage nodes may have different versions of the partition table (it means that some nodes were down the last time that the cluster was operational). The master must retrieve it from any node having the last one, and take its truncate_tid, as well as its backup_tid to know if the cluster is in backup mode. However, the master must track the backup_tid of each cell, which is known from the backup_tid value returned by storage nodes.

The conditions to start up the cluster are complex because we want to avoid a split brain without requiring the user to force unnecessarily. There are 2 modes of evaluation:

  • strict: the master only starts if all checks pass;
  • non-strict: some checks can be skipped,
    • either if the user requested to force startup, but such request should be denied if startup would not be possible immediately;
    • or if the master has always played the primary role since the last time the cluster was operational, provided the DB does not have to be truncated.

Conditions for ending recovery:

  • If partition table exists:
    1. The master starts from the list of nodes that have at least 1 readable cell and:
      • if non-strict: unidentified nodes are filtered out
      • else, if the list is not empty:
        • if it is going to truncate: the list is replaced by that of nodes with an assigned cell (to avoid that an offline node comes back with a different history)
        • all nodes in the list must be identified
    2. The list must not be empty and the master must not be waiting for an answer from any node in the list
    3. If truncation is required, all nodes in the list must have stored persistently a truncate_tid value that is not greater than the target one: this is fixed by sending Truncate+AskRecovery messages, which prevents startup because the master is waiting again for answers. Truncation is effective during the VERIFYING phase.
    4. Nodes that will immediately serve cells are those that are identified and with at least 1 assigned cell.
  • Else, if non-strict, the master takes the list of identified storage nodes: like when a PT exists, startup is allowed unless the list is empty or the master is waiting for an answer from any node in the list. Cells will be assigned to nodes in this list.
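
The first 2 conditions of the case where a partition table exists can be sketched as below. This is only an illustration under assumptions: the function name and its arguments are hypothetical, nodes are represented by sets of node ids, and step 3 (persisted truncate_tid) is left out.

```python
def nodes_allowing_startup(readable_nodes, assigned_nodes, identified,
                           pending, strict, truncating):
    """Return the node list if recovery may end, else None (PT exists).

    Hypothetical arguments, all sets of node ids except the 2 flags:
    readable_nodes - nodes with at least 1 readable cell
    assigned_nodes - nodes with at least 1 assigned cell
    identified     - nodes currently identified
    pending        - nodes the master is still waiting an answer from
    """
    nodes = set(readable_nodes)
    if not strict:
        # Step 1, non-strict: unidentified nodes are filtered out.
        nodes &= identified
    elif nodes:
        if truncating:
            # Step 1, strict with truncation: consider every assigned node.
            nodes = set(assigned_nodes)
        if not nodes <= identified:
            return None  # a required node is not identified
    # Step 2: non-empty list, and no answer awaited from any listed node.
    if not nodes or nodes & pending:
        return None
    return nodes
```

For example, with readable nodes {1, 2} and only node 1 identified, strict evaluation refuses startup whereas non-strict evaluation allows it with {1}.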
Election

Election is the workflow responsible for selecting only one node among the potentially many master nodes of the cluster. The selected master node becomes known as the primary master. Once the primary master is elected, it becomes responsible for driving the cluster in every aspect where the master is documented to be involved.

TODO

VERIFYING

[Diagram: VERIFYING, between master (M) and storage nodes (S). Replay of tpc_finish (if not in backup mode): AskLockedTransactions / AnswerLockedTransactions until all nodes have replied, AskFinalTID / AnswerFinalTID until all needed TIDs are known, then ValidateTransaction. Next: AskLastIDs (the storage node truncates here if previously requested with Truncate) / AnswerLastIDs. VERIFYING ends when all nodes have replied.]

In VERIFYING state:

  1. The data are verified, which only consists in replaying transactions that were partially finished, in case the cluster was not cleanly shut down. Obviously, there's nothing to do here in backup mode because there can't be any commit.
  2. The master asks all (RUNNING) storage nodes the greatest TID&OID that exist in the database. On storage's side, this first triggers database truncation, if a request to do so was previously issued.
  3. The master assumes that the database is now truncated and picks the greatest TID&OID of all returned values: it is now able to generate new TIDs&OIDs.
  4. At last, it switches to STARTING_BACKUP state if in backup mode, or to RUNNING otherwise.

Verifying data starts by gathering all transactions that may have been partially finished. It's safe to query outdated cells from nodes with readable cells. For other nodes, it's more complicated:

  1. pt: U|U ltid: 10
  2. S1: restart with voted ttid=13
    S2: stop with locked ttid=13
  3. pt: U|O ltid: 10
  4. verification drops ttid=13 because it's not locked
  5. new commits -> ltid: 20
  6. S1 restarted, S2 started
  7. ttid=13 must be dropped

And we can't ignore ttid < last tid for all nodes, even if the master serializes unlock notifications:

  1. pt: U.|.U ltid: 15
  2. unlock ttid=18 to S1
  3. unlock ttid=20 to S2
  4. S1 stopped before unlocking ttid=18
  5. S2 unlocks ttid=20
  6. back to recovery, S1 started
  7. verification must validate ttid=18

So for nodes without any readable cell, and only for them, one only checks if they have locked transactions. Replication will do the rest. Nodes are queried in parallel with:

AskLockedTransactions
(no parameter)
AnswerLockedTransactions
tid_dict {ttid: bin, tid: bin | nil} Voted transactions, locked or not.

From storage answers, the master establishes 2 mappings:

  • voted_dict, mapping TTID to the list of nodes that returned it and have at least 1 readable cell, and only TTIDs with such nodes
  • locked_dict, mapping TTID to TID

Only nodes that are assigned to the cell containing transaction metadata store whether a voted transaction is locked or not. If these nodes have unlocked the transaction and others were interrupted before doing so, locked_dict is incomplete. The TID can be retrieved with:

AskFinalTID
ttid bin TTID.
AnswerFinalTID
tid bin | nil TID.

This has to be done for all TIDs in voted_dict but not locked_dict, and the master must only query readable cells (if an outdated cell had unlocked ttid, then either it is already in locked_dict or a readable cell also unlocked it). For each TID, the master stops asking after the first positive result. No result means that the transaction is not locked: no need to tell nodes to delete it, since they drop any unfinished data just before being operational.

At this point, the master knows all transactions for which tpc_finish was called but not fully processed, and this may include replicas with transactions that were not even locked. It finishes them by sending ValidateTransaction notifications for every TTID in locked_dict, to every node in voted_dict.

ValidateTransaction
ttid bin TTID.
tid bin TID.

The processing of this packet is identical to what's done by storage backends when the transaction is locked and unlocked (see tpc_finish). A single underlying commit should be done, at the end.
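
The replay of partially finished transactions can be sketched as follows. This is a simplified illustration, not the reference implementation: the function names are hypothetical, ask_final_tid stands for the AskFinalTID round-trip to readable cells, and it is assumed that "every node in voted_dict" means the nodes listed for the TTID being validated.

```python
def build_verification_dicts(answers, readable_nodes):
    """Build (voted_dict, locked_dict) from AnswerLockedTransactions.

    answers: {nid: {ttid: tid or None}}
    readable_nodes: set of nids having at least 1 readable cell
    """
    voted_dict, locked_dict = {}, {}
    for nid, tid_dict in answers.items():
        for ttid, tid in tid_dict.items():
            if tid is not None:
                locked_dict[ttid] = tid       # locked info from any node
            if nid in readable_nodes:
                voted_dict.setdefault(ttid, []).append(nid)
    return voted_dict, locked_dict

def validations(voted_dict, locked_dict, ask_final_tid):
    """Return the [(nid, ttid, tid)] ValidateTransaction notifications.

    ask_final_tid(ttid) returns the final TID, or None if not locked.
    """
    locked = dict(locked_dict)
    for ttid in voted_dict:
        if ttid not in locked:
            tid = ask_final_tid(ttid)
            if tid is not None:
                locked[ttid] = tid
    return [(nid, ttid, tid)
            for ttid, tid in sorted(locked.items())
            for nid in voted_dict.get(ttid, ())]
```

A TTID that is locked only on nodes without readable cells produces no notification here: as stated above, replication does the rest for those nodes.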

For the next step, all nodes are queried, in parallel:

AskLastIDs
(no parameter)
AnswerLastIDs
loid bin | nil Greatest OID stored by the storage node.
ltid bin | nil Greatest TID stored by the storage node.
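
Picking the greatest OID and TID from the answers can be sketched as below. The function name is hypothetical, and the sketch assumes that OIDs and TIDs are fixed-size (8 bytes) big-endian strings, as in ZODB, so that comparing bytes is the same as comparing numbers.

```python
def greatest_ids(answers):
    """Return (loid, ltid), the greatest values over all AnswerLastIDs.

    answers: list of (loid, ltid) pairs, each item being bytes or None.
    A None result means that no storage node returned a value.
    """
    loids = [loid for loid, _ in answers if loid is not None]
    ltids = [ltid for _, ltid in answers if ltid is not None]
    return max(loids, default=None), max(ltids, default=None)
```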

RUNNING

The cluster is in RUNNING state when it has fully recovered, which means the database is read-writable by client nodes.

In order to notify RUNNING storage nodes that they can serve clients, the master sends StartOperation, either when the cluster enters RUNNING state for nodes that are already RUNNING, or whenever a node becomes RUNNING.

StartOperation
backup bool False

XXX By reworking how the cluster state is broadcasted to other nodes, it should be possible to get rid of this packet: a storage node being notified that it is RUNNING and that the cluster is RUNNING could start operation immediately. In some way, like clients, which can only join a RUNNING cluster and work with storage nodes as soon as they're RUNNING.

A storage node notifies the master it is ready with NotifyReady (no parameter). Then it accepts connections from clients.

STOPPING

The main purpose of the STOPPING state is to provide a way to shut down the whole cluster cleanly, in order to prevent transactions from being interrupted during the second phase.

If the cluster was RUNNING, the master must wait until all transactions are either aborted or finished, and block all requests to start new transactions. Clients must abort transactions that are still in the first phase.

If the cluster is no longer operational or if there isn't any transaction left:

  1. The master stops accepting new connections.
  2. The master asks each identified storage node to shut down by notifying with NotifyNodeInformation that the latter is DOWN.
  3. Connections to nodes are aborted.
  4. The master exits when it is no longer connected to any other node.

XXX The reference implementation is broken when the cluster was not RUNNING so the above specifications are subject to change.

Joining a Cluster

A node first needs to establish a link with the primary master. This is done by trying all known master nodes in a loop. Identifying against a secondary master results in a NotPrimaryMaster notification (instead of an AcceptIdentification reply), so at most 2 attempts are usually enough.

Storage Node:

Once identified, the behaviour of a storage node depends on the cluster state. Via NotifyNodeInformation, it is told whether it is PENDING or RUNNING. As long as it is not explicitly asked to start operation, it is in a special state where it must be able to:

  • provide information for the master to recover (RECOVERING)
  • receive a partition table (any state except RECOVERING)
  • perform VERIFYING operations

Client Node:

For a client node, identification is followed by AskLastTransaction (no parameter). AnswerLastTransaction only contains the TID of the last committed transaction: if different from a previous session, caches must be invalidated:

  • if new TID is bigger (common case), all current records (i.e. no next_tid) must be removed from NEO cache
  • else the database was truncated (rare case) and the whole NEO cache should be emptied

In either case, the ZODB Connection caches must be invalidated in whole (invalidateCache). InvalidateObjects notifications must be ignored as long as AnswerLastTransaction is not processed.
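
The cache rules above can be sketched as follows. This is a minimal illustration: the function name and the cache layout ({(oid, tid): (data, next_tid)}) are hypothetical, TIDs are assumed to be 8-byte strings comparable as bytes, and a client without any previous session is assumed to have nothing to invalidate.

```python
def invalidate_neo_cache(cache, previous_ltid, new_ltid):
    """Update the NEO cache in place after AnswerLastTransaction.

    cache: {(oid, tid): (data, next_tid)}, next_tid being None for
    current records. Returns True when the ZODB Connection caches
    must be invalidated in whole as well.
    """
    if previous_ltid is None or new_ltid == previous_ltid:
        return False
    if new_ltid > previous_ltid:
        # Common case: current records may be outdated, drop them.
        current = [key for key, (_, next_tid) in cache.items()
                   if next_tid is None]
        for key in current:
            del cache[key]
    else:
        # Rare case: the database was truncated.
        cache.clear()
    return True
```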

Read & Write

As soon as the cluster enters RUNNING state, the master must be ready to process requests to commit new transactions, and same for storage nodes that are ready to serve clients. When not operational anymore, all ongoing transactions must be forgotten.

Whereas previous sections describe NEO internals, we'll see workflows that correspond to user-visible service.

ZODB is more or less a key-value database. For some specific ZODB & NEO features, a few metadata are associated with records. For a storage node, a record consists of:

  • OID
  • TID (aka serial)
  • data, optionally compressed by the client node
  • compression: 0 if uncompressed, 1 if zlib
  • checksum: SHA1 of data, after compression if compressed
  • data serial: TID of original record in case of an undo

The undo of an object creation is encoded with (compression=0, data='', checksum='\0..\0').
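
As an illustration of the fields above, here is a small sketch that builds a record. The Record type and both helper functions are hypothetical names; the sketch also assumes that the null checksum has the length of a SHA1 digest (20 bytes) and that the undo of an object creation carries no data serial.

```python
import hashlib
import zlib
from typing import NamedTuple, Optional

class Record(NamedTuple):
    oid: bytes
    tid: bytes                    # aka serial
    compression: int              # 0 if uncompressed, 1 if zlib
    checksum: bytes               # SHA1 of data as stored
    data: bytes
    data_serial: Optional[bytes]  # TID of original record if undo

def make_record(oid, tid, data, compress=True, data_serial=None):
    """Build a record, the checksum being computed after compression."""
    payload = zlib.compress(data) if compress else data
    checksum = hashlib.sha1(payload).digest()
    return Record(oid, tid, int(compress), checksum, payload, data_serial)

def make_creation_undo(oid, tid):
    """Encode the undo of an object creation (assumed 20 null bytes)."""
    return Record(oid, tid, 0, b"\0" * 20, b"", None)
```

A real client would probably keep the uncompressed form when compression does not reduce the size; this sketch always compresses when asked to.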

Read

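[Diagram: read, between master (M), client (C) and 2 storage nodes (S) with readable cells. M sends NotifyPartitionChanges | NotifyNodeInformation to C. C sends GetObject to a storage node, which retrieves data and replies with AnswerGetObject. Possible error handling on race condition: Ping / Pong between C and M, then GetObject / AnswerGetObject again.]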

In order to load an object, a client node first consults its partition & node tables to get a list of storage nodes that should currently have corresponding data. Then it queries one node from that list:

AskObject
oid bin OID of object to load.
at bin | nil same as serial in IStorage.loadSerial
before bin | nil same as tid in IStorage.loadBefore
AnswerObject
oid bin Same as in request. XXX should be removed
serial bin TID of returned record
next_serial bin | nil TID of next record, nil if there's none
compression uint See Read & Write section.
checksum bin
data bin
data_serial bin | nil

At least one of at and before must be nil. Possible error codes are:

  • OID_NOT_FOUND: OID exists but not at given at/before
  • OID_DOES_NOT_EXIST: OID does not exist
  • NON_READABLE_CELL

The latter error code is returned when the storage node is no longer assigned to store the partition containing the requested data, or if the cell is not readable anymore. Such case happens due to distributed nature of the cluster, and here it means that messages from the master about changes in the partition table are sent but not yet delivered to the client. For any inconsistency between its tables and what's actually returned by the storage node, the client must use a Ping request to the master as a network barrier, to update its tables and retry.

A request may also simply fail because the storage node becomes unavailable. The client node should retry if there's any other readable cell.

Notice to implementers:

  • The design of ZODB is such that a client node should have a cache. Therefore, a request from the application (or, as we'll see later, from conflict resolution) to load an object does not always result in querying a storage node.
  • To choose a node to query: picking a random one is a trivial and quite efficient way to distribute the load evenly. Whether a node is connected or not should be ignored, otherwise the client always loads from the same nodes until they write.
  • To avoid looping forever in case of failures, a client can limit to 1 failure per node. NON_READABLE_CELL would not be counted as failure if there are incoming pending updates from the master.
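
The node selection and retry policy suggested in these notes can be sketched as follows. All names are hypothetical, query stands for a request such as AskObject sent to one node, and the sketch deliberately leaves out the Ping barrier and the refresh of the partition table.

```python
import random

class NonReadableCell(Exception):
    """Hypothetical exception standing for the NON_READABLE_CELL error."""

def load_from_any(readable_nodes, query, rng=random):
    """Query nodes in random order, allowing at most 1 failure per node.

    readable_nodes: node ids having a readable cell for the data
    query(nid): returns the answer, or raises on failure
    """
    candidates = list(readable_nodes)
    rng.shuffle(candidates)  # spreads the load evenly across nodes
    for nid in candidates:
        try:
            return query(nid)
        except (ConnectionError, NonReadableCell):
            continue  # this node has used its single allowed failure
    raise ConnectionError("no readable cell could be reached")
```

Shuffling the whole list once gives both the random choice and the bound of 1 failure per node, since a node is never tried twice.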

The above way of querying storage nodes for read applies to all requests that consist in reading from 1 node, e.g. AskFinalTID, AskTransactionInformation, etc.

XXX The NEO reference implementation of IStorageUndoable.undoLog is broken so the current protocol for this API is likely to change.

XXX next_serial is likely to be removed from the reply in the future, because it's often expensive for the storage node to return it (e.g. 2 SQL queries for SQL backends) and the client node should be able to do without it.

XXX(kirr) The Ping trick makes many implicit assumptions about e.g. Master being single threaded and other implicit orderings that are true for current NEO/py implementation, but are generally not true if implementation is not made serial. If any ordering is required for correct working the protocol must specify it explicitly.

Commit

[Diagram: commit, between master (M), client (C) and 2 storage nodes (S) with writable cells. tpc_begin: BeginTransaction / AnswerBeginTransaction. commit: StoreObject | CheckCurrentSerial / AnswerStoreObject | AnswerCheckCurrentSerial. tpc_vote: StoreTransaction | VoteTransaction / AnswerStoreTransaction | AnswerVoteTransaction, then FailedVote / ACK | INCOMPLETE_TRANSACTION. tpc_finish: FinishTransaction, LockInformation / AnswerInformationLocked, AnswerFinishTransaction, UnlockInformation.]

Commit is split into phases in accordance with ZODB Two-Phase Commit Protocol. In this section, we describe the basic case, when there's no concurrency with other client nodes. In the next sections, we'll detail the extra mechanisms to solve conflicts or even deadlocks, as well as how commit interacts with synchronous replication.

tpc_begin

A client node first asks the master to begin transaction with:

AskBeginTransaction
tid bin | nil nil for IStorage.tpc_begin, or IStorageRestoreable.tpc_begin argument.
AnswerBeginTransaction
ttid bin Temporary TID (same as tid if not nil).

Because the master needs to know which storage nodes are expected to have all data from the client, it must wait until every storage node that was requested to start operation is either ready (NotifyReady) or not RUNNING anymore before processing any AskBeginTransaction packet. Then, the master records the list of ready nodes at the time the transaction is started, generates a new TID, and returns it.

The master must use a single TID generator for both temporary and final TIDs. Any newly-generated TID must be strictly greater than the previous one. The partition containing transaction metadata is computed with TID % NP, similarly to OIDs. Because these metadata are stored before the final TID is known, the latter must have the same modulo as the TTID. Here is an algorithm (the first 2 points are identical to any ZODB implementation):

  1. convert current time to TID
  2. if the previously generated TID (let's name it min_tid) is greater than or equal to it, take min_tid + 1
  3. stop here for a TTID
  4. TID += TTID % NP - TID % NP
  5. if still less than or equal to min_tid: TID += NP
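
The algorithm above can be sketched as follows, with TIDs handled as plain integers. The class and method names are hypothetical, and the conversion of the current time to a TID (step 1) is left to the caller, which passes the result as now_tid.

```python
class TIDGenerator:
    """Single generator for both temporary and final TIDs (sketch)."""

    def __init__(self, num_partitions, last_tid=0):
        self.np = num_partitions
        self.last_tid = last_tid  # min_tid in the text

    def _next(self, now_tid):
        # Steps 1-2: strictly greater than the previous TID.
        return now_tid if now_tid > self.last_tid else self.last_tid + 1

    def new_ttid(self, now_tid):
        # Step 3: a TTID needs no further adjustment.
        self.last_tid = self._next(now_tid)
        return self.last_tid

    def new_final_tid(self, now_tid, ttid):
        tid = self._next(now_tid)
        # Step 4: give the final TID the same modulo as the TTID.
        tid += ttid % self.np - tid % self.np
        # Step 5: the adjustment may have moved it back too far.
        if tid <= self.last_tid:
            tid += self.np
        self.last_tid = tid
        return tid
```

For example, with NP = 4 and a last TID of 100, a clock that is behind gives TTID 101; the final TID generated right after is 105, which is greater than 101 and has the same remainder modulo 4.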

commit

XXX document askNewOIDs (but where?)

During this phase, the new data is sent to storage nodes, taking write-locks for each modified object. On storage side, the code giving or rejecting write-locks can be seen as the core of NEO, because it's where many features interact with each other. For a better explanation, we will also describe internals of the reference implementation.

IStorage.store maps to:

AskStoreObject
oid bin OID of object to store.
serial bin Base TID.
compression uint See Read & Write section.
checksum bin
data bin
data_serial bin | nil
ttid bin TTID of transaction being committed.
AnswerStoreObject
locked bin | nil nil if successful.

And ReadVerifyingStorage.checkCurrentSerialInTransaction to:

AskCheckCurrentSerial
ttid bin TTID of transaction being committed.
oid bin OID of object to verify.
serial bin Base TID.
AnswerCheckCurrentSerial
locked bin | nil nil if successful.

Both requests must be done to all writeable cells and both replies are processed identically. The client node does not wait for answers before sending the next store/check: the next phase (vote) will start by waiting for all answers.

The client node can hold some of transaction-related data as follows:

data_dict {oid: (data | check_marker, serial, [nid])} data being checked/stored
cache_dict {oid: data | check_marker | nocache_marker} stored/checked data, data will go to the cache on tpc_finish
conflict_dict {oid: serial} conflicts to resolve
resolved_dict {oid: serial} resolved conflicts
conn_dict {nid: connection | nil} involved storage nodes

For the storage node:

serial_dict {oid: serial} stored/checked objects
store_dict {oid: (oid, data_id, value_serial)} metadata of stored objects
lockless {oid} objects for which the storage accepted to store/check without checking for conflict

In order to handle transactions of any size (bigger than RAM) both the client and storage nodes must act as a pipe for the raw data between the application and the storage devices. Because data being stored must be kept in memory by the client until it has an answer from all written cells, as required by conflict resolution, stores must be throttled to limit memory usage (e.g. same as cache size), by waiting for answers before the vote when the limit is reached. The storage node must write the raw data immediately to the underlying storage device (without commit) and only keep a reference (data_id). At last, if the client has a cache and wants to update it on tpc_finish, it can keep some stored data without exceeding the cache size.

Whenever an object is stored/checked, the client node must request at least 1 storage node, else the transaction fails immediately. It must also look up the current partition table to iterate over the storage nodes to request: in other words, for a given partition, the list of cells can evolve during the transaction, as we receive updates from the master node.

However, a transaction must always use the same network connection with a storage node. It can establish a new connection with a storage node that is involved for the first time but must not reconnect or use a new connection that would have been established by some concurrent activity (read or concurrent transaction).

tpc_vote

The vote phase starts by waiting for all store/check answers. Then the client requests all involved storage nodes to vote the transaction, in parallel. The following packet is used if the storage node is assigned the partition that contains transaction metadata:

AskStoreTransaction
ttid bin TTID of transaction being committed.
user bin  
description bin  
extension bin  
oids [bin] cache_dict keys
AnswerStoreTransaction
(no parameter)

Else:

AskVoteTransaction
ttid bin TTID of transaction being committed.
AnswerVoteTransaction
(no parameter)

On storage side, the vote consists in storing durably all metadata to a temporary place (i.e. commit on storage backend). It must be done in such a way that tpc_finish requires minimal extra disk space, ideally by just flipping a boolean, in order to maximize the probability that tpc_finish succeeds, and also for tpc_finish to complete in constant time, regardless of the size of the transaction (in bytes or number of oids). The metadata to store are transaction metadata (if AskStoreTransaction), and object metadata (store_dict values).

Once all storage nodes have voted, the client node must do extra checks if any involved node failed (i.e. if any value in conn_dict indicates a closed connection). First, vote must fail if any oid is not locked by at least one node that didn't fail. Then, if there is any involved RUNNING node that failed (still conn_dict), it must ask the master whether they can be disconnected while keeping the cluster operational:

FailedVote
ttid bin TTID of transaction being committed.
failed [nid] List of failed involved RUNNING storage nodes.
Error
ACK During tpc_finish, the master will disconnect the failed nodes that are expected to have transactions in full (those that are ready since the beginning of the transaction).
INCOMPLETE_TRANSACTION The nodes can't be disconnected and the transaction must be aborted.
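
The extra checks done by the client after the vote can be sketched as below. This is an illustration under assumptions: the function name and the lockers mapping are hypothetical, a nil (None) value in conn_dict is assumed to denote a closed connection, and the set of RUNNING nodes is assumed to come from the client's node table.

```python
def check_vote(lockers, conn_dict, running):
    """Return the `failed` list to send with FailedVote (may be empty).

    lockers:   {oid: [nid]}, nodes that granted the lock for each oid
    conn_dict: {nid: connection or None}, None for a closed connection
    running:   set of nids that are RUNNING
    Raises RuntimeError if an oid is not locked by any node that
    didn't fail, in which case the vote must fail.
    """
    failed = {nid for nid, conn in conn_dict.items() if conn is None}
    for oid, nids in lockers.items():
        if not set(nids) - failed:
            raise RuntimeError("vote failed: %r only locked by failed nodes"
                               % (oid,))
    # Only failed nodes that are still RUNNING are reported to the master.
    return sorted(failed & running)
```

An empty result means that FailedVote does not need to be sent at all.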

tpc_finish

From the point of view of the client, it is usually a single request:

AskFinishTransaction
ttid bin TTID of transaction being committed.
stored_list [bin] List of stored objects.
checked_list [bin] List of checked objects.
AnswerFinishTransaction
tid bin Final TID of the transaction.

If there was a FailedVote request, the master must check again whether the transaction can be committed, failing with INCOMPLETE_TRANSACTION otherwise. Then it iterates over the involved nodes (based on stored_list & checked_list) that are identified:

  • A node is disconnected at this moment if needed (FailedVote).
  • If ready, the node takes part in tpc_finish: we'll refer to them with node_list.

The master continues by generating the final TID (if not given by tpc_begin), adjusting its OID generator if a foreign OID was stored (max(oid_list)), and requesting node_list to lock the transaction:

AskLockInformation
ttid bin TTID of transaction being committed.
tid bin Final TID.
AnswerInformationLocked
ttid bin Same as in request.

Upon reception of AskLockInformation, the storage node locks all stored objects for reading, blocking all read requests (e.g. AskObject) for these objects. If the node stored transaction metadata, it also marks the transaction as committed, by storing durably the final TID.

Transactions must be unlocked in the same order they are locked. Once concurrent tpc_finish with a smaller final TID are unlocked, and once the master has received all answers from node_list, the transaction can be finalized. If the connection was not lost with the committer node, it replies to it with AnswerFinishTransaction. All other client nodes get invalidations with a single packet:

InvalidateObjects
tid bin TID.
oid_list bin Same as stored_list.

The transaction is unlocked by notifying node_list with:

NotifyUnlockInformation
ttid bin TTID of transaction being committed.

The storage node unlocks by making all committed data readable and releasing all locks (both read & write) taken by the transaction. To reduce I/O, the storage backend does not have to commit durably, at least not immediately: in case of a crash, the transaction would be recovered. An example implementation is to commit only if there's no other commit during the following second (e.g. another transaction being voted or locked).

If the connection to the master is lost while waiting for AnswerFinishTransaction, the client should try to recover by querying the final TID with AskFinalTID. The master is asked first:

  • None can be returned if it is obvious that the TTID does not refer to a valid transaction.
  • MAX_TID is returned if the transaction may be committed and the master does not know it anymore, in which case the client should ask storage nodes having the transaction metadata.
  • If the master knows the final TID, it must reply when it processes AnswerInformationLocked.

Deadlock

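[Diagram: 2 deadlock scenarios. First: transactions T₁ and T₂ storing oid1 and oid2 on a single storage node S. Second: T₁ and T₂ storing oid1 on 2 storage nodes S₁ and S₂. In both, a store has to wait and a deadlock follows.]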

During the commit phase, we saw quickly that a write-lock is taken when an object is stored/checked. Except for a special case related to replication (which will be detailed later), a transaction that has successfully stored/checked an object owns a write-lock on this object and this prevents any concurrent transaction from doing the same (in theory, transactions that check the same object could share the lock) until the lock is released. This is object-level locking and with either several storage nodes or transactions committing oids in unsorted order, this could lead to deadlocks.

The core principle is to compare TTIDs. Given TTID1 that started before TTID2, i.e. TTID1 < TTID2, we consider that TTID1 has statistically more chance to store/check objects before TTID2, which means that it is more common that TTID2 tries to store/check a TTID1-locked object, in which case TTID2 waits. And vice versa, TTID1 trying to store/check a TTID2-locked object is a possible deadlock case, which must be solved without delay.
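
The comparison can be sketched as below. The function name and the returned labels are hypothetical; the sketch only classifies the situation and says nothing about how a possible deadlock is then solved. TTIDs are assumed to be fixed-size big-endian strings, so that comparing bytes is the same as comparing numbers.

```python
def on_locked_object(ttid, holder_ttid):
    """Classify a store/check on an object write-locked by another
    transaction (ttid and holder_ttid are assumed to be different)."""
    if ttid > holder_ttid:
        # The requester started later: the common case, it waits.
        return "wait"
    # The requester started earlier: possible deadlock.
    return "possible deadlock"
```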

Conflict Resolution

[Diagram: conflict resolution, between clients C₁ and C₂, master (M) and storage (S). Sequence of labels: StoreObject (A₀→A₁), AnswerStoreObject (stored), write-locked, StoreObject (A₀→A₂), delayed (ttid₁ < ttid₂), FinishTransaction, LockInformation, AnswerLockInformation, FinishTransaction, InvalidateObjects, UnlockInformation, unlocked, AnswerStoreObject (conflict), conflict resolution, GetObject (A₁), AnswerGetObject, StoreObject (A₁→A₂').]

After having looked at whether a write-lock is already taken or not, the storage must check whether there's a conflict for the requested operation, as defined by the ZODB specifications.

This is done by first getting the last TID of the object being stored/checked, which may fail with INCOMPLETE_TRANSACTION (as any other read), and we'll see later that it can be tricky in the case of internal replication. If there's such a last TID (i.e. not a new OID), it is compared with serial (from AskStoreObject or AskCheckCurrentSerial): if different, the storage node stops here and returns the last TID as locked value in the answer packet, else the transaction gets the lock.
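
The conflict check itself can be sketched as follows. The function name is hypothetical and the lookup of the last TID is assumed to have been done by the caller.

```python
def check_conflict(last_tid, serial):
    """Return the `locked` value of the answer packet.

    last_tid: last TID of the object, or None for a new OID
    serial:   base TID from AskStoreObject or AskCheckCurrentSerial
    None means success (the transaction gets the lock); otherwise
    the last TID is returned to report the conflict.
    """
    if last_tid is not None and last_tid != serial:
        return last_tid
    return None
```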

In the case of checkCurrentSerialInTransaction, the conflict results in ZODB ReadConflictError, else the client node tries to resolve it. Internally, the ZODB tryToResolveConflict function uses loadSerial, which may result in a read request to any storage node if the data is not in cache, as described in Read section. If the conflict resolution succeeds, the client node stores new data with AskStoreObject and new base TID, as described in the commit phase.

While a transaction is resolving a conflict for an object, a concurrent transaction may have the time to read the last serial of this object (in the diagram, C₁ already has it) and modify it, which means that the resolved conflict will result in another conflict. In theory, if a transaction is always slow enough to resolve a conflict, a concurrent process could prevent it from committing by modifying the conflicting object in a loop. We consider this is a non-issue, but an idea to fix it is to take the lock before the second attempt to resolve.

With replicas, there may be several readable cells, in which case the client node receives several conflict answers. Resolution can be done upon the first answer and the new data is sent even to nodes that haven't replied yet for previous stores.

Abort

The most common case of abort is after a conflict and it is initiated by the client node, using the following notification:

AbortTransaction
ttid bin TTID of transaction being committed.
nid_list [nid] List of involved storage nodes. Empty if sent to a storage node.

Storage nodes must abort a transaction after any other packet sent by the client for this transaction. If there was a pending write, aborting only via the master would lead to a race condition, with the consequence that storage nodes may lock oids forever. Therefore, the client node must notify both the master and involved storage nodes. And to ensure that involved nodes are notified, the master also notifies them. Most of the time, the storage nodes get 2 AbortTransaction packets, the second one being ignored.

Once finishing the transaction is requested by the client node (AskFinishTransaction), the master becomes the node responsible to initiate an abort if needed. The difference is clearly visible with INCOMPLETE_TRANSACTION, after which the abort is initiated by the client in the case of tpc_vote, and by the master in the case of tpc_finish. In the latter case, a simplified implementation is to notify all ready storage nodes.

A connection failure with a client node also results in dropping its transactions, provided they're not too advanced:

  • When disconnected from the master, storage nodes are notified that the client node is DOWN. Then the master and storage nodes drop transactions that have not been locked yet.
  • When losing connection with a storage node, the transaction is dropped only if it has not voted yet.

Undo

XXX

Replication

[Sequence diagram between a destination storage and a source storage. First loop: AskFetchTransactions, then AddTransaction for any missing transaction, then AnswerFetchTransactions. Second loop: AskFetchObjects, then AddObject for any missing object, then AnswerFetchObjects.]

Replication is the workflow responsible for keeping multiple copies of data on several storage nodes. It defines how a storage node synchronizes from others if, for any reason, it does not have all data. Replication is used in 2 ways:

  • synchronous: Also called internal replication, it is comparable to master-master replication in other databases. The number of replicas (NR) is a property of the partition table and the cluster aims at having any data present in NR+1 storage nodes, so that service can still be provided if up to NR storage nodes go down. Because a client node writes to all storage nodes having the data being modified, such replication is used only for non-readable cells. It is tightly interrelated to the commit process.
  • asynchronous: Also called backup, it is comparable to master-slave replication in other databases. When a NEO database (backup cluster) replicates another NEO DB (upstream cluster), backup storage nodes fetch data from upstream using the same storage-storage protocol as for synchronous replication. By also being part of the upstream cluster, the backup master is notified about new commits and sends replication orders to storage nodes. Inside a backup cluster, there is also replication if there are replicas.

In any case, storage nodes exchange data partition per partition, first all transaction metadata and then objects, both being done by increasing TID.

AskFetchTransactions
partition uint Partition number.
length uint Maximum number of transactions to replicate. For example, 1000.
min_tid bin Start of the transaction chunk to replicate, included.
max_tid bin End of the transaction chunk to replicate, included.
tid_list [tid: bin] List of transactions the destination node already has between min_tid and max_tid.
AnswerFetchTransactions
pack_tid nil Not yet implemented.
next_tid bin | nil If nil, the [min_tid..max_tid] chunk is fully replicated, else the length was reached and next_tid is used as min_tid for a next AskFetchTransactions request.
delete_list [tid: bin] Transactions to delete.

For any missing transaction, the source node uses the following packet with the same message id as AskFetchTransactions:

AddTransaction
user bin  
description bin  
extension bin  
packed bool  
ttid bin  
oids [oid: bin]  
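
The way a source node could process AskFetchTransactions can be sketched as follows. This is an illustration, not the reference implementation: the function name and its arguments are hypothetical, TIDs are plain integers for readability, and limiting delete_list to the part of the chunk that was actually covered is an assumption.

```python
def fetch_transactions(source_tids, length, min_tid, max_tid, peer_tids):
    """Return (to_send, next_tid, delete_list) for one AskFetchTransactions.

    source_tids: TIDs the source holds for the partition (iterable of ints).
    peer_tids:   tid_list of the request, i.e. what the destination already has.
    """
    in_range = sorted(t for t in source_tids if min_tid <= t <= max_tid)
    next_tid = None
    if len(in_range) > length:
        next_tid = in_range[length]   # first TID left for the next request
        in_range = in_range[:length]
    peer = set(peer_tids)
    # One AddTransaction per TID the destination is missing.
    to_send = [t for t in in_range if t not in peer]
    # Assumption: only the covered part of the chunk is compared for deletion.
    upper = max_tid if next_tid is None else next_tid - 1
    have = set(in_range)
    delete_list = sorted(t for t in peer
                         if min_tid <= t <= upper and t not in have)
    return to_send, next_tid, delete_list
```

For example, with source TIDs 10, 20, 30, 40, a length of 2 and a destination that has 20, 25 and 40, the first request sends 10, asks to delete 25 and returns next_tid 30; a second request starting at 30 sends 30 and returns nil.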

The replication of object records belonging to the same transaction is done by increasing OID:

AskFetchObjects
partition uint Partition number.
length uint Maximum number of objects to replicate. For example, 1000.
min_tid bin Start of the transaction chunk to replicate, included.
max_tid bin End of the transaction chunk to replicate, included.
min_oid bin Start OID, within min_tid, of the chunk to replicate, included.
object_dict {oid: bin, tid_list: [bin]} List of objects the destination node already has in the chunk.
AnswerFetchObjects
pack_tid nil Not yet implemented.
next_tid bin | nil If nil, the [(min_tid,min_oid)..max_tid] chunk is fully replicated, else the length was reached and (next_tid,next_oid) is used as starting point for a next AskFetchObjects request.
next_oid bin | nil
delete_dict {oid: bin, tid_list: [bin]} Objects to delete.

For any missing object, the source node uses the following packet with the same message id as AskFetchObjects:

AddObject
oid bin  
tid bin  
compression uint  
checksum bin  
data bin  
data_serial bin | nil  
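
Object records are walked in (TID, OID) order, and (next_tid, next_oid) is simply the first record that did not fit in the requested length. A minimal sketch, assuming integer TIDs/OIDs and a hypothetical helper name:

```python
def next_objects(records, length, min_tid, max_tid, min_oid):
    """Select one chunk of object records to compare or send.

    records: iterable of (tid, oid) pairs held by the source for the partition.
    Returns (chunk, next_tid, next_oid); both are None when the chunk
    [(min_tid, min_oid)..max_tid] is exhausted.
    """
    start = (min_tid, min_oid)
    # Tuple comparison gives the required order: by TID first, then by OID.
    chunk = sorted(r for r in records if start <= r and r[0] <= max_tid)
    if len(chunk) > length:
        next_tid, next_oid = chunk[length]
        return chunk[:length], next_tid, next_oid
    return chunk, None, None
```

The returned pair is used unchanged as (min_tid, min_oid) of the following AskFetchObjects request.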

The source must abort AskFetchTransactions & AskFetchObjects with REPLICATION_ERROR if it does not have data, i.e. the replicated partition is dropped or truncated.

Notice to implementers: For performance reasons, AddTransaction & AddObject should be buffered, i.e. they should not be passed to the kernel unless enough data are accumulated (e.g. 64kB) or some other packet type is sent, not only to avoid the cost of TCP_NODELAY but also to reduce the number of syscalls. For these packets, throughput is more important than latency.
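
One possible shape for such buffering is sketched below. The class is hypothetical; `write` stands for whatever actually hands bytes to the kernel (for instance the `sendall` method of a socket), and packets are assumed to be already serialized.

```python
class BufferedSender:
    """Accumulates bulk replication packets and flushes them in large writes."""

    BULK = {"AddTransaction", "AddObject"}  # packet types worth batching

    def __init__(self, write, threshold=64 * 1024):
        self.write = write          # callable taking bytes
        self.threshold = threshold  # flush once this many bytes are pending
        self.pending = bytearray()

    def send(self, packet_type, payload):
        self.pending += payload
        # Any other packet type, or a full buffer, triggers a single write.
        if packet_type not in self.BULK or len(self.pending) >= self.threshold:
            self.flush()

    def flush(self):
        if self.pending:
            self.write(bytes(self.pending))
            self.pending.clear()
```

Packet order on the wire is preserved because a non-bulk packet is appended to the pending bytes before the flush, so AnswerFetchObjects still arrives after the AddObject packets it closes.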

Synchronous Replication

[Sequence diagram between S₁, S₂ and M: StartOperation | NotifyPartitionChanges; NotifyReady (if StartOperation); AskUnfinishedTransactions; lockless writes; AnswerUnfinishedTransactions; loop: replication, NotifyTransactionFinished | AbortTransaction; replication; shared locking; NotifyReplicationDone; normal locking; NotifyPartitionChanges.]

A replicating node cannot depend on other nodes to fetch the data recently committed or being committed, because that cannot be done atomically: it could miss writes between the processing of its request by a source node and the reception of the answer.

This is why outdated cells are writable: a storage node asks the master for transactions being committed and then it is expected to fully receive from the client any transaction that is started after this answer. This in turn has other consequences:

  • Because the storage node lacks the data to check for conflicts, writes must be accepted blindly (i.e. without taking a write-lock), hereinafter lockless writes. This is possible because 1 readable cell (for each partition) is enough to do these checks.
  • Even if the replication is finished, we have to wait until write-locking is back to normal before announcing to the master that the cell is readable. Until NotifyPartitionChanges is received, the cell is still OUT_OF_DATE, but it is internally readable, in order to check for conflicts.
AskUnfinishedTransactions
offset_list [partition: uint] List of OUT_OF_DATE partitions.
AnswerUnfinishedTransactions
max_tid bin TID of last committed transaction.
ttid_list [ttid: bin] List of transactions being committed.

XXX Because the master already knows the list of outdated cells, the above request could be changed to a single NotifyUnfinishedTransactions (M → S) packet.

The storage node marks all partitions in offset_list as having to be replicated up to max_tid. It also has to wait for all transactions in ttid_list to end. It is notified of aborted transactions with AbortTransaction (as usual). For successful commits, the master sends:

NotifyTransactionFinished
ttid bin TTID.
max_tid bin TID of last committed transaction, which is actually the one that corresponds to ttid.

Note that a storage node may participate in transactions listed in AnswerUnfinishedTransactions, in which case NotifyTransactionFinished must be sent just after NotifyUnlockInformation. Then the resulting replication would fetch missed writes: there may be none.

The storage node is free to start replicating (up to the last max_tid received so far) even if it's still waiting for transactions to complete, provided that a last replication process is performed after the last NotifyTransactionFinished.
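
The bookkeeping this implies on the storage node can be sketched as follows. It is only an illustration with hypothetical names and integer TIDs: the node remembers the transactions it still waits for and the highest max_tid received, and considers itself fully replicated only once a replication pass has covered that max_tid with nothing left pending.

```python
class OutdatedCellTracker:
    """Hypothetical state kept after AnswerUnfinishedTransactions."""

    def __init__(self, max_tid, ttid_list):
        self.max_tid = max_tid            # replicate at least up to here
        self.unfinished = set(ttid_list)  # transactions still being committed
        self.replicated_tid = None        # upper bound of the last finished pass

    def transaction_finished(self, ttid, max_tid):  # NotifyTransactionFinished
        self.unfinished.discard(ttid)
        self.max_tid = max(self.max_tid, max_tid)

    def transaction_aborted(self, ttid):            # AbortTransaction
        self.unfinished.discard(ttid)

    def replication_pass_done(self, up_to_tid):
        self.replicated_tid = up_to_tid

    def fully_replicated(self):
        return (not self.unfinished
                and self.replicated_tid is not None
                and self.replicated_tid >= self.max_tid)
```

A pass that ends before the last NotifyTransactionFinished leaves replicated_tid below the new max_tid, which forces the final pass required above.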

On the side of transaction management, write-locking evolves through 2 phases before going back to normal.

Lockless Writes

As long as the partition is not internally readable, no lock is taken by AskStoreObject & AskCheckCurrentSerial: the storage node must remember lockless oids and reply with locked = ZERO_TID.

On client side, such an answer could be a late one, to a request that resulted in a conflict (from readable cells), and it must be ignored. Otherwise, the cell has to be marked as not being fully write-locked: at tpc_vote, if any connection to a storage node was lost, every touched partition must be fully write-locked by at least 1 node. Then, it continues to process the answer like for a normal one.
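
The check done by the client at tpc_vote can be sketched like this. The function and its data structures are hypothetical; the sketch only encodes the rule stated above, namely that after losing a storage connection, each touched partition needs at least one remaining node that holds normal write-locks.

```python
def can_vote(touched, lockless, lost_nodes):
    """Decide whether the transaction may be voted.

    touched:    {partition: set of storage nodes the client wrote to}
    lockless:   set of (partition, node) that answered locked = ZERO_TID
    lost_nodes: nodes whose connection was lost during the transaction
    """
    if not lost_nodes:
        return True  # every readable cell answered, locks are complete
    for partition, nodes in touched.items():
        fully_locked = any(node not in lost_nodes
                           and (partition, node) not in lockless
                           for node in nodes)
        if not fully_locked:
            return False
    return True
```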

Shared Locking

At the time the partition is fully replicated, the storage node may have accepted multiple transactions to store/check the same oid and it does not know which one will win (assuming there aren't only checks), whereas others will either abort or store again: in the latter case, it is again a lockless write (ZERO_TID returned). For these oids, the write-lock is given to the highest TTID, which matters for any new transaction that tries to lock it.

As soon as there's only 1 lockless write for an oid, this oid is write-locked normally. When all oids are write-locked normally for a replicated partition, the storage node notifies the master:

NotifyReplicationDone
partition uint Partition number.
max_tid bin TID of last replicated transaction.
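
The transition from shared to normal locking can be pictured with the following sketch. The helper names and the dictionary layout are assumptions; TTIDs are integers.

```python
def settle_lockless(lockless):
    """Compute the lock holder of each oid once the partition is replicated.

    lockless: {oid: set of ttids that stored/checked the oid without a lock}
    Returns {oid: (holder_ttid, is_normal)}: the write-lock goes to the
    highest TTID, and it is a normal lock only if that TTID is alone.
    """
    return {oid: (max(ttids), len(ttids) == 1)
            for oid, ttids in lockless.items() if ttids}


def can_notify_replication_done(lockless):
    """True when every oid of the partition is write-locked normally."""
    return all(normal for _, normal in settle_lockless(lockless).values())
```

Each time a transaction aborts or finishes, its TTID is removed from the sets; NotifyReplicationDone becomes possible once no set has more than one element.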

XXX Explain why NotifyReplicationDone can't be sent earlier (end of phase 1). Maybe required for deadlock avoidance, or because it was implemented before we realized that the client must remember cells that aren't fully write-locked.

Backup

XXX Whole section to review.

Backup, or more exactly online backup, is the workflow responsible for making data from one NEO cluster also available at another NEO cluster. If the first cluster is the main one used for a service, and the second cluster is located in e.g. another building or another country, the link latency between the clusters will be much higher than intra-cluster link latencies. With the current state of NEO, it is not practical to mix storage nodes with different link latencies in one cluster, as the slower ones would slow down most operations (otherwise it would make more sense to just use all nodes as replicas). However, a cluster located at another physical place has a high chance of surviving a physical failure at the original place, and thus the backup cluster (likely with a backup service instance also moved near it) can be used in the case of such damage.

Online backup relies on Invalidation and Replication. Whenever a backup cluster is set up, its primary master connects as a client node to the master of the main cluster to receive invalidation messages, and this way stays informed about committed transactions. The backup master issues commands to its storage nodes to replicate the corresponding data from the storage nodes of the main cluster. The storage nodes in turn use the same replication protocol as the one used inside the main cluster for synchronous replication.

XXX(jm) document in more detail how backing up works, in particular how backup cluster reports its partition table and cell states there.

Deadlock Avoidance

XXX Whole section to review, once deadlock avoidance is reimplemented, due to performance issues.

A typical scenario is the following one:

  1. C1 does a first store (X or Y)
  2. C2 stores X and Y; one is delayed
  3. C1 stores the other -> deadlock

When solving the deadlock, the data of the first store may only exist on the storage nodes.

Deadlocks are detected by comparing locking TIDs; initially a locking TID is the same as the TTID. When trying to (write-)lock an OID that is already locked by a smaller locking TID, the store is delayed: this behaviour is chosen because a transaction that starts first is likely to store first. Otherwise, when trying to lock with a smaller locking TID, a deadlock is issued.
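
The comparison rule can be summarized by the following sketch. It only mirrors the paragraph above: the function name, the return values and the `locks` mapping are hypothetical, and a real storage node also has to queue the delayed store and start the rebase.

```python
def try_lock(locks, oid, locking_tid):
    """Try to write-lock oid for the transaction with the given locking TID.

    locks: {oid: locking TID of the transaction holding the write-lock}
    Returns "locked", "delayed" or "deadlock".
    """
    holder = locks.get(oid)
    if holder is None or holder == locking_tid:
        locks[oid] = locking_tid
        return "locked"
    if holder < locking_tid:
        # The holder started first and is likely to store first: wait for it.
        return "delayed"
    # We are older than the holder: waiting could block forever.
    return "deadlock"
```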

Deadlock Avoidance Protocol

The main idea behind deadlock avoidance is to rebase OIDs that have already been stored. In order that the comparison of locking TIDs keeps working, the master is involved to deliver a new locking TID:

1. Sa: mark the transaction as being rebased
2. Sa: release all write-locks
3. Sa: sortAndExecuteQueuedEvents
4. Sa > M   NotifyDeadlock{
                ttid,                  # to identify the transaction
                current locking tid,   # M must ignore late/redundant packets
              }
5. Sa: waiting that the transaction is rebased, the store is delayed
6. M > C    NotifyDeadlock{ttid, new locking tid}
7. C > S    # to all involved storage nodes
            RebaseTransaction{ttid, new locking tid}
8. Sb: sets new locking tid
9. Sb (a!=b): same as 2
10. Sb: sortAndExecuteQueuedEvents

At step 10, a new deadlock may happen, in which case it loops to 1 and an empty answer is sent back to the client:

11. Sb > C   AnswerRebaseTransaction{[]}

If there's no new deadlock:

11. lock all oids that can be relocked immediately
12. Sb > C   AnswerRebaseTransaction{
                oids that are locked by another transaction
                + those that conflict
              }

Then for each oid returned by AnswerRebaseTransaction:

13. C > Sb   RebaseObject{ttid,oid}

The processing of RebaseObject is similar to that of StoreObject, with the following differences:

  • The storage node already has the data.
  • Nothing to do in the following case: there was a previous rebase for this oid, it was still delayed during the second RebaseTransaction, and then a conflict was reported when another transaction was committed. A similar case is when a partition is dropped.
  • On conflict, the data must be sent back to the client in case it does not have it anymore. In this case, the storage node forgets this oid.

Possible answers:

  • locked: AnswerRebaseObject{}
  • conflict: AnswerRebaseObject{old tid,saved tid,data} where data is None for checkCurrent or undo (the client has enough information on its side to distinguish the 2 cases)

On the client, AnswerRebaseObject is also handled in a way similar to AnswerStoreObject.

Note

RebaseObject is a request packet to simplify the implementation. For more efficiency, this should be turned into a notification, and RebaseTransaction should be answered once all objects are rebased (so that the client can still wait on something).

Cascaded Deadlocks

XXX Whole section to review.

A so-called cascaded deadlock is when a new deadlock happens during deadlock avoidance. The above protocol does handle this case. Here is an example with a single storage node:

# locking tids: t1 < t2 < t3

1. A2 (t2 stores A)
2. B1, c2 (t2 checks C)
3. A3 (delayed), B3 (delayed), D3 (delayed)
4. C1 -> deadlock: B3
5. d2 -> deadlock: A3

# locking tids: t3 < t1 < t2

6. t3 commits
7. t2 rebase: conflicts on A and D
8. t1 rebase: new deadlock on C
9. t2 aborts (D non current)

all locks released for t1, which rebases and resolves conflicts

Deadlock avoidance also depends on correct ordering of delayed events (sortAndExecuteQueuedEvents), otherwise the cluster could enter an infinite cascade of deadlocks. Here is an overview with 3 storage nodes and 3 transactions modifying the same oid (2 replicas):

S1     S2     S3     order of locking tids          # abbreviations:
l1     l1     l2     123                            #  l: lock
q23    q23    d1q3   231                            #  d: deadlock triggered
r1:l3  r1:l2  (r1)   # for S3, we still have l2     #  q: queued
d2q1   q13    q13    312                            #  r: rebase

Above, we show what happens when a random transaction gets a lock just after another is rebased. Here, the result is that the last 2 lines are a permutation of the first 2, and this can repeat indefinitely with bad luck.

The probability of deadlock must be reduced by processing delayed stores/checks in the order of their locking tid. In the above example, S1 would give the lock to 2 when 1 is rebased, and 2 would vote successfully.
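
A minimal sketch of such an ordering is given below. It is not the sortAndExecuteQueuedEvents of the reference implementation: the function, its arguments and the shape of the queue are assumptions. The point it illustrates is that sorting happens at execution time, with the current locking TIDs, since a rebase may have changed them after the events were queued.

```python
def sort_and_execute(queued, locking_tid_of, execute):
    """Run delayed stores/checks by increasing locking TID.

    queued:         list of (ttid, event) in arrival order; emptied by the call
    locking_tid_of: {ttid: current locking TID}, updated by rebases
    execute:        callable(ttid, event) that retries the delayed event
    """
    # sorted() is stable: events of the same transaction keep arrival order.
    pending = sorted(queued, key=lambda item: locking_tid_of[item[0]])
    queued.clear()
    for ttid, event in pending:
        execute(ttid, event)
```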

Maintenance

XXX

Add/Replace/Remove Storage Nodes

XXX

Cloning a Storage Node

XXX

Partition Table Tweak

XXX

Pack

XXX

Storage Backends

Although the following sections are only an example of how data can be stored persistently, they should help a lot in understanding the internals of NEO.

The example is for a backend storing data in a relational transactional database like MariaDB/InnoDB. One advantage of SQL is that it makes backend data easier to inspect; SQL is not mandatory. We will describe the schema of tables that are used in the reference implementation.

In ZODB, TIDs are a sequence of 8 bytes. It may be easier to handle 64-bit integers, as shown in the example.
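
The conversion between both representations is a big-endian unsigned 64-bit packing, which preserves ordering. The helper names below are only illustrative:

```python
import struct


def tid_to_int(tid: bytes) -> int:
    """Convert an 8-byte TID (or OID) to an unsigned 64-bit integer."""
    return struct.unpack(">Q", tid)[0]


def int_to_tid(value: int) -> bytes:
    """Convert an unsigned 64-bit integer back to its 8-byte form."""
    return struct.pack(">Q", value)
```

Because the encoding is big-endian, comparing the byte strings and comparing the integers give the same order, so range queries by TID work identically on both forms.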

config

Database & storage node metadata.

name VARBINARY(255) NOT NULL PRIMARY KEY
value VARBINARY(255) NULL

name can be:

  • name: cluster name
  • nid: node id
  • ptid: partition table id
  • replicas: number of replicas
  • version: version of backend storage format
  • truncate_tid: TID at which the DB will have to be truncated (see Truncate packet)
  • backup_tid: (backup mode only) TID at which the node is considered up-to-date

pt

Partition table.

partition SMALLINT UNSIGNED NOT NULL
nid INT NOT NULL
tid BIGINT NOT NULL
  • PRIMARY KEY (partition, nid)

tid is either:

  • ≤ 0: the opposite of the cell state
  • > 0: assigned OUT_OF_DATE cell with all data up to this value

The purpose of an OUT_OF_DATE TID per assigned cell is to optimize resumption of replication by starting from a greater TID:

Although data that are already transferred aren't transferred again, checking that the data are there for a whole partition can still be a lot of work for big databases. A storage node that gets disconnected for a short time should become fully operational almost instantaneously because it only has to replicate the new data. Without such an optimization, the time to recover would depend on the size of the database.

For OUT_OF_DATE cells, the difficult part is that they are writable and can then contain holes, so we can't just take the last TID in trans/obj. This is solved by remembering the latter at the moment the state switches from UP_TO_DATE/FEEDING, for each assigned partition. With only a single global value, a cell that still has a lot to replicate would still cause all other cells to resume from a very small TID, or even ZERO_TID; the worst case is when a new cell is assigned to a node (as a result of tweak).
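
The dual use of pt.tid (negated cell state when not positive, resumption TID of an OUT_OF_DATE cell when positive) can be illustrated as follows. The numeric values of the cell states are an assumption made for the example; a real backend must use the values of the protocol it implements.

```python
# Assumed numbering of cell states, for illustration only.
OUT_OF_DATE, UP_TO_DATE, FEEDING = 0, 1, 2


def decode_pt_tid(tid):
    """Return (cell_state, resume_tid) for a value of pt.tid."""
    if tid > 0:
        # OUT_OF_DATE cell that already has all data up to tid.
        return OUT_OF_DATE, tid
    return -tid, None


def encode_pt_tid(state, resume_tid=None):
    """Inverse of decode_pt_tid."""
    if state == OUT_OF_DATE and resume_tid:
        return resume_tid
    return -state
```

With this encoding, a cell switching from UP_TO_DATE to OUT_OF_DATE only needs its row updated from -1 to the last TID it is known to have completely.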

trans

Transaction metadata of committed transactions.

partition SMALLINT UNSIGNED NOT NULL
tid BIGINT UNSIGNED NOT NULL
packed BOOLEAN NOT NULL
oids MEDIUMBLOB NOT NULL
user BLOB NOT NULL
description BLOB NOT NULL
ext BLOB NOT NULL
ttid BIGINT UNSIGNED NOT NULL
  • PRIMARY KEY (partition, tid)

ttid is the TTID that was used when the transaction was being committed, which is required for AskFinalTID. It is also useful for debugging and performance analysis (the difference between TTID and TID gives the time needed to commit the transaction). When importing transactions, this value can be the same as tid.

obj

Object metadata of committed transactions.

partition SMALLINT UNSIGNED NOT NULL
oid BIGINT UNSIGNED NOT NULL
tid BIGINT UNSIGNED NOT NULL
data_id BIGINT UNSIGNED NULL
value_tid BIGINT UNSIGNED NULL
  • PRIMARY KEY (partition, oid, tid)
  • KEY tid (partition, tid, oid)
  • KEY (data_id)

data_id refers to data.id.

data

Actual object content.

id BIGINT UNSIGNED NOT NULL PRIMARY KEY
hash BINARY(20) NOT NULL
compression TINYINT UNSIGNED NULL
value LONGBLOB NOT NULL

id shall be generated as follows:

  • High-order 16 bits: partition
  • Low-order 48 bits: per-partition autoincrement

Thus, replicating a partition usually results in reading data sequentially. Not doing this causes much read-amplification or disk seeks.
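
The layout of data.id can be expressed with simple bit operations. The helper names are hypothetical and the per-partition counter is assumed to be maintained by the caller:

```python
COUNTER_BITS = 48


def make_data_id(partition, counter):
    """Build a data.id: partition in the high 16 bits, counter in the low 48."""
    if not 0 <= partition < 1 << 16:
        raise ValueError("partition does not fit in 16 bits")
    if not 0 <= counter < 1 << COUNTER_BITS:
        raise ValueError("counter does not fit in 48 bits")
    return partition << COUNTER_BITS | counter


def split_data_id(data_id):
    """Return (partition, counter) for a data.id."""
    return data_id >> COUNTER_BITS, data_id & ((1 << COUNTER_BITS) - 1)
```

All ids of a partition form one contiguous range of the primary key, which is what makes reading a partition mostly sequential.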

ttrans & tobj

Metadata of transactions being committed.

They are temporary counterparts of trans & obj. The differences are:

  • ttrans.tid can be NULL.
  • ttrans: PRIMARY KEY (ttid)
  • tobj: PRIMARY KEY (tid, oid)
  • tobj: no secondary index

tobj.tid contains the TTID.

These tables are filled during tpc_vote: only tobj if VoteTransaction, or at least ttrans if AskStoreTransaction. This is followed by a commit, which often includes changes to data by AskStoreObject requests.

trans.tid is set during tpc_finish, when processing AskLockInformation. This is followed by a commit.

Upon reception of NotifyUnlockInformation, the content is moved to trans & obj, by changing tobj.tid on the fly. It is recommended to wait a while before commit.

Appendix

XXX

References

The original design from Yoshinori Okuji introduces most concepts of the NEO architecture and protocol, and describes the network protocol in great detail. Although the original protocol has changed since then, the concepts described in the document are still valid, and reading it helps to understand the architecture of NEO.

DZUG presentation XXXX

 
