Class Edge
- All Implemented Interfaces:
- DataSerializable, IdentifiedDataSerializable
Represents an edge between two vertices in a DAG.
Conceptually, data travels over the edge from the source vertex to the
destination vertex. Practically, since the vertex is distributed across
the cluster and across threads in each cluster member, the edge is
implemented by a number of concurrent queues and network sender/receiver
pairs.
It is often desirable to arrange that all items belonging to the same
collation key are received by the same processing unit (instance of
Processor). This is achieved by configuring an appropriate Partitioner
on the edge. The partitioner will determine the partition ID of each
item and all items with the same partition ID will be routed to the same
Processor instance. Depending on the value of the edge's distributed
property, the processor will be unique cluster-wide, or only within each
member.
A newly instantiated Edge is non-distributed with a UNICAST routing
policy.
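As a rough illustration of the partition-based routing described above, the following self-contained sketch models it in plain Java. It is not Jet's implementation: the class PartitionRoutingSketch, the hash-based partitionId function, the modulo mapping from partition ID to processor index, and the partition and processor counts are all assumptions made for the example. With the real API, the equivalent setup would be configured on the edge itself, for example with partitioned(extractKeyFn) and, for cluster-wide uniqueness, distributed().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Toy model of partitioned routing; names and hashing scheme are illustrative assumptions. */
final class PartitionRoutingSketch {

    /** Maps a key to a partition ID in [0, partitionCount). Hypothetical strategy. */
    static int partitionId(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    /** Maps a partition ID to the index of the processor assumed to own it. */
    static int processorFor(int partitionId, int processorCount) {
        return partitionId % processorCount;
    }

    /** Groups items by the index of the processor they would be routed to. */
    static <T> Map<Integer, List<T>> route(List<T> items, Function<T, ?> extractKeyFn,
                                           int partitionCount, int processorCount) {
        Map<Integer, List<T>> byProcessor = new TreeMap<>();
        for (T item : items) {
            int pid = partitionId(extractKeyFn.apply(item), partitionCount);
            byProcessor.computeIfAbsent(processorFor(pid, processorCount), k -> new ArrayList<>())
                       .add(item);
        }
        return byProcessor;
    }

    public static void main(String[] args) {
        List<String> words = List.of("apple", "avocado", "banana", "blueberry", "cherry");
        // Key = first letter, so all words starting with the same letter share a processor.
        // 271 partitions and 4 processors are arbitrary values chosen for the example.
        System.out.println(route(words, w -> w.charAt(0), 271, 4));
    }
}
```

The point of the sketch is only that the routing decision depends on the extracted key and nothing else, so items with equal keys always meet at the same processor.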
- Since:
- Jet 3.0
-
Nested Class Summary
Modifier and Type | Class | Description
static enum | Edge.RoutingPolicy | An edge describes a connection from many upstream processors to many downstream processors.
-
Field Summary
Modifier and Type | Field | Description
static final Address | DISTRIBUTE_TO_ALL | An address returned by getDistributedTo() denoting an edge that distributes the items among all members.
-
Constructor Summary
-
Method Summary
Modifier and Type | Method | Description
Edge | allToOne() | Activates a special-cased PARTITIONED routing policy where all items will be routed to the same partition ID.
Edge | allToOne(Object key) | Activates a special-cased PARTITIONED routing policy where all items will be routed to the same partition ID, determined from the given key.
static Edge | between(Vertex source, Vertex destination) | Returns an edge between two vertices.
Edge | broadcast() | Activates the BROADCAST routing policy.
Edge | distributed() | Declares that the edge is distributed.
Edge | distributeTo(Address targetMember) | Declares that all items sent over this edge will be delivered to the specified member.
boolean | equals(Object obj) |
Edge | fanout() | Activates the FANOUT routing policy.
static Edge | from(Vertex source) | Returns an edge with the given source vertex and no destination vertex.
static Edge | from(Vertex source, int ordinal) | Returns an edge with the given source vertex at the given ordinal and no destination vertex.
int | getClassId() | Returns type identifier for this class.
EdgeConfig | getConfig() | Returns the EdgeConfig instance associated with this edge.
Vertex | getDestination() | Returns this edge's destination vertex.
String | getDestName() | Returns the name of the destination vertex.
int | getDestOrdinal() | Returns the ordinal of the edge at the destination vertex.
Address | getDistributedTo() | Possible return values: null (route only to local members, after a local() call); "255.255.255.255:0" (route to all members, after a distributed() call); otherwise a specific member (after a distributeTo(com.hazelcast.cluster.Address) call).
int | getFactoryId() | Returns DataSerializableFactory factory ID for this class.
ComparatorEx<?> | getOrderComparator() | Returns the comparator defined on this edge using ordered(ComparatorEx).
Partitioner<?> | getPartitioner() | Returns the instance encapsulating the partitioning strategy in effect on this edge.
int | getPriority() | Returns the value of the edge's priority, as explained on priority(int).
Edge.RoutingPolicy | getRoutingPolicy() | Returns the Edge.RoutingPolicy in effect on the edge.
Vertex | getSource() | Returns this edge's source vertex.
String | getSourceName() | Returns the name of the source vertex.
int | getSourceOrdinal() | Returns the ordinal of the edge at the source vertex.
int | hashCode() |
boolean | isDistributed() | Says whether this edge distributes items among all members, as requested by the distributed() method.
boolean | isLocal() | Returns whether the edge is local.
Edge | isolated() | Activates the ISOLATED routing policy which establishes isolated paths from upstream to downstream processors.
Edge | local() | Declares that the edge is local.
Edge | ordered(ComparatorEx<?> comparator) | Specifies that the data traveling on this edge is ordered according to the provided comparator.
<T> Edge | partitioned(FunctionEx<T, ?> extractKeyFn) | Activates the PARTITIONED routing policy and applies the default Hazelcast partitioning strategy.
<T,K> Edge | partitioned(FunctionEx<T, K> extractKeyFn, Partitioner<? super K> partitioner) | Activates the PARTITIONED routing policy and applies the provided partitioning strategy.
Edge | priority(int priority) | Sets the priority of the edge.
void | readData(ObjectDataInput in) | Reads fields from the input stream
Edge | setConfig(EdgeConfig config) | Assigns an EdgeConfig to this edge.
Edge | to(Vertex destination) | Sets the destination vertex of this edge, with ordinal 0.
Edge | to(Vertex destination, int ordinal) | Sets the destination vertex and ordinal of this edge.
String | toString() |
Edge | unicast() | Chooses the UNICAST routing policy for this edge.
void | writeData(ObjectDataOutput out) | Writes object fields to output stream
-
Field Details
-
DISTRIBUTE_TO_ALL
An address returned by getDistributedTo() denoting an edge that distributes the items among all members.
- Since:
- Jet 4.3
-
-
Constructor Details
-
Edge
protected Edge()
-
Edge
-
-
Method Details
-
between
Returns an edge between two vertices. The ordinal of the edge is 0 at both ends. Equivalent to from(source).to(destination).
- Parameters:
- source - the source vertex
- destination - the destination vertex
-
from
Returns an edge with the given source vertex and no destination vertex. The ordinal of the edge is 0. Typically followed by one of the to() method calls.
-
from
Returns an edge with the given source vertex at the given ordinal and no destination vertex. Typically followed by a call to one of the to() methods.
-
to
Sets the destination vertex of this edge, with ordinal 0.
-
to
Sets the destination vertex and ordinal of this edge.
-
getSource
Returns this edge's source vertex.
-
getDestination
Returns this edge's destination vertex.
-
getSourceName
Returns the name of the source vertex.
-
getSourceOrdinal
public int getSourceOrdinal()
Returns the ordinal of the edge at the source vertex.
-
getDestName
Returns the name of the destination vertex.
-
getDestOrdinal
public int getDestOrdinal()
Returns the ordinal of the edge at the destination vertex.
-
priority
Sets the priority of the edge. A lower number means higher priority and the default is 0.
Example: there are two incoming edges on a vertex, with priorities 1 and 2. The data from the edge with priority 1 will be processed in full before accepting any data from the edge with priority 2.
Possible deadlock
If you split the output of one source vertex and later join the streams with different priorities, you're very likely to run into a deadlock. Consider this DAG:
S --+---- V1 ----+--- J
     \          /
      +-- V2 --+
The vertex J joins the streams that were originally split from source S. Let's say the input from V1 has higher priority than the input from V2. In this case, no item from V2 will be processed by J before V1 completes, which presupposes that S also completes. But S cannot complete, because it can't emit all items to V2 because V2 is blocked by J, which is not processing its items. This is a deadlock.
This DAG can work only if S emits no more items into both paths than can fit into the queues (see queue size configuration).
Note
Having different priority edges will cause postponing of the first snapshot until after upstream vertices of higher priority edges are completed. Reason: after receiving a barrier we stop processing items on that edge until the barrier is received from all other edges. However, we also don't process lower priority edges until higher priority edges are done, which prevents receiving the barrier on them, which in the end stalls the job indefinitely. Technically this applies only to EXACTLY_ONCE snapshot mode, but the snapshot is also postponed for AT_LEAST_ONCE jobs, because the snapshot won't complete until after all higher priority edges are completed and will increase the number of duplicately processed items.
-
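The deadlock described under priority(int) can be reproduced with a small single-threaded simulation. This is a sketch under stated assumptions, not Jet code: the class PriorityDeadlockSketch, its run method, and the use of one bounded queue to stand for all buffering on the S -> V2 -> J path are hypothetical simplifications. The high-priority path through V1 is modeled as always being drained by J immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;

/** Toy simulation of the S -> {V1, V2} -> J DAG with a higher-priority V1 edge. */
final class PriorityDeadlockSketch {

    /**
     * Simulates S emitting each item to both branches, with J preferring the V1 edge.
     * Returns the order in which J processes items, or empty if S gets stuck (deadlock).
     */
    static Optional<List<String>> run(int itemCount, int queueCapacity) {
        // Bounded buffer on the low-priority path S -> V2 -> J (capacity must be >= 1).
        ArrayBlockingQueue<Integer> viaV2 = new ArrayBlockingQueue<>(queueCapacity);
        List<String> processedByJ = new ArrayList<>();

        for (int item = 0; item < itemCount; item++) {
            // High-priority path: J accepts the item right away, so this never blocks.
            processedByJ.add("V1:" + item);
            // Low-priority path: J will not poll this queue until V1 completes,
            // so a full queue would block S forever.
            if (!viaV2.offer(item)) {
                return Optional.empty();
            }
        }
        // S (and therefore V1) has completed; only now does J drain the low-priority edge.
        Integer buffered;
        while ((buffered = viaV2.poll()) != null) {
            processedByJ.add("V2:" + buffered);
        }
        return Optional.of(processedByJ);
    }

    public static void main(String[] args) {
        System.out.println(run(3, 4)); // all items fit into the queue: the job completes
        System.out.println(run(5, 4)); // more items than queue space: S can never finish
    }
}
```

In this model the job completes exactly when the number of emitted items does not exceed the buffer capacity on the low-priority path, which mirrors the condition stated above.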
getPriority
public int getPriority()
Returns the value of the edge's priority, as explained on priority(int).
-
unicast
Chooses the UNICAST routing policy for this edge. This policy is the default.
-
partitioned
Activates the PARTITIONED routing policy and applies the default Hazelcast partitioning strategy. The strategy is applied to the result of the extractKeyFn function.
-
partitioned
@Nonnull public <T,K> Edge partitioned(@Nonnull FunctionEx<T, K> extractKeyFn, @Nonnull Partitioner<? super K> partitioner)
Activates the PARTITIONED routing policy and applies the provided partitioning strategy. The strategy is applied to the result of the extractKeyFn function.
-
allToOne
Activates a special-cased PARTITIONED routing policy where all items will be routed to the same partition ID, determined from the given key. It means that all items will be directed to the same processor and other processors will be idle.
It is equivalent to using partitioned(t -> key), but it has a small optimization that the partition ID is not recalculated for each stream item.
-
allToOne
Activates a special-cased PARTITIONED routing policy where all items will be routed to the same partition ID. It means that all items will be directed to the same processor and other processors will be idle.
- Since:
- 5.4
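The relationship between allToOne(key) and partitioned(t -> key) can be sketched in plain Java. The class AllToOneSketch, the hash-based partitionId helper, and the partition count are assumptions made for illustration only; they do not reflect how Jet computes partition IDs. The sketch only shows the difference between recomputing a constant partition ID per item and computing it once.

```java
import java.util.List;
import java.util.function.ToIntFunction;

/** Toy comparison of a constant-key partitioner and a precomputed partition ID. */
final class AllToOneSketch {

    static final int PARTITION_COUNT = 271; // arbitrary value for the example

    /** Hypothetical partitioning strategy: hash of the key modulo the partition count. */
    static int partitionId(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    /** Like partitioned(t -> key): recomputes the same partition ID for every item. */
    static <T> ToIntFunction<T> constantKeyRouter(Object key) {
        return item -> partitionId(key);
    }

    /** Like allToOne(key): computes the partition ID once and reuses it. */
    static <T> ToIntFunction<T> allToOneRouter(Object key) {
        int fixedPartitionId = partitionId(key);
        return item -> fixedPartitionId;
    }

    public static void main(String[] args) {
        ToIntFunction<String> recomputing = constantKeyRouter("totals");
        ToIntFunction<String> precomputed = allToOneRouter("totals");
        for (String item : List.of("x", "y", "z")) {
            // Both routers send every item to the same partition ID.
            System.out.println(item + " -> " + recomputing.applyAsInt(item)
                    + " / " + precomputed.applyAsInt(item));
        }
    }
}
```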
-
broadcast
Activates the BROADCAST routing policy.
-
isolated
Activates the ISOLATED routing policy which establishes isolated paths from upstream to downstream processors.
Since all traffic will be local, this policy is not allowed on a distributed edge.
-
ordered
Specifies that the data traveling on this edge is ordered according to the provided comparator. The edge maintains this property when merging the data coming from different upstream processors, so that the receiving processor observes them in the proper order. Every upstream processor must emit the data in the same order because the edge doesn't sort, it only prevents reordering while receiving.
The implementation currently doesn't handle watermarks or barriers: if the source processors emit watermarks or you add a processing guarantee, the job will fail at runtime.
- Since:
- Jet 4.3
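The order-preserving merge described for ordered(ComparatorEx) can be pictured as a k-way merge over upstream sequences that are already sorted. The sketch below is a generic illustration using java.util.PriorityQueue; the class OrderedMergeSketch and its merge method are hypothetical, and nothing here is taken from Jet's actual receiving logic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy k-way merge: keeps the order of pre-sorted upstreams, does not sort. */
final class OrderedMergeSketch {

    /** Current head item of one upstream plus the iterator over its remaining items. */
    private static final class Head<T> {
        final T item;
        final Iterator<T> rest;

        Head(T item, Iterator<T> rest) {
            this.item = item;
            this.rest = rest;
        }
    }

    /** Merges upstream sequences by always taking the smallest current head. */
    static <T> List<T> merge(List<List<T>> upstreams, Comparator<? super T> comparator) {
        PriorityQueue<Head<T>> heads =
                new PriorityQueue<>((a, b) -> comparator.compare(a.item, b.item));
        for (List<T> upstream : upstreams) {
            Iterator<T> it = upstream.iterator();
            if (it.hasNext()) {
                heads.add(new Head<>(it.next(), it));
            }
        }
        List<T> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            Head<T> smallest = heads.poll();
            merged.add(smallest.item);
            if (smallest.rest.hasNext()) {
                heads.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Integer>> upstreams =
                List.of(List.of(1, 4, 7), List.of(2, 5, 8), List.of(3, 6, 9));
        System.out.println(merge(upstreams, Comparator.naturalOrder()));
    }
}
```

Because the merge only compares the current heads, an upstream that emits items out of order produces out-of-order output, which is why every upstream processor must already emit its data in the comparator's order.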
-
fanout
Activates the FANOUT routing policy.
- Since:
- Jet 4.4
-
getPartitioner
Returns the instance encapsulating the partitioning strategy in effect on this edge.
-
getOrderComparator
Returns the comparator defined on this edge using ordered(ComparatorEx).
- Since:
- Jet 4.3
-
getRoutingPolicy
Returns the Edge.RoutingPolicy in effect on the edge.
-
local
Declares that the edge is local. A local edge only transfers data within the same member; the network is not involved. This setting is the default.
- Since:
- Jet 4.3
-
isLocal
public boolean isLocal()
Returns whether the edge is local.
-
distributed
Declares that the edge is distributed. A non-distributed edge only transfers data within the same member. If the data source running on the local member is distributed (produces only a slice of all the data on any given member), the local processors will not observe all the data. The same holds true when the data originates from an upstream distributed edge.
A distributed edge allows all the data to be observed by all the processors (using the BROADCAST routing policy) and, more attractively, all the data with a given partition ID to be observed by the same unique processor, regardless of whether it is running on the local or a remote member (using the PARTITIONED routing policy).
-
distributeTo
Declares that all items sent over this edge will be delivered to the specified member. Processors on other members will not receive any data.
This option is most useful for sinks if we want to ensure that the results are written on (or sent from) only that member.
It's not suitable for fault-tolerant jobs. If the targetMember is not a member, the job can't be executed and will fail.
- Parameters:
- targetMember - the member to deliver the items to
- Since:
- Jet 4.3
-
getDistributedTo
Possible return values:
- null - route only to local members (after a local() call)
- "255.255.255.255:0" - route to all members (after a distributed() call)
- else - route to specific member (after a distributeTo(com.hazelcast.cluster.Address) call)
- Since:
- Jet 4.3
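The three cases listed for getDistributedTo() amount to a small decision procedure. The following sketch mimics it with plain strings in place of com.hazelcast.cluster.Address so that it runs without any dependencies; the class DistributedToSketch, the Target enum, and the sample member address are hypothetical and exist only for this example.

```java
/** Toy classification of the possible getDistributedTo() results, using strings as addresses. */
final class DistributedToSketch {

    /** Stand-in for the DISTRIBUTE_TO_ALL address; a plain string is used only in this sketch. */
    static final String DISTRIBUTE_TO_ALL = "255.255.255.255:0";

    enum Target { LOCAL_ONLY, ALL_MEMBERS, SPECIFIC_MEMBER }

    /** Maps a "distributed to" value to the routing scope it denotes. */
    static Target classify(String distributedTo) {
        if (distributedTo == null) {
            return Target.LOCAL_ONLY;      // as after a local() call
        }
        if (DISTRIBUTE_TO_ALL.equals(distributedTo)) {
            return Target.ALL_MEMBERS;     // as after a distributed() call
        }
        return Target.SPECIFIC_MEMBER;     // as after a distributeTo(address) call
    }

    public static void main(String[] args) {
        System.out.println(classify(null));
        System.out.println(classify(DISTRIBUTE_TO_ALL));
        System.out.println(classify("10.0.0.5:5701")); // made-up member address
    }
}
```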
-
isDistributed
public boolean isDistributed()
Says whether this edge distributes items among all members, as requested by the distributed() method.
-
getConfig
Returns the EdgeConfig instance associated with this edge. Default value is null.
-
setConfig
Assigns an EdgeConfig to this edge. If null is supplied, the edge will use JetConfig.getDefaultEdgeConfig().
-
toString
-
equals
-
hashCode
public int hashCode()
-
writeData
Description copied from interface: DataSerializable
Writes object fields to output stream
- Specified by:
- writeData in interface DataSerializable
- Parameters:
- out - output
- Throws:
- IOException - if an I/O error occurs. In particular, an IOException may be thrown if the output stream has been closed.
-
readData
Description copied from interface: DataSerializable
Reads fields from the input stream
- Specified by:
- readData in interface DataSerializable
- Parameters:
- in - input
- Throws:
- IOException - if an I/O error occurs. In particular, an IOException may be thrown if the input stream has been closed.
-
getFactoryId
public int getFactoryId()
Description copied from interface: IdentifiedDataSerializable
Returns DataSerializableFactory factory ID for this class.
- Specified by:
- getFactoryId in interface IdentifiedDataSerializable
- Returns:
- factory ID
-
getClassId
public int getClassId()
Description copied from interface: IdentifiedDataSerializable
Returns type identifier for this class. It should be unique per DataSerializableFactory.
- Specified by:
- getClassId in interface IdentifiedDataSerializable
- Returns:
- type ID
-