Class ElasticSinkBuilder<T>

java.lang.Object
com.hazelcast.jet.elastic.ElasticSinkBuilder<T>
Type Parameters:
T -
All Implemented Interfaces:
Serializable

public final class ElasticSinkBuilder<T> extends Object implements Serializable
Builder for Elasticsearch Sink

The Sink first maps items from the pipeline using the provided mapToRequestFn(FunctionEx) and then using BulkRequest.

BulkRequest() is used by default, it can be modified by providing custom bulkRequestFn(SupplierEx)

Usage:


 Sink<Map<String, ?>> elasticSink = new ElasticSinkBuilder<Map<String, ?>>()
   .clientFn(() -> ElasticClients.client(host, port))
   .mapToRequestFn(item -> new IndexRequest("my-index").source(item))
   .build();
 

Requires clientFn(SupplierEx) and mapToRequestFn(FunctionEx).

Since:
Jet 4.2
See Also:
  • Constructor Details

    • ElasticSinkBuilder

      public ElasticSinkBuilder()
  • Method Details

    • clientFn

      @Nonnull public ElasticSinkBuilder<T> clientFn(@Nonnull SupplierEx<org.elasticsearch.client.RestClientBuilder> clientFn)
      Set the client supplier function

      The connector uses the returned instance to access Elasticsearch. Also see ElasticClients for convenience factory methods.

      For example, to provide an authenticated client:

      
       builder.clientFn(() -> client(host, port, username, password))
       
      This parameter is required.
      Parameters:
      clientFn - supplier function returning configured Elasticsearch REST client
    • bulkRequestFn

      @Nonnull public ElasticSinkBuilder<T> bulkRequestFn(@Nonnull SupplierEx<org.elasticsearch.action.bulk.BulkRequest> bulkRequestFn)
      Set the supplier function for BulkRequest, defaults to new BulkRequest()

      For example, to modify the BulkRequest used to index documents:

      
       builder.bulkRequestFn(() -> new BulkRequest().setRefreshPolicy(IMMEDIATE))
       
      Parameters:
      bulkRequestFn - supplier function for the bulk request
      See Also:
    • mapToRequestFn

      @Nonnull public <T_NEW> ElasticSinkBuilder<T_NEW> mapToRequestFn(@Nonnull FunctionEx<? super T_NEW,? extends org.elasticsearch.action.DocWriteRequest<?>> mapToRequestFn)
      Set the function mapping the item from a pipeline item to an index request

      For example, to create an IndexRequest for a versioned document:

      
       builder.mapToRequestFn((mapItem) ->
                            new IndexRequest("my-index")
                                    .source(map)
                                    .version((Long) map.get("version"))
       
      This parameter is required.
      Type Parameters:
      T_NEW - type of the items from the pipeline
      Parameters:
      mapToRequestFn - maps an item from the stream to an IndexRequest, UpdateRequest or DeleteRequest
    • optionsFn

      @Nonnull public ElasticSinkBuilder<T> optionsFn(@Nonnull FunctionEx<? super org.elasticsearch.action.ActionRequest,org.elasticsearch.client.RequestOptions> optionsFn)
      Set the function that provides RequestOptions

      It can either return a constant value or a value based on provided request.

      For example, to provide a custom authentication header:

      
       sinkBuilder.optionsFn((request) -> {
           RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
           builder.addHeader("Authorization", "Bearer " + TOKEN);
           return builder.build();
       })
       
      Parameters:
      optionsFn - function that provides RequestOptions
      See Also:
    • retries

      @Nonnull public ElasticSinkBuilder<T> retries(int retries)
      Number of retries the connector will do in addition to Elastic client retries

      Elastic client tries to connect to a node only once for each request. When a request fails the node is marked dead and is not retried again for the request. This causes problems with single node clusters or in a situation where whole cluster becomes unavailable at the same time (e.g. due to a network issue).

      The initial delay is 2s, increasing by factor of 2 with each retry (4s, 8s, 16s, ..).

      Parameters:
      retries - number of retries, defaults to 5
    • build

      @Nonnull public Sink<T> build()
      Create a sink that writes data into Elasticsearch based on this builder configuration