From c8c37bc7e6df260c4600495787205aee4d1ab747 Mon Sep 17 00:00:00 2001 From: Bohdan Vanieiev Date: Tue, 15 Dec 2020 21:30:56 +0200 Subject: [PATCH 1/2] TxManager --- .../main/java/org/veriblock/spv/SpvContext.kt | 5 +- .../spv/model/PopTransactionLight.kt | 2 +- .../spv/model/StandardTransaction.kt | 4 +- .../org/veriblock/spv/model/Transaction.kt | 2 +- .../java/org/veriblock/spv/net/P2PService.kt | 3 +- .../org/veriblock/spv/net/SpvPeerTable.kt | 18 +- .../java/org/veriblock/spv/service/Model.kt | 4 +- .../org/veriblock/spv/service/SpvService.kt | 15 +- .../spv/service/TransactionService.kt | 2 +- .../org/veriblock/spv/service/tx/TxManager.kt | 281 ++++++++++++++++++ .../java/org/veriblock/spv/wallet/Ledger.kt | 4 +- .../admin/service/impl/AdminApiServiceTest.kt | 3 +- .../veriblock/spv/net/impl/P2PServiceTest.kt | 3 +- .../transactionmonitor/TransactionMonitor.kt | 2 +- 14 files changed, 324 insertions(+), 24 deletions(-) create mode 100644 nodecore-spv/src/main/java/org/veriblock/spv/service/tx/TxManager.kt diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/SpvContext.kt b/nodecore-spv/src/main/java/org/veriblock/spv/SpvContext.kt index 73c757f4b..50e3218be 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/SpvContext.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/SpvContext.kt @@ -21,6 +21,7 @@ import org.veriblock.spv.model.LedgerValue import org.veriblock.spv.model.TransactionPool import org.veriblock.spv.net.* import org.veriblock.spv.service.* +import org.veriblock.spv.service.tx.TxManager import org.veriblock.spv.util.AddressStateChangeEvent import org.veriblock.spv.util.SpvEventBus.addressStateUpdatedEvent import org.veriblock.spv.wallet.PendingTransactionDownloadedListener @@ -51,7 +52,7 @@ class SpvContext( val p2PService: P2PService val addressManager: AddressManager val transactionService: TransactionService - val pendingTransactionContainer: PendingTransactionContainer + val pendingTransactionContainer: TxManager val pendingTransactionDownloadedListener: PendingTransactionDownloadedListener private val addressState: ConcurrentHashMap = ConcurrentHashMap() @@ -100,7 +101,7 @@ class SpvContext( blockStore = BlockStore(networkParameters, directory) transactionPool = TransactionPool() blockchain = Blockchain(blockStore) - pendingTransactionContainer = PendingTransactionContainer() + pendingTransactionContainer = TxManager(blockchain) p2PService = P2PService(pendingTransactionContainer, networkParameters) addressManager = AddressManager() val walletFile = File(directory, filePrefix + FILE_EXTENSION) diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/model/PopTransactionLight.kt b/nodecore-spv/src/main/java/org/veriblock/spv/model/PopTransactionLight.kt index 4d948edef..987b88ea8 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/model/PopTransactionLight.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/model/PopTransactionLight.kt @@ -59,7 +59,7 @@ class PopTransactionLight( @Throws(IOException::class) private fun serializeToStream(stream: OutputStream) { stream.write(transactionTypeIdentifier.id.toInt()) - inputAddress!!.serializeToStream(stream) + inputAddress.serializeToStream(stream) SerializeDeserializeService.serialize(endorsedBlock, stream) SerializeDeserializeService.serialize(bitcoinTx, stream) SerializeDeserializeService.serialize(bitcoinMerklePath, stream) diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/model/StandardTransaction.kt b/nodecore-spv/src/main/java/org/veriblock/spv/model/StandardTransaction.kt index c8ec0dc34..2f3108236 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/model/StandardTransaction.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/model/StandardTransaction.kt @@ -105,7 +105,7 @@ open class StandardTransaction : Transaction { builder.type = VeriBlockMessages.Transaction.Type.MULTISIG } builder.sourceAmount = inputAmount!!.atomicUnits - builder.sourceAddress = ByteString.copyFrom(inputAddress!!.toByteArray()) + builder.sourceAddress = ByteString.copyFrom(inputAddress.toByteArray()) builder.data = ByteString.copyFrom(data) builder.size = toByteArray(networkParameters).size for (output in getOutputs()) { @@ -127,7 +127,7 @@ open class StandardTransaction : Transaction { stream.write(transactionTypeIdentifier.id.toInt()) // Write source address - inputAddress!!.serializeToStream(stream) + inputAddress.serializeToStream(stream) // Write source amount SerializeDeserializeService.serialize(inputAmount!!, stream) diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/model/Transaction.kt b/nodecore-spv/src/main/java/org/veriblock/spv/model/Transaction.kt index f3b87006a..f4ecee9e9 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/model/Transaction.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/model/Transaction.kt @@ -14,7 +14,7 @@ import org.veriblock.core.params.NetworkParameters abstract class Transaction { lateinit var txId: VbkTxId - var inputAddress: AddressLight? = null + lateinit var inputAddress: AddressLight var transactionMeta: TransactionMeta? = null var signature: ByteArray? = null var publicKey: ByteArray? = null diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/net/P2PService.kt b/nodecore-spv/src/main/java/org/veriblock/spv/net/P2PService.kt index f51f61da7..e7902ec0e 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/net/P2PService.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/net/P2PService.kt @@ -8,12 +8,13 @@ import org.veriblock.core.crypto.VbkTxId import org.veriblock.core.params.NetworkParameters import org.veriblock.spv.model.TransactionTypeIdentifier import org.veriblock.spv.service.PendingTransactionContainer +import org.veriblock.spv.service.tx.TxManager import org.veriblock.spv.util.nextMessageId private val logger = createLogger {} class P2PService( - private val pendingTransactionContainer: PendingTransactionContainer, + private val pendingTransactionContainer: TxManager, private val networkParameters: NetworkParameters ) { fun onTransactionRequest(txIds: List, sender: SpvPeer) { diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/net/SpvPeerTable.kt b/nodecore-spv/src/main/java/org/veriblock/spv/net/SpvPeerTable.kt index f22864b6a..235c363db 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/net/SpvPeerTable.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/net/SpvPeerTable.kt @@ -22,6 +22,9 @@ import nodecore.api.grpc.utilities.ByteStringUtility import nodecore.api.grpc.utilities.extensions.toByteString import nodecore.api.grpc.utilities.extensions.toHex import org.veriblock.core.crypto.BloomFilter +import org.veriblock.core.crypto.Sha256Hash +import org.veriblock.core.crypto.asAnyVbkHash +import org.veriblock.core.crypto.asVbkHash import org.veriblock.core.utilities.createLogger import org.veriblock.core.crypto.asVbkTxId import org.veriblock.core.utilities.debugError @@ -32,6 +35,7 @@ import org.veriblock.spv.model.* import org.veriblock.spv.serialization.MessageSerializer import org.veriblock.spv.serialization.MessageSerializer.deserializeNormalTransaction import org.veriblock.spv.service.* +import org.veriblock.spv.service.tx.TxManager import org.veriblock.spv.util.SpvEventBus import org.veriblock.spv.util.Threading import org.veriblock.spv.util.Threading.PEER_TABLE_DISPATCHER @@ -59,7 +63,7 @@ class SpvPeerTable( private val spvContext: SpvContext, private val p2pService: P2PService, peerDiscovery: PeerDiscovery, - pendingTransactionContainer: PendingTransactionContainer + val txMgr: TxManager ) { private val lock = ReentrantLock() private val running = AtomicBoolean(false) @@ -68,7 +72,6 @@ class SpvPeerTable( var maximumPeers = DEFAULT_CONNECTIONS var downloadPeer: SpvPeer? = null val bloomFilter: BloomFilter - private val pendingTransactionContainer: PendingTransactionContainer private val peers = ConcurrentHashMap() private val pendingPeers = ConcurrentHashMap() @@ -84,17 +87,18 @@ class SpvPeerTable( bloomFilter = createBloomFilter() blockchain = spvContext.blockchain discovery = peerDiscovery - this.pendingTransactionContainer = pendingTransactionContainer - SpvEventBus.pendingTransactionDownloadedEvent.register( - spvContext.pendingTransactionDownloadedListener, - spvContext.pendingTransactionDownloadedListener::onPendingTransactionDownloaded - ) + SpvEventBus.pendingTransactionDownloadedEvent.register(this) { + logger.debug {"Tx=${it.txId} downloaded..."} + } } fun start() { running.set(true) + SpvEventBus.newBestBlockEvent.register(this) { + txMgr.onVbkBestBlock(it) + } SpvEventBus.peerConnectedEvent.register(this, ::onPeerConnected) SpvEventBus.peerDisconnectedEvent.register(this, ::onPeerDisconnected) SpvEventBus.messageReceivedEvent.register(this) { diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/service/Model.kt b/nodecore-spv/src/main/java/org/veriblock/spv/service/Model.kt index 0996ef5ca..b856b09e5 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/service/Model.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/service/Model.kt @@ -1,7 +1,9 @@ package org.veriblock.spv.service +import org.veriblock.core.crypto.AnyVbkHash import org.veriblock.core.crypto.Sha256Hash import org.veriblock.core.crypto.VbkTxId +import org.veriblock.core.crypto.VbkHash import org.veriblock.sdk.models.Address import org.veriblock.sdk.models.Coin import org.veriblock.spv.model.AddressLight @@ -89,7 +91,7 @@ data class TransactionInfo( val bitcoinBlockHash: String, val bitcoinTxId: String, val bitcoinConfirmations: Int, - val blockHash: String, + val blockHash: AnyVbkHash, val merklePath: String ) diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/service/SpvService.kt b/nodecore-spv/src/main/java/org/veriblock/spv/service/SpvService.kt index 85809f0c3..14f8ae875 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/service/SpvService.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/service/SpvService.kt @@ -42,6 +42,8 @@ import org.veriblock.spv.model.Transaction import org.veriblock.spv.model.asLightAddress import org.veriblock.spv.net.SpvPeerTable import org.veriblock.spv.service.TransactionService.Companion.predictAltChainEndorsementTransactionSize +import org.veriblock.spv.service.tx.TxManager +import org.veriblock.spv.service.tx.TxStatusChangedEvent import org.veriblock.spv.util.buildMessage import java.io.File import java.io.IOException @@ -54,7 +56,7 @@ class SpvService( private val peerTable: SpvPeerTable, private val transactionService: TransactionService, private val addressManager: AddressManager, - private val pendingTransactionContainer: PendingTransactionContainer, + private val pendingTransactionContainer: TxManager, private val blockchain: Blockchain ) { fun getStateInfo(): StateInfo { @@ -123,7 +125,14 @@ class SpvService( addressCoinsIndexList, outputs ) return transactions.asFlow().onEach { - pendingTransactionContainer.addTransaction(it) + val event = pendingTransactionContainer.addTransaction(it) + event.register(this) { e -> + when (e) { + is TxStatusChangedEvent.Confirmation -> logger.info { "Tx=${e.id} has confirmation=${e.confirmations} in a block=${e.block}" } + is TxStatusChangedEvent.GotRequiredConfirmationsN -> logger.info { "Tx=${e.id} is confirmed!" } + is TxStatusChangedEvent.Invalid -> logger.info { "Tx=${e.id} is invalid" } + } + } peerTable.advertise(it) }.map { it.txId @@ -348,7 +357,7 @@ class SpvService( fun getLastBitcoinBlock(): Sha256Hash = spvContext.networkParameters.bitcoinOriginBlock.hash //Mock todo SPV-111 - fun getTransactions(ids: List) = ids.mapNotNull { + fun getTransactions(ids: List): List = ids.mapNotNull { pendingTransactionContainer.getTransactionInfo(it) } diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/service/TransactionService.kt b/nodecore-spv/src/main/java/org/veriblock/spv/service/TransactionService.kt index cfd32c7f0..708b998a0 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/service/TransactionService.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/service/TransactionService.kt @@ -233,7 +233,7 @@ class TransactionService( builder.txId = ByteString.copyFrom(tx.txId.bytes) builder.type = VeriBlockMessages.Transaction.Type.STANDARD builder.sourceAmount = tx.inputAmount!!.atomicUnits - builder.sourceAddress = ByteString.copyFrom(tx.inputAddress!!.toByteArray()) + builder.sourceAddress = ByteString.copyFrom(tx.inputAddress.toByteArray()) builder.data = ByteString.copyFrom(tx.data) // builder.setTimestamp(getTimeStamp()); // builder.setSize(tx.getSize()); diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/service/tx/TxManager.kt b/nodecore-spv/src/main/java/org/veriblock/spv/service/tx/TxManager.kt new file mode 100644 index 000000000..371434e1f --- /dev/null +++ b/nodecore-spv/src/main/java/org/veriblock/spv/service/tx/TxManager.kt @@ -0,0 +1,281 @@ +package org.veriblock.spv.service.tx + +import org.veriblock.core.crypto.Sha256Hash +import org.veriblock.core.crypto.asAnyVbkHash +import org.veriblock.core.crypto.asVbkHash +import org.veriblock.core.utilities.Event +import org.veriblock.core.utilities.Utility +import org.veriblock.core.utilities.createLogger +import org.veriblock.sdk.models.Address +import org.veriblock.sdk.models.Constants +import org.veriblock.sdk.models.VeriBlockBlock +import org.veriblock.sdk.models.VeriBlockMerklePath +import org.veriblock.spv.model.Transaction +import org.veriblock.spv.service.Blockchain +import org.veriblock.spv.service.TransactionInfo +import org.veriblock.spv.service.tx.TxStatusChangedEvent.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +private val logger = createLogger {} + +data class MempoolEntry( + val tx: Transaction, +) { + val id: Sha256Hash get() = tx.txId + + // timestamp, ms + val addedTime: Long = System.currentTimeMillis() +} + +sealed class TxStatusChangedEvent { + // intermediate status = can be received more than once + class Confirmation(val id: Sha256Hash, val block: VeriBlockBlock, val confirmations: Int) : TxStatusChangedEvent() + + // TODO(warchant): when nodecore can respond "tx is invalid" in TransactionInfo, modify this event to support this + // final status = can be received exactly once, then all listeners unsubscribed + class Invalid(val id: Sha256Hash) : TxStatusChangedEvent() + + // final status + class GotRequiredConfirmationsN(val id: Sha256Hash, val block: VeriBlockBlock, val confirmations: Int) : TxStatusChangedEvent() +} + +private data class TxIdMonitorEntry( + val id: Sha256Hash, + var observer: Event, + var lastTxInfoTime: Long, + val requiredConfirmations: Int +) { + var lastTxInfo: TransactionInfo? = null + + // it was cryptographically proven that this block contains this tx + var containingBlock: VeriBlockBlock? = null + + // <0 = unknown + // 0 = in a mempool + // >0 = in a block + var confirmations: Int = -1 +} + +private const val TEN_MINUTES = 10 * 60 * 1000 /* ms */ + +class TxManager( + val blockchain: Blockchain +) { + private val lock = ReentrantLock() + private val mempool: MutableMap = ConcurrentHashMap() + private val monitoredTxIds = ConcurrentHashMap() + private val txesByAddress = ConcurrentHashMap>() + + fun getPendingTxIds(): List = monitoredTxIds.keys().toList() + + fun addTransaction(id: Sha256Hash, confirmations: Int = 1): Event { + check(confirmations > 0) { + "Confirmations must be > 0" + } + + val event = Event("OnTxStatusChanged=${id}"); + monitoredTxIds[id] = TxIdMonitorEntry( + id = id, + observer = event, + requiredConfirmations = confirmations, + lastTxInfoTime = System.currentTimeMillis() + ) + + return event + } + + fun addTransaction(transaction: Transaction, confirmations: Int = 1): Event = lock.withLock { + check(confirmations > 0) { + "Confirmations must be > 0" + } + + val id = transaction.txId + mempool[id] = MempoolEntry(transaction) + val addr = Address(transaction.inputAddress.get()) + txesByAddress.getOrPut(addr) { ArrayList() }.add(id) + + val event = addTransaction(id, confirmations) + event.register(this) { e -> + when (e) { + is Invalid -> clearMempoolTxesWithGreaterOrEqualSigIndex(e.id) + is GotRequiredConfirmationsN -> removeTxId(e.id) + else -> return@register + } + } + + return event + } + + fun removeTxId(id: Sha256Hash) { + // first, try to remove from `monitoredTxIds`. + // if does not exist - all other storages will not contain this `id` + val e = monitoredTxIds.remove(id) ?: return + + // is `id` in mempool? + val entry = mempool.remove(e.id) ?: return + val addr = Address(entry.tx.inputAddress.get()) + // if txesByAddress contains `addr`, remove `id` + txesByAddress[addr]?.remove(e.id) + // if txesByAddress array is empty, cleanup + if (txesByAddress[addr]?.isEmpty() == true) { + txesByAddress.remove(addr) + } + } + + fun getTransaction(id: Sha256Hash): Transaction? { + return mempool[id]?.tx + } + + fun getTransactionInfo(id: Sha256Hash): TransactionInfo? { + return monitoredTxIds[id]?.lastTxInfo + } + + fun getPendingSignatureIndexForAddress(address: Address): Long? = txesByAddress[address] + ?.mapNotNull { mempool[it] } + ?.map { it.tx.getSignatureIndex() } + ?.maxOrNull() + + /** + * Execute this callback whenever we get a response "missing transaction" for given tx 'id' + * @return false if peer sent us response for tx which we never requested/not monitor + */ + fun onMissingTransactionInfo(id: Sha256Hash): Boolean { + val item = monitoredTxIds[id] ?: return false + val current = System.currentTimeMillis() + if (item.containingBlock == null && current - item.lastTxInfoTime > TEN_MINUTES) { + // last time this tx received 'TxInfo' was 10 min ago, Tx is likely to be invalid. + logger.warn { "Transaction=${item.id} is marked as invalid" } + item.observer.trigger(Invalid(item.id)) + } + + return true + } + + /** + * Execute this callback whenever we get a response with "TxInfo". + * @return false if peer sent us txinfo which we never requested/not monitor, or cryptographic proof is invalid + */ + fun onTransactionInfo(info: TransactionInfo): Boolean = lock.withLock { + val id = info.transaction.txId + val item = monitoredTxIds[id] + ?: return false + item.lastTxInfoTime = System.currentTimeMillis() + item.lastTxInfo = info + + // do we know containing block? + if (item.containingBlock == null && item.confirmations <= 0) { + // no. is tx in mempool? + if (info.confirmations == 0) { + // yes! tx is is mempool + item.confirmations = 0 + item.containingBlock = null + return true + } + + // we don't know containing block yet, and tx is not in mempool + } + + val hash = try { + info.blockHash.trimToPreviousBlockSize() + } catch (e: Exception) { + return false + } + val block = blockchain.getBlock(hash) + // can't find block locally, ignore this tx info + ?: return false + + val mp = VeriBlockMerklePath(info.merklePath) + // verify merkle proof for this TX + if (mp.merkleRoot.truncate() != block.header.merkleRoot) { + // can't prove that TX is in `block` + logger.warn { "mp=${mp.merkleRoot}, trimmed=${mp.merkleRoot.truncate()} == mroot=${block.header.merkleRoot}" } + return false + } + + // txinfo is cryptographically valid + + // determine which block is earlier AND on active chain - the one we know about or the one peer just sent us + val containingBlock = item.containingBlock ?: block.header + if (blockchain.isOnActiveChain(block.hash) && block.height < containingBlock.height) { + // new block is on active chain and is earlier + item.containingBlock = block.header + } else if (blockchain.isOnActiveChain(containingBlock.hash)) { + // old containing block is on active chain and is earlier + item.containingBlock = containingBlock + } else { + // neither of these two are on active chain, reset known containing block... + item.containingBlock = null + item.confirmations = -1 + return true + } + + // we do not rely on remote peer's 'confirmations', we calculate confirmations ourselves + val tip = blockchain.activeChain.tip + item.confirmations = tip.height - containingBlock.height + 1 /* containing itself */ + + return true + } + + /** + * Execute this function whenever VBK Best Block changes. + */ + fun onVbkBestBlock(block: VeriBlockBlock) { + val index = blockchain.getBlockIndex(block.hash) + ?: throw IllegalStateException( + "Invariant failed: OnVbkBestBlock is called with a block which does not exist in VBK blockchain or not on active chain. Block=${block}" + ) + + // index is supposed to be on active chain. + check(blockchain.isOnActiveChain(block.hash)) { + // if this happens, it is likely caused by incorrect API usage or concurrency issue + "Invariant failed: OnVbkBestBlock is called on a block=${block}, which is not on active chain!" + } + + monitoredTxIds.values.forEach { tx -> + val containingBlock = tx.containingBlock ?: return + + lock.withLock { + // is containing block in direct ancestry of best block? + val ancestor = index.getAncestorAtHeight(containingBlock.height) + if (ancestor?.smallHash != containingBlock.hash.trimToPreviousBlockSize()) { + // containing block is not direct ancestor of best chain + tx.confirmations = -1 + tx.observer.trigger(Confirmation(tx.id, containingBlock, tx.confirmations)) + return + } + + // this tx is contained in a block which is in direct ancestry of best block + tx.confirmations = block.height - ancestor.height + 1 /* containing itself */ + tx.observer.trigger(Confirmation(tx.id, containingBlock, tx.confirmations)) + if (tx.confirmations >= tx.requiredConfirmations) { + tx.observer.trigger(GotRequiredConfirmationsN(tx.id, containingBlock, tx.confirmations)) + // unsubscribe listeners + tx.observer.clear() + } + } + + } + } + + // we determined that TX with `id` is invalid. All TXes that have SigIndex >= TX.getSigIndex() shall be removed. + private fun clearMempoolTxesWithGreaterOrEqualSigIndex(id: Sha256Hash) { + // this id must be in mempool + val invalid = mempool.remove(id) + ?: throw IllegalStateException("Invariant failed: got Invalid(id=$id) but mempool does not contain this TX") + + // take all txids sent by this Address and filter them + val address = Address(invalid.tx.inputAddress.get()) + val txids = txesByAddress[address] + ?: throw IllegalStateException("Invariant failed: mempool contained TX=$id but txesByAddress didn't!") + + // do remove + txids + .asSequence() + .mapNotNull { mempool[it] } + // remove all txes whose sig index is greater or equal than invalid sigindex + .filter { invalid.tx.getSignatureIndex() <= it.tx.getSignatureIndex() } + .forEach { removeTxId(it.id) } + } +} diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/wallet/Ledger.kt b/nodecore-spv/src/main/java/org/veriblock/spv/wallet/Ledger.kt index fc5445694..48cffc178 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/wallet/Ledger.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/wallet/Ledger.kt @@ -57,9 +57,9 @@ class Ledger { } else { LedgerEntry.Status.PENDING } - if (entries.containsKey(tx.inputAddress!!.get())) { + if (entries.containsKey(tx.inputAddress.get())) { ledgerEntries.add( - LedgerEntry(tx.inputAddress!!.get(), tx.txId, tx.inputAmount!!, Coin.ZERO, tx.getSignatureIndex(), 0, status) + LedgerEntry(tx.inputAddress.get(), tx.txId, tx.inputAmount!!, Coin.ZERO, tx.getSignatureIndex(), 0, status) ) } for (i in tx.getOutputs().indices) { diff --git a/nodecore-spv/src/test/java/org/veriblock/spv/admin/service/impl/AdminApiServiceTest.kt b/nodecore-spv/src/test/java/org/veriblock/spv/admin/service/impl/AdminApiServiceTest.kt index 59e915b5c..66804c213 100644 --- a/nodecore-spv/src/test/java/org/veriblock/spv/admin/service/impl/AdminApiServiceTest.kt +++ b/nodecore-spv/src/test/java/org/veriblock/spv/admin/service/impl/AdminApiServiceTest.kt @@ -39,6 +39,7 @@ import org.veriblock.spv.service.SpvService import org.veriblock.spv.service.Blockchain import org.veriblock.spv.service.PendingTransactionContainer import org.veriblock.spv.service.TransactionService +import org.veriblock.spv.service.tx.TxManager import java.io.IOException import java.security.KeyPairGenerator @@ -48,7 +49,7 @@ class AdminApiServiceTest { private lateinit var addressManager: AddressManager private lateinit var peerTable: SpvPeerTable private lateinit var spvService: SpvService - private lateinit var transactionContainer: PendingTransactionContainer + private lateinit var transactionContainer: TxManager private lateinit var blockchain: Blockchain val address = "VHoWCZrQB4kqLHm1EoNoU8rih7ohyG" val block = defaultTestNetParameters.genesisBlock diff --git a/nodecore-spv/src/test/java/org/veriblock/spv/net/impl/P2PServiceTest.kt b/nodecore-spv/src/test/java/org/veriblock/spv/net/impl/P2PServiceTest.kt index 664c76469..a62edbcab 100644 --- a/nodecore-spv/src/test/java/org/veriblock/spv/net/impl/P2PServiceTest.kt +++ b/nodecore-spv/src/test/java/org/veriblock/spv/net/impl/P2PServiceTest.kt @@ -19,10 +19,11 @@ import org.veriblock.spv.model.asStandardAddress import org.veriblock.spv.net.P2PService import org.veriblock.spv.net.SpvPeer import org.veriblock.spv.service.PendingTransactionContainer +import org.veriblock.spv.service.tx.TxManager class P2PServiceTest { private lateinit var spvContext: SpvContext - private lateinit var pendingTransactionContainer: PendingTransactionContainer + private lateinit var pendingTransactionContainer: TxManager private lateinit var peer: SpvPeer private lateinit var p2PService: P2PService diff --git a/pop-miners/altchain-pop-miner/src/main/kotlin/org/veriblock/miners/pop/transactionmonitor/TransactionMonitor.kt b/pop-miners/altchain-pop-miner/src/main/kotlin/org/veriblock/miners/pop/transactionmonitor/TransactionMonitor.kt index 7eb1cbdbd..aa852fb8f 100644 --- a/pop-miners/altchain-pop-miner/src/main/kotlin/org/veriblock/miners/pop/transactionmonitor/TransactionMonitor.kt +++ b/pop-miners/altchain-pop-miner/src/main/kotlin/org/veriblock/miners/pop/transactionmonitor/TransactionMonitor.kt @@ -75,7 +75,7 @@ class TransactionMonitor( ?: error("Unable to retrieve pending transactions") tx.transactionMeta.depth = it.confirmations tx.transactionMeta.appearsAtChainHeight = it.blockNumber - tx.transactionMeta.appearsInBestChainBlock = it.blockHash.asAnyVbkHash() + tx.transactionMeta.appearsInBestChainBlock = it.blockHash tx.merklePath = VeriBlockMerklePath(it.merklePath) tx.transactionMeta.setState(TransactionMeta.MetaState.CONFIRMED) } From a71753b04c583dc82dc8b1b1bfc3b5582f519267 Mon Sep 17 00:00:00 2001 From: PCasafont Date: Wed, 16 Dec 2020 01:32:49 +0100 Subject: [PATCH 2/2] Merge updates --- .../grpc/utilities/extensions/ByteString.kt | 3 ++ .../main/java/org/veriblock/spv/SpvContext.kt | 12 +++--- .../java/org/veriblock/spv/net/P2PService.kt | 6 +-- .../org/veriblock/spv/net/SpvPeerTable.kt | 14 +------ .../org/veriblock/spv/service/SpvService.kt | 5 +-- .../task/PendingTransactionsUpdateTask.kt | 37 +++++++++++++------ .../{TxManager.kt => TransactionManager.kt} | 11 +++++- .../admin/service/impl/AdminApiServiceTest.kt | 6 +-- .../veriblock/spv/net/impl/P2PServiceTest.kt | 6 +-- 9 files changed, 54 insertions(+), 46 deletions(-) rename nodecore-spv/src/main/java/org/veriblock/spv/service/tx/{TxManager.kt => TransactionManager.kt} (97%) diff --git a/nodecore-grpc/src/main/java/nodecore/api/grpc/utilities/extensions/ByteString.kt b/nodecore-grpc/src/main/java/nodecore/api/grpc/utilities/extensions/ByteString.kt index fb79263b0..ec811627f 100644 --- a/nodecore-grpc/src/main/java/nodecore/api/grpc/utilities/extensions/ByteString.kt +++ b/nodecore-grpc/src/main/java/nodecore/api/grpc/utilities/extensions/ByteString.kt @@ -1,9 +1,11 @@ package nodecore.api.grpc.utilities.extensions import com.google.protobuf.ByteString +import org.veriblock.core.crypto.AnyVbkHash import org.veriblock.core.crypto.PreviousBlockVbkHash import org.veriblock.core.crypto.PreviousKeystoneVbkHash import org.veriblock.core.crypto.VbkHash +import org.veriblock.core.crypto.asAnyVbkHash import org.veriblock.core.crypto.asVbkHash import org.veriblock.core.crypto.asVbkPreviousBlockHash import org.veriblock.core.crypto.asVbkPreviousKeystoneHash @@ -46,3 +48,4 @@ fun ByteArray.toByteString(): ByteString = fun ByteString.asVbkHash(): VbkHash = toByteArray().asVbkHash() fun ByteString.asVbkPreviousBlockHash(): PreviousBlockVbkHash = toByteArray().asVbkPreviousBlockHash() fun ByteString.asVbkPreviousKeystoneHash(): PreviousKeystoneVbkHash = toByteArray().asVbkPreviousKeystoneHash() +fun ByteString.asAnyVbkHash(): AnyVbkHash = toByteArray().asAnyVbkHash() diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/SpvContext.kt b/nodecore-spv/src/main/java/org/veriblock/spv/SpvContext.kt index 50e3218be..bc4882959 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/SpvContext.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/SpvContext.kt @@ -21,7 +21,7 @@ import org.veriblock.spv.model.LedgerValue import org.veriblock.spv.model.TransactionPool import org.veriblock.spv.net.* import org.veriblock.spv.service.* -import org.veriblock.spv.service.tx.TxManager +import org.veriblock.spv.service.tx.TransactionManager import org.veriblock.spv.util.AddressStateChangeEvent import org.veriblock.spv.util.SpvEventBus.addressStateUpdatedEvent import org.veriblock.spv.wallet.PendingTransactionDownloadedListener @@ -52,7 +52,7 @@ class SpvContext( val p2PService: P2PService val addressManager: AddressManager val transactionService: TransactionService - val pendingTransactionContainer: TxManager + val transactionManager: TransactionManager val pendingTransactionDownloadedListener: PendingTransactionDownloadedListener private val addressState: ConcurrentHashMap = ConcurrentHashMap() @@ -101,17 +101,17 @@ class SpvContext( blockStore = BlockStore(networkParameters, directory) transactionPool = TransactionPool() blockchain = Blockchain(blockStore) - pendingTransactionContainer = TxManager(blockchain) - p2PService = P2PService(pendingTransactionContainer, networkParameters) + transactionManager = TransactionManager(blockchain) + p2PService = P2PService(transactionManager, networkParameters) addressManager = AddressManager() val walletFile = File(directory, filePrefix + FILE_EXTENSION) addressManager.load(walletFile) pendingTransactionDownloadedListener = PendingTransactionDownloadedListener(this) - peerTable = SpvPeerTable(this, p2PService, peerDiscovery, pendingTransactionContainer) + peerTable = SpvPeerTable(this, p2PService, peerDiscovery) transactionService = TransactionService(addressManager, networkParameters) spvService = SpvService( this, peerTable, transactionService, addressManager, - pendingTransactionContainer, blockchain + transactionManager, blockchain ) Runtime.getRuntime().addShutdownHook(Thread({ shutdown() }, "ShutdownHook nodecore-spv")) diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/net/P2PService.kt b/nodecore-spv/src/main/java/org/veriblock/spv/net/P2PService.kt index e7902ec0e..806686480 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/net/P2PService.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/net/P2PService.kt @@ -3,18 +3,16 @@ package org.veriblock.spv.net import com.google.protobuf.ByteString import nodecore.api.grpc.VeriBlockMessages import org.veriblock.core.utilities.createLogger -import org.veriblock.core.crypto.Sha256Hash import org.veriblock.core.crypto.VbkTxId import org.veriblock.core.params.NetworkParameters import org.veriblock.spv.model.TransactionTypeIdentifier -import org.veriblock.spv.service.PendingTransactionContainer -import org.veriblock.spv.service.tx.TxManager +import org.veriblock.spv.service.tx.TransactionManager import org.veriblock.spv.util.nextMessageId private val logger = createLogger {} class P2PService( - private val pendingTransactionContainer: TxManager, + private val pendingTransactionContainer: TransactionManager, private val networkParameters: NetworkParameters ) { fun onTransactionRequest(txIds: List, sender: SpvPeer) { diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/net/SpvPeerTable.kt b/nodecore-spv/src/main/java/org/veriblock/spv/net/SpvPeerTable.kt index 235c363db..2b5c302aa 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/net/SpvPeerTable.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/net/SpvPeerTable.kt @@ -19,23 +19,17 @@ import nodecore.api.grpc.VeriBlockMessages import nodecore.api.grpc.VeriBlockMessages.Event.ResultsCase import nodecore.api.grpc.VeriBlockMessages.TransactionAnnounce import nodecore.api.grpc.utilities.ByteStringUtility -import nodecore.api.grpc.utilities.extensions.toByteString -import nodecore.api.grpc.utilities.extensions.toHex import org.veriblock.core.crypto.BloomFilter -import org.veriblock.core.crypto.Sha256Hash -import org.veriblock.core.crypto.asAnyVbkHash -import org.veriblock.core.crypto.asVbkHash import org.veriblock.core.utilities.createLogger import org.veriblock.core.crypto.asVbkTxId import org.veriblock.core.utilities.debugError -import org.veriblock.core.utilities.debugWarn import org.veriblock.sdk.models.VeriBlockBlock import org.veriblock.spv.SpvContext import org.veriblock.spv.model.* import org.veriblock.spv.serialization.MessageSerializer import org.veriblock.spv.serialization.MessageSerializer.deserializeNormalTransaction import org.veriblock.spv.service.* -import org.veriblock.spv.service.tx.TxManager +import org.veriblock.spv.service.tx.TransactionManager import org.veriblock.spv.util.SpvEventBus import org.veriblock.spv.util.Threading import org.veriblock.spv.util.Threading.PEER_TABLE_DISPATCHER @@ -62,8 +56,7 @@ const val AMOUNT_OF_BLOCKS_WHEN_WE_CAN_START_WORKING = 4//50 class SpvPeerTable( private val spvContext: SpvContext, private val p2pService: P2PService, - peerDiscovery: PeerDiscovery, - val txMgr: TxManager + peerDiscovery: PeerDiscovery ) { private val lock = ReentrantLock() private val running = AtomicBoolean(false) @@ -96,9 +89,6 @@ class SpvPeerTable( fun start() { running.set(true) - SpvEventBus.newBestBlockEvent.register(this) { - txMgr.onVbkBestBlock(it) - } SpvEventBus.peerConnectedEvent.register(this, ::onPeerConnected) SpvEventBus.peerDisconnectedEvent.register(this, ::onPeerDisconnected) SpvEventBus.messageReceivedEvent.register(this) { diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/service/SpvService.kt b/nodecore-spv/src/main/java/org/veriblock/spv/service/SpvService.kt index 14f8ae875..354db3d98 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/service/SpvService.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/service/SpvService.kt @@ -26,7 +26,6 @@ import org.veriblock.core.utilities.extensions.toHex import org.veriblock.core.wallet.AddressManager import org.veriblock.core.wallet.AddressPubKey import org.veriblock.sdk.models.Address -import org.veriblock.sdk.models.Coin import org.veriblock.sdk.models.asCoin import org.veriblock.sdk.services.SerializeDeserializeService import org.veriblock.spv.SpvContext @@ -42,7 +41,7 @@ import org.veriblock.spv.model.Transaction import org.veriblock.spv.model.asLightAddress import org.veriblock.spv.net.SpvPeerTable import org.veriblock.spv.service.TransactionService.Companion.predictAltChainEndorsementTransactionSize -import org.veriblock.spv.service.tx.TxManager +import org.veriblock.spv.service.tx.TransactionManager import org.veriblock.spv.service.tx.TxStatusChangedEvent import org.veriblock.spv.util.buildMessage import java.io.File @@ -56,7 +55,7 @@ class SpvService( private val peerTable: SpvPeerTable, private val transactionService: TransactionService, private val addressManager: AddressManager, - private val pendingTransactionContainer: TxManager, + private val pendingTransactionContainer: TransactionManager, private val blockchain: Blockchain ) { fun getStateInfo(): StateInfo { diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/service/task/PendingTransactionsUpdateTask.kt b/nodecore-spv/src/main/java/org/veriblock/spv/service/task/PendingTransactionsUpdateTask.kt index 7c0016055..b2cbdd846 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/service/task/PendingTransactionsUpdateTask.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/service/task/PendingTransactionsUpdateTask.kt @@ -1,7 +1,9 @@ package org.veriblock.spv.net - +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.map import nodecore.api.grpc.VeriBlockMessages +import nodecore.api.grpc.utilities.extensions.asAnyVbkHash import nodecore.api.grpc.utilities.extensions.toByteString import nodecore.api.grpc.utilities.extensions.toHex import org.veriblock.core.crypto.asVbkTxId @@ -29,7 +31,7 @@ fun SpvContext.startPendingTransactionsUpdateTask() { } suspend fun SpvContext.requestPendingTransactions() { - val pendingTransactionIds = pendingTransactionContainer.getPendingTransactionIds() + val pendingTransactionIds = transactionManager.getPendingTransactionIds() try { for (txId in pendingTransactionIds) { val request = buildMessage { @@ -37,15 +39,28 @@ suspend fun SpvContext.requestPendingTransactions() { .setId(txId.bytes.toByteString()) .build() } - val response = peerTable.requestMessage(request) - if (response.transactionReply.success) { - pendingTransactionContainer.updateTransactionInfo(response.transactionReply.transaction.toModel()) - } else { - val transaction = pendingTransactionContainer.getTransaction(txId) - if (transaction != null) { - peerTable.advertise(transaction) + peerTable.requestAllMessages(request) + .map { it.transactionReply } + .collect { + if (it.success) { + // tx exists + val info = it.transaction.toModel() + logger.info { "Tx=${txId} found: VBK:${info.blockNumber}:${info.blockHash}"} + + transactionManager.onTransactionInfo(info) + } else { + // remote peer does not know about this tx. + logger.info { "Tx=${txId} NOT found"} + + val transaction = transactionManager.getTransaction(txId) + if (transaction != null) { + // rebroadcast it + peerTable.advertise(transaction) + } + + transactionManager.onMissingTransactionInfo(txId) + } } - } } } catch (e: Exception) { logger.debugWarn(e) { "Unable to request pending transactions" } @@ -61,7 +76,7 @@ private fun VeriBlockMessages.TransactionInfo.toModel() = TransactionInfo( bitcoinBlockHash = bitcoinBlockHash.toHex(), bitcoinTxId = bitcoinTxId.toHex(), bitcoinConfirmations = bitcoinConfirmations, - blockHash = blockHash.toHex(), + blockHash = blockHash.asAnyVbkHash(), merklePath = merklePath ) diff --git a/nodecore-spv/src/main/java/org/veriblock/spv/service/tx/TxManager.kt b/nodecore-spv/src/main/java/org/veriblock/spv/service/tx/TransactionManager.kt similarity index 97% rename from nodecore-spv/src/main/java/org/veriblock/spv/service/tx/TxManager.kt rename to nodecore-spv/src/main/java/org/veriblock/spv/service/tx/TransactionManager.kt index 371434e1f..8d427681f 100644 --- a/nodecore-spv/src/main/java/org/veriblock/spv/service/tx/TxManager.kt +++ b/nodecore-spv/src/main/java/org/veriblock/spv/service/tx/TransactionManager.kt @@ -14,6 +14,7 @@ import org.veriblock.spv.model.Transaction import org.veriblock.spv.service.Blockchain import org.veriblock.spv.service.TransactionInfo import org.veriblock.spv.service.tx.TxStatusChangedEvent.* +import org.veriblock.spv.util.SpvEventBus import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock @@ -60,7 +61,7 @@ private data class TxIdMonitorEntry( private const val TEN_MINUTES = 10 * 60 * 1000 /* ms */ -class TxManager( +class TransactionManager( val blockchain: Blockchain ) { private val lock = ReentrantLock() @@ -68,7 +69,13 @@ class TxManager( private val monitoredTxIds = ConcurrentHashMap() private val txesByAddress = ConcurrentHashMap>() - fun getPendingTxIds(): List = monitoredTxIds.keys().toList() + init { + SpvEventBus.newBestBlockEvent.register(this) { + onVbkBestBlock(it) + } + } + + fun getPendingTransactionIds(): List = monitoredTxIds.keys().toList() fun addTransaction(id: Sha256Hash, confirmations: Int = 1): Event { check(confirmations > 0) { diff --git a/nodecore-spv/src/test/java/org/veriblock/spv/admin/service/impl/AdminApiServiceTest.kt b/nodecore-spv/src/test/java/org/veriblock/spv/admin/service/impl/AdminApiServiceTest.kt index 66804c213..c30f6c03f 100644 --- a/nodecore-spv/src/test/java/org/veriblock/spv/admin/service/impl/AdminApiServiceTest.kt +++ b/nodecore-spv/src/test/java/org/veriblock/spv/admin/service/impl/AdminApiServiceTest.kt @@ -16,7 +16,6 @@ import org.veriblock.core.ImportException import org.veriblock.core.SendCoinsException import org.veriblock.core.WalletException import org.veriblock.core.WalletLockedException -import org.veriblock.core.crypto.EMPTY_BITCOIN_HASH import org.veriblock.core.crypto.EMPTY_VBK_TX import org.veriblock.core.params.defaultTestNetParameters import org.veriblock.core.types.Pair @@ -37,9 +36,8 @@ import org.veriblock.spv.model.asLightAddress import org.veriblock.spv.net.SpvPeerTable import org.veriblock.spv.service.SpvService import org.veriblock.spv.service.Blockchain -import org.veriblock.spv.service.PendingTransactionContainer import org.veriblock.spv.service.TransactionService -import org.veriblock.spv.service.tx.TxManager +import org.veriblock.spv.service.tx.TransactionManager import java.io.IOException import java.security.KeyPairGenerator @@ -49,7 +47,7 @@ class AdminApiServiceTest { private lateinit var addressManager: AddressManager private lateinit var peerTable: SpvPeerTable private lateinit var spvService: SpvService - private lateinit var transactionContainer: TxManager + private lateinit var transactionContainer: TransactionManager private lateinit var blockchain: Blockchain val address = "VHoWCZrQB4kqLHm1EoNoU8rih7ohyG" val block = defaultTestNetParameters.genesisBlock diff --git a/nodecore-spv/src/test/java/org/veriblock/spv/net/impl/P2PServiceTest.kt b/nodecore-spv/src/test/java/org/veriblock/spv/net/impl/P2PServiceTest.kt index a62edbcab..ff53f0d2d 100644 --- a/nodecore-spv/src/test/java/org/veriblock/spv/net/impl/P2PServiceTest.kt +++ b/nodecore-spv/src/test/java/org/veriblock/spv/net/impl/P2PServiceTest.kt @@ -7,7 +7,6 @@ import nodecore.api.grpc.VeriBlockMessages import org.junit.Before import org.junit.Test import org.veriblock.core.Context -import org.veriblock.core.crypto.EMPTY_BITCOIN_HASH import org.veriblock.core.crypto.EMPTY_VBK_TX import org.veriblock.core.params.defaultTestNetParameters import org.veriblock.sdk.models.asCoin @@ -18,12 +17,11 @@ import org.veriblock.spv.model.StandardTransaction import org.veriblock.spv.model.asStandardAddress import org.veriblock.spv.net.P2PService import org.veriblock.spv.net.SpvPeer -import org.veriblock.spv.service.PendingTransactionContainer -import org.veriblock.spv.service.tx.TxManager +import org.veriblock.spv.service.tx.TransactionManager class P2PServiceTest { private lateinit var spvContext: SpvContext - private lateinit var pendingTransactionContainer: TxManager + private lateinit var pendingTransactionContainer: TransactionManager private lateinit var peer: SpvPeer private lateinit var p2PService: P2PService