Class KinesisSinks
- Since:
- Jet 4.4
-
Nested Class Summary
Modifier and TypeClassDescriptionstatic final class
Fluent builder for constructing the Kinesis sink and setting its configuration parameters. -
Field Summary
Modifier and TypeFieldDescriptionstatic final String
One of the metrics exposed by the sink used to monitor the current sending batch size.static final int
The length of a record's data blob (byte array length), plus the record's key size (no.static final int
Kinesis partition keys are Unicode strings, with a maximum length limit of 256 characters for each key.static final String
One of the metrics exposed by the sink used to monitor the current sleep delay between consecutive sends (in milliseconds). -
Method Summary
Modifier and TypeMethodDescriptionstatic KinesisSinks.Builder<Map.Entry<String,
byte[]>> Convenience method for a specific type of sink, one that ingests items of typeMap.Entry<String, byte[]>
and assumes that the entries key is the partition key and the entries value is the record data blob.static <T> KinesisSinks.Builder<T>
kinesis
(String stream, FunctionEx<T, String> keyFn, FunctionEx<T, byte[]> valueFn) Initiates the building of a sink that publishes messages into Amazon Kinesis Data Streams (KDS).
-
Field Details
-
MAXIMUM_KEY_LENGTH
public static final int MAXIMUM_KEY_LENGTHKinesis partition keys are Unicode strings, with a maximum length limit of 256 characters for each key.- See Also:
-
MAX_RECORD_SIZE
public static final int MAX_RECORD_SIZEThe length of a record's data blob (byte array length), plus the record's key size (no. of Unicode characters in the key), must not be larger than 1MiB.- See Also:
-
BATCH_SIZE_METRIC
One of the metrics exposed by the sink used to monitor the current sending batch size. The batch size is computed based on the number of shards in the stream. The more shards, the bigger the batch size, the more data the stream can ingest. The maximum value is 500, the limit imposed by Kinesis.- See Also:
-
THROTTLING_SLEEP_METRIC
One of the metrics exposed by the sink used to monitor the current sleep delay between consecutive sends (in milliseconds). When the flow control mechanism is deactivated, the value should always be zero. If flow control kicks in, the value keeps increasing until shard ingestion rates are no longer tripped, then stabilizes, then slowly decreases back to zero (if the data rate was just a spike and is not sustained).- See Also:
-
-
Method Details
-
kinesis
@Nonnull public static <T> KinesisSinks.Builder<T> kinesis(@Nonnull String stream, @Nonnull FunctionEx<T, String> keyFn, @Nonnull FunctionEx<T, byte[]> valueFn) Initiates the building of a sink that publishes messages into Amazon Kinesis Data Streams (KDS).The basic unit of data stored in KDS is the record. A record is composed of a sequence number, partition key, and data blob. KDS assigns the sequence numbers on ingestion; the other two have to be specified by the sink.
The key function we provide specifies how to assign partition keys to the items to be published. The partition keys have the role of grouping related records so that Kinesis can handle them accordingly. Partition keys are Unicode strings with a maximum length of 256 characters.
The value function we provide specifies how to extract the useful data content from the items to be published. It's basically serialization of the messages (Kinesis handles neither serialization nor deserialization internally). The length of the resulting byte array, plus the length of the partition key, can't be longer than 1MiB.
The sink is distributed Each instance handles a subset of possible partition keys. In Jet terms, the sink processors' input edges are distributed and partitioned by the same key function we provide for computing the partition keys. As a result, each item with the same partition key will end up in the same distributed sink instance.
The sink can be used in both streaming and batching pipelines.
The only processing guarantee the sink can support is at-least-once. This is caused by the lack of transaction support in Kinesis (can't write data into it with transactional guarantees) and the AWS SDK occasionally causing data duplication on its own (@see Producer Retries in the documentation).
The sink preserves the ordering of items for the same partition key, as long as the stream's ingestion rate is not tripped. Ingestion rates in Kinesis are specified on a per shard basis, which is the base throughput unit. One shard provides a capacity of 1MiB/sec data input and 2MiB/sec data output. One shard can support up to 1000 record publications per second. The owner of the Kinesis stream specifies the number of shards needed when creating the stream. It can be changed later without stopping the data flow.
If the sinks attempt to write more into a shard than allowed, some records will be rejected. This rejection breaks the ordering because the sinks write data in batches, and the shards don't just reject entire batches but random records from them. What's rejected can (and is) retried, but the batch's original ordering can't be preserved.
The sink cannot avoid rejections entirely because multiple instances of it write into the same shard, and coordinating an aggregated rate among them is not something currently possible in Jet.
The sink has a flow control mechanism, which tries to minimize the amount of ingestion rate tripping, when it starts happening, by reducing the send batch size and introducing adaptive sleep delays between consecutive sends. However, the only sure way to avoid the problem is having enough shards and a good spread of partition keys (partition keys are assigned to shard based on an MD5 hash function; each shard owns a partition of the hash space).
The sink exposes metrics that can be used to monitor the flow control mechanism. One of them is the
send batch size
; the other one is thesleep delay between consecutive sends
.- Parameters:
stream
- name of the Kinesis stream being written intokeyFn
- function for computing partition keysvalueFn
- function for computing serialized message data content- Returns:
- fluent builder that can be used to set properties and also to construct the sink once configuration is done
-
kinesis
@Nonnull public static KinesisSinks.Builder<Map.Entry<String,byte[]>> kinesis(@Nonnull String stream) Convenience method for a specific type of sink, one that ingests items of typeMap.Entry<String, byte[]>
and assumes that the entries key is the partition key and the entries value is the record data blob. No explicit key nor value function needs to be specified.
-