import math
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from itertools import takewhile
from random import Random
from typing import TYPE_CHECKING, Any, Optional, TypeVar

if TYPE_CHECKING:
    from hypofuzz.hypofuzz import FuzzTarget

T = TypeVar("T")

# in the absence of any knowledge about worker lifetimes, assume a worker lives
# for 5 minutes.
DEFAULT_EXPECTED_LIFETIME_ESTIMATOR = 60 * 5


@dataclass
class CurrentWorker:
    # The node ids currently assigned to this worker.
    nodeids: Sequence[str]
    # The estimator for this worker's total lifetime, in seconds.
    e_lifetime: float


@dataclass
class DistributeNodesTarget:
    nodeid: str
    rates: "BehaviorRates"
    # The estimator for how long a worker spends starting up this node. It is
    # multiplied by a per-second rate in distribute_nodes, so it is in seconds.
    e_startup_time: float


@dataclass
class BehaviorRates:
    # An estimator for the number of behaviors the next input will discover. This
    # will be between 0 and 1.
    per_input: float
    # An estimator for the number of behaviors discovered per second for a target,
    # assuming one worker is fuzzing this target continuously over that second.
    per_second: float


def _min_values(values: Sequence[T], key: Callable[[T], Any]) -> Sequence[T]:
    """
    Return every element of `values` whose `key` is minimal, in input order.

    Returns an empty list if `values` is empty.
    """
    scores = [key(value) for value in values]
    if not scores:
        return []
    lowest = min(scores)
    return [value for value, score in zip(values, scores) if score == lowest]


def _repeat_descending(ordered: Sequence[T], n: int) -> list[T]:
    """
    Pad `ordered` (sorted in increasing order) to at least `n` items by cycling
    through the original items from last to first.

    Returns a plain copy if `ordered` is empty or already has `n` items.
    """
    padded = list(ordered)
    count = len(padded)
    if count == 0:
        return padded
    # Index only into the original `count` items. Indexing the growing list
    # with `-idx` is wrong twice over: `-0 == 0` selects the *lowest* item,
    # and later negative indices land on the copies that were just appended.
    padded.extend(ordered[count - 1 - (i % count)] for i in range(n - count))
    return padded


