diff --git a/extracted/QIKI_DTMP/services/q_core_agent/__init__.py b/extracted/QIKI_DTMP/services/q_core_agent/__init__.py new file mode 100644 index 0000000..ae9c520 --- /dev/null +++ b/extracted/QIKI_DTMP/services/q_core_agent/__init__.py @@ -0,0 +1,2 @@ +"""Основной пакет q_core_agent для работы с состоянием.""" + diff --git a/extracted/QIKI_DTMP/services/q_core_agent/generated/__init__.py b/extracted/QIKI_DTMP/services/q_core_agent/generated/__init__.py new file mode 100644 index 0000000..d8e3e2b --- /dev/null +++ b/extracted/QIKI_DTMP/services/q_core_agent/generated/__init__.py @@ -0,0 +1,2 @@ +"""Заглушки protobuf сообщений для тестов.""" + diff --git a/extracted/QIKI_DTMP/services/q_core_agent/generated/common_types_pb2.py b/extracted/QIKI_DTMP/services/q_core_agent/generated/common_types_pb2.py new file mode 100644 index 0000000..b76fd7e --- /dev/null +++ b/extracted/QIKI_DTMP/services/q_core_agent/generated/common_types_pb2.py @@ -0,0 +1,12 @@ +"""Простейшие типы, имитирующие protobuf common_types.""" +from dataclasses import dataclass + + +@dataclass +class UUID: + """Минималистичная реализация protobuf UUID""" + value: str = "" + + def CopyFrom(self, other: "UUID") -> None: + self.value = other.value + diff --git a/extracted/QIKI_DTMP/services/q_core_agent/generated/fsm_state_pb2.py b/extracted/QIKI_DTMP/services/q_core_agent/generated/fsm_state_pb2.py new file mode 100644 index 0000000..0adba4c --- /dev/null +++ b/extracted/QIKI_DTMP/services/q_core_agent/generated/fsm_state_pb2.py @@ -0,0 +1,106 @@ +"""Заглушка protobuf сообщений FSM для тестов.""" +from dataclasses import dataclass, field +from enum import IntEnum +from typing import List, Dict +import json + +from google.protobuf.timestamp_pb2 import Timestamp + +from .common_types_pb2 import UUID + + +class FSMStateEnum(IntEnum): + FSM_STATE_UNSPECIFIED = 0 + BOOTING = 1 + IDLE = 2 + ACTIVE = 3 + ERROR_STATE = 4 + SHUTDOWN = 5 + + +class FSMTransitionStatus(IntEnum): + FSM_TRANSITION_STATUS_UNSPECIFIED = 0 + SUCCESS = 1 + FAILED = 2 + PENDING = 3 + + +@dataclass +class StateTransition: + from_state: int = FSMStateEnum.FSM_STATE_UNSPECIFIED + to_state: int = FSMStateEnum.FSM_STATE_UNSPECIFIED + trigger_event: str = "" + status: int = FSMTransitionStatus.FSM_TRANSITION_STATUS_UNSPECIFIED + error_message: str = "" + timestamp: Timestamp = field(default_factory=Timestamp) + + +@dataclass +class FsmStateSnapshot: + snapshot_id: UUID = field(default_factory=UUID) + fsm_instance_id: UUID = field(default_factory=UUID) + timestamp: Timestamp = field(default_factory=Timestamp) + current_state: int = FSMStateEnum.FSM_STATE_UNSPECIFIED + source_module: str = "" + attempt_count: int = 0 + history: List[StateTransition] = field(default_factory=list) + context_data: Dict[str, str] = field(default_factory=dict) + state_metadata: Dict[str, str] = field(default_factory=dict) + + def SerializeToString(self) -> bytes: + """Простая сериализация в JSON.""" + def transition_dict(t: StateTransition) -> Dict[str, object]: + return { + 'from_state': int(t.from_state), + 'to_state': int(t.to_state), + 'trigger_event': t.trigger_event, + 'status': int(t.status), + 'error_message': t.error_message, + 'timestamp': {'seconds': t.timestamp.seconds, 'nanos': t.timestamp.nanos}, + } + + data = { + 'snapshot_id': self.snapshot_id.value, + 'fsm_instance_id': self.fsm_instance_id.value, + 'timestamp': {'seconds': self.timestamp.seconds, 'nanos': self.timestamp.nanos}, + 'current_state': int(self.current_state), + 'source_module': self.source_module, + 'attempt_count': self.attempt_count, + 'history': [transition_dict(t) for t in self.history], + 'context_data': self.context_data, + 'state_metadata': self.state_metadata, + } + return json.dumps(data).encode() + + def ParseFromString(self, data: bytes) -> None: + """Простая десериализация из JSON.""" + obj = json.loads(data.decode()) + self.snapshot_id.value = obj.get('snapshot_id', '') + self.fsm_instance_id.value = obj.get('fsm_instance_id', '') + ts = obj.get('timestamp', {}) + self.timestamp.seconds = ts.get('seconds', 0) + self.timestamp.nanos = ts.get('nanos', 0) + self.current_state = obj.get('current_state', 0) + self.source_module = obj.get('source_module', '') + self.attempt_count = obj.get('attempt_count', 0) + self.history.clear() + for h in obj.get('history', []): + tr = StateTransition( + from_state=h.get('from_state', 0), + to_state=h.get('to_state', 0), + trigger_event=h.get('trigger_event', ''), + status=h.get('status', 0), + error_message=h.get('error_message', ''), + ) + ts = h.get('timestamp', {}) + tr.timestamp.seconds = ts.get('seconds', 0) + tr.timestamp.nanos = ts.get('nanos', 0) + self.history.append(tr) + self.context_data = {str(k): str(v) for k, v in obj.get('context_data', {}).items()} + self.state_metadata = {str(k): str(v) for k, v in obj.get('state_metadata', {}).items()} + + +def MessageToDict(proto: FsmStateSnapshot) -> Dict[str, object]: + """Утилита для преобразования снапшота в dict.""" + return json.loads(proto.SerializeToString()) + diff --git a/extracted/QIKI_DTMP/services/q_core_agent/state/__init__.py b/extracted/QIKI_DTMP/services/q_core_agent/state/__init__.py new file mode 100644 index 0000000..35db4b5 --- /dev/null +++ b/extracted/QIKI_DTMP/services/q_core_agent/state/__init__.py @@ -0,0 +1,2 @@ +"""Пакет утилит для работы со StateStore.""" + diff --git a/extracted/QIKI_DTMP/services/q_core_agent/state/conv.py b/extracted/QIKI_DTMP/services/q_core_agent/state/conv.py new file mode 100644 index 0000000..89fcce5 --- /dev/null +++ b/extracted/QIKI_DTMP/services/q_core_agent/state/conv.py @@ -0,0 +1,281 @@ +""" +Конвертеры между DTO и protobuf для FSM состояний. +Изоляция protobuf логики от внутренней модели данных. +""" +import uuid +from typing import Optional + +from q_core_agent.generated.fsm_state_pb2 import ( + FsmStateSnapshot, StateTransition, FSMStateEnum, FSMTransitionStatus, MessageToDict +) +from q_core_agent.generated.common_types_pb2 import UUID +from google.protobuf.timestamp_pb2 import Timestamp +from .types import ( + FsmSnapshotDTO, TransitionDTO, FsmState, TransitionStatus, + initial_snapshot, create_transition, next_snapshot +) + + +class ConversionError(Exception): + """Ошибка при конвертации между DTO и protobuf""" + pass + + +# Маппинг enum'ов между DTO и protobuf +FSM_STATE_DTO_TO_PROTO = { + FsmState.UNSPECIFIED: FSMStateEnum.FSM_STATE_UNSPECIFIED, + FsmState.BOOTING: FSMStateEnum.BOOTING, + FsmState.IDLE: FSMStateEnum.IDLE, + FsmState.ACTIVE: FSMStateEnum.ACTIVE, + FsmState.ERROR_STATE: FSMStateEnum.ERROR_STATE, + FsmState.SHUTDOWN: FSMStateEnum.SHUTDOWN, +} + +FSM_STATE_PROTO_TO_DTO = {v: k for k, v in FSM_STATE_DTO_TO_PROTO.items()} + +TRANSITION_STATUS_DTO_TO_PROTO = { + TransitionStatus.UNSPECIFIED: FSMTransitionStatus.FSM_TRANSITION_STATUS_UNSPECIFIED, + TransitionStatus.SUCCESS: FSMTransitionStatus.SUCCESS, + TransitionStatus.FAILED: FSMTransitionStatus.FAILED, + TransitionStatus.PENDING: FSMTransitionStatus.PENDING, +} + +TRANSITION_STATUS_PROTO_TO_DTO = {v: k for k, v in TRANSITION_STATUS_DTO_TO_PROTO.items()} + + +def _create_uuid_proto(uuid_str: str) -> UUID: + """Создать protobuf UUID из строки""" + uuid_proto = UUID() + try: + uuid_obj = uuid.UUID(uuid_str) + uuid_proto.value = str(uuid_obj) + except (ValueError, AttributeError): + # Генерируем новый UUID если строка невалидна + uuid_proto.value = str(uuid.uuid4()) + return uuid_proto + + +def _extract_uuid_string(uuid_proto: Optional[UUID]) -> str: + """Извлечь строку UUID из protobuf""" + if uuid_proto and uuid_proto.value: + return uuid_proto.value + return str(uuid.uuid4()) # fallback + + +def _timestamp_to_float(ts: Optional[Timestamp]) -> float: + """Конвертировать protobuf Timestamp в float секунд""" + if ts is None: + return 0.0 + return ts.seconds + ts.nanos / 1e9 + + +def _float_to_timestamp(ts_float: float) -> Timestamp: + """Конвертировать float секунд в protobuf Timestamp""" + ts = Timestamp() + if ts_float > 0: + ts.FromSeconds(int(ts_float)) + # Добавляем наносекунды + nanos = int((ts_float - int(ts_float)) * 1e9) + ts.nanos = nanos + return ts + + +def transition_dto_to_proto(dto: TransitionDTO) -> StateTransition: + """Конвертировать TransitionDTO в protobuf StateTransition""" + try: + proto = StateTransition() + + # Конвертируем enum'ы + proto.from_state = FSM_STATE_DTO_TO_PROTO.get(dto.from_state, FSMStateEnum.FSM_STATE_UNSPECIFIED) + proto.to_state = FSM_STATE_DTO_TO_PROTO.get(dto.to_state, FSMStateEnum.FSM_STATE_UNSPECIFIED) + proto.status = TRANSITION_STATUS_DTO_TO_PROTO.get(dto.status, FSMTransitionStatus.FSM_TRANSITION_STATUS_UNSPECIFIED) + + # Строковые поля + proto.trigger_event = dto.trigger_event + proto.error_message = dto.error_message + + # Временная метка (используем wall time) + if dto.ts_wall > 0: + proto.timestamp.CopyFrom(_float_to_timestamp(dto.ts_wall)) + + return proto + + except Exception as e: + raise ConversionError(f"Ошибка конвертации TransitionDTO->proto: {e}") + + +def transition_proto_to_dto(proto: StateTransition) -> TransitionDTO: + """Конвертировать protobuf StateTransition в TransitionDTO""" + try: + return TransitionDTO( + from_state=FSM_STATE_PROTO_TO_DTO.get(proto.from_state, FsmState.UNSPECIFIED), + to_state=FSM_STATE_PROTO_TO_DTO.get(proto.to_state, FsmState.UNSPECIFIED), + trigger_event=proto.trigger_event or "", + status=TRANSITION_STATUS_PROTO_TO_DTO.get(proto.status, TransitionStatus.UNSPECIFIED), + error_message=proto.error_message or "", + ts_wall=_timestamp_to_float(proto.timestamp), + ts_mono=0.0 # не храним в protobuf, только wall time + ) + except Exception as e: + raise ConversionError(f"Ошибка конвертации StateTransition->DTO: {e}") + + +def dto_to_proto(dto: FsmSnapshotDTO) -> FsmStateSnapshot: + """ + Конвертировать FsmSnapshotDTO в protobuf FsmStateSnapshot. + Главная функция для экспорта состояния в внешние интерфейсы. + """ + try: + proto = FsmStateSnapshot() + + # UUID'ы + if dto.snapshot_id: + proto.snapshot_id.CopyFrom(_create_uuid_proto(dto.snapshot_id)) + if dto.fsm_instance_id: + proto.fsm_instance_id.CopyFrom(_create_uuid_proto(dto.fsm_instance_id)) + + # Временная метка (wall time) + if dto.ts_wall > 0: + proto.timestamp.CopyFrom(_float_to_timestamp(dto.ts_wall)) + + # Основное состояние + proto.current_state = FSM_STATE_DTO_TO_PROTO.get(dto.state, FSMStateEnum.FSM_STATE_UNSPECIFIED) + + # Строковые поля + proto.source_module = dto.source_module or "" + proto.attempt_count = dto.attempt_count + + # История переходов + if dto.history: + for transition_dto in dto.history: + transition_proto = transition_dto_to_proto(transition_dto) + proto.history.append(transition_proto) + + # Контекстные данные + if dto.context_data: + for key, value in dto.context_data.items(): + proto.context_data[key] = str(value) + + # Метаданные состояния + if dto.state_metadata: + for key, value in dto.state_metadata.items(): + proto.state_metadata[key] = str(value) + + # Добавляем DTO-специфичные метаданные в state_metadata + proto.state_metadata['dto_version'] = str(dto.version) + proto.state_metadata['dto_reason'] = dto.reason + if dto.prev_state: + proto.state_metadata['dto_prev_state'] = dto.prev_state.name + proto.state_metadata['dto_ts_mono'] = str(dto.ts_mono) + + return proto + + except Exception as e: + raise ConversionError(f"Ошибка конвертации FsmSnapshotDTO->proto: {e}") + + +def proto_to_dto(proto: FsmStateSnapshot) -> FsmSnapshotDTO: + """ + Конвертировать protobuf FsmStateSnapshot в FsmSnapshotDTO. + Используется редко - в основном для тестов или импорта внешних состояний. + """ + try: + # Извлекаем UUID'ы + snapshot_id = _extract_uuid_string(proto.snapshot_id) + fsm_instance_id = _extract_uuid_string(proto.fsm_instance_id) + + # Временные метки + ts_wall = _timestamp_to_float(proto.timestamp) + + # Основное состояние + current_state = FSM_STATE_PROTO_TO_DTO.get(proto.current_state, FsmState.UNSPECIFIED) + + # История переходов + history = [] + for transition_proto in proto.history: + transition_dto = transition_proto_to_dto(transition_proto) + history.append(transition_dto) + + # Контекстные данные и метаданные + context_data = dict(proto.context_data) if proto.context_data else {} + state_metadata = dict(proto.state_metadata) if proto.state_metadata else {} + + # Извлекаем DTO-специфичные поля из метаданных + version = int(state_metadata.pop('dto_version', '0')) + reason = state_metadata.pop('dto_reason', '') + prev_state_name = state_metadata.pop('dto_prev_state', None) + prev_state = None + if prev_state_name: + try: + prev_state = FsmState[prev_state_name] + except KeyError: + pass + ts_mono = float(state_metadata.pop('dto_ts_mono', '0.0')) + + return FsmSnapshotDTO( + version=version, + state=current_state, + prev_state=prev_state, + reason=reason, + ts_mono=ts_mono, + ts_wall=ts_wall, + snapshot_id=snapshot_id, + fsm_instance_id=fsm_instance_id, + source_module=proto.source_module or "unknown", + attempt_count=proto.attempt_count, + history=history, + context_data=context_data, + state_metadata=state_metadata + ) + + except Exception as e: + raise ConversionError(f"Ошибка конвертации FsmStateSnapshot->DTO: {e}") + + +def dto_to_json_dict(dto: FsmSnapshotDTO) -> dict: + """ + Конвертировать DTO в JSON-совместимый словарь для логирования. + Более лёгкая альтернатива полной protobuf конвертации. + """ + return { + 'version': dto.version, + 'state': dto.state.name, + 'prev_state': dto.prev_state.name if dto.prev_state else None, + 'reason': dto.reason, + 'ts_mono': dto.ts_mono, + 'ts_wall': dto.ts_wall, + 'snapshot_id': dto.snapshot_id, + 'fsm_instance_id': dto.fsm_instance_id, + 'source_module': dto.source_module, + 'attempt_count': dto.attempt_count, + 'history_count': len(dto.history) if dto.history else 0, + 'context_keys': list(dto.context_data.keys()) if dto.context_data else [], + 'metadata_keys': list(dto.state_metadata.keys()) if dto.state_metadata else [] + } + + +def dto_to_protobuf_json(dto: FsmSnapshotDTO) -> dict: + """ + Конвертировать DTO в JSON через protobuf (для совместимости с существующими логами). + Более медленная, но полностью совместимая с текущим форматом логов. + """ + proto = dto_to_proto(dto) + try: + return MessageToDict(proto) + except Exception as e: + # Оборачиваем любые ошибки конвертации в единый тип + raise ConversionError(f"Ошибка protobuf->JSON: {e}") + + +# Удобные функции для быстрого использования +def create_proto_snapshot(state: FsmState, reason: str, version: int = 1) -> FsmStateSnapshot: + """Создать protobuf снапшот из основных параметров""" + dto = FsmSnapshotDTO(version=version, state=state, reason=reason) + return dto_to_proto(dto) + + +def parse_proto_snapshot(proto_data: bytes) -> FsmSnapshotDTO: + """Распарсить protobuf данные в DTO""" + proto = FsmStateSnapshot() + proto.ParseFromString(proto_data) + return proto_to_dto(proto) diff --git a/extracted/QIKI_DTMP/services/q_core_agent/state/store.py b/extracted/QIKI_DTMP/services/q_core_agent/state/store.py new file mode 100644 index 0000000..37b57c1 --- /dev/null +++ b/extracted/QIKI_DTMP/services/q_core_agent/state/store.py @@ -0,0 +1,265 @@ +""" +AsyncStateStore - потокобезопасное хранилище FSM состояний. +Single Source of Truth (SSOT) для FSM состояния в Q-Core процессе. +""" +import asyncio +from typing import Optional, List, Any, Dict +import logging +import time +from dataclasses import replace + +from .types import FsmSnapshotDTO, initial_snapshot + + +logger = logging.getLogger(__name__) + + +class StateStoreError(Exception): + """Базовое исключение для ошибок StateStore""" + pass + + +class StateVersionError(StateStoreError): + """Ошибка версионирования состояния""" + pass + + +class AsyncStateStore: + """ + Async-only StateStore для FSM состояния. + + Ключевые принципы: + - Только один писатель (FSMHandler) + - Множественные читатели (логи, gRPC, CLI) + - Pub/Sub через asyncio.Queue для подписчиков + - Версионирование и защита от дублирования + - Иммутабельные DTO снапшоты + """ + + def __init__(self, initial_state: Optional[FsmSnapshotDTO] = None): + self._lock = asyncio.Lock() + self._snap: Optional[FsmSnapshotDTO] = initial_state + self._subscribers: List[asyncio.Queue] = [] + self._subscriber_ids: Dict[int, str] = {} # для отладки + self._metrics: Dict[str, Any] = { + 'total_sets': 0, + 'total_gets': 0, + 'version_conflicts': 0, + 'subscriber_count': 0, + 'last_update_ts': 0.0, + 'creation_ts': time.time() + } + + async def get(self) -> Optional[FsmSnapshotDTO]: + """ + Получить текущий снапшот состояния. + Возвращает immutable DTO или None если состояние не инициализировано. + """ + async with self._lock: + self._metrics['total_gets'] += 1 + # DTO уже immutable, можно возвращать как есть + return self._snap + + async def get_with_meta(self) -> tuple[Optional[FsmSnapshotDTO], Dict[str, Any]]: + """Получить состояние с метаинформацией""" + async with self._lock: + self._metrics['total_gets'] += 1 + meta = { + 'store_metrics': dict(self._metrics), + 'subscriber_count': len(self._subscribers), + 'has_state': self._snap is not None, + 'current_version': self._snap.version if self._snap else -1 + } + return self._snap, meta + + async def set(self, new_snap: FsmSnapshotDTO, enforce_version: bool = False) -> FsmSnapshotDTO: + """ + Установить новое состояние. + + Args: + new_snap: Новый снапшот состояния + enforce_version: Если True, проверяет что версия новее текущей + + Returns: + Установленный снапшот (может отличаться от входного по версии) + + Raises: + StateVersionError: При нарушении версионности + """ + if new_snap is None: + raise StateStoreError("Попытка установить None состояние") + + async with self._lock: + # Проверка версионности + if enforce_version and self._snap is not None: + if new_snap.version <= self._snap.version: + self._metrics['version_conflicts'] += 1 + raise StateVersionError( + f"Версия {new_snap.version} не больше текущей {self._snap.version}" + ) + + # Автоинкремент версии для любого изменения состояния + if self._snap is None: + initial_version = new_snap.version if new_snap.version != 0 else 1 + new_snap = replace(new_snap, version=initial_version) + else: + same_state = ( + new_snap.state == self._snap.state and new_snap.reason == "NO_CHANGE" + ) + if same_state: + new_snap = replace(new_snap, version=self._snap.version) + else: + new_snap = replace(new_snap, version=self._snap.version + 1) + + self._snap = new_snap + self._metrics['total_sets'] += 1 + self._metrics['last_update_ts'] = time.time() + + # Уведомляем подписчиков + await self._notify_subscribers(new_snap) + + state_name = getattr(new_snap.state, "name", str(new_snap.state)) + logger.debug( + f"StateStore updated: version={new_snap.version}, " + f"state={state_name}, reason='{new_snap.reason}'" + ) + + return self._snap + + async def subscribe(self, subscriber_id: str = "unknown") -> asyncio.Queue: + """ + Подписаться на изменения состояния. + + Returns: + asyncio.Queue с FsmSnapshotDTO объектами при изменениях + """ + queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) + + async with self._lock: + self._subscribers.append(queue) + queue_id = id(queue) + self._subscriber_ids[queue_id] = subscriber_id + self._metrics['subscriber_count'] = len(self._subscribers) + + # Отправляем текущее состояние новому подписчику + if self._snap is not None: + try: + queue.put_nowait(self._snap) + except asyncio.QueueFull: + logger.warning(f"Queue full for new subscriber {subscriber_id}") + + logger.debug(f"New subscriber: {subscriber_id}, total: {len(self._subscribers)}") + + return queue + + async def unsubscribe(self, queue: asyncio.Queue): + """Отписаться от уведомлений""" + async with self._lock: + if queue in self._subscribers: + self._subscribers.remove(queue) + queue_id = id(queue) + subscriber_id = self._subscriber_ids.pop(queue_id, "unknown") + self._metrics['subscriber_count'] = len(self._subscribers) + logger.debug(f"Unsubscribed: {subscriber_id}, remaining: {len(self._subscribers)}") + + async def _notify_subscribers(self, snap: FsmSnapshotDTO): + """Уведомить всех подписчиков о новом состоянии""" + dead_queues = [] + + for queue in self._subscribers: + try: + queue.put_nowait(snap) + except asyncio.QueueFull: + # Очередь переполнена - логируем но не блокируем + queue_id = id(queue) + subscriber_id = self._subscriber_ids.get(queue_id, "unknown") + logger.warning(f"Subscriber {subscriber_id} queue full, skipping update") + except Exception as e: + # Очередь мертва - помечаем для удаления + queue_id = id(queue) + subscriber_id = self._subscriber_ids.get(queue_id, "unknown") + logger.warning(f"Dead subscriber {subscriber_id}: {e}") + dead_queues.append(queue) + + # Удаляем мертвые очереди + for dead_queue in dead_queues: + try: + self._subscribers.remove(dead_queue) + queue_id = id(dead_queue) + self._subscriber_ids.pop(queue_id, None) + except ValueError: + pass # Уже удалена + + if dead_queues: + self._metrics['subscriber_count'] = len(self._subscribers) + + async def initialize_if_empty(self) -> FsmSnapshotDTO: + """Инициализировать начальным состоянием если пусто""" + async with self._lock: + if self._snap is None: + self._snap = initial_snapshot() + self._metrics['total_sets'] += 1 + self._metrics['last_update_ts'] = time.time() + + # Уведомляем подписчиков + await self._notify_subscribers(self._snap) + + logger.info("StateStore initialized with COLD_START state") + + return self._snap + + async def get_metrics(self) -> Dict[str, Any]: + """Получить метрики работы StateStore""" + async with self._lock: + # get_metrics считается ещё одним чтением состояния + self._metrics['total_gets'] += 1 + snap = self._snap + uptime = time.time() - self._metrics['creation_ts'] + state_name = snap.state.name if snap else "UNINITIALIZED" + return { + **self._metrics, + 'uptime_seconds': uptime, + 'current_version': snap.version if snap else -1, + 'current_state': state_name, + 'active_subscribers': len(self._subscribers) + } + + async def health_check(self) -> Dict[str, Any]: + """Проверка здоровья StateStore""" + metrics = await self.get_metrics() + + health = { + 'healthy': True, + 'issues': [], + 'metrics': metrics + } + + # Проверки здоровья + if metrics['version_conflicts'] > metrics['total_sets'] * 0.1: + health['healthy'] = False + health['issues'].append("Высокий процент конфликтов версий") + + if len(self._subscribers) > 100: + health['issues'].append("Много подписчиков, возможна утечка") + + if self._snap is None: + health['healthy'] = False + health['issues'].append("StateStore не инициализирован") + + return health + + +# Удобные функции для создания store +def create_store(initial_state: Optional[FsmSnapshotDTO] = None) -> AsyncStateStore: + """Создать новый AsyncStateStore""" + return AsyncStateStore(initial_state) + + +def create_initialized_store() -> AsyncStateStore: + """Создать StateStore с начальным состоянием COLD_START""" + return AsyncStateStore(initial_snapshot()) + + +# Константы для тестирования +TEST_SUBSCRIBER_TIMEOUT = 5.0 # таймаут для тестовых подписчиков +MAX_QUEUE_SIZE = 64 # размер очереди подписчиков diff --git a/extracted/QIKI_DTMP/services/q_core_agent/state/tests/__init__.py b/extracted/QIKI_DTMP/services/q_core_agent/state/tests/__init__.py new file mode 100644 index 0000000..60e0d9d --- /dev/null +++ b/extracted/QIKI_DTMP/services/q_core_agent/state/tests/__init__.py @@ -0,0 +1,2 @@ +"""Тестовый пакет для модуля state.""" + diff --git a/extracted/QIKI_DTMP/services/q_core_agent/state/tests/test_conv.py b/extracted/QIKI_DTMP/services/q_core_agent/state/tests/test_conv.py new file mode 100644 index 0000000..34e2510 --- /dev/null +++ b/extracted/QIKI_DTMP/services/q_core_agent/state/tests/test_conv.py @@ -0,0 +1,519 @@ +""" +Серьёзные unit тесты для конвертеров DTO ↔ protobuf. +Проверяют корректность конвертации, обработку ошибок, совместимость. +""" +import pytest +import uuid +from unittest.mock import patch + +from ..conv import ( + dto_to_proto, proto_to_dto, transition_dto_to_proto, transition_proto_to_dto, + dto_to_json_dict, dto_to_protobuf_json, ConversionError, + create_proto_snapshot, parse_proto_snapshot, + FSM_STATE_DTO_TO_PROTO, FSM_STATE_PROTO_TO_DTO +) +from ..types import ( + FsmSnapshotDTO, TransitionDTO, FsmState, TransitionStatus, + initial_snapshot, create_transition +) + +# Импорт protobuf для тестов +from q_core_agent.generated.fsm_state_pb2 import ( + FsmStateSnapshot, StateTransition, FSMStateEnum, FSMTransitionStatus +) +from q_core_agent.generated.common_types_pb2 import UUID as ProtoUUID + + +class TestEnumMappings: + """Тесты маппинга enum'ов между DTO и protobuf""" + + def test_fsm_state_dto_to_proto_mapping(self): + """Тест маппинга FsmState -> FSMStateEnum""" + assert FSM_STATE_DTO_TO_PROTO[FsmState.UNSPECIFIED] == FSMStateEnum.FSM_STATE_UNSPECIFIED + assert FSM_STATE_DTO_TO_PROTO[FsmState.BOOTING] == FSMStateEnum.BOOTING + assert FSM_STATE_DTO_TO_PROTO[FsmState.IDLE] == FSMStateEnum.IDLE + assert FSM_STATE_DTO_TO_PROTO[FsmState.ACTIVE] == FSMStateEnum.ACTIVE + assert FSM_STATE_DTO_TO_PROTO[FsmState.ERROR_STATE] == FSMStateEnum.ERROR_STATE + assert FSM_STATE_DTO_TO_PROTO[FsmState.SHUTDOWN] == FSMStateEnum.SHUTDOWN + + def test_fsm_state_proto_to_dto_mapping(self): + """Тест маппинга FSMStateEnum -> FsmState""" + assert FSM_STATE_PROTO_TO_DTO[FSMStateEnum.FSM_STATE_UNSPECIFIED] == FsmState.UNSPECIFIED + assert FSM_STATE_PROTO_TO_DTO[FSMStateEnum.BOOTING] == FsmState.BOOTING + assert FSM_STATE_PROTO_TO_DTO[FSMStateEnum.IDLE] == FsmState.IDLE + assert FSM_STATE_PROTO_TO_DTO[FSMStateEnum.ACTIVE] == FsmState.ACTIVE + assert FSM_STATE_PROTO_TO_DTO[FSMStateEnum.ERROR_STATE] == FsmState.ERROR_STATE + assert FSM_STATE_PROTO_TO_DTO[FSMStateEnum.SHUTDOWN] == FsmState.SHUTDOWN + + def test_bidirectional_enum_mapping(self): + """Тест что маппинг работает в обе стороны""" + for dto_state, proto_state in FSM_STATE_DTO_TO_PROTO.items(): + assert FSM_STATE_PROTO_TO_DTO[proto_state] == dto_state + + +class TestTransitionConversion: + """Тесты конвертации переходов TransitionDTO ↔ StateTransition""" + + def test_transition_dto_to_proto(self): + """Тест конвертации TransitionDTO -> StateTransition""" + dto = TransitionDTO( + from_state=FsmState.IDLE, + to_state=FsmState.ACTIVE, + trigger_event="PROPOSALS_RECEIVED", + status=TransitionStatus.SUCCESS, + error_message="", + ts_wall=1234567890.5 + ) + + proto = transition_dto_to_proto(dto) + + assert proto.from_state == FSMStateEnum.IDLE + assert proto.to_state == FSMStateEnum.ACTIVE + assert proto.trigger_event == "PROPOSALS_RECEIVED" + assert proto.status == FSMTransitionStatus.SUCCESS + assert proto.error_message == "" + assert proto.timestamp.seconds == 1234567890 + assert proto.timestamp.nanos == 500000000 # 0.5 сек в наносекундах + + def test_transition_proto_to_dto(self): + """Тест конвертации StateTransition -> TransitionDTO""" + proto = StateTransition() + proto.from_state = FSMStateEnum.ACTIVE + proto.to_state = FSMStateEnum.IDLE + proto.trigger_event = "NO_PROPOSALS" + proto.status = FSMTransitionStatus.SUCCESS + proto.error_message = "" + proto.timestamp.FromSeconds(1234567890) + proto.timestamp.nanos = 250000000 # 0.25 сек + + dto = transition_proto_to_dto(proto) + + assert dto.from_state == FsmState.ACTIVE + assert dto.to_state == FsmState.IDLE + assert dto.trigger_event == "NO_PROPOSALS" + assert dto.status == TransitionStatus.SUCCESS + assert dto.error_message == "" + assert abs(dto.ts_wall - 1234567890.25) < 0.001 # примерно равно + + def test_transition_with_error(self): + """Тест конвертации перехода с ошибкой""" + dto = TransitionDTO( + from_state=FsmState.IDLE, + to_state=FsmState.ERROR_STATE, + trigger_event="BIOS_ERROR", + status=TransitionStatus.FAILED, + error_message="BIOS system check failed" + ) + + proto = transition_dto_to_proto(dto) + back_dto = transition_proto_to_dto(proto) + + assert back_dto.from_state == dto.from_state + assert back_dto.to_state == dto.to_state + assert back_dto.trigger_event == dto.trigger_event + assert back_dto.status == dto.status + assert back_dto.error_message == dto.error_message + + def test_transition_roundtrip(self): + """Тест roundtrip конвертации перехода""" + original = create_transition( + FsmState.BOOTING, + FsmState.IDLE, + "BOOT_COMPLETE", + TransitionStatus.SUCCESS + ) + + proto = transition_dto_to_proto(original) + converted_back = transition_proto_to_dto(proto) + + # Проверяем основные поля (временные метки могут отличаться) + assert converted_back.from_state == original.from_state + assert converted_back.to_state == original.to_state + assert converted_back.trigger_event == original.trigger_event + assert converted_back.status == original.status + assert converted_back.error_message == original.error_message + + +class TestSnapshotConversion: + """Тесты конвертации снапшотов FsmSnapshotDTO ↔ FsmStateSnapshot""" + + def test_dto_to_proto_basic(self): + """Тест базовой конвертации DTO -> protobuf""" + dto = FsmSnapshotDTO( + version=42, + state=FsmState.ACTIVE, + reason="TEST_REASON", + snapshot_id=str(uuid.uuid4()), + fsm_instance_id=str(uuid.uuid4()), + source_module="test_module", + attempt_count=5 + ) + + proto = dto_to_proto(dto) + + assert proto.current_state == FSMStateEnum.ACTIVE + assert proto.source_module == "test_module" + assert proto.attempt_count == 5 + assert proto.snapshot_id.value == dto.snapshot_id + assert proto.fsm_instance_id.value == dto.fsm_instance_id + + # DTO-специфичные поля должны быть в метаданных + assert proto.state_metadata['dto_version'] == '42' + assert proto.state_metadata['dto_reason'] == 'TEST_REASON' + + def test_dto_to_proto_with_history(self): + """Тест конвертации с историей переходов""" + transition1 = create_transition(FsmState.BOOTING, FsmState.IDLE, "BOOT_COMPLETE") + transition2 = create_transition(FsmState.IDLE, FsmState.ACTIVE, "PROPOSALS_RECEIVED") + + dto = FsmSnapshotDTO( + version=3, + state=FsmState.ACTIVE, + history=[transition1, transition2] + ) + + proto = dto_to_proto(dto) + + assert len(proto.history) == 2 + assert proto.history[0].from_state == FSMStateEnum.BOOTING + assert proto.history[0].to_state == FSMStateEnum.IDLE + assert proto.history[1].from_state == FSMStateEnum.IDLE + assert proto.history[1].to_state == FSMStateEnum.ACTIVE + + def test_dto_to_proto_with_metadata(self): + """Тест конвертации с метаданными""" + context_data = {"sensor_count": "10", "proposal_active": "true"} + state_metadata = {"debug": "false", "test_mode": "true"} + + dto = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + context_data=context_data, + state_metadata=state_metadata + ) + + proto = dto_to_proto(dto) + + # Контекстные данные + assert proto.context_data["sensor_count"] == "10" + assert proto.context_data["proposal_active"] == "true" + + # Метаданные состояния (включая DTO-специфичные) + assert proto.state_metadata["debug"] == "false" + assert proto.state_metadata["test_mode"] == "true" + assert proto.state_metadata["dto_version"] == "1" + + def test_proto_to_dto_basic(self): + """Тест базовой конвертации protobuf -> DTO""" + proto = FsmStateSnapshot() + proto.current_state = FSMStateEnum.IDLE + proto.source_module = "proto_test" + proto.attempt_count = 7 + + # Устанавливаем UUID + proto.snapshot_id.value = str(uuid.uuid4()) + proto.fsm_instance_id.value = str(uuid.uuid4()) + + # DTO-специфичные метаданные + proto.state_metadata["dto_version"] = "15" + proto.state_metadata["dto_reason"] = "PROTO_TEST" + proto.state_metadata["dto_prev_state"] = "BOOTING" + proto.state_metadata["dto_ts_mono"] = "1234.567" + + dto = proto_to_dto(proto) + + assert dto.state == FsmState.IDLE + assert dto.version == 15 + assert dto.reason == "PROTO_TEST" + assert dto.prev_state == FsmState.BOOTING + assert dto.source_module == "proto_test" + assert dto.attempt_count == 7 + assert abs(dto.ts_mono - 1234.567) < 0.001 + + def test_roundtrip_conversion(self): + """Тест roundtrip конвертации снапшота""" + original = FsmSnapshotDTO( + version=99, + state=FsmState.ERROR_STATE, + prev_state=FsmState.ACTIVE, + reason="ROUNDTRIP_TEST", + source_module="roundtrip_module", + attempt_count=3, + context_data={"key1": "value1"}, + state_metadata={"key2": "value2"} + ) + + proto = dto_to_proto(original) + converted_back = proto_to_dto(proto) + + # Основные поля должны совпадать + assert converted_back.version == original.version + assert converted_back.state == original.state + assert converted_back.prev_state == original.prev_state + assert converted_back.reason == original.reason + assert converted_back.source_module == original.source_module + assert converted_back.attempt_count == original.attempt_count + assert converted_back.context_data == original.context_data + assert converted_back.state_metadata == original.state_metadata + + def test_empty_and_none_handling(self): + """Тест обработки пустых значений и None""" + dto = FsmSnapshotDTO( + version=0, + state=FsmState.UNSPECIFIED, + reason="", + prev_state=None, + history=None, # должно стать [] + context_data=None, # должно стать {} + state_metadata=None # должно стать {} + ) + + proto = dto_to_proto(dto) + converted_back = proto_to_dto(proto) + + assert converted_back.reason == "" + assert converted_back.prev_state is None + assert converted_back.history == () + assert converted_back.context_data == {} + # state_metadata содержит DTO-специфичные поля, но пустые пользовательские + + +class TestConversionErrors: + """Тесты обработки ошибок конвертации""" + + def test_dto_to_proto_with_invalid_enum(self): + """Тест конвертации с некорректным enum (защита от будущих изменений)""" + # Создаём DTO с "новым" состоянием которого нет в маппинге + dto = FsmSnapshotDTO(version=1, state=999, reason="INVALID_ENUM") # некорректное состояние + + # Конвертация должна использовать fallback + proto = dto_to_proto(dto) + assert proto.current_state == FSMStateEnum.FSM_STATE_UNSPECIFIED + + @patch('q_core_agent.state.conv.MessageToDict') + def test_conversion_exception_handling(self, mock_message_to_dict): + """Тест обработки исключений в процессе конвертации""" + mock_message_to_dict.side_effect = Exception("Protobuf error") + + dto = FsmSnapshotDTO(version=1, state=FsmState.IDLE) + + # Должно упасть с ConversionError + with pytest.raises(ConversionError): + dto_to_protobuf_json(dto) + + def test_invalid_uuid_handling(self): + """Тест обработки некорректных UUID""" + dto = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + snapshot_id="invalid-uuid-string", + fsm_instance_id="" # пустой UUID + ) + + # Конвертация должна пройти с fallback на новые UUID + proto = dto_to_proto(dto) + + # UUID должны быть валидными + uuid.UUID(proto.snapshot_id.value) # не должно упасть + uuid.UUID(proto.fsm_instance_id.value) # не должно упасть + + +class TestJSONConversion: + """Тесты конвертации в JSON форматы""" + + def test_dto_to_json_dict(self): + """Тест конвертации DTO в JSON-совместимый словарь""" + transition = create_transition(FsmState.BOOTING, FsmState.IDLE, "BOOT") + + dto = FsmSnapshotDTO( + version=5, + state=FsmState.ACTIVE, + prev_state=FsmState.IDLE, + reason="JSON_TEST", + history=[transition], + context_data={"key": "value"}, + state_metadata={"meta": "data"} + ) + + json_dict = dto_to_json_dict(dto) + + assert json_dict['version'] == 5 + assert json_dict['state'] == 'ACTIVE' + assert json_dict['prev_state'] == 'IDLE' + assert json_dict['reason'] == 'JSON_TEST' + assert json_dict['history_count'] == 1 + assert json_dict['context_keys'] == ['key'] + assert json_dict['metadata_keys'] == ['meta'] + assert 'ts_mono' in json_dict + assert 'ts_wall' in json_dict + + def test_dto_to_protobuf_json(self): + """Тест конвертации DTO в JSON через protobuf""" + dto = FsmSnapshotDTO( + version=3, + state=FsmState.ERROR_STATE, + reason="PROTOBUF_JSON_TEST" + ) + + json_dict = dto_to_protobuf_json(dto) + + # Должен быть словарь с protobuf структурой + assert isinstance(json_dict, dict) + assert 'currentState' in json_dict or 'current_state' in json_dict # может варьироваться + + def test_json_formats_consistency(self): + """Тест согласованности разных JSON форматов""" + dto = initial_snapshot() + + lightweight_json = dto_to_json_dict(dto) + protobuf_json = dto_to_protobuf_json(dto) + + # Версии должны совпадать + assert lightweight_json['version'] == int(protobuf_json.get('stateMetadata', {}).get('dtoVersion', '0')) + + # Состояния должны соответствовать + assert lightweight_json['state'] == 'BOOTING' + + +class TestHelperFunctions: + """Тесты вспомогательных функций""" + + def test_create_proto_snapshot(self): + """Тест создания protobuf снапшота из основных параметров""" + proto = create_proto_snapshot(FsmState.ACTIVE, "HELPER_TEST", version=10) + + assert proto.current_state == FSMStateEnum.ACTIVE + assert proto.state_metadata['dto_reason'] == "HELPER_TEST" + assert proto.state_metadata['dto_version'] == "10" + + def test_parse_proto_snapshot(self): + """Тест парсинга protobuf данных в DTO""" + # Создаём protobuf снапшот + original_dto = FsmSnapshotDTO( + version=7, + state=FsmState.SHUTDOWN, + reason="PARSE_TEST" + ) + proto = dto_to_proto(original_dto) + proto_bytes = proto.SerializeToString() + + # Парсим обратно + parsed_dto = parse_proto_snapshot(proto_bytes) + + assert parsed_dto.version == 7 + assert parsed_dto.state == FsmState.SHUTDOWN + assert parsed_dto.reason == "PARSE_TEST" + + +class TestTimestampHandling: + """Тесты обработки временных меток""" + + def test_float_to_timestamp_conversion(self): + """Тест конвертации float времени в protobuf Timestamp""" + from services.q_core_agent.state.conv import _float_to_timestamp, _timestamp_to_float + + # Тестируем различные временные значения + test_times = [0.0, 1234567890.5, 1234567890.123456789] + + for original_time in test_times: + timestamp = _float_to_timestamp(original_time) + converted_back = _timestamp_to_float(timestamp) + + # Точность до миллисекунд должна сохраняться + assert abs(converted_back - original_time) < 0.001 + + def test_zero_timestamp_handling(self): + """Тест обработки нулевых временных меток""" + from services.q_core_agent.state.conv import _float_to_timestamp, _timestamp_to_float + + # Нулевое время + timestamp = _float_to_timestamp(0.0) + assert timestamp.seconds == 0 + assert timestamp.nanos == 0 + + # Обратная конвертация + assert _timestamp_to_float(timestamp) == 0.0 + + # None timestamp + assert _timestamp_to_float(None) == 0.0 + + +class TestEdgeCasesAndBoundaries: + """Тесты граничных случаев""" + + def test_very_large_version_numbers(self): + """Тест очень больших номеров версий""" + large_version = 2**50 # очень большое число + + dto = FsmSnapshotDTO(version=large_version, state=FsmState.IDLE) + proto = dto_to_proto(dto) + converted_back = proto_to_dto(proto) + + assert converted_back.version == large_version + + def test_unicode_strings_in_conversion(self): + """Тест unicode строк в конвертации""" + unicode_reason = "Тест на русском 🚀 中文测试" + unicode_trigger = "Событие_中文_🎯" + + dto = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + reason=unicode_reason, + context_data={"unicode_key": "значение_中文"}, + state_metadata={"событие": unicode_trigger} + ) + + proto = dto_to_proto(dto) + converted_back = proto_to_dto(proto) + + assert converted_back.reason == unicode_reason + assert converted_back.context_data["unicode_key"] == "значение_中文" + assert converted_back.state_metadata["событие"] == unicode_trigger + + def test_empty_collections_handling(self): + """Тест обработки пустых коллекций""" + dto = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + history=[], # пустая история + context_data={}, # пустые данные + state_metadata={} # пустые метаданные + ) + + proto = dto_to_proto(dto) + converted_back = proto_to_dto(proto) + + assert converted_back.history == () + assert converted_back.context_data == {} + # state_metadata может содержать DTO-специфичные поля + + def test_large_collections_handling(self): + """Тест обработки больших коллекций""" + # Большая история + large_history = [ + create_transition(FsmState.IDLE, FsmState.ACTIVE, f"EVENT_{i}") + for i in range(1000) + ] + + # Много метаданных + large_context = {f"key_{i}": f"value_{i}" for i in range(500)} + + dto = FsmSnapshotDTO( + version=1, + state=FsmState.ACTIVE, + history=large_history, + context_data=large_context + ) + + proto = dto_to_proto(dto) + converted_back = proto_to_dto(proto) + + assert len(converted_back.history) == 1000 + assert len(converted_back.context_data) == 500 + assert all(h.trigger_event.startswith("EVENT_") for h in converted_back.history) + + +if __name__ == "__main__": + # Быстрый запуск тестов + pytest.main([__file__, "-v", "--tb=short"]) diff --git a/extracted/QIKI_DTMP/services/q_core_agent/state/tests/test_integration.py b/extracted/QIKI_DTMP/services/q_core_agent/state/tests/test_integration.py new file mode 100644 index 0000000..40633e5 --- /dev/null +++ b/extracted/QIKI_DTMP/services/q_core_agent/state/tests/test_integration.py @@ -0,0 +1,600 @@ +""" +Integration тесты для StateStore архитектуры. +Проверяют взаимодействие между компонентами: FSMHandler + StateStore + конвертеры. +""" +import pytest +import asyncio +import os +from unittest.mock import Mock, AsyncMock, patch + +from ..store import AsyncStateStore, create_initialized_store +from ..types import FsmSnapshotDTO, FsmState, TransitionStatus, initial_snapshot, next_snapshot +from ..conv import dto_to_proto, proto_to_dto, dto_to_json_dict + +# Импорт компонентов для интеграции (мокаем для изоляции от core) +from unittest.mock import MagicMock + + +class MockAgentContext: + """Мок контекста агента для тестирования FSMHandler""" + + def __init__(self): + self.bios_ok = True + self.has_proposals = False + self.fsm_state = None + self.bios_status = None + self.proposals = [] + + def is_bios_ok(self) -> bool: + return self.bios_ok + + def has_valid_proposals(self) -> bool: + return self.has_proposals + + +class MockFSMHandler: + """Мок FSMHandler для тестирования интеграции""" + + def __init__(self, context: MockAgentContext, state_store: AsyncStateStore = None): + self.context = context + self.state_store = state_store + + async def process_fsm_dto(self, current_dto: FsmSnapshotDTO) -> FsmSnapshotDTO: + """Упрощённая логика FSM для тестов""" + bios_ok = self.context.is_bios_ok() + has_proposals = self.context.has_valid_proposals() + + # Простая логика переходов + new_state = current_dto.state + reason = "NO_CHANGE" + + if current_dto.state == FsmState.BOOTING and bios_ok: + new_state = FsmState.IDLE + reason = "BOOT_COMPLETE" + elif current_dto.state == FsmState.IDLE and has_proposals: + new_state = FsmState.ACTIVE + reason = "PROPOSALS_RECEIVED" + elif current_dto.state == FsmState.ACTIVE and not has_proposals: + new_state = FsmState.IDLE + reason = "NO_PROPOSALS" + elif not bios_ok: + new_state = FsmState.ERROR_STATE + reason = "BIOS_ERROR" + + # Создаём новый снапшот + updated_dto = next_snapshot( + current=current_dto, + new_state=new_state, + reason=reason + ) + + # Записываем в StateStore + if self.state_store: + try: + stored_dto = await self.state_store.set(updated_dto) + return stored_dto + except Exception: + # Игнорируем проблемы StateStore, возвращаем локальный результат + return updated_dto + + return updated_dto + + +@pytest.fixture +def mock_context(): + """Мок контекста агента""" + return MockAgentContext() + + +@pytest.fixture +def state_store(): + """StateStore с начальным состоянием""" + return create_initialized_store() + + +@pytest.fixture +def fsm_handler(mock_context, state_store): + """FSMHandler с StateStore""" + return MockFSMHandler(mock_context, state_store) + + +class TestFSMHandlerStateStoreIntegration: + """Интеграционные тесты FSMHandler + StateStore""" + + @pytest.mark.asyncio + async def test_basic_fsm_processing_with_store(self, fsm_handler, state_store): + """Тест базовой обработки FSM с записью в StateStore""" + # Получаем начальное состояние + initial = await state_store.get() + assert initial.state == FsmState.BOOTING + + # Обрабатываем переход BOOTING -> IDLE + result = await fsm_handler.process_fsm_dto(initial) + + assert result.state == FsmState.IDLE + assert result.reason == "BOOT_COMPLETE" + assert result.version == 1 # версия увеличилась + + # Проверяем что состояние записалось в StateStore + stored = await state_store.get() + assert stored == result + assert stored.state == FsmState.IDLE + + @pytest.mark.asyncio + async def test_fsm_state_sequence(self, fsm_handler, state_store, mock_context): + """Тест последовательности переходов FSM""" + # 1. BOOTING -> IDLE (BIOS OK) + initial = await state_store.get() + mock_context.bios_ok = True + mock_context.has_proposals = False + + result1 = await fsm_handler.process_fsm_dto(initial) + assert result1.state == FsmState.IDLE + assert result1.version == 1 + + # 2. IDLE -> ACTIVE (proposals появились) + mock_context.has_proposals = True + + result2 = await fsm_handler.process_fsm_dto(result1) + assert result2.state == FsmState.ACTIVE + assert result2.reason == "PROPOSALS_RECEIVED" + assert result2.version == 2 + + # 3. ACTIVE -> IDLE (proposals исчезли) + mock_context.has_proposals = False + + result3 = await fsm_handler.process_fsm_dto(result2) + assert result3.state == FsmState.IDLE + assert result3.reason == "NO_PROPOSALS" + assert result3.version == 3 + + # 4. IDLE -> ERROR_STATE (BIOS ошибка) + mock_context.bios_ok = False + + result4 = await fsm_handler.process_fsm_dto(result3) + assert result4.state == FsmState.ERROR_STATE + assert result4.reason == "BIOS_ERROR" + assert result4.version == 4 + + # Проверяем финальное состояние в StateStore + final_stored = await state_store.get() + assert final_stored == result4 + + @pytest.mark.asyncio + async def test_version_monotonicity(self, fsm_handler, state_store, mock_context): + """Тест монотонности версий при множественных изменениях""" + versions = [] + + # Серия изменений состояний + for i in range(10): + current = await state_store.get() + + # Меняем условия для вызова переходов + mock_context.has_proposals = (i % 2 == 0) + + result = await fsm_handler.process_fsm_dto(current) + versions.append(result.version) + + # Версии должны монотонно возрастать + assert all(versions[i] <= versions[i+1] for i in range(len(versions)-1)) + assert len(set(versions)) > 1 # должны быть разные версии + + @pytest.mark.asyncio + async def test_no_state_change_keeps_version(self, fsm_handler, state_store, mock_context): + """Тест что отсутствие изменений не увеличивает версию""" + # Устанавливаем стабильные условия + mock_context.bios_ok = True + mock_context.has_proposals = False + + # Первый переход BOOTING -> IDLE + initial = await state_store.get() + result1 = await fsm_handler.process_fsm_dto(initial) + assert result1.state == FsmState.IDLE + assert result1.version == 1 + + # Повторная обработка без изменений условий + result2 = await fsm_handler.process_fsm_dto(result1) + + # Состояние и версия не должны измениться + assert result2.state == FsmState.IDLE + assert result2.version == 1 # версия не увеличилась + assert result2.reason == "NO_CHANGE" + + @pytest.mark.asyncio + async def test_fsm_handler_without_state_store(self, mock_context): + """Тест FSMHandler без StateStore (fallback режим)""" + # FSMHandler без StateStore + handler = MockFSMHandler(mock_context, state_store=None) + + initial = initial_snapshot() + result = await handler.process_fsm_dto(initial) + + # Должно работать, но без записи в StateStore + assert result.state == FsmState.IDLE # переход произошёл + assert result.version == 1 + + +class TestStateStoreSubscriberIntegration: + """Интеграционные тесты подписчиков StateStore""" + + @pytest.mark.asyncio + async def test_subscriber_receives_fsm_updates(self, fsm_handler, state_store): + """Тест что подписчики получают обновления от FSM""" + # Подписываемся на изменения + queue = await state_store.subscribe("fsm_test_subscriber") + + # Должно быть начальное состояние + initial_update = await asyncio.wait_for(queue.get(), timeout=1.0) + assert initial_update.state == FsmState.BOOTING + + # Выполняем FSM переход + current = await state_store.get() + await fsm_handler.process_fsm_dto(current) + + # Подписчик должен получить обновление + update = await asyncio.wait_for(queue.get(), timeout=1.0) + assert update.state == FsmState.IDLE + assert update.reason == "BOOT_COMPLETE" + + @pytest.mark.asyncio + async def test_multiple_subscribers_fsm_updates(self, fsm_handler, state_store, mock_context): + """Тест множественных подписчиков при FSM обновлениях""" + # Создаём несколько подписчиков + queues = [] + for i in range(5): + queue = await state_store.subscribe(f"subscriber_{i}") + queues.append(queue) + + # Очищаем начальные сообщения + for queue in queues: + await queue.get() # начальное состояние + + # Выполняем серию FSM переходов + transitions = [ + (True, False, FsmState.IDLE), # BOOTING -> IDLE + (True, True, FsmState.ACTIVE), # IDLE -> ACTIVE + (True, False, FsmState.IDLE), # ACTIVE -> IDLE + ] + + for bios_ok, has_proposals, expected_state in transitions: + mock_context.bios_ok = bios_ok + mock_context.has_proposals = has_proposals + + current = await state_store.get() + await fsm_handler.process_fsm_dto(current) + + # Все подписчики должны получить обновление + for i, queue in enumerate(queues): + update = await asyncio.wait_for(queue.get(), timeout=1.0) + assert update.state == expected_state, f"Subscriber {i} got wrong state" + + @pytest.mark.asyncio + async def test_subscriber_stream_consistency(self, fsm_handler, state_store, mock_context): + """Тест согласованности потока обновлений у подписчика""" + queue = await state_store.subscribe("consistency_test") + + # Очищаем начальное сообщение + await queue.get() + + received_updates = [] + + # Выполняем быструю серию изменений + for i in range(20): + mock_context.has_proposals = (i % 3 == 0) + + current = await state_store.get() + await fsm_handler.process_fsm_dto(current) + + # Собираем все доступные обновления + try: + while True: + update = queue.get_nowait() + received_updates.append(update) + except asyncio.QueueEmpty: + pass + + # Должны получить все обновления в правильном порядке + assert len(received_updates) > 0 + + # Версии должны идти по возрастанию + versions = [u.version for u in received_updates] + assert all(versions[i] <= versions[i+1] for i in range(len(versions)-1)) + + +class TestConversionIntegration: + """Интеграционные тесты конвертации с реальными данными""" + + @pytest.mark.asyncio + async def test_dto_protobuf_roundtrip_with_real_fsm_data(self, fsm_handler, state_store, mock_context): + """Тест roundtrip конвертации с реальными FSM данными""" + # Выполняем несколько FSM переходов для накопления реальных данных + mock_context.bios_ok = True + mock_context.has_proposals = False + + current = await state_store.get() + result1 = await fsm_handler.process_fsm_dto(current) # BOOTING -> IDLE + + mock_context.has_proposals = True + result2 = await fsm_handler.process_fsm_dto(result1) # IDLE -> ACTIVE + + # Конвертируем в protobuf и обратно + proto = dto_to_proto(result2) + converted_back = proto_to_dto(proto) + + # Основные данные должны сохраниться + assert converted_back.version == result2.version + assert converted_back.state == result2.state + assert converted_back.reason == result2.reason + assert len(converted_back.history) == len(result2.history) + + @pytest.mark.asyncio + async def test_json_conversion_with_fsm_history(self, fsm_handler, state_store, mock_context): + """Тест JSON конвертации с историей FSM переходов""" + # Накапливаем историю переходов + transitions_sequence = [ + (True, False), # -> IDLE + (True, True), # -> ACTIVE + (True, False), # -> IDLE + (False, False), # -> ERROR_STATE + ] + + for bios_ok, has_proposals in transitions_sequence: + mock_context.bios_ok = bios_ok + mock_context.has_proposals = has_proposals + + current = await state_store.get() + await fsm_handler.process_fsm_dto(current) + + # Получаем финальное состояние с историей + final_state = await state_store.get() + + # Конвертируем в JSON + json_dict = dto_to_json_dict(final_state) + + assert json_dict['state'] == 'ERROR_STATE' + assert json_dict['history_count'] > 0 + assert 'version' in json_dict + assert 'ts_mono' in json_dict + assert 'ts_wall' in json_dict + + # JSON должен быть сериализуемым + import json + json_str = json.dumps(json_dict) # не должно упасть + parsed_back = json.loads(json_str) + assert parsed_back['state'] == 'ERROR_STATE' + + +class TestConcurrentIntegration: + """Интеграционные тесты конкурентного доступа между компонентами""" + + @pytest.mark.asyncio + async def test_concurrent_fsm_processing(self, mock_context, state_store): + """Тест конкурентной обработки FSM от нескольких handlers""" + # Создаём несколько FSM handlers + handlers = [ + MockFSMHandler(mock_context, state_store) for _ in range(5) + ] + + async def process_transitions(handler, handler_id): + """Выполняет серию переходов от одного handler'а""" + for i in range(10): + current = await state_store.get() + + # Небольшая вариация условий + mock_context.has_proposals = ((handler_id + i) % 2 == 0) + + await handler.process_fsm_dto(current) + await asyncio.sleep(0.001) # небольшая пауза + + # Запускаем конкурентную обработку + tasks = [ + process_transitions(handler, i) + for i, handler in enumerate(handlers) + ] + await asyncio.gather(*tasks) + + # Проверяем финальное состояние + final_state = await state_store.get() + assert final_state is not None + assert final_state.version > 0 + + # Метрики должны отражать активность + metrics = await state_store.get_metrics() + assert metrics['total_sets'] >= 50 # минимум от всех handlers + + @pytest.mark.asyncio + async def test_concurrent_subscribers_and_fsm(self, fsm_handler, state_store, mock_context): + """Тест конкурентных подписчиков во время FSM обработки""" + # Запускаем подписчиков + subscriber_results = [] + + async def subscriber_task(subscriber_id): + """Задача подписчика - собирает все обновления""" + queue = await state_store.subscribe(f"concurrent_sub_{subscriber_id}") + updates = [] + + try: + # Собираем обновления в течение времени теста + while len(updates) < 10: # ожидаем несколько обновлений + update = await asyncio.wait_for(queue.get(), timeout=2.0) + updates.append(update) + except asyncio.TimeoutError: + pass + + subscriber_results.append((subscriber_id, updates)) + + async def fsm_processing_task(): + """Задача FSM - выполняет переходы""" + for i in range(15): + mock_context.bios_ok = (i % 3 != 2) # иногда BIOS падает + mock_context.has_proposals = (i % 2 == 0) + + current = await state_store.get() + await fsm_handler.process_fsm_dto(current) + await asyncio.sleep(0.05) + + # Запускаем всё конкурентно + await asyncio.gather( + subscriber_task(0), + subscriber_task(1), + subscriber_task(2), + fsm_processing_task() + ) + + # Все подписчики должны получить обновления + assert len(subscriber_results) == 3 + for sub_id, updates in subscriber_results: + assert len(updates) > 0, f"Subscriber {sub_id} got no updates" + + # Проверяем монотонность версий + versions = [u.version for u in updates] + assert all(versions[i] <= versions[i+1] for i in range(len(versions)-1)) + + +class TestErrorHandlingIntegration: + """Интеграционные тесты обработки ошибок""" + + @pytest.mark.asyncio + async def test_state_store_failure_recovery(self, mock_context): + """Тест восстановления при сбоях StateStore""" + # Создаём StateStore который иногда падает + store = AsyncStateStore() + handler = MockFSMHandler(mock_context, store) + + # Мокаем store.set чтобы иногда падал + original_set = store.set + call_count = 0 + + async def failing_set(snapshot): + nonlocal call_count + call_count += 1 + if call_count == 2: # второй вызов падает + raise Exception("StateStore failure simulation") + return await original_set(snapshot) + + store.set = failing_set + + # Выполняем серию операций + current = await store.initialize_if_empty() + + # Первая операция должна пройти + result1 = await handler.process_fsm_dto(current) + assert result1.state == FsmState.IDLE + + # Вторая операция упадёт в StateStore, но FSMHandler должен обработать + mock_context.has_proposals = True + result2 = await handler.process_fsm_dto(result1) + assert result2.state == FsmState.ACTIVE # переход всё равно произошёл + + # StateStore может быть в несогласованном состоянии + stored = await store.get() + # Может не совпадать с result2 из-за ошибки записи + + @pytest.mark.asyncio + async def test_conversion_error_handling(self, state_store): + """Тест обработки ошибок конвертации в интеграции""" + # Создаём снапшот с проблемными данными + problematic_dto = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + reason="Test" * 10000, # очень длинная строка + context_data={f"key_{i}": f"value_{i}" * 100 for i in range(1000)} # много данных + ) + + await state_store.set(problematic_dto) + + # Конвертация должна работать даже с большими данными + retrieved = await state_store.get() + proto = dto_to_proto(retrieved) + + assert proto.current_state == dto_to_proto(problematic_dto).current_state + + # JSON конвертация должна быть безопасной + json_dict = dto_to_json_dict(retrieved) + assert 'version' in json_dict + + @pytest.mark.asyncio + async def test_subscriber_error_isolation(self, fsm_handler, state_store): + """Тест изоляции ошибок подписчиков""" + # Создаём нормального подписчика + good_queue = await state_store.subscribe("good_subscriber") + + # Создаём "плохого" подписчика (мок который падает) + bad_queue = Mock() + bad_queue.put_nowait = Mock(side_effect=Exception("Subscriber error")) + + # Добавляем плохого подписчика напрямую + async with state_store._lock: + state_store._subscribers.append(bad_queue) + state_store._subscriber_ids[id(bad_queue)] = "bad_subscriber" + + # Очищаем начальные сообщения у хорошего подписчика + await good_queue.get() + + # Выполняем FSM операцию + current = await state_store.get() + await fsm_handler.process_fsm_dto(current) + + # Хороший подписчик должен получить обновление несмотря на плохого + update = await asyncio.wait_for(good_queue.get(), timeout=1.0) + assert update.state == FsmState.IDLE + + # Плохой подписчик должен быть удалён + async with state_store._lock: + assert bad_queue not in state_store._subscribers + + +class TestFeatureFlagIntegration: + """Интеграционные тесты с feature флагами""" + + @pytest.mark.asyncio + async def test_state_store_enable_disable(self, mock_context): + """Тест включения/выключения StateStore через переменную окружения""" + store = AsyncStateStore() + + # Тест с включённым StateStore + with patch.dict(os.environ, {'QIKI_USE_STATESTORE': 'true'}): + handler = MockFSMHandler(mock_context, store) + + initial = await store.initialize_if_empty() + result = await handler.process_fsm_dto(initial) + + # Результат должен быть записан в StateStore + stored = await store.get() + assert stored == result + + # Тест с отключённым StateStore + with patch.dict(os.environ, {'QIKI_USE_STATESTORE': 'false'}): + handler_disabled = MockFSMHandler(mock_context, None) + + result_disabled = await handler_disabled.process_fsm_dto(initial) + + # StateStore не должен обновляться + stored_after = await store.get() + assert stored_after == stored # не изменился + + @pytest.mark.asyncio + async def test_graceful_degradation(self, mock_context): + """Тест плавной деградации при проблемах с StateStore""" + # Создаём "сломанный" StateStore + broken_store = Mock(spec=AsyncStateStore) + broken_store.set = AsyncMock(side_effect=Exception("Store is broken")) + broken_store.get = AsyncMock(return_value=None) + + handler = MockFSMHandler(mock_context, broken_store) + + # Обработка должна работать даже со сломанным StateStore + initial = initial_snapshot() + result = await handler.process_fsm_dto(initial) + + # FSM переход должен произойти + assert result.state == FsmState.IDLE + assert result.reason == "BOOT_COMPLETE" + + # StateStore.set был вызван, но упал + broken_store.set.assert_called_once() + + +if __name__ == "__main__": + # Запуск интеграционных тестов + pytest.main([__file__, "-v", "-s", "--tb=short"]) diff --git a/extracted/QIKI_DTMP/services/q_core_agent/state/tests/test_types.py b/extracted/QIKI_DTMP/services/q_core_agent/state/tests/test_types.py new file mode 100644 index 0000000..1af08ac --- /dev/null +++ b/extracted/QIKI_DTMP/services/q_core_agent/state/tests/test_types.py @@ -0,0 +1,400 @@ +""" +Серьёзные unit тесты для DTO типов StateStore. +Проверяют immutability, создание снапшотов, переходы состояний. +""" +import pytest +import time +import uuid +from dataclasses import FrozenInstanceError + +from ..types import ( + FsmSnapshotDTO, TransitionDTO, FsmState, TransitionStatus, + initial_snapshot, create_transition, next_snapshot +) + + +class TestFsmState: + """Тесты enum'а FsmState""" + + def test_fsm_state_values(self): + """Проверяем корректность значений enum""" + assert FsmState.UNSPECIFIED == 0 + assert FsmState.BOOTING == 1 + assert FsmState.IDLE == 2 + assert FsmState.ACTIVE == 3 + assert FsmState.ERROR_STATE == 4 + assert FsmState.SHUTDOWN == 5 + + def test_fsm_state_names(self): + """Проверяем корректность имён enum""" + assert FsmState.BOOTING.name == 'BOOTING' + assert FsmState.IDLE.name == 'IDLE' + assert FsmState.ACTIVE.name == 'ACTIVE' + assert FsmState.ERROR_STATE.name == 'ERROR_STATE' + + +class TestTransitionDTO: + """Тесты TransitionDTO - immutable переходы состояний""" + + def test_transition_creation(self): + """Тест создания перехода""" + transition = TransitionDTO( + from_state=FsmState.BOOTING, + to_state=FsmState.IDLE, + trigger_event="BOOT_COMPLETE", + status=TransitionStatus.SUCCESS + ) + + assert transition.from_state == FsmState.BOOTING + assert transition.to_state == FsmState.IDLE + assert transition.trigger_event == "BOOT_COMPLETE" + assert transition.status == TransitionStatus.SUCCESS + assert transition.ts_mono > 0 # автоустановка времени + assert transition.ts_wall > 0 + + def test_transition_immutability(self): + """Тест неизменяемости TransitionDTO""" + transition = TransitionDTO( + from_state=FsmState.IDLE, + to_state=FsmState.ACTIVE, + trigger_event="TEST" + ) + + # Попытка изменения должна упасть + with pytest.raises(FrozenInstanceError): + transition.from_state = FsmState.BOOTING + + with pytest.raises(FrozenInstanceError): + transition.trigger_event = "MODIFIED" + + def test_transition_with_error(self): + """Тест перехода с ошибкой""" + transition = TransitionDTO( + from_state=FsmState.IDLE, + to_state=FsmState.ERROR_STATE, + trigger_event="BIOS_ERROR", + status=TransitionStatus.FAILED, + error_message="BIOS check failed" + ) + + assert transition.status == TransitionStatus.FAILED + assert transition.error_message == "BIOS check failed" + + def test_create_transition_helper(self): + """Тест helper функции create_transition""" + transition = create_transition( + from_state=FsmState.ACTIVE, + to_state=FsmState.IDLE, + trigger="NO_PROPOSALS", + status=TransitionStatus.SUCCESS, + error_msg="" + ) + + assert isinstance(transition, TransitionDTO) + assert transition.from_state == FsmState.ACTIVE + assert transition.to_state == FsmState.IDLE + assert transition.trigger_event == "NO_PROPOSALS" + + +class TestFsmSnapshotDTO: + """Тесты FsmSnapshotDTO - основной DTO для состояния FSM""" + + def test_snapshot_creation(self): + """Тест создания снапшота""" + snapshot = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + reason="TEST_REASON" + ) + + assert snapshot.version == 1 + assert snapshot.state == FsmState.IDLE + assert snapshot.reason == "TEST_REASON" + assert snapshot.ts_mono > 0 # автоустановка времени + assert snapshot.ts_wall > 0 + assert snapshot.snapshot_id # автогенерация UUID + assert snapshot.fsm_instance_id + + def test_snapshot_immutability(self): + """Тест неизменяемости FsmSnapshotDTO""" + snapshot = FsmSnapshotDTO(version=1, state=FsmState.IDLE) + + # Попытка изменения должна упасть + with pytest.raises(FrozenInstanceError): + snapshot.version = 2 + + with pytest.raises(FrozenInstanceError): + snapshot.state = FsmState.ACTIVE + + def test_snapshot_with_history(self): + """Тест снапшота с историей переходов""" + transition1 = create_transition(FsmState.BOOTING, FsmState.IDLE, "BOOT_COMPLETE") + transition2 = create_transition(FsmState.IDLE, FsmState.ACTIVE, "PROPOSALS_RECEIVED") + + snapshot = FsmSnapshotDTO( + version=2, + state=FsmState.ACTIVE, + history=[transition1, transition2] + ) + + assert len(snapshot.history) == 2 + assert snapshot.history[0] == transition1 + assert snapshot.history[1] == transition2 + + # История immutable + with pytest.raises(AttributeError): + snapshot.history.append(transition1) + + def test_snapshot_with_metadata(self): + """Тест снапшота с метаданными""" + context_data = {"sensor_count": "5", "proposal_count": "3"} + state_metadata = {"debug_mode": "true", "test_run": "false"} + + snapshot = FsmSnapshotDTO( + version=1, + state=FsmState.ACTIVE, + context_data=context_data, + state_metadata=state_metadata + ) + + assert snapshot.context_data == context_data + assert snapshot.state_metadata == state_metadata + + def test_snapshot_defaults(self): + """Тест значений по умолчанию""" + snapshot = FsmSnapshotDTO(version=0, state=FsmState.BOOTING) + + assert snapshot.prev_state is None + assert snapshot.source_module == "fsm_handler" + assert snapshot.attempt_count == 0 + assert snapshot.history == () + assert snapshot.context_data == {} + assert snapshot.state_metadata == {} + + def test_snapshot_uuid_validation(self): + """Тест генерации и валидации UUID""" + snapshot = FsmSnapshotDTO(version=1, state=FsmState.IDLE) + + # UUID должны быть валидными + uuid.UUID(snapshot.snapshot_id) # не должно упасть + uuid.UUID(snapshot.fsm_instance_id) # не должно упасть + + # UUID должны быть уникальными + snapshot2 = FsmSnapshotDTO(version=1, state=FsmState.IDLE) + assert snapshot.snapshot_id != snapshot2.snapshot_id + assert snapshot.fsm_instance_id != snapshot2.fsm_instance_id + + +class TestInitialSnapshot: + """Тесты функции initial_snapshot""" + + def test_initial_snapshot_creation(self): + """Тест создания начального снапшота""" + snapshot = initial_snapshot() + + assert snapshot.version == 0 + assert snapshot.state == FsmState.BOOTING + assert snapshot.prev_state is None + assert snapshot.reason == "COLD_START" + assert snapshot.source_module == "initial_boot" + assert snapshot.attempt_count == 0 + + def test_initial_snapshot_immutability(self): + """Тест неизменяемости начального снапшота""" + snapshot = initial_snapshot() + + with pytest.raises(FrozenInstanceError): + snapshot.version = 1 + + with pytest.raises(FrozenInstanceError): + snapshot.state = FsmState.IDLE + + def test_initial_snapshot_timing(self): + """Тест временных меток начального снапшота""" + before = time.time() + snapshot = initial_snapshot() + after = time.time() + + # Времена должны быть в разумных пределах + assert before <= snapshot.ts_wall <= after + assert snapshot.ts_mono > 0 + + +class TestNextSnapshot: + """Тесты функции next_snapshot - ключевая для переходов""" + + def test_state_change_increments_version(self): + """Тест инкремента версии при изменении состояния""" + current = FsmSnapshotDTO(version=1, state=FsmState.BOOTING) + + next_snap = next_snapshot( + current=current, + new_state=FsmState.IDLE, + reason="BOOT_COMPLETE" + ) + + assert next_snap.version == 2 # версия увеличилась + assert next_snap.state == FsmState.IDLE + assert next_snap.prev_state == FsmState.BOOTING + assert next_snap.reason == "BOOT_COMPLETE" + assert next_snap.attempt_count == 1 # попытка увеличилась + + def test_no_state_change_keeps_version(self): + """Тест сохранения версии при отсутствии изменений""" + current = FsmSnapshotDTO(version=5, state=FsmState.IDLE) + + next_snap = next_snapshot( + current=current, + new_state=FsmState.IDLE, # то же состояние + reason="NO_CHANGE" + ) + + assert next_snap.version == 5 # версия не изменилась + assert next_snap.state == FsmState.IDLE + assert next_snap.prev_state == current.prev_state # сохранилось предыдущее + assert next_snap.attempt_count == current.attempt_count # не увеличилось + + def test_next_snapshot_with_transition(self): + """Тест создания следующего снапшота с переходом""" + current = FsmSnapshotDTO(version=2, state=FsmState.IDLE) + transition = create_transition(FsmState.IDLE, FsmState.ACTIVE, "PROPOSALS_RECEIVED") + + next_snap = next_snapshot( + current=current, + new_state=FsmState.ACTIVE, + reason="PROPOSALS_RECEIVED", + transition=transition + ) + + assert len(next_snap.history) == len(current.history) + 1 + assert next_snap.history[-1] == transition # переход добавлен в конец + + def test_next_snapshot_preserves_instance_id(self): + """Тест сохранения instance_id между снапшотами""" + instance_id = str(uuid.uuid4()) + current = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + fsm_instance_id=instance_id + ) + + next_snap = next_snapshot( + current=current, + new_state=FsmState.ACTIVE, + reason="STATE_CHANGE" + ) + + assert next_snap.fsm_instance_id == instance_id # сохранился + assert next_snap.snapshot_id != current.snapshot_id # но snapshot_id новый + + def test_next_snapshot_preserves_metadata(self): + """Тест сохранения метаданных между снапшотами""" + context_data = {"key1": "value1"} + state_metadata = {"key2": "value2"} + + current = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + context_data=context_data, + state_metadata=state_metadata + ) + + next_snap = next_snapshot( + current=current, + new_state=FsmState.ACTIVE, + reason="CHANGE" + ) + + # Метаданные скопированы (но это новые dict'ы) + assert next_snap.context_data == context_data + assert next_snap.state_metadata == state_metadata + assert next_snap.context_data is not current.context_data # новый объект + assert next_snap.state_metadata is not current.state_metadata # новый объект + + +class TestEdgeCases: + """Тесты граничных случаев и ошибок""" + + def test_empty_strings_and_none_values(self): + """Тест обработки пустых строк и None значений""" + snapshot = FsmSnapshotDTO( + version=0, + state=FsmState.IDLE, + reason="", # пустая строка + snapshot_id="", # пустая строка -> должна автогенерироваться + prev_state=None, # None значение + history=None, # None -> должен стать [] + context_data=None, # None -> должен стать {} + ) + + assert snapshot.reason == "" + assert snapshot.snapshot_id # автогенерированный UUID + assert snapshot.prev_state is None + assert snapshot.history == () + assert snapshot.context_data == {} + + def test_large_history_handling(self): + """Тест обработки большой истории переходов""" + # Создаём большую историю переходов + large_history = [ + create_transition(FsmState.IDLE, FsmState.ACTIVE, f"EVENT_{i}") + for i in range(1000) + ] + + snapshot = FsmSnapshotDTO( + version=1000, + state=FsmState.ACTIVE, + history=large_history + ) + + assert len(snapshot.history) == 1000 + assert all(isinstance(t, TransitionDTO) for t in snapshot.history) + + def test_version_overflow_behavior(self): + """Тест поведения при больших номерах версий""" + large_version = 2**63 - 1 # максимальный int64 + + snapshot = FsmSnapshotDTO( + version=large_version, + state=FsmState.IDLE + ) + + # Следующий снапшот должен обработать overflow корректно + # (в Python int'ы не переполняются, но тестируем логику) + next_snap = next_snapshot( + current=snapshot, + new_state=FsmState.ACTIVE, + reason="TEST" + ) + + assert next_snap.version == large_version + 1 + + def test_unicode_strings(self): + """Тест обработки unicode строк""" + unicode_reason = "Причина на русском 🚀" + unicode_trigger = "事件_中文" + + snapshot = FsmSnapshotDTO( + version=1, + state=FsmState.IDLE, + reason=unicode_reason + ) + + transition = create_transition( + FsmState.IDLE, + FsmState.ACTIVE, + unicode_trigger + ) + + assert snapshot.reason == unicode_reason + assert transition.trigger_event == unicode_trigger + + # Должно работать с next_snapshot + next_snap = next_snapshot( + current=snapshot, + new_state=FsmState.ACTIVE, + reason=unicode_reason, + transition=transition + ) + + assert next_snap.reason == unicode_reason diff --git a/extracted/QIKI_DTMP/services/q_core_agent/state/types.py b/extracted/QIKI_DTMP/services/q_core_agent/state/types.py new file mode 100644 index 0000000..e446a21 --- /dev/null +++ b/extracted/QIKI_DTMP/services/q_core_agent/state/types.py @@ -0,0 +1,166 @@ +""" +DTO модель для FSM состояний без зависимостей от protobuf. +Immutable dataclasses для безопасной работы с состоянием. +""" +from dataclasses import dataclass, field +from enum import IntEnum +from typing import Optional, Dict, Tuple +import time +import uuid + + +class FsmState(IntEnum): + """FSM состояния (копия из protobuf enum без привязки к proto)""" + UNSPECIFIED = 0 + BOOTING = 1 + IDLE = 2 + ACTIVE = 3 + ERROR_STATE = 4 + SHUTDOWN = 5 + + +class TransitionStatus(IntEnum): + """Статус перехода FSM""" + UNSPECIFIED = 0 + SUCCESS = 1 + FAILED = 2 + PENDING = 3 + + +@dataclass(frozen=True) +class TransitionDTO: + """DTO для перехода состояния FSM (immutable)""" + from_state: FsmState + to_state: FsmState + trigger_event: str + status: TransitionStatus = TransitionStatus.SUCCESS + error_message: str = "" + ts_mono: float = 0.0 + ts_wall: float = 0.0 + + def __post_init__(self): + # Устанавливаем временные метки если не заданы + if self.ts_mono == 0.0: + object.__setattr__(self, 'ts_mono', time.monotonic()) + if self.ts_wall == 0.0: + object.__setattr__(self, 'ts_wall', time.time()) + + +@dataclass(frozen=True) +class FsmSnapshotDTO: + """ + DTO для снапшота FSM состояния (immutable). + Внутренняя модель без protobuf зависимостей. + """ + # Обязательные поля + version: int + state: FsmState + reason: str = "" + + # Временные метки + ts_mono: float = 0.0 # monotonic time для порядка + ts_wall: float = 0.0 # wall clock time для логов + + # Опциональные поля + snapshot_id: str = "" + prev_state: Optional[FsmState] = None + fsm_instance_id: str = "" + source_module: str = "fsm_handler" + attempt_count: int = 0 + + # История переходов и метаданные + history: Tuple[TransitionDTO, ...] = field(default_factory=tuple) + context_data: Dict[str, str] = None + state_metadata: Dict[str, str] = None + + def __post_init__(self): + # Устанавливаем значения по умолчанию для mutable полей + if self.history is None: + object.__setattr__(self, 'history', tuple()) + elif isinstance(self.history, list): + object.__setattr__(self, 'history', tuple(self.history)) + + if self.context_data is None: + object.__setattr__(self, 'context_data', {}) + if self.state_metadata is None: + object.__setattr__(self, 'state_metadata', {}) + + # Устанавливаем временные метки если не заданы + if self.ts_mono == 0.0: + object.__setattr__(self, 'ts_mono', time.monotonic()) + if self.ts_wall == 0.0: + object.__setattr__(self, 'ts_wall', time.time()) + + # Генерируем ID если не задан + if not self.snapshot_id: + object.__setattr__(self, 'snapshot_id', str(uuid.uuid4())) + if not self.fsm_instance_id: + object.__setattr__(self, 'fsm_instance_id', str(uuid.uuid4())) + + +def initial_snapshot() -> FsmSnapshotDTO: + """Создаёт начальный снапшот для COLD_START""" + now_mono = time.monotonic() + now_wall = time.time() + + return FsmSnapshotDTO( + version=0, + state=FsmState.BOOTING, + prev_state=None, + reason="COLD_START", + ts_mono=now_mono, + ts_wall=now_wall, + source_module="initial_boot", + attempt_count=0 + ) + + +def create_transition( + from_state: FsmState, + to_state: FsmState, + trigger: str, + status: TransitionStatus = TransitionStatus.SUCCESS, + error_msg: str = "" +) -> TransitionDTO: + """Хелпер для создания перехода состояния""" + return TransitionDTO( + from_state=from_state, + to_state=to_state, + trigger_event=trigger, + status=status, + error_message=error_msg + ) + + +def next_snapshot( + current: FsmSnapshotDTO, + new_state: FsmState, + reason: str, + transition: Optional[TransitionDTO] = None +) -> FsmSnapshotDTO: + """ + Создаёт новый снапшот на базе текущего с обновлённым состоянием. + Увеличивает версию только при реальном изменении состояния. + """ + version_increment = 1 if new_state != current.state else 0 + new_version = current.version + version_increment + + # Автоматически создаём запись истории если переход произошёл + if transition is None and new_state != current.state: + transition = create_transition(current.state, new_state, reason) + + new_history = current.history + ((transition,) if transition else ()) + + return FsmSnapshotDTO( + version=new_version, + state=new_state, + prev_state=current.state if new_state != current.state else current.prev_state, + reason=reason, + snapshot_id=str(uuid.uuid4()), # новый ID для нового снапшота + fsm_instance_id=current.fsm_instance_id, # сохраняем instance ID + source_module=current.source_module, + attempt_count=current.attempt_count + (1 if new_state != current.state else 0), + history=new_history, + context_data=dict(current.context_data) if current.context_data else {}, + state_metadata=dict(current.state_metadata) if current.state_metadata else {} + )