Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion agentevac/agents/agent_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
"""

import math
import random
from dataclasses import dataclass, field
from typing import Any, Dict, List
from typing import Any, Dict, List, Tuple


@dataclass
Expand Down Expand Up @@ -72,6 +73,49 @@ class AgentRuntimeState:
AGENT_STATES: Dict[str, AgentRuntimeState] = {}


def sample_profile_params(
agent_id: str,
means: Dict[str, float],
spreads: Dict[str, float],
bounds: Dict[str, Tuple[float, float]],
) -> Dict[str, float]:
"""Sample per-agent profile parameters from truncated normal distributions.

Each parameter is drawn from ``N(mean, spread)`` and clipped to ``[lo, hi]``.
When ``spread <= 0`` the mean is returned unchanged (no heterogeneity).

A deterministic RNG seeded by ``agent_id`` ensures that the same agent always
receives the same profile regardless of which code path creates it first.

Args:
agent_id: Vehicle ID used to seed the per-agent RNG.
means: Dict of parameter names to population means.
spreads: Dict of parameter names to population standard deviations.
Missing keys or values <= 0 disable sampling for that parameter.
bounds: Dict of parameter names to ``(lo, hi)`` clipping bounds.

Returns:
A dict of sampled parameter values, one per key in ``means``.
"""
rng = random.Random(hash(agent_id))
result: Dict[str, float] = {}
for key, mu in means.items():
sigma = float(spreads.get(key, 0.0))
lo, hi = bounds.get(key, (mu, mu))
if sigma <= 0.0:
result[key] = mu
else:
# Rejection-sample from truncated normal (bounded).
for _ in range(100):
v = rng.gauss(mu, sigma)
if lo <= v <= hi:
result[key] = round(v, 4)
break
else:
result[key] = round(max(lo, min(hi, mu)), 4)
return result


def ensure_agent_state(
agent_id: str,
sim_t_s: float,
Expand Down
43 changes: 43 additions & 0 deletions agentevac/agents/belief_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,45 @@ def bucket_uncertainty(entropy_norm: float) -> str:
return "High"


def compute_signal_conflict(
env_belief: Dict[str, float],
social_belief: Dict[str, float],
) -> float:
"""Measure disagreement between env and social beliefs via Jensen-Shannon divergence.

JSD is symmetric, bounded [0, ln 2], and information-theoretic — consistent with
the entropy framework used elsewhere in this module. The raw JSD is normalized
by ln(2) so the return value lies in [0, 1]:

0 = sources perfectly agree
1 = sources maximally disagree (e.g., one says safe, the other says danger)

This score is recorded for post-hoc RQ1 analysis and surfaced in the LLM prompt
so the agent can reason about contradictions between its own observation and
neighbor messages.

Args:
env_belief: Belief derived from the agent's own hazard observation.
social_belief: Belief inferred from neighbor inbox messages.

