T - type of the items the sink accepts
public class JdbcSinkBuilder<T> extends Object
See Sinks.jdbcBuilder().

Modifier and Type | Field and Description |
---|---|
static int | DEFAULT_BATCH_LIMIT: The default batch size limit to use for the sink if batching is supported. |
static boolean | DEFAULT_EXACTLY_ONCE: The default setting for whether exactly-once is allowed for the sink. |

Modifier and Type | Method and Description |
---|---|
JdbcSinkBuilder<T> | batchLimit(int batchLimit): Sets the batch size limit for the sink. |
JdbcSinkBuilder<T> | bindFn(BiConsumerEx<PreparedStatement,T> bindFn): Set the function to bind values to a PreparedStatement created with the query set with updateQuery(String). |
Sink<T> | build(): Creates and returns the JDBC Sink with the supplied components. |
JdbcSinkBuilder<T> | dataSourceSupplier(SupplierEx<? extends CommonDataSource> dataSourceSupplier): Sets the supplier of DataSource or XADataSource. |
JdbcSinkBuilder<T> | exactlyOnce(boolean enabled): Sets whether the exactly-once mode is enabled for the sink. |
JdbcSinkBuilder<T> | externalDataStoreRef(ExternalDataStoreRef externalDataStoreRef): Sets the reference to the configured external dataStore of ExternalDataStoreRef from which the instance of the DataSource will be retrieved. |
JdbcSinkBuilder<T> | jdbcUrl(String connectionUrl): Sets the connection URL for the target database. |
JdbcSinkBuilder<T> | updateQuery(String updateQuery): The query to execute for each item. |

public static final boolean DEFAULT_EXACTLY_ONCE
public static final int DEFAULT_BATCH_LIMIT
@Nonnull public JdbcSinkBuilder<T> updateQuery(@Nonnull String updateQuery)
The query to execute for each item.
Which type of statement to use?
In exactly-once mode we recommend using an INSERT statement. Each parallel processor uses a separate transaction: if two processors try to update the same record, the later one will deadlock. It blocks waiting for a record lock that it can never acquire, because the snapshot cannot complete and so the lock owner never commits.
In at-least-once mode, on the other hand, we recommend using a MERGE statement. If a unique key is derived from the items, this can give you exactly-once behavior through idempotence without using XA transactions: the MERGE statement inserts a record initially and overwrites the same record if the job is restarted and the same item is written again. If the item has no unique key, use INSERT with an auto-generated key.
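The idempotence argument above can be illustrated with a small simulation. This is only a sketch that does not touch Hazelcast or a database: a HashMap stands in for a table with a unique key (MERGE semantics), an ArrayList stands in for a table filled by INSERT with an auto-generated key, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IdempotentWriteSketch {

    /** Simulates MERGE on a unique key: insert the row, or overwrite the row with the same key. */
    static int rowsAfterMerge(List<Map.Entry<Integer, String>> items, int attempts) {
        Map<Integer, String> table = new HashMap<>();
        for (int attempt = 0; attempt < attempts; attempt++) {
            for (Map.Entry<Integer, String> item : items) {
                table.put(item.getKey(), item.getValue());
            }
        }
        return table.size();
    }

    /** Simulates INSERT with an auto-generated key: every write adds a new row. */
    static int rowsAfterInsert(List<Map.Entry<Integer, String>> items, int attempts) {
        List<String> table = new ArrayList<>();
        for (int attempt = 0; attempt < attempts; attempt++) {
            for (Map.Entry<Integer, String> item : items) {
                table.add(item.getValue());
            }
        }
        return table.size();
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> items = List.of(Map.entry(1, "a"), Map.entry(2, "b"));
        // Two attempts: the original write plus one replay after a simulated restart.
        System.out.println("MERGE rows:  " + rowsAfterMerge(items, 2));  // MERGE rows:  2
        System.out.println("INSERT rows: " + rowsAfterInsert(items, 2)); // INSERT rows: 4
    }
}
```

Replaying the same two items leaves the keyed table at two rows, while the INSERT-only table grows to four, which is why a replay in at-least-once mode is harmless only when the statement is idempotent.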
updateQuery - the SQL statement to execute for each item

@Nonnull public JdbcSinkBuilder<T> bindFn(@Nonnull BiConsumerEx<PreparedStatement,T> bindFn)
Sets the function to bind values to a PreparedStatement created with the query set with updateQuery(String). The function should not execute the query, nor call commit() or any other method.
bindFn - the bind function. The function must be stateless.

@Nonnull public JdbcSinkBuilder<T> jdbcUrl(String connectionUrl)
Sets the connection URL for the target database.
If your job runs in exactly-once mode, don't use this method; provide an XADataSource using the dataSourceSupplier(SupplierEx) method instead, otherwise the job will fail. If your driver doesn't have an XADataSource implementation, also call exactlyOnce(false).
See also dataSourceSupplier(SupplierEx).
connectionUrl - the connection URL

@Nonnull public JdbcSinkBuilder<T> dataSourceSupplier(SupplierEx<? extends CommonDataSource> dataSourceSupplier)
Sets the supplier of DataSource or XADataSource. One dataSource instance will be created on each member. For the exactly-once guarantee an XADataSource must be used, or the job will fail. If your driver doesn't have an XADataSource implementation, also call exactlyOnce(false).
There's no need to use ConnectionPoolDataSource. One connection is given to each processor and that connection is held during the entire job execution.
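Whether the exactly-once setting can stay enabled depends on the kind of data source the supplier returns. The following is a minimal sketch of that check using only JDK interfaces; it is not how the sink itself performs the check. The class name, the helper names, and the proxy-based stubs (which stand in for a real driver's data source classes) are all assumptions made for illustration.

```java
import java.lang.reflect.Proxy;
import java.util.function.Supplier;
import javax.sql.CommonDataSource;
import javax.sql.DataSource;
import javax.sql.XADataSource;

public class XaCheckSketch {

    /** Returns true if the supplied data source is an XADataSource. */
    static boolean supportsExactlyOnce(Supplier<? extends CommonDataSource> supplier) {
        return supplier.get() instanceof XADataSource;
    }

    /** Creates a stub of the given interface; it stands in for a real driver class. */
    static <D extends CommonDataSource> D stub(Class<D> type) {
        Object proxy = Proxy.newProxyInstance(
                XaCheckSketch.class.getClassLoader(),
                new Class<?>[] {type},
                (self, method, args) -> {
                    // The stub is only used for type checks, never for real connections.
                    throw new UnsupportedOperationException(method.getName());
                });
        return type.cast(proxy);
    }

    public static void main(String[] args) {
        System.out.println(supportsExactlyOnce(() -> stub(XADataSource.class))); // true
        System.out.println(supportsExactlyOnce(() -> stub(DataSource.class)));   // false
    }
}
```

When such a check returns false for your driver, the text above suggests calling exactlyOnce(false) on the builder so that the job does not fail.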
dataSourceSupplier - the supplier of data source. The function must be stateless.

@Nonnull public JdbcSinkBuilder<T> exactlyOnce(boolean enabled)
Sets whether the exactly-once mode is enabled for the sink. Set exactly-once to false if you want your job to run in exactly-once mode but want to reduce the guarantee just for the sink.
By default, exactly-once is enabled (true).
enabled - whether exactly-once is allowed for the sink

@Nonnull public JdbcSinkBuilder<T> batchLimit(int batchLimit)
Sets the batch size limit for the sink. The default batch size limit is 50.
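To make the effect of a batch size limit concrete, here is a sketch of the generic JDBC batching pattern built on PreparedStatement.addBatch() and executeBatch(). It is an assumption about how a limit of this kind typically behaves, not the sink's actual implementation. The class name and helpers are hypothetical, and a counting proxy replaces a real database statement.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class BatchLimitSketch {

    /** Adds each item to the batch and flushes whenever batchLimit statements are pending. */
    static List<Integer> writeInBatches(PreparedStatement stmt, List<Integer> items, int batchLimit)
            throws SQLException {
        List<Integer> flushed = new ArrayList<>();
        int pending = 0;
        for (int item : items) {
            stmt.setInt(1, item);
            stmt.addBatch();
            if (++pending == batchLimit) {
                flushed.add(stmt.executeBatch().length);
                pending = 0;
            }
        }
        if (pending > 0) {
            // Flush the final, partially filled batch.
            flushed.add(stmt.executeBatch().length);
        }
        return flushed;
    }

    /** Stand-in statement that only counts batched rows; no database is involved. */
    static PreparedStatement countingStatement() {
        int[] pending = {0};
        return (PreparedStatement) Proxy.newProxyInstance(
                BatchLimitSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (self, method, args) -> {
                    switch (method.getName()) {
                        case "addBatch": {
                            pending[0]++;
                            return null;
                        }
                        case "executeBatch": {
                            int[] result = new int[pending[0]];
                            pending[0] = 0;
                            return result;
                        }
                        default:
                            return null; // setInt and other void calls are ignored
                    }
                });
    }

    /** Runs the sketch and returns the size of every flushed batch. */
    static List<Integer> flushedBatchSizes(int itemCount, int batchLimit) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < itemCount; i++) {
            items.add(i);
        }
        try {
            return writeInBatches(countingStatement(), items, batchLimit);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(flushedBatchSizes(120, 50)); // [50, 50, 20]
    }
}
```

With 120 items and the default limit of 50, this pattern issues three batch executions instead of 120 single-statement round trips.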
batchLimit - the batch size limit for the sink

@Nonnull @Beta public JdbcSinkBuilder<T> externalDataStoreRef(ExternalDataStoreRef externalDataStoreRef)
Sets the ExternalDataStoreRef that references the configured external dataStore from which the DataSource instance will be retrieved.
Example:
(Prerequisite) External dataStore configuration:
Config config = smallInstanceConfig();
Properties properties = new Properties();
properties.put("jdbcUrl", jdbcUrl);
properties.put("username", username);
properties.put("password", password);
ExternalDataStoreConfig externalDataStoreConfig = new ExternalDataStoreConfig()
        .setName("my-jdbc-data-store")
        .setClassName(JdbcDataStoreFactory.class.getName())
        .setProperties(properties);
// Register the configuration under the same name that the pipeline references.
config.getExternalDataStoreConfigs().put("my-jdbc-data-store", externalDataStoreConfig);
Pipeline configuration:
p.readFrom(TestSources.items(IntStream.range(0, PERSON_COUNT).boxed().toArray(Integer[]::new)))
 .map(item -> entry(item, item.toString()))
 .writeTo(Sinks.<Entry<Integer, String>>jdbcBuilder()
         .updateQuery("INSERT INTO " + tableName + " VALUES(?, ?)")
         .externalDataStoreRef(externalDataStoreRef("my-jdbc-data-store"))
         .bindFn((stmt, item1) -> {
             stmt.setInt(1, item1.getKey());
             stmt.setString(2, item1.getValue());
         })
         .build());
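The bind function used in the pipeline above can be checked in isolation, without a cluster or a database. The sketch below is one possible way to do that with JDK classes only. BindFn is a hypothetical stand-in for BiConsumerEx<PreparedStatement, T>, and a recording proxy replaces the real PreparedStatement; neither is part of the library.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class BindFnSketch {

    /** Hypothetical stand-in for BiConsumerEx<PreparedStatement, T>. */
    @FunctionalInterface
    interface BindFn<T> {
        void bind(PreparedStatement stmt, T item) throws SQLException;
    }

    /** Same binding logic as the pipeline example: key to parameter 1, value to parameter 2. */
    static final BindFn<Map.Entry<Integer, String>> BIND_ENTRY = (stmt, item) -> {
        stmt.setInt(1, item.getKey());
        stmt.setString(2, item.getValue());
    };

    /** Applies the bind function to a recording statement and returns the calls it made. */
    static <T> List<String> recordCalls(BindFn<T> bindFn, T item) {
        List<String> calls = new ArrayList<>();
        PreparedStatement recorder = (PreparedStatement) Proxy.newProxyInstance(
                BindFnSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (self, method, args) -> {
                    // Record the method name and its arguments; nothing is executed.
                    calls.add(method.getName() + Arrays.toString(args));
                    return null;
                });
        try {
            bindFn.bind(recorder, item);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(recordCalls(BIND_ENTRY, Map.entry(7, "seven")));
        // [setInt[1, 7], setString[2, seven]]
    }
}
```

A bind function that follows the contract of bindFn should produce only parameter-setting calls in the recorded list, and no execute or commit calls.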
externalDataStoreRef - the reference to the configured external dataStore

Copyright © 2022 Hazelcast, Inc. All rights reserved.