Hazelcast Jet adds distributed
java.util.stream support for Hazelcast
IMap and IList data structures.
For extensive information about the
java.util.stream API, please refer to
the official Javadoc.
    JetInstance jet = Jet.newJetInstance();
    IStreamMap<String, Integer> map = jet.getMap("latitudes");
    map.put("London", 51);
    map.put("Paris", 48);
    map.put("NYC", 40);
    map.put("Sydney", -34);
    map.put("Sao Paulo", -23);
    map.put("Jakarta", -6);

    map.stream()
       .filter(e -> e.getValue() < 0)
       .forEach(System.out::println);
Serializable Lambda Functions
By default, the functional interfaces which were added to
java.util.function are not serializable. In a distributed system, the
defined lambdas need to be serialized and sent to the other members. Jet
includes serializable versions of all the interfaces found in
java.util.function, which can be accessed through the Distributed class.
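The serialization problem can be reproduced with the JDK alone. The following is a minimal sketch, not Jet code: SerializableFunction is a hypothetical interface defined only for this illustration, and the round trip through a byte array stands in for sending a lambda to another member.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

final class SerializableLambdaSketch {

    // Hypothetical interface for illustration only; not part of the Jet API.
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
    }

    // A lambda typed as a plain JDK functional interface.
    static final Function<Integer, Integer> PLAIN = x -> x + 1;

    // The same lambda typed as an interface that also extends Serializable.
    static final SerializableFunction<Integer, Integer> SHIPPABLE = x -> x + 1;

    // Returns true if Java serialization accepts the given object.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Writes the object to a byte array and reads it back, standing in
    // for a trip across the network to another member.
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("plain lambda serializable:     " + canSerialize(PLAIN));
        System.out.println("shippable lambda serializable: " + canSerialize(SHIPPABLE));
        System.out.println("result after round trip:       " + roundTrip(SHIPPABLE).apply(41));
    }
}
```

The only difference between the two lambdas is the target type. A lambda becomes serializable when the interface it is assigned to extends Serializable.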
As with the functional interfaces, Jet also includes distributed
versions of the collectors found in java.util.stream.Collectors, which
can be reached via DistributedCollectors.
However, keep in mind that collectors such as toMap() and
toArray() create a local data
structure and load all the results into it. This works fine with the
regular JDK streams, where everything is local, but usually fails badly
in the context of a distributed computing job.
For example, the following innocent-looking code can easily cause out-of-memory errors, because the whole distributed map has to be held in memory in a single place:
    // get a distributed map with 5GB per member on a 10-member cluster
    IStreamMap<String, String> map = jet.getMap("large_map");
    // now try to build a HashMap of 50GB
    Map<String, String> result = map.stream()
                                    .map(e -> e.getKey() + e.getValue())
                                    .collect(toMap(v -> v, v -> v));
This is why Jet distinguishes between the standard java.util.stream
collectors and the Jet-specific Reducers. A Reducer puts the data
into a distributed data structure and knows how to leverage its native
partitioning scheme to optimize the access pattern across the cluster.
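The idea of reducing into partitioned storage can be sketched with plain JDK collections. This is only an illustration under stated assumptions, not Jet's implementation: partitionOf() and countByPartition() are hypothetical helpers, and a list of local maps stands in for the partitions of a distributed map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionedReduceSketch {

    // Hypothetical partitioner: picks the partition that owns a key.
    static int partitionOf(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Each partition keeps its own map. Equal keys always land in the
    // same partition, so no single map ever holds the whole result.
    static List<Map<String, Long>> countByPartition(List<String> words, int partitionCount) {
        List<Map<String, Long>> partitions = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new HashMap<>());
        }
        for (String word : words) {
            Map<String, Long> owner = partitions.get(partitionOf(word, partitionCount));
            owner.merge(word, 1L, Long::sum);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "b", "a");
        List<Map<String, Long>> partitions = countByPartition(words, 3);
        for (int i = 0; i < partitions.size(); i++) {
            System.out.println("partition " + i + " -> " + partitions.get(i));
        }
    }
}
```

Because the key alone decides the target partition, every partition can be filled and read independently of the others.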
These are some of the
Reducer implementations provided in Jet:
toIMap(): Writes the data to a new Hazelcast IMap.
groupingByToIMap(): Performs a grouping operation and then writes the results to a Hazelcast
IMap. This uses a more efficient implementation than the standard
groupingBy() collector and can make use of partitioning.
toIList(): Writes the data to a new Hazelcast IList.
A distributed data structure is cluster-managed, therefore you can't
just create one and forget about it; it will live on until you
explicitly destroy it. That makes it inappropriate as a part of
a data item inside a larger collection. A further consequence is that
a Reducer is inappropriate as a downstream collector; that's where
the JDK-standard collectors make sense.
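To illustrate the downstream case, here is a minimal JDK-only sketch in which each group is collected into an ordinary local List. The class and method names are hypothetical, and a local LinkedHashMap holding the latitudes sample data stands in for a distributed map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class DownstreamCollectorSketch {

    // Sample data from the latitudes example, held in a plain local map.
    static Map<String, Integer> sampleLatitudes() {
        Map<String, Integer> latitudes = new LinkedHashMap<>();
        latitudes.put("London", 51);
        latitudes.put("Paris", 48);
        latitudes.put("NYC", 40);
        latitudes.put("Sydney", -34);
        latitudes.put("Sao Paulo", -23);
        latitudes.put("Jakarta", -6);
        return latitudes;
    }

    // Groups city names by hemisphere. The downstream collectors
    // (mapping + toList) are JDK-standard: each group is a small local
    // value nested inside the result, not a cluster-managed structure.
    static Map<String, List<String>> byHemisphere(Map<String, Integer> latitudes) {
        return latitudes.entrySet().stream()
                .collect(Collectors.groupingBy(
                        e -> e.getValue() < 0 ? "south" : "north",
                        Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
    }

    public static void main(String[] args) {
        System.out.println(byHemisphere(sampleLatitudes()));
    }
}
```

Each per-group list is discarded together with the enclosing map, so nothing needs to be destroyed explicitly afterwards.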
The word count example that was described in the
Hazelcast Jet 101 chapter can be rewritten using the
java.util.stream API as follows:
    IMap<String, Long> counts = lines
            .stream()
            .flatMap(m -> Stream.of(PATTERN.split(m.getValue().toLowerCase())))
            .collect(DistributedCollectors.toIMap(w -> w, w -> 1L, (left, right) -> left + right));
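For comparison, the same pipeline shape can be run locally with the plain JDK. This is a sketch under assumptions: the definition of PATTERN is not shown in the example above, so a non-word delimiter is assumed here, and Collectors.toMap() builds a local map where toIMap() would write to a distributed one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class LocalWordCountSketch {

    // Assumed delimiter; the PATTERN used in the Jet example is not shown.
    private static final Pattern PATTERN = Pattern.compile("\\W+");

    // Splits each line into lowercase words and counts them. The three
    // toMap arguments play the same roles as in the example above:
    // key mapper, initial value, and merge function for repeated keys.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(PATTERN.split(line.toLowerCase())))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toMap(w -> w, w -> 1L, Long::sum));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("The quick brown fox", "the lazy dog and THE fox");
        System.out.println(countWords(lines));
    }
}
```

The local version holds the whole result in one HashMap, which is only acceptable while the input is small.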
Jet's java.util.stream implementation automatically converts a
stream into a
DAG when one of its terminal methods is called. DAG
creation is lazy: nothing is built until a terminal method is called.
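Regular JDK streams are lazy in the same way, which makes the behavior easy to observe locally. The sketch below uses only the JDK and a hypothetical helper class; it shows that no element is processed until the terminal operation runs, and it does not show how Jet builds its DAG.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

final class LazyStreamSketch {

    // Returns {elements processed before the terminal operation,
    //          elements processed after it, the reduced sum}.
    static int[] processedBeforeAndAfter() {
        AtomicInteger processed = new AtomicInteger();

        // Building the pipeline only records the stages.
        Stream<Integer> pipeline = Arrays.asList(1, 2, 3, 4).stream()
                .peek(x -> processed.incrementAndGet())
                .map(x -> x * 2);
        int before = processed.get();

        // The terminal operation triggers the actual work.
        int sum = pipeline.reduce(0, Integer::sum);
        int after = processed.get();

        return new int[] {before, after, sum};
    }

    public static void main(String[] args) {
        int[] result = processedBeforeAndAfter();
        System.out.println("processed before reduce: " + result[0]);
        System.out.println("processed after reduce:  " + result[1]);
        System.out.println("sum:                     " + result[2]);
    }
}
```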
For example, the following stream is compiled into a DAG when its terminal reduce() method is called:
    IStreamMap<String, Integer> ints = jet.getMap("ints");
    int result = ints.stream()
                     .map(Entry::getValue)
                     .reduce(0, (l, r) -> l + r);