public interface BatchStage&lt;T&gt; extends GeneralStage&lt;T&gt;

Type parameter: T - the type of items coming out of this stage

A stage in a pipeline that will observe a finite amount of data (a batch).
It accepts input from its upstream stages (if any) and passes its output
to its downstream stages.

Method Summary

Modifier and Type | Method and Description
---|---
`<R> BatchStage<R>` | aggregate(AggregateOperation1<? super T,?,? extends R> aggrOp) - Attaches a stage that performs the given aggregate operation over all the items it receives.
`default <T1,R0,R1> BatchStage<Tuple2<R0,R1>>` | aggregate2(AggregateOperation1<? super T,?,? extends R0> aggrOp0, BatchStage<T1> stage1, AggregateOperation1<? super T1,?,? extends R1> aggrOp1) - Attaches a stage that co-aggregates the data from this and the supplied stage by performing a separate aggregate operation on each and emitting a Tuple2 with their results.
`default <T1,R0,R1,OUT> BatchStage<OUT>` | aggregate2(AggregateOperation1<? super T,?,? extends R0> aggrOp0, BatchStage<T1> stage1, AggregateOperation1<? super T1,?,? extends R1> aggrOp1, DistributedBiFunction<? super R0,? super R1,? extends OUT> mapToOutputFn) - Attaches a stage that co-aggregates the data from this and the supplied stage by performing a separate aggregate operation on each and then passing both results to mapToOutputFn, which transforms them into the final output.
`<T1,R> BatchStage<R>` | aggregate2(BatchStage<T1> stage1, AggregateOperation2<? super T,? super T1,?,? extends R> aggrOp) - Attaches a stage that performs the given aggregate operation over all the items it receives from both this stage and the stage1 you supply.
`default <T1,T2,R0,R1,R2> BatchStage<Tuple3<R0,R1,R2>>` | aggregate3(AggregateOperation1<? super T,?,? extends R0> aggrOp0, BatchStage<T1> stage1, AggregateOperation1<? super T1,?,? extends R1> aggrOp1, BatchStage<T2> stage2, AggregateOperation1<? super T2,?,? extends R2> aggrOp2) - Attaches a stage that co-aggregates the data from this and the two supplied stages by performing a separate aggregate operation on each and emitting a Tuple3 with their results.
`default <T1,T2,R0,R1,R2,OUT> BatchStage<OUT>` | aggregate3(AggregateOperation1<? super T,?,? extends R0> aggrOp0, BatchStage<T1> stage1, AggregateOperation1<? super T1,?,? extends R1> aggrOp1, BatchStage<T2> stage2, AggregateOperation1<? super T2,?,? extends R2> aggrOp2, DistributedTriFunction<? super R0,? super R1,? super R2,? extends OUT> mapToOutputFn) - Attaches a stage that co-aggregates the data from this and the two supplied stages by performing a separate aggregate operation on each and then passing all three results to mapToOutputFn, which transforms them into the final output.
`<T1,T2,R> BatchStage<R>` | aggregate3(BatchStage<T1> stage1, BatchStage<T2> stage2, AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp) - Attaches a stage that performs the given aggregate operation over all the items it receives from this stage as well as stage1 and stage2 you supply.
`default AggregateBuilder1<T>` | aggregateBuilder() - Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages.
`default <R0> AggregateBuilder<R0>` | aggregateBuilder(AggregateOperation1<? super T,?,? extends R0> aggrOp0) - Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages.
`<R> BatchStage<R>` | customTransform(String stageName, DistributedSupplier<Processor> procSupplier) - Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
`default BatchStage<T>` | distinct() - Attaches a stage that emits just the items that are distinct according to their definition of equality (equals and hashCode).
`BatchStage<T>` | filter(DistributedPredicate<T> filterFn) - Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it.
`<C> BatchStage<T>` | filterUsingContext(ContextFactory<C> contextFactory, DistributedBiPredicate<? super C,? super T> filterFn) - Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it.
`<R> BatchStage<R>` | flatMap(DistributedFunction<? super T,? extends Traverser<? extends R>> flatMapFn) - Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns.
`<C,R> BatchStage<R>` | flatMapUsingContext(ContextFactory<C> contextFactory, DistributedBiFunction<? super C,? super T,? extends Traverser<? extends R>> flatMapFn) - Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all items from the Traverser it returns as the output items.
`<K> BatchStageWithKey<T,K>` | groupingKey(DistributedFunction<? super T,? extends K> keyFn) - Specifies the function that will extract a key from the items in the associated pipeline stage.
`<K,T1_IN,T1,R> BatchStage<R>` | hashJoin(BatchStage<T1_IN> stage1, JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1, DistributedBiFunction<T,T1,R> mapToOutputFn) - Attaches to both this and the supplied stage a hash-joining stage and returns it.
`<K1,K2,T1_IN,T2_IN,T1,T2,R> BatchStage<R>` | hashJoin2(BatchStage<T1_IN> stage1, JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1, BatchStage<T2_IN> stage2, JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2, DistributedTriFunction<T,T1,T2,R> mapToOutputFn) - Attaches to this and the two supplied stages a hash-joining stage and returns it.
`default HashJoinBuilder<T>` | hashJoinBuilder() - Returns a fluent API builder object to construct a hash join operation with any number of contributing stages.
`<R> BatchStage<R>` | map(DistributedFunction<? super T,? extends R> mapFn) - Attaches a mapping stage which applies the supplied function to each input item independently and emits the function's result as the output item.
`<C,R> BatchStage<R>` | mapUsingContext(ContextFactory<C> contextFactory, DistributedBiFunction<? super C,? super T,? extends R> mapFn) - Attaches a mapping stage which applies the supplied function to each input item independently and emits the function's result as the output item.
`default <K,V,R> BatchStage<R>` | mapUsingIMap(IMap<K,V> iMap, DistributedBiFunction<? super IMap<K,V>,? super T,? extends R> mapFn) - Attaches a mapUsingContext stage where the context is a Hazelcast IMap.
`default <K,V,R> BatchStage<R>` | mapUsingIMap(String mapName, DistributedBiFunction<? super IMap<K,V>,? super T,? extends R> mapFn) - Attaches a mapUsingContext stage where the context is a Hazelcast IMap with the supplied name.
`default <K,V,R> BatchStage<R>` | mapUsingReplicatedMap(ReplicatedMap<K,V> replicatedMap, DistributedBiFunction<? super ReplicatedMap<K,V>,? super T,? extends R> mapFn) - Attaches a mapUsingContext stage where the context is a Hazelcast ReplicatedMap.
`default <K,V,R> BatchStage<R>` | mapUsingReplicatedMap(String mapName, DistributedBiFunction<? super ReplicatedMap<K,V>,? super T,? extends R> mapFn) - Attaches a mapUsingContext stage where the context is a Hazelcast ReplicatedMap with the supplied name.
`BatchStage<T>` | merge(BatchStage<? extends T> other) - Attaches a stage that emits all the items from this stage as well as all the items from the supplied stage.
`default BatchStage<T>` | peek() - Adds a peeking layer to this compute stage which logs its output.
`default BatchStage<T>` | peek(DistributedFunction<? super T,? extends CharSequence> toStringFn) - Adds a peeking layer to this compute stage which logs its output.
`BatchStage<T>` | peek(DistributedPredicate<? super T> shouldLogFn, DistributedFunction<? super T,? extends CharSequence> toStringFn) - Attaches a peeking stage which logs this stage's output and passes it through without transformation.
`<R> BatchStage<R>` | rollingAggregate(AggregateOperation1<? super T,?,? extends R> aggrOp) - Attaches a rolling aggregation stage.
`BatchStage<T>` | setLocalParallelism(int localParallelism) - Sets the preferred local parallelism (number of processors per Jet cluster member) this stage will configure its DAG vertices with.
`BatchStage<T>` | setName(String name) - Overrides the default name of the stage with the name you choose and returns the stage.
Methods inherited from interface GeneralStage: addTimestamps, addTimestamps, drainTo

Methods inherited from interface Stage: getPipeline, name

Method Detail
groupingKey

@Nonnull <K> BatchStageWithKey<T,K> groupingKey(@Nonnull DistributedFunction<? super T,? extends K> keyFn)

Specifies the function that will extract a key from the items in the
associated pipeline stage. The key must properly implement equals()
and hashCode().

Specified by: groupingKey in interface GeneralStage<T>
Type Parameters: K - type of the key
Parameters: keyFn - function that extracts the grouping key

map

@Nonnull <R> BatchStage<R> map(@Nonnull DistributedFunction<? super T,? extends R> mapFn)
Description copied from interface: GeneralStage

Attaches a mapping stage which applies the supplied function to each
input item independently and emits the function's result as the output
item. If the result is null, it emits nothing. Therefore this stage can
be used to implement filtering semantics as well.

Specified by: map in interface GeneralStage<T>
Type Parameters: R - the result type of the mapping function
Parameters: mapFn - a stateless mapping function

filter

@Nonnull BatchStage<T> filter(@Nonnull DistributedPredicate<T> filterFn)
Description copied from interface: GeneralStage

Attaches a filtering stage which applies the provided predicate function
to each input item to decide whether to pass the item to the output or
to discard it.

Specified by: filter in interface GeneralStage<T>
Parameters: filterFn - a stateless filter predicate function

flatMap

@Nonnull <R> BatchStage<R> flatMap(@Nonnull DistributedFunction<? super T,? extends Traverser<? extends R>> flatMapFn)
Description copied from interface: GeneralStage

Attaches a flat-mapping stage which applies the supplied function to
each input item independently and emits all the items from the Traverser
it returns. The traverser must be null-terminated.

Specified by: flatMap in interface GeneralStage<T>
Type Parameters: R - the type of items in the result's traversers
Parameters: flatMapFn - a stateless flatmapping function, whose result type is Jet's Traverser
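The null-termination contract above can be sketched in plain Java, without any Jet dependency. `SimpleTraverser` below is a hypothetical stand-in for Jet's `Traverser`, used only to illustrate how a flat-mapping stage drains each per-item traverser until it returns null:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class FlatMapSketch {
    // Stand-in for Jet's Traverser: next() returns items until it
    // returns null, which marks the end of the sequence.
    interface SimpleTraverser<R> {
        R next();
    }

    // Drains the null-terminated traverser produced for each input item,
    // mimicking what a flat-mapping stage does with flatMapFn's result.
    static <T, R> List<R> flatMap(List<T> input,
                                  Function<T, SimpleTraverser<R>> flatMapFn) {
        List<R> out = new ArrayList<>();
        for (T item : input) {
            SimpleTraverser<R> trav = flatMapFn.apply(item);
            for (R r; (r = trav.next()) != null; ) {
                out.add(r);
            }
        }
        return out;
    }

    // A traverser over the words of a line; returns null when exhausted
    static SimpleTraverser<String> traverseWords(String line) {
        String[] words = line.split(" ");
        int[] pos = {0};
        return () -> pos[0] < words.length ? words[pos[0]++] : null;
    }

    public static void main(String[] args) {
        List<String> result =
                flatMap(Arrays.asList("a b", "c"), FlatMapSketch::traverseWords);
        System.out.println(result); // [a, b, c]
    }
}
```

This is an illustration of the contract only; in a real pipeline you would return `Traverser` instances from Jet's `Traversers` utility class rather than hand-rolling them.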
mapUsingContext

@Nonnull <C,R> BatchStage<R> mapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedBiFunction<? super C,? super T,? extends R> mapFn)

Description copied from interface: GeneralStage

Attaches a mapping stage which applies the supplied function to each
input item independently and emits the function's result as the output
item. The mapping function receives another parameter, the context
object, which Jet will create using the supplied contextFactory. If the
mapping result is null, it emits nothing. Therefore this stage can be
used to implement filtering semantics as well.

Specified by: mapUsingContext in interface GeneralStage<T>
Type Parameters:
C - type of context object
R - the result type of the mapping function
Parameters:
contextFactory - the context factory
mapFn - a stateless mapping function

filterUsingContext

@Nonnull <C> BatchStage<T> filterUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedBiPredicate<? super C,? super T> filterFn)
Description copied from interface: GeneralStage

Attaches a filtering stage which applies the provided predicate function
to each input item to decide whether to pass the item to the output or
to discard it. The predicate function receives another parameter, the
context object, which Jet will create using the supplied contextFactory.

Specified by: filterUsingContext in interface GeneralStage<T>
Type Parameters: C - type of context object
Parameters:
contextFactory - the context factory
filterFn - a stateless filter predicate function

flatMapUsingContext

@Nonnull <C,R> BatchStage<R> flatMapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull DistributedBiFunction<? super C,? super T,? extends Traverser<? extends R>> flatMapFn)
Description copied from interface: GeneralStage

Attaches a flat-mapping stage which applies the supplied function to
each input item independently and emits all items from the Traverser it
returns as the output items. The traverser must be null-terminated. The
mapping function receives another parameter, the context object, which
Jet will create using the supplied contextFactory.

Specified by: flatMapUsingContext in interface GeneralStage<T>
Type Parameters:
C - type of context object
R - the type of items in the result's traversers
Parameters:
contextFactory - the context factory
flatMapFn - a stateless flatmapping function, whose result type is Jet's Traverser
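The common idea behind the `*UsingContext` family is that the context object is created once and then handed to the function together with every input item. A plain-Java sketch of that pattern (not the Jet API; the lookup table here is a hypothetical stand-in for an IMap or ReplicatedMap context):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class ContextSketch {
    // Mimics mapUsingContext: createContextFn runs once, and mapFn
    // receives the same context instance with every input item.
    static <C, T, R> List<R> mapUsingContext(List<T> input,
                                             Supplier<C> createContextFn,
                                             BiFunction<C, T, R> mapFn) {
        C context = createContextFn.get();   // created once, reused for all items
        List<R> out = new ArrayList<>();
        for (T item : input) {
            R result = mapFn.apply(context, item);
            if (result != null) {            // a null result emits nothing
                out.add(result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical lookup table standing in for a shared map context
        Supplier<Map<String, String>> lookup = () -> {
            Map<String, String> m = new HashMap<>();
            m.put("fr", "France");
            m.put("de", "Germany");
            return m;
        };
        List<String> countries = mapUsingContext(
                List.of("fr", "xx", "de"), lookup, Map::get);
        System.out.println(countries); // [France, Germany]
    }
}
```

Note how the unmatched key "xx" maps to null and is therefore dropped, matching the filtering semantics described above.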
mapUsingReplicatedMap

@Nonnull default <K,V,R> BatchStage<R> mapUsingReplicatedMap(@Nonnull String mapName, @Nonnull DistributedBiFunction<? super ReplicatedMap<K,V>,? super T,? extends R> mapFn)

Description copied from interface: GeneralStage

Attaches a GeneralStage.mapUsingContext(ContextFactory, DistributedBiFunction)
stage where the context is a Hazelcast ReplicatedMap with the supplied
name. The mapping function will receive it as the first argument.

Specified by: mapUsingReplicatedMap in interface GeneralStage<T>
Type Parameters:
K - type of the key in the ReplicatedMap
V - type of the value in the ReplicatedMap
R - type of the output item
Parameters:
mapName - name of the ReplicatedMap
mapFn - the mapping function

mapUsingReplicatedMap

@Nonnull default <K,V,R> BatchStage<R> mapUsingReplicatedMap(@Nonnull ReplicatedMap<K,V> replicatedMap, @Nonnull DistributedBiFunction<? super ReplicatedMap<K,V>,? super T,? extends R> mapFn)
Description copied from interface: GeneralStage

Attaches a GeneralStage.mapUsingContext(ContextFactory, DistributedBiFunction)
stage where the context is a Hazelcast ReplicatedMap. It is not
necessarily the map you provide here, but a replicated map with the same
name in the Jet cluster that executes the pipeline. The mapping function
will receive the replicated map as the first argument.

Specified by: mapUsingReplicatedMap in interface GeneralStage<T>
Type Parameters:
K - type of the key in the ReplicatedMap
V - type of the value in the ReplicatedMap
R - type of the output item
Parameters:
replicatedMap - the ReplicatedMap to use as context
mapFn - the mapping function

mapUsingIMap

@Nonnull default <K,V,R> BatchStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull DistributedBiFunction<? super IMap<K,V>,? super T,? extends R> mapFn)
Description copied from interface: GeneralStage

Attaches a GeneralStage.mapUsingContext(ContextFactory, DistributedBiFunction)
stage where the context is a Hazelcast IMap with the supplied name. The
mapping function will receive it as the first argument.

Specified by: mapUsingIMap in interface GeneralStage<T>
Type Parameters:
K - type of the key in the IMap
V - type of the value in the IMap
R - type of the output item
Parameters:
mapName - name of the IMap
mapFn - the mapping function

mapUsingIMap

@Nonnull default <K,V,R> BatchStage<R> mapUsingIMap(@Nonnull IMap<K,V> iMap, @Nonnull DistributedBiFunction<? super IMap<K,V>,? super T,? extends R> mapFn)
Description copied from interface: GeneralStage

Attaches a GeneralStage.mapUsingContext(ContextFactory, DistributedBiFunction)
stage where the context is a Hazelcast IMap. It is not necessarily the
map you provide here, but a map with the same name in the Jet cluster
that executes the pipeline. The mapping function will receive the map as
the first argument.

Specified by: mapUsingIMap in interface GeneralStage<T>
Type Parameters:
K - type of the key in the IMap
V - type of the value in the IMap
R - type of the output item
Parameters:
iMap - the IMap to use as the context
mapFn - the mapping function

rollingAggregate

@Nonnull <R> BatchStage<R> rollingAggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp)
Description copied from interface: GeneralStage

Attaches a rolling aggregation stage. For example, if your aggregate
operation is summing and the input is {2, 7, 8, -5}, the output will be
{2, 9, 17, 12}. The number of input and output items is equal.

This stage is fault-tolerant and saves its state to the snapshot.

NOTE 1: since the output for each item depends on all the previous
items, this operation cannot be parallelized. Jet will perform it on a
single member, single-threaded. Jet also supports keyed rolling
aggregation, which it can parallelize by partitioning.

NOTE 2: if you plan to use an aggregate operation whose result size
grows with input size (such as toList) and your data source is
unbounded, you must carefully consider the memory demands this implies.
The result will keep growing forever.

Specified by: rollingAggregate in interface GeneralStage<T>
Type Parameters: R - result type of the aggregate operation
Parameters: aggrOp - the aggregate operation to do the aggregation

distinct

@Nonnull default BatchStage<T> distinct()
Attaches a stage that emits just the items that are distinct according
to their definition of equality (equals and hashCode). There is no
guarantee which one of equal items it will emit.

merge

@Nonnull BatchStage<T> merge(@Nonnull BatchStage<? extends T> other)
Attaches a stage that emits all the items from this stage as well as all
the items from the supplied stage.

Parameters: other - the other stage whose data to merge into this one

hashJoin

@Nonnull <K,T1_IN,T1,R> BatchStage<R> hashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull DistributedBiFunction<T,T1,R> mapToOutputFn)
Description copied from interface: GeneralStage

Attaches to both this and the supplied stage a hash-joining stage and
returns it. See the package javadoc for a detailed description of the
hash-join transform.

Specified by: hashJoin in interface GeneralStage<T>
Type Parameters:
K - the type of the join key
T1_IN - the type of stage1 items
T1 - the result type of projection on stage1 items
R - the resulting output type
Parameters:
stage1 - the stage to hash-join with this one
joinClause1 - specifies how to join the two streams
mapToOutputFn - function to map the joined items to the output value

hashJoin2

@Nonnull <K1,K2,T1_IN,T2_IN,T1,T2,R> BatchStage<R> hashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2, @Nonnull DistributedTriFunction<T,T1,T2,R> mapToOutputFn)
Description copied from interface: GeneralStage

Attaches to this and the two supplied stages a hash-joining stage and
returns it. See the package javadoc for a detailed description of the
hash-join transform.

Specified by: hashJoin2 in interface GeneralStage<T>
Type Parameters:
K1 - the type of key for stage1
K2 - the type of key for stage2
T1_IN - the type of stage1 items
T2_IN - the type of stage2 items
T1 - the result type of projection of stage1 items
T2 - the result type of projection of stage2 items
R - the resulting output type
Parameters:
stage1 - the first stage to join
joinClause1 - specifies how to join with stage1
stage2 - the second stage to join
joinClause2 - specifies how to join with stage2
mapToOutputFn - function to map the joined items to the output value

hashJoinBuilder

@Nonnull default HashJoinBuilder<T> hashJoinBuilder()
Description copied from interface: GeneralStage

Returns a fluent API builder object to construct a hash join operation
with any number of contributing stages. It is mainly intended for joins
with three or more contributing stages; for one or two stages, prefer
the direct stage.hashJoinN(...) calls because they offer more static
type safety.

Specified by: hashJoinBuilder in interface GeneralStage<T>
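The hash-join transform described above can be sketched in plain Java: build a hash table over the enriching side, then stream the primary side through it. This is an illustration of the general technique only, not Jet's implementation; the choice to pass null for a missing match is an assumption made for the sketch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class HashJoinSketch {
    // Sketch of hash-join semantics: hash the enriching side by its
    // join key, then look up a match for each primary item. A missing
    // match is passed as null here (an assumption for illustration).
    static <T0, T1, K, R> List<R> hashJoin(List<T0> primary,
                                           Function<T0, K> leftKeyFn,
                                           List<T1> enriching,
                                           Function<T1, K> rightKeyFn,
                                           BiFunction<T0, T1, R> mapToOutputFn) {
        Map<K, T1> table = new HashMap<>();
        for (T1 item : enriching) {
            table.put(rightKeyFn.apply(item), item);  // the broadcast side
        }
        List<R> out = new ArrayList<>();
        for (T0 item : primary) {
            out.add(mapToOutputFn.apply(item, table.get(leftKeyFn.apply(item))));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical "orderId:userId" and "userId:name" records
        List<String> orders = List.of("o1:u1", "o2:u2");
        List<String> users = List.of("u1:Alice", "u2:Bob");
        List<String> joined = hashJoin(
                orders, o -> o.split(":")[1],
                users, u -> u.split(":")[0],
                (o, u) -> o.split(":")[0] + "->" + u.split(":")[1]);
        System.out.println(joined); // [o1->Alice, o2->Bob]
    }
}
```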
aggregate

@Nonnull <R> BatchStage<R> aggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp)

Attaches a stage that performs the given aggregate operation over all
the items it receives.

Type Parameters: R - the type of the result
Parameters: aggrOp - the aggregate operation to perform
See Also: AggregateOperations
aggregate2

@Nonnull <T1,R> BatchStage<R> aggregate2(@Nonnull BatchStage<T1> stage1, @Nonnull AggregateOperation2<? super T,? super T1,?,? extends R> aggrOp)

Attaches a stage that performs the given aggregate operation over all
the items it receives from both this stage and the stage1 you supply.

This variant requires you to provide a two-input aggregate operation
(refer to its Javadoc for a simple example). If you can express your
logic in terms of two single-input aggregate operations, one for each
input stream, then you should use
stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it offers a simpler
API and you can use the already defined single-input operations. Use
this variant only when you need to implement an aggregate operation that
combines the input streams into the same accumulator.

The returned stage emits a single item.

Type Parameters:
T1 - type of items in stage1
R - type of the result
Parameters:
stage1 - the other stage
aggrOp - the aggregate operation to perform
See Also: AggregateOperations
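The "same accumulator" idea above can be sketched in plain Java: a two-input aggregation has one create function, one accumulate function per input, and a finish function producing the single result. This is an illustration of the concept, not Jet's AggregateOperation2 API:

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class CoAggregateSketch {
    // Two-input aggregation in miniature: one accumulator receives
    // items from both inputs via separate accumulate functions, then a
    // finish step produces the single result.
    static <T0, T1, A, R> R aggregate2(List<T0> in0, List<T1> in1,
                                       Supplier<A> createFn,
                                       BiConsumer<A, T0> accumulateFn0,
                                       BiConsumer<A, T1> accumulateFn1,
                                       Function<A, R> finishFn) {
        A acc = createFn.get();
        in0.forEach(t -> accumulateFn0.accept(acc, t));
        in1.forEach(t -> accumulateFn1.accept(acc, t));
        return finishFn.apply(acc);
    }

    public static void main(String[] args) {
        // Count items across both inputs in the same long[] accumulator
        long total = aggregate2(
                List.of("a", "b"), List.of(1, 2, 3),
                () -> new long[1],
                (acc, s) -> acc[0]++,
                (acc, i) -> acc[0]++,
                acc -> acc[0]);
        System.out.println(total); // 5
    }
}
```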
aggregate2

@Nonnull default <T1,R0,R1,OUT> BatchStage<OUT> aggregate2(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStage<T1> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1, @Nonnull DistributedBiFunction<? super R0,? super R1,? extends OUT> mapToOutputFn)

Attaches a stage that co-aggregates the data from this and the supplied
stage by performing a separate aggregate operation on each and then
passing both results to mapToOutputFn, which transforms them into the
final output.

The returned stage emits a single item.

Type Parameters:
T1 - type of the items in the other stage
R0 - type of the aggregated result for this stage
R1 - type of the aggregated result for the other stage
OUT - the output item type
Parameters:
aggrOp0 - aggregate operation to perform on this stage
stage1 - the other stage
aggrOp1 - aggregate operation to perform on the other stage
mapToOutputFn - function to apply to the aggregated results

aggregate2

@Nonnull default <T1,R0,R1> BatchStage<Tuple2<R0,R1>> aggregate2(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStage<T1> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1)
Attaches a stage that co-aggregates the data from this and the supplied
stage by performing a separate aggregate operation on each and emitting
a Tuple2 with their results.

The returned stage emits a single item.

Type Parameters:
T1 - type of the items in the other stage
R0 - type of the aggregated result for this stage
R1 - type of the aggregated result for the other stage
Parameters:
aggrOp0 - aggregate operation to perform on this stage
stage1 - the other stage
aggrOp1 - aggregate operation to perform on the other stage

aggregate3

@Nonnull <T1,T2,R> BatchStage<R> aggregate3(@Nonnull BatchStage<T1> stage1, @Nonnull BatchStage<T2> stage2, @Nonnull AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp)
Attaches a stage that performs the given aggregate operation over all
the items it receives from this stage as well as stage1 and stage2 you
supply.

This variant requires you to provide a three-input aggregate operation
(refer to its Javadoc for a simple example). If you can express your
logic in terms of three single-input aggregate operations, one for each
input stream, then you should use
stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because it
offers a simpler API and you can use the already defined single-input
operations. Use this variant only when you need to implement an
aggregate operation that combines the input streams into the same
accumulator.

The aggregating stage emits a single item.

Type Parameters:
T1 - type of items in stage1
T2 - type of items in stage2
R - type of the result
Parameters:
stage1 - the first additional stage
stage2 - the second additional stage
aggrOp - the aggregate operation to perform
See Also: AggregateOperations
aggregate3

@Nonnull default <T1,T2,R0,R1,R2,OUT> BatchStage<OUT> aggregate3(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStage<T1> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1, @Nonnull BatchStage<T2> stage2, @Nonnull AggregateOperation1<? super T2,?,? extends R2> aggrOp2, @Nonnull DistributedTriFunction<? super R0,? super R1,? super R2,? extends OUT> mapToOutputFn)

Attaches a stage that co-aggregates the data from this and the two
supplied stages by performing a separate aggregate operation on each and
then passing all three results to mapToOutputFn, which transforms them
into the final output.

The returned stage emits a single item.

Type Parameters:
T1 - type of the items in stage1
T2 - type of the items in stage2
R0 - type of the aggregated result for this stage
R1 - type of the aggregated result for stage1
R2 - type of the aggregated result for stage2
OUT - the output item type
Parameters:
aggrOp0 - aggregate operation to perform on this stage
stage1 - the first additional stage
aggrOp1 - aggregate operation to perform on stage1
stage2 - the second additional stage
aggrOp2 - aggregate operation to perform on stage2
mapToOutputFn - function to apply to the aggregated results

aggregate3

@Nonnull default <T1,T2,R0,R1,R2> BatchStage<Tuple3<R0,R1,R2>> aggregate3(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStage<T1> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1, @Nonnull BatchStage<T2> stage2, @Nonnull AggregateOperation1<? super T2,?,? extends R2> aggrOp2)
Attaches a stage that co-aggregates the data from this and the two
supplied stages by performing a separate aggregate operation on each and
emitting a Tuple3 with their results.

The returned stage emits a single item.

Type Parameters:
T1 - type of the items in stage1
T2 - type of the items in stage2
R0 - type of the aggregated result for this stage
R1 - type of the aggregated result for stage1
R2 - type of the aggregated result for stage2
Parameters:
aggrOp0 - aggregate operation to perform on this stage
stage1 - the first additional stage
aggrOp1 - aggregate operation to perform on stage1
stage2 - the second additional stage
aggrOp2 - aggregate operation to perform on stage2
aggregateBuilder

@Nonnull default <R0> AggregateBuilder<R0> aggregateBuilder(AggregateOperation1<? super T,?,? extends R0> aggrOp0)

Offers a step-by-step API to build a pipeline stage that co-aggregates
the data from several input stages. Use the tag you get from
builder.add(stageN, aggrOpN) to retrieve the aggregated result for that
stage. Use builder.tag0() as the tag of this stage. You will also be
able to supply a function to the builder that immediately transforms the
ItemsByTag to the desired output type.

This example counts the items in stage-0, sums those in stage-1 and
takes the average of those in stage-2:

BatchStage<Long> stage0 = p.drawFrom(source0);
BatchStage<Long> stage1 = p.drawFrom(source1);
BatchStage<Long> stage2 = p.drawFrom(source2);
AggregateBuilder<Long> b = stage0.aggregateBuilder(
        AggregateOperations.counting());
Tag<Long> tag0 = b.tag0();
Tag<Long> tag1 = b.add(stage1,
        AggregateOperations.summingLong(Number::longValue));
Tag<Double> tag2 = b.add(stage2,
        AggregateOperations.averagingLong(Number::longValue));
BatchStage<ItemsByTag> aggregated = b.build();
aggregated.map(ibt -> String.format(
        "Count of stage0: %d, sum of stage1: %d, average of stage2: %f",
        ibt.get(tag0), ibt.get(tag1), ibt.get(tag2))
);
aggregateBuilder

@Nonnull default AggregateBuilder1<T> aggregateBuilder()

Offers a step-by-step API to build a pipeline stage that co-aggregates
the data from several input stages.

This builder requires you to provide a multi-input aggregate operation.
If you can express your logic in terms of single-input aggregate
operations, one for each input stream, then you should use
stage0.aggregateBuilder(aggrOp0) because it offers a simpler API. Use
this builder only when you need to implement an aggregate operation that
combines all the input streams into the same accumulator.

This builder is mainly intended to build a co-aggregation of four or
more contributing stages. For up to three stages, prefer the direct
stage.aggregateN(...) calls because they offer more static type safety.

To add the other stages, call add(stage). Collect all the tags returned
from add() and use them when building the aggregate operation. Retrieve
the tag of the first stage (from which you obtained this builder) by
calling AggregateBuilder1.tag0().

This example takes three streams of strings and counts the distinct
strings across all of them:

Pipeline p = Pipeline.create();
BatchStage<String> stage0 = p.drawFrom(source0);
BatchStage<String> stage1 = p.drawFrom(source1);
BatchStage<String> stage2 = p.drawFrom(source2);
AggregateBuilder1<String> b = stage0.aggregateBuilder();
Tag<String> tag0 = b.tag0();
Tag<String> tag1 = b.add(stage1);
Tag<String> tag2 = b.add(stage2);
BatchStage<Integer> aggregated = b.build(AggregateOperation
        .withCreate(HashSet<String>::new)
        .andAccumulate(tag0, (acc, item) -> acc.add(item))
        .andAccumulate(tag1, (acc, item) -> acc.add(item))
        .andAccumulate(tag2, (acc, item) -> acc.add(item))
        .andCombine(HashSet::addAll)
        .andFinish(HashSet::size));
peek

@Nonnull default BatchStage<T> peek()

Description copied from interface: GeneralStage

Adds a peeking layer to this compute stage which logs its output. For
each item the stage emits, it logs the result of its toString() method
at the INFO level to the log category
com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>.
The stage logs each item on whichever cluster member it happens to
receive it. Its primary purpose is for development use, when running Jet
on a local machine.

Specified by: peek in interface GeneralStage<T>
See Also: GeneralStage.peek(DistributedPredicate, DistributedFunction), GeneralStage.peek(DistributedFunction)
peek

@Nonnull BatchStage<T> peek(@Nonnull DistributedPredicate<? super T> shouldLogFn, @Nonnull DistributedFunction<? super T,? extends CharSequence> toStringFn)

Description copied from interface: GeneralStage

Attaches a peeking stage which logs this stage's output and passes it
through without transformation. For each item the stage emits, it uses
the shouldLogFn predicate to see whether to log the item and, if it
passes, uses toStringFn to get the item's string representation, which
it logs to the category
com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>.

Specified by: peek in interface GeneralStage<T>
Parameters:
shouldLogFn - a function to filter the logged items. You can use alwaysTrue() as a pass-through filter when you don't need any filtering.
toStringFn - a function that returns a string representation of the item
See Also: GeneralStage.peek(DistributedFunction), GeneralStage.peek()
peek

@Nonnull default BatchStage<T> peek(@Nonnull DistributedFunction<? super T,? extends CharSequence> toStringFn)

Description copied from interface: GeneralStage

Adds a peeking layer to this compute stage which logs its output. For
each item the stage emits, it uses toStringFn to get a string
representation of the item, which it logs to the category
com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>.

Specified by: peek in interface GeneralStage<T>
Parameters:
toStringFn - a function that returns a string representation of the item
See Also: GeneralStage.peek(DistributedPredicate, DistributedFunction), GeneralStage.peek()
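The contract shared by the peek variants (log selectively, pass every item through unchanged) can be sketched in plain Java. This is an illustration only, not the Jet API; the StringBuilder stands in for the INFO-level logger:

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class PeekSketch {
    // Sketch of the peek contract: pass every item through unchanged,
    // logging only those accepted by shouldLogFn, rendered by toStringFn.
    static <T> List<T> peek(List<T> input,
                            Predicate<? super T> shouldLogFn,
                            Function<? super T, ? extends CharSequence> toStringFn,
                            StringBuilder log) {
        for (T item : input) {
            if (shouldLogFn.test(item)) {
                log.append(toStringFn.apply(item)).append('\n'); // stands in for INFO logging
            }
        }
        return input;  // items pass through without transformation
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        List<Integer> out =
                peek(List.of(1, -2, 3), n -> n < 0, n -> "negative: " + n, log);
        System.out.println(out); // [1, -2, 3]
        System.out.print(log);   // negative: -2
    }
}
```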
customTransform

@Nonnull <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull DistributedSupplier<Processor> procSupplier)

Description copied from interface: GeneralStage

Attaches a stage with a custom transform based on the provided supplier
of Core API Processors. Note that the returned stage's type parameter is
inferred from the call site and not propagated from the processor that
will produce the result, so there is no actual type safety provided.

Specified by: customTransform in interface GeneralStage<T>
Type Parameters: R - the type of the output items
Parameters:
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

setLocalParallelism

@Nonnull BatchStage<T> setLocalParallelism(int localParallelism)
Description copied from interface: Stage

Sets the preferred local parallelism (number of processors per Jet
cluster member) this stage will configure its DAG vertices with. While
most stages are backed by one vertex, there are exceptions. If a stage
uses two vertices, each of them will have the given local parallelism,
so in total there will be twice as many processors per member.

The default value is -1, which signals Jet to figure out a default
value: Jet will determine the vertex's local parallelism during job
initialization from the global default and the processor meta-supplier's
preferred value.

Specified by: setLocalParallelism in interface Stage

Copyright © 2018 Hazelcast, Inc. All rights reserved.