Class Edge

  • All Implemented Interfaces:
    DataSerializable, IdentifiedDataSerializable

    public class Edge
    extends java.lang.Object
    implements IdentifiedDataSerializable
    Represents an edge between two vertices in a DAG. Conceptually, data travels over the edge from the source vertex to the destination vertex. Practically, since each vertex is distributed across the cluster and across threads in each cluster member, the edge is implemented by a number of concurrent queues and network sender/receiver pairs.

    It is often desirable to arrange that all items belonging to the same collation key are received by the same processing unit (instance of Processor). This is achieved by configuring an appropriate Partitioner on the edge. The partitioner determines the partition ID of each item, and all items with the same partition ID are routed to the same Processor instance. Depending on the value of the edge's distributed property, that processor is unique either cluster-wide or only within each member.
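
    As a rough illustration of the routing described above, the following self-contained sketch maps a key to a partition ID and the partition ID to a processor index. It does not use the Jet API: the class PartitionRoutingModel, its method names, and the hash-modulo mapping are assumptions made for this example and are not Jet's actual partitioning strategy.

```java
import java.util.function.Function;

// Hypothetical model for illustration only; not part of the Jet API.
class PartitionRoutingModel<T> {
    private final Function<T, ?> extractKeyFn;
    private final int partitionCount;
    private final int processorCount;

    PartitionRoutingModel(Function<T, ?> extractKeyFn, int partitionCount, int processorCount) {
        this.extractKeyFn = extractKeyFn;
        this.partitionCount = partitionCount;
        this.processorCount = processorCount;
    }

    // Assumption: the partition ID is the key's hash code modulo the partition count.
    int partitionId(T item) {
        return Math.floorMod(extractKeyFn.apply(item).hashCode(), partitionCount);
    }

    // Items with equal partition IDs always map to the same processor index.
    int processorIndex(T item) {
        return partitionId(item) % processorCount;
    }
}
```

    With the first letter as the key, "apple" and "avocado" get the same partition ID and therefore reach the same processor index, whatever the other items do.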

    A newly instantiated Edge is non-distributed with a UNICAST routing policy.

    Since:
    Jet 3.0
    • Field Detail

      • DISTRIBUTE_TO_ALL

        public static final Address DISTRIBUTE_TO_ALL
        An address returned by getDistributedTo() denoting an edge that distributes the items among all members.
        Since:
        Jet 4.3
    • Constructor Detail

      • Edge

        protected Edge()
      • Edge

        protected Edge​(@Nonnull
                       Vertex source,
                       int sourceOrdinal,
                       Vertex destination,
                       int destOrdinal)
    • Method Detail

      • between

        @Nonnull
        public static Edge between​(@Nonnull
                                   Vertex source,
                                   @Nonnull
                                   Vertex destination)
        Returns an edge between two vertices. The ordinal of the edge is 0 at both ends. Equivalent to from(source).to(destination).
        Parameters:
        source - the source vertex
        destination - the destination vertex
      • from

        @Nonnull
        public static Edge from​(@Nonnull
                                Vertex source)
        Returns an edge with the given source vertex and no destination vertex. The ordinal of the edge is 0. Typically followed by one of the to() method calls.
      • from

        @Nonnull
        public static Edge from​(@Nonnull
                                Vertex source,
                                int ordinal)
        Returns an edge with the given source vertex at the given ordinal and no destination vertex. Typically followed by a call to one of the to() methods.
      • to

        @Nonnull
        public Edge to​(@Nonnull
                       Vertex destination)
        Sets the destination vertex of this edge, with ordinal 0.
      • to

        @Nonnull
        public Edge to​(@Nonnull
                       Vertex destination,
                       int ordinal)
        Sets the destination vertex and ordinal of this edge.
      • getSource

        @Nonnull
        public Vertex getSource()
        Returns this edge's source vertex.
      • getDestination

        @Nullable
        public Vertex getDestination()
        Returns this edge's destination vertex.
      • getSourceName

        @Nonnull
        public java.lang.String getSourceName()
        Returns the name of the source vertex.
      • getSourceOrdinal

        public int getSourceOrdinal()
        Returns the ordinal of the edge at the source vertex.
      • getDestName

        @Nullable
        public java.lang.String getDestName()
        Returns the name of the destination vertex.
      • getDestOrdinal

        public int getDestOrdinal()
        Returns the ordinal of the edge at the destination vertex.
      • priority

        @Nonnull
        public Edge priority​(int priority)
        Sets the priority of the edge. A lower number means higher priority and the default is 0.

        Example: there are two incoming edges on a vertex, with priorities 1 and 2. The data from the edge with priority 1 will be processed in full before any data is accepted from the edge with priority 2.
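
        The ordering in this example can be sketched with a small stand-alone model. It assumes finite inputs and one input per priority value; PriorityDrainModel and drain are hypothetical names, and the code only mimics the observable order, not how Jet implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model for illustration only; not part of the Jet API.
class PriorityDrainModel {
    /**
     * Consumes the inputs in ascending order of their priority number:
     * every item of a lower-numbered input is taken before any item
     * of a higher-numbered one.
     */
    static <T> List<T> drain(Map<Integer, List<T>> inputsByPriority) {
        // A TreeMap iterates its keys in ascending order, so the lowest
        // number (the highest priority) comes first.
        TreeMap<Integer, List<T>> ordered = new TreeMap<>(inputsByPriority);
        List<T> processed = new ArrayList<>();
        for (List<T> input : ordered.values()) {
            processed.addAll(input);
        }
        return processed;
    }
}
```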

        Possible deadlock

        If you split the output of one source vertex and later join the streams with different priorities, you're very likely to run into a deadlock. Consider this DAG:
         S --+---- V1 ----+--- J
              \          /
               +-- V2 --+
         
        The vertex J joins the streams that were originally split from the source S. Let's say the input from V1 has higher priority than the input from V2. In this case, J will not process any item from V2 before V1 completes, which presupposes that S also completes. But S cannot complete: it can't emit all its items to V2, because V2 is blocked by J, which is not processing V2's items. This is a deadlock.

        This DAG can work only if S emits as few items into both paths as can fit into the queues (see queue size configuration).
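
        The condition above can be checked with a simplified simulation. The sketch below is an assumption-laden model, not Jet code: each path from S to J is collapsed into a single bounded queue, S sends every item into both paths, and PriorityDeadlockModel and completes are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model for illustration only; not part of the Jet API.
class PriorityDeadlockModel {
    /** Returns true if the job completes, false if no actor can make progress. */
    static boolean completes(int itemCount, int queueCapacity) {
        Deque<Integer> highPriority = new ArrayDeque<>(); // S -> V1 -> J
        Deque<Integer> lowPriority = new ArrayDeque<>();  // S -> V2 -> J
        int emitted = 0;
        int consumedLow = 0;
        while (consumedLow < itemCount) {
            boolean progress = false;
            // S emits an item only if both paths have room for it.
            if (emitted < itemCount
                    && highPriority.size() < queueCapacity
                    && lowPriority.size() < queueCapacity) {
                highPriority.add(emitted);
                lowPriority.add(emitted);
                emitted++;
                progress = true;
            }
            // J always accepts items from the high-priority input.
            if (!highPriority.isEmpty()) {
                highPriority.poll();
                progress = true;
            }
            // J touches the low-priority input only after the high-priority
            // path is done, which requires S to have emitted everything.
            boolean highDone = emitted == itemCount && highPriority.isEmpty();
            if (highDone && !lowPriority.isEmpty()) {
                lowPriority.poll();
                consumedLow++;
                progress = true;
            }
            if (!progress) {
                return false; // nobody can move: deadlock
            }
        }
        return true;
    }
}
```

        In this model, completes(3, 4) returns true because all items fit into the low-priority queue, while completes(10, 4) returns false: S blocks on the full queue towards V2 and can never finish.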

        Note

        Using edges with different priorities postpones the first snapshot until the vertices upstream of the higher-priority edges have completed. Reason: after receiving a barrier on an edge, we stop processing items from that edge until the barrier has been received from all other edges. However, we also don't process lower-priority edges until the higher-priority edges are done, which prevents the barrier from being received on them, and this would in the end stall the job indefinitely. Technically this applies only to the EXACTLY_ONCE snapshot mode, but the snapshot is also postponed for AT_LEAST_ONCE jobs, because the snapshot wouldn't complete until all higher-priority edges are completed and it would increase the number of duplicately processed items.
      • getPriority

        public int getPriority()
        Returns the value of the edge's priority, as explained on priority(int).
      • unicast

        @Nonnull
        public Edge unicast()
        Chooses the UNICAST routing policy for this edge. This policy is the default.
      • partitioned

        @Nonnull
        public <T> Edge partitioned​(@Nonnull
                                    FunctionEx<T,​?> extractKeyFn)
        Activates the PARTITIONED routing policy and applies the default Hazelcast partitioning strategy. The strategy is applied to the result of the extractKeyFn function.
      • partitioned

        @Nonnull
        public <T,​K> Edge partitioned​(@Nonnull
                                            FunctionEx<T,​K> extractKeyFn,
                                            @Nonnull
                                            Partitioner<? super K> partitioner)
        Activates the PARTITIONED routing policy and applies the provided partitioning strategy. The strategy is applied to the result of the extractKeyFn function.
      • allToOne

        @Nonnull
        public Edge allToOne​(java.lang.Object key)
        Activates a special-cased PARTITIONED routing policy where all items will be routed to the same partition ID, determined from the given key. It means that all items will be directed to the same processor and other processors will be idle.

        It is equivalent to using partitioned(t -> key), with one small optimization: the partition ID is not recalculated for each stream item.
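
        The difference between the two variants can be sketched as follows. This is a stand-alone model under stated assumptions: AllToOneModel, its methods, the partition count of 271, and the hash-modulo mapping are all illustrative and do not reflect Jet's internal code.

```java
import java.util.function.ToIntFunction;

// Hypothetical model for illustration only; not part of the Jet API.
class AllToOneModel {
    static final int PARTITION_COUNT = 271; // illustrative value

    // Assumption: the partition ID is the key's hash code modulo the partition count.
    static int partitionOf(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    /** Models partitioned(t -> key): the partition ID is recomputed for every item. */
    static <T> ToIntFunction<T> constantKeyPartitioned(Object key) {
        return item -> partitionOf(key);
    }

    /** Models allToOne(key): the partition ID is computed once, up front. */
    static <T> ToIntFunction<T> allToOne(Object key) {
        int partitionId = partitionOf(key);
        return item -> partitionId;
    }
}
```

        Both functions return the same partition ID for every item; only the amount of work per item differs.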

      • broadcast

        @Nonnull
        public Edge broadcast()
        Activates the BROADCAST routing policy.
      • isolated

        @Nonnull
        public Edge isolated()
        Activates the ISOLATED routing policy which establishes isolated paths from upstream to downstream processors.

        Since all traffic will be local, this policy is not allowed on a distributed edge.

      • ordered

        public Edge ordered​(@Nonnull
                            ComparatorEx<?> comparator)
        Specifies that the data traveling on this edge is ordered according to the provided comparator. The edge maintains this property when merging the data coming from different upstream processors, so that the receiving processor observes them in the proper order. Every upstream processor must emit the data in the same order because the edge doesn't sort, it only prevents reordering while receiving.

        The implementation currently doesn't handle watermarks or barriers: if the source processors emit watermarks or you add a processing guarantee, the job will fail at runtime.

        Since:
        Jet 4.3
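        The statement that the edge only prevents reordering, and does not sort, can be illustrated with a plain k-way merge. The sketch below is a stand-alone model, not Jet's implementation; OrderedMergeModel, Head, and merge are hypothetical names, and the upstream processors are modeled as finite lists.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model for illustration only; not part of the Jet API.
class OrderedMergeModel {
    /** Pairs the next unread item of one upstream with the rest of that upstream. */
    private static final class Head<T> {
        final T item;
        final Iterator<T> rest;

        Head(T item, Iterator<T> rest) {
            this.item = item;
            this.rest = rest;
        }
    }

    /** Merges the upstreams by always taking the smallest of their current heads. */
    static <T> List<T> merge(List<List<T>> upstreams, Comparator<? super T> comparator) {
        Comparator<Head<T>> byItem = (a, b) -> comparator.compare(a.item, b.item);
        PriorityQueue<Head<T>> heads = new PriorityQueue<>(byItem);
        for (List<T> upstream : upstreams) {
            Iterator<T> it = upstream.iterator();
            if (it.hasNext()) {
                heads.add(new Head<>(it.next(), it));
            }
        }
        List<T> received = new ArrayList<>();
        while (!heads.isEmpty()) {
            Head<T> smallest = heads.poll();
            received.add(smallest.item);
            // Only the head of each upstream is compared; items behind it are never sorted.
            if (smallest.rest.hasNext()) {
                heads.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        return received;
    }
}
```

        Merging the sorted upstreams [1, 4, 7] and [2, 3, 9] yields [1, 2, 3, 4, 7, 9]. If an upstream breaks the order, for example [3, 1] merged with [2], the model yields [2, 3, 1]: the merge does not repair an unsorted input.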
      • fanout

        @Nonnull
        public Edge fanout()
        Activates the FANOUT routing policy.
        Since:
        Jet 4.4
      • getPartitioner

        @Nullable
        public Partitioner<?> getPartitioner()
        Returns the instance encapsulating the partitioning strategy in effect on this edge.
      • isLocal

        public boolean isLocal()
        Returns whether the edge is local.
      • distributed

        @Nonnull
        public Edge distributed()
        Declares that the edge is distributed. A non-distributed edge only transfers data within the same member. If the data source running on local member is distributed (produces only a slice of all the data on any given member), the local processors will not observe all the data. The same holds true when the data originates from an upstream distributed edge.

        A distributed edge allows all the data to be observed by all the processors (using the BROADCAST routing policy) and, more attractively, all the data with a given partition ID to be observed by the same unique processor, regardless of whether it is running on the local or a remote member (using the PARTITIONED routing policy).

        See Also:
        distributeTo(Address), local(), getDistributedTo()
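        The effect of the distributed property on a partitioned edge can be sketched with a minimal model. It is not Jet code: EdgeLocalityModel and receivingMember are hypothetical names, members are identified by index, and the round-robin assignment of partitions to members is an assumption made for the example.

```java
// Hypothetical model for illustration only; not part of the Jet API.
class EdgeLocalityModel {
    /**
     * Returns the index of the member whose processor receives an item
     * sent over a partitioned edge.
     */
    static int receivingMember(int originMember, int partitionId,
                               int memberCount, boolean distributed) {
        if (!distributed) {
            // Non-distributed edge: the data never leaves the member it is on.
            return originMember;
        }
        // Assumption: partitions are assigned to members round-robin.
        return partitionId % memberCount;
    }
}
```

        In this model, two items with partition ID 5 that originate on members 0 and 2 of a three-member cluster both arrive at member 2 when the edge is distributed, but stay on members 0 and 2 respectively when it is not.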
      • distributeTo

        @Nonnull
        public Edge distributeTo​(@Nonnull
                                 Address targetMember)
        Declares that all items sent over this edge will be delivered to the specified member. Processors on other members will not receive any data.

        This option is most useful for sinks, when we want to ensure that the results are written (or sent) from only that member.

        It's not suitable for fault-tolerant jobs. If the targetMember is not a member of the cluster, the job can't be executed and will fail.

        Parameters:
        targetMember - the member to deliver the items to
        Since:
        Jet 4.3
        See Also:
        distributed(), local(), getDistributedTo()
      • isDistributed

        public boolean isDistributed()
        Returns whether this edge distributes items among all members, as requested by the distributed() method.
      • getConfig

        @Nullable
        public EdgeConfig getConfig()
        Returns the EdgeConfig instance associated with this edge. Default value is null.
      • toString

        @Nonnull
        public java.lang.String toString()
        Overrides:
        toString in class java.lang.Object
      • equals

        public boolean equals​(java.lang.Object obj)
        Overrides:
        equals in class java.lang.Object
      • hashCode

        public int hashCode()
        Overrides:
        hashCode in class java.lang.Object
      • writeData

        public void writeData​(@Nonnull
                              ObjectDataOutput out)
                       throws java.io.IOException
        Description copied from interface: DataSerializable
        Writes object fields to output stream
        Specified by:
        writeData in interface DataSerializable
        Parameters:
        out - output
        Throws:
        java.io.IOException - if an I/O error occurs. In particular, an IOException may be thrown if the output stream has been closed.
      • readData

        public void readData​(@Nonnull
                             ObjectDataInput in)
                      throws java.io.IOException
        Description copied from interface: DataSerializable
        Reads fields from the input stream
        Specified by:
        readData in interface DataSerializable
        Parameters:
        in - input
        Throws:
        java.io.IOException - if an I/O error occurs. In particular, an IOException may be thrown if the input stream has been closed.