Class GrpcServices

    • Method Detail

      • unaryService

        @Nonnull
        public static <T, R> ServiceFactory<?, ? extends GrpcService<T, R>> unaryService(
                @Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn,
                @Nonnull FunctionEx<? super io.grpc.ManagedChannel, ? extends BiConsumerEx<T, io.grpc.stub.StreamObserver<R>>> callStubFn)
        Creates a ServiceFactory that calls out to a unary gRPC service.

        For example, given the protobuf definition below:

        
         service Greeter {
           // Sends a greeting
           rpc SayHello (HelloRequest) returns (HelloReply) {}
         }
         
        We can create the following service factory:
        
         ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = unaryService(
             () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
             channel -> GreeterGrpc.newStub(channel)::sayHello
         );
         
        where GreeterGrpc is the class auto-generated by the protobuf compiler.

        The created ServiceFactory should be used with the mapUsingServiceAsync transform as follows:

        
         Pipeline p = Pipeline.create();
         p.readFrom(TestSources.items("one", "two", "three", "four"))
          .mapUsingServiceAsync(greeterService, (service, input) -> {
              HelloRequest request = HelloRequest.newBuilder().setName(input).build();
              return service.call(request);
          })
          .writeTo(Sinks.logger());
         

        The remote end can signal an error for a given input item. In that case the CompletableFuture returned from service.call(request) will be completed with that exception. To catch and handle it, use the CompletableFuture API.
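
        As a minimal sketch of such handling, the snippet below uses only java.util.concurrent.CompletableFuture. The failed future stands in for the one returned by service.call(request); the class name GrpcErrorHandlingSketch, the helper withFallback and the fallback value are hypothetical and not part of the API described here.

```java
import java.util.concurrent.CompletableFuture;

public class GrpcErrorHandlingSketch {

    // Hypothetical helper: if the call failed, complete with a fallback value instead.
    static CompletableFuture<String> withFallback(CompletableFuture<String> callResult, String fallback) {
        return callResult.exceptionally(error -> fallback);
    }

    public static void main(String[] args) {
        // Stand-in for a call whose remote end signalled an error.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("UNAVAILABLE"));

        // Stand-in for a call that succeeded.
        CompletableFuture<String> ok = CompletableFuture.completedFuture("Hello one");

        System.out.println(withFallback(failed, "n/a").join()); // prints "n/a"
        System.out.println(withFallback(ok, "n/a").join());     // prints "Hello one"
    }
}
```

        Inside the mapping function, the same exceptionally step could be chained directly onto the future returned by service.call(request).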

        Type Parameters:
        T - type of the request object
        R - type of the response object
        Parameters:
        channelFn - creates the channel builder. A single channel is created per processor instance.
        callStubFn - a function which, given a channel, creates the stub and returns a function that calls the stub given the input item and the observer. The returned function is called once per input item.
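
        The shape of the function returned by callStubFn (input item plus observer) can be illustrated without any gRPC dependency. The sketch below is not the library's implementation: Observer is a local stand-in for io.grpc.stub.StreamObserver, and UnaryCallSketch, call and the in-process sayHello stub are hypothetical names. It shows one plausible way a per-item invocation could be bridged to a CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class UnaryCallSketch {

    // Local stand-in for io.grpc.stub.StreamObserver so the sketch compiles without gRPC.
    interface Observer<R> {
        void onNext(R value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Invokes the stub-calling function once for a single item and exposes the reply as a future.
    static <T, R> CompletableFuture<R> call(BiConsumer<T, Observer<R>> callStub, T request) {
        CompletableFuture<R> future = new CompletableFuture<>();
        callStub.accept(request, new Observer<R>() {
            @Override
            public void onNext(R value) {
                future.complete(value);
            }

            @Override
            public void onError(Throwable t) {
                future.completeExceptionally(t);
            }

            @Override
            public void onCompleted() {
                // Nothing to do: a unary call delivers its single reply through onNext.
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // Hypothetical in-process "stub" that replies immediately instead of going over the network.
        BiConsumer<String, Observer<String>> sayHello = (name, observer) -> {
            observer.onNext("Hello " + name);
            observer.onCompleted();
        };
        System.out.println(call(sayHello, "one").join()); // prints "Hello one"
    }
}
```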
      • bidirectionalStreamingService

        @Nonnull
        public static <T, R> ServiceFactory<?, ? extends GrpcService<T, R>> bidirectionalStreamingService(
                @Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn,
                @Nonnull FunctionEx<? super io.grpc.ManagedChannel, ? extends FunctionEx<io.grpc.stub.StreamObserver<R>, io.grpc.stub.StreamObserver<T>>> callStubFn)
        Creates a ServiceFactory that calls out to a bidirectional streaming gRPC service. This may provide better throughput compared to the unary service because all communication happens within a single gRPC call, eliminating some overheads.

        For example, given the protobuf definition below:

        
         service Greeter {
           // Sends a greeting
           rpc SayHello (stream HelloRequest) returns (stream HelloReply) {}
         }
         
        We can create the following service factory:
        
         ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService =
             bidirectionalStreamingService(
                 () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
                 channel -> GreeterGrpc.newStub(channel)::sayHello
         );
         
        where GreeterGrpc is the class auto-generated by the protobuf compiler.

        The created ServiceFactory should be used with the mapUsingServiceAsync transform as follows:

        
         Pipeline p = Pipeline.create();
         p.readFrom(TestSources.items("one", "two", "three", "four"))
          .mapUsingServiceAsync(greeterService, (service, input) -> {
              HelloRequest request = HelloRequest.newBuilder().setName(input).build();
              return service.call(request);
          })
          .writeTo(Sinks.logger());
         

        The remote end can signal an error for a given input item. In that case the CompletableFuture returned from service.call(request) will be completed with that exception. To catch and handle it, use the CompletableFuture API.

        Type Parameters:
        T - type of the request object
        R - type of the response object
        Parameters:
        channelFn - creates the channel builder. A single channel is created per processor instance.
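        callStubFn - a function which, given a channel, creates the stub and returns a function that, given the observer for responses, starts the streaming call and returns the observer to which the input items are sent. Because all communication happens within a single gRPC call, every input item is sent through that same request observer.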
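
        The signature above implies a function that takes the observer for responses and returns the observer for requests. The sketch below illustrates only that shape, with no gRPC dependency and no claim about how the library matches replies to requests: Observer is a local stand-in for io.grpc.stub.StreamObserver, and BidiCallSketch, sayHello and sendAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class BidiCallSketch {

    // Local stand-in for io.grpc.stub.StreamObserver so the sketch compiles without gRPC.
    interface Observer<V> {
        void onNext(V value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Hypothetical in-process "streaming stub": takes the response observer, returns the request observer.
    static Observer<String> sayHello(Observer<String> responses) {
        return new Observer<String>() {
            @Override
            public void onNext(String name) {
                responses.onNext("Hello " + name);
            }

            @Override
            public void onError(Throwable t) {
                responses.onError(t);
            }

            @Override
            public void onCompleted() {
                responses.onCompleted();
            }
        };
    }

    // Opens the stream once, then sends every item through the same request observer.
    static List<String> sendAll(Function<Observer<String>, Observer<String>> callStub, List<String> names) {
        List<String> replies = new ArrayList<>();
        Observer<String> requests = callStub.apply(new Observer<String>() {
            @Override
            public void onNext(String reply) {
                replies.add(reply);
            }

            @Override
            public void onError(Throwable t) {
                replies.add("error: " + t.getMessage());
            }

            @Override
            public void onCompleted() {
                // The stream is finished; nothing else to collect.
            }
        });
        for (String name : names) {
            requests.onNext(name);
        }
        requests.onCompleted();
        return replies;
    }

    public static void main(String[] args) {
        // prints "[Hello one, Hello two]"
        System.out.println(sendAll(BidiCallSketch::sayHello, Arrays.asList("one", "two")));
    }
}
```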