Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions client_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ func (c *ClientCtx[Req, Resp]) send() {
if streamMsg == nil {
continue // Skip node: transformAndMarshal already sent ErrSkipNode
}
n.channel.enqueue(request{
ctx: c.Context,
msg: streamMsg,
streaming: c.streaming,
waitSendDone: c.waitSendDone,
responseChan: c.replyChan,
n.channel.Enqueue(stream.Request{
Ctx: c.Context,
Msg: streamMsg,
Streaming: c.streaming,
WaitSendDone: c.waitSendDone,
ResponseChan: c.replyChan,
})
}
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func (c *ClientCtx[Req, Resp]) defaultResponseSeq() ResponseSeq[Resp] {
for range c.Size() {
select {
case r := <-c.replyChan:
res := newNodeResponse[Resp](r)
res := mapToCallResponse[Resp](r)
if !yield(res) {
return // Consumer stopped iteration
}
Expand All @@ -261,7 +261,7 @@ func (c *ClientCtx[Req, Resp]) streamingResponseSeq() ResponseSeq[Resp] {
for {
select {
case r := <-c.replyChan:
res := newNodeResponse[Resp](r)
res := mapToCallResponse[Resp](r)
if !yield(res) {
return // Consumer stopped iteration
}
Expand Down
62 changes: 0 additions & 62 deletions encoding.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package gorums

import (
"fmt"

"github.com/relab/gorums/internal/stream"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
)

// Message encapsulates the stream.Message and the actual proto.Message.
Expand Down Expand Up @@ -82,61 +78,3 @@ func messageWithError(in, out *Message, err error) *Message {
}
return out
}

// unmarshalRequest unmarshals the request proto message from the message.
// It uses the method name in the message to look up the Input type from the proto registry.
//
// This function should only be used by internal channel operations.
func unmarshalRequest(in *stream.Message) (proto.Message, error) {
// get method descriptor from registry
desc, err := protoregistry.GlobalFiles.FindDescriptorByName(protoreflect.FullName(in.GetMethod()))
if err != nil {
return nil, fmt.Errorf("gorums: could not find method descriptor for %s", in.GetMethod())
}
methodDesc := desc.(protoreflect.MethodDescriptor)

// get the request message type (Input type)
msgType, err := protoregistry.GlobalTypes.FindMessageByName(methodDesc.Input().FullName())
if err != nil {
return nil, fmt.Errorf("gorums: could not find message type %s", methodDesc.Input().FullName())
}
req := msgType.New().Interface()

// unmarshal message from the Message.Payload field
payload := in.GetPayload()
if len(payload) > 0 {
if err := proto.Unmarshal(payload, req); err != nil {
return nil, fmt.Errorf("gorums: could not unmarshal request: %w", err)
}
}
return req, nil
}

// unmarshalResponse unmarshals the response proto message from the message.
// It uses the method name in the message to look up the Output type from the proto registry.
//
// This function should only be used by internal channel operations.
func unmarshalResponse(out *stream.Message) (proto.Message, error) {
// get method descriptor from registry
desc, err := protoregistry.GlobalFiles.FindDescriptorByName(protoreflect.FullName(out.GetMethod()))
if err != nil {
return nil, fmt.Errorf("gorums: could not find method descriptor for %s", out.GetMethod())
}
methodDesc := desc.(protoreflect.MethodDescriptor)

// get the response message type (Output type)
msgType, err := protoregistry.GlobalTypes.FindMessageByName(methodDesc.Output().FullName())
if err != nil {
return nil, fmt.Errorf("gorums: could not find message type %s", methodDesc.Output().FullName())
}
resp := msgType.New().Interface()

// unmarshal message from the Message.Payload field
payload := out.GetPayload()
if len(payload) > 0 {
if err := proto.Unmarshal(payload, resp); err != nil {
return nil, fmt.Errorf("gorums: could not unmarshal response: %w", err)
}
}
return resp, nil
}
4 changes: 3 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"fmt"
"strings"

"github.com/relab/gorums/internal/stream"
)

// ErrIncomplete is the error returned by a quorum call when the call cannot be completed
Expand All @@ -14,7 +16,7 @@ var ErrIncomplete = errors.New("incomplete call")
var ErrSendFailure = errors.New("send failure")

// ErrTypeMismatch is returned when a response cannot be cast to the expected type.
var ErrTypeMismatch = errors.New("response type mismatch")
var ErrTypeMismatch = stream.ErrTypeMismatch

// ErrSkipNode is returned when a node is skipped by request transformations.
// This allows the response iterator to account for all nodes without blocking.
Expand Down
Loading
Loading