This section explains the basics of the Hazelcast MapReduce framework. While walking through the different API classes, we will build the word count example discussed earlier, step by step.
The Hazelcast API for MapReduce operations consists of a fluent, DSL-like configuration syntax to build and submit jobs. JobTracker is the basic entry point to all MapReduce operations and is retrieved from com.hazelcast.core.HazelcastInstance by calling getJobTracker and supplying the name of the required JobTracker configuration. The configuration for JobTrackers will be discussed later; for now we focus on the API itself.
In addition, the complete submission part of the API is built to support a fully reactive way of programming.
To give an easy introduction to people used to Hadoop, we made the class names as familiar as possible to their counterparts in Hadoop. That means that while most users will recognize a lot of similar-sounding classes, the way to configure the jobs is more fluent due to the DSL-like API.
While building the example, we will go through as many options as possible, e.g., we will create a specialized JobTracker configuration (at the end). A special JobTracker configuration is not required: as with all other Hazelcast features, you can use "default" as the configuration name. However, a special configuration makes the execution behavior of the framework easier to predict.
The full example is available here as a ready-to-run Maven project.
Retrieving a JobTracker Instance
JobTracker creates Job instances; every instance of com.hazelcast.mapreduce.Job defines a single MapReduce configuration. The same Job can be submitted multiple times, regardless of whether it is executed in parallel or after the previous execution has finished.
NOTE: After retrieving the JobTracker, be aware that it should only be used with data structures derived from the same HazelcastInstance. Otherwise, you can get unexpected behavior.
To retrieve a JobTracker from Hazelcast, we will start by using the "default" configuration for convenience, to show the basic way.
import com.hazelcast.mapreduce.*;
JobTracker jobTracker = hazelcastInstance.getJobTracker( "default" );
JobTracker is retrieved using the same kind of entry point as most other Hazelcast features. After building the cluster connection, you use the created HazelcastInstance to request the configured (or default) JobTracker from Hazelcast.
The next step will be to create a new Job and configure it to execute our first MapReduce request against cluster data.
Creating a Job
As mentioned in the Retrieving a JobTracker Instance section above, you create a Job using the retrieved JobTracker instance. A Job defines exactly one configuration of a MapReduce task: the mapper, combiner, and reducers are defined per job. However, since the Job instance is only a configuration, it can be submitted multiple times, regardless of whether executions happen in parallel or one after the other.
A submitted job is always identified using a unique combination of the JobTracker's name and a jobId generated at submit time. The way to retrieve the jobId is shown in one of the later sections.
To create a Job, a second class, com.hazelcast.mapreduce.KeyValueSource, is necessary. We will have a deeper look at the KeyValueSource class in the next section. KeyValueSource is used to wrap any kind of data or data structure into a well-defined set of key-value pairs.
The example code below is a direct follow-up to the example in the Retrieving a JobTracker Instance section, and it reuses the already created HazelcastInstance and JobTracker instances.
The example starts by retrieving an instance of our data map, and then it creates the Job instance. Implementations used to configure the Job will be discussed while walking further through the API documentation.
NOTE: Since the Job class is highly dependent upon generics to support type safety, the generics change over time and may not be assignment compatible with old variable types. To make use of the full potential of the fluent API, we recommend you use fluent method chaining as shown in this example to prevent the need for too many variables.
IMap<String, String> map = hazelcastInstance.getMap( "articles" );
KeyValueSource<String, String> source = KeyValueSource.fromMap( map );
Job<String, String> job = jobTracker.newJob( source );
ICompletableFuture<Map<String, Long>> future = job
    .mapper( new TokenizerMapper() )
    .combiner( new WordCountCombinerFactory() )
    .reducer( new WordCountReducerFactory() )
    .submit();
// Attach a callback listener
future.andThen( buildCallback() );
// Wait and retrieve the result
Map<String, Long> result = future.get();
As seen above, we create the Job instance and define a mapper, combiner, and reducer. Then we submit the request to the cluster. The submit method returns an ICompletableFuture that can be used to attach our callbacks or to wait for the result to be processed in a blocking fashion.
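The buildCallback() helper used above is not shown in the snippet. A minimal sketch of such a helper, assuming it returns a com.hazelcast.core.ExecutionCallback (the interface accepted by ICompletableFuture.andThen), might look like the following; the logging statements are placeholders.
// Hypothetical helper: builds the callback passed to future.andThen( ... )
private ExecutionCallback<Map<String, Long>> buildCallback() {
  return new ExecutionCallback<Map<String, Long>>() {
    @Override
    public void onResponse( Map<String, Long> response ) {
      // called when the MapReduce job has completed successfully
      System.out.println( "Word counts: " + response );
    }

    @Override
    public void onFailure( Throwable t ) {
      // called when the job failed or was cancelled
      t.printStackTrace();
    }
  };
}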
There are more options available for job configurations, such as defining a general chunk size or on what keys the operation will operate. For more information, please refer to the Hazelcast source code for Job.java.
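As a rough illustration of these options, a job restricted to certain keys and using a custom chunk size might be configured as follows. This is only a sketch: the chunkSize and onKeys methods are assumed from the Job fluent API, and the key names are hypothetical; please verify against Job.java of your Hazelcast version.
// Sketch: additional job options (verify chunkSize/onKeys against your version)
ICompletableFuture<Map<String, Long>> future = jobTracker.newJob( source )
    .chunkSize( 500 )                      // send chunks of 500 emitted values
    .onKeys( "article-1", "article-2" )    // only map these (hypothetical) keys
    .mapper( new TokenizerMapper() )
    .combiner( new WordCountCombinerFactory() )
    .reducer( new WordCountReducerFactory() )
    .submit();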
Creating Key-Value Input Sources with KeyValueSource
KeyValueSource can either wrap Hazelcast data structures (like IMap, MultiMap, IList, ISet) into key-value pair input sources, or serve as the base class for your own custom key-value input source. The latter option makes it possible to feed Hazelcast MapReduce with all kinds of data, such as just-in-time downloaded web page contents or data files. People familiar with Hadoop will recognize similarities with the Input class.
You can imagine a KeyValueSource as a bigger java.util.Iterator implementation. Whereas most methods must be implemented, implementing the getAllKeys method is optional. If the implementation is able to gather all keys upfront, getAllKeys should be implemented and isAllKeysSupported must return true. That way, KeyPredicates configured on the Job are able to evaluate keys upfront, before sending them to the cluster. Otherwise the keys are serialized and transferred as well, to be evaluated at execution time.
As shown in the example above, the abstract KeyValueSource class provides a number of static methods to easily wrap Hazelcast data structures into KeyValueSource implementations already provided by Hazelcast. The data structures' generics are inherited by the resulting KeyValueSource instance. For data structures like IList or ISet, the key type is always String: while mapping, the key is the data structure's name, whereas the value type and the values themselves are inherited from the IList or ISet itself.
// KeyValueSource from com.hazelcast.core.IMap
IMap<String, String> map = hazelcastInstance.getMap( "my-map" );
KeyValueSource<String, String> source = KeyValueSource.fromMap( map );
// KeyValueSource from com.hazelcast.core.MultiMap
MultiMap<String, String> multiMap = hazelcastInstance.getMultiMap( "my-multimap" );
KeyValueSource<String, String> source = KeyValueSource.fromMultiMap( multiMap );
// KeyValueSource from com.hazelcast.core.IList
IList<String> list = hazelcastInstance.getList( "my-list" );
KeyValueSource<String, String> source = KeyValueSource.fromList( list );
// KeyValueSource from com.hazelcast.core.ISet
ISet<String> set = hazelcastInstance.getSet( "my-set" );
KeyValueSource<String, String> source = KeyValueSource.fromSet( set );
PartitionIdAware
The com.hazelcast.mapreduce.PartitionIdAware interface can be implemented by the KeyValueSource implementation if the underlying data set is aware of the Hazelcast partitioning schema (as it is for all internal data structures). If this interface is implemented, the same KeyValueSource instance is reused multiple times for all partitions on the cluster member. As a consequence, the close and open methods are also executed multiple times, but once per partitionId.
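As a minimal sketch of what partition awareness looks like for a custom source: the class below only shows the partition-aware part and omits the KeyValueSource methods (it is therefore declared abstract); the class name and field are purely hypothetical.
// Sketch: a custom, partition-aware source; KeyValueSource methods omitted
public abstract class MyPartitionAwareSource extends KeyValueSource<String, String>
    implements PartitionIdAware {

  private int partitionId;

  @Override
  public void setPartitionId( int partitionId ) {
    // called by the framework for each partition before the source is opened
    this.partitionId = partitionId;
  }
}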
Implementing Mapping Logic with Mapper
Using the Mapper interface, you implement the mapping logic. Mappers can transform, split, calculate, and aggregate data from data sources. In Hazelcast you can also integrate data from more than the given KeyValueSource data source by implementing com.hazelcast.core.HazelcastInstanceAware and requesting additional maps, multimaps, lists, and/or sets.
The mapper's map function is called once per available entry in the data structure. If you work on distributed data structures that operate in a partition-based fashion, multiple mappers work in parallel on the different cluster members, on the members' assigned partitions. Mappers then prepare and maybe transform the input key-value pair and emit zero or more key-value pairs for the reducing phase.
For our word count example, we retrieve an input document (a text document) and transform it by splitting the text into the available words. After that, as discussed in the pseudo code, we emit every single word as a key-value pair with the word as the key and 1 as the value.
A common implementation of that Mapper might look like the following example:
public class TokenizerMapper implements Mapper<String, String, String, Long> {
  private static final Long ONE = Long.valueOf( 1L );

  @Override
  public void map( String key, String document, Context<String, Long> context ) {
    StringTokenizer tokenizer = new StringTokenizer( document.toLowerCase() );
    while ( tokenizer.hasMoreTokens() ) {
      context.emit( tokenizer.nextToken(), ONE );
    }
  }
}
This code splits the mapped texts into their tokens, iterates over the tokenizer as long as there are more tokens, and emits a pair per word. Note that we are not yet collecting multiple occurrences of the same word; we just emit every word on its own.
LifecycleMapper / LifecycleMapperAdapter
The LifecycleMapper interface or its adapter class LifecycleMapperAdapter can be used to make the Mapper implementation lifecycle aware. That means it will be notified when mapping of a partition or set of data begins and when the last entry has been mapped.
Only special algorithms might need those additional lifecycle events to prepare, clean up, or emit additional values.
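A hedged sketch of a lifecycle-aware variant of the word count mapper, extending LifecycleMapperAdapter and overriding its initialize and finalized callbacks, could look like the following; the class name is hypothetical.
// Sketch: initialize()/finalized() bracket the mapping of a partition or data set
public class LifecycleTokenizerMapper
    extends LifecycleMapperAdapter<String, String, String, Long> {

  @Override
  public void initialize( Context<String, Long> context ) {
    // prepare resources before the first entry is mapped
  }

  @Override
  public void map( String key, String document, Context<String, Long> context ) {
    StringTokenizer tokenizer = new StringTokenizer( document.toLowerCase() );
    while ( tokenizer.hasMoreTokens() ) {
      context.emit( tokenizer.nextToken(), Long.valueOf( 1L ) );
    }
  }

  @Override
  public void finalized( Context<String, Long> context ) {
    // emit additional values or clean up after the last entry has been mapped
  }
}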
Minimizing Cluster Traffic with Combiner
As stated in the introduction, a Combiner is used to minimize traffic between the different cluster members when transmitting mapped values from mappers to the reducers. It does this by aggregating multiple values for the same emitted key. This is a fully optional operation, but using it is highly recommended.
Combiners can be seen as intermediate reducers. The calculated value is always assigned back to the key for which the combiner was initially created. Since combiners are created per emitted key, the Combiner implementation itself is not defined in the job configuration; instead, a CombinerFactory that is able to create the expected Combiner instance is configured.
Because Hazelcast MapReduce executes the mapping and reducing phases in parallel, the Combiner implementation must be able to deal with chunked data. Therefore, you must reset its internal state whenever finalizeChunk is called. Calling the finalizeChunk method creates a chunk of intermediate data to be grouped (shuffled) and sent to the reducers.
Combiners can override beginCombine and finalizeCombine to perform preparation or cleanup work.
For our word count example, we are going to have a simple CombinerFactory and Combiner implementation similar to the following example.
public class WordCountCombinerFactory
    implements CombinerFactory<String, Long, Long> {

  @Override
  public Combiner<Long, Long> newCombiner( String key ) {
    return new WordCountCombiner();
  }

  private class WordCountCombiner extends Combiner<Long, Long> {
    private long sum = 0;

    @Override
    public void combine( Long value ) {
      // the mapper always emits the value 1, so counting the calls is sufficient
      sum++;
    }

    @Override
    public Long finalizeChunk() {
      return sum;
    }

    @Override
    public void reset() {
      sum = 0;
    }
  }
}
The Combiner must be able to return its current value as a chunk and to reset the internal state by setting sum back to 0. Since combiners are always called from a single thread, no synchronization or volatility of the variables is necessary.
Doing Algorithm Work with Reducer
Reducers do the last bit of algorithm work. This can be aggregating values, calculating averages, or any other work that is expected from the algorithm.
Since values arrive in chunks, the reduce method is called multiple times for every emitted value of the creation key. This can also happen multiple times per chunk if no Combiner implementation was configured for a job configuration.
Unlike combiners, a reducer's finalizeReduce method is only called once per reducer (which means once per key). Therefore, a reducer does not need to reset its internal state at any time.
Reducers can override beginReduce to perform preparation work.
For our word count example, the implementation will look similar to the following code example.
public class WordCountReducerFactory implements ReducerFactory<String, Long, Long> {

  @Override
  public Reducer<Long, Long> newReducer( String key ) {
    return new WordCountReducer();
  }

  private class WordCountReducer extends Reducer<Long, Long> {
    // volatile is no longer required as of Hazelcast 3.3.3; see the note below
    private volatile long sum = 0;

    @Override
    public void reduce( Long value ) {
      sum += value.longValue();
    }

    @Override
    public Long finalizeReduce() {
      return sum;
    }
  }
}
Reducers Switching Threads
Unlike combiners, reducers tend to switch threads if they run out of data, to avoid blocking threads from the JobTracker configuration. They are rescheduled at a later point when new data to be processed arrives, but are unlikely to be executed on the same thread as before. As of Hazelcast version 3.3.3, the framework guarantees memory visibility on the new thread, so the previous requirement for making fields volatile is dropped.
Modifying the Result with Collator
A Collator is an optional operation that is executed on the job-emitting member and can modify the finally reduced result before it is returned to the user's codebase. Only special use cases are likely to use collators.
For an imaginary use case, we might want to know how many words there are overall in the documents we analyzed. For this case, a Collator implementation can be given to the submit method of the Job instance.
A collator would look like the following snippet:
public class WordCountCollator implements Collator<Map.Entry<String, Long>, Long> {

  @Override
  public Long collate( Iterable<Map.Entry<String, Long>> values ) {
    long sum = 0;
    for ( Map.Entry<String, Long> entry : values ) {
      sum += entry.getValue().longValue();
    }
    return sum;
  }
}
The definition of the input type is a bit strange, but because Combiner and Reducer implementations are optional, the input type heavily depends on the state of the data. As stated above, collators are a non-typical use case, and the generics of the framework always help in finding the correct signature.
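To tie this back to the word count job, the collator is passed to the submit method, and the returned future then yields the collated value. The following is a sketch based on the job shown earlier; the exact generic result type should be verified against your Hazelcast version.
// Sketch: submitting with a collator; the future now yields the collated Long
ICompletableFuture<Long> future = jobTracker.newJob( source )
    .mapper( new TokenizerMapper() )
    .combiner( new WordCountCombinerFactory() )
    .reducer( new WordCountReducerFactory() )
    .submit( new WordCountCollator() );

Long totalWordCount = future.get();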
Preselecting Keys with KeyPredicate
You can use KeyPredicate to pre-select whether or not a key should be selected for mapping in the mapping phase. If the KeyValueSource implementation is able to know all keys prior to execution, the keys are filtered before the operations are divided among the different cluster members.
A KeyPredicate can also be used to select only a specific range of data, e.g., a time frame, or in similar use cases.
A basic KeyPredicate implementation that only maps keys containing the word "hazelcast" might look like the following code example:
public class WordCountKeyPredicate implements KeyPredicate<String> {

  @Override
  public boolean evaluate( String s ) {
    return s != null && s.toLowerCase().contains( "hazelcast" );
  }
}
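A predicate like this is then attached to the job before the mapper. The keyPredicate method used below is assumed from the Job fluent API and should be verified against Job.java of your Hazelcast version.
// Sketch: pre-filter the keys before the mapping phase starts
ICompletableFuture<Map<String, Long>> future = jobTracker.newJob( source )
    .keyPredicate( new WordCountKeyPredicate() )
    .mapper( new TokenizerMapper() )
    .combiner( new WordCountCombinerFactory() )
    .reducer( new WordCountReducerFactory() )
    .submit();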
Job Monitoring with TrackableJob
You can retrieve a TrackableJob instance after submitting a job. It is requested from the JobTracker using the unique jobId (unique per JobTracker). You can use it to get runtime statistics of the job. The information available is limited to the number of processed (mapped) records and the processing state of the different partitions or members (if the KeyValueSource is not PartitionIdAware).
To retrieve the jobId after submission of the job, use com.hazelcast.mapreduce.JobCompletableFuture instead of com.hazelcast.core.ICompletableFuture as the variable type for the returned future.
The example code below gives a quick introduction on how to retrieve the instance and the runtime data. For more information, please have a look at the Javadoc corresponding to your running Hazelcast version.
The example performs the following steps to get the job instance.
- It gets the map with the hazelcastInstance getMap method.
- From the map, it gets the source with the KeyValueSource fromMap method.
- From the source, it gets a job with the JobTracker newJob method.
IMap<String, String> map = hazelcastInstance.getMap( "articles" );
KeyValueSource<String, String> source = KeyValueSource.fromMap( map );
Job<String, String> job = jobTracker.newJob( source );
JobCompletableFuture<Map<String, Long>> future = job
    .mapper( new TokenizerMapper() )
    .combiner( new WordCountCombinerFactory() )
    .reducer( new WordCountReducerFactory() )
    .submit();
String jobId = future.getJobId();
TrackableJob trackableJob = jobTracker.getTrackableJob(jobId);
JobProcessInformation stats = trackableJob.getJobProcessInformation();
int processedRecords = stats.getProcessedRecords();
log( "ProcessedRecords: " + processedRecords );
JobPartitionState[] partitionStates = stats.getPartitionStates();
for ( JobPartitionState partitionState : partitionStates ) {
  log( "PartitionOwner: " + partitionState.getOwner()
      + ", Processing state: " + partitionState.getState().name() );
}
NOTE: Caching of the JobProcessInformation does not work on Java native clients, since the current values are retrieved each time the instance is requested, to minimize traffic between the executing member and the client.
Configuring JobTracker
You use the JobTracker configuration to set up the behavior of the Hazelcast MapReduce framework.
Every JobTracker is capable of running multiple MapReduce jobs at once; one configuration is meant as a shared resource for all jobs created by the same JobTracker. The configuration gives full control over the expected load behavior and the thread counts to be used.
The following snippet shows a typical JobTracker configuration. The configuration properties are discussed below the example.
<jobtracker name="default">
<max-thread-size>0</max-thread-size>
<!-- Queue size 0 means number of partitions * 2 -->
<queue-size>0</queue-size>
<retry-count>0</retry-count>
<chunk-size>1000</chunk-size>
<communicate-stats>true</communicate-stats>
<topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
</jobtracker>
- max-thread-size: Maximum thread pool size of the JobTracker.
- queue-size: Maximum number of tasks that are able to wait to be processed. A value of 0 means an unbounded queue. Very low numbers can prevent successful execution since the job might not be correctly scheduled or intermediate chunks might be lost.
- retry-count: Currently not used. Reserved for later use where the framework will automatically try to restart/retry operations from an available save point.
- chunk-size: Number of emitted values before a chunk is sent to the reducers. If your emitted values are big or you want to better balance your work, you might want to change this to a lower or higher value. A value of 0 means immediate transmission, but remember that low values mean higher traffic costs. A very high value might cause an OutOfMemoryError to occur if the emitted values do not fit into heap memory before being sent to the reducers. To prevent this, you might want to use a combiner to pre-reduce values on mapping members.
- communicate-stats: Specifies whether the statistics (for example, statistics about processed entries) are transmitted to the job emitter. This can show progress to a user inside of an UI system, but it produces additional traffic. If not needed, you might want to deactivate this.
- topology-changed-strategy: Specifies how the MapReduce framework reacts to topology changes while executing a job. Currently, only CANCEL_RUNNING_OPERATION is fully supported; it throws an exception to the job emitter (a com.hazelcast.mapreduce.TopologyChangedException).
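The same settings can also be applied programmatically. The following is a minimal sketch assuming that the JobTrackerConfig setters mirror the XML properties above; please verify the setter names against the Javadoc of your Hazelcast version.
// Sketch: programmatic equivalent of the XML snippet above (setter names assumed)
Config config = new Config();
JobTrackerConfig jobTrackerConfig = new JobTrackerConfig();
jobTrackerConfig.setName( "default" );
jobTrackerConfig.setMaxThreadSize( 0 );
jobTrackerConfig.setQueueSize( 0 );          // 0 means number of partitions * 2
jobTrackerConfig.setRetryCount( 0 );
jobTrackerConfig.setChunkSize( 1000 );
jobTrackerConfig.setCommunicateStats( true );
jobTrackerConfig.setTopologyChangedStrategy(
    TopologyChangedStrategy.CANCEL_RUNNING_OPERATION );
config.addJobTrackerConfig( jobTrackerConfig );
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance( config );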