diff --git a/.agent/skills/git-commit-formatter/SKILL.md b/.agent/skills/git-commit-formatter/SKILL.md
new file mode 100644
index 000000000..bc2479027
--- /dev/null
+++ b/.agent/skills/git-commit-formatter/SKILL.md
@@ -0,0 +1,30 @@
+---
+name: git-commit-formatter
+description: Formats git commit messages according to the Conventional Commits specification. Use this when the user asks to commit changes or write a commit message.
+---
+
+# Git Commit Formatter Skill
+
+When writing a git commit message, you MUST follow the Conventional Commits specification.
+
+## Format
+`<type>[optional scope]: <description>`
+
+## Allowed Types
+- **feat**: A new feature
+- **fix**: A bug fix
+- **docs**: Documentation-only changes
+- **style**: Changes that do not affect the meaning of the code (white space, formatting, etc.)
+- **refactor**: A code change that neither fixes a bug nor adds a feature
+- **perf**: A code change that improves performance
+- **test**: Adding missing tests or correcting existing tests
+- **chore**: Changes to the build process or auxiliary tools and libraries, such as documentation generation
+
+## Instructions
+1. Analyze the changes to determine the primary `type`.
+2. Identify the `scope` if applicable (e.g., a specific component or file).
+3. Write a concise `description` in the imperative mood (e.g., "add feature", not "added feature").
+4. If there are breaking changes, add a footer starting with `BREAKING CHANGE:`.
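The format and footer rules above compose into multi-part messages. The following breaking-change message is an illustrative example added by the editor, not part of the skill file itself:

```text
feat(auth): replace session cookies with JWT tokens

BREAKING CHANGE: existing session cookies are no longer accepted;
clients must re-authenticate to obtain a JWT.
```

Note that the subject line stays in the imperative mood while the `BREAKING CHANGE:` footer carries the migration detail.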
+
+## Example
+`feat(auth): implement login with google`
\ No newline at end of file
diff --git a/.agent/task/newm-chain-db-improvement/newm-chain-db-performance-prd.md b/.agent/task/newm-chain-db-improvement/newm-chain-db-performance-prd.md
new file mode 100644
index 000000000..b103b50a2
--- /dev/null
+++ b/.agent/task/newm-chain-db-improvement/newm-chain-db-performance-prd.md
@@ -0,0 +1,104 @@
+# Chain Sync Initial-Load Performance Improvements
+
+## Overview
+Improve full initial sync performance for newm-chain/newm-chain-db without changing external behavior or weakening transactional guarantees. Focus on reducing write amplification, eliminating N+1 query patterns, and shortening transaction duration while keeping the system runnable and correct after every incremental change.
+
+## Requirements
+- [x] Initial sync throughput improves measurably without regressions in correctness.
+- [x] No changes to external APIs or observable ledger semantics.
+- [x] Transactional guarantees remain intact (no partial or inconsistent state).
+- [x] Each incremental change is safe, isolated, and testable.
+- [x] Rollback handling remains correct and reliable.
+- [x] Sync logic remains stable and recoverable after crashes/restarts.
+
+## Technical Design
+
+### Components Affected
+- Module: newm-chain (BlockDaemon)
+  - Guard rollback logic to run only on actual rollbacks.
+  - Tune block batching behavior for initial sync.
+  - Optional config-gated fast sync for non-critical metadata/logs.
+- Module: newm-chain-db (ChainRepositoryImpl)
+  - Batch insert chain blocks.
+  - Compute etaV in-memory for batches to reduce DB reads.
+- Module: newm-chain-db (LedgerRepositoryImpl)
+  - Replace N+1 per-UTXO/asset queries with batch operations.
+  - Batch spend updates.
+  - Reduce repeated metadata/log reads within a block.
+
+### Database Changes
+No schema changes required. Existing indexes already cover key lookups:
+- chain: unique indexes on slot_number and block_number.
+- ledger_utxos: indexes on block_spent, block_created, (tx_id, tx_ix), ledger_id, transaction_spent.
+- ledger_assets: (policy, name).
+- ledger_utxo_assets: ledger_utxo_id, ledger_asset_id.
+- logs: address_tx_log(address, tx_id), block_number indexes; native_asset_log(block_number).
+- stake: stake_delegations and stake_registrations indexes.
+
+### API Changes
+None.
+
+## Implementation Steps
+1. Make rollbacks explicit and rare
+   - Perform rollback deletes only when a RollBackward actually occurs. Kogmios always delivers a RollBackward when the connection is first established, so that initial event is expected; normal forward syncing should not trigger rollback logic.
+   - Keep rollback semantics intact; avoid delete/rollback work on normal forward batches.
+
+2. Batch chain inserts and etaV computation
+   - Implement batch insert for chain blocks in ChainRepositoryImpl.insertAll.
+   - Precompute etaV sequentially in memory per batch using the last known etaV.
+
+3. Batch ledger UTXO creation
+   - Preload ledger ids for all addresses in a block.
+   - Batch insert missing ledger rows.
+   - Batch insert ledger_utxos.
+   - Resolve asset ids in a single query and batch insert missing assets.
+   - Batch insert ledger_utxo_assets rows.
+
+4. Batch spend updates
+   - Replace per-UTXO updates with a batch update keyed by (tx_id, tx_ix).
+
+5. Optimize metadata/log reads within a block
+   - Cache asset and metadata lookups per block to avoid repeated DB reads.
+
+6. Stabilize batch sizing for initial sync
+   - Cap blockBufferSize and use deterministic batch sizes for catch-up.
+
+7. Add performance instrumentation
+   - Time per repository method and per batch.
+   - Capture query counts where possible.
+
+## Testing Strategy
+- Replay a representative block range and verify:
+  - Chain table continuity and etaV correctness.
+  - Ledger UTXO counts and balances vs. baseline.
+  - Stake registrations/delegations match baseline.
+  - Native asset metadata/log outputs unchanged (or match when fast sync is off).
+- Integration test rollback scenario to ensure rollback correctness.
+- Measure sync throughput before and after each change.
+- Verify idempotency and crash recovery across mid-sync restarts.
+
+## Rollout Plan
+- Implement changes incrementally in separate PRs or commits.
+- Validate each step with a small sync range and a rollback test.
+- Only enable optional fast sync mode after validation and with a backfill plan.
+
+## Open Questions
+- What is the acceptable tradeoff between throughput and metadata/log latency during initial sync?
+
+---
+
+Status: ✅ Completed
+Date: 2026-01-30
+Author: OpenCode
+
+## Progress
+- [x] Task-001: Rollback only on RollBackward (gate rollback deletes to rollback events and clear forward buffer on rollback).
+- [x] Task-002: Cap batch size during catch-up (cap block buffer growth for non-tip batches).
+- [x] Task-003: Batch insert chain blocks (batch chain inserts and compute etaV in-memory per batch).
+- [x] Task-004: Batch ledger lookup/insert (preload ledger ids by address and batch insert missing ledger rows).
+- [x] Task-005: Batch insert ledger_utxos (insert created UTXOs in batch and return ids for asset join).
+- [x] Task-006: Batch resolve and insert ledger_assets (resolve asset ids in one query and batch insert missing assets).
+- [x] Task-007: Batch insert ledger_utxo_assets (insert asset-to-utxo rows in a batch).
+- [x] Task-008: Batch spend updates (batch update spent UTXOs by block).
+- [x] Task-009: Per-block metadata lookup cache (cache ledger asset/metadata lookups within a block to reduce repeated reads).
+- [~] Step-007: Performance instrumentation (deferred).
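Task-009's per-block cache (the `LedgerMetadataCache` referenced by the BlockDaemon changes below, whose implementation is not shown in this diff) can be sketched as follows. This is a minimal, self-contained illustration: `AssetSource`, `PerBlockAssetCache`, and the simplified `LedgerAsset` shape are hypothetical stand-ins for the real `LedgerRepository` types, not the actual implementation.

```kotlin
// Per-block lookup cache sketch (Task-009). Each (policy, name) pair hits
// the underlying source at most once; the cache is meant to live only for
// the duration of one block commit.
data class LedgerAsset(val policy: String, val name: String, val supply: Long)

fun interface AssetSource {
    fun query(policy: String, name: String): LedgerAsset
}

class PerBlockAssetCache(private val source: AssetSource) {
    private val cache = HashMap<Pair<String, String>, LedgerAsset>()

    // Memoize lookups so repeated reads within a block cost one query.
    fun get(policy: String, name: String): LedgerAsset =
        cache.getOrPut(policy to name) { source.query(policy, name) }
}

fun main() {
    var queries = 0
    val source = AssetSource { policy, name ->
        queries++ // counts trips to the "database"
        LedgerAsset(policy, name, 0L)
    }
    val cache = PerBlockAssetCache(source)
    repeat(3) { cache.get("policyA", "tokenA") } // repeated reads in one block
    cache.get("policyA", "tokenB")
    check(queries == 2) { "expected 2 repository queries, saw $queries" }
}
```

Because the cache is scoped to a single block, it never needs invalidation logic: it is simply discarded after the block commits, which preserves the "native asset log output unchanged" acceptance criterion.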
diff --git a/.agent/task/newm-chain-db-improvement/newm-chain-db-performance-tasks.json b/.agent/task/newm-chain-db-improvement/newm-chain-db-performance-tasks.json new file mode 100644 index 000000000..843ee8cca --- /dev/null +++ b/.agent/task/newm-chain-db-improvement/newm-chain-db-performance-tasks.json @@ -0,0 +1,228 @@ +{ + "metadata": { + "title": "NEWM Chain DB Initial Sync Performance Tasks", + "created": "2026-01-30", + "updated": "2026-01-30", + "version": "1.0.0", + "totalTasks": 9, + "prdReference": ".agent/task/newm-chain-db-improvement/newm-chain-db-performance-prd.md", + "description": "Incremental, safe performance improvements for initial chain sync without changing external behavior or transactional guarantees", + "importantNote": "Each task must keep the system runnable and preserve existing rollback and transactional semantics. Validate after every change." + }, + "phases": [ + { + "id": "phase-1", + "name": "Rollback & Batch Safety", + "description": "Remove unnecessary rollback work during forward sync and establish safe batch boundaries", + "riskLevel": "medium", + "tasks": [ + { + "id": "task-001", + "title": "Rollback only on RollBackward", + "description": "Stop running rollback deletes on every forward batch; perform rollbacks only when RollBackward is received", + "status": "completed", + "priority": "high", + "estimatedHours": 2, + "dependencies": [], + "targetPath": "newm-chain/src/main/kotlin/io/newm/chain/daemon/BlockDaemon.kt", + "acceptanceCriteria": [ + "Forward-only sync no longer triggers rollback deletes", + "Rollback events still cleanly revert chain/ledger/log tables" + ], + "testPlan": [ + "Sync forward over N blocks and verify no rollback deletes", + "Simulate rollback and verify state consistency" + ] + }, + { + "id": "task-002", + "title": "Cap batch size during catch-up", + "description": "Introduce a safe max blockBufferSize for initial sync while preserving tip syncing behavior", + "status": "completed", + "priority": 
"medium", + "estimatedHours": 1, + "dependencies": ["task-001"], + "targetPath": "newm-chain/src/main/kotlin/io/newm/chain/daemon/BlockDaemon.kt", + "acceptanceCriteria": [ + "blockBufferSize does not grow unbounded during catch-up", + "Tip sync latency unchanged" + ], + "testPlan": [ + "Observe batch size during long catch-up", + "Verify tip syncing remains responsive" + ] + } + ] + }, + { + "id": "phase-2", + "name": "Chain Inserts", + "description": "Reduce per-block DB work for chain table writes", + "riskLevel": "medium", + "tasks": [ + { + "id": "task-003", + "title": "Batch insert chain blocks", + "description": "Replace per-block inserts with batchInsert and compute etaV in-memory per batch", + "status": "completed", + "priority": "high", + "estimatedHours": 3, + "dependencies": ["task-001"], + "targetPath": "newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/ChainRepositoryImpl.kt", + "acceptanceCriteria": [ + "Chain rows match baseline for same range", + "etaV continuity preserved across batch boundaries" + ], + "testPlan": [ + "Compare chain table rows vs baseline", + "Validate etaV sequence for a multi-batch range" + ] + } + ] + }, + { + "id": "phase-3", + "name": "Ledger UTXO Writes", + "description": "Eliminate N+1 queries for UTXO creation and asset joins", + "riskLevel": "high", + "tasks": [ + { + "id": "task-004", + "title": "Batch ledger lookup/insert", + "description": "Preload ledger ids by address and batch insert missing ledger rows", + "status": "completed", + "priority": "high", + "estimatedHours": 3, + "dependencies": ["task-003"], + "targetPath": "newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/LedgerRepositoryImpl.kt", + "acceptanceCriteria": [ + "Ledger rows match baseline", + "No duplicate ledger rows created" + ], + "testPlan": [ + "Compare ledger table counts and distinct addresses", + "Verify existing addresses are reused" + ] + }, + { + "id": "task-005", + "title": "Batch insert ledger_utxos", + 
"description": "Insert created UTXOs in batch and return ids for asset join", + "status": "completed", + "priority": "high", + "estimatedHours": 3, + "dependencies": ["task-004"], + "targetPath": "newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/LedgerRepositoryImpl.kt", + "acceptanceCriteria": [ + "UTXO rows match baseline", + "UTXO fields (datum, scripts, creds) preserved" + ], + "testPlan": [ + "Compare UTXO counts and sample rows vs baseline", + "Verify datum/script fields for script outputs" + ] + }, + { + "id": "task-006", + "title": "Batch resolve and insert ledger_assets", + "description": "Resolve asset ids in one query and batch insert missing assets", + "status": "completed", + "priority": "high", + "estimatedHours": 2, + "dependencies": ["task-005"], + "targetPath": "newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/LedgerRepositoryImpl.kt", + "acceptanceCriteria": [ + "Ledger assets match baseline", + "No duplicate assets for same policy/name" + ], + "testPlan": [ + "Compare ledger_assets counts and sample rows", + "Verify policy/name uniqueness" + ] + }, + { + "id": "task-007", + "title": "Batch insert ledger_utxo_assets", + "description": "Insert all asset-to-utxo rows in batch", + "status": "completed", + "priority": "high", + "estimatedHours": 2, + "dependencies": ["task-006"], + "targetPath": "newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/LedgerRepositoryImpl.kt", + "acceptanceCriteria": [ + "UTXO asset links match baseline", + "Amounts preserved" + ], + "testPlan": [ + "Compare asset balances per policy/name vs baseline", + "Verify UTXO asset counts for sample transactions" + ] + } + ] + }, + { + "id": "phase-4", + "name": "Spend & Metadata Optimization", + "description": "Reduce per-input updates and repeated metadata reads", + "riskLevel": "medium", + "tasks": [ + { + "id": "task-008", + "title": "Batch spend updates", + "description": "Replace per-spent-UTXO updates with a batch update", + 
"status": "completed", + "priority": "medium", + "estimatedHours": 2, + "dependencies": ["task-005"], + "targetPath": "newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/LedgerRepositoryImpl.kt", + "acceptanceCriteria": [ + "Spent UTXOs match baseline", + "No unspent UTXOs incorrectly marked" + ], + "testPlan": [ + "Compare spent UTXO counts after block range", + "Verify random sample of unspent UTXOs remain unspent" + ] + }, + { + "id": "task-009", + "title": "Per-block metadata lookup cache", + "description": "Cache ledger asset and metadata queries within a block to reduce repeated reads", + "status": "completed", + "priority": "medium", + "estimatedHours": 2, + "dependencies": ["task-007"], + "targetPath": "newm-chain/src/main/kotlin/io/newm/chain/daemon/BlockDaemon.kt", + "acceptanceCriteria": [ + "Native asset log output unchanged", + "Fewer metadata DB queries per block" + ], + "testPlan": [ + "Compare log outputs for sample blocks", + "Measure query count per block" + ] + } + ] + } + ], + "summary": { + "totalPhases": 4, + "totalTasks": 9, + "estimatedTotalHours": 20, + "criticalPath": ["task-001", "task-003", "task-004", "task-005", "task-006", "task-007"], + "keyFiles": [ + "newm-chain/src/main/kotlin/io/newm/chain/daemon/BlockDaemon.kt", + "newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/ChainRepositoryImpl.kt", + "newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/LedgerRepositoryImpl.kt" + ], + "deferred": [ + "step-007: performance instrumentation" + ], + "safetyNotes": [ + "No API changes", + "Keep rollback semantics identical", + "Each change must keep sync runnable and consistent", + "Validate after every incremental change" + ] + } +} diff --git a/.opencode b/.opencode new file mode 120000 index 000000000..9b617313a --- /dev/null +++ b/.opencode @@ -0,0 +1 @@ +.agent \ No newline at end of file diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index 
507492d5a..e43eb771f 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -25,7 +25,7 @@ object Versions { const val J_AUDIO_TAGGER = "3.0.1" const val KOIN = "4.1.0-Beta8" const val KOIN_TEST = "4.1.1" - const val KOGMIOS = "2.6.1" + const val KOGMIOS = "2.7.1" const val KOTLINX_SERIALIZATION = "1.9.0" const val KOTLIN_LOGGING = "7.0.13" const val KOTLIN_PLUGIN = "2.3.0" diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 23449a2b5..37f78a6af 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-9.3.1-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/ChainRepositoryImpl.kt b/newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/ChainRepositoryImpl.kt index e2f696f63..5db1478b1 100644 --- a/newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/ChainRepositoryImpl.kt +++ b/newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/ChainRepositoryImpl.kt @@ -127,44 +127,56 @@ class ChainRepositoryImpl : ChainRepository { } override fun insertAll(blocks: List) { + if (blocks.isEmpty()) { + return + } + transaction { - blocks.forEach { block -> - ChainTable.deleteWhere { blockNumber greaterEq block.blockNumber } - val prevEtaV: String = - ChainTable - .select( - ChainTable.etaV - ).where { ChainTable.slotNumber eqSubQuery ChainTable.select(ChainTable.slotNumber.max()) } - .singleOrNull() - ?.get(ChainTable.etaV) - ?: Config.shelleyGenesisHash + val firstBlockNumber = blocks.minOf { it.blockNumber } + ChainTable.deleteWhere { blockNumber greaterEq firstBlockNumber } + 
var prevEtaV: String = ChainTable - .insertAndGetId { row -> - row[blockNumber] = block.blockNumber - row[slotNumber] = block.slotNumber - row[hash] = block.hash - row[prevHash] = block.prevHash - row[poolId] = nodeVKeyToPoolId(block.nodeVkey) - row[etaV] = calculateEtaV(prevEtaV, block.etaVrf0) - row[nodeVkey] = block.nodeVkey - row[nodeVrfVkey] = block.nodeVrfVkey - row[blockVrf0] = block.blockVrf - row[blockVrf1] = block.blockVrfProof - row[etaVrf0] = block.etaVrf0 - row[etaVrf1] = block.etaVrf1 - row[leaderVrf0] = block.leaderVrf0 - row[leaderVrf1] = block.leaderVrf1 - row[blockSize] = block.blockSize - row[blockBodyHash] = block.blockBodyHash - row[poolOpcert] = block.poolOpcert - row[sequenceNumber] = block.sequenceNumber - row[kesPeriod] = block.kesPeriod - row[sigmaSignature] = block.sigmaSignature - row[protocolMajorVersion] = block.protocolMajorVersion - row[protocolMinorVersion] = block.protocolMinorVersion - }.value + .select( + ChainTable.etaV + ).where { ChainTable.slotNumber eqSubQuery ChainTable.select(ChainTable.slotNumber.max()) } + .singleOrNull() + ?.get(ChainTable.etaV) + ?: Config.shelleyGenesisHash + val blocksWithEtaV = blocks.map { block -> + val newEtaV = calculateEtaV(prevEtaV, block.etaVrf0) + prevEtaV = newEtaV + prevEtaVCache.put(block.blockNumber, newEtaV) + block to newEtaV + } + + ChainTable.batchInsert(blocksWithEtaV, shouldReturnGeneratedValues = false) { (block, etaV) -> + this[ChainTable.blockNumber] = block.blockNumber + this[ChainTable.slotNumber] = block.slotNumber + this[ChainTable.hash] = block.hash + this[ChainTable.prevHash] = block.prevHash + this[ChainTable.poolId] = nodeVKeyToPoolId(block.nodeVkey) + this[ChainTable.etaV] = etaV + this[ChainTable.nodeVkey] = block.nodeVkey + this[ChainTable.nodeVrfVkey] = block.nodeVrfVkey + this[ChainTable.blockVrf0] = block.blockVrf + this[ChainTable.blockVrf1] = block.blockVrfProof + this[ChainTable.etaVrf0] = block.etaVrf0 + this[ChainTable.etaVrf1] = block.etaVrf1 + 
this[ChainTable.leaderVrf0] = block.leaderVrf0 + this[ChainTable.leaderVrf1] = block.leaderVrf1 + this[ChainTable.blockSize] = block.blockSize + this[ChainTable.blockBodyHash] = block.blockBodyHash + this[ChainTable.poolOpcert] = block.poolOpcert + this[ChainTable.sequenceNumber] = block.sequenceNumber + this[ChainTable.kesPeriod] = block.kesPeriod + this[ChainTable.sigmaSignature] = block.sigmaSignature + this[ChainTable.protocolMajorVersion] = block.protocolMajorVersion + this[ChainTable.protocolMinorVersion] = block.protocolMinorVersion + } + + blocks.forEach { block -> if (block.stakeDestAddresses.isNotEmpty()) { // Ignore errors as we want to just keep the existing record as-is because it's older PaymentStakeAddressTable.batchInsert( diff --git a/newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/LedgerRepositoryImpl.kt b/newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/LedgerRepositoryImpl.kt index 325251b6d..9371b370e 100644 --- a/newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/LedgerRepositoryImpl.kt +++ b/newm-chain-db/src/main/kotlin/io/newm/chain/database/repository/LedgerRepositoryImpl.kt @@ -73,12 +73,18 @@ import org.jetbrains.exposed.sql.selectAll import org.jetbrains.exposed.sql.sum import org.jetbrains.exposed.sql.transactions.transaction import org.jetbrains.exposed.sql.update +import org.jetbrains.exposed.sql.transactions.TransactionManager import org.slf4j.LoggerFactory class LedgerRepositoryImpl : LedgerRepository { private val log by lazy { LoggerFactory.getLogger("LedgerRepository") } private val utxoMutex = Mutex() + private data class LedgerAssetKey( + val policy: String, + val name: String + ) + /** * Set of the Utxos that are "spent", but not yet in a block. These should be removed once observed to be * used up in a block. 
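The ledger-address and ledger-asset batching below both follow the same resolve-or-insert pattern: select existing ids in one bulk query, batch-insert only the missing rows, then re-select the new rows to complete the id map. A repository-free sketch of that flow, where `FakeTable` is a hypothetical stand-in for an Exposed table used purely to count bulk queries:

```kotlin
// Resolve-or-insert in three bulk operations instead of one query per row.
class FakeTable {
    private val rows = HashMap<String, Long>() // key (e.g. address) -> id
    private var nextId = 1L
    var queryCount = 0
        private set

    fun selectIds(keys: Collection<String>): Map<String, Long> {
        queryCount++ // one bulk SELECT ... WHERE key IN (...)
        return keys.mapNotNull { k -> rows[k]?.let { k to it } }.toMap()
    }

    fun batchInsert(keys: Collection<String>) {
        queryCount++ // one bulk INSERT
        keys.forEach { rows.putIfAbsent(it, nextId++) }
    }
}

fun resolveOrInsert(table: FakeTable, keys: List<String>): Map<String, Long> {
    val ids = table.selectIds(keys).toMutableMap()
    val missing = keys.filterNot { ids.containsKey(it) }
    if (missing.isNotEmpty()) {
        table.batchInsert(missing)
        ids.putAll(table.selectIds(missing)) // re-select only the new rows
    }
    return ids
}

fun main() {
    val table = FakeTable()
    table.batchInsert(listOf("addr1")) // pre-existing row
    val before = table.queryCount
    val ids = resolveOrInsert(table, listOf("addr1", "addr2", "addr3"))
    check(ids.size == 3)
    check(table.queryCount - before == 3) // 3 bulk queries regardless of N
}
```

The key property, which the diff relies on for both `LedgerTable` and `LedgerAssetsTable`, is that the number of round trips is constant per batch rather than proportional to the number of UTXOs or assets in the block.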
@@ -650,6 +656,12 @@ class LedgerRepositoryImpl : LedgerRepository { // Wait until 10 blocks have passed to make sure the blocks are immutable before removing them from live utxo map private val blockQueue = LinkedHashMap>(11) + private val missingAfterTxIdLogCache = + Caffeine + .newBuilder() + .expireAfterWrite(Duration.ofMinutes(1)) + .build() + private suspend fun processLiveUtxoFromBlock( blockNumber: Long, createdUtxos: Set @@ -881,16 +893,44 @@ class LedgerRepositoryImpl : LedgerRepository { blockNumber: Long, spentUtxos: Set ) { - var count = 0 - spentUtxos.forEach { spentUtxo -> - count += - LedgerUtxosTable.update({ - (LedgerUtxosTable.txId eq spentUtxo.hash) and - (LedgerUtxosTable.txIx eq spentUtxo.ix.toInt()) - }) { row -> - row[blockSpent] = blockNumber - row[transactionSpent] = spentUtxo.transactionSpent + if (spentUtxos.isEmpty()) { + runBlocking { + processSpentUtxoFromBlock(spentUtxos) + } + return + } + val spentUtxoList = spentUtxos.toList() + spentUtxoList.chunked(500).forEach { batch -> + val sql = StringBuilder() + sql.append("UPDATE ledger_utxos SET block_spent = ?, transaction_spent = CASE ") + batch.forEach { _ -> + sql.append("WHEN tx_id = ? AND tx_ix = ? THEN ? ") + } + sql.append("ELSE transaction_spent END WHERE ") + batch.forEachIndexed { index, _ -> + if (index > 0) { + sql.append(" OR ") + } + sql.append("(tx_id = ? 
AND tx_ix = ?)") + } + val connection = TransactionManager.current().connection + val statement = connection.prepareStatement(sql.toString(), false) + try { + var paramIndex = 1 + statement.set(paramIndex++, blockNumber) + batch.forEach { spentUtxo -> + statement.set(paramIndex++, spentUtxo.hash) + statement.set(paramIndex++, spentUtxo.ix.toInt()) + statement.set(paramIndex++, spentUtxo.transactionSpent) + } + batch.forEach { spentUtxo -> + statement.set(paramIndex++, spentUtxo.hash) + statement.set(paramIndex++, spentUtxo.ix.toInt()) } + statement.executeUpdate() + } finally { + statement.closeIfPossible() + } } runBlocking { processSpentUtxoFromBlock(spentUtxos) @@ -901,60 +941,146 @@ class LedgerRepositoryImpl : LedgerRepository { blockNumber: Long, createdUtxos: Set ) { + if (createdUtxos.isEmpty()) { + return + } + val createdUtxoByAddress = LinkedHashMap() createdUtxos.forEach { createdUtxo -> - val ledgerTableId = - LedgerTable - .select(LedgerTable.id) - .where { - LedgerTable.address eq createdUtxo.address - }.limit(1) - .firstOrNull() - ?.let { row -> - row[LedgerTable.id].value - } ?: LedgerTable - .insertAndGetId { row -> - row[address] = createdUtxo.address - row[stakeAddress] = createdUtxo.stakeAddress - row[addressType] = createdUtxo.addressType - }.value - - val ledgerUtxoTableId = - LedgerUtxosTable - .insertAndGetId { row -> - row[ledgerId] = ledgerTableId - row[txId] = createdUtxo.hash - row[txIx] = createdUtxo.ix.toInt() - row[datumHash] = createdUtxo.datumHash ?: createdUtxo.datum - ?.hexToByteArray() - ?.let { Blake2b.hash256(it).toHexString() } - row[datum] = createdUtxo.datum - row[isInlineDatum] = createdUtxo.isInlineDatum - row[scriptRef] = createdUtxo.scriptRef - row[scriptRefVersion] = createdUtxo.scriptRefVersion - row[lovelace] = createdUtxo.lovelace.toString() - row[blockCreated] = blockNumber - row[blockSpent] = null - row[transactionSpent] = null - row[cbor] = createdUtxo.cbor - row[paymentCred] = createdUtxo.paymentCred - 
row[stakeCred] = createdUtxo.stakeCred - }.value + createdUtxoByAddress.putIfAbsent(createdUtxo.address, createdUtxo) + } + val addresses = createdUtxoByAddress.keys.toList() + val ledgerIdsByAddress = + LedgerTable + .select(LedgerTable.id, LedgerTable.address) + .where { LedgerTable.address inList addresses } + .associate { row -> + row[LedgerTable.address] to row[LedgerTable.id].value + }.toMutableMap() + + val missingAddresses = addresses.filterNot { ledgerIdsByAddress.containsKey(it) } + if (missingAddresses.isNotEmpty()) { + LedgerTable.batchInsert( + data = missingAddresses, + ignore = true, + shouldReturnGeneratedValues = false, + ) { missingAddress -> + val createdUtxo = createdUtxoByAddress.getValue(missingAddress) + this[LedgerTable.address] = missingAddress + this[LedgerTable.stakeAddress] = createdUtxo.stakeAddress + this[LedgerTable.addressType] = createdUtxo.addressType + } + + LedgerTable + .select(LedgerTable.id, LedgerTable.address) + .where { LedgerTable.address inList missingAddresses } + .forEach { row -> + ledgerIdsByAddress[row[LedgerTable.address]] = row[LedgerTable.id].value + } + } + val createdUtxoList = createdUtxos.toList() + val ledgerAssetIdsByKey = + createdUtxoList + .flatMap { createdUtxo -> + createdUtxo.nativeAssets.map { nativeAsset -> + LedgerAssetKey(nativeAsset.policy, nativeAsset.name) + } + }.toSet() + .let { assetKeys -> + if (assetKeys.isEmpty()) { + mutableMapOf() + } else { + val assetMatchExpression = + assetKeys + .map { assetKey -> + (LedgerAssetsTable.policy eq assetKey.policy) and + (LedgerAssetsTable.name eq assetKey.name) + }.reduce { acc, expression -> acc or expression } + val existingAssets = + LedgerAssetsTable + .select(LedgerAssetsTable.id, LedgerAssetsTable.policy, LedgerAssetsTable.name) + .where { assetMatchExpression } + .associate { row -> + LedgerAssetKey(row[LedgerAssetsTable.policy], row[LedgerAssetsTable.name]) to + row[LedgerAssetsTable.id].value + }.toMutableMap() + val missingAssets = 
assetKeys.filterNot { assetKey -> existingAssets.containsKey(assetKey) } + if (missingAssets.isNotEmpty()) { + LedgerAssetsTable.batchInsert( + data = missingAssets, + ignore = true, + shouldReturnGeneratedValues = false, + ) { assetKey -> + this[LedgerAssetsTable.policy] = assetKey.policy + this[LedgerAssetsTable.name] = assetKey.name + this[LedgerAssetsTable.supply] = "0" + } + + val missingMatchExpression = + missingAssets + .map { assetKey -> + (LedgerAssetsTable.policy eq assetKey.policy) and + (LedgerAssetsTable.name eq assetKey.name) + }.reduce { acc, expression -> acc or expression } + LedgerAssetsTable + .select(LedgerAssetsTable.id, LedgerAssetsTable.policy, LedgerAssetsTable.name) + .where { missingMatchExpression } + .forEach { row -> + existingAssets[ + LedgerAssetKey(row[LedgerAssetsTable.policy], row[LedgerAssetsTable.name]) + ] = row[LedgerAssetsTable.id].value + } + } + existingAssets + } + } + val insertedRows = + LedgerUtxosTable.batchInsert( + data = createdUtxoList, + shouldReturnGeneratedValues = true, + ) { createdUtxo -> + val ledgerTableId = ledgerIdsByAddress.getValue(createdUtxo.address) + this[LedgerUtxosTable.ledgerId] = ledgerTableId + this[LedgerUtxosTable.txId] = createdUtxo.hash + this[LedgerUtxosTable.txIx] = createdUtxo.ix.toInt() + this[LedgerUtxosTable.datumHash] = createdUtxo.datumHash ?: createdUtxo.datum + ?.hexToByteArray() + ?.let { Blake2b.hash256(it).toHexString() } + this[LedgerUtxosTable.datum] = createdUtxo.datum + this[LedgerUtxosTable.isInlineDatum] = createdUtxo.isInlineDatum + this[LedgerUtxosTable.scriptRef] = createdUtxo.scriptRef + this[LedgerUtxosTable.scriptRefVersion] = createdUtxo.scriptRefVersion + this[LedgerUtxosTable.lovelace] = createdUtxo.lovelace.toString() + this[LedgerUtxosTable.blockCreated] = blockNumber + this[LedgerUtxosTable.blockSpent] = null + this[LedgerUtxosTable.transactionSpent] = null + this[LedgerUtxosTable.cbor] = createdUtxo.cbor + this[LedgerUtxosTable.paymentCred] = 
createdUtxo.paymentCred + this[LedgerUtxosTable.stakeCred] = createdUtxo.stakeCred + } + val ledgerUtxoAssetRows = mutableListOf>() + createdUtxoList.forEachIndexed { index, createdUtxo -> + val ledgerUtxoTableId = insertedRows[index][LedgerUtxosTable.id].value createdUtxo.nativeAssets.forEach { nativeAsset -> val ledgerAssetTableId = - LedgerAssetsTable - .selectAll() - .where { - (LedgerAssetsTable.policy eq nativeAsset.policy) and (LedgerAssetsTable.name eq nativeAsset.name) - }.limit(1) - .first()[LedgerAssetsTable.id] - .value - - LedgerUtxoAssetsTable.insert { row -> - row[ledgerUtxoId] = ledgerUtxoTableId - row[ledgerAssetId] = ledgerAssetTableId - row[amount] = nativeAsset.amount.toString() - } + ledgerAssetIdsByKey.getValue(LedgerAssetKey(nativeAsset.policy, nativeAsset.name)) + ledgerUtxoAssetRows.add( + Triple( + ledgerUtxoTableId, + ledgerAssetTableId, + nativeAsset.amount.toString() + ) + ) + } + } + if (ledgerUtxoAssetRows.isNotEmpty()) { + LedgerUtxoAssetsTable.batchInsert( + data = ledgerUtxoAssetRows, + shouldReturnGeneratedValues = false, + ) { (ledgerUtxoTableId, ledgerAssetTableId, amountValue) -> + this[LedgerUtxoAssetsTable.ledgerUtxoId] = ledgerUtxoTableId + this[LedgerUtxoAssetsTable.ledgerAssetId] = ledgerAssetTableId + this[LedgerUtxoAssetsTable.amount] = amountValue } } runBlocking { @@ -1234,7 +1360,12 @@ class LedgerRepositoryImpl : LedgerRepository { } ?: -1L if (afterId == -1L && afterTxId != null) { - log.warn("Unable to find txId: $afterTxId for address: $address. afterId: $afterId, limit: $limit, offset: $offset") + if (missingAfterTxIdLogCache.getIfPresent(afterTxId) == null) { + missingAfterTxIdLogCache.put(afterTxId, true) + log.warn( + "Unable to find txId: $afterTxId for address: $address. 
                        afterId: $afterId, limit: $limit, offset: $offset"
+                )
+            }
     }
 
     val maxBlockNumberExpression = ChainTable.blockNumber.max()
diff --git a/newm-chain/src/main/kotlin/io/newm/chain/daemon/BlockDaemon.kt b/newm-chain/src/main/kotlin/io/newm/chain/daemon/BlockDaemon.kt
index 0c920955b..6bca2fb9a 100644
--- a/newm-chain/src/main/kotlin/io/newm/chain/daemon/BlockDaemon.kt
+++ b/newm-chain/src/main/kotlin/io/newm/chain/daemon/BlockDaemon.kt
@@ -10,6 +10,7 @@ import io.newm.chain.cardano.to721Json
 import io.newm.chain.cardano.toMetadataMap
 import io.newm.chain.util.config.Config
 import io.newm.chain.database.entity.LedgerAsset
+import io.newm.chain.database.entity.LedgerAssetMetadata
 import io.newm.chain.database.repository.ChainRepository
 import io.newm.chain.database.repository.LedgerRepository
 import io.newm.chain.database.table.AddressTxLogTable
@@ -86,6 +87,14 @@ class BlockDaemon(
     private val secure by lazy { environment.getConfigBoolean("ogmios.secure") }
     private val syncRawTxns by lazy { environment.getConfigBoolean("newmchain.syncRawTxns") }
     private val pruneUtxos by lazy { environment.getConfigBoolean("newmchain.pruneUtxos") }
+    private val maxCatchupBlockBufferSize by lazy {
+        environment.config
+            .propertyOrNull("newmchain.maxCatchupBlockBufferSize")
+            ?.getString()
+            ?.toIntOrNull()
+            ?.takeIf { it > 0 }
+            ?: DEFAULT_MAX_CATCHUP_BLOCK_BUFFER_SIZE
+    }
 
     private val rollForwardFlow =
         MutableSharedFlow<RollForward>(
@@ -106,6 +115,9 @@ class BlockDaemon(
 
     private var tipBlockHeight = 0L
 
+    @Volatile
+    private var rollbackRequested = false
+
     /**
      * Store the blocknumber mapped to a map of transactionIds so we can re-submit to the mempool
      * in the event of a rollback.
@@ -249,6 +261,7 @@ class BlockDaemon(
         when (response.result) {
             is RollBackward -> {
                 log.info { "RollBackward: ${(response.result as RollBackward).point}" }
+                rollbackRequested = true
             }
 
             is RollForward -> {
@@ -294,6 +307,9 @@ class BlockDaemon(
         rollForwardData: RollForward,
         isTip: Boolean
     ) {
+        if (rollbackRequested && blockBuffer.isNotEmpty()) {
+            blockBuffer.clear()
+        }
         blockBuffer.add(rollForwardData)
         if (blockBuffer.size == blockBufferSize || isTip) {
             // Create a copy of the list
@@ -325,13 +341,16 @@ class BlockDaemon(
         warnLongQueriesDuration = 1000L
         val firstBlock = blocksToCommit.first().block as BlockPraos
         val lastBlock = blocksToCommit.last().block as BlockPraos
-        rollbackTime +=
-            measureTimeMillis {
-                chainRepository.rollback(firstBlock.height)
-                ledgerRepository.doRollback(firstBlock.height)
-                AddressTxLogTable.deleteWhere { blockNumber greaterEq firstBlock.height }
-                NativeAssetMonitorLogTable.deleteWhere { blockNumber greaterEq firstBlock.height }
-            }
+        if (rollbackRequested) {
+            rollbackTime +=
+                measureTimeMillis {
+                    chainRepository.rollback(firstBlock.height)
+                    ledgerRepository.doRollback(firstBlock.height)
+                    AddressTxLogTable.deleteWhere { blockNumber greaterEq firstBlock.height }
+                    NativeAssetMonitorLogTable.deleteWhere { blockNumber greaterEq firstBlock.height }
+                }
+            rollbackRequested = false
+        }
 
         blocksToCommit.forEach { rollForwardData ->
             val block = rollForwardData.block as BlockPraos
@@ -340,6 +359,7 @@ class BlockDaemon(
             //     }
             val createdUtxos = block.toCreatedUtxoSet()
             val ledgerAssets = block.toLedgerAssets()
+            val ledgerMetadataCache = LedgerMetadataCache(ledgerRepository)
 
             // Mark same block number as rolled back
             rollbackTime +=
@@ -403,7 +423,8 @@ class BlockDaemon(
                 nativeAssetTime +=
                     measureTimeMillis {
                         // Save metadata for CIP-68 reference metadata appearing on createdUtxos datum values
-                        val nativeAssetMetadataList = cip68UtxoOutputsTo721MetadataMap(createdUtxos)
+                        val nativeAssetMetadataList =
+                            cip68UtxoOutputsTo721MetadataMap(createdUtxos, ledgerMetadataCache)
                         nativeAssetMetadataList.forEach { (metadataMap, assetList) ->
                             try {
                                 ledgerRepository.insertLedgerAssetMetadataList(
@@ -430,7 +451,7 @@ class BlockDaemon(
                 nativeAssetTime +=
                     measureTimeMillis {
                         if (ledgerAssets.isNotEmpty()) {
-                            commitNativeAssetLogTransactions(block, ledgerAssets, createdUtxos)
+                            commitNativeAssetLogTransactions(block, ledgerAssets, createdUtxos, ledgerMetadataCache)
                         }
                     }
             }
@@ -472,10 +493,16 @@ class BlockDaemon(
         } else if (totalTime < COMMIT_BLOCKS_ERROR_LEVEL_MILLIS) {
             blockBufferSize++
         }
+        if (!isTip && blockBufferSize > maxCatchupBlockBufferSize) {
+            blockBufferSize = maxCatchupBlockBufferSize
+        }
         }
     }
 
-    private fun cip68UtxoOutputsTo721MetadataMap(createdUtxos: Set<CreatedUtxo>): List<Pair<MetadataMap, List<LedgerAsset>>> =
+    private fun cip68UtxoOutputsTo721MetadataMap(
+        createdUtxos: Set<CreatedUtxo>,
+        ledgerMetadataCache: LedgerMetadataCache
+    ): List<Pair<MetadataMap, List<LedgerAsset>>> =
         createdUtxos
             .filter { createdUtxo ->
                 createdUtxo.nativeAssets.any { nativeAsset ->
@@ -492,7 +519,7 @@ class BlockDaemon(
                     val metadataMap =
                         cip68PlutusData.toMetadataMap(nativeAsset.policy, nativeAsset.name)
                     metadataMap to
                         cip68CreatedUtxo.nativeAssets.map { na ->
-                            ledgerRepository.queryLedgerAsset(na.policy, na.name)!!
+                            ledgerMetadataCache.getLedgerAsset(na.policy, na.name)!!
                         }
                 }
             } else {
@@ -574,9 +601,11 @@ class BlockDaemon(
     private fun commitNativeAssetLogTransactions(
         block: BlockPraos,
         ledgerAssetList: List<LedgerAsset>,
-        createdUtxos: Set<CreatedUtxo>
+        createdUtxos: Set<CreatedUtxo>,
+        ledgerMetadataCache: LedgerMetadataCache
     ) {
         val ledgerAssets = ledgerRepository.queryLedgerAssets(ledgerAssetList)
+        ledgerAssets.forEach { ledgerMetadataCache.putLedgerAsset(it) }
         // handle supply changes
         val batch = mutableListOf<ByteArray>()
         batch.addAll(
@@ -600,8 +629,8 @@ class BlockDaemon(
                     val metadataLedgerAsset =
                         if (ledgerAsset.name.matches(CIP68_USER_TOKEN_REGEX)) {
                             val name = "$CIP68_REFERENCE_TOKEN_PREFIX${ledgerAsset.name.substring(8)}"
-                            ledgerRepository
-                                .queryLedgerAsset(ledgerAsset.policy, name)
+                            ledgerMetadataCache
+                                .getLedgerAsset(ledgerAsset.policy, name)
                                 ?.copy(txId = ledgerAsset.txId)
                                 ?: run {
                                     log.warn {
                                         "No LedgerAsset REF Token found for: '${ledgerAsset.policy}.${ledgerAsset.name}' -> '${ledgerAsset.policy}.$name' !"
@@ -613,7 +642,7 @@
                     }
                     val ledgerAssetMetadataList =
-                        ledgerRepository.queryLedgerAssetMetadataList(metadataLedgerAsset.id!!)
+                        ledgerMetadataCache.getLedgerAssetMetadataList(metadataLedgerAsset.id!!)
                     val bos = ByteArrayOutputStream()
                     monitorNativeAssetsResponse {
                         policy = ledgerAsset.policy
@@ -643,13 +672,13 @@ class BlockDaemon(
                 }.flatten()
                 .flatMap { updatedNativeAsset ->
                     val metadataBatch = mutableListOf<ByteArray>()
-                    ledgerRepository
-                        .queryLedgerAsset(
+                    ledgerMetadataCache
+                        .getLedgerAsset(
                             updatedNativeAsset.policy,
                             updatedNativeAsset.name
                         )?.let { nativeAsset ->
                             val ledgerAssetMetadataList =
-                                ledgerRepository.queryLedgerAssetMetadataList(nativeAsset.id!!)
+                                ledgerMetadataCache.getLedgerAssetMetadataList(nativeAsset.id!!)
                             val bos = ByteArrayOutputStream()
                             // Create a response for the cip68 reference token itself
                             monitorNativeAssetsResponse {
@@ -673,8 +702,8 @@ class BlockDaemon(
 
                             prefixes.forEach { prefix ->
                                 val name = prefix + updatedNativeAsset.name.substring(8)
-                                ledgerRepository
-                                    .queryLedgerAsset(updatedNativeAsset.policy, name)
+                                ledgerMetadataCache
+                                    .getLedgerAsset(updatedNativeAsset.policy, name)
                                     ?.let { nativeAsset ->
                                         val bos1 = ByteArrayOutputStream()
                                         // Create a response for any cip68 user token based on the reference token
@@ -708,9 +737,34 @@ class BlockDaemon(
         }
     }
 
+    private class LedgerMetadataCache(
+        private val ledgerRepository: LedgerRepository
+    ) {
+        private val assetCache = mutableMapOf<Pair<String, String>, LedgerAsset?>()
+        private val metadataCache = mutableMapOf<Long, List<LedgerAssetMetadata>>()
+
+        fun getLedgerAsset(
+            policyId: String,
+            hexName: String
+        ): LedgerAsset? =
+            assetCache.getOrPut(policyId to hexName) {
+                ledgerRepository.queryLedgerAsset(policyId, hexName)
+            }
+
+        fun putLedgerAsset(ledgerAsset: LedgerAsset) {
+            assetCache[ledgerAsset.policy to ledgerAsset.name] = ledgerAsset
+        }
+
+        fun getLedgerAssetMetadataList(assetId: Long): List<LedgerAssetMetadata> =
+            metadataCache.getOrPut(assetId) {
+                ledgerRepository.queryLedgerAssetMetadataList(assetId)
+            }
+    }
+
     companion object {
         private const val COMMIT_BLOCKS_WARN_LEVEL_MILLIS = 1_000L
         private const val COMMIT_BLOCKS_ERROR_LEVEL_MILLIS = 5_000L
+        private const val DEFAULT_MAX_CATCHUP_BLOCK_BUFFER_SIZE = 100
         private const val CIP68_REFERENCE_TOKEN_PREFIX = "000643b0"
         private val CIP68_REFERENCE_TOKEN_REGEX = Regex("^000643b0.*$") // (100)TokenName
         private val CIP68_USER_TOKEN_REGEX =
diff --git a/newm-chain/src/main/kotlin/io/newm/chain/daemon/MonitorAddressDaemon.kt b/newm-chain/src/main/kotlin/io/newm/chain/daemon/MonitorAddressDaemon.kt
index bac7eebad..da25b2503 100644
--- a/newm-chain/src/main/kotlin/io/newm/chain/daemon/MonitorAddressDaemon.kt
+++ b/newm-chain/src/main/kotlin/io/newm/chain/daemon/MonitorAddressDaemon.kt
@@ -68,6 +68,14 @@ class MonitorAddressDaemon(
     private val server by lazy { environment.getConfigString("ogmios.server") }
     private val port by lazy { environment.getConfigInt("ogmios.port") }
     private val secure by lazy { environment.getConfigBoolean("ogmios.secure") }
+    private val maxCatchupBlockBufferSize by lazy {
+        environment.config
+            .propertyOrNull("newmchain.maxCatchupBlockBufferSize")
+            ?.getString()
+            ?.toIntOrNull()
+            ?.takeIf { it > 0 }
+            ?: DEFAULT_MAX_CATCHUP_BLOCK_BUFFER_SIZE
+    }
     private val blockDaemon: BlockDaemon by inject()
     private val rollForwardFlow =
         MutableSharedFlow<RollForward>(
@@ -292,6 +300,9 @@ class MonitorAddressDaemon(
         } else if (totalTime < COMMIT_BLOCKS_ERROR_LEVEL_MILLIS) {
             blockBufferSize++
         }
+        if (!isTip && blockBufferSize > maxCatchupBlockBufferSize) {
+            blockBufferSize = maxCatchupBlockBufferSize
+        }
         }
     }
@@ -531,5 +542,6 @@ class MonitorAddressDaemon(
     companion object {
         private const val COMMIT_BLOCKS_WARN_LEVEL_MILLIS = 1_000L
         private const val COMMIT_BLOCKS_ERROR_LEVEL_MILLIS = 5_000L
+        private const val DEFAULT_MAX_CATCHUP_BLOCK_BUFFER_SIZE = 100
     }
 }
diff --git a/newm-chain/src/main/kotlin/io/newm/chain/database/DatabaseInit.kt b/newm-chain/src/main/kotlin/io/newm/chain/database/DatabaseInit.kt
index e064f28d2..201775a3c 100644
--- a/newm-chain/src/main/kotlin/io/newm/chain/database/DatabaseInit.kt
+++ b/newm-chain/src/main/kotlin/io/newm/chain/database/DatabaseInit.kt
@@ -28,13 +28,13 @@ fun Application.initializeDatabase() {
             this["dataSource.portNumber"] = environment.getConfigString("database.port")
             this["dataSource.serverName"] = environment.getConfigString("database.server")
             this["autoCommit"] = false
-            this["transactionIsolation"] = "TRANSACTION_REPEATABLE_READ"
-            this["connectionTimeout"] = 40_000L
-            this["maximumPoolSize"] = 30
-            this["minimumIdle"] = 5
-            this["maxLifetime"] = 600_000L // 10 minutes
-            this["validationTimeout"] = 12_000L
-            this["idleTimeout"] = 12_000L
+            this["transactionIsolation"] = "TRANSACTION_READ_COMMITTED"
+            this["connectionTimeout"] = 10_000L
+            this["maximumPoolSize"] = 10
+            this["minimumIdle"] = 1
+            this["maxLifetime"] = 1_500_000L // 25 minutes
+            this["validationTimeout"] = 5_000L
+            this["idleTimeout"] = 30_000L
             this["leakDetectionThreshold"] = 60_000L
         }
     )
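The catch-up cap added to both daemons is easiest to see in isolation. The sketch below models only what the diff shows (grow the buffer after a fast commit, then clamp while behind the tip); the daemons' slow-commit shrink branch is elided, and `nextBlockBufferSize` is an illustrative helper, not a function in the codebase:

```kotlin
// Constants mirror the diff; the function itself is an illustrative sketch.
const val COMMIT_BLOCKS_ERROR_LEVEL_MILLIS = 5_000L
const val DEFAULT_MAX_CATCHUP_BLOCK_BUFFER_SIZE = 100

fun nextBlockBufferSize(
    current: Int,
    totalTimeMillis: Long,
    isTip: Boolean,
    maxCatchupSize: Int = DEFAULT_MAX_CATCHUP_BLOCK_BUFFER_SIZE,
): Int {
    var size = current
    if (totalTimeMillis < COMMIT_BLOCKS_ERROR_LEVEL_MILLIS) {
        size++ // commit was fast enough: batch more blocks next time
    }
    if (!isTip && size > maxCatchupSize) {
        size = maxCatchupSize // bound batch growth during initial sync
    }
    return size
}
```

Without the clamp, a long run of fast commits during initial sync lets the buffer grow without bound, inflating transaction duration and memory; at the tip the cap is deliberately not applied.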
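One subtlety of `LedgerMetadataCache` worth knowing: Kotlin's `MutableMap.getOrPut` does not memoize a `null` result, because it re-runs the default lambda whenever `get(key)` returns `null`. So with the nullable `assetCache`, a repeated miss still queries the repository once per call (no worse than before the cache, but not cached either). The sketch below demonstrates this with `FakeRepo`, a hypothetical stand-in for `LedgerRepository` that counts queries:

```kotlin
// FakeRepo is a hypothetical stand-in for LedgerRepository, counting queries.
class FakeRepo {
    var queries = 0
        private set

    fun query(key: String): String? {
        queries++
        return if (key.startsWith("hit")) "asset" else null
    }
}

// Mirrors the cache's getOrPut usage with a nullable value type: a null
// lookup result is re-queried on every call instead of being memoized.
fun lookupViaGetOrPut(cache: MutableMap<String, String?>, repo: FakeRepo, key: String): String? =
    cache.getOrPut(key) { repo.query(key) }

// Checking containsKey explicitly memoizes misses as well.
fun lookupViaContainsKey(cache: MutableMap<String, String?>, repo: FakeRepo, key: String): String? {
    if (key !in cache) {
        cache[key] = repo.query(key)
    }
    return cache[key]
}
```

If negative lookups turn out to be hot during sync, the `containsKey` variant would cache them at the cost of one extra map access per hit.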
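For reference, the tuned pool settings in `DatabaseInit.kt` correspond to HikariCP's programmatic API as sketched below (a configuration fragment only; the datasource class, host, and credentials from the properties map are omitted, and the property names are HikariCP's standard ones):

```kotlin
import com.zaxxer.hikari.HikariConfig

// Sketch: the tuned pool settings expressed via HikariCP's programmatic API.
// Values mirror the diff; READ_COMMITTED shortens lock/snapshot overhead
// versus REPEATABLE_READ, and the smaller pool matches the sync workload.
fun tunedHikariConfig(): HikariConfig =
    HikariConfig().apply {
        isAutoCommit = false
        transactionIsolation = "TRANSACTION_READ_COMMITTED"
        connectionTimeout = 10_000L
        maximumPoolSize = 10
        minimumIdle = 1
        maxLifetime = 1_500_000L // 25 minutes
        validationTimeout = 5_000L // must stay below connectionTimeout
        idleTimeout = 30_000L
        leakDetectionThreshold = 60_000L
    }
```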