Skip to content

Commit 37c0402

Browse files
committed
RPC cancellation test cases
1 parent ea0a9a9 commit 37c0402

File tree

9 files changed

+167
-16
lines changed

9 files changed

+167
-16
lines changed

cppwamp/include/cppwamp/corosession.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ class CoroSession : public TBase
108108
/** Function type for handling remote procedure calls. */
109109
using CallSlot = std::function<Outcome (Invocation)>;
110110

111+
/** Function type for handling RPC interruptions. */
112+
using InterruptSlot = std::function<Outcome (Interruption)>;
113+
111114
/** Yield context type used by the boost::asio::spawn handler. */
112115
template <typename TSpawnHandler>
113116
using YieldContext = boost::asio::basic_yield_context<TSpawnHandler>;
@@ -173,6 +176,12 @@ class CoroSession : public TBase
173176
Registration enroll(Procedure procedure, CallSlot slot,
174177
YieldContext<H> yield, std::error_code* ec = nullptr);
175178

179+
/** Registers a WAMP remote procedure call with an interruption handler. */
180+
template <typename H>
181+
Registration enroll(Procedure procedure, CallSlot slot,
182+
InterruptSlot interruptSlot, YieldContext<H> yield,
183+
std::error_code* ec = nullptr);
184+
176185
/** Unregisters a remote procedure call. */
177186
template <typename H>
178187
bool unregister(const Registration& reg, YieldContext<H> yield,

cppwamp/include/cppwamp/internal/client.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,7 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
727727
if (found != pendingInvocations_.end())
728728
{
729729
auto registrationId = found->second;
730+
pendingInvocations_.erase(found);
730731
auto kv = registry_.find(registrationId);
731732
if ((kv != registry_.end()) &&
732733
(kv->second.interruptSlot != nullptr))

cppwamp/include/cppwamp/internal/corosession.ipp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,36 @@ Registration CoroSession<B>::enroll(
201201
});
202202
}
203203

204+
//------------------------------------------------------------------------------
205+
/** @copydetails Session::enroll
206+
@throws error::Failure with an error code if a runtime error occured and
207+
the `ec` parameter is null. */
208+
//------------------------------------------------------------------------------
209+
template <typename B>
210+
template <typename H>
211+
Registration CoroSession<B>::enroll(
212+
Procedure procedure, /**< The procedure URI to register. */
213+
CallSlot callSlot, /**< Callable target to invoke when a matching RPC
214+
invocation is received. */
215+
InterruptSlot interruptSlot, /** Handler to execute when RPC is interrupted. */
216+
YieldContext<H> yield, /**< Represents the current coroutine. */
217+
std::error_code* ec /**< Optional pointer to an error code to set,
218+
instead of throwing an exception upon
219+
failure. */
220+
)
221+
{
222+
CPPWAMP_LOGIC_CHECK(this->state() == State::established,
223+
"Session is not established");
224+
225+
return run<Registration>(yield, ec,
226+
[this, &procedure, &callSlot, &interruptSlot]
227+
(CoroHandler<H, Registration>& handler)
228+
{
229+
this->enroll(std::move(procedure), std::move(callSlot),
230+
std::move(interruptSlot), handler);
231+
});
232+
}
233+
204234
//------------------------------------------------------------------------------
205235
/** @copydetails Session::unregister(const Registration&, AsyncHandler<bool>)
206236
@throws error::Failure with an error code if a runtime error occured and

cppwamp/include/cppwamp/internal/session.ipp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ CPPWAMP_INLINE void Session::enroll(
431431
CPPWAMP_INLINE void Session::enroll(
432432
Procedure procedure, /**< The procedure to register. */
433433
CallSlot callSlot, /**< The handler to execute when the RPC is invoked. */
434-
InterruptSlot interruptSlot, /** Handler to execute when RPC in interrupted. */
434+
InterruptSlot interruptSlot, /** Handler to execute when RPC is interrupted. */
435435
AsyncHandler<Registration> handler /**< Handler to invoke when
436436
the enroll operation completes. */
437437
)

