public final class GrpcServices extends Object
Provides ServiceFactory implementations for calling gRPC endpoints.
The created ServiceFactory instances are designed to be used with the
mapUsingServiceAsync transform.
Currently two types of gRPC services are supported:
- unary: unaryService(SupplierEx, FunctionEx)
- bidirectional streaming: bidirectionalStreamingService(SupplierEx, FunctionEx)

Modifier and Type | Method and Description |
---|---|
static <T,R> ServiceFactory<?,? extends GrpcService<T,R>> |
bidirectionalStreamingService(SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn,
FunctionEx<? super io.grpc.ManagedChannel,? extends FunctionEx<io.grpc.stub.StreamObserver<R>,io.grpc.stub.StreamObserver<T>>> callStubFn)
Creates a
ServiceFactory that calls out to a
bidirectional streaming gRPC service. |
static <T,R> ServiceFactory<?,? extends GrpcService<T,R>> |
unaryService(SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn,
FunctionEx<? super io.grpc.ManagedChannel,? extends BiConsumerEx<T,io.grpc.stub.StreamObserver<R>>> callStubFn)
Creates a
ServiceFactory that calls out to a
unary gRPC service. |
@Nonnull public static <T,R> ServiceFactory<?,? extends GrpcService<T,R>> unaryService(@Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, @Nonnull FunctionEx<? super io.grpc.ManagedChannel,? extends BiConsumerEx<T,io.grpc.stub.StreamObserver<R>>> callStubFn)
Creates a ServiceFactory that calls out to a unary gRPC service.
For example, given the protobuf definition below:
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
We can create the following service factory:
ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = unaryService(
() -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
channel -> GreeterGrpc.newStub(channel)::sayHello
);
where GreeterGrpc
is the class auto-generated by the protobuf
compiler.
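To illustrate how a stub method that takes a request and an observer can surface as a CompletableFuture, here is a minimal sketch. It deliberately avoids Hazelcast and gRPC classes: Observer is a hypothetical stand-in mirroring the callbacks of io.grpc.stub.StreamObserver, and call and sayHello are illustrative names, not the library's actual implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class UnaryBridgeSketch {

    // Hypothetical stand-in mirroring the callbacks of io.grpc.stub.StreamObserver.
    interface Observer<V> {
        void onNext(V value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Invokes the stub method with the request and an observer that
    // completes a future with the first response or with the error.
    static <T, R> CompletableFuture<R> call(BiConsumer<T, Observer<R>> stubMethod, T request) {
        CompletableFuture<R> future = new CompletableFuture<>();
        stubMethod.accept(request, new Observer<R>() {
            @Override public void onNext(R value) { future.complete(value); }
            @Override public void onError(Throwable t) { future.completeExceptionally(t); }
            @Override public void onCompleted() { /* nothing left to do for a unary call */ }
        });
        return future;
    }

    // Stand-in for a generated stub method such as GreeterGrpc.newStub(channel)::sayHello.
    static void sayHello(String name, Observer<String> responses) {
        responses.onNext("Hello " + name);
        responses.onCompleted();
    }

    public static void main(String[] args) {
        BiConsumer<String, Observer<String>> stubMethod = UnaryBridgeSketch::sayHello;
        System.out.println(call(stubMethod, "one").join());
    }
}
```

The method reference has the same shape as the BiConsumerEx<T, StreamObserver<R>> that callStubFn is expected to return: one request in, one observer to report the response on.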
The created ServiceFactory
should be used with the
mapUsingServiceAsync
transform as follows:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("one", "two", "three", "four"))
.mapUsingServiceAsync(greeterService, (service, input) -> {
HelloRequest request = HelloRequest.newBuilder().setName(input).build();
return service.call(request);
})
.writeTo(Sinks.logger());
The remote end can signal an error for a given input item. In that case
the CompletableFuture
returned from service.call(request)
will be completed with that exception. To catch and handle it, use the
CompletableFuture
API.
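As a minimal sketch of such handling, the example below uses only java.util.concurrent.CompletableFuture. The failingCall method is a hypothetical stand-in for service.call(request) when the remote end signals an error, and the fallback value is an assumption made for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class CallErrorHandlingSketch {

    // Hypothetical stand-in for service.call(request): returns a future that
    // is already completed exceptionally, as if the remote end signalled an error.
    static CompletableFuture<String> failingCall(String name) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("remote error for " + name));
        return future;
    }

    // Recovers from a failed call by mapping the exception to a fallback value.
    // A successful result passes through unchanged.
    static CompletableFuture<String> withFallback(CompletableFuture<String> call, String fallback) {
        return call.exceptionally(error -> fallback);
    }

    public static void main(String[] args) {
        String result = withFallback(failingCall("one"), "unknown").join();
        System.out.println(result);
    }
}
```

In the pipeline shown earlier, the same exceptionally step could be chained onto service.call(request) before returning it, so that a failed item yields a substitute response instead of failing the stage. Whether substituting a value is appropriate depends on the job.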
T - type of the request object
R - type of the response object
channelFn - creates the channel builder. A single channel is created per processor instance.
callStubFn - a function which, given a channel, creates the stub and returns a
function that calls the stub given the input item and the observer.
It will be called once per input item.

@Nonnull public static <T,R> ServiceFactory<?,? extends GrpcService<T,R>> bidirectionalStreamingService(@Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, @Nonnull FunctionEx<? super io.grpc.ManagedChannel,? extends FunctionEx<io.grpc.stub.StreamObserver<R>,io.grpc.stub.StreamObserver<T>>> callStubFn)
Creates a ServiceFactory that calls out to a
bidirectional streaming gRPC service. This may provide better
throughput than the unary service (see unaryService(SupplierEx, FunctionEx))
because all communication happens within a single gRPC
call, eliminating some overheads.
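The single-call shape can be sketched without any gRPC dependency. In the example below, Observer is a hypothetical stand-in mirroring the callbacks of io.grpc.stub.StreamObserver, and sayHello and greetAll are illustrative names. It only shows how one function application opens the call and every item then flows through the returned request observer. It is not the library's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class BidiShapeSketch {

    // Hypothetical stand-in mirroring the callbacks of io.grpc.stub.StreamObserver.
    interface Observer<V> {
        void onNext(V value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Stand-in for a streaming stub method: takes the response observer and
    // returns the request observer. This one replies to each request in order.
    static Observer<String> sayHello(Observer<String> responses) {
        return new Observer<String>() {
            @Override public void onNext(String name) { responses.onNext("Hello " + name); }
            @Override public void onError(Throwable t) { responses.onError(t); }
            @Override public void onCompleted() { responses.onCompleted(); }
        };
    }

    // Opens one "call" and sends every item through it, collecting the replies.
    static List<String> greetAll(List<String> names) {
        List<String> replies = new ArrayList<>();
        Observer<String> collector = new Observer<String>() {
            @Override public void onNext(String reply) { replies.add(reply); }
            @Override public void onError(Throwable t) { throw new RuntimeException(t); }
            @Override public void onCompleted() { /* stream finished */ }
        };
        Function<Observer<String>, Observer<String>> callFn = BidiShapeSketch::sayHello;
        Observer<String> requests = callFn.apply(collector); // applied once for all items
        names.forEach(requests::onNext);
        requests.onCompleted();
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(greetAll(Arrays.asList("one", "two", "three")));
    }
}
```

The Function here has the same shape as the FunctionEx<StreamObserver<R>, StreamObserver<T>> in the method signature: response observer in, request observer out.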
For example, given the protobuf definition below:
service Greeter {
// Sends a greeting
rpc SayHello (stream HelloRequest) returns (stream HelloReply) {}
}
We can create the following service factory:
ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService =
bidirectionalStreamingService(
() -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
channel -> GreeterGrpc.newStub(channel)::sayHello
);
where GreeterGrpc
is the class auto-generated by the protobuf compiler.
The created ServiceFactory
should be used with the
mapUsingServiceAsync
transform as follows:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("one", "two", "three", "four"))
.mapUsingServiceAsync(greeterService, (service, input) -> {
HelloRequest request = HelloRequest.newBuilder().setName(input).build();
return service.call(request);
})
.writeTo(Sinks.logger());
The remote end can signal an error for a given input item. In that case
the CompletableFuture
returned from service.call(request)
will be completed with that exception. To catch and handle it, use the
CompletableFuture
API.
T - type of the request object
R - type of the response object
channelFn - creates the channel builder. A single channel is created per processor instance.
callStubFn - a function which, given a channel, creates the stub and returns a
function that, given the observer for responses, opens the streaming call and
returns the observer for requests. Because all communication happens within a
single gRPC call, it is called once to open that call rather than once per input item.

Copyright © 2021 Hazelcast, Inc. All rights reserved.