diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt index c992ee31d..a946ece4d 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt @@ -56,6 +56,7 @@ abstract class BatchDataLoader( val loadedRows = AtomicLong(0) val skippedRows = AtomicLong(0) val buffer = mutableListOf() + var skippedRowsForUpdate = 0L var lastLoadedTimestamp: Instant = sinceTimestamp var previousTimestamp: Instant? = null suspend fun StoppableFlow.stopWithMessage(message: String) { @@ -92,29 +93,20 @@ abstract class BatchDataLoader( return@collect } - // Skip rows that are not processable - if (!isProcessable(row)) { - previousTimestamp = currentTimestamp - skippedRows.incrementAndGet() - return@collect - } - // If timestamp changed and buffer is full, flush the buffer - if (previousTimestamp != null && currentTimestamp != previousTimestamp) { - if (buffer.size >= batchSize) { - result += flushBuffer(groupId, buffer, batchNo) { batch -> - if (batch.success) { - lastLoadedTimestamp = - previousTimestamp ?: throw IllegalStateException("Previous timestamp is null") - } - EtlLoadingResult( - errorMessage = if (!batch.success) result.errorMessage else null, - lastProcessedAt = lastLoadedTimestamp, - processedRows = if (batch.success) batch.rowsLoaded else 0L, - duration = batch.duration - ).also { - onLoadingProgress(it) - } + if (previousTimestamp != null && currentTimestamp != previousTimestamp && buffer.size >= batchSize) { + result += flushBuffer(groupId, buffer, batchNo) { batch -> + if (batch.success) { + lastLoadedTimestamp = + previousTimestamp ?: throw IllegalStateException("Previous timestamp is null") + } + EtlLoadingResult( + errorMessage = if (!batch.success) result.errorMessage else null, + lastProcessedAt = lastLoadedTimestamp, + processedRows = if (batch.success) batch.rowsLoaded else 0L, + duration = batch.duration + ).also { + onLoadingProgress(it) } } } @@ -124,6 +116,24 @@ abstract class BatchDataLoader( return@collect } + // Skip rows that are not processable + if (!isProcessable(row)) { + previousTimestamp = currentTimestamp + skippedRows.incrementAndGet() + skippedRowsForUpdate++ + // If timestamp changed and there are a lot of skipped rows, update progress + if (previousTimestamp != null && currentTimestamp != previousTimestamp && buffer.isEmpty() && skippedRowsForUpdate >= batchSize) { + onLoadingProgress( + EtlLoadingResult( + lastProcessedAt = previousTimestamp ?: throw IllegalStateException("Previous timestamp is null"), + processedRows = 0, + ) + ) + skippedRowsForUpdate = 0 + } + return@collect + } + buffer += row previousTimestamp = currentTimestamp loadedRows.incrementAndGet() diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt index 058915af2..0a4e2df31 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlPipelineImpl.kt @@ -26,7 +26,6 @@ import com.epam.drill.admin.etl.EtlRow import com.epam.drill.admin.etl.EtlStatus import com.epam.drill.admin.etl.NopTransformer import com.epam.drill.admin.etl.flow.CompletableSharedFlow -import com.epam.drill.admin.etl.impl.EtlPipelineImpl import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers @@ -192,7 +191,7 @@ class EtlPipelineImpl( sinceTimestamp: Instant, untilTimestamp: Instant, flow: Flow, - onLoadingProgress: suspend (loaderNmae: String, result: EtlLoadingResult) -> Unit, + onLoadingProgress: suspend (loaderName: String, result: EtlLoadingResult) -> Unit, onStatusChanged: suspend (loaderName: String, status: EtlStatus) -> Unit ): EtlLoadingResult = try { transformer.transform(groupId, flow).let { flow -> diff --git a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestDataDsl.kt b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestDataDsl.kt index 6c3d3e381..4c2b95923 100644 --- a/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestDataDsl.kt +++ b/admin-metrics/src/test/kotlin/com/epam/drill/admin/metrics/TestDataDsl.kt @@ -53,7 +53,9 @@ fun havingData(testsData: suspend TestDataDsl.() -> Unit): HttpClient { metricsRoutes() metricsManagementRoutes() }.drillClient().apply { - testsData(TestDataDsl(this)) + val testDataDsl = TestDataDsl(this) + testsData(testDataDsl) + testDataDsl.build() refreshMetrics() } } @@ -86,12 +88,17 @@ data class ImpactedMethods( ) class TestDataDsl(val client: HttpClient) { - private val builds = mutableMapOf>() + private val builds = linkedMapOf>() + + suspend fun build() { + builds.forEach { (b, m) -> + client.deployInstance(b, m.toTypedArray()) + } + } suspend infix fun InstancePayload.has(methods: List) { val newMethods = methods.recalcProbesStartPos() builds.put(this, ArrayList(newMethods)) - client.deployInstance(this, newMethods.toTypedArray()) } suspend infix fun InstancePayload.hasModified(method: SingleMethodPayload) = @@ -134,7 +141,6 @@ class TestDataDsl(val client: HttpClient) { } val newTargetMethods = targetMethods.recalcProbesStartPos() builds.put(this.build, newTargetMethods) - client.deployInstance(this.build, newTargetMethods.toTypedArray()) } private fun List.recalcProbesStartPos(): MutableList { @@ -154,7 +160,6 @@ class TestDataDsl(val client: HttpClient) { ) { val otherMethods = builds.getOrDefault(baseline, ArrayList()) builds.put(this, ArrayList(otherMethods)) - client.deployInstance(this, otherMethods.toTypedArray()) } private fun isSignatureEqual(one: SingleMethodPayload, other: SingleMethodPayload) = diff --git a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/repository/impl/MethodRepositoryImpl.kt b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/repository/impl/MethodRepositoryImpl.kt index 05c90eee5..249a61e9b 100644 --- a/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/repository/impl/MethodRepositoryImpl.kt +++ b/admin-writer/src/main/kotlin/com/epam/drill/admin/writer/rawdata/repository/impl/MethodRepositoryImpl.kt @@ -25,9 +25,13 @@ import org.jetbrains.exposed.sql.batchUpsert import org.jetbrains.exposed.sql.deleteWhere import java.time.LocalDate -class MethodRepositoryImpl: MethodRepository { +class MethodRepositoryImpl : MethodRepository { override suspend fun createMany(data: List) { - MethodTable.batchUpsert(data, shouldReturnGeneratedValues = false) { + MethodTable.batchUpsert( + data, + shouldReturnGeneratedValues = false, + onUpdateExclude = listOf(MethodTable.probesStartPos, MethodTable.bodyChecksum, MethodTable.probesCount) + ) { this[MethodTable.id] = it.id this[MethodTable.groupId] = it.groupId this[MethodTable.appId] = it.appId @@ -38,7 +42,7 @@ class MethodRepositoryImpl: MethodRepository { this[MethodTable.returnType] = it.returnType this[MethodTable.probesStartPos] = it.probesStartPos this[MethodTable.bodyChecksum] = it.bodyChecksum - this[MethodTable.signature] = it .signature + this[MethodTable.signature] = it.signature this[MethodTable.probesCount] = it.probesCount it.annotations?.let { annotations -> this[MethodTable.annotations] = annotations.takeIf { it.isNotEmpty() }?.toString() @@ -56,8 +60,8 @@ class MethodRepositoryImpl: MethodRepository { override suspend fun deleteAllByBuildId(groupId: String, appId: String, buildId: String) { MethodTable.deleteWhere { (MethodTable.groupId eq groupId) and - (MethodTable.appId eq appId) and - (MethodTable.buildId eq buildId) + (MethodTable.appId eq appId) and + (MethodTable.buildId eq buildId) } } } \ No newline at end of file