From 14dc5371c9e6f788caf036d6273522b183ea48eb Mon Sep 17 00:00:00 2001 From: Jessica Yu Date: Sun, 26 Nov 2017 23:39:32 -0600 Subject: [PATCH] Implemented recover_file. --- zookeeper/include/zk_nn_client.h | 1 + zookeeper/source/zk_nn_client.cc | 77 ++++++++++++++++++++++++++++++-- 2 files changed, 74 insertions(+), 4 deletions(-) diff --git a/zookeeper/include/zk_nn_client.h b/zookeeper/include/zk_nn_client.h index 8c74d984..8b5681fa 100644 --- a/zookeeper/include/zk_nn_client.h +++ b/zookeeper/include/zk_nn_client.h @@ -266,6 +266,7 @@ class ZkNnClient : public ZkClientCommon { RenewLeaseResponseProto &res); void recover_lease(RecoverLeaseRequestProto &req, RecoverLeaseResponseProto &res); + void recover_file(std::string &file_path); /** * Get info of the file. * @param req GetFileInfoRequestProto diff --git a/zookeeper/source/zk_nn_client.cc b/zookeeper/source/zk_nn_client.cc index 41f4469a..84b59858 100644 --- a/zookeeper/source/zk_nn_client.cc +++ b/zookeeper/source/zk_nn_client.cc @@ -25,6 +25,7 @@ #include #include #include +#include using hadoop::hdfs::AddBlockRequestProto; @@ -380,6 +381,77 @@ void ZkNnClient::renew_lease(RenewLeaseRequestProto &req, } } +void ZkNnClient::recover_file(std::string &file_path) { + int error_code; + FileZNode znode_data; + read_file_znode(znode_data, file_path); + std::uint64_t file_length = znode_data.length; + + std::vector children; + if (!zk->get_children(ZookeeperBlocksPath(file_path), children, error_code)) { + LOG(FATAL) << "[recover_file] Failed to get children for " << file_path; + return; + } + + std::uint64_t size_left = file_length; + std::sort(children.begin(), children.end()); + for (auto &child : children) { + std::vector block_vec; + std::uint64_t block_id; + auto child_path = util::concat_path(ZookeeperBlocksPath(file_path), child); + if (!zk->get(child_path, block_vec, error_code, sizeof(block_id))) { + LOG(ERROR) << "[recover_file] failed block retrieval"; + } + block_id = *reinterpret_cast(block_vec.data()); + std::string block_metadata_path = get_block_metadata_path(block_id); + BlockZNode block_data; + std::vector block_data_vec(sizeof(block_data)); + if (!zk->get(block_metadata_path, block_data_vec, error_code, + sizeof(block_data))) { + LOG(ERROR) << "[recover_file] Failed getting block size for " + << block_metadata_path << " with error " << error_code; + } + memcpy(&block_data, &block_data_vec[0], sizeof(block_data)); + uint64_t block_size = block_data.block_size; + + if (size_left < block_size) { + // Updates the block_size + block_data.block_size = size_left; + memcpy(&block_data_vec[0], &block_data, sizeof(block_data)); + if (!zk->set(block_metadata_path, block_data_vec, error_code, + sizeof(block_data))) { + LOG(ERROR) << "[recover_file] Failed setting block size for " + << block_metadata_path << " with error " << error_code; + } + size_left = 0; + } else if (size_left <= 0) { + // Removes the block + std::vector datanodes; + + if (!zk->get_children(get_block_metadata_path(block_id), + datanodes, error_code)) { + LOG(ERROR) << "[recover_file] Failed getting datanode " + "locations for block: " + << block_id + << " with error: " + << error_code; + return; + } + // push delete to datanodes' delete queues + for (auto &dn : datanodes) { + auto delete_queue = util::concat_path(DELETE_QUEUES, dn); + auto delete_item = util::concat_path(delete_queue, "block-"); + LOG(INFO) << "[recover_file] pushed to delete queue " << delete_item; + if (!zk->create(delete_item, block_vec, error_code, false)) { + LOG(INFO) << "[recover_file] pushed create " << delete_item; + } + blockDeleted(block_id, dn); + } + } + size_left -= block_size; + } +} + void ZkNnClient::recover_lease(RecoverLeaseRequestProto &req, RecoverLeaseResponseProto &res) { std::string client_name = req.clientname(); @@ -421,9 +493,7 @@ void ZkNnClient::recover_lease(RecoverLeaseRequestProto &req, } std::string lease_holder_client = children[0]; if (lease_expired(lease_holder_client)) { - // TODO(elfyingying): start the lease recovery process - // that closes the file for the - // client and make sure the replicas are consistent. + recover_file(file_path); if (!zk->delete_node(LeaseZookeeperPath(file_path) + '/' + lease_holder_client, error_code, true)) { LOG(ERROR) << "[recover_lease] Failed to delete the dead lease holder " @@ -928,7 +998,6 @@ ZkNnClient::DeleteResponse ZkNnClient::destroy_helper(const std::string &path, } for (auto &child : children) { auto child_path = util::concat_path(ZookeeperBlocksPath(path), child); - child_path = child_path; ops.push_back(zk->build_delete_op(child_path)); std::vector block_vec; std::uint64_t block;