Skip to content

Commit 637589d

Browse files
committed
Session state change callback
Closes #78
1 parent cdd2731 commit 637589d

File tree

8 files changed

+312
-75
lines changed

8 files changed

+312
-75
lines changed

cppwamp/include/cppwamp/internal/asynctask.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ class AsyncTask
4343
other.executor_ = nullptr;
4444
}
4545

46+
void reset()
47+
{
48+
executor_ = nullptr;
49+
handler_ = nullptr;
50+
}
51+
4652
AsyncTask& operator=(const AsyncTask& other) = default;
4753

4854
AsyncTask& operator=(AsyncTask&& other) noexcept

cppwamp/include/cppwamp/internal/client.hpp

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,7 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
122122

123123
virtual void terminate() override
124124
{
125-
using Handler = AsyncTask<std::string>;
126-
setLogHandlers(Handler(), Handler());
125+
setSessionHandlers({}, {}, {}, {});
127126
pendingInvocations_.clear();
128127
timeoutScheduler_->clear();
129128
this->close(true);
@@ -393,16 +392,16 @@ class Client : public ClientInterface, public Peer<TCodec, TTransport>
393392
this->sendError(WampMsgType::invocation, reqId, std::move(failure));
394393
}
395394

396-
virtual void setLogHandlers(AsyncTask<std::string> warningHandler,
397-
AsyncTask<std::string> traceHandler) override
395+
virtual void setSessionHandlers(
396+
AsyncTask<std::string> warningHandler,
397+
AsyncTask<std::string> traceHandler,
398+
AsyncTask<SessionState> stateChangeHandler,
399+
AsyncTask<Challenge> challengeHandler) override
398400
{
399401
warningHandler_ = std::move(warningHandler);
400402
this->setTraceHandler(std::move(traceHandler));
401-
}
402-
403-
virtual void setChallengeHandler(AsyncTask<Challenge> handler) override
404-
{
405-
challengeHandler_ = std::move(handler);
403+
this->setStateChangeHandler(std::move(stateChangeHandler));
404+
challengeHandler_ = std::move(challengeHandler);
406405
}
407406

408407
private:

cppwamp/include/cppwamp/internal/clientinterface.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,10 @@ class ClientInterface :
7474

7575
virtual void cancel(Cancellation&& cancellation) = 0;
7676

77-
virtual void setLogHandlers(AsyncTask<std::string> warningHandler,
78-
AsyncTask<std::string> traceHandler) = 0;
79-
80-
virtual void setChallengeHandler(AsyncTask<Challenge> handler) = 0;
77+
virtual void setSessionHandlers(AsyncTask<std::string> warningHandler,
78+
AsyncTask<std::string> traceHandler,
79+
AsyncTask<SessionState> stateChangeHandler,
80+
AsyncTask<Challenge> challengeHandler) = 0;
8181
};
8282

8383
inline const Object& ClientInterface::roles()

cppwamp/include/cppwamp/internal/peer.hpp

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,8 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
4545
State state() const {return state_;}
4646

4747
protected:
48-
using Message = WampMessage;
49-
using Handler = std::function<void (std::error_code, Message)>;
50-
using RxHandler = std::function<void (Message)>;
51-
using LogHandler = std::function<void (std::string)>;
48+
using Message = WampMessage;
49+
using Handler = std::function<void (std::error_code, Message)>;
5250

