Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions yadacoin/core/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class Block(object):
"target",
"special_target",
"header",
"is_verified",
)

@classmethod
Expand All @@ -144,6 +145,7 @@ async def init_async(
header="",
target: int = 0,
special_target: int = 0,
is_verified: bool = False
):
self = cls()
self.config = Config()
Expand Down Expand Up @@ -182,6 +184,7 @@ async def init_async(
transaction = Transaction.ensure_instance(txn)
transaction.coinbase = Block.is_coinbase(self, transaction)
self.transactions.append(transaction)
self.is_verified = is_verified

return self

Expand Down
57 changes: 37 additions & 20 deletions yadacoin/core/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,19 @@ async def process_block_queue_item(self, item):
self.config.processing_queues.block_queue.inc_num_items_processed()
stream = item.stream
body = item.body
source = item.source

if body:
stream_source = body.get("method", "unknown")
else:
stream_source = source

if stream is not None:
stream.source = stream_source

self.app_log.info(f"Processing block from source: {stream_source}")

if body and "method" in body:
if body["method"] == "blockresponse":
payload = body.get("result", {})
block = payload.get("block")
Expand Down Expand Up @@ -145,7 +157,6 @@ async def process_block_queue_item(self, item):
return

block = await Block.from_dict(block)

stream.peer.block = block

if block.time > time():
Expand Down Expand Up @@ -178,6 +189,7 @@ async def process_block_queue_item(self, item):
return

self.config.processing_queues.block_queue.time_sum_start()

if isinstance(item.blockchain.init_blocks, list):
first_block = await Block.from_dict(item.blockchain.first_block)
else:
Expand Down Expand Up @@ -208,9 +220,10 @@ async def process_block_queue_item(self, item):
if count < 1:
return
elif count == 1:
await self.integrate_block_with_existing_chain(first_block, stream)
await self.integrate_block_with_existing_chain(first_block, stream, source=stream_source)
else:
await self.integrate_blocks_with_existing_chain(item.blockchain, stream)
await self.integrate_blocks_with_existing_chain(item.blockchain, stream, source=stream_source)


async def remove_pending_transactions_now_in_chain(self, block):
# remove transactions from miner_transactions collection in the blockchain
Expand Down Expand Up @@ -453,7 +466,7 @@ async def build_backward_from_block_to_fork(
backward_blocks.append(retrace_consensus_block)
return backward_blocks, status

async def integrate_block_with_existing_chain(self, block: Block, stream):
async def integrate_block_with_existing_chain(self, block: Block, stream, source="unknown"):
self.app_log.debug("integrate_block_with_existing_chain")
backward_blocks, status = await self.build_backward_from_block_to_fork(
block, [], stream
Expand All @@ -478,9 +491,9 @@ async def integrate_block_with_existing_chain(self, block: Block, stream):
)
return False

await self.integrate_blocks_with_existing_chain(inbound_blockchain, stream)
await self.integrate_blocks_with_existing_chain(inbound_blockchain, stream, source)

async def integrate_blocks_with_existing_chain(self, blockchain, stream):
async def integrate_blocks_with_existing_chain(self, blockchain, stream, source="unknown"):
self.app_log.debug("integrate_blocks_with_existing_chain")

extra_blocks = [x async for x in blockchain.blocks]
Expand All @@ -489,6 +502,7 @@ async def integrate_blocks_with_existing_chain(self, blockchain, stream):
async for block in blockchain.blocks:
if self.config.network == "regnet":
break

if not await Blockchain.test_block(
block, extra_blocks=extra_blocks, simulate_last_block=prev_block
):
Expand All @@ -498,6 +512,8 @@ async def integrate_blocks_with_existing_chain(self, blockchain, stream):
break
else:
return

block.is_verified = True
prev_block = block
i += 1

Expand Down Expand Up @@ -525,25 +541,26 @@ async def integrate_blocks_with_existing_chain(self, blockchain, stream):
return

async for block in blockchain.blocks:
if (
not await Blockchain.test_block(block)
and self.config.network == "mainnet"
):
return
await self.insert_block(block, stream)
if not getattr(block, "is_verified", False) and self.config.network == "mainnet":
if not await Blockchain.test_block(block):
return

await self.insert_block(block, stream, source)

if stream:
stream.syncing = False

async def insert_block(self, block, stream):
async def insert_block(self, block, stream, source="unknown"):
self.app_log.debug("insert_block")

self.app_log.info(f"Inserting block {block.index} from {source}")

try:
await self.mongo.async_db.blocks.delete_many(
{"index": {"$gte": block.index}}
)

db_block = block.to_dict()

db_block["updated_at"] = time()

await self.mongo.async_db.blocks.replace_one(
Expand All @@ -556,21 +573,21 @@ async def insert_block(self, block, stream):

await self.config.LatestBlock.update_latest_block()

self.app_log.info("New block inserted for height: {}".format(block.index))
self.app_log.info(f"New block inserted for height: {block.index}")

if self.config.mp:
if self.syncing or (hasattr(stream, "syncing") and stream.syncing):
return True
if self.config.mp and source in ["newblock", "miningpool"]:
self.app_log.info("Updating miners (newblock detected).")
try:
await self.config.mp.refresh()
except Exception:
self.app_log.warning("{}".format(format_exc()))
self.app_log.warning("Failed to refresh miners: {}".format(format_exc()))

try:
await StratumServer.block_checker()
except Exception:
self.app_log.warning("{}".format(format_exc()))
self.app_log.warning("Block checker failed: {}".format(format_exc()))

return True

except Exception:
self.app_log.warning("{}".format(format_exc()))
9 changes: 4 additions & 5 deletions yadacoin/core/miningpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from yadacoin.core.chain import CHAIN
from yadacoin.core.config import Config
from yadacoin.core.job import Job
from yadacoin.core.miner import Miner
from yadacoin.core.peer import Peer
from yadacoin.core.processingqueue import BlockProcessingQueueItem
from yadacoin.core.transaction import Transaction
Expand Down Expand Up @@ -104,7 +105,7 @@ async def process_nonce(self, miner, nonce, job, body):
self.config.app_log.debug(f"Nonce for job {job.index}: {nonce}")

hash1 = self.block_factory.generate_hash_from_header(job.index, header, nonce)
self.config.app_log.info(f"Hash1 for job {job.index}: {hash1}")
#self.config.app_log.info(f"Hash1 for job {job.index}: {hash1}")

if self.block_factory.index >= CHAIN.BLOCK_V5_FORK:
hash1_test = Blockchain.little_hash(hash1)
Expand Down Expand Up @@ -288,7 +289,7 @@ async def refresh(self):
# TODO: to be taken care of, no refresh atm between blocks
try:
if self.refreshing or not await Peer.is_synced():
return
self.app_log.warning("Pool not fully synced, but refreshing anyway!")
self.refreshing = True
await self.config.LatestBlock.block_checker()
if self.block_factory:
Expand Down Expand Up @@ -658,12 +659,10 @@ async def accept_block(self, block):
await self.config.consensus.insert_consensus_block(block, self.config.peer)

self.config.processing_queues.block_queue.add(
BlockProcessingQueueItem(Blockchain(block.to_dict()))
BlockProcessingQueueItem(Blockchain(block.to_dict()), source="miningpool")
)

if self.config.network != "regnet":
await self.config.nodeShared.send_block_to_peers(block)

await self.config.websocketServer.send_block(block)

await self.refresh()
3 changes: 2 additions & 1 deletion yadacoin/core/processingqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ def to_status_dict(self):


class BlockProcessingQueueItem:
def __init__(self, blockchain: Blockchain, stream=None, body=None):
def __init__(self, blockchain: Blockchain, stream=None, body=None, source="unknown"):
self.blockchain = blockchain
self.body = body or {}
self.stream = stream
self.source = source


class BlockProcessingQueue(ProcessingQueue):
Expand Down