Class ServiceFactory<C,S>
- Type Parameters:
C
- type of the shared context object
S
- type of the service object
- All Implemented Interfaces:
Serializable, Cloneable
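A holder of functions needed to create and destroy a service object used in pipeline transforms such as stage.mapUsingService().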
The lifecycle of this factory object is as follows:
- When you submit a job, Jet serializes ServiceFactory and sends it to all the cluster members.
- On each member, Jet calls createContextFn() to get a context object that will be shared across all the service instances on that member. For example, if you are connecting to an external service that provides a thread-safe client, you can create it here and then create individual sessions for each service instance.
- Jet repeatedly calls createServiceFn() to create as many service instances on each member as determined by the localParallelism of the pipeline stage. The invocations of createServiceFn() receive the context object.
- When the job is done, Jet calls destroyServiceFn() with each service instance.
- Finally, Jet calls destroyContextFn() with the context object.
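The order of calls on a single member can be modelled in plain Java. The sketch below is only an illustration of the sequence described above, not Jet's implementation: the class name `LifecycleSketch`, the method `runOnOneMember` and the string-based stand-ins for the context and service objects are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class LifecycleSketch {

    /** Runs the create/destroy sequence for one simulated member and records each call. */
    public static List<String> runOnOneMember(int localParallelism) {
        List<String> log = new ArrayList<>();

        // Stand-ins for the four factory functions (hypothetical, not Jet types).
        Supplier<String> createContextFn = () -> {
            log.add("createContext");
            return "ctx";
        };
        BiFunction<Integer, String, String> createServiceFn = (index, ctx) -> {
            log.add("createService#" + index + "(" + ctx + ")");
            return "svc" + index;
        };
        Consumer<String> destroyServiceFn = svc -> log.add("destroyService(" + svc + ")");
        Consumer<String> destroyContextFn = ctx -> log.add("destroyContext(" + ctx + ")");

        // One shared context per member.
        String context = createContextFn.get();

        // One service instance per parallel processor, each given the shared context.
        List<String> services = new ArrayList<>();
        for (int i = 0; i < localParallelism; i++) {
            services.add(createServiceFn.apply(i, context));
        }

        // At the end of the job: destroy every service instance first...
        services.forEach(destroyServiceFn);
        // ...and only then the shared context.
        destroyContextFn.accept(context);
        return log;
    }

    public static void main(String[] args) {
        runOnOneMember(2).forEach(System.out::println);
    }
}
```

With a local parallelism of 2, the log shows one context creation, two service creations that both receive that context, two service destructions, and a single context destruction at the end.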
If you don't need a member-wide shared context, you can use the simpler convenience methods ServiceFactories.nonSharedService(FunctionEx, ConsumerEx) or ServiceFactories.sharedService(FunctionEx, ConsumerEx).
Here's a list of pipeline transforms that require a ServiceFactory:
GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>)
GeneralStage.filterUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiPredicateEx<? super S, ? super T>)
GeneralStage.flatMapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends com.hazelcast.jet.Traverser<R>>)
GeneralStage.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends java.util.concurrent.CompletableFuture<R>>)
GeneralStage.mapUsingServiceAsyncBatched(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, int, com.hazelcast.function.BiFunctionEx<? super S, ? super java.util.List<T>, ? extends java.util.concurrent.CompletableFuture<java.util.List<R>>>)
GeneralStageWithKey.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, ? extends R>)
GeneralStageWithKey.filterUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriPredicate<? super S, ? super K, ? super T>)
GeneralStageWithKey.flatMapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, ? extends com.hazelcast.jet.Traverser<R>>)
GeneralStageWithKey.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, java.util.concurrent.CompletableFuture<R>>)
- Since:
- Jet 4.0
-
Field Summary
Modifier and Type | Field | Description
static final boolean | COOPERATIVE_DEFAULT | Default value for isCooperative.
-
Method Summary
Modifier and Type | Method | Description
Map<String,File> | attachedFiles() | Returns the files and directories attached to this service factory.
protected ServiceFactory<C,S> | clone() |
FunctionEx<? super ProcessorSupplier.Context, ? extends C> | createContextFn() | Returns the function that creates the shared context object.
BiFunctionEx<? super Processor.Context, ? super C, ? extends S> | createServiceFn() | Returns the function that creates the service object.
ConsumerEx<? super C> | destroyContextFn() | Returns the function that destroys the shared context object at the end of the Jet job.
ConsumerEx<? super S> | destroyServiceFn() | Returns the function that destroys the service object at the end of the Jet job.
boolean | isCooperative() | Returns the isCooperative flag; see toNonCooperative().
Permission | permission() | Returns the required permission to use this factory.
ServiceFactory<C,S> | setCooperative(boolean cooperative) | Returns a copy of this ServiceFactory with the isCooperative flag set to the value of the cooperative argument.
ServiceFactory<C,S> | toNonCooperative() | Returns a copy of this ServiceFactory with the isCooperative flag set to false.
ServiceFactory<C,S> | withAttachedDirectory(String id, File directory) | Attaches a directory to this service factory under the given ID.
ServiceFactory<C,S> | withAttachedFile(String id, File file) | Attaches a file to this service factory under the given ID.
static <C> ServiceFactory<C,Void> | withCreateContextFn(FunctionEx<? super ProcessorSupplier.Context, ? extends C> createContextFn) | Creates a new ServiceFactory with the given function that creates the shared context object.
<S_NEW> ServiceFactory<C,S_NEW> | withCreateServiceFn(BiFunctionEx<? super Processor.Context, ? super C, ? extends S_NEW> createServiceFn) | Returns a copy of this ServiceFactory with the given createService function.
ServiceFactory<C,S> | withDestroyContextFn(ConsumerEx<? super C> destroyContextFn) | Returns a copy of this ServiceFactory with the destroyContext function replaced with the given function.
ServiceFactory<C,S> | withDestroyServiceFn(ConsumerEx<? super S> destroyServiceFn) | Returns a copy of this ServiceFactory with the destroyService function replaced with the given function.
ServiceFactory<C,S> | withoutAttachedFiles() | Returns a copy of this ServiceFactory with any attached files removed.
ServiceFactory<C,S> | withPermission(Permission permission) | Returns a copy of this ServiceFactory with the required permission set.
-
Field Details
-
COOPERATIVE_DEFAULT
public static final boolean COOPERATIVE_DEFAULT
Default value for isCooperative.
-
Method Details
-
withCreateContextFn
@Nonnull public static <C> ServiceFactory<C,Void> withCreateContextFn(@Nonnull FunctionEx<? super ProcessorSupplier.Context, ? extends C> createContextFn)
Creates a new ServiceFactory with the given function that creates the shared context object. Make sure to also call withCreateServiceFn(com.hazelcast.function.BiFunctionEx<? super com.hazelcast.jet.core.Processor.Context, ? super C, ? extends S_NEW>) to supply the function that creates the service objects. You can use the shared context as a shared service object as well, by returning it from createServiceFn. To achieve this more conveniently, use ServiceFactories.sharedService(com.hazelcast.function.FunctionEx<? super com.hazelcast.jet.core.ProcessorSupplier.Context, S>) instead of this method. If you don't need a shared context at all, just independent service instances, you can use the convenience method ServiceFactories.nonSharedService(com.hazelcast.function.FunctionEx<? super com.hazelcast.jet.core.Processor.Context, ? extends S>).
Note: if your service has a blocking API (e.g., doing synchronous IO or acquiring locks), you must call toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which is not a meaningful basis for sizing blocking code.
- Type Parameters:
C
- type of the service context instance
- Parameters:
createContextFn
- the function to create a new context object, given a ProcessorSupplier.Context. Called once per Jet member. It must be stateless.
- Returns:
- a new factory instance, not yet ready to use (needs the createServiceFn)
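As an illustration of how these calls fit together, here is a minimal sketch that builds a complete factory and uses it in a pipeline. It assumes the Hazelcast Jet pipeline API is on the classpath. The class `CountryLookupSketch`, the nested `Lookup` service and the sample data are hypothetical and exist only for this example. The pipeline is built but not submitted to a cluster.

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CountryLookupSketch {

    /** Hypothetical per-processor service that reads from the shared table. */
    public static class Lookup {
        private final Map<String, String> table;

        public Lookup(Map<String, String> table) {
            this.table = table;
        }

        public String nameOf(String code) {
            return table.getOrDefault(code, "unknown");
        }

        public void close() {
            // Nothing to release in this sketch; a real service might close a session.
        }
    }

    public static ServiceFactory<Map<String, String>, Lookup> lookupFactory() {
        return ServiceFactory
                // Called once per member: builds the shared, thread-safe context.
                .withCreateContextFn(ctx -> {
                    Map<String, String> table = new ConcurrentHashMap<>();
                    table.put("de", "Germany");
                    table.put("fr", "France");
                    return table;
                })
                // Called once per parallel processor: receives the shared context.
                .withCreateServiceFn((procCtx, table) -> new Lookup(table))
                // Set after withCreateServiceFn, which fixes the service type.
                .withDestroyServiceFn(Lookup::close)
                .withDestroyContextFn(Map::clear);
    }

    public static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.readFrom(TestSources.items("de", "fr", "xx"))
         .mapUsingService(lookupFactory(), (lookup, code) -> lookup.nameOf(code))
         .writeTo(Sinks.logger());
        return p;
    }

    public static void main(String[] args) {
        // Prints the pipeline graph; submitting it requires a running Jet-enabled member.
        System.out.println(buildPipeline().toDotString());
    }
}
```

The lambdas passed to the factory capture no local state, which keeps them serializable and stateless as the parameter descriptions require.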
-
withDestroyContextFn
@Nonnull public ServiceFactory<C,S> withDestroyContextFn(@Nonnull ConsumerEx<? super C> destroyContextFn)
Returns a copy of this ServiceFactory with the destroyContext function replaced with the given function.
Jet calls this function at the end of the job for each shared context object it created (one on each cluster member).
- Parameters:
destroyContextFn
- the function to destroy the shared service context. It must be stateless.
- Returns:
- a copy of this factory with the supplied destroy-function
-
withCreateServiceFn
@Nonnull public <S_NEW> ServiceFactory<C,S_NEW> withCreateServiceFn(@Nonnull BiFunctionEx<? super Processor.Context, ? super C, ? extends S_NEW> createServiceFn)
Returns a copy of this ServiceFactory with the given createService function.
Jet calls this function to create each parallel instance of the service object (their number on each cluster member is determined by stage.localParallelism). Each invocation gets the shared context instance as the parameter, as well as the lower-level Processor.Context.
Since the call of this method establishes the <S> type parameter of the service factory, you must call it before setting the destroyService function. Calling this method resets any pre-existing destroyService function to a no-op.
- Parameters:
createServiceFn
- the function that creates the service instance. It must be stateless.
- Returns:
- a copy of this factory with the supplied create-service-function
-
withDestroyServiceFn
@Nonnull public ServiceFactory<C,S> withDestroyServiceFn(@Nonnull ConsumerEx<? super S> destroyServiceFn)
Returns a copy of this ServiceFactory with the destroyService function replaced with the given function.
The destroy function is called at the end of the job to destroy all created service objects.
- Parameters:
destroyServiceFn
- the function to destroy the service instance. This function is called once per processor instance. It must be stateless.
- Returns:
- a copy of this factory with the supplied destroy-function
-
toNonCooperative
Returns a copy of this ServiceFactory with the isCooperative flag set to false. Call this method if your service doesn't follow the cooperative processor contract, that is, if it waits for IO, blocks for synchronization, or takes too long to complete. If the service performs async operations, you can typically use a cooperative processor. Cooperative processors offer higher performance.
- Returns:
- a copy of this factory with the isCooperative flag set to false.
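A minimal sketch of marking a blocking service as non-cooperative, assuming the Hazelcast Jet pipeline API is on the classpath. The class `BlockingServiceSketch`, the `SlowClient` service (assumed thread-safe so it can be shared) and the local parallelism of 2 are hypothetical choices for illustration.

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

public class BlockingServiceSketch {

    /** Hypothetical client whose call blocks the calling thread. */
    public static class SlowClient {
        public String fetch(String key) {
            try {
                Thread.sleep(10); // stands in for synchronous IO
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return key.toUpperCase();
        }
    }

    public static ServiceFactory<?, SlowClient> blockingFactory() {
        return ServiceFactories
                .sharedService(ctx -> new SlowClient())
                // fetch() blocks, so the service must not run on a cooperative thread.
                .toNonCooperative();
    }

    public static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.readFrom(TestSources.items("a", "b", "c"))
         .mapUsingService(blockingFactory(), (client, key) -> client.fetch(key))
         // Explicit level: each parallel tasklet of a blocking stage needs its own thread.
         .setLocalParallelism(2)
         .writeTo(Sinks.logger());
        return p;
    }

    public static void main(String[] args) {
        System.out.println("cooperative: " + blockingFactory().isCooperative());
    }
}
```

If the client offered a non-blocking API that returns a CompletableFuture, the factory could stay cooperative and be used with mapUsingServiceAsync instead.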
-
setCooperative
Returns a copy of this ServiceFactory with the isCooperative flag set to the value of the cooperative argument.
Note: if the service performs async operations, you can typically use a cooperative processor. Cooperative processors offer higher performance.
- Returns:
- a copy of this factory with the isCooperative flag changed.
-
withAttachedFile
Attaches a file to this service factory under the given ID. It will become a part of the Jet job and available to createContextFn() as procSupplierContext.attachedFile(id).
- Returns:
- a copy of this factory with the file attached
- Since:
- Jet 4.0
-
withAttachedDirectory
@Nonnull public ServiceFactory<C,S> withAttachedDirectory(@Nonnull String id, @Nonnull File directory)
Attaches a directory to this service factory under the given ID. It will become a part of the Jet job and available to createContextFn() as procSupplierContext.attachedDirectory(id).
- Returns:
- a copy of this factory with the directory attached
- Since:
- Jet 4.0
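The following sketch shows one way an attached file might be read back inside createContextFn(), assuming the Hazelcast Jet pipeline API is on the classpath. The class `AttachedFileSketch`, the ID "stop-words" and the temporary file created in `demoFactory()` are hypothetical. An attached directory would follow the same pattern with withAttachedDirectory and attachedDirectory(id).

```java
import com.hazelcast.jet.pipeline.ServiceFactory;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.List;

public class AttachedFileSketch {

    /** The ID is an arbitrary name chosen for this sketch. */
    static final String STOP_WORDS_ID = "stop-words";

    public static ServiceFactory<List<String>, List<String>> stopWordsFactory(File stopWordsFile) {
        return ServiceFactory
                // On each member, read the attached file through the ID it was registered under.
                .withCreateContextFn(ctx ->
                        Files.readAllLines(ctx.attachedFile(STOP_WORDS_ID).toPath()))
                // The word list is only read here, so each processor uses the shared list directly.
                .withCreateServiceFn((procCtx, words) -> words)
                .withAttachedFile(STOP_WORDS_ID, stopWordsFile);
    }

    /** Creates a small temporary file so the sketch can run without external input. */
    public static ServiceFactory<List<String>, List<String>> demoFactory() {
        try {
            File file = File.createTempFile("stop-words", ".txt");
            file.deleteOnExit();
            Files.write(file.toPath(), List.of("a", "the"));
            return stopWordsFactory(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demoFactory().attachedFiles().keySet());
    }
}
```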
-
withoutAttachedFiles
Returns a copy of this ServiceFactory with any attached files removed.
- Since:
- Jet 4.0
-
withPermission
Returns a copy of this ServiceFactory with the required permission set. This is an Enterprise feature.
-
createContextFn
Returns the function that creates the shared context object. Each Jet member creates one such object and passes it to all the parallel service instances.
-
createServiceFn
Returns the function that creates the service object. There can be many parallel service objects on each Jet member serving the same pipeline stage; their number is determined by stage.localParallelism.
-
destroyServiceFn
Returns the function that destroys the service object at the end of the Jet job.
-
destroyContextFn
Returns the function that destroys the shared context object at the end of the Jet job.
-
isCooperative
public boolean isCooperative()
Returns the isCooperative flag; see toNonCooperative().
-
attachedFiles
Returns the files and directories attached to this service factory. They will become a part of the Jet job and available to createContextFn() as procSupplierContext.attachedFile(file.toString()) or procSupplierContext.attachedDirectory(directory.toString()).
- Since:
- Jet 4.0
-
permission
Returns the required permission to use this factory. This is an Enterprise feature.
-
clone
-