diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cdc0358 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.obj/ +client/client +server/server +simulation/simulation +NOTES.md diff --git a/Makefile b/Makefile index 45c7a77..b536ba2 100644 --- a/Makefile +++ b/Makefile @@ -7,16 +7,6 @@ CXX = g++ LD = g++ EXPAND = lib/tmpl/expand -ERPC_PATH= "./third_party/eRPC" - -#ERPC_CFLAGS_RAW := -I $(ERPC_PATH)/src -DRAW=true -#ERPC_LDFLAGS_RAW := -L $(ERPC_PATH)/build -lerpc -lnuma -ldl -lgflags -libverbs - -DPDK_LIBS := $(shell pkg-config --libs libdpdk) -DPDK_CFLAGS := $(shell pkg-config --cflags libdpdk) -ERPC_CFLAGS_DPDK := -I $(ERPC_PATH)/src -I $(ERPC_PATH)/third_party/asio/include $(DPDK_CFLAGS) -DERPC_DPDK=true -march=native -ERPC_LDFLAGS_DPDK := -L $(ERPC_PATH)/build -lerpc -lnuma -ldl -lgflags -libverbs -lmlx4 -lmlx5 $(DPDK_LIBS) - CFLAGS_WARNINGS:= -Wno-unused-function -Wno-nested-anon-types -Wno-keyword-macro -Wno-uninitialized # -fno-omit-frame-pointer is needed to get accurate flame graphs. See [1] for @@ -27,9 +17,11 @@ CFLAGS := -g -Wall $(CFLAGS_WARNINGS) -iquote.obj/gen -O2 -DNASSERT -fno-omit-fr CXXFLAGS := -g -std=c++11 LDFLAGS := -levent_pthreads -pthread -lboost_fiber -lboost_context -lboost_system -lboost_thread -## Add ERPC flags ## -CFLAGS += $(ERPC_CFLAGS_DPDK) -LDFLAGS += $(ERPC_LDFLAGS_DPDK) +## Add RPC flags ## +RPC_CFLAGS := +RPC_LDFLAGS := -ldl -lgflags -libverbs +CFLAGS += $(RPC_CFLAGS) +LDFLAGS += $(RPC_LDFLAGS) ## Debian package: check ## #CHECK_CFLAGS := $(shell pkg-config --cflags check) @@ -113,6 +105,9 @@ SRCS := # TEST_SRCS is just like SRCS, but these source files will be compiled # with testing related flags. TEST_SRCS := +# SIM_SRCS is just like SRCS, but these source files will be compiled +# with simulation related flags. +SIM_SRCS := # GTEST_SRCS is tests that use Google's testing framework GTEST_SRCS := @@ -127,6 +122,12 @@ BINS := # using the appropriate flags. This is also used as the list of tests # to run for the `test' target. TEST_BINS := +# SIM_BINS is like BINS, but for simulation binaries. They will be linked +# using the appropriate flags. This is also used as the list of tests +# to run for the `sim` target. +# It is similar to test that it uses simulated tranpsort, but you can also +# provide manual inputs to it, as contrast to test +SIM_BINS := # add-CFLAGS is a utility macro that takes a space-separated list of # sources and a set of CFLAGS. It sets the CFLAGS for each provided @@ -147,6 +148,7 @@ include debug/Rules.mk include network/Rules.mk include client/Rules.mk include server/Rules.mk +include simulation/Rules.mk #include replication/common/Rules.mk #include replication/meerkatir/Rules.mk #include replication/leadermeerkatir/Rules.mk @@ -181,7 +183,7 @@ include server/Rules.mk DEPFLAGS = -M -MF ${@:.o=.d} -MP -MT $@ -MG # $(call add-CFLAGS,$(TEST_SRCS),$(CHECK_CFLAGS)) -OBJS := $(SRCS:%.cpp=.obj/%.o) $(TEST_SRCS:%.cpp=.obj/%.o) $(GTEST_SRCS:%.cpp=.obj/%.o) +OBJS := $(SRCS:%.cpp=.obj/%.o) $(TEST_SRCS:%.cpp=.obj/%.o) $(GTEST_SRCS:%.cpp=.obj/%.o) $(SIM_SRCS:%.cpp=.obj/%.o) define compile @mkdir -p $(dir $@) @@ -223,8 +225,9 @@ $(PROTOOBJS:%.o=%-pic.o): .obj/%-pic.o: .obj/gen/%.pb.cc $(PROTOSRCS) # $(call add-LDFLAGS,$(TEST_BINS),$(CHECK_LDFLAGS)) +$(call add-LDFLAGS,$(SIM_BINS),$(CHECK_LDFLAGS)) -$(BINS) $(TEST_BINS): %: +$(BINS) $(TEST_BINS) $(SIM_BINS): %: $(call trace,LD,$@,$(LD) -o $@ $^ $(LDFLAGS) $(LDFLAGS-$@)) # @@ -256,7 +259,7 @@ $(GTEST_MAIN) : .obj/gtest/gtest-all.o .obj/gtest/gtest_main.o .PHONY: clean clean: - $(call trace,RM,binaries,rm -f $(BINS) $(TEST_BINS)) + $(call trace,RM,binaries,rm -f $(BINS) $(TEST_BINS) $(SIM_BINS)) $(call trace,RM,objects,rm -rf .obj) # @@ -270,7 +273,13 @@ print-%: # .PHONY: all -all: $(BINS) +all: + ERPC_PATH= "./third_party/eRPC" + DPDK_LIBS := $(shell pkg-config --libs libdpdk) + DPDK_CFLAGS := $(shell pkg-config --cflags libdpdk) + RPC_CFLAGS := -I $(ERPC_PATH)/src -I $(ERPC_PATH)/third_party/asio/include $(DPDK_CFLAGS) -DERPC_DPDK=true -march=native + RPC_LDFLAGS := -L $(ERPC_PATH)/build -lerpc -lnuma -ldl -lgflags -libverbs -lmlx4 -lmlx5 $(DPDK_LIBS) + $(BINS) $(TEST_BINS:%=run-%): run-%: % $(call trace,RUN,$<,$<) @@ -278,8 +287,12 @@ $(TEST_BINS:%=run-%): run-%: % $(TEST_BINS:%=gdb-%): gdb-%: % $(call trace,GDB,$<,CK_FORK=no gdb $<) +.PHONY: simulation +simulation: $(SIM_BINS) + .PHONY: test test: $(TEST_BINS:%=run-%) + .PHONY: check check: test diff --git a/README.md b/README.md index 115dbc2..1ec0436 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,90 @@ # Be-Tree A simple, reference implementation of a B^e-tree + +## Environments +There are 3 different environments to run the implementation: +* Production +* Simulation +* Testing + +### Production Environment +The production environment is used on production. The RPC implementation of this envrionment uses eRPC. To be able to run on this environment, you need to build the eRPC, that is located under `third_party/eRPC` directory. + +**Ubuntu Prerequisites** +1. This implementation is tested and supported on Ubuntu OS. +2. Install the necessary Ubuntu packages to be able to run the implementation +``` +sudo apt install make +sudo apt install cmake +sudo apt install libssl-dev +sudo apt install libevent-dev +sudo apt install libboost-all-dev +sudo apt install libgtest-dev +sudo apt-get install libgflags-dev +sudo apt-get install libgoogle-glog-dev +sudo apt install g++ +sudo apt-get install -y pkg-config +``` + +**Setup** +1. Setup and do the necessary installations of the Ubuntu environment as specified above +2. Build eRPC. For instructions to build eRPC, refer to the `third_party/eRPC` directory. +2. Do `make` to compile and build the code +3. Do `./client/client` for the client machine to run the client code +4. Do `./server/server --configFile== --serverIndex==` + * An example configuration file is the file `config.txt` in the root directory. In this configuration file, there are 1 server spawned on one machine. In this example file, + * Server index 0 indicates that this server instance will occupy the port 38450 +5. Enter the file system operations from the client side as needed. + +### Simulation Environment +The simulation environment tries to simulate the production environment and hence is similar to the production environment. The interactivity and custom input ability of file system operations should be maintained for this environment. + +**Implementation Details** + +The main difference between this environment and the production environment is that it does not use the eRPC library, it simulates the transportation of message between the client and the server using a 'simulated transportation'. + +In the 'simulated transportation' implementation, there is a single transport object that is shared between the client and the server. Since the client and the server is actually located in the same runtime environment, the transport object is shared using variable sharing. + +The image below illustrates the difference between the production and simulation environment. + +![](assets/environment-comparison.png) + +In this environment, +* the number of server threads +* the number of client threads +* all requests from all clients will always be directed to one of the server threads throughout the whole simulation. Hence, there is actually no need to spawn more than one server threads. +* all requests from all clients will be handled one-by-one by that single server thread. Hence, it is not suitable for benchmarking. + +**Ubuntu Prerequisites** + +1. This implementation is tested and supported on Ubuntu OS. +2. Install the necessary Ubuntu packages to be able to run the implementation +``` +sudo apt install make +sudo apt install cmake +sudo apt install libssl-dev +sudo apt install libevent-dev +sudo apt install libboost-all-dev +sudo apt install libgtest-dev +sudo apt-get install libgflags-dev +sudo apt-get install libgoogle-glog-dev +sudo apt install g++ +sudo apt-get install -y pkg-config +``` + +**Setup** + +1. Do `make simulation` to compile and build the simulation code. +2. Do `./simulation/simulation [--numClientThreadsSim= --numServerThreadsSim=` to run both the server and the client code. + * You can optionally specify the number of server threads or number of client threads to be spawned in this simulation environment. If these are not specified, its value will be defaulted to 1. +3. Enter the file system operation as needed. + +### Testing Environment +The testing environment is for the purpose of testing the correctness of the implementation. There is no user input expected as the file system operations and its expected output has been defined beforehand. + +This environment is not yet implemented at the moment. + +## APIs +The APIs that are currently supported by the server are: +* `GetNodeId` + * This returns the ID of the node of the server applications in the server that it resides. A single server can have multiple sever applications \ No newline at end of file diff --git a/assets/environment-comparison.png b/assets/environment-comparison.png new file mode 100644 index 0000000..2008923 Binary files /dev/null and b/assets/environment-comparison.png differ diff --git a/client/storage_client.cpp b/client/storage_client.cpp index bdfee90..2c8ab83 100644 --- a/client/storage_client.cpp +++ b/client/storage_client.cpp @@ -7,6 +7,7 @@ //#include //#include +#include #include using namespace std; diff --git a/client/storage_client.hpp b/client/storage_client.hpp index e7b5694..7ceb502 100644 --- a/client/storage_client.hpp +++ b/client/storage_client.hpp @@ -2,8 +2,8 @@ #ifndef _STORAGE_CLIENT_H_ #define _STORAGE_CLIENT_H_ -#include "network/fasttransport.hpp" #include "network/configuration.hpp" +#include "network/transport.hpp" struct nodeid_t { uint32_t serverIdx; diff --git a/common/gflags.hpp b/common/gflags.hpp index 571aed3..819b987 100644 --- a/common/gflags.hpp +++ b/common/gflags.hpp @@ -5,6 +5,8 @@ // Defines all the command line flags shared between both // severs and clients +DEFINE_uint32(numServerThreadsSim, 1, "The number of server threads in the simulation environment"); +DEFINE_uint32(numClientThreadsSim, 1, "The number of client threads in the simulation environment"); DEFINE_string(configFile, "", "Path to the configuration file"); DEFINE_string(backingStoreDir, "", "Path to the backing store directory"); DEFINE_string(benchmark, "", "Benchmark mode (benchmark-, mode = upserts|queries)"); diff --git a/network/Rules.mk b/network/Rules.mk index 21da991..ac69568 100644 --- a/network/Rules.mk +++ b/network/Rules.mk @@ -1,9 +1,11 @@ d := $(dir $(lastword $(MAKEFILE_LIST))) -SRCS += $(addprefix $(d), transport.cpp fasttransport.cpp configuration.cpp) +SRCS += $(addprefix $(d), transport.cpp simtransport.cpp fasttransport.cpp configuration.cpp) LIB-configuration := $(o)configuration.o $(LIB-debug) LIB-transport := $(o)transport.o $(LIB-configuration) $(LIB-debug) LIB-fasttransport := $(o)fasttransport.o $(LIB-transport) $(LIB-debug) + +LIB-simtransport := $(LIB-transport) $(LIB-debug) $(o)simtransport.o diff --git a/network/app_mem_pool.cpp b/network/app_mem_pool.cpp new file mode 100644 index 0000000..03ccc3c --- /dev/null +++ b/network/app_mem_pool.cpp @@ -0,0 +1,29 @@ +// A basic mempool for preallocated objects of type T. eRPC has a faster, +// hugepage-backed one. +template class AppMemPool { + public: + size_t num_to_alloc = 1; + std::vector backing_ptr_vec; + std::vector pool; + + void extend_pool() { + T *backing_ptr = new T[num_to_alloc]; + for (size_t i = 0; i < num_to_alloc; i++) pool.push_back(&backing_ptr[i]); + backing_ptr_vec.push_back(backing_ptr); + num_to_alloc *= 2; + } + + T *alloc() { + if (pool.empty()) extend_pool(); + T *ret = pool.back(); + pool.pop_back(); + return ret; + } + + void free(T *t) { pool.push_back(t); } + + AppMemPool() {} + ~AppMemPool() { + for (T *ptr : backing_ptr_vec) delete[] ptr; + } +}; diff --git a/network/configuration.cpp b/network/configuration.cpp index 299c3e3..21bd94b 100644 --- a/network/configuration.cpp +++ b/network/configuration.cpp @@ -27,6 +27,11 @@ ServerAddress::operator<(const ServerAddress &other) const { return this_t < other_t; } +Configuration::Configuration() { + multicastAddress = new ServerAddress("", ""); + n = 1; +} + Configuration::Configuration(const Configuration &c) : n(c.n), servers(c.servers), hasMulticast(c.hasMulticast) { diff --git a/network/configuration.hpp b/network/configuration.hpp index 8cb1b9f..1242c0b 100644 --- a/network/configuration.hpp +++ b/network/configuration.hpp @@ -38,6 +38,10 @@ struct ServerAddress class Configuration { public: + /* Empty constructor for dummy configuration in simulation environment */ + Configuration(); + + /* Non-empty constructors for real configuration */ Configuration(const Configuration &c); Configuration(int n, // int f, diff --git a/network/fasttransport.hpp b/network/fasttransport.hpp index db89c87..8267abc 100644 --- a/network/fasttransport.hpp +++ b/network/fasttransport.hpp @@ -56,36 +56,6 @@ struct req_tag_t { TransportReceiver *src; }; -// A basic mempool for preallocated objects of type T. eRPC has a faster, -// hugepage-backed one. -template class AppMemPool { - public: - size_t num_to_alloc = 1; - std::vector backing_ptr_vec; - std::vector pool; - - void extend_pool() { - T *backing_ptr = new T[num_to_alloc]; - for (size_t i = 0; i < num_to_alloc; i++) pool.push_back(&backing_ptr[i]); - backing_ptr_vec.push_back(backing_ptr); - num_to_alloc *= 2; - } - - T *alloc() { - if (pool.empty()) extend_pool(); - T *ret = pool.back(); - pool.pop_back(); - return ret; - } - - void free(T *t) { pool.push_back(t); } - - AppMemPool() {} - ~AppMemPool() { - for (T *ptr : backing_ptr_vec) delete[] ptr; - } -}; - // eRPC context passed between request and responses class AppContext { public: @@ -118,6 +88,7 @@ class AppContext { class FastTransport : public Transport { public: + // TODO: I think we can remove the ip and phy_port argument here since we can get it from config? FastTransport(const network::Configuration &config, std::string &ip, int nthreads, diff --git a/network/simtransport.cpp b/network/simtransport.cpp new file mode 100644 index 0000000..39f9ebe --- /dev/null +++ b/network/simtransport.cpp @@ -0,0 +1,163 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * simtransport.cc: + * simulated message-passing interface for testing use + * + * Reference: 2013-2016 Dan R. K. Ports + * + * Difference between simulated transport and actual transport + * - In the actual transport, each client client thread and each server thread + * has 1 transport object. Server can choose to handle multithreading by spawning + * multiple transport threads. + * In this case, it is enough for each transport object to have 1 request handled + * at any point of time. + * - In simulated transport, this transport object is shared between the client and the server. + * In this case, in + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#include +#include + +#include "debug/assert.hpp" +#include "network/simtransport.hpp" +#include + +namespace network { + +static std::mutex simtransport_client_lock; +static std::mutex simtransport_server_lock; + +SimTransport::SimTransport( + const network::Configuration &config, + uint8_t id +) +: config(config), id(id) +{ + c = new SimAppContext(); + c->client.is_ready = false; +} + +SimTransport::~SimTransport() +{ +} + +void SimTransport::Register(TransportReceiver *receiver, int receiverIdx) +{ + // Only register server + // Client registration is invalid + if (receiverIdx > -1) { + c->server.receiver = receiver; + this->receiverIdx = receiverIdx; + } +} + +int SimTransport::MAX_DATA_PER_PKT = 16384; + +// Used when the client wants to create a request +char *SimTransport::GetRequestBuf(size_t reqLen, size_t respLen) +{ + // create a new request tag + simtransport_client_lock.lock(); + if (reqLen == 0) + reqLen = SimTransport::MAX_DATA_PER_PKT; + if (respLen == 0) + respLen = SimTransport::MAX_DATA_PER_PKT; + c->client.crt_req_tag = c->client.req_tag_pool.alloc(); + c->client.crt_req_tag->req_msgbuf = new char[reqLen]; + c->client.crt_req_tag->resp_msgbuf = new char[respLen]; + return c->client.crt_req_tag->req_msgbuf; +} + +int SimTransport::GetSession(TransportReceiver *src, uint8_t replicaIdx, uint8_t dstRpcIdx) +{ + return -1; +} + +void SimTransport::Run() +{ + /** + The run is supposed to be in the main thread + It picks up request that is allpocated by the client + And then it acts as the server caller + This way, it doesnt force the server to handle multithreadedness + If the server wants to be multithreaded, they can run several of this tranpost + We don't even need a queue here, because the initial design is that server doesnt even need to be multithreaded + But we can extend it to be a queue if the server wants itself to be multithreaded + **/ + while(!stop) { + // if c.client is not null -> will try to handle this request + // call the server + simtransport_server_lock.lock(); + if (c->client.is_ready) { + c->server.receiver->ReceiveRequest( + c->client.crt_req_tag->reqType, + c->client.crt_req_tag->req_msgbuf, + c->client.crt_req_tag->resp_msgbuf + ); + c->client.is_ready = false; + } + simtransport_server_lock.unlock(); + } +} + +void SimTransport::Stop() { + Debug("Stopping transport!"); + stop = true; +} + +bool SimTransport::SendRequestToServer(TransportReceiver *src, uint8_t reqType, uint32_t serverIdx, uint8_t dstRpcIdx, size_t msgLen) { + // Mutex so that only 1 client's request can be handled at any point + c->client.crt_req_tag->src = src; + c->client.crt_req_tag->reqType = reqType; + c->client.is_ready = true; + while (src->Blocked()) { + if (c.server.is_ready) { + sim_req_tag_t *tag = c->client.crt_req_tag; + tag->src->ReceiveResponse(tag->reqType, tag->resp_msgbuf); + c->server.is_ready = false; + } + boost::this_fiber::yield(); + } + simtransport_client_lock.unlock(); + return true; +} + +bool SimTransport::SendRequestToAllServers(TransportReceiver *src, uint8_t reqType, uint8_t dstRpcIdx, size_t msgLen) { + SendRequestToServer(src, reqType, 0, dstRpcIdx, msgLen); + return true; +} + +bool SimTransport::SendResponse(uint64_t reqHandleIdx, size_t msgLen) { + Debug("Sent response, msgLen = %lu\n", msgLen); + c->server.is_ready = true; + return true; +} + +bool SimTransport::SendResponse(size_t msgLen) { + Debug("Sent response, msgLen = %lu\n", msgLen); + c->server.is_ready = true; + return true; +} +} diff --git a/network/simtransport.hpp b/network/simtransport.hpp new file mode 100644 index 0000000..3f9fbdf --- /dev/null +++ b/network/simtransport.hpp @@ -0,0 +1,138 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +/*********************************************************************** + * + * simtransport.h: + * simulated message-passing interface for testing use + * + * Reference: 2013-2016 Dan R. K. Ports + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + **********************************************************************/ + +#ifndef _NETWORK_SIMTRANSPORT_H_ +#define _NETWORK_SIMTRANSPORT_H_ + +#include "network/configuration.hpp" +#include "network/transport.hpp" +#include "network/app_mem_pool.cpp" + +#include +#include + +#define MULTIPLE_ACTIVE_REQUESTS false + +namespace network { + +// A tag attached to every request we send; +// it is passed to the response function +struct sim_req_tag_t +{ + char *req_msgbuf; + char *resp_msgbuf; + uint8_t reqType; + TransportReceiver *src; +}; + +class SimRpc { + public: + void free_msg_buffer(sim_req_tag_t* req_tag) + { + delete[] req_tag->req_msgbuf; + delete[] req_tag->resp_msgbuf; + } + + void resize_msg_buffer(char* msgBuf, int newMsgLen) { + msgBuf = new char[newMsgLen]; + } + + char* alloc_msg_buffer(int msgLen) { + return new char[msgLen]; + } + +}; + +class SimAppContext +{ + public: + struct + { + sim_req_tag_t *crt_req_tag; + AppMemPool req_tag_pool; + bool is_ready; + } client; + + struct + { + // current req_handle + TransportReceiver *receiver = nullptr; + bool is_ready; + } server; + + // common to both servers and clients + SimRpc *rpc = new SimRpc(); +}; + +class SimTransport : public Transport +{ + public: + static int MAX_DATA_PER_PKT; + bool isResponseCompleted; + SimTransport( + const network::Configuration &config, + uint8_t id); + virtual ~SimTransport(); + void Register(TransportReceiver *receiver, int replicaIdx) override; + void Run(); + void Wait(); + void Stop(); + + bool SendRequestToServer(TransportReceiver *src, uint8_t reqType, uint32_t serverIdx, uint8_t dstRpcIdx, size_t msgLen) override; + bool SendRequestToAllServers(TransportReceiver *src, uint8_t reqType, uint8_t dstRpcIdx, size_t msgLen) override; + bool SendResponse(uint64_t reqHandleIdx, size_t msgLen) override; + bool SendResponse(size_t msgLen) override; + char *GetRequestBuf(size_t reqLen, size_t respLen) override; + int GetSession(TransportReceiver *src, uint8_t replicaIdx, uint8_t dstRpcIdx) override; + + uint8_t GetID() override { return id; }; + + private: + // Configuration containing the ids of the servers + network::Configuration config; + + SimAppContext *c; + + // Number of server threads + int nthreads; + + // This corresponds to the thread id of the current server + // used as the RPC id, must be unique per transport thread + uint8_t id; + + // This corresponds to the machine index + // Index of the receiver (if -1 then the receiver is a client that + // does not get requests, otherwise it is a server from the configuration) + int receiverIdx; + + bool stop = false; + }; +} +#endif // _NETWORK_SIMTRANSPORT_H_ diff --git a/server/storage_server.cpp b/server/storage_server.cpp index 05d8b28..b38bee8 100644 --- a/server/storage_server.cpp +++ b/server/storage_server.cpp @@ -10,11 +10,11 @@ using namespace std; StorageServerApp::StorageServerApp() : current_id(0) { - + current_id++; } uint32_t StorageServerApp::GetNodeId() { - return current_id++; + return current_id; } StorageServer::StorageServer(network::Configuration config, int myIdx, @@ -43,7 +43,7 @@ void StorageServer::ReceiveRequest(uint8_t reqType, char *reqBuf, char *respBuf) HandleEvictNode(reqBuf, respBuf, respLen); break; default: - Warning("Unrecognized rquest type: %d", reqType); + Warning("Unrecognized request type: %d", reqType); } // For every request, we need to send a response (because we use eRPC) diff --git a/simulation/Rules.mk b/simulation/Rules.mk new file mode 100644 index 0000000..804c0fb --- /dev/null +++ b/simulation/Rules.mk @@ -0,0 +1,10 @@ +d := $(dir $(lastword $(MAKEFILE_LIST))) + +SIM_SRCS := $(d)simulation.cpp + +$(d)simulation: $(o)simulation.o \ + $(OBJS-swap_space) $(OBJS-backing_store) \ + $(LIB-simtransport) $(OBJS-storage_server) \ + $(OBJS-storage_client) + +SIM_BINS += $(d)simulation diff --git a/simulation/simulation.cpp b/simulation/simulation.cpp new file mode 100644 index 0000000..ef110d0 --- /dev/null +++ b/simulation/simulation.cpp @@ -0,0 +1,64 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +#include +#include +#include +#include +#include +#include +#include + +#include "network/configuration.hpp" +#include "network/transport.hpp" +#include "network/simtransport.hpp" + +#include "common/gflags.hpp" +#include "server/storage_server.hpp" +#include "client/storage_client.hpp" + +using namespace std; + +void client_thread_func(StorageClient *sc) +{ + string request; + nodeid_t result = sc->GetNodeId(0, 0, request); + printf("This is the server's node: %d\n", result.nodeIdx); +} + +void server_thread_func(network::SimTransport *transport) +{ + transport->Run(); +} + +int main(int argc, char **argv) +{ + network::Configuration config; + + /* Client and server thread creation */ + network::SimTransport *transport = new network::SimTransport(config, 0); + + gflags::ParseCommandLineFlags(&argc, &argv, true); + int servers = FLAGS_numServerThreadsSim; + int clients = FLAGS_numClientThreadsSim; + + // Server thread creation + std::vector server_thread_arr(servers); + for (uint8_t i = 0; i < servers; i++) + { + StorageServer *ss = new StorageServer(config, i, transport, new StorageServerApp()); + server_thread_arr[i] = std::thread(server_thread_func, transport); + } + + // Client thread creation + std::vector client_thread_arr(clients); + for (uint8_t i = 0; i < clients; i++) + { + StorageClient *sc = new StorageClient(config, transport); + client_thread_arr[i] = std::thread(client_thread_func, sc); + } + + /* Blocking join, waits indefinitely */ + for (auto &server_thread : server_thread_arr) + server_thread.join(); + + return 0; +}