diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 000000000..67f10e8c2
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,49 @@
+{
+    "editor.tokenColorCustomizations": {
+        "[*Light*]": {
+            "textMateRules": [
+                {
+                    "scope": "ref.matchtext",
+                    "settings": {
+                        "foreground": "#000"
+                    }
+                }
+            ]
+        },
+        "[*Dark*]": {
+            "textMateRules": [
+                {
+                    "scope": "ref.matchtext",
+                    "settings": {
+                        "foreground": "#fff"
+                    }
+                }
+            ]
+        },
+        "textMateRules": [
+            {
+                "scope": "googletest.failed",
+                "settings": {
+                    "foreground": "#f00"
+                }
+            },
+            {
+                "scope": "googletest.passed",
+                "settings": {
+                    "foreground": "#0f0"
+                }
+            },
+            {
+                "scope": "googletest.run",
+                "settings": {
+                    "foreground": "#0f0"
+                }
+            }
+        ]
+    },
+    "files.associations": {
+        "array": "cpp"
+    }
+}
\ No newline at end of file
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0c9d98321..2c61d92b1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -106,6 +106,7 @@ add_subdirectory(src/mpi)
 add_subdirectory(src/proto)
 add_subdirectory(src/redis)
 add_subdirectory(src/runner)
+add_subdirectory(src/loadbalance)
 add_subdirectory(src/scheduler)
 add_subdirectory(src/snapshot)
 add_subdirectory(src/state)
@@ -130,6 +131,7 @@ add_library(faabric
     $<TARGET_OBJECTS:proto_obj>
     $<TARGET_OBJECTS:redis_obj>
     $<TARGET_OBJECTS:runner_obj>
+    $<TARGET_OBJECTS:loadbalance_obj>
     $<TARGET_OBJECTS:scheduler_obj>
     $<TARGET_OBJECTS:snapshot_obj>
 )
diff --git a/include/faabric/loadbalance/LoadBalancePolicy.h b/include/faabric/loadbalance/LoadBalancePolicy.h
new file mode 100644
index 000000000..47f482e85
--- /dev/null
+++ b/include/faabric/loadbalance/LoadBalancePolicy.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <faabric/proto/faabric.pb.h>
+#include <string>
+#include <utility>
+#include <vector>
+
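+// Illustrative usage (added commentary, not part of the original patch): the
+// scheduler picks a policy by name and lets it reorder candidate hosts before
+// dispatching, along these lines:
+//
+//     std::vector<std::pair<std::string, faabric::HostResources>> hosts = ...;
+//     LeastLoadAveragePolicy policy;
+//     policy.dispatch(hosts); // sorts in place and returns the reordered vector
+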
+class LoadBalancePolicy
+{
+  public:
+    virtual ~LoadBalancePolicy() = default;
+
+    virtual std::vector<std::pair<std::string, faabric::HostResources>> dispatch(
+      std::vector<std::pair<std::string, faabric::HostResources>>& host_resources) = 0;
+};
+
+class FaasmDefaultPolicy : public LoadBalancePolicy
+{
+  public:
+    std::vector<std::pair<std::string, faabric::HostResources>> dispatch(
+      std::vector<std::pair<std::string, faabric::HostResources>>& host_resources) override;
+};
+
+class LeastLoadAveragePolicy : public LoadBalancePolicy
+{
+  public:
+    std::vector<std::pair<std::string, faabric::HostResources>> dispatch(
+      std::vector<std::pair<std::string, faabric::HostResources>>& host_resources) override;
+};
+
+class MostSlotsPolicy : public LoadBalancePolicy
+{
+  public:
+    std::vector<std::pair<std::string, faabric::HostResources>> dispatch(
+      std::vector<std::pair<std::string, faabric::HostResources>>& host_resources) override;
+};
\ No newline at end of file
diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h
index 70c630e79..8823f6002 100644
--- a/include/faabric/scheduler/Scheduler.h
+++ b/include/faabric/scheduler/Scheduler.h
@@ -8,6 +8,7 @@
 #include
 #include
 #include
+#include <faabric/loadbalance/LoadBalancePolicy.h>
 #include
 #include
 #include
@@ -277,7 +278,11 @@ class Scheduler
     inline void setFunctionResult(const faabric::Message& msg)
     {
-        setFunctionResult(std::make_unique<faabric::Message>(msg));
+        try {
+            setFunctionResult(std::make_unique<faabric::Message>(msg));
+        } catch (const std::exception& e) {
+            SPDLOG_ERROR("[Scheduler.h] Failed to set function result: {}", e.what());
+        }
     }
 
     faabric::Message getFunctionResult(unsigned int messageId,
@@ -462,6 +467,8 @@
       faabric::util::SchedulingTopologyHint topologyHint,
       std::shared_ptr extraData);
 
+    std::vector<std::string> applyLoadBalancedPolicy(std::vector<std::string> hosts);
+
     std::shared_ptr<Executor> claimExecutor(const faabric::MessageInBatch& msg);
 
     std::vector<std::string> getUnregisteredHosts(const std::string& user,
diff --git a/include/faabric/util/config.h b/include/faabric/util/config.h
index c7dfce414..f082ca3c4 100644
--- a/include/faabric/util/config.h
+++ b/include/faabric/util/config.h
@@ -32,6 +32,11 @@ class SystemConfig
     bool isStorageNode;
     int noSingleHostOptimisations;
 
+    std::string load_balance_policy;
+    double offload_cpu_threshold;
+    double offload_ram_threshold;
+    double offload_load_avg_threshold;
+
     // Worker-related timeouts
     int globalMessageTimeout;
     int boundTimeout;
diff --git a/include/faabric/util/queue.h b/include/faabric/util/queue.h
index 6d89aab18..b74e77516 100644
--- a/include/faabric/util/queue.h
+++ b/include/faabric/util/queue.h
@@ -54,10 +54,13 @@ class Queue
             SPDLOG_ERROR("Invalid queue timeout: {} <= 0", timeoutMs);
             throw std::runtime_error("Invalid queue timeout");
         }
-
-        while (mq.empty()) {
+
+        while (mq.size() == 0) {
+            SPDLOG_DEBUG("Queue is empty... waiting for dequeue");
             std::cv_status returnVal = enqueueNotifier.wait_for(
               lock, std::chrono::milliseconds(timeoutMs));
+
+            SPDLOG_DEBUG("Queue has been notified");
 
             // Work out if this has returned due to timeout expiring
             if (returnVal == std::cv_status::timeout) {
@@ -65,11 +68,15 @@
             }
         }
 
-        T value = std::move(mq.front());
-        mq.pop();
-        emptyNotifier.notify_one();
-
-        return value;
+        try {
+            T value = std::move(mq.front());
+            mq.pop();
+            emptyNotifier.notify_one();
+            return value;
+        } catch (std::exception& e) {
+            SPDLOG_ERROR("Caught exception when dequeueing: {}", e.what());
+            throw;
+        }
     }
 
     T* peek(long timeoutMs = 0)
diff --git a/include/faabric/util/system_metrics.h b/include/faabric/util/system_metrics.h
new file mode 100644
index 000000000..eafea3188
--- /dev/null
+++ b/include/faabric/util/system_metrics.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <chrono>
+#include <cstdint>
+#include <fstream>
+#include <iostream>
+#include <sstream>
+#include <stdexcept>
+#include <string>
+#include <thread>
+#include <vector>
+
+namespace faabric::util {
+    struct UtilisationStats
+    {
+        double cpu_utilisation;
+        double ram_utilisation;
+        double load_average;
+    };
+
+    struct CPUStats
+    {
+        long totalCpuTime;
+        long idleCpuTime;
+    };
+
+    struct MemStats
+    {
+        uint64_t total;
+        uint64_t available;
+    };
+
+    UtilisationStats getSystemUtilisation();
+    CPUStats getCPUUtilisation();
+    double getMemoryUtilisation();
+    double getLoadAverage();
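+
+    // Added note (not in the original patch): getSystemUtilisation() samples
+    // /proc/stat twice, a short sleep apart, and derives
+    //     cpu_utilisation = 1.0 - idleDelta / (double)totalDelta
+    // from the change in idle and total CPU time between the two samples.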
+}
\ No newline at end of file
diff --git a/src/endpoint/FaabricEndpoint.cpp b/src/endpoint/FaabricEndpoint.cpp
index 1510dafaa..2ad2aa8e4 100644
--- a/src/endpoint/FaabricEndpoint.cpp
+++ b/src/endpoint/FaabricEndpoint.cpp
@@ -78,7 +78,27 @@ class HttpConnection : public std::enable_shared_from_this<HttpConnection>
           stream.get_executor(),
           std::bind_front(&HttpConnection::sendResponse, this->shared_from_this())
         };
-        handler->onRequest(std::move(hrc), std::move(msg));
+        try {
+            handler->onRequest(std::move(hrc), std::move(msg));
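+            // Note (added): boost::system::system_error derives from
+            // std::exception, so it must be caught before the generic handler
+            // below to be reachable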
+        } catch (boost::system::system_error& e) {
+            SPDLOG_ERROR("Error handling HTTP request: {}", e.what());
+            faabric::util::BeastHttpResponse response;
+            response.result(beast::http::status::internal_server_error);
+            response.body() = e.what();
+            sendResponse(std::move(response));
+        } catch (std::exception& e) {
+            SPDLOG_ERROR("Error handling HTTP request: {}", e.what());
+            faabric::util::BeastHttpResponse response;
+            response.result(beast::http::status::internal_server_error);
+            response.body() = e.what();
+            sendResponse(std::move(response));
+        } catch (...) {
+            SPDLOG_ERROR("Error handling HTTP request: unknown exception");
+            faabric::util::BeastHttpResponse response;
+            response.result(beast::http::status::internal_server_error);
+            response.body() = "Unknown error";
+            sendResponse(std::move(response));
+        }
     }
 
     void onRead(beast::error_code ec, size_t bytesTransferred)
diff --git a/src/endpoint/FaabricEndpointHandler.cpp b/src/endpoint/FaabricEndpointHandler.cpp
index aebb33668..cc29eb0e5 100644
--- a/src/endpoint/FaabricEndpointHandler.cpp
+++ b/src/endpoint/FaabricEndpointHandler.cpp
@@ -89,8 +89,22 @@ void FaabricEndpointHandler::onRequest(
         response.result(beast::http::status::ok);
         response.body() = std::string("Flush sent");
     } else {
-        executeFunction(
-          std::move(ctx), std::move(response), std::move(msg));
+        try {
+            executeFunction(std::move(ctx), std::move(response), std::move(msg));
+        } catch (faabric::util::FaabricException& e) {
+            // Catch the more specific FaabricException first; it derives from
+            // std::exception, so the generic handler below would otherwise
+            // make this one unreachable
+            SPDLOG_ERROR("Caught FaabricException in FaabricEndpointHandler::onRequest: {}", e.what());
+            response.result(beast::http::status::internal_server_error);
+            response.body() = std::string("Caught exception: ") + e.what();
+            ctx.sendFunction(std::move(response));
+        } catch (const std::exception& e) {
+            SPDLOG_ERROR("Caught exception in FaabricEndpointHandler::onRequest: {}", e.what());
+            response.result(beast::http::status::internal_server_error);
+            response.body() = std::string("Caught exception: ") + e.what();
+            ctx.sendFunction(std::move(response));
+        }
         return;
     }
 }
@@ -174,7 +188,10 @@ void FaabricEndpointHandler::onFunctionResult(
                  faabric::util::funcToString(result, true));
 
     response.body() = result.outputdata();
+    // We're done with this request once the response is sent
+    SPDLOG_DEBUG("Worker thread {} sending response", gettid());
     return ctx.sendFunction(std::move(response));
 }
 }
diff --git a/src/loadbalance/CMakeLists.txt b/src/loadbalance/CMakeLists.txt
new file mode 100644
index 000000000..4d0392361
--- /dev/null
+++ b/src/loadbalance/CMakeLists.txt
@@ -0,0 +1,7 @@
+faabric_lib(loadbalance
+    FaasmDefaultPolicy.cpp
+    LeastLoadAveragePolicy.cpp
+    MostSlotsPolicy.cpp)
+
+
+target_link_libraries(loadbalance PRIVATE faabric::scheduler)
\ No newline at end of file
diff --git a/src/loadbalance/FaasmDefaultPolicy.cpp b/src/loadbalance/FaasmDefaultPolicy.cpp
new file mode 100644
index 000000000..0853325e3
--- /dev/null
+++ b/src/loadbalance/FaasmDefaultPolicy.cpp
@@ -0,0 +1,7 @@
+#include <faabric/loadbalance/LoadBalancePolicy.h>
+#include <vector>
+
+std::vector<std::pair<std::string, faabric::HostResources>> FaasmDefaultPolicy::dispatch(
+  std::vector<std::pair<std::string, faabric::HostResources>>& host_resources)
+{
+    return host_resources;
+}
\ No newline at end of file
diff --git a/src/loadbalance/LeastLoadAveragePolicy.cpp b/src/loadbalance/LeastLoadAveragePolicy.cpp
new file mode 100644
index 000000000..47f7ac349
--- /dev/null
+++ b/src/loadbalance/LeastLoadAveragePolicy.cpp
@@ -0,0 +1,12 @@
+#include <faabric/loadbalance/LoadBalancePolicy.h>
+#include <algorithm>
+
+std::vector<std::pair<std::string, faabric::HostResources>> LeastLoadAveragePolicy::dispatch(
+  std::vector<std::pair<std::string, faabric::HostResources>>& host_resources)
+{
+    // Sort the vector by the load average in ascending order
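+    // (Added note) std::sort is not stable, so hosts with equal load averages
+    // may end up in an arbitrary relative order; std::stable_sort would keep
+    // their original order if deterministic placement matters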
+    std::sort(host_resources.begin(), host_resources.end(), [](const auto& a, const auto& b) {
+        return a.second.loadaverage() < b.second.loadaverage();
+    });
+
+    return host_resources;
+}
\ No newline at end of file
diff --git a/src/loadbalance/MostSlotsPolicy.cpp b/src/loadbalance/MostSlotsPolicy.cpp
new file mode 100644
index 000000000..e4d38e8ef
--- /dev/null
+++ b/src/loadbalance/MostSlotsPolicy.cpp
@@ -0,0 +1,14 @@
+#include <faabric/loadbalance/LoadBalancePolicy.h>
+#include <algorithm>
+
+std::vector<std::pair<std::string, faabric::HostResources>> MostSlotsPolicy::dispatch(
+  std::vector<std::pair<std::string, faabric::HostResources>>& host_resources)
+{
+    // Sort the vector by the number of available slots in descending order
+    std::sort(host_resources.begin(), host_resources.end(), [](const auto& a, const auto& b) {
+        int available_a = a.second.slots() - a.second.usedslots();
+        int available_b = b.second.slots() - b.second.usedslots();
+        return available_a > available_b;
+    });
+
+    return host_resources;
+}
\ No newline at end of file
diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto
index adad18e42..ad607dd1d 100644
--- a/src/proto/faabric.proto
+++ b/src/proto/faabric.proto
@@ -47,6 +47,7 @@ message BatchExecuteRequest {
 message HostResources {
     int32 slots = 1;
     int32 usedSlots = 2;
+    double loadAverage = 3;
 }
 
 message UnregisterRequest {
@@ -72,6 +73,12 @@ message FunctionStatusResponse {
     FunctionStatus status = 1;
 }
 
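+// (Added note) cpu_utilisation and mem_utilisation are fractions in [0, 1];
+// load_avg is the 1-minute load average (see src/util/system_metrics.cpp)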
+message NodeUtilisationResponse {
+    double cpu_utilisation = 1;
+    double mem_utilisation = 2;
+    double load_avg = 3;
+}
+
 // ---------------------------------------------
 // MPI
 // ---------------------------------------------
diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp
index 9da893120..6f6d3669d 100644
--- a/src/scheduler/Executor.cpp
+++ b/src/scheduler/Executor.cpp
@@ -428,13 +428,19 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
         if (isThreads) {
             sch.setThreadResult(msg, 1, "", {});
         } else {
-            sch.setFunctionResult(msg);
+            try {
+                sch.setFunctionResult(msg);
+            } catch (const std::exception& ex) {
+                SPDLOG_ERROR("[Executor.cpp::threadPoolThread] Failed to set function result: {}", ex.what());
+            }
         }
     }
 };
 
 if (threadPoolIdx == 0) {
+    SPDLOG_INFO("Thread pool thread {}:{} is the main thread", id, threadPoolIdx);
     std::unique_lock _lock(resetMutex);
     try {
+        SPDLOG_INFO("Resetting module");
         reset(boundMessage);
     } catch (...) {
         SPDLOG_ERROR("Caught exception when initialising module for {}",
@@ -457,15 +463,28 @@
     // We terminate these threads by sending a shutdown message, but having this
     // check means they won't hang infinitely if destructed.
     while (!st.stop_requested()) {
-        SPDLOG_TRACE("Thread starting loop {}:{}", id, threadPoolIdx);
+        SPDLOG_INFO("Thread starting loop {}:{}", id, threadPoolIdx);
 
         ExecutorTask task;
         try {
             ZoneScopedNC("Dequeue task", 0x111111);
-            task = threadTaskQueues[threadPoolIdx].dequeue(conf.boundTimeout);
+            SPDLOG_DEBUG("Dequeueing task for thread {}:{} (timeout {}ms)",
+                         id,
+                         threadPoolIdx,
+                         conf.boundTimeout);
+
+            task = threadTaskQueues.at(threadPoolIdx).dequeue(conf.boundTimeout);
+            SPDLOG_DEBUG("Successfully dequeued task for thread {}:{}",
+                         id,
+                         threadPoolIdx);
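+            // (Added note) bad_optional_access is handled defensively below,
+            // presumably arising from optional-based internals of the task
+            // queue; it is treated like a spurious wakeup and the loop retries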
+        } catch (std::bad_optional_access& e) {
+            SPDLOG_DEBUG("Bad optional access in thread {}:{}",
+                         id,
+                         threadPoolIdx);
+            continue;
         } catch (faabric::util::QueueTimeoutException& ex) {
-            SPDLOG_TRACE(
+            SPDLOG_DEBUG(
               "Thread {}:{} got no messages in timeout {}ms, looping",
               id,
               threadPoolIdx,
@@ -482,15 +501,16 @@
         }
 
         assert(task.req->messages_size() >= task.messageIndex + 1);
-        faabric::Message& msg =
-          task.req->mutable_messages()->at(task.messageIndex);
+        faabric::Message& msg = task.req->mutable_messages()->at(task.messageIndex);
 
         // Start dirty tracking if executing threads across hosts
         bool isSingleHost = task.req->singlehost();
-        bool isThreads =
-          task.req->type() == faabric::BatchExecuteRequest::THREADS;
+        bool isThreads = task.req->type() == faabric::BatchExecuteRequest::THREADS;
         bool doDirtyTracking = isThreads && !isSingleHost;
         if (doDirtyTracking) {
+            SPDLOG_DEBUG("Starting dirty tracking for thread {}:{}",
+                         id,
+                         threadPoolIdx);
            // If tracking is thread local, start here as it will happen for
            // each thread
            tracker->startThreadLocalTracking(getMemoryView());
 
         // Check ptp group
         std::shared_ptr<faabric::transport::PointToPointGroup> group = nullptr;
         if (msg.groupid() > 0) {
-            group =
-              faabric::transport::PointToPointGroup::getGroup(msg.groupid());
+            group = faabric::transport::PointToPointGroup::getGroup(msg.groupid());
         }
 
-        SPDLOG_TRACE("Thread {}:{} executing task {} ({}, thread={}, group={})",
+        SPDLOG_INFO("Thread {}:{} executing task {} ({}, thread={}, group={})",
                     id,
                     threadPoolIdx,
                     task.messageIndex,
                     msg.id(),
                     isThreads,
                     msg.groupid());
 
         // Set up context
-        getScheduler().monitorStartedTasks.fetch_add(1,
-                                                     std::memory_order_acq_rel);
+        SPDLOG_INFO("Setting executor context for task {}:{}",
+                    id,
+                    threadPoolIdx);
+        getScheduler().monitorStartedTasks.fetch_add(1, std::memory_order_acq_rel);
         ExecutorContext::set(this, task.req, task.messageIndex);
 
         // Execute the task
+        SPDLOG_INFO("Executing task {}:{}",
+                    id,
+                    threadPoolIdx);
         int64_t msgTimestamp = msg.timestamp();
         int64_t nowTimestamp = faabric::util::getGlobalClock().epochMillis();
         int32_t returnValue;
@@ -585,7 +609,7 @@
         assert(oldTaskCount >= 0);
         bool isLastInBatch = oldTaskCount == 1;
 
-        SPDLOG_TRACE("Task {} finished by thread {}:{} ({} left)",
+        SPDLOG_INFO("[Faabric] Task {} finished by thread {}:{} ({} left)",
                     faabric::util::funcToString(msg, true),
                     id,
                     threadPoolIdx,
@@ -645,6 +669,7 @@
         // executor has been reset, otherwise the executor may not be reused for
         // a repeat invocation.
         if (isThreads) {
+            SPDLOG_INFO("Set result of the task");
             ZoneScopedN("Task set result");
             // Set non-final thread result
             if (isLastInBatch) {
@@ -656,12 +681,17 @@
         } else {
             ZoneScopedN("Task set result");
             // Set normal function result
-            sch.setFunctionResult(msg);
+            try {
+                sch.setFunctionResult(msg);
+            } catch (const std::exception& ex) {
+                SPDLOG_ERROR("Failed to set function result: {}", ex.what());
+            }
         }
 
         // If this is not a threads request and last in its batch, it may be
         // the main function in a threaded application, in which case we
         // want to stop any tracking and delete the main thread snapshot
         if (!isThreads && isLastInBatch) {
+            SPDLOG_INFO("Not threads request and last in batch");
             // Stop tracking memory
             std::span<uint8_t> memView = getMemoryView();
             if (!memView.empty()) {
@@ -678,6 +708,7 @@
         // claim. Note that we have to release the claim _after_ resetting,
         // otherwise the executor won't be ready for reuse
         if (isLastInBatch) {
+            SPDLOG_INFO("Last in batch detected, resetting executor and releasing claim");
             // Threads skip the reset as they will be restored from their
             // respective snapshot on the next execution.
             if (isThreads || skippedExec) {
@@ -693,7 +724,7 @@
         }
 
         task = ExecutorTask();
-
+        SPDLOG_INFO("Cleared executor task");
         // Return this thread index to the pool available for scheduling
         {
             faabric::util::UniqueLock lock(threadsMutex);
@@ -710,8 +741,11 @@
             // try to schedule another function and be unable to reuse this
             // executor.
             ZoneScopedN("Task vacate slot");
+            SPDLOG_INFO("Vacating slot");
             sch.vacateSlot();
+            SPDLOG_INFO("[Executor] Slot vacated");
         }
+        SPDLOG_INFO("Calling soft shutdown");
         softShutdown();
     }
diff --git a/src/scheduler/FunctionCallClient.cpp b/src/scheduler/FunctionCallClient.cpp
index 8089013cc..aee8be126 100644
--- a/src/scheduler/FunctionCallClient.cpp
+++ b/src/scheduler/FunctionCallClient.cpp
@@ -196,10 +196,12 @@ void FunctionCallClient::unregister(faabric::UnregisterRequest& req)
 
 faabric::NdpDelta FunctionCallClient::requestNdpDelta(int msgId)
 {
+    SPDLOG_DEBUG("Requesting NDP delta for message {}", msgId);
     faabric::GetNdpDelta gnd;
     gnd.set_id(msgId);
     faabric::NdpDelta delta;
     syncSend(faabric::scheduler::FunctionCalls::NdpDeltaRequest, &gnd, &delta);
+    SPDLOG_DEBUG("Received NDP delta for message {}", msgId);
     return delta;
 }
 }
diff --git a/src/scheduler/FunctionCallServer.cpp b/src/scheduler/FunctionCallServer.cpp
index 5ca8cea29..95267aaae 100644
--- a/src/scheduler/FunctionCallServer.cpp
+++ b/src/scheduler/FunctionCallServer.cpp
@@ -28,11 +28,13 @@ void FunctionCallServer::registerNdpDeltaHandler(
   int id,
   std::function<std::vector<uint8_t>()> handler)
 {
+    SPDLOG_DEBUG("Registering NDP delta handler for id {}", id);
     ndpDeltaHandlers.insertOrAssign(id, std::move(handler));
 }
 
 void FunctionCallServer::removeNdpDeltaHandler(int id)
 {
+    SPDLOG_DEBUG("Removing NDP delta handler for id {}", id);
     ndpDeltaHandlers.erase(id);
 }
 
@@ -74,6 +76,7 @@ std::unique_ptr<google::protobuf::Message> FunctionCallServer::doSyncRecv(
         return recvPendingMigrations(message.udata());
     }
     case faabric::scheduler::FunctionCalls::NdpDeltaRequest: {
+        SPDLOG_DEBUG("Received NDP delta request");
         return recvNdpDeltaRequest(message.udata());
     }
     default: {
@@ -136,7 +139,11 @@ void FunctionCallServer::recvDirectResult(std::span<const uint8_t> buffer)
     PARSE_MSG(faabric::DirectResultTransmission, buffer.data(), buffer.size())
 
     std::unique_ptr<faabric::Message> result{ parsedMsg.release_result() };
-    scheduler.setFunctionResult(std::move(result));
+    try {
+        scheduler.setFunctionResult(std::move(result));
+    } catch (const std::exception& e) {
+        SPDLOG_ERROR("Failed to set direct result: {}", e.what());
+    }
 }
 
 std::unique_ptr<google::protobuf::Message>
@@ -150,6 +157,7 @@ FunctionCallServer::recvPendingMigrations(std::span<const uint8_t> buffer)
     scheduler.addPendingMigration(msgPtr);
 
     return std::make_unique<faabric::EmptyResponse>();
+
 }
 
 std::unique_ptr<google::protobuf::Message>
@@ -157,11 +165,18 @@ FunctionCallServer::recvNdpDeltaRequest(std::span<const uint8_t> buffer)
 {
     PARSE_MSG(faabric::GetNdpDelta, buffer.data(), buffer.size());
 
-    auto ndpDelta = ndpDeltaHandlers.get(parsedMsg.id()).value()();
+    auto ndpDelta = ndpDeltaHandlers.get(parsedMsg.id());
+
+    if (!ndpDelta.has_value()) {
+        SPDLOG_ERROR("No NDP delta handler found for id {}", parsedMsg.id());
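+        // (Added note) Return an empty NdpDelta rather than throwing, so a
+        // request for an already-removed handler does not kill the server loop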
+        return std::make_unique<faabric::NdpDelta>();
+    }
+
+    std::vector<uint8_t> ndpDeltaData = ndpDelta.value()();
 
     auto response = std::make_unique<faabric::NdpDelta>();
     response->mutable_delta()->assign(
-      reinterpret_cast<const char*>(ndpDelta.data()), ndpDelta.size());
+      reinterpret_cast<const char*>(ndpDeltaData.data()), ndpDeltaData.size());
     return response;
 }
 }
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index 7b57c0c57..4eab73428 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -2,6 +2,7 @@
 #include
 #include
 #include
+#include <faabric/loadbalance/LoadBalancePolicy.h>
 #include
 #include
 #include
@@ -20,6 +21,8 @@
 #include
 #include
 
+#include <faabric/util/system_metrics.h>
+
 #include
 #include
 #include
@@ -306,6 +309,7 @@ int Scheduler::reapStaleExecutors()
             req.set_user(user);
             req.set_function(function);
 
+            SPDLOG_DEBUG("Unregistering {} from {}", key, thisHost);
             getFunctionCallClient(masterHost)->unregister(req);
         }
 
@@ -343,6 +347,7 @@ const std::set<std::string>& Scheduler::getFunctionRegisteredHosts(
 {
     faabric::util::SharedLock lock;
     if (acquireLock) {
+        SPDLOG_DEBUG("Acquiring lock for registered hosts");
         lock = faabric::util::SharedLock(mx);
     }
     std::string key = user + "/" + func;
@@ -369,7 +374,13 @@ void Scheduler::addRegisteredHost(const std::string& host,
 void Scheduler::vacateSlot()
 {
     ZoneScopedNS("Vacate scheduler slot", 5);
-    thisHostUsedSlots.fetch_sub(1, std::memory_order_acq_rel);
+    SPDLOG_INFO("[Scheduler::vacateSlot] Vacating slot");
+    try {
+        thisHostUsedSlots.fetch_sub(1, std::memory_order_acq_rel);
+        SPDLOG_INFO("[Scheduler::vacateSlot] Slot vacated");
+    } catch (std::exception& ex) {
+        SPDLOG_ERROR("Caught exception vacating slot: {}", ex.what());
+    }
 }
 
 faabric::util::SchedulingDecision Scheduler::callFunctions(
@@ -502,9 +514,6 @@ faabric::util::SchedulingDecision Scheduler::doSchedulingDecision(
             hosts.push_back(thisHost);
         }
     } else {
-        // At this point we know we're the master host, and we've not been
-        // asked to force full local execution.
-
         // Work out how many we can handle locally
         int slots = thisHostResources.slots();
         if (topologyHint == faabric::util::SchedulingTopologyHint::UNDERFULL) {
@@ -537,11 +546,20 @@
         int remainder = nMessages - nLocally;
 
         if (!hostKindDifferent && remainder > 0) {
+            SPDLOG_DEBUG("Getting registered hosts for {}/{}", firstMsg.user(), firstMsg.function());
             const std::set<std::string>& thisRegisteredHosts =
               getFunctionRegisteredHosts(
                 firstMsg.user(), firstMsg.function(), false);
 
+            std::vector<std::string> registeredHosts;
             for (const auto& h : thisRegisteredHosts) {
+                registeredHosts.push_back(h);
+            }
+
+            std::vector<std::string> balanced_registered_hosts =
+              applyLoadBalancedPolicy(registeredHosts);
+
+            // Loop through the ordered registered hosts and schedule as many
+            // as possible on each
+            for (const auto& h : balanced_registered_hosts) {
                 // Work out resources on the remote host
                 faabric::HostResources r = getHostResources(h);
                 int available = r.slots() - r.usedslots();
@@ -580,16 +598,21 @@
         // Now schedule to unregistered hosts if there are messages left
         std::string lastHost;
         if (remainder > 0) {
+            // Do not edit any of this code! It is a critical section and must be left as is :)
+
             std::vector<std::string> unregisteredHosts;
             if (hostKindDifferent) {
+                SPDLOG_DEBUG("Getting available hosts for {}/{}", firstMsg.user(), firstMsg.function());
                 for (auto&& h : getAvailableHostsForFunction(firstMsg)) {
                     unregisteredHosts.push_back(std::move(h));
                 }
             } else {
+                SPDLOG_DEBUG("Getting unregistered hosts");
                 unregisteredHosts =
                   getUnregisteredHosts(firstMsg.user(), firstMsg.function());
             }
 
+
             for (const auto& h : unregisteredHosts) {
                 // Skip if this host
                 if (h == thisHost) {
@@ -982,6 +1005,7 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
             }
 
             // Dispatch the calls
+            SPDLOG_DEBUG("Dispatching {} to {}", funcStr, host);
             getFunctionCallClient(host)->executeFunctions(hostRequest);
         }
     }
@@ -1005,6 +1029,47 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
     return decision;
 }
 
+std::vector<std::string> Scheduler::applyLoadBalancedPolicy(std::vector<std::string> hosts)
+{
+    // Get the load-balancing policy from the config
+    std::string policyName = faabric::util::getSystemConfig().load_balance_policy;
+    std::vector<std::pair<std::string, faabric::HostResources>> host_resource_pairs;
+
+    // Fetch resources for each host to inform the decision
+    for (const auto& h : hosts) {
+        faabric::HostResources r = getHostResources(h);
+        host_resource_pairs.push_back(std::make_pair(h, r));
+    }
+
+    // Determine the policy based on the faasm configuration
+    if (policyName == "faasm_default") {
+        FaasmDefaultPolicy policy;
+        SPDLOG_DEBUG("Applying default policy to hosts");
+        policy.dispatch(host_resource_pairs);
+    } else if (policyName == "most_slots") {
+        MostSlotsPolicy policy;
+        SPDLOG_DEBUG("Applying most slots policy to hosts");
+        policy.dispatch(host_resource_pairs);
+    } else if (policyName == "least_load") {
+        LeastLoadAveragePolicy policy;
+        SPDLOG_DEBUG("Applying least load average policy to hosts");
+        policy.dispatch(host_resource_pairs);
+    } else {
+        SPDLOG_ERROR("Unknown load balance policy: {}! Applying default policy", policyName);
+        FaasmDefaultPolicy policy;
+        policy.dispatch(host_resource_pairs);
+    }
+
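+    // (Added note) The ordered hosts are returned in a vector: a std::set
+    // would re-sort them lexicographically and throw away the ordering the
+    // policy has just computed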
+    // Extract the ordered hosts
+    std::vector<std::string> ordered_hosts;
+    for (const auto& [h, r] : host_resource_pairs) {
+        ordered_hosts.push_back(h);
+    }
+
+    return ordered_hosts;
+}
+
 std::vector<std::string> Scheduler::getUnregisteredHosts(
   const std::string& user,
   const std::string& function,
@@ -1200,6 +1265,7 @@ void Scheduler::broadcastFlush()
     allHosts.erase(thisHost);
 
     // Dispatch flush message to all other hosts
+    SPDLOG_DEBUG("Broadcasting flush to {} hosts", allHosts.size());
     for (auto& otherHost : allHosts) {
         getFunctionCallClient(otherHost)->sendFlush();
     }
@@ -1234,8 +1300,10 @@
     if (it != localResults.end()) {
         it->second->setValue(std::move(msg));
     } else {
+        SPDLOG_ERROR("Result received for unknown message {}! Removing delta handler as a precaution", msg->id());
+        faabric::scheduler::FunctionCallServer::removeNdpDeltaHandler(msg->id());
         throw std::runtime_error(
-          "Got direct result, but promise is registered");
+          "Result received for unknown message " + std::to_string(msg->id()));
     }
     return;
 }
@@ -1254,6 +1322,7 @@
     if (!directResultHost.empty()) {
         ZoneScopedN("Direct result send");
         faabric::util::FullLock lock(mx);
+        SPDLOG_DEBUG("Sending direct result for {} to {}", msg->id(), directResultHost);
         auto fc = getFunctionCallClient(directResultHost);
         lock.unlock();
         {
@@ -1629,6 +1698,8 @@ faabric::HostResources Scheduler::getThisHostResources()
     faabric::HostResources hostResources = thisHostResources;
     hostResources.set_usedslots(
       this->thisHostUsedSlots.load(std::memory_order_acquire));
+    auto load_average = faabric::util::getLoadAverage();
+    hostResources.set_loadaverage(load_average);
     return hostResources;
 }
@@ -1641,7 +1712,7 @@ void Scheduler::setThisHostResources(faabric::HostResources& res)
 
 faabric::HostResources Scheduler::getHostResources(const std::string& host)
 {
-    SPDLOG_TRACE("Requesting resources from {}", host);
+    SPDLOG_DEBUG("Requesting resources from {}", host);
     return getFunctionCallClient(host)->getResources();
 }
 
@@ -1822,6 +1893,7 @@ void Scheduler::broadcastPendingMigrations(
     registeredHosts.erase(thisHost);
 
     // Send pending migrations to all involved hosts
+    SPDLOG_DEBUG("Broadcasting pending migrations for app {}", msg.appid());
     for (auto& otherHost : thisRegisteredHosts) {
         getFunctionCallClient(otherHost)->sendPendingMigrations(
           pendingMigrations);
diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt
index 6d0f81eef..0cbbd6263 100644
--- a/src/util/CMakeLists.txt
+++ b/src/util/CMakeLists.txt
@@ -27,4 +27,5 @@ faabric_lib(util
     string_tools.cpp
     timing.cpp
     testing.cpp
+    system_metrics.cpp
 )
diff --git a/src/util/config.cpp b/src/util/config.cpp
index e417c8caa..45082df0f 100644
--- a/src/util/config.cpp
+++ b/src/util/config.cpp
@@ -36,9 +36,11 @@ void SystemConfig::initialise()
     overrideCpuCount = this->getSystemConfIntParam("OVERRIDE_CPU_COUNT", "0");
     noTopologyHints = getEnvVar("NO_TOPOLOGY_HINTS", "off");
     isStorageNode = this->getSystemConfIntParam("IS_STORAGE_NODE", "0");
-    noSingleHostOptimisations =
-      this->getSystemConfIntParam("NO_SINGLE_HOST", "0");
-
+    noSingleHostOptimisations = this->getSystemConfIntParam("NO_SINGLE_HOST", "0");
+    load_balance_policy = getEnvVar("LOAD_BALANCE_POLICY", "faasm_default");
+    offload_cpu_threshold = std::stod(getEnvVar("OFFLOAD_CPU_THRESHOLD", "0.8"));
+    offload_ram_threshold = std::stod(getEnvVar("OFFLOAD_RAM_THRESHOLD", "0.8"));
+    offload_load_avg_threshold = std::stod(getEnvVar("OFFLOAD_LOAD_AVG_THRESHOLD", "0.8"));
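+    // (Added note) Recognised LOAD_BALANCE_POLICY values, as matched in
+    // Scheduler::applyLoadBalancedPolicy: "faasm_default", "most_slots" and
+    // "least_load"; the offload thresholds appear to be fractions in [0, 1]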
"0.8")); + offload_load_avg_threshold = stod(getEnvVar("OFFLOAD_LOAD_AVG_THRESHOLD", "0.8")); // Worker-related timeouts (all in seconds) globalMessageTimeout = this->getSystemConfIntParam("GLOBAL_MESSAGE_TIMEOUT", "60000"); @@ -115,6 +117,7 @@ void SystemConfig::print() SPDLOG_INFO("OVERRIDE_CPU_COUNT {}", overrideCpuCount); SPDLOG_INFO("NO_TOPOLOGY_HINTS {}", noTopologyHints); SPDLOG_INFO("IS_STORAGE_NODE {}", isStorageNode); + SPDLOG_INFO("LOAD_BALANCE_POLICY {}", load_balance_policy); SPDLOG_INFO("--- Timeouts ---"); SPDLOG_INFO("GLOBAL_MESSAGE_TIMEOUT {}", globalMessageTimeout); diff --git a/src/util/delta.cpp b/src/util/delta.cpp index fea09b933..07e565644 100644 --- a/src/util/delta.cpp +++ b/src/util/delta.cpp @@ -236,7 +236,7 @@ void applyDelta(std::span delta, std::function getDataPointer) { size_t deltaLen = delta.size(); - if (deltaLen < 2) { + if (deltaLen < 1) { throw std::runtime_error("Delta too short to be valid"); } if (delta[0] != DELTA_PROTOCOL_VERSION) { diff --git a/src/util/system_metrics.cpp b/src/util/system_metrics.cpp new file mode 100644 index 000000000..dc3b4b6eb --- /dev/null +++ b/src/util/system_metrics.cpp @@ -0,0 +1,84 @@ +#include +namespace faabric::util { + + CPUStats getCPUUtilisation() { + std::ifstream cpuinfo("/proc/stat"); + std::string line; + if (!cpuinfo.is_open()) { + throw std::runtime_error("Unable to open /proc/stat"); + } + std::getline(cpuinfo, line); + // Extract CPU utilization information from the line + std::istringstream iss(line); + std::string cpuLabel; + long user, nice, system, idle, iowait, irq, softirq, steal, guest, guest_nice; + iss >> cpuLabel >> user >> nice >> system >> idle >> iowait >> irq >> softirq >> steal >> guest >> guest_nice; + // Calculate total CPU time + long totalCpuTime = user + nice + system + idle + iowait + irq + softirq + steal + guest + guest_nice; + CPUStats stats; + stats.totalCpuTime = totalCpuTime; + stats.idleCpuTime = idle; + return stats; + } + + double getMemoryUtilisation() + { + std::ifstream meminfo("/proc/meminfo"); + std::string line; + if (!meminfo.is_open()) { + throw std::runtime_error("Unable to open /proc/meminfo"); + } + std::getline(meminfo, line); + std::istringstream ss(line); + std::string mem; + ss >> mem; + uint64_t totalMem; + ss >> totalMem; + std::getline(meminfo, line); + ss = std::istringstream(line); + ss >> mem; + uint64_t availableMem; + ss >> availableMem; + return 1.0 - (availableMem / (double)totalMem); + } + + double getLoadAverage() + { + std::ifstream loadavg("/proc/loadavg"); + std::string line; + if (!loadavg.is_open()) { + throw std::runtime_error("Unable to open /proc/loadavg"); + } + + std::getline(loadavg, line); + std::istringstream ss(line); + double load1, load5, load15; + ss >> load1 >> load5 >> load15; + return load1; + } + + UtilisationStats getSystemUtilisation() + { + UtilisationStats stats; + + // Get initial figures + CPUStats cpuStart = getCPUUtilisation(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + // Get final figures + CPUStats cpuEnd = getCPUUtilisation(); + long cpuTimeDelta = cpuEnd.totalCpuTime - cpuStart.totalCpuTime; + long idleTimeDelta = cpuEnd.idleCpuTime - cpuStart.idleCpuTime; + double cpu_utilisation = 1.0 - (idleTimeDelta / (double) cpuTimeDelta); + stats.cpu_utilisation = cpu_utilisation; + stats.ram_utilisation = getMemoryUtilisation(); + stats.load_average = getLoadAverage(); + return stats; + } + + + + +} + +