Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions admin-app/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,7 @@ drill {
loggingFrequency = ${?DRILL_ETL_LOGGING_FREQUENCY}
consistencyWindow = 0
consistencyWindow = ${?DRILL_ETL_CONSISTENCY_WINDOW}
processingDelay = 0
processingDelay = ${?DRILL_ETL_PROCESSING_DELAY}
}
}
10 changes: 10 additions & 0 deletions admin-app/src/main/resources/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,16 @@ paths:
description: Optional root node identifier to retrieve a specific subtree of the treemap hierarchy. Used for drill-down navigation.
schema:
type: string
- name: testSessionId
in: query
description: Filter coverage by test session ID. When specified, only coverage data from the given test session is included.
schema:
type: string
- name: testDefinitionId
in: query
description: Filter coverage by test definition ID. Requires testSessionId to be specified. When provided, only coverage data from the specific test definition is included.
schema:
type: string
responses:
'200':
description: Coverage treemap data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
result += flushBuffer(groupId, buffer, batchNo) { batch ->
if (batch.success) {
lastLoadedTimestamp =
previousTimestamp ?: throw IllegalStateException("Previous timestamp is null")

Check failure on line 101 in admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "Previous timestamp is null" 3 times.

See more on https://sonarcloud.io/project/issues?id=Drill4J_admin&issues=AZ3AA1xjeo-0ZjtOpVhD&open=AZ3AA1xjeo-0ZjtOpVhD&pullRequest=471
}
EtlLoadingResult(
errorMessage = if (!batch.success) result.errorMessage else null,
Expand All @@ -125,7 +125,8 @@
if (previousTimestamp != null && currentTimestamp != previousTimestamp && buffer.isEmpty() && skippedRowsForUpdate >= batchSize) {
onLoadingProgress(
EtlLoadingResult(
lastProcessedAt = previousTimestamp ?: throw IllegalStateException("Previous timestamp is null"),
lastProcessedAt = previousTimestamp
?: throw IllegalStateException("Previous timestamp is null"),
processedRows = 0,
)
)
Expand Down Expand Up @@ -154,7 +155,8 @@
// Commit any remaining rows in the buffer
result += flushBuffer(groupId, buffer, batchNo) { batch ->
if (batch.success) {
lastLoadedTimestamp = untilTimestamp
lastLoadedTimestamp = previousTimestamp
?: throw IllegalStateException("Previous timestamp is null")
}
EtlLoadingResult(
errorMessage = if (!batch.success) batch.errorMessage else null,
Expand All @@ -166,11 +168,13 @@
}
}
} else {
// Update last processed timestamp even if no rows were left in the buffer
result += EtlLoadingResult(
lastProcessedAt = untilTimestamp
).also {
onLoadingProgress(it)
// Update last processed timestamp even if no rows were loaded
if (lastLoadedTimestamp == sinceTimestamp) {
result += EtlLoadingResult(
lastProcessedAt = untilTimestamp
).also {
onLoadingProgress(it)
}
}
}
onStatusChanged(EtlStatus.SUCCESS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ open class EtlOrchestratorImpl(
open val pipelines: List<EtlPipeline<*, *>>,
open val metadataRepository: EtlMetadataRepository,
open val consistencyWindow: Long = 0,
open val processingDelay: Long = 0,
) : EtlOrchestrator {
private val logger = KotlinLogging.logger {}

override suspend fun run(groupId: String, initTimestamp: Instant): List<EtlProcessingResult> =
withContext(Dispatchers.IO) {
logger.info("ETL [$name] for group [$groupId] is starting with init timestamp $initTimestamp...")
logger.info("ETL [$name] for group [$groupId] is starting...")
val results = Collections.synchronizedList(mutableListOf<EtlProcessingResult>())
val duration = measureTimeMillis {
trackProgressOf {
Expand Down Expand Up @@ -98,7 +99,7 @@ open class EtlOrchestratorImpl(
pipeline: EtlPipeline<*, *>,
initTimestamp: Instant
): EtlProcessingResult = coroutineScope {
val snapshotTime = Instant.now()
val snapshotTime = Instant.now().minusSeconds(processingDelay)
val metadata = metadataRepository.getAllMetadataByExtractor(groupId, pipeline.name, pipeline.extractor.name)
.associateBy { it.loaderName }
val loaderNames = pipeline.loaders.map { it.second.name }.toSet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ abstract class PageDataExtractor<T : EtlRow>(
try {
while (hasMore && currentSince < untilTimestamp) {
page.incrementAndGet()
logger.debug { "ETL extractor [$name] for group [$groupId] is executing query for page ${page.get()} since $currentSince ..." }
logger.debug { "ETL extractor [$name] for group [$groupId] is executing query for page ${page.get()} since $currentSince until $untilTimestamp ..." }

var previousTimestamp: Instant? = null
var previousEmittedTimestamp: Instant? = null
Expand Down Expand Up @@ -95,18 +95,22 @@ abstract class PageDataExtractor<T : EtlRow>(
if (pageRows == 0L || pageRows < extractionLimit) {
hasMore = false
emitBuffer(buffer, emitter)
logger.debug { "ETL extractor [$name] for group [$groupId] completed fetching" +
", rows fetched: ${rowsFetched.get()}" +
", total pages: ${page.get()}" +
", last extracted at $currentSince" }
previousEmittedTimestamp = previousTimestamp
logger.debug {
"ETL extractor [$name] for group [$groupId] completed fetching" +
", rows fetched: ${rowsFetched.get()}" +
", total pages: ${page.get()}" +
(if (previousEmittedTimestamp != null) ", last extracted at $previousEmittedTimestamp" else "")
}
} else {
currentSince = previousEmittedTimestamp ?: throw IllegalStateException(
"No rows were emitted on page $page because all fetched records had the same timestamp. Please increase the extraction limit. Current is $extractionLimit."
)
// Remove rows from buffer that have timestamp greater than currentSince to avoid re-emission on the next page
buffer.removeIf { it.timestamp > currentSince }
hasMore = true
logger.debug { "ETL extractor [$name] for group [$groupId] fetched $pageRows rows on page ${page.get()}, last extracted at $currentSince" }
logger.debug { "ETL extractor [$name] for group [$groupId] fetched $pageRows rows on page ${page.get()}" +
", last extracted at $previousEmittedTimestamp" }
}
}
} catch (e: Exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,11 @@ class EtlConfig(private val config: ApplicationConfig) {
*/
val consistencyWindow : Long
get() = config.propertyOrNull("consistencyWindow")?.getString()?.toLongOrNull() ?: 0L

/**
* Number of seconds to subtract from the current time when calculating the upper bound of the ETL processing window.
* This delays ETL process to allow current transactions in the data source to complete.
*/
val processingDelay : Long
get() = config.propertyOrNull("processingDelay")?.getString()?.toLongOrNull() ?: 0L
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.epam.drill.admin.metrics.etl.methodsPipeline
import com.epam.drill.admin.metrics.etl.buildsPipeline
import com.epam.drill.admin.metrics.etl.coveragePipeline
import com.epam.drill.admin.metrics.etl.testDefinitionsPipeline
import com.epam.drill.admin.metrics.etl.testLaunchCoveragePipeline
import com.epam.drill.admin.metrics.etl.testLaunchesPipeline
import com.epam.drill.admin.metrics.etl.testSessionBuildsPipeline
import com.epam.drill.admin.metrics.etl.testSessionsPipeline
Expand Down Expand Up @@ -63,10 +64,11 @@ val metricsDIModule
pipelines = listOf(
buildsPipeline, methodsPipeline,
testLaunchesPipeline, testDefinitionsPipeline, testSessionsPipeline,
coveragePipeline, testSessionBuildsPipeline
coveragePipeline, testLaunchCoveragePipeline, testSessionBuildsPipeline
),
metadataRepository = instance(),
consistencyWindow = etlConfig.consistencyWindow
consistencyWindow = etlConfig.consistencyWindow,
processingDelay = etlConfig.processingDelay
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ val EtlConfig.coverageExtractor
lastExtractedAtColumnName = "created_at",
)

val EtlConfig.testLaunchCoverageExtractor
get() = UntypedSqlDataExtractor(
name = "test_launch_coverage",
sqlQuery = fromResource("/metrics/db/etl/test_launch_coverage_extractor.sql"),
database = MetricsDatabaseConfig.database,
fetchSize = fetchSize,
extractionLimit = extractionLimit,
loggingFrequency = loggingFrequency,
lastExtractedAtColumnName = "test_completed_at",
)

val EtlConfig.buildMethodTestDefinitionCoverageLoader
get() = UntypedSqlDataLoader(
name = "build_method_test_definition_coverage",
Expand Down Expand Up @@ -161,6 +172,19 @@ val EtlConfig.coveragePipeline
get() = EtlPipelineImpl(
name = "coverage",
extractor = coverageExtractor,
loaders = listOf(
untypedNopTransformer to buildMethodTestSessionCoverageLoader,
buildMethodCoverageTransformer to buildMethodCoverageLoader,
methodDailyCoverageTransformer to methodDailyCoverageLoader,
untypedNopTransformer to testSessionBuildsLoader
),
bufferSize = bufferSize
)

val EtlConfig.testLaunchCoveragePipeline
get() = EtlPipelineImpl(
name = "test_launch_coverage",
extractor = testLaunchCoverageExtractor,
loaders = listOf(
untypedNopTransformer to buildMethodTestDefinitionCoverageLoader,
untypedNopTransformer to buildMethodTestSessionCoverageLoader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ open class MethodCriteria(
object NONE : MethodCriteria()

open val packageNamePattern: String?
get() = packageName?.let { it.removeSuffix("/") + "/" + "%" }
get() = packageName?.let { "$it%" }

open val signaturePattern: String?
get() = listOf(
className ?: "%",
methodName ?: "%",
methodParams ?: "%",
returnType ?: "%"
className?.takeIf { it.isNotEmpty() } ?: "%",
methodName?.takeIf { it.isNotEmpty() } ?: "%",
methodParams?.takeIf { it.isNotEmpty() } ?: "%",
returnType?.takeIf { it.isNotEmpty() } ?: "%"
).joinToString(":").takeIf { it != "%:%:%:%" }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,25 @@ interface MetricsRepository {
offset: Int? = null, limit: Int? = null
): List<Map<String, Any?>>

suspend fun getMethodsWithCoverageByTestSession(
buildId: String,
testSessionId: String,
testTags: List<String> = emptyList(),
packageNamePattern: String? = null,
methodSignaturePattern: String? = null,
coverageAppEnvIds: List<String> = emptyList(),
): List<Map<String, Any?>>

suspend fun getMethodsWithCoverageByTestDefinition(
buildId: String,
testSessionId: String,
testDefinitionId: String,
packageNamePattern: String? = null,
methodSignaturePattern: String? = null,
coverageAppEnvIds: List<String> = emptyList(),
): List<Map<String, Any?>>


suspend fun getMethodsCount(
buildId: String,
packageNamePattern: String? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,87 @@ class MetricsRepositoryImpl : MetricsRepository {
}
}

override suspend fun getMethodsWithCoverageByTestSession(
buildId: String,
testSessionId: String,
testTags: List<String>,
packageNamePattern: String?,
methodSignaturePattern: String?,
coverageAppEnvIds: List<String>,
): List<Map<String, Any?>> = transaction {
executeQueryReturnMap {
append(
"""
SELECT
signature,
class_name,
method_name,
method_params,
return_type,
probes_count,
covered_probes AS isolated_covered_probes,
covered_probes AS aggregated_covered_probes,
probes_coverage_ratio AS isolated_probes_coverage_ratio,
probes_coverage_ratio AS aggregated_probes_coverage_ratio
FROM metrics.get_methods_with_coverage_by_test_session(
input_build_id => ?,
input_test_session_id => ?
""".trimIndent(), buildId, testSessionId
)
appendOptional(", input_coverage_test_tags => ?", testTags)
appendOptional(", input_package_name_pattern => ?", packageNamePattern) { "$it%" }
appendOptional(", input_signature_pattern => ?", methodSignaturePattern)
appendOptional(", input_coverage_app_env_ids => ?", coverageAppEnvIds)
append(
"""
)
ORDER BY signature
""".trimIndent()
)
}
}

override suspend fun getMethodsWithCoverageByTestDefinition(
buildId: String,
testSessionId: String,
testDefinitionId: String,
packageNamePattern: String?,
methodSignaturePattern: String?,
coverageAppEnvIds: List<String>,
): List<Map<String, Any?>> = transaction {
executeQueryReturnMap {
append(
"""
SELECT
signature,
class_name,
method_name,
method_params,
return_type,
probes_count,
covered_probes AS isolated_covered_probes,
covered_probes AS aggregated_covered_probes,
probes_coverage_ratio AS isolated_probes_coverage_ratio,
probes_coverage_ratio AS aggregated_probes_coverage_ratio
FROM metrics.get_methods_with_coverage_by_test_definition(
input_build_id => ?,
input_test_session_id => ?,
input_test_definition_id => ?
""".trimIndent(), buildId, testSessionId, testDefinitionId
)
appendOptional(", input_package_name_pattern => ?", packageNamePattern) { "$it%" }
appendOptional(", input_signature_pattern => ?", methodSignaturePattern)
appendOptional(", input_coverage_app_env_ids => ?", coverageAppEnvIds)
append(
"""
)
ORDER BY signature
""".trimIndent()
)
}
}


override suspend fun getChangesWithCoverage(
buildId: String,
baselineBuildId: String?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ class Metrics {
val branch: String? = null,
val packageNamePattern: String? = null,
val classNamePattern: String? = null,
val rootId: String? = null
val rootId: String? = null,
val testSessionId: String? = null,
val testDefinitionId: String? = null
)

@Resource("/changes-coverage-treemap")
Expand Down Expand Up @@ -327,7 +329,9 @@ fun Route.getCoverageTreemap() {
params.branch,
params.packageNamePattern,
params.classNamePattern,
params.rootId
params.rootId,
params.testSessionId,
params.testDefinitionId
)
this.call.respond(HttpStatusCode.OK, ApiResponse(treemap))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ interface MetricsService {
packageNamePattern: String?,
classNamePattern: String?,
rootId: String?,
testSessionId: String? = null,
testDefinitionId: String? = null,
): List<Any>

suspend fun getChangesCoverageTreemap(
Expand Down
Loading
Loading