Skip to content

Commit ea0a9a9

Browse files
committed
First cut of call cancellation implementation
1 parent 738f224 commit ea0a9a9

File tree

10 files changed

+329
-47
lines changed

10 files changed

+329
-47
lines changed

cppwamp/include/cppwamp/corosession.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class CoroSession : public TBase
128128
using Base::enroll;
129129
using Base::unregister;
130130
using Base::call;
131+
using Base::cancel;
131132

132133
/// @name Session Management
133134
/// @{

cppwamp/include/cppwamp/internal/client.hpp

Lines changed: 81 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,15 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
127127

128128
virtual void disconnect() override
129129
{
130+
pendingInvocations_.clear();
130131
this->close(false);
131132
}
132133

133134
virtual void terminate() override
134135
{
135136
using Handler = AsyncTask<std::string>;
136137
setLogHandlers(Handler(), Handler());
138+
pendingInvocations_.clear();
137139
this->close(true);
138140
}
139141

@@ -251,11 +253,13 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
251253
});
252254
}
253255

254-
virtual void enroll(Procedure&& procedure, CallSlot&& slot,
256+
virtual void enroll(Procedure&& procedure, CallSlot&& callSlot,
257+
InterruptSlot&& interruptSlot,
255258
AsyncTask<Registration>&& handler) override
256259
{
257260
using std::move;
258-
RegistrationRecord rec{move(procedure), move(slot), handler.iosvc()};
261+
RegistrationRecord rec{ move(procedure), move(callSlot),
262+
move(interruptSlot), handler.iosvc() };
259263
enrollMsg_.at(2) = rec.procedure.options();
260264
enrollMsg_.at(3) = rec.procedure.uri();
261265
auto self = this->shared_from_this();
@@ -318,36 +322,48 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
318322
std::move(handler)(false);
319323
}
320324

321-
virtual void call(Rpc&& rpc, AsyncTask<Result>&& handler) override
325+
virtual RequestId call(Rpc&& rpc, AsyncTask<Result>&& handler) override
322326
{
323327
using std::move;
324328
Error* errorPtr = rpc.error({});
329+
RequestId id = 0;
325330

326331
if (!rpc.kwargs().empty())
327332
{
328333
callWithKwargsMsg_.at(2) = move(rpc.options({}));
329334
callWithKwargsMsg_.at(3) = move(rpc.procedure({}));
330335
callWithKwargsMsg_.at(4) = move(rpc.args({}));
331336
callWithKwargsMsg_.at(5) = move(rpc.kwargs({}));
332-
callProcedure(callWithKwargsMsg_, errorPtr, move(handler));
337+
id = callProcedure(callWithKwargsMsg_, errorPtr, move(handler));
333338
}
334339
else if (!rpc.args().empty())
335340
{
336341
callWithArgsMsg_.at(2) = move(rpc.options({}));
337342
callWithArgsMsg_.at(3) = move(rpc.procedure({}));
338343
callWithArgsMsg_.at(4) = move(rpc.args({}));
339-
callProcedure(callWithArgsMsg_, errorPtr, move(handler));
344+
id = callProcedure(callWithArgsMsg_, errorPtr, move(handler));
340345
}
341346
else
342347
{
343348
callMsg_.at(2) = move(rpc.options({}));
344349
callMsg_.at(3) = move(rpc.procedure({}));
345-
callProcedure(callMsg_, errorPtr, move(handler));
350+
id = callProcedure(callMsg_, errorPtr, move(handler));
346351
}
352+
353+
return id;
354+
}
355+
356+
virtual void cancel(Cancellation&& cancellation) override
357+
{
358+
cancelMsg_.at(1) = cancellation.requestId();
359+
cancelMsg_.at(2) = move(cancellation.options({}));
360+
this->send(cancelMsg_);
347361
}
348362

349363
virtual void yield(RequestId reqId, Result&& result) override
350364
{
365+
pendingInvocations_.erase(reqId);
366+
351367
using std::move;
352368
if (!result.kwargs().empty())
353369
{
@@ -374,8 +390,8 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
374390

375391
virtual void yield(RequestId reqId, Error&& failure) override
376392
{
377-
using std::move;
378-
this->sendError(WampMsgType::invocation, reqId, move(failure));
393+
pendingInvocations_.erase(reqId);
394+
this->sendError(WampMsgType::invocation, reqId, std::move(failure));
379395
}
380396

381397
virtual void setLogHandlers(AsyncTask<std::string> warningHandler,
@@ -408,19 +424,22 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
408424

409425
struct RegistrationRecord
410426
{
411-
using Slot = std::function<Outcome (Invocation)>;
427+
using CallSlot = std::function<Outcome (Invocation)>;
428+
using InterruptSlot = std::function<Outcome (Interruption)>;
412429

413430
RegistrationRecord() : procedure(""), iosvc(nullptr) {}
414431

415-
RegistrationRecord(Procedure&& procedure, Slot&& slot,
416-
AsioService& iosvc)
432+
RegistrationRecord(Procedure&& procedure, CallSlot&& callSlot,
433+
InterruptSlot&& interruptSlot, AsioService& iosvc)
417434
: procedure(std::move(procedure)),
418-
slot(std::move(slot)),
435+
callSlot(std::move(callSlot)),
436+
interruptSlot(std::move(interruptSlot)),
419437
iosvc(&iosvc)
420438
{}
421439

422440
Procedure procedure;
423-
Slot slot;
441+
CallSlot callSlot;
442+
InterruptSlot interruptSlot;
424443
AsioService* iosvc = nullptr;
425444
};
426445

@@ -432,6 +451,7 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
432451
using Readership = std::map<SubscriptionId, LocalSubs>;
433452
using TopicMap = std::map<std::string, SubscriptionId>;
434453
using Registry = std::map<RegistrationId, RegistrationRecord>;
454+
using InvocationMap = std::map<RequestId, RegistrationId>;
435455

436456
Client(TransportPtr transport)
437457
: Base(std::move(transport))
@@ -502,11 +522,11 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
502522
}
503523
}
504524

505-
void callProcedure(Message& msg, Error* errorPtr,
506-
AsyncTask<Result>&& handler)
525+
RequestId callProcedure(Message& msg, Error* errorPtr,
526+
AsyncTask<Result>&& handler)
507527
{
508528
auto self = this->shared_from_this();
509-
this->request(msg,
529+
return this->request(msg,
510530
[this, self, errorPtr, handler](std::error_code ec, Message reply)
511531
{
512532
if ((reply.type == WampMsgType::error) && (errorPtr != nullptr))
@@ -555,6 +575,10 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
555575
onInvocation(std::move(msg));
556576
break;
557577

578+
case WampMsgType::interrupt:
579+
onInterrupt(std::move(msg));
580+
break;
581+
558582
default:
559583
assert(false);
560584
}
@@ -673,14 +697,16 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
673697
if (kv != registry_.end())
674698
{
675699
auto self = this->shared_from_this();
676-
Invocation inv({}, self, requestId, kv->second.iosvc,
700+
const RegistrationRecord& rec = kv->second;
701+
Invocation inv({}, self, requestId, rec.iosvc,
677702
move(msg.as<Object>(3)));
678703
if (msg.fields.size() >= 5)
679704
inv.args({}) = move(msg.as<Array>(4));
680705
if (msg.fields.size() >= 6)
681706
inv.kwargs({}) = move(msg.as<Object>(5));
682707

683-
dispatchInvocation(kv->second, inv);
708+
pendingInvocations_[requestId] = regId;
709+
dispatchRpcRequest(*rec.iosvc, rec.callSlot, inv);
684710
}
685711
else
686712
{
@@ -691,31 +717,54 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
691717
}
692718
}
693719

694-
void dispatchInvocation(const RegistrationRecord& reg,
695-
const Invocation& invocation)
720+
void onInterrupt(Message&& msg)
721+
{
722+
using std::move;
723+
724+
auto requestId = msg.to<RequestId>(1);
725+
726+
auto found = pendingInvocations_.find(requestId);
727+
if (found != pendingInvocations_.end())
728+
{
729+
auto registrationId = found->second;
730+
auto kv = registry_.find(registrationId);
731+
if ((kv != registry_.end()) &&
732+
(kv->second.interruptSlot != nullptr))
733+
{
734+
auto self = this->shared_from_this();
735+
const RegistrationRecord& rec = kv->second;
736+
Interruption intr({}, self, requestId, rec.iosvc,
737+
move(msg.as<Object>(2)));
738+
dispatchRpcRequest(*rec.iosvc, rec.interruptSlot, intr);
739+
}
740+
}
741+
}
742+
743+
template <typename TSlot, typename TRequest>
744+
void dispatchRpcRequest(AsioService& iosvc, const TSlot& slot,
745+
const TRequest& request)
696746
{
697747
auto self = this->shared_from_this();
698-
const auto& slot = reg.slot;
699-
reg.iosvc->post([this, self, slot, invocation]()
748+
iosvc.post([this, self, slot, request]()
700749
{
701-
// Copy the request ID before the Invocation object gets moved away.
702-
auto reqId = invocation.requestId();
750+
// Copy the request ID before the request object gets moved away.
751+
auto requestId = request.requestId();
703752

704753
try
705754
{
706-
Outcome outcome(slot(std::move(invocation)));
755+
Outcome outcome(slot(std::move(request)));
707756
switch (outcome.type())
708757
{
709758
case Outcome::Type::deferred:
710759
// Do nothing
711760
break;
712761

713762
case Outcome::Type::result:
714-
yield(reqId, std::move(outcome).asResult());
763+
yield(requestId, std::move(outcome).asResult());
715764
break;
716765

717766
case Outcome::Type::error:
718-
yield(reqId, std::move(outcome).asError());
767+
yield(requestId, std::move(outcome).asError());
719768
break;
720769

721770
default:
@@ -724,12 +773,12 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
724773
}
725774
catch (Error& error)
726775
{
727-
yield(reqId, std::move(error));
776+
yield(requestId, std::move(error));
728777
}
729778
catch (const error::BadType& e)
730779
{
731780
// Forward Variant conversion exceptions as ERROR messages.
732-
yield(reqId, Error(e));
781+
yield(requestId, Error(e));
733782
}
734783
});
735784
}
@@ -813,6 +862,7 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
813862
callMsg_ = M{ T::call, {n, n, o, s} };
814863
callWithArgsMsg_ = M{ T::call, {n, n, o, s, a} };
815864
callWithKwargsMsg_ = M{ T::call, {n, n, o, s, a, o} };
865+
cancelMsg_ = M{ T::cancel, {n, n, o} };
816866
yieldMsg_ = M{ T::yield, {n, n, o} };
817867
yieldWithArgsMsg_ = M{ T::yield, {n, n, o, a} };
818868
yieldWithKwargsMsg_ = M{ T::yield, {n, n, o, a, o} };
@@ -824,6 +874,7 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
824874
TopicMap topics_;
825875
Readership readership_;
826876
Registry registry_;
877+
InvocationMap pendingInvocations_;
827878
AsyncTask<std::string> warningHandler_;
828879
AsyncTask<Challenge> challengeHandler_;
829880

@@ -838,6 +889,7 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
838889
Message callMsg_;
839890
Message callWithArgsMsg_;
840891
Message callWithKwargsMsg_;
892+
Message cancelMsg_;
841893
Message yieldMsg_;
842894
Message yieldWithArgsMsg_;
843895
Message yieldWithKwargsMsg_;

cppwamp/include/cppwamp/internal/clientinterface.hpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,11 @@ class SubscriptionImpl;
3737
class ClientInterface : public Callee, public Subscriber
3838
{
3939
public:
40-
using Ptr = std::shared_ptr<ClientInterface>;
41-
using WeakPtr = std::weak_ptr<ClientInterface>;
42-
using EventSlot = std::function<void (Event)>;
43-
using CallSlot = std::function<Outcome (Invocation)>;
40+
using Ptr = std::shared_ptr<ClientInterface>;
41+
using WeakPtr = std::weak_ptr<ClientInterface>;
42+
using EventSlot = std::function<void (Event)>;
43+
using CallSlot = std::function<Outcome (Invocation)>;
44+
using InterruptSlot = std::function<Outcome (Interruption)>;
4445

4546
static const Object& roles();
4647

@@ -65,10 +66,13 @@ class ClientInterface : public Callee, public Subscriber
6566

6667
virtual void publish(Pub&& pub, AsyncTask<PublicationId>&& handler) = 0;
6768

68-
virtual void enroll(Procedure&& procedure, CallSlot&& slot,
69+
virtual void enroll(Procedure&& procedure, CallSlot&& callSlot,
70+
InterruptSlot&& interruptSlot,
6971
AsyncTask<Registration>&& handler) = 0;
7072

71-
virtual void call(Rpc&& rpc, AsyncTask<Result>&& handler) = 0;
73+
virtual RequestId call(Rpc&& rpc, AsyncTask<Result>&& handler) = 0;
74+
75+
virtual void cancel(Cancellation&& cancellation) = 0;
7276

7377
virtual void setLogHandlers(AsyncTask<std::string> warningHandler,
7478
AsyncTask<std::string> traceHandler) = 0;
@@ -81,14 +85,15 @@ inline const Object& ClientInterface::roles()
8185
static const Object roles =
8286
{
8387
{"callee", Object{{"features", Object{{
88+
{"call_canceling", true},
8489
{"call_trustlevels", true},
8590
{"caller_identification", true},
8691
{"pattern_based_registration", true},
8792
{"progressive_call_results", true}
8893
}}}}},
8994
{"caller", Object{{"features", Object{{
95+
{"call_canceling", true},
9096
{"call_timeout", true},
91-
{"callee_blackwhite_listing", true}, // Deprecated
9297
{"caller_exclusion", true},
9398
{"caller_identification", true}
9499
}}}}},

cppwamp/include/cppwamp/internal/messagetraits.ipp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ CPPWAMP_INLINE const MessageTraits& MessageTraits::lookup(WampMsgType type)
9696
/* 46, --- */ {W::none, 0, 0, 0, 0, 0, 0, 0, 0, {i,n,n,n,n,n,n}},
9797
/* 47, --- */ {W::none, 0, 0, 0, 0, 0, 0, 0, 0, {i,n,n,n,n,n,n}},
9898
/* 48, call */ {W::none, 1, 4, 6, 0, 1, 0, 0, 1, {i,i,o,s,a,o,n}},
99-
/* 49, cancel */ {W::none, 0, 3, 3, 0, 0, 0, 0, 1, {i,i,o,n,n,n,n}},
99+
/* 49, cancel */ {W::none, 0, 3, 3, 0, 1, 0, 0, 1, {i,i,o,n,n,n,n}},
100100
/* 50, result */ {W::call, 1, 3, 5, 1, 0, 0, 0, 1, {i,i,o,a,o,n,n}},
101101
/* 51, --- */ {W::none, 0, 0, 0, 0, 0, 0, 0, 0, {i,n,n,n,n,n,n}},
102102
/* 52, --- */ {W::none, 0, 0, 0, 0, 0, 0, 0, 0, {i,n,n,n,n,n,n}},
@@ -116,7 +116,7 @@ CPPWAMP_INLINE const MessageTraits& MessageTraits::lookup(WampMsgType type)
116116
/* 66, unregister */ {W::none, 1, 3, 3, 0, 1, 0, 0, 1, {i,i,i,n,n,n,n}},
117117
/* 67, unregistered */ {W::unregister, 1, 2, 2, 1, 0, 0, 0, 1, {i,i,n,n,n,n,n}},
118118
/* 68, invocation */ {W::none, 1, 4, 6, 1, 0, 0, 0, 1, {i,i,i,o,a,o,n}},
119-
/* 69, interrupt */ {W::none, 0, 3, 3, 0, 0, 0, 0, 1, {i,i,o,n,n,n,n}},
119+
/* 69, interrupt */ {W::none, 0, 3, 3, 1, 0, 0, 0, 1, {i,i,o,n,n,n,n}},
120120
/* 70, yield */ {W::invocation, 0, 3, 5, 0, 1, 0, 0, 1, {i,i,o,a,o,n,n}}
121121
};
122122

cppwamp/include/cppwamp/internal/peer.hpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,9 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
158158
}
159159
}
160160

161-
void request(Message& msg, Handler&& handler)
161+
RequestId request(Message& msg, Handler&& handler)
162162
{
163-
sendMessage(msg, std::move(handler));
163+
return sendMessage(msg, std::move(handler));
164164
}
165165

166166
template <typename TErrorValue>
@@ -187,15 +187,19 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
187187
using RequestKey = typename Message::RequestKey;
188188
using RequestMap = std::map<RequestKey, Handler>;
189189

190-
void sendMessage(Message& msg, Handler&& handler = nullptr)
190+
RequestId sendMessage(Message& msg, Handler&& handler = nullptr)
191191
{
192192
assert(msg.type != WampMsgType::none);
193193

194194
msg.fields.at(0) = static_cast<Int>(msg.type);
195195

196+
RequestId requestId = 0;
196197
auto idPos = msg.traits().idPosition;
197198
if (idPos > 0)
198-
msg.fields.at(idPos) = nextRequestId();
199+
{
200+
requestId = nextRequestId();
201+
msg.fields.at(idPos) = requestId;
202+
}
199203

200204
trace(msg, true);
201205

@@ -207,6 +211,8 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
207211
requestMap_[msg.requestKey()] = std::move(handler);
208212

209213
transport_->send(std::move(buf));
214+
215+
return requestId;
210216
}
211217

212218
RequestId nextRequestId()

0 commit comments

Comments
 (0)