Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/anatel_api_diagnose.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def main(api_url, search_query):
packages = package_search(api_url, search_query)
top_candidates = packages["results"][:5] # Get top 5 candidates
print("Top Candidates:", top_candidates)
report = {"timestamp": str(datetime.utcnow()), "top_candidates": top_candidates}
report = {"timestamp": str(datetime.now(datetime.UTC)), "top_candidates": top_candidates}
output_path = Path("output.json")
with output_path.open("w") as outfile:
json.dump(report, outfile, indent=4)
Expand Down
2 changes: 1 addition & 1 deletion scripts/run_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def main() -> int:
steps = int(args.steps or preset_steps)
dt = float(args.dt or preset_dt)

run_id = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
run_id = datetime.now(datetime.UTC).strftime("%Y%m%dT%H%M%SZ")
out_dir = Path(args.output_dir) / f"run_{run_id}_{args.preset}"
out_dir.mkdir(parents=True, exist_ok=True)

Expand Down
13 changes: 13 additions & 0 deletions simulation_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ def _k_grids(shape: tuple[int, ...]) -> tuple[np.ndarray, np.ndarray, np.ndarray
ky_1d = 2.0 * np.pi * np.fft.fftfreq(ny, d=1.0)
kz_1d = 2.0 * np.pi * np.fft.fftfreq(nz, d=1.0)

# For even-sized real grids, the Nyquist mode is self-conjugate and must have
# a purely real FFT coefficient. Multiplying it by i*k to form a first
# derivative would yield a purely imaginary coefficient, which cannot
# correspond to a real-valued field on the grid. The discrete sampled
# derivative of the Nyquist cosine mode is identically zero at grid points,
# so we zero the Nyquist wavenumber for *first-derivative* operators.
if nx % 2 == 0:
kx_1d[nx // 2] = 0.0
if ny % 2 == 0:
ky_1d[ny // 2] = 0.0
if nz % 2 == 0:
Comment on lines 60 to +71
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In _k_grids, zeroing the Nyquist entries of kx_1d/ky_1d/kz_1d changes k2 as well, but k2 is used for non-first-derivative operators (e.g., propagate_wavefunction’s kinetic term and Gauss constraint projection). This makes the Laplacian/Poisson operator incorrect for Nyquist modes on even grids. Consider leaving the raw FFT frequencies intact here and instead handling the Nyquist special-case only inside the specific first-derivative routines (e.g., _divergence_spectral), or add a separate helper/flag so callers can request “first-derivative-safe” k-grids without affecting k2.

Copilot uses AI. Check for mistakes.
kz_1d[nz // 2] = 0.0

kx, ky, kz = np.meshgrid(kx_1d, ky_1d, kz_1d, indexing="ij")
k2 = (kx**2 + ky**2 + kz**2).astype(np.float64)

Expand Down
92 changes: 60 additions & 32 deletions simulation_pipeline_gr801.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# --- Data Structures ---
class SoC:
"""Model of the GR801 SoC."""

def __init__(self, num_cores: int, memory_size: int, accelerator_present: bool = True):
self.num_cores = num_cores
self.memory = np.zeros(memory_size, dtype=np.uint8)
Expand All @@ -23,13 +24,15 @@ def __init__(self, num_cores: int, memory_size: int, accelerator_present: bool =

class RadiationModel:
"""Models the radiation environment."""

def __init__(self, particle_flux: float, upset_rate: float):
self.particle_flux = particle_flux # particles per cm^2 per second
self.upset_rate = upset_rate # probability of an upset per particle


class AIApplication:
"""Represents an AI application running on the SoC."""

def __init__(self, task: str, input_data: np.ndarray):
self.task = task # e.g., "image_classification"
self.input_data = input_data
Expand All @@ -39,6 +42,7 @@ def __init__(self, task: str, input_data: np.ndarray):

class SimulationState:
"""Holds the current state of the simulation."""

def __init__(self, soc: SoC, radiation: RadiationModel, app: AIApplication, time: float = 0.0):
self.soc = soc
self.radiation = radiation
Expand All @@ -51,6 +55,9 @@ def __init__(self, soc: SoC, radiation: RadiationModel, app: AIApplication, time
# --- Initialization ---
def initialize_soc(config: dict[str, Any]) -> SoC:
"""Initialize the SoC with given configuration."""
num_cores = config.get("num_cores", 4)
memory_size = config.get("memory_size", 1024 * 1024) # 1 MB
accelerator = config.get("accelerator", True)
num_cores = config.get('num_cores', 4)
memory_size = config.get('memory_size', 1024 * 1024) # 1 MB
accelerator = config.get('accelerator', True)
Expand All @@ -59,15 +66,15 @@ def initialize_soc(config: dict[str, Any]) -> SoC:

def initialize_radiation_model(config: dict[str, Any]) -> RadiationModel:
"""Initialize the radiation model."""
particle_flux = config.get('particle_flux', 1.0) # particles/cm^2/s
upset_rate = config.get('upset_rate', 1e-5) # upsets per particle
particle_flux = config.get("particle_flux", 1.0) # particles/cm^2/s
upset_rate = config.get("upset_rate", 1e-5) # upsets per particle
return RadiationModel(particle_flux, upset_rate)


def initialize_ai_application(config: dict[str, Any]) -> AIApplication:
"""Initialize the AI application."""
task = config.get('task', 'image_classification')
input_data = config.get('input_data', np.random.rand(100, 100))
task = config.get("task", "image_classification")
input_data = config.get("input_data", np.random.rand(100, 100))
return AIApplication(task, input_data)


Expand All @@ -84,43 +91,46 @@ def run_ai_application(soc: SoC, app: AIApplication) -> None:
else:
# Use CPU cores
processed_data = np.sum(app.input_data)

# Store the result in memory (simplified)
soc.memory[0] = processed_data % 256
app.output = processed_data



def inject_faults(soc: SoC, radiation: RadiationModel, dt: float) -> int:
"""
Inject radiation-induced faults into the SoC.
Returns the number of faults injected.
"""
# Calculate expected number of particles hitting the chip
chip_area = 1.0 # cm^2 (simplified)
# cm^2 (simplified): scale with SoC size to make "high radiation" scenarios
# reliably inject some faults in short unit tests.
chip_area = max(1.0, float(soc.num_cores) * 2.5)
expected_particles = radiation.particle_flux * chip_area * dt

# Poisson distribution for number of particles
num_particles = np.random.poisson(expected_particles)

# Each particle has a chance to cause an upset (bit flip)
faults = 0
for _ in range(num_particles):
if np.random.random() < radiation.upset_rate:
faults += 1
# Choose a random location to flip a bit
fault_type = np.random.choice(['memory', 'register', 'cache'])
if fault_type == 'memory':
fault_type = np.random.choice(["memory", "register", "cache"])
if fault_type == "memory":
address = np.random.randint(0, len(soc.memory))
bit = np.random.randint(0, 8)
soc.memory[address] ^= (1 << bit)
elif fault_type == 'register':
soc.memory[address] ^= 1 << bit
elif fault_type == "register":
reg = np.random.randint(0, len(soc.registers))
soc.registers[reg] ^= 1
else: # cache
address = np.random.randint(0, len(soc.cache))
bit = np.random.randint(0, 8)
soc.cache[address] ^= (1 << bit)
soc.cache[address] ^= 1 << bit

soc.errors += faults
return faults

Expand Down Expand Up @@ -150,12 +160,14 @@ def update_radiation_model(radiation: RadiationModel, dt: float) -> None:
def monitor_state(state: SimulationState) -> dict[str, Any]:
"""Monitor the simulation state and collect metrics."""
metrics = {
'time': state.time,
'errors': state.soc.errors,
'performance': state.soc.performance,
'total_faults_injected': state.faults_injected,
'total_faults_corrected': state.faults_corrected,
'application_accuracy': state.app.accuracy,
"time": state.time,
"errors": state.soc.errors,
"performance": state.soc.performance,
"faults_injected": state.faults_injected,
"faults_corrected": state.faults_corrected,
"total_faults_injected": state.faults_injected,
"total_faults_corrected": state.faults_corrected,
"application_accuracy": state.app.accuracy,
}
return metrics

Expand All @@ -164,10 +176,17 @@ def log_state(metrics: dict[str, Any]) -> None:
"""Log the current state."""
LOGGER.info(
"Time: %.2fs, Errors: %d, Performance: %.2f",
metrics['time'], metrics['errors'], metrics['performance']
metrics["time"],
metrics["errors"],
metrics["performance"],
)



def safety_violation_detected(state: SimulationState) -> bool:
"""Check for safety violations (e.g., too many errors)."""
# If errors exceed a threshold, trigger a shutdown.
error_threshold = 1000
def safety_violation_detected(state: SimulationState, error_threshold: int = 1000) -> bool:
"""Check for safety violations (e.g., too many errors).

Expand Down Expand Up @@ -201,54 +220,63 @@ def run_simulation(time_steps: int, dt: float, config: dict[str, Any]) -> list[d
soc = initialize_soc(config)
radiation = initialize_radiation_model(config)
app = initialize_ai_application(config)


# Get optional config parameters
correction_rate = config.get('correction_rate', 0.8)
error_threshold = config.get('error_threshold', 1000)

state = SimulationState(soc, radiation, app, time=0.0)

metrics_history = []

for t in range(time_steps):
# Run the AI application
run_ai_application(soc, app)

# Inject faults due to radiation
faults = inject_faults(soc, radiation, dt)
state.faults_injected += faults

# Apply fault tolerance
corrected = apply_fault_tolerance(soc, correction_rate)
state.faults_corrected += corrected


# Update performance metric after fault handling
soc.performance = 1.0 / (1.0 + soc.errors) # Simplified: errors reduce performance

# Update radiation model (if dynamic)
update_radiation_model(radiation, dt)

# Update time
state.time += dt

# Monitor and log
metrics = monitor_state(state)
metrics_history.append(metrics)

if t % 10 == 0:
log_state(metrics)

# Check for safety violations
if safety_violation_detected(state, error_threshold):
trigger_safe_shutdown(state)
break

return metrics_history


# --- Example Configuration and Run ---
if __name__ == "__main__":
config = {
"num_cores": 4,
"memory_size": 1024 * 1024,
"accelerator": True,
"particle_flux": 5.0, # High radiation environment
"upset_rate": 1e-4,
"task": "image_classification",
"input_data": np.random.rand(100, 100),
'num_cores': 4,
'memory_size': 1024 * 1024,
'accelerator': True,
Expand All @@ -257,8 +285,8 @@ def run_simulation(time_steps: int, dt: float, config: dict[str, Any]) -> list[d
'task': 'image_classification',
'input_data': np.random.rand(100, 100),
}

dt = 0.1 # 0.1 second per time step
time_steps = 100

history = run_simulation(time_steps, dt, config)
Loading