public final class ServiceFactories extends Object
Utility class with methods that create several useful service factories.

Modifier and Type | Method and Description |
---|---|
static <K,V> ServiceFactory<?,IMap<K,V>> | iMapService(String mapName): Returns a factory that provides an IMap as the service. |
static <S> ServiceFactory<?,S> | nonSharedService(FunctionEx<? super Processor.Context,? extends S> createServiceFn): A variant of nonSharedService(createFn, destroyFn) with a no-op destroyFn. |
static <S> ServiceFactory<?,S> | nonSharedService(FunctionEx<? super Processor.Context,? extends S> createServiceFn, ConsumerEx<? super S> destroyServiceFn): Returns a ServiceFactory which creates a separate service instance for each parallel Jet processor. |
static <K,V> ServiceFactory<?,ReplicatedMap<K,V>> | replicatedMapService(String mapName): Returns a factory that provides a ReplicatedMap as the service object. |
static <S> ServiceFactory<?,S> | sharedService(FunctionEx<? super ProcessorSupplier.Context,S> createServiceFn): A variant of sharedService(createFn, destroyFn) with a no-op destroyFn. |
static <S> ServiceFactory<?,S> | sharedService(FunctionEx<? super ProcessorSupplier.Context,S> createServiceFn, ConsumerEx<S> destroyServiceFn): Returns a ServiceFactory which will provide a single shared service object per cluster member. |

@Nonnull public static <K,V> ServiceFactory<?,ReplicatedMap<K,V>> replicatedMapService(@Nonnull String mapName)
Returns a factory that provides a ReplicatedMap as the service object. A replicated map is a particularly good choice if you are enriching an event stream with the data stored in the Hazelcast Jet cluster. Unlike in a hashJoin transformation, the data in the map can change while the job is running, so you can keep the enriching dataset up to date. Unlike with an IMap, the data you access is local, so lookups do not make blocking calls (important for performance).
If you want to destroy the map after the job finishes, call factory.destroyFn(ReplicatedMap::destroy) on the object you get from this method.
Example usage (without destroyFn):

    p.readFrom( /* a batch or streaming source */ )
     .mapUsingService(replicatedMapService("fooMapName"),
         (map, item) -> tuple2(item, map.get(item.getKey())));

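The claim that the map's data can change while the job is running can be illustrated with a small plain-Java model. This is only a sketch and does not use the Hazelcast API: a ConcurrentHashMap stands in for the ReplicatedMap, a Map.Entry stands in for tuple2, the item itself is used as the lookup key, and the names MapEnrichmentModel, enrich, and demo are hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/**
 * Plain-Java model of enriching items through a map used as a service.
 * NOT Hazelcast API: all names here are hypothetical stand-ins.
 */
final class MapEnrichmentModel {

    /** Pairs each item with the value the live map holds when the item is processed. */
    static List<Map.Entry<String, String>> enrich(List<String> items,
                                                  Map<String, String> serviceMap,
                                                  Runnable afterEachItem) {
        // Same shape as the (map, item) -> tuple2(item, map.get(...)) function above.
        BiFunction<Map<String, String>, String, Map.Entry<String, String>> mapFn =
                (map, item) -> new SimpleImmutableEntry<>(item, map.get(item));

        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String item : items) {
            out.add(mapFn.apply(serviceMap, item));
            afterEachItem.run(); // models another party updating the map mid-job
        }
        return out;
    }

    /** Processes the same key twice while the enriching value changes in between. */
    static List<Map.Entry<String, String>> demo() {
        Map<String, String> enrichingData = new ConcurrentHashMap<>();
        enrichingData.put("k1", "old");
        return enrich(Arrays.asList("k1", "k1"), enrichingData,
                () -> enrichingData.put("k1", "new"));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [k1=old, k1=new]
    }
}
```

The second lookup sees the updated value because the function reads the live map for every item rather than a snapshot taken when the job started.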
K - type of the map key
V - type of the map value
mapName - name of the ReplicatedMap to use as the service
@Nonnull public static <K,V> ServiceFactory<?,IMap<K,V>> iMapService(@Nonnull String mapName)
Returns a factory that provides an IMap as the service. This is useful if you are enriching an event stream with the data stored in the Hazelcast Jet cluster. Unlike in a hashJoin transformation, the data in the map can change while the job is running, so you can keep the enriching dataset up to date.
Instead of using this factory, you can call GeneralStage.mapUsingIMap(IMap, FunctionEx, BiFunctionEx) or GeneralStageWithKey.mapUsingIMap(IMap, BiFunctionEx).
If you plan to use a synchronous (blocking) method on the map, call ServiceFactory.toNonCooperative() on the returned factory.
K - key type
V - value type
mapName - name of the map used as service
@Nonnull public static <S> ServiceFactory<?,S> sharedService(@Nonnull FunctionEx<? super ProcessorSupplier.Context,S> createServiceFn)
A variant of sharedService(createFn, destroyFn) with a no-op destroyFn.
Note: if your service has a blocking API (e.g., it does synchronous IO or acquires locks), you must call ServiceFactory.toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which is not a meaningful basis for sizing blocking code.
public static <S> ServiceFactory<?,S> sharedService(@Nonnull FunctionEx<? super ProcessorSupplier.Context,S> createServiceFn, @Nonnull ConsumerEx<S> destroyServiceFn)
Returns a ServiceFactory which will provide a single shared service object per cluster member. All parallel processors serving the associated pipeline stage will use the same object. Since the service object will be accessed from many parallel threads, it must be thread-safe.
Note: if your service has a blocking API (e.g., it does synchronous IO or acquires locks), you must call ServiceFactory.toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which is not a meaningful basis for sizing blocking code.
S - type of the service object
createServiceFn - the function that creates the service. It will be called once on each Jet member. It must be stateless.
destroyServiceFn - the function that destroys the service. It will be called once on each Jet member. It can be used to tear down any resources acquired by the service. It must be stateless.
See also: nonSharedService(FunctionEx, ConsumerEx)
@Nonnull public static <S> ServiceFactory<?,S> nonSharedService(@Nonnull FunctionEx<? super Processor.Context,? extends S> createServiceFn)
A variant of nonSharedService(createFn, destroyFn) with a no-op destroyFn.
Note: if your service has a blocking API (e.g., it does synchronous IO or acquires locks), you must call ServiceFactory.toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which is not a meaningful basis for sizing blocking code.
public static <S> ServiceFactory<?,S> nonSharedService(@Nonnull FunctionEx<? super Processor.Context,? extends S> createServiceFn, @Nonnull ConsumerEx<? super S> destroyServiceFn)
Returns a ServiceFactory which creates a separate service instance for each parallel Jet processor. The number of processors on each cluster member is dictated by stage.localParallelism. Use this when the service instance should not be shared across multiple threads.
Note: if your service has a blocking API (e.g., it does synchronous IO or acquires locks), you must call ServiceFactory.toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which is not a meaningful basis for sizing blocking code.
S - type of the service object
createServiceFn - the function that creates the service. It will be called once per processor instance. It must be stateless.
destroyServiceFn - the function that destroys the service. It will be called once per processor instance. It can be used to tear down any resources acquired by the service. It must be stateless.
See also: sharedService(FunctionEx, ConsumerEx)
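The difference between the shared and non-shared lifecycles ("once on each Jet member" versus "once per processor instance") can be made concrete with a plain-Java model of a single member. This is a sketch under stated assumptions, not Hazelcast code: ServiceLifecycleModel, simulateMember, and demo are hypothetical names, and java.util.function.Supplier and Consumer stand in for createServiceFn and destroyServiceFn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Plain-Java model of the two service lifecycles on ONE cluster member.
 * NOT Hazelcast API: every name in this class is a hypothetical stand-in.
 */
final class ServiceLifecycleModel {

    /**
     * Simulates one member running localParallelism processors.
     * Returns {createFn calls, destroyFn calls, distinct service instances}.
     */
    static <S> int[] simulateMember(boolean shared, int localParallelism,
                                    Supplier<S> createFn, Consumer<S> destroyFn) {
        int createCalls = 0;
        int destroyCalls = 0;
        List<S> serviceOfProcessor = new ArrayList<>();

        // Shared mode: create a single instance up front for the whole member.
        S sharedInstance = null;
        if (shared) {
            sharedInstance = createFn.get();
            createCalls++;
        }
        for (int i = 0; i < localParallelism; i++) {
            if (shared) {
                serviceOfProcessor.add(sharedInstance);
            } else {
                // Non-shared mode: each processor gets its own instance.
                serviceOfProcessor.add(createFn.get());
                createCalls++;
            }
        }

        // ... the processors would handle items here ...

        // Tear down every distinct instance exactly once (compared by identity).
        Set<S> distinct = Collections.newSetFromMap(new IdentityHashMap<S, Boolean>());
        distinct.addAll(serviceOfProcessor);
        for (S service : distinct) {
            destroyFn.accept(service);
            destroyCalls++;
        }
        return new int[] {createCalls, destroyCalls, distinct.size()};
    }

    /** Runs the model with a trivial service object and a no-op destroy function. */
    static int[] demo(boolean shared, int localParallelism) {
        Supplier<Object> createFn = Object::new;
        Consumer<Object> destroyFn = service -> { };
        return simulateMember(shared, localParallelism, createFn, destroyFn);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo(true, 4)));  // prints [1, 1, 1]
        System.out.println(Arrays.toString(demo(false, 4))); // prints [4, 4, 4]
    }
}
```

With a local parallelism of 4, the shared variant hands one instance to all four processors, which is why that instance must be thread-safe, while the non-shared variant creates and destroys four independent instances.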
Copyright © 2024 Hazelcast, Inc. All rights reserved.