This document is for an old version of the former Hazelcast IMDG product.

We've combined the in-memory storage of IMDG with the stream processing power of Jet to bring you an all new platform: Hazelcast 5.0

Use the following links to try it:

Hazelcast Jet adds distributed java.util.stream support for Hazelcast IMap, ICache and IList data structures.

For extensive information about java.util.stream API please refer to the official javadocs.

Simple Example

JetInstance jet = Jet.newJetInstance();
IStreamMap<String, Integer> map = jet.getMap("latitudes");
map.put("London", 51);
map.put("Paris", 48);
map.put("NYC", 40);
map.put("Sydney", -34);
map.put("Sao Paulo", -23);
map.put("Jakarta", -6);

map.stream().filter(e -> e.getValue() < 0).forEach(System.out::println);

In addition to Hazelcast data structures any source processor can be used to create a distributed stream.

JetInstance jet = Jet.newJetInstance();
ProcessorSupplier processorSupplier = SourceProcessors.readFilesP("path", UTF_8, "*");

IList<String> sink = DistributedStream
        .<String>fromSource(jet, ProcessorMetaSupplier.of(processorSupplier))
        .flatMap(line -> Arrays.stream(line.split(" ")))
        .collect(DistributedCollectors.toIList("sink"));


sink.forEach(System.out::println);

Java specifies that a stream computation starts upon invoking the terminal operation on it (such as forEach()). At that point Jet converts the expression into a Core API DAG and submits it for execution.

Distributed Collectors

Like with the functional interfaces, Jet also includes distributed versions of the classes found in java.util.stream.Collectors. These can be reached from the DistributedCollectors utility class. However, keep in mind that the collectors such as toMap(), toCollection(), toList(), and toArray() create a local data structure and load all the results into it. This works fine with the regular JDK streams, where everything is local, but usually fails badly in the context of a distributed computing job.

For example the following innocent-looking code can easily cause out-of-memory errors because the whole distributed map will need to be held in the memory at a single place:

// get a distributed map with 5GB per member on a 10-member cluster
IStreamMap<String, String> map = jet.getMap("large_map");
// now try to build a HashMap of 50GB
Map<String, String> result = map.stream()
                                .map(e -> e.getKey() + e.getValue())
                                .collect(toMap(v -> v, v -> v));

This is why Jet distinguishes between the standard java.util.stream collectors and the Jet-specific Reducers. A Reducer puts the data into a distributed data structure and knows how to leverage its native partitioning scheme to optimize the access pattern across the cluster.

These are some of the Reducer implementations provided in Jet:

  • toIMap(): Writes the data to a new Hazelcast IMap.
  • groupingByToIMap(): Performs a grouping operation and then writes the results to a Hazelcast IMap. This uses a more efficient implementation than the standard groupingBy() collector and can make use of partitioning.
  • toIList(): Writes the data to a new Hazelcast IList.

A distributed data structure is cluster-managed, therefore you can't just create one and forget about it; it will live on until you explicitly destroy it. That means it's inappropriate to use as a part of a data item inside a larger collection, a further consequence being that a Reducer is inappropriate as a downstream collector; that's where the JDK-standard collectors make sense.

Word Count

The word count example that was described in the Get Started section can be rewritten using the java.util.stream API as follows:

IMap<String, Long> counts = lines
                .stream()
                .flatMap(word -> Stream.of(word.split("\\W+")))
                .collect(DistributedCollectors.toIMap(w -> w, w -> 1L, (left, right) -> left + right));