Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,15 @@ The REST API is available at `http://localhost:8080/api`. Key endpoints:
| GET | `/api/workspaces/{id}/tables/{table}/columns/{col}/comments` | List column comments |
| POST | `/api/workspaces/{id}/tables/{table}/columns/{col}/comments` | Add comment |

### Connection Pairs
| Method | Path | Description |
|--------|------|-------------|
| POST | `/api/workspaces/{id}/connection-pairs` | Create a connection pair |
| GET | `/api/workspaces/{id}/connection-pairs` | List active connection pairs |
| GET | `/api/workspaces/{id}/connection-pairs/{pid}` | Get a connection pair |
| PUT | `/api/workspaces/{id}/connection-pairs/{pid}` | Update a connection pair |
| DELETE | `/api/workspaces/{id}/connection-pairs/{pid}` | Soft-delete a connection pair |

### Jobs
| Method | Path | Description |
|--------|------|-------------|
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.opendatamask.adapter.input.rest

import com.opendatamask.adapter.output.persistence.UserRepository
import com.opendatamask.application.service.ConnectionPairService
import com.opendatamask.application.service.PermissionService
import com.opendatamask.domain.model.WorkspacePermission
import com.opendatamask.domain.port.input.dto.ConnectionPairRequest
import com.opendatamask.domain.port.input.dto.ConnectionPairResponse
import jakarta.validation.Valid
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.security.core.annotation.AuthenticationPrincipal
import org.springframework.security.core.userdetails.UserDetails
import org.springframework.web.bind.annotation.*

