Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions admin-app/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,7 @@ drill {
fetchSize = ${?DRILL_ETL_FETCH_SIZE}
batchSize = 1000
batchSize = ${?DRILL_ETL_BATCH_SIZE}
vacuumAfterRows = 100000
vacuumAfterRows = ${?DRILL_ETL_VACUUM_AFTER_ROWS}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger

abstract class BatchDataLoader<T>(
override val name: String,
open val batchSize: Int = 1000
open val batchSize: Int = 1000,
private val vacuumAfterRows: Int,
) : DataLoader<T> {
private val logger = KotlinLogging.logger {}

Expand Down Expand Up @@ -91,10 +92,10 @@ abstract class BatchDataLoader<T>(
previousTimestamp = currentTimestamp
return@collect
}

// If timestamp changed and buffer is full, flush the buffer
if (previousTimestamp != null && currentTimestamp != previousTimestamp) {
if (buffer.size >= batchSize) {
val before = result.processedRows / vacuumAfterRows
result += flushBuffer(groupId, buffer, batchNo) { batch ->
if (batch.success) {
lastLoadedTimestamp = previousTimestamp ?: throw IllegalStateException("Previous timestamp is null")
Expand All @@ -109,6 +110,10 @@ abstract class BatchDataLoader<T>(
onLoadCompleted(it)
}
}
val after = result.processedRows / vacuumAfterRows
if (!result.isFailed && (after > before)) {
vacuum(groupId)
}
}
}

Expand Down Expand Up @@ -156,6 +161,7 @@ abstract class BatchDataLoader<T>(
abstract fun getLastExtractedTimestamp(args: T): Instant?
abstract fun isProcessable(args: T): Boolean
abstract suspend fun loadBatch(groupId: String, batch: List<T>, batchNo: Int): BatchResult
abstract suspend fun vacuum(groupId: String)

private suspend fun flushBuffer(
groupId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,47 @@ import org.jetbrains.exposed.sql.statements.Statement
import org.jetbrains.exposed.sql.statements.StatementType
import org.jetbrains.exposed.sql.statements.api.PreparedStatementApi
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
import java.sql.Connection
import kotlin.system.measureTimeMillis

abstract class SqlDataLoader<T>(
override val name: String,
override val batchSize: Int,
open val sqlUpsert: String,
open val sqlDelete: String,
open val database: Database
) : BatchDataLoader<T>(name, batchSize) {
private val sqlVacuum: String?,
private val vacuumEnabled: Boolean,
vacuumAfterRows: Int,
open val database: Database,
) : BatchDataLoader<T>(name, batchSize, vacuumAfterRows) {
private val logger = KotlinLogging.logger {}

abstract fun prepareSql(sql: String): PreparedSql<T>

/**
 * Runs the configured vacuum statement for [groupId] against [database].
 *
 * No-op when [vacuumEnabled] is false. When vacuuming is enabled but no
 * SQL was provided, logs an error and returns without throwing. Any
 * execution failure is logged and rethrown to the caller.
 */
override suspend fun vacuum(groupId: String) {
    if (!vacuumEnabled) return

    if (sqlVacuum.isNullOrBlank()) {
        logger.error { "Loader [$name] could not run vacuum - no SQL query provided" }
        return
    }

    logger.debug { "Loader [$name] running vacuum for [$groupId]" }
    val duration = try {
        measureTimeMillis {
            // The cast yields a non-null Connection (a null would fail the cast),
            // so no safe calls are needed inside the use-blocks.
            (database.connector().connection as Connection).use { connection ->
                connection.createStatement().use { statement ->
                    // NOTE(review): in PostgreSQL, VACUUM cannot run inside a
                    // transaction block - this relies on the raw connection being
                    // in autocommit mode; confirm against the pool configuration.
                    statement.execute(sqlVacuum)
                }
            }
        }
    } catch (e: java.util.concurrent.CancellationException) {
        // Coroutine cancellation is not an error condition - propagate silently.
        throw e
    } catch (e: Exception) {
        logger.error(e) { "Error during vacuum with loader [$name] for group [$groupId]: ${e.message ?: e.javaClass.simpleName}" }
        throw e
    }
    logger.debug { "Loader [$name] ran vacuum for group [$groupId] in ${duration}ms" }
}

override suspend fun loadBatch(
groupId: String,
batch: List<T>,
Expand Down Expand Up @@ -100,4 +129,6 @@ abstract class SqlDataLoader<T>(
}
logger.debug { "Loader [$name] deleted data for groupId $groupId in ${duration}ms" }
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ package com.epam.drill.admin.etl.impl

import org.jetbrains.exposed.sql.Database
import java.time.Instant
import java.util.Date
import java.util.*

class UntypedSqlDataLoader(
name: String,
sqlUpsert: String,
sqlDelete: String,
sqlVacuum: String? = null,
vacuumEnabled: Boolean = false,
vacuumAfterRows: Int = 100_000,
database: Database,
private val lastExtractedAtColumnName: String,
batchSize: Int = 1000,
val processable: (Map<String, Any?>) -> Boolean = { true }
) : SqlDataLoader<Map<String, Any?>>(name, batchSize, sqlUpsert, sqlDelete, database) {
) : SqlDataLoader<Map<String, Any?>>(name, batchSize, sqlUpsert, sqlDelete, sqlVacuum, vacuumEnabled, vacuumAfterRows, database) {

override fun prepareSql(sql: String): PreparedSql<Map<String, Any?>> {
return UntypedPreparedSql.prepareSql(sql)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ class BatchDataLoaderTest {

private class TestItem(val timestamp: Instant, val data: String, val processable: Boolean = true)

private class TestBatchDataLoader(
private open class TestBatchDataLoader(
batchSize: Int,
vacuumAfterRows: Int,
private val failOnBatch: Int = -1,
private val failOnTimestamp: Boolean = false,
private val outOfOrder: Boolean = false
) : BatchDataLoader<TestItem>("test-loader", batchSize) {
) : BatchDataLoader<TestItem>("test-loader", batchSize, vacuumAfterRows) {

val loadedBatches = mutableListOf<List<TestItem>>()

Expand All @@ -52,13 +53,16 @@ class BatchDataLoaderTest {
return BatchResult(success = true, rowsLoaded = batch.size.toLong())
}

// No-op: the base test double ignores vacuum requests; tests that need to
// observe vacuum calls override this in an anonymous subclass.
override suspend fun vacuum(groupId: String) {
}

override suspend fun deleteAll(groupId: String) {
}
}

@Test
fun `load should handle empty flow`() = runBlocking {
val loader = TestBatchDataLoader(batchSize = 10)
val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20)
val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf()) { }
assertEquals(EtlStatus.SUCCESS, result.status)
assertEquals(0, result.processedRows)
Expand All @@ -68,7 +72,7 @@ class BatchDataLoaderTest {
@Test
fun `load should process single batch for flow smaller than batch size`() = runBlocking {
val items = (1..5).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") }
val loader = TestBatchDataLoader(batchSize = 10)
val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20)
val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { }

assertEquals(EtlStatus.SUCCESS, result.status)
Expand All @@ -80,7 +84,7 @@ class BatchDataLoaderTest {
@Test
fun `load should process multiple batches`() = runBlocking {
val items = (1..25).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") }
val loader = TestBatchDataLoader(batchSize = 10)
val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20)
val results = mutableListOf<EtlLoadingResult>()
val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { results.add(it) }

Expand All @@ -104,7 +108,7 @@ class BatchDataLoaderTest {
TestItem(Instant.ofEpochSecond(2), "item2", processable = false),
TestItem(Instant.ofEpochSecond(3), "item3")
)
val loader = TestBatchDataLoader(batchSize = 10)
val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20)
val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { }

assertEquals(EtlStatus.SUCCESS, result.status)
Expand All @@ -118,7 +122,7 @@ class BatchDataLoaderTest {
@Test
fun `load should fail on batch processing error`() = runBlocking {
val items = (1..15).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") }
val loader = TestBatchDataLoader(batchSize = 10, failOnBatch = 2)
val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20, failOnBatch = 2)
val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { }

assertEquals(EtlStatus.FAILED, result.status)
Expand All @@ -133,7 +137,7 @@ class BatchDataLoaderTest {
TestItem(Instant.ofEpochSecond(2), "item2"),
TestItem(Instant.ofEpochSecond(1), "item1")
)
val loader = TestBatchDataLoader(batchSize = 10)
val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20)
val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { }

assertEquals(EtlStatus.FAILED, result.status)
Expand All @@ -143,7 +147,7 @@ class BatchDataLoaderTest {
fun `load should skip already processed rows`() = runBlocking {
val since = Instant.ofEpochSecond(5)
val items = (1..10).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") }
val loader = TestBatchDataLoader(batchSize = 10)
val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20)
val result = loader.load("test-group", since, Instant.now(), flowOf(*items.toTypedArray())) { }

assertEquals(EtlStatus.SUCCESS, result.status)
Expand All @@ -152,5 +156,25 @@ class BatchDataLoaderTest {
assertEquals(5, loader.loadedBatches[0].size)
assertEquals("item6", loader.loadedBatches[0][0].data)
}

@Test
fun `load should trigger vacuum after vacuumAfterRows`() = runBlocking {
    // 45 rows against a threshold of 20 crosses the threshold twice
    // (at 20 and 40 processed rows), so vacuum must run exactly twice.
    val threshold = 20
    val items = (1..45).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") }

    val loader = object : TestBatchDataLoader(batchSize = 10, vacuumAfterRows = threshold) {
        val vacuumCalls = mutableListOf<String>()

        override suspend fun vacuum(groupId: String) {
            vacuumCalls += groupId
        }
    }

    loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { }

    assertEquals(2, loader.vacuumCalls.size)
    assertEquals(listOf("test-group", "test-group"), loader.vacuumCalls)
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.epam.drill.admin.metrics.config

import io.ktor.server.config.ApplicationConfig
import io.ktor.server.config.*

/**
* Configuration parameters for ETL processing.
Expand All @@ -38,4 +38,11 @@ class EtlConfig(private val config: ApplicationConfig) {
*/
val batchSize : Int
get() = config.propertyOrNull("batchSize")?.getString()?.toIntOrNull() ?: 1000

/**
 * Number of rows a loader must write before it triggers a vacuum run on its
 * target table. Read from the `vacuumAfterRows` config key; defaults to
 * 100 000 when the key is absent or not a valid integer.
 */
val vacuumAfterRows: Int
    get() = config.propertyOrNull("vacuumAfterRows")
        ?.getString()
        ?.toIntOrNull()
        ?: 100_000

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ val EtlConfig.buildMethodTestDefinitionCoverageLoader
name = "build_method_test_definition_coverage",
sqlUpsert = fromResource("/metrics/db/etl/build_method_test_definition_coverage_loader.sql"),
sqlDelete = fromResource("/metrics/db/etl/build_method_test_definition_coverage_delete.sql"),
sqlVacuum = fromResource("/metrics/db/etl/build_method_test_definition_coverage_vacuum.sql"),
vacuumEnabled = true,
vacuumAfterRows = vacuumAfterRows,
lastExtractedAtColumnName = "created_at",
database = MetricsDatabaseConfig.database,
batchSize = batchSize,
Expand All @@ -47,6 +50,9 @@ val EtlConfig.buildMethodTestSessionCoverageLoader
name = "build_method_test_session_coverage",
sqlUpsert = fromResource("/metrics/db/etl/build_method_test_session_coverage_loader.sql"),
sqlDelete = fromResource("/metrics/db/etl/build_method_test_session_coverage_delete.sql"),
sqlVacuum = fromResource("/metrics/db/etl/build_method_test_session_coverage_vacuum.sql"),
vacuumEnabled = true,
vacuumAfterRows = vacuumAfterRows,
lastExtractedAtColumnName = "created_at",
database = MetricsDatabaseConfig.database,
batchSize = batchSize,
Expand All @@ -58,6 +64,9 @@ val EtlConfig.buildMethodCoverageLoader
name = "build_method_coverage",
sqlUpsert = fromResource("/metrics/db/etl/build_method_coverage_loader.sql"),
sqlDelete = fromResource("/metrics/db/etl/build_method_coverage_delete.sql"),
sqlVacuum = fromResource("/metrics/db/etl/build_method_coverage_vacuum.sql"),
vacuumEnabled = true,
vacuumAfterRows = vacuumAfterRows,
lastExtractedAtColumnName = "created_at",
database = MetricsDatabaseConfig.database,
batchSize = batchSize
Expand All @@ -68,6 +77,9 @@ val EtlConfig.methodCoverageLoader
name = "method_daily_coverage",
sqlUpsert = fromResource("/metrics/db/etl/method_daily_coverage_loader.sql"),
sqlDelete = fromResource("/metrics/db/etl/method_daily_coverage_delete.sql"),
sqlVacuum = fromResource("/metrics/db/etl/method_daily_coverage_vacuum.sql"),
vacuumEnabled = true,
vacuumAfterRows = vacuumAfterRows,
lastExtractedAtColumnName = "created_at",
database = MetricsDatabaseConfig.database,
batchSize = batchSize
Expand All @@ -78,6 +90,9 @@ val EtlConfig.test2CodeMappingLoader
name = "test_to_code_mapping",
sqlUpsert = fromResource("/metrics/db/etl/test_to_code_mapping_loader.sql"),
sqlDelete = fromResource("/metrics/db/etl/test_to_code_mapping_delete.sql"),
sqlVacuum = fromResource("/metrics/db/etl/test_to_code_mapping_vacuum.sql"),
vacuumEnabled = true,
vacuumAfterRows = vacuumAfterRows,
lastExtractedAtColumnName = "created_at",
database = MetricsDatabaseConfig.database,
batchSize = batchSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ val EtlConfig.testSessionBuildsLoader
name = "test_session_builds",
sqlUpsert = fromResource("/metrics/db/etl/test_session_builds_loader.sql"),
sqlDelete = fromResource("/metrics/db/etl/test_session_builds_delete.sql"),
sqlVacuum = fromResource("/metrics/db/etl/test_session_builds_vacuum.sql"),
vacuumEnabled = true,
vacuumAfterRows = vacuumAfterRows,
lastExtractedAtColumnName = "created_at",
database = MetricsDatabaseConfig.database,
batchSize = batchSize,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Reclaim dead tuples and refresh planner statistics after bulk ETL writes.
VACUUM ANALYZE metrics.build_method_coverage;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Reclaim dead tuples and refresh planner statistics after bulk ETL writes.
VACUUM ANALYZE metrics.build_method_test_definition_coverage;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Reclaim dead tuples and refresh planner statistics after bulk ETL writes.
VACUUM ANALYZE metrics.build_method_test_session_coverage;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Reclaim dead tuples and refresh planner statistics after bulk ETL writes.
VACUUM ANALYZE metrics.method_daily_coverage;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Reclaim dead tuples and refresh planner statistics after bulk ETL writes.
VACUUM ANALYZE metrics.test_session_builds;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Reclaim dead tuples and refresh planner statistics after bulk ETL writes.
VACUUM ANALYZE metrics.test_to_code_mapping;
Loading