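Type Parameters:
T - type of the output of the mapping function from SearchHit -> T

public final class ElasticSourceBuilder<T> extends Object
Builder for an Elasticsearch BatchSource; each SearchHit is converted to a pipeline item by the supplied mapToItemFn.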
Usage:
BatchSource<String> source = new ElasticSourceBuilder<String>()
        .clientFn(() -> client(host, port))
        .searchRequestFn(() -> new SearchRequest("my-index"))
        .mapToItemFn(SearchHit::getSourceAsString)
        .build();
BatchStage<String> stage = p.readFrom(source);
Requires clientFn(SupplierEx), searchRequestFn(SupplierEx) and mapToItemFn(FunctionEx).

Constructor and Description |
---|
ElasticSourceBuilder() |

Modifier and Type | Method and Description |
---|---|
BatchSource<T> | build(): Build Elasticsearch BatchSource with supplied parameters. |
ElasticSourceBuilder<T> | clientFn(SupplierEx<org.elasticsearch.client.RestClientBuilder> clientFn): Set the client supplier function. |
ElasticSourceBuilder<T> | enableCoLocatedReading(): Enable co-located reading. Jet cluster members must run on exactly the same nodes as the Elastic cluster. |
ElasticSourceBuilder<T> | enableSlicing(): Enable slicing. |
<T_NEW> ElasticSourceBuilder<T_NEW> | mapToItemFn(FunctionEx<? super org.elasticsearch.search.SearchHit,T_NEW> mapToItemFn): Set the function to map SearchHit to a pipeline item. |
ElasticSourceBuilder<T> | optionsFn(FunctionEx<? super org.elasticsearch.action.ActionRequest,org.elasticsearch.client.RequestOptions> optionsFn): Set the function that provides RequestOptions. |
ElasticSourceBuilder<T> | retries(int retries): Number of retries the connector will do in addition to Elastic client retries. The Elastic client tries to connect to a node only once for each request. |
ElasticSourceBuilder<T> | scrollKeepAlive(String scrollKeepAlive): Set the keepAlive for Elasticsearch scroll. |
ElasticSourceBuilder<T> | searchRequestFn(SupplierEx<org.elasticsearch.action.search.SearchRequest> searchRequestFn): Set the search request supplier function. |

@Nonnull public BatchSource<T> build()
Build Elasticsearch BatchSource with supplied parameters.

@Nonnull public ElasticSourceBuilder<T> clientFn(@Nonnull SupplierEx<org.elasticsearch.client.RestClientBuilder> clientFn)
Set the client supplier function.
The connector uses the returned instance to access Elasticsearch.
Also see ElasticClients for convenience factory methods.
For example, to provide an authenticated client:
builder.clientFn(() -> client(host, port, username, password))
This parameter is required.
Parameters:
clientFn - supplier function returning a configured Elasticsearch REST client

@Nonnull public ElasticSourceBuilder<T> searchRequestFn(@Nonnull SupplierEx<org.elasticsearch.action.search.SearchRequest> searchRequestFn)
Set the search request supplier function.
The connector executes this search request to retrieve documents from Elasticsearch.
For example, to create a SearchRequest limited to the index `logs`:
builder.searchRequestFn(() -> new SearchRequest("logs"))
This parameter is required.
Parameters:
searchRequestFn - search request supplier function

@Nonnull public <T_NEW> ElasticSourceBuilder<T_NEW> mapToItemFn(@Nonnull FunctionEx<? super org.elasticsearch.search.SearchHit,T_NEW> mapToItemFn)
Set the function to map SearchHit to a pipeline item.
For example, to map a SearchHit to the value of the field `productId`:
builder.mapToItemFn(hit -> (String) hit.getSourceAsMap().get("productId"))
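The cast in the example above throws a ClassCastException if `productId` is stored as a number rather than a string. Below is a minimal sketch of a more tolerant mapping. It assumes a source map of the shape returned by `hit.getSourceAsMap()`; the names `ProductIdMapper` and `productIdOf` are hypothetical, and a plain `Map` stands in for the search hit so the snippet runs without Elasticsearch on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class ProductIdMapper {

    // Hypothetical helper: converts the "productId" field to a String
    // whatever its JSON type; returns null when the field is absent.
    public static String productIdOf(Map<String, Object> source) {
        Object value = source.get("productId");
        return value == null ? null : String.valueOf(value);
    }

    public static void main(String[] args) {
        // Stand-in for hit.getSourceAsMap(); the id is numeric here.
        Map<String, Object> source = new HashMap<>();
        source.put("productId", 42);
        System.out.println(productIdOf(source));
    }
}
```

With the builder, such a helper could be plugged in as `builder.mapToItemFn(hit -> ProductIdMapper.productIdOf(hit.getSourceAsMap()))`. How to treat a missing field (null, a default value, or an exception) is a design choice this sketch leaves open.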
This parameter is required.
Parameters:
mapToItemFn - maps search hits to output items

@Nonnull public ElasticSourceBuilder<T> optionsFn(@Nonnull FunctionEx<? super org.elasticsearch.action.ActionRequest,org.elasticsearch.client.RequestOptions> optionsFn)
Set the function that provides RequestOptions.
The function can return either a constant value or a value based on the provided request.
For example, use this to provide a custom authentication header:
sourceBuilder.optionsFn((request) -> {
    RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
    builder.addHeader("Authorization", "Bearer " + TOKEN);
    return builder.build();
})
Parameters:
optionsFn - function that provides RequestOptions

@Nonnull public ElasticSourceBuilder<T> enableSlicing()
Enable slicing.
When only slicing is enabled, the number of slices equals globalParallelism (localParallelism * numberOfNodes). When co-located reading is enabled as well, the number of slices for a particular node equals localParallelism.
Use this option to read from multiple shards in parallel. It can also be used on a single shard, but it may increase initial latency. See Elastic documentation for Sliced Scroll for details.
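The slice counts described above reduce to simple arithmetic. The following sketch only illustrates that arithmetic; `SliceCount` and its methods are hypothetical names and are not part of the connector.

```java
public class SliceCount {

    // Only slicing enabled: the number of slices equals
    // globalParallelism = localParallelism * numberOfNodes.
    public static int totalSlices(int localParallelism, int numberOfNodes) {
        return localParallelism * numberOfNodes;
    }

    // Slicing combined with co-located reading: the slice count for a
    // particular node equals localParallelism.
    public static int slicesPerNode(int localParallelism) {
        return localParallelism;
    }

    public static void main(String[] args) {
        // Example: 3 Jet members, local parallelism of 2.
        System.out.println(totalSlices(2, 3));   // prints 6
        System.out.println(slicesPerNode(2));    // prints 2
    }
}
```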
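
@Nonnull public ElasticSourceBuilder<T> enableCoLocatedReading()
Enable co-located reading.
Jet cluster members must run on exactly the same nodes as the Elastic cluster.

@Nonnull public ElasticSourceBuilder<T> scrollKeepAlive(@Nonnull String scrollKeepAlive)
Set the keepAlive for Elasticsearch scroll.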
The value must be in Elastic time unit format, e.g. 500ms for 500 milliseconds, 30s for 30 seconds, 5m for 5 minutes. See SearchRequest.scroll(String).
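A minimal sketch of checking a keepAlive string before passing it to the builder. It assumes the Elasticsearch time units `d`, `h`, `m`, `s`, `ms`, `micros` and `nanos`; `KeepAliveFormat` and `parse` are hypothetical names, and the connector itself may validate the value differently.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KeepAliveFormat {

    // A whole number followed directly by a unit suffix, e.g. "500ms" or "5m".
    private static final Pattern FORMAT =
            Pattern.compile("(\\d+)(nanos|micros|ms|s|m|h|d)");

    // Hypothetical helper: converts a time value string to a Duration,
    // rejecting anything that does not match the expected format.
    public static Duration parse(String value) {
        Matcher m = FORMAT.matcher(value);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a time value: " + value);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "nanos":  return Duration.ofNanos(amount);
            case "micros": return Duration.ofNanos(amount * 1_000);
            case "ms":     return Duration.ofMillis(amount);
            case "s":      return Duration.ofSeconds(amount);
            case "m":      return Duration.ofMinutes(amount);
            case "h":      return Duration.ofHours(amount);
            default:       return Duration.ofDays(amount); // "d"
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("500ms")); // prints PT0.5S
        System.out.println(parse("30s"));   // prints PT30S
        System.out.println(parse("5m"));    // prints PT5M
    }
}
```

Validating the string up front lets a malformed value such as `5 minutes` fail at pipeline construction time rather than when the scroll request is sent.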
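Parameters:
scrollKeepAlive - keepAlive value; this must be high enough to process all results from a single scroll. The default value is 1m.

@Nonnull public ElasticSourceBuilder<T> retries(int retries)
Number of retries the connector will do in addition to Elastic client retries.
The Elastic client tries to connect to a node only once for each request.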
Parameters:
retries - number of retries, defaults to 5

Copyright © 2023 Hazelcast, Inc. All rights reserved.