Skip to content

Feature Request: Enable manual heartbeat / connection access inside Consumer callback #73

@jeancz

Description

@jeancz

Feature Request: RabbitMQ Connection Management in Consumer Callbacks

Problem Description

I am encountering issues when processing long-running tasks within a Consumer callback. Since PHP is single-threaded and the processing is synchronous, the main loop is blocked during the execution of the user callback.

If the processing time exceeds the heartbeat timeout, the RabbitMQ connection is dropped by the server because the client fails to send heartbeats.

Currently, the callback signature only receives the Message object:

public function consume(Message $message): int

There is no clean way to access the underlying Connection or Queue instance to manually trigger sendHeartbeat() from within the loop of a heavy task.

Workarounds involve:

  1. Injecting QueueFactory into the Consumer service (Service Locator anti-pattern).
  2. Using reflection to access the connection.

Describe the solution you'd like

I propose providing access to the IQueue or IConnection instance directly within the callback, allowing users to manually maintain the connection alive during heavy processing.

Also, please ensure that sendHeartbeat() is added to the Contributte\RabbitMQ\Connection\IConnection interface (it currently exists in the Client implementation but seems missing from the interface).

Context Object

Introduce a ConsumptionContext object that wraps the message and provides utility methods. This is a cleaner, more extensible approach.

Library change:

class ConsumptionContext
{
    public function __construct(
        private Message $message,
        private IQueue $queue
    ) {}

    public function getMessage(): Message { ... }
    
    public function heartbeat(): void 
    {
        $this->queue->getConnection()->sendHeartbeat();
    }
}

Userland usage:

public function consume(ConsumptionContext $context): int
{
    $msg = $context->getMessage();

    $largeDataSet = $this-manager->getDataSetFromMsg($msg);
    
    foreach ($largeDataSet as $row) {
        $this->process($row);
        // Manually keep connection alive
        $context->heartbeat();
    }
    
    return IConsumer::MESSAGE_ACK;
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    Status

    Triage: Next

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions