diff --git a/admin-app/src/main/resources/application.conf b/admin-app/src/main/resources/application.conf index 6f45a4545..ff84a86c7 100644 --- a/admin-app/src/main/resources/application.conf +++ b/admin-app/src/main/resources/application.conf @@ -94,5 +94,11 @@ drill { fetchSize = ${?DRILL_ETL_FETCH_SIZE} batchSize = 1000 batchSize = ${?DRILL_ETL_BATCH_SIZE} + extractionLimit = 1000000 + extractionLimit = ${?DRILL_ETL_EXTRACTION_LIMIT} + transformationBufferSize = 2000 + transformationBufferSize = ${?DRILL_ETL_TRANSFORMATION_BUFFER_SIZE} + loggingFrequency = 10 + loggingFrequency = ${?DRILL_ETL_LOGGING_FREQUENCY} } } diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/DataExtractor.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/DataExtractor.kt index d1e15d25e..7a27c3c72 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/DataExtractor.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/DataExtractor.kt @@ -30,14 +30,14 @@ import java.time.Instant * - how data will be transformed, * - how metrics will be stored. */ -interface DataExtractor { +interface DataExtractor { val name: String suspend fun extract( groupId: String, sinceTimestamp: Instant, untilTimestamp: Instant, emitter: FlowCollector, - onExtractCompleted: suspend (EtlExtractingResult) -> Unit + onExtractingProgress: suspend (EtlExtractingResult) -> Unit ) } diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/DataLoader.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/DataLoader.kt index 09cccfb34..1c431fbdf 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/DataLoader.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/DataLoader.kt @@ -29,14 +29,15 @@ import java.time.Instant * - Uses `batchSize` to control transaction size and DB pressure. * - Is resilient to partial failures */ -interface DataLoader { +interface DataLoader { val name: String suspend fun load( groupId: String, sinceTimestamp: Instant, untilTimestamp: Instant, collector: Flow, - onLoadCompleted: suspend (EtlLoadingResult) -> Unit + onLoadingProgress: suspend (EtlLoadingResult) -> Unit = {}, + onStatusChanged: suspend (EtlStatus) -> Unit = {}, ): EtlLoadingResult suspend fun deleteAll(groupId: String) diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/DataTransformer.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/DataTransformer.kt new file mode 100644 index 000000000..0d81fb2e1 --- /dev/null +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/DataTransformer.kt @@ -0,0 +1,36 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.etl + +import kotlinx.coroutines.flow.Flow + +interface DataTransformer { + val name: String + suspend fun transform( + groupId: String, + collector: Flow, + ): Flow +} + +class NopTransformer : DataTransformer { + override val name: String = "nop-transformer" + override suspend fun transform( + groupId: String, + collector: Flow, + ): Flow = collector +} + +val untypedNopTransformer = NopTransformer() diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlExtractingResult.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlExtractingResult.kt index b83997b82..8cac9a812 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlExtractingResult.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlExtractingResult.kt @@ -16,7 +16,6 @@ package com.epam.drill.admin.etl class EtlExtractingResult( - val success: Boolean, - val duration: Long, + val duration: Long = 0, val errorMessage: String? = null ) \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlLoadingResult.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlLoadingResult.kt index 0bbb37d63..b13212a6b 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlLoadingResult.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlLoadingResult.kt @@ -20,17 +20,15 @@ import java.time.Instant data class EtlLoadingResult( val lastProcessedAt: Instant, val processedRows: Long = 0L, - val status: EtlStatus, val duration: Long? = null, val errorMessage: String? = null ) : Comparable { val isFailed - get() = status == EtlStatus.FAILED + get() = errorMessage != null operator fun plus(other: EtlLoadingResult): EtlLoadingResult { val failed = this.isFailed || other.isFailed return EtlLoadingResult( - status = if (!failed) other.status else EtlStatus.FAILED, lastProcessedAt = if (!failed) other.lastProcessedAt else this.lastProcessedAt, processedRows = this.processedRows + other.processedRows, duration = listOfNotNull(this.duration, other.duration).sum(), diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlMetadata.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlMetadata.kt index 53edb4a88..4d4d5dcb5 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlMetadata.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlMetadata.kt @@ -24,9 +24,10 @@ data class EtlMetadata( val loaderName: String, val lastProcessedAt: Instant, val lastRunAt: Instant, - val lastDuration: Long, - val lastRowsProcessed: Long, + val lastLoadDuration: Long = 0L, + val lastExtractDuration: Long = 0L, + val lastRowsProcessed: Long = 0L, val status: EtlStatus, - val errorMessage: String?, + val errorMessage: String? = null, ) diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlMetadataRepository.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlMetadataRepository.kt index e5417771c..5a7709b71 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlMetadataRepository.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlMetadataRepository.kt @@ -15,13 +15,35 @@ */ package com.epam.drill.admin.etl +import java.time.Instant + interface EtlMetadataRepository { suspend fun getAllMetadata(groupId: String): List - suspend fun getAllMetadataByExtractor(groupId: String, pipelineName: String, extractorName: String): List - suspend fun getMetadata(groupId: String, pipelineName: String, extractorName: String, loaderName: String): EtlMetadata? + suspend fun getAllMetadataByExtractor( + groupId: String, + pipelineName: String, + extractorName: String + ): List + + suspend fun getMetadata( + groupId: String, + pipelineName: String, + extractorName: String, + loaderName: String + ): EtlMetadata? + suspend fun saveMetadata(metadata: EtlMetadata) - suspend fun accumulateMetadata(metadata: EtlMetadata) + suspend fun accumulateMetadataByLoader( + groupId: String, pipelineName: String, extractorName: String, loaderName: String, + lastProcessedAt: Instant? = null, + status: EtlStatus? = null, loadDuration: Long = 0L, rowsProcessed: Long = 0L, + errorMessage: String? = null + ) suspend fun deleteMetadataByPipeline(groupId: String, pipelineName: String) - suspend fun accumulateMetadataDurationByExtractor(groupId: String, pipelineName: String, extractorName: String, duration: Long) + suspend fun accumulateMetadataByExtractor( + groupId: String, pipelineName: String, extractorName: String, + status: EtlStatus? = null, extractDuration: Long = 0L, + errorMessage: String? = null + ) } diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlPipeline.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlPipeline.kt index c98db6145..c96577245 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlPipeline.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlPipeline.kt @@ -35,16 +35,17 @@ import java.time.Instant * - failures in loaders or extractor are propagated to the pipeline, * - pipeline may cancel child coroutines and report status to `EtlOrchestrator`. */ -interface EtlPipeline { +interface EtlPipeline { val name: String - val extractor: DataExtractor - val loaders: List> + val extractor: DataExtractor + val loaders: List, DataLoader>> suspend fun execute( groupId: String, sinceTimestampPerLoader: Map, untilTimestamp: Instant, - onExtractCompleted: suspend (EtlExtractingResult) -> Unit = {}, - onLoadCompleted: suspend (loaderName: String, result: EtlLoadingResult) -> Unit = { _, _ -> }, + onExtractingProgress: suspend (EtlExtractingResult) -> Unit = {}, + onLoadingProgress: suspend (loaderName: String, result: EtlLoadingResult) -> Unit = { _, _ -> }, + onStatusChanged: suspend (loaderName: String, status: EtlStatus) -> Unit = { _, _ -> }, ): EtlProcessingResult suspend fun cleanUp(groupId: String) diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlRow.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlRow.kt new file mode 100644 index 000000000..a390df0ff --- /dev/null +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlRow.kt @@ -0,0 +1,22 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.etl + +import java.time.Instant + +open class EtlRow( + val timestamp: Instant, +) \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/UntypedRow.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/UntypedRow.kt new file mode 100644 index 000000000..b54c96713 --- /dev/null +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/UntypedRow.kt @@ -0,0 +1,23 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.etl + +import java.time.Instant + +class UntypedRow( + timestamp: Instant, + map: Map +): EtlRow(timestamp), Map by HashMap(map) \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/flow/LruMap.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/flow/LruMap.kt new file mode 100644 index 000000000..1e7cfc7f6 --- /dev/null +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/flow/LruMap.kt @@ -0,0 +1,55 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.etl.flow + +/** + * A simple LRU (Least Recently Used) map implementation that evicts the oldest entry + * when the maximum size is reached. + */ +class LruMap( + private val maxSize: Int, + private val onEvict: (K, V) -> Unit +) { + private val map = LinkedHashMap(16, 0.75f, true) + + val size: Int + get() = map.size + + fun compute(key: K, update: (V?) -> V) { + map[key] = update(map[key]) + if (map.size >= maxSize) { + evictOldest() + } + } + + fun evictOldest() { + val it = map.entries.iterator() + if (!it.hasNext()) return + + val entry = it.next() + it.remove() + onEvict(entry.key, entry.value) + } + + fun evictAll() { + val it = map.entries.iterator() + while (it.hasNext()) { + val entry = it.next() + it.remove() + onEvict(entry.key, entry.value) + } + } +} \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt index 30cdde1dc..a946ece4d 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt @@ -17,6 +17,7 @@ package com.epam.drill.admin.etl.impl import com.epam.drill.admin.etl.DataLoader import com.epam.drill.admin.etl.EtlLoadingResult +import com.epam.drill.admin.etl.EtlRow import com.epam.drill.admin.etl.EtlStatus import com.epam.drill.admin.etl.flow.StoppableFlow import com.epam.drill.admin.etl.flow.stoppable @@ -24,10 +25,13 @@ import kotlinx.coroutines.flow.Flow import mu.KotlinLogging import java.time.Instant import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong +import kotlin.time.Duration.Companion.seconds -abstract class BatchDataLoader( +abstract class BatchDataLoader( override val name: String, - open val batchSize: Int = 1000 + open val batchSize: Int = 1000, + open val loggingFrequency: Int = 10, ) : DataLoader { private val logger = KotlinLogging.logger {} @@ -43,82 +47,106 @@ abstract class BatchDataLoader( sinceTimestamp: Instant, untilTimestamp: Instant, collector: Flow, - onLoadCompleted: suspend (EtlLoadingResult) -> Unit + onLoadingProgress: suspend (EtlLoadingResult) -> Unit, + onStatusChanged: suspend (EtlStatus) -> Unit ): EtlLoadingResult { - var result = EtlLoadingResult(status = EtlStatus.LOADING, lastProcessedAt = sinceTimestamp) + var result = EtlLoadingResult(lastProcessedAt = sinceTimestamp) val flow = collector.stoppable() val batchNo = AtomicInteger(0) + val loadedRows = AtomicLong(0) + val skippedRows = AtomicLong(0) val buffer = mutableListOf() + var skippedRowsForUpdate = 0L var lastLoadedTimestamp: Instant = sinceTimestamp var previousTimestamp: Instant? = null suspend fun StoppableFlow.stopWithMessage(message: String) { stop() result += EtlLoadingResult( - status = EtlStatus.FAILED, errorMessage = message, lastProcessedAt = lastLoadedTimestamp ).also { - onLoadCompleted(it) + onLoadingProgress(it) } } - logger.debug { "ETL loader [$name] loading rows..." } - flow.collect { row -> - val currentTimestamp = getLastExtractedTimestamp(row) - if (currentTimestamp == null) { - flow.stopWithMessage("Could not extract timestamp from the data row: $row") - return@collect - } - - if (previousTimestamp != null && currentTimestamp < previousTimestamp) { - flow.stopWithMessage("Timestamps in the extracted data are not in ascending order: $currentTimestamp < $previousTimestamp") - return@collect - } - - // Skip rows that are already processed - if (currentTimestamp <= sinceTimestamp) { - previousTimestamp = currentTimestamp - return@collect - } + trackProgressOf { + flow.collect { row -> + if (loadedRows.get() == 0L && skippedRows.get() == 0L) { + logger.debug { "ETL loader [$name] for group [$groupId] loading rows..." } + onStatusChanged(EtlStatus.LOADING) + } + val currentTimestamp = row.timestamp + if (previousTimestamp != null && currentTimestamp < previousTimestamp) { + flow.stopWithMessage("Timestamps in the extracted data are not in ascending order: $currentTimestamp < $previousTimestamp") + return@collect + } - if (currentTimestamp > untilTimestamp) { - flow.stop() - return@collect - } + // Skip rows that are already processed + if (currentTimestamp <= sinceTimestamp) { + previousTimestamp = currentTimestamp + skippedRows.incrementAndGet() + return@collect + } - // Skip rows that are not processable - if (!isProcessable(row)) { - previousTimestamp = currentTimestamp - return@collect - } + if (currentTimestamp > untilTimestamp) { + flow.stop() + return@collect + } - // If timestamp changed and buffer is full, flush the buffer - if (previousTimestamp != null && currentTimestamp != previousTimestamp) { - if (buffer.size >= batchSize) { + // If timestamp changed and buffer is full, flush the buffer + if (previousTimestamp != null && currentTimestamp != previousTimestamp && buffer.size >= batchSize) { result += flushBuffer(groupId, buffer, batchNo) { batch -> if (batch.success) { - lastLoadedTimestamp = previousTimestamp ?: throw IllegalStateException("Previous timestamp is null") + lastLoadedTimestamp = + previousTimestamp ?: throw IllegalStateException("Previous timestamp is null") } EtlLoadingResult( - status = if (batch.success) EtlStatus.LOADING else EtlStatus.FAILED, errorMessage = if (!batch.success) result.errorMessage else null, lastProcessedAt = lastLoadedTimestamp, processedRows = if (batch.success) batch.rowsLoaded else 0L, duration = batch.duration ).also { - onLoadCompleted(it) + onLoadingProgress(it) } } } - } - if (result.isFailed) { - flow.stop() - return@collect + if (result.isFailed) { + flow.stop() + return@collect + } + + // Skip rows that are not processable + if (!isProcessable(row)) { + previousTimestamp = currentTimestamp + skippedRows.incrementAndGet() + skippedRowsForUpdate++ + // If timestamp changed and there are a lot of skipped rows, update progress + if (previousTimestamp != null && currentTimestamp != previousTimestamp && buffer.isEmpty() && skippedRowsForUpdate >= batchSize) { + onLoadingProgress( + EtlLoadingResult( + lastProcessedAt = previousTimestamp ?: throw IllegalStateException("Previous timestamp is null"), + processedRows = 0, + ) + ) + skippedRowsForUpdate = 0 + } + return@collect + } + + buffer += row + previousTimestamp = currentTimestamp + loadedRows.incrementAndGet() + } + }.every(loggingFrequency.seconds) { + if (loadedRows.get() > 0L || skippedRows.get() > 0L) { + logger.debug { + "ETL loader [$name] for group [$groupId] loaded ${loadedRows.get()} rows" + + ", batch: ${batchNo.get()}" + + ", skipped rows: ${skippedRows.get()}" + } } - buffer += row - previousTimestamp = currentTimestamp } if (!result.isFailed) { @@ -126,34 +154,34 @@ abstract class BatchDataLoader( // Commit any remaining rows in the buffer result += flushBuffer(groupId, buffer, batchNo) { batch -> if (batch.success) { - lastLoadedTimestamp = previousTimestamp ?: throw IllegalStateException("Previous timestamp is null") + lastLoadedTimestamp = untilTimestamp } EtlLoadingResult( - status = if (batch.success) EtlStatus.SUCCESS else EtlStatus.FAILED, - errorMessage = if (!batch.success) result.errorMessage else null, + errorMessage = if (!batch.success) batch.errorMessage else null, lastProcessedAt = lastLoadedTimestamp, processedRows = if (batch.success) batch.rowsLoaded else 0, duration = batch.duration ).also { - onLoadCompleted(it) + onLoadingProgress(it) } } } else { - // Finalize with success - lastLoadedTimestamp = previousTimestamp ?: sinceTimestamp + // Update last processed timestamp even if no rows were left in the buffer result += EtlLoadingResult( - status = EtlStatus.SUCCESS, - lastProcessedAt = lastLoadedTimestamp + lastProcessedAt = untilTimestamp ).also { - onLoadCompleted(it) + onLoadingProgress(it) } } + onStatusChanged(EtlStatus.SUCCESS) + } + logger.debug { + val errors = result.errorMessage?.let { ", errors: $it" } ?: "" + "ETL loader [$name] for group [$groupId] complete loading for ${result.processedRows} rows" + errors } - logger.debug { "ETL loader [$name] complete loading for ${result.processedRows} rows, status: ${result.status}" } return result } - abstract fun getLastExtractedTimestamp(args: T): Instant? abstract fun isProcessable(args: T): Boolean abstract suspend fun loadBatch(groupId: String, batch: List, batchNo: Int): BatchResult @@ -170,6 +198,6 @@ abstract class BatchDataLoader( buffer.clear() onBatchCompleted(it) }.also { - logger.trace { "ETL loader [$name] loaded ${it.processedRows} rows in ${it.duration ?: 0}ms, batch: $batchNo" } + logger.trace { "ETL loader [$name] for group [$groupId] loaded ${it.processedRows} rows in ${it.duration ?: 0}ms, batch: $batchNo" } } } \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlMetadataRepositoryImpl.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlMetadataRepositoryImpl.kt index 97f3276ac..84afe64c0 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlMetadataRepositoryImpl.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlMetadataRepositoryImpl.kt @@ -20,6 +20,7 @@ import com.epam.drill.admin.etl.EtlMetadata import com.epam.drill.admin.etl.EtlMetadataRepository import com.epam.drill.admin.etl.EtlStatus import org.jetbrains.exposed.sql.Database +import org.jetbrains.exposed.sql.Op import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.SortOrder import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq @@ -32,6 +33,7 @@ import org.jetbrains.exposed.sql.selectAll import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction import org.jetbrains.exposed.sql.update import org.jetbrains.exposed.sql.upsert +import java.time.Instant class EtlMetadataRepositoryImpl( private val database: Database, @@ -79,8 +81,10 @@ class EtlMetadataRepositoryImpl( override suspend fun saveMetadata(metadata: EtlMetadata): Unit = newSuspendedTransaction(db = database) { metadataTable.upsert( onUpdateExclude = listOf( - metadataTable.duration, + metadataTable.loadDuration, + metadataTable.extractDuration, metadataTable.rowsProcessed, + metadataTable.lastProcessedAt, metadataTable.createdAt ), ) { @@ -91,54 +95,83 @@ class EtlMetadataRepositoryImpl( it[status] = metadata.status.name it[lastProcessedAt] = metadata.lastProcessedAt it[lastRunAt] = metadata.lastRunAt - it[lastDuration] = metadata.lastDuration + it[lastLoadDuration] = metadata.lastLoadDuration + it[lastExtractDuration] = metadata.lastExtractDuration it[lastRowsProcessed] = metadata.lastRowsProcessed it[errorMessage] = metadata.errorMessage it[updatedAt] = CurrentDateTime } } - override suspend fun accumulateMetadata(metadata: EtlMetadata): Unit = newSuspendedTransaction(db = database) { - metadataTable.update(where = { - (metadataTable.groupId eq metadata.groupId) and - (metadataTable.pipelineName eq metadata.pipelineName) and - (metadataTable.extractorName eq metadata.extractorName) and - (metadataTable.loaderName eq metadata.loaderName) - }) { - it[status] = metadata.status.name - it[lastProcessedAt] = metadata.lastProcessedAt - it[lastRunAt] = metadata.lastRunAt - it[lastDuration] = lastDuration + metadata.lastDuration - it[lastRowsProcessed] = lastRowsProcessed + metadata.lastRowsProcessed - it[duration] = duration + metadata.lastDuration - it[rowsProcessed] = rowsProcessed + metadata.lastRowsProcessed - it[errorMessage] = metadata.errorMessage - it[updatedAt] = CurrentDateTime + override suspend fun accumulateMetadataByLoader( + groupId: String, + pipelineName: String, + extractorName: String, + loaderName: String, + lastProcessedAt: Instant?, + status: EtlStatus?, + loadDuration: Long, + rowsProcessed: Long, + errorMessage: String? + ) { + newSuspendedTransaction(db = database) { + metadataTable.update(where = { + (metadataTable.groupId eq groupId) and + (metadataTable.pipelineName eq pipelineName) and + (metadataTable.extractorName eq extractorName) and + (metadataTable.loaderName eq loaderName) + }) { + if (errorMessage != null) { + it[metadataTable.errorMessage] = errorMessage + it[metadataTable.status] = EtlStatus.FAILED.name + } + if (status != null) { + it[metadataTable.status] = status.name + } + if (lastProcessedAt != null) { + it[metadataTable.lastProcessedAt] = lastProcessedAt + } + it[metadataTable.lastLoadDuration] = metadataTable.lastLoadDuration + loadDuration + it[metadataTable.lastRowsProcessed] = metadataTable.lastRowsProcessed + rowsProcessed + it[metadataTable.loadDuration] = metadataTable.loadDuration + loadDuration + it[metadataTable.rowsProcessed] = metadataTable.rowsProcessed + rowsProcessed + it[updatedAt] = CurrentDateTime + } } } override suspend fun deleteMetadataByPipeline(groupId: String, pipelineName: String) { newSuspendedTransaction(db = database) { - metadataTable.deleteWhere { + metadataTable.deleteWhere { (metadataTable.groupId eq groupId) and (metadataTable.pipelineName eq pipelineName) } } } - override suspend fun accumulateMetadataDurationByExtractor( + override suspend fun accumulateMetadataByExtractor( groupId: String, pipelineName: String, extractorName: String, - duration: Long + status: EtlStatus?, + extractDuration: Long, + errorMessage: String? ) { newSuspendedTransaction(db = database) { metadataTable.update(where = { (metadataTable.groupId eq groupId) and - (metadataTable.pipelineName eq pipelineName) and - (metadataTable.extractorName eq extractorName) + (metadataTable.pipelineName eq pipelineName) and + (metadataTable.extractorName eq extractorName) and + (status?.let { metadataTable.status neq EtlStatus.FAILED.name } ?: Op.TRUE) }) { - it[lastDuration] = lastDuration + duration - it[metadataTable.duration] = metadataTable.duration + duration + if (errorMessage != null) { + it[metadataTable.errorMessage] = errorMessage + it[metadataTable.status] = EtlStatus.FAILED.name + } + if (status != null) { + it[metadataTable.status] = status.name + } + it[metadataTable.lastExtractDuration] = metadataTable.lastExtractDuration + extractDuration + it[metadataTable.extractDuration] = metadataTable.extractDuration + extractDuration it[metadataTable.updatedAt] = CurrentDateTime } } @@ -151,7 +184,8 @@ class EtlMetadataRepositoryImpl( loaderName = row[metadataTable.loaderName], lastProcessedAt = row[metadataTable.lastProcessedAt], lastRunAt = row[metadataTable.lastRunAt], - lastDuration = row[metadataTable.lastDuration], + lastLoadDuration = row[metadataTable.lastLoadDuration], + lastExtractDuration = row[metadataTable.lastExtractDuration], lastRowsProcessed = row[metadataTable.lastRowsProcessed], status = EtlStatus.valueOf(row[metadataTable.status]), errorMessage = row[metadataTable.errorMessage], diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt index e57271448..a1a2df82e 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt @@ -15,70 +15,92 @@ */ package com.epam.drill.admin.etl.impl +import com.epam.drill.admin.etl.EtlExtractingResult +import com.epam.drill.admin.etl.EtlLoadingResult import com.epam.drill.admin.etl.EtlMetadata import com.epam.drill.admin.etl.EtlMetadataRepository import com.epam.drill.admin.etl.EtlOrchestrator import com.epam.drill.admin.etl.EtlPipeline import com.epam.drill.admin.etl.EtlProcessingResult +import com.epam.drill.admin.etl.EtlRow import com.epam.drill.admin.etl.EtlStatus +import io.ktor.util.pipeline.Pipeline import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import mu.KotlinLogging import java.time.Instant import java.util.Collections import kotlin.system.measureTimeMillis +import kotlin.time.Duration.Companion.minutes open class EtlOrchestratorImpl( override val name: String, - open val pipelines: List>, + open val pipelines: List>, open val metadataRepository: EtlMetadataRepository, ) : EtlOrchestrator { private val logger = KotlinLogging.logger {} - override suspend fun run(groupId: String, initTimestamp: Instant): List = withContext(Dispatchers.IO) { - logger.info("ETL [$name] starting for group [$groupId] with init timestamp $initTimestamp...") - val results = Collections.synchronizedList(mutableListOf()) - val duration = measureTimeMillis { - pipelines.map { pipeline -> - async { - results += runPipeline(groupId, pipeline, initTimestamp) + override suspend fun run(groupId: String, initTimestamp: Instant): List = + withContext(Dispatchers.IO) { + logger.info("ETL [$name] for group [$groupId] is starting with init timestamp $initTimestamp...") + val results = Collections.synchronizedList(mutableListOf()) + val duration = measureTimeMillis { + trackProgressOf { + pipelines.map { pipeline -> + async { + results += runPipeline(groupId, pipeline, initTimestamp) + } + }.awaitAll() + }.every(1.minutes) { + logger.info { "ETL [$name] for group [$groupId] is still running..." } } - }.awaitAll() - } - logger.info { - val rowsProcessed = results.sumOf { it.rowsProcessed } - val failures = results.count { it.status == EtlStatus.FAILED } - if (rowsProcessed == 0L && failures == 0) - "ETL [$name] for group [$groupId] completed in ${duration}ms, no new rows" - else - "ETL [$name] for group [$groupId] completed in ${duration}ms, rows processed: $rowsProcessed, failures: $failures" + } + logger.info { + val rowsProcessed = results.sumOf { it.rowsProcessed } + val failures = results.count { it.status == EtlStatus.FAILED } + if (rowsProcessed == 0L && failures == 0) + "ETL [$name] for group [$groupId] completed in ${duration}ms, no new rows" + else + "ETL [$name] for group [$groupId] completed in ${duration}ms, rows processed: $rowsProcessed, failures: $failures" + } + return@withContext results } - return@withContext results - } - override suspend fun rerun(groupId: String, initTimestamp: Instant, withDataDeletion: Boolean): List = + override suspend fun rerun( + groupId: String, + initTimestamp: Instant, + withDataDeletion: Boolean + ): List = withContext(Dispatchers.IO) { - logger.info { "ETL [$name] deleting all metadata for group [$groupId] for rerun." } + logger.info { "ETL [$name] for group [$groupId] is deleting all metadata for rerun..." } pipelines.map { it.name }.forEach { pipelineName -> metadataRepository.deleteMetadataByPipeline(groupId, pipelineName) } - logger.info { "ETL [$name] deleted all metadata for group [$groupId] for rerun." } + logger.info { "ETL [$name] for group [$groupId] deleted all metadata for rerun." } if (withDataDeletion) { - logger.info { "ETL [$name] deleting all data for group [$groupId] for rerun." } + logger.info { "ETL [$name] for group [$groupId] is deleting all data for rerun..." } pipelines.forEach { it.cleanUp(groupId) } - logger.info { "ETL [$name] deleted all data for group [$groupId] for rerun." } + logger.info { "ETL [$name] for group [$groupId] deleted all data for rerun." } } val results = run(groupId, initTimestamp) return@withContext results } - private suspend fun runPipeline(groupId: String, pipeline: EtlPipeline<*>, initTimestamp: Instant): EtlProcessingResult { + private suspend fun runPipeline( + groupId: String, + pipeline: EtlPipeline<*, *>, + initTimestamp: Instant + ): EtlProcessingResult = coroutineScope { val snapshotTime = Instant.now() val metadata = metadataRepository.getAllMetadataByExtractor(groupId, pipeline.name, pipeline.extractor.name) .associateBy { it.loaderName } - val loaderNames = pipeline.loaders.map { it.name }.toSet() + val loaderNames = pipeline.loaders.map { it.second.name }.toSet() val timestampPerLoader = loaderNames.associateWith { (metadata[it]?.lastProcessedAt ?: initTimestamp) } try { @@ -92,9 +114,6 @@ open class EtlOrchestratorImpl( status = EtlStatus.EXTRACTING, lastProcessedAt = timestampPerLoader[loader] ?: initTimestamp, lastRunAt = snapshotTime, - lastDuration = 0L, - lastRowsProcessed = 0L, - errorMessage = null ) ) } @@ -102,43 +121,44 @@ open class EtlOrchestratorImpl( groupId = groupId, sinceTimestampPerLoader = timestampPerLoader, untilTimestamp = snapshotTime, - onExtractCompleted = { result -> - try { - metadataRepository.accumulateMetadataDurationByExtractor( - groupId, - pipeline.name, - pipeline.extractor.name, - result.duration - ) - } catch (e: Throwable) { - logger.warn("ETL pipeline [${pipeline.name}] failed to update metadata: ${e.message}", e) - } + onExtractingProgress = { result -> + progressExtracting( + groupId = groupId, + pipelineName = pipeline.name, + extractorName = pipeline.extractor.name, + result = result + ) + }, + onLoadingProgress = { loaderName, result -> + progressLoading( + groupId = groupId, + pipelineName = pipeline.name, + extractorName = pipeline.extractor.name, + loaderName = loaderName, + result = result + ) }, - onLoadCompleted = { loaderName, result -> + onStatusChanged = { loaderName, status -> try { - metadataRepository.accumulateMetadata( - EtlMetadata( - groupId = groupId, - pipelineName = pipeline.name, - extractorName = pipeline.extractor.name, - loaderName = loaderName, - status = result.status, - lastProcessedAt = result.lastProcessedAt, - errorMessage = result.errorMessage, - lastRunAt = snapshotTime, - lastDuration = result.duration ?: 0L, - lastRowsProcessed = result.processedRows - ) + metadataRepository.accumulateMetadataByLoader( + groupId = groupId, + pipelineName = pipeline.name, + extractorName = pipeline.extractor.name, + loaderName = loaderName, + status = status ) } catch (e: Throwable) { - logger.warn("ETL pipeline [${pipeline.name}] failed to update metadata: ${e.message}", e) + logger.warn( + "ETL pipeline [${pipeline.name}] for group [$groupId] failed to update loading status: ${e.message}", + e + ) } } ) - return pipelineResult + return@coroutineScope pipelineResult } catch (e: Throwable) { - logger.error("ETL pipeline [${pipeline.name}] failed: ${e.message}", e) - return EtlProcessingResult( + logger.error("ETL pipeline [${pipeline.name}] for group [$groupId] failed: ${e.message}", e) + return@coroutineScope EtlProcessingResult( groupId = groupId, pipelineName = pipeline.name, lastProcessedAt = initTimestamp, @@ -148,5 +168,51 @@ open class EtlOrchestratorImpl( ) } } + + suspend fun progressExtracting(groupId: String, + pipelineName: String, + extractorName: String, + result: EtlExtractingResult) { + try { + metadataRepository.accumulateMetadataByExtractor( + groupId = groupId, + pipelineName = pipelineName, + extractorName = extractorName, + errorMessage = result.errorMessage, + extractDuration = result.duration + ) + } catch (e: Throwable) { + logger.warn( + "ETL pipeline [${pipelineName}] for group [$groupId] failed to update extracting progress: ${e.message}", + e + ) + } + } + + suspend fun progressLoading( + groupId: String, + pipelineName: String, + extractorName: String, + loaderName: String, + result: EtlLoadingResult + ) { + try { + metadataRepository.accumulateMetadataByLoader( + groupId = groupId, + pipelineName = pipelineName, + extractorName = extractorName, + loaderName = loaderName, + errorMessage = result.errorMessage, + lastProcessedAt = result.lastProcessedAt, + loadDuration = result.duration ?: 0L, + rowsProcessed = result.processedRows + ) + } catch (e: Throwable) { + logger.warn( + "ETL pipeline [$pipelineName] for group [$groupId] failed to update loading progress: ${e.message}", + e + ) + } + } } diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt index 5742e9019..0a4e2df31 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt @@ -17,11 +17,14 @@ package com.epam.drill.admin.etl.impl import com.epam.drill.admin.etl.DataExtractor import com.epam.drill.admin.etl.DataLoader +import com.epam.drill.admin.etl.DataTransformer import com.epam.drill.admin.etl.EtlExtractingResult import com.epam.drill.admin.etl.EtlPipeline import com.epam.drill.admin.etl.EtlProcessingResult import com.epam.drill.admin.etl.EtlLoadingResult +import com.epam.drill.admin.etl.EtlRow import com.epam.drill.admin.etl.EtlStatus +import com.epam.drill.admin.etl.NopTransformer import com.epam.drill.admin.etl.flow.CompletableSharedFlow import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred @@ -34,40 +37,81 @@ import mu.KotlinLogging import java.time.Instant import kotlin.system.measureTimeMillis -class EtlPipelineImpl( +class EtlPipelineImpl( override val name: String, override val extractor: DataExtractor, - override val loaders: List>, + override val loaders: List, DataLoader>>, private val bufferSize: Int = 2000 -) : EtlPipeline { +) : EtlPipeline { private val logger = KotlinLogging.logger {} + companion object { + fun singleLoader( + name: String, + extractor: DataExtractor, + loader: DataLoader, + bufferSize: Int = 2000 + ) = EtlPipelineImpl( + name = name, + extractor = extractor, + loaders = listOf(NopTransformer() to loader), + bufferSize = bufferSize + ) + + fun singleLoader( + name: String, + extractor: DataExtractor, + transformer: DataTransformer, + loader: DataLoader, + bufferSize: Int = 2000 + ) = EtlPipelineImpl( + name = name, + extractor = extractor, + loaders = listOf(transformer to loader), + bufferSize = bufferSize + ) + + fun multiLoaders( + name: String, + extractor: DataExtractor, + loaders: List>, + bufferSize: Int = 2000 + ) = EtlPipelineImpl( + name = name, + extractor = extractor, + loaders = loaders.map { NopTransformer() to it }, + bufferSize = bufferSize + ) + } + override suspend fun execute( groupId: String, sinceTimestampPerLoader: Map, untilTimestamp: Instant, - onExtractCompleted: suspend (EtlExtractingResult) -> Unit, - onLoadCompleted: suspend (String, EtlLoadingResult) -> Unit + onExtractingProgress: suspend (EtlExtractingResult) -> Unit, + onLoadingProgress: suspend (String, EtlLoadingResult) -> Unit, + onStatusChanged: suspend (loaderName: String, status: EtlStatus) -> Unit ): EtlProcessingResult = withContext(Dispatchers.IO) { val minProcessedTime = sinceTimestampPerLoader.values.min() - logger.debug { "ETL pipeline [$name] starting for group [$groupId] since $minProcessedTime..." } - var results = EtlLoadingResult(status = EtlStatus.EXTRACTING, lastProcessedAt = minProcessedTime) + logger.debug { "ETL pipeline [$name] for group [$groupId] starting since $minProcessedTime..." } + var results = EtlLoadingResult(lastProcessedAt = minProcessedTime) val duration = measureTimeMillis { results = processEtl( groupId, minProcessedTime, sinceTimestampPerLoader, untilTimestamp, - onExtractCompleted, - onLoadCompleted + onExtractingProgress, + onLoadingProgress, + onStatusChanged ) } logger.debug { - if (results.processedRows == 0L && results.status == EtlStatus.SUCCESS) { + if (results.processedRows == 0L && !results.isFailed) { "ETL pipeline [$name] for group [$groupId] completed in ${duration}ms, no new rows" } else { val errors = results.errorMessage?.let { ", errors: $it" } ?: "" - "ETL pipeline [$name] for group [$groupId] completed in ${duration}ms, rows processed: ${results.processedRows}, status: ${results.status}" + errors + "ETL pipeline [$name] for group [$groupId] completed in ${duration}ms, rows processed: ${results.processedRows}" + errors } } EtlProcessingResult( @@ -75,13 +119,13 @@ class EtlPipelineImpl( pipelineName = name, lastProcessedAt = results.lastProcessedAt, rowsProcessed = results.processedRows, - status = results.status, - errorMessage = results.errorMessage + errorMessage = results.errorMessage, + status = if (results.isFailed) EtlStatus.FAILED else EtlStatus.SUCCESS ) } override suspend fun cleanUp(groupId: String) { - loaders.forEach { it.deleteAll(groupId) } + loaders.forEach { it.second.deleteAll(groupId) } } private suspend fun CoroutineScope.processEtl( @@ -89,8 +133,9 @@ class EtlPipelineImpl( extractorSinceTimestamp: Instant, sinceTimestampPerLoader: Map, untilTimestamp: Instant, - onExtractCompleted: suspend (EtlExtractingResult) -> Unit, - onLoadCompleted: suspend (String, EtlLoadingResult) -> Unit + onExtractingProgress: suspend (EtlExtractingResult) -> Unit, + onLoadingProgress: suspend (loaderName: String, EtlLoadingResult) -> Unit, + onStatusChanged: suspend (loaderName: String, status: EtlStatus) -> Unit ): EtlLoadingResult { val flow = CompletableSharedFlow( replay = 0, @@ -98,10 +143,19 @@ class EtlPipelineImpl( ) return loaders.map { loader -> async { - loadData(groupId, loader, sinceTimestampPerLoader[loader.name] ?: extractorSinceTimestamp, untilTimestamp, flow, onLoadCompleted) + loadData( + groupId, + loader.first, + loader.second, + sinceTimestampPerLoader[loader.second.name] ?: extractorSinceTimestamp, + untilTimestamp, + flow, + onLoadingProgress, + onStatusChanged + ) } }.also { jobs -> - extractData(groupId, flow, jobs, extractorSinceTimestamp, untilTimestamp, onExtractCompleted) + extractData(groupId, flow, jobs, extractorSinceTimestamp, untilTimestamp, onExtractingProgress) }.awaitAll().max() } @@ -111,12 +165,19 @@ class EtlPipelineImpl( jobs: List>, sinceTimestamp: Instant, untilTimestamp: Instant, - onExtractCompleted: suspend (EtlExtractingResult) -> Unit, + onExtractingProgress: suspend (EtlExtractingResult) -> Unit, ) { try { // Start extractor only after all jobs are ready to consume data otherwise data may be lost flow.waitForSubscribers(jobs.count { it.isActive }) - extractor.extract(groupId, sinceTimestamp, untilTimestamp, flow, onExtractCompleted) + extractor.extract(groupId, sinceTimestamp, untilTimestamp, flow, onExtractingProgress) + } catch (e: Throwable) { + logger.debug(e) { "ETL pipeline [$name] for group [$groupId] failed for extractor [${extractor.name}]: ${e.message}" } + onExtractingProgress( + EtlExtractingResult( + errorMessage = "Error during extracting data with extractor ${extractor.name}: ${e.message ?: e.javaClass.simpleName}", + ) + ) } finally { // Complete the flow to signal jobs that extraction is done flow.complete() @@ -125,19 +186,26 @@ class EtlPipelineImpl( private suspend fun loadData( groupId: String, - loader: DataLoader, + transformer: DataTransformer, + loader: DataLoader, sinceTimestamp: Instant, untilTimestamp: Instant, flow: Flow, - onLoadCompleted: suspend (String, EtlLoadingResult) -> Unit + onLoadingProgress: suspend (loaderName: String, result: EtlLoadingResult) -> Unit, + onStatusChanged: suspend (loaderName: String, status: EtlStatus) -> Unit ): EtlLoadingResult = try { - loader.load(groupId, sinceTimestamp, untilTimestamp, flow) { onLoadCompleted(loader.name, it) } + transformer.transform(groupId, flow).let { flow -> + loader.load( + groupId, sinceTimestamp, untilTimestamp, flow, + onLoadingProgress = { onLoadingProgress(loader.name, it) }, + onStatusChanged = { onStatusChanged(loader.name, it) } + ) + } } catch (e: Throwable) { - logger.debug(e) { "ETL pipeline [$name] failed for loader [${loader.name}]: ${e.message}" } + logger.debug(e) { "ETL pipeline [$name] for group [$groupId] failed for loader [${loader.name}]: ${e.message}" } EtlLoadingResult( - status = EtlStatus.FAILED, errorMessage = "Error during loading data with loader ${loader.name}: ${e.message ?: e.javaClass.simpleName}", lastProcessedAt = sinceTimestamp - ).also { onLoadCompleted(loader.name, it) } + ).also { onLoadingProgress(loader.name, it) } } } \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/PageDataExtractor.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/PageDataExtractor.kt new file mode 100644 index 000000000..ba0372c0f --- /dev/null +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/PageDataExtractor.kt @@ -0,0 +1,154 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.etl.impl + +import com.epam.drill.admin.etl.DataExtractor +import com.epam.drill.admin.etl.EtlExtractingResult +import com.epam.drill.admin.etl.EtlRow +import kotlinx.coroutines.flow.FlowCollector +import mu.KotlinLogging +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong +import kotlin.time.Duration.Companion.seconds + +abstract class PageDataExtractor( + override val name: String, + open val extractionLimit: Int, + private val loggingFrequency: Int = 10, +) : DataExtractor { + private val logger = KotlinLogging.logger {} + + override suspend fun extract( + groupId: String, + sinceTimestamp: Instant, + untilTimestamp: Instant, + emitter: FlowCollector, + onExtractingProgress: suspend (EtlExtractingResult) -> Unit + ) { + var currentSince = sinceTimestamp + val page = AtomicInteger(0) + val rowsFetched = AtomicLong(0) + var hasMore = true + val buffer: MutableList = mutableListOf() + val isExecutingQuery = AtomicBoolean(true) + + trackProgressOf { + try { + while (hasMore && currentSince < untilTimestamp) { + page.incrementAndGet() + logger.debug { "ETL extractor [$name] for group [$groupId] is executing query for page ${page.get()} since $currentSince ..." } + + var previousTimestamp: Instant? = null + var previousEmittedTimestamp: Instant? = null + var pageRows = 0L + + isExecutingQuery.set(true) + extractPage( + groupId = groupId, + sinceTimestamp = currentSince, + untilTimestamp = untilTimestamp, + limit = extractionLimit, + onExtractionExecuted = { duration -> + logger.debug { "ETL extractor [$name] for group [$groupId] executed query for page ${page.get()} in ${duration}ms " } + onExtractingProgress( + EtlExtractingResult( + duration = duration + ) + ) + isExecutingQuery.set(false) + }, + rowsExtractor = { row -> + pageRows++ + val currentTimestamp = row.timestamp + + if (previousTimestamp != null && currentTimestamp < previousTimestamp) { + throw IllegalStateException("Timestamps in the extracted data are not in ascending order: $currentTimestamp < $previousTimestamp") + } + + if (buffer.isNotEmpty() && previousTimestamp != null && currentTimestamp != previousTimestamp) { + // Emit buffered rows when timestamp changes + emitBuffer(buffer, emitter) + previousEmittedTimestamp = previousTimestamp + } + + buffer.add(row) + + previousTimestamp = currentTimestamp + rowsFetched.incrementAndGet() + } + ) + if (pageRows == 0L || pageRows < extractionLimit) { + hasMore = false + emitBuffer(buffer, emitter) + logger.debug { "ETL extractor [$name] for group [$groupId] completed fetching" + + ", rows fetched: ${rowsFetched.get()}" + + ", total pages: ${page.get()}" + + ", last extracted at $currentSince" } + } else { + currentSince = previousEmittedTimestamp ?: throw IllegalStateException( + "No rows were emitted on page $page because all fetched records had the same timestamp. Please increase the extraction limit. Current is $extractionLimit." + ) + // Remove rows from buffer that have timestamp greater than currentSince to avoid re-emission on the next page + buffer.removeIf { it.timestamp > currentSince } + hasMore = true + logger.debug { "ETL extractor [$name] for group [$groupId] fetched $pageRows rows on page ${page.get()}, last extracted at $currentSince" } + } + } + } catch (e: Exception) { + logger.error { + "Error during data extraction with extractor [$name]: ${e.message ?: e.javaClass.simpleName}" + } + onExtractingProgress( + EtlExtractingResult( + errorMessage = e.message + ) + ) + } + }.every(loggingFrequency.seconds) { + if (isExecutingQuery.get()) { + logger.debug { + "ETL extractor [$name] for group [$groupId] is still executing query for page ${page.get()} ..." + } + } else { + logger.debug { + "ETL extractor [$name] for group [$groupId] fetched ${rowsFetched.get()} rows" + + ", page: ${page.get()}" + } + } + } + } + + private suspend fun emitBuffer( + buffer: MutableList, emitter: FlowCollector + ) { + if (buffer.isEmpty()) return + for (bufferedRow in buffer) { + emitter.emit(bufferedRow) + } + buffer.clear() + } + + abstract suspend fun extractPage( + groupId: String, + sinceTimestamp: Instant, + untilTimestamp: Instant, + limit: Int, + onExtractionExecuted: suspend (duration: Long) -> Unit, + rowsExtractor: suspend (row: T) -> Unit + ) +} \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/PreparedSql.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/PreparedSql.kt index d3887b543..a0ca77084 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/PreparedSql.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/PreparedSql.kt @@ -15,7 +15,9 @@ */ package com.epam.drill.admin.etl.impl -interface PreparedSql { +import com.epam.drill.admin.etl.EtlRow + +interface PreparedSql { fun getSql(): String fun getArgs(row: T): List } \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/ProcessTracker.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/ProcessTracker.kt new file mode 100644 index 000000000..6f794fb13 --- /dev/null +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/ProcessTracker.kt @@ -0,0 +1,40 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.etl.impl + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlin.time.Duration + +class ProgressTracker(val job: suspend () -> Unit) { + suspend fun every(duration: Duration, track: () -> Unit) = coroutineScope { + val trackingJob = launch { + while (isActive) { + delay(duration.inWholeMilliseconds) + track() + } + } + try { + job() + } finally { + trackingJob.cancel() + } + } +} + +fun trackProgressOf(job: suspend () -> Unit) = ProgressTracker(job) \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/SqlDataExtractor.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/SqlDataExtractor.kt index efa34a1ff..25ee09e5e 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/SqlDataExtractor.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/SqlDataExtractor.kt @@ -15,10 +15,9 @@ */ package com.epam.drill.admin.etl.impl -import com.epam.drill.admin.etl.DataExtractor -import com.epam.drill.admin.etl.EtlExtractingResult +import com.epam.drill.admin.etl.EtlRow +import com.epam.drill.admin.etl.UntypedRow import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.FlowCollector import mu.KotlinLogging import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.TextColumnType @@ -29,33 +28,45 @@ import java.time.Instant import kotlin.system.measureTimeMillis import kotlin.use -abstract class SqlDataExtractor( +abstract class SqlDataExtractor( override val name: String, + override val extractionLimit: Int, open val sqlQuery: String, open val database: Database, open val fetchSize: Int, -) : DataExtractor { + open val loggingFrequency: Int, +) : PageDataExtractor(name, extractionLimit, loggingFrequency) { private val logger = KotlinLogging.logger {} - override suspend fun extract( + override suspend fun extractPage( groupId: String, sinceTimestamp: Instant, untilTimestamp: Instant, - emitter: FlowCollector, - onExtractCompleted: suspend (EtlExtractingResult) -> Unit + limit: Int, + onExtractionExecuted: suspend (Long) -> Unit, + rowsExtractor: suspend (T) -> Unit ) { val preparedSql = prepareSql(sqlQuery) - val params = mapOf( - "group_id" to groupId, - "since_timestamp" to java.sql.Timestamp.from(sinceTimestamp), - "until_timestamp" to java.sql.Timestamp.from(untilTimestamp) - ) + execSuspend( sql = preparedSql.getSql(), - args = preparedSql.getArgs(params), + args = preparedSql.getArgs( + UntypedRow(sinceTimestamp, mapOf( + "group_id" to groupId, + "since_timestamp" to java.sql.Timestamp.from(sinceTimestamp), + "until_timestamp" to java.sql.Timestamp.from(untilTimestamp), + "limit" to limit, + )) + ), ) { rs, duration -> - onExtractCompleted(EtlExtractingResult(success = true, duration = duration)) - collectInFlow(rs, emitter) + val meta = rs.metaData + val columnCount = meta.columnCount + + onExtractionExecuted(duration) + while (rs.next()) { + val row = parseRow(rs, meta, columnCount) + rowsExtractor(row) + } } } @@ -81,11 +92,9 @@ abstract class SqlDataExtractor( } val resultSet: ResultSet - logger.debug { "ETL extractor [$name] extracting rows..." } val duration = measureTimeMillis { resultSet = stmt.executeQuery() } - logger.debug { "ETL extractor [$name] extracted rows in ${duration}ms" } resultSet.use { rs -> collect(rs, duration) } @@ -95,16 +104,8 @@ abstract class SqlDataExtractor( } } - private suspend fun collectInFlow(rs: ResultSet, emitter: FlowCollector) { - val meta = rs.metaData - val columnCount = rs.metaData.columnCount - while (rs.next()) { - emitter.emit(parseRow(rs, meta, columnCount)) - } - } - abstract fun parseRow(rs: ResultSet, meta: ResultSetMetaData, columnCount: Int): T - abstract fun prepareSql(sql: String): PreparedSql> + abstract fun prepareSql(sql: String): PreparedSql } diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/SqlDataLoader.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/SqlDataLoader.kt index 5863586c4..445cf6b5e 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/SqlDataLoader.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/SqlDataLoader.kt @@ -15,6 +15,7 @@ */ package com.epam.drill.admin.etl.impl +import com.epam.drill.admin.etl.EtlRow import kotlinx.coroutines.Dispatchers import mu.KotlinLogging import org.jetbrains.exposed.sql.Database @@ -26,13 +27,14 @@ import org.jetbrains.exposed.sql.statements.StatementType import org.jetbrains.exposed.sql.statements.api.PreparedStatementApi import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction -abstract class SqlDataLoader( +abstract class SqlDataLoader( override val name: String, override val batchSize: Int, + override val loggingFrequency: Int, open val sqlUpsert: String, open val sqlDelete: String, open val database: Database -) : BatchDataLoader(name, batchSize) { +) : BatchDataLoader(name, batchSize, loggingFrequency) { private val logger = KotlinLogging.logger {} abstract fun prepareSql(sql: String): PreparedSql diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedAggregationTransformer.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedAggregationTransformer.kt new file mode 100644 index 000000000..b4781aa04 --- /dev/null +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedAggregationTransformer.kt @@ -0,0 +1,102 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.etl.impl + +import com.epam.drill.admin.etl.DataTransformer +import com.epam.drill.admin.etl.UntypedRow +import com.epam.drill.admin.etl.flow.LruMap +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.trySendBlocking +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import mu.KotlinLogging +import java.util.concurrent.atomic.AtomicLong +import kotlin.time.Duration.Companion.seconds + +class UntypedAggregationTransformer( + override val name: String, + private val bufferSize: Int, + private val loggingFrequency: Int = 10, + private val groupKeys: List, + private val aggregate: (current: UntypedRow, next: UntypedRow) -> UntypedRow +) : DataTransformer { + private val logger = KotlinLogging.logger {} + + override suspend fun transform( + groupId: String, + collector: Flow + ): Flow = flow { + val transformedRows = AtomicLong(0) + val emittedRows = AtomicLong(0) + fun getAggregationRatio(): Double = + if (transformedRows.get() == 0L) 0.0 + else (1 - emittedRows.toDouble() / transformedRows.get()) + + val emittingChannel = Channel(capacity = bufferSize) + suspend fun drainChannel() { + var next = emittingChannel.tryReceive().getOrNull() + while (next != null) { + if (emittedRows.get() == 0L) + logger.debug { "ETL transformer [$name] for group [$groupId] started emitting aggregated rows..." } + emittedRows.incrementAndGet() + emit(next) + next = emittingChannel.tryReceive().getOrNull() + } + } + + val buffer = LruMap, UntypedRow>(maxSize = bufferSize) { _, value -> + emittingChannel.trySendBlocking(value) + } + + trackProgressOf { + try { + collector.collect { row -> + if (transformedRows.get() == 0L) + logger.debug { "ETL transformer [$name] for group [$groupId] started transformation..." } + + val groupKey = groupKeys.map { row[it] } + buffer.compute(groupKey) { value -> + if (value == null) { + row + } else { + aggregate(value, row) + } + } + drainChannel() + transformedRows.incrementAndGet() + } + + // Emit remaining aggregated rows + buffer.evictAll() + drainChannel() + } finally { + emittingChannel.close() + } + }.every(loggingFrequency.seconds) { + if (transformedRows.get() > 0L) + logger.debug { + "ETL transformer [$name] for group [$groupId] transformed ${transformedRows.get()} rows" + + ", aggregation ratio: ${getAggregationRatio()}" + } + } + if (transformedRows.get() > 0L) { + logger.debug { + "ETL transformer [$name] for group [$groupId] completed transformation for $transformedRows rows, " + + "aggregation ratio: ${getAggregationRatio()}" + } + } + } +} \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedPreparedSql.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedPreparedSql.kt index 2db33411e..f504d0bde 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedPreparedSql.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedPreparedSql.kt @@ -15,14 +15,18 @@ */ package com.epam.drill.admin.etl.impl -class UntypedPreparedSql(val preparedSql: String, val indexes: List) : PreparedSql> { +import com.epam.drill.admin.etl.UntypedRow + +class UntypedPreparedSql(val preparedSql: String, val indexes: List) : PreparedSql { override fun getSql() = preparedSql - override fun getArgs(row: Map): List { - return indexes.map { row[it] } + override fun getArgs(row: UntypedRow): List { + return indexes.map { + row[it] + } } companion object { - fun prepareSql(sql: String): PreparedSql> { + fun prepareSql(sql: String): PreparedSql { val regex = Regex("""(?() diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedSqlDataExtractor.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedSqlDataExtractor.kt index a4f042eb9..308b85e38 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedSqlDataExtractor.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedSqlDataExtractor.kt @@ -15,23 +15,29 @@ */ package com.epam.drill.admin.etl.impl +import com.epam.drill.admin.etl.UntypedRow import org.jetbrains.exposed.sql.Database import org.postgresql.util.PGobject import java.sql.ResultSet import java.sql.ResultSetMetaData +import java.time.Instant +import java.util.Date class UntypedSqlDataExtractor( name: String, sqlQuery: String, database: Database, - fetchSize: Int = 2000 -) : SqlDataExtractor>(name, sqlQuery, database, fetchSize) { + fetchSize: Int = 2000, + extractionLimit: Int = 1_000_000, + loggingFrequency: Int = 10, + private val lastExtractedAtColumnName: String, +) : SqlDataExtractor(name, extractionLimit, sqlQuery, database, fetchSize, loggingFrequency) { - override fun prepareSql(sql: String): PreparedSql> { + override fun prepareSql(sql: String): PreparedSql { return UntypedPreparedSql.prepareSql(sql) } - override fun parseRow(rs: ResultSet, meta: ResultSetMetaData, columnCount: Int): Map { + override fun parseRow(rs: ResultSet, meta: ResultSetMetaData, columnCount: Int): UntypedRow { val row = mutableMapOf() for (i in 1..columnCount) { val columnName = meta.getColumnName(i) @@ -55,6 +61,16 @@ class UntypedSqlDataExtractor( } row[columnName] = value } - return row + return UntypedRow(getLastExtractedTimestamp(row), row) + } + + private fun getLastExtractedTimestamp(args: Map): Instant { + val timestamp = args[lastExtractedAtColumnName] + return when (timestamp) { + is Instant -> timestamp + is Date -> timestamp.toInstant() + is String -> Instant.parse(timestamp) + else -> throw IllegalStateException("Could not extract timestamp column $lastExtractedAtColumnName from row: $args") + } } } \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedSqlDataLoader.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedSqlDataLoader.kt index 35251187b..bf038582c 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedSqlDataLoader.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedSqlDataLoader.kt @@ -15,6 +15,7 @@ */ package com.epam.drill.admin.etl.impl +import com.epam.drill.admin.etl.UntypedRow import org.jetbrains.exposed.sql.Database import java.time.Instant import java.util.Date @@ -24,26 +25,16 @@ class UntypedSqlDataLoader( sqlUpsert: String, sqlDelete: String, database: Database, - private val lastExtractedAtColumnName: String, batchSize: Int = 1000, - val processable: (Map) -> Boolean = { true } -) : SqlDataLoader>(name, batchSize, sqlUpsert, sqlDelete, database) { + loggingFrequency: Int = 10, + val processable: (UntypedRow) -> Boolean = { true } +) : SqlDataLoader(name, batchSize, loggingFrequency, sqlUpsert, sqlDelete, database) { - override fun prepareSql(sql: String): PreparedSql> { + override fun prepareSql(sql: String): PreparedSql { return UntypedPreparedSql.prepareSql(sql) } - override fun getLastExtractedTimestamp(args: Map): Instant? { - val timestamp = args[lastExtractedAtColumnName] - return when (timestamp) { - is Instant -> timestamp - is Date -> timestamp.toInstant() - is String -> Instant.parse(timestamp) - else -> null - } - } - - override fun isProcessable(args: Map): Boolean { + override fun isProcessable(args: UntypedRow): Boolean { return processable(args) } } \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/table/EtlMetadataTable.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/table/EtlMetadataTable.kt index a9ce6dcb7..932e8211d 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/table/EtlMetadataTable.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/table/EtlMetadataTable.kt @@ -30,9 +30,11 @@ class EtlMetadataTable(tableName: String) : Table(tableName) { val status = varchar("status", 50) val lastProcessedAt = timestamp("last_processed_at") val lastRunAt = timestamp("last_run_at") - val lastDuration = long("last_duration") + val lastExtractDuration = long("last_extract_duration") + val lastLoadDuration = long("last_load_duration") val lastRowsProcessed = long("last_rows_processed") - val duration = long("duration").default(0L) + val extractDuration = long("extract_duration").default(0L) + val loadDuration = long("load_duration").default(0L) val rowsProcessed = long("rows_processed").default(0L) val errorMessage = text("error_message").nullable() diff --git a/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/ETLSimpleTest.kt b/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/ETLSimpleTest.kt index 0b1de1881..86b5dd53f 100644 --- a/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/ETLSimpleTest.kt +++ b/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/ETLSimpleTest.kt @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger import kotlin.test.assertEquals import kotlin.test.assertTrue -data class SimpleClass(val id: Int, val createdAt: Instant) +data class SimpleClass(val id: Int, val createdAt: Instant): EtlRow(createdAt) private const val SIMPLE_PIPELINE = "simple-pipeline" private const val SIMPLE_EXTRACTOR = "simple-extractor" @@ -50,7 +50,7 @@ class ETLSimpleTest { sinceTimestamp: Instant, untilTimestamp: Instant, emitter: FlowCollector, - onExtractCompleted: suspend (EtlExtractingResult) -> Unit + onExtractingProgress: suspend (EtlExtractingResult) -> Unit ) { return dataStore.filter { it.createdAt > sinceTimestamp }.forEach { emitter.emit(it) } } @@ -63,7 +63,8 @@ class ETLSimpleTest { sinceTimestamp: Instant, untilTimestamp: Instant, collector: Flow, - onLoadCompleted: suspend (EtlLoadingResult) -> Unit + onLoadingProgress: suspend (EtlLoadingResult) -> Unit, + onStatusChanged: suspend (EtlStatus) -> Unit ): EtlLoadingResult { var lastExtracted: SimpleClass? = null var rowsProcessed = 0L @@ -72,11 +73,11 @@ class ETLSimpleTest { rowsProcessed++ } return EtlLoadingResult( - status = EtlStatus.SUCCESS, lastProcessedAt = lastExtracted?.createdAt ?: sinceTimestamp, processedRows = rowsProcessed ).also { - onLoadCompleted(it) + onLoadingProgress(it) + onStatusChanged(EtlStatus.SUCCESS) } } @@ -92,13 +93,13 @@ class ETLSimpleTest { sinceTimestamp: Instant, untilTimestamp: Instant, collector: Flow, - onLoadCompleted: suspend (EtlLoadingResult) -> Unit + onLoadingProgress: suspend (EtlLoadingResult) -> Unit, + onStatusChanged: suspend (EtlStatus) -> Unit ): EtlLoadingResult { collector.collect { throw RuntimeException("Simulated loader failure") } return EtlLoadingResult( - status = EtlStatus.FAILED, errorMessage = "This should never be returned", lastProcessedAt = sinceTimestamp) } @@ -114,10 +115,7 @@ class ETLSimpleTest { pipelineName = SIMPLE_PIPELINE, lastProcessedAt = Instant.EPOCH, lastRunAt = Instant.EPOCH, - lastDuration = 0, - lastRowsProcessed = 0, status = EtlStatus.SUCCESS, - errorMessage = null, extractorName = SIMPLE_EXTRACTOR, loaderName = SIMPLE_LOADER ) @@ -145,10 +143,7 @@ class ETLSimpleTest { pipelineName = pipelineName, lastProcessedAt = Instant.EPOCH, lastRunAt = Instant.EPOCH, - lastDuration = 0, - lastRowsProcessed = 0, status = EtlStatus.SUCCESS, - errorMessage = null, extractorName = SIMPLE_EXTRACTOR, loaderName = SIMPLE_LOADER ) @@ -166,34 +161,49 @@ class ETLSimpleTest { return listOf(metadata).filter { it.groupId == groupId } } - override suspend fun accumulateMetadata(metadata: EtlMetadata) { + override suspend fun accumulateMetadataByLoader( + groupId: String, + pipelineName: String, + extractorName: String, + loaderName: String, + lastProcessedAt: Instant?, + status: EtlStatus?, + loadDuration: Long, + rowsProcessed: Long, + errorMessage: String? + ) { this.metadata = this.metadata.copy( - lastProcessedAt = metadata.lastProcessedAt, - lastRunAt = metadata.lastRunAt, - lastDuration = this.metadata.lastDuration + metadata.lastDuration, - lastRowsProcessed = this.metadata.lastRowsProcessed + metadata.lastRowsProcessed, - status = metadata.status, - errorMessage = metadata.errorMessage + lastProcessedAt = lastProcessedAt ?: this.metadata.lastProcessedAt, + lastLoadDuration = this.metadata.lastLoadDuration + loadDuration, + lastRowsProcessed = this.metadata.lastRowsProcessed + rowsProcessed, + status = status ?: (if (errorMessage != null) EtlStatus.FAILED else this.metadata.status), + errorMessage = errorMessage ) } - override suspend fun accumulateMetadataDurationByExtractor( + override suspend fun accumulateMetadataByExtractor( groupId: String, pipelineName: String, extractorName: String, - duration: Long + status: EtlStatus?, + extractDuration: Long, + errorMessage: String? ) { - + this.metadata = this.metadata.copy( + lastExtractDuration = this.metadata.lastExtractDuration + extractDuration, + status = if (errorMessage != null) EtlStatus.FAILED else this.metadata.status, + errorMessage = errorMessage + ) } } val simpleOrchestrator = EtlOrchestratorImpl( "simple-etl", listOf( - EtlPipelineImpl( + EtlPipelineImpl.singleLoader( "simple-pipeline", extractor = SimpleExtractor(), - loaders = listOf(SimpleLoader()) + loader = SimpleLoader() ) ), metadataRepository = SimpleMetadataRepository() @@ -240,10 +250,10 @@ class ETLSimpleTest { val orchestrator = EtlOrchestratorImpl( "failed-etl", listOf( - EtlPipelineImpl( + EtlPipelineImpl.singleLoader( "failed-pipeline", extractor = SimpleExtractor(), - loaders = listOf(FailingLoader()) + loader = FailingLoader() ) ), metadataRepository = SimpleMetadataRepository() diff --git a/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoaderTest.kt b/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoaderTest.kt index ecd185413..65ea6ed5d 100644 --- a/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoaderTest.kt +++ b/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoaderTest.kt @@ -15,7 +15,7 @@ */ package com.epam.drill.admin.etl.impl -import com.epam.drill.admin.etl.EtlLoadingResult +import com.epam.drill.admin.etl.EtlRow import com.epam.drill.admin.etl.EtlStatus import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.runBlocking @@ -25,7 +25,7 @@ import kotlin.test.assertEquals class BatchDataLoaderTest { - private class TestItem(val timestamp: Instant, val data: String, val processable: Boolean = true) + private class TestItem(timestamp: Instant, val data: String, val processable: Boolean = true) : EtlRow(timestamp) private class TestBatchDataLoader( batchSize: Int, @@ -36,12 +36,6 @@ class BatchDataLoaderTest { val loadedBatches = mutableListOf>() - override fun getLastExtractedTimestamp(args: TestItem): Instant? { - if (failOnTimestamp) return null - if (outOfOrder && loadedBatches.isNotEmpty()) return args.timestamp.minusSeconds(1) - return args.timestamp - } - override fun isProcessable(args: TestItem): Boolean = args.processable override suspend fun loadBatch(groupId: String, batch: List, batchNo: Int): BatchResult { @@ -60,7 +54,7 @@ class BatchDataLoaderTest { fun `load should handle empty flow`() = runBlocking { val loader = TestBatchDataLoader(batchSize = 10) val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf()) { } - assertEquals(EtlStatus.SUCCESS, result.status) + assertEquals(false, result.isFailed) assertEquals(0, result.processedRows) assertEquals(0, loader.loadedBatches.size) } @@ -71,7 +65,7 @@ class BatchDataLoaderTest { val loader = TestBatchDataLoader(batchSize = 10) val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { } - assertEquals(EtlStatus.SUCCESS, result.status) + assertEquals(false, result.isFailed) assertEquals(5, result.processedRows) assertEquals(1, loader.loadedBatches.size) assertEquals(5, loader.loadedBatches[0].size) @@ -81,20 +75,27 @@ class BatchDataLoaderTest { fun `load should process multiple batches`() = runBlocking { val items = (1..25).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") } val loader = TestBatchDataLoader(batchSize = 10) - val results = mutableListOf() - val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { results.add(it) } + val results = mutableListOf() + val result = loader.load( + "test-group", Instant.EPOCH, Instant.now(), + collector = flowOf(*items.toTypedArray()), + onLoadingProgress = { result -> + if (result.isFailed) + results.add(EtlStatus.FAILED) + }, + onStatusChanged = { + results.add(it) + } + ) - assertEquals(EtlStatus.SUCCESS, result.status) + assertEquals(false, result.isFailed) assertEquals(25, result.processedRows) assertEquals(3, loader.loadedBatches.size) assertEquals(10, loader.loadedBatches[0].size) assertEquals(10, loader.loadedBatches[1].size) assertEquals(5, loader.loadedBatches[2].size) - assertEquals(3, results.size) - assertEquals(EtlStatus.LOADING, results[0].status) - assertEquals(EtlStatus.LOADING, results[1].status) - assertEquals(EtlStatus.SUCCESS, results[2].status) + assertEquals(EtlStatus.SUCCESS, results.last()) } @Test @@ -107,7 +108,7 @@ class BatchDataLoaderTest { val loader = TestBatchDataLoader(batchSize = 10) val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { } - assertEquals(EtlStatus.SUCCESS, result.status) + assertEquals(false, result.isFailed) assertEquals(2, result.processedRows) assertEquals(1, loader.loadedBatches.size) assertEquals(2, loader.loadedBatches[0].size) @@ -119,9 +120,9 @@ class BatchDataLoaderTest { fun `load should fail on batch processing error`() = runBlocking { val items = (1..15).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") } val loader = TestBatchDataLoader(batchSize = 10, failOnBatch = 2) - val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { } + val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) - assertEquals(EtlStatus.FAILED, result.status) + assertEquals(true, result.isFailed) assertEquals(10, result.processedRows) // First batch succeeds assertEquals(1, loader.loadedBatches.size) assertEquals(loader.loadedBatches[0].last().timestamp, result.lastProcessedAt) @@ -136,7 +137,7 @@ class BatchDataLoaderTest { val loader = TestBatchDataLoader(batchSize = 10) val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { } - assertEquals(EtlStatus.FAILED, result.status) + assertEquals(true, result.isFailed) } @Test @@ -146,7 +147,7 @@ class BatchDataLoaderTest { val loader = TestBatchDataLoader(batchSize = 10) val result = loader.load("test-group", since, Instant.now(), flowOf(*items.toTypedArray())) { } - assertEquals(EtlStatus.SUCCESS, result.status) + assertEquals(false, result.isFailed) assertEquals(5, result.processedRows) assertEquals(1, loader.loadedBatches.size) assertEquals(5, loader.loadedBatches[0].size) diff --git a/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/PageDataExtractorTest.kt b/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/PageDataExtractorTest.kt new file mode 100644 index 000000000..f9a5c485b --- /dev/null +++ b/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/PageDataExtractorTest.kt @@ -0,0 +1,250 @@ +/** + * Copyright 2020 - 2022 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.drill.admin.etl.impl + +import com.epam.drill.admin.etl.EtlExtractingResult +import com.epam.drill.admin.etl.EtlRow +import com.epam.drill.admin.etl.impl.PageDataExtractorTest.TestPageDataExtractor.ExtractedPageInfo +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.runBlocking +import java.time.Instant +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class PageDataExtractorTest { + + private class TestItem(timestamp: Instant, val data: String) : EtlRow(timestamp) + + private class TestPageDataExtractor( + extractionLimit: Int, + private val data: List = emptyList(), + ) : PageDataExtractor("test-extractor", extractionLimit) { + + val extractedPages = mutableListOf() + + data class ExtractedPageInfo( + val groupId: String, + val sinceTimestamp: Instant, + val untilTimestamp: Instant, + val limit: Int, + val pageNumber: Int + ) + + override suspend fun extractPage( + groupId: String, + sinceTimestamp: Instant, + untilTimestamp: Instant, + limit: Int, + onExtractionExecuted: suspend (duration: Long) -> Unit, + rowsExtractor: suspend (row: TestItem) -> Unit + ) { + val info = ExtractedPageInfo( + groupId = groupId, + sinceTimestamp = sinceTimestamp, + untilTimestamp = untilTimestamp, + limit = limit, + pageNumber = extractedPages.size + 1 + ) + extractedPages.add(info) + onExtractionExecuted(100L) + val page = data.filter { it.timestamp > sinceTimestamp && it.timestamp <= untilTimestamp }.take(limit) + for (item in page) { + rowsExtractor(item) + } + } + } + + @Test + fun `extract should handle empty data`() = runBlocking { + val extractor = TestPageDataExtractor( + extractionLimit = 10, + data = emptyList() + ) + + val emittedItems = mutableListOf() + val emitter = FlowCollector { emittedItems.add(it) } + + extractor.extract( + groupId = "test-group", + sinceTimestamp = Instant.EPOCH, + untilTimestamp = Instant.ofEpochSecond(100), + emitter = emitter, + onExtractingProgress = {} + ) + + assertEquals(0, emittedItems.size) + assertEquals(1, extractor.extractedPages.size) + } + + @Test + fun `extract should process single page when rows less than limit`() = runBlocking { + val items = (1..5).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") } + val extractor = TestPageDataExtractor( + extractionLimit = 10, + data = items + ) + + val emittedItems = mutableListOf() + val emitter = FlowCollector { emittedItems.add(it) } + + extractor.extract( + groupId = "test-group", + sinceTimestamp = Instant.EPOCH, + untilTimestamp = Instant.ofEpochSecond(100), + emitter = emitter, + onExtractingProgress = {} + ) + + assertEquals(5, emittedItems.size) + assertEquals(1, extractor.extractedPages.size) + assertEquals("item1", emittedItems[0].data) + assertEquals("item5", emittedItems[4].data) + } + + @Test + fun `extract should process multiple pages`() = runBlocking { + // First page: 10 items (limit reached) + val page1 = (1..10).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") } + // Second page: 10 items (limit reached) + val page2 = (11..20).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") } + // Third page: 5 items (less than limit, ends extraction) + val page3 = (21..25).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") } + + val extractor = TestPageDataExtractor( + extractionLimit = 10, + data = listOf(page1, page2, page3).flatten() + ) + + val emittedItems = mutableListOf() + val emitter = FlowCollector { emittedItems.add(it) } + + extractor.extract( + groupId = "test-group", + sinceTimestamp = Instant.EPOCH, + untilTimestamp = Instant.ofEpochSecond(100), + emitter = emitter, + onExtractingProgress = {} + ) + + assertEquals(25, emittedItems.size) + assertEquals(3, extractor.extractedPages.size) + assertEquals("item1", emittedItems[0].data) + assertEquals("item25", emittedItems[24].data) + } + + @Test + fun `extract should buffer to avoid missing items with same timestamp at edges of pages`() = runBlocking { + // Multiple rows with same timestamp should be buffered and emitted together + val items = listOf( + TestItem(Instant.ofEpochSecond(1), "item1a"), + TestItem(Instant.ofEpochSecond(2), "item2a"), + TestItem(Instant.ofEpochSecond(2), "item2b"), + TestItem(Instant.ofEpochSecond(3), "item3a"), + TestItem(Instant.ofEpochSecond(3), "item3b") + ) + + val extractor = TestPageDataExtractor( + extractionLimit = 3, + data = items + ) + + val emittedItems = mutableListOf>() + val emitter = FlowCollector { + emittedItems.add(it to extractor.extractedPages.last()) + } + + extractor.extract( + groupId = "test-group", + sinceTimestamp = Instant.EPOCH, + untilTimestamp = Instant.ofEpochSecond(100), + emitter = emitter, + onExtractingProgress = {} + ) + + assertEquals(5, emittedItems.size) + // All items should be emitted in order + assertEquals("item1a", emittedItems[0].first.data) + assertEquals(1, emittedItems[0].second.pageNumber) + assertEquals("item2a", emittedItems[1].first.data) + assertEquals(2, emittedItems[1].second.pageNumber) + assertEquals("item2b", emittedItems[2].first.data) + assertEquals(2, emittedItems[2].second.pageNumber) + assertEquals("item3a", emittedItems[3].first.data) + assertEquals(3, emittedItems[3].second.pageNumber) + assertEquals("item3b", emittedItems[4].first.data) + assertEquals(3, emittedItems[4].second.pageNumber) + } + + @Test + fun `extract should fail when timestamps are out of order`() = runBlocking { + val items = listOf( + TestItem(Instant.ofEpochSecond(2), "item2"), + TestItem(Instant.ofEpochSecond(1), "item1") // Out of order + ) + + val extractor = TestPageDataExtractor( + extractionLimit = 10, + data = items + ) + + val emittedItems = mutableListOf() + val emitter = FlowCollector { emittedItems.add(it) } + + val progressResults = mutableListOf() + extractor.extract( + groupId = "test-group", + sinceTimestamp = Instant.EPOCH, + untilTimestamp = Instant.ofEpochSecond(100), + emitter = emitter, + onExtractingProgress = { progressResults.add(it) } + ) + + // Should catch the error and report it via progress callback + assertTrue(progressResults.any { it.errorMessage != null }) + assertTrue(progressResults.any { + it.errorMessage?.contains("not in ascending order") == true + }) + } + + @Test + fun `extract should throw exception when all rows have same timestamp and page is full`() = runBlocking { + // All rows with same timestamp, page is full (equals limit) + val items = (1..10).map { TestItem(Instant.ofEpochSecond(1), "item$it") } + + val extractor = TestPageDataExtractor( + extractionLimit = 10, + data = items + ) + + val emittedItems = mutableListOf() + val emitter = FlowCollector { emittedItems.add(it) } + + val progressResults = mutableListOf() + extractor.extract( + groupId = "test-group", + sinceTimestamp = Instant.EPOCH, + untilTimestamp = Instant.ofEpochSecond(100), + emitter = emitter, + onExtractingProgress = { progressResults.add(it) } + ) + + // Should report error about needing to increase extraction limit + assertTrue(progressResults.any { + it.errorMessage?.contains("increase the extraction limit") == true + }) + } +} diff --git a/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/UntypedPreparedSqlTest.kt b/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/UntypedPreparedSqlTest.kt index b6466888d..477e91c0a 100644 --- a/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/UntypedPreparedSqlTest.kt +++ b/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/UntypedPreparedSqlTest.kt @@ -15,6 +15,8 @@ */ package com.epam.drill.admin.etl.impl +import com.epam.drill.admin.etl.UntypedRow +import java.time.Instant import kotlin.test.Test import kotlin.test.assertEquals @@ -27,7 +29,7 @@ class UntypedPreparedSqlTest { val result = UntypedPreparedSql.prepareSql(sql) assertEquals("SELECT * FROM users WHERE id = ?", result.getSql()) - assertEquals(listOf(1), result.getArgs(mapOf("userId" to 1))) + assertEquals(listOf(1), result.getArgs(rowOf("userId" to 1))) } @Test @@ -37,7 +39,7 @@ class UntypedPreparedSqlTest { val result = UntypedPreparedSql.prepareSql(sql) assertEquals("SELECT * FROM users WHERE name = ? AND age = ?", result.getSql()) - assertEquals(listOf("testUser", 25), result.getArgs(mapOf("userAge" to 25, "userName" to "testUser"))) + assertEquals(listOf("testUser", 25), result.getArgs(rowOf("userAge" to 25, "userName" to "testUser"))) } @Test @@ -47,7 +49,7 @@ class UntypedPreparedSqlTest { val result = UntypedPreparedSql.prepareSql(sql) assertEquals("SELECT * FROM users WHERE id = ? OR parent_id = ?", result.getSql()) - assertEquals(listOf(1, 1), result.getArgs(mapOf("id" to 1))) + assertEquals(listOf(1, 1), result.getArgs(rowOf("id" to 1))) } @Test @@ -57,7 +59,7 @@ class UntypedPreparedSqlTest { val result = UntypedPreparedSql.prepareSql(sql) assertEquals("SELECT * FROM users WHERE data::jsonb @> ?", result.getSql()) - assertEquals(listOf("testFilter"), result.getArgs(mapOf("filter" to "testFilter"))) + assertEquals(listOf("testFilter"), result.getArgs(rowOf("filter" to "testFilter"))) } @Test @@ -67,7 +69,7 @@ class UntypedPreparedSqlTest { val result = UntypedPreparedSql.prepareSql(sql) assertEquals("SELECT * FROM users WHERE id = 1", result.getSql()) - assertEquals(emptyList(), result.getArgs(mapOf("id" to 1))) + assertEquals(emptyList(), result.getArgs(rowOf("id" to 1))) } @Test @@ -77,7 +79,7 @@ class UntypedPreparedSqlTest { val result = UntypedPreparedSql.prepareSql(sql) assertEquals("INSERT INTO table (col1, col2) VALUES (?, ?)", result.getSql()) - assertEquals(listOf(1, 2), result.getArgs(mapOf("param_1" to 1, "param2_test" to 2))) + assertEquals(listOf(1, 2), result.getArgs(rowOf("param_1" to 1, "param2_test" to 2))) } @@ -106,7 +108,7 @@ class UntypedPreparedSqlTest { assertEquals(expectedSql, result.getSql()) assertEquals( listOf("SUCCESS", "2025-01-01", "2025-12-10"), result.getArgs( - mapOf("endDate" to "2025-12-10", "status" to "SUCCESS", "startDate" to "2025-01-01") + rowOf("endDate" to "2025-12-10", "status" to "SUCCESS", "startDate" to "2025-01-01") ) ) } @@ -116,7 +118,7 @@ class UntypedPreparedSqlTest { val sql = "INSERT INTO users (name, email) VALUES (:name, :email)" val result = UntypedPreparedSql.prepareSql(sql) - val row = mapOf("name" to "John", "email" to null) + val row = rowOf("name" to "John", "email" to null) val args = result.getArgs(row) assertEquals(listOf("John", null), args) @@ -127,11 +129,13 @@ class UntypedPreparedSqlTest { val sql = "SELECT * FROM users WHERE name = :userName AND age = :userAge" val result = UntypedPreparedSql.prepareSql(sql) - val row = mapOf("userName" to "John") + val row = rowOf("userName" to "John") val args = result.getArgs(row) assertEquals(listOf("John", null), args) } + private fun rowOf(vararg entry: Pair) = UntypedRow(Instant.EPOCH, mapOf(*entry)) + } diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt index 00b126c7e..2d7e5ac3f 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt @@ -38,4 +38,22 @@ class EtlConfig(private val config: ApplicationConfig) { */ val batchSize : Int get() = config.propertyOrNull("batchSize")?.getString()?.toIntOrNull() ?: 1000 + + /** + * Sets a limit on the total number of records to be extracted by the ETL process in a single query. + */ + val extractionLimit : Int + get() = config.propertyOrNull("extractionLimit")?.getString()?.toIntOrNull() ?: 1_000_000 + + /** + * Controls the in-memory buffer capacity used for the shared flow between the transformer and loaders. + */ + val transformationBufferSize : Int + get() = config.propertyOrNull("transformationBufferSize")?.getString()?.toIntOrNull() ?: 2000 + + /** + * Defines how often the ETL process logs its progress (in seconds). + */ + val loggingFrequency : Int + get() = config.propertyOrNull("loggingFrequency")?.getString()?.toIntOrNull() ?: 10 } \ No newline at end of file diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt index d8cc8633f..3a5d73f8b 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt @@ -20,6 +20,8 @@ import com.epam.drill.admin.etl.impl.EtlOrchestratorImpl import com.epam.drill.admin.etl.impl.EtlMetadataRepositoryImpl import com.epam.drill.admin.etl.EtlMetadataRepository import com.epam.drill.admin.etl.EtlOrchestrator +import com.epam.drill.admin.etl.EtlPipeline +import com.epam.drill.admin.etl.UntypedRow import com.epam.drill.admin.metrics.etl.methodsPipeline import com.epam.drill.admin.metrics.etl.buildsPipeline import com.epam.drill.admin.metrics.etl.coveragePipeline diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/BuildsEtl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/BuildsEtl.kt index 489603d3a..3157aa972 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/BuildsEtl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/BuildsEtl.kt @@ -27,7 +27,10 @@ val EtlConfig.buildsExtractor name = "builds", sqlQuery = fromResource("/metrics/db/etl/builds_extractor.sql"), database = MetricsDatabaseConfig.database, - fetchSize = fetchSize + fetchSize = fetchSize, + extractionLimit = extractionLimit, + loggingFrequency = loggingFrequency, + lastExtractedAtColumnName = "updated_at", ) val EtlConfig.buildsLoader @@ -35,15 +38,15 @@ val EtlConfig.buildsLoader name = "builds", sqlUpsert = fromResource("/metrics/db/etl/builds_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/builds_delete.sql"), - lastExtractedAtColumnName = "updated_at", database = MetricsDatabaseConfig.database, - batchSize = batchSize + batchSize = batchSize, + loggingFrequency = loggingFrequency, ) val EtlConfig.buildsPipeline - get() = EtlPipelineImpl( + get() = EtlPipelineImpl.singleLoader( name = "builds", extractor = buildsExtractor, - loaders = listOf(buildsLoader), + loader = buildsLoader, bufferSize = bufferSize ) \ No newline at end of file diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt index 45f2015f2..ea8cde0ac 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt @@ -15,12 +15,16 @@ */ package com.epam.drill.admin.metrics.etl +import com.epam.drill.admin.etl.UntypedRow import com.epam.drill.admin.etl.impl.EtlPipelineImpl +import com.epam.drill.admin.etl.impl.UntypedAggregationTransformer import com.epam.drill.admin.etl.impl.UntypedSqlDataExtractor import com.epam.drill.admin.etl.impl.UntypedSqlDataLoader +import com.epam.drill.admin.etl.untypedNopTransformer import com.epam.drill.admin.metrics.config.EtlConfig import com.epam.drill.admin.metrics.config.MetricsDatabaseConfig import com.epam.drill.admin.metrics.config.fromResource +import org.postgresql.util.PGobject val EtlConfig.coverageExtractor @@ -28,7 +32,10 @@ val EtlConfig.coverageExtractor name = "coverage", sqlQuery = fromResource("/metrics/db/etl/coverage_extractor.sql"), database = MetricsDatabaseConfig.database, - fetchSize = fetchSize + fetchSize = fetchSize, + extractionLimit = extractionLimit, + loggingFrequency = loggingFrequency, + lastExtractedAtColumnName = "created_at", ) val EtlConfig.buildMethodTestDefinitionCoverageLoader @@ -36,9 +43,9 @@ val EtlConfig.buildMethodTestDefinitionCoverageLoader name = "build_method_test_definition_coverage", sqlUpsert = fromResource("/metrics/db/etl/build_method_test_definition_coverage_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/build_method_test_definition_coverage_delete.sql"), - lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, batchSize = batchSize, + loggingFrequency = loggingFrequency, processable = { it["test_session_id"] != null && it["test_definition_id"] != null } ) @@ -47,40 +54,106 @@ val EtlConfig.buildMethodTestSessionCoverageLoader name = "build_method_test_session_coverage", sqlUpsert = fromResource("/metrics/db/etl/build_method_test_session_coverage_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/build_method_test_session_coverage_delete.sql"), - lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, batchSize = batchSize, + loggingFrequency = loggingFrequency, processable = { it["test_session_id"] != null } ) + +val EtlConfig.buildMethodCoverageTransformer + get() = UntypedAggregationTransformer( + name = "build_method_coverage", + bufferSize = transformationBufferSize, + loggingFrequency = loggingFrequency, + groupKeys = listOf( + "group_id", + "app_id", + "build_id", + "method_id", + "app_env_id", + "test_result", + "test_tag", + "test_task_id", + ) + ) { current, next -> + val map = HashMap(current) + map["probes"] = mergeProbes(current["probes"], next["probes"]) + map["created_at_day"] = next["created_at_day"] + UntypedRow(next.timestamp, map) + } + + val EtlConfig.buildMethodCoverageLoader get() = UntypedSqlDataLoader( name = "build_method_coverage", sqlUpsert = fromResource("/metrics/db/etl/build_method_coverage_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/build_method_coverage_delete.sql"), - lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, - batchSize = batchSize + batchSize = batchSize, + loggingFrequency = loggingFrequency, ) -val EtlConfig.methodCoverageLoader +val EtlConfig.methodDailyCoverageTransformer + get() = UntypedAggregationTransformer( + name = "method_daily_coverage", + bufferSize = transformationBufferSize, + loggingFrequency = loggingFrequency, + groupKeys = listOf( + "group_id", + "app_id", + "method_id", + "created_at_day", + "branch", + "app_env_id", + "test_result", + "test_tag", + "test_task_id" + ) + ) { current, next -> + val map = HashMap(current) + map["probes"] = mergeProbes(current["probes"], next["probes"]) + UntypedRow(next.timestamp, map) + } + +val EtlConfig.methodDailyCoverageLoader get() = UntypedSqlDataLoader( name = "method_daily_coverage", sqlUpsert = fromResource("/metrics/db/etl/method_daily_coverage_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/method_daily_coverage_delete.sql"), - lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, - batchSize = batchSize + batchSize = batchSize, + loggingFrequency = loggingFrequency, ) +val EtlConfig.test2CodeMappingTransformer + get() = UntypedAggregationTransformer( + name = "test_to_code_mapping", + bufferSize = transformationBufferSize, + loggingFrequency = loggingFrequency, + groupKeys = listOf( + "group_id", + "app_id", + "signature", + "test_definition_id", + "branch", + "app_env_id", + "test_task_id", + ) + ) { current, next -> + val map = HashMap(current) + map["updated_at_day"] = next["created_at_day"] + UntypedRow(next.timestamp, map) + } + val EtlConfig.test2CodeMappingLoader get() = UntypedSqlDataLoader( name = "test_to_code_mapping", sqlUpsert = fromResource("/metrics/db/etl/test_to_code_mapping_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/test_to_code_mapping_delete.sql"), - lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, batchSize = batchSize, + loggingFrequency = loggingFrequency, processable = { it["test_definition_id"] != null && it["test_result"] == "PASSED" } ) @@ -89,12 +162,36 @@ val EtlConfig.coveragePipeline name = "coverage", extractor = coverageExtractor, loaders = listOf( - buildMethodTestDefinitionCoverageLoader, - buildMethodTestSessionCoverageLoader, - buildMethodCoverageLoader, - methodCoverageLoader, - test2CodeMappingLoader, - testSessionBuildsLoader + untypedNopTransformer to buildMethodTestDefinitionCoverageLoader, + untypedNopTransformer to buildMethodTestSessionCoverageLoader, + buildMethodCoverageTransformer to buildMethodCoverageLoader, + methodDailyCoverageTransformer to methodDailyCoverageLoader, + test2CodeMappingTransformer to test2CodeMappingLoader, + untypedNopTransformer to testSessionBuildsLoader ), bufferSize = bufferSize - ) \ No newline at end of file + ) + +internal fun mergeProbes(current: Any?, next: Any?): PGobject { + if (current == null || next == null) { + throw IllegalArgumentException("Cannot merge null probes: current=$current, next=$next") + } + if (current !is PGobject || next !is PGobject) { + throw IllegalArgumentException("Probes must be of type PGobject: current=${current.javaClass.name}, next=${next.javaClass.name}") + } + val nextProbes = next.value ?: "" + val currentProbes = current.value ?: "" + if (currentProbes.length != nextProbes.length) + throw IllegalArgumentException("Cannot merge probes of different lengths: current=${currentProbes.length}, next=${nextProbes.length}") + val mergedProbes = buildString(currentProbes.length) { + for (i in 0 until currentProbes.length) { + val currentBit = currentProbes[i] + val nextBit = nextProbes[i] + append(if (currentBit == '1' || nextBit == '1') '1' else '0') + } + } + return PGobject().apply { + type = "varbit" + value = mergedProbes + } +} \ No newline at end of file diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/MethodsEtl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/MethodsEtl.kt index 921dfe61f..f8b091dac 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/MethodsEtl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/MethodsEtl.kt @@ -15,9 +15,12 @@ */ package com.epam.drill.admin.metrics.etl +import com.epam.drill.admin.etl.UntypedRow import com.epam.drill.admin.etl.impl.EtlPipelineImpl +import com.epam.drill.admin.etl.impl.UntypedAggregationTransformer import com.epam.drill.admin.etl.impl.UntypedSqlDataExtractor import com.epam.drill.admin.etl.impl.UntypedSqlDataLoader +import com.epam.drill.admin.etl.untypedNopTransformer import com.epam.drill.admin.metrics.config.EtlConfig import com.epam.drill.admin.metrics.config.MetricsDatabaseConfig import com.epam.drill.admin.metrics.config.fromResource @@ -27,7 +30,9 @@ val EtlConfig.buildMethodsExtractor name = "build_methods", sqlQuery = fromResource("/metrics/db/etl/build_methods_extractor.sql"), database = MetricsDatabaseConfig.database, - fetchSize = fetchSize + fetchSize = fetchSize, + extractionLimit = extractionLimit, + lastExtractedAtColumnName = "created_at", ) val EtlConfig.buildMethodsLoader @@ -35,17 +40,28 @@ val EtlConfig.buildMethodsLoader name = "build_methods", sqlUpsert = fromResource("/metrics/db/etl/build_methods_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/build_methods_delete.sql"), - lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, batchSize = batchSize ) +val EtlConfig.methodLoaderTransformer + get() = UntypedAggregationTransformer( + name = "methods", + bufferSize = transformationBufferSize, + groupKeys = listOf( + "group_id", + "app_id", + "method_id" + ) + ) { current, next -> + UntypedRow(next.timestamp, next) + } + val EtlConfig.methodsLoader get() = UntypedSqlDataLoader( name = "methods", sqlUpsert = fromResource("/metrics/db/etl/methods_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/methods_delete.sql"), - lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, batchSize = batchSize ) @@ -54,6 +70,6 @@ val EtlConfig.methodsPipeline get() = EtlPipelineImpl( name = "methods", extractor = buildMethodsExtractor, - loaders = listOf(buildMethodsLoader, methodsLoader), + loaders = listOf(untypedNopTransformer to buildMethodsLoader, methodLoaderTransformer to methodsLoader), bufferSize = bufferSize ) \ No newline at end of file diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestDefinitionsEtl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestDefinitionsEtl.kt index d3c88843c..6abd412c7 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestDefinitionsEtl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestDefinitionsEtl.kt @@ -27,7 +27,10 @@ val EtlConfig.testDefinitionsExtractor name = "test_definitions", sqlQuery = fromResource("/metrics/db/etl/test_definitions_extractor.sql"), database = MetricsDatabaseConfig.database, - fetchSize = fetchSize + fetchSize = fetchSize, + extractionLimit = extractionLimit, + loggingFrequency = loggingFrequency, + lastExtractedAtColumnName = "updated_at", ) val EtlConfig.testDefinitionsLoader @@ -35,15 +38,15 @@ val EtlConfig.testDefinitionsLoader name = "test_definitions", sqlUpsert = fromResource("/metrics/db/etl/test_definitions_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/test_definitions_delete.sql"), - lastExtractedAtColumnName = "updated_at", database = MetricsDatabaseConfig.database, - batchSize = batchSize + batchSize = batchSize, + loggingFrequency = loggingFrequency, ) val EtlConfig.testDefinitionsPipeline - get() = EtlPipelineImpl( + get() = EtlPipelineImpl.singleLoader( name = "test_definitions", extractor = testDefinitionsExtractor, - loaders = listOf(testDefinitionsLoader), + loader = testDefinitionsLoader, bufferSize = bufferSize ) diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestLaunchesEtl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestLaunchesEtl.kt index 6eb466555..e202d346b 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestLaunchesEtl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestLaunchesEtl.kt @@ -27,7 +27,10 @@ val EtlConfig.testLaunchesExtractor name = "test_launches", sqlQuery = fromResource("/metrics/db/etl/test_launches_extractor.sql"), database = MetricsDatabaseConfig.database, - fetchSize = fetchSize + fetchSize = fetchSize, + extractionLimit = extractionLimit, + loggingFrequency = loggingFrequency, + lastExtractedAtColumnName = "created_at", ) val EtlConfig.testLaunchesLoader @@ -35,15 +38,15 @@ val EtlConfig.testLaunchesLoader name = "test_launches", sqlUpsert = fromResource("/metrics/db/etl/test_launches_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/test_launches_delete.sql"), - lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, - batchSize = batchSize + batchSize = batchSize, + loggingFrequency = loggingFrequency, ) val EtlConfig.testLaunchesPipeline - get() = EtlPipelineImpl( + get() = EtlPipelineImpl.singleLoader( name = "test_launches", extractor = testLaunchesExtractor, - loaders = listOf(testLaunchesLoader), + loader = testLaunchesLoader, bufferSize = bufferSize ) diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestSessionBuildsEtl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestSessionBuildsEtl.kt index b3fa0707e..bec058aca 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestSessionBuildsEtl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestSessionBuildsEtl.kt @@ -27,7 +27,10 @@ val EtlConfig.testSessionBuildsExtractor name = "test_session_builds", sqlQuery = fromResource("/metrics/db/etl/test_session_builds_extractor.sql"), database = MetricsDatabaseConfig.database, - fetchSize = fetchSize + fetchSize = fetchSize, + extractionLimit = extractionLimit, + loggingFrequency = loggingFrequency, + lastExtractedAtColumnName = "created_at", ) val EtlConfig.testSessionBuildsLoader @@ -35,16 +38,16 @@ val EtlConfig.testSessionBuildsLoader name = "test_session_builds", sqlUpsert = fromResource("/metrics/db/etl/test_session_builds_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/test_session_builds_delete.sql"), - lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, batchSize = batchSize, + loggingFrequency = loggingFrequency, processable = { it["test_session_id"] != null } ) val EtlConfig.testSessionBuildsPipeline - get() = EtlPipelineImpl( + get() = EtlPipelineImpl.singleLoader( name = "test_session_builds", extractor = testSessionBuildsExtractor, - loaders = listOf(testSessionBuildsLoader), + loader = testSessionBuildsLoader, bufferSize = bufferSize ) diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestSessionsEtl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestSessionsEtl.kt index 0593e7fbe..c95d8a60f 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestSessionsEtl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestSessionsEtl.kt @@ -27,7 +27,10 @@ val EtlConfig.testSessionsExtractor name = "test_sessions", sqlQuery = fromResource("/metrics/db/etl/test_sessions_extractor.sql"), database = MetricsDatabaseConfig.database, - fetchSize = fetchSize + fetchSize = fetchSize, + extractionLimit = extractionLimit, + loggingFrequency = loggingFrequency, + lastExtractedAtColumnName = "created_at", ) val EtlConfig.testSessionsLoader @@ -35,15 +38,15 @@ val EtlConfig.testSessionsLoader name = "test_sessions", sqlUpsert = fromResource("/metrics/db/etl/test_sessions_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/test_sessions_delete.sql"), - lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, - batchSize = batchSize + batchSize = batchSize, + loggingFrequency = loggingFrequency, ) val EtlConfig.testSessionsPipeline - get() = EtlPipelineImpl( + get() = EtlPipelineImpl.singleLoader( name = "test_sessions", extractor = testSessionsExtractor, - loaders = listOf(testSessionsLoader), + loader = testSessionsLoader, bufferSize = bufferSize ) diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/service/impl/MetricsServiceImpl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/service/impl/MetricsServiceImpl.kt index ee9a8e1b0..0bbc398d2 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/service/impl/MetricsServiceImpl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/service/impl/MetricsServiceImpl.kt @@ -601,7 +601,7 @@ class MetricsServiceImpl( val maxLastProcessedAt = metadata.maxOfOrNull { it.lastProcessedAt.toTimestamp() } val maxLastRunAt = metadata.maxOfOrNull { it.lastRunAt.toTimestamp() } val errorMessages = metadata.mapNotNull { it.errorMessage } - val sumDuration = metadata.sumOf { it.lastDuration } + val sumDuration = metadata.sumOf { it.lastLoadDuration + it.lastExtractDuration } val sumRowsProcessed = metadata.sumOf { it.lastRowsProcessed } return buildMap { diff --git a/admin-metrics/src/main/resources/metrics/db/etl/build_methods_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/build_methods_extractor.sql index 0b295dd2e..9f601fa1a 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/build_methods_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/build_methods_extractor.sql @@ -25,4 +25,5 @@ WHERE m.group_id = :group_id OR r.classname_pattern IS NOT NULL AND m.classname::text ~ r.classname_pattern::text OR r.annotations_pattern IS NOT NULL AND m.annotations::text ~ r.annotations_pattern::text OR r.class_annotations_pattern IS NOT NULL AND m.class_annotations::text ~ r.class_annotations_pattern::text)) -ORDER BY m.created_at ASC, m.signature \ No newline at end of file +ORDER BY m.created_at ASC, m.signature +LIMIT :limit \ No newline at end of file diff --git a/admin-metrics/src/main/resources/metrics/db/etl/builds_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/builds_extractor.sql index 4876b824c..2f759ff9d 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/builds_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/builds_extractor.sql @@ -18,4 +18,5 @@ FROM raw_data.builds b WHERE b.group_id = :group_id AND b.updated_at > :since_timestamp AND b.updated_at <= :until_timestamp -ORDER BY b.updated_at ASC \ No newline at end of file +ORDER BY b.updated_at ASC +LIMIT :limit \ No newline at end of file diff --git a/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql index baa18451f..1c0640208 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql @@ -1,11 +1,11 @@ SELECT c.group_id, c.app_id, - c.build_id, - c.app_env_id, - c.test_session_id, - c.test_launch_id, - c.method_id, + i.build_id, + i.env_id AS app_env_id, + CASE WHEN c.test_session_id = 'GLOBAL' THEN NULL ELSE c.test_session_id END AS test_session_id, + CASE WHEN c.test_id = 'TEST_CONTEXT_NONE' THEN NULL ELSE test_id END AS test_launch_id, + MD5(c.signature||':'||m.body_checksum||':'||m.probes_count) AS method_id, c.signature, b.branch, tl.test_definition_id, @@ -17,13 +17,31 @@ SELECT c.created_at, DATE_TRUNC('day', c.created_at) AS created_at_day, c.probes AS probes -FROM raw_data.view_methods_coverage_v4 c -JOIN raw_data.builds b ON b.group_id = c.group_id AND b.app_id = c.app_id AND b.id = c.build_id -LEFT JOIN raw_data.test_sessions ts ON ts.group_id = c.group_id AND ts.id = c.test_session_id -LEFT JOIN raw_data.test_launches tl ON tl.group_id = c.group_id AND tl.id = c.test_launch_id +FROM raw_data.method_coverage c +JOIN raw_data.methods m ON m.signature = c.signature + AND m.build_id = c.build_id + AND m.probes_count = c.probes_count + AND m.app_id = c.app_id + AND m.group_id = c.group_id + AND NOT EXISTS ( + SELECT 1 + FROM raw_data.method_ignore_rules r + WHERE r.group_id = m.group_id + AND r.app_id = m.app_id + AND (r.classname_pattern IS NOT NULL AND m.classname::text ~ r.classname_pattern::text + OR r.name_pattern IS NOT NULL AND m.name::text ~ r.name_pattern::text + OR r.annotations_pattern IS NOT NULL AND m.annotations::text ~ r.annotations_pattern::text + OR r.class_annotations_pattern IS NOT NULL AND m.class_annotations::text ~ r.class_annotations_pattern::text)) +JOIN raw_data.instances i ON i.id = c.instance_id + AND i.app_id = c.app_id + AND i.group_id = c.group_id +JOIN raw_data.builds b ON b.group_id = c.group_id AND b.app_id = c.app_id AND b.id = i.build_id +LEFT JOIN raw_data.test_sessions ts ON ts.id = c.test_session_id AND ts.group_id = c.group_id +LEFT JOIN raw_data.test_launches tl ON tl.id = c.test_id AND tl.group_id = c.group_id LEFT JOIN raw_data.test_definitions td ON td.group_id = tl.group_id AND td.id = tl.test_definition_id LEFT JOIN LATERAL unnest(td.tags) AS test_tag ON TRUE -WHERE c.group_id = :group_id - AND c.created_at > :since_timestamp +WHERE c.created_at > :since_timestamp AND c.created_at <= :until_timestamp -ORDER BY c.created_at ASC, c.method_id ASC \ No newline at end of file + AND c.group_id = :group_id +ORDER BY c.created_at, c.signature +LIMIT :limit \ No newline at end of file diff --git a/admin-metrics/src/main/resources/metrics/db/etl/test_definitions_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/test_definitions_extractor.sql index 81c00cd87..ebb7c21f8 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/test_definitions_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/test_definitions_extractor.sql @@ -15,3 +15,4 @@ WHERE td.group_id = :group_id AND td.updated_at > :since_timestamp AND td.updated_at <= :until_timestamp ORDER BY td.updated_at ASC, td.id ASC +LIMIT :limit \ No newline at end of file diff --git a/admin-metrics/src/main/resources/metrics/db/etl/test_launches_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/test_launches_extractor.sql index 644a16807..f6a11d42f 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/test_launches_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/test_launches_extractor.sql @@ -17,4 +17,4 @@ WHERE tl.group_id = :group_id AND tl.created_at > :since_timestamp AND tl.created_at <= :until_timestamp ORDER BY tl.created_at ASC, tl.id ASC - +LIMIT :limit diff --git a/admin-metrics/src/main/resources/metrics/db/etl/test_session_builds_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/test_session_builds_extractor.sql index 9454d4d26..4fcf29686 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/test_session_builds_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/test_session_builds_extractor.sql @@ -11,4 +11,4 @@ WHERE b.group_id = :group_id AND b.created_at > :since_timestamp AND b.created_at <= :until_timestamp ORDER BY tsb.created_at ASC, tsb.test_session_id ASC, tsb.build_id ASC - +LIMIT :limit diff --git a/admin-metrics/src/main/resources/metrics/db/etl/test_sessions_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/test_sessions_extractor.sql index 8c09cb4a3..a2ba587b7 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/test_sessions_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/test_sessions_extractor.sql @@ -10,4 +10,5 @@ FROM raw_data.test_sessions ts WHERE ts.group_id = :group_id AND ts.created_at > :since_timestamp AND ts.created_at <= :until_timestamp -ORDER BY ts.created_at ASC, ts.id ASC \ No newline at end of file +ORDER BY ts.created_at ASC, ts.id ASC +LIMIT :limit \ No newline at end of file diff --git a/admin-metrics/src/main/resources/metrics/db/migration/V9__Add_extract_duration.sql b/admin-metrics/src/main/resources/metrics/db/migration/V9__Add_extract_duration.sql new file mode 100644 index 000000000..ca890f702 --- /dev/null +++ b/admin-metrics/src/main/resources/metrics/db/migration/V9__Add_extract_duration.sql @@ -0,0 +1,14 @@ +ALTER TABLE metrics.etl_metadata +RENAME COLUMN duration TO load_duration; + +ALTER TABLE metrics.etl_metadata +RENAME COLUMN last_duration TO last_load_duration; + +ALTER TABLE metrics.etl_metadata +ADD COLUMN extract_duration BIGINT DEFAULT 0; + +ALTER TABLE metrics.etl_metadata +ADD COLUMN last_extract_duration BIGINT DEFAULT 0; + + + diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/BuildDiffReportApiTest.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/BuildDiffReportApiTest.kt index 13aca0d83..dfa5024aa 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/BuildDiffReportApiTest.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/BuildDiffReportApiTest.kt @@ -20,11 +20,8 @@ import com.epam.drill.admin.test.DatabaseTests import com.epam.drill.admin.test.withTransaction import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig import com.epam.drill.admin.writer.rawdata.table.* -import com.jayway.jsonpath.JsonPath -import io.ktor.client.call.body import io.ktor.client.request.* import io.ktor.client.statement.* -import kotlinx.coroutines.runBlocking import org.jetbrains.exposed.sql.deleteAll import org.junit.jupiter.api.AfterEach import kotlin.test.Test @@ -155,7 +152,7 @@ class BuildDiffReportApiTest : DatabaseTests({ @AfterEach fun clearAll() = withTransaction { - CoverageTable.deleteAll() + MethodCoverageTable.deleteAll() InstanceTable.deleteAll() MethodTable.deleteAll() BuildTable.deleteAll() diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/ChangesApiTest.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/ChangesApiTest.kt index 02223a71c..69bdf5e3e 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/ChangesApiTest.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/ChangesApiTest.kt @@ -21,17 +21,15 @@ import com.epam.drill.admin.test.* import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig import com.epam.drill.admin.writer.rawdata.route.payload.SingleMethodPayload import com.epam.drill.admin.writer.rawdata.table.BuildTable -import com.epam.drill.admin.writer.rawdata.table.CoverageTable +import com.epam.drill.admin.writer.rawdata.table.MethodCoverageTable import com.epam.drill.admin.writer.rawdata.table.InstanceTable import com.epam.drill.admin.writer.rawdata.table.MethodTable import com.epam.drill.admin.writer.rawdata.table.TestDefinitionTable import com.epam.drill.admin.writer.rawdata.table.TestLaunchTable import com.epam.drill.admin.writer.rawdata.table.TestSessionTable -import com.jayway.jsonpath.JsonPath import io.ktor.client.* import io.ktor.client.request.* import io.ktor.client.statement.* -import kotlinx.coroutines.runBlocking import org.jetbrains.exposed.sql.deleteAll import org.junit.jupiter.api.AfterEach import kotlin.test.Test @@ -178,7 +176,7 @@ class ChangesApiTest : DatabaseTests({ @AfterEach fun clearAll() = withTransaction { - CoverageTable.deleteAll() + MethodCoverageTable.deleteAll() InstanceTable.deleteAll() MethodTable.deleteAll() BuildTable.deleteAll() diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/CoverageApiTest.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/CoverageApiTest.kt index 564b48a5d..007479674 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/CoverageApiTest.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/CoverageApiTest.kt @@ -20,18 +20,15 @@ import com.epam.drill.admin.test.DatabaseTests import com.epam.drill.admin.test.withTransaction import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig import com.epam.drill.admin.writer.rawdata.route.payload.SingleMethodPayload -import com.epam.drill.admin.writer.rawdata.route.payload.TestDetails import com.epam.drill.admin.writer.rawdata.table.BuildTable -import com.epam.drill.admin.writer.rawdata.table.CoverageTable +import com.epam.drill.admin.writer.rawdata.table.MethodCoverageTable import com.epam.drill.admin.writer.rawdata.table.InstanceTable import com.epam.drill.admin.writer.rawdata.table.MethodTable import com.epam.drill.admin.writer.rawdata.table.TestDefinitionTable import com.epam.drill.admin.writer.rawdata.table.TestLaunchTable import com.epam.drill.admin.writer.rawdata.table.TestSessionTable -import com.jayway.jsonpath.JsonPath import io.ktor.client.request.* import io.ktor.client.statement.* -import kotlinx.coroutines.runBlocking import org.jetbrains.exposed.sql.deleteAll import org.junit.jupiter.api.AfterEach import kotlin.test.Test @@ -114,7 +111,7 @@ class CoverageApiTest : DatabaseTests({ @AfterEach fun clearAll() = withTransaction { - CoverageTable.deleteAll() + MethodCoverageTable.deleteAll() InstanceTable.deleteAll() MethodTable.deleteAll() BuildTable.deleteAll() diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/CoverageTreemapTest.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/CoverageTreemapTest.kt index 05ab8c29d..7feb99bd0 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/CoverageTreemapTest.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/CoverageTreemapTest.kt @@ -20,18 +20,12 @@ import com.epam.drill.admin.test.DatabaseTests import com.epam.drill.admin.test.withTransaction import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig import com.epam.drill.admin.writer.rawdata.route.payload.BuildPayload -import com.epam.drill.admin.writer.rawdata.route.payload.InstancePayload -import com.epam.drill.admin.writer.rawdata.route.payload.MethodsPayload -import com.epam.drill.admin.writer.rawdata.route.payload.SingleMethodPayload import com.epam.drill.admin.writer.rawdata.table.* -import com.jayway.jsonpath.JsonPath import io.ktor.client.request.* import io.ktor.client.statement.* -import kotlinx.coroutines.runBlocking import org.jetbrains.exposed.sql.deleteAll import org.junit.jupiter.api.AfterEach import kotlin.test.Test -import kotlin.test.assertEquals import kotlin.test.assertTrue class CoverageTreemapTest : DatabaseTests({ @@ -78,7 +72,7 @@ class CoverageTreemapTest : DatabaseTests({ @AfterEach fun clearAll() = withTransaction { - CoverageTable.deleteAll() + MethodCoverageTable.deleteAll() InstanceTable.deleteAll() MethodTable.deleteAll() BuildTable.deleteAll() diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/DataIngestClient.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/DataIngestClient.kt index 2d857b0fd..6be135d94 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/DataIngestClient.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/DataIngestClient.kt @@ -70,14 +70,21 @@ suspend fun HttpClient.launchTest( groupId = instance.groupId, appId = instance.appId, instanceId = instance.instanceId, - coverage = arrayOf( - SingleClassCoveragePayload( - classname = testClass, + buildVersion = instance.buildVersion, + commitSha = instance.commitSha, + coverage = coverage.filter { c -> c.second.any { it != 0 } }.map { + SingleMethodCoveragePayload( + signature = listOf( + it.first.classname, + it.first.name, + it.first.params, + it.first.returnType + ).joinToString(":"), testId = testLaunchId, testSessionId = session.id, - probes = coverage.toClassProbes() + probes = it.second.map { probe -> probe != 0 }.toBooleanArray() ) - ) + } ) ) } @@ -130,11 +137,19 @@ suspend fun HttpClient.refreshMetrics() { } suspend fun HttpResponse.assertSuccessStatus() = also { - assertEquals(HttpStatusCode.OK, status, "Expected HTTP status OK, but got $status with a message '${this.bodyAsText()}'") + assertEquals( + HttpStatusCode.OK, + status, + "Expected HTTP status OK, but got $status with a message '${this.bodyAsText()}'" + ) } suspend fun HttpResponse.returns(path: String = "$.data", body: (List>) -> Unit) = also { - assertEquals(HttpStatusCode.OK, status, "Expected HTTP status OK, but got $status with a message '${this.bodyAsText()}'") + assertEquals( + HttpStatusCode.OK, + status, + "Expected HTTP status OK, but got $status with a message '${this.bodyAsText()}'" + ) }.apply { val json = JsonPath.parse(bodyAsText()) val data = json.read>>(path) @@ -142,18 +157,13 @@ suspend fun HttpResponse.returns(path: String = "$.data", body: (List) -> Unit) = also { - assertEquals(HttpStatusCode.OK, status, "Expected HTTP status OK, but got $status with a message '${this.bodyAsText()}'") + assertEquals( + HttpStatusCode.OK, + status, + "Expected HTTP status OK, but got $status with a message '${this.bodyAsText()}'" + ) }.apply { val json = JsonPath.parse(bodyAsText()) val data = json.read>(path) body(data) -} - -private fun Array>.toClassProbes(): BooleanArray { - val classProbesCount = this.sumOf { it.first.probesCount } - val classProbes = BooleanArray(classProbesCount) - this.forEach { (method, probes) -> - probes.forEachIndexed { index, probe -> classProbes[method.probesStartPos + index] = probe != 0 } - } - return classProbes } \ No newline at end of file diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/ImpactedMethodsApiTest.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/ImpactedMethodsApiTest.kt index 0f2a6a9c3..651ec9f62 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/ImpactedMethodsApiTest.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/ImpactedMethodsApiTest.kt @@ -21,16 +21,14 @@ import com.epam.drill.admin.test.withTransaction import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig import com.epam.drill.admin.writer.rawdata.route.payload.TestDetails import com.epam.drill.admin.writer.rawdata.table.BuildTable -import com.epam.drill.admin.writer.rawdata.table.CoverageTable +import com.epam.drill.admin.writer.rawdata.table.MethodCoverageTable import com.epam.drill.admin.writer.rawdata.table.InstanceTable import com.epam.drill.admin.writer.rawdata.table.MethodTable import com.epam.drill.admin.writer.rawdata.table.TestDefinitionTable import com.epam.drill.admin.writer.rawdata.table.TestLaunchTable import com.epam.drill.admin.writer.rawdata.table.TestSessionTable -import com.jayway.jsonpath.JsonPath import io.ktor.client.request.* import io.ktor.client.statement.* -import kotlinx.coroutines.runBlocking import org.jetbrains.exposed.sql.deleteAll import kotlin.test.AfterTest import kotlin.test.Test @@ -110,7 +108,7 @@ class ImpactedMethodsApiTest : DatabaseTests({ @AfterTest fun clearAll() = withTransaction { - CoverageTable.deleteAll() + MethodCoverageTable.deleteAll() InstanceTable.deleteAll() MethodTable.deleteAll() BuildTable.deleteAll() diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/ImpactedTestsApiTest.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/ImpactedTestsApiTest.kt index 297d6ece1..ce1ccb623 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/ImpactedTestsApiTest.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/ImpactedTestsApiTest.kt @@ -21,21 +21,18 @@ import com.epam.drill.admin.test.withTransaction import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig import com.epam.drill.admin.writer.rawdata.route.payload.TestDetails import com.epam.drill.admin.writer.rawdata.table.BuildTable -import com.epam.drill.admin.writer.rawdata.table.CoverageTable +import com.epam.drill.admin.writer.rawdata.table.MethodCoverageTable import com.epam.drill.admin.writer.rawdata.table.InstanceTable import com.epam.drill.admin.writer.rawdata.table.MethodTable import com.epam.drill.admin.writer.rawdata.table.TestDefinitionTable import com.epam.drill.admin.writer.rawdata.table.TestLaunchTable import com.epam.drill.admin.writer.rawdata.table.TestSessionTable -import com.jayway.jsonpath.JsonPath import io.ktor.client.request.* import io.ktor.client.statement.* -import kotlinx.coroutines.runBlocking import org.jetbrains.exposed.sql.deleteAll import kotlin.test.AfterTest import kotlin.test.Test import kotlin.test.assertTrue -import kotlin.text.get class ImpactedTestsApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) @@ -209,7 +206,7 @@ class ImpactedTestsApiTest : DatabaseTests({ @AfterTest fun clearAll() = withTransaction { - CoverageTable.deleteAll() + MethodCoverageTable.deleteAll() InstanceTable.deleteAll() MethodTable.deleteAll() BuildTable.deleteAll() diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/RecommendedTestsApiTest.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/RecommendedTestsApiTest.kt index 31bd5fcbb..58194b4d6 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/RecommendedTestsApiTest.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/RecommendedTestsApiTest.kt @@ -209,7 +209,7 @@ class RecommendedTestsApiTest : DatabaseTests({ @AfterEach fun clearAll() = withTransaction { - CoverageTable.deleteAll() + MethodCoverageTable.deleteAll() InstanceTable.deleteAll() MethodTable.deleteAll() BuildTable.deleteAll() diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestImpactAnalysisTest.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestImpactAnalysisTest.kt index 756f8a4f7..d5e7cd736 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestImpactAnalysisTest.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestImpactAnalysisTest.kt @@ -20,7 +20,7 @@ import com.epam.drill.admin.test.DatabaseTests import com.epam.drill.admin.test.withTransaction import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig import com.epam.drill.admin.writer.rawdata.table.BuildTable -import com.epam.drill.admin.writer.rawdata.table.CoverageTable +import com.epam.drill.admin.writer.rawdata.table.MethodCoverageTable import com.epam.drill.admin.writer.rawdata.table.InstanceTable import com.epam.drill.admin.writer.rawdata.table.MethodTable import com.epam.drill.admin.writer.rawdata.table.TestDefinitionTable @@ -104,7 +104,7 @@ class TestImpactAnalysisTest : DatabaseTests({ @AfterTest fun clearAll() = withTransaction { - CoverageTable.deleteAll() + MethodCoverageTable.deleteAll() InstanceTable.deleteAll() MethodTable.deleteAll() BuildTable.deleteAll() diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/entity/Coverage.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/entity/Coverage.kt index 4fe9c3b64..1b2c4f1f1 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/entity/Coverage.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/entity/Coverage.kt @@ -19,7 +19,8 @@ class Coverage( val groupId: String, val appId: String, val instanceId: String, - val classname: String, + val buildId: String, + val signature: String, val testId: String?, val testSessionId: String?, val probes: BooleanArray diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/repository/impl/CoverageRepositoryImpl.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/repository/impl/CoverageRepositoryImpl.kt index 0d5eb4072..137179a6e 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/repository/impl/CoverageRepositoryImpl.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/repository/impl/CoverageRepositoryImpl.kt @@ -17,8 +17,8 @@ package com.epam.drill.admin.writer.rawdata.repository.impl import com.epam.drill.admin.writer.rawdata.entity.Coverage import com.epam.drill.admin.writer.rawdata.repository.CoverageRepository -import com.epam.drill.admin.writer.rawdata.table.CoverageTable import com.epam.drill.admin.writer.rawdata.table.InstanceTable +import com.epam.drill.admin.writer.rawdata.table.MethodCoverageTable import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq import org.jetbrains.exposed.sql.SqlExpressionBuilder.inSubQuery import org.jetbrains.exposed.sql.SqlExpressionBuilder.less @@ -29,26 +29,28 @@ import java.time.LocalDate class CoverageRepositoryImpl : CoverageRepository { override suspend fun createMany(data: List) { - CoverageTable.batchInsert(data, shouldReturnGeneratedValues = false) { - this[CoverageTable.groupId] = it.groupId - this[CoverageTable.appId] = it.appId - this[CoverageTable.instanceId] = it.instanceId - this[CoverageTable.classname] = it.classname - this[CoverageTable.testId] = it.testId - this[CoverageTable.testSessionId] = it.testSessionId - this[CoverageTable.probes] = it.probes + MethodCoverageTable.batchInsert(data, shouldReturnGeneratedValues = false) { + this[MethodCoverageTable.groupId] = it.groupId + this[MethodCoverageTable.appId] = it.appId + this[MethodCoverageTable.instanceId] = it.instanceId + this[MethodCoverageTable.buildId] = it.buildId + this[MethodCoverageTable.signature] = it.signature + this[MethodCoverageTable.testId] = it.testId + this[MethodCoverageTable.testSessionId] = it.testSessionId + this[MethodCoverageTable.probes] = it.probes + this[MethodCoverageTable.probesCount] = it.probes.size } } override suspend fun deleteAllCreatedBefore(groupId: String, createdBefore: LocalDate) { - CoverageTable.deleteWhere { (CoverageTable.groupId eq groupId) and (CoverageTable.createdAt less createdBefore.atStartOfDay()) } + MethodCoverageTable.deleteWhere { (MethodCoverageTable.groupId eq groupId) and (MethodCoverageTable.createdAt less createdBefore.atStartOfDay()) } } override suspend fun deleteAllByBuildId(groupId: String, appId: String, buildId: String) { - CoverageTable.deleteWhere { - (CoverageTable.groupId eq groupId) and - (CoverageTable.appId eq appId) and - (CoverageTable.instanceId inSubQuery InstanceTable + MethodCoverageTable.deleteWhere { + (MethodCoverageTable.groupId eq groupId) and + (MethodCoverageTable.appId eq appId) and + (MethodCoverageTable.instanceId inSubQuery InstanceTable .select(InstanceTable.id) .where { (InstanceTable.groupId eq groupId) and @@ -59,9 +61,9 @@ class CoverageRepositoryImpl : CoverageRepository { } override suspend fun deleteAllByTestSessionId(groupId: String, testSessionId: String) { - CoverageTable.deleteWhere { - (CoverageTable.groupId eq groupId) and - (CoverageTable.testSessionId eq testSessionId) + MethodCoverageTable.deleteWhere { + (MethodCoverageTable.groupId eq groupId) and + (MethodCoverageTable.testSessionId eq testSessionId) } } } \ No newline at end of file diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/CoveragePayload.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/CoveragePayload.kt index 3b7057993..18fb7d0f3 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/CoveragePayload.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/route/payload/CoveragePayload.kt @@ -22,12 +22,14 @@ class CoveragePayload( val groupId: String, val appId: String, val instanceId: String, - val coverage: Array, + val commitSha: String?, + val buildVersion: String?, + val coverage: List ) @Serializable -class SingleClassCoveragePayload( - val classname: String, +class SingleMethodCoveragePayload( + val signature: String, val testId: String?, val testSessionId: String?, val probes: BooleanArray diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataServiceImpl.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataServiceImpl.kt index ce704109a..a9b658f13 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataServiceImpl.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataServiceImpl.kt @@ -150,12 +150,20 @@ class RawDataServiceImpl( } override suspend fun saveCoverage(coveragePayload: CoveragePayload) { - coveragePayload.coverage.map { coverage -> + val buildId = generateBuildId( + coveragePayload.groupId, + coveragePayload.appId, + coveragePayload.instanceId, + coveragePayload.commitSha, + coveragePayload.buildVersion + ) + coveragePayload.coverage.filter { probes -> probes.probes.any { it } }.map { coverage -> Coverage( groupId = coveragePayload.groupId, appId = coveragePayload.appId, instanceId = coveragePayload.instanceId, - classname = coverage.classname, + buildId = buildId, + signature = coverage.signature, testId = coverage.testId, testSessionId = coverage.testSessionId, probes = coverage.probes diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/table/CoverageTable.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/table/MethodCoverageTable.kt similarity index 83% rename from admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/table/CoverageTable.kt rename to admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/table/MethodCoverageTable.kt index 712fa3812..7e9b20a11 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/table/CoverageTable.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/table/MethodCoverageTable.kt @@ -20,13 +20,15 @@ import org.jetbrains.exposed.dao.id.IntIdTable import org.jetbrains.exposed.sql.javatime.CurrentDateTime import org.jetbrains.exposed.sql.javatime.datetime -object CoverageTable : IntIdTable("raw_data.coverage") { +object MethodCoverageTable : IntIdTable("raw_data.method_coverage") { val groupId = varchar("group_id", SHORT_TEXT_LENGTH) val appId = varchar("app_id", SHORT_TEXT_LENGTH) val instanceId = varchar("instance_id", SHORT_TEXT_LENGTH).references(InstanceTable.id) - val classname = varchar("classname", LONG_TEXT_LENGTH) + val buildId = (varchar("build_id", MEDIUM_TEXT_LENGTH).references(BuildTable.id)).nullable() + val signature = varchar("signature", LONG_TEXT_LENGTH) val testId = varchar("test_id", SHORT_TEXT_LENGTH).nullable() val testSessionId = varchar("test_session_id", SHORT_TEXT_LENGTH).nullable() val probes = registerColumn("probes", ProbesColumnType()) + val probesCount = integer("probes_count") val createdAt = datetime("created_at").defaultExpression(CurrentDateTime) } \ No newline at end of file diff --git a/admin-writer/src/main/resources/raw_data/db/migration/R__1_Views.sql b/admin-writer/src/main/resources/raw_data/db/migration/R__1_Views.sql index c580983d9..0b85c1336 100644 --- a/admin-writer/src/main/resources/raw_data/db/migration/R__1_Views.sql +++ b/admin-writer/src/main/resources/raw_data/db/migration/R__1_Views.sql @@ -28,22 +28,21 @@ CREATE OR REPLACE VIEW raw_data.view_methods_with_rules AS ----------------------------------------------------------------- ----------------------------------------------------------------- -CREATE OR REPLACE VIEW raw_data.view_methods_coverage_v4 AS +CREATE OR REPLACE VIEW raw_data.view_methods_coverage_v5 AS SELECT c.group_id, c.app_id, i.build_id, - MD5(m.signature||':'||m.body_checksum||':'||m.probes_count) AS method_id, + MD5(c.signature||':'||m.body_checksum||':'||m.probes_count) AS method_id, m.signature, CASE WHEN c.test_session_id = 'GLOBAL' THEN NULL ELSE c.test_session_id END AS test_session_id, CASE WHEN c.test_id = 'TEST_CONTEXT_NONE' THEN NULL ELSE test_id END AS test_launch_id, i.env_id AS app_env_id, c.created_at, - SUBSTRING(c.probes FROM m.probe_start_pos + 1 FOR m.probes_count) AS probes -FROM raw_data.coverage c + c.probes AS probes +FROM raw_data.method_coverage c JOIN raw_data.instances i ON i.group_id = c.group_id AND i.app_id = c.app_id AND i.id = c.instance_id JOIN raw_data.view_methods_with_rules m ON m.group_id = i.group_id AND m.app_id = i.app_id AND m.build_id = i.build_id - AND m.classname = c.classname - AND BIT_COUNT(SUBSTRING(c.probes FROM m.probe_start_pos + 1 FOR m.probes_count)) > 0 - AND BIT_LENGTH(SUBSTRING(c.probes FROM m.probe_start_pos + 1 FOR m.probes_count)) = m.probes_count; \ No newline at end of file + AND m.signature = c.signature + AND m.probes_count = c.probes_count; \ No newline at end of file diff --git a/admin-writer/src/main/resources/raw_data/db/migration/V25__Method_Coverage.sql b/admin-writer/src/main/resources/raw_data/db/migration/V25__Method_Coverage.sql new file mode 100644 index 000000000..1b4b0d010 --- /dev/null +++ b/admin-writer/src/main/resources/raw_data/db/migration/V25__Method_Coverage.sql @@ -0,0 +1,63 @@ +DROP TABLE IF EXISTS raw_data.method_coverage CASCADE; +CREATE TABLE IF NOT EXISTS raw_data.method_coverage( + id SERIAL PRIMARY KEY, + instance_id VARCHAR, + signature VARCHAR, + test_id VARCHAR, + probes VARBIT, + probes_count INT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + group_id VARCHAR NOT NULL, + app_id VARCHAR NOT NULL, + test_session_id VARCHAR +); + +CREATE INDEX IF NOT EXISTS idx_coverage_group_id_created_at ON raw_data.method_coverage(group_id, created_at); +CREATE INDEX IF NOT EXISTS idx_coverage_instance_id ON raw_data.method_coverage(group_id, app_id, instance_id, signature, test_session_id, test_id); +CREATE INDEX IF NOT EXISTS idx_coverage_test_id ON raw_data.method_coverage(test_id); + +-- extract coverage from original raw_data.coverage table +-- insert into raw_data.method_coverage with probes split per-method +WITH extracted AS ( + SELECT + c.instance_id, + m.signature, + c.test_id, + SUBSTRING(c.probes FROM m.probe_start_pos + 1 FOR m.probes_count)::varbit AS probes, + c.group_id, + c.app_id, + c.test_session_id, + c.created_at + FROM raw_data.coverage c + JOIN raw_data.instances i + ON c.instance_id = i.id + JOIN raw_data.methods m + ON m.classname = c.classname + AND m.group_id = c.group_id + AND m.app_id = c.app_id + AND m.build_id = i.build_id + AND BIT_LENGTH(SUBSTRING(c.probes FROM m.probe_start_pos + 1 FOR m.probes_count)::varbit) = m.probes_count + AND BIT_COUNT(SUBSTRING(c.probes FROM m.probe_start_pos + 1 FOR m.probes_count)::varbit) > 0 +) +INSERT INTO raw_data.method_coverage ( + instance_id, + signature, + test_id, + probes, + probes_count, + group_id, + app_id, + test_session_id, + created_at +) +SELECT + e.instance_id, + e.signature, + e.test_id, + e.probes, + BIT_LENGTH(e.probes) AS probes_count, + e.group_id, + e.app_id, + e.test_session_id, + e.created_at +FROM extracted e; diff --git a/admin-writer/src/main/resources/raw_data/db/migration/V26__Add_build_id_into_method_coverage.sql b/admin-writer/src/main/resources/raw_data/db/migration/V26__Add_build_id_into_method_coverage.sql new file mode 100644 index 000000000..da809233a --- /dev/null +++ b/admin-writer/src/main/resources/raw_data/db/migration/V26__Add_build_id_into_method_coverage.sql @@ -0,0 +1,9 @@ +ALTER TABLE raw_data.method_coverage ADD COLUMN build_id VARCHAR; + +UPDATE raw_data.method_coverage SET build_id = ( + SELECT build_id + FROM raw_data.instances + WHERE group_id = raw_data.method_coverage.group_id + AND app_id = raw_data.method_coverage.app_id + AND id = raw_data.method_coverage.instance_id +); \ No newline at end of file diff --git a/admin-writer/src/main/resources/raw_data/db/migration/V27__Modify_indexes.sql b/admin-writer/src/main/resources/raw_data/db/migration/V27__Modify_indexes.sql new file mode 100644 index 000000000..0f05ae2ad --- /dev/null +++ b/admin-writer/src/main/resources/raw_data/db/migration/V27__Modify_indexes.sql @@ -0,0 +1,9 @@ +DROP INDEX IF EXISTS raw_data.idx_coverage_group_id_created_at; +DROP INDEX IF EXISTS raw_data.idx_coverage_instance_id; +DROP INDEX IF EXISTS raw_data.idx_coverage_test_id; +DROP INDEX IF EXISTS idx_instances_pk; +CREATE INDEX IF NOT EXISTS idx_method_coverage_created_at ON raw_data.method_coverage(created_at, group_id); +CREATE INDEX IF NOT EXISTS idx_method_coverage_instance_id ON raw_data.method_coverage(instance_id, app_id, group_id); +CREATE INDEX IF NOT EXISTS idx_method_coverage_signature ON raw_data.method_coverage(signature, build_id, probes_count, app_id, group_id); +CREATE INDEX IF NOT EXISTS idx_methods_signature ON raw_data.methods(signature, build_id, probes_count, app_id, group_id) INCLUDE (body_checksum); +CREATE INDEX IF NOT EXISTS idx_instances_id ON raw_data.instances(id, group_id, app_id); diff --git a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/CoverageApiTest.kt b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/CoverageApiTest.kt index 4916f1ca4..82720157d 100644 --- a/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/CoverageApiTest.kt +++ b/admin-writer/src/test/kotlin/com/epam/drill/admin/writer/rawdata/CoverageApiTest.kt @@ -16,7 +16,7 @@ package com.epam.drill.admin.writer.rawdata import com.epam.drill.admin.writer.rawdata.route.postCoverage -import com.epam.drill.admin.writer.rawdata.table.CoverageTable +import com.epam.drill.admin.writer.rawdata.table.MethodCoverageTable import com.epam.drill.admin.test.* import com.epam.drill.admin.writer.rawdata.config.RawDataWriterDatabaseConfig import com.epam.drill.admin.writer.rawdata.config.rawDataServicesDIModule @@ -36,7 +36,9 @@ class CoverageApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) }) val testGroup = "test-group" val testApp = "test-app" val testInstance = "test-instance" - val testClassname = "com.example.TestClass" + val testBuildVersion = "0.0.1" + val testMethodSignature1 = "com.example.TestClass:myMethod:myParam:void" + val testMethodSignature2 = "com.example.TestClass:myMethod2:myParam:void" val testTestId = "test-id" val timeBeforeTest = LocalDateTime.now() val app = drillApplication(rawDataServicesDIModule) { @@ -51,14 +53,15 @@ class CoverageApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) }) "groupId": "$testGroup", "appId": "$testApp", "instanceId": "$testInstance", + "buildVersion": "$testBuildVersion", "coverage": [ - { - "classname": "$testClassname", + { + "signature": "$testMethodSignature1", "testId": "$testTestId", "probes": [true, false, true] }, - { - "classname": "$testClassname", + { + "signature": "$testMethodSignature2", "testId": "$testTestId", "probes": [false, true, false] } @@ -77,16 +80,30 @@ class CoverageApiTest : DatabaseTests({ RawDataWriterDatabaseConfig.init(it) }) ) } - val savedCoverage = CoverageTable.selectAll().asSequence() - .filter { it[CoverageTable.groupId] == testGroup } - .filter { it[CoverageTable.appId] == testApp } - .filter { it[CoverageTable.instanceId] == testInstance } - .filter { it[CoverageTable.classname] == testClassname } - .filter { it[CoverageTable.testId] == testTestId } + val savedCoverageMethod1 = MethodCoverageTable.selectAll().asSequence() + .filter { it[MethodCoverageTable.groupId] == testGroup } + .filter { it[MethodCoverageTable.appId] == testApp } + .filter { it[MethodCoverageTable.instanceId] == testInstance } + .filter { it[MethodCoverageTable.buildId] == "$testGroup:$testApp:$testBuildVersion" } + .filter { it[MethodCoverageTable.signature] == testMethodSignature1 } + .filter { it[MethodCoverageTable.testId] == testTestId } .toList() - assertEquals(2, savedCoverage.size) - savedCoverage.forEach { - assertTrue(it[CoverageTable.createdAt] >= timeBeforeTest) + assertEquals(1, savedCoverageMethod1.size) + savedCoverageMethod1.forEach { + assertTrue(it[MethodCoverageTable.createdAt] >= timeBeforeTest) + } + + val savedCoverageMethod2 = MethodCoverageTable.selectAll().asSequence() + .filter { it[MethodCoverageTable.groupId] == testGroup } + .filter { it[MethodCoverageTable.appId] == testApp } + .filter { it[MethodCoverageTable.instanceId] == testInstance } + .filter { it[MethodCoverageTable.buildId] == "$testGroup:$testApp:$testBuildVersion" } + .filter { it[MethodCoverageTable.signature] == testMethodSignature2 } + .filter { it[MethodCoverageTable.testId] == testTestId } + .toList() + assertEquals(1, savedCoverageMethod2.size) + savedCoverageMethod2.forEach { + assertTrue(it[MethodCoverageTable.createdAt] >= timeBeforeTest) } } } \ No newline at end of file