feat: Implement Processor and Queue Infrastructure (Issue #6)#16
Merged
feat: Implement Processor and Queue Infrastructure (Issue #6)#16
Conversation
- Split spec.md (698 lines) into 4 focused files: - spec.md (434 lines) - High-level overview, user stories, success criteria - api.md (117 lines) - Endpoints, request/response formats, examples - data-models.md (169 lines) - RawEvent, TypedEvent schemas, normalization - architecture.md (365 lines) - Components, configuration, future plans - Added queue architecture config (EVENTKIT_QUEUE_MODE) with future modes - Added BUFFER_MAX_SIZE to config table - Expanded 'Future: Queue Architecture' section with implementation roadmap Benefits: - Easier to navigate and reference specific sections - Clear separation of concerns (what vs how vs API) - Can link to individual docs (e.g., share API reference) - Prevents monolithic spec files as project grows
Updated Phase 4 (Processing Pipeline) to document queue-agnostic design: - Renamed Processor.enqueue() → process_event() for clarity - Added Processor.start()/stop() lifecycle methods - Added EventQueue Protocol (base.py) - Added DirectQueue implementation (direct.py) - Added queue factory pattern (factory.py) - Updated file structure to include queues/ directory Key design decisions: - Processor is queue-agnostic (doesn't know about queues) - process_event() is called by queues (DirectQueue, AsyncQueue, PubSubQueue) - EventQueue Protocol enables easy swapping via config (EVENTKIT_QUEUE_MODE) This enables swapping queue implementations: - DirectQueue: Inline processing (no actual queue) - AsyncQueue: In-process workers with asyncio.Queue - PubSubQueue: Distributed workers with GCP Pub/Sub
Implementation: - Add EventQueue Protocol (queues/base.py) for pluggable queue backends - Add DirectQueue (queues/direct.py) for inline processing - Add queue factory (queues/factory.py) with config-based creation - Add QueueMode enum to config.py (DIRECT, ASYNC, PUBSUB) - Add Processor class (processing/processor.py) with queue-agnostic design Tests (13 new tests, all passing): - test_direct.py: DirectQueue lifecycle and event processing - test_factory.py: Queue factory mode selection - test_processor.py: Processor orchestration, error handling, lifecycle Coverage: - Processor: 100% (24/24 statements) - DirectQueue: 100% (11/11 statements) - Queue Factory: 92% (12/13 statements) - Overall: 98% coverage (500 statements, 11 missing) Key Design: - Processor.process_event() is called by queues (queue-agnostic) - Processor wires: Adapter → Sequencer → Buffer → Stores - DirectQueue processes inline (no actual queue) - Queue factory enables swapping via EVENTKIT_QUEUE_MODE env var Fixes: - Fix EventAdapter Protocol to accept RawEvent (not dict) - Fix AdapterResult usage (.ok not .success) - Fix ErrorStore method call (store_error not write_error) All 128 unit tests passing ✅
Implementation: - Add AsyncQueue (queues/async_queue.py) with asyncio.Queue - Background workers pull events and call processor.process_event() - Configurable worker count via EVENTKIT_ASYNC_WORKERS (default: 4) - Graceful shutdown with queue draining - Update factory to create AsyncQueue for ASYNC mode Tests (8 new tests, all passing): - test_async.py: Queue operations, workers, error handling, shutdown - test_factory.py: Updated to test AsyncQueue creation Coverage: - AsyncQueue: 96% (45 statements, 2 missing) - Overall: 98% coverage (548 statements, 13 missing) Key Features: - Fast API responses (enqueue is O(1), returns immediately) - Parallel processing with N configurable workers - Error isolation (one failed event doesn't stop workers) - Clean shutdown (waits for queue to drain) Usage: EVENTKIT_QUEUE_MODE=async EVENTKIT_ASYNC_WORKERS=8 queue = create_queue(processor, settings) await queue.start() # Starts 8 workers await queue.enqueue(event) # Returns immediately await queue.stop() # Drains queue, then stops All 136 unit tests passing ✅
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Implements the core Processor and Queue infrastructure for eventkit, providing a queue-agnostic event processing pipeline with two queue implementations.
Implementation
Core Components
Queue Factory
EVENTKIT_QUEUE_MODEdirect,async,pubsub(future)Configuration
EVENTKIT_QUEUE_MODE: Queue backend selection (default:direct)EVENTKIT_ASYNC_WORKERS: Number of workers for AsyncQueue (default: 4)Tests
All 136 unit tests passing
New Tests (21 tests)
Coverage
Key Features
DirectQueue
AsyncQueue
Usage
Architecture Decisions
processor.process_event()Files Changed
src/eventkit/processing/processor.py: Main processor orchestrationsrc/eventkit/queues/base.py: EventQueue Protocolsrc/eventkit/queues/direct.py: DirectQueue implementationsrc/eventkit/queues/async_queue.py: AsyncQueue implementationsrc/eventkit/queues/factory.py: Queue factorysrc/eventkit/config.py: Queue configuration settingstests/unit/processing/test_processor.py: Processor teststests/unit/queues/test_*.py: Queue testsCloses #6