Class KinesisSinks


  • public final class KinesisSinks
    extends java.lang.Object
    Contains factory methods for creating Amazon Kinesis Data Streams (KDS) sinks.
    Since:
    Jet 4.4
    • Nested Class Summary

      Nested Classes 
      Modifier and Type Class Description
      static class  KinesisSinks.Builder<T>
      Fluent builder for constructing the Kinesis sink and setting its configuration parameters.
    • Field Summary

      Fields 
      Modifier and Type Field Description
      static java.lang.String BATCH_SIZE_METRIC
      One of the metrics exposed by the sink used to monitor the current sending batch size.
      static int MAX_RECORD_SIZE
      The length of a record's data blob (byte array length), plus the record's key size (no. of Unicode characters in the key), must not be larger than 1MiB.
      static int MAXIMUM_KEY_LENGTH
      Kinesis partition keys are Unicode strings, with a maximum length limit of 256 characters for each key.
      static java.lang.String THROTTLING_SLEEP_METRIC
      One of the metrics exposed by the sink used to monitor the current sleep delay between consecutive sends (in milliseconds).
    • Method Summary

      All Methods Static Methods Concrete Methods 
      Modifier and Type Method Description
      static KinesisSinks.Builder<java.util.Map.Entry<java.lang.String,byte[]>> kinesis(java.lang.String stream)
      Convenience method for a specific type of sink, one that ingests items of type Map.Entry<String, byte[]> and assumes that the entry's key is the partition key and the entry's value is the record data blob.
      static <T> KinesisSinks.Builder<T> kinesis(java.lang.String stream, FunctionEx<T,java.lang.String> keyFn, FunctionEx<T,byte[]> valueFn)
      Initiates the building of a sink that publishes messages into Amazon Kinesis Data Streams (KDS).
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Field Detail

      • MAXIMUM_KEY_LENGTH

        public static final int MAXIMUM_KEY_LENGTH
        Kinesis partition keys are Unicode strings, with a maximum length limit of 256 characters for each key.
        See Also:
        Constant Field Values
      • MAX_RECORD_SIZE

        public static final int MAX_RECORD_SIZE
        The length of a record's data blob (byte array length), plus the record's key size (no. of Unicode characters in the key), must not be larger than 1MiB.
        See Also:
        Constant Field Values
      • BATCH_SIZE_METRIC

        public static final java.lang.String BATCH_SIZE_METRIC
        One of the metrics exposed by the sink used to monitor the current sending batch size. The batch size is computed based on the number of shards in the stream: the more shards the stream has, the bigger the batch size and the more data the stream can ingest. The maximum value is 500, the limit imposed by Kinesis.
        See Also:
        Constant Field Values
      • THROTTLING_SLEEP_METRIC

        public static final java.lang.String THROTTLING_SLEEP_METRIC
        One of the metrics exposed by the sink used to monitor the current sleep delay between consecutive sends (in milliseconds). When the flow control mechanism is deactivated, the value should always be zero. If flow control kicks in, the value keeps increasing until shard ingestion rates are no longer tripped, then stabilizes, then slowly decreases back to zero (if the data rate was just a spike and is not sustained).
        See Also:
        Constant Field Values
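        The two size limits above can be checked before an item is handed to Kinesis. The sketch below is a minimal illustration, assuming the documented values (256 characters, 1MiB) and using the hypothetical names KinesisLimits and checkRecord, which are not part of Jet. It counts key length with String.length() (UTF-16 code units), an approximation of "number of Unicode characters".

```java
// Hypothetical pre-send check mirroring the documented Kinesis limits.
// Not part of Jet; the constant values are copied from the Javadoc text.
final class KinesisLimits {
    static final int MAXIMUM_KEY_LENGTH = 256;       // characters per partition key
    static final int MAX_RECORD_SIZE = 1024 * 1024;  // 1MiB: blob length + key length

    private KinesisLimits() {
    }

    /** Throws IllegalArgumentException if the key or the whole record is too large. */
    static void checkRecord(String key, byte[] blob) {
        // String.length() counts UTF-16 code units; used here as an
        // approximation of "number of Unicode characters" (assumption).
        int keyLength = key.length();
        if (keyLength > MAXIMUM_KEY_LENGTH) {
            throw new IllegalArgumentException("Partition key too long: " + keyLength);
        }
        // Record size as defined above: data blob bytes plus key characters.
        long size = (long) blob.length + keyLength;
        if (size > MAX_RECORD_SIZE) {
            throw new IllegalArgumentException("Record too large: " + size + " > " + MAX_RECORD_SIZE);
        }
    }
}
```

        For example, a 256-character key with a blob of exactly 1MiB minus 256 bytes is accepted, while a one-character key with a full 1MiB blob is rejected.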
    • Method Detail

      • kinesis

        @Nonnull
        public static <T> KinesisSinks.Builder<T> kinesis(@Nonnull
                                                          java.lang.String stream,
                                                          @Nonnull
                                                          FunctionEx<T,java.lang.String> keyFn,
                                                          @Nonnull
                                                          FunctionEx<T,byte[]> valueFn)
        Initiates the building of a sink that publishes messages into Amazon Kinesis Data Streams (KDS).

        The basic unit of data stored in KDS is the record. A record is composed of a sequence number, partition key, and data blob. KDS assigns the sequence numbers on ingestion; the other two have to be specified by the sink.

        The key function specifies how to assign partition keys to the items being published. Partition keys group related records: Kinesis uses them to decide which shard stores a record, so records with the same key go to the same shard. Partition keys are Unicode strings with a maximum length of 256 characters.

        The value function specifies how to extract the useful data content from the items being published. In practice it serializes the messages (Kinesis handles neither serialization nor deserialization internally). The length of the resulting byte array, plus the length of the partition key, can't exceed 1MiB.
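        To make the roles of the two functions concrete, here is a sketch for a hypothetical Trade type. It uses plain java.util.function.Function as a stand-in for Jet's FunctionEx, and the comma-separated text encoding is an arbitrary illustrative choice, not something the sink requires.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical domain type, used only for illustration.
final class Trade {
    final String ticker;
    final long price;

    Trade(String ticker, long price) {
        this.ticker = ticker;
        this.price = price;
    }
}

final class TradeFunctions {
    private TradeFunctions() {
    }

    // Key function: all trades of one ticker share a partition key,
    // so Kinesis stores them in the same shard.
    static final Function<Trade, String> KEY_FN = trade -> trade.ticker;

    // Value function: the serialization step. Kinesis stores opaque bytes,
    // so the item must already be encoded (simple UTF-8 text assumed here).
    static final Function<Trade, byte[]> VALUE_FN =
            trade -> (trade.ticker + "," + trade.price).getBytes(StandardCharsets.UTF_8);
}
```

        In a pipeline, equivalent lambdas would be passed as the keyFn and valueFn arguments of kinesis(stream, keyFn, valueFn).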

        The sink is distributed: each instance handles a subset of the possible partition keys. In Jet terms, the sink processors' input edges are distributed and partitioned by the same key function used to compute the partition keys. As a result, all items with the same partition key end up in the same sink instance.
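        The routing property described above can be sketched in a few lines: each item goes to the instance selected by hashing its partition key, so one key always maps to one instance and that key's items keep their relative order. KeyRouter is a hypothetical helper, and Math.floorMod(key.hashCode(), n) is only a stand-in; Jet's real partitioner works differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical key-partitioned routing; hashCode/floorMod stands in for
// Jet's own partitioning scheme.
final class KeyRouter {
    private KeyRouter() {
    }

    static int instanceFor(String partitionKey, int instanceCount) {
        return Math.floorMod(partitionKey.hashCode(), instanceCount);
    }

    /** Splits items into one list per sink instance, keeping arrival order. */
    static <T> List<List<T>> route(List<T> items, Function<T, String> keyFn, int instanceCount) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < instanceCount; i++) {
            buckets.add(new ArrayList<>());
        }
        for (T item : items) {
            buckets.get(instanceFor(keyFn.apply(item), instanceCount)).add(item);
        }
        return buckets;
    }
}
```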

        The sink can be used in both streaming and batching pipelines.

        The only processing guarantee the sink can support is at-least-once. This is because Kinesis lacks transaction support (data can't be written into it with transactional guarantees) and because the AWS SDK occasionally duplicates data on its own (see Producer Retries in the Kinesis documentation).

        The sink preserves the ordering of items for the same partition key, as long as the stream's ingestion rate is not tripped. Ingestion rates in Kinesis are specified on a per-shard basis; the shard is the base throughput unit. One shard provides a capacity of 1MiB/sec of data input and 2MiB/sec of data output, and supports up to 1000 record writes per second. The owner of the Kinesis stream specifies the number of shards when creating the stream; this number can be changed later without stopping the data flow.
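        The per-shard write limits give a quick lower bound on the number of shards a given load needs: whichever of the byte limit and the record-count limit is hit first decides. ShardMath is a hypothetical helper; it assumes perfectly even key spread, so real streams need headroom on top of this figure.

```java
// Hypothetical back-of-the-envelope shard estimate from the per-shard
// write limits quoted above (1MiB/sec and 1000 records/sec).
final class ShardMath {
    static final long BYTES_PER_SHARD_PER_SEC = 1024L * 1024L;
    static final long RECORDS_PER_SHARD_PER_SEC = 1000L;

    private ShardMath() {
    }

    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    /** Minimum shard count able to absorb the write load, assuming even key spread. */
    static long minShards(long bytesPerSec, long recordsPerSec) {
        long byBytes = ceilDiv(bytesPerSec, BYTES_PER_SHARD_PER_SEC);
        long byRecords = ceilDiv(recordsPerSec, RECORDS_PER_SHARD_PER_SEC);
        return Math.max(1, Math.max(byBytes, byRecords));
    }
}
```

        For example, 5,000 records/sec of 2 KiB each (10,240,000 bytes/sec) is bound by bandwidth and needs 10 shards, whereas 3,500 records/sec of 100 bytes each needs 4 shards because of the record-count limit.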

        If the sinks attempt to write more into a shard than allowed, some records are rejected. This rejection breaks the ordering because the sinks write data in batches, and a shard doesn't reject entire batches, only arbitrary records from them. Rejected records can be (and are) retried, but the batch's original ordering can't be preserved.
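        A small simulation shows why a partial rejection reorders records: the rejected record is resent after the rest of its batch has already been stored. PartialFailureDemo is hypothetical, and the rejected set is passed in by the caller; in Kinesis it depends on throttling and is outside the sender's control.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of one send plus one retry of the rejected records.
final class PartialFailureDemo {
    private PartialFailureDemo() {
    }

    /** Returns the order in which the shard ends up storing the records. */
    static List<String> sendWithRetry(List<String> batch, Set<String> rejectedOnFirstTry) {
        List<String> shardLog = new ArrayList<>();
        List<String> retry = new ArrayList<>();
        for (String record : batch) {
            if (rejectedOnFirstTry.contains(record)) {
                retry.add(record);     // throttled: kept for the next send
            } else {
                shardLog.add(record);  // accepted by the first send
            }
        }
        shardLog.addAll(retry);        // the retry succeeds on the second send
        return shardLog;
    }
}
```

        With the batch [k1-a, k1-b, k1-c] (all using partition key k1) and k1-b rejected once, the shard stores k1-a, k1-c, k1-b.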

        The sink cannot avoid rejections entirely because multiple instances of it write into the same shard, and coordinating an aggregated rate among them is not something currently possible in Jet.

        The sink has a flow control mechanism that tries to minimize ingestion rate tripping once it starts happening, by reducing the send batch size and introducing adaptive sleep delays between consecutive sends. However, the only sure way to avoid the problem is to have enough shards and a good spread of partition keys (partition keys are assigned to shards using an MD5 hash function; each shard owns a partition of the hash space).
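        The key-to-shard assignment mentioned above can be sketched as follows: the MD5 digest of the partition key is read as an unsigned 128-bit integer, and the shard whose hash range contains it gets the record. ShardMapper is hypothetical; it assumes the key is hashed as UTF-8 bytes and that the hash space is split into equal contiguous ranges (up to rounding at range boundaries), which holds for a freshly created stream but not necessarily after resharding.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical MD5-based key-to-shard mapping for equally sized hash ranges.
final class ShardMapper {
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128); // 2^128

    private ShardMapper() {
    }

    /** MD5 digest of the key, interpreted as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1: treat bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** Index of the owning shard when the hash space is split into shardCount equal ranges. */
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE)
                .intValueExact();
    }
}
```

        Because the mapping depends only on the key, a few very hot keys can overload one shard no matter how many shards exist; a wide spread of keys is what lets the load reach every shard.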

        The sink exposes metrics that can be used to monitor the flow control mechanism. One of them is the send batch size; the other one is the sleep delay between consecutive sends.
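        The behaviour the two metrics describe (smaller batches and growing sleeps while records are rejected, then a slow return to zero) can be modelled with a small state machine. FlowControl below is a hypothetical sketch: the halving and doubling rules, the 100 ms starting delay, and the 10% recovery step are illustrative assumptions, not the sink's actual algorithm. Only the 500-record cap comes from the text above.

```java
// Hypothetical flow-control state; update rules are illustrative only.
final class FlowControl {
    static final int MAX_BATCH_SIZE = 500;          // limit imposed by Kinesis
    private static final long INITIAL_SLEEP_MS = 100;
    private static final long MAX_SLEEP_MS = 10_000;

    private int batchSize = MAX_BATCH_SIZE;         // cf. BATCH_SIZE_METRIC
    private long sleepMillis;                       // cf. THROTTLING_SLEEP_METRIC, 0 when idle

    /** Updates the state after one send; rejectedCount > 0 means throttling. */
    void onSendResult(int rejectedCount) {
        if (rejectedCount > 0) {
            // Back off: halve the batch, start or double the delay.
            batchSize = Math.max(1, batchSize / 2);
            sleepMillis = sleepMillis == 0
                    ? INITIAL_SLEEP_MS
                    : Math.min(MAX_SLEEP_MS, sleepMillis * 2);
        } else {
            // Slow recovery: shave 10% off the delay, grow the batch gradually.
            sleepMillis = sleepMillis < 10 ? 0 : sleepMillis * 9 / 10;
            batchSize = Math.min(MAX_BATCH_SIZE, batchSize + 10);
        }
    }

    int batchSize() {
        return batchSize;
    }

    long sleepMillis() {
        return sleepMillis;
    }
}
```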

        Parameters:
        stream - name of the Kinesis stream being written into
        keyFn - function for computing partition keys
        valueFn - function for computing serialized message data content
        Returns:
        fluent builder that can be used to set properties and also to construct the sink once configuration is done
      • kinesis

        @Nonnull
        public static KinesisSinks.Builder<java.util.Map.Entry<java.lang.String,byte[]>> kinesis(@Nonnull
                                                                                                 java.lang.String stream)
        Convenience method for a specific type of sink, one that ingests items of type Map.Entry<String, byte[]> and assumes that the entry's key is the partition key and the entry's value is the record data blob. Neither a key function nor a value function needs to be specified explicitly.
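        Read against the three-argument overload, this convenience method corresponds to using the entry's getKey and getValue as the key and value functions. The sketch below spells that out with plain java.util.function.Function as a stand-in for FunctionEx; EntryFunctions is a hypothetical name, and the JSON payload is only example data.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Function;

// Hypothetical equivalents of the implicit key and value functions.
final class EntryFunctions {
    private EntryFunctions() {
    }

    static final Function<Map.Entry<String, byte[]>, String> KEY_FN = Map.Entry::getKey;
    static final Function<Map.Entry<String, byte[]>, byte[]> VALUE_FN = Map.Entry::getValue;

    public static void main(String[] args) {
        // An item for such a sink: (partition key, already-serialized bytes).
        Map.Entry<String, byte[]> item =
                Map.entry("user-42", "{\"clicks\":3}".getBytes(StandardCharsets.UTF_8));
        System.out.println(KEY_FN.apply(item) + " -> " + VALUE_FN.apply(item).length + " bytes");
    }
}
```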