T
- type of the values in the sequencepublic interface Observable<T> extends Iterable<T>
jet.getObservable(name).addObserver(myObserver)
.
The Observable
is backed by a Ringbuffer
, which, once
created, has a fixed capacity for storing messages. It supports reading
by multiple Observer Observers
, which will all observe the same
sequence of messages. A new Observer
will start reading
automatically from the oldest sequence available. Once the capacity is
full, the oldest messages will be overwritten as new ones arrive.
The Ringbuffer
's capacity defaults to , but can be changed (via the configureCapacity(int)
method), as long as the Ringbuffer
hasn't
been created yet (see the "Lifecycle" section below).
In addition to data events, the Observer
can also observe
completion and failure events. Completion means that no further values
will appear in the sequence. Failure means that something went wrong
during the job execution .
Lifecycle
When talking about the lifecycle of an Observable
(which is
basically just a client side object and has a lifecycle just like any
other POJO) it's better to actually consider the lifecycle of the
underlying Ringbuffer
, since that is the significant
distributed entity.
The lifecycle of the Ringbuffer
is decoupled from the lifecycle
of the job. The Ringbuffer
is created either when the user
gets a reference to its equivalent Observable
(through
JetService.getObservable()
)
and registers the first Observer
on it (through
Observable.addObserver()
)
or when the job containing the sink for it starts executing.
The Ringbuffer
must be explicitly destroyed when it's no longer
in use, or data will be retained in the cluster. This is done via the
Observable.destroy()
method. Note: even if the
Observable
POJO gets lost and its underlying Ringbuffer
is leaked in the cluster, it's still possible to manually destroy
it later by creating another Observable
instance with the same
name and calling destroy()
on that.
Important: The same Observable
must
not be used again in a new job since this will cause
completion events interleaving and causing data loss or other unexpected
behaviour. Using one observable name in multiple
observable sinks
in the same job is
allowed, this will not produce multiple completion or error events (just
an intermingling of the results from the two sinks, but that should be
fine in some use cases).
Modifier and Type | Method and Description |
---|---|
UUID |
addObserver(Observer<T> observer)
Registers an
Observer to this Observable . |
Observable<T> |
configureCapacity(int capacity)
Set the capacity of the underlying
Ringbuffer , which defaults to
. |
void |
destroy()
Removes all previously registered observers and destroys the backing
Ringbuffer . |
int |
getConfiguredCapacity()
Returns the configured capacity of the underlying
Ringbuffer .. |
default Iterator<T> |
iterator()
Returns an iterator over the sequence of events produced by this
Observable . |
String |
name()
Name of this instance.
|
void |
removeObserver(UUID registrationId)
Removes a previously added
Observer identified by its
assigned registration ID. |
default <R> CompletableFuture<R> |
toFuture(Function<Stream<T>,R> fn)
Allows you to post-process the results of a Jet job on the client side
using the standard Java
Stream API . |
forEach, spliterator
@Nonnull UUID addObserver(@Nonnull Observer<T> observer)
Observer
to this Observable
. It will
receive all events currently in the backing Ringbuffer
and
then continue receiving any future events.Observer
,
can be used to remove the Observer
latervoid removeObserver(@Nonnull UUID registrationId)
Observer
identified by its
assigned registration ID. A removed Observer
will not get
notified about further events.Observable<T> configureCapacity(int capacity)
Ringbuffer
, which defaults to
.
This method can be called only before the Ringbuffer
gets
created. This means before any Observers
are added
to the Observable
and before any jobs containing
observable
sinks
(with the same observable name) are submitted for execution.
Important: only configure capacity once, multiple configuration are currently not supported.
IllegalStateException
- if the Ringbuffer
has already been
createdint getConfiguredCapacity()
Ringbuffer
..
This method only works if the backing Ringbuffer
has already
been created. If so, it will be queried for its actual capacity,
which can't be changed any longer. (Reminder: the Ringbuffer
gets created either when the first Observer
is added or when
the job containing the observable sink
(with the same observable name) is submitted for
execution.)
IllegalStateException
- if the backing Ringbuffer
has not
yet been created@Nonnull default Iterator<T> iterator()
Observable
. If there are currently no events to observe,
the iterator's hasNext()
and next()
methods will block.
A completion event completes the iterator (hasNext()
will return
false) and a failure event makes the iterator's methods throw the
underlying exception.
If used against an Observable
populated from a streaming job,
the iterator will complete only in the case of an error or job
cancellation.
The iterator is not thread-safe.
The iterator is backed by a blocking concurrent queue which stores all events until consumed.
@Nonnull default <R> CompletableFuture<R> toFuture(@Nonnull Function<Stream<T>,R> fn)
Stream API
. You provide
a function that will receive the job results as a Stream<T>
and
return a single result (which can in fact be another Stream
,
if so desired).
Returns a CompletableFuture
that will become
completed once your function has received all the job results through
its Stream
and returned the final result.
A trivial example is counting, like this: observable.toFuture(Stream::count)
,
however the Stream API is quite rich and you can perform arbitrary
transformations and aggregations.
This feature is intended to be used only on the results of a batch job. On an unbounded streaming job the stream-collecting operation will never reach the final result.
fn
- transform function which takes the stream of observed values
and produces an altered value from it, which could also
be a streamvoid destroy()
Ringbuffer
.
Note: if you call this while a job that publishes to this
Observable
is still active, it will silently create a new Ringbuffer
and go on publishing to it.
Copyright © 2023 Hazelcast, Inc.. All rights reserved.