public final class GrpcServices extends Object
ServiceFactory implementations for calling gRPC
endpoints. The created ServiceFactory instances are designed to be
used with the mapUsingServiceAsync
transform.
Currently two types of gRPC services are supported:
- unary: unaryService(SupplierEx, FunctionEx)
- bidirectionalStreaming: bidirectionalStreamingService(SupplierEx, FunctionEx)

| Modifier and Type | Method and Description |
|---|---|
| static <T,R> ServiceFactory<?,? extends GrpcService<T,R>> | bidirectionalStreamingService(SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, FunctionEx<? super io.grpc.ManagedChannel,? extends FunctionEx<io.grpc.stub.StreamObserver<R>,io.grpc.stub.StreamObserver<T>>> callStubFn) Creates a ServiceFactory that calls out to a bidirectional streaming gRPC service. |
| static <T,R> ServiceFactory<?,? extends GrpcService<T,R>> | unaryService(SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, FunctionEx<? super io.grpc.ManagedChannel,? extends BiConsumerEx<T,io.grpc.stub.StreamObserver<R>>> callStubFn) Creates a ServiceFactory that calls out to a unary gRPC service. |
@Nonnull public static <T,R> ServiceFactory<?,? extends GrpcService<T,R>> unaryService(@Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, @Nonnull FunctionEx<? super io.grpc.ManagedChannel,? extends BiConsumerEx<T,io.grpc.stub.StreamObserver<R>>> callStubFn)
Creates a ServiceFactory that calls out to a
unary gRPC service.
For example, given the protobuf definition below:
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
We can create the following service factory:
// HelloReply matches the response type declared in the protobuf definition above
ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = unaryService(
    () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
    channel -> GreeterGrpc.newStub(channel)::sayHello
);
where GreeterGrpc is the class auto-generated by the protobuf
compiler.
The created ServiceFactory should be used with the
mapUsingServiceAsync
transform as follows:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("one", "two", "three", "four"))
 .mapUsingServiceAsync(greeterService, (service, input) -> {
     HelloRequest request = HelloRequest.newBuilder().setName(input).build();
     return service.call(request);
 })
 .writeTo(Sinks.logger());
The remote end can signal an error for a given input item. In that case
the CompletableFuture returned from service.call(request)
will be completed with that exception. To catch and handle it, use the
CompletableFuture API.
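As a rough illustration of the error handling described above, the sketch below uses only the JDK and no gRPC or Jet classes. The call method is a hypothetical stand-in for service.call(request), and the fallback string format is an assumption chosen for the example. It shows one way to turn an exceptionally completed future into a regular value with CompletableFuture.exceptionally.

```java
import java.util.concurrent.CompletableFuture;

final class GrpcErrorHandlingSketch {

    // Hypothetical stand-in for service.call(request): the "remote end"
    // signals an error for an empty name by failing the returned future.
    static CompletableFuture<String> call(String name) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (name.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("name must not be empty"));
        } else {
            future.complete("Hello " + name);
        }
        return future;
    }

    // Maps a failure to a fallback value so downstream stages still receive an item.
    // Note: when the failure comes from an earlier dependent stage, the throwable
    // passed here may be wrapped in a CompletionException.
    static CompletableFuture<String> callWithFallback(String name) {
        return call(name).exceptionally(t -> "error: " + t.getMessage());
    }

    public static void main(String[] args) {
        System.out.println(callWithFallback("one").join());
        System.out.println(callWithFallback("").join());
    }
}
```

In a pipeline, the same idea would presumably be applied inside the mapUsingServiceAsync lambda, by returning service.call(request) with an exceptionally or handle stage attached instead of the bare future.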
T - type of the request object
R - type of the response object
channelFn - creates the channel builder. A single channel is created per processor instance.
callStubFn - a function which, given a channel, creates the stub and returns a
function that calls the stub given the input item and the observer.
It will be called once per input item.

@Nonnull public static <T,R> ServiceFactory<?,? extends GrpcService<T,R>> bidirectionalStreamingService(@Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, @Nonnull FunctionEx<? super io.grpc.ManagedChannel,? extends FunctionEx<io.grpc.stub.StreamObserver<R>,io.grpc.stub.StreamObserver<T>>> callStubFn)
Creates a ServiceFactory that calls out to a
bidirectional streaming gRPC service. This may provide better
throughput compared to the unary service
(unaryService(SupplierEx, FunctionEx)) because all communication happens within a single gRPC
call, eliminating some overheads.
For example, given the protobuf definition below:
service Greeter {
// Sends a greeting
rpc SayHello (stream HelloRequest) returns (stream HelloReply) {}
}
We can create the following service factory:
// HelloReply matches the response type declared in the protobuf definition above
ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService =
    bidirectionalStreamingService(
        () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
        channel -> GreeterGrpc.newStub(channel)::sayHello
    );
where GreeterGrpc is the class auto-generated by the protobuf compiler.
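To make the shape of the function returned by callStubFn more concrete, here is a minimal sketch that does not use gRPC at all. The nested StreamObserver interface is a local stand-in with the same three callbacks as io.grpc.stub.StreamObserver, and sayHello is a hypothetical in-process replacement for the generated stub method. It only illustrates the contract visible in the signature: the function receives the observer for responses and returns the observer for requests, so many items can travel over one call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class BidiCallShapeSketch {

    /** Local stand-in with the same three callbacks as io.grpc.stub.StreamObserver. */
    interface StreamObserver<V> {
        void onNext(V value);
        void onError(Throwable t);
        void onCompleted();
    }

    /**
     * Hypothetical in-process "stub method": takes the observer that will
     * receive replies and returns the observer that accepts requests.
     */
    static StreamObserver<String> sayHello(StreamObserver<String> replies) {
        return new StreamObserver<String>() {
            @Override public void onNext(String name) { replies.onNext("Hello " + name); }
            @Override public void onError(Throwable t) { replies.onError(t); }
            @Override public void onCompleted() { replies.onCompleted(); }
        };
    }

    /** Sends all names over a single "call" and collects the replies in order. */
    static List<String> greetAll(List<String> names) {
        List<String> received = new ArrayList<>();
        StreamObserver<String> replyObserver = new StreamObserver<String>() {
            @Override public void onNext(String reply) { received.add(reply); }
            @Override public void onError(Throwable t) { received.add("error: " + t.getMessage()); }
            @Override public void onCompleted() { received.add("<completed>"); }
        };
        // Same shape as the inner function of callStubFn:
        // response observer in, request observer out.
        Function<StreamObserver<String>, StreamObserver<String>> call = BidiCallShapeSketch::sayHello;
        StreamObserver<String> requestObserver = call.apply(replyObserver);
        for (String name : names) {
            requestObserver.onNext(name);
        }
        requestObserver.onCompleted();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(greetAll(Arrays.asList("one", "two")));
    }
}
```

How the real service correlates each reply with its request, and when it opens or closes the call, is not shown here and should be checked against the library itself.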
The created ServiceFactory should be used with the
mapUsingServiceAsync
transform as follows:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("one", "two", "three", "four"))
 .mapUsingServiceAsync(greeterService, (service, input) -> {
     HelloRequest request = HelloRequest.newBuilder().setName(input).build();
     return service.call(request);
 })
 .writeTo(Sinks.logger());
The remote end can signal an error for a given input item. In that case
the CompletableFuture returned from service.call(request)
will be completed with that exception. To catch and handle it, use the
CompletableFuture API.
T - type of the request object
R - type of the response object
channelFn - creates the channel builder. A single channel is created per processor instance.
callStubFn - a function which, given a channel, creates the stub and returns a
function that calls the stub given the input item and the observer.
It will be called once per input item.

Copyright © 2023 Hazelcast, Inc. All rights reserved.