T - type of the items the sink accepts

public class JdbcSinkBuilder<T> extends Object
See Sinks.jdbcBuilder().

| Modifier and Type | Field and Description |
|---|---|
| static int | DEFAULT_BATCH_LIMIT: The default batch size limit to use for the sink if batching is supported. |
| static boolean | DEFAULT_EXACTLY_ONCE: The default setting for whether exactly-once is allowed for the sink. |

| Modifier and Type | Method and Description |
|---|---|
| JdbcSinkBuilder<T> | batchLimit(int batchLimit): Sets the batch size limit for the sink. |
| JdbcSinkBuilder<T> | bindFn(BiConsumerEx<PreparedStatement,T> bindFn): Sets the function to bind values to a PreparedStatement created with the query set with updateQuery(String). |
| Sink<T> | build(): Creates and returns the JDBC Sink with the supplied components. |
| JdbcSinkBuilder<T> | dataSourceSupplier(SupplierEx<? extends CommonDataSource> dataSourceSupplier): Sets the supplier of DataSource or XADataSource. |
| JdbcSinkBuilder<T> | exactlyOnce(boolean enabled): Sets whether the exactly-once mode is enabled for the sink. |
| JdbcSinkBuilder<T> | externalDataStoreRef(ExternalDataStoreRef externalDataStoreRef): Sets the reference to the configured external dataStore of ExternalDataStoreRef from which the instance of the DataSource will be retrieved. |
| JdbcSinkBuilder<T> | jdbcUrl(String connectionUrl): Sets the connection URL for the target database. |
| JdbcSinkBuilder<T> | updateQuery(String updateQuery): The query to execute for each item. |

public static final boolean DEFAULT_EXACTLY_ONCE
public static final int DEFAULT_BATCH_LIMIT
@Nonnull public JdbcSinkBuilder<T> updateQuery(@Nonnull String updateQuery)
The query to execute for each item.
Which type of statement to use?
In exactly-once mode we recommend using an INSERT statement.
Each parallel processor uses a separate transaction. If two processors
try to update the same record, the later one will deadlock: it will
be blocked waiting for a record lock that it never gets, because the
snapshot cannot complete and the lock owner therefore never
commits.
On the other hand, in at-least-once mode we recommend using a MERGE
statement. If a unique key is derived from the items, this can
give you exactly-once behavior through idempotence without using XA
transactions: the MERGE statement will insert a record initially and
overwrite the same record if the job restarts and the same item is
written again. If you don't have a unique key in the item, use INSERT
with an auto-generated key.
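The idempotence argument above can be illustrated with a small simulation. This is only a sketch: `ReplayDemo`, `insertAll` and `mergeAll` are hypothetical names, and the in-memory `List` and `Map` stand in for a database table written with INSERT and with MERGE on a unique key. It does not use the Hazelcast API or a real database.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a database table; not part of the Hazelcast API.
class ReplayDemo {

    /** Simulates INSERT with an auto-generated key: every write appends a new row. */
    static List<String> insertAll(List<String> rows, List<Map.Entry<Integer, String>> items) {
        for (Map.Entry<Integer, String> item : items) {
            rows.add(item.getValue());
        }
        return rows;
    }

    /** Simulates MERGE on a unique key: a repeated write overwrites the same row. */
    static Map<Integer, String> mergeAll(Map<Integer, String> table,
                                         List<Map.Entry<Integer, String>> items) {
        for (Map.Entry<Integer, String> item : items) {
            table.put(item.getKey(), item.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> items = List.of(Map.entry(1, "a"), Map.entry(2, "b"));

        // In at-least-once mode a restart may write the same items a second time.
        Map<Integer, String> merged = new LinkedHashMap<>();
        mergeAll(merged, items);
        mergeAll(merged, items);

        List<String> inserted = new ArrayList<>();
        insertAll(inserted, items);
        insertAll(inserted, items);

        System.out.println("Rows after replay with MERGE:  " + merged.size());
        System.out.println("Rows after replay with INSERT: " + inserted.size());
    }
}
```

With the keyed write the replay leaves two rows, while the plain insert leaves four. This is why a MERGE on a unique key can tolerate replays without XA transactions.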
updateQuery - the SQL statement to execute for each item

@Nonnull public JdbcSinkBuilder<T> bindFn(@Nonnull BiConsumerEx<PreparedStatement,T> bindFn)
Sets the function to bind values to a PreparedStatement created
with the query set with updateQuery(String). The function
should not execute the query, nor call commit() or any other
method.
bindFn - the bind function. The function must be stateless.

@Nonnull public JdbcSinkBuilder<T> jdbcUrl(String connectionUrl)
Sets the connection URL for the target database.
If your job runs in exactly-once mode, don't use this method. Instead,
provide an XADataSource using the dataSourceSupplier(SupplierEx) method; otherwise the job will fail.
If your driver doesn't have an XADataSource implementation, also call
exactlyOnce(false).
See also dataSourceSupplier(SupplierEx).
connectionUrl - the connection URL

@Nonnull public JdbcSinkBuilder<T> dataSourceSupplier(SupplierEx<? extends CommonDataSource> dataSourceSupplier)
Sets the supplier of DataSource or XADataSource. One dataSource instance will be created on each
member. For an exactly-once guarantee, an XADataSource must be used or the
job will fail. If your driver doesn't have an XADataSource
implementation, also call exactlyOnce(false).
There's no need to use ConnectionPoolDataSource. One
connection is given to each processor and that connection is held during
the entire job execution.
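The rule above (an XADataSource is required for exactly-once, otherwise call exactlyOnce(false)) can be expressed as a simple type check. The sketch below is illustrative only: `ExactlyOnceCheck`, `canUseExactlyOnce` and `stub` are hypothetical helpers, and the data sources are do-nothing proxies rather than a real driver. It uses only the standard `javax.sql` interfaces.

```java
import java.lang.reflect.Proxy;
import javax.sql.CommonDataSource;
import javax.sql.DataSource;
import javax.sql.XADataSource;

// Hypothetical helper; not part of the Hazelcast API.
class ExactlyOnceCheck {

    /** Returns a candidate value for exactlyOnce(...): true only for an XADataSource. */
    static boolean canUseExactlyOnce(CommonDataSource dataSource) {
        return dataSource instanceof XADataSource;
    }

    /** Creates a do-nothing instance of the given interface, for demonstration only. */
    static <D extends CommonDataSource> D stub(Class<D> type) {
        Object proxy = Proxy.newProxyInstance(
                ExactlyOnceCheck.class.getClassLoader(),
                new Class<?>[] {type},
                (p, method, args) -> {
                    // The stub supports no operations; it exists only for the type check.
                    throw new UnsupportedOperationException(method.getName());
                });
        return type.cast(proxy);
    }

    public static void main(String[] args) {
        System.out.println("Plain DataSource: " + canUseExactlyOnce(stub(DataSource.class)));
        System.out.println("XADataSource:     " + canUseExactlyOnce(stub(XADataSource.class)));
    }
}
```

In a real job the check would be applied to the data source class provided by your driver, and the result passed to exactlyOnce(boolean).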
dataSourceSupplier - the supplier of the data source. The function must
be stateless.

@Nonnull public JdbcSinkBuilder<T> exactlyOnce(boolean enabled)
Sets whether the exactly-once mode is enabled for the sink.
Set exactly-once to false if you want your job to run in exactly-once mode but want to reduce the guarantee just for the sink.
The default is exactly-once set to true.
enabled - whether exactly-once is allowed for the sink

@Nonnull public JdbcSinkBuilder<T> batchLimit(int batchLimit)
Sets the batch size limit for the sink.
The default batch size limit is 50.
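As a rough illustration of what an upper bound on batch size means, the sketch below splits a list of items into batches no larger than the limit. It is an assumption about how a batch limit typically behaves, not a description of the sink's internals. `BatchLimitDemo` and `toBatches` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; the real sink's batching logic may differ.
class BatchLimitDemo {

    /** Default limit as stated in the documentation above. */
    static final int BATCH_LIMIT = 50;

    /** Splits items into consecutive batches, each holding at most batchLimit items. */
    static <T> List<List<T>> toBatches(List<T> items, int batchLimit) {
        if (batchLimit < 1) {
            throw new IllegalArgumentException("batchLimit must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchLimit) {
            int to = Math.min(from + batchLimit, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 120; i++) {
            items.add(i);
        }
        for (List<Integer> batch : toBatches(items, BATCH_LIMIT)) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```

With 120 items and a limit of 50, this yields batches of 50, 50 and 20 items.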
batchLimit - the batch size limit for the sink

@Nonnull @Beta public JdbcSinkBuilder<T> externalDataStoreRef(ExternalDataStoreRef externalDataStoreRef)
Sets the reference to the configured external dataStore of ExternalDataStoreRef from which
the instance of the DataSource will be retrieved.
Example:
(Prerequisite) External dataStore configuration:
Config config = smallInstanceConfig();
Properties properties = new Properties();
properties.put("jdbcUrl", jdbcUrl);
properties.put("username", username);
properties.put("password", password);
ExternalDataStoreConfig externalDataStoreConfig = new ExternalDataStoreConfig()
.setName("my-jdbc-data-store")
.setClassName(JdbcDataStoreFactory.class.getName())
.setProperties(properties);
config.getExternalDataStoreConfigs().put("my-jdbc-data-store", externalDataStoreConfig);
Pipeline configuration:
p.readFrom(TestSources.items(IntStream.range(0, PERSON_COUNT).boxed().toArray(Integer[]::new)))
 .map(item -> entry(item, item.toString()))
 .writeTo(Sinks.<Entry<Integer, String>>jdbcBuilder()
         .updateQuery("INSERT INTO " + tableName + " VALUES(?, ?)")
         .externalDataStoreRef(externalDataStoreRef("my-jdbc-data-store"))
         .bindFn((stmt, item1) -> {
             stmt.setInt(1, item1.getKey());
             stmt.setString(2, item1.getValue());
         })
         .build());
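The bind function used in the pipeline above only sets parameters and holds no state, so it can be checked in isolation. The following sketch assumes a hypothetical `BindFn` interface as a stand-in for BiConsumerEx<PreparedStatement,T>, and a recording proxy in place of a real PreparedStatement. `BindFnDemo`, `recordingStatement` and `bind` are illustrative names, not part of any library.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical test harness for a bind function; uses only JDK classes.
class BindFnDemo {

    /** Hypothetical stand-in for BiConsumerEx<PreparedStatement, T>. */
    @FunctionalInterface
    interface BindFn<T> {
        void accept(PreparedStatement stmt, T item) throws SQLException;
    }

    /** Same binding logic as the pipeline example: key to parameter 1, value to parameter 2. */
    static final BindFn<Map.Entry<Integer, String>> BIND_ENTRY = (stmt, item) -> {
        stmt.setInt(1, item.getKey());
        stmt.setString(2, item.getValue());
    };

    /** Returns a PreparedStatement that only records two-argument setter calls. */
    static PreparedStatement recordingStatement(List<String> calls) {
        return (PreparedStatement) Proxy.newProxyInstance(
                BindFnDemo.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) -> {
                    if (args != null && args.length == 2 && method.getName().startsWith("set")) {
                        calls.add(method.getName() + "(" + args[0] + ", " + args[1] + ")");
                        return null;
                    }
                    // Anything else (execute, commit, ...) is outside a bind function's job.
                    throw new UnsupportedOperationException(method.getName());
                });
    }

    /** Applies BIND_ENTRY to one item and returns the recorded setter calls. */
    static List<String> bind(Map.Entry<Integer, String> item) {
        List<String> calls = new ArrayList<>();
        try {
            BIND_ENTRY.accept(recordingStatement(calls), item);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(bind(Map.entry(7, "seven")));
    }
}
```

Because the proxy rejects every call other than a parameter setter, a bind function that tried to execute the query or commit would fail immediately in such a check.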
externalDataStoreRef - the reference to the configured external dataStore

Copyright © 2023 Hazelcast, Inc. All rights reserved.