Skip to content

Ограничение истории FSM и очередей#25

Open
sonra44 wants to merge 3 commits intomainfrom
codex/implement-max_history-in-fsmsnapshotdto-wcbuip
Open

Ограничение истории FSM и очередей#25
sonra44 wants to merge 3 commits intomainfrom
codex/implement-max_history-in-fsmsnapshotdto-wcbuip

Conversation

@sonra44
Copy link
Owner

@sonra44 sonra44 commented Aug 18, 2025

Summary

  • Реализовано ограничение длины истории в FsmSnapshotDTO.add_transition
  • QSimService переведён на asyncio.Queue с обработкой переполнения
  • StateStore теперь чистит неактивных подписчиков и корректно ведёт версии/метрики
  • Добавлены тесты на отсечение старой истории

Testing

  • pytest QIKI_DTMP/services/q_core_agent/state/tests/test_types.py -q
  • pytest QIKI_DTMP/services/q_core_agent/state/tests/test_store.py -q

https://chatgpt.com/codex/tasks/task_e_68a27e4b71848331af1d95c7f02e0293

Summary by Sourcery

Provide a new immutable FSM state model with controlled transition history, an asynchronous state store supporting pub/sub and metrics, update the QSimService to use asyncio queues with overflow handling, and add thorough unit tests for FSM types and history management

New Features:

  • Add immutable FSM DTOs (TransitionDTO and FsmSnapshotDTO) with helper functions for snapshot creation and state transitions
  • Introduce AsyncStateStore with asyncio-based pub/sub, version enforcement, metrics tracking, and subscriber cleanup
  • Convert QSimService to use asyncio.Queue for sensor and actuator streams with overflow handling

Enhancements:

  • Implement max_history parameter in FsmSnapshotDTO.add_transition to cap transition history length
  • Improve AsyncStateStore version conflict handling and automatically remove inactive subscriber queues

Tests:

  • Add comprehensive unit tests for FSM DTOs covering immutability, timestamp and UUID generation, history truncation, metadata handling, and edge cases

@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Aug 18, 2025

Reviewer's Guide

This PR implements a cap on FSM transition history in snapshots, refactors QSimService to use bounded asyncio queues with overflow handling, enhances AsyncStateStore by cleaning stale subscribers and maintaining accurate version/metrics tracking, and adds exhaustive unit tests around history trimming and state transition behavior.

Sequence diagram for QSimService queue overflow handling

sequenceDiagram
    participant QSimService
    participant WorldModel
    participant SensorDataQueue
    participant ActuatorCommandQueue
    actor External

    External->>QSimService: send ActuatorCommand
    QSimService->>ActuatorCommandQueue: put_nowait(command)
    ActuatorCommandQueue-->>QSimService: QueueFull (if full)
    QSimService->>QSimService: log warning, drop command
    QSimService->>WorldModel: update(command)

    QSimService->>WorldModel: step(delta_time)
    QSimService->>QSimService: generate_sensor_data()
    QSimService->>SensorDataQueue: put_nowait(sensor_data)
    SensorDataQueue-->>QSimService: QueueFull (if full)
    QSimService->>QSimService: log warning, drop data
Loading

Class diagram for updated FSM DTOs and AsyncStateStore

classDiagram
    class FsmState {
        <<enum>>
        UNSPECIFIED
        BOOTING
        IDLE
        ACTIVE
        ERROR_STATE
        SHUTDOWN
    }
    class TransitionStatus {
        <<enum>>
        UNSPECIFIED
        SUCCESS
        FAILED
        PENDING
    }
    class TransitionDTO {
        +from_state: FsmState
        +to_state: FsmState
        +trigger_event: str
        +status: TransitionStatus
        +error_message: str
        +ts_mono: float
        +ts_wall: float
    }
    class FsmSnapshotDTO {
        +version: int
        +state: FsmState
        +reason: str
        +ts_mono: float
        +ts_wall: float
        +snapshot_id: str
        +prev_state: FsmState
        +fsm_instance_id: str
        +source_module: str
        +attempt_count: int
        +history: Tuple[TransitionDTO]
        +context_data: Dict[str, str]
        +state_metadata: Dict[str, str]
        +add_transition(transition, max_history): FsmSnapshotDTO
    }
    class AsyncStateStore {
        -_lock: asyncio.Lock
        -_snap: FsmSnapshotDTO
        -_subscribers: List[asyncio.Queue]
        -_subscriber_ids: Dict[int, str]
        -_metrics: Dict[str, Any]
        +get(): FsmSnapshotDTO
        +set(new_snap, enforce_version): FsmSnapshotDTO
        +subscribe(subscriber_id): asyncio.Queue
        +unsubscribe(queue)
        +initialize_if_empty(): FsmSnapshotDTO
        +get_metrics(): Dict[str, Any]
        +health_check(): Dict[str, Any]
    }
    FsmSnapshotDTO "1" *-- "*" TransitionDTO : history
    AsyncStateStore "1" *-- "*" asyncio.Queue : subscribers
