Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions src/core/ndd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ class IndexManager {
std::thread autosave_thread_;
std::atomic<bool> running_{true};
BackupStore backup_store_;
void executeBackupJob(const std::string& index_id, const std::string& backup_name);
void executeBackupJob(const std::string& index_id, const std::string& backup_name,
std::stop_token st);

std::unique_ptr<WriteAheadLog> createWAL(const std::string& index_id) {
const std::string wal_dir = data_dir_ + "/" + index_id;
Expand Down Expand Up @@ -574,9 +575,13 @@ class IndexManager {
}

~IndexManager() {
// Signal autosave thread to stop
// Signal all threads to stop (running_ is checked by autosave and backup threads)
running_ = false;

// Join background backup threads before destroying members
// (prevents use-after-free when detached threads outlive IndexManager)
backup_store_.joinAllThreads();

/**
* Don't wait for autosave thread to exit.
* Since the thread might be sleeping, waiting for join
Expand Down Expand Up @@ -1860,7 +1865,7 @@ class IndexManager {
return backup_store_.deleteBackup(backup_name, username);
}

std::optional<ActiveBackup> getActiveBackup(const std::string& username) {
std::optional<std::pair<std::string, std::string>> getActiveBackup(const std::string& username) {
return backup_store_.getActiveBackup(username);
}

Expand All @@ -1876,7 +1881,8 @@ class IndexManager {

// ========== IndexManager backup implementations ==========

inline void IndexManager::executeBackupJob(const std::string& index_id, const std::string& backup_name) {
inline void IndexManager::executeBackupJob(const std::string& index_id, const std::string& backup_name,
std::stop_token st) {
std::string username;
size_t upos = index_id.find('/');
if (upos != std::string::npos) {
Expand Down Expand Up @@ -1937,6 +1943,11 @@ inline void IndexManager::executeBackupJob(const std::string& index_id, const st
throw std::runtime_error("Cannot create backup without index metadata");
}

// Check stop_token before expensive operations
if (st.stop_requested()) {
throw std::runtime_error("Backup aborted: server shutting down");
}

auto entry_ptr = getIndexEntry(index_id);
auto& entry = *entry_ptr;
std::string metadata_file_in_index = source_dir + "/metadata.json";
Expand All @@ -1951,6 +1962,11 @@ inline void IndexManager::executeBackupJob(const std::string& index_id, const st
*/
std::unique_lock<std::shared_mutex> operation_lock(entry.operation_mutex);

// Check again after acquiring lock (shutdown may have been requested while waiting)
if (st.stop_requested()) {
throw std::runtime_error("Backup aborted: server shutting down");
}

saveIndexInternal(entry);

if(!metadata_json.empty()) {
Expand All @@ -1970,7 +1986,7 @@ inline void IndexManager::executeBackupJob(const std::string& index_id, const st

std::string error_msg;
LOG_DEBUG("Creating tar archive from " << source_dir << " to " << backup_tar_temp);
if(!backup_store_.createBackupTar(source_dir, backup_tar_temp, error_msg)) {
if(!backup_store_.createBackupTar(source_dir, backup_tar_temp, error_msg, st)) {
if(std::filesystem::exists(metadata_file_in_index)) {
std::filesystem::remove(metadata_file_in_index);
}
Expand Down Expand Up @@ -2141,11 +2157,11 @@ inline std::pair<bool, std::string> IndexManager::createBackupAsync(const std::s
}

(void)getIndexEntry(index_id);
backup_store_.setActiveBackup(username, index_id, backup_name);

std::thread([this, index_id, backup_name]() {
executeBackupJob(index_id, backup_name);
}).detach();
std::jthread t([this, index_id, backup_name](std::stop_token st) {
executeBackupJob(index_id, backup_name, st);
});
backup_store_.setActiveBackup(username, index_id, backup_name, std::move(t));

LOG_INFO(2046, index_id, "Backup started: " << backup_name);

Expand Down
4 changes: 2 additions & 2 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,8 @@ int main(int argc, char** argv) {
crow::json::wvalue response;
if (active) {
response["active"] = true;
response["backup_name"] = active->backup_name;
response["index_id"] = active->index_id;
response["backup_name"] = active->second;
response["index_id"] = active->first;
} else {
response["active"] = false;
}
Expand Down
54 changes: 48 additions & 6 deletions src/storage/backup_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
// Bookkeeping record for one in-flight backup. BackupStore keeps these in
// active_user_backups_, keyed by username (see setActiveBackup).
// Member order matters: setActiveBackup brace-initialises this struct
// positionally as {index_id, backup_name, thread}.
struct ActiveBackup {
// Identifier of the index the backup was started for.
std::string index_id;
// Name given to the backup when it was requested.
std::string backup_name;
// Worker running the backup job. std::jthread carries its own stop_token and,
// if still joinable when destroyed, requests stop and joins. clearActiveBackup
// detaches it first because that call is made from the worker itself.
std::jthread thread; // jthread: built-in stop_token + auto-join on destruction
};

class BackupStore {
Expand All @@ -41,7 +42,8 @@ class BackupStore {

bool createBackupTar(const std::filesystem::path& source_dir,
const std::filesystem::path& archive_path,
std::string& error_msg) {
std::string& error_msg,
std::stop_token st = {}) {
struct archive* a = archive_write_new();
archive_write_set_format_pax_restricted(a);

Expand All @@ -52,6 +54,13 @@ class BackupStore {
}

for(const auto& entry : std::filesystem::recursive_directory_iterator(source_dir)) {
// Check stop_token per-file so shutdown doesn't block on large tar operations
if(st.stop_requested()) {
archive_write_close(a);
archive_write_free(a);
error_msg = "Backup aborted: server shutting down";
return false;
}
if(entry.is_regular_file()) {
struct archive_entry* e = archive_entry_new();

Expand Down Expand Up @@ -178,21 +187,52 @@ class BackupStore {

// Active backup tracking

void setActiveBackup(const std::string& username, const std::string& index_id, const std::string& backup_name) {
void setActiveBackup(const std::string& username, const std::string& index_id,
const std::string& backup_name, std::jthread&& thread) {
std::lock_guard<std::mutex> lock(backup_state_mutex_);
active_user_backups_[username] = {index_id, backup_name};
active_user_backups_[username] = {index_id, backup_name, std::move(thread)};
}

void clearActiveBackup(const std::string& username) {
std::lock_guard<std::mutex> lock(backup_state_mutex_);
active_user_backups_.erase(username);
auto it = active_user_backups_.find(username);
if (it != active_user_backups_.end()) {
// Called from within the thread itself — detach so erase doesn't try to join
if (it->second.thread.joinable()) {
it->second.thread.detach();
}
active_user_backups_.erase(it);
}
}

// Reports whether `username` currently has an entry in active_user_backups_.
// The lookup is performed while holding backup_state_mutex_.
bool hasActiveBackup(const std::string& username) const {
    std::lock_guard<std::mutex> guard(backup_state_mutex_);
    const auto found = active_user_backups_.find(username);
    return found != active_user_backups_.end();
}

// Join all background backup threads before destroying IndexManager members.
// Moves threads out under lock, then request_stop + join outside lock to avoid
// deadlock (finishing threads call clearActiveBackup which also locks backup_state_mutex_).
void joinAllThreads() {
std::vector<std::jthread> threads_to_join;
{
std::lock_guard<std::mutex> lock(backup_state_mutex_);
for (auto& [username, backup] : active_user_backups_) {
if (backup.thread.joinable()) {
threads_to_join.push_back(std::move(backup.thread));
}
}
active_user_backups_.clear();
}
// request_stop + join outside the lock
for (auto& t : threads_to_join) {
t.request_stop(); // signal stop_token — thread sees it inside createBackupTar
if (t.joinable()) {
t.join();
}
}
}

// Backup name validation

std::pair<bool, std::string> validateBackupName(const std::string& backup_name) const {
Expand Down Expand Up @@ -251,10 +291,12 @@ class BackupStore {

// Active backup query

std::optional<ActiveBackup> getActiveBackup(const std::string& username) {
std::optional<std::pair<std::string, std::string>> getActiveBackup(const std::string& username) {
std::lock_guard<std::mutex> lock(backup_state_mutex_);
auto it = active_user_backups_.find(username);
if (it != active_user_backups_.end()) return it->second;
if (it != active_user_backups_.end()) {
return std::make_pair(it->second.index_id, it->second.backup_name);
}
return std::nullopt;
}

Expand Down