-
Notifications
You must be signed in to change notification settings - Fork 244
[Feature🚀] Implement ControllerRegisterBroker Request Handler #5543
Description
Feature Request: Implement ControllerRegisterBroker Request Handler
Feature Description
Implement the handle_register_broker method in ControllerRequestProcessor to handle broker registration requests. This feature is the primary mechanism for brokers to register themselves with the controller and establish their presence in the cluster.
Problem/Motivation
Currently, the handle_register_broker method is unimplemented. Broker registration is fundamental for:
- Initial broker startup and cluster joining
- Broker discovery by clients and other brokers
- Establishing broker identity (ID, address, role)
- Registering broker capabilities and metadata
- Updating broker information after configuration changes
- Maintaining up-to-date broker topology in the controller
Without this implementation, brokers cannot join the cluster, making the entire controller system non-functional. This is one of the most critical operations in the controller.
Proposed Solution
Location
- File:
rocketmq-controller/src/processor/controller_request_processor.rs - Method:
handle_register_broker(lines 461-474) - Request Code:
RequestCode::ControllerRegisterBroker
Implementation Steps
-
Decode Request Header and Body:
- Use
RegisterBrokerRequestHeaderfromrocketmq-remoting/protocol/header/ - Header contains:
broker_name: Broker group namebroker_addr: Network addresscluster_name: Cluster identifierbroker_id: Broker ID (obtained via get_next_broker_id or apply_broker_id)epoch: Broker epoch for fencing
- Body may contain:
- Topic configuration list
- Filter server list
- Additional metadata
- Use
-
Validate Registration Request:
- Verify broker ID is allocated/valid
- Check broker is not already registered with different address
- Validate cluster name matches
- Verify epoch is valid for fencing
-
Register Broker via Controller:
- Call
self.controller_manager.controller().register_broker(&request_header, body).await - This requires Raft consensus as it modifies cluster state
- Controller should:
- Update ReplicasInfoManager with broker info
- Add broker to appropriate replica set
- Update sync state if needed
- Store broker metadata
- Call
-
Start Heartbeat Tracking:
- Initialize heartbeat tracking for this broker
- Set initial heartbeat timestamp
-
Notify Other Brokers:
- May need to notify other brokers in the group about new member
- Update sync state sets if this affects ISR
-
Return Response:
- Return response with registration confirmation
- Include controller leader information
- May include instructions (role assignment, sync state, etc.)
Example Implementation Pattern
async fn handle_register_broker(
&mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
request: &mut RemotingCommand,
) -> RocketMQResult<Option<RemotingCommand>> {
use rocketmq_error::RocketMQError;
use rocketmq_remoting::protocol::header::namesrv::register_broker_request_header::RegisterBrokerRequestHeader;
use rocketmq_remoting::protocol::body::register_broker_body::RegisterBrokerBody;
// Decode request header
let request_header = request
.decode_command_custom_header::<RegisterBrokerRequestHeader>()
.map_err(|e| {
RocketMQError::request_header_error(format!(
"Failed to decode RegisterBrokerRequestHeader: {:?}", e
))
})?;
// Decode request body if present
let body = if let Some(body_bytes) = request.body() {
Some(RegisterBrokerBody::decode(body_bytes)?)
} else {
None
};
// Validate broker ID is allocated
// (broker should have called get_next_broker_id or apply_broker_id first)
// Register broker via controller (requires consensus)
let response = self.controller_manager
.controller()
.register_broker(&request_header, body)
.await?;
// Initialize heartbeat tracking
self.heartbeat_manager
.on_broker_register(&request_header)
.await?;
Ok(response)
}Alternatives
- Could use service discovery systems (Consul, etcd) instead of controller-based registration
- Could implement peer-to-peer discovery without central controller
- Could use multicast for broker discovery
Additional Context
Dependencies Required
-
Request Header Type:
RegisterBrokerRequestHeadershould already exist inrocketmq-remoting/src/protocol/header/namesrv/- May need controller-specific version with additional fields:
epoch: u64- for fencingenable_acting_master: bool- for auto-promotion
-
Request Body Type:
RegisterBrokerBodyinrocketmq-remoting/src/protocol/body/- Contains:
- Topic configuration data
- Filter server list
- Broker capabilities
-
Controller Method:
- Implement
register_broker(&self, header: &RegisterBrokerRequestHeader, body: Option<RegisterBrokerBody>) -> RocketMQResult<Option<RemotingCommand>> - Requires Raft consensus for state changes
- Updates multiple state components
- Implement
-
State Updates:
- ReplicasInfoManager: Add/update broker in replica set
- BrokerHeartbeatManager: Start tracking heartbeats
- SyncStateSet: Update ISR if needed
- Broker metadata: Store broker info for queries
Related Code
- Follows
handle_get_next_broker_idorhandle_apply_broker_idfor ID allocation - Precedes
handle_broker_heartbeatfor liveness tracking - Related to
handle_elect_masterfor role assignment - Related to
handle_alter_sync_state_setfor ISR management - Opposite of
handle_clean_broker_datafor deregistration
Registration Flow
Broker Startup:
1. Broker determines broker_name and cluster_name from config
2. Broker calls get_next_broker_id (new) or apply_broker_id (restart)
3. Controller allocates broker_id
4. Broker calls register_broker with allocated ID
5. Controller adds broker to cluster state
6. Broker begins sending heartbeats
7. Broker participates in replication and serving traffic
Use Cases
- Initial broker startup: Brand new broker joining cluster
- Broker restart: Broker rejoining after restart
- Broker migration: New instance replacing old broker
- Configuration update: Broker re-registering after config change
- Cluster scaling: Adding capacity to existing cluster
Registration Information
Key data registered:
- Identity: cluster_name, broker_name, broker_id
- Network: broker_addr (ip:port)
- Role: Master (ID=0) or Slave (ID>0)
- Epoch: For fencing stale brokers
- Capabilities: Supported features, protocol versions
- Topics: Topic configurations (for master)
- Metadata: Arbitrary broker metadata
Epoch-Based Fencing
- Each broker maintains an epoch counter
- Epoch increments on role changes or significant events
- Controller tracks expected epoch for each broker
- Requests with stale epoch are rejected
- Prevents split-brain and stale broker operations
Implementation Notes
- This is a state-changing operation requiring Raft consensus
- Should be idempotent (re-registering same broker = update)
- May need to handle role changes (slave -> master promotion)
- Should validate broker is not creating conflicts
- Response should include controller's view of broker's role
Response Contents
Typical response includes:
- Success/failure status
- Controller leader info: For future requests
- Assigned role: Master or slave
- Sync state: Current ISR membership
- Configuration: Any controller-managed config
- Cluster topology: Other brokers in group
Edge Cases
- Broker registers with ID not allocated: Reject
- Broker registers with different address than before: Update or reject?
- Master (ID=0) registers when master exists: Election required?
- Broker registers during controller failover: Handle gracefully
- Concurrent registrations: Raft ensures consistency
Security Considerations
- Validate broker has authority to use this ID
- Verify cluster_name matches controller's cluster
- Prevent malicious broker registration
- Consider authentication/authorization
- Audit log all registrations
References
- Apache RocketMQ Java implementation:
RegisterBrokerRequestHeaderand processor - Current file: controller_request_processor.rs
- Related: GetNextBrokerId, ApplyBrokerId, BrokerHeartbeat