Merged
Changes from all commits
19 commits
8791c99  refactor(channel): use Metadata directly instead of Message wrapper (meling, Feb 8, 2026)
e5f7c3f  refactor(server): use type-safe Send/Recv instead of SendMsg/RecvMsg (meling, Feb 8, 2026)
62cb5bb  refactor(server): simplify response with responseWithError helper (meling, Feb 8, 2026)
a9ab22e  refactor: remove custom gRPC codec (meling, Feb 8, 2026)
2e97ff8  chore: unexport marshal and unmarshal functions (meling, Feb 8, 2026)
a62bb77  refactor(encoding): remove msgType from Message struct (meling, Feb 8, 2026)
0a8e891  refactor: remove unused NewRequest (meling, Feb 8, 2026)
c8bc6e9  refactor(metadata): consolidate metadata creation into NewMetadata (meling, Feb 8, 2026)
f7a191f  refactor: rename ordering to stream and unify message envelope (meling, Feb 9, 2026)
a8ce766  chore: regenerate proto files after refactor to get tests to pass (meling, Feb 11, 2026)
8f6d17b  refactor: move stream package to internal/stream (meling, Feb 11, 2026)
2138865  refactor: client interceptor to ensure consistent response handling (meling, Feb 11, 2026)
d1976b0  fix: multicast tests to sort responses to avoid flakiness (meling, Feb 11, 2026)
202d606  refactor: simplify messageWithError logic for response message creation (meling, Feb 11, 2026)
de50d15  doc: corrected the AsProto documentation (meling, Feb 11, 2026)
4d9de3c  chore: fix TestNewResponse to use mock.GetValueMethod string (meling, Feb 11, 2026)
a237be5  chore: removed obsolete comment in clientCtxBuilder.Build() (meling, Feb 11, 2026)
23f3e46  refactor: message handling in ClientCtx.send method (meling, Feb 11, 2026)
3139e12  refactor: add ErrorStatus method in response handling (meling, Feb 11, 2026)
1 change: 1 addition & 0 deletions .vscode/gorums.txt
@@ -69,6 +69,7 @@ pprof
proto
protobuf
protoc
protocmp
protodesc
protogen
protoimpl
2 changes: 1 addition & 1 deletion Makefile
@@ -8,7 +8,7 @@ static_files := $(shell find $(dev_path) -name "*.go" -not -name "zorums*" -no
proto_path := $(dev_path):third_party:.

plugin_deps := gorums.pb.go $(static_file)
runtime_deps := ordering/ordering.pb.go ordering/ordering_grpc.pb.go
runtime_deps := internal/stream/stream.pb.go internal/stream/stream_grpc.pb.go
benchmark_deps := benchmark/benchmark.pb.go benchmark/benchmark_gorums.pb.go

.PHONY: all dev tools bootstrapgorums installgorums benchmark test compiletests genproto benchtest bench
30 changes: 24 additions & 6 deletions benchmark/benchmark_gorums.pb.go

Some generated files are not rendered by default.

42 changes: 23 additions & 19 deletions channel.go
@@ -5,7 +5,7 @@ import (
"sync"
"time"

"github.com/relab/gorums/ordering"
"github.com/relab/gorums/internal/stream"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
@@ -44,9 +44,9 @@

type request struct {
ctx context.Context
msg *Message
waitSendDone bool
msg *stream.Message
streaming bool
waitSendDone bool
responseChan chan<- NodeResponse[msg]
sendTime time.Time
}
@@ -68,8 +68,8 @@ type channel struct {

// Stream lifecycle management for FIFO ordered message delivery
// stream is a bidirectional stream for
// sending and receiving ordering.Metadata messages.
stream ordering.Gorums_NodeStreamClient
// sending and receiving stream.Message messages.
stream stream.Gorums_NodeStreamClient
streamMut sync.Mutex
streamCtx context.Context
streamCancel context.CancelFunc
@@ -149,12 +149,12 @@ func (c *channel) ensureConnectedNodeStream() (err error) {
return nil
}
c.streamCtx, c.streamCancel = context.WithCancel(c.connCtx)
c.stream, err = ordering.NewGorumsClient(c.conn).NodeStream(c.streamCtx)
c.stream, err = stream.NewGorumsClient(c.conn).NodeStream(c.streamCtx)
return err
}

// getStream returns the current stream, or nil if no stream is available.
func (c *channel) getStream() grpc.ClientStream {
func (c *channel) getStream() stream.Gorums_NodeStreamClient {
c.streamMut.Lock()
defer c.streamMut.Unlock()
return c.stream
@@ -180,7 +180,7 @@ func (c *channel) isConnected() bool {
func (c *channel) enqueue(req request) {
if req.responseChan != nil {
req.sendTime = time.Now()
msgID := req.msg.GetMessageID()
msgID := req.msg.GetMessageSeqNo()
c.responseMut.Lock()
c.responseRouters[msgID] = req
c.responseMut.Unlock()
@@ -190,7 +190,7 @@ func (c *channel) enqueue(req request) {
select {
case <-c.connCtx.Done():
// the node's close() method was called: respond with error instead of enqueueing
c.routeResponse(req.msg.GetMessageID(), NodeResponse[msg]{NodeID: c.id, Err: nodeClosedErr})
c.routeResponse(req.msg.GetMessageSeqNo(), NodeResponse[msg]{NodeID: c.id, Err: nodeClosedErr})
return
case c.sendQ <- req:
// enqueued successfully
@@ -252,11 +252,11 @@ func (c *channel) sender() {
// take next request from sendQ
}
if err := c.ensureStream(); err != nil {
c.routeResponse(req.msg.GetMessageID(), NodeResponse[msg]{NodeID: c.id, Err: err})
c.routeResponse(req.msg.GetMessageSeqNo(), NodeResponse[msg]{NodeID: c.id, Err: err})
continue
}
if err := c.sendMsg(req); err != nil {
c.routeResponse(req.msg.GetMessageID(), NodeResponse[msg]{NodeID: c.id, Err: err})
c.routeResponse(req.msg.GetMessageSeqNo(), NodeResponse[msg]{NodeID: c.id, Err: err})
}
}
}
@@ -279,14 +279,18 @@ func (c *channel) receiver() {
}
}

resp := newMessage(responseType)
if err := stream.RecvMsg(resp); err != nil {
c.setLastErr(err)
c.cancelPendingMsgs(err)
respMsg, e := stream.Recv()
if e != nil {
c.setLastErr(e)
c.cancelPendingMsgs(e)
c.clearStream()
} else {
err := resp.GetStatus().Err()
c.routeResponse(resp.GetMessageID(), NodeResponse[msg]{NodeID: c.id, Value: resp.GetProtoMessage(), Err: err})
err := respMsg.ErrorStatus()
var resp msg
if err == nil {
resp, err = unmarshalResponse(respMsg)
}
c.routeResponse(respMsg.GetMessageSeqNo(), NodeResponse[msg]{NodeID: c.id, Value: resp, Err: err})
}

select {
@@ -316,7 +320,7 @@ func (c *channel) sendMsg(req request) (err error) {
// wait for actual server responses, so waitSendDone is false for them.
if req.waitSendDone && err == nil {
// Send succeeded: unblock the caller and clean up the responseRouter
c.routeResponse(req.msg.GetMessageID(), NodeResponse[msg]{})
c.routeResponse(req.msg.GetMessageSeqNo(), NodeResponse[msg]{})
}
}()

@@ -353,7 +357,7 @@ func (c *channel) sendMsg(req request) (err error) {
}
}()

if err = stream.SendMsg(req.msg); err != nil {
if err = stream.Send(req.msg); err != nil {
c.setLastErr(err)
c.clearStream()
}
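
For readers skimming the interleaved hunks above, the receiver's new per-message flow condenses to the sketch below. It reuses the identifiers visible in the diff (Recv, ErrorStatus, GetMessageSeqNo, unmarshalResponse, routeResponse); handleOne is a hypothetical name for what is inline code in receiver(), and unmarshalResponse is internal and not shown in this diff.

// handleOne condenses the receiver's per-message handling after the refactor (sketch only).
func (c *channel) handleOne(st stream.Gorums_NodeStreamClient) {
	respMsg, err := st.Recv() // type-safe Recv replaces RecvMsg into a generic Message
	if err != nil {
		// transport-level failure: record it, fail all pending calls, drop the stream
		c.setLastErr(err)
		c.cancelPendingMsgs(err)
		c.clearStream()
		return
	}
	callErr := respMsg.ErrorStatus() // application-level error reported by the server
	var value msg
	if callErr == nil {
		value, callErr = unmarshalResponse(respMsg) // decode the payload only on success
	}
	c.routeResponse(respMsg.GetMessageSeqNo(),
		NodeResponse[msg]{NodeID: c.id, Value: value, Err: callErr})
}
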
24 changes: 18 additions & 6 deletions channel_test.go
@@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/relab/gorums/internal/stream"
"github.com/relab/gorums/internal/testutils/mock"
"google.golang.org/grpc"
pb "google.golang.org/protobuf/types/known/wrapperspb"
@@ -48,7 +49,10 @@ func delayServerFn(delay time.Duration) func(_ int) ServerIface {
time.Sleep(delay)
req := AsProto[*pb.StringValue](in)
resp, err := mockSrv.Test(ctx, req)
return NewResponseMessage(in.GetMetadata(), resp), err
if err != nil {
return nil, err
}
return NewResponseMessage(in, resp), nil
})
return srv
}
@@ -59,7 +63,11 @@ func sendRequest(t testing.TB, node *Node, req request, msgID uint64) NodeRespon
if req.ctx == nil {
req.ctx = t.Context()
}
req.msg = NewRequest(req.ctx, msgID, mock.TestMethod, nil)
reqMsg, err := stream.NewMessage(req.ctx, msgID, mock.TestMethod, nil)
if err != nil {
t.Fatalf("NewMessage failed: %v", err)
}
req.msg = reqMsg
replyChan := make(chan NodeResponse[msg], 1)
req.responseChan = replyChan
node.channel.enqueue(req)
@@ -593,7 +601,8 @@ func TestChannelDeadlock(t *testing.T) {
for id := range 10 {
go func() {
ctx := TestContext(t, 3*time.Second)
req := request{ctx: ctx, msg: NewRequest(ctx, uint64(100+id), mock.TestMethod, nil)}
reqMsg, _ := stream.NewMessage(ctx, uint64(100+id), mock.TestMethod, nil)
req := request{ctx: ctx, msg: reqMsg}

// try to enqueue
select {
@@ -798,7 +807,8 @@ func BenchmarkChannelStreamReadyFirstRequest(b *testing.B) {

// Use a fresh context for the benchmark request
ctx := TestContext(b, defaultTestTimeout)
req := request{ctx: ctx, msg: NewRequest(ctx, 1, mock.TestMethod, nil)}
reqMsg, _ := stream.NewMessage(ctx, 1, mock.TestMethod, nil)
req := request{ctx: ctx, msg: reqMsg}
replyChan := make(chan NodeResponse[msg], 1)
req.responseChan = replyChan
node.channel.enqueue(req)
@@ -847,7 +857,8 @@ func BenchmarkChannelStreamReadyReconnect(b *testing.B) {

// Establish initial stream with a fresh context
ctx := context.Background()
req := request{ctx: ctx, msg: NewRequest(ctx, 0, mock.TestMethod, nil)}
reqMsg, _ := stream.NewMessage(ctx, 0, mock.TestMethod, nil)
req := request{ctx: ctx, msg: reqMsg}
replyChan := make(chan NodeResponse[msg], 1)
req.responseChan = replyChan
node.channel.enqueue(req)
@@ -872,7 +883,8 @@

// Now send a request which will trigger ensureStream -> newNodeStream -> signal
ctx := context.Background()
req := request{ctx: ctx, msg: NewRequest(ctx, uint64(i+1), mock.TestMethod, nil)}
reqMsg, _ := stream.NewMessage(ctx, uint64(i+1), mock.TestMethod, nil)
req := request{ctx: ctx, msg: reqMsg}
replyChan := make(chan NodeResponse[msg], 1)
req.responseChan = replyChan
node.channel.enqueue(req)
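
The delayServerFn change above also shows the new response-construction convention in the test server: a handler returns nil together with the error instead of wrapping it, and on success NewResponseMessage(in, resp) builds the reply from the request's envelope. Below is a standalone sketch of that pattern; the handler signature and the doWork helper are assumptions, while AsProto and NewResponseMessage are the helpers used in the diff.

package gorums

import (
	"context"

	"github.com/relab/gorums/internal/stream"
	pb "google.golang.org/protobuf/types/known/wrapperspb"
)

// testHandler sketches a handler in the new style; it is not part of this PR.
func testHandler(ctx context.Context, in *stream.Message) (*stream.Message, error) {
	req := AsProto[*pb.StringValue](in) // unwrap the request payload from the envelope
	resp, err := doWork(ctx, req)       // doWork stands in for mockSrv.Test in the diff
	if err != nil {
		return nil, err // no response envelope is built on error
	}
	return NewResponseMessage(in, resp), nil // the reply reuses the request's metadata
}

// doWork is a placeholder for the mock service call.
func doWork(_ context.Context, req *pb.StringValue) (*pb.StringValue, error) {
	return pb.String("echo: " + req.GetValue()), nil
}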