public final class KafkaSources extends Object
Returns a source that consumes one or more Apache Kafka topics and emits items from them as Map.Entry instances.
@Nonnull public static <K,V> StreamSource<Map.Entry<K,V>> kafka(@Nonnull Properties properties, @Nonnull String... topics)
Convenience for kafka(Properties, FunctionEx, String...), wrapping the output in Map.Entry.
@Nonnull public static <K,V,T> StreamSource<T> kafka(@Nonnull Properties properties, @Nonnull com.hazelcast.function.FunctionEx<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>,T> projectionFn, @Nonnull String... topics)
The source creates a KafkaConsumer for each Processor instance using the supplied properties. It assigns a subset of Kafka partitions to each of them using manual partition assignment (it ignores the group.id property). Default local parallelism for this processor is 2 (or less if fewer CPUs are available). The Kafka message timestamp will be used as the native timestamp.
If snapshotting is enabled, partition offsets are saved to the snapshot. After a restart, the source emits the events from the same offsets.
If you start a new job from an exported state, you can change the source parameters as needed.
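The source can work in two modes:
1. If snapshotting is enabled, offsets are stored in the snapshot and, after a restart, reading continues from the saved offsets.
2. If snapshotting is disabled, the source starts reading from the default offsets (based on the auto.offset.reset property). You can enable offset committing by assigning a group.id, enabling auto offset committing using enable.auto.commit and configuring auto.commit.interval.ms in the given properties. Refer to Kafka documentation for the descriptions of these properties.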
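The offset-committing setup described above can be expressed as a plain java.util.Properties object. This is a minimal sketch and not part of the Hazelcast documentation: the class and method names (CommittingConsumerProps, build), the String deserializers and the "earliest" reset policy are illustrative assumptions. The property keys themselves are standard Kafka consumer settings.

```java
import java.util.Properties;

/** Hypothetical helper: builds consumer properties with auto offset committing. */
final class CommittingConsumerProps {

    private CommittingConsumerProps() { }

    static Properties build(String brokers, String groupId, long commitIntervalMs) {
        Properties props = new Properties();
        // Broker address and key/value deserializers (String payloads assumed here).
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Default starting position when no saved or committed offset exists.
        props.setProperty("auto.offset.reset", "earliest");
        // Offset committing: a group.id plus Kafka's auto-commit settings.
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }
}
```

The result would be passed as the properties argument, for example kafka(CommittingConsumerProps.build("localhost:9092", "my-group", 5000), "my-topic"), where the broker address, group name and topic are placeholders.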
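If Kafka partitions are added at run time, consumption from them starts after a delay that depends on the metadata.max.age.ms Kafka property. Note, however, that events from them can be dropped as late if the allowed lag is not large enough.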
The processor never completes; it can only fail in the case of an error. However, I/O failures are generally handled by the Kafka consumer and do not cause the processor to fail. The consumer also does not return from poll(timeout) if the cluster is down. If snapshotting is enabled, the entire job might be blocked. This is a known issue of Kafka (KAFKA-1894, now fixed). Refer to Kafka documentation for details.
The default local parallelism for this processor is 4 (or less if fewer CPUs are available). Note that deserialization is done inside KafkaConsumer. If you have high traffic, the deserialization might become a bottleneck: increase the local parallelism, or use byte[] for messages and deserialize manually in a subsequent mapping step.
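The byte[] approach could look like the following minimal sketch. It assumes UTF-8 text payloads; RawBytes, consumerProps and decodeUtf8 are hypothetical names, while ByteArrayDeserializer is Kafka's stock deserializer that passes the raw bytes through unchanged.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/** Hypothetical helper for consuming raw bytes and decoding them later. */
final class RawBytes {

    private RawBytes() { }

    /** Consumer properties that make KafkaConsumer return raw byte[] keys and values. */
    static Properties consumerProps(String brokers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    /** Manual deserialization, intended to be called from a later mapping step. */
    static String decodeUtf8(byte[] payload) {
        // Assumption: payloads are UTF-8 text; null values stay null.
        return payload == null ? null : new String(payload, StandardCharsets.UTF_8);
    }
}
```

With this setup the decoding cost falls on the mapping stage rather than on KafkaConsumer, so it can be scaled separately from the source.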
properties - consumer properties, including the broker address and key/value deserializers
projectionFn - function to create output objects from the Kafka record. If the projection returns null for an item, that item will be filtered out.
topics - the topics to consume; at least one is required
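The null-filtering rule for projectionFn can be illustrated without a running cluster. The sketch below is a stand-in model, not Jet's implementation: java.util.function.Function takes the place of FunctionEx, a Map.Entry takes the place of ConsumerRecord, and ProjectionModel with its methods is a hypothetical name.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical model of how a null-returning projection filters items. */
final class ProjectionModel {

    private ProjectionModel() { }

    /** Applies projectionFn to each record and drops the items it maps to null. */
    static <R, T> List<T> project(List<R> records, Function<R, T> projectionFn) {
        List<T> out = new ArrayList<>();
        for (R record : records) {
            T item = projectionFn.apply(record);
            if (item != null) { // a null projection filters the record out
                out.add(item);
            }
        }
        return out;
    }

    /** Example projection: keep only non-empty values, upper-cased. */
    static String nonEmptyUpper(Map.Entry<String, String> record) {
        String value = record.getValue();
        return value == null || value.isEmpty() ? null : value.toUpperCase();
    }

    /** Runs the example projection over three stand-in records. */
    static List<String> demo() {
        List<Map.Entry<String, String>> records = Arrays.asList(
                new SimpleEntry<String, String>("k1", "a"),
                new SimpleEntry<String, String>("k2", ""),
                new SimpleEntry<String, String>("k3", "b"));
        return project(records, ProjectionModel::nonEmptyUpper);
    }
}
```

Here demo() keeps the records for k1 and k3 and drops k2, whose projection is null. In a real pipeline the equivalent function would be passed as the projectionFn argument and would receive a ConsumerRecord.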
Copyright © 2020 Hazelcast, Inc. All rights reserved.