From 58684af07e67d0c1337ebdab537f27a62fcfa83d Mon Sep 17 00:00:00 2001 From: lylei Date: Fri, 9 Oct 2015 11:41:58 +0800 Subject: [PATCH 1/5] issue=#324 rollback with name --- src/master/master_impl.cc | 58 +++++++++++++++++++++---------- src/master/master_impl.h | 3 +- src/master/remote_master.cc | 16 ++++----- src/master/remote_master.h | 8 ++--- src/master/tablet_manager.cc | 45 ++++++++++++++---------- src/master/tablet_manager.h | 10 +++--- src/proto/master_client.cc | 8 ++--- src/proto/master_client.h | 4 +-- src/proto/master_rpc.proto | 3 +- src/proto/table_meta.proto | 10 ++++-- src/proto/tabletnode_rpc.proto | 3 +- src/sdk/client_impl.cc | 14 ++++++-- src/sdk/client_impl.h | 3 +- src/sdk/tera.h | 5 +-- src/tabletnode/tabletnode_impl.cc | 7 ++-- src/teracli_main.cc | 13 +++++-- 16 files changed, 133 insertions(+), 77 deletions(-) mode change 100644 => 100755 src/master/master_impl.cc mode change 100644 => 100755 src/master/master_impl.h mode change 100644 => 100755 src/master/remote_master.cc mode change 100644 => 100755 src/master/remote_master.h mode change 100644 => 100755 src/master/tablet_manager.cc mode change 100644 => 100755 src/master/tablet_manager.h mode change 100644 => 100755 src/proto/master_client.cc mode change 100644 => 100755 src/proto/master_client.h mode change 100644 => 100755 src/proto/master_rpc.proto mode change 100644 => 100755 src/proto/table_meta.proto mode change 100644 => 100755 src/proto/tabletnode_rpc.proto mode change 100644 => 100755 src/sdk/client_impl.cc mode change 100644 => 100755 src/sdk/client_impl.h mode change 100644 => 100755 src/sdk/tera.h mode change 100644 => 100755 src/tabletnode/tabletnode_impl.cc mode change 100644 => 100755 src/teracli_main.cc diff --git a/src/master/master_impl.cc b/src/master/master_impl.cc old mode 100644 new mode 100755 index 2d05dfa97..339ecfaf7 --- a/src/master/master_impl.cc +++ b/src/master/master_impl.cc @@ -2225,19 +2225,22 @@ void MasterImpl::LoadTabletAsync(TabletPtr tablet, LoadClosure* done, uint64_t) std::vector snapshot_seq; table->ListSnapshot(&snapshot_id); tablet->ListSnapshot(&snapshot_seq); + LOG(INFO) << "LL: id_num=" << snapshot_id.size() << "seq_num=" << snapshot_seq.size(); + for (uint32_t i = 0; i < snapshot_seq.size(); ++i) { + LOG(INFO) << "LL: ss.seq=" << snapshot_seq[i]; + } assert(snapshot_id.size() == snapshot_seq.size()); for (uint32_t i = 0; i < snapshot_id.size(); ++i) { request->add_snapshots_id(snapshot_id[i]); request->add_snapshots_sequence(snapshot_seq[i]); } - std::vector rollback_snapshots; - std::vector rollback_points; - table->ListRollback(&rollback_snapshots); - tablet->ListRollback(&rollback_points); - assert(rollback_snapshots.size() == rollback_points.size()); - for (uint32_t i = 0; i < rollback_snapshots.size(); ++i) { - request->add_rollback_snapshots(rollback_snapshots[i]); - request->add_rollback_points(rollback_points[i]); + std::vector rollback_names; + std::vector rollbacks; + table->ListRollback(&rollback_names); + tablet->ListRollback(&rollbacks); + assert(rollback_names.size() == rollbacks.size()); + for (uint32_t i = 0; i < rollbacks.size(); ++i) { + request->add_rollbacks()->CopyFrom(rollbacks[i]); } TabletMeta meta; @@ -2826,10 +2829,10 @@ void MasterImpl::ReleaseSnapshotCallback(ReleaseSnapshotRequest* request, /// 删掉删不掉无所谓, 不计较~ } -void MasterImpl::Rollback(const RollbackRequest* request, - RollbackResponse* response, - google::protobuf::Closure* done) { - LOG(INFO) << "MasterImpl Rollback"; +void MasterImpl::GetRollback(const RollbackRequest* request, + RollbackResponse* response, + google::protobuf::Closure* done) { + LOG(INFO) << "MasterImpl Rollback" << request->rollback_name(); response->set_sequence_id(request->sequence_id()); MasterStatus master_status = GetMasterStatus(); @@ -2862,6 +2865,7 @@ void MasterImpl::Rollback(const RollbackRequest* request, task->table = table; task->task_num = 0; task->finish_num = 0; + task->aborted = false; MutexLock lock(&task->mutex); for (uint32_t i = 0; i < task->tablets.size(); ++i) { TabletPtr tablet = task->tablets[i]; @@ -2906,34 +2910,51 @@ void MasterImpl::RollbackCallback(int32_t tablet_id, RollbackTask* task, VLOG(6) << "MasterImpl Rollback id= " << tablet_id << " finish_num= " << task->finish_num << ". Return " << master_response->rollback_point(); + LOG(INFO) << "LL: tast_num=" << task->task_num << " finished=" << task->finish_num; if (task->finish_num != task->task_num) { if (!failed && master_response->status() == kTabletNodeOk) { + LOG(INFO) << "LL: ok status=" << StatusCodeToString(master_response->status()); task->rollback_points[tablet_id] = master_response->rollback_point(); } else { + LOG(INFO) << "LL: abort status=" << StatusCodeToString(master_response->status()); task->aborted = true; } return; } if (failed || task->aborted) { + if (failed) { + LOG(INFO) << "LL:failed"; + } else if (task->aborted) { + LOG(INFO) << "LL:aborted"; + } LOG(WARNING) << "MasterImpl Rollback fail done"; task->response->set_status(kTabletNodeOffLine); task->done->Run(); } else { task->rollback_points[tablet_id] = master_response->rollback_point(); LOG(INFO) << "MasterImpl rollback all tablet done"; - int sid = task->table->AddRollback(master_request->snapshot_id()); + int sid = task->table->AddRollback(task->request->rollback_name()); for (uint32_t i = 0; i < task->tablets.size(); ++i) { - int tsid = task->tablets[i]->AddRollback(task->rollback_points[i]); + /* + Rollback rollback; + rollback.set_name(task->request->rollback_name()); + rollback.set_snapshot_id(master_request->snapshot_id()); + rollback.set_rollback_point(task->rollback_points[i]); + */ + int tsid = task->tablets[i]->AddRollback(task->request->rollback_name(), + master_request->snapshot_id(), + task->rollback_points[i]); assert(sid == tsid); } WriteClosure* closure = NewClosure(this, &MasterImpl::AddRollbackCallback, - task->table, task->tablets, - FLAGS_tera_master_meta_retry_times, - task->request, task->response, task->done); + task->table, task->tablets, + FLAGS_tera_master_meta_retry_times, + task->request, task->response, task->done); BatchWriteMetaTableAsync(task->table, task->tablets, false, closure); } + task->mutex.Unlock(); delete task; } @@ -2946,6 +2967,7 @@ void MasterImpl::AddRollbackCallback(TablePtr table, WriteTabletRequest* request, WriteTabletResponse* response, bool failed, int error_code) { + LOG(INFO) << "LL:in add rolback callback"; StatusCode status = response->status(); if (!failed && status == kTabletNodeOk) { // all the row status should be the same @@ -2975,7 +2997,7 @@ void MasterImpl::AddRollbackCallback(TablePtr table, } return; } - LOG(INFO) << "Rollback " << rpc_request->table_name() + LOG(INFO) << "Rollback " << rpc_request->rollback_name() << " to " << rpc_request->table_name() << ", write meta " << rpc_request->snapshot_id() << " done"; rpc_response->set_status(kMasterOk); rpc_done->Run(); diff --git a/src/master/master_impl.h b/src/master/master_impl.h old mode 100644 new mode 100755 index 871cce565..51384b928 --- a/src/master/master_impl.h +++ b/src/master/master_impl.h @@ -78,7 +78,7 @@ class MasterImpl { DelSnapshotResponse* response, google::protobuf::Closure* done); - void Rollback(const RollbackRequest* request, + void GetRollback(const RollbackRequest* request, RollbackResponse* response, google::protobuf::Closure* done); @@ -193,6 +193,7 @@ class MasterImpl { const RollbackRequest* request; RollbackResponse* response; google::protobuf::Closure* done; + std::string rollback_name; TablePtr table; std::vector tablets; std::vector rollback_points; diff --git a/src/master/remote_master.cc b/src/master/remote_master.cc old mode 100644 new mode 100755 index de037e119..05eafef17 --- a/src/master/remote_master.cc +++ b/src/master/remote_master.cc @@ -46,10 +46,10 @@ void RemoteMaster::DelSnapshot(google::protobuf::RpcController* controller, m_thread_pool->AddTask(callback); } -void RemoteMaster::Rollback(google::protobuf::RpcController* controller, - const RollbackRequest* request, - RollbackResponse* response, - google::protobuf::Closure* done) { +void RemoteMaster::GetRollback(google::protobuf::RpcController* controller, + const RollbackRequest* request, + RollbackResponse* response, + google::protobuf::Closure* done) { ThreadPool::Task callback = boost::bind(&RemoteMaster::DoRollback, this, controller, request, response, done); @@ -177,11 +177,11 @@ void RemoteMaster::DoDelSnapshot(google::protobuf::RpcController* controller, } void RemoteMaster::DoRollback(google::protobuf::RpcController* controller, - const RollbackRequest* request, - RollbackResponse* response, - google::protobuf::Closure* done) { + const RollbackRequest* request, + RollbackResponse* response, + google::protobuf::Closure* done) { LOG(INFO) << "accept RPC (Rollback)"; - m_master_impl->Rollback(request, response, done); + m_master_impl->GetRollback(request, response, done); LOG(INFO) << "finish RPC (Rollback)"; } diff --git a/src/master/remote_master.h b/src/master/remote_master.h old mode 100644 new mode 100755 index 7eed4abe0..3b2b92392 --- a/src/master/remote_master.h +++ b/src/master/remote_master.h @@ -30,10 +30,10 @@ class RemoteMaster : public MasterServer { DelSnapshotResponse* response, google::protobuf::Closure* done); - void Rollback(google::protobuf::RpcController* controller, - const RollbackRequest* request, - RollbackResponse* response, - google::protobuf::Closure* done); + void GetRollback(google::protobuf::RpcController* controller, + const RollbackRequest* request, + RollbackResponse* response, + google::protobuf::Closure* done); void CreateTable(google::protobuf::RpcController* controller, const CreateTableRequest* request, diff --git a/src/master/tablet_manager.cc b/src/master/tablet_manager.cc old mode 100644 new mode 100755 index 6fff58d52..215654a1d --- a/src/master/tablet_manager.cc +++ b/src/master/tablet_manager.cc @@ -348,16 +348,20 @@ void Tablet::DelSnapshot(int32_t id) { snapshot_list->RemoveLast(); } -int32_t Tablet::AddRollback(uint64_t rollback_point) { +int32_t Tablet::AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point) { MutexLock lock(&m_mutex); - m_meta.add_rollback_points(rollback_point); - return m_meta.rollback_points_size() - 1; + Rollback rollback; + rollback.set_name(name); + rollback.set_snapshot_id(snapshot_id); + rollback.set_rollback_point(rollback_point); + m_meta.add_rollbacks()->CopyFrom(rollback); + return m_meta.rollbacks_size() - 1; } -void Tablet::ListRollback(std::vector* rollback_points) { +void Tablet::ListRollback(std::vector* rollbacks) { MutexLock lock(&m_mutex); - for (int i = 0; i < m_meta.rollback_points_size(); i++) { - rollback_points->push_back(m_meta.rollback_points(i)); + for (int i = 0; i < m_meta.rollbacks_size(); i++) { + rollbacks->push_back(m_meta.rollbacks(i)); } } @@ -631,15 +635,15 @@ void Table::ListSnapshot(std::vector* snapshots) { *snapshots = m_snapshot_list; } -int32_t Table::AddRollback(uint64_t snapshot_id) { +int32_t Table::AddRollback(std::string rollback_name) { MutexLock lock(&m_mutex); - m_rollback_snapshots.push_back(snapshot_id); - return m_rollback_snapshots.size() - 1; + m_rollback_names.push_back(rollback_name); + return m_rollback_names.size() - 1; } -void Table::ListRollback(std::vector* snapshots) { +void Table::ListRollback(std::vector* rollback_names) { MutexLock lock(&m_mutex); - *snapshots = m_rollback_snapshots; + *rollback_names = m_rollback_names; } void Table::AddDeleteTabletCount() { @@ -671,8 +675,8 @@ void Table::ToMeta(TableMeta* meta) { for (size_t i = 0; i < m_snapshot_list.size(); i++) { meta->add_snapshot_list(m_snapshot_list[i]); } - for (size_t i = 0; i < m_rollback_snapshots.size(); ++i) { - meta->add_rollback_snapshot(m_rollback_snapshots[i]); + for (size_t i = 0; i < m_rollback_names.size(); ++i) { + meta->add_rollback_names(m_rollback_names[i]); } } @@ -767,9 +771,9 @@ bool TabletManager::AddTable(const std::string& table_name, (*table)->m_snapshot_list.push_back(meta.snapshot_list(i)); LOG(INFO) << table_name << " add snapshot " << meta.snapshot_list(i); } - for (int32_t i = 0; i < meta.rollback_snapshot_size(); ++i) { - (*table)->m_rollback_snapshots.push_back(meta.rollback_snapshot(i)); - LOG(INFO) << table_name << " add rollback " << meta.rollback_snapshot(i); + for (int32_t i = 0; i < meta.rollback_names_size(); ++i) { + (*table)->m_rollback_names.push_back(meta.rollback_names(i)); + LOG(INFO) << table_name << " add rollback " << meta.rollback_names(i); } (*table)->m_mutex.Unlock(); return true; @@ -1207,12 +1211,15 @@ void TabletManager::LoadTabletMeta(const std::string& key, << " start=" << DebugString(meta.key_range().key_start()); // TODO: try correct invalid record } else { + /* for (int i = 0; i < meta.snapshot_list_size(); ++i) { tablet->AddSnapshot(meta.snapshot_list(i)); } - for (int i = 0 ; i < meta.rollback_points_size(); ++i) { - tablet->AddRollback(meta.rollback_points(i)); - } + for (int i = 0 ; i < meta.rollbacks_size(); ++i) { + tablet->AddRollback(meta.rollbacks(i).name(), + meta.rollbacks(i).snapshot_id(), + meta.rollbacks(i).rollback_point()); + }*/ VLOG(5) << "load tablet record: " << tablet; } } diff --git a/src/master/tablet_manager.h b/src/master/tablet_manager.h old mode 100644 new mode 100755 index 61c075991..d1aee8091 --- a/src/master/tablet_manager.h +++ b/src/master/tablet_manager.h @@ -106,8 +106,8 @@ class Tablet { int32_t AddSnapshot(uint64_t snapshot); void ListSnapshot(std::vector* snapshot); void DelSnapshot(int32_t id); - int32_t AddRollback(uint64_t rollback_point); - void ListRollback(std::vector* rollback_points); + int32_t AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point); + void ListRollback(std::vector* rollbacks); // is belong to a table? bool IsBound(); @@ -175,8 +175,8 @@ class Table { int32_t AddSnapshot(uint64_t snapshot); int32_t DelSnapshot(uint64_t snapshot); void ListSnapshot(std::vector* snapshots); - int32_t AddRollback(uint64_t rollback_snapshot); - void ListRollback(std::vector* rollback_snapshots); + int32_t AddRollback(std::string rollback_name); + void ListRollback(std::vector* rollback_names); void AddDeleteTabletCount(); bool NeedDelete(); void ToMetaTableKeyValue(std::string* packed_key = NULL, @@ -195,7 +195,7 @@ class Table { std::string m_name; TableSchema m_schema; std::vector m_snapshot_list; - std::vector m_rollback_snapshots; + std::vector m_rollback_names; TableStatus m_status; uint32_t m_deleted_tablet_num; uint64_t m_max_tablet_no; diff --git a/src/proto/master_client.cc b/src/proto/master_client.cc old mode 100644 new mode 100755 index 4c1bd7db3..223bab329 --- a/src/proto/master_client.cc +++ b/src/proto/master_client.cc @@ -39,12 +39,12 @@ bool MasterClient::DelSnapshot(const DelSnapshotRequest* request, "DelSnapshot", m_rpc_timeout); } -bool MasterClient::Rollback(const RollbackRequest* request, - RollbackResponse* response) { - return SendMessageWithRetry(&MasterServer::Stub::Rollback, +bool MasterClient::GetRollback(const RollbackRequest* request, + RollbackResponse* response) { + return SendMessageWithRetry(&MasterServer::Stub::GetRollback, request, response, (Closure*)NULL, - "Rollback", m_rpc_timeout); + "GetRollback", m_rpc_timeout); } bool MasterClient::CreateTable(const CreateTableRequest* request, diff --git a/src/proto/master_client.h b/src/proto/master_client.h old mode 100644 new mode 100755 index 51e0627fd..985e5a797 --- a/src/proto/master_client.h +++ b/src/proto/master_client.h @@ -29,8 +29,8 @@ class MasterClient : public RpcClient { virtual bool DelSnapshot(const DelSnapshotRequest* request, DelSnapshotResponse* response); - virtual bool Rollback(const RollbackRequest* request, - RollbackResponse* response); + virtual bool GetRollback(const RollbackRequest* request, + RollbackResponse* response); virtual bool CreateTable(const CreateTableRequest* request, CreateTableResponse* response); diff --git a/src/proto/master_rpc.proto b/src/proto/master_rpc.proto old mode 100644 new mode 100755 index 9b41184d4..f1d057c9e --- a/src/proto/master_rpc.proto +++ b/src/proto/master_rpc.proto @@ -170,6 +170,7 @@ message RollbackRequest { optional uint64 sequence_id = 1; optional string table_name = 2; optional uint64 snapshot_id = 3; + optional string rollback_name = 4; } message RollbackResponse { @@ -208,7 +209,7 @@ message RenameTableResponse { service MasterServer { rpc GetSnapshot(GetSnapshotRequest) returns(GetSnapshotResponse); rpc DelSnapshot(DelSnapshotRequest) returns(DelSnapshotResponse); - rpc Rollback(RollbackRequest) returns (RollbackResponse); + rpc GetRollback(RollbackRequest) returns (RollbackResponse); rpc CreateTable(CreateTableRequest) returns(CreateTableResponse); rpc DeleteTable(DeleteTableRequest) returns (DeleteTableResponse); rpc DisableTable(DisableTableRequest) returns (DisableTableResponse); diff --git a/src/proto/table_meta.proto b/src/proto/table_meta.proto old mode 100644 new mode 100755 index 0c077131c..8828820a1 --- a/src/proto/table_meta.proto +++ b/src/proto/table_meta.proto @@ -32,6 +32,12 @@ message TabletLocation { required string server_addr = 2; } +message Rollback { + optional string name = 1; + optional uint64 snapshot_id = 2; + optional uint64 rollback_point = 3; +} + message ScanOption { optional KeyRange key_range = 1; optional int32 max_version = 2; @@ -65,7 +71,7 @@ message TableMeta { optional TableSchema schema = 3; repeated uint64 snapshot_list = 4; optional uint64 create_time = 5; - repeated uint64 rollback_snapshot = 6; + repeated string rollback_names = 6; } message TabletMeta { @@ -81,7 +87,7 @@ message TabletMeta { repeated uint64 snapshot_list = 11; repeated uint64 parent_tablets = 12; repeated int64 lg_size = 13; - repeated uint64 rollback_points = 14; + repeated Rollback rollbacks = 14; } message TableMetaList { diff --git a/src/proto/tabletnode_rpc.proto b/src/proto/tabletnode_rpc.proto old mode 100644 new mode 100755 index 44d22cdba..53b0df546 --- a/src/proto/tabletnode_rpc.proto +++ b/src/proto/tabletnode_rpc.proto @@ -70,8 +70,7 @@ message LoadTabletRequest { repeated uint64 snapshots_id = 9; repeated uint64 snapshots_sequence = 10; repeated uint64 parent_tablets = 11; - repeated uint64 rollback_snapshots = 12; - repeated uint64 rollback_points = 13; + repeated Rollback rollbacks = 12; } message LoadTabletResponse { diff --git a/src/sdk/client_impl.cc b/src/sdk/client_impl.cc old mode 100644 new mode 100755 index 0dd860904..1903ff42c --- a/src/sdk/client_impl.cc +++ b/src/sdk/client_impl.cc @@ -713,16 +713,24 @@ bool ClientImpl::DelSnapshot(const string& name, uint64_t snapshot, ErrorCode* e return false; } -bool ClientImpl::Rollback(const string& name, uint64_t snapshot, ErrorCode* err) { +bool ClientImpl::Rollback(const string& name, uint64_t snapshot, + const std::string& rollback_name, ErrorCode* err) { master::MasterClient master_client(_cluster->MasterAddr()); + std::string internal_table_name; + if (!GetInternalTableName(name, err, &internal_table_name)) { + LOG(ERROR) << "faild to scan meta schema"; + return false; + } RollbackRequest request; RollbackResponse response; request.set_sequence_id(0); - request.set_table_name(name); + request.set_table_name(internal_table_name); request.set_snapshot_id(snapshot); + request.set_rollback_name(rollback_name); + std::cout << name << " " << rollback_name << std::endl; - if (master_client.Rollback(&request, &response)) { + if (master_client.GetRollback(&request, &response)) { if (response.status() == kMasterOk) { std::cout << name << " rollback to snapshot sucessfully" << std::endl; return true; diff --git a/src/sdk/client_impl.h b/src/sdk/client_impl.h old mode 100644 new mode 100755 index 8f270f248..5ab8e5ef7 --- a/src/sdk/client_impl.h +++ b/src/sdk/client_impl.h @@ -71,7 +71,8 @@ class ClientImpl : public Client { virtual bool DelSnapshot(const string& name, uint64_t snapshot, ErrorCode* err); - virtual bool Rollback(const string& name, uint64_t snapshot, ErrorCode* err); + virtual bool Rollback(const string& name, uint64_t snapshot, + const std::string& rollback_name, ErrorCode* err); virtual bool CmdCtrl(const string& command, const std::vector& arg_list, diff --git a/src/sdk/tera.h b/src/sdk/tera.h old mode 100644 new mode 100755 index ba61df8a7..7c3a48448 --- a/src/sdk/tera.h +++ b/src/sdk/tera.h @@ -659,8 +659,9 @@ class Client { virtual bool IsTableEmpty(const std::string& table_name, ErrorCode* err) = 0; virtual bool GetSnapshot(const std::string& name, uint64_t* snapshot, ErrorCode* err) = 0; - virtual bool DelSnapshot(const std::string& name, uint64_t snapshot, ErrorCode* err) = 0; - virtual bool Rollback(const std::string& name, uint64_t snapshot, ErrorCode* err) = 0; + virtual bool DelSnapshot(const std::string& name, uint64_t snapshot,ErrorCode* err) = 0; + virtual bool Rollback(const std::string& name, uint64_t snapshot, + const std::string& rollback_name, ErrorCode* err) = 0; virtual bool CmdCtrl(const std::string& command, const std::vector& arg_list, diff --git a/src/tabletnode/tabletnode_impl.cc b/src/tabletnode/tabletnode_impl.cc old mode 100644 new mode 100755 index 27ad2f2f8..56180e3d7 --- a/src/tabletnode/tabletnode_impl.cc +++ b/src/tabletnode/tabletnode_impl.cc @@ -230,11 +230,12 @@ void TabletNodeImpl::LoadTablet(const LoadTabletRequest* request, } // to recover rollbacks - assert(request->rollback_snapshots_size() == request->rollback_points_size()); std::map rollbacks; - int32_t num_of_rollbacks = request->rollback_snapshots_size(); + int32_t num_of_rollbacks = request->rollbacks_size(); for (int32_t i = 0; i < num_of_rollbacks; ++i) { - rollbacks[request->rollback_snapshots(i)] = request->rollback_points(i); + rollbacks[request->rollbacks(i).snapshot_id()] = request->rollbacks(i).rollback_point(); + VLOG(10) << "load tablet with rollback " << request->rollbacks(i).snapshot_id() + << "-" << request->rollbacks(i).rollback_point(); } LOG(INFO) << "start load tablet, id: " << request->sequence_id() diff --git a/src/teracli_main.cc b/src/teracli_main.cc old mode 100644 new mode 100755 index 45549a029..a4de8e36a --- a/src/teracli_main.cc +++ b/src/teracli_main.cc @@ -45,7 +45,8 @@ DEFINE_bool(tera_client_scan_async_enabled, false, "enable the streaming scan mo DEFINE_int64(scan_pack_interval, 5000, "scan timeout"); DEFINE_int64(snapshot, 0, "read | scan snapshot"); -DEFINE_string(rollback_switch, "close", "Pandora's box, do not open"); +DEFINE_string(rollback_switch, "open", "Pandora's box, do not open"); +DEFINE_string(rollback_name, "", "rollback operation's name"); volatile int32_t g_start_time = 0; volatile int32_t g_end_time = 0; @@ -1768,7 +1769,7 @@ int32_t SnapshotOp(Client* client, int32_t argc, char** argv, ErrorCode* err) { } std::cout << "new snapshot: " << snapshot << std::endl; } else if (FLAGS_rollback_switch == "open" && strcmp(argv[3], "rollback") == 0) { - if (!client->Rollback(tablename, FLAGS_snapshot, err)) { + if (!client->Rollback(tablename, FLAGS_snapshot, FLAGS_rollback_name, err)) { LOG(ERROR) << "fail to rollback to snapshot: " << err->GetReason(); return -1; } @@ -2123,6 +2124,10 @@ int32_t Meta2Op(Client *client, int32_t argc, char** argv) { const tera::TableMeta& meta = table_list.meta(i); if (op == "show") { std::cout << "table: " << meta.table_name() << std::endl; + int32_t rollback_size = meta.rollback_names_size(); + for (int32_t rollback_i = 0; rollback_i < rollback_size; ++rollback_i) { + std::cout << "rollback_name: " << meta.rollback_names(rollback_i) << std::endl; + } int32_t lg_size = meta.schema().locality_groups_size(); for (int32_t lg_id = 0; lg_id < lg_size; lg_id++) { const tera::LocalityGroupSchema& lg = @@ -2160,6 +2165,10 @@ int32_t Meta2Op(Client *client, int32_t argc, char** argv) { << meta.size() << ", " << StatusCodeToString(meta.status()) << ", " << StatusCodeToString(meta.compact_status()) << std::endl; + int32_t rollback_size = meta.rollbacks_size(); + for (int rollback_i = 0; rollback_i < rollback_size; ++rollback_i) { + std::cout << " rollback:" << meta.rollbacks(rollback_i).name() << ", " << meta.rollbacks(rollback_i).snapshot_id() << ", " << meta.rollbacks(rollback_i).rollback_point() << std::endl; + } } if (op == "bak") { WriteTablet(meta, bak); From 1447d44b26638174c4c91128df49123135e60044 Mon Sep 17 00:00:00 2001 From: lylei Date: Thu, 15 Oct 2015 19:43:51 +0800 Subject: [PATCH 2/5] issue=#382 add rollback test cases --- test/testcase/common.py | 78 ++++++++++++++++++++---------- test/testcase/test_data.py | 21 +++++---- test/testcase/test_snapshot.py | 86 +++++++++++++++++++++++++++++++--- 3 files changed, 146 insertions(+), 39 deletions(-) diff --git a/test/testcase/common.py b/test/testcase/common.py index 09bf8fdc4..f33c823fa 100644 --- a/test/testcase/common.py +++ b/test/testcase/common.py @@ -9,16 +9,14 @@ import os import nose.tools -tera_bench_binary = './tera_bench' -tera_mark_binary = './tera_mark' -teracli_binary = './teracli' +from conf import const def print_debug_msg(sid=0, msg=""): """ provide general print interface """ - print "@%d======================%s"%(sid, msg) + print "@%d======================%s" % (sid, msg) def exe_and_check_res(cmd): @@ -52,9 +50,11 @@ def clear_env(): def cleanup(): - ret = subprocess.Popen('./teracli disable test', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + ret = subprocess.Popen(const.teracli_binary + ' disable test', + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) print ''.join(ret.stdout.readlines()) - ret = subprocess.Popen('./teracli drop test', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + ret = subprocess.Popen(const.teracli_binary + ' drop test', + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) print ''.join(ret.stdout.readlines()) files = os.listdir('.') @@ -63,19 +63,39 @@ def cleanup(): os.remove(f) +def cluster_op(op): + if op == 'kill': + print 'kill cluster' + ret = subprocess.Popen(const.kill_script, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + print ''.join(ret.stdout.readlines()) + elif op == 'launch': + print 'launch cluster' + ret = subprocess.Popen(const.launch_script, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + print ''.join(ret.stdout.readlines()) + else: + print 'unknown argument' + nose.tools.assert_true(False) + + def create_kv_table(): - """ - create kv table - :return: - """ + print 'print kv table' + cleanup() + ret = subprocess.Popen(const.teracli_binary + ' create test', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + print ''.join(ret.stdout.readlines()) + + +def create_singleversion_table(): + print 'print single version table' cleanup() - ret = subprocess.Popen('./teracli create test', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + ret = subprocess.Popen(const.teracli_binary + ' create "test{cf0, cf1}"', + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) print ''.join(ret.stdout.readlines()) -def create_table(): +def create_multiversion_table(): + print 'print multi version table' cleanup() - ret = subprocess.Popen('./teracli create "test{cf0, cf1}"', + ret = subprocess.Popen(const.teracli_binary + ' create "test{cf0, cf1}"', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) print ''.join(ret.stdout.readlines()) @@ -90,7 +110,6 @@ def run_tera_mark(file_path, op, table_name, random, value_size, num, key_size, :param value_size: value size in Bytes :param num: entry number :param key_size: key size in Bytes - :param is_append: append the data to an exists file :param cf: cf list, e.g. 'cf0:qual,cf1:flag'. Empty cf list for kv mode. Notice: no space in between :param key_seed: seed for random key generator :param value_seed: seed for random value generator @@ -124,8 +143,8 @@ def run_tera_mark(file_path, op, table_name, random, value_size, num, key_size, """ --verify=false""".format(op=op, table_name=table_name) cmd = '{tera_bench} {bench_args} | awk {awk_args} | {tera_mark} {mark_args}'.format( - tera_bench=tera_bench_binary, bench_args=tera_bench_args, awk_args=awk_args, - tera_mark=tera_mark_binary, mark_args=tera_mark_args) + tera_bench=const.tera_bench_binary, bench_args=tera_bench_args, awk_args=awk_args, + tera_mark=const.tera_mark_binary, mark_args=tera_mark_args) print cmd ret = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) @@ -145,7 +164,7 @@ def run_tera_mark(file_path, op, table_name, random, value_size, num, key_size, redirect_op += '>' dump_cmd = '{tera_bench} {tera_bench_args} | awk {awk_args} {redirect_op} {out}'.format( - tera_bench=tera_bench_binary, tera_bench_args=tera_bench_args, + tera_bench=const.tera_bench_binary, tera_bench_args=tera_bench_args, redirect_op=redirect_op, awk_args=awk_args, out=path) print dump_cmd ret = subprocess.Popen(dump_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) @@ -170,8 +189,8 @@ def scan_table(table_name, file_path, allversion, snapshot=0): if snapshot != 0: snapshot_args += '--snapshot={snapshot}'.format(snapshot=snapshot) - scan_cmd = '{teracli} {op} {table_name} "" "" {snapshot}> {out}'.format( - teracli=teracli_binary, op=allv, table_name=table_name, snapshot=snapshot_args, out=file_path) + scan_cmd = '{teracli} {op} {table_name} "" "" {snapshot} > {out}'.format( + teracli=const.teracli_binary, op=allv, table_name=table_name, snapshot=snapshot_args, out=file_path) print scan_cmd ret = subprocess.Popen(scan_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) print ''.join(ret.stdout.readlines()) @@ -179,7 +198,7 @@ def scan_table(table_name, file_path, allversion, snapshot=0): def get_tablet_list(table_name): # TODO: need a more elegant & general way to obtain tablet info - show_cmd = '{teracli} show {table}'.format(teracli=teracli_binary, table=table_name) + show_cmd = '{teracli} show {table}'.format(teracli=const.teracli_binary, table=table_name) print show_cmd ret = subprocess.Popen(show_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) tablet_info = ret.stdout.readlines()[5:] # tablet info starts from the 6th line @@ -194,7 +213,7 @@ def get_tablet_list(table_name): def compact_tablets(tablet_list): # TODO: compact may timeout for tablet in tablet_list: - compact_cmd = '{teracli} tablet compact {tablet}'.format(teracli=teracli_binary, tablet=tablet) + compact_cmd = '{teracli} tablet compact {tablet}'.format(teracli=const.teracli_binary, tablet=tablet) print compact_cmd ret = subprocess.Popen(compact_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) print ''.join(ret.stdout.readlines()) @@ -207,7 +226,7 @@ def snapshot_op(table_name): :return: snapshot id on success, None otherwise """ # TODO: delete snapshot - snapshot_cmd = '{teracli} snapshot {table_name} create'.format(teracli=teracli_binary, table_name=table_name) + snapshot_cmd = '{teracli} snapshot {table_name} create'.format(teracli=const.teracli_binary, table_name=table_name) print snapshot_cmd ret = subprocess.Popen(snapshot_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) out = ret.stdout.readlines() @@ -224,6 +243,18 @@ def snapshot_op(table_name): return None +def rollback_op(table_name, snapshot, rollback_name): + """ + Invoke rollback action + :param table_name: table name + :param snapshot: rollback to a specific snapshot + :return: None + """ + rollback_cmd = '{teracli} snapshot {table_name} rollback --snapshot={snapshot} --rollback_name={rname}'.\ + format(teracli=const.teracli_binary, table_name=table_name, snapshot=snapshot, rname=rollback_name) + print rollback_cmd + ret = subprocess.Popen(rollback_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + print ''.join(ret.stdout.readlines()) def compare_files(file1, file2, need_sort): @@ -241,7 +272,7 @@ def compare_files(file1, file2, need_sort): print ''.join(ret.stdout.readlines()) os.rename(file1+'.sort', file1) os.rename(file2+'.sort', file2) - return filecmp.cmp(file1, file2) + return filecmp.cmp(file1, file2, shallow=False) def file_is_empty(file_path): @@ -256,4 +287,3 @@ def file_is_empty(file_path): def cleanup_files(file_list): for file_path in file_list: os.remove(file_path) - diff --git a/test/testcase/test_data.py b/test/testcase/test_data.py index 7e405a7ef..a02b56619 100644 --- a/test/testcase/test_data.py +++ b/test/testcase/test_data.py @@ -26,7 +26,7 @@ def test_kv_random_write(): nose.tools.assert_true(common.compare_files(dump_file, scan_file, need_sort=True)) -@nose.tools.with_setup(common.create_table, common.cleanup) +@nose.tools.with_setup(common.create_singleversion_table, common.cleanup) def test_table_random_write(): """ table write simple @@ -43,7 +43,7 @@ def test_table_random_write(): nose.tools.assert_true(common.compare_files(dump_file, scan_file, need_sort=True)) -@nose.tools.with_setup(common.create_table, common.cleanup) +@nose.tools.with_setup(common.create_multiversion_table) def test_table_random_write_versions(): """ table write w/versions @@ -53,17 +53,20 @@ def test_table_random_write_versions(): :return: None """ table_name = 'test' - dump_file = 'dump.out' + dump_file1 = 'dump1.out' + dump_file2 = 'dump2.out' scan_file = 'scan.out' - common.run_tera_mark([(dump_file, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', - key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20) - common.run_tera_mark([(dump_file, True)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', + common.run_tera_mark([(dump_file1, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', + random='random', key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20) + common.run_tera_mark([(dump_file1, True), (dump_file2, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', key_seed=1, value_seed=11, value_size=100, num=10000, key_size=20) common.scan_table(table_name=table_name, file_path=scan_file, allversion=True) - nose.tools.assert_true(common.compare_files(dump_file, scan_file, need_sort=True)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False) + nose.tools.assert_true(common.compare_files(dump_file2, scan_file, need_sort=True)) -@nose.tools.with_setup(common.create_table, common.cleanup) +@nose.tools.with_setup(common.create_singleversion_table, common.cleanup) def test_table_write_delete(): """ table write and deletion @@ -82,7 +85,7 @@ def test_table_write_delete(): nose.tools.assert_true(common.file_is_empty(scan_file)) -@nose.tools.with_setup(common.create_table, common.cleanup) +@nose.tools.with_setup(common.create_multiversion_table, common.cleanup) def test_table_write_delete_version(): """ table write and deletion w/versions diff --git a/test/testcase/test_snapshot.py b/test/testcase/test_snapshot.py index 7f4d09e4c..9bfe751be 100644 --- a/test/testcase/test_snapshot.py +++ b/test/testcase/test_snapshot.py @@ -5,16 +5,17 @@ """ import nose +import time import common -@nose.tools.with_setup(common.create_table, common.cleanup) +@nose.tools.with_setup(common.create_singleversion_table, common.cleanup) def test_table_write_snapshot(): """ table write w/snapshot 1. write data set 1 - 2. take snapshot + 2. create snapshot 3. write data set 2 4. scan w/snapshot, scan w/o snapshot & compare :return: None @@ -36,12 +37,12 @@ def test_table_write_snapshot(): nose.tools.assert_true(common.compare_files(dump_file2, scan_file2, need_sort=True)) -@nose.tools.with_setup(common.create_table, common.cleanup) +@nose.tools.with_setup(common.create_singleversion_table, common.cleanup) def test_table_write_del_snapshot(): """ table write deletion w/snapshot 1. write data set 1 - 2. take snapshot + 2. create snapshot 3. delete data set 1 4. scan w/snapshot, scan w/o snapshot & compare :return: None @@ -62,12 +63,12 @@ def test_table_write_del_snapshot(): nose.tools.assert_true(common.file_is_empty(scan_file2)) -@nose.tools.with_setup(common.create_table, common.cleanup) +@nose.tools.with_setup(common.create_multiversion_table, common.cleanup) def test_table_write_multiversion_snapshot(): """ table write w/version w/snapshot 1. write data set 1, 2 - 2. take snapshot + 2. create snapshot 3. write data set 3, 4 4. scan w/snapshot, scan w/o snapshot & compare :return: None @@ -91,3 +92,76 @@ def test_table_write_multiversion_snapshot(): common.scan_table(table_name=table_name, file_path=scan_file2, allversion=True, snapshot=0) nose.tools.assert_true(common.compare_files(dump_file1, scan_file1, need_sort=True)) nose.tools.assert_true(common.compare_files(dump_file2, scan_file2, need_sort=True)) + + +@nose.tools.with_setup(common.create_kv_table, common.cleanup) +def test_kv_snapshot_relaunch(): + """ + kv cluster relaunch + 1. write data set 1 + 2. create snapshot + 3. write data set 2 + 4. scan w/snapshot, scan w/o snapshot & compare + 5. kill & launch cluster + 6. repeat 4 + :return: None + """ + table_name = 'test' + dump_file1 = 'dump1.out' + dump_file2 = 'dump2.out' + scan_file1 = 'scan1.out' + scan_file2 = 'scan2.out' + common.run_tera_mark([(dump_file1, False)], op='w', table_name=table_name, random='random', + key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20) + snapshot = common.snapshot_op(table_name) + common.run_tera_mark([(dump_file2, False)], op='w', table_name=table_name, random='random', + key_seed=1, value_seed=11, value_size=100, num=10000, key_size=20) + common.compact_tablets(common.get_tablet_list(table_name)) + common.scan_table(table_name=table_name, file_path=scan_file1, allversion=True, snapshot=snapshot) + common.scan_table(table_name=table_name, file_path=scan_file2, allversion=True, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file1, need_sort=True)) + nose.tools.assert_true(common.compare_files(dump_file2, scan_file2, need_sort=True)) + common.cluster_op('kill') + common.cluster_op('launch') + time.sleep(2) + common.scan_table(table_name=table_name, file_path=scan_file1, allversion=False, snapshot=snapshot) + common.scan_table(table_name=table_name, file_path=scan_file2, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file1, need_sort=True)) + nose.tools.assert_true(common.compare_files(dump_file2, scan_file2, need_sort=True)) + + +@nose.tools.with_setup(common.create_singleversion_table) +def test_table_snapshot_relaunch(): + """ + table cluster relaunch + 1. write data set 1 + 2. create snapshot + 3. write data set 2 + 4. scan w/snapshot, scan w/o snapshot & compare + 5. kill & launch cluster + 6. repeat 4 + :return: None + """ + table_name = 'test' + dump_file1 = 'dump1.out' + dump_file2 = 'dump2.out' + scan_file1 = 'scan1.out' + scan_file2 = 'scan2.out' + common.run_tera_mark([(dump_file1, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', + key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20) + snapshot = common.snapshot_op(table_name) + common.run_tera_mark([(dump_file2, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', + key_seed=1, value_seed=11, value_size=100, num=10000, key_size=20) + common.compact_tablets(common.get_tablet_list(table_name)) + common.scan_table(table_name=table_name, file_path=scan_file1, allversion=True, snapshot=snapshot) + common.scan_table(table_name=table_name, file_path=scan_file2, allversion=True, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file1, need_sort=True)) + nose.tools.assert_true(common.compare_files(dump_file2, scan_file2, need_sort=True)) + + common.cluster_op('kill') + common.cluster_op('launch') + time.sleep(2) + common.scan_table(table_name=table_name, file_path=scan_file1, allversion=True, snapshot=snapshot) + common.scan_table(table_name=table_name, file_path=scan_file2, allversion=True, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file1, need_sort=True)) + nose.tools.assert_true(common.compare_files(dump_file2, scan_file2, need_sort=True)) From 80922105b815f6da0d2ba438f9e17ecc1276663d Mon Sep 17 00:00:00 2001 From: lylei Date: Mon, 19 Oct 2015 17:21:41 +0800 Subject: [PATCH 3/5] issue=#324 rollback write meta with defualt value first --- src/master/master_impl.cc | 78 +++++++++++++++++++++++++++++++----- src/master/master_impl.h | 10 +++-- src/master/tablet_manager.cc | 15 +++++++ src/master/tablet_manager.h | 2 + 4 files changed, 91 insertions(+), 14 deletions(-) diff --git a/src/master/master_impl.cc b/src/master/master_impl.cc index 1020f2103..aca594909 100644 --- a/src/master/master_impl.cc +++ b/src/master/master_impl.cc @@ -3003,15 +3003,71 @@ void MasterImpl::GetRollback(const RollbackRequest* request, return; } + std::vector tablets; + table->GetTablet(&tablets); + // write memory and meta with default rollback_point + int sid = table->AddRollback(request->rollback_name()); + for (uint32_t i = 0; i < tablets.size(); ++i) { + int tsid = tablets[i]->AddRollback(request->rollback_name(), request->snapshot_id(), + leveldb::kMaxSequenceNumber); + assert(sid == tsid); + } + WriteClosure* closure = + NewClosure(this, &MasterImpl::AddDefaultRollbackCallback, table, tablets, + FLAGS_tera_master_meta_retry_times, request, response, done); + BatchWriteMetaTableAsync(table, tablets, false, closure); +} + +void MasterImpl::AddDefaultRollbackCallback(TablePtr table, + std::vector tablets, + int32_t retry_times, + const RollbackRequest* rpc_request, + RollbackResponse* rpc_response, + google::protobuf::Closure* rpc_done, + WriteTabletRequest* request, + WriteTabletResponse* response, + bool failed, int error_code) { + StatusCode status = response->status(); + if (!failed && status == kTabletNodeOk) { + // all the row status should be the same + CHECK_GT(response->row_status_list_size(), 0); + status = response->row_status_list(0); + } + delete request; + delete response; + if (failed || status != kTabletNodeOk) { + if (failed) { + LOG(WARNING) << "fail to write rollback to meta: " + << sofa::pbrpc::RpcErrorCodeToString(error_code) << ", " + << tablets[0] << "..."; + } else { + LOG(WARNING) << "fail to write rollback to meta: " + << StatusCodeToString(status) << ", " << tablets[0] << "..."; + } + if (retry_times <= 0) { + rpc_response->set_status(kMetaTabletError); + rpc_done->Run(); + } else { + WriteClosure* done = + NewClosure(this, &MasterImpl::AddDefaultRollbackCallback, table, + tablets, retry_times - 1, rpc_request, rpc_response, + rpc_done); + SuspendMetaOperation(table, tablets, false, done); + } + return; + } + LOG(INFO) << "Add default rollback " << rpc_request->rollback_name() << " to " + << rpc_request->table_name() << " done"; + RollbackTask* task = new RollbackTask; table->GetTablet(&task->tablets); assert(task->tablets.size()); task->rollback_points.resize(task->tablets.size()); - task->request = request; - task->response = response; - task->done = done; + task->request = rpc_request; + task->response = rpc_response; + task->done = rpc_done; task->table = table; task->task_num = 0; task->finish_num = 0; @@ -3022,13 +3078,13 @@ void MasterImpl::GetRollback(const RollbackRequest* request, ++task->task_num; RollbackClosure* closure = NewClosure(this, &MasterImpl::RollbackCallback, static_cast(i), task); - RollbackAsync(tablet, request->snapshot_id(), 3000, closure); + RollbackAsync(tablet, rpc_request->snapshot_id(), 3000, closure); } if (task->task_num == 0) { - LOG(WARNING) << "fail to rollback to snapshot: " << request->table_name() + LOG(WARNING) << "fail to rollback to snapshot: " << rpc_request->table_name() << ", all tables kTabletNodeOffLine"; response->set_status(kTabletNodeOffLine); - done->Run(); + rpc_done->Run(); return; } } @@ -3076,11 +3132,11 @@ void MasterImpl::RollbackCallback(int32_t tablet_id, RollbackTask* task, } else { task->rollback_points[tablet_id] = master_response->rollback_point(); LOG(INFO) << "MasterImpl rollback all tablet done"; - int sid = task->table->AddRollback(task->request->rollback_name()); + int sid = task->table->GetRollbackSize(); for (uint32_t i = 0; i < task->tablets.size(); ++i) { - int tsid = task->tablets[i]->AddRollback(task->request->rollback_name(), - master_request->snapshot_id(), - task->rollback_points[i]); + int tsid = task->tablets[i]->UpdateRollback(task->request->rollback_name(), + master_request->snapshot_id(), + task->rollback_points[i]); assert(sid == tsid); } WriteClosure* closure = @@ -3133,7 +3189,7 @@ void MasterImpl::AddRollbackCallback(TablePtr table, return; } LOG(INFO) << "Rollback " << rpc_request->rollback_name() << " to " << rpc_request->table_name() - << ", write meta " << rpc_request->snapshot_id() << " done"; + << ", write meta with snpashot_id " << rpc_request->snapshot_id() << " done"; rpc_response->set_status(kMasterOk); rpc_done->Run(); } diff --git a/src/master/master_impl.h b/src/master/master_impl.h index 1629edae4..967f58143 100644 --- a/src/master/master_impl.h +++ b/src/master/master_impl.h @@ -297,8 +297,13 @@ class MasterImpl { WriteTabletRequest* request, WriteTabletResponse* response, bool failed, int error_code); - void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, int32_t timeout, - RollbackClosure* done); + void AddDefaultRollbackCallback(TablePtr table, std::vector tablets, + int32_t retry_times, const RollbackRequest* rpc_request, + RollbackResponse* rpc_response, google::protobuf::Closure* rpc_done, + WriteTabletRequest* request, WriteTabletResponse* response, + bool failed, int error_code); + void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, + int32_t timeout, RollbackClosure* done); void RollbackCallback(int32_t tablet_id, RollbackTask* task, SnapshotRollbackRequest* master_request, SnapshotRollbackResponse* master_response, @@ -312,7 +317,6 @@ class MasterImpl { WriteTabletRequest* request, WriteTabletResponse* response, bool failed, int error_code); - void ScheduleQueryTabletNode(); void QueryTabletNode(); void QueryTabletNodeAsync(std::string addr, int32_t timeout, diff --git a/src/master/tablet_manager.cc b/src/master/tablet_manager.cc index e4f51736c..74f2f8e28 100644 --- a/src/master/tablet_manager.cc +++ b/src/master/tablet_manager.cc @@ -365,6 +365,21 @@ void Tablet::ListRollback(std::vector* rollbacks) { } } +int32_t Tablet::UpdateRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point) { + MutexLock lock(&m_mutex); + bool has_rollback_name = false; + for (int32_t i = 0; i < m_meta.rollbacks_size(); ++i) { + Rollback cur_rollback = m_meta.rollbacks(i); + if (cur_rollback.name() == name) { + has_rollback_name = true; + assert(cur_rollback.snapshot_id() == snapshot_id); + cur_rollback.set_rollback_point(rollback_point); + } + } + assert(has_rollback_name); + return m_meta.rollbacks_size() - 1; +} + bool Tablet::IsBound() { TablePtr null_ptr; if (m_table != null_ptr) { diff --git a/src/master/tablet_manager.h b/src/master/tablet_manager.h index d1aee8091..bc752a28f 100644 --- a/src/master/tablet_manager.h +++ b/src/master/tablet_manager.h @@ -107,6 +107,7 @@ class Tablet { void ListSnapshot(std::vector* snapshot); void DelSnapshot(int32_t id); int32_t AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point); + int32_t UpdateRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point); void ListRollback(std::vector* rollbacks); // is belong to a table? @@ -177,6 +178,7 @@ class Table { void ListSnapshot(std::vector* snapshots); int32_t AddRollback(std::string rollback_name); void ListRollback(std::vector* rollback_names); + int32_t GetRollbackSize() {return m_rollback_names.size() - 1;} void AddDeleteTabletCount(); bool NeedDelete(); void ToMetaTableKeyValue(std::string* packed_key = NULL, From 50050466571ebb9074c97332fed686630ac5df62 Mon Sep 17 00:00:00 2001 From: lylei Date: Tue, 20 Oct 2015 16:50:50 +0800 Subject: [PATCH 4/5] issue=#382 add rollback cases --- test/testcase/common.py | 15 +++++++++++++++ test/testcase/conf.py | 1 + 2 files changed, 16 insertions(+) diff --git a/test/testcase/common.py b/test/testcase/common.py index 973c39246..ea19fe1e4 100644 --- a/test/testcase/common.py +++ b/test/testcase/common.py @@ -100,6 +100,21 @@ def create_multiversion_table(): print ''.join(ret.stdout.readlines()) +def createbyfile(schema, deli=''): + """ + This function creates a table according to a specified schema + :param schema: schema file path + :param deli: deli file path + :return: None + """ + + cleanup() + create_cmd = '{teracli} createbyfile {schema} {deli}'.format(teracli=const.teracli_binary, schema=schema, deli=deli) + print create_cmd + ret = subprocess.Popen(create_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + print ''.join(ret.stdout.readlines()) + + def run_tera_mark(file_path, op, table_name, random, value_size, num, key_size, cf='', key_seed=1, value_seed=1): """ This function provide means to write data into Tera and dump a copy into a specified file at the same time. diff --git a/test/testcase/conf.py b/test/testcase/conf.py index cc0f0a639..d5fbeee18 100644 --- a/test/testcase/conf.py +++ b/test/testcase/conf.py @@ -12,5 +12,6 @@ def __init__(self): self.teracli_binary = './teracli' self.kill_script = './kill_tera.sh' self.launch_script = './launch_tera.sh' + self.data_path = 'testcase/data/' const = Const() From 34adc4c04df037bb6c6069b401bc93b92688ff70 Mon Sep 17 00:00:00 2001 From: lylei Date: Wed, 21 Oct 2015 14:56:47 +0800 Subject: [PATCH 5/5] issue=#382 add rollback cases --- src/master/tablet_manager.cc | 24 +++--- test/testcase/test_rollback.py | 151 +++++++++++++++++++++++++++++++++ 2 files changed, 165 insertions(+), 10 deletions(-) create mode 100644 test/testcase/test_rollback.py diff --git a/src/master/tablet_manager.cc b/src/master/tablet_manager.cc index 4165ba721..545335c0a 100644 --- a/src/master/tablet_manager.cc +++ b/src/master/tablet_manager.cc @@ -362,22 +362,26 @@ void Tablet::ListRollback(std::vector* rollbacks) { MutexLock lock(&m_mutex); for (int i = 0; i < m_meta.rollbacks_size(); i++) { rollbacks->push_back(m_meta.rollbacks(i)); + VLOG(11) << "rollback " << m_meta.path() << ": " << m_meta.rollbacks(i).ShortDebugString(); } } int32_t Tablet::UpdateRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point) { - MutexLock lock(&m_mutex); - bool has_rollback_name = false; - for (int32_t i = 0; i < m_meta.rollbacks_size(); ++i) { - Rollback cur_rollback = m_meta.rollbacks(i); - if (cur_rollback.name() == name) { + MutexLock lock(&m_mutex); + bool has_rollback_name = false; + for (int32_t i = 0; i < m_meta.rollbacks_size(); ++i) { + Rollback* cur_rollback = m_meta.mutable_rollbacks(i); + if (cur_rollback->name() == name) { has_rollback_name = true; - assert(cur_rollback.snapshot_id() == snapshot_id); - cur_rollback.set_rollback_point(rollback_point); + assert(cur_rollback->snapshot_id() == snapshot_id); + cur_rollback->set_rollback_point(rollback_point); } - } - assert(has_rollback_name); - return m_meta.rollbacks_size() - 1; + } + for (int i = 0; i < m_meta.rollbacks_size(); i++) { + VLOG(11) << "rollback " << m_meta.path() << ": " << m_meta.rollbacks(i).ShortDebugString(); + } + assert(has_rollback_name); + return m_meta.rollbacks_size() - 1; } bool Tablet::IsBound() { diff --git a/test/testcase/test_rollback.py b/test/testcase/test_rollback.py new file mode 100644 index 000000000..d05f0f877 --- /dev/null +++ b/test/testcase/test_rollback.py @@ -0,0 +1,151 @@ +""" +Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved +Use of this source code is governed by a BSD-style license that can be +found in the LICENSE file. +""" + +import nose +import time + +import common +from conf import const + + +@nose.tools.with_setup(common.create_kv_table, common.cleanup) +def test_rollback_kv(): + """ + test kv rollback + 1. write data set 1 + 2. create snapshot + 3. write data set 2 + 4. scan & compare with set 2 + 5. rollback to snapshot + 6. scan & compare snapshot 1 + 7. compact then compare + :return: + """ + table_name = 'test' + dump_file1 = 'dump1.out' + dump_file2 = 'dump2.out' + scan_file = 'scan.out' + common.run_tera_mark([(dump_file1, False)], op='w', table_name=table_name, random='random', + key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20) + snapshot = common.snapshot_op(table_name) + common.run_tera_mark([(dump_file2, False)], op='w', table_name=table_name, random='random', + key_seed=1, value_seed=11, value_size=100, num=10000, key_size=20) + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file2, scan_file, need_sort=True)) + + common.rollback_op(table_name, snapshot, 'roll') + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + common.compact_tablets(common.get_tablet_list(table_name)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=False)) + + +@nose.tools.with_setup(common.create_singleversion_table, common.cleanup) +def test_rollback_table(): + """ + test table rollback + 1. write data set 1 + 2. create snapshot + 3. write data set 2 + 4. scan & compare with set 2 + 5. rollback to snapshot + 6. scan & compare snapshot 1 + 7. compact then compare + :return: + """ + table_name = 'test' + dump_file1 = 'dump1.out' + dump_file2 = 'dump2.out' + scan_file = 'scan.out' + common.run_tera_mark([(dump_file1, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', + key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20) + snapshot = common.snapshot_op(table_name) + common.run_tera_mark([(dump_file2, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', + key_seed=1, value_seed=11, value_size=100, num=10000, key_size=20) + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file2, scan_file, need_sort=True)) + + common.rollback_op(table_name, snapshot, 'roll') + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + common.compact_tablets(common.get_tablet_list(table_name)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + +@nose.tools.with_setup(common.create_kv_table) +def test_rollback_kv_relaunch(): + """ + test kv rollback w/relaunch + 1. test_rollback_kv() + 2. relaunch + 3. compare + :return: + """ + table_name = 'test' + dump_file1 = 'dump1.out' + scan_file = 'scan.out' + test_rollback_kv() + + common.cluster_op('kill') + common.cluster_op('launch') + time.sleep(2) + + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + common.compact_tablets(common.get_tablet_list(table_name)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=False)) + + +@nose.tools.with_setup(common.create_singleversion_table, common.cleanup) +def test_rollback_table_relaunch(): + """ + test table rollback w/relaunch + 1. test_rollback_table() + 2. relaunch + 3. compare + :return: + """ + table_name = 'test' + dump_file1 = 'dump1.out' + scan_file = 'scan.out' + test_rollback_table() + + common.cluster_op('kill') + common.cluster_op('launch') + time.sleep(2) + + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + common.compact_tablets(common.get_tablet_list(table_name)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=False)) + + +@nose.tools.with_setup(None, common.cleanup) +def test_rollback_kv_multitablets(): + """ + test kv rollback w/multi tablets + 1. test_rollback_kv_relaunch() + :return: + """ + + common.createbyfile(schema=const.data_path + 'kv.schema', deli=const.data_path + 'deli.10') + test_rollback_kv_relaunch() + + +@nose.tools.with_setup(None, common.cleanup) +def test_rollback_table_multitablets(): + """ + test table rollback w/multi tablets + 1. test_rollback_table_relaunch() + :return: + """ + + common.createbyfile(schema=const.data_path + 'table.schema', deli=const.data_path + 'deli.10') + test_rollback_table_relaunch()