This manual is for an old version of Hazelcast Jet, use the latest stable version.

Apache Kafka is a production-worthy choice of both source and sink for infinite stream processing jobs. It supports fault tolerance and snapshotting. The basic paradigm is that of a distributed publish/subscribe message queue. Jet's Kafka Source subscribes to a Kafka topic and the sink publishes events to a Kafka topic.

The following code will consume from topics t1 and t2 and then write to t3:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("value.serializer", IntegerSerializer.class.getCanonicalName());
props.setProperty("value.deserializer", IntegerDeserializer.class.getCanonicalName());
props.setProperty("auto.offset.reset", "earliest");

p.drawFrom(KafkaSources.kafka(props, "t1", "t2"))
 .drainTo(KafkaSinks.kafka(props, "t3"));

Using Kafka as a Source

The Kafka source emits entries of type Map.Entry<Key,Value> which can be transformed using an optional mapping function. It never completes. The job will end only if explicitly cancelled or aborted due to an error.

Internally Jet creates one KafkaConsumer per Processor instance using the supplied properties. Jet uses manual partition assignment to arrange the available Kafka partitions among the available processors and will ignore the group.id property.

Currently there is a requirement that the global parallelism of the Kafka source be at most the number of partitions you are subscribing to. The local parallelism of the Kafka source is 2 and if your Jet cluster has 4 members, this means that a minimum of 8 Kafka partitions must be available.

If any new partitions are added while the job is running, Jet will automatically assign them to the existing processors and consume them from the beginning.

Processing Guarantees

The Kafka source supports snapshots. Upon each snapshot it saves the current offset for each partition. When the job is restarted from a snapshot, the source can continue reading from the saved offset.

If snapshots are disabled, the source will commit the offset of the last record it read to the Kafka cluster. Since the fact that the source read an item doesn't mean that the whole Jet pipeline processed it, this doesn't guarantee against data loss.

Using Kafka as a Sink

The Kafka sink creates one KafkaProducer per cluster member and shares it among all the sink processors on that member. You can provide a mapping function that transforms the items the sink receives into ProducerRecords.