AggregateOperation is a holder of five functional primitives Jet uses
to evaluate an aggregate function over a set of data items:
- create a new accumulator object.
- accumulate the data of an item by mutating the accumulator's state.
- combine the contents of the right-hand accumulator into the left-hand one.
- deduct the contents of the right-hand accumulator from the left-hand one (undo the effects of combine).
- finish accumulation by transforming the accumulator object into the final result.
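The five primitives can be pictured as a small holder of functions. The following is only a sketch under stated assumptions: `FivePrimitivesSketch`, its `evaluate` method, and the `summing` example are hypothetical names made up for illustration and are not Jet's API. It uses plain JDK functional interfaces and a one-element `long[]` as the mutable accumulator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for illustration only; this is NOT Jet's AggregateOperation.
final class FivePrimitivesSketch<T, A, R> {
    final Supplier<A> create;           // create a new accumulator object
    final BiConsumer<A, T> accumulate;  // mutate the accumulator with one item
    final BiConsumer<A, A> combine;     // merge the right-hand accumulator into the left-hand one
    final BiConsumer<A, A> deduct;      // remove the right-hand accumulator's contribution from the left-hand one
    final Function<A, R> finish;        // transform the accumulator into the final result

    FivePrimitivesSketch(Supplier<A> create, BiConsumer<A, T> accumulate, BiConsumer<A, A> combine,
                         BiConsumer<A, A> deduct, Function<A, R> finish) {
        this.create = create;
        this.accumulate = accumulate;
        this.combine = combine;
        this.deduct = deduct;
        this.finish = finish;
    }

    // Accumulates two partitions independently, then combines them and finishes.
    // deduct is not needed for a one-off evaluation like this one.
    R evaluate(List<T> leftItems, List<T> rightItems) {
        A left = create.get();
        for (T item : leftItems) {
            accumulate.accept(left, item);
        }
        A right = create.get();
        for (T item : rightItems) {
            accumulate.accept(right, item);
        }
        combine.accept(left, right);
        return finish.apply(left);
    }

    // Example instance: summing integers, with a one-element long[] as the mutable accumulator.
    static FivePrimitivesSketch<Integer, long[], Long> summing() {
        return new FivePrimitivesSketch<Integer, long[], Long>(
                () -> new long[1],
                (acc, item) -> acc[0] += item,
                (l, r) -> l[0] += r[0],
                (l, r) -> l[0] -= r[0],
                acc -> acc[0]);
    }

    public static void main(String[] args) {
        System.out.println(summing().evaluate(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)));
    }
}
```

Splitting the input into two partitions is meant to suggest how accumulators built in parallel can be merged; how Jet actually schedules this work is not shown here.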
If you are familiar with
java.util.stream.Collector, you may get a sense of
déjà vu here: except for some naming differences,
the two are very similar objects. The devil is in the detail, however, and these
primitives are tailor-made for Jet's aggregation style. We assume that
both accumulation and combining are destructive operations, mutating the
left-hand operand. Mutability means lower GC pressure and we always use
it in our implementations. We also define the
deduct primitive, which is absent from
Collector and specifically targeted at the optimization of
sliding window aggregation.
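To see why a deduct primitive can help with sliding windows, consider a window made of several frames that slides forward one frame at a time. A minimal sketch, assuming each frame has already been accumulated into a partial sum (the class `SlidingSumSketch` and its method are hypothetical and not part of Jet): instead of recombining every frame for each window position, the running accumulator combines the frame that enters and deducts the frame that leaves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the deduct idea; this is not Jet code.
final class SlidingSumSketch {
    // Each element of frameSums is the already-accumulated sum of one frame.
    // A window covers windowFrames consecutive frames and slides by one frame.
    static List<Long> windowSums(long[] frameSums, int windowFrames) {
        List<Long> results = new ArrayList<>();
        long window = 0; // running accumulator for the current window
        for (int i = 0; i < frameSums.length; i++) {
            window += frameSums[i];                    // combine the frame entering the window
            if (i >= windowFrames) {
                window -= frameSums[i - windowFrames]; // deduct the frame leaving the window
            }
            if (i >= windowFrames - 1) {
                results.add(window);                   // emit the result for a full window
            }
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(windowSums(new long[] {1, 2, 3, 4, 5}, 3));
    }
}
```

With this approach each slide costs one combine and one deduct, regardless of how many frames the window spans.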
As a simple example we can define an aggregate operation that collects items to a set:
    AggregateOperation.of(
            HashSet::new,
            Set::add,
            Set::addAll,
            Set::removeAll,
            Set::toString
    );
of() is a factory method that takes the primitives in the order we
gave above. Let's have a brief look at this definition. Our accumulator
object is a HashSet instance. We accumulate an item with add(),
combine it with another accumulator with addAll(), and undo that
combining with removeAll(). Our final output is a string
representation of the set.
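The same five method references can be exercised by hand with plain JDK functional interfaces, which may help make the role of each primitive concrete. This is a sketch only: `SetPrimitivesSketch` and its helper methods are hypothetical, and the two accumulators are deliberately kept disjoint so that removeAll() exactly undoes addAll().

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical driver for the five set-based primitives; this is not Jet code.
final class SetPrimitivesSketch {
    static final Supplier<Set<String>> CREATE = HashSet::new;
    static final BiConsumer<Set<String>, String> ACCUMULATE = Set::add;
    static final BiConsumer<Set<String>, Set<String>> COMBINE = Set::addAll;
    static final BiConsumer<Set<String>, Set<String>> DEDUCT = Set::removeAll;
    static final Function<Set<String>, String> FINISH = Set::toString;

    // Creates an accumulator and accumulates the given items into it.
    static Set<String> accumulateAll(String... items) {
        Set<String> acc = CREATE.get();
        for (String item : items) {
            ACCUMULATE.accept(acc, item);
        }
        return acc;
    }

    // Combines two disjoint accumulators; the left-hand one is mutated.
    static Set<String> combined() {
        Set<String> left = accumulateAll("a", "b");
        Set<String> right = accumulateAll("c");
        COMBINE.accept(left, right);
        return left;
    }

    // Combines, then deducts the same right-hand accumulator again.
    // The accumulators are disjoint here, so the left-hand one returns to its earlier contents.
    static Set<String> combinedThenDeducted() {
        Set<String> left = accumulateAll("a", "b");
        Set<String> right = accumulateAll("c");
        COMBINE.accept(left, right);
        DEDUCT.accept(left, right);
        return left;
    }

    public static void main(String[] args) {
        System.out.println(FINISH.apply(combined()));
        System.out.println(FINISH.apply(combinedThenDeducted()));
    }
}
```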
This example was easy and familiar because Set is a mutable
abstraction, so the fit is natural. It gets less natural when your
accumulated value is something the JDK doesn't provide in mutable form.
If you want to accumulate a long value, you'll need a mutable
container object for it. We do provide a convenience that creates an
AggregateOperation from immutable reduction primitives, so you could
define summing like this:
    AggregateOperations.reducing(
            0L,
            (Long x) -> x,
            (sum1, sum2) -> sum1 + sum2,
            (sum1, sum2) -> sum1 - sum2
    );
Under the hood Jet will instantiate a mutable holder that keeps a
reference to the immutable objects returned from the user-supplied
functions.
reducing() takes these arguments:
- the "empty" accumulated value
- the function that takes a stream item and computes its accumulated value
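- the combine function
- the deduct function
combine and deduct here are pure functions acting on
immutable arguments and have appropriately different lambda shapes than
the mutating primitives passed to AggregateOperation.of().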
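The "mutable holder" idea can be sketched with plain JDK types. Everything below is an assumption made for illustration: `ReducingHolderSketch` and its method names are hypothetical and do not mirror Jet's internals. The point is only that the holder's single mutable field is a reference, which is swapped for whatever immutable value the pure functions return.

```java
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical sketch of a mutable holder around immutable values; not Jet's implementation.
final class ReducingHolderSketch<T, A> {
    private final Function<T, A> toAccValueFn;  // computes an item's accumulated value
    private final BinaryOperator<A> combineFn;  // pure function: returns a new value
    private final BinaryOperator<A> deductFn;   // pure function: returns a new value
    private A value;                            // the only mutable state: a reference

    ReducingHolderSketch(A emptyValue, Function<T, A> toAccValueFn,
                         BinaryOperator<A> combineFn, BinaryOperator<A> deductFn) {
        this.value = emptyValue;
        this.toAccValueFn = toAccValueFn;
        this.combineFn = combineFn;
        this.deductFn = deductFn;
    }

    // Mutating primitives built on top of the pure functions.
    void accumulate(T item) {
        value = combineFn.apply(value, toAccValueFn.apply(item));
    }

    void combine(ReducingHolderSketch<T, A> other) {
        value = combineFn.apply(value, other.value);
    }

    void deduct(ReducingHolderSketch<T, A> other) {
        value = deductFn.apply(value, other.value);
    }

    A finish() {
        return value;
    }

    // Uses the same four arguments as the summing definition in the text.
    static ReducingHolderSketch<Long, Long> newSummingHolder() {
        return new ReducingHolderSketch<Long, Long>(
                0L,
                x -> x,
                (sum1, sum2) -> sum1 + sum2,
                (sum1, sum2) -> sum1 - sum2);
    }

    public static void main(String[] args) {
        ReducingHolderSketch<Long, Long> holder = newSummingHolder();
        holder.accumulate(1L);
        holder.accumulate(2L);
        System.out.println(holder.finish());
    }
}
```

Note that every call to accumulate() here may allocate a fresh boxed Long, since the pure functions return new values rather than updating one in place.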
While simple, the above definition of summing will create a lot of
Long objects. Jet's own summing operation looks like this:
    return AggregateOperation.of(
            LongAccumulator::new,
            (a, item) -> a.addExact(mapToLongF.applyAsLong(item)),
            LongAccumulator::addExact,
            LongAccumulator::subtractExact,
            LongAccumulator::get
    );
Instead of one immutable
Long per input item we create just a single
LongAccumulator instance for the whole operation. There's no
intermediate step of first computing the accumulated value of an item
and then combining it with the running state (this would force us to
create an object); here the
accumulate primitive takes the whole item
and works out internally how to update the accumulator.
LongAccumulator declares methods for "exact" addition/subtraction
(result is checked for integer overflow) and we use them here,
preferring fail-fast behavior to emitting incorrect results.
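The fail-fast behavior can be illustrated with the JDK's own `Math.addExact` and `Math.subtractExact`, which throw `ArithmeticException` on overflow. The class below is a hypothetical stand-in, not Jet's LongAccumulator; it only sketches how a single mutable accumulator can take the whole item, map it to a long, and update itself with checked arithmetic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for a mutable long accumulator; this is NOT Jet's LongAccumulator.
final class ExactSumSketch {
    private long value;

    // Checked addition: throws ArithmeticException instead of silently wrapping around.
    ExactSumSketch addExact(long delta) {
        value = Math.addExact(value, delta);
        return this;
    }

    // Checked subtraction, the counterpart used for deduct-style updates.
    ExactSumSketch subtractExact(long delta) {
        value = Math.subtractExact(value, delta);
        return this;
    }

    long get() {
        return value;
    }

    // One accumulator instance for the whole input; each item is mapped to a primitive long
    // and added directly, with no intermediate Long object per item.
    static <T> long sum(List<T> items, ToLongFunction<T> mapToLongF) {
        ExactSumSketch acc = new ExactSumSketch();
        for (T item : items) {
            acc.addExact(mapToLongF.applyAsLong(item));
        }
        return acc.get();
    }

    // Returns true if adding past Long.MAX_VALUE is reported as an error.
    static boolean overflowFailsFast() {
        ExactSumSketch acc = new ExactSumSketch().addExact(Long.MAX_VALUE);
        try {
            acc.addExact(1);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList("a", "bb", "ccc"), String::length));
        System.out.println(overflowFailsFast());
    }
}
```

By contrast, the unchecked expression `Long.MAX_VALUE + 1` wraps around to `Long.MIN_VALUE` without any error, which is the kind of incorrect result the checked methods guard against.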