Let's say we want to write a simple source which generates numbers from
0 to 1,000,000 (exclusive). It is trivial to write a single Processor
which can do this using java.util.stream and Traverser.
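public static class NumberGenerator extends AbstractProcessor {
    // lazily yields 0, 1, ..., limit - 1 as boxed Integers
    private final Traverser<Integer> traverser;
    public NumberGenerator(int limit) {
        // traverseStream: static import from Traversers
        traverser = traverseStream(IntStream.range(0, limit).boxed());
    }
    // A source has no inbound edges, so all of its output is produced in complete().
    // Returning false means "not done yet, call me again"; true means the traverser is exhausted.
    @Override
    public boolean complete() {
        return emitCooperatively(traverser);
    }
}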
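Why complete() returns a boolean may not be obvious at first. The following standalone model (plain Java, no Jet dependency) sketches the contract we assume emitCooperatively follows: it moves items from a traverser into a bounded outbox, returns false when the outbox refuses an item so the processor gets called again later, and returns true once the traverser is exhausted. SimpleTraverser, BoundedOutbox and CooperativeEmitter are hypothetical names for illustration only; they are neither Jet's API nor its actual implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class CooperativeEmitSketch {

    // Hypothetical stand-in for Jet's Traverser: next() returns null once exhausted.
    @FunctionalInterface
    interface SimpleTraverser<T> {
        T next();
    }

    // Adapts a stream to the null-terminated traverser contract.
    static <T> SimpleTraverser<T> traverseStream(Stream<T> stream) {
        Iterator<T> it = stream.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Hypothetical bounded outbox: offer() refuses items once capacity is reached.
    static final class BoundedOutbox<T> {
        final Queue<T> queue = new ArrayDeque<>();
        private final int capacity;

        BoundedOutbox(int capacity) {
            if (capacity < 1) {
                throw new IllegalArgumentException("capacity must be at least 1");
            }
            this.capacity = capacity;
        }

        boolean offer(T item) {
            if (queue.size() >= capacity) {
                return false;
            }
            queue.add(item);
            return true;
        }
    }

    // Models cooperative emission: push items until the outbox refuses one (return
    // false, "call me again") or the traverser is exhausted (return true, "done").
    static final class CooperativeEmitter<T> {
        private final SimpleTraverser<T> traverser;
        private T pending; // item refused by the outbox on the previous call, if any

        CooperativeEmitter(SimpleTraverser<T> traverser) {
            this.traverser = traverser;
        }

        boolean emitCooperatively(BoundedOutbox<T> outbox) {
            if (pending == null) {
                pending = traverser.next();
            }
            while (pending != null) {
                if (!outbox.offer(pending)) {
                    return false;
                }
                pending = traverser.next();
            }
            return true;
        }
    }

    // Calls the emitter until it reports completion, recording each call's output.
    static List<List<Integer>> runBatches(int limit, int capacity) {
        CooperativeEmitter<Integer> emitter =
                new CooperativeEmitter<>(traverseStream(IntStream.range(0, limit).boxed()));
        BoundedOutbox<Integer> outbox = new BoundedOutbox<>(capacity);
        List<List<Integer>> batches = new ArrayList<>();
        boolean done = false;
        while (!done) {
            done = emitter.emitCooperatively(outbox);
            batches.add(new ArrayList<>(outbox.queue));
            outbox.queue.clear(); // stand-in for downstream draining the outbox
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(runBatches(10, 4)); // [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
    }
}
```

With an outbox of capacity 4, ten numbers take three calls: two that return false after filling the outbox and a final one that returns true.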
We will also add a simple logging processor, so we can see what values are generated and received:
public static class PeekProcessor extends AbstractProcessor {
    // logs every received item and forwards it downstream unchanged
    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        System.out.println("Received number: " + item);
        emit(item);
        return true; // the item has been fully processed
    }
}
You can then build your DAG as follows and execute it:
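DAG dag = new DAG();
final int limit = 10;
Vertex generator = dag.newVertex("number-generator", () -> new NumberGenerator(limit));
Vertex logger = dag.newVertex("logger-vertex", PeekProcessor::new);
dag.edge(Edge.between(generator, logger));
// `jet` is assumed to be a running JetInstance, e.g. obtained from Jet.newJetInstance()
jet.newJob(dag).execute().get();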
When you run this code, you will see output similar to the following:
Received number: 4
Received number: 0
Received number: 3
Received number: 2
Received number: 2
Received number: 2
Since we are using the default parallelism setting on this vertex, several instances of this processor are created, all of which generate the same sequence of values. This is why the same number is received more than once.
What we actually want is for each processor to generate only a subset of the values. We will start by partitioning the data according to its remainder when divided by the total number of processors. For example, if we have 8 processors, numbers which have a remainder of 1 when divided by 8 will go to the processor with index 1.
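The remainder-based assignment can be checked in plain Java before touching any Jet API. This sketch uses the 8 processors from the example and an arbitrary limit of 20, and lists the numbers each processor index would receive; assigned is a hypothetical helper that applies the same filter as the supplier code that follows.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ModuloPartitionSketch {

    // Numbers in [0, limit) that the processor with the given index is responsible for:
    // those whose remainder when divided by `count` equals `index`.
    static List<Integer> assigned(int limit, int count, int index) {
        return IntStream.range(0, limit)
                .filter(n -> n % count == index)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int limit = 20;
        int count = 8; // 8 processors, as in the example
        for (int index = 0; index < count; index++) {
            System.out.println("processor " + index + " -> " + assigned(limit, count, index));
        }
    }
}
```

For instance, processor 1 receives [1, 9, 17] and processor 7 receives [7, 15]; every number from 0 to 19 lands on exactly one processor.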
To do this, we need to implement the ProcessorSupplier API:
static class NumberGeneratorSupplier implements ProcessorSupplier {
    private final int limit;
    public NumberGeneratorSupplier(int limit) {
        this.limit = limit;
    }
    @Nonnull
    @Override
    public List<? extends Processor> get(int count) {
        // each processor is responsible for a subset of the numbers.
        return IntStream.range(0, count)
                .mapToObj(index ->
                        new NumberGenerator(IntStream.range(0, limit)
                                .filter(n -> n % count == index))
                )
                .collect(Collectors.toList());
    }
}
static class NumberGenerator extends AbstractProcessor {
    private final Traverser<Integer> traverser;
    // now receives the (already filtered) stream of numbers this instance should emit
    public NumberGenerator(IntStream stream) {
        traverser = traverseStream(stream.boxed());
    }
    @Override
    public boolean complete() {
        return emitCooperatively(traverser);
    }
}
With this approach, each processor instance generates only a subset of
the numbers: those whose remainder when divided by count matches the
index of the processor.
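One consequence of the filter is that every processor still walks the whole range and discards all but roughly 1/count of it. If that cost matters, a stride-based stream produces the same subset directly. This is a swapped-in alternative, not part of the original example; byFilter and byStride are hypothetical helper names, and the sketch only demonstrates that both yield the same numbers.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class StridePartitionSketch {

    // Same approach as NumberGeneratorSupplier: scan the whole range, keep matching remainders.
    static IntStream byFilter(int limit, int count, int index) {
        return IntStream.range(0, limit).filter(n -> n % count == index);
    }

    // Alternative: generate index, index + count, index + 2 * count, ... directly.
    // The number of such values below `limit` is ceil((limit - index) / count), clamped at 0.
    static IntStream byStride(int limit, int count, int index) {
        int size = Math.max(0, (limit - index + count - 1) / count);
        return IntStream.range(0, size).map(k -> index + k * count);
    }

    static List<Integer> toList(IntStream s) {
        return s.boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int limit = 10;
        int count = 4;
        for (int index = 0; index < count; index++) {
            System.out.println(index + ": " + toList(byFilter(limit, count, index))
                    + " vs " + toList(byStride(limit, count, index)));
        }
    }
}
```

Inside a ProcessorSupplier, the stride version would replace the IntStream.range(0, limit).filter(...) argument passed to NumberGenerator.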
If we add another member to the cluster, we will quickly see that both
members are generating the same sequence of numbers. We need to distribute
the work across the cluster by making sure that each member generates only
a subset of the numbers. We will again follow a similar approach as
above, but now we need to be aware of the global index of each
Processor instance.
To achieve this, we need to implement a custom ProcessorMetaSupplier.
A ProcessorMetaSupplier is called from a single coordinator member
and creates one ProcessorSupplier for each member. The main partition
allocation can thus be done by the ProcessorMetaSupplier. Our
distributed number generator source could then look as follows:
// range and toList are static imports from java.util.stream.IntStream and java.util.stream.Collectors
static class NumberGeneratorMetaSupplier implements ProcessorMetaSupplier {
    private final int limit;
    // set in init() and only used on the coordinator while building the per-member suppliers
    private transient int totalParallelism;
    private transient int localParallelism;
    NumberGeneratorMetaSupplier(int limit) {
        this.limit = limit;
    }
    @Override
    public void init(@Nonnull Context context) {
        totalParallelism = context.totalParallelism();
        localParallelism = context.localParallelism();
    }
    @Override @Nonnull
    public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
        Map<Address, ProcessorSupplier> map = new HashMap<>();
        for (int i = 0; i < addresses.size(); i++) {
            Address address = addresses.get(i);
            // member i owns the global processor indices [start, end)
            int start = i * localParallelism;
            int end = (i + 1) * localParallelism;
            int mod = totalParallelism;
            // this lambda is member i's ProcessorSupplier; `index` is the global processor index
            map.put(address, count -> range(start, end)
                    .mapToObj(index -> new NumberGenerator(range(0, limit).filter(f -> f % mod == index)))
                    .collect(toList())
            );
        }
        return map::get;
    }
}
The vertex creation can then be updated as follows:
Vertex generator = dag.newVertex("number-generator", new NumberGeneratorMetaSupplier(limit));
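To make the global indexing concrete, the following plain-Java sketch replays the arithmetic of NumberGeneratorMetaSupplier for a hypothetical cluster of two members with a local parallelism of 2 (so a total parallelism of 4) and a limit of 10. Like the meta-supplier, it assumes every member runs the same local parallelism; globalIndex and numbersFor are illustrative helpers, not Jet API.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class GlobalIndexSketch {

    // Global index of a processor, following NumberGeneratorMetaSupplier: member i owns
    // the block [i * localParallelism, (i + 1) * localParallelism) of global indices.
    // Assumes every member runs the same localParallelism.
    static int globalIndex(int memberIndex, int localIndex, int localParallelism) {
        return memberIndex * localParallelism + localIndex;
    }

    // Numbers in [0, limit) emitted by the processor with the given global index.
    static List<Integer> numbersFor(int limit, int globalIndex, int totalParallelism) {
        return IntStream.range(0, limit)
                .filter(n -> n % totalParallelism == globalIndex)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int limit = 10;
        int members = 2;
        int localParallelism = 2;
        int totalParallelism = members * localParallelism; // 4
        for (int m = 0; m < members; m++) {
            for (int j = 0; j < localParallelism; j++) {
                int g = globalIndex(m, j, localParallelism);
                System.out.println("member " + m + ", local " + j + ", global " + g
                        + " -> " + numbersFor(limit, g, totalParallelism));
            }
        }
    }
}
```

Running it shows member 0 holding global indices 0 and 1 ([0, 4, 8] and [1, 5, 9]) and member 1 holding global indices 2 and 3 ([2, 6] and [3, 7]), so each number is generated exactly once across the cluster.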