From 8ae33f4703db045c5a9ce759312a6b70bbc1ae21 Mon Sep 17 00:00:00 2001 From: RomanDavlyatshin Date: Wed, 21 Jan 2026 14:00:15 +0400 Subject: [PATCH 1/6] fix: metrics.get_changes excludeMethodSignatures not working with multiple conditions --- .../src/main/resources/metrics/db/migration/R__3_Functions.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/admin-metrics/src/main/resources/metrics/db/migration/R__3_Functions.sql b/admin-metrics/src/main/resources/metrics/db/migration/R__3_Functions.sql index 5da321881..6c86fc99e 100644 --- a/admin-metrics/src/main/resources/metrics/db/migration/R__3_Functions.sql +++ b/admin-metrics/src/main/resources/metrics/db/migration/R__3_Functions.sql @@ -383,7 +383,7 @@ BEGIN -- Filters by methods AND (input_package_name_pattern IS NULL OR m.class_name LIKE input_package_name_pattern) AND (input_method_signature_pattern IS NULL OR m.signature LIKE input_method_signature_pattern) - AND (input_exclude_method_signatures IS NULL OR m.signature != ANY(input_exclude_method_signatures::VARCHAR[])) + AND (input_exclude_method_signatures IS NULL OR m.signature <> ALL(input_exclude_method_signatures::VARCHAR[])) -- Deprecated filters AND (input_method_signature IS NULL OR m.signature = input_method_signature) AND (input_class_name IS NULL OR m.class_name = input_class_name) From 73725c8e73085f068adac07836fd4070205559cb Mon Sep 17 00:00:00 2001 From: iryabov Date: Tue, 27 Jan 2026 11:22:14 +0100 Subject: [PATCH 2/6] feat: enhance ETL process by improving skipped row handling and progress updates EPMDJ-11187 --- .../drill/admin/etl/impl/BatchDataLoader.kt | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt index c992ee31d..26db6b808 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt @@ -92,29 +92,20 @@ abstract class BatchDataLoader( return@collect } - // Skip rows that are not processable - if (!isProcessable(row)) { - previousTimestamp = currentTimestamp - skippedRows.incrementAndGet() - return@collect - } - // If timestamp changed and buffer is full, flush the buffer - if (previousTimestamp != null && currentTimestamp != previousTimestamp) { - if (buffer.size >= batchSize) { - result += flushBuffer(groupId, buffer, batchNo) { batch -> - if (batch.success) { - lastLoadedTimestamp = - previousTimestamp ?: throw IllegalStateException("Previous timestamp is null") - } - EtlLoadingResult( - errorMessage = if (!batch.success) result.errorMessage else null, - lastProcessedAt = lastLoadedTimestamp, - processedRows = if (batch.success) batch.rowsLoaded else 0L, - duration = batch.duration - ).also { - onLoadingProgress(it) - } + if (previousTimestamp != null && currentTimestamp != previousTimestamp && buffer.size >= batchSize) { + result += flushBuffer(groupId, buffer, batchNo) { batch -> + if (batch.success) { + lastLoadedTimestamp = + previousTimestamp ?: throw IllegalStateException("Previous timestamp is null") + } + EtlLoadingResult( + errorMessage = if (!batch.success) result.errorMessage else null, + lastProcessedAt = lastLoadedTimestamp, + processedRows = if (batch.success) batch.rowsLoaded else 0L, + duration = batch.duration + ).also { + onLoadingProgress(it) } } } @@ -124,6 +115,22 @@ abstract class BatchDataLoader( return@collect } + // Skip rows that are not processable + if (!isProcessable(row)) { + previousTimestamp = currentTimestamp + skippedRows.incrementAndGet() + // If timestamp changed and there are a lot of skipped rows, update progress + if (previousTimestamp != null && currentTimestamp != previousTimestamp && buffer.isEmpty() && skippedRows.get() % batchSize == 0L) { + onLoadingProgress( + EtlLoadingResult( + lastProcessedAt = previousTimestamp ?: throw IllegalStateException("Previous timestamp is null"), + processedRows = 0, + ) + ) + } + return@collect + } + buffer += row previousTimestamp = currentTimestamp loadedRows.incrementAndGet() From 7388dd64f1d0aa7640fb6175fba73be2308d0bcc Mon Sep 17 00:00:00 2001 From: iryabov Date: Tue, 27 Jan 2026 12:27:52 +0100 Subject: [PATCH 3/6] feat: improve skipped row handling in BatchDataLoader with progress updates EPMDJ-11187 --- .../kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt index 26db6b808..a946ece4d 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt @@ -56,6 +56,7 @@ abstract class BatchDataLoader( val loadedRows = AtomicLong(0) val skippedRows = AtomicLong(0) val buffer = mutableListOf() + var skippedRowsForUpdate = 0L var lastLoadedTimestamp: Instant = sinceTimestamp var previousTimestamp: Instant? = null suspend fun StoppableFlow.stopWithMessage(message: String) { @@ -119,14 +120,16 @@ abstract class BatchDataLoader( if (!isProcessable(row)) { previousTimestamp = currentTimestamp skippedRows.incrementAndGet() + skippedRowsForUpdate++ // If timestamp changed and there are a lot of skipped rows, update progress - if (previousTimestamp != null && currentTimestamp != previousTimestamp && buffer.isEmpty() && skippedRows.get() % batchSize == 0L) { + if (previousTimestamp != null && currentTimestamp != previousTimestamp && buffer.isEmpty() && skippedRowsForUpdate >= batchSize) { onLoadingProgress( EtlLoadingResult( lastProcessedAt = previousTimestamp ?: throw IllegalStateException("Previous timestamp is null"), processedRows = 0, ) ) + skippedRowsForUpdate = 0 } return@collect } From 3c8864323545deaccbe6a6cd5a1cb2df2209fc3e Mon Sep 17 00:00:00 2001 From: iryabov Date: Tue, 27 Jan 2026 12:51:09 +0100 Subject: [PATCH 4/6] fix: correct typo in parameter name for onLoadingProgress in EtlPipelineImpl EPMDJ-11187 --- .../kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt index 058915af2..0a4e2df31 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt @@ -26,7 +26,6 @@ import com.epam.drill.admin.etl.EtlRow import com.epam.drill.admin.etl.EtlStatus import com.epam.drill.admin.etl.NopTransformer import com.epam.drill.admin.etl.flow.CompletableSharedFlow -import com.epam.drill.admin.etl.impl.EtlPipelineImpl import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers @@ -192,7 +191,7 @@ class EtlPipelineImpl( sinceTimestamp: Instant, untilTimestamp: Instant, flow: Flow, - onLoadingProgress: suspend (loaderNmae: String, result: EtlLoadingResult) -> Unit, + onLoadingProgress: suspend (loaderName: String, result: EtlLoadingResult) -> Unit, onStatusChanged: suspend (loaderName: String, status: EtlStatus) -> Unit ): EtlLoadingResult = try { transformer.transform(groupId, flow).let { flow -> From 4775c65a154adcf688ad7152d8d609be926a3c7a Mon Sep 17 00:00:00 2001 From: iryabov Date: Wed, 28 Jan 2026 13:09:38 +0100 Subject: [PATCH 5/6] fix: do not update checksum and probes count on update methods EPMDJ-11188 --- .../repository/impl/MethodRepositoryImpl.kt | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/repository/impl/MethodRepositoryImpl.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/repository/impl/MethodRepositoryImpl.kt index 05c90eee5..249a61e9b 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/repository/impl/MethodRepositoryImpl.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/repository/impl/MethodRepositoryImpl.kt @@ -25,9 +25,13 @@ import org.jetbrains.exposed.sql.batchUpsert import org.jetbrains.exposed.sql.deleteWhere import java.time.LocalDate -class MethodRepositoryImpl: MethodRepository { +class MethodRepositoryImpl : MethodRepository { override suspend fun createMany(data: List) { - MethodTable.batchUpsert(data, shouldReturnGeneratedValues = false) { + MethodTable.batchUpsert( + data, + shouldReturnGeneratedValues = false, + onUpdateExclude = listOf(MethodTable.probesStartPos, MethodTable.bodyChecksum, MethodTable.probesCount) + ) { this[MethodTable.id] = it.id this[MethodTable.groupId] = it.groupId this[MethodTable.appId] = it.appId @@ -38,7 +42,7 @@ class MethodRepositoryImpl: MethodRepository { this[MethodTable.returnType] = it.returnType this[MethodTable.probesStartPos] = it.probesStartPos this[MethodTable.bodyChecksum] = it.bodyChecksum - this[MethodTable.signature] = it .signature + this[MethodTable.signature] = it.signature this[MethodTable.probesCount] = it.probesCount it.annotations?.let { annotations -> this[MethodTable.annotations] = annotations.takeIf { it.isNotEmpty() }?.toString() @@ -56,8 +60,8 @@ class MethodRepositoryImpl: MethodRepository { override suspend fun deleteAllByBuildId(groupId: String, appId: String, buildId: String) { MethodTable.deleteWhere { (MethodTable.groupId eq groupId) and - (MethodTable.appId eq appId) and - (MethodTable.buildId eq buildId) + (MethodTable.appId eq appId) and + (MethodTable.buildId eq buildId) } } } \ No newline at end of file From b0696e5b37ea24ca16e770984e415277481888ef Mon Sep 17 00:00:00 2001 From: iryabov Date: Wed, 28 Jan 2026 13:10:02 +0100 Subject: [PATCH 6/6] fix: refactor TestDataDsl to build and deploy instances in a separate method EPMDJ-11188 --- .../com/epam/drill/admin/metrics/TestDataDsl.kt | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestDataDsl.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestDataDsl.kt index 6c3d3e381..4c2b95923 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestDataDsl.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestDataDsl.kt @@ -53,7 +53,9 @@ fun havingData(testsData: suspend TestDataDsl.() -> Unit): HttpClient { metricsRoutes() metricsManagementRoutes() }.drillClient().apply { - testsData(TestDataDsl(this)) + val testDataDsl = TestDataDsl(this) + testsData(testDataDsl) + testDataDsl.build() refreshMetrics() } } @@ -86,12 +88,17 @@ data class ImpactedMethods( ) class TestDataDsl(val client: HttpClient) { - private val builds = mutableMapOf>() + private val builds = linkedMapOf>() + + suspend fun build() { + builds.forEach { (b, m) -> + client.deployInstance(b, m.toTypedArray()) + } + } suspend infix fun InstancePayload.has(methods: List) { val newMethods = methods.recalcProbesStartPos() builds.put(this, ArrayList(newMethods)) - client.deployInstance(this, newMethods.toTypedArray()) } suspend infix fun InstancePayload.hasModified(method: SingleMethodPayload) = @@ -134,7 +141,6 @@ class TestDataDsl(val client: HttpClient) { } val newTargetMethods = targetMethods.recalcProbesStartPos() builds.put(this.build, newTargetMethods) - client.deployInstance(this.build, newTargetMethods.toTypedArray()) } private fun List.recalcProbesStartPos(): MutableList { @@ -154,7 +160,6 @@ class TestDataDsl(val client: HttpClient) { ) { val otherMethods = builds.getOrDefault(baseline, ArrayList()) builds.put(this, ArrayList(otherMethods)) - client.deployInstance(this, otherMethods.toTypedArray()) } private fun isSignatureEqual(one: SingleMethodPayload, other: SingleMethodPayload) =