diff --git a/processor.go b/processor.go index 090bcbd..65e1043 100644 --- a/processor.go +++ b/processor.go @@ -135,8 +135,10 @@ func (p *FanOutProcessor) WithPipe(pipe Pipe) { // Process processes the stream nodeMessage. func (p *FanOutProcessor) Process(msg Message) error { + src, meta := msg.Metadata() for i := 0; i < p.streams; i++ { - if err := p.pipe.ForwardToChild(msg, i); err != nil { + fMsg := NewMessageWithContext(msg.Ctx, msg.Key, msg.Value).WithMetadata(src, meta) + if err := p.pipe.ForwardToChild(fMsg, i); err != nil { return err } }