From b50d8db8328b2956d32ca7d3abd9d310810b5734 Mon Sep 17 00:00:00 2001 From: Liam DeVoe Date: Wed, 4 Jun 2025 18:20:05 -0400 Subject: [PATCH] send observation without reprs first, then reprs --- src/hypofuzz/dashboard/dashboard.py | 173 ++++++++++-------- src/hypofuzz/dashboard/models.py | 126 +++++++++++++ .../frontend/src/context/DataProvider.tsx | 85 +++++---- .../frontend/src/tyche/Representation.tsx | 3 + src/hypofuzz/frontend/src/tyche/Samples.tsx | 9 + src/hypofuzz/frontend/src/types/dashboard.ts | 4 +- src/hypofuzz/hypofuzz.py | 2 + 7 files changed, 285 insertions(+), 117 deletions(-) create mode 100644 src/hypofuzz/dashboard/models.py diff --git a/src/hypofuzz/dashboard/dashboard.py b/src/hypofuzz/dashboard/dashboard.py index 7db2e87a..55965b39 100644 --- a/src/hypofuzz/dashboard/dashboard.py +++ b/src/hypofuzz/dashboard/dashboard.py @@ -4,7 +4,6 @@ import json import math from collections import defaultdict -from enum import IntEnum from pathlib import Path from typing import Any, Literal, Optional @@ -24,6 +23,14 @@ from starlette.websockets import WebSocket, WebSocketDisconnect from trio import MemoryReceiveChannel +from hypofuzz.dashboard.models import ( + AddReportsEvent, + AddTestsEvent, + DashboardEventT, + DashboardEventType, + dashboard_observation, + dashboard_report, +) from hypofuzz.dashboard.patching import make_and_save_patches from hypofuzz.dashboard.test import Test from hypofuzz.database import ( @@ -43,16 +50,6 @@ websockets: set["HypofuzzWebsocket"] = set() -class DashboardEventType(IntEnum): - # minimize header frame overhead with a shared IntEnum definition between - # python and ts. 
- ADD_TESTS = 1 - ADD_REPORTS = 2 - ADD_ROLLING_OBSERVATIONS = 3 - ADD_CORPUS_OBSERVATIONS = 4 - SET_FAILURE = 5 - - def _sample_reports( reports_by_worker: dict[str, list[ReportWithDiff]], *, soft_limit: int ) -> dict[str, list[ReportWithDiff]]: @@ -88,22 +85,6 @@ def _sample_reports( return by_worker -def report_for_websocket(report: Report) -> dict[str, Any]: - # we send reports to the dashboard in two contexts: attached to a node and worker, - # and as a standalone report. In the former, the dashboard already knows - # the nodeid and worker uuid, and deleting them avoids substantial overhead. - # In the latter, we send the necessary attributes separately. - return { - "elapsed_time": report.elapsed_time, - "status_counts": report.status_counts, - "behaviors": report.behaviors, - "fingerprints": report.fingerprints, - "timestamp": report.timestamp, - "since_new_branch": report.since_new_branch, - "phase": report.phase, - } - - class HypofuzzJSONResponse(JSONResponse): def render(self, content: Any) -> bytes: return json.dumps( @@ -127,10 +108,8 @@ async def receive_json(self) -> None: async def send_json(self, data: Any) -> None: await self.websocket.send_text(json.dumps(data, cls=HypofuzzEncoder)) - async def send_event(self, header: dict[str, Any], data: Any) -> None: - await self.websocket.send_text( - f"{json.dumps(header, cls=HypofuzzEncoder)}|{json.dumps(data, cls=HypofuzzEncoder)}" - ) + async def send_event(self, event: DashboardEventT) -> None: + await self.send_json(event) @abc.abstractmethod async def initial(self, tests: dict[str, Test]) -> None: @@ -147,7 +126,8 @@ class OverviewWebsocket(HypofuzzWebsocket): async def initial(self, tests: dict[str, Test]) -> None: # we start by sending all tests, which is the most important # thing for the user to see first. 
- tests_data = { + event: AddTestsEvent = { + "type": DashboardEventType.ADD_TESTS, "tests": [ { "database_key": test.database_key, @@ -155,23 +135,22 @@ async def initial(self, tests: dict[str, Test]) -> None: "failure": test.failure, } for test in tests.values() - ] + ], } - await self.send_event({"type": DashboardEventType.ADD_TESTS}, tests_data) + await self.send_event(event) # then we send the reports for each test. for test in tests.values(): # limit for performance reports_by_worker = _sample_reports(test.reports_by_worker, soft_limit=1000) for worker_uuid, reports in reports_by_worker.items(): - report_data = { + report_event: AddReportsEvent = { + "type": DashboardEventType.ADD_REPORTS, "nodeid": test.nodeid, "worker_uuid": worker_uuid, - "reports": [report_for_websocket(report) for report in reports], + "reports": [dashboard_report(report) for report in reports], } - await self.send_event( - {"type": DashboardEventType.ADD_REPORTS}, report_data - ) + await self.send_event(report_event) async def on_event( self, event_type: Literal["save", "delete"], key: DatabaseEventKey, value: Any @@ -179,21 +158,23 @@ async def on_event( if event_type == "save": # don't send observations events, the overview page doesn't use # observations. 
+ event: DashboardEventT if key is DatabaseEventKey.REPORT: assert isinstance(value, Report) - data: Any = { + event = { + "type": DashboardEventType.ADD_REPORTS, "nodeid": value.nodeid, "worker_uuid": value.worker_uuid, - "reports": [report_for_websocket(value)], + "reports": [dashboard_report(value)], } - await self.send_event({"type": DashboardEventType.ADD_REPORTS}, data) + await self.send_event(event) if key is DatabaseEventKey.FAILURE: assert isinstance(value, Observation) - data = { - "nodeid": value.property, - "failure": value, + event = { + "type": DashboardEventType.SET_FAILURE, + "failure": dashboard_observation(value), } - await self.send_event({"type": DashboardEventType.SET_FAILURE}, data) + await self.send_event(event) class TestWebsocket(HypofuzzWebsocket): @@ -204,52 +185,86 @@ def __init__(self, websocket: WebSocket, nodeid: str) -> None: async def initial(self, tests: dict[str, Test]) -> None: test = tests[self.nodeid] # send the test first - test_data = { - "database_key": test.database_key, - "nodeid": test.nodeid, - "failure": test.failure, + test_data: AddTestsEvent = { + "type": DashboardEventType.ADD_TESTS, + "tests": [ + { + "database_key": test.database_key, + "nodeid": test.nodeid, + "failure": test.failure, + } + ], } - await self.send_event({"type": DashboardEventType.ADD_TESTS}, [test_data]) + await self.send_event(test_data) # then its reports. Note we don't currently downsample with _sample_reports # on individual test pages, unlike the overview page. 
for worker_uuid, reports in test.reports_by_worker.items(): - report_data = { + report_event: AddReportsEvent = { + "type": DashboardEventType.ADD_REPORTS, "nodeid": test.nodeid, "worker_uuid": worker_uuid, - "reports": [report_for_websocket(report) for report in reports], + "reports": [dashboard_report(report) for report in reports], } - await self.send_event({"type": DashboardEventType.ADD_REPORTS}, report_data) - - await self.send_event( - {"type": DashboardEventType.ADD_ROLLING_OBSERVATIONS}, - {"nodeid": self.nodeid, "observations": test.rolling_observations}, - ) - await self.send_event( - {"type": DashboardEventType.ADD_CORPUS_OBSERVATIONS}, - {"nodeid": self.nodeid, "observations": test.corpus_observations}, - ) + await self.send_event(report_event) + + # then its observations, sending the observations (without their repr) + # first, followed by the repr of each observation. + for obs_type, observations in [ + ("rolling", test.rolling_observations), + ("corpus", test.corpus_observations), + ]: + await self.send_event( + { + "type": DashboardEventType.ADD_OBSERVATIONS, + "nodeid": self.nodeid, + "observation_type": obs_type, # type: ignore + "observations": [ + dashboard_observation(obs) for obs in observations + ], + }, + ) + + for obs_type, observations in [ + ("rolling", test.rolling_observations), + ("corpus", test.corpus_observations), + ]: + await self.send_event( + { + "type": DashboardEventType.SET_OBSERVATION_REPRS, + "nodeid": self.nodeid, + "observation_type": obs_type, # type: ignore + "representations": [ + { + # using run_start as the observation primary key + "run_start": obs.run_start, + "repr": obs.representation, + } + for obs in observations + ], + }, + ) async def on_event( self, event_type: Literal["save", "delete"], key: DatabaseEventKey, value: Any ) -> None: if event_type == "save": + event: DashboardEventT if key is DatabaseEventKey.REPORT: assert isinstance(value, Report) nodeid = value.nodeid - value = { + event = { + "type": 
DashboardEventType.ADD_REPORTS, "nodeid": nodeid, "worker_uuid": value.worker_uuid, - "reports": [report_for_websocket(value)], + "reports": [dashboard_report(value)], } - dashboard_event = DashboardEventType.ADD_REPORTS elif key is DatabaseEventKey.FAILURE: assert isinstance(value, Observation) nodeid = value.property - dashboard_event = DashboardEventType.SET_FAILURE - value = { - "nodeid": nodeid, - "failure": value, + event = { + "type": DashboardEventType.SET_FAILURE, + "failure": dashboard_observation(value), } elif key in [ DatabaseEventKey.ROLLING_OBSERVATION, @@ -257,13 +272,15 @@ async def on_event( ]: assert isinstance(value, Observation) nodeid = value.property - dashboard_event = { - DatabaseEventKey.ROLLING_OBSERVATION: DashboardEventType.ADD_ROLLING_OBSERVATIONS, - DatabaseEventKey.CORPUS_OBSERVATION: DashboardEventType.ADD_CORPUS_OBSERVATIONS, - }[key] - value = { + event = { + "type": DashboardEventType.ADD_OBSERVATIONS, "nodeid": nodeid, - "observations": [value], + "observation_type": ( + "rolling" + if key is DatabaseEventKey.ROLLING_OBSERVATION + else "corpus" + ), + "observations": [dashboard_observation(value)], } else: # assert so I don't forget a case @@ -277,7 +294,7 @@ async def on_event( if nodeid != self.nodeid: return - await self.send_event({"type": dashboard_event}, value) + await self.send_event(event) async def websocket(websocket: WebSocket) -> None: @@ -376,7 +393,7 @@ async def api_backing_state_tests(request: Request) -> Response: "nodeid": test.nodeid, "failure": test.failure, "reports_by_worker": { - worker_uuid: [report_for_websocket(report) for report in reports] + worker_uuid: [dashboard_report(report) for report in reports] for worker_uuid, reports in test.reports_by_worker.items() }, } diff --git a/src/hypofuzz/dashboard/models.py b/src/hypofuzz/dashboard/models.py new file mode 100644 index 00000000..54ced898 --- /dev/null +++ b/src/hypofuzz/dashboard/models.py @@ -0,0 +1,126 @@ +from enum import IntEnum +from typing 
import Any, Literal, Optional, TypedDict, Union + +from hypofuzz.database import ( + Observation, + ObservationStatus, + Phase, + Report, + StatusCounts, +) + + +class DashboardObservation(TypedDict): + type: str + status: ObservationStatus + status_reason: str + representation: None + arguments: dict[str, Any] + how_generated: str + features: dict[str, Any] + timing: dict[str, Any] + metadata: dict[str, Any] + property: str + run_start: float + + +def dashboard_observation(observation: Observation) -> DashboardObservation: + return { + "type": observation.type, + "status": observation.status, + "status_reason": observation.status_reason, + # explicitly dropping representation + "representation": None, + "arguments": observation.arguments, + "how_generated": observation.how_generated, + "features": observation.features, + "timing": observation.timing, + "metadata": observation.metadata, + "property": observation.property, + "run_start": observation.run_start, + } + + +class DashboardReport(TypedDict): + elapsed_time: float + status_counts: StatusCounts + behaviors: int + fingerprints: int + timestamp: float + since_new_branch: Optional[int] + phase: Phase + + +def dashboard_report(report: Report) -> DashboardReport: + return { + "elapsed_time": report.elapsed_time, + "status_counts": report.status_counts, + "behaviors": report.behaviors, + "fingerprints": report.fingerprints, + "timestamp": report.timestamp, + "since_new_branch": report.since_new_branch, + "phase": report.phase, + } + + +# keep in sync with DashboardEventType in DataProvider.tsx +class DashboardEventType(IntEnum): + # minimize header frame overhead with a shared IntEnum definition between + # python and ts. 
+ ADD_TESTS = 1 + ADD_REPORTS = 2 + ADD_OBSERVATIONS = 3 + SET_OBSERVATION_REPRS = 4 + SET_FAILURE = 5 + + +ObservationType = Literal["rolling", "corpus"] + + +# keep in sync with TestsAction in DataProvider.tsx +class AddTestsTest(TypedDict): + database_key: str + nodeid: str + failure: Optional[Observation] + + +class AddTestsEvent(TypedDict): + type: Literal[DashboardEventType.ADD_TESTS] + tests: list[AddTestsTest] + + +class AddReportsEvent(TypedDict): + type: Literal[DashboardEventType.ADD_REPORTS] + nodeid: str + worker_uuid: str + reports: list[DashboardReport] + + +class AddObservationsEvent(TypedDict): + type: Literal[DashboardEventType.ADD_OBSERVATIONS] + nodeid: str + observation_type: ObservationType + observations: list[DashboardObservation] + + +class SetObservationReprsEvent(TypedDict): + type: Literal[DashboardEventType.SET_OBSERVATION_REPRS] + nodeid: str + # TODO we might want to make these names shorter (obs_type / reprs) for json + # size + observation_type: ObservationType + representations: list[dict[str, Any]] + + +class SetFailureEvent(TypedDict): + type: Literal[DashboardEventType.SET_FAILURE] + failure: DashboardObservation + + +DashboardEventT = Union[ + AddTestsEvent, + AddReportsEvent, + AddObservationsEvent, + SetObservationReprsEvent, + SetFailureEvent, +] diff --git a/src/hypofuzz/frontend/src/context/DataProvider.tsx b/src/hypofuzz/frontend/src/context/DataProvider.tsx index 54eaa277..18e986b9 100644 --- a/src/hypofuzz/frontend/src/context/DataProvider.tsx +++ b/src/hypofuzz/frontend/src/context/DataProvider.tsx @@ -22,8 +22,8 @@ interface DataProviderProps { enum DashboardEventType { ADD_TESTS = 1, ADD_REPORTS = 2, - ADD_ROLLING_OBSERVATIONS = 3, - ADD_CORPUS_OBSERVATIONS = 4, + ADD_OBSERVATIONS = 3, + SET_OBSERVATION_REPRS = 4, SET_FAILURE = 5, } @@ -44,14 +44,16 @@ type TestsAction = } | { type: DashboardEventType.SET_FAILURE; failure: Observation } | { - type: DashboardEventType.ADD_ROLLING_OBSERVATIONS + type: 
DashboardEventType.ADD_OBSERVATIONS nodeid: string + observation_type: "rolling" | "corpus" observations: Observation[] } | { - type: DashboardEventType.ADD_CORPUS_OBSERVATIONS + type: DashboardEventType.SET_OBSERVATION_REPRS nodeid: string - observations: Observation[] + observation_type: "rolling" | "corpus" + representations: { run_start: number; representation: string }[] } function testsReducer( @@ -97,23 +99,35 @@ function testsReducer( return newState } - case DashboardEventType.ADD_ROLLING_OBSERVATIONS: { - const { nodeid, observations } = action + case DashboardEventType.ADD_OBSERVATIONS: { + const { nodeid, observation_type, observations } = action const test = getOrCreateTest(nodeid) - test.rolling_observations.push(...observations) - // keep only the most recent 300 rolling observations, by run_start - // - // this is a good candidate for a proper nlogn SortedList - test.rolling_observations = test.rolling_observations - .sortKey(observation => observation.run_start) - .slice(-300) + if (observation_type === "rolling") { + test.rolling_observations.push(...observations) + // keep only the most recent 300 rolling observations, by run_start + // + // this is a good candidate for a proper nlogn SortedList + test.rolling_observations = test.rolling_observations + .sortKey(observation => observation.run_start) + .slice(-300) + } else { + console.assert(observation_type === "corpus") + test.corpus_observations.push(...observations) + } return newState } - case DashboardEventType.ADD_CORPUS_OBSERVATIONS: { - const { nodeid, observations } = action + case DashboardEventType.SET_OBSERVATION_REPRS: { + const { nodeid, observation_type, representations } = action const test = getOrCreateTest(nodeid) - test.corpus_observations.push(...observations) + const observations = + observation_type === "rolling" + ? 
test.rolling_observations + : test.corpus_observations + for (const { run_start, representation } of representations) { + const observation = observations.find(o => o.run_start === run_start) + if (observation) observation.representation = representation + } return newState } @@ -193,13 +207,15 @@ export function DataProvider({ children }: DataProviderProps) { .then(data => { for (const [nodeid, test] of Object.entries(data)) { dispatch({ - type: DashboardEventType.ADD_ROLLING_OBSERVATIONS, + type: DashboardEventType.ADD_OBSERVATIONS, nodeid: nodeid, + observation_type: "rolling", observations: test.rolling.map(Observation.fromJson), }) dispatch({ - type: DashboardEventType.ADD_CORPUS_OBSERVATIONS, + type: DashboardEventType.ADD_OBSERVATIONS, nodeid: nodeid, + observation_type: "corpus", observations: test.corpus.map(Observation.fromJson), }) } @@ -218,17 +234,10 @@ export function DataProvider({ children }: DataProviderProps) { const ws = new WebSocket(url) ws.onmessage = event => { - // split the message into the header and the body. The format is an extremely simple pipe separator. 
+ const data = JSON.parse(event.data) - // note: data.split("|", 2) is incorrect, as it drops everything after the second pipe, unlike python's split - const pipeIndex = event.data.indexOf("|") - let header = event.data.slice(0, pipeIndex) - let data = event.data.slice(pipeIndex + 1) - header = JSON.parse(header) - - switch (Number(header.type)) { + switch (Number(data.type)) { case DashboardEventType.ADD_TESTS: { - data = JSON.parse(data) dispatch({ type: DashboardEventType.ADD_TESTS, tests: data.tests.map((test: any) => ({ @@ -241,7 +250,6 @@ export function DataProvider({ children }: DataProviderProps) { } case DashboardEventType.ADD_REPORTS: { - data = JSON.parse(data) dispatch({ type: DashboardEventType.ADD_REPORTS, nodeid: data.nodeid, @@ -253,28 +261,31 @@ export function DataProvider({ children }: DataProviderProps) { break } - case DashboardEventType.ADD_CORPUS_OBSERVATIONS: { - data = JSON.parse(data) + case DashboardEventType.ADD_OBSERVATIONS: { dispatch({ - type: DashboardEventType.ADD_CORPUS_OBSERVATIONS, + type: DashboardEventType.ADD_OBSERVATIONS, nodeid: data.nodeid, + observation_type: data.observation_type, observations: data.observations.map(Observation.fromJson), }) break } - case DashboardEventType.ADD_ROLLING_OBSERVATIONS: { - data = JSON.parse(data) + case DashboardEventType.SET_OBSERVATION_REPRS: { dispatch({ - type: DashboardEventType.ADD_ROLLING_OBSERVATIONS, + type: DashboardEventType.SET_OBSERVATION_REPRS, nodeid: data.nodeid, - observations: data.observations.map(Observation.fromJson), + observation_type: data.observation_type, + representations: data.representations.map((r: any) => ({ + run_start: Number(r.run_start), + representation: r.repr, + })), }) break } default: - throw new Error(`Unknown event type: ${header.type}`) + throw new Error(`Unknown event type: ${data.type}`) } } diff --git a/src/hypofuzz/frontend/src/tyche/Representation.tsx b/src/hypofuzz/frontend/src/tyche/Representation.tsx index 80ebc581..daebb3b7 100644 --- 
a/src/hypofuzz/frontend/src/tyche/Representation.tsx +++ b/src/hypofuzz/frontend/src/tyche/Representation.tsx @@ -52,6 +52,9 @@ export function Representation({ observations, observationType }: Props) { const rawRepresentations = new Map() observations.forEach(observation => { const repr = observation.representation + if (repr === null) { + return + } rawRepresentations.set(repr, (rawRepresentations.get(repr) || 0) + 1) }) diff --git a/src/hypofuzz/frontend/src/tyche/Samples.tsx b/src/hypofuzz/frontend/src/tyche/Samples.tsx index 1fa36293..ce86befb 100644 --- a/src/hypofuzz/frontend/src/tyche/Samples.tsx +++ b/src/hypofuzz/frontend/src/tyche/Samples.tsx @@ -15,16 +15,25 @@ export function Samples({ observations }: { observations: Observation[] }) { // unique observation for that representation. const uniqueReprIndex = new Map() for (const [index, observation] of observations.entries()) { + if (observation.representation === null) { + continue + } uniqueReprIndex.set(observation.representation, index) } function isUnique(observation: Observation) { + if (observation.representation === null) { + return false + } return ( observations.indexOf(observation) === uniqueReprIndex.get(observation.representation) ) } function isDuplicate(observation: Observation) { + if (observation.representation === null) { + return false + } return ( observations.indexOf(observation) !== uniqueReprIndex.get(observation.representation) diff --git a/src/hypofuzz/frontend/src/types/dashboard.ts b/src/hypofuzz/frontend/src/types/dashboard.ts index 70fed0d2..e3d6b3b6 100644 --- a/src/hypofuzz/frontend/src/types/dashboard.ts +++ b/src/hypofuzz/frontend/src/types/dashboard.ts @@ -162,7 +162,7 @@ export class Observation extends Dataclass { public type: string, public status: ObservationStatus, public status_reason: string, - public representation: string, + public representation: string | null, // arguments is a reserved keyword in javascript public arguments_: Map, public how_generated: string, 
@@ -187,7 +187,7 @@ export class Observation extends Dataclass { new Map(Object.entries(data.timing)), new Map(Object.entries(data.metadata)), data.property, - data.run_start, + Number(data.run_start), ) } } diff --git a/src/hypofuzz/hypofuzz.py b/src/hypofuzz/hypofuzz.py index 3a5efce0..e41a8fb3 100644 --- a/src/hypofuzz/hypofuzz.py +++ b/src/hypofuzz/hypofuzz.py @@ -478,6 +478,8 @@ def callback(test_case: dict) -> None: # re-use per FuzzProcess. Overwrite with the current timestamp for use # in sorting observations. This is not perfectly reliable in a # distributed setting, but is good enough. + # + # We also re-use this as the primary key for this observation. test_case["run_start"] = time.time() # "arguments" duplicates part of the call repr in "representation". # We don't use this for anything, so drop it.