Skip to content

Ограничение истории FSM и очередей#24

Open
sonra44 wants to merge 1 commit intomainfrom
codex/implement-max_history-in-fsmsnapshotdto-6m3hbc
Open

Ограничение истории FSM и очередей#24
sonra44 wants to merge 1 commit intomainfrom
codex/implement-max_history-in-fsmsnapshotdto-6m3hbc

Conversation

@sonra44
Copy link
Owner

@sonra44 sonra44 commented Aug 18, 2025

Summary

  • Реализовано ограничение длины истории в FsmSnapshotDTO.add_transition
  • QSimService переведён на asyncio.Queue с обработкой переполнения
  • StateStore теперь чистит неактивных подписчиков и корректно ведёт версии/метрики
  • Добавлены тесты на отсечение старой истории

Testing

  • pytest QIKI_DTMP/services/q_core_agent/state/tests/test_types.py -q
  • pytest QIKI_DTMP/services/q_core_agent/state/tests/test_store.py -q

https://chatgpt.com/codex/tasks/task_e_68a27e4b71848331af1d95c7f02e0293

Summary by Sourcery

Implement history length restriction for FSM snapshots, migrate simulation queues in QSimService to asyncio with drop-on-full logic, enhance AsyncStateStore subscriber management and version/metrics enforcement, and introduce comprehensive unit tests for the FSM types and state store.

New Features:

  • Limit FSM transition history length in FsmSnapshotDTO.add_transition
  • Switch QSimService to asyncio.Queue with overflow handling for sensor and actuator queues
  • Auto-remove inactive subscriber queues on overflow in AsyncStateStore

Enhancements:

  • Enhance AsyncStateStore with version enforcement, metrics tracking, and subscriber cleanup
  • Refactor FSM DTO module with helper functions initial_snapshot, create_transition, and next_snapshot

Tests:

  • Add extensive unit tests for FSM DTO types covering immutability, snapshot creation, transitions, metadata, history limits, and edge cases
  • Add unit tests for AsyncStateStore subscription, versioning, metrics, and overflow behaviors

@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Aug 18, 2025

Reviewer's Guide

This PR implements history length limitation for FSM snapshots, refactors AsyncStateStore to use asyncio.Queue with overflow handling and subscriber cleanup, migrates QSimService to bounded asyncio queues with drop-on-full logging, and adds comprehensive unit tests covering types, snapshot behaviors, and store functionality.

Sequence diagram for subscriber notification and cleanup in AsyncStateStore

sequenceDiagram
    participant StateStore
    participant Subscriber
    loop For each subscriber
        StateStore->>Subscriber: queue.put_nowait(snap)
        alt Queue full
            StateStore->>StateStore: Remove subscriber from list
        end
    end
Loading

Sequence diagram for QSimService queue overflow handling

sequenceDiagram
    participant QSimService
    participant WorldModel
    participant SensorDataQueue
    participant ActuatorCommandQueue
    QSimService->>WorldModel: step(delta_time)
    QSimService->>QSimService: generate_sensor_data()
    QSimService->>SensorDataQueue: put_nowait(sensor_data)
    alt SensorDataQueue full
        QSimService->>QSimService: Log warning, drop data
    end
    QSimService->>ActuatorCommandQueue: put_nowait(command)
    alt ActuatorCommandQueue full
        QSimService->>QSimService: Log warning, drop command
    end
Loading

Class diagram for FsmSnapshotDTO and TransitionDTO with history limitation