cppwamp/include/cppwamp/internal/sessiondata.ipp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -483,14 +483,6 @@ Cancellation::Cancellation(RequestId reqId, CancelMode cancelMode)
483483
String modeStr;
484484
switch (cancelMode)
485485
{
486-
case CancelMode::unspecified:
487-
// Do nothing
488-
break;
489-
490-
case CancelMode::skip:
491-
modeStr = "skip";
492-
break;
493-
494486
case CancelMode::kill:
495487
modeStr = "kill";
496488
break;
@@ -499,6 +491,10 @@ Cancellation::Cancellation(RequestId reqId, CancelMode cancelMode)
499491
modeStr = "killNoWait";
500492
break;
501493

494+
case CancelMode::skip:
495+
modeStr = "skip";
496+
break;
497+
502498
default:
503499
assert(false && "Unexpected CancelMode enumerator");
504500
break;

cppwamp/include/cppwamp/session.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,11 @@ class Session : public std::enable_shared_from_this<Session>
189189
/// @name Remote Procedures
190190
/// @{
191191
/** Registers a WAMP remote procedure call. */
192-
void enroll(Procedure procedure, CallSlot callSlot,
192+
void enroll(Procedure procedure, CallSlot slot,
193193
AsyncHandler<Registration> handler);
194194

195195
/** Registers a WAMP remote procedure call with an interruption handler. */
196-
void enroll(Procedure procedure, CallSlot slot, InterruptSlot interruptSlot,
196+
void enroll(Procedure procedure, CallSlot callSlot, InterruptSlot interruptSlot,
197197
AsyncHandler<Registration> handler);
198198

199199
/** Unregisters a remote procedure call. */

cppwamp/include/cppwamp/sessiondata.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ class Cancellation : public Options<Cancellation>
312312
{
313313
public:
314314
/** Converting constructor. */
315-
Cancellation(RequestId reqId, CancelMode cancelMode = CancelMode::skip);
315+
Cancellation(RequestId reqId, CancelMode cancelMode = CancelMode::kill);
316316

317317
RequestId requestId() const;
318318

cppwamp/include/cppwamp/wampdefs.hpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,9 @@ enum class SessionState
4444
//------------------------------------------------------------------------------
4545
enum class CancelMode
4646
{
47-
unspecified, ///< The cancel mode was not specified
48-
skip, ///< No INTERRUPT sent to callee; router immediately returns ERROR
49-
kill, ///< INTERRUPT sent to callee; RESULT or ERROR returned, depending on callee
50-
killNoWait ///< INTERRUPT sent to callee; router immediately returns ERROR
47+
kill, ///< INTERRUPT sent to callee; RESULT or ERROR returned, depending on callee
48+
killNoWait, ///< INTERRUPT sent to callee; router immediately returns ERROR
49+
skip ///< No INTERRUPT sent to callee; router immediately returns ERROR
5150
};
5251

5352
} // namespace wamp

test/wamptestadvanced.cpp

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,4 +405,120 @@ GIVEN( "a Session with a registered challenge handler" )
405405
}
406406
}}
407407

408+
//------------------------------------------------------------------------------
409+
SCENARIO( "RPC Cancellation", "[WAMP]" )
410+
{
411+
GIVEN( "a caller and a callee" )
412+
{
413+
AsioService iosvc;
414+
RpcFixture f(iosvc, tcp(iosvc));
415+
416+
WHEN( "cancelling an RPC in kill mode before it returns" )
417+
{
418+
boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield)
419+
{
420+
RequestId callRequestId = 0;
421+
RequestId invocationRequestId = 0;
422+
RequestId interruptionRequestId = 0;
423+
bool responseReceived = false;
424+
AsyncResult<Result> response;
425+
426+
f.join(yield);
427+
428+
f.callee->enroll(
429+
Procedure("rpc"),
430+
[&invocationRequestId](Invocation inv) -> Outcome
431+
{
432+
invocationRequestId = inv.requestId();
433+
return Outcome::deferred();
434+
},
435+
[&interruptionRequestId](Interruption intr) -> Outcome
436+
{
437+
interruptionRequestId = intr.requestId();
438+
return Error("wamp.error.canceled");
439+
},
440+
yield);
441+
442+
callRequestId = f.caller->call(Rpc("rpc"),
443+
[&response, &responseReceived](AsyncResult<Result> callResponse)
444+
{
445+
responseReceived = true;
446+
response = std::move(callResponse);
447+
});
448+
449+
REQUIRE( callRequestId != 0 );
450+
451+
while (invocationRequestId == 0)
452+
f.caller->suspend(yield);
453+
454+
REQUIRE( invocationRequestId != 0 );
455+
456+
f.caller->cancel(Cancellation(callRequestId, CancelMode::kill));
457+
458+
while (!responseReceived)
459+
f.caller->suspend(yield);
460+
461+
CHECK( interruptionRequestId == invocationRequestId );
462+
CHECK( response.errorCode() == SessionErrc::cancelled );
463+
464+
f.disconnect();
465+
});
466+
iosvc.run();
467+
}
468+
469+
WHEN( "cancelling an RPC after it returns" )
470+
{
471+
boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield)
472+
{
473+
RequestId callRequestId = 0;
474+
RequestId invocationRequestId = 0;
475+
RequestId interruptionRequestId = 0;
476+
bool responseReceived = false;
477+
AsyncResult<Result> response;
478+
479+
f.join(yield);
480+
481+
f.callee->enroll(
482+
Procedure("rpc"),
483+
[&invocationRequestId](Invocation inv) -> Outcome
484+
{
485+
invocationRequestId = inv.requestId();
486+
return Result{Variant{"completed"}};
487+
},
488+
[&interruptionRequestId](Interruption intr) -> Outcome
489+
{
490+
interruptionRequestId = intr.requestId();
491+
return Error("wamp.error.canceled");
492+
},
493+
yield);
494+
495+
callRequestId = f.caller->call(Rpc("rpc"),
496+
[&response, &responseReceived](AsyncResult<Result> callResponse)
497+
{
498+
responseReceived = true;
499+
response = std::move(callResponse);
500+
});
501+
502+
while (!responseReceived)
503+
f.caller->suspend(yield);
504+
505+
REQUIRE( response.get().args() == Array{Variant{"completed"}} );
506+
507+
f.caller->cancel(Cancellation(callRequestId, CancelMode::kill));
508+
509+
/* Router should not treat late CANCEL as a protocol error, and
510+
should allow clients to continue calling RPCs. */
511+
f.caller->call(Rpc("rpc"), yield);
512+
513+
/* Router should discard INTERRUPT messages for non-pending RPCs. */
514+
CHECK( interruptionRequestId == 0 );
515+
516+
f.disconnect();
517+
});
518+
iosvc.run();
519+
}
520+
521+
// TODO: Test other cancel modes once they're supported by Crossbar
522+
}}
523+
408524
#endif // #if CPPWAMP_TESTING_WAMP

0 commit comments

Comments
 (0)