As mentioned in the Hazelcast Jet 102 section, determining the watermark is something of a black art; it's about superimposing order over a disordered stream of events. We must decide at which point it stops making sense to wait any longer for data about past events to arrive. There's a tension between two opposing forces here:

  • wait as long as possible to account for all the data;
  • get results as soon as possible.

While there are ways to (kind of) achieve both, there's a significant associated cost in terms of complexity and overall performance. Hazelcast Jet takes a simple approach and strictly triages stream items into "still on time" and "late", discarding the latter.

WatermarkPolicy is the abstraction that computes the value of the watermark for a (sub)stream of disordered data items. It takes as input the timestamp of each observed item and outputs the current watermark value.

Predefined watermark policies

We provide some general, data-agnostic watermark policies in the WatermarkPolicies class. They vary in how well they deal with advancing the watermark during a stream lull. The better they deal with it, the more assumptions they must make about the nature of the events' timestamp values and about the relationship between the timestamps and the locally observed wall-clock time.

"With Fixed Lag"

The withFixedLag() policy will maintain a watermark that lags behind the highest observed event timestamp by a configured amount. In other words, each time an event with the highest timestamp so far is encountered, this policy advances the watermark to eventTimestamp - lag. This puts a limit on the spread between timestamps in the stream: all events whose timestamp is more than the configured lag behind the highest timestamp are considered late.
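The fixed-lag rule can be illustrated with a few lines of plain Java. This is only a minimal sketch of the logic described above, not Jet's implementation: the class FixedLagSketch and its methods observe() and isLate() are hypothetical names chosen for the example.

```java
/** Hypothetical sketch of the fixed-lag rule; not part of the Jet API. */
final class FixedLagSketch {
    private final long lag;
    private long watermark = Long.MIN_VALUE; // no item observed yet

    FixedLagSketch(long lag) {
        this.lag = lag;
    }

    /** Reports an observed event timestamp and returns the current watermark. */
    long observe(long eventTimestamp) {
        // Only a new highest timestamp can move the watermark forward;
        // the watermark itself never goes back.
        watermark = Math.max(watermark, eventTimestamp - lag);
        return watermark;
    }

    /** An event is late when it is more than the lag behind the highest timestamp. */
    boolean isLate(long eventTimestamp) {
        return eventTimestamp < watermark;
    }
}
```

With a lag of 10, observing timestamp 100 puts the watermark at 90, so an event stamped 89 would be classified as late while one stamped 90 would still be on time.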

"Limiting Lag and Delay"

The limitingLagAndDelay() policy applies the same fixed-lag logic as above and adds another limit: the maximum delay between observing an item and advancing the watermark to at least that item's timestamp. A stream may experience a lull (no items arriving) and this added limit will ensure that the watermark doesn't stay behind the highest timestamp observed before the onset of the lull. However, the skew between substreams may still cause the watermark that reaches the downstream vertex to stay behind some timestamps. This is because the downstream will only get the lowest of all substream watermarks.

The advantage of this policy is that it doesn't assume anything about the unit of measurement used for event timestamps.
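One way to picture the added delay limit is the sketch below. It is an illustration under stated assumptions rather than Jet's code: LagAndDelaySketch, observe() and current() are hypothetical names, and the local clock reading is passed in explicitly (the now parameter) so that the behavior is deterministic. Note that the event timestamps are only ever compared with each other, which is why their unit doesn't matter; only now and maxDelay must share a unit.

```java
import java.util.ArrayDeque;

/** Hypothetical sketch of "fixed lag plus maximum delay"; not part of the Jet API. */
final class LagAndDelaySketch {
    private final long lag;
    private final long maxDelay;
    // Each entry is {local time of observation, event timestamp}, for new highest timestamps only.
    private final ArrayDeque<long[]> pending = new ArrayDeque<>();
    private long top = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    LagAndDelaySketch(long lag, long maxDelay) {
        this.lag = lag;
        this.maxDelay = maxDelay;
    }

    /** Reports an event observed at local time `now` and returns the watermark. */
    long observe(long eventTimestamp, long now) {
        if (eventTimestamp > top) {
            top = eventTimestamp;
            pending.addLast(new long[] {now, eventTimestamp});
            watermark = Math.max(watermark, eventTimestamp - lag); // fixed-lag rule
        }
        return current(now);
    }

    /** Returns the watermark at local time `now`, applying the delay limit. */
    long current(long now) {
        // Any timestamp observed at least maxDelay ago must now be covered by the watermark.
        while (!pending.isEmpty() && now - pending.peekFirst()[0] >= maxDelay) {
            watermark = Math.max(watermark, pending.pollFirst()[1]);
        }
        return watermark;
    }
}
```

The sketch covers a single substream only. The downstream vertex would still see the minimum over all such substream watermarks, so skew between substreams is not addressed here.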

"Limiting Lag and Lull"

The limitingLagAndLull() policy is similar to limitingLagAndDelay() in addressing the stream lull problem and goes a step further by addressing the issues of lull combined with skew. To achieve this it must introduce an assumption, though: that the time unit used for event timestamps is milliseconds. After a given period passes with the watermark not being advanced by the arriving data (i.e., a lull happens), it will start advancing it in lockstep with the passage of the local system time. The watermark isn't adjusted towards the local time; the policy just ensures the difference between local time and the watermark stays the same during a lull. Since the system time advances equally on all substream processors, the watermark propagated to downstream is now guaranteed to advance regardless of the lull.

There is, however, a subtle issue with limitingLagAndLull(): if there is any substream that never observes an item, that substream's policy instance won't be able to initialize its "last seen timestamp" and will cause the watermark sent to the downstream to forever lag behind all the actual data.
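Both the lockstep behavior and the uninitialized-substream problem show up in a small sketch. The code below is an assumption-laden illustration, not Jet's implementation: LagAndLullSketch, observe() and current() are hypothetical names, and the local time in milliseconds is passed in as a parameter instead of being read from the system clock.

```java
/** Hypothetical sketch of "fixed lag plus maximum lull"; not part of the Jet API. */
final class LagAndLullSketch {
    private final long lag;
    private final long maxLullMs;
    private long watermark = Long.MIN_VALUE; // stays here until the first item arrives
    private long lullDeadline;               // meaningful only once an item has been seen

    LagAndLullSketch(long lag, long maxLullMs) {
        this.lag = lag;
        this.maxLullMs = maxLullMs;
    }

    /** Reports an event observed at local time `nowMs` and returns the watermark. */
    long observe(long eventTimestamp, long nowMs) {
        current(nowMs); // first apply any lull-driven advance that is due
        if (eventTimestamp - lag > watermark) {
            watermark = eventTimestamp - lag;   // data advanced the watermark
            lullDeadline = nowMs + maxLullMs;   // so the lull timer restarts
        }
        return watermark;
    }

    /** Returns the watermark at local time `nowMs`. */
    long current(long nowMs) {
        if (watermark == Long.MIN_VALUE) {
            return watermark; // never saw an item: nothing to advance from
        }
        // Past the deadline, move the watermark by exactly as much as local time moved.
        long pastDeadline = Math.max(0, nowMs - lullDeadline);
        watermark += pastDeadline;
        lullDeadline += pastDeadline;
        return watermark;
    }
}
```

An instance that never receives an item keeps returning Long.MIN_VALUE no matter how much local time passes, and since the downstream takes the minimum over all substreams, that single instance holds everything back.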

"Limiting Timestamp and Wall-Clock Lag"

The limitingTimestampAndWallClockLag() policy makes a stronger assumption: that the event timestamps are in milliseconds since the Unix epoch and that they are synchronized with the local time on the processing machine. It puts a limit on how much the watermark can lag behind the local time. As long as its assumption holds, this policy gives straightforward results. It also doesn't suffer from the subtle issue with limitingLagAndLull().
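A minimal sketch of this idea follows. It assumes, as the policy's name suggests, that the fixed timestamp lag is kept and a wall-clock bound is added on top of it; the class WallClockLagSketch, its methods, and the parameter names timestampLag and wallClockLag are hypothetical and chosen for the example, and the local time (milliseconds since the Unix epoch) is passed in explicitly.

```java
/** Hypothetical sketch of limiting both timestamp lag and wall-clock lag; not part of the Jet API. */
final class WallClockLagSketch {
    private final long timestampLag;
    private final long wallClockLag;
    private long watermark = Long.MIN_VALUE;

    WallClockLagSketch(long timestampLag, long wallClockLag) {
        this.timestampLag = timestampLag;
        this.wallClockLag = wallClockLag;
    }

    /** Reports an event observed at local time `nowEpochMs` and returns the watermark. */
    long observe(long eventTimestamp, long nowEpochMs) {
        watermark = Math.max(watermark, eventTimestamp - timestampLag);
        return current(nowEpochMs);
    }

    /** Returns the watermark at local time `nowEpochMs`. */
    long current(long nowEpochMs) {
        // The watermark may never fall more than wallClockLag behind local time,
        // even if no item has been observed at all.
        watermark = Math.max(watermark, nowEpochMs - wallClockLag);
        return watermark;
    }
}
```

Because the bound is derived from local time alone, an instance that has not yet seen any item still produces a usable watermark, which is why the uninitialized-substream problem doesn't arise in this sketch.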

Watermark Throttling

The policy objects presented above return the "ideal" watermark value according to their logic; however, inserting a watermark item each time the ideal watermark advances (typically a thousand times per second) would add too much overhead. WatermarkEmissionPolicy is the object that decides whether to emit a watermark item, given the last emitted and the current value of the watermark. For sliding windows there is an easy answer: suppress all watermark items that belong to the same frame as the one already emitted. Such items would have no effect, since the watermark must advance beyond a frame's end for the aggregating vertex to consider the frame completed and act upon its results. The method WatermarkEmissionPolicy.emitByFrame() returns a policy with this kind of throttling applied. For other cases there is emitByMinStep(), which suppresses watermark items until the watermark has advanced at least minStep ahead of the previously emitted one.
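The two throttling rules can be sketched as simple predicates over the current and the last emitted watermark. This is an illustration only: EmissionSketch, EmissionRule, byFrame() and byMinStep() are hypothetical names rather than the Jet API, and the by-frame rule assumes frames of a fixed length aligned at multiples of frameLength, which may differ from how an actual window definition lays out its frames.

```java
/** Hypothetical sketch of watermark throttling rules; not part of the Jet API. */
final class EmissionSketch {

    /** Decides whether a watermark item should be emitted. */
    interface EmissionRule {
        boolean shouldEmit(long currentWm, long lastEmittedWm);
    }

    /** Emit only when the watermark has moved into a later frame than the last emitted one. */
    static EmissionRule byFrame(long frameLength) {
        // Assumption: frame k covers [k * frameLength, (k + 1) * frameLength).
        return (cur, last) ->
                Math.floorDiv(cur, frameLength) > Math.floorDiv(last, frameLength);
    }

    /** Emit only when the watermark is at least minStep ahead of the last emitted one. */
    static EmissionRule byMinStep(long minStep) {
        return (cur, last) -> cur >= last + minStep;
    }
}
```

With a frame length of 100, a watermark of 150 following an emitted 120 would be suppressed because both fall into the same frame, while a watermark of 200 would be emitted.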