diff --git a/src/benchmarks/benchmark_io.cpp b/src/benchmarks/benchmark_io.cpp index b6cc7af8..7c949870 100644 --- a/src/benchmarks/benchmark_io.cpp +++ b/src/benchmarks/benchmark_io.cpp @@ -65,6 +65,8 @@ struct Latency { constexpr size_t KILOBYTE = 1024; constexpr size_t MEGABYTE = KILOBYTE * KILOBYTE; +std::vector> buffers; + size_t getPayloadSize(size_t factor) { return KILOBYTE * (1 << factor); } void runBenchmarkThread(std::shared_ptr geds, size_t threadId, size_t factor, @@ -77,10 +79,12 @@ void runBenchmarkThread(std::shared_ptr geds, size_t threadId, size_t fact } else { key = std::to_string(factor) + "-" + std::to_string(threadId) + ".data"; } - std::vector buffer(payloadSize); + // std::vector buffer(payloadSize); + auto file = geds->open(FLAGS_bucket.CurrentValue(), key); if (file.ok()) { auto openTime = std::chrono::steady_clock::now(); + auto buffer = buffers[threadId].data(); auto status = file->read(buffer, 0, payloadSize); auto lastByteTime = std::chrono::steady_clock::now(); constexpr auto milliseconds = 1e6; @@ -93,7 +97,6 @@ void runBenchmarkThread(std::shared_ptr geds, size_t threadId, size_t fact lastByteTime - startTime) .count() / milliseconds}); - if (!status.ok()) { auto message = "Error: " + std::string{status.status().message()}; LOG_ERROR(message); @@ -183,8 +186,16 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } - for (size_t i = 0; i <= absl::GetFlag(FLAGS_maxFactor); i++) { - for (size_t j = 1; j < absl::GetFlag(FLAGS_maxThreads); j++) { + auto threadCount = absl::GetFlag(FLAGS_maxThreads); + buffers.resize(threadCount); + auto factorCount = absl::GetFlag(FLAGS_maxFactor); + auto maxPayload = getPayloadSize(factorCount); + for (auto &buffer : buffers) { + buffer.resize(maxPayload); + } + + for (size_t i = 0; i <= factorCount; i++) { + for (size_t j = 1; j < threadCount; j++) { auto result = benchmark(geds, i, j); std::cout << result.payloadSize << ": " << result.threads << " " << result.rate << " MB/s" << std::endl; diff 
--git a/src/libgeds/Filesystem.cpp b/src/libgeds/Filesystem.cpp index 490e5d9c..0c39fd72 100644 --- a/src/libgeds/Filesystem.cpp +++ b/src/libgeds/Filesystem.cpp @@ -44,8 +44,11 @@ absl::Status removeFile(const std::string &path) { int err = unlink(path.c_str()); if (err != 0 && (errno != ENOENT)) { int error = errno; - return absl::UnknownError("Unable to delete file " + path + ": " + std::strerror(error)); + auto message = "Unable to delete file " + path + ": " + std::strerror(error); + LOG_ERROR(message); + return absl::UnknownError(message); } + LOG_DEBUG("Removed ", path); return absl::OkStatus(); } diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 4d1c41a0..7ab640d4 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -159,6 +159,9 @@ absl::Status GEDS::start() { // Update state. _state = ServiceState::Running; + (void)_metadataService.configureNode(uuid, _hostname, _server.port(), + geds::rpc::NodeState::Register); + startStorageMonitoringThread(); auto st = syncObjectStoreConfigs(); @@ -173,11 +176,20 @@ absl::Status GEDS::stop() { GEDS_CHECK_SERVICE_RUNNING LOG_INFO("Stopping"); LOG_INFO("Printing statistics"); + _state = ServiceState::Stopped; geds::Statistics::print(); - auto result = _metadataService.disconnect(); + // Relocate to S3 if available. + relocate(true); + // Unregister. 
+ auto result = _metadataService.configureNode(uuid, _hostname, _server.port(), + geds::rpc::NodeState::Unregister); + if (!result.ok()) { + LOG_ERROR("Unable to unregister: ", result.message()); + } + result = _metadataService.disconnect(); if (!result.ok()) { - LOG_ERROR("cannot disconnect metadata service"); + LOG_ERROR("cannot disconnect metadata service: ", result.message()); } result = _server.stop(); if (!result.ok()) { @@ -191,8 +203,6 @@ absl::Status GEDS::stop() { _fileTransfers.clear(); _tcpTransport->stop(); - _state = ServiceState::Stopped; - _storageMonitoringThread.join(); return result; @@ -855,45 +865,34 @@ void GEDS::relocate(std::vector> &relocatable, b struct RelocateHelper { std::mutex mutex; std::condition_variable cv; - size_t nTasks; - auto lock() { return std::unique_lock(mutex); } + std::atomic nTasks; }; auto h = std::make_shared(); { - auto lock = h->lock(); + std::lock_guard lock(h->mutex); h->nTasks = relocatable.size(); } LOG_INFO("Relocating ", relocatable.size(), " objects."); auto self = shared_from_this(); - size_t off = 3 * _config.io_thread_pool_size; - - for (size_t offset = 0; offset < relocatable.size(); offset += off) { - auto rbegin = offset; - auto rend = rbegin + off; - if (rend > relocatable.size()) { - rend = relocatable.size(); - } - for (auto i = rbegin; i < rend; i++) { - auto fh = relocatable[i]; - boost::asio::post(_ioThreadPool, [self, fh, h, force]() { - try { - self->relocate(fh, force); - } catch (...) { - LOG_ERROR("Encountered an exception during relocation ", fh->identifier); - } - { - auto lock = h->lock(); - h->nTasks -= 1; - } - h->cv.notify_all(); - }); - } - auto relocateLock = h->lock(); - h->cv.wait(relocateLock, [h]() { return h->nTasks == 0; }); - LOG_INFO("Relocated ", relocatable.size(), " objects."); + for (auto fh : relocatable) { + boost::asio::post(_ioThreadPool, [self, fh, h, force]() { + try { + self->relocate(fh, force); + } catch (...) 
{ + LOG_ERROR("Encountered an exception during relocation ", fh->identifier); + } + { + std::lock_guard lock(h->mutex); + h->nTasks -= 1; + } + h->cv.notify_one(); + }); } + std::unique_lock lock(h->mutex); + h->cv.wait(lock, [h]() { return h->nTasks == 0; }); + LOG_INFO("Relocated ", relocatable.size(), " objects."); } void GEDS::relocate(std::shared_ptr handle, bool force) { @@ -921,6 +920,81 @@ void GEDS::relocate(std::shared_ptr handle, bool force) { (void)handle->relocate(); } +absl::Status GEDS::downloadObject(const std::string &bucket, const std::string &key) { + auto oldFile = openAsFileHandle(bucket, key); + if (!oldFile.ok()) { + return oldFile.status(); + } + auto newFile = createAsFileHandle(bucket, key, true /* overwrite */); + if (!newFile.ok()) { + return newFile.status(); + } + auto status = (*oldFile)->download(*newFile); + if (!status.ok()) { + return status; + } + return (*newFile)->seal(); +} + +absl::Status GEDS::downloadObjects(std::vector objects) { + struct PullHelper { + std::mutex mutex; + std::condition_variable cv; + size_t nTasks; + size_t nErrors; + auto lock() { return std::unique_lock(mutex); } + }; + auto h = std::make_shared(); + { + auto lock = h->lock(); + h->nTasks = objects.size(); + h->nErrors = 0; + } + + auto self = shared_from_this(); + size_t off = 3 * _config.io_thread_pool_size; + + for (size_t offset = 0; offset < objects.size(); offset += off) { + auto rbegin = offset; + auto rend = rbegin + off; + if (rend > objects.size()) { + rend = objects.size(); + } + for (auto i = rbegin; i < rend; i++) { + const auto &file = objects[i]; + boost::asio::post(_ioThreadPool, [self, file, h]() { + bool error = false; + try { + auto status = self->downloadObject(file.bucket, file.key); + if (!status.ok()) { + LOG_ERROR("Unable to download ", file.bucket, "/", file.key); + error = true; + } + } catch (...) 
{ + LOG_ERROR("Encountered an exception when downloading ", file.bucket, "/", file.key); + error = true; + } + { + auto lock = h->lock(); + h->nTasks -= 1; + if (error) { + h->nErrors += 1; + } + } + h->cv.notify_all(); + }); + } + } + auto relocateLock = h->lock(); + h->cv.wait(relocateLock, [h]() { return h->nTasks == 0; }); + LOG_INFO("Downloaded ", objects.size(), " objects, errors: ", h->nErrors); + if (h->nErrors) { + return absl::UnknownError("Some objects were not downloaded: Observed " + + std::to_string(h->nErrors) + " errors!"); + } + return absl::OkStatus(); +} + void GEDS::startStorageMonitoringThread() { _storageMonitoringThread = std::thread([&]() { auto statsLocalStorageUsed = geds::Statistics::createGauge("GEDS: Local Storage used"); @@ -941,12 +1015,12 @@ void GEDS::startStorageMonitoringThread() { allFileHandles.push_back(fh); }); for (const auto &fh : allFileHandles) { - storageUsed += fh->localStorageSize(); - memoryUsed += fh->localMemorySize(); - if (fh->isRelocatable()) { - if (fh->openCount() == 0) { - relocatable.push_back(fh); - } + auto storageSize = fh->localStorageSize(); + auto memSize = fh->localMemorySize(); + storageUsed += storageSize; + memoryUsed += memSize; + if (fh->isRelocatable() && fh->openCount() == 0) { + relocatable.push_back(fh); } } } @@ -968,8 +1042,17 @@ void GEDS::startStorageMonitoringThread() { *statsLocalMemoryFree = _memoryCounters.free; } - auto targetStorage = (size_t)(0.7 * (double)_config.available_local_storage); - if (memoryUsed > targetStorage) { + { + // Send heartbeat. 
+ auto status = _metadataService.heartBeat(uuid, _storageCounters, _memoryCounters); + if (!status.ok()) { + LOG_ERROR("Unable to send heartbeat to metadata service: ", status.message()); + } + } + + auto targetStorage = + (size_t)(_config.storage_spilling_fraction * (double)_config.available_local_storage); + if (storageUsed > targetStorage) { std::sort(std::begin(relocatable), std::end(relocatable), [](std::shared_ptr a, std::shared_ptr b) { return a->lastReleased() < b->lastReleased(); @@ -978,7 +1061,7 @@ void GEDS::startStorageMonitoringThread() { std::vector> tasks; size_t relocateBytes = 0; for (auto &f : relocatable) { - if (relocateBytes > targetStorage) { + if (relocateBytes > (storageUsed - targetStorage)) { break; } relocateBytes += f->localStorageSize(); @@ -986,6 +1069,8 @@ void GEDS::startStorageMonitoringThread() { } if (tasks.size()) { relocate(tasks); + } else { + LOG_WARNING("Unable to relocate files: No task found!"); } } relocatable.clear(); @@ -993,3 +1078,25 @@ void GEDS::startStorageMonitoringThread() { } }); } + +absl::Status GEDS::purgeLocalObject(const std::string &bucket, const std::string &key) { + const auto path = getPath(bucket, key); + auto result = _fileHandles.getAndRemove(path); + if (!result.has_value()) { + auto message = "The object with the path " + path.name + " does not exist locally."; + LOG_ERROR(message); + return absl::NotFoundError(message); + } + return absl::OkStatus(); +} + +absl::Status GEDS::purgeLocalObjects(std::vector objects) { + LOG_DEBUG("Purging ", objects.size(), "."); + for (const auto &obj : objects) { + auto status = purgeLocalObject(obj.bucket, obj.key); + if (!status.ok()) { + LOG_ERROR(status.message()); + } + } + return absl::OkStatus(); +} diff --git a/src/libgeds/GEDS.h b/src/libgeds/GEDS.h index 455700ab..7342616c 100644 --- a/src/libgeds/GEDS.h +++ b/src/libgeds/GEDS.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -321,6 +322,22 @@ class GEDS : public 
std::enable_shared_from_this, utility::RWConcurrentObj void relocate(bool force = false); void relocate(std::vector> &relocatable, bool force = false); void relocate(std::shared_ptr handle, bool force = false); + + /** + * @brief Pull object to this GEDS instance. + */ + absl::Status downloadObject(const std::string &bucket, const std::string &key); + absl::Status downloadObjects(std::vector objects); + + /** + * @brief Purge locally stored object without updating the Metadata server. + */ + absl::Status purgeLocalObject(const std::string &bucket, const std::string &key); + + /** + * @brief Purge locally stored objects if they exist locally. Missing files will be logged. + */ + absl::Status purgeLocalObjects(std::vector objects); }; #endif // GEDS_GEDS_H diff --git a/src/libgeds/GEDSAbstractFileHandle.h b/src/libgeds/GEDSAbstractFileHandle.h index 570edc5f..ec939825 100644 --- a/src/libgeds/GEDSAbstractFileHandle.h +++ b/src/libgeds/GEDSAbstractFileHandle.h @@ -174,7 +174,7 @@ template class GEDSAbstractFileHandle : public GEDSFileHandle { if (!isValid()) { return absl::UnavailableError("The file " + identifier + " is no longer valid!"); } - LOG_INFO("Relocating ", identifier); + LOG_INFO("Relocating ", identifier, " (size: ", _file.size(), ") "); if (_openCount > 0) { auto message = "Unable to relocate " + identifier + " reason: The file is still in use."; LOG_ERROR(message); @@ -205,13 +205,13 @@ template class GEDSAbstractFileHandle : public GEDSFileHandle { } auto fh = GEDSS3FileHandle::factory(_gedsService, bucket, key, metadata()); if (!fh.ok()) { - LOG_ERROR("Unable to reopen the relocateed file ", identifier, + LOG_ERROR("Unable to reopen the relocated file ", identifier, " on s3:", fh.status().message()); return fh.status(); } auto status = (*fh)->seal(); if (!status.ok()) { - LOG_ERROR("Unable to seal relocateed file!"); + LOG_ERROR("Unable to seal relocated file!"); (void)(*s3Endpoint)->deleteObject(bucket, key); return status; } diff --git 
a/src/libgeds/GEDSConfig.cpp b/src/libgeds/GEDSConfig.cpp index 5a15baed..17d87b36 100644 --- a/src/libgeds/GEDSConfig.cpp +++ b/src/libgeds/GEDSConfig.cpp @@ -4,7 +4,11 @@ */ #include "GEDSConfig.h" + +#include + #include "Logging.h" +#include "absl/status/status.h" absl::Status GEDSConfig::set(const std::string &key, const std::string &value) { LOG_DEBUG("Trying to set '", key, "' to '", value, "'"); @@ -14,6 +18,14 @@ absl::Status GEDSConfig::set(const std::string &key, const std::string &value) { hostname = value == "" ? std::nullopt : std::make_optional(value); } else if (key == "local_storage_path") { localStoragePath = value; + } else if (key == "node_type") { + if (value == "Standard") { + node_type = GEDSNodeType::Standard; + } else if (value == "Storage") { + node_type = GEDSNodeType::Storage; + } else { + return absl::NotFoundError("Invalid node type " + value); + } } else { LOG_ERROR("Configuration " + key + " not supported (type: string)."); return absl::NotFoundError("Key " + key + " not found."); @@ -64,6 +76,15 @@ absl::Status GEDSConfig::set(const std::string &key, int64_t value) { return set(key, (size_t)value); } +absl::Status GEDSConfig::set(const std::string &key, double value) { + if (key == "storage_spilling_fraction") { + storage_spilling_fraction = value; + return absl::OkStatus(); + } + LOG_ERROR("Configuration " + key + " not supported (type: double)."); + return absl::NotFoundError("Key " + key + " not found."); +} + absl::StatusOr GEDSConfig::getString(const std::string &key) const { LOG_INFO("Get ", key, " as string"); if (key == "listen_address") { @@ -75,6 +96,9 @@ absl::StatusOr GEDSConfig::getString(const std::string &key) const if (key == "local_storage_path") { return localStoragePath; } + if (key == "node_type") { + return std::string{magic_enum::enum_name(node_type)}; + } LOG_ERROR("Configuration " + key + " not supported (type: string)."); return absl::NotFoundError("Key " + key + " not found."); } @@ -110,3 +134,11 @@ 
absl::StatusOr GEDSConfig::getSignedInt(const std::string &key) const { } return value.status(); } + +absl::StatusOr GEDSConfig::getDouble(const std::string &key) const { + if (key == "storage_spilling_fraction") { + return storage_spilling_fraction; + } + LOG_ERROR("Configuration " + key + " not supported (type: double)."); + return absl::NotFoundError("Key " + key + " (double) not found."); +} diff --git a/src/libgeds/GEDSConfig.h b/src/libgeds/GEDSConfig.h index 5923598c..03354e1e 100644 --- a/src/libgeds/GEDSConfig.h +++ b/src/libgeds/GEDSConfig.h @@ -16,6 +16,8 @@ #include "Ports.h" +enum class GEDSNodeType { Standard, Storage }; + struct GEDSConfig { /** * @brief The hostname of the metadata service/ @@ -79,14 +81,26 @@ struct GEDSConfig { size_t available_local_memory = 16 * 1024 * 1024 * (size_t)1024; + /** + * @brief Fraction of the storage where GEDS should start spilling. + */ + double storage_spilling_fraction = 0.7; + + /** + * @brief Node type. + */ + GEDSNodeType node_type = GEDSNodeType::Standard; + GEDSConfig(std::string metadataServiceAddressArg) : metadataServiceAddress(std::move(metadataServiceAddressArg)) {} absl::Status set(const std::string &key, const std::string &value); absl::Status set(const std::string &key, size_t value); absl::Status set(const std::string &key, int64_t value); + absl::Status set(const std::string &key, double value); absl::StatusOr getString(const std::string &key) const; absl::StatusOr getUnsignedInt(const std::string &key) const; absl::StatusOr getSignedInt(const std::string &key) const; + absl::StatusOr getDouble(const std::string &key) const; }; diff --git a/src/libgeds/GEDSFileHandle.cpp b/src/libgeds/GEDSFileHandle.cpp index bc1157da..91f888e6 100644 --- a/src/libgeds/GEDSFileHandle.cpp +++ b/src/libgeds/GEDSFileHandle.cpp @@ -120,6 +120,7 @@ absl::Status GEDSFileHandle::download(std::shared_ptr destinatio } pos += *count; } while (pos < *totalSize); + return absl::OkStatus(); } diff --git 
a/src/libgeds/MetadataService.cpp b/src/libgeds/MetadataService.cpp index 82e7b66f..d1d339e5 100644 --- a/src/libgeds/MetadataService.cpp +++ b/src/libgeds/MetadataService.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include "GEDS.h" #include "Logging.h" @@ -136,6 +135,56 @@ absl::StatusOr MetadataService::getConnectionInformation() { return response.remoteaddress(); } +absl::Status MetadataService::configureNode(const std::string &uuid, const std::string &identifier, + uint16_t port, geds::rpc::NodeState state) { + METADATASERVICE_CHECK_CONNECTED; + + geds::rpc::NodeStatus request; + geds::rpc::StatusResponse response; + grpc::ClientContext context; + + auto node = request.mutable_node(); + node->set_identifier(identifier); + node->set_port(port); + + request.set_state(state); + request.set_uuid(uuid); + + auto status = _stub->ConfigureNode(&context, request, &response); + if (!status.ok()) { + return absl::UnavailableError("Unable to execute ConfigureNode: " + status.error_message()); + } + return convertStatus(response); +} + +absl::Status MetadataService::heartBeat(const std::string &uuid, const StorageCounter &storage, + const StorageCounter &memory) { + METADATASERVICE_CHECK_CONNECTED; + + geds::rpc::HeartbeatMessage request; + geds::rpc::StatusResponse response; + grpc::ClientContext context; + + request.set_uuid(uuid); + { + auto lock = memory.getReadLock(); + request.set_memoryallocated(memory.allocated); + request.set_memoryused(memory.used); + } + + { + auto lock = storage.getReadLock(); + request.set_storageused(storage.used); + request.set_storageallocated(storage.allocated); + } + + auto status = _stub->Heartbeat(&context, request, &response); + if (!status.ok()) { + return absl::UnavailableError("Unable to send heart beat: " + status.error_message()); + } + return convertStatus(response); +} + absl::Status MetadataService::createBucket(const std::string_view &bucket) { METADATASERVICE_CHECK_CONNECTED; geds::rpc::Bucket request; diff 
--git a/src/libgeds/MetadataService.h b/src/libgeds/MetadataService.h index ec9772bb..aa7afa48 100644 --- a/src/libgeds/MetadataService.h +++ b/src/libgeds/MetadataService.h @@ -21,7 +21,9 @@ #include "Object.h" #include "ObjectStoreConfig.h" +#include "StorageCounter.h" #include "geds.grpc.pb.h" +#include "geds.pb.h" namespace geds { @@ -43,6 +45,12 @@ class MetadataService { absl::Status disconnect(); + absl::Status configureNode(const std::string &uuid, const std::string &identifier, uint16_t port, + geds::rpc::NodeState state); + + absl::Status heartBeat(const std::string &uuid, const StorageCounter &storage, + const StorageCounter &memory); + absl::StatusOr getConnectionInformation(); absl::Status registerObjectStoreConfig(const ObjectStoreConfig &mapping); diff --git a/src/libgeds/Server.cpp b/src/libgeds/Server.cpp index 663fd29b..6dd9dff7 100644 --- a/src/libgeds/Server.cpp +++ b/src/libgeds/Server.cpp @@ -30,6 +30,7 @@ #include "Platform.h" #include "Ports.h" #include "Status.h" +#include "absl/status/status.h" #include "geds.grpc.pb.h" #include "geds.pb.h" @@ -63,6 +64,39 @@ class ServerImpl final : public geds::rpc::GEDSService::Service { return grpc::Status::OK; } + ::grpc::Status DownloadObjects(::grpc::ServerContext *context, + const ::geds::rpc::MultiObjectID *request, + ::geds::rpc::StatusResponse *response) override { + LOG_INFO(context->peer(), " has requested to pull ", request->objects().size(), " objects."); + + std::vector objects; + const auto &data = request->objects(); + objects.reserve(data.size()); + for (const auto &o : data) { + objects.emplace_back(geds::ObjectID(o.bucket(), o.key())); + } + auto status = _geds->downloadObjects(objects); + convertStatus(response, status); + return grpc::Status::OK; + }; + + ::grpc::Status DeleteObjectsLocally(::grpc::ServerContext *context, + const ::geds::rpc::MultiObjectID *request, + ::geds::rpc::StatusResponse *response) override { + LOG_INFO(context->peer(), " has requested to delete ", 
request->objects().size(), " objects."); + + std::vector objects; + const auto &data = request->objects(); + objects.reserve(data.size()); + for (const auto &o : data) { + objects.emplace_back(geds::ObjectID(o.bucket(), o.key())); + } + + auto status = _geds->purgeLocalObjects(objects); + convertStatus(response, status); + return grpc::Status::OK; + } + public: ServerImpl(std::shared_ptr geds, Server &server) : _geds(geds), _server(server) {} diff --git a/src/libgeds/TcpTransport.cpp b/src/libgeds/TcpTransport.cpp index f703ee35..994fabf7 100644 --- a/src/libgeds/TcpTransport.cpp +++ b/src/libgeds/TcpTransport.cpp @@ -311,7 +311,7 @@ void TcpTransport::tcpTxThread(unsigned int id) { } epoll_wfd[id] = poll_fd; do { - int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, -1); + int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, 500); for (int i = 0; i < cnt; i++) { struct epoll_event *ev = &events[i]; @@ -657,7 +657,7 @@ void TcpTransport::tcpRxThread(unsigned int id) { epoll_rfd[id] = poll_fd; do { - int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, -1); + int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, 500); for (int i = 0; i < cnt; i++) { struct epoll_event *ev = &events[i]; diff --git a/src/metadataservice/CMakeLists.txt b/src/metadataservice/CMakeLists.txt index 07d70ae8..3b67d783 100644 --- a/src/metadataservice/CMakeLists.txt +++ b/src/metadataservice/CMakeLists.txt @@ -6,6 +6,14 @@ set(SOURCES GRPCServer.cpp GRPCServer.h + MDSHttpServer.cpp + MDSHttpServer.h + MDSHttpSession.cpp + MDSHttpSession.h + NodeInformation.cpp + NodeInformation.h + Nodes.cpp + Nodes.h ObjectStoreHandler.cpp ObjectStoreHandler.h S3Helper.cpp @@ -15,7 +23,9 @@ set(SOURCES add_library(libmetadataservice STATIC ${SOURCES}) target_link_libraries(libmetadataservice PUBLIC + magic_enum::magic_enum ${GRPC_LIBRARIES} + ${Boost_LIBRARIES} geds_utility geds_proto geds_s3 diff --git a/src/metadataservice/GRPCServer.cpp b/src/metadataservice/GRPCServer.cpp index 
fae039c5..5bb0358b 100644 --- a/src/metadataservice/GRPCServer.cpp +++ b/src/metadataservice/GRPCServer.cpp @@ -5,6 +5,7 @@ #include "GRPCServer.h" +#include #include #include #include @@ -20,6 +21,8 @@ #include "FormatISO8601.h" #include "Logging.h" +#include "MDSHttpServer.h" +#include "Nodes.h" #include "ObjectStoreConfig.h" #include "ObjectStoreHandler.h" #include "ParseGRPC.h" @@ -37,8 +40,16 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { std::shared_ptr _kvs; ObjectStoreHandler _objectStoreHandler; + Nodes _nodes; + geds::MDSHttpServer _httpServer; + public: - MetadataServiceImpl(std::shared_ptr kvs) : _kvs(kvs) {} + MetadataServiceImpl(std::shared_ptr kvs) : _kvs(kvs), _httpServer(4383, _nodes, _kvs) { + auto status = _httpServer.start(); + if (!status.ok()) { + LOG_ERROR("Unable to start HTTP Server!"); + } + } geds::ObjectID convert(const ::geds::rpc::ObjectID *r) { return geds::ObjectID(r->bucket(), r->key()); @@ -53,6 +64,51 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { } protected: + grpc::Status ConfigureNode(::grpc::ServerContext *context, const ::geds::rpc::NodeStatus *request, + ::geds::rpc::StatusResponse *response) override { + LOG_ACCESS("ConfigureNode"); + const auto &identifier = request->node().identifier(); + uint16_t port = request->node().port(); + const auto state = request->state(); + const auto &uuid = request->uuid(); + + absl::Status status; + if (state == geds::rpc::NodeState::Register) { + status = _nodes.registerNode(uuid, identifier, port); + } else if (state == geds::rpc::NodeState::Unregister) { + std::vector toDecommission = {uuid}; + auto decommissionStatus = _nodes.decommissionNodes(toDecommission, _kvs); + if (!decommissionStatus.ok()) { + LOG_ERROR("Unable to decommission node: ", decommissionStatus.message()); + } + status = _nodes.unregisterNode(uuid); + } else { + LOG_ERROR("Invalid state ", state); + status = absl::InvalidArgumentError("Invalid state: 
" + std::to_string(state)); + } + + convertStatus(response, status); + return grpc::Status::OK; + } + + grpc::Status Heartbeat(::grpc::ServerContext *context, + const ::geds::rpc::HeartbeatMessage *request, + ::geds::rpc::StatusResponse *response) override { + // LOG_ACCESS("Heartbeat: ", request->uuid()); + (void)context; + + const auto &uuid = request->uuid(); + NodeHeartBeat val; + val.memoryAllocated = request->memoryallocated(); + val.memoryUsed = request->memoryused(); + val.storageAllocated = request->storageallocated(); + val.storageUsed = request->storageused(); + + auto status = _nodes.heartbeat(uuid, std::move(val)); + convertStatus(response, status); + return grpc::Status::OK; + } + grpc::Status GetConnectionInformation(::grpc::ServerContext *context, const ::geds::rpc::EmptyParams * /* unused request */, ::geds::rpc::ConnectionInformation *response) override { diff --git a/src/metadataservice/MDSHttpServer.cpp b/src/metadataservice/MDSHttpServer.cpp new file mode 100644 index 00000000..4b666c79 --- /dev/null +++ b/src/metadataservice/MDSHttpServer.cpp @@ -0,0 +1,69 @@ +/** + * Copyright 2022- IBM Inc. 
All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "MDSHttpServer.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Logging.h" +#include "MDSHttpSession.h" +#include "Nodes.h" + +namespace geds { + +MDSHttpServer::MDSHttpServer(uint16_t port, Nodes &nodes, std::shared_ptr kvs) + : _port(port), _nodes(nodes), _kvs(kvs) {} + +absl::Status MDSHttpServer::start() { + if (_acceptor != nullptr) { + return absl::UnknownError("The server is already running!"); + } + try { + auto host = boost::asio::ip::make_address("0.0.0.0"); + _acceptor = std::unique_ptr( + new boost::asio::ip::tcp::acceptor(_ioContext, {host, _port})); + _thread = std::thread([&] { + accept(); + _ioContext.run(); + }); + } catch (boost::exception &e) { + // Workaround until GEDS properly supports multiple processes. + auto diag = boost::diagnostic_information(e, false); + return absl::InternalError("Unable to start webserver: " + diag); + } + return absl::OkStatus(); +} + +void MDSHttpServer::stop() { + _ioContext.stop(); + _acceptor->close(); + _thread.join(); +} + +void MDSHttpServer::accept() { + _acceptor->async_accept( + boost::asio::make_strand(_ioContext), + [&](boost::beast::error_code ec, boost::asio::ip::tcp::socket socket) { + if (ec) { + LOG_ERROR("Unable to accept ", ec.message(), " ABORT."); + return; + } + std::make_shared(std::move(socket), _nodes, _kvs)->start(); + accept(); + }); +} + +} // namespace geds diff --git a/src/metadataservice/MDSHttpServer.h b/src/metadataservice/MDSHttpServer.h new file mode 100644 index 00000000..a32a6208 --- /dev/null +++ b/src/metadataservice/MDSHttpServer.h @@ -0,0 +1,39 @@ +/** + * Copyright 2022- IBM Inc. 
All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include + +#include +#include + +#include "MDSHttpSession.h" +#include "MDSKVS.h" + +class Nodes; + +namespace geds { + +class MDSHttpServer { + uint16_t _port; + + boost::asio::io_context _ioContext{1}; + std::unique_ptr _acceptor = nullptr; + std::thread _thread; + + Nodes &_nodes; + std::shared_ptr _kvs; + +public: + MDSHttpServer(uint16_t port, Nodes &nodes, std::shared_ptr kvs); + absl::Status start(); + void stop(); + +private: + void accept(); +}; + +} // namespace geds diff --git a/src/metadataservice/MDSHttpSession.cpp b/src/metadataservice/MDSHttpSession.cpp new file mode 100644 index 00000000..c9bef3b3 --- /dev/null +++ b/src/metadataservice/MDSHttpSession.cpp @@ -0,0 +1,309 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "MDSHttpSession.h" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "Logging.h" +#include "MDSKVS.h" +#include "MDSKVSBucket.h" +#include "Nodes.h" +#include "Statistics.h" + +void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, + const std::shared_ptr &n) { + auto &nv = jv.emplace_array(); + n->forall([&nv](const utility::Path &key, const geds::ObjectInfo &info) { + nv.push_back({key.name, + {{"location", info.location}, + {"size", info.size}, + {"metadata", info.metadata.has_value() + ? 
(std::to_string(info.metadata->size()) + " bytes") + : std::string{""}}}}); + }); +} + +void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, + const std::shared_ptr &n) { + auto &nv = jv.emplace_array(); + auto buckets = n->listBuckets(); + if (!buckets.ok()) { + jv = nv; + return; + } + for (const auto &bucket : *buckets) { + auto objs = n->getBucket(bucket); + if (!objs.ok()) { + continue; + } + auto b = *objs; + auto value = boost::json::value_from(b); + nv.push_back({bucket, value}); + } + jv = nv; +} + +namespace geds { + +MDSHttpSession::MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes, + std::shared_ptr kvs) + : _stream(std::move(socket)), _nodes(nodes), _kvs(kvs) {} + +MDSHttpSession::~MDSHttpSession() { close(); } + +void MDSHttpSession::start() { + LOG_DEBUG("Start connection"); + auto self = shared_from_this(); + boost::asio::dispatch(_stream.get_executor(), + boost::beast::bind_front_handler(&MDSHttpSession::awaitRequest, self)); +} + +void MDSHttpSession::awaitRequest() { + auto self = shared_from_this(); + _request = {}; + _stream.expires_after(std::chrono::seconds(10)); + + boost::beast::http::async_read( + _stream, _buffer, _request, + [self](boost::beast::error_code ec, std::size_t /* bytes_transferred */) { + if (ec == boost::beast::http::error::end_of_stream) { + return; + } + if (ec) { + LOG_ERROR("Failed reading stream", ec.message()); + return; + } + self->handleRequest(); + }); +} + +void MDSHttpSession::prepareHtmlReply() { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "text/html"); + _response.keep_alive(_request.keep_alive()); + + boost::beast::ostream(_response.body()) << "" + << "" + << "GEDS Service" + << "" + << "" + << "
"
+                                          << "GEDS Metadata Service\n"
+                                          << "=====================\n"
+                                          << "\n";
+
+  const auto &info = _nodes.information();
+  if (info.size()) {
+    boost::beast::ostream(_response.body()) << "Registered nodes:\n";
+    info.forall([&](const std::shared_ptr &node) {
+      const auto &[heartBeat, ts] = node->lastHeartBeat();
+      boost::beast::ostream(_response.body())
+          << " - " << node->uuid << ": "                                 //
+          << node->host << ":" << node->port << " "                      //
+          << std::string{magic_enum::enum_name(node->state())} << " -- " //
+          << "Allocated: " << heartBeat.storageAllocated << " "          //
+          << "Used: " << heartBeat.storageUsed << " "                    //
+          << "Memory Allocated: " << heartBeat.memoryAllocated << " "    //
+          << "Memory Used: " << heartBeat.memoryUsed << "\n";
+    });
+  }
+  boost::beast::ostream(_response.body()) << "
" + << "" << std::endl; + boost::beast::ostream(_response.body()) << "\n"; + handleWrite(); +} + +void MDSHttpSession::prepareApiListReply() { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "application/json"); + _response.keep_alive(_request.keep_alive()); + + auto data = boost::json::value_from(_kvs); + boost::beast::ostream(_response.body()) << boost::json::serialize(data); + boost::beast::ostream(_response.body()) << "\n"; + handleWrite(); +} + +void MDSHttpSession::prepareApiNodesReply() { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "application/json"); + _response.keep_alive(_request.keep_alive()); + + auto data = boost::json::value_from(_nodes); + boost::beast::ostream(_response.body()) << boost::json::serialize(data); + boost::beast::ostream(_response.body()) << "\n"; + handleWrite(); +} + +void MDSHttpSession::prepareApiDecommissionReply(const std::string &body) { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "application/json"); + _response.keep_alive(_request.keep_alive()); + + boost::json::error_code ec; + auto parsed = boost::json::parse(body, ec); + if (ec) { + return prepareError(boost::beast::http::status::bad_request, ec.message()); + } + if (!parsed.is_array()) { + return prepareError(boost::beast::http::status::bad_request, "Expected array!"); + } + + std::vector hostsToDecommission; + for (const auto &value : parsed.as_array()) { + if (!value.is_string()) { + return prepareError(boost::beast::http::status::bad_request, "Unexpected element in array!"); + } + hostsToDecommission.push_back(boost::json::value_to(value)); + } + 
+ auto status = _nodes.decommissionNodes(hostsToDecommission, _kvs); + if (!status.ok()) { + return prepareError(boost::beast::http::status::internal_server_error, + std::string{status.message()}); + } + + boost::beast::ostream(_response.body()) << R"({"status": "success", "nodes": )" << body << "}"; + boost::beast::ostream(_response.body()) << "\n"; + handleWrite(); +} + +void MDSHttpSession::prepareApiReregisterReply(const std::string &body) { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "application/json"); + _response.keep_alive(_request.keep_alive()); + + // Parse body + boost::json::error_code ec; + auto parsed = boost::json::parse(body, ec); + if (ec) { + return prepareError(boost::beast::http::status::bad_request, ec.message()); + } + if (!parsed.is_array()) { + return prepareError(boost::beast::http::status::bad_request, "Expected array!"); + } + + // Send reply + size_t count = 0; + boost::beast::ostream(_response.body()) << R"({"status": "success", "nodes": [)"; + for (const auto &value : parsed.as_array()) { + if (!value.is_string()) { + continue; + } + auto host = boost::json::value_to(value); + auto status = _nodes.reregisterNode(host); + if (status.ok()) { + if (count > 1) { + boost::beast::ostream(_response.body()) << ", "; + } + boost::beast::ostream(_response.body()) << "\"" << host << "\""; + count++; + } + } + + boost::beast::ostream(_response.body()) << "\n"; + handleWrite(); +} + +void MDSHttpSession::handleRequest() { + if (_request.target().empty() || _request.target()[0] != '/') { + return prepareError(boost::beast::http::status::bad_request, "Invalid path."); + } + + if (_request.method() == boost::beast::http::verb::get) { + if (_request.target() == "/") { + return prepareHtmlReply(); + } + if (_request.target() == "/api/list") { + return prepareApiListReply(); + } + if (_request.target() == 
"/api/nodes") { + return prepareApiNodesReply(); + } + if (_request.target() == "/metrics") { + return prepareMetricsReply(); + } + return prepareError(boost::beast::http::status::not_found, "Invalid path"); + } + if (_request.method() == boost::beast::http::verb::post) { + auto body = boost::beast::buffers_to_string(_request.body().data()); + if (_request.target() == "/api/decommission") { + return prepareApiDecommissionReply(body); + } + if (_request.target() == "/api/reregister") { + return prepareApiReregisterReply(body); + } + return prepareError(boost::beast::http::status::not_found, "Invalid path"); + } + return prepareError(boost::beast::http::status::bad_request, "Invalid method."); +} + +void MDSHttpSession::prepareMetricsReply() { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "plain/text"); + _response.keep_alive(_request.keep_alive()); + + std::stringstream stream; + Statistics::get().prometheusMetrics(stream); + boost::beast::ostream(_response.body()) << stream.str(); + boost::beast::ostream(_response.body()) << "\n"; + handleWrite(); +} + +void MDSHttpSession::prepareError(boost::beast::http::status status, std::string message) { + _response.result(status); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "text/html"); + _response.keep_alive(_request.keep_alive()); + boost::beast::ostream(_response.body()) << message << "\n"; + + return handleWrite(); +} + +void MDSHttpSession::handleWrite() { + auto self = shared_from_this(); + _response.content_length(_response.body().size()); + + boost::beast::http::async_write( + _stream, _response, + [self](boost::beast::error_code ec, std::size_t /* unused bytesTransferred */) { + if (ec) { + LOG_ERROR("Error ", ec.message()); + return; + } + if (self->_request.keep_alive()) { + 
self->awaitRequest(); + } + self->_buffer.clear(); + }); +} + +void MDSHttpSession::close() { + LOG_DEBUG("Closing connection"); + + boost::beast::error_code ec; + _stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec); + _stream.socket().close(); +} + +} // namespace geds diff --git a/src/metadataservice/MDSHttpSession.h b/src/metadataservice/MDSHttpSession.h new file mode 100644 index 00000000..272f5024 --- /dev/null +++ b/src/metadataservice/MDSHttpSession.h @@ -0,0 +1,54 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "MDSKVS.h" + +class Nodes; + +namespace geds { + +class MDSHttpSession : public std::enable_shared_from_this { + boost::beast::tcp_stream _stream; + boost::beast::flat_buffer _buffer{4096}; + boost::beast::http::request _request; + boost::beast::http::response _response; + + Nodes &_nodes; + std::shared_ptr _kvs; + +public: + MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes, std::shared_ptr kvs); + ~MDSHttpSession(); + void start(); + + void awaitRequest(); + void handleRequest(); + void prepareHtmlReply(); + void prepareMetricsReply(); + void prepareApiListReply(); + void prepareApiNodesReply(); + void prepareApiDecommissionReply(const std::string &body); + void prepareApiReregisterReply(const std::string &body); + void prepareError(boost::beast::http::status status, std::string message); + void handleWrite(); + + void close(); +}; + +} // namespace geds diff --git a/src/metadataservice/NodeInformation.cpp b/src/metadataservice/NodeInformation.cpp new file mode 100644 index 00000000..a4d4dfa6 --- /dev/null +++ b/src/metadataservice/NodeInformation.cpp @@ -0,0 +1,151 @@ +/** + * Copyright 2022- IBM Inc. 
All rights reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include "NodeInformation.h"
+
+#include "Logging.h"
+#include "Status.h"
+
+// Stores the identity of the node. No connection is opened until connect() is called.
+NodeInformation::NodeInformation(std::string uuid, std::string hostArg, uint16_t port)
+    : uuid(std::move(uuid)), host(std::move(hostArg)), port(port) {}
+
+// Opens an insecure gRPC channel to host:port and creates the service stub.
+//
+// Returns Ok if a channel already exists or the connection was established, and
+// Unavailable if the channel did not connect within 10 seconds or an exception was
+// thrown while connecting.
+// NOTE(review): the write lock is held while waiting for the connection, so callers
+// that take the read lock (e.g. state()) presumably block for up to 10 seconds.
+absl::Status NodeInformation::connect() {
+  auto channelLock = std::lock_guard(_connectionMutex);
+  auto lock = getWriteLock();
+  if (_channel.get() != nullptr) {
+    // Already connected.
+    return absl::OkStatus();
+  }
+  auto loc = host + ":" + std::to_string(port);
+  try {
+    _channel = grpc::CreateChannel(loc, grpc::InsecureChannelCredentials());
+    auto success =
+        _channel->WaitForConnected(std::chrono::system_clock::now() + std::chrono::seconds(10));
+    if (!success) {
+      // Destroy channel.
+      _channel = nullptr;
+      return absl::UnavailableError("Could not connect to " + loc + ".");
+    }
+    _stub = geds::rpc::GEDSService::NewStub(_channel);
+  } catch (const std::exception &e) {
+    _channel = nullptr;
+    return absl::UnavailableError("Could not open channel with " + loc + ". Reason: " + e.what());
+  }
+  return absl::OkStatus();
+}
+
+// Destroys the stub and the channel. Returns Ok even if there was no connection.
+absl::Status NodeInformation::disconnect() {
+  auto channelLock = std::lock_guard(_connectionMutex);
+  auto lock = getWriteLock();
+  if (_channel.get() == nullptr) {
+    // Already disconnected.
+    return absl::OkStatus();
+  }
+  // reset() destroys the stub; release() would only give up ownership and leak it.
+  _stub.reset();
+  _channel = nullptr;
+  return absl::OkStatus();
+}
+
+void NodeInformation::setState(NodeState state) {
+  auto lock = getWriteLock();
+  _state = state;
+}
+NodeState NodeInformation::state() const {
+  auto lock = getReadLock();
+  return _state;
+}
+
+// Returns "geds://<host>:<port>".
+std::string NodeInformation::gedsHostUri() const {
+  return "geds://" + host + ":" + std::to_string(port);
+}
+
+// Stores `heartBeat` and records the current time as the last check-in.
+void NodeInformation::updateHeartBeat(const NodeHeartBeat &heartBeat) {
+  auto lock = getWriteLock();
+  _heartBeat = heartBeat;
+  _lastCheckin = std::chrono::system_clock::now();
+}
+
+// Returns the last heartbeat together with the time it was recorded.
+std::tuple>
+NodeInformation::lastHeartBeat() const {
+  auto lock = getReadLock();
+  return std::make_tuple(_heartBeat, _lastCheckin);
+}
+
+std::chrono::time_point NodeInformation::lastCheckin() {
+  auto lock = getReadLock();
+  return _lastCheckin;
+}
+
+// Asks the node to download `objects` (DownloadObjects RPC).
+//
+// Returns Unavailable if another connection operation holds the connection mutex or
+// if the node is not connected, Unknown if the RPC itself fails, and otherwise the
+// converted status from the response.
+absl::Status
+NodeInformation::downloadObjects(const std::vector> &objects) {
+  // RAII lock: a bare try_lock() was never followed by unlock(), which left the mutex
+  // locked forever after the first call.
+  auto channelLock = std::unique_lock(_connectionMutex, std::try_to_lock);
+  if (!channelLock.owns_lock()) {
+    return absl::UnavailableError("Unable to pull objects: Lock is unavailable!");
+  }
+  // The stub only exists after a successful connect().
+  if (_stub == nullptr) {
+    return absl::UnavailableError("Unable to pull objects: Not connected to " + host + ":" +
+                                  std::to_string(port) + "!");
+  }
+
+  geds::rpc::MultiObjectID request;
+  geds::rpc::StatusResponse response;
+  grpc::ClientContext context;
+
+  for (const auto &o : objects) {
+    auto obj = request.add_objects();
+    obj->set_bucket(o->bucket);
+    obj->set_key(o->key);
+  }
+
+  auto status = _stub->DownloadObjects(&context, request, &response);
+  if (!status.ok()) {
+    LOG_ERROR("Unable to execute object pull grpc call, status: ", status.error_code(), " ",
+              status.error_message());
+    return absl::UnknownError("Unable to execute object pull!");
+  }
+
+  return convertStatus(response);
+}
+
+// Asks the node to delete its local copies of `objects` (DeleteObjectsLocally RPC).
+//
+// Returns Unavailable if another connection operation holds the connection mutex or
+// if the node is not connected, Unknown if the RPC itself fails, and otherwise the
+// converted status from the response.
+absl::Status
+NodeInformation::purgeLocalObjects(const std::vector> &objects) {
+  // RAII lock, see downloadObjects().
+  auto channelLock = std::unique_lock(_connectionMutex, std::try_to_lock);
+  if (!channelLock.owns_lock()) {
+    return absl::UnavailableError("Unable to purge objects: Lock is unavailable!");
+  }
+  // The stub only exists after a successful connect().
+  if (_stub == nullptr) {
+    return absl::UnavailableError("Unable to purge objects: Not connected to " + host + ":" +
+                                  std::to_string(port) + "!");
+  }
+
+  geds::rpc::MultiObjectID request;
+  geds::rpc::StatusResponse response;
+  grpc::ClientContext context;
+
+  for (const auto &o : objects) {
+    auto obj = request.add_objects();
+    obj->set_bucket(o->bucket);
+    obj->set_key(o->key);
+  }
+
+  auto status = _stub->DeleteObjectsLocally(&context, request, &response);
+  if (!status.ok()) {
+    LOG_ERROR("Unable to execute object delete grpc call, status: ", status.error_code(), " ",
+              status.error_message());
+    return absl::UnknownError("Unable to execute object delete!");
+  }
+
+  return convertStatus(response);
+}
+
+// JSON view of a node: identity, last heartbeat (plus derived free storage/memory),
+// time of the last check-in and the name of the current state.
+void tag_invoke(boost::json::value_from_tag, boost::json::value &jv,
+                std::shared_ptr const &n) {
+  const auto &[h, timeStamp] = n->lastHeartBeat();
+
+  // Clamp to zero so that "used > allocated" does not wrap around.
+  auto storageFree = h.storageUsed > h.storageAllocated ? 0 : h.storageAllocated - h.storageUsed;
+  auto memoryFree = h.memoryUsed > h.memoryAllocated ? 0 : h.memoryAllocated - h.memoryUsed;
+
+  jv = {{"uuid", n->uuid},
+        {"host", n->host},
+        {"port", n->port},
+        {"storageAllocated", h.storageAllocated},
+        {"storageUsed", h.storageUsed},
+        {"storageFree", storageFree},
+        {"memoryAllocated", h.memoryAllocated},
+        {"memoryUsed", h.memoryUsed},
+        {"memoryFree", memoryFree},
+        {"lastCheckIn", toISO8601String(timeStamp)},
+        {"state", std::string{magic_enum::enum_name(n->state())}}};
+}
diff --git a/src/metadataservice/NodeInformation.h b/src/metadataservice/NodeInformation.h
new file mode 100644
index 00000000..f2803c1e
--- /dev/null
+++ b/src/metadataservice/NodeInformation.h
@@ -0,0 +1,84 @@
+/**
+ * Copyright 2023- IBM Inc. 
All rights reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+#include "FormatISO8601.h"
+#include "RWConcurrentObjectAdaptor.h"
+#include "boost/json/detail/value_from.hpp" // NOTE(review): `detail` header - confirm it is intended.
+#include "geds.grpc.pb.h"
+
+enum class NodeState { Registered, Decommissioning, ReadyForShutdown, Unknown }; // Per-node state.
+
+struct NodeHeartBeat { // Resource figures stored by updateHeartBeat(); all default to 0.
+  size_t memoryAllocated{0};
+  size_t memoryUsed{0};
+  size_t storageAllocated{0};
+  size_t storageUsed{0};
+};
+
+struct RelocatableObject { // Identifies one object (bucket/key) together with its size.
+  const std::string &bucket; // NOTE(review): reference - the referenced string must outlive this.
+  std::string key;
+  size_t size;
+};
+
+class NodeInformation : public utility::RWConcurrentObjectAdaptor {
+  std::mutex _connectionMutex; // Taken by connect()/disconnect() around _channel and _stub.
+  std::shared_ptr _channel{nullptr};
+  std::unique_ptr _stub{nullptr};
+
+  // Subject to state mutex
+  NodeState _state{NodeState::Unknown};
+  NodeHeartBeat _heartBeat;
+  std::chrono::time_point _lastCheckin;
+  // End subject to state mutex
+public:
+  NodeInformation(std::string uuid, std::string host, uint16_t port);
+  NodeInformation(NodeInformation &) = delete; // Neither copyable nor movable.
+  NodeInformation(NodeInformation &&) = delete;
+  NodeInformation &operator=(NodeInformation &) = delete;
+  NodeInformation &operator=(NodeInformation &&) = delete;
+
+  absl::Status connect();
+  absl::Status disconnect();
+
+  // Subject to state mutex
+  absl::Status queryHeartBeat(); // NOTE(review): no definition visible here - confirm it exists.
+
+  const std::string uuid;
+  const std::string host;
+  const uint16_t port;
+
+  void setState(NodeState state);
+  NodeState state() const;
+
+  std::string gedsHostUri() const;
+
+  void updateHeartBeat(const NodeHeartBeat &heartBeat);
+  std::tuple>
+  lastHeartBeat() const;
+  std::chrono::time_point lastCheckin(); // NOTE(review): could presumably be const like state().
+  // End subject to state mutex
+
+  absl::Status downloadObjects(const std::vector> &objects);
+  absl::Status purgeLocalObjects(const std::vector> &objects);
+};
+
+void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, // boost::json::value_from hook.
+                std::shared_ptr const &n);
diff --git
a/src/metadataservice/Nodes.cpp b/src/metadataservice/Nodes.cpp
new file mode 100644
index 00000000..8945ace7
--- /dev/null
+++ b/src/metadataservice/Nodes.cpp
@@ -0,0 +1,228 @@
+/**
+ * Copyright 2023- IBM Inc. All rights reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include "Nodes.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include "Logging.h"
+#include "NodeInformation.h"
+#include "absl/status/status.h"
+#include "geds.grpc.pb.h"
+
+// Registers the node `uuid` reachable at host:port.
+//
+// A uuid that is already known is only accepted if the existing entry is in state
+// Decommissioning; the entry is then replaced. Otherwise AlreadyExists is returned.
+// A failed connection attempt is logged but does not fail the registration; the node
+// is only marked as Registered once the connection has been established.
+absl::Status Nodes::registerNode(const std::string &uuid, const std::string &host, uint16_t port) {
+  auto val = std::make_shared(uuid, host, port);
+  auto existing = _nodes.insertOrExists(uuid, val);
+  const bool replacesExisting = existing.get() != val.get();
+  if (replacesExisting && existing->state() != NodeState::Decommissioning) {
+    auto message = "Node " + uuid + " was already registered!";
+    LOG_ERROR(message);
+    return absl::AlreadyExistsError(message);
+  }
+
+  auto connect = val->connect();
+  if (connect.ok()) {
+    // Also done when a decommissioning node is replaced: previously the replacement
+    // stayed in state Unknown and was never considered as a relocation target.
+    val->setState(NodeState::Registered);
+  } else {
+    LOG_ERROR(connect.message());
+  }
+  if (replacesExisting) {
+    // Allow reregistering decommissioned nodes.
+    _nodes.insertOrReplace(uuid, val);
+  }
+  return absl::OkStatus();
+}
+
+// Sets the state of the node `uuid` back to Registered. Returns NotFound if the node
+// is unknown.
+absl::Status Nodes::reregisterNode(const std::string &uuid) {
+  auto exists = _nodes.get(uuid);
+  if (!exists.has_value()) {
+    return absl::NotFoundError("Node " + uuid + " does not exist!");
+  }
+  (*exists)->setState(NodeState::Registered);
+  return absl::OkStatus();
+}
+
+// Removes the node `uuid`. Returns NotFound if the node is unknown.
+absl::Status Nodes::unregisterNode(const std::string &uuid) {
+  auto removed = _nodes.getAndRemove(uuid);
+  // has_value() instead of value(): value() throws on an empty result instead of
+  // reaching the NotFound path (same check as in reregisterNode).
+  if (!removed.has_value()) {
+    auto message = "Unable to remove " + uuid + " not found!";
+    LOG_ERROR(message);
+    return absl::NotFoundError(message);
+  }
+  return absl::OkStatus();
+}
+
+// Records `heartbeat` for the node `uuid`. Returns NotFound if the node is unknown.
+absl::Status Nodes::heartbeat(const std::string &uuid, const NodeHeartBeat &heartbeat) {
+  auto val = _nodes.get(uuid);
+  // has_value() instead of value(), see unregisterNode().
+  if (!val.has_value()) {
+    auto message = "Unable to process heartbeat " + uuid + " not found!";
+    LOG_ERROR(message);
+    return absl::NotFoundError(message);
+  }
+  (*val)->updateHeartBeat(heartbeat);
+  return absl::OkStatus();
+}
+
+// Decommissions `nodes` by relocating the objects they hold to the remaining
+// registered nodes.
+//
+// Steps:
+//  1. Resolve all nodes and mark them as Decommissioning.
+//  2. Collect the objects in `kvs` that are located on these nodes (cached blocks are
+//     skipped) and sort them by size, largest first.
+//  3. Assign the objects to the registered nodes based on the free storage of their
+//     last heartbeat and ask every target to download its share in parallel.
+//  4. If everything was relocated, mark the nodes as ReadyForShutdown.
+//
+// Returns Unavailable if a decommission is already running, a node is unknown or
+// objects need to be relocated but no target exists; ResourceExhausted if objects do
+// not fit on any target; Unknown if a target failed to download its objects. In the
+// last two cases the nodes stay in state Decommissioning.
+absl::Status Nodes::decommissionNodes(const std::vector &nodes,
+                                      std::shared_ptr kvs) {
+  auto lock = std::unique_lock(_isDecommissioning, std::try_to_lock);
+  if (!lock.owns_lock()) {
+    return absl::UnavailableError("Already decommissioning.");
+  }
+
+  // Resolve all nodes first so that an unknown entry does not leave the nodes listed
+  // before it in state Decommissioning, then determine the host uris to collect
+  // objects for.
+  std::vector gedsHostUris;
+  std::vector> hostsToDecommision;
+  for (const auto &node : nodes) {
+    auto existing = _nodes.get(node);
+    if (!existing.has_value()) {
+      return absl::UnavailableError("Unable to decommission node: " + node +
+                                    " since it does not exist!");
+    }
+    hostsToDecommision.emplace_back(*existing);
+    gedsHostUris.emplace_back((*existing)->gedsHostUri());
+  }
+  // Mark hosts as decommissioning.
+  for (auto &host : hostsToDecommision) {
+    host->setState(NodeState::Decommissioning);
+  }
+
+  // Find all buckets.
+  auto buckets = kvs->listBuckets();
+  if (!buckets.ok()) {
+    LOG_ERROR("Unable to list buckets when decommissioning: ", buckets.status().message());
+    return buckets.status();
+  }
+
+  // Find all objects that need to be relocated.
+  std::vector> objects;
+  for (const auto &bucketName : *buckets) {
+    auto bucket = kvs->getBucket(bucketName);
+    if (!bucket.ok()) {
+      continue;
+    }
+    (*bucket)->forall([&objects, &gedsHostUris, &bucketName](const utility::Path &path,
+                                                             const geds::ObjectInfo &obj) {
+      // Do not relocate cached blocks.
+      if (path.name.starts_with("_$cachedblock$/")) {
+        return;
+      }
+      for (const auto &uri : gedsHostUris) {
+        if (obj.location == uri) {
+          objects.emplace_back(
+              new RelocatableObject{.bucket = bucketName, .key = path.name, .size = obj.size});
+          // Add every object only once, even if a node was passed multiple times.
+          break;
+        }
+      }
+    });
+  }
+
+  // Largest objects first.
+  std::sort(objects.begin(), objects.end(),
+            [](const std::shared_ptr &a,
+               const std::shared_ptr &b) { return a->size > b->size; });
+
+  // List all available nodes.
+  struct NodeTargetInfo {
+    std::shared_ptr node;
+    std::vector> objects;
+    size_t available; // Free storage according to the last heartbeat.
+    size_t target;    // Sum of the object sizes assigned so far.
+  };
+  std::vector> targetNodes;
+  _nodes.forall([&targetNodes](std::shared_ptr &info) {
+    if (info->state() == NodeState::Registered) {
+      const auto [hb, _] = info->lastHeartBeat();
+
+      size_t storageAvailable =
+          (hb.storageUsed > hb.storageAllocated) ? 0 : hb.storageAllocated - hb.storageUsed;
+      targetNodes.emplace_back(new NodeTargetInfo{
+          .node = info, .objects = {}, .available = storageAvailable, .target = 0});
+    }
+  });
+  // Targets are only required if there is something to relocate.
+  if (!objects.empty() && targetNodes.empty()) {
+    return absl::UnavailableError("No target nodes available!");
+  }
+
+  // Sort target nodes based on available size: most remaining space first, ties are
+  // broken by the total available space. The comparison is lexicographic since
+  // std::sort requires a strict weak ordering, which `x > y || u > v` does not give.
+  auto targetNodeCompare = [](const std::shared_ptr &a,
+                              const std::shared_ptr &b) {
+    auto aAvail = (a->available > a->target) ? a->available - a->target : 0;
+    auto bAvail = (b->available > b->target) ? b->available - b->target : 0;
+    if (aAvail != bAvail) {
+      return aAvail > bAvail;
+    }
+    return a->available > b->available;
+  };
+  std::sort(targetNodes.begin(), targetNodes.end(), targetNodeCompare);
+
+  // Simple binpacking by filling up the empty nodes.
+  std::vector> nonRelocatable;
+  for (auto &obj : objects) {
+    bool foundTarget = false;
+    for (auto &target : targetNodes) {
+      if (target->target + obj->size < target->available) {
+        foundTarget = true;
+        target->objects.push_back(obj);
+        target->target += obj->size;
+        break;
+      }
+    }
+    if (!foundTarget) {
+      LOG_ERROR("Unable to relocate ", obj->bucket, "/", obj->key, " (", obj->size, " bytes)");
+      nonRelocatable.push_back(obj);
+    }
+  }
+
+  // One download thread per target that has objects assigned.
+  std::atomic failures{0};
+  std::vector threads;
+  threads.reserve(targetNodes.size());
+  for (auto &target : targetNodes) {
+    if (target->objects.empty()) {
+      continue;
+    }
+    threads.emplace_back([target, &failures] {
+      auto status = target->node->downloadObjects(target->objects);
+      if (!status.ok()) {
+        failures += 1;
+        LOG_ERROR("Unable to relocate objects to ", target->node->host,
+                  " uuid: ", target->node->uuid);
+        return;
+      }
+    });
+  }
+  for (auto &thread : threads) {
+    thread.join();
+  }
+
+  // Report incomplete relocations instead of returning Ok. The nodes stay in state
+  // Decommissioning since they still hold objects that exist nowhere else.
+  if (failures != 0) {
+    return absl::UnknownError("Unable to relocate objects to all target nodes!");
+  }
+  if (!nonRelocatable.empty()) {
+    return absl::ResourceExhaustedError(
+        "Unable to relocate all objects: Not enough space on the target nodes!");
+  }
+
+  for (auto &host : hostsToDecommision) {
+    host->setState(NodeState::ReadyForShutdown);
+  }
+
+  return absl::OkStatus();
+}
+
+// JSON view of the registry: {"nodes": [<node>, ...]}.
+void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, Nodes const &n) {
+  boost::json::array nv;
+  const auto &info = n.information();
+  info.forall([&nv](const std::shared_ptr &node) {
+    nv.emplace_back(boost::json::value_from(node));
+  });
+  jv = {{"nodes", nv}};
+}
+
+// Pointer overload, forwards to the overload above.
+void tag_invoke(boost::json::value_from_tag t, boost::json::value &jv,
+                std::shared_ptr const &n) {
+  tag_invoke(t, jv, *n);
+}
diff --git a/src/metadataservice/Nodes.h b/src/metadataservice/Nodes.h
new file mode 100644
index 00000000..5b6df20a
--- /dev/null
+++ b/src/metadataservice/Nodes.h
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2023- IBM Inc. 
All rights reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include "ConcurrentMap.h"
+#include "MDSKVS.h"
+#include "NodeInformation.h"
+#include "geds.grpc.pb.h"
+
+class Nodes { // Registry of the nodes known to the metadata service.
+  mutable std::shared_mutex _mutex; // NOTE(review): not used in the code visible here - confirm.
+
+  utility::ConcurrentMap> _nodes;
+
+  std::mutex _isDecommissioning; // Try-locked by decommissionNodes() for the whole call.
+
+public:
+  Nodes() = default;
+  Nodes(Nodes &) = delete; // Neither copyable nor movable.
+  Nodes(Nodes &&) = delete;
+  Nodes &operator=(Nodes &) = delete;
+  Nodes &operator=(Nodes &&) = delete;
+  ~Nodes() = default;
+
+  const utility::ConcurrentMap> &information() const { // Read-only view.
+    return _nodes;
+  };
+
+  absl::Status registerNode(const std::string &uuid, const std::string &host, uint16_t port);
+  absl::Status reregisterNode(const std::string &uuid);
+  absl::Status unregisterNode(const std::string &uuid);
+  absl::Status heartbeat(const std::string &uuid, const NodeHeartBeat &heartbeat);
+  absl::Status decommissionNodes(const std::vector &nodes,
+                                 std::shared_ptr kvs);
+
+  std::string toJson() const; // NOTE(review): no definition visible in Nodes.cpp - confirm.
+};
+
+void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, Nodes const &n);
+
+void tag_invoke(boost::json::value_from_tag t, boost::json::value &jv,
+                std::shared_ptr const &n);
diff --git a/src/protos/geds.proto b/src/protos/geds.proto
index 9255177c..0a4057c4 100644
--- a/src/protos/geds.proto
+++ b/src/protos/geds.proto
@@ -30,6 +30,7 @@ message ObjectID {
   string bucket = 1;
   string key = 2;
 }
+message MultiObjectID { repeated ObjectID objects = 1; }
 
 message ObjectInfo {
   string location = 1;
@@ -66,7 +67,35 @@ message ObjectResponse {
   optional StatusResponse error = 2;
 }
 
+enum NodeState { // Carried in NodeStatus; values differ from the C++ `enum class NodeState`.
+  Register = 0;
+  Unregister = 1;
+  PrepareDecommission = 2;
+  ReadyForShutdown = 3;
+}
+
+message NodeIdentifier {
+  string identifier = 1;
+  uint32 port = 2;
+}
+
+message NodeStatus { // Argument of MetadataService.ConfigureNode.
+  NodeIdentifier node = 1;
+  NodeState state = 2;
+  string uuid = 3;
+}
+
+message HeartbeatMessage { // Argument of MetadataService.Heartbeat.
+  string uuid = 1;
+  uint64 memoryAllocated = 2;
+  uint64 memoryUsed = 3;
+  uint64 storageAllocated = 4;
+  uint64 storageUsed = 5;
+}
+
 service MetadataService {
+  rpc ConfigureNode(NodeStatus) returns (StatusResponse);
+  rpc Heartbeat(HeartbeatMessage) returns (StatusResponse);
   rpc GetConnectionInformation(EmptyParams) returns (ConnectionInformation);
   rpc RegisterObjectStore(ObjectStoreConfig) returns (StatusResponse);
   rpc ListObjectStores(EmptyParams) returns (AvailableObjectStoreConfigs);
@@ -95,4 +124,8 @@ message TransportEndpoint {
 
 message AvailTransportEndpoints { repeated TransportEndpoint endpoint = 1; }
 
-service GEDSService { rpc GetAvailEndpoints(EmptyParams) returns (AvailTransportEndpoints); }
+service GEDSService {
+  rpc GetAvailEndpoints(EmptyParams) returns (AvailTransportEndpoints);
+  rpc DownloadObjects(MultiObjectID) returns (StatusResponse); // Used by NodeInformation.
+  rpc DeleteObjectsLocally(MultiObjectID) returns (StatusResponse); // Used by NodeInformation.
+}
diff --git a/src/python/create.py b/src/python/create.py
index 3a0dba2b..5e031688 100644
--- a/src/python/create.py
+++ b/src/python/create.py
@@ -36,11 +36,12 @@
 l = file.read(message_read, 0, len(message_read))
 print(f"Read: {message_read.decode()}")
 
-# Print path
-# print(f"File: {file.path()}")
+file.seal()
 
-# Seal file
+file2 = instance.create("bucket2", "testfile2")
+file2.write(message_read, 0, len(message))
+file2.seal()
 
-file.seal()
+# Seal file
 
 sleep(1000000)
diff --git a/src/utility/MDSKVSBucket.cpp b/src/utility/MDSKVSBucket.cpp
index ce87c234..07b104b6 100644
--- a/src/utility/MDSKVSBucket.cpp
+++ b/src/utility/MDSKVSBucket.cpp
@@ -104,3 +104,13 @@ MDSKVSBucket::listObjects(const std::string &keyPrefix, char delimiter) {
   return std::make_pair(std::move(result),
                         std::vector{commonPrefixes.begin(), commonPrefixes.end()});
 }
+
+void MDSKVSBucket::forall( // Calls `action(key, object)` for every entry of `_map`.
+    std::function action) const {
+  auto lock = getReadLock(); // Held for the whole iteration, i.e. also while `action` runs.
+  for (const auto &v : _map) {
+    const auto &key = v.first;
+    const auto &value = v.second;
+    action(key, value->obj);
+  }
+}
diff --git a/src/utility/MDSKVSBucket.h b/src/utility/MDSKVSBucket.h
index 4eae24a8..dd3a515b 100644
--- a/src/utility/MDSKVSBucket.h
+++ b/src/utility/MDSKVSBucket.h
@@ -52,4 +52,6 @@ class MDSKVSBucket : public utility::RWConcurrentObjectAdaptor {
   absl::StatusOr lookup(const std::string &key);
   absl::StatusOr, std::vector>>
   listObjects(const std::string &keyPrefix, char delimiter = 0);
+
+  void forall(std::function action) const; // Visits every entry under the read lock.
 };