Class PostgresCdcSources.Builder<T>
- Type Parameters:
T - type of items produced by the source
- Enclosing class:
- PostgresCdcSources
-
Field Summary
Fields inherited from class com.hazelcast.enterprise.jet.cdc.DebeziumCdcSources.Builder
config, engineBuilderSupplier, recordMappingFunction
-
Method Summary
- com.hazelcast.jet.pipeline.StreamSource<T> build(): Returns the source based on the properties set so far.
- changeRecord(): Sets the return type of the source to ChangeRecord.
- <T_NEW> PostgresCdcSources.Builder<T_NEW> customMapping(RecordMappingFunction<T_NEW> recordMappingFunction): Sets the return type of the source to the user-defined type T_NEW.
- json(): Sets the return type of the source to Map.Entry, with key and value being the SourceRecord's key and value, parsed to JSON strings.
- setColumnIncludeList(String... columnNameRegExps): Optional regular expressions that match the fully-qualified names of columns that should be included in change event message values.
- setCustomSnapshotter(Class<?> snapshotterClass): Custom snapshotter that will be used by the connector.
- setDatabaseAddress(String address): IP address or hostname of the database server. Has to be specified.
- setDatabaseAddress(String address, int port): IP address or hostname and the port of the database server. Has to be specified.
- setDatabaseCredentials(String user, String password): Database user and password for connecting to the database server.
- setDatabaseExcludeList(String... databaseNameRegExps): Optional regular expressions that match databases to be excluded from monitoring; any database not included in the exclude list will be monitored.
- setDatabaseIncludeList(String... databaseNameRegExps): Optional regular expressions that match databases to be monitored; any database not included in the include list will be excluded from monitoring.
- setDatabaseName(String dbName): The name of the PostgreSQL database from which to stream the changes.
- setDatabasePassword(String password): Database user password for connecting to the database server.
- setDatabasePort(int port): Optional port number of the database server. If unspecified, defaults to the PostgreSQL default port (5432).
- setDatabaseUser(String user): Database user for connecting to the database server.
- setLogicalDecodingPlugIn(String pluginName): The name of the Postgres logical decoding plug-in installed on the server.
- setProperty(String key, boolean value): Sets a source property.
- setProperty(String key, int value): Sets a source property.
- setProperty(String key, long value): Sets a source property.
- setProperty(String key, String value): Sets a source property.
- setPublicationName(String publicationName): The name of the Postgres publication that will be used for CDC purposes.
- setReplicationSlotDropOnStop(boolean dropOnStop): Whether to drop the logical replication slot when the connector disconnects cleanly.
- setReplicationSlotName(String slotName): The name of the Postgres logical decoding slot (also called "replication slot") created for streaming changes from a plug-in and database instance.
- setSchemaExcludeList(String... schemaNameRegExps): Optional regular expressions that match schema names to be excluded from monitoring ("schema" is used here to denote logical groups of tables).
- setSchemaIncludeList(String... schemaNameRegExps): Optional regular expressions that match schema names to be monitored ("schema" is used here to denote logical groups of tables).
- setSnapshotMode(DebeziumSnapshotMode snapshotMode): Snapshot mode that will be used by the connector.
- setSslCertificateFile(String file): Specifies the (path to the) file containing the SSL certificate for the database client.
- setSslKeyFile(String file): Specifies the (path to the) file containing the SSL private key of the database client.
- setSslKeyFilePassword(String password): Specifies the password used to access the SSL key file, if one is specified.
- setSslMode(String mode): Specifies whether to use an encrypted connection to the database.
- setSslRootCertificateFile: Specifies the file containing SSL certificate authority (CA) certificate(s).
- setTableExcludeList(String... tableNameRegExps): Optional regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring; any table not included in the exclude list will be monitored.
- setTableIncludeList(String... tableNameRegExps): Optional regular expressions that match fully-qualified table identifiers for tables to be monitored; any table not included in the include list will be excluded from monitoring.
- withAsyncEngine(): Changes the engine used to the async embedded engine.
- withDefaultEngine(): Changes the engine used to the default embedded engine.
- withErrorMaxRetries(int errorRetryCount): Sets the maximum retry count in case of errors.
- withSequenceExtractor(Class<? extends SequenceExtractor> sequenceExtractorClass): Sets the SequenceExtractor class property.
-
Method Details
-
setSnapshotMode
@Nonnull public PostgresCdcSources.Builder<T> setSnapshotMode(@Nonnull DebeziumSnapshotMode snapshotMode) Snapshot mode that will be used by the connector. If you want to use PostgresConnectorConfig.SnapshotMode.CUSTOM, use the setCustomSnapshotter(Class) method instead.
-
setCustomSnapshotter
@Nonnull public PostgresCdcSources.Builder<T> setCustomSnapshotter(@Nonnull Class<?> snapshotterClass) Custom snapshotter that will be used by the connector.
-
setDatabaseAddress
IP address or hostname and the port of the database server. Has to be specified.
-
setDatabaseCredentials
public PostgresCdcSources.Builder<T> setDatabaseCredentials(@Nonnull String user, @Nonnull String password) Database user and password for connecting to the database server. Has to be specified.
-
setDatabaseAddress
IP address or hostname of the database server. Has to be specified.
-
setDatabasePort
Optional port number of the database server. If unspecified, defaults to the PostgreSQL default port (5432).
-
setDatabaseUser
Database user for connecting to the database server. Has to be specified.
-
setDatabasePassword
Database user password for connecting to the database server. Has to be specified.
-
setDatabaseName
The name of the PostgreSQL database from which to stream the changes. Has to be set. Currently, this source cannot monitor multiple databases, only multiple schemas and/or tables. See the schema and table include and exclude list options for those.
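Among the connection settings above, only the port has a default. As a minimal sketch of that rule set, the hypothetical ConnectionSettings class below (not part of the Hazelcast API) reports which mandatory values are still missing and applies the 5432 default port; whether the real builder checks this in build() or later is not stated on this page.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical holder for the connection settings; not the Hazelcast builder. */
final class ConnectionSettings {
    static final int DEFAULT_PORT = 5432; // PostgreSQL default port, as documented above

    String address;
    Integer port;        // null means "not set", so the default applies
    String user;
    String password;
    String databaseName;

    /** The port that would be used: the explicit one, or the PostgreSQL default. */
    int effectivePort() {
        return port != null ? port : DEFAULT_PORT;
    }

    /** Names of the mandatory settings that are still missing, in declaration order. */
    List<String> missingSettings() {
        List<String> missing = new ArrayList<>();
        if (address == null) missing.add("address");
        if (user == null) missing.add("user");
        if (password == null) missing.add("password");
        if (databaseName == null) missing.add("databaseName");
        return missing;
    }

    public static void main(String[] args) {
        ConnectionSettings s = new ConnectionSettings();
        s.address = "127.0.0.1";
        s.user = "debezium";
        System.out.println(s.effectivePort());   // 5432
        System.out.println(s.missingSettings()); // [password, databaseName]
    }
}
```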
-
setSchemaIncludeList
@Nonnull public PostgresCdcSources.Builder<T> setSchemaIncludeList(@Nonnull String... schemaNameRegExps) Optional regular expressions that match schema names to be monitored ("schema" is used here to denote logical groups of tables). Any schema name not included in the include list will be excluded from monitoring. By default, all non-system schemas will be monitored. May not be used with the schema exclude list.
-
setSchemaExcludeList
@Nonnull public PostgresCdcSources.Builder<T> setSchemaExcludeList(@Nonnull String... schemaNameRegExps) Optional regular expressions that match schema names to be excluded from monitoring ("schema" is used here to denote logical groups of tables). Any schema name not included in the exclude list will be monitored, except system schemas. May not be used with the schema include list.
-
setDatabaseIncludeList
@Nonnull public PostgresCdcSources.Builder<T> setDatabaseIncludeList(@Nonnull String... databaseNameRegExps) Optional regular expressions that match databases to be monitored; any database not included in the include list will be excluded from monitoring. By default, the connector will monitor all databases. May not be used with the database exclude list.
- Overrides:
setDatabaseIncludeList
in class DebeziumCdcSources.Builder<T>
-
setDatabaseExcludeList
@Nonnull public PostgresCdcSources.Builder<T> setDatabaseExcludeList(@Nonnull String... databaseNameRegExps) Optional regular expressions that match databases to be excluded from monitoring; any database not included in the exclude list will be monitored. May not be used with the database include list.
- Overrides:
setDatabaseExcludeList
in class DebeziumCdcSources.Builder<T>
-
setTableIncludeList
@Nonnull public PostgresCdcSources.Builder<T> setTableIncludeList(@Nonnull String... tableNameRegExps) Optional regular expressions that match fully-qualified table identifiers for tables to be monitored; any table not included in the include list will be excluded from monitoring. Each identifier is of the form schemaName.tableName. By default, the connector will monitor every non-system table in each monitored database. May not be used with the table exclude list.
- Overrides:
setTableIncludeList
in class DebeziumCdcSources.Builder<T>
-
setTableExcludeList
@Nonnull public PostgresCdcSources.Builder<T> setTableExcludeList(@Nonnull String... tableNameRegExps) Optional regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring; any table not included in the exclude list will be monitored. Each identifier is of the form schemaName.tableName. May not be used with the table include list.
- Overrides:
setTableExcludeList
in class DebeziumCdcSources.Builder<T>
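A minimal sketch of how include and exclude lists interact, assuming, as Debezium documents for its own filter options, that each regular expression has to match the whole schemaName.tableName identifier. TableFilter is hypothetical, does not model the default exclusion of system tables, and the same pattern would apply to schema names.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical include/exclude filter over "schemaName.tableName" identifiers. */
final class TableFilter {
    private final List<Pattern> include;
    private final List<Pattern> exclude;

    TableFilter(List<String> includeRegExps, List<String> excludeRegExps) {
        // Mirrors the documented rule: the two lists may not be used together.
        if (!includeRegExps.isEmpty() && !excludeRegExps.isEmpty()) {
            throw new IllegalArgumentException("Use either an include list or an exclude list, not both");
        }
        this.include = compile(includeRegExps);
        this.exclude = compile(excludeRegExps);
    }

    private static List<Pattern> compile(List<String> regExps) {
        return regExps.stream().map(Pattern::compile).collect(Collectors.toList());
    }

    /** True if the table identified by schemaName.tableName should be monitored. */
    boolean isMonitored(String fullyQualifiedName) {
        if (!include.isEmpty()) {
            // Include list: only tables matching some expression are monitored.
            return include.stream().anyMatch(p -> p.matcher(fullyQualifiedName).matches());
        }
        // Exclude list (or no list at all): every table not matching is monitored.
        return exclude.stream().noneMatch(p -> p.matcher(fullyQualifiedName).matches());
    }

    public static void main(String[] args) {
        TableFilter onlyInventory = new TableFilter(List.of("inventory\\..*"), List.of());
        System.out.println(onlyInventory.isMonitored("inventory.customers")); // true
        System.out.println(onlyInventory.isMonitored("public.audit_log"));    // false

        TableFilter noAudit = new TableFilter(List.of(), List.of(".*\\.audit_.*"));
        System.out.println(noAudit.isMonitored("public.audit_log"));          // false
        System.out.println(noAudit.isMonitored("public.orders"));             // true
    }
}
```

Because matching is anchored in this sketch, inventory\..* does not accidentally match a schema such as xinventory.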
-
setColumnIncludeList
@Nonnull public PostgresCdcSources.Builder<T> setColumnIncludeList(@Nonnull String... columnNameRegExps) Optional regular expressions that match the fully-qualified names of columns that should be included in change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName.
-
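For illustration, the hypothetical ColumnFilter helper below keeps only the columns whose schemaName.tableName.columnName identifier fully matches one of the include expressions. It works on a plain Map standing in for a change event row, and the whole-identifier matching is an assumption carried over from the table filters.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical column filter applied to one row of a change event. */
final class ColumnFilter {
    static Map<String, Object> keepIncluded(String schema, String table,
                                            Map<String, Object> row, List<String> includeRegExps) {
        List<Pattern> patterns = includeRegExps.stream()
                .map(Pattern::compile)
                .collect(Collectors.toList());
        Map<String, Object> result = new LinkedHashMap<>(); // preserves column order
        for (Map.Entry<String, Object> column : row.entrySet()) {
            // Columns are identified as schemaName.tableName.columnName.
            String fqn = schema + "." + table + "." + column.getKey();
            if (patterns.stream().anyMatch(p -> p.matcher(fqn).matches())) {
                result.put(column.getKey(), column.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1001);
        row.put("email", "anne@example.com");
        row.put("password_hash", "x1y2");
        System.out.println(keepIncluded("inventory", "customers", row,
                List.of("inventory\\.customers\\.(id|email)")));
        // {id=1001, email=anne@example.com}
    }
}
```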
setLogicalDecodingPlugIn
The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput. If not explicitly set, the property defaults to decoderbufs.
When the processed transactions are very large, the JSON batch event containing all changes of a transaction may not fit into the hard-coded 1 GB memory buffer. In such cases, you can switch to a streaming variant (wal2json_streaming or wal2json_rds_streaming), in which PostgreSQL sends every change in the transaction as a separate message.
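A hedged sketch of the plug-in rules above: the hypothetical PluginNames helper accepts only the listed names, falls back to decoderbufs when nothing is set, and maps a wal2json plug-in to its streaming variant for very large transactions.

```java
import java.util.Set;

/** Hypothetical helper for choosing a logical decoding plug-in name. */
final class PluginNames {
    static final Set<String> SUPPORTED = Set.of(
            "decoderbufs", "wal2json", "wal2json_rds",
            "wal2json_streaming", "wal2json_rds_streaming", "pgoutput");
    static final String DEFAULT = "decoderbufs";

    /** Returns the plug-in to use, applying the documented default and rejecting unknown names. */
    static String resolve(String requested) {
        if (requested == null) {
            return DEFAULT;
        }
        if (!SUPPORTED.contains(requested)) {
            throw new IllegalArgumentException("Unsupported logical decoding plug-in: " + requested);
        }
        return requested;
    }

    /** Switches a non-streaming wal2json plug-in to its streaming variant; others are unchanged. */
    static String streamingVariant(String plugin) {
        if (plugin.equals("wal2json") || plugin.equals("wal2json_rds")) {
            return plugin + "_streaming";
        }
        return plugin;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));                              // decoderbufs
        System.out.println(streamingVariant(resolve("wal2json_rds")));  // wal2json_rds_streaming
    }
}
```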
-
setReplicationSlotName
The name of the Postgres logical decoding slot (also called "replication slot") created for streaming changes from a plug-in and database instance. Values must conform to the Postgres replication slot naming rules, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."
Replication slots must have an identifier that is unique across all databases in a PostgreSQL cluster.
If not explicitly set, the property defaults to debezium.
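The character rule quoted above fits a single regular expression. The hypothetical SlotNames helper below applies it together with the debezium default; it does not check uniqueness across the cluster or any length limit PostgreSQL may impose.

```java
import java.util.regex.Pattern;

/** Hypothetical validator for replication slot names. */
final class SlotNames {
    // Lower-case letters, digits and underscores only, per the quoted naming rule.
    private static final Pattern VALID = Pattern.compile("[a-z0-9_]+");
    static final String DEFAULT = "debezium";

    /** Returns the slot name to use, or throws if it breaks the naming rule. */
    static String resolve(String requested) {
        String name = requested == null ? DEFAULT : requested;
        if (!VALID.matcher(name).matches()) {
            throw new IllegalArgumentException("Invalid replication slot name: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));           // debezium
        System.out.println(resolve("orders_cdc_1")); // orders_cdc_1
        try {
            resolve("Orders-CDC");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());      // Invalid replication slot name: Orders-CDC
        }
    }
}
```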
-
setReplicationSlotDropOnStop
Whether to drop the logical replication slot when the connector disconnects cleanly. Defaults to false.
Should only be set to true in testing or development environments. Dropping the slot allows the database to discard WAL segments, so after a restart the connector may be unable to resume from the WAL position where it left off.
-
setPublicationName
The name of the Postgres publication that will be used for CDC purposes. If the publication does not exist when this source starts up, the source will create it (note: the database user of the source must have superuser permissions to do so). A publication created this way includes all tables, and the source itself must then filter the data based on its include and exclude list configuration. This is inefficient, because the database still sends all data to the connector before filtering is applied.
It's best to use a pre-defined publication (created with the CREATE PUBLICATION SQL command), specified by its name. If not explicitly set, the property defaults to dbz_publication.
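A pre-defined publication can be limited to exactly the tables the source monitors, so the database does not send changes the source would discard anyway. The hypothetical Publications helper below only builds the SQL text; it assumes simple schemaName.tableName identifiers that need no quoting, and the publication name used would then be passed to setPublicationName.

```java
import java.util.List;

/** Hypothetical helper that builds a CREATE PUBLICATION statement. */
final class Publications {
    /** Builds a statement limited to the given schemaName.tableName identifiers. */
    static String createPublicationSql(String publicationName, List<String> tables) {
        if (tables.isEmpty()) {
            throw new IllegalArgumentException("At least one table is required");
        }
        return "CREATE PUBLICATION " + publicationName
                + " FOR TABLE " + String.join(", ", tables) + ";";
    }

    public static void main(String[] args) {
        System.out.println(createPublicationSql("dbz_publication",
                List.of("inventory.customers", "inventory.orders")));
        // CREATE PUBLICATION dbz_publication FOR TABLE inventory.customers, inventory.orders;
    }
}
```

Identifiers with upper-case letters or special characters would have to be double-quoted, which this sketch does not do.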
-
setSslMode
Specifies whether to use an encrypted connection to the database. The default, disable, uses an unencrypted connection. The require option establishes an encrypted connection, but fails if one cannot be made for any reason.
The verify-ca option behaves like require, but additionally verifies the server TLS certificate against the configured Certificate Authority (CA) certificates, and fails if it doesn't match any valid CA certificate.
The verify-full option behaves like verify-ca, but additionally verifies that the server certificate matches the host of the remote connection.
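The modes form a ladder of increasingly strict checks. The hypothetical SslMode enum below records which guarantee each mode adds, using the values as PostgreSQL spells them (disable, require, verify-ca, verify-full) and treating an unset mode as disable.

```java
/** Hypothetical model of the SSL modes; not a Hazelcast or Debezium type. */
final class SslModes {
    enum SslMode {
        DISABLE("disable", false, false, false),
        REQUIRE("require", true, false, false),
        VERIFY_CA("verify-ca", true, true, false),
        VERIFY_FULL("verify-full", true, true, true);

        final String value;
        final boolean requiresEncryption; // fails if an encrypted connection cannot be made
        final boolean verifiesCa;         // server certificate checked against trusted CA certificates
        final boolean verifiesHost;       // server certificate must match the connected host

        SslMode(String value, boolean requiresEncryption, boolean verifiesCa, boolean verifiesHost) {
            this.value = value;
            this.requiresEncryption = requiresEncryption;
            this.verifiesCa = verifiesCa;
            this.verifiesHost = verifiesHost;
        }

        /** Parses a mode string; null maps to the documented default, disable. */
        static SslMode fromValue(String value) {
            if (value == null) {
                return DISABLE;
            }
            for (SslMode mode : values()) {
                if (mode.value.equals(value)) {
                    return mode;
                }
            }
            throw new IllegalArgumentException("Unknown SSL mode: " + value);
        }
    }

    public static void main(String[] args) {
        for (SslMode mode : SslMode.values()) {
            System.out.println(mode.value + ": encrypted=" + mode.requiresEncryption
                    + ", CA check=" + mode.verifiesCa + ", host check=" + mode.verifiesHost);
        }
    }
}
```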
-
setSslCertificateFile
Specifies the (path to the) file containing the SSL certificate for the database client.
-
setSslKeyFile
Specifies the (path to the) file containing the SSL private key of the database client.
-
setSslKeyFilePassword
Specifies the password used to access the SSL key file, if one is specified. Mandatory if a key file is specified.
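Because the password becomes mandatory once a key file is set, this misconfiguration can be caught before connecting. A minimal sketch with a hypothetical SslKeySettings class that checks only this documented rule:

```java
import java.util.Optional;

/** Hypothetical view of the client key settings; not part of the Hazelcast API. */
final class SslKeySettings {
    final String keyFile;         // null if no key file is configured
    final String keyFilePassword; // null if no password is configured

    SslKeySettings(String keyFile, String keyFilePassword) {
        this.keyFile = keyFile;
        this.keyFilePassword = keyFilePassword;
    }

    /** Returns an error message if a key file is set without a password, empty otherwise. */
    Optional<String> validate() {
        if (keyFile != null && keyFilePassword == null) {
            return Optional.of("SSL key file " + keyFile + " is set, but no key file password was given");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(new SslKeySettings("client.key", null).validate().orElse("ok"));
        System.out.println(new SslKeySettings("client.key", "secret").validate().orElse("ok")); // ok
    }
}
```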
-
setSslRootCertificateFile
Specifies the file containing SSL certificate authority (CA) certificate(s).
-
changeRecord
Sets the return type of the source to ChangeRecord.
- Overrides:
changeRecord
in class DebeziumCdcSources.Builder<T>
-
json
Sets the return type of the source to Map.Entry, with key and value being the SourceRecord's key and value, parsed to JSON strings.
- Overrides:
json
in class DebeziumCdcSources.Builder<T>
-
customMapping
@Nonnull public <T_NEW> PostgresCdcSources.Builder<T_NEW> customMapping(@Nonnull RecordMappingFunction<T_NEW> recordMappingFunction) Sets the return type of the source to the user-defined type T_NEW. The mapping is performed by the user-defined mapping function.
- Overrides:
customMapping
in class DebeziumCdcSources.Builder<T>
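changeRecord(), json() and customMapping(...) return a builder with a different type argument, so the element type of the final source follows the last such call. The hypothetical MiniBuilder below reproduces just that generic pattern, with java.util.function.Function standing in for RecordMappingFunction and plain strings standing in for source records; it is not the Hazelcast implementation.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical builder whose type parameter T is the element type of what build() produces. */
final class MiniBuilder<T> {
    private final Function<String, T> mapping; // stands in for the record mapping function

    private MiniBuilder(Function<String, T> mapping) {
        this.mapping = mapping;
    }

    /** Starting point: keeps the raw record text. */
    static MiniBuilder<String> create() {
        return new MiniBuilder<>(Function.identity());
    }

    /** Returns a builder with a new type argument; other settings (none here) would be copied over. */
    <T_NEW> MiniBuilder<T_NEW> customMapping(Function<String, T_NEW> newMapping) {
        return new MiniBuilder<>(newMapping);
    }

    /** Applies the current mapping to sample records. */
    List<T> build(List<String> records) {
        return records.stream().map(mapping).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        MiniBuilder<Integer> lengths = MiniBuilder.create().customMapping(String::length);
        System.out.println(lengths.build(List.of("insert", "update"))); // [6, 6]
    }
}
```

Returning a new builder, rather than casting this, keeps the compiler's view of T accurate after every type-changing call.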
-
withSequenceExtractor
@Nonnull public PostgresCdcSources.Builder<T> withSequenceExtractor(Class<? extends SequenceExtractor> sequenceExtractorClass) Sets the SequenceExtractor class property. The sequence extractor is used to determine a monotonically increasing order of CDC events, which can later be used in WriteCdcP.
- Overrides:
withSequenceExtractor
in class DebeziumCdcSources.Builder<T>
- Parameters:
sequenceExtractorClass - Class of the SequenceExtractor implementation. Must be on the classpath during execution.
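For PostgreSQL, an obvious monotonically increasing value is the log sequence number (LSN) of each change, which PostgreSQL prints as two hexadecimal halves of a 64-bit WAL position, for example 16/B374D848. The hypothetical LsnSequence helper below converts that form to a long and rejects events that are not newer than the last accepted one. It does not implement Hazelcast's SequenceExtractor interface, whose methods are not described on this page.

```java
/** Hypothetical LSN-based sequence tracker; not Hazelcast's SequenceExtractor. */
final class LsnSequence {
    private long lastSeen = -1;

    /** Parses the textual LSN "X/Y" into the 64-bit position (X << 32) | Y. */
    static long parseLsn(String lsn) {
        int slash = lsn.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("Not an LSN: " + lsn);
        }
        long high = Long.parseLong(lsn.substring(0, slash), 16);
        long low = Long.parseLong(lsn.substring(slash + 1), 16);
        return (high << 32) | low;
    }

    /** Accepts an event only if its LSN is greater than every LSN accepted so far. */
    boolean accept(String lsn) {
        long sequence = parseLsn(lsn);
        if (sequence <= lastSeen) {
            return false;
        }
        lastSeen = sequence;
        return true;
    }

    public static void main(String[] args) {
        LsnSequence seq = new LsnSequence();
        System.out.println(seq.accept("0/1A")); // true
        System.out.println(seq.accept("0/1B")); // true
        System.out.println(seq.accept("0/1A")); // false, not newer than 0/1B
    }
}
```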
-
withDefaultEngine
Changes the engine used to the default embedded engine. This engine is the most predictable, as it's single-threaded. If you want to use multiple threads for event reading, use withAsyncEngine().
- Overrides:
withDefaultEngine
in class DebeziumCdcSources.Builder<T>
-
withAsyncEngine
Changes the engine used to the async embedded engine. The async engine allows multithreaded reading, but you have to deal with the quirks of asynchronous processing. For example, we have observed that the initial offsets are set after the engine starts reading, which causes some additional records (ones that precede the last saved committed offset) to be read.
- Overrides:
withAsyncEngine
in class DebeziumCdcSources.Builder<T>
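If records that precede the last committed offset may be delivered again, a downstream step can discard them. A hedged sketch, assuming each record carries a numeric offset that increases monotonically (for example an LSN converted to a long); ReplayGuard is hypothetical and reduces records to their offsets.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical filter that drops records at or before the last committed offset. */
final class ReplayGuard {
    private final long lastCommittedOffset;

    ReplayGuard(long lastCommittedOffset) {
        this.lastCommittedOffset = lastCommittedOffset;
    }

    /** Keeps only offsets after the committed one; the committed record itself was already processed. */
    List<Long> dropReplayed(List<Long> offsets) {
        List<Long> fresh = new ArrayList<>();
        for (long offset : offsets) {
            if (offset > lastCommittedOffset) {
                fresh.add(offset);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        ReplayGuard guard = new ReplayGuard(100);
        System.out.println(guard.dropReplayed(List.of(98L, 99L, 100L, 101L, 102L))); // [101, 102]
    }
}
```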
-
withErrorMaxRetries
Sets the maximum retry count in case of errors. The default value is -1, which means infinite retries.
- Overrides:
withErrorMaxRetries
in class DebeziumCdcSources.Builder<T>
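A minimal sketch of the documented semantics, where a negative limit (the default -1) means retrying forever. The hypothetical RetryLoop helper counts retries after the first attempt and, unlike a production engine, does not back off between attempts.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical retry loop; not the connector's actual error handling. */
final class RetryLoop {
    /**
     * Runs the attempt until it succeeds or maxRetries retries are used up.
     * A negative maxRetries means retry forever. Returns the number of retries used.
     */
    static int runWithRetries(BooleanSupplier attempt, int maxRetries) {
        int retries = 0;
        while (!attempt.getAsBoolean()) {
            if (maxRetries >= 0 && retries >= maxRetries) {
                throw new IllegalStateException("Giving up after " + retries + " retries");
            }
            retries++;
        }
        return retries;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first two calls, then succeeds: two retries are needed.
        int used = runWithRetries(() -> ++calls[0] > 2, -1);
        System.out.println(used); // 2
    }
}
```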
-
setProperty
@Nonnull public PostgresCdcSources.Builder<T> setProperty(@Nonnull String key, @Nonnull String value) Sets a source property. These properties are passed to Debezium.
- Overrides:
setProperty
in class DebeziumCdcSources.Builder<T>
-
setProperty
Sets a source property. These properties are passed to Debezium.
- Overrides:
setProperty
in class DebeziumCdcSources.Builder<T>
-
setProperty
Sets a source property. These properties are passed to Debezium.
- Overrides:
setProperty
in class DebeziumCdcSources.Builder<T>
-
setProperty
Sets a source property. These properties are passed to Debezium.
- Overrides:
setProperty
in class DebeziumCdcSources.Builder<T>
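The typed overloads are convenient shorthands when passing properties to Debezium. A sketch assuming, without confirmation from this page, that the boolean, int and long variants store the value's string form; PropertyBag is hypothetical, and slot.drop.on.stop, database.port and heartbeat.interval.ms are Debezium PostgreSQL connector property names used purely as examples.

```java
import java.util.Properties;

/** Hypothetical fluent property holder mirroring the setProperty overloads. */
final class PropertyBag {
    private final Properties properties = new Properties();

    PropertyBag setProperty(String key, String value) {
        properties.setProperty(key, value);
        return this; // fluent, like the builder
    }

    PropertyBag setProperty(String key, boolean value) {
        return setProperty(key, Boolean.toString(value));
    }

    PropertyBag setProperty(String key, int value) {
        return setProperty(key, Integer.toString(value));
    }

    PropertyBag setProperty(String key, long value) {
        return setProperty(key, Long.toString(value));
    }

    String get(String key) {
        return properties.getProperty(key);
    }

    public static void main(String[] args) {
        PropertyBag bag = new PropertyBag()
                .setProperty("slot.drop.on.stop", false)      // boolean overload
                .setProperty("database.port", 5432)           // int overload
                .setProperty("heartbeat.interval.ms", 10_000L); // long overload
        System.out.println(bag.get("database.port")); // 5432
    }
}
```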
-
build
Returns the source based on the properties set so far.
- Overrides:
build
in class DebeziumCdcSources.Builder<T>
-