From 53bbd7d16825fc610c3d768c7284f4e49d95aa81 Mon Sep 17 00:00:00 2001 From: Thykof Date: Wed, 4 Feb 2026 20:10:16 +0100 Subject: [PATCH 1/3] fix #210 scoring gaps typo --- synth/validator/crps_calculation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synth/validator/crps_calculation.py b/synth/validator/crps_calculation.py index 530ae17d..44f8bece 100644 --- a/synth/validator/crps_calculation.py +++ b/synth/validator/crps_calculation.py @@ -35,7 +35,7 @@ def calculate_crps_for_miner( for interval_name, interval_seconds in scoring_intervals.items(): interval_steps = get_interval_steps(interval_seconds, time_increment) absolute_price = interval_name.endswith("_abs") - is_gap = interval_name.endswith("_gap") + is_gap = interval_name.endswith("_gaps") # If we are considering absolute prices, adjust the interval steps for potential gaps: # if only the initial price is present, then decrease the interval step From c961391ef92c27ae8fa46c2a126909a933dff7cd Mon Sep 17 00:00:00 2001 From: Thykof Date: Mon, 9 Feb 2026 18:29:20 +0100 Subject: [PATCH 2/3] wip: score with v4 --- .../1b8a8038d5eb_add_prompt_score_v4.py | 33 ++++++ neurons/validator.py | 28 +++-- synth/db/models.py | 2 + synth/validator/forward.py | 2 + synth/validator/miner_data_handler.py | 24 ++-- synth/validator/reward.py | 111 ++---------------- 6 files changed, 76 insertions(+), 124 deletions(-) create mode 100644 alembic/versions/1b8a8038d5eb_add_prompt_score_v4.py diff --git a/alembic/versions/1b8a8038d5eb_add_prompt_score_v4.py b/alembic/versions/1b8a8038d5eb_add_prompt_score_v4.py new file mode 100644 index 00000000..494501d5 --- /dev/null +++ b/alembic/versions/1b8a8038d5eb_add_prompt_score_v4.py @@ -0,0 +1,33 @@ +"""add score v4 + +Revision ID: 1b8a8038d5eb +Revises: 26ab499a7e04 +Create Date: 2026-02-09 18:10:27.571647 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB + +# revision identifiers, used by Alembic. +revision: str = "1b8a8038d5eb" +down_revision: Union[str, None] = "26ab499a7e04" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "miner_scores", sa.Column("prompt_score_v4", sa.Float, nullable=True) + ) + op.add_column( + "miner_scores", sa.Column("score_details_v4", JSONB, nullable=True) + ) + + +def downgrade() -> None: + op.drop_column("miner_scores", "prompt_score_v4") + op.drop_column("miner_scores", "score_details_v4") diff --git a/neurons/validator.py b/neurons/validator.py index 6452bbc5..edea2d6a 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -187,22 +187,23 @@ def forward_score(self): # with predictions and calculate the rewards, # we store the rewards in the miner_scores table # ========================================== # - bt.logging.info( - f"forward score {LOW_FREQUENCY.label} frequency", "forward_score" - ) - current_time = get_current_time() - scored_time: datetime = round_time_to_minutes(current_time) - success_low = calculate_scores( - self.miner_data_handler, - self.price_data_provider, - scored_time, - LOW_FREQUENCY, - self.config.neuron.nprocs, - ) + # bt.logging.info( + # f"forward score {LOW_FREQUENCY.label} frequency", "forward_score" + # ) + # current_time = get_current_time() + # scored_time: datetime = round_time_to_minutes(current_time) + + # success_low = calculate_scores( + # self.miner_data_handler, + # self.price_data_provider, + # scored_time, + # LOW_FREQUENCY, + # self.config.neuron.nprocs, + # ) - scored_time: datetime = round_time_to_minutes(current_time) current_time = get_current_time() + scored_time: datetime = round_time_to_minutes(current_time) bt.logging.info( f"forward score {HIGH_FREQUENCY.label} frequency", "forward_score" ) @@ -213,6 +214,7 @@ def forward_score(self): HIGH_FREQUENCY, self.config.neuron.nprocs, ) + return self.cleanup_history() diff --git a/synth/db/models.py b/synth/db/models.py index 48cdb89b..40c6ee28 100644 --- a/synth/db/models.py +++ b/synth/db/models.py @@ -117,8 +117,10 @@ class MinerScore(Base): ) prompt_score = Column(Float, nullable=False) prompt_score_v3 = Column(Float, nullable=False) + prompt_score_v4 = Column(Float, nullable=False) score_details = Column(JSONB, nullable=False) score_details_v3 = Column(JSONB, nullable=False) + score_details_v4 = Column(JSONB, nullable=False) prediction = relationship("MinerPrediction", back_populates="scores") diff --git a/synth/validator/forward.py b/synth/validator/forward.py index b3ff9ca7..e1bc36f4 100644 --- a/synth/validator/forward.py +++ b/synth/validator/forward.py @@ -150,6 +150,7 @@ def calculate_scores( price_data_provider=price_data_provider, validator_request=validator_request, nprocs=nprocs, + prompt_score_version="v4", ) print_scores_df(prompt_scores, detailed_info) @@ -168,6 +169,7 @@ def calculate_scores( int(validator_request.id), detailed_info, miner_score_time, + "v4", ) # Success if at least one request succeed diff --git a/synth/validator/miner_data_handler.py b/synth/validator/miner_data_handler.py index 99c69649..dd8c3e1f 100644 --- a/synth/validator/miner_data_handler.py +++ b/synth/validator/miner_data_handler.py @@ -194,6 +194,7 @@ def set_miner_scores( validator_requests_id: int, reward_details: list[dict], scored_time: datetime, + prompt_score_version: str = "v3", ): try: with self.engine.connect() as connection: @@ -230,14 +231,18 @@ def set_miner_scores( "miner_predictions_id": row[ "miner_prediction_id" ], - "score_details_v3": { + f"score_details_{prompt_score_version}": { "total_crps": row["total_crps"], "percentile90": row["percentile90"], "lowest_score": row["lowest_score"], - "prompt_score_v3": row["prompt_score_v3"], + f"prompt_score_{prompt_score_version}": row[ + f"prompt_score_{prompt_score_version}" + ], "crps_data": row["crps_data"], }, - "prompt_score_v3": row["prompt_score_v3"], + f"prompt_score_{prompt_score_version}": row[ + f"prompt_score_{prompt_score_version}" + ], } ) insert_stmt_miner_scores = ( @@ -246,14 +251,18 @@ def set_miner_scores( .on_conflict_do_update( constraint="uq_miner_scores_miner_predictions_id", set_={ - "score_details_v3": { + f"score_details_{prompt_score_version}": { "total_crps": row["total_crps"], "percentile90": row["percentile90"], "lowest_score": row["lowest_score"], - "prompt_score_v3": row["prompt_score_v3"], + f"prompt_score_{prompt_score_version}": row[ + f"prompt_score_{prompt_score_version}" + ], "crps_data": row["crps_data"], }, - "prompt_score_v3": row["prompt_score_v3"], + f"prompt_score_{prompt_score_version}": row[ + f"prompt_score_{prompt_score_version}" + ], }, ) ) @@ -403,7 +412,8 @@ def get_validator_requests_to_score( and_( MinerPrediction.validator_requests_id == ValidatorRequest.id, - MinerScore.prompt_score_v3.isnot(None), + # MinerScore.prompt_score_v3.isnot(None), + MinerScore.prompt_score_v4.isnot(None), ) ) ) diff --git a/synth/validator/reward.py b/synth/validator/reward.py index f9808875..d6670259 100644 --- a/synth/validator/reward.py +++ b/synth/validator/reward.py @@ -12,7 +12,7 @@ # the Software. import typing -from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from concurrent.futures import ProcessPoolExecutor from multiprocessing import shared_memory import time @@ -215,12 +215,13 @@ def _build_detailed_info( miner_prediction_process_time: list, percentile90: float, lowest_score: float, + prompt_score_version: str = "v3", ) -> list[dict]: """Build detailed information dict from processing results.""" return [ { "miner_uid": pred.miner_uid, - "prompt_score_v3": float(prompt_score), + f"prompt_score_{prompt_score_version}": float(prompt_score), "percentile90": float(percentile90), "lowest_score": float(lowest_score), "miner_prediction_id": prediction_id, @@ -255,6 +256,7 @@ def get_rewards_multiprocess( price_data_provider: PriceDataProvider, validator_request: ValidatorRequest, nprocs: int = 2, + prompt_score_version: str = "v3", ) -> tuple[typing.Optional[np.ndarray], list, list[dict]]: """ Returns an array of rewards for the given query and responses. @@ -359,6 +361,7 @@ def get_rewards_multiprocess( miner_prediction_process_time, percentile90, lowest_score, + prompt_score_version, ) return prompt_scores, detailed_info, real_prices @@ -434,6 +437,7 @@ def get_rewards( miner_data_handler: MinerDataHandler, price_data_provider: PriceDataProvider, validator_request: ValidatorRequest, + prompt_score_version: str = "v3", ) -> tuple[typing.Optional[np.ndarray], list, list[dict]]: """ Returns an array of rewards for the given query and responses. @@ -491,108 +495,7 @@ def get_rewards( detailed_info = [ { "miner_uid": miner_uid, - "prompt_score_v3": float(prompt_score), - "percentile90": float(percentile90), - "lowest_score": float(lowest_score), - "miner_prediction_id": ( - miner_prediction.id if miner_prediction else None - ), - "format_validation": ( - miner_prediction.format_validation - if miner_prediction - else None - ), - "process_time": ( - miner_prediction.process_time if miner_prediction else None - ), - "total_crps": float(score), - "crps_data": clean_numpy_in_crps_data(crps_data), - } - for miner_uid, score, crps_data, prompt_score, miner_prediction in zip( - miner_uids, - scores, - detailed_crps_data_list, - prompt_scores, - miner_prediction_list, - ) - ] - - return prompt_scores, detailed_info, real_prices - - -@print_execution_time -def get_rewards_threading( - miner_data_handler: MinerDataHandler, - price_data_provider: PriceDataProvider, - validator_request: ValidatorRequest, -) -> tuple[typing.Optional[np.ndarray], list, list[dict]]: - """ - Returns an array of rewards for the given query and responses. - - Args: - - query (int): The query sent to the miner. - - responses (List[float]): A list of responses from the miner. - - Returns: - - np.ndarray: An array of rewards for the given query and responses. - """ - miner_uids = miner_data_handler.get_miner_uid_of_prediction_request( - int(validator_request.id) - ) - - if miner_uids is None: - return None, [], [] - - try: - real_prices = price_data_provider.fetch_data(validator_request) - except Exception as e: - bt.logging.warning( - f"Error fetching data for validator request " - f"{validator_request.id}: {e}" - ) - return None, [], [] - - # Submit ALL tasks first, THEN collect results - with ThreadPoolExecutor(max_workers=8) as executor: - futures = [ - executor.submit( - reward, - miner_data_handler.get_miner_prediction( - miner_uid, int(validator_request.id) - ), - miner_uid, - validator_request, - real_prices, - ) - for miner_uid in miner_uids - ] - - # Collect results in order - results = [f.result() for f in futures] - - scores = [] - detailed_crps_data_list = [] - miner_prediction_list = [] - - for score, detailed_crps_data, miner_prediction in results: - scores.append(score) - detailed_crps_data_list.append(detailed_crps_data) - miner_prediction_list.append(miner_prediction) - - score_values = np.array(scores) - prompt_scores, percentile90, lowest_score = compute_prompt_scores( - score_values - ) - - if prompt_scores is None: - return None, [], [] - - # gather all the detailed information - # for log and debug purposes - detailed_info = [ - { - "miner_uid": miner_uid, - "prompt_score_v3": float(prompt_score), + f"prompt_score_{prompt_score_version}": float(prompt_score), "percentile90": float(percentile90), "lowest_score": float(lowest_score), "miner_prediction_id": ( From cbdc13bf358f1482e591628b4c40f0221dccaf3f Mon Sep 17 00:00:00 2001 From: Thykof Date: Mon, 9 Feb 2026 19:19:18 +0100 Subject: [PATCH 3/3] fix the on conflicts --- synth/validator/miner_data_handler.py | 35 ++++++++++++--------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/synth/validator/miner_data_handler.py b/synth/validator/miner_data_handler.py index dd8c3e1f..526cadcf 100644 --- a/synth/validator/miner_data_handler.py +++ b/synth/validator/miner_data_handler.py @@ -245,27 +245,22 @@ def set_miner_scores( ], } ) - insert_stmt_miner_scores = ( - insert(MinerScore) - .values(rows_to_insert) - .on_conflict_do_update( - constraint="uq_miner_scores_miner_predictions_id", - set_={ - f"score_details_{prompt_score_version}": { - "total_crps": row["total_crps"], - "percentile90": row["percentile90"], - "lowest_score": row["lowest_score"], - f"prompt_score_{prompt_score_version}": row[ - f"prompt_score_{prompt_score_version}" - ], - "crps_data": row["crps_data"], - }, - f"prompt_score_{prompt_score_version}": row[ - f"prompt_score_{prompt_score_version}" - ], - }, - ) + insert_stmt = insert(MinerScore).values(rows_to_insert) + + insert_stmt_miner_scores = insert_stmt.on_conflict_do_update( + constraint="uq_miner_scores_miner_predictions_id", + set_={ + f"score_details_{prompt_score_version}": getattr( + insert_stmt.excluded, + f"score_details_{prompt_score_version}", + ), + f"prompt_score_{prompt_score_version}": getattr( + insert_stmt.excluded, + f"prompt_score_{prompt_score_version}", + ), + }, ) + connection.execute(insert_stmt_miner_scores) except Exception as e: bt.logging.exception(