public final class KafkaSources extends Object
Modifier and Type | Method and Description |
---|---|
static <K,V,T> StreamSource<T> | kafka(DataConnectionRef dataConnectionRef, FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>,T> projectionFn, String... topics): Returns a source that consumes one or more Apache Kafka topics and emits the items created by applying projectionFn to each consumed record. |
static <K,V> StreamSource<Map.Entry<K,V>> | kafka(DataConnectionRef dataConnectionRef, String... topics): Convenience for kafka(DataConnectionRef, FunctionEx, String...) wrapping the output in Map.Entry. |
static <K,V,T> StreamSource<T> | kafka(Properties properties, FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>,T> projectionFn, String... topics): Returns a source that consumes one or more Apache Kafka topics and emits the items created by applying projectionFn to each consumed record. |
static <K,V,T> StreamSource<T> | kafka(Properties properties, FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>,T> projectionFn, TopicsConfig topicsConfig): Returns a source that consumes the topics specified in topicsConfig and emits the items created by applying projectionFn to each consumed record. |
static <K,V> StreamSource<Map.Entry<K,V>> | kafka(Properties properties, String... topics): Convenience for kafka(Properties, FunctionEx, String...) wrapping the output in Map.Entry. |
@Nonnull
public static <K,V> StreamSource<Map.Entry<K,V>> kafka(@Nonnull Properties properties, @Nonnull String... topics)

Convenience for kafka(Properties, FunctionEx, String...) wrapping the output in Map.Entry.
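For illustration (not part of the Javadoc), a minimal pipeline using this convenience variant; the broker address localhost:9092 and the topic name my-topic are assumptions:

```java
import java.util.Properties;

import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");   // assumed broker address
props.setProperty("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<String, String>kafka(props, "my-topic"))
 .withNativeTimestamps(0)      // use the Kafka message timestamps
 .writeTo(Sinks.logger());     // each item is a Map.Entry<String, String>
```

Submit the pipeline as a job on a Hazelcast instance to run it.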
@Beta
@Nonnull
public static <K,V> StreamSource<Map.Entry<K,V>> kafka(@Nonnull DataConnectionRef dataConnectionRef, @Nonnull String... topics)

Convenience for kafka(DataConnectionRef, FunctionEx, String...) wrapping the output in Map.Entry.
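A sketch of the data-connection variant; it assumes a KafkaDataConnection named "my-kafka" has already been configured on the cluster:

```java
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.DataConnectionRef;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;

Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<String, String>kafka(
        DataConnectionRef.dataConnectionRef("my-kafka"),  // assumed connection name
        "my-topic"))
 .withNativeTimestamps(0)
 .writeTo(Sinks.logger());
```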
@Nonnull
public static <K,V,T> StreamSource<T> kafka(@Nonnull Properties properties, @Nonnull FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>,T> projectionFn, @Nonnull TopicsConfig topicsConfig)

Returns a source that consumes the topics specified in topicsConfig and emits the items created by applying projectionFn to each consumed ConsumerRecord.
The source creates a KafkaConsumer for each Processor instance using the supplied properties. It assigns a subset of the Kafka partitions to each of them using manual partition assignment (it ignores the group.id property). The Kafka message timestamp is used as the native timestamp.
If snapshotting is enabled, partition offsets are saved to the snapshot. After a restart, the source emits the events from the same offsets.
If you start a new job from an exported state, you can change the source parameters as needed:
- if you add a topic, it will be consumed from the default position
- if you remove a topic, restored offsets for that topic will be ignored (a warning is logged)
- if you connect to another cluster, the offsets will be used based on the particular partition and offset number of that cluster
- if the partition count is lower after a restart, the extra offsets will be ignored
The source can work in two modes:
1. If snapshotting is enabled, offsets are stored in the snapshot and, after a restart or failure, reading continues from the saved offsets. You can achieve exactly-once or at-least-once behavior.
2. If snapshotting is disabled, the source starts reading from default offsets (based on the auto.offset.reset property). You can enable offset committing by assigning a group.id, enabling auto offset committing using enable.auto.commit and configuring auto.commit.interval.ms in the given properties (see the sketch after this list). Refer to Kafka documentation for the descriptions of these properties.
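A sketch of consumer properties for the second (non-snapshotted) mode; the broker address and group name are assumptions, the keys are standard Kafka consumer properties:

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("auto.offset.reset", "earliest");     // default read position
props.setProperty("group.id", "my-group");              // enables offset committing
props.setProperty("enable.auto.commit", "true");        // commit offsets automatically
props.setProperty("auto.commit.interval.ms", "1000");   // commit once per second
props.setProperty("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
```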
Initial offsets defined in the topicsConfig always take priority: even if a group.id property is specified, the topicsConfig offsets take precedence over the offsets associated with the given consumer group.
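A sketch of pinning an initial offset via TopicsConfig; the builder calls shown here (TopicConfig, addPartitionInitialOffset, addTopicConfig) are assumptions about the TopicsConfig API, so verify them against the TopicsConfig Javadoc:

```java
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.kafka.TopicsConfig;
import com.hazelcast.jet.kafka.TopicsConfig.TopicConfig;
import com.hazelcast.jet.pipeline.StreamSource;

// assumed API: pin partition 0 of "my-topic" to start at offset 42
TopicConfig topicConfig = new TopicConfig("my-topic");
topicConfig.addPartitionInitialOffset(0, 42L);

TopicsConfig topicsConfig = new TopicsConfig();
topicsConfig.addTopicConfig(topicConfig);

StreamSource<String> source = KafkaSources.<String, String, String>kafka(
        props,              // consumer properties as in the sketch above
        r -> r.value(),     // projectionFn: keep only the record value
        topicsConfig);
```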
If you add Kafka partitions at run-time, consumption from them will start after a delay, based on the metadata.max.age.ms Kafka property. Note, however, that events from them can be dropped as late if the allowed lag is not large enough.
The processor never completes; it can only fail in the case of an error. However, IO failures are generally handled by the Kafka client and do not cause the processor to fail. The Kafka consumer also does not return from poll(timeout) if the cluster is down. If snapshotting is enabled, the entire job might be blocked. This is a known issue of Kafka (KAFKA-1894, now fixed). Refer to Kafka documentation for details.
The default local parallelism for this processor is 4 (or fewer if fewer CPUs are available). Note that deserialization is done inside KafkaConsumer. If you have high traffic, deserialization might become a bottleneck; increase the local parallelism, or use byte[] for messages and deserialize manually in a subsequent mapping step, as sketched below.
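A sketch of the byte[] approach, assuming the record values are UTF-8 text; ByteArrayDeserializer ships with the Kafka client:

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<byte[], byte[], byte[]>kafka(props, r -> r.value(), "my-topic"))
 .withNativeTimestamps(0)
 .map(bytes -> new String(bytes, StandardCharsets.UTF_8))  // deserialize in a mapping step
 .writeTo(Sinks.logger());
```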
Parameters:
properties - consumer properties: broker address and key/value deserializers
projectionFn - function that creates output objects from the Kafka record. If the projection returns null for an item, that item is filtered out.
topicsConfig - configuration of the topics to consume; at least one topic must be specified in the configuration

@Nonnull
public static <K,V,T> StreamSource<T> kafka(@Nonnull Properties properties, @Nonnull FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>,T> projectionFn, @Nonnull String... topics)

Returns a source that consumes one or more Apache Kafka topics and emits the items created by applying projectionFn to each consumed ConsumerRecord.
The source creates a KafkaConsumer for each Processor instance using the supplied properties. It assigns a subset of the Kafka partitions to each of them using manual partition assignment (it ignores the group.id property). The Kafka message timestamp is used as the native timestamp.
If snapshotting is enabled, partition offsets are saved to the snapshot. After a restart, the source emits the events from the same offsets.
If you start a new job from an exported state, you can change the source parameters as needed:
- if you add a topic, it will be consumed from the default position
- if you remove a topic, restored offsets for that topic will be ignored (a warning is logged)
- if you connect to another cluster, the offsets will be used based on the particular partition and offset number of that cluster
- if the partition count is lower after a restart, the extra offsets will be ignored
The source can work in two modes:
1. If snapshotting is enabled, offsets are stored in the snapshot and, after a restart or failure, reading continues from the saved offsets. You can achieve exactly-once or at-least-once behavior.
2. If snapshotting is disabled, the source starts reading from default offsets (based on the auto.offset.reset property). You can enable offset committing by assigning a group.id, enabling auto offset committing using enable.auto.commit and configuring auto.commit.interval.ms in the given properties. Refer to Kafka documentation for the descriptions of these properties.
If you add Kafka partitions at run-time, consumption from them will start after a delay, based on the metadata.max.age.ms Kafka property. Note, however, that events from them can be dropped as late if the allowed lag is not large enough.
The processor never completes; it can only fail in the case of an error. However, IO failures are generally handled by the Kafka client and do not cause the processor to fail. The Kafka consumer also does not return from poll(timeout) if the cluster is down. If snapshotting is enabled, the entire job might be blocked. This is a known issue of Kafka (KAFKA-1894, now fixed). Refer to Kafka documentation for details.
The default local parallelism for this processor is 4 (or fewer if fewer CPUs are available). Note that deserialization is done inside KafkaConsumer. If you have high traffic, deserialization might become a bottleneck; increase the local parallelism (see the sketch below), or use byte[] for messages and deserialize manually in a subsequent mapping step.
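A sketch of raising the source's local parallelism above the default; setLocalParallelism is the standard Jet stage setting:

```java
// props: consumer properties with broker address and deserializers, as above
Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<String, String>kafka(props, "my-topic"))
 .withNativeTimestamps(0)
 .setLocalParallelism(8)   // run 8 source processors per member instead of the default 4
 .writeTo(Sinks.logger());
```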
Parameters:
properties - consumer properties: broker address and key/value deserializers
projectionFn - function that creates output objects from the Kafka record. If the projection returns null for an item, that item is filtered out.
topics - the topics to consume; at least one is required
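A sketch of the projection variant; it prefixes each value with its offset and returns null for empty values, which filters them out (broker and topic names are assumptions):

```java
// props: consumer properties with broker address and String deserializers, as above
Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<String, String, String>kafka(
        props,
        r -> r.value().isEmpty() ? null : r.offset() + ": " + r.value(),
        "my-topic"))
 .withNativeTimestamps(0)
 .writeTo(Sinks.logger());
```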
@Beta
@Nonnull
public static <K,V,T> StreamSource<T> kafka(@Nonnull DataConnectionRef dataConnectionRef, @Nonnull FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>,T> projectionFn, @Nonnull String... topics)

Returns a source that consumes one or more Apache Kafka topics and emits the items created by applying projectionFn to each consumed ConsumerRecord.
The source uses the supplied DataConnection to obtain a new KafkaConsumer instance for each Processor. It assigns a subset of the Kafka partitions to each of them using manual partition assignment (it ignores the group.id property). The Kafka message timestamp is used as the native timestamp.
If snapshotting is enabled, partition offsets are saved to the snapshot. After a restart, the source emits the events from the same offsets.
If you start a new job from an exported state, you can change the source parameters as needed:
- if you add a topic, it will be consumed from the default position
- if you remove a topic, restored offsets for that topic will be ignored (a warning is logged)
- if you connect to another cluster, the offsets will be used based on the particular partition and offset number of that cluster
- if the partition count is lower after a restart, the extra offsets will be ignored
The source can work in two modes:
1. If snapshotting is enabled, offsets are stored in the snapshot and, after a restart or failure, reading continues from the saved offsets. You can achieve exactly-once or at-least-once behavior.
2. If snapshotting is disabled, the source starts reading from default offsets (based on the auto.offset.reset property). You can enable offset committing by assigning a group.id, enabling auto offset committing using enable.auto.commit and configuring auto.commit.interval.ms in the given properties. Refer to Kafka documentation for the descriptions of these properties.
If you add Kafka partitions at run-time, consumption from them will start after a delay, based on the metadata.max.age.ms Kafka property. Note, however, that events from them can be dropped as late if the allowed lag is not large enough.
The processor never completes; it can only fail in the case of an error. However, IO failures are generally handled by the Kafka client and do not cause the processor to fail. The Kafka consumer also does not return from poll(timeout) if the cluster is down. If snapshotting is enabled, the entire job might be blocked. This is a known issue of Kafka (KAFKA-1894, now fixed). Refer to Kafka documentation for details.
The default local parallelism for this processor is 4 (or fewer if fewer CPUs are available). Note that deserialization is done inside KafkaConsumer. If you have high traffic, deserialization might become a bottleneck; increase the local parallelism, or use byte[] for messages and deserialize manually in a subsequent mapping step.
Parameters:
dataConnectionRef - reference to an existing KafkaDataConnection that will be used to create consumers
projectionFn - function that creates output objects from the Kafka record. If the projection returns null for an item, that item is filtered out.
topics - the topics to consume; at least one is required
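A sketch combining a data connection with a projection over two topics; the connection name "my-kafka" and the topic names are assumptions:

```java
import com.hazelcast.jet.pipeline.DataConnectionRef;

Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<String, String, String>kafka(
        DataConnectionRef.dataConnectionRef("my-kafka"),
        r -> r.topic() + "/" + r.partition() + ": " + r.value(),
        "orders", "payments"))    // consume two topics with one source
 .withNativeTimestamps(0)
 .writeTo(Sinks.logger());
```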
Copyright © 2024 Hazelcast, Inc. All rights reserved.