diff --git a/admin-app/src/main/resources/application.conf b/admin-app/src/main/resources/application.conf index 6f45a4545..4e366bed7 100644 --- a/admin-app/src/main/resources/application.conf +++ b/admin-app/src/main/resources/application.conf @@ -94,5 +94,7 @@ drill { fetchSize = ${?DRILL_ETL_FETCH_SIZE} batchSize = 1000 batchSize = ${?DRILL_ETL_BATCH_SIZE} + vacuumAfterRows = 100000 + vacuumAfterRows = ${?DRILL_ETL_VACUUM_AFTER_ROWS} } } diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt index 30cdde1dc..9876caf63 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoader.kt @@ -27,7 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger abstract class BatchDataLoader( override val name: String, - open val batchSize: Int = 1000 + open val batchSize: Int = 1000, + private val vacuumAfterRows: Int, ) : DataLoader { private val logger = KotlinLogging.logger {} @@ -91,10 +92,10 @@ abstract class BatchDataLoader( previousTimestamp = currentTimestamp return@collect } - // If timestamp changed and buffer is full, flush the buffer if (previousTimestamp != null && currentTimestamp != previousTimestamp) { if (buffer.size >= batchSize) { + val before = result.processedRows / vacuumAfterRows result += flushBuffer(groupId, buffer, batchNo) { batch -> if (batch.success) { lastLoadedTimestamp = previousTimestamp ?: throw IllegalStateException("Previous timestamp is null") @@ -109,6 +110,10 @@ abstract class BatchDataLoader( onLoadCompleted(it) } } + val after = result.processedRows / vacuumAfterRows + if (!result.isFailed && (after > before)) { + vacuum(groupId) + } } } @@ -156,6 +161,7 @@ abstract class BatchDataLoader( abstract fun getLastExtractedTimestamp(args: T): Instant? 
abstract fun isProcessable(args: T): Boolean abstract suspend fun loadBatch(groupId: String, batch: List, batchNo: Int): BatchResult + abstract suspend fun vacuum(groupId: String) private suspend fun flushBuffer( groupId: String, diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/SqlDataLoader.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/SqlDataLoader.kt index 5863586c4..ea5233bd1 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/SqlDataLoader.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/SqlDataLoader.kt @@ -25,18 +25,47 @@ import org.jetbrains.exposed.sql.statements.Statement import org.jetbrains.exposed.sql.statements.StatementType import org.jetbrains.exposed.sql.statements.api.PreparedStatementApi import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction +import java.sql.Connection +import kotlin.system.measureTimeMillis abstract class SqlDataLoader( override val name: String, override val batchSize: Int, open val sqlUpsert: String, open val sqlDelete: String, - open val database: Database -) : BatchDataLoader(name, batchSize) { + private val sqlVacuum: String?, + private val vacuumEnabled: Boolean, + vacuumAfterRows: Int, + open val database: Database, +) : BatchDataLoader(name, batchSize, vacuumAfterRows) { private val logger = KotlinLogging.logger {} abstract fun prepareSql(sql: String): PreparedSql + override suspend fun vacuum(groupId: String) { + if (!vacuumEnabled) return + + if (sqlVacuum.isNullOrBlank()) { + logger.error { "Loader [$name] could not run vacuum - no SQL query provided" } + return + } + + logger.debug { "Loader [$name] running vacuum for [$groupId]" } + val duration = try { + measureTimeMillis { + (database.connector().connection as Connection).use { connection: Connection? -> + connection?.createStatement().use { statement: java.sql.Statement? 
-> + statement?.execute(sqlVacuum) + } + } + } + } catch (e: Exception) { + logger.error(e) { "Error during vacuum with loader [$name] for group [$groupId]: ${e.message ?: e.javaClass.simpleName}" } + throw e + } + logger.debug { "Loader [$name] ran vacuum for group [$groupId] in ${duration}ms" } + } + override suspend fun loadBatch( groupId: String, batch: List, @@ -100,4 +129,6 @@ abstract class SqlDataLoader( } logger.debug { "Loader [$name] deleted data for groupId $groupId in ${duration}ms" } } + + } \ No newline at end of file diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedSqlDataLoader.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedSqlDataLoader.kt index 35251187b..e1ff4e3c0 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedSqlDataLoader.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/UntypedSqlDataLoader.kt @@ -17,17 +17,20 @@ package com.epam.drill.admin.etl.impl import org.jetbrains.exposed.sql.Database import java.time.Instant -import java.util.Date +import java.util.* class UntypedSqlDataLoader( name: String, sqlUpsert: String, sqlDelete: String, + sqlVacuum: String? 
= null, + vacuumEnabled: Boolean = false, + vacuumAfterRows: Int = 100_000, database: Database, private val lastExtractedAtColumnName: String, batchSize: Int = 1000, val processable: (Map) -> Boolean = { true } -) : SqlDataLoader>(name, batchSize, sqlUpsert, sqlDelete, database) { +) : SqlDataLoader>(name, batchSize, sqlUpsert, sqlDelete, sqlVacuum, vacuumEnabled, vacuumAfterRows, database) { override fun prepareSql(sql: String): PreparedSql> { return UntypedPreparedSql.prepareSql(sql) diff --git a/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoaderTest.kt b/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoaderTest.kt index ecd185413..a2bf0b763 100644 --- a/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoaderTest.kt +++ b/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/impl/BatchDataLoaderTest.kt @@ -27,12 +27,13 @@ class BatchDataLoaderTest { private class TestItem(val timestamp: Instant, val data: String, val processable: Boolean = true) - private class TestBatchDataLoader( + private open class TestBatchDataLoader( batchSize: Int, + vacuumAfterRows: Int, private val failOnBatch: Int = -1, private val failOnTimestamp: Boolean = false, private val outOfOrder: Boolean = false - ) : BatchDataLoader("test-loader", batchSize) { + ) : BatchDataLoader("test-loader", batchSize, vacuumAfterRows) { val loadedBatches = mutableListOf>() @@ -52,13 +53,16 @@ class BatchDataLoaderTest { return BatchResult(success = true, rowsLoaded = batch.size.toLong()) } + override suspend fun vacuum(groupId: String) { + } + override suspend fun deleteAll(groupId: String) { } } @Test fun `load should handle empty flow`() = runBlocking { - val loader = TestBatchDataLoader(batchSize = 10) + val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20) val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf()) { } assertEquals(EtlStatus.SUCCESS, result.status) assertEquals(0, result.processedRows) @@ -68,7 
+72,7 @@ class BatchDataLoaderTest { @Test fun `load should process single batch for flow smaller than batch size`() = runBlocking { val items = (1..5).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") } - val loader = TestBatchDataLoader(batchSize = 10) + val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20) val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { } assertEquals(EtlStatus.SUCCESS, result.status) @@ -80,7 +84,7 @@ class BatchDataLoaderTest { @Test fun `load should process multiple batches`() = runBlocking { val items = (1..25).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") } - val loader = TestBatchDataLoader(batchSize = 10) + val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20) val results = mutableListOf() val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { results.add(it) } @@ -104,7 +108,7 @@ class BatchDataLoaderTest { TestItem(Instant.ofEpochSecond(2), "item2", processable = false), TestItem(Instant.ofEpochSecond(3), "item3") ) - val loader = TestBatchDataLoader(batchSize = 10) + val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20) val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { } assertEquals(EtlStatus.SUCCESS, result.status) @@ -118,7 +122,7 @@ class BatchDataLoaderTest { @Test fun `load should fail on batch processing error`() = runBlocking { val items = (1..15).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") } - val loader = TestBatchDataLoader(batchSize = 10, failOnBatch = 2) + val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20, failOnBatch = 2) val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { } assertEquals(EtlStatus.FAILED, result.status) @@ -133,7 +137,7 @@ class BatchDataLoaderTest { TestItem(Instant.ofEpochSecond(2), 
"item2"), TestItem(Instant.ofEpochSecond(1), "item1") ) - val loader = TestBatchDataLoader(batchSize = 10) + val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20) val result = loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { } assertEquals(EtlStatus.FAILED, result.status) @@ -143,7 +147,7 @@ class BatchDataLoaderTest { fun `load should skip already processed rows`() = runBlocking { val since = Instant.ofEpochSecond(5) val items = (1..10).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") } - val loader = TestBatchDataLoader(batchSize = 10) + val loader = TestBatchDataLoader(batchSize = 10, vacuumAfterRows = 20) val result = loader.load("test-group", since, Instant.now(), flowOf(*items.toTypedArray())) { } assertEquals(EtlStatus.SUCCESS, result.status) @@ -152,5 +156,25 @@ class BatchDataLoaderTest { assertEquals(5, loader.loadedBatches[0].size) assertEquals("item6", loader.loadedBatches[0][0].data) } + + @Test + fun `load should trigger vacuum after vacuumAfterRows`() = runBlocking { + val items = (1..45).map { TestItem(Instant.ofEpochSecond(it.toLong()), "item$it") } + val vacuumAfterRows = 20 + + val loader = object : TestBatchDataLoader(batchSize = 10, vacuumAfterRows = vacuumAfterRows) { + val vacuumCalls = mutableListOf() + + override suspend fun vacuum(groupId: String) { + vacuumCalls.add(groupId) + } + } + + loader.load("test-group", Instant.EPOCH, Instant.now(), flowOf(*items.toTypedArray())) { } + + assertEquals(2, loader.vacuumCalls.size) + assertEquals(listOf("test-group", "test-group"), loader.vacuumCalls) + } + } diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt index 00b126c7e..b51e72015 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt @@ 
-15,7 +15,7 @@ */ package com.epam.drill.admin.metrics.config -import io.ktor.server.config.ApplicationConfig +import io.ktor.server.config.* /** * Configuration parameters for ETL processing. @@ -38,4 +38,11 @@ class EtlConfig(private val config: ApplicationConfig) { */ val batchSize : Int get() = config.propertyOrNull("batchSize")?.getString()?.toIntOrNull() ?: 1000 + + /** + * Interval, in number of written rows, after which the loader runs VACUUM on the target table. + */ + val vacuumAfterRows: Int + get() = config.propertyOrNull("vacuumAfterRows")?.getString()?.toIntOrNull() ?: 100_000 + } \ No newline at end of file diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt index 45f2015f2..151431ce7 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/CoverageEtl.kt @@ -36,6 +36,9 @@ val EtlConfig.buildMethodTestDefinitionCoverageLoader name = "build_method_test_definition_coverage", sqlUpsert = fromResource("/metrics/db/etl/build_method_test_definition_coverage_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/build_method_test_definition_coverage_delete.sql"), + sqlVacuum = fromResource("/metrics/db/etl/build_method_test_definition_coverage_vacuum.sql"), + vacuumEnabled = true, + vacuumAfterRows = vacuumAfterRows, lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, batchSize = batchSize, @@ -47,6 +50,9 @@ val EtlConfig.buildMethodTestSessionCoverageLoader name = "build_method_test_session_coverage", sqlUpsert = fromResource("/metrics/db/etl/build_method_test_session_coverage_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/build_method_test_session_coverage_delete.sql"), + sqlVacuum = fromResource("/metrics/db/etl/build_method_test_session_coverage_vacuum.sql"), + vacuumEnabled = true, + vacuumAfterRows
= vacuumAfterRows, lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, batchSize = batchSize, @@ -58,6 +64,9 @@ val EtlConfig.buildMethodCoverageLoader name = "build_method_coverage", sqlUpsert = fromResource("/metrics/db/etl/build_method_coverage_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/build_method_coverage_delete.sql"), + sqlVacuum = fromResource("/metrics/db/etl/build_method_coverage_vacuum.sql"), + vacuumEnabled = true, + vacuumAfterRows = vacuumAfterRows, lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, batchSize = batchSize @@ -68,6 +77,9 @@ val EtlConfig.methodCoverageLoader name = "method_daily_coverage", sqlUpsert = fromResource("/metrics/db/etl/method_daily_coverage_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/method_daily_coverage_delete.sql"), + sqlVacuum = fromResource("/metrics/db/etl/method_daily_coverage_vacuum.sql"), + vacuumEnabled = true, + vacuumAfterRows = vacuumAfterRows, lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, batchSize = batchSize @@ -78,6 +90,9 @@ val EtlConfig.test2CodeMappingLoader name = "test_to_code_mapping", sqlUpsert = fromResource("/metrics/db/etl/test_to_code_mapping_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/test_to_code_mapping_delete.sql"), + sqlVacuum = fromResource("/metrics/db/etl/test_to_code_mapping_vacuum.sql"), + vacuumEnabled = true, + vacuumAfterRows = vacuumAfterRows, lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, batchSize = batchSize, diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestSessionBuildsEtl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestSessionBuildsEtl.kt index b3fa0707e..c753a360a 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestSessionBuildsEtl.kt +++ 
b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/etl/TestSessionBuildsEtl.kt @@ -35,6 +35,9 @@ val EtlConfig.testSessionBuildsLoader name = "test_session_builds", sqlUpsert = fromResource("/metrics/db/etl/test_session_builds_loader.sql"), sqlDelete = fromResource("/metrics/db/etl/test_session_builds_delete.sql"), + sqlVacuum = fromResource("/metrics/db/etl/test_session_builds_vacuum.sql"), + vacuumEnabled = true, + vacuumAfterRows = vacuumAfterRows, lastExtractedAtColumnName = "created_at", database = MetricsDatabaseConfig.database, batchSize = batchSize, diff --git a/admin-metrics/src/main/resources/metrics/db/etl/build_method_coverage_vacuum.sql b/admin-metrics/src/main/resources/metrics/db/etl/build_method_coverage_vacuum.sql new file mode 100644 index 000000000..55df1600c --- /dev/null +++ b/admin-metrics/src/main/resources/metrics/db/etl/build_method_coverage_vacuum.sql @@ -0,0 +1 @@ +VACUUM ANALYZE metrics.build_method_coverage; \ No newline at end of file diff --git a/admin-metrics/src/main/resources/metrics/db/etl/build_method_test_definition_coverage_vacuum.sql b/admin-metrics/src/main/resources/metrics/db/etl/build_method_test_definition_coverage_vacuum.sql new file mode 100644 index 000000000..60d89bdce --- /dev/null +++ b/admin-metrics/src/main/resources/metrics/db/etl/build_method_test_definition_coverage_vacuum.sql @@ -0,0 +1 @@ +VACUUM ANALYZE metrics.build_method_test_definition_coverage; \ No newline at end of file diff --git a/admin-metrics/src/main/resources/metrics/db/etl/build_method_test_session_coverage_vacuum.sql b/admin-metrics/src/main/resources/metrics/db/etl/build_method_test_session_coverage_vacuum.sql new file mode 100644 index 000000000..5b0acc5fc --- /dev/null +++ b/admin-metrics/src/main/resources/metrics/db/etl/build_method_test_session_coverage_vacuum.sql @@ -0,0 +1 @@ +VACUUM ANALYZE metrics.build_method_test_session_coverage; \ No newline at end of file diff --git 
a/admin-metrics/src/main/resources/metrics/db/etl/method_daily_coverage_vacuum.sql b/admin-metrics/src/main/resources/metrics/db/etl/method_daily_coverage_vacuum.sql new file mode 100644 index 000000000..784448603 --- /dev/null +++ b/admin-metrics/src/main/resources/metrics/db/etl/method_daily_coverage_vacuum.sql @@ -0,0 +1 @@ +VACUUM ANALYZE metrics.method_daily_coverage; \ No newline at end of file diff --git a/admin-metrics/src/main/resources/metrics/db/etl/test_session_builds_vacuum.sql b/admin-metrics/src/main/resources/metrics/db/etl/test_session_builds_vacuum.sql new file mode 100644 index 000000000..f24add53f --- /dev/null +++ b/admin-metrics/src/main/resources/metrics/db/etl/test_session_builds_vacuum.sql @@ -0,0 +1 @@ +VACUUM ANALYZE metrics.test_session_builds; \ No newline at end of file diff --git a/admin-metrics/src/main/resources/metrics/db/etl/test_to_code_mapping_vacuum.sql b/admin-metrics/src/main/resources/metrics/db/etl/test_to_code_mapping_vacuum.sql new file mode 100644 index 000000000..99348b42c --- /dev/null +++ b/admin-metrics/src/main/resources/metrics/db/etl/test_to_code_mapping_vacuum.sql @@ -0,0 +1 @@ +VACUUM ANALYZE metrics.test_to_code_mapping; \ No newline at end of file