From f3cc5f38b1a66546487b03cedc7d80d1340ebf0e Mon Sep 17 00:00:00 2001 From: Liam DeVoe Date: Mon, 16 Jun 2025 01:16:55 -0400 Subject: [PATCH 1/7] write and use distribute_nodes --- src/hypofuzz/bayes.py | 58 ++++++++++++++++++++++++++++++++++++++ src/hypofuzz/entrypoint.py | 25 ++++++++-------- tests/test_bayes.py | 33 ++++++++++++++++++++++ 3 files changed, 104 insertions(+), 12 deletions(-) create mode 100644 src/hypofuzz/bayes.py create mode 100644 tests/test_bayes.py diff --git a/src/hypofuzz/bayes.py b/src/hypofuzz/bayes.py new file mode 100644 index 00000000..1b139cea --- /dev/null +++ b/src/hypofuzz/bayes.py @@ -0,0 +1,58 @@ +from collections.abc import Sequence +from typing import Any + + +def distribute_nodes( + nodeids: Sequence[str], estimators: Sequence[float], *, n: int +) -> tuple[tuple[str, ...], ...]: + # We have x nodes node_i, each with an estimator \hat{v}_i for "behaviors per + # second". We have n bins (processes), and we want to distribute node_i + # into the n bins such that we maximize overall behaviors per second. + # + # If our estimators \hat{v}_i were static, then this is trivial: the + # overall behaviors per second is `sum from i=1 to c of max(p_i)`. Of course, + # our estimators are *not* static. Which means we are optimizing over something + # more complicated - a set of multi-armed bandit problems perhaps? + # + # A more naive quantity to maximize (minimize) is the largest bin sum. + # So we are minimizing `max(sum(p_i) for i in c)`. This is related to + # "makespan minimization", and is a classic bin packing problem. Optimality + # is NP complete, so we approximate the optimal solution with a greedy one, + # specifically "longest processing time first scheduling". + # + # (intuitively, we assign the "best" nodes to processes first. So with + # e.g. 10 processes with estimator of \hat{v}_i = 1 behavior per second (which + # is really good!) 
they would all go to different processes (at least until + # the cap of n processes), which is what we want. + + assert len(nodeids) == len(estimators) + # estimators of 0 are mathematically valid, but semantically weird, and much + # more likely to be indicative of a logic error + assert all(estimator > 0 for estimator in estimators) + + bins: list[list[Any]] = [[] for _ in range(n)] + nodes = [ + {"nodeid": nodeid, "estimator": estimator} + for nodeid, estimator in zip(nodeids, estimators) + ] + + # first, we sort the node_i in decreasing order by their estimator. + nodes.sort(key=lambda node: node["estimator"], reverse=True) # type: ignore + + # If we have fewer than `n` nodes, we repeat the list of nodes in decreasing + # order of their estimator until we reach `n` nodes. This ensures every + # processes receives a node (in fact, in this case, exactly one). + node_idx = 0 + while len(nodes) < n: + nodes.append(nodes[node_idx]) + node_idx = (node_idx + 1) % len(nodes) + + # then, we assign each node_i to the partition with the least sum. 
+ for node in nodes: + smallest_bin = min( + bins, + key=lambda bin: sum(node["estimator"] for node in bin), + ) + smallest_bin.append(node) + + return tuple(tuple(node["nodeid"] for node in bin) for bin in bins) diff --git a/src/hypofuzz/entrypoint.py b/src/hypofuzz/entrypoint.py index 703445c5..77bf7773 100644 --- a/src/hypofuzz/entrypoint.py +++ b/src/hypofuzz/entrypoint.py @@ -10,6 +10,8 @@ import psutil from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS +from hypofuzz.bayes import distribute_nodes + AVAILABLE_PROVIDERS["hypofuzz"] = "hypofuzz.provider.HypofuzzProvider" @@ -155,24 +157,23 @@ def _fuzz_impl(n_processes: int, pytest_args: tuple[str, ...]) -> None: f"test{tests_s}{skipped_msg}" ) + nodeids = [t.nodeid for t in tests] + # TODO actually save/load estimator state + estimators = [1.0 for _ in tests] if n_processes <= 1: - _fuzz(pytest_args=pytest_args, nodeids=[t.nodeid for t in tests]) + _fuzz(pytest_args=pytest_args, nodeids=nodeids) else: - processes: list[Process] = [] - for i in range(n_processes): - # Round-robin for large test suites; all-on-all for tiny, etc. 
- nodeids: set[str] = set() - for ix in range(n_processes): - nodeids.update(t.nodeid for t in tests[i + ix :: n_processes]) - if len(nodeids) >= 10: # enough to prioritize between - break - - p = Process( + partitions = distribute_nodes(nodeids, estimators, n=n_processes) + processes = [ + Process( target=_fuzz, kwargs={"pytest_args": pytest_args, "nodeids": nodeids}, ) + for nodeids in partitions + ] + for p in processes: p.start() - processes.append(p) + for p in processes: p.join() diff --git a/tests/test_bayes.py b/tests/test_bayes.py new file mode 100644 index 00000000..5683bc64 --- /dev/null +++ b/tests/test_bayes.py @@ -0,0 +1,33 @@ +from hypothesis import given, strategies as st +import itertools + +from hypofuzz.bayes import distribute_nodes + + +@given( + st.lists( + st.tuples( + st.integers(0, 1000).map(lambda n: f"node{n}"), + st.floats(min_value=0, allow_infinity=False, exclude_min=True), + ), + min_size=1, + ), + st.integers(1, 100), +) +def test_distribute_nodes(items, n): + nodeids = [item[0] for item in items] + estimators = [item[1] for item in items] + partitions = distribute_nodes(nodeids, estimators, n=n) + + total_nodes = sum(len(partition) for partition in partitions) + assert total_nodes == n if len(nodeids) < n else len(nodeids) + assert all(len(partition) > 0 for partition in partitions) + + +@given(st.integers(4, 100)) +def test_distribute_nodes_more_processes_than_nodes(n): + # when there are more processes than nodes, we should see the highest value + # nodes getting assigned to processes first. 
+ assert distribute_nodes(["1", "2", "3"], [1, 3, 2], n=n) == tuple( + itertools.islice(itertools.cycle([("2",), ("3",), ("1",)]), n) + ) From 65c5a2c6bcacf37f295652b56aa6495c2064eb00 Mon Sep 17 00:00:00 2001 From: Liam DeVoe Date: Mon, 16 Jun 2025 01:19:29 -0400 Subject: [PATCH 2/7] format --- src/hypofuzz/bayes.py | 2 +- tests/test_bayes.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/hypofuzz/bayes.py b/src/hypofuzz/bayes.py index 1b139cea..2483b5a1 100644 --- a/src/hypofuzz/bayes.py +++ b/src/hypofuzz/bayes.py @@ -21,7 +21,7 @@ def distribute_nodes( # specifically "longest processing time first scheduling". # # (intuitively, we assign the "best" nodes to processes first. So with - # e.g. 10 processes with estimator of \hat{v}_i = 1 behavior per second (which + # e.g. 10 nodes with an estimator of \hat{v}_i = 1 behavior per second (which # is really good!) they would all go to different processes (at least until # the cap of n processes), which is what we want. 
diff --git a/tests/test_bayes.py b/tests/test_bayes.py index 5683bc64..6421fa33 100644 --- a/tests/test_bayes.py +++ b/tests/test_bayes.py @@ -1,6 +1,7 @@ -from hypothesis import given, strategies as st import itertools +from hypothesis import given, strategies as st + from hypofuzz.bayes import distribute_nodes From 7604805b6327c623cba55019b9fc67cb27f78794 Mon Sep 17 00:00:00 2001 From: Liam DeVoe Date: Tue, 17 Jun 2025 04:49:20 -0400 Subject: [PATCH 3/7] new worker hub structure --- src/hypofuzz/entrypoint.py | 28 +++--- src/hypofuzz/hypofuzz.py | 174 ++++++++++++++++++++++++++++++++++--- 2 files changed, 171 insertions(+), 31 deletions(-) diff --git a/src/hypofuzz/entrypoint.py b/src/hypofuzz/entrypoint.py index 77bf7773..2e77e504 100644 --- a/src/hypofuzz/entrypoint.py +++ b/src/hypofuzz/entrypoint.py @@ -10,7 +10,7 @@ import psutil from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS -from hypofuzz.bayes import distribute_nodes +from hypofuzz.hypofuzz import FuzzWorkerHub AVAILABLE_PROVIDERS["hypofuzz"] = "hypofuzz.provider.HypofuzzProvider" @@ -134,7 +134,7 @@ def _fuzz_impl(n_processes: int, pytest_args: tuple[str, ...]) -> None: ) from hypofuzz.collection import collect_tests - from hypofuzz.hypofuzz import _fuzz + from hypofuzz.hypofuzz import _start_worker # With our arguments validated, it's time to actually do the work. 
collection = collect_tests(pytest_args) @@ -158,23 +158,15 @@ def _fuzz_impl(n_processes: int, pytest_args: tuple[str, ...]) -> None: ) nodeids = [t.nodeid for t in tests] - # TODO actually save/load estimator state - estimators = [1.0 for _ in tests] if n_processes <= 1: - _fuzz(pytest_args=pytest_args, nodeids=nodeids) + # if we only have one process, skip the FuzzWorkerHub abstraction (which + # would cost a process) and just start a FuzzWorker with constant node_ids + shared_state = {"hub_state": {"nodeids": nodeids}, "worker_state": {}} + _start_worker(pytest_args=pytest_args, shared_state=shared_state) else: - partitions = distribute_nodes(nodeids, estimators, n=n_processes) - processes = [ - Process( - target=_fuzz, - kwargs={"pytest_args": pytest_args, "nodeids": nodeids}, - ) - for nodeids in partitions - ] - for p in processes: - p.start() - - for p in processes: - p.join() + hub = FuzzWorkerHub( + nodeids=nodeids, pytest_args=pytest_args, n_processes=n_processes + ) + hub.start() print("Found a failing input for every test!", file=sys.stderr) diff --git a/src/hypofuzz/hypofuzz.py b/src/hypofuzz/hypofuzz.py index 319da213..a23b8e4a 100644 --- a/src/hypofuzz/hypofuzz.py +++ b/src/hypofuzz/hypofuzz.py @@ -2,10 +2,12 @@ import contextlib import math +import time from collections import defaultdict -from collections.abc import Callable +from collections.abc import Callable, Mapping, Sequence from contextlib import nullcontext from functools import partial +from multiprocessing import Manager, Process from random import Random from typing import Any, Literal, Optional, Union @@ -36,7 +38,7 @@ get_signature, ) -from hypofuzz.bayes import behaviors_per_second, softmax +from hypofuzz.bayes import behaviors_per_second, distribute_nodes, softmax from hypofuzz.collection import collect_tests from hypofuzz.corpus import ( get_shrinker, @@ -256,19 +258,57 @@ def has_found_failure(self) -> bool: return corpus is not None and bool(corpus.interesting_examples) -class 
FuzzProcess: +class FuzzWorker: """ - Manages switching between several FuzzTargets, and managing their associated - higher-level state, like setting up and tearing down pytest fixtures. + Manages switching between several FuzzTargets, and also manages their + associated higher-level state, like setting up and tearing down pytest + fixtures. """ - def __init__(self, targets: list[FuzzTarget]) -> None: - self.random = Random() - self.targets = targets + def __init__( + self, + *, + pytest_args: Sequence[str], + shared_state: Mapping, + ) -> None: + self.pytest_args = pytest_args + self.shared_state = shared_state + self.random = Random() + # the current pool of node ids this process has available to fuzz. This + # might be adjusted by FuzzWorkerHub via `shared_state` as estimators + # update and nodeids are rebalanced across workers. + self.nodeids: Sequence[str] = [] + # The list of all collected fuzz targets. We collect this at the beginning + # by running a pytest collection step. + # + # This is never modified or copied from after the initial collection. + # When we need an actual target to fuzz, we create a new FuzzTarget + # instance to put into self.targets. + self.collected_fuzz_targets: list[FuzzTarget] = [] + # the current pool of active targets this worker can fuzz immediately. + # This is the subset of `nodeids` which this worker has chosen to start + # up. 
+ self.targets: list[FuzzTarget] = [] self.event_dispatch: dict[bytes, list[FuzzTarget]] = defaultdict(list) - for target in targets: - self.event_dispatch[target.database_key].append(target) + + def add_target(self, nodeid: str) -> None: + targets = [t for t in self.collected_fuzz_targets if t.nodeid == nodeid] + assert len(targets) == 1 + target = targets[0] + + # create a new FuzzTarget to put into self.targets, to avoid modifying + # self.collected_fuzz_targets at all + target = FuzzTarget( + test_fn=target._test_fn, + stuff=target._stuff, + database=target.database, + database_key=target.database_key, + wrapped_test=target.wrapped_test, + pytest_item=target.pytest_item, + ) + self.targets.append(target) + self.event_dispatch[target.database_key].append(target) def on_event(self, listener_event: ListenerEventT) -> None: event = DatabaseEvent.from_event(listener_event) @@ -283,10 +323,38 @@ def valid_targets(self) -> list[FuzzTarget]: # the targets we actually want to run/fuzz return [t for t in self.targets if not t.has_found_failure] + def _maybe_add_targets(self) -> None: + # consider whether it's worthwhile to add more targets + active_nodeids = {target.nodeid for target in self.targets} + candidates = [nodeid for nodeid in self.nodeids if nodeid not in active_nodeids] + # TODO actually defer starting up targets here, based on worker lifetime + # and startup cost estimators here + for nodeid in candidates: + self.add_target(nodeid) + + def _update_targets(self, nodeids: Sequence[str]) -> None: + # Update our nodeids and targets with new directives from the hub. + # * Nodes in both nodeids and self.targets are kept as-is + # * Nodes in nodeids but not self.targets are added to our available + # nodeids, to potentially be added as targets later (by _maybe_add_targets) + # * Nodes in self.targets but not nodeids are evicted from our targets. + # These are nodes that the hub has decided are better to hand off to + # another process. 
+ for target in self.targets.copy(): + if target.nodeid not in nodeids: + self.targets.remove(target) + self.event_dispatch[target.database_key].remove(target) + + self.nodeids = nodeids + def start(self) -> None: + self.collected_fuzz_targets = collect_tests(self.pytest_args).fuzz_targets settings().database.add_listener(self.on_event) while True: + self._update_targets(self.shared_state["hub_state"]["nodeids"]) + self._maybe_add_targets() + if not self.valid_targets: break @@ -314,13 +382,93 @@ def start(self) -> None: for _ in range(100): target.run_one() + # give the hub with up-to-date estimator state + self.shared_state["worker_state"][target.nodeid] = { + "behaviors_per_second": behaviors_per_second(target), + } + -def _fuzz(pytest_args: tuple[str, ...], nodeids: list[str]) -> None: +class FuzzWorkerHub: + def __init__( + self, + *, + nodeids: Sequence[str], + pytest_args: Sequence[str], + n_processes: int, + ) -> None: + self.nodeids = nodeids + self.pytest_args = pytest_args + self.n_processes = n_processes + + self.shared_states: list[Mapping] = [] + + def start(self) -> None: + processes: list[Process] = [] + + with Manager() as manager: + for _ in range(self.n_processes): + shared_state = manager.dict() + shared_state["hub_state"] = manager.dict() + shared_state["worker_state"] = manager.dict() + + process = Process( + target=_start_worker, + kwargs={ + "pytest_args": self.pytest_args, + "shared_state": shared_state, + }, + ) + processes.append(process) + self.shared_states.append(shared_state) + + # rebalance once at the start to put the initial node assignments + # in the shared state + self._rebalance() + for process in processes: + process.start() + + while True: + # rebalance automatically on an interval. + # We may want to check some condition more frequently than this, + # like "a process has no more nodes" (due to e.g. finding a + # failure). So we rebalance either once every n seconds, or whenever + # some worker needs a rebalancing. 
+ time.sleep(60) + # if all our workers have exited, we should exit as well + if all(not process.is_alive() for process in processes): + break + + self._rebalance() + + def _rebalance(self) -> None: + # rebalance the assignment of nodeids to workers, according to the + # up-to-date estimators from our workers. + + # TODO actually read/use new estimator state + # # nodeid: estimator + # estimators = {} + + # for shared_state in self.shared_states: + # # if multiple workers have the same node, we'll use the estimator + # # from whichever has been running longer. + # pass + + # TODO actually save/load estimator state + estimators = [1.0 for _ in self.nodeids] + partitions = distribute_nodes(self.nodeids, estimators, n=self.n_processes) + + for shared_state, nodeids in zip(self.shared_states, partitions): + shared_state["hub_state"]["nodeids"] = nodeids + + +def _start_worker( + pytest_args: Sequence[str], + shared_state: Mapping, +) -> None: """Collect and fuzz tests. Designed to be used inside a multiprocessing.Process started with the spawn() method - requires picklable arguments but works on Windows too. 
""" - tests = [t for t in collect_tests(pytest_args).fuzz_targets if t.nodeid in nodeids] - process = FuzzProcess(tests) + process = FuzzWorker(pytest_args=pytest_args, shared_state=shared_state) process.start() From 0fbe9565dfa75230e7bd6a16df9639dd5d3587d1 Mon Sep 17 00:00:00 2001 From: Liam DeVoe Date: Wed, 18 Jun 2025 05:31:38 -0400 Subject: [PATCH 4/7] more work on estimators --- src/hypofuzz/bayes.py | 260 +++++++++++++++++++++++++++++-------- src/hypofuzz/entrypoint.py | 5 +- src/hypofuzz/hypofuzz.py | 126 ++++++++++++++---- tests/test_bayes.py | 80 ++++++++++-- 4 files changed, 374 insertions(+), 97 deletions(-) diff --git a/src/hypofuzz/bayes.py b/src/hypofuzz/bayes.py index 3bc7ba20..0c2a2494 100644 --- a/src/hypofuzz/bayes.py +++ b/src/hypofuzz/bayes.py @@ -1,90 +1,224 @@ import math from collections.abc import Sequence -from typing import TYPE_CHECKING, Any +from dataclasses import dataclass +from itertools import takewhile +from random import Random +from typing import TYPE_CHECKING, Any, Optional, TypeVar if TYPE_CHECKING: from hypofuzz.hypofuzz import FuzzTarget +T = TypeVar("T") + +# in the absence of any knowledge about worker lifetimes, assume a worker lives +# for 5 minutes. +DEFAULT_EXPECTED_LIFETIME_ESTIMATOR = 60 * 5 + + +@dataclass +class CurrentWorker: + nodeids: Sequence[str] + e_lifetime: float + + +@dataclass +class DistributeNodesTarget: + nodeid: str + rates: "BehaviorRates" + e_startup_time: float + + +@dataclass +class BehaviorRates: + # An estimator for the number of behaviors the next input will discover. This + # will be between 0 and 1. + per_input: float + # An estimator for the number of behaviors discovered per second for a target, + # assuming one worker is fuzzing this target continuously over that second. 
per_second: float
+
+
+def _min_values(values: Sequence[T], key: Any) -> Sequence[T]:
+    candidates = sorted(
+        [(value, key(value)) for value in values], key=lambda item: item[1]
+    )
+    min_value = candidates[0][1]
+    return [
+        item[0] for item in takewhile(lambda item: item[1] == min_value, candidates)
+    ]
+
 
 def distribute_nodes(
-    nodeids: Sequence[str], estimators: Sequence[float], *, n: int
+    targets: Sequence[DistributeNodesTarget],
+    *,
+    n: int,
+    current_workers: Optional[Sequence[CurrentWorker]] = None,
 ) -> tuple[tuple[str, ...], ...]:
     # We have x nodes node_i, each with an estimator \hat{v}_i for "behaviors per
-    # second". We have n bins (processes), and we want to distribute node_i
-    # into the n bins such that we maximize overall behaviors per second.
+    # second". We have n bins (workers), and we want to distribute node_i
+    # into the n bins such that we maximize the sum of worker_behaviors.
+    #
+    # The estimator for the number of behaviors for a worker is given by
+    # e_worker_behaviors. Instead of trying for the optimal solution of maximizing
+    # the sum of worker_behaviors, we instead maximize the smallest worker_behaviors
+    # quantity.
     #
-    # If our estimators \hat{v}_i were static, then this is trivial: the
-    # overall behaviors per second is `sum from i=1 to c of max(p_i)`. Of course,
-    # our estimators are *not* static. Which means we are optimizing over something
-    # more complicated - a set of multi-armed bandit problems perhaps?
+    # This is related to "makespan minimization", and is a classic bin packing
+    # problem. Optimality of this problem is NP complete, so we instead approximate
+    # the optimal solution with a greedy one, specifically a variant of
+    # "longest processing time first scheduling": we first sort the nodes in
+    # increasing order of their estimator. Then, for each node, we check which
+    # worker has the lowest worker_behaviors, and assign the node to that worker.
+    # Since we are iterating in increasing order of estimator, we know that adding
+    # a node to a worker will increase that worker's worker_behaviors (unless the
+    # worker's scheduling algorithm for targets is literally adversarial, ie
+    # adding a higher-than-average value target decreases its expected behaviors
+    # per second, which we will assume is not the case).
     #
-    # A more naive quantity to maximize (minimize) is the largest bin sum.
-    # So we are minimizing `max(sum(p_i) for i in c)`. This is related to
-    # "makespan minimization", and is a classic bin packing problem. Optimality
-    # is NP complete, so we approximate the optimal solution with a greedy one,
-    # specifically "longest processing time first scheduling".
+    # Optionally, the current assignment `current_workers` of node ids to workers
+    # can be passed. This incorporates an overhead cost to switching a nodeid to a
+    # different worker. The algorithm is the standard bin packing algorithm, but
+    # with a penalty to a node being assigned to a worker other than its current
+    # worker.
     #
-    # (intuitively, we assign the "best" nodes to processes first. So with
-    # e.g. 10 nodes with an estimator of \hat{v}_i = 1 behavior per second (which
-    # is really good!) they would all go to different processes (at least until
-    # the cap of n processes), which is what we want.
+    # This penalty cost of switching a nodeid between workers is
+    # worker_behaviors_per_second * node_startup_cost_seconds, ie the number of
+    # behaviors we expect to lose by spending time starting up this node.
+
+    random = Random()
+    if current_workers is None:
+        current_workers = [CurrentWorker(nodeids=[], e_lifetime=0.0) for _ in range(n)]
+
+    assert len(current_workers) == n
+
 
-    assert len(nodeids) == len(estimators)
-    # estimators of 0 are mathematically valid, but semantically weird, and much
-    # more likely to be indicative of a logic error
-    assert all(estimator > 0 for estimator in estimators)
-
-    bins: list[list[Any]] = [[] for _ in range(n)]
-    nodes = [
-        {"nodeid": nodeid, "estimator": estimator}
-        for nodeid, estimator in zip(nodeids, estimators)
+    # estimators of 0 are mathematically valid, but can lead to bad/pathological
+    # algorithm outcomes
+    assert all(target.rates.per_second > 0 for target in targets)
+    assert all(target.rates.per_input > 0 for target in targets)
+
+    # return partitions in the same iteration order they were passed in current_workers
+    workers: list[dict[str, Any]] = [
+        {"current_worker": worker, "targets": []} for worker in current_workers
     ]
 
-    # first, we sort the node_i in decreasing order by their estimator.
-    nodes.sort(key=lambda node: node["estimator"], reverse=True)  # type: ignore
-
-    # If we have fewer than `n` nodes, we repeat the list of nodes in decreasing
-    # order of their estimator until we reach `n` nodes. This ensures every
-    # processes receives a node (in fact, in this case, exactly one).
-    node_idx = 0
-    while len(nodes) < n:
-        nodes.append(nodes[node_idx])
-        node_idx = (node_idx + 1) % len(nodes)
-
-    # then, we assign each node_i to the partition with the least sum.
+    # first, we sort the targets in increasing order by their estimator.
+    targets = sorted(targets, key=lambda target: target.rates.per_second)
+
+    # If we have fewer than `n` targets, we repeat the list of targets in decreasing
+    # order of their estimator until we reach `n` targets. This ensures every
+    # worker receives at least one target (in fact, in this case, exactly one).
+    target_idx, n_original = 0, len(targets)
+    while len(targets) < n:
+        # `targets` are in increasing order, so index the original prefix from
+        # the end to repeat targets in decreasing-estimator order (note that
+        # `targets[-target_idx]` with target_idx == 0 would pick the *smallest*)
+        targets.append(targets[n_original - 1 - target_idx])
+        target_idx = (target_idx + 1) % n_original
+
+    # then, we assign each target to the worker with the worst worker_behaviors.
+ # Since we're iterating over the targets in increasing order of behaviors + # per-second, adding a target to a worker will always increase its + # worker_behaviors. + def worker_score( + worker: dict[str, Any], *, target: Optional[DistributeNodesTarget] = None + ) -> float: + e_lifetime: float = worker["current_worker"].e_lifetime + worker_rates = e_worker_rates( + target_rates=[target.rates for target in worker["targets"]], + ) + offset = 0.0 + if target is not None and target.nodeid not in worker["current_worker"].nodeids: + # Add a penalty for switching nodes between workers. Since the ordering + # quantity is the e_worker_behaviors estimator of lifetime worker + # behaviors, we want to allow a node to switch workers if the ev + # differential is greater than the number of behaviors we expect to + # lose from spending time starting up this worker. + # + # And the number of behaviors we expect to lose is the behaviors per + # second estimator for the worker, times the estimator for the startup + # time of this node. + # + # We are choosing the worker with the lowest score to add this node to, + # so if we want to encourage this node to be assigned to its current + # worker, we want that worker to have a low score, which means we + # want to increase the score of all other workers. So the offset here + # should be positive. + offset = worker_rates.per_second * target.e_startup_time + + # to avoid crazy rebalancing during the initial startup phase, don't + # work with small lifetime estimators + e_lifetime = max(e_lifetime, DEFAULT_EXPECTED_LIFETIME_ESTIMATOR) + return (worker_rates.per_second * e_lifetime) + offset + + for target in targets: + # find all the workers with the minimum value score, and randomly assign + # this target to one of them. Normally there won't be ties, and the target + # simply goes to the worst worker. 
But when fuzzing for the first time + # (or after a db wipe) where all targets have the same estimators, we + # don't want to end in an assignment where one worker is given n - 1 nodes + # and the other is given just 1. + smallest_workers = _min_values( + workers, + key=lambda worker: worker_score(worker, target=target), + ) + smallest_worker = random.choice(smallest_workers) + + score_before = worker_score(smallest_worker) + smallest_worker["targets"].append(target) + # ignore float rounding errors for our invariant check + assert worker_score(smallest_worker) - score_before >= -1e-6, ( + score_before, + worker_score(smallest_worker), ) - smallest_bin.append(node) - return tuple(tuple(node["nodeid"] for node in bin) for bin in bins) + return tuple( + tuple(target.nodeid for target in worker["targets"]) for worker in workers + ) -# for the behaviors estimators, we should incorporate a lookback across the +# TODO for the behaviors estimators, we should incorporate a lookback across the # history of workers for this test. Give higher weight to newer estimators # (proportional to their confidence ie sample size). -def behaviors_per_input(target: "FuzzTarget") -> float: - # an estimator for the number of behaviors the next input will discover. +def e_target_rates(target: "FuzzTarget") -> BehaviorRates: + # per_input computation since = target.provider.since_new_behavior - return (1 / since) if since > 0 else 1 - + per_input = (1 / since) if since > 0 else 1 -def behaviors_per_second(target: "FuzzTarget") -> float: - # an estimator for the number of behaviors discovered per second, assuming - # one process is fuzzing this target continuously over that second. - # This is a simple adjustment of behaviors_per_input for the test runtime. 
+ # per_second computation ninputs = target.provider.ninputs elapsed_time = target.provider.elapsed_time if elapsed_time == 0: - return 1 + per_second = 1.0 + else: + inputs_per_second = ninputs / elapsed_time + per_second = per_input * inputs_per_second + + return BehaviorRates(per_input=per_input, per_second=per_second) - inputs_per_second = ninputs / elapsed_time - return behaviors_per_input(target) * inputs_per_second + +def e_worker_lifetime(current_lifetime: float) -> float: + """ + An estimator for the total lifetime of a worker. + """ + # We use the doomsday-argument estimator that the total lifetime is twice the + # current lifetime. In the future, this could incorporate past worker + # lifetimes as well. + return current_lifetime * 2 + + +def e_worker_rates(*, target_rates: Sequence[BehaviorRates]) -> BehaviorRates: + weights = bandit_weights(target_rates) + # the expected behavior rates of a worker is + # sum(probability * expected_value) for each of its targets. + # Note that this is tightly dependent on the sampling algorithm used in + # practice by the workers. If that changes (to e.g. thompson sampling), our + # estimators will need to change to use the same sampling algorithm as well. + return BehaviorRates( + per_input=sum(p * rates.per_input for p, rates in zip(weights, target_rates)), + per_second=sum(p * rates.per_second for p, rates in zip(weights, target_rates)), + ) def softmax(values: list[float]) -> list[float]: @@ -96,3 +230,15 @@ def softmax(values: list[float]) -> list[float]: total = sum(softmaxed) return [value / total for value in softmaxed] + + +def bandit_weights(behavior_rates: Sequence[BehaviorRates]) -> list[float]: + """ + Returns the probability that each target should be chosen, as a solution + to the multi-armed-bandit problem. 
+ """ + + # choose the next target to fuzz with probability equal to the softmax + # of its expected value (behaviors per second), aka boltzmann exploration + per_second_estimators = [rates.per_second for rates in behavior_rates] + return softmax(per_second_estimators) diff --git a/src/hypofuzz/entrypoint.py b/src/hypofuzz/entrypoint.py index 2e77e504..c4481631 100644 --- a/src/hypofuzz/entrypoint.py +++ b/src/hypofuzz/entrypoint.py @@ -161,7 +161,10 @@ def _fuzz_impl(n_processes: int, pytest_args: tuple[str, ...]) -> None: if n_processes <= 1: # if we only have one process, skip the FuzzWorkerHub abstraction (which # would cost a process) and just start a FuzzWorker with constant node_ids - shared_state = {"hub_state": {"nodeids": nodeids}, "worker_state": {}} + shared_state = { + "hub_state": {"nodeids": nodeids}, + "worker_state": {"nodeids": {}}, + } _start_worker(pytest_args=pytest_args, shared_state=shared_state) else: hub = FuzzWorkerHub( diff --git a/src/hypofuzz/hypofuzz.py b/src/hypofuzz/hypofuzz.py index a23b8e4a..554e0539 100644 --- a/src/hypofuzz/hypofuzz.py +++ b/src/hypofuzz/hypofuzz.py @@ -38,7 +38,15 @@ get_signature, ) -from hypofuzz.bayes import behaviors_per_second, distribute_nodes, softmax +from hypofuzz.bayes import ( + BehaviorRates, + CurrentWorker, + DistributeNodesTarget, + bandit_weights, + distribute_nodes, + e_target_rates, + e_worker_lifetime, +) from hypofuzz.collection import collect_tests from hypofuzz.corpus import ( get_shrinker, @@ -290,15 +298,35 @@ def __init__( # This is the subset of `nodeids` which this worker has chosen to start # up. self.targets: list[FuzzTarget] = [] + # targets which we have previously started fuzzing, but have since been + # told to drop by the hub. We keep the fuzz target in memory because we + # might be told by the hub to pick this target up again in the future. 
+ # + # When starting, dropping, and starting a target again, we cannot violate + # the linear reports invariant that we do not write reports from the same + # worker, on the same target, at two different fuzz campaigns for that + # target. Once a worker starts fuzzing a target, it cannot restart fuzzing + # that target from scratch without changing its uuid or wiping the previous + # campaign, neither of which are feasible. + self.dropped_targets: list[FuzzTarget] = [] self.event_dispatch: dict[bytes, list[FuzzTarget]] = defaultdict(list) def add_target(self, nodeid: str) -> None: + # if this target was previously dropped, move it from `dropped_targets` + # to `targets`, without creating a new FuzzTarget. + dropped_targets = [t for t in self.dropped_targets if t.nodeid == nodeid] + if dropped_targets: + target = dropped_targets[0] + self.targets.append(target) + self.dropped_targets.remove(target) + return + targets = [t for t in self.collected_fuzz_targets if t.nodeid == nodeid] assert len(targets) == 1 target = targets[0] # create a new FuzzTarget to put into self.targets, to avoid modifying - # self.collected_fuzz_targets at all + # collected_fuzz_targets at all target = FuzzTarget( test_fn=target._test_fn, stuff=target._stuff, @@ -343,11 +371,16 @@ def _update_targets(self, nodeids: Sequence[str]) -> None: for target in self.targets.copy(): if target.nodeid not in nodeids: self.targets.remove(target) - self.event_dispatch[target.database_key].remove(target) + self.dropped_targets.append(target) + # we intentionally do not remove our event_dispatch listener + # here, because if we are ever told to pick up this dropped target + # again in the future, we still want its corpus and failure replay + # to be up to date from other workers. 
self.nodeids = nodeids def start(self) -> None: + self.worker_start = time.perf_counter() self.collected_fuzz_targets = collect_tests(self.pytest_args).fuzz_targets settings().database.add_listener(self.on_event) @@ -358,10 +391,6 @@ def start(self) -> None: if not self.valid_targets: break - # choose the next target to fuzz with probability equal to the softmax - # of its estimator. aka boltzmann exploration - estimators = [behaviors_per_second(target) for target in self.valid_targets] - estimators = softmax(estimators) # softmax might return 0.0 probability for some targets if there is # a substantial gap in estimator values (e.g. behaviors_per_second=1_000 # vs behaviors_per_second=1.0). We don't expect this to happen normally, @@ -372,19 +401,27 @@ def start(self) -> None: if self.random.random() < 0.01: target = self.random.choice(self.valid_targets) else: - target = self.random.choices( - self.valid_targets, weights=estimators, k=1 - )[0] + behaviors_rates = [ + e_target_rates(target) for target in self.valid_targets + ] + weights = bandit_weights(behaviors_rates) + target = self.random.choices(self.valid_targets, weights=weights, k=1)[ + 0 + ] # TODO we should scale this n up if our estimator expects that it will # take a long time to discover a new behavior, to reduce the overhead - # of switching. + # of switching targets. 
for _ in range(100): target.run_one() - # give the hub with up-to-date estimator state - self.shared_state["worker_state"][target.nodeid] = { - "behaviors_per_second": behaviors_per_second(target), + # give the hub an up-to-date estimator state + current_lifetime = time.perf_counter() - self.worker_start + worker_state = self.shared_state["worker_state"] + worker_state["current_lifetime"] = current_lifetime + worker_state["expected_lifetime"] = e_worker_lifetime(current_lifetime) + worker_state["nodeids"][target.nodeid] = { + "behavior_rates": e_target_rates(target), } @@ -410,6 +447,9 @@ def start(self) -> None: shared_state = manager.dict() shared_state["hub_state"] = manager.dict() shared_state["worker_state"] = manager.dict() + shared_state["worker_state"]["nodeids"] = manager.dict() + shared_state["worker_state"]["current_lifetime"] = 0.0 + shared_state["worker_state"]["expected_lifetime"] = 0.0 process = Process( target=_start_worker, @@ -444,21 +484,51 @@ def _rebalance(self) -> None: # rebalance the assignment of nodeids to workers, according to the # up-to-date estimators from our workers. - # TODO actually read/use new estimator state - # # nodeid: estimator - # estimators = {} - - # for shared_state in self.shared_states: - # # if multiple workers have the same node, we'll use the estimator - # # from whichever has been running longer. 
- # pass - - # TODO actually save/load estimator state - estimators = [1.0 for _ in self.nodeids] - partitions = distribute_nodes(self.nodeids, estimators, n=self.n_processes) + assert len(self.shared_states) == self.n_processes + current_workers = [ + CurrentWorker( + nodeids=state["worker_state"]["nodeids"].keys(), + e_lifetime=state["worker_state"]["expected_lifetime"], + ) + for state in self.shared_states + ] + + # fill with default estimators, for the first-time startup + # nodeid: (worker_lifetime, rates) + targets = { + nodeid: (0.0, BehaviorRates(per_second=1.0, per_input=1.0)) + for nodeid in self.nodeids + } + for state in self.shared_states: + worker_state = state["worker_state"] + worker_lifetime = worker_state["current_lifetime"] + for nodeid, rates in worker_state["nodeids"].items(): + if nodeid not in targets: + targets[nodeid] = (worker_lifetime, rates) + + # if the nodeid already exists, but we have a better estimator + # for it, replace it + if worker_lifetime > targets[nodeid][0]: + targets[nodeid] = (worker_lifetime, rates) + + # TODO estimate startup time of this target + # ("number of corpus elements" * "average input runtime" probably?) + targets = [ + DistributeNodesTarget(nodeid=nodeid, rates=rates, e_startup_time=0) + for nodeid, (_lifetime, rates) in targets.items() + ] + partitions = distribute_nodes( + targets, + n=self.n_processes, + current_workers=current_workers, + ) - for shared_state, nodeids in zip(self.shared_states, partitions): - shared_state["hub_state"]["nodeids"] = nodeids + # communicate the worker's new nodeids back to the worker. 
+ # + # the iteration order of the partitions returned by distribute_nodes is + # the same as the iteration order of current_workers + for state, nodeids in zip(self.shared_states, partitions): + state["hub_state"]["nodeids"] = nodeids def _start_worker( diff --git a/tests/test_bayes.py b/tests/test_bayes.py index 0490153b..69fea904 100644 --- a/tests/test_bayes.py +++ b/tests/test_bayes.py @@ -1,28 +1,39 @@ import itertools +from collections.abc import Sequence +from typing import Any import pytest from hypothesis import given, note, strategies as st -from hypofuzz.bayes import distribute_nodes, softmax +from hypofuzz.bayes import ( + BehaviorRates, + DistributeNodesTarget, + distribute_nodes, + softmax, +) @given( + st.integers(1, 100), st.lists( - st.tuples( - st.integers(0, 1000).map(lambda n: f"node{n}"), - st.floats(min_value=0, allow_infinity=False, exclude_min=True), + st.builds( + DistributeNodesTarget, + nodeid=st.integers(0, 1000).map(lambda n: f"node{n}"), + rates=st.builds( + BehaviorRates, + per_input=st.floats(min_value=0, max_value=10**10, exclude_min=True), + per_second=st.floats(min_value=0, max_value=10**10, exclude_min=True), + ), + e_startup_time=st.floats(min_value=0, max_value=10**10, exclude_min=True), ), min_size=1, ), - st.integers(1, 100), ) -def test_distribute_nodes(items, n): - nodeids = [item[0] for item in items] - estimators = [item[1] for item in items] - partitions = distribute_nodes(nodeids, estimators, n=n) +def test_distribute_nodes(n, targets): + partitions = distribute_nodes(targets, n=n, current_workers=None) total_nodes = sum(len(partition) for partition in partitions) - assert total_nodes == n if len(nodeids) < n else len(nodeids) + assert total_nodes == n if len(targets) < n else len(targets) assert all(len(partition) > 0 for partition in partitions) @@ -30,11 +41,54 @@ def test_distribute_nodes(items, n): def test_distribute_nodes_more_processes_than_nodes(n): # when there are more processes than nodes, we should see the 
highest value # nodes getting assigned to processes first. - assert distribute_nodes(["1", "2", "3"], [1, 3, 2], n=n) == tuple( + targets = [ + DistributeNodesTarget( + nodeid=str(i + 1), + rates=BehaviorRates(per_second=n, per_input=n), + e_startup_time=0.0, + ) + for i, n in enumerate([1, 3, 2]) + ] + assert set(distribute_nodes(targets, n=n)) == set( itertools.islice(itertools.cycle([("2",), ("3",), ("1",)]), n) ) +def _targets(*targets: Sequence[Any]) -> list[DistributeNodesTarget]: + return [ + DistributeNodesTarget( + nodeid=nodeid, + rates=BehaviorRates(per_second=per_second, per_input=per_input), + e_startup_time=startup_time, + ) + for nodeid, (per_second, per_input), startup_time in targets + ] + + +@pytest.mark.parametrize( + "targets, n, expected", + [ + ( + _targets(("a", (1, 1), 0), ("b", (2, 2), 0), ("c", (3, 3), 0)), + 1, + {("a", "b", "c")}, + ), + ( + _targets(("a", (1, 1), 0), ("b", (2, 2), 0), ("c", (3, 3), 0)), + 2, + {("a", "c"), ("b",)}, + ), + ( + _targets(("a", (1, 1), 0), ("b", (2, 2), 0), ("c", (3, 3), 0)), + 3, + {("a",), ("b",), ("c",)}, + ), + ], +) +def test_distribute_nodes_explicit(targets, n, expected): + assert set(distribute_nodes(targets, n=n)) == expected + + @given(st.lists(st.floats(min_value=0, allow_nan=False, allow_infinity=False))) def test_softmax(values): softmaxed = softmax(values) @@ -54,3 +108,7 @@ def test_softmax(values): if values: assert sum(softmaxed) == pytest.approx(1) + + +# TODO: test current_workers and e_startup_time, specifically that the penalty for +# a node switching workers works as expected From 1d5351b71b26fae20108a632569af1194c14cf9e Mon Sep 17 00:00:00 2001 From: Liam DeVoe Date: Sat, 28 Jun 2025 01:23:11 -0400 Subject: [PATCH 5/7] resolve merge conflicts --- src/hypofuzz/entrypoint.py | 23 ++--- src/hypofuzz/hypofuzz.py | 195 +++++++++++++++++++------------------ 2 files changed, 105 insertions(+), 113 deletions(-) diff --git a/src/hypofuzz/entrypoint.py b/src/hypofuzz/entrypoint.py index 
de02a17e..b14adfc6 100644 --- a/src/hypofuzz/entrypoint.py +++ b/src/hypofuzz/entrypoint.py @@ -10,8 +10,6 @@ import psutil from hypothesis.internal.conjecture.providers import AVAILABLE_PROVIDERS -from hypofuzz.hypofuzz import FuzzWorkerHub - AVAILABLE_PROVIDERS["hypofuzz"] = "hypofuzz.provider.HypofuzzProvider" @@ -136,7 +134,6 @@ def _fuzz_impl(n_processes: int, pytest_args: tuple[str, ...]) -> None: ) from hypofuzz.collection import collect_tests - from hypofuzz.hypofuzz import _start_worker # With our arguments validated, it's time to actually do the work. collection = collect_tests(pytest_args) @@ -159,19 +156,11 @@ def _fuzz_impl(n_processes: int, pytest_args: tuple[str, ...]) -> None: f"test{tests_s}{skipped_msg}" ) - nodeids = [t.nodeid for t in tests] - if n_processes <= 1: - # if we only have one process, skip the FuzzWorkerHub abstraction (which - # would cost a process) and just start a FuzzWorker with constant node_ids - shared_state = { - "hub_state": {"nodeids": nodeids}, - "worker_state": {"nodeids": {}}, - } - _start_worker(pytest_args=pytest_args, shared_state=shared_state) - else: - hub = FuzzWorkerHub( - nodeids=nodeids, pytest_args=pytest_args, n_processes=n_processes - ) - hub.start() + hub = FuzzWorkerHub( + nodeids=[t.nodeid for t in tests], + pytest_args=pytest_args, + n_processes=n_processes, + ) + hub.start() print("Found a failing input for every test!", file=sys.stderr) diff --git a/src/hypofuzz/hypofuzz.py b/src/hypofuzz/hypofuzz.py index 6b11c477..568c2cdf 100644 --- a/src/hypofuzz/hypofuzz.py +++ b/src/hypofuzz/hypofuzz.py @@ -3,14 +3,11 @@ import contextlib import math import time -import time from collections import defaultdict from collections.abc import Callable, Mapping, Sequence -from collections.abc import Callable, Mapping, Sequence from contextlib import nullcontext from functools import partial from multiprocessing import Manager, Process -from multiprocessing import Manager, Process from random import Random from typing 
import Any, Literal, Optional, Union @@ -340,26 +337,13 @@ def has_found_failure(self) -> bool: return corpus is not None and bool(corpus.interesting_examples) -class FuzzWorker: class FuzzWorker: """ Manages switching between several FuzzTargets, and also manages their associated higher-level state, like setting up and tearing down pytest fixtures. - Manages switching between several FuzzTargets, and also manages their - associated higher-level state, like setting up and tearing down pytest - fixtures. """ - def __init__( - self, - *, - pytest_args: Sequence[str], - shared_state: Mapping, - ) -> None: - self.pytest_args = pytest_args - self.shared_state = shared_state - def __init__( self, *, @@ -370,21 +354,16 @@ def __init__( self.shared_state = shared_state self.random = Random() - # the current pool of node ids this process has available to fuzz. This - # might be adjusted by FuzzWorkerHub via `shared_state` as estimators - # update and nodeids are rebalanced across workers. - self.nodeids: Sequence[str] = [] # The list of all collected fuzz targets. We collect this at the beginning # by running a pytest collection step. # # This is never modified or copied from after the initial collection. # When we need an actual target to fuzz, we create a new FuzzTarget # instance to put into self.targets. - self.collected_fuzz_targets: list[FuzzTarget] = [] - # the current pool of active targets this worker can fuzz immediately. - # This is the subset of `nodeids` which this worker has chosen to start - # up. - self.targets: list[FuzzTarget] = [] + self.collected_targets: dict[str, FuzzTarget] = {} + # the current pool of targets this worker can fuzz. This might change + # based on directives from the hub. + self.targets: dict[str, FuzzTarget] = {} # targets which we have previously started fuzzing, but have since been # told to drop by the hub. We keep the fuzz target in memory because we # might be told by the hub to pick this target up again in the future. 
@@ -395,34 +374,33 @@ def __init__( # target. Once a worker starts fuzzing a target, it cannot restart fuzzing # that target from scratch without changing its uuid or wiping the previous # campaign, neither of which are feasible. - self.dropped_targets: list[FuzzTarget] = [] + self.dropped_targets: dict[str, FuzzTarget] = {} + + self._current_target: Optional[FuzzTarget] = None self.event_dispatch: dict[bytes, list[FuzzTarget]] = defaultdict(list) - def add_target(self, nodeid: str) -> None: + def _add_target(self, nodeid: str) -> None: # if this target was previously dropped, move it from `dropped_targets` # to `targets`, without creating a new FuzzTarget. - dropped_targets = [t for t in self.dropped_targets if t.nodeid == nodeid] - if dropped_targets: - target = dropped_targets[0] - self.targets.append(target) - self.dropped_targets.remove(target) + if nodeid in self.dropped_targets: + target = self.dropped_targets[nodeid] + self.targets[nodeid] = target + del self.dropped_targets[nodeid] return - targets = [t for t in self.collected_fuzz_targets if t.nodeid == nodeid] - assert len(targets) == 1 - target = targets[0] - + target = self.collected_targets[nodeid] # create a new FuzzTarget to put into self.targets, to avoid modifying # collected_fuzz_targets at all target = FuzzTarget( - test_fn=target._test_fn, - stuff=target._stuff, + test_fn=target.test_fn, + extra_kwargs=target.extra_kwargs, database=target.database, database_key=target.database_key, wrapped_test=target.wrapped_test, pytest_item=target.pytest_item, ) - self.targets.append(target) + assert nodeid not in self.targets + self.targets[nodeid] = target self.event_dispatch[target.database_key].append(target) def _remove_target(self, nodeid: str) -> None: @@ -451,80 +429,98 @@ def on_event(self, listener_event: ListenerEventT) -> None: @property def valid_targets(self) -> list[FuzzTarget]: # the targets we actually want to run/fuzz - return [t for t in self.targets if not t.has_found_failure] - - def 
_maybe_add_targets(self) -> None: - # consider whether it's worthwhile to add more targets - active_nodeids = {target.nodeid for target in self.targets} - candidates = [nodeid for nodeid in self.nodeids if nodeid not in active_nodeids] - # TODO actually defer starting up targets here, based on worker lifetime - # and startup cost estimators here - for nodeid in candidates: - self.add_target(nodeid) + return [t for t in self.targets.values() if not t.has_found_failure] def _update_targets(self, nodeids: Sequence[str]) -> None: # Update our nodeids and targets with new directives from the hub. # * Nodes in both nodeids and self.targets are kept as-is # * Nodes in nodeids but not self.targets are added to our available - # nodeids, to potentially be added as targets later (by _maybe_add_targets) + # targets # * Nodes in self.targets but not nodeids are evicted from our targets. # These are nodes that the hub has decided are better to hand off to # another process. - for target in self.targets.copy(): - if target.nodeid not in nodeids: - self.targets.remove(target) - self.dropped_targets.append(target) - # we intentionally do not remove our event_dispatch listener - # here, because if we are ever told to pick up this dropped target - # again in the future, we still want its corpus and failure replay - # to be up to date from other workers. - self.nodeids = nodeids + # we get passed unique nodeids + assert len(set(nodeids)) == len(nodeids) + added_nodeids = set(nodeids) - set(self.targets.keys()) + removed_nodeids = set(self.targets.keys()) - set(nodeids) + + for nodeid in added_nodeids: + self._add_target(nodeid) + + for nodeid in removed_nodeids: + self._remove_target(nodeid) + + assert set(self.targets.keys()) == set(nodeids) + + def _switch_to_target(self, target: FuzzTarget) -> None: + # if we're sticking with our current target, then we don't need to + # do anything expensive like cleaning up fixtures. 
+ if target == self._current_target: + return + + if self._current_target is not None: + # first, clean up any fixtures from the old target. + self._current_target._exit_fixtures() + + # then, set up any fixtures for the new target. + target._enter_fixtures() + self._current_target = target def start(self) -> None: self.worker_start = time.perf_counter() - self.collected_fuzz_targets = collect_tests(self.pytest_args).fuzz_targets + + collected = collect_tests(self.pytest_args) + self.collected_targets = { + target.nodeid: target for target in collected.fuzz_targets + } + settings().database.add_listener(self.on_event) while True: self._update_targets(self.shared_state["hub_state"]["nodeids"]) - self._maybe_add_targets() - - if not self.valid_targets: - break - # softmax might return 0.0 probability for some targets if there is - # a substantial gap in estimator values (e.g. behaviors_per_second=1_000 - # vs behaviors_per_second=1.0). We don't expect this to happen normally, - # but it might when our estimator state is just getting started. - # - # Mix in a uniform probability of 1%, so we will eventually get out of - # such a hole. - if self.random.random() < 0.01: - target = self.random.choice(self.valid_targets) - else: - behaviors_rates = [ - e_target_rates(target) for target in self.valid_targets - ] - weights = bandit_weights(behaviors_rates) - target = self.random.choices(self.valid_targets, weights=weights, k=1)[ - 0 - ] - - # TODO we should scale this n up if our estimator expects that it will - # take a long time to discover a new behavior, to reduce the overhead - # of switching targets. - for _ in range(100): - target.run_one() - - # give the hub an up-to-date estimator state + # it's possible to go through an interim period where we have no nodeids, + # but the hub still has nodeids to assign. We don't want the worker to + # exit in this case, but rather keep waiting for nodeids. 
Even if n_workers + # exceeds n_tests, we still want to keep all workers alive, because the + # hub will assign the same test to multiple workers simultaneously. + if self.valid_targets: + # softmax might return 0.0 probability for some targets if there is + # a substantial gap in estimator values (e.g. behaviors_per_second=1_000 + # vs behaviors_per_second=1.0). We don't expect this to happen normally, + # but it might when our estimator state is just getting started. + # + # Mix in a uniform probability of 1%, so we will eventually get out of + # such a hole. + if self.random.random() < 0.01: + target = self.random.choice(self.valid_targets) + else: + behaviors_rates = [ + e_target_rates(target) for target in self.valid_targets + ] + weights = bandit_weights(behaviors_rates) + target = self.random.choices( + self.valid_targets, weights=weights, k=1 + )[0] + + self._switch_to_target(target) + # TODO we should scale this n up if our estimator expects that it will + # take a long time to discover a new behavior, to reduce the overhead + # of switching targets. 
+ for _ in range(100): + target.run_one() + + worker_state = self.shared_state["worker_state"] + worker_state["nodeids"][target.nodeid] = { + "behavior_rates": e_target_rates(target), + } + + # give the hub up-to-date estimator states current_lifetime = time.perf_counter() - self.worker_start worker_state = self.shared_state["worker_state"] worker_state["current_lifetime"] = current_lifetime worker_state["expected_lifetime"] = e_worker_lifetime(current_lifetime) - worker_state["nodeids"][target.nodeid] = { - "behavior_rates": e_target_rates(target), - } class FuzzWorkerHub: @@ -552,6 +548,7 @@ def start(self) -> None: shared_state["worker_state"]["nodeids"] = manager.dict() shared_state["worker_state"]["current_lifetime"] = 0.0 shared_state["worker_state"]["expected_lifetime"] = 0.0 + shared_state["worker_state"]["valid_nodeids"] = manager.list() process = Process( target=_start_worker, @@ -576,8 +573,11 @@ def start(self) -> None: # failure). So we rebalance either once every n seconds, or whenever # some worker needs a rebalancing. time.sleep(60) - # if all our workers have exited, we should exit as well - if all(not process.is_alive() for process in processes): + # if none of our workers have anything to do, we should exit as well + if all( + not state["worker_state"]["valid_nodeids"] + for state in self.shared_states + ): break self._rebalance() @@ -585,6 +585,9 @@ def start(self) -> None: def _rebalance(self) -> None: # rebalance the assignment of nodeids to workers, according to the # up-to-date estimators from our workers. + # TODO actually defer starting up targets here, based on worker lifetime + # and startup cost estimators. We should limit what we assign initially, + # and only assign more as the estimator says it's worthwhile. 
assert len(self.shared_states) == self.n_processes current_workers = [ @@ -642,5 +645,5 @@ def _start_worker( Designed to be used inside a multiprocessing.Process started with the spawn() method - requires picklable arguments but works on Windows too. """ - process = FuzzWorker(pytest_args=pytest_args, shared_state=shared_state) - process.start() + worker = FuzzWorker(pytest_args=pytest_args, shared_state=shared_state) + worker.start() From 80fbab49ada84e4f6e4caa6535d9bb030653034a Mon Sep 17 00:00:00 2001 From: Liam DeVoe Date: Sat, 28 Jun 2025 01:25:48 -0400 Subject: [PATCH 6/7] fix valid_nodeids --- src/hypofuzz/hypofuzz.py | 3 +++ tests/common.py | 2 +- tests/test_workers.py | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/hypofuzz/hypofuzz.py b/src/hypofuzz/hypofuzz.py index 568c2cdf..58fffecb 100644 --- a/src/hypofuzz/hypofuzz.py +++ b/src/hypofuzz/hypofuzz.py @@ -521,6 +521,9 @@ def start(self) -> None: worker_state = self.shared_state["worker_state"] worker_state["current_lifetime"] = current_lifetime worker_state["expected_lifetime"] = e_worker_lifetime(current_lifetime) + worker_state["valid_nodeids"] = [ + target.nodeid for target in self.valid_targets + ] class FuzzWorkerHub: diff --git a/tests/common.py b/tests/common.py index 527578b9..16639513 100644 --- a/tests/common.py +++ b/tests/common.py @@ -44,7 +44,7 @@ def patches(self, *, nodeid): return r.json() -def wait_for(condition, *, timeout=10, interval): +def wait_for(condition, *, timeout=5, interval): for _ in range(int(timeout // interval) + 1): if value := condition(): return value diff --git a/tests/test_workers.py b/tests/test_workers.py index 189abbf3..54f59dd9 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -20,7 +20,7 @@ def test_c(): """ -def test_workers(tmp_path): +def test_worker_general(tmp_path): test_dir, _db_dir = setup_test_code(tmp_path, test_code) with multiprocessing.Manager() as manager: From 816c050746d8f023ac75da20161bd875af814d81 Mon 
Sep 17 00:00:00 2001 From: Liam DeVoe Date: Fri, 4 Jul 2025 00:25:22 -0400 Subject: [PATCH 7/7] address some review comments --- src/hypofuzz/bayes.py | 37 ++++++++++++++++++++++++++++--------- tests/test_bayes.py | 6 +++--- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/src/hypofuzz/bayes.py b/src/hypofuzz/bayes.py index 0c2a2494..cc6a1e86 100644 --- a/src/hypofuzz/bayes.py +++ b/src/hypofuzz/bayes.py @@ -68,7 +68,7 @@ def distribute_nodes( # the optimal solution with a greedy one, specifically a variant of # "longest processing time first scheduling": we first sort the nodes in # increasing order of their estimator. Then, for each mode, we check which - # worker has the lowest worker_behaiors, and assign the node to that worker. + # worker has the lowest worker_behaviors, and assign the node to that worker. # Since we are iterating in increasing order of estimator, we know that adding # a node to a worker will increase that worker's worker_behaviors (unless the # worker's scheduling algorithm for targets is literally adversarial, ie @@ -83,7 +83,12 @@ def distribute_nodes( # # This penalty cost of switching a nodeid between workers is # worker_behaviors_per_second * node_startup_cost_seconds, ie the number of - # behaviors we expect to lose by spending time starting up this node. + # behaviors we expect to lose by spending time starting up this node. We + # only switch a nodeid between workers if the expected gain of doing so is + # greater than this penalty. Note that in practice the penalty is applied + # during assignment of nodes to workers in the greedy algorithm, so the penalty + # may not produce optimal results, since the greedy solution is only an + # approximation in general. random = Random() if current_workers is None: @@ -114,7 +119,7 @@ def distribute_nodes( targets.append(targets[-target_idx]) target_idx = (target_idx + 1) % len(targets) - # then, we assign each target to the worker with the worst worker_behaviors. 
+ # then, we assign each target to the worker with the lowest worker_behaviors. # Since we're iterating over the targets in increasing order of behaviors # per-second, adding a target to a worker will always increase its # worker_behaviors. @@ -143,16 +148,17 @@ def worker_score( # want to increase the score of all other workers. So the offset here # should be positive. offset = worker_rates.per_second * target.e_startup_time + assert offset >= 0, offset # to avoid crazy rebalancing during the initial startup phase, don't # work with small lifetime estimators e_lifetime = max(e_lifetime, DEFAULT_EXPECTED_LIFETIME_ESTIMATOR) - return (worker_rates.per_second * e_lifetime) + offset + return e_worker_behaviors(rates=worker_rates, e_lifetime=e_lifetime) + offset for target in targets: # find all the workers with the minimum value score, and randomly assign # this target to one of them. Normally there won't be ties, and the target - # simply goes to the worst worker. But when fuzzing for the first time + # simply goes to the lowest worker. But when fuzzing for the first time # (or after a db wipe) where all targets have the same estimators, we # don't want to end in an assignment where one worker is given n - 1 nodes # and the other is given just 1. @@ -208,13 +214,26 @@ def e_worker_lifetime(current_lifetime: float) -> float: return current_lifetime * 2 +def e_worker_behaviors(rates: BehaviorRates, e_lifetime: float) -> float: + """ + An estimator for the total number of behaviors we expect a worker to discover + over its lifetime. + + `lifetime` is the estimator for the worker's total lifetime, given by + e_worker_lifetime. + """ + return rates.per_second * e_lifetime + + def e_worker_rates(*, target_rates: Sequence[BehaviorRates]) -> BehaviorRates: - weights = bandit_weights(target_rates) # the expected behavior rates of a worker is # sum(probability * expected_value) for each of its targets. 
- # Note that this is tightly dependent on the sampling algorithm used in - # practice by the workers. If that changes (to e.g. thompson sampling), our - # estimators will need to change to use the same sampling algorithm as well. + # + # Note that this estimator is tightly dependent on the sampling algorithm used + # in practice by the workers. If that changes (to e.g. thompson sampling), this + # estimator will need to change to use the same sampling algorithm as well. + # (ie, both places need to continue calling the same bandit_weights function). + weights = bandit_weights(target_rates) return BehaviorRates( per_input=sum(p * rates.per_input for p, rates in zip(weights, target_rates)), per_second=sum(p * rates.per_second for p, rates in zip(weights, target_rates)), diff --git a/tests/test_bayes.py b/tests/test_bayes.py index 69fea904..882146d8 100644 --- a/tests/test_bayes.py +++ b/tests/test_bayes.py @@ -21,10 +21,10 @@ nodeid=st.integers(0, 1000).map(lambda n: f"node{n}"), rates=st.builds( BehaviorRates, - per_input=st.floats(min_value=0, max_value=10**10, exclude_min=True), - per_second=st.floats(min_value=0, max_value=10**10, exclude_min=True), + per_input=st.floats(min_value=0, max_value=10e6, exclude_min=True), + per_second=st.floats(min_value=0, max_value=10e6, exclude_min=True), ), - e_startup_time=st.floats(min_value=0, max_value=10**10, exclude_min=True), + e_startup_time=st.floats(min_value=0, max_value=10e6, exclude_min=True), ), min_size=1, ),