Hazelcast Jet adds distributed java.util.stream support for the Hazelcast IMap, ICache and IList data structures. For extensive information about the java.util.stream API, please refer to the official Javadoc.
Simple Example
JetInstance jet = Jet.newJetInstance();
IStreamMap<String, Integer> map = jet.getMap("latitudes");
map.put("London", 51);
map.put("Paris", 48);
map.put("NYC", 40);
map.put("Sydney", -34);
map.put("Sao Paulo", -23);
map.put("Jakarta", -6);
map.stream().filter(e -> e.getValue() < 0).forEach(System.out::println);
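For comparison, the same pipeline shape runs unchanged on a plain java.util.Map — the filter-on-entries idiom is standard java.util.stream. A minimal JDK-only sketch (no Jet cluster involved; class and variable names are illustrative):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FilterLocalSketch {
    // Same shape as the Jet pipeline: keep entries with a negative latitude
    static List<String> southern(Map<String, Integer> latitudes) {
        return latitudes.entrySet().stream()
                .filter(e -> e.getValue() < 0)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> latitudes = new HashMap<>();
        latitudes.put("London", 51);
        latitudes.put("Sydney", -34);
        latitudes.put("Jakarta", -6);
        System.out.println(southern(latitudes)); // [Jakarta, Sydney]
    }
}
```

The difference with Jet is only where the pipeline executes: the same expression is distributed across the cluster instead of evaluated locally.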
In addition to the Hazelcast data structures, any source processor can be used to create a distributed stream.
JetInstance jet = Jet.newJetInstance();
ProcessorSupplier processorSupplier = SourceProcessors.readFilesP("path", UTF_8, "*");
IList<String> sink = DistributedStream
.<String>fromSource(jet, ProcessorMetaSupplier.of(processorSupplier))
.flatMap(line -> Arrays.stream(line.split(" ")))
.collect(DistributedCollectors.toIList("sink"));
sink.forEach(System.out::println);
Java specifies that a stream computation starts upon invoking the terminal operation on it (such as forEach()). At that point Jet converts the expression into a Core API DAG and submits it for execution.
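This lazy-evaluation contract comes from java.util.stream itself: intermediate operations do nothing until a terminal operation runs, which is what gives Jet the chance to see the whole pipeline and build a DAG from it. A JDK-only demonstration (no Jet involved):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazySketch {
    static int filterCallsAfterTerminal() {
        AtomicInteger calls = new AtomicInteger();
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .filter(n -> {
                    calls.incrementAndGet();
                    return n > 1;
                });
        // No terminal operation yet: the predicate has not run a single time
        int before = calls.get();      // 0
        pipeline.forEach(n -> { });    // terminal op triggers execution
        // Now the predicate ran once per element
        return calls.get() - before;   // 3
    }

    public static void main(String[] args) {
        System.out.println(filterCallsAfterTerminal());
    }
}
```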
Distributed Collectors
Like with the functional interfaces, Jet also includes distributed versions of the classes found in java.util.stream.Collectors. These can be reached from the DistributedCollectors utility class. However, keep in mind that collectors such as toMap(), toCollection(), toList(), and toArray() create a local data structure and load all the results into it. This works fine with regular JDK streams, where everything is local, but usually fails badly in the context of a distributed computing job.
For example, the following innocent-looking code can easily cause out-of-memory errors because the whole distributed map will need to be held in memory in a single place:
// get a distributed map with 5GB per member on a 10-member cluster
IStreamMap<String, String> map = jet.getMap("large_map");
// now try to build a HashMap of 50GB
Map<String, String> result = map.stream()
.map(e -> e.getKey() + e.getValue())
.collect(toMap(v -> v, v -> v));
This is why Jet distinguishes between the standard java.util.stream collectors and the Jet-specific Reducers. A Reducer puts the data into a distributed data structure and knows how to leverage its native partitioning scheme to optimize the access pattern across the cluster. These are some of the Reducer implementations provided in Jet:
- toIMap(): Writes the data to a new Hazelcast IMap.
- groupingByToIMap(): Performs a grouping operation and then writes the results to a Hazelcast IMap. This uses a more efficient implementation than the standard groupingBy() collector and can make use of partitioning.
- toIList(): Writes the data to a new Hazelcast IList.
A distributed data structure is cluster-managed, therefore you can't just create one and forget about it; it will live on until you explicitly destroy it. That means it's inappropriate to use as part of a data item inside a larger collection. A further consequence is that a Reducer is inappropriate as a downstream collector; that's where the JDK-standard collectors make sense.
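The downstream-collector role mentioned above is the standard java.util.stream one: the outer collector does the grouping while a plain JDK collector aggregates within each group. A JDK-only sketch (in Jet, the analogous distributed grouping would use groupingByToIMap() with a JDK collector in the same downstream position):

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DownstreamSketch {
    static Map<Integer, Long> lengthHistogram() {
        // groupingBy does the grouping; counting() is a plain JDK
        // collector used downstream of it, aggregating per group
        return Stream.of("to", "be", "or", "not", "to", "be")
                .collect(Collectors.groupingBy(String::length, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(lengthHistogram());
    }
}
```

Because the downstream collector only ever sees the elements of one group, it produces a small local value per key rather than materializing the whole data set — which is exactly why JDK collectors are fine in that position.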
Word Count
The word count example that was described in the Get Started section can be rewritten using the java.util.stream API as follows:
IMap<String, Long> counts = lines
.stream()
    .flatMap(line -> Stream.of(line.split("\\W+")))
.collect(DistributedCollectors.toIMap(w -> w, w -> 1L, (left, right) -> left + right));
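As a local analogy (assuming lines is the list of text lines from the Get Started example), the same merge-function pattern works with the JDK's own toMap collector — the third argument combines the counts of repeated words:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WordCountLocalSketch {
    static Map<String, Long> wordCount(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Stream.of(line.split("\\W+")))
                .filter(w -> !w.isEmpty())
                // (left, right) -> left + right merges counts of repeated words
                .collect(Collectors.toMap(w -> w, w -> 1L, (left, right) -> left + right));
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("to be or not to be")));
    }
}
```

Jet's DistributedCollectors.toIMap takes the same key-mapper, value-mapper and merge-function triple, but accumulates into a partitioned IMap across the cluster instead of a single local HashMap.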