This manual is for an old version of Hazelcast Jet, use the latest stable version.

IMap and ICache readers

IMap and ICache Readers distribute the partitions to processors according to ownership of the partitions. Thus each processor will access data locally. Processors will iterate over entries and emit them as Map.Entry. The number of Hazelcast partitions should be configured to at least localParallelism * clusterSize, otherwise some processors will have no partitions assigned to them.

    DAG dag = new DAG();
    Vertex source = dag.newVertex("source", Sources.readMap(MAP_NAME));
    // ... other vertices
    DAG dag = new DAG();
    Vertex source = dag.newVertex("source", Sources.readCache(CACHE_NAME));
    // ... other vertices

You can use IMap and ICache readers to fetch the entries from a remote Hazelcast cluster by configuring a ClientConfig.

    DAG dag = new DAG();
    ClientConfig clientConfig = new ClientConfig();
    // ... configure the client
    Vertex source = dag.newVertex("source", Sources.readMap(MAP_NAME, clientConfig));
    // ... other vertices
    DAG dag = new DAG();
    ClientConfig clientConfig = new ClientConfig();
    // ... configure the client
    Vertex source = dag.newVertex("source", Sources.readCache(CACHE_NAME, clientConfig));
    // ... other vertices

If the underlying map or cache is concurrently being modified, there are no guarantees given with respect to missing or duplicate items.

IList reader

Since IList is not a partitioned data structure, all elements from the list are emitted on a single member — the one where the entire list is stored.

    DAG dag = new DAG();
    Vertex source = dag.newVertex("source", Sources.readList(LIST_NAME));
    // ... other vertices

You can use IList reader to fetch the items from a remote Hazelcast cluster by configuring a ClientConfig.

    DAG dag = new DAG();
    ClientConfig clientConfig = new ClientConfig();
    // ... configure the client
    Vertex source = dag.newVertex("source", Sources.readList(LIST_NAME, clientConfig));
    // ... other vertices

File Reader

File Reader is a source that emits lines from files in a directory matching the supplied pattern (but not its subdirectories). You can pass * as the pattern to read all the files in the directory.

    DAG dag = new DAG();
    Vertex source = dag.newVertex("source", Sources.readFiles(DIRECTORY));
    // ... other vertices
    DAG dag = new DAG();
    Vertex source = dag.newVertex("source", Sources.readFiles(DIRECTORY,
        StandardCharsets.UTF_8, PATTERN));
    // ... other vertices

The files must not change while being read; if they do, the behavior is unspecified. There will be no indication of which file a particular line comes from.

The same pathname should be available on all members, but it should not contain the same files. For example it should not resolve to a directory shared over the network.

Since this processor is file IO-intensive, local parallelism of the vertex should be set according to the performance characteristics of the underlying storage system. Typical values are in the range of 1 to 4. Multiple files are read in parallel; if just a single file is read, it is always read by single thread.

See the Access log analyzer sample for a fully working example.

File Streamer

File Streamer is a source that generates a stream of lines of text coming from files in the watched directory matching the supplied pattern (but not its subdirectories). You can pass * as the pattern to read all the files in the directory. It will pick up both newly created files and content appended to pre-existing files. It expects the file contents not to change once appended. There is no indication of
which file a particular line comes from.

The processor will scan pre-existing files for file sizes on startup and process them from that position. It will ignore the first line if the starting offset is not immediately after a newline character (it is assumed that another process is concurrently appending to the file).

    DAG dag = new DAG();
    Vertex source = dag.newVertex("source", Sources.streamFiles(DIRECTORY));
    // ... other vertices
    DAG dag = new DAG();
    Vertex source = dag.newVertex("source", Sources.streamFiles(DIRECTORY,
        StandardCharsets.UTF_8, PATTERN));
    // ... other vertices

The same pathname should be available on all the members, but it should not contain the same files. For example it should not resolve to a directory shared over the network.

Since this processor is file IO-intensive, local parallelism of the vertex should be set according to the performance characteristics of the underlying storage system. Typical values are in the range of 1 to 4. If just a single file is read, it is always read by single thread.