Loading

Class diagram for QSimService with asyncio.Queue integration

classDiagram
    class QSimService {
        +config: Dict[str, Any]
        +world_model: WorldModel
        +sensor_data_queue: asyncio.Queue
        +actuator_command_queue: asyncio.Queue
        +generate_sensor_data(): SensorReading
        +receive_actuator_command(command)
        +run()
        +step()
    }
    QSimService "1" *-- "1" WorldModel
    QSimService "1" *-- "1" asyncio.Queue : sensor_data_queue
    QSimService "1" *-- "1" asyncio.Queue : actuator_command_queue
Loading

Class diagram for FSM history trimming in FsmSnapshotDTO

classDiagram
    class FsmSnapshotDTO {
        +add_transition(transition, max_history): FsmSnapshotDTO
    }
    class TransitionDTO
    FsmSnapshotDTO "1" *-- "*" TransitionDTO : history
    FsmSnapshotDTO : history capped by max_history
Loading

File-Level Changes

Change Details Files
Limit FSM snapshot history when adding transitions
  • Introduced max_history parameter to add_transition
  • Trim oldest entries when history exceeds max_history
  • Return a new immutable snapshot with updated history
QIKI_DTMP/services/q_core_agent/state/types.py
Refactor QSimService to use asyncio queues with overflow handling
  • Switched sensor_data_queue and actuator_command_queue to asyncio.Queue with configurable maxsize
  • Replaced blocking puts with put_nowait and catch QueueFull exceptions
  • Logged warnings when queues are full to drop data or commands
QIKI_DTMP/services/q_sim_service/main.py
Enhance AsyncStateStore to clean inactive subscribers and track metrics
  • Maintain subscriber IDs and metrics for total_sets, total_gets, conflicts
  • In _notify_subscribers remove queues on QueueFull or errors
  • Update subscribe/unsubscribe to adjust subscriber_count and send initial state
QIKI_DTMP/services/q_core_agent/state/store.py
Add unit tests covering history limits and FSM transitions
  • Tests for add_transition history capping
  • Tests for immutability and default values of DTOs
  • Tests for initial_snapshot and next_snapshot versioning and metadata preservation
QIKI_DTMP/services/q_core_agent/state/tests/test_types.py

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

  • AsyncStateStore is awaiting _notify_subscribers while holding the store lock in set()/initialize_if_empty, which can block other operations; consider releasing the lock before dispatching notifications.
  • QSimService mixes asyncio.Queue with a synchronous loop using time.sleep, which can lead to unexpected behavior; either convert run/step to async (await asyncio.sleep) or switch to a thread-safe queue.Queue for a purely synchronous design.
  • Consider adding unit tests for AsyncStateStore to cover subscriber overflow removal, unsubscribe behavior, and metrics updates, ensuring the new cleanup and versioning logic behaves correctly under load.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- AsyncStateStore is awaiting _notify_subscribers while holding the store lock in set()/initialize_if_empty, which can block other operations; consider releasing the lock before dispatching notifications.
- QSimService mixes asyncio.Queue with a synchronous loop using time.sleep, which can lead to unexpected behavior; either convert run/step to async (await asyncio.sleep) or switch to a thread-safe queue.Queue for a purely synchronous design.
- Consider adding unit tests for AsyncStateStore to cover subscriber overflow removal, unsubscribe behavior, and metrics updates, ensuring the new cleanup and versioning logic behaves correctly under load.

## Individual Comments

### Comment 1
<location> `QIKI_DTMP/services/q_sim_service/main.py:42` </location>
<code_context>
+            scalar_data=world_state["position"]["x"] # Example: return X position as scalar data
+        )
+
+    def receive_actuator_command(self, command: ActuatorCommand):
+        try:
+            self.actuator_command_queue.put_nowait(command)
+        except asyncio.QueueFull:
+            logger.warning("Actuator command queue full, dropping command")
+        else:
+            logger.info(f"QSim received actuator command: {MessageToDict(command)}")
+            self.world_model.update(command)  # Update world model based on command
+
+    def run(self):
</code_context>

<issue_to_address>
receive_actuator_command is synchronous but interacts with asyncio.Queue.

Consider making this method async and using 'await queue.put' to improve flow control and handle backpressure if it will be called from async code.

Suggested implementation:

```python
    async def receive_actuator_command(self, command: ActuatorCommand):
        await self.actuator_command_queue.put(command)
        logger.info(f"QSim received actuator command: {MessageToDict(command)}")
        self.world_model.update(command)  # Update world model based on command

```

If `receive_actuator_command` is called from synchronous code elsewhere, those call sites will need to be updated to use `await` and be in async functions. If you need help updating those, please provide the relevant code.
</issue_to_address>

