Class CdcSinks
Contains factory methods for Change Data Capture (CDC) specific pipeline sinks, which take ChangeRecord items as their input.
These sinks can detect any reordering that might happen in the ChangeRecord stream (Jet pipelines use parallel execution, so item reordering can and does happen). Reordering detection is based on implementation-specific sequence numbers provided by CDC event sources. The sink reacts to reordering by dropping obsolete input items. The exact behavior is as follows. For each input item, the sink:
- applies the keyFn to the input item to extract its key
- extracts the item's sequence number
- compares the sequence number with the previously seen sequence number for the same key, if any
- if the previous sequence number is more recent than the one observed in the input item, it drops (ignores) the input item
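The steps above can be sketched in plain Java. This is only an illustrative model, not the sink's actual implementation: the class `ReorderingFilter`, its `accept` method and the use of a bare `long` as the sequence number are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToLongFunction;

/** Hypothetical model of the per-key reordering check; not the real sink code. */
final class ReorderingFilter<T, K> {
    private final Function<T, K> keyFn;
    private final ToLongFunction<T> sequenceFn;
    // Last seen sequence number for each key.
    private final Map<K, Long> lastSeen = new HashMap<>();

    ReorderingFilter(Function<T, K> keyFn, ToLongFunction<T> sequenceFn) {
        this.keyFn = keyFn;
        this.sequenceFn = sequenceFn;
    }

    /** Returns true if the item should be applied, false if it is obsolete. */
    boolean accept(T item) {
        K key = keyFn.apply(item);                    // extract the key
        long sequence = sequenceFn.applyAsLong(item); // extract the sequence number
        Long previous = lastSeen.get(key);            // previously seen value, if any
        if (previous != null && previous > sequence) {
            return false;                             // previous is more recent: drop
        }
        lastSeen.put(key, sequence);
        return true;
    }
}
```

In this model an item that arrives after a newer item for the same key is rejected, while items for other keys are unaffected.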
The implementation-specific sequence numbers provided by the CDC sources consist of two parts:
- a numeric sequence, for which the source emits a monotonically increasing value and which allows events to be ordered
- a source descriptor, which allows us to identify situations when the numeric sequence gets reset, or any other event after which comparing new numeric values with previous ones no longer makes sense
The sequence source is made up of information such as the ID of the database instance the connector is connected to, the name of the binlog file being monitored and so on. So whenever the source reconnects to a new server, switches to a new binlog file, or a similar event occurs, the source field of the sequence numbers changes.
The logic for determining which events are more recent takes the sequence source into consideration. Whenever the source field changes, the event carrying it is considered more recent than those with the old source value. Numeric sequence numbers are compared to establish order only when their sources match.
Restarting the CDC Jet source will not change sequence number sources, only significant changes on the database side will.
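The comparison rule for two-part sequence numbers can be illustrated with a small sketch. The names `SequenceModel`, `Sequence` and `isObsolete` are hypothetical, and representing the source descriptor as a `String` is an assumption made for readability; the real representation is implementation-specific.

```java
/** Hypothetical model of the two-part sequence comparison; not the real sink code. */
final class SequenceModel {

    /** A source descriptor plus a numeric value (illustrative types). */
    static final class Sequence {
        final String source;
        final long value;

        Sequence(String source, long value) {
            this.source = source;
            this.value = value;
        }
    }

    /** Returns true if 'previous' is more recent than 'incoming'. */
    static boolean isObsolete(Sequence previous, Sequence incoming) {
        if (previous == null) {
            return false; // nothing seen yet for this key
        }
        if (!previous.source.equals(incoming.source)) {
            // Source changed: the event carrying the new source counts as more recent.
            return false;
        }
        // Same source: numeric values establish the order.
        return previous.value > incoming.value;
    }
}
```

Note that when the sources differ, the numeric values are not consulted at all, so a low numeric value from a new source is still treated as newer.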
Since: 5.5
Field Summary
Modifier and Type | Field | Description
static final com.hazelcast.spi.properties.HazelcastProperty | SEQUENCE_CACHE_EXPIRATION_SECONDS | Number of seconds for which the sink will remember the last seen sequence number for an input key (used to detect reordering).
Method Summary
Modifier and Type | Method | Description
static <K,V> com.hazelcast.jet.pipeline.Sink<ChangeRecord> | map(com.hazelcast.map.IMap<? super K, ? super V> map, com.hazelcast.function.FunctionEx<? super ChangeRecord, ? extends K> keyFn, com.hazelcast.function.FunctionEx<? super ChangeRecord, ? extends V> valueFn) | Returns a sink that applies the changes described by a Change Data Capture (CDC) stream to an IMap.
static <K,V> com.hazelcast.jet.pipeline.Sink<ChangeRecord> | map(String mapName, com.hazelcast.function.FunctionEx<? super ChangeRecord, ? extends K> keyFn, com.hazelcast.function.FunctionEx<? super ChangeRecord, ? extends V> valueFn) | Returns a sink that applies the changes described by a Change Data Capture (CDC) stream to an IMap.
static <K,V> com.hazelcast.jet.pipeline.Sink<ChangeRecord> | remoteMap(String mapName, com.hazelcast.client.config.ClientConfig clientConfig, com.hazelcast.function.FunctionEx<? super ChangeRecord, ? extends K> keyFn, com.hazelcast.function.FunctionEx<? super ChangeRecord, ? extends V> valueFn) | Returns a sink equivalent to map(java.lang.String, com.hazelcast.function.FunctionEx<? super com.hazelcast.enterprise.jet.cdc.ChangeRecord, ? extends K>, com.hazelcast.function.FunctionEx<? super com.hazelcast.enterprise.jet.cdc.ChangeRecord, ? extends V>), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
Field Details
SEQUENCE_CACHE_EXPIRATION_SECONDS
public static final com.hazelcast.spi.properties.HazelcastProperty SEQUENCE_CACHE_EXPIRATION_SECONDS
Number of seconds for which the sink will remember the last seen sequence number for an input key (used to detect reordering). After this time the last-seen sequence number values will eventually be evicted, in order to save space.
The default value is 10 seconds.
Since: 5.5
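To make the effect of the expiration window concrete, here is a minimal sketch of a last-seen cache that forgets entries after a configurable number of seconds. It is an assumption-laden model: the class `ExpiringSequenceCache`, its methods, the explicit `nowMs` clock parameter and the eager eviction strategy are all hypothetical, whereas the description above only promises that values are eventually evicted.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a last-seen sequence cache with expiration. */
final class ExpiringSequenceCache<K> {

    private static final class Entry {
        final long sequence;
        final long storedAtMs;

        Entry(long sequence, long storedAtMs) {
            this.sequence = sequence;
            this.storedAtMs = storedAtMs;
        }
    }

    private final long expirationMs;
    private final Map<K, Entry> entries = new HashMap<>();

    ExpiringSequenceCache(long expirationSeconds) {
        this.expirationMs = expirationSeconds * 1000L;
    }

    /** Returns false if a still-remembered sequence for the key is more recent. */
    boolean update(K key, long sequence, long nowMs) {
        // Eager eviction, chosen for simplicity in this sketch.
        entries.values().removeIf(e -> nowMs - e.storedAtMs > expirationMs);
        Entry previous = entries.get(key);
        if (previous != null && previous.sequence > sequence) {
            return false; // obsolete item
        }
        entries.put(key, new Entry(sequence, nowMs));
        return true;
    }

    int size() {
        return entries.size();
    }
}
```

In this model, an out-of-order item that arrives after the entry for its key has been evicted can no longer be recognized as obsolete, which is the trade-off the expiration time controls.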
Method Details
map
@Nonnull public static <K,V> com.hazelcast.jet.pipeline.Sink<ChangeRecord> map(@Nonnull String mapName, @Nonnull com.hazelcast.function.FunctionEx<? super ChangeRecord, ? extends K> keyFn, @Nonnull com.hazelcast.function.FunctionEx<? super ChangeRecord, ? extends V> valueFn)
Returns a sink that applies the changes described by a Change Data Capture (CDC) stream to an IMap. The main usage is to have the IMap mirror the contents of the data table that is the source of the CDC stream, but since it accepts arbitrary key and value functions, other behaviors are possible as well.
NOTE: in order for the sink behavior to be predictable, the map should be non-existent or empty at the time the sink starts using it.
For each item the sink receives, it uses the keyFn to determine which map key the change event applies to. Then, based on the ChangeRecord's Operation, it decides to either:
- delete the key from the map (Operation.DELETE)
- insert a new value for the key (Operation.SYNC & Operation.INSERT)
- update the current value for the key (Operation.UPDATE)
For insert and update operations, the new value is obtained by applying the valueFn to the change record.
NOTE: if valueFn returns null, then the key will be deleted no matter the operation (i.e. even for update and insert records).
Since: 5.5
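The way operations translate into map mutations can be modeled with a plain `java.util.Map`. This is a sketch under stated assumptions, not the sink's code: `MapSinkModel`, `Change` and the local `Operation` enum are hypothetical stand-ins for the real `ChangeRecord` and `Operation` types, and skipping `valueFn` for delete records is an assumption of the model.

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical model of how a change is applied to a map; not the real sink code. */
final class MapSinkModel {

    /** Stand-in for the operations named in the description above. */
    enum Operation { SYNC, INSERT, UPDATE, DELETE }

    /** Stand-in for a change record describing one row of a table. */
    static final class Change {
        final Operation operation;
        final int id;
        final String name;

        Change(Operation operation, int id, String name) {
            this.operation = operation;
            this.id = id;
            this.name = name;
        }
    }

    static <K, V> void apply(Map<K, V> map, Change change,
                             Function<Change, K> keyFn, Function<Change, V> valueFn) {
        K key = keyFn.apply(change);
        if (change.operation == Operation.DELETE) {
            map.remove(key);
            return;
        }
        // SYNC, INSERT and UPDATE all store the value produced by valueFn.
        V value = valueFn.apply(change);
        if (value == null) {
            map.remove(key); // a null value deletes the key, whatever the operation
        } else {
            map.put(key, value);
        }
    }
}
```

A value function that returns null for some records therefore acts as a filter that removes those keys from the mirrored map.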
map
@Nonnull public static <K,V> com.hazelcast.jet.pipeline.Sink<ChangeRecord> map(@Nonnull com.hazelcast.map.IMap<? super K, ? super V> map, @Nonnull com.hazelcast.function.FunctionEx<? super ChangeRecord, ? extends K> keyFn, @Nonnull com.hazelcast.function.FunctionEx<? super ChangeRecord, ? extends V> valueFn)
Returns a sink that applies the changes described by a Change Data Capture (CDC) stream to an IMap. The main usage is to have the IMap mirror the contents of the data table that is the source of the CDC stream, but since it accepts arbitrary key and value functions, other behaviors are possible as well.
NOTE: in order for the sink behavior to be predictable, the map should be non-existent or empty at the time the sink starts using it.
For each item the sink receives, it uses the keyFn to determine which map key the change event applies to. Then, based on the ChangeRecord's Operation, it decides to either:
- delete the key from the map (Operation.DELETE)
- insert a new value for the key (Operation.SYNC & Operation.INSERT)
- update the current value for the key (Operation.UPDATE)
For insert and update operations, the new value is obtained by applying the valueFn to the change record.
NOTE: if valueFn returns null, then the key will be deleted no matter the operation (i.e. even for update and insert records).
Since: 5.5
remoteMap
@Nonnull public static <K,V> com.hazelcast.jet.pipeline.Sink<ChangeRecord> remoteMap(@Nonnull String mapName, @Nonnull com.hazelcast.client.config.ClientConfig clientConfig, @Nonnull com.hazelcast.function.FunctionEx<? super ChangeRecord, ? extends K> keyFn, @Nonnull com.hazelcast.function.FunctionEx<? super ChangeRecord, ? extends V> valueFn)
Returns a sink equivalent to map(java.lang.String, com.hazelcast.function.FunctionEx<? super com.hazelcast.enterprise.jet.cdc.ChangeRecord, ? extends K>, com.hazelcast.function.FunctionEx<? super com.hazelcast.enterprise.jet.cdc.ChangeRecord, ? extends V>), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
NOTE 1: in order for the sink behavior to be predictable, the map should be non-existent or empty at the time the sink starts using it.
NOTE 2: if valueFn returns null, then the key will be deleted no matter the operation (i.e. even for update and insert records).
Since: 5.5