diff --git a/.gitignore b/.gitignore index c93dc49..df82bc0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,17 @@ +# Ignore most of extracted project except selected source files QIKI_DTMP/* -!QIKI_DTMP/docs/ -QIKI_DTMP/docs/* -!QIKI_DTMP/docs/file_analysis/ -!QIKI_DTMP/docs/file_analysis/*.md +!QIKI_DTMP/services/ +QIKI_DTMP/services/* +!QIKI_DTMP/services/q_core_agent/ +QIKI_DTMP/services/q_core_agent/* +!QIKI_DTMP/services/q_core_agent/state/ +QIKI_DTMP/services/q_core_agent/state/* +!QIKI_DTMP/services/q_core_agent/state/types.py +!QIKI_DTMP/services/q_core_agent/state/store.py +!QIKI_DTMP/services/q_core_agent/state/tests/ +QIKI_DTMP/services/q_core_agent/state/tests/* +!QIKI_DTMP/services/q_core_agent/state/tests/test_types.py +!QIKI_DTMP/services/q_sim_service/ +QIKI_DTMP/services/q_sim_service/* +!QIKI_DTMP/services/q_sim_service/main.py QIKI_DTMP.zip diff --git a/QIKI_DTMP/services/q_core_agent/state/store.py b/QIKI_DTMP/services/q_core_agent/state/store.py new file mode 100644 index 0000000..1455ffb --- /dev/null +++ b/QIKI_DTMP/services/q_core_agent/state/store.py @@ -0,0 +1,264 @@ +""" +AsyncStateStore - потокобезопасное хранилище FSM состояний. +Single Source of Truth (SSOT) для FSM состояния в Q-Core процессе. +""" +import asyncio +from typing import Optional, List, Callable, Any, Dict, Set +import logging +import time +from dataclasses import replace + +from .types import FsmSnapshotDTO, initial_snapshot + + +logger = logging.getLogger(__name__) + + +class StateStoreError(Exception): + """Базовое исключение для ошибок StateStore""" + pass + + +class StateVersionError(StateStoreError): + """Ошибка версионирования состояния""" + pass + + +class AsyncStateStore: + """ + Async-only StateStore для FSM состояния. + + Ключевые принципы: + - Только один писатель (FSMHandler) + - Множественные читатели (логи, gRPC, CLI) + - Pub/Sub через asyncio.Queue для подписчиков + - Версионирование и защита от дублирования + - Иммутабельные DTO снапшоты + """ + + def __init__(self, initial_state: Optional[FsmSnapshotDTO] = None): + self._lock = asyncio.Lock() + self._snap: Optional[FsmSnapshotDTO] = initial_state + self._subscribers: List[asyncio.Queue] = [] + self._subscriber_ids: Dict[int, str] = {} # для отладки + self._metrics: Dict[str, Any] = { + 'total_sets': 0, + 'total_gets': 0, + 'version_conflicts': 0, + 'subscriber_count': 0, + 'last_update_ts': 0.0, + 'creation_ts': time.time() + } + + async def get(self) -> Optional[FsmSnapshotDTO]: + """ + Получить текущий снапшот состояния. + Возвращает immutable DTO или None если состояние не инициализировано. + """ + async with self._lock: + self._metrics['total_gets'] += 1 + # DTO уже immutable, можно возвращать как есть + return self._snap + + async def get_with_meta(self) -> tuple[Optional[FsmSnapshotDTO], Dict[str, Any]]: + """Получить состояние с метаинформацией""" + async with self._lock: + self._metrics['total_gets'] += 1 + meta = { + 'store_metrics': dict(self._metrics), + 'subscriber_count': len(self._subscribers), + 'has_state': self._snap is not None, + 'current_version': self._snap.version if self._snap else -1 + } + return self._snap, meta + + async def set(self, new_snap: FsmSnapshotDTO, enforce_version: bool = False) -> FsmSnapshotDTO: + """ + Установить новое состояние. + + Args: + new_snap: Новый снапшот состояния + enforce_version: Если True, проверяет что версия новее текущей + + Returns: + Установленный снапшот (может отличаться от входного по версии) + + Raises: + StateVersionError: При нарушении версионности + """ + if new_snap is None: + raise StateStoreError("Попытка установить None состояние") + + async with self._lock: + if self._snap is None: + if enforce_version and new_snap.version <= 0: + self._metrics['version_conflicts'] += 1 + raise StateVersionError( + f"Версия {new_snap.version} не больше текущей 0" + ) + new_version = 1 if new_snap.version == 0 else new_snap.version + else: + current_version = self._snap.version + if enforce_version and new_snap.version <= current_version: + self._metrics['version_conflicts'] += 1 + raise StateVersionError( + f"Версия {new_snap.version} не больше текущей {current_version}" + ) + if new_snap.version <= current_version: + new_version = current_version + 1 + else: + new_version = new_snap.version + + new_snap = replace(new_snap, version=new_version) + + self._snap = new_snap + self._metrics['total_sets'] += 1 + self._metrics['last_update_ts'] = time.time() + + # Уведомляем подписчиков + await self._notify_subscribers(new_snap) + + state_name = getattr(new_snap.state, 'name', str(new_snap.state)) + logger.debug( + f"StateStore updated: version={new_snap.version}, " + f"state={state_name}, reason='{new_snap.reason}'" + ) + + return self._snap + + async def subscribe(self, subscriber_id: str = "unknown") -> asyncio.Queue: + """ + Подписаться на изменения состояния. + + Returns: + asyncio.Queue с FsmSnapshotDTO объектами при изменениях + """ + queue = asyncio.Queue(maxsize=64) + + async with self._lock: + self._subscribers.append(queue) + queue_id = id(queue) + self._subscriber_ids[queue_id] = subscriber_id + self._metrics['subscriber_count'] = len(self._subscribers) + + # Отправляем текущее состояние новому подписчику + if self._snap is not None: + try: + queue.put_nowait(self._snap) + except asyncio.QueueFull: + logger.warning(f"Queue full for new subscriber {subscriber_id}") + + logger.debug(f"New subscriber: {subscriber_id}, total: {len(self._subscribers)}") + + return queue + + async def unsubscribe(self, queue: asyncio.Queue): + """Отписаться от уведомлений""" + async with self._lock: + if queue in self._subscribers: + self._subscribers.remove(queue) + queue_id = id(queue) + subscriber_id = self._subscriber_ids.pop(queue_id, "unknown") + self._metrics['subscriber_count'] = len(self._subscribers) + logger.debug(f"Unsubscribed: {subscriber_id}, remaining: {len(self._subscribers)}") + + async def _notify_subscribers(self, snap: FsmSnapshotDTO): + """Уведомить всех подписчиков о новом состоянии""" + dead_queues = [] + + for queue in self._subscribers: + try: + queue.put_nowait(snap) + except asyncio.QueueFull: + # Очередь переполнена - считаем подписчика неактивным + queue_id = id(queue) + subscriber_id = self._subscriber_ids.get(queue_id, "unknown") + logger.warning(f"Subscriber {subscriber_id} queue full, removing") + dead_queues.append(queue) + except Exception as e: + # Очередь мертва - помечаем для удаления + queue_id = id(queue) + subscriber_id = self._subscriber_ids.get(queue_id, "unknown") + logger.warning(f"Dead subscriber {subscriber_id}: {e}") + dead_queues.append(queue) + + # Удаляем мертвые очереди + for dead_queue in dead_queues: + try: + self._subscribers.remove(dead_queue) + queue_id = id(dead_queue) + self._subscriber_ids.pop(queue_id, None) + except ValueError: + pass # Уже удалена + + if dead_queues: + self._metrics['subscriber_count'] = len(self._subscribers) + + async def initialize_if_empty(self) -> FsmSnapshotDTO: + """Инициализировать начальным состоянием если пусто""" + async with self._lock: + if self._snap is None: + self._snap = initial_snapshot() + self._metrics['total_sets'] += 1 + self._metrics['last_update_ts'] = time.time() + + # Уведомляем подписчиков + await self._notify_subscribers(self._snap) + + logger.info("StateStore initialized with COLD_START state") + + return self._snap + + async def get_metrics(self) -> Dict[str, Any]: + """Получить метрики работы StateStore""" + async with self._lock: + self._metrics['total_gets'] += 1 + uptime = time.time() - self._metrics['creation_ts'] + current_state_name = self._snap.state.name if self._snap else "UNINITIALIZED" + return { + **self._metrics, + 'uptime_seconds': uptime, + 'current_version': self._snap.version if self._snap else -1, + 'current_state': current_state_name, + 'active_subscribers': len(self._subscribers) + } + + async def health_check(self) -> Dict[str, Any]: + """Проверка здоровья StateStore""" + metrics = await self.get_metrics() + + health = { + 'healthy': True, + 'issues': [], + 'metrics': metrics + } + + # Проверки здоровья + if metrics['version_conflicts'] > metrics['total_sets'] * 0.1: + health['healthy'] = False + health['issues'].append("Высокий процент конфликтов версий") + + if len(self._subscribers) > 100: + health['issues'].append("Много подписчиков, возможна утечка") + + if self._snap is None: + health['healthy'] = False + health['issues'].append("StateStore не инициализирован") + + return health + + +# Удобные функции для создания store +def create_store(initial_state: Optional[FsmSnapshotDTO] = None) -> AsyncStateStore: + """Создать новый AsyncStateStore""" + return AsyncStateStore(initial_state) + + +def create_initialized_store() -> AsyncStateStore: + """Создать StateStore с начальным состоянием COLD_START""" + return AsyncStateStore(initial_snapshot()) + + +# Константы для тестирования +TEST_SUBSCRIBER_TIMEOUT = 5.0 # таймаут для тестовых подписчиков +MAX_QUEUE_SIZE = 64 # размер очереди подписчиков \ No newline at end of file diff --git a/QIKI_DTMP/services/q_core_agent/state/tests/test_types.py b/QIKI_DTMP/services/q_core_agent/state/tests/test_types.py new file mode 100644 index 0000000..ce9bd0e --- /dev/null +++ b/QIKI_DTMP/services/q_core_agent/state/tests/test_types.py @@ -0,0 +1,410 @@ +""" +Серьёзные unit тесты для DTO типов StateStore. +Проверяют immutability, создание снапшотов, переходы состояний. +""" +import pytest +import time +import uuid +from dataclasses import FrozenInstanceError + +from ..types import ( + FsmSnapshotDTO, TransitionDTO, FsmState, TransitionStatus, + initial_snapshot, create_transition, next_snapshot +) + + +class TestFsmState: + """Тесты enum'а FsmState""" + + def test_fsm_state_values(self): + """Проверяем корректность значений enum""" + assert FsmState.UNSPECIFIED == 0 + assert FsmState.BOOTING == 1 + assert FsmState.IDLE == 2 + assert FsmState.ACTIVE == 3 + assert FsmState.ERROR_STATE == 4 + assert FsmState.SHUTDOWN == 5 + + def test_fsm_state_names(self): + """Проверяем корректность имён enum""" + assert FsmState.BOOTING.name == 'BOOTING' + assert FsmState.IDLE.name == 'IDLE' + assert FsmState.ACTIVE.name == 'ACTIVE' + assert FsmState.ERROR_STATE.name == 'ERROR_STATE' + + +class TestTransitionDTO: + """Тесты TransitionDTO - immutable переходы состояний""" + + def test_transition_creation(self): + """Тест создания перехода""" + transition = TransitionDTO( + from_state=FsmState.BOOTING, + to_state=FsmState.IDLE, + trigger_event="BOOT_COMPLETE", + status=TransitionStatus.SUCCESS + ) + + assert transition.from_state == FsmState.BOOTING + assert transition.to_state == FsmState.IDLE + assert transition.trigger_event == "BOOT_COMPLETE" + assert transition.status == TransitionStatus.SUCCESS + assert transition.ts_mono > 0 # автоустановка времени + assert transition.ts_wall > 0 + + def test_transition_immutability(self): + """Тест неизменяемости TransitionDTO""" + transition = TransitionDTO( + from_state=FsmState.IDLE, + to_state=FsmState.ACTIVE, + trigger_event="TEST" + ) + + # Попытка изменения должна упасть + with pytest.raises(FrozenInstanceError): + transition.from_state = FsmState.BOOTING + + with pytest.raises(FrozenInstanceError): + transition.trigger_event = "MODIFIED" + + def test_transition_with_error(self): + """Тест перехода с ошибкой""" + transition = TransitionDTO( + from_state=FsmState.IDLE, + to_state=FsmState.ERROR_STATE, + trigger_event="BIOS_ERROR", + status=TransitionStatus.FAILED, + error_message="BIOS check failed" + ) + + assert transition.status == TransitionStatus.FAILED + assert transition.error_message == "BIOS check failed" + + def test_create_transition_helper(self): + """Тест helper функции create_transition""" + transition = create_transition( + from_state=FsmState.ACTIVE, + to_state=FsmState.IDLE, + trigger="NO_PROPOSALS", + status=TransitionStatus.SUCCESS, + error_msg="" + ) + + assert isinstance(transition, TransitionDTO) + assert transition.from_state == FsmState.ACTIVE + assert transition.to_state == FsmState.IDLE + assert transition.trigger_event == "NO_PROPOSALS" + + +class TestFsmSnapshotDTO: + """Тесты FsmSnapshotDTO - основной DTO для состояния FSM""" + + def test_snapshot_creation(self): + """Тест создания снапшота""" + snapshot = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + reason="TEST_REASON" + ) + + assert snapshot.version == 1 + assert snapshot.state == FsmState.IDLE + assert snapshot.reason == "TEST_REASON" + assert snapshot.ts_mono > 0 # автоустановка времени + assert snapshot.ts_wall > 0 + assert snapshot.snapshot_id # автогенерация UUID + assert snapshot.fsm_instance_id + + def test_snapshot_immutability(self): + """Тест неизменяемости FsmSnapshotDTO""" + snapshot = FsmSnapshotDTO(version=1, state=FsmState.IDLE) + + # Попытка изменения должна упасть + with pytest.raises(FrozenInstanceError): + snapshot.version = 2 + + with pytest.raises(FrozenInstanceError): + snapshot.state = FsmState.ACTIVE + + def test_snapshot_with_history(self): + """Тест снапшота с историей переходов""" + transition1 = create_transition(FsmState.BOOTING, FsmState.IDLE, "BOOT_COMPLETE") + transition2 = create_transition(FsmState.IDLE, FsmState.ACTIVE, "PROPOSALS_RECEIVED") + + snapshot = FsmSnapshotDTO( + version=2, + state=FsmState.ACTIVE, + history=[transition1, transition2] + ) + + assert len(snapshot.history) == 2 + assert snapshot.history[0] == transition1 + assert snapshot.history[1] == transition2 + + # История immutable + with pytest.raises(AttributeError): + snapshot.history.append(transition1) + + def test_snapshot_with_metadata(self): + """Тест снапшота с метаданными""" + context_data = {"sensor_count": "5", "proposal_count": "3"} + state_metadata = {"debug_mode": "true", "test_run": "false"} + + snapshot = FsmSnapshotDTO( + version=1, + state=FsmState.ACTIVE, + context_data=context_data, + state_metadata=state_metadata + ) + + assert snapshot.context_data == context_data + assert snapshot.state_metadata == state_metadata + + def test_snapshot_defaults(self): + """Тест значений по умолчанию""" + snapshot = FsmSnapshotDTO(version=0, state=FsmState.BOOTING) + + assert snapshot.prev_state is None + assert snapshot.source_module == "fsm_handler" + assert snapshot.attempt_count == 0 + assert snapshot.history == () + assert snapshot.context_data == {} + assert snapshot.state_metadata == {} + + def test_add_transition_history_limit(self): + """Проверяем ограничение истории при добавлении переходов""" + snap = FsmSnapshotDTO(version=1, state=FsmState.IDLE) + snap = snap.add_transition(create_transition(FsmState.IDLE, FsmState.ACTIVE, "E1"), max_history=2) + snap = snap.add_transition(create_transition(FsmState.ACTIVE, FsmState.ERROR_STATE, "E2"), max_history=2) + snap = snap.add_transition(create_transition(FsmState.ERROR_STATE, FsmState.IDLE, "E3"), max_history=2) + + assert len(snap.history) == 2 + assert [t.trigger_event for t in snap.history] == ["E2", "E3"] + + def test_snapshot_uuid_validation(self): + """Тест генерации и валидации UUID""" + snapshot = FsmSnapshotDTO(version=1, state=FsmState.IDLE) + + # UUID должны быть валидными + uuid.UUID(snapshot.snapshot_id) # не должно упасть + uuid.UUID(snapshot.fsm_instance_id) # не должно упасть + + # UUID должны быть уникальными + snapshot2 = FsmSnapshotDTO(version=1, state=FsmState.IDLE) + assert snapshot.snapshot_id != snapshot2.snapshot_id + assert snapshot.fsm_instance_id != snapshot2.fsm_instance_id + + +class TestInitialSnapshot: + """Тесты функции initial_snapshot""" + + def test_initial_snapshot_creation(self): + """Тест создания начального снапшота""" + snapshot = initial_snapshot() + + assert snapshot.version == 0 + assert snapshot.state == FsmState.BOOTING + assert snapshot.prev_state is None + assert snapshot.reason == "COLD_START" + assert snapshot.source_module == "initial_boot" + assert snapshot.attempt_count == 0 + + def test_initial_snapshot_immutability(self): + """Тест неизменяемости начального снапшота""" + snapshot = initial_snapshot() + + with pytest.raises(FrozenInstanceError): + snapshot.version = 1 + + with pytest.raises(FrozenInstanceError): + snapshot.state = FsmState.IDLE + + def test_initial_snapshot_timing(self): + """Тест временных меток начального снапшота""" + before = time.time() + snapshot = initial_snapshot() + after = time.time() + + # Времена должны быть в разумных пределах + assert before <= snapshot.ts_wall <= after + assert snapshot.ts_mono > 0 + + +class TestNextSnapshot: + """Тесты функции next_snapshot - ключевая для переходов""" + + def test_state_change_increments_version(self): + """Тест инкремента версии при изменении состояния""" + current = FsmSnapshotDTO(version=1, state=FsmState.BOOTING) + + next_snap = next_snapshot( + current=current, + new_state=FsmState.IDLE, + reason="BOOT_COMPLETE" + ) + + assert next_snap.version == 2 # версия увеличилась + assert next_snap.state == FsmState.IDLE + assert next_snap.prev_state == FsmState.BOOTING + assert next_snap.reason == "BOOT_COMPLETE" + assert next_snap.attempt_count == 1 # попытка увеличилась + + def test_no_state_change_keeps_version(self): + """Тест сохранения версии при отсутствии изменений""" + current = FsmSnapshotDTO(version=5, state=FsmState.IDLE) + + next_snap = next_snapshot( + current=current, + new_state=FsmState.IDLE, # то же состояние + reason="NO_CHANGE" + ) + + assert next_snap.version == 5 # версия не изменилась + assert next_snap.state == FsmState.IDLE + assert next_snap.prev_state == current.prev_state # сохранилось предыдущее + assert next_snap.attempt_count == current.attempt_count # не увеличилось + + def test_next_snapshot_with_transition(self): + """Тест создания следующего снапшота с переходом""" + current = FsmSnapshotDTO(version=2, state=FsmState.IDLE) + transition = create_transition(FsmState.IDLE, FsmState.ACTIVE, "PROPOSALS_RECEIVED") + + next_snap = next_snapshot( + current=current, + new_state=FsmState.ACTIVE, + reason="PROPOSALS_RECEIVED", + transition=transition + ) + + assert len(next_snap.history) == len(current.history) + 1 + assert next_snap.history[-1] == transition # переход добавлен в конец + + def test_next_snapshot_preserves_instance_id(self): + """Тест сохранения instance_id между снапшотами""" + instance_id = str(uuid.uuid4()) + current = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + fsm_instance_id=instance_id + ) + + next_snap = next_snapshot( + current=current, + new_state=FsmState.ACTIVE, + reason="STATE_CHANGE" + ) + + assert next_snap.fsm_instance_id == instance_id # сохранился + assert next_snap.snapshot_id != current.snapshot_id # но snapshot_id новый + + def test_next_snapshot_preserves_metadata(self): + """Тест сохранения метаданных между снапшотами""" + context_data = {"key1": "value1"} + state_metadata = {"key2": "value2"} + + current = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + context_data=context_data, + state_metadata=state_metadata + ) + + next_snap = next_snapshot( + current=current, + new_state=FsmState.ACTIVE, + reason="CHANGE" + ) + + # Метаданные скопированы (но это новые dict'ы) + assert next_snap.context_data == context_data + assert next_snap.state_metadata == state_metadata + assert next_snap.context_data is not current.context_data # новый объект + assert next_snap.state_metadata is not current.state_metadata # новый объект + + +class TestEdgeCases: + """Тесты граничных случаев и ошибок""" + + def test_empty_strings_and_none_values(self): + """Тест обработки пустых строк и None значений""" + snapshot = FsmSnapshotDTO( + version=0, + state=FsmState.IDLE, + reason="", # пустая строка + snapshot_id="", # пустая строка -> должна автогенерироваться + prev_state=None, # None значение + history=None, # None -> должен стать [] + context_data=None, # None -> должен стать {} + ) + + assert snapshot.reason == "" + assert snapshot.snapshot_id # автогенерированный UUID + assert snapshot.prev_state is None + assert snapshot.history == () + assert snapshot.context_data == {} + + def test_large_history_handling(self): + """Тест обработки большой истории переходов""" + # Создаём большую историю переходов + large_history = [ + create_transition(FsmState.IDLE, FsmState.ACTIVE, f"EVENT_{i}") + for i in range(1000) + ] + + snapshot = FsmSnapshotDTO( + version=1000, + state=FsmState.ACTIVE, + history=large_history + ) + + assert len(snapshot.history) == 1000 + assert all(isinstance(t, TransitionDTO) for t in snapshot.history) + + def test_version_overflow_behavior(self): + """Тест поведения при больших номерах версий""" + large_version = 2**63 - 1 # максимальный int64 + + snapshot = FsmSnapshotDTO( + version=large_version, + state=FsmState.IDLE + ) + + # Следующий снапшот должен обработать overflow корректно + # (в Python int'ы не переполняются, но тестируем логику) + next_snap = next_snapshot( + current=snapshot, + new_state=FsmState.ACTIVE, + reason="TEST" + ) + + assert next_snap.version == large_version + 1 + + def test_unicode_strings(self): + """Тест обработки unicode строк""" + unicode_reason = "Причина на русском 🚀" + unicode_trigger = "事件_中文" + + snapshot = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + reason=unicode_reason + ) + + transition = create_transition( + FsmState.IDLE, + FsmState.ACTIVE, + unicode_trigger + ) + + assert snapshot.reason == unicode_reason + assert transition.trigger_event == unicode_trigger + + # Должно работать с next_snapshot + next_snap = next_snapshot( + current=snapshot, + new_state=FsmState.ACTIVE, + reason=unicode_reason, + transition=transition + ) + + assert next_snap.reason == unicode_reason \ No newline at end of file diff --git a/QIKI_DTMP/services/q_core_agent/state/types.py b/QIKI_DTMP/services/q_core_agent/state/types.py new file mode 100644 index 0000000..b0b5847 --- /dev/null +++ b/QIKI_DTMP/services/q_core_agent/state/types.py @@ -0,0 +1,182 @@ +""" +DTO модель для FSM состояний без зависимостей от protobuf. +Immutable dataclasses для безопасной работы с состоянием. +""" +from dataclasses import dataclass, replace +from enum import IntEnum +from typing import Optional, Dict, Any, Tuple +import time +import uuid + + +class FsmState(IntEnum): + """FSM состояния (копия из protobuf enum без привязки к proto)""" + UNSPECIFIED = 0 + BOOTING = 1 + IDLE = 2 + ACTIVE = 3 + ERROR_STATE = 4 + SHUTDOWN = 5 + + +class TransitionStatus(IntEnum): + """Статус перехода FSM""" + UNSPECIFIED = 0 + SUCCESS = 1 + FAILED = 2 + PENDING = 3 + + +@dataclass(frozen=True) +class TransitionDTO: + """DTO для перехода состояния FSM (immutable)""" + from_state: FsmState + to_state: FsmState + trigger_event: str + status: TransitionStatus = TransitionStatus.SUCCESS + error_message: str = "" + ts_mono: float = 0.0 + ts_wall: float = 0.0 + + def __post_init__(self): + # Устанавливаем временные метки если не заданы + if self.ts_mono == 0.0: + object.__setattr__(self, 'ts_mono', time.monotonic()) + if self.ts_wall == 0.0: + object.__setattr__(self, 'ts_wall', time.time()) + + +@dataclass(frozen=True) +class FsmSnapshotDTO: + """ + DTO для снапшота FSM состояния (immutable). + Внутренняя модель без protobuf зависимостей. + """ + # Обязательные поля + version: int + state: FsmState + reason: str = "" + + # Временные метки + ts_mono: float = 0.0 # monotonic time для порядка + ts_wall: float = 0.0 # wall clock time для логов + + # Опциональные поля + snapshot_id: str = "" + prev_state: Optional[FsmState] = None + fsm_instance_id: str = "" + source_module: str = "fsm_handler" + attempt_count: int = 0 + + # История переходов и метаданные + history: Tuple[TransitionDTO, ...] | None = None + context_data: Dict[str, str] = None + state_metadata: Dict[str, str] = None + + def __post_init__(self): + # Устанавливаем значения по умолчанию для mutable полей + if self.history is None: + object.__setattr__(self, 'history', tuple()) + else: + object.__setattr__(self, 'history', tuple(self.history)) + if self.context_data is None: + object.__setattr__(self, 'context_data', {}) + if self.state_metadata is None: + object.__setattr__(self, 'state_metadata', {}) + + # Устанавливаем временные метки если не заданы + if self.ts_mono == 0.0: + object.__setattr__(self, 'ts_mono', time.monotonic()) + if self.ts_wall == 0.0: + object.__setattr__(self, 'ts_wall', time.time()) + + # Генерируем ID если не задан + if not self.snapshot_id: + object.__setattr__(self, 'snapshot_id', str(uuid.uuid4())) + if not self.fsm_instance_id: + object.__setattr__(self, 'fsm_instance_id', str(uuid.uuid4())) + + def add_transition(self, transition: "TransitionDTO", max_history: int | None = None) -> "FsmSnapshotDTO": + """Вернуть новый снапшот с добавленным переходом. + + Args: + transition: TransitionDTO для добавления в историю + max_history: Максимальная длина истории. При превышении + самые старые элементы удаляются. + + Returns: + Новый FsmSnapshotDTO с обновлённой историей. + """ + new_history = self.history + (transition,) + + if max_history is not None and max_history > 0 and len(new_history) > max_history: + new_history = new_history[-max_history:] + + return replace(self, history=new_history) + + +def initial_snapshot() -> FsmSnapshotDTO: + """Создаёт начальный снапшот для COLD_START""" + now_mono = time.monotonic() + now_wall = time.time() + + return FsmSnapshotDTO( + version=0, + state=FsmState.BOOTING, + prev_state=None, + reason="COLD_START", + ts_mono=now_mono, + ts_wall=now_wall, + source_module="initial_boot", + attempt_count=0 + ) + + +def create_transition( + from_state: FsmState, + to_state: FsmState, + trigger: str, + status: TransitionStatus = TransitionStatus.SUCCESS, + error_msg: str = "" +) -> TransitionDTO: + """Хелпер для создания перехода состояния""" + return TransitionDTO( + from_state=from_state, + to_state=to_state, + trigger_event=trigger, + status=status, + error_message=error_msg + ) + + +def next_snapshot( + current: FsmSnapshotDTO, + new_state: FsmState, + reason: str, + transition: Optional[TransitionDTO] = None +) -> FsmSnapshotDTO: + """ + Создаёт новый снапшот на базе текущего с обновлённым состоянием. + Увеличивает версию только при реальном изменении состояния. + """ + version_increment = 1 if new_state != current.state else 0 + new_version = current.version + version_increment + + # Новая история переходов + new_history = current.history if current.history else tuple() + if transition: + new_history = new_history + (transition,) + + return FsmSnapshotDTO( + version=new_version, + state=new_state, + prev_state=current.state if new_state != current.state else current.prev_state, + reason=reason, + snapshot_id=str(uuid.uuid4()), # новый ID для нового снапшота + fsm_instance_id=current.fsm_instance_id, # сохраняем instance ID + source_module=current.source_module, + attempt_count=current.attempt_count + (1 if new_state != current.state else 0), + history=new_history, + context_data=dict(current.context_data) if current.context_data else {}, + state_metadata=dict(current.state_metadata) if current.state_metadata else {} + ) \ No newline at end of file diff --git a/QIKI_DTMP/services/q_sim_service/main.py b/QIKI_DTMP/services/q_sim_service/main.py new file mode 100644 index 0000000..629434a --- /dev/null +++ b/QIKI_DTMP/services/q_sim_service/main.py @@ -0,0 +1,92 @@ + +import time +import sys +import os +import yaml +import asyncio +from typing import Dict, Any + +# Добавляем корневую директорию проекта в sys.path +ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.append(ROOT_DIR) + +from services.q_core_agent.core.agent_logger import setup_logging, logger +from services.q_sim_service.core.world_model import WorldModel +from generated.sensor_raw_in_pb2 import SensorReading +from generated.actuator_raw_out_pb2 import ActuatorCommand +from generated.common_types_pb2 import UUID +from google.protobuf.timestamp_pb2 import Timestamp +from google.protobuf.json_format import MessageToDict + +class QSimService: + def __init__(self, config: Dict[str, Any]): + self.config = config + self.world_model = WorldModel() + queue_size = self.config.get("queue_maxsize", 64) + self.sensor_data_queue = asyncio.Queue(maxsize=queue_size) + self.actuator_command_queue = asyncio.Queue(maxsize=queue_size) + logger.info("QSimService initialized.") + + def generate_sensor_data(self) -> SensorReading: + # Generate sensor data based on world model state + timestamp = Timestamp() + timestamp.GetCurrentTime() + world_state = self.world_model.get_state() + return SensorReading( + sensor_id=UUID(value="sim_lidar_front"), + sensor_type=self.config.get("sim_sensor_type", 1), # LIDAR + timestamp=timestamp, + scalar_data=world_state["position"]["x"] # Example: return X position as scalar data + ) + + def receive_actuator_command(self, command: ActuatorCommand): + try: + self.actuator_command_queue.put_nowait(command) + except asyncio.QueueFull: + logger.warning("Actuator command queue full, dropping command") + else: + logger.info(f"QSim received actuator command: {MessageToDict(command)}") + self.world_model.update(command) # Update world model based on command + + def run(self): + logger.info("QSimService started.") + try: + while True: + self.step() # Call the new step method + time.sleep(self.config.get("sim_tick_interval", 1)) + except KeyboardInterrupt: + logger.info("QSimService stopped by user.") + + def step(self): + """ + Performs one step of the simulation. + """ + # Advance world model state + delta_time = self.config.get("sim_tick_interval", 1) + self.world_model.step(delta_time) + + # Generate sensor data + sensor_data = self.generate_sensor_data() + try: + self.sensor_data_queue.put_nowait(sensor_data) + except asyncio.QueueFull: + logger.warning("Sensor data queue full, dropping data") + else: + logger.debug(f"Generated sensor data: {MessageToDict(sensor_data)}") + +def load_config(path='config.yaml'): + config_path = os.path.join(os.path.dirname(__file__), path) + if not os.path.exists(config_path): + raise FileNotFoundError(f"Config file not found: {config_path}") + + with open(config_path, 'r') as f: + return yaml.safe_load(f) + +if __name__ == "__main__": + # Настройка логирования + log_config_path = os.path.join(os.path.dirname(__file__), '..', 'q_core_agent', 'config', 'logging.yaml') + setup_logging(default_path=log_config_path) + + config = load_config() + sim_service = QSimService(config) + sim_service.run()