Returns:
Normalized JSD ∈ [0, 1].
"""
keys = ("p_safe", "p_risky", "p_danger")
env = _normalize_triplet(env_belief)
soc = _normalize_triplet(social_belief)
m = {k: 0.5 * env[k] + 0.5 * soc[k] for k in keys}

def _kl(p: Dict[str, float], q: Dict[str, float]) -> float:
return sum(
max(1e-12, p[k]) * math.log(max(1e-12, p[k]) / max(1e-12, q[k]))
for k in keys
)

jsd = 0.5 * _kl(env, m) + 0.5 * _kl(soc, m)
return _clamp(jsd / math.log(2), 0.0, 1.0)


def update_agent_belief(
prev_belief: Dict[str, float],
env_signal: Dict[str, Any],
Expand Down Expand Up @@ -252,6 +291,7 @@ def update_agent_belief(
- p_safe, p_risky, p_danger : smoothed posterior probabilities
- entropy, entropy_norm : Shannon entropy (raw and normalized)
- uncertainty_bucket : "Low", "Medium", or "High"
- signal_conflict : JSD between env and social beliefs [0, 1]
- env_weight, social_weight : fusion weights applied this round
- env_belief, social_belief : component beliefs before fusion
"""
Expand All @@ -264,12 +304,14 @@ def update_agent_belief(
fused = fuse_env_and_social_beliefs(env_belief, social_belief, theta_trust)
social_weight = _clamp(theta_trust, 0.0, 1.0)
env_weight = 1.0 - social_weight
conflict = compute_signal_conflict(env_belief, social_belief)
else:
# No messages in inbox: rely entirely on own environmental observation.
social_belief = {"p_safe": 1.0 / 3.0, "p_risky": 1.0 / 3.0, "p_danger": 1.0 / 3.0}
fused = dict(env_belief)
social_weight = 0.0
env_weight = 1.0
conflict = 0.0

smoothed = smooth_belief(prev_belief or env_belief, fused, inertia=inertia)
entropy = compute_belief_entropy(smoothed)
Expand All @@ -282,6 +324,7 @@ def update_agent_belief(
"entropy": round(entropy, 4),
"entropy_norm": round(entropy_norm, 4),
"uncertainty_bucket": bucket_uncertainty(entropy_norm),
"signal_conflict": round(conflict, 4),
"env_weight": round(env_weight, 4),
"social_weight": round(social_weight, 4),
"env_belief": env_belief,
Expand Down
21 changes: 20 additions & 1 deletion agentevac/agents/information_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,20 @@ def inject_signal_noise(
signal: Dict[str, Any],
sigma_info: float,
rng: Optional[random.Random] = None,
distance_ref_m: float = 0.0,
) -> Dict[str, Any]:
"""Add zero-mean Gaussian noise to the observed fire margin.

Simulates imperfect environmental sensing (e.g., smoke, sensor noise, GPS error).
The noisy observation is clamped by the natural arithmetic (can go negative, meaning
the agent *believes* the fire has reached it even if it hasn't, or vice-versa).

When ``distance_ref_m > 0``, the effective noise standard deviation is scaled by
the ratio ``base_margin / distance_ref_m`` (proposal Eq. 1: ``Dist(s_t)``). This
models the perceptual reality that close fires are easy to judge while distant fires
are harder to assess. Setting ``distance_ref_m=0`` (default) disables scaling and
applies ``sigma_info`` uniformly (legacy behaviour).

If ``base_margin_m`` is absent (no fire active or edge not found), the function
returns the signal unchanged with ``observed_margin_m=None``.

Expand All @@ -60,6 +67,9 @@ def inject_signal_noise(
A value of 0 disables noise injection.
rng: Optional seeded ``random.Random`` instance for reproducible noise.
Falls back to the global ``random`` module if not provided.
distance_ref_m: Reference distance for distance-based noise scaling.
When > 0, effective sigma = sigma_info * (base_margin / distance_ref_m).
When 0, sigma_info is applied uniformly (no scaling).

Returns:
A shallow copy of ``signal`` with added fields:
Expand All @@ -76,6 +86,11 @@ def inject_signal_noise(
out["observed_state"] = "unknown"
return out

# Distance-based noise scaling (proposal Eq. 1): closer fire → less noise.
d_ref = float(distance_ref_m)
if d_ref > 0.0 and sigma > 0.0:
sigma = sigma * (max(0.0, float(base_margin)) / d_ref)

src = rng if rng is not None else random
noise_delta = float(src.gauss(0.0, sigma)) if sigma > 0.0 else 0.0
observed_margin = float(base_margin) + noise_delta
Expand Down Expand Up @@ -144,6 +159,7 @@ def sample_environment_signal(
decision_round: int,
sigma_info: float,
rng: Optional[random.Random] = None,
distance_ref_m: float = 0.0,
) -> Dict[str, Any]:
"""Build a noisy environmental hazard signal for one agent at one decision round.

Expand All @@ -163,6 +179,9 @@ def sample_environment_signal(
decision_round: Global decision-round counter (used as a history key).
sigma_info: Noise standard deviation in metres (0 = noiseless).
rng: Optional seeded RNG for reproducibility.
distance_ref_m: Reference distance for distance-based noise scaling (metres).
When > 0, noise sigma scales with base_margin / distance_ref_m.
When 0, sigma_info is applied uniformly (no scaling).

Returns:
A signal dict with fields including ``base_margin_m``, ``observed_margin_m``,
Expand All @@ -186,7 +205,7 @@ def sample_environment_signal(
"observed_margin_m": None,
"observed_state": "unknown",
}
return inject_signal_noise(signal, sigma_info, rng=rng)
return inject_signal_noise(signal, sigma_info, rng=rng, distance_ref_m=distance_ref_m)


def build_social_signal(
Expand Down
2 changes: 1 addition & 1 deletion agentevac/agents/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def scenario_prompt_suffix(mode: str) -> str:
if cfg["mode"] == "no_notice":
return (
"This is a no-notice wildfire scenario: do not assume official route instructions exist. "
"Rely mainly on subjective_information, inbox messages, and your own caution. "
"Rely mainly on your_observation, inbox messages, and your own caution. "
"Do NOT invent official instructions. Base decisions on environmental cues (smoke/flames/visibility), "
"your current hazard or forecast inputs if provided, and peer-to-peer messages. Seek credible info when available "
", and choose conservative actions if uncertain."
Expand Down
Loading
Loading