[Feature🚀] Implement CleanBrokerData Request Handler #5536
Open
Labels
Difficulty level/Hard, feature🚀, help wanted
Description
Feature Request: Implement CleanBrokerData Request Handler
Feature Description
Implement the handle_clean_broker_data method in ControllerRequestProcessor to handle requests for cleaning broker data from the controller. This feature is used when a broker is permanently removed from the cluster or during maintenance operations.
Problem/Motivation
Currently, the handle_clean_broker_data method is unimplemented. Cleaning broker data is essential for:
- Removing metadata of decommissioned brokers
- Cleaning up after brokers that have failed and will not recover
- Maintaining accurate cluster state (avoiding stale broker entries)
- Freeing resources associated with offline brokers
- Ensuring monitoring and management tools have accurate data
Without this implementation, decommissioned broker metadata accumulates in the controller, causing:
- Inaccurate cluster size and topology reporting
- Wasted resources tracking dead brokers
- Confusion in monitoring dashboards
- Potential issues during broker ID reallocation
Proposed Solution
Location
- File: rocketmq-controller/src/processor/controller_request_processor.rs
- Method: handle_clean_broker_data (lines 416-429)
- Request Code: RequestCode::CleanBrokerData
Implementation Steps
- Decode Request Header:
  - Create/use CleanBrokerDataRequestHeader in rocketmq-remoting/protocol/header/controller/
  - Header should contain:
    - cluster_name: The cluster name
    - broker_name: The broker group name to clean
    - broker_addr: Optional specific broker address
    - Or could specify broker_id
- Validate Request:
  - Check if broker exists
  - Verify broker is offline (hasn't sent heartbeat recently)
  - Optionally require admin permissions
- Clean Broker Data via Controller:
  - Call self.controller_manager.controller().clean_broker_data(&request_header).await
  - This requires Raft consensus as it modifies cluster state
  - Remove from:
    - ReplicasInfoManager (replica topology)
    - Heartbeat manager (heartbeat records)
    - Broker ID allocation records
    - Any other broker metadata
- Return Response:
  - Return success response
  - Optionally include details of what was cleaned
Example Implementation Pattern
```rust
async fn handle_clean_broker_data(
    &mut self,
    _channel: Channel,
    _ctx: ConnectionHandlerContext,
    request: &mut RemotingCommand,
) -> RocketMQResult<Option<RemotingCommand>> {
    use rocketmq_error::RocketMQError;
    use rocketmq_remoting::protocol::header::controller::clean_broker_data_request_header::CleanBrokerDataRequestHeader;

    // Decode request header
    let request_header = request
        .decode_command_custom_header::<CleanBrokerDataRequestHeader>()
        .map_err(|e| {
            RocketMQError::request_header_error(format!(
                "Failed to decode CleanBrokerDataRequestHeader: {:?}", e
            ))
        })?;

    // Validate broker is offline
    let heartbeat_info = self.heartbeat_manager
        .get_broker_heartbeat(&request_header.broker_name, request_header.broker_id);
    if let Some(info) = heartbeat_info {
        if !info.is_timed_out() {
            return Ok(Some(
                RemotingCommand::create_response_command_with_code_remark(
                    ResponseCode::SystemError,
                    "Cannot clean data for online broker".to_string(),
                )
            ));
        }
    }

    // Forward to controller for consensus and cleanup
    self.controller_manager
        .controller()
        .clean_broker_data(&request_header)
        .await
}
```
Alternatives
- Could implement automatic cleanup based on timeout, but explicit cleanup gives more control
- Could use soft-delete with retention period before permanent cleanup
- Could require multiple confirmations for safety
Additional Context
Dependencies Required
- Request Header Type:
  - Create CleanBrokerDataRequestHeader in rocketmq-remoting/src/protocol/header/controller/
  - Fields:
    - cluster_name: String
    - broker_name: String
    - broker_addr: Option<String> - specific broker or entire group
    - broker_id: Option<u64> - specific broker ID
- Controller Method:
  - Implement clean_broker_data(&self, header: &CleanBrokerDataRequestHeader) -> RocketMQResult<Option<RemotingCommand>>
  - Requires Raft consensus for state changes
  - Should clean from multiple subsystems
- State Cleanup:
  - Remove from ReplicasInfoManager - replica sets and metadata
  - Remove from BrokerHeartbeatManager - heartbeat records
  - Remove from broker ID allocation tracking
  - Remove from any caches or indices
- Safety Checks:
  - Verify broker is actually offline
  - Prevent cleaning active brokers
  - Optionally require timeout period before cleanup
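To make the header fields, the state cleanup and the safety checks listed above more concrete, here is a minimal, std-only sketch. It is not the controller's real code: ControllerState, its two maps, and the now_ms/timeout_ms parameters are hypothetical stand-ins for ReplicasInfoManager and the heartbeat manager, and the header struct only mirrors the proposed fields. It assumes that leaving both broker_id and broker_addr unset means the whole group.

```rust
use std::collections::HashMap;

/// Hypothetical header shape mirroring the fields proposed in this issue.
#[derive(Debug, Clone)]
pub struct CleanBrokerDataRequestHeader {
    pub cluster_name: String,
    pub broker_name: String,
    pub broker_addr: Option<String>,
    pub broker_id: Option<u64>,
}

/// Stand-in for controller state (not the real managers):
/// broker group -> (broker id -> address), plus last heartbeat time in ms.
#[derive(Default)]
pub struct ControllerState {
    pub replicas: HashMap<String, HashMap<u64, String>>,
    pub heartbeats: HashMap<(String, u64), u64>,
}

impl ControllerState {
    /// Removes the targeted brokers and returns the ids that were cleaned.
    /// Refuses if any targeted broker sent a heartbeat within `timeout_ms`.
    pub fn clean_broker_data(
        &mut self,
        header: &CleanBrokerDataRequestHeader,
        now_ms: u64,
        timeout_ms: u64,
    ) -> Result<Vec<u64>, String> {
        let Some(group) = self.replicas.get(&header.broker_name) else {
            return Ok(Vec::new()); // unknown group: nothing to clean
        };

        // broker_id and broker_addr narrow the target; neither set means the whole group.
        let mut targets: Vec<u64> = group
            .iter()
            .filter(|(id, addr)| {
                header.broker_id.map_or(true, |want| want == **id)
                    && header.broker_addr.as_ref().map_or(true, |want| want == *addr)
            })
            .map(|(id, _)| *id)
            .collect();
        targets.sort_unstable();

        // Safety check: a recent heartbeat means the broker is still online.
        for id in &targets {
            let key = (header.broker_name.clone(), *id);
            if let Some(last) = self.heartbeats.get(&key) {
                if now_ms.saturating_sub(*last) < timeout_ms {
                    return Err(format!("broker {}:{} is still online", header.broker_name, id));
                }
            }
        }

        // Cleanup: heartbeat records first, then the replica entries.
        for id in &targets {
            self.heartbeats.remove(&(header.broker_name.clone(), *id));
        }
        let group = self
            .replicas
            .get_mut(&header.broker_name)
            .expect("group existence checked above");
        for id in &targets {
            group.remove(id);
        }
        if group.is_empty() {
            self.replicas.remove(&header.broker_name);
        }
        Ok(targets)
    }
}
```

Running the liveness check for every target before removing anything keeps the operation all-or-nothing within a group. A real implementation would perform the same check again when the Raft entry is applied.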
Related Code
- Related to handle_broker_heartbeat, which tracks broker liveness
- Related to handle_register_broker, which adds broker data
- Opposite operation to registration
- Uses DefaultBrokerHeartbeatManager to check broker status
Use Cases
- Decommissioning: Permanently removing a broker from the cluster
- Maintenance: Cleaning up after failed broker replacement
- Testing: Cleaning up test brokers
- Recovery: Removing corrupted broker entries
- Scale down: Reducing cluster size
Safety Considerations
- Critical operation: Should verify broker is offline
- Prevent accidents: Should not allow cleaning online brokers
- Audit logging: Should log all cleanup operations
- Reversibility: Consider whether cleanup should be reversible
- Access control: Should require admin permissions
Implementation Notes
- This is a state-changing operation requiring Raft consensus
- Should wait for consensus with timeout (WAIT_TIMEOUT_SECONDS)
- May trigger notifications to other brokers about topology changes
- Should be idempotent (cleaning an already-cleaned broker is OK)
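The idempotency and wait-with-timeout notes can be illustrated with a small std-only sketch. Everything here is an assumption made for illustration: a std::sync::mpsc channel stands in for the Raft apply notification, the value 5 for WAIT_TIMEOUT_SECONDS is a placeholder (the issue only names the constant), and CleanOutcome, CleanError, apply_clean and wait_for_commit are hypothetical names.

```rust
use std::collections::HashSet;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Placeholder value; the issue names the constant but not its value.
pub const WAIT_TIMEOUT_SECONDS: u64 = 5;

#[derive(Debug, PartialEq)]
pub enum CleanOutcome {
    Cleaned,
    AlreadyClean,
}

#[derive(Debug, PartialEq)]
pub enum CleanError {
    ConsensusTimeout,
    ProposalDropped,
}

/// Idempotent state transition: cleaning a broker that is already gone
/// is reported as success (`AlreadyClean`) rather than as an error.
pub fn apply_clean(brokers: &mut HashSet<String>, broker_name: &str) -> CleanOutcome {
    if brokers.remove(broker_name) {
        CleanOutcome::Cleaned
    } else {
        CleanOutcome::AlreadyClean
    }
}

/// Blocks until the stand-in consensus layer reports an outcome,
/// or returns an error once `timeout` has elapsed.
pub fn wait_for_commit(
    rx: &Receiver<CleanOutcome>,
    timeout: Duration,
) -> Result<CleanOutcome, CleanError> {
    match rx.recv_timeout(timeout) {
        Ok(outcome) => Ok(outcome),
        Err(RecvTimeoutError::Timeout) => Err(CleanError::ConsensusTimeout),
        Err(RecvTimeoutError::Disconnected) => Err(CleanError::ProposalDropped),
    }
}
```

A caller would pass Duration::from_secs(WAIT_TIMEOUT_SECONDS) as the timeout. Because apply_clean is idempotent, a client that sees ConsensusTimeout can safely retry the request even if the first attempt was eventually committed.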
Edge Cases
- Broker comes online during cleanup: Reject operation
- Broker is master: May need to trigger election first
- Broker in ISR: May need to update sync state sets
- Last broker in group: Handle group deletion
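One way to reason about these edge cases is to turn them into an explicit plan before touching any state. The sketch below is illustrative only: BrokerSnapshot, CleanupStep and plan_cleanup are hypothetical names, and the rule that election and sync-state-set updates are skipped for the last broker in a group is an assumption, not something this issue specifies.

```rust
/// Hypothetical snapshot of what the controller knows about the target broker.
#[derive(Debug, Clone, Copy)]
pub struct BrokerSnapshot {
    pub online: bool,
    pub is_master: bool,
    pub in_sync_state_set: bool,
    /// Number of brokers in the group, including the target.
    pub group_size: usize,
}

#[derive(Debug, PartialEq, Eq)]
pub enum CleanupStep {
    ElectNewMaster,
    ShrinkSyncStateSet,
    RemoveBroker,
    DeleteGroup,
}

/// Maps the edge cases to an ordered list of steps, or rejects the request.
pub fn plan_cleanup(b: BrokerSnapshot) -> Result<Vec<CleanupStep>, &'static str> {
    // Broker is (or came back) online: reject the operation.
    if b.online {
        return Err("broker is online; rejecting cleanup");
    }

    let last_in_group = b.group_size <= 1;
    let mut steps = Vec::new();

    // Assumption: with no other replica left there is nothing to elect
    // or to shrink, so both steps are skipped for the last broker.
    if b.is_master && !last_in_group {
        steps.push(CleanupStep::ElectNewMaster);
    }
    if b.in_sync_state_set && !last_in_group {
        steps.push(CleanupStep::ShrinkSyncStateSet);
    }

    steps.push(CleanupStep::RemoveBroker);

    // Last broker in the group: the group itself goes away.
    if last_in_group {
        steps.push(CleanupStep::DeleteGroup);
    }
    Ok(steps)
}
```

To cover a broker that comes online while cleanup is in flight, the snapshot would have to be taken again when the Raft entry is applied, not only when the request arrives.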
References
- Apache RocketMQ Java implementation: CleanBrokerDataRequestHeader and processor
- Current file: controller_request_processor.rs
- Heartbeat manager: default_broker_heartbeat_manager.rs