Class AbstractProcessor

  • All Implemented Interfaces:
    Processor
    Direct Known Subclasses:
    LongStreamSourceP, ParallelBatchP, ParallelStreamP

    public abstract class AbstractProcessor
    extends java.lang.Object
    implements Processor
    Base class to implement custom processors. Simplifies the contract of Processor with several levels of convenience:
    1. Processor.init(Outbox, Context) retains the supplied outbox and the logger retrieved from the context.
    2. process(n, inbox) delegates to the matching tryProcessN() with each item received in the inbox.
    3. There is also the general tryProcess(int, Object) to which the tryProcessN methods delegate by default. It is convenient to override it when the processor doesn't care which edge an item originates from. Another convenient idiom is to override tryProcessN() for one or two specially treated edges and override tryProcess(int, Object) to process the rest of the edges, which are treated uniformly.
    4. The tryEmit(...) methods avoid the need to deal with Outbox directly.
    5. The emitFromTraverser(...) methods handle the boilerplate of emission from a traverser. They are especially useful in the Processor.complete() step when there is a collection of items to emit. The Traversers class contains traversers tailored to simplify the implementation of complete().
    6. The FlatMapper class additionally simplifies the usage of emitFromTraverser() inside tryProcess(), in a scenario where an input item results in a collection of output items. FlatMapper is obtained from one of the factory methods flatMapper(...).
    Since:
    Jet 3.0
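The idiom from item 3 is to override tryProcessN() for a specially treated edge and let tryProcess(int, Object) handle the remaining edges uniformly. The sketch below shows it with simplified, hypothetical stand-ins rather than the real Jet classes. DispatchingProcessor models only the dispatch part of AbstractProcessor, only ordinals 0 and 1 get dedicated methods, and checked exceptions are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the dispatch part of AbstractProcessor (not the Jet class).
// Only ordinals 0 and 1 get dedicated methods here; checked exceptions are omitted.
abstract class DispatchingProcessor {
    // By default every per-ordinal method delegates to the general tryProcess(int, Object).
    protected boolean tryProcess0(Object item) { return tryProcess(0, item); }
    protected boolean tryProcess1(Object item) { return tryProcess(1, item); }

    // The general fallback; throws unless a subclass overrides it.
    protected boolean tryProcess(int ordinal, Object item) {
        throw new UnsupportedOperationException("Missing implementation in " + getClass());
    }

    // Routes an item to the matching tryProcessN, or to tryProcess for higher ordinals.
    final boolean dispatch(int ordinal, Object item) {
        switch (ordinal) {
            case 0: return tryProcess0(item);
            case 1: return tryProcess1(item);
            default: return tryProcess(ordinal, item);
        }
    }
}

// Edge 0 carries control items that need special handling; all other edges are uniform.
class ControlAwareProcessor extends DispatchingProcessor {
    final List<String> log = new ArrayList<>();

    @Override
    protected boolean tryProcess0(Object item) {
        log.add("control:" + item);
        return true;
    }

    @Override
    protected boolean tryProcess(int ordinal, Object item) {
        log.add("data@" + ordinal + ":" + item);
        return true;
    }
}
```

Calling dispatch(1, item) reaches the overridden tryProcess through the default tryProcess1, so only the edge that needs different treatment requires its own method.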
    • Method Summary

      Modifier and Type Method Description
      protected <E> boolean emitFromTraverser(int[] ordinals, Traverser<E> traverser)
      Obtains items from the traverser and offers them to the outbox's buckets identified in the supplied array.
      protected <E> boolean emitFromTraverser(int ordinal, Traverser<E> traverser)
      Obtains items from the traverser and offers them to the outbox bucket at the supplied ordinal.
      protected boolean emitFromTraverser(Traverser<?> traverser)
      Convenience for emitFromTraverser(int, Traverser) which emits to all ordinals.
      protected <T extends java.util.Map.Entry<?, ?>> boolean emitFromTraverserToSnapshot(Traverser<T> traverser)
      Obtains items from the traverser and offers them to the snapshot bucket of the outbox.
      protected <T, R> AbstractProcessor.FlatMapper<T, R> flatMapper(int[] ordinals, java.util.function.Function<? super T, ? extends Traverser<? extends R>> mapper)
      Factory of AbstractProcessor.FlatMapper. The FlatMapper will emit items to the ordinals identified in the array.
      protected <T, R> AbstractProcessor.FlatMapper<T, R> flatMapper(int ordinal, java.util.function.Function<? super T, ? extends Traverser<? extends R>> mapper)
      Factory of AbstractProcessor.FlatMapper. The FlatMapper will emit items to the supplied ordinal.
      protected <T, R> AbstractProcessor.FlatMapper<T, R> flatMapper(java.util.function.Function<? super T, ? extends Traverser<? extends R>> mapper)
      Factory of AbstractProcessor.FlatMapper. The FlatMapper will emit items to all ordinals.
      protected ILogger getLogger()
      Returns the logger associated with this processor instance.
      protected Outbox getOutbox()  
      void init(Outbox outbox, Processor.Context context)
      Initializes this processor with the outbox that the processing methods must use to deposit their output items.
      protected void init(Processor.Context context)
      Method that can be overridden to perform any necessary initialization for the processor.
      protected void keyedWatermarkCheck(Watermark watermark)
      Throws UnsupportedOperationException if the watermark has a non-zero key.
      void process(int ordinal, Inbox inbox)
      Implements the boilerplate of dispatching against the ordinal, taking items from the inbox one by one, and invoking the processing logic on each.
      void restoreFromSnapshot(Inbox inbox)
      Implements the boilerplate of polling the inbox, casting the items to Map.Entry, and extracting the key and value.
      protected void restoreFromSnapshot(java.lang.Object key, java.lang.Object value)
      Called to restore one key-value pair from the snapshot to the processor's internal state.
      protected boolean tryEmit(int[] ordinals, java.lang.Object item)
      Offers the item to the outbox buckets identified in the supplied array.
      protected boolean tryEmit(int ordinal, java.lang.Object item)
      Offers the item to the outbox bucket at the supplied ordinal.
      protected boolean tryEmit(java.lang.Object item)
      Offers the item to all the outbox buckets (except the snapshot bucket).
      protected boolean tryEmitToSnapshot(java.lang.Object key, java.lang.Object value)
      Offers one key-value pair to the snapshot bucket.
      protected boolean tryProcess(int ordinal, java.lang.Object item)
      Tries to process the supplied input item, which was received from the edge with the supplied ordinal.
      protected boolean tryProcess0(java.lang.Object item)
      Tries to process the supplied input item, which was received from the edge with ordinal 0.
      protected boolean tryProcess1(java.lang.Object item)
      Tries to process the supplied input item, which was received from the edge with ordinal 1.
      protected boolean tryProcess2(java.lang.Object item)
      Tries to process the supplied input item, which was received from the edge with ordinal 2.
      protected boolean tryProcess3(java.lang.Object item)
      Tries to process the supplied input item, which was received from the edge with ordinal 3.
      protected boolean tryProcess4(java.lang.Object item)
      Tries to process the supplied input item, which was received from the edge with ordinal 4.
      boolean tryProcessWatermark(Watermark watermark)
      This basic implementation only forwards the passed watermark.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • AbstractProcessor

        public AbstractProcessor()
    • Method Detail

      • init

        public final void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context)
                        throws java.lang.Exception
        Description copied from interface: Processor
        Initializes this processor with the outbox that the processing methods must use to deposit their output items. This method will be called exactly once and strictly before any calls to other methods (except for the Processor.isCooperative() method).

        Even if this processor is cooperative, this method is allowed to do blocking operations.

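        The default implementation in Processor does nothing. AbstractProcessor overrides this method as final: it retains the outbox and the logger obtained from the context, then calls init(Processor.Context).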

        Specified by:
        init in interface Processor
        Parameters:
        outbox - the outbox that the processing methods must use to deposit their output items
        context - useful environment information
        Throws:
        java.lang.Exception
      • process

        public void process(int ordinal, @Nonnull Inbox inbox)
        Implements the boilerplate of dispatching against the ordinal, taking items from the inbox one by one, and invoking the processing logic on each.
        Specified by:
        process in interface Processor
        Parameters:
        ordinal - ordinal of the inbound edge
        inbox - the inbox containing the pending items
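The dispatching loop can be sketched as follows: peek at the head item, hand it to the processing logic, and remove it only after it is fully processed. This is a simplified model under stated assumptions. A java.util.Deque stands in for Jet's Inbox, a single tryProcess(int, Object) replaces the per-ordinal tryProcessN methods, and the class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of the process(ordinal, inbox) boilerplate; the Deque stands in for Inbox.
abstract class InboxDrainer {
    protected abstract boolean tryProcess(int ordinal, Object item);

    // Peek at the head item and remove it only once tryProcess reports it as done.
    // On false, stop and leave the item in place so the next call retries it first.
    public void process(int ordinal, Deque<Object> inbox) {
        for (Object item; (item = inbox.peek()) != null && tryProcess(ordinal, item); ) {
            inbox.remove();
        }
    }
}

// Accepts at most `budget` items before backing off, simulating a full outbox.
class BudgetedProcessor extends InboxDrainer {
    final List<Object> processed = new ArrayList<>();
    int budget;

    @Override
    protected boolean tryProcess(int ordinal, Object item) {
        if (budget == 0) {
            return false; // back off; the item stays at the head of the inbox
        }
        budget--;
        processed.add(item);
        return true;
    }
}
```

Because the item is removed only after a true result, a processor that backs off never loses or skips input.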
      • init

        protected void init(@Nonnull Processor.Context context)
                     throws java.lang.Exception
        Method that can be overridden to perform any necessary initialization for the processor. It is called exactly once and strictly before any of the processing methods (process() and Processor.complete()), but after the outbox and logger have been initialized.

        Subclasses are not required to call this superclass method; it does nothing.

        Parameters:
        context - the context associated with this processor
        Throws:
        java.lang.Exception
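The ordering described above is a template-method arrangement. The final init(Outbox, Context) stores the outbox and logger, and only then runs the overridable init(Context) hook. A minimal sketch with hypothetical stand-ins: SimpleOutbox and SimpleContext replace Outbox and Processor.Context, and a plain logger name replaces ILogger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Outbox and Processor.Context.
interface SimpleOutbox { boolean offer(Object item); }
interface SimpleContext { String loggerName(); }

abstract class InitTemplate {
    private SimpleOutbox outbox;
    private String loggerName;

    // Final entry point: store the outbox and logger first, then call the subclass hook.
    public final void init(SimpleOutbox outbox, SimpleContext context) {
        this.outbox = outbox;
        this.loggerName = context.loggerName();
        init(context);
    }

    // Optional hook; the base version does nothing, so overrides need not call super.
    protected void init(SimpleContext context) { }

    protected final SimpleOutbox getOutbox() { return outbox; }
    protected final String getLoggerName() { return loggerName; }
}

class CachingProcessor extends InitTemplate {
    final List<String> events = new ArrayList<>();

    @Override
    protected void init(SimpleContext context) {
        // The outbox and logger are already available when the hook runs.
        events.add("outbox ready: " + (getOutbox() != null));
        events.add("logger: " + getLoggerName());
    }
}
```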
      • tryProcess

        protected boolean tryProcess(int ordinal, @Nonnull java.lang.Object item)
                              throws java.lang.Exception
        Tries to process the supplied input item, which was received from the edge with the supplied ordinal. May choose to process only partially and return false, in which case it will be called again later with the same (ordinal, item) combination before any other processing method is called.

        The default implementation throws an UnsupportedOperationException.

        NOTE: unless the processor doesn't differentiate between its inbound edges, the first choice should be leaving this method alone and instead overriding the specific tryProcessN() methods for each ordinal the processor expects.

        Parameters:
        ordinal - ordinal of the edge that delivered the item
        item - item to be processed
        Returns:
        true if this item has now been processed, false otherwise.
        Throws:
        java.lang.Exception
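Because a partially processed item is retried with the same (ordinal, item) combination, the processor has to remember how far it got. A minimal sketch of that bookkeeping, assuming a hypothetical BoundedOutbox and a processor that emits every input item twice. This is an illustration of the contract, not Jet code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical bounded outbox: refuses items once `capacity` is reached until drained.
class BoundedOutbox {
    final List<Object> items = new ArrayList<>();
    final int capacity;

    BoundedOutbox(int capacity) { this.capacity = capacity; }

    boolean offer(Object item) {
        if (items.size() >= capacity) return false;
        items.add(item);
        return true;
    }

    List<Object> drain() {
        List<Object> out = new ArrayList<>(items);
        items.clear();
        return out;
    }
}

// Emits each input item twice. If the outbox fills up after the first copy, it returns
// false and keeps its progress, so the retry with the same item emits only the missing copy.
class DuplicatingProcessor {
    private final BoundedOutbox outbox;
    private int copiesEmitted; // progress for the item currently being processed

    DuplicatingProcessor(BoundedOutbox outbox) { this.outbox = outbox; }

    boolean tryProcess(int ordinal, Object item) {
        while (copiesEmitted < 2) {
            if (!outbox.offer(item)) {
                return false; // partially processed; will be called again with the same item
            }
            copiesEmitted++;
        }
        copiesEmitted = 0; // done; reset for the next item
        return true;
    }
}
```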
      • tryProcess0

        protected boolean tryProcess0(@Nonnull java.lang.Object item)
                               throws java.lang.Exception
        Tries to process the supplied input item, which was received from the edge with ordinal 0. May choose to process only partially and return false, in which case it will be called again later with the same item before any other processing method is called.

        The default implementation delegates to tryProcess(0, item).

        Parameters:
        item - item to be processed
        Returns:
        true if this item has now been processed, false otherwise.
        Throws:
        java.lang.Exception
      • tryProcess1

        protected boolean tryProcess1(@Nonnull java.lang.Object item)
                               throws java.lang.Exception
        Tries to process the supplied input item, which was received from the edge with ordinal 1. May choose to process only partially and return false, in which case it will be called again later with the same item before any other processing method is called.

        The default implementation delegates to tryProcess(1, item).

        Parameters:
        item - item to be processed
        Returns:
        true if this item has now been processed, false otherwise.
        Throws:
        java.lang.Exception
      • tryProcess2

        protected boolean tryProcess2(@Nonnull java.lang.Object item)
                               throws java.lang.Exception
        Tries to process the supplied input item, which was received from the edge with ordinal 2. May choose to process only partially and return false, in which case it will be called again later with the same item before any other processing method is called.

        The default implementation delegates to tryProcess(2, item).

        Parameters:
        item - item to be processed
        Returns:
        true if this item has now been processed, false otherwise.
        Throws:
        java.lang.Exception
      • tryProcess3

        protected boolean tryProcess3(@Nonnull java.lang.Object item)
                               throws java.lang.Exception
        Tries to process the supplied input item, which was received from the edge with ordinal 3. May choose to process only partially and return false, in which case it will be called again later with the same item before any other processing method is called.

        The default implementation delegates to tryProcess(3, item).

        Parameters:
        item - item to be processed
        Returns:
        true if this item has now been processed, false otherwise.
        Throws:
        java.lang.Exception
      • tryProcess4

        protected boolean tryProcess4(@Nonnull java.lang.Object item)
                               throws java.lang.Exception
        Tries to process the supplied input item, which was received from the edge with ordinal 4. May choose to process only partially and return false, in which case it will be called again later with the same item before any other processing method is called.

        The default implementation delegates to tryProcess(4, item).

        Parameters:
        item - item to be processed
        Returns:
        true if this item has now been processed, false otherwise.
        Throws:
        java.lang.Exception
      • restoreFromSnapshot

        protected void restoreFromSnapshot(@Nonnull java.lang.Object key, @Nonnull java.lang.Object value)
        Called to restore one key-value pair from the snapshot to the processor's internal state.

        The default implementation throws an UnsupportedOperationException, but it will not be called unless you override Processor.saveToSnapshot().

        Parameters:
        key - key of the entry from the snapshot
        value - value of the entry from the snapshot
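The Inbox-based restoreFromSnapshot boilerplate from the method summary polls each snapshot item, casts it to Map.Entry, and passes its key and value to this method. A simplified sketch, assuming a java.util.Deque in place of the snapshot Inbox. The class names are hypothetical.

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified model: a Deque stands in for the snapshot Inbox.
abstract class SnapshotRestorer {
    // Poll every item, cast it to Map.Entry and pass its key and value to the hook.
    public void restoreFromSnapshot(Deque<Object> inbox) {
        for (Object item; (item = inbox.poll()) != null; ) {
            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) item;
            restoreFromSnapshot(entry.getKey(), entry.getValue());
        }
    }

    // Mirrors the documented default: unsupported unless overridden.
    protected void restoreFromSnapshot(Object key, Object value) {
        throw new UnsupportedOperationException();
    }
}

// Keeps a running count per key and rebuilds it from snapshot entries.
class CountingProcessor extends SnapshotRestorer {
    final Map<Object, Long> counts = new HashMap<>();

    @Override
    protected void restoreFromSnapshot(Object key, Object value) {
        counts.put(key, (Long) value);
    }
}
```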
      • tryProcessWatermark

        public boolean tryProcessWatermark(@Nonnull Watermark watermark)
        This basic implementation only forwards the passed watermark.
        Specified by:
        tryProcessWatermark in interface Processor
        Parameters:
        watermark - watermark to be processed
        Returns:
        true if this watermark has now been processed, false to call this method again with the same watermark
      • getLogger

        protected final ILogger getLogger()
        Returns the logger associated with this processor instance.
      • getOutbox

        protected final Outbox getOutbox()
      • tryEmit

        @CheckReturnValue
        protected final boolean tryEmit(int ordinal, @Nonnull java.lang.Object item)
        Offers the item to the outbox bucket at the supplied ordinal.

        Emitted items should not be subsequently mutated because the same instance might be used by a downstream processor in a different thread, causing concurrent access.

        Returns:
        true, if the item was accepted. If false is returned, the call must be retried later with the same (or equal) item.
      • tryEmit

        @CheckReturnValue
        protected final boolean tryEmit(@Nonnull java.lang.Object item)
        Offers the item to all the outbox buckets (except the snapshot bucket).

        Emitted items should not be subsequently mutated because the same instance might be used by a downstream processor in a different thread, causing concurrent access.

        Returns:
        true, if the item was accepted. If false is returned, the call must be retried later with the same (or equal) item.
      • tryEmit

        @CheckReturnValue
        protected final boolean tryEmit(@Nonnull int[] ordinals, @Nonnull java.lang.Object item)
        Offers the item to the outbox buckets identified in the supplied array.

        Emitted items should not be subsequently mutated because the same instance might be used by a downstream processor in a different thread, causing concurrent access.

        Returns:
        true, if the item was accepted. If false is returned, the call must be retried later with the same (or equal) item.
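The retry rule means that a caller emitting several items must not advance past an item until tryEmit accepts it. A minimal sketch of that bookkeeping in a complete()-style method, assuming a hypothetical RoundLimitedOutbox that accepts a limited number of items per round. Both class names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical outbox bucket that accepts at most `capacityLeft` more items.
class RoundLimitedOutbox {
    final List<Object> accepted = new ArrayList<>();
    int capacityLeft;

    boolean offer(Object item) {
        if (capacityLeft == 0) return false;
        capacityLeft--;
        accepted.add(item);
        return true;
    }
}

// Emits a fixed list of results. The index advances only after tryEmit accepts an item,
// so a refused item is offered again, unchanged, on the next call.
class ResultEmitter {
    private final RoundLimitedOutbox outbox;
    private final List<String> results;
    private int next;

    ResultEmitter(RoundLimitedOutbox outbox, List<String> results) {
        this.outbox = outbox;
        this.results = results;
    }

    private boolean tryEmit(Object item) { return outbox.offer(item); }

    // Returns true once every result has been emitted.
    boolean complete() {
        while (next < results.size()) {
            if (!tryEmit(results.get(next))) {
                return false;
            }
            next++;
        }
        return true;
    }
}
```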
      • emitFromTraverser

        protected final <E> boolean emitFromTraverser(@Nonnull int[] ordinals, @Nonnull Traverser<E> traverser)
        Obtains items from the traverser and offers them to the outbox's buckets identified in the supplied array. If the outbox refuses an item, it backs off and returns false.

        Emitted items should not be subsequently mutated because the same instance might be used by a downstream processor in a different thread, causing concurrent access.

        If this method returns false, then the caller must retain the traverser and pass it again in the subsequent invocation of this method, so as to resume emitting where it left off.

        For simplified usage from tryProcess(ordinal, item) methods, see AbstractProcessor.FlatMapper.

        Parameters:
        ordinals - ordinals of the target buckets
        traverser - traverser over items to emit
        Returns:
        whether the traverser has been exhausted
      • emitFromTraverser

        protected final <E> boolean emitFromTraverser(int ordinal, @Nonnull Traverser<E> traverser)
        Obtains items from the traverser and offers them to the outbox bucket at the supplied ordinal. If the outbox refuses an item, it backs off and returns false.

        Do not mutate the items you emit because the downstream processor may be using them in a different thread, resulting in concurrent access.

        If this method returns false, then you must retain the traverser and pass it again in the subsequent invocation of this method, so as to resume emitting where you left off.

        For simplified usage in tryProcess(ordinal, item) methods, see AbstractProcessor.FlatMapper.

        Parameters:
        ordinal - ordinal of the target bucket
        traverser - traverser over items to emit
        Returns:
        whether the traverser has been exhausted
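One way to meet the resume requirement is to remember the single item the outbox refused, then offer it again before pulling anything new from the traverser. The sketch below models that with hypothetical stand-ins: SimpleTraverser, whose next() returns null once exhausted, and an in-memory bucket with a capacity counter. It illustrates the idea under those assumptions and is not Jet's source.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical Traverser stand-in: next() returns null once exhausted.
interface SimpleTraverser<T> {
    T next();

    static <T> SimpleTraverser<T> over(List<T> items) {
        Iterator<T> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }
}

class TraverserEmitter {
    final List<Object> accepted = new ArrayList<>();
    int capacityLeft;
    private Object pendingItem; // taken from the traverser but refused by the bucket

    private boolean tryEmit(Object item) {
        if (capacityLeft == 0) return false;
        capacityLeft--;
        accepted.add(item);
        return true;
    }

    // Returns true when the traverser is exhausted. On false the refused item is kept,
    // so the caller must pass the same traverser again to resume where it left off.
    <E> boolean emitFromTraverser(SimpleTraverser<E> traverser) {
        Object item = pendingItem != null ? pendingItem : traverser.next();
        pendingItem = null;
        for (; item != null; item = traverser.next()) {
            if (!tryEmit(item)) {
                pendingItem = item;
                return false;
            }
        }
        return true;
    }
}
```

Passing a fresh traverser after a false result would skip the items the old one still held, which is why the caller must retain it.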
      • tryEmitToSnapshot

        @CheckReturnValue
        protected final boolean tryEmitToSnapshot(@Nonnull java.lang.Object key, @Nonnull java.lang.Object value)
        Offers one key-value pair to the snapshot bucket.

        The type of the offered key determines which processors receive the key and value pair when it is restored. If the key is of type BroadcastKey, the entry will be restored to all processor instances. Otherwise, the key will be distributed according to default partitioning and only a single processor instance will receive the key.

        Keys and values offered to the snapshot are serialized and can be further mutated as soon as this method returns.

        Returns:
        true, if the item was accepted. If false is returned, the call must be retried later with the same (or equal) key and value.
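The routing rule can be illustrated with a small model that returns which processor instances would receive an entry on restore. Assumptions: Broadcast is a hypothetical marker standing in for BroadcastKey, and default partitioning is approximated by hashCode() modulo the instance count. That approximation is a simplification, not Jet's actual partitioning algorithm.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker standing in for BroadcastKey.
final class Broadcast {
    final Object key;
    Broadcast(Object key) { this.key = key; }
}

final class SnapshotRouter {
    // Broadcast keys go to every instance; any other key goes to exactly one instance,
    // chosen here by hashCode modulo the instance count (a stand-in for partitioning).
    static List<Integer> targets(Object key, int instanceCount) {
        List<Integer> result = new ArrayList<>();
        if (key instanceof Broadcast) {
            for (int i = 0; i < instanceCount; i++) result.add(i);
        } else {
            result.add(Math.floorMod(key.hashCode(), instanceCount));
        }
        return result;
    }
}
```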
      • emitFromTraverserToSnapshot

        protected final <T extends java.util.Map.Entry<?, ?>> boolean emitFromTraverserToSnapshot(@Nonnull Traverser<T> traverser)
        Obtains items from the traverser and offers them to the snapshot bucket of the outbox. Each item is a Map.Entry and its key and value are passed as the two arguments of tryEmitToSnapshot(Object, Object). If the outbox refuses an item, it backs off and returns false.

        Keys and values offered to the snapshot are serialized and can be further mutated as soon as this method returns.

        If this method returns false, then the caller must retain the traverser and pass it again in the subsequent invocation of this method, so as to resume emitting where it left off.

        The type of the offered key determines which processors receive the key and value pair when it is restored. If the key is of type BroadcastKey, the entry will be restored to all processor instances. Otherwise, the key will be distributed according to default partitioning and only a single processor instance will receive the key.

        Parameters:
        traverser - traverser over the items to emit to the snapshot
        Returns:
        whether the traverser has been exhausted
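A typical use is a saveToSnapshot() that is called repeatedly until it returns true. It creates a traverser over the processor's state on the first call, keeps it across calls, and discards it once exhausted. A minimal sketch with hypothetical stand-ins: EntryTraverser replaces Traverser, and a list with a capacity counter replaces the snapshot bucket.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical Traverser stand-in: next() returns null when exhausted.
interface EntryTraverser<T> { T next(); }

class SnapshottingCounter {
    final Map<String, Long> counts = new TreeMap<>();
    final List<Map.Entry<Object, Object>> snapshotBucket = new ArrayList<>();
    int snapshotCapacityLeft;

    private EntryTraverser<Map.Entry<String, Long>> snapshotTraverser; // kept across calls
    private Map.Entry<?, ?> pendingEntry; // entry refused by the snapshot bucket

    private boolean tryEmitToSnapshot(Object key, Object value) {
        if (snapshotCapacityLeft == 0) return false;
        snapshotCapacityLeft--;
        snapshotBucket.add(Map.entry(key, value)); // copies key and value into a new entry
        return true;
    }

    // Offers each entry's key and value as two arguments; keeps a refused entry pending.
    private <T extends Map.Entry<?, ?>> boolean emitFromTraverserToSnapshot(EntryTraverser<T> traverser) {
        Map.Entry<?, ?> entry = pendingEntry != null ? pendingEntry : traverser.next();
        pendingEntry = null;
        for (; entry != null; entry = traverser.next()) {
            if (!tryEmitToSnapshot(entry.getKey(), entry.getValue())) {
                pendingEntry = entry;
                return false;
            }
        }
        return true;
    }

    // Creates the traverser lazily and drops it when done, so the next snapshot starts fresh.
    boolean saveToSnapshot() {
        if (snapshotTraverser == null) {
            Iterator<Map.Entry<String, Long>> it = counts.entrySet().iterator();
            snapshotTraverser = () -> it.hasNext() ? it.next() : null;
        }
        boolean done = emitFromTraverserToSnapshot(snapshotTraverser);
        if (done) {
            snapshotTraverser = null;
        }
        return done;
    }
}
```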
      • flatMapper

        @Nonnull
        protected final <T, R> AbstractProcessor.FlatMapper<T, R> flatMapper(@Nonnull int[] ordinals, @Nonnull java.util.function.Function<? super T, ? extends Traverser<? extends R>> mapper)
        Factory of AbstractProcessor.FlatMapper. The FlatMapper will emit items to the ordinals identified in the array.
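The FlatMapper idea builds one output traverser per input item on the first attempt. It keeps that traverser across retries until every output item has been emitted, then reports the input item as processed. The sketch below is a simplified model under stated assumptions: ItemTraverser, CappedOutbox and FlatMapperSketch are hypothetical names, not the Jet types.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical Traverser stand-in: next() returns null once exhausted.
interface ItemTraverser<R> {
    R next();

    static <R> ItemTraverser<R> of(List<R> items) {
        Iterator<R> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }
}

// Hypothetical outbox bucket that accepts at most `capacityLeft` more items.
class CappedOutbox {
    final List<Object> accepted = new ArrayList<>();
    int capacityLeft;

    boolean offer(Object item) {
        if (capacityLeft == 0) return false;
        capacityLeft--;
        accepted.add(item);
        return true;
    }
}

class FlatMapperSketch<T, R> {
    private final CappedOutbox outbox;
    private final Function<? super T, ? extends ItemTraverser<? extends R>> mapper;
    private ItemTraverser<? extends R> outputTraverser; // non-null while an input is in progress
    private Object pendingItem; // output item the outbox refused

    FlatMapperSketch(CappedOutbox outbox, Function<? super T, ? extends ItemTraverser<? extends R>> mapper) {
        this.outbox = outbox;
        this.mapper = mapper;
    }

    // Returns true once all outputs for `item` are emitted; on false, call again with the same item.
    boolean tryProcess(T item) {
        if (outputTraverser == null) {
            outputTraverser = mapper.apply(item);
        }
        if (emitAll()) {
            outputTraverser = null;
            return true;
        }
        return false;
    }

    private boolean emitAll() {
        Object out = pendingItem != null ? pendingItem : outputTraverser.next();
        pendingItem = null;
        for (; out != null; out = outputTraverser.next()) {
            if (!outbox.offer(out)) {
                pendingItem = out;
                return false;
            }
        }
        return true;
    }
}
```

Keeping the traverser inside the helper is what lets tryProcess() stay a one-liner in the processor itself.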
      • keyedWatermarkCheck

        protected void keyedWatermarkCheck​(Watermark watermark)
        Throws UnsupportedOperationException if watermark has non-zero key.

        Intended for processors that don't function properly with keyed watermarks.
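A processor that cannot handle keyed watermarks can run this check first and then forward the watermark, matching the basic tryProcessWatermark behavior. A minimal sketch with hypothetical stand-ins: KeyedWm models a watermark whose key 0 means the default, unkeyed watermark, and the processor's outbox is a list that never refuses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical watermark model: key 0 stands for the default, unkeyed watermark.
final class KeyedWm {
    final long timestamp;
    final byte key;

    KeyedWm(long timestamp, byte key) {
        this.timestamp = timestamp;
        this.key = key;
    }
}

class UnkeyedOnlyProcessor {
    final List<Object> emitted = new ArrayList<>();

    // Rejects any watermark that carries a non-zero key.
    void keyedWatermarkCheck(KeyedWm watermark) {
        if (watermark.key != 0) {
            throw new UnsupportedOperationException("keyed watermarks are not supported, key=" + watermark.key);
        }
    }

    // Check first, then forward the watermark unchanged (this model's outbox never refuses).
    boolean tryProcessWatermark(KeyedWm watermark) {
        keyedWatermarkCheck(watermark);
        emitted.add(watermark);
        return true;
    }
}
```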