C
- type of the shared context objectS
- type of the service objectpublic final class ServiceFactory<C,S> extends Object implements Serializable, Cloneable
stage.mapUsingService()
.
The lifecycle of this factory object is as follows:
ServiceFactory
and sends
it to all the cluster members.
createContextFn()
to get a context
object that will be shared across all the service instances on that
member. For example, if you are connecting to an external service that
provides a thread-safe client, you can create it here and then create
individual sessions for each service instance.
createServiceFn()
to create as many
service instances on each member as determined by the localParallelism
of the pipeline
stage. The invocations of createServiceFn()
receive the context
object.
destroyServiceFn()
with each
service instance.
destroyContextFn()
with the context object.
ServiceFactories.nonSharedService(FunctionEx, ConsumerEx)
ServiceFactories.processorLocalService} or ServiceFactories.sharedService(FunctionEx, ConsumerEx)
ServiceFactories.memberLocalService}.
Here's a list of pipeline transforms that require a ServiceFactory
:
GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>)
GeneralStage.filterUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiPredicateEx<? super S, ? super T>)
GeneralStage.flatMapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends com.hazelcast.jet.Traverser<R>>)
GeneralStage.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends java.util.concurrent.CompletableFuture<R>>)
GeneralStage.mapUsingServiceAsyncBatched(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, int, com.hazelcast.function.BiFunctionEx<? super S, ? super java.util.List<T>, ? extends java.util.concurrent.CompletableFuture<java.util.List<R>>>)
GeneralStageWithKey.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, ? extends R>)
GeneralStageWithKey.filterUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriPredicate<? super S, ? super K, ? super T>)
GeneralStageWithKey.flatMapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, ? extends com.hazelcast.jet.Traverser<R>>)
GeneralStageWithKey.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.jet.function.TriFunction<? super S, ? super K, ? super T, java.util.concurrent.CompletableFuture<R>>)
Modifier and Type | Field and Description |
---|---|
static boolean |
COOPERATIVE_DEFAULT
Default value for
isCooperative . |
Modifier and Type | Method and Description |
---|---|
Map<String,File> |
attachedFiles()
Returns the files and directories attached to this service factory.
|
protected ServiceFactory<C,S> |
clone() |
FunctionEx<? super ProcessorSupplier.Context,? extends C> |
createContextFn()
Returns the function that creates the shared context object.
|
BiFunctionEx<? super Processor.Context,? super C,? extends S> |
createServiceFn()
Returns the function that creates the service object.
|
ConsumerEx<? super C> |
destroyContextFn()
Returns the function that destroys the shared context object at the end
of the Jet job.
|
ConsumerEx<? super S> |
destroyServiceFn()
Returns the function that destroys the service object at the end of the
Jet job.
|
boolean |
isCooperative()
Returns the
isCooperative flag, see toNonCooperative() . |
ServiceFactory<C,S> |
toNonCooperative()
Returns a copy of this
ServiceFactory with the isCooperative flag set to false . |
ServiceFactory<C,S> |
withAttachedDirectory(String id,
File directory)
Attaches a directory to this service factory under the given ID.
|
ServiceFactory<C,S> |
withAttachedFile(String id,
File file)
Attaches a file to this service factory under the given ID.
|
static <C> ServiceFactory<C,Void> |
withCreateContextFn(FunctionEx<? super ProcessorSupplier.Context,? extends C> createContextFn)
Creates a new
ServiceFactory with the given function that
creates the shared context object. |
<S_NEW> ServiceFactory<C,S_NEW> |
withCreateServiceFn(BiFunctionEx<? super Processor.Context,? super C,? extends S_NEW> createServiceFn)
Returns a copy of this
ServiceFactory with the given createService function. |
ServiceFactory<C,S> |
withDestroyContextFn(ConsumerEx<? super C> destroyContextFn)
Returns a copy of this
ServiceFactory with the destroyContext function replaced with the given function. |
ServiceFactory<C,S> |
withDestroyServiceFn(ConsumerEx<? super S> destroyServiceFn)
Returns a copy of this
ServiceFactory with the destroyService function replaced with the given function. |
ServiceFactory<C,S> |
withoutAttachedFiles()
Returns a copy of this
ServiceFactory with any attached files
removed. |
public static final boolean COOPERATIVE_DEFAULT
isCooperative
.@Nonnull public static <C> ServiceFactory<C,Void> withCreateContextFn(@Nonnull FunctionEx<? super ProcessorSupplier.Context,? extends C> createContextFn)
ServiceFactory
with the given function that
creates the shared context object. Make sure to also call withCreateServiceFn(com.hazelcast.function.BiFunctionEx<? super com.hazelcast.jet.core.Processor.Context, ? super C, ? extends S_NEW>)
that creates the service objects. You can use the
shared context as a shared service object as well, by returning it from
createServiceFn
. To achieve this more conveniently, use ServiceFactories.sharedService(com.hazelcast.function.FunctionEx<? super com.hazelcast.jet.core.ProcessorSupplier.Context, S>)
instead of this method. If you don't need
a shared context at all, just independent service instances, you can use
the convenience of ServiceFactories.nonSharedService(com.hazelcast.function.FunctionEx<? super com.hazelcast.jet.core.Processor.Context, ? extends S>)
.
Note: if your service has a blocking API (e.g., doing
synchronous IO or acquiring locks), you must call toNonCooperative()
as a hint to the Jet execution engine to start a
dedicated thread for those calls. Failing to do this can cause severe
performance problems. You should also carefully consider how much local
parallelism you need for this step since each parallel tasklet needs its
own thread. Call stage.setLocalParallelism()
to set an explicit level, otherwise it will
depend on the number of cores on the Jet machine, which makes no sense
for blocking code.
C
- type of the service context instancecreateContextFn
- the function to create new context object, given a ProcessorSupplier.Context
. Called once per Jet member. It must be
stateless.createServiceFn
)@Nonnull public ServiceFactory<C,S> withDestroyContextFn(@Nonnull ConsumerEx<? super C> destroyContextFn)
ServiceFactory
with the destroyContext
function replaced with the given function.
Jet calls this function at the end of the job for each shared context object it created (one on each cluster member).
destroyContextFn
- the function to destroy the shared service
context. It must be stateless.@Nonnull public <S_NEW> ServiceFactory<C,S_NEW> withCreateServiceFn(@Nonnull BiFunctionEx<? super Processor.Context,? super C,? extends S_NEW> createServiceFn)
ServiceFactory
with the given createService
function.
Jet calls this function to create each parallel instance of the service
object (their number on each cluster member is determined by stage.localParallelism
). Each
invocation gets the shared context
instance as the parameter, as well as the lower-level Processor.Context
.
Since the call of this method establishes the <S>
type parameter
of the service factory, you must call it before setting the destroyService
function. Calling
this method resets any pre-existing destroyService
function to a
no-op.
createServiceFn
- the function that creates the service instance.
It must be stateless.@Nonnull public ServiceFactory<C,S> withDestroyServiceFn(@Nonnull ConsumerEx<? super S> destroyServiceFn)
ServiceFactory
with the destroyService
function replaced with the given function.
The destroy function is called at the end of the job to destroy all created services objects.
destroyServiceFn
- the function to destroy the service instance.
This function is called once per processor instance. It must be
stateless.@Nonnull public ServiceFactory<C,S> toNonCooperative()
ServiceFactory
with the isCooperative
flag set to false
. Call this method if your
service doesn't follow the cooperative processor contract, that is if it waits for IO, blocks for
synchronization, takes too long to complete etc. If the service will
perform async operations, you can typically use a cooperative
processor. Cooperative processors offer higher performance.isCooperative
flag set
to false
.@Nonnull public ServiceFactory<C,S> withAttachedFile(@Nonnull String id, @Nonnull File file)
createContextFn()
as procSupplierContext.attachedFile(id)
.@Nonnull public ServiceFactory<C,S> withAttachedDirectory(@Nonnull String id, @Nonnull File directory)
createContextFn()
as procSupplierContext.attachedDirectory(id)
.@Nonnull public ServiceFactory<C,S> withoutAttachedFiles()
ServiceFactory
with any attached files
removed.@Nonnull public FunctionEx<? super ProcessorSupplier.Context,? extends C> createContextFn()
withCreateContextFn(FunctionEx)
@Nonnull public BiFunctionEx<? super Processor.Context,? super C,? extends S> createServiceFn()
stage.localParallelism
.withCreateServiceFn(BiFunctionEx)
@Nonnull public ConsumerEx<? super S> destroyServiceFn()
withDestroyServiceFn(ConsumerEx)
@Nonnull public ConsumerEx<? super C> destroyContextFn()
withDestroyContextFn(ConsumerEx)
public boolean isCooperative()
isCooperative
flag, see toNonCooperative()
.@Nonnull public Map<String,File> attachedFiles()
createContextFn()
as procSupplierContext.attachedFile(file.toString())
or
procSupplierContext.attachedDirectory(directory.toString())
.protected ServiceFactory<C,S> clone()
Copyright © 2021 Hazelcast, Inc.. All rights reserved.