feat: MongoDB upsert sink, aligned connection API, improved connection UI, and end-to-end masking pipeline tests #66
Conversation
…d example config

- MongoDBConnector.writeData: use bulkWrite with upsert when _id is present, insertMany otherwise
- DataConnection entity: add host field for display/edit UX
- DataConnectionRequest: make connectionString optional (blank = keep existing on update)
- DataConnectionResponse: add host field
- DataConnectionService: extract host from connection string, handle optional connectionString on update
- ConnectionsView.vue: dedicated MongoDB connection string field, isSource/isDestination checkboxes, JDBC URL construction for SQL types
- types/index.ts: align DataConnection and DataConnectionRequest with backend API
- MongoDBConnectorTest: add upsert and mixed-row tests
- docs/mongodb-masking-example.yml: example config file

Agent-Logs-Url: https://github.com/MaximumTrainer/OpenDataMask/sessions/f54acdb5-0ab3-4b16-9fff-a3b88dc7d77f
Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
…, redundant doc access, label clarity)

Agent-Logs-Url: https://github.com/MaximumTrainer/OpenDataMask/sessions/f54acdb5-0ab3-4b16-9fff-a3b88dc7d77f
Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
Pull request overview
Adds MongoDB write/upsert support and introduces role-based connections plus a host display field, updating both backend DTOs/services and the frontend connections UI/types.
Changes:
- Backend: MongoDB `writeData` now upserts documents with `_id` via `bulkWrite` and inserts others via `insertMany`; `DataConnection` now stores a derived `host` string and supports an optional `connectionString` on update.
- Frontend: Connection form/UI updated to handle MongoDB URI input vs SQL host/port inputs, and to support `isSource`/`isDestination` roles; types updated to match the API.
- Tests/Docs: Added MongoDB upsert tests, adjusted the controller test for `host`, and added a MongoDB masking example YAML.
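The upsert-vs-insert routing described above can be sketched as follows. This is an illustrative TypeScript rendering with simplified types, not the actual Kotlin connector API: rows carrying an `_id` become upsert (replace) operations, the rest become plain inserts.

```typescript
// Simplified sketch of the connector's write routing (names are illustrative).
type Row = Record<string, unknown>;

interface WriteOps {
  upserts: Row[]; // would be replaced via bulkWrite with upsert: true
  inserts: Row[]; // would be written via insertMany
}

function routeWrites(rows: Row[]): WriteOps {
  // A row is routed to the upsert path when it carries an _id key.
  const upserts = rows.filter((row) => "_id" in row);
  const inserts = rows.filter((row) => !("_id" in row));
  return { upserts, inserts };
}
```

A masked batch mixing keyed and unkeyed rows would then split into one upsert and one insert, matching the mixed-row test case mentioned in the commit message.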
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| frontend/src/views/ConnectionsView.vue | New UI flow for MongoDB URI vs SQL host/port; adds Source/Destination roles; builds request payloads. |
| frontend/src/types/index.ts | Aligns frontend connection types with backend: connectionString, nullable host, and role flags. |
| docs/mongodb-masking-example.yml | Example workspace config for selective attribute masking with MongoDB source/destination. |
| backend/src/main/kotlin/com/opendatamask/adapter/output/connector/MongoDBConnector.kt | Implements upsert behavior for MongoDB writes using ReplaceOneModel(..., upsert=true). |
| backend/src/main/kotlin/com/opendatamask/application/service/DataConnectionService.kt | Makes connectionString optional on update, derives/stores host, and updates password only when provided. |
| backend/src/main/kotlin/com/opendatamask/domain/model/DataConnection.kt | Adds persisted host field for display/edit UX. |
| backend/src/main/kotlin/com/opendatamask/domain/port/input/dto/DataConnectionDto.kt | Updates request/response DTOs: optional connectionString and new host field in responses. |
| backend/src/test/kotlin/com/opendatamask/adapter/output/connector/MongoDBConnectorTest.kt | Adds tests for insert-only, upsert-only, and mixed write behavior. |
| backend/src/test/kotlin/com/opendatamask/adapter/input/rest/DataConnectionControllerTest.kt | Updates expected response to include host. |
```diff
  val connection = findConnection(workspaceId, connectionId)
  connection.name = request.name
  connection.type = request.type
- connection.connectionString = encryptionPort.encrypt(request.connectionString)
+ if (!request.connectionString.isNullOrBlank()) {
+     connection.connectionString = encryptionPort.encrypt(request.connectionString)
+     connection.host = extractHost(request.type, request.connectionString)
+ }
```
updateConnection allows changing type without providing a new connectionString (since it’s optional). That can leave the old connection string in place while switching connector type, producing a broken/invalid connection. If request.type != connection.type and connectionString is blank, return a 400 (or force the client to send a new connection string when changing type).
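The guard this comment asks for can be sketched as below. The real service is Kotlin; the function and parameter names here are illustrative, and the check is shown standalone rather than wired into `updateConnection`.

```typescript
// Hypothetical sketch of the suggested validation: changing the connector
// type without supplying a fresh connection string is rejected, since the
// stored string would be for the wrong connector.
function validateTypeChange(
  currentType: string,
  requestedType: string,
  newConnectionString: string | null
): void {
  const blank = newConnectionString === null || newConnectionString.trim() === "";
  if (requestedType !== currentType && blank) {
    throw new Error("A new connection string is required when changing the connection type");
  }
}
```

Keeping the same type with a blank string remains allowed, preserving the "blank = keep existing" update semantics.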
```kotlin
ConnectionType.POSTGRESQL, ConnectionType.MYSQL, ConnectionType.AZURE_SQL -> {
    // JDBC URL formats:
    // jdbc:postgresql://host:port/db
    // jdbc:mysql://host:port/db
    // jdbc:sqlserver://host:port;databaseName=db;... (semicolon-delimited params)
```
extractHost treats Azure SQL JDBC URLs the same as Postgres/MySQL, but Azure SQL uses ; separators (e.g. jdbc:sqlserver://host:1433;databaseName=...). The current parsing returns host:1433;databaseName=... rather than just host:1433. Handle ConnectionType.AZURE_SQL separately (e.g. split on ; before ?) so host is clean for display/editing.
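The semicolon handling suggested here can be sketched as follows. The real `extractHost` lives in the Kotlin `DataConnectionService`; this TypeScript version is illustrative only.

```typescript
// Hypothetical host extraction: strip the JDBC scheme, then cut at the first
// of '/', ';', or '?' so semicolon-delimited sqlserver URLs yield a clean
// host:port just like the slash-delimited Postgres/MySQL forms.
function extractJdbcHost(url: string): string {
  const afterScheme = url.includes("://") ? url.split("://")[1] : url;
  return afterScheme.split("/")[0].split(";")[0].split("?")[0];
}
```

With this cut order, `jdbc:sqlserver://host:1433;databaseName=db` and `jdbc:postgresql://host:5432/db?sslmode=require` both reduce to a bare `host:port` for display/editing.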
```typescript
// SQL-specific form fields (used to build the JDBC connection string)
// Shared form fields
const form = ref<DataConnectionRequest & { host: string; port: number; sslEnabled: boolean }>({
  name: '',
  type: ConnectionType.POSTGRESQL,
  connectionString: '',
```
sqlForm is declared but never used. With the current ESLint config this will be reported as an unused variable and can fail CI. Remove sqlForm or wire it into the form state if it’s intended to be used.
```typescript
  }
} else {
  // For SQL types, host, database and username are required
  if (!form.value.host || !form.value.database || !form.value.username) {
    formError.value = 'Host, database, and username are required.'
    return false
```
When editing an existing connection, switching the type to MongoDB allows an empty Connection URI (because editingConnection is set). Submitting then omits connectionString, so the backend keeps the old (non-Mongo) connection string while changing type, resulting in an invalid connection. Require a URI when the type is changed to a Mongo type (or always require URI for Mongo on update if it’s blank).
```typescript
    isDestination: conn.isDestination
  }
  formError.value = ''
  showModal.value = true
}
```
sslEnabled is always reset to false when editing a connection. Since SQL updates always rebuild and send a connection string, this can unintentionally disable SSL (and drop any other existing JDBC options) even if the user didn’t change anything. Consider either persisting/parsing SSL state, or avoiding sending a regenerated connection string unless the user explicitly changed connection details.
```diff
    database: form.value.database || undefined,
    isSource: form.value.isSource,
    isDestination: form.value.isDestination
  }

+ // Only include connectionString if it is non-empty (on update, blank = keep existing)
+ if (connectionString) {
+   payload.connectionString = connectionString
+ }
- if (!editingConnection.value && !form.value.password) {
-   formError.value = 'Password is required.'
-   return
+ if (form.value.password) {
+   payload.password = form.value.password
  }

  saving.value = true
```
For SQL types, buildConnectionString() returns a non-empty string after validation, so payload.connectionString is always set on update. This defeats the "omit/blank = keep existing" behavior and forces overwriting the stored connection string on every edit (potentially dropping custom params). Only include connectionString on update when the user actually changed connection inputs, or provide an explicit "regenerate connection string" action.
```diff
+ val connStr = request.connectionString
+     ?: throw IllegalArgumentException("Connection string is required when creating a connection")
  val connection = DataConnection(
      workspaceId = workspaceId,
      name = request.name,
      type = request.type,
-     connectionString = encryptionPort.encrypt(request.connectionString),
+     connectionString = encryptionPort.encrypt(connStr),
+     host = extractHost(request.type, connStr),
      username = request.username,
```
createConnection only checks connectionString for null, so an empty/blank string will be accepted and persisted, despite the API conceptually requiring a usable connection string. Use isNullOrBlank() (or validation annotations/groups) so create rejects blank values with a 400.
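The blank-rejecting check suggested here can be sketched as below. It mirrors Kotlin's `isNullOrBlank()`; the TypeScript function name is illustrative, not the service's actual API.

```typescript
// Hypothetical create-time validation: reject null, undefined, empty, and
// whitespace-only connection strings, not just null, so a blank value cannot
// be encrypted and persisted as a "usable" connection string.
function requireConnectionString(connectionString: string | null | undefined): string {
  if (connectionString == null || connectionString.trim() === "") {
    throw new Error("Connection string is required when creating a connection");
  }
  return connectionString;
}
```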
@copilot apply changes based on the comments in this thread
…ectionsView

- createConnection: reject blank (not just null) connection strings via isNullOrBlank()
- updateConnection: require new connection string when type changes
- ConnectionsView: track sqlConnectionChanged flag; only rebuild/send connection string for SQL types when host/port/database/ssl actually changed (prevents inadvertent SSL drop)
- ConnectionsView: require MongoDB URI when type is switched to Mongo on edit
- ConnectionsView: remove stale comment, add hint about unchanged connection string
- Tests: add cases for blank connStr, null connStr, type-change-without-connStr, and keep-existing-connStr on update

Agent-Logs-Url: https://github.com/MaximumTrainer/OpenDataMask/sessions/84218738-0407-4580-9bfa-5096657e0e5a
Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
Applied all changes from the review thread in commit.
New tests: blank connection string on create, null connection string on create, type-change-without-connection-string on update, keep-existing connection string on update.
@copilot extend the verification tests to replicate the existing solution to include MongoDB: setting up a test source Mongo collection with example data, reading this, masking & transforming into a target collection.
- InMemoryMongoDBConnector: test-only subclass of MongoDBConnector that stores all documents in-memory (HashMap), faithfully replicating the upsert-vs-insert logic from the production connector
- MongoDBMaskingPipelineTest: 9 end-to-end tests covering:
  1. Basic masking pipeline: NAME/EMAIL/PHONE/SSN masked, non-sensitive fields preserved
  2. Passthrough mode: all rows copied unchanged
  3. Row limit respected when fetching from source
  4. Upsert: masked rows with _id replace existing docs without duplication
  5. Mixed batch: rows with and without _id handled correctly
  6. Truncate: clean target before re-run
  7. CONSISTENT mode: deterministic output for same input
  8. NULL generator: column set to null in target
  9. Empty source collection: zero rows written to target

Agent-Logs-Url: https://github.com/MaximumTrainer/OpenDataMask/sessions/01d7fb21-77e4-4590-a588-f4f2e4a9e9f7
Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
Added in commit.
@copilot extend the mongodb verification test to use a real mongo instance, potentially in docker. remove the hashmap implementation.
…DB instance

- Add testcontainers, testcontainers:mongodb, testcontainers:junit-jupiter dependencies (version 1.19.3, managed by Spring Boot BOM)
- Remove InMemoryMongoDBConnector (HashMap-based test double)
- Rewrite MongoDBMaskingPipelineTest to use a real MongoDBContainer (mongo:7.0) shared across all 9 test methods via @Testcontainers/@Container
- @BeforeEach now drops source_db and target_db entirely via MongoClients, ensuring a clean state without enumerating individual collections
- All assertions unchanged; tests now exercise the real MongoDB driver upsert/insert/fetch/truncate paths end-to-end

Agent-Logs-Url: https://github.com/MaximumTrainer/OpenDataMask/sessions/069d54fc-d273-4289-81fa-953259536559
Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
Done in commit
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 9 comments.
```typescript
if (isMongoType.value) {
  // Require a URI on create, or when the type has changed to Mongo (old string is for a different type)
  const uriRequired = !editingConnection.value || typeChanged
  if (uriRequired && !form.value.connectionString) {
```
MongoDB URI validation doesn’t trim whitespace. A value like ' ' will pass validateForm() (truthy string) but later be omitted from the payload after .trim(), resulting in a backend error on create/type-change. Consider validating connectionString.trim().length > 0 when the URI is required.
```diff
- if (uriRequired && !form.value.connectionString) {
+ if (uriRequired && form.value.connectionString.trim().length === 0) {
```
```kotlin
@Test
fun `writeData uses bulkWrite upsert when rows contain _id field`() {
    val mockCollection = mock<MongoCollection<Document>>()
    val mockDb = mock<MongoDatabase>()
    val mockClient = mock<MongoClient>()
    whenever(mockClient.getDatabase("testdb")).thenReturn(mockDb)
    whenever(mockDb.getCollection("users")).thenReturn(mockCollection)

    val connector = createConnector(mockClient)
    val rows = listOf(
        mapOf("_id" to "abc123", "name" to "Alice"),
        mapOf("_id" to "def456", "name" to "Bob")
    )
    val count = connector.writeData("users", rows)
    assertEquals(2, count)
    verify(mockCollection).bulkWrite(any<List<WriteModel<Document>>>())
    verify(mockCollection, never()).insertMany(any())
}
```
The upsert-focused unit tests only verify that bulkWrite() was called, but they don’t assert that the ReplaceOneModel operations are configured with upsert=true and the expected _id filter. Without that, a regression could switch to non-upserting writes and these tests would still pass. Consider capturing the argument to bulkWrite() and asserting the models/options.
```kotlin
@Testcontainers
class MongoDBMaskingPipelineTest {

    companion object {
        @Container
        @JvmStatic
        val mongoContainer: MongoDBContainer = MongoDBContainer(DockerImageName.parse("mongo:7.0"))
```
This new Testcontainers-based test suite will make ./gradlew test depend on a working Docker runtime. If Docker isn’t available (local dev machines, some CI runners), the whole build may fail. Consider tagging these as integration tests and/or using Testcontainers’ @Testcontainers(disabledWithoutDocker = true) so they’re skipped when Docker isn’t present.
```typescript
const { type, host, port, database, sslEnabled } = form.value
switch (type) {
  case ConnectionType.POSTGRESQL:
    return `jdbc:postgresql://${host}:${port}/${database}${sslEnabled ? '?ssl=require' : ''}`
```
PostgreSQL SSL JDBC parameter looks incorrect: ?ssl=require is not a valid/standard postgres-jdbc setting (the ssl property is boolean, and enforcing SSL is usually done via sslmode=require or ssl=true&sslmode=require). As-is, the driver may ignore SSL or treat it as false, breaking the “Enable SSL” toggle.
```diff
- return `jdbc:postgresql://${host}:${port}/${database}${sslEnabled ? '?ssl=require' : ''}`
+ return `jdbc:postgresql://${host}:${port}/${database}${sslEnabled ? '?sslmode=require' : ''}`
```
```typescript
function openEdit(conn: DataConnection) {
  editingConnection.value = conn
  originalType.value = conn.type
  sqlConnectionChanged.value = false
  // Parse host/port back from stored host string for SQL types (e.g. "localhost:5432")
  let host = 'localhost'
  let port = defaultPorts[conn.type] ?? 5432
  if (conn.host && !mongoTypes.has(conn.type)) {
    const parts = conn.host.split(':')
    host = parts[0] ?? 'localhost'
    const parsedPort = parts[1] ? parseInt(parts[1], 10) : NaN
    port = isNaN(parsedPort) ? (defaultPorts[conn.type] ?? 5432) : parsedPort
  }
```
conn.host.split(':') is fragile for hosts containing colons (IPv6 literals) and can also mis-parse if the stored host ever includes multiple hosts/ports. Consider parsing using a safer approach (e.g., handle bracketed IPv6 [::1]:5432, or split on the last : only when appropriate).
```diff
- function openEdit(conn: DataConnection) {
-   editingConnection.value = conn
-   originalType.value = conn.type
-   sqlConnectionChanged.value = false
-   // Parse host/port back from stored host string for SQL types (e.g. "localhost:5432")
-   let host = 'localhost'
-   let port = defaultPorts[conn.type] ?? 5432
-   if (conn.host && !mongoTypes.has(conn.type)) {
-     const parts = conn.host.split(':')
-     host = parts[0] ?? 'localhost'
-     const parsedPort = parts[1] ? parseInt(parts[1], 10) : NaN
-     port = isNaN(parsedPort) ? (defaultPorts[conn.type] ?? 5432) : parsedPort
-   }
+ function parseStoredSqlHost(hostValue: string | undefined, type: ConnectionType) {
+   let host = 'localhost'
+   let port = defaultPorts[type] ?? 5432
+   if (!hostValue || mongoTypes.has(type)) {
+     return { host, port }
+   }
+   if (hostValue.startsWith('[')) {
+     const closingBracketIndex = hostValue.indexOf(']')
+     if (closingBracketIndex !== -1) {
+       const bracketedHost = hostValue.slice(1, closingBracketIndex)
+       const remainder = hostValue.slice(closingBracketIndex + 1)
+       host = bracketedHost || 'localhost'
+       if (remainder.startsWith(':')) {
+         const parsedPort = parseInt(remainder.slice(1), 10)
+         port = isNaN(parsedPort) ? (defaultPorts[type] ?? 5432) : parsedPort
+       }
+       return { host, port }
+     }
+   }
+   const firstColonIndex = hostValue.indexOf(':')
+   const lastColonIndex = hostValue.lastIndexOf(':')
+   if (firstColonIndex !== -1 && firstColonIndex === lastColonIndex) {
+     host = hostValue.slice(0, lastColonIndex) || 'localhost'
+     const parsedPort = parseInt(hostValue.slice(lastColonIndex + 1), 10)
+     port = isNaN(parsedPort) ? (defaultPorts[type] ?? 5432) : parsedPort
+     return { host, port }
+   }
+   return {
+     host: hostValue || 'localhost',
+     port
+   }
+ }
+
+ function openEdit(conn: DataConnection) {
+   editingConnection.value = conn
+   originalType.value = conn.type
+   sqlConnectionChanged.value = false
+   // Parse host/port back from stored host string for SQL types (e.g. "localhost:5432")
+   const { host, port } = parseStoredSqlHost(conn.host, conn.type)
```
```typescript
    return `jdbc:mysql://${host}:${port}/${database}${sslEnabled ? '?useSSL=true' : ''}`
  case ConnectionType.AZURE_SQL:
    return `jdbc:sqlserver://${host}:${port};databaseName=${database};encrypt=true`
```
The SSL checkbox is shown for all SQL types, but for AZURE_SQL the built connection string always includes encrypt=true regardless of sslEnabled (and the backend connector also hard-enforces encryption). This makes the toggle misleading. Consider hiding/disable the SSL toggle for Azure SQL (or make it reflect something meaningful).
```kotlin
val (withId, withoutId) = rows.partition { it.containsKey("_id") }
if (withId.isNotEmpty()) {
    val upserts = withId.map { row ->
        val doc = Document(row)
        val id = doc["_id"]
        val filter = Document("_id", id)
        ReplaceOneModel(filter, doc, ReplaceOptions().upsert(true))
```
writeData() treats any row that contains _id as upsertable, even when _id is present but null. That can collapse multiple documents into a single upsert on {_id: null} and lose data. Consider requiring a non-null _id value (e.g., row["_id"] != null) before sending the row down the upsert path.
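The fix this comment asks for can be sketched as below, again in illustrative TypeScript with simplified types rather than the real Kotlin connector: a row is upsertable only when `_id` is present and non-null, so null-`_id` rows fall through to the insert path instead of colliding on a `{_id: null}` filter.

```typescript
// Hypothetical partition predicate requiring a usable _id. A missing key
// reads as undefined, so both missing and null _id rows route to inserts.
type Doc = Record<string, unknown>;

function partitionForWrite(rows: Doc[]): { upserts: Doc[]; inserts: Doc[] } {
  const hasUsableId = (row: Doc) => row["_id"] !== null && row["_id"] !== undefined;
  return {
    upserts: rows.filter(hasUsableId),
    inserts: rows.filter((row) => !hasUsableId(row)),
  };
}
```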
```yaml
    generatorType: BIRTH_DATE
    generatorParams: null

# Replace SSN with a NULL (omit from masked output)
```
Doc comment is misleading: the NULL generator sets the field value to null (stored as BSON null / SQL NULL), but does not omit/remove the field from the output document. Consider rewording to avoid implying the key disappears from the masked output.
```diff
- # Replace SSN with a NULL (omit from masked output)
+ # Replace SSN with a NULL value (field remains present in masked output)
```
```kotlin
@Test
fun `createConnection throws for null connection string`() {
    val request = makeRequest(connectionString = null)
    assertThrows<IllegalArgumentException> { service.createConnection(10L, request) }
}

@Test
fun `createConnection throws for blank connection string`() {
    val request = makeRequest(connectionString = " ")
    assertThrows<IllegalArgumentException> { service.createConnection(10L, request) }
}
```
The service now derives and stores host via extractHost(...), but the new/updated tests don’t assert that host is correctly extracted (e.g., JDBC host:port, Azure SQL semicolon params, Mongo URIs with credentials stripped). Adding assertions around the saved DataConnection.host would prevent regressions in this parsing logic.
@copilot apply changes based on the comments in this thread
Backend:
- MongoDBConnector.writeData: only upsert rows where _id != null (not just containsKey), preventing multiple null-_id rows collapsing onto a single upsert target
- MongoDBConnectorTest: capture bulkWrite argument and assert each ReplaceOneModel has upsert=true and a non-null _id filter value
- MongoDBMaskingPipelineTest: add disabledWithoutDocker=true so the test is skipped gracefully when Docker is unavailable
- DataConnectionServiceTest: add 3 new host-extraction assertions covering plain JDBC URL, Azure SQL semicolon params, and MongoDB URI credential stripping

Frontend:
- ConnectionsView.vue: fix PostgreSQL SSL param from ?ssl=require to ?sslmode=require
- ConnectionsView.vue: add parseStoredSqlHost() for IPv6-safe host:port parsing in openEdit (handles [::1]:5432 and plain host:port)
- ConnectionsView.vue: hide the SSL Enable checkbox for Azure SQL (encrypt is always on for that type; toggle was misleading)
- ConnectionsView.vue: trim whitespace before URI required-check in validateForm

Docs:
- mongodb-masking-example.yml: fix NULL generator comment from 'omit from masked output' to 'field remains present in masked output'

Agent-Logs-Url: https://github.com/MaximumTrainer/OpenDataMask/sessions/bef019f7-a5a0-41d4-bdd5-7679e57cd19c
Co-authored-by: MaximumTrainer <1376575+MaximumTrainer@users.noreply.github.com>
Applied all 9 changes from the review thread in commit:
- MongoDBConnector.kt — skip null `_id` values in upsert partition
- MongoDBConnectorTest.kt — assert ReplaceOneModel has upsert=true and correct `_id` filter
- MongoDBMaskingPipelineTest.kt — add `disabledWithoutDocker = true`
- ConnectionsView.vue — trim whitespace in URI validation
- ConnectionsView.vue — fix PostgreSQL SSL param (`sslmode=require`)
- ConnectionsView.vue — safe IPv6-aware host:port parsing via `parseStoredSqlHost()`
- ConnectionsView.vue — hide SSL toggle for Azure SQL
- docs/mongodb-masking-example.yml — fix NULL generator doc comment
- DataConnectionServiceTest.kt — add host extraction assertions (JDBC, Azure SQL, MongoDB)