The Hadoop Distributed File System is a production-worthy choice for both a data source and sink in a batch computation job. It is a distributed, replicated storage system that handles these concerns automatically, exposing a simple unified view to the client.
The HDFS source and sink require a configuration object of type
JobConf
which supplies the input and output paths and formats. No actual
MapReduce job is created, this config is simply used to describe the
required inputs and outputs. The same JobConf
instance can be shared
between the source and the sink.
JobConf jobConfig = new JobConf();
jobConfig.setInputFormat(TextInputFormat.class);
jobConfig.setOutputFormat(TextOutputFormat.class);
TextOutputFormat.setOutputPath(jobConfig, "output-path");
TextInputFormat.addInputPath(jobConfig, "input-path");
The word count pipeline can then be expressed using HDFS as follows
Pipeline p = Pipeline.create();
p.drawFrom(HdfsSources.hdfs(jobConfig, (k, v) -> v.toString()))
.flatMap(line -> traverseArray(delimiter.split(line.toLowerCase())).filter(w -> !w.isEmpty()))
.groupBy(wholeItem(), counting())
.drainTo(HdfsSinks.hdfs(jobConfig));
Data Locality When Reading
Jet will split the input data across the cluster, with each processor instance reading a part of the input. If the Jet nodes are running along the HDFS datanodes, then Jet can make use of data locality by reading the blocks locally where possible. This can bring a significant increase in read speed.
Output
Each processor will write to a different file in the output folder identified by the unique processor id. The files will be in a temporary state until the job is completed and will be committed when the job is complete. For streaming jobs, they will be committed when the job is cancelled. We have plans to introduce a rolling sink for HDFS in the future to have better streaming support.
Dealing with Writables
Hadoop types implement their own serialization mechanism through the use
of Writable.
Jet provides an adapter to register a Writable
for
Hazelcast serialization
without having to write additional serialization code. To use this
adapter, you can register your own Writable
types by extending
WritableSerializerHook
and
registering the hook.
Hadoop JARs and Classpath
When submitting JARs along with a Job, sending Hadoop JARs should be avoided and instead Hadoop JARs should be present on the classpath of the running members. Hadoop JARs contain some JVM hooks and can keep lingering references inside the JVM long after the job has ended, causing memory leaks.