def distribute_nodes(
    targets: Sequence[DistributeNodesTarget],
    *,
    n: int,
    current_workers: Optional[Sequence[CurrentWorker]] = None,
) -> tuple[tuple[str, ...], ...]:
    """
    Partition `targets` across `n` workers.

    Args:
        targets: the nodes to distribute. Every target must have strictly
            positive `per_second` and `per_input` rates.
        n: the number of workers.
        current_workers: the current assignment of node ids to workers, used
            to penalize moving a node to a different worker. Must have
            length `n`. Defaults to `n` workers with no nodes.

    Returns:
        One tuple of node ids per worker, in the same order as
        `current_workers`. If there are fewer targets than workers, targets
        are repeated (highest `per_second` first) so that every worker
        receives exactly one. If `targets` is empty, every partition is empty.

    Ties between workers are broken with an unseeded `Random`, so the result
    is not deterministic when scores tie.
    """
    # We have x nodes node_i, each with an estimator \hat{v}_i for "behaviors per
    # second". We have n bins (workers), and we want to distribute node_i
    # into the n bins such that we maximize the sum of worker_behaviors.
    #
    # The estimator for the number of behaviors for a worker is given by
    # e_worker_behaviors. Instead of trying for the optimal solution of maximizing
    # the sum of worker_behaviors, we instead maximize the smallest worker_behaviors
    # quantity.
    #
    # This is related to "makespan minimization", and is a classic bin packing
    # problem. Solving it optimally is NP-hard, so we instead approximate
    # the optimal solution with a greedy one, specifically a variant of
    # "longest processing time first scheduling": we first sort the nodes in
    # increasing order of their estimator. Then, for each node, we check which
    # worker has the lowest worker_behaviors, and assign the node to that worker.
    # Since we are iterating in increasing order of estimator, we know that adding
    # a node to a worker will increase that worker's worker_behaviors (unless the
    # worker's scheduling algorithm for targets is literally adversarial, ie
    # adding a higher-than-average value target decreases its expected behaviors
    # per second, which we will assume is not the case).
    #
    # Optionally, the current assignment `current_workers` of node ids to workers
    # can be passed. This incorporates an overhead cost to switching a nodeid to a
    # different worker. The algorithm is the standard bin packing algorithm, but
    # with a penalty to a node being assigned to a worker other than its current
    # worker.
    #
    # This penalty cost of switching a nodeid between workers is
    # worker_behaviors_per_second * node_startup_cost_seconds, ie the number of
    # behaviors we expect to lose by spending time starting up this node. We
    # only switch a nodeid between workers if the expected gain of doing so is
    # greater than this penalty. Note that in practice the penalty is applied
    # during assignment of nodes to workers in the greedy algorithm, so the penalty
    # may not produce optimal results, since the greedy solution is only an
    # approximation in general.

    rng = Random()
    if current_workers is None:
        current_workers = [CurrentWorker(nodeids=[], e_lifetime=0.0) for _ in range(n)]

    assert len(current_workers) == n

    # estimators of 0 are mathematically valid, but can lead to bad/pathological
    # algorithm outcomes
    assert all(target.rates.per_second > 0 for target in targets)
    assert all(target.rates.per_input > 0 for target in targets)

    # return partitions in the same iteration order they were passed in current_workers
    workers: list[dict[str, Any]] = [
        {"current_worker": worker, "targets": []} for worker in current_workers
    ]

    # first, we sort the targets in increasing order by their estimator.
    ordered = sorted(targets, key=lambda target: target.rates.per_second)

    # If we have fewer than `n` targets, we repeat the list of targets in decreasing
    # order of their estimator until we reach `n` targets. This ensures every
    # worker receives at least one target (in fact, in this case, exactly one).
    ordered = _repeat_descending(ordered, n)

    # then, we assign each target to the worker with the lowest worker_behaviors.
    # Since we're iterating over the targets in increasing order of behaviors
    # per-second, adding a target to a worker will always increase its
    # worker_behaviors.
    def worker_score(
        worker: dict[str, Any], *, target: Optional[DistributeNodesTarget] = None
    ) -> float:
        e_lifetime: float = worker["current_worker"].e_lifetime
        worker_rates = e_worker_rates(
            target_rates=[assigned.rates for assigned in worker["targets"]],
        )
        offset = 0.0
        if target is not None and target.nodeid not in worker["current_worker"].nodeids:
            # Add a penalty for switching nodes between workers. Since the ordering
            # quantity is the e_worker_behaviors estimator of lifetime worker
            # behaviors, we want to allow a node to switch workers if the ev
            # differential is greater than the number of behaviors we expect to
            # lose from spending time starting up this worker.
            #
            # And the number of behaviors we expect to lose is the behaviors per
            # second estimator for the worker, times the estimator for the startup
            # time of this node.
            #
            # We are choosing the worker with the lowest score to add this node to,
            # so if we want to encourage this node to be assigned to its current
            # worker, we want that worker to have a low score, which means we
            # want to increase the score of all other workers. So the offset here
            # should be positive.
            offset = worker_rates.per_second * target.e_startup_time
            assert offset >= 0, offset

        # to avoid crazy rebalancing during the initial startup phase, don't
        # work with small lifetime estimators
        e_lifetime = max(e_lifetime, DEFAULT_EXPECTED_LIFETIME_ESTIMATOR)
        return e_worker_behaviors(rates=worker_rates, e_lifetime=e_lifetime) + offset

    for target in ordered:
        # find all the workers with the minimum value score, and randomly assign
        # this target to one of them. Normally there won't be ties, and the target
        # simply goes to the lowest worker. But when fuzzing for the first time
        # (or after a db wipe) where all targets have the same estimators, we
        # don't want to end in an assignment where one worker is given n - 1 nodes
        # and the other is given just 1.
        smallest_workers = _min_values(
            workers,
            key=lambda worker: worker_score(worker, target=target),
        )
        smallest_worker = rng.choice(smallest_workers)

        score_before = worker_score(smallest_worker)
        smallest_worker["targets"].append(target)
        # ignore float rounding errors for our invariant check
        assert worker_score(smallest_worker) - score_before >= -1e-6, (
            score_before,
            worker_score(smallest_worker),
        )

    return tuple(
        tuple(target.nodeid for target in worker["targets"]) for worker in workers
    )


