Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 27 additions & 40 deletions src/accessiweather/alert_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
alert identification and user controls.
"""

import json
import logging
import os
import time
from collections import deque
from datetime import UTC, datetime, timedelta
Expand All @@ -28,6 +26,7 @@
SEVERITY_PRIORITY_MAP,
)
from .models import WeatherAlert, WeatherAlerts
from .runtime_state import RuntimeStateManager

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -251,12 +250,19 @@ def should_notify_category(self, event: str) -> bool:
class AlertManager:
"""Manages alert state tracking, change detection, and notifications."""

def __init__(self, config_dir: str, settings: AlertSettings | None = None):
def __init__(
self,
config_dir: str,
settings: AlertSettings | None = None,
runtime_state_manager: RuntimeStateManager | None = None,
):
"""Initialize the instance."""
self.config_dir = Path(config_dir)
self.config_dir.mkdir(parents=True, exist_ok=True)

self.state_file = self.config_dir / "alert_state.json"
self.runtime_state_manager = runtime_state_manager or RuntimeStateManager(self.config_dir)
self.state_file = self.runtime_state_manager.state_file
self.legacy_state_file = self.runtime_state_manager.legacy_alert_state_file
self.settings = settings or AlertSettings()

# In-memory state tracking
Expand All @@ -276,7 +282,7 @@ def __init__(self, config_dir: str, settings: AlertSettings | None = None):
# Defer state loading until first use for faster startup
self._state_loaded = False

logger.info(f"AlertManager initialized with state file: {self.state_file}")
logger.info("AlertManager initialized with runtime state file: %s", self.state_file)

def _ensure_state_loaded(self):
"""Ensure state is loaded before first use (lazy loading)."""
Expand All @@ -288,24 +294,18 @@ def _ensure_state_loaded(self):
def _load_state(self):
"""Load alert state from persistent storage."""
try:
if self.state_file.exists():
with open(self.state_file) as f:
data = json.load(f)

# Load alert states
for state_data in data.get("alert_states", []):
state = AlertState.from_dict(state_data)
self.alert_states[state.alert_id] = state

# Load global state
if data.get("last_global_notification"):
self.last_global_notification = datetime.fromisoformat(
data["last_global_notification"]
)

logger.info(f"Loaded {len(self.alert_states)} alert states from storage")
else:
logger.info("No existing alert state file found, starting fresh")
data = self.runtime_state_manager.load_section("alerts")

for state_data in data.get("alert_states", []):
state = AlertState.from_dict(state_data)
self.alert_states[state.alert_id] = state

if data.get("last_global_notification"):
self.last_global_notification = datetime.fromisoformat(
data["last_global_notification"]
)

logger.info("Loaded %d alert states from runtime storage", len(self.alert_states))

except Exception as e:
logger.error(f"Failed to load alert state: {e}")
Expand All @@ -324,24 +324,11 @@ def _save_state(self):
if self.last_global_notification
else None
),
"saved_at": datetime.now(UTC).isoformat(),
}

# Write atomically
temp_file = self.state_file.with_suffix(".tmp")
with open(temp_file, "w") as f:
json.dump(data, f, indent=2)

temp_file.replace(self.state_file)

# Set secure permissions on POSIX systems
try:
if os.name != "nt":
os.chmod(self.state_file, 0o600)
except Exception:
logger.debug("Could not set strict permissions on alert state file", exc_info=True)

logger.debug(f"Saved {len(self.alert_states)} alert states to storage")
if self.runtime_state_manager.save_section("alerts", data):
logger.debug("Saved %d alert states to runtime storage", len(self.alert_states))
else:
logger.error("Failed to save alert runtime state")

except Exception as e:
logger.error(f"Failed to save alert state: {e}")
Expand Down
8 changes: 6 additions & 2 deletions src/accessiweather/app_initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,13 @@ def initialize_components(app: AccessiWeatherApp) -> None:
from .alert_manager import AlertManager
from .alert_notification_system import AlertNotificationSystem

config_dir_str = str(app.paths.config)
config_dir_str = str(app.config_manager.config_dir)
alert_settings = config.settings.to_alert_settings()
app.alert_manager = AlertManager(config_dir_str, alert_settings)
app.alert_manager = AlertManager(
config_dir_str,
alert_settings,
runtime_state_manager=app.runtime_state_manager,
)
app.alert_notification_system = AlertNotificationSystem(
app.alert_manager, app._notifier, config.settings
)
Expand Down
80 changes: 66 additions & 14 deletions src/accessiweather/notifications/notification_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from pathlib import Path
from typing import TYPE_CHECKING

from ..runtime_state import RuntimeStateManager

if TYPE_CHECKING:
from ..models import AppSettings, CurrentConditions, WeatherData

Expand Down Expand Up @@ -217,45 +219,95 @@ class NotificationEventManager:
Both notifications are opt-in (disabled by default).
"""

