Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 39 additions & 18 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import pandas as pd
from async_timeout import timeout
import tenacity

from collections import defaultdict

import folding.utils.constants as c
from folding.base.reward import BatchRewardInput
Expand All @@ -28,6 +28,7 @@

# import base validator class which takes care of most of the boilerplate
from folding.store import Job, SQLiteJobStore
from folding.utils.ops import write_pkl
from folding.utils.logger import logger
from folding.utils.logging import log_event
from folding.utils.uids import get_all_miner_uids
Expand Down Expand Up @@ -59,16 +60,6 @@ def __init__(self, config=None):
# If we do not have any miner registry saved to the machine, create.
if not hasattr(self, "miner_registry"):
self.miner_registry = MinerRegistry(miner_uids=self.all_miner_uids)
else:
REFERENCE_BLOCK = 5055585 + 7200
if REFERENCE_BLOCK > self.block:
registry_path = os.path.join(
self.config.neuron.full_path, "miner_registry.pkl"
)
# Now, remove the miner_registry file from local disk
logger.info(f"Removing old miner registry at {registry_path}")
os.remove(registry_path)
self.miner_registry = MinerRegistry(miner_uids=self.all_miner_uids)

# Init sync with the network. Updates the metagraph.
self.sync()
Expand All @@ -77,6 +68,8 @@ def __init__(self, config=None):
self.wandb_run_start = None
self.RSYNC_EXCEPTION_COUNT = 0

self.ASYNC_TIMINGS = defaultdict(list)

self.validator_hotkey_reference = self.wallet.hotkey.ss58_address[:8]

# The last time that we checked the global job pool.
Expand Down Expand Up @@ -470,6 +463,10 @@ async def create_synthetic_jobs(self):
"""

while True:
self.ASYNC_TIMINGS["create_synthetic_jobs"].append(
datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
)

try:
logger.info("Starting job creation loop.")
queue = self.store.get_queue(
Expand Down Expand Up @@ -501,7 +498,7 @@ async def create_synthetic_jobs(self):
"Job queue is full. Sleeping 60 seconds before next job creation loop."
)

except Exception as e:
except Exception:
logger.error(f"Error in create_jobs: {traceback.format_exc()}")

await asyncio.sleep(self.config.neuron.synthetic_job_interval)
Expand All @@ -512,6 +509,10 @@ async def update_jobs(self):
# Wait at the beginning of update_jobs since we want to avoid attemping to update jobs before we get data back.
await asyncio.sleep(self.config.neuron.update_interval)

self.ASYNC_TIMINGS["update_jobs"].append(
datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
)

logger.info("Updating jobs.")

for job in self.store.get_queue(
Expand Down Expand Up @@ -543,11 +544,10 @@ async def update_jobs(self):
await self.update_job(job=job)
logger.info(f"step({self.step}) block({self.block})")

except Exception as e:
except Exception:
logger.error(f"Error in update_jobs: {traceback.format_exc()}")

self.step += 1

logger.info(
f"Sleeping {self.config.neuron.update_interval} seconds before next job update loop."
)
Expand Down Expand Up @@ -618,8 +618,11 @@ async def reward_loop(self):
while True:
try:
await asyncio.sleep(60)
self.ASYNC_TIMINGS["reward_loop"].append(
datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
)
await self.read_and_update_rewards()
except Exception as e:
except Exception:
logger.error(f"Error in reward_loop: {traceback.format_exc()}")

async def sync_loop(self):
Expand All @@ -628,8 +631,11 @@ async def sync_loop(self):
seconds_per_block = 12
try:
await asyncio.sleep(self.config.neuron.epoch_length * seconds_per_block)
self.ASYNC_TIMINGS["sync_loop"].append(
datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
)
self.sync()
except Exception as e:
except Exception:
logger.error(f"Error in sync_loop: {traceback.format_exc()}")

async def monitor_db(self):
Expand All @@ -641,7 +647,10 @@ async def monitor_db(self):
await asyncio.sleep(300)
try:
outdated = await self.store.monitor_db()
except Exception as e:
self.ASYNC_TIMINGS["monitor_db"].append(
datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
)
except Exception:
logger.error(f"Error in monitor_db: {traceback.format_exc()}")
await self.start_rqlite()

Expand All @@ -650,7 +659,7 @@ async def monitor_db(self):
await self.start_rqlite()
else:
logger.debug("Database is up-to-date.")
except Exception as e:
except Exception:
logger.error(f"Error in monitor_db: {traceback.format_exc()}")

async def monitor_validator(self):
Expand All @@ -672,6 +681,17 @@ async def monitor_validator(self):
)
self.should_exit = True

async def save_async_timings(self):
while True:
await asyncio.sleep(60)
try:
write_pkl(
self.ASYNC_TIMINGS,
os.path.join(self.config.neuron.full_path, "async_timings.pkl"),
)
except Exception:
logger.error(f"Error in save_async_timings: {traceback.format_exc()}")

async def __aenter__(self):
await self.start_rqlite()
await asyncio.sleep(10) # Wait for rqlite to start
Expand All @@ -686,6 +706,7 @@ async def __aenter__(self):
self.loop.create_task(self._organic_scoring.start_loop())
self.loop.create_task(self.start_organic_api())
self.loop.create_task(self.monitor_validator())
self.loop.create_task(self.save_async_timings())
self.is_running = True
logger.debug("Starting validator in background thread.")
return self
Expand Down