From 128d3ff95fc101863d233f543e5dda41a1b7b53c Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Wed, 10 Dec 2025 11:36:18 -0500 Subject: [PATCH] fix race where multiple workers' callbacks overwrite shared c.recValue field --- internal/internal_flags.go | 2 +- internal/internal_workflow.go | 8 ++- internal/internal_workflow_testsuite_test.go | 62 ++++++++++++++++++++ 3 files changed, 69 insertions(+), 3 deletions(-) diff --git a/internal/internal_flags.go b/internal/internal_flags.go index 456a7fe81..756ab2e09 100644 --- a/internal/internal_flags.go +++ b/internal/internal_flags.go @@ -34,7 +34,7 @@ const ( // unblockSelectorSignal exists to allow us to configure the default behavior of // SDKFlagBlockedSelectorSignalReceive. This is primarily useful with tests. -var unblockSelectorSignal = os.Getenv("UNBLOCK_SIGNAL_SELECTOR") != "" +var unblockSelectorSignal = os.Getenv("UNBLOCK_SIGNAL_SELECTOR") != "false" func sdkFlagFromUint(value uint32) sdkFlag { switch value { diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index eca3cdda3..f71aa2fe4 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -1401,6 +1401,7 @@ func (s *selectorImpl) Select(ctx Context) { if pair.receiveFunc != nil { f := *pair.receiveFunc c := pair.channel + hasDefault := s.defaultFunc != nil callback := &receiveCallback{ fn: func(v interface{}, more bool) bool { if readyBranch != nil { @@ -1416,12 +1417,15 @@ func (s *selectorImpl) Select(ctx Context) { dropSignalFlag = env.GetFlag(SDKFlagBlockedSelectorSignalReceive) } - if dropSignalFlag { + // Only store value immediately when default branch exists, + // otherwise store in readyBranch to avoid race when multiple + // selectors blocked on same channel + if dropSignalFlag && hasDefault { c.recValue = &v } readyBranch = func() { - if !dropSignalFlag { + if !dropSignalFlag || !hasDefault { c.recValue = &v } f(c, more) diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 6bf136647..de13d4f8c 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -4250,3 +4250,65 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalNotLost() { err := env.GetWorkflowError() s.NoError(err) } + +func (s *WorkflowTestSuiteUnitTest) TestChannelWorkerPattern() { + previousFlag := unblockSelectorSignal + unblockSelectorSignal = true + defer func() { unblockSelectorSignal = previousFlag }() + + require := s.Require() + + // Two workers listening on the same channel with multiple items sent quickly. + // Without the fix, callbacks overwrite c.recValue causing items to be lost. + workflowFn := func(ctx Context) error { + ch := NewChannel(ctx) + var received []int + + Go(ctx, func(ctx Context) { + ch.Send(ctx, 1) + ch.Send(ctx, 2) + ch.Send(ctx, 3) + ch.Close() + }) + + // Two workers both selecting on the same channel + wg := NewWaitGroup(ctx) + wg.Add(2) + for i := 0; i < 2; i++ { + Go(ctx, func(ctx Context) { + defer wg.Done() + for { + selector := NewSelector(ctx) + done := false + selector.AddReceive(ch, func(c ReceiveChannel, more bool) { + if more { + var v int + c.Receive(ctx, &v) + received = append(received, v) + } else { + done = true + } + }) + selector.Select(ctx) + if done { + break + } + } + }) + } + + wg.Wait(ctx) + + // Without the fix: callbacks overwrite c.recValue, causing items to be lost + // With the fix: each callback's value is stored in its own readyBranch closure + require.Len(received, 3, "Expected 3 items to be received") + require.ElementsMatch([]int{1, 2, 3}, received, "Expected all items to be received exactly once") + return nil + } + + env := s.NewTestWorkflowEnvironment() + env.ExecuteWorkflow(workflowFn) + + require.True(env.IsWorkflowCompleted()) + require.NoError(env.GetWorkflowError()) +}