Class CdcSinks
- java.lang.Object
- com.hazelcast.jet.cdc.CdcSinks
public final class CdcSinks extends java.lang.Object
Contains factory methods for change data capture specific pipeline sinks. As a consequence, these sinks take ChangeRecord items as their input.
These sinks can detect any reordering that might happen in the ChangeRecord stream (Jet pipelines use parallel execution, so item reordering can and does happen). Reordering detection is based on implementation-specific sequence numbers provided by CDC event sources. The sink reacts to reordering by dropping obsolete input items. The exact behavior is as follows. For each input item, the sink:
- applies the keyFn to the input item to extract its key
- extracts the item's sequence number
- compares the sequence number with the previously seen sequence number for the same key, if any
- if the previous sequence number is more recent than the one observed in the input item, drops (ignores) the input item
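The per-item steps above can be sketched with plain Java collections. This is a minimal, hypothetical model, not the Hazelcast implementation: the `ReorderFilter` class, its `accept` method and the use of a single `long` as the sequence number are assumptions made for illustration (the real sequence numbers also carry a source descriptor, described next).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical model of the sink's per-key reordering check (not Hazelcast code). */
public final class ReorderFilter<T, K> {
    private final Function<T, K> keyFn;          // extracts the key from an item
    private final Function<T, Long> sequenceFn;  // extracts the item's sequence number
    private final Map<K, Long> lastSeen = new HashMap<>();

    public ReorderFilter(Function<T, K> keyFn, Function<T, Long> sequenceFn) {
        this.keyFn = keyFn;
        this.sequenceFn = sequenceFn;
    }

    /** Returns true if the item should be applied, false if it is obsolete and dropped. */
    public boolean accept(T item) {
        K key = keyFn.apply(item);
        long seq = sequenceFn.apply(item);
        Long previous = lastSeen.get(key);
        if (previous != null && previous > seq) {
            return false; // a more recent item for this key was already seen
        }
        lastSeen.put(key, seq);
        return true;
    }

    public static void main(String[] args) {
        // Items are "key:sequence" strings; "a:2" arrives after "a:3", i.e. out of order.
        ReorderFilter<String, String> filter = new ReorderFilter<>(
                s -> s.split(":")[0],
                s -> Long.parseLong(s.split(":")[1]));
        for (String item : new String[] {"a:1", "b:1", "a:3", "a:2", "b:2"}) {
            System.out.println(item + " -> " + (filter.accept(item) ? "applied" : "dropped"));
        }
    }
}
```

In this model, an item whose sequence number equals the last-seen one is still applied, because the previous number is not more recent than it.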
The implementation-specific sequence numbers provided by the CDC sources consist of two parts:
- a numeric sequence, for which the source emits monotonically increasing values and which allows ordering of the events
- a source descriptor, which allows us to identify situations when the numeric sequence gets reset, or any other event after which comparing new numeric values with previous ones no longer makes sense
The sequence source is made up of information such as the ID of the database instance the connector is connected to, the name of the binlog file being monitored, and so on. So whenever the source reconnects to a new server, switches to a new binlog file, or goes through some other such event, the source field of the sequence numbers changes.
The logic that determines which events are more recent takes the sequence source into consideration. Whenever the source field changes, the event carrying the new value is considered more recent than events with the old source value. Numeric sequence numbers are compared to establish order only when their sources match.
Restarting the CDC Jet source does not change sequence number sources; only significant changes on the database side do.
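The comparison rule above can be modelled as follows. This is a hypothetical sketch, not Hazelcast code: the `Sequence` class, its string-valued `source` field and the `shouldDrop` helper are assumptions. It also assumes that, when the sources differ, the incoming item is the one carrying the new source, so it is never dropped.

```java
import java.util.Objects;

/** Hypothetical two-part sequence number: a source descriptor plus a numeric value. */
public final class Sequence {
    public final String source; // e.g. server ID + binlog file name (assumed format)
    public final long value;    // monotonically increasing within one source

    public Sequence(String source, long value) {
        this.source = source;
        this.value = value;
    }

    /**
     * Decides whether an incoming item is obsolete compared to the last-seen one.
     * Numeric values are compared only when the sources match; a changed source
     * marks the incoming item as more recent, so it is kept.
     */
    public static boolean shouldDrop(Sequence previous, Sequence incoming) {
        if (previous == null) {
            return false; // nothing seen yet for this key
        }
        if (!Objects.equals(previous.source, incoming.source)) {
            return false; // source changed: numeric values are not comparable
        }
        return previous.value > incoming.value;
    }

    public static void main(String[] args) {
        Sequence prev = new Sequence("server-1/binlog.000001", 500);
        System.out.println(shouldDrop(prev, new Sequence("server-1/binlog.000001", 300)));
        System.out.println(shouldDrop(prev, new Sequence("server-1/binlog.000001", 700)));
        System.out.println(shouldDrop(prev, new Sequence("server-1/binlog.000002", 10)));
    }
}
```

The third call shows why the source descriptor matters: after a switch to a new binlog file, the value 10 is far lower than 500, yet the item is kept.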
- Since:
- Jet 4.2
Field Summary
Modifier and Type | Field | Description
static HazelcastProperty | SEQUENCE_CACHE_EXPIRATION_SECONDS | Number of seconds for which the sink will remember the last seen sequence number for an input key (used to detect reordering).
Method Summary
Modifier and Type | Method | Description
static <K,V> Sink<ChangeRecord> | map(IMap<? super K,? super V> map, FunctionEx<? super ChangeRecord,? extends K> keyFn, FunctionEx<? super ChangeRecord,? extends V> valueFn) | Returns a sink that applies the changes described by a Change Data Capture (CDC) stream to an IMap.
static <K,V> Sink<ChangeRecord> | map(java.lang.String mapName, FunctionEx<? super ChangeRecord,? extends K> keyFn, FunctionEx<? super ChangeRecord,? extends V> valueFn) | Returns a sink that applies the changes described by a Change Data Capture (CDC) stream to an IMap.
static <K,V> Sink<ChangeRecord> | remoteMap(java.lang.String mapName, ClientConfig clientConfig, FunctionEx<? super ChangeRecord,? extends K> keyFn, FunctionEx<? super ChangeRecord,? extends V> valueFn) | Returns a sink equivalent to map(java.lang.String, com.hazelcast.function.FunctionEx<? super com.hazelcast.jet.cdc.ChangeRecord, ? extends K>, com.hazelcast.function.FunctionEx<? super com.hazelcast.jet.cdc.ChangeRecord, ? extends V>), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
Field Detail
SEQUENCE_CACHE_EXPIRATION_SECONDS
public static final HazelcastProperty SEQUENCE_CACHE_EXPIRATION_SECONDS
Number of seconds for which the sink will remember the last seen sequence number for an input key (used to detect reordering). After this time, the last-seen sequence number values will eventually be evicted to save space. The default value is 10 seconds.
- Since:
- Jet 4.2
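The effect of this expiration can be sketched with a small time-bounded cache. This is a hypothetical model, not the sink's actual data structure: the `ExpiringSequenceCache` class, the explicit `nowMillis` parameter and the eager scan on every call are simplifications chosen for illustration. It shows one consequence of eviction: once a key's entry has expired, a late out-of-order item for that key can no longer be detected.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical last-seen-sequence cache with time-based expiry (not Hazelcast code). */
public final class ExpiringSequenceCache<K> {
    private static final class Entry {
        final long sequence;
        final long lastUpdateMillis;

        Entry(long sequence, long lastUpdateMillis) {
            this.sequence = sequence;
            this.lastUpdateMillis = lastUpdateMillis;
        }
    }

    private final long ttlMillis;
    private final Map<K, Entry> entries = new HashMap<>();

    public ExpiringSequenceCache(long expirationSeconds) {
        this.ttlMillis = expirationSeconds * 1000;
    }

    /** Returns true if the item is accepted, false if an entry with a newer sequence is remembered. */
    public boolean accept(K key, long sequence, long nowMillis) {
        evictExpired(nowMillis);
        Entry previous = entries.get(key);
        if (previous != null && previous.sequence > sequence) {
            return false; // reordering detected while the entry is still cached
        }
        entries.put(key, new Entry(sequence, nowMillis));
        return true;
    }

    /** Forgets keys that have not been updated within the expiration period (O(n) scan). */
    private void evictExpired(long nowMillis) {
        Iterator<Entry> it = entries.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().lastUpdateMillis >= ttlMillis) {
                it.remove();
            }
        }
    }

    public int size() {
        return entries.size();
    }
}
```

With the default of 10 seconds, an item that arrives out of order more than 10 seconds after the last accepted item for its key would be applied in this model.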
Method Detail
map
@Nonnull public static <K,V> Sink<ChangeRecord> map(@Nonnull java.lang.String mapName, @Nonnull FunctionEx<? super ChangeRecord,? extends K> keyFn, @Nonnull FunctionEx<? super ChangeRecord,? extends V> valueFn)
Returns a sink that applies the changes described by a Change Data Capture (CDC) stream to an IMap. The main usage is to have the IMap mirror the contents of the data table that is the source of the CDC stream, but since it accepts arbitrary key and value functions, other behaviors are possible as well.
NOTE: in order for the sink behavior to be predictable, the map should be non-existent or empty at the time the sink starts using it.
For each item the sink receives, it uses the keyFn to determine which map key the change event applies to. Then, based on the ChangeRecord's Operation, it decides to either:
- delete the key from the map (Operation.DELETE)
- insert a new value for the key (Operation.SYNC and Operation.INSERT)
- update the current value for the key (Operation.UPDATE)
For insert and update operations, the new value is computed by applying the valueFn to the change record.
NOTE: if valueFn returns null, then the key will be deleted no matter the operation (i.e., even for update and insert records).
- Since:
- Jet 4.2
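The operation dispatch described above can be modelled against a plain `java.util.Map`. This is a hypothetical sketch, not the sink's implementation: `MapApplier`, its local `Operation` enum and the `Change` class (a simplified stand-in for ChangeRecord holding a row of column values) are assumptions. In this model, valueFn is not invoked for deletes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical model of how a CDC map sink applies one change record (not Hazelcast code). */
public final class MapApplier {
    public enum Operation { SYNC, INSERT, UPDATE, DELETE }

    /** Simplified stand-in for ChangeRecord: an operation plus a row of column values. */
    public static final class Change {
        public final Operation operation;
        public final Map<String, Object> row;

        public Change(Operation operation, Map<String, Object> row) {
            this.operation = operation;
            this.row = row;
        }
    }

    public static <K, V> void apply(Map<K, V> target, Change change,
                                    Function<? super Change, ? extends K> keyFn,
                                    Function<? super Change, ? extends V> valueFn) {
        K key = keyFn.apply(change);
        if (change.operation == Operation.DELETE) {
            target.remove(key);
            return;
        }
        // SYNC, INSERT and UPDATE all compute the new value with valueFn
        V value = valueFn.apply(change);
        if (value == null) {
            target.remove(key); // a null value deletes the key regardless of the operation
        } else {
            target.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> mirror = new HashMap<>();
        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        row.put("name", "Ann");
        apply(mirror, new Change(Operation.INSERT, row),
                c -> (Integer) c.row.get("id"), c -> (String) c.row.get("name"));
        System.out.println(mirror);
    }
}
```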
map
@Nonnull public static <K,V> Sink<ChangeRecord> map(@Nonnull IMap<? super K,? super V> map, @Nonnull FunctionEx<? super ChangeRecord,? extends K> keyFn, @Nonnull FunctionEx<? super ChangeRecord,? extends V> valueFn)
Returns a sink that applies the changes described by a Change Data Capture (CDC) stream to an IMap. The main usage is to have the IMap mirror the contents of the data table that is the source of the CDC stream, but since it accepts arbitrary key and value functions, other behaviors are possible as well.
NOTE: in order for the sink behavior to be predictable, the map should be non-existent or empty at the time the sink starts using it.
For each item the sink receives, it uses the keyFn to determine which map key the change event applies to. Then, based on the ChangeRecord's Operation, it decides to either:
- delete the key from the map (Operation.DELETE)
- insert a new value for the key (Operation.SYNC and Operation.INSERT)
- update the current value for the key (Operation.UPDATE)
For insert and update operations, the new value is computed by applying the valueFn to the change record.
NOTE: if valueFn returns null, then the key will be deleted no matter the operation (i.e., even for update and insert records).
- Since:
- Jet 4.2
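Because a null result from valueFn deletes the key, a valueFn can double as a filter. The sketch below is hypothetical: the `ActiveOnlyMirror` class, the `active`/`name`/`id` columns and the plain-map row model are illustrative assumptions, not part of the CdcSinks API. It keeps only active customers in the mirrored map by returning null for inactive rows.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration of a valueFn that returns null to drop inactive rows. */
public final class ActiveOnlyMirror {
    /** Returns the customer name, or null (meaning "delete the key") if the row is inactive. */
    static final Function<Map<String, Object>, String> VALUE_FN =
            row -> Boolean.TRUE.equals(row.get("active")) ? (String) row.get("name") : null;

    /** Applies an insert/update-style change, following the documented null-means-delete rule. */
    static void upsert(Map<Integer, String> mirror, Map<String, Object> row) {
        Integer key = (Integer) row.get("id");
        String value = VALUE_FN.apply(row);
        if (value == null) {
            mirror.remove(key);
        } else {
            mirror.put(key, value);
        }
    }

    /** Builds a row with the hypothetical columns used in this example. */
    static Map<String, Object> row(int id, String name, boolean active) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", id);
        row.put("name", name);
        row.put("active", active);
        return row;
    }

    public static void main(String[] args) {
        Map<Integer, String> mirror = new HashMap<>();
        upsert(mirror, row(1, "Ann", true));   // insert: kept
        upsert(mirror, row(2, "Bob", false));  // insert: valueFn returns null, nothing stored
        upsert(mirror, row(1, "Ann", false));  // update: Ann deactivated, key 1 removed
        System.out.println(mirror);            // prints {}
    }
}
```

This relies on the mirrored map starting empty, as the NOTE above requires; a pre-existing entry for an inactive row would otherwise survive until a change for that key arrives.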
remoteMap
@Nonnull public static <K,V> Sink<ChangeRecord> remoteMap(@Nonnull java.lang.String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super ChangeRecord,? extends K> keyFn, @Nonnull FunctionEx<? super ChangeRecord,? extends V> valueFn)
Returns a sink equivalent to map(java.lang.String, com.hazelcast.function.FunctionEx<? super com.hazelcast.jet.cdc.ChangeRecord, ? extends K>, com.hazelcast.function.FunctionEx<? super com.hazelcast.jet.cdc.ChangeRecord, ? extends V>), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
NOTE 1: in order for the sink behavior to be predictable, the map should be non-existent or empty at the time the sink starts using it.
NOTE 2: if valueFn returns null, then the key will be deleted no matter the operation (i.e., even for update and insert records).
Because of the API the sink uses, the remote cluster must be at least version 4.0.
- Since:
- Jet 4.2