Chronicle Queue Enterprise - TCP/IP Replication Protocol

Overview

This document describes the high-level protocol and handshaking that Chronicle Queue Enterprise uses to replicate data over TCP/IP between queues on different machines. Chronicle Queue Enterprise is a licensed product; for more information, please contact [email protected]

Message format in this document

Chronicle Queue Enterprise sends its messages in binary wire format (see net.openhft.chronicle.wire.BinaryWire); this is a binary form of YAML.

In order to make the messages human-readable in this document, the messages are shown in text wire format (see net.openhft.chronicle.wire.TextWire).

Who should read this document

Anyone who wants to know more about the data that Chronicle Queue Enterprise transfers between remote hosts during TCP/IP replication. This document is not an overview of queue functionality.

UberHandler

Chronicle Queue Enterprise replication uses an UberHandler. UberHandlers act as message routers. They are serialised locally and then sent to the remote host using TCP/IP. The serialised form of an UberHandler is shown below:

--- !!data #binary
handler: !net.openhft.chronicle.network.cluster.handlers.UberHandler {
  remoteIdentifier: 1,
  localIdentifier: 2,
  wireType: !WireType BINARY_LIGHT,
  clusterName: global
}

onInitialize() and TerminatorHandler

When the UberHandler is loaded by the remote machine, its onInitialize() method is called automatically (see net.openhft.chronicle.network.cluster.handlers.UberHandler#onInitialize).

Note
When the UberHandler is sent, only the data of the UberHandler instance is serialised. The Java byte code of the UberHandler class is not sent, so it must already exist on the remote machine. This is because automatically running arbitrary code remotely through the onInitialize() method could be a security risk.

This is also true for any SubHandler (see net.openhft.chronicle.network.api.session.SubHandler) which also has an onInitialize() method. This is covered in more detail below.

SubHandlers

The UberHandler routes messages to SubHandlers based upon the csp, or upon the cid, which is just an alias for the csp.

The reason that we set up a cid is that a cid is numeric, and as such is limited (using BinaryWire) to 8 bytes (long type).

A csp is a string field which can be much larger. Once the relationship between the csp and its alias cid is established, then only the more compact cid will be sent.

To summarise:

  • first, the UberHandler is sent; it is used to route the SubHandler messages.

  • then, any number of SubHandlers are sent. The UberHandler maintains a relationship between each SubHandler and its csp/cid, which is used for routing.
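
The routing relationship summarised above can be sketched in plain Java. This is a minimal illustration using only the JDK, not the UberHandler implementation: the class CidRouterSketch and its methods register, route and cidFor are hypothetical names, and messages are modelled as strings for simplicity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical illustration only; not the Chronicle UberHandler implementation. */
final class CidRouterSketch {
    // csp (string path) -> cid (numeric alias), recorded when the pair is first announced
    private final Map<String, Long> cidByCsp = new HashMap<>();
    // cid -> handler that should receive messages carrying that cid
    private final Map<Long, Consumer<String>> handlerByCid = new HashMap<>();

    /** Called once per sub-handler: the first meta-data carries both the csp and the cid. */
    void register(String csp, long cid, Consumer<String> handler) {
        cidByCsp.put(csp, cid);
        handlerByCid.put(cid, handler);
    }

    /** Later messages carry only the compact cid. Returns false if the cid is unknown. */
    boolean route(long cid, String message) {
        Consumer<String> handler = handlerByCid.get(cid);
        if (handler == null) {
            return false;
        }
        handler.accept(message);
        return true;
    }

    /** Looks up the alias for a csp, or null if none has been registered. */
    Long cidFor(String csp) {
        return cidByCsp.get(csp);
    }
}
```

In this sketch the string csp is needed only at registration time; every later lookup uses the 8-byte cid, which mirrors the reason given above for introducing the alias.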

Heartbeat Handlers

The message shown below sets up a relationship between the csp: / (also aliased as cid: !int 1671070996) and its heartbeat handler, so that any message sent with cid: !int 1671070996 will be received by this heartbeat handler instance running on the remote machine.

--- !!meta-data #binary
csp: /
cid: !int 1671070996
--- !!meta-data #binary
handler: !net.openhft.chronicle.network.cluster.handlers.HeartbeatHandler {
  heartbeatTimeoutMs: !int 40000,
  heartbeatIntervalMs: !short 30000
}

Heartbeat handlers are set up on both the source and the sink machines. Periodically (at the interval set by the heartbeatIntervalMs field) each handler sends a heartbeat message to its corresponding remote host. If heartbeatTimeoutMs has elapsed and no heartbeat message has been received, the heartbeat handler closes the socket connection and attempts to reconnect.
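
The interval and timeout checks can be sketched as follows. This is a simplified model under stated assumptions, not the HeartbeatHandler implementation: the class HeartbeatSketch and its methods are hypothetical, and the current time is passed in as a parameter so that the logic can be exercised without a real clock or socket.

```java
/** Hypothetical illustration only; not the Chronicle HeartbeatHandler implementation. */
final class HeartbeatSketch {
    private final long intervalMs;   // plays the role of heartbeatIntervalMs
    private final long timeoutMs;    // plays the role of heartbeatTimeoutMs
    private long lastSentMs;
    private long lastReceivedMs;

    HeartbeatSketch(long intervalMs, long timeoutMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.timeoutMs = timeoutMs;
        this.lastSentMs = nowMs;
        this.lastReceivedMs = nowMs;
    }

    /** Records that a heartbeat arrived from the remote host. */
    void onHeartbeatReceived(long nowMs) {
        lastReceivedMs = nowMs;
    }

    /** True once the interval has elapsed; the caller would then send a heartbeat. */
    boolean shouldSend(long nowMs) {
        if (nowMs - lastSentMs >= intervalMs) {
            lastSentMs = nowMs;
            return true;
        }
        return false;
    }

    /** True when nothing has been received for longer than the timeout. */
    boolean hasTimedOut(long nowMs) {
        return nowMs - lastReceivedMs > timeoutMs;
    }
}
```

With the values from the example message (a 30000 ms interval and a 40000 ms timeout), one missed heartbeat does not trip the timeout in this model, but a second missed heartbeat does.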

Exchanging data between Source and Sink

Chronicle Queues are replicated using the SourceReplicationHandler, which sends its messages to the SinkReplicationHandler. Both the SourceReplicationHandler and SinkReplicationHandler are SubHandlers.

However, handshaking messages are also sent back from the SinkReplicationHandler to the SourceReplicationHandler. One such message is the acknowledgement message, which the sink sends to the source. By inspecting the idx field (idx: 0x451600000000 in the example message below), code on your source machine can confirm that the message was received by the remote machine, and therefore successfully replicated.

--- !!meta-data #binary
cid: 292323922173575
--- !!data #binary
idx: 0x451600000000
ns: 10849029994071
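
As a sketch of how source-side code might use the idx field of the acknowledgement above, the class below records the highest acknowledged index. The class AckTrackerSketch and its methods are hypothetical, not a Chronicle API, and the sketch assumes that acknowledgements arrive in index order, so that an acknowledgement for one index implies that all lower indices were received.

```java
/** Hypothetical illustration only; the class and method names are not a Chronicle API. */
final class AckTrackerSketch {
    // Highest index acknowledged so far; -1 means nothing has been acknowledged yet.
    private long highestAckedIndex = -1;

    /** Call with the idx field of each acknowledgement received from the sink. */
    void onAcknowledgement(long idx) {
        if (idx > highestAckedIndex) {
            highestAckedIndex = idx;
        }
    }

    /** Assumes in-order acknowledgements, so one ack covers every lower index. */
    boolean isReplicated(long index) {
        return index <= highestAckedIndex;
    }
}
```

If acknowledgements could arrive out of order, a set of acknowledged indices would be needed instead of a single high-water mark.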

The Sink and Source handlers are as follows:

from      to        msg sent
source    sink      software.chronicle.enterprise.queue.replication.SinkReplicationHandler
sink      source    software.chronicle.enterprise.queue.replication.SourceReplicationHandler

The source-side of the connection sends a SinkReplicationHandler to be run on the remote sink host.

--- !!meta-data #binary
csp: /replication/out-1/2
cid: 292323922173575
--- !!data #binary
handler: !SinkReplicationHandler {
  queueName: out-1,
  wireType: BINARY_LIGHT,
  acknowledgement: true,
  nextIndexRequired: 0x451600000001,
  sourceId: !short 1002
}

The sink-side of the connection will respond by setting up a SourceReplicationHandler to be run on the remote source host.

--- !!meta-data #binary
csp: /replication/out-1/2
cid: 292323922173575
--- !!data #binary
handler: !SourceReplicationHandler {
  queueName: out-1,
  wireType: BINARY_LIGHT,
  acknowledgement: true,
  nextIndexRequired: 0x0,
  sourceId: !short 1002
}

Whenever your application appends data to the source queue, the SourceReplicationHandler will read this queue using a queue tailer, and then immediately stream any new data to the remote host.

Chronicle Queue Enterprise establishes a stream rather than a polling protocol. If the network buffers are full, the SourceReplicationHandler does not send data. It is therefore not strictly reactive; rather, it is sensitive to push back from the network.

Chronicle Queue Enterprise uses queues which page data to disk, rather than holding it all in memory. Therefore, Chronicle Queue will not be saturated by a slow consumer, because the data is not paged into memory from the queue until the TCP/IP buffers have sufficient free space.
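
The push-back behaviour can be modelled with a bounded buffer. This is a toy model under stated assumptions, not the replication code: BackPressureSketch is a hypothetical class, one in-memory deque stands in for the queue on disk, and a second deque with a fixed capacity stands in for the TCP/IP send buffer.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of push back; a bounded deque stands in for the TCP/IP send buffer. */
final class BackPressureSketch {
    private final Deque<String> queueOnDisk = new ArrayDeque<>();   // stands in for the queue
    private final Deque<String> networkBuffer = new ArrayDeque<>(); // stands in for the socket buffer
    private final int bufferCapacity;

    BackPressureSketch(int bufferCapacity) {
        this.bufferCapacity = bufferCapacity;
    }

    /** The application appends to the queue regardless of the state of the network. */
    void append(String message) {
        queueOnDisk.addLast(message);
    }

    /** Moves messages to the buffer only while it has free space; returns how many moved. */
    int pump() {
        int moved = 0;
        while (networkBuffer.size() < bufferCapacity && !queueOnDisk.isEmpty()) {
            networkBuffer.addLast(queueOnDisk.removeFirst());
            moved++;
        }
        return moved;
    }

    /** Simulates the remote host consuming one message, which frees buffer space. */
    String drainOne() {
        return networkBuffer.pollFirst();
    }

    /** Number of messages still waiting in the queue. */
    int pending() {
        return queueOnDisk.size();
    }
}
```

In this model a slow consumer only causes messages to wait in the queue; nothing is read from it until the buffer has room, which is the property the paragraph above describes.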

SinkReplicationHandler

Before the sink replication handler starts to read messages from the source machine, it first copies back messages from the sink machine, to the source machine; this is called the back copy.

Although a back copy is rarely needed, it is useful if, for example, the source machine was replicating to two (or more) sinks and the source suffered a power outage.
Chronicle Queue Enterprise will fail over to one of the remaining sinks, and therefore we need to ensure that, whichever sink is chosen, it has the latest messages.
Therefore, in the event that one of the sinks has more messages than the other, we will first copy any messages from the other sink before we establish this sink as our new source.

When the ServiceSinkReplicationHandler starts, it calls software.chronicle.enterprise.queue.replication.SinkReplicationHandler#onInitialize.

When all the data has been replicated, an END_OF_STREAM message is sent to notify the SourceReplicationHandler that the back copy is complete.

--- !!meta-data #binary
cid: 573798926109737
--- !!data #binary
eos: !!null "" #  END_OF_STREAM
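
The back copy sequence can be sketched as "send everything the other side is missing, then send END_OF_STREAM". The sketch below is an interpretation, not the actual handler logic: BackCopySketch is a hypothetical class, payloads are modelled as strings, and it assumes that the back copy starts at the nextIndexRequired value carried by the handler, which this document does not state explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;

/** Hypothetical illustration only; not the Chronicle back copy implementation. */
final class BackCopySketch {
    static final String END_OF_STREAM = "eos";

    /**
     * Returns what the sink would send back: every entry at or after
     * nextIndexRequired (an assumed starting point), followed by the eos marker.
     */
    static List<String> backCopy(SortedMap<Long, String> sinkQueue, long nextIndexRequired) {
        List<String> out = new ArrayList<>();
        // tailMap is inclusive of nextIndexRequired and iterates in index order
        for (String payload : sinkQueue.tailMap(nextIndexRequired).values()) {
            out.add(payload);
        }
        out.add(END_OF_STREAM);
        return out;
    }
}
```

When the sink holds nothing at or beyond the requested index, the result in this sketch is the eos marker alone, so the receiver always gets an explicit signal that the back copy has finished.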

The sink replication handler then receives new messages from the SourceReplicationHandler. When it receives these new messages it uses a Chronicle Queue appender to write them to a Chronicle Queue.

Note
When the SinkReplicationHandler writes messages to the Chronicle Queue, it takes account of the source index, to guarantee that the order of messages on the sink exactly matches the order of messages on the source. Using the index also ensures that each message is always written to the correct queue file, even if events such as roll-over have occurred. A roll-over would typically cause a normal appender to write the message to the next queue file. This is not what we want, because the sink must hold an exact copy of the messages on the source.
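
The idea of writing at the source index, rather than appending at the sink's current position, can be sketched as follows. This is a toy model, not the SinkReplicationHandler: IndexedSinkSketch is a hypothetical class, and it assumes for illustration that the upper bits of an index identify the queue file (cycle) and the lower 32 bits the sequence within it. The real split depends on the roll cycle in use.

```java
import java.util.TreeMap;

/** Hypothetical illustration only; not the Chronicle SinkReplicationHandler. */
final class IndexedSinkSketch {
    // Assumption for illustration: the lower 32 bits of an index are the sequence
    // and the bits above identify the queue file. The real split depends on the roll cycle.
    static final int ASSUMED_CYCLE_SHIFT = 32;

    // cycle -> (sequence -> payload); each inner map stands in for one queue file
    private final TreeMap<Long, TreeMap<Long, String>> files = new TreeMap<>();

    static long cycleOf(long index) {
        return index >>> ASSUMED_CYCLE_SHIFT;
    }

    static long sequenceOf(long index) {
        return index & 0xFFFF_FFFFL;
    }

    /** Writes at the position given by the source index, whatever the sink's current cycle is. */
    void writeAt(long sourceIndex, String payload) {
        files.computeIfAbsent(cycleOf(sourceIndex), c -> new TreeMap<>())
             .put(sequenceOf(sourceIndex), payload);
    }

    /** Reads back by the same index, or returns null if nothing was written there. */
    String read(long index) {
        TreeMap<Long, String> file = files.get(cycleOf(index));
        return file == null ? null : file.get(sequenceOf(index));
    }
}
```

Because the file is chosen from the index and not from the sink's clock, a message that arrives after a roll-over still lands in the file it occupied on the source.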

Sending data with the SourceReplicationHandler

The SourceReplicationHandler sends messages to the SinkReplicationHandler. The SourceReplicationHandler uses a Chronicle tailer to read new messages from your Chronicle Queue. The messages will have been written to the queue by your application logic. When the SourceReplicationHandler reads the contents of this Chronicle Queue, it does not de-serialize the message in any way; it treats the message as a blob of bytes and writes those bytes to the replication event, shown as re in the message below:

--- !!meta-data #binary
cid: 292323922173575
--- !!data #binary
re: <replication-event> # see below

The bytes that make up the replication-event have the following format:

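public void writeMarshallable(@NotNull WireOut wire) {
    @NotNull ValueOut out = wire.getValueOut();
    // index of the message in the source queue, written as a 64-bit value
    out.int64_0x(index);
    // the message itself, copied as opaque bytes without de-serialisation
    out.bytesLiteral(payload);

    // nano-timestamp created from the clock of the source machine
    out.int64(nanoTimeStamp = System.nanoTime());
}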
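
To make the three fields concrete, the sketch below packs and unpacks an index, an opaque payload and a nano-timestamp using java.nio.ByteBuffer. It is NOT the BinaryWire encoding produced by the method above: ReplicationEventSketch is a hypothetical class, and the fixed layout, including the 4-byte length prefix before the payload, is an assumption made only so that the example is self-contained.

```java
import java.nio.ByteBuffer;

/** Hypothetical fixed layout for illustration; this is NOT the BinaryWire encoding. */
final class ReplicationEventSketch {
    final long index;          // index of the message in the source queue
    final byte[] payload;      // opaque bytes, never de-serialised by the replicator
    final long nanoTimeStamp;  // timestamp taken on the source machine

    ReplicationEventSketch(long index, byte[] payload, long nanoTimeStamp) {
        this.index = index;
        this.payload = payload;
        this.nanoTimeStamp = nanoTimeStamp;
    }

    /** Assumed layout: 8-byte index, 4-byte payload length, payload bytes, 8-byte timestamp. */
    byte[] encode() {
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + payload.length + 8);
        buf.putLong(index).putInt(payload.length).put(payload).putLong(nanoTimeStamp);
        return buf.array();
    }

    /** Reverses encode(); the payload is copied out without being interpreted. */
    static ReplicationEventSketch decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long index = buf.getLong();
        byte[] payload = new byte[buf.getInt()];
        buf.get(payload);
        long nanoTimeStamp = buf.getLong();
        return new ReplicationEventSketch(index, payload, nanoTimeStamp);
    }
}
```

Neither encode() nor decode() looks inside the payload, which reflects the point made above that the message is treated as a blob of bytes.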

When the message is received by the sink, it sends an acknowledgement to the source:

--- !!meta-data #binary
cid: 292323922173575
--- !!data #binary
idx: 0x451600000000
ns: 10849029994071