Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,21 @@ class AzureSQLConnector(
}
}

override fun fetchData(tableName: String, limit: Int?, whereClause: String?): List<Map<String, Any?>> {
override fun fetchData(tableName: String, limit: Int?, whereClause: String?, selectedAttributes: List<String>?): List<Map<String, Any?>> {
// Use square-bracket quoting for SQL Server identifier safety; strip brackets from input
val sanitizedTable = tableName.replace("[", "").replace("]", "")
val selectPart = if (!selectedAttributes.isNullOrEmpty()) {
selectedAttributes.joinToString(", ") { col ->
"[${col.replace("[", "").replace("]", "")}]"
}
} else {
"*"
}
val wherePart = if (!whereClause.isNullOrBlank()) " WHERE $whereClause" else ""
val query = if (limit != null) {
"SELECT TOP $limit * FROM [$sanitizedTable]$wherePart"
"SELECT TOP $limit $selectPart FROM [$sanitizedTable]$wherePart"
} else {
"SELECT * FROM [$sanitizedTable]$wherePart"
"SELECT $selectPart FROM [$sanitizedTable]$wherePart"
}
return getConnection().use { conn ->
conn.prepareStatement(query).use { stmt ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@ class FileConnector(
}
}

override fun fetchData(tableName: String, limit: Int?, whereClause: String?): List<Map<String, Any?>> {
override fun fetchData(tableName: String, limit: Int?, whereClause: String?, selectedAttributes: List<String>?): List<Map<String, Any?>> {
val rows = when {
contentType.contains("csv", ignoreCase = true) || filename.endsWith(".csv") -> parseCsv()
contentType.contains("json", ignoreCase = true) || filename.endsWith(".json") -> parseJson()
else -> emptyList()
}
return if (limit != null) rows.take(limit) else rows
val limited = if (limit != null) rows.take(limit) else rows
return if (!selectedAttributes.isNullOrEmpty()) {
val normalizedSelected = selectedAttributes.map { it.lowercase() }.toSet()
limited.map { row ->
row.filterKeys { it.lowercase() in normalizedSelected }
}
} else {
limited
}
}

override fun createTable(tableName: String, columns: List<ColumnInfo>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,22 @@ open class MongoDBConnector(
}
}

override fun fetchData(tableName: String, limit: Int?, whereClause: String?): List<Map<String, Any?>> {
override fun fetchData(tableName: String, limit: Int?, whereClause: String?, selectedAttributes: List<String>?): List<Map<String, Any?>> {
// whereClause is expected to be a MongoDB query filter as JSON, e.g. {"age": {"$gt": 18}}
return createMongoClient().use { client ->
val collection = client.getDatabase(getDatabaseName()).getCollection(tableName)
val filter = if (!whereClause.isNullOrBlank()) Document.parse(whereClause) else Document()
val cursor = if (limit != null) collection.find(filter).limit(limit) else collection.find(filter)
cursor.map { doc ->
var query = if (limit != null) collection.find(filter).limit(limit) else collection.find(filter)
if (!selectedAttributes.isNullOrEmpty()) {
val projection = Document().apply {
selectedAttributes.forEach { field -> put(field, 1) }
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MongoDB projections that include specific fields will still return _id by default. With the current projection document, fetchData() will likely include _id even when it wasn't requested, which contradicts the "only requested columns" behavior used elsewhere. Consider explicitly excluding _id (set _id: 0) unless _id is present in selectedAttributes.

Suggested change
selectedAttributes.forEach { field -> put(field, 1) }
selectedAttributes.forEach { field -> put(field, 1) }
if (!selectedAttributes.contains("_id")) {
put("_id", 0)
}

Copilot uses AI. Check for mistakes.
if (!selectedAttributes.contains("_id")) {
put("_id", 0)
}
}
query = query.projection(projection)
}
query.map { doc ->
doc.entries.associate { it.key to it.value }
}.toList()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,15 @@ class MySQLConnector(
}
}

override fun fetchData(tableName: String, limit: Int?, whereClause: String?): List<Map<String, Any?>> {
override fun fetchData(tableName: String, limit: Int?, whereClause: String?, selectedAttributes: List<String>?): List<Map<String, Any?>> {
getConnection().use { conn ->
val selectPart = if (!selectedAttributes.isNullOrEmpty()) {
selectedAttributes.joinToString(", ") { "`${it.replace("`", "``")}`" }
} else {
"*"
}
val sql = buildString {
append("SELECT * FROM `$tableName`")
append("SELECT $selectPart FROM `$tableName`")
if (whereClause != null) append(" WHERE $whereClause")
Comment on lines +53 to 60
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MySQL column identifiers in selectedAttributes are interpolated as `$it` without escaping/sanitizing. This can break queries for columns containing backticks and expands the injection surface compared to other connectors that strip quote characters. Sanitize/escape backticks in column names (e.g., remove or double them) before building selectPart.

Copilot uses AI. Check for mistakes.
if (limit != null) append(" LIMIT $limit")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,19 @@ class PostgreSQLConnector(
}
}

override fun fetchData(tableName: String, limit: Int?, whereClause: String?): List<Map<String, Any?>> {
override fun fetchData(tableName: String, limit: Int?, whereClause: String?, selectedAttributes: List<String>?): List<Map<String, Any?>> {
// Use double-quoting for PostgreSQL identifier safety
val quotedTable = "\"${tableName.replace("\"", "")}\""
val selectPart = if (!selectedAttributes.isNullOrEmpty()) {
selectedAttributes.joinToString(", ") { "\"${it.replace("\"", "")}\"" }
} else {
"*"
}
val wherePart = if (!whereClause.isNullOrBlank()) " WHERE $whereClause" else ""
val query = if (limit != null) {
"SELECT * FROM $quotedTable$wherePart LIMIT $limit"
"SELECT $selectPart FROM $quotedTable$wherePart LIMIT $limit"
} else {
"SELECT * FROM $quotedTable$wherePart"
"SELECT $selectPart FROM $quotedTable$wherePart"
}
return getConnection().use { conn ->
conn.prepareStatement(query).use { stmt ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,29 @@ class DestinationSchemaService {
sourceType: ConnectionType,
destConnector: DatabaseConnector,
destType: ConnectionType,
tableName: String
tableName: String,
selectedAttributes: List<String> = emptyList()
) {
logger.info("Mirroring schema for table: $tableName ($sourceType -> $destType)")
val sourceColumns = sourceConnector.listColumns(tableName)
val destColumns = sourceColumns.map { col ->
val filteredColumns = if (selectedAttributes.isEmpty()) {
sourceColumns
} else {
val sourceColumnNames = sourceColumns.map { it.name }
val sourceColumnNamesNormalized = sourceColumnNames.map { it.lowercase() }.toSet()
val normalizedSelected = selectedAttributes.map { it.lowercase() }.toSet()
val matched = sourceColumns.filter { it.name.lowercase() in normalizedSelected }
if (matched.isEmpty()) {
val unknownAttributes = selectedAttributes.filter { it.lowercase() !in sourceColumnNamesNormalized }
throw IllegalArgumentException(
"No selected attributes matched source columns for table '$tableName'. " +
"Unknown attributes: ${if (unknownAttributes.isEmpty()) selectedAttributes else unknownAttributes}. " +
"Available columns: $sourceColumnNames"
)
}
matched
}
Comment on lines +85 to +101
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mirrorSchema() can end up with filteredColumns empty when selectedAttributes doesn't match any source column names. This will call createTable() with 0 columns (invalid SQL in some connectors, and a silent no-op in MySQL), causing confusing job failures later. Consider validating that at least one column matched and fail fast with a clear error listing the unknown attributes (or fall back to all columns).

Suggested change
val filteredColumns = if (selectedAttributes.isEmpty()) {
sourceColumns
} else {
val normalizedSelected = selectedAttributes.map { it.lowercase() }.toSet()
sourceColumns.filter { it.name.lowercase() in normalizedSelected }
}
val sourceColumnNames = sourceColumns.map { it.name }
val sourceColumnNamesNormalized = sourceColumnNames.map { it.lowercase() }.toSet()
val normalizedSelected = selectedAttributes.map { it.lowercase() }.toSet()
val filteredColumns = if (selectedAttributes.isEmpty()) {
sourceColumns
} else {
sourceColumns.filter { it.name.lowercase() in normalizedSelected }
}
if (selectedAttributes.isNotEmpty() && filteredColumns.isEmpty()) {
val unknownAttributes = selectedAttributes.filter { it.lowercase() !in sourceColumnNamesNormalized }
throw IllegalArgumentException(
"No selected attributes matched source columns for table '$tableName'. " +
"Unknown attributes: ${if (unknownAttributes.isEmpty()) selectedAttributes else unknownAttributes}. " +
"Available columns: $sourceColumnNames"
)
}

Copilot uses AI. Check for mistakes.
val destColumns = filteredColumns.map { col ->
ColumnInfo(
name = col.name,
type = mapColumnType(col.type, sourceType, destType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,13 @@ class JobService(
continue
}
addLog(job.id, "Mirroring schema for table: ${tableConfig.tableName}", LogLevel.INFO)
val selectedAttrs = tableConfig.selectedAttributes
?.split(",")?.map { it.trim() }?.filter { it.isNotBlank() } ?: emptyList()
destinationSchemaService.mirrorSchema(
sourceConnector, sourceConn.type,
destConnector, destConn.type,
tableConfig.tableName
tableConfig.tableName,
selectedAttrs
)
processTable(job.id, tableConfig, sourceConnector, destConnector, subsetRows)
}
Expand Down Expand Up @@ -217,16 +220,19 @@ class JobService(
) {
addLog(jobId, "Processing table: ${tableConfig.tableName} (mode: ${tableConfig.mode})", LogLevel.INFO)

val selectedAttrs = tableConfig.selectedAttributes
?.split(",")?.map { it.trim() }?.filter { it.isNotBlank() }?.takeIf { it.isNotEmpty() }

when (tableConfig.mode) {
TableMode.PASSTHROUGH -> {
val data = sourceConnector.fetchData(tableConfig.tableName, tableConfig.rowLimit?.toInt())
val data = sourceConnector.fetchData(tableConfig.tableName, tableConfig.rowLimit?.toInt(), null, selectedAttrs)
addLog(jobId, "Fetched ${data.size} rows from ${tableConfig.tableName}", LogLevel.INFO)
val written = destConnector.writeData(tableConfig.tableName, data)
addLog(jobId, "Wrote $written rows to destination ${tableConfig.tableName}", LogLevel.INFO)
}
TableMode.MASK -> {
val generators = columnGeneratorRepository.findByTableConfigurationId(tableConfig.id)
val data = sourceConnector.fetchData(tableConfig.tableName, tableConfig.rowLimit?.toInt())
val data = sourceConnector.fetchData(tableConfig.tableName, tableConfig.rowLimit?.toInt(), null, selectedAttrs)
addLog(
jobId,
"Masking ${data.size} rows in ${tableConfig.tableName} with ${generators.size} generator(s)",
Expand All @@ -249,15 +255,16 @@ class JobService(
?: sourceConnector.fetchData(
tableConfig.tableName,
tableConfig.rowLimit?.toInt(),
tableConfig.whereClause
tableConfig.whereClause,
selectedAttrs
)
addLog(jobId, "Subsetting ${data.size} rows from ${tableConfig.tableName}", LogLevel.INFO)
val written = destConnector.writeData(tableConfig.tableName, data)
addLog(jobId, "Wrote $written rows to destination ${tableConfig.tableName}", LogLevel.INFO)
}
TableMode.UPSERT -> {
val generators = columnGeneratorRepository.findByTableConfigurationId(tableConfig.id)
val data = sourceConnector.fetchData(tableConfig.tableName, tableConfig.rowLimit?.toInt(), tableConfig.whereClause)
val data = sourceConnector.fetchData(tableConfig.tableName, tableConfig.rowLimit?.toInt(), tableConfig.whereClause, selectedAttrs)
addLog(jobId, "Upserting ${data.size} rows in ${tableConfig.tableName} with ${generators.size} generator(s)", LogLevel.INFO)
val maskedData = if (generators.isNotEmpty()) data.map { row -> generatorService.applyGenerators(row, generators) } else data
val written = destConnector.writeData(tableConfig.tableName, maskedData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class TableConfigurationService(
rowLimit = request.rowLimit,
whereClause = request.whereClause
)
config.selectedAttributes = request.selectedAttributes
?.filter { it.isNotBlank() }
?.takeIf { it.isNotEmpty() }
?.joinToString(",")
return tableConfigurationRepository.save(config).toResponse()
Comment on lines 26 to 33
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When request.selectedAttributes is an empty list, joinToString(",") produces an empty string, so the entity stores "" rather than null. This makes responses/export potentially return an empty list instead of null and complicates the "empty/omitted = all columns" contract. Consider normalizing to null when the filtered+trimmed list is empty (e.g., takeIf { it.isNotEmpty() }).

Copilot uses AI. Check for mistakes.
}

Expand All @@ -51,6 +55,10 @@ class TableConfigurationService(
config.mode = request.mode
config.rowLimit = request.rowLimit
config.whereClause = request.whereClause
config.selectedAttributes = request.selectedAttributes
?.filter { it.isNotBlank() }
?.takeIf { it.isNotEmpty() }
?.joinToString(",")
return tableConfigurationRepository.save(config).toResponse()
}

Expand Down Expand Up @@ -132,6 +140,7 @@ class TableConfigurationService(
mode = mode,
rowLimit = rowLimit,
whereClause = whereClause,
selectedAttributes = selectedAttributes?.split(",")?.map { it.trim() }?.filter { it.isNotBlank() },
createdAt = createdAt
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class WorkspaceExportService(
mode = table.mode,
rowLimit = table.rowLimit,
whereClause = table.whereClause,
selectedAttributes = table.selectedAttributes?.split(",")?.map { it.trim() }?.filter { it.isNotBlank() },
columnGenerators = generators.map { gen ->
ColumnGeneratorExportDto(
columnName = gen.columnName,
Expand Down Expand Up @@ -63,6 +64,10 @@ class WorkspaceExportService(
table.mode = tableDto.mode
table.rowLimit = tableDto.rowLimit
table.whereClause = tableDto.whereClause
table.selectedAttributes = tableDto.selectedAttributes
?.filter { it.isNotBlank() }
?.takeIf { it.isNotEmpty() }
?.joinToString(",")
val savedTable = tableConfigurationRepository.save(table)

val existingGenerators = columnGeneratorRepository.findByTableConfigurationId(savedTable.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class TableConfiguration(
@Column(length = 4096)
var whereClause: String? = null,

// Comma-separated list of column names to include in the extraction (null/empty = all columns).
@Column(length = 2048)
var selectedAttributes: String? = null,

@Column(nullable = false)
var createdAt: LocalDateTime = LocalDateTime.now()
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ data class TableConfigurationRequest(
val mode: TableMode,

val rowLimit: Long? = null,
val whereClause: String? = null
val whereClause: String? = null,
val selectedAttributes: List<String>? = null
)

data class TableConfigurationResponse(
Expand All @@ -28,6 +29,7 @@ data class TableConfigurationResponse(
val mode: TableMode,
val rowLimit: Long?,
val whereClause: String?,
val selectedAttributes: List<String>?,
val createdAt: LocalDateTime
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ data class TableConfigExportDto(
val mode: TableMode,
val rowLimit: Long?,
val whereClause: String?,
val selectedAttributes: List<String>? = null,
val columnGenerators: List<ColumnGeneratorExportDto> = emptyList()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ interface DatabaseConnector {
fun testConnection(): Boolean
fun listTables(): List<String>
fun listColumns(tableName: String): List<ColumnInfo>
fun fetchData(tableName: String, limit: Int? = null, whereClause: String? = null): List<Map<String, Any?>>
fun fetchData(tableName: String, limit: Int? = null, whereClause: String? = null, selectedAttributes: List<String>? = null): List<Map<String, Any?>>
fun createTable(tableName: String, columns: List<ColumnInfo>)
fun truncateTable(tableName: String)
fun writeData(tableName: String, rows: List<Map<String, Any?>>): Int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class TableConfigurationControllerTest {
private fun makeConfigResponse(id: Long = 1L, workspaceId: Long = 1L) = TableConfigurationResponse(
id = id, workspaceId = workspaceId, tableName = "users", schemaName = null,
mode = TableMode.PASSTHROUGH, rowLimit = null, whereClause = null,
createdAt = LocalDateTime.now()
selectedAttributes = null, createdAt = LocalDateTime.now()
)

private fun makeGeneratorResponse(id: Long = 1L, tableConfigId: Long = 1L) = ColumnGeneratorResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,37 @@ class PostgreSQLConnectorTest {
val rows = connector.fetchData("test_users")
assertTrue(rows.isEmpty())
}

@Test
fun `fetchData with selectedAttributes returns only requested columns`() {
val rows = connector.fetchData("test_users", selectedAttributes = listOf("id", "name"))
assertEquals(2, rows.size)
// Each row should only contain id and name keys (case may vary in H2)
val firstRow = rows[0]
val keys = firstRow.keys.map { it.lowercase() }.toSet()
assertTrue(keys.contains("id") || keys.contains("ID"))
assertTrue(keys.contains("name") || keys.contains("NAME"))
// email column should not be present
assertFalse(keys.contains("email"))
}

@Test
fun `fetchData with empty selectedAttributes returns all columns`() {
val rows = connector.fetchData("test_users", selectedAttributes = emptyList())
assertEquals(2, rows.size)
val keys = rows[0].keys.map { it.lowercase() }.toSet()
assertTrue(keys.contains("id") || keys.contains("ID"))
assertTrue(keys.contains("name") || keys.contains("NAME"))
assertTrue(keys.contains("email") || keys.contains("EMAIL"))
}

@Test
fun `fetchData with selectedAttributes and whereClause filters both rows and columns`() {
val rows = connector.fetchData("test_users", whereClause = "name = 'Alice'", selectedAttributes = listOf("id"))
assertEquals(1, rows.size)
val keys = rows[0].keys.map { it.lowercase() }.toSet()
// Only id should be returned
assertTrue(keys.size == 1)
assertTrue(keys.contains("id") || keys.contains("ID"))
}
}
Loading
Loading