This manual is for an old version of Hazelcast Jet, use the latest stable version.

Let's say we want to write a simple source, which generates numbers from 0 to 1,000,000 (exclusive). It is trivial to write a single Processor which can do this using java.util.stream and Traverser.

public static class NumberGenerator extends AbstractProcessor {

    private final Traverser<Integer> traverser;

    public NumberGenerator(int limit) {
        traverser = traverseStream(IntStream.range(0, limit).boxed());
    }

    @Override
    public boolean complete() {
        return emitCooperatively(traverser);
    }
}

We will also add a simple logging processor, so we can see what values are generated and received:

public static class PeekProcessor extends AbstractProcessor {
    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        System.out.println("Received number: " + item);
        emit(item);
        return true;
    }
}

You can then build your DAG as follows and execute it:

final int limit = 10;
dag.newVertex("number-generator", () -> new NumberGenerator(limit));
// Vertex logger = dag.newVertex("logger-vertex", PeekProcessor::new);
dag.edge(Edge.between(generator, logger));

jet.newJob(dag).execute().get();

When you run this code, you will see the output as below:

Received number: 4
Received number: 0
Received number: 3
Received number: 2
Received number: 2
Received number: 2

Since we are using the default parallelism setting on this vertex, several instances of this processor are created, all of which will be generating the same sequence of values.

What we actually want is to generate a subset of the values for each processor. We will start by partitioning the data according to its remainder, when divided by the total number of processors. For example, if we have 8 processors, numbers which have a remainder of 1 when divided by 8, will go to processor with index 1.

To do this, we need to implement the ProcessorSupplier API:

static class NumberGeneratorSupplier implements ProcessorSupplier {

    private final int limit;

    public NumberGeneratorSupplier(int limit) {
        this.limit = limit;
    }

    @Nonnull
    @Override
    public List<? extends Processor> get(int count) {
        // each processor is responsible for a subset of the numbers.
        return IntStream.range(0, count)
                        .mapToObj(index ->
                            new NumberGenerator(IntStream.range(0, limit)
                                                         .filter(n -> n % count == index))
                        )
                        .collect(Collectors.toList());
    }
}

static class NumberGenerator extends AbstractProcessor {

    private final Traverser<Integer> traverser;

    public NumberGenerator(IntStream stream) {
        traverser = traverseStream(stream.boxed());
    }

    @Override
    public boolean complete() {
        return emitCooperatively(traverser);
    }
}

With this approach, each instance of processor will only generate a subset of the numbers, with each instance generating the numbers where the remainder of the number divided count matches the index of the processor.

If we add another member to the cluster, we will quickly see that both members are generating the same sequence of numbers. We need to distribute the work across the cluster, by making sure that each member will generate a subset of the numbers. We will again follow a similar approach as above, but now we need to be aware of the global index for each Processor instance.

To achieve this, we need to implement a custom ProcessorMetaSupplier. A ProcessorMetaSupplier is called from a single coordinator member, and creates one ProcessorSupplier for each member. The main partition allocation thus can be done by the ProcessorMetaSupplier. Our distributed number generator source could then look as follows:

static class NumberGeneratorMetaSupplier implements ProcessorMetaSupplier {

    private final int limit;

    private transient int totalParallelism;
    private transient int localParallelism;

    NumberGeneratorMetaSupplier(int limit) {
        this.limit = limit;
    }

    @Override
    public void init(@Nonnull Context context) {
        totalParallelism = context.totalParallelism();
        localParallelism = context.localParallelism();
    }


    @Override @Nonnull
    public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
        Map<Address, ProcessorSupplier> map = new HashMap<>();
        for (int i = 0; i < addresses.size(); i++) {
            Address address = addresses.get(i);
            int start = i * localParallelism;
            int end = (i + 1) * localParallelism;
            int mod = totalParallelism;
            map.put(address, count -> range(start, end)
                    .mapToObj(index -> new NumberGenerator(range(0, limit).filter(f -> f % mod == index)))
                    .collect(toList())
            );
        }
        return map::get;
    }
}

The vertex creation can then be updated as follows:

Vertex generator = dag.newVertex("number-generator", new NumberGeneratorMetaSupplier(limit));