Interface StreamSourceStage<T>
- Type Parameters:
- T - the type of items coming out of this stage
A source stage in a pipeline
 that will observe an unbounded amount of data (i.e., an event stream).
 Timestamp handling, a prerequisite for attaching data processing stages,
 is not yet defined in this step. Call one of the methods on this
 instance to declare whether and how the data source will assign
 timestamps to events.
- Since:
- Jet 3.0
- 
Method Summary
Modifier and Type | Method | Description
StreamStage<T> | withIngestionTimestamps() | Declares that the source will assign the time of ingestion as the event timestamp.
StreamStage<T> | withNativeTimestamps(long allowedLag) | Declares that the stream will use the source's native timestamps.
StreamStage<T> | withoutTimestamps() | Declares that the source will not assign any timestamp to the events it emits.
StreamStage<T> | withTimestamps(ToLongFunctionEx<? super T> timestampFn, long allowedLag) | Declares that the source will extract timestamps from the stream items.
- 
Method Details
withoutTimestamps
StreamStage<T> withoutTimestamps()
Declares that the source will not assign any timestamp to the events it emits. You can add them later using addTimestamps, but the behavior is different; see the note there.
- 
withIngestionTimestamps
StreamStage<T> withIngestionTimestamps()
Declares that the source will assign the time of ingestion as the event timestamp. It will call System.currentTimeMillis() at the moment it observes an event from the data source and assign it as the event timestamp. The actual time of the original event is ignored.

With this mode, unlike withTimestamps(com.hazelcast.function.ToLongFunctionEx<? super T>, long) or withNativeTimestamps(long), the sparse events issue isn't present. You can use this mode to avoid the issue, but there's a caveat:

Note: when snapshotting is enabled to achieve fault tolerance, after a restart Jet replays all the events that were already processed since the last snapshot. These events will then get different timestamps. If you want your job to be fault-tolerant, the events in the stream must have a stable timestamp associated with them. The source may natively provide such timestamps (the withNativeTimestamps(long) option). If that is not appropriate, the events should carry their own timestamp as a part of their data, and you can use withTimestamps(timestampFn, allowedLag) to extract it.

Note 2: if the system time goes back (such as when adjusting it), newer events will get older timestamps and might be dropped as late, because the allowed lag is 0.
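The effect described in Note 2 can be illustrated with a small, self-contained model. This is only a sketch and not Jet's implementation: the class name, the injected clock, and the rule "an event is late when its timestamp is below the highest timestamp seen so far" (that is, an allowed lag of 0) are assumptions made for illustration.

```java
import java.util.function.LongSupplier;

/** Simplified model of ingestion-time stamping with an allowed lag of 0 (not Jet's code). */
final class IngestionTimestampSketch {
    private final LongSupplier clock;              // stands in for System::currentTimeMillis
    private long topTimestamp = Long.MIN_VALUE;    // highest timestamp assigned so far

    IngestionTimestampSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** Stamps an event with the current clock value; returns true if it would count as late. */
    boolean ingestAndCheckLate() {
        long ts = clock.getAsLong();
        // With lag 0, anything behind the top timestamp is considered late.
        boolean late = ts < topTimestamp;
        topTimestamp = Math.max(topTimestamp, ts);
        return late;
    }

    public static void main(String[] args) {
        // Hypothetical clock readings: the system time is set back before the third event.
        long[] readings = {1_000, 1_010, 990, 1_020};
        int[] next = {0};
        IngestionTimestampSketch source =
                new IngestionTimestampSketch(() -> readings[next[0]++]);
        for (int n = 0; n < readings.length; n++) {
            System.out.println("event " + n + " late=" + source.ingestAndCheckLate());
        }
    }
}
```

In this model only the event stamped after the backward clock adjustment is flagged as late, even though it arrived after the earlier events.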
- 
withNativeTimestamps
StreamStage<T> withNativeTimestamps(long allowedLag)
Declares that the stream will use the source's native timestamps. This is typically the message timestamp that the external system assigns as the event's metadata.

If there's no notion of native timestamps in the source, this method will throw a JetException.

Issue with sparse events
Event time progresses only through the ingestion of new events. If the events are sparse, time will effectively stop until a newer event arrives. This causes high latency for time-sensitive operations (such as window aggregation). In addition, Jet tracks event time for every source partition separately, and if just one partition has sparse events, time progress in the whole job is hindered. To overcome this you can either ensure there's a consistent influx of events in every partition, or you can use withIngestionTimestamps().
- Parameters:
- allowedLag - the allowed lag of a given event's timestamp behind the top timestamp value observed so far
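To make the sparse events issue concrete, here is a minimal sketch of per-partition event-time tracking. It is a simplified model and not Jet's implementation: the class and method names are hypothetical, and it assumes that overall event time is the minimum, across partitions, of each partition's top timestamp minus the allowed lag.

```java
import java.util.Arrays;

/** Simplified model of per-partition event-time tracking (not Jet's implementation). */
final class SparsePartitionSketch {
    private final long[] topTimestamps;   // highest timestamp seen in each partition
    private final long allowedLag;

    SparsePartitionSketch(int partitionCount, long allowedLag) {
        this.topTimestamps = new long[partitionCount];
        Arrays.fill(topTimestamps, Long.MIN_VALUE);
        this.allowedLag = allowedLag;
    }

    /** Records an event with the given timestamp in the given partition. */
    void observe(int partition, long timestamp) {
        topTimestamps[partition] = Math.max(topTimestamps[partition], timestamp);
    }

    /** Overall event time: the minimum over partitions of (top timestamp - allowed lag). */
    long eventTime() {
        long min = Long.MAX_VALUE;
        for (long top : topTimestamps) {
            if (top == Long.MIN_VALUE) {
                return Long.MIN_VALUE;    // a partition that has seen nothing holds time back
            }
            min = Math.min(min, top - allowedLag);
        }
        return min;
    }

    public static void main(String[] args) {
        SparsePartitionSketch source = new SparsePartitionSketch(2, 100);
        source.observe(0, 1_000);
        source.observe(1, 1_000);
        // Partition 0 keeps receiving events while partition 1 stays silent.
        for (long t = 1_100; t <= 5_000; t += 100) {
            source.observe(0, t);
        }
        System.out.println("event time while partition 1 is silent: " + source.eventTime());
        source.observe(1, 5_000);
        System.out.println("event time after partition 1 catches up: " + source.eventTime());
    }
}
```

In this model, event time stays pinned to the silent partition's last timestamp no matter how far the busy partition advances, which is why a window that ends after that point cannot close until the sparse partition receives a newer event.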
 
- 
withTimestamps
StreamStage<T> withTimestamps(ToLongFunctionEx<? super T> timestampFn, long allowedLag)
Declares that the source will extract timestamps from the stream items.

Issue with sparse events
Event time progresses only through the ingestion of new events. If the events are sparse, time will effectively stop until a newer event arrives. This causes high latency for time-sensitive operations (such as window aggregation). In addition, Jet tracks event time for every source partition separately, and if just one partition has sparse events, time progress in the whole job is hindered. To overcome this you can either ensure there's a consistent influx of events in every partition, or you can use withIngestionTimestamps().
- Parameters:
- timestampFn - a function that returns the timestamp for each item, typically in milliseconds. It must be stateless and cooperative.
- allowedLag - the allowed lag of a given event's timestamp behind the top timestamp value observed so far. The time unit is the same as the unit used by timestampFn.
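The interplay of the two parameters can be sketched in plain Java. This is an illustrative model under stated assumptions, not Jet's code: the Reading type and class name are hypothetical, java.util.function.ToLongFunction stands in for ToLongFunctionEx, and an item is treated as late when its extracted timestamp is more than allowedLag behind the top timestamp seen so far.

```java
import java.util.function.ToLongFunction;

/** Simplified model of timestamp extraction plus an allowed-lag check (not Jet's code). */
final class AllowedLagSketch {
    /** Hypothetical event type carrying its own timestamp in milliseconds. */
    static final class Reading {
        final String sensor;
        final long epochMillis;

        Reading(String sensor, long epochMillis) {
            this.sensor = sensor;
            this.epochMillis = epochMillis;
        }
    }

    private final ToLongFunction<? super Reading> timestampFn; // stateless extractor
    private final long allowedLag;                              // same unit as timestampFn
    private long topTimestamp = Long.MIN_VALUE;

    AllowedLagSketch(ToLongFunction<? super Reading> timestampFn, long allowedLag) {
        this.timestampFn = timestampFn;
        this.allowedLag = allowedLag;
    }

    /** Returns true if the item lags the top observed timestamp by more than allowedLag. */
    boolean isLate(Reading item) {
        long ts = timestampFn.applyAsLong(item);
        boolean late = topTimestamp != Long.MIN_VALUE && ts < topTimestamp - allowedLag;
        topTimestamp = Math.max(topTimestamp, ts);
        return late;
    }

    public static void main(String[] args) {
        // Timestamps are in milliseconds, so the allowed lag of 2_000 means two seconds.
        AllowedLagSketch source = new AllowedLagSketch(r -> r.epochMillis, 2_000);
        long[] timestamps = {10_000, 8_500, 7_000};
        for (long ts : timestamps) {
            System.out.println(ts + " late=" + source.isLate(new Reading("s1", ts)));
        }
    }
}
```

The extractor here only reads a field of the item, which keeps it stateless and fast. If it returned seconds instead of milliseconds, the allowed lag would have to be given in seconds as well.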
 
 
-