[Feature🚀]Implement ConsumerProgressSubCommand for querying consumer progress and speed #5645

@mxsm

Feature Description

Implement the consumerProgress command, which lets users query a consumer group's consumption progress, speed, and lag. For each topic queue, the command should display the broker offset, consumer offset, diff (lag), inflight message count, and last consumption time.
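To make the displayed columns concrete, here is a minimal sketch of how diff and inflight could be derived per queue. The `QueueProgress` struct and its field names are hypothetical and not taken from rocketmq-rust; the two formulas assume the command follows the Java tool's convention (broker offset minus consumer offset for diff, pull offset minus consumer offset for inflight), which should be confirmed against ConsumerProgressSubCommand.java.

```rust
/// Hypothetical per-queue snapshot; field names are illustrative only
/// and are not the rocketmq-rust API.
#[derive(Debug, Clone, PartialEq)]
pub struct QueueProgress {
    pub topic: String,
    pub broker_name: String,
    pub queue_id: u32,
    /// Max offset currently stored on the broker for this queue.
    pub broker_offset: i64,
    /// Offset the consumer group has committed.
    pub consumer_offset: i64,
    /// Offset up to which clients have pulled messages.
    pub pull_offset: i64,
    /// Last consumption time, in milliseconds since the Unix epoch.
    pub last_timestamp_ms: i64,
}

impl QueueProgress {
    /// Lag: messages on the broker that the group has not yet committed.
    /// Assumed formula: broker offset minus consumer offset.
    pub fn diff(&self) -> i64 {
        self.broker_offset - self.consumer_offset
    }

    /// Inflight: messages pulled by a client but not yet committed.
    /// Assumed formula: pull offset minus consumer offset.
    pub fn inflight(&self) -> i64 {
        self.pull_offset - self.consumer_offset
    }
}

fn main() {
    let q = QueueProgress {
        topic: "TopicTest".to_string(),
        broker_name: "broker-a".to_string(),
        queue_id: 0,
        broker_offset: 1_200,
        consumer_offset: 1_150,
        pull_offset: 1_180,
        last_timestamp_ms: 0,
    };
    println!("diff={} inflight={}", q.diff(), q.inflight());
}
```

A growing diff with a stale last consumption time would point at a slow or stuck consumer, which is the monitoring case this command targets.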

Problem/Motivation

Currently, rocketmq-rust has no way to monitor consumer group progress or consumption lag. This capability is essential for:

  • Monitoring consumer health and performance
  • Identifying slow or stuck consumers
  • Detecting message accumulation issues
  • Troubleshooting consumption problems

Proposed Solution

Create a new command under rocketmq-tools/src/commands/consumer/ with the following structure:

// rocketmq-tools/src/commands/consumer/consumer_progress.rs
// Skeleton only: imports for DefaultMQAdminExt and Result are omitted.

pub struct ConsumerProgressCommand {
    admin_ext: DefaultMQAdminExt,
}

impl ConsumerProgressCommand {
    pub fn new() -> Self {
        // Initialize the admin client
        todo!()
    }

    pub async fn execute(&self, opts: ConsumerProgressOptions) -> Result<()> {
        // Query consumer stats from the nameserver/broker, then display:
        // Topic, Broker Name, QID, Broker Offset, Consumer Offset, Diff, Inflight, LastTime
        todo!()
    }
}

/// Command-line options for consumerProgress.
pub struct ConsumerProgressOptions {
    pub consumer_group: Option<String>,
    pub cluster_name: Option<String>,
    pub topic_name: Option<String>,
    pub show_client_ip: bool,
    pub namesrv_addr: String,
}
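As a rough illustration of the display step inside `execute`, the sketch below renders rows into a fixed-width table and sums the total lag. The `Row` type, the `render` function, the column widths, and the header labels are all assumptions made for this example; the real output format should be matched against the Java tool.

```rust
use std::fmt::Write;

/// Hypothetical row type for display; not part of rocketmq-rust.
pub struct Row {
    pub topic: String,
    pub broker_name: String,
    pub queue_id: u32,
    pub broker_offset: i64,
    pub consumer_offset: i64,
    pub inflight: i64,
}

/// Renders a header, one line per queue, a blank line, and a total-lag line.
pub fn render(rows: &[Row]) -> String {
    let mut out = String::new();
    // Writing to a String cannot fail, so the results are safe to unwrap.
    writeln!(
        out,
        "{:<32}  {:<16}  {:<4}  {:>14}  {:>16}  {:>8}  {:>9}",
        "#Topic", "#Broker Name", "#QID", "#Broker Offset", "#Consumer Offset", "#Diff", "#Inflight"
    )
    .unwrap();

    let mut diff_total: i64 = 0;
    for r in rows {
        // Lag per queue, assumed to be broker offset minus consumer offset.
        let diff = r.broker_offset - r.consumer_offset;
        diff_total += diff;
        writeln!(
            out,
            "{:<32}  {:<16}  {:<4}  {:>14}  {:>16}  {:>8}  {:>9}",
            r.topic, r.broker_name, r.queue_id, r.broker_offset, r.consumer_offset, diff, r.inflight
        )
        .unwrap();
    }
    writeln!(out, "\nDiff Total: {}", diff_total).unwrap();
    out
}

fn main() {
    let rows = vec![
        Row {
            topic: "TopicTest".to_string(),
            broker_name: "broker-a".to_string(),
            queue_id: 0,
            broker_offset: 1_200,
            consumer_offset: 1_150,
            inflight: 30,
        },
        Row {
            topic: "TopicTest".to_string(),
            broker_name: "broker-a".to_string(),
            queue_id: 1,
            broker_offset: 900,
            consumer_offset: 875,
            inflight: 0,
        },
    ];
    print!("{}", render(&rows));
}
```

Returning a `String` rather than printing directly keeps the formatting logic testable without a running broker.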

Alternatives

  • Could implement as part of a monitoring dashboard
  • Could export metrics to Prometheus instead of CLI display

Additional Context

  • Java reference: ConsumerProgressSubCommand.java
  • Command name: consumerProgress
  • Command description: "Query consumer's progress, speed"
  • Key metrics: broker offset, consumer offset, lag (diff), inflight messages
