Class GrpcServices
ServiceFactory implementations for calling gRPC
 endpoints. The created ServiceFactory instances are designed to be
 used with the mapUsingServiceAsync
 transform.
 Currently, two types of gRPC services are supported:
- unary: unaryService(SupplierEx, FunctionEx)
- bidirectional streaming: bidirectionalStreamingService(SupplierEx, FunctionEx)
- Since:
 - Jet 4.1
 
- 
Method Summary
Modifier and Type: static <T,R> ServiceFactory<?, ? extends GrpcService<T, R>>
Method: bidirectionalStreamingService(SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, FunctionEx<? super io.grpc.ManagedChannel, ? extends FunctionEx<io.grpc.stub.StreamObserver<R>, io.grpc.stub.StreamObserver<T>>> callStubFn)
Description: Creates a ServiceFactory that calls out to a bidirectional streaming gRPC service.

Modifier and Type: static <T,R> ServiceFactory<?, ? extends GrpcService<T, R>>
Method: unaryService(SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, FunctionEx<? super io.grpc.ManagedChannel, ? extends BiConsumerEx<T, io.grpc.stub.StreamObserver<R>>> callStubFn)
Description: Creates a ServiceFactory that calls out to a unary gRPC service.
- 
Method Details
- 
unaryService
@Nonnull public static <T,R> ServiceFactory<?,? extends GrpcService<T,R>> unaryService(@Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, @Nonnull FunctionEx<? super io.grpc.ManagedChannel, ? extends BiConsumerEx<T, io.grpc.stub.StreamObserver<R>>> callStubFn)
Creates a ServiceFactory that calls out to a unary gRPC service.
For example, given the protobuf definition below:

    service Greeter {
      // Sends a greeting
      rpc SayHello (HelloRequest) returns (HelloReply) {}
    }

We can create the following service factory:

    ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = unaryService(
        () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
        channel -> GreeterGrpc.newStub(channel)::sayHello
    );

where GreeterGrpc is the class auto-generated by the protobuf compiler.
The created ServiceFactory should be used with the mapUsingServiceAsync transform as follows:

    Pipeline p = Pipeline.create();
    p.readFrom(TestSources.items("one", "two", "three", "four"))
     .mapUsingServiceAsync(greeterService, (service, input) -> {
         HelloRequest request = HelloRequest.newBuilder().setName(input).build();
         return service.call(request);
     })
     .writeTo(Sinks.logger());

The remote end can signal an error for a given input item. In that case the CompletableFuture returned from service.call(request) will be completed with that exception. To catch and handle it, use the CompletableFuture API.
- Type Parameters:
 T - type of the request object
 R - type of the response object
- Parameters:
 channelFn - creates the channel builder. A single channel is created per processor instance.
 callStubFn - a function which, given a channel, creates the stub and returns a function that calls the stub given the input item and the observer. It will be called once per input item.
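The error handling described above can be sketched with the JDK alone. In this minimal sketch, `call` is a hypothetical stand-in for `service.call(request)` (it is not part of the Jet API) and it fails for an empty name to imitate a remote end signalling an error for one input item. `exceptionally` is one of several `CompletableFuture` methods that could be used here; `handle` or `whenComplete` would work as well.

```java
import java.util.concurrent.CompletableFuture;

public class GrpcErrorHandlingSketch {

    // Hypothetical stand-in for service.call(request): completes exceptionally
    // for an empty name, the way a remote end might reject one input item.
    static CompletableFuture<String> call(String name) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (name.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("empty name"));
        } else {
            future.complete("Hello " + name);
        }
        return future;
    }

    // Maps a failed call to a fallback value instead of propagating the exception.
    static CompletableFuture<String> callWithFallback(String name) {
        return call(name).exceptionally(t -> "error: " + t.getMessage());
    }

    public static void main(String[] args) {
        System.out.println(callWithFallback("one").join());
        System.out.println(callWithFallback("").join());
    }
}
```

In a pipeline, the same `exceptionally` call would presumably be chained onto the future returned inside the `mapUsingServiceAsync` lambda, so that a single failed item yields a fallback value rather than a failed future.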
 - 
bidirectionalStreamingService
@Nonnull public static <T,R> ServiceFactory<?,? extends GrpcService<T,R>> bidirectionalStreamingService(@Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, @Nonnull FunctionEx<? super io.grpc.ManagedChannel, ? extends FunctionEx<io.grpc.stub.StreamObserver<R>, io.grpc.stub.StreamObserver<T>>> callStubFn)
Creates a ServiceFactory that calls out to a bidirectional streaming gRPC service. This may provide better throughput compared to the unary service because all communication happens within a single gRPC call, eliminating some overheads.
For example, given the protobuf definition below:

    service Greeter {
      // Sends a greeting
      rpc SayHello (stream HelloRequest) returns (stream HelloReply) {}
    }

We can create the following service factory:

    ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = bidirectionalStreamingService(
        () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
        channel -> GreeterGrpc.newStub(channel)::sayHello
    );

where GreeterGrpc is the class auto-generated by the protobuf compiler.
The created ServiceFactory should be used with the mapUsingServiceAsync transform as follows:

    Pipeline p = Pipeline.create();
    p.readFrom(TestSources.items("one", "two", "three", "four"))
     .mapUsingServiceAsync(greeterService, (service, input) -> {
         HelloRequest request = HelloRequest.newBuilder().setName(input).build();
         return service.call(request);
     })
     .writeTo(Sinks.logger());

The remote end can signal an error for a given input item. In that case the CompletableFuture returned from service.call(request) will be completed with that exception. To catch and handle it, use the CompletableFuture API.
- Type Parameters:
 T - type of the request object
 R - type of the response object
- Parameters:
 channelFn - creates the channel builder. A single channel is created per processor instance.
 callStubFn - a function which, given a channel, creates the stub and returns a function that calls the stub given the input item and the observer. It will be called once per input item.
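The signature above types the inner function of callStubFn as `FunctionEx<StreamObserver<R>, StreamObserver<T>>`: it takes the observer that will receive responses and returns the observer to which requests are written. The following is a JDK-only sketch of that shape, not the Jet or gRPC implementation. The nested `StreamObserver` interface is a hypothetical stand-in that mirrors the three callbacks of `io.grpc.stub.StreamObserver`, and `sayHello` is a hypothetical in-process substitute for a generated stub method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class BidiStubShapeSketch {

    // Hypothetical stand-in mirroring the callbacks of io.grpc.stub.StreamObserver.
    interface StreamObserver<V> {
        void onNext(V value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Hypothetical in-process "server": for each request it pushes one greeting
    // to the response observer, all within the same logical call.
    static StreamObserver<String> sayHello(StreamObserver<String> responses) {
        return new StreamObserver<String>() {
            @Override public void onNext(String name) { responses.onNext("Hello " + name); }
            @Override public void onError(Throwable t) { responses.onError(t); }
            @Override public void onCompleted() { responses.onCompleted(); }
        };
    }

    // Sends all names over one "call" and collects the replies in arrival order.
    static List<String> greetAll(List<String> names) {
        List<String> replies = new ArrayList<>();
        StreamObserver<String> collector = new StreamObserver<String>() {
            @Override public void onNext(String reply) { replies.add(reply); }
            @Override public void onError(Throwable t) { replies.add("error: " + t.getMessage()); }
            @Override public void onCompleted() { replies.add("<completed>"); }
        };
        // Same shape as the inner function: response observer in, request observer out.
        Function<StreamObserver<String>, StreamObserver<String>> startCall =
                BidiStubShapeSketch::sayHello;
        StreamObserver<String> requests = startCall.apply(collector);
        for (String name : names) {
            requests.onNext(name);
        }
        requests.onCompleted();
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(greetAll(Arrays.asList("one", "two")));
    }
}
```

The sketch opens the call once and then writes every request to the same request observer, which illustrates why a single streaming call can avoid the per-item overhead of separate unary calls.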
 
 -