Package com.hazelcast.jet.core
Class EventTimeMapper<T>
java.lang.Object
com.hazelcast.jet.core.EventTimeMapper<T>
Type Parameters:
T - the event type
A utility that helps a source emit events according to a given EventTimePolicy. Generally this class should be used if a source needs to emit watermarks. The mapper deals with the following concerns:
1. Reading partition by partition
Upon restart it can happen that partition P1 has one very recent event and P2 has an old one. If we poll P1 first and emit its recent event, it will advance the watermark. When we poll P2 later on, its event will be behind the watermark and can be dropped as late. This utility tracks the event timestamps for each source partition individually and allows the processor to emit the watermark that is correct with respect to all the partitions.
2. Some partition having no data
It can happen that some partition does not have any events at all while others do, or the processor doesn't get any external partitions assigned to it. If we simply wait for the timestamps in all partitions to advance to some point, we won't be emitting any watermarks. This utility supports the idle timeout: if there's no new data from a partition after the timeout elapses, it will be marked as idle, allowing the processor's watermark to advance as if that partition didn't exist. If all partitions are idle or there are no partitions, the processor will emit a special idle message and the downstream will exclude this processor from watermark coalescing.
3. Wrapping of events
Events may need to be wrapped with the extracted timestamp if EventTimePolicy.wrapFn() is set.
4. Throttling of Watermarks
Watermarks are only consumed by windowing operations, and emitting watermarks more frequently than the given EventTimePolicy.watermarkThrottlingFrameSize() is wasteful since they are broadcast to all processors. The mapper ensures that watermarks are emitted according to the throttling frame size.
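The first, second and fourth concerns can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions, not the Jet implementation: the class name WatermarkCoalescingSketch, the NO_WATERMARK sentinel and the abstract time units are hypothetical, the watermark is taken to equal the highest event timestamp seen in a partition (no lag), and when every partition is idle the model simply reports that no watermark is due instead of emitting an idle message.

```java
import java.util.Arrays;

/** Simplified model of per-partition watermark tracking; not the Jet implementation. */
final class WatermarkCoalescingSketch {
    /** Hypothetical sentinel meaning "no watermark is due". */
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long[] partitionWm;  // highest timestamp seen per partition
    private final long[] lastEventAt;  // time of the last event per partition
    private final long idleTimeout;    // assumed idle timeout, same unit as 'now'
    private final long frameSize;      // assumed throttling frame size
    private long lastEmitted = NO_WATERMARK;

    public WatermarkCoalescingSketch(int partitions, long idleTimeout, long frameSize) {
        this.partitionWm = new long[partitions];
        this.lastEventAt = new long[partitions];
        Arrays.fill(partitionWm, NO_WATERMARK);
        this.idleTimeout = idleTimeout;
        this.frameSize = frameSize;
    }

    /** Records an event and returns the watermark to emit, or NO_WATERMARK if none is due. */
    public long onEvent(long now, int partition, long timestamp) {
        partitionWm[partition] = Math.max(partitionWm[partition], timestamp);
        lastEventAt[partition] = now;
        return poll(now);
    }

    /** Call when there is no event; idle partitions stop holding the watermark back. */
    public long poll(long now) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < partitionWm.length; i++) {
            boolean idle = now - lastEventAt[i] >= idleTimeout;
            if (!idle) {
                // The slowest active partition decides the overall watermark.
                min = Math.min(min, partitionWm[i]);
            }
        }
        if (min == Long.MAX_VALUE || min == NO_WATERMARK) {
            // All partitions are idle, or an active partition has not seen an event yet.
            return NO_WATERMARK;
        }
        // Round down to a frame boundary so that at most one watermark is emitted per frame.
        long throttled = Math.floorDiv(min, frameSize) * frameSize;
        if (throttled <= lastEmitted) {
            return NO_WATERMARK;
        }
        lastEmitted = throttled;
        return throttled;
    }
}
```

With two partitions, an idle timeout of 100 and a frame size of 10, an event with timestamp 1005 on partition 0 yields no watermark because partition 1 has not reported yet. A following event with timestamp 47 on partition 1 yields watermark 40. Once partition 1 has been silent for the idle timeout, partition 0 alone determines the watermark.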
Usage
The API is designed to be used as a flat-mapping step in the Traverser that holds the output data. Your source can follow this pattern:
    public boolean complete() {
        if (traverser == null) {
            List<Record> records = poll();
            if (records.isEmpty()) {
                traverser = eventTimeMapper.flatMapIdle();
            } else {
                traverser = traverseIterable(records)
                    .flatMap(event -> eventTimeMapper.flatMapEvent(
                        event, event.getPartition()));
            }
            traverser = traverser.onFirstNull(() -> traverser = null);
        }
        emitFromTraverser(traverser, event -> {
            if (!(event instanceof Watermark)) {
                // store your offset after event was emitted
                offsetsMap.put(event.getPartition(), event.getOffset());
            }
        });
        return false;
    }
Other methods:
- Call addPartitions(int) and removePartition(int) to set your partition count initially and whenever the count changes.
- If you support state snapshots, save the value returned by getWatermark(int) for all partitions to the snapshot. When restoring the state, call restoreWatermark(int, long).
  You should save the value under your external partition key so that the watermark value can be restored to the correct processor instance. The key should also be wrapped using broadcastKey(), because the external partitions don't match Hazelcast partitions. This way, all processor instances will see all keys and they can restore the partitions they handle and ignore others.
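The snapshot advice above can be modelled without any Jet classes. In the sketch below a plain Map stands in for the snapshot, and the class SnapshotProcessorModel, its methods and the modulo-based ownership rule are all hypothetical. A real source would read the values with getWatermark(int), write them under keys wrapped with broadcastKey(), and hand the owned entries to restoreWatermark(int, long).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for one processor instance; no Jet classes are involved. */
final class SnapshotProcessorModel {
    private final int processorIndex;
    private final int totalProcessors;
    // External partition ids; the list position plays the role of the mapper's partition index.
    private final List<Integer> localPartitions = new ArrayList<>();
    private final List<Long> watermarks = new ArrayList<>();

    public SnapshotProcessorModel(int processorIndex, int totalProcessors) {
        this.processorIndex = processorIndex;
        this.totalProcessors = totalProcessors;
    }

    /** Hypothetical assignment rule: external partition id modulo processor count. */
    public boolean owns(int externalPartition) {
        return externalPartition % totalProcessors == processorIndex;
    }

    /** Registers a local partition together with its current watermark. */
    public void assign(int externalPartition, long watermark) {
        localPartitions.add(externalPartition);
        watermarks.add(watermark);
    }

    /** Save side: one entry per local partition, keyed by the external partition id. */
    public void saveTo(Map<Integer, Long> snapshot) {
        for (int i = 0; i < localPartitions.size(); i++) {
            snapshot.put(localPartitions.get(i), watermarks.get(i));
        }
    }

    /** Restore side: every instance sees every entry and keeps only the ones it owns. */
    public void restoreFrom(Map<Integer, Long> snapshot) {
        for (Map.Entry<Integer, Long> entry : snapshot.entrySet()) {
            if (owns(entry.getKey())) {
                assign(entry.getKey(), entry.getValue());
            }
        }
    }

    /** Returns the watermark of a local partition, or null if this instance does not handle it. */
    public Long watermarkOf(int externalPartition) {
        int index = localPartitions.indexOf(externalPartition);
        return index < 0 ? null : watermarks.get(index);
    }
}
```

Because the keys are external partition ids rather than processor-local indexes, the model still restores each watermark to the right instance when the number of processors changes between save and restore.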
Since:
Jet 3.0
Field Summary
Modifier and Type | Field | Description
static final long | NO_NATIVE_TIME | Value to use as the nativeEventTime argument when calling flatMapEvent(Object, int, long) when there's no native event time to supply.
Constructor Summary
Constructor | Description
EventTimeMapper(EventTimePolicy<? super T> eventTimePolicy) | The partition count is initially set to 0, call addPartitions(int) to add partitions.
Method Summary
Modifier and Type | Method | Description
void | addPartitions(int addedCount) | Adds addedCount partitions.
Traverser<Object> | flatMapEvent(long now, T event, int partitionIndex, long nativeEventTime) | A lower-level variant of flatMapEvent(T, int, long) that accepts an explicit result of a System.nanoTime() call.
Traverser<Object> | flatMapEvent(T event, int partitionIndex, long nativeEventTime) | Flat-maps the given event by (possibly) prepending it with a watermark.
Traverser<Object> | flatMapIdle() | Call this method when there is no event to emit.
long | getWatermark(int partitionIndex) | Watermark value to be saved to state snapshot for the given source partition index.
int | partitionCount() | Returns the current partition count.
Traverser<Object> | removePartition(int partitionIndex) | Removes a partition that will no longer have events.
void | restoreWatermark(int partitionIndex, long wm) | Restore watermark value from state snapshot.
Field Details
NO_NATIVE_TIME
public static final long NO_NATIVE_TIME
Value to use as the nativeEventTime argument when calling flatMapEvent(Object, int, long) when there's no native event time to supply.
Constructor Details
EventTimeMapper
public EventTimeMapper(EventTimePolicy<? super T> eventTimePolicy)
The partition count is initially set to 0, call addPartitions(int) to add partitions.
Parameters:
eventTimePolicy - event time policy as passed in Sources.streamFromProcessorWithWatermarks(java.lang.String, boolean, com.hazelcast.function.FunctionEx<com.hazelcast.jet.core.EventTimePolicy<? super T>, com.hazelcast.jet.core.ProcessorMetaSupplier>)
Method Details
flatMapEvent
@Nonnull public Traverser<Object> flatMapEvent(@Nullable T event, int partitionIndex, long nativeEventTime)
Flat-maps the given event by (possibly) prepending it with a watermark. Designed to be used when emitting from a traverser:
    Traverser t = traverseIterable(...)
        .flatMap(event -> eventTimeMapper.flatMapEvent(
            event, event.getPartition(), nativeEventTime));
Parameters:
event - the event to flat-map. If null, it's equivalent to the behavior of flatMapIdle()
partitionIndex - the source partition index the event came from
nativeEventTime - native event time in case no timestampFn was supplied or NO_NATIVE_TIME if the event has no native timestamp
Returns:
a traverser over the given event and the watermark (if it was due)
flatMapIdle
public Traverser<Object> flatMapIdle()
Call this method when there is no event to emit. It returns a traverser over the watermark, if it was due.
flatMapEvent
public Traverser<Object> flatMapEvent(long now, @Nullable T event, int partitionIndex, long nativeEventTime)
A lower-level variant of flatMapEvent(T, int, long) that accepts an explicit result of a System.nanoTime() call. Use this variant if you're calling it in a hot loop, in order to avoid repeating the expensive System.nanoTime() call.
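The hot-loop advice can be sketched as reading the clock once per batch and passing the same value to every call. The code below is a self-contained illustration only: the TimedMapper interface is a hypothetical stand-in for the four-argument flatMapEvent (returning a List instead of a Traverser), and mapBatch and the injected clock are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

final class HotLoopClockSketch {
    /** Hypothetical stand-in for the four-argument flatMapEvent; returns the items to emit. */
    interface TimedMapper<T> {
        List<Object> flatMapEvent(long now, T event, int partitionIndex, long nativeEventTime);
    }

    /** Maps a whole batch from one partition while reading the clock only once. */
    static <T> List<Object> mapBatch(List<T> batch, int partitionIndex, long nativeEventTime,
                                     LongSupplier clock, TimedMapper<T> mapper) {
        long now = clock.getAsLong(); // single clock read shared by the entire batch
        List<Object> out = new ArrayList<>();
        for (T event : batch) {
            out.addAll(mapper.flatMapEvent(now, event, partitionIndex, nativeEventTime));
        }
        return out;
    }
}
```

In a real source the clock argument would be System::nanoTime. The trade-off is that every event in the batch is processed with the same, slightly stale, value of now.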
addPartitions
public void addPartitions(int addedCount)
Adds addedCount partitions. Added partitions will be initially considered active and having watermark value equal to the last emitted watermark. Their indices will follow current highest index.
You can call this method whenever new partitions are detected.
Parameters:
addedCount - number of added partitions, must be >= 0
removePartition
public Traverser<Object> removePartition(int partitionIndex)
Removes a partition that will no longer have events. If we were waiting for a watermark from it, the returned traverser might contain a watermark to emit.
Note that the indexes of partitions with index larger than the given partitionIndex will be decremented by 1.
Parameters:
partitionIndex - the index of the removed partition
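Because removing a partition shifts the indexes of all later partitions, a source that identifies partitions by an external id needs to keep its own id-to-index mapping in step with the mapper. One possible way to do this is an ordered list whose positions mirror the mapper's indexes. The class PartitionIndexSketch and the string ids are hypothetical, and the sketch does not call the mapper itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Keeps external partition ids aligned with 0-based partition indexes. */
final class PartitionIndexSketch {
    // The list position of an id equals the partition index used with the mapper.
    private final List<String> externalIds = new ArrayList<>();

    /** Mirrors adding one partition: it takes the index after the current highest one. */
    public int add(String externalId) {
        externalIds.add(externalId);
        return externalIds.size() - 1;
    }

    /** Mirrors removing a partition: every partition with a larger index moves down by one. */
    public int remove(String externalId) {
        int index = externalIds.indexOf(externalId);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown partition: " + externalId);
        }
        externalIds.remove(index); // List.remove(int) shifts the later elements left
        return index;
    }

    /** Returns the current index of the given id, or -1 if it is not tracked. */
    public int indexOf(String externalId) {
        return externalIds.indexOf(externalId);
    }

    /** Returns the number of tracked partitions. */
    public int count() {
        return externalIds.size();
    }
}
```

A source using this bookkeeping would call addPartitions(1) whenever add is called, pass the index returned by remove to removePartition(int), and emit whatever the returned traverser contains.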
partitionCount
public int partitionCount()
Returns the current partition count.
getWatermark
public long getWatermark(int partitionIndex)
Watermark value to be saved to state snapshot for the given source partition index. The returned value should be restored to a processor handling the same partition after restart.
Method is meant to be used from Processor.saveToSnapshot().
Parameters:
partitionIndex - 0-based source partition index.
Returns:
A value to save to state snapshot
restoreWatermark
public void restoreWatermark(int partitionIndex, long wm)
Restore watermark value from state snapshot.
Method is meant to be used from Processor.restoreFromSnapshot(Inbox).
See getWatermark(int).
Parameters:
partitionIndex - 0-based source partition index.
wm - watermark value to restore