Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions alembic/versions/1b8a8038d5eb_add_prompt_score_v4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""add score v4

Revision ID: 1b8a8038d5eb
Revises: 26ab499a7e04
Create Date: 2026-02-09 18:10:27.571647

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB

# revision identifiers, used by Alembic.
revision: str = "1b8a8038d5eb"
down_revision: Union[str, None] = "26ab499a7e04"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Apply this revision: add the nullable v4 scoring columns.

    Adds ``prompt_score_v4`` (float) and ``score_details_v4`` (JSONB) to
    the ``miner_scores`` table. Both are nullable so existing rows need
    no backfill when the migration runs.
    """
    new_columns = (
        sa.Column("prompt_score_v4", sa.Float, nullable=True),
        sa.Column("score_details_v4", JSONB, nullable=True),
    )
    for column in new_columns:
        op.add_column("miner_scores", column)


def downgrade() -> None:
    """Revert this revision: drop the v4 scoring columns.

    Removes ``prompt_score_v4`` and ``score_details_v4`` from the
    ``miner_scores`` table, mirroring :func:`upgrade`.
    """
    for column_name in ("prompt_score_v4", "score_details_v4"):
        op.drop_column("miner_scores", column_name)
28 changes: 15 additions & 13 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,22 +187,23 @@ def forward_score(self):
# with predictions and calculate the rewards,
# we store the rewards in the miner_scores table
# ========================================== #
bt.logging.info(
f"forward score {LOW_FREQUENCY.label} frequency", "forward_score"
)
current_time = get_current_time()
scored_time: datetime = round_time_to_minutes(current_time)

success_low = calculate_scores(
self.miner_data_handler,
self.price_data_provider,
scored_time,
LOW_FREQUENCY,
self.config.neuron.nprocs,
)
# bt.logging.info(
# f"forward score {LOW_FREQUENCY.label} frequency", "forward_score"
# )
# current_time = get_current_time()
# scored_time: datetime = round_time_to_minutes(current_time)

# success_low = calculate_scores(
# self.miner_data_handler,
# self.price_data_provider,
# scored_time,
# LOW_FREQUENCY,
# self.config.neuron.nprocs,
# )

scored_time: datetime = round_time_to_minutes(current_time)
current_time = get_current_time()
scored_time: datetime = round_time_to_minutes(current_time)
bt.logging.info(
f"forward score {HIGH_FREQUENCY.label} frequency", "forward_score"
)
Expand All @@ -213,6 +214,7 @@ def forward_score(self):
HIGH_FREQUENCY,
self.config.neuron.nprocs,
)
return

self.cleanup_history()

Expand Down
2 changes: 2 additions & 0 deletions synth/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,10 @@ class MinerScore(Base):
)
prompt_score = Column(Float, nullable=False)
prompt_score_v3 = Column(Float, nullable=False)
prompt_score_v4 = Column(Float, nullable=False)
score_details = Column(JSONB, nullable=False)
score_details_v3 = Column(JSONB, nullable=False)
score_details_v4 = Column(JSONB, nullable=False)

prediction = relationship("MinerPrediction", back_populates="scores")

Expand Down
2 changes: 1 addition & 1 deletion synth/validator/crps_calculation.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def calculate_crps_for_miner(
for interval_name, interval_seconds in scoring_intervals.items():
interval_steps = get_interval_steps(interval_seconds, time_increment)
absolute_price = interval_name.endswith("_abs")
is_gap = interval_name.endswith("_gap")
is_gap = interval_name.endswith("_gaps")

# If we are considering absolute prices, adjust the interval steps for potential gaps:
# if only the initial price is present, then decrease the interval step
Expand Down
2 changes: 2 additions & 0 deletions synth/validator/forward.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def calculate_scores(
price_data_provider=price_data_provider,
validator_request=validator_request,
nprocs=nprocs,
prompt_score_version="v4",
)

print_scores_df(prompt_scores, detailed_info)
Expand All @@ -168,6 +169,7 @@ def calculate_scores(
int(validator_request.id),
detailed_info,
miner_score_time,
"v4",
)

# Success if at least one request succeed
Expand Down
45 changes: 25 additions & 20 deletions synth/validator/miner_data_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def set_miner_scores(
validator_requests_id: int,
reward_details: list[dict],
scored_time: datetime,
prompt_score_version: str = "v3",
):
try:
with self.engine.connect() as connection:
Expand Down Expand Up @@ -230,33 +231,36 @@ def set_miner_scores(
"miner_predictions_id": row[
"miner_prediction_id"
],
"score_details_v3": {
f"score_details_{prompt_score_version}": {
"total_crps": row["total_crps"],
"percentile90": row["percentile90"],
"lowest_score": row["lowest_score"],
"prompt_score_v3": row["prompt_score_v3"],
f"prompt_score_{prompt_score_version}": row[
f"prompt_score_{prompt_score_version}"
],
"crps_data": row["crps_data"],
},
"prompt_score_v3": row["prompt_score_v3"],
f"prompt_score_{prompt_score_version}": row[
f"prompt_score_{prompt_score_version}"
],
}
)
insert_stmt_miner_scores = (
insert(MinerScore)
.values(rows_to_insert)
.on_conflict_do_update(
constraint="uq_miner_scores_miner_predictions_id",
set_={
"score_details_v3": {
"total_crps": row["total_crps"],
"percentile90": row["percentile90"],
"lowest_score": row["lowest_score"],
"prompt_score_v3": row["prompt_score_v3"],
"crps_data": row["crps_data"],
},
"prompt_score_v3": row["prompt_score_v3"],
},
)
insert_stmt = insert(MinerScore).values(rows_to_insert)

insert_stmt_miner_scores = insert_stmt.on_conflict_do_update(
constraint="uq_miner_scores_miner_predictions_id",
set_={
f"score_details_{prompt_score_version}": getattr(
insert_stmt.excluded,
f"score_details_{prompt_score_version}",
),
f"prompt_score_{prompt_score_version}": getattr(
insert_stmt.excluded,
f"prompt_score_{prompt_score_version}",
),
},
)

connection.execute(insert_stmt_miner_scores)
except Exception as e:
bt.logging.exception(
Expand Down Expand Up @@ -403,7 +407,8 @@ def get_validator_requests_to_score(
and_(
MinerPrediction.validator_requests_id
== ValidatorRequest.id,
MinerScore.prompt_score_v3.isnot(None),
# MinerScore.prompt_score_v3.isnot(None),
MinerScore.prompt_score_v4.isnot(None),
)
)
)
Expand Down
111 changes: 7 additions & 104 deletions synth/validator/reward.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# the Software.

import typing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import shared_memory
import time

Expand Down Expand Up @@ -215,12 +215,13 @@ def _build_detailed_info(
miner_prediction_process_time: list,
percentile90: float,
lowest_score: float,
prompt_score_version: str = "v3",
) -> list[dict]:
"""Build detailed information dict from processing results."""
return [
{
"miner_uid": pred.miner_uid,
"prompt_score_v3": float(prompt_score),
f"prompt_score_{prompt_score_version}": float(prompt_score),
"percentile90": float(percentile90),
"lowest_score": float(lowest_score),
"miner_prediction_id": prediction_id,
Expand Down Expand Up @@ -255,6 +256,7 @@ def get_rewards_multiprocess(
price_data_provider: PriceDataProvider,
validator_request: ValidatorRequest,
nprocs: int = 2,
prompt_score_version: str = "v3",
) -> tuple[typing.Optional[np.ndarray], list, list[dict]]:
"""
Returns an array of rewards for the given query and responses.
Expand Down Expand Up @@ -359,6 +361,7 @@ def get_rewards_multiprocess(
miner_prediction_process_time,
percentile90,
lowest_score,
prompt_score_version,
)

