Progressive callees deadlock when handler returns error while caller IsInProgress #319

Open · Skinner927 opened this issue Jan 25, 2025 · 1 comment · May be fixed by #320

Describe the bug

If a progressive call InvocationHandler (callee) function returns an error before it receives the final progressive message from the caller:

  1. The handler function is incorrectly called again after returning an error
  2. The callee Client cannot be closed because of a blocking write to a full chan resChan <- handler(ctx, msg) (see the sketch below)
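
To make symptom 2 concrete, here is a minimal standalone sketch of the blocking pattern (my own illustration with made-up names, not the library's code): once the goroutine that reads results has returned, every later resChan <- handler(ctx, msg) style send blocks forever, and those stuck senders are what keep callee.Close() from returning.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Stands in for the result channel used while handling one invocation.
	resChan := make(chan string, 1)

	// The first result is consumed and answered (here: with an ERROR).
	// After this point nothing ever reads from resChan again.
	go func() { resChan <- "error.forced" }()
	fmt.Println("replied with:", <-resChan)

	// Stale progressive INVOCATIONs keep calling the handler and sending its
	// result: the first send fills the buffer, every later one blocks forever.
	for n := 3; n <= 5; n++ {
		go func(n int) {
			resChan <- fmt.Sprintf("result for arg %d", n)
			fmt.Printf("arg %d delivered\n", n) // never printed by a blocked sender
		}(n)
	}

	time.Sleep(200 * time.Millisecond)
	// goleak would report the blocked senders as goroutines stuck in
	// "chan send"; in the real client they also prevent Close() from returning.
	fmt.Println("exiting while sender goroutines are still blocked")
}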

To Reproduce

I wrote a test function that can be dropped into client_test.go. I'll include it at the end.

Expected behavior

According to the second example in Continuations on Completed Calls

Due to network delays, the Dealer may be unaware that the call is completed by the time it sends another progressive INVOCATION.

When a Callee receives an INVOCATION under the following conditions:

  • the Callee supports Progressive Call Invocations,
  • the INVOCATION request ID does not correspond to a new call request, and,
  • the INVOCATION request ID does not match any RPC invocation in progress,

then it MUST ignore and discard that INVOCATION message without any further correlated response to the Dealer.

Emphasis is my own. Because Nexus is handling the protocol, it should not call the handler function again after the handler returns an error.

More importantly, it should not deadlock when the handler returns an error before the caller is done sending progressive invocations. The diagram that follows the above quoted text clearly shows this is legal.
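
In code terms, the rule the spec is asking for boils down to a small check before doing anything with an INVOCATION. This is only a sketch with placeholder types (ID, handlerQueue), not a proposal for where the fix belongs in Nexus:

package main

import "fmt"

// ID stands in for wamp.ID; handlerQueue is the per-call queue mentioned in
// this issue. Both are placeholders here, not Nexus's actual types.
type ID uint64
type handlerQueue struct{}

// shouldDiscard applies the rule from "Continuations on Completed Calls": an
// INVOCATION whose request ID is neither a new call nor an RPC invocation
// still in progress MUST be dropped silently, without calling the handler or
// sending any ERROR back to the Dealer.
func shouldDiscard(reqID ID, isNewCall bool, inProgress map[ID]*handlerQueue) bool {
	if isNewCall {
		return false
	}
	_, active := inProgress[reqID]
	return !active
}

func main() {
	inProgress := map[ID]*handlerQueue{7: {}}
	fmt.Println(shouldDiscard(7, false, inProgress)) // false: continuation of a live call
	fmt.Println(shouldDiscard(3, false, inProgress)) // true: call already completed, discard
	fmt.Println(shouldDiscard(8, true, inProgress))  // false: genuinely new call
}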

Environment (please complete the following information):

  • OS: Linux, Ubuntu 22.04
  • Nexus Router version: branch v3 HEAD 5cfa511313807ad6802ff4b5a2b738d18507ef23
  • Name and version of WAMP Client library (if applicable): ^

Additional context

Test:

// Tests that a callee can return error while caller IsInProgress
func TestProgressiveCallInvocationCalleeError(t *testing.T) {
	// Connect two clients to the same server
	callee, caller, rooter := connectedTestClients(t)

	const forcedError = wamp.URI("error.forced")
	moreArgsSent := make(chan struct{}, 1)
	errorRaised := false

	invocationHandler := func(ctx context.Context, inv *wamp.Invocation) InvokeResult {
		switch inv.Arguments[0].(int) {
		case 1:
			// Eat the first arg
			t.Log("n=1 Returning OmitResult")
			return InvokeResult{Err: wamp.InternalProgressiveOmitResult}
		case 2:
			t.Log("n=2 Waiting for moreArgsSent")
			// Wait till the 4th arg is sent which means 3 should already
			// be waiting
			<-moreArgsSent
			time.Sleep(100 * time.Millisecond)
			errorRaised = true
			t.Log("n=2 Returning error (as expected)")
			// Error
			return InvokeResult{Err: forcedError}
		default:
			// BUG: The handler function should never be called again
			t.Error("Handler should not have been called after error returned")
			return InvokeResult{Err: forcedError}
		}
	}

	const procName = "nexus.test.progprocerr"

	// Register procedure
	err := callee.Register(procName, invocationHandler, nil)
	require.NoError(t, err)

	// Test calling the procedure.
	callArgs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	ctx := context.Background()

	callSends := 0
	sendProgDataCb := func(ctx context.Context) (options wamp.Dict, args wamp.List, kwargs wamp.Dict, err error) {
		options = wamp.Dict{}

		if callSends == (len(callArgs) - 1) {
			options[wamp.OptProgress] = false
		} else {
			options[wamp.OptProgress] = true
		}

		args = wamp.List{callArgs[callSends]}
		callSends++

		// Signal that the handler should return its error
		if callSends == 4 {
			moreArgsSent <- struct{}{}
		}
		t.Logf("Sending n=%v", callSends)

		return options, args, nil, nil
	}

	result, err := caller.CallProgressive(ctx, procName, sendProgDataCb, nil)
	require.Error(t, err, "Expected call to return an error")
	require.Nil(t, result, "Expected call to return no result")
	var rErr RPCError
	if errors.As(err, &rErr) {
		require.Equal(t, forcedError, rErr.Err.Error, "Unexpected error URI")
	} else {
		t.Error("Unexpected error type")
	}
	require.GreaterOrEqual(t, callSends, 4)
	require.True(t, errorRaised, "Error was never raised in handler")

	// Test unregister.
	err = callee.Unregister(procName)
	// require.NoError(t, err)

	// BUG: Show deadlock
	t.Log("Closing rooter")
	rooter.Close()
	t.Log("Closing caller")
	caller.Close()
	goleak.VerifyNone(t)
	t.Log("Closing callee")
	// BUG: We never get past here
	callee.Close()

	t.Log("All closed")
	goleak.VerifyNone(t)
	t.Log("Done")
}

Terminal output:

=== RUN   TestProgressiveCallInvocationCalleeError
2025/01/25 01:56:38 Starting router 
2025/01/25 01:56:38 Added realm: nexus.test
    client_test.go:766: Sending n=1
    client_test.go:766: Sending n=2
    client_test.go:766: Sending n=3
    client_test.go:766: Sending n=4
    client_test.go:766: Sending n=5
    client_test.go:766: Sending n=6
    client_test.go:766: Sending n=7
    client_test.go:766: Sending n=8
    client_test.go:766: Sending n=9
    client_test.go:720: n=1 Returning OmitResult
    client_test.go:723: n=2 Waiting for moreArgsSent
    client_test.go:766: Sending n=10
    client_test.go:729: n=2 Returning error (as expected)
    client_test.go:734: Handler should not have been called after error returned
    client_test.go:734: Handler should not have been called after error returned
    client_test.go:788: Closing rooter
2025/01/25 01:56:38 Realm nexus.test completed shutdown
2025/01/25 01:56:38 Router stopped
    client_test.go:790: Closing caller
    client_test.go:792: found unexpected goroutines:
        [Goroutine 28 in state chan send, with github.com/gammazero/nexus/v3/client.(*Client).runHandleInvocation on top of the stack:
        goroutine 28 [chan send]:
        github.com/gammazero/nexus/v3/client.(*Client).runHandleInvocation(0xc0001484e0, 0xc000166300)
        	/home/user/git/nexus/client/client.go:1644 +0xd6d
        github.com/gammazero/nexus/v3/client.(*Client).runReceiveFromRouter(0xc0001484e0, {0x8e11c0?, 0xc000166300?})
        	/home/user/git/nexus/client/client.go:1453 +0x13e
        github.com/gammazero/nexus/v3/client.(*Client).run(0xc0001484e0)
        	/home/user/git/nexus/client/client.go:1428 +0x1c5
        created by github.com/gammazero/nexus/v3/client.NewClient in goroutine 19
        	/home/user/git/nexus/client/client.go:262 +0x509
        
         Goroutine 35 in state chan send, with github.com/gammazero/nexus/v3/client.(*Client).runHandleInvocation.func1.1 on top of the stack:
        goroutine 35 [chan send]:
        github.com/gammazero/nexus/v3/client.(*Client).runHandleInvocation.func1.1()
        	/home/user/git/nexus/client/client.go:1669 +0xfd
        created by github.com/gammazero/nexus/v3/client.(*Client).runHandleInvocation.func1 in goroutine 34
        	/home/user/git/nexus/client/client.go:1655 +0x18a
        ]
    client_test.go:793: Closing callee

// process doesn't exit

I'll likely put up a PR in the coming days, but if a maintainer wants to tackle this, please let me know.

Skinner927 (Author) commented Feb 4, 2025

Curious if anyone could chime in on what the best or most correct way to fix this is.

The underlying issue is that handlerQueues are being created for invocations that have already been closed on our end.

My first idea was to store the newest (greatest) reqID (INVOCATION.Request) and prevent any older (less or equal) reqID from creating new handlerQueues. This works great until the reqID rolls over. Since the Dealer controls the reqID and the only requirements are that it increments and wraps back to 1, it's difficult to properly check for rollover in runHandleInvocation() alone. This seems like the most correct solution, but I have no idea how to properly handle the rollover.
Note: it's (appears to be?) completely legal for a progressive invocation with reqID == 1 to still be alive when reqID wraps back to 1 (which seems like its own bag of problems; how does that work?).
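
A rough sketch of this first idea, with placeholder names and deliberately showing where it falls apart:

package main

import "fmt"

// invocationGate remembers the largest INVOCATION.Request seen so far and
// refuses to create a new handler queue for anything at or below it.
// Hypothetical sketch only; this is not code from the client.
type invocationGate struct {
	newest uint64
}

func (g *invocationGate) accept(reqID uint64) bool {
	if reqID <= g.newest {
		// Treated as a stale or duplicate continuation: no new handler queue.
		// Known gap: once the Dealer wraps back to 1, legitimate new calls
		// land here too, which is the rollover problem described above.
		return false
	}
	g.newest = reqID
	return true
}

func main() {
	g := &invocationGate{}
	fmt.Println(g.accept(5)) // true: new call, create a handler queue
	fmt.Println(g.accept(5)) // false: continuation, no new queue is created
	fmt.Println(g.accept(1)) // false: but this could be a valid post-rollover call
}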

My second idea is to store reqIDs we've closed on our end for some amount of time (5 min?) or up to some quantity (50?). This seems less buggy, but it can technically suffer from the same wraparound problem: reqID == 1 could close, we could then get an onslaught of messages that are not invocations (which still advances the session reqID while runHandleInvocation() stays oblivious to it), and when the reqID wraps back around to 1 it would still be in our "recently closed" lookup, so we would reject a legitimate new call.
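
A sketch of this second idea as a bounded "recently closed" set (again with placeholder names; a real version inside the Client would also need a mutex, since invocation handling runs on separate goroutines):

package main

import "fmt"

// recentlyClosed keeps the last max request IDs we finished (e.g. by
// returning an error) so late progressive INVOCATIONs for them can be dropped.
type recentlyClosed struct {
	max   int
	order []uint64            // FIFO of closed request IDs
	seen  map[uint64]struct{} // set view of order for O(1) lookups
}

func newRecentlyClosed(max int) *recentlyClosed {
	return &recentlyClosed{max: max, seen: make(map[uint64]struct{})}
}

func (r *recentlyClosed) add(reqID uint64) {
	if _, ok := r.seen[reqID]; ok {
		return
	}
	if len(r.order) == r.max {
		oldest := r.order[0]
		r.order = r.order[1:]
		delete(r.seen, oldest)
	}
	r.order = append(r.order, reqID)
	r.seen[reqID] = struct{}{}
}

func (r *recentlyClosed) contains(reqID uint64) bool {
	_, ok := r.seen[reqID]
	return ok
}

func main() {
	rc := newRecentlyClosed(50)
	rc.add(42)                   // we returned an error for request 42
	fmt.Println(rc.contains(42)) // true: drop late INVOCATIONs for it
	fmt.Println(rc.contains(43)) // false: handle normally
}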

🦆 I think the only real solution is to have the Client keep track of incoming reqIDs from the Dealer and expose a method for checking if a reqID is "new" or "old". Trouble is, the first ID I receive in runReceiveFromRouter() is 23 (for REGISTER). I'll need to keep digging but I'm not sure why it's so high to begin with?
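
Roughly what I have in mind (placeholder type, not an API proposal): the Client records the highest request ID it has seen from the Dealer across all message types, and runHandleInvocation() only creates a handler queue for IDs above that high-water mark; an ID at or below it is honored only if it matches an invocation still in progress. The wraparound case is still unsolved here too.

package main

import (
	"fmt"
	"sync"
)

// requestIDTracker would be fed from runReceiveFromRouter() with every
// router-originated request ID, whatever value the Dealer starts at (23 in my
// run). Hypothetical sketch, not an existing Client API.
type requestIDTracker struct {
	mu   sync.Mutex
	high uint64 // highest request ID observed from the Dealer so far
}

// checkAndRecord reports whether reqID is new (above everything seen so far)
// and records it. An "old" ID should then only be honored if it matches an
// invocation still in progress; otherwise it is discarded per the spec.
func (t *requestIDTracker) checkAndRecord(reqID uint64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if reqID > t.high {
		t.high = reqID
		return true
	}
	return false
}

func main() {
	var tr requestIDTracker
	fmt.Println(tr.checkAndRecord(23)) // true: e.g. the REGISTER exchange
	fmt.Println(tr.checkAndRecord(24)) // true: first INVOCATION of the call
	fmt.Println(tr.checkAndRecord(24)) // false: later progressive INVOCATION (old)
	fmt.Println(tr.checkAndRecord(24)) // false: stale continuation after we errored
}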
