Class KafkaSources

java.lang.Object
com.hazelcast.jet.kafka.KafkaSources

public final class KafkaSources extends Object
Contains factory methods for Apache Kafka sources.
Since:
Jet 3.0
  • Method Details

    • kafka

      @Nonnull public static <K, V> StreamSource<Map.Entry<K,V>> kafka(@Nonnull Properties properties, @Nonnull String... topics)
      Convenience for kafka(Properties, FunctionEx, String...) wrapping the output in Map.Entry.
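      A minimal usage sketch; the broker address, topic name, and String deserializers are placeholder choices:

        import java.util.Properties;
        import com.hazelcast.jet.kafka.KafkaSources;
        import com.hazelcast.jet.pipeline.Pipeline;
        import com.hazelcast.jet.pipeline.Sinks;
        import org.apache.kafka.common.serialization.StringDeserializer;

        // Consumer properties: broker address and key/value deserializers
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());

        // Each emitted item is a Map.Entry<String, String> of the record's key and value
        Pipeline p = Pipeline.create();
        p.readFrom(KafkaSources.<String, String>kafka(props, "my-topic"))   // placeholder topic
         .withNativeTimestamps(0)   // use the Kafka message timestamps, no allowed lag
         .writeTo(Sinks.logger());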
    • kafka

      @Beta @Nonnull public static <K, V> StreamSource<Map.Entry<K,V>> kafka(@Nonnull DataConnectionRef dataConnectionRef, @Nonnull String... topics)
      Convenience for kafka(DataConnectionRef, FunctionEx, String...) wrapping the output in Map.Entry.
      Since:
      5.3
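      A minimal sketch, assuming a Kafka data connection named "my-kafka" has already been configured on the cluster (the connection name and topic are placeholders):

        import com.hazelcast.jet.kafka.KafkaSources;
        import com.hazelcast.jet.pipeline.Pipeline;
        import com.hazelcast.jet.pipeline.Sinks;
        import static com.hazelcast.jet.pipeline.DataConnectionRef.dataConnectionRef;

        Pipeline p = Pipeline.create();
        p.readFrom(KafkaSources.<String, String>kafka(dataConnectionRef("my-kafka"), "my-topic"))
         .withNativeTimestamps(0)
         .writeTo(Sinks.logger());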
    • kafka

      @Nonnull public static <K, V, T> StreamSource<T> kafka(@Nonnull Properties properties, @Nonnull FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>,T> projectionFn, @Nonnull TopicsConfig topicsConfig)
      Returns a source that consumes one or more Apache Kafka topics, as specified by topicsConfig, and emits the items created by applying projectionFn to each consumed record.

      The source creates a KafkaConsumer for each Processor instance using the supplied properties. It assigns a subset of Kafka partitions to each consumer using manual partition assignment (the group.id property is ignored). Kafka's message timestamp is used as the native timestamp.

      If snapshotting is enabled, partition offsets are saved to the snapshot. After a restart, the source emits the events from the same offsets.

      If you start a new job from an exported state, you can change the source parameters as needed:

      • if you add a topic, it will be consumed from the default position
      • if you remove a topic, restored offsets for that topic will be ignored (a warning is logged)
      • if you connect to another cluster, the restored offsets will be applied based on topic-name equality. If you want to start from the default position, give this source a different name
      • if the partition count is lower after a restart, the extra offsets will be ignored

      The source can work in two modes:

      1. if a processing guarantee is enabled, offsets are saved to the snapshot, and after a restart or failure, reading continues from the saved offsets. You can achieve exactly-once or at-least-once behavior.
      2. if the processing guarantee is disabled, the source starts reading from the default offsets (based on the auto.offset.reset property). You can enable offset committing by assigning a group.id, enabling auto offset committing using enable.auto.commit, and configuring auto.commit.interval.ms in the given properties. Refer to the Kafka documentation for descriptions of these properties.
      Additionally, the topicsConfig parameter can be used to specify exact initial offsets for given partitions, which the source consumer will use as the starting offsets when reading records from those partitions. Note that initial offsets provided in topicsConfig always take priority: even if the group.id property is specified, topicsConfig offsets take precedence over the offsets associated with the given consumer group.
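      A sketch of pinning initial offsets. It assumes the TopicsConfig builder API of 5.3 (the TopicConfig inner class, addPartitionInitialOffset and addTopicConfig); check these names against the TopicsConfig javadoc:

        import java.util.Map;
        import java.util.Properties;
        import com.hazelcast.jet.Util;
        import com.hazelcast.jet.kafka.KafkaSources;
        import com.hazelcast.jet.kafka.TopicsConfig;
        import com.hazelcast.jet.kafka.TopicsConfig.TopicConfig;
        import com.hazelcast.jet.pipeline.StreamSource;
        import org.apache.kafka.clients.consumer.ConsumerRecord;

        // Pin the starting offsets of two partitions of "my-topic" (placeholder name)
        TopicConfig topicConfig = new TopicConfig("my-topic");
        topicConfig.addPartitionInitialOffset(0, 42L);   // partition 0 starts at offset 42
        topicConfig.addPartitionInitialOffset(1, 17L);   // partition 1 starts at offset 17

        TopicsConfig topicsConfig = new TopicsConfig();
        topicsConfig.addTopicConfig(topicConfig);

        // These offsets take precedence even if props contains a group.id with committed offsets
        StreamSource<Map.Entry<String, String>> source = KafkaSources.kafka(
                props,   // consumer properties, as in the earlier examples
                (ConsumerRecord<String, String> r) -> Util.entry(r.key(), r.value()),
                topicsConfig);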

      If you add Kafka partitions at run-time, consumption from them will start after a delay, based on the metadata.max.age.ms Kafka property. Note, however, that events from them can be dropped as late events if the allowed lag is not large enough.

      The processor never completes; it can only fail in the case of an error. However, IO failures are generally handled by the Kafka consumer and do not cause the processor to fail. The Kafka consumer also does not return from poll(timeout) if the cluster is down, so if snapshotting is enabled, the entire job might be blocked. This is a known Kafka issue (KAFKA-1894, now fixed). Refer to the Kafka documentation for details.

      The default local parallelism for this processor is 4 (or fewer if fewer CPUs are available). Note that deserialization is done inside KafkaConsumer. If you have high traffic, deserialization might become a bottleneck; increase the local parallelism, or use byte[] for messages and deserialize them manually in a subsequent mapping step.
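      For instance, a sketch of moving value deserialization into the mapping step; deserializeValue is a hypothetical helper standing in for your own deserialization code:

        import org.apache.kafka.common.serialization.ByteArrayDeserializer;

        // Let KafkaConsumer hand over raw bytes; deserialization cost moves to the map stage
        props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());

        Pipeline p = Pipeline.create();
        p.readFrom(KafkaSources.<String, byte[]>kafka(props, "my-topic"))
         .withNativeTimestamps(0)
         .map(e -> deserializeValue(e.getValue()))   // hypothetical helper
         .writeTo(Sinks.logger());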

      Parameters:
      properties - consumer properties with the broker address and key/value deserializers
      projectionFn - function that creates output objects from the Kafka record. If the projection returns null for an item, that item is filtered out.
      topicsConfig - configuration of the topics to consume; at least one topic must be specified
      Since:
      5.3
    • kafka

      @Nonnull public static <K, V, T> StreamSource<T> kafka(@Nonnull Properties properties, @Nonnull FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>,T> projectionFn, @Nonnull String... topics)
      Returns a source that consumes one or more Apache Kafka topics and emits the items created by applying projectionFn to each consumed record.

      The source creates a KafkaConsumer for each Processor instance using the supplied properties. It assigns a subset of Kafka partitions to each consumer using manual partition assignment (the group.id property is ignored). Kafka's message timestamp is used as the native timestamp.

      If snapshotting is enabled, partition offsets are saved to the snapshot. After a restart, the source emits the events from the same offsets.

      If you start a new job from an exported state, you can change the source parameters as needed:

      • if you add a topic, it will be consumed from the default position
      • if you remove a topic, restored offsets for that topic will be ignored (a warning is logged)
      • if you connect to another cluster, the restored offsets will be applied based on topic-name equality. If you want to start from the default position, give this source a different name
      • if the partition count is lower after a restart, the extra offsets will be ignored

      The source can work in two modes:

      1. if a processing guarantee is enabled, offsets are saved to the snapshot, and after a restart or failure, reading continues from the saved offsets. You can achieve exactly-once or at-least-once behavior.
      2. if the processing guarantee is disabled, the source starts reading from the default offsets (based on the auto.offset.reset property). You can enable offset committing by assigning a group.id, enabling auto offset committing using enable.auto.commit, and configuring auto.commit.interval.ms in the given properties. Refer to the Kafka documentation for descriptions of these properties. A sketch of such properties follows this list.
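      A sketch of consumer properties that enable offset committing when no processing guarantee is set; all values are placeholders:

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        props.setProperty("group.id", "my-group");              // consumer group for committed offsets
        props.setProperty("enable.auto.commit", "true");        // commit offsets automatically
        props.setProperty("auto.commit.interval.ms", "5000");   // commit every 5 seconds
        props.setProperty("auto.offset.reset", "earliest");     // start position without a committed offset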
      If you add Kafka partitions at run-time, consumption from them will start after a delay, based on the metadata.max.age.ms Kafka property. Note, however, that events from them can be dropped as late events if the allowed lag is not large enough.

      The processor never completes; it can only fail in the case of an error. However, IO failures are generally handled by the Kafka consumer and do not cause the processor to fail. The Kafka consumer also does not return from poll(timeout) if the cluster is down, so if snapshotting is enabled, the entire job might be blocked. This is a known Kafka issue (KAFKA-1894, now fixed). Refer to the Kafka documentation for details.

      The default local parallelism for this processor is 4 (or fewer if fewer CPUs are available). Note that deserialization is done inside KafkaConsumer. If you have high traffic, deserialization might become a bottleneck; increase the local parallelism, or use byte[] for messages and deserialize them manually in a subsequent mapping step.

      Parameters:
      properties - consumer properties with the broker address and key/value deserializers
      projectionFn - function that creates output objects from the Kafka record. If the projection returns null for an item, that item is filtered out.
      topics - the topics to consume; at least one is required
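      A sketch of a projection that also filters; the topic name and output format are illustrative:

        import com.hazelcast.jet.kafka.KafkaSources;
        import com.hazelcast.jet.pipeline.StreamSource;
        import org.apache.kafka.clients.consumer.ConsumerRecord;

        // Emit "topic: value" strings; records with a null value are filtered out
        StreamSource<String> source = KafkaSources.kafka(
                props,
                (ConsumerRecord<String, String> r) ->
                        r.value() == null ? null : r.topic() + ": " + r.value(),
                "my-topic");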
    • kafka

      @Beta @Nonnull public static <K, V, T> StreamSource<T> kafka(@Nonnull DataConnectionRef dataConnectionRef, @Nonnull FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>,T> projectionFn, @Nonnull String... topics)
      Returns a source that consumes one or more Apache Kafka topics and emits the items created by applying projectionFn to each consumed record.

      The source uses the supplied DataConnection to obtain a new KafkaConsumer instance for each Processor. It assigns a subset of Kafka partitions to each consumer using manual partition assignment (the group.id property is ignored). Kafka's message timestamp is used as the native timestamp.

      If snapshotting is enabled, partition offsets are saved to the snapshot. After a restart, the source emits the events from the same offsets.

      If you start a new job from an exported state, you can change the source parameters as needed:

      • if you add a topic, it will be consumed from the default position
      • if you remove a topic, restored offsets for that topic will be ignored (a warning is logged)
      • if you connect to another cluster, the restored offsets will be applied based on topic-name equality. If you want to start from the default position, give this source a different name
      • if the partition count is lower after a restart, the extra offsets will be ignored

      The source can work in two modes:

      1. if a processing guarantee is enabled, offsets are saved to the snapshot, and after a restart or failure, reading continues from the saved offsets. You can achieve exactly-once or at-least-once behavior.
      2. if the processing guarantee is disabled, the source starts reading from the default offsets (based on the auto.offset.reset property). You can enable offset committing by assigning a group.id, enabling auto offset committing using enable.auto.commit, and configuring auto.commit.interval.ms in the given properties. Refer to the Kafka documentation for descriptions of these properties.

      If you add Kafka partitions at run-time, consumption from them will start after a delay, based on the metadata.max.age.ms Kafka property. Note, however, that events from them can be dropped as late events if the allowed lag is not large enough.

      The processor never completes; it can only fail in the case of an error. However, IO failures are generally handled by the Kafka consumer and do not cause the processor to fail. The Kafka consumer also does not return from poll(timeout) if the cluster is down, so if snapshotting is enabled, the entire job might be blocked. This is a known Kafka issue (KAFKA-1894, now fixed). Refer to the Kafka documentation for details.

      The default local parallelism for this processor is 4 (or fewer if fewer CPUs are available). Note that deserialization is done inside KafkaConsumer. If you have high traffic, deserialization might become a bottleneck; increase the local parallelism, or use byte[] for messages and deserialize them manually in a subsequent mapping step.

      Parameters:
      dataConnectionRef - reference to an existing KafkaDataConnection that will be used to create consumers
      projectionFn - function that creates output objects from the Kafka record. If the projection returns null for an item, that item is filtered out.
      topics - the topics to consume; at least one is required
      Since:
      5.3
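      For context, a sketch of declaring such a data connection programmatically on the member. It assumes the 5.3 DataConnectionConfig API and the "Kafka" type string; verify both against the configuration reference:

        import com.hazelcast.config.Config;
        import com.hazelcast.config.DataConnectionConfig;

        Config config = new Config();
        config.addDataConnectionConfig(
                new DataConnectionConfig("my-kafka")   // the name later passed to dataConnectionRef(...)
                        .setType("Kafka")              // assumed type identifier for Kafka connections
                        .setProperty("bootstrap.servers", "localhost:9092"));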