The send() method below applies per-node transformations, which allows a single RPC to send different messages to different nodes. We should consider renaming it to sendPerNode (or similar) and adding a separate send method that reuses the metadata and the marshaling step across all nodes.
// send dispatches the request to every node in c.config, applying any
// registered transformations to the request on a per-node basis.
//
// A node is skipped, and not counted, when either
//   - applyTransforms returns nil for that node, or
//   - the transformed message cannot be marshaled.
//
// After the loop, c.expectedReplies is set to the number of requests that
// were actually enqueued, so it never includes a skipped node.
func (c *ClientCtx[Req, Resp]) send() {
	var expected int
	for _, n := range c.config {
		msg := c.applyTransforms(c.request, n)
		if msg == nil {
			continue // Skip node if transformation returns nil
		}
		// Marshal before touching the metadata or the reply count: a node
		// whose message cannot be marshaled receives no request and must
		// therefore not contribute to expectedReplies.
		msgData, err := proto.Marshal(msg)
		if err != nil {
			continue // Skip node if marshaling fails
		}
		// Clone metadata for each request to avoid race conditions during
		// concurrent marshaling when SetMessageData is called; the shared
		// c.md is never mutated.
		md := proto.CloneOf(c.md)
		// Store the marshaled proto message in the metadata's message_data field.
		md.SetMessageData(msgData)
		n.channel.enqueue(request{
			ctx:          c.Context,
			md:           md,
			streaming:    c.streaming,
			waitSendDone: c.waitSendDone,
			responseChan: c.replyChan,
		})
		// Count only requests that were actually handed to a node's channel.
		expected++
	}
	c.expectedReplies = expected
}