diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index db8dca731..abd128883 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerinax" name = "postgresql" -version = "1.16.4" +version = "1.17.0" authors = ["Ballerina"] keywords = ["client", "network", "SQL", "RDBMS", "Vendor/PostgreSQL", "Area/Database", "Type/Connector", "Type/Trigger"] repository = "https://github.com/ballerina-platform/module-ballerinax-postgresql" @@ -15,8 +15,8 @@ graalvmCompatible = true [[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "postgresql-native" -version = "1.16.4" -path = "../native/build/libs/postgresql-native-1.16.4.jar" +version = "1.17.0" +path = "../native/build/libs/postgresql-native-1.17.0-SNAPSHOT.jar" [[platform.java21.dependency]] groupId = "io.ballerina.stdlib" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index 4270589b4..1a947a0b4 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "postgresql-compiler-plugin" class = "io.ballerina.stdlib.postgresql.compiler.PostgreSQLCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/postgresql-compiler-plugin-1.16.4.jar" +path = "../compiler-plugin/build/libs/postgresql-compiler-plugin-1.17.0-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 3fd4b65a9..081f1755f 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -7,14 +7,6 @@ dependencies-toml-version = "2" distribution-version = "2201.12.0" -[[package]] -org = "ballerina" -name = "avro" -version = "1.2.0" -dependencies = [ - {org = "ballerina", name = "jballerina.java"} -] - [[package]] org = "ballerina" name = "crypto" @@ -75,6 +67,7 @@ modules = [ org = "ballerina" name = "lang.__internal" version = "0.0.0" +scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = 
"lang.object"} @@ -99,16 +92,6 @@ dependencies = [ {org = "ballerina", name = "jballerina.java"} ] -[[package]] -org = "ballerina" -name = "lang.int" -version = "0.0.0" -dependencies = [ - {org = "ballerina", name = "jballerina.java"}, - {org = "ballerina", name = "lang.__internal"}, - {org = "ballerina", name = "lang.object"} -] - [[package]] org = "ballerina" name = "lang.object" @@ -224,17 +207,6 @@ modules = [ {org = "ballerina", packageName = "time", moduleName = "time"} ] -[[package]] -org = "ballerina" -name = "uuid" -version = "1.10.0" -dependencies = [ - {org = "ballerina", name = "crypto"}, - {org = "ballerina", name = "jballerina.java"}, - {org = "ballerina", name = "lang.int"}, - {org = "ballerina", name = "time"} -] - [[package]] org = "ballerinai" name = "observe" @@ -247,57 +219,23 @@ dependencies = [ [[package]] org = "ballerinax" name = "cdc" -version = "1.2.0" +version = "1.3.0" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "data.jsondata"}, {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "log"}, - {org = "ballerinai", name = "observe"}, - {org = "ballerinax", name = "kafka"} + {org = "ballerinai", name = "observe"} ] modules = [ {org = "ballerinax", packageName = "cdc", moduleName = "cdc"} ] -[[package]] -org = "ballerinax" -name = "confluent.cavroserdes" -version = "1.0.2" -dependencies = [ - {org = "ballerina", name = "avro"}, - {org = "ballerina", name = "jballerina.java"}, - {org = "ballerinai", name = "observe"}, - {org = "ballerinax", name = "confluent.cregistry"} -] - -[[package]] -org = "ballerinax" -name = "confluent.cregistry" -version = "0.4.3" -dependencies = [ - {org = "ballerina", name = "jballerina.java"} -] - -[[package]] -org = "ballerinax" -name = "kafka" -version = "4.6.3" -dependencies = [ - {org = "ballerina", name = "crypto"}, - {org = "ballerina", name = "jballerina.java"}, - {org = "ballerina", name = "time"}, - {org = 
"ballerina", name = "uuid"}, - {org = "ballerinai", name = "observe"}, - {org = "ballerinax", name = "confluent.cavroserdes"}, - {org = "ballerinax", name = "confluent.cregistry"} -] - [[package]] org = "ballerinax" name = "postgresql" -version = "1.16.4" +version = "1.17.0" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "file"}, @@ -331,7 +269,7 @@ modules = [ [[package]] org = "ballerinax" name = "postgresql.driver" -version = "1.6.1" +version = "1.6.2" scope = "testOnly" modules = [ {org = "ballerinax", packageName = "postgresql.driver", moduleName = "postgresql.driver"} diff --git a/ballerina/README.md b/ballerina/README.md index e70159d04..1efda1610 100644 --- a/ballerina/README.md +++ b/ballerina/README.md @@ -494,10 +494,41 @@ You can create a CDC listener by specifying the required configurations such as ```ballerina listener postgresql:CdcListener cdcListener = new (database = { username: <username>, - password: <password> + password: <password>, + databaseName: "inventory" }); ``` +#### Configure the database connection + +The `database` parameter accepts a `PostgresDatabaseConnection` record with PostgreSQL-specific fields for logical replication, publication, and filtering: + +```ballerina +listener postgresql:CdcListener cdcListener = new (database = { + username: "cdc_user", + password: "password", + databaseName: "inventory", + includedSchemas: ["public"], + includedTables: ["public.products", "public.orders"], + replicationConfig: {pluginName: postgresql:PGOUTPUT, slotName: "my_slot"}, + publicationConfig: { + publicationName: "my_publication", publicationAutocreateMode: postgresql:FILTERED + } +}, options = { + heartbeatConfig: { + interval: 10 + }, + guardrailConfig: { + maxCollections: 100, + limitAction: cdc:WARN + } +}); +``` + +#### Advanced options + +The `options` parameter (type `PostgreSqlOptions`) exposes all fields from `cdc:Options`, including `heartbeatConfig`, `signalConfig`, `transactionMetadataConfig`, `columnTransformConfig`, `topicConfig`, `connectionRetryConfig`, 
and `performanceConfig`. PostgreSQL-specific extensions include `extendedSnapshot` (for lock timeout) and `dataTypeConfig` (for binary and time precision handling). Refer to [`cdc:Options`](https://docs.central.ballerina.io/ballerinax/cdc/latest#Options) for the full list of available options. Because `PostgreSqlOptions` is an open record, raw Debezium properties can also be passed directly as additional fields. + #### Implement a service to handle CDC events You can attach a service to the listener to handle CDC events. The service can define remote methods for different event types such as `onRead`, `onCreate`, `onUpdate`, and `onDelete`. diff --git a/ballerina/cdc_listener.bal b/ballerina/cdc_listener.bal index 273c4f085..6bb782048 100644 --- a/ballerina/cdc_listener.bal +++ b/ballerina/cdc_listener.bal @@ -19,7 +19,8 @@ import ballerinax/cdc; public isolated class CdcListener { *cdc:Listener; - private final map & readonly config; + private final map & readonly debeziumConfigs; + private final map & readonly listenerConfigs; private boolean isStarted = false; private boolean hasAttachedService = false; @@ -28,33 +29,11 @@ public isolated class CdcListener { # + config - The configuration for the Postgresql connector public isolated function init(*PostgresListenerConfiguration config) { map debeziumConfigs = {}; - cdc:populateDebeziumProperties({ - engineName: config.engineName, - offsetStorage: config.offsetStorage, - internalSchemaStorage: config.internalSchemaStorage, - options: config.options - }, debeziumConfigs - ); - cdc:populateDatabaseConfigurations({ - connectorClass: config.database.connectorClass, - hostname: config.database.hostname, - port: config.database.port, - username: config.database.username, - password: config.database.password, - connectTimeout: config.database.connectTimeout, - tasksMax: config.database.tasksMax, - secure: config.database.secure, - includedTables: config.database.includedTables, - excludedTables: 
config.database.excludedTables, - includedColumns: config.database.includedColumns, - excludedColumns: config.database.excludedColumns - }, debeziumConfigs); - populatePostgresConfigurations(config.database, debeziumConfigs); - map listenerConfigs = { - ...debeziumConfigs - }; - listenerConfigs["livenessInterval"] = config.livenessInterval; - self.config = listenerConfigs.cloneReadOnly(); + map listenerConfigs = {}; + populateDebeziumProperties(config, debeziumConfigs); + cdc:populateListenerProperties(config, listenerConfigs); + self.debeziumConfigs = debeziumConfigs.cloneReadOnly(); + self.listenerConfigs = listenerConfigs.cloneReadOnly(); } # Attaches a CDC service to the Postgresql listener. @@ -70,7 +49,7 @@ public isolated class CdcListener { # # + return - An `cdc:Error` if the listener cannot be started, or `()` if successful public isolated function 'start() returns cdc:Error? { - check cdc:externStart(self, self.config); + check cdc:externStartWithExtendedConfigs(self, self.debeziumConfigs, self.listenerConfigs); } # Detaches a CDC service from the Postgresql listener. diff --git a/ballerina/listener_types.bal b/ballerina/listener_types.bal index c54480bd9..5a397147b 100644 --- a/ballerina/listener_types.bal +++ b/ballerina/listener_types.bal @@ -13,6 +13,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + import ballerinax/cdc; # Represents the PostgreSQL logical decoding plugins. @@ -24,7 +25,54 @@ public enum PostgreSQLLogicalDecodingPlugin { DECODERBUFS = "decoderbufs" } -# The configuration for the PostgreSQL CDC database connection. +# Represents publication autocreate modes. +public enum PublicationAutocreateMode { + ALL_TABLES = "all_tables", + DISABLED = "disabled", + FILTERED = "filtered" +} + +# Represents LSN flush modes. 
+public enum LsnFlushMode { + MANUAL = "manual", + CONNECTOR = "connector", + CONNECTOR_AND_DRIVER = "connector_and_driver" +} + +# PostgreSQL replication configuration (logical decoding). +# +# + pluginName - Logical decoding plugin to use (pgoutput, decoderbufs) +# + slotName - Name of the PostgreSQL logical replication slot +# + slotDropOnStop - Drop replication slot when connector stops +# + slotStreamParams - Custom replication slot parameters +public type ReplicationConfiguration record {| + PostgreSQLLogicalDecodingPlugin pluginName = PGOUTPUT; + string slotName = "debezium"; + boolean slotDropOnStop = false; + string slotStreamParams?; +|}; + +# PostgreSQL publication configuration (pgoutput plugin). +# +# + publicationName - Name of PostgreSQL publication +# + publicationAutocreateMode - Mode for auto-creating publications +public type PublicationConfiguration record {| + string publicationName = "dbz_publication"; + PublicationAutocreateMode publicationAutocreateMode = ALL_TABLES; +|}; + +# PostgreSQL streaming and status configuration. +# +# + statusUpdateInterval - Interval for sending status updates to PostgreSQL in seconds +# + xminFetchInterval - Interval for fetching current xmin position in seconds +# + lsnFlushMode - LSN flushing strategy +public type StreamingConfiguration record {| + decimal statusUpdateInterval = 10; + decimal xminFetchInterval = 0; + LsnFlushMode lsnFlushMode?; +|}; + +# Represents the configuration for the Postgres CDC database connection. # # + connectorClass - The class name of the PostgreSQL connector implementation to use # + hostname - The hostname of the PostgreSQL server @@ -32,10 +80,17 @@ public enum PostgreSQLLogicalDecodingPlugin { # + databaseName - The name of the PostgreSQL database from which to stream the changes. 
# + includedSchemas - A list of regular expressions matching fully-qualified schema identifiers to capture changes from # + excludedSchemas - A list of regular expressions matching fully-qualified schema identifiers to exclude from change capture +# + includedTables - Regex patterns for tables to capture (mutually exclusive with `excludedTables`) +# + excludedTables - Regex patterns for tables to exclude (mutually exclusive with `includedTables`) +# + includedColumns - Regex patterns for columns to capture (mutually exclusive with `excludedColumns`) +# + excludedColumns - Regex patterns for columns to exclude (mutually exclusive with `includedColumns`) +# + messageKeyColumns - Composite message key columns for change events # + tasksMax - The PostgreSQL connector always uses a single task and therefore does not use this value, so the default is always acceptable -# + pluginName - The name of the PostgreSQL logical decoding plug-in installed on the server -# + slotName - The name of the PostgreSQL logical decoding slot -# + publicationName - The name of the PostgreSQL publication created for streaming changes when using pgoutput. +# + pluginName - Deprecated: Use `replicationConfig.pluginName` instead +# + slotName - Deprecated: Use `replicationConfig.slotName` instead +# + publicationName - Deprecated: Use `publicationConfig.publicationName` instead +# + replicationConfig - Replication configuration (logical decoding plugin, slot name and parameters). Takes priority over deprecated top-level fields +# + publicationConfig - Publication configuration (publication name and autocreate mode). 
Takes priority over deprecated top-level fields public type PostgresDatabaseConnection record {| *cdc:DatabaseConnection; string connectorClass = "io.debezium.connector.postgresql.PostgresConnector"; @@ -44,16 +99,54 @@ public type PostgresDatabaseConnection record {| string databaseName; string|string[] includedSchemas?; string|string[] excludedSchemas?; + string|string[] includedTables?; + string|string[] excludedTables?; + string|string[] includedColumns?; + string|string[] excludedColumns?; + cdc:MessageKeyColumns[] messageKeyColumns?; int tasksMax = 1; + # Deprecated: Use `replicationConfig.pluginName` instead. + @deprecated PostgreSQLLogicalDecodingPlugin pluginName = PGOUTPUT; + # Deprecated: Use `replicationConfig.slotName` instead. + @deprecated string slotName = "debezium"; + # Deprecated: Use `publicationConfig.publicationName` instead. + @deprecated string publicationName = "dbz_publication"; + ReplicationConfiguration replicationConfig?; + PublicationConfiguration publicationConfig?; + StreamingConfiguration streamingConfig?; |}; -# The configuration for the PostgreSQL CDC listener. +# PostgreSQL CDC listener configuration including database connection, storage, and CDC options. # -# + database - The PostgreSQL database connection configuration +# + database - PostgreSQL database connection, logical decoding, and capture settings +# + options - PostgreSQL-specific CDC options including snapshot, heartbeat, signals, and data type handling public type PostgresListenerConfiguration record {| PostgresDatabaseConnection database; *cdc:ListenerConfiguration; + PostgreSqlOptions options = {}; +|}; + +# PostgreSQL-specific CDC options for configuring snapshot behavior and data type handling. 
+# +# + extendedSnapshot - Extended snapshot configuration with PostgreSQL-specific lock timeout and query settings +# + dataTypeConfig - Data type handling configuration including schema change tracking +# + heartbeatConfig - Heartbeat configuration for keeping the PostgreSQL replication slot active +public type PostgreSqlOptions record {| + *cdc:Options; + ExtendedSnapshotConfiguration extendedSnapshot?; + cdc:DataTypeConfiguration dataTypeConfig?; + cdc:RelationalHeartbeatConfiguration heartbeatConfig?; +|}; + +# Represents the extended snapshot configuration for the PostgreSQL CDC listener. +# +# + lockTimeout - Lock acquisition timeout in seconds +# + isolationMode - Transaction isolation level during snapshot +public type ExtendedSnapshotConfiguration record {| + *cdc:RelationalExtendedSnapshotConfiguration; + decimal lockTimeout = 10; + cdc:SnapshotIsolationMode isolationMode?; |}; diff --git a/ballerina/tests/listener_tests.bal b/ballerina/tests/listener_tests.bal index 2db0a31f5..72004b35d 100644 --- a/ballerina/tests/listener_tests.bal +++ b/ballerina/tests/listener_tests.bal @@ -257,3 +257,199 @@ function testCdcListenerEvents() returns error? 
{ check testListener.gracefulStop(); } + +@test:Config {groups: ["postgres-replication"]} +function testPostgresReplicationConfiguration() { + map expectedProperties = { + "plugin.name": "decoderbufs", + "slot.name": "custom_slot", + "slot.drop.on.stop": "true", + "slot.stream.params": "include-unchanged-toast=true" + }; + + PostgresDatabaseConnection connection = { + username: "testuser", + password: "testpass", + databaseName: "testdb", + replicationConfig: { + pluginName: DECODERBUFS, + slotName: "custom_slot", + slotDropOnStop: true, + slotStreamParams: "include-unchanged-toast=true" + } + }; + + map actualProperties = {}; + populateDatabaseConfigurations(connection, actualProperties); + + test:assertEquals(actualProperties["plugin.name"], + expectedProperties["plugin.name"], + msg = "Plugin name does not match."); + test:assertEquals(actualProperties["slot.drop.on.stop"], + expectedProperties["slot.drop.on.stop"], + msg = "Slot drop on stop does not match."); +} + +@test:Config {groups: ["postgres-publication"]} +function testPostgresPublicationConfiguration() { + map expectedProperties = { + "publication.name": "my_publication", + "publication.autocreate.mode": "filtered" + }; + + PostgresDatabaseConnection connection = { + username: "testuser", + password: "testpass", + databaseName: "testdb", + publicationConfig: { + publicationName: "my_publication", + publicationAutocreateMode: FILTERED + } + }; + + map actualProperties = {}; + populateDatabaseConfigurations(connection, actualProperties); + + test:assertEquals(actualProperties["publication.name"], + expectedProperties["publication.name"], + msg = "Publication name does not match."); + test:assertEquals(actualProperties["publication.autocreate.mode"], + expectedProperties["publication.autocreate.mode"], + msg = "Publication autocreate mode does not match."); +} + +@test:Config {groups: ["postgres-streaming"]} +function testPostgresStreamingConfiguration() { + map expectedProperties = { + 
"status.update.interval.ms": "5000", + "xmin.fetch.interval.ms": "1000", + "lsn.flush.mode": "connector" + }; + + PostgresDatabaseConnection connection = { + username: "testuser", + password: "testpass", + databaseName: "testdb", + streamingConfig: { + statusUpdateInterval: 5.0, + xminFetchInterval: 1, + lsnFlushMode: CONNECTOR + } + }; + + map actualProperties = {}; + populateDatabaseConfigurations(connection, actualProperties); + + test:assertEquals(actualProperties["status.update.interval.ms"], + expectedProperties["status.update.interval.ms"], + msg = "Status update interval does not match."); + test:assertEquals(actualProperties["lsn.flush.mode"], + expectedProperties["lsn.flush.mode"], + msg = "LSN flush mode does not match."); +} + +@test:Config {groups: ["postgres-relational"]} +function testPostgresRelationalCommonConfiguration() { + map expectedProperties = { + "schema.include.list": "public,custom" + }; + + PostgresDatabaseConnection connection = { + username: "testuser", + password: "testpass", + databaseName: "testdb", + includedSchemas: ["public", "custom"] + }; + + map actualProperties = {}; + populateDatabaseConfigurations(connection, actualProperties); + + test:assertEquals(actualProperties["schema.include.list"], + expectedProperties["schema.include.list"], + msg = "Schema include list does not match."); +} + +@test:Config {groups: ["postgres-relational"]} +function testPostgresRelationalFilteringConfiguration() { + map expectedProperties = { + "schema.include.list": "public,custom", + "table.include.list": "public.users,public.orders", + "column.exclude.list": "public.*.password,public.*.ssn", + "message.key.columns": "public.users:id;public.orders:order_id" + }; + + PostgresDatabaseConnection connection = { + username: "testuser", + password: "testpass", + databaseName: "testdb", + includedSchemas: ["public", "custom"], + includedTables: ["public.users", "public.orders"], + excludedColumns: ["public.*.password", "public.*.ssn"], + 
messageKeyColumns: [ + {tableName: "public.users", columns: ["id"]}, + {tableName: "public.orders", columns: ["order_id"]} + ] + }; + + map actualProperties = {}; + populateDatabaseConfigurations(connection, actualProperties); + + test:assertEquals(actualProperties["schema.include.list"], + expectedProperties["schema.include.list"], + msg = "Schema include list does not match."); + test:assertEquals(actualProperties["table.include.list"], + expectedProperties["table.include.list"], + msg = "Table include list does not match."); + test:assertEquals(actualProperties["column.exclude.list"], + expectedProperties["column.exclude.list"], + msg = "Column exclude list does not match."); + test:assertEquals(actualProperties["message.key.columns"], + expectedProperties["message.key.columns"], + msg = "Message key columns does not match."); +} + +@test:Config {groups: ["postgres-snapshot"]} +function testPostgresExtendedSnapshotConfiguration() { + map expectedProperties = { + "snapshot.lock.timeout.ms": "20000" + }; + + PostgreSqlOptions options = { + extendedSnapshot: { + lockTimeout: 20 + } + }; + + map actualProperties = {}; + populateOptions(options, actualProperties); + + test:assertEquals(actualProperties["snapshot.lock.timeout.ms"], + expectedProperties["snapshot.lock.timeout.ms"], + msg = "Snapshot lock timeout does not match."); +} + +@test:Config {groups: ["postgres-options"]} +function testPostgresOptionsWithHeartbeat() { + map expectedProperties = { + "heartbeat.interval.ms": "15000", + "heartbeat.action.query": "SELECT NOW()" + }; + + PostgreSqlOptions options = { + heartbeatConfig: { + interval: 15, + actionQuery: "SELECT NOW()" + } + }; + + map actualProperties = {}; + cdc:populateOptions(options, actualProperties); + populateOptions(options, actualProperties); + + test:assertEquals(actualProperties["heartbeat.interval.ms"], + expectedProperties["heartbeat.interval.ms"], + msg = "Heartbeat interval does not match."); + 
test:assertEquals(actualProperties["heartbeat.action.query"], + expectedProperties["heartbeat.action.query"], + msg = "Heartbeat action query does not match."); +} diff --git a/ballerina/tests/test_cleanup.bal b/ballerina/tests/test_cleanup.bal new file mode 100644 index 000000000..28bbce329 --- /dev/null +++ b/ballerina/tests/test_cleanup.bal @@ -0,0 +1,11 @@ +import ballerina/file; +import ballerina/test; + +@test:AfterSuite +function cleanup() returns error? { + // delete the tmp directory created for tests + string tmpDirPath = "./tmp"; + if check file:test(tmpDirPath, file:EXISTS) { + check file:remove(tmpDirPath, file:RECURSIVE); + } +} diff --git a/ballerina/utils.bal b/ballerina/utils.bal index f84ab9614..a8f8e4951 100644 --- a/ballerina/utils.bal +++ b/ballerina/utils.bal @@ -13,6 +13,8 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +import ballerinax/cdc; + const string SCHEMA_INCLUDE_LIST = "schema.include.list"; const string SCHEMA_EXCLUDE_LIST = "schema.exclude.list"; @@ -21,25 +23,146 @@ const string POSTGRESQL_PLUGIN_NAME = "plugin.name"; const string POSTGRESQL_SLOT_NAME = "slot.name"; const string POSTGRESQL_PUBLICATION_NAME = "publication.name"; +// PostgreSQL-specific advanced configuration properties +const string SLOT_DROP_ON_STOP = "slot.drop.on.stop"; +const string PUBLICATION_AUTOCREATE_MODE = "publication.autocreate.mode"; +const string STATUS_UPDATE_INTERVAL_MS = "status.update.interval.ms"; +const string XMIN_FETCH_INTERVAL_MS = "xmin.fetch.interval.ms"; +const string LSN_FLUSH_MODE = "lsn.flush.mode"; +const string SLOT_STREAM_PARAMS = "slot.stream.params"; + +// Extended snapshot configuration properties +const string SNAPSHOT_LOCK_TIMEOUT_MS = "snapshot.lock.timeout.ms"; +const string SNAPSHOT_ISOLATION_MODE = "snapshot.isolation.mode"; + +// Populates PostgreSQL-specific relational filtering (table/column inclusion/exclusion and message 
key columns) +isolated function populateTableAndColumnFiltering(PostgresDatabaseConnection connection, map configMap) { + // Call CDC utility functions with direct parameters + cdc:populateTableAndColumnConfigurations( + connection.includedTables, + connection.excludedTables, + connection.includedColumns, + connection.excludedColumns, + configMap + ); + + cdc:populateMessageKeyColumnsConfiguration(connection.messageKeyColumns, configMap); +} + +isolated function populateDebeziumProperties(PostgresListenerConfiguration config, map debeziumConfigs) { + cdc:populateDebeziumProperties({ + engineName: config.engineName, + offsetStorage: config.offsetStorage, + internalSchemaStorage: config.internalSchemaStorage, + database: config.database, + options: config.options + }, debeziumConfigs); + populateDatabaseConfigurations(config.database, debeziumConfigs); + populateOptions(config.options, debeziumConfigs); +} + // Populates PostgreSQL-specific configurations -isolated function populatePostgresConfigurations(PostgresDatabaseConnection connection, map configMap) { - configMap[POSTGRESQL_DATABASE_NAME] = connection.databaseName; - populateSchemaConfigurations(connection, configMap); - configMap[POSTGRESQL_PLUGIN_NAME] = connection.pluginName; - configMap[POSTGRESQL_SLOT_NAME] = connection.slotName; - configMap[POSTGRESQL_PUBLICATION_NAME] = connection.publicationName; +isolated function populateDatabaseConfigurations(PostgresDatabaseConnection database, map debeziumConfigs) { + debeziumConfigs[POSTGRESQL_DATABASE_NAME] = database.databaseName; + + // Populate PostgreSQL-specific relational filtering + populateTableAndColumnFiltering(database, debeziumConfigs); + + populateSchemaConfigurations(database, debeziumConfigs); + + // Replication configuration: nested record takes priority over deprecated top-level fields + ReplicationConfiguration? 
replicationConfig = database.replicationConfig; + if replicationConfig is ReplicationConfiguration { + debeziumConfigs[POSTGRESQL_PLUGIN_NAME] = replicationConfig.pluginName; + debeziumConfigs[POSTGRESQL_SLOT_NAME] = replicationConfig.slotName; + debeziumConfigs[SLOT_DROP_ON_STOP] = replicationConfig.slotDropOnStop.toString(); + string? slotStreamParams = replicationConfig.slotStreamParams; + if slotStreamParams !is () { + debeziumConfigs[SLOT_STREAM_PARAMS] = slotStreamParams; + } + } else { + // Fall back to deprecated top-level fields + debeziumConfigs[POSTGRESQL_PLUGIN_NAME] = database.pluginName; + debeziumConfigs[POSTGRESQL_SLOT_NAME] = database.slotName; + debeziumConfigs[SLOT_DROP_ON_STOP] = false.toString(); + } + + // Publication configuration: nested record takes priority over deprecated top-level field + PublicationConfiguration? publicationConfig = database.publicationConfig; + if publicationConfig is PublicationConfiguration { + debeziumConfigs[POSTGRESQL_PUBLICATION_NAME] = publicationConfig.publicationName; + debeziumConfigs[PUBLICATION_AUTOCREATE_MODE] = publicationConfig.publicationAutocreateMode.toString(); + } else { + // Fall back to deprecated top-level field + debeziumConfigs[POSTGRESQL_PUBLICATION_NAME] = database.publicationName; + debeziumConfigs[PUBLICATION_AUTOCREATE_MODE] = ALL_TABLES; + } + + // Streaming configuration + StreamingConfiguration? streamingConfig = database.streamingConfig; + if streamingConfig is StreamingConfiguration { + populateStreamingConfiguration(streamingConfig, debeziumConfigs); + } } // Populates schema inclusion/exclusion configurations -isolated function populateSchemaConfigurations(PostgresDatabaseConnection connection, map configMap) { +isolated function populateSchemaConfigurations(PostgresDatabaseConnection connection, map debeziumConfigs) { string|string[]? includedSchemas = connection.includedSchemas; if includedSchemas !is () { - configMap[SCHEMA_INCLUDE_LIST] = includedSchemas is string ? 
includedSchemas : string:'join(",", ...includedSchemas); + debeziumConfigs[SCHEMA_INCLUDE_LIST] = includedSchemas is string ? includedSchemas : string:'join(",", ...includedSchemas); } string|string[]? excludedSchemas = connection.excludedSchemas; if excludedSchemas !is () { - configMap[SCHEMA_EXCLUDE_LIST] = excludedSchemas is string ? excludedSchemas : string:'join(",", ...excludedSchemas); + debeziumConfigs[SCHEMA_EXCLUDE_LIST] = excludedSchemas is string ? excludedSchemas : string:'join(",", ...excludedSchemas); } } + +isolated function populateStreamingConfiguration(StreamingConfiguration config, map debeziumConfigs) { + debeziumConfigs[STATUS_UPDATE_INTERVAL_MS] = getMillisecondValueOf(config.statusUpdateInterval); + debeziumConfigs[XMIN_FETCH_INTERVAL_MS] = getMillisecondValueOf(config.xminFetchInterval); + LsnFlushMode? lsnFlushMode = config.lsnFlushMode; + if lsnFlushMode !is () { + debeziumConfigs[LSN_FLUSH_MODE] = lsnFlushMode.toString(); + } +} + +// Populates PostgreSQL-specific options +isolated function populateOptions(PostgreSqlOptions options, map debeziumConfigs) { + // Populate PostgreSQL-specific extended snapshot configuration + ExtendedSnapshotConfiguration? extendedSnapshot = options.extendedSnapshot; + if extendedSnapshot is ExtendedSnapshotConfiguration { + populateExtendedSnapshotConfiguration(extendedSnapshot, debeziumConfigs); + } + + // PostgreSQL uses generic cdc:DataTypeConfiguration (no PostgreSQL-specific extensions) + cdc:DataTypeConfiguration? dataTypeConfig = options.dataTypeConfig; + if dataTypeConfig is cdc:DataTypeConfiguration { + cdc:populateDataTypeConfiguration(dataTypeConfig, debeziumConfigs); + } + + // Populate relational heartbeat configuration + cdc:RelationalHeartbeatConfiguration? 
heartbeatConfig = options.heartbeatConfig; + if heartbeatConfig is cdc:RelationalHeartbeatConfiguration { + cdc:populateRelationalHeartbeatConfiguration(heartbeatConfig, debeziumConfigs); + } + + // Populate additional DB-specific options not present in base Options + cdc:populateAdditionalConfigurations(options, debeziumConfigs, typeof options); +} + +// Populates PostgreSQL-specific extended snapshot properties +isolated function populateExtendedSnapshotConfiguration(ExtendedSnapshotConfiguration config, map debeziumConfigs) { + cdc:populateRelationalExtendedSnapshotConfiguration(config, debeziumConfigs); + debeziumConfigs[SNAPSHOT_LOCK_TIMEOUT_MS] = getMillisecondValueOf(config.lockTimeout); + cdc:SnapshotIsolationMode? isolationMode = config.isolationMode; + if isolationMode is cdc:SnapshotIsolationMode { + debeziumConfigs[SNAPSHOT_ISOLATION_MODE] = isolationMode; + } +} + +isolated function getMillisecondValueOf(decimal value) returns string { + string milliSecondVal = (value * 1000).toString(); // toString(), not toBalString(): the latter appends a "d" suffix to decimals, which leaks into whole-number results + return milliSecondVal.substring(0, milliSecondVal.indexOf(".") ?: milliSecondVal.length()); // keep only the integer part (whole milliseconds) +} diff --git a/build.gradle b/build.gradle index 7a74cf5a4..8fca778b6 100644 --- a/build.gradle +++ b/build.gradle @@ -99,13 +99,9 @@ subprojects { ballerinaStdLibs "io.ballerina.stdlib:observe-ballerina:${observeVersion}" ballerinaStdLibs "io.ballerina:observe-ballerina:${observeInternalVersion}" ballerinaStdLibs "io.ballerina.stdlib:postgresql.driver-ballerina:${stdlibPostgresqlDriverVersion}" - ballerinaStdLibs "io.ballerina.lib:avro-ballerina:${stdlibAvroVersion}" ballerinaStdLibs "io.ballerina.lib:cdc-ballerina:${stdlibCdcVersion}" ballerinaStdLibs "io.ballerina.lib:postgresql.cdc.driver-ballerina:${stdlibPostgresCdcDriverVersion}" - ballerinaStdLibs "io.ballerina.stdlib:kafka-ballerina:${stdlibKafkaVersion}" - ballerinaStdLibs "io.ballerina.lib:confluent.cavroserdes-ballerina:${stdlibConfluentAvroSerDesVersion}" - ballerinaStdLibs 
"io.ballerina.lib:confluent.cregistry-ballerina:${stdlibConfluentSchemaRegistryVersion}" } } diff --git a/changelog.md b/changelog.md index 60cb52f13..86d8f5115 100644 --- a/changelog.md +++ b/changelog.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- [Introduce additional Debezium properties](https://github.com/ballerina-platform/ballerina-library/issues/8572) + ## [1.16.1] - 2024-06-13 ### Changed diff --git a/gradle.properties b/gradle.properties index 07710a5fa..606ec843a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ group=io.ballerina.stdlib -version=1.16.5-SNAPSHOT +version=1.17.0-SNAPSHOT ballerinaLangVersion=2201.12.0 checkstylePluginVersion=10.12.1 @@ -39,7 +39,6 @@ stdlibUrlVersion=2.6.0 stdlibConstraintVersion=1.7.0 stdlibCryptoVersion=2.9.3 stdlibTaskVersion=2.7.0 -stdlibAvroVersion=1.2.0 # Level 03 stdlibCacheVersion=3.10.0 @@ -60,8 +59,5 @@ stdlibTransactionVersion=1.12.0 # Ballerina extended library stdlibPostgresqlDriverVersion=1.6.1 -stdlibCdcVersion=1.2.0 +stdlibCdcVersion=1.3.0-20260319-091700-6e7e4b6 stdlibPostgresCdcDriverVersion=1.0.0 -stdlibConfluentAvroSerDesVersion=1.0.2 -stdlibConfluentSchemaRegistryVersion=0.4.3 -stdlibKafkaVersion=4.6.3