(this document is being written: see below for its state)
This document first introduces the NEO architecture and the different roles of each node in a NEO cluster. Then it gives a precise definition of the communication protocol and the semantics of the workflows involved:
In Network Layer, we explain how data is exchanged between nodes: serialization format and link establishment. We do not yet explain the semantics of the messages exchanged between nodes.
In Operating Data, we define the data that describe the state of the cluster, and how they are exchanged between nodes.
In Start & Stop, we detail how a cluster becomes operational and is cleanly shut down.
In Read & Write, we describe the different workflows while providing service to clients.
In Replication, we explain how we achieve high availability and online backups.
Deadlock Avoidance is part of Read & Write but easier to document after Replication.
In Maintenance, we cover various tasks like ZODB pack and adding/replacing/removing storage nodes.
In Storage Backends, we describe the typical architecture of a possible implementation of storage backend.
Notice to implementers: A NEO deployment can be seen as a single application, in that a node can trust peers to implement the same algorithm. However, failures like a node being stopped or an unpredictable network may happen at any time and such misbehaviour should be handled gracefully.
XXX The following sections are not finished:
NEO is a distributed, redundant and scalable implementation of the ZODB API, which means it is a transactional NoSQL database that ZODB clients can use. More technically, NEO is a ZODB storage backend and this document assumes you have read the ZODB book.
Let us start by explaining the architecture of NEO. This section is inspired by the Linux Solutions Presentation and summarized with fewer technical details.
NEO consists of three components: master nodes (M), storage nodes (S) and client nodes (C). Here, node means a software component which runs on a computer: it can either be a process or a set of threads inside another process. They can run in the same computer or different computers in a network.
At the level of an application accessing a NEO database, there's the client part of NEO, which is used through the ZODB. Client nodes interact with the primary master node and storage nodes to perform transactions and retrieve data.
Among master nodes, one is the primary: as the arbitrator of the cluster, its tasks are to generate new identifiers for objects (OID) and transactions (TID), broadcast invalidations, control transactions, take decisions about topology changes. It does the minimum to guarantee the consistency of the database and in particular, it never sees any application data.
Other master nodes are called secondary. They are spare master nodes. One of them can take over the role of primary master if the current one fails. A master node becomes primary after an election.
Storage nodes are the only ones storing data persistently: mainly application data, i.e. objects and transactions that form the database, but also all the cluster metadata that the primary master needs to recover.
A fourth node type is optional. The administration of a NEO cluster is done via an admin node (A), which issues commands to the primary master. It comes with a command-line tool called neoctl.
A NEO database is split into partitions and each storage node is assigned to several partitions. These assignments form the partition table: each assignment is called a cell.
Locating the partition to which an OID belongs is done by computing OID % NP, where NP is the number of partitions. Then, the partition table is queried to know which storage node(s) contain(s) the object.
Load balancing is achieved by having several storage nodes, similarly to RAID0. For high availability, each partition can also be assigned to NR+1 storage nodes, similarly to RAID1+0: NR is the number of replicas.
Each cell has a state: for example, it is not readable if it does not contain all the partition data. By also looking at node states, a client knows where to load or store data.
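As an illustration, here is a minimal sketch of this lookup (the names and data structures are hypothetical, not taken from the reference implementation):

```python
# Cell states considered readable (see the cell states described later).
READABLE = {'UP_TO_DATE', 'FEEDING'}

def readable_nodes(oid, np, partition_table):
    """Return the storage nodes that may serve the object identified by oid.

    oid is an 8-byte identifier, np the number of partitions, and
    partition_table maps each partition to its list of cells
    (nid, cell_state) as broadcast by the primary master.
    """
    partition = int.from_bytes(oid, 'big') % np
    return [nid for nid, state in partition_table[partition]
            if state in READABLE]
```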
The primary node broadcasts all changes to the partition table to all other nodes, and it also notifies of all node state changes.
Common ZODB implementations do database-level locking, which means that the processing of a tpc_begin must wait until another transaction being committed has finished. In some cases, like ZEO with 2 distinct server connections, the database is locked during vote, as described in the above schema.
For better scalability, NEO implements object-level locking, so that transactions modifying disjoint sets of oids can be committed simultaneously. Concurrent tpc_begin and tpc_finish can even happen in a different order. This implies the use of a temporary TID (TTID) during the commit as a way for different nodes to refer to the transaction, and the resolution of possible deadlocks.
During a commit, modified oids are write-locked. On tpc_finish, they are also read-locked because storage nodes may not be ready yet when the clients are notified of the new transaction.
NEO also aims to handle efficiently transactions of any size, in number of bytes or number of changed OIDs. For this, the protocol is designed in such a way that the client and storage nodes can process a stream of data even on systems with limited RAM.
The above cluster example is the simplest case of split brain, when 2 master nodes don't see each other, usually due to a network cut.
NEO makes sure that at most one part is operational, even if several parts have all data (NR>0) to serve clients. At most one part must be writable by clients, so that other parts are only out of date and don't diverge.
This is done by only starting operation automatically if it's safe to do so. Nothing prevents a user from forcing that if all data are there.
Communication between two nodes is done by exchanging messages over a streaming point-to-point protocol that supports guaranteed order and delivery of messages. Only TCP and SSL over TCP are implemented for now (over IPv4 and IPv6), but other protocols like UNIX sockets or QUIC would be possible. For TCP, TCP_NODELAY should be used.
Connection failures shall also be detected. The NEO protocol has no mechanism for that, so it has to be done at the level of the underlying protocol, e.g. by enabling TCP keep-alives.
We start by describing how these messages are serialized; their semantics will be explained in the sections describing workflows. MessagePack is used for all packets. The handshake packet is an array with the following 2 fields:
All other packets are arrays of 3 fields, as described in the next section:
The message code encodes the type of the message. The following table lists the 70 different types that can be exchanged. 44 of them are requests with response packets. 1 of them is a generic response packet for error handling. The remaining 25 are notification packets.
The code (#) of a response packet is the same as the corresponding request one, with the highest order bit set. In Python, this translates as follows:
response_code = request_code | 0x8000
Message IDs are used to identify response packets: each node sends a request with a unique value and the peer replies using the same id as the request. Notification packets normally follow the same rule as request packets, for debugging purpose. In some complex cases where replying is done with several notification packets followed by a response one (e.g. replication), the notification packets must have the same id as the response.
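For illustration, here is a hedged sketch of how such packets could be encoded and decoded with the msgpack library, assuming a field order of (id, code, arguments); the helper names are not part of the protocol:

```python
import msgpack

RESPONSE_BIT = 0x8000  # response_code = request_code | 0x8000

def pack_message(msg_id, msg_code, args):
    # A packet is a MessagePack array of 3 fields.
    return msgpack.packb((msg_id, msg_code, args), use_bin_type=True)

def unpack_message(packet):
    msg_id, msg_code, args = msgpack.unpackb(packet, raw=True)
    is_response = bool(msg_code & RESPONSE_BIT)
    return msg_id, msg_code & ~RESPONSE_BIT, is_response, args
```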
Notice to implementers:
# | Message | Description | Workflow | Nodes |
---|---|---|---|---|
0 | Error | Error is a special type of message, because this can be sent against any other message, even if such a message does not expect a reply usually. | * → * | |
1 | RequestIdentification | Request a node identification. This must be the first packet for any connection. | * ⇄ * | |
2 | Ping | Empty request used as network barrier. | * ⇄ * | |
3 | CloseClient | Tell peer that it can close the connection if it has finished with us. | * → * | |
4 | AskPrimary | Ask node identifier of the current primary master. | ctl ⇄ A | |
5 | NotPrimaryMaster | Notify peer that I'm not the primary master. Attach any extra information to help the peer joining the cluster. | SM → * | |
6 | NotifyNodeInformation | Notify information about one or more nodes. | M → * | |
7 | AskRecovery | Ask storage nodes data needed by master to recover. Reused by `neoctl print ids`. | M ⇄ S; ctl ⇄ A ⇄ M | |
8 | AskLastIDs | Ask the last OID/TID so that a master can initialize its TransactionManager. Reused by `neoctl print ids`. | M ⇄ S; ctl ⇄ A ⇄ M | |
9 | AskPartitionTable | Ask storage node the remaining data needed by master to recover. | M ⇄ S | |
10 | SendPartitionTable | Send the full partition table to admin/client/storage nodes on connection. | M → A, C, S | |
11 | NotifyPartitionChanges | Notify about changes in the partition table. | M → * | |
12 | StartOperation | Tell a storage node to start operation. Before this message, it must only communicate with the primary master. | M → S | |
13 | StopOperation | Notify that the cluster is not operational anymore. Any operation between nodes must be aborted. | M → S, C | |
14 | AskUnfinishedTransactions | Ask unfinished transactions, which will be replicated when they're finished. | S ⇄ M | |
15 | AskLockedTransactions | Ask locked transactions to replay committed transactions that haven't been unlocked. | M ⇄ S | |
16 | AskFinalTID | Return final tid if ttid has been committed, to recover from certain failures during tpc_finish. | M ⇄ S; C ⇄ M, S | |
17 | ValidateTransaction | Do replay a committed transaction that was not unlocked. | M → S | |
18 | AskBeginTransaction | Ask to begin a new transaction. This maps to `tpc_begin`. | C ⇄ M | |
19 | FailedVote | Report storage nodes for which vote failed. True is returned if it's still possible to finish the transaction. | C ⇄ M | |
20 | AskFinishTransaction | Finish a transaction. Return the TID of the committed transaction. This maps to `tpc_finish`. | C ⇄ M | |
21 | AskLockInformation | Commit a transaction. The new data is read-locked. | M ⇄ S | |
22 | InvalidateObjects | Notify about a new transaction modifying objects, invalidating client caches. | M → C | |
23 | NotifyUnlockInformation | Notify about a successfully committed transaction. The new data can be unlocked. | M → S | |
24 | AskNewOIDs | Ask new OIDs to create objects. | C ⇄ M | |
25 | NotifyDeadlock | Ask master to generate a new TTID that will be used by the client to solve a deadlock by rebasing the transaction on top of concurrent changes. | S → M → C | |
26 | AskRebaseTransaction | Rebase a transaction to solve a deadlock. | C ⇄ S | |
27 | AskRebaseObject | Rebase an object change to solve a deadlock. | C ⇄ S | |
28 | AskStoreObject | Ask to create/modify an object. This maps to `store`. | C ⇄ S | |
29 | AbortTransaction | Abort a transaction. This maps to `tpc_abort`. | C → S; C → M → S | |
30 | AskStoreTransaction | Ask to store a transaction. Implies vote. | C ⇄ S | |
31 | AskVoteTransaction | Ask to vote a transaction. | C ⇄ S | |
32 | AskObject | Ask a stored object by its OID, optionally at/before a specific tid. This maps to `load/loadBefore/loadSerial`. | C ⇄ S | |
33 | AskTIDs | Ask for TIDs between a range of offsets. The order of TIDs is descending, and the range is [first, last). This maps to `undoLog`. | C ⇄ S | |
34 | AskTransactionInformation | Ask for transaction metadata. | C ⇄ S | |
35 | AskObjectHistory | Ask history information for a given object. The order of serials is descending, and the range is [first, last]. This maps to `history`. | C ⇄ S | |
36 | AskPartitionList | Ask information about partitions. | ctl ⇄ A | |
37 | AskNodeList | Ask information about nodes. | ctl ⇄ A | |
38 | SetNodeState | Change the state of a node. | ctl ⇄ A ⇄ M | |
39 | AddPendingNodes | Mark given pending nodes as running, for future inclusion when tweaking the partition table. | ctl ⇄ A ⇄ M | |
40 | TweakPartitionTable | Ask the master to balance the partition table, optionally excluding specific nodes in anticipation of removing them. | ctl ⇄ A ⇄ M | |
41 | SetNumReplicas | Set the number of replicas. | ctl ⇄ A ⇄ M | |
42 | SetClusterState | Set the cluster state. | ctl ⇄ A ⇄ M | |
43 | Repair | Ask storage nodes to repair their databases. | ctl ⇄ A ⇄ M | |
44 | NotifyRepair | Repair is translated to this message, asking a specific storage node to repair its database. | M → S | |
45 | NotifyClusterInformation | Notify about a cluster state change. | M → * | |
46 | AskClusterState | Ask the state of the cluster. | ctl ⇄ A; A ⇄ M | |
47 | AskObjectUndoSerial | Ask storage the serial where object data is when undoing given transaction, for a list of OIDs. | C ⇄ S | |
48 | AskTIDsFrom | Ask for length TIDs starting at min_tid. The order of TIDs is ascending. Used by `iterator`. | C ⇄ S | |
49 | AskPack | Request a pack at given TID. | C ⇄ M ⇄ S | |
50 | CheckReplicas | Ask the cluster to search for mismatches between replicas, metadata only, and optionally within a specific range. Reference nodes can be specified. | ctl ⇄ A ⇄ M | |
51 | CheckPartition | Ask a storage node to compare a partition with all other nodes. Like for CheckReplicas, only metadata are checked, optionally within a specific range. A reference node can be specified. | M → S | |
52 | AskCheckTIDRange | Ask some stats about a range of transactions. Used to know if there are differences between a replicating node and reference node. | S ⇄ S | |
53 | AskCheckSerialRange | Ask some stats about a range of object history. Used to know if there are differences between a replicating node and reference node. | S ⇄ S | |
54 | NotifyPartitionCorrupted | Notify that mismatches were found while checking replicas for a partition. | S → M | |
55 | NotifyReady | Notify that we're ready to serve requests. | S → M | |
56 | AskLastTransaction | Ask last committed TID. | C ⇄ M; ctl ⇄ A ⇄ M | |
57 | AskCheckCurrentSerial | Check if given serial is current for the given oid, and lock it so that this state is not altered until transaction ends. This maps to `checkCurrentSerialInTransaction`. | C ⇄ S | |
58 | NotifyTransactionFinished | Notify that a transaction blocking a replication is now finished. | M → S | |
59 | Replicate | Notify a storage node to replicate partitions up to given 'tid' and from given sources. | M → S | |
60 | NotifyReplicationDone | Notify the master node that a partition has been successfully replicated from a storage to another. | S → M | |
61 | AskFetchTransactions | Ask a storage node to send all transaction data we don't have, and reply with the list of transactions we should not have. | S ⇄ S | |
62 | AskFetchObjects | Ask a storage node to send object records we don't have, and reply with the list of records we should not have. | S ⇄ S | |
63 | AddTransaction | Send metadata of a transaction to a node that does not have them. | S → S | |
64 | AddObject | Send an object record to a node that does not have it. | S → S | |
65 | Truncate | Request DB to be truncated. Also used to leave backup mode. | ctl ⇄ A ⇄ M; M ⇄ S | |
66 | FlushLog | Request all nodes to flush their logs. | ctl → A → M → * | |
67 | AskMonitorInformation | | ctl ⇄ A | |
68 | NotifyMonitorInformation | | A → A | |
69 | NotifyUpstreamAdmin | | M → A | |
Enum values are serialized using Extension mechanism: type is the number of the Enum type (as listed above), data is MessagePack serialization of the Enum value (i.e. a positive integer). For example, NodeStates.RUNNING is encoded as \xd4\x03\x02.
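For illustration, this encoding can be reproduced with the msgpack library (assuming, as in the example above, that NodeStates is Enum type 3 and RUNNING is value 2):

```python
import msgpack

# NodeStates is Enum type 3 and RUNNING is value 2, as in the example above.
ext = msgpack.ExtType(3, msgpack.packb(2))    # data = MessagePack of the value
assert msgpack.packb(ext) == b'\xd4\x03\x02'  # fixext 1, type 3, data 0x02

# Decoding: turn the extension back into an (enum type, value) pair.
def ext_hook(code, data):
    return code, msgpack.unpackb(data)
assert msgpack.unpackb(b'\xd4\x03\x02', ext_hook=ext_hook) == (3, 2)
```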
Naming choice: For cell states, node states and node types, names are chosen to have unambiguous initials, which is useful to produce shorter logs or have a more compact user interface. This explains for example why RUNNING was preferred over UP.
INVALID_TID | '\xff\xff\xff\xff\xff\xff\xff\xff' |
INVALID_OID | '\xff\xff\xff\xff\xff\xff\xff\xff' |
INVALID_PARTITION | 0xffffffff |
ZERO_HASH | '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' |
ZERO_TID | '\x00\x00\x00\x00\x00\x00\x00\x00' |
ZERO_OID | '\x00\x00\x00\x00\x00\x00\x00\x00' |
MAX_TID | '\x7f\xff\xff\xff\xff\xff\xff\xff' |
MAX_TID could be bigger but in the Python implementation, TIDs are stored as integers and some storage backends may not support values above 2⁶³-1 (e.g. SQLite).
Node IDs are 32-bit integers. NID namespaces are required to prevent conflicts when the master generates new ids before it knows those of existing storage nodes. The high-order byte of node ids is one of the following values:
Storage | 0x00 |
Master | -0x10 |
Client | -0x20 |
Admin | -0x30 |
Actually, only the high-order bit is really important and the 31 other bits could be random, but the extra namespace information and the non-randomness of the 3 low-order bytes help to read logs.
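A hedged sketch of how such an ID could be composed (the helper and the counter policy are illustrative, not the reference algorithm):

```python
NID_NAMESPACES = {'STORAGE': 0x00, 'MASTER': -0x10, 'CLIENT': -0x20, 'ADMIN': -0x30}

def make_nid(node_type, counter):
    """Build a signed 32-bit node ID: the high-order byte is the namespace,
    the 3 low-order bytes come from a small per-namespace counter."""
    assert 0 < counter < 1 << 24
    return (NID_NAMESPACES[node_type] << 24) | counter

assert make_nid('CLIENT', 1) == -0x1FFFFFFF  # high-order byte is -0x20
```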
Before two nodes can exchange messages, they first need to connect to each other by way of the underlying network.
For a given connection, one node was listening and the other dialed the listening node's address. The way to set the address a node listens on is outside the scope of this specification. Nodes publish their addresses to the primary master node during identification (see the RequestIdentification message). Node information is then broadcast using NotifyNodeInformation.
In the context of an established link, the node which dialed the listener is called the "dialing node", while the node which was listening is called the "server node".
Upon an exception on the underlying connection, all pending incoming packets must be processed (while dropping any further packet that would be sent via this connection). When connected to a peer (i.e. when not connecting), a node can initiate a disconnection in 2 ways, depending on whether pending outgoing packets must be sent before actually closing the underlying connection: if they must, the connection is said to be aborted and the node must not process any incoming packet from this connection anymore. Aborting a connection is often used to explain to a peer why it is being disconnected (e.g. with an Error message).
The NEO-specific handshake follows after the underlying network connection is established, to make sure nodes talk the same protocol. Handshake transmissions are not ordered with respect to each other and can go in parallel.
To avoid accidental DoS or stuck communication in case a connection is made with a non-NEO node, MessagePack should not be used to decode this first packet. Comparison should be done:
To simplify the implementation, the MessagePack encoding must be as small as possible, using bin format for the magic, so '\x92\xa3NEO\x01' for the current version.
Handshake succeeds if the comparison matches exactly. Else, the underlying network link is closed, thus cancelling link establishment.
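A minimal sketch of such an incremental comparison, assuming the expected bytes are exactly the '\x92\xa3NEO\x01' sequence given above:

```python
EXPECTED_HANDSHAKE = b'\x92\xa3NEO\x01'  # fixarray of 2, magic, version 1

def check_handshake(buffer):
    """Compare received bytes against the expected handshake, byte per byte,
    without decoding them as MessagePack."""
    n = min(len(buffer), len(EXPECTED_HANDSHAKE))
    if buffer[:n] != EXPECTED_HANDSHAKE[:n]:
        return 'mismatch'    # close the underlying network link
    if n < len(EXPECTED_HANDSHAKE):
        return 'incomplete'  # wait for more bytes
    return 'ok'              # handshake succeeded
```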
The version is increased whenever upgrading a node may require to upgrade other nodes.
Notice to implementers:
After a successful handshake, any node (except tools connecting to the admin node to send commands) must identify with the server node, which either accepts or rejects it. On success, the link becomes established and further message exchange on it can happen depending on peer types, workflow and current cluster state.
RequestIdentification | ||
---|---|---|
node_type | NodeTypes | Type of the requesting node. |
nid | int | nil | Node ID of the requesting node if one was already assigned. |
address | [host: bin, port: int] | nil | Address the requesting node itself listens on. |
name | bin | Name of the NEO cluster, to catch misconfiguration mistakes. |
id_timestamp | float | nil | Time when the requesting node was identified by the master. |
extra | {key: bin, value: any} | Extra node properties. |
AcceptIdentification | |
---|---|---|
node_type | NodeTypes | Type of the server node |
nid | int | nil | Node ID of the server node. |
your_nid | int | nil | Node ID assigned to requesting node if server is master, otherwise echo of nid parameter. |
Once the cluster name is checked, the master should check nid and address and reject with PROTOCOL_ERROR if they conflict with an already identified node. In particular, it should protect against misconfiguration when cloning a node. For example, the address must not be the same as the one of the master. The wanted nid can be ignored for all nodes except storages (a negative nid is considered temporary).
See the beginning of IdentificationHandler.requestIdentification for an example of implementation (note: the uuid variable should be renamed into nid).
Then, processing depends on node_type. Unknown values result in PROTOCOL_ERROR. The master rejects with NOT_READY if the connecting node can't be serviced for the current cluster state (an untested idea is to suspend the identification until the cluster enters into an appropriate state). If accepted:
XXX There's currently a duplicate nid issue at cluster creation when storage nodes are identified before all masters see each other and elect a primary. Do storage nodes need an id before the cluster is created? Another solution is that they treat their ids as temporary as long as the partition table is empty.
The general case is to only accept nodes known by the master, with the same cluster name, and with an id_timestamp that matches. It can mismatch in 2 ways:
Admin and storage nodes also accept connections from nodes of same type in a related backup cluster. In such case, nid, address and id_timestamp are nil. For storage, the cluster name must match as usual, whereas backup admins specify their own cluster name.
There's no identification between neoctl and an admin node.
As part of replication, it is possible that 2 storage nodes of the same cluster connect to each other at the same time. The race is solved by comparing nid: if the peer id is greater, identification succeeds, replacing the current outgoing link establishment, else it is rejected with PROTOCOL_ERROR.
It implies that the established link is used for requests from both sides. To avoid that storages remain connected with each other forever, the CloseClient packet (no argument) can be used to notify the peer that it's free to close the connection.
XXX Reusing a connection as client whereas CloseClient was already sent is unlikely to work reliably, because the peer may close at any time. The packet should be changed into AsClient(bool).
Besides the data actually stored in the database, there is also state maintained among nodes, representing e.g. the current set of nodes, the partition table, the master's idea about what is currently going on with the cluster, etc. The primary master maintains the main version of this state, makes changes to it and shares it with all its peers.
These data consist of:
Whenever some information is shared with a node, it is sent in whole just after identification; then incremental updates are sent after each change. Note however that this rule does not apply to the cluster state, but that may change in the future.
The next sections describe them.
These data can also serve to request nodes to perform actions, like shutting down a node by telling it that it is DOWN.
Possible states for each type:
 | ADMIN | CLIENT | SM | STORAGE |
---|---|---|---|---|
UNKNOWN | notification only | |||
DOWN | ✓ | ✓ | ||
RUNNING | ✓ | ✓ | ✓ | ✓ |
PENDING | | | | ✓ |
Notification matrix:
 | ADMIN | CLIENT | MASTER | STORAGE |
---|---|---|---|---|
ADMIN | (backup) | ✓ | ✓ | ✓ |
CLIENT | (backup) | ✓ | ✓ | |
SM | ✓ | |||
STORAGE | ✓ | ✓ | ✓ | |
(e.g. clients only know the states of master and storage nodes) |
All nodes have a list of nodes in volatile memory, with the following information:
This table is maintained by the primary master, which broadcasts it to interested nodes (see notification matrix). The same packet is used for full data upon connection to the master and incremental changes afterwards:
NotifyNodeInformation | ||
---|---|---|
timestamp | float | Strictly monotonic counter. Any node that requests identification with a greater timestamp shall be delayed. |
node_list | [node_type: NodeTypes, addr: [host: bin, port: int] | nil, nid: int | nil, state: NodeStates, id_timestamp: float | nil] | List of nodes. |
Node states describe the current status of nodes from the primary master's point of view. By definition, the primary master node is therefore always running and the above matrices use SM to refer only to secondary master nodes. Admin and client nodes are also either running or forgotten, so node states are in practice only useful for the other node types.
If the primary master node fails, all nodes must be able to find the newly elected one, which explains why any change to the list of master nodes is broadcast to all nodes without exception.
For the monitoring of backup clusters, their admin nodes must be able to connect to the admin of the main cluster. Information about the latter is propagated via backup master nodes (seen as clients). Hence (backup) in the notification matrix, using the backup extra node property as described in the Link Establishment section.
Apart from that, the following changes are considered:
Cell States:
Full table:
AskPartitionTable | ||
---|---|---|
(no parameter) | ||
AnswerPartitionTable | ||
ptid | int | nil | Partition Table ID. |
num_replicas | int | Number of replicas. |
row_list | [[(nid: int, state: CellStates)]] | NP lists of cells, from partition 0 to NP-1. |
SendPartitionTable | ||
---|---|---|
same as AnswerPartitionTable |
Updates:
NotifyPartitionChanges | ||
---|---|---|
ptid | int | Partition Table ID. |
num_replicas | int | Number of replicas. |
cell_list | [(partition: int, nid: int, state: CellStates)] | List of cells. |
UP_TO_DATE is the initial cell state when the database is created. Afterwards (e.g. after a tweak), new cells always start in the OUT_OF_DATE state.
Cell States readable/writable:
 | OUT_OF_DATE | UP_TO_DATE | FEEDING | CORRUPTED | DISCARDED |
---|---|---|---|---|---|
readable | | ✓ | ✓ | | |
writeable | ✓ | ✓ | ✓ | | |
The partition table is said to be operational when there's at least 1 readable cell for every partition. When it isn't, the master isn't authoritative anymore about it and therefore doesn't send it to storage nodes: we'll see later that this is one of the reasons causing the master to go back to the RECOVERING cluster state. The only case where a storage node stores a PT that isn't operational is when a cell of a backup cluster is CORRUPTED (after comparison against an upstream cluster). The master can always notify admin nodes, so that the user is informed about what it knows.
Allowed states for each type:
CLIENT | STORAGE | other | |
---|---|---|---|
RUNNING/BACKINGUP | ✓ | ✓ | ✓ |
STOPPING_BACKUP | [1] | ✓ | |
other | [2] | ✓ | |
Cluster States:
The primary master indicates to other nodes its idea about what is currently going on with the cluster via the cluster state value.
NotifyClusterInformation | ||
---|---|---|
state | ClusterStates | Cluster state. |
It is only sent on state change, not after identification. Admin nodes always want to know the state and therefore reuse the following ctl ⇄ A packet:
AskClusterState |
---|
(no parameter) |
AnswerClusterState |
same as NotifyClusterInformation |
During identification, the master must reject with NOT_READY nodes that are not allowed to join the cluster in the current state. And reciprocally, on state change, it must disconnect nodes that aren't allowed in the new state.
Before going back to RECOVERING:
The main task of a NEO cluster is to provide read/write service to clients. However, before this operational service can be provided, the cluster has to go through several startup states whose purpose is to make sure the cluster is indeed ready.
These cluster states are detailed in the next sections. Most logic is located inside the master node. We'll end by detailing how the cluster can be properly shut down, and how the storage nodes must behave depending on the state of the cluster.
The cluster is initially in the RECOVERING state, and it goes back to this state whenever the partition table becomes non-operational again. In parallel of recovering its own data by reading it from storage nodes, an election of the primary master always happens.
Both processes end if the master is still primary and allowed to become operational, in which case the node and partition tables are updated (which implies broadcast of changes to other nodes) and the cluster switches to VERIFYING. Reciprocally, the master will have to go back to RECOVERING for any of the following reasons:
The changes in the node table also mean that whenever the cluster is not in the RECOVERING state, a newly-identified storage node is set RUNNING if it serves at least 1 cell, PENDING otherwise.
Truncate | ||
---|---|---|
tid | bin | All data after the given TID will have to be deleted. |
AskRecovery | ||
---|---|---|
(no parameter) | ||
AnswerRecovery | ||
ptid | int | nil | Partition Table ID. |
backup_tid | bin | nil | If not nil, the cluster is replicating another cluster and the storage node has all data up to the given TID. |
truncate_tid | bin | nil | All data after the given TID shall be deleted. |
Storage nodes may have different versions of the partition table (it means that some nodes were down the last time the cluster was operational). The master must retrieve it from any node having the latest one, and take its truncate_tid, as well as its backup_tid to know if the cluster is in backup mode. However, the master must track the backup_tid of each cell, which is known from the backup_tid value returned by storage nodes.
The conditions to start up the cluster are complex because we want to avoid a split brain without requiring the user to force unnecessarily. There are 2 modes of evaluation:
Conditions for ending recovery:
Election is the workflow responsible for selecting only one node among the potentially many master nodes of the cluster. The selected master node becomes known as the primary master. Once the primary master is elected, it becomes responsible for driving the cluster in every aspect where the master is documented to be involved.
TODO
In VERIFYING state:
Verifying data starts by gathering all transactions that may have been partially finished. It's safe to query outdated cells from nodes with readable cells. For other nodes, it's more complicated:
And we can't ignore ttid < last tid for all nodes, even if the master serializes unlock notifications:
So for nodes without any readable cell, and only for them, one only checks if they have locked transactions. Replication will do the rest. Nodes are queried in parallel with:
AskLockedTransactions | ||
---|---|---|
(no parameter) | ||
AnswerLockedTransactions | ||
tid_dict | {ttid: bin, tid: bin | nil} | Voted transactions, locked or not. |
From the storage answers, the master establishes 2 lists:
Only nodes that are assigned to the cell containing the transaction metadata store whether a voted transaction is locked or not. If these nodes have unlocked the transaction and others were interrupted before doing so, locked_dict is incomplete. The TID can be retrieved with:
AskFinalTID | ||
---|---|---|
ttid | bin | TTID. |
AnswerFinalTID | ||
tid | bin | nil | TID. |
This has to be done for all TIDs in voted_dict but not locked_dict, and the master must only query readable cells (if an outdated cell had unlocked ttid, then either it is already in locked_dict or a readable cell also unlocked it). For each TID, the master stops asking after the first positive result. No result means that the transaction is not locked: no need to tell nodes to delete it, since they drop any unfinished data just before being operational.
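A hedged sketch of this step of the verification; readable_cells and ask_final_tid stand for hypothetical helpers, not names from the reference implementation:

```python
def recover_final_tids(voted_dict, locked_dict, readable_cells, ask_final_tid):
    """For every voted but not locked transaction, query readable cells with
    AskFinalTID and stop at the first positive result."""
    for ttid in voted_dict:
        if ttid in locked_dict:
            continue
        for node in readable_cells(ttid):
            tid = ask_final_tid(node, ttid)  # AnswerFinalTID, nil if unknown
            if tid is not None:
                locked_dict[ttid] = tid      # first positive result wins
                break
        # no result: the transaction was never locked, nothing to validate
```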
At this point, the master knows all transactions for which tpc_finish was called but not fully processed, and this may include replicas with transactions that were not even locked. It finishes them by sending ValidateTransaction notifications for every TTID in locked_dict, to every node in voted_dict.
ValidateTransaction | ||
---|---|---|
ttid | bin | TTID. |
tid | bin | TID. |
The processing of this packet is identical to what's done by storage backends when the transaction is locked and unlocked (see tpc_finish). A single underlying commit should be done, at the end.
For the next step, all nodes are queried, in parallel:
AskLastIDs | ||
---|---|---|
(no parameter) | ||
AnswerLastIDs | ||
loid | bin | nil | Greatest OID stored by the storage node. |
ltid | bin | nil | Greatest TID stored by the storage node. |
The cluster is in RUNNING state when it has fully recovered, which means the database is read-writable by client nodes.
In order to notify RUNNING storage nodes that they can serve clients, the master sends StartOperation, either when the cluster enters the RUNNING state (for nodes that are already RUNNING), or whenever a node becomes RUNNING.
StartOperation | ||
---|---|---|
backup | bool | False |
XXX By reworking how the cluster state is broadcasted to other nodes, it should be possible to get rid of this packet: a storage node being notified that it is RUNNING and that the cluster is RUNNING could start operation immediately. In some way, like clients, which can only join a RUNNING cluster and work with storage nodes as soon as they're RUNNING.
A storage node notifies the master it is ready with NotifyReady (no parameter). Then it accepts connections from clients.
The main purpose of the STOPPING state is to provide a way to shutdown the whole cluster cleanly, in order to prevent transactions from being interrupted during the second phase.
If the cluster was RUNNING, the master must wait until all transactions are either aborted or finished, and block all requests to start new transactions. Clients must abort transactions that are still in the first phase.
If the cluster is no longer operational or if there isn't any transaction left:
XXX The reference implementation is broken when the cluster was not RUNNING so the above specifications are subject to change.
A node first needs to establish a link with the primary master. This is done by trying all known master nodes in a loop. Identifying against a secondary master results in a NotPrimaryMaster notification (instead of an AcceptIdentification reply), so at most 2 attempts are usually enough.
Storage Node:
Once identified, the behaviour of a storage node depends on the cluster state. Via NotifyNodeInformation, it was told whether it is PENDING or RUNNING. As long as it is not explicitly asked to start operation, it is in a special state where it must be able to:
Client Node:
For a client node, identification is followed by AskLastTransaction (no parameter). AnswerLastTransaction only contains the TID of the last committed transaction: if different from a previous session, caches must be invalidated:
In either case, the ZODB Connection caches must be invalidated in whole (invalidateCache). InvalidateObjects notifications must be ignored as long as AnswerLastTransaction is not processed.
As soon as the cluster enters RUNNING state, the master must be ready to process requests to commit new transactions, and same for storage nodes that are ready to serve clients. When not operational anymore, all ongoing transactions must be forgotten.
Whereas the previous sections describe NEO internals, we'll now see workflows that correspond to user-visible service.
ZODB is somewhat of a key-value database. A few metadata are associated with records to support some specific ZODB & NEO features. For a storage node, a record consists of:
The undo of an object creation is encoded with (compression=0, data='', checksum='\0..\0').
In order to load an object, a client node first consults its partition & node tables to get a list of storage nodes that should currently have corresponding data. Then it queries one node from that list:
AskObject | ||
---|---|---|
oid | bin | OID of object to load. |
at | bin | nil | same as serial in IStorage.loadSerial |
before | bin | nil | same as tid in IStorage.loadBefore |
AnswerObject | ||
oid | bin | Same as in request. XXX should be removed |
serial | bin | TID of returned record |
next_serial | bin | nil | TID of next record, nil if there's none |
compression | uint | See Read & Write section. |
checksum | bin | |
data | bin | |
data_serial | bin | nil |
At least one of at and before must be nil. Possible error codes are:
The latter error code is returned when the storage node is no longer assigned to store the partition containing the requested data, or if the cell is not readable anymore. Such a case happens due to the distributed nature of the cluster, and here it means that messages from the master about changes in the partition table were sent but not yet delivered to the client. For any inconsistency between its tables and what's actually returned by the storage node, the client must use a Ping request to the master as a network barrier, to update its tables and retry.
A request may also simply fail because the storage node becomes unavailable. The client node should retry if there's any other readable cell.
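A hedged sketch of this retry logic, where cell_nodes, ask_object and ping_master are hypothetical helpers standing for the table lookup, the AskObject request and the Ping barrier:

```python
class NonReadableCell(Exception): pass
class NodeUnavailable(Exception): pass

def load_object(oid, at, before, cell_nodes, ask_object, ping_master):
    while True:
        nodes = cell_nodes(oid)             # from partition & node tables
        if not nodes:
            raise NonReadableCell(oid)      # nothing left to try
        for node in nodes:
            try:
                return ask_object(node, oid, at, before)   # AskObject
            except NodeUnavailable:
                continue                    # try another readable cell
            except NonReadableCell:
                break                       # our tables may be out of date
        ping_master()                       # network barrier: tables updated
```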
Notice to implementers:
The above way of querying storage nodes for read applies to all requests that consist in reading from 1 node, e.g. AskFinalTID, AskTransactionInformation, etc.
XXX The NEO reference implementation of IStorageUndoable.undoLog is broken so the current protocol for this API is likely to change.
XXX next_serial is likely to be removed from the reply in the future, because it's often expensive for the storage node to return it (e.g. 2 SQL queries for SQL backends) and the client node should be able to do without it.
XXX(kirr) The Ping trick makes many implicit assumptions about e.g. Master being single threaded and other implicit orderings that are true for current NEO/py implementation, but are generally not true if implementation is not made serial. If any ordering is required for correct working the protocol must specify it explicitly.
Commit is split into phases in accordance with ZODB Two-Phase Commit Protocol. In this section, we describe the basic case, when there's no concurrency with other client nodes. In the next sections, we'll detail the extra mechanisms to solve conflicts or even deadlocks, as well as how commit interacts with synchronous replication.
A client node first asks the master to begin transaction with:
AskBeginTransaction | ||
---|---|---|
tid | bin | nil | nil for IStorage.tpc_begin, or IStorageRestoreable.tpc_begin argument. |
AnswerBeginTransaction | ||
ttid | bin | Temporary TID (same as tid if not nil). |
Because the master needs to know which storage nodes are expected to have all data from the client, it must wait until any storage node that was requested to start operation is either ready (NotifyReady) or not RUNNING anymore before processing any AskBeginTransaction packet. Then, the master records the list of nodes that are ready at the time the transaction is started, generates a new TID, and returns it.
The master must use a single TID generator for both temporary and final TIDs. Any newly-generated TID must be strictly greater than the previous one. The partition containing transaction metadata is computed with TID % NP, similarly to OIDs. Because these metadata are stored before the final TID is known, the latter must have the same modulo as the TTID. Here is an algorithm (the first 2 points are identical to any ZODB implementation):
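The exact generator is not specified here; the following sketch only illustrates the constraints stated above (strict monotonicity and same modulo NP as the TTID), with TIDs treated as integers:

```python
def next_final_tid(last_tid, ttid, np):
    """Return a TID strictly greater than last_tid (the last generated TID)
    and with the same modulo NP as ttid, so that the transaction metadata
    stay in the same partition."""
    tid = last_tid + 1
    tid += (ttid - tid) % np   # shift forward into the same partition as ttid
    assert tid > last_tid and tid % np == ttid % np
    return tid
```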
XXX document askNewOIDs (but where?)
During this phase, the new data is sent to storage nodes, taking a write-lock for each modified object. On the storage side, the code giving or rejecting write-locks can be seen as the core of NEO, because it's where many features interact with each other. For a better explanation, we will also describe internals of the reference implementation.
IStorage.store maps to:
AskStoreObject | ||
---|---|---|
oid | bin | OID of object to store. |
serial | bin | Base TID. |
compression | uint | See Read & Write section. |
checksum | bin | |
data | bin | |
data_serial | bin | nil | |
ttid | bin | TTID of transaction being committed. |
AnswerStoreObject | ||
locked | bin | nil | nil if successful. |
And ReadVerifyingStorage.checkCurrentSerialInTransaction to:
AskCheckCurrentSerial | ||
---|---|---|
ttid | bin | TTID of transaction being committed. |
oid | bin | OID of object to verify. |
serial | bin | Base TID. |
AnswerCheckCurrentSerial | ||
locked | bin | nil | nil if successful. |
Both requests must be done to all writeable cells and both replies are processed identically. The client node does not wait for answers before sending the next store/check: the next phase (vote) will start by waiting for all answers.
The client node can hold transaction-related data as follows:
data_dict | {oid: (data | check_marker, serial, [nid])} | data being checked/stored |
cache_dict | {oid: data | check_marker | nocache_marker} | stored/checked data, data will go to the cache on tpc_finish |
conflict_dict | {oid: serial} | conflicts to resolve |
resolved_dict | {oid: serial} | resolved conflicts |
conn_dict | {nid: connection | nil} | involved storage nodes |
For the storage node:
serial_dict | {oid: serial} | stored/checked objects |
store_dict | {oid: (oid, data_id, value_serial)} | metadata of stored objects |
lockless | {oid} | objects for which the storage accepted to store/check without checking for conflict |
In order to handle transactions of any size (bigger than RAM), both the client and storage nodes must act as a pipe for the raw data between the application and the storage devices. Because data being stored must be kept in memory by the client until it has an answer from all written cells, as required by conflict resolution, stores must be throttled to limit memory usage (e.g. to the cache size), by waiting for answers before the vote when the limit is reached. The storage node must write the raw data immediately to the underlying storage device (without commit) and only keep a reference (data_id). Finally, if the client has a cache and wants to update it on tpc_finish, it can keep some stored data without exceeding the cache size.
Whenever an object is stored/checked, the client node must request at least 1 storage node, else the transaction fails immediately. It must also look up the current partition table to iterate over the storage nodes to request: in other words, for a given partition, the list of cells can evolve during the transaction, as updates are received from the master node.
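A hedged sketch of such throttling on the client side; send_store and wait_one_answer are hypothetical callables (sending one AskStoreObject, and blocking until one outstanding store is fully answered, returning its size):

```python
def store_all(pending_stores, send_store, wait_one_answer, limit):
    """Keep at most 'limit' bytes of object data in flight (e.g. the cache
    size), waiting for answers when the limit would be exceeded."""
    in_flight = 0
    for oid, data in pending_stores:
        while in_flight and in_flight + len(data) > limit:
            in_flight -= wait_one_answer()   # size of an acknowledged store
        send_store(oid, data)                # AskStoreObject, asynchronous
        in_flight += len(data)
```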
However, a transaction must always use the same network connection with a storage node. It can establish a new connection with a storage node that is involved for the first time but must not reconnect or use a new connection that would have been established by some concurrent activity (read or concurrent transaction).
The vote phase starts by waiting for all store/check answers. Then the client requests all involved storage nodes to vote the transaction, in parallel. The following packet is used if the storage node is assigned the partition that contains the transaction metadata:
AskStoreTransaction | ||
---|---|---|
ttid | bin | TTID of transaction being committed. |
user | bin | |
description | bin | |
extension | bin | |
oids | [bin] | cache_dict keys |
AnswerStoreTransaction | ||
(no parameter) |
Else:
AskVoteTransaction | ||
---|---|---|
ttid | bin | TTID of transaction being committed. |
AnswerVoteTransaction | ||
(no parameter) |
On the storage side, the vote consists in storing durably all metadata to a temporary place (i.e. a commit on the storage backend). It must be done in such a way that tpc_finish requires minimal extra disk space, ideally by just flipping a boolean, in order to maximize the probability that tpc_finish succeeds, and also so that tpc_finish completes in constant time, regardless of the size of the transaction (in bytes or number of oids). The metadata to store are the transaction metadata (if AskStoreTransaction) and the object metadata (store_dict values).
Once all storage nodes have voted, the client node must do extra checks if any involved node failed (i.e. if any value in conn_dict indicates a closed connection). First, the vote must fail if any oid is not locked by at least one node that didn't fail. Then, if there is any involved RUNNING node that failed (still according to conn_dict), it must ask the master whether they can be disconnected while keeping the cluster operational:
FailedVote | ||
---|---|---|
ttid | bin | TTID of transaction being committed. |
failed | [nid] | List of failed involved RUNNING storage nodes. |
Error | ||
ACK | During tpc_finish, the master will disconnect the failed nodes that are expected to have transactions in full (those that are ready since the beginning of the transaction). | |
INCOMPLETE_TRANSACTION | The nodes can't be disconnected and the transaction must be aborted. |
From the point of view of the client, it is usually a single request:
AskFinishTransaction | ||
---|---|---|
ttid | bin | TTID of transaction being committed. |
stored_list | [bin] | List of stored objects. |
checked_list | [bin] | List of checked objects. |
AnswerFinishTransaction | ||
tid | bin | Final TID of the transaction. |
If there was a FailedVote request, the master must check again whether the transaction can be committed, failing with INCOMPLETE_TRANSACTION otherwise. Then it iterates over the involved nodes (based on stored_list & checked_list) that are identified:
The master continues by generating the final TID (if not given by tpc_begin), adjusting its OID generator if a foreign OID was stored (max(oid_list)), and requesting node_list to lock the transaction:
AskLockInformation | ||
---|---|---|
ttid | bin | TTID of transaction being committed. |
tid | bin | Final TID. |
AnswerInformationLocked | ||
ttid | bin | Same as in request. |
Upon reception of AskLockInformation, the storage node locks all stored objects for reading, blocking all read requests (e.g. AskObject) for these objects. If the node stored transaction metadata, it also marks the transaction as committed, by storing durably the final TID.
Transactions must be unlocked in the same order they are locked. Once concurrent tpc_finish with a smaller final TID are unlocked, and once the master has received all answers from node_list, the transaction can be finalized. If the connection was not lost with the committer node, it replies to it with AnswerTransactionFinished. All other client nodes get invalidations with a single packet:
InvalidateObjects | ||
---|---|---|
tid | bin | TID. |
oid_list | [bin] | Same as stored_list. |
The transaction is unlocked by notifying node_list with:
NotifyUnlockInformation | ||
---|---|---|
ttid | bin | TTID of transaction being committed. |
The storage node unlocks by making all committed data readable and releasing all locks (both read & write) taken by the transaction. To reduce I/O, the storage backend does not have to commit durably, at least not immediately: in case of a crash, the transaction would be recovered. An example implementation is to commit only if there's no other commit during the following second (e.g. another transaction being voted or locked).
If the connection to the master is lost waiting for AnswerFinishTransaction, the client should try to recover by querying the final TID with AskFinalTID. The master is asked first:
During the commit phase, we saw quickly that a write-lock is taken when an object is stored/checked. Except for a special case related to replication (which will be detailed later), a transaction that has successfully stored/checked an object owns a write-lock on this object, and this prevents any concurrent transaction from doing the same (in theory, transactions that check the same object could share the lock) until the lock is released. This is object-level locking and, with either several storage nodes or transactions committing oids in unsorted order, this could lead to deadlocks.
The core principle is to compare TTIDs. Given TTID1 that started before TTID2, i.e. TTID1 < TTID2, we consider that TTID1 statistically has more chance to store/check objects before TTID2, which means that it is more common for TTID2 to try to store/check a TTID1-locked object, in which case TTID2 waits. And vice versa, TTID1 trying to store/check a TTID2-locked object is a possible deadlock case, which must be solved without delay.
After having looked at whether a write-lock is already taken or not, the storage must check whether there's a conflict for the requested operation, as defined by the ZODB specifications.
This is done by first getting the last TID of the object being stored/checked, which may fail with INCOMPLETE_TRANSACTION (as any other read), and we'll see later that it can be tricky in the case of internal replication. If there's such a last TID (i.e. not a new OID), it is compared with serial (from AskStoreObject or AskCheckCurrentSerial): if different, the storage node stops here and returns the last TID as the locked value in the answer packet, else the transaction gets the lock.
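Putting the two checks together, here is a simplified sketch of what a storage node could do for one store/check request (return conventions and names are illustrative only):

```python
def handle_store(oid, ttid, serial, last_tid, write_locks):
    """Decide the outcome of AskStoreObject / AskCheckCurrentSerial for one
    oid; last_tid is the last TID of the object, None for a new OID."""
    locker = write_locks.get(oid)
    if locker is not None and locker != ttid:
        if locker < ttid:
            return 'delay'        # the older transaction keeps the lock: wait
        return 'deadlock'         # possible deadlock, solved without delay
    if last_tid is not None and last_tid != serial:
        return last_tid           # conflict: returned as the 'locked' value
    write_locks[oid] = ttid       # lock granted
    return None                   # i.e. locked = nil in the answer
```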
In the case of checkCurrentSerialInTransaction, the conflict results in a ZODB ReadConflictError, else the client node tries to resolve it. Internally, the ZODB tryToResolveConflict function uses loadSerial, which may result in a read request to any storage node if the data is not in the cache, as described in the Read section. If the conflict resolution succeeds, the client node stores the new data with AskStoreObject and a new base TID, as described in the commit phase.
While a transaction is resolving a conflict for an object, a concurrent transaction may have the time to read the last serial of this object (in the diagram, C₁ already has it) and modify it, which means that the resolved conflict will result in another conflict. In theory, if a transaction is always slow enough to resolve a conflict, a concurrent process could prevent it from committing by modifying the conflicting object in a loop. We consider this is a non-issue, but an idea to fix it is to take the lock before the second attempt to resolve.
With replicas, there may be several readable cells, in which case the client node receives several conflict answers. Resolution can be done upon the first answer and the new data is sent even to nodes that haven't replied yet for previous stores.
The most common case of abort is after a conflict and it is initiated by the client node, using the following notification:
AbortTransaction | ||
---|---|---|
ttid | bin | TTID of transaction being committed. |
nid_list | [nid] | List of involved storage nodes. Empty if sent to a storage node. |
Storage nodes must abort a transaction after any other packet sent by the client for this transaction. If there was a pending write, aborting only via the master would lead to a race condition, with the consequence that storage nodes may lock oids forever. Therefore, the client node must notify both the master and involved storage nodes. And to ensure that involved nodes are notified, the master also notifies them. Most of the time, the storage nodes get 2 AbortTransaction packets, the second one being ignored.
Once finishing the transaction is requested by the client node (AskFinishTransaction), the master becomes the node responsible to initiate an abort if needed. The difference is clearly visible with INCOMPLETE_TRANSACTION, after which the abort is initiated by the client in the case of tpc_vote, and by the master in the case of tpc_finish. In the latter case, a simplified implementation is to notify all ready storage nodes.
A connection failure with a client node also results in dropping its transactions, provided they're not too advanced:
XXX
Replication is the workflow responsible for keeping multiple copies of data on several storage nodes. It defines how a storage node synchronizes from others if, for any reason, it does not have all data. Replication is used in 2 ways:
In any case, storage nodes exchange data partition per partition, first all transaction metadata and then objects, both being done by increasing TID.
AskFetchTransactions | ||
---|---|---|
partition | uint | Partition number. |
length | uint | Maximum number of transactions to replicate. For example, 1000. |
min_tid | bin | Start of the transaction chunk to replicate, included. |
max_tid | bin | End of the transaction chunk to replicate, included. |
tid_list | [tid: bin] | List of transactions the destination node already has between min_tid and max_tid. |
AnswerFetchTransactions | ||
pack_tid | nil | Not yet implemented. |
next_tid | bin | nil | If nil, the [min_tid..max_tid] chunk is fully replicated, else the length was reached and next_tid is used as min_tid for a next AskFetchTransactions request. |
delete_list | [tid: bin] | Transactions to delete. |
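A hedged sketch of the resulting request loop on the destination node; ask_fetch and delete are hypothetical callables sending AskFetchTransactions and deleting local transactions:

```python
def replicate_transactions(ask_fetch, delete, partition, min_tid, max_tid,
                           local_tids, length=1000):
    """Drive AskFetchTransactions requests until the [min_tid..max_tid]
    chunk of the given partition is fully replicated."""
    while True:
        answer = ask_fetch(partition, length, min_tid, max_tid,
                           sorted(t for t in local_tids
                                  if min_tid <= t <= max_tid))
        # Missing transactions arrive as AddTransaction packets (described
        # below), sent with the same message id, before the answer itself.
        delete(answer.delete_list)         # transactions we should not have
        if answer.next_tid is None:
            return                         # chunk fully replicated
        min_tid = answer.next_tid          # continue from next_tid
```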
For any missing transaction, the source node uses the following packet with the same message id as AskFetchTransactions:
AddTransaction | ||
---|---|---|
user | bin | |
description | bin | |
extension | bin | |
packed | bool | |
ttid | bin | |
oids | [oid: bin] |
The replication of object records belonging to the same transaction is done by increasing OID:
AskFetchObjects | ||
---|---|---|
partition | uint | Partition number. |
length | uint | Maximum number of objects to replicate. For example, 1000. |
min_tid | bin | Start of the transaction chunk to replicate, included. |
max_tid | bin | End of the transaction chunk to replicate, included. |
min_oid | bin | Start of the object chunk to replicate, included. |
object_dict | {oid: bin, tid_list: [bin]} | List of objects the destination node already has in the chunk. |
AnswerFetchObjects | ||
pack_tid | nil | Not yet implemented. |
next_tid | bin | nil | If nil, the [(min_tid,min_oid)..max_tid] chunk is fully replicated, else the length was reached and (next_tid,next_oid) is used as starting point for a next AskFetchObjects request. |
next_oid | bin | nil | |
delete_dict | {oid: bin, tid_list: [bin]} | Objects to delete. |
For any missing object, the source node uses the following packet with the same message id as AskFetchObjects:
AddObject | ||
---|---|---|
oid | bin | |
tid | bin | |
compression | uint | |
checksum | bin | |
data | bin | |
data_serial | bin | nil |
The source must abort AskFetchTransactions & AskFetchObjects with REPLICATION_ERROR if it does not have data, i.e. the replicated partition is dropped or truncated.
Notice to implementers: For performance reasons, AddTransaction & AddObject should be buffered, i.e. they should not be passed to the kernel unless enough data are accumulated (e.g. 64kB) or some other packet type is sent, not only to avoid the cost of TCP_NODELAY but also to reduce the number of syscalls. For these packets, throughput is more important than latency.
A replicating node cannot depend on other nodes to fetch the data that was recently committed or is being committed, because that cannot be done atomically: it could miss writes between the processing of its request by a source node and the reception of the answer.
This is why outdated cells are writable: a storage node asks the master for transactions being committed and then it is expected to fully receive from the client any transaction that is started after this answer. This in turn has other consequences:
AskUnfinishedTransactions | ||
---|---|---|
offset_list | [partition: uint] | List of OUT_OF_DATE partitions. |
AnswerUnfinishedTransactions | ||
max_tid | bin | TID of last committed transaction. |
ttid_list | [ttid: bin] | List of transactions being committed. |
XXX Because the master already knows the list of outdated cells, the above request could be changed to a single NotifyUnfinishedTransactions (M → S) packet.
The storage marks all partitions in offset_list as having to be replicated up to max_tid. It also has to wait for all transactions in ttid_list to end. It is notified of aborted transactions with AbortTransaction (as usual). For successful commits, the master sends:
NotifyTransactionFinished | ||
---|---|---|
ttid | bin | TTID. |
max_tid | bin | TID of last committed transaction, which is actually the one that corresponds to ttid. |
Note that a storage node may participate in transactions listed in AnswerUnfinishedTransactions, in which case NotifyTransactionFinished must be sent just after NotifyUnlockInformation. Then the resulting replication would fetch missed writes: there may be none.
The storage node is free to start replicating (up to the last max_tid received so far) even if it's still waiting for transactions to complete, provided that a last replication pass is performed after the last NotifyTransactionFinished.
On the side of transaction management, write-locking evolves through 2 phases before going back to normal.
As long as the partition is not internally readable, no lock is taken by AskStoreObject & AskCheckCurrentSerial: the storage node must remember lockless oids and reply with locked = ZERO_TID.
On client side, such an answer could be a late one, to a request that resulted in a conflict (from readable cells), and it must be ignored. Otherwise, the cell has to be marked as not being fully write-locked: at tpc_vote, if any connection to a storage node was lost, every touched partition must be fully write-locked by at least 1 node. Then, it continues to process the answer like for a normal one.
By the time the partition is fully replicated, the storage node may have accepted several transactions to store/check the same oid, and it does not know which one will win (assuming there aren't only checks), whereas the others will either abort or store again: in the latter case, it is again a lockless write (ZERO_TID returned). For these oids, the write-lock is given to the highest TTID, which matters for any new transaction that tries to lock them.
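As a toy illustration only (not the reference implementation), assuming TTIDs compare like 8-byte big-endian strings, the arbitration at the end of the lockless phase could look like:

```python
# Illustration: pick the new write-lock owner among the transactions that
# stored/checked an oid locklessly, once the partition is fully replicated.
def promote_lockless_writes(lockless_writes):
    """lockless_writes: {oid: set of TTIDs that wrote/checked it locklessly}
    Returns {oid: TTID now holding the write-lock (the highest one)}."""
    return {oid: max(ttids) for oid, ttids in lockless_writes.items() if ttids}
```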
As soon as there's only 1 lockless write for an oid, this oid is write-locked normally. When all oids are write-locked normally for a replicated partition, the storage node notifies the master:
NotifyReplicationDone | ||
---|---|---|
partition | uint | Partition number. |
max_tid | bin | TID of last replicated transaction. |
XXX Explain why NotifyReplicationDone can't be sent earlier (end of phase 1). Maybe required for deadlock avoidance, or because it was implemented before we realized that the client must remember cells that aren't fully write-locked.
XXX Whole section to review.
Backup, or more exactly online backup, is the workflow responsible for keeping the data of one NEO cluster also available in another NEO cluster. If the first cluster is the main one used for a service and the second one is located in e.g. another building or another country, the latency of the link between the clusters is much higher than intra-cluster latencies. In the current state of NEO, it is not practical to mix storage nodes with different link latencies in one cluster, as the slower ones would slow down most operations (otherwise it would make more sense to just use all nodes as replicas). However, a cluster located at another physical place has a good chance to survive a physical failure at the original place, so the backup cluster (likely with a backup service instance also moved nearby) can be used in case of such damage.
Online backup relies on Invalidation and Replication to work. When a backup cluster is set up, its primary master connects as a client node to the master of the main cluster, so as to receive invalidation messages and thereby stay informed about committed transactions. The backup master then issues commands to its storage nodes to replicate the corresponding data from the storages of the main cluster. The storages in turn use the same replication protocol that is used inside the main cluster for asynchronous replication.
XXX(jm) document in more detail how backing up works, in particular how backup cluster reports its partition table and cell states there.
XXX Whole section to review, once deadlock avoidance is reimplemented, due to performance issues.
A typical scenario is the following one:
When solving the deadlock, the data of the first store may only exist on the storage nodes.
Deadlocks are detected by comparing locking TIDs; initially a locking TID is the same as the TTID. When trying to (write-)lock an OID that is already locked by a smaller locking TID, the store is delayed: this behaviour is chosen because a transaction that starts first is likely to store first. Conversely, when trying to lock with a smaller locking TID than the current holder's, a deadlock is issued.
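The rule can be summarized by the following sketch (illustrative names; locking TIDs start equal to the TTID and only grow when rebasing):

```python
# Decision taken by a storage node when a transaction asks to write-lock an
# oid that may already be locked (illustration of the rule above).
DELAY, GRANT, DEADLOCK = 'delay', 'grant', 'deadlock'

def on_lock_request(requesting_locking_tid, holding_locking_tid):
    if holding_locking_tid in (None, requesting_locking_tid):
        return GRANT      # oid free, or already locked by the same transaction
    if requesting_locking_tid > holding_locking_tid:
        return DELAY      # the transaction holding the lock started first
    return DEADLOCK       # the requester has the smaller locking TID
```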
The main idea behind deadlock avoidance is to rebase OIDs that have already been stored. In order that the comparison of locking TIDs keeps working, the master is involved to deliver a new locking TID:
1. Sa: mark the transaction as being rebased
2. Sa: release all write-locks
3. Sa: sortAndExecuteQueuedEvents
4. Sa > M NotifyDeadlock{ttid, current locking tid}  # ttid identifies the transaction; the locking tid lets M ignore late/redundant packets
5. Sa: waiting for the transaction to be rebased, the store is delayed
6. M > C NotifyDeadlock{ttid, new locking tid}
7. C > S RebaseTransaction{ttid, new locking tid}  # to all involved storage nodes
8. Sb: sets new locking tid
9. Sb (a != b): same as 2
10. Sb: sortAndExecuteQueuedEvents
At step 10, a new deadlock may happen, in which case it loops to 1 and an empty answer is sent back to the client:
11. Sb > C AnswerRebaseTransaction{[]}
If there's no new deadlock:
11. lock all oids that can be relocked immediately
12. Sb > C AnswerRebaseTransaction{oids that are locked by another transaction + those that conflict}
Then for each oid returned by AnswerRebaseTransaction:
13. C > Sb RebaseObject{ttid,oid}
The processing of RebaseObject is similar to that of StoreObject, with the following differences:
Possible answers:
On the client side, AnswerRebaseObject is also handled in a similar way to AnswerStoreObject.
Note
RebaseObject is a request packet to simplify the implementation. For more efficiency, it should be turned into a notification, and RebaseTransaction should be answered once all objects are rebased (so that the client still has something to wait on).
XXX Whole section to review.
So-called cascaded deadlock is when a new deadlock happens during deadlock avoidance. The above protocol does handle this case. Here is an example with a single storage node:
    # locking tids: t1 < t2 < t3
    1. A2 (t2 stores A)
    2. B1, c2 (t2 checks C)
    3. A3 (delayed), B3 (delayed), D3 (delayed)
    4. C1 -> deadlock: B3
    5. d2 -> deadlock: A3
    # locking tids: t3 < t1 < t2
    6. t3 commits
    7. t2 rebase: conflicts on A and D
    8. t1 rebase: new deadlock on C
    9. t2 aborts (D non current)
    10. all locks released for t1, which rebases and resolves conflicts
Deadlock avoidance also depends on a correct ordering of delayed events (sortAndExecuteQueuedEvents), otherwise the cluster could enter an infinite cascade of deadlocks. Here is an overview with 3 storage nodes and 3 transactions modifying the same oid (2 replicas):
    S1      S2      S3      order of locking tids       # abbreviations:
    l1      l1      l2      123                         # l: lock
    q23     q23     d1q3    231                         # d: deadlock triggered
    r1:l3   r1:l2   (r1)    # for S3, we still have l2  # q: queued
    d2q1    q13     q13     312                         # r: rebase
Above, we show what happens when a random transaction gets a lock just after another one is rebased. Here, the result is that the last 2 lines are a permutation of the first 2, and this can repeat indefinitely with bad luck.
The probability of deadlock must be reduced by processing delayed stores/checks in the order of their locking tid. In the above example, S1 would give the lock to 2 when 1 is rebased, and 2 would vote successfully.
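A minimal sketch of this ordering, assuming each delayed request is queued together with its locking TID (names are illustrative):

```python
# Sketch: delayed stores/checks are queued with their locking TID and
# replayed in increasing order, so that the lock goes to the transaction
# most likely to commit first.
def sort_and_execute_queued_events(queued_events):
    # queued_events: list of (locking_tid, deferred_request) pairs
    for _locking_tid, deferred_request in sorted(queued_events, key=lambda e: e[0]):
        deferred_request()
```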
XXX
XXX
XXX
XXX
XXX
Although the following sections are only an example of how data can be stored persistently, they should help a lot in understanding the internals of NEO.
The example is for a backend storing data in a relational transactional database like MariaDB/InnoDB. One advantage of SQL is that it makes the backend data easier to inspect; SQL is not mandatory. We will describe the schema of the tables that are used in the reference implementation.
In ZODB, TIDs are a sequence of 8 bytes. It may be easier to handle 64-bit integers, as shown in the example.
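For instance, the 8-byte form and the 64-bit integer form can be converted back and forth with big-endian packing, which preserves ordering:

```python
import struct

def u64(raw):
    # 8-byte TID/OID -> unsigned 64-bit integer
    return struct.unpack('>Q', raw)[0]

def p64(i):
    # unsigned 64-bit integer -> 8-byte TID/OID
    return struct.pack('>Q', i)
```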
Database & storage node metadata.
name | VARBINARY(255) NOT NULL PRIMARY KEY |
value | VARBINARY(255) NULL |
name can be:
Partition table.
partition | SMALLINT UNSIGNED NOT NULL |
nid | INT NOT NULL |
tid | BIGINT NOT NULL |
tid is either:
The purpose of an OUT_OF_DATE TID per assigned cell is to optimize resumption of replication by starting from a greater TID:
Although data that are already transferred aren't transferred again, checking that the data are there for a whole partition can still be a lot of work for big databases. A storage node that gets disconnected for a short time should become fully operational almost instantaneously, because it only has to replicate the new data. Without such an optimization, the time to recover would depend on the size of the database.
For OUT_OF_DATE cells, the difficult part is that they are writable and can therefore contain holes, so we can't just take the last TID in trans/obj. This is solved by remembering the last TID, for each assigned partition, at the moment the state switches from UP_TO_DATE/FEEDING. With only a single global value, a cell that still has a lot to replicate would cause all other cells to resume from a very small TID, or even ZERO_TID; the worst case is when a new cell is assigned to a node (as a result of tweak).
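A sketch of this optimization, with hypothetical backend helpers (not the reference implementation's API):

```python
# Remember, per assigned cell, the last TID present when the cell becomes
# OUT_OF_DATE, and resume replication from there instead of ZERO_TID.
def on_cell_outdated(backend, partition):
    last_tid = backend.get_last_tid(partition)   # max TID in trans/obj for this partition
    backend.set_pt_tid(partition, last_tid)      # stored in pt.tid

def replication_start_tid(backend, partition):
    # Used when building the first AskFetchTransactions request.
    return backend.get_pt_tid(partition)
```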
Transaction metadata of committed transactions.
partition | SMALLINT UNSIGNED NOT NULL |
tid | BIGINT UNSIGNED NOT NULL |
packed | BOOLEAN NOT NULL |
oids | MEDIUMBLOB NOT NULL |
user | BLOB NOT NULL |
description | BLOB NOT NULL |
ext | BLOB NOT NULL |
ttid | BIGINT UNSIGNED NOT NULL |
ttid is the TTID that was used when the transaction was being committed, which is required for AskFinalTID. It is also useful for debugging and performance analysis (the difference between TTID and TID gives the time needed to commit the transaction). When importing transactions, this value can be the same as tid.
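For example, since TIDs encode timestamps, a rough commit duration can be computed from these two columns; this sketch assumes the stored 64-bit integers are converted back to their 8-byte form:

```python
import struct
from ZODB.TimeStamp import TimeStamp

def commit_duration(ttid_int, tid_int):
    # Both columns are stored as 64-bit integers; convert back to the
    # 8-byte form expected by TimeStamp, whose timeTime() gives seconds.
    ttid, tid = (struct.pack('>Q', i) for i in (ttid_int, tid_int))
    return TimeStamp(tid).timeTime() - TimeStamp(ttid).timeTime()
```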
Object metadata of committed transactions.
partition | SMALLINT UNSIGNED NOT NULL |
oid | BIGINT UNSIGNED NOT NULL |
tid | BIGINT UNSIGNED NOT NULL |
data_id | BIGINT UNSIGNED NULL |
value_tid | BIGINT UNSIGNED NULL |
data_id refers to data.id.
Actual object content.
id | BIGINT UNSIGNED NOT NULL PRIMARY KEY |
hash | BINARY(20) NOT NULL |
compression | TINYINT UNSIGNED NULL |
value | LONGBLOB NOT NULL |
id shall be generated as follows:
Thus, replicating a partition usually results in reading data sequentially. Not doing this would cause a lot of read amplification or disk seeks.
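For illustration only (this is not necessarily the rule used by the reference implementation), an id scheme that keeps the rows of a partition contiguous could reserve the high bits for the partition number:

```python
PARTITION_BITS = 16  # assumption for this sketch: at most 2**16 partitions

def next_data_id(partition, per_partition_counter):
    # All ids of a given partition share the same high bits and grow
    # monotonically, so they end up clustered on disk.
    return (partition << (64 - PARTITION_BITS)) | per_partition_counter
```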
Metadata of transactions being committed.
They are temporary counterparts of trans & obj. The differences are:
tobj.tid contains the TTID.
These tables are filled during tpc_vote: only tobj if VoteTransaction, or at least ttrans if AskStoreTransaction. This is followed by a commit, which often includes changes to data by AskStoreObject requests.
trans.tid is set during tpc_finish, when processing AskLockInformation. This is followed by a commit.
Upon reception of NotifyUnlockInformation, the content is moved to trans & obj, by changing tobj.tid on the fly. It is recommended to wait a while before commit.
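A heavily simplified sketch of this move (the exact statements of the reference schema may differ):

```python
def unlock_transaction(cursor, ttid, tid):
    # Move the voted rows to the permanent tables, replacing tobj.tid
    # (which holds the TTID) by the final TID on the fly.
    cursor.execute(
        "INSERT INTO obj"
        " SELECT `partition`, oid, %s, data_id, value_tid"
        " FROM tobj WHERE tid=%s", (tid, ttid))
    cursor.execute("DELETE FROM tobj WHERE tid=%s", (ttid,))
    cursor.execute("INSERT INTO trans SELECT * FROM ttrans WHERE ttid=%s", (ttid,))
    cursor.execute("DELETE FROM ttrans WHERE ttid=%s", (ttid,))
    # The commit may be deferred/batched, as recommended above.
```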
XXX
The original design from Yoshinori Okuji introduces most concepts of the NEO architecture and protocol, with many details about the network protocol. Although the protocol has changed since then, the concepts described in that document are still valid, and reading it helps in understanding the architecture of NEO.
DZUG presentation XXXX
For more information, please contact Jean-Paul, CEO of Nexedi (+33 629 02 44 25).