From bfc508c78805297347c35fc25527b51c3afbb004 Mon Sep 17 00:00:00 2001 From: Rakni1988 Date: Sat, 15 Mar 2025 18:01:41 +0100 Subject: [PATCH] fix: optimize pool refresh logic & prevent redundant block validation --- yadacoin/core/block.py | 3 ++ yadacoin/core/consensus.py | 57 +++++++++++++++++++++----------- yadacoin/core/miningpool.py | 9 +++-- yadacoin/core/processingqueue.py | 3 +- 4 files changed, 46 insertions(+), 26 deletions(-) diff --git a/yadacoin/core/block.py b/yadacoin/core/block.py index 9794e58c..6640fcda 100644 --- a/yadacoin/core/block.py +++ b/yadacoin/core/block.py @@ -125,6 +125,7 @@ class Block(object): "target", "special_target", "header", + "is_verified", ) @classmethod @@ -144,6 +145,7 @@ async def init_async( header="", target: int = 0, special_target: int = 0, + is_verified: bool = False ): self = cls() self.config = Config() @@ -182,6 +184,7 @@ async def init_async( transaction = Transaction.ensure_instance(txn) transaction.coinbase = Block.is_coinbase(self, transaction) self.transactions.append(transaction) + self.is_verified = is_verified return self diff --git a/yadacoin/core/consensus.py b/yadacoin/core/consensus.py index ecf48ce2..f2875bd6 100644 --- a/yadacoin/core/consensus.py +++ b/yadacoin/core/consensus.py @@ -112,7 +112,19 @@ async def process_block_queue_item(self, item): self.config.processing_queues.block_queue.inc_num_items_processed() stream = item.stream body = item.body + source = item.source + if body: + stream_source = body.get("method", "unknown") + else: + stream_source = source + + if stream is not None: + stream.source = stream_source + + self.app_log.info(f"Processing block from source: {stream_source}") + + if body and "method" in body: if body["method"] == "blockresponse": payload = body.get("result", {}) block = payload.get("block") @@ -145,7 +157,6 @@ async def process_block_queue_item(self, item): return block = await Block.from_dict(block) - stream.peer.block = block if block.time > time(): @@ -178,6 +189,7 @@ async def process_block_queue_item(self, item): return self.config.processing_queues.block_queue.time_sum_start() + if isinstance(item.blockchain.init_blocks, list): first_block = await Block.from_dict(item.blockchain.first_block) else: @@ -208,9 +220,10 @@ async def process_block_queue_item(self, item): if count < 1: return elif count == 1: - await self.integrate_block_with_existing_chain(first_block, stream) + await self.integrate_block_with_existing_chain(first_block, stream, source=stream_source) else: - await self.integrate_blocks_with_existing_chain(item.blockchain, stream) + await self.integrate_blocks_with_existing_chain(item.blockchain, stream, source=stream_source) + async def remove_pending_transactions_now_in_chain(self, block): # remove transactions from miner_transactions collection in the blockchain @@ -453,7 +466,7 @@ async def build_backward_from_block_to_fork( backward_blocks.append(retrace_consensus_block) return backward_blocks, status - async def integrate_block_with_existing_chain(self, block: Block, stream): + async def integrate_block_with_existing_chain(self, block: Block, stream, source="unknown"): self.app_log.debug("integrate_block_with_existing_chain") backward_blocks, status = await self.build_backward_from_block_to_fork( block, [], stream @@ -478,9 +491,9 @@ async def integrate_block_with_existing_chain(self, block: Block, stream): ) return False - await self.integrate_blocks_with_existing_chain(inbound_blockchain, stream) + await self.integrate_blocks_with_existing_chain(inbound_blockchain, stream, source) - async def integrate_blocks_with_existing_chain(self, blockchain, stream): + async def integrate_blocks_with_existing_chain(self, blockchain, stream, source="unknown"): self.app_log.debug("integrate_blocks_with_existing_chain") extra_blocks = [x async for x in blockchain.blocks] @@ -489,6 +502,7 @@ async def integrate_blocks_with_existing_chain(self, blockchain, stream): async for block in blockchain.blocks: if self.config.network == "regnet": break + if not await Blockchain.test_block( block, extra_blocks=extra_blocks, simulate_last_block=prev_block ): @@ -498,6 +512,8 @@ async def integrate_blocks_with_existing_chain(self, blockchain, stream): break else: return + + block.is_verified = True prev_block = block i += 1 @@ -525,25 +541,26 @@ async def integrate_blocks_with_existing_chain(self, blockchain, stream): return async for block in blockchain.blocks: - if ( - not await Blockchain.test_block(block) - and self.config.network == "mainnet" - ): - return - await self.insert_block(block, stream) + if not getattr(block, "is_verified", False) and self.config.network == "mainnet": + if not await Blockchain.test_block(block): + return + + await self.insert_block(block, stream, source) if stream: stream.syncing = False - async def insert_block(self, block, stream): + async def insert_block(self, block, stream, source="unknown"): self.app_log.debug("insert_block") + + self.app_log.info(f"Inserting block {block.index} from {source}") + try: await self.mongo.async_db.blocks.delete_many( {"index": {"$gte": block.index}} ) db_block = block.to_dict() - db_block["updated_at"] = time() await self.mongo.async_db.blocks.replace_one( @@ -556,21 +573,21 @@ async def insert_block(self, block, stream): await self.config.LatestBlock.update_latest_block() - self.app_log.info("New block inserted for height: {}".format(block.index)) + self.app_log.info(f"New block inserted for height: {block.index}") - if self.config.mp: - if self.syncing or (hasattr(stream, "syncing") and stream.syncing): - return True + if self.config.mp and source in ["newblock", "miningpool"]: + self.app_log.info("Updating miners (newblock detected).") try: await self.config.mp.refresh() except Exception: - self.app_log.warning("{}".format(format_exc())) + self.app_log.warning("Failed to refresh miners: {}".format(format_exc())) try: await StratumServer.block_checker() except Exception: - self.app_log.warning("{}".format(format_exc())) + self.app_log.warning("Block checker failed: {}".format(format_exc())) return True + except Exception: self.app_log.warning("{}".format(format_exc())) diff --git a/yadacoin/core/miningpool.py b/yadacoin/core/miningpool.py index 538d9c75..8af4fd06 100644 --- a/yadacoin/core/miningpool.py +++ b/yadacoin/core/miningpool.py @@ -22,6 +22,7 @@ from yadacoin.core.chain import CHAIN from yadacoin.core.config import Config from yadacoin.core.job import Job +from yadacoin.core.miner import Miner from yadacoin.core.peer import Peer from yadacoin.core.processingqueue import BlockProcessingQueueItem from yadacoin.core.transaction import Transaction @@ -104,7 +105,7 @@ async def process_nonce(self, miner, nonce, job, body): self.config.app_log.debug(f"Nonce for job {job.index}: {nonce}") hash1 = self.block_factory.generate_hash_from_header(job.index, header, nonce) - self.config.app_log.info(f"Hash1 for job {job.index}: {hash1}") + #self.config.app_log.info(f"Hash1 for job {job.index}: {hash1}") if self.block_factory.index >= CHAIN.BLOCK_V5_FORK: hash1_test = Blockchain.little_hash(hash1) @@ -288,7 +289,7 @@ async def refresh(self): # TODO: to be taken care of, no refresh atm between blocks try: if self.refreshing or not await Peer.is_synced(): - return + self.app_log.warning("Pool not fully synced, but refreshing anyway!") self.refreshing = True await self.config.LatestBlock.block_checker() if self.block_factory: @@ -658,12 +659,10 @@ async def accept_block(self, block): await self.config.consensus.insert_consensus_block(block, self.config.peer) self.config.processing_queues.block_queue.add( - BlockProcessingQueueItem(Blockchain(block.to_dict())) + BlockProcessingQueueItem(Blockchain(block.to_dict()), source="miningpool") ) if self.config.network != "regnet": await self.config.nodeShared.send_block_to_peers(block) await self.config.websocketServer.send_block(block) - - await self.refresh() diff --git a/yadacoin/core/processingqueue.py b/yadacoin/core/processingqueue.py index 01b472fc..e5596b89 100644 --- a/yadacoin/core/processingqueue.py +++ b/yadacoin/core/processingqueue.py @@ -47,10 +47,11 @@ def to_status_dict(self): class BlockProcessingQueueItem: - def __init__(self, blockchain: Blockchain, stream=None, body=None): + def __init__(self, blockchain: Blockchain, stream=None, body=None, source="unknown"): self.blockchain = blockchain self.body = body or {} self.stream = stream + self.source = source class BlockProcessingQueue(ProcessingQueue):