diff --git a/src/full_check/client/client.go b/src/full_check/client/client.go
index bd7e74e..c439a2a 100644
--- a/src/full_check/client/client.go
+++ b/src/full_check/client/client.go
@@ -325,10 +325,11 @@ func (p *RedisClient) PipeLenCommand(keyInfo []*common.Key) ([]int64, error) {
             if v, ok := ele.(int64); ok {
                 result[i] = v
             } else {
-                err := fmt.Errorf("run PipeRawCommand with commands[%s] return element[%v] isn't type int64[%v]",
-                    printCombinList(commands), ele, reflect.TypeOf(ele))
-                common.Logger.Error(err)
-                return nil, err
+                //err := fmt.Errorf("run PipeRawCommand with commands[%s] return element[%v] isn't type int64[%v]",
+                //    printCombinList(commands), ele, reflect.TypeOf(ele))
+                //common.Logger.Error(err)
+                //return nil, err
+                result[i] = common.TypeChanged
             }
         }
     }
diff --git a/src/full_check/configure/conf.go b/src/full_check/configure/conf.go
index 7bf58b4..7b4cc2b 100644
--- a/src/full_check/configure/conf.go
+++ b/src/full_check/configure/conf.go
@@ -13,6 +13,7 @@ var Opts struct {
     TargetDBFilterList string `long:"targetdbfilterlist" default:"-1" description:"db white list that need to be compared, -1 means fetch all, \"0;5;15\" means fetch db 0, 5, and 15"`
     ResultDBFile string `short:"d" long:"db" value-name:"Sqlite3-DB-FILE" default:"result.db" description:"sqlite3 db file for store result. If exist, it will be removed and a new file is created."`
     ResultFile string `long:"result" value-name:"FILE" description:"store all diff result into the file, format is 'db\tdiff-type\tkey\tfield'"`
+    ResultBytesLimit int `long:"resultbyteslimit" value-name:"RESULT-BYTES-LIMIT" default:"10485760" description:"total bytes allowed to store the temp data in target redis"`
     CompareTimes string `long:"comparetimes" value-name:"COUNT" default:"3" description:"Total compare count, at least 1. In the first round, all keys will be compared. The subsequent rounds of the comparison will be done on the previous results."`
     CompareMode int `short:"m" long:"comparemode" default:"2" description:"compare mode, 1: compare full value, 2: only compare value length, 3: only compare keys outline, 4: compare full value, but only compare value length when meets big key"`
     Id string `long:"id" default:"unknown" description:"used in metric, run id, useless for open source"`
diff --git a/src/full_check/full_check/conflict.go b/src/full_check/full_check/conflict.go
new file mode 100644
index 0000000..412569f
--- /dev/null
+++ b/src/full_check/full_check/conflict.go
@@ -0,0 +1,217 @@
+package full_check
+
+import (
+    "encoding/json"
+    "errors"
+    "fmt"
+    "full_check/client"
+    "full_check/common"
+    conf "full_check/configure"
+    redigoredis "github.com/garyburd/redigo/redis"
+    "log"
+    "math/rand"
+    "os"
+    "sync/atomic"
+    "time"
+)
+
+var (
+    random int64 = -1
+)
+
+// keyInfo is the per-key conflict record that is marshalled to JSON and stored in the target redis.
+type keyInfo struct {
+    Key          string `json:"k"`
+    Type         string `json:"t"`
+    ConflictType string `json:"ct"`
+    Db           int32  `json:"db"`
+    SourceLen    int64  `json:"sl"`
+    TargetLen    int64  `json:"tl"`
+}
+
+// fieldInfo is the per-field conflict record for non-string keys.
+type fieldInfo struct {
+    Key          string `json:"k"`
+    Field        string `json:"f"`
+    ConflictType string `json:"ct"`
+}
+
+// WriteConflictKey stores every conflict reported on the channel into the target redis: a list of
+// JSON key records plus one JSON string per key holding its fields, all bounded by --resultbyteslimit.
+// In the last compare round it also appends to the plain-text result file.
+func (p *FullCheck) WriteConflictKey(conflictKey <-chan *common.Key) {
+    if random == -1 {
+        rand.Seed(time.Now().UnixNano())
+        random = rand.Int63()
+    }
+    rc, err := client.NewRedisClient(p.TargetHost, 0)
+    if err != nil {
+        log.Fatal("unable to store conflict keys, because the target cluster is unreachable")
+        return
+    }
+
+    var resultfile *os.File
+    if len(conf.Opts.ResultFile) > 0 {
+        resultfile, _ = os.OpenFile(conf.Opts.ResultFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
+        defer resultfile.Close()
+    }
+
+    conflictKeyTableName, conflictFieldTableName := p.GetCurrentResultTable()
+
+    keyList := fmt.Sprintf("fullcheck:%d:%s:key", random, conflictKeyTableName)
+    c := 0
+    for oneKeyInfo := range conflictKey {
+        info := &keyInfo{
+            Key:          string(oneKeyInfo.Key),
+            Type:         oneKeyInfo.Tp.Name,
+            ConflictType: oneKeyInfo.ConflictType.String(),
+            Db:           p.currentDB,
+            SourceLen:    oneKeyInfo.SourceAttr.ItemCount,
+            TargetLen:    oneKeyInfo.TargetAttr.ItemCount,
+        }
+        infoJson, _ := json.Marshal(info)
+        total := atomic.AddUint64(&(p.conflictBytesUsed), uint64(len(infoJson)))
+        if total > uint64(conf.Opts.ResultBytesLimit) {
+            panic(common.Logger.Errorf("too many conflicts!"))
+        }
+        _, err := rc.Do("RPUSH", keyList, string(infoJson))
+        if err != nil {
+            panic(common.Logger.Errorf("failed to exec rpush command: %v", err))
+        }
+        if c == 0 {
+            rc.Do("EXPIRE", keyList, 3600*4)
+            c++
+        }
+        if len(oneKeyInfo.Field) != 0 {
+            keyFields := []*fieldInfo{}
+            for i := 0; i < len(oneKeyInfo.Field); i++ {
+                keyFields = append(keyFields, &fieldInfo{
+                    Key:          info.Key,
+                    Field:        string(oneKeyInfo.Field[i].Field),
+                    ConflictType: oneKeyInfo.Field[i].ConflictType.String(),
+                })
+                if p.times == p.CompareCount {
+                    if len(conf.Opts.ResultFile) != 0 {
+                        resultfile.WriteString(fmt.Sprintf("%d\t%s\t%s\t%s\n", int(p.currentDB), oneKeyInfo.Field[i].ConflictType.String(), string(oneKeyInfo.Key), string(oneKeyInfo.Field[i].Field)))
+                    }
+                }
+            }
+            fieldsList := fmt.Sprintf("fullcheck:%d:%s:key:%s:fields", random, conflictFieldTableName, info.Key)
+            fieldsInfo, _ := json.Marshal(keyFields)
+            total = atomic.AddUint64(&(p.conflictBytesUsed), uint64(len(fieldsInfo)))
+            if total > uint64(conf.Opts.ResultBytesLimit) {
+                panic(common.Logger.Errorf("too many conflicts!"))
+            }
+            _, err = rc.Do("SET", fieldsList, string(fieldsInfo), "EX", 3600*4)
+        } else {
+            if p.times == p.CompareCount {
+                if len(conf.Opts.ResultFile) != 0 {
+                    resultfile.WriteString(fmt.Sprintf("%d\t%s\t%s\t%s\n", int(p.currentDB), oneKeyInfo.ConflictType.String(), string(oneKeyInfo.Key), ""))
+                }
+            }
+        }
+    }
+}
+
+// byteSlices converts a redigo reply into [][]byte, mirroring redigo's ByteSlices helper.
+func byteSlices(reply interface{}, err error) ([][]byte, error) {
+    if err != nil {
+        return nil, err
+    }
+    switch reply := reply.(type) {
+    case []interface{}:
+        result := make([][]byte, len(reply))
+        for i := range reply {
+            if reply[i] == nil {
+                continue
+            }
+            p, ok := reply[i].([]byte)
+            if !ok {
+                return nil, fmt.Errorf("redigo: unexpected element type for ByteSlices, got type %T", reply[i])
+            }
+            result[i] = p
+        }
+        return result, nil
+    case []byte:
+        return [][]byte{reply}, nil
+    case nil:
+        return nil, errors.New("ErrNil")
+    case error:
+        return nil, reply
+    }
+    return nil, fmt.Errorf("redigo: unexpected type for ByteSlices, got type %T", reply)
+}
+
+// ScanFromDB reads back the conflicts stored by the previous round (see WriteConflictKey)
+// from the target redis and feeds them to the verifier in batches of BatchCount.
+func (p *FullCheck) ScanFromDB(allkeys chan<- []*common.Key) {
+    conflictKeyTableName, conflictFieldTableName := p.GetLastResultTable()
+    keyList := fmt.Sprintf("fullcheck:%d:%s:key", random, conflictKeyTableName)
+    rc, err := client.NewRedisClient(p.TargetHost, 0)
+    if err != nil {
+        log.Fatal("unable to scan conflict keys, because the target redis is unreachable")
+        return
+    }
+
+    keyInfoBatch := []*common.Key{}
+    for {
+        result, err := byteSlices(rc.Do("BLPOP", keyList, 1))
+        if err == redigoredis.ErrNil || len(result) == 0 || len(result[0]) == 0 {
+            if len(keyInfoBatch) > 0 {
+                p.IncrScanStat(len(keyInfoBatch))
+                allkeys <- keyInfoBatch
+            }
+            close(allkeys)
+            rc.Do("DEL", keyList)
+            break
+        }
+        if err != nil {
+            panic(common.Logger.Errorf("failed to exec blpop command: %v", err))
+        }
+
+        keyInfo := &keyInfo{}
+        err = json.Unmarshal(result[1], keyInfo)
+        if err != nil {
+            panic(common.Logger.Errorf("failed to unmarshal data: %v", err))
+        }
+        atomic.AddUint64(&(p.conflictBytesUsed), -uint64(len(result[1])))
+        oneKeyInfo := &common.Key{
+            Key:          []byte(keyInfo.Key),
+            Tp:           common.NewKeyType(keyInfo.Type),
+            ConflictType: common.NewConflictType(keyInfo.ConflictType),
+            SourceAttr:   common.Attribute{ItemCount: keyInfo.SourceLen},
+            TargetAttr:   common.Attribute{ItemCount: keyInfo.TargetLen},
+        }
+        if oneKeyInfo.Tp == common.EndKeyType {
+            panic(common.Logger.Errorf("invalid type from redis %s: key=%s type=%s", conflictKeyTableName, keyInfo.Key, keyInfo.Type))
+        }
+        if oneKeyInfo.ConflictType == common.EndConflict {
+            panic(common.Logger.Errorf("invalid conflict_type from redis %s: key=%s conflict_type=%s", conflictKeyTableName, keyInfo.Key, keyInfo.ConflictType))
+        }
+
+        if oneKeyInfo.Tp != common.StringKeyType {
+            fieldsListKey := fmt.Sprintf("fullcheck:%d:%s:key:%s:fields", random, conflictFieldTableName, oneKeyInfo.Key)
+            fieldsBytes, err := redigoredis.Bytes(rc.Do("GET", fieldsListKey))
+            if err != nil && err != redigoredis.ErrNil {
+                panic(common.Logger.Errorf("failed to exec get command: %v", err))
+            }
+            if err == nil {
+                keyFields := []*fieldInfo{}
+                err = json.Unmarshal(fieldsBytes, &keyFields)
+                if err != nil {
+                    panic(common.Logger.Errorf("failed to unmarshal data: %v", err))
+                }
+                for _, field := range keyFields {
+                    oneField := common.Field{
+                        Field:        []byte(field.Field),
+                        ConflictType: common.NewConflictType(field.ConflictType),
+                    }
+                    if oneField.ConflictType == common.EndConflict {
+                        panic(common.Logger.Errorf("invalid conflict_type from redis %s: field=%s type=%s", conflictFieldTableName, field.Field, field.ConflictType))
+                    }
+                    oneKeyInfo.Field = append(oneKeyInfo.Field, oneField)
+                }
rc.Do("DEL", fieldsListKey) + atomic.AddUint64(&(p.conflictBytesUsed), -uint64(len(fieldsBytes))) + } + } + keyInfoBatch = append(keyInfoBatch, oneKeyInfo) + if len(keyInfoBatch) == p.BatchCount { + p.IncrScanStat(len(keyInfoBatch)) + allkeys <- keyInfoBatch + keyInfoBatch = []*common.Key{} + } + } +} diff --git a/src/full_check/full_check/full_check.go b/src/full_check/full_check/full_check.go index e2432a4..4e7c0a4 100644 --- a/src/full_check/full_check/full_check.go +++ b/src/full_check/full_check/full_check.go @@ -6,19 +6,15 @@ import ( "database/sql" "encoding/json" "fmt" - "os" _ "path" - "strconv" "sync" "time" - "full_check/common" - "full_check/metric" "full_check/checker" - "full_check/configure" "full_check/client" - - _ "github.com/mattn/go-sqlite3" + "full_check/common" + "full_check/configure" + "full_check/metric" ) type CheckType int @@ -45,6 +41,8 @@ type FullCheck struct { totalFieldConflict int64 verifier checker.IVerifier + + conflictBytesUsed uint64 } func NewFullCheck(f checker.FullCheckParameter, checktype CheckType) *FullCheck { @@ -211,16 +209,6 @@ func (p *FullCheck) IncrScanStat(a int) { func (p *FullCheck) Start() { var err error - for i := 1; i <= p.CompareCount; i++ { - // init sqlite db - os.Remove(p.ResultDBFile + "." + strconv.Itoa(i)) - p.db[i], err = sql.Open("sqlite3", p.ResultDBFile+"."+strconv.Itoa(i)) - if err != nil { - panic(common.Logger.Critical(err)) - } - defer p.db[i].Close() - } - sourceClient, err := client.NewRedisClient(p.SourceHost, 0) if err != nil { panic(common.Logger.Errorf("create redis client with host[%v] db[%v] error[%v]", @@ -245,7 +233,6 @@ func (p *FullCheck) Start() { } for p.times = 1; p.times <= p.CompareCount; p.times++ { - p.CreateDbTable(p.times) if p.times != 1 { common.Logger.Infof("wait %d seconds before start", p.Interval) time.Sleep(time.Second * time.Duration(p.Interval)) @@ -339,53 +326,6 @@ func (p *FullCheck) GetLastResultTable() (key string, field string) { return fmt.Sprintf("key_%d", p.times-1), fmt.Sprintf("field_%d", p.times-1) } -func (p *FullCheck) CreateDbTable(times int) { - /** create table **/ - conflictKeyTableName, conflictFieldTableName := p.GetCurrentResultTable() - - conflictKeyTableSql := fmt.Sprintf(` -CREATE TABLE %s( - id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, - key TEXT NOT NULL, - type TEXT NOT NULL, - conflict_type TEXT NOT NULL, - db INTEGER NOT NULL, - source_len INTEGER NOT NULL, - target_len INTEGER NOT NULL -); -`, conflictKeyTableName) - _, err := p.db[times].Exec(conflictKeyTableSql) - if err != nil { - panic(common.Logger.Errorf("exec sql %s failed: %s", conflictKeyTableSql, err)) - } - conflictFieldTableSql := fmt.Sprintf(` -CREATE TABLE %s( - id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, - field TEXT NOT NULL, - conflict_type TEXT NOT NULL, - key_id INTEGER NOT NULL -); -`, conflictFieldTableName) - _, err = p.db[times].Exec(conflictFieldTableSql) - if err != nil { - panic(common.Logger.Errorf("exec sql %s failed: %s", conflictFieldTableSql, err)) - } - - conflictResultSql := fmt.Sprintf(` -CREATE TABLE IF NOT EXISTS %s( - InstanceA TEXT NOT NULL, - InstanceB TEXT NOT NULL, - Key TEXT NOT NULL, - Schema TEXT NOT NULL, - InconsistentType TEXT NOT NULL, - Extra TEXT NOT NULL - );`, "FINAL_RESULT") - _, err = p.db[times].Exec(conflictResultSql) - if err != nil { - panic(common.Logger.Errorf("exec sql %s failed: %s", conflictResultSql, err)) - } -} - func (p *FullCheck) VerifyAllKeyInfo(allKeys <-chan []*common.Key, conflictKey chan<- *common.Key) { sourceClient, err := 
client.NewRedisClient(p.SourceHost, p.currentDB) if err != nil { @@ -409,103 +349,4 @@ func (p *FullCheck) VerifyAllKeyInfo(allKeys <-chan []*common.Key, conflictKey c } // for oneGroupKeys := range allKeys qos.Close() -} - -func (p *FullCheck) WriteConflictKey(conflictKey <-chan *common.Key) { - conflictKeyTableName, conflictFieldTableName := p.GetCurrentResultTable() - - var resultfile *os.File - if len(conf.Opts.ResultFile) > 0 { - resultfile, _ = os.OpenFile(conf.Opts.ResultFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) - defer resultfile.Close() - } - - tx, _ := p.db[p.times].Begin() - statInsertKey, err := tx.Prepare(fmt.Sprintf("insert into %s (key, type, conflict_type, db, source_len, target_len) values(?,?,?,?,?,?)", conflictKeyTableName)) - if err != nil { - panic(common.Logger.Error(err)) - } - statInsertField, err := tx.Prepare(fmt.Sprintf("insert into %s (field, conflict_type, key_id) values (?,?,?)", conflictFieldTableName)) - if err != nil { - panic(common.Logger.Error(err)) - } - - count := 0 - for oneKeyInfo := range conflictKey { - if count%1000 == 0 { - var err error - statInsertKey.Close() - statInsertField.Close() - e := tx.Commit() - if e != nil { - common.Logger.Error(e.Error()) - } - - tx, _ = p.db[p.times].Begin() - statInsertKey, err = tx.Prepare(fmt.Sprintf("insert into %s (key, type, conflict_type, db, source_len, target_len) values(?,?,?,?,?,?)", conflictKeyTableName)) - if err != nil { - panic(common.Logger.Error(err)) - } - - statInsertField, err = tx.Prepare(fmt.Sprintf("insert into %s (field, conflict_type, key_id) values (?,?,?)", conflictFieldTableName)) - if err != nil { - panic(common.Logger.Error(err)) - } - } - count += 1 - - result, err := statInsertKey.Exec(string(oneKeyInfo.Key), oneKeyInfo.Tp.Name, oneKeyInfo.ConflictType.String(), p.currentDB, oneKeyInfo.SourceAttr.ItemCount, oneKeyInfo.TargetAttr.ItemCount) - if err != nil { - panic(common.Logger.Error(err)) - } - if len(oneKeyInfo.Field) != 0 { - lastId, _ := result.LastInsertId() - for i := 0; i < len(oneKeyInfo.Field); i++ { - _, err = statInsertField.Exec(string(oneKeyInfo.Field[i].Field), oneKeyInfo.Field[i].ConflictType.String(), lastId) - if err != nil { - panic(common.Logger.Error(err)) - } - - if p.times == p.CompareCount { - finalstat, err := tx.Prepare(fmt.Sprintf("insert into FINAL_RESULT (InstanceA, InstanceB, Key, Schema, InconsistentType, Extra) VALUES(?, ?, ?, ?, ?, ?)")) - if err != nil { - panic(common.Logger.Error(err)) - } - // defer finalstat.Close() - _, err = finalstat.Exec("", "", string(oneKeyInfo.Key), strconv.Itoa(int(p.currentDB)), - oneKeyInfo.Field[i].ConflictType.String(), - string(oneKeyInfo.Field[i].Field)) - if err != nil { - panic(common.Logger.Error(err)) - } - - finalstat.Close() - - if len(conf.Opts.ResultFile) != 0 { - resultfile.WriteString(fmt.Sprintf("%d\t%s\t%s\t%s\n", int(p.currentDB), oneKeyInfo.Field[i].ConflictType.String(), string(oneKeyInfo.Key), string(oneKeyInfo.Field[i].Field))) - } - } - } - } else { - if p.times == p.CompareCount { - finalstat, err := tx.Prepare(fmt.Sprintf("insert into FINAL_RESULT (InstanceA, InstanceB, Key, Schema, InconsistentType, Extra) VALUES(?, ?, ?, ?, ?, ?)")) - if err != nil { - panic(common.Logger.Error(err)) - } - // defer finalstat.Close() - _, err = finalstat.Exec("", "", string(oneKeyInfo.Key), strconv.Itoa(int(p.currentDB)), oneKeyInfo.ConflictType.String(), "") - if err != nil { - panic(common.Logger.Error(err)) - } - finalstat.Close() - - if len(conf.Opts.ResultFile) != 0 { - 
-                    resultfile.WriteString(fmt.Sprintf("%d\t%s\t%s\t%s\n", int(p.currentDB), oneKeyInfo.ConflictType.String(), string(oneKeyInfo.Key), ""))
-                }
-            }
-        }
-    }
-    statInsertKey.Close()
-    statInsertField.Close()
-    tx.Commit()
-}
+}
\ No newline at end of file
diff --git a/src/full_check/full_check/scan.go b/src/full_check/full_check/scan.go
index e1c24ac..e69fed1 100644
--- a/src/full_check/full_check/scan.go
+++ b/src/full_check/full_check/scan.go
@@ -2,10 +2,9 @@ package full_check
 
 import (
     "strconv"
-    "fmt"
 
-    "full_check/common"
     "full_check/client"
+    "full_check/common"
 
     "github.com/jinzhu/copier"
     "sync"
@@ -113,95 +112,4 @@ func (p *FullCheck) ScanFromSourceRedis(allKeys chan<- []*common.Key) {
 
     wg.Wait()
     close(allKeys)
-}
-
-func (p *FullCheck) ScanFromDB(allKeys chan<- []*common.Key) {
-    conflictKeyTableName, conflictFieldTableName := p.GetLastResultTable()
-
-    keyQuery := fmt.Sprintf("select id,key,type,conflict_type,source_len,target_len from %s where id>? and db=%d limit %d",
-        conflictKeyTableName, p.currentDB, p.BatchCount)
-    keyStatm, err := p.db[p.times-1].Prepare(keyQuery)
-    if err != nil {
-        panic(common.Logger.Error(err))
-    }
-    defer keyStatm.Close()
-
-    fieldQuery := fmt.Sprintf("select field,conflict_type from %s where key_id=?", conflictFieldTableName)
-    fieldStatm, err := p.db[p.times-1].Prepare(fieldQuery)
-    if err != nil {
-        panic(common.Logger.Error(err))
-    }
-    defer fieldStatm.Close()
-
-    var startId int64 = 0
-    for {
-        rows, err := keyStatm.Query(startId)
-        if err != nil {
-            panic(common.Logger.Error(err))
-        }
-        keyInfo := make([]*common.Key, 0, p.BatchCount)
-        for rows.Next() {
-            var key, keytype, conflictType string
-            var id, source_len, target_len int64
-            err = rows.Scan(&id, &key, &keytype, &conflictType, &source_len, &target_len)
-            if err != nil {
-                panic(common.Logger.Error(err))
-            }
-            oneKeyInfo := &common.Key{
-                Key:          []byte(key),
-                Tp:           common.NewKeyType(keytype),
-                ConflictType: common.NewConflictType(conflictType),
-                SourceAttr:   common.Attribute{ItemCount: source_len},
-                TargetAttr:   common.Attribute{ItemCount: target_len},
-            }
-            if oneKeyInfo.Tp == common.EndKeyType {
-                panic(common.Logger.Errorf("invalid type from table %s: key=%s type=%s ", conflictKeyTableName, key, keytype))
-            }
-            if oneKeyInfo.ConflictType == common.EndConflict {
-                panic(common.Logger.Errorf("invalid conflict_type from table %s: key=%s conflict_type=%s ", conflictKeyTableName, key, conflictType))
-            }
-
-            if oneKeyInfo.Tp != common.StringKeyType {
-                oneKeyInfo.Field = make([]common.Field, 0, 10)
-                rowsField, err := fieldStatm.Query(id)
-                if err != nil {
-                    panic(common.Logger.Error(err))
-                }
-                for rowsField.Next() {
-                    var field, conflictType string
-                    err = rowsField.Scan(&field, &conflictType)
-                    if err != nil {
-                        panic(common.Logger.Error(err))
-                    }
-                    oneField := common.Field{
-                        Field:        []byte(field),
-                        ConflictType: common.NewConflictType(conflictType),
-                    }
-                    if oneField.ConflictType == common.EndConflict {
-                        panic(common.Logger.Errorf("invalid conflict_type from table %s: field=%s type=%s ", conflictFieldTableName, field, conflictType))
-                    }
-                    oneKeyInfo.Field = append(oneKeyInfo.Field, oneField)
-                }
-                if err := rowsField.Err(); err != nil {
-                    panic(common.Logger.Error(err))
-                }
-                rowsField.Close()
-            }
-            keyInfo = append(keyInfo, oneKeyInfo)
-            if startId < id {
-                startId = id
-            }
-        } // rows.Next
-        if err := rows.Err(); err != nil {
-            panic(common.Logger.Error(err))
-        }
-        rows.Close()
-        // 结束
-        if len(keyInfo) == 0 {
-            close(allKeys)
-            break
-        }
-        p.IncrScanStat(len(keyInfo))
-        allKeys <- keyInfo
-    } // for{}
-}
+}
\ No newline at end of file
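
For reference, a minimal standalone sketch of the round-trip that the new conflict.go performs against the target redis: RPUSH a JSON conflict record onto a per-round list, bound its lifetime with EXPIRE, then BLPOP it back and unmarshal it. This is not part of the patch; the address, list name, and record values are made up, and it talks to redigo directly instead of going through the repo's client.NewRedisClient wrapper.

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/garyburd/redigo/redis"
)

// record mirrors the shape of the keyInfo JSON stored by WriteConflictKey (simplified).
type record struct {
    Key          string `json:"k"`
    Type         string `json:"t"`
    ConflictType string `json:"ct"`
}

func main() {
    // Assumed target address; the real tool derives this from its own configuration.
    c, err := redis.Dial("tcp", "127.0.0.1:6379")
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    // Per-run list name, analogous to "fullcheck:<random>:<table>:key".
    list := fmt.Sprintf("fullcheck:%d:key_1:key", time.Now().UnixNano())
    payload, _ := json.Marshal(&record{Key: "mykey", Type: "hash", ConflictType: "value"})

    // Producer side: push the conflict record and cap its lifetime, as WriteConflictKey does.
    if _, err := c.Do("RPUSH", list, payload); err != nil {
        log.Fatal(err)
    }
    c.Do("EXPIRE", list, 3600*4)

    // Consumer side: BLPOP returns [listName, element]; on a 1s timeout with an empty
    // list, redis.ByteSlices reports redis.ErrNil, which signals "queue drained".
    reply, err := redis.ByteSlices(c.Do("BLPOP", list, 1))
    if err != nil && err != redis.ErrNil {
        log.Fatal(err)
    }
    if err == nil {
        var got record
        json.Unmarshal(reply[1], &got)
        fmt.Printf("popped conflict: %+v\n", got)
    }
}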