T
- event typepublic class WatermarkSourceUtil<T> extends Object
Watermark
s from a source which reads
events from multiple external partitions.
Traverser
that holds the output data. Your source can follow this
pattern:
public boolean complete() {
if (traverser == null) {
List<Record> records = poll();
if (records.isEmpty()) {
traverser = wmSourceUtil.handleNoEvent();
} else {
traverser = traverserIterable(records)
.flatMap(event -> wmSourceUtil.handleEvent(
event, event.getPartition()));
}
traverser = traverser.onFirstNull(() -> traverser = null);
}
emitFromTraverser(traverser, event -> {
if (!(event instanceof Watermark)) {
// store your offset after event was emitted
offsetsMap.put(event.getPartition(), event.getOffset());
}
});
return false;
}
Other methods:
increasePartitionCount(int)
to set your partition count
initially or whenever the count increases.
getWatermark(int)
for all partitions to the snapshot. When restoring the
state, call restoreWatermark(int, long)
.
broadcastKey()
, because the external partitions don't match Hazelcast
partitions. This way, all processor instances will see all keys and they
can restore partition they handle and ignore others.
Constructor and Description |
---|
WatermarkSourceUtil(WatermarkGenerationParams<? super T> params)
A constructor.
|
Modifier and Type | Method and Description |
---|---|
long |
getWatermark(int partitionIndex)
Watermark value to be saved to state snapshot for the given source
partition index.
|
Traverser<Object> |
handleEvent(T event,
int partitionIndex)
Flat-maps the given
event by (possibly) prepending it with a
watermark. |
Traverser<Object> |
handleNoEvent()
Call this method when there is no event coming.
|
void |
increasePartitionCount(int newPartitionCount)
Changes the partition count.
|
void |
restoreWatermark(int partitionIndex,
long wm)
Restore watermark value from state snapshot.
|
public WatermarkSourceUtil(WatermarkGenerationParams<? super T> params)
The partition count is initially set to 0, call increasePartitionCount(int)
to set it.
@Nonnull public Traverser<Object> handleEvent(T event, int partitionIndex)
event
by (possibly) prepending it with a
watermark. Designed to use when emitting from traverser:
Traverser t = traverserIterable(...)
.flatMap(event -> watermarkSourceUtil.flatMap(event, event.getPartition()));
@Nonnull public Traverser<Object> handleNoEvent()
next()
on the result.public void increasePartitionCount(int newPartitionCount)
You can call this method at any moment. Added partitions will be considered active initially.
newPartitionCount
- partition count, must be higher than the
current countpublic long getWatermark(int partitionIndex)
restored
to a processor handling the same
partition after restart.
Method is meant to be used from Processor.saveToSnapshot()
.
partitionIndex
- 0-based source partition index.public void restoreWatermark(int partitionIndex, long wm)
Method is meant to be used from Processor.restoreFromSnapshot(Inbox)
.
See getWatermark(int)
.
partitionIndex
- 0-based source partition index.wm
- Watermark value to restoreCopyright © 2018 Hazelcast, Inc.. All rights reserved.