T - the event typepublic class EventTimeMapper<T> extends Object
EventTimePolicy. Generally this class should be used if a source needs
 to emit watermarks. The mapper deals with the
 following concerns:
 EventTimePolicy.wrapFn() is set.
 EventTimePolicy.watermarkThrottlingFrameSize() is wasteful since they
 are broadcast to all processors. The mapper ensures that watermarks are
 emitted according to the throttling frame size.
 Traverser that holds the output data. Your source can follow this
 pattern:
 
 public boolean complete() {
     if (traverser == null) {
         List<Record> records = poll();
         if (records.isEmpty()) {
             traverser = eventTimeMapper.flatMapIdle();
         } else {
             traverser = traverseIterable(records)
                 .flatMap(event -> eventTimeMapper.flatMapEvent(
                      event, event.getPartition()));
         }
         traverser = traverser.onFirstNull(() -> traverser = null);
     }
     emitFromTraverser(traverser, event -> {
         if (!(event instanceof Watermark)) {
             // store your offset after event was emitted
             offsetsMap.put(event.getPartition(), event.getOffset());
         }
     });
     return false;
 }
 addPartitions(int) and removePartition(int) to change your
     partition count initially or whenever the count changes.
 getWatermark(int) for all partitions to the snapshot. When restoring the
     state, call restoreWatermark(int, long).
     broadcastKey(), because the external partitions don't match Hazelcast
     partitions. This way, all processor instances will see all keys and they
     can restore the partitions they handle and ignore others.
 | Modifier and Type | Field and Description | 
|---|---|
| static long | NO_NATIVE_TIMEValue to use as the  nativeEventTimeargument when callingflatMapEvent(Object, int, long)when there's no native event
 time to supply. | 
| Constructor and Description | 
|---|
| EventTimeMapper(EventTimePolicy<? super T> eventTimePolicy)The partition count is initially set to 0, call  addPartitions(int)to add partitions. | 
| Modifier and Type | Method and Description | 
|---|---|
| void | addPartitions(int addedCount)Adds  addedCountpartitions. | 
| Traverser<Object> | flatMapEvent(long now,
            T event,
            int partitionIndex,
            long nativeEventTime)A lower-level variant of  flatMapEvent(T, int, long)that accepts an explicit result of aSystem.nanoTime()call. | 
| Traverser<Object> | flatMapEvent(T event,
            int partitionIndex,
            long nativeEventTime)Flat-maps the given  eventby (possibly) prepending it with a
 watermark. | 
| Traverser<Object> | flatMapIdle()Call this method when there is no event to emit. | 
| long | getWatermark(int partitionIndex)Watermark value to be saved to state snapshot for the given source
 partition index. | 
| int | partitionCount()Returns the current partition count. | 
| Traverser<Object> | removePartition(int partitionIndex)Removes a partition that will no longer have events. | 
| void | restoreWatermark(int partitionIndex,
                long wm)Restore watermark value from state snapshot. | 
public static final long NO_NATIVE_TIME
nativeEventTime argument when calling
 flatMapEvent(Object, int, long) when there's no native event
 time to supply.public EventTimeMapper(EventTimePolicy<? super T> eventTimePolicy)
addPartitions(int)
 to add partitions.eventTimePolicy - event time policy as passed in Sources.streamFromProcessorWithWatermarks(java.lang.String, boolean, com.hazelcast.function.FunctionEx<com.hazelcast.jet.core.EventTimePolicy<? super T>, com.hazelcast.jet.core.ProcessorMetaSupplier>)@Nonnull public Traverser<Object> flatMapEvent(@Nullable T event, int partitionIndex, long nativeEventTime)
event by (possibly) prepending it with a
 watermark. Designed to use when emitting from traverser:
 
     Traverser t = traverserIterable(...)
         .flatMap(event -> eventTimeMapper.flatMapEvent(
                 event, event.getPartition(), nativeEventTime));
 event - the event to flat-map.
                        If null, it's equivalent to the behavior of flatMapIdle()partitionIndex - the source partition index the event came fromnativeEventTime - native event time in case no timestampFn was supplied or
                        NO_NATIVE_TIME if the event has no native timestamp@Nonnull public Traverser<Object> flatMapIdle()
public Traverser<Object> flatMapEvent(long now, @Nullable T event, int partitionIndex, long nativeEventTime)
flatMapEvent(T, int, long) that accepts an explicit result of a
 System.nanoTime() call. Use this variant if you're calling it in
 a hot loop, in order to avoid repeating the expensive
 System.nanoTime() call.public void addPartitions(int addedCount)
addedCount partitions. Added partitions will be initially
 considered active and having watermark value equal to the last
 emitted watermark. Their indices will follow current highest index.
 You can call this method whenever new partitions are detected.
addedCount - number of added partitions, must be >= 0public Traverser<Object> removePartition(int partitionIndex)
 Note that the indexes of partitions with index larger than the given
 partitionIndex will be decremented by 1.
partitionIndex - the index of the removed partitionpublic int partitionCount()
public long getWatermark(int partitionIndex)
restored to a processor handling the same
 partition after restart.
 
 Method is meant to be used from Processor.saveToSnapshot().
partitionIndex - 0-based source partition index.public void restoreWatermark(int partitionIndex,
                             long wm)
 Method is meant to be used from Processor.restoreFromSnapshot(Inbox).
 
 See getWatermark(int).
partitionIndex - 0-based source partition index.wm - watermark value to restoreCopyright © 2022 Hazelcast, Inc.. All rights reserved.