public final class KinesisSinks extends Object
Modifier and Type | Class and Description |
---|---|
static class | KinesisSinks.Builder<T>: Fluent builder for constructing the Kinesis sink and setting its configuration parameters. |
Modifier and Type | Field and Description |
---|---|
static String | BATCH_SIZE_METRIC: One of the metrics exposed by the sink, used to monitor the current sending batch size. |
static int | MAX_RECORD_SIZE: The length of a record's data blob (byte array length), plus the record's key size (number of Unicode characters in the key), must not be larger than 1MiB. |
static int | MAXIMUM_KEY_LENGTH: Kinesis partition keys are Unicode strings, with a maximum length limit of 256 characters for each key. |
static String | THROTTLING_SLEEP_METRIC: One of the metrics exposed by the sink, used to monitor the current sleep delay between consecutive sends (in milliseconds). |
Modifier and Type | Method and Description |
---|---|
static KinesisSinks.Builder<Map.Entry<String,byte[]>> | kinesis(String stream): Convenience method for a specific type of sink, one that ingests items of type Map.Entry<String, byte[]> and assumes that the entry's key is the partition key and the entry's value is the record data blob. |
static <T> KinesisSinks.Builder<T> | kinesis(String stream, FunctionEx<T,String> keyFn, FunctionEx<T,byte[]> valueFn): Initiates the building of a sink that publishes messages into Amazon Kinesis Data Streams (KDS). |
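The convenience overload kinesis(String stream) takes items that already carry their partition key and data blob. The sketch below uses only the JDK, not the Jet API, to show what such items look like. Treating the overload as equivalent to passing Map.Entry::getKey and Map.Entry::getValue as the key and value functions is an assumption drawn from the description in the table, and java.util.function.Function stands in for FunctionEx so the sketch runs without Jet on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class EntryItems {
    // Assumption: these mirror the key and value functions implied by kinesis(String stream).
    static final Function<Map.Entry<String, byte[]>, String> KEY_FN = Map.Entry::getKey;
    static final Function<Map.Entry<String, byte[]>, byte[]> VALUE_FN = Map.Entry::getValue;

    public static void main(String[] args) {
        // Items already shaped as (partition key, data blob) pairs; the contents are illustrative.
        List<Map.Entry<String, byte[]>> items = List.of(
                Map.entry("user-42", "login".getBytes(StandardCharsets.UTF_8)),
                Map.entry("user-7", "logout".getBytes(StandardCharsets.UTF_8)));
        for (Map.Entry<String, byte[]> item : items) {
            System.out.println(KEY_FN.apply(item) + " -> " + VALUE_FN.apply(item).length + " bytes");
        }
    }
}
```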
public static final int MAXIMUM_KEY_LENGTH
public static final int MAX_RECORD_SIZE
public static final String BATCH_SIZE_METRIC
public static final String THROTTLING_SLEEP_METRIC
@Nonnull public static <T> KinesisSinks.Builder<T> kinesis(@Nonnull String stream, @Nonnull FunctionEx<T,String> keyFn, @Nonnull FunctionEx<T,byte[]> valueFn)
The basic unit of data stored in KDS is the record. A record is composed of a sequence number, partition key, and data blob. KDS assigns the sequence numbers on ingestion; the other two have to be specified by the sink.
The key function we provide specifies how to assign partition keys to the items to be published. Partition keys group related records: Kinesis maps each key to a shard, so all records with the same key land in the same shard. Partition keys are Unicode strings with a maximum length of 256 characters.
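As an illustration of a key function, the sketch below assumes a hypothetical Trade item type and uses its ticker symbol as the partition key, so all trades for one ticker are grouped together. It also rejects keys longer than MAXIMUM_KEY_LENGTH. Plain java.util.function.Function stands in for FunctionEx, and measuring the key with String.length() (UTF-16 code units) is an assumption about how characters are counted.

```java
import java.util.function.Function;

final class PartitionKeys {
    // Mirrors MAXIMUM_KEY_LENGTH from the field summary (256 characters).
    static final int MAXIMUM_KEY_LENGTH = 256;

    // Hypothetical item type used only for this illustration.
    record Trade(String ticker, long quantity, double price) {}

    // Key function: trades for the same ticker share a partition key.
    static final Function<Trade, String> KEY_FN = Trade::ticker;

    // Applies the key function and rejects over-long keys.
    // Assumption: String.length() is used as the character count.
    static String checkedKey(Trade trade) {
        String key = KEY_FN.apply(trade);
        if (key.length() > MAXIMUM_KEY_LENGTH) {
            throw new IllegalArgumentException("Partition key too long: " + key.length());
        }
        return key;
    }

    public static void main(String[] args) {
        System.out.println(checkedKey(new Trade("ACME", 100, 12.5)));
    }
}
```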
The value function we provide specifies how to extract the useful data content from the items to be published. In effect, it serializes the messages, since Kinesis performs neither serialization nor deserialization itself. The length of the resulting byte array, plus the length of the partition key, can't exceed 1MiB.
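A minimal sketch of a value function and of the size rule, assuming messages are plain strings serialized as UTF-8 (the encoding is an assumption; any serialization works, since Kinesis only sees bytes). The fitsInRecord helper is hypothetical and simply checks blob length plus key length against MAX_RECORD_SIZE.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class RecordSize {
    // Mirrors MAX_RECORD_SIZE: data blob plus key must not exceed 1 MiB.
    static final int MAX_RECORD_SIZE = 1024 * 1024;

    // Hypothetical value function: serialize a text message as UTF-8 bytes.
    static final Function<String, byte[]> VALUE_FN = msg -> msg.getBytes(StandardCharsets.UTF_8);

    // Hypothetical helper applying the documented limit: blob length + key length <= 1 MiB.
    static boolean fitsInRecord(String key, byte[] blob) {
        return (long) blob.length + key.length() <= MAX_RECORD_SIZE;
    }

    public static void main(String[] args) {
        byte[] blob = VALUE_FN.apply("hello kinesis");
        System.out.println(fitsInRecord("greeting", blob)); // true
    }
}
```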
The sink is distributed: each instance handles a subset of the possible partition keys. In Jet terms, the sink processors' input edges are distributed and partitioned by the same key function we provide for computing the partition keys. As a result, all items with the same partition key end up in the same distributed sink instance.
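The effect of a partitioned input edge can be sketched without Jet: if every item is routed by a hash of the same key the sink uses as its partition key, all items sharing a key reach one instance, in their original relative order. This is a simplified stand-in using String.hashCode() modulo the instance count; Jet's real partitioning scheme differs, and the names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class PartitionedRouting {
    // Simplified stand-in for a partitioned edge (not Jet's actual scheme):
    // the same key always maps to the same instance index.
    static int instanceFor(String partitionKey, int instanceCount) {
        return Math.floorMod(partitionKey.hashCode(), instanceCount);
    }

    // Groups items per sink instance using the same key function that produces partition keys.
    static <T> List<List<T>> route(List<T> items, Function<T, String> keyFn, int instanceCount) {
        List<List<T>> perInstance = new ArrayList<>();
        for (int i = 0; i < instanceCount; i++) {
            perInstance.add(new ArrayList<>());
        }
        for (T item : items) {
            // Appending keeps the relative order of items within each instance.
            perInstance.get(instanceFor(keyFn.apply(item), instanceCount)).add(item);
        }
        return perInstance;
    }

    public static void main(String[] args) {
        List<String> events = List.of("a:1", "b:1", "a:2", "c:1", "a:3");
        Function<String, String> keyFn = e -> e.substring(0, e.indexOf(':'));
        System.out.println(route(events, keyFn, 3));
    }
}
```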
The sink can be used in both streaming and batching pipelines.
The only processing guarantee the sink can support is at-least-once. This is because Kinesis lacks transaction support (data can't be written into it with transactional guarantees), and the AWS SDK occasionally duplicates data on its own (see Producer Retries in the Kinesis documentation).
The sink preserves the ordering of items for the same partition key, as long as the stream's ingestion rate is not tripped. Ingestion rates in Kinesis are specified on a per-shard basis, the shard being the base throughput unit. One shard provides a capacity of 1MiB/sec data input and 2MiB/sec data output, and can support up to 1000 record publications per second. The owner of the Kinesis stream specifies the number of shards when creating the stream and can change it later without stopping the data flow.
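The per-shard limits above translate into a back-of-the-envelope shard count: a stream needs enough shards for both its byte rate and its record rate. This sketch assumes a perfectly even spread of partition keys across shards, which real workloads rarely achieve, so treat the result as a lower bound. ShardEstimate and minShards are hypothetical names.

```java
final class ShardEstimate {
    static final long BYTES_PER_SHARD_PER_SEC = 1024L * 1024L; // 1 MiB/s input per shard
    static final long RECORDS_PER_SHARD_PER_SEC = 1000L;       // 1000 records/s per shard

    // Smallest shard count meeting both per-shard input limits.
    // Assumption: keys spread perfectly evenly, so this is a lower bound.
    static long minShards(long bytesPerSec, long recordsPerSec) {
        long byBytes = ceilDiv(bytesPerSec, BYTES_PER_SHARD_PER_SEC);
        long byRecords = ceilDiv(recordsPerSec, RECORDS_PER_SHARD_PER_SEC);
        return Math.max(1, Math.max(byBytes, byRecords));
    }

    // Integer ceiling division for non-negative operands.
    private static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    public static void main(String[] args) {
        // 5,000 records/s of 500 bytes each = 2,500,000 bytes/s.
        System.out.println(minShards(5_000L * 500, 5_000)); // 5
    }
}
```

Here the record rate, not the byte rate, decides the answer: 2,500,000 bytes/s needs only 3 shards, but 5,000 records/s needs 5.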
If the sinks attempt to write more into a shard than allowed, some records will be rejected. This rejection breaks the ordering because the sinks write data in batches, and the shards don't reject entire batches but arbitrary individual records from them. Rejected records can be (and are) retried, but the batch's original ordering can't be preserved.
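A small simulation of why partial rejection breaks ordering, assuming the rejected records are resent in a follow-up batch after the accepted ones have already been written. The class and method names are hypothetical, and which records the shard rejects is simply given as a set of indices rather than modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class PartialRejection {
    // Sends one batch; records whose index is in `rejected` are throttled and
    // resent in a follow-up batch (assumed to succeed on the retry).
    static List<String> deliveredOrder(List<String> batch, Set<Integer> rejected) {
        List<String> delivered = new ArrayList<>();
        List<String> retry = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            if (rejected.contains(i)) {
                retry.add(batch.get(i));     // rejected: must be resent later
            } else {
                delivered.add(batch.get(i)); // accepted on the first attempt
            }
        }
        delivered.addAll(retry); // retried records land after those that followed them
        return delivered;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("k1#1", "k1#2", "k1#3", "k1#4");
        System.out.println(deliveredOrder(batch, Set.of(1))); // [k1#1, k1#3, k1#4, k1#2]
    }
}
```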
The sink cannot avoid rejections entirely because multiple instances of it write into the same shard, and Jet currently has no way to coordinate an aggregate rate among them.
The sink has a flow control mechanism that, once rate tripping starts happening, tries to minimize it by reducing the send batch size and introducing adaptive sleep delays between consecutive sends. However, the only sure way to avoid the problem is to have enough shards and a good spread of partition keys (partition keys are assigned to shards based on an MD5 hash function; each shard owns a partition of the hash space).
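The key-to-shard mapping can be sketched with the JDK's MD5 implementation: the 16-byte digest is read as an unsigned 128-bit integer, and the shard whose hash range contains it receives the record. This sketch assumes the key is hashed as UTF-8 bytes and that the shards split the hash space into equal contiguous ranges; both are assumptions, and real shard ranges can be uneven (for example after resharding). ShardLookup is a hypothetical name.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ShardLookup {
    // Size of the 128-bit hash key space: 2^128.
    static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 of the key (assumed UTF-8 encoded), read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1: treat the 16 bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Assumption: shardCount shards own equal, contiguous slices of the hash space.
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE)
                .intValueExact();
    }

    public static void main(String[] args) {
        for (String key : new String[] {"user-1", "user-2", "user-3", "user-4"}) {
            System.out.println(key + " -> shard " + shardIndex(key, 4));
        }
    }
}
```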
The sink exposes metrics that can be used to monitor the flow control mechanism. One of them is the send batch size (BATCH_SIZE_METRIC); the other one is the sleep delay between consecutive sends (THROTTLING_SLEEP_METRIC).
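To make the two metrics concrete, here is a hypothetical adaptive controller whose state is exactly a batch size and a sleep delay: it halves the batch and doubles the delay when a send is throttled, and recovers gradually when sends succeed. This is not the sink's actual algorithm; the halving/doubling policy, the step sizes, and the MAX_BATCH_SIZE and MAX_SLEEP_MS caps are all assumptions chosen for illustration.

```java
final class AdaptiveFlowControl {
    static final int MAX_BATCH_SIZE = 500;   // assumed upper bound on records per send
    static final long MAX_SLEEP_MS = 10_000; // assumed cap on the delay between sends

    private int batchSize = MAX_BATCH_SIZE;
    private long sleepMs = 0;

    // Hypothetical policy, called after each send with whether any records were throttled.
    void onSendResult(boolean throttled) {
        if (throttled) {
            batchSize = Math.max(1, batchSize / 2);                             // send less per request
            sleepMs = Math.min(MAX_SLEEP_MS, sleepMs == 0 ? 100 : sleepMs * 2); // wait longer
        } else {
            batchSize = Math.min(MAX_BATCH_SIZE, batchSize + 10); // grow back gradually
            sleepMs = sleepMs / 2;                                // shorten the delay
        }
    }

    // The two quantities the sink's metrics report on.
    int batchSize() { return batchSize; }
    long sleepMs() { return sleepMs; }

    public static void main(String[] args) {
        AdaptiveFlowControl fc = new AdaptiveFlowControl();
        boolean[] outcomes = {true, true, false, false};
        for (boolean throttled : outcomes) {
            fc.onSendResult(throttled);
            System.out.println("batch=" + fc.batchSize() + " sleepMs=" + fc.sleepMs());
        }
    }
}
```

Shrinking quickly and growing slowly is a common choice for this kind of controller, because repeated throttling is costlier than briefly under-using the shard.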
Parameters:
stream - name of the Kinesis stream being written into
keyFn - function for computing partition keys
valueFn - function for computing serialized message data content
@Nonnull public static KinesisSinks.Builder<Map.Entry<String,byte[]>> kinesis(@Nonnull String stream)
Convenience method for a specific type of sink, one that ingests items of type Map.Entry<String, byte[]> and assumes that the entry's key is the partition key and the entry's value is the record data blob. No explicit key or value function needs to be specified.
Copyright © 2023 Hazelcast, Inc. All rights reserved.