From 16a89ad19b2f0a54a9382d1567a66d44ae7570fb Mon Sep 17 00:00:00 2001 From: iryabov Date: Mon, 20 Apr 2026 12:51:38 +0200 Subject: [PATCH 1/6] feat: implement test launch coverage extractor and pipeline EPMDJ-11206 --- .../admin/metrics/config/MetricsModule.kt | 3 +- .../drill/admin/metrics/etl/CoverageEtl.kt | 24 +++++++++++ .../metrics/db/etl/coverage_extractor.sql | 16 ++++---- .../db/etl/test_launch_coverage_extractor.sql | 41 +++++++++++++++++++ 4 files changed, 74 insertions(+), 10 deletions(-) create mode 100644 admin-metrics/src/main/resources/metrics/db/etl/test_launch_coverage_extractor.sql diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt index cabd13c71..c02c77b3a 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt @@ -26,6 +26,7 @@ import com.epam.drill.admin.metrics.etl.methodsPipeline import com.epam.drill.admin.metrics.etl.buildsPipeline import com.epam.drill.admin.metrics.etl.coveragePipeline import com.epam.drill.admin.metrics.etl.testDefinitionsPipeline +import com.epam.drill.admin.metrics.etl.testLaunchCoveragePipeline import com.epam.drill.admin.metrics.etl.testLaunchesPipeline import com.epam.drill.admin.metrics.etl.testSessionBuildsPipeline import com.epam.drill.admin.metrics.etl.testSessionsPipeline @@ -63,7 +64,7 @@ val metricsDIModule pipelines = listOf( buildsPipeline, methodsPipeline, testLaunchesPipeline, testDefinitionsPipeline, testSessionsPipeline, - coveragePipeline, testSessionBuildsPipeline + coveragePipeline, testLaunchCoveragePipeline, testSessionBuildsPipeline ), metadataRepository = instance(), consistencyWindow = etlConfig.consistencyWindow diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt index ea8cde0ac..b26eac8e7 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt @@ -38,6 +38,17 @@ val EtlConfig.coverageExtractor lastExtractedAtColumnName = "created_at", ) +val EtlConfig.testLaunchCoverageExtractor + get() = UntypedSqlDataExtractor( + name = "test_launch_coverage", + sqlQuery = fromResource("/metrics/db/etl/test_launch_coverage_extractor.sql"), + database = MetricsDatabaseConfig.database, + fetchSize = fetchSize, + extractionLimit = extractionLimit, + loggingFrequency = loggingFrequency, + lastExtractedAtColumnName = "test_completed_at", + ) + val EtlConfig.buildMethodTestDefinitionCoverageLoader get() = UntypedSqlDataLoader( name = "build_method_test_definition_coverage", @@ -161,6 +172,19 @@ val EtlConfig.coveragePipeline get() = EtlPipelineImpl( name = "coverage", extractor = coverageExtractor, + loaders = listOf( + untypedNopTransformer to buildMethodTestSessionCoverageLoader, + buildMethodCoverageTransformer to buildMethodCoverageLoader, + methodDailyCoverageTransformer to methodDailyCoverageLoader, + untypedNopTransformer to testSessionBuildsLoader + ), + bufferSize = bufferSize + ) + +val EtlConfig.testLaunchCoveragePipeline + get() = EtlPipelineImpl( + name = "test_launch_coverage", + extractor = testLaunchCoverageExtractor, loaders = listOf( untypedNopTransformer to buildMethodTestDefinitionCoverageLoader, untypedNopTransformer to buildMethodTestSessionCoverageLoader, diff --git a/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql index 2b34be0f6..5a03a4ed9 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql @@ -4,16 +4,16 @@ SELECT i.build_id, i.env_id AS app_env_id, CASE WHEN c.test_session_id = 'GLOBAL' THEN NULL ELSE c.test_session_id END AS test_session_id, - CASE WHEN c.test_id = 'TEST_CONTEXT_NONE' THEN NULL ELSE c.test_id END AS test_launch_id, + NULL AS test_launch_id, c.method_id, m.signature, b.branch, - tl.test_definition_id, - test_tag, - td.path AS test_path, - td.name AS test_name, + NULL AS test_definition_id, + NULL AS test_tag, + NULL AS test_path, + NULL AS test_name, ts.test_task_id, - tl.result AS test_result, + NULL AS test_result, c.created_at, DATE_TRUNC('day', c.created_at) AS created_at_day, c.probes AS probes @@ -30,11 +30,9 @@ JOIN raw_data.methods m ON m.method_id = c.method_id AND m.app_id = c.app_id AND JOIN raw_data.instances i ON i.id = c.instance_id AND i.app_id = c.app_id AND i.group_id = c.group_id JOIN raw_data.builds b ON b.group_id = c.group_id AND b.app_id = c.app_id AND b.id = c.build_id LEFT JOIN raw_data.test_sessions ts ON ts.id = c.test_session_id AND ts.group_id = c.group_id -LEFT JOIN raw_data.test_launches tl ON tl.id = c.test_id AND tl.group_id = c.group_id -LEFT JOIN raw_data.test_definitions td ON td.group_id = tl.group_id AND td.id = tl.test_definition_id -LEFT JOIN LATERAL unnest(td.tags) AS test_tag ON TRUE WHERE c.created_at > :since_timestamp AND c.created_at <= :until_timestamp AND c.group_id = :group_id + AND (c.test_id IS NULL OR c.test_id = 'TEST_CONTEXT_NONE') ORDER BY c.created_at, c.group_id, c.method_id LIMIT :limit \ No newline at end of file diff --git a/admin-metrics/src/main/resources/metrics/db/etl/test_launch_coverage_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/test_launch_coverage_extractor.sql new file mode 100644 index 000000000..db7c7cba1 --- /dev/null +++ b/admin-metrics/src/main/resources/metrics/db/etl/test_launch_coverage_extractor.sql @@ -0,0 +1,41 @@ +SELECT + c.group_id, + c.app_id, + i.build_id, + i.env_id AS app_env_id, + ts.id AS test_session_id, + tl.id AS test_launch_id, + c.method_id, + m.signature, + b.branch, + tl.test_definition_id, + test_tag, + td.path AS test_path, + td.name AS test_name, + ts.test_task_id, + tl.result AS test_result, + tl.created_at AS test_completed_at, + c.created_at AS created_at, + DATE_TRUNC('day', c.created_at) AS created_at_day, + c.probes AS probes +FROM raw_data.test_launches tl +JOIN raw_data.test_sessions ts ON ts.id = tl.test_session_id AND ts.group_id = tl.group_id +JOIN raw_data.test_definitions td ON td.id = tl.test_definition_id AND td.group_id = tl.group_id +JOIN raw_data.method_coverage c ON c.test_id = tl.id AND c.group_id = tl.group_id +JOIN raw_data.methods m ON m.method_id = c.method_id AND m.app_id = c.app_id AND m.group_id = c.group_id + AND NOT EXISTS ( + SELECT 1 + FROM raw_data.method_ignore_rules r + WHERE r.group_id = m.group_id + AND r.app_id = m.app_id + AND (r.classname_pattern IS NOT NULL AND m.class_name::text ~ r.classname_pattern::text + OR r.name_pattern IS NOT NULL AND m.method_name::text ~ r.name_pattern::text) + ) +JOIN raw_data.instances i ON i.id = c.instance_id AND i.app_id = c.app_id AND i.group_id = c.group_id +JOIN raw_data.builds b ON b.group_id = c.group_id AND b.app_id = c.app_id AND b.id = c.build_id +LEFT JOIN LATERAL unnest(td.tags) AS test_tag ON TRUE +WHERE tl.created_at > :since_timestamp + AND tl.created_at <= :until_timestamp + AND tl.group_id = :group_id +ORDER BY tl.created_at, tl.id, m.method_id +LIMIT :limit \ No newline at end of file From 8dd57d0af310e3d00ed707023d8a57f243035d18 Mon Sep 17 00:00:00 2001 From: iryabov Date: Thu, 23 Apr 2026 17:27:47 +0200 Subject: [PATCH 2/6] feat: update coverage extractor to include test launch data and improve SQL queries --- .../kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt | 2 ++ .../resources/metrics/db/etl/build_methods_extractor.sql | 2 +- .../main/resources/metrics/db/etl/coverage_extractor.sql | 5 +++-- .../metrics/db/etl/test_launch_coverage_extractor.sql | 6 +++--- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt index b26eac8e7..3e82a3a77 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt @@ -173,9 +173,11 @@ val EtlConfig.coveragePipeline name = "coverage", extractor = coverageExtractor, loaders = listOf( + untypedNopTransformer to buildMethodTestDefinitionCoverageLoader, untypedNopTransformer to buildMethodTestSessionCoverageLoader, buildMethodCoverageTransformer to buildMethodCoverageLoader, methodDailyCoverageTransformer to methodDailyCoverageLoader, + test2CodeMappingTransformer to test2CodeMappingLoader, untypedNopTransformer to testSessionBuildsLoader ), bufferSize = bufferSize diff --git a/admin-metrics/src/main/resources/metrics/db/etl/build_methods_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/build_methods_extractor.sql index e7cc2b722..3b36e856c 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/build_methods_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/build_methods_extractor.sql @@ -26,5 +26,5 @@ WHERE bm.group_id = :group_id AND (r.classname_pattern IS NOT NULL AND m.class_name::text ~ r.classname_pattern::text OR r.name_pattern IS NOT NULL AND m.method_name::text ~ r.name_pattern::text) ) -ORDER BY bm.created_at ASC, bm.group_id, bm.method_id +ORDER BY bm.created_at ASC, bm.method_id LIMIT :limit \ No newline at end of file diff --git a/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql index 5a03a4ed9..b40ada86a 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql @@ -30,9 +30,10 @@ JOIN raw_data.methods m ON m.method_id = c.method_id AND m.app_id = c.app_id AND JOIN raw_data.instances i ON i.id = c.instance_id AND i.app_id = c.app_id AND i.group_id = c.group_id JOIN raw_data.builds b ON b.group_id = c.group_id AND b.app_id = c.app_id AND b.id = c.build_id LEFT JOIN raw_data.test_sessions ts ON ts.id = c.test_session_id AND ts.group_id = c.group_id +LEFT JOIN raw_data.test_launches tl ON tl.id = c.test_id AND tl.group_id = c.group_id WHERE c.created_at > :since_timestamp AND c.created_at <= :until_timestamp AND c.group_id = :group_id - AND (c.test_id IS NULL OR c.test_id = 'TEST_CONTEXT_NONE') -ORDER BY c.created_at, c.group_id, c.method_id + AND (c.test_id IS NULL OR c.test_id = 'TEST_CONTEXT_NONE' OR tl.id IS NOT NULL) +ORDER BY c.created_at, c.method_id LIMIT :limit \ No newline at end of file diff --git a/admin-metrics/src/main/resources/metrics/db/etl/test_launch_coverage_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/test_launch_coverage_extractor.sql index db7c7cba1..ba831e3bd 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/test_launch_coverage_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/test_launch_coverage_extractor.sql @@ -34,8 +34,8 @@ JOIN raw_data.methods m ON m.method_id = c.method_id AND m.app_id = c.app_id AND JOIN raw_data.instances i ON i.id = c.instance_id AND i.app_id = c.app_id AND i.group_id = c.group_id JOIN raw_data.builds b ON b.group_id = c.group_id AND b.app_id = c.app_id AND b.id = c.build_id LEFT JOIN LATERAL unnest(td.tags) AS test_tag ON TRUE -WHERE tl.created_at > :since_timestamp +WHERE tl.group_id = :group_id + AND tl.created_at > :since_timestamp AND tl.created_at <= :until_timestamp - AND tl.group_id = :group_id -ORDER BY tl.created_at, tl.id, m.method_id +ORDER BY tl.created_at, c.created_at, c.method_id LIMIT :limit \ No newline at end of file From 1738f8dc918fc2aea19d8cd896b33d265ed8dada Mon Sep 17 00:00:00 2001 From: iryabov Date: Thu, 23 Apr 2026 17:28:04 +0200 Subject: [PATCH 3/6] feat: refine logging in ETL orchestrator and data extractor for improved clarity --- .../drill/admin/etl/impl/EtlOrchestratorImpl.kt | 2 +- .../drill/admin/etl/impl/PageDataExtractor.kt | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt index 5beda0ec8..0812d89cb 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt @@ -49,7 +49,7 @@ open class EtlOrchestratorImpl( override suspend fun run(groupId: String, initTimestamp: Instant): List = withContext(Dispatchers.IO) { - logger.info("ETL [$name] for group [$groupId] is starting with init timestamp $initTimestamp...") + logger.info("ETL [$name] for group [$groupId] is starting...") val results = Collections.synchronizedList(mutableListOf()) val duration = measureTimeMillis { trackProgressOf { diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/PageDataExtractor.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/PageDataExtractor.kt index ba0372c0f..2d7c7311b 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/PageDataExtractor.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/PageDataExtractor.kt @@ -51,7 +51,7 @@ abstract class PageDataExtractor( try { while (hasMore && currentSince < untilTimestamp) { page.incrementAndGet() - logger.debug { "ETL extractor [$name] for group [$groupId] is executing query for page ${page.get()} since $currentSince ..." } + logger.debug { "ETL extractor [$name] for group [$groupId] is executing query for page ${page.get()} since $currentSince until $untilTimestamp ..." } var previousTimestamp: Instant? = null var previousEmittedTimestamp: Instant? = null @@ -95,10 +95,13 @@ abstract class PageDataExtractor( if (pageRows == 0L || pageRows < extractionLimit) { hasMore = false emitBuffer(buffer, emitter) - logger.debug { "ETL extractor [$name] for group [$groupId] completed fetching" + - ", rows fetched: ${rowsFetched.get()}" + - ", total pages: ${page.get()}" + - ", last extracted at $currentSince" } + previousEmittedTimestamp = previousTimestamp + logger.debug { + "ETL extractor [$name] for group [$groupId] completed fetching" + + ", rows fetched: ${rowsFetched.get()}" + + ", total pages: ${page.get()}" + + (if (previousEmittedTimestamp != null) ", last extracted at $previousEmittedTimestamp" else "") + } } else { currentSince = previousEmittedTimestamp ?: throw IllegalStateException( "No rows were emitted on page $page because all fetched records had the same timestamp. Please increase the extraction limit. Current is $extractionLimit." @@ -106,7 +109,8 @@ abstract class PageDataExtractor( // Remove rows from buffer that have timestamp greater than currentSince to avoid re-emission on the next page buffer.removeIf { it.timestamp > currentSince } hasMore = true - logger.debug { "ETL extractor [$name] for group [$groupId] fetched $pageRows rows on page ${page.get()}, last extracted at $currentSince" } + logger.debug { "ETL extractor [$name] for group [$groupId] fetched $pageRows rows on page ${page.get()}" + + ", last extracted at $previousEmittedTimestamp" } } } } catch (e: Exception) { From d4d70ced6fbfc3dd290737be550288579dfd3627 Mon Sep 17 00:00:00 2001 From: iryabov Date: Fri, 24 Apr 2026 09:48:24 +0200 Subject: [PATCH 4/6] feat: add processing delay option to ETL orchestrator for improved transaction consistency --- admin-app/src/main/resources/application.conf | 2 ++ .../com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt | 3 ++- .../com/epam/drill/admin/metrics/config/EtlConfig.kt | 7 +++++++ .../com/epam/drill/admin/metrics/config/MetricsModule.kt | 3 ++- 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/admin-app/src/main/resources/application.conf b/admin-app/src/main/resources/application.conf index 62da9b8b9..6e87bb6e5 100644 --- a/admin-app/src/main/resources/application.conf +++ b/admin-app/src/main/resources/application.conf @@ -108,5 +108,7 @@ drill { loggingFrequency = ${?DRILL_ETL_LOGGING_FREQUENCY} consistencyWindow = 0 consistencyWindow = ${?DRILL_ETL_CONSISTENCY_WINDOW} + processingDelay = 0 + processingDelay = ${?DRILL_ETL_PROCESSING_DELAY} } } diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt index 0812d89cb..c6b865fae 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt @@ -44,6 +44,7 @@ open class EtlOrchestratorImpl( open val pipelines: List>, open val metadataRepository: EtlMetadataRepository, open val consistencyWindow: Long = 0, + open val processingDelay: Long = 0, ) : EtlOrchestrator { private val logger = KotlinLogging.logger {} @@ -98,7 +99,7 @@ open class EtlOrchestratorImpl( pipeline: EtlPipeline<*, *>, initTimestamp: Instant ): EtlProcessingResult = coroutineScope { - val snapshotTime = Instant.now() + val snapshotTime = Instant.now().minusSeconds(processingDelay) val metadata = metadataRepository.getAllMetadataByExtractor(groupId, pipeline.name, pipeline.extractor.name) .associateBy { it.loaderName } val loaderNames = pipeline.loaders.map { it.second.name }.toSet() diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt index 71a80b1c8..f3f563f4c 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt @@ -64,4 +64,11 @@ class EtlConfig(private val config: ApplicationConfig) { */ val consistencyWindow : Long get() = config.propertyOrNull("consistencyWindow")?.getString()?.toLongOrNull() ?: 0L + + /** + * Number of seconds to subtract from the current time when calculating the upper bound of the ETL processing window. + * This delays ETL process to allow current transactions in the data source to complete. + */ + val processingDelay : Long + get() = config.propertyOrNull("processingDelay")?.getString()?.toLongOrNull() ?: 0L } \ No newline at end of file diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt index c02c77b3a..6d5d29fec 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt @@ -67,7 +67,8 @@ val metricsDIModule coveragePipeline, testLaunchCoveragePipeline, testSessionBuildsPipeline ), metadataRepository = instance(), - consistencyWindow = etlConfig.consistencyWindow + consistencyWindow = etlConfig.consistencyWindow, + processingDelay = etlConfig.processingDelay ) } } From 51f2faae0c7495e7300411738768da4bfbdd4558 Mon Sep 17 00:00:00 2001 From: iryabov Date: Fri, 24 Apr 2026 17:01:04 +0200 Subject: [PATCH 5/6] feat: update coverage extractor SQL --- .../drill/admin/etl/impl/BatchDataLoader.kt | 18 +++++++++++------- .../drill/admin/metrics/etl/CoverageEtl.kt | 2 -- .../metrics/db/etl/coverage_extractor.sql | 5 ++--- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt index a946ece4d..3ee84ab87 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt @@ -125,7 +125,8 @@ abstract class BatchDataLoader( if (previousTimestamp != null && currentTimestamp != previousTimestamp && buffer.isEmpty() && skippedRowsForUpdate >= batchSize) { onLoadingProgress( EtlLoadingResult( - lastProcessedAt = previousTimestamp ?: throw IllegalStateException("Previous timestamp is null"), + lastProcessedAt = previousTimestamp + ?: throw IllegalStateException("Previous timestamp is null"), processedRows = 0, ) ) @@ -154,7 +155,8 @@ abstract class BatchDataLoader( // Commit any remaining rows in the buffer result += flushBuffer(groupId, buffer, batchNo) { batch -> if (batch.success) { - lastLoadedTimestamp = untilTimestamp + lastLoadedTimestamp = previousTimestamp + ?: throw IllegalStateException("Previous timestamp is null") } EtlLoadingResult( errorMessage = if (!batch.success) batch.errorMessage else null, @@ -166,11 +168,13 @@ abstract class BatchDataLoader( } } } else { - // Update last processed timestamp even if no rows were left in the buffer - result += EtlLoadingResult( - lastProcessedAt = untilTimestamp - ).also { - onLoadingProgress(it) + // Update last processed timestamp even if no rows were loaded + if (lastLoadedTimestamp == sinceTimestamp) { + result += EtlLoadingResult( + lastProcessedAt = untilTimestamp + ).also { + onLoadingProgress(it) + } } } onStatusChanged(EtlStatus.SUCCESS) diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt index 3e82a3a77..b26eac8e7 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt @@ -173,11 +173,9 @@ val EtlConfig.coveragePipeline name = "coverage", extractor = coverageExtractor, loaders = listOf( - untypedNopTransformer to buildMethodTestDefinitionCoverageLoader, untypedNopTransformer to buildMethodTestSessionCoverageLoader, buildMethodCoverageTransformer to buildMethodCoverageLoader, methodDailyCoverageTransformer to methodDailyCoverageLoader, - test2CodeMappingTransformer to test2CodeMappingLoader, untypedNopTransformer to testSessionBuildsLoader ), bufferSize = bufferSize diff --git a/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql index b40ada86a..872f46064 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql @@ -3,7 +3,7 @@ SELECT c.app_id, i.build_id, i.env_id AS app_env_id, - CASE WHEN c.test_session_id = 'GLOBAL' THEN NULL ELSE c.test_session_id END AS test_session_id, + ts.id AS test_session_id, NULL AS test_launch_id, c.method_id, m.signature, @@ -30,10 +30,9 @@ JOIN raw_data.methods m ON m.method_id = c.method_id AND m.app_id = c.app_id AND JOIN raw_data.instances i ON i.id = c.instance_id AND i.app_id = c.app_id AND i.group_id = c.group_id JOIN raw_data.builds b ON b.group_id = c.group_id AND b.app_id = c.app_id AND b.id = c.build_id LEFT JOIN raw_data.test_sessions ts ON ts.id = c.test_session_id AND ts.group_id = c.group_id -LEFT JOIN raw_data.test_launches tl ON tl.id = c.test_id AND tl.group_id = c.group_id WHERE c.created_at > :since_timestamp AND c.created_at <= :until_timestamp AND c.group_id = :group_id - AND (c.test_id IS NULL OR c.test_id = 'TEST_CONTEXT_NONE' OR tl.id IS NOT NULL) + AND (c.test_id IS NULL OR c.test_id = 'TEST_CONTEXT_NONE') ORDER BY c.created_at, c.method_id LIMIT :limit \ No newline at end of file From ca0dc9602849b489846583d8afeecbdcfd33087c Mon Sep 17 00:00:00 2001 From: iryabov Date: Fri, 24 Apr 2026 17:09:13 +0200 Subject: [PATCH 6/6] feat: normalize test and session IDs in coverage extractor and update SQL queries --- .../main/resources/metrics/db/etl/coverage_extractor.sql | 2 +- .../writer/rawdata/service/impl/RawDataServiceImpl.kt | 6 +++--- .../V32__normalize_coverage_test_and_session_ids.sql | 7 +++++++ 3 files changed, 11 insertions(+), 4 deletions(-) create mode 100644 admin-writer/src/main/resources/raw_data/db/migration/V32__normalize_coverage_test_and_session_ids.sql diff --git a/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql b/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql index 872f46064..57f93dee7 100644 --- a/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql +++ b/admin-metrics/src/main/resources/metrics/db/etl/coverage_extractor.sql @@ -33,6 +33,6 @@ LEFT JOIN raw_data.test_sessions ts ON ts.id = c.test_session_id AND ts.group_id WHERE c.created_at > :since_timestamp AND c.created_at <= :until_timestamp AND c.group_id = :group_id - AND (c.test_id IS NULL OR c.test_id = 'TEST_CONTEXT_NONE') + AND c.test_id IS NULL ORDER BY c.created_at, c.method_id LIMIT :limit \ No newline at end of file diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataServiceImpl.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataServiceImpl.kt index 2cf4314d8..8c37b30d1 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataServiceImpl.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/service/impl/RawDataServiceImpl.kt @@ -32,7 +32,7 @@ import java.time.ZonedDateTime import java.time.format.DateTimeFormatter import java.util.* -private const val EXEC_DATA_BATCH_SIZE = 100 +private const val EXEC_DATA_BATCH_SIZE = 1000 class RawDataServiceImpl( private val instanceRepository: InstanceRepository, @@ -183,8 +183,8 @@ class RawDataServiceImpl( coverage.bodyChecksum, coverage.probes.size ).joinToString(":").md5(), - testId = coverage.testId, - testSessionId = coverage.testSessionId, + testId = coverage.testId?.takeIf { it != "TEST_CONTEXT_NONE" }, + testSessionId = coverage.testSessionId?.takeIf { it != "GLOBAL" }, probes = coverage.probes ) } diff --git a/admin-writer/src/main/resources/raw_data/db/migration/V32__normalize_coverage_test_and_session_ids.sql b/admin-writer/src/main/resources/raw_data/db/migration/V32__normalize_coverage_test_and_session_ids.sql new file mode 100644 index 000000000..5b418f1ef --- /dev/null +++ b/admin-writer/src/main/resources/raw_data/db/migration/V32__normalize_coverage_test_and_session_ids.sql @@ -0,0 +1,7 @@ +UPDATE raw_data.method_coverage +SET test_session_id = NULL +WHERE test_session_id = 'GLOBAL'; + +UPDATE raw_data.method_coverage +SET test_id = NULL +WHERE test_id = 'TEST_CONTEXT_NONE'; \ No newline at end of file