Class AbstractProcessor
- java.lang.Object
  - com.hazelcast.jet.core.AbstractProcessor
- All Implemented Interfaces: Processor
- Direct Known Subclasses: LongStreamSourceP, ParallelBatchP, ParallelStreamP
public abstract class AbstractProcessor extends java.lang.Object implements Processor
Base class to implement custom processors. Simplifies the contract of Processor with several levels of convenience:
- Processor.init(Outbox, Context) retains the supplied outbox and the logger retrieved from the context.
- process(n, inbox) delegates to the matching tryProcessN() with each item received in the inbox.
- There is also the general tryProcess(int, Object) to which the tryProcessN methods delegate by default. It is convenient to override it when the processor doesn't care which edge an item originates from. Another convenient idiom is to override tryProcessN() for one or two specially treated edges and override tryProcess(int, Object) to process the rest of the edges, which are treated uniformly.
- The tryEmit(...) methods avoid the need to deal with Outbox directly.
- The emitFromTraverser(...) methods handle the boilerplate of emission from a traverser. They are especially useful in the Processor.complete() step when there is a collection of items to emit. The Traversers class contains traversers tailored to simplify the implementation of complete().
- The FlatMapper class additionally simplifies the usage of emitFromTraverser() inside tryProcess(), in a scenario where an input item results in a collection of output items. FlatMapper is obtained from one of the factory methods flatMapper(...).
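The dispatch idiom described above (override tryProcessN() for a specially treated edge and tryProcess(int, Object) for the rest) can be sketched without the Jet dependency. The following is a minimal model, not the library class: MiniProcessor, DispatchDemo and the Deque-based inbox are hypothetical stand-ins, and only two tryProcessN() methods are modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in that models the dispatch described above; not the Jet class.
abstract class MiniProcessor {
    // Takes items from the inbox one by one and stops at the first item
    // that the processing logic reports as not fully processed.
    final void process(int ordinal, Deque<Object> inbox) {
        for (Object item; (item = inbox.peek()) != null; ) {
            boolean done;
            switch (ordinal) {
                case 0: done = tryProcess0(item); break;
                case 1: done = tryProcess1(item); break;
                default: done = tryProcess(ordinal, item);
            }
            if (!done) {
                return; // the item stays in the inbox and is offered again later
            }
            inbox.remove();
        }
    }

    protected boolean tryProcess(int ordinal, Object item) {
        throw new UnsupportedOperationException("no handler for ordinal " + ordinal);
    }

    // By default the per-ordinal methods delegate to the general one.
    protected boolean tryProcess0(Object item) { return tryProcess(0, item); }
    protected boolean tryProcess1(Object item) { return tryProcess(1, item); }
}

class DispatchDemo extends MiniProcessor {
    final List<String> log = new ArrayList<>();

    @Override
    protected boolean tryProcess0(Object item) {
        log.add("special:" + item); // edge 0 is treated specially
        return true;
    }

    @Override
    protected boolean tryProcess(int ordinal, Object item) {
        log.add("uniform[" + ordinal + "]:" + item); // all other edges
        return true;
    }

    static List<String> run() {
        DispatchDemo p = new DispatchDemo();
        p.process(0, new ArrayDeque<Object>(Arrays.asList("a")));
        p.process(1, new ArrayDeque<Object>(Arrays.asList("b")));
        p.process(2, new ArrayDeque<Object>(Arrays.asList("c")));
        return p.log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model, items from ordinal 1 reach the overridden tryProcess(int, Object) through the default tryProcess1() delegation, which is the behavior the overview attributes to the tryProcessN methods.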
- Since: Jet 3.0
Nested Class Summary
Nested Classes
- protected class AbstractProcessor.FlatMapper<T,R>: A helper that simplifies the implementation of tryProcess(ordinal, item) for emitting collections.
Nested classes/interfaces inherited from interface com.hazelcast.jet.core.Processor
- Processor.Context
-
Constructor Summary
Constructors
- AbstractProcessor()
-
Method Summary
- protected <E> boolean emitFromTraverser(int[] ordinals, Traverser<E> traverser): Obtains items from the traverser and offers them to the outbox's buckets identified in the supplied array.
- protected <E> boolean emitFromTraverser(int ordinal, Traverser<E> traverser): Obtains items from the traverser and offers them to the outbox bucket at the supplied ordinal.
- protected boolean emitFromTraverser(Traverser<?> traverser): Convenience for emitFromTraverser(int, Traverser) which emits to all ordinals.
- protected <T extends java.util.Map.Entry<?,?>> boolean emitFromTraverserToSnapshot(Traverser<T> traverser): Obtains items from the traverser and offers them to the snapshot bucket of the outbox.
- protected <T,R> AbstractProcessor.FlatMapper<T,R> flatMapper(int[] ordinals, java.util.function.Function<? super T,? extends Traverser<? extends R>> mapper): Factory of AbstractProcessor.FlatMapper.
- protected <T,R> AbstractProcessor.FlatMapper<T,R> flatMapper(int ordinal, java.util.function.Function<? super T,? extends Traverser<? extends R>> mapper): Factory of AbstractProcessor.FlatMapper.
- protected <T,R> AbstractProcessor.FlatMapper<T,R> flatMapper(java.util.function.Function<? super T,? extends Traverser<? extends R>> mapper): Factory of AbstractProcessor.FlatMapper.
- protected ILogger getLogger(): Returns the logger associated with this processor instance.
- protected Outbox getOutbox()
- void init(Outbox outbox, Processor.Context context): Initializes this processor with the outbox that the processing methods must use to deposit their output items.
- protected void init(Processor.Context context): Method that can be overridden to perform any necessary initialization for the processor.
- protected void keyedWatermarkCheck(Watermark watermark): Throws UnsupportedOperationException if watermark has non-zero key.
- void process(int ordinal, Inbox inbox): Implements the boilerplate of dispatching against the ordinal, taking items from the inbox one by one, and invoking the processing logic on each.
- void restoreFromSnapshot(Inbox inbox): Implements the boilerplate of polling the inbox, casting the items to Map.Entry, and extracting the key and value.
- protected void restoreFromSnapshot(java.lang.Object key, java.lang.Object value): Called to restore one key-value pair from the snapshot to processor's internal state.
- protected boolean tryEmit(int[] ordinals, java.lang.Object item): Offers the item to the outbox buckets identified in the supplied array.
- protected boolean tryEmit(int ordinal, java.lang.Object item): Offers the item to the outbox bucket at the supplied ordinal.
- protected boolean tryEmit(java.lang.Object item): Offers the item to all the outbox buckets (except the snapshot outbox).
- protected boolean tryEmitToSnapshot(java.lang.Object key, java.lang.Object value): Offers one key-value pair to the snapshot bucket.
- protected boolean tryProcess(int ordinal, java.lang.Object item): Tries to process the supplied input item, which was received from the edge with the supplied ordinal.
- protected boolean tryProcess0(java.lang.Object item): Tries to process the supplied input item, which was received from the edge with ordinal 0.
- protected boolean tryProcess1(java.lang.Object item): Tries to process the supplied input item, which was received from the edge with ordinal 1.
- protected boolean tryProcess2(java.lang.Object item): Tries to process the supplied input item, which was received from the edge with ordinal 2.
- protected boolean tryProcess3(java.lang.Object item): Tries to process the supplied input item, which was received from the edge with ordinal 3.
- protected boolean tryProcess4(java.lang.Object item): Tries to process the supplied input item, which was received from the edge with ordinal 4.
- boolean tryProcessWatermark(Watermark watermark): This basic implementation only forwards the passed watermark.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface com.hazelcast.jet.core.Processor
close, closeIsCooperative, complete, completeEdge, finishSnapshotRestore, isCooperative, saveToSnapshot, snapshotCommitFinish, snapshotCommitPrepare, tryProcess, tryProcessWatermark
-
Method Detail
-
init
public final void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) throws java.lang.Exception
Description copied from interface: Processor
Initializes this processor with the outbox that the processing methods must use to deposit their output items. This method will be called exactly once and strictly before any calls to other methods (except for the Processor.isCooperative() method).
Even if this processor is cooperative, this method is allowed to do blocking operations.
The default implementation in Processor does nothing. This final override retains the outbox and the logger, then calls init(Processor.Context).
-
restoreFromSnapshot
public final void restoreFromSnapshot(@Nonnull Inbox inbox)
Implements the boilerplate of polling the inbox, casting the items to Map.Entry, and extracting the key and value. Forwards each key-value pair to restoreFromSnapshot(Object, Object).
- Specified by: restoreFromSnapshot in interface Processor
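The restore boilerplate described above (poll the inbox, cast to Map.Entry, forward key and value) can be modeled with plain JDK types. This is a sketch under stated assumptions: MiniRestorable, CountingState and the Deque-based inbox are hypothetical stand-ins, not Jet classes.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the restore boilerplate; the inbox is modeled as a Deque.
abstract class MiniRestorable {
    // Polls every item, casts it to Map.Entry and forwards key and value.
    final void restoreFromSnapshot(Deque<Object> inbox) {
        for (Object polled; (polled = inbox.poll()) != null; ) {
            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) polled;
            restoreFromSnapshot(entry.getKey(), entry.getValue());
        }
    }

    // Subclasses that save state override this to rebuild it, one pair at a time.
    protected void restoreFromSnapshot(Object key, Object value) {
        throw new UnsupportedOperationException("snapshot restore not implemented");
    }
}

class CountingState extends MiniRestorable {
    final Map<String, Long> counts = new HashMap<>();

    @Override
    protected void restoreFromSnapshot(Object key, Object value) {
        counts.put((String) key, (Long) value);
    }

    static Map<String, Long> run() {
        Deque<Object> inbox = new ArrayDeque<>();
        inbox.add(new SimpleEntry<>("apple", 3L));
        inbox.add(new SimpleEntry<>("pear", 7L));
        CountingState state = new CountingState();
        state.restoreFromSnapshot(inbox);
        return state.counts;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The subclass only deals with one key-value pair at a time; the polling and casting stay in the base class.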
-
process
public void process(int ordinal, @Nonnull Inbox inbox)
Implements the boilerplate of dispatching against the ordinal, taking items from the inbox one by one, and invoking the processing logic on each.
-
init
protected void init(@Nonnull Processor.Context context) throws java.lang.Exception
Method that can be overridden to perform any necessary initialization for the processor. It is called exactly once and strictly before any of the processing methods (process() and Processor.complete()), but after the outbox and logger have been initialized.
Subclasses are not required to call this superclass method; it does nothing.
- Parameters:
  context - the context associated with this processor
- Throws: java.lang.Exception
-
tryProcess
protected boolean tryProcess(int ordinal, @Nonnull java.lang.Object item) throws java.lang.Exception
Tries to process the supplied input item, which was received from the edge with the supplied ordinal. May choose to process only partially and return false, in which case it will be called again later with the same (ordinal, item) combination before any other processing method is called.
The default implementation throws an UnsupportedOperationException.
NOTE: unless the processor doesn't differentiate between its inbound edges, the first choice should be leaving this method alone and instead overriding the specific tryProcessN() methods for each ordinal the processor expects.
- Parameters:
  ordinal - ordinal of the edge that delivered the item
  item - item to be processed
- Returns: true if this item has now been processed, false otherwise.
- Throws: java.lang.Exception
-
tryProcess0
protected boolean tryProcess0(@Nonnull java.lang.Object item) throws java.lang.Exception
Tries to process the supplied input item, which was received from the edge with ordinal 0. May choose to process only partially and return false, in which case it will be called again later with the same item before any other processing method is called.
The default implementation delegates to tryProcess(0, item).
- Parameters:
  item - item to be processed
- Returns: true if this item has now been processed, false otherwise.
- Throws: java.lang.Exception
-
tryProcess1
protected boolean tryProcess1(@Nonnull java.lang.Object item) throws java.lang.Exception
Tries to process the supplied input item, which was received from the edge with ordinal 1. May choose to process only partially and return false, in which case it will be called again later with the same item before any other processing method is called.
The default implementation delegates to tryProcess(1, item).
- Parameters:
  item - item to be processed
- Returns: true if this item has now been processed, false otherwise.
- Throws: java.lang.Exception
-
tryProcess2
protected boolean tryProcess2(@Nonnull java.lang.Object item) throws java.lang.Exception
Tries to process the supplied input item, which was received from the edge with ordinal 2. May choose to process only partially and return false, in which case it will be called again later with the same item before any other processing method is called.
The default implementation delegates to tryProcess(2, item).
- Parameters:
  item - item to be processed
- Returns: true if this item has now been processed, false otherwise.
- Throws: java.lang.Exception
-
tryProcess3
protected boolean tryProcess3(@Nonnull java.lang.Object item) throws java.lang.Exception
Tries to process the supplied input item, which was received from the edge with ordinal 3. May choose to process only partially and return false, in which case it will be called again later with the same item before any other processing method is called.
The default implementation delegates to tryProcess(3, item).
- Parameters:
  item - item to be processed
- Returns: true if this item has now been processed, false otherwise.
- Throws: java.lang.Exception
-
tryProcess4
protected boolean tryProcess4(@Nonnull java.lang.Object item) throws java.lang.Exception
Tries to process the supplied input item, which was received from the edge with ordinal 4. May choose to process only partially and return false, in which case it will be called again later with the same item before any other processing method is called.
The default implementation delegates to tryProcess(4, item).
- Parameters:
  item - item to be processed
- Returns: true if this item has now been processed, false otherwise.
- Throws: java.lang.Exception
-
restoreFromSnapshot
protected void restoreFromSnapshot(@Nonnull java.lang.Object key, @Nonnull java.lang.Object value)
Called to restore one key-value pair from the snapshot to the processor's internal state.
The default implementation throws an UnsupportedOperationException, but it will not be called unless you override Processor.saveToSnapshot().
- Parameters:
  key - key of the entry from the snapshot
  value - value of the entry from the snapshot
-
tryProcessWatermark
public boolean tryProcessWatermark(@Nonnull Watermark watermark)
This basic implementation only forwards the passed watermark.
- Specified by: tryProcessWatermark in interface Processor
- Parameters:
  watermark - watermark to be processed
- Returns: true if this watermark has now been processed, false to call this method again with the same watermark
-
getLogger
protected final ILogger getLogger()
Returns the logger associated with this processor instance.
-
getOutbox
protected final Outbox getOutbox()
-
tryEmit
@CheckReturnValue protected final boolean tryEmit(int ordinal, @Nonnull java.lang.Object item)
Offers the item to the outbox bucket at the supplied ordinal.
Emitted items should not be subsequently mutated because the same instance might be used by a downstream processor in a different thread, causing concurrent access.
- Returns: true if the item was accepted. If false is returned, the call must be retried later with the same (or equal) item.
-
tryEmit
@CheckReturnValue protected final boolean tryEmit(@Nonnull java.lang.Object item)
Offers the item to all the outbox buckets (except the snapshot outbox).
Emitted items should not be subsequently mutated because the same instance might be used by a downstream processor in a different thread, causing concurrent access.
- Returns: true if the item was accepted. If false is returned, the call must be retried later with the same (or equal) item.
-
tryEmit
@CheckReturnValue protected final boolean tryEmit(@Nonnull int[] ordinals, @Nonnull java.lang.Object item)
Offers the item to the outbox buckets identified in the supplied array.
Emitted items should not be subsequently mutated because the same instance might be used by a downstream processor in a different thread, causing concurrent access.
- Returns: true if the item was accepted. If false is returned, the call must be retried later with the same (or equal) item.
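The retry contract of the tryEmit(...) methods pairs naturally with the tryProcessN() contract: returning the result of the emit attempt lets the caller re-offer the same input item later. The sketch below models this with JDK types only; BoundedBucket, TryEmitDemo and the capacity of 2 are hypothetical and chosen purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a bounded outbox bucket; not the Jet Outbox.
class BoundedBucket {
    private final Deque<Object> queue = new ArrayDeque<>();
    private final int capacity;

    BoundedBucket(int capacity) { this.capacity = capacity; }

    // Accepts the item only while there is room.
    boolean offer(Object item) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.add(item);
        return true;
    }

    // Simulates the downstream consumer taking everything emitted so far.
    void drainTo(List<Object> sink) {
        sink.addAll(queue);
        queue.clear();
    }
}

class TryEmitDemo {
    private final BoundedBucket bucket = new BoundedBucket(2);
    int refusals;

    // In the style of tryProcess0: the item counts as processed only if
    // the output was accepted. On retry an equal output is produced.
    boolean tryProcess0(Object item) {
        boolean accepted = bucket.offer(item.toString().toUpperCase());
        if (!accepted) {
            refusals++;
        }
        return accepted;
    }

    List<Object> run(List<String> input) {
        Deque<Object> inbox = new ArrayDeque<Object>(input);
        List<Object> received = new ArrayList<>();
        while (!inbox.isEmpty()) {
            if (tryProcess0(inbox.peek())) {
                inbox.remove();
            } else {
                bucket.drainTo(received); // time passes, downstream makes room
            }
        }
        bucket.drainTo(received);
        return received;
    }

    public static void main(String[] args) {
        TryEmitDemo demo = new TryEmitDemo();
        System.out.println(demo.run(Arrays.asList("a", "b", "c", "d", "e")));
        System.out.println("refusals: " + demo.refusals);
    }
}
```

Because a refused item is never removed from the inbox, no output is lost or duplicated even though some attempts fail.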
-
emitFromTraverser
protected final <E> boolean emitFromTraverser(@Nonnull int[] ordinals, @Nonnull Traverser<E> traverser)
Obtains items from the traverser and offers them to the outbox's buckets identified in the supplied array. If the outbox refuses an item, it backs off and returns false.
Emitted items should not be subsequently mutated because the same instance might be used by a downstream processor in a different thread, causing concurrent access.
If this method returns false, then the caller must retain the traverser and pass it again in the subsequent invocation of this method, so as to resume emitting where it left off.
For simplified usage from tryProcess(ordinal, item) methods, see AbstractProcessor.FlatMapper.
- Parameters:
  ordinals - ordinals of the target buckets
  traverser - traverser over items to emit
- Returns: whether the traverser has been exhausted
-
emitFromTraverser
protected final <E> boolean emitFromTraverser(int ordinal, @Nonnull Traverser<E> traverser)
Obtains items from the traverser and offers them to the outbox bucket at the supplied ordinal. If the outbox refuses an item, it backs off and returns false.
Do not mutate the items you emit because the downstream processor may be using them in a different thread, resulting in concurrent access.
If this method returns false, then you must retain the traverser and pass it again in the subsequent invocation of this method, so as to resume emitting where you left off.
For simplified usage in tryProcess(ordinal, item) methods, see AbstractProcessor.FlatMapper.
- Parameters:
  ordinal - ordinal of the target bucket
  traverser - traverser over items to emit
- Returns: whether the traverser has been exhausted
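The requirement to retain the traverser and pass it again can be illustrated with a small model. This is a sketch, not the library code: a traverser is represented by a java.util.function.Supplier that returns null when exhausted, and TraverserEmitDemo, its pendingItem field and the bucket capacity are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model: a traverser is a Supplier that returns null when exhausted.
class TraverserEmitDemo {
    private final List<Object> bucket = new ArrayList<>(); // stand-in for one outbox bucket
    private final int capacity;
    private Object pendingItem; // item the bucket refused; offered first on the next call
    int calls;

    TraverserEmitDemo(int capacity) { this.capacity = capacity; }

    private boolean tryEmit(Object item) {
        if (bucket.size() >= capacity) {
            return false;
        }
        bucket.add(item);
        return true;
    }

    // Returns true once the traverser is exhausted, false if the bucket refused an item.
    boolean emitFromTraverser(Supplier<?> traverser) {
        Object item = pendingItem != null ? pendingItem : traverser.get();
        pendingItem = null;
        for (; item != null; item = traverser.get()) {
            if (!tryEmit(item)) {
                pendingItem = item; // remember it so that it is not lost
                return false;
            }
        }
        return true;
    }

    static <T> Supplier<T> traverse(Iterable<T> items) {
        Iterator<T> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    List<Object> emitAll(List<Integer> items) {
        Supplier<Integer> traverser = traverse(items); // created once, retained across calls
        List<Object> received = new ArrayList<>();
        boolean done;
        do {
            calls++;
            done = emitFromTraverser(traverser);
            received.addAll(bucket); // downstream drains the bucket between calls
            bucket.clear();
        } while (!done);
        return received;
    }

    public static void main(String[] args) {
        TraverserEmitDemo demo = new TraverserEmitDemo(2);
        System.out.println(demo.emitAll(Arrays.asList(1, 2, 3, 4, 5)));
        System.out.println("calls: " + demo.calls);
    }
}
```

Creating a fresh traverser on every call would restart emission from the first item, which is why the same instance has to be passed until the method reports exhaustion.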
-
emitFromTraverser
protected final boolean emitFromTraverser(@Nonnull Traverser<?> traverser)
Convenience for emitFromTraverser(int, Traverser) which emits to all ordinals.
-
tryEmitToSnapshot
@CheckReturnValue protected final boolean tryEmitToSnapshot(@Nonnull java.lang.Object key, @Nonnull java.lang.Object value)
Offers one key-value pair to the snapshot bucket.
The type of the offered key determines which processors receive the key and value pair when it is restored. If the key is of type BroadcastKey, the entry will be restored to all processor instances. Otherwise, the key will be distributed according to default partitioning and only a single processor instance will receive the key.
Keys and values offered to snapshot are serialized and can be further mutated as soon as this method returns.
- Returns: true if the item was accepted. If false is returned, the call must be retried later with the same (or equal) key and value.
-
emitFromTraverserToSnapshot
protected final <T extends java.util.Map.Entry<?,?>> boolean emitFromTraverserToSnapshot(@Nonnull Traverser<T> traverser)
Obtains items from the traverser and offers them to the snapshot bucket of the outbox. Each item is a Map.Entry and its key and value are passed as the two arguments of tryEmitToSnapshot(Object, Object). If the outbox refuses an item, it backs off and returns false.
Keys and values offered to snapshot are serialized and can be further mutated as soon as this method returns.
If this method returns false, then the caller must retain the traverser and pass it again in the subsequent invocation of this method, so as to resume emitting where it left off.
The type of the offered key determines which processors receive the key and value pair when it is restored. If the key is of type BroadcastKey, the entry will be restored to all processor instances. Otherwise, the key will be distributed according to default partitioning and only a single processor instance will receive the key.
- Parameters:
  traverser - traverser over the items to emit to the snapshot
- Returns: whether the traverser has been exhausted
-
flatMapper
@Nonnull protected final <T,R> AbstractProcessor.FlatMapper<T,R> flatMapper(int ordinal, @Nonnull java.util.function.Function<? super T,? extends Traverser<? extends R>> mapper)
Factory of AbstractProcessor.FlatMapper. The FlatMapper will emit items to the given output ordinal.
-
flatMapper
@Nonnull protected final <T,R> AbstractProcessor.FlatMapper<T,R> flatMapper(@Nonnull java.util.function.Function<? super T,? extends Traverser<? extends R>> mapper)
Factory of AbstractProcessor.FlatMapper. The FlatMapper will emit items to all defined output ordinals.
-
flatMapper
@Nonnull protected final <T,R> AbstractProcessor.FlatMapper<T,R> flatMapper(@Nonnull int[] ordinals, @Nonnull java.util.function.Function<? super T,? extends Traverser<? extends R>> mapper)
Factory of AbstractProcessor.FlatMapper. The FlatMapper will emit items to the ordinals identified in the array.
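The idea behind FlatMapper, where one input item expands into several output items and emission may have to resume later, can be sketched as follows. This is a model under stated assumptions, not the nested Jet class: MiniFlatMapper, FlatMapperDemo, the Supplier-based traverser (null means exhausted) and the bucket capacity are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical model of the flat-mapping helper; a traverser is a Supplier returning null when exhausted.
class MiniFlatMapper<T, R> {
    private final Function<? super T, ? extends Supplier<? extends R>> mapper;
    private final List<Object> bucket; // stand-in for one outbox bucket
    private final int capacity;
    private Supplier<? extends R> outputTraverser; // retained while an item is partially emitted
    private Object pendingItem;                    // output refused by the bucket

    MiniFlatMapper(Function<? super T, ? extends Supplier<? extends R>> mapper,
                   List<Object> bucket, int capacity) {
        this.mapper = mapper;
        this.bucket = bucket;
        this.capacity = capacity;
    }

    // Returns true once every output for this input item has been emitted.
    boolean tryProcess(T item) {
        if (outputTraverser == null) {
            outputTraverser = mapper.apply(item); // first attempt for this item
        }
        Object out = pendingItem != null ? pendingItem : outputTraverser.get();
        pendingItem = null;
        for (; out != null; out = outputTraverser.get()) {
            if (bucket.size() >= capacity) {
                pendingItem = out;
                return false; // caller retries with the same item
            }
            bucket.add(out);
        }
        outputTraverser = null; // ready for the next input item
        return true;
    }
}

class FlatMapperDemo {
    static Supplier<String> words(String line) {
        Iterator<String> it = Arrays.asList(line.split(" ")).iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    static List<Object> run(List<String> lines) {
        List<Object> bucket = new ArrayList<>();
        MiniFlatMapper<String, String> splitter =
                new MiniFlatMapper<String, String>(FlatMapperDemo::words, bucket, 2);
        List<Object> received = new ArrayList<>();
        for (String line : lines) {
            while (!splitter.tryProcess(line)) {
                received.addAll(bucket); // downstream drains, then the same item is retried
                bucket.clear();
            }
        }
        received.addAll(bucket);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a b c", "d e")));
    }
}
```

The mapper is applied once per input item, even when that item needs several tryProcess calls, because the helper keeps the output traverser until it is exhausted.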
-
keyedWatermarkCheck
protected void keyedWatermarkCheck(Watermark watermark)
Throws UnsupportedOperationException if the watermark has a non-zero key. Intended for processors that do not function properly with keyed watermarks.
-
-