Class GrpcServices

java.lang.Object
com.hazelcast.jet.grpc.GrpcServices

public final class GrpcServices extends Object
Provides ServiceFactory implementations for calling gRPC endpoints. The created ServiceFactory instances are designed to be used with the mapUsingServiceAsync transform.

Currently, two types of gRPC services are supported:

  • unary (see unaryService)
  • bidirectional streaming (see bidirectionalStreamingService)

Since:
Jet 4.1
  • Method Details

    • unaryService

      @Nonnull public static <T, R> ServiceFactory<?,? extends GrpcService<T,R>> unaryService(@Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, @Nonnull FunctionEx<? super io.grpc.ManagedChannel,? extends BiConsumerEx<T,io.grpc.stub.StreamObserver<R>>> callStubFn)
      Creates a ServiceFactory that calls out to a unary gRPC service.

      For example, given the protobuf definition below:

      
       service Greeter {
         // Sends a greeting
         rpc SayHello (HelloRequest) returns (HelloReply) {}
       }
       
      We can create the following service factory:
      
       ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = unaryService(
           () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
           channel -> GreeterGrpc.newStub(channel)::sayHello
       );
       
      where GreeterGrpc is the class auto-generated by the protobuf compiler.

      The created ServiceFactory should be used with the mapUsingServiceAsync transform as follows:

      
       Pipeline p = Pipeline.create();
       p.readFrom(TestSources.items("one", "two", "three", "four"))
           .mapUsingServiceAsync(greeterService, (service, input) -> {
               HelloRequest request = HelloRequest.newBuilder().setName(input).build();
               return service.call(request);
           })
           .writeTo(Sinks.logger());
       

      The remote end can signal an error for a given input item. In that case the CompletableFuture returned from service.call(request) will be completed with that exception. To catch and handle it, use the CompletableFuture API.
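      As a minimal sketch of that error handling, the stand-alone example below uses only java.util.concurrent. The fakeCall method is a hypothetical stand-in for service.call(request), and the fallback string is an assumption chosen for illustration; inside a pipeline the same exceptionally(...) call would be chained onto the future returned by service.call(request).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Stand-alone sketch; fakeCall is a hypothetical stand-in for service.call(request).
final class GrpcErrorHandlingSketch {

    // Simulates a remote call: fails for an empty name, succeeds otherwise.
    static CompletableFuture<String> fakeCall(String name) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (name.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("name must not be empty"));
        } else {
            future.complete("Hello " + name);
        }
        return future;
    }

    // Replaces a failed call with a fallback value so the item is not lost.
    static CompletableFuture<String> callWithFallback(String name) {
        return fakeCall(name).exceptionally(e -> {
            // Dependent stages may wrap the failure in a CompletionException; unwrap it.
            Throwable cause = (e instanceof CompletionException && e.getCause() != null)
                    ? e.getCause() : e;
            return "error: " + cause.getMessage();
        });
    }

    public static void main(String[] args) {
        System.out.println(callWithFallback("one").join()); // prints "Hello one"
        System.out.println(callWithFallback("").join());    // prints "error: name must not be empty"
    }
}
```

      Mapping a failure to a value keeps the job running; rethrowing from the handler instead would propagate the failure to the caller of the future.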

      Type Parameters:
      T - type of the request object
      R - type of the response object
      Parameters:
      channelFn - creates the channel builder. A single channel is created per processor instance.
      callStubFn - a function which, given a channel, creates the stub and returns a function that calls the stub with the input item and the response observer. The returned function is called once per input item.
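      To illustrate the shape of callStubFn, the sketch below shows one way a callback-style stub call could be adapted to a CompletableFuture. It is not the library's implementation: Observer and Channel are hypothetical local stand-ins for io.grpc.stub.StreamObserver and io.grpc.ManagedChannel, and the greeting logic is invented so the example runs without gRPC.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Sketch only: Observer and Channel are hypothetical stand-ins for the gRPC types.
final class UnaryCallSketch {

    interface Observer<R> {
        void onNext(R value);
        void onError(Throwable t);
        void onCompleted();
    }

    static final class Channel { }

    // Same shape as callStubFn: given a channel, returns a function that
    // calls the "stub" with the input item and the response observer.
    static BiConsumer<String, Observer<String>> callStub(Channel channel) {
        return (name, observer) -> {
            if (name.isEmpty()) {
                observer.onError(new IllegalArgumentException("empty name"));
                return;
            }
            observer.onNext("Hello " + name);
            observer.onCompleted();
        };
    }

    // Invokes the stub function once for the item and exposes the result as a future.
    static <T, R> CompletableFuture<R> call(BiConsumer<T, Observer<R>> stubCall, T item) {
        CompletableFuture<R> future = new CompletableFuture<>();
        stubCall.accept(item, new Observer<R>() {
            @Override public void onNext(R value) { future.complete(value); }
            @Override public void onError(Throwable t) { future.completeExceptionally(t); }
            @Override public void onCompleted() { /* unary: the value arrived in onNext */ }
        });
        return future;
    }

    public static void main(String[] args) {
        BiConsumer<String, Observer<String>> stub = callStub(new Channel());
        System.out.println(call(stub, "two").join()); // prints "Hello two"
    }
}
```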
    • bidirectionalStreamingService

      @Nonnull public static <T, R> ServiceFactory<?,? extends GrpcService<T,R>> bidirectionalStreamingService(@Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, @Nonnull FunctionEx<? super io.grpc.ManagedChannel,? extends FunctionEx<io.grpc.stub.StreamObserver<R>,io.grpc.stub.StreamObserver<T>>> callStubFn)
      Creates a ServiceFactory that calls out to a bidirectional streaming gRPC service. This may provide better throughput compared to the unary service because all communication happens within a single gRPC call, eliminating some overheads.

      For example, given the protobuf definition below:

      
       service Greeter {
         // Sends a greeting
         rpc SayHello (stream HelloRequest) returns (stream HelloReply) {}
       }
       
      We can create the following service factory:
      
       ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService =
           bidirectionalStreamingService(
               () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
               channel -> GreeterGrpc.newStub(channel)::sayHello
       );
       
      where GreeterGrpc is the class auto-generated by the protobuf compiler.

      The created ServiceFactory should be used with the mapUsingServiceAsync transform as follows:

      
       Pipeline p = Pipeline.create();
       p.readFrom(TestSources.items("one", "two", "three", "four"))
           .mapUsingServiceAsync(greeterService, (service, input) -> {
               HelloRequest request = HelloRequest.newBuilder().setName(input).build();
               return service.call(request);
           })
           .writeTo(Sinks.logger());
       

      The remote end can signal an error for a given input item. In that case the CompletableFuture returned from service.call(request) will be completed with that exception. To catch and handle it, use the CompletableFuture API.

      Type Parameters:
      T - type of the request object
      R - type of the response object
      Parameters:
      channelFn - creates the channel builder. A single channel is created per processor instance.
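      callStubFn - a function which, given a channel, creates the stub and returns a function that, given the observer for responses, calls the stub and returns the observer for requests. Because all communication happens within a single gRPC call, input items are sent through the returned request observer rather than through a separate stub call per item.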
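      The sketch below illustrates how a single bidirectional stream might serve many call(...) invocations. It is an illustration under stated assumptions, not the library's implementation: Observer is a hypothetical stand-in for io.grpc.stub.StreamObserver, echoGreeter is an invented in-process "server", responses are assumed to arrive in request order, and the code is single-threaded for brevity.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Sketch only: single-threaded, and assumes responses arrive in request order.
final class BidiCallSketch<T, R> {

    // Hypothetical stand-in for io.grpc.stub.StreamObserver.
    interface Observer<V> {
        void onNext(V value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Futures waiting for a response, oldest first.
    private final Queue<CompletableFuture<R>> pending = new ArrayDeque<>();
    private final Observer<T> requests;

    // Opens the stream once: passes in the response observer, keeps the request observer.
    BidiCallSketch(Function<Observer<R>, Observer<T>> openStream) {
        this.requests = openStream.apply(new Observer<R>() {
            @Override public void onNext(R value) { pending.remove().complete(value); }
            @Override public void onError(Throwable t) {
                CompletableFuture<R> f;
                while ((f = pending.poll()) != null) {
                    f.completeExceptionally(t);
                }
            }
            @Override public void onCompleted() { }
        });
    }

    // Sends one item on the already open stream and returns a future for its response.
    CompletableFuture<R> call(T item) {
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.add(future);
        requests.onNext(item);
        return future;
    }

    // Invented in-process "server" that answers each request immediately.
    static Function<Observer<String>, Observer<String>> echoGreeter() {
        return responses -> new Observer<String>() {
            @Override public void onNext(String name) { responses.onNext("Hello " + name); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { responses.onCompleted(); }
        };
    }

    public static void main(String[] args) {
        BidiCallSketch<String, String> service = new BidiCallSketch<>(echoGreeter());
        System.out.println(service.call("one").join()); // prints "Hello one"
        System.out.println(service.call("two").join()); // prints "Hello two"
    }
}
```

      Matching responses to futures by arrival order keeps the sketch small; a real service whose responses can be reordered would need an explicit correlation identifier in the messages.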