Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "postgresql"
version = "1.16.4"
version = "1.17.0"
authors = ["Ballerina"]
keywords = ["client", "network", "SQL", "RDBMS", "Vendor/PostgreSQL", "Area/Database", "Type/Connector", "Type/Trigger"]
repository = "https://github.com/ballerina-platform/module-ballerinax-postgresql"
Expand All @@ -15,8 +15,8 @@ graalvmCompatible = true
[[platform.java21.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "postgresql-native"
version = "1.16.4"
path = "../native/build/libs/postgresql-native-1.16.4.jar"
version = "1.17.0"
path = "../native/build/libs/postgresql-native-1.17.0-SNAPSHOT.jar"

[[platform.java21.dependency]]
groupId = "io.ballerina.stdlib"
Expand Down
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ id = "postgresql-compiler-plugin"
class = "io.ballerina.stdlib.postgresql.compiler.PostgreSQLCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/postgresql-compiler-plugin-1.16.4.jar"
path = "../compiler-plugin/build/libs/postgresql-compiler-plugin-1.17.0-SNAPSHOT.jar"
72 changes: 5 additions & 67 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,6 @@
dependencies-toml-version = "2"
distribution-version = "2201.12.0"

[[package]]
org = "ballerina"
name = "avro"
version = "1.2.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "crypto"
Expand Down Expand Up @@ -75,6 +67,7 @@ modules = [
org = "ballerina"
name = "lang.__internal"
version = "0.0.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.object"}
Expand All @@ -99,16 +92,6 @@ dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "lang.int"
version = "0.0.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.__internal"},
{org = "ballerina", name = "lang.object"}
]

[[package]]
org = "ballerina"
name = "lang.object"
Expand Down Expand Up @@ -224,17 +207,6 @@ modules = [
{org = "ballerina", packageName = "time", moduleName = "time"}
]

[[package]]
org = "ballerina"
name = "uuid"
version = "1.10.0"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.int"},
{org = "ballerina", name = "time"}
]

[[package]]
org = "ballerinai"
name = "observe"
Expand All @@ -247,57 +219,23 @@ dependencies = [
[[package]]
org = "ballerinax"
name = "cdc"
version = "1.2.0"
version = "1.3.0"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "data.jsondata"},
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "log"},
{org = "ballerinai", name = "observe"},
{org = "ballerinax", name = "kafka"}
{org = "ballerinai", name = "observe"}
]
modules = [
{org = "ballerinax", packageName = "cdc", moduleName = "cdc"}
]

[[package]]
org = "ballerinax"
name = "confluent.cavroserdes"
version = "1.0.2"
dependencies = [
{org = "ballerina", name = "avro"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerinai", name = "observe"},
{org = "ballerinax", name = "confluent.cregistry"}
]

[[package]]
org = "ballerinax"
name = "confluent.cregistry"
version = "0.4.3"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerinax"
name = "kafka"
version = "4.6.3"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "time"},
{org = "ballerina", name = "uuid"},
{org = "ballerinai", name = "observe"},
{org = "ballerinax", name = "confluent.cavroserdes"},
{org = "ballerinax", name = "confluent.cregistry"}
]

