public abstract class AbstractProcessor extends Object implements Processor
Processor
with several levels of convenience:
Processor.init(Outbox, Context)
retains the supplied outbox
and the logger retrieved from the context.
process(n, inbox)
delegates to the matching
tryProcessN()
with each item received in the inbox. If the item
is a watermark, routes it to tryProcessWmN()
instead.
tryProcess(int, Object)
to which
the tryProcessN
methods delegate by default. It is convenient
to override it when the processor doesn't care which edge an item
originates from. Another convenient idiom is to override tryProcessN()
for one or two specially treated edges and override
tryProcess(int, Object)
to process the rest of the edges, which
are treated uniformly.
tryEmit(...)
methods avoid the need to deal with Outbox
directly.
emitFromTraverser(...)
methods handle the boilerplate of
emission from a traverser. They are especially useful in the
Processor.complete()
step when there is a collection of items to emit.
The Traversers
class contains traversers tailored to simplify
the implementation of complete()
.
FlatMapper
class additionally simplifies the
usage of emitFromTraverser()
inside tryProcess()
, in
a scenario where an input item results in a collection of output
items. FlatMapper
is obtained from one of the factory methods
flatMapper(...)
.
Modifier and Type | Class and Description |
---|---|
protected class |
AbstractProcessor.FlatMapper<T,R>
A helper that simplifies the implementation of
tryProcess(ordinal, item) for emitting collections. |
Processor.Context
Constructor and Description |
---|
AbstractProcessor() |
Modifier and Type | Method and Description |
---|---|
protected boolean |
emitFromTraverser(int[] ordinals,
Traverser<?> traverser)
Convenience for
emitFromTraverser(int[], Traverser, Consumer)
which emits to the specified ordinals. |
protected <E> boolean |
emitFromTraverser(int[] ordinals,
Traverser<E> traverser,
java.util.function.Consumer<? super E> onEmit)
Obtains items from the traverser and offers them to the outbox's buckets
identified in the supplied array.
|
protected boolean |
emitFromTraverser(int ordinal,
Traverser<?> traverser)
Convenience for
emitFromTraverser(int, Traverser, Consumer)
which emits to the specified ordinal. |
protected <E> boolean |
emitFromTraverser(int ordinal,
Traverser<E> traverser,
java.util.function.Consumer<? super E> onEmit)
Obtains items from the traverser and offers them to the outbox's buckets
identified in the supplied array.
|
protected boolean |
emitFromTraverser(Traverser<?> traverser)
Convenience for
emitFromTraverser(int, Traverser, Consumer)
which emits to all ordinals. |
protected <E> boolean |
emitFromTraverser(Traverser<E> traverser,
java.util.function.Consumer<? super E> onEmit)
Convenience for
emitFromTraverser(int, Traverser, Consumer)
which emits to all ordinals. |
protected <T extends Map.Entry<?,?>> |
emitFromTraverserToSnapshot(Traverser<T> traverser)
Obtains items from the traverser and offers them to the snapshot bucket
of the outbox.
|
protected <T,R> AbstractProcessor.FlatMapper<T,R> |
flatMapper(java.util.function.Function<? super T,? extends Traverser<? extends R>> mapper)
Factory of
AbstractProcessor.FlatMapper . |
protected <T,R> AbstractProcessor.FlatMapper<T,R> |
flatMapper(int[] ordinals,
java.util.function.Function<? super T,? extends Traverser<? extends R>> mapper)
Factory of
AbstractProcessor.FlatMapper . |
protected <T,R> AbstractProcessor.FlatMapper<T,R> |
flatMapper(int ordinal,
java.util.function.Function<? super T,? extends Traverser<? extends R>> mapper)
Factory of
AbstractProcessor.FlatMapper . |
protected com.hazelcast.logging.ILogger |
getLogger()
Returns the logger associated with this processor instance.
|
void |
init(Outbox outbox,
Processor.Context context)
Initializes this processor with the outbox that the processing methods
must use to deposit their output items.
|
protected void |
init(Processor.Context context)
Method that can be overridden to perform any necessary initialization
for the processor.
|
boolean |
isCooperative()
Tells whether this processor is able to participate in cooperative
multithreading.
|
void |
process(int ordinal,
Inbox inbox)
Implements the boilerplate of dispatching against the ordinal,
taking items from the inbox one by one, and invoking the
processing logic on each.
|
void |
restoreFromSnapshot(Inbox inbox)
Implements the boilerplate of polling the inbox, casting the items to
Map.Entry , and extracting the key and value. |
protected void |
restoreFromSnapshot(Object key,
Object value)
Called to restore one key-value pair from the snapshot to processor's
internal state.
|
void |
setCooperative(boolean isCooperative)
Specifies what this processor's
isCooperative() method will return. |
protected boolean |
tryEmit(int[] ordinals,
Object item)
Offers the item to the outbox buckets identified in the supplied array.
|
protected boolean |
tryEmit(int ordinal,
Object item)
Offers the item to the outbox bucket at the supplied ordinal.
|
protected boolean |
tryEmit(Object item)
Offers the item to all the outbox buckets (except the snapshot outbox).
|
protected boolean |
tryEmitToSnapshot(Object key,
Object value)
Offers one key-value pair to the snapshot bucket.
|
protected boolean |
tryProcess(int ordinal,
Object item)
Tries to process the supplied input item, which was received from the
edge with the supplied ordinal.
|
protected boolean |
tryProcess0(Object item)
Tries to process the supplied input item, which was received from the
edge with ordinal 0.
|
protected boolean |
tryProcess1(Object item)
Tries to process the supplied input item, which was received from the
edge with ordinal 1.
|
protected boolean |
tryProcess2(Object item)
Tries to process the supplied input item, which was received from the
edge with ordinal 2.
|
protected boolean |
tryProcess3(Object item)
Tries to process the supplied input item, which was received from the
edge with ordinal 3.
|
protected boolean |
tryProcess4(Object item)
Tries to process the supplied input item, which was received from the
edge with ordinal 4.
|
protected boolean |
tryProcessWm(int ordinal,
Watermark wm)
Tries to process the supplied watermark, which was received from the
edge with the supplied ordinal.
|
protected boolean |
tryProcessWm0(Watermark wm) |
protected boolean |
tryProcessWm1(Watermark wm) |
protected boolean |
tryProcessWm2(Watermark wm) |
protected boolean |
tryProcessWm3(Watermark wm) |
protected boolean |
tryProcessWm4(Watermark wm) |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
complete, completeEdge, finishSnapshotRestore, saveToSnapshot, tryProcess
public final void setCooperative(boolean isCooperative)
isCooperative()
method will return.
The method will have no effect if called after the processor has been
submitted to the execution service; therefore it should be called from the
ProcessorSupplier
that creates it or in processor's constructor.public boolean isCooperative()
Processor
A cooperative processor should also not attempt any blocking operations,
such as I/O operations, waiting for locks/semaphores or sleep
operations. Violations to this rule will manifest themselves as less
than 100% CPU usage under maximum load. The processor should also return
as soon as an item is rejected by the outbox (that is when the offer()
method returns false
).
If this processor declares itself cooperative, it will share a thread with other cooperative processors. Otherwise it will run in a dedicated Java thread.
Jet prefers cooperative processors because they result in greater overall throughput. A processor should be non-cooperative only if it involves blocking operations, which would cause all other processors on the same shared thread to starve.
Processor instances on single vertex are allowed to return different value, but single processor instance must return constant value.
The default implementation returns true
.
isCooperative
in interface Processor
public final void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context)
Processor
Processor.process(int, Inbox)
and Processor.complete()
).
The default implementation does nothing.
public final void restoreFromSnapshot(@Nonnull Inbox inbox)
Map.Entry
, and extracting the key and value. Forwards each
key-value pair to restoreFromSnapshot(Object, Object)
.restoreFromSnapshot
in interface Processor
public final void process(int ordinal, @Nonnull Inbox inbox)
protected void init(@Nonnull Processor.Context context) throws Exception
process()
and
Processor.complete()
), but after the outbox and logger
have been initialized.
Subclasses are not required to call this superclass method, it does nothing.
context
- the context
associated with this processorException
protected boolean tryProcess(int ordinal, @Nonnull Object item) throws Exception
false
, in which case it will be called again later
with the same (ordinal, item)
combination.
The default implementation throws an UnsupportedOperationException
.
NOTE: unless the processor doesn't differentiate between
its inbound edges, the first choice should be leaving this method alone
and instead overriding the specific tryProcessN()
methods for
each ordinal the processor expects.
ordinal
- ordinal of the edge that delivered the itemitem
- item to be processedtrue
if this item has now been processed,
false
otherwise.Exception
protected boolean tryProcess0(@Nonnull Object item) throws Exception
false
, in which case it will be called again later with the same
item.
The default implementation delegates to tryProcess(0, item)
.
item
- item to be processedtrue
if this item has now been processed,
false
otherwise.Exception
protected boolean tryProcess1(@Nonnull Object item) throws Exception
false
, in which case it will be called again later with the same
item.
The default implementation delegates to tryProcess(1, item)
.
item
- item to be processedtrue
if this item has now been processed,
false
otherwise.Exception
protected boolean tryProcess2(@Nonnull Object item) throws Exception
false
, in which case it will be called again later with the same
item.
The default implementation delegates to tryProcess(2, item)
.
item
- item to be processedtrue
if this item has now been processed,
false
otherwise.Exception
protected boolean tryProcess3(@Nonnull Object item) throws Exception
false
, in which case it will be called again later with the same
item.
The default implementation delegates to tryProcess(3, item)
.
item
- item to be processedtrue
if this item has now been processed,
false
otherwise.Exception
protected boolean tryProcess4(@Nonnull Object item) throws Exception
false
, in which case it will be called again later with the same
item.
The default implementation delegates to tryProcess(4, item)
.
item
- item to be processedtrue
if this item has now been processed,
false
otherwise.Exception
protected boolean tryProcessWm(int ordinal, @Nonnull Watermark wm)
false
, in which case it will be called again later
with the same (ordinal, item)
combination.
The default implementation just emits the watermark downstream to all ordinals.
Caution for jobs with at-least-once guarantee
In snapshotted jobs with at-least-once processing guarantee it
can happen that the watermarks break the monotonicity requirement when
the job is restarted. This is caused by the fact that watermark, like any
other stream item, can be delivered duplicately after restart. This
means that after a restart, a processor can be asked to process a
watermark older than it already processed.
NOTE: unless the processor doesn't differentiate between
its inbound edges, the first choice should be leaving this method alone
and instead overriding the specific tryProcessWmN()
methods for
each ordinal the processor expects.
ordinal
- ordinal of the edge that delivered the watermarkwm
- watermark to be processedtrue
if this watermark has now been processed,
false
otherwise.protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value)
The default implementation throws an UnsupportedOperationException
, but it will not be called unless you
override Processor.saveToSnapshot()
.
key
- key of the entry from the snapshotvalue
- value of the entry from the snapshotprotected final com.hazelcast.logging.ILogger getLogger()
@CheckReturnValue protected final boolean tryEmit(int ordinal, @Nonnull Object item)
true
, if the item was accepted. If false
is
returned, the call must be retried later with the same (or equal) item.@CheckReturnValue protected final boolean tryEmit(@Nonnull Object item)
true
, if the item was accepted. If false
is
returned, the call must be retried later with the same (or equal) item.@CheckReturnValue protected final boolean tryEmit(int[] ordinals, @Nonnull Object item)
true
, if the item was accepted. If false
is
returned, the call must be retried later with the same (or equal) item.protected final <E> boolean emitFromTraverser(@Nonnull int[] ordinals, @Nonnull Traverser<E> traverser, @Nullable java.util.function.Consumer<? super E> onEmit)
onEmit
callback (if
supplied) for each emitted item. If the outbox refuses an item, it backs
off and returns false
.
If this method returns false
, then the caller must retain the
traverser and pass it again in the subsequent invocation of this method,
so as to resume emitting where it left off.
For simplified usage from tryProcess(ordinal, item)
methods, see AbstractProcessor.FlatMapper
.
ordinals
- ordinals of the target buckettraverser
- traverser over items to emitonEmit
- optional callback that gets notified of each emitted itemprotected final <E> boolean emitFromTraverser(int ordinal, @Nonnull Traverser<E> traverser, @Nullable java.util.function.Consumer<? super E> onEmit)
onEmit
callback (if
supplied) for each emitted item. If the outbox refuses an item, it backs
off and returns false
.
If this method returns false
, then the caller must retain the
traverser and pass it again in the subsequent invocation of this method,
so as to resume emitting where it left off.
For simplified usage in tryProcess(ordinal, item)
methods, see AbstractProcessor.FlatMapper
.
ordinal
- ordinal of the target buckettraverser
- traverser over items to emitonEmit
- optional callback that gets notified of each emitted itemprotected final <E> boolean emitFromTraverser(@Nonnull Traverser<E> traverser, @Nullable java.util.function.Consumer<? super E> onEmit)
emitFromTraverser(int, Traverser, Consumer)
which emits to all ordinals.protected final boolean emitFromTraverser(@Nonnull Traverser<?> traverser)
emitFromTraverser(int, Traverser, Consumer)
which emits to all ordinals.protected final boolean emitFromTraverser(int ordinal, @Nonnull Traverser<?> traverser)
emitFromTraverser(int, Traverser, Consumer)
which emits to the specified ordinal.protected final boolean emitFromTraverser(int[] ordinals, @Nonnull Traverser<?> traverser)
emitFromTraverser(int[], Traverser, Consumer)
which emits to the specified ordinals.@CheckReturnValue protected final boolean tryEmitToSnapshot(@Nonnull Object key, @Nonnull Object value)
The type of the offered key determines which processors receive the key
and value pair when it is restored. If the key is of type BroadcastKey
, the entry will be restored to all processor instances.
Otherwise, the key will be distributed according to default partitioning
and only a single processor instance will receive the key.
true
, if the item was accepted. If false
is
returned, the call must be retried later with the same (or equal) key
and value.protected final <T extends Map.Entry<?,?>> boolean emitFromTraverserToSnapshot(@Nonnull Traverser<T> traverser)
Map.Entry
and its key and value
are passed as the two arguments of tryEmitToSnapshot(Object, Object)
.
If the outbox refuses an item, it backs off and returns false
.
If this method returns false
, then the caller must retain the
traverser and pass it again in the subsequent invocation of this method,
so as to resume emitting where it left off.
The type of the offered key determines which processors receive the key
and value pair when it is restored. If the key is of type BroadcastKey
, the entry will be restored to all processor instances.
Otherwise, the key will be distributed according to default partitioning
and only a single processor instance will receive the key.
traverser
- traverser over the items to emit to the snapshot@Nonnull protected final <T,R> AbstractProcessor.FlatMapper<T,R> flatMapper(int ordinal, @Nonnull java.util.function.Function<? super T,? extends Traverser<? extends R>> mapper)
AbstractProcessor.FlatMapper
. The FlatMapper
will emit items to
the given output ordinal.@Nonnull protected final <T,R> AbstractProcessor.FlatMapper<T,R> flatMapper(@Nonnull java.util.function.Function<? super T,? extends Traverser<? extends R>> mapper)
AbstractProcessor.FlatMapper
. The FlatMapper
will emit items to
all defined output ordinals.@Nonnull protected final <T,R> AbstractProcessor.FlatMapper<T,R> flatMapper(int[] ordinals, @Nonnull java.util.function.Function<? super T,? extends Traverser<? extends R>> mapper)
AbstractProcessor.FlatMapper
. The FlatMapper
will emit items to
the ordinals identified in the array.Copyright © 2017 Hazelcast, Inc.. All Rights Reserved.