Class ElasticSinkBuilder<T>
- Type Parameters:
T
-
- All Implemented Interfaces:
Serializable
The Sink first maps items from the pipeline using the provided
mapToRequestFn(FunctionEx)
and then using BulkRequest
.
BulkRequest()
is used by default, it can be
modified by providing custom bulkRequestFn(SupplierEx)
Usage:
Sink<Map<String, ?>> elasticSink = new ElasticSinkBuilder<Map<String, ?>>()
.clientFn(() -> ElasticClients.client(host, port))
.mapToRequestFn(item -> new IndexRequest("my-index").source(item))
.build();
Requires clientFn(SupplierEx)
and mapToRequestFn(FunctionEx)
.
- Since:
- Jet 4.2
- See Also:
-
Constructor Summary
-
Method Summary
Modifier and TypeMethodDescriptionbuild()
Create a sink that writes data into Elasticsearch based on this builder configurationbulkRequestFn
(SupplierEx<org.elasticsearch.action.bulk.BulkRequest> bulkRequestFn) Set the supplier function for BulkRequest, defaults to newBulkRequest()
clientFn
(SupplierEx<org.elasticsearch.client.RestClientBuilder> clientFn) Set the client supplier function<T_NEW> ElasticSinkBuilder<T_NEW>
mapToRequestFn
(FunctionEx<? super T_NEW, ? extends org.elasticsearch.action.DocWriteRequest<?>> mapToRequestFn) Set the function mapping the item from a pipeline item to an index requestoptionsFn
(FunctionEx<? super org.elasticsearch.action.ActionRequest, org.elasticsearch.client.RequestOptions> optionsFn) Set the function that providesRequestOptions
retries
(int retries) Number of retries the connector will do in addition to Elastic client retries Elastic client tries to connect to a node only once for each request.
-
Constructor Details
-
ElasticSinkBuilder
public ElasticSinkBuilder()
-
-
Method Details
-
clientFn
@Nonnull public ElasticSinkBuilder<T> clientFn(@Nonnull SupplierEx<org.elasticsearch.client.RestClientBuilder> clientFn) Set the client supplier functionThe connector uses the returned instance to access Elasticsearch. Also see
ElasticClients
for convenience factory methods.For example, to provide an authenticated client:
This parameter is required.builder.clientFn(() -> client(host, port, username, password))
- Parameters:
clientFn
- supplier function returning configured Elasticsearch REST client
-
bulkRequestFn
@Nonnull public ElasticSinkBuilder<T> bulkRequestFn(@Nonnull SupplierEx<org.elasticsearch.action.bulk.BulkRequest> bulkRequestFn) Set the supplier function for BulkRequest, defaults to newBulkRequest()
For example, to modify the BulkRequest used to index documents:
builder.bulkRequestFn(() -> new BulkRequest().setRefreshPolicy(IMMEDIATE))
- Parameters:
bulkRequestFn
- supplier function for the bulk request- See Also:
-
mapToRequestFn
@Nonnull public <T_NEW> ElasticSinkBuilder<T_NEW> mapToRequestFn(@Nonnull FunctionEx<? super T_NEW, ? extends org.elasticsearch.action.DocWriteRequest<?>> mapToRequestFn) Set the function mapping the item from a pipeline item to an index requestFor example, to create an IndexRequest for a versioned document:
This parameter is required.builder.mapToRequestFn((mapItem) -> new IndexRequest("my-index") .source(map) .version((Long) map.get("version"))
- Type Parameters:
T_NEW
- type of the items from the pipeline- Parameters:
mapToRequestFn
- maps an item from the stream to anIndexRequest
,UpdateRequest
orDeleteRequest
-
optionsFn
@Nonnull public ElasticSinkBuilder<T> optionsFn(@Nonnull FunctionEx<? super org.elasticsearch.action.ActionRequest, org.elasticsearch.client.RequestOptions> optionsFn) Set the function that providesRequestOptions
It can either return a constant value or a value based on provided request.
For example, to provide a custom authentication header:
sinkBuilder.optionsFn((request) -> { RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); builder.addHeader("Authorization", "Bearer " + TOKEN); return builder.build(); })
- Parameters:
optionsFn
- function that providesRequestOptions
- See Also:
-
retries
Number of retries the connector will do in addition to Elastic client retries Elastic client tries to connect to a node only once for each request. When a request fails the node is marked dead and is not retried again for the request. This causes problems with single node clusters or in a situation where whole cluster becomes unavailable at the same time (e.g. due to a network issue). The initial delay is 2s, increasing by factor of 2 with each retry (4s, 8s, 16s, ..).- Parameters:
retries
- number of retries, defaults to 5
-
build
Create a sink that writes data into Elasticsearch based on this builder configuration
-