Class Edge
- java.lang.Object
-
- com.hazelcast.jet.core.Edge
-
- All Implemented Interfaces:
DataSerializable, IdentifiedDataSerializable
public class Edge extends java.lang.Object implements IdentifiedDataSerializable
Represents an edge between two vertices in a DAG. Conceptually, data travels over the edge from the source vertex to the destination vertex. Practically, since the vertex is distributed across the cluster and across threads in each cluster member, the edge is implemented by a number of concurrent queues and network sender/receiver pairs.
It is often desirable to arrange that all items belonging to the same collation key are received by the same processing unit (instance of Processor). This is achieved by configuring an appropriate Partitioner on the edge. The partitioner determines the partition ID of each item, and all items with the same partition ID are routed to the same Processor instance. Depending on the value of the edge's distributed property, the processor will be unique cluster-wide or only within each member.
A newly instantiated Edge is non-distributed with a UNICAST routing policy.
- Since:
- Jet 3.0
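The routing rule described above can be illustrated with a minimal plain-Java sketch. It does not use Jet's Partitioner or Processor types: the class PartitionRoutingSketch, its methods, the partition count of 271, and the use of hashCode() are all assumptions made for illustration, and the actual partitioning strategy may compute partition IDs differently. The sketch only shows the invariant that items with the same key get the same partition ID and therefore land in the same inbox.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of partitioned routing; not Jet's implementation.
final class PartitionRoutingSketch {
    // Illustrative partition count (an assumption of this sketch).
    static final int PARTITION_COUNT = 271;

    // Stand-in for a partitioner: maps a key to a partition ID.
    static int partitionId(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    // Maps a partition ID to one of the downstream processor inboxes.
    static int processorIndex(int partitionId, int processorCount) {
        return partitionId % processorCount;
    }

    // Routes each item to the inbox selected by the partition ID of its key.
    static <T> List<List<T>> route(List<T> items, Function<T, ?> extractKeyFn, int processorCount) {
        List<List<T>> inboxes = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            inboxes.add(new ArrayList<>());
        }
        for (T item : items) {
            int pid = partitionId(extractKeyFn.apply(item));
            inboxes.get(processorIndex(pid, processorCount)).add(item);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> words = List.of("apple", "avocado", "banana", "blueberry", "apple");
        // Key = first letter, so all words starting with the same letter share an inbox.
        List<List<String>> inboxes = route(words, w -> w.charAt(0), 4);
        for (int i = 0; i < inboxes.size(); i++) {
            System.out.println("processor " + i + " -> " + inboxes.get(i));
        }
    }
}
```

In this model the inboxes stand for processors on a single member. Whether the same guarantee holds cluster-wide depends on the edge's distributed property, as described above.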
-
-
Nested Class Summary
| Modifier and Type | Class | Description |
|---|---|---|
| static class | Edge.RoutingPolicy | An edge describes a connection from many upstream processors to many downstream processors. |
-
Field Summary
| Modifier and Type | Field | Description |
|---|---|---|
| static Address | DISTRIBUTE_TO_ALL | An address returned by getDistributedTo() denoting an edge that distributes the items among all members. |
-
Method Summary
| Modifier and Type | Method | Description |
|---|---|---|
| Edge | allToOne(java.lang.Object key) | Activates a special-cased PARTITIONED routing policy where all items will be routed to the same partition ID, determined from the given key. |
| static Edge | between(Vertex source, Vertex destination) | Returns an edge between two vertices. |
| Edge | broadcast() | Activates the BROADCAST routing policy. |
| Edge | distributed() | Declares that the edge is distributed. |
| Edge | distributeTo(Address targetMember) | Declares that all items sent over this edge will be delivered to the specified member. |
| boolean | equals(java.lang.Object obj) | |
| Edge | fanout() | Activates the FANOUT routing policy. |
| static Edge | from(Vertex source) | Returns an edge with the given source vertex and no destination vertex. |
| static Edge | from(Vertex source, int ordinal) | Returns an edge with the given source vertex at the given ordinal and no destination vertex. |
| int | getClassId() | Returns type identifier for this class. |
| EdgeConfig | getConfig() | Returns the EdgeConfig instance associated with this edge. |
| Vertex | getDestination() | Returns this edge's destination vertex. |
| java.lang.String | getDestName() | Returns the name of the destination vertex. |
| int | getDestOrdinal() | Returns the ordinal of the edge at the destination vertex. |
| Address | getDistributedTo() | Possible return values: null - route only to local members (after a local() call); "255.255.255.255:0" - route to all members (after a distributed() call); else - route to specific member (after a distributeTo(com.hazelcast.cluster.Address) call). |
| int | getFactoryId() | Returns DataSerializableFactory factory ID for this class. |
| ComparatorEx<?> | getOrderComparator() | Returns the comparator defined on this edge using ordered(ComparatorEx). |
| Partitioner<?> | getPartitioner() | Returns the instance encapsulating the partitioning strategy in effect on this edge. |
| int | getPriority() | Returns the value of the edge's priority, as explained on priority(int). |
| Edge.RoutingPolicy | getRoutingPolicy() | Returns the Edge.RoutingPolicy in effect on the edge. |
| Vertex | getSource() | Returns this edge's source vertex. |
| java.lang.String | getSourceName() | Returns the name of the source vertex. |
| int | getSourceOrdinal() | Returns the ordinal of the edge at the source vertex. |
| int | hashCode() | |
| boolean | isDistributed() | Says whether this edge distributes items among all members, as requested by the distributed() method. |
| boolean | isLocal() | Returns whether the edge is local. |
| Edge | isolated() | Activates the ISOLATED routing policy which establishes isolated paths from upstream to downstream processors. |
| Edge | local() | Declares that the edge is local. |
| Edge | ordered(ComparatorEx<?> comparator) | Specifies that the data traveling on this edge is ordered according to the provided comparator. |
| <T> Edge | partitioned(FunctionEx<T,?> extractKeyFn) | Activates the PARTITIONED routing policy and applies the default Hazelcast partitioning strategy. |
| <T,K> Edge | partitioned(FunctionEx<T,K> extractKeyFn, Partitioner<? super K> partitioner) | Activates the PARTITIONED routing policy and applies the provided partitioning strategy. |
| Edge | priority(int priority) | Sets the priority of the edge. |
| void | readData(ObjectDataInput in) | Reads fields from the input stream. |
| Edge | setConfig(EdgeConfig config) | Assigns an EdgeConfig to this edge. |
| Edge | to(Vertex destination) | Sets the destination vertex of this edge, with ordinal 0. |
| Edge | to(Vertex destination, int ordinal) | Sets the destination vertex and ordinal of this edge. |
| java.lang.String | toString() | |
| Edge | unicast() | Chooses the UNICAST routing policy for this edge. |
| void | writeData(ObjectDataOutput out) | Writes object fields to output stream. |
-
-
-
Field Detail
-
DISTRIBUTE_TO_ALL
public static final Address DISTRIBUTE_TO_ALL
An address returned by getDistributedTo() denoting an edge that distributes the items among all members.
- Since:
- Jet 4.3
-
-
Method Detail
-
between
@Nonnull public static Edge between(@Nonnull Vertex source, @Nonnull Vertex destination)
Returns an edge between two vertices. The ordinal of the edge is 0 at both ends. Equivalent to from(source).to(destination).
- Parameters:
source - the source vertex
destination - the destination vertex
-
from
@Nonnull public static Edge from(@Nonnull Vertex source)
Returns an edge with the given source vertex and no destination vertex. The ordinal of the edge is 0. Typically followed by one of the to() method calls.
-
from
@Nonnull public static Edge from(@Nonnull Vertex source, int ordinal)
Returns an edge with the given source vertex at the given ordinal and no destination vertex. Typically followed by a call to one of the to() methods.
-
to
@Nonnull public Edge to(@Nonnull Vertex destination)
Sets the destination vertex of this edge, with ordinal 0.
-
to
@Nonnull public Edge to(@Nonnull Vertex destination, int ordinal)
Sets the destination vertex and ordinal of this edge.
-
getSource
@Nonnull public Vertex getSource()
Returns this edge's source vertex.
-
getDestination
@Nullable public Vertex getDestination()
Returns this edge's destination vertex.
-
getSourceName
@Nonnull public java.lang.String getSourceName()
Returns the name of the source vertex.
-
getSourceOrdinal
public int getSourceOrdinal()
Returns the ordinal of the edge at the source vertex.
-
getDestName
@Nullable public java.lang.String getDestName()
Returns the name of the destination vertex.
-
getDestOrdinal
public int getDestOrdinal()
Returns the ordinal of the edge at the destination vertex.
-
priority
@Nonnull public Edge priority(int priority)
Sets the priority of the edge. A lower number means higher priority and the default is 0.
Example: there are two incoming edges on a vertex, with priorities 1 and 2. The data from the edge with priority 1 will be processed in full before accepting any data from the edge with priority 2.
Possible deadlock
If you split the output of one source vertex and later join the streams with different priorities, you're very likely to run into a deadlock. Consider this DAG:

    S --+---- V1 ----+--- J
         \          /
          +-- V2 --+

The vertex J joins the streams that were originally split from source S. Let's say the input from V1 has higher priority than the input from V2. In this case, no item from V2 will be processed by J before V1 completes, which presupposes that S also completes. But S cannot complete, because it can't emit all items to V2 because V2 is blocked by J, which is not processing its items. This is a deadlock.
This DAG can work only if S emits as few items into both paths as can fit into the queues (see queue size configuration).
Note
Having different priority edges will cause postponing of the first snapshot until after upstream vertices of higher priority edges are completed. Reason: after receiving a barrier we stop processing items on that edge until the barrier is received from all other edges. However, we also don't process lower priority edges until higher priority edges are done, which prevents receiving the barrier on them, which in the end stalls the job indefinitely. Technically this applies only to EXACTLY_ONCE snapshot mode, but the snapshot is also postponed for AT_LEAST_ONCE jobs, because the snapshot won't complete until after all higher priority edges are completed and will increase the number of duplicately processed items.
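The consumption order implied by edge priorities can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: EdgePrioritySketch, InboundEdge, and consume are hypothetical names, the inputs are finite and already fully buffered (so the deadlock scenario cannot arise here), and edges of equal priority are simply drained one after another, which need not match how Jet interleaves them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

// Hypothetical model of priority-ordered consumption; not Jet's implementation.
final class EdgePrioritySketch {
    // A finite, fully buffered inbound edge with a priority value.
    static final class InboundEdge {
        final String name;
        final int priority;
        final Deque<String> items;

        InboundEdge(String name, int priority, List<String> items) {
            this.name = name;
            this.priority = priority;
            this.items = new ArrayDeque<>(items);
        }
    }

    // Drains edges in ascending priority number: a lower number is consumed first, in full.
    static List<String> consume(List<InboundEdge> edges) {
        List<InboundEdge> byPriority = new ArrayList<>(edges);
        byPriority.sort(Comparator.comparingInt((InboundEdge e) -> e.priority));
        List<String> processed = new ArrayList<>();
        for (InboundEdge edge : byPriority) {
            while (!edge.items.isEmpty()) {
                processed.add(edge.name + ":" + edge.items.poll());
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<InboundEdge> edges = List.of(
                new InboundEdge("low", 2, List.of("x", "y")),
                new InboundEdge("high", 1, List.of("a", "b")));
        // Items from the priority-1 edge come out before any item from the priority-2 edge.
        System.out.println(consume(edges));
    }
}
```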
-
getPriority
public int getPriority()
Returns the value of the edge's priority, as explained on priority(int).
-
unicast
@Nonnull public Edge unicast()
Chooses the UNICAST routing policy for this edge. This policy is the default.
-
partitioned
@Nonnull public <T> Edge partitioned(@Nonnull FunctionEx<T,?> extractKeyFn)
Activates the PARTITIONED routing policy and applies the default Hazelcast partitioning strategy. The strategy is applied to the result of the extractKeyFn function.
-
partitioned
@Nonnull public <T,K> Edge partitioned(@Nonnull FunctionEx<T,K> extractKeyFn, @Nonnull Partitioner<? super K> partitioner)
Activates the PARTITIONED routing policy and applies the provided partitioning strategy. The strategy is applied to the result of the extractKeyFn function.
-
allToOne
@Nonnull public Edge allToOne(java.lang.Object key)
Activates a special-cased PARTITIONED routing policy where all items will be routed to the same partition ID, determined from the given key. This means that all items will be directed to the same processor and the other processors will be idle.
It is equivalent to using partitioned(t -> key), but with a small optimization: the partition ID is not recalculated for each stream item.
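The equivalence and the optimization mentioned above can be sketched in plain Java. The names AllToOneSketch, routePerItem, and routeAllToOne are hypothetical, and the hashCode()-based partition function with 271 partitions is an assumed stand-in for the real partitioning strategy. The point is only that a constant key yields the same partition ID for every item, so it can be computed once.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model contrasting per-item and one-time partition ID computation.
final class AllToOneSketch {
    // Illustrative partition count (an assumption of this sketch).
    static final int PARTITION_COUNT = 271;

    static int partitionId(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    // Models partitioned(t -> key): the partition ID is recomputed for each item.
    static int[] routePerItem(List<?> items, Object key) {
        int[] ids = new int[items.size()];
        for (int i = 0; i < ids.length; i++) {
            ids[i] = partitionId(key);
        }
        return ids;
    }

    // Models allToOne(key): the partition ID is computed once and reused.
    static int[] routeAllToOne(List<?> items, Object key) {
        int fixedId = partitionId(key);
        int[] ids = new int[items.size()];
        Arrays.fill(ids, fixedId);
        return ids;
    }

    public static void main(String[] args) {
        List<String> items = List.of("a", "b", "c");
        // Both strategies assign every item to the same partition ID.
        System.out.println(Arrays.equals(routePerItem(items, "sink"), routeAllToOne(items, "sink")));
    }
}
```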
-
isolated
@Nonnull public Edge isolated()
Activates the ISOLATED routing policy which establishes isolated paths from upstream to downstream processors.
Since all traffic will be local, this policy is not allowed on a distributed edge.
-
ordered
public Edge ordered(@Nonnull ComparatorEx<?> comparator)
Specifies that the data traveling on this edge is ordered according to the provided comparator. The edge maintains this property when merging the data coming from different upstream processors, so that the receiving processor observes them in the proper order. Every upstream processor must emit the data in the same order because the edge doesn't sort, it only prevents reordering while receiving.
The implementation currently doesn't handle watermarks or barriers: if the source processors emit watermarks or you add a processing guarantee, the job will fail at runtime.
- Since:
- Jet 4.3
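The difference between merging in order and sorting can be seen in a minimal plain-Java sketch. OrderedMergeSketch and merge are hypothetical names, and the heap-based k-way merge is an assumed illustration of the idea rather than a description of how the edge is implemented. When every upstream list is already ordered by the comparator, the merged output is ordered too; when one is not, the disorder passes through.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge that preserves order but never sorts an input.
final class OrderedMergeSketch {
    static <T> List<T> merge(List<List<T>> upstreams, Comparator<? super T> comparator) {
        // Each heap entry is a cursor {upstreamIndex, position}, ordered by the item it points at.
        PriorityQueue<int[]> heads = new PriorityQueue<>((a, b) -> comparator.compare(
                upstreams.get(a[0]).get(a[1]),
                upstreams.get(b[0]).get(b[1])));
        for (int i = 0; i < upstreams.size(); i++) {
            if (!upstreams.get(i).isEmpty()) {
                heads.add(new int[] {i, 0});
            }
        }
        List<T> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] head = heads.poll();
            List<T> source = upstreams.get(head[0]);
            merged.add(source.get(head[1]));
            // Advance the cursor within the same upstream; items of one upstream are never reordered.
            if (head[1] + 1 < source.size()) {
                heads.add(new int[] {head[0], head[1] + 1});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Integer>> sortedInputs = List.of(List.of(1, 4, 7), List.of(2, 5), List.of(3, 6));
        Comparator<Integer> natural = Comparator.naturalOrder();
        System.out.println(merge(sortedInputs, natural));

        // An upstream that violates the ordering is not repaired by the merge.
        List<List<Integer>> unsortedInputs = List.of(List.of(3, 1), List.of(2));
        System.out.println(merge(unsortedInputs, natural));
    }
}
```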
-
getPartitioner
@Nullable public Partitioner<?> getPartitioner()
Returns the instance encapsulating the partitioning strategy in effect on this edge.
-
getOrderComparator
@Nullable public ComparatorEx<?> getOrderComparator()
Returns the comparator defined on this edge using ordered(ComparatorEx).
- Since:
- Jet 4.3
-
getRoutingPolicy
@Nonnull public Edge.RoutingPolicy getRoutingPolicy()
Returns the Edge.RoutingPolicy in effect on the edge.
-
local
@Nonnull public Edge local()
Declares that the edge is local. A local edge only transfers data within the same member; the network is not involved. This setting is the default.
- Since:
- Jet 4.3
- See Also:
distributed(), distributeTo(Address), getDistributedTo()
-
isLocal
public boolean isLocal()
Returns whether the edge is local.
-
distributed
@Nonnull public Edge distributed()
Declares that the edge is distributed. A non-distributed edge only transfers data within the same member. If the data source running on the local member is distributed (produces only a slice of all the data on any given member), the local processors will not observe all the data. The same holds true when the data originates from an upstream distributed edge.
A distributed edge allows all the data to be observed by all the processors (using the BROADCAST routing policy) and, more attractively, all the data with a given partition ID to be observed by the same unique processor, regardless of whether it is running on the local or a remote member (using the PARTITIONED routing policy).
- See Also:
distributeTo(Address), local(), getDistributedTo()
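The contrast between a local and a distributed PARTITIONED edge can be pictured with a toy model. Everything here is an assumption for illustration: DistributedEdgeSketch and target are hypothetical names, members and processors are plain integers, and the modulo-based assignment of partitions to members does not reflect how the cluster actually assigns partition ownership. The sketch shows only that a local edge picks a processor on the emitting member, while a distributed edge picks one processor cluster-wide per partition ID.

```java
// Hypothetical model of target selection on a PARTITIONED edge; not Jet's implementation.
final class DistributedEdgeSketch {
    // Returns the target as "member:processor" for an item with the given partition ID.
    static String target(int partitionId, int sourceMember, int memberCount,
                         int processorsPerMember, boolean distributed) {
        if (!distributed) {
            // Local edge: the item stays on the member that emitted it.
            return sourceMember + ":" + (partitionId % processorsPerMember);
        }
        // Distributed edge: one processor in the whole cluster owns this partition ID.
        int globalIndex = partitionId % (memberCount * processorsPerMember);
        return (globalIndex / processorsPerMember) + ":" + (globalIndex % processorsPerMember);
    }

    public static void main(String[] args) {
        int partitionId = 5;
        // Local edge: the same partition ID ends up on a different processor per emitting member.
        System.out.println(target(partitionId, 0, 2, 2, false));
        System.out.println(target(partitionId, 1, 2, 2, false));
        // Distributed edge: the same partition ID ends up on one processor regardless of emitter.
        System.out.println(target(partitionId, 0, 2, 2, true));
        System.out.println(target(partitionId, 1, 2, 2, true));
    }
}
```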
-
distributeTo
@Nonnull public Edge distributeTo(@Nonnull Address targetMember)
Declares that all items sent over this edge will be delivered to the specified member. Processors on other members will not receive any data.
This option is most useful for sinks if we want to ensure that the results are written by (or sent from) only that member.
It's not suitable for fault-tolerant jobs. If the targetMember is not a cluster member, the job can't be executed and will fail.
- Parameters:
targetMember - the member to deliver the items to
- Since:
- Jet 4.3
- See Also:
distributed(), local(), getDistributedTo()
-
getDistributedTo
@Nullable public Address getDistributedTo()
Possible return values:
- null - route only to local members (after a local() call)
- "255.255.255.255:0" - route to all members (after a distributed() call)
- else - route to specific member (after a distributeTo(com.hazelcast.cluster.Address) call)
- Since:
- Jet 4.3
-
isDistributed
public boolean isDistributed()
Says whether this edge distributes items among all members, as requested by the distributed() method.
-
getConfig
@Nullable public EdgeConfig getConfig()
Returns the EdgeConfig instance associated with this edge. Default value is null.
-
setConfig
@Nonnull public Edge setConfig(@Nullable EdgeConfig config)
Assigns an EdgeConfig to this edge. If null is supplied, the edge will use JetConfig.getDefaultEdgeConfig().
-
toString
@Nonnull public java.lang.String toString()
- Overrides:
toString in class java.lang.Object
-
equals
public boolean equals(java.lang.Object obj)
- Overrides:
equals in class java.lang.Object
-
hashCode
public int hashCode()
- Overrides:
hashCode in class java.lang.Object
-
writeData
public void writeData(@Nonnull ObjectDataOutput out) throws java.io.IOException
Description copied from interface: DataSerializable
Writes object fields to output stream.
- Specified by:
writeData in interface DataSerializable
- Parameters:
out - output
- Throws:
java.io.IOException - if an I/O error occurs. In particular, an IOException may be thrown if the output stream has been closed.
-
readData
public void readData(@Nonnull ObjectDataInput in) throws java.io.IOException
Description copied from interface: DataSerializable
Reads fields from the input stream.
- Specified by:
readData in interface DataSerializable
- Parameters:
in - input
- Throws:
java.io.IOException - if an I/O error occurs. In particular, an IOException may be thrown if the input stream has been closed.
-
getFactoryId
public int getFactoryId()
Description copied from interface: IdentifiedDataSerializable
Returns DataSerializableFactory factory ID for this class.
- Specified by:
getFactoryId in interface IdentifiedDataSerializable
- Returns:
- factory ID
-
getClassId
public int getClassId()
Description copied from interface: IdentifiedDataSerializable
Returns type identifier for this class. It should be unique per DataSerializableFactory.
- Specified by:
getClassId in interface IdentifiedDataSerializable
- Returns:
- type ID
-
-