diff --git a/src/master/master_impl.cc b/src/master/master_impl.cc index 7978668bc..4f5e39670 100644 --- a/src/master/master_impl.cc +++ b/src/master/master_impl.cc @@ -2384,14 +2384,13 @@ void MasterImpl::LoadTabletAsync(TabletPtr tablet, LoadClosure* done, uint64_t) request->add_snapshots_id(snapshot_id[i]); request->add_snapshots_sequence(snapshot_seq[i]); } - std::vector rollback_snapshots; - std::vector rollback_points; - table->ListRollback(&rollback_snapshots); - tablet->ListRollback(&rollback_points); - assert(rollback_snapshots.size() == rollback_points.size()); - for (uint32_t i = 0; i < rollback_snapshots.size(); ++i) { - request->add_rollback_snapshots(rollback_snapshots[i]); - request->add_rollback_points(rollback_points[i]); + std::vector rollback_names; + std::vector rollbacks; + table->ListRollback(&rollback_names); + tablet->ListRollback(&rollbacks); + assert(rollback_names.size() == rollbacks.size()); + for (uint32_t i = 0; i < rollbacks.size(); ++i) { + request->add_rollbacks()->CopyFrom(rollbacks[i]); } TabletMeta meta; @@ -2981,9 +2980,9 @@ void MasterImpl::ReleaseSnapshotCallback(ReleaseSnapshotRequest* request, /// 删掉删不掉无所谓, 不计较~ } -void MasterImpl::Rollback(const RollbackRequest* request, - RollbackResponse* response, - google::protobuf::Closure* done) { +void MasterImpl::GetRollback(const RollbackRequest* request, + RollbackResponse* response, + google::protobuf::Closure* done) { LOG(INFO) << "MasterImpl Rollback"; response->set_sequence_id(request->sequence_id()); @@ -3005,31 +3004,88 @@ void MasterImpl::Rollback(const RollbackRequest* request, return; } + std::vector tablets; + table->GetTablet(&tablets); + // write memory and meta with default rollback_point + int sid = table->AddRollback(request->rollback_name()); + for (uint32_t i = 0; i < tablets.size(); ++i) { + int tsid = tablets[i]->AddRollback(request->rollback_name(), request->snapshot_id(), + leveldb::kMaxSequenceNumber); + assert(sid == tsid); + } + WriteClosure* closure = + NewClosure(this, &MasterImpl::AddDefaultRollbackCallback, table, tablets, + FLAGS_tera_master_meta_retry_times, request, response, done); + BatchWriteMetaTableAsync(table, tablets, false, closure); +} + +void MasterImpl::AddDefaultRollbackCallback(TablePtr table, + std::vector tablets, + int32_t retry_times, + const RollbackRequest* rpc_request, + RollbackResponse* rpc_response, + google::protobuf::Closure* rpc_done, + WriteTabletRequest* request, + WriteTabletResponse* response, + bool failed, int error_code) { + StatusCode status = response->status(); + if (!failed && status == kTabletNodeOk) { + // all the row status should be the same + CHECK_GT(response->row_status_list_size(), 0); + status = response->row_status_list(0); + } + delete request; + delete response; + if (failed || status != kTabletNodeOk) { + if (failed) { + LOG(WARNING) << "fail to write rollback to meta: " + << sofa::pbrpc::RpcErrorCodeToString(error_code) << ", " + << tablets[0] << "..."; + } else { + LOG(WARNING) << "fail to write rollback to meta: " + << StatusCodeToString(status) << ", " << tablets[0] << "..."; + } + if (retry_times <= 0) { + rpc_response->set_status(kMetaTabletError); + rpc_done->Run(); + } else { + WriteClosure* done = + NewClosure(this, &MasterImpl::AddDefaultRollbackCallback, table, + tablets, retry_times - 1, rpc_request, rpc_response, + rpc_done); + SuspendMetaOperation(table, tablets, false, done); + } + return; + } + LOG(INFO) << "Add default rollback " << rpc_request->rollback_name() << " to " + << rpc_request->table_name() << " done"; + RollbackTask* task = new RollbackTask; table->GetTablet(&task->tablets); assert(task->tablets.size()); task->rollback_points.resize(task->tablets.size()); - task->request = request; - task->response = response; - task->done = done; + task->request = rpc_request; + task->response = rpc_response; + task->done = rpc_done; task->table = table; task->task_num = 0; task->finish_num = 0; + task->aborted = false; MutexLock lock(&task->mutex); for (uint32_t i = 0; i < task->tablets.size(); ++i) { TabletPtr tablet = task->tablets[i]; ++task->task_num; RollbackClosure* closure = NewClosure(this, &MasterImpl::RollbackCallback, static_cast(i), task); - RollbackAsync(tablet, request->snapshot_id(), 3000, closure); + RollbackAsync(tablet, rpc_request->snapshot_id(), 3000, closure); } if (task->task_num == 0) { - LOG(WARNING) << "fail to rollback to snapshot: " << request->table_name() + LOG(WARNING) << "fail to rollback to snapshot: " << rpc_request->table_name() << ", all tables kTabletNodeOffLine"; response->set_status(kTabletNodeOffLine); - done->Run(); + rpc_done->Run(); return; } } @@ -3077,18 +3133,21 @@ void MasterImpl::RollbackCallback(int32_t tablet_id, RollbackTask* task, } else { task->rollback_points[tablet_id] = master_response->rollback_point(); LOG(INFO) << "MasterImpl rollback all tablet done"; - int sid = task->table->AddRollback(master_request->snapshot_id()); + int sid = task->table->GetRollbackSize(); for (uint32_t i = 0; i < task->tablets.size(); ++i) { - int tsid = task->tablets[i]->AddRollback(task->rollback_points[i]); + int tsid = task->tablets[i]->UpdateRollback(task->request->rollback_name(), + master_request->snapshot_id(), + task->rollback_points[i]); assert(sid == tsid); } WriteClosure* closure = NewClosure(this, &MasterImpl::AddRollbackCallback, - task->table, task->tablets, - FLAGS_tera_master_meta_retry_times, - task->request, task->response, task->done); + task->table, task->tablets, + FLAGS_tera_master_meta_retry_times, + task->request, task->response, task->done); BatchWriteMetaTableAsync(task->table, task->tablets, false, closure); } + task->mutex.Unlock(); delete task; } @@ -3130,8 +3189,8 @@ void MasterImpl::AddRollbackCallback(TablePtr table, } return; } - LOG(INFO) << "Rollback " << rpc_request->table_name() - << ", write meta " << rpc_request->snapshot_id() << " done"; + LOG(INFO) << "Rollback " << rpc_request->rollback_name() << " to " << rpc_request->table_name() + << ", write meta with snpashot_id " << rpc_request->snapshot_id() << " done"; rpc_response->set_status(kMasterOk); rpc_done->Run(); } diff --git a/src/master/master_impl.h b/src/master/master_impl.h index dc763c802..971a7901b 100644 --- a/src/master/master_impl.h +++ b/src/master/master_impl.h @@ -79,7 +79,7 @@ class MasterImpl { DelSnapshotResponse* response, google::protobuf::Closure* done); - void Rollback(const RollbackRequest* request, + void GetRollback(const RollbackRequest* request, RollbackResponse* response, google::protobuf::Closure* done); @@ -297,8 +297,13 @@ class MasterImpl { WriteTabletRequest* request, WriteTabletResponse* response, bool failed, int error_code); - void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, int32_t timeout, - RollbackClosure* done); + void AddDefaultRollbackCallback(TablePtr table, std::vector tablets, + int32_t retry_times, const RollbackRequest* rpc_request, + RollbackResponse* rpc_response, google::protobuf::Closure* rpc_done, + WriteTabletRequest* request, WriteTabletResponse* response, + bool failed, int error_code); + void RollbackAsync(TabletPtr tablet, uint64_t snapshot_id, + int32_t timeout, RollbackClosure* done); void RollbackCallback(int32_t tablet_id, RollbackTask* task, SnapshotRollbackRequest* master_request, SnapshotRollbackResponse* master_response, @@ -312,7 +317,6 @@ class MasterImpl { WriteTabletRequest* request, WriteTabletResponse* response, bool failed, int error_code); - void ScheduleQueryTabletNode(); void QueryTabletNode(); void QueryTabletNodeAsync(std::string addr, int32_t timeout, diff --git a/src/master/remote_master.cc b/src/master/remote_master.cc index db7f1c142..648f3d42c 100644 --- a/src/master/remote_master.cc +++ b/src/master/remote_master.cc @@ -46,10 +46,10 @@ void RemoteMaster::DelSnapshot(google::protobuf::RpcController* controller, m_thread_pool->AddTask(callback); } -void RemoteMaster::Rollback(google::protobuf::RpcController* controller, - const RollbackRequest* request, - RollbackResponse* response, - google::protobuf::Closure* done) { +void RemoteMaster::GetRollback(google::protobuf::RpcController* controller, + const RollbackRequest* request, + RollbackResponse* response, + google::protobuf::Closure* done) { ThreadPool::Task callback = boost::bind(&RemoteMaster::DoRollback, this, controller, request, response, done); @@ -187,11 +187,11 @@ void RemoteMaster::DoDelSnapshot(google::protobuf::RpcController* controller, } void RemoteMaster::DoRollback(google::protobuf::RpcController* controller, - const RollbackRequest* request, - RollbackResponse* response, - google::protobuf::Closure* done) { + const RollbackRequest* request, + RollbackResponse* response, + google::protobuf::Closure* done) { LOG(INFO) << "accept RPC (Rollback)"; - m_master_impl->Rollback(request, response, done); + m_master_impl->GetRollback(request, response, done); LOG(INFO) << "finish RPC (Rollback)"; } diff --git a/src/master/remote_master.h b/src/master/remote_master.h index 70cd5d849..738a3258b 100644 --- a/src/master/remote_master.h +++ b/src/master/remote_master.h @@ -30,10 +30,10 @@ class RemoteMaster : public MasterServer { DelSnapshotResponse* response, google::protobuf::Closure* done); - void Rollback(google::protobuf::RpcController* controller, - const RollbackRequest* request, - RollbackResponse* response, - google::protobuf::Closure* done); + void GetRollback(google::protobuf::RpcController* controller, + const RollbackRequest* request, + RollbackResponse* response, + google::protobuf::Closure* done); void CreateTable(google::protobuf::RpcController* controller, const CreateTableRequest* request, diff --git a/src/master/tablet_manager.cc b/src/master/tablet_manager.cc index 01d7d24c2..545335c0a 100644 --- a/src/master/tablet_manager.cc +++ b/src/master/tablet_manager.cc @@ -348,19 +348,42 @@ void Tablet::DelSnapshot(int32_t id) { snapshot_list->RemoveLast(); } -int32_t Tablet::AddRollback(uint64_t rollback_point) { +int32_t Tablet::AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point) { MutexLock lock(&m_mutex); - m_meta.add_rollback_points(rollback_point); - return m_meta.rollback_points_size() - 1; + Rollback rollback; + rollback.set_name(name); + rollback.set_snapshot_id(snapshot_id); + rollback.set_rollback_point(rollback_point); + m_meta.add_rollbacks()->CopyFrom(rollback); + return m_meta.rollbacks_size() - 1; } -void Tablet::ListRollback(std::vector* rollback_points) { +void Tablet::ListRollback(std::vector* rollbacks) { MutexLock lock(&m_mutex); - for (int i = 0; i < m_meta.rollback_points_size(); i++) { - rollback_points->push_back(m_meta.rollback_points(i)); + for (int i = 0; i < m_meta.rollbacks_size(); i++) { + rollbacks->push_back(m_meta.rollbacks(i)); + VLOG(11) << "rollback " << m_meta.path() << ": " << m_meta.rollbacks(i).ShortDebugString(); } } +int32_t Tablet::UpdateRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point) { + MutexLock lock(&m_mutex); + bool has_rollback_name = false; + for (int32_t i = 0; i < m_meta.rollbacks_size(); ++i) { + Rollback* cur_rollback = m_meta.mutable_rollbacks(i); + if (cur_rollback->name() == name) { + has_rollback_name = true; + assert(cur_rollback->snapshot_id() == snapshot_id); + cur_rollback->set_rollback_point(rollback_point); + } + } + for (int i = 0; i < m_meta.rollbacks_size(); i++) { + VLOG(11) << "rollback " << m_meta.path() << ": " << m_meta.rollbacks(i).ShortDebugString(); + } + assert(has_rollback_name); + return m_meta.rollbacks_size() - 1; +} + bool Tablet::IsBound() { TablePtr null_ptr; if (m_table != null_ptr) { @@ -628,15 +651,15 @@ void Table::ListSnapshot(std::vector* snapshots) { *snapshots = m_snapshot_list; } -int32_t Table::AddRollback(uint64_t snapshot_id) { +int32_t Table::AddRollback(std::string rollback_name) { MutexLock lock(&m_mutex); - m_rollback_snapshots.push_back(snapshot_id); - return m_rollback_snapshots.size() - 1; + m_rollback_names.push_back(rollback_name); + return m_rollback_names.size() - 1; } -void Table::ListRollback(std::vector* snapshots) { +void Table::ListRollback(std::vector* rollback_names) { MutexLock lock(&m_mutex); - *snapshots = m_rollback_snapshots; + *rollback_names = m_rollback_names; } void Table::AddDeleteTabletCount() { @@ -668,8 +691,8 @@ void Table::ToMeta(TableMeta* meta) { for (size_t i = 0; i < m_snapshot_list.size(); i++) { meta->add_snapshot_list(m_snapshot_list[i]); } - for (size_t i = 0; i < m_rollback_snapshots.size(); ++i) { - meta->add_rollback_snapshot(m_rollback_snapshots[i]); + for (size_t i = 0; i < m_rollback_names.size(); ++i) { + meta->add_rollback_names(m_rollback_names[i]); } } @@ -764,9 +787,9 @@ bool TabletManager::AddTable(const std::string& table_name, (*table)->m_snapshot_list.push_back(meta.snapshot_list(i)); LOG(INFO) << table_name << " add snapshot " << meta.snapshot_list(i); } - for (int32_t i = 0; i < meta.rollback_snapshot_size(); ++i) { - (*table)->m_rollback_snapshots.push_back(meta.rollback_snapshot(i)); - LOG(INFO) << table_name << " add rollback " << meta.rollback_snapshot(i); + for (int32_t i = 0; i < meta.rollback_names_size(); ++i) { + (*table)->m_rollback_names.push_back(meta.rollback_names(i)); + LOG(INFO) << table_name << " add rollback " << meta.rollback_names(i); } (*table)->m_mutex.Unlock(); return true; diff --git a/src/master/tablet_manager.h b/src/master/tablet_manager.h index 61c075991..bc752a28f 100644 --- a/src/master/tablet_manager.h +++ b/src/master/tablet_manager.h @@ -106,8 +106,9 @@ class Tablet { int32_t AddSnapshot(uint64_t snapshot); void ListSnapshot(std::vector* snapshot); void DelSnapshot(int32_t id); - int32_t AddRollback(uint64_t rollback_point); - void ListRollback(std::vector* rollback_points); + int32_t AddRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point); + int32_t UpdateRollback(std::string name, uint64_t snapshot_id, uint64_t rollback_point); + void ListRollback(std::vector* rollbacks); // is belong to a table? bool IsBound(); @@ -175,8 +176,9 @@ class Table { int32_t AddSnapshot(uint64_t snapshot); int32_t DelSnapshot(uint64_t snapshot); void ListSnapshot(std::vector* snapshots); - int32_t AddRollback(uint64_t rollback_snapshot); - void ListRollback(std::vector* rollback_snapshots); + int32_t AddRollback(std::string rollback_name); + void ListRollback(std::vector* rollback_names); + int32_t GetRollbackSize() {return m_rollback_names.size() - 1;} void AddDeleteTabletCount(); bool NeedDelete(); void ToMetaTableKeyValue(std::string* packed_key = NULL, @@ -195,7 +197,7 @@ class Table { std::string m_name; TableSchema m_schema; std::vector m_snapshot_list; - std::vector m_rollback_snapshots; + std::vector m_rollback_names; TableStatus m_status; uint32_t m_deleted_tablet_num; uint64_t m_max_tablet_no; diff --git a/src/proto/master_client.cc b/src/proto/master_client.cc index 8f0201330..c2012630d 100644 --- a/src/proto/master_client.cc +++ b/src/proto/master_client.cc @@ -39,12 +39,12 @@ bool MasterClient::DelSnapshot(const DelSnapshotRequest* request, "DelSnapshot", m_rpc_timeout); } -bool MasterClient::Rollback(const RollbackRequest* request, - RollbackResponse* response) { - return SendMessageWithRetry(&MasterServer::Stub::Rollback, +bool MasterClient::GetRollback(const RollbackRequest* request, + RollbackResponse* response) { + return SendMessageWithRetry(&MasterServer::Stub::GetRollback, request, response, (Closure*)NULL, - "Rollback", m_rpc_timeout); + "GetRollback", m_rpc_timeout); } bool MasterClient::CreateTable(const CreateTableRequest* request, diff --git a/src/proto/master_client.h b/src/proto/master_client.h index 5e53cff3d..95e35054c 100644 --- a/src/proto/master_client.h +++ b/src/proto/master_client.h @@ -29,8 +29,8 @@ class MasterClient : public RpcClient { virtual bool DelSnapshot(const DelSnapshotRequest* request, DelSnapshotResponse* response); - virtual bool Rollback(const RollbackRequest* request, - RollbackResponse* response); + virtual bool GetRollback(const RollbackRequest* request, + RollbackResponse* response); virtual bool CreateTable(const CreateTableRequest* request, CreateTableResponse* response); diff --git a/src/proto/master_rpc.proto b/src/proto/master_rpc.proto index 4ad215e25..1674a33f8 100644 --- a/src/proto/master_rpc.proto +++ b/src/proto/master_rpc.proto @@ -199,6 +199,7 @@ message RollbackRequest { optional uint64 sequence_id = 1; optional string table_name = 2; optional uint64 snapshot_id = 3; + optional string rollback_name = 4; } message RollbackResponse { @@ -237,7 +238,7 @@ message RenameTableResponse { service MasterServer { rpc GetSnapshot(GetSnapshotRequest) returns(GetSnapshotResponse); rpc DelSnapshot(DelSnapshotRequest) returns(DelSnapshotResponse); - rpc Rollback(RollbackRequest) returns (RollbackResponse); + rpc GetRollback(RollbackRequest) returns (RollbackResponse); rpc CreateTable(CreateTableRequest) returns(CreateTableResponse); rpc DeleteTable(DeleteTableRequest) returns (DeleteTableResponse); rpc DisableTable(DisableTableRequest) returns (DisableTableResponse); diff --git a/src/proto/table_meta.proto b/src/proto/table_meta.proto index 0c077131c..8828820a1 100644 --- a/src/proto/table_meta.proto +++ b/src/proto/table_meta.proto @@ -32,6 +32,12 @@ message TabletLocation { required string server_addr = 2; } +message Rollback { + optional string name = 1; + optional uint64 snapshot_id = 2; + optional uint64 rollback_point = 3; +} + message ScanOption { optional KeyRange key_range = 1; optional int32 max_version = 2; @@ -65,7 +71,7 @@ message TableMeta { optional TableSchema schema = 3; repeated uint64 snapshot_list = 4; optional uint64 create_time = 5; - repeated uint64 rollback_snapshot = 6; + repeated string rollback_names = 6; } message TabletMeta { @@ -81,7 +87,7 @@ message TabletMeta { repeated uint64 snapshot_list = 11; repeated uint64 parent_tablets = 12; repeated int64 lg_size = 13; - repeated uint64 rollback_points = 14; + repeated Rollback rollbacks = 14; } message TableMetaList { diff --git a/src/proto/tabletnode_rpc.proto b/src/proto/tabletnode_rpc.proto index 44d22cdba..53b0df546 100644 --- a/src/proto/tabletnode_rpc.proto +++ b/src/proto/tabletnode_rpc.proto @@ -70,8 +70,7 @@ message LoadTabletRequest { repeated uint64 snapshots_id = 9; repeated uint64 snapshots_sequence = 10; repeated uint64 parent_tablets = 11; - repeated uint64 rollback_snapshots = 12; - repeated uint64 rollback_points = 13; + repeated Rollback rollbacks = 12; } message LoadTabletResponse { diff --git a/src/sdk/client_impl.cc b/src/sdk/client_impl.cc index 95f0a79a3..3d6f83d88 100644 --- a/src/sdk/client_impl.cc +++ b/src/sdk/client_impl.cc @@ -805,16 +805,24 @@ bool ClientImpl::DelSnapshot(const string& name, uint64_t snapshot, ErrorCode* e return false; } -bool ClientImpl::Rollback(const string& name, uint64_t snapshot, ErrorCode* err) { +bool ClientImpl::Rollback(const string& name, uint64_t snapshot, + const std::string& rollback_name, ErrorCode* err) { master::MasterClient master_client(_cluster->MasterAddr()); + std::string internal_table_name; + if (!GetInternalTableName(name, err, &internal_table_name)) { + LOG(ERROR) << "faild to scan meta schema"; + return false; + } RollbackRequest request; RollbackResponse response; request.set_sequence_id(0); - request.set_table_name(name); + request.set_table_name(internal_table_name); request.set_snapshot_id(snapshot); + request.set_rollback_name(rollback_name); + std::cout << name << " " << rollback_name << std::endl; - if (master_client.Rollback(&request, &response)) { + if (master_client.GetRollback(&request, &response)) { if (response.status() == kMasterOk) { std::cout << name << " rollback to snapshot sucessfully" << std::endl; return true; diff --git a/src/sdk/client_impl.h b/src/sdk/client_impl.h index 58498ffd0..3a2a0c49d 100644 --- a/src/sdk/client_impl.h +++ b/src/sdk/client_impl.h @@ -86,7 +86,8 @@ class ClientImpl : public Client { virtual bool DelSnapshot(const string& name, uint64_t snapshot, ErrorCode* err); - virtual bool Rollback(const string& name, uint64_t snapshot, ErrorCode* err); + virtual bool Rollback(const string& name, uint64_t snapshot, + const std::string& rollback_name, ErrorCode* err); virtual bool CmdCtrl(const string& command, const std::vector& arg_list, diff --git a/src/sdk/tera.h b/src/sdk/tera.h index e035745a7..ad2b1bdb0 100644 --- a/src/sdk/tera.h +++ b/src/sdk/tera.h @@ -672,8 +672,9 @@ class Client { virtual bool IsTableEmpty(const std::string& table_name, ErrorCode* err) = 0; virtual bool GetSnapshot(const std::string& name, uint64_t* snapshot, ErrorCode* err) = 0; - virtual bool DelSnapshot(const std::string& name, uint64_t snapshot, ErrorCode* err) = 0; - virtual bool Rollback(const std::string& name, uint64_t snapshot, ErrorCode* err) = 0; + virtual bool DelSnapshot(const std::string& name, uint64_t snapshot,ErrorCode* err) = 0; + virtual bool Rollback(const std::string& name, uint64_t snapshot, + const std::string& rollback_name, ErrorCode* err) = 0; virtual bool CmdCtrl(const std::string& command, const std::vector& arg_list, diff --git a/src/tabletnode/tabletnode_impl.cc b/src/tabletnode/tabletnode_impl.cc index 27ad2f2f8..56180e3d7 100644 --- a/src/tabletnode/tabletnode_impl.cc +++ b/src/tabletnode/tabletnode_impl.cc @@ -230,11 +230,12 @@ void TabletNodeImpl::LoadTablet(const LoadTabletRequest* request, } // to recover rollbacks - assert(request->rollback_snapshots_size() == request->rollback_points_size()); std::map rollbacks; - int32_t num_of_rollbacks = request->rollback_snapshots_size(); + int32_t num_of_rollbacks = request->rollbacks_size(); for (int32_t i = 0; i < num_of_rollbacks; ++i) { - rollbacks[request->rollback_snapshots(i)] = request->rollback_points(i); + rollbacks[request->rollbacks(i).snapshot_id()] = request->rollbacks(i).rollback_point(); + VLOG(10) << "load tablet with rollback " << request->rollbacks(i).snapshot_id() + << "-" << request->rollbacks(i).rollback_point(); } LOG(INFO) << "start load tablet, id: " << request->sequence_id() diff --git a/src/teracli_main.cc b/src/teracli_main.cc index bcaff1e44..818794282 100644 --- a/src/teracli_main.cc +++ b/src/teracli_main.cc @@ -46,6 +46,7 @@ DEFINE_bool(tera_client_scan_async_enabled, false, "enable the streaming scan mo DEFINE_int64(scan_pack_interval, 5000, "scan timeout"); DEFINE_int64(snapshot, 0, "read | scan snapshot"); DEFINE_string(rollback_switch, "close", "Pandora's box, do not open"); +DEFINE_string(rollback_name, "", "rollback operation's name"); volatile int32_t g_start_time = 0; volatile int32_t g_end_time = 0; @@ -1732,7 +1733,14 @@ int32_t SnapshotOp(Client* client, int32_t argc, char** argv, ErrorCode* err) { } std::cout << "new snapshot: " << snapshot << std::endl; } else if (FLAGS_rollback_switch == "open" && strcmp(argv[3], "rollback") == 0) { - if (!client->Rollback(tablename, FLAGS_snapshot, err)) { + if (FLAGS_snapshot == 0) { + std::cerr << "missing or invalid --snapshot option" << std::endl; + return -1; + } else if (FLAGS_rollback_name == "") { + std::cerr << "missing or invalid --rollback_name option" << std::endl; + return -1; + } + if (!client->Rollback(tablename, FLAGS_snapshot, FLAGS_rollback_name, err)) { LOG(ERROR) << "fail to rollback to snapshot: " << err->GetReason(); return -1; } diff --git a/test/testcase/common.py b/test/testcase/common.py index 22e03b5d5..f15a33f9f 100644 --- a/test/testcase/common.py +++ b/test/testcase/common.py @@ -83,7 +83,7 @@ def cluster_op(op): def create_kv_table(): - print 'print kv table' + print 'create kv table' cleanup() ret = subprocess.Popen(const.teracli_binary + ' create test', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) print ''.join(ret.stdout.readlines()) @@ -91,7 +91,7 @@ def create_kv_table(): def create_singleversion_table(): - print 'print single version table' + print 'create single version table' cleanup() ret = subprocess.Popen(const.teracli_binary + ' create "test{cf0, cf1}"', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) @@ -100,7 +100,7 @@ def create_singleversion_table(): def create_multiversion_table(): - print 'print multi version table' + print 'create multi version table' cleanup() ret = subprocess.Popen(const.teracli_binary + ' create "test{cf0, cf1}"', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) @@ -124,6 +124,21 @@ def createbyfile(schema, deli=''): print ''.join(ret.stderr.readlines()) +def createbyfile(schema, deli=''): + """ + This function creates a table according to a specified schema + :param schema: schema file path + :param deli: deli file path + :return: None + """ + + cleanup() + create_cmd = '{teracli} createbyfile {schema} {deli}'.format(teracli=const.teracli_binary, schema=schema, deli=deli) + print create_cmd + ret = subprocess.Popen(create_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + print ''.join(ret.stdout.readlines()) + + def run_tera_mark(file_path, op, table_name, random, value_size, num, key_size, cf='', key_seed=1, value_seed=1): """ This function provide means to write data into Tera and dump a copy into a specified file at the same time. @@ -296,6 +311,20 @@ def snapshot_op(table_name): return None +def rollback_op(table_name, snapshot, rollback_name): + """ + Invoke rollback action + :param table_name: table name + :param snapshot: rollback to a specific snapshot + :return: None + """ + rollback_cmd = '{teracli} snapshot {table_name} rollback --snapshot={snapshot} --rollback_name={rname}'.\ + format(teracli=const.teracli_binary, table_name=table_name, snapshot=snapshot, rname=rollback_name) + print rollback_cmd + ret = subprocess.Popen(rollback_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + print ''.join(ret.stdout.readlines()) + + def compare_files(file1, file2, need_sort): """ This function compares two files. @@ -312,7 +341,7 @@ def compare_files(file1, file2, need_sort): print ''.join(ret.stderr.readlines()) os.rename(file1+'.sort', file1) os.rename(file2+'.sort', file2) - return filecmp.cmp(file1, file2) + return filecmp.cmp(file1, file2, shallow=False) def file_is_empty(file_path): diff --git a/test/testcase/test_data.py b/test/testcase/test_data.py index 0a9058a79..cab645744 100644 --- a/test/testcase/test_data.py +++ b/test/testcase/test_data.py @@ -56,9 +56,9 @@ def test_table_random_write_versions(): dump_file1 = 'dump1.out' dump_file2 = 'dump2.out' scan_file = 'scan.out' - common.run_tera_mark([(dump_file1, False), (dump_file2, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', + common.run_tera_mark([(dump_file1, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20) - common.run_tera_mark([(dump_file1, True)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', + common.run_tera_mark([(dump_file1, True), (dump_file2, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', key_seed=1, value_seed=11, value_size=100, num=10000, key_size=20) common.scan_table(table_name=table_name, file_path=scan_file, allversion=True) nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) diff --git a/test/testcase/test_rollback.py b/test/testcase/test_rollback.py new file mode 100644 index 000000000..d05f0f877 --- /dev/null +++ b/test/testcase/test_rollback.py @@ -0,0 +1,151 @@ +""" +Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved +Use of this source code is governed by a BSD-style license that can be +found in the LICENSE file. +""" + +import nose +import time + +import common +from conf import const + + +@nose.tools.with_setup(common.create_kv_table, common.cleanup) +def test_rollback_kv(): + """ + test kv rollback + 1. write data set 1 + 2. create snapshot + 3. write data set 2 + 4. scan & compare with set 2 + 5. rollback to snapshot + 6. scan & compare snapshot 1 + 7. compact then compare + :return: + """ + table_name = 'test' + dump_file1 = 'dump1.out' + dump_file2 = 'dump2.out' + scan_file = 'scan.out' + common.run_tera_mark([(dump_file1, False)], op='w', table_name=table_name, random='random', + key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20) + snapshot = common.snapshot_op(table_name) + common.run_tera_mark([(dump_file2, False)], op='w', table_name=table_name, random='random', + key_seed=1, value_seed=11, value_size=100, num=10000, key_size=20) + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file2, scan_file, need_sort=True)) + + common.rollback_op(table_name, snapshot, 'roll') + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + common.compact_tablets(common.get_tablet_list(table_name)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=False)) + + +@nose.tools.with_setup(common.create_singleversion_table, common.cleanup) +def test_rollback_table(): + """ + test table rollback + 1. write data set 1 + 2. create snapshot + 3. write data set 2 + 4. scan & compare with set 2 + 5. rollback to snapshot + 6. scan & compare snapshot 1 + 7. compact then compare + :return: + """ + table_name = 'test' + dump_file1 = 'dump1.out' + dump_file2 = 'dump2.out' + scan_file = 'scan.out' + common.run_tera_mark([(dump_file1, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', + key_seed=1, value_seed=10, value_size=100, num=10000, key_size=20) + snapshot = common.snapshot_op(table_name) + common.run_tera_mark([(dump_file2, False)], op='w', table_name=table_name, cf='cf0:q,cf1:q', random='random', + key_seed=1, value_seed=11, value_size=100, num=10000, key_size=20) + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file2, scan_file, need_sort=True)) + + common.rollback_op(table_name, snapshot, 'roll') + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + common.compact_tablets(common.get_tablet_list(table_name)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + +@nose.tools.with_setup(common.create_kv_table) +def test_rollback_kv_relaunch(): + """ + test kv rollback w/relaunch + 1. test_rollback_kv() + 2. relaunch + 3. compare + :return: + """ + table_name = 'test' + dump_file1 = 'dump1.out' + scan_file = 'scan.out' + test_rollback_kv() + + common.cluster_op('kill') + common.cluster_op('launch') + time.sleep(2) + + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + common.compact_tablets(common.get_tablet_list(table_name)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=False)) + + +@nose.tools.with_setup(common.create_singleversion_table, common.cleanup) +def test_rollback_table_relaunch(): + """ + test table rollback w/relaunch + 1. test_rollback_table() + 2. relaunch + 3. compare + :return: + """ + table_name = 'test' + dump_file1 = 'dump1.out' + scan_file = 'scan.out' + test_rollback_table() + + common.cluster_op('kill') + common.cluster_op('launch') + time.sleep(2) + + common.scan_table(table_name=table_name, file_path=scan_file, allversion=False, snapshot=0) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=True)) + + common.compact_tablets(common.get_tablet_list(table_name)) + nose.tools.assert_true(common.compare_files(dump_file1, scan_file, need_sort=False)) + + +@nose.tools.with_setup(None, common.cleanup) +def test_rollback_kv_multitablets(): + """ + test kv rollback w/multi tablets + 1. test_rollback_kv_relaunch() + :return: + """ + + common.createbyfile(schema=const.data_path + 'kv.schema', deli=const.data_path + 'deli.10') + test_rollback_kv_relaunch() + + +@nose.tools.with_setup(None, common.cleanup) +def test_rollback_table_multitablets(): + """ + test table rollback w/multi tablets + 1. test_rollback_table_relaunch() + :return: + """ + + common.createbyfile(schema=const.data_path + 'table.schema', deli=const.data_path + 'deli.10') + test_rollback_table_relaunch() diff --git a/test/testcase/test_snapshot.py b/test/testcase/test_snapshot.py index 21fa55e92..24c3fa9e6 100644 --- a/test/testcase/test_snapshot.py +++ b/test/testcase/test_snapshot.py @@ -158,6 +158,7 @@ def test_table_snapshot_relaunch(): common.scan_table(table_name=table_name, file_path=scan_file2, allversion=True, snapshot=0) nose.tools.assert_true(common.compare_files(dump_file1, scan_file1, need_sort=True)) nose.tools.assert_true(common.compare_files(dump_file2, scan_file2, need_sort=True)) + common.cluster_op('kill') common.cluster_op('launch') time.sleep(2)