diff --git a/neurons/validator.py b/neurons/validator.py index 69941b69..a16d711c 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -18,7 +18,7 @@ import pandas as pd from async_timeout import timeout import tenacity - +from collections import defaultdict import folding.utils.constants as c from folding.base.reward import BatchRewardInput @@ -28,6 +28,7 @@ # import base validator class which takes care of most of the boilerplate from folding.store import Job, SQLiteJobStore +from folding.utils.ops import write_pkl from folding.utils.logger import logger from folding.utils.logging import log_event from folding.utils.uids import get_all_miner_uids @@ -59,16 +60,6 @@ def __init__(self, config=None): # If we do not have any miner registry saved to the machine, create. if not hasattr(self, "miner_registry"): self.miner_registry = MinerRegistry(miner_uids=self.all_miner_uids) - else: - REFERENCE_BLOCK = 5055585 + 7200 - if REFERENCE_BLOCK > self.block: - registry_path = os.path.join( - self.config.neuron.full_path, "miner_registry.pkl" - ) - # Now, remove the miner_registry file from local disk - logger.info(f"Removing old miner registry at {registry_path}") - os.remove(registry_path) - self.miner_registry = MinerRegistry(miner_uids=self.all_miner_uids) # Init sync with the network. Updates the metagraph. self.sync() @@ -77,6 +68,8 @@ def __init__(self, config=None): self.wandb_run_start = None self.RSYNC_EXCEPTION_COUNT = 0 + self.ASYNC_TIMINGS = defaultdict(list) + self.validator_hotkey_reference = self.wallet.hotkey.ss58_address[:8] # The last time that we checked the global job pool. @@ -470,6 +463,10 @@ async def create_synthetic_jobs(self): """ while True: + self.ASYNC_TIMINGS["create_synthetic_jobs"].append( + datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + ) + try: logger.info("Starting job creation loop.") queue = self.store.get_queue( @@ -501,7 +498,7 @@ async def create_synthetic_jobs(self): "Job queue is full. Sleeping 60 seconds before next job creation loop." ) - except Exception as e: + except Exception: logger.error(f"Error in create_jobs: {traceback.format_exc()}") await asyncio.sleep(self.config.neuron.synthetic_job_interval) @@ -512,6 +509,10 @@ async def update_jobs(self): # Wait at the beginning of update_jobs since we want to avoid attemping to update jobs before we get data back. await asyncio.sleep(self.config.neuron.update_interval) + self.ASYNC_TIMINGS["update_jobs"].append( + datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + ) + logger.info("Updating jobs.") for job in self.store.get_queue( @@ -543,11 +544,10 @@ async def update_jobs(self): await self.update_job(job=job) logger.info(f"step({self.step}) block({self.block})") - except Exception as e: + except Exception: logger.error(f"Error in update_jobs: {traceback.format_exc()}") self.step += 1 - logger.info( f"Sleeping {self.config.neuron.update_interval} seconds before next job update loop." ) @@ -618,8 +618,11 @@ async def reward_loop(self): while True: try: await asyncio.sleep(60) + self.ASYNC_TIMINGS["reward_loop"].append( + datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + ) await self.read_and_update_rewards() - except Exception as e: + except Exception: logger.error(f"Error in reward_loop: {traceback.format_exc()}") async def sync_loop(self): @@ -628,8 +631,11 @@ async def sync_loop(self): seconds_per_block = 12 try: await asyncio.sleep(self.config.neuron.epoch_length * seconds_per_block) + self.ASYNC_TIMINGS["sync_loop"].append( + datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + ) self.sync() - except Exception as e: + except Exception: logger.error(f"Error in sync_loop: {traceback.format_exc()}") async def monitor_db(self): @@ -641,7 +647,10 @@ async def monitor_db(self): await asyncio.sleep(300) try: outdated = await self.store.monitor_db() - except Exception as e: + self.ASYNC_TIMINGS["monitor_db"].append( + datetime.now().strftime("%Y-%m-%dT%H:%M:%S") + ) + except Exception: logger.error(f"Error in monitor_db: {traceback.format_exc()}") await self.start_rqlite() @@ -650,7 +659,7 @@ async def monitor_db(self): await self.start_rqlite() else: logger.debug("Database is up-to-date.") - except Exception as e: + except Exception: logger.error(f"Error in monitor_db: {traceback.format_exc()}") async def monitor_validator(self): @@ -672,6 +681,17 @@ async def monitor_validator(self): ) self.should_exit = True + async def save_async_timings(self): + while True: + await asyncio.sleep(60) + try: + write_pkl( + self.ASYNC_TIMINGS, + os.path.join(self.config.neuron.full_path, "async_timings.pkl"), + ) + except Exception: + logger.error(f"Error in save_async_timings: {traceback.format_exc()}") + async def __aenter__(self): await self.start_rqlite() await asyncio.sleep(10) # Wait for rqlite to start @@ -686,6 +706,7 @@ async def __aenter__(self): self.loop.create_task(self._organic_scoring.start_loop()) self.loop.create_task(self.start_organic_api()) self.loop.create_task(self.monitor_validator()) + self.loop.create_task(self.save_async_timings()) self.is_running = True logger.debug("Starting validator in background thread.") return self