This section explains some of the internals of the MapReduce framework. This is more advanced information. If you're not interested in how it works internally, you might want to skip this section.
Member Interoperation Example
To make the following technical internals easier to follow, we first walk through an example workflow.
As a simple example, think of an IMap<String, Integer> whose emitted keys and values have the same types. Imagine you have a cluster with three members, and you initiate the MapReduce job on the first member. After you request the JobTracker from your running or connected Hazelcast instance, you submit the task and retrieve the ICompletableFuture, which lets you either wait for the result to be calculated or add a callback (the more reactive approach).
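The two ways of consuming the result can be illustrated with a minimal sketch. It does not use the Hazelcast API: java.util.concurrent.CompletableFuture stands in for ICompletableFuture, and the class FutureUsageSketch and its submit method are hypothetical names made up for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustrative stand-in only: CompletableFuture plays the role of ICompletableFuture.
class FutureUsageSketch {

    // Hypothetical "job" that sums the values of each key on a background thread.
    static CompletableFuture<Map<String, Integer>> submit(Map<String, int[]> input) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Integer> result = new HashMap<>();
            for (Map.Entry<String, int[]> entry : input.entrySet()) {
                int sum = 0;
                for (int value : entry.getValue()) {
                    sum += value;
                }
                result.put(entry.getKey(), sum);
            }
            return result;
        });
    }

    public static void main(String[] args) {
        Map<String, int[]> input = new HashMap<>();
        input.put("Foo", new int[] {1, 14, 27});

        // Option 1: block the caller until the result is available.
        Map<String, Integer> result = submit(input).join();
        System.out.println("blocking: " + result);

        // Option 2: register a callback; join() here only keeps the demo
        // alive until the callback has run.
        submit(input)
                .thenAccept(r -> System.out.println("callback: " + r))
                .join();
    }
}
```

Blocking is simpler when the caller cannot proceed without the result. The callback style keeps the calling thread free while the job runs.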
The example expects that the chunk size is 0 or 1, so an emitted value is directly sent to the reducers. Internally, the job is prepared, started, and executed on all members as shown below. The first member acts as the job owner (job emitter).
Member1 starts MapReduce job
Member1 emits key=Foo, value=1
Member1 does PartitionService::getKeyOwner(Foo) => results in Member3
Member2 emits key=Foo, value=14
Member2 asks jobOwner (Member1) for keyOwner of Foo => results in Member3
Member1 sends chunk for key=Foo to Member3
Member3 receives chunk for key=Foo and looks if there is already a Reducer, if not creates one for key=Foo
Member3 processes chunk for key=Foo
Member2 sends chunk for key=Foo to Member3
Member3 receives chunk for key=Foo and looks if there is already a Reducer and uses the previous one
Member3 processes chunk for key=Foo
Member1 sends LastChunk information to Member3 because processing local values finished
Member2 emits key=Foo, value=27
Member2 has cached keyOwner of Foo => results in Member3
Member2 sends chunk for key=Foo to Member3
Member3 receives chunk for key=Foo and looks if there is already a Reducer and uses the previous one
Member3 processes chunk for key=Foo
Member2 sends LastChunk information to Member3 because processing local values finished
Member3 finishes reducing for key=Foo
Member1 registers its local partitions are processed
Member2 registers its local partitions are processed
Member1 sees all partitions processed and requests reducing from all members
Member1 merges all reduced results together in a final structure and returns it
The flow is quite complex but extremely powerful since everything is executed in parallel. Reducers do not wait until all values are emitted, but they immediately begin to reduce (when the first chunk for an emitted key arrives).
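The reducer side of the trace above can be sketched in a few lines. This is a single-threaded simulation with no networking, and it assumes a summing reducer, which the trace itself does not specify. All class and method names (MemberFlowSketch, ReducingMember, SumReducer) are hypothetical and not part of Hazelcast.

```java
import java.util.HashMap;
import java.util.Map;

class MemberFlowSketch {

    // Hypothetical reducer that sums all values it receives for one key.
    static final class SumReducer {
        private int sum;

        void reduce(int value) {
            sum += value;
        }

        int finalizeReduce() {
            return sum;
        }
    }

    // Stand-in for the member that owns the emitted key (Member3 in the trace).
    static final class ReducingMember {
        private final Map<String, SumReducer> reducers = new HashMap<>();

        // Creates the reducer on the first chunk for a key and reuses it afterwards.
        void receiveChunk(String key, int value) {
            reducers.computeIfAbsent(key, k -> new SumReducer()).reduce(value);
        }

        Map<String, Integer> finishReducing() {
            Map<String, Integer> results = new HashMap<>();
            reducers.forEach((key, reducer) -> results.put(key, reducer.finalizeReduce()));
            return results;
        }
    }

    static Map<String, Integer> run() {
        ReducingMember member3 = new ReducingMember();
        member3.receiveChunk("Foo", 1);   // chunk from Member1
        member3.receiveChunk("Foo", 14);  // chunk from Member2
        member3.receiveChunk("Foo", 27);  // chunk from Member2
        return member3.finishReducing();  // the job owner would merge this into the final result
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints {Foo=42}
    }
}
```

Because each chunk is reduced as soon as it arrives, the reducer never has to hold all emitted values for a key at once.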
Internal MapReduce Packages
Beginning with the package level, there is one basic package: com.hazelcast.mapreduce. This includes the external API and the impl package, which itself contains the internal implementation.
- The impl package contains all the default KeyValueSource implementations and abstract base and support classes for the exposed API.
- The client package contains all classes that are needed on the client and member sides when a client offers a MapReduce job.
- The notification package contains all "notification" or event classes that notify other members about progress on operations.
- The operation package contains all operations that are used by the workers or job owner to coordinate work and sync partition or reducer processing.
- The task package contains all classes that execute the actual MapReduce operation. It features the supervisor, mapping phase implementation, and mapping and reducing tasks.
MapReduce Job Walk-Through
Now to the technical walk-through: A MapReduce Job is always retrieved from a named JobTracker, which is implemented in NodeJobTracker (extends AbstractJobTracker) and is configured using the configuration DSL. All of the internal implementation is completely ICompletableFuture-driven and mostly non-blocking in design.
On submit, the Job generates a UUID that then serves as the jobId and is combined with the JobTracker's name so that the job is uniquely identifiable inside the cluster. Then, the preparation request is sent around the cluster and every member prepares its execution by creating a JobSupervisor, MapCombineTask, and ReducerTask. The job-emitting JobSupervisor gains special capabilities to synchronize and control JobSupervisors on other members for the same job.
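The idea of a cluster-wide job identity built from the tracker name and a generated UUID can be sketched as a small value class. JobKeySketch is a hypothetical name chosen for this illustration; how Hazelcast actually stores the pair internally is not shown here.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical composite identifier; not a class from the Hazelcast code base.
final class JobKeySketch {
    final String trackerName;
    final String jobId;

    JobKeySketch(String trackerName, String jobId) {
        this.trackerName = Objects.requireNonNull(trackerName);
        this.jobId = Objects.requireNonNull(jobId);
    }

    // A fresh random UUID per submission keeps jobs on the same tracker apart.
    static JobKeySketch newJob(String trackerName) {
        return new JobKeySketch(trackerName, UUID.randomUUID().toString());
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof JobKeySketch)) {
            return false;
        }
        JobKeySketch that = (JobKeySketch) other;
        return trackerName.equals(that.trackerName) && jobId.equals(that.jobId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(trackerName, jobId);
    }

    @Override
    public String toString() {
        return trackerName + "/" + jobId;
    }
}
```

With equals and hashCode defined over both fields, such a key could be used in a map of supervisors on each member.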
If preparation is finished on all members, the job itself is started by executing a StartProcessingJobOperation on every member. This initiates a MappingPhase implementation (defaults to KeyValueSourceMappingPhase) and starts the actual mapping on the members.
The mapping process is currently a single-threaded operation per member, but will be extended to run in parallel on multiple partitions (configurable per Job) in future versions. The Mapper is called on every available value on the partition and may emit values. For every emitted value, a Combiner is used: either a cached one or a new one created by the configured CombinerFactory (the default CollectingCombinerFactory is used if none is configured). When the chunk limit is reached on a member, an IntermediateChunkNotification is prepared by grouping the emitted keys by their assigned members. The assignment comes either from asking the job owner or from an already cached assignment. In later versions, a PartitionStrategy might also be configurable.
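The combine-then-chunk step described above can be sketched as follows. This is a simplified model under several assumptions: the combiner sums values (the default CollectingCombinerFactory collects them instead), the chunk limit counts emitted values, and askJobOwner is a local hash-based stand-in for the remote call to the job owner. ChunkingSketch and all of its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChunkingSketch {
    private final int chunkLimit;
    private final int memberCount;
    private final Map<String, Integer> combined = new HashMap<>();   // combiner state per key
    private final Map<String, Integer> ownerCache = new HashMap<>(); // cached key -> member assignment
    private int emittedSinceFlush;

    // Each element models one notification round: target member -> (key -> combined value).
    final List<Map<Integer, Map<String, Integer>>> sentChunks = new ArrayList<>();

    ChunkingSketch(int chunkLimit, int memberCount) {
        this.chunkLimit = chunkLimit;
        this.memberCount = memberCount;
    }

    // Combines the value locally and flushes once the chunk limit is reached.
    void emit(String key, int value) {
        combined.merge(key, value, Integer::sum);
        if (++emittedSinceFlush >= chunkLimit) {
            flush();
        }
    }

    // Groups the combined values by owning member and records them as "sent".
    void flush() {
        if (combined.isEmpty()) {
            return;
        }
        Map<Integer, Map<String, Integer>> byMember = new HashMap<>();
        for (Map.Entry<String, Integer> entry : combined.entrySet()) {
            int owner = ownerCache.computeIfAbsent(entry.getKey(), this::askJobOwner);
            byMember.computeIfAbsent(owner, m -> new HashMap<>())
                    .put(entry.getKey(), entry.getValue());
        }
        sentChunks.add(byMember);
        combined.clear();
        emittedSinceFlush = 0;
    }

    // Assumption: a hash-based assignment replaces the real request to the job owner.
    int askJobOwner(String key) {
        return Math.floorMod(key.hashCode(), memberCount);
    }
}
```

With a chunk limit of 0 or 1, every emit triggers a flush, which matches the earlier example where each emitted value is sent to the reducers directly.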
The IntermediateChunkNotification is then sent to the reducers (containing only values for this member) and is offered to the ReducerTask. On every offer, the ReducerTask checks if it is already running and if not, it submits itself to the configured ExecutorService (from the JobTracker configuration).
If the reducer queue runs out of work, the ReducerTask is removed from the ExecutorService so that it does not block threads. It is resubmitted when the next chunk of work arrives.
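The "submit on offer, leave the executor when idle" behavior is a common pattern and can be sketched with an atomic flag and a concurrent queue. This is a generic illustration under the assumption of a summing reducer, not the actual ReducerTask code; SelfSchedulingReducerSketch and its methods are hypothetical names.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class SelfSchedulingReducerSketch implements Runnable {
    private final Queue<Integer> chunks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final AtomicInteger sum = new AtomicInteger();
    private final ExecutorService executor;

    SelfSchedulingReducerSketch(ExecutorService executor) {
        this.executor = executor;
    }

    // Queues the chunk and submits the task only if it is not already scheduled.
    void offer(int chunk) {
        chunks.add(chunk);
        if (active.compareAndSet(false, true)) {
            executor.execute(this);
        }
    }

    @Override
    public void run() {
        Integer chunk;
        while ((chunk = chunks.poll()) != null) {
            sum.addAndGet(chunk);
        }
        // Queue is drained: give the thread back to the executor.
        active.set(false);
        // A chunk may have arrived between the last poll and clearing the flag.
        if (!chunks.isEmpty() && active.compareAndSet(false, true)) {
            executor.execute(this);
        }
    }

    int currentSum() {
        return sum.get();
    }

    // Small demo: offers three chunks and waits (up to five seconds) for the sum.
    static int demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        SelfSchedulingReducerSketch reducer = new SelfSchedulingReducerSketch(executor);
        reducer.offer(1);
        reducer.offer(14);
        reducer.offer(27);
        try {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (reducer.currentSum() != 42 && System.nanoTime() < deadline) {
                Thread.sleep(1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return reducer.currentSum();
    }
}
```

The re-check after clearing the flag matters: without it, a chunk offered at exactly the wrong moment could sit in the queue until some later offer happens to reschedule the task.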
On every phase, the partition state is changed to keep track of the currently running operations. A JobPartitionState can be in one of the following states with self-explanatory titles: [WAITING, MAPPING, REDUCING, PROCESSED, CANCELLED]. If you want more detail on these states, look at the Javadoc.
- Member asks for new partition to process: WAITING => MAPPING
- Member emits first chunk to a reducer: MAPPING => REDUCING
- All members signal that they finished mapping phase and reducing is finished, too: REDUCING => PROCESSED
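The transitions listed above can be expressed as a small state machine. The sketch covers only the three listed transitions plus cancellation. Treating CANCELLED as reachable from every state that is not yet PROCESSED or CANCELLED is an assumption of this sketch, and PartitionStateSketch is a hypothetical name rather than the real JobPartitionState type.

```java
class PartitionStateSketch {

    // State names taken from the list in the text above.
    enum State { WAITING, MAPPING, REDUCING, PROCESSED, CANCELLED }

    // Returns true if the sketch allows moving a partition from one state to another.
    static boolean canTransition(State from, State to) {
        if (to == State.CANCELLED) {
            // Assumption: only unfinished partitions can still be cancelled.
            return from != State.PROCESSED && from != State.CANCELLED;
        }
        switch (from) {
            case WAITING:
                return to == State.MAPPING;   // member asks for a new partition
            case MAPPING:
                return to == State.REDUCING;  // first chunk emitted to a reducer
            case REDUCING:
                return to == State.PROCESSED; // mapping and reducing finished
            default:
                return false;                 // PROCESSED and CANCELLED are terminal here
        }
    }

    // Applies a transition or fails loudly if the sketch does not allow it.
    static State transition(State from, State to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException(from + " => " + to + " is not allowed");
        }
        return to;
    }
}
```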
Eventually, all JobPartitionStates reach the PROCESSED state. Then, the job emitter's JobSupervisor asks all members for their reduced results and, if a Collator was supplied, executes it to calculate the overall result. Finally, the JobSupervisor removes itself from the JobTracker, performs some final cleanup, and returns the result to the requester (using the internal TrackableJobFuture).
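The final merge and collation step can be sketched as below. A java.util.function.Function stands in for the Collator, and the per-member results are plain maps. CollatorSketch, collect, and sumAll are hypothetical names. The sketch relies on each key being reduced on exactly one member, so the per-member maps never overlap.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class CollatorSketch {

    // Merges the reduced results of all members, then applies the collator stand-in.
    static <R> R collect(List<Map<String, Integer>> perMemberResults,
                         Function<Map<String, Integer>, R> collator) {
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> memberResult : perMemberResults) {
            merged.putAll(memberResult); // keys are disjoint across members
        }
        return collator.apply(merged);
    }

    // Example collator: folds the merged map into a single total.
    static Integer sumAll(Map<String, Integer> merged) {
        int total = 0;
        for (int value : merged.values()) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> results = Arrays.asList(
                Collections.singletonMap("Foo", 42),
                Collections.singletonMap("Bar", 8));
        System.out.println(collect(results, CollatorSketch::sumAll)); // prints 50
    }
}
```

Without a collator, the merged map itself would be the job result; passing Function.identity() to collect models that case.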
If a job is cancelled during execution, all partitions are immediately set to the CANCELLED state and a CancelJobSupervisorOperation is executed on all members to kill the running processes.
In addition to the default operations, some further operations are executed against the job owner while the job is running, to keep track of the process. Examples are ProcessStatsUpdateOperation (updates the processed-records statistics) and NotifyRemoteExceptionOperation (notifies the members that the sending member encountered an unrecoverable situation, e.g., a NullPointerException inside a Mapper, and that the Job needs to be cancelled).