def __init__(self, state_file: Path | None = None):
def __init__(
self,
state_file: Path | None = None,
runtime_state_manager: RuntimeStateManager | None = None,
):
"""
Initialize the notification event manager.

Args:
state_file: Optional path to persist notification state
runtime_state_manager: Optional unified runtime-state manager

"""
self.state_file = state_file
self.runtime_state_manager = runtime_state_manager
self.state = NotificationState()
self._load_state()
logger.info("NotificationEventManager initialized")

def _load_state(self) -> None:
"""Load state from file if available."""
if not self.state_file or not self.state_file.exists():
return

try:
with open(self.state_file, encoding="utf-8") as f:
data = json.load(f)
if self.runtime_state_manager is not None:
data = self._runtime_section_to_legacy_shape(
self.runtime_state_manager.load_section("notification_events")
)
elif self.state_file and self.state_file.exists():
with open(self.state_file, encoding="utf-8") as f:
data = json.load(f)
else:
return
self.state = NotificationState.from_dict(data)
logger.debug("Loaded notification state from %s", self.state_file)
logger.debug(
"Loaded notification state from %s",
self.runtime_state_manager.state_file
if self.runtime_state_manager
else self.state_file,
)
except Exception as e:
logger.warning("Failed to load notification state: %s", e)

def _save_state(self) -> None:
"""Save state to file if configured."""
if not self.state_file:
return

try:
self.state_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.state_file, "w", encoding="utf-8") as f:
json.dump(self.state.to_dict(), f, indent=2)
logger.debug("Saved notification state to %s", self.state_file)
if self.runtime_state_manager is not None:
self.runtime_state_manager.save_section(
"notification_events",
self._legacy_shape_to_runtime_section(self.state.to_dict()),
)
logger.debug(
"Saved notification state to %s", self.runtime_state_manager.state_file
)
elif self.state_file:
self.state_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.state_file, "w", encoding="utf-8") as f:
json.dump(self.state.to_dict(), f, indent=2)
logger.debug("Saved notification state to %s", self.state_file)
except Exception as e:
logger.warning("Failed to save notification state: %s", e)

@staticmethod
def _runtime_section_to_legacy_shape(section: dict) -> dict:
"""Convert unified runtime state to the legacy notification-state shape."""
discussion = section.get("discussion", {})
severe_risk = section.get("severe_risk", {})
return {
"last_discussion_issuance_time": discussion.get("last_issuance_time"),
"last_discussion_text": discussion.get("last_text"),
"last_severe_risk": severe_risk.get("last_value"),
"last_check_time": discussion.get("last_check_time")
or severe_risk.get("last_check_time"),
}

@staticmethod
def _legacy_shape_to_runtime_section(data: dict) -> dict:
"""Convert legacy notification-state payloads to the unified section shape."""
last_check_time = data.get("last_check_time")
return {
"discussion": {
"last_issuance_time": data.get("last_discussion_issuance_time"),
"last_text": data.get("last_discussion_text"),
"last_check_time": last_check_time,
},
"severe_risk": {
"last_value": data.get("last_severe_risk"),
"last_check_time": last_check_time,
},
}

def check_for_events(
self,
weather_data: WeatherData,
Expand Down
Loading
Loading