Skip to content

Commit 132b51a

Browse files
committed
Overlapping transport sends no longer corrupt RawSocket messages (fixes #117)
1 parent 37c0402 commit 132b51a

File tree

3 files changed

+106
-12
lines changed

3 files changed

+106
-12
lines changed

cppwamp/include/cppwamp/internal/asiotransport.hpp

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -176,35 +176,43 @@ class AsioTransport :
176176
"Outgoing message is longer than allowed by peer");
177177

178178
message->prepare(type);
179-
if (txQueue_.empty())
180-
transmit(std::move(message));
181-
else
182-
txQueue_.push(std::move(message));
179+
txQueue_.push(std::move(message));
180+
transmit();
183181
}
184182

185-
void transmit(Buffer message)
183+
private:
184+
void transmit()
186185
{
187-
if (socket_)
186+
if (isReadyToTransmit())
188187
{
188+
txBuffer_ = txQueue_.front();
189+
txQueue_.pop();
190+
189191
auto self = this->shared_from_this();
190-
boost::asio::async_write(*socket_, message->gatherBuffers(),
191-
[this, self, message](AsioErrorCode ec, size_t)
192+
boost::asio::async_write(*socket_, txBuffer_->gatherBuffers(),
193+
[this, self](AsioErrorCode ec, size_t size)
192194
{
195+
txBuffer_.reset();
193196
if (ec)
194197
{
195198
txQueue_ = TransmitQueue();
196199
socket_.reset();
197200
}
198-
else if (!txQueue_.empty())
201+
else
199202
{
200-
auto msg = txQueue_.front();
201-
txQueue_.pop();
202-
transmit(std::move(msg));
203+
transmit();
203204
}
204205
});
205206
}
206207
}
207208

209+
bool isReadyToTransmit() const
210+
{
211+
return socket_ && // Socket is still open
212+
!txBuffer_ && // No async_write is in progress
213+
!txQueue_.empty(); // One or more messages are enqueued
214+
}
215+
208216
void receive()
209217
{
210218
if (socket_)
@@ -336,6 +344,7 @@ class AsioTransport :
336344
Buffer rxBuffer_;
337345
Buffer pingBuffer_;
338346
TransmitQueue txQueue_;
347+
Buffer txBuffer_;
339348
TimePoint pingStart_;
340349
TimePoint pingStop_;
341350
};

test/.crossbar/config.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
"type": "tcp",
3939
"port": 12345
4040
},
41+
"max_message_size": 16777216,
4142
"serializers": [
4243
"json"
4344
],
@@ -49,6 +50,7 @@
4950
"type": "unix",
5051
"path": "./udstest"
5152
},
53+
"max_message_size": 16777216,
5254
"serializers": [
5355
"msgpack"
5456
],

test/wamptest.cpp

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2644,4 +2644,87 @@ GIVEN( "an IO service and a TCP connector" )
26442644
}
26452645
}}
26462646

2647+
2648+
//------------------------------------------------------------------------------
2649+
SCENARIO( "Outbound Messages are Properly Enqueued", "[WAMP]" )
2650+
{
2651+
GIVEN( "these test fixture objects" )
2652+
{
2653+
using Yield = boost::asio::yield_context;
2654+
2655+
AsioService iosvc;
2656+
auto cnct = tcp(iosvc);
2657+
auto session1 = CoroSession<>::create(iosvc, cnct);
2658+
auto session2 = CoroSession<>::create(iosvc, cnct);
2659+
2660+
WHEN( "an RPC is invoked while a large event payload is being transmitted" )
2661+
{
2662+
auto callee = session1;
2663+
auto subscriber = session2;
2664+
std::string eventString;
2665+
2666+
// Fill large string with repeating character sequence
2667+
std::string largeString(1024*1024, ' ');
2668+
for (size_t i=0; i<largeString.length(); ++i)
2669+
largeString[i] = '0' + (i % 64);
2670+
2671+
auto onEvent = [&iosvc, &eventString](Event, std::string str)
2672+
{
2673+
eventString = std::move(str);
2674+
};
2675+
2676+
// Simple RPC that returns the string argument back to the caller.
2677+
auto echo =
2678+
[callee](Invocation, std::string str) -> Outcome
2679+
{
2680+
return Result({str});
2681+
};
2682+
2683+
// RPC that triggers the publishing of a large event payload
2684+
auto trigger =
2685+
[callee, &largeString] (Invocation) -> Outcome
2686+
{
2687+
callee->publish(Pub("grapevine").withArgs(largeString));
2688+
return Result();
2689+
};
2690+
2691+
boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield)
2692+
{
2693+
callee->connect(yield);
2694+
callee->join(Realm(testRealm), yield);
2695+
callee->enroll(Procedure("echo"), unpackedRpc<std::string>(echo),
2696+
yield);
2697+
callee->enroll(Procedure("trigger"), trigger, yield);
2698+
2699+
subscriber->connect(yield);
2700+
subscriber->join(Realm(testRealm), yield);
2701+
subscriber->subscribe(Topic("grapevine"),
2702+
unpackedEvent<std::string>(onEvent),
2703+
yield);
2704+
2705+
for (int i=0; i<10; ++i)
2706+
{
2707+
// Use async call so that it doesn't block until completion.
2708+
subscriber->call(Rpc("trigger").withArgs("hello"),
2709+
[](AsyncResult<Result>) {});
2710+
2711+
/* Try to get callee to send an RPC response while it's still
2712+
transmitting the large event payload. AsioTransport should
2713+
properly enqueue the RPC response while the large event
2714+
payload is being transmitted. */
2715+
while (eventString.empty())
2716+
subscriber->call(Rpc("echo").withArgs("hello"), yield);
2717+
2718+
CHECK_THAT( eventString, Equals(largeString) );
2719+
eventString.clear();
2720+
}
2721+
callee->disconnect();
2722+
subscriber->disconnect();
2723+
});
2724+
2725+
iosvc.run();
2726+
}
2727+
}
2728+
}
2729+
26472730
#endif // #if CPPWAMP_TESTING_WAMP

0 commit comments

Comments
 (0)