[[package]]
org = "ballerinax"
name = "postgresql"
version = "1.16.4"
version = "1.17.0"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "file"},
Expand Down Expand Up @@ -331,7 +269,7 @@ modules = [
[[package]]
org = "ballerinax"
name = "postgresql.driver"
version = "1.6.1"
version = "1.6.2"
scope = "testOnly"
modules = [
{org = "ballerinax", packageName = "postgresql.driver", moduleName = "postgresql.driver"}
Expand Down
33 changes: 32 additions & 1 deletion ballerina/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,41 @@ You can create a CDC listener by specifying the required configurations such as
```ballerina
listener postgresql:CdcListener cdcListener = new (database = {
username: <username>,
password: <password>
password: <password>,
databaseName: "inventory"
});
```

#### Configure the database connection

The `database` parameter accepts a `PostgresDatabaseConnection` record with PostgreSQL-specific fields for logical replication, publication, and filtering:

```ballerina
listener postgresql:CdcListener cdcListener = new (database = {
username: "cdc_user",
password: "password",
databaseName: "inventory",
includedSchemas: ["public"],
includedTables: ["public.products", "public.orders"],
pluginName: postgresql:PGOUTPUT,
slotName: "my_slot",
publicationName: "my_publication",
publicationAutocreateMode: postgresql:FILTERED
}, options = {
heartbeatConfig: {
interval: 10
},
guardrailConfig: {
maxCollections: 100,
limitAction: cdc:WARN
}
});
```

#### Advanced options

The `options` parameter (type `PostgreSqlOptions`) exposes all fields from `cdc:Options`, including `heartbeatConfig`, `signalConfig`, `transactionMetadataConfig`, `columnTransformConfig`, `topicConfig`, `connectionRetryConfig`, `performanceConfig`. PostgreSQL-specific extensions include `extendedSnapshot` (for lock timeout) and `dataTypeConfig` (for binary and time precision handling). Refer to [`cdc:Options`](https://docs.central.ballerina.io/ballerinax/cdc/latest#Options) for the full list of available options. Because `PostgreSqlOptions` is an open record, raw Debezium properties can also be passed directly as additional fields.

#### Implement a service to handle CDC events

You can attach a service to the listener to handle CDC events. The service can define remote methods for different event types such as `onRead`, `onCreate`, `onUpdate`, and `onDelete`.
Expand Down
37 changes: 8 additions & 29 deletions ballerina/cdc_listener.bal
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import ballerinax/cdc;
public isolated class CdcListener {
*cdc:Listener;

private final map<anydata> & readonly config;
private final map<string> & readonly debeziumConfigs;
private final map<anydata> & readonly listenerConfigs;
private boolean isStarted = false;
private boolean hasAttachedService = false;

Expand All @@ -28,33 +29,11 @@ public isolated class CdcListener {
# + config - The configuration for the Postgresql connector
public isolated function init(*PostgresListenerConfiguration config) {
map<string> debeziumConfigs = {};
cdc:populateDebeziumProperties({
engineName: config.engineName,
offsetStorage: config.offsetStorage,
internalSchemaStorage: config.internalSchemaStorage,
options: config.options
}, debeziumConfigs
);
cdc:populateDatabaseConfigurations({
connectorClass: config.database.connectorClass,
hostname: config.database.hostname,
port: config.database.port,
username: config.database.username,
password: config.database.password,
connectTimeout: config.database.connectTimeout,
tasksMax: config.database.tasksMax,
secure: config.database.secure,
includedTables: config.database.includedTables,
excludedTables: config.database.excludedTables,
includedColumns: config.database.includedColumns,
excludedColumns: config.database.excludedColumns
}, debeziumConfigs);
populatePostgresConfigurations(config.database, debeziumConfigs);
map<anydata> listenerConfigs = {
...debeziumConfigs
};
listenerConfigs["livenessInterval"] = config.livenessInterval;
self.config = listenerConfigs.cloneReadOnly();
map<anydata> listenerConfigs = {};
populateDebeziumProperties(config, debeziumConfigs);
cdc:populateListenerProperties(config, listenerConfigs);
self.debeziumConfigs = debeziumConfigs.cloneReadOnly();
self.listenerConfigs = listenerConfigs.cloneReadOnly();
}

# Attaches a CDC service to the Postgresql listener.
Expand All @@ -70,7 +49,7 @@ public isolated class CdcListener {
#
# + return - An `cdc:Error` if the listener cannot be started, or `()` if successful
public isolated function 'start() returns cdc:Error? {
check cdc:externStart(self, self.config);
check cdc:externStartWithExtendedConfigs(self, self.debeziumConfigs, self.listenerConfigs);
}

# Detaches a CDC service from the Postgresql listener.
Expand Down
105 changes: 99 additions & 6 deletions ballerina/listener_types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerinax/cdc;

# Represents the PostgreSQL logical decoding plugins.
Expand All @@ -24,18 +25,72 @@ public enum PostgreSQLLogicalDecodingPlugin {
DECODERBUFS = "decoderbufs"
}

# The configuration for the PostgreSQL CDC database connection.
# Represents publication autocreate modes.
public enum PublicationAutocreateMode {
ALL_TABLES = "all_tables",
DISABLED = "disabled",
FILTERED = "filtered"
}
Comment on lines +28 to +33
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

curl -fsSL https://debezium.io/documentation/reference/stable/connectors/postgresql.html |
  sed -n '/publication.autocreate.mode/,/replica.identity.autoset.values/p' |
  rg -n '`(all_tables|disabled|filtered|no_tables)`'

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 73


🏁 Script executed:

# Check the actual file to see current enum members
cat -n ballerina/listener_types.bal | head -40

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 1719


🏁 Script executed:

# Search for dependency declarations (look for build files, gradle, maven, etc.)
fd -e 'gradle|xml|toml|yaml|yml|lock' -type f | head -20

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 261


🏁 Script executed:

# Try alternative approach to check Debezium docs - simpler curl without piping to external tools
curl -fsSL 'https://debezium.io/documentation/reference/stable/connectors/postgresql.html' 2>&1 | grep -i 'publication.autocreate.mode' | head -5

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 1680


🏁 Script executed:

# Search for dependency/version files in the repository
fd -type f \( -name "*.gradle" -o -name "pom.xml" -o -name "Ballerina.toml" -o -name "*.lock" \)

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 261


🌐 Web query:

Debezium PostgreSQL 3.0.8 publication.autocreate.mode no_tables

💡 Result:

In Debezium PostgreSQL connector version 3.0.8, the publication.autocreate.mode configuration property controls how the connector handles creation of the PostgreSQL publication used for logical replication with the pgoutput plugin. The no_tables mode creates (or uses an existing) publication with no tables assigned to it initially. The user must then manually add tables to the publication using SQL commands like ALTER PUBLICATION mypub ADD TABLE myschema.mytable;. This is useful when the connector lacks privileges to auto-add tables or when fine-grained control over publication tables is needed. This option was added via DBZ-8156 and documented in a merged pull request in August 2024, present in Debezium 3.0.8 (released March 2025). Other common modes include: - disabled: No auto-creation; publication must pre-exist. - all_tables: Creates publication for all tables (default in older versions). - filtered: Creates publication based on table.include.list/table.exclude.list filters. Configure as: "publication.autocreate.mode": "no_tables"

Citations:


🏁 Script executed:

# Find dependency files using simpler approach
find . -maxdepth 3 -type f \( -name "Ballerina.toml" -o -name "gradle.build" -o -name "pom.xml" -o -name "*.gradle" \) 2>/dev/null

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 688


🏁 Script executed:

# Search for "debezium" references in the repository
rg -i "debezium|publication.autocreate" --type-list | head -5 && rg -i "debezium|publication" -A 2 -B 2 | head -40

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 2881


🏁 Script executed:

# Check main build.gradle for dependencies
cat build.gradle | grep -i debezium -A 2 -B 2

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 73


🏁 Script executed:

# Check native build.gradle
cat native/build.gradle | grep -i debezium -A 2 -B 2

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 73


🏁 Script executed:

# Read the native/build.gradle file to see dependencies
cat native/build.gradle | head -80

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 2803


🏁 Script executed:

# Broader search for debezium anywhere in the repo
rg 'debezium' -i --max-count 20

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 3615


🏁 Script executed:

# Check gradle.properties for version variables
cat gradle.properties | grep -i 'cdc\|debezium' -A 1 -B 1

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 186


🏁 Script executed:

# Check ballerina/Ballerina.toml for dependencies
cat ballerina/Ballerina.toml | head -40

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 830


🏁 Script executed:

# Check what version of cdc driver is being used and search for its Debezium version
# Look at settings.gradle to understand dependency resolution
cat settings.gradle | grep -i cdc -A 3 -B 3

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 73


🌐 Web query:

ballerinax postgresql.cdc.driver 1.0.0 Debezium version

💡 Result:

The ballerinax/postgresql.cdc.driver version 1.0.0 bundles Debezium Postgres Driver version 3.0.8.Final.

Citations:


🏁 Script executed:

# Check if there are any other references to the actual Debezium version in this repo
rg 'debezium.*3\.0' -i

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 73


Add the missing NO_TABLES enum member to PublicationAutocreateMode.

The currently published ballerinax/postgresql.cdc.driver 1.0.0 bundles Debezium Postgres Driver 3.0.8.Final, which supports publication.autocreate.mode=no_tables. Omitting it from the enum blocks users from configuring this valid mode through the public API.

Fix
 public enum PublicationAutocreateMode {
     ALL_TABLES = "all_tables",
     DISABLED = "disabled",
-    FILTERED = "filtered"
+    FILTERED = "filtered",
+    NO_TABLES = "no_tables"
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Represents publication autocreate modes.
public enum PublicationAutocreateMode {
ALL_TABLES = "all_tables",
DISABLED = "disabled",
FILTERED = "filtered"
}
# Represents publication autocreate modes.
public enum PublicationAutocreateMode {
ALL_TABLES = "all_tables",
DISABLED = "disabled",
FILTERED = "filtered",
NO_TABLES = "no_tables"
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ballerina/listener_types.bal` around lines 28 - 33, Add the missing enum
member to PublicationAutocreateMode so callers can use the Debezium-supported
mode "no_tables": add a new enum constant named NO_TABLES with the string value
"no_tables" to the public enum PublicationAutocreateMode (alongside ALL_TABLES,
DISABLED, FILTERED) so the public API exposes that valid mode.


# Represents LSN flush modes.
public enum LsnFlushMode {
MANUAL = "manual",
CONNECTOR = "connector",
CONNECTOR_AND_DRIVER = "connector_and_driver"
}
Comment on lines +35 to +40
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== Candidate manifest files =="
fd '^(Ballerina\.toml|Dependencies\.toml|Package\.md|README\.md)$' -x sh -c '
  printf "\n--- %s ---\n" "$1"
  sed -n "1,220p" "$1"
' sh {}

echo
echo "== Debezium / CDC driver references =="
rg -n 'postgresql\.cdc\.driver|debezium-connector-postgres|Debezium Postgres Driver|lsn\.flush\.mode|connector_and_driver' .

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 29179


🏁 Script executed:

# Check the full postgresql.cdc.driver version in Dependencies.toml
cat ballerina/Dependencies.toml | grep -A 10 'name = "postgresql.cdc.driver"'

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 583


🏁 Script executed:

# Check build.gradle for CDC driver version
grep -n 'stdlibPostgresCdcDriverVersion\|postgresql.*cdc.*driver' build.gradle | head -20

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 204


🏁 Script executed:

# Check if there's a gradle.properties or similar with version definitions
fd '(gradle\.properties|version\.txt|versions\.gradle)$' -x cat {}

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 1257


🏁 Script executed:

# Search for any Debezium version references in the native build or pom files
fd '(pom\.xml|build\.gradle)' -x grep -l 'debezium\|cdc.*driver' {} 2>/dev/null | head -5

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 105


🏁 Script executed:

# Find and examine native build files for Debezium/CDC driver dependencies
find ./native -name 'pom.xml' -o -name 'build.gradle' 2>/dev/null | head -5

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 112


🏁 Script executed:

# Check for any native module files that might reference Debezium version
fd 'pom\.xml' ./native -x grep -l 'debezium' {} 2>/dev/null

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 73


🏁 Script executed:

# Search for any version or release documentation
fd '(CHANGELOG|RELEASE|NEWS|VERSION)' -i | grep -i 'cdc\|debezium' | head -10

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 73


🏁 Script executed:

# Check native build gradle files for CDC driver or Debezium references
find ./native -name '*.gradle' -x grep -n 'debezium\|cdc.*driver\|postgres.*connector' {} 2>/dev/null

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 73


🏁 Script executed:

# Examine native/build.gradle for CDC driver dependencies
cat ./native/build.gradle | grep -A 5 -B 5 'cdc\|debezium\|postgres' -i

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 934


🏁 Script executed:

# Check if there's a separate CDC driver repository or if it's defined in build
rg 'postgresql\.cdc\.driver|Debezium' ballerina/Dependencies.toml -A 5 -B 2

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 705


🏁 Script executed:

# Search for any gradle property or variable that tracks CDC driver bundled Debezium version
rg 'debezium.*version|cdc.*version' -i gradle.properties build.gradle

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 400


🏁 Script executed:

# Check if this PR changes any CDC driver version by looking for gradle version assignments
grep -r 'stdlibPostgresCdcDriverVersion\|postgresql.cdc.driver' gradle.properties build.gradle native/build.gradle 2>/dev/null

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 268


🏁 Script executed:

# Search for any Debezium or version configuration in tests
rg 'debezium|3\.0\.|3\.4\.' -i ballerina/tests listener_tests.bal | head -20

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 149


🏁 Script executed:

# Check README or documentation for Debezium version requirements
cat ballerina/README.md | grep -A 5 -B 5 'debezium\|cdc.*driver\|3\.0\|3\.4' -i

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 999


🏁 Script executed:

# Search for any version constraints or compatibility notes
rg 'version.*3\.' ballerina/ -A 2 -B 2

Repository: ballerina-platform/module-ballerinax-postgresql

Length of output: 336


🌐 Web query:

ballerinax/postgresql.cdc.driver version 1.0.0 Debezium

💡 Result:

ballerinax/postgresql.cdc.driver version 1.0.0 is a Ballerina library that provides the Debezium PostgreSQL connector dependencies for Change Data Capture (CDC) functionality in Ballerina applications. It enables seamless listening to changes (inserts, updates, deletes) in PostgreSQL databases. Key details: - Released on 2025-05-29 (latest and only release as of 2026-03-19). - Compatible with Ballerina Language version 2201.12.0. - Bundles Debezium Postgres Driver version 3.0.8.Final. - Usage: Import alongside ballerinax/cdc and ballerinax/postgresql modules: import ballerinax/cdc; import ballerinax/postgresql; import ballerinax/postgresql.cdc.driver as _; Then configure a postgresql:CdcListener to capture events with remote functions like onCreate, onUpdate, onDelete. It is a support module for Ballerina's CDC framework, which is built on Debezium. No additional driver configuration needed as it's pre-bundled. Available on Ballerina Central: https://central.ballerina.io/ballerinax/postgresql.cdc.driver/latest. Source: https://github.com/ballerina-platform/module-ballerinax-postgresql.cdc.driver.

Citations:


Remove CONNECTOR_AND_DRIVER from the public API or upgrade the bundled CDC driver to support Debezium 3.4+.

The enum value CONNECTOR_AND_DRIVER requires Debezium Postgres Driver 3.4 or later to support the lsn.flush.mode property. The bundled ballerinax/postgresql.cdc.driver:1.0.0 currently provides only Debezium 3.0.8.Final, which does not recognize this configuration value. Code using this enum will compile but fail at runtime when Debezium attempts to parse it. This PR does not upgrade the CDC driver version, so either the enum should be removed from the public API for now, or the CDC driver dependency must be upgraded to a version supporting Debezium 3.4+.

Also applies to: lines 69-72

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ballerina/listener_types.bal` around lines 35 - 40, The public enum
LsnFlushMode exposes a value CONNECTOR_AND_DRIVER that requires Debezium 3.4+
but the bundled CDC driver (ballerinax/postgresql.cdc.driver:1.0.0) contains
Debezium 3.0.8 and will fail at runtime; fix by either removing the
CONNECTOR_AND_DRIVER enum member from LsnFlushMode (and any other references to
it) so the public API only exposes supported values (MANUAL, CONNECTOR) and
update any docs/tests that reference it, or instead update the CDC driver
dependency to a version that bundles Debezium 3.4+ and adjust build/dependency
metadata accordingly; target symbols: enum LsnFlushMode and the
CONNECTOR_AND_DRIVER variant (also remove/update the duplicate occurrence
referenced at the later LsnFlushMode definition).


# PostgreSQL replication configuration (logical decoding).
#
# + pluginName - Logical decoding plugin to use (pgoutput, decoderbufs)
# + slotName - Name of the PostgreSQL logical replication slot
# + slotDropOnStop - Drop replication slot when connector stops
# + slotStreamParams - Custom replication slot parameters
public type ReplicationConfiguration record {|
PostgreSQLLogicalDecodingPlugin pluginName = PGOUTPUT;
string slotName = "debezium";
boolean slotDropOnStop = false;
string slotStreamParams?;
|};

# PostgreSQL publication configuration (pgoutput plugin).
#
# + publicationName - Name of PostgreSQL publication
# + publicationAutocreateMode - Mode for auto-creating publications
public type PublicationConfiguration record {|
string publicationName = "dbz_publication";
PublicationAutocreateMode publicationAutocreateMode = ALL_TABLES;
|};
Comment on lines +48 to +62
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject mixed deprecated and nested configs, or merge them field-by-field.

ballerina/utils.bal:74-100 switches wholesale to replicationConfig / publicationConfig when those records are present. Because these new records prefill pluginName, slotName, and publicationName, a partial migration such as {pluginName: DECODERBUFS, replicationConfig: {slotDropOnStop: true}} silently resolves to pgoutput / debezium instead of preserving the legacy values. That is a behavior change, not just a deprecation path.

Also applies to: 108-119

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ballerina/listener_types.bal` around lines 48 - 62, The code accepts both
legacy fields and new nested records (ReplicationConfiguration and
PublicationConfiguration) but currently overrides legacy values when nested
records are present; update the logic around replicationConfig and
publicationConfig (in the code that currently switches wholesale) to either
reject mixed usage or merge field-by-field: detect if both legacy fields and the
nested record are provided and return an error, or iterate each field in
ReplicationConfiguration and PublicationConfiguration and copy only
non-nil/non-default values into the final config so legacy values are preserved
when nested records omit fields (reference ReplicationConfiguration,
PublicationConfiguration, replicationConfig, publicationConfig, and the
functions that perform the conversion).


# PostgreSQL streaming and status configuration.
#
# + statusUpdateInterval - Interval for sending status updates to PostgreSQL in seconds
# + xminFetchInterval - Interval for fetching current xmin position in seconds
# + lsnFlushMode - LSN flushing strategy
public type StreamingConfiguration record {|
decimal statusUpdateInterval = 10;
decimal xminFetchInterval = 0;
LsnFlushMode lsnFlushMode?;
|};

# Represents the configuration for the Postgres CDC database connection.
#
# + connectorClass - The class name of the PostgreSQL connector implementation to use
# + hostname - The hostname of the PostgreSQL server
# + port - The port number of the PostgreSQL server
# + databaseName - The name of the PostgreSQL database from which to stream the changes.
# + includedSchemas - A list of regular expressions matching fully-qualified schema identifiers to capture changes from
# + excludedSchemas - A list of regular expressions matching fully-qualified schema identifiers to exclude from change capture
# + includedTables - Regex patterns for tables to capture (mutually exclusive with `excludedTables`)
# + excludedTables - Regex patterns for tables to exclude (mutually exclusive with `includedTables`)
# + includedColumns - Regex patterns for columns to capture (mutually exclusive with `excludedColumns`)
# + excludedColumns - Regex patterns for columns to exclude (mutually exclusive with `includedColumns`)
# + messageKeyColumns - Composite message key columns for change events
# + tasksMax - The PostgreSQL connector always uses a single task and therefore does not use this value, so the default is always acceptable
# + pluginName - The name of the PostgreSQL logical decoding plug-in installed on the server
# + slotName - The name of the PostgreSQL logical decoding slot
# + publicationName - The name of the PostgreSQL publication created for streaming changes when using pgoutput.
# + pluginName - Deprecated: Use `replicationConfig.pluginName` instead
# + slotName - Deprecated: Use `replicationConfig.slotName` instead
# + publicationName - Deprecated: Use `publicationConfig.publicationName` instead
# + replicationConfig - Replication configuration (logical decoding plugin, slot name and parameters). Takes priority over deprecated top-level fields
# + publicationConfig - Publication configuration (publication name and autocreate mode). Takes priority over deprecated top-level fields
public type PostgresDatabaseConnection record {|
Comment on lines +92 to 94
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Document streamingConfig in the public record docs.

Line 119 adds a new public field, but the PostgresDatabaseConnection doc block never lists or describes streamingConfig. That leaves the generated API docs incomplete for one of the new entry points.

Also applies to: 117-119

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ballerina/listener_types.bal` around lines 92 - 94, The
PostgresDatabaseConnection public record's doc block is missing documentation
for the new streamingConfig field; update the PostgresDatabaseConnection
documentation to add a description for streamingConfig (what it configures,
expected type/shape, and any relation/priority with
replicationConfig/publicationConfig), ensuring the doc mentions the
streamingConfig field name and its purpose so the generated API docs include
this new entry point.

*cdc:DatabaseConnection;
string connectorClass = "io.debezium.connector.postgresql.PostgresConnector";
Expand All @@ -44,16 +99,54 @@ public type PostgresDatabaseConnection record {|
string databaseName;
string|string[] includedSchemas?;
string|string[] excludedSchemas?;
string|string[] includedTables?;
string|string[] excludedTables?;
string|string[] includedColumns?;
string|string[] excludedColumns?;
cdc:MessageKeyColumns[] messageKeyColumns?;
int tasksMax = 1;
# Deprecated: Use `replicationConfig.pluginName` instead.
@deprecated
PostgreSQLLogicalDecodingPlugin pluginName = PGOUTPUT;
# Deprecated: Use `replicationConfig.slotName` instead.
@deprecated
string slotName = "debezium";
# Deprecated: Use `publicationConfig.publicationName` instead.
@deprecated
string publicationName = "dbz_publication";
ReplicationConfiguration replicationConfig?;
PublicationConfiguration publicationConfig?;
StreamingConfiguration streamingConfig?;
|};

# The configuration for the PostgreSQL CDC listener.
# PostgreSQL CDC listener configuration including database connection, storage, and CDC options.
#
# + database - The PostgreSQL database connection configuration
# + database - PostgreSQL database connection, logical decoding, and capture settings
# + options - PostgreSQL-specific CDC options including snapshot, heartbeat, signals, and data type handling
public type PostgresListenerConfiguration record {|
PostgresDatabaseConnection database;
*cdc:ListenerConfiguration;
PostgreSqlOptions options = {};
|};

# PostgreSQL-specific CDC options for configuring snapshot behavior and data type handling.
#
# + extendedSnapshot - Extended snapshot configuration with PostgreSQL-specific lock timeout and query settings
# + dataTypeConfig - Data type handling configuration including schema change tracking
# + heartbeatConfig - Heartbeat configuration for keeping the PostgreSQL replication slot active
public type PostgreSqlOptions record {|
*cdc:Options;
ExtendedSnapshotConfiguration extendedSnapshot?;
cdc:DataTypeConfiguration dataTypeConfig?;
cdc:RelationalHeartbeatConfiguration heartbeatConfig?;
|};

# Represents the extended snapshot configuration for the PostgreSQL CDC listener.
#
# + lockTimeout - Lock acquisition timeout in seconds
# + isolationMode - Transaction isolation level during snapshot
public type ExtendedSnapshotConfiguration record {|
*cdc:RelationalExtendedSnapshotConfiguration;
decimal lockTimeout = 10;
cdc:SnapshotIsolationMode isolationMode?;
|};
Loading
Loading