Conversation

@MasterPtato
Contributor

No description provided.

@vercel

vercel bot commented Nov 18, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

| Project | Deployment | Preview | Comments | Updated (UTC) |
| --- | --- | --- | --- | --- |
| rivetkit-serverless | Error | Error | | Nov 18, 2025 11:53pm |

3 Skipped Deployments

| Project | Deployment | Preview | Comments | Updated (UTC) |
| --- | --- | --- | --- | --- |
| rivet-cloud | Ignored | Ignored | | Nov 18, 2025 11:53pm |
| rivet-inspector | Ignored | Ignored | Preview | Nov 18, 2025 11:53pm |
| rivet-site | Ignored | Ignored | Preview | Nov 18, 2025 11:53pm |

Contributor Author

MasterPtato commented Nov 18, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude

claude bot commented Nov 18, 2025

Pull Request Review: Batch Listen Feature

This PR introduces batch signal listening functionality to the workflow engine, allowing workflows to receive multiple signals in a single operation. This is a significant enhancement that can improve throughput and reduce overhead when dealing with multiple signals.

Summary

Changes: 1,215 additions, 732 deletions across 16 files
Core Feature: Transforms single-signal listening to batch-based signal processing


Positive Aspects

Architecture & Design

  • Well-structured approach: The batch listening is implemented at multiple layers (trait, context, database) with proper abstraction
  • Backwards compatibility: The single listen() method is preserved and now delegates to listen_n() with limit=1, maintaining existing API contracts
  • Consistent pattern: Both listen_with_timeout and listen_until variants support batch operations

Code Quality

  • Type safety: The Listen trait now returns Vec<Self>, enforcing that all signal types support batch operations
  • History tracking: New SignalsEvent properly tracks multiple signals with coordinated signal_ids, names, and bodies vectors
  • Error handling: Maintains existing error patterns while adapting to batch scenarios

Critical Issues

1. Breaking Change: Macro-generated Listen Implementation

Location: engine/packages/gasoline-macros/src/lib.rs:375-381

The #[signal] macro now generates a Listen implementation that returns Vec<Self>:

async fn listen(ctx: &mut gas::prelude::ListenCtx, limit: usize) -> gas::prelude::WorkflowResult<Vec<Self>> {
    ctx
        .listen_any(&[<Self as gas::signal::Signal>::NAME], limit)
        .await?
        .into_iter()
        .map(|signal| Self::parse(&signal.signal_name, &signal.body))
        .collect()
}

Issue: For a single signal type, this returns a Vec containing at most limit signals of that type; if only one signal is available, the result is a one-element vec. Consumers that previously received a single value must now handle a Vec.

Impact: All workflows using ctx.listen::<SomeSignal>() will need updates, or you need migration logic.

Recommendation: Document this breaking change clearly and consider:

  • Providing a migration guide for existing workflows
  • Adding deprecation warnings if keeping old single-signal methods temporarily

2. Inconsistent Limit Behavior

Location: engine/packages/gasoline/src/ctx/listen.rs:40-49

pub async fn listen_any(
    &mut self,
    signal_names: &[&'static str],
    limit: usize,
) -> WorkflowResult<Vec<SignalData>> {
    // ...
    if signals.is_empty() {
        return Err(WorkflowError::NoSignalFound(Box::from(signal_names)));
    }
    // ...
}

Issue: The function fetches up to limit signals across ALL signal names, but the limit is applied per signal name in the database layer (line 1953 in db/kv/mod.rs). This creates ambiguity:

  • Does limit=5 mean 5 total signals, or 5 per signal type?
  • The current implementation fetches up to limit per signal type, then flattens them

Example Problem:

// Listening for 2 signal types with limit=5
// Could return up to 10 signals (5 per type)!
ctx.listen_any(&["signal_a", "signal_b"], 5).await?

Recommendation:

  • Clarify the limit semantics in documentation
  • Consider renaming to limit_per_type or enforcing a global limit by taking only the first N after flattening
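A minimal sketch of the global-limit option, assuming per-type batches have already been fetched (the SignalData stand-in below carries only the field the sketch needs):

```rust
// Stand-in for the engine's signal type; only create_ts matters here.
struct SignalData {
    create_ts: i64,
}

/// Flatten per-type batches, then enforce one global limit: the oldest
/// signals win regardless of which signal type they belong to.
fn take_global_limit(per_type: Vec<Vec<SignalData>>, limit: usize) -> Vec<SignalData> {
    let mut all: Vec<SignalData> = per_type.into_iter().flatten().collect();
    all.sort_by_key(|s| s.create_ts);
    all.truncate(limit); // global limit, not limit-per-type
    all
}
```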

3. Missing Signal ID Tracking

Location: engine/packages/gasoline/src/history/event.rs:210-214

pub struct SignalsEvent {
    pub names: Vec<String>,
    pub bodies: Vec<Box<serde_json::value::RawValue>>,
}

Issue: The public SignalsEvent struct in the event history doesn't include signal_ids, but the debug version does (line 1535 in db/kv/debug.rs). This makes it impossible to:

  • Correlate signals to their original IDs in user code
  • Debug which specific signals were processed
  • Implement proper idempotency checks

Recommendation: Add pub signal_ids: Vec<Id> to the public SignalsEvent struct.
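For concreteness, a sketch of the suggested shape (assuming the engine's Id type):

```rust
pub struct SignalsEvent {
    /// One entry per received signal, index-aligned with `names` and `bodies`.
    pub signal_ids: Vec<Id>,
    pub names: Vec<String>,
    pub bodies: Vec<Box<serde_json::value::RawValue>>,
}
```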

4. Data Consistency Risk in History

Location: engine/packages/gasoline/src/db/kv/debug.rs:1542-1577

The TryFrom<WorkflowHistoryEventBuilder> for SignalsEvent checks that vectors are non-empty but doesn't validate they have the same length:

Ok(SignalsEvent {
    signal_ids: if value.indexed_signal_ids.is_empty() {
        return Err(WorkflowError::MissingEventData("signal_id"));
    } else {
        value.indexed_signal_ids
    },
    names: if value.indexed_names.is_empty() {
        return Err(WorkflowError::MissingEventData("name"));
    } else {
        value.indexed_names
    },
    bodies: if value.indexed_input_chunks.is_empty() {
        // ...
    }
})

Issue: If the vectors have different lengths (due to corruption or partial writes), this could cause panics or incorrect signal matching.

Recommendation: Add validation:

ensure!(
    value.indexed_signal_ids.len() == value.indexed_names.len() 
        && value.indexed_names.len() == value.indexed_input_chunks.len(),
    "signal event data vectors must have equal length"
);

Moderate Issues

5. Error Context Loss

Location: engine/packages/gasoline/src/ctx/workflow.rs:707

pub async fn listen<T: Listen>(&mut self) -> Result<T> {
    let signals = self.listen_n::<T>(1).in_current_span().await?;
    signals
        .into_iter()
        .next()
        .context("must return at least 1 signal")
}

Issue: The error message "must return at least 1 signal" is generic and doesn't indicate which signal type was expected. This makes debugging harder.

Recommendation:

.with_context(|| format!("must return at least 1 signal for type {}", std::any::type_name::<T>()))

6. Metrics Removed Without Explanation

Location: engine/packages/gasoline/src/ctx/listen.rs (removed lines 28-42)

The SIGNAL_PULL_DURATION metric was removed, but it provided valuable insight into database performance.

Recommendation: Either restore the metric with batch-aware labels, or document why it was removed in the commit message.
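If the metric is restored, a hedged sketch of a batch-aware variant (this assumes a prometheus-style backend; the metric and label names are illustrative, not the crate's actual ones):

```rust
use prometheus::{register_histogram_vec, HistogramVec};
use std::sync::LazyLock;

// Hypothetical replacement for the removed SIGNAL_PULL_DURATION metric,
// labeled by workflow and batch size so batch pulls stay comparable.
static SIGNAL_PULL_DURATION: LazyLock<HistogramVec> = LazyLock::new(|| {
    register_histogram_vec!(
        "signal_pull_duration_seconds",
        "Duration of pulling a batch of signals from the database.",
        &["workflow_name", "batch_size"]
    )
    .expect("metric registration")
});
```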

7. Unused Loop Location Parameter

Location: engine/packages/gasoline/src/db/kv/mod.rs:1922

_loop_location: Option<&Location>,

Issue: The parameter is prefixed with _, indicating it's unused. If it's not needed, remove it from the trait definition.

Recommendation: Clean up unused parameters or document why they're reserved for future use.


Minor Issues & Suggestions

8. Display Implementation Inefficiency

Location: engine/packages/gasoline/src/history/event.rs:77-83

EventData::Signals(signals) => {
    let mut unique_names = signals.names.clone();
    unique_names.sort();
    unique_names.dedup();
    write!(f, "signals {:?}", unique_names.join(", "))
}

Suggestion: This allocates and clones for every display call. Consider using a HashSet or caching if displayed frequently.
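One possible rewrite of that arm, using a BTreeSet rather than a HashSet so the output stays sorted (a sketch against the field layout shown above):

```rust
use std::collections::BTreeSet;

EventData::Signals(signals) => {
    // Borrow the names instead of cloning; BTreeSet sorts and dedups in one pass.
    let unique: BTreeSet<&str> = signals.names.iter().map(String::as_str).collect();
    let joined = unique.into_iter().collect::<Vec<_>>().join(", ");
    write!(f, "signals {joined:?}")
}
```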

9. Naming: last_attempt vs last_try

Location: engine/packages/gasoline/src/ctx/listen.rs:17

The parameter was renamed from last_try to last_attempt. While more professional, ensure all documentation references are updated.

10. Test Coverage

Observation: No new test files were added for the batch listening functionality.

Recommendation: Add tests for:

  • Batch signal reception with multiple signals
  • Limit behavior with various signal counts
  • History replay with SignalsEvent
  • Edge cases (empty results, limit=0, etc.)

Security Considerations

11. Resource Exhaustion Risk

Location: Database signal fetching logic

Issue: The limit parameter is user-controlled (via listen_n()). If a workflow requests limit=1000000, it could:

  • Consume excessive memory
  • Cause database performance issues
  • Enable DoS attacks

Recommendation:

  • Add a maximum limit constant (e.g., MAX_BATCH_SIZE = 100)
  • Document the reasonable limit range
  • Consider adding metrics for large batch requests
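A minimal sketch of the maximum-limit option above (the constant name and value are assumptions to tune against real payload sizes):

```rust
/// Hypothetical upper bound on a single batch listen.
const MAX_BATCH_SIZE: usize = 100;

/// Clamp a caller-supplied batch size into a safe range; a floor of 1
/// keeps limit=0 from turning a blocking listen into a no-op.
fn clamp_batch_limit(requested: usize) -> usize {
    requested.clamp(1, MAX_BATCH_SIZE)
}
```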

Performance Considerations

Positive

  • Batch operations reduce database round-trips
  • Parallel signal stream fetching (lines 1941-1972 in db/kv/mod.rs) is well-designed

Potential Concerns

  • Multiple signal types with large limits could cause memory spikes
  • No pagination for very large signal backlogs

Documentation & Style

Per CLAUDE.md:

  • ✅ Uses structured logging correctly (e.g., tracing::info!(?x, "foo"))
  • ✅ Lowercase log messages
  • ✅ Proper error handling with anyhow::Result
  • ✅ Follows timestamp naming (create_ts)
  • ❌ Could use more inline documentation for complex functions like pull_next_signals

Recommendations

High Priority

  1. Clarify limit semantics - Document whether limit is per-signal-type or total
  2. Validate vector lengths in SignalsEvent deserialization
  3. Add signal_ids to public API for better debugging and idempotency
  4. Add integration tests for batch signal handling
  5. Document breaking changes for workflows using the old API

Medium Priority

  1. Restore or replace removed metrics
  2. Add maximum limit validation to prevent resource exhaustion
  3. Improve error messages with signal type information
  4. Clean up unused parameters

Low Priority

  1. Optimize Display implementation
  2. Add inline documentation for complex database operations

Conclusion

This is a well-architected feature that adds valuable batch processing capabilities to the workflow engine. The implementation is mostly solid with good separation of concerns and proper abstraction layers.

However, there are critical issues around limit semantics and data consistency that should be addressed before merging. The breaking API changes also need clear documentation and migration guidance.

Overall Assessment: Approve with requested changes

Recommendation: Address critical issues #1-4, then merge. The remaining issues can be handled in follow-up PRs.


🤖 Generated with Claude Code

@claude

claude bot commented Nov 18, 2025

PR Review: feat(gas): add batch listen

Summary

This PR introduces batch signal listening to the Gasoline workflow engine, allowing workflows to listen for and process multiple signals at once instead of one at a time. This is a significant architectural improvement that can reduce latency and improve throughput for workflows that process multiple signals.


Code Quality & Best Practices

Strengths:

  • ✅ Good use of structured logging with tracing (e.g., tracing::debug!(?x, "foo") pattern)
  • ✅ Proper error handling with custom error types
  • ✅ Backward compatibility: Old EventType::Signal is properly converted to the new EventType::Signals format
  • ✅ Consistent naming conventions (snake_case, past tense for timestamps)
  • ✅ Good separation of concerns between database layer and workflow context

Issues & Suggestions:

  1. Missing signal_id in SignalsEvent (engine/packages/gasoline/src/history/event.rs:210-214)

    pub struct SignalsEvent {
        pub names: Vec<String>,
        pub bodies: Vec<Box<serde_json::value::RawValue>>,
    }

    The SignalsEvent struct is missing signal_ids: Vec<Id> field that appears to be used in the debug implementation and history keys. This could cause issues when replaying events or debugging.

  2. Potential ordering issue (engine/packages/gasoline/src/db/kv/mod.rs:2059-2063)

    // Sort by ts
    signals.sort_by_key(|key| key.create_ts);
    
    // Apply limit
    Ok(signals.into_iter().take(limit).collect())

    The limit is applied AFTER sorting, but signals are fetched with a separate limit from each signal name subspace. If you have 100 signals of type A and 1 of type B and request limit=10, up to 11 signals are fetched and sorted just to return 10, and the result is only correct if each subspace scan returns its oldest signals first. Consider merging the per-type streams with a priority queue so at most limit signals are pulled in total (see the merge sketch after this list).

  3. Deprecated metric removed (engine/packages/gasoline/src/metrics.rs)
    The SIGNAL_PULL_DURATION metric was removed but might still be valuable for performance monitoring. Consider keeping it and recording the total duration for pulling all signals in a batch.

  4. API Breaking Change
    The Listen trait now requires returning Vec<Self> instead of Self, and CustomListener trait was removed. This is a breaking change that should be:

    • Documented in a changelog/migration guide
    • Validated that all existing implementations are updated
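On item 2, a minimal sketch of the priority-queue merge: assuming each per-type batch is already sorted oldest-first (which an ascending range scan gives you), a k-way merge pulls at most limit signals in global timestamp order.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};

// Stand-in for the engine's signal type; only create_ts matters here.
struct Sig {
    create_ts: i64,
}

/// K-way merge of per-type batches (each sorted oldest-first) into one
/// globally oldest-first list of at most `limit` signals, so no signal
/// type can crowd out an older signal from another type.
fn merge_oldest_first(per_type: Vec<Vec<Sig>>, limit: usize) -> Vec<Sig> {
    let mut queues: Vec<VecDeque<Sig>> = per_type.into_iter().map(VecDeque::from).collect();
    // Min-heap of (next timestamp, queue index), min-first via Reverse.
    let mut heap: BinaryHeap<Reverse<(i64, usize)>> = queues
        .iter()
        .enumerate()
        .filter_map(|(i, q)| q.front().map(|s| Reverse((s.create_ts, i))))
        .collect();
    let mut out = Vec::with_capacity(limit);
    while out.len() < limit {
        let Some(Reverse((_, i))) = heap.pop() else { break };
        out.push(queues[i].pop_front().expect("heap entry implies non-empty queue"));
        if let Some(next) = queues[i].front() {
            heap.push(Reverse((next.create_ts, i)));
        }
    }
    out
}
```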

Potential Bugs

  1. Empty signals vec guarantee (engine/packages/gasoline/src/ctx/workflow.rs:703-710)

    pub async fn listen<T: Listen>(&mut self) -> Result<T> {
        let signals = self.listen_n::<T>(1).in_current_span().await?;
    
        signals
            .into_iter()
            .next()
            .context("must return at least 1 signal")
    }

    The code assumes listen_n will never return an empty vec, but this isn't enforced by the type system. If listen_n returns empty, the .context call converts the missing value into an error reading "must return at least 1 signal" (an error, not a panic, but the failure surfaces far from its cause). Consider adding an assertion in listen_n or handling this more gracefully.

  2. Race condition in listen_n_until (engine/packages/gasoline/src/ctx/workflow.rs:1154-1182)
    After a sleep is interrupted, the code checks for signals in history. However, if the history check finds HistoryResult::New instead of signals, it continues to listen for new signals. There's a potential edge case where signals could be missed between the sleep interruption and the history check.

  3. Buffer size assumption (engine/packages/gasoline/src/db/kv/mod.rs:2055)

    .buffer_unordered(1024)

    Hard-coded buffer of 1024 could be problematic if limit is much smaller or larger. Consider using buffer_unordered(limit.clamp(16, 1024)) so the concurrency tracks the requested batch size.


Performance Considerations

Improvements:

  • ✅ Batching reduces database round trips
  • ✅ Using buffer_unordered for concurrent signal processing
  • ✅ Sorting and limiting happen after fetching, avoiding multiple DB queries

Concerns:

  1. Flattening signal streams (engine/packages/gasoline/src/db/kv/mod.rs:1941-1961)

    futures_util::stream::iter(owned_filter.clone())
        .map(|signal_name| { /* ... */ })
        .flatten()

    Using .flatten() on multiple range queries could lead to uneven fetching if one signal type has many more entries than others. The current approach fetches limit from EACH signal type, then sorts and takes limit total. This could fetch more data than needed.

  2. Multiple indexed key writes (engine/packages/gasoline/src/db/kv/keys/history.rs)
    The new implementation writes separate keys for:

    • Each signal ID (IndexedSignalIdKey)
    • Each signal name (IndexedNameKey)
    • Each signal input chunk (IndexedInputChunkKey)

    This could significantly increase write amplification. For N signals with M chunks each, you're writing ~N*(2+M) keys instead of N keys (e.g., 10 signals with 2 chunks each means ~40 keys instead of 10). Consider benchmarking the performance impact.


Security Concerns

  1. No validation on limit parameter
    The limit parameter in listen_n is not validated. A malicious or buggy caller could pass usize::MAX which could cause:

    • Excessive memory allocation
    • Database query timeouts
    • DoS conditions

    Recommend adding: let limit = limit.min(MAX_SIGNAL_BATCH_SIZE); with a reasonable constant.

  2. Signal body size not validated
    When combining chunks into signal bodies, there's no validation that the total size is reasonable. Large signal bodies could cause OOM issues.
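A hedged sketch of the size guard (the constant name and threshold are assumptions):

```rust
/// Hypothetical cap on a reassembled signal body.
const MAX_SIGNAL_BODY_BYTES: usize = 1024 * 1024; // 1 MiB

/// Check the total size while combining chunks, before allocating the body.
fn combine_chunks(chunks: &[Vec<u8>]) -> anyhow::Result<Vec<u8>> {
    let total: usize = chunks.iter().map(Vec::len).sum();
    anyhow::ensure!(
        total <= MAX_SIGNAL_BODY_BYTES,
        "signal body too large: {total} bytes (max {MAX_SIGNAL_BODY_BYTES})"
    );
    Ok(chunks.concat())
}
```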


Test Coverage

Missing:

  • ❌ No test files modified in this PR
  • ❌ No tests for listen_n functionality
  • ❌ No tests for batch signal processing
  • ❌ No tests for the new SignalsEvent type
  • ❌ No tests for backward compatibility (old Signal → new Signals conversion)
  • ❌ No tests for limit parameter edge cases (0, 1, large numbers)
  • ❌ No tests for signal ordering when batching multiple signal types

Recommended Tests:

// Test batch listen with multiple signals
#[tokio::test]
async fn test_listen_n_multiple_signals() {
    // Send 5 signals, listen for 3, verify we get the oldest 3
}

// Test listen_n with limit larger than available signals
#[tokio::test]  
async fn test_listen_n_limit_exceeds_available() {
    // Send 2 signals, listen for 5, verify we get 2
}

// Test backward compatibility
#[tokio::test]
async fn test_old_signal_event_conversion() {
    // Load workflow with old Signal events, verify they convert to Signals
}

// Test signal ordering across types
#[tokio::test]
async fn test_listen_n_mixed_signal_types() {
    // Send signals of type A and B interleaved by time
    // Verify they're received in chronological order
}

Architecture & Design

Positive:

  • The batch approach is a good architectural decision for high-throughput scenarios
  • Clean separation between listen() (single) and listen_n() (batch) maintains backward compatibility at the API level
  • The history cursor changes properly support the new batch model

Questions:

  1. Why was CustomListener removed? Were there no use cases for it, or does batch listening subsume its functionality?
  2. Should there be a way to listen for "all available signals up to N" vs "exactly N signals or error"? The current implementation returns NoSignalFound only when no signals are available at all, and otherwise returns fewer than N without error.

Summary & Recommendations

This is a well-structured PR that adds valuable batching capabilities to the workflow engine. However, it needs:

Must Fix:

  1. Add signal_ids field to SignalsEvent struct
  2. Add validation for limit parameter to prevent DoS
  3. Fix the ordering issue in signal fetching (fetch limit total, not limit per type)
  4. Add comprehensive test coverage

Should Fix:

  1. Consider keeping SIGNAL_PULL_DURATION metric
  2. Document the breaking changes to Listen trait
  3. Add buffer size tuning based on limit
  4. Add signal body size validation

Nice to Have:

  1. Add migration guide for users of old CustomListener trait
  2. Benchmark write amplification impact
  3. Consider exposing batch size as a configuration parameter

The core implementation is solid, but the missing tests and potential edge cases around limits and ordering need to be addressed before merging.


Generated by Claude Code

@claude

claude bot commented Nov 18, 2025

Code Review

Summary

This PR adds batch signal listening capabilities to the Gasoline workflow engine, allowing workflows to receive multiple signals in a single operation instead of one at a time. This is a significant architectural improvement that should improve efficiency when dealing with high-volume signal processing.

Architecture Changes

Well-designed changes:

  1. Signal History Consolidation: The new SignalsEvent replaces individual signal events in history, batching multiple signals into a single event. This is more efficient and cleaner than the previous approach.

  2. Indexed Storage: The implementation uses indexed keys (IndexedSignalIdKey, IndexedNameKey, IndexedInputKey) to store multiple signals within a single history event, which is a clean database schema design.

  3. Backward Compatibility: The PR properly deprecates the old Signal event type (EventType::Signal = 1) while introducing Signals (EventType::Signals = 10), maintaining compatibility.
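A sketch of what that layout implies (discriminant values taken from the review above; the other variants are elided):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum EventType {
    /// Kept for decoding old history; new writes should use Signals.
    Signal = 1,
    // ... other event types ...
    Signals = 10,
}
```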

Code Quality

Strengths:

  1. Consistent API Design: The new listen_n() and listen_n_with_timeout() methods follow the existing naming conventions and patterns.

  2. Proper Error Handling: Maintains proper error handling throughout, including history divergence checks in compare_signals().

  3. Metrics & Observability: Maintains logging and metrics (signal receive lag tracking, etc.).

  4. Macro Updates: The #[signal] and join_signal! macros are properly updated to return Vec<Self> instead of single values.

Potential Issues

⚠️ Critical Issues:

1. EventType Mismatch in insert::signals_event (packages/gasoline/src/db/kv/keys/history.rs:1645)

pub fn signals_event(/* ... */) -> Result<()> {
    common(
        subspace,
        tx,
        workflow_id,
        location,
        EventType::Signal,  // ❌ Should be EventType::Signals
        version,
        create_ts,
    )
}

This appears to use EventType::Signal (the deprecated type) instead of EventType::Signals. This could cause history corruption or replay issues.

Recommended fix:

EventType::Signals,  // Use the new batch signal type

2. Missing Validation on Signal Count

The SignalsEvent struct (packages/gasoline/src/history/event.rs:211-214) stores names and bodies as separate vectors but does not enforce that they have the same length:

pub struct SignalsEvent {
    pub names: Vec<String>,
    pub bodies: Vec<Box<serde_json::value::RawValue>>,
}

In the database deserialization code (packages/gasoline/src/db/kv/debug.rs:1545), there is a zip operation that could silently drop data if lengths do not match. Consider adding validation:

pub struct SignalsEvent {
    pub names: Vec<String>,
    pub bodies: Vec<Box<serde_json::value::RawValue>>,
}

impl SignalsEvent {
    pub fn new(names: Vec<String>, bodies: Vec<Box<serde_json::value::RawValue>>) -> Result<Self> {
        ensure!(names.len() == bodies.len(), "signals names and bodies must have same length");
        ensure!(!names.is_empty(), "signals event must contain at least one signal");
        Ok(Self { names, bodies })
    }
}

⚠️ Medium Priority Issues:

3. Removed CustomListener Without Clear Migration Path

The PR removes the CustomListener trait and custom_listener() method entirely:

  • Removed from packages/gasoline/src/ctx/versioned_workflow.rs (lines 128-136)
  • Not mentioned in PR description

If any users were using CustomListener, this is a breaking change. Consider:

  1. Adding a deprecation note/migration guide
  2. Ensuring no internal code was using this functionality
  3. Documenting this breaking change

4. Potential Performance Issue: Sort After Parallel Fetch

In pull_next_signals (packages/gasoline/src/db/kv/mod.rs:2059-2063):

.buffer_unordered(1024)
.try_collect::<Vec<_>>()
.await?;

// Sort by ts
signals.sort_by_key(|key| key.create_ts);

// Apply limit
Ok(signals.into_iter().take(limit).collect())

The code fetches up to limit signals from each signal name stream in parallel, then sorts all results by timestamp, then takes limit items. This means:

  • If listening for N signal types with limit=10, you might fetch 10*N signals then discard all but 10
  • The buffer_unordered(1024) suggests very high parallelism but the final limit might waste most of that work

Recommendation: Consider limiting the total fetch earlier or documenting the tradeoff.

Performance Considerations

Good:

  • Batching signals reduces database round-trips
  • Parallel signal body fetching with buffer_unordered(1024)
  • Maintains existing polling/subscription mechanisms

⚠️ Concern:

  • The listen_n semantics could lead to workflows requesting large batches unnecessarily if not used carefully
  • No apparent safeguard against very large batch sizes (could the limit parameter be validated against a max?)

Security

No obvious security concerns

  • Maintains existing workflow isolation
  • No new injection vectors
  • Proper deserialization error handling

Test Coverage

Critical Gap: No Tests for Batch Listening

The PR adds significant new functionality but I could not find any tests for:

  • listen_n() functionality
  • listen_n_with_timeout()
  • listen_n_until()
  • Multiple signals in a single SignalsEvent
  • History replay with batched signals

Recommendation: Add tests covering:

  1. Basic listen_n(limit=5) with multiple signals arriving
  2. Behavior when fewer than limit signals arrive
  3. Correct ordering of batched signals by timestamp
  4. History replay with SignalsEvent containing multiple signals
  5. Timeout behavior with listen_n_with_timeout

Example test case:

#[workflow_test]
async fn test_batch_signal_listen() {
    // Publish 5 signals
    // ctx.listen_n::<MySignal>(10) should return all 5
    // Verify order matches timestamps
}

Additional Observations

  1. Good: The listen() method is now implemented as listen_n(1), which is a clean refactor that ensures consistency.

  2. Good: Updated print/debug utilities to handle the new SignalsEvent type.

  3. Question: The PR description is empty. Consider adding:

    • Why batch listening is needed
    • Performance improvement metrics (if available)
    • Breaking changes documentation
    • Migration guide for CustomListener users

Recommendations

Must Fix:

  1. Fix EventType::Signal → EventType::Signals in insert::signals_event
  2. Add validation to SignalsEvent to ensure names.len() == bodies.len()
  3. Add comprehensive test coverage for batch listening

Should Consider:

  1. Add PR description explaining the changes and migration path
  2. Document the breaking change (removal of CustomListener)
  3. Review the fetch-all-then-limit performance pattern in pull_next_signals
  4. Add max batch size validation to prevent abuse

Nice to Have:

  1. Add examples of using listen_n in documentation
  2. Performance benchmarks showing improvement over sequential signal listening

Verdict

This is a well-architected feature that makes logical sense for the workflow engine. However, it has a critical bug (wrong EventType) and lacks test coverage. Once these issues are addressed, this will be a solid improvement.

Status: Needs Changes 🔴

The EventType bug could cause production issues with history replay, and the lack of tests makes it risky to merge. Please address the critical issues before merging.
