Package com.hazelcast.jet.pipeline

package com.hazelcast.jet.pipeline
The Pipeline API is Jet's high-level API to build and execute distributed computation jobs. It models the computation using an analogy with a system of interconnected water pipes. The data flows from the pipeline's sources to its sinks. Pipes can bifurcate and merge, but there can't be any closed loops (cycles).

The basic element is a pipeline stage which can be attached to one or more other stages, both in the upstream and the downstream direction. A pipeline accepts the data coming from its upstream stages, transforms it, and directs the resulting data to its downstream stages.

Kinds of transformation performed by pipeline stages


Basic transformations have a single upstream pipeline and statelessly transform individual items in it. Examples are map, filter, and flatMap.

Grouping and aggregation

The aggregate*() transformations perform an aggregate operation on a set of items. You can call stage.groupingKey() to group the items by a key and then Jet will aggregate each group separately. For stream stages you must specify a stage.window() which will transform the infinite stream into a series of finite windows. If you specify more than one input stage for the aggregation (using stage.aggregate2(), stage.aggregate3() or stage.aggregateBuilder(), the data from all streams will be combined into the aggregation result. The AggregateOperation you supply must define a separate accumulate primitive for each contributing stream. Refer to its Javadoc for further details.


The hash-join is a joining transform designed for the use case of data enrichment with static data. It is an asymmetric join that joins the enriching stage(s) to the primary stage. The enriching stages must be batch stages — they must represent finite datasets. The primary stage may be either a batch or a stream stage.

You must provide a separate pair of functions for each of the enriching stages: one to extract the key from the primary item and one to extract it from the enriching item. For example, you can join a Trade with a Broker on trade.getBrokerId() == broker.getId() and a Product on trade.getProductId() == product.getId(), and all this can happen in a single hash-join transform.

The hash-join transform is optimized for throughput — each cluster member materializes a local copy of all the enriching data, stored in hashtables (hence the name). It consumes the enriching streams in full before ingesting any data from the primary stream.

The output of hashJoin is just like an SQL left outer join: for each primary item there are N output items, one for each matching item in the enriching set. If an enriching set doesn't have a matching item, the output will have a null instead of the enriching item.

If you need SQL inner join, then you can use the specialised innerHashJoin function, in which for each primary item with at least one match, there are N output items, one for each matching item in the enriching set. If an enriching set doesn't have a matching item, there will be no records with the given primary item. In this case the output function's arguments are always non-null.

The join also allows duplicate keys on both enriching and primary inputs: the output is a cartesian product of all the matching entries.


 |     Primary input      | Enriching input |          Output           |
 | Trade{ticker=AA,amt=1} | Ticker{id=AA}   | Tuple2{                   |
 | Trade{ticker=BB,amt=2} | Ticker{id=BB}   |   Trade{ticker=AA,amt=1}, |
 | Trade{ticker=AA,amt=3} |                 |   Ticker{id=AA}           |
 |                        |                 | }                         |
 |                        |                 | Tuple2{                   |
 |                        |                 |   Trade{ticker=BB,amt=2}, |
 |                        |                 |   Ticker{id=BB}           |
 |                        |                 | }                         |
 |                        |                 | Tuple2{                   |
 |                        |                 |   Trade{ticker=AA,amt=3}, |
 |                        |                 |   Ticker{id=AA}           |
 |                        |                 | }                         |
Jet 3.0