Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ abstract class BatchDataLoader<T : EtlRow>(
val loadedRows = AtomicLong(0)
val skippedRows = AtomicLong(0)
val buffer = mutableListOf<T>()
var skippedRowsForUpdate = 0L
var lastLoadedTimestamp: Instant = sinceTimestamp
var previousTimestamp: Instant? = null
suspend fun <T> StoppableFlow<T>.stopWithMessage(message: String) {
Expand Down Expand Up @@ -92,29 +93,20 @@ abstract class BatchDataLoader<T : EtlRow>(
return@collect
}

// Skip rows that are not processable
if (!isProcessable(row)) {
previousTimestamp = currentTimestamp
skippedRows.incrementAndGet()
return@collect
}

// If timestamp changed and buffer is full, flush the buffer
if (previousTimestamp != null && currentTimestamp != previousTimestamp) {
if (buffer.size >= batchSize) {
result += flushBuffer(groupId, buffer, batchNo) { batch ->
if (batch.success) {
lastLoadedTimestamp =
previousTimestamp ?: throw IllegalStateException("Previous timestamp is null")
}
EtlLoadingResult(
errorMessage = if (!batch.success) result.errorMessage else null,
lastProcessedAt = lastLoadedTimestamp,
processedRows = if (batch.success) batch.rowsLoaded else 0L,
duration = batch.duration
).also {
onLoadingProgress(it)
}
if (previousTimestamp != null && currentTimestamp != previousTimestamp && buffer.size >= batchSize) {
result += flushBuffer(groupId, buffer, batchNo) { batch ->
if (batch.success) {
lastLoadedTimestamp =
previousTimestamp ?: throw IllegalStateException("Previous timestamp is null")
}
EtlLoadingResult(
errorMessage = if (!batch.success) result.errorMessage else null,
lastProcessedAt = lastLoadedTimestamp,
processedRows = if (batch.success) batch.rowsLoaded else 0L,
duration = batch.duration
).also {
onLoadingProgress(it)
}
}
}
Expand All @@ -124,6 +116,24 @@ abstract class BatchDataLoader<T : EtlRow>(
return@collect
}

// Skip rows that are not processable
if (!isProcessable(row)) {
previousTimestamp = currentTimestamp
skippedRows.incrementAndGet()
skippedRowsForUpdate++
// If timestamp changed and there are a lot of skipped rows, update progress
if (previousTimestamp != null && currentTimestamp != previousTimestamp && buffer.isEmpty() && skippedRowsForUpdate >= batchSize) {
onLoadingProgress(
EtlLoadingResult(
lastProcessedAt = previousTimestamp ?: throw IllegalStateException("Previous timestamp is null"),
processedRows = 0,
)
)
skippedRowsForUpdate = 0
}
return@collect
}

buffer += row
previousTimestamp = currentTimestamp
loadedRows.incrementAndGet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import com.epam.drill.admin.etl.EtlRow
import com.epam.drill.admin.etl.EtlStatus
import com.epam.drill.admin.etl.NopTransformer
import com.epam.drill.admin.etl.flow.CompletableSharedFlow
import com.epam.drill.admin.etl.impl.EtlPipelineImpl
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -192,7 +191,7 @@ class EtlPipelineImpl<T : EtlRow, R : EtlRow>(
sinceTimestamp: Instant,
untilTimestamp: Instant,
flow: Flow<T>,
onLoadingProgress: suspend (loaderNmae: String, result: EtlLoadingResult) -> Unit,
onLoadingProgress: suspend (loaderName: String, result: EtlLoadingResult) -> Unit,
onStatusChanged: suspend (loaderName: String, status: EtlStatus) -> Unit
): EtlLoadingResult = try {
transformer.transform(groupId, flow).let { flow ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ fun havingData(testsData: suspend TestDataDsl.() -> Unit): HttpClient {
metricsRoutes()
metricsManagementRoutes()
}.drillClient().apply {
testsData(TestDataDsl(this))
val testDataDsl = TestDataDsl(this)
testsData(testDataDsl)
testDataDsl.build()
refreshMetrics()
}
}
Expand Down Expand Up @@ -86,12 +88,17 @@ data class ImpactedMethods(
)

class TestDataDsl(val client: HttpClient) {
private val builds = mutableMapOf<InstancePayload, MutableList<SingleMethodPayload>>()
private val builds = linkedMapOf<InstancePayload, MutableList<SingleMethodPayload>>()

suspend fun build() {
builds.forEach { (b, m) ->
client.deployInstance(b, m.toTypedArray())
}
}

suspend infix fun InstancePayload.has(methods: List<SingleMethodPayload>) {
val newMethods = methods.recalcProbesStartPos()
builds.put(this, ArrayList(newMethods))
client.deployInstance(this, newMethods.toTypedArray())
}

suspend infix fun InstancePayload.hasModified(method: SingleMethodPayload) =
Expand Down Expand Up @@ -134,7 +141,6 @@ class TestDataDsl(val client: HttpClient) {
}
val newTargetMethods = targetMethods.recalcProbesStartPos()
builds.put(this.build, newTargetMethods)
client.deployInstance(this.build, newTargetMethods.toTypedArray())
}

private fun List<SingleMethodPayload>.recalcProbesStartPos(): MutableList<SingleMethodPayload> {
Expand All @@ -154,7 +160,6 @@ class TestDataDsl(val client: HttpClient) {
) {
val otherMethods = builds.getOrDefault(baseline, ArrayList())
builds.put(this, ArrayList(otherMethods))
client.deployInstance(this, otherMethods.toTypedArray())
}

private fun isSignatureEqual(one: SingleMethodPayload, other: SingleMethodPayload) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ import org.jetbrains.exposed.sql.batchUpsert
import org.jetbrains.exposed.sql.deleteWhere
import java.time.LocalDate

class MethodRepositoryImpl: MethodRepository {
class MethodRepositoryImpl : MethodRepository {
override suspend fun createMany(data: List<Method>) {
MethodTable.batchUpsert(data, shouldReturnGeneratedValues = false) {
MethodTable.batchUpsert(
data,
shouldReturnGeneratedValues = false,
onUpdateExclude = listOf(MethodTable.probesStartPos, MethodTable.bodyChecksum, MethodTable.probesCount)
) {
this[MethodTable.id] = it.id
this[MethodTable.groupId] = it.groupId
this[MethodTable.appId] = it.appId
Expand All @@ -38,7 +42,7 @@ class MethodRepositoryImpl: MethodRepository {
this[MethodTable.returnType] = it.returnType
this[MethodTable.probesStartPos] = it.probesStartPos
this[MethodTable.bodyChecksum] = it.bodyChecksum
this[MethodTable.signature] = it .signature
this[MethodTable.signature] = it.signature
this[MethodTable.probesCount] = it.probesCount
it.annotations?.let { annotations ->
this[MethodTable.annotations] = annotations.takeIf { it.isNotEmpty() }?.toString()
Expand All @@ -56,8 +60,8 @@ class MethodRepositoryImpl: MethodRepository {
override suspend fun deleteAllByBuildId(groupId: String, appId: String, buildId: String) {
MethodTable.deleteWhere {
(MethodTable.groupId eq groupId) and
(MethodTable.appId eq appId) and
(MethodTable.buildId eq buildId)
(MethodTable.appId eq appId) and
(MethodTable.buildId eq buildId)
}
}
}
Loading