Class KafkaSources
- Since:
- Jet 3.0
-
Method Summary
- static <K,V,T> StreamSource<T> kafka(DataConnectionRef dataConnectionRef, FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>, T> projectionFn, String... topics)
  Returns a source that consumes one or more Apache Kafka topics and emits items from them as Map.Entry instances.
- static <K,V> StreamSource<Map.Entry<K, V>> kafka(DataConnectionRef dataConnectionRef, String... topics)
  Convenience for kafka(DataConnectionRef, FunctionEx, String...) wrapping the output in Map.Entry.
- static <K,V,T> StreamSource<T> kafka(Properties properties, FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>, T> projectionFn, TopicsConfig topicsConfig)
  Returns a source that consumes one or more Apache Kafka topics and emits items from them as Map.Entry instances.
- static <K,V,T> StreamSource<T> kafka(Properties properties, FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>, T> projectionFn, String... topics)
  Returns a source that consumes one or more Apache Kafka topics and emits items from them as Map.Entry instances.
- static <K,V> StreamSource<Map.Entry<K, V>> kafka(Properties properties, String... topics)
  Convenience for kafka(Properties, FunctionEx, String...) wrapping the output in Map.Entry.
-
Method Details
-
kafka
@Nonnull
public static <K,V> StreamSource<Map.Entry<K,V>> kafka(@Nonnull Properties properties, @Nonnull String... topics)
Convenience for kafka(Properties, FunctionEx, String...) wrapping the output in Map.Entry.
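A minimal usage sketch (not part of the original Javadoc): it assumes a broker at localhost:9092 and a topic named my-topic, both placeholders, and streams records as Map.Entry<String, String>:

import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;

import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;

public class KafkaSourceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());

        Pipeline p = Pipeline.create();
        p.readFrom(KafkaSources.<String, String>kafka(props, "my-topic")) // placeholder topic
         .withNativeTimestamps(0)   // Kafka record timestamps serve as native timestamps
         .writeTo(Sinks.logger());
        // Submit the pipeline with hazelcastInstance.getJet().newJob(p).
    }
}
-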
kafka
@Beta
@Nonnull
public static <K,V> StreamSource<Map.Entry<K,V>> kafka(@Nonnull DataConnectionRef dataConnectionRef, @Nonnull String... topics)
Convenience for kafka(DataConnectionRef, FunctionEx, String...) wrapping the output in Map.Entry.
- Since:
- 5.3
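Not part of the original Javadoc: a minimal sketch of reading through a pre-configured Kafka data connection. The connection name "my-kafka" and topic "my-topic" are placeholders, and DataConnectionRef.dataConnectionRef(String) is assumed to be the factory for the reference:

import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.DataConnectionRef;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;

public class KafkaDataConnectionSourceExample {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.readFrom(KafkaSources.<String, String>kafka(
                        DataConnectionRef.dataConnectionRef("my-kafka"), // placeholder connection name
                        "my-topic"))                                     // placeholder topic
         .withNativeTimestamps(0)
         .writeTo(Sinks.logger());
        // The data connection must already be configured on the cluster members;
        // see the configuration sketch under the last method below.
    }
}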
-
kafka
@Nonnull
public static <K,V,T> StreamSource<T> kafka(@Nonnull Properties properties, @Nonnull FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>, T> projectionFn, @Nonnull TopicsConfig topicsConfig)
Returns a source that consumes one or more Apache Kafka topics and emits items from them as Map.Entry instances.
The source creates a KafkaConsumer for each Processor instance using the supplied properties. It assigns a subset of the Kafka partitions to each of them using manual partition assignment (it ignores the group.id property). The Kafka message timestamp will be used as the native timestamp.
If snapshotting is enabled, partition offsets are saved to the snapshot. After a restart, the source emits the events from the same offsets.
If you start a new job from an exported state, you can change the source parameters as needed:
- if you add a topic, it will be consumed from the default position
- if you remove a topic, restored offsets for that topic will be ignored (there will be a warning logged)
- if you connect to another cluster, the offsets will be used based on the equality of the topic name. If you want to start from the default position, give this source a different name
- if the partition count is lower after a restart, the extra offsets will be ignored
The source can work in two modes:
- if processing guarantee is enabled, offsets are stored to the snapshot and after a restart or failure, the reading continues from the saved offset. You can achieve exactly-once or at-least-once behavior.
- if processing guarantee is disabled, the source will start reading from the default offsets (based on the auto.offset.reset property). You can enable offset committing by assigning a group.id, enabling auto offset committing using enable.auto.commit and configuring auto.commit.interval.ms in the given properties, as sketched after this list. Refer to the Kafka documentation for descriptions of these properties.
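For illustration (not part of the original Javadoc), the committing-related properties named above could be wired as follows; the broker address, group name and interval are placeholders:

import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;

final class CommittingConsumerProps {
    // Consumer properties that enable offset committing when the job runs
    // without a processing guarantee. Values are placeholders.
    static Properties committingConsumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
        props.setProperty("group.id", "my-consumer-group");   // committing requires a group.id
        props.setProperty("enable.auto.commit", "true");      // enable auto offset committing
        props.setProperty("auto.commit.interval.ms", "5000"); // commit every 5 seconds
        return props;
    }
}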
The topicsConfig offsets always take priority: even if a group.id property is specified, the offsets given in topicsConfig take precedence over the offsets associated with that consumer group.
If you add Kafka partitions at run-time, consumption from them will start after a delay, based on the metadata.max.age.ms Kafka property. Note, however, that events from them can be dropped as late if the allowed lag is not large enough.
The processor never completes; it can only fail in the case of an error. However, IO failures are generally handled by the Kafka client and do not cause the processor to fail. The Kafka consumer also does not return from poll(timeout) if the cluster is down; if snapshotting is enabled, the entire job might be blocked. This is a known Kafka issue (KAFKA-1894, now fixed). Refer to the Kafka documentation for details.
The default local parallelism for this processor is 4 (or less if fewer CPUs are available). Note that deserialization is done inside KafkaConsumer. If you have high traffic, the deserialization might become a bottleneck - increase the local parallelism or use byte[] for messages and deserialize manually in a subsequent mapping step.
- Parameters:
properties - consumer properties including the broker address and key/value deserializers
projectionFn - function to create output objects from the Kafka record. If the projection returns a null for an item, that item will be filtered out.
topicsConfig - configuration for the topics to consume; at least one topic must be specified in the configuration.
- Since:
- 5.3
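A sketch of this variant (not part of the original Javadoc). The broker, topic, offset and the TopicsConfig/TopicConfig method names used to register the initial offset (addTopicConfig, addPartitionInitialOffset) are assumptions - verify them against the TopicsConfig Javadoc:

import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;

import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.kafka.TopicsConfig;
import com.hazelcast.jet.kafka.TopicsConfig.TopicConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;

public class KafkaTopicsConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());

        // Start partition 0 of "my-topic" from offset 100; per the description
        // above, these offsets take precedence over consumer-group offsets.
        TopicConfig topicConfig = new TopicConfig("my-topic");                      // assumed constructor
        topicConfig.addPartitionInitialOffset(0, 100L);                             // assumed method
        TopicsConfig topicsConfig = new TopicsConfig().addTopicConfig(topicConfig); // assumed method

        Pipeline p = Pipeline.create();
        p.readFrom(KafkaSources.<String, String, String>kafka(props, r -> r.value(), topicsConfig))
         .withNativeTimestamps(0)
         .writeTo(Sinks.logger());
    }
}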
-
kafka
@Nonnull
public static <K,V,T> StreamSource<T> kafka(@Nonnull Properties properties, @Nonnull FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>, T> projectionFn, @Nonnull String... topics)
Returns a source that consumes one or more Apache Kafka topics and emits items from them as Map.Entry instances.
The source creates a KafkaConsumer for each Processor instance using the supplied properties. It assigns a subset of the Kafka partitions to each of them using manual partition assignment (it ignores the group.id property). The Kafka message timestamp will be used as the native timestamp.
If snapshotting is enabled, partition offsets are saved to the snapshot. After a restart, the source emits the events from the same offsets.
If you start a new job from an exported state, you can change the source parameters as needed:
- if you add a topic, it will be consumed from the default position
- if you remove a topic, restored offsets for that topic will be ignored (there will be a warning logged)
- if you connect to another cluster, the offsets will be used based on the equality of the topic name. If you want to start from the default position, give this source a different name
- if the partition count is lower after a restart, the extra offsets will be ignored
The source can work in two modes:
- if processing guarantee is enabled, offsets are stored to the snapshot and after a restart or failure, the reading continues from the saved offset. You can achieve exactly-once or at-least-once behavior.
- if processing guarantee is disabled, the source will start reading from the default offsets (based on the auto.offset.reset property). You can enable offset committing by assigning a group.id, enabling auto offset committing using enable.auto.commit and configuring auto.commit.interval.ms in the given properties. Refer to the Kafka documentation for descriptions of these properties.
If you add Kafka partitions at run-time, consumption from them will start after a delay, based on the metadata.max.age.ms Kafka property. Note, however, that events from them can be dropped as late if the allowed lag is not large enough.
The processor never completes; it can only fail in the case of an error. However, IO failures are generally handled by the Kafka client and do not cause the processor to fail. The Kafka consumer also does not return from poll(timeout) if the cluster is down; if snapshotting is enabled, the entire job might be blocked. This is a known Kafka issue (KAFKA-1894, now fixed). Refer to the Kafka documentation for details.
The default local parallelism for this processor is 4 (or less if fewer CPUs are available). Note that deserialization is done inside KafkaConsumer. If you have high traffic, the deserialization might become a bottleneck - increase the local parallelism or use byte[] for messages and deserialize manually in a subsequent mapping step.
- Parameters:
properties - consumer properties including the broker address and key/value deserializers
projectionFn - function to create output objects from the Kafka record. If the projection returns a null for an item, that item will be filtered out.
topics - the topics to consume, at least one is required
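Not part of the original Javadoc: a sketch of projecting each ConsumerRecord to a custom value instead of Map.Entry. The broker, topic and types are placeholders:

import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;

import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;

public class KafkaProjectionExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());

        // The projection runs inside the source; returning null drops the record.
        StreamSource<String> source = KafkaSources.<String, String, String>kafka(
                props,
                record -> record.value() == null
                        ? null                                   // filtered out
                        : record.topic() + '/' + record.partition()
                                + '@' + record.offset() + ": " + record.value(),
                "my-topic");                                     // placeholder topic

        Pipeline p = Pipeline.create();
        p.readFrom(source)
         .withNativeTimestamps(0)
         .writeTo(Sinks.logger());
    }
}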
-
kafka
@Beta
@Nonnull
public static <K,V,T> StreamSource<T> kafka(@Nonnull DataConnectionRef dataConnectionRef, @Nonnull FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>, T> projectionFn, @Nonnull String... topics)
Returns a source that consumes one or more Apache Kafka topics and emits items from them as Map.Entry instances.
The source uses the supplied DataConnection to obtain a new KafkaConsumer instance for each Processor. It assigns a subset of the Kafka partitions to each of them using manual partition assignment (it ignores the group.id property). The Kafka message timestamp will be used as the native timestamp.
If snapshotting is enabled, partition offsets are saved to the snapshot. After a restart, the source emits the events from the same offsets.
If you start a new job from an exported state, you can change the source parameters as needed:
- if you add a topic, it will be consumed from the default position
- if you remove a topic, restored offsets for that topic will be ignored (there will be a warning logged)
- if you connect to another cluster, the offsets will be used based on the equality of the topic name. If you want to start from the default position, give this source a different name
- if the partition count is lower after a restart, the extra offsets will be ignored
The source can work in two modes:
- if processing guarantee is enabled, offsets are stored to the snapshot and after a restart or failure, the reading continues from the saved offset. You can achieve exactly-once or at-least-once behavior.
- if processing guarantee is disabled, the source will start reading from the default offsets (based on the auto.offset.reset property). You can enable offset committing by assigning a group.id, enabling auto offset committing using enable.auto.commit and configuring auto.commit.interval.ms in the given properties. Refer to the Kafka documentation for descriptions of these properties.
If you add Kafka partitions at run-time, consumption from them will start after a delay, based on the metadata.max.age.ms Kafka property. Note, however, that events from them can be dropped as late if the allowed lag is not large enough.
The processor never completes; it can only fail in the case of an error. However, IO failures are generally handled by the Kafka client and do not cause the processor to fail. The Kafka consumer also does not return from poll(timeout) if the cluster is down; if snapshotting is enabled, the entire job might be blocked. This is a known Kafka issue (KAFKA-1894, now fixed). Refer to the Kafka documentation for details.
The default local parallelism for this processor is 4 (or less if fewer CPUs are available). Note that deserialization is done inside KafkaConsumer. If you have high traffic, the deserialization might become a bottleneck - increase the local parallelism or use byte[] for messages and deserialize manually in a subsequent mapping step.
- Parameters:
dataConnectionRef - reference to an existing Kafka DataConnection that will be used to create consumers
projectionFn - function to create output objects from the Kafka record. If the projection returns a null for an item, that item will be filtered out.
topics - the topics to consume, at least one is required
- Since:
- 5.3
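Not part of the original Javadoc: a sketch of configuring the Kafka data connection on the member side and then referencing it from this source. DataConnectionConfig and the "Kafka" type string are assumptions about the 5.3 configuration API, and the broker, connection name and topic are placeholders:

import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;

import com.hazelcast.config.Config;
import com.hazelcast.config.DataConnectionConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.DataConnectionRef;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;

public class KafkaDataConnectionProjectionExample {
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        kafkaProps.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        kafkaProps.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());

        Config config = new Config();
        config.addDataConnectionConfig(
                new DataConnectionConfig("my-kafka")   // name referenced below
                        .setType("Kafka")              // assumed type identifier
                        .setProperties(kafkaProps));
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);

        Pipeline p = Pipeline.create();
        p.readFrom(KafkaSources.<String, String, String>kafka(
                        DataConnectionRef.dataConnectionRef("my-kafka"),
                        record -> record.value(),      // keep only the record value
                        "my-topic"))                   // placeholder topic
         .withNativeTimestamps(0)
         .writeTo(Sinks.logger());

        hz.getJet().newJob(p);
    }
}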
-