diff --git a/README.md b/README.md index 94fb95b..1b3f42c 100644 --- a/README.md +++ b/README.md @@ -178,6 +178,15 @@ The REST API is available at `http://localhost:8080/api`. Key endpoints: | GET | `/api/workspaces/{id}/tables/{table}/columns/{col}/comments` | List column comments | | POST | `/api/workspaces/{id}/tables/{table}/columns/{col}/comments` | Add comment | +### Connection Pairs +| Method | Path | Description | +|--------|------|-------------| +| POST | `/api/workspaces/{id}/connection-pairs` | Create a connection pair | +| GET | `/api/workspaces/{id}/connection-pairs` | List active connection pairs | +| GET | `/api/workspaces/{id}/connection-pairs/{pid}` | Get a connection pair | +| PUT | `/api/workspaces/{id}/connection-pairs/{pid}` | Update a connection pair | +| DELETE | `/api/workspaces/{id}/connection-pairs/{pid}` | Soft-delete a connection pair | + ### Jobs | Method | Path | Description | |--------|------|-------------| diff --git a/backend/src/main/kotlin/com/opendatamask/adapter/input/rest/ConnectionPairController.kt b/backend/src/main/kotlin/com/opendatamask/adapter/input/rest/ConnectionPairController.kt new file mode 100644 index 0000000..63546e6 --- /dev/null +++ b/backend/src/main/kotlin/com/opendatamask/adapter/input/rest/ConnectionPairController.kt @@ -0,0 +1,73 @@ +package com.opendatamask.adapter.input.rest + +import com.opendatamask.adapter.output.persistence.UserRepository +import com.opendatamask.application.service.ConnectionPairService +import com.opendatamask.application.service.PermissionService +import com.opendatamask.domain.model.WorkspacePermission +import com.opendatamask.domain.port.input.dto.ConnectionPairRequest +import com.opendatamask.domain.port.input.dto.ConnectionPairResponse +import jakarta.validation.Valid +import org.springframework.http.HttpStatus +import org.springframework.http.ResponseEntity +import org.springframework.security.core.annotation.AuthenticationPrincipal +import org.springframework.security.core.userdetails.UserDetails +import org.springframework.web.bind.annotation.* + +@RestController +@RequestMapping("/api/workspaces/{workspaceId}/connection-pairs") +class ConnectionPairController( + private val connectionPairService: ConnectionPairService, + private val permissionService: PermissionService, + private val userRepository: UserRepository +) { + + @PostMapping + fun createConnectionPair( + @PathVariable workspaceId: Long, + @Valid @RequestBody request: ConnectionPairRequest, + @AuthenticationPrincipal userDetails: UserDetails + ): ResponseEntity { + permissionService.requirePermission(getUserId(userDetails), workspaceId, WorkspacePermission.CONFIGURE_GENERATORS) + return ResponseEntity.status(HttpStatus.CREATED) + .body(connectionPairService.createConnectionPair(workspaceId, request)) + } + + @GetMapping("/{pairId}") + fun getConnectionPair( + @PathVariable workspaceId: Long, + @PathVariable pairId: Long + ): ResponseEntity { + return ResponseEntity.ok(connectionPairService.getConnectionPair(workspaceId, pairId)) + } + + @GetMapping + fun listConnectionPairs(@PathVariable workspaceId: Long): ResponseEntity> { + return ResponseEntity.ok(connectionPairService.listConnectionPairs(workspaceId)) + } + + @PutMapping("/{pairId}") + fun updateConnectionPair( + @PathVariable workspaceId: Long, + @PathVariable pairId: Long, + @Valid @RequestBody request: ConnectionPairRequest, + @AuthenticationPrincipal userDetails: UserDetails + ): ResponseEntity { + permissionService.requirePermission(getUserId(userDetails), workspaceId, WorkspacePermission.CONFIGURE_GENERATORS) + return ResponseEntity.ok(connectionPairService.updateConnectionPair(workspaceId, pairId, request)) + } + + @DeleteMapping("/{pairId}") + fun deleteConnectionPair( + @PathVariable workspaceId: Long, + @PathVariable pairId: Long, + @AuthenticationPrincipal userDetails: UserDetails + ): ResponseEntity { + permissionService.requirePermission(getUserId(userDetails), workspaceId, WorkspacePermission.CONFIGURE_GENERATORS) + connectionPairService.deleteConnectionPair(workspaceId, pairId) + return ResponseEntity.noContent().build() + } + + private fun getUserId(userDetails: UserDetails): Long = + userRepository.findByUsername(userDetails.username) + .orElseThrow { NoSuchElementException("User not found") }.id +} diff --git a/backend/src/main/kotlin/com/opendatamask/adapter/input/rest/JobController.kt b/backend/src/main/kotlin/com/opendatamask/adapter/input/rest/JobController.kt index a7d0477..aa784b6 100644 --- a/backend/src/main/kotlin/com/opendatamask/adapter/input/rest/JobController.kt +++ b/backend/src/main/kotlin/com/opendatamask/adapter/input/rest/JobController.kt @@ -1,6 +1,7 @@ package com.opendatamask.adapter.input.rest import com.opendatamask.domain.port.input.dto.JobLogResponse +import com.opendatamask.domain.port.input.dto.CreateJobRequest import com.opendatamask.domain.port.input.dto.JobResponse import com.opendatamask.domain.model.WorkspacePermission import com.opendatamask.adapter.output.persistence.UserRepository @@ -23,11 +24,12 @@ class JobController( @PostMapping fun createAndRunJob( @PathVariable workspaceId: Long, + @RequestBody(required = false) request: CreateJobRequest?, @AuthenticationPrincipal userDetails: UserDetails ): ResponseEntity { val userId = getUserId(userDetails) permissionService.requirePermission(userId, workspaceId, WorkspacePermission.RUN_JOBS) - val job = jobService.createJob(workspaceId, userId) + val job = jobService.createJob(workspaceId, userId, request?.connectionPairId) jobService.runJob(job.id) return ResponseEntity.status(HttpStatus.CREATED).body(job) } diff --git a/backend/src/main/kotlin/com/opendatamask/adapter/output/persistence/ConnectionPairRepository.kt b/backend/src/main/kotlin/com/opendatamask/adapter/output/persistence/ConnectionPairRepository.kt new file mode 100644 index 0000000..19754d6 --- /dev/null +++ b/backend/src/main/kotlin/com/opendatamask/adapter/output/persistence/ConnectionPairRepository.kt @@ -0,0 +1,20 @@ +package com.opendatamask.adapter.output.persistence + +import com.opendatamask.domain.model.ConnectionPair +import com.opendatamask.domain.port.output.ConnectionPairPort +import org.springframework.data.jpa.repository.JpaRepository +import org.springframework.data.jpa.repository.Query +import org.springframework.stereotype.Repository +import java.util.Optional + +@Repository +interface ConnectionPairRepository : JpaRepository, ConnectionPairPort { + override fun findById(id: Long): Optional + override fun findByWorkspaceId(workspaceId: Long): List + + @Query("SELECT c FROM ConnectionPair c WHERE c.workspaceId = :workspaceId AND c.deletedAt IS NULL") + override fun findActiveByWorkspaceId(workspaceId: Long): List + + override fun save(connectionPair: ConnectionPair): ConnectionPair + override fun deleteById(id: Long) +} diff --git a/backend/src/main/kotlin/com/opendatamask/application/service/ConnectionPairService.kt b/backend/src/main/kotlin/com/opendatamask/application/service/ConnectionPairService.kt new file mode 100644 index 0000000..e04209a --- /dev/null +++ b/backend/src/main/kotlin/com/opendatamask/application/service/ConnectionPairService.kt @@ -0,0 +1,114 @@ +package com.opendatamask.application.service + +import com.opendatamask.domain.port.input.ConnectionPairUseCase +import com.opendatamask.domain.port.output.ConnectionPairPort +import com.opendatamask.domain.port.output.DataConnectionPort +import com.opendatamask.domain.port.output.WorkspacePort +import com.opendatamask.domain.model.ConnectionPair +import com.opendatamask.domain.port.input.dto.ConnectionPairRequest +import com.opendatamask.domain.port.input.dto.ConnectionPairResponse +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional +import java.time.LocalDateTime + +@Service +class ConnectionPairService( + private val connectionPairRepository: ConnectionPairPort, + private val dataConnectionRepository: DataConnectionPort, + private val workspaceRepository: WorkspacePort +) : ConnectionPairUseCase { + + @Transactional + override fun createConnectionPair(workspaceId: Long, request: ConnectionPairRequest): ConnectionPairResponse { + workspaceRepository.findById(workspaceId) + .orElseThrow { NoSuchElementException("Workspace not found: $workspaceId") } + validateConnectionsBelongToWorkspace(workspaceId, request.sourceConnectionId, request.destinationConnectionId) + val pair = ConnectionPair( + workspaceId = workspaceId, + name = request.name, + description = request.description, + sourceConnectionId = request.sourceConnectionId, + destinationConnectionId = request.destinationConnectionId + ) + return connectionPairRepository.save(pair).toResponse() + } + + @Transactional(readOnly = true) + override fun getConnectionPair(workspaceId: Long, pairId: Long): ConnectionPairResponse { + return findActivePair(workspaceId, pairId).toResponse() + } + + @Transactional(readOnly = true) + override fun listConnectionPairs(workspaceId: Long): List { + return connectionPairRepository.findActiveByWorkspaceId(workspaceId).map { it.toResponse() } + } + + @Transactional + override fun updateConnectionPair( + workspaceId: Long, + pairId: Long, + request: ConnectionPairRequest + ): ConnectionPairResponse { + val pair = findActivePair(workspaceId, pairId) + validateConnectionsBelongToWorkspace(workspaceId, request.sourceConnectionId, request.destinationConnectionId) + pair.name = request.name + pair.description = request.description + pair.sourceConnectionId = request.sourceConnectionId + pair.destinationConnectionId = request.destinationConnectionId + return connectionPairRepository.save(pair).toResponse() + } + + @Transactional + override fun deleteConnectionPair(workspaceId: Long, pairId: Long) { + val pair = findActivePair(workspaceId, pairId) + pair.deletedAt = LocalDateTime.now() + connectionPairRepository.save(pair) + } + + private fun findActivePair(workspaceId: Long, pairId: Long): ConnectionPair { + val pair = connectionPairRepository.findById(pairId) + .orElseThrow { NoSuchElementException("Connection pair not found: $pairId") } + if (pair.workspaceId != workspaceId) { + throw NoSuchElementException("Connection pair $pairId does not belong to workspace $workspaceId") + } + if (pair.deletedAt != null) { + throw NoSuchElementException("Connection pair $pairId has been deleted") + } + return pair + } + + private fun validateConnectionsBelongToWorkspace(workspaceId: Long, sourceId: Long, destinationId: Long) { + if (sourceId == destinationId) { + throw IllegalArgumentException("Source and destination connections must be distinct") + } + val source = dataConnectionRepository.findById(sourceId) + .orElseThrow { NoSuchElementException("Source connection not found: $sourceId") } + if (source.workspaceId != workspaceId) { + throw IllegalArgumentException("Source connection $sourceId does not belong to workspace $workspaceId") + } + if (!source.isSource) { + throw IllegalArgumentException("Connection $sourceId is not configured as a source connection") + } + val destination = dataConnectionRepository.findById(destinationId) + .orElseThrow { NoSuchElementException("Destination connection not found: $destinationId") } + if (destination.workspaceId != workspaceId) { + throw IllegalArgumentException( + "Destination connection $destinationId does not belong to workspace $workspaceId" + ) + } + if (!destination.isDestination) { + throw IllegalArgumentException("Connection $destinationId is not configured as a destination connection") + } + } + + private fun ConnectionPair.toResponse() = ConnectionPairResponse( + id = id, + workspaceId = workspaceId, + name = name, + description = description, + sourceConnectionId = sourceConnectionId, + destinationConnectionId = destinationConnectionId, + createdAt = createdAt, + updatedAt = updatedAt + ) +} diff --git a/backend/src/main/kotlin/com/opendatamask/application/service/JobService.kt b/backend/src/main/kotlin/com/opendatamask/application/service/JobService.kt index b93ee0a..7d3c5c1 100644 --- a/backend/src/main/kotlin/com/opendatamask/application/service/JobService.kt +++ b/backend/src/main/kotlin/com/opendatamask/application/service/JobService.kt @@ -8,6 +8,7 @@ import com.opendatamask.domain.port.output.JobPort import com.opendatamask.domain.port.output.JobLogPort import com.opendatamask.domain.port.output.WorkspacePort import com.opendatamask.domain.port.output.DataConnectionPort +import com.opendatamask.domain.port.output.ConnectionPairPort import com.opendatamask.domain.port.output.TableConfigurationPort import com.opendatamask.domain.port.output.ColumnGeneratorPort import com.opendatamask.domain.port.output.SubsetTableConfigPort @@ -27,6 +28,7 @@ class JobService( private val jobLogRepository: JobLogPort, private val workspaceRepository: WorkspacePort, private val dataConnectionRepository: DataConnectionPort, + private val connectionPairRepository: ConnectionPairPort, private val tableConfigurationRepository: TableConfigurationPort, private val columnGeneratorRepository: ColumnGeneratorPort, private val encryptionPort: EncryptionPort, @@ -49,14 +51,15 @@ class JobService( private val logger = LoggerFactory.getLogger(JobService::class.java) @Transactional - override fun createJob(workspaceId: Long, createdBy: Long): JobResponse { + override fun createJob(workspaceId: Long, createdBy: Long, connectionPairId: Long?): JobResponse { workspaceRepository.findById(workspaceId) .orElseThrow { NoSuchElementException("Workspace not found: $workspaceId") } val job = Job( workspaceId = workspaceId, status = JobStatus.PENDING, - createdBy = createdBy + createdBy = createdBy, + connectionPairId = connectionPairId ) val saved = jobRepository.save(job) return saved.toResponse() @@ -98,8 +101,8 @@ class JobService( } @Transactional - override fun createAndRunJob(workspaceId: Long, createdBy: Long): JobResponse { - val response = createJob(workspaceId, createdBy) + override fun createAndRunJob(workspaceId: Long, createdBy: Long, connectionPairId: Long?): JobResponse { + val response = createJob(workspaceId, createdBy, connectionPairId) runJob(response.id) return response } @@ -116,20 +119,7 @@ class JobService( throw IllegalStateException("Job blocked: unresolved schema changes require attention before running") } - val sourceConnections = dataConnectionRepository.findByWorkspaceId(job.workspaceId) - .filter { it.isSource } - val destConnections = dataConnectionRepository.findByWorkspaceId(job.workspaceId) - .filter { it.isDestination } - - if (sourceConnections.isEmpty()) { - throw IllegalStateException("No source connection configured for workspace ${job.workspaceId}") - } - if (destConnections.isEmpty()) { - throw IllegalStateException("No destination connection configured for workspace ${job.workspaceId}") - } - - val sourceConn = sourceConnections.first() - val destConn = destConnections.first() + val (sourceConn, destConn) = resolveConnections(job) addLog(job.id, "Connecting to source: ${sourceConn.name}", LogLevel.INFO) val sourceConnector = connectorFactory.createConnector( @@ -299,6 +289,68 @@ class JobService( return job } + // Returns (sourceConnection, destinationConnection) for a job. When the job has a + // connectionPairId the pair's explicit connections are used; otherwise the workspace's + // first source/destination connections are used (backward-compatible behaviour). + private fun resolveConnections(job: Job): Pair { + val pairId = job.connectionPairId + if (pairId != null) { + val pair = connectionPairRepository.findById(pairId) + .orElseThrow { NoSuchElementException("Connection pair not found: $pairId") } + if (pair.deletedAt != null) { + throw IllegalStateException("Connection pair $pairId has been deleted") + } + if (pair.workspaceId != job.workspaceId) { + throw IllegalStateException( + "Connection pair $pairId does not belong to workspace ${job.workspaceId}" + ) + } + if (pair.sourceConnectionId == pair.destinationConnectionId) { + throw IllegalStateException( + "Connection pair $pairId is invalid: source and destination connections must be distinct" + ) + } + val source = dataConnectionRepository.findById(pair.sourceConnectionId) + .orElseThrow { NoSuchElementException("Source connection not found: ${pair.sourceConnectionId}") } + if (source.workspaceId != job.workspaceId) { + throw IllegalStateException( + "Source connection ${pair.sourceConnectionId} does not belong to workspace ${job.workspaceId}" + ) + } + if (!source.isSource) { + throw IllegalStateException( + "Connection pair $pairId is invalid: connection ${pair.sourceConnectionId} is not a source connection" + ) + } + val destination = dataConnectionRepository.findById(pair.destinationConnectionId) + .orElseThrow { + NoSuchElementException("Destination connection not found: ${pair.destinationConnectionId}") + } + if (destination.workspaceId != job.workspaceId) { + throw IllegalStateException( + "Destination connection ${pair.destinationConnectionId} does not belong to workspace ${job.workspaceId}" + ) + } + if (!destination.isDestination) { + throw IllegalStateException( + "Connection pair $pairId is invalid: connection ${pair.destinationConnectionId} is not a destination connection" + ) + } + return source to destination + } + + val sourceConnections = dataConnectionRepository.findByWorkspaceId(job.workspaceId).filter { it.isSource } + val destConnections = dataConnectionRepository.findByWorkspaceId(job.workspaceId).filter { it.isDestination } + + if (sourceConnections.isEmpty()) { + throw IllegalStateException("No source connection configured for workspace ${job.workspaceId}") + } + if (destConnections.isEmpty()) { + throw IllegalStateException("No destination connection configured for workspace ${job.workspaceId}") + } + return sourceConnections.first() to destConnections.first() + } + private fun Job.toResponse() = JobResponse( id = id, workspaceId = workspaceId, @@ -307,7 +359,8 @@ class JobService( completedAt = completedAt, createdAt = createdAt, errorMessage = errorMessage, - createdBy = createdBy + createdBy = createdBy, + connectionPairId = connectionPairId ) private fun JobLog.toLogResponse() = JobLogResponse( diff --git a/backend/src/main/kotlin/com/opendatamask/domain/model/ConnectionPair.kt b/backend/src/main/kotlin/com/opendatamask/domain/model/ConnectionPair.kt new file mode 100644 index 0000000..90da3a8 --- /dev/null +++ b/backend/src/main/kotlin/com/opendatamask/domain/model/ConnectionPair.kt @@ -0,0 +1,48 @@ +package com.opendatamask.domain.model + +import jakarta.persistence.* +import java.time.LocalDateTime + +@Entity +@Table(name = "connection_pairs") +class ConnectionPair( + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + val id: Long = 0, + + @Column(nullable = false) + var workspaceId: Long, + + @Column(nullable = false) + var name: String, + + @Column + var description: String? = null, + + @Column(nullable = false) + var sourceConnectionId: Long, + + @Column(nullable = false) + var destinationConnectionId: Long, + + @Column(nullable = false) + var createdAt: LocalDateTime = LocalDateTime.now(), + + @Column(nullable = false) + var updatedAt: LocalDateTime = LocalDateTime.now(), + + // Soft-delete timestamp; null means the pair is active. + @Column + var deletedAt: LocalDateTime? = null +) { + @PrePersist + fun prePersist() { + createdAt = LocalDateTime.now() + updatedAt = LocalDateTime.now() + } + + @PreUpdate + fun preUpdate() { + updatedAt = LocalDateTime.now() + } +} diff --git a/backend/src/main/kotlin/com/opendatamask/domain/model/Job.kt b/backend/src/main/kotlin/com/opendatamask/domain/model/Job.kt index 8e927d3..a0d1175 100644 --- a/backend/src/main/kotlin/com/opendatamask/domain/model/Job.kt +++ b/backend/src/main/kotlin/com/opendatamask/domain/model/Job.kt @@ -34,7 +34,12 @@ class Job( var errorMessage: String? = null, @Column(nullable = false) - var createdBy: Long + var createdBy: Long, + + // Optional reference to a ConnectionPair; when set, the job uses that pair's + // source and destination connections instead of the workspace defaults. + @Column + var connectionPairId: Long? = null ) { @PrePersist fun prePersist() { diff --git a/backend/src/main/kotlin/com/opendatamask/domain/port/input/ConnectionPairUseCase.kt b/backend/src/main/kotlin/com/opendatamask/domain/port/input/ConnectionPairUseCase.kt new file mode 100644 index 0000000..62b1362 --- /dev/null +++ b/backend/src/main/kotlin/com/opendatamask/domain/port/input/ConnectionPairUseCase.kt @@ -0,0 +1,12 @@ +package com.opendatamask.domain.port.input + +import com.opendatamask.domain.port.input.dto.ConnectionPairRequest +import com.opendatamask.domain.port.input.dto.ConnectionPairResponse + +interface ConnectionPairUseCase { + fun createConnectionPair(workspaceId: Long, request: ConnectionPairRequest): ConnectionPairResponse + fun getConnectionPair(workspaceId: Long, pairId: Long): ConnectionPairResponse + fun listConnectionPairs(workspaceId: Long): List + fun updateConnectionPair(workspaceId: Long, pairId: Long, request: ConnectionPairRequest): ConnectionPairResponse + fun deleteConnectionPair(workspaceId: Long, pairId: Long) +} diff --git a/backend/src/main/kotlin/com/opendatamask/domain/port/input/JobUseCase.kt b/backend/src/main/kotlin/com/opendatamask/domain/port/input/JobUseCase.kt index d6fe571..b470e5f 100644 --- a/backend/src/main/kotlin/com/opendatamask/domain/port/input/JobUseCase.kt +++ b/backend/src/main/kotlin/com/opendatamask/domain/port/input/JobUseCase.kt @@ -4,10 +4,10 @@ import com.opendatamask.domain.port.input.dto.JobLogResponse import com.opendatamask.domain.port.input.dto.JobResponse interface JobUseCase { - fun createJob(workspaceId: Long, createdBy: Long): JobResponse + fun createJob(workspaceId: Long, createdBy: Long, connectionPairId: Long? = null): JobResponse fun getJob(workspaceId: Long, jobId: Long): JobResponse fun listJobs(workspaceId: Long): List fun getJobLogs(workspaceId: Long, jobId: Long): List fun cancelJob(workspaceId: Long, jobId: Long): JobResponse - fun createAndRunJob(workspaceId: Long, createdBy: Long): JobResponse + fun createAndRunJob(workspaceId: Long, createdBy: Long, connectionPairId: Long? = null): JobResponse } diff --git a/backend/src/main/kotlin/com/opendatamask/domain/port/input/dto/ConnectionPairDto.kt b/backend/src/main/kotlin/com/opendatamask/domain/port/input/dto/ConnectionPairDto.kt new file mode 100644 index 0000000..377718b --- /dev/null +++ b/backend/src/main/kotlin/com/opendatamask/domain/port/input/dto/ConnectionPairDto.kt @@ -0,0 +1,29 @@ +package com.opendatamask.domain.port.input.dto + +import jakarta.validation.constraints.NotBlank +import jakarta.validation.constraints.NotNull +import java.time.LocalDateTime + +data class ConnectionPairRequest( + @field:NotBlank(message = "Connection pair name is required") + val name: String, + + val description: String? = null, + + @field:NotNull(message = "Source connection ID is required") + val sourceConnectionId: Long, + + @field:NotNull(message = "Destination connection ID is required") + val destinationConnectionId: Long +) + +data class ConnectionPairResponse( + val id: Long, + val workspaceId: Long, + val name: String, + val description: String?, + val sourceConnectionId: Long, + val destinationConnectionId: Long, + val createdAt: LocalDateTime, + val updatedAt: LocalDateTime +) diff --git a/backend/src/main/kotlin/com/opendatamask/domain/port/input/dto/JobDto.kt b/backend/src/main/kotlin/com/opendatamask/domain/port/input/dto/JobDto.kt index 579ac40..f1dc098 100644 --- a/backend/src/main/kotlin/com/opendatamask/domain/port/input/dto/JobDto.kt +++ b/backend/src/main/kotlin/com/opendatamask/domain/port/input/dto/JobDto.kt @@ -12,7 +12,8 @@ data class JobResponse( val completedAt: LocalDateTime?, val createdAt: LocalDateTime, val errorMessage: String?, - val createdBy: Long + val createdBy: Long, + val connectionPairId: Long? = null ) data class JobLogResponse( @@ -22,3 +23,9 @@ data class JobLogResponse( val level: LogLevel, val timestamp: LocalDateTime ) + +// Optional request body for job creation; all fields are nullable for backward compatibility. +// When connectionPairId is null/omitted, the job falls back to the workspace-wide source/destination lookup. +data class CreateJobRequest( + val connectionPairId: Long? = null +) diff --git a/backend/src/main/kotlin/com/opendatamask/domain/port/output/ConnectionPairPort.kt b/backend/src/main/kotlin/com/opendatamask/domain/port/output/ConnectionPairPort.kt new file mode 100644 index 0000000..c01c936 --- /dev/null +++ b/backend/src/main/kotlin/com/opendatamask/domain/port/output/ConnectionPairPort.kt @@ -0,0 +1,12 @@ +package com.opendatamask.domain.port.output + +import com.opendatamask.domain.model.ConnectionPair +import java.util.Optional + +interface ConnectionPairPort { + fun findById(id: Long): Optional + fun findByWorkspaceId(workspaceId: Long): List + fun findActiveByWorkspaceId(workspaceId: Long): List + fun save(connectionPair: ConnectionPair): ConnectionPair + fun deleteById(id: Long) +} diff --git a/backend/src/test/kotlin/com/opendatamask/application/service/ConnectionPairServiceTest.kt b/backend/src/test/kotlin/com/opendatamask/application/service/ConnectionPairServiceTest.kt new file mode 100644 index 0000000..21a0385 --- /dev/null +++ b/backend/src/test/kotlin/com/opendatamask/application/service/ConnectionPairServiceTest.kt @@ -0,0 +1,273 @@ +package com.opendatamask.application.service + +import com.opendatamask.domain.port.output.ConnectionPairPort +import com.opendatamask.domain.port.output.DataConnectionPort +import com.opendatamask.domain.port.output.WorkspacePort +import com.opendatamask.domain.model.ConnectionPair +import com.opendatamask.domain.model.ConnectionType +import com.opendatamask.domain.model.DataConnection +import com.opendatamask.domain.model.Workspace +import com.opendatamask.domain.port.input.dto.ConnectionPairRequest +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.InjectMocks +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.* +import java.time.LocalDateTime +import java.util.Optional + +@ExtendWith(MockitoExtension::class) +class ConnectionPairServiceTest { + + @Mock private lateinit var connectionPairRepository: ConnectionPairPort + @Mock private lateinit var dataConnectionRepository: DataConnectionPort + @Mock private lateinit var workspaceRepository: WorkspacePort + + @InjectMocks + private lateinit var service: ConnectionPairService + + private fun makeWorkspace(id: Long = 1L) = Workspace( + id = id, name = "WS", ownerId = 1L, + createdAt = LocalDateTime.now(), updatedAt = LocalDateTime.now() + ) + + private fun makeDataConnection( + id: Long = 1L, + workspaceId: Long = 1L, + isSource: Boolean = (id == 1L), + isDestination: Boolean = (id == 2L) + ) = DataConnection( + id = id, workspaceId = workspaceId, name = "conn-$id", + type = ConnectionType.POSTGRESQL, connectionString = "enc_conn", + isSource = isSource, isDestination = isDestination + ) + + private fun makeConnectionPair( + id: Long = 10L, + workspaceId: Long = 1L, + deletedAt: LocalDateTime? = null + ) = ConnectionPair( + id = id, workspaceId = workspaceId, + name = "Pair A", description = "Test pair", + sourceConnectionId = 1L, destinationConnectionId = 2L, + createdAt = LocalDateTime.now(), updatedAt = LocalDateTime.now(), + deletedAt = deletedAt + ) + + private fun makeRequest( + name: String = "Pair A", + sourceConnectionId: Long = 1L, + destinationConnectionId: Long = 2L + ) = ConnectionPairRequest( + name = name, + description = "Test pair", + sourceConnectionId = sourceConnectionId, + destinationConnectionId = destinationConnectionId + ) + + // ── createConnectionPair ─────────────────────────────────────────────── + + @Test + fun `createConnectionPair saves pair and returns response`() { + val saved = makeConnectionPair() + whenever(workspaceRepository.findById(1L)).thenReturn(Optional.of(makeWorkspace())) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.of(makeDataConnection(1L))) + whenever(dataConnectionRepository.findById(2L)).thenReturn(Optional.of(makeDataConnection(2L))) + whenever(connectionPairRepository.save(any())).thenReturn(saved) + + val response = service.createConnectionPair(1L, makeRequest()) + + assertEquals(10L, response.id) + assertEquals(1L, response.workspaceId) + assertEquals("Pair A", response.name) + assertEquals(1L, response.sourceConnectionId) + assertEquals(2L, response.destinationConnectionId) + verify(connectionPairRepository).save(any()) + } + + @Test + fun `createConnectionPair throws when workspace not found`() { + whenever(workspaceRepository.findById(99L)).thenReturn(Optional.empty()) + + assertThrows { service.createConnectionPair(99L, makeRequest()) } + verify(connectionPairRepository, never()).save(any()) + } + + @Test + fun `createConnectionPair throws when source connection not found`() { + whenever(workspaceRepository.findById(1L)).thenReturn(Optional.of(makeWorkspace())) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.empty()) + + assertThrows { service.createConnectionPair(1L, makeRequest()) } + } + + @Test + fun `createConnectionPair throws when destination connection not found`() { + whenever(workspaceRepository.findById(1L)).thenReturn(Optional.of(makeWorkspace())) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.of(makeDataConnection(1L))) + whenever(dataConnectionRepository.findById(2L)).thenReturn(Optional.empty()) + + assertThrows { service.createConnectionPair(1L, makeRequest()) } + } + + @Test + fun `createConnectionPair throws when source connection belongs to different workspace`() { + val foreignConn = makeDataConnection(1L, workspaceId = 99L) + whenever(workspaceRepository.findById(1L)).thenReturn(Optional.of(makeWorkspace())) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.of(foreignConn)) + + assertThrows { service.createConnectionPair(1L, makeRequest()) } + } + + @Test + fun `createConnectionPair throws when destination connection belongs to different workspace`() { + val foreignDest = makeDataConnection(2L, workspaceId = 99L) + whenever(workspaceRepository.findById(1L)).thenReturn(Optional.of(makeWorkspace())) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.of(makeDataConnection(1L))) + whenever(dataConnectionRepository.findById(2L)).thenReturn(Optional.of(foreignDest)) + + assertThrows { service.createConnectionPair(1L, makeRequest()) } + } + + @Test + fun `createConnectionPair throws when source and destination are the same connection`() { + whenever(workspaceRepository.findById(1L)).thenReturn(Optional.of(makeWorkspace())) + + assertThrows { + service.createConnectionPair(1L, makeRequest(sourceConnectionId = 1L, destinationConnectionId = 1L)) + } + } + + @Test + fun `createConnectionPair throws when source connection is not marked as source`() { + val notSourceConn = makeDataConnection(1L, isSource = false, isDestination = false) + whenever(workspaceRepository.findById(1L)).thenReturn(Optional.of(makeWorkspace())) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.of(notSourceConn)) + + assertThrows { service.createConnectionPair(1L, makeRequest()) } + } + + @Test + fun `createConnectionPair throws when destination connection is not marked as destination`() { + val notDestConn = makeDataConnection(2L, isSource = false, isDestination = false) + whenever(workspaceRepository.findById(1L)).thenReturn(Optional.of(makeWorkspace())) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.of(makeDataConnection(1L))) + whenever(dataConnectionRepository.findById(2L)).thenReturn(Optional.of(notDestConn)) + + assertThrows { service.createConnectionPair(1L, makeRequest()) } + } + + // ── getConnectionPair ────────────────────────────────────────────────── + + @Test + fun `getConnectionPair returns active pair for matching workspace`() { + val pair = makeConnectionPair(id = 10L, workspaceId = 1L) + whenever(connectionPairRepository.findById(10L)).thenReturn(Optional.of(pair)) + + val response = service.getConnectionPair(1L, 10L) + + assertEquals(10L, response.id) + assertEquals(1L, response.workspaceId) + } + + @Test + fun `getConnectionPair throws when pair not found`() { + whenever(connectionPairRepository.findById(99L)).thenReturn(Optional.empty()) + + assertThrows { service.getConnectionPair(1L, 99L) } + } + + @Test + fun `getConnectionPair throws when pair belongs to different workspace`() { + val pair = makeConnectionPair(id = 10L, workspaceId = 99L) + whenever(connectionPairRepository.findById(10L)).thenReturn(Optional.of(pair)) + + assertThrows { service.getConnectionPair(1L, 10L) } + } + + @Test + fun `getConnectionPair throws when pair is soft-deleted`() { + val pair = makeConnectionPair(id = 10L, deletedAt = LocalDateTime.now()) + whenever(connectionPairRepository.findById(10L)).thenReturn(Optional.of(pair)) + + assertThrows { service.getConnectionPair(1L, 10L) } + } + + // ── listConnectionPairs ──────────────────────────────────────────────── + + @Test + fun `listConnectionPairs returns only active pairs for workspace`() { + val pairs = listOf(makeConnectionPair(id = 10L), makeConnectionPair(id = 11L)) + whenever(connectionPairRepository.findActiveByWorkspaceId(1L)).thenReturn(pairs) + + val result = service.listConnectionPairs(1L) + + assertEquals(2, result.size) + } + + @Test + fun `listConnectionPairs returns empty list when none found`() { + whenever(connectionPairRepository.findActiveByWorkspaceId(1L)).thenReturn(emptyList()) + + val result = service.listConnectionPairs(1L) + + assertTrue(result.isEmpty()) + } + + // ── updateConnectionPair ─────────────────────────────────────────────── + + @Test + fun `updateConnectionPair updates fields and saves`() { + val pair = makeConnectionPair(id = 10L, workspaceId = 1L) + val updateRequest = makeRequest(name = "Updated Pair") + whenever(connectionPairRepository.findById(10L)).thenReturn(Optional.of(pair)) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.of(makeDataConnection(1L))) + whenever(dataConnectionRepository.findById(2L)).thenReturn(Optional.of(makeDataConnection(2L))) + whenever(connectionPairRepository.save(any())).thenAnswer { it.arguments[0] as ConnectionPair } + + val response = service.updateConnectionPair(1L, 10L, updateRequest) + + assertEquals("Updated Pair", response.name) + verify(connectionPairRepository).save(any()) + } + + @Test + fun `updateConnectionPair throws when pair not found`() { + whenever(connectionPairRepository.findById(99L)).thenReturn(Optional.empty()) + + assertThrows { service.updateConnectionPair(1L, 99L, makeRequest()) } + } + + // ── deleteConnectionPair ─────────────────────────────────────────────── + + @Test + fun `deleteConnectionPair sets deletedAt for soft delete`() { + val pair = makeConnectionPair(id = 10L, workspaceId = 1L) + whenever(connectionPairRepository.findById(10L)).thenReturn(Optional.of(pair)) + whenever(connectionPairRepository.save(any())).thenAnswer { it.arguments[0] as ConnectionPair } + + service.deleteConnectionPair(1L, 10L) + + val captor = argumentCaptor() + verify(connectionPairRepository).save(captor.capture()) + assertNotNull(captor.firstValue.deletedAt) + } + + @Test + fun `deleteConnectionPair throws when pair not found`() { + whenever(connectionPairRepository.findById(99L)).thenReturn(Optional.empty()) + + assertThrows { service.deleteConnectionPair(1L, 99L) } + } + + @Test + fun `deleteConnectionPair throws when pair already deleted`() { + val deletedPair = makeConnectionPair(id = 10L, deletedAt = LocalDateTime.now()) + whenever(connectionPairRepository.findById(10L)).thenReturn(Optional.of(deletedPair)) + + assertThrows { service.deleteConnectionPair(1L, 10L) } + } +} diff --git a/backend/src/test/kotlin/com/opendatamask/application/service/JobServiceTest.kt b/backend/src/test/kotlin/com/opendatamask/application/service/JobServiceTest.kt index 1930bd3..118e498 100644 --- a/backend/src/test/kotlin/com/opendatamask/application/service/JobServiceTest.kt +++ b/backend/src/test/kotlin/com/opendatamask/application/service/JobServiceTest.kt @@ -21,6 +21,7 @@ class JobServiceTest { @Mock private lateinit var jobLogRepository: JobLogPort @Mock private lateinit var workspaceRepository: WorkspacePort @Mock private lateinit var dataConnectionRepository: DataConnectionPort + @Mock private lateinit var connectionPairRepository: ConnectionPairPort @Mock private lateinit var tableConfigurationRepository: TableConfigurationPort @Mock private lateinit var columnGeneratorRepository: ColumnGeneratorPort @Mock private lateinit var EncryptionPort: EncryptionPort @@ -81,7 +82,7 @@ class JobServiceTest { whenever(workspaceRepository.findById(1L)).thenReturn(Optional.of(makeWorkspace())) whenever(jobRepository.save(any())).thenReturn(savedJob) - val response = jobService.createJob(1L, 1L) + val response = jobService.createJob(1L, 1L, null) assertEquals(1L, response.id) assertEquals(JobStatus.PENDING, response.status) @@ -92,7 +93,7 @@ class JobServiceTest { fun `createJob throws when workspace not found`() { whenever(workspaceRepository.findById(999L)).thenReturn(Optional.empty()) - assertThrows { jobService.createJob(999L, 1L) } + assertThrows { jobService.createJob(999L, 1L, null) } } // ── getJob ───────────────────────────────────────────────────────────── @@ -399,6 +400,167 @@ class JobServiceTest { verify(jobRepository, atLeastOnce()).save(argThat { status == JobStatus.FAILED }) verify(postJobActionService).triggerActions(any()) } + + // ── resolveConnections — ConnectionPair path ─────────────────────────── + + private fun makeConnectionPair( + id: Long = 5L, + workspaceId: Long = 1L, + sourceConnectionId: Long = 1L, + destinationConnectionId: Long = 2L, + deletedAt: LocalDateTime? = null + ) = ConnectionPair( + id = id, workspaceId = workspaceId, + name = "Pair A", + sourceConnectionId = sourceConnectionId, + destinationConnectionId = destinationConnectionId, + createdAt = LocalDateTime.now(), updatedAt = LocalDateTime.now(), + deletedAt = deletedAt + ) + + @Test + fun `runJob uses ConnectionPair source and destination when connectionPairId is set`() { + val pair = makeConnectionPair(id = 5L, sourceConnectionId = 1L, destinationConnectionId = 2L) + val job = makeJob(id = 1L, workspaceId = 1L).apply { connectionPairId = 5L } + val sourceConn = makeDataConnection(id = 1L, workspaceId = 1L, isSource = true) + val destConn = makeDataConnection(id = 2L, workspaceId = 1L, isDestination = true) + val mockSrc = mock() + val mockDst = mock() + + stubJobSaveAndLog(job) + whenever(connectionPairRepository.findById(5L)).thenReturn(Optional.of(pair)) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.of(sourceConn)) + whenever(dataConnectionRepository.findById(2L)).thenReturn(Optional.of(destConn)) + whenever(EncryptionPort.decrypt(any())).thenReturn("decrypted") + whenever(connectorFactory.createConnector(any(), any(), anyOrNull(), anyOrNull(), anyOrNull())) + .thenReturn(mockSrc, mockDst) + whenever(mockSrc.testConnection()).thenReturn(true) + whenever(mockDst.testConnection()).thenReturn(true) + whenever(tableConfigurationRepository.findByWorkspaceId(1L)).thenReturn(emptyList()) + + jobService.runJob(1L) + + verify(connectionPairRepository).findById(5L) + // Pair connections are loaded by ID, not by workspace scan + verify(dataConnectionRepository, never()).findByWorkspaceId(any()) + verify(jobRepository, atLeastOnce()).save(argThat { status == JobStatus.COMPLETED }) + } + + @Test + fun `runJob sets FAILED when ConnectionPair has been soft-deleted`() { + val deletedPair = makeConnectionPair(id = 5L, deletedAt = LocalDateTime.now()) + val job = makeJob(id = 1L, workspaceId = 1L).apply { connectionPairId = 5L } + + stubJobSaveAndLog(job) + whenever(connectionPairRepository.findById(5L)).thenReturn(Optional.of(deletedPair)) + + jobService.runJob(1L) + + verify(jobRepository, atLeastOnce()).save(argThat { status == JobStatus.FAILED }) + } + + @Test + fun `runJob sets FAILED when ConnectionPair source connection is missing`() { + val pair = makeConnectionPair(id = 5L, sourceConnectionId = 99L, destinationConnectionId = 2L) + val job = makeJob(id = 1L, workspaceId = 1L).apply { connectionPairId = 5L } + + stubJobSaveAndLog(job) + whenever(connectionPairRepository.findById(5L)).thenReturn(Optional.of(pair)) + whenever(dataConnectionRepository.findById(99L)).thenReturn(Optional.empty()) + + jobService.runJob(1L) + + verify(jobRepository, atLeastOnce()).save(argThat { status == JobStatus.FAILED }) + } + + @Test + fun `runJob sets FAILED when ConnectionPair destination connection is missing`() { + val pair = makeConnectionPair(id = 5L, sourceConnectionId = 1L, destinationConnectionId = 99L) + val job = makeJob(id = 1L, workspaceId = 1L).apply { connectionPairId = 5L } + val sourceConn = makeDataConnection(id = 1L, workspaceId = 1L, isSource = true) + + stubJobSaveAndLog(job) + whenever(connectionPairRepository.findById(5L)).thenReturn(Optional.of(pair)) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.of(sourceConn)) + whenever(dataConnectionRepository.findById(99L)).thenReturn(Optional.empty()) + + jobService.runJob(1L) + + verify(jobRepository, atLeastOnce()).save(argThat { status == JobStatus.FAILED }) + } + + @Test + fun `runJob sets FAILED when ConnectionPair belongs to a different workspace`() { + val foreignPair = makeConnectionPair(id = 5L, workspaceId = 99L) + val job = makeJob(id = 1L, workspaceId = 1L).apply { connectionPairId = 5L } + + stubJobSaveAndLog(job) + whenever(connectionPairRepository.findById(5L)).thenReturn(Optional.of(foreignPair)) + + jobService.runJob(1L) + + verify(jobRepository, atLeastOnce()).save(argThat { status == JobStatus.FAILED }) + } + + @Test + fun `runJob sets FAILED when ConnectionPair source and destination are the same connection`() { + val samePair = makeConnectionPair(id = 5L, sourceConnectionId = 1L, destinationConnectionId = 1L) + val job = makeJob(id = 1L, workspaceId = 1L).apply { connectionPairId = 5L } + + stubJobSaveAndLog(job) + whenever(connectionPairRepository.findById(5L)).thenReturn(Optional.of(samePair)) + + jobService.runJob(1L) + + verify(jobRepository, atLeastOnce()).save(argThat { status == JobStatus.FAILED }) + } + + @Test + fun `runJob sets FAILED when ConnectionPair source connection is not marked as source`() { + val pair = makeConnectionPair(id = 5L, sourceConnectionId = 1L, destinationConnectionId = 2L) + val job = makeJob(id = 1L, workspaceId = 1L).apply { connectionPairId = 5L } + val notSource = makeDataConnection(id = 1L, workspaceId = 1L, isSource = false) + + stubJobSaveAndLog(job) + whenever(connectionPairRepository.findById(5L)).thenReturn(Optional.of(pair)) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.of(notSource)) + + jobService.runJob(1L) + + verify(jobRepository, atLeastOnce()).save(argThat { status == JobStatus.FAILED }) + } + + @Test + fun `runJob sets FAILED when ConnectionPair destination connection is not marked as destination`() { + val pair = makeConnectionPair(id = 5L, sourceConnectionId = 1L, destinationConnectionId = 2L) + val job = makeJob(id = 1L, workspaceId = 1L).apply { connectionPairId = 5L } + val sourceConn = makeDataConnection(id = 1L, workspaceId = 1L, isSource = true) + val notDest = makeDataConnection(id = 2L, workspaceId = 1L, isDestination = false) + + stubJobSaveAndLog(job) + whenever(connectionPairRepository.findById(5L)).thenReturn(Optional.of(pair)) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.of(sourceConn)) + whenever(dataConnectionRepository.findById(2L)).thenReturn(Optional.of(notDest)) + + jobService.runJob(1L) + + verify(jobRepository, atLeastOnce()).save(argThat { status == JobStatus.FAILED }) + } + + @Test + fun `runJob sets FAILED when ConnectionPair source connection belongs to different workspace`() { + val pair = makeConnectionPair(id = 5L, workspaceId = 1L, sourceConnectionId = 1L, destinationConnectionId = 2L) + val job = makeJob(id = 1L, workspaceId = 1L).apply { connectionPairId = 5L } + val foreignSource = makeDataConnection(id = 1L, workspaceId = 99L, isSource = true) + + stubJobSaveAndLog(job) + whenever(connectionPairRepository.findById(5L)).thenReturn(Optional.of(pair)) + whenever(dataConnectionRepository.findById(1L)).thenReturn(Optional.of(foreignSource)) + + jobService.runJob(1L) + + verify(jobRepository, atLeastOnce()).save(argThat { status == JobStatus.FAILED }) + } } diff --git a/docs/user-guide.md b/docs/user-guide.md index ccdbb1a..db8fa1b 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -171,11 +171,70 @@ Example API request body to create an Azure SQL connection: A **Workspace** is an isolated configuration scope. Each workspace has: - One or more **Data Connections** (source and target databases) +- One or more **Connection Pairs** linking a source and destination together - **Table Configurations** that define which tables to process and how - **Column Generators** that specify what fake data to produce per column - **Team members** with ADMIN or USER roles - An optional **parent workspace** for configuration inheritance +### Connection Pairs + +A **Connection Pair** groups exactly one **Source** `DataConnection` and one **Destination** `DataConnection` under a workspace. Using pairs gives you explicit, named source→destination routes and makes it easy to run the same masking rules against different database environments (e.g. staging vs. QA). + +#### Key properties + +| Field | Description | +|---|---| +| `name` | Human-readable name for the pair (required) | +| `description` | Optional notes | +| `sourceConnectionId` | ID of the source `DataConnection` | +| `destinationConnectionId` | ID of the destination `DataConnection` | + +Both connections must belong to the same workspace. + +#### Soft delete + +Deleting a connection pair performs a **soft delete** — the record is retained in the database with a `deletedAt` timestamp, preserving audit history. Soft-deleted pairs are excluded from `list` responses but can still be found via raw database queries or audit tooling. + +#### API endpoints + +| Method | Path | Description | +|--------|------|-------------| +| `POST` | `/api/workspaces/{id}/connection-pairs` | Create a connection pair | +| `GET` | `/api/workspaces/{id}/connection-pairs` | List active connection pairs | +| `GET` | `/api/workspaces/{id}/connection-pairs/{pairId}` | Get a single pair | +| `PUT` | `/api/workspaces/{id}/connection-pairs/{pairId}` | Update a pair | +| `DELETE` | `/api/workspaces/{id}/connection-pairs/{pairId}` | Soft-delete a pair | + +#### Example: create a pair + +```http +POST /api/workspaces/1/connection-pairs +Content-Type: application/json + +{ + "name": "Prod → Staging", + "description": "Copy masked production data to staging environment", + "sourceConnectionId": 3, + "destinationConnectionId": 7 +} +``` + +#### Running a job with a specific pair + +When triggering a masking job you can optionally pass a `connectionPairId` in the request body. The engine will then use that pair's source and destination connections instead of searching for the workspace defaults. + +```http +POST /api/workspaces/1/jobs +Content-Type: application/json + +{ + "connectionPairId": 5 +} +``` + +Omitting the body (or setting `connectionPairId` to `null`) falls back to the workspace-wide source / destination connection selection, preserving full backward compatibility. + ### Masking Modes | Mode | Description | diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 29868dc..6c819c1 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -220,6 +220,26 @@ export interface ColumnGeneratorRequest { parameters?: Record } +// ── Connection Pair ─────────────────────────────────────────────────────── + +export interface ConnectionPair { + id: number + workspaceId: number + name: string + description: string | null + sourceConnectionId: number + destinationConnectionId: number + createdAt: string + updatedAt: string +} + +export interface ConnectionPairRequest { + name: string + description?: string | null + sourceConnectionId: number + destinationConnectionId: number +} + // ── Job ─────────────────────────────────────────────────────────────────── export enum JobStatus { @@ -247,12 +267,13 @@ export interface Job { rowsProcessed: number createdAt: string updatedAt: string + connectionPairId?: number | null } export interface JobRequest { - name: string - sourceConnectionId: number - targetConnectionId: number + // When provided, the job uses the specified ConnectionPair's source and destination connections. + // When null or omitted, the system falls back to the workspace-wide source/destination lookup. + connectionPairId?: number | null } // ── Job Logs ────────────────────────────────────────────────────────────── diff --git a/frontend/src/views/JobsView.vue b/frontend/src/views/JobsView.vue index 5f2f02a..23129a1 100644 --- a/frontend/src/views/JobsView.vue +++ b/frontend/src/views/JobsView.vue @@ -18,7 +18,7 @@ const error = ref('') // Create modal const showCreateModal = ref(false) -const createForm = ref({ name: '', sourceConnectionId: 0, targetConnectionId: 0 }) +const createForm = ref({ connectionPairId: null }) const createError = ref('') const creating = ref(false)