From bd06e39a44b16728bbba95622ed4acc2f5284f4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 26 Apr 2023 19:04:23 +0200 Subject: [PATCH 01/16] GEDS: Send heartbeats and add api to enable server decomissioning. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 81 ++++++++++ src/libgeds/GEDS.h | 4 + src/libgeds/MetadataService.cpp | 51 +++++++ src/libgeds/MetadataService.h | 7 + src/libgeds/Server.cpp | 30 ++++ src/metadataservice/CMakeLists.txt | 8 + src/metadataservice/GRPCServer.cpp | 49 ++++++- src/metadataservice/MDSHttpServer.cpp | 67 +++++++++ src/metadataservice/MDSHttpServer.h | 37 +++++ src/metadataservice/MDSHttpSession.cpp | 147 +++++++++++++++++++ src/metadataservice/MDSHttpSession.h | 46 ++++++ src/metadataservice/NodeInformation.cpp | 101 +++++++++++++ src/metadataservice/NodeInformation.h | 69 +++++++++ src/metadataservice/Nodes.cpp | 187 ++++++++++++++++++++++++ src/metadataservice/Nodes.h | 46 ++++++ src/protos/geds.proto | 30 +++- src/utility/MDSKVSBucket.cpp | 10 ++ src/utility/MDSKVSBucket.h | 2 + 18 files changed, 970 insertions(+), 2 deletions(-) create mode 100644 src/metadataservice/MDSHttpServer.cpp create mode 100644 src/metadataservice/MDSHttpServer.h create mode 100644 src/metadataservice/MDSHttpSession.cpp create mode 100644 src/metadataservice/MDSHttpSession.h create mode 100644 src/metadataservice/NodeInformation.cpp create mode 100644 src/metadataservice/NodeInformation.h create mode 100644 src/metadataservice/Nodes.cpp create mode 100644 src/metadataservice/Nodes.h diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 4d1c41a0..7853ac33 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -159,6 +159,8 @@ absl::Status GEDS::start() { // Update state. _state = ServiceState::Running; + (void)_metadataService.configureNode(_hostname, geds::rpc::NodeState::Register); + startStorageMonitoringThread(); auto st = syncObjectStoreConfigs(); @@ -921,6 +923,77 @@ void GEDS::relocate(std::shared_ptr handle, bool force) { (void)handle->relocate(); } +absl::Status GEDS::downloadObject(const std::string &bucket, const std::string &key) { + auto oldFile = openAsFileHandle(bucket, key); + if (!oldFile.ok()) { + return oldFile.status(); + } + auto newFile = createAsFileHandle(bucket, key, true /* overwrite */); + if (!newFile.ok()) { + return newFile.status(); + } + return (*oldFile)->download(*newFile); +} + +absl::Status GEDS::downloadObjects(std::vector objects) { + struct PullHelper { + std::mutex mutex; + std::condition_variable cv; + size_t nTasks; + size_t nErrors; + auto lock() { return std::unique_lock(mutex); } + }; + auto h = std::make_shared(); + { + auto lock = h->lock(); + h->nTasks = objects.size(); + h->nErrors = 0; + } + + auto self = shared_from_this(); + size_t off = 3 * _config.io_thread_pool_size; + + for (size_t offset = 0; offset < objects.size(); offset += off) { + auto rbegin = offset; + auto rend = rbegin + off; + if (rend > objects.size()) { + rend = objects.size(); + } + for (auto i = rbegin; i < rend; i++) { + auto file = objects[i]; + boost::asio::post(_ioThreadPool, [self, &file, h]() { + bool error = false; + try { + auto status = self->downloadObject(file.bucket, file.key); + if (!status.ok()) { + LOG_ERROR("Unable to download ", file.bucket, "/", file.key); + error = true; + } + } catch (...) { + LOG_ERROR("Encountered an exception when downloading ", file.bucket, "/", file.key); + error = true; + } + { + auto lock = h->lock(); + h->nTasks -= 1; + if (error) { + h->nErrors += 1; + } + } + h->cv.notify_all(); + }); + } + } + auto relocateLock = h->lock(); + h->cv.wait(relocateLock, [h]() { return h->nTasks == 0; }); + LOG_INFO("Downloaded ", objects.size(), " objects, errors: ", h->nErrors); + if (h->nErrors) { + return absl::UnknownError("Some objects were not downloaded: Observed " + + std::to_string(h->nErrors) + " errors!"); + } + return absl::OkStatus(); +} + void GEDS::startStorageMonitoringThread() { _storageMonitoringThread = std::thread([&]() { auto statsLocalStorageUsed = geds::Statistics::createGauge("GEDS: Local Storage used"); @@ -968,6 +1041,14 @@ void GEDS::startStorageMonitoringThread() { *statsLocalMemoryFree = _memoryCounters.free; } + { + // Send heartbeat. + auto status = _metadataService.heartBeat(_hostname, _storageCounters, _memoryCounters); + if (!status.ok()) { + LOG_ERROR("Unable to send heartbeat to metadata service: ", status.message()); + } + } + auto targetStorage = (size_t)(0.7 * (double)_config.available_local_storage); if (memoryUsed > targetStorage) { std::sort(std::begin(relocatable), std::end(relocatable), diff --git a/src/libgeds/GEDS.h b/src/libgeds/GEDS.h index 455700ab..6d978c24 100644 --- a/src/libgeds/GEDS.h +++ b/src/libgeds/GEDS.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -321,6 +322,9 @@ class GEDS : public std::enable_shared_from_this, utility::RWConcurrentObj void relocate(bool force = false); void relocate(std::vector> &relocatable, bool force = false); void relocate(std::shared_ptr handle, bool force = false); + + absl::Status downloadObject(const std::string &bucket, const std::string &key); + absl::Status downloadObjects(std::vector objects); }; #endif // GEDS_GEDS_H diff --git a/src/libgeds/MetadataService.cpp b/src/libgeds/MetadataService.cpp index 82e7b66f..b73f25d0 100644 --- a/src/libgeds/MetadataService.cpp +++ b/src/libgeds/MetadataService.cpp @@ -136,6 +136,57 @@ absl::StatusOr MetadataService::getConnectionInformation() { return response.remoteaddress(); } +absl::Status MetadataService::configureNode(const std::string &identifier, + geds::rpc::NodeState state) { + METADATASERVICE_CHECK_CONNECTED; + + geds::rpc::NodeStatus request; + geds::rpc::StatusResponse response; + grpc::ClientContext context; + + auto node = request.mutable_node(); + node->set_identifier(identifier); + + request.set_state(state); + + auto status = _stub->ConfigureNode(&context, request, &response); + if (!status.ok()) { + return absl::UnavailableError("Unable to execute ConfigureNode: " + status.error_message()); + } + return convertStatus(response); +} + +absl::Status MetadataService::heartBeat(const std::string &identifier, + const StorageCounter &storage, + const StorageCounter &memory) { + METADATASERVICE_CHECK_CONNECTED; + + geds::rpc::HeartbeatMessage request; + geds::rpc::StatusResponse response; + grpc::ClientContext context; + + auto node = request.mutable_node(); + node->set_identifier(identifier); + + { + auto lock = memory.getReadLock(); + request.set_memoryallocated(memory.allocated); + request.set_memoryused(memory.used); + } + + { + auto lock = storage.getReadLock(); + request.set_storageused(storage.used); + request.set_storageallocated(storage.allocated); + } + + auto status = _stub->Heartbeat(&context, request, &response); + if (!status.ok()) { + return absl::UnavailableError("Unable to send heart beat: " + status.error_message()); + } + return convertStatus(response); +} + absl::Status MetadataService::createBucket(const std::string_view &bucket) { METADATASERVICE_CHECK_CONNECTED; geds::rpc::Bucket request; diff --git a/src/libgeds/MetadataService.h b/src/libgeds/MetadataService.h index ec9772bb..f2fe031b 100644 --- a/src/libgeds/MetadataService.h +++ b/src/libgeds/MetadataService.h @@ -21,7 +21,9 @@ #include "Object.h" #include "ObjectStoreConfig.h" +#include "StorageCounter.h" #include "geds.grpc.pb.h" +#include "geds.pb.h" namespace geds { @@ -43,6 +45,11 @@ class MetadataService { absl::Status disconnect(); + absl::Status configureNode(const std::string &identifier, geds::rpc::NodeState state); + + absl::Status heartBeat(const std::string &identifier, const StorageCounter &storage, + const StorageCounter &memory); + absl::StatusOr getConnectionInformation(); absl::Status registerObjectStoreConfig(const ObjectStoreConfig &mapping); diff --git a/src/libgeds/Server.cpp b/src/libgeds/Server.cpp index 663fd29b..a2bc1251 100644 --- a/src/libgeds/Server.cpp +++ b/src/libgeds/Server.cpp @@ -30,6 +30,7 @@ #include "Platform.h" #include "Ports.h" #include "Status.h" +#include "absl/status/status.h" #include "geds.grpc.pb.h" #include "geds.pb.h" @@ -63,6 +64,35 @@ class ServerImpl final : public geds::rpc::GEDSService::Service { return grpc::Status::OK; } + ::grpc::Status DownloadObjects(::grpc::ServerContext *context, + const ::geds::rpc::MultiObjectID *request, + ::geds::rpc::StatusResponse *response) override { + LOG_INFO(context->peer(), " has requested to pull ", request->objects().size(), " objects."); + + std::vector objects; + const auto &data = request->objects(); + objects.reserve(data.size()); + for (const auto &o : data) { + objects.emplace_back(geds::ObjectID(o.bucket(), o.key())); + } + auto status = _geds->downloadObjects(objects); + convertStatus(response, status); + return grpc::Status::OK; + }; + + ::grpc::Status DeleteObjectsLocally(::grpc::ServerContext *context, + const ::geds::rpc::MultiObjectID *request, + ::geds::rpc::StatusResponse *response) override { + LOG_INFO(context->peer(), " has requested to delete ", request->objects().size(), " objects."); + + (void)context; + (void)request; + (void)response; + auto status = absl::UnimplementedError("DeleteObjectsLocally: NYI"); + convertStatus(response, status); + return grpc::Status::OK; + } + public: ServerImpl(std::shared_ptr geds, Server &server) : _geds(geds), _server(server) {} diff --git a/src/metadataservice/CMakeLists.txt b/src/metadataservice/CMakeLists.txt index 07d70ae8..55b846d1 100644 --- a/src/metadataservice/CMakeLists.txt +++ b/src/metadataservice/CMakeLists.txt @@ -6,6 +6,14 @@ set(SOURCES GRPCServer.cpp GRPCServer.h + MDSHttpServer.cpp + MDSHttpServer.h + MDSHttpSession.cpp + MDSHttpSession.h + NodeInformation.cpp + NodeInformation.h + Nodes.cpp + Nodes.h ObjectStoreHandler.cpp ObjectStoreHandler.h S3Helper.cpp diff --git a/src/metadataservice/GRPCServer.cpp b/src/metadataservice/GRPCServer.cpp index fae039c5..90d6c154 100644 --- a/src/metadataservice/GRPCServer.cpp +++ b/src/metadataservice/GRPCServer.cpp @@ -20,6 +20,8 @@ #include "FormatISO8601.h" #include "Logging.h" +#include "MDSHttpServer.h" +#include "Nodes.h" #include "ObjectStoreConfig.h" #include "ObjectStoreHandler.h" #include "ParseGRPC.h" @@ -37,8 +39,16 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { std::shared_ptr _kvs; ObjectStoreHandler _objectStoreHandler; + Nodes _nodes; + geds::MDSHttpServer _httpServer; + public: - MetadataServiceImpl(std::shared_ptr kvs) : _kvs(kvs) {} + MetadataServiceImpl(std::shared_ptr kvs) : _kvs(kvs), _httpServer(4383, _nodes) { + auto status = _httpServer.start(); + if (!status.ok()) { + LOG_ERROR("Unable to start HTTP Server!"); + } + } geds::ObjectID convert(const ::geds::rpc::ObjectID *r) { return geds::ObjectID(r->bucket(), r->key()); @@ -53,6 +63,43 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { } protected: + grpc::Status ConfigureNode(::grpc::ServerContext *context, const ::geds::rpc::NodeStatus *request, + ::geds::rpc::StatusResponse *response) override { + LOG_ACCESS("ConfigureNode"); + const auto &identifier = request->node().identifier(); + const auto state = request->state(); + + absl::Status status; + if (state == geds::rpc::NodeState::Register) { + status = _nodes.registerNode(identifier); + } else if (state == geds::rpc::NodeState::Unregister) { + status = _nodes.unregisterNode(identifier); + } else { + LOG_ERROR("Invalid state ", state); + status = absl::InvalidArgumentError("Invalid state: " + std::to_string(state)); + } + + convertStatus(response, status); + return grpc::Status::OK; + } + + grpc::Status Heartbeat(::grpc::ServerContext *context, + const ::geds::rpc::HeartbeatMessage *request, + ::geds::rpc::StatusResponse *response) override { + LOG_ACCESS("Heartbeat: ", request->node().identifier()); + + const auto &identifier = request->node().identifier(); + NodeHeartBeat val; + val.memoryAllocated = request->memoryallocated(); + val.memoryUsed = request->memoryused(); + val.storageAllocated = request->storageallocated(); + val.storageUsed = request->storageused(); + + auto status = _nodes.heartbeat(identifier, std::move(val)); + convertStatus(response, status); + return grpc::Status::OK; + } + grpc::Status GetConnectionInformation(::grpc::ServerContext *context, const ::geds::rpc::EmptyParams * /* unused request */, ::geds::rpc::ConnectionInformation *response) override { diff --git a/src/metadataservice/MDSHttpServer.cpp b/src/metadataservice/MDSHttpServer.cpp new file mode 100644 index 00000000..80db7c5f --- /dev/null +++ b/src/metadataservice/MDSHttpServer.cpp @@ -0,0 +1,67 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "MDSHttpServer.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Logging.h" +#include "MDSHttpSession.h" +#include "Nodes.h" + +namespace geds { + +MDSHttpServer::MDSHttpServer(uint16_t port, Nodes &nodes) : _port(port), _nodes(nodes) {} + +absl::Status MDSHttpServer::start() { + if (_acceptor != nullptr) { + return absl::UnknownError("The server is already running!"); + } + try { + auto host = boost::asio::ip::make_address("0.0.0.0"); + _acceptor = std::unique_ptr( + new boost::asio::ip::tcp::acceptor(_ioContext, {host, _port})); + _thread = std::thread([&] { + accept(); + _ioContext.run(); + }); + } catch (boost::exception &e) { + // Workaround until GEDS properly supports multiple processes. + auto diag = boost::diagnostic_information(e, false); + return absl::InternalError("Unable to start webserver: " + diag); + } + return absl::OkStatus(); +} + +void MDSHttpServer::stop() { + _ioContext.stop(); + _acceptor->close(); + _thread.join(); +} + +void MDSHttpServer::accept() { + _acceptor->async_accept(boost::asio::make_strand(_ioContext), + [&](boost::beast::error_code ec, boost::asio::ip::tcp::socket socket) { + if (ec) { + LOG_ERROR("Unable to accept ", ec.message(), " ABORT."); + return; + } + std::make_shared(std::move(socket), _nodes)->start(); + accept(); + }); +} + +} // namespace geds diff --git a/src/metadataservice/MDSHttpServer.h b/src/metadataservice/MDSHttpServer.h new file mode 100644 index 00000000..983cfac1 --- /dev/null +++ b/src/metadataservice/MDSHttpServer.h @@ -0,0 +1,37 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include + +#include +#include + +#include "MDSHttpSession.h" + +class Nodes; + +namespace geds { + +class MDSHttpServer { + uint16_t _port; + + boost::asio::io_context _ioContext{1}; + std::unique_ptr _acceptor = nullptr; + std::thread _thread; + + Nodes &_nodes; + +public: + MDSHttpServer(uint16_t port, Nodes &nodes); + absl::Status start(); + void stop(); + +private: + void accept(); +}; + +} // namespace geds diff --git a/src/metadataservice/MDSHttpSession.cpp b/src/metadataservice/MDSHttpSession.cpp new file mode 100644 index 00000000..e961841b --- /dev/null +++ b/src/metadataservice/MDSHttpSession.cpp @@ -0,0 +1,147 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "MDSHttpSession.h" + +#include + +#include "Logging.h" +#include "Nodes.h" +#include "Statistics.h" + +namespace geds { + +MDSHttpSession::MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes) + : _stream(std::move(socket)), _nodes(nodes) {} + +MDSHttpSession::~MDSHttpSession() { close(); } + +void MDSHttpSession::start() { + LOG_DEBUG("Start connection"); + auto self = shared_from_this(); + boost::asio::dispatch(_stream.get_executor(), + boost::beast::bind_front_handler(&MDSHttpSession::awaitRequest, self)); +} + +void MDSHttpSession::awaitRequest() { + auto self = shared_from_this(); + _request = {}; + _stream.expires_after(std::chrono::seconds(10)); + + boost::beast::http::async_read( + _stream, _buffer, _request, + [self](boost::beast::error_code ec, std::size_t /* bytes_transferred */) { + if (ec == boost::beast::http::error::end_of_stream) { + return; + } + if (ec) { + LOG_ERROR("Failed reading stream", ec.message()); + return; + } + self->handleRequest(); + }); +} + +void MDSHttpSession::prepareHtmlReply() { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "text/html"); + _response.keep_alive(_request.keep_alive()); + + boost::beast::ostream(_response.body()) << "" + << "" + << "GEDS Service" + << "" + << "" + << "
"
+                                          << "GEDS Metadata Service\n"
+                                          << "=====================\n"
+                                          << "\n";
+
+  const auto &info = _nodes.information();
+  if (info.size()) {
+    boost::beast::ostream(_response.body()) << "Registered nodes:\n";
+    info.forall([&](const std::shared_ptr &node) {
+      const auto &[heartBeat, ts] = node->lastHeartBeat();
+      boost::beast::ostream(_response.body())
+          << " - " << node->identifier << " "                         //
+          << "Allocated: " << heartBeat.storageAllocated << " "       //
+          << "Used: " << heartBeat.storageUsed << " "                 //
+          << "Memory Allocated: " << heartBeat.memoryAllocated << " " //
+          << "Memory Used: " << heartBeat.memoryUsed << "\n";
+    });
+  }
+  boost::beast::ostream(_response.body()) << "
" + << "" << std::endl; + handleWrite(); +} + +void MDSHttpSession::handleRequest() { + if (_request.method() != boost::beast::http::verb::get) { + return prepareError(boost::beast::http::status::bad_request, "Invalid method."); + } + if (_request.target().empty() || _request.target()[0] != '/') { + return prepareError(boost::beast::http::status::bad_request, "Invalid path."); + } + + if (_request.target() == "/") { + return prepareHtmlReply(); + } + if (_request.target() == "/metrics") { + return prepareMetricsReply(); + } + + return prepareError(boost::beast::http::status::not_found, "Invalid path"); +} + +void MDSHttpSession::prepareMetricsReply() { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "plain/text"); + _response.keep_alive(_request.keep_alive()); + + std::stringstream stream; + Statistics::get().prometheusMetrics(stream); + boost::beast::ostream(_response.body()) << stream.str(); + handleWrite(); +} + +void MDSHttpSession::prepareError(boost::beast::http::status status, std::string message) { + _response.result(status); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "text/html"); + _response.keep_alive(_request.keep_alive()); + boost::beast::ostream(_response.body()) << message; + + return handleWrite(); +} + +void MDSHttpSession::handleWrite() { + auto self = shared_from_this(); + _response.content_length(_response.body().size()); + + boost::beast::http::async_write( + _stream, _response, + [self](boost::beast::error_code ec, std::size_t /* unused bytesTransferred */) { + if (ec) { + LOG_ERROR("Error ", ec.message()); + return; + } + if (self->_request.keep_alive()) { + self->awaitRequest(); + } + self->_buffer.clear(); + }); +} + +void MDSHttpSession::close() { + LOG_DEBUG("Closing connection"); + + boost::beast::error_code ec; + _stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec); + _stream.socket().close(); +} + +} // namespace geds diff --git a/src/metadataservice/MDSHttpSession.h b/src/metadataservice/MDSHttpSession.h new file mode 100644 index 00000000..f9714a11 --- /dev/null +++ b/src/metadataservice/MDSHttpSession.h @@ -0,0 +1,46 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +class Nodes; + +namespace geds { + +class MDSHttpSession : public std::enable_shared_from_this { + boost::beast::tcp_stream _stream; + boost::beast::flat_buffer _buffer{4096}; + boost::beast::http::request _request; + boost::beast::http::response _response; + + Nodes &_nodes; + +public: + MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes); + ~MDSHttpSession(); + void start(); + + void awaitRequest(); + void handleRequest(); + void prepareHtmlReply(); + void prepareMetricsReply(); + void prepareError(boost::beast::http::status status, std::string message); + void handleWrite(); + + void close(); +}; +} // namespace geds diff --git a/src/metadataservice/NodeInformation.cpp b/src/metadataservice/NodeInformation.cpp new file mode 100644 index 00000000..ca82a228 --- /dev/null +++ b/src/metadataservice/NodeInformation.cpp @@ -0,0 +1,101 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "NodeInformation.h" + +#include "Logging.h" +#include "Status.h" + +NodeInformation::NodeInformation(std::string identifierArg) + : identifier(std::move(identifierArg)) {} + +absl::Status NodeInformation::connect() { + auto channelLock = std::lock_guard(_connectionMutex); + auto lock = getWriteLock(); + if (_channel.get() != nullptr) { + // Already connected. + return absl::OkStatus(); + } + try { + _channel = grpc::CreateChannel(identifier, grpc::InsecureChannelCredentials()); + auto success = + _channel->WaitForConnected(std::chrono::system_clock::now() + std::chrono::seconds(10)); + if (!success) { + // Destroy channel. + _channel = nullptr; + return absl::UnavailableError("Could not connect to " + identifier + "."); + } + _stub = geds::rpc::GEDSService::NewStub(_channel); + } catch (const std::exception &e) { + _channel = nullptr; + return absl::UnavailableError("Could not open channel with " + identifier + + ". Reason: " + e.what()); + } + return absl::OkStatus(); +} + +absl::Status NodeInformation::disconnect() { + auto channelLock = std::lock_guard(_connectionMutex); + auto lock = getWriteLock(); + if (_channel.get() == nullptr) { + // Already disconnected. + return absl::OkStatus(); + } + _stub.release(); + _channel = nullptr; + return absl::OkStatus(); +} + +void NodeInformation::setState(NodeState state) { + auto lock = getWriteLock(); + _state = state; +} +NodeState NodeInformation::state() { + auto lock = getReadLock(); + return _state; +} + +void NodeInformation::updateHeartBeat(const NodeHeartBeat &heartBeat) { + auto lock = getWriteLock(); + _heartBeat = heartBeat; + _lastCheckin = std::chrono::system_clock::now(); +} + +std::tuple> +NodeInformation::lastHeartBeat() { + auto lock = getReadLock(); + return std::make_tuple(_heartBeat, _lastCheckin); +} + +std::chrono::time_point NodeInformation::lastCheckin() { + auto lock = getReadLock(); + return _lastCheckin; +} + +absl::Status +NodeInformation::downloadObjects(const std::vector> &objects) { + if (!_connectionMutex.try_lock()) { + return absl::UnavailableError("Unable to pull objects: Lock is unavailable!"); + } + + geds::rpc::MultiObjectID request; + geds::rpc::StatusResponse response; + grpc::ClientContext context; + + for (const auto &o : objects) { + auto obj = request.add_objects(); + obj->set_bucket(o->bucket); + obj->set_key(o->key); + } + + auto status = _stub->DownloadObjects(&context, request, &response); + if (!status.ok()) { + LOG_ERROR("Unable to execute object pull grpc call, status: ", status.error_code(), " ", + status.error_details()); + return absl::UnknownError("Unable to execute object pull!"); + } + + return convertStatus(response); +} diff --git a/src/metadataservice/NodeInformation.h b/src/metadataservice/NodeInformation.h new file mode 100644 index 00000000..040ce943 --- /dev/null +++ b/src/metadataservice/NodeInformation.h @@ -0,0 +1,69 @@ +/** + * Copyright 2023- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "RWConcurrentObjectAdaptor.h" +#include "geds.grpc.pb.h" + +enum class NodeState { Registered, Decomissioning, Unknown }; + +struct NodeHeartBeat { + size_t memoryAllocated{0}; + size_t memoryUsed{0}; + size_t storageAllocated{0}; + size_t storageUsed{0}; +}; + +struct RelocatableObject { + const std::string &bucket; + std::string key; + size_t size; +}; + +class NodeInformation : public utility::RWConcurrentObjectAdaptor { + std::mutex _connectionMutex; + std::shared_ptr _channel{nullptr}; + std::unique_ptr _stub{nullptr}; + + // Subject to state mutex + NodeState _state{NodeState::Unknown}; + NodeHeartBeat _heartBeat; + std::chrono::time_point _lastCheckin; + // End subject to state mutex +public: + NodeInformation(std::string identifier); + NodeInformation(NodeInformation &) = delete; + NodeInformation(NodeInformation &&) = delete; + NodeInformation &operator=(NodeInformation &) = delete; + NodeInformation &operator=(NodeInformation &&) = delete; + + absl::Status connect(); + absl::Status disconnect(); + + // Subject to state mutex + absl::Status queryHeartBeat(); + const std::string identifier; + + void setState(NodeState state); + NodeState state(); + + void updateHeartBeat(const NodeHeartBeat &heartBeat); + std::tuple> lastHeartBeat(); + std::chrono::time_point lastCheckin(); + // End subject to state mutex + + absl::Status downloadObjects(const std::vector> &objects); +}; diff --git a/src/metadataservice/Nodes.cpp b/src/metadataservice/Nodes.cpp new file mode 100644 index 00000000..a0f774d1 --- /dev/null +++ b/src/metadataservice/Nodes.cpp @@ -0,0 +1,187 @@ +/** + * Copyright 2023- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "Nodes.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "Logging.h" +#include "geds.grpc.pb.h" + +absl::Status Nodes::registerNode(const std::string &identifier) { + auto val = std::make_shared(identifier); + auto existing = _nodes.insertOrExists(identifier, val); + if (existing.get() != val.get()) { + // auto diff = std::chrono::duration_cast(now - existing->lastCheckin); + if (existing->state() == NodeState::Decomissioning) { + // Allow reregistering decomissioned nodes. + auto connect = val->connect(); + if (!connect.ok()) { + LOG_ERROR("Unable to establish backchannel to " + identifier + + " unable to decomission node!"); + } + _nodes.insertOrReplace(identifier, val); + + return absl::OkStatus(); + } + auto message = "Node " + identifier + " was already registered!"; + LOG_ERROR(message); + return absl::AlreadyExistsError(message); + } + + auto connect = val->connect(); + if (!connect.ok()) { + LOG_ERROR("Unable to establish backchannel to " + identifier + " unable to decomission node!"); + } + return absl::OkStatus(); +} + +absl::Status Nodes::unregisterNode(const std::string &identifier) { + auto removed = _nodes.getAndRemove(identifier); + if (!removed.value()) { + auto message = "Unable to remove " + identifier + " not found!"; + LOG_ERROR(message); + return absl::NotFoundError(message); + } + return absl::OkStatus(); +} + +absl::Status Nodes::heartbeat(const std::string &identifier, const NodeHeartBeat &heartbeat) { + auto val = _nodes.get(identifier); + if (!val.value()) { + auto message = "Unable to process heartbeat " + identifier + " not found!"; + LOG_ERROR(message); + return absl::NotFoundError(message); + } + (*val)->updateHeartBeat(heartbeat); + return absl::OkStatus(); +} + +absl::Status Nodes::decomissionNodes(const std::vector &nodes, + std::shared_ptr kvs) { + if (!_isDecommissioning.try_lock()) { + return absl::UnavailableError("Already decomissioning."); + } + + for (const auto &node : nodes) { + auto existing = _nodes.get(node); + if (!existing.has_value()) { + LOG_ERROR("Unable to decomission: Node " + node + " since it does not exist!"); + continue; + } + (*existing)->setState(NodeState::Decomissioning); + } + + // Prefix nodes with geds:// + std::vector prefixedNodes(nodes.size()); + for (const auto &node : nodes) { + prefixedNodes.emplace_back("geds://" + node); + } + + // Find all buckets. + auto buckets = kvs->listBuckets(); + if (!buckets.ok()) { + LOG_ERROR("Unable to list buckets when decomissioning: ", buckets.status().message()); + _isDecommissioning.unlock(); + return buckets.status(); + } + + // Find all objects that need to be relocated. + std::vector> objects; + for (const auto &bucketName : *buckets) { + auto bucket = kvs->getBucket(bucketName); + if (!bucket.ok()) { + continue; + } + (*bucket)->forall([&objects, &prefixedNodes, &bucketName](const utility::Path &path, + const geds::ObjectInfo &obj) { + // Do not relocate cached blocks. + if (path.name.starts_with("_$cachedblock$/")) { + return; + } + for (const auto &n : prefixedNodes) { + if (obj.location.starts_with(n)) { + objects.emplace_back( + new RelocatableObject{.bucket = bucketName, .key = path.name, .size = obj.size}); + } + } + }); + } + + std::sort(objects.begin(), objects.end(), + [](const std::shared_ptr &a, + const std::shared_ptr &b) { return a->size > b->size; }); + + // List all available nodes. + struct NodeTargetInfo { + std::shared_ptr node; + std::vector> objects; + size_t available; + size_t target; + }; + std::vector> targetNodes; + _nodes.forall([&targetNodes](std::shared_ptr &info) { + if (info->state() == NodeState::Registered) { + const auto [hb, _] = info->lastHeartBeat(); + + size_t storageAvailable = + (hb.storageUsed > hb.storageAllocated) ? 0 : hb.storageAllocated - hb.storageUsed; + targetNodes.emplace_back(new NodeTargetInfo{ + .node = info, .objects = {}, .available = storageAvailable, .target = 0}); + } + }); + if (targetNodes.size() == 0) { + return absl::UnavailableError("No target nodes available!"); + } + + // Sort target nodes based on available size. + auto targetNodeCompare = [](const std::shared_ptr &a, + const std::shared_ptr &b) { + auto aAvail = (a->available > a->target) ? a->available - a->target : 0; + auto bAvail = (b->available > b->target) ? b->available - b->target : 0; + return aAvail > bAvail || a->available > b->available; + }; + std::sort(targetNodes.begin(), targetNodes.end(), targetNodeCompare); + + // Simple binpacking by filling up the empty nodes. + std::vector> nonRelocatable; + for (auto &obj : objects) { + bool foundTarget = false; + for (auto &target : targetNodes) { + if (target->target + obj->size < target->available) { + foundTarget = true; + target->objects.push_back(obj); + target->target += obj->size; + } + } + if (!foundTarget) { + LOG_ERROR("Unable to relocate ", obj->bucket, "/", obj->key, " (", obj->size, " bytes)"); + nonRelocatable.push_back(obj); + } + } + + std::vector threads; + threads.reserve(targetNodes.size()); + for (auto &target : targetNodes) { + threads.emplace_back([target] { + auto status = target->node->downloadObjects(target->objects); + if (!status.ok()) { + LOG_ERROR("Unable to relocate objects to ", target->node->identifier); + } + }); + } + for (auto &thread : threads) { + thread.join(); + } + + _isDecommissioning.unlock(); + return absl::OkStatus(); +} diff --git a/src/metadataservice/Nodes.h b/src/metadataservice/Nodes.h new file mode 100644 index 00000000..a16d913b --- /dev/null +++ b/src/metadataservice/Nodes.h @@ -0,0 +1,46 @@ +/** + * Copyright 2023- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "ConcurrentMap.h" +#include "MDSKVS.h" +#include "NodeInformation.h" +#include "geds.grpc.pb.h" + +class Nodes { + mutable std::shared_mutex _mutex; + + utility::ConcurrentMap> _nodes; + + std::mutex _isDecommissioning; + +public: + Nodes() = default; + Nodes(Nodes &) = delete; + Nodes(Nodes &&) = delete; + Nodes &operator=(Nodes &) = delete; + Nodes &operator=(Nodes &&) = delete; + ~Nodes() = default; + + const utility::ConcurrentMap> &information() const { + return _nodes; + }; + + absl::Status registerNode(const std::string &identifier); + absl::Status unregisterNode(const std::string &identifier); + absl::Status heartbeat(const std::string &identifier, const NodeHeartBeat &heartbeat); + absl::Status decomissionNodes(const std::vector &nodes, std::shared_ptr kvs); +}; diff --git a/src/protos/geds.proto b/src/protos/geds.proto index 9255177c..481502e4 100644 --- a/src/protos/geds.proto +++ b/src/protos/geds.proto @@ -30,6 +30,7 @@ message ObjectID { string bucket = 1; string key = 2; } +message MultiObjectID { repeated ObjectID objects = 1; } message ObjectInfo { string location = 1; @@ -66,7 +67,30 @@ message ObjectResponse { optional StatusResponse error = 2; } +enum NodeState { + Register = 0; + Unregister = 1; + PrepareDecomission = 2; +} + +message NodeIdentifier { string identifier = 1; } + +message NodeStatus { + NodeIdentifier node = 1; + NodeState state = 2; +} + +message HeartbeatMessage { + NodeIdentifier node = 1; + uint64 memoryAllocated = 2; + uint64 memoryUsed = 3; + uint64 storageAllocated = 4; + uint64 storageUsed = 5; +} + service MetadataService { + rpc ConfigureNode(NodeStatus) returns (StatusResponse); + rpc Heartbeat(HeartbeatMessage) returns (StatusResponse); rpc GetConnectionInformation(EmptyParams) returns (ConnectionInformation); rpc RegisterObjectStore(ObjectStoreConfig) returns (StatusResponse); rpc ListObjectStores(EmptyParams) returns (AvailableObjectStoreConfigs); @@ -95,4 +119,8 @@ message TransportEndpoint { message AvailTransportEndpoints { repeated TransportEndpoint endpoint = 1; } -service GEDSService { rpc GetAvailEndpoints(EmptyParams) returns (AvailTransportEndpoints); } +service GEDSService { + rpc GetAvailEndpoints(EmptyParams) returns (AvailTransportEndpoints); + rpc DownloadObjects(MultiObjectID) returns (StatusResponse); + rpc DeleteObjectsLocally(MultiObjectID) returns (StatusResponse); +} diff --git a/src/utility/MDSKVSBucket.cpp b/src/utility/MDSKVSBucket.cpp index ce87c234..4025b870 100644 --- a/src/utility/MDSKVSBucket.cpp +++ b/src/utility/MDSKVSBucket.cpp @@ -104,3 +104,13 @@ MDSKVSBucket::listObjects(const std::string &keyPrefix, char delimiter) { return std::make_pair(std::move(result), std::vector{commonPrefixes.begin(), commonPrefixes.end()}); } + +void MDSKVSBucket::forall( + std::function action) { + auto lock = getReadLock(); + for (const auto &v : _map) { + const auto &key = v.first; + const auto &value = v.second; + action(key, value->obj); + } +} diff --git a/src/utility/MDSKVSBucket.h b/src/utility/MDSKVSBucket.h index 4eae24a8..bcba00fe 100644 --- a/src/utility/MDSKVSBucket.h +++ b/src/utility/MDSKVSBucket.h @@ -52,4 +52,6 @@ class MDSKVSBucket : public utility::RWConcurrentObjectAdaptor { absl::StatusOr lookup(const std::string &key); absl::StatusOr, std::vector>> listObjects(const std::string &keyPrefix, char delimiter = 0); + + void forall(std::function action); }; From 50ec5f4a742a0d42a1827dbfb8287532872d8ff3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 10 May 2023 11:50:02 +0200 Subject: [PATCH 02/16] Integrate UUID with HeartBeat and Expose api/nodes json endpoint. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 5 ++- src/libgeds/MetadataService.cpp | 14 +++---- src/libgeds/MetadataService.h | 5 ++- src/metadataservice/CMakeLists.txt | 2 + src/metadataservice/GRPCServer.cpp | 13 +++--- src/metadataservice/MDSHttpSession.cpp | 40 +++++++++++++----- src/metadataservice/MDSHttpSession.h | 1 + src/metadataservice/NodeInformation.cpp | 36 +++++++++++++---- src/metadataservice/NodeInformation.h | 22 +++++++--- src/metadataservice/Nodes.cpp | 54 +++++++++++++++++-------- src/metadataservice/Nodes.h | 15 +++++-- src/protos/geds.proto | 8 +++- 12 files changed, 154 insertions(+), 61 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 7853ac33..13084a1e 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -159,7 +159,8 @@ absl::Status GEDS::start() { // Update state. _state = ServiceState::Running; - (void)_metadataService.configureNode(_hostname, geds::rpc::NodeState::Register); + (void)_metadataService.configureNode(uuid, _hostname, _server.port(), + geds::rpc::NodeState::Register); startStorageMonitoringThread(); @@ -1043,7 +1044,7 @@ void GEDS::startStorageMonitoringThread() { { // Send heartbeat. - auto status = _metadataService.heartBeat(_hostname, _storageCounters, _memoryCounters); + auto status = _metadataService.heartBeat(uuid, _storageCounters, _memoryCounters); if (!status.ok()) { LOG_ERROR("Unable to send heartbeat to metadata service: ", status.message()); } diff --git a/src/libgeds/MetadataService.cpp b/src/libgeds/MetadataService.cpp index b73f25d0..d1d339e5 100644 --- a/src/libgeds/MetadataService.cpp +++ b/src/libgeds/MetadataService.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include "GEDS.h" #include "Logging.h" @@ -136,8 +135,8 @@ absl::StatusOr MetadataService::getConnectionInformation() { return response.remoteaddress(); } -absl::Status MetadataService::configureNode(const std::string &identifier, - geds::rpc::NodeState state) { +absl::Status MetadataService::configureNode(const std::string &uuid, const std::string &identifier, + uint16_t port, geds::rpc::NodeState state) { METADATASERVICE_CHECK_CONNECTED; geds::rpc::NodeStatus request; @@ -146,8 +145,10 @@ absl::Status MetadataService::configureNode(const std::string &identifier, auto node = request.mutable_node(); node->set_identifier(identifier); + node->set_port(port); request.set_state(state); + request.set_uuid(uuid); auto status = _stub->ConfigureNode(&context, request, &response); if (!status.ok()) { @@ -156,8 +157,7 @@ absl::Status MetadataService::configureNode(const std::string &identifier, return convertStatus(response); } -absl::Status MetadataService::heartBeat(const std::string &identifier, - const StorageCounter &storage, +absl::Status MetadataService::heartBeat(const std::string &uuid, const StorageCounter &storage, const StorageCounter &memory) { METADATASERVICE_CHECK_CONNECTED; @@ -165,9 +165,7 @@ absl::Status MetadataService::heartBeat(const std::string &identifier, geds::rpc::StatusResponse response; grpc::ClientContext context; - auto node = request.mutable_node(); - node->set_identifier(identifier); - + request.set_uuid(uuid); { auto lock = memory.getReadLock(); request.set_memoryallocated(memory.allocated); diff --git a/src/libgeds/MetadataService.h b/src/libgeds/MetadataService.h index f2fe031b..aa7afa48 100644 --- a/src/libgeds/MetadataService.h +++ b/src/libgeds/MetadataService.h @@ -45,9 +45,10 @@ class MetadataService { absl::Status disconnect(); - absl::Status configureNode(const std::string &identifier, geds::rpc::NodeState state); + absl::Status configureNode(const std::string &uuid, const std::string &identifier, uint16_t port, + geds::rpc::NodeState state); - absl::Status heartBeat(const std::string &identifier, const StorageCounter &storage, + absl::Status heartBeat(const std::string &uuid, const StorageCounter &storage, const StorageCounter &memory); absl::StatusOr getConnectionInformation(); diff --git a/src/metadataservice/CMakeLists.txt b/src/metadataservice/CMakeLists.txt index 55b846d1..3b67d783 100644 --- a/src/metadataservice/CMakeLists.txt +++ b/src/metadataservice/CMakeLists.txt @@ -23,7 +23,9 @@ set(SOURCES add_library(libmetadataservice STATIC ${SOURCES}) target_link_libraries(libmetadataservice PUBLIC + magic_enum::magic_enum ${GRPC_LIBRARIES} + ${Boost_LIBRARIES} geds_utility geds_proto geds_s3 diff --git a/src/metadataservice/GRPCServer.cpp b/src/metadataservice/GRPCServer.cpp index 90d6c154..6a232b15 100644 --- a/src/metadataservice/GRPCServer.cpp +++ b/src/metadataservice/GRPCServer.cpp @@ -5,6 +5,7 @@ #include "GRPCServer.h" +#include #include #include #include @@ -67,13 +68,15 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { ::geds::rpc::StatusResponse *response) override { LOG_ACCESS("ConfigureNode"); const auto &identifier = request->node().identifier(); + uint16_t port = request->node().port(); const auto state = request->state(); + const auto &uuid = request->uuid(); absl::Status status; if (state == geds::rpc::NodeState::Register) { - status = _nodes.registerNode(identifier); + status = _nodes.registerNode(uuid, identifier, port); } else if (state == geds::rpc::NodeState::Unregister) { - status = _nodes.unregisterNode(identifier); + status = _nodes.unregisterNode(uuid); } else { LOG_ERROR("Invalid state ", state); status = absl::InvalidArgumentError("Invalid state: " + std::to_string(state)); @@ -86,16 +89,16 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { grpc::Status Heartbeat(::grpc::ServerContext *context, const ::geds::rpc::HeartbeatMessage *request, ::geds::rpc::StatusResponse *response) override { - LOG_ACCESS("Heartbeat: ", request->node().identifier()); + LOG_ACCESS("Heartbeat: ", request->uuid()); - const auto &identifier = request->node().identifier(); + const auto &uuid = request->uuid(); NodeHeartBeat val; val.memoryAllocated = request->memoryallocated(); val.memoryUsed = request->memoryused(); val.storageAllocated = request->storageallocated(); val.storageUsed = request->storageused(); - auto status = _nodes.heartbeat(identifier, std::move(val)); + auto status = _nodes.heartbeat(uuid, std::move(val)); convertStatus(response, status); return grpc::Status::OK; } diff --git a/src/metadataservice/MDSHttpSession.cpp b/src/metadataservice/MDSHttpSession.cpp index e961841b..cc483fd5 100644 --- a/src/metadataservice/MDSHttpSession.cpp +++ b/src/metadataservice/MDSHttpSession.cpp @@ -66,7 +66,8 @@ void MDSHttpSession::prepareHtmlReply() { info.forall([&](const std::shared_ptr &node) { const auto &[heartBeat, ts] = node->lastHeartBeat(); boost::beast::ostream(_response.body()) - << " - " << node->identifier << " " // + << " - " << node->uuid << ": " // + << node->host << ":" << node->port << " " // << "Allocated: " << heartBeat.storageAllocated << " " // << "Used: " << heartBeat.storageUsed << " " // << "Memory Allocated: " << heartBeat.memoryAllocated << " " // @@ -78,22 +79,41 @@ void MDSHttpSession::prepareHtmlReply() { handleWrite(); } +void MDSHttpSession::prepareApiNodesReply() { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "text/csv"); + _response.keep_alive(_request.keep_alive()); + + auto data = boost::json::value_from(_nodes); + boost::beast::ostream(_response.body()) << boost::json::serialize(data); + + handleWrite(); +} + void MDSHttpSession::handleRequest() { - if (_request.method() != boost::beast::http::verb::get) { - return prepareError(boost::beast::http::status::bad_request, "Invalid method."); - } if (_request.target().empty() || _request.target()[0] != '/') { return prepareError(boost::beast::http::status::bad_request, "Invalid path."); } - if (_request.target() == "/") { - return prepareHtmlReply(); + if (_request.method() == boost::beast::http::verb::get) { + if (_request.target() == "/") { + return prepareHtmlReply(); + } + if (_request.target() == "/api/nodes") { + return prepareApiNodesReply(); + } + if (_request.target() == "/metrics") { + return prepareMetricsReply(); + } + return prepareError(boost::beast::http::status::not_found, "Invalid path"); } - if (_request.target() == "/metrics") { - return prepareMetricsReply(); + if (_request.method() == boost::beast::http::verb::post) { + if (_request.target() == "/api/decomission") { + } + return prepareError(boost::beast::http::status::not_found, "Invalid path"); } - - return prepareError(boost::beast::http::status::not_found, "Invalid path"); + return prepareError(boost::beast::http::status::bad_request, "Invalid method."); } void MDSHttpSession::prepareMetricsReply() { diff --git a/src/metadataservice/MDSHttpSession.h b/src/metadataservice/MDSHttpSession.h index f9714a11..116f3e39 100644 --- a/src/metadataservice/MDSHttpSession.h +++ b/src/metadataservice/MDSHttpSession.h @@ -38,6 +38,7 @@ class MDSHttpSession : public std::enable_shared_from_this { void handleRequest(); void prepareHtmlReply(); void prepareMetricsReply(); + void prepareApiNodesReply(); void prepareError(boost::beast::http::status status, std::string message); void handleWrite(); diff --git a/src/metadataservice/NodeInformation.cpp b/src/metadataservice/NodeInformation.cpp index ca82a228..e7711d97 100644 --- a/src/metadataservice/NodeInformation.cpp +++ b/src/metadataservice/NodeInformation.cpp @@ -8,8 +8,8 @@ #include "Logging.h" #include "Status.h" -NodeInformation::NodeInformation(std::string identifierArg) - : identifier(std::move(identifierArg)) {} +NodeInformation::NodeInformation(std::string uuid, std::string hostArg, uint16_t port) + : uuid(std::move(uuid)), host(std::move(hostArg)), port(port) {} absl::Status NodeInformation::connect() { auto channelLock = std::lock_guard(_connectionMutex); @@ -18,20 +18,20 @@ absl::Status NodeInformation::connect() { // Already connected. return absl::OkStatus(); } + auto loc = host + ":" + std::to_string(port); try { - _channel = grpc::CreateChannel(identifier, grpc::InsecureChannelCredentials()); + _channel = grpc::CreateChannel(loc, grpc::InsecureChannelCredentials()); auto success = _channel->WaitForConnected(std::chrono::system_clock::now() + std::chrono::seconds(10)); if (!success) { // Destroy channel. _channel = nullptr; - return absl::UnavailableError("Could not connect to " + identifier + "."); + return absl::UnavailableError("Could not connect to " + loc + "."); } _stub = geds::rpc::GEDSService::NewStub(_channel); } catch (const std::exception &e) { _channel = nullptr; - return absl::UnavailableError("Could not open channel with " + identifier + - ". Reason: " + e.what()); + return absl::UnavailableError("Could not open channel with " + loc + ". Reason: " + e.what()); } return absl::OkStatus(); } @@ -52,7 +52,7 @@ void NodeInformation::setState(NodeState state) { auto lock = getWriteLock(); _state = state; } -NodeState NodeInformation::state() { +NodeState NodeInformation::state() const { auto lock = getReadLock(); return _state; } @@ -64,7 +64,7 @@ void NodeInformation::updateHeartBeat(const NodeHeartBeat &heartBeat) { } std::tuple> -NodeInformation::lastHeartBeat() { +NodeInformation::lastHeartBeat() const { auto lock = getReadLock(); return std::make_tuple(_heartBeat, _lastCheckin); } @@ -99,3 +99,23 @@ NodeInformation::downloadObjects(const std::vector const &n) { + const auto &[h, timeStamp] = n->lastHeartBeat(); + + auto storageFree = h.storageUsed > h.storageAllocated ? 0 : h.storageAllocated - h.storageUsed; + auto memoryFree = h.memoryUsed > h.memoryAllocated ? 0 : h.memoryAllocated - h.memoryUsed; + + jv = {{"uuid", n->uuid}, + {"host", n->host}, + {"port", n->port}, + {"storageAllocated", h.storageAllocated}, + {"storageUsed", h.storageUsed}, + {"storageFree", storageFree}, + {"memoryAllocated", h.memoryAllocated}, + {"memoryUsed", h.memoryUsed}, + {"memoryFree", memoryFree}, + {"lastCheckIn", toISO8601String(timeStamp)}, + {"state", std::string{magic_enum::enum_name(n->state())}}}; +} diff --git a/src/metadataservice/NodeInformation.h b/src/metadataservice/NodeInformation.h index 040ce943..73742a65 100644 --- a/src/metadataservice/NodeInformation.h +++ b/src/metadataservice/NodeInformation.h @@ -6,16 +6,21 @@ #pragma once #include +#include #include #include #include #include +#include #include +#include #include -#include +#include +#include "FormatISO8601.h" #include "RWConcurrentObjectAdaptor.h" +#include "boost/json/detail/value_from.hpp" #include "geds.grpc.pb.h" enum class NodeState { Registered, Decomissioning, Unknown }; @@ -44,7 +49,7 @@ class NodeInformation : public utility::RWConcurrentObjectAdaptor { std::chrono::time_point _lastCheckin; // End subject to state mutex public: - NodeInformation(std::string identifier); + NodeInformation(std::string uuid, std::string host, uint16_t port); NodeInformation(NodeInformation &) = delete; NodeInformation(NodeInformation &&) = delete; NodeInformation &operator=(NodeInformation &) = delete; @@ -55,15 +60,22 @@ class NodeInformation : public utility::RWConcurrentObjectAdaptor { // Subject to state mutex absl::Status queryHeartBeat(); - const std::string identifier; + + const std::string uuid; + const std::string host; + const uint16_t port; void setState(NodeState state); - NodeState state(); + NodeState state() const; void updateHeartBeat(const NodeHeartBeat &heartBeat); - std::tuple> lastHeartBeat(); + std::tuple> + lastHeartBeat() const; std::chrono::time_point lastCheckin(); // End subject to state mutex absl::Status downloadObjects(const std::vector> &objects); }; + +void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, + std::shared_ptr const &n); diff --git a/src/metadataservice/Nodes.cpp b/src/metadataservice/Nodes.cpp index a0f774d1..2468e484 100644 --- a/src/metadataservice/Nodes.cpp +++ b/src/metadataservice/Nodes.cpp @@ -6,58 +6,66 @@ #include "Nodes.h" #include +#include #include +#include #include #include #include #include #include +#include +#include +#include + #include "Logging.h" +#include "NodeInformation.h" #include "geds.grpc.pb.h" -absl::Status Nodes::registerNode(const std::string &identifier) { - auto val = std::make_shared(identifier); - auto existing = _nodes.insertOrExists(identifier, val); +absl::Status Nodes::registerNode(const std::string &uuid, const std::string &host, uint16_t port) { + auto val = std::make_shared(uuid, host, port); + auto existing = _nodes.insertOrExists(uuid, val); if (existing.get() != val.get()) { // auto diff = std::chrono::duration_cast(now - existing->lastCheckin); if (existing->state() == NodeState::Decomissioning) { // Allow reregistering decomissioned nodes. auto connect = val->connect(); if (!connect.ok()) { - LOG_ERROR("Unable to establish backchannel to " + identifier + - " unable to decomission node!"); + LOG_ERROR(connect.message()); } - _nodes.insertOrReplace(identifier, val); + _nodes.insertOrReplace(uuid, val); return absl::OkStatus(); } - auto message = "Node " + identifier + " was already registered!"; + auto message = "Node " + uuid + " was already registered!"; LOG_ERROR(message); return absl::AlreadyExistsError(message); } auto connect = val->connect(); - if (!connect.ok()) { - LOG_ERROR("Unable to establish backchannel to " + identifier + " unable to decomission node!"); + if (connect.ok()) { + val->setState(NodeState::Registered); + } else { + LOG_ERROR(connect.message()); } return absl::OkStatus(); } -absl::Status Nodes::unregisterNode(const std::string &identifier) { - auto removed = _nodes.getAndRemove(identifier); +absl::Status Nodes::unregisterNode(const std::string &uuid) { + auto removed = _nodes.getAndRemove(uuid); if (!removed.value()) { - auto message = "Unable to remove " + identifier + " not found!"; + auto message = "Unable to remove " + uuid + " not found!"; LOG_ERROR(message); return absl::NotFoundError(message); } return absl::OkStatus(); } -absl::Status Nodes::heartbeat(const std::string &identifier, const NodeHeartBeat &heartbeat) { - auto val = _nodes.get(identifier); +absl::Status Nodes::heartbeat(const std::string &uuid, const NodeHeartBeat &heartbeat) { + auto val = _nodes.get(uuid); if (!val.value()) { - auto message = "Unable to process heartbeat " + identifier + " not found!"; + auto message = "Unable to process heartbeat " + uuid + " not found!"; LOG_ERROR(message); return absl::NotFoundError(message); } @@ -174,7 +182,7 @@ absl::Status Nodes::decomissionNodes(const std::vector &nodes, threads.emplace_back([target] { auto status = target->node->downloadObjects(target->objects); if (!status.ok()) { - LOG_ERROR("Unable to relocate objects to ", target->node->identifier); + LOG_ERROR("Unable to relocate objects to ", target->node->host); } }); } @@ -185,3 +193,17 @@ absl::Status Nodes::decomissionNodes(const std::vector &nodes, _isDecommissioning.unlock(); return absl::OkStatus(); } + +void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, Nodes const &n) { + boost::json::array nv; + const auto &info = n.information(); + info.forall([&nv](const std::shared_ptr &node) { + nv.emplace_back(boost::json::value_from(node)); + }); + jv = {{"nodes", nv}}; +} + +void tag_invoke(boost::json::value_from_tag t, boost::json::value &jv, + std::shared_ptr const &n) { + tag_invoke(t, jv, *n); +} diff --git a/src/metadataservice/Nodes.h b/src/metadataservice/Nodes.h index a16d913b..38a97b89 100644 --- a/src/metadataservice/Nodes.h +++ b/src/metadataservice/Nodes.h @@ -6,6 +6,7 @@ #pragma once #include +#include #include #include #include @@ -14,6 +15,7 @@ #include #include +#include #include "ConcurrentMap.h" #include "MDSKVS.h" @@ -39,8 +41,15 @@ class Nodes { return _nodes; }; - absl::Status registerNode(const std::string &identifier); - absl::Status unregisterNode(const std::string &identifier); - absl::Status heartbeat(const std::string &identifier, const NodeHeartBeat &heartbeat); + absl::Status registerNode(const std::string &uuid, const std::string &host, uint16_t port); + absl::Status unregisterNode(const std::string &uuid); + absl::Status heartbeat(const std::string &uuid, const NodeHeartBeat &heartbeat); absl::Status decomissionNodes(const std::vector &nodes, std::shared_ptr kvs); + + std::string toJson() const; }; + +void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, Nodes const &n); + +void tag_invoke(boost::json::value_from_tag t, boost::json::value &jv, + std::shared_ptr const &n); diff --git a/src/protos/geds.proto b/src/protos/geds.proto index 481502e4..60c881d9 100644 --- a/src/protos/geds.proto +++ b/src/protos/geds.proto @@ -73,15 +73,19 @@ enum NodeState { PrepareDecomission = 2; } -message NodeIdentifier { string identifier = 1; } +message NodeIdentifier { + string identifier = 1; + uint32 port = 2; +} message NodeStatus { NodeIdentifier node = 1; NodeState state = 2; + string uuid = 3; } message HeartbeatMessage { - NodeIdentifier node = 1; + string uuid = 1; uint64 memoryAllocated = 2; uint64 memoryUsed = 3; uint64 storageAllocated = 4; From 07ee92a45372cb6509574bd3fc646c3f4b630dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 10 May 2023 13:18:19 +0200 Subject: [PATCH 03/16] GEDS: Seal objects once they have been downloaded. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 13084a1e..344846ea 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -933,7 +933,11 @@ absl::Status GEDS::downloadObject(const std::string &bucket, const std::string & if (!newFile.ok()) { return newFile.status(); } - return (*oldFile)->download(*newFile); + auto status = (*oldFile)->download(*newFile); + if (!status.ok()) { + return status; + } + return (*newFile)->seal(); } absl::Status GEDS::downloadObjects(std::vector objects) { From aabbe54b8d32b39271868bdb263331d15b261ac1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 10 May 2023 13:19:14 +0200 Subject: [PATCH 04/16] Implement: Purge Local Objects MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 20 +++++++++++++++++++ src/libgeds/GEDS.h | 13 +++++++++++++ src/libgeds/Server.cpp | 12 ++++++++---- src/metadataservice/NodeInformation.cpp | 26 +++++++++++++++++++++++++ src/metadataservice/NodeInformation.h | 1 + src/metadataservice/Nodes.cpp | 9 ++++++++- 6 files changed, 76 insertions(+), 5 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 344846ea..38e20d9f 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -1079,3 +1079,23 @@ void GEDS::startStorageMonitoringThread() { } }); } + +absl::Status GEDS::purgeLocalObject(const std::string &bucket, const std::string &key) { + const auto path = getPath(bucket, key); + auto result = _fileHandles.getAndRemove(path); + if (!result.has_value()) { + return absl::NotFoundError("The object with the path " + path.name + + " does not exist locally."); + } + return absl::OkStatus(); +} + +absl::Status GEDS::purgeLocalObjects(std::vector objects) { + for (const auto &obj : objects) { + auto status = purgeLocalObject(obj.bucket, obj.key); + if (!status.ok()) { + LOG_ERROR(status.message()); + } + } + return absl::OkStatus(); +} diff --git a/src/libgeds/GEDS.h b/src/libgeds/GEDS.h index 6d978c24..7342616c 100644 --- a/src/libgeds/GEDS.h +++ b/src/libgeds/GEDS.h @@ -323,8 +323,21 @@ class GEDS : public std::enable_shared_from_this, utility::RWConcurrentObj void relocate(std::vector> &relocatable, bool force = false); void relocate(std::shared_ptr handle, bool force = false); + /** + * @brief Pull object to this GEDS instance. + */ absl::Status downloadObject(const std::string &bucket, const std::string &key); absl::Status downloadObjects(std::vector objects); + + /** + * @brief Purge locally stored object without updating the Metadata server. + */ + absl::Status purgeLocalObject(const std::string &bucket, const std::string &key); + + /** + * @brief Purge locally stored objects if they exist locally. Missing files will be logged. + */ + absl::Status purgeLocalObjects(std::vector objects); }; #endif // GEDS_GEDS_H diff --git a/src/libgeds/Server.cpp b/src/libgeds/Server.cpp index a2bc1251..6dd9dff7 100644 --- a/src/libgeds/Server.cpp +++ b/src/libgeds/Server.cpp @@ -85,10 +85,14 @@ class ServerImpl final : public geds::rpc::GEDSService::Service { ::geds::rpc::StatusResponse *response) override { LOG_INFO(context->peer(), " has requested to delete ", request->objects().size(), " objects."); - (void)context; - (void)request; - (void)response; - auto status = absl::UnimplementedError("DeleteObjectsLocally: NYI"); + std::vector objects; + const auto &data = request->objects(); + objects.reserve(data.size()); + for (const auto &o : data) { + objects.emplace_back(geds::ObjectID(o.bucket(), o.key())); + } + + auto status = _geds->purgeLocalObjects(objects); convertStatus(response, status); return grpc::Status::OK; } diff --git a/src/metadataservice/NodeInformation.cpp b/src/metadataservice/NodeInformation.cpp index e7711d97..22467399 100644 --- a/src/metadataservice/NodeInformation.cpp +++ b/src/metadataservice/NodeInformation.cpp @@ -100,6 +100,32 @@ NodeInformation::downloadObjects(const std::vector> &objects) { + if (!_connectionMutex.try_lock()) { + return absl::UnavailableError("Unable to pull objects: Lock is unavailable!"); + } + + geds::rpc::MultiObjectID request; + geds::rpc::StatusResponse response; + grpc::ClientContext context; + + for (const auto &o : objects) { + auto obj = request.add_objects(); + obj->set_bucket(o->bucket); + obj->set_key(o->key); + } + + auto status = _stub->DeleteObjectsLocally(&context, request, &response); + if (!status.ok()) { + LOG_ERROR("Unable to execute object delete grpc call, status: ", status.error_code(), " ", + status.error_details()); + return absl::UnknownError("Unable to execute object delete!"); + } + + return convertStatus(response); +} + void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, std::shared_ptr const &n) { const auto &[h, timeStamp] = n->lastHeartBeat(); diff --git a/src/metadataservice/NodeInformation.h b/src/metadataservice/NodeInformation.h index 73742a65..98360af6 100644 --- a/src/metadataservice/NodeInformation.h +++ b/src/metadataservice/NodeInformation.h @@ -75,6 +75,7 @@ class NodeInformation : public utility::RWConcurrentObjectAdaptor { // End subject to state mutex absl::Status downloadObjects(const std::vector> &objects); + absl::Status purgeLocalObjects(const std::vector> &objects); }; void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, diff --git a/src/metadataservice/Nodes.cpp b/src/metadataservice/Nodes.cpp index 2468e484..ad15abd7 100644 --- a/src/metadataservice/Nodes.cpp +++ b/src/metadataservice/Nodes.cpp @@ -182,7 +182,14 @@ absl::Status Nodes::decomissionNodes(const std::vector &nodes, threads.emplace_back([target] { auto status = target->node->downloadObjects(target->objects); if (!status.ok()) { - LOG_ERROR("Unable to relocate objects to ", target->node->host); + LOG_ERROR("Unable to relocate objects to ", target->node->host, + " uuid: ", target->node->uuid); + return; + } + status = target->node->purgeLocalObjects(target->objects); + if (!status.ok()) { + LOG_ERROR("Unable to cleanup local objects on ", target->node->host, + " uuid: ", target->node->uuid); } }); } From d6379d0d4718254db6b935f5356956191629f29b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 10 May 2023 15:34:38 +0200 Subject: [PATCH 05/16] Implement HTTP Endpoints to support server decommissioning. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/metadataservice/GRPCServer.cpp | 2 +- src/metadataservice/MDSHttpServer.cpp | 22 +++++----- src/metadataservice/MDSHttpServer.h | 4 +- src/metadataservice/MDSHttpSession.cpp | 54 ++++++++++++++++++++++--- src/metadataservice/MDSHttpSession.h | 6 ++- src/metadataservice/NodeInformation.cpp | 4 ++ src/metadataservice/NodeInformation.h | 4 +- src/metadataservice/Nodes.cpp | 53 +++++++++++++----------- src/metadataservice/Nodes.h | 3 +- src/protos/geds.proto | 3 +- 10 files changed, 111 insertions(+), 44 deletions(-) diff --git a/src/metadataservice/GRPCServer.cpp b/src/metadataservice/GRPCServer.cpp index 6a232b15..2ddb5592 100644 --- a/src/metadataservice/GRPCServer.cpp +++ b/src/metadataservice/GRPCServer.cpp @@ -44,7 +44,7 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { geds::MDSHttpServer _httpServer; public: - MetadataServiceImpl(std::shared_ptr kvs) : _kvs(kvs), _httpServer(4383, _nodes) { + MetadataServiceImpl(std::shared_ptr kvs) : _kvs(kvs), _httpServer(4383, _nodes, _kvs) { auto status = _httpServer.start(); if (!status.ok()) { LOG_ERROR("Unable to start HTTP Server!"); diff --git a/src/metadataservice/MDSHttpServer.cpp b/src/metadataservice/MDSHttpServer.cpp index 80db7c5f..4b666c79 100644 --- a/src/metadataservice/MDSHttpServer.cpp +++ b/src/metadataservice/MDSHttpServer.cpp @@ -24,7 +24,8 @@ namespace geds { -MDSHttpServer::MDSHttpServer(uint16_t port, Nodes &nodes) : _port(port), _nodes(nodes) {} +MDSHttpServer::MDSHttpServer(uint16_t port, Nodes &nodes, std::shared_ptr kvs) + : _port(port), _nodes(nodes), _kvs(kvs) {} absl::Status MDSHttpServer::start() { if (_acceptor != nullptr) { @@ -53,15 +54,16 @@ void MDSHttpServer::stop() { } void MDSHttpServer::accept() { - _acceptor->async_accept(boost::asio::make_strand(_ioContext), - [&](boost::beast::error_code ec, boost::asio::ip::tcp::socket socket) { - if (ec) { - LOG_ERROR("Unable to accept ", ec.message(), " ABORT."); - return; - } - std::make_shared(std::move(socket), _nodes)->start(); - accept(); - }); + _acceptor->async_accept( + boost::asio::make_strand(_ioContext), + [&](boost::beast::error_code ec, boost::asio::ip::tcp::socket socket) { + if (ec) { + LOG_ERROR("Unable to accept ", ec.message(), " ABORT."); + return; + } + std::make_shared(std::move(socket), _nodes, _kvs)->start(); + accept(); + }); } } // namespace geds diff --git a/src/metadataservice/MDSHttpServer.h b/src/metadataservice/MDSHttpServer.h index 983cfac1..a32a6208 100644 --- a/src/metadataservice/MDSHttpServer.h +++ b/src/metadataservice/MDSHttpServer.h @@ -11,6 +11,7 @@ #include #include "MDSHttpSession.h" +#include "MDSKVS.h" class Nodes; @@ -24,9 +25,10 @@ class MDSHttpServer { std::thread _thread; Nodes &_nodes; + std::shared_ptr _kvs; public: - MDSHttpServer(uint16_t port, Nodes &nodes); + MDSHttpServer(uint16_t port, Nodes &nodes, std::shared_ptr kvs); absl::Status start(); void stop(); diff --git a/src/metadataservice/MDSHttpSession.cpp b/src/metadataservice/MDSHttpSession.cpp index cc483fd5..6882afcf 100644 --- a/src/metadataservice/MDSHttpSession.cpp +++ b/src/metadataservice/MDSHttpSession.cpp @@ -5,7 +5,14 @@ #include "MDSHttpSession.h" +#include #include +#include +#include + +#include +#include +#include #include "Logging.h" #include "Nodes.h" @@ -13,8 +20,9 @@ namespace geds { -MDSHttpSession::MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes) - : _stream(std::move(socket)), _nodes(nodes) {} +MDSHttpSession::MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes, + std::shared_ptr kvs) + : _stream(std::move(socket)), _nodes(nodes), _kvs(kvs) {} MDSHttpSession::~MDSHttpSession() { close(); } @@ -82,7 +90,7 @@ void MDSHttpSession::prepareHtmlReply() { void MDSHttpSession::prepareApiNodesReply() { _response.result(boost::beast::http::status::ok); _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); - _response.set(boost::beast::http::field::content_type, "text/csv"); + _response.set(boost::beast::http::field::content_type, "application/json"); _response.keep_alive(_request.keep_alive()); auto data = boost::json::value_from(_nodes); @@ -91,6 +99,40 @@ void MDSHttpSession::prepareApiNodesReply() { handleWrite(); } +void MDSHttpSession::prepareApiDecommissionReply(const std::string &body) { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "application/json"); + _response.keep_alive(_request.keep_alive()); + + boost::json::error_code ec; + auto parsed = boost::json::parse(body, ec); + if (ec) { + return prepareError(boost::beast::http::status::bad_request, ec.message()); + } + if (!parsed.is_array()) { + return prepareError(boost::beast::http::status::bad_request, "Expected array!"); + } + + std::vector hostsToDecommission; + for (const auto &value : parsed.as_array()) { + if (!value.is_string()) { + return prepareError(boost::beast::http::status::bad_request, "Unexpected element in array!"); + } + hostsToDecommission.push_back(boost::json::value_to(value)); + } + + auto status = _nodes.decommissionNodes(hostsToDecommission, _kvs); + if (!status.ok()) { + return prepareError(boost::beast::http::status::internal_server_error, + std::string{status.message()}); + } + + boost::beast::ostream(_response.body()) + << R"({"status": "success", "nodes": )" << body << R"(}\n)"; + handleWrite(); +} + void MDSHttpSession::handleRequest() { if (_request.target().empty() || _request.target()[0] != '/') { return prepareError(boost::beast::http::status::bad_request, "Invalid path."); @@ -109,7 +151,9 @@ void MDSHttpSession::handleRequest() { return prepareError(boost::beast::http::status::not_found, "Invalid path"); } if (_request.method() == boost::beast::http::verb::post) { - if (_request.target() == "/api/decomission") { + auto body = boost::beast::buffers_to_string(_request.body().data()); + if (_request.target() == "/api/decommission") { + return prepareApiDecommissionReply(body); } return prepareError(boost::beast::http::status::not_found, "Invalid path"); } @@ -133,7 +177,7 @@ void MDSHttpSession::prepareError(boost::beast::http::status status, std::string _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); _response.set(boost::beast::http::field::content_type, "text/html"); _response.keep_alive(_request.keep_alive()); - boost::beast::ostream(_response.body()) << message; + boost::beast::ostream(_response.body()) << message << "\n"; return handleWrite(); } diff --git a/src/metadataservice/MDSHttpSession.h b/src/metadataservice/MDSHttpSession.h index 116f3e39..460e8e67 100644 --- a/src/metadataservice/MDSHttpSession.h +++ b/src/metadataservice/MDSHttpSession.h @@ -17,6 +17,8 @@ #include #include +#include "MDSKVS.h" + class Nodes; namespace geds { @@ -28,9 +30,10 @@ class MDSHttpSession : public std::enable_shared_from_this { boost::beast::http::response _response; Nodes &_nodes; + std::shared_ptr _kvs; public: - MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes); + MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes, std::shared_ptr kvs); ~MDSHttpSession(); void start(); @@ -39,6 +42,7 @@ class MDSHttpSession : public std::enable_shared_from_this { void prepareHtmlReply(); void prepareMetricsReply(); void prepareApiNodesReply(); + void prepareApiDecommissionReply(const std::string &body); void prepareError(boost::beast::http::status status, std::string message); void handleWrite(); diff --git a/src/metadataservice/NodeInformation.cpp b/src/metadataservice/NodeInformation.cpp index 22467399..a4d4dfa6 100644 --- a/src/metadataservice/NodeInformation.cpp +++ b/src/metadataservice/NodeInformation.cpp @@ -57,6 +57,10 @@ NodeState NodeInformation::state() const { return _state; } +std::string NodeInformation::gedsHostUri() const { + return "geds://" + host + ":" + std::to_string(port); +} + void NodeInformation::updateHeartBeat(const NodeHeartBeat &heartBeat) { auto lock = getWriteLock(); _heartBeat = heartBeat; diff --git a/src/metadataservice/NodeInformation.h b/src/metadataservice/NodeInformation.h index 98360af6..f2803c1e 100644 --- a/src/metadataservice/NodeInformation.h +++ b/src/metadataservice/NodeInformation.h @@ -23,7 +23,7 @@ #include "boost/json/detail/value_from.hpp" #include "geds.grpc.pb.h" -enum class NodeState { Registered, Decomissioning, Unknown }; +enum class NodeState { Registered, Decommissioning, ReadyForShutdown, Unknown }; struct NodeHeartBeat { size_t memoryAllocated{0}; @@ -68,6 +68,8 @@ class NodeInformation : public utility::RWConcurrentObjectAdaptor { void setState(NodeState state); NodeState state() const; + std::string gedsHostUri() const; + void updateHeartBeat(const NodeHeartBeat &heartBeat); std::tuple> lastHeartBeat() const; diff --git a/src/metadataservice/Nodes.cpp b/src/metadataservice/Nodes.cpp index ad15abd7..b64cac6a 100644 --- a/src/metadataservice/Nodes.cpp +++ b/src/metadataservice/Nodes.cpp @@ -21,6 +21,7 @@ #include "Logging.h" #include "NodeInformation.h" +#include "absl/status/status.h" #include "geds.grpc.pb.h" absl::Status Nodes::registerNode(const std::string &uuid, const std::string &host, uint16_t port) { @@ -28,8 +29,8 @@ absl::Status Nodes::registerNode(const std::string &uuid, const std::string &hos auto existing = _nodes.insertOrExists(uuid, val); if (existing.get() != val.get()) { // auto diff = std::chrono::duration_cast(now - existing->lastCheckin); - if (existing->state() == NodeState::Decomissioning) { - // Allow reregistering decomissioned nodes. + if (existing->state() == NodeState::Decommissioning) { + // Allow reregistering decommissioned nodes. auto connect = val->connect(); if (!connect.ok()) { LOG_ERROR(connect.message()); @@ -73,32 +74,31 @@ absl::Status Nodes::heartbeat(const std::string &uuid, const NodeHeartBeat &hear return absl::OkStatus(); } -absl::Status Nodes::decomissionNodes(const std::vector &nodes, - std::shared_ptr kvs) { - if (!_isDecommissioning.try_lock()) { - return absl::UnavailableError("Already decomissioning."); +absl::Status Nodes::decommissionNodes(const std::vector &nodes, + std::shared_ptr kvs) { + auto lock = std::unique_lock(_isDecommissioning, std::try_to_lock); + if (!lock.owns_lock()) { + return absl::UnavailableError("Already decommissioning."); } + // Mark hosts as decommissioning and determine host uris to collect objects. + std::vector gedsHostUris; + std::vector> hostsToDecommision; for (const auto &node : nodes) { auto existing = _nodes.get(node); if (!existing.has_value()) { - LOG_ERROR("Unable to decomission: Node " + node + " since it does not exist!"); - continue; + return absl::UnavailableError("Unable to decommission node: " + node + + " since it does not exist!"); } - (*existing)->setState(NodeState::Decomissioning); - } - - // Prefix nodes with geds:// - std::vector prefixedNodes(nodes.size()); - for (const auto &node : nodes) { - prefixedNodes.emplace_back("geds://" + node); + (*existing)->setState(NodeState::Decommissioning); + hostsToDecommision.emplace_back(*existing); + gedsHostUris.emplace_back((*existing)->gedsHostUri()); } // Find all buckets. auto buckets = kvs->listBuckets(); if (!buckets.ok()) { - LOG_ERROR("Unable to list buckets when decomissioning: ", buckets.status().message()); - _isDecommissioning.unlock(); + LOG_ERROR("Unable to list buckets when decommissioning: ", buckets.status().message()); return buckets.status(); } @@ -109,14 +109,14 @@ absl::Status Nodes::decomissionNodes(const std::vector &nodes, if (!bucket.ok()) { continue; } - (*bucket)->forall([&objects, &prefixedNodes, &bucketName](const utility::Path &path, - const geds::ObjectInfo &obj) { + (*bucket)->forall([&objects, &gedsHostUris, &bucketName](const utility::Path &path, + const geds::ObjectInfo &obj) { // Do not relocate cached blocks. if (path.name.starts_with("_$cachedblock$/")) { return; } - for (const auto &n : prefixedNodes) { - if (obj.location.starts_with(n)) { + for (const auto &uri : gedsHostUris) { + if (obj.location == uri) { objects.emplace_back( new RelocatableObject{.bucket = bucketName, .key = path.name, .size = obj.size}); } @@ -176,12 +176,14 @@ absl::Status Nodes::decomissionNodes(const std::vector &nodes, } } + std::atomic failures; std::vector threads; threads.reserve(targetNodes.size()); for (auto &target : targetNodes) { - threads.emplace_back([target] { + threads.emplace_back([target, &failures] { auto status = target->node->downloadObjects(target->objects); if (!status.ok()) { + failures += 1; LOG_ERROR("Unable to relocate objects to ", target->node->host, " uuid: ", target->node->uuid); return; @@ -197,7 +199,12 @@ absl::Status Nodes::decomissionNodes(const std::vector &nodes, thread.join(); } - _isDecommissioning.unlock(); + if (failures == 0) { + for (auto &host : hostsToDecommision) { + host->setState(NodeState::ReadyForShutdown); + } + } + return absl::OkStatus(); } diff --git a/src/metadataservice/Nodes.h b/src/metadataservice/Nodes.h index 38a97b89..76d0e807 100644 --- a/src/metadataservice/Nodes.h +++ b/src/metadataservice/Nodes.h @@ -44,7 +44,8 @@ class Nodes { absl::Status registerNode(const std::string &uuid, const std::string &host, uint16_t port); absl::Status unregisterNode(const std::string &uuid); absl::Status heartbeat(const std::string &uuid, const NodeHeartBeat &heartbeat); - absl::Status decomissionNodes(const std::vector &nodes, std::shared_ptr kvs); + absl::Status decommissionNodes(const std::vector &nodes, + std::shared_ptr kvs); std::string toJson() const; }; diff --git a/src/protos/geds.proto b/src/protos/geds.proto index 60c881d9..0a4057c4 100644 --- a/src/protos/geds.proto +++ b/src/protos/geds.proto @@ -70,7 +70,8 @@ message ObjectResponse { enum NodeState { Register = 0; Unregister = 1; - PrepareDecomission = 2; + PrepareDecommission = 2; + ReadyForShutdown = 3; } message NodeIdentifier { From 2fcddb3469f5bd56bb23c3c9d5e7971e7daa0bf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 10 May 2023 17:31:52 +0200 Subject: [PATCH 06/16] Python: Create.py add a second file. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/python/create.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/python/create.py b/src/python/create.py index 3a0dba2b..5e031688 100644 --- a/src/python/create.py +++ b/src/python/create.py @@ -36,11 +36,12 @@ l = file.read(message_read, 0, len(message_read)) print(f"Read: {message_read.decode()}") -# Print path -# print(f"File: {file.path()}") +file.seal() -# Seal file +file2 = instance.create("bucket2", "testfile2") +file2.write(message_read, 0, len(message)) +file2.seal() -file.seal() +# Seal file sleep(1000000) From 2c19145fcda75a791a9900b6f27a661659421e69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Thu, 11 May 2023 10:53:23 +0200 Subject: [PATCH 07/16] Fix server decommissioning and add API to reregister client. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 10 +++-- src/libgeds/GEDSFileHandle.cpp | 1 + src/metadataservice/GRPCServer.cpp | 3 +- src/metadataservice/MDSHttpSession.cpp | 61 ++++++++++++++++++++++---- src/metadataservice/MDSHttpSession.h | 1 + src/metadataservice/Nodes.cpp | 15 ++++--- src/metadataservice/Nodes.h | 1 + 7 files changed, 74 insertions(+), 18 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 38e20d9f..5f71d8f7 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -965,8 +965,8 @@ absl::Status GEDS::downloadObjects(std::vector objects) { rend = objects.size(); } for (auto i = rbegin; i < rend; i++) { - auto file = objects[i]; - boost::asio::post(_ioThreadPool, [self, &file, h]() { + const auto &file = objects[i]; + boost::asio::post(_ioThreadPool, [self, file, h]() { bool error = false; try { auto status = self->downloadObject(file.bucket, file.key); @@ -1084,13 +1084,15 @@ absl::Status GEDS::purgeLocalObject(const std::string &bucket, const std::string const auto path = getPath(bucket, key); auto result = _fileHandles.getAndRemove(path); if (!result.has_value()) { - return absl::NotFoundError("The object with the path " + path.name + - " does not exist locally."); + auto message = "The object with the path " + path.name + " does not exist locally."; + LOG_ERROR(message); + return absl::NotFoundError(message); } return absl::OkStatus(); } absl::Status GEDS::purgeLocalObjects(std::vector objects) { + LOG_DEBUG("Purging ", objects.size(), "."); for (const auto &obj : objects) { auto status = purgeLocalObject(obj.bucket, obj.key); if (!status.ok()) { diff --git a/src/libgeds/GEDSFileHandle.cpp b/src/libgeds/GEDSFileHandle.cpp index bc1157da..91f888e6 100644 --- a/src/libgeds/GEDSFileHandle.cpp +++ b/src/libgeds/GEDSFileHandle.cpp @@ -120,6 +120,7 @@ absl::Status GEDSFileHandle::download(std::shared_ptr destinatio } pos += *count; } while (pos < *totalSize); + return absl::OkStatus(); } diff --git a/src/metadataservice/GRPCServer.cpp b/src/metadataservice/GRPCServer.cpp index 2ddb5592..5f083744 100644 --- a/src/metadataservice/GRPCServer.cpp +++ b/src/metadataservice/GRPCServer.cpp @@ -89,7 +89,8 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { grpc::Status Heartbeat(::grpc::ServerContext *context, const ::geds::rpc::HeartbeatMessage *request, ::geds::rpc::StatusResponse *response) override { - LOG_ACCESS("Heartbeat: ", request->uuid()); + // LOG_ACCESS("Heartbeat: ", request->uuid()); + (void)context; const auto &uuid = request->uuid(); NodeHeartBeat val; diff --git a/src/metadataservice/MDSHttpSession.cpp b/src/metadataservice/MDSHttpSession.cpp index 6882afcf..418e732d 100644 --- a/src/metadataservice/MDSHttpSession.cpp +++ b/src/metadataservice/MDSHttpSession.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include "Logging.h" #include "Nodes.h" @@ -74,16 +75,18 @@ void MDSHttpSession::prepareHtmlReply() { info.forall([&](const std::shared_ptr &node) { const auto &[heartBeat, ts] = node->lastHeartBeat(); boost::beast::ostream(_response.body()) - << " - " << node->uuid << ": " // - << node->host << ":" << node->port << " " // - << "Allocated: " << heartBeat.storageAllocated << " " // - << "Used: " << heartBeat.storageUsed << " " // - << "Memory Allocated: " << heartBeat.memoryAllocated << " " // + << " - " << node->uuid << ": " // + << node->host << ":" << node->port << " " // + << std::string{magic_enum::enum_name(node->state())} << " -- " // + << "Allocated: " << heartBeat.storageAllocated << " " // + << "Used: " << heartBeat.storageUsed << " " // + << "Memory Allocated: " << heartBeat.memoryAllocated << " " // << "Memory Used: " << heartBeat.memoryUsed << "\n"; }); } boost::beast::ostream(_response.body()) << "" << "" << std::endl; + boost::beast::ostream(_response.body()) << "\n"; handleWrite(); } @@ -95,7 +98,7 @@ void MDSHttpSession::prepareApiNodesReply() { auto data = boost::json::value_from(_nodes); boost::beast::ostream(_response.body()) << boost::json::serialize(data); - + boost::beast::ostream(_response.body()) << "\n"; handleWrite(); } @@ -128,8 +131,46 @@ void MDSHttpSession::prepareApiDecommissionReply(const std::string &body) { std::string{status.message()}); } - boost::beast::ostream(_response.body()) - << R"({"status": "success", "nodes": )" << body << R"(}\n)"; + boost::beast::ostream(_response.body()) << R"({"status": "success", "nodes": )" << body << "}"; + boost::beast::ostream(_response.body()) << "\n"; + handleWrite(); +} + +void MDSHttpSession::prepareApiReregisterReply(const std::string &body) { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "application/json"); + _response.keep_alive(_request.keep_alive()); + + // Parse body + boost::json::error_code ec; + auto parsed = boost::json::parse(body, ec); + if (ec) { + return prepareError(boost::beast::http::status::bad_request, ec.message()); + } + if (!parsed.is_array()) { + return prepareError(boost::beast::http::status::bad_request, "Expected array!"); + } + + // Send reply + size_t count = 0; + boost::beast::ostream(_response.body()) << R"({"status": "success", "nodes": [)"; + for (const auto &value : parsed.as_array()) { + if (!value.is_string()) { + continue; + } + auto host = boost::json::value_to(value); + auto status = _nodes.reregisterNode(host); + if (status.ok()) { + if (count > 1) { + boost::beast::ostream(_response.body()) << ", "; + } + boost::beast::ostream(_response.body()) << "\"" << host << "\""; + count++; + } + } + + boost::beast::ostream(_response.body()) << "\n"; handleWrite(); } @@ -155,6 +196,9 @@ void MDSHttpSession::handleRequest() { if (_request.target() == "/api/decommission") { return prepareApiDecommissionReply(body); } + if (_request.target() == "/api/reregister") { + return prepareApiReregisterReply(body); + } return prepareError(boost::beast::http::status::not_found, "Invalid path"); } return prepareError(boost::beast::http::status::bad_request, "Invalid method."); @@ -169,6 +213,7 @@ void MDSHttpSession::prepareMetricsReply() { std::stringstream stream; Statistics::get().prometheusMetrics(stream); boost::beast::ostream(_response.body()) << stream.str(); + boost::beast::ostream(_response.body()) << "\n"; handleWrite(); } diff --git a/src/metadataservice/MDSHttpSession.h b/src/metadataservice/MDSHttpSession.h index 460e8e67..1a90e2d9 100644 --- a/src/metadataservice/MDSHttpSession.h +++ b/src/metadataservice/MDSHttpSession.h @@ -43,6 +43,7 @@ class MDSHttpSession : public std::enable_shared_from_this { void prepareMetricsReply(); void prepareApiNodesReply(); void prepareApiDecommissionReply(const std::string &body); + void prepareApiReregisterReply(const std::string &body); void prepareError(boost::beast::http::status status, std::string message); void handleWrite(); diff --git a/src/metadataservice/Nodes.cpp b/src/metadataservice/Nodes.cpp index b64cac6a..8945ace7 100644 --- a/src/metadataservice/Nodes.cpp +++ b/src/metadataservice/Nodes.cpp @@ -53,6 +53,15 @@ absl::Status Nodes::registerNode(const std::string &uuid, const std::string &hos return absl::OkStatus(); } +absl::Status Nodes::reregisterNode(const std::string &uuid) { + auto exists = _nodes.get(uuid); + if (!exists.has_value()) { + return absl::NotFoundError("Node " + uuid + " does not exist!"); + } + (*exists)->setState(NodeState::Registered); + return absl::OkStatus(); +} + absl::Status Nodes::unregisterNode(const std::string &uuid) { auto removed = _nodes.getAndRemove(uuid); if (!removed.value()) { @@ -168,6 +177,7 @@ absl::Status Nodes::decommissionNodes(const std::vector &nodes, foundTarget = true; target->objects.push_back(obj); target->target += obj->size; + break; } } if (!foundTarget) { @@ -188,11 +198,6 @@ absl::Status Nodes::decommissionNodes(const std::vector &nodes, " uuid: ", target->node->uuid); return; } - status = target->node->purgeLocalObjects(target->objects); - if (!status.ok()) { - LOG_ERROR("Unable to cleanup local objects on ", target->node->host, - " uuid: ", target->node->uuid); - } }); } for (auto &thread : threads) { diff --git a/src/metadataservice/Nodes.h b/src/metadataservice/Nodes.h index 76d0e807..5b6df20a 100644 --- a/src/metadataservice/Nodes.h +++ b/src/metadataservice/Nodes.h @@ -42,6 +42,7 @@ class Nodes { }; absl::Status registerNode(const std::string &uuid, const std::string &host, uint16_t port); + absl::Status reregisterNode(const std::string &uuid); absl::Status unregisterNode(const std::string &uuid); absl::Status heartbeat(const std::string &uuid, const NodeHeartBeat &heartbeat); absl::Status decommissionNodes(const std::vector &nodes, From 955f02a8d18f45017d7bc6ab1663b7c9cde17300 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Thu, 11 May 2023 11:44:32 +0200 Subject: [PATCH 08/16] MDSKVS: Make forall const. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/utility/MDSKVSBucket.cpp | 2 +- src/utility/MDSKVSBucket.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/utility/MDSKVSBucket.cpp b/src/utility/MDSKVSBucket.cpp index 4025b870..07b104b6 100644 --- a/src/utility/MDSKVSBucket.cpp +++ b/src/utility/MDSKVSBucket.cpp @@ -106,7 +106,7 @@ MDSKVSBucket::listObjects(const std::string &keyPrefix, char delimiter) { } void MDSKVSBucket::forall( - std::function action) { + std::function action) const { auto lock = getReadLock(); for (const auto &v : _map) { const auto &key = v.first; diff --git a/src/utility/MDSKVSBucket.h b/src/utility/MDSKVSBucket.h index bcba00fe..dd3a515b 100644 --- a/src/utility/MDSKVSBucket.h +++ b/src/utility/MDSKVSBucket.h @@ -53,5 +53,5 @@ class MDSKVSBucket : public utility::RWConcurrentObjectAdaptor { absl::StatusOr, std::vector>> listObjects(const std::string &keyPrefix, char delimiter = 0); - void forall(std::function action); + void forall(std::function action) const; }; From 382abadb38c108a093f75679cb7a4e1ca5a1a42e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Thu, 11 May 2023 11:44:46 +0200 Subject: [PATCH 09/16] MDS HTTP Server: Expose objects as API endpoint. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/metadataservice/MDSHttpSession.cpp | 53 ++++++++++++++++++++++++++ src/metadataservice/MDSHttpSession.h | 2 + 2 files changed, 55 insertions(+) diff --git a/src/metadataservice/MDSHttpSession.cpp b/src/metadataservice/MDSHttpSession.cpp index 418e732d..c9bef3b3 100644 --- a/src/metadataservice/MDSHttpSession.cpp +++ b/src/metadataservice/MDSHttpSession.cpp @@ -11,14 +11,52 @@ #include #include +#include +#include #include #include +#include #include #include "Logging.h" +#include "MDSKVS.h" +#include "MDSKVSBucket.h" #include "Nodes.h" #include "Statistics.h" +void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, + const std::shared_ptr &n) { + auto &nv = jv.emplace_array(); + n->forall([&nv](const utility::Path &key, const geds::ObjectInfo &info) { + nv.push_back({key.name, + {{"location", info.location}, + {"size", info.size}, + {"metadata", info.metadata.has_value() + ? (std::to_string(info.metadata->size()) + " bytes") + : std::string{""}}}}); + }); +} + +void tag_invoke(boost::json::value_from_tag, boost::json::value &jv, + const std::shared_ptr &n) { + auto &nv = jv.emplace_array(); + auto buckets = n->listBuckets(); + if (!buckets.ok()) { + jv = nv; + return; + } + for (const auto &bucket : *buckets) { + auto objs = n->getBucket(bucket); + if (!objs.ok()) { + continue; + } + auto b = *objs; + auto value = boost::json::value_from(b); + nv.push_back({bucket, value}); + } + jv = nv; +} + namespace geds { MDSHttpSession::MDSHttpSession(boost::asio::ip::tcp::socket &&socket, Nodes &nodes, @@ -90,6 +128,18 @@ void MDSHttpSession::prepareHtmlReply() { handleWrite(); } +void MDSHttpSession::prepareApiListReply() { + _response.result(boost::beast::http::status::ok); + _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + _response.set(boost::beast::http::field::content_type, "application/json"); + _response.keep_alive(_request.keep_alive()); + + auto data = boost::json::value_from(_kvs); + boost::beast::ostream(_response.body()) << boost::json::serialize(data); + boost::beast::ostream(_response.body()) << "\n"; + handleWrite(); +} + void MDSHttpSession::prepareApiNodesReply() { _response.result(boost::beast::http::status::ok); _response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); @@ -183,6 +233,9 @@ void MDSHttpSession::handleRequest() { if (_request.target() == "/") { return prepareHtmlReply(); } + if (_request.target() == "/api/list") { + return prepareApiListReply(); + } if (_request.target() == "/api/nodes") { return prepareApiNodesReply(); } diff --git a/src/metadataservice/MDSHttpSession.h b/src/metadataservice/MDSHttpSession.h index 1a90e2d9..272f5024 100644 --- a/src/metadataservice/MDSHttpSession.h +++ b/src/metadataservice/MDSHttpSession.h @@ -41,6 +41,7 @@ class MDSHttpSession : public std::enable_shared_from_this { void handleRequest(); void prepareHtmlReply(); void prepareMetricsReply(); + void prepareApiListReply(); void prepareApiNodesReply(); void prepareApiDecommissionReply(const std::string &body); void prepareApiReregisterReply(const std::string &body); @@ -49,4 +50,5 @@ class MDSHttpSession : public std::enable_shared_from_this { void close(); }; + } // namespace geds From 70eabd3a63aaf6c3e0f5bd3bb77d6f0d0772704a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Tue, 16 May 2023 13:51:21 +0200 Subject: [PATCH 10/16] Automatically decomission nodes when they disconnect. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 11 ++++++++--- src/libgeds/TcpTransport.cpp | 4 ++-- src/metadataservice/GRPCServer.cpp | 5 +++++ 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 5f71d8f7..095c960f 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -176,11 +176,17 @@ absl::Status GEDS::stop() { GEDS_CHECK_SERVICE_RUNNING LOG_INFO("Stopping"); LOG_INFO("Printing statistics"); + _state = ServiceState::Stopped; geds::Statistics::print(); - auto result = _metadataService.disconnect(); + auto result = _metadataService.configureNode(uuid, _hostname, _server.port(), + geds::rpc::NodeState::Unregister); + if (!result.ok()) { + LOG_ERROR("Unable to unregister: ", result.message()); + } + result = _metadataService.disconnect(); if (!result.ok()) { - LOG_ERROR("cannot disconnect metadata service"); + LOG_ERROR("cannot disconnect metadata service: ", result.message()); } result = _server.stop(); if (!result.ok()) { @@ -194,7 +200,6 @@ absl::Status GEDS::stop() { _fileTransfers.clear(); _tcpTransport->stop(); - _state = ServiceState::Stopped; _storageMonitoringThread.join(); diff --git a/src/libgeds/TcpTransport.cpp b/src/libgeds/TcpTransport.cpp index f703ee35..994fabf7 100644 --- a/src/libgeds/TcpTransport.cpp +++ b/src/libgeds/TcpTransport.cpp @@ -311,7 +311,7 @@ void TcpTransport::tcpTxThread(unsigned int id) { } epoll_wfd[id] = poll_fd; do { - int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, -1); + int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, 500); for (int i = 0; i < cnt; i++) { struct epoll_event *ev = &events[i]; @@ -657,7 +657,7 @@ void TcpTransport::tcpRxThread(unsigned int id) { epoll_rfd[id] = poll_fd; do { - int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, -1); + int cnt = ::epoll_wait(poll_fd, events, EPOLL_MAXEVENTS, 500); for (int i = 0; i < cnt; i++) { struct epoll_event *ev = &events[i]; diff --git a/src/metadataservice/GRPCServer.cpp b/src/metadataservice/GRPCServer.cpp index 5f083744..5bb0358b 100644 --- a/src/metadataservice/GRPCServer.cpp +++ b/src/metadataservice/GRPCServer.cpp @@ -76,6 +76,11 @@ class MetadataServiceImpl final : public geds::rpc::MetadataService::Service { if (state == geds::rpc::NodeState::Register) { status = _nodes.registerNode(uuid, identifier, port); } else if (state == geds::rpc::NodeState::Unregister) { + std::vector toDecommission = {uuid}; + auto decommissionStatus = _nodes.decommissionNodes(toDecommission, _kvs); + if (!decommissionStatus.ok()) { + LOG_ERROR("Unable to decommission node: ", decommissionStatus.message()); + } status = _nodes.unregisterNode(uuid); } else { LOG_ERROR("Invalid state ", state); From 1f21bedc5d8d3a54130d5d7ba4d31fe54caee60e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 17 May 2023 07:34:35 +0200 Subject: [PATCH 11/16] IO Benchmark: Preallocate Buffers. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/benchmarks/benchmark_io.cpp | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/benchmarks/benchmark_io.cpp b/src/benchmarks/benchmark_io.cpp index b6cc7af8..7c949870 100644 --- a/src/benchmarks/benchmark_io.cpp +++ b/src/benchmarks/benchmark_io.cpp @@ -65,6 +65,8 @@ struct Latency { constexpr size_t KILOBYTE = 1024; constexpr size_t MEGABYTE = KILOBYTE * KILOBYTE; +std::vector> buffers; + size_t getPayloadSize(size_t factor) { return KILOBYTE * (1 << factor); } void runBenchmarkThread(std::shared_ptr geds, size_t threadId, size_t factor, @@ -77,10 +79,12 @@ void runBenchmarkThread(std::shared_ptr geds, size_t threadId, size_t fact } else { key = std::to_string(factor) + "-" + std::to_string(threadId) + ".data"; } - std::vector buffer(payloadSize); + // std::vector buffer(payloadSize); + auto file = geds->open(FLAGS_bucket.CurrentValue(), key); if (file.ok()) { auto openTime = std::chrono::steady_clock::now(); + auto buffer = buffers[threadId].data(); auto status = file->read(buffer, 0, payloadSize); auto lastByteTime = std::chrono::steady_clock::now(); constexpr auto milliseconds = 1e6; @@ -93,7 +97,6 @@ void runBenchmarkThread(std::shared_ptr geds, size_t threadId, size_t fact lastByteTime - startTime) .count() / milliseconds}); - if (!status.ok()) { auto message = "Error: " + std::string{status.status().message()}; LOG_ERROR(message); @@ -183,8 +186,16 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } - for (size_t i = 0; i <= absl::GetFlag(FLAGS_maxFactor); i++) { - for (size_t j = 1; j < absl::GetFlag(FLAGS_maxThreads); j++) { + auto threadCount = absl::GetFlag(FLAGS_maxThreads); + buffers.resize(threadCount); + auto factorCount = absl::GetFlag(FLAGS_maxFactor); + auto maxPayload = getPayloadSize(factorCount); + for (auto &buffer : buffers) { + buffer.resize(maxPayload); + } + + for (size_t i = 0; i <= factorCount; i++) { + for (size_t j = 1; j < threadCount; j++) { auto result = benchmark(geds, i, j); std::cout << result.payloadSize << ": " << result.threads << " " << result.rate << " MB/s" << std::endl; From 29323ba07745958489644696c5c895edcafac950 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 17 May 2023 14:52:59 +0200 Subject: [PATCH 12/16] GEDS: Add additional logging when a file is deleted. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/Filesystem.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/libgeds/Filesystem.cpp b/src/libgeds/Filesystem.cpp index 490e5d9c..0c39fd72 100644 --- a/src/libgeds/Filesystem.cpp +++ b/src/libgeds/Filesystem.cpp @@ -44,8 +44,11 @@ absl::Status removeFile(const std::string &path) { int err = unlink(path.c_str()); if (err != 0 && (errno != ENOENT)) { int error = errno; - return absl::UnknownError("Unable to delete file " + path + ": " + std::strerror(error)); + auto message = "Unable to delete file " + path + ": " + std::strerror(error); + LOG_ERROR(message); + return absl::UnknownError(message); } + LOG_DEBUG("Removed ", path); return absl::OkStatus(); } From 2a408bb7a1b4ea51e0dbadc1c523418a346dc5ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 17 May 2023 14:54:14 +0200 Subject: [PATCH 13/16] GEDSAbstractFileHandle: Fix typos. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDSAbstractFileHandle.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libgeds/GEDSAbstractFileHandle.h b/src/libgeds/GEDSAbstractFileHandle.h index 570edc5f..7288713b 100644 --- a/src/libgeds/GEDSAbstractFileHandle.h +++ b/src/libgeds/GEDSAbstractFileHandle.h @@ -205,13 +205,13 @@ template class GEDSAbstractFileHandle : public GEDSFileHandle { } auto fh = GEDSS3FileHandle::factory(_gedsService, bucket, key, metadata()); if (!fh.ok()) { - LOG_ERROR("Unable to reopen the relocateed file ", identifier, + LOG_ERROR("Unable to reopen the relocated file ", identifier, " on s3:", fh.status().message()); return fh.status(); } auto status = (*fh)->seal(); if (!status.ok()) { - LOG_ERROR("Unable to seal relocateed file!"); + LOG_ERROR("Unable to seal relocated file!"); (void)(*s3Endpoint)->deleteObject(bucket, key); return status; } From 823af2d853f06507b59f197576e83f6ae3e0ce67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 17 May 2023 14:54:41 +0200 Subject: [PATCH 14/16] GEDS: Allow relocating objects which are in-use. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal Spörri --- src/libgeds/GEDS.cpp | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 095c960f..35cc4a65 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -179,6 +179,9 @@ absl::Status GEDS::stop() { _state = ServiceState::Stopped; geds::Statistics::print(); + // Relocate to S3 if available. + relocate(true); + // Unregister. auto result = _metadataService.configureNode(uuid, _hostname, _server.port(), geds::rpc::NodeState::Unregister); if (!result.ok()) { @@ -200,7 +203,6 @@ absl::Status GEDS::stop() { _fileTransfers.clear(); _tcpTransport->stop(); - _storageMonitoringThread.join(); return result; @@ -1024,12 +1026,12 @@ void GEDS::startStorageMonitoringThread() { allFileHandles.push_back(fh); }); for (const auto &fh : allFileHandles) { - storageUsed += fh->localStorageSize(); - memoryUsed += fh->localMemorySize(); - if (fh->isRelocatable()) { - if (fh->openCount() == 0) { - relocatable.push_back(fh); - } + auto storageSize = fh->localStorageSize(); + auto memSize = fh->localMemorySize(); + storageUsed += storageSize; + memoryUsed += memSize; + if (fh->isRelocatable() && memoryUsed == 0) { + relocatable.push_back(fh); } } } @@ -1059,11 +1061,20 @@ void GEDS::startStorageMonitoringThread() { } } - auto targetStorage = (size_t)(0.7 * (double)_config.available_local_storage); - if (memoryUsed > targetStorage) { + auto targetStorage = (size_t)(0.5 * (double)_config.available_local_storage); + if (storageUsed > targetStorage) { std::sort(std::begin(relocatable), std::end(relocatable), [](std::shared_ptr a, std::shared_ptr b) { - return a->lastReleased() < b->lastReleased(); + if (a->openCount() == 0 && b->openCount() == 0) { + return a->lastReleased() < b->lastReleased(); + } + if (a->openCount() == 0) { + return true; + } + if (b->openCount() == 0) { + return false; + } + return a->lastOpened() < b->lastOpened(); }); std::vector> tasks; @@ -1077,6 +1088,8 @@ void GEDS::startStorageMonitoringThread() { } if (tasks.size()) { relocate(tasks); + } else { + LOG_WARNING("Unable to relocate files: No task found!"); } } relocatable.clear(); From a4eaaffd6275a4f8e5d77385c0496f606685bd4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 17 May 2023 19:30:32 +0200 Subject: [PATCH 15/16] GEDS: Fix Object relocation. --- src/libgeds/GEDS.cpp | 47 +++++++++++----------------- src/libgeds/GEDSAbstractFileHandle.h | 2 +- 2 files changed, 19 insertions(+), 30 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 35cc4a65..3765a092 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -865,45 +865,34 @@ void GEDS::relocate(std::vector> &relocatable, b struct RelocateHelper { std::mutex mutex; std::condition_variable cv; - size_t nTasks; - auto lock() { return std::unique_lock(mutex); } + std::atomic nTasks; }; auto h = std::make_shared(); { - auto lock = h->lock(); + std::lock_guard lock(h->mutex); h->nTasks = relocatable.size(); } LOG_INFO("Relocating ", relocatable.size(), " objects."); auto self = shared_from_this(); - size_t off = 3 * _config.io_thread_pool_size; - - for (size_t offset = 0; offset < relocatable.size(); offset += off) { - auto rbegin = offset; - auto rend = rbegin + off; - if (rend > relocatable.size()) { - rend = relocatable.size(); - } - for (auto i = rbegin; i < rend; i++) { - auto fh = relocatable[i]; - boost::asio::post(_ioThreadPool, [self, fh, h, force]() { - try { - self->relocate(fh, force); - } catch (...) { - LOG_ERROR("Encountered an exception during relocation ", fh->identifier); - } - { - auto lock = h->lock(); - h->nTasks -= 1; - } - h->cv.notify_all(); - }); - } - auto relocateLock = h->lock(); - h->cv.wait(relocateLock, [h]() { return h->nTasks == 0; }); - LOG_INFO("Relocated ", relocatable.size(), " objects."); + for (auto fh : relocatable) { + boost::asio::post(_ioThreadPool, [self, fh, h, force]() { + try { + self->relocate(fh, force); + } catch (...) { + LOG_ERROR("Encountered an exception during relocation ", fh->identifier); + } + { + std::lock_guard lock(h->mutex); + h->nTasks -= 1; + } + h->cv.notify_one(); + }); } + std::unique_lock lock(h->mutex); + h->cv.wait(lock, [h]() { return h->nTasks == 0; }); + LOG_INFO("Relocated ", relocatable.size(), " objects."); } void GEDS::relocate(std::shared_ptr handle, bool force) { diff --git a/src/libgeds/GEDSAbstractFileHandle.h b/src/libgeds/GEDSAbstractFileHandle.h index 7288713b..ec939825 100644 --- a/src/libgeds/GEDSAbstractFileHandle.h +++ b/src/libgeds/GEDSAbstractFileHandle.h @@ -174,7 +174,7 @@ template class GEDSAbstractFileHandle : public GEDSFileHandle { if (!isValid()) { return absl::UnavailableError("The file " + identifier + " is no longer valid!"); } - LOG_INFO("Relocating ", identifier); + LOG_INFO("Relocating ", identifier, " (size: ", _file.size(), ") "); if (_openCount > 0) { auto message = "Unable to relocate " + identifier + " reason: The file is still in use."; LOG_ERROR(message); From ed5b5f9777067c297357d121d35cca07c8aeaa69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Wed, 24 May 2023 16:23:07 +0200 Subject: [PATCH 16/16] GEDSConfig: Support storage_spilling_fraction. --- src/libgeds/GEDS.cpp | 18 +++++------------- src/libgeds/GEDSConfig.cpp | 32 ++++++++++++++++++++++++++++++++ src/libgeds/GEDSConfig.h | 14 ++++++++++++++ 3 files changed, 51 insertions(+), 13 deletions(-) diff --git a/src/libgeds/GEDS.cpp b/src/libgeds/GEDS.cpp index 3765a092..7ab640d4 100644 --- a/src/libgeds/GEDS.cpp +++ b/src/libgeds/GEDS.cpp @@ -1019,7 +1019,7 @@ void GEDS::startStorageMonitoringThread() { auto memSize = fh->localMemorySize(); storageUsed += storageSize; memoryUsed += memSize; - if (fh->isRelocatable() && memoryUsed == 0) { + if (fh->isRelocatable() && fh->openCount() == 0) { relocatable.push_back(fh); } } @@ -1050,26 +1050,18 @@ void GEDS::startStorageMonitoringThread() { } } - auto targetStorage = (size_t)(0.5 * (double)_config.available_local_storage); + auto targetStorage = + (size_t)(_config.storage_spilling_fraction * (double)_config.available_local_storage); if (storageUsed > targetStorage) { std::sort(std::begin(relocatable), std::end(relocatable), [](std::shared_ptr a, std::shared_ptr b) { - if (a->openCount() == 0 && b->openCount() == 0) { - return a->lastReleased() < b->lastReleased(); - } - if (a->openCount() == 0) { - return true; - } - if (b->openCount() == 0) { - return false; - } - return a->lastOpened() < b->lastOpened(); + return a->lastReleased() < b->lastReleased(); }); std::vector> tasks; size_t relocateBytes = 0; for (auto &f : relocatable) { - if (relocateBytes > targetStorage) { + if (relocateBytes > (storageUsed - targetStorage)) { break; } relocateBytes += f->localStorageSize(); diff --git a/src/libgeds/GEDSConfig.cpp b/src/libgeds/GEDSConfig.cpp index 5a15baed..17d87b36 100644 --- a/src/libgeds/GEDSConfig.cpp +++ b/src/libgeds/GEDSConfig.cpp @@ -4,7 +4,11 @@ */ #include "GEDSConfig.h" + +#include + #include "Logging.h" +#include "absl/status/status.h" absl::Status GEDSConfig::set(const std::string &key, const std::string &value) { LOG_DEBUG("Trying to set '", key, "' to '", value, "'"); @@ -14,6 +18,14 @@ absl::Status GEDSConfig::set(const std::string &key, const std::string &value) { hostname = value == "" ? std::nullopt : std::make_optional(value); } else if (key == "local_storage_path") { localStoragePath = value; + } else if (key == "node_type") { + if (value == "Standard") { + node_type = GEDSNodeType::Standard; + } else if (value == "Storage") { + node_type = GEDSNodeType::Storage; + } else { + return absl::NotFoundError("Invalid node type " + value); + } } else { LOG_ERROR("Configuration " + key + " not supported (type: string)."); return absl::NotFoundError("Key " + key + " not found."); @@ -64,6 +76,15 @@ absl::Status GEDSConfig::set(const std::string &key, int64_t value) { return set(key, (size_t)value); } +absl::Status GEDSConfig::set(const std::string &key, double value) { + if (key == "storage_spilling_fraction") { + storage_spilling_fraction = value; + return absl::OkStatus(); + } + LOG_ERROR("Configuration " + key + " not supported (type: double)."); + return absl::NotFoundError("Key " + key + " not found."); +} + absl::StatusOr GEDSConfig::getString(const std::string &key) const { LOG_INFO("Get ", key, " as string"); if (key == "listen_address") { @@ -75,6 +96,9 @@ absl::StatusOr GEDSConfig::getString(const std::string &key) const if (key == "local_storage_path") { return localStoragePath; } + if (key == "node_type") { + return std::string{magic_enum::enum_name(node_type)}; + } LOG_ERROR("Configuration " + key + " not supported (type: string)."); return absl::NotFoundError("Key " + key + " not found."); } @@ -110,3 +134,11 @@ absl::StatusOr GEDSConfig::getSignedInt(const std::string &key) const { } return value.status(); } + +absl::StatusOr GEDSConfig::getDouble(const std::string &key) const { + if (key == "storage_spilling_fraction") { + return storage_spilling_fraction; + } + LOG_ERROR("Configuration " + key + " not supported (type: double)."); + return absl::NotFoundError("Key " + key + " (double) not found."); +} diff --git a/src/libgeds/GEDSConfig.h b/src/libgeds/GEDSConfig.h index 5923598c..03354e1e 100644 --- a/src/libgeds/GEDSConfig.h +++ b/src/libgeds/GEDSConfig.h @@ -16,6 +16,8 @@ #include "Ports.h" +enum class GEDSNodeType { Standard, Storage }; + struct GEDSConfig { /** * @brief The hostname of the metadata service/ @@ -79,14 +81,26 @@ struct GEDSConfig { size_t available_local_memory = 16 * 1024 * 1024 * (size_t)1024; + /** + * @brief Fraction of the storage where GEDS should start spilling. + */ + double storage_spilling_fraction = 0.7; + + /** + * @brief Node type. + */ + GEDSNodeType node_type = GEDSNodeType::Standard; + GEDSConfig(std::string metadataServiceAddressArg) : metadataServiceAddress(std::move(metadataServiceAddressArg)) {} absl::Status set(const std::string &key, const std::string &value); absl::Status set(const std::string &key, size_t value); absl::Status set(const std::string &key, int64_t value); + absl::Status set(const std::string &key, double value); absl::StatusOr getString(const std::string &key) const; absl::StatusOr getUnsignedInt(const std::string &key) const; absl::StatusOr getSignedInt(const std::string &key) const; + absl::StatusOr getDouble(const std::string &key) const; };