# TODO for the behaviors estimators, we should incorporate a lookback across the
# history of workers for this test. Give higher weight to newer estimators
# (proportional to their confidence ie sample size).
def e_target_rates(target: "FuzzTarget") -> BehaviorRates:
    """
    Estimate the behavior discovery rates of `target` from its provider's
    `since_new_behavior`, `ninputs` and `elapsed_time` counters.
    """
    # per_input computation: the reciprocal of the number of inputs since the
    # last new behavior, defaulting to 1 when that count is not positive.
    since = target.provider.since_new_behavior
    per_input = (1 / since) if since > 0 else 1.0

    # per_second computation: per_input scaled by the observed inputs per second,
    # defaulting to 1 when no time has elapsed yet.
    ninputs = target.provider.ninputs
    elapsed_time = target.provider.elapsed_time
    if elapsed_time == 0:
        per_second = 1.0
    else:
        inputs_per_second = ninputs / elapsed_time
        per_second = per_input * inputs_per_second

    return BehaviorRates(per_input=per_input, per_second=per_second)


def e_worker_lifetime(current_lifetime: float) -> float:
    """
    An estimator for the total lifetime of a worker.
    """
    # We use the doomsday-argument estimator that the total lifetime is twice the
    # current lifetime. In the future, this could incorporate past worker
    # lifetimes as well.
    return current_lifetime * 2


def e_worker_behaviors(rates: BehaviorRates, e_lifetime: float) -> float:
    """
    An estimator for the total number of behaviors we expect a worker to discover
    over its lifetime.

    `e_lifetime` is the estimator for the worker's total lifetime, given by
    e_worker_lifetime.
    """
    return rates.per_second * e_lifetime


def e_worker_rates(*, target_rates: Sequence[BehaviorRates]) -> BehaviorRates:
    """
    An estimator for the behavior rates of a worker fuzzing the targets whose
    rates are given by `target_rates`.
    """
    # the expected behavior rates of a worker is
    # sum(probability * expected_value) for each of its targets.
    #
    # Note that this estimator is tightly dependent on the sampling algorithm used
    # in practice by the workers. If that changes (to e.g. thompson sampling), this
    # estimator will need to change to use the same sampling algorithm as well.
    # (ie, both places need to continue calling the same bandit_weights function).
    weights = bandit_weights(target_rates)
    return BehaviorRates(
        per_input=sum(p * rates.per_input for p, rates in zip(weights, target_rates)),
        per_second=sum(p * rates.per_second for p, rates in zip(weights, target_rates)),
    )


# `softmax` is defined elsewhere in this module.
def bandit_weights(behavior_rates: Sequence[BehaviorRates]) -> list[float]:
    """
    Returns the probability that each target should be chosen, as a solution
    to the multi-armed-bandit problem.
    """

    # choose the next target to fuzz with probability equal to the softmax
    # of its expected value (behaviors per second), aka boltzmann exploration
    per_second_estimators = [rates.per_second for rates in behavior_rates]
    return softmax(per_second_estimators)
We don't expect this to happen normally, @@ -494,8 +496,12 @@ def start(self) -> None: if self.random.random() < 0.01: target = self.random.choice(self.valid_targets) else: + behaviors_rates = [ + e_target_rates(target) for target in self.valid_targets + ] + weights = bandit_weights(behaviors_rates) target = self.random.choices( - self.valid_targets, weights=estimators, k=1 + self.valid_targets, weights=weights, k=1 )[0] self._switch_to_target(target) @@ -507,15 +513,14 @@ def start(self) -> None: worker_state = self.shared_state["worker_state"] worker_state["nodeids"][target.nodeid] = { - "behavior_rates": None, + "behavior_rates": e_target_rates(target), } # give the hub up-to-date estimator states current_lifetime = time.perf_counter() - self.worker_start worker_state = self.shared_state["worker_state"] worker_state["current_lifetime"] = current_lifetime - worker_state["expected_lifetime"] = None - + worker_state["expected_lifetime"] = e_worker_lifetime(current_lifetime) worker_state["valid_nodeids"] = [ target.nodeid for target in self.valid_targets ] @@ -588,18 +593,48 @@ def _rebalance(self) -> None: # and only assign more as the estimator says it's worthwhile. assert len(self.shared_states) == self.n_processes - partitions = [] - for i in range(self.n_processes): - # Round-robin for large test suites; all-on-all for tiny, etc. 
- nodeids: set[str] = set() - for ix in range(self.n_processes): - nodeids.update( - nodeid for nodeid in self.nodeids[i + ix :: self.n_processes] - ) - if len(nodeids) >= 10: # enough to prioritize between - break - partitions.append(nodeids) + current_workers = [ + CurrentWorker( + nodeids=state["worker_state"]["nodeids"].keys(), + e_lifetime=state["worker_state"]["expected_lifetime"], + ) + for state in self.shared_states + ] + + # fill with default estimators, for the first-time startup + # nodeid: (worker_lifetime, rates) + targets = { + nodeid: (0.0, BehaviorRates(per_second=1.0, per_input=1.0)) + for nodeid in self.nodeids + } + for state in self.shared_states: + worker_state = state["worker_state"] + worker_lifetime = worker_state["current_lifetime"] + for nodeid, rates in worker_state["nodeids"].items(): + if nodeid not in targets: + targets[nodeid] = (worker_lifetime, rates) + + # if the nodeid already exists, but we have a better estimator + # for it, replace it + if worker_lifetime > targets[nodeid][0]: + targets[nodeid] = (worker_lifetime, rates) + + # TODO estimate startup time of this target + # ("number of corpus elements" * "average input runtime" probably?) + targets = [ + DistributeNodesTarget(nodeid=nodeid, rates=rates, e_startup_time=0) + for nodeid, (_lifetime, rates) in targets.items() + ] + partitions = distribute_nodes( + targets, + n=self.n_processes, + current_workers=current_workers, + ) + # communicate the worker's new nodeids back to the worker. 
import itertools
from collections.abc import Sequence
from typing import Any

