diff --git a/CMakeLists.txt b/CMakeLists.txt index f89dd23..b2cdf53 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -198,7 +198,10 @@ if(BUILD_TESTING) dual_mpi_test(TESTNAME test_pingpong TIMEOUT ${test_timeout} NAME1 rdv PROCS1 1 EXE1 ./test_pingpong ARGS1 1 NAME2 app PROCS2 1 EXE2 ./test_pingpong ARGS2 0) - + add_exe(test_global_comm ./test_global_comm.cpp) + dual_mpi_test(TESTNAME test_global_comm TIMEOUT ${test_timeout} + NAME1 rdv PROCS1 1 EXE1 ./test_global_comm ARGS1 0 + NAME2 app PROCS2 1 EXE2 ./test_global_comm ARGS2 1) set(isSST 0) add_exe(test_twoClients test_twoClients.cpp) tri_mpi_test(TESTNAME test_twoClients @@ -215,6 +218,7 @@ if(BUILD_TESTING) NAME2 client1 EXE2 ./test_twoClients PROCS2 1 ARGS2 ${isSST} 1 NAME3 rdv EXE3 ./test_twoClients PROCS3 1 ARGS3 ${isSST} -1) endif() + endif(BUILD_TESTING) ## export the library diff --git a/redev_adios_channel.h b/redev_adios_channel.h index 8960fc2..d295aa2 100644 --- a/redev_adios_channel.h +++ b/redev_adios_channel.h @@ -90,20 +90,30 @@ class AdiosChannel { } } template - [[nodiscard]] BidirectionalComm CreateComm(std::string name, MPI_Comm comm) { + [[nodiscard]] BidirectionalComm CreateComm(std::string name, MPI_Comm comm, + CommType ctype){ REDEV_FUNCTION_TIMER; - // TODO, remove s2c/c2s destinction on variable names then use std::move + // TODO, remove s2c/c2s distinction on variable names then use std::move // name - if(comm != MPI_COMM_NULL) { - auto s2c = std::make_unique>(comm, num_client_ranks_, - s2c_engine_, s2c_io_, name); - auto c2s = std::make_unique>(comm, num_server_ranks_, - c2s_engine_, c2s_io_, name); + if (comm != MPI_COMM_NULL) { + std::unique_ptr> s2c, c2s; + switch (ctype) { + case CommType::Ptn: + s2c = std::make_unique>(comm, num_client_ranks_, + s2c_engine_, s2c_io_, name); + c2s = std::make_unique>(comm, num_server_ranks_, + c2s_engine_, c2s_io_, name); + break; + case CommType::Global: + s2c = std::make_unique>(comm, s2c_engine_, s2c_io_, + name); + c2s = std::make_unique>(comm, c2s_engine_, c2s_io_, + name); + break; + } switch (process_type_) { - case ProcessType::Client: - return {std::move(c2s), std::move(s2c)}; - case ProcessType::Server: - return {std::move(s2c), std::move(c2s)}; + case ProcessType::Client: return {std::move(c2s), std::move(s2c)}; + case ProcessType::Server: return {std::move(s2c), std::move(c2s)}; } } return {std::make_unique>(), std::make_unique>()}; diff --git a/redev_bidirectional_comm.h b/redev_bidirectional_comm.h index 60f054c..2cbde8f 100644 --- a/redev_bidirectional_comm.h +++ b/redev_bidirectional_comm.h @@ -42,6 +42,11 @@ template class BidirectionalComm { REDEV_ALWAYS_ASSERT(receiver != nullptr); return receiver->Recv(mode); } + void SetCommParams(std::string &varName, size_t &msgSize) { + REDEV_FUNCTION_TIMER; + sender->SetCommParams(varName, msgSize); + receiver->SetCommParams(varName, msgSize); + } private: std::unique_ptr> sender; diff --git a/redev_channel.h b/redev_channel.h index 6a1f58a..75cf317 100644 --- a/redev_channel.h +++ b/redev_channel.h @@ -20,18 +20,18 @@ class Channel { // than the exact type this function can be used to reduce the runtime // overhead of converting from the variant to the explicit type back to the // variant - template [[nodiscard]] CommV CreateCommV(std::string name, MPI_Comm comm) { + template [[nodiscard]] CommV CreateCommV(std::string name, MPI_Comm comm, CommType ctype) { REDEV_FUNCTION_TIMER; - return pimpl_->CreateComm(std::move(name), comm, + return pimpl_->CreateComm(std::move(name), comm, std::move(ctype), InvCommunicatorTypeMap::value); } // convenience typesafe wrapper to get back the specific communicator type // rather than the variant. This is here to simplify updating legacy code // that expects a typed communicator to be created. template - [[nodiscard]] BidirectionalComm CreateComm(std::string name, MPI_Comm comm) { + [[nodiscard]] BidirectionalComm CreateComm(std::string name, MPI_Comm comm, CommType ctype = CommType::Ptn) { REDEV_FUNCTION_TIMER; - return std::get>(CreateCommV(std::move(name), comm)); + return std::get>(CreateCommV(std::move(name), comm, std::move(ctype))); } void BeginSendCommunicationPhase() { REDEV_FUNCTION_TIMER; @@ -80,7 +80,7 @@ class Channel { private: class ChannelConcept { public: - virtual CommV CreateComm(std::string &&, MPI_Comm, CommunicatorDataType) = 0; + virtual CommV CreateComm(std::string &&, MPI_Comm, CommType, CommunicatorDataType) = 0; virtual void BeginSendCommunicationPhase() = 0; virtual void EndSendCommunicationPhase() = 0; virtual void BeginReceiveCommunicationPhase() = 0; @@ -96,62 +96,62 @@ class Channel { // entirely safe unlike doing it in user code. Although, it's not the most // beautiful construction in the world. ~ChannelModel() noexcept final {} - [[nodiscard]] CommV CreateComm(std::string &&name, MPI_Comm comm, + [[nodiscard]] CommV CreateComm(std::string &&name, MPI_Comm comm, CommType ctype, CommunicatorDataType type) final { REDEV_FUNCTION_TIMER; switch (type) { case CommunicatorDataType::INT8: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; case CommunicatorDataType::INT16: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; case CommunicatorDataType::INT32: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; case CommunicatorDataType::INT64: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; case CommunicatorDataType::UINT8: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; case CommunicatorDataType::UINT16: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; case CommunicatorDataType::UINT32: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; case CommunicatorDataType::UINT64: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; case CommunicatorDataType::LONG_INT: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; case CommunicatorDataType::FLOAT: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; case CommunicatorDataType::DOUBLE: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; case CommunicatorDataType::LONG_DOUBLE: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; case CommunicatorDataType::COMPLEX_DOUBLE: return CommV{impl_.template CreateComm< CommunicatorTypeMap::type>( - std::move(name), comm)}; + std::move(name), comm, std::move(ctype))}; } return {}; } @@ -205,7 +205,7 @@ class NoOpChannel { public: template [[nodiscard]] - BidirectionalComm CreateComm(std::string, MPI_Comm) { + BidirectionalComm CreateComm(std::string, MPI_Comm, CommType) { return {std::make_unique>(), std::make_unique>()}; } void BeginSendCommunicationPhase(){} diff --git a/redev_comm.h b/redev_comm.h index e519000..ddf01ea 100644 --- a/redev_comm.h +++ b/redev_comm.h @@ -99,7 +99,7 @@ struct InMessageLayout { size_t start; /** * Number of items (of the user specified type passed to the template - * parameter of AdiosComm) that should be read from the messages array + * parameter of AdiosPtnComm) that should be read from the messages array * (returned by Communicator::Recv). */ size_t count; @@ -131,13 +131,18 @@ class Communicator { */ virtual void Send(T *msgs, Mode mode) = 0; /** - * Receive an array. Use AdiosComm's GetInMessageLayout to retreive + * Receive an array. Use AdiosPtnComm's GetInMessageLayout to retreive * an instance of the InMessageLayout struct containing the layout of * the received array. */ virtual std::vector Recv(Mode mode) = 0; virtual InMessageLayout GetInMessageLayout() = 0; + + virtual void SetCommParams(std::string VarName, size_t msgSize ) { + throw std::logic_error("Communicator::SetCommParams() called — must be overridden in the derived Comm class"); + } + virtual ~Communicator() = default; }; @@ -151,20 +156,20 @@ class NoOpComm : public Communicator { /** - * The AdiosComm class implements the Communicator interface to support sending + * The AdiosPtnComm class implements the Communicator interface to support sending * messages between the clients and server via ADIOS2. The BP4 and SST ADIOS2 * engines are currently supported. - * One AdiosComm object is required for each communication link direction. For + * One AdiosPtnComm object is required for each communication link direction. For * example, for a client and server to both send and receive messages one - * AdiosComm for client->server messaging and another AdiosComm for + * AdiosPtnComm for client->server messaging and another AdiosPtnComm for * server->client messaging are needed. Redev::BidirectionalComm is a helper * class for this use case. */ template -class AdiosComm : public Communicator { +class AdiosPtnComm : public Communicator { public: /** - * Create an AdiosComm object. Collective across sender and receiver ranks. + * Create an AdiosPtnComm object. Collective across sender and receiver ranks. * Calls to the constructor from the sender and receiver ranks must be in * the same order (i.e., first creating the client-to-server object then the * server-to-client link). @@ -172,19 +177,19 @@ class AdiosComm : public Communicator { * @param[in] recvRanks_ number of ranks in the receivers MPI communicator * @param[in] eng_ ADIOS2 engine for writing on the sender side * @param[in] io_ ADIOS2 IO associated with eng_ - * @param[in] name_ unique name among AdiosComm objects + * @param[in] name_ unique name among AdiosPtnComm objects */ - AdiosComm(MPI_Comm comm_, int recvRanks_, adios2::Engine& eng_, adios2::IO& io_, std::string name_) + AdiosPtnComm(MPI_Comm comm_, int recvRanks_, adios2::Engine& eng_, adios2::IO& io_, std::string name_) : comm(comm_), recvRanks(recvRanks_), eng(eng_), io(io_), name(name_), verbose(0) { inMsg.knownSizes = false; } /// We are explicitly not allowing copy/move constructor/assignment as we don't /// know if the ADIOS2 Engine and IO objects can be safely copied/moved. - AdiosComm(const AdiosComm& other) = delete; - AdiosComm(AdiosComm&& other) = delete; - AdiosComm& operator=(const AdiosComm& other) = delete; - AdiosComm& operator=(AdiosComm&& other) = delete; + AdiosPtnComm(const AdiosPtnComm& other) = delete; + AdiosPtnComm(AdiosPtnComm&& other) = delete; + AdiosPtnComm& operator=(const AdiosPtnComm& other) = delete; + AdiosPtnComm& operator=(AdiosPtnComm&& other) = delete; void SetOutMessageLayout(LOs& dest_, LOs& offsets_) { REDEV_FUNCTION_TIMER; @@ -343,7 +348,7 @@ class AdiosComm : public Communicator { return inMsg; } /** - * Control the amount of output from AdiosComm functions. The higher the value the more output is written. + * Control the amount of output from AdiosPtnComm functions. The higher the value the more output is written. * @param[in] lvl valid values are [0:5] where 0 is silent and 5 is produces * the most output */ @@ -370,4 +375,74 @@ class AdiosComm : public Communicator { InMessageLayout inMsg; }; +/** + * The AdiosGlobalComm class implements the Communicator interface to enable + * message exchange between clients and the server through ADIOS2. + * Similar to AdiosPtnComm, it provides bidirectional communication, + * but the key distinction is that the global communicator is shared + * across all ranks and partitions. + * + * It is primarily used for transferring global data and metadata + * relevant to coupled applications. + * + * Currently, the BP4 and SST ADIOS2 engines are supported. + */ +template +class AdiosGlobalComm : public Communicator + { + public: + AdiosGlobalComm(MPI_Comm comm_, adios2::Engine& eng_, adios2::IO& io_, + std::string name_) + : comm(comm_), eng(eng_), io(io_), name(name_) + { + } + + // copy/move of adios engine and io objects isn't safe. + AdiosGlobalComm(const AdiosGlobalComm& other) = delete; + AdiosGlobalComm(AdiosGlobalComm&& other) = delete; + AdiosGlobalComm& operator=(const AdiosGlobalComm& other) = delete; + AdiosGlobalComm& operator=(AdiosGlobalComm&& other) = delete; + + void SetCommParams(std::string varName_, size_t msgSize_){ + varName = varName_; + msgSize = msgSize_; + } + void Send(T* ptr, Mode mode) + { + REDEV_FUNCTION_TIMER; + auto var = io.InquireVariable(varName); + auto msg = std::vector(ptr, ptr + msgSize); + if (!var) { + var = io.DefineVariable(varName,{} ,{},{msgSize}); + } + assert(var); + eng.Put(var, msg.data()); + if(mode == Mode::Synchronous) { + eng.PerformPuts(); + } + } + std::vector Recv(Mode mode) + { + REDEV_FUNCTION_TIMER; + std::vector msg; + auto var = io.InquireVariable(varName); + assert(var); + msg.resize(msgSize); + eng.Get(var, msg.data()); + if(mode == Mode::Synchronous) { + eng.PerformGets(); + } + return msg; + } + void SetOutMessageLayout(LOs& dest, LOs& offsets) {}; + InMessageLayout GetInMessageLayout() { return {}; } + + private: + MPI_Comm comm; + adios2::Engine& eng; + adios2::IO& io; + std::string name; + std::string varName = ""; + std::size_t msgSize = 0; + }; } diff --git a/redev_types.h b/redev_types.h index c85f72c..40c6e68 100644 --- a/redev_types.h +++ b/redev_types.h @@ -24,6 +24,6 @@ using CVs = std::vector; enum class ProcessType { Client = 0, Server = 1 }; enum class TransportType { BP4 = 0, SST = 1 }; - +enum class CommType{ Ptn = 0, Global = 1 }; } #endif diff --git a/test_global_comm.cpp b/test_global_comm.cpp new file mode 100644 index 0000000..ba00311 --- /dev/null +++ b/test_global_comm.cpp @@ -0,0 +1,53 @@ +#include +#include +#include "redev.h" + +int main(int argc, char** argv) +{ + MPI_Init(&argc, &argv); + { + if (argc != 2) { + std::cerr << "Usage: " << argv[0] + << " <1=isRendezvousApp,0=isParticipant>\n"; + exit(EXIT_FAILURE); + } + auto isRdv = atoi(argv[1]); + MPI_Comm localComm = MPI_COMM_WORLD; + + // dummy partition vector data + const auto dim = 2; + auto cuts = isRdv ? redev::Reals({0}) : redev::Reals(1); + auto ranks = isRdv ? redev::LOs({0}) : redev::LOs(1); + auto ptn = redev::RCBPtn(dim, ranks, cuts); + redev::Redev rdv(localComm, redev::Partition{std::move(ptn)}, static_cast(isRdv)); + // Initialize the Adios Channel + adios2::Params params{{"Streaming", "On"},{"OpenTimeoutSecs", "4"}}; + std::string name = "bar"; + auto channel = rdv.CreateAdiosChannel(name, params, redev::TransportType::BP4); + auto commPair = channel.CreateComm(name, localComm, redev::CommType::Global); + + // send data to test global comm + redev::Reals vals = {3.14}; + auto *msgs = &vals[0]; + std::string varName = "barVar"; + size_t n = vals.size(); + // test the ptn comm + // the non-rendezvous app sends to the rendezvous app + if (!isRdv) { + commPair.SetCommParams(varName, n); + channel.BeginSendCommunicationPhase(); + commPair.Send(msgs, redev::Mode::Synchronous); + channel.EndSendCommunicationPhase(); + } else { + // receive global date + channel.BeginReceiveCommunicationPhase(); + commPair.SetCommParams(varName, n); + auto msgVec = commPair.Recv(redev::Mode::Synchronous); + channel.EndReceiveCommunicationPhase(); + REDEV_ALWAYS_ASSERT(msgVec[0] == redev::Real{3.14}); + printf("\nTest passed."); + } + } + MPI_Finalize(); + return 0; +} \ No newline at end of file