classDiagram
    class FsmState {
        <<enum>>
        UNSPECIFIED
        BOOTING
        IDLE
        ACTIVE
        ERROR_STATE
        SHUTDOWN
    }
    class TransitionStatus {
        <<enum>>
        UNSPECIFIED
        SUCCESS
        FAILED
        PENDING
    }
    class TransitionDTO {
        +from_state: FsmState
        +to_state: FsmState
        +trigger_event: str
        +status: TransitionStatus
        +error_message: str
        +ts_mono: float
        +ts_wall: float
    }
    class FsmSnapshotDTO {
        +version: int
        +state: FsmState
        +reason: str
        +ts_mono: float
        +ts_wall: float
        +snapshot_id: str
        +prev_state: FsmState
        +fsm_instance_id: str
        +source_module: str
        +attempt_count: int
        +history: Tuple[TransitionDTO]
        +context_data: Dict[str, str]
        +state_metadata: Dict[str, str]
        +add_transition(transition: TransitionDTO, max_history: int): FsmSnapshotDTO
    }
    FsmSnapshotDTO "*" -- "*" TransitionDTO : history
    TransitionDTO --> FsmState
    TransitionDTO --> TransitionStatus
    FsmSnapshotDTO --> FsmState
Loading

Class diagram for AsyncStateStore with subscriber queue management

classDiagram
    class AsyncStateStore {
        -_lock: asyncio.Lock
        -_snap: FsmSnapshotDTO
        -_subscribers: List[asyncio.Queue]
        -_subscriber_ids: Dict[int, str]
        -_metrics: Dict[str, Any]
        +get(): FsmSnapshotDTO
        +get_with_meta(): tuple[FsmSnapshotDTO, Dict[str, Any]]
        +set(new_snap: FsmSnapshotDTO, enforce_version: bool): FsmSnapshotDTO
        +subscribe(subscriber_id: str): asyncio.Queue
        +unsubscribe(queue: asyncio.Queue)
        +initialize_if_empty(): FsmSnapshotDTO
        +get_metrics(): Dict[str, Any]
        +health_check(): Dict[str, Any]
        -_notify_subscribers(snap: FsmSnapshotDTO)
    }
    AsyncStateStore --> FsmSnapshotDTO
    AsyncStateStore --> asyncio.Queue
Loading

Class diagram for QSimService with bounded asyncio queues

classDiagram
    class QSimService {
        +config: Dict[str, Any]
        +world_model: WorldModel
        +sensor_data_queue: asyncio.Queue
        +actuator_command_queue: asyncio.Queue
        +generate_sensor_data(): SensorReading
        +receive_actuator_command(command: ActuatorCommand)
        +run()
        +step()
    }
    QSimService --> WorldModel
    QSimService --> asyncio.Queue
    QSimService --> SensorReading
    QSimService --> ActuatorCommand
Loading

File-Level Changes

Change Details Files
Enforce limit on FSM transition history in snapshots
  • add max_history parameter to add_transition
  • trim oldest entries when history exceeds limit
  • preserve immutability by returning new snapshot
QIKI_DTMP/services/q_core_agent/state/types.py
Refactor AsyncStateStore to async pub/sub with queue overflow handling
  • replace raw subscriber lists with asyncio.Queue(maxsize)
  • on QueueFull remove inactive subscribers
  • update and expose metrics for subscriber count and version conflicts
  • ensure safe concurrent access with asyncio.Lock
QIKI_DTMP/services/q_core_agent/state/store.py
Migrate QSimService to asyncio.Queue with bounded size and overflow logging
  • initialize sensor and actuator queues with configurable maxsize
  • use put_nowait and catch QueueFull to drop items
  • log warnings when data or commands are dropped
QIKI_DTMP/services/q_sim_service/main.py
Add comprehensive unit tests for FSM DTOs and StateStore
  • tests for immutability and UUID generation of DTOs
  • tests for snapshot creation, next_snapshot logic, and history truncation
  • tests for AsyncStateStore metrics, subscriber cleanup, and version enforcement
QIKI_DTMP/services/q_core_agent/state/tests/test_types.py
QIKI_DTMP/services/q_core_agent/state/tests/test_store.py

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `QIKI_DTMP/services/q_sim_service/main.py:51` </location>
<code_context>
+            logger.info(f"QSim received actuator command: {MessageToDict(command)}")
+            self.world_model.update(command)  # Update world model based on command
+
+    def run(self):
+        logger.info("QSimService started.")
+        try:
+            while True:
+                self.step() # Call the new step method
+                time.sleep(self.config.get("sim_tick_interval", 1))
+        except KeyboardInterrupt:
+            logger.info("QSimService stopped by user.")
+
</code_context>

