-
Notifications
You must be signed in to change notification settings - Fork 0
feat: MongoDB upsert sink, aligned connection API, improved connection UI, and end-to-end masking pipeline tests #66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
678b7a8
ff34250
bc9c141
5d35a34
0d3a26f
5c02d65
c11d327
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ import com.opendatamask.domain.port.output.DataConnectionPort | |
| import com.opendatamask.domain.port.input.dto.ConnectionTestResult | ||
| import com.opendatamask.domain.port.input.dto.DataConnectionRequest | ||
| import com.opendatamask.domain.port.input.dto.DataConnectionResponse | ||
| import com.opendatamask.domain.model.ConnectionType | ||
| import com.opendatamask.domain.model.DataConnection | ||
| import org.springframework.stereotype.Service | ||
| import org.springframework.transaction.annotation.Transactional | ||
|
|
@@ -21,11 +22,16 @@ class DataConnectionService( | |
|
|
||
| @Transactional | ||
| override fun createConnection(workspaceId: Long, request: DataConnectionRequest): DataConnectionResponse { | ||
| val connStr = request.connectionString | ||
| if (connStr.isNullOrBlank()) { | ||
| throw IllegalArgumentException("Connection string is required when creating a connection") | ||
| } | ||
| val connection = DataConnection( | ||
| workspaceId = workspaceId, | ||
| name = request.name, | ||
| type = request.type, | ||
| connectionString = encryptionPort.encrypt(request.connectionString), | ||
| connectionString = encryptionPort.encrypt(connStr), | ||
| host = extractHost(request.type, connStr), | ||
| username = request.username, | ||
| password = request.password?.let { encryptionPort.encrypt(it) }, | ||
| database = request.database, | ||
|
|
@@ -49,11 +55,23 @@ class DataConnectionService( | |
| @Transactional | ||
| override fun updateConnection(workspaceId: Long, connectionId: Long, request: DataConnectionRequest): DataConnectionResponse { | ||
| val connection = findConnection(workspaceId, connectionId) | ||
| // If the connector type is changing, a new connection string must be provided because the | ||
| // existing stored string is for the old type and would be invalid for the new one. | ||
| if (request.type != connection.type && request.connectionString.isNullOrBlank()) { | ||
| throw IllegalArgumentException( | ||
| "A new connection string is required when changing the connection type" | ||
| ) | ||
| } | ||
| connection.name = request.name | ||
| connection.type = request.type | ||
| connection.connectionString = encryptionPort.encrypt(request.connectionString) | ||
| if (!request.connectionString.isNullOrBlank()) { | ||
| connection.connectionString = encryptionPort.encrypt(request.connectionString) | ||
| connection.host = extractHost(request.type, request.connectionString) | ||
| } | ||
|
Comment on lines
57
to
+70
|
||
| connection.username = request.username | ||
| connection.password = request.password?.let { encryptionPort.encrypt(it) } | ||
| if (!request.password.isNullOrBlank()) { | ||
| connection.password = encryptionPort.encrypt(request.password) | ||
| } | ||
| connection.database = request.database | ||
| connection.isSource = request.isSource | ||
| connection.isDestination = request.isDestination | ||
|
|
@@ -99,11 +117,39 @@ class DataConnectionService( | |
| return connection | ||
| } | ||
|
|
||
/**
 * Extracts the host (and port, when present) from a connection string for display purposes.
 *
 * Recognised shapes:
 *  - JDBC URLs: `jdbc:postgresql://host:port/db`, `jdbc:mysql://host:port/db`,
 *    `jdbc:sqlserver://host:port;databaseName=db;...` (semicolon-delimited params)
 *  - MongoDB URIs: `mongodb://[user:pass@]host:port[/db]`
 *
 * A `user:pass@` prefix in the authority is removed for every handled type, so credentials
 * embedded in the URL are not returned as part of the host.
 *
 * @param type connector type that selects the parsing rules
 * @param connectionString plain-text connection string to inspect
 * @return the host portion, or `null` when it is missing/blank or [type] is not handled here
 */
private fun extractHost(type: ConnectionType, connectionString: String): String? {
    // Everything after the scheme's "//"; empty when the string contains no "//" at all.
    val afterSlashes = connectionString.substringAfter("//", "")
    val authority = when (type) {
        ConnectionType.POSTGRESQL, ConnectionType.MYSQL, ConnectionType.AZURE_SQL ->
            // The authority ends at the path ("/"), SQL Server style params (";") or the query ("?").
            afterSlashes.substringBefore("/").substringBefore(";").substringBefore("?")
        ConnectionType.MONGODB, ConnectionType.MONGODB_COSMOS ->
            afterSlashes.substringBefore("/").substringBefore("?")
        else -> return null
    }
    // substringAfterLast returns the receiver unchanged when there is no "@", and using the
    // last "@" keeps a password that itself contains "@" out of the result.
    return authority.substringAfterLast("@").ifBlank { null }
}
|
|
||
| private fun DataConnection.toResponse() = DataConnectionResponse( | ||
| id = id, | ||
| workspaceId = workspaceId, | ||
| name = name, | ||
| type = type, | ||
| host = host, | ||
| username = username, | ||
| database = database, | ||
| isSource = isSource, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,8 @@ import com.mongodb.client.FindIterable | |
| import com.mongodb.client.MongoClient | ||
| import com.mongodb.client.MongoCollection | ||
| import com.mongodb.client.MongoDatabase | ||
| import com.mongodb.client.model.ReplaceOneModel | ||
| import com.mongodb.client.model.WriteModel | ||
| import org.bson.Document | ||
| import org.junit.jupiter.api.Test | ||
| import org.junit.jupiter.api.Assertions.* | ||
|
|
@@ -98,6 +100,64 @@ class MongoDBConnectorTest { | |
| val count = connector.writeData("users", rows) | ||
| assertEquals(2, count) | ||
| verify(mockCollection).insertMany(any()) | ||
| verify(mockCollection, never()).bulkWrite(any<List<WriteModel<Document>>>()) | ||
| } | ||
|
|
||
@Test
fun `writeData uses bulkWrite upsert when rows contain _id field`() {
    // Arrange: client -> database -> collection, all mocked.
    val mockCollection = mock<MongoCollection<Document>>()
    val mockDb = mock<MongoDatabase>()
    val mockClient = mock<MongoClient>()
    whenever(mockClient.getDatabase("testdb")).thenReturn(mockDb)
    whenever(mockDb.getCollection("users")).thenReturn(mockCollection)

    val connector = createConnector(mockClient)
    val rows = listOf(
        mapOf("_id" to "abc123", "name" to "Alice"),
        mapOf("_id" to "def456", "name" to "Bob")
    )

    // Act
    val count = connector.writeData("users", rows)

    // Assert: every row is reported as written, via bulkWrite only.
    assertEquals(2, count)
    val captor = argumentCaptor<List<WriteModel<Document>>>()
    verify(mockCollection).bulkWrite(captor.capture())
    verify(mockCollection, never()).insertMany(any())

    // Each ReplaceOneModel must have upsert=true and an _id filter
    val models = captor.firstValue
    assertEquals(2, models.size)
    val expectedIds = setOf("abc123", "def456")
    val filterIds = models.map { model ->
        assertTrue(model is ReplaceOneModel<*>,
            "Expected ReplaceOneModel but was ${model::class.simpleName}")
        @Suppress("UNCHECKED_CAST")
        val replaceModel = model as ReplaceOneModel<Document>
        assertTrue(replaceModel.replaceOptions.isUpsert,
            "ReplaceOneModel must have upsert=true")
        val filter = replaceModel.filter as Document
        assertTrue(filter.containsKey("_id"), "Filter must contain _id field")
        assertTrue(expectedIds.contains(filter["_id"]),
            "Filter _id must be one of the input row ids, got ${filter["_id"]}")
        filter["_id"]
    }
    // A membership check alone would pass if both models targeted the same _id;
    // set equality proves every input row got its own upsert.
    assertEquals(expectedIds, filterIds.toSet(), "Each input row id must be targeted by exactly one model")
}
|
Comment on lines
+106
to
+142
|
||
|
|
||
@Test
fun `writeData handles mixed rows with and without _id`() {
    // Wire the mocked client -> database -> collection chain used by the connector.
    val usersCollection = mock<MongoCollection<Document>>()
    val database = mock<MongoDatabase>().also {
        whenever(it.getCollection("users")).thenReturn(usersCollection)
    }
    val client = mock<MongoClient>().also {
        whenever(it.getDatabase("testdb")).thenReturn(database)
    }

    // One row carries an _id, the other does not.
    val rowWithId = mapOf("_id" to "abc123", "name" to "Alice")
    val rowWithoutId = mapOf("name" to "Bob")

    val written = createConnector(client).writeData("users", listOf(rowWithId, rowWithoutId))

    // Both rows are counted, and both write paths are exercised.
    assertEquals(2, written)
    verify(usersCollection).bulkWrite(any<List<WriteModel<Document>>>())
    verify(usersCollection).insertMany(any())
}
|
|
||
| @Test | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`createConnection` only checks `connectionString` for null, so an empty/blank string will be accepted and persisted, despite the API conceptually requiring a usable connection string. Use `isNullOrBlank()` (or validation annotations/groups) so create rejects blank values with a 400.