From adaf9429fb6397f77ba53bca5caa66c7c3432e29 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Sun, 2 Nov 2014 21:12:27 -0800 Subject: [PATCH 1/5] add log abstraction to store records in disk --- db/backend.go | 30 +++++++++------------ db/db.go | 2 +- db/log/decoder.go | 34 +++++++++++++++++++++++ db/log/encoder.go | 37 +++++++++++++++++++++++++ db/log/encoder_test.go | 47 ++++++++++++++++++++++++++++++++ db/log/log.go | 51 +++++++++++++++++++++++++++++++++++ db/log/log_test.go | 54 +++++++++++++++++++++++++++++++++++++ db/message/record.go | 17 ++++++++++++ db/message/record_test.go | 36 +++++++++++++++++++++++++ db/recordio/appender.go | 30 --------------------- db/recordio/fetcher.go | 30 --------------------- db/recordio/io_test.go | 55 -------------------------------------- db/recordio/record.go | 46 ------------------------------- db/recordio/record_test.go | 33 ----------------------- 14 files changed, 289 insertions(+), 213 deletions(-) create mode 100644 db/log/decoder.go create mode 100644 db/log/encoder.go create mode 100644 db/log/encoder_test.go create mode 100644 db/log/log.go create mode 100644 db/log/log_test.go create mode 100644 db/message/record.go create mode 100644 db/message/record_test.go delete mode 100644 db/recordio/appender.go delete mode 100644 db/recordio/fetcher.go delete mode 100644 db/recordio/io_test.go delete mode 100644 db/recordio/record.go delete mode 100644 db/recordio/record_test.go diff --git a/db/backend.go b/db/backend.go index a6c17cc..a74b37d 100644 --- a/db/backend.go +++ b/db/backend.go @@ -1,11 +1,10 @@ package db import ( - "io/ioutil" - "os" "strings" - "github.com/go-distributed/xtree/db/recordio" + "github.com/go-distributed/xtree/db/log" + "github.com/go-distributed/xtree/db/message" "github.com/go-distributed/xtree/third-party/github.com/google/btree" ) @@ -13,33 +12,24 @@ type backend struct { bt *btree.BTree cache *cache rev int - fc recordio.Fetcher - ap recordio.Appender + log *log.Log } func newBackend() *backend { bt := btree.New(10) - - // temporary file IO to test in-disk values - writeFile, err := ioutil.TempFile("", "backend") - if err != nil { - panic("can't create temp file") - } - readFile, err := os.Open(writeFile.Name()) + log, err := log.Create() if err != nil { - panic("can't open temp file") + panic("Not implemented") } - return &backend{ bt: bt, cache: newCache(), - fc: recordio.NewFetcher(readFile), - ap: recordio.NewAppender(writeFile), + log: log, } } func (b *backend) getData(offset int64) []byte { - rec, err := b.fc.Fetch(offset) + rec, err := b.log.GetRecord(offset) if err != nil { panic("unimplemented") } @@ -87,7 +77,11 @@ func (b *backend) Put(rev int, path Path, data []byte) { } b.rev++ - offset, err := b.ap.Append(recordio.Record{data}) + offset, err := b.log.Append(&message.Record{ + Rev: b.rev, + Key: path.p, + Data: data, + }) if err != nil { panic("unimplemented") } diff --git a/db/db.go b/db/db.go index 582d737..e3ea960 100644 --- a/db/db.go +++ b/db/db.go @@ -23,7 +23,7 @@ type DB interface { // Otherwise, it lists recursively all paths. // // if count is >= 0, it is the number of paths we want in the list. - // if count is -1, it means any. + // if count is -1, it means all. // // if it failed, an error is returned. Ls(rev int, path string, recursive bool, count int) ([]Path, error) diff --git a/db/log/decoder.go b/db/log/decoder.go new file mode 100644 index 0000000..0178e27 --- /dev/null +++ b/db/log/decoder.go @@ -0,0 +1,34 @@ +package log + +import ( + "bufio" + "encoding/binary" + "io" + + "github.com/go-distributed/xtree/db/message" +) + +type decoder struct { + br *bufio.Reader +} + +func newDecoder(r io.Reader) *decoder { + return &decoder{bufio.NewReader(r)} +} + +func (d *decoder) decode(r *message.Record) (err error) { + var l int64 + if l, err = readInt64(d.br); err != nil { + return + } + data := make([]byte, l) + if _, err = io.ReadFull(d.br, data); err != nil { + return + } + return r.Unmarshal(data) +} + +func readInt64(r io.Reader) (n int64, err error) { + err = binary.Read(r, binary.LittleEndian, &n) + return +} diff --git a/db/log/encoder.go b/db/log/encoder.go new file mode 100644 index 0000000..a4c42e9 --- /dev/null +++ b/db/log/encoder.go @@ -0,0 +1,37 @@ +package log + +import ( + "bufio" + "encoding/binary" + "io" + + "github.com/go-distributed/xtree/db/message" +) + +type encoder struct { + bw *bufio.Writer +} + +func newEncoder(w io.Writer) *encoder { + return &encoder{bufio.NewWriter(w)} +} + +func (e *encoder) encode(r *message.Record) (err error) { + var data []byte + if data, err = r.Marshal(); err != nil { + return + } + if err = writeInt64(e.bw, int64(len(data))); err != nil { + return + } + _, err = e.bw.Write(data) + return +} + +func (e *encoder) flush() error { + return e.bw.Flush() +} + +func writeInt64(w io.Writer, n int64) error { + return binary.Write(w, binary.LittleEndian, n) +} diff --git a/db/log/encoder_test.go b/db/log/encoder_test.go new file mode 100644 index 0000000..ad3c3b5 --- /dev/null +++ b/db/log/encoder_test.go @@ -0,0 +1,47 @@ +package log + +import ( + "bytes" + "reflect" + "testing" + + "github.com/go-distributed/xtree/db/message" +) + +func TestEncoderDecoder(t *testing.T) { + tests := []struct { + rec *message.Record + }{ + {&message.Record{ + Rev: 1, + Key: "/test", + Data: []byte("some data"), + }}, + } + + for i, tt := range tests { + var err error + eBuf := new(bytes.Buffer) + encoder := newEncoder(eBuf) + + if err = encoder.encode(tt.rec); err != nil { + t.Fatalf("#%d: cannot encode, err: %v", i, err) + } + if err = encoder.flush(); err != nil { + t.Fatalf("#%d: cannot flush encode, err: %v", i, err) + } + + rec := &message.Record{} + dBuf := bytes.NewBuffer(eBuf.Bytes()) + decoder := newDecoder(dBuf) + + if err = decoder.decode(rec); err != nil { + t.Fatalf("#%d: cannot decode, err: %v", i, err) + } + + if !reflect.DeepEqual(tt.rec, rec) { + t.Fatalf("#%d: records are not the same, want: %v, get: %v", + i, tt.rec, rec) + } + } +} diff --git a/db/log/log.go b/db/log/log.go new file mode 100644 index 0000000..84a6321 --- /dev/null +++ b/db/log/log.go @@ -0,0 +1,51 @@ +package log + +import ( + "io/ioutil" + "os" + + "github.com/go-distributed/xtree/db/message" +) + +type Log struct { + f *os.File +} + +func Create() (*Log, error) { + f, err := ioutil.TempFile("", "backend") + if err != nil { + return nil, err + } + + return &Log{f}, nil +} + +func (l *Log) Destroy() error { + return os.Remove(l.f.Name()) +} + +func (l *Log) GetRecord(offset int64) (*message.Record, error) { + _, err := l.f.Seek(offset, 0) + if err != nil { + return nil, err + } + + decoder := newDecoder(l.f) + r := &message.Record{} + decoder.decode(r) + + return r, nil +} + +func (l *Log) Append(r *message.Record) (offset int64, err error) { + offset, err = l.f.Seek(0, 2) + if err != nil { + return -1, err + } + + encoder := newEncoder(l.f) + + err = encoder.encode(r) + encoder.flush() + return offset, err +} diff --git a/db/log/log_test.go b/db/log/log_test.go new file mode 100644 index 0000000..a033993 --- /dev/null +++ b/db/log/log_test.go @@ -0,0 +1,54 @@ +package log + +import ( + "reflect" + "testing" + + "github.com/go-distributed/xtree/db/message" +) + +func TestAppendAndGetRecord(t *testing.T) { + var err error + var log *Log + if log, err = Create(); err != nil { + t.Errorf("Create failed: %v", err) + } + + defer log.Destroy() + + tests := []struct { + offset int64 + rec *message.Record + }{ + {-1, &message.Record{ + Rev: 1, + Key: "/test", + Data: []byte("some data"), + }}, + {-1, &message.Record{ + Rev: 2, + Key: "/test2", + Data: []byte("some other data"), + }}, + } + + for i, tt := range tests { + tests[i].offset, err = log.Append(tt.rec) + if err != nil { + t.Errorf("#%d: Append failed: %v", i, err) + } + } + + for i, tt := range tests { + var rec *message.Record + if rec, err = log.GetRecord(tt.offset); err != nil { + t.Errorf("#%d: GetRecord failed: %v", i, err) + } + + if !reflect.DeepEqual(tt.rec, rec) { + t.Errorf("#%d: records not the same, want: %v, get %v", + i, tt.rec, rec) + } + + } +} diff --git a/db/message/record.go b/db/message/record.go new file mode 100644 index 0000000..37ec943 --- /dev/null +++ b/db/message/record.go @@ -0,0 +1,17 @@ +package message + +import "encoding/json" + +type Record struct { + Rev int + Key string + Data []byte +} + +func (r *Record) Marshal() ([]byte, error) { + return json.Marshal(r) +} + +func (r *Record) Unmarshal(data []byte) error { + return json.Unmarshal(data, r) +} diff --git a/db/message/record_test.go b/db/message/record_test.go new file mode 100644 index 0000000..6fe957e --- /dev/null +++ b/db/message/record_test.go @@ -0,0 +1,36 @@ +package message + +import ( + "reflect" + "testing" +) + +func TestLogRecord(t *testing.T) { + tests := []struct { + rec *Record + }{ + {&Record{ + Rev: 1, + Key: "/test", + Data: []byte("some data"), + }}, + } + + for i, tt := range tests { + var data []byte + var err error + if data, err = tt.rec.Marshal(); err != nil { + t.Fatalf("#%d: cannot marshal, err: %v", i, err) + } + + rec := &Record{} + if err = rec.Unmarshal(data); err != nil { + t.Fatalf("#%d: cannot unmarshal, err: %v", i, err) + } + + if !reflect.DeepEqual(tt.rec, rec) { + t.Fatalf("#%d: records are not the same, want: %v, get: %v", + i, tt.rec, rec) + } + } +} diff --git a/db/recordio/appender.go b/db/recordio/appender.go deleted file mode 100644 index 05bd537..0000000 --- a/db/recordio/appender.go +++ /dev/null @@ -1,30 +0,0 @@ -package recordio - -import "io" - -type Appender interface { - Append(Record) (int64, error) -} - -type appender struct { - w io.WriteSeeker -} - -func NewAppender(w io.WriteSeeker) Appender { - return &appender{w} -} - -// Not thread-safe -func (ap *appender) Append(r Record) (offset int64, err error) { - offset, err = ap.w.Seek(0, 2) - if err != nil { - return -1, err - } - - err = (&r).encodeTo(ap.w) - if err != nil { - return -1, err - } - - return offset, nil -} diff --git a/db/recordio/fetcher.go b/db/recordio/fetcher.go deleted file mode 100644 index 930dc83..0000000 --- a/db/recordio/fetcher.go +++ /dev/null @@ -1,30 +0,0 @@ -package recordio - -import "io" - -type Fetcher interface { - Fetch(offset int64) (Record, error) -} - -type fetcher struct { - r io.ReadSeeker -} - -func NewFetcher(r io.ReadSeeker) Fetcher { - return &fetcher{r} -} - -func (fc *fetcher) Fetch(offset int64) (Record, error) { - _, err := fc.r.Seek(offset, 0) - if err != nil { - return Record{}, err - } - - r := Record{} - err = (&r).decodeFrom(fc.r) - if err != nil { - return Record{}, err - } - - return r, nil -} diff --git a/db/recordio/io_test.go b/db/recordio/io_test.go deleted file mode 100644 index 9ef9ae9..0000000 --- a/db/recordio/io_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package recordio - -import ( - "io/ioutil" - "os" - "reflect" - "testing" -) - -func TestFetch(t *testing.T) { - writeFile, err := ioutil.TempFile("", "testfetch") - if err != nil { - t.Error("can't create temp file") - } - defer os.Remove(writeFile.Name()) - defer writeFile.Close() - - readFile, err := os.Open(writeFile.Name()) - if err != nil { - t.Error("can't open temp file") - } - defer readFile.Close() - - ap := NewAppender(writeFile) - fc := NewFetcher(readFile) - - tests := []struct { - offset int64 - record Record - }{ - {-1, Record{[]byte("someData")}}, - {-1, Record{[]byte("someOtherData")}}, - } - - for i, tt := range tests { - offset, err := ap.Append(tt.record) - if err != nil { - t.Errorf("#%d: Append failed: %s", i, err.Error()) - } - tests[i].offset = offset - } - - for i, tt := range tests { - recRead, err := fc.Fetch(tt.offset) - if err != nil { - t.Errorf("#%d: Fetch failed: %s", i, err.Error()) - } - - if !reflect.DeepEqual(recRead, tt.record) { - t.Errorf("#%d: records not the same, want: %v, get %v", - i, tt.offset, recRead) - } - - } -} diff --git a/db/recordio/record.go b/db/recordio/record.go deleted file mode 100644 index 33ca3fb..0000000 --- a/db/recordio/record.go +++ /dev/null @@ -1,46 +0,0 @@ -package recordio - -import ( - "encoding/binary" - "io" -) - -const ( - sizeOfLength = 4 -) - -type Record struct { - Data []byte -} - -func (r *Record) encodeTo(wr io.Writer) error { - // Write length - lBuf := make([]byte, sizeOfLength) - binary.LittleEndian.PutUint32(lBuf, uint32(len(r.Data))) - if _, err := wr.Write(lBuf); err != nil { - return err - } - // Write data - if _, err := wr.Write(r.Data); err != nil { - return err - } - - return nil -} - -func (r *Record) decodeFrom(rd io.Reader) error { - var length uint32 - // Read length - err := binary.Read(rd, binary.LittleEndian, &length) - if err != nil { - return err - } - // Read data - r.Data = make([]byte, length) - _, err = io.ReadFull(rd, r.Data) - if err != nil { - return err - } - - return nil -} diff --git a/db/recordio/record_test.go b/db/recordio/record_test.go deleted file mode 100644 index c8073d8..0000000 --- a/db/recordio/record_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package recordio - -import ( - "bytes" - "reflect" - "testing" -) - -func TestEncodeDecode(t *testing.T) { - tests := []struct { - data []byte - }{ - {[]byte("someData")}, - {[]byte("someOtherData")}, - } - - for i, tt := range tests { - buf := new(bytes.Buffer) - recordToWrite := &Record{tt.data} - - if err := recordToWrite.encodeTo(buf); err != nil { - t.Fatalf("#%d: cannot encode, err: %s", i, err) - } - recordToRead := new(Record) - if err := recordToRead.decodeFrom(buf); err != nil { - t.Fatalf("#%d: cannot decode, err: %s", i, err) - } - if !reflect.DeepEqual(recordToRead, recordToWrite) { - t.Fatalf("#%d: records are not the same, want: %v, get: %v", - i, recordToWrite, recordToRead) - } - } -} From f991b1bd9a755e92e93ec9875c9e42cf0f9d66ae Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Tue, 4 Nov 2014 12:51:17 -0800 Subject: [PATCH 2/5] remove Rev field in record --- db/backend.go | 1 - db/log/encoder_test.go | 1 - db/log/log_test.go | 2 -- db/message/record.go | 1 - db/message/record_test.go | 1 - 5 files changed, 6 deletions(-) diff --git a/db/backend.go b/db/backend.go index a74b37d..be49e06 100644 --- a/db/backend.go +++ b/db/backend.go @@ -78,7 +78,6 @@ func (b *backend) Put(rev int, path Path, data []byte) { b.rev++ offset, err := b.log.Append(&message.Record{ - Rev: b.rev, Key: path.p, Data: data, }) diff --git a/db/log/encoder_test.go b/db/log/encoder_test.go index ad3c3b5..09cd6c1 100644 --- a/db/log/encoder_test.go +++ b/db/log/encoder_test.go @@ -13,7 +13,6 @@ func TestEncoderDecoder(t *testing.T) { rec *message.Record }{ {&message.Record{ - Rev: 1, Key: "/test", Data: []byte("some data"), }}, diff --git a/db/log/log_test.go b/db/log/log_test.go index a033993..86fc784 100644 --- a/db/log/log_test.go +++ b/db/log/log_test.go @@ -21,12 +21,10 @@ func TestAppendAndGetRecord(t *testing.T) { rec *message.Record }{ {-1, &message.Record{ - Rev: 1, Key: "/test", Data: []byte("some data"), }}, {-1, &message.Record{ - Rev: 2, Key: "/test2", Data: []byte("some other data"), }}, diff --git a/db/message/record.go b/db/message/record.go index 37ec943..2b53136 100644 --- a/db/message/record.go +++ b/db/message/record.go @@ -3,7 +3,6 @@ package message import "encoding/json" type Record struct { - Rev int Key string Data []byte } diff --git a/db/message/record_test.go b/db/message/record_test.go index 6fe957e..62d95a3 100644 --- a/db/message/record_test.go +++ b/db/message/record_test.go @@ -10,7 +10,6 @@ func TestLogRecord(t *testing.T) { rec *Record }{ {&Record{ - Rev: 1, Key: "/test", Data: []byte("some data"), }}, From 0d5f4f0bd61c8bc0ac57f401142b3f9c37092ec5 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Tue, 4 Nov 2014 16:09:23 -0800 Subject: [PATCH 3/5] nit: log error check --- db/log/log.go | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/db/log/log.go b/db/log/log.go index 84a6321..63408cb 100644 --- a/db/log/log.go +++ b/db/log/log.go @@ -16,7 +16,6 @@ func Create() (*Log, error) { if err != nil { return nil, err } - return &Log{f}, nil } @@ -24,28 +23,24 @@ func (l *Log) Destroy() error { return os.Remove(l.f.Name()) } -func (l *Log) GetRecord(offset int64) (*message.Record, error) { - _, err := l.f.Seek(offset, 0) - if err != nil { - return nil, err +func (l *Log) GetRecord(offset int64) (r *message.Record, err error) { + if _, err = l.f.Seek(offset, 0); err != nil { + return } - decoder := newDecoder(l.f) - r := &message.Record{} - decoder.decode(r) - - return r, nil + r = &message.Record{} + err = decoder.decode(r) + return } func (l *Log) Append(r *message.Record) (offset int64, err error) { - offset, err = l.f.Seek(0, 2) - if err != nil { - return -1, err + if offset, err = l.f.Seek(0, 2); err != nil { + return } - encoder := newEncoder(l.f) - - err = encoder.encode(r) - encoder.flush() + if err = encoder.encode(r); err != nil { + return + } + err = encoder.flush() return offset, err } From cbf9f1cfe22e45c0c26eeb9da6d297a422528833 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Wed, 5 Nov 2014 17:43:15 -0800 Subject: [PATCH 4/5] close file in Destroy() --- db/backend.go | 8 ++++---- db/log/log.go | 3 +++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/db/backend.go b/db/backend.go index be49e06..4c3d038 100644 --- a/db/backend.go +++ b/db/backend.go @@ -89,8 +89,8 @@ func (b *backend) Put(rev int, path Path, data []byte) { } // one-level listing -func (b *backend) Ls(pathname string) []Path { - result := make([]Path, 0) +func (b *backend) Ls(pathname string) (paths []Path) { + paths = make([]Path, 0) pivot := newPathForLs(pathname) b.bt.AscendGreaterOrEqual(pivot, func(treeItem btree.Item) bool { @@ -99,8 +99,8 @@ func (b *backend) Ls(pathname string) []Path { p.level != pivot.level { return false } - result = append(result, *p) + paths = append(paths, *p) return true }) - return result + return } diff --git a/db/log/log.go b/db/log/log.go index 63408cb..11f5431 100644 --- a/db/log/log.go +++ b/db/log/log.go @@ -20,6 +20,9 @@ func Create() (*Log, error) { } func (l *Log) Destroy() error { + if err := l.f.Close(); err != nil { + return err + } return os.Remove(l.f.Name()) } From 28d0ede18f7c4f4add26b31caf85ad3ec4cae2ba Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Thu, 6 Nov 2014 13:49:49 -0800 Subject: [PATCH 5/5] add log init and restore --- db/backend.go | 82 +++++++++++++++++++++++++++------ db/backend_test.go | 58 ++++++++++++++++++++--- db/config.go | 5 ++ db/log/encoder_test.go | 2 +- db/log/log.go | 102 ++++++++++++++++++++++++++++++++--------- db/log/log_test.go | 28 +++++++---- 6 files changed, 225 insertions(+), 52 deletions(-) create mode 100644 db/config.go diff --git a/db/backend.go b/db/backend.go index 4c3d038..b338bde 100644 --- a/db/backend.go +++ b/db/backend.go @@ -1,35 +1,61 @@ package db import ( + "fmt" + "io/ioutil" + "os" "strings" - "github.com/go-distributed/xtree/db/log" + dblog "github.com/go-distributed/xtree/db/log" "github.com/go-distributed/xtree/db/message" "github.com/go-distributed/xtree/third-party/github.com/google/btree" ) type backend struct { - bt *btree.BTree - cache *cache - rev int - log *log.Log + bt *btree.BTree + cache *cache + rev int + dblog *dblog.DBLog + config *DBConfig } func newBackend() *backend { - bt := btree.New(10) - log, err := log.Create() + dataDir, err := ioutil.TempDir("", "backend") + if err != nil { + panic("not implemented") + } + + config := &DBConfig{ + DataDir: dataDir, + } + b, err := newBackendWithConfig(config) if err != nil { - panic("Not implemented") + panic("not implemented") + } + return b +} + +func newBackendWithConfig(config *DBConfig) (b *backend, err error) { + bt := btree.New(10) + b = &backend{ + bt: bt, + cache: newCache(), + config: config, } - return &backend{ - bt: bt, - cache: newCache(), - log: log, + haveLog := dblog.Exist(config.DataDir) + switch haveLog { + case false: + fmt.Println("didn't have log file. Init...") + err = b.init(config) + case true: + fmt.Println("had log file. Restore...") + err = b.restore(config) } + return } func (b *backend) getData(offset int64) []byte { - rec, err := b.log.GetRecord(offset) + rec, err := b.dblog.GetRecord(offset) if err != nil { panic("unimplemented") } @@ -77,7 +103,7 @@ func (b *backend) Put(rev int, path Path, data []byte) { } b.rev++ - offset, err := b.log.Append(&message.Record{ + offset, err := b.dblog.Append(&message.Record{ Key: path.p, Data: data, }) @@ -102,5 +128,33 @@ func (b *backend) Ls(pathname string) (paths []Path) { paths = append(paths, *p) return true }) + + return +} + +// init() creates a new log file +func (b *backend) init(config *DBConfig) (err error) { + b.dblog, err = dblog.Create(config.DataDir) return } + +// restore() restores database from the log file. +func (b *backend) restore(config *DBConfig) (err error) { + rev := 0 + return dblog.Reuse(config.DataDir, + func(l *dblog.DBLog) { + b.dblog = l + }, + func(r *message.Record) (err error) { + rev++ + p := newPath(r.Key) + b.Put(rev, *p, r.Data) + return + }) +} + +// clean up resource after testing +func (b *backend) testableCleanupResource() (err error) { + b.dblog.Close() + return os.RemoveAll(b.config.DataDir) +} diff --git a/db/backend_test.go b/db/backend_test.go index dfff190..07e3a18 100644 --- a/db/backend_test.go +++ b/db/backend_test.go @@ -17,6 +17,7 @@ func TestPut(t *testing.T) { } b := newBackend() + defer b.testableCleanupResource() for i, tt := range tests { b.Put(tt.rev, tt.path, tt.data) v := b.Get(tt.rev, tt.path) @@ -27,6 +28,7 @@ func TestPut(t *testing.T) { t.Errorf("#%d: data = %s, want %s", i, v.data, tt.data) } } + } func TestPutOnExistingPath(t *testing.T) { @@ -40,6 +42,7 @@ func TestPutOnExistingPath(t *testing.T) { } b := newBackend() + defer b.testableCleanupResource() for i, tt := range tests { b.Put(2*i+1, tt.path, tt.data1) v := b.Get(2*i+1, tt.path) @@ -65,6 +68,8 @@ func TestPutOnExistingPath(t *testing.T) { func TestGetMVCC(t *testing.T) { b := newBackend() + defer b.testableCleanupResource() + b.Put(1, *newPath("/a"), []byte("1")) b.Put(2, *newPath("/b"), []byte("2")) b.Put(3, *newPath("/a"), []byte("3")) @@ -99,12 +104,15 @@ func TestGetMVCC(t *testing.T) { } func TestLs(t *testing.T) { - back := newBackend() d := []byte("somedata") - back.Put(1, *newPath("/a"), d) - back.Put(2, *newPath("/a/b"), d) - back.Put(3, *newPath("/a/c"), d) - back.Put(4, *newPath("/b"), d) + + b := newBackend() + defer b.testableCleanupResource() + + b.Put(1, *newPath("/a"), d) + b.Put(2, *newPath("/a/b"), d) + b.Put(3, *newPath("/a/c"), d) + b.Put(4, *newPath("/b"), d) tests := []struct { p string @@ -118,7 +126,7 @@ func TestLs(t *testing.T) { {"/c", []string{}}, } for i, tt := range tests { - ps := back.Ls(tt.p) + ps := b.Ls(tt.p) if len(ps) != len(tt.wps) { t.Fatalf("#%d: len(ps) = %d, want %d", i, len(ps), len(tt.wps)) } @@ -130,9 +138,44 @@ func TestLs(t *testing.T) { } } +func TestRestore(t *testing.T) { + tests := []struct { + rev int + path Path + data []byte + }{ + {1, *newPath("/foo/bar"), []byte("somedata")}, + {2, *newPath("/bar/foo"), []byte("datasome")}, + } + + b := newBackend() + for _, tt := range tests { + // append records to the log + b.Put(tt.rev, tt.path, tt.data) + } + b.dblog.Close() + + // simulate restoring log in another backend + b2, err := newBackendWithConfig(b.config) + defer b2.testableCleanupResource() + if err != nil { + t.Errorf("newBackendWithConfig failed: %v", err) + } + for i, tt := range tests { + v := b2.Get(tt.rev, tt.path) + if v.rev != tt.rev { + t.Errorf("#%d: rev = %d, want %d", i, v.rev, tt.rev) + } + if !reflect.DeepEqual(v.data, tt.data) { + t.Errorf("#%d: data = %s, want %s", i, v.data, tt.data) + } + } +} + func BenchmarkPut(b *testing.B) { b.StopTimer() back := newBackend() + defer back.testableCleanupResource() d := []byte("somedata") path := make([]Path, b.N) for i := range path { @@ -148,6 +191,8 @@ func BenchmarkPut(b *testing.B) { func BenchmarkGetWithCache(b *testing.B) { b.StopTimer() back := newBackend() + defer back.testableCleanupResource() + d := []byte("somedata") path := make([]Path, b.N) for i := range path { @@ -168,6 +213,7 @@ func BenchmarkGetWithCache(b *testing.B) { func BenchmarkGetWithOutCache(b *testing.B) { b.StopTimer() back := newBackend() + defer back.testableCleanupResource() back.cache = nil d := []byte("somedata") path := make([]Path, b.N) diff --git a/db/config.go b/db/config.go new file mode 100644 index 0000000..b76e42f --- /dev/null +++ b/db/config.go @@ -0,0 +1,5 @@ +package db + +type DBConfig struct { + DataDir string +} diff --git a/db/log/encoder_test.go b/db/log/encoder_test.go index 09cd6c1..a83cc57 100644 --- a/db/log/encoder_test.go +++ b/db/log/encoder_test.go @@ -30,7 +30,7 @@ func TestEncoderDecoder(t *testing.T) { t.Fatalf("#%d: cannot flush encode, err: %v", i, err) } - rec := &message.Record{} + rec := new(message.Record) dBuf := bytes.NewBuffer(eBuf.Bytes()) decoder := newDecoder(dBuf) diff --git a/db/log/log.go b/db/log/log.go index 11f5431..5046f72 100644 --- a/db/log/log.go +++ b/db/log/log.go @@ -1,49 +1,109 @@ package log import ( - "io/ioutil" + "fmt" + "io" "os" + "path" "github.com/go-distributed/xtree/db/message" ) -type Log struct { - f *os.File +const ( + logFilename = "records.log" +) + +type DBLog struct { + writeFile, readFile *os.File + encoder *encoder +} + +func Create(dataDir string) (l *DBLog, err error) { + l, err = newDBLog(path.Join(dataDir, logFilename), true) + return } -func Create() (*Log, error) { - f, err := ioutil.TempFile("", "backend") - if err != nil { - return nil, err +func newDBLog(logPath string, needCreate bool) (l *DBLog, err error) { + var writeFile, readFile *os.File + flag := os.O_WRONLY | os.O_APPEND | os.O_SYNC + if needCreate { + flag |= os.O_CREATE + } + if writeFile, err = os.OpenFile(logPath, flag, 0600); err != nil { + return + } + if !needCreate { + writeFile.Seek(0, os.SEEK_END) + } + if readFile, err = os.Open(logPath); err != nil { + writeFile.Close() + return } - return &Log{f}, nil + l = &DBLog{ + writeFile: writeFile, + readFile: readFile, + encoder: newEncoder(writeFile), + } + return } -func (l *Log) Destroy() error { - if err := l.f.Close(); err != nil { - return err +func Reuse(dataDir string, + setLog func(*DBLog), + replayRecord func(*message.Record) error) (err error) { + var l *DBLog + if l, err = newDBLog(path.Join(dataDir, logFilename), + false); err != nil { + return } - return os.Remove(l.f.Name()) + setLog(l) + decoder := newDecoder(l.readFile) + // TODO: parallel? + for { + r := new(message.Record) + if err = decoder.decode(r); err != nil { + if err == io.EOF { + return nil + } + return + } + ret, _ := l.readFile.Seek(0, os.SEEK_CUR) + fmt.Printf("%v %#v\n", ret, r) + if err = replayRecord(r); err != nil { + return + } + } +} + +func Exist(dataDir string) bool { + p := path.Join(dataDir, logFilename) + _, err := os.Stat(p) + return err == nil } -func (l *Log) GetRecord(offset int64) (r *message.Record, err error) { - if _, err = l.f.Seek(offset, 0); err != nil { +func (l *DBLog) GetRecord(offset int64) (r *message.Record, err error) { + if _, err = l.readFile.Seek(offset, 0); err != nil { return } - decoder := newDecoder(l.f) - r = &message.Record{} + decoder := newDecoder(l.readFile) + r = new(message.Record) err = decoder.decode(r) return } -func (l *Log) Append(r *message.Record) (offset int64, err error) { - if offset, err = l.f.Seek(0, 2); err != nil { +func (l *DBLog) Append(r *message.Record) (offset int64, err error) { + if offset, err = l.writeFile.Seek(0, os.SEEK_CUR); err != nil { return } - encoder := newEncoder(l.f) - if err = encoder.encode(r); err != nil { + if err = l.encoder.encode(r); err != nil { return } - err = encoder.flush() + err = l.encoder.flush() return offset, err } + +func (l *DBLog) Close() (err error) { + if err = l.readFile.Close(); err != nil { + return + } + return l.writeFile.Close() +} diff --git a/db/log/log_test.go b/db/log/log_test.go index 86fc784..7ab2918 100644 --- a/db/log/log_test.go +++ b/db/log/log_test.go @@ -1,6 +1,8 @@ package log import ( + "io/ioutil" + "os" "reflect" "testing" @@ -9,16 +11,22 @@ import ( func TestAppendAndGetRecord(t *testing.T) { var err error - var log *Log - if log, err = Create(); err != nil { - t.Errorf("Create failed: %v", err) + var l *DBLog + var dataDir string + + if dataDir, err = ioutil.TempDir("", "logtest"); err != nil { + t.Errorf("ioutil.TempDir failed: %v", err) } - defer log.Destroy() + defer os.RemoveAll(dataDir) + + if l, err = Create(dataDir); err != nil { + t.Errorf("Create failed: %v", err) + } tests := []struct { offset int64 - rec *message.Record + record *message.Record }{ {-1, &message.Record{ Key: "/test", @@ -31,21 +39,21 @@ func TestAppendAndGetRecord(t *testing.T) { } for i, tt := range tests { - tests[i].offset, err = log.Append(tt.rec) + tests[i].offset, err = l.Append(tt.record) if err != nil { t.Errorf("#%d: Append failed: %v", i, err) } } for i, tt := range tests { - var rec *message.Record - if rec, err = log.GetRecord(tt.offset); err != nil { + var r *message.Record + if r, err = l.GetRecord(tt.offset); err != nil { t.Errorf("#%d: GetRecord failed: %v", i, err) } - if !reflect.DeepEqual(tt.rec, rec) { + if !reflect.DeepEqual(tt.record, r) { t.Errorf("#%d: records not the same, want: %v, get %v", - i, tt.rec, rec) + i, tt.record, r) } }