<issue_to_address>
Blocking run method uses time.sleep in a synchronous loop.

Refactor run to be async and replace time.sleep with await asyncio.sleep to prevent blocking the event loop and improve compatibility with async code.

Suggested implementation:

```python
    async def run(self):
        logger.info("QSimService started.")
        try:
            while True:
                self.step() # Call the new step method
                await asyncio.sleep(self.config.get("sim_tick_interval", 1))
        except KeyboardInterrupt:
            logger.info("QSimService stopped by user.")

```

If there are places in your codebase where `run()` is called directly (e.g., `service.run()`), you will need to update those calls to use `await service.run()` or schedule it with `asyncio.create_task(service.run())` depending on your application's async structure.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +51 to +57
def run(self):
logger.info("QSimService started.")
try:
while True:
self.step() # Call the new step method
time.sleep(self.config.get("sim_tick_interval", 1))
except KeyboardInterrupt:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Blocking run method uses time.sleep in a synchronous loop.

Refactor run to be async and replace time.sleep with await asyncio.sleep to prevent blocking the event loop and improve compatibility with async code.

Suggested implementation:

    async def run(self):
        logger.info("QSimService started.")
        try:
            while True:
                self.step() # Call the new step method
                await asyncio.sleep(self.config.get("sim_tick_interval", 1))
        except KeyboardInterrupt:
            logger.info("QSimService stopped by user.")

If there are places in your codebase where run() is called directly (e.g., service.run()), you will need to update those calls to use await service.run() or schedule it with asyncio.create_task(service.run()) depending on your application's async structure.

def __post_init__(self):
# Устанавливаем значения по умолчанию для mutable полей
if self.history is None:
object.__setattr__(self, 'history', tuple())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Replace tuple() with () (tuple-literal)

Suggested change
object.__setattr__(self, 'history', tuple())
object.__setattr__(self, 'history', ())


ExplanationThe most concise and Pythonic way to create an empty tuple is to use the ()
literal.

This fits in with the way we create tuples with items, saving a bit of
mental energy that might be taken up with thinking about two different ways of
creating tuples:

x = ("first", "second")

Doing things this way has the added advantage of being a nice little performance
improvement. Here are the timings before and after the change:

$ python3 -m timeit "tuple()"
10000000 loops, best of 5: 22.6 nsec per loop
$ python3 -m timeit "()"
50000000 loops, best of 5: 5.46 nsec per loop

new_version = current.version + version_increment

# Новая история переходов
new_history = current.history if current.history else tuple()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Replace if-expression with or (or-if-exp-identity)

Suggested change
new_history = current.history if current.history else tuple()
new_history = current.history or tuple()


ExplanationHere we find ourselves setting a value if it evaluates to True, and otherwise
using a default.

The 'After' case is a bit easier to read and avoids the duplication of
input_currency.

It works because the left-hand side is evaluated first. If it evaluates to
true then currency will be set to this and the right-hand side will not be
evaluated. If it evaluates to false the right-hand side will be evaluated and
currency will be set to DEFAULT_CURRENCY.

new_version = current.version + version_increment

# Новая история переходов
new_history = current.history if current.history else tuple()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Replace tuple() with () (tuple-literal)

Suggested change
new_history = current.history if current.history else tuple()
new_history = current.history if current.history else ()


ExplanationThe most concise and Pythonic way to create an empty tuple is to use the ()
literal.

This fits in with the way we create tuples with items, saving a bit of
mental energy that might be taken up with thinking about two different ways of
creating tuples:

x = ("first", "second")

Doing things this way has the added advantage of being a nice little performance
improvement. Here are the timings before and after the change:

$ python3 -m timeit "tuple()"
10000000 loops, best of 5: 22.6 nsec per loop
$ python3 -m timeit "()"
50000000 loops, best of 5: 5.46 nsec per loop

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant