From 514a045f757e71cc9f78034bed726e908a7498fd Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Thu, 12 Feb 2026 22:23:07 +0530 Subject: [PATCH 01/18] Add db specific fields --- ballerina/cdc_listener.bal | 4 +- ballerina/listener_types.bal | 71 ++++++++++++++++++++++++++++- ballerina/utils.bal | 87 ++++++++++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+), 4 deletions(-) diff --git a/ballerina/cdc_listener.bal b/ballerina/cdc_listener.bal index 1a02987a8..e5de0a4ca 100644 --- a/ballerina/cdc_listener.bal +++ b/ballerina/cdc_listener.bal @@ -31,8 +31,7 @@ public isolated class CdcListener { cdc:populateDebeziumProperties({ engineName: config.engineName, offsetStorage: config.offsetStorage, - internalSchemaStorage: config.internalSchemaStorage, - options: config.options + internalSchemaStorage: config.internalSchemaStorage }, debeziumConfigs ); cdc:populateDatabaseConfigurations({ @@ -50,6 +49,7 @@ public isolated class CdcListener { excludedColumns: config.database.excludedColumns }, debeziumConfigs); populatePostgresConfigurations(config.database, debeziumConfigs); + populatePostgresOptions(config.options, debeziumConfigs); map listenerConfigs = { ...debeziumConfigs }; diff --git a/ballerina/listener_types.bal b/ballerina/listener_types.bal index d3466e6cc..3aac17953 100644 --- a/ballerina/listener_types.bal +++ b/ballerina/listener_types.bal @@ -13,6 +13,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + import ballerinax/cdc; # Represents the PostgreSQL logical decoding plugins. @@ -24,6 +25,37 @@ public enum PostgreSQLLogicalDecodingPlugin { DECODERBUFS = "decoderbufs" } +# Represents PostgreSQL-specific advanced configuration options. +# +# + slotDropOnStop - Whether to drop the replication slot when the connector stops +# + publicationAutocreateMode - Mode for auto-creating publications (all_tables, disabled, filtered) +# + statusUpdateIntervalMs - Interval in milliseconds for sending status updates to the server +# + xminFetchIntervalMs - Interval in milliseconds for fetching the current xmin value +# + lsnFlushMode - Mode for flushing LSN to the server (always or lazy) +# + slotStreamParams - Parameters to pass to the replication slot stream +# + unavailableValuePlaceholder - Placeholder string for unavailable column values during TOAST handling +public type PostgresAdvancedConfiguration record {| + boolean slotDropOnStop = false; + cdc:PublicationAutocreateMode publicationAutocreateMode = ALL_TABLES; + int statusUpdateIntervalMs = 10000; + int xminFetchIntervalMs = 0; + cdc:LsnFlushMode lsnFlushMode?; + string slotStreamParams?; + string unavailableValuePlaceholder = "__debezium_unavailable_value"; +|}; + +# Represents relational database common configuration options. +# These properties are applicable to all relational databases (MySQL, PostgreSQL, SQL Server). +# +# + schemaIncludeList - List of schemas to include (comma-separated regular expressions) +# + schemaExcludeList - List of schemas to exclude (comma-separated regular expressions) +# + messageKeyColumns - Custom message key columns (format: schemaName.tableName:keyColumn1,keyColumn2) +public type RelationalCommonConfiguration record {| + string|string[] schemaIncludeList?; + string|string[] schemaExcludeList?; + string|string[] messageKeyColumns?; +|}; + # Represents the configuration for the Postgres CDC database connection. # # + connectorClass - The class name of the PostgreSQL connector implementation to use @@ -36,6 +68,8 @@ public enum PostgreSQLLogicalDecodingPlugin { # + pluginName - The name of the PostgreSQL logical decoding plug-in installed on the server # + slotName - The name of the PostgreSQL logical decoding slot # + publicationName - The name of the PostgreSQL publication created for streaming changes when using pgoutput. +# + postgresAdvancedConfig - PostgreSQL-specific advanced configuration options +# + relationalCommonConfig - Relational database common configuration options (applicable to MySQL, PostgreSQL, SQL Server) public type PostgresDatabaseConnection record {| *cdc:DatabaseConnection; string connectorClass = "io.debezium.connector.postgresql.PostgresConnector"; @@ -48,12 +82,45 @@ public type PostgresDatabaseConnection record {| PostgreSQLLogicalDecodingPlugin pluginName = PGOUTPUT; string slotName = "debezium"; string publicationName = "dbz_publication"; + PostgresAdvancedConfiguration postgresAdvancedConfig?; + RelationalCommonConfiguration relationalCommonConfig?; |}; -# Represents the configuration for the Postgres CDC listener. +# PostgreSQL CDC listener configuration including database connection, storage, and CDC options. # -# + database - The Postgres database connection configuration +# + database - PostgreSQL database connection, logical decoding, and capture settings +# + engineName - Unique name for the CDC engine instance +# + internalSchemaStorage - Schema history storage backend (file, Kafka, memory, JDBC, Redis, S3, Azure Blob, RocketMQ) +# + offsetStorage - Offset storage backend for tracking connector progress (file, Kafka, memory, JDBC, Redis) +# + options - PostgreSQL-specific CDC options including snapshot, heartbeat, signals, and data type handling public type PostgresListenerConfiguration record {| PostgresDatabaseConnection database; *cdc:ListenerConfiguration; + PostgreSqlOptions options = {}; |}; + +# PostgreSQL-specific CDC options for configuring snapshot behavior and data type handling. +# +# + extendedSnapshot - Extended snapshot configuration with PostgreSQL-specific lock timeout and query settings +# + dataTypeConfig - Data type handling configuration including schema change tracking +public type PostgreSqlOptions record {| + *cdc:Options; + ExtendedSnapshotConfiguration extendedSnapshot?; + DataTypeConfiguration dataTypeConfig?; +|}; + +# Represents the extended snapshot configuration for the PostgreSQL CDC listener. +# +# + lockTimeout - Lock acquisition timeout in seconds +public type ExtendedSnapshotConfiguration record {| + *cdc:RelationalExtendedSnapshotConfiguration; + decimal lockTimeout = 10; +|}; + +# Represents data type handling configuration. +# +# + includeSchemaChanges - Whether to include schema change events +public type DataTypeConfiguration record {| + *cdc:DataTypeConfiguration; + boolean includeSchemaChanges = true; +|}; \ No newline at end of file diff --git a/ballerina/utils.bal b/ballerina/utils.bal index f84ab9614..9c91461ce 100644 --- a/ballerina/utils.bal +++ b/ballerina/utils.bal @@ -13,6 +13,8 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +import ballerinax/cdc; + const string SCHEMA_INCLUDE_LIST = "schema.include.list"; const string SCHEMA_EXCLUDE_LIST = "schema.exclude.list"; @@ -21,6 +23,52 @@ const string POSTGRESQL_PLUGIN_NAME = "plugin.name"; const string POSTGRESQL_SLOT_NAME = "slot.name"; const string POSTGRESQL_PUBLICATION_NAME = "publication.name"; +// PostgreSQL-specific advanced configuration properties +const string SLOT_DROP_ON_STOP = "slot.drop.on.stop"; +const string PUBLICATION_AUTOCREATE_MODE = "publication.autocreate.mode"; +const string STATUS_UPDATE_INTERVAL_MS = "status.update.interval.ms"; +const string XMIN_FETCH_INTERVAL_MS = "xmin.fetch.interval.ms"; +const string LSN_FLUSH_MODE = "lsn.flush.mode"; +const string SLOT_STREAM_PARAMS = "slot.stream.params"; +const string UNAVAILABLE_VALUE_PLACEHOLDER = "unavailable.value.placeholder"; + +// Relational-common configuration properties (applicable to MySQL, PostgreSQL, SQL Server) +const string MESSAGE_KEY_COLUMNS = "message.key.columns"; + +isolated function populatePostgresAdvancedConfiguration(PostgresAdvancedConfiguration? config, map configMap) { + if config is () { + return; + } + + configMap[SLOT_DROP_ON_STOP] = config.slotDropOnStop.toString(); + configMap[PUBLICATION_AUTOCREATE_MODE] = config.publicationAutocreateMode.toString(); + configMap[STATUS_UPDATE_INTERVAL_MS] = config.statusUpdateIntervalMs.toString(); + configMap[XMIN_FETCH_INTERVAL_MS] = config.xminFetchIntervalMs.toString(); + + if config.lsnFlushMode !is () { + configMap[LSN_FLUSH_MODE] = (config.lsnFlushMode ?: ALWAYS).toString(); + } + + if config.slotStreamParams !is () { + configMap[SLOT_STREAM_PARAMS] = config.slotStreamParams ?: ""; + } + + configMap[UNAVAILABLE_VALUE_PLACEHOLDER] = config.unavailableValuePlaceholder; +} + +isolated function populateRelationalCommonConfiguration(RelationalCommonConfiguration? config, map configMap) { + if config is () { + return; + } + + // Note: schemaIncludeList and schemaExcludeList are already handled by populateSchemaConfigurations + // from the top-level includedSchemas/excludedSchemas fields for backward compatibility + + if config.messageKeyColumns !is () { + string|string[] keyColumns = config.messageKeyColumns ?: ""; + configMap[MESSAGE_KEY_COLUMNS] = keyColumns is string ? keyColumns : string:'join(";", ...keyColumns); + } +} // Populates PostgreSQL-specific configurations isolated function populatePostgresConfigurations(PostgresDatabaseConnection connection, map configMap) { @@ -29,6 +77,12 @@ isolated function populatePostgresConfigurations(PostgresDatabaseConnection conn configMap[POSTGRESQL_PLUGIN_NAME] = connection.pluginName; configMap[POSTGRESQL_SLOT_NAME] = connection.slotName; configMap[POSTGRESQL_PUBLICATION_NAME] = connection.publicationName; + + // Populate PostgreSQL-specific advanced configuration + populatePostgresAdvancedConfiguration(connection.postgresAdvancedConfig, configMap); + + // Populate relational-common configuration + populateRelationalCommonConfiguration(connection.relationalCommonConfig, configMap); } // Populates schema inclusion/exclusion configurations @@ -43,3 +97,36 @@ isolated function populateSchemaConfigurations(PostgresDatabaseConnection connec configMap[SCHEMA_EXCLUDE_LIST] = excludedSchemas is string ? excludedSchemas : string:'join(",", ...excludedSchemas); } } + +const string SNAPSHOT_LOCK_TIMEOUT_MS = "snapshot.lock.timeout.ms"; +const string INCLUDE_SCHEMA_CHANGES = "include.schema.changes"; + +// Populates PostgreSQL-specific options +isolated function populatePostgresOptions(PostgreSqlOptions options, map configMap) { + // Populate common options from cdc module + cdc:populateOptions(options, configMap); + + // Populate PostgreSQL-specific extended snapshot configuration + ExtendedSnapshotConfiguration? extendedSnapshot = options.extendedSnapshot; + if extendedSnapshot is ExtendedSnapshotConfiguration { + cdc:populateRelationalExtendedSnapshotConfiguration(extendedSnapshot, configMap); + populatePostgresExtendedSnapshotConfiguration(extendedSnapshot, configMap); + } + + // Populate PostgreSQL-specific data type configuration + DataTypeConfiguration? dataTypeConfig = options.dataTypeConfig; + if dataTypeConfig is DataTypeConfiguration { + cdc:populateDataTypeConfiguration(dataTypeConfig, configMap); + configMap[INCLUDE_SCHEMA_CHANGES] = dataTypeConfig.includeSchemaChanges.toString(); + } +} + +// Populates PostgreSQL-specific extended snapshot properties +isolated function populatePostgresExtendedSnapshotConfiguration(ExtendedSnapshotConfiguration config, map configMap) { + configMap[SNAPSHOT_LOCK_TIMEOUT_MS] = getMillisecondValueOf(config.lockTimeout); +} + +isolated function getMillisecondValueOf(decimal value) returns string { + string milliSecondVal = (value * 1000).toBalString(); + return milliSecondVal.substring(0, milliSecondVal.indexOf(".") ?: milliSecondVal.length()); +} From 1e33ff363f42b310f103888064af37eb2f978d47 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Fri, 13 Feb 2026 17:00:21 +0530 Subject: [PATCH 02/18] Add db specific fields --- ballerina/listener_types.bal | 95 ++++++++++++++++++++---------------- ballerina/utils.bal | 74 ++++++++++++++-------------- 2 files changed, 91 insertions(+), 78 deletions(-) diff --git a/ballerina/listener_types.bal b/ballerina/listener_types.bal index 3aac17953..4940d9be5 100644 --- a/ballerina/listener_types.bal +++ b/ballerina/listener_types.bal @@ -25,35 +25,58 @@ public enum PostgreSQLLogicalDecodingPlugin { DECODERBUFS = "decoderbufs" } -# Represents PostgreSQL-specific advanced configuration options. +# Represents publication autocreate modes. +public enum PublicationAutocreateMode { + ALL_TABLES = "all_tables", + DISABLED = "disabled", + FILTERED = "filtered" +} + +# Represents LSN flush modes. +public enum LsnFlushMode { + MANUAL = "manual", + CONNECTOR = "connector", + CONNECTOR_AND_DRIVER = "connector_and_driver" +} + +# PostgreSQL replication configuration (logical decoding). # -# + slotDropOnStop - Whether to drop the replication slot when the connector stops -# + publicationAutocreateMode - Mode for auto-creating publications (all_tables, disabled, filtered) -# + statusUpdateIntervalMs - Interval in milliseconds for sending status updates to the server -# + xminFetchIntervalMs - Interval in milliseconds for fetching the current xmin value -# + lsnFlushMode - Mode for flushing LSN to the server (always or lazy) -# + slotStreamParams - Parameters to pass to the replication slot stream -# + unavailableValuePlaceholder - Placeholder string for unavailable column values during TOAST handling -public type PostgresAdvancedConfiguration record {| +# + pluginName - Logical decoding plugin to use (pgoutput, decoderbufs) +# + slotName - Name of the PostgreSQL logical replication slot +# + slotDropOnStop - Drop replication slot when connector stops +# + slotStreamParams - Custom replication slot parameters +public type ReplicationConfiguration record {| + PostgreSQLLogicalDecodingPlugin pluginName = PGOUTPUT; + string slotName = "debezium"; boolean slotDropOnStop = false; - cdc:PublicationAutocreateMode publicationAutocreateMode = ALL_TABLES; + string slotStreamParams?; +|}; + +# PostgreSQL publication configuration (pgoutput plugin). +# +# + publicationName - Name of PostgreSQL publication +# + publicationAutocreateMode - Mode for auto-creating publications +public type PublicationConfiguration record {| + string publicationName = "dbz_publication"; + PublicationAutocreateMode publicationAutocreateMode = ALL_TABLES; +|}; + +# PostgreSQL streaming and status configuration. +# +# + statusUpdateIntervalMs - Interval for sending status updates to PostgreSQL +# + xminFetchIntervalMs - Interval for fetching current xmin position +# + lsnFlushMode - LSN flushing strategy +public type StreamingConfiguration record {| int statusUpdateIntervalMs = 10000; int xminFetchIntervalMs = 0; - cdc:LsnFlushMode lsnFlushMode?; - string slotStreamParams?; - string unavailableValuePlaceholder = "__debezium_unavailable_value"; + LsnFlushMode lsnFlushMode?; |}; -# Represents relational database common configuration options. -# These properties are applicable to all relational databases (MySQL, PostgreSQL, SQL Server). +# PostgreSQL data handling configuration. # -# + schemaIncludeList - List of schemas to include (comma-separated regular expressions) -# + schemaExcludeList - List of schemas to exclude (comma-separated regular expressions) -# + messageKeyColumns - Custom message key columns (format: schemaName.tableName:keyColumn1,keyColumn2) -public type RelationalCommonConfiguration record {| - string|string[] schemaIncludeList?; - string|string[] schemaExcludeList?; - string|string[] messageKeyColumns?; +# + unavailableValuePlaceholder - Placeholder for unavailable TOAST values +public type DataHandlingConfiguration record {| + string unavailableValuePlaceholder = "__debezium_unavailable_value"; |}; # Represents the configuration for the Postgres CDC database connection. @@ -65,11 +88,10 @@ public type RelationalCommonConfiguration record {| # + includedSchemas - A list of regular expressions matching fully-qualified schema identifiers to capture changes from # + excludedSchemas - A list of regular expressions matching fully-qualified schema identifiers to exclude from change capture # + tasksMax - The PostgreSQL connector always uses a single task and therefore does not use this value, so the default is always acceptable -# + pluginName - The name of the PostgreSQL logical decoding plug-in installed on the server -# + slotName - The name of the PostgreSQL logical decoding slot -# + publicationName - The name of the PostgreSQL publication created for streaming changes when using pgoutput. -# + postgresAdvancedConfig - PostgreSQL-specific advanced configuration options -# + relationalCommonConfig - Relational database common configuration options (applicable to MySQL, PostgreSQL, SQL Server) +# + replicationConfig - PostgreSQL replication configuration (logical decoding) +# + publicationConfig - PostgreSQL publication configuration (pgoutput plugin) +# + streamingConfig - PostgreSQL streaming and status configuration +# + dataHandlingConfig - PostgreSQL data handling configuration public type PostgresDatabaseConnection record {| *cdc:DatabaseConnection; string connectorClass = "io.debezium.connector.postgresql.PostgresConnector"; @@ -79,11 +101,10 @@ public type PostgresDatabaseConnection record {| string|string[] includedSchemas?; string|string[] excludedSchemas?; int tasksMax = 1; - PostgreSQLLogicalDecodingPlugin pluginName = PGOUTPUT; - string slotName = "debezium"; - string publicationName = "dbz_publication"; - PostgresAdvancedConfiguration postgresAdvancedConfig?; - RelationalCommonConfiguration relationalCommonConfig?; + ReplicationConfiguration replicationConfig = {}; + PublicationConfiguration publicationConfig = {}; + StreamingConfiguration streamingConfig = {}; + DataHandlingConfiguration dataHandlingConfig = {}; |}; # PostgreSQL CDC listener configuration including database connection, storage, and CDC options. @@ -106,7 +127,7 @@ public type PostgresListenerConfiguration record {| public type PostgreSqlOptions record {| *cdc:Options; ExtendedSnapshotConfiguration extendedSnapshot?; - DataTypeConfiguration dataTypeConfig?; + cdc:DataTypeConfiguration dataTypeConfig?; |}; # Represents the extended snapshot configuration for the PostgreSQL CDC listener. @@ -116,11 +137,3 @@ public type ExtendedSnapshotConfiguration record {| *cdc:RelationalExtendedSnapshotConfiguration; decimal lockTimeout = 10; |}; - -# Represents data type handling configuration. -# -# + includeSchemaChanges - Whether to include schema change events -public type DataTypeConfiguration record {| - *cdc:DataTypeConfiguration; - boolean includeSchemaChanges = true; -|}; \ No newline at end of file diff --git a/ballerina/utils.bal b/ballerina/utils.bal index 9c91461ce..0fabdeea2 100644 --- a/ballerina/utils.bal +++ b/ballerina/utils.bal @@ -35,54 +35,56 @@ const string UNAVAILABLE_VALUE_PLACEHOLDER = "unavailable.value.placeholder"; // Relational-common configuration properties (applicable to MySQL, PostgreSQL, SQL Server) const string MESSAGE_KEY_COLUMNS = "message.key.columns"; -isolated function populatePostgresAdvancedConfiguration(PostgresAdvancedConfiguration? config, map configMap) { - if config is () { - return; +// Populates PostgreSQL replication configuration +isolated function populateReplicationConfiguration(ReplicationConfiguration config, map configMap) { + configMap[POSTGRESQL_PLUGIN_NAME] = config.pluginName; + configMap[POSTGRESQL_SLOT_NAME] = config.slotName; + configMap[SLOT_DROP_ON_STOP] = config.slotDropOnStop.toString(); + + string? slotStreamParams = config.slotStreamParams; + if slotStreamParams !is () { + configMap[SLOT_STREAM_PARAMS] = slotStreamParams; } +} - configMap[SLOT_DROP_ON_STOP] = config.slotDropOnStop.toString(); +// Populates PostgreSQL publication configuration +isolated function populatePublicationConfiguration(PublicationConfiguration config, map configMap) { + configMap[POSTGRESQL_PUBLICATION_NAME] = config.publicationName; configMap[PUBLICATION_AUTOCREATE_MODE] = config.publicationAutocreateMode.toString(); +} + +// Populates PostgreSQL streaming configuration +isolated function populateStreamingConfiguration(StreamingConfiguration config, map configMap) { configMap[STATUS_UPDATE_INTERVAL_MS] = config.statusUpdateIntervalMs.toString(); configMap[XMIN_FETCH_INTERVAL_MS] = config.xminFetchIntervalMs.toString(); - if config.lsnFlushMode !is () { - configMap[LSN_FLUSH_MODE] = (config.lsnFlushMode ?: ALWAYS).toString(); - } - - if config.slotStreamParams !is () { - configMap[SLOT_STREAM_PARAMS] = config.slotStreamParams ?: ""; + cdc:LsnFlushMode? lsnFlushMode = config.lsnFlushMode; + if lsnFlushMode !is () { + configMap[LSN_FLUSH_MODE] = lsnFlushMode.toString(); } - - configMap[UNAVAILABLE_VALUE_PLACEHOLDER] = config.unavailableValuePlaceholder; } -isolated function populateRelationalCommonConfiguration(RelationalCommonConfiguration? config, map configMap) { - if config is () { - return; - } - - // Note: schemaIncludeList and schemaExcludeList are already handled by populateSchemaConfigurations - // from the top-level includedSchemas/excludedSchemas fields for backward compatibility - - if config.messageKeyColumns !is () { - string|string[] keyColumns = config.messageKeyColumns ?: ""; - configMap[MESSAGE_KEY_COLUMNS] = keyColumns is string ? keyColumns : string:'join(";", ...keyColumns); - } +// Populates PostgreSQL data handling configuration +isolated function populateDataHandlingConfiguration(DataHandlingConfiguration config, map configMap) { + configMap[UNAVAILABLE_VALUE_PLACEHOLDER] = config.unavailableValuePlaceholder; } // Populates PostgreSQL-specific configurations isolated function populatePostgresConfigurations(PostgresDatabaseConnection connection, map configMap) { configMap[POSTGRESQL_DATABASE_NAME] = connection.databaseName; populateSchemaConfigurations(connection, configMap); - configMap[POSTGRESQL_PLUGIN_NAME] = connection.pluginName; - configMap[POSTGRESQL_SLOT_NAME] = connection.slotName; - configMap[POSTGRESQL_PUBLICATION_NAME] = connection.publicationName; - // Populate PostgreSQL-specific advanced configuration - populatePostgresAdvancedConfiguration(connection.postgresAdvancedConfig, configMap); + // Populate PostgreSQL replication configuration + populateReplicationConfiguration(connection.replicationConfig, configMap); + + // Populate PostgreSQL publication configuration + populatePublicationConfiguration(connection.publicationConfig, configMap); + + // Populate PostgreSQL streaming configuration + populateStreamingConfiguration(connection.streamingConfig, configMap); - // Populate relational-common configuration - populateRelationalCommonConfiguration(connection.relationalCommonConfig, configMap); + // Populate PostgreSQL data handling configuration + populateDataHandlingConfiguration(connection.dataHandlingConfig, configMap); } // Populates schema inclusion/exclusion configurations @@ -99,12 +101,11 @@ isolated function populateSchemaConfigurations(PostgresDatabaseConnection connec } const string SNAPSHOT_LOCK_TIMEOUT_MS = "snapshot.lock.timeout.ms"; -const string INCLUDE_SCHEMA_CHANGES = "include.schema.changes"; // Populates PostgreSQL-specific options isolated function populatePostgresOptions(PostgreSqlOptions options, map configMap) { // Populate common options from cdc module - cdc:populateOptions(options, configMap); + cdc:populateOptions(options, configMap, typeof options); // Populate PostgreSQL-specific extended snapshot configuration ExtendedSnapshotConfiguration? extendedSnapshot = options.extendedSnapshot; @@ -113,11 +114,10 @@ isolated function populatePostgresOptions(PostgreSqlOptions options, map populatePostgresExtendedSnapshotConfiguration(extendedSnapshot, configMap); } - // Populate PostgreSQL-specific data type configuration - DataTypeConfiguration? dataTypeConfig = options.dataTypeConfig; - if dataTypeConfig is DataTypeConfiguration { + // PostgreSQL uses generic cdc:DataTypeConfiguration (no PostgreSQL-specific extensions) + cdc:DataTypeConfiguration? dataTypeConfig = options.dataTypeConfig; + if dataTypeConfig is cdc:DataTypeConfiguration { cdc:populateDataTypeConfiguration(dataTypeConfig, configMap); - configMap[INCLUDE_SCHEMA_CHANGES] = dataTypeConfig.includeSchemaChanges.toString(); } } From 0030095c66f56bf5675dc73b630668c84dc12268 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Tue, 17 Feb 2026 08:10:42 +0530 Subject: [PATCH 03/18] temp commit --- ballerina/Ballerina.toml | 6 + ballerina/tests/listener_tests.bal | 185 +++++++++++++++++++++++++++++ 2 files changed, 191 insertions(+) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 37e582e39..572a72a57 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -24,3 +24,9 @@ artifactId = "sql-native" version = "1.16.0" path = "./lib/sql-native-1.16.0.jar" +# TODO: remove +[[dependency]] +org="ballerinax" +name="cdc" +version="1.1.1" +repository="local" diff --git a/ballerina/tests/listener_tests.bal b/ballerina/tests/listener_tests.bal index 2db0a31f5..5fa3365c2 100644 --- a/ballerina/tests/listener_tests.bal +++ b/ballerina/tests/listener_tests.bal @@ -257,3 +257,188 @@ function testCdcListenerEvents() returns error? { check testListener.gracefulStop(); } + +// ========== DATABASE-SPECIFIC CONFIGURATION TESTS ========== + +@test:Config {groups: ["postgres-replication"]} +function testPostgresReplicationConfiguration() { + map expectedProperties = { + "plugin.name": "decoderbufs", + "slot.name": "custom_slot", + "slot.drop.on.stop": "true", + "slot.stream.params": "include-unchanged-toast=true" + }; + + PostgresDatabaseConnection connection = { + username: "testuser", + password: "testpass", + databaseName: "testdb", + replicationConfig: { + pluginName: DECODERBUFS, + slotName: "custom_slot", + slotDropOnStop: true, + slotStreamParams: "include-unchanged-toast=true" + } + }; + + map actualProperties = {}; + populatePostgresConfigurations(connection, actualProperties); + + test:assertEquals(actualProperties["plugin.name"], + expectedProperties["plugin.name"], + msg = "Plugin name does not match."); + test:assertEquals(actualProperties["slot.drop.on.stop"], + expectedProperties["slot.drop.on.stop"], + msg = "Slot drop on stop does not match."); +} + +@test:Config {groups: ["postgres-publication"]} +function testPostgresPublicationConfiguration() { + map expectedProperties = { + "publication.name": "my_publication", + "publication.autocreate.mode": "filtered" + }; + + PostgresDatabaseConnection connection = { + username: "testuser", + password: "testpass", + databaseName: "testdb", + publicationConfig: { + publicationName: "my_publication", + publicationAutocreateMode: FILTERED + } + }; + + map actualProperties = {}; + populatePostgresConfigurations(connection, actualProperties); + + test:assertEquals(actualProperties["publication.name"], + expectedProperties["publication.name"], + msg = "Publication name does not match."); + test:assertEquals(actualProperties["publication.autocreate.mode"], + expectedProperties["publication.autocreate.mode"], + msg = "Publication autocreate mode does not match."); +} + +@test:Config {groups: ["postgres-streaming"]} +function testPostgresStreamingConfiguration() { + map expectedProperties = { + "status.update.interval.ms": "5000", + "xmin.fetch.interval.ms": "1000", + "lsn.flush.mode": "connector" + }; + + PostgresDatabaseConnection connection = { + username: "testuser", + password: "testpass", + databaseName: "testdb", + streamingConfig: { + statusUpdateIntervalMs: 5000, + xminFetchIntervalMs: 1000, + lsnFlushMode: CONNECTOR + } + }; + + map actualProperties = {}; + populatePostgresConfigurations(connection, actualProperties); + + test:assertEquals(actualProperties["status.update.interval.ms"], + expectedProperties["status.update.interval.ms"], + msg = "Status update interval does not match."); + test:assertEquals(actualProperties["lsn.flush.mode"], + expectedProperties["lsn.flush.mode"], + msg = "LSN flush mode does not match."); +} + +@test:Config {groups: ["postgres-datahandling"]} +function testPostgresDataHandlingConfiguration() { + map expectedProperties = { + "unavailable.value.placeholder": "__custom_unavailable__" + }; + + PostgresDatabaseConnection connection = { + username: "testuser", + password: "testpass", + databaseName: "testdb", + dataHandlingConfig: { + unavailableValuePlaceholder: "__custom_unavailable__" + } + }; + + map actualProperties = {}; + populatePostgresConfigurations(connection, actualProperties); + + test:assertEquals(actualProperties["unavailable.value.placeholder"], + expectedProperties["unavailable.value.placeholder"], + msg = "Unavailable value placeholder does not match."); +} + +@test:Config {groups: ["postgres-relational"]} +function testPostgresRelationalCommonConfiguration() { + map expectedProperties = { + "schema.include.list": "public,custom", + "message.key.columns": "db.table1:id;db.table2:key" + }; + + PostgresDatabaseConnection connection = { + username: "testuser", + password: "testpass", + databaseName: "testdb", + relationalCommonConfig: { + schemaIncludeList: ["public", "custom"], + messageKeyColumns: "db.table1:id;db.table2:key" + } + }; + + map actualProperties = {}; + populatePostgresConfigurations(connection, actualProperties); + + test:assertEquals(actualProperties["schema.include.list"], + expectedProperties["schema.include.list"], + msg = "Schema include list does not match."); +} + +@test:Config {groups: ["postgres-snapshot"]} +function testPostgresExtendedSnapshotConfiguration() { + map expectedProperties = { + "snapshot.lock.timeout.ms": "20000" + }; + + PostgreSqlOptions options = { + extendedSnapshot: { + lockTimeout: 20 + } + }; + + map actualProperties = {}; + populatePostgresOptions(options, actualProperties); + + test:assertEquals(actualProperties["snapshot.lock.timeout.ms"], + expectedProperties["snapshot.lock.timeout.ms"], + msg = "Snapshot lock timeout does not match."); +} + +@test:Config {groups: ["postgres-options"]} +function testPostgresOptionsWithHeartbeat() { + map expectedProperties = { + "heartbeat.interval.ms": "15000", + "heartbeat.action.query": "SELECT NOW()" + }; + + PostgreSqlOptions options = { + heartbeat: { + interval: 15, + actionQuery: "SELECT NOW()" + } + }; + + map actualProperties = {}; + populatePostgresOptions(options, actualProperties); + + test:assertEquals(actualProperties["heartbeat.interval.ms"], + expectedProperties["heartbeat.interval.ms"], + msg = "Heartbeat interval does not match."); + test:assertEquals(actualProperties["heartbeat.action.query"], + expectedProperties["heartbeat.action.query"], + msg = "Heartbeat action query does not match."); +} From b3c5aade6ae92b7c4e324f09d9bc70c88e2a8de4 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Thu, 19 Feb 2026 14:01:03 +0530 Subject: [PATCH 04/18] [Automated] Update the native jar versions --- ballerina/Ballerina.toml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 572a72a57..37e582e39 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -24,9 +24,3 @@ artifactId = "sql-native" version = "1.16.0" path = "./lib/sql-native-1.16.0.jar" -# TODO: remove -[[dependency]] -org="ballerinax" -name="cdc" -version="1.1.1" -repository="local" From cee3f07414906775a6d28e7a5627907c469fe5ea Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Wed, 25 Feb 2026 11:02:14 +0530 Subject: [PATCH 05/18] [Automated] Update the native jar versions --- ballerina/Dependencies.toml | 70 +++---------------------------------- 1 file changed, 4 insertions(+), 66 deletions(-) diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 9c872b8c0..4a54d02ea 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -7,14 +7,6 @@ dependencies-toml-version = "2" distribution-version = "2201.12.0" -[[package]] -org = "ballerina" -name = "avro" -version = "1.2.1" -dependencies = [ - {org = "ballerina", name = "jballerina.java"} -] - [[package]] org = "ballerina" name = "crypto" @@ -75,6 +67,7 @@ modules = [ org = "ballerina" name = "lang.__internal" version = "0.0.0" +scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "lang.object"} @@ -99,16 +92,6 @@ dependencies = [ {org = "ballerina", name = "jballerina.java"} ] -[[package]] -org = "ballerina" -name = "lang.int" -version = "0.0.0" -dependencies = [ - {org = "ballerina", name = "jballerina.java"}, - {org = "ballerina", name = "lang.__internal"}, - {org = "ballerina", name = "lang.object"} -] - [[package]] org = "ballerina" name = "lang.object" @@ -224,17 +207,6 @@ modules = [ {org = "ballerina", packageName = "time", moduleName = "time"} ] -[[package]] -org = "ballerina" -name = "uuid" -version = "1.10.0" -dependencies = [ - {org = "ballerina", name = "crypto"}, - {org = "ballerina", name = "jballerina.java"}, - {org = "ballerina", name = "lang.int"}, - {org = "ballerina", name = "time"} -] - [[package]] org = "ballerinai" name = "observe" @@ -247,53 +219,19 @@ dependencies = [ [[package]] org = "ballerinax" name = "cdc" -version = "1.2.1" +version = "1.2.2" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "data.jsondata"}, {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "log"}, - {org = "ballerinai", name = "observe"}, - {org = "ballerinax", name = "kafka"} + {org = "ballerinai", name = "observe"} ] modules = [ {org = "ballerinax", packageName = "cdc", moduleName = "cdc"} ] -[[package]] -org = "ballerinax" -name = "confluent.cavroserdes" -version = "1.0.2" -dependencies = [ - {org = "ballerina", name = "avro"}, - {org = "ballerina", name = "jballerina.java"}, - {org = "ballerinai", name = "observe"}, - {org = "ballerinax", name = "confluent.cregistry"} -] - -[[package]] -org = "ballerinax" -name = "confluent.cregistry" -version = "0.4.4" -dependencies = [ - {org = "ballerina", name = "jballerina.java"} -] - -[[package]] -org = "ballerinax" -name = "kafka" -version = "4.6.4" -dependencies = [ - {org = "ballerina", name = "crypto"}, - {org = "ballerina", name = "jballerina.java"}, - {org = "ballerina", name = "time"}, - {org = "ballerina", name = "uuid"}, - {org = "ballerinai", name = "observe"}, - {org = "ballerinax", name = "confluent.cavroserdes"}, - {org = "ballerinax", name = "confluent.cregistry"} -] - [[package]] org = "ballerinax" name = "postgresql" @@ -331,7 +269,7 @@ modules = [ [[package]] org = "ballerinax" name = "postgresql.driver" -version = "1.6.1" +version = "1.6.2" scope = "testOnly" modules = [ {org = "ballerinax", packageName = "postgresql.driver", moduleName = "postgresql.driver"} From 3cb0e503f9bc90c59afdf0403467c38e4e179259 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Wed, 25 Feb 2026 13:24:39 +0530 Subject: [PATCH 06/18] Update to use refactored cdc --- ballerina/cdc_listener.bal | 37 +++------- ballerina/listener_types.bal | 15 ++-- ballerina/tests/listener_tests.bal | 52 +++++--------- ballerina/utils.bal | 112 +++++++++++++++-------------- build.gradle | 4 -- gradle.properties | 6 +- 6 files changed, 90 insertions(+), 136 deletions(-) diff --git a/ballerina/cdc_listener.bal b/ballerina/cdc_listener.bal index e5de0a4ca..d76745763 100644 --- a/ballerina/cdc_listener.bal +++ b/ballerina/cdc_listener.bal @@ -19,7 +19,8 @@ import ballerinax/cdc; public isolated class CdcListener { *cdc:Listener; - private final map & readonly config; + private final map & readonly debeziumConfigs; + private final map & readonly listenerConfigs; private boolean isStarted = false; private boolean hasAttachedService = false; @@ -28,33 +29,11 @@ public isolated class CdcListener { # + config - The configuration for the Postgresql connector public isolated function init(*PostgresListenerConfiguration config) { map debeziumConfigs = {}; - cdc:populateDebeziumProperties({ - engineName: config.engineName, - offsetStorage: config.offsetStorage, - internalSchemaStorage: config.internalSchemaStorage - }, debeziumConfigs - ); - cdc:populateDatabaseConfigurations({ - connectorClass: config.database.connectorClass, - hostname: config.database.hostname, - port: config.database.port, - username: config.database.username, - password: config.database.password, - connectTimeout: config.database.connectTimeout, - tasksMax: config.database.tasksMax, - secure: config.database.secure, - includedTables: config.database.includedTables, - excludedTables: config.database.excludedTables, - includedColumns: config.database.includedColumns, - excludedColumns: config.database.excludedColumns - }, debeziumConfigs); - populatePostgresConfigurations(config.database, debeziumConfigs); - populatePostgresOptions(config.options, debeziumConfigs); - map listenerConfigs = { - ...debeziumConfigs - }; - listenerConfigs["livenessInterval"] = config.livenessInterval; - self.config = listenerConfigs.cloneReadOnly(); + map listenerConfigs = {}; + populateDebeziumProperties(config, debeziumConfigs); + cdc:populateListenerProperties(config, listenerConfigs); + self.debeziumConfigs = debeziumConfigs.cloneReadOnly(); + self.listenerConfigs = listenerConfigs.cloneReadOnly(); } # Attaches a CDC service to the Postgresql listener. @@ -70,7 +49,7 @@ public isolated class CdcListener { # # + return - An error if the listener cannot be started, or `()` if successful public isolated function 'start() returns cdc:Error? { - check cdc:externStart(self, self.config); + check cdc:externStart(self, self.debeziumConfigs, self.listenerConfigs); } # Detaches a CDC service from the Postgresql listener. diff --git a/ballerina/listener_types.bal b/ballerina/listener_types.bal index 4940d9be5..eda3b7c08 100644 --- a/ballerina/listener_types.bal +++ b/ballerina/listener_types.bal @@ -88,10 +88,6 @@ public type DataHandlingConfiguration record {| # + includedSchemas - A list of regular expressions matching fully-qualified schema identifiers to capture changes from # + excludedSchemas - A list of regular expressions matching fully-qualified schema identifiers to exclude from change capture # + tasksMax - The PostgreSQL connector always uses a single task and therefore does not use this value, so the default is always acceptable -# + replicationConfig - PostgreSQL replication configuration (logical decoding) -# + publicationConfig - PostgreSQL publication configuration (pgoutput plugin) -# + streamingConfig - PostgreSQL streaming and status configuration -# + dataHandlingConfig - PostgreSQL data handling configuration public type PostgresDatabaseConnection record {| *cdc:DatabaseConnection; string connectorClass = "io.debezium.connector.postgresql.PostgresConnector"; @@ -101,18 +97,15 @@ public type PostgresDatabaseConnection record {| string|string[] includedSchemas?; string|string[] excludedSchemas?; int tasksMax = 1; - ReplicationConfiguration replicationConfig = {}; - PublicationConfiguration publicationConfig = {}; - StreamingConfiguration streamingConfig = {}; - DataHandlingConfiguration dataHandlingConfig = {}; + *ReplicationConfiguration; + *PublicationConfiguration; + *StreamingConfiguration; + *DataHandlingConfiguration; |}; # PostgreSQL CDC listener configuration including database connection, storage, and CDC options. # # + database - PostgreSQL database connection, logical decoding, and capture settings -# + engineName - Unique name for the CDC engine instance -# + internalSchemaStorage - Schema history storage backend (file, Kafka, memory, JDBC, Redis, S3, Azure Blob, RocketMQ) -# + offsetStorage - Offset storage backend for tracking connector progress (file, Kafka, memory, JDBC, Redis) # + options - PostgreSQL-specific CDC options including snapshot, heartbeat, signals, and data type handling public type PostgresListenerConfiguration record {| PostgresDatabaseConnection database; diff --git a/ballerina/tests/listener_tests.bal b/ballerina/tests/listener_tests.bal index 5fa3365c2..68e534941 100644 --- a/ballerina/tests/listener_tests.bal +++ b/ballerina/tests/listener_tests.bal @@ -258,8 +258,6 @@ function testCdcListenerEvents() returns error? { check testListener.gracefulStop(); } -// ========== DATABASE-SPECIFIC CONFIGURATION TESTS ========== - @test:Config {groups: ["postgres-replication"]} function testPostgresReplicationConfiguration() { map expectedProperties = { @@ -273,16 +271,14 @@ function testPostgresReplicationConfiguration() { username: "testuser", password: "testpass", databaseName: "testdb", - replicationConfig: { - pluginName: DECODERBUFS, - slotName: "custom_slot", - slotDropOnStop: true, - slotStreamParams: "include-unchanged-toast=true" - } + pluginName: DECODERBUFS, + slotName: "custom_slot", + slotDropOnStop: true, + slotStreamParams: "include-unchanged-toast=true" }; map actualProperties = {}; - populatePostgresConfigurations(connection, actualProperties); + populateDatabaseConfigurations(connection, actualProperties); test:assertEquals(actualProperties["plugin.name"], expectedProperties["plugin.name"], @@ -303,14 +299,12 @@ function testPostgresPublicationConfiguration() { username: "testuser", password: "testpass", databaseName: "testdb", - publicationConfig: { - publicationName: "my_publication", - publicationAutocreateMode: FILTERED - } + publicationName: "my_publication", + publicationAutocreateMode: FILTERED }; map actualProperties = {}; - populatePostgresConfigurations(connection, actualProperties); + populateDatabaseConfigurations(connection, actualProperties); test:assertEquals(actualProperties["publication.name"], expectedProperties["publication.name"], @@ -332,15 +326,13 @@ function testPostgresStreamingConfiguration() { username: "testuser", password: "testpass", databaseName: "testdb", - streamingConfig: { - statusUpdateIntervalMs: 5000, - xminFetchIntervalMs: 1000, - lsnFlushMode: CONNECTOR - } + statusUpdateIntervalMs: 5000, + xminFetchIntervalMs: 1000, + lsnFlushMode: CONNECTOR }; map actualProperties = {}; - populatePostgresConfigurations(connection, actualProperties); + populateDatabaseConfigurations(connection, actualProperties); test:assertEquals(actualProperties["status.update.interval.ms"], expectedProperties["status.update.interval.ms"], @@ -360,13 +352,11 @@ function testPostgresDataHandlingConfiguration() { username: "testuser", password: "testpass", databaseName: "testdb", - dataHandlingConfig: { - unavailableValuePlaceholder: "__custom_unavailable__" - } + unavailableValuePlaceholder: "__custom_unavailable__" }; map actualProperties = {}; - populatePostgresConfigurations(connection, actualProperties); + populateDatabaseConfigurations(connection, actualProperties); test:assertEquals(actualProperties["unavailable.value.placeholder"], expectedProperties["unavailable.value.placeholder"], @@ -376,22 +366,18 @@ function testPostgresDataHandlingConfiguration() { @test:Config {groups: ["postgres-relational"]} function testPostgresRelationalCommonConfiguration() { map expectedProperties = { - "schema.include.list": "public,custom", - "message.key.columns": "db.table1:id;db.table2:key" + "schema.include.list": "public,custom" }; PostgresDatabaseConnection connection = { username: "testuser", password: "testpass", databaseName: "testdb", - relationalCommonConfig: { - schemaIncludeList: ["public", "custom"], - messageKeyColumns: "db.table1:id;db.table2:key" - } + includedSchemas: ["public", "custom"] }; map actualProperties = {}; - populatePostgresConfigurations(connection, actualProperties); + populateDatabaseConfigurations(connection, actualProperties); test:assertEquals(actualProperties["schema.include.list"], expectedProperties["schema.include.list"], @@ -411,7 +397,7 @@ function testPostgresExtendedSnapshotConfiguration() { }; map actualProperties = {}; - populatePostgresOptions(options, actualProperties); + populateOptions(options, actualProperties); test:assertEquals(actualProperties["snapshot.lock.timeout.ms"], expectedProperties["snapshot.lock.timeout.ms"], @@ -433,7 +419,7 @@ function testPostgresOptionsWithHeartbeat() { }; map actualProperties = {}; - populatePostgresOptions(options, actualProperties); + populateOptions(options, actualProperties); test:assertEquals(actualProperties["heartbeat.interval.ms"], expectedProperties["heartbeat.interval.ms"], diff --git a/ballerina/utils.bal b/ballerina/utils.bal index 0fabdeea2..04d668911 100644 --- a/ballerina/utils.bal +++ b/ballerina/utils.bal @@ -35,95 +35,99 @@ const string UNAVAILABLE_VALUE_PLACEHOLDER = "unavailable.value.placeholder"; // Relational-common configuration properties (applicable to MySQL, PostgreSQL, SQL Server) const string MESSAGE_KEY_COLUMNS = "message.key.columns"; -// Populates PostgreSQL replication configuration -isolated function populateReplicationConfiguration(ReplicationConfiguration config, map configMap) { - configMap[POSTGRESQL_PLUGIN_NAME] = config.pluginName; - configMap[POSTGRESQL_SLOT_NAME] = config.slotName; - configMap[SLOT_DROP_ON_STOP] = config.slotDropOnStop.toString(); +isolated function populateDebeziumProperties(PostgresListenerConfiguration config, map debeziumConfigs) { + cdc:populateDebeziumProperties({ + engineName: config.engineName, + offsetStorage: config.offsetStorage, + internalSchemaStorage: config.internalSchemaStorage + }, debeziumConfigs); + populateDatabaseConfigurations(config.database, debeziumConfigs); + populateOptions(config.options, debeziumConfigs); +} + - string? slotStreamParams = config.slotStreamParams; +// Populates PostgreSQL-specific configurations +isolated function populateDatabaseConfigurations(PostgresDatabaseConnection database, map debeziumConfigs) { + cdc:populateDatabaseConfigurations({ + connectorClass: database.connectorClass, + hostname: database.hostname, + port: database.port, + username: database.username, + password: database.password, + connectTimeout: database.connectTimeout, + tasksMax: database.tasksMax, + secure: database.secure, + includedTables: database.includedTables, + excludedTables: database.excludedTables, + includedColumns: database.includedColumns, + excludedColumns: database.excludedColumns + }, debeziumConfigs); + + debeziumConfigs[POSTGRESQL_DATABASE_NAME] = database.databaseName; + populateSchemaConfigurations(database, debeziumConfigs); + + // Replication configuration (fields inlined from ReplicationConfiguration) + debeziumConfigs[POSTGRESQL_PLUGIN_NAME] = database.pluginName; + debeziumConfigs[POSTGRESQL_SLOT_NAME] = database.slotName; + debeziumConfigs[SLOT_DROP_ON_STOP] = database.slotDropOnStop.toString(); + string? slotStreamParams = database.slotStreamParams; if slotStreamParams !is () { - configMap[SLOT_STREAM_PARAMS] = slotStreamParams; + debeziumConfigs[SLOT_STREAM_PARAMS] = slotStreamParams; } -} -// Populates PostgreSQL publication configuration -isolated function populatePublicationConfiguration(PublicationConfiguration config, map configMap) { - configMap[POSTGRESQL_PUBLICATION_NAME] = config.publicationName; - configMap[PUBLICATION_AUTOCREATE_MODE] = config.publicationAutocreateMode.toString(); -} + // Publication configuration (fields inlined from PublicationConfiguration) + debeziumConfigs[POSTGRESQL_PUBLICATION_NAME] = database.publicationName; + debeziumConfigs[PUBLICATION_AUTOCREATE_MODE] = database.publicationAutocreateMode.toString(); -// Populates PostgreSQL streaming configuration -isolated function populateStreamingConfiguration(StreamingConfiguration config, map configMap) { - configMap[STATUS_UPDATE_INTERVAL_MS] = config.statusUpdateIntervalMs.toString(); - configMap[XMIN_FETCH_INTERVAL_MS] = config.xminFetchIntervalMs.toString(); - - cdc:LsnFlushMode? lsnFlushMode = config.lsnFlushMode; + // Streaming configuration (fields inlined from StreamingConfiguration) + debeziumConfigs[STATUS_UPDATE_INTERVAL_MS] = database.statusUpdateIntervalMs.toString(); + debeziumConfigs[XMIN_FETCH_INTERVAL_MS] = database.xminFetchIntervalMs.toString(); + LsnFlushMode? lsnFlushMode = database.lsnFlushMode; if lsnFlushMode !is () { - configMap[LSN_FLUSH_MODE] = lsnFlushMode.toString(); + debeziumConfigs[LSN_FLUSH_MODE] = lsnFlushMode.toString(); } -} - -// Populates PostgreSQL data handling configuration -isolated function populateDataHandlingConfiguration(DataHandlingConfiguration config, map configMap) { - configMap[UNAVAILABLE_VALUE_PLACEHOLDER] = config.unavailableValuePlaceholder; -} -// Populates PostgreSQL-specific configurations -isolated function populatePostgresConfigurations(PostgresDatabaseConnection connection, map configMap) { - configMap[POSTGRESQL_DATABASE_NAME] = connection.databaseName; - populateSchemaConfigurations(connection, configMap); - - // Populate PostgreSQL replication configuration - populateReplicationConfiguration(connection.replicationConfig, configMap); - - // Populate PostgreSQL publication configuration - populatePublicationConfiguration(connection.publicationConfig, configMap); - - // Populate PostgreSQL streaming configuration - populateStreamingConfiguration(connection.streamingConfig, configMap); - - // Populate PostgreSQL data handling configuration - populateDataHandlingConfiguration(connection.dataHandlingConfig, configMap); + // Data handling configuration (fields inlined from DataHandlingConfiguration) + debeziumConfigs[UNAVAILABLE_VALUE_PLACEHOLDER] = database.unavailableValuePlaceholder; } // Populates schema inclusion/exclusion configurations -isolated function populateSchemaConfigurations(PostgresDatabaseConnection connection, map configMap) { +isolated function populateSchemaConfigurations(PostgresDatabaseConnection connection, map debeziumConfigs) { string|string[]? includedSchemas = connection.includedSchemas; if includedSchemas !is () { - configMap[SCHEMA_INCLUDE_LIST] = includedSchemas is string ? includedSchemas : string:'join(",", ...includedSchemas); + debeziumConfigs[SCHEMA_INCLUDE_LIST] = includedSchemas is string ? includedSchemas : string:'join(",", ...includedSchemas); } string|string[]? excludedSchemas = connection.excludedSchemas; if excludedSchemas !is () { - configMap[SCHEMA_EXCLUDE_LIST] = excludedSchemas is string ? excludedSchemas : string:'join(",", ...excludedSchemas); + debeziumConfigs[SCHEMA_EXCLUDE_LIST] = excludedSchemas is string ? excludedSchemas : string:'join(",", ...excludedSchemas); } } const string SNAPSHOT_LOCK_TIMEOUT_MS = "snapshot.lock.timeout.ms"; // Populates PostgreSQL-specific options -isolated function populatePostgresOptions(PostgreSqlOptions options, map configMap) { - // Populate common options from cdc module - cdc:populateOptions(options, configMap, typeof options); - +isolated function populateOptions(PostgreSqlOptions options, map debeziumConfigs) { // Populate PostgreSQL-specific extended snapshot configuration ExtendedSnapshotConfiguration? extendedSnapshot = options.extendedSnapshot; if extendedSnapshot is ExtendedSnapshotConfiguration { - cdc:populateRelationalExtendedSnapshotConfiguration(extendedSnapshot, configMap); - populatePostgresExtendedSnapshotConfiguration(extendedSnapshot, configMap); + populateExtendedSnapshotConfiguration(extendedSnapshot, debeziumConfigs); } // PostgreSQL uses generic cdc:DataTypeConfiguration (no PostgreSQL-specific extensions) cdc:DataTypeConfiguration? dataTypeConfig = options.dataTypeConfig; if dataTypeConfig is cdc:DataTypeConfiguration { - cdc:populateDataTypeConfiguration(dataTypeConfig, configMap); + cdc:populateDataTypeConfiguration(dataTypeConfig, debeziumConfigs); } + + // Populate common options from cdc module + cdc:populateOptions(options, debeziumConfigs, typeof options); } // Populates PostgreSQL-specific extended snapshot properties -isolated function populatePostgresExtendedSnapshotConfiguration(ExtendedSnapshotConfiguration config, map configMap) { - configMap[SNAPSHOT_LOCK_TIMEOUT_MS] = getMillisecondValueOf(config.lockTimeout); +isolated function populateExtendedSnapshotConfiguration(ExtendedSnapshotConfiguration config, map debeziumConfigs) { + cdc:populateRelationalExtendedSnapshotConfiguration(config, debeziumConfigs); + debeziumConfigs[SNAPSHOT_LOCK_TIMEOUT_MS] = getMillisecondValueOf(config.lockTimeout); } isolated function getMillisecondValueOf(decimal value) returns string { diff --git a/build.gradle b/build.gradle index 7a74cf5a4..8fca778b6 100644 --- a/build.gradle +++ b/build.gradle @@ -99,13 +99,9 @@ subprojects { ballerinaStdLibs "io.ballerina.stdlib:observe-ballerina:${observeVersion}" ballerinaStdLibs "io.ballerina:observe-ballerina:${observeInternalVersion}" ballerinaStdLibs "io.ballerina.stdlib:postgresql.driver-ballerina:${stdlibPostgresqlDriverVersion}" - ballerinaStdLibs "io.ballerina.lib:avro-ballerina:${stdlibAvroVersion}" ballerinaStdLibs "io.ballerina.lib:cdc-ballerina:${stdlibCdcVersion}" ballerinaStdLibs "io.ballerina.lib:postgresql.cdc.driver-ballerina:${stdlibPostgresCdcDriverVersion}" - ballerinaStdLibs "io.ballerina.stdlib:kafka-ballerina:${stdlibKafkaVersion}" - ballerinaStdLibs "io.ballerina.lib:confluent.cavroserdes-ballerina:${stdlibConfluentAvroSerDesVersion}" - ballerinaStdLibs "io.ballerina.lib:confluent.cregistry-ballerina:${stdlibConfluentSchemaRegistryVersion}" } } diff --git a/gradle.properties b/gradle.properties index 9716d84d9..5317916a3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,7 +39,6 @@ stdlibUrlVersion=2.6.0 stdlibConstraintVersion=1.7.0 stdlibCryptoVersion=2.9.3 stdlibTaskVersion=2.7.0 -stdlibAvroVersion=1.2.0 # Level 03 stdlibCacheVersion=3.10.0 @@ -60,8 +59,5 @@ stdlibTransactionVersion=1.12.0 # Ballerina extended library stdlibPostgresqlDriverVersion=1.6.1 -stdlibCdcVersion=1.2.0 +stdlibCdcVersion=1.2.2-SNAPSHOT stdlibPostgresCdcDriverVersion=1.0.0 -stdlibConfluentAvroSerDesVersion=1.0.2 -stdlibConfluentSchemaRegistryVersion=0.4.3 -stdlibKafkaVersion=4.6.3 From 4eef619707683ea1a193a9b2ffa296d289f8f191 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Thu, 26 Feb 2026 16:38:22 +0530 Subject: [PATCH 07/18] Move db specific fields into postgres --- ballerina/Dependencies.toml | 2 +- ballerina/listener_types.bal | 10 +++++++++ ballerina/tests/listener_tests.bal | 36 ++++++++++++++++++++++++++++++ ballerina/tests/test_cleanup.bal | 11 +++++++++ ballerina/utils.bal | 28 ++++++++++++++++------- 5 files changed, 78 insertions(+), 9 deletions(-) create mode 100644 ballerina/tests/test_cleanup.bal diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 4a54d02ea..5b1dd5df4 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -219,7 +219,7 @@ dependencies = [ [[package]] org = "ballerinax" name = "cdc" -version = "1.2.2" +version = "1.3.0" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "data.jsondata"}, diff --git a/ballerina/listener_types.bal b/ballerina/listener_types.bal index eda3b7c08..94fced464 100644 --- a/ballerina/listener_types.bal +++ b/ballerina/listener_types.bal @@ -87,6 +87,11 @@ public type DataHandlingConfiguration record {| # + databaseName - The name of the PostgreSQL database from which to stream the changes. # + includedSchemas - A list of regular expressions matching fully-qualified schema identifiers to capture changes from # + excludedSchemas - A list of regular expressions matching fully-qualified schema identifiers to exclude from change capture +# + includedTables - Regex patterns for tables to capture (mutually exclusive with `excludedTables`) +# + excludedTables - Regex patterns for tables to exclude (mutually exclusive with `includedTables`) +# + includedColumns - Regex patterns for columns to capture (mutually exclusive with `excludedColumns`) +# + excludedColumns - Regex patterns for columns to exclude (mutually exclusive with `includedColumns`) +# + messageKeyColumns - Composite message key columns for change events # + tasksMax - The PostgreSQL connector always uses a single task and therefore does not use this value, so the default is always acceptable public type PostgresDatabaseConnection record {| *cdc:DatabaseConnection; @@ -96,6 +101,11 @@ public type PostgresDatabaseConnection record {| string databaseName; string|string[] includedSchemas?; string|string[] excludedSchemas?; + string|string[] includedTables?; + string|string[] excludedTables?; + string|string[] includedColumns?; + string|string[] excludedColumns?; + string messageKeyColumns?; int tasksMax = 1; *ReplicationConfiguration; *PublicationConfiguration; diff --git a/ballerina/tests/listener_tests.bal b/ballerina/tests/listener_tests.bal index 68e534941..2dbf138ca 100644 --- a/ballerina/tests/listener_tests.bal +++ b/ballerina/tests/listener_tests.bal @@ -384,6 +384,42 @@ function testPostgresRelationalCommonConfiguration() { msg = "Schema include list does not match."); } +@test:Config {groups: ["postgres-relational"]} +function testPostgresRelationalFilteringConfiguration() { + map expectedProperties = { + "schema.include.list": "public,custom", + "table.include.list": "public.users,public.orders", + "column.exclude.list": "public.*.password,public.*.ssn", + "message.key.columns": "public.users:id;public.orders:order_id" + }; + + PostgresDatabaseConnection connection = { + username: "testuser", + password: "testpass", + databaseName: "testdb", + includedSchemas: ["public", "custom"], + includedTables: ["public.users", "public.orders"], + excludedColumns: ["public.*.password", "public.*.ssn"], + messageKeyColumns: "public.users:id;public.orders:order_id" + }; + + map actualProperties = {}; + populateDatabaseConfigurations(connection, actualProperties); + + test:assertEquals(actualProperties["schema.include.list"], + expectedProperties["schema.include.list"], + msg = "Schema include list does not match."); + test:assertEquals(actualProperties["table.include.list"], + expectedProperties["table.include.list"], + msg = "Table include list does not match."); + test:assertEquals(actualProperties["column.exclude.list"], + expectedProperties["column.exclude.list"], + msg = "Column exclude list does not match."); + test:assertEquals(actualProperties["message.key.columns"], + expectedProperties["message.key.columns"], + msg = "Message key columns does not match."); +} + @test:Config {groups: ["postgres-snapshot"]} function testPostgresExtendedSnapshotConfiguration() { map expectedProperties = { diff --git a/ballerina/tests/test_cleanup.bal b/ballerina/tests/test_cleanup.bal new file mode 100644 index 000000000..28bbce329 --- /dev/null +++ b/ballerina/tests/test_cleanup.bal @@ -0,0 +1,11 @@ +import ballerina/file; +import ballerina/test; + +@test:AfterSuite +function cleanup() returns error? { + // delete the tmp directory created for tests + string tmpDirPath = "./tmp"; + if check file:test(tmpDirPath, file:EXISTS) { + check file:remove(tmpDirPath, file:RECURSIVE); + } +} diff --git a/ballerina/utils.bal b/ballerina/utils.bal index 04d668911..ca0022a61 100644 --- a/ballerina/utils.bal +++ b/ballerina/utils.bal @@ -32,8 +32,19 @@ const string LSN_FLUSH_MODE = "lsn.flush.mode"; const string SLOT_STREAM_PARAMS = "slot.stream.params"; const string UNAVAILABLE_VALUE_PLACEHOLDER = "unavailable.value.placeholder"; -// Relational-common configuration properties (applicable to MySQL, PostgreSQL, SQL Server) -const string MESSAGE_KEY_COLUMNS = "message.key.columns"; +// Populates PostgreSQL-specific relational filtering (table/column inclusion/exclusion and message key columns) +isolated function populateTableAndColumnFiltering(PostgresDatabaseConnection connection, map configMap) { + // Call CDC utility functions with direct parameters + cdc:populateTableAndColumnConfigurations( + connection.includedTables, + connection.excludedTables, + connection.includedColumns, + connection.excludedColumns, + configMap + ); + + cdc:populateMessageKeyColumnsConfiguration(connection.messageKeyColumns, configMap); +} isolated function populateDebeziumProperties(PostgresListenerConfiguration config, map debeziumConfigs) { cdc:populateDebeziumProperties({ @@ -48,6 +59,7 @@ isolated function populateDebeziumProperties(PostgresListenerConfiguration confi // Populates PostgreSQL-specific configurations isolated function populateDatabaseConfigurations(PostgresDatabaseConnection database, map debeziumConfigs) { + // Populate generic CDC connection fields cdc:populateDatabaseConfigurations({ connectorClass: database.connectorClass, hostname: database.hostname, @@ -56,14 +68,14 @@ isolated function populateDatabaseConfigurations(PostgresDatabaseConnection data password: database.password, connectTimeout: database.connectTimeout, tasksMax: database.tasksMax, - secure: database.secure, - includedTables: database.includedTables, - excludedTables: database.excludedTables, - includedColumns: database.includedColumns, - excludedColumns: database.excludedColumns + secure: database.secure }, debeziumConfigs); - + debeziumConfigs[POSTGRESQL_DATABASE_NAME] = database.databaseName; + + // Populate PostgreSQL-specific relational filtering + populateTableAndColumnFiltering(database, debeziumConfigs); + populateSchemaConfigurations(database, debeziumConfigs); // Replication configuration (fields inlined from ReplicationConfiguration) From dc04756f193527240e99ed20bd1ec3d5388e1bea Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Fri, 27 Feb 2026 10:15:51 +0530 Subject: [PATCH 08/18] [Automated] Update the native jar versions --- ballerina/Ballerina.toml | 6 +++--- ballerina/CompilerPlugin.toml | 2 +- ballerina/Dependencies.toml | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 37e582e39..abd128883 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerinax" name = "postgresql" -version = "1.16.4" +version = "1.17.0" authors = ["Ballerina"] keywords = ["client", "network", "SQL", "RDBMS", "Vendor/PostgreSQL", "Area/Database", "Type/Connector", "Type/Trigger"] repository = "https://github.com/ballerina-platform/module-ballerinax-postgresql" @@ -15,8 +15,8 @@ graalvmCompatible = true [[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "postgresql-native" -version = "1.16.4" -path = "../native/build/libs/postgresql-native-1.16.4-SNAPSHOT.jar" +version = "1.17.0" +path = "../native/build/libs/postgresql-native-1.17.0-SNAPSHOT.jar" [[platform.java21.dependency]] groupId = "io.ballerina.stdlib" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index fbc3943cf..1a947a0b4 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "postgresql-compiler-plugin" class = "io.ballerina.stdlib.postgresql.compiler.PostgreSQLCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/postgresql-compiler-plugin-1.16.4-SNAPSHOT.jar" +path = "../compiler-plugin/build/libs/postgresql-compiler-plugin-1.17.0-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 5b1dd5df4..081f1755f 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -235,7 +235,7 @@ modules = [ [[package]] org = "ballerinax" name = "postgresql" -version = "1.16.4" +version = "1.17.0" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "file"}, From e8b5334cf90b1216d6a1172fd95def0c4022d551 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Sun, 1 Mar 2026 06:43:05 +0530 Subject: [PATCH 09/18] Bump package version to minor 17 --- gradle.properties | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gradle.properties b/gradle.properties index 5317916a3..ea6c666db 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ group=io.ballerina.stdlib -version=1.16.4-SNAPSHOT +version=1.17.0-SNAPSHOT ballerinaLangVersion=2201.12.0 checkstylePluginVersion=10.12.1 @@ -59,5 +59,5 @@ stdlibTransactionVersion=1.12.0 # Ballerina extended library stdlibPostgresqlDriverVersion=1.6.1 -stdlibCdcVersion=1.2.2-SNAPSHOT +stdlibCdcVersion=1.3.0-SNAPSHOT stdlibPostgresCdcDriverVersion=1.0.0 From bf89284bac79a27daf71160ffad3f4e8d64d0aeb Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Tue, 3 Mar 2026 22:49:36 +0530 Subject: [PATCH 10/18] Fix errors in types --- ballerina/README.md | 33 ++++++++++++++++++++++++++- ballerina/listener_types.bal | 24 ++++++++------------ ballerina/tests/listener_tests.bal | 36 +++++++++--------------------- ballerina/utils.bal | 31 ++++++++++++++++--------- changelog.md | 8 +++++++ 5 files changed, 79 insertions(+), 53 deletions(-) diff --git a/ballerina/README.md b/ballerina/README.md index e70159d04..1efda1610 100644 --- a/ballerina/README.md +++ b/ballerina/README.md @@ -494,10 +494,41 @@ You can create a CDC listener by specifying the required configurations such as ```ballerina listener postgresql:CdcListener cdcListener = new (database = { username: , - password: + password: , + databaseName: "inventory" }); ``` +#### Configure the database connection + +The `database` parameter accepts a `PostgresDatabaseConnection` record with PostgreSQL-specific fields for logical replication, publication, and filtering: + +```ballerina +listener postgresql:CdcListener cdcListener = new (database = { + username: "cdc_user", + password: "password", + databaseName: "inventory", + includedSchemas: ["public"], + includedTables: ["public.products", "public.orders"], + pluginName: postgresql:PGOUTPUT, + slotName: "my_slot", + publicationName: "my_publication", + publicationAutocreateMode: postgresql:FILTERED +}, options = { + heartbeatConfig: { + interval: 10 + }, + guardrailConfig: { + maxCollections: 100, + limitAction: cdc:WARN + } +}); +``` + +#### Advanced options + +The `options` parameter (type `PostgreSqlOptions`) exposes all fields from `cdc:Options`, including `heartbeatConfig`, `signalConfig`, `transactionMetadataConfig`, `columnTransformConfig`, `topicConfig`, `connectionRetryConfig`, `performanceConfig`. PostgreSQL-specific extensions include `extendedSnapshot` (for lock timeout) and `dataTypeConfig` (for binary and time precision handling). Refer to [`cdc:Options`](https://docs.central.ballerina.io/ballerinax/cdc/latest#Options) for the full list of available options. Because `PostgreSqlOptions` is an open record, raw Debezium properties can also be passed directly as additional fields. + #### Implement a service to handle CDC events You can attach a service to the listener to handle CDC events. The service can define remote methods for different event types such as `onRead`, `onCreate`, `onUpdate`, and `onDelete`. diff --git a/ballerina/listener_types.bal b/ballerina/listener_types.bal index 94fced464..95f9f2370 100644 --- a/ballerina/listener_types.bal +++ b/ballerina/listener_types.bal @@ -63,22 +63,15 @@ public type PublicationConfiguration record {| # PostgreSQL streaming and status configuration. # -# + statusUpdateIntervalMs - Interval for sending status updates to PostgreSQL -# + xminFetchIntervalMs - Interval for fetching current xmin position +# + statusUpdateInterval - Interval for sending status updates to PostgreSQL in seconds +# + xminFetchInterval - Interval for fetching current xmin position in seconds # + lsnFlushMode - LSN flushing strategy public type StreamingConfiguration record {| - int statusUpdateIntervalMs = 10000; - int xminFetchIntervalMs = 0; + decimal statusUpdateInterval = 10; + decimal xminFetchInterval = 0; LsnFlushMode lsnFlushMode?; |}; -# PostgreSQL data handling configuration. -# -# + unavailableValuePlaceholder - Placeholder for unavailable TOAST values -public type DataHandlingConfiguration record {| - string unavailableValuePlaceholder = "__debezium_unavailable_value"; -|}; - # Represents the configuration for the Postgres CDC database connection. # # + connectorClass - The class name of the PostgreSQL connector implementation to use @@ -105,12 +98,11 @@ public type PostgresDatabaseConnection record {| string|string[] excludedTables?; string|string[] includedColumns?; string|string[] excludedColumns?; - string messageKeyColumns?; + cdc:MessageKeyColumns[] messageKeyColumns?; int tasksMax = 1; *ReplicationConfiguration; *PublicationConfiguration; - *StreamingConfiguration; - *DataHandlingConfiguration; + StreamingConfiguration streamingConfig?; |}; # PostgreSQL CDC listener configuration including database connection, storage, and CDC options. @@ -134,9 +126,11 @@ public type PostgreSqlOptions record {| |}; # Represents the extended snapshot configuration for the PostgreSQL CDC listener. -# +# # + lockTimeout - Lock acquisition timeout in seconds +# + isolationMode - Transaction isolation level during snapshot public type ExtendedSnapshotConfiguration record {| *cdc:RelationalExtendedSnapshotConfiguration; decimal lockTimeout = 10; + cdc:SnapshotIsolationMode isolationMode?; |}; diff --git a/ballerina/tests/listener_tests.bal b/ballerina/tests/listener_tests.bal index 2dbf138ca..b715a3858 100644 --- a/ballerina/tests/listener_tests.bal +++ b/ballerina/tests/listener_tests.bal @@ -326,9 +326,11 @@ function testPostgresStreamingConfiguration() { username: "testuser", password: "testpass", databaseName: "testdb", - statusUpdateIntervalMs: 5000, - xminFetchIntervalMs: 1000, - lsnFlushMode: CONNECTOR + streamingConfig: { + statusUpdateInterval: 5.0, + xminFetchInterval: 1, + lsnFlushMode: CONNECTOR + } }; map actualProperties = {}; @@ -342,27 +344,6 @@ function testPostgresStreamingConfiguration() { msg = "LSN flush mode does not match."); } -@test:Config {groups: ["postgres-datahandling"]} -function testPostgresDataHandlingConfiguration() { - map expectedProperties = { - "unavailable.value.placeholder": "__custom_unavailable__" - }; - - PostgresDatabaseConnection connection = { - username: "testuser", - password: "testpass", - databaseName: "testdb", - unavailableValuePlaceholder: "__custom_unavailable__" - }; - - map actualProperties = {}; - populateDatabaseConfigurations(connection, actualProperties); - - test:assertEquals(actualProperties["unavailable.value.placeholder"], - expectedProperties["unavailable.value.placeholder"], - msg = "Unavailable value placeholder does not match."); -} - @test:Config {groups: ["postgres-relational"]} function testPostgresRelationalCommonConfiguration() { map expectedProperties = { @@ -400,7 +381,10 @@ function testPostgresRelationalFilteringConfiguration() { includedSchemas: ["public", "custom"], includedTables: ["public.users", "public.orders"], excludedColumns: ["public.*.password", "public.*.ssn"], - messageKeyColumns: "public.users:id;public.orders:order_id" + messageKeyColumns: [ + {tableName: "public.users", columns: ["id"]}, + {tableName: "public.orders", columns: ["order_id"]} + ] }; map actualProperties = {}; @@ -448,7 +432,7 @@ function testPostgresOptionsWithHeartbeat() { }; PostgreSqlOptions options = { - heartbeat: { + heartbeatConfig: { interval: 15, actionQuery: "SELECT NOW()" } diff --git a/ballerina/utils.bal b/ballerina/utils.bal index ca0022a61..2526c4331 100644 --- a/ballerina/utils.bal +++ b/ballerina/utils.bal @@ -30,7 +30,10 @@ const string STATUS_UPDATE_INTERVAL_MS = "status.update.interval.ms"; const string XMIN_FETCH_INTERVAL_MS = "xmin.fetch.interval.ms"; const string LSN_FLUSH_MODE = "lsn.flush.mode"; const string SLOT_STREAM_PARAMS = "slot.stream.params"; -const string UNAVAILABLE_VALUE_PLACEHOLDER = "unavailable.value.placeholder"; + +// Extended snapshot configuration properties +const string SNAPSHOT_LOCK_TIMEOUT_MS = "snapshot.lock.timeout.ms"; +const string SNAPSHOT_ISOLATION_MODE = "snapshot.isolation.mode"; // Populates PostgreSQL-specific relational filtering (table/column inclusion/exclusion and message key columns) isolated function populateTableAndColumnFiltering(PostgresDatabaseConnection connection, map configMap) { @@ -91,16 +94,11 @@ isolated function populateDatabaseConfigurations(PostgresDatabaseConnection data debeziumConfigs[POSTGRESQL_PUBLICATION_NAME] = database.publicationName; debeziumConfigs[PUBLICATION_AUTOCREATE_MODE] = database.publicationAutocreateMode.toString(); - // Streaming configuration (fields inlined from StreamingConfiguration) - debeziumConfigs[STATUS_UPDATE_INTERVAL_MS] = database.statusUpdateIntervalMs.toString(); - debeziumConfigs[XMIN_FETCH_INTERVAL_MS] = database.xminFetchIntervalMs.toString(); - LsnFlushMode? lsnFlushMode = database.lsnFlushMode; - if lsnFlushMode !is () { - debeziumConfigs[LSN_FLUSH_MODE] = lsnFlushMode.toString(); + // Streaming configuration + StreamingConfiguration? streamingConfig = database.streamingConfig; + if streamingConfig is StreamingConfiguration { + populateStreamingConfiguration(streamingConfig, debeziumConfigs); } - - // Data handling configuration (fields inlined from DataHandlingConfiguration) - debeziumConfigs[UNAVAILABLE_VALUE_PLACEHOLDER] = database.unavailableValuePlaceholder; } // Populates schema inclusion/exclusion configurations @@ -116,7 +114,14 @@ isolated function populateSchemaConfigurations(PostgresDatabaseConnection connec } } -const string SNAPSHOT_LOCK_TIMEOUT_MS = "snapshot.lock.timeout.ms"; +isolated function populateStreamingConfiguration(StreamingConfiguration config, map debeziumConfigs) { + debeziumConfigs[STATUS_UPDATE_INTERVAL_MS] = getMillisecondValueOf(config.statusUpdateInterval); + debeziumConfigs[XMIN_FETCH_INTERVAL_MS] = getMillisecondValueOf(config.xminFetchInterval); + LsnFlushMode? lsnFlushMode = config.lsnFlushMode; + if lsnFlushMode !is () { + debeziumConfigs[LSN_FLUSH_MODE] = lsnFlushMode.toString(); + } +} // Populates PostgreSQL-specific options isolated function populateOptions(PostgreSqlOptions options, map debeziumConfigs) { @@ -140,6 +145,10 @@ isolated function populateOptions(PostgreSqlOptions options, map debeziu isolated function populateExtendedSnapshotConfiguration(ExtendedSnapshotConfiguration config, map debeziumConfigs) { cdc:populateRelationalExtendedSnapshotConfiguration(config, debeziumConfigs); debeziumConfigs[SNAPSHOT_LOCK_TIMEOUT_MS] = getMillisecondValueOf(config.lockTimeout); + cdc:SnapshotIsolationMode? isolationMode = config.isolationMode; + if isolationMode is cdc:SnapshotIsolationMode { + debeziumConfigs[SNAPSHOT_ISOLATION_MODE] = isolationMode; + } } isolated function getMillisecondValueOf(decimal value) returns string { diff --git a/changelog.md b/changelog.md index 60cb52f13..3e25cf625 100644 --- a/changelog.md +++ b/changelog.md @@ -6,6 +6,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- New CDC options: `heartbeatConfig`, `signalConfig`, `transactionMetadataConfig`, `columnTransformConfig`, `topicConfig`, `connectionRetryConfig`, `performanceConfig` (via `cdc` module `1.3.0`) +- Support for additional offset and schema history storage backends (Memory, JDBC, Redis, S3, Azure Blob, RocketMQ) + +### Changed +- Updated `cdc` module dependency to `1.3.0` +- `PostgresDatabaseConnection` now owns table/column filtering fields (`includedTables`, `excludedTables`, `includedColumns`, `excludedColumns`) previously on `cdc:DatabaseConnection` + ## [1.16.1] - 2024-06-13 ### Changed From 556ee096f22d977420af152091de19815e3aec50 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Mon, 9 Mar 2026 13:49:01 +0530 Subject: [PATCH 11/18] [Automated] Update the native jar versions --- ballerina/Ballerina.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index abd128883..302dc7fb1 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -24,3 +24,5 @@ artifactId = "sql-native" version = "1.16.0" path = "./lib/sql-native-1.16.0.jar" +[[dependency]] +org = "ballerinax" From c566bca06c7362af59f5b76f7716a8279475d931 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Mon, 9 Mar 2026 13:54:50 +0530 Subject: [PATCH 12/18] [Automated] Update the native jar versions --- ballerina/Ballerina.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 302dc7fb1..3ef688f82 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -26,3 +26,6 @@ path = "./lib/sql-native-1.16.0.jar" [[dependency]] org = "ballerinax" +name = "cdc" +version = "1.3.0" +repository = "local" From d6c63bd372581001e8c08c7147904780af1c4864 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Mon, 9 Mar 2026 14:08:09 +0530 Subject: [PATCH 13/18] [Automated] Update the native jar versions --- ballerina/Ballerina.toml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 3ef688f82..abd128883 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -24,8 +24,3 @@ artifactId = "sql-native" version = "1.16.0" path = "./lib/sql-native-1.16.0.jar" -[[dependency]] -org = "ballerinax" -name = "cdc" -version = "1.3.0" -repository = "local" From 08c11253ffc01c947a136b6832311b14d1a3e862 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Mon, 9 Mar 2026 14:41:59 +0530 Subject: [PATCH 14/18] Fix review suggestions --- ballerina/cdc_listener.bal | 2 +- ballerina/listener_types.bal | 20 +++++++++- ballerina/tests/listener_tests.bal | 17 ++++++--- ballerina/utils.bal | 61 ++++++++++++++++++------------ 4 files changed, 66 insertions(+), 34 deletions(-) diff --git a/ballerina/cdc_listener.bal b/ballerina/cdc_listener.bal index d76745763..0c0fc15b5 100644 --- a/ballerina/cdc_listener.bal +++ b/ballerina/cdc_listener.bal @@ -49,7 +49,7 @@ public isolated class CdcListener { # # + return - An error if the listener cannot be started, or `()` if successful public isolated function 'start() returns cdc:Error? { - check cdc:externStart(self, self.debeziumConfigs, self.listenerConfigs); + check cdc:externStartWithSeparateConfigs(self, self.debeziumConfigs, self.listenerConfigs); } # Detaches a CDC service from the Postgresql listener. diff --git a/ballerina/listener_types.bal b/ballerina/listener_types.bal index 95f9f2370..5a397147b 100644 --- a/ballerina/listener_types.bal +++ b/ballerina/listener_types.bal @@ -86,6 +86,11 @@ public type StreamingConfiguration record {| # + excludedColumns - Regex patterns for columns to exclude (mutually exclusive with `includedColumns`) # + messageKeyColumns - Composite message key columns for change events # + tasksMax - The PostgreSQL connector always uses a single task and therefore does not use this value, so the default is always acceptable +# + pluginName - Deprecated: Use `replicationConfig.pluginName` instead +# + slotName - Deprecated: Use `replicationConfig.slotName` instead +# + publicationName - Deprecated: Use `publicationConfig.publicationName` instead +# + replicationConfig - Replication configuration (logical decoding plugin, slot name and parameters). Takes priority over deprecated top-level fields +# + publicationConfig - Publication configuration (publication name and autocreate mode). Takes priority over deprecated top-level fields public type PostgresDatabaseConnection record {| *cdc:DatabaseConnection; string connectorClass = "io.debezium.connector.postgresql.PostgresConnector"; @@ -100,8 +105,17 @@ public type PostgresDatabaseConnection record {| string|string[] excludedColumns?; cdc:MessageKeyColumns[] messageKeyColumns?; int tasksMax = 1; - *ReplicationConfiguration; - *PublicationConfiguration; + # Deprecated: Use `replicationConfig.pluginName` instead. + @deprecated + PostgreSQLLogicalDecodingPlugin pluginName = PGOUTPUT; + # Deprecated: Use `replicationConfig.slotName` instead. + @deprecated + string slotName = "debezium"; + # Deprecated: Use `publicationConfig.publicationName` instead. + @deprecated + string publicationName = "dbz_publication"; + ReplicationConfiguration replicationConfig?; + PublicationConfiguration publicationConfig?; StreamingConfiguration streamingConfig?; |}; @@ -119,10 +133,12 @@ public type PostgresListenerConfiguration record {| # # + extendedSnapshot - Extended snapshot configuration with PostgreSQL-specific lock timeout and query settings # + dataTypeConfig - Data type handling configuration including schema change tracking +# + heartbeatConfig - Heartbeat configuration for keeping the PostgreSQL replication slot active public type PostgreSqlOptions record {| *cdc:Options; ExtendedSnapshotConfiguration extendedSnapshot?; cdc:DataTypeConfiguration dataTypeConfig?; + cdc:RelationalHeartbeatConfiguration heartbeatConfig?; |}; # Represents the extended snapshot configuration for the PostgreSQL CDC listener. diff --git a/ballerina/tests/listener_tests.bal b/ballerina/tests/listener_tests.bal index b715a3858..72004b35d 100644 --- a/ballerina/tests/listener_tests.bal +++ b/ballerina/tests/listener_tests.bal @@ -271,10 +271,12 @@ function testPostgresReplicationConfiguration() { username: "testuser", password: "testpass", databaseName: "testdb", - pluginName: DECODERBUFS, - slotName: "custom_slot", - slotDropOnStop: true, - slotStreamParams: "include-unchanged-toast=true" + replicationConfig: { + pluginName: DECODERBUFS, + slotName: "custom_slot", + slotDropOnStop: true, + slotStreamParams: "include-unchanged-toast=true" + } }; map actualProperties = {}; @@ -299,8 +301,10 @@ function testPostgresPublicationConfiguration() { username: "testuser", password: "testpass", databaseName: "testdb", - publicationName: "my_publication", - publicationAutocreateMode: FILTERED + publicationConfig: { + publicationName: "my_publication", + publicationAutocreateMode: FILTERED + } }; map actualProperties = {}; @@ -439,6 +443,7 @@ function testPostgresOptionsWithHeartbeat() { }; map actualProperties = {}; + cdc:populateOptions(options, actualProperties); populateOptions(options, actualProperties); test:assertEquals(actualProperties["heartbeat.interval.ms"], diff --git a/ballerina/utils.bal b/ballerina/utils.bal index 2526c4331..a8f8e4951 100644 --- a/ballerina/utils.bal +++ b/ballerina/utils.bal @@ -53,7 +53,9 @@ isolated function populateDebeziumProperties(PostgresListenerConfiguration confi cdc:populateDebeziumProperties({ engineName: config.engineName, offsetStorage: config.offsetStorage, - internalSchemaStorage: config.internalSchemaStorage + internalSchemaStorage: config.internalSchemaStorage, + database: config.database, + options: config.options }, debeziumConfigs); populateDatabaseConfigurations(config.database, debeziumConfigs); populateOptions(config.options, debeziumConfigs); @@ -62,18 +64,6 @@ isolated function populateDebeziumProperties(PostgresListenerConfiguration confi // Populates PostgreSQL-specific configurations isolated function populateDatabaseConfigurations(PostgresDatabaseConnection database, map debeziumConfigs) { - // Populate generic CDC connection fields - cdc:populateDatabaseConfigurations({ - connectorClass: database.connectorClass, - hostname: database.hostname, - port: database.port, - username: database.username, - password: database.password, - connectTimeout: database.connectTimeout, - tasksMax: database.tasksMax, - secure: database.secure - }, debeziumConfigs); - debeziumConfigs[POSTGRESQL_DATABASE_NAME] = database.databaseName; // Populate PostgreSQL-specific relational filtering @@ -81,18 +71,33 @@ isolated function populateDatabaseConfigurations(PostgresDatabaseConnection data populateSchemaConfigurations(database, debeziumConfigs); - // Replication configuration (fields inlined from ReplicationConfiguration) - debeziumConfigs[POSTGRESQL_PLUGIN_NAME] = database.pluginName; - debeziumConfigs[POSTGRESQL_SLOT_NAME] = database.slotName; - debeziumConfigs[SLOT_DROP_ON_STOP] = database.slotDropOnStop.toString(); - string? slotStreamParams = database.slotStreamParams; - if slotStreamParams !is () { - debeziumConfigs[SLOT_STREAM_PARAMS] = slotStreamParams; + // Replication configuration: nested record takes priority over deprecated top-level fields + ReplicationConfiguration? replicationConfig = database.replicationConfig; + if replicationConfig is ReplicationConfiguration { + debeziumConfigs[POSTGRESQL_PLUGIN_NAME] = replicationConfig.pluginName; + debeziumConfigs[POSTGRESQL_SLOT_NAME] = replicationConfig.slotName; + debeziumConfigs[SLOT_DROP_ON_STOP] = replicationConfig.slotDropOnStop.toString(); + string? slotStreamParams = replicationConfig.slotStreamParams; + if slotStreamParams !is () { + debeziumConfigs[SLOT_STREAM_PARAMS] = slotStreamParams; + } + } else { + // Fall back to deprecated top-level fields + debeziumConfigs[POSTGRESQL_PLUGIN_NAME] = database.pluginName; + debeziumConfigs[POSTGRESQL_SLOT_NAME] = database.slotName; + debeziumConfigs[SLOT_DROP_ON_STOP] = false.toString(); } - // Publication configuration (fields inlined from PublicationConfiguration) - debeziumConfigs[POSTGRESQL_PUBLICATION_NAME] = database.publicationName; - debeziumConfigs[PUBLICATION_AUTOCREATE_MODE] = database.publicationAutocreateMode.toString(); + // Publication configuration: nested record takes priority over deprecated top-level field + PublicationConfiguration? publicationConfig = database.publicationConfig; + if publicationConfig is PublicationConfiguration { + debeziumConfigs[POSTGRESQL_PUBLICATION_NAME] = publicationConfig.publicationName; + debeziumConfigs[PUBLICATION_AUTOCREATE_MODE] = publicationConfig.publicationAutocreateMode.toString(); + } else { + // Fall back to deprecated top-level field + debeziumConfigs[POSTGRESQL_PUBLICATION_NAME] = database.publicationName; + debeziumConfigs[PUBLICATION_AUTOCREATE_MODE] = ALL_TABLES; + } // Streaming configuration StreamingConfiguration? streamingConfig = database.streamingConfig; @@ -137,8 +142,14 @@ isolated function populateOptions(PostgreSqlOptions options, map debeziu cdc:populateDataTypeConfiguration(dataTypeConfig, debeziumConfigs); } - // Populate common options from cdc module - cdc:populateOptions(options, debeziumConfigs, typeof options); + // Populate relational heartbeat configuration + cdc:RelationalHeartbeatConfiguration? heartbeatConfig = options.heartbeatConfig; + if heartbeatConfig is cdc:RelationalHeartbeatConfiguration { + cdc:populateRelationalHeartbeatConfiguration(heartbeatConfig, debeziumConfigs); + } + + // Populate additional DB-specific options not present in base Options + cdc:populateAdditionalConfigurations(options, debeziumConfigs, typeof options); } // Populates PostgreSQL-specific extended snapshot properties From 22c75d310b02801645fe16fa8d833b0d61d37289 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Fri, 13 Mar 2026 14:51:32 +0530 Subject: [PATCH 15/18] Rename externStartWithSeparateConfigs to externStartWithExtendedConfigs --- ballerina/cdc_listener.bal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballerina/cdc_listener.bal b/ballerina/cdc_listener.bal index 0c0fc15b5..e1d1fef03 100644 --- a/ballerina/cdc_listener.bal +++ b/ballerina/cdc_listener.bal @@ -49,7 +49,7 @@ public isolated class CdcListener { # # + return - An error if the listener cannot be started, or `()` if successful public isolated function 'start() returns cdc:Error? { - check cdc:externStartWithSeparateConfigs(self, self.debeziumConfigs, self.listenerConfigs); + check cdc:externStartWithExtendedConfigs(self, self.debeziumConfigs, self.listenerConfigs); } # Detaches a CDC service from the Postgresql listener. From feb09e830357c4507da8c937df138ee1781cb692 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Tue, 17 Mar 2026 21:40:34 +0530 Subject: [PATCH 16/18] Bump cdc version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index ea6c666db..d4f94b06a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -59,5 +59,5 @@ stdlibTransactionVersion=1.12.0 # Ballerina extended library stdlibPostgresqlDriverVersion=1.6.1 -stdlibCdcVersion=1.3.0-SNAPSHOT +stdlibCdcVersion=1.3.0 stdlibPostgresCdcDriverVersion=1.0.0 From ee32c0d07de889e16f4d68e25706aa6747867f19 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Thu, 19 Mar 2026 09:39:13 +0530 Subject: [PATCH 17/18] Fix changelog --- changelog.md | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/changelog.md b/changelog.md index 3e25cf625..86d8f5115 100644 --- a/changelog.md +++ b/changelog.md @@ -7,12 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added -- New CDC options: `heartbeatConfig`, `signalConfig`, `transactionMetadataConfig`, `columnTransformConfig`, `topicConfig`, `connectionRetryConfig`, `performanceConfig` (via `cdc` module `1.3.0`) -- Support for additional offset and schema history storage backends (Memory, JDBC, Redis, S3, Azure Blob, RocketMQ) - -### Changed -- Updated `cdc` module dependency to `1.3.0` -- `PostgresDatabaseConnection` now owns table/column filtering fields (`includedTables`, `excludedTables`, `includedColumns`, `excludedColumns`) previously on `cdc:DatabaseConnection` +- [Introduce additional Debezium properties](https://github.com/ballerina-platform/ballerina-library/issues/8572) ## [1.16.1] - 2024-06-13 From 944a758c810d015dd65bb9dda0115ef5dfef6498 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Thu, 19 Mar 2026 10:06:35 +0530 Subject: [PATCH 18/18] Add cdc timestamp --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index d4f94b06a..606ec843a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -59,5 +59,5 @@ stdlibTransactionVersion=1.12.0 # Ballerina extended library stdlibPostgresqlDriverVersion=1.6.1 -stdlibCdcVersion=1.3.0 +stdlibCdcVersion=1.3.0-20260319-091700-6e7e4b6 stdlibPostgresCdcDriverVersion=1.0.0