Class JdbcSinkBuilder<T>
- Type Parameters:
T
- type of the items the sink accepts
See Sinks.jdbcBuilder().
- Since:
- Jet 4.1
-
Field Summary
Modifier and Type | Field | Description
static final int | DEFAULT_BATCH_LIMIT | The default batch size limit to use for the sink if batching is supported.
static final boolean | DEFAULT_EXACTLY_ONCE | The default setting for whether exactly-once is allowed for the sink.
-
Method Summary
Modifier and Type | Method | Description
JdbcSinkBuilder<T> | batchLimit(int batchLimit) | Sets the batch size limit for the sink.
JdbcSinkBuilder<T> | bindFn(BiConsumerEx<PreparedStatement, T> bindFn) | Sets the function to bind values to a PreparedStatement created with the query set with updateQuery(String).
Sink<T> | build() | Creates and returns the JDBC Sink with the supplied components.
JdbcSinkBuilder<T> | dataConnectionRef(DataConnectionRef dataConnectionRef) | Sets the reference (a DataConnectionRef) to the configured data connection from which the DataSource instance will be retrieved.
JdbcSinkBuilder<T> | dataSourceSupplier(SupplierEx<? extends CommonDataSource> dataSourceSupplier) | Sets the supplier of DataSource or XADataSource.
JdbcSinkBuilder<T> | exactlyOnce(boolean enabled) | Sets whether the exactly-once mode is enabled for the sink.
JdbcSinkBuilder<T> | jdbcUrl(String connectionUrl) | Sets the connection URL for the target database.
JdbcSinkBuilder<T> | updateQuery(String updateQuery) | The query to execute for each item.
-
Field Details
-
DEFAULT_EXACTLY_ONCE
public static final boolean DEFAULT_EXACTLY_ONCE
The default setting for whether exactly-once is allowed for the sink.
-
DEFAULT_BATCH_LIMIT
public static final int DEFAULT_BATCH_LIMIT
The default batch size limit to use for the sink if batching is supported.
-
-
Method Details
-
updateQuery
The query to execute for each item. It should contain a parameterized query to which the bind function will bind values.
Which type of statement to use?
In exactly-once mode we recommend using an INSERT statement. Each parallel processor uses a separate transaction, so if two processors try to update the same record, the later one will deadlock. It will block waiting for the record lock but never get it, because the snapshot cannot complete and the lock owner will never commit.
In at-least-once mode, on the other hand, we recommend using a MERGE statement. If a unique key is derived from the items, this can give you exactly-once behavior through idempotence without using XA transactions: the MERGE statement inserts the record initially and overwrites the same record if the job is restarted and the same item is written again. If the items have no unique key, use INSERT with an auto-generated key.
- Parameters:
updateQuery
- the SQL statement to execute for each item
- Returns:
- this instance for fluent API
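The idempotence argument for MERGE can be illustrated without a database. The following is a minimal in-memory model, not JDBC or Hazelcast code:

- A map keyed by the item's unique key stands in for a table written with MERGE (an upsert).
- A list stands in for a table with an auto-generated key written with INSERT.
- A "replay" stands in for a restarted job writing the same items again.

The `Person` record and all method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergeIdempotenceSketch {

    // Hypothetical item type: a unique key plus a payload.
    record Person(int id, String name) {}

    // Models MERGE keyed by the item's unique key: writing the same item
    // again overwrites the same row instead of adding a new one.
    static Map<Integer, String> writeWithMerge(List<Person> items, int replays) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (int attempt = 0; attempt <= replays; attempt++) {
            for (Person p : items) {
                table.put(p.id(), p.name());
            }
        }
        return table;
    }

    // Models INSERT into a table with an auto-generated key: every write
    // adds a new row, so replayed items become duplicates.
    static List<Person> writeWithInsert(List<Person> items, int replays) {
        List<Person> table = new ArrayList<>();
        for (int attempt = 0; attempt <= replays; attempt++) {
            table.addAll(items);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Person> items = List.of(new Person(1, "a"), new Person(2, "b"));
        // One replay simulates the job restarting and writing the same items again.
        System.out.println(writeWithMerge(items, 1).size());  // 2
        System.out.println(writeWithInsert(items, 1).size()); // 4
    }
}
```

The MERGE model ends up in the same state no matter how many times the items are replayed. The INSERT model grows with each replay, which is why INSERT without a natural key gives at-least-once rather than exactly-once results.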
-
bindFn
Sets the function to bind values to a PreparedStatement created with the query set with updateQuery(String). The function should only bind values; it should not execute the query, call commit(), or call any other method.
- Parameters:
bindFn
- the bind function. The function must be stateless.
- Returns:
- this instance for fluent API
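A bind function can be exercised without a database. In this sketch, the `BindFn` interface is a hypothetical stand-in shaped like `BiConsumerEx<PreparedStatement, T>`. The `PreparedStatement` is a `java.lang.reflect.Proxy` that only records which methods are called.

The bind function sets the two parameters of a query such as `INSERT INTO person VALUES(?, ?)` (table name assumed) and does nothing else, which is the contract described above.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BindFnSketch {

    // Hypothetical stand-in for BiConsumerEx<PreparedStatement, T>: a two-argument
    // consumer whose method may throw checked exceptions such as SQLException.
    @FunctionalInterface
    interface BindFn<T> {
        void accept(PreparedStatement stmt, T item) throws Exception;
    }

    // Stateless bind function for a two-parameter query: it only sets parameters
    // and never executes the statement or commits.
    static final BindFn<Map.Entry<Integer, String>> BIND = (stmt, item) -> {
        stmt.setInt(1, item.getKey());
        stmt.setString(2, item.getValue());
    };

    // Returns a PreparedStatement proxy that records the name of each method invoked
    // on it, so the bind function can be exercised without a JDBC driver.
    static PreparedStatement recordingStatement(List<String> calls) {
        return (PreparedStatement) Proxy.newProxyInstance(
                BindFnSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) -> {
                    calls.add(method.getName());
                    return null; // the setters used here return void
                });
    }

    // Applies BIND to one item and returns the names of the methods it called.
    static List<String> callsFor(Map.Entry<Integer, String> item) {
        List<String> calls = new ArrayList<>();
        try {
            BIND.accept(recordingStatement(calls), item);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(callsFor(Map.entry(7, "seven"))); // [setInt, setString]
    }
}
```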
-
jdbcUrl
Sets the connection URL for the target database.
If your job runs in exactly-once mode, don't use this method; instead, provide an XADataSource using the dataSourceSupplier(SupplierEx) method, otherwise the job will fail. If your driver doesn't have an XADataSource implementation, also call exactlyOnce(false).
See also dataSourceSupplier(SupplierEx).
- Parameters:
connectionUrl
- the connection URL
- Returns:
- this instance for fluent API
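The advice above can be summarized as a small decision table. The sketch below is a hypothetical helper, not part of the builder API. Its enum constants only name which builder calls the text recommends for each combination of job guarantee and driver capability.

```java
public class SinkSetupSketch {

    // Hypothetical labels for the three setups described in the text.
    enum Setup { JDBC_URL, XA_DATA_SOURCE_SUPPLIER, JDBC_URL_WITH_EXACTLY_ONCE_FALSE }

    static Setup recommendedSetup(boolean jobExactlyOnce, boolean driverHasXaDataSource) {
        if (!jobExactlyOnce) {
            // Outside exactly-once mode, a plain connection URL is fine.
            return Setup.JDBC_URL;
        }
        if (driverHasXaDataSource) {
            // Exactly-once with the sink's default exactlyOnce(true) needs an
            // XADataSource provided via dataSourceSupplier(...) instead of jdbcUrl(...).
            return Setup.XA_DATA_SOURCE_SUPPLIER;
        }
        // No XADataSource implementation: also call exactlyOnce(false),
        // otherwise the job fails.
        return Setup.JDBC_URL_WITH_EXACTLY_ONCE_FALSE;
    }

    public static void main(String[] args) {
        System.out.println(recommendedSetup(false, false)); // JDBC_URL
        System.out.println(recommendedSetup(true, true));   // XA_DATA_SOURCE_SUPPLIER
        System.out.println(recommendedSetup(true, false));  // JDBC_URL_WITH_EXACTLY_ONCE_FALSE
    }
}
```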
-
dataSourceSupplier
@Nonnull public JdbcSinkBuilder<T> dataSourceSupplier(SupplierEx<? extends CommonDataSource> dataSourceSupplier)
Sets the supplier of DataSource or XADataSource. One data source instance will be created on each member. For the exactly-once guarantee, an XADataSource must be used, otherwise the job will fail. If your driver doesn't have an XADataSource implementation, also call exactlyOnce(false).
There's no need to use a ConnectionPoolDataSource: one connection is given to each processor, and that connection is held for the entire job execution.
- Parameters:
dataSourceSupplier
- the supplier of the data source. The function must be stateless.
- Returns:
- this instance for fluent API
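The XADataSource requirement comes down to the runtime type of the object the supplier returns. The sketch below makes three assumptions:

- `usableFor` is a hypothetical check, not part of the builder API.
- `java.util.function.Supplier` is used in place of `SupplierEx`.
- The data sources are do-nothing proxies. A real job would construct the driver's own DataSource or XADataSource class.

```java
import java.lang.reflect.Proxy;
import java.util.function.Supplier;
import javax.sql.CommonDataSource;
import javax.sql.DataSource;
import javax.sql.XADataSource;

public class DataSourceCheckSketch {

    // Hypothetical check encoding the documented rule: when the sink runs with
    // exactly-once enabled, the supplied data source must implement XADataSource.
    static boolean usableFor(CommonDataSource ds, boolean sinkExactlyOnce) {
        return !sinkExactlyOnce || ds instanceof XADataSource;
    }

    // Do-nothing implementation of a data source interface, standing in for a
    // vendor class; none of its methods are called in this sketch.
    static <T> T stub(Class<T> type) {
        return type.cast(Proxy.newProxyInstance(
                DataSourceCheckSketch.class.getClassLoader(),
                new Class<?>[] {type},
                (proxy, method, args) -> null));
    }

    public static void main(String[] args) {
        // Plain Supplier used in place of SupplierEx<? extends CommonDataSource>.
        Supplier<CommonDataSource> plain = () -> stub(DataSource.class);
        Supplier<CommonDataSource> xa = () -> stub(XADataSource.class);
        System.out.println(usableFor(plain.get(), true));  // false
        System.out.println(usableFor(xa.get(), true));     // true
        System.out.println(usableFor(plain.get(), false)); // true
    }
}
```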
-
exactlyOnce
Sets whether the exactly-once mode is enabled for the sink. If exactly-once is enabled, the job must also be in exactly-once mode for that mode to be used; otherwise, the sink uses the job's guarantee.
Set exactly-once to false if you want your job to run in exactly-once mode but want to reduce the guarantee just for the sink.
The default is true (exactly-once enabled).
- Parameters:
enabled
- whether exactly-once is allowed for the sink
- Returns:
- this instance for fluent API
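How the sink-level flag combines with the job's guarantee can be modeled as a small function. The `Guarantee` enum below is a local stand-in that mirrors the levels of Hazelcast's `ProcessingGuarantee` (NONE, AT_LEAST_ONCE, EXACTLY_ONCE).

One point goes beyond the text and is an assumption, labeled in the code: when the flag lowers an exactly-once job, the sink behaves as at-least-once.

```java
public class SinkGuaranteeSketch {

    // Local stand-in for the job's processing guarantee, weakest first.
    enum Guarantee { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

    // Sink-level guarantee as described above. Assumption: when exactlyOnce(false)
    // lowers an exactly-once job for this sink, the sink behaves as at-least-once.
    static Guarantee effective(Guarantee job, boolean sinkExactlyOnce) {
        if (job == Guarantee.EXACTLY_ONCE && !sinkExactlyOnce) {
            return Guarantee.AT_LEAST_ONCE;
        }
        // Otherwise the sink simply follows the job's guarantee.
        return job;
    }

    public static void main(String[] args) {
        for (Guarantee job : Guarantee.values()) {
            System.out.println(job + " -> " + effective(job, true) + " / " + effective(job, false));
        }
    }
}
```

The flag can only lower the sink's guarantee. Enabling it never raises an at-least-once job to exactly-once.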
-
batchLimit
Sets the batch size limit for the sink. If the JDBC driver supports batched updates, this defines the upper bound on the batch size.
The default batch size limit is 50.
- Parameters:
batchLimit
- the batch size limit for the sink
- Returns:
- this instance for fluent API
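Because the batch limit is an upper bound, a stream of items is written in batches of at most that size. The sketch below is a simplified model under that reading: it splits a list into consecutive chunks, each standing for one `executeBatch()` call.

The model does not capture when the real sink flushes, so actual batches may be smaller than the limit. The method name `batches` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchLimitSketch {

    // Splits items into consecutive batches of at most batchLimit elements; each inner
    // list stands for one executeBatch() round trip. This sketch rejects non-positive limits.
    static <T> List<List<T>> batches(List<T> items, int batchLimit) {
        if (batchLimit <= 0) {
            throw new IllegalArgumentException("batchLimit must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchLimit) {
            int to = Math.min(from + batchLimit, items.size());
            result.add(new ArrayList<>(items.subList(from, to)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 120; i++) {
            items.add(i);
        }
        // 50 is the documented default batch size limit.
        for (List<Integer> batch : batches(items, 50)) {
            System.out.println(batch.size()); // prints 50, 50 and 20 on separate lines
        }
    }
}
```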
- Since:
- Jet 4.5
-
dataConnectionRef
Sets the reference (a DataConnectionRef) to the configured data connection from which the DataSource instance will be retrieved.
Example:
(Prerequisite) Data connection configuration:
// jdbcUrl, username and password are placeholders for your database settings
Config config = new Config();
Properties properties = new Properties();
properties.setProperty("jdbcUrl", jdbcUrl);
properties.setProperty("username", username);
properties.setProperty("password", password);
DataConnectionConfig dataConnectionConfig = new DataConnectionConfig()
        .setName("my-jdbc-data-connection")
        .setClassName(JdbcDataConnection.class.getName())
        .setProperties(properties);
// register the data connection under the same name that dataConnectionRef(...) uses below
config.getDataConnectionConfigs().put("my-jdbc-data-connection", dataConnectionConfig);
Pipeline configuration:
// entry(...) is statically imported from Util, dataConnectionRef(...) from DataConnectionRef;
// tableName and PERSON_COUNT are placeholders
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(IntStream.range(0, PERSON_COUNT).boxed().toArray(Integer[]::new)))
 .map(item -> entry(item, item.toString()))
 .writeTo(Sinks.<Entry<Integer, String>>jdbcBuilder()
         .updateQuery("INSERT INTO " + tableName + " VALUES(?, ?)")
         .dataConnectionRef(dataConnectionRef("my-jdbc-data-connection"))
         .bindFn((stmt, item) -> {
             stmt.setInt(1, item.getKey());
             stmt.setString(2, item.getValue());
         })
         .build());
- Parameters:
dataConnectionRef
- the reference to the configured data connection- Returns:
- this instance for fluent API
- Since:
- 5.2
-
build
Creates and returns the JDBC Sink with the supplied components.
-