Class ServiceFactories
- java.lang.Object
-
- com.hazelcast.jet.pipeline.ServiceFactories
-
public final class ServiceFactories extends java.lang.Object
Utility class with methods that create several useful service factories.
- Since:
- Jet 3.0
-
-
Method Summary
Modifier and Type, Method, Description
- static <K,V> ServiceFactory<?,IMap<K,V>> iMapService(java.lang.String mapName)
  Returns a factory that provides an IMap as the service.
- static <S> ServiceFactory<?,S> nonSharedService(FunctionEx<? super Processor.Context,? extends S> createServiceFn)
  A variant of nonSharedService(createFn, destroyFn) with a no-op destroyFn.
- static <S> ServiceFactory<?,S> nonSharedService(FunctionEx<? super Processor.Context,? extends S> createServiceFn, ConsumerEx<? super S> destroyServiceFn)
  Returns a ServiceFactory which creates a separate service instance for each parallel Jet processor.
- static <K,V> ServiceFactory<?,ReplicatedMap<K,V>> replicatedMapService(java.lang.String mapName)
  Returns a factory that provides a ReplicatedMap as the service object.
- static <S> ServiceFactory<?,S> sharedService(FunctionEx<? super ProcessorSupplier.Context,S> createServiceFn)
  A variant of sharedService(createFn, destroyFn) with a no-op destroyFn.
- static <S> ServiceFactory<?,S> sharedService(FunctionEx<? super ProcessorSupplier.Context,S> createServiceFn, ConsumerEx<S> destroyServiceFn)
  Returns a ServiceFactory which will provide a single shared service object per cluster member.
-
-
-
Method Detail
-
replicatedMapService
@Nonnull public static <K,V> ServiceFactory<?,ReplicatedMap<K,V>> replicatedMapService(@Nonnull java.lang.String mapName)
Returns a factory that provides a ReplicatedMap as the service object. A replicated map is a particularly good choice if you are enriching an event stream with the data stored in the Hazelcast Jet cluster. Unlike in a hashJoin transformation, the data in the map can change while the job is running, so you can keep the enriching dataset up to date. Unlike with an IMap, the data you access is local, so lookups make no blocking calls, which is important for performance.
If you want to destroy the map after the job finishes, call factory.destroyFn(ReplicatedMap::destroy) on the object you get from this method.
Example usage (without destroyFn):
p.readFrom( /* a batch or streaming source */ )
 .mapUsingService(replicatedMapService("fooMapName"),
     (map, item) -> tuple2(item, map.get(item.getKey())));
- Type Parameters:
K - type of the map key
V - type of the map value
- Parameters:
mapName - name of the ReplicatedMap to use as the service
- Since:
- Jet 3.0
-
iMapService
@Nonnull public static <K,V> ServiceFactory<?,IMap<K,V>> iMapService(@Nonnull java.lang.String mapName)
Returns a factory that provides an IMap as the service. This is useful if you are enriching an event stream with the data stored in the Hazelcast Jet cluster. Unlike in a hashJoin transformation, the data in the map can change while the job is running, so you can keep the enriching dataset up to date.
Instead of using this factory, you can call GeneralStage.mapUsingIMap(IMap, FunctionEx, BiFunctionEx) or GeneralStageWithKey.mapUsingIMap(IMap, BiFunctionEx).
If you plan to use a synchronous (blocking) method on the map, call ServiceFactory.toNonCooperative() on the returned factory.
- Type Parameters:
K - key type
V - value type
- Parameters:
mapName - name of the map used as the service
- Returns:
- the service factory
- Since:
- Jet 3.0
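The contrast with hashJoin drawn above (the enriching data may change while the job runs) can be illustrated with a plain-Java model. This is only a sketch under stated assumptions: a ConcurrentHashMap stands in for the IMap, the class MapEnrichmentModel and the sample keys are hypothetical, and no Jet classes are used. The ENRICH function has the same (service, item) -> result shape as a mapping function passed to mapUsingService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Plain-Java model of enriching a stream through a map lookup.
// A ConcurrentHashMap stands in for the IMap; nothing here is Jet API.
final class MapEnrichmentModel {

    // Same shape as a mapUsingService mapping function: (service, item) -> result.
    static final BiFunction<Map<String, String>, String, String> ENRICH =
            (map, ticker) -> ticker + "=" + map.getOrDefault(ticker, "unknown");

    static List<String> run() {
        Map<String, String> companyNames = new ConcurrentHashMap<>();
        companyNames.put("HZ", "Hazelcast");

        List<String> out = new ArrayList<>();
        out.add(ENRICH.apply(companyNames, "HZ"));
        out.add(ENRICH.apply(companyNames, "ACME"));   // not in the map yet

        // The enriching data set is updated while the "job" is still running.
        companyNames.put("ACME", "Acme Corp");
        out.add(ENRICH.apply(companyNames, "ACME"));   // later items see the update
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the third lookup returns the updated value because the map is consulted per item, whereas a join against a snapshot taken at job start would not see the change.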
-
sharedService
@Nonnull public static <S> ServiceFactory<?,S> sharedService(@Nonnull FunctionEx<? super ProcessorSupplier.Context,S> createServiceFn)
A variant of sharedService(createFn, destroyFn) with a no-op destroyFn.
Note: if your service has a blocking API (e.g., doing synchronous IO or acquiring locks), you must call ServiceFactory.toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which makes no sense for blocking code.
- Since:
- Jet 4.0
-
sharedService
public static <S> ServiceFactory<?,S> sharedService(@Nonnull FunctionEx<? super ProcessorSupplier.Context,S> createServiceFn, @Nonnull ConsumerEx<S> destroyServiceFn)
Returns a ServiceFactory which will provide a single shared service object per cluster member. All parallel processors serving the associated pipeline stage will use the same object. Since the service object will be accessed from many parallel threads, it must be thread-safe.
Note: if your service has a blocking API (e.g., doing synchronous IO or acquiring locks), you must call ServiceFactory.toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which makes no sense for blocking code.
- Type Parameters:
S - type of the service object
- Parameters:
createServiceFn - the function that creates the service. It will be called once on each Jet member. It must be stateless.
destroyServiceFn - the function that destroys the service. It will be called once on each Jet member. It can be used to tear down any resources acquired by the service. It must be stateless.
- See Also:
nonSharedService(FunctionEx, ConsumerEx)
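The sharing contract described for sharedService can be modeled in plain Java. The following is a minimal sketch, not Jet code: the class SharedServiceModel, the method runOnMember, and the thread pool standing in for parallel processors are all hypothetical. It shows a service created once, used concurrently by several threads (which is why it has to be thread-safe), and destroyed once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java model of the "one shared service per member" contract.
// Nothing here is Jet API; all names are hypothetical.
final class SharedServiceModel {

    // Count how many times the create and destroy functions ran.
    static final AtomicInteger created = new AtomicInteger();
    static final AtomicInteger destroyed = new AtomicInteger();

    // Simulates one cluster member: creates the service once, lets
    // `parallelism` threads use the same object, then destroys it once.
    static long runOnMember(int parallelism, int itemsPerProcessor) {
        Supplier<AtomicLong> createFn = () -> {
            created.incrementAndGet();
            return new AtomicLong();
        };
        Consumer<AtomicLong> destroyFn = svc -> destroyed.incrementAndGet();

        AtomicLong service = createFn.get();           // single shared instance
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        for (int p = 0; p < parallelism; p++) {
            pool.submit(() -> {
                for (int i = 0; i < itemsPerProcessor; i++) {
                    service.incrementAndGet();         // thread-safe update
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        long total = service.get();
        destroyFn.accept(service);                     // torn down once
        return total;
    }

    public static void main(String[] args) {
        System.out.println(runOnMember(4, 1000));
    }
}
```

Replacing the AtomicLong with a plain long field in this model would allow lost updates, which is the kind of failure the thread-safety requirement guards against.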
-
nonSharedService
@Nonnull public static <S> ServiceFactory<?,S> nonSharedService(@Nonnull FunctionEx<? super Processor.Context,? extends S> createServiceFn)
A variant of nonSharedService(createFn, destroyFn) with a no-op destroyFn.
Note: if your service has a blocking API (e.g., doing synchronous IO or acquiring locks), you must call ServiceFactory.toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which makes no sense for blocking code.
- Since:
- Jet 4.0
-
nonSharedService
public static <S> ServiceFactory<?,S> nonSharedService(@Nonnull FunctionEx<? super Processor.Context,? extends S> createServiceFn, @Nonnull ConsumerEx<? super S> destroyServiceFn)
Returns a ServiceFactory which creates a separate service instance for each parallel Jet processor. The number of processors on each cluster member is dictated by stage.localParallelism. Use this when the service instance should not be shared across multiple threads.
Note: if your service has a blocking API (e.g., doing synchronous IO or acquiring locks), you must call ServiceFactory.toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which makes no sense for blocking code.
- Type Parameters:
S - type of the service object
- Parameters:
createServiceFn - the function that creates the service. It will be called once per processor instance. It must be stateless.
destroyServiceFn - the function that destroys the service. It will be called once per processor instance. It can be used to tear down any resources acquired by the service. It must be stateless.
- See Also:
sharedService(FunctionEx, ConsumerEx)
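The per-processor contract described for nonSharedService can likewise be modeled in plain Java. This is a sketch under stated assumptions, not Jet code: NonSharedServiceModel, Counter, and runOnMember are hypothetical names, and plain threads stand in for parallel processors. Each simulated processor creates its own service, uses it without synchronization, and destroys it when done.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java model of the "one service instance per processor" contract.
// Nothing here is Jet API; all names are hypothetical.
final class NonSharedServiceModel {

    // A deliberately non-thread-safe service: a plain mutable counter.
    static final class Counter {
        private long value;
        long next() { return ++value; }
    }

    // Count how many times the create and destroy functions ran.
    static final AtomicInteger created = new AtomicInteger();
    static final AtomicInteger destroyed = new AtomicInteger();

    // Simulates one cluster member running `localParallelism` processors.
    // Returns the last counter value each processor observed.
    static long[] runOnMember(int localParallelism, int itemsPerProcessor) {
        Supplier<Counter> createFn = () -> {
            created.incrementAndGet();
            return new Counter();
        };
        Consumer<Counter> destroyFn = svc -> destroyed.incrementAndGet();

        long[] lastSeen = new long[localParallelism];
        Thread[] processors = new Thread[localParallelism];
        for (int p = 0; p < localParallelism; p++) {
            final int index = p;
            processors[p] = new Thread(() -> {
                Counter service = createFn.get();      // private to this processor
                try {
                    for (int i = 0; i < itemsPerProcessor; i++) {
                        lastSeen[index] = service.next();
                    }
                } finally {
                    destroyFn.accept(service);         // torn down per processor
                }
            });
            processors[p].start();
        }
        for (Thread t : processors) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return lastSeen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runOnMember(3, 1000)));
    }
}
```

Because no instance is ever touched by more than one thread, the unsynchronized Counter is safe here, and the create and destroy functions each run once per simulated processor rather than once per member.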
-
-