diff --git a/backup_test.go b/backup_test.go index 2be712e38..b151e39b4 100644 --- a/backup_test.go +++ b/backup_test.go @@ -435,7 +435,7 @@ func TestBackupLoadIncremental(t *testing.T) { if err := txn.SetEntry(entry); err != nil { return err } - updates[i] = bitDiscardEarlierVersions + updates[i] = BitDiscardEarlierVersions } return nil }) diff --git a/db.go b/db.go index 658963759..84d1403f7 100644 --- a/db.go +++ b/db.go @@ -102,7 +102,7 @@ type DB struct { lc *levelsController vlog valueLog writeCh chan *request - flushChan chan *memTable // For flushing memtables. + flushChan chan flushTask // For flushing memtables. closeOnce sync.Once // For closing DB only once. blockWrites atomic.Int32 @@ -233,7 +233,7 @@ func Open(opt Options) (*DB, error) { db := &DB{ imm: make([]*memTable, 0, opt.NumMemtables), - flushChan: make(chan *memTable, opt.NumMemtables), + flushChan: make(chan flushTask, opt.NumMemtables), writeCh: make(chan *request, kvWriteChCapacity), opt: opt, manifest: manifestFile, @@ -347,11 +347,11 @@ func Open(opt Options) (*DB, error) { db.closers.memtable = z.NewCloser(1) go func() { - db.flushMemtable(db.closers.memtable) // Need levels controller to be up. + _ = db.flushMemtable(db.closers.memtable) // Need levels controller to be up. }() // Flush them to disk asap. for _, mt := range db.imm { - db.flushChan <- mt + db.flushChan <- flushTask{mt: mt} } } // We do increment nextTxnTs below. So, no need to do it here. @@ -565,12 +565,12 @@ func (db *DB) close() (err error) { } else { db.opt.Debugf("Flushing memtable") for { - pushedMemTable := func() bool { + pushedFlushTask := func() bool { db.lock.Lock() defer db.lock.Unlock() y.AssertTrue(db.mt != nil) select { - case db.flushChan <- db.mt: + case db.flushChan <- flushTask{mt: db.mt}: db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm. db.mt = nil // Will segfault if we try writing! db.opt.Debugf("pushed to flush chan\n") @@ -583,7 +583,7 @@ func (db *DB) close() (err error) { } return false }() - if pushedMemTable { + if pushedFlushTask { break } time.Sleep(10 * time.Millisecond) @@ -783,16 +783,9 @@ var requestPool = sync.Pool{ } func (db *DB) writeToLSM(b *request) error { - // We should check the length of b.Prts and b.Entries only when badger is not - // running in InMemory mode. In InMemory mode, we don't write anything to the - // value log and that's why the length of b.Ptrs will always be zero. - if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) { - return errors.Errorf("Ptrs and Entries don't match: %+v", b) - } - for i, entry := range b.Entries { var err error - if entry.skipVlogAndSetThreshold(db.valueThreshold()) { + if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) { // Will include deletion / tombstone case. err = db.mt.Put(entry.Key, y.ValueStruct{ @@ -838,10 +831,13 @@ func (db *DB) writeRequests(reqs []*request) error { } } db.opt.Debugf("writeRequests called. Writing to value log") - err := db.vlog.write(reqs) - if err != nil { - done(err) - return err + if !db.opt.managedTxns { + // Don't do value log writes in managed mode. + err := db.vlog.write(reqs) + if err != nil { + done(err) + return err + } } db.opt.Debugf("Writing to memtable") @@ -1019,7 +1015,7 @@ func (db *DB) ensureRoomForWrite() error { } select { - case db.flushChan <- db.mt: + case db.flushChan <- flushTask{mt: db.mt}: db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n", db.mt.sl.MemSize(), len(db.flushChan)) // We manage to push this task. Let's modify imm. 
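Context for the rename that runs through this diff: the internal bitDiscardEarlierVersions flag is exported as BitDiscardEarlierVersions so code outside the package can read and set it. A minimal sketch of setting the same bit through the existing public API (illustrative only, not part of this diff; it assumes an already-open *DB named db):

    // WithDiscard() sets BitDiscardEarlierVersions on the entry, telling future
    // compactions that older versions of this key may be dropped.
    entry := NewEntry([]byte("config/max-retries"), []byte("5")).WithDiscard()
    err := db.Update(func(txn *Txn) error {
        return txn.SetEntry(entry)
    })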
@@ -1036,17 +1032,62 @@ func (db *DB) ensureRoomForWrite() error { } } +func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error { + if !db.opt.managedTxns { + panic("Handover Skiplist is only available in managed mode.") + } + db.lock.Lock() + defer db.lock.Unlock() + + // If we have some data in db.mt, we should push that first, so the ordering of writes is + // maintained. + if !db.mt.sl.Empty() { + sz := db.mt.sl.MemSize() + db.opt.Infof("Handover found %d B data in current memtable. Pushing to flushChan.", sz) + var err error + select { + case db.flushChan <- flushTask{mt: db.mt}: + db.imm = append(db.imm, db.mt) + db.mt, err = db.newMemTable() + if err != nil { + return y.Wrapf(err, "cannot push current memtable") + } + default: + return errNoRoom + } + } + + mt := &memTable{sl: skl} + select { + case db.flushChan <- flushTask{mt: mt, cb: callback}: + db.imm = append(db.imm, mt) + return nil + default: + return errNoRoom + } +} + func arenaSize(opt Options) int64 { return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize) } +func (db *DB) NewSkiplist() *skl.Skiplist { + return skl.NewSkiplist(arenaSize(db.opt)) +} + // buildL0Table builds a new table from the memtable. -func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder { +func buildL0Table(ft flushTask, bopts table.Options) *table.Builder { + var iter y.Iterator + if ft.itr != nil { + iter = ft.itr + } else { + iter = ft.mt.sl.NewUniIterator(false) + } defer iter.Close() b := table.NewTableBuilder(bopts) for iter.Rewind(); iter.Valid(); iter.Next() { - if len(dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), dropPrefixes) { + if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) { continue } vs := iter.Value() @@ -1056,15 +1097,21 @@ func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) * } b.Add(iter.Key(), iter.Value(), vp.Len) } - return b } -// handleMemTableFlush must be run serially. -func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error { +type flushTask struct { + mt *memTable + cb func() + itr y.Iterator + dropPrefixes [][]byte +} + +// handleFlushTask must be run serially. +func (db *DB) handleFlushTask(ft flushTask) error { + // ft.mt could be nil with ft.itr being the valid field. bopts := buildTableOptions(db) - itr := mt.sl.NewUniIterator(false) - builder := buildL0Table(itr, nil, bopts) + builder := buildL0Table(ft, bopts) defer builder.Close() // buildL0Table can return nil if the none of the items in the skiplist are @@ -1093,39 +1140,89 @@ func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error { return err } -// flushMemtable must keep running until we send it an empty memtable. If there -// are errors during handling the memtable flush, we'll retry indefinitely. -func (db *DB) flushMemtable(lc *z.Closer) { +// flushMemtable must keep running until we send it an empty flushTask. If there +// are errors during handling the flush task, we'll retry indefinitely. 
+func (db *DB) flushMemtable(lc *z.Closer) error { defer lc.Done() - for mt := range db.flushChan { - if mt == nil { + var sz int64 + var itrs []y.Iterator + var mts []*memTable + var cbs []func() + slurp := func() { + for { + select { + case more := <-db.flushChan: + if more.mt == nil { + return + } + sl := more.mt.sl + itrs = append(itrs, sl.NewUniIterator(false)) + mts = append(mts, more.mt) + cbs = append(cbs, more.cb) + + sz += sl.MemSize() + if sz > db.opt.MemTableSize { + return + } + default: + return + } + } + } + + for ft := range db.flushChan { + if ft.mt == nil { + // We close db.flushChan now, instead of sending a nil ft.mt. continue } + sz = ft.mt.sl.MemSize() + // Reset of itrs, mts etc. is being done below. + y.AssertTrue(len(itrs) == 0 && len(mts) == 0 && len(cbs) == 0) + itrs = append(itrs, ft.mt.sl.NewUniIterator(false)) + mts = append(mts, ft.mt) + cbs = append(cbs, ft.cb) + + // Pick more memtables, so we can really fill up the L0 table. + slurp() + + // db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz) + ft.mt = nil + ft.itr = table.NewMergeIterator(itrs, false) + ft.cb = nil for { - if err := db.handleMemTableFlush(mt, nil); err != nil { - // Encountered error. Retry indefinitely. - db.opt.Errorf("error flushing memtable to disk: %v, retrying", err) - time.Sleep(time.Second) - continue - } + err := db.handleFlushTask(ft) + if err == nil { + // Update s.imm. Need a lock. + db.lock.Lock() + // This is a single-threaded operation. ft.mt corresponds to the head of + // db.imm list. Once we flush it, we advance db.imm. The next ft.mt + // which would arrive here would match db.imm[0], because we acquire a + // lock over DB when pushing to flushChan. + // TODO: This logic is dirty AF. Any change and this could easily break. + for _, mt := range mts { + y.AssertTrue(mt == db.imm[0]) + db.imm = db.imm[1:] + mt.DecrRef() // Return memory. + } + db.lock.Unlock() - // Update s.imm. Need a lock. - db.lock.Lock() - // This is a single-threaded operation. mt corresponds to the head of - // db.imm list. Once we flush it, we advance db.imm. The next mt - // which would arrive here would match db.imm[0], because we acquire a - // lock over DB when pushing to flushChan. - // TODO: This logic is dirty AF. Any change and this could easily break. - y.AssertTrue(mt == db.imm[0]) - db.imm = db.imm[1:] - mt.DecrRef() // Return memory. - // unlock - db.lock.Unlock() - break + for _, cb := range cbs { + if cb != nil { + cb() + } + } + break + } + // Encountered error. Retry indefinitely. + db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err) + time.Sleep(time.Second) } + // Reset everything. + itrs, mts, cbs, sz = itrs[:0], mts[:0], cbs[:0], 0 } + return nil } func exists(path string) (bool, error) { @@ -1545,10 +1642,10 @@ func (db *DB) startCompactions() { func (db *DB) startMemoryFlush() { // Start memory fluhser. if db.closers.memtable != nil { - db.flushChan = make(chan *memTable, db.opt.NumMemtables) + db.flushChan = make(chan flushTask, db.opt.NumMemtables) db.closers.memtable = z.NewCloser(1) go func() { - db.flushMemtable(db.closers.memtable) + _ = db.flushMemtable(db.closers.memtable) }() } } @@ -1651,7 +1748,7 @@ func (db *DB) prepareToDrop() (func(), error) { panic("Attempting to drop data in read-only mode.") } // In order prepare for drop, we need to block the incoming writes and - // write it to db. Then, flush all the pending memtable. So that, we + // write it to db. Then, flush all the pending flushtask. 
So that, we don't miss any entries. if err := db.blockWrite(); err != nil { return func() {}, err } @@ -1700,7 +1797,7 @@ func (db *DB) dropAll() (func(), error) { if err != nil { return f, err } - // prepareToDrop will stop all the incoming write and flushes any pending memtables. + // prepareToDrop will stop all the incoming writes and flush any pending flush tasks. // Before we drop, we'll stop the compaction because anyways all the datas are going to // be deleted. db.stopCompactions() @@ -1782,8 +1879,13 @@ func (db *DB) DropPrefix(prefixes ...[]byte) error { memtable.DecrRef() continue } + task := flushTask{ + mt: memtable, + // Ensure that the head of value log gets persisted to disk. + dropPrefixes: filtered, + } db.opt.Debugf("Flushing memtable") - if err := db.handleMemTableFlush(memtable, filtered); err != nil { + if err := db.handleFlushTask(task); err != nil { db.opt.Errorf("While trying to flush memtable: %v", err) return err } diff --git a/db_test.go b/db_test.go index 21293467d..0e357b82e 100644 --- a/db_test.go +++ b/db_test.go @@ -1529,7 +1529,7 @@ func TestGetSetDeadlock(t *testing.T) { db, err := Open(DefaultOptions(dir).WithValueLogFileSize(1 << 20)) require.NoError(t, err) - defer func() { require.NoError(t, db.Close()) }() + defer db.Close() val := make([]byte, 1<<19) key := []byte("key1") @@ -1571,7 +1571,7 @@ func TestWriteDeadlock(t *testing.T) { db, err := Open(DefaultOptions(dir).WithValueLogFileSize(10 << 20)) require.NoError(t, err) - defer func() { require.NoError(t, db.Close()) }() + defer db.Close() print := func(count *int) { *count++ if *count%100 == 0 { @@ -1951,7 +1951,7 @@ func ExampleOpen() { if err != nil { panic(err) } - defer func() { y.Check(db.Close()) }() + defer db.Close() err = db.View(func(txn *Txn) error { _, err := txn.Get([]byte("key")) @@ -2007,7 +2007,7 @@ func ExampleTxn_NewIterator() { if err != nil { panic(err) } - defer func() { y.Check(db.Close()) }() + defer db.Close() bkey := func(i int) []byte { return []byte(fmt.Sprintf("%09d", i)) @@ -2027,7 +2027,8 @@ func ExampleTxn_NewIterator() { } } - if err := txn.Commit(); err != nil { + err = txn.Commit() + if err != nil { panic(err) } @@ -2059,7 +2060,7 @@ func TestSyncForRace(t *testing.T) { db, err := Open(DefaultOptions(dir).WithSyncWrites(false)) require.NoError(t, err) - defer func() { require.NoError(t, db.Close()) }() + defer db.Close() closeChan := make(chan struct{}) doneChan := make(chan struct{}) @@ -2163,14 +2164,14 @@ func TestSyncForReadingTheEntriesThatWereSynced(t *testing.T) { func TestForceFlushMemtable(t *testing.T) { dir, err := os.MkdirTemp("", "badger-test") - require.NoError(t, err, "temp dir for badger could not be created") + require.NoError(t, err, "temp dir for badger count not be created") ops := getTestOptions(dir) ops.ValueLogMaxEntries = 1 db, err := Open(ops) require.NoError(t, err, "error while openning db") - defer func() { require.NoError(t, db.Close()) }() + defer db.Close() for i := 0; i < 3; i++ { err = db.Update(func(txn *Txn) error { @@ -2304,7 +2305,7 @@ func TestMinCacheSize(t *testing.T) { func TestUpdateMaxCost(t *testing.T) { dir, err := os.MkdirTemp("", "badger-test") - require.NoError(t, err, "temp dir for badger could not be created") + require.NoError(t, err, "temp dir for badger count not be created") defer os.RemoveAll(dir) ops := getTestOptions(dir).
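A minimal usage sketch of the skiplist handover path added above (illustrative only, not part of this diff; it assumes a *DB opened with managedTxns set, mirroring TestWriteViaSkip further down):

    // Build a skiplist sized to the DB's arena, fill it with timestamped keys,
    // and hand it to Badger. The callback runs once the skiplist has been flushed to L0.
    sl := db.NewSkiplist()
    for i := 0; i < 100; i++ {
        k := y.KeyWithTs([]byte(fmt.Sprintf("key-%03d", i)), 101)
        sl.Put(k, y.ValueStruct{Value: []byte(fmt.Sprintf("val-%03d", i))})
    }
    if err := db.HandoverSkiplist(sl, func() { db.opt.Infof("skiplist flushed") }); err != nil {
        // errNoRoom means flushChan is full; callers are expected to back off and retry.
        return err
    }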
@@ -2411,7 +2412,7 @@ func TestOpenDBReadOnly(t *testing.T) { func TestBannedPrefixes(t *testing.T) { dir, err := os.MkdirTemp("", "badger-test") - require.NoError(t, err, "temp dir for badger could not be created") + require.NoError(t, err, "temp dir for badger count not be created") defer os.RemoveAll(dir) opt := getTestOptions(dir).WithNamespaceOffset(3) diff --git a/iterator.go b/iterator.go index 0ff786db0..dfb72fc6d 100644 --- a/iterator.go +++ b/iterator.go @@ -133,7 +133,7 @@ func (item *Item) IsDeletedOrExpired() bool { // DiscardEarlierVersions returns whether the item was created with the // option to discard earlier versions of a key when multiple are available. func (item *Item) DiscardEarlierVersions() bool { - return item.meta&bitDiscardEarlierVersions > 0 + return item.meta&BitDiscardEarlierVersions > 0 } func (item *Item) yieldItemValue() ([]byte, func(), error) { diff --git a/levels.go b/levels.go index 48a954316..fd442417b 100644 --- a/levels.go +++ b/levels.go @@ -724,7 +724,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef, } lastKey = y.SafeCopy(lastKey, it.Key()) numVersions = 0 - firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0 + firstKeyHasDiscardSet = it.Value().Meta&BitDiscardEarlierVersions > 0 if len(tableKr.left) == 0 { tableKr.left = y.SafeCopy(tableKr.left, it.Key()) @@ -761,7 +761,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef, // - The `discardEarlierVersions` bit is set OR // - We've already processed `NumVersionsToKeep` number of versions // (including the current item being processed) - lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 || + lastValidVersion := vs.Meta&BitDiscardEarlierVersions > 0 || numVersions == s.kv.opt.NumVersionsToKeep if isExpired || lastValidVersion { diff --git a/levels_test.go b/levels_test.go index 29e1f833c..c93ea4dff 100644 --- a/levels_test.go +++ b/levels_test.go @@ -696,11 +696,11 @@ func TestDiscardFirstVersion(t *testing.T) { runBadgerTest(t, &opt, func(t *testing.T, db *DB) { l0 := []keyValVersion{{"foo", "bar", 1, 0}} - l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}} + l01 := []keyValVersion{{"foo", "bar", 2, BitDiscardEarlierVersions}} l02 := []keyValVersion{{"foo", "bar", 3, 0}} l03 := []keyValVersion{{"foo", "bar", 4, 0}} l04 := []keyValVersion{{"foo", "bar", 9, 0}} - l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}} + l05 := []keyValVersion{{"foo", "bar", 10, BitDiscardEarlierVersions}} // Level 0 has all the tables. createAndOpen(db, l0, 0) @@ -731,11 +731,11 @@ func TestDiscardFirstVersion(t *testing.T) { // - Version 1 is below DiscardTS and below the first "bitDiscardEarlierVersions" // marker so IT WILL BE REMOVED. 
ExpectedKeys := []keyValVersion{ - {"foo", "bar", 10, bitDiscardEarlierVersions}, + {"foo", "bar", 10, BitDiscardEarlierVersions}, {"foo", "bar", 9, 0}, {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, - {"foo", "bar", 2, bitDiscardEarlierVersions}} + {"foo", "bar", 2, BitDiscardEarlierVersions}} getAllAndCheck(t, db, ExpectedKeys) }) @@ -1049,15 +1049,15 @@ func TestSameLevel(t *testing.T) { opt.LmaxCompaction = true runBadgerTest(t, &opt, func(t *testing.T, db *DB) { l6 := []keyValVersion{ - {"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0}, + {"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0}, {"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0}, } l61 := []keyValVersion{ - {"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0}, + {"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0}, {"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0}, } l62 := []keyValVersion{ - {"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0}, + {"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0}, {"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0}, } createAndOpen(db, l6, 6) @@ -1066,11 +1066,11 @@ func TestSameLevel(t *testing.T) { require.NoError(t, db.lc.validate()) getAllAndCheck(t, db, []keyValVersion{ - {"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0}, + {"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0}, {"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0}, - {"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0}, + {"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0}, {"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0}, - {"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0}, + {"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0}, {"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0}, }) @@ -1086,11 +1086,11 @@ func TestSameLevel(t *testing.T) { db.SetDiscardTs(3) require.NoError(t, db.lc.runCompactDef(-1, 6, cdef)) getAllAndCheck(t, db, []keyValVersion{ - {"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0}, + {"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0}, {"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0}, - {"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0}, + {"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0}, {"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0}, - {"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0}, + {"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0}, {"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0}, }) @@ -1107,9 +1107,9 @@ func TestSameLevel(t *testing.T) { cdef.t.baseLevel = 1 require.NoError(t, db.lc.runCompactDef(-1, 6, cdef)) getAllAndCheck(t, db, []keyValVersion{ - {"A", "bar", 4, bitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0}, - {"B", "bar", 4, bitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0}, - {"C", "bar", 4, bitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}}) + {"A", "bar", 4, BitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0}, + {"B", "bar", 4, BitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0}, + {"C", "bar", 4, BitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}}) require.NoError(t, db.lc.validate()) }) } @@ -1255,7 +1255,7 @@ func TestStaleDataCleanup(t *testing.T) { for i := count; i > 0; i-- { var meta byte if i == 0 { - meta = bitDiscardEarlierVersions + meta = BitDiscardEarlierVersions } b.AddStaleKey(y.KeyWithTs(key, i), y.ValueStruct{Meta: meta, Value: val}, 0) } diff --git a/managed_db_test.go b/managed_db_test.go index 58acbc8dc..2606d83e9 100644 --- a/managed_db_test.go +++ b/managed_db_test.go @@ -777,6 +777,54 @@ func TestWriteBatchDuplicate(t 
*testing.T) { }) } +func TestWriteViaSkip(t *testing.T) { + key := func(i int) []byte { + return []byte(fmt.Sprintf("%10d", i)) + } + val := func(i int) []byte { + return []byte(fmt.Sprintf("%128d", i)) + } + opt := DefaultOptions("") + opt.managedTxns = true + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + s := db.NewSkiplist() + for i := 0; i < 100; i++ { + s.Put(y.KeyWithTs(key(i), math.MaxUint64), y.ValueStruct{Value: val(i)}) + } + { + // Update key timestamps by directly changing them in the skiplist. + itr := s.NewUniIterator(false) + defer itr.Close() + itr.Rewind() + for itr.Valid() { + y.SetKeyTs(itr.Key(), 101) + itr.Next() + } + } + + // Hand over skiplist to Badger. + require.NoError(t, db.HandoverSkiplist(s, nil)) + + // Read the data back. + txn := db.NewTransactionAt(101, false) + defer txn.Discard() + itr := txn.NewIterator(DefaultIteratorOptions) + defer itr.Close() + + i := 0 + for itr.Rewind(); itr.Valid(); itr.Next() { + item := itr.Item() + require.Equal(t, string(key(i)), string(item.Key())) + require.Equal(t, item.Version(), uint64(101)) + valcopy, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, val(i), valcopy) + i++ + } + require.Equal(t, 100, i) + }) +} + func TestZeroDiscardStats(t *testing.T) { N := uint64(10000) populate := func(t *testing.T, db *DB) { diff --git a/merge.go b/merge.go index afc14df9b..07dd37485 100644 --- a/merge.go +++ b/merge.go @@ -105,7 +105,7 @@ func (op *MergeOperator) compact() error { { Key: y.KeyWithTs(op.key, version), Value: val, - meta: bitDiscardEarlierVersions, + meta: BitDiscardEarlierVersions, }, } // Write value back to the DB. It is important that we do not set the bitMergeEntry bit diff --git a/options.go b/options.go index 218b94772..d7b301b2a 100644 --- a/options.go +++ b/options.go @@ -139,7 +139,7 @@ func DefaultOptions(path string) Options { NumCompactors: 4, // Run at least 2 compactors. Zero-th compactor prioritizes L0. NumLevelZeroTables: 5, NumLevelZeroTablesStall: 15, - NumMemtables: 5, + NumMemtables: 15, BloomFalsePositive: 0.01, BlockSize: 4 * 1024, SyncWrites: false, diff --git a/structs.go b/structs.go index 75aec4bc5..ba40b3196 100644 --- a/structs.go +++ b/structs.go @@ -196,7 +196,7 @@ func (e *Entry) WithMeta(meta byte) *Entry { // have a higher setting for NumVersionsToKeep (in Dgraph, we set it to infinity), you can use this // method to indicate that all the older versions can be discarded and removed during compactions. func (e *Entry) WithDiscard() *Entry { - e.meta = bitDiscardEarlierVersions + e.meta = BitDiscardEarlierVersions return e } diff --git a/txn.go b/txn.go index 50d17a5bc..9f79c0bbf 100644 --- a/txn.go +++ b/txn.go @@ -348,9 +348,30 @@ func exceedsSize(prefix string, max int64, key []byte) error { prefix, len(key), max, prefix, hex.Dump(key[:1<<10])) } -func (txn *Txn) modify(e *Entry) error { - const maxKeySize = 65000 +const maxKeySize = 65000 +const maxValSize = 1 << 20 +func ValidEntry(db *DB, key, val []byte) error { + switch { + case len(key) == 0: + return ErrEmptyKey + case bytes.HasPrefix(key, badgerPrefix): + return ErrInvalidKey + case len(key) > maxKeySize: + // Key length can't be more than uint16, as determined by table::header. To + // keep things safe and allow badger move prefix and a timestamp suffix, let's + // cut it down to 65000, instead of using 65536. 
+ return exceedsSize("Key", maxKeySize, key) + case int64(len(val)) > maxValSize: + return exceedsSize("Value", maxValSize, val) + } + if err := db.isBanned(key); err != nil { + return err + } + return nil +} + +func (txn *Txn) modify(e *Entry) error { switch { case !txn.update: return ErrReadOnlyTxn @@ -374,7 +395,6 @@ func (txn *Txn) modify(e *Entry) error { if err := txn.db.isBanned(e.Key); err != nil { return err } - if err := txn.checkSize(e); err != nil { return err } diff --git a/value.go b/value.go index 80cc4c436..6028d1497 100644 --- a/value.go +++ b/value.go @@ -38,7 +38,7 @@ var maxVlogFileSize uint32 = math.MaxUint32 const ( bitDelete byte = 1 << 0 // Set if the key has been deleted. bitValuePointer byte = 1 << 1 // Set if the value is NOT stored directly next to key. - bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded. + BitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded. // Set if item shouldn't be discarded via compactions (used by merge operator) bitMergeEntry byte = 1 << 3 // The MSB 2 bits are for transactions. diff --git a/value_test.go b/value_test.go index 7999cc968..1ec5cc939 100644 --- a/value_test.go +++ b/value_test.go @@ -125,6 +125,8 @@ func TestValueBasic(t *testing.T) { } func TestValueGCManaged(t *testing.T) { + t.Skipf("Value Log is not used in managed mode.") + dir, err := os.MkdirTemp("", "badger-test") require.NoError(t, err) defer removeDir(dir) diff --git a/y/y.go b/y/y.go index 761e87038..9a225af5d 100644 --- a/y/y.go +++ b/y/y.go @@ -105,6 +105,11 @@ func Copy(a []byte) []byte { return b } +func SetKeyTs(key []byte, ts uint64) { + start := len(key) - 8 + binary.BigEndian.PutUint64(key[start:], math.MaxUint64-ts) +} + // KeyWithTs generates a new key by appending ts to key. func KeyWithTs(key []byte, ts uint64) []byte { out := make([]byte, len(key)+8)
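To round off the two new helpers, ValidEntry and y.SetKeyTs, here is a hedged sketch of how they compose when preparing a skiplist for handover; key, val, sl, and commitTs are assumed to exist, and the stamping loop mirrors TestWriteViaSkip above:

    // Run the same key/value checks a normal write would get before the pair
    // ever reaches the skiplist.
    if err := ValidEntry(db, key, val); err != nil {
        return err
    }
    // Insert with a placeholder timestamp; KeyWithTs appends math.MaxUint64-ts big-endian.
    sl.Put(y.KeyWithTs(key, math.MaxUint64), y.ValueStruct{Value: val})

    // Once the commit timestamp is known, rewrite the 8-byte suffix in place.
    // SetKeyTs overwrites it with math.MaxUint64-commitTs, the same encoding KeyWithTs uses.
    itr := sl.NewUniIterator(false)
    defer itr.Close()
    for itr.Rewind(); itr.Valid(); itr.Next() {
        y.SetKeyTs(itr.Key(), commitTs)
    }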