5351
struct RequestOptions
5452
{
@@ -79,7 +77,7 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
7977
void start()
8078
{
8179
assert(state_ == State::closed || state_ == State::disconnected);
82-
state_ = State::establishing;
80+
setState(State::establishing);
8381

8482
if (!transport_->isStarted())
8583
{
@@ -106,7 +104,7 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
106104
{
107105
assert(state_ == State::established);
108106

109-
state_ = State::shuttingDown;
107+
setState(State::shuttingDown);
110108
using std::move;
111109
Message msg = { WampMsgType::goodbye,
112110
{0u, move(reason.options({})), move(reason.uri({}))} };
@@ -116,7 +114,7 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
116114
{
117115
if (!ec)
118116
{
119-
state_ = State::closed;
117+
setState(State::closed);
120118
abortPending(make_error_code(SessionErrc::sessionEnded));
121119
post(handler, make_error_code(ProtocolErrc::success),
122120
move(reply));
@@ -128,7 +126,7 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
128126

129127
void close(bool terminating)
130128
{
131-
state_ = State::disconnected;
129+
setState(State::disconnected);
132130
abortPending(make_error_code(SessionErrc::sessionEnded), terminating);
133131
transport_->close();
134132
}
@@ -214,6 +212,9 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
214212
void setTraceHandler(AsyncTask<std::string> handler)
215213
{traceHandler_ = std::move(handler);}
216214

215+
void setStateChangeHandler(AsyncTask<State> handler)
216+
{stateChangeHandler_ = std::move(handler);}
217+
217218
template <typename TFunctor>
218219
void post(TFunctor&& fn)
219220
{
@@ -254,6 +255,16 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
254255
using RequestKey = typename Message::RequestKey;
255256
using RequestMap = std::map<RequestKey, RequestRecord>;
256257

258+
void setState(State state)
259+
{
260+
if (state != state_)
261+
{
262+
state_ = state;
263+
if (stateChangeHandler_)
264+
stateChangeHandler_(state);
265+
}
266+
}
267+
257268
RequestId sendMessage(Message& msg, Handler&& handler = nullptr,
258269
RequestOptions opts = {})
259270
{
@@ -412,15 +423,15 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
412423
void processHello(Message&& msg)
413424
{
414425
assert(state_ == State::establishing);
415-
state_ = State::established;
426+
setState(State::established);
416427
auto self = this->shared_from_this();
417428
post(std::bind(&Peer::onInbound, self, std::move(msg)));
418429
}
419430

420431
void processChallenge(Message&& msg)
421432
{
422433
assert(state_ == State::establishing);
423-
state_ = State::authenticating;
434+
setState(State::authenticating);
424435
auto self = this->shared_from_this();
425436
post(std::bind(&Peer::onInbound, self, std::move(msg)));
426437
}
@@ -429,7 +440,7 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
429440
{
430441
assert((state_ == State::establishing) ||
431442
(state_ == State::authenticating));
432-
state_ = State::established;
443+
setState(State::established);
433444
processWampReply( RequestKey(WampMsgType::hello, msg.requestId()),
434445
std::move(msg) );
435446
}
@@ -438,7 +449,7 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
438449
{
439450
assert((state_ == State::establishing) ||
440451
(state_ == State::authenticating));
441-
state_ = State::closed;
452+
setState(State::closed);
442453
processWampReply( RequestKey(WampMsgType::hello, msg.requestId()),
443454
std::move(msg) );
444455
}
@@ -447,7 +458,7 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
447458
{
448459
if (state_ == State::shuttingDown)
449460
{
450-
state_ = State::closed;
461+
setState(State::closed);
451462
processWampReply(msg.requestKey(), std::move(msg));
452463
}
453464
else
@@ -458,7 +469,7 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
458469
Message reply{ WampMsgType::goodbye,
459470
{0u, Object{}, "wamp.error.goodbye_and_out"} };
460471
send(reply);
461-
state_ = State::closed;
472+
setState(State::closed);
462473
}
463474
}
464475

@@ -515,7 +526,7 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
515526

516527
void fail(std::error_code ec)
517528
{
518-
state_ = State::failed;
529+
setState(State::failed);
519530
abortPending(ec);
520531
transport_->close();
521532
}
@@ -557,6 +568,7 @@ class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
557568
TransportPtr transport_;
558569
AnyExecutor executor_;
559570
AsyncTask<std::string> traceHandler_;
571+
AsyncTask<State> stateChangeHandler_;
560572
State state_ = State::closed;
561573
RequestMap requestMap_;
562574
RequestId nextRequestId_ = 0;

cppwamp/include/cppwamp/internal/session.ipp

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ CPPWAMP_INLINE const Object& Session::roles()
6363
//------------------------------------------------------------------------------
6464
CPPWAMP_INLINE Session::~Session()
6565
{
66+
stateChangeHandler_.reset();
6667
reset();
6768
}
6869

@@ -72,6 +73,8 @@ CPPWAMP_INLINE AnyExecutor Session::userExecutor() const
7273
return userExecutor_;
7374
}
7475

76+
//------------------------------------------------------------------------------
77+
/** @deprecated Use wamp::Session::userExecutor instead. */
7578
//------------------------------------------------------------------------------
7679
CPPWAMP_INLINE AnyExecutor Session::userIosvc() const
7780
{
@@ -125,6 +128,26 @@ CPPWAMP_INLINE void Session::setTraceHandler(
125128
};
126129
}
127130

131+
//------------------------------------------------------------------------------
132+
/** @note Changes to the state change handler start taking effect when the
133+
session is connecting.
134+
@note No state change events are fired when the session object is
135+
destructing. */
136+
//------------------------------------------------------------------------------
137+
CPPWAMP_INLINE void Session::setStateChangeHandler(
138+
StateChangeHandler handler /**< Function called when
139+
session state changes. */)
140+
{
141+
stateChangeHandler_ = AsyncTask<State>
142+
{
143+
userExecutor_,
144+
[handler](AsyncResult<State> state) {handler(state.get());}
145+
};
146+
}
147+
148+
//------------------------------------------------------------------------------
149+
/** @note Changes to the challenge handler start taking effect when the
150+
session is connecting. */
128151
//------------------------------------------------------------------------------
129152
CPPWAMP_INLINE void Session::setChallengeHandler(
130153
ChallengeHandler handler /**< Function called to handle
@@ -144,7 +167,7 @@ CPPWAMP_INLINE void Session::setChallengeHandler(
144167
//------------------------------------------------------------------------------
145168
/** @details
146169
The session will attempt to connect using the transports that were
147-
specified by the Connector objects passed during create().
170+
specified by the wamp::Connector objects passed during create().
148171
If more than one transport was specified, they will be traversed in the
149172
same order as they appeared in the @ref ConnectorList.
150173
@return The index of the Connector object used to establish the connetion.
@@ -166,7 +189,7 @@ CPPWAMP_INLINE void Session::connect(
166189
assert(!connectors_.empty());
167190
CPPWAMP_LOGIC_CHECK(state() == State::disconnected,
168191
"Session is not disconnected");
169-
state_ = State::connecting;
192+
setState(State::connecting);
170193
isTerminating_ = false;
171194
currentConnector_ = nullptr;
172195
doConnect(0, {userExecutor_, handler});
@@ -261,17 +284,22 @@ CPPWAMP_INLINE void Session::leave(
261284
CPPWAMP_INLINE void Session::disconnect()
262285
{
263286
bool isConnecting = state() == State::connecting;
264-
state_ = State::disconnected;
265287
if (isConnecting)
266288
{
289+
setState(State::disconnected);
267290
currentConnector_->cancel();
268291
}
269292
else if (impl_)
270293
{
294+
// Peer will fire the disconnected state change event.
271295
state_ = State::disconnected;
272296
impl_->disconnect();
273297
impl_.reset();
274298
}
299+
else
300+
{
301+
setState(State::disconnected);
302+
}
275303
}
276304

277305
//------------------------------------------------------------------------------
@@ -286,7 +314,7 @@ CPPWAMP_INLINE void Session::reset()
286314
{
287315
bool isConnecting = state() == State::connecting;
288316
isTerminating_ = true;
289-
state_ = State::disconnected;
317+
setState(State::disconnected);
290318
if (isConnecting)
291319
{
292320
currentConnector_->cancel();
@@ -586,15 +614,15 @@ CPPWAMP_INLINE void Session::doConnect(size_t index, AsyncTask<size_t> handler)
586614
if (ec)
587615
{
588616
if (ec == TransportErrc::aborted)
589-
handler(ec);
617+
std::move(handler)(ec);
590618
else
591619
{
592620
auto newIndex = index + 1;
593621
if (newIndex < connectors_.size())
594622
doConnect(newIndex, handler);
595623
else
596624
{
597-
state_ = State::failed;
625+
setState(State::failed);
598626
if (connectors_.size() > 1)
599627
{
600628
ec = make_error_code(
@@ -607,16 +635,36 @@ CPPWAMP_INLINE void Session::doConnect(size_t index, AsyncTask<size_t> handler)
607635
else
608636
{
609637
assert(impl);
610-
state_ = State::closed;
611-
impl_ = impl;
612-
impl_->setLogHandlers(warningHandler_, traceHandler_);
613-
impl_->setChallengeHandler(challengeHandler_);
614-
std::move(handler)(index);
638+
if (state_ == State::connecting)
639+
{
640+
impl_ = impl;
641+
impl_->setSessionHandlers(
642+
warningHandler_, traceHandler_, stateChangeHandler_,
643+
challengeHandler_);
644+
std::move(handler)(index);
645+
setState(State::closed);
646+
}
647+
else
648+
{
649+
auto ec = make_error_code(TransportErrc::aborted);
650+
std::move(handler)(ec);
651+
}
615652
}
616653
}
617654
});
618655
}
619656

657+
//------------------------------------------------------------------------------
658+
CPPWAMP_INLINE void Session::setState(SessionState state)
659+
{
660+
if (state != state_)
661+
{
662+
state_ = state;
663+
if (stateChangeHandler_)
664+
stateChangeHandler_(state);
665+
}
666+
}
667+
620668
//------------------------------------------------------------------------------
621669
CPPWAMP_INLINE std::shared_ptr<internal::ClientInterface> Session::impl()
622670
{return impl_;}

0 commit comments

Comments
 (0)