Class Edge

java.lang.Object
com.hazelcast.jet.core.Edge
All Implemented Interfaces:
DataSerializable, IdentifiedDataSerializable

public class Edge extends Object implements IdentifiedDataSerializable
Represents an edge between two vertices in a DAG. Conceptually, data travels over the edge from the source vertex to the destination vertex. Practically, since the vertex is distributed across the cluster and across threads in each cluster member, the edge is implemented by a number of concurrent queues and network sender/receiver pairs.

It is often desirable to arrange that all items belonging to the same collation key are received by the same processing unit (instance of Processor). This is achieved by configuring an appropriate Partitioner on the edge. The partitioner determines the partition ID of each item, and all items with the same partition ID are routed to the same Processor instance. Depending on the value of the edge's distributed property, that processor will be unique cluster-wide, or only within each member.

A newly instantiated Edge is non-distributed with a UNICAST routing policy.
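
The routing rule described above can be sketched in plain Java. This is a stand-alone model, not Hazelcast code: the class PartitionRoutingSketch, the partition count of 271, and the modulo mapping from partition ID to processor are all assumptions made for illustration. It only shows that items whose keys are equal always land in the same processor's inbox.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-alone model; none of these names are Hazelcast classes.
class PartitionRoutingSketch {
    // Assumed partition count for this sketch (not read from any real configuration).
    static final int PARTITION_COUNT = 271;

    // Maps a key to a partition ID; floorMod keeps the result non-negative.
    static int partitionId(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    // Assumed mapping from a partition ID to one of processorCount processors.
    static int processorIndex(int partitionId, int processorCount) {
        return partitionId % processorCount;
    }

    // Routes each item to the inbox of the processor that owns its key's partition.
    static <T> List<List<T>> route(List<T> items, Function<T, ?> extractKeyFn, int processorCount) {
        List<List<T>> inboxes = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            inboxes.add(new ArrayList<>());
        }
        for (T item : items) {
            int pid = partitionId(extractKeyFn.apply(item));
            inboxes.get(processorIndex(pid, processorCount)).add(item);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> words = List.of("apple", "avocado", "banana", "blueberry", "cherry");
        // Key by first letter, so words sharing a first letter share a processor.
        List<List<String>> inboxes = route(words, w -> w.charAt(0), 3);
        for (int i = 0; i < inboxes.size(); i++) {
            System.out.println("processor " + i + ": " + inboxes.get(i));
        }
    }
}
```

On a real edge, the key extractor plays the role of the extractKeyFn argument of partitioned(FunctionEx), and the partition ID comes from the configured Partitioner rather than from hashCode().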

Since:
Jet 3.0
  • Field Details

    • DISTRIBUTE_TO_ALL

      public static final Address DISTRIBUTE_TO_ALL
      An address returned by getDistributedTo() denoting an edge that distributes the items among all members.
      Since:
      Jet 4.3
  • Constructor Details

    • Edge

      protected Edge()
    • Edge

      protected Edge(@Nonnull Vertex source, int sourceOrdinal, Vertex destination, int destOrdinal)
  • Method Details

    • between

      @Nonnull public static Edge between(@Nonnull Vertex source, @Nonnull Vertex destination)
      Returns an edge between two vertices. The ordinal of the edge is 0 at both ends. Equivalent to from(source).to(destination).
      Parameters:
      source - the source vertex
      destination - the destination vertex
    • from

      @Nonnull public static Edge from(@Nonnull Vertex source)
      Returns an edge with the given source vertex and no destination vertex. The ordinal of the edge is 0. Typically followed by one of the to() method calls.
    • from

      @Nonnull public static Edge from(@Nonnull Vertex source, int ordinal)
      Returns an edge with the given source vertex at the given ordinal and no destination vertex. Typically followed by a call to one of the to() methods.
    • to

      @Nonnull public Edge to(@Nonnull Vertex destination)
      Sets the destination vertex of this edge, with ordinal 0.
    • to

      @Nonnull public Edge to(@Nonnull Vertex destination, int ordinal)
      Sets the destination vertex and ordinal of this edge.
    • getSource

      @Nonnull public Vertex getSource()
      Returns this edge's source vertex.
    • getDestination

      @Nullable public Vertex getDestination()
      Returns this edge's destination vertex.
    • getSourceName

      @Nonnull public String getSourceName()
      Returns the name of the source vertex.
    • getSourceOrdinal

      public int getSourceOrdinal()
      Returns the ordinal of the edge at the source vertex.
    • getDestName

      @Nullable public String getDestName()
      Returns the name of the destination vertex.
    • getDestOrdinal

      public int getDestOrdinal()
      Returns the ordinal of the edge at the destination vertex.
    • priority

      @Nonnull public Edge priority(int priority)
      Sets the priority of the edge. A lower number means higher priority and the default is 0.

      Example: there are two incoming edges on a vertex, with priorities 1 and 2. The data from the edge with priority 1 will be processed in full before any data is accepted from the edge with priority 2.
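
The ordering rule in this example can be modeled with a few lines of plain Java. The class EdgePrioritySketch and its method are hypothetical and not part of the library; the sketch assumes each input is finite and keeps one input per priority value, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of priority-ordered consumption; not Hazelcast code.
class EdgePrioritySketch {
    // Drains the inputs in ascending order of their priority number,
    // so a lower number (higher priority) is processed in full first.
    static <T> List<T> drainByPriority(Map<Integer, List<T>> inputsByPriority) {
        List<T> processed = new ArrayList<>();
        // TreeMap iterates its keys in ascending order.
        for (List<T> input : new TreeMap<>(inputsByPriority).values()) {
            processed.addAll(input);
        }
        return processed;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> inputs = Map.of(
                2, List.of("x", "y"),
                1, List.of("a", "b"));
        // Items of the priority-1 input come before any item of the priority-2 input.
        System.out.println(drainByPriority(inputs));
    }
}
```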

      Possible deadlock

      If you split the output of one source vertex and later join the streams with different priorities, you're very likely to run into a deadlock. Consider this DAG:
       S --+---- V1 ----+--- J
            \          /
             +-- V2 --+
       
      The vertex J joins the streams that were originally split from source S. Let's say the input from V1 has higher priority than the input from V2. In this case, no item from V2 will be processed by J before V1 completes, which presupposes that S also completes. But S cannot complete: it can't emit all of its items to V2, because V2 is blocked by J, which is not processing V2's items. This is a deadlock.

      This DAG can work only if S emits no more items into each path than can fit into the queues (see queue size configuration).
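
The deadlock condition can be checked with a small single-threaded model. Everything here is an assumption made for illustration: the class PriorityDeadlockSketch, the single bounded queue per path, and the rule that S emits each item to both paths at once. It is not how the engine is implemented; it only reproduces the reasoning above.

```java
// Hypothetical step-by-step model of the S -> {V1, V2} -> J diagram; not Hazelcast code.
class PriorityDeadlockSketch {
    // Returns true if all items reach J, false if the model stalls.
    static boolean completes(int itemCount, int queueCapacity) {
        int emitted = 0;   // items S has emitted so far
        int highQueue = 0; // items waiting on the path through V1 (higher priority)
        int lowQueue = 0;  // items waiting on the path through V2 (lower priority)
        while (true) {
            boolean progress = false;
            // S emits the next item to both paths, but only if both queues have room.
            if (emitted < itemCount && highQueue < queueCapacity && lowQueue < queueCapacity) {
                emitted++;
                highQueue++;
                lowQueue++;
                progress = true;
            }
            // J always accepts items from the higher-priority input.
            if (highQueue > 0) {
                highQueue--;
                progress = true;
            }
            // J accepts the lower-priority input only once the higher-priority
            // input is complete, which requires S to have emitted everything.
            boolean highDone = emitted == itemCount && highQueue == 0;
            if (highDone && lowQueue > 0) {
                lowQueue--;
                progress = true;
            }
            if (highDone && lowQueue == 0) {
                return true;
            }
            if (!progress) {
                return false; // nobody can move: the deadlock described above
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("3 items, capacity 4: " + completes(3, 4));
        System.out.println("10 items, capacity 4: " + completes(10, 4));
    }
}
```

In this model the job completes exactly when the item count does not exceed the queue capacity, which matches the condition stated above.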

      Note

      Edges with different priorities postpone the first snapshot until the upstream vertices of the higher-priority edges have completed. Reason: after receiving a barrier on an edge, we stop processing items on that edge until the barrier is received from all other edges. However, we also don't process lower-priority edges until the higher-priority edges are done, which prevents the barrier from being received on them and would stall the job indefinitely. Technically this applies only to the EXACTLY_ONCE snapshot mode, but the snapshot is also postponed for AT_LEAST_ONCE jobs, because it wouldn't complete until all higher-priority edges have completed and would increase the number of items processed more than once.
    • getPriority

      public int getPriority()
      Returns the value of the edge's priority, as explained on priority(int).
    • unicast

      @Nonnull public Edge unicast()
      Chooses the UNICAST routing policy for this edge. This policy is the default.
    • partitioned

      @Nonnull public <T> Edge partitioned(@Nonnull FunctionEx<T,?> extractKeyFn)
      Activates the PARTITIONED routing policy and applies the default Hazelcast partitioning strategy. The strategy is applied to the result of the extractKeyFn function.
    • partitioned

      @Nonnull public <T, K> Edge partitioned(@Nonnull FunctionEx<T,K> extractKeyFn, @Nonnull Partitioner<? super K> partitioner)
      Activates the PARTITIONED routing policy and applies the provided partitioning strategy. The strategy is applied to the result of the extractKeyFn function.
    • allToOne

      @Nonnull public Edge allToOne(Object key)
      Activates a special-cased PARTITIONED routing policy where all items will be routed to the same partition ID, determined from the given key. It means that all items will be directed to the same processor and other processors will be idle.

      It is equivalent to using partitioned(t -> key), but it has a small optimization that the partition ID is not recalculated for each stream item.

    • allToOne

      @Nonnull public Edge allToOne()
      Activates a special-cased PARTITIONED routing policy where all items will be routed to the same partition ID. It means that all items will be directed to the same processor and other processors will be idle.
      Since:
      5.4
    • broadcast

      @Nonnull public Edge broadcast()
      Activates the BROADCAST routing policy.
    • isolated

      @Nonnull public Edge isolated()
      Activates the ISOLATED routing policy which establishes isolated paths from upstream to downstream processors.

      Since all traffic will be local, this policy is not allowed on a distributed edge.

    • ordered

      public Edge ordered(@Nonnull ComparatorEx<?> comparator)
      Specifies that the data traveling on this edge is ordered according to the provided comparator. The edge maintains this property when merging the data coming from different upstream processors, so that the receiving processor observes them in the proper order. Every upstream processor must emit the data in the same order because the edge doesn't sort, it only prevents reordering while receiving.

      The implementation currently doesn't handle watermarks or barriers: if the source processors emit watermarks or you add a processing guarantee, the job will fail at runtime.

      Since:
      Jet 4.3
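
The behavior described for ordered(ComparatorEx) resembles a k-way merge of streams that are already sorted. The following plain-Java sketch shows that technique under stated assumptions: OrderedMergeSketch and its Head helper are hypothetical names, the upstreams are finite lists, and this is not the engine's implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge model; not Hazelcast code.
class OrderedMergeSketch {
    // The next pending item of one upstream, plus the rest of that upstream.
    private static final class Head<T> {
        final T item;
        final Iterator<T> rest;

        Head(T item, Iterator<T> rest) {
            this.item = item;
            this.rest = rest;
        }
    }

    // Merges the upstreams by repeatedly taking the smallest pending head.
    // It never sorts within an upstream, so unsorted input stays unsorted.
    static <T> List<T> merge(List<List<T>> upstreams, Comparator<? super T> comparator) {
        Comparator<Head<T>> byItem = (a, b) -> comparator.compare(a.item, b.item);
        PriorityQueue<Head<T>> heads = new PriorityQueue<>(byItem);
        for (List<T> upstream : upstreams) {
            Iterator<T> it = upstream.iterator();
            if (it.hasNext()) {
                heads.add(new Head<>(it.next(), it));
            }
        }
        List<T> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            Head<T> smallest = heads.poll();
            merged.add(smallest.item);
            if (smallest.rest.hasNext()) {
                heads.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Integer>> upstreams = List.of(List.of(1, 4, 7), List.of(2, 5), List.of(3, 6));
        System.out.println(merge(upstreams, Comparator.<Integer>naturalOrder()));
    }
}
```

Passing an upstream that is not sorted, such as a single list holding 3 then 1, returns the items in their original order, which illustrates why every upstream processor must already emit in comparator order.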
    • fanout

      @Nonnull public Edge fanout()
      Activates the FANOUT routing policy.
      Since:
      Jet 4.4
    • getPartitioner

      @Nullable public Partitioner<?> getPartitioner()
      Returns the instance encapsulating the partitioning strategy in effect on this edge.
    • getOrderComparator

      @Nullable public ComparatorEx<?> getOrderComparator()
      Returns the comparator defined on this edge using ordered(ComparatorEx).
      Since:
      Jet 4.3
    • getRoutingPolicy

      @Nonnull public Edge.RoutingPolicy getRoutingPolicy()
      Returns the Edge.RoutingPolicy in effect on the edge.
    • local

      @Nonnull public Edge local()
      Declares that the edge is local. A local edge only transfers data within the same member; the network is not involved. This setting is the default.
      Since:
      Jet 4.3
    • isLocal

      public boolean isLocal()
      Returns whether the edge is local.
    • distributed

      @Nonnull public Edge distributed()
      Declares that the edge is distributed. A non-distributed edge only transfers data within the same member. If the data source running on the local member is distributed (produces only a slice of all the data on any given member), the local processors will not observe all the data. The same holds true when the data originates from an upstream distributed edge.

      A distributed edge allows all the data to be observed by all the processors (using the BROADCAST routing policy) and, more attractively, all the data with a given partition ID to be observed by the same unique processor, regardless of whether it is running on the local or a remote member (using the PARTITIONED routing policy).
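
The difference between a local and a distributed edge under the PARTITIONED policy can be illustrated with a plain-Java model. The class LocalVsDistributedSketch, the partition count of 271, and the arithmetic that picks a member and a processor are assumptions for this sketch only; the real assignment of partitions to members and processors is decided by the engine.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical routing model; not Hazelcast code.
class LocalVsDistributedSketch {
    // Assumed partition count for this sketch.
    static final int PARTITION_COUNT = 271;

    static int partitionId(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    // Collects every processor that receives the given key when an item
    // with that key is produced once on each member of the cluster.
    static Set<String> destinations(Object key, int memberCount, int processorsPerMember,
                                    boolean distributed) {
        int pid = partitionId(key);
        Set<String> result = new TreeSet<>();
        for (int producer = 0; producer < memberCount; producer++) {
            // Local edge: the item stays on the member that produced it.
            // Distributed edge: the partition ID alone selects the member.
            int member = distributed ? pid % memberCount : producer;
            int processor = distributed
                    ? (pid / memberCount) % processorsPerMember
                    : pid % processorsPerMember;
            result.add("member" + member + "/processor" + processor);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("local:       " + destinations("orderId-42", 3, 2, false));
        System.out.println("distributed: " + destinations("orderId-42", 3, 2, true));
    }
}
```

In the model, a local edge yields one destination per member for the same key, while a distributed edge yields a single destination cluster-wide.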

    • distributeTo

      @Nonnull public Edge distributeTo(@Nonnull Address targetMember)
      Declares that all items sent over this edge will be delivered to the specified member. Processors on other members will not receive any data.

      This option is most useful for sinks, when we want to ensure that the results are written on (or sent from) only that member.

      It's not suitable for fault-tolerant jobs. If the targetMember is not a member of the cluster, the job can't be executed and will fail.

      Parameters:
      targetMember - the member to deliver the items to
      Since:
      Jet 4.3
    • getDistributedTo

      @Nullable public Address getDistributedTo()
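      Returns the address this edge distributes items to. Possible return values: null for a local edge (after a local() call), DISTRIBUTE_TO_ALL for an edge that distributes the items among all members (after a distributed() call), or the address of a specific member (after a distributeTo(Address) call).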
      Since:
      Jet 4.3
    • isDistributed

      public boolean isDistributed()
      Says whether this edge distributes items among all members, as requested by the distributed() method.
    • getConfig

      @Nullable public EdgeConfig getConfig()
      Returns the EdgeConfig instance associated with this edge. Default value is null.
    • setConfig

      @Nonnull public Edge setConfig(@Nullable EdgeConfig config)
      Assigns an EdgeConfig to this edge. If null is supplied, the edge will use JetConfig.getDefaultEdgeConfig().
    • toString

      @Nonnull public String toString()
      Overrides:
      toString in class Object
    • equals

      public boolean equals(Object obj)
      Overrides:
      equals in class Object
    • hashCode

      public int hashCode()
      Overrides:
      hashCode in class Object
    • writeData

      public void writeData(@Nonnull ObjectDataOutput out) throws IOException
      Description copied from interface: DataSerializable
      Writes object fields to output stream
      Specified by:
      writeData in interface DataSerializable
      Parameters:
      out - output
      Throws:
      IOException - if an I/O error occurs. In particular, an IOException may be thrown if the output stream has been closed.
    • readData

      public void readData(@Nonnull ObjectDataInput in) throws IOException
      Description copied from interface: DataSerializable
      Reads fields from the input stream
      Specified by:
      readData in interface DataSerializable
      Parameters:
      in - input
      Throws:
      IOException - if an I/O error occurs. In particular, an IOException may be thrown if the input stream has been closed.
    • getFactoryId

      public int getFactoryId()
      Description copied from interface: IdentifiedDataSerializable
      Returns DataSerializableFactory factory ID for this class.
      Specified by:
      getFactoryId in interface IdentifiedDataSerializable
      Returns:
      factory ID
    • getClassId

      public int getClassId()
      Description copied from interface: IdentifiedDataSerializable
      Returns type identifier for this class. It should be unique per DataSerializableFactory.
      Specified by:
      getClassId in interface IdentifiedDataSerializable
      Returns:
      type ID