diff --git a/.gitignore b/.gitignore index 7bb75ec..e990abf 100644 --- a/.gitignore +++ b/.gitignore @@ -277,3 +277,4 @@ FakesAssemblies/ **/*.Server/GeneratedArtifacts **/*.Server/ModelManifest.xml _Pvt_Extensions +/nbproject/ \ No newline at end of file diff --git a/README.md b/README.md index 17b9d4b..8ec9c2e 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,34 @@ Currently, the only dependency is rapidjson (https://github.com/miloyip/rapidjso Another advantage of removing the dependencies is that now it is easy to compile and use on most platforms that support c++11, without much work. +## Supporting Asynchronous Calls + +Exposing methods which do not block the server is possible on the basis of [C++ Futures](http://www.modernescpp.com/index.php/component/content/article/44-blog/multithreading/multithreading-c-17-and-c-20/279-std-future-extensions?Itemid=239) Extension Concurrency TS which are already available via the [Boost Futures v.4implementation](https://www.boost.org/doc/libs/1_67_0/doc/html/thread/synchronization.html#thread.synchronization.futures). The idea is that methods which take longer to complete return a `future` of the result which is later collected and replied with in a continuation method `.then()`. This way the JSON-RPC server does not block it's thread while computing the response to a call. + + +The future should carry a type convertible to `jsonrpc::Value`. The non-blocking operation of the server can be reached by replacing `Server::HandleRequest` with `Server::asyncHandleRequest` and processing the response' `FormattedData` in the future callback: + +```C++ +std::string incomingRequest; +ioStream >> incomingRequest; // read from a data source +server.asyncHandleRequest(incomingRequest) +.then([=](auto futureDataPtr){ + iostream << futureDataPtr.get()->GetData(); // write to the data sink +}); +``` + +This can be done in general for all incoming requests, because the implementation of `Server::asyncHandleRequest()` can deal with synchronous plain-value-returning methods too. Except for [Lambda methods](#asynchronous-lambdas) no other changes are required by the implementation. + +### Asynchronous Lambdas + +For now rvalue references of asynchronous lambda are not supported and need to be wrapped with `std::function`. Also these `std::functions`s are not properly disassembled by the template machinery in `Dispatcher::AddMethod` and thus need to register via a custom method `Dispatcher::AddAsyncLambda`. + +Asynchronous free static functions and member methods should register to the dispatcher in the same manner as the synchronous versions by `Dispatcher::AddMethod`. + +### Additional Dependencies + +Until the [TS for Extensions for Concurrency](http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n4107.html) are implemented by C++2a, asynchronous call handling depends on it's implementation in [Boost Thread](https://www.boost.org/doc/libs/1_67_0/doc/html/thread.html) v. 4. Thus building and linking to `boost_thread` (with `BOOST_THREAD_VERSION=4`) and `boost_system` (a dependency) is required. Linking to POSIX threads or the Windows counterpart is also needed for multi-threaded programs. + ## Examples A simple server that process JSON-RPC requests: @@ -26,6 +54,12 @@ public: return a + b; } + boost::future AsyncAddInt(int a, int b) const { + return boost::async([](auto a, auto b){ + return a + b; + }, a, b); + } + int64_t AddArray(const jsonrpc::Value::Array& a) { return std::accumulate(a.begin(), a.end(), int64_t(0), [](const int64_t& a, const jsonrpc::Value& b) { return a + b.AsInteger32(); }); @@ -49,55 +83,86 @@ void PrintNotification(const std::string& a) { } int main() { - Math math; - jsonrpc::Server server; - - jsonrpc::JsonFormatHandler jsonFormatHandler; - server.RegisterFormatHandler(jsonFormatHandler); + Math math; + jsonrpc::Server server; + + jsonrpc::JsonFormatHandler jsonFormatHandler; + server.RegisterFormatHandler(jsonFormatHandler); + + auto& dispatcher = server.GetDispatcher(); + // if it is a member method, you must use this 3 parameter version, passing an instance of an object that implements it + dispatcher.AddMethod("add", &Math::Add, math); + dispatcher.AddMethod("async_add_int", &Math::AsyncAddInt, math) + dispatcher.AddMethod("add_array", &Math::AddArray, math); + + // if it is just a regular function (non-member or static), you can you the 2 parameter AddMethod + dispatcher.AddMethod("concat", &Concat); + dispatcher.AddMethod("to_struct", &ToStruct); + dispatcher.AddMethod("print_notification", &PrintNotification); + + std::function(std::string)> sReverse = [](std::string in) -> boost::future { + std::string res; + return boost::make_ready_future(res.assign(in.rbegin(), in.rend())); + }; + dispatcher.AddAsyncLambda("async_reverse", sReverse); + + // on a real world, these requests come from your own transport implementation (sockets, http, ipc, named-pipes, etc) + const char addRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"add\",\"id\":0,\"params\":[3,2]}"; + const char addIntAsyncRequest[] = R"({"jsonrpc":"2.0","method":"async_add_int","id":11,"params":[300,200]})"; + const char concatRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"concat\",\"id\":1,\"params\":[\"Hello, \",\"World!\"]}"; + const char addArrayRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"add_array\",\"id\":2,\"params\":[[1000,2147483647]]}"; + const char toStructRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"to_struct\",\"id\":5,\"params\":[[12,\"foobar\",[12,\"foobar\"]]]}"; + const char printNotificationRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"print_notification\",\"params\":[\"This is just a notification, no response expected!\"]}"; + const char asyncReverseRequest[] = R"({"jsonrpc":"2.0","method":"async_reverse","id":13,"params":["xyz"]})"; + + std::shared_ptr outputFormattedData; + std::cout << "request: " << addRequest << std::endl; + outputFormattedData = server.HandleRequest(addRequest); + std::cout << "response: " << outputFormattedData->GetData() << std::endl; + + outputFormattedData.reset(); + std::cout << "request: " << concatRequest << std::endl; + outputFormattedData = server.HandleRequest(concatRequest); + std::cout << "response: " << outputFormattedData->GetData() << std::endl; + + outputFormattedData.reset(); + std::cout << "request: " << addArrayRequest << std::endl; + outputFormattedData = server.HandleRequest(addArrayRequest); + std::cout << "response: " << outputFormattedData->GetData() << std::endl; + + outputFormattedData.reset(); + std::cout << "request: " << toStructRequest << std::endl; + outputFormattedData = server.HandleRequest(toStructRequest); + std::cout << "response: " << outputFormattedData->GetData() << std::endl; + + outputFormatedData.reset(); + std::cout << "request: " << printNotificationRequest << std::endl; + outputFormatedData = server.HandleRequest(printNotificationRequest); + std::cout << "response size: " << outputFormatedData->GetSize() << std::endl; + + std::cout << "test async wrapper around sync\nrequest: " << addRequest << std::endl; + server.asyncHandleRequest(addRequest) + .then([](boost::shared_future> futureDataPtr) + { + std::cout << "response: " << futureDataPtr.get()->GetData() << std::endl; // {"jsonrpc":"2.0","id":0,"result":5} + }); - auto& dispatcher = server.GetDispatcher(); - // if it is a member method, you must use this 3 parameter version, passing an instance of an object that implements it - dispatcher.AddMethod("add", &Math::Add, math); - dispatcher.AddMethod("add_array", &Math::AddArray, math); - - // if it is just a regular function (non-member or static), you can you the 2 parameter AddMethod - dispatcher.AddMethod("concat", &Concat); - dispatcher.AddMethod("to_struct", &ToStruct); - dispatcher.AddMethod("print_notification", &PrintNotification); - - // on a real world, these requests come from your own transport implementation (sockets, http, ipc, named-pipes, etc) - const char addRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"add\",\"id\":0,\"params\":[3,2]}"; - const char concatRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"concat\",\"id\":1,\"params\":[\"Hello, \",\"World!\"]}"; - const char addArrayRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"add_array\",\"id\":2,\"params\":[[1000,2147483647]]}"; - const char toStructRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"to_struct\",\"id\":5,\"params\":[[12,\"foobar\",[12,\"foobar\"]]]}"; - const char printNotificationRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"print_notification\",\"params\":[\"This is just a notification, no response expected!\"]}"; - - std::shared_ptr outputFormattedData; - std::cout << "request: " << addRequest << std::endl; - outputFormattedData = server.HandleRequest(addRequest); - std::cout << "response: " << outputFormattedData->GetData() << std::endl; - - outputFormattedData.reset(); - std::cout << "request: " << concatRequest << std::endl; - outputFormattedData = server.HandleRequest(concatRequest); - std::cout << "response: " << outputFormattedData->GetData() << std::endl; - - outputFormattedData.reset(); - std::cout << "request: " << addArrayRequest << std::endl; - outputFormattedData = server.HandleRequest(addArrayRequest); - std::cout << "response: " << outputFormattedData->GetData() << std::endl; - - outputFormattedData.reset(); - std::cout << "request: " << toStructRequest << std::endl; - outputFormattedData = server.HandleRequest(toStructRequest); - std::cout << "response: " << outputFormattedData->GetData() << std::endl; - - outputFormatedData.reset(); - std::cout << "request: " << printNotificationRequest << std::endl; - outputFormatedData = server.HandleRequest(printNotificationRequest); - std::cout << "response size: " << outputFormatedData->GetSize() << std::endl; + std::cout << "request: " << addIntAsyncRequest << std::endl; + server.asyncHandleRequest(addIntAsyncRequest) + .then([](boost::shared_future> futureDataPtr){ + std::cout << "response: " << futureDataPtr.get()->GetData() << std::endl; // {"jsonrpc":"2.0","id":11,"result":500} + }); + + std::cout << "request: " << asyncReverseRequest << std::endl; + server.asyncHandleRequest(asyncReverseRequest) + .then([](auto futureDataPtr){ // can use auto parameter type in C++14 + std::cout << "response: " << futureDataPtr.get()->GetData() << std::endl; // {"jsonrpc":"2.0","id":13,"result":"zyx"} + }); + + // Sleep in the main thread to allow the threaded requests to be processed. + boost::this_thread::sleep_for(boost::chrono::seconds(2)); - return 0; + return 0; } ``` diff --git a/examples/Makefile b/examples/Makefile new file mode 100644 index 0000000..c8ded04 --- /dev/null +++ b/examples/Makefile @@ -0,0 +1,119 @@ +## -*- Makefile -*- +## +## User: semmel +## Time: Jul 20, 2018 5:00:41 PM +## Makefile created by Oracle Developer Studio. +## +## This file is generated automatically. +## + + +#### Compiler and tool definitions shared by all build targets ##### +CCC = g++ +CXX = g++ +BASICOPTS = -g -std=gnu++14 +CCFLAGS = $(BASICOPTS) "-Wno-parentheses" +CXXFLAGS = $(BASICOPTS) "-Wno-parentheses" +CCADMIN = + + +# Define the target directories. +TARGETDIR_testserver=GNU-amd64-Linux +TARGETDIR_testserver=GNU-amd64-Linux + + +all: $(TARGETDIR_testserver)/testserver $(TARGETDIR_testserver)/testclient + +## Target: testserver +CPPFLAGS_testserver = \ + -I../include \ + -I../../../../boost_1_67_0 \ + -I../../../../rapidjson/include \ + -DBOOST_CHRONO_HEADER_ONLY +OBJS_testserver = \ + $(TARGETDIR_testserver)/testserver.o +USERLIBS_testserver = +USERLIBS_testserver = $(SYSLIBS_testserver) +DEPLIBS_testserver = +LDLIBS_testserver = $(USERLIBS_testserver) + +LDLIBS := boost_system +LDLIBS += boost_filesystem + +BOOST_SHARED_LIBRARY_INSTALL_PATH = /home/semmel/NetBeansProjects/lib-sources/boost_1_67_0/stage/lib + +# Link or archive +$(TARGETDIR_testserver)/testserver: $(TARGETDIR_testserver) $(OBJS_testserver) $(DEPLIBS_testserver) + $(LINK.cc) $(CCFLAGS_testserver) $(CPPFLAGS_testserver) -o $@ $(OBJS_testserver) -L../../../../boost_1_67_0/stage/lib -lboost_system -lboost_filesystem -lpthread -lboost_thread -Wl,-rpath,$(BOOST_SHARED_LIBRARY_INSTALL_PATH) + + +# Compile source files into .o files +$(TARGETDIR_testserver)/testserver.o: $(TARGETDIR_testserver) testserver.cpp + $(COMPILE.cc) $(CCFLAGS_testserver) $(CPPFLAGS_testserver) -o $@ testserver.cpp + +$(TARGETDIR_testserver)/base64.o: $(TARGETDIR_testserver) base64.cpp + $(COMPILE.cc) $(CCFLAGS_testserver) $(CPPFLAGS_testserver) -o $@ base64.cpp + + + +## Target: testclient +CPPFLAGS_testclient = \ + -I../include \ + -I../../../../boost_1_67_0 \ + -I../../../../rapidjson/include \ + -DBOOST_CHRONO_HEADER_ONLY +OBJS_testclient = \ + $(TARGETDIR_testserver)/testclient.o +USERLIBS_testclient = $(SYSLIBS_testclient) +DEPLIBS_testclient = +LDLIBS_testclient = $(USERLIBS_testclient) + + +# Link or archive +$(TARGETDIR_testserver)/testclient: $(TARGETDIR_testserver) $(OBJS_testclient) $(DEPLIBS_testclient) + $(LINK.cc) $(CCFLAGS_testclient) $(CPPFLAGS_testclient) -o $@ $(OBJS_testclient) $(LDLIBS_testclient) -L../../../../boost_1_67_0/stage/lib -lboost_system -lboost_filesystem -lpthread -lboost_thread -Wl,-rpath,$(BOOST_SHARED_LIBRARY_INSTALL_PATH) + + +# Compile source files into .o files +$(TARGETDIR_testserver)/base64.o: $(TARGETDIR_testserver) base64.cpp + $(COMPILE.cc) $(CCFLAGS_testclient) $(CPPFLAGS_testclient) -o $@ base64.cpp + +$(TARGETDIR_testserver)/testclient.o: $(TARGETDIR_testserver) testclient.cpp + $(COMPILE.cc) $(CCFLAGS_testclient) $(CPPFLAGS_testclient) -o $@ testclient.cpp + + + +#### Clean target deletes all generated files #### +clean: + rm -f \ + $(TARGETDIR_testserver)/testserver \ + $(TARGETDIR_testserver)/testserver.o \ + $(TARGETDIR_testserver)/base64.o \ + $(TARGETDIR_testserver)/testclient \ + $(TARGETDIR_testserver)/base64.o \ + $(TARGETDIR_testserver)/testclient.o \ + $(TARGETDIR_testserver)/libboost_system.so.1.67.0 \ + $(TARGETDIR_testserver)/libboost_filesystem.so.1.67.0 \ + $(TARGETDIR_testserver)/libboost_thread.so.1.67.0 + $(CCADMIN) + rm -f -r $(TARGETDIR_testserver) + + +# Create the target directory (if needed) +$(TARGETDIR_testserver): + mkdir -p $(TARGETDIR_testserver); \ + cd $(TARGETDIR_testserver); \ + ln -s ../../../../../boost_1_67_0/stage/lib/libboost_system.so.1.67.0 libboost_system.so.1.67.0; \ + ln -s ../../../../../boost_1_67_0/stage/lib/libboost_filesystem.so.1.67.0 libboost_filesystem.so.1.67.0; \ + ln -s ../../../../../boost_1_67_0/stage/lib/libboost_thread.so.1.67.0 libboost_thread.so.1.67.0 + +## Target: test_server +runserver: + cd $(TARGETDIR_testserver); \ + LD_LIBRARY_PATH=. ; \ + ./testserver + +# Enable dependency checking +.KEEP_STATE: +.KEEP_STATE_FILE:.make.state.GNU-amd64-Linux + diff --git a/examples/testserver.cpp b/examples/testserver.cpp index 95770ee..bf2e742 100644 --- a/examples/testserver.cpp +++ b/examples/testserver.cpp @@ -17,7 +17,9 @@ // along with this library; if not, write to the Free Software Foundation, // Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +#ifdef _WIN32 #include "stdafx.h" +#endif #include "../include/jsonrpc-lean/jsonformathandler.h" #include "../include/jsonrpc-lean/formathandler.h" @@ -28,12 +30,30 @@ #include #include #include +#include + +#define BOOST_THREAD_VERSION 4 +#include class Math { public: - int Add(int a, int b) { + int Add(int a, int b) const { return a + b; } + + boost::future AsyncAddInt(int a, int b) const { + return boost::async([](auto a, auto b){ + return a + b; + }, a, b); + } + + boost::future AsyncAdd(int a, int b) const { + return boost::async(&Math::Add, this, a, b); + } + + boost::future AsyncAddSizeT(int a, int b) const { + return boost::make_ready_future((int64_t)(size_t)10); + } int64_t AddArray(const jsonrpc::Value::Array& a) { return std::accumulate(a.begin(), a.end(), int64_t(0), @@ -45,6 +65,10 @@ std::string Concat(const std::string& a, const std::string& b) { return a + b; } +boost::future AsyncConcat(const std::string& a, const std::string& b) { + boost::async([](auto a, auto b){ return a + b; }, a, b); +} + jsonrpc::Value ToBinary(const std::string& s) { return jsonrpc::Value(s, true); } @@ -83,7 +107,37 @@ void RunServer() { dispatcher.AddMethod("from_binary", &FromBinary); dispatcher.AddMethod("to_struct", &ToStruct); dispatcher.AddMethod("print_notification", &PrintNotification); + dispatcher.AddMethod("async_add", &Math::AsyncAdd, math); + dispatcher.AddMethod("async_add_size_t", &Math::AsyncAddSizeT, math); + dispatcher.AddMethod("async_add_int", &Math::AsyncAddInt, math); + dispatcher.AddMethod("async_concat", AsyncConcat); + + std::function(std::string)> sReverse = [](std::string in) -> boost::future { + std::string res; + return boost::make_ready_future(res.assign(in.rbegin(), in.rend())); + }; + + dispatcher.AddAsyncLambda("async_reverse", sReverse); + + // EACCESS is 13 (see http://www-numi.fnal.gov/offline_software/srt_public_context/WebDocs/Errors/unix_system_errors.html) + dispatcher.AddMethod("fail", [](){ + throw std::system_error(std::make_error_code(std::errc::permission_denied), "specific error message"); + }); + + std::function()> asyncFail = []() -> boost::future { + return boost::make_exceptional_future(std::system_error( + std::make_error_code(std::errc::permission_denied), + "specific error message") + ); + }; + dispatcher.AddAsyncLambda("asyncFail", asyncFail); + std::function(int)> asyncThrow = [](int x) -> boost::future { + throw std::out_of_range("Exception description from inside an async lambda method."); + return boost::make_ready_future( x * x ); + }; + dispatcher.AddAsyncLambda("asyncThrow", asyncThrow); + dispatcher.GetMethod("add") .SetHelpText("Add two integers") .AddSignature(jsonrpc::Value::Type::INTEGER_32, jsonrpc::Value::Type::INTEGER_32, jsonrpc::Value::Type::INTEGER_32); @@ -91,17 +145,34 @@ void RunServer() { //bool run = true; //dispatcher.AddMethod("exit", [&]() { run = false; }).SetHidden(); - const char addRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"add\",\"id\":0,\"params\":[3,2]}"; - const char concatRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"concat\",\"id\":1,\"params\":[\"Hello, \",\"World!\"]}"; + const char addRequest[] = R"({"jsonrpc":"2.0","method":"add","id":0,"params":[3,2]})"; + const char concatRequest[] = R"({"jsonrpc":"2.0","method":"concat","id":1,"params":["Hello, ","33"]})"; const char addArrayRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"add_array\",\"id\":2,\"params\":[[1000,2147483647]]}"; const char toBinaryRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"to_binary\",\"id\":3,\"params\":[\"Hello World!\"]}"; const char toStructRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"to_struct\",\"id\":4,\"params\":[[12,\"foobar\",[12,\"foobar\"]]]}"; const char printNotificationRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"print_notification\",\"params\":[\"This is just a notification, no response expected!\"]}"; + const char addAsyncRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"async_add\",\"id\":10,\"params\":[30,20]}"; + const char addIntAsyncRequest[] = R"({"jsonrpc":"2.0","method":"async_add_int","id":11,"params":[300,200]})"; + const char addIntAsyncToSizeTRequest[] = R"({"jsonrpc":"2.0","method":"async_add_size_t","id":14,"params":[300,200]})"; + const char asyncConcatRequest[] = "{\"jsonrpc\":\"2.0\",\"method\":\"concat\",\"id\":12,\"params\":[\"Hello, \",\"World!\"]}"; + const char asyncReverseRequest[] = R"({"jsonrpc":"2.0","method":"async_reverse","id":13,"params":["xyz"]})"; + const char failRequest[] = R"({"jsonrpc":"2.0","method":"fail","id":20})"; + const char asyncFailRequest[] = R"({"jsonrpc":"2.0","method":"asyncFail","id":21})"; + const char asyncThrowRequest[] = R"({"jsonrpc":"2.0","method":"asyncThrow","id":22, "params":[5]})"; std::shared_ptr outputFormatedData; + std::cout << "request: " << addRequest << std::endl; outputFormatedData = server.HandleRequest(addRequest); std::cout << "response: " << outputFormatedData->GetData() << std::endl; + + std::cout << "test async wrapper around sync request: " << addRequest << std::endl; + server.asyncHandleRequest(addRequest) + .then([](boost::shared_future> futureDataPtr) + { + std::cout << "response: " << futureDataPtr.get()->GetData() << std::endl; + }); + outputFormatedData.reset(); std::cout << "request: " << concatRequest << std::endl; @@ -127,6 +198,55 @@ void RunServer() { std::cout << "request: " << printNotificationRequest << std::endl; outputFormatedData = server.HandleRequest(printNotificationRequest); std::cout << "response size: " << outputFormatedData->GetSize() << std::endl; + + outputFormatedData.reset(); + std::cout << "request: " << failRequest << std::endl; + outputFormatedData = server.HandleRequest(failRequest); + std::cout << "response: " << outputFormatedData->GetData() << std::endl; + + std::cout << "request: " << addAsyncRequest << std::endl; + server.asyncHandleRequest(addAsyncRequest) + .then([](boost::shared_future> futureDataPtr){ + std::cout << "response: " << futureDataPtr.get()->GetData() << std::endl; + }); + + std::cout << "request: " << addIntAsyncRequest << std::endl; + server.asyncHandleRequest(addIntAsyncRequest) + .then([](boost::shared_future> futureDataPtr){ + std::cout << "response: " << futureDataPtr.get()->GetData() << std::endl; + }); + + std::cout << "request: " << asyncConcatRequest << std::endl; + server.asyncHandleRequest(asyncConcatRequest) + .then([](boost::shared_future> futureDataPtr){ + std::cout << "response: " << futureDataPtr.get()->GetData() << std::endl; + }); + + std::cout << "request: " << asyncReverseRequest << std::endl; + server.asyncHandleRequest(asyncReverseRequest) + .then([](auto futureDataPtr){ + std::cout << "response: " << futureDataPtr.get()->GetData() << std::endl; + }); + + std::cout << "request: " << addIntAsyncToSizeTRequest << std::endl; + server.asyncHandleRequest(addIntAsyncToSizeTRequest) + .then([](boost::shared_future> futureDataPtr){ + std::cout << "response: " << futureDataPtr.get()->GetData() << std::endl; + }); + + std::cout << "request: " << asyncFailRequest << std::endl; + server.asyncHandleRequest(asyncFailRequest) + .then([](auto futureDataPtr){ + std::cout << "response: " << futureDataPtr.get()->GetData() << std::endl; + }); + + std::cout << "request: " << asyncThrowRequest << std::endl; + server.asyncHandleRequest(asyncThrowRequest) + .then([](auto futureDataPtr){ + std::cout << "response: " << futureDataPtr.get()->GetData() << std::endl; + }); + + boost::this_thread::sleep_for(boost::chrono::seconds(2)); } int main() { diff --git a/include/jsonrpc-lean/dispatcher.h b/include/jsonrpc-lean/dispatcher.h index dc0069b..cb2a37a 100644 --- a/include/jsonrpc-lean/dispatcher.h +++ b/include/jsonrpc-lean/dispatcher.h @@ -24,6 +24,9 @@ #include #include +#define BOOST_THREAD_VERSION 4 +#include + namespace jsonrpc { class MethodWrapper { @@ -125,16 +128,55 @@ namespace jsonrpc { } return result.first->second; } + + template MethodWrapper& - //typename std::enable_if>::value && !std::is_member_pointer::value, MethodWrapper>::type& +// typename std::enable_if>::value && +// !std::is_member_pointer::value, MethodWrapper>::type& AddMethod(std::string name, MethodType method) { //static_assert(!std::is_bind_expression::value, // "Use AddMethod with 3 arguments to add member method"); typename StdFunction::value>::Type function(std::move(method)); return AddMethodInternal(std::move(name), std::move(function)); } + + // for static functions returning futures + template + MethodWrapper& AddMethod(std::string name, boost::future(*fun)(ParameterTypes...)) + { + std::function(ParameterTypes...)> function = + [fun](ParameterTypes&&... params) -> boost::shared_future + { + return fun(std::forward(params)...) + .then([](boost::future f) + { + return jsonrpc::Value(f.get()); + }) + .share(); + }; + + return AddMethodInternal(std::move(name), std::move(function)); + } + + // just copyable std::functions + template + MethodWrapper& AddAsyncLambda(std::string name, std::function(ParameterTypes...)> fun) + { + std::function(ParameterTypes...)> function = + [fun](ParameterTypes&&... params) -> boost::shared_future + { + return fun(std::forward(params)...) + .then([](boost::future f) + { + return jsonrpc::Value(f.get()); + }) + .share(); + }; + + return AddMethodInternal(std::move(name), std::move(function)); + } template MethodWrapper& AddMethod(std::string name, Value(T::*method)(const Request::Parameters&), T& instance) { @@ -161,6 +203,63 @@ namespace jsonrpc { }; return AddMethodInternal(std::move(name), std::move(function)); } + + // member methods returning futures + + template + MethodWrapper& AddMethod(std::string name, boost::future(T::*method)(ParameterTypes...), + T& instance) + { + std::function(ParameterTypes...)> function = + [&instance, method](ParameterTypes&&... params) -> boost::shared_future + { + return (instance.*method)(std::forward(params)...) + .then([](boost::future f) + { + return jsonrpc::Value(f.get()); // the outcome must be jsonrpc::Value-compatible + }) + .share(); // convert to SharedFuture which is jsonrpc::Value-compatible + }; + + return AddMethodInternal(std::move(name), std::move(function)); + } + + template + MethodWrapper& AddMethod(std::string name, boost::future(T::*method)(ParameterTypes...), + T& instance) + { + std::function(ParameterTypes...)> function = + [&instance, method](ParameterTypes&&... params) -> boost::shared_future + { + return (instance.*method)(std::forward(params)...) + .then([](boost::future f) + { + f.get(); + return jsonrpc::Value(0); // the outcome must be jsonrpc::Value-compatible + }) + .share(); // convert to SharedFuture which is jsonrpc::Value-compatible + }; + + return AddMethodInternal(std::move(name), std::move(function)); + } + + template + MethodWrapper& AddMethod(std::string name, boost::future(T::*method)(ParameterTypes...) const, + T& instance) + { + std::function(ParameterTypes...)> function = + [&instance, method](ParameterTypes&&... params) -> boost::shared_future + { + return (instance.*method)(std::forward(params)...) + .then([](boost::future f) + { + return jsonrpc::Value(f.get()); + }) + .share(); + }; + + return AddMethodInternal(std::move(name), std::move(function)); + } void RemoveMethod(const std::string& name) { myMethods.erase(name); @@ -177,10 +276,9 @@ namespace jsonrpc { catch (const Fault& fault) { return Response(fault.GetCode(), fault.GetString(), Value(id)); } - catch (const std::out_of_range&) { - InvalidParametersFault fault; - return Response(fault.GetCode(), fault.GetString(), Value(id)); - } + catch (const std::system_error& ex) { + return Response(ex.code().value(), ex.what(), Value(id), ex.code().category().name()); + } catch (const std::exception& ex) { return Response(0, ex.what(), Value(id)); } @@ -209,7 +307,7 @@ namespace jsonrpc { MethodWrapper::Method realMethod = [method](const Request::Parameters& params) -> Value { if (params.size() != sizeof...(ParameterTypes)) { throw InvalidParametersFault(); - } + } return method(params[index].AsType::type>()...); }; return AddMethod(std::move(name), std::move(realMethod)); diff --git a/include/jsonrpc-lean/json.h b/include/jsonrpc-lean/json.h index 3c8af09..c8d4350 100644 --- a/include/jsonrpc-lean/json.h +++ b/include/jsonrpc-lean/json.h @@ -22,6 +22,7 @@ namespace jsonrpc { const char ERROR_NAME[] = "error"; const char ERROR_CODE_NAME[] = "code"; const char ERROR_MESSAGE_NAME[] = "message"; + const char ERROR_DATA_NAME[] = "data"; } // namespace json } // namespace jsonrpc diff --git a/include/jsonrpc-lean/jsonreader.h b/include/jsonrpc-lean/jsonreader.h index 26d9b79..92f0d3f 100644 --- a/include/jsonrpc-lean/jsonreader.h +++ b/include/jsonrpc-lean/jsonreader.h @@ -150,10 +150,12 @@ namespace jsonrpc { return Value(std::move(array)); } case rapidjson::kStringType: { - tm dt; + // dropping support for date string detection because of + // https://github.com/uskr/jsonrpc-lean/issues/8 + /*tm dt; if (util::ParseIso8601DateTime(value.GetString(), dt)) { return Value(dt); - } + }*/ std::string str(value.GetString(), value.GetStringLength()); const bool binary = str.find('\0') != std::string::npos; diff --git a/include/jsonrpc-lean/jsonwriter.h b/include/jsonrpc-lean/jsonwriter.h index 00ae978..979099b 100644 --- a/include/jsonrpc-lean/jsonwriter.h +++ b/include/jsonrpc-lean/jsonwriter.h @@ -95,7 +95,7 @@ namespace jsonrpc { myRequestData->Writer.EndObject(); } - void WriteFault(int32_t code, const std::string& string) override { + void WriteFault(int32_t code, const std::string& string, const std::string& dataString = "") override { myRequestData->Writer.Key(json::ERROR_NAME, sizeof(json::ERROR_NAME) - 1); myRequestData->Writer.StartObject(); @@ -104,6 +104,11 @@ namespace jsonrpc { myRequestData->Writer.Key(json::ERROR_MESSAGE_NAME, sizeof(json::ERROR_MESSAGE_NAME) - 1); myRequestData->Writer.String(string.data(), string.size(), true); + + if (!dataString.empty()) { + myRequestData->Writer.Key(json::ERROR_DATA_NAME, sizeof(json::ERROR_DATA_NAME) - 1); + myRequestData->Writer.String(dataString.data(), dataString.size(), true); + } myRequestData->Writer.EndObject(); } diff --git a/include/jsonrpc-lean/response.h b/include/jsonrpc-lean/response.h index 574a3fb..17f36ab 100644 --- a/include/jsonrpc-lean/response.h +++ b/include/jsonrpc-lean/response.h @@ -9,21 +9,29 @@ #include "value.h" +#define BOOST_THREAD_VERSION 4 +#include + namespace jsonrpc { class Writer; + + typedef std::unique_ptr WriterPtr; + typedef std::shared_ptr FormattedDataPtr; class Response { public: Response(Value value, Value id) : myResult(std::move(value)), myIsFault(false), myFaultCode(0), + myFaultDataString(""), myId(std::move(id)) { } - Response(int32_t faultCode, std::string faultString, Value id) : myIsFault(true), + Response(int32_t faultCode, std::string faultString, Value id, const std::string faultDataString = "") : myIsFault(true), myFaultCode(faultCode), myFaultString(std::move(faultString)), + myFaultDataString(faultDataString), myId(std::move(id)) { } @@ -31,7 +39,7 @@ namespace jsonrpc { writer.StartDocument(); if (myIsFault) { writer.StartFaultResponse(myId); - writer.WriteFault(myFaultCode, myFaultString); + writer.WriteFault(myFaultCode, myFaultString, myFaultDataString); writer.EndFaultResponse(); } else { writer.StartResponse(myId); @@ -40,6 +48,64 @@ namespace jsonrpc { } writer.EndDocument(); } + + boost::future asyncWrite(WriterPtr writer) const + { + writer->StartDocument(); + + if (myIsFault) + { + writer->StartFaultResponse(myId); + writer->WriteFault(myFaultCode, myFaultString); + writer->EndFaultResponse(); + writer->EndDocument(); + return boost::make_ready_future(writer->GetData()); + } + + if (myResult.IsFuture()) + { + Value requestId(myId); + + return myResult.AsFuture() + .then([writer = std::move(writer), myId = std::move(requestId)](boost::shared_future futureResult) + { + try + { + const Value& result = futureResult.get(); + + writer->StartResponse(myId); + result.Write(*writer); + writer->EndResponse(); + } + catch (const std::system_error& ex) { + writer->StartFaultResponse(myId); + writer->WriteFault(ex.code().value(), ex.what(), ex.code().category().name()); + writer->EndFaultResponse(); + } + catch (const std::exception& ex) + { + writer->StartFaultResponse(myId); + writer->WriteFault(0, ex.what()); + writer->EndFaultResponse(); + } + catch (...) { + writer->StartFaultResponse(myId); + writer->WriteFault(0, "unknown error"); + writer->EndFaultResponse(); + } + + writer->EndDocument(); + + return writer->GetData(); + }); + } + + writer->StartResponse(myId); + myResult.Write(*writer); + writer->EndResponse(); + writer->EndDocument(); + return boost::make_ready_future(writer->GetData()); + } Value& GetResult() { return myResult; } bool IsFault() const { return myIsFault; } @@ -85,7 +151,7 @@ namespace jsonrpc { Value myResult; bool myIsFault; int32_t myFaultCode; - std::string myFaultString; + std::string myFaultString, myFaultDataString; Value myId; }; diff --git a/include/jsonrpc-lean/server.h b/include/jsonrpc-lean/server.h index 6c06773..d88ceb1 100644 --- a/include/jsonrpc-lean/server.h +++ b/include/jsonrpc-lean/server.h @@ -22,8 +22,11 @@ #include -namespace jsonrpc { +#define BOOST_THREAD_VERSION 4 +#include +namespace jsonrpc { + class Server { public: Server() {} @@ -76,6 +79,47 @@ namespace jsonrpc { return writer->GetData(); } + + boost::future asyncHandleRequest(const std::string& aRequestData, const std::string& aContentType = "application/json") + { + // first find the correct handler + FormatHandler *fmtHandler = nullptr; + for (auto handler : myFormatHandlers) { + if (handler->CanHandleRequest(aContentType)) { + fmtHandler = handler; + } + } + + if (fmtHandler == nullptr) { + // no FormatHandler able to handle this request type was found + return boost::make_ready_future(std::shared_ptr(new jsonrpc::JsonFormattedData())); + } + + std::unique_ptr writer = fmtHandler->CreateWriter(); + + try { + auto reader = fmtHandler->CreateReader(aRequestData); + Request request = reader->GetRequest(); + reader.reset(); + + // the response type may be a future + auto response = myDispatcher.Invoke(request.GetMethodName(), request.GetParameters(), request.GetId()); + + // if Id is false, this is a notification and we don't have to write a response + if (!response.GetId().IsBoolean() || response.GetId().AsBoolean() != false) + { + // here we return the future of the response outcome processed by the writer + return response.asyncWrite(std::move(writer)); + } + + return boost::make_ready_future(writer->GetData()); + } + catch (const Fault& ex) + { + Response(ex.GetCode(), ex.GetString(), Value()).Write(*writer); + return boost::make_ready_future(writer->GetData()); + } + } private: Dispatcher myDispatcher; std::vector myFormatHandlers; diff --git a/include/jsonrpc-lean/value.h b/include/jsonrpc-lean/value.h index db3fbee..de632c2 100644 --- a/include/jsonrpc-lean/value.h +++ b/include/jsonrpc-lean/value.h @@ -15,6 +15,9 @@ #include #include +#define BOOST_THREAD_VERSION 4 +#include + #include "util.h" #include "fault.h" #include "writer.h" @@ -29,6 +32,7 @@ namespace jsonrpc { typedef tm DateTime; typedef std::string String; typedef std::map Struct; + typedef boost::shared_future SharedFuture; enum class Type { ARRAY, @@ -40,7 +44,8 @@ namespace jsonrpc { INTEGER_64, NIL, STRING, - STRUCT + STRUCT, + FUTURE }; Value() : myType(Type::NIL) {} @@ -79,6 +84,10 @@ namespace jsonrpc { Value(Struct value) : myType(Type::STRUCT) { as.myStruct = new Struct(std::move(value)); } + + Value(SharedFuture value) : myType(Type::FUTURE) { + as.myFuture = new SharedFuture(value); + } ~Value() { Reset(); @@ -127,6 +136,9 @@ namespace jsonrpc { break; case Type::STRUCT: as.myStruct = new Struct(other.AsStruct()); + break; + case Type::FUTURE: + as.myFuture = new SharedFuture(other.AsFuture()); break; } } @@ -159,6 +171,7 @@ namespace jsonrpc { bool IsNil() const { return myType == Type::NIL; } bool IsString() const { return myType == Type::STRING; } bool IsStruct() const { return myType == Type::STRUCT; } + bool IsFuture() const { return myType == Type::FUTURE; } const Array& AsArray() const { if (IsArray()) { @@ -220,9 +233,19 @@ namespace jsonrpc { } throw InvalidParametersFault(); } + + SharedFuture& AsFuture() const { + if (IsFuture()) { + return *as.myFuture; + } + throw InvalidParametersFault(); + } template inline const T& AsType() const; + + template + inline T& AsType(); Type GetType() const { return myType; } @@ -268,6 +291,8 @@ namespace jsonrpc { } writer.EndStruct(); break; + case Type::FUTURE: + throw std::runtime_error("Future of a result cannot be written"); } } @@ -290,6 +315,10 @@ namespace jsonrpc { case Type::STRUCT: delete as.myStruct; break; + + case Type::FUTURE: + delete as.myFuture; + break; case Type::BOOLEAN: case Type::DOUBLE: @@ -309,6 +338,7 @@ namespace jsonrpc { DateTime* myDateTime; String* myString; Struct* myStruct; + SharedFuture* myFuture; struct { double myDouble; int32_t myInteger32; @@ -348,6 +378,10 @@ namespace jsonrpc { template<> inline const Value::Struct& Value::AsType() const { return AsStruct(); } + + template<> inline Value::SharedFuture& Value::AsType() { + return AsFuture(); + } template<> inline const Value& Value::AsType() const { return *this; @@ -411,6 +445,9 @@ namespace jsonrpc { os << '}'; break; } + case Value::Type::FUTURE: + os << ""; + break; } return os; } diff --git a/include/jsonrpc-lean/writer.h b/include/jsonrpc-lean/writer.h index 08707d1..23bdd2e 100644 --- a/include/jsonrpc-lean/writer.h +++ b/include/jsonrpc-lean/writer.h @@ -40,7 +40,7 @@ namespace jsonrpc { virtual void EndResponse() = 0; virtual void StartFaultResponse(const Value& id) = 0; virtual void EndFaultResponse() = 0; - virtual void WriteFault(int32_t code, const std::string& string) = 0; + virtual void WriteFault(int32_t code, const std::string& string, const std::string& dataString = "") = 0; // Values virtual void StartArray() = 0;