Interface Processor

All Known Implementing Classes:
AbstractProcessor, LongStreamSourceP, ParallelBatchP, ParallelStreamP

public interface Processor
When Jet executes a DAG, it creates one or more instances of Processor on each cluster member to do the work of a given vertex. The vertex's localParallelism property controls the number of processors per member.

The processor is a single-threaded processing unit that performs the computation needed to transform zero or more input data streams into zero or more output streams. Each input/output stream corresponds to an edge on the vertex. The correspondence between a stream and an edge is established via the edge's ordinal.

The special case of zero input streams applies to a source vertex, which gets its data from the environment. The special case of zero output streams applies to a sink vertex, which pushes its data to the environment.

The processor accepts input from instances of Inbox and pushes its output to an instance of Outbox.

See isCooperative() for important restrictions on how the processor should work.

Processing methods

When the documentation in this class refers to processing methods, we mean all methods except for these:

  • init(Outbox, Processor.Context)
  • isCooperative()
  • close()
  • closeIsCooperative()

Transactional processors

If this processor communicates with an external transactional store, then after the snapshot is restored and before it executes any code in a processing method, it should roll back all transactions that this processor created. It should only roll back transactions created by this vertex and this job; to filter them, it can use the vertex name and job ID available from the context passed to the init(com.hazelcast.jet.core.Outbox, com.hazelcast.jet.core.Processor.Context) method.

Determining the list of transactions to roll back
You can't rely on storing the IDs of the created transactions in the snapshot, as one might intuitively think. The job might run for a while after creating a snapshot and start a new transaction, and that one must be rolled back too. The job might even fail before it creates the first snapshot.

There are multiple ways to tackle this:

  • enumerate all pending transactions in the external system and roll back those that were created by this processor. For example, a file sink can list the files in the directory it is writing to
  • if the remote system doesn't allow enumerating transactions, use a deterministic scheme for the transaction ID and probe all IDs that could be used by this processor. For example: jobId + vertexId + globalProcessorIndex + sequence
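The deterministic-ID approach could look roughly like the sketch below. It is only an illustration: the class TxIdScheme, the "jet-" prefix, the '-' separator and the maxSequence probing bound are all assumptions made for this example and are not part of the Jet API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Jet API: builds deterministic
// transaction IDs so that a restarted processor can recompute them.
final class TxIdScheme {
    private TxIdScheme() { }

    // Composes jobId + vertexId + globalProcessorIndex + sequence.
    // The "jet-" prefix, '-' separator and hex job ID are illustrative choices.
    static String txId(long jobId, String vertexId, int globalProcessorIndex, int sequence) {
        return "jet-" + Long.toHexString(jobId) + '-' + vertexId
                + '-' + globalProcessorIndex + '-' + sequence;
    }

    // Lists every ID this processor could have used, so that each one can be
    // probed and rolled back. maxSequence is an assumed upper bound.
    static List<String> candidateIds(long jobId, String vertexId,
                                     int globalProcessorIndex, int maxSequence) {
        List<String> ids = new ArrayList<>();
        for (int seq = 0; seq < maxSequence; seq++) {
            ids.add(txId(jobId, vertexId, globalProcessorIndex, seq));
        }
        return ids;
    }
}
```

A processor would iterate over candidateIds(...) after the snapshot restore and ask the external system to roll back each ID, ignoring IDs the system does not know.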

How the methods are called

Except for init(com.hazelcast.jet.core.Outbox, com.hazelcast.jet.core.Processor.Context), close(), isCooperative() and closeIsCooperative(), the methods are called in a tight loop, with a possibly short back-off if the method does no work. "No work" is defined as adding nothing to the outbox and taking nothing from the inbox. If you do heavy work on each call (such as querying a remote service), you can add your own back-off: sleep in a non-cooperative processor, or return without doing anything if sufficient time has not elapsed since the last call.
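For a cooperative processor, the "do nothing if sufficient time has not elapsed" option might be sketched as follows. CallThrottle is a hypothetical helper written for this example, not a Jet class; the clock is injected only to keep the sketch testable.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of the Jet API: lets a cooperative processing
// method skip heavy work until a minimum interval has elapsed.
final class CallThrottle {
    private final long minIntervalNanos;
    private final LongSupplier nanoClock;   // injectable, e.g. System::nanoTime
    private long lastRunNanos;
    private boolean hasRun;

    CallThrottle(long minIntervalNanos, LongSupplier nanoClock) {
        this.minIntervalNanos = minIntervalNanos;
        this.nanoClock = nanoClock;
    }

    // Returns true if enough time has passed and the caller should do the
    // heavy work now; otherwise the caller should return without doing work.
    boolean shouldRun() {
        long now = nanoClock.getAsLong();
        if (hasRun && now - lastRunNanos < minIntervalNanos) {
            return false;
        }
        hasRun = true;
        lastRunNanos = now;
        return true;
    }
}
```

A processing method could create the throttle as new CallThrottle(100_000_000L, System::nanoTime) and start with a check such as "if (!throttle.shouldRun()) return true;", so that the remote call happens at most once per 100 ms without ever blocking the shared thread.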

Since:
Jet 3.0
  • Nested Class Summary

    Nested Classes
    Modifier and Type
    Interface
    Description
    static interface
    Processor.Context
    Context passed to the processor in the init() call.
  • Method Summary

    Modifier and Type
    Method
    Description
    default void
    close()
    Called as the last method in the processor lifecycle.
    default boolean
    closeIsCooperative()
    Returns true if the close() method of this processor is cooperative.
    default boolean
    complete()
    Called after all the inbound edges' streams are exhausted.
    default boolean
    completeEdge(int ordinal)
    Called after the edge input with the supplied ordinal is exhausted.
    default boolean
    finishSnapshotRestore()
    Called after a job was restarted from a snapshot and the processor has consumed all the snapshot data in restoreFromSnapshot(com.hazelcast.jet.core.Inbox).
    default void
    init(Outbox outbox, Processor.Context context)
    Initializes this processor with the outbox that the processing methods must use to deposit their output items.
    default boolean
    isCooperative()
    Tells whether this processor is able to participate in cooperative multithreading.
    default void
    process(int ordinal, Inbox inbox)
    Called with a batch of items retrieved from an inbound edge's stream.
    default void
    restoreFromSnapshot(Inbox inbox)
    Called when a batch of items is received during the "restore from snapshot" operation.
    default boolean
    saveToSnapshot()
    Stores the processor's state to a state snapshot by adding items to the outbox's snapshot bucket.
    default boolean
    snapshotCommitFinish(boolean success)
    This is the second phase of a two-phase commit.
    default boolean
    snapshotCommitPrepare()
    Prepares the transactions for commit after the snapshot is completed.
    default boolean
    tryProcess()
    This method will be called periodically and only when the current batch of items in the inbox has been exhausted.
    default boolean
    tryProcessWatermark(int ordinal, Watermark watermark)
    Tries to process the supplied watermark.
    boolean
    tryProcessWatermark(Watermark watermark)
    Tries to process the supplied watermark.
  • Method Details

    • isCooperative

      default boolean isCooperative()
      Tells whether this processor is able to participate in cooperative multithreading. If this processor declares itself cooperative, it will share a thread with other cooperative processors. Otherwise, it will run in a dedicated Java thread.

      There are specific requirements that all processing methods of a cooperative processor must follow:

      • each call must take a reasonably small amount of time (up to a millisecond). Violations will manifest as increased latency due to slower switching of processors.
      • each call must also not attempt any blocking operations, such as I/O operations, waiting for locks/semaphores or sleep operations. Violations of this rule will manifest as less than 100% CPU usage under maximum load (note that this is possible for other reasons too, for example if the network is the bottleneck or if parking time is too high). The processor must also return as soon as the outbox rejects an item (that is, when the offer() method returns false).

      Non-cooperative processors are allowed to block, but still must return at least once per second (that is, they should not block indefinitely). If they block longer, snapshots will take longer to complete and the job will respond more slowly to termination: Jet doesn't interrupt the dedicated threads when it wants to cancel them; it waits for them to return.

      Jet prefers cooperative processors because they result in a greater overall throughput. A processor should be non-cooperative only if it involves blocking operations, which would cause all other processors on the same shared thread to starve.

      Processor instances of a single vertex are allowed to return different values, but a single processor instance must always return the same value.

      The default implementation returns true.
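The rule "return as soon as the outbox rejects an item" can be illustrated with the sketch below. SketchOutbox is a simplified stand-in with a fixed capacity and is not the real Outbox interface; ResumableEmitter is likewise a hypothetical name. The sketch assumes items are never null.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for an outbox with limited capacity; NOT the Jet Outbox.
final class SketchOutbox {
    private final int capacity;
    final List<Object> items = new ArrayList<>();

    SketchOutbox(int capacity) { this.capacity = capacity; }

    // Returns false when the item is rejected because the outbox is full.
    boolean offer(Object item) {
        if (items.size() >= capacity) {
            return false;
        }
        items.add(item);
        return true;
    }

    // Simulates the downstream draining the outbox between calls.
    void drain() { items.clear(); }
}

// Hypothetical emitter that remembers the rejected item and retries it first.
final class ResumableEmitter {
    private final Iterator<?> source;
    private Object pending;            // item rejected by the previous call, if any

    ResumableEmitter(Iterator<?> source) { this.source = source; }

    // Returns true when everything was emitted, false if the outbox
    // rejected an item and the call must be repeated later.
    boolean emitTo(SketchOutbox outbox) {
        while (pending != null || source.hasNext()) {
            if (pending == null) {
                pending = source.next();
            }
            if (!outbox.offer(pending)) {
                return false;          // return immediately, never spin or block
            }
            pending = null;
        }
        return true;
    }
}
```

The key design point is that the rejected item is kept in a field, so the method can return right away and pick up exactly where it left off on the next call.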

    • init

      default void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) throws Exception
      Initializes this processor with the outbox that the processing methods must use to deposit their output items. This method will be called exactly once and strictly before any calls to other methods (except for the isCooperative() method).

      Even if this processor is cooperative, this method is allowed to do blocking operations.

      The default implementation does nothing.

      Parameters:
      context - useful environment information
      Throws:
      Exception
    • process

      default void process(int ordinal, @Nonnull Inbox inbox)
      Called with a batch of items retrieved from an inbound edge's stream. The items are in the inbox and this method may process zero or more of them, removing each item after it is processed. It must not remove an item until it is done with it.

      If the method returns with items still present in the inbox, it will be called again before proceeding to call any other method (except for snapshotCommitFinish(boolean)), with the same items. In other words, no more items are added to the inbox if the previous call didn't return an empty inbox.

      There is at least one item in the inbox when this method is called.

      The default implementation throws an exception; this is suitable for source processors, which have no input.

      Parameters:
      ordinal - ordinal of the inbound edge
      inbox - the inbox containing the pending items
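The inbox contract of process() might be sketched as follows. To stay self-contained, a java.util.Queue plays the role of the inbox and a Predicate plays the role of Outbox.offer(); neither is the real Jet type, and UppercaseStep is a hypothetical name.

```java
import java.util.Queue;
import java.util.function.Predicate;

// Sketch of the process() contract with stand-ins: a Queue plays the role of
// the inbox and a Predicate plays the role of Outbox.offer().
final class UppercaseStep {
    private UppercaseStep() { }

    // Processes as many items as the outbox accepts. An item is removed from
    // the inbox only after its result was accepted, so a rejected item is
    // still at the head of the inbox on the next call.
    static void process(Queue<String> inbox, Predicate<String> offer) {
        for (String item; (item = inbox.peek()) != null; ) {
            if (!offer.test(item.toUpperCase())) {
                return;                // outbox full: leave the item in the inbox
            }
            inbox.remove();            // done with the item, now remove it
        }
    }
}
```

Because the method only peeks until the output is accepted, returning early with items left in the inbox is safe: the next call sees the same items again.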
    • tryProcessWatermark

      boolean tryProcessWatermark(@Nonnull Watermark watermark)
      Tries to process the supplied watermark. The watermark's value is always greater than in the previous call with a watermark of the same key. The watermark is delivered for processing after it has been received from all the input edges.

      The implementation may choose to process only partially and return false, in which case it will be called again later with the same watermark before any other processing method is called. Before the method returns true, it should emit the watermark to the downstream processors, though in general the processor can process the watermark in any way: drop it, delay it or move it ahead, change the key, or even emit a completely different watermark, as long as the output watermarks are monotonic. Any processing method can emit watermarks. Sink processors in general should ignore the watermark and simply return true.

      Difference between the overloaded tryProcessWatermark() variants

      The method is available in two overloaded variants: with and without an edge ordinal:
      • The variant with ordinal is called after the watermark was received from all upstream processors contributing to that input ordinal.
      • The variant without an ordinal is called after the watermark was received from all input ordinals.
      Which method to override depends on the purpose of the processor. For example, a join processor can receive a different watermark from each input, so it needs to override the variant with the ordinal. A merging processor, on the other hand, expects the same watermarks from all inputs, so it overrides the variant without an ordinal. Each watermark is passed to both methods, so in most cases you need to override at most one of them. However, if a watermark with some key is not received from all input edges, the variant without the ordinal is never called for that watermark key.

      Also pay attention to the default implementations in this class and in AbstractProcessor, which handle the case of merging streams.

      Caution for Jobs With the At-Least-Once Guarantee

      Jet propagates the value of the watermark by sending watermark items interleaved with the regular stream items. If a job configured with the at-least-once processing guarantee gets restarted, the same watermark, like any other stream item, can be delivered again. Therefore, the processor may be asked to process a watermark older than the one it had already processed before the restart.
      Parameters:
      watermark - watermark to be processed
      Returns:
      true if this watermark has now been processed, false to call this method again with the same watermark
    • tryProcessWatermark

      default boolean tryProcessWatermark(int ordinal, @Nonnull Watermark watermark)
      Tries to process the supplied watermark. The watermark's value is always greater than in the previous call with a watermark of the same key. The watermark is delivered for processing after it has been received from all upstream processors connected to the edge with the given ordinal.

      The implementation may choose to process only partially and return false, in which case it will be called again later with the same watermark before any other processing method is called. Before the method returns true, it should emit the watermark to the downstream processors, though in general the processor can process the watermark in any way: drop it, delay it or move it ahead, change the key, or even emit a completely different watermark, as long as the output watermarks are monotonic. Any processing method can emit watermarks. Sink processors in general should ignore the watermark and simply return true.

      Difference between the overloaded tryProcessWatermark() variants

      The method is available in two overloaded variants: with and without an edge ordinal:
      • The variant with ordinal is called after the watermark was received from all upstream processors contributing to that input ordinal.
      • The variant without an ordinal is called after the watermark was received from all input ordinals.
      Which method to override depends on the purpose of the processor. For example, a join processor can receive a different watermark from each input, so it needs to override the variant with the ordinal. A merging processor, on the other hand, expects the same watermarks from all inputs, so it overrides the variant without an ordinal. Each watermark is passed to both methods, so in most cases you need to override at most one of them. However, if a watermark with some key is not received from all input edges, the variant without the ordinal is never called for that watermark key.

      Also pay attention to the default implementations in this class and in AbstractProcessor, which handle the case of merging streams.

      Caution for Jobs With the At-Least-Once Guarantee

      Jet propagates the value of the watermark by sending watermark items interleaved with the regular stream items. If a job configured with the at-least-once processing guarantee gets restarted, the same watermark, like any other stream item, can be delivered again. Therefore, the processor may be asked to process a watermark older than the one it had already processed before the restart.
      Parameters:
      ordinal - the ordinal on which this watermark occurred
      watermark - watermark to be processed
      Returns:
      true if this watermark has now been processed, false to call this method again with the same watermark
      Since:
      5.2
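The relationship between the two variants can be modeled with the sketch below. It is a simplified model and not Jet's actual implementation: WatermarkTracker is a hypothetical class, watermark keys are ignored for brevity, and at least one input ordinal is assumed. A value reported by observe() corresponds to the point at which the variant without an ordinal would become applicable.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Simplified model, NOT Jet's implementation: tracks the latest watermark
// value seen on each input ordinal and reports when a value has been reached
// on all of them. Watermark keys are ignored for brevity.
final class WatermarkTracker {
    private final long[] lastPerOrdinal;
    private long lastForwarded = Long.MIN_VALUE;

    // ordinalCount must be at least 1.
    WatermarkTracker(int ordinalCount) {
        lastPerOrdinal = new long[ordinalCount];
        Arrays.fill(lastPerOrdinal, Long.MIN_VALUE);
    }

    // Records a watermark observed on one ordinal (the "with ordinal" view).
    // Returns the value now reached on all ordinals if it advanced (the
    // "without ordinal" view), or empty if some input is still behind.
    OptionalLong observe(int ordinal, long value) {
        // Math.max ignores an older watermark that is delivered again,
        // as can happen after a restart with the at-least-once guarantee.
        lastPerOrdinal[ordinal] = Math.max(lastPerOrdinal[ordinal], value);
        long min = Arrays.stream(lastPerOrdinal).min().getAsLong();
        if (min > lastForwarded) {
            lastForwarded = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

Taking the minimum across ordinals keeps the forwarded watermark monotonic even when inputs progress at different speeds.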
    • tryProcess

      default boolean tryProcess()
      This method will be called periodically and only when the current batch of items in the inbox has been exhausted. It can be used to produce output in the absence of input or to do general maintenance work. If the job restores state from a snapshot, this method is called for the first time after finishSnapshotRestore().

      If the call returns false, it will be called again before proceeding to call any other processing method. The default implementation returns true.

    • completeEdge

      default boolean completeEdge(int ordinal)
      Called after the edge input with the supplied ordinal is exhausted. If it returns false, it will be called again before proceeding to call any other method.

      If this method tried to offer to the outbox and the offer call returned false, this method must also return false and retry the offer in the next call.

      Returns:
      true if the processor is now done completing the edge, false to call this method again
    • complete

      default boolean complete()
      Called after all the inbound edges' streams are exhausted. If it returns false, it will be invoked again until it returns true. For example, a streaming source processor will return false forever. Unlike other methods which guarantee that no other method is called until they return true, saveToSnapshot() can be called even though this method returned false.

      After this method is called, no other processing methods are called on this processor, except for snapshotCommitFinish(boolean).

      Non-cooperative processors are required to return from this method from time to time to give the system a chance to check for snapshot requests and job cancellation. The time the processor spends in this method affects the latency of snapshots and job cancellations.

      Returns:
      true if the completing step is now done, false to call this method again
    • saveToSnapshot

      default boolean saveToSnapshot()
      Stores the processor's state to a state snapshot by adding items to the outbox's snapshot bucket. If this method returns false, it will be called again before proceeding to call any other method.

      This method will only be called after a call to process() returns with an empty inbox. After all the input is exhausted, it is also called between complete() calls. Once complete() returns true, this method won't be called anymore.

      The default implementation does nothing and returns true.

      Returns:
      true if this step is done, false to call this method again
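A save and restore round trip for keyed state might be sketched as follows. The List of Map.Entry items stands in both for the outbox's snapshot bucket and for the inbox used during restore; it is not the Jet API, and CountingState is a hypothetical class.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of keyed state that is saved as Map.Entry items and restored from
// them. The List stands in for the snapshot bucket and the restore inbox.
final class CountingState {
    final Map<String, Long> counts = new HashMap<>();

    void accumulate(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Analogous to saveToSnapshot(): emits one entry per key.
    List<Map.Entry<String, Long>> save() {
        List<Map.Entry<String, Long>> snapshot = new ArrayList<>();
        counts.forEach((k, v) -> snapshot.add(new SimpleImmutableEntry<>(k, v)));
        return snapshot;
    }

    // Analogous to restoreFromSnapshot(): entries arrive with the same key
    // and value types that save() produced.
    void restore(List<Map.Entry<String, Long>> snapshot) {
        for (Map.Entry<String, Long> e : snapshot) {
            counts.put(e.getKey(), e.getValue());
        }
    }
}
```

In a real processor each entry would be offered to the snapshot bucket one at a time, and the method would return false whenever an offer is rejected.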
    • snapshotCommitPrepare

      default boolean snapshotCommitPrepare()
      Prepares the transactions for commit after the snapshot is completed. If the processor doesn't use transactions, it can just return true or rely on the no-op default implementation. This is the first phase of a two-phase commit.

      This method is called right after saveToSnapshot(). After this method returns true, Jet resumes calling the processing methods. Some time later, snapshotCommitFinish(boolean) will be called.

      When this processor communicates with an external transactional store, it should do the following:

      • mark the current active transaction with the external system as prepared and stop using it. The prepared transaction will be committed when snapshotCommitFinish(boolean) is called with success == true
      • store the IDs of the pending transaction(s) to the snapshot. Note that there can be multiple prepared transactions if the previous snapshot completed with success == false
      • optionally, start a new active transaction that will be used to handle input or produce output until snapshotCommitFinish(boolean) is called. If the implementation doesn't start a new active transaction, it can opt not to process more input or emit any output

      This method is skipped if the snapshot was initiated using Job.exportSnapshot(java.lang.String). If this method is skipped, snapshotCommitFinish(boolean) will be skipped too.

      Returns:
      true if this step is done, false to call this method again
      Since:
      Jet 4.0
    • snapshotCommitFinish

      default boolean snapshotCommitFinish(boolean success)
      This is the second phase of a two-phase commit. Jet calls it after the snapshot was successfully saved on all other processors in the job on all cluster members.

      This method can be called even when the process() method didn't process the items in the inbox. For this reason this method must not add any items to the outbox. It is also called between complete() calls. Once complete() returns true, this method can still be called to finish the snapshot that was started before this processor completed.

      The processor should do the following:

      • if success == true, it should commit the prepared transactions. It must not continue to use the just-committed transaction ID: that ID is stored in the latest snapshot, and after a restart the processor commits the transactions whose IDs are found in the snapshot, so reusing the ID would commit items written after the snapshot.
      • if success == false, it should do nothing to the prepared transactions. If it didn't create a new active transaction in snapshotCommitPrepare(), it can continue to use the last active transaction as active.

      The method is called repeatedly until it eventually returns true. No other method on this processor will be called before it returns true.

      Error handling

      The two-phase commit protocol requires that the second phase eventually succeeds. If you're not able to commit your transactions now, either return false and try again later, or throw a RestartableException to cause a job restart. After the restart, the processor is required to commit the transactions with IDs stored in the state snapshot in restoreFromSnapshot(com.hazelcast.jet.core.Inbox). This is necessary to ensure exactly-once processing of transactional processors.

      The default implementation takes no action and returns true.

      Parameters:
      success - true, if the first snapshot phase completed successfully
      Returns:
      true if this step is done, false to call this method again
      Since:
      Jet 4.0
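The two phases described above could be sketched as follows. This is an illustration under stated assumptions: SketchTxStore is an in-memory stand-in for an external transactional store, TwoPhaseSinkSketch and the "tx-N" ID format are hypothetical, and storing the prepared IDs to the snapshot is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for an external transactional store (hypothetical).
final class SketchTxStore {
    final Map<String, List<String>> open = new LinkedHashMap<>();
    final List<String> committed = new ArrayList<>();

    void write(String txId, String item) {
        open.computeIfAbsent(txId, k -> new ArrayList<>()).add(item);
    }

    // Idempotent: committing an unknown or already committed ID is a no-op.
    void commit(String txId) {
        List<String> items = open.remove(txId);
        if (items != null) {
            committed.addAll(items);
        }
    }
}

// Hypothetical sink following the prepare/finish protocol described above.
final class TwoPhaseSinkSketch {
    private final SketchTxStore store;
    private final List<String> prepared = new ArrayList<>();
    private int sequence;
    private String activeTx;

    TwoPhaseSinkSketch(SketchTxStore store) {
        this.store = store;
        this.activeTx = "tx-" + sequence++;
    }

    void write(String item) { store.write(activeTx, item); }

    // Phase 1: stop using the active transaction and start a fresh one.
    boolean snapshotCommitPrepare() {
        prepared.add(activeTx);
        activeTx = "tx-" + sequence++;
        return true;
    }

    // Phase 2: commit on success; otherwise keep the prepared transactions
    // so that a later successful snapshot commits them.
    boolean snapshotCommitFinish(boolean success) {
        if (success) {
            prepared.forEach(store::commit);
            prepared.clear();
        }
        return true;
    }
}
```

Because a new transaction ID is started in the prepare phase, items written after the snapshot never end up in a transaction whose ID is already recorded as prepared.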
    • restoreFromSnapshot

      default void restoreFromSnapshot(@Nonnull Inbox inbox)
      Called when a batch of items is received during the "restore from snapshot" operation. The type of items in the inbox is Map.Entry, key and value types are exactly as they were saved in saveToSnapshot(). This method may emit items to the outbox.

      If this method returns with items still present in the inbox, it will be called again before proceeding to call any other methods. No more items are added to the inbox if the method didn't return with an empty inbox. It is never called with an empty inbox. After all items are processed, finishSnapshotRestore() is called.

      If a transaction ID saved in snapshotCommitPrepare() is restored, this method should commit that transaction. If the processor is unable to commit those transactions, data loss or duplication might occur. The processor must be ready to restore a transaction ID that no longer exists in the remote system: either because the transaction was already committed (this is the most common case) or because the transaction timed out in the remote system. Also, the job ID, if it's part of the transaction ID, can be different from the current job ID, if the job was started from an exported state. These cases should be handled gracefully.

      The default implementation throws an exception: if you emit something in saveToSnapshot(), you must be able to handle it here. If you don't override saveToSnapshot(), the exception will never be thrown.
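Handling a restored transaction ID that no longer exists in the remote system might be sketched as follows. PendingTxRegistry is a hypothetical remote system that rejects unknown IDs, and RestoreCommitSketch is likewise an assumption made for this example; a real client will report a missing transaction in its own way.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical remote system that only knows the still-pending transactions.
final class PendingTxRegistry {
    final Set<String> pending = new HashSet<>();
    final List<String> committed = new ArrayList<>();

    // Throws if the transaction is unknown, as a strict remote API might.
    void commit(String txId) {
        if (!pending.remove(txId)) {
            throw new IllegalStateException("Unknown transaction: " + txId);
        }
        committed.add(txId);
    }
}

final class RestoreCommitSketch {
    private RestoreCommitSketch() { }

    // Commits a transaction ID found in the snapshot. Returns false if the
    // ID was no longer pending (already committed or timed out), which is
    // treated as a normal outcome rather than a failure.
    static boolean commitRestored(PendingTxRegistry remote, String txId) {
        try {
            remote.commit(txId);
            return true;
        } catch (IllegalStateException alreadyGone) {
            return false;
        }
    }
}
```

Treating "unknown transaction" as a non-error keeps the restore idempotent: restoring the same snapshot twice commits each transaction at most once.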

    • finishSnapshotRestore

      default boolean finishSnapshotRestore()
      Called after a job was restarted from a snapshot and the processor has consumed all the snapshot data in restoreFromSnapshot(com.hazelcast.jet.core.Inbox).

      If this method returns false, it will be called again before proceeding to call any other methods.

      If this method tried to offer to the outbox and the offer call returned false, this method must also return false and retry the offer in the next call.

      The default implementation takes no action and returns true.

      Returns:
      true if this step is done, false to call this method again
    • close

      default void close() throws Exception
      Called as the last method in the processor lifecycle. It is called whether the job was successful or not, and strictly before ProcessorSupplier.close(java.lang.Throwable) is called on this member. The method might get called even if the init(com.hazelcast.jet.core.Outbox, com.hazelcast.jet.core.Processor.Context) method has not been called yet.

      The method will be called right after complete() returns true, that is, before the job is finished. The job might still be running other processors.

      See closeIsCooperative() regarding the cooperative behavior of this method.

      If this method throws an exception, it is logged, but it won't be reported as a job failure or cause the job to fail.

      The default implementation does nothing.

      Throws:
      Exception
    • closeIsCooperative

      default boolean closeIsCooperative()
      Returns true if the close() method of this processor is cooperative. If it's not, the call to the close() method is off-loaded to another thread.

      This flag is ignored for non-cooperative processors.

      By default, close() is assumed to be non-cooperative to guarantee correct-by-default behavior for custom processors, even though the default implementation of close() is empty and therefore cooperative. Implementors are, however, encouraged to override this method when the default, empty close() is used in a cooperative processor, to avoid offloading an empty invocation to another thread.