Apache Kafka is a production-worthy choice of both source and sink for infinite stream processing jobs. It supports fault tolerance and snapshotting. The basic paradigm is that of a distributed publish/subscribe message queue. Jet's Kafka Source subscribes to a Kafka topic and the sink publishes events to a Kafka topic.
The following code will consume from topics t1 and t2 and then write to t3:
// Connection and (de)serialization settings, shared by the source and the sink
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("value.serializer", IntegerSerializer.class.getCanonicalName());
props.setProperty("value.deserializer", IntegerDeserializer.class.getCanonicalName());
// Start from the earliest available offset when no saved offset exists
props.setProperty("auto.offset.reset", "earliest");

// Read entries from t1 and t2 and publish them unchanged to t3
Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.kafka(props, "t1", "t2"))
 .drainTo(KafkaSinks.kafka(props, "t3"));
Using Kafka as a Source
The Kafka source emits entries of type Map.Entry<Key,Value>, which can be transformed using an optional mapping function. It never completes. The job will end only if explicitly cancelled or aborted due to an error.
Internally, Jet creates one KafkaConsumer per Processor instance using the supplied properties. Jet uses manual partition assignment to arrange the available Kafka partitions among the available processors and will ignore the group.id property.
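To illustrate what manual partition assignment means in practice, here is a minimal sketch of one possible scheme: a round-robin split of partition indices across processors. The class and method names are hypothetical, and the modulo rule is an assumption made for illustration; Jet's actual assignment strategy may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Jet: shows one way to split partitions
// among processors without relying on Kafka's consumer-group rebalancing.
final class PartitionAssignmentSketch {

    // Returns the partition indices that the processor with the given global
    // index would read, using a simple round-robin (modulo) rule.
    static List<Integer> partitionsFor(int processorIndex, int totalProcessors, int partitionCount) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = processorIndex; p < partitionCount; p += totalProcessors) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int totalProcessors = 4;
        int partitionCount = 10;
        for (int i = 0; i < totalProcessors; i++) {
            System.out.println("processor " + i + " -> "
                    + partitionsFor(i, totalProcessors, partitionCount));
        }
    }
}
```

With 4 processors and 10 partitions, this rule gives processor 0 the partitions 0, 4 and 8, and processor 3 the partitions 3 and 7. Because each processor derives its own share from its index, no consumer group coordination is needed, which is consistent with group.id being ignored.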
Currently, the global parallelism of the Kafka source must not exceed the number of partitions you subscribe to. Global parallelism is the local parallelism multiplied by the number of cluster members: the local parallelism of the Kafka source is 2, so a Jet cluster with 4 members has a global parallelism of 2 × 4 = 8 and requires at least 8 Kafka partitions.
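The arithmetic behind this requirement can be checked before submitting a job. The following is a small sketch, assuming you already know the local parallelism, the member count and the total number of partitions across the subscribed topics; the class and method names are hypothetical and not part of Jet.

```java
// Hypothetical pre-flight check, not part of Jet's API.
final class KafkaParallelismCheck {

    // Global parallelism = processors per member * number of members.
    static int globalParallelism(int localParallelism, int memberCount) {
        return localParallelism * memberCount;
    }

    // True when every source processor can be given at least one partition.
    static boolean hasEnoughPartitions(int localParallelism, int memberCount, int partitionCount) {
        return globalParallelism(localParallelism, memberCount) <= partitionCount;
    }

    public static void main(String[] args) {
        int localParallelism = 2;
        int memberCount = 4;
        System.out.println("partitions required: "
                + globalParallelism(localParallelism, memberCount));
        System.out.println("8 partitions enough? "
                + hasEnoughPartitions(localParallelism, memberCount, 8));
        System.out.println("6 partitions enough? "
                + hasEnoughPartitions(localParallelism, memberCount, 6));
    }
}
```

For the configuration described above (local parallelism 2, 4 members), the check passes with 8 partitions and fails with 6.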
If any new partitions are added while the job is running, Jet will automatically assign them to the existing processors and consume them from the beginning.
Processing Guarantees
The Kafka source supports snapshots. Upon each snapshot it saves the current offset for each partition. When the job is restarted from a snapshot, the source can continue reading from the saved offset.
If snapshots are disabled, the source will commit the offset of the last record it read to the Kafka cluster. Because the source having read an item does not mean that the whole Jet pipeline has processed it, this does not protect against data loss.
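The snapshot-and-restore behaviour described in this section can be modelled with a per-partition offset map. This is a simplified, in-memory sketch and not Jet's implementation: the class name is hypothetical, and it assumes the saved value is the offset of the next record to read.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a source that tracks one offset per partition.
final class OffsetSnapshotSketch {

    // partition index -> offset of the next record to read (assumption)
    private final Map<Integer, Long> nextOffsets = new HashMap<>();

    // Called after a record at the given offset has been read.
    void recordRead(int partition, long offset) {
        nextOffsets.put(partition, offset + 1);
    }

    // Takes a snapshot: an independent copy of the current offsets.
    Map<Integer, Long> snapshot() {
        return new HashMap<>(nextOffsets);
    }

    // Restores the source state from a previously taken snapshot.
    void restore(Map<Integer, Long> saved) {
        nextOffsets.clear();
        nextOffsets.putAll(saved);
    }

    // Offset to resume from; 0 if the partition was never read.
    long resumeOffset(int partition) {
        return nextOffsets.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        OffsetSnapshotSketch source = new OffsetSnapshotSketch();
        for (long offset = 0; offset < 5; offset++) {
            source.recordRead(0, offset);
        }
        Map<Integer, Long> saved = source.snapshot();

        // Records read after the snapshot are not covered by it.
        for (long offset = 5; offset < 10; offset++) {
            source.recordRead(0, offset);
        }

        // Simulate a restart from the snapshot.
        source.restore(saved);
        System.out.println("resume partition 0 at offset " + source.resumeOffset(0));
    }
}
```

In this model, records 5 to 9 were read after the snapshot was taken, so a restart from the snapshot resumes at offset 5 and reads them again rather than skipping them.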
Using Kafka as a Sink
The Kafka sink creates one KafkaProducer per cluster member and shares it among all the sink processors on that member. You can provide a mapping function that transforms the items the sink receives into ProducerRecords.