Class ElasticSinkBuilder<T>
- Type Parameters:
T-
- All Implemented Interfaces:
Serializable
The sink first maps items from the pipeline using the provided
mapToRequestFn(FunctionEx) and then writes the resulting requests using a BulkRequest.
new BulkRequest() is used by default; this can be
changed by providing a custom bulkRequestFn(SupplierEx).
Usage:
Sink<Map<String, ?>> elasticSink = new ElasticSinkBuilder<Map<String, ?>>()
    .clientFn(() -> ElasticClients.client(host, port))
    .mapToRequestFn((Map<String, ?> item) -> new IndexRequest("my-index").source(item))
    .build();
Requires clientFn(SupplierEx) and mapToRequestFn(FunctionEx).
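The map-then-bulk flow described above can be pictured with JDK types only. The following is a minimal sketch, not the connector's implementation: BulkFlowSketch, receive and flush are hypothetical names, a List stands in for BulkRequest, and the point at which a bulk is actually sent is an assumption, since this page does not state it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical model of the flow; a List<R> stands in for BulkRequest.
final class BulkFlowSketch<T, R> {
    private final Function<? super T, ? extends R> mapToRequestFn;
    private final Supplier<List<R>> bulkRequestFn;
    private List<R> currentBulk;

    BulkFlowSketch(Function<? super T, ? extends R> mapToRequestFn,
                   Supplier<List<R>> bulkRequestFn) {
        this.mapToRequestFn = mapToRequestFn;
        this.bulkRequestFn = bulkRequestFn;
        this.currentBulk = bulkRequestFn.get();
    }

    // Step 1: map the pipeline item to a request. Step 2: add it to the current bulk.
    void receive(T item) {
        currentBulk.add(mapToRequestFn.apply(item));
    }

    // Hands over the accumulated bulk and starts a fresh one from the supplier.
    // When this is called is an assumption of the sketch.
    List<R> flush() {
        List<R> toSend = currentBulk;
        currentBulk = bulkRequestFn.get();
        return toSend;
    }

    public static void main(String[] args) {
        BulkFlowSketch<Integer, String> flow =
                new BulkFlowSketch<>(i -> "index:" + i, ArrayList::new);
        flow.receive(1);
        flow.receive(2);
        System.out.println(flow.flush());
    }
}
```

Passing a different supplier corresponds to customizing the bulk request: every new bulk comes from the supplier, so whatever it configures applies to each batch.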
- Since:
- Jet 4.2
Constructor Summary
Constructors -
Method Summary
Methods (modifier and type, method, description):
build()
    Create a sink that writes data into Elasticsearch based on this builder configuration
bulkRequestFn(SupplierEx<org.elasticsearch.action.bulk.BulkRequest> bulkRequestFn)
    Set the supplier function for BulkRequest, defaults to new BulkRequest()
clientFn(SupplierEx<org.elasticsearch.client.RestClientBuilder> clientFn)
    Set the client supplier function
<T_NEW> ElasticSinkBuilder<T_NEW> mapToRequestFn(FunctionEx<? super T_NEW, ? extends org.elasticsearch.action.DocWriteRequest<?>> mapToRequestFn)
    Set the function mapping the item from a pipeline item to an index request
optionsFn(FunctionEx<? super org.elasticsearch.action.ActionRequest, org.elasticsearch.client.RequestOptions> optionsFn)
    Set the function that provides RequestOptions
retries(int retries)
    Number of retries the connector will do in addition to Elastic client retries. Elastic client tries to connect to a node only once for each request.
-
Constructor Details
-
ElasticSinkBuilder
public ElasticSinkBuilder()
-
-
Method Details
-
clientFn
@Nonnull public ElasticSinkBuilder<T> clientFn(@Nonnull SupplierEx<org.elasticsearch.client.RestClientBuilder> clientFn)
Set the client supplier function.
The connector uses the returned instance to access Elasticsearch. Also see ElasticClients for convenience factory methods.
For example, to provide an authenticated client:
    builder.clientFn(() -> client(host, port, username, password))
This parameter is required.
- Parameters:
clientFn - supplier function returning a configured Elasticsearch REST client
-
bulkRequestFn
@Nonnull public ElasticSinkBuilder<T> bulkRequestFn(@Nonnull SupplierEx<org.elasticsearch.action.bulk.BulkRequest> bulkRequestFn)
Set the supplier function for BulkRequest, defaults to new BulkRequest().
For example, to modify the BulkRequest used to index documents:
    builder.bulkRequestFn(() -> new BulkRequest().setRefreshPolicy(IMMEDIATE))
- Parameters:
bulkRequestFn - supplier function for the bulk request
-
mapToRequestFn
@Nonnull public <T_NEW> ElasticSinkBuilder<T_NEW> mapToRequestFn(@Nonnull FunctionEx<? super T_NEW, ? extends org.elasticsearch.action.DocWriteRequest<?>> mapToRequestFn)
Set the function mapping an item from the pipeline to an index request.
For example, to create an IndexRequest for a versioned document:
    builder.mapToRequestFn((Map<String, Object> map) -> new IndexRequest("my-index")
        .source(map)
        .version((Long) map.get("version")))
This parameter is required.
- Type Parameters:
T_NEW - type of the items from the pipeline
- Parameters:
mapToRequestFn - maps an item from the stream to an IndexRequest, UpdateRequest or DeleteRequest
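The <T_NEW> in the mapToRequestFn signature means the call returns a builder typed by the mapping function's input, not by the type the builder was created with. The stand-in below sketches that idea, together with a check for the two required functions, using only JDK types. MiniSinkBuilder, its String-based "client" and "request", and the exception it throws are all hypothetical; the real class may validate and behave differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for illustration; not the real ElasticSinkBuilder.
final class MiniSinkBuilder<T> {
    private Supplier<String> clientFn;          // stands in for the client supplier
    private Function<? super T, String> mapFn;  // stands in for mapToRequestFn

    MiniSinkBuilder<T> clientFn(Supplier<String> clientFn) {
        this.clientFn = clientFn;
        return this;
    }

    // Re-types the builder: the item type is taken from the mapping function.
    @SuppressWarnings("unchecked")
    <T_NEW> MiniSinkBuilder<T_NEW> mapToRequestFn(Function<? super T_NEW, String> mapFn) {
        MiniSinkBuilder<T_NEW> retyped = (MiniSinkBuilder<T_NEW>) this;
        retyped.mapFn = mapFn;
        return retyped;
    }

    // Fails fast when a required function is missing (assumed behavior).
    Function<T, String> build() {
        if (clientFn == null) {
            throw new IllegalStateException("clientFn is required");
        }
        if (mapFn == null) {
            throw new IllegalStateException("mapToRequestFn is required");
        }
        String client = clientFn.get();
        Function<? super T, String> fn = mapFn;
        return item -> client + " <- " + fn.apply(item);
    }

    public static void main(String[] args) {
        // Starts as MiniSinkBuilder<Object>; the typed lambda turns it into
        // MiniSinkBuilder<Map<String, Object>>.
        Function<Map<String, Object>, String> sink = new MiniSinkBuilder<Object>()
                .clientFn(() -> "client")
                .mapToRequestFn((Map<String, Object> item) -> "index " + item.get("id"))
                .build();
        Map<String, Object> item = new HashMap<>();
        item.put("id", 7);
        System.out.println(sink.apply(item));
    }
}
```

Because the lambda parameter is explicitly typed, the compiler can infer T_NEW inside a call chain. With an untyped lambda there is nothing to infer the item type from at that point.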
-
optionsFn
@Nonnull public ElasticSinkBuilder<T> optionsFn(@Nonnull FunctionEx<? super org.elasticsearch.action.ActionRequest, org.elasticsearch.client.RequestOptions> optionsFn)
Set the function that provides RequestOptions.
It can either return a constant value or a value based on the provided request.
For example, to provide a custom authentication header:
    sinkBuilder.optionsFn((request) -> {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        builder.addHeader("Authorization", "Bearer " + TOKEN);
        return builder.build();
    })
- Parameters:
optionsFn - function that provides RequestOptions
-
retries
Number of retries the connector will do in addition to Elastic client retries.
The Elastic client tries to connect to a node only once for each request. When a request fails, the node is marked dead and is not retried again for that request. This causes problems with single-node clusters or in situations where the whole cluster becomes unavailable at the same time (e.g. due to a network issue).
The initial delay is 2s, increasing by a factor of 2 with each retry (4s, 8s, 16s, ...).
- Parameters:
retries - number of retries, defaults to 5
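As a worked illustration of the delay schedule described for retries, the sketch below computes the wait before each retry from the stated initial delay (2s) and factor (2). RetryBackoffSketch is a hypothetical helper, not part of the connector, and it assumes no upper bound on the delay and no jitter, neither of which this page specifies.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models the documented schedule, not the connector's code.
final class RetryBackoffSketch {

    // Initial delay and growth factor as stated in the retries description.
    static final Duration INITIAL_DELAY = Duration.ofSeconds(2);
    static final int FACTOR = 2;

    // Returns the delay before each retry, in order: 2s, 4s, 8s, ...
    static List<Duration> delays(int retries) {
        List<Duration> result = new ArrayList<>();
        Duration delay = INITIAL_DELAY;
        for (int i = 0; i < retries; i++) {
            result.add(delay);
            delay = delay.multipliedBy(FACTOR);
        }
        return result;
    }

    // Sum of all delays: the time spent waiting if every retry fails.
    static Duration totalWait(int retries) {
        Duration total = Duration.ZERO;
        for (Duration d : delays(retries)) {
            total = total.plus(d);
        }
        return total;
    }

    public static void main(String[] args) {
        int retries = 5; // the documented default
        System.out.println(delays(retries));
        System.out.println(totalWait(retries));
    }
}
```

Under these assumptions the default of 5 retries waits 2s, 4s, 8s, 16s and 32s, about a minute in total, which is a useful figure when deciding whether to raise or lower the retry count.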
-
build
Create a sink that writes data into Elasticsearch based on this builder configuration
-