Interface Observable<T>
-
- Type Parameters:
T - type of the values in the sequence
- All Superinterfaces:
java.lang.Iterable<T>
public interface Observable<T> extends java.lang.Iterable<T>
Represents a flowing sequence of events produced by observable sinks. To observe the events, call jet.getObservable(name).addObserver(myObserver).
The Observable is backed by a Ringbuffer which, once created, has a fixed capacity for storing messages. It supports reading by multiple Observers, which will all observe the same sequence of messages. A new Observer automatically starts reading from the oldest message still available. Once the capacity is full, the oldest messages are overwritten as new ones arrive.
The Ringbuffer's capacity defaults to 10000, but it can be changed (via the configureCapacity(int) method) as long as the Ringbuffer hasn't been created yet (see the "Lifecycle" section below).
In addition to data events, the Observer can also observe completion and failure events. Completion means that no further values will appear in the sequence. Failure means that something went wrong during the job execution.
Lifecycle
When talking about the lifecycle of an Observable (which is basically just a client-side object with a lifecycle like any other POJO), it's better to consider the lifecycle of the underlying Ringbuffer, since that is the significant distributed entity.
The lifecycle of the Ringbuffer is decoupled from the lifecycle of the job. The Ringbuffer is created either when the user gets a reference to its equivalent Observable (through JetService.getObservable()) and registers the first Observer on it (through Observable.addObserver()), or when the job containing the sink for it starts executing.
The Ringbuffer must be explicitly destroyed when it's no longer in use, or data will be retained in the cluster. This is done via the Observable.destroy() method. Note: even if the Observable POJO gets lost and its underlying Ringbuffer is leaked in the cluster, it's still possible to destroy it manually later by creating another Observable instance with the same name and calling destroy() on that.
Important: The same Observable must not be used again in a new job, since this will cause completion events to interleave, leading to data loss or other unexpected behaviour. Using one observable name in multiple observable sinks in the same job is allowed; this will not produce multiple completion or error events (just an intermingling of the results from the two sinks, which should be fine in some use cases).
- Since:
- Jet 4.0
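The overwrite behaviour of a fixed-capacity buffer, as described in the overview, can be pictured with a small plain-Java model. This is only an illustrative sketch: RingbufferModel is a hypothetical class, not Hazelcast's Ringbuffer, and it uses a capacity of 3 instead of the default 10000 to keep the output short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a fixed-capacity ring buffer; not the Hazelcast class.
class RingbufferModel<T> {
    private final int capacity;
    private final Deque<T> items = new ArrayDeque<>();

    RingbufferModel(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Adds an item; once the buffer is full, the oldest item is dropped first.
    void add(T item) {
        if (items.size() == capacity) {
            items.removeFirst();
        }
        items.addLast(item);
    }

    // What an observer registering right now would read first: the oldest retained item onwards.
    List<T> retained() {
        return new ArrayList<>(items);
    }

    public static void main(String[] args) {
        RingbufferModel<Integer> ring = new RingbufferModel<>(3);
        for (int i = 1; i <= 5; i++) {
            ring.add(i);
        }
        // Items 1 and 2 were overwritten; a late observer starts at 3.
        System.out.println(ring.retained()); // prints [3, 4, 5]
    }
}
```

In this model, an observer that registers late simply misses whatever has already been overwritten, which is why the capacity should be chosen with the expected result volume in mind.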
-
-
Method Summary
Modifier and Type | Method | Description
java.util.UUID | addObserver(Observer<T> observer) | Registers an Observer to this Observable.
Observable<T> | configureCapacity(int capacity) | Sets the capacity of the underlying Ringbuffer, which defaults to 10000.
void | destroy() | Removes all previously registered observers and destroys the backing Ringbuffer.
int | getConfiguredCapacity() | Returns the configured capacity of the underlying Ringbuffer.
default java.util.Iterator<T> | iterator() | Returns an iterator over the sequence of events produced by this Observable.
java.lang.String | name() | Name of this instance.
void | removeObserver(java.util.UUID registrationId) | Removes a previously added Observer identified by its assigned registration ID.
default <R> java.util.concurrent.CompletableFuture<R> | toFuture(java.util.function.Function<java.util.stream.Stream<T>,R> fn) | Allows you to post-process the results of a Jet job on the client side using the standard Java Stream API.
-
-
-
Method Detail
-
name
@Nonnull java.lang.String name()
Name of this instance.
-
addObserver
@Nonnull java.util.UUID addObserver(@Nonnull Observer<T> observer)
Registers an Observer to this Observable. It will receive all events currently in the backing Ringbuffer and then continue receiving any future events.
- Returns:
- registration ID associated with the added Observer, can be used to remove the Observer later
-
removeObserver
void removeObserver(@Nonnull java.util.UUID registrationId)
Removes a previously added Observer identified by its assigned registration ID. A removed Observer will not get notified about further events.
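The contract of addObserver and removeObserver (replay of retained events, then live events until the registration ID is removed) can be sketched with a single-threaded plain-Java model. ObserverRegistryModel and its publish method are hypothetical names introduced for illustration; the sketch says nothing about how Hazelcast delivers events internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

// Hypothetical single-threaded model of registration IDs; not Hazelcast code.
class ObserverRegistryModel<T> {
    private final List<T> retained = new ArrayList<>();
    private final Map<UUID, Consumer<T>> observers = new LinkedHashMap<>();

    // Replays the retained events to the new observer, then registers it for future ones.
    UUID addObserver(Consumer<T> observer) {
        retained.forEach(observer);
        UUID registrationId = UUID.randomUUID();
        observers.put(registrationId, observer);
        return registrationId;
    }

    // After removal, the observer is no longer notified.
    void removeObserver(UUID registrationId) {
        observers.remove(registrationId);
    }

    // Stores the event and notifies every currently registered observer.
    void publish(T event) {
        retained.add(event);
        observers.values().forEach(o -> o.accept(event));
    }

    public static void main(String[] args) {
        ObserverRegistryModel<String> model = new ObserverRegistryModel<>();
        model.publish("a");
        List<String> seen = new ArrayList<>();
        UUID id = model.addObserver(seen::add); // receives "a" right away
        model.publish("b");                     // receives "b"
        model.removeObserver(id);
        model.publish("c");                     // not delivered
        System.out.println(seen);               // prints [a, b]
    }
}
```

Keeping the returned UUID is the only way to unregister a specific observer later, so callers that may need to stop observing should store it.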
-
configureCapacity
Observable<T> configureCapacity(int capacity)
Sets the capacity of the underlying Ringbuffer, which defaults to 10000.
This method can be called only before the Ringbuffer gets created. This means before any Observers are added to the Observable and before any jobs containing observable sinks (with the same observable name) are submitted for execution.
Important: only configure the capacity once; multiple configurations are currently not supported.
- Throws:
java.lang.IllegalStateException - if the Ringbuffer has already been created
-
getConfiguredCapacity
int getConfiguredCapacity()
Returns the configured capacity of the underlying Ringbuffer.
This method only works if the backing Ringbuffer has already been created. If so, it will be queried for its actual capacity, which can't be changed any longer. (Reminder: the Ringbuffer gets created either when the first Observer is added or when the job containing the observable sink (with the same observable name) is submitted for execution.)
- Throws:
java.lang.IllegalStateException - if the backing Ringbuffer has not yet been created
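The ordering constraints between configureCapacity and getConfiguredCapacity can be summarised as a tiny state machine. The following plain-Java sketch is a hypothetical model (CapacityLifecycleModel and its create method are invented for illustration). It mirrors only the two IllegalStateException rules stated above and does not enforce the "configure only once" recommendation.

```java
// Hypothetical model of the capacity rules; not Hazelcast code.
class CapacityLifecycleModel {
    private static final int DEFAULT_CAPACITY = 10_000;
    private int configuredCapacity = DEFAULT_CAPACITY;
    private boolean created;

    // Allowed only while the backing buffer does not exist yet.
    CapacityLifecycleModel configureCapacity(int capacity) {
        if (created) {
            throw new IllegalStateException("ring buffer already created");
        }
        configuredCapacity = capacity;
        return this;
    }

    // Stands in for "first observer added" or "job with the sink started".
    void create() {
        created = true;
    }

    // Allowed only once the backing buffer exists.
    int getConfiguredCapacity() {
        if (!created) {
            throw new IllegalStateException("ring buffer not yet created");
        }
        return configuredCapacity;
    }

    public static void main(String[] args) {
        CapacityLifecycleModel model = new CapacityLifecycleModel().configureCapacity(500);
        model.create();
        System.out.println(model.getConfiguredCapacity()); // prints 500
        try {
            model.configureCapacity(1_000); // too late: the buffer already exists
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In practice this means the capacity should be set immediately after obtaining the Observable, before adding observers or submitting the job.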
-
iterator
@Nonnull default java.util.Iterator<T> iterator()
Returns an iterator over the sequence of events produced by this Observable. If there are currently no events to observe, the iterator's hasNext() and next() methods will block. A completion event completes the iterator (hasNext() will return false) and a failure event makes the iterator's methods throw the underlying exception.
If used against an Observable populated from a streaming job, the iterator will complete only in the case of an error or job cancellation.
The iterator is not thread-safe.
The iterator is backed by a blocking concurrent queue which stores all events until consumed.
- Specified by:
iterator in interface java.lang.Iterable<T>
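One way to picture an iterator backed by a blocking queue is the plain-Java sketch below. It is a hypothetical model, not the Hazelcast implementation: BlockingIteratorModel, its sentinel object, and the choice to wrap failures in a RuntimeException are all assumptions made for illustration.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of a queue-backed blocking iterator; not Hazelcast code.
class BlockingIteratorModel<T> implements Iterator<T> {
    private static final Object COMPLETE = new Object();

    private static final class Failure {
        final Throwable cause;
        Failure(Throwable cause) { this.cause = cause; }
    }

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private Object next; // event taken from the queue but not yet handed out

    void onNext(T value) { queue.add(value); }
    void onComplete() { queue.add(COMPLETE); }
    void onError(Throwable t) { queue.add(new Failure(t)); }

    @Override
    public boolean hasNext() {
        if (next == null) {
            try {
                next = queue.take(); // blocks while no event is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        if (next instanceof Failure) {
            // This model wraps the failure; the cause carries the original exception.
            throw new RuntimeException(((Failure) next).cause);
        }
        return next != COMPLETE;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T value = (T) next;
        next = null;
        return value;
    }

    public static void main(String[] args) {
        BlockingIteratorModel<Integer> it = new BlockingIteratorModel<>();
        new Thread(() -> {
            it.onNext(1);
            it.onNext(2);
            it.onComplete();
        }).start();
        while (it.hasNext()) {
            System.out.println(it.next()); // prints 1, then 2
        }
    }
}
```

Because every event stays in the queue until it is consumed, a slow consumer in this model accumulates events in memory rather than losing them.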
-
toFuture
@Nonnull default <R> java.util.concurrent.CompletableFuture<R> toFuture(@Nonnull java.util.function.Function<java.util.stream.Stream<T>,R> fn)
Allows you to post-process the results of a Jet job on the client side using the standard Java Stream API. You provide a function that will receive the job results as a Stream<T> and return a single result (which can in fact be another Stream, if so desired).
Returns a CompletableFuture that will become completed once your function has received all the job results through its Stream and returned the final result.
A trivial example is counting, like this: observable.toFuture(Stream::count). However, the Stream API is quite rich and you can perform arbitrary transformations and aggregations.
This feature is intended to be used only on the results of a batch job. On an unbounded streaming job the stream-collecting operation will never reach the final result.
- Parameters:
fn - transform function which takes the stream of observed values and produces an altered value from it, which could also be a stream
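To show the shape of functions that fit the fn parameter, here is a sketch that runs them against a local list instead of real job results. ToFutureModel.toFuture is a hypothetical helper written for this example: unlike the real method it takes the results as an explicit List argument, and it makes no claim about how Hazelcast feeds the stream.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in showing the shape of functions accepted by toFuture; not Hazelcast code.
class ToFutureModel {
    // Applies fn asynchronously to a stream over the given local results.
    static <T, R> CompletableFuture<R> toFuture(List<T> results, Function<Stream<T>, R> fn) {
        return CompletableFuture.supplyAsync(() -> fn.apply(results.stream()));
    }

    public static void main(String[] args) {
        List<String> results = List.of("apple", "kiwi", "banana");

        // Counting, as in the trivial example from the method description.
        long count = toFuture(results, Stream::count).join();

        // Any other stream pipeline ending in a terminal operation fits the same shape.
        List<Integer> lengths = toFuture(results,
                (Stream<String> s) -> s.map(String::length).sorted().collect(Collectors.toList())).join();

        System.out.println(count);   // prints 3
        System.out.println(lengths); // prints [4, 5, 6]
    }
}
```

Calling join() blocks until the function has returned, which matches the note above that this only makes sense for bounded (batch) results.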
-
destroy
void destroy()
Removes all previously registered observers and destroys the backing Ringbuffer.
Note: if you call this while a job that publishes to this Observable is still active, it will silently create a new Ringbuffer and go on publishing to it.
-
-