import pytest
from hypothesis import given, note, strategies as st

from hypofuzz.bayes import (
    BehaviorRates,
    DistributeNodesTarget,
    distribute_nodes,
    softmax,
)


@given(
    st.integers(1, 100),
    st.lists(
        st.builds(
            DistributeNodesTarget,
            nodeid=st.integers(0, 1000).map(lambda n: f"node{n}"),
            rates=st.builds(
                BehaviorRates,
                per_input=st.floats(min_value=0, max_value=10e6, exclude_min=True),
                per_second=st.floats(min_value=0, max_value=10e6, exclude_min=True),
            ),
            e_startup_time=st.floats(min_value=0, max_value=10e6, exclude_min=True),
        ),
        min_size=1,
    ),
)
def test_distribute_nodes(n, targets):
    # every worker gets a non-empty partition, and no target is dropped.
    partitions = distribute_nodes(targets, n=n, current_workers=None)

    assert len(partitions) == n
    total_nodes = sum(len(partition) for partition in partitions)
    # Targets are repeated up to `n` when there are fewer targets than workers.
    # This must be a single comparison: `a == n if cond else m` parses as
    # `(a == n) if cond else m`, which only asserts that `m` is truthy.
    assert total_nodes == max(n, len(targets))
    assert all(len(partition) > 0 for partition in partitions)


@given(st.integers(4, 100))
def test_distribute_nodes_more_processes_than_nodes(n):
    # when there are more processes than nodes, we should see the highest value
    # nodes getting assigned to processes first.
    targets = [
        DistributeNodesTarget(
            nodeid=str(i + 1),
            rates=BehaviorRates(per_second=rate, per_input=rate),
            e_startup_time=0.0,
        )
        for i, rate in enumerate([1, 3, 2])
    ]
    # NOTE(review): comparing as sets collapses duplicate partitions, so this
    # only checks which single-node partitions occur, not how many workers
    # receive each node.
    assert set(distribute_nodes(targets, n=n)) == set(
        itertools.islice(itertools.cycle([("2",), ("3",), ("1",)]), n)
    )


def _targets(*targets: Sequence[Any]) -> list[DistributeNodesTarget]:
    # Build targets from (nodeid, (per_second, per_input), startup_time) tuples.
    return [
        DistributeNodesTarget(
            nodeid=nodeid,
            rates=BehaviorRates(per_second=per_second, per_input=per_input),
            e_startup_time=startup_time,
        )
        for nodeid, (per_second, per_input), startup_time in targets
    ]


@pytest.mark.parametrize(
    "targets, n, expected",
    [
        (
            _targets(("a", (1, 1), 0), ("b", (2, 2), 0), ("c", (3, 3), 0)),
            1,
            {("a", "b", "c")},
        ),
        (
            _targets(("a", (1, 1), 0), ("b", (2, 2), 0), ("c", (3, 3), 0)),
            2,
            {("a", "c"), ("b",)},
        ),
        (
            _targets(("a", (1, 1), 0), ("b", (2, 2), 0), ("c", (3, 3), 0)),
            3,
            {("a",), ("b",), ("c",)},
        ),
    ],
)
def test_distribute_nodes_explicit(targets, n, expected):
    assert set(distribute_nodes(targets, n=n)) == expected


# TODO: test current_workers and e_startup_time, specifically that the penalty for
# a node switching workers works as expected