Class ServiceFactory<C,S>

java.lang.Object
com.hazelcast.jet.pipeline.ServiceFactory<C,S>
Type Parameters:
C - type of the shared context object
S - type of the service object
All Implemented Interfaces:
Serializable, Cloneable

public final class ServiceFactory<C,S> extends Object implements Serializable, Cloneable
A holder of functions needed to create and destroy a service object used in pipeline transforms such as stage.mapUsingService().

The lifecycle of this factory object is as follows:

  1. When you submit a job, Jet serializes ServiceFactory and sends it to all the cluster members.
  2. On each member Jet calls createContextFn() to get a context object that will be shared across all the service instances on that member. For example, if you are connecting to an external service that provides a thread-safe client, you can create it here and then create individual sessions for each service instance.
  3. Jet repeatedly calls createServiceFn() to create as many service instances on each member as determined by the localParallelism of the pipeline stage. The invocations of createServiceFn() receive the context object.
  4. When the job is done, Jet calls destroyServiceFn() with each service instance.
  5. Finally, Jet calls destroyContextFn() with the context object.
If you don't need the member-wide context object, you can call the simpler methods ServiceFactories.nonSharedService(FunctionEx, ConsumerEx) or ServiceFactories.sharedService(FunctionEx, ConsumerEx).
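
The call order in the lifecycle above can be sketched with plain JDK types. This is a minimal model of what happens on a single member, not Jet's implementation: the class ServiceLifecycleSketch, the method runOnOneMember, and the use of an Integer index in place of Processor.Context are all hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ServiceLifecycleSketch {

    /**
     * Hypothetical model of steps 2-5 on one member. The Integer passed to
     * createServiceFn stands in for the real Processor.Context (assumption).
     */
    static <C, S> void runOnOneMember(
            int localParallelism,
            Supplier<C> createContextFn,
            BiFunction<Integer, C, S> createServiceFn,
            Consumer<S> destroyServiceFn,
            Consumer<C> destroyContextFn) {
        // Step 2: one shared context per member.
        C context = createContextFn.get();
        // Step 3: one service instance per parallel processor.
        List<S> services = new ArrayList<>();
        for (int i = 0; i < localParallelism; i++) {
            services.add(createServiceFn.apply(i, context));
        }
        // ... the job would process items here ...
        // Step 4: destroy every service instance.
        services.forEach(destroyServiceFn);
        // Step 5: destroy the shared context last.
        destroyContextFn.accept(context);
    }

    /** Records the order in which the four callbacks fire. */
    static List<String> demo(int localParallelism) {
        List<String> events = new ArrayList<>();
        ServiceLifecycleSketch.<String, String>runOnOneMember(
                localParallelism,
                () -> {
                    events.add("createContext");
                    return "client";
                },
                (index, client) -> {
                    events.add("createService " + index);
                    return client + "/session-" + index;
                },
                session -> events.add("destroyService " + session),
                client -> events.add("destroyContext " + client));
        return events;
    }

    public static void main(String[] args) {
        demo(2).forEach(System.out::println);
    }
}
```

In this model the shared "client" is created once and outlives every session derived from it, which mirrors the thread-safe-client example in step 2.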

Here's a list of pipeline transforms that require a ServiceFactory:

Since:
Jet 4.0
  • Field Details

    • COOPERATIVE_DEFAULT

      public static final boolean COOPERATIVE_DEFAULT
      Default value for isCooperative.
  • Method Details

    • withCreateContextFn

      @Nonnull public static <C> ServiceFactory<C,Void> withCreateContextFn(@Nonnull FunctionEx<? super ProcessorSupplier.Context,? extends C> createContextFn)
      Creates a new ServiceFactory with the given function that creates the shared context object. Make sure to also call withCreateServiceFn(BiFunctionEx) to supply the function that creates the service objects. You can also use the shared context as a shared service object by returning it from createServiceFn; ServiceFactories.sharedService(FunctionEx) does this more conveniently. If you don't need a shared context at all, only independent service instances, use ServiceFactories.nonSharedService(FunctionEx) instead.

      Note: if your service has a blocking API (e.g., doing synchronous IO or acquiring locks), you must call toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which is not a meaningful basis for sizing blocking code.

      Type Parameters:
      C - type of the service context instance
      Parameters:
      createContextFn - the function to create a new context object, given a ProcessorSupplier.Context. Called once per Jet member. It must be stateless.
      Returns:
      a new factory instance, not yet ready to use (needs the createServiceFn)
    • withDestroyContextFn

      @Nonnull public ServiceFactory<C,S> withDestroyContextFn(@Nonnull ConsumerEx<? super C> destroyContextFn)
      Returns a copy of this ServiceFactory with the destroyContext function replaced with the given function.

      Jet calls this function at the end of the job for each shared context object it created (one on each cluster member).

      Parameters:
      destroyContextFn - the function to destroy the shared service context. It must be stateless.
      Returns:
      a copy of this factory with the supplied destroy-function
    • withCreateServiceFn

      @Nonnull public <S_NEW> ServiceFactory<C,S_NEW> withCreateServiceFn(@Nonnull BiFunctionEx<? super Processor.Context,? super C,? extends S_NEW> createServiceFn)
      Returns a copy of this ServiceFactory with the given createService function.

      Jet calls this function to create each parallel instance of the service object (their number on each cluster member is determined by stage.localParallelism). Each invocation gets the shared context instance as the parameter, as well as the lower-level Processor.Context.

      Since the call of this method establishes the <S> type parameter of the service factory, you must call it before setting the destroyService function. Calling this method resets any pre-existing destroyService function to a no-op.

      Parameters:
      createServiceFn - the function that creates the service instance. It must be stateless.
      Returns:
      a copy of this factory with the supplied create-service-function
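
The ordering rule described above (set createServiceFn first, because it resets destroyServiceFn) can be illustrated with a small immutable stand-in. This is only a sketch under stated assumptions: the nested class Factory, its empty() method, and the use of String in place of Processor.Context are hypothetical and are not the Hazelcast classes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;

public class CopyOnWriteFactorySketch {

    /** Hypothetical immutable stand-in; C is the context type, S the service type. */
    static final class Factory<C, S> {
        final BiFunction<String, C, S> createServiceFn; // String stands in for Processor.Context
        final Consumer<S> destroyServiceFn;

        private Factory(BiFunction<String, C, S> createServiceFn, Consumer<S> destroyServiceFn) {
            this.createServiceFn = createServiceFn;
            this.destroyServiceFn = destroyServiceFn;
        }

        /** Starting point with no service type established yet. */
        static <C> Factory<C, Void> empty() {
            return new Factory<>((procCtx, shared) -> null, service -> { });
        }

        /** Establishes the service type and resets the destroy function to a no-op. */
        <S_NEW> Factory<C, S_NEW> withCreateServiceFn(BiFunction<String, C, S_NEW> fn) {
            return new Factory<>(fn, service -> { });
        }

        /** Returns a copy; the receiver is left unchanged. */
        Factory<C, S> withDestroyServiceFn(Consumer<S> fn) {
            return new Factory<>(createServiceFn, fn);
        }
    }

    /** Returns the destroy-call count after using the configured and the replaced factory. */
    static int[] demo() {
        AtomicInteger closed = new AtomicInteger();
        Factory<String, StringBuilder> configured = Factory.<String>empty()
                .withCreateServiceFn((procCtx, shared) -> new StringBuilder(shared))
                .withDestroyServiceFn(sb -> closed.incrementAndGet());
        // Replacing the create function afterwards drops the destroy function set above.
        Factory<String, StringBuilder> replaced =
                configured.withCreateServiceFn((procCtx, shared) -> new StringBuilder(procCtx));

        configured.destroyServiceFn.accept(new StringBuilder()); // counted
        int afterConfigured = closed.get();
        replaced.destroyServiceFn.accept(new StringBuilder());   // no-op, not counted
        int afterReplaced = closed.get();
        return new int[] {afterConfigured, afterReplaced};
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println(counts[0] + " " + counts[1]);
    }
}
```

The design point is that each with-method returns a new object, so a destroy function configured before a later withCreateServiceFn call silently disappears from the resulting copy.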
    • withDestroyServiceFn

      @Nonnull public ServiceFactory<C,S> withDestroyServiceFn(@Nonnull ConsumerEx<? super S> destroyServiceFn)
      Returns a copy of this ServiceFactory with the destroyService function replaced with the given function.

      The destroy function is called at the end of the job to destroy all created service objects.

      Parameters:
      destroyServiceFn - the function to destroy the service instance. This function is called once per processor instance. It must be stateless.
      Returns:
      a copy of this factory with the supplied destroy-function
    • toNonCooperative

      @Nonnull public ServiceFactory<C,S> toNonCooperative()
      Returns a copy of this ServiceFactory with the isCooperative flag set to false. Call this method if your service doesn't follow the cooperative processor contract, that is, if it waits for IO, blocks for synchronization, or takes too long to complete. If the service performs async operations, you can typically use a cooperative processor. Cooperative processors offer higher performance.
      Returns:
      a copy of this factory with the isCooperative flag set to false.
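
To make the distinction between a blocking call and an async operation concrete, here is a plain-JDK sketch. The class AsyncServiceSketch and both lookup methods are hypothetical; the sketch only shows the general shape of a blocking call versus one that returns a CompletableFuture, and whether offloading to a separate thread pool suits a particular Jet job is an assumption to verify.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncServiceSketch {

    /** Hypothetical blocking call: the calling thread waits for the result. */
    static String blockingLookup(String key) {
        try {
            Thread.sleep(50); // simulates waiting for IO
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return key.toUpperCase();
    }

    /** Async variant: returns immediately, the wait happens on the given pool. */
    static CompletableFuture<String> lookupAsync(String key, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> blockingLookup(key), pool);
    }

    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> pending = lookupAsync("jet", pool);
            // The caller could do other work here before collecting the result.
            return pending.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A service exposing only the first method waits on the calling thread and therefore matches the non-cooperative case described above, while the second hands the wait to another thread and returns at once.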
    • setCooperative

      @Nonnull public ServiceFactory<C,S> setCooperative(boolean cooperative)
      Returns a copy of this ServiceFactory with the isCooperative flag set to the value of the cooperative argument.

      Note: if the service will perform async operations, you can typically use a cooperative processor. Cooperative processors offer higher performance.

      Returns:
      a copy of this factory with the isCooperative flag changed.
    • withAttachedFile

      @Nonnull public ServiceFactory<C,S> withAttachedFile(@Nonnull String id, @Nonnull File file)
      Attaches a file to this service factory under the given ID. It will become a part of the Jet job and available to createContextFn() as procSupplierContext.attachedFile(id).
      Returns:
      a copy of this factory with the file attached
      Since:
      Jet 4.0
    • withAttachedDirectory

      @Nonnull public ServiceFactory<C,S> withAttachedDirectory(@Nonnull String id, @Nonnull File directory)
      Attaches a directory to this service factory under the given ID. It will become a part of the Jet job and available to createContextFn() as procSupplierContext.attachedDirectory(id).
      Returns:
      a copy of this factory with the directory attached
      Since:
      Jet 4.0
    • withoutAttachedFiles

      @Nonnull public ServiceFactory<C,S> withoutAttachedFiles()
      Returns a copy of this ServiceFactory with any attached files removed.
      Since:
      Jet 4.0
    • withPermission

      @Nonnull public ServiceFactory<C,S> withPermission(@Nonnull Permission permission)
      Returns a copy of this ServiceFactory with the required permission set. This is an Enterprise feature.
    • createContextFn

      @Nonnull public FunctionEx<? super ProcessorSupplier.Context,? extends C> createContextFn()
      Returns the function that creates the shared context object. Each Jet member creates one such object and passes it to all the parallel service instances.
    • createServiceFn

      @Nonnull public BiFunctionEx<? super Processor.Context,? super C,? extends S> createServiceFn()
      Returns the function that creates the service object. There can be many parallel service objects on each Jet member serving the same pipeline stage; their number is determined by stage.localParallelism.
    • destroyServiceFn

      @Nonnull public ConsumerEx<? super S> destroyServiceFn()
      Returns the function that destroys the service object at the end of the Jet job.
    • destroyContextFn

      @Nonnull public ConsumerEx<? super C> destroyContextFn()
      Returns the function that destroys the shared context object at the end of the Jet job.
    • isCooperative

      public boolean isCooperative()
      Returns the isCooperative flag; see toNonCooperative().
    • attachedFiles

      @Nonnull public Map<String,File> attachedFiles()
      Returns the files and directories attached to this service factory. They will become a part of the Jet job and available to createContextFn() as procSupplierContext.attachedFile(file.toString()) or procSupplierContext.attachedDirectory(directory.toString()).
      Since:
      Jet 4.0
    • permission

      @Nullable public Permission permission()
      Returns the required permission to use this factory. This is an Enterprise feature.
    • clone

      protected ServiceFactory<C,S> clone()
      Overrides:
      clone in class Object