public interface Processor
When Jet executes a DAG, it creates one or more instances of Processor on each cluster member to do the work of a given vertex. The
vertex's localParallelism property controls the number of
processors per member.
 The processor is a single-threaded processing unit that performs the computation needed to transform zero or more input data streams into zero or more output streams. Each input/output stream corresponds to an edge on the vertex. The correspondence between a stream and an edge is established via the edge's ordinal.
The special case of zero input streams applies to a source vertex, which gets its data from the environment. The special case of zero output streams applies to a sink vertex, which pushes its data to the environment.
 The processor accepts input from instances of Inbox and pushes
 its output to an instance of Outbox.
 
See isCooperative() for important restrictions on how the
processor should work.
 
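When the documentation in this class refers to processing methods, it means all methods except for these: init(com.hazelcast.jet.core.Outbox, com.hazelcast.jet.core.Processor.Context), close(), isCooperative() and closeIsCooperative().
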
 If this processor communicates with an external transactional store, after
 the snapshot is restored and before it executes any code in a processing
 method, it should roll back all transactions that this processor
created. It should only roll back transactions created by this vertex and this
job; to filter them, it can use the vertex name and job ID from the context passed to the
init(com.hazelcast.jet.core.Outbox, com.hazelcast.jet.core.Processor.Context) method.
 
Determining the list of transactions to roll back
You can't store the IDs of the created transactions to the snapshot, as one
might intuitively think. The job might run for a while after creating a
snapshot and start a new transaction, and that one needs to be rolled back too. The
job might even fail before it creates the first snapshot.
 
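There are multiple ways to tackle this:
- enumerate all pending transactions in the external system and roll back those that were created by this processor. For example, a file sink can list the files in the directory it is writing to.
- if the remote system doesn't allow enumerating transactions, use a deterministic scheme for the transaction ID and probe all IDs that could be used by this processor. For example: jobId + vertexId + globalProcessorIndex + sequence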
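
As a rough illustration of the deterministic ID scheme, the sketch below enumerates every ID a processor could have used, so that each one can be probed and rolled back. The class name TransactionIds, the "jet-" prefix, the '-' separator and the fixed poolSize bound on the sequence are assumptions made for this example; none of them are part of the Jet API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of the Jet API. */
final class TransactionIds {
    private TransactionIds() { }

    /** Builds one ID from the four components named in the text. */
    static String transactionId(long jobId, String vertexId, int globalProcessorIndex, int sequence) {
        return "jet-" + jobId + '-' + vertexId + '-' + globalProcessorIndex + '-' + sequence;
    }

    /**
     * Lists every ID this processor could have used. The assumption here is
     * that the sequence cycles through a fixed pool of poolSize values.
     */
    static List<String> candidateIds(long jobId, String vertexId, int globalProcessorIndex, int poolSize) {
        List<String> ids = new ArrayList<>();
        for (int sequence = 0; sequence < poolSize; sequence++) {
            ids.add(transactionId(jobId, vertexId, globalProcessorIndex, sequence));
        }
        return ids;
    }
}
```

After a restore, a processor following this scheme could iterate over candidateIds(...) and ask the external system to roll back each ID, ignoring IDs the system does not know.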
 
 Except for init(com.hazelcast.jet.core.Outbox, com.hazelcast.jet.core.Processor.Context), close(), isCooperative() and closeIsCooperative(), the methods are called in a tight loop with a
 possibly short back-off if the method does no work. "No work" is defined as
 adding nothing to outbox and taking nothing from inbox. If you do heavy work
 on each call (such as querying a remote service), you can do additional
back-off: use sleep in a non-cooperative processor, or do nothing if
not enough time has elapsed since the previous heavy call.
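
The "do nothing if sufficient time has not elapsed" variant of the back-off could look roughly like the sketch below. PollThrottle is a hypothetical helper, not a Jet class; the clock is injected as a LongSupplier only so the behavior can be exercised without real waiting (in a processor one would typically pass System::nanoTime).

```java
import java.util.function.LongSupplier;

/** Hypothetical helper for illustration only; not part of the Jet API. */
final class PollThrottle {
    private final long minIntervalNanos;
    private final LongSupplier nanoClock;
    private long lastRunNanos;
    private boolean hasRun;

    PollThrottle(long minIntervalNanos, LongSupplier nanoClock) {
        this.minIntervalNanos = minIntervalNanos;
        this.nanoClock = nanoClock;
    }

    /**
     * Returns true if the heavy work should run in this call. A false result
     * means the caller should return immediately without doing any work.
     */
    boolean shouldRun() {
        long now = nanoClock.getAsLong();
        if (hasRun && now - lastRunNanos < minIntervalNanos) {
            return false;
        }
        hasRun = true;
        lastRunNanos = now;
        return true;
    }
}
```

Because the check never blocks, this form is also usable in a cooperative processor, where sleeping is not an option.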

| Modifier and Type | Interface and Description |
|---|---|
| static interface | Processor.Context: Context passed to the processor in the init() call. |

| Modifier and Type | Method and Description |
|---|---|
| default void | close(): Called as the last method in the processor lifecycle. |
| default boolean | closeIsCooperative(): Returns true if the close() method of this processor is cooperative. |
| default boolean | complete(): Called after all the inbound edges' streams are exhausted. |
| default boolean | completeEdge(int ordinal): Called after the edge input with the supplied ordinal is exhausted. |
| default boolean | finishSnapshotRestore(): Called after a job was restarted from a snapshot and the processor has consumed all the snapshot data in restoreFromSnapshot(com.hazelcast.jet.core.Inbox). |
| default void | init(Outbox outbox, Processor.Context context): Initializes this processor with the outbox that the processing methods must use to deposit their output items. |
| default boolean | isCooperative(): Tells whether this processor is able to participate in cooperative multithreading. |
| default void | process(int ordinal, Inbox inbox): Called with a batch of items retrieved from an inbound edge's stream. |
| default void | restoreFromSnapshot(Inbox inbox): Called when a batch of items is received during the "restore from snapshot" operation. |
| default boolean | saveToSnapshot(): Stores the processor's state to a state snapshot by adding items to the outbox's snapshot bucket. |
| default boolean | snapshotCommitFinish(boolean success): This is the second phase of a two-phase commit. |
| default boolean | snapshotCommitPrepare(): Prepares the transactions for commit after the snapshot is completed. |
| default boolean | tryProcess(): This method will be called periodically and only when the current batch of items in the inbox has been exhausted. |
| default boolean | tryProcessWatermark(int ordinal, Watermark watermark): Tries to process the supplied watermark. |
| boolean | tryProcessWatermark(Watermark watermark): Tries to process the supplied watermark. |
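
default boolean isCooperative()
Tells whether this processor is able to participate in cooperative multithreading.
There are specific requirements that all processing methods of a cooperative processor must follow:
- each call must take only a short time, after which the method must return
- it must not attempt blocking operations
- it must return as soon as the outbox rejects an item (that is, when the offer() method returns false).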
 Non-cooperative processors are allowed to block, but still must return at least once per second (that is, they should not block indefinitely). If they block longer, snapshots will take longer to complete and the job will respond more slowly to termination: Jet doesn't interrupt the dedicated threads when it wants them to cancel; it waits for them to return.
Jet prefers cooperative processors because they result in a greater overall throughput. A processor should be non-cooperative only if it involves blocking operations, which would cause all other processors on the same shared thread to starve.
Processor instances of a single vertex are allowed to return different values, but a single processor instance must always return the same value.
 The default implementation returns true.
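
The requirement to return as soon as offer() returns false can be sketched as follows. BoundedOutbox and UppercaseStep are hypothetical stand-ins written for this example (a plain queue plays the role of the inbox); they only imitate the contract described here and are not the Jet Inbox or Outbox interfaces.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for an outbox with limited capacity; not the Jet Outbox. */
final class BoundedOutbox {
    private final Queue<Object> items = new ArrayDeque<>();
    private final int capacity;

    BoundedOutbox(int capacity) {
        this.capacity = capacity;
    }

    /** Returns false when the outbox is full, without accepting the item. */
    boolean offer(Object item) {
        if (items.size() >= capacity) {
            return false;
        }
        items.add(item);
        return true;
    }

    /** Simulates downstream consumption; returns the number of items removed. */
    int drain() {
        int count = items.size();
        items.clear();
        return count;
    }
}

/** Hypothetical cooperative-style step that upper-cases each input item. */
final class UppercaseStep {
    private UppercaseStep() { }

    /** Handles items until the inbox is empty or the outbox rejects an item. */
    static void process(Queue<String> inbox, BoundedOutbox outbox) {
        String item;
        while ((item = inbox.peek()) != null) {
            if (!outbox.offer(item.toUpperCase())) {
                return; // outbox full: return at once, the item stays in the inbox
            }
            inbox.remove(); // remove the item only after it was emitted
        }
    }
}
```

Since the rejected item is left in the inbox, the next call picks up exactly where the previous one stopped.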
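
default void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) throws Exception
Initializes this processor with the outbox that the processing methods must use to deposit their output items. This method will be called exactly once and strictly before any calls to other methods (except for the isCooperative() method).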
 Even if this processor is cooperative, this method is allowed to do blocking operations.
The default implementation does nothing.
Parameters:
context - useful environment information
Throws:
Exception

default void process(int ordinal, @Nonnull Inbox inbox)
Called with a batch of items retrieved from an inbound edge's stream.
 If the method returns with items still present in the inbox, it will be
 called again before proceeding to call any other method (except for
 snapshotCommitFinish(boolean)), with the same items. In other words, no
 more items are added to the inbox if the previous call didn't return an
 empty inbox.
 
There is at least one item in the inbox when this method is called.
The default implementation throws an exception; it is suitable for source processors, which have no input streams.
Parameters:
ordinal - ordinal of the inbound edge
inbox - the inbox containing the pending items

boolean tryProcessWatermark(@Nonnull Watermark watermark)
Tries to process the supplied watermark.
 The implementation may choose to process only partially and return false, in which case it will be called again later with the same
 watermark before any other processing method is called. Before
 the method returns true, it should emit the watermark to
 the downstream processors, though in general the processor can process
 the watermark in any way: drop it, delay it or move it ahead, change the
 key, or even emit a completely different watermark, as long as the output
 watermarks are monotonic. Any processing method can emit watermarks. Sink
 processors in general should ignore the watermark and simply return
 true.
 
tryProcessWatermark() variants
Pay attention to the default implementations in this class
and in AbstractProcessor, which handle the case of merging
streams.
 
Parameters:
watermark - watermark to be processed
Returns:
true if this watermark has now been processed, false to call this method again with the same watermark

default boolean tryProcessWatermark(int ordinal, @Nonnull Watermark watermark)
Tries to process the supplied watermark, which occurred on the edge with the given ordinal.
 
 The implementation may choose to process only partially and return false, in which case it will be called again later with the same
 watermark before any other processing method is called. Before
 the method returns true, it should emit the watermark to
 the downstream processors, though in general the processor can process
 the watermark in any way: drop it, delay it or move it ahead, change the
 key, or even emit a completely different watermark, as long as the output
 watermarks are monotonic. Any processing method can emit watermarks. Sink
 processors in general should ignore the watermark and simply return
 true.
 
tryProcessWatermark() variants
Pay attention to the default implementations in this class
and in AbstractProcessor, which handle the case of merging
streams.
 
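Parameters:
ordinal - the ordinal on which this watermark occurred
watermark - watermark to be processed
Returns:
true if this watermark has now been processed, false to call this method again with the same watermark

default boolean tryProcess()
This method will be called periodically and only when the current batch of items in the inbox has been exhausted. If the job restores state from a snapshot, this method is called for the first time after finishSnapshotRestore().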
 
 If the call returns false, it will be called again before
 proceeding to call any other processing method. The default
 implementation returns true.

default boolean completeEdge(int ordinal)
Called after the edge input with the supplied ordinal is
exhausted. If it returns false, it will be called again before
proceeding to call any other method.
 If this method tried to offer to the outbox and the offer call returned false, this method must also return false and retry the offer in the next call.
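
The "return false and retry the offer in the next call" rule might be implemented along the lines of the sketch below. CountingStep is a hypothetical example class, and the Predicate passed to completeEdge is only a stand-in for the outbox's offer() call; neither is part of the Jet API.

```java
import java.util.function.Predicate;

/** Hypothetical step that counts its input and emits the total on completion. */
final class CountingStep {
    private long count;
    private boolean emitted;

    /** Counts one input item. */
    void accept(Object item) {
        count++;
    }

    /**
     * Tries to emit the final count. The offer argument stands in for the
     * outbox's offer() call. Returns false if the offer was rejected, so the
     * caller invokes this method again and the same offer is retried.
     */
    boolean completeEdge(Predicate<Object> offer) {
        if (!emitted) {
            emitted = offer.test(count);
        }
        return emitted;
    }
}
```

The emitted flag guards against offering the result twice if the method is invoked again after it has already succeeded.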
Returns:
true if the processor is now done completing the edge, false to call this method again

default boolean complete()
Called after all the inbound edges' streams are exhausted. If it returns false, it will be invoked again until it returns true.
 For example, a streaming source processor will return false
 forever. Unlike other methods which guarantee that no other method is
 called until they return true, saveToSnapshot() can be
 called even though this method returned false.
 
 After this method is called, no other processing methods are called on
 this processor, except for snapshotCommitFinish(boolean).
 
Non-cooperative processors are required to return from this method from time to time to give the system a chance to check for snapshot requests and job cancellation. The time the processor spends in this method affects the latency of snapshots and job cancellations.
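Returns:
true if the completing step is now done, false to call this method again

default boolean saveToSnapshot()
Stores the processor's state to a state snapshot by adding items to the outbox's snapshot bucket. If this method returns false, it will be called again
before proceeding to call any other method.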
 
 This method will only be called after a call to process() returns with an empty inbox. After all the input is
 exhausted, it is also called between complete() calls. Once
 complete() returns true, this method won't be called
 anymore.
 
 The default implementation does nothing and returns true.
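Returns:
true if this step is done, false to call this method again

default boolean snapshotCommitPrepare()
Prepares the transactions for commit after the snapshot is completed. If the processor doesn't use transactions, it can just return true
or rely on the no-op default implementation. This is the first phase of
a two-phase commit.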
 
 This method is called right after saveToSnapshot(). After
 this method returns true, Jet will return to call the processing
 methods again. Some time later, snapshotCommitFinish(boolean) will be
 called.
 
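When this processor communicates with an external transactional store, it should do the following:
- mark the current active transaction with the external system as prepared and stop using it. The prepared transaction will be committed when snapshotCommitFinish(boolean) with success == true is called
- store the IDs of the pending transactions to the snapshot. Note that there can be multiple prepared transactions if the previous snapshot completed with success == false
- optionally, start a new active transaction that will be used to handle input or produce output until snapshotCommitFinish(boolean) is called. If the implementation doesn't start a new active transaction, it can opt to not process more input or emit any output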
 
 This method is skipped if the snapshot was initiated using Job.exportSnapshot(java.lang.String). If this method is skipped, snapshotCommitFinish(boolean) will be skipped too.
Returns:
true if this step is done, false to call this method again

default boolean snapshotCommitFinish(boolean success)
This is the second phase of a two-phase commit.
 This method can be called even when the process() method didn't process the items in the inbox. For this reason
 this method must not add any items to the outbox. It is also called
 between complete() calls. Once complete() returns
 true, this method can still be called to finish the snapshot
 that was started before this processor completed.
 
The processor should do the following:
- if success == true, it should commit the prepared transactions. It must not continue to use the just-committed transaction ID: that ID is stored in the latest snapshot, and after a restart the transactions with IDs found in the snapshot are committed, so items written after the snapshot would be committed as well.
- if success == false, it should do nothing to the prepared transactions. If it didn't create a new active transaction in saveToSnapshot(), it can continue to use the last active transaction as active.
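
The two phases can be modeled in memory as in the sketch below. TwoPhaseSink, its "txn-" IDs and its fields are hypothetical and exist only to show the state transitions; a real processor would talk to the external store and write the prepared IDs to the snapshot. This sketch always starts a new active transaction in the prepare phase, which is one of the options described here, not the only one.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory model of the protocol; none of these names are Jet API. */
final class TwoPhaseSink {
    /** Stands in for the transactions committed in the external store. */
    final Set<String> committed = new HashSet<>();
    /** Prepared transaction IDs, i.e. the IDs that would be saved to the snapshot. */
    final List<String> prepared = new ArrayList<>();
    private String active;
    private int sequence;

    TwoPhaseSink() {
        active = nextId();
    }

    private String nextId() {
        return "txn-" + sequence++;
    }

    /** First phase: mark the active transaction as prepared and start a new one. */
    boolean snapshotCommitPrepare() {
        prepared.add(active);
        active = nextId();
        return true;
    }

    /** Second phase: commit on success, otherwise leave the prepared transactions alone. */
    boolean snapshotCommitFinish(boolean success) {
        if (success) {
            committed.addAll(prepared);
            prepared.clear();
        }
        return true;
    }

    String activeTransaction() {
        return active;
    }
}
```

Note how a failed snapshot leaves its transaction in the prepared list, so the next successful snapshot commits more than one transaction at once.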
 
 The method is called repeatedly until it eventually returns true. No other method on this processor will be called before it
 returns true.
 
 The two-phase commit protocol requires that the second phase must
 eventually succeed. If you're not able to commit your transactions now,
 you should either return false and try again later, or you can
 throw a RestartableException to cause a job restart; the
 processor is required to commit the transactions with IDs stored in the
 state snapshot after the restart in restoreFromSnapshot(com.hazelcast.jet.core.Inbox). This
 is necessary to ensure exactly-once processing of transactional
 processors.
 
 The default implementation takes no action and returns true.
Parameters:
success - true if the first snapshot phase completed successfully
Returns:
true if this step is done, false to call this method again

default void restoreFromSnapshot(@Nonnull Inbox inbox)
Called when a batch of items is received during the "restore from snapshot" operation. The type of items in the inbox is Map.Entry; key and value types are exactly as they were saved in saveToSnapshot(). This method may emit items to the outbox.
 
 If this method returns with items still present in the inbox, it will
 be called again before proceeding to call any other methods. No more
 items are added to the inbox if the method didn't return with an empty
 inbox. It is never called with an empty inbox. After all items are
 processed, finishSnapshotRestore() is called.
 
 If a transaction ID saved in snapshotCommitPrepare() is
 restored, this method should commit that transaction. If the processor
 is unable to commit those transactions, data loss or duplication might
 occur. The processor must be ready to restore a transaction ID that no
 longer exists in the remote system: either because the transaction was
 already committed (this is the most common case) or because the
 transaction timed out in the remote system. Also, the job ID, if it's
 part of the transaction ID, can be different from the current job ID, if
 the job was started from an
 exported state. These cases should be handled gracefully.
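
Handling these cases gracefully mostly means treating "transaction not found" as a normal outcome during restore. The sketch below illustrates that idea with a hypothetical in-memory FakeStore; a real remote system would report an unknown transaction in its own way (often an exception), and that part is an assumption of this example.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory stand-in for a remote transactional store. */
final class FakeStore {
    private final Set<String> pending = new HashSet<>();
    private final Set<String> committed = new HashSet<>();

    /** Registers a transaction as prepared but not yet committed. */
    void begin(String transactionId) {
        pending.add(transactionId);
    }

    /** Commits the transaction if it is pending; returns false if the ID is unknown. */
    boolean commitIfPending(String transactionId) {
        if (pending.remove(transactionId)) {
            committed.add(transactionId);
            return true;
        }
        return false;
    }

    boolean isCommitted(String transactionId) {
        return committed.contains(transactionId);
    }
}

/** Hypothetical restore logic; not part of the Jet API. */
final class RestoreStep {
    private RestoreStep() { }

    /** Commits every restored ID that still exists; returns how many were committed. */
    static int restore(Iterable<String> restoredIds, FakeStore store) {
        int committedNow = 0;
        for (String id : restoredIds) {
            if (store.commitIfPending(id)) {
                committedNow++;
            }
            // An unknown ID was already committed or has timed out: not an error here.
        }
        return committedNow;
    }
}
```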
 
 The default implementation throws an exception: if you emit
 something in saveToSnapshot(), you must be able to handle it
 here. If you don't override saveToSnapshot(), the
 exception will never be thrown.
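
default boolean finishSnapshotRestore()
Called after a job was restarted from a snapshot and the processor has consumed all the snapshot data in restoreFromSnapshot(com.hazelcast.jet.core.Inbox).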
 
 If this method returns false, it will be called again before
 proceeding to call any other methods.
 
If this method tried to offer to the outbox and the offer call returned false, this method must also return false and retry the offer in the next call.
 The default implementation takes no action and returns true.
Returns:
true if this step is done, false to call this method again

default void close() throws Exception
Called as the last method in the processor lifecycle. It is called before ProcessorSupplier.close(java.lang.Throwable) is called on this member. The method might get
called even if the init(com.hazelcast.jet.core.Outbox, com.hazelcast.jet.core.Processor.Context) method was not yet called.
 
 The method will be called right after complete() returns true, that is, before the job is finished. The job might still be
 running other processors.
 
 See closeIsCooperative() regarding the cooperative behavior of
 this method.
 
If this method throws an exception, it is logged, but it won't be reported as a job failure or cause the job to fail.
The default implementation does nothing.
Throws:
Exception

default boolean closeIsCooperative()
Returns true if the close() method of this processor is
cooperative. If it's not, the call to the close() method is
off-loaded to another thread.
 This flag is ignored for non-cooperative processors.
 By default, close() is assumed to be non-cooperative to guarantee
 correct-by-default behavior for custom processors, even though the default
 implementation of close() is empty and therefore cooperative.
 Implementors are, however, encouraged to override this method if the
 default, empty close() is used in a cooperative processor, to
 avoid offloading an empty invocation to another thread.
Copyright © 2023 Hazelcast, Inc. All rights reserved.