return prompt_scores, detailed_info, real_prices
Expand Down Expand Up @@ -434,6 +437,7 @@ def get_rewards(
miner_data_handler: MinerDataHandler,
price_data_provider: PriceDataProvider,
validator_request: ValidatorRequest,
prompt_score_version: str = "v3",
) -> tuple[typing.Optional[np.ndarray], list, list[dict]]:
"""
Returns an array of rewards for the given query and responses.
Expand Down Expand Up @@ -491,108 +495,7 @@ def get_rewards(
detailed_info = [
{
"miner_uid": miner_uid,
"prompt_score_v3": float(prompt_score),
"percentile90": float(percentile90),
"lowest_score": float(lowest_score),
"miner_prediction_id": (
miner_prediction.id if miner_prediction else None
),
"format_validation": (
miner_prediction.format_validation
if miner_prediction
else None
),
"process_time": (
miner_prediction.process_time if miner_prediction else None
),
"total_crps": float(score),
"crps_data": clean_numpy_in_crps_data(crps_data),
}
for miner_uid, score, crps_data, prompt_score, miner_prediction in zip(
miner_uids,
scores,
detailed_crps_data_list,
prompt_scores,
miner_prediction_list,
)
]

return prompt_scores, detailed_info, real_prices


@print_execution_time
def get_rewards_threading(
miner_data_handler: MinerDataHandler,
price_data_provider: PriceDataProvider,
validator_request: ValidatorRequest,
) -> tuple[typing.Optional[np.ndarray], list, list[dict]]:
"""
Returns an array of rewards for the given query and responses.

Args:
- query (int): The query sent to the miner.
- responses (List[float]): A list of responses from the miner.

Returns:
- np.ndarray: An array of rewards for the given query and responses.
"""
miner_uids = miner_data_handler.get_miner_uid_of_prediction_request(
int(validator_request.id)
)

if miner_uids is None:
return None, [], []

try:
real_prices = price_data_provider.fetch_data(validator_request)
except Exception as e:
bt.logging.warning(
f"Error fetching data for validator request "
f"{validator_request.id}: {e}"
)
return None, [], []

# Submit ALL tasks first, THEN collect results
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [
executor.submit(
reward,
miner_data_handler.get_miner_prediction(
miner_uid, int(validator_request.id)
),
miner_uid,
validator_request,
real_prices,
)
for miner_uid in miner_uids
]

# Collect results in order
results = [f.result() for f in futures]

scores = []
detailed_crps_data_list = []
miner_prediction_list = []

for score, detailed_crps_data, miner_prediction in results:
scores.append(score)
detailed_crps_data_list.append(detailed_crps_data)
miner_prediction_list.append(miner_prediction)

score_values = np.array(scores)
prompt_scores, percentile90, lowest_score = compute_prompt_scores(
score_values
)

if prompt_scores is None:
return None, [], []

# gather all the detailed information
# for log and debug purposes
detailed_info = [
{
"miner_uid": miner_uid,
"prompt_score_v3": float(prompt_score),
f"prompt_score_{prompt_score_version}": float(prompt_score),
"percentile90": float(percentile90),
"lowest_score": float(lowest_score),
"miner_prediction_id": (
Expand Down
Loading