Class ElasticSourceBuilder<T>
- java.lang.Object
-
- com.hazelcast.jet.elastic.ElasticSourceBuilder<T>
-
- Type Parameters:
T
- type of the output of the mapping function fromSearchHit
-> T
public final class ElasticSourceBuilder<T> extends java.lang.Object
Builder for Elasticsearch source which reads data from Elasticsearch and converts SearchHits using providedmapToItemFn
Usage:
RequiresBatchSource<String> source = new ElasticSourceBuilder<String>() .clientFn(() -> client(host, port)) .searchRequestFn(() -> new SearchRequest("my-index")) .mapToItemFn(SearchHit::getSourceAsString) .build(); BatchStage<String> stage = p.readFrom(source);
clientFn(SupplierEx)
,searchRequestFn(SupplierEx)
andmapToItemFn(FunctionEx)
.- Since:
- Jet 4.2
-
-
Constructor Summary
Constructors Constructor Description ElasticSourceBuilder()
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description BatchSource<T>
build()
Build ElasticsearchBatchSource
with supplied parametersElasticSourceBuilder<T>
clientFn(SupplierEx<org.elasticsearch.client.RestClientBuilder> clientFn)
Set the client supplier functionElasticSourceBuilder<T>
enableCoLocatedReading()
Enable co-located reading Jet cluster member must run exactly on the same nodes as Elastic cluster.ElasticSourceBuilder<T>
enableSlicing()
Enable slicing<T_NEW> ElasticSourceBuilder<T_NEW>
mapToItemFn(FunctionEx<? super org.elasticsearch.search.SearchHit,T_NEW> mapToItemFn)
Set the function to map SearchHit to a pipeline itemElasticSourceBuilder<T>
optionsFn(FunctionEx<? super org.elasticsearch.action.ActionRequest,org.elasticsearch.client.RequestOptions> optionsFn)
Set the function that providesRequestOptions
ElasticSourceBuilder<T>
retries(int retries)
Number of retries the connector will do in addition to Elastic client retries Elastic client tries to connect to a node only once for each request.ElasticSourceBuilder<T>
scrollKeepAlive(java.lang.String scrollKeepAlive)
Set the keepAlive for Elastic search scrollElasticSourceBuilder<T>
searchRequestFn(SupplierEx<org.elasticsearch.action.search.SearchRequest> searchRequestFn)
Set the search request supplier function
-
-
-
Method Detail
-
build
@Nonnull public BatchSource<T> build()
Build ElasticsearchBatchSource
with supplied parameters- Returns:
- configured source which is to be used in the pipeline
-
clientFn
@Nonnull public ElasticSourceBuilder<T> clientFn(@Nonnull SupplierEx<org.elasticsearch.client.RestClientBuilder> clientFn)
Set the client supplier functionThe connector uses the returned instance to access Elasticsearch. Also see
ElasticClients
for convenience factory methods.For example, to provide an authenticated client:
This parameter is required.builder.clientFn(() -> client(host, port, username, password))
- Parameters:
clientFn
- supplier function returning configured Elasticsearch REST client
-
searchRequestFn
@Nonnull public ElasticSourceBuilder<T> searchRequestFn(@Nonnull SupplierEx<org.elasticsearch.action.search.SearchRequest> searchRequestFn)
Set the search request supplier functionThe connector executes this search request to retrieve documents from Elasticsearch.
For example, to create SearchRequest limited to an index `logs`:
This parameter is required.builder.searchRequestFn(() -> new SearchRequest("logs"))
- Parameters:
searchRequestFn
- search request supplier function
-
mapToItemFn
@Nonnull public <T_NEW> ElasticSourceBuilder<T_NEW> mapToItemFn(@Nonnull FunctionEx<? super org.elasticsearch.search.SearchHit,T_NEW> mapToItemFn)
Set the function to map SearchHit to a pipeline itemFor example, to map a SearchHit to a value of a field `productId`:
This parameter is required.builder.mapToItemFn(hit -> (String) hit.getSourceAsMap().get("productId"))
- Parameters:
mapToItemFn
- maps search hits to output items
-
optionsFn
@Nonnull public ElasticSourceBuilder<T> optionsFn(@Nonnull FunctionEx<? super org.elasticsearch.action.ActionRequest,org.elasticsearch.client.RequestOptions> optionsFn)
Set the function that providesRequestOptions
It can either return a constant value or a value based on provided request.
For example, use this to provide a custom authentication header:
sourceBuilder.optionsFn((request) -> { RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); builder.addHeader("Authorization", "Bearer " + TOKEN); return builder.build(); })
- Parameters:
optionsFn
- function that providesRequestOptions
- See Also:
- RequestOptions in Elastic documentation
-
enableSlicing
@Nonnull public ElasticSourceBuilder<T> enableSlicing()
Enable slicingNumber of slices is equal to
globalParallelism (localParallelism * numberOfNodes)
when only slicing is enabled. When co-located reading is enabled as well then number of slices for particular node is equal tolocalParallelism
.Use this option to read from multiple shards in parallel. It can also be used on single shard, but it may increase initial latency. See Elastic documentation for Sliced Scroll for details.
-
enableCoLocatedReading
@Nonnull public ElasticSourceBuilder<T> enableCoLocatedReading()
Enable co-located reading Jet cluster member must run exactly on the same nodes as Elastic cluster.
-
scrollKeepAlive
@Nonnull public ElasticSourceBuilder<T> scrollKeepAlive(@Nonnull java.lang.String scrollKeepAlive)
Set the keepAlive for Elastic search scrollThe value must be in Elastic time unit format, e.g. 500ms for 500 milliseconds, 30s for 30 seconds, 5m for 5 minutes. See
SearchRequest.scroll(String)
.- Parameters:
scrollKeepAlive
- keepAlive value, this must be high enough to process all results from a single scroll, default value 1m
-
retries
@Nonnull public ElasticSourceBuilder<T> retries(int retries)
Number of retries the connector will do in addition to Elastic client retries Elastic client tries to connect to a node only once for each request. When a request fails the node is marked dead and is not retried again for the request. This causes problems with single node clusters or in a situation where whole cluster becomes unavailable at the same time (e.g. due to a network issue). The initial delay is 2s, increasing by factor of 2 with each retry (4s, 8s, 16s, ..).- Parameters:
retries
- number of retries, defaults to 5
-
-