Class JdbcSinkBuilder<T>

  • Type Parameters:
    T - type of the items the sink accepts

    public class JdbcSinkBuilder<T>
    extends java.lang.Object
    Since:
    Jet 4.1
    • Field Detail

      • DEFAULT_EXACTLY_ONCE

        public static final boolean DEFAULT_EXACTLY_ONCE
        The default setting for whether exactly-once is allowed for the sink.
        See Also:
        Constant Field Values
      • DEFAULT_BATCH_LIMIT

        public static final int DEFAULT_BATCH_LIMIT
        The default batch size limit to use for the sink if batching is supported.
        See Also:
        Constant Field Values
    • Method Detail

      • updateQuery

        @Nonnull
        public JdbcSinkBuilder<T> updateQuery​(@Nonnull
                                              java.lang.String updateQuery)
        Sets the SQL statement to execute for each item. It should be a parameterized statement to which the bind function will bind values.

        Which type of statement to use?

        In exactly-once mode we recommend using an INSERT statement. Each parallel processor uses a separate transaction, so if two processors try to update the same record, the later one will deadlock: it blocks waiting for a record lock that it never gets, because the snapshot cannot complete and the lock owner therefore never commits.

        In at-least-once mode, on the other hand, we recommend using a MERGE statement. If a unique key is derived from the items, this can give you exactly-once behavior through idempotence without using XA transactions: the MERGE statement inserts a record initially and overwrites that same record if the job restarts and the same item is written again. If the items have no unique key, use INSERT with an auto-generated key.

        Parameters:
        updateQuery - the SQL statement to execute for each item
        Returns:
        this instance for fluent API
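
        The idempotence argument for at-least-once mode can be illustrated without a database. The following is a minimal sketch, not Hazelcast or JDBC code: an in-memory map stands in for a table with a unique key, the hypothetical merge method models a MERGE (insert or overwrite), and the hypothetical insert method models an INSERT with an auto-generated key. The key sequence simulates a restart in which item 2 is written twice.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: in-memory stand-ins for a table, not Hazelcast or JDBC code.
class UpsertIdempotenceSketch {

    // Models MERGE on a unique key: insert the row, or overwrite the existing one.
    static void merge(Map<Integer, String> table, int key, String value) {
        table.put(key, value);
    }

    // Models INSERT with an auto-generated key: every call adds a new row.
    static void insert(List<String> table, String value) {
        table.add(value);
    }

    public static void main(String[] args) {
        Map<Integer, String> merged = new LinkedHashMap<>();
        List<String> inserted = new ArrayList<>();

        // Items 1 and 2 are written, then a simulated restart replays item 2
        // (at-least-once delivery) before item 3 arrives.
        int[] keys = {1, 2, 2, 3};
        for (int key : keys) {
            merge(merged, key, "item-" + key);
            insert(inserted, "item-" + key);
        }

        System.out.println("MERGE rows:  " + merged.size());
        System.out.println("INSERT rows: " + inserted.size());
    }
}
```

        In this sketch the replayed item leaves the MERGE-style table with three rows, while the INSERT-style table ends up with four, one of them a duplicate.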
      • bindFn

        @Nonnull
        public JdbcSinkBuilder<T> bindFn​(@Nonnull
                                         BiConsumerEx<java.sql.PreparedStatement,​T> bindFn)
        Sets the function that binds values to the PreparedStatement created from the query set with updateQuery(String). The function should only bind values: it must not execute the query, call commit(), or call any other method.
        Parameters:
        bindFn - the bind function. The function must be stateless.
        Returns:
        this instance for fluent API
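
        A bind function of the kind described above only sets statement parameters. The sketch below shows one, assuming items of type Map.Entry<Integer, String> and a two-parameter query. To stay runnable without a database, it uses a hypothetical Binder interface as a stand-in for BiConsumerEx and a java.lang.reflect.Proxy that records the calls made on the PreparedStatement; neither is part of the Hazelcast API.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

class BindFnSketch {

    // Hypothetical stand-in for BiConsumerEx<PreparedStatement, T>:
    // a two-argument function that may throw a checked exception.
    @FunctionalInterface
    interface Binder<T> {
        void bind(PreparedStatement stmt, T item) throws SQLException;
    }

    // A stateless bind function: it only sets parameters and never
    // executes the statement or commits.
    static final Binder<Map.Entry<Integer, String>> BIND_ENTRY = (stmt, item) -> {
        stmt.setInt(1, item.getKey());
        stmt.setString(2, item.getValue());
    };

    // Creates a fake PreparedStatement that records every call made on it,
    // so the sketch runs without a JDBC driver.
    static PreparedStatement recordingStatement(List<String> calls) {
        return (PreparedStatement) Proxy.newProxyInstance(
                BindFnSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, methodArgs) -> {
                    calls.add(method.getName() + Arrays.toString(methodArgs));
                    return null; // sufficient for the void setters used here
                });
    }

    public static void main(String[] args) throws SQLException {
        List<String> calls = new ArrayList<>();
        BIND_ENTRY.bind(recordingStatement(calls), Map.entry(7, "seven"));
        System.out.println(calls);
    }
}
```

        The recorded calls contain only setInt and setString, which is the behavior the bind function contract asks for.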
      • jdbcUrl

        @Nonnull
        public JdbcSinkBuilder<T> jdbcUrl​(java.lang.String connectionUrl)
        Sets the connection URL for the target database.

        If your job runs in exactly-once mode, do not use this method. Provide an XADataSource through dataSourceSupplier(SupplierEx) instead, otherwise the job will fail. If your driver doesn't have an XADataSource implementation, call exactlyOnce(false).

        See also dataSourceSupplier(SupplierEx).

        Parameters:
        connectionUrl - the connection URL
        Returns:
        this instance for fluent API
      • dataSourceSupplier

        @Nonnull
        public JdbcSinkBuilder<T> dataSourceSupplier​(SupplierEx<? extends javax.sql.CommonDataSource> dataSourceSupplier)
        Sets the supplier of a DataSource or XADataSource. One data source instance will be created on each member. For the exactly-once guarantee, an XADataSource must be used or the job will fail. If your driver doesn't have an XADataSource implementation, also call exactlyOnce(false).

        There's no need to use ConnectionPoolDataSource. One connection is given to each processor and that connection is held during the entire job execution.

        Parameters:
        dataSourceSupplier - the supplier of data source. The function must be stateless.
        Returns:
        this instance for fluent API
      • exactlyOnce

        @Nonnull
        public JdbcSinkBuilder<T> exactlyOnce​(boolean enabled)
        Sets whether exactly-once mode is enabled for the sink. If exactly-once is enabled for the sink, it takes effect only when the job also runs in exactly-once mode; otherwise the sink uses the job's guarantee.

        Set this to false if you want your job to run in exactly-once mode but want to reduce the guarantee just for this sink.

        The default is true.

        Parameters:
        enabled - whether exactly-once is allowed for the sink
        Returns:
        this instance for fluent API
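
        The interaction between the job's guarantee and this flag can be summarized as a small decision function. This is a sketch of the rule described above, not the library's implementation: the Guarantee enum and the effective method are hypothetical names, and the sketch assumes that disabling exactly-once on the sink in an exactly-once job reduces the sink to at-least-once, which the text above does not state explicitly.

```java
class SinkGuaranteeSketch {

    // Hypothetical enum used only in this sketch.
    enum Guarantee { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

    // Returns the guarantee the sink would operate with.
    static Guarantee effective(Guarantee jobGuarantee, boolean sinkExactlyOnce) {
        if (jobGuarantee == Guarantee.EXACTLY_ONCE && !sinkExactlyOnce) {
            // Assumption: the reduced guarantee for the sink is at-least-once.
            return Guarantee.AT_LEAST_ONCE;
        }
        // In every other case the sink follows the job's guarantee.
        return jobGuarantee;
    }

    public static void main(String[] args) {
        for (Guarantee job : Guarantee.values()) {
            for (boolean sinkFlag : new boolean[] {true, false}) {
                System.out.println("job=" + job + ", sink exactlyOnce=" + sinkFlag
                        + " -> " + effective(job, sinkFlag));
            }
        }
    }
}
```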
      • batchLimit

        @Nonnull
        public JdbcSinkBuilder<T> batchLimit​(int batchLimit)
        Sets the batch size limit for the sink. If the JDBC driver supports batched updates, this defines the upper bound on the batch size.

        The default batch size limit is 50.

        Parameters:
        batchLimit - the batch size limit for the sink
        Returns:
        this instance for fluent API
        Since:
        Jet 4.5
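
        To make the meaning of an upper bound concrete, the sketch below splits a list of items into batches that never exceed a given limit. It is an illustration under stated assumptions, not the sink's actual batching logic: the toBatches helper is hypothetical, and the real sink may well flush smaller batches for other reasons.

```java
import java.util.ArrayList;
import java.util.List;

class BatchLimitSketch {

    // Splits items into consecutive batches of at most batchLimit elements.
    static <T> List<List<T>> toBatches(List<T> items, int batchLimit) {
        if (batchLimit <= 0) {
            throw new IllegalArgumentException("batchLimit must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchLimit) {
            int to = Math.min(from + batchLimit, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 120; i++) {
            items.add(i);
        }
        // With the documented default limit of 50, 120 items form three batches.
        for (List<Integer> batch : toBatches(items, 50)) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```

        With 120 items and a limit of 50, the sketch produces batches of 50, 50 and 20 items.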
      • dataConnectionRef

        @Nonnull
        @Beta
        public JdbcSinkBuilder<T> dataConnectionRef​(DataConnectionRef dataConnectionRef)
        Sets the reference to a configured data connection from which the DataSource instance will be retrieved.

        Example:

        (Prerequisite) Data connection configuration:

        
              // Assumes jdbcUrl, username and password are defined by the caller.
              Config config = smallInstanceConfig();
              Properties properties = new Properties();
              properties.put("jdbcUrl", jdbcUrl);
              properties.put("username", username);
              properties.put("password", password);
              DataConnectionConfig dataConnectionConfig = new DataConnectionConfig()
                      .setName("my-jdbc-data-connection")
                      .setClassName(JdbcDataConnection.class.getName())
                      .setProperties(properties);
              // Register the data connection under its own name.
              config.getDataConnectionConfigs().put(dataConnectionConfig.getName(), dataConnectionConfig);
         

        Pipeline configuration:

              // Assumes p is a Pipeline and that PERSON_COUNT and tableName are defined by the caller.
              p.readFrom(TestSources.items(IntStream.range(0, PERSON_COUNT).boxed().toArray(Integer[]::new)))
                      .map(item -> entry(item, item.toString()))
                      .writeTo(Sinks.<Entry<Integer, String>>jdbcBuilder()
                              .updateQuery("INSERT INTO " + tableName + " VALUES(?, ?)")
                              .dataConnectionRef(dataConnectionRef("my-jdbc-data-connection"))
                              .bindFn((stmt, item1) -> {
                                  stmt.setInt(1, item1.getKey());
                                  stmt.setString(2, item1.getValue());
                              })
                              .build());
         

        Parameters:
        dataConnectionRef - the reference to the configured data connection
        Returns:
        this instance for fluent API
        Since:
        5.2
      • build

        @Nonnull
        public Sink<T> build()
        Creates and returns the JDBC Sink with the supplied components.