Class GrpcServices
Provides ServiceFactory implementations for calling gRPC endpoints. The ServiceFactory instances created are designed to be used with the mapUsingServiceAsync transform.
Currently, two types of gRPC services are supported:
- unary, via unaryService(SupplierEx, FunctionEx)
- bidirectional streaming, via bidirectionalStreamingService(SupplierEx, FunctionEx)
Since:
Jet 4.1
Method Summary
Modifier and Type: static <T, R> ServiceFactory<?, ? extends GrpcService<T, R>>
Method: bidirectionalStreamingService(SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, FunctionEx<? super io.grpc.ManagedChannel, ? extends FunctionEx<io.grpc.stub.StreamObserver<R>, io.grpc.stub.StreamObserver<T>>> callStubFn)
Description: Creates a ServiceFactory that calls out to a bidirectional streaming gRPC service.

Modifier and Type: static <T, R> ServiceFactory<?, ? extends GrpcService<T, R>>
Method: unaryService(SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, FunctionEx<? super io.grpc.ManagedChannel, ? extends BiConsumerEx<T, io.grpc.stub.StreamObserver<R>>> callStubFn)
Description: Creates a ServiceFactory that calls out to a unary gRPC service.
Method Details
unaryService
@Nonnull public static <T, R> ServiceFactory<?, ? extends GrpcService<T, R>> unaryService(@Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, @Nonnull FunctionEx<? super io.grpc.ManagedChannel, ? extends BiConsumerEx<T, io.grpc.stub.StreamObserver<R>>> callStubFn)
Creates a ServiceFactory that calls out to a unary gRPC service.
For example, given the protobuf definition below:

    service Greeter {
      // Sends a greeting
      rpc SayHello (HelloRequest) returns (HelloReply) {}
    }

We can create the following service factory:

    ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = unaryService(
        () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
        channel -> GreeterGrpc.newStub(channel)::sayHello
    );

where GreeterGrpc is the class auto-generated by the protobuf compiler.
The created ServiceFactory should be used with the mapUsingServiceAsync transform as follows:

    Pipeline p = Pipeline.create();
    p.readFrom(TestSources.items("one", "two", "three", "four"))
     .mapUsingServiceAsync(greeterService, (service, input) -> {
         HelloRequest request = HelloRequest.newBuilder().setName(input).build();
         return service.call(request);
     })
     .writeTo(Sinks.logger());

The remote end can signal an error for a given input item. In that case the CompletableFuture returned from service.call(request) will be completed with that exception. To catch and handle it, use the CompletableFuture API.
Type Parameters:
T - type of the request object
R - type of the response object
Parameters:
channelFn - creates the channel builder. A single channel is created per processor instance.
callStubFn - a function which, given a channel, creates the stub and returns a function that calls the stub given the input item and the observer. It will be called once per input item.
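The error-handling note for unaryService can be illustrated without a running gRPC server. The sketch below is not Jet's implementation: the nested Observer interface is a local stand-in that mirrors the three callbacks of io.grpc.stub.StreamObserver, and sayHello, call, and greetOrFallback are hypothetical names introduced only for this example. It shows how a callback-style unary call of the shape (request, observer) can be surfaced as a CompletableFuture, and how a remote error can then be handled with exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class UnaryCallSketch {

    /** Local stand-in mirroring the callbacks of io.grpc.stub.StreamObserver (assumption for this sketch). */
    public interface Observer<V> {
        void onNext(V value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Adapts a callback-style unary call into a CompletableFuture (illustrative only). */
    public static <T, R> CompletableFuture<R> call(BiConsumer<T, Observer<R>> stubCall, T request) {
        CompletableFuture<R> future = new CompletableFuture<>();
        stubCall.accept(request, new Observer<R>() {
            @Override public void onNext(R value) { future.complete(value); }
            @Override public void onError(Throwable t) { future.completeExceptionally(t); }
            @Override public void onCompleted() { /* a unary call has nothing more to deliver */ }
        });
        return future;
    }

    /** Hypothetical in-process greeter: signals an error for empty names, greets everyone else. */
    public static void sayHello(String name, Observer<String> observer) {
        if (name.isEmpty()) {
            observer.onError(new IllegalArgumentException("empty name"));
        } else {
            observer.onNext("Hello " + name);
            observer.onCompleted();
        }
    }

    /** Calls the greeter and maps a remote failure to a fallback value. */
    public static String greetOrFallback(String name) {
        CompletableFuture<String> reply = call(UnaryCallSketch::sayHello, name);
        return reply
                .exceptionally(t -> "failed: " + t.getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(greetOrFallback("one"));
        System.out.println(greetOrFallback(""));
    }
}
```

Inside a pipeline, the same exceptionally (or handle) call would be chained onto the future returned from service.call(request) before returning it from the mapping function.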
bidirectionalStreamingService
@Nonnull public static <T, R> ServiceFactory<?, ? extends GrpcService<T, R>> bidirectionalStreamingService(@Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, @Nonnull FunctionEx<? super io.grpc.ManagedChannel, ? extends FunctionEx<io.grpc.stub.StreamObserver<R>, io.grpc.stub.StreamObserver<T>>> callStubFn)
Creates a ServiceFactory that calls out to a bidirectional streaming gRPC service. This may provide better throughput compared to the unary service because all communication happens within a single gRPC call, eliminating some overheads.
For example, given the protobuf definition below:

    service Greeter {
      // Sends a greeting
      rpc SayHello (stream HelloRequest) returns (stream HelloReply) {}
    }

We can create the following service factory:

    ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = bidirectionalStreamingService(
        () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
        channel -> GreeterGrpc.newStub(channel)::sayHello
    );

where GreeterGrpc is the class auto-generated by the protobuf compiler.
The created ServiceFactory should be used with the mapUsingServiceAsync transform as follows:

    Pipeline p = Pipeline.create();
    p.readFrom(TestSources.items("one", "two", "three", "four"))
     .mapUsingServiceAsync(greeterService, (service, input) -> {
         HelloRequest request = HelloRequest.newBuilder().setName(input).build();
         return service.call(request);
     })
     .writeTo(Sinks.logger());

The remote end can signal an error for a given input item. In that case the CompletableFuture returned from service.call(request) will be completed with that exception. To catch and handle it, use the CompletableFuture API.
Type Parameters:
T - type of the request object
R - type of the response object
Parameters:
channelFn - creates the channel builder. A single channel is created per processor instance.
callStubFn - a function which, given a channel, creates the stub and returns a function that calls the stub given the input item and the observer. It will be called once per input item.
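The callStubFn signature for bidirectionalStreamingService wraps a function from a response observer to a request observer. The sketch below illustrates that shape in-process, under stated assumptions: the nested Observer interface is a local stand-in mirroring io.grpc.stub.StreamObserver, and sayHello and greetAll are hypothetical names. It runs synchronously and does not reflect how Jet or gRPC schedule the exchange. It only shows that one call is opened once and then carries every request and reply.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class BidiCallSketch {

    /** Local stand-in mirroring the callbacks of io.grpc.stub.StreamObserver (assumption for this sketch). */
    public interface Observer<V> {
        void onNext(V value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical streaming greeter: given the response observer, returns the request observer. */
    public static Observer<String> sayHello(Observer<String> responses) {
        return new Observer<String>() {
            @Override public void onNext(String name) { responses.onNext("Hello " + name); }
            @Override public void onError(Throwable t) { responses.onError(t); }
            @Override public void onCompleted() { responses.onCompleted(); }
        };
    }

    /** Sends all names over a single call and collects the replies in arrival order. */
    public static List<String> greetAll(List<String> names) {
        List<String> replies = new ArrayList<>();
        Function<Observer<String>, Observer<String>> callStub = BidiCallSketch::sayHello;

        // Open the call once: hand over the response observer, keep the request observer.
        Observer<String> requests = callStub.apply(new Observer<String>() {
            @Override public void onNext(String reply) { replies.add(reply); }
            @Override public void onError(Throwable t) { replies.add("error: " + t.getMessage()); }
            @Override public void onCompleted() { replies.add("<completed>"); }
        });

        // Every item reuses the same open call instead of starting a new one.
        for (String name : names) {
            requests.onNext(name);
        }
        requests.onCompleted();
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(greetAll(List.of("one", "two", "three", "four")));
    }
}
```

With a real generated stub, GreeterGrpc.newStub(channel)::sayHello would take the place of BidiCallSketch::sayHello, and replies would arrive asynchronously.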