Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3a81614
feat: add the "extractionLimit" parameter, which limits the data extr…
iryabov Jan 15, 2026
486ccc7
feat: enhance logging
iryabov Jan 16, 2026
823f026
feat: implement PageDataExtractor for paginated data extraction with …
iryabov Jan 16, 2026
4413c06
feat: add debug logging for initial row loading in BatchDataLoader
iryabov Jan 16, 2026
0576bc5
feat: introduce DataTransformer interface and implement UntypedGroupA…
iryabov Jan 20, 2026
cea237b
feat: implement LRU cache for efficient row aggregation in UntypedAgg…
iryabov Jan 21, 2026
59e9f28
refactor: rename emitting channel
iryabov Jan 21, 2026
3430287
feat: store coverage per method in method_coverage table
RomanDavlyatshin Jan 21, 2026
95c6cfd
feat: add logging for transformation process and buffer size
iryabov Jan 21, 2026
36ee6a1
feat: remove redundant classname field for coverage
RomanDavlyatshin Jan 21, 2026
79b76f4
fix: rename view to v5
RomanDavlyatshin Jan 21, 2026
0f7e36a
fix: rename view to v5
RomanDavlyatshin Jan 21, 2026
c6811c3
feat: add probes_count to method_coverage_table
RomanDavlyatshin Jan 21, 2026
4cd9322
feat: add migration from raw_data.coverage to raw_data.method_coverage
RomanDavlyatshin Jan 21, 2026
b8244a5
fix: migration
RomanDavlyatshin Jan 21, 2026
e7da55b
feat: add a condition to exclude the disappearance of failure statuse…
iryabov Jan 22, 2026
26c4c3a
feat: add build_id to method coverage and update related queries
iryabov Jan 23, 2026
75b2def
Merge branch 'feature/store-coverage-per-method' into feature/etl-tra…
iryabov Jan 23, 2026
3454d61
feat: add transformers for test to code mapping and method loading
iryabov Jan 23, 2026
45618b5
feat: add logging frequency configuration and implement progress logg…
iryabov Jan 27, 2026
197803b
feat: add logging frequency configuration and implement progress logg…
iryabov Jan 27, 2026
ede7c5c
refactor: remove duplicates of progress logging methods in ETL orches…
iryabov Jan 27, 2026
73725c8
feat: enhance ETL process by improving skipped row handling and progr…
iryabov Jan 27, 2026
7757946
chore: add license
iryabov Jan 27, 2026
0e77b24
Merge branch 'feature/etl-progress-logging-EPMDJ-11186' into feature/…
iryabov Jan 27, 2026
7388dd6
feat: improve skipped row handling in BatchDataLoader with progress u…
iryabov Jan 27, 2026
3c88643
fix: correct typo in parameter name for onLoadingProgress in EtlPipel…
iryabov Jan 27, 2026
1d99303
Merge branch 'fix/do-not-update-methods' into feature/etl-skipped-row…
iryabov Feb 2, 2026
8c618ef
Merge pull request #458 from Drill4J/feature/etl-skipped-rows-EPMDJ-1…
iryabov Feb 2, 2026
4772fc6
fix: remove extra logging job from EtlOrchestratorImpl
iryabov Feb 2, 2026
c1c1e4b
Merge pull request #457 from Drill4J/feature/etl-progress-logging-EPM…
iryabov Feb 2, 2026
f375e02
Merge pull request #456 from Drill4J/feature/etl-transformation-EPMDJ…
iryabov Feb 2, 2026
4fd3d00
Merge pull request #455 from Drill4J/feature/store-coverage-per-method
iryabov Feb 2, 2026
36513f9
Merge remote-tracking branch 'origin/main' into feature/etl-optimization
iryabov Feb 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions admin-app/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,11 @@ drill {
fetchSize = ${?DRILL_ETL_FETCH_SIZE}
batchSize = 1000
batchSize = ${?DRILL_ETL_BATCH_SIZE}
extractionLimit = 1000000
extractionLimit = ${?DRILL_ETL_EXTRACTION_LIMIT}
transformationBufferSize = 2000
transformationBufferSize = ${?DRILL_ETL_TRANSFORMATION_BUFFER_SIZE}
loggingFrequency = 10
loggingFrequency = ${?DRILL_ETL_LOGGING_FREQUENCY}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ import java.time.Instant
* - how data will be transformed,
* - how metrics will be stored.
*/
interface DataExtractor<T> {
interface DataExtractor<T: EtlRow> {
val name: String
suspend fun extract(
groupId: String,
sinceTimestamp: Instant,
untilTimestamp: Instant,
emitter: FlowCollector<T>,
onExtractCompleted: suspend (EtlExtractingResult) -> Unit
onExtractingProgress: suspend (EtlExtractingResult) -> Unit
)
}

Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ import java.time.Instant
* - Uses `batchSize` to control transaction size and DB pressure.
* - Is resilient to partial failures
*/
interface DataLoader<T> {
interface DataLoader<T: EtlRow> {
val name: String
suspend fun load(
groupId: String,
sinceTimestamp: Instant,
untilTimestamp: Instant,
collector: Flow<T>,
onLoadCompleted: suspend (EtlLoadingResult) -> Unit
onLoadingProgress: suspend (EtlLoadingResult) -> Unit = {},
onStatusChanged: suspend (EtlStatus) -> Unit = {},
): EtlLoadingResult

suspend fun deleteAll(groupId: String)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright 2020 - 2022 EPAM Systems
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.epam.drill.admin.etl

import kotlinx.coroutines.flow.Flow

interface DataTransformer<in T: EtlRow, out R: EtlRow> {
val name: String
suspend fun transform(
groupId: String,
collector: Flow<T>,
): Flow<R>
}

class NopTransformer<T: EtlRow> : DataTransformer<T, T> {
override val name: String = "nop-transformer"
override suspend fun transform(
groupId: String,
collector: Flow<T>,
): Flow<T> = collector
}

val untypedNopTransformer = NopTransformer<UntypedRow>()
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.epam.drill.admin.etl

class EtlExtractingResult(
val success: Boolean,
val duration: Long,
val duration: Long = 0,
val errorMessage: String? = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@ import java.time.Instant
data class EtlLoadingResult(
val lastProcessedAt: Instant,
val processedRows: Long = 0L,
val status: EtlStatus,
val duration: Long? = null,
val errorMessage: String? = null
) : Comparable<EtlLoadingResult> {
val isFailed
get() = status == EtlStatus.FAILED
get() = errorMessage != null

operator fun plus(other: EtlLoadingResult): EtlLoadingResult {
val failed = this.isFailed || other.isFailed
return EtlLoadingResult(
status = if (!failed) other.status else EtlStatus.FAILED,
lastProcessedAt = if (!failed) other.lastProcessedAt else this.lastProcessedAt,
processedRows = this.processedRows + other.processedRows,
duration = listOfNotNull(this.duration, other.duration).sum(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ data class EtlMetadata(
val loaderName: String,
val lastProcessedAt: Instant,
val lastRunAt: Instant,
val lastDuration: Long,
val lastRowsProcessed: Long,
val lastLoadDuration: Long = 0L,
val lastExtractDuration: Long = 0L,
val lastRowsProcessed: Long = 0L,
val status: EtlStatus,
val errorMessage: String?,
val errorMessage: String? = null,
)

Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,35 @@
*/
package com.epam.drill.admin.etl

import java.time.Instant

interface EtlMetadataRepository {
suspend fun getAllMetadata(groupId: String): List<EtlMetadata>
suspend fun getAllMetadataByExtractor(groupId: String, pipelineName: String, extractorName: String): List<EtlMetadata>
suspend fun getMetadata(groupId: String, pipelineName: String, extractorName: String, loaderName: String): EtlMetadata?
suspend fun getAllMetadataByExtractor(
groupId: String,
pipelineName: String,
extractorName: String
): List<EtlMetadata>

suspend fun getMetadata(
groupId: String,
pipelineName: String,
extractorName: String,
loaderName: String
): EtlMetadata?

suspend fun saveMetadata(metadata: EtlMetadata)
suspend fun accumulateMetadata(metadata: EtlMetadata)
suspend fun accumulateMetadataByLoader(
groupId: String, pipelineName: String, extractorName: String, loaderName: String,
lastProcessedAt: Instant? = null,
status: EtlStatus? = null, loadDuration: Long = 0L, rowsProcessed: Long = 0L,
errorMessage: String? = null
)
suspend fun deleteMetadataByPipeline(groupId: String, pipelineName: String)
suspend fun accumulateMetadataDurationByExtractor(groupId: String, pipelineName: String, extractorName: String, duration: Long)
suspend fun accumulateMetadataByExtractor(
groupId: String, pipelineName: String, extractorName: String,
status: EtlStatus? = null, extractDuration: Long = 0L,
errorMessage: String? = null
)
}

Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ import java.time.Instant
* - failures in loaders or extractor are propagated to the pipeline,
* - pipeline may cancel child coroutines and report status to `EtlOrchestrator`.
*/
interface EtlPipeline<T> {
interface EtlPipeline<in T : EtlRow, out R : EtlRow> {
val name: String
val extractor: DataExtractor<T>
val loaders: List<DataLoader<T>>
val extractor: DataExtractor<in T>
val loaders: List<Pair<DataTransformer<T, R>, DataLoader<out R>>>
suspend fun execute(
groupId: String,
sinceTimestampPerLoader: Map<String, Instant>,
untilTimestamp: Instant,
onExtractCompleted: suspend (EtlExtractingResult) -> Unit = {},
onLoadCompleted: suspend (loaderName: String, result: EtlLoadingResult) -> Unit = { _, _ -> },
onExtractingProgress: suspend (EtlExtractingResult) -> Unit = {},
onLoadingProgress: suspend (loaderName: String, result: EtlLoadingResult) -> Unit = { _, _ -> },
onStatusChanged: suspend (loaderName: String, status: EtlStatus) -> Unit = { _, _ -> },
): EtlProcessingResult

suspend fun cleanUp(groupId: String)
Expand Down
22 changes: 22 additions & 0 deletions admin-etl/src/main/kotlin/com/epam/drill/admin/etl/EtlRow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright 2020 - 2022 EPAM Systems
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.epam.drill.admin.etl

import java.time.Instant

open class EtlRow(
val timestamp: Instant,
)
23 changes: 23 additions & 0 deletions admin-etl/src/main/kotlin/com/epam/drill/admin/etl/UntypedRow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright 2020 - 2022 EPAM Systems
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.epam.drill.admin.etl

import java.time.Instant

class UntypedRow(
timestamp: Instant,
map: Map<String, Any?>
): EtlRow(timestamp), Map<String, Any?> by HashMap(map)
55 changes: 55 additions & 0 deletions admin-etl/src/main/kotlin/com/epam/drill/admin/etl/flow/LruMap.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Copyright 2020 - 2022 EPAM Systems
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.epam.drill.admin.etl.flow

/**
* A simple LRU (Least Recently Used) map implementation that evicts the oldest entry
* when the maximum size is reached.
*/
class LruMap<K, V>(
private val maxSize: Int,
private val onEvict: (K, V) -> Unit
) {
private val map = LinkedHashMap<K, V>(16, 0.75f, true)

val size: Int
get() = map.size

fun compute(key: K, update: (V?) -> V) {
map[key] = update(map[key])
if (map.size >= maxSize) {
evictOldest()
}
}

fun evictOldest() {
val it = map.entries.iterator()
if (!it.hasNext()) return

val entry = it.next()
it.remove()
onEvict(entry.key, entry.value)
}

fun evictAll() {
val it = map.entries.iterator()
while (it.hasNext()) {
val entry = it.next()
it.remove()
onEvict(entry.key, entry.value)
}
}
}
Loading
Loading