Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions async.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,27 @@ func (f *Async[Resp]) Done() bool {
// AsyncMajority returns an Async future that resolves when a majority quorum is reached.
// Messages are sent immediately (synchronously) to preserve ordering when multiple
// async calls are created in sequence.
func (r *Responses[Resp]) AsyncMajority() *Async[Resp] {
func (r *Responses[T, Resp]) AsyncMajority() *Async[Resp] {
quorumSize := r.size/2 + 1
return r.AsyncThreshold(quorumSize)
}

// AsyncFirst returns an Async future that resolves when the first response is received.
// Messages are sent immediately (synchronously) to preserve ordering.
func (r *Responses[Resp]) AsyncFirst() *Async[Resp] {
func (r *Responses[T, Resp]) AsyncFirst() *Async[Resp] {
return r.AsyncThreshold(1)
}

// AsyncAll returns an Async future that resolves when all nodes have responded.
// Messages are sent immediately (synchronously) to preserve ordering.
func (r *Responses[Resp]) AsyncAll() *Async[Resp] {
func (r *Responses[T, Resp]) AsyncAll() *Async[Resp] {
return r.AsyncThreshold(r.size)
}

// AsyncThreshold returns an Async future that resolves when the threshold is reached.
// Messages are sent immediately (synchronously) to preserve ordering when multiple
// async calls are created in sequence.
func (r *Responses[Resp]) AsyncThreshold(threshold int) *Async[Resp] {
func (r *Responses[T, Resp]) AsyncThreshold(threshold int) *Async[Resp] {
// Send messages synchronously before spawning the goroutine to preserve ordering
r.sendNow()

Expand Down
10 changes: 5 additions & 5 deletions async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func TestAsync(t *testing.T) {
// a type alias short hand for the responses type
type respType = *gorums.Responses[*pb.StringValue]
type respType = *gorums.Responses[uint32, *pb.StringValue]
tests := []struct {
name string
call func(respType) *gorums.Async[*pb.StringValue]
Expand Down Expand Up @@ -45,7 +45,7 @@ func TestAsync(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
config := gorums.TestConfiguration(t, tt.numNodes, gorums.EchoServerFn)
ctx := gorums.TestContext(t, 2*time.Second)
responses := gorums.QuorumCall[*pb.StringValue, *pb.StringValue](
responses := gorums.QuorumCall[uint32, *pb.StringValue, *pb.StringValue](
config.Context(ctx),
pb.String("test"),
mock.TestMethod,
Expand All @@ -71,7 +71,7 @@ func TestAsync_Error(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately

responses := gorums.QuorumCall[*pb.StringValue, *pb.StringValue](
responses := gorums.QuorumCall[uint32, *pb.StringValue, *pb.StringValue](
config.Context(ctx),
pb.String("test"),
mock.TestMethod,
Expand All @@ -92,7 +92,7 @@ func BenchmarkAsyncQuorumCall(b *testing.B) {
b.Run(fmt.Sprintf("AsyncMajority/%d", numNodes), func(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
future := gorums.QuorumCall[*pb.StringValue, *pb.StringValue](
future := gorums.QuorumCall[uint32, *pb.StringValue, *pb.StringValue](
cfgCtx,
pb.String("test"),
mock.TestMethod,
Expand All @@ -108,7 +108,7 @@ func BenchmarkAsyncQuorumCall(b *testing.B) {
b.Run(fmt.Sprintf("BlockingMajority/%d", numNodes), func(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
_, err := gorums.QuorumCall[*pb.StringValue, *pb.StringValue](
_, err := gorums.QuorumCall[uint32, *pb.StringValue, *pb.StringValue](
cfgCtx,
pb.String("test"),
mock.TestMethod,
Expand Down
43 changes: 23 additions & 20 deletions benchmark/benchmark_gorums.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions callopts.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gorums

import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/runtime/protoimpl"
)

Expand Down Expand Up @@ -54,7 +53,7 @@ func IgnoreErrors() CallOption {
// resp, err := ReadQC(ctx, req,
// gorums.Interceptors(loggingInterceptor, filterInterceptor),
// ).Majority()
func Interceptors[Req, Resp proto.Message](interceptors ...QuorumInterceptor[Req, Resp]) CallOption {
func Interceptors[T NodeID, Req, Resp msg](interceptors ...QuorumInterceptor[T, Req, Resp]) CallOption {
return func(o *callOptions) {
for _, interceptor := range interceptors {
o.interceptors = append(o.interceptors, interceptor)
Expand Down
4 changes: 3 additions & 1 deletion callopts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ func TestCallOptionsMustWaitSendDone(t *testing.T) {
}

func BenchmarkGetCallOptions(b *testing.B) {
interceptor := func(_ *ClientCtx[msg, msg], next ResponseSeq[msg]) ResponseSeq[msg] { return next }
interceptor := func(_ *ClientCtx[uint32, msg, msg], next ResponseSeq[uint32, msg]) ResponseSeq[uint32, msg] {
return next
}
tests := []struct {
numOpts int
}{
Expand Down
Loading
Loading