public class Edge extends Object implements IdentifiedDataSerializable
Represents an edge between two vertices in a DAG.
Conceptually, data travels over the edge from the source vertex to the
destination vertex. Practically, since the vertex is distributed across
the cluster and across threads in each cluster member, the edge is
implemented by a number of concurrent queues and network sender/receiver
pairs.
It is often desirable to arrange that all items belonging to the same
collation key are received by the same processing unit (instance of
Processor). This is achieved by configuring an appropriate Partitioner
on the edge. The partitioner will determine the partition ID of each
item and all items with the same partition ID will be routed to the same
Processor instance. Depending on the value of the edge's distributed
property, the processor will be unique cluster-wide, or only within each
member.
A newly instantiated Edge is non-distributed with a UNICAST
routing policy.
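As a rough illustration of the routing described above, the following plain-Java sketch models how a partitioner could map items to processors. It is not Hazelcast's implementation: the class name PartitionRoutingSketch, the partition count of 271, the hash-modulo strategy, and the round-robin assignment of partitions to processors are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of partitioned routing; not part of the Hazelcast API.
final class PartitionRoutingSketch {

    // Assumed strategy: non-negative hash of the key modulo the partition count.
    static int partitionId(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Assumed assignment of partitions to processors: round-robin by partition ID.
    static int processorIndex(int partitionId, int processorCount) {
        return partitionId % processorCount;
    }

    // Groups keys by the index of the processor that would receive them.
    static Map<Integer, List<String>> route(List<String> keys, int partitionCount, int processorCount) {
        Map<Integer, List<String>> byProcessor = new TreeMap<>();
        for (String key : keys) {
            int processor = processorIndex(partitionId(key, partitionCount), processorCount);
            byProcessor.computeIfAbsent(processor, p -> new ArrayList<>()).add(key);
        }
        return byProcessor;
    }

    public static void main(String[] args) {
        // Items with equal keys always end up in the same processor's list.
        System.out.println(route(List.of("apple", "pear", "apple", "plum", "pear"), 271, 4));
    }
}
```

The point of the model is only that the partition ID is a pure function of the key, so equal keys always reach the same processor.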
Modifier and Type | Class and Description |
---|---|
static class | Edge.RoutingPolicy: An edge describes a connection from many upstream processors to many downstream processors. |
Modifier and Type | Field and Description |
---|---|
static Address | DISTRIBUTE_TO_ALL: An address returned by getDistributedTo() denoting an edge that distributes the items among all members. |
Modifier | Constructor and Description |
---|---|
protected | Edge() |
protected | Edge(Vertex source, int sourceOrdinal, Vertex destination, int destOrdinal) |
Modifier and Type | Method and Description |
---|---|
Edge | allToOne(Object key): Activates a special-cased PARTITIONED routing policy where all items will be routed to the same partition ID, determined from the given key. |
static Edge | between(Vertex source, Vertex destination): Returns an edge between two vertices. |
Edge | broadcast(): Activates the BROADCAST routing policy. |
Edge | distributed(): Declares that the edge is distributed. |
Edge | distributeTo(Address targetMember): Declares that all items sent over this edge will be delivered to the specified member. |
boolean | equals(Object obj) |
Edge | fanout(): Activates the FANOUT routing policy. |
static Edge | from(Vertex source): Returns an edge with the given source vertex and no destination vertex. |
static Edge | from(Vertex source, int ordinal): Returns an edge with the given source vertex at the given ordinal and no destination vertex. |
int | getClassId(): Returns type identifier for this class. |
EdgeConfig | getConfig(): Returns the EdgeConfig instance associated with this edge. |
Vertex | getDestination(): Returns this edge's destination vertex. |
String | getDestName(): Returns the name of the destination vertex. |
int | getDestOrdinal(): Returns the ordinal of the edge at the destination vertex. |
Address | getDistributedTo(): Possible return values: null - route only to local members (after a local() call); 255.255.255.255:0 - route to all members (after a distributed() call); else - route to specific member (after a distributeTo(com.hazelcast.cluster.Address) call). |
int | getFactoryId(): Returns DataSerializableFactory factory ID for this class. |
ComparatorEx<?> | getOrderComparator(): Returns the comparator defined on this edge using ordered(ComparatorEx). |
Partitioner<?> | getPartitioner(): Returns the instance encapsulating the partitioning strategy in effect on this edge. |
int | getPriority(): Returns the value of edge's priority, as explained on priority(int). |
Edge.RoutingPolicy | getRoutingPolicy(): Returns the Edge.RoutingPolicy in effect on the edge. |
Vertex | getSource(): Returns this edge's source vertex. |
String | getSourceName(): Returns the name of the source vertex. |
int | getSourceOrdinal(): Returns the ordinal of the edge at the source vertex. |
int | hashCode() |
boolean | isDistributed(): Says whether this edge distributes items among all members, as requested by the distributed() method. |
boolean | isLocal(): Returns whether the edge is local. |
Edge | isolated(): Activates the ISOLATED routing policy which establishes isolated paths from upstream to downstream processors. |
Edge | local(): Declares that the edge is local. |
Edge | ordered(ComparatorEx<?> comparator): Specifies that the data traveling on this edge is ordered according to the provided comparator. |
<T> Edge | partitioned(FunctionEx<T,?> extractKeyFn): Activates the PARTITIONED routing policy and applies the default Hazelcast partitioning strategy. |
<T,K> Edge | partitioned(FunctionEx<T,K> extractKeyFn, Partitioner<? super K> partitioner): Activates the PARTITIONED routing policy and applies the provided partitioning strategy. |
Edge | priority(int priority): Sets the priority of the edge. |
void | readData(ObjectDataInput in): Reads fields from the input stream. |
Edge | setConfig(EdgeConfig config): Assigns an EdgeConfig to this edge. |
Edge | to(Vertex destination): Sets the destination vertex of this edge, with ordinal 0. |
Edge | to(Vertex destination, int ordinal): Sets the destination vertex and ordinal of this edge. |
String | toString() |
Edge | unicast(): Chooses the UNICAST routing policy for this edge. |
void | writeData(ObjectDataOutput out): Writes object fields to output stream. |
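public static final Address DISTRIBUTE_TO_ALL
An address returned by getDistributedTo() denoting an edge that
distributes the items among all members.

@Nonnull public static Edge between(@Nonnull Vertex source, @Nonnull Vertex destination)
Returns an edge between two vertices. Equivalent to
from(source).to(destination).
Parameters:
source - the source vertex
destination - the destination vertex

@Nonnull public static Edge from(@Nonnull Vertex source)
Returns an edge with the given source vertex and no destination vertex.
Typically followed by one of the to() method calls.

@Nonnull public static Edge from(@Nonnull Vertex source, int ordinal)
Returns an edge with the given source vertex at the given ordinal and no
destination vertex. Typically followed by a call to one of the to()
methods.

@Nonnull public Edge to(@Nonnull Vertex destination)
Sets the destination vertex of this edge, with ordinal 0.

@Nonnull public Edge to(@Nonnull Vertex destination, int ordinal)
Sets the destination vertex and ordinal of this edge.

public int getSourceOrdinal()
Returns the ordinal of the edge at the source vertex.

public int getDestOrdinal()
Returns the ordinal of the edge at the destination vertex.
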
@Nonnull public Edge priority(int priority)
Sets the priority of the edge.

Example: there are two incoming edges on a vertex, with priorities 1 and 2. The data from the edge with priority 1 will be processed in full before accepting any data from the edge with priority 2.

    S --+---- V1 ----+--- J
         \          /
          +-- V2 --+

The vertex J joins the streams that were originally split from source S.
Let's say the input from V1 has higher priority than the input from V2.
In this case, no item from V2 will be processed by J before V1
completes, which presupposes that S also completes. But S cannot
complete, because it can't emit all items to V2, because V2 is blocked
by J, which is not processing its items. This is a deadlock.
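The deadlock argument can be checked with a small single-threaded model. The sketch below is not Jet code: PriorityDeadlockSketch and its parameters are hypothetical, V1 and V2 are reduced to two bounded queues feeding J, and S is assumed to emit each item to both paths in lockstep.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded model of the S -> {V1, V2} -> J diamond; not Jet code.
final class PriorityDeadlockSketch {

    // Returns true if every item reaches J on both paths, false if the model stalls.
    static boolean runsToCompletion(int itemCount, int queueCapacity) {
        Deque<Integer> highPriority = new ArrayDeque<>(); // path through V1
        Deque<Integer> lowPriority = new ArrayDeque<>();  // path through V2
        int emitted = 0;
        int consumed = 0;
        while (consumed < 2 * itemCount) {
            boolean progress = false;
            // S emits the next item to both paths only if both queues have room.
            if (emitted < itemCount
                    && highPriority.size() < queueCapacity
                    && lowPriority.size() < queueCapacity) {
                highPriority.add(emitted);
                lowPriority.add(emitted);
                emitted++;
                progress = true;
            }
            if (!highPriority.isEmpty()) {
                // J always serves the high-priority input first.
                highPriority.poll();
                consumed++;
                progress = true;
            } else if (emitted == itemCount && !lowPriority.isEmpty()) {
                // J touches the low-priority input only after S (and so V1) has completed.
                lowPriority.poll();
                consumed++;
                progress = true;
            }
            if (!progress) {
                return false; // nobody can move: deadlock
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(runsToCompletion(5, 5));
        System.out.println(runsToCompletion(6, 5));
    }
}
```

In this model the run completes only when the item count does not exceed the queue capacity; with one item more, the low-priority queue fills up, S blocks, and J waits forever for the high-priority input to complete.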
This DAG can work only if S emits as few items into both paths
as can fit into the queues (see queue size configuration).

After receiving a barrier, we stop processing items on that edge until
the barrier is received from all other edges. However, we also don't
process lower priority edges until higher priority edges are done, which
prevents receiving the barrier on them, which in the end stalls the job
indefinitely. Technically this applies only to EXACTLY_ONCE snapshot
mode, but the snapshot is also postponed for AT_LEAST_ONCE jobs, because
the snapshot won't complete until after all higher priority edges are
completed, and this will increase the number of duplicately processed
items.

public int getPriority()
Returns the value of edge's priority, as explained on priority(int).

@Nonnull public Edge unicast()
Chooses the UNICAST routing policy for this edge. This policy is the
default.

@Nonnull public <T> Edge partitioned(@Nonnull FunctionEx<T,?> extractKeyFn)
Activates the PARTITIONED routing policy and applies the default
Hazelcast partitioning strategy. The strategy is applied to the result of
the extractKeyFn function.

@Nonnull public <T,K> Edge partitioned(@Nonnull FunctionEx<T,K> extractKeyFn, @Nonnull Partitioner<? super K> partitioner)
Activates the PARTITIONED routing policy and applies the provided
partitioning strategy. The strategy is applied to the result of the
extractKeyFn function.

@Nonnull public Edge allToOne(Object key)
Activates a special-cased PARTITIONED routing policy where all items
will be routed to the same partition ID, determined from the given key.
It means that all items will be directed to the same processor and other
processors will be idle.
It is equivalent to using partitioned(t -> key), but it has a small
optimization that the partition ID is not recalculated for each stream
item.
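The difference between the two forms can be sketched in plain Java. This is a model of the idea only, not the library's code: AllToOneSketch, the partition count of 271, and the hash-modulo partitioning are assumptions for illustration.

```java
import java.util.function.Function;
import java.util.function.ToIntFunction;

// Hypothetical model contrasting partitioned(t -> key) with allToOne(key); not Jet code.
final class AllToOneSketch {

    static final int PARTITION_COUNT = 271; // assumed partition count

    // Assumed partitioning strategy: non-negative hash modulo the partition count.
    static int partitionOf(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    // General form: the key is extracted and hashed again for every item.
    static <T> ToIntFunction<T> partitioned(Function<T, ?> extractKeyFn) {
        return item -> partitionOf(extractKeyFn.apply(item));
    }

    // Special-cased form: the partition ID is computed once and reused for every item.
    static <T> ToIntFunction<T> allToOne(Object key) {
        int fixedPartitionId = partitionOf(key);
        return item -> fixedPartitionId;
    }

    public static void main(String[] args) {
        ToIntFunction<String> general = partitioned(t -> "the-key");
        ToIntFunction<String> special = allToOne("the-key");
        for (String item : new String[] {"a", "b", "c"}) {
            // Both forms report the same partition ID for every item.
            System.out.println(general.applyAsInt(item) == special.applyAsInt(item));
        }
    }
}
```

Both functions return the same partition ID for any item; the second simply avoids the per-item key extraction and hashing.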

@Nonnull public Edge isolated()
Activates the ISOLATED routing policy which establishes isolated paths
from upstream to downstream processors.
Since all traffic will be local, this policy is not allowed on a distributed edge.

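public Edge ordered(@Nonnull ComparatorEx<?> comparator)
Specifies that the data traveling on this edge is ordered according to
the provided comparator.
The implementation currently doesn't handle watermarks or barriers: if the source processors emit watermarks or you add a processing guarantee, the job will fail at runtime.

@Nullable public Partitioner<?> getPartitioner()
Returns the instance encapsulating the partitioning strategy in effect
on this edge.

@Nullable public ComparatorEx<?> getOrderComparator()
Returns the comparator defined on this edge using ordered(ComparatorEx).

@Nonnull public Edge.RoutingPolicy getRoutingPolicy()
Returns the Edge.RoutingPolicy in effect on the edge.

@Nonnull public Edge local()
Declares that the edge is local.
See Also: distributed(), distributeTo(Address), getDistributedTo()

public boolean isLocal()
Returns whether the edge is local.
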
@Nonnull public Edge distributed()
Declares that the edge is distributed.
A distributed edge allows all the data to be observed by all
the processors (using the BROADCAST routing policy) and, more
attractively, all the data with a given partition ID to be observed by
the same unique processor, regardless of whether it is running on the
local or a remote member (using the PARTITIONED routing policy).
See Also: distributeTo(Address), local(), getDistributedTo()

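@Nonnull public Edge distributeTo(@Nonnull Address targetMember)
Declares that all items sent over this edge will be delivered to the
specified member.
This option is most useful for sinks if we want to ensure that the results are written to (or sent from) only that member.
It's not suitable for fault-tolerant jobs. If the targetMember is not a
member of the cluster, the job can't be executed and will fail.
Parameters:
targetMember - the member to deliver the items to
See Also: distributed(), local(), getDistributedTo()

@Nullable public Address getDistributedTo()
Possible return values:
null - route only to local members (after a local() call)
255.255.255.255:0 - route to all members (after a distributed() call)
else - route to specific member (after a distributeTo(com.hazelcast.cluster.Address) call)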
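The three cases can be told apart as in the sketch below. To stay free of library dependencies it models the address as its "host:port" string, which is an assumption; real code would compare the returned Address with the DISTRIBUTE_TO_ALL constant. DistributedToSketch and its Scope enum are hypothetical names.

```java
// Hypothetical helper that models an address as its "host:port" string; not Jet code.
final class DistributedToSketch {

    // String form of the all-members marker, as listed in the return values above.
    static final String DISTRIBUTE_TO_ALL = "255.255.255.255:0";

    enum Scope { LOCAL, ALL_MEMBERS, SINGLE_MEMBER }

    // Maps a modeled getDistributedTo() result to the routing scope it denotes.
    static Scope scopeOf(String distributedTo) {
        if (distributedTo == null) {
            return Scope.LOCAL;          // after a local() call
        }
        if (DISTRIBUTE_TO_ALL.equals(distributedTo)) {
            return Scope.ALL_MEMBERS;    // after a distributed() call
        }
        return Scope.SINGLE_MEMBER;      // after a distributeTo(...) call
    }

    public static void main(String[] args) {
        System.out.println(scopeOf(null));
        System.out.println(scopeOf(DISTRIBUTE_TO_ALL));
        System.out.println(scopeOf("10.0.0.5:5701")); // example member address, made up
    }
}
```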

public boolean isDistributed()
Says whether this edge distributes items among all members, as requested
by the distributed() method.

@Nullable public EdgeConfig getConfig()
Returns the EdgeConfig instance associated with this edge.
Default value is null.

@Nonnull public Edge setConfig(@Nullable EdgeConfig config)
Assigns an EdgeConfig to this edge. If null is supplied, the edge will
use JetConfig.getDefaultEdgeConfig().

public void writeData(@Nonnull ObjectDataOutput out) throws IOException
Description copied from interface: DataSerializable
Writes object fields to output stream.
Specified by: writeData in interface DataSerializable
Parameters:
out - output
Throws:
IOException - if an I/O error occurs. In particular, an IOException may
be thrown if the output stream has been closed.

public void readData(@Nonnull ObjectDataInput in) throws IOException
Description copied from interface: DataSerializable
Reads fields from the input stream.
Specified by: readData in interface DataSerializable
Parameters:
in - input
Throws:
IOException - if an I/O error occurs. In particular, an IOException may
be thrown if the input stream has been closed.

public int getFactoryId()
Description copied from interface: IdentifiedDataSerializable
Returns DataSerializableFactory factory ID for this class.
Specified by: getFactoryId in interface IdentifiedDataSerializable

public int getClassId()
Description copied from interface: IdentifiedDataSerializable
Returns type identifier for this class.
Specified by: getClassId in interface IdentifiedDataSerializable

Copyright © 2022 Hazelcast, Inc.. All rights reserved.