### Comment 2
<location> `QIKI_DTMP/services/q_sim_service/main.py:30` </location>
<code_context>
+        self.actuator_command_queue = asyncio.Queue(maxsize=queue_size)
+        logger.info("QSimService initialized.")
+
+    def generate_sensor_data(self) -> SensorReading:
+        # Generate sensor data based on world model state
+        timestamp = Timestamp()
+        timestamp.GetCurrentTime()
+        world_state = self.world_model.get_state()
+        return SensorReading(
+            sensor_id=UUID(value="sim_lidar_front"),
+            sensor_type=self.config.get("sim_sensor_type", 1), # LIDAR
+            timestamp=timestamp,
+            scalar_data=world_state["position"]["x"] # Example: return X position as scalar data
+        )
+
</code_context>

<issue_to_address>
generate_sensor_data assumes world_state["position"]["x"] exists.

This will raise a KeyError if any expected field is missing. Please add checks or defaults to ensure robustness against malformed world_state.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
    def generate_sensor_data(self) -> SensorReading:
        # Generate sensor data based on world model state
        timestamp = Timestamp()
        timestamp.GetCurrentTime()
        world_state = self.world_model.get_state()
        return SensorReading(
            sensor_id=UUID(value="sim_lidar_front"),
            sensor_type=self.config.get("sim_sensor_type", 1), # LIDAR
            timestamp=timestamp,
            scalar_data=world_state["position"]["x"] # Example: return X position as scalar data
        )
=======
    def generate_sensor_data(self) -> SensorReading:
        # Generate sensor data based on world model state
        timestamp = Timestamp()
        timestamp.GetCurrentTime()
        world_state = self.world_model.get_state()
        # Safely extract position.x, defaulting to 0.0 if missing
        position = world_state.get("position", {})
        x_value = position.get("x", 0.0)
        return SensorReading(
            sensor_id=UUID(value="sim_lidar_front"),
            sensor_type=self.config.get("sim_sensor_type", 1), # LIDAR
            timestamp=timestamp,
            scalar_data=x_value # Example: return X position as scalar data
        )
>>>>>>> REPLACE

</suggested_fix>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +42 to +49
def receive_actuator_command(self, command: ActuatorCommand):
try:
self.actuator_command_queue.put_nowait(command)
except asyncio.QueueFull:
logger.warning("Actuator command queue full, dropping command")
else:
logger.info(f"QSim received actuator command: {MessageToDict(command)}")
self.world_model.update(command) # Update world model based on command
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: receive_actuator_command is synchronous but interacts with asyncio.Queue.

Consider making this method async and using 'await queue.put' to improve flow control and handle backpressure if it will be called from async code.

Suggested implementation:

    async def receive_actuator_command(self, command: ActuatorCommand):
        await self.actuator_command_queue.put(command)
        logger.info(f"QSim received actuator command: {MessageToDict(command)}")
        self.world_model.update(command)  # Update world model based on command

If receive_actuator_command is called from synchronous code elsewhere, those call sites will need to be updated to use await and be in async functions. If you need help updating those, please provide the relevant code.

def __post_init__(self):
# Устанавливаем значения по умолчанию для mutable полей
if self.history is None:
object.__setattr__(self, 'history', tuple())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Replace tuple() with () (tuple-literal)

Suggested change
object.__setattr__(self, 'history', tuple())
object.__setattr__(self, 'history', ())


ExplanationThe most concise and Pythonic way to create an empty tuple is to use the ()
literal.

This fits in with the way we create tuples with items, saving a bit of
mental energy that might be taken up with thinking about two different ways of
creating tuples:

x = ("first", "second")

Doing things this way has the added advantage of being a nice little performance
improvement. Here are the timings before and after the change:

$ python3 -m timeit "tuple()"
10000000 loops, best of 5: 22.6 nsec per loop
$ python3 -m timeit "()"
50000000 loops, best of 5: 5.46 nsec per loop

new_version = current.version + version_increment

# Новая история переходов
new_history = current.history if current.history else tuple()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Replace tuple() with () (tuple-literal)

Suggested change
new_history = current.history if current.history else tuple()
new_history = current.history if current.history else ()


ExplanationThe most concise and Pythonic way to create an empty tuple is to use the ()
literal.

This fits in with the way we create tuples with items, saving a bit of
mental energy that might be taken up with thinking about two different ways of
creating tuples:

x = ("first", "second")

Doing things this way has the added advantage of being a nice little performance
improvement. Here are the timings before and after the change:

$ python3 -m timeit "tuple()"
10000000 loops, best of 5: 22.6 nsec per loop
$ python3 -m timeit "()"
50000000 loops, best of 5: 5.46 nsec per loop

sonra44 and others added 2 commits August 18, 2025 08:24
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant