Sinks

Supported event sinks

1 - EventStoreDB gRPC Sink

EventStoreDB sink with gRPC protocol, supported by latest versions (v20+).

When replicating events to Event Store Cloud, we recommend using the EventStoreDB gRPC sink.

You need to specify two configuration options for it:

  • replicator.sink.protocol - set to grpc
  • replicator.sink.connectionString - use the target cluster connection string, which you’d use for the gRPC client.

For example, for an Event Store Cloud cluster the connection string would look like:

esdb+discover://<username>:<password>@<cluster_id>.mesdb.eventstore.cloud

Using gRPC gives you more predictable write operation time. For example, on a C4-size instance in Google Cloud Platform, one write would take 4-5 ms, and this number allows you to calculate the replication process throughput, as it doesn’t change much when the database size grows.
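The throughput calculation mentioned above can be sketched as follows. This is a rough estimate only, assuming a single sequential writer where each write completes before the next one starts; the helper names eventsPerSecond and hoursToReplicate are hypothetical and not part of Replicator.

```javascript
// Rough throughput estimate for a single sequential writer.
// Assumption: writes happen one after another, so throughput is 1 / latency.
function eventsPerSecond(writeLatencyMs) {
    return 1000 / writeLatencyMs;
}

// Hypothetical helper: hours needed to replicate a given number of events.
function hoursToReplicate(eventCount, writeLatencyMs) {
    return eventCount / eventsPerSecond(writeLatencyMs) / 3600;
}

console.log(eventsPerSecond(4)); // 250
console.log(eventsPerSecond(5)); // 200
console.log(hoursToReplicate(10000000, 5).toFixed(1)); // "13.9"
```

With the 4-5 ms write time quoted above, a sequential writer would handle roughly 200 to 250 events per second, so ten million events would take on the order of 11 to 14 hours.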

2 - EventStoreDB TCP Sink

EventStoreDB sink with TCP protocol, supported by older versions (v5 and earlier).

The TCP sink should only be used when migrating from one older version cluster to another older version cluster. As Event Store plans to phase out the TCP client and protocol, consider using the gRPC sink instead.

For the TCP sink, you need to specify two configuration options:

  • replicator.sink.protocol - set to tcp
  • replicator.sink.connectionString - use the target cluster connection string, which you’d use for the TCP client.

Check the connection string format and options in the TCP client documentation.

The risk of using the TCP sink is that you might get unstable write speed. The speed might go down when the database size grows, unlike gRPC sink write speed, which remains stable.

3 - Kafka Sink

Sink for Apache Kafka, using confirmed writes.

The Kafka sink allows you to set up continuous replication from EventStoreDB to Apache Kafka. It might be useful, for example, to scale out subscriptions, as you can partition events in Kafka. Then, you can have a consumer group with concurrent consumers, which process individual partitions, instead of having a single partition on $all.

There’s no way to specify a custom partition, so the default (random) Kafka partitioner will be used.

The Kafka sink needs to be configured in the sink section of the Replicator configuration.

  • replicator.sink.protocol - set to kafka
  • replicator.sink.connectionString - Kafka connection string, which is a comma-separated list of connection options
  • replicator.sink.partitionCount - the number of Kafka partitions in the target topic
  • replicator.sink.router - optional JavaScript function to route events to topics and partitions

Example:

replicator:
  reader:
    connectionString: esdb+discover://admin:changeit@xyz.mesdb.eventstore.cloud
    protocol: grpc
  sink:
    connectionString: bootstrap.servers=localhost:9092
    protocol: kafka
    partitionCount: 10
    router: ./config/route.js

Routing

Replicator needs to route events to Kafka. In particular, it needs to know the topic to write events to and the partition key. By default, the topic is the stream “category” (similar to the category projection), which is the part of the stream name before the dash. For example, an event from the Customer-123 stream will be routed to the Customer topic. The stream name is used as the partition key to preserve the order of events within a stream.

It’s possible to customise both topic and partition key by using a routing function. You can supply a JavaScript code file, which will instruct Replicator about routing events to topics and partitions.

The code file must have a function called route, which accepts the following parameters:

  • stream - original stream name
  • eventType - original event type
  • data - event payload (data), only works with JSON
  • metadata - event metadata, only works with JSON

The function needs to return an object with two fields:

  • topic - target topic
  • partitionKey - partition key

For example:

function route(stream, eventType, data, meta) {
    return {
        topic: "myTopic",
        partitionKey: stream
    }
}

The example function will tell Replicator to produce all the events to the myTopic topic, using the stream name as partition key.

You need to specify the name of the file that contains the route function in the replicator.sink.router setting. Such a configuration is shown in the sample configuration YAML snippet above.
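As a further illustration, here is a minimal sketch of a routing function that keeps the category-based topic described above but changes the partition key. It assumes the category is the part of the stream name before the first dash, and it uses a hypothetical TenantId field in the event payload; neither detail is confirmed by Replicator itself.

```javascript
// Sketch of a routing function: topic from the stream category,
// partition key from a hypothetical TenantId payload field when present.
function route(stream, eventType, data, meta) {
    // Assumption: the category is the part of the stream name before the first dash.
    var dash = stream.indexOf("-");
    var category = dash > 0 ? stream.substring(0, dash) : stream;

    // data is only available for JSON events, so guard against missing values.
    var tenant = data && data.TenantId;

    return {
        topic: category,
        // Fall back to the stream name to keep per-stream ordering.
        partitionKey: tenant ? String(tenant) : stream
    };
}
```

With this sketch, an event from the Customer-123 stream that carries TenantId "acme" would go to the Customer topic with the partition key "acme", while an event without that field would use Customer-123 as the key.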

4 - Sink Partitioning

Increase the speed of writes by enabling partitioned sinks.

Write modes

Replicator will read events from the source cluster using batched reads of 4096 (default) events per batch. As it reads from $all, one batch will contain events for different streams. Therefore, writing events requires a single write operation per event to ensure the correct order of events written to the target cluster.

If you don’t care much about the order of events in $all, you can configure Replicator to use concurrent writers, which will increase performance. The tool uses concurrent writers with a configurable concurrency limit. Writes are partitioned, and the order of written events within a partition is kept intact. Read more below about the available partitioning modes.

Partition by stream name

Writers can be partitioned by stream name. This guarantees that events in individual streams will be in the same order as in the source cluster, but the order of $all will be slightly off.

To enable concurrent writers partitioned by stream name, set the replicator.sink.partitionCount setting to a value greater than one. The default value is 1, so all the writes are sequential.

Custom partitions

You can also use a JavaScript function to partition writers by event data or metadata. The function must be named partition and accept a single argument, which is an object with the following schema:

{
  "stream": "",
  "eventType": "",
  "data": {},
  "metadata": {}
}

The function must return a string, which is then used as a partition key.

For example, the following function will return the Tenant property of the event payload, to be used as the partition key:

function partition(event) {
    return event.data.Tenant;
}
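Since the function must return a string, a slightly more defensive variant may be useful when some events lack the property or are not JSON. The sketch below assumes that falling back to the stream name is acceptable for such events; that choice is an illustration, not a documented Replicator behaviour.

```javascript
// Defensive variant of the partition function.
function partition(event) {
    // Assumption: event.data is an object only for JSON events.
    var tenant = event.data && event.data.Tenant;
    // Fall back to the stream name so the function always returns a string.
    return tenant ? String(tenant) : event.stream;
}
```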

There are two modes for custom partitions, described below.

Partitioning by hash

As with the stream name partitioning, the custom partition key is hashed, and the hash of the key is used to decide which partition will take the event. This method allows having fewer partitions than there are keys.

To use this mode you need to set the partition count using the replicator.sink.partitionCount setting, and also specify the file name of the partitioning function in the replicator.sink.partitioner setting. For example:

replicator:
  sink:
    partitionCount: 10
    partitioner: ./partitioner.js
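To illustrate how hashing maps many keys onto a fixed number of partitions, here is a minimal sketch. The hash function (a djb2 variant) and the names hashKey and partitionIndex are assumptions chosen for illustration; Replicator's actual hash function is not described here and may differ.

```javascript
// Illustrative string hash (djb2 variant); NOT Replicator's actual hash function.
function hashKey(key) {
    var hash = 5381;
    for (var i = 0; i < key.length; i++) {
        // hash * 33 + char code, truncated to an unsigned 32-bit integer
        hash = ((hash * 33) + key.charCodeAt(i)) >>> 0;
    }
    return hash;
}

// Map a partition key to one of partitionCount partitions.
function partitionIndex(key, partitionCount) {
    return hashKey(key) % partitionCount;
}

// The same key always lands in the same partition, which keeps its events ordered.
["tenant-a", "tenant-b", "tenant-c"].forEach(function (key) {
    console.log(key, "->", partitionIndex(key, 10));
});
```

Because the index is computed modulo the partition count, several keys can share one partition, which is why the number of partitions can be smaller than the number of keys.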

Partitioning by value

In some cases, it’s better to assign a single partition to each partition key. Use this method only if the number of unique partition key values has an upper bound. This strategy works well for partitioning by tenant, for example, if the number of tenants doesn’t exceed a hundred. You can go beyond this limit, but each partition uses some memory, so you need to allocate enough memory for a high partition count. In addition, be aware of the performance concerns described in the next section. Those concerns might be less relevant, though, because not all partitions will be active simultaneously if a single page doesn’t contain events for all tenants at once.

To use value-based partitioning, use the same partitioning function signature. The difference is that for each returned partition key there will be a separate partition. For example, if the function deterministically returns 10 different values, there will be 10 partitions. You don’t need to configure the partition count; partitions will be created dynamically based on the number of unique keys.

The settings file, therefore, only needs the replicator.sink.partitioner setting configured.
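The idea of creating one partition per distinct key can be sketched as below. This is a conceptual model only: the createValuePartitions helper and the in-memory arrays standing in for partitions are hypothetical and do not reflect how Replicator implements its writers.

```javascript
// Sketch: one partition (here, an in-memory queue) per distinct key, created on demand.
function createValuePartitions() {
    var partitions = new Map();
    return {
        add: function (key, event) {
            if (!partitions.has(key)) {
                // A key seen for the first time gets its own new partition.
                partitions.set(key, []);
            }
            partitions.get(key).push(event);
        },
        count: function () { return partitions.size; },
        eventsFor: function (key) { return partitions.get(key) || []; }
    };
}

var writers = createValuePartitions();
writers.add("acme", { id: 1 });
writers.add("globex", { id: 2 });
writers.add("acme", { id: 3 });
console.log(writers.count()); // 2
```

The partition count grows with the number of unique keys, which is why this mode suits keys with a known upper bound.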

Partition count considerations

Do not set the partition count to a very high value, as it might lead to thread starvation or overload the target database. For example, using six to ten partitions is reasonable for a C4 Event Store Cloud managed database, but a higher value might cause degraded performance.