When a change is detected, the file is opened, appended lines are read and file is closed. This process is repeated as necessary.

The processor completes when the directory is deleted. However, in order to delete the directory, all files in it must be deleted and if you delete a file that is currently being read from, the job may encounter an IOException. The directory must be deleted on all members. Any IOException will cause the job to fail.

Limitation on Windows

On Windows OS, the WatchService is not notified of the appended lines until the file is closed. If the writer keeps the file open while appending (which is typical), the processor may fail to observe the changes. It will be notified if any process tries to open that file, such as looking at the file in Explorer. This holds for Windows 10 with the NTFS file system and might change in future. You are advised to do your own testing on your target Windows platform.

Use the latest JRE

The underlying JDK API java.nio.file.WatchService has a history of unreliability and this processor may experience infinite blocking, missed, or duplicate events as a result. Such problems may be resolved by upgrading the JRE to the latest version.

See the Access stream analyzer sample for a fully working example.

Socket Streamer

Socket Streamer is a source which streams text read from a socket line by line. You can configure a Charset otherwise UTF-8 will be used as the default. Each processor instance will create a socket connection to the configured [host:port], so there will be clusterSize * localParallelism connections. The server should do the load-balancing.

The processors will complete when the socket is closed by the server. No reconnection is attempted.

    DAG dag = new DAG();
    Vertex source = dag.newVertex("source", Sources.streamSocket(HOST, PORT));
    // ... other vertices

See the Socket code sample for a fully working example.

HDFS Reader

HdfsProcessors.readHdfs() is used to read items from one or more HDFS files. The input is split according to the given InputFormat and read in parallel across all processor instances.

readHdfs() by default emits items of type Map.Entry<K,V>, where K and V are the parameters for the given InputFormat. It is possible to transform the records using an optional mapper parameter. For example, in the above example the output record type of TextInputFormat is Map.Entry<LongWritable, Text>. LongWritable represents the line number and Text the contents of the line. If you do not care about line numbers, and want your output as a plain String, you can do as follows:

Vertex source = dag.newVertex("source", HdfsProcessors.readHdfs(jobConf, (k, v) -> v.toString()));

With this change, readHdfs() will emit items of type String instead.

This source is part of the hazelcast-jet-hadoop module.

Cluster Co-location

The Jet cluster should be run on the same machines as the HDFS nodes for best read performance. If this is the case, each processor instance will try to read as much local data as possible. A heuristic algorithm is used to assign replicated blocks across the cluster to ensure a well-balanced work distribution between processor instances for maximum performance.

Kafka Streamer

KafkaProcessors.streamKafka() is used to consume items from one or more Apache Kafka topics. It uses the Kafka consumer API and consumer groups to distribute partitions among processors where each partition is consumed by a single processor at any given time. The reader emits items of type Map.Entry<K,V> where the key and the value are deserialized using the key/value deserializers configured in Kafka properties.

The partition count in Kafka should be at least as large as the total parallelism in the Jet cluster (localParallelism * clusterSize) to make sure that each processor will get some partitions assigned to it.

Internally, a KafkaConsumer is created per Processor instance using the supplied properties. All processors must be in the same consumer group supplied by the group.id property. It is required that the group.id is explicitly set by the user to a non-empty value. The supplied properties will be passed on to each KafkaConsumer instance. These processors are only terminated in case of an error or if the underlying job is cancelled.

streamKafka() forces the enable.auto.commit property to false and commits the current offsets after they have been fully emitted.

This vertex takes a Properties instance which will be forwarded to the underlying Kafka consumers as configuration. For example, this is how we configure it in our ConsumeKafka code sample:

Properties props = props(
        "group.id", "group-" + Math.random(),
        "bootstrap.servers", "localhost:9092",
        "key.deserializer", StringDeserializer.class.getCanonicalName(),
        "value.deserializer", IntegerDeserializer.class.getCanonicalName(),
        "auto.offset.reset", "earliest");

Vertex source = dag.newVertex("source", KafkaProcessors.streamKafka(props, "t1", "t2"));

This source is part of the hazelcast-jet-kafka module.