Class JdbcSinkBuilder<T>

java.lang.Object
com.hazelcast.jet.pipeline.JdbcSinkBuilder<T>
Type Parameters:
T - type of the items the sink accepts

public class JdbcSinkBuilder<T> extends Object
Since:
Jet 4.1
  • Field Details

    • DEFAULT_EXACTLY_ONCE

      public static final boolean DEFAULT_EXACTLY_ONCE
      The default setting for whether exactly-once is allowed for the sink (true).
    • DEFAULT_BATCH_LIMIT

      public static final int DEFAULT_BATCH_LIMIT
      The default batch size limit to use for the sink if batching is supported (50).
  • Method Details

    • updateQuery

      @Nonnull public JdbcSinkBuilder<T> updateQuery(@Nonnull String updateQuery)
      The SQL statement to execute for each item. It should be a parameterized statement whose parameters the bind function fills in.

      Which type of statement to use?

      In exactly-once mode, we recommend using an INSERT statement. Each parallel processor uses a separate transaction, so if two processors try to update the same record, the later one deadlocks: it blocks waiting for a record lock it will never get, because the snapshot cannot complete while that processor is blocked, and the lock owner commits only after the snapshot completes.

      In at-least-once mode, on the other hand, we recommend using a MERGE statement. If a unique key can be derived from each item, this gives you exactly-once behavior through idempotence without XA transactions: the MERGE statement inserts the record the first time and overwrites the same record if the job is restarted and the same item is written again. If the items have no unique key, use an INSERT with an auto-generated key.
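      The difference between the two recommendations shows up when items are replayed after a restart. The following is a small in-memory model, not JDBC code: a HashMap keyed by the item's key stands in for a table written with MERGE on a unique key, and an ArrayList stands in for a table written with INSERT and an auto-generated key. The class ReplayModel and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of an at-least-once sink whose items may be
// replayed after a job restart. A Map stands in for a keyed table written with
// MERGE; a List stands in for a table written with INSERT + generated key.
class ReplayModel {

    // MERGE-like upsert: writing the same key twice overwrites the same "row".
    static Map<Integer, String> mergeAll(List<Map.Entry<Integer, String>> deliveries) {
        Map<Integer, String> table = new HashMap<>();
        for (Map.Entry<Integer, String> item : deliveries) {
            table.put(item.getKey(), item.getValue());
        }
        return table;
    }

    // INSERT with an auto-generated key: every delivery becomes a new "row".
    static List<String> insertAll(List<Map.Entry<Integer, String>> deliveries) {
        List<String> table = new ArrayList<>();
        for (Map.Entry<Integer, String> item : deliveries) {
            table.add(item.getValue());
        }
        return table;
    }

    // Items 0..2 are written, then the job "restarts" and replays items 1..2.
    static List<Map.Entry<Integer, String>> deliveriesWithReplay() {
        List<Map.Entry<Integer, String>> deliveries = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            deliveries.add(Map.entry(i, "v" + i));
        }
        for (int i = 1; i < 3; i++) {
            deliveries.add(Map.entry(i, "v" + i)); // replayed after the restart
        }
        return deliveries;
    }
}
```

      With the replayed deliveries, the upsert model ends with three rows while the insert model ends with five: without a unique key, replays become duplicate rows, so the sink only delivers at-least-once.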

      Parameters:
      updateQuery - the SQL statement to execute for each item
      Returns:
      this instance for fluent API
    • bindFn

      @Nonnull public JdbcSinkBuilder<T> bindFn(@Nonnull BiConsumerEx<PreparedStatement,T> bindFn)
      Sets the function that binds values to the PreparedStatement created from the query set with updateQuery(String). The function should only bind parameters; it should not execute the query, call commit(), or call any other method.
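      A bind function only sets statement parameters. The sketch below runs one against a recording PreparedStatement built with java.lang.reflect.Proxy, so no database is needed. Since BiConsumerEx is Hazelcast's exception-throwing variant of BiConsumer, the sketch declares a hypothetical stand-in interface, SqlBinder, instead of depending on Hazelcast; BindFnSketch and its methods are likewise illustrative names.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for BiConsumerEx<PreparedStatement, T>:
// a bind callback that may throw SQLException.
interface SqlBinder<T> {
    void bind(PreparedStatement stmt, T item) throws SQLException;
}

class BindFnSketch {

    // Bind function for "INSERT INTO ... VALUES(?, ?)": it only sets parameters.
    static final SqlBinder<Map.Entry<Integer, String>> BIND = (stmt, item) -> {
        stmt.setInt(1, item.getKey());
        stmt.setString(2, item.getValue());
    };

    // Statement proxy that records set*(index, value) calls and rejects
    // everything else (executeUpdate, addBatch, ...).
    static PreparedStatement recordingStatement(Map<Integer, Object> params) {
        return (PreparedStatement) Proxy.newProxyInstance(
                BindFnSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) -> {
                    if (method.getName().startsWith("set") && args != null && args.length == 2) {
                        params.put((Integer) args[0], args[1]);
                        return null;
                    }
                    throw new IllegalStateException("bind function called " + method.getName());
                });
    }

    // Applies BIND to one item and returns the recorded parameters by index.
    static Map<Integer, Object> bindOnce(Map.Entry<Integer, String> item) {
        Map<Integer, Object> params = new TreeMap<>();
        try {
            BIND.bind(recordingStatement(params), item);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return params;
    }

    // A bind function that executes the statement violates the contract.
    static boolean rejectsExecution() {
        SqlBinder<Map.Entry<Integer, String>> bad = (stmt, item) -> stmt.executeUpdate();
        try {
            bad.bind(recordingStatement(new TreeMap<>()), Map.entry(1, "x"));
            return false;
        } catch (IllegalStateException e) {
            return true;
        } catch (SQLException e) {
            return false;
        }
    }
}
```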
      Parameters:
      bindFn - the bind function. The function must be stateless.
      Returns:
      this instance for fluent API
    • jdbcUrl

      @Nonnull public JdbcSinkBuilder<T> jdbcUrl(String connectionUrl)
      Sets the connection URL for the target database.

      If your job runs in exactly-once mode, don't use this method; instead, provide an XADataSource using the dataSourceSupplier(SupplierEx) method, otherwise the job will fail. If your driver has no XADataSource implementation, call exactlyOnce(false) as well.

      See also dataSourceSupplier(SupplierEx).
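      The rule above can be restated as a small configuration check. This is a hypothetical helper (SinkConfigCheck is not a Hazelcast class) that uses only the standard javax.sql types: a sink configured with a bare URL, represented here by a null data source, or with a non-XA DataSource is rejected when the sink runs exactly-once, and exactlyOnce(false) lifts the requirement.

```java
import javax.sql.CommonDataSource;
import javax.sql.XADataSource;

// Hypothetical check restating the documented rule; not part of the Hazelcast API.
class SinkConfigCheck {

    // dataSource: what dataSourceSupplier(...) would produce, or null when the
    // sink is configured only with jdbcUrl(String).
    // exactlyOnceSink: true if the sink is to run with the exactly-once guarantee.
    static boolean isValid(CommonDataSource dataSource, boolean exactlyOnceSink) {
        // A bare URL (null here) never provides an XADataSource, so it only
        // passes when exactly-once is disabled for the sink.
        return !exactlyOnceSink || dataSource instanceof XADataSource;
    }
}
```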

      Parameters:
      connectionUrl - the connection URL
      Returns:
      this instance for fluent API
    • dataSourceSupplier

      @Nonnull public JdbcSinkBuilder<T> dataSourceSupplier(SupplierEx<? extends CommonDataSource> dataSourceSupplier)
      Sets the supplier of a DataSource or XADataSource. One data source instance is created on each member. For the exactly-once guarantee, an XADataSource must be used, otherwise the job will fail. If your driver has no XADataSource implementation, also call exactlyOnce(false).

      There's no need to use a ConnectionPoolDataSource: each processor is given one connection, which it holds for the entire job execution.
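      The connection model described above (one data source per member, one long-lived connection per processor, no pool) can be sketched as follows. SinkProcessorSketch is a hypothetical class, not Hazelcast's actual processor; it uses a counting DataSource proxy to show that each processor calls getConnection() once and reuses that connection for every batch.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;

// Hypothetical sink processor: one connection acquired in init(), held until close().
class SinkProcessorSketch {
    private final DataSource dataSource;
    private Connection connection;

    SinkProcessorSketch(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    void init() throws SQLException {
        connection = dataSource.getConnection(); // acquired once per processor
    }

    void process(List<String> items) throws SQLException {
        // A real sink would bind and execute statements for `items` here,
        // always on the same connection.
        if (connection == null) {
            throw new SQLException("init() was not called");
        }
    }

    void close() throws SQLException {
        connection.close(); // released only when the job ends
    }

    // Runs `processors` processors against one shared (per-member) DataSource
    // and returns how many times getConnection() was called.
    static int countConnections(int processors, int batchesPerProcessor) {
        AtomicInteger calls = new AtomicInteger();
        ClassLoader loader = SinkProcessorSketch.class.getClassLoader();
        // Connection stub: every method, including close(), is a no-op returning null.
        Connection conn = (Connection) Proxy.newProxyInstance(
                loader, new Class<?>[] {Connection.class}, (p, m, a) -> null);
        DataSource ds = (DataSource) Proxy.newProxyInstance(
                loader, new Class<?>[] {DataSource.class}, (p, m, a) -> {
                    if (m.getName().equals("getConnection")) {
                        calls.incrementAndGet();
                        return conn;
                    }
                    throw new UnsupportedOperationException(m.getName());
                });
        try {
            for (int i = 0; i < processors; i++) {
                SinkProcessorSketch processor = new SinkProcessorSketch(ds);
                processor.init();
                for (int b = 0; b < batchesPerProcessor; b++) {
                    processor.process(List.of("a", "b"));
                }
                processor.close();
            }
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return calls.get();
    }
}
```

      However many batches each processor handles, the number of getConnection() calls equals the number of processors, which is why a connection pool adds nothing here.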

      Parameters:
      dataSourceSupplier - the supplier of the data source. The function must be stateless.
      Returns:
      this instance for fluent API
    • exactlyOnce

      @Nonnull public JdbcSinkBuilder<T> exactlyOnce(boolean enabled)
      Sets whether exactly-once mode is enabled for the sink. If it is enabled, the job must also run in exactly-once mode for the sink to use it; otherwise, the sink uses the job's guarantee.

      Set exactly-once to false if you want your job to run in exactly-once mode but want to reduce the guarantee for the sink only.

      By default, exactly-once is enabled (true).
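      The interaction between the job's processing guarantee and exactlyOnce(boolean) reduces to a small function. This is a hypothetical restatement of the paragraphs above, not Hazelcast code. The local Guarantee enum mirrors the three processing-guarantee levels (none, at-least-once, exactly-once) but is declared here to keep the sketch self-contained, and the assumption that a sink opting out of an exactly-once job falls back to at-least-once is labeled in the code.

```java
// Local copy of the three guarantee levels, declared here for self-containment.
enum Guarantee { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

class SinkGuarantee {

    // Hypothetical: the guarantee the JDBC sink ends up providing.
    static Guarantee effective(Guarantee jobGuarantee, boolean sinkExactlyOnce) {
        if (jobGuarantee == Guarantee.EXACTLY_ONCE && !sinkExactlyOnce) {
            // Assumption: opting out reduces only the sink to at-least-once,
            // while the rest of the job stays exactly-once.
            return Guarantee.AT_LEAST_ONCE;
        }
        // Otherwise the sink follows the job; enabling exactly-once on the sink
        // cannot raise it above the job's own guarantee.
        return jobGuarantee;
    }
}
```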

      Parameters:
      enabled - whether exactly-once is allowed for the sink
      Returns:
      this instance for fluent API
    • batchLimit

      @Nonnull public JdbcSinkBuilder<T> batchLimit(int batchLimit)
      Sets the batch size limit for the sink. If the JDBC driver supports batched updates, this defines the upper bound on the batch size.

      The default batch size limit is 50.
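      A common way to honor such a limit with plain JDBC, when the driver supports batched updates, is to call addBatch() for each item and executeBatch() whenever the limit is reached, plus once more for the remainder. The sketch below assumes that pattern (it is not taken from Hazelcast's source) and runs it against a statement proxy that records the size of each executed batch; BatchingSketch, writeAll and batchSizes are hypothetical names.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class BatchingSketch {

    // Binds each item, adds it to the batch, and flushes every `batchLimit` items.
    static void writeAll(PreparedStatement stmt, List<Integer> items, int batchLimit) throws SQLException {
        int pending = 0;
        for (int item : items) {
            stmt.setInt(1, item);
            stmt.addBatch();
            if (++pending == batchLimit) {
                stmt.executeBatch();
                pending = 0;
            }
        }
        if (pending > 0) {
            stmt.executeBatch(); // flush the final, partially filled batch
        }
    }

    // Returns the size of each executed batch, using a statement proxy that
    // counts addBatch() calls between executeBatch() calls.
    static List<Integer> batchSizes(int itemCount, int batchLimit) {
        List<Integer> sizes = new ArrayList<>();
        int[] pending = {0};
        PreparedStatement stmt = (PreparedStatement) Proxy.newProxyInstance(
                BatchingSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "addBatch":
                            pending[0]++;
                            return null;
                        case "executeBatch": {
                            sizes.add(pending[0]);
                            int[] updateCounts = new int[pending[0]];
                            pending[0] = 0;
                            return updateCounts;
                        }
                        default:
                            return null; // set* calls are ignored
                    }
                });
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < itemCount; i++) {
            items.add(i);
        }
        try {
            writeAll(stmt, items, batchLimit);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return sizes;
    }
}
```

      With the default limit of 50, writing 120 items under this pattern executes batches of 50, 50 and 20.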

      Parameters:
      batchLimit - the batch size limit for the sink
      Returns:
      this instance for fluent API
      Since:
      Jet 4.5
    • dataConnectionRef

      @Nonnull @Beta public JdbcSinkBuilder<T> dataConnectionRef(DataConnectionRef dataConnectionRef)
      Sets a reference (DataConnectionRef) to a configured data connection from which the DataSource instance will be retrieved.

      Example:

      (Prerequisite) Data connection configuration:

      
            // Register a JDBC data connection named "my-jdbc-data-connection".
            // jdbcUrl, username and password are placeholders for your database settings.
            Config config = new Config();
            Properties properties = new Properties();
            properties.setProperty("jdbcUrl", jdbcUrl);
            properties.setProperty("username", username);
            properties.setProperty("password", password);
            DataConnectionConfig dataConnectionConfig = new DataConnectionConfig()
                    .setName("my-jdbc-data-connection")
                    .setClassName(JdbcDataConnection.class.getName())
                    .setProperties(properties);
            // Key the entry by the same name the pipeline references.
            config.getDataConnectionConfigs().put("my-jdbc-data-connection", dataConnectionConfig);
       

      Pipeline configuration:

      
      
               // PERSON_COUNT and tableName are placeholders defined elsewhere.
               Pipeline p = Pipeline.create();
               p.readFrom(TestSources.items(IntStream.range(0, PERSON_COUNT).boxed().toArray(Integer[]::new)))
                       .map(item -> entry(item, item.toString()))   // Util.entry(key, value)
                       .writeTo(Sinks.<Entry<Integer, String>>jdbcBuilder()
                               .updateQuery("INSERT INTO " + tableName + " VALUES(?, ?)")
                               // Refers to the data connection registered in the configuration above.
                               .dataConnectionRef(dataConnectionRef("my-jdbc-data-connection"))
                               .bindFn((stmt, item) -> {
                                   stmt.setInt(1, item.getKey());
                                   stmt.setString(2, item.getValue());
                               })
                               .build());
       

      Parameters:
      dataConnectionRef - the reference to the configured data connection
      Returns:
      this instance for fluent API
      Since:
      5.2
    • build

      @Nonnull public Sink<T> build()
      Creates and returns the JDBC Sink with the supplied components.