From 8301e97ff100defd97377e679a170f8fdc42b77a Mon Sep 17 00:00:00 2001 From: mccrindlebrian Date: Thu, 13 Mar 2025 18:12:52 -0500 Subject: [PATCH 1/3] add async timing logs --- neurons/validator.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 69941b69..52d2274f 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -18,7 +18,7 @@ import pandas as pd from async_timeout import timeout import tenacity - +from collections import defaultdict import folding.utils.constants as c from folding.base.reward import BatchRewardInput @@ -59,16 +59,6 @@ def __init__(self, config=None): # If we do not have any miner registry saved to the machine, create. if not hasattr(self, "miner_registry"): self.miner_registry = MinerRegistry(miner_uids=self.all_miner_uids) - else: - REFERENCE_BLOCK = 5055585 + 7200 - if REFERENCE_BLOCK > self.block: - registry_path = os.path.join( - self.config.neuron.full_path, "miner_registry.pkl" - ) - # Now, remove the miner_registry file from local disk - logger.info(f"Removing old miner registry at {registry_path}") - os.remove(registry_path) - self.miner_registry = MinerRegistry(miner_uids=self.all_miner_uids) # Init sync with the network. Updates the metagraph. self.sync() @@ -77,6 +67,8 @@ def __init__(self, config=None): self.wandb_run_start = None self.RSYNC_EXCEPTION_COUNT = 0 + self.ASYNC_TIMINGS = defaultdict(list) + self.validator_hotkey_reference = self.wallet.hotkey.ss58_address[:8] # The last time that we checked the global job pool. @@ -504,6 +496,9 @@ async def create_synthetic_jobs(self): except Exception as e: logger.error(f"Error in create_jobs: {traceback.format_exc()}") + self.ASYNC_TIMINGS["create_synthetic_jobs"].append( + datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + ) await asyncio.sleep(self.config.neuron.synthetic_job_interval) async def update_jobs(self): @@ -547,6 +542,9 @@ async def update_jobs(self): logger.error(f"Error in update_jobs: {traceback.format_exc()}") self.step += 1 + self.ASYNC_TIMINGS["update_jobs"].append( + datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + ) logger.info( f"Sleeping {self.config.neuron.update_interval} seconds before next job update loop." @@ -618,6 +616,9 @@ async def reward_loop(self): while True: try: await asyncio.sleep(60) + self.ASYNC_TIMINGS["reward_loop"].append( + datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + ) await self.read_and_update_rewards() except Exception as e: logger.error(f"Error in reward_loop: {traceback.format_exc()}") @@ -629,6 +630,9 @@ async def sync_loop(self): try: await asyncio.sleep(self.config.neuron.epoch_length * seconds_per_block) self.sync() + self.ASYNC_TIMINGS["sync_loop"].append( + datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + ) except Exception as e: logger.error(f"Error in sync_loop: {traceback.format_exc()}") @@ -641,6 +645,9 @@ async def monitor_db(self): await asyncio.sleep(300) try: outdated = await self.store.monitor_db() + self.ASYNC_TIMINGS["monitor_db"].append( + datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + ) except Exception as e: logger.error(f"Error in monitor_db: {traceback.format_exc()}") await self.start_rqlite() From 31fd37906628965fe53510fa4eaabc4b4b7c6a79 Mon Sep 17 00:00:00 2001 From: mccrindlebrian Date: Thu, 13 Mar 2025 18:15:31 -0500 Subject: [PATCH 2/3] save async timings every 60 seconds --- neurons/validator.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/neurons/validator.py b/neurons/validator.py index 52d2274f..78f7c1ba 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -28,6 +28,7 @@ # import base validator class which takes care of most of the boilerplate from folding.store import Job, SQLiteJobStore +from folding.utils.ops import write_pkl from folding.utils.logger import logger from folding.utils.logging import log_event from folding.utils.uids import get_all_miner_uids @@ -693,6 +694,7 @@ async def __aenter__(self): self.loop.create_task(self._organic_scoring.start_loop()) self.loop.create_task(self.start_organic_api()) self.loop.create_task(self.monitor_validator()) + self.loop.create_task(self.save_async_timings()) self.is_running = True logger.debug("Starting validator in background thread.") return self @@ -718,6 +720,17 @@ async def __aexit__(self, exc_type, exc_value, traceback): self.loop.stop() logger.debug("Stopped") + async def save_async_timings(self): + while True: + await asyncio.sleep(60) + try: + write_pkl( + self.ASYNC_TIMINGS, + os.path.join(self.config.neuron.full_path, "async_timings.pkl"), + ) + except Exception: + logger.error(f"Error in save_async_timings: {traceback.format_exc()}") + async def main(): async with Validator() as v: From 21a3a2818ab83c6c7cb10583eecda526aec28634 Mon Sep 17 00:00:00 2001 From: mccrindlebrian Date: Thu, 13 Mar 2025 18:20:19 -0500 Subject: [PATCH 3/3] re-arrange where timing is logged --- neurons/validator.py | 51 ++++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 78f7c1ba..a16d711c 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -463,6 +463,10 @@ async def create_synthetic_jobs(self): """ while True: + self.ASYNC_TIMINGS["create_synthetic_jobs"].append( + datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + ) + try: logger.info("Starting job creation loop.") queue = self.store.get_queue( @@ -494,12 +498,9 @@ async def create_synthetic_jobs(self): "Job queue is full. Sleeping 60 seconds before next job creation loop." ) - except Exception as e: + except Exception: logger.error(f"Error in create_jobs: {traceback.format_exc()}") - self.ASYNC_TIMINGS["create_synthetic_jobs"].append( - datetime.now().strftime("%Y-%m-%dT%H:%M:%S") - ) await asyncio.sleep(self.config.neuron.synthetic_job_interval) async def update_jobs(self): @@ -508,6 +509,10 @@ async def update_jobs(self): # Wait at the beginning of update_jobs since we want to avoid attemping to update jobs before we get data back. await asyncio.sleep(self.config.neuron.update_interval) + self.ASYNC_TIMINGS["update_jobs"].append( + datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + ) + logger.info("Updating jobs.") for job in self.store.get_queue( @@ -539,14 +544,10 @@ async def update_jobs(self): await self.update_job(job=job) logger.info(f"step({self.step}) block({self.block})") - except Exception as e: + except Exception: logger.error(f"Error in update_jobs: {traceback.format_exc()}") self.step += 1 - self.ASYNC_TIMINGS["update_jobs"].append( - datetime.now().strftime("%Y-%m-%dT%H:%M:%S") - ) - logger.info( f"Sleeping {self.config.neuron.update_interval} seconds before next job update loop." ) @@ -621,7 +622,7 @@ async def reward_loop(self): datetime.now().strftime("%Y-%m-%dT%H:%M:%S") ) await self.read_and_update_rewards() - except Exception as e: + except Exception: logger.error(f"Error in reward_loop: {traceback.format_exc()}") async def sync_loop(self): @@ -630,11 +631,11 @@ async def sync_loop(self): seconds_per_block = 12 try: await asyncio.sleep(self.config.neuron.epoch_length * seconds_per_block) - self.sync() self.ASYNC_TIMINGS["sync_loop"].append( datetime.now().strftime("%Y-%m-%dT%H:%M:%S") ) - except Exception as e: + self.sync() + except Exception: logger.error(f"Error in sync_loop: {traceback.format_exc()}") async def monitor_db(self): @@ -649,7 +650,7 @@ async def monitor_db(self): self.ASYNC_TIMINGS["monitor_db"].append( datetime.now().strftime("%Y-%m-%dT%H:%M:%S") ) - except Exception as e: + except Exception: logger.error(f"Error in monitor_db: {traceback.format_exc()}") await self.start_rqlite() @@ -658,7 +659,7 @@ async def monitor_db(self): await self.start_rqlite() else: logger.debug("Database is up-to-date.") - except Exception as e: + except Exception: logger.error(f"Error in monitor_db: {traceback.format_exc()}") async def monitor_validator(self): @@ -680,6 +681,17 @@ async def monitor_validator(self): ) self.should_exit = True + async def save_async_timings(self): + while True: + await asyncio.sleep(60) + try: + write_pkl( + self.ASYNC_TIMINGS, + os.path.join(self.config.neuron.full_path, "async_timings.pkl"), + ) + except Exception: + logger.error(f"Error in save_async_timings: {traceback.format_exc()}") + async def __aenter__(self): await self.start_rqlite() await asyncio.sleep(10) # Wait for rqlite to start @@ -720,17 +732,6 @@ async def __aexit__(self, exc_type, exc_value, traceback): self.loop.stop() logger.debug("Stopped") - async def save_async_timings(self): - while True: - await asyncio.sleep(60) - try: - write_pkl( - self.ASYNC_TIMINGS, - os.path.join(self.config.neuron.full_path, "async_timings.pkl"), - ) - except Exception: - logger.error(f"Error in save_async_timings: {traceback.format_exc()}") - async def main(): async with Validator() as v: