Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion context-runtime/include/chimaera/config_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct PoolConfig {
PoolId pool_id_; /**< Pool ID for this module */
PoolQuery pool_query_; /**< Pool query routing (Dynamic or Local) */
std::string config_; /**< Remaining YAML configuration as string */
bool restart_ = false; /**< If true, store compose file for crash-restart */

PoolConfig() = default;

Expand All @@ -69,7 +70,7 @@ struct PoolConfig {
*/
template <class Archive>
void serialize(Archive& ar) {
ar(mod_name_, pool_name_, pool_id_, pool_query_, config_);
ar(mod_name_, pool_name_, pool_id_, pool_query_, config_, restart_);
}
};

Expand Down Expand Up @@ -195,6 +196,12 @@ class ConfigManager : public hshm::BaseConfig {
*/
const ComposeConfig& GetComposeConfig() const { return compose_config_; }

/**
* Get configuration directory for persistent runtime config
* The path is returned by value, so the caller receives its own copy of
* conf_dir_ (whose in-class default is "/tmp/chimaera").
* @return Directory path for storing persistent runtime configuration
*/
std::string GetConfDir() const { return conf_dir_; }

/**
* Get wait_for_restart timeout in seconds
* @return Maximum time to wait for remote connection during system boot (default: 30 seconds)
Expand Down Expand Up @@ -264,6 +271,9 @@ class ConfigManager : public hshm::BaseConfig {

// Compose configuration
ComposeConfig compose_config_;

// Configuration directory for persistent runtime config
std::string conf_dir_ = "/tmp/chimaera";
};

} // namespace chi
Expand Down
9 changes: 9 additions & 0 deletions context-runtime/include/chimaera/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,15 @@ class Container {
(void)increment; // Suppress unused warnings
}

/**
* Restart container after crash recovery
* Default: re-initialize. Override for state restoration.
* The base implementation only forwards its arguments, unchanged, to Init().
* @param pool_id Pool identifier passed through to Init()
* @param pool_name Pool name passed through to Init()
* @param container_id Container identifier passed through to Init() (default 0)
*/
virtual void Restart(const PoolId& pool_id, const std::string& pool_name,
u32 container_id = 0) {
Init(pool_id, pool_name, container_id);
}

/**
* Serialize task parameters for network transfer (unified method)
* Must be implemented by derived classes
Expand Down
4 changes: 1 addition & 3 deletions context-runtime/include/chimaera/pool_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
#ifndef CHIMAERA_INCLUDE_CHIMAERA_POOL_QUERY_H_
#define CHIMAERA_INCLUDE_CHIMAERA_POOL_QUERY_H_

#include <cereal/cereal.hpp>

#include "chimaera/types.h"

namespace chi {
Expand Down Expand Up @@ -276,7 +274,7 @@ class PoolQuery {
}

/**
* Cereal serialization support
* Serialization support for any archive type
* @param ar Archive for serialization
*/
template <class Archive>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,20 @@ class Client : public chi::ContainerClient {

return ipc_manager->SendZmq(task, chi::IpcMode::kTcp);
}
/**
 * AsyncRestartContainers - submit a RestartContainersTask to the admin pool
 *
 * Allocates the task through the IPC manager with a freshly created task id,
 * this client's pool id and the supplied routing query, then sends it.
 *
 * @param pool_query Pool routing information
 * @return Future for the RestartContainers task
 */
chi::Future<RestartContainersTask> AsyncRestartContainers(
    const chi::PoolQuery& pool_query) {
  auto* ipc = CHI_IPC;
  auto task_id = chi::CreateTaskId();
  auto restart_task =
      ipc->NewTask<RestartContainersTask>(task_id, pool_id_, pool_query);
  return ipc->Send(restart_task);
}
};

} // namespace chimaera::admin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ class Runtime : public chi::Container {
*/
chi::TaskResume RegisterMemory(hipc::FullPtr<RegisterMemoryTask> task, chi::RunContext &rctx);

/**
* Handle RestartContainers - Re-create pools from saved restart configs
* Reads conf_dir/restart/ directory and re-creates pools from saved YAML
* @param task Task whose containers_restarted_ and error_message_ outputs are filled in
* @param rctx Run context supplied by the runtime
* @return chi::TaskResume coroutine handle
*/
chi::TaskResume RestartContainers(hipc::FullPtr<RestartContainersTask> task, chi::RunContext &rctx);

/**
* Handle SubmitBatch - Submit a batch of tasks in a single RPC
* Deserializes tasks from the batch and executes them in parallel
Expand Down
51 changes: 51 additions & 0 deletions context-runtime/modules/admin/include/chimaera/admin/admin_tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,57 @@ struct RegisterMemoryTask : public chi::Task {
}
};

/**
* RestartContainersTask - Restart containers from saved compose configs
* Reads conf_dir/restart/ directory and re-creates pools from saved YAML files
*/
struct RestartContainersTask : public chi::Task {
OUT chi::u32 containers_restarted_;
OUT chi::priv::string error_message_;

/** SHM default constructor */
RestartContainersTask()
: chi::Task(),
containers_restarted_(0),
error_message_(HSHM_MALLOC) {}

/** Emplace constructor */
explicit RestartContainersTask(const chi::TaskId &task_node,
const chi::PoolId &pool_id,
const chi::PoolQuery &pool_query)
: chi::Task(task_node, pool_id, pool_query, Method::kRestartContainers),
containers_restarted_(0),
error_message_(HSHM_MALLOC) {
task_id_ = task_node;
pool_id_ = pool_id;
method_ = Method::kRestartContainers;
task_flags_.Clear();
pool_query_ = pool_query;
}

template <typename Archive>
void SerializeIn(Archive &ar) {
Task::SerializeIn(ar);
}

template <typename Archive>
void SerializeOut(Archive &ar) {
Task::SerializeOut(ar);
ar(containers_restarted_, error_message_);
}

void Copy(const hipc::FullPtr<RestartContainersTask> &other) {
Task::Copy(other.template Cast<Task>());
containers_restarted_ = other->containers_restarted_;
error_message_ = other->error_message_;
}

void Aggregate(const hipc::FullPtr<RestartContainersTask> &other) {
Task::Aggregate(other.template Cast<Task>());
Copy(other);
}
};

} // namespace chimaera::admin

#endif // ADMIN_TASKS_H_
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ GLOBAL_CONST chi::u32 kWreapDeadIpcs = 19;
GLOBAL_CONST chi::u32 kClientRecv = 20;
GLOBAL_CONST chi::u32 kClientSend = 21;
GLOBAL_CONST chi::u32 kRegisterMemory = 22;
GLOBAL_CONST chi::u32 kRestartContainers = 23;
} // namespace Method

} // namespace chimaera::admin
Expand Down
65 changes: 65 additions & 0 deletions context-runtime/modules/admin/src/admin_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include <sstream>
#include <thread>
#include <unordered_map>
#include <filesystem>
#include <vector>

namespace chimaera::admin {
Expand Down Expand Up @@ -1313,6 +1314,70 @@ chi::TaskResume Runtime::RegisterMemory(hipc::FullPtr<RegisterMemoryTask> task,
co_return;
}

/**
 * Handle RestartContainers - re-create pools from saved restart configs.
 *
 * Scans "<conf_dir>/restart" for regular files with a ".yaml" extension, loads
 * each one into a temporary ConfigManager and issues an AsyncCompose for every
 * pool listed in its compose section.
 *
 * Outputs written on the task:
 *  - containers_restarted_: number of pools whose compose returned 0
 *  - error_message_: empty on full success; a summary when a config file or a
 *    pool failed; the exception text when an exception was caught
 *  - return code: 0 when the scan completed (even with individual failures,
 *    which are best-effort and logged), 99 when an exception was caught
 *
 * @param task RestartContainers task to fill in
 * @param rctx Run context (unused)
 */
chi::TaskResume Runtime::RestartContainers(
    hipc::FullPtr<RestartContainersTask> task, chi::RunContext &rctx) {
  (void)rctx;  // not needed by this handler
  HLOG(kDebug, "Admin: Executing RestartContainers task");

  task->containers_restarted_ = 0;
  task->error_message_ = "";

  try {
    namespace fs = std::filesystem;

    auto *config_manager = CHI_CONFIG_MANAGER;
    std::string restart_dir = config_manager->GetConfDir() + "/restart";

    if (!fs::exists(restart_dir) || !fs::is_directory(restart_dir)) {
      HLOG(kDebug, "Admin: No restart directory found at {}", restart_dir);
      task->SetReturnCode(0);
      co_return;
    }

    // Count of config files that failed to load plus pools that failed to
    // compose; reported to the caller through error_message_.
    chi::u32 failed_items = 0;

    for (const auto &entry : fs::directory_iterator(restart_dir)) {
      if (entry.path().extension() != ".yaml") continue;

      // Only regular files hold restart configs. A directory (or other
      // special file) that happens to end in ".yaml" is skipped. The
      // error_code overload keeps one unreadable entry from aborting the scan.
      std::error_code ec;
      if (!entry.is_regular_file(ec)) continue;

      // Load pool config from YAML file
      chi::ConfigManager temp_config;
      if (!temp_config.LoadYaml(entry.path().string())) {
        HLOG(kError, "Admin: Failed to load restart config: {}",
             entry.path().string());
        ++failed_items;
        continue;
      }

      const auto &compose_config = temp_config.GetComposeConfig();
      for (const auto &pool_config : compose_config.pools_) {
        HLOG(kInfo, "Admin: Restarting pool {} (module: {})",
             pool_config.pool_name_, pool_config.mod_name_);

        auto future = client_.AsyncCompose(pool_config);
        co_await future;

        chi::u32 rc = future->GetReturnCode();
        if (rc != 0) {
          HLOG(kError, "Admin: Failed to restart pool {}: rc={}",
               pool_config.pool_name_, rc);
          ++failed_items;
          continue;
        }

        task->containers_restarted_++;
        HLOG(kInfo, "Admin: Successfully restarted pool {}",
             pool_config.pool_name_);
      }
    }

    // Individual failures stay best-effort (return code remains 0), but they
    // are reported so the caller can tell the restart was only partial.
    if (failed_items > 0) {
      std::string summary =
          std::to_string(failed_items) +
          " restart config(s)/pool(s) failed to restart; see runtime log";
      task->error_message_ = chi::priv::string(HSHM_MALLOC, summary);
    }

    task->SetReturnCode(0);
    HLOG(kInfo, "Admin: RestartContainers completed, {} containers restarted",
         task->containers_restarted_);
  } catch (const std::exception &e) {
    // Use the same setter as the success paths instead of writing the field.
    task->SetReturnCode(99);
    std::string error_msg =
        std::string("Exception during RestartContainers: ") + e.what();
    task->error_message_ = chi::priv::string(HSHM_MALLOC, error_msg);
    HLOG(kError, "Admin: RestartContainers failed: {}", e.what());
  }
  co_return;
}

chi::TaskResume Runtime::WreapDeadIpcs(hipc::FullPtr<WreapDeadIpcsTask> task,
chi::RunContext &rctx) {
auto *ipc_manager = CHI_IPC;
Expand Down
55 changes: 55 additions & 0 deletions context-runtime/modules/admin/src/autogen/admin_lib_exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ chi::TaskResume Runtime::Run(chi::u32 method, hipc::FullPtr<chi::Task> task_ptr,
co_await RegisterMemory(typed_task, rctx);
break;
}
case Method::kRestartContainers: {
// Cast task FullPtr to specific type
hipc::FullPtr<RestartContainersTask> typed_task = task_ptr.template Cast<RestartContainersTask>();
co_await RestartContainers(typed_task, rctx);
break;
}
default: {
// Unknown method - do nothing
break;
Expand Down Expand Up @@ -193,6 +199,10 @@ void Runtime::DelTask(chi::u32 method, hipc::FullPtr<chi::Task> task_ptr) {
ipc_manager->DelTask(task_ptr.template Cast<RegisterMemoryTask>());
break;
}
case Method::kRestartContainers: {
ipc_manager->DelTask(task_ptr.template Cast<RestartContainersTask>());
break;
}
default: {
// For unknown methods, still try to delete from main segment
ipc_manager->DelTask(task_ptr);
Expand Down Expand Up @@ -279,6 +289,11 @@ void Runtime::SaveTask(chi::u32 method, chi::SaveTaskArchive& archive,
archive << *typed_task.ptr_;
break;
}
case Method::kRestartContainers: {
auto typed_task = task_ptr.template Cast<RestartContainersTask>();
archive << *typed_task.ptr_;
break;
}
default: {
// Unknown method - do nothing
break;
Expand Down Expand Up @@ -364,6 +379,11 @@ void Runtime::LoadTask(chi::u32 method, chi::LoadTaskArchive& archive,
archive >> *typed_task.ptr_;
break;
}
case Method::kRestartContainers: {
auto typed_task = task_ptr.template Cast<RestartContainersTask>();
archive >> *typed_task.ptr_;
break;
}
default: {
// Unknown method - do nothing
break;
Expand Down Expand Up @@ -472,6 +492,12 @@ void Runtime::LocalLoadTask(chi::u32 method, chi::LocalLoadTaskArchive& archive,
typed_task.ptr_->SerializeIn(archive);
break;
}
case Method::kRestartContainers: {
auto typed_task = task_ptr.template Cast<RestartContainersTask>();
// Call SerializeIn - task will call Task::SerializeIn for base fields
typed_task.ptr_->SerializeIn(archive);
break;
}
default: {
// Unknown method - do nothing
break;
Expand Down Expand Up @@ -580,6 +606,12 @@ void Runtime::LocalSaveTask(chi::u32 method, chi::LocalSaveTaskArchive& archive,
typed_task.ptr_->SerializeOut(archive);
break;
}
case Method::kRestartContainers: {
auto typed_task = task_ptr.template Cast<RestartContainersTask>();
// Call SerializeOut - task will call Task::SerializeOut for base fields
typed_task.ptr_->SerializeOut(archive);
break;
}
default: {
// Unknown method - do nothing
break;
Expand Down Expand Up @@ -759,6 +791,17 @@ hipc::FullPtr<chi::Task> Runtime::NewCopyTask(chi::u32 method, hipc::FullPtr<chi
}
break;
}
case Method::kRestartContainers: {
// Allocate new task
auto new_task_ptr = ipc_manager->NewTask<RestartContainersTask>();
if (!new_task_ptr.IsNull()) {
// Copy task fields (includes base Task fields)
auto task_typed = orig_task_ptr.template Cast<RestartContainersTask>();
new_task_ptr->Copy(task_typed);
return new_task_ptr.template Cast<chi::Task>();
}
break;
}
default: {
// For unknown methods, create base Task copy
auto new_task_ptr = ipc_manager->NewTask<chi::Task>();
Expand Down Expand Up @@ -841,6 +884,10 @@ hipc::FullPtr<chi::Task> Runtime::NewTask(chi::u32 method) {
auto new_task_ptr = ipc_manager->NewTask<RegisterMemoryTask>();
return new_task_ptr.template Cast<chi::Task>();
}
case Method::kRestartContainers: {
auto new_task_ptr = ipc_manager->NewTask<RestartContainersTask>();
return new_task_ptr.template Cast<chi::Task>();
}
default: {
// For unknown methods, return null pointer
return hipc::FullPtr<chi::Task>();
Expand Down Expand Up @@ -971,6 +1018,14 @@ void Runtime::Aggregate(chi::u32 method, hipc::FullPtr<chi::Task> origin_task_pt
typed_origin.ptr_->Aggregate(typed_replica);
break;
}
case Method::kRestartContainers: {
// Get typed tasks for Aggregate call
auto typed_origin = origin_task_ptr.template Cast<RestartContainersTask>();
auto typed_replica = replica_task_ptr.template Cast<RestartContainersTask>();
// Call Aggregate (uses task-specific Aggregate if available, otherwise base Task::Aggregate)
typed_origin.ptr_->Aggregate(typed_replica);
break;
}
default: {
// For unknown methods, use base Task Aggregate (which also propagates return codes)
origin_task_ptr.ptr_->Aggregate(replica_task_ptr);
Expand Down
Loading