public class WatermarkSourceUtil<T>
extends Object

Type parameters:
T - event type
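A utility that helps a source emit watermarks when it reads events from multiple external partitions.

The API is designed to be used as a flat-mapping step of the Traverser that holds the source's output. Your source might follow this pattern: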
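```java
// Record, poll() and offsetsMap are your own source-specific type and members.
// The field is declared as: private Traverser<Object> traverser;
// traverseIterable() is statically imported from com.hazelcast.jet.Traversers.
@Override
public boolean complete() {
    if (traverser == null) {
        List<Record> records = poll(); // get a batch of events from the external source
        if (records.isEmpty()) {
            traverser = watermarkSourceUtil.handleNoEvent();
        } else {
            traverser = traverseIterable(records)
                    .flatMap(event -> watermarkSourceUtil.handleEvent(event, event.getPartition()));
        }
        // once the batch is fully emitted, reset the field so the next call polls again
        traverser = traverser.onFirstNull(() -> traverser = null);
    }
    emitFromTraverser(traverser, event -> {
        if (!(event instanceof Watermark)) {
            // store your offset only after the event was emitted
            Record rec = (Record) event;
            offsetsMap.put(rec.getPartition(), rec.getOffset());
        }
    });
    // a streaming source never completes
    return false;
}
```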
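The comment about storing the offset only after emission matters because an emit can fail when the outbox is full; the item then stays in the traverser and is retried on the next `complete()` call, so recording its offset earlier would skip it after a restart. The self-contained sketch below models that behavior without any Jet dependency. `SourceRecord`, `SourceWatermark`, `BoundedOutbox` and `OffsetTrackingEmitter` are hypothetical stand-ins, not Jet classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a record read from an external partition.
final class SourceRecord {
    final int partition;
    final long offset;

    SourceRecord(int partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical stand-in for a watermark item (Jet's Watermark in the real API).
final class SourceWatermark {
    final long timestamp;

    SourceWatermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

// Hypothetical outbox with a fixed number of free slots per complete() call.
final class BoundedOutbox {
    final List<Object> emitted = new ArrayList<>();
    private final int capacity;
    private int free;

    BoundedOutbox(int capacity) {
        this.capacity = capacity;
        this.free = capacity;
    }

    // Accepts the item only if a slot is free; a full outbox refuses it.
    boolean offer(Object item) {
        if (free == 0) {
            return false;
        }
        free--;
        emitted.add(item);
        return true;
    }

    // Simulates the outbox being drained between two complete() calls.
    void reset() {
        free = capacity;
    }
}

final class OffsetTrackingEmitter {
    final Map<Integer, Long> offsetsMap = new HashMap<>();

    // Emits pending items in order until the outbox refuses one. An offset is
    // recorded only after its record was accepted, never for watermarks.
    // Returns true once all pending items were emitted.
    boolean drain(Deque<Object> pending, BoundedOutbox outbox) {
        while (!pending.isEmpty()) {
            Object item = pending.peekFirst();
            if (!outbox.offer(item)) {
                return false; // the item stays queued and is retried next time
            }
            pending.removeFirst();
            if (!(item instanceof SourceWatermark)) {
                SourceRecord rec = (SourceRecord) item;
                offsetsMap.put(rec.partition, rec.offset);
            }
        }
        return true;
    }
}
```

With a two-slot outbox and the queue `[watermark, record(p0, 5), record(p1, 7)]`, the first call emits the watermark and the first record and returns `false`; only partition 0's offset is stored until a later call emits the remaining record.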
Other methods:

- Call increasePartitionCount(int) to set your partition count initially or whenever the count increases.
- In Processor.saveToSnapshot(), save getWatermark(int) for all partitions to the snapshot. When restoring the state, call restoreWatermark(int, long).

Store the watermarks in the snapshot under keys created with broadcastKey(), because the external partitions don't match Hazelcast partitions. This way, all processor instances will see all keys, and each can restore the partitions it handles and ignore the others.
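To illustrate why a broadcast key is needed, the self-contained sketch below models the snapshot as a plain `Map`. `PartitionKey`, `SourceInstance` and the `partition % totalProcessors` assignment rule are hypothetical; in Jet you would create the key with broadcastKey() and use your processor's own partition assignment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a key created with broadcastKey(): during restore,
// every processor instance receives every entry stored under such a key.
final class PartitionKey {
    final int partitionIndex;

    PartitionKey(int partitionIndex) {
        this.partitionIndex = partitionIndex;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof PartitionKey && ((PartitionKey) o).partitionIndex == partitionIndex;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(partitionIndex);
    }
}

// Hypothetical source processor instance that owns a subset of external partitions.
final class SourceInstance {
    final int processorIndex;
    final int totalProcessors;
    // partition index -> watermark, playing the role of getWatermark()/restoreWatermark()
    final Map<Integer, Long> watermarks = new HashMap<>();

    SourceInstance(int processorIndex, int totalProcessors) {
        this.processorIndex = processorIndex;
        this.totalProcessors = totalProcessors;
    }

    // Assumed assignment rule: partition p belongs to processor p % totalProcessors.
    boolean handles(int partitionIndex) {
        return partitionIndex % totalProcessors == processorIndex;
    }

    // Plays the role of saveToSnapshot(): one entry per partition this instance handles.
    void saveTo(Map<Object, Object> snapshot) {
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            snapshot.put(new PartitionKey(e.getKey()), e.getValue());
        }
    }

    // Plays the role of restoreFromSnapshot(): called for every broadcast entry,
    // it keeps only the partitions this instance handles after the restart.
    void restoreFrom(Object key, Object value) {
        int partition = ((PartitionKey) key).partitionIndex;
        if (handles(partition)) {
            watermarks.put(partition, (Long) value);
        }
    }
}
```

Because every instance sees every entry and filters by its own assignment, a job restarted with a different number of processors still restores each partition's watermark exactly once.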
| Constructor | Description |
|---|---|
| WatermarkSourceUtil(WatermarkGenerationParams<? super T> params) | A constructor. |
| Modifier and Type | Method | Description |
|---|---|---|
| long | getWatermark(int partitionIndex) | Watermark value to be saved to the state snapshot for the given source partition index. |
| Traverser<Object> | handleEvent(T event, int partitionIndex) | Flat-maps the given event by (possibly) prepending it with a watermark. |
| Traverser<Object> | handleNoEvent() | Call this method when there is no event coming. |
| void | increasePartitionCount(int newPartitionCount) | Changes the partition count. |
| void | restoreWatermark(int partitionIndex, long wm) | Restores the watermark value from the state snapshot. |
public WatermarkSourceUtil(WatermarkGenerationParams<? super T> params)
A constructor. The partition count is initially set to 0; call increasePartitionCount(int) to set it.
@Nonnull public Traverser<Object> handleEvent(T event, int partitionIndex)
Flat-maps the given event by (possibly) prepending it with a watermark. Designed to be used when emitting from a traverser:
```java
Traverser<Object> t = traverseIterable(...)
        .flatMap(event -> watermarkSourceUtil.handleEvent(event, event.getPartition()));
```
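The description above only says that a watermark may be prepended. A common way to coalesce watermarks from several partitions, and the one assumed in this sketch, is to track the highest watermark seen on each partition and emit the minimum across all partitions whenever it advances. `MinWatermarkCoalescer`, `WatermarkItem` and the "timestamp minus a fixed lag" policy are hypothetical stand-ins for `WatermarkSourceUtil`, `Watermark` and `WatermarkGenerationParams`; the sketch omits idle-partition handling and emission throttling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for Jet's Watermark item.
final class WatermarkItem {
    final long timestamp;

    WatermarkItem(long timestamp) {
        this.timestamp = timestamp;
    }
}

// Hypothetical, simplified model of per-partition watermark coalescing:
// no idle-partition handling and no emission throttling.
final class MinWatermarkCoalescer<T> {
    private final ToLongFunction<? super T> timestampFn;
    private final long lag;
    private long[] partitionWms = new long[0];
    private long lastEmittedWm = Long.MIN_VALUE;

    MinWatermarkCoalescer(ToLongFunction<? super T> timestampFn, long lag) {
        this.timestampFn = timestampFn;
        this.lag = lag;
    }

    // New partitions start with no watermark, so they hold back the minimum
    // until they see their first event.
    void increasePartitionCount(int newPartitionCount) {
        if (newPartitionCount <= partitionWms.length) {
            throw new IllegalArgumentException("partition count must increase");
        }
        int oldCount = partitionWms.length;
        partitionWms = Arrays.copyOf(partitionWms, newPartitionCount);
        Arrays.fill(partitionWms, oldCount, newPartitionCount, Long.MIN_VALUE);
    }

    // Returns the event, preceded by a watermark if the minimum over all
    // partitions advanced past the last emitted watermark.
    List<Object> handleEvent(T event, int partitionIndex) {
        long candidate = timestampFn.applyAsLong(event) - lag;
        partitionWms[partitionIndex] = Math.max(partitionWms[partitionIndex], candidate);
        long min = Long.MAX_VALUE;
        for (long wm : partitionWms) {
            min = Math.min(min, wm);
        }
        List<Object> out = new ArrayList<>(2);
        if (min > lastEmittedWm) {
            lastEmittedWm = min;
            out.add(new WatermarkItem(min));
        }
        out.add(event);
        return out;
    }
}
```

With two partitions and a lag of 5, an event with timestamp 100 on partition 0 is emitted alone (partition 1 has no watermark yet); a following event with timestamp 90 on partition 1 is preceded by a watermark of 85, the minimum of 95 and 85.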
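@Nonnull public Traverser<Object> handleNoEvent()
Call this method when there is no event coming. It returns a traverser with 0 or 1 object (the watermark). If you need just the watermark, call next() on the result.

public void increasePartitionCount(int newPartitionCount)
Changes the partition count.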
You can call this method at any moment. Added partitions will be considered active initially.
Parameters:
newPartitionCount - partition count, must be higher than the current count

public long getWatermark(int partitionIndex)
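Watermark value to be saved to the state snapshot for the given source partition index. The returned value should be restored to a processor handling the same partition after restart.
Method is meant to be used from Processor.saveToSnapshot().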
Parameters:
partitionIndex - 0-based source partition index.

public void restoreWatermark(int partitionIndex, long wm)
Restores the watermark value from the state snapshot.
Method is meant to be used from Processor.restoreFromSnapshot(Inbox).
See getWatermark(int).
Parameters:
partitionIndex - 0-based source partition index.
wm - watermark value to restore

Copyright © 2018 Hazelcast, Inc. All rights reserved.