@RestController
@RequestMapping("/api/workspaces/{workspaceId}/connection-pairs")
class ConnectionPairController(
private val connectionPairService: ConnectionPairService,
private val permissionService: PermissionService,
private val userRepository: UserRepository
) {

@PostMapping
fun createConnectionPair(
@PathVariable workspaceId: Long,
@Valid @RequestBody request: ConnectionPairRequest,
@AuthenticationPrincipal userDetails: UserDetails
): ResponseEntity<ConnectionPairResponse> {
permissionService.requirePermission(getUserId(userDetails), workspaceId, WorkspacePermission.CONFIGURE_GENERATORS)
return ResponseEntity.status(HttpStatus.CREATED)
.body(connectionPairService.createConnectionPair(workspaceId, request))
}

@GetMapping("/{pairId}")
fun getConnectionPair(
@PathVariable workspaceId: Long,
@PathVariable pairId: Long
): ResponseEntity<ConnectionPairResponse> {
return ResponseEntity.ok(connectionPairService.getConnectionPair(workspaceId, pairId))
}

@GetMapping
fun listConnectionPairs(@PathVariable workspaceId: Long): ResponseEntity<List<ConnectionPairResponse>> {
return ResponseEntity.ok(connectionPairService.listConnectionPairs(workspaceId))
}

@PutMapping("/{pairId}")
fun updateConnectionPair(
@PathVariable workspaceId: Long,
@PathVariable pairId: Long,
@Valid @RequestBody request: ConnectionPairRequest,
@AuthenticationPrincipal userDetails: UserDetails
): ResponseEntity<ConnectionPairResponse> {
permissionService.requirePermission(getUserId(userDetails), workspaceId, WorkspacePermission.CONFIGURE_GENERATORS)
return ResponseEntity.ok(connectionPairService.updateConnectionPair(workspaceId, pairId, request))
}

@DeleteMapping("/{pairId}")
fun deleteConnectionPair(
@PathVariable workspaceId: Long,
@PathVariable pairId: Long,
@AuthenticationPrincipal userDetails: UserDetails
): ResponseEntity<Void> {
permissionService.requirePermission(getUserId(userDetails), workspaceId, WorkspacePermission.CONFIGURE_GENERATORS)
connectionPairService.deleteConnectionPair(workspaceId, pairId)
return ResponseEntity.noContent().build()
}

private fun getUserId(userDetails: UserDetails): Long =
userRepository.findByUsername(userDetails.username)
.orElseThrow { NoSuchElementException("User not found") }.id
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.opendatamask.adapter.input.rest

import com.opendatamask.domain.port.input.dto.JobLogResponse
import com.opendatamask.domain.port.input.dto.CreateJobRequest
import com.opendatamask.domain.port.input.dto.JobResponse
import com.opendatamask.domain.model.WorkspacePermission
import com.opendatamask.adapter.output.persistence.UserRepository
Expand All @@ -23,11 +24,12 @@ class JobController(
@PostMapping
fun createAndRunJob(
@PathVariable workspaceId: Long,
@RequestBody(required = false) request: CreateJobRequest?,
@AuthenticationPrincipal userDetails: UserDetails
): ResponseEntity<JobResponse> {
val userId = getUserId(userDetails)
permissionService.requirePermission(userId, workspaceId, WorkspacePermission.RUN_JOBS)
val job = jobService.createJob(workspaceId, userId)
val job = jobService.createJob(workspaceId, userId, request?.connectionPairId)
jobService.runJob(job.id)
return ResponseEntity.status(HttpStatus.CREATED).body(job)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.opendatamask.adapter.output.persistence

import com.opendatamask.domain.model.ConnectionPair
import com.opendatamask.domain.port.output.ConnectionPairPort
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.data.jpa.repository.Query
import org.springframework.stereotype.Repository
import java.util.Optional

@Repository
interface ConnectionPairRepository : JpaRepository<ConnectionPair, Long>, ConnectionPairPort {
override fun findById(id: Long): Optional<ConnectionPair>
override fun findByWorkspaceId(workspaceId: Long): List<ConnectionPair>

@Query("SELECT c FROM ConnectionPair c WHERE c.workspaceId = :workspaceId AND c.deletedAt IS NULL")
override fun findActiveByWorkspaceId(workspaceId: Long): List<ConnectionPair>

override fun save(connectionPair: ConnectionPair): ConnectionPair
override fun deleteById(id: Long)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package com.opendatamask.application.service

import com.opendatamask.domain.port.input.ConnectionPairUseCase
import com.opendatamask.domain.port.output.ConnectionPairPort
import com.opendatamask.domain.port.output.DataConnectionPort
import com.opendatamask.domain.port.output.WorkspacePort
import com.opendatamask.domain.model.ConnectionPair
import com.opendatamask.domain.port.input.dto.ConnectionPairRequest
import com.opendatamask.domain.port.input.dto.ConnectionPairResponse
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.time.LocalDateTime

@Service
class ConnectionPairService(
private val connectionPairRepository: ConnectionPairPort,
private val dataConnectionRepository: DataConnectionPort,
private val workspaceRepository: WorkspacePort
) : ConnectionPairUseCase {

@Transactional
override fun createConnectionPair(workspaceId: Long, request: ConnectionPairRequest): ConnectionPairResponse {
workspaceRepository.findById(workspaceId)
.orElseThrow { NoSuchElementException("Workspace not found: $workspaceId") }
validateConnectionsBelongToWorkspace(workspaceId, request.sourceConnectionId, request.destinationConnectionId)
val pair = ConnectionPair(
workspaceId = workspaceId,
name = request.name,
description = request.description,
sourceConnectionId = request.sourceConnectionId,
destinationConnectionId = request.destinationConnectionId
)
return connectionPairRepository.save(pair).toResponse()
}

@Transactional(readOnly = true)
override fun getConnectionPair(workspaceId: Long, pairId: Long): ConnectionPairResponse {
return findActivePair(workspaceId, pairId).toResponse()
}

@Transactional(readOnly = true)
override fun listConnectionPairs(workspaceId: Long): List<ConnectionPairResponse> {
return connectionPairRepository.findActiveByWorkspaceId(workspaceId).map { it.toResponse() }
}

@Transactional
override fun updateConnectionPair(
workspaceId: Long,
pairId: Long,
request: ConnectionPairRequest
): ConnectionPairResponse {
val pair = findActivePair(workspaceId, pairId)
validateConnectionsBelongToWorkspace(workspaceId, request.sourceConnectionId, request.destinationConnectionId)
pair.name = request.name
pair.description = request.description
pair.sourceConnectionId = request.sourceConnectionId
pair.destinationConnectionId = request.destinationConnectionId
return connectionPairRepository.save(pair).toResponse()
}

@Transactional
override fun deleteConnectionPair(workspaceId: Long, pairId: Long) {
val pair = findActivePair(workspaceId, pairId)
pair.deletedAt = LocalDateTime.now()
connectionPairRepository.save(pair)
}

private fun findActivePair(workspaceId: Long, pairId: Long): ConnectionPair {
val pair = connectionPairRepository.findById(pairId)
.orElseThrow { NoSuchElementException("Connection pair not found: $pairId") }
if (pair.workspaceId != workspaceId) {
throw NoSuchElementException("Connection pair $pairId does not belong to workspace $workspaceId")
}
if (pair.deletedAt != null) {
throw NoSuchElementException("Connection pair $pairId has been deleted")
}
return pair
}

private fun validateConnectionsBelongToWorkspace(workspaceId: Long, sourceId: Long, destinationId: Long) {
if (sourceId == destinationId) {
throw IllegalArgumentException("Source and destination connections must be distinct")
}
val source = dataConnectionRepository.findById(sourceId)
.orElseThrow { NoSuchElementException("Source connection not found: $sourceId") }
if (source.workspaceId != workspaceId) {
throw IllegalArgumentException("Source connection $sourceId does not belong to workspace $workspaceId")
}
if (!source.isSource) {
throw IllegalArgumentException("Connection $sourceId is not configured as a source connection")
}
val destination = dataConnectionRepository.findById(destinationId)
.orElseThrow { NoSuchElementException("Destination connection not found: $destinationId") }
if (destination.workspaceId != workspaceId) {
throw IllegalArgumentException(
"Destination connection $destinationId does not belong to workspace $workspaceId"
)
}
Comment on lines +80 to +98
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validateConnectionsBelongToWorkspace ensures both connections are in the workspace, but it does not enforce the invariant that a ConnectionPair must bind one source and one destination connection. Add checks that source.isSource == true, destination.isDestination == true, and (optionally) sourceId != destinationId to avoid creating invalid pairs that will later fail or behave unexpectedly at job run time.

Copilot uses AI. Check for mistakes.
if (!destination.isDestination) {
throw IllegalArgumentException("Connection $destinationId is not configured as a destination connection")
}
}

private fun ConnectionPair.toResponse() = ConnectionPairResponse(
id = id,
workspaceId = workspaceId,
name = name,
description = description,
sourceConnectionId = sourceConnectionId,
destinationConnectionId = destinationConnectionId,
createdAt = createdAt,
updatedAt = updatedAt
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.opendatamask.domain.port.output.JobPort
import com.opendatamask.domain.port.output.JobLogPort
import com.opendatamask.domain.port.output.WorkspacePort
import com.opendatamask.domain.port.output.DataConnectionPort
import com.opendatamask.domain.port.output.ConnectionPairPort
import com.opendatamask.domain.port.output.TableConfigurationPort
import com.opendatamask.domain.port.output.ColumnGeneratorPort
import com.opendatamask.domain.port.output.SubsetTableConfigPort
Expand All @@ -27,6 +28,7 @@ class JobService(
private val jobLogRepository: JobLogPort,
private val workspaceRepository: WorkspacePort,
private val dataConnectionRepository: DataConnectionPort,
private val connectionPairRepository: ConnectionPairPort,
private val tableConfigurationRepository: TableConfigurationPort,
private val columnGeneratorRepository: ColumnGeneratorPort,
private val encryptionPort: EncryptionPort,
Expand All @@ -49,14 +51,15 @@ class JobService(
private val logger = LoggerFactory.getLogger(JobService::class.java)

@Transactional
override fun createJob(workspaceId: Long, createdBy: Long): JobResponse {
override fun createJob(workspaceId: Long, createdBy: Long, connectionPairId: Long?): JobResponse {
workspaceRepository.findById(workspaceId)
.orElseThrow { NoSuchElementException("Workspace not found: $workspaceId") }

val job = Job(
workspaceId = workspaceId,
status = JobStatus.PENDING,
createdBy = createdBy
createdBy = createdBy,
connectionPairId = connectionPairId
)
Comment on lines 53 to 63
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createJob now requires a third argument on the concrete JobService, which breaks existing Kotlin call sites that invoke jobService.createJob(workspaceId, userId) (e.g., tests and controllers that mock JobService). To preserve backward compatibility, add a default value in the override signature (e.g., connectionPairId: Long? = null) or provide an overload delegating to the 3-arg method.

Copilot uses AI. Check for mistakes.
val saved = jobRepository.save(job)
return saved.toResponse()
Expand Down Expand Up @@ -98,8 +101,8 @@ class JobService(
}

@Transactional
override fun createAndRunJob(workspaceId: Long, createdBy: Long): JobResponse {
val response = createJob(workspaceId, createdBy)
override fun createAndRunJob(workspaceId: Long, createdBy: Long, connectionPairId: Long?): JobResponse {
val response = createJob(workspaceId, createdBy, connectionPairId)
runJob(response.id)
return response
}
Comment on lines 103 to 108
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createAndRunJob has the same issue as createJob: the concrete JobService override requires the new connectionPairId parameter, which will break existing Kotlin call sites that pass only (workspaceId, createdBy). Consider adding a default value in the override signature (or an overload) to keep existing callers compiling.

Copilot uses AI. Check for mistakes.
Expand All @@ -116,20 +119,7 @@ class JobService(
throw IllegalStateException("Job blocked: unresolved schema changes require attention before running")
}

val sourceConnections = dataConnectionRepository.findByWorkspaceId(job.workspaceId)
.filter { it.isSource }
val destConnections = dataConnectionRepository.findByWorkspaceId(job.workspaceId)
.filter { it.isDestination }

if (sourceConnections.isEmpty()) {
throw IllegalStateException("No source connection configured for workspace ${job.workspaceId}")
}
if (destConnections.isEmpty()) {
throw IllegalStateException("No destination connection configured for workspace ${job.workspaceId}")
}

val sourceConn = sourceConnections.first()
val destConn = destConnections.first()
val (sourceConn, destConn) = resolveConnections(job)

addLog(job.id, "Connecting to source: ${sourceConn.name}", LogLevel.INFO)
val sourceConnector = connectorFactory.createConnector(
Expand Down Expand Up @@ -299,6 +289,68 @@ class JobService(
return job
}

// Returns (sourceConnection, destinationConnection) for a job. When the job has a
// connectionPairId the pair's explicit connections are used; otherwise the workspace's
// first source/destination connections are used (backward-compatible behaviour).
private fun resolveConnections(job: Job): Pair<DataConnection, DataConnection> {
val pairId = job.connectionPairId
if (pairId != null) {
val pair = connectionPairRepository.findById(pairId)
.orElseThrow { NoSuchElementException("Connection pair not found: $pairId") }
if (pair.deletedAt != null) {
throw IllegalStateException("Connection pair $pairId has been deleted")
}
if (pair.workspaceId != job.workspaceId) {
throw IllegalStateException(
"Connection pair $pairId does not belong to workspace ${job.workspaceId}"
)
}
if (pair.sourceConnectionId == pair.destinationConnectionId) {
throw IllegalStateException(
"Connection pair $pairId is invalid: source and destination connections must be distinct"
)
}
val source = dataConnectionRepository.findById(pair.sourceConnectionId)
.orElseThrow { NoSuchElementException("Source connection not found: ${pair.sourceConnectionId}") }
if (source.workspaceId != job.workspaceId) {
throw IllegalStateException(
"Source connection ${pair.sourceConnectionId} does not belong to workspace ${job.workspaceId}"
)
}
if (!source.isSource) {
throw IllegalStateException(
"Connection pair $pairId is invalid: connection ${pair.sourceConnectionId} is not a source connection"
)
}
val destination = dataConnectionRepository.findById(pair.destinationConnectionId)
.orElseThrow {
NoSuchElementException("Destination connection not found: ${pair.destinationConnectionId}")
}
Comment on lines +302 to +328
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolveConnections loads a ConnectionPair by ID but does not verify that the pair belongs to job.workspaceId. As a result, a user who can run jobs in workspace A could reference a pair from workspace B (by guessing IDs) and cause the job to use other workspace connections. Add an explicit pair.workspaceId == job.workspaceId check (and ideally also validate the loaded connections' workspaceId) before returning them.

Suggested change
}
val source = dataConnectionRepository.findById(pair.sourceConnectionId)
.orElseThrow { NoSuchElementException("Source connection not found: ${pair.sourceConnectionId}") }
val destination = dataConnectionRepository.findById(pair.destinationConnectionId)
.orElseThrow {
NoSuchElementException("Destination connection not found: ${pair.destinationConnectionId}")
}
}
if (pair.workspaceId != job.workspaceId) {
throw IllegalStateException(
"Connection pair $pairId does not belong to workspace ${job.workspaceId}"
)
}
val source = dataConnectionRepository.findById(pair.sourceConnectionId)
.orElseThrow { NoSuchElementException("Source connection not found: ${pair.sourceConnectionId}") }
if (source.workspaceId != job.workspaceId) {
throw IllegalStateException(
"Source connection ${pair.sourceConnectionId} does not belong to workspace ${job.workspaceId}"
)
}
val destination = dataConnectionRepository.findById(pair.destinationConnectionId)
.orElseThrow {
NoSuchElementException("Destination connection not found: ${pair.destinationConnectionId}")
}
if (destination.workspaceId != job.workspaceId) {
throw IllegalStateException(
"Destination connection ${pair.destinationConnectionId} does not belong to workspace ${job.workspaceId}"
)
}

Copilot uses AI. Check for mistakes.
Comment on lines +302 to +328
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ConnectionPair path does not validate that the selected connections actually match the expected roles (source isSource == true, destination isDestination == true) or that the IDs are distinct. Given the domain definition (“exactly one source and one destination”), add validation here (or in ConnectionPairService/persistence) to prevent misconfiguration leading to jobs reading from a destination or writing into a source.

Suggested change
}
val source = dataConnectionRepository.findById(pair.sourceConnectionId)
.orElseThrow { NoSuchElementException("Source connection not found: ${pair.sourceConnectionId}") }
val destination = dataConnectionRepository.findById(pair.destinationConnectionId)
.orElseThrow {
NoSuchElementException("Destination connection not found: ${pair.destinationConnectionId}")
}
}
if (pair.sourceConnectionId == pair.destinationConnectionId) {
throw IllegalStateException(
"Connection pair $pairId is invalid: source and destination connections must be distinct"
)
}
val source = dataConnectionRepository.findById(pair.sourceConnectionId)
.orElseThrow { NoSuchElementException("Source connection not found: ${pair.sourceConnectionId}") }
if (!source.isSource) {
throw IllegalStateException(
"Connection pair $pairId is invalid: connection ${pair.sourceConnectionId} is not a source connection"
)
}
val destination = dataConnectionRepository.findById(pair.destinationConnectionId)
.orElseThrow {
NoSuchElementException("Destination connection not found: ${pair.destinationConnectionId}")
}
if (!destination.isDestination) {
throw IllegalStateException(
"Connection pair $pairId is invalid: connection ${pair.destinationConnectionId} is not a destination connection"
)
}

Copilot uses AI. Check for mistakes.
if (destination.workspaceId != job.workspaceId) {
throw IllegalStateException(
"Destination connection ${pair.destinationConnectionId} does not belong to workspace ${job.workspaceId}"
)
}
if (!destination.isDestination) {
throw IllegalStateException(
"Connection pair $pairId is invalid: connection ${pair.destinationConnectionId} is not a destination connection"
)
}
return source to destination
}

val sourceConnections = dataConnectionRepository.findByWorkspaceId(job.workspaceId).filter { it.isSource }
val destConnections = dataConnectionRepository.findByWorkspaceId(job.workspaceId).filter { it.isDestination }

if (sourceConnections.isEmpty()) {
throw IllegalStateException("No source connection configured for workspace ${job.workspaceId}")
}
if (destConnections.isEmpty()) {
throw IllegalStateException("No destination connection configured for workspace ${job.workspaceId}")
}
return sourceConnections.first() to destConnections.first()
}

private fun Job.toResponse() = JobResponse(
id = id,
workspaceId = workspaceId,
Expand All @@ -307,7 +359,8 @@ class JobService(
completedAt = completedAt,
createdAt = createdAt,
errorMessage = errorMessage,
createdBy = createdBy
createdBy = createdBy,
connectionPairId = connectionPairId
)

private fun JobLog.toLogResponse() = JobLogResponse(
Expand Down
Loading
Loading