From 4e7c74544d1c3b28265d098359b6c650bc273846 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Thu, 3 Jul 2025 07:48:09 +0530 Subject: [PATCH 1/4] Revert "chore(memtable): refactor code for memtable flush (#1866)" This reverts commit 94d6168e374708843aa191c32c36435ba0d13c99. --- db.go | 108 +++++++++++++++++++++++++++++------------------------ db_test.go | 21 ++++++----- 2 files changed, 71 insertions(+), 58 deletions(-) diff --git a/db.go b/db.go index 658963759..c9a6a20de 100644 --- a/db.go +++ b/db.go @@ -102,7 +102,7 @@ type DB struct { lc *levelsController vlog valueLog writeCh chan *request - flushChan chan *memTable // For flushing memtables. + flushChan chan flushTask // For flushing memtables. closeOnce sync.Once // For closing DB only once. blockWrites atomic.Int32 @@ -233,7 +233,7 @@ func Open(opt Options) (*DB, error) { db := &DB{ imm: make([]*memTable, 0, opt.NumMemtables), - flushChan: make(chan *memTable, opt.NumMemtables), + flushChan: make(chan flushTask, opt.NumMemtables), writeCh: make(chan *request, kvWriteChCapacity), opt: opt, manifest: manifestFile, @@ -347,11 +347,11 @@ func Open(opt Options) (*DB, error) { db.closers.memtable = z.NewCloser(1) go func() { - db.flushMemtable(db.closers.memtable) // Need levels controller to be up. + _ = db.flushMemtable(db.closers.memtable) // Need levels controller to be up. }() // Flush them to disk asap. for _, mt := range db.imm { - db.flushChan <- mt + db.flushChan <- flushTask{mt: mt} } } // We do increment nextTxnTs below. So, no need to do it here. @@ -565,12 +565,12 @@ func (db *DB) close() (err error) { } else { db.opt.Debugf("Flushing memtable") for { - pushedMemTable := func() bool { + pushedFlushTask := func() bool { db.lock.Lock() defer db.lock.Unlock() y.AssertTrue(db.mt != nil) select { - case db.flushChan <- db.mt: + case db.flushChan <- flushTask{mt: db.mt}: db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm. db.mt = nil // Will segfault if we try writing! db.opt.Debugf("pushed to flush chan\n") @@ -583,7 +583,7 @@ func (db *DB) close() (err error) { } return false }() - if pushedMemTable { + if pushedFlushTask { break } time.Sleep(10 * time.Millisecond) @@ -852,7 +852,6 @@ func (db *DB) writeRequests(reqs []*request) error { } count += len(b.Entries) var i uint64 - var err error for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() { i++ if i%100 == 0 { @@ -1019,7 +1018,7 @@ func (db *DB) ensureRoomForWrite() error { } select { - case db.flushChan <- db.mt: + case db.flushChan <- flushTask{mt: db.mt}: db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n", db.mt.sl.MemSize(), len(db.flushChan)) // We manage to push this task. Let's modify imm. @@ -1041,12 +1040,12 @@ func arenaSize(opt Options) int64 { } // buildL0Table builds a new table from the memtable. 
-func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder {
+func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
+	iter := ft.mt.sl.NewIterator()
 	defer iter.Close()
-
 	b := table.NewTableBuilder(bopts)
-	for iter.Rewind(); iter.Valid(); iter.Next() {
-		if len(dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), dropPrefixes) {
+	for iter.SeekToFirst(); iter.Valid(); iter.Next() {
+		if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
 			continue
 		}
 		vs := iter.Value()
@@ -1056,15 +1055,23 @@ func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *
 		}
 		b.Add(iter.Key(), iter.Value(), vp.Len)
 	}
-
 	return b
 }
 
-// handleMemTableFlush must be run serially.
-func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
+type flushTask struct {
+	mt           *memTable
+	dropPrefixes [][]byte
+}
+
+// handleFlushTask must be run serially.
+func (db *DB) handleFlushTask(ft flushTask) error {
+	// There can be a scenario, when empty memtable is flushed.
+	if ft.mt.sl.Empty() {
+		return nil
+	}
+
 	bopts := buildTableOptions(db)
-	itr := mt.sl.NewUniIterator(false)
-	builder := buildL0Table(itr, nil, bopts)
+	builder := buildL0Table(ft, bopts)
 	defer builder.Close()
 
 	// buildL0Table can return nil if the none of the items in the skiplist are
@@ -1093,39 +1100,39 @@
 	return err
 }
 
-// flushMemtable must keep running until we send it an empty memtable. If there
-// are errors during handling the memtable flush, we'll retry indefinitely.
-func (db *DB) flushMemtable(lc *z.Closer) {
+// flushMemtable must keep running until we send it an empty flushTask. If there
+// are errors during handling the flush task, we'll retry indefinitely.
+func (db *DB) flushMemtable(lc *z.Closer) error {
 	defer lc.Done()
 
-	for mt := range db.flushChan {
-		if mt == nil {
+	for ft := range db.flushChan {
+		if ft.mt == nil {
+			// We close db.flushChan now, instead of sending a nil ft.mt.
 			continue
 		}
 		for {
-			if err := db.handleMemTableFlush(mt, nil); err != nil {
-				// Encountered error. Retry indefinitely.
-				db.opt.Errorf("error flushing memtable to disk: %v, retrying", err)
-				time.Sleep(time.Second)
-				continue
-			}
+			err := db.handleFlushTask(ft)
+			if err == nil {
+				// Update s.imm. Need a lock.
+				db.lock.Lock()
+				// This is a single-threaded operation. ft.mt corresponds to the head of
+				// db.imm list. Once we flush it, we advance db.imm. The next ft.mt
+				// which would arrive here would match db.imm[0], because we acquire a
+				// lock over DB when pushing to flushChan.
+				// TODO: This logic is dirty AF. Any change and this could easily break.
+				y.AssertTrue(ft.mt == db.imm[0])
+				db.imm = db.imm[1:]
+				ft.mt.DecrRef() // Return memory.
+				db.lock.Unlock()
 
-			// Update s.imm. Need a lock.
-			db.lock.Lock()
-			// This is a single-threaded operation. mt corresponds to the head of
-			// db.imm list. Once we flush it, we advance db.imm. The next mt
-			// which would arrive here would match db.imm[0], because we acquire a
-			// lock over DB when pushing to flushChan.
-			// TODO: This logic is dirty AF. Any change and this could easily break.
-			y.AssertTrue(mt == db.imm[0])
-			db.imm = db.imm[1:]
-			mt.DecrRef() // Return memory.
-			// unlock
-			db.lock.Unlock()
-			break
+				break
+			}
+			// Encountered error. Retry indefinitely.
+			db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
+			time.Sleep(time.Second)
 		}
 	}
+	return nil
 }
 
 func exists(path string) (bool, error) {
@@ -1545,10 +1552,10 @@ func (db *DB) startCompactions() {
 func (db *DB) startMemoryFlush() {
 	// Start memory fluhser.
 	if db.closers.memtable != nil {
-		db.flushChan = make(chan *memTable, db.opt.NumMemtables)
+		db.flushChan = make(chan flushTask, db.opt.NumMemtables)
 		db.closers.memtable = z.NewCloser(1)
 		go func() {
-			db.flushMemtable(db.closers.memtable)
+			_ = db.flushMemtable(db.closers.memtable)
 		}()
 	}
 }
@@ -1651,7 +1658,7 @@ func (db *DB) prepareToDrop() (func(), error) {
 		panic("Attempting to drop data in read-only mode.")
 	}
 	// In order prepare for drop, we need to block the incoming writes and
-	// write it to db. Then, flush all the pending memtable. So that, we
+	// write it to db. Then, flush all the pending flushtask. So that, we
 	// don't miss any entries.
 	if err := db.blockWrite(); err != nil {
 		return func() {}, err
@@ -1700,7 +1707,7 @@ func (db *DB) dropAll() (func(), error) {
 	if err != nil {
 		return f, err
 	}
-	// prepareToDrop will stop all the incoming write and flushes any pending memtables.
+	// prepareToDrop will stop all the incomming write and flushes any pending flush tasks.
 	// Before we drop, we'll stop the compaction because anyways all the datas are going to
 	// be deleted.
 	db.stopCompactions()
@@ -1782,8 +1789,13 @@ func (db *DB) DropPrefix(prefixes ...[]byte) error {
 			memtable.DecrRef()
 			continue
 		}
+		task := flushTask{
+			mt: memtable,
+			// Ensure that the head of value log gets persisted to disk.
+			dropPrefixes: filtered,
+		}
 		db.opt.Debugf("Flushing memtable")
-		if err := db.handleMemTableFlush(memtable, filtered); err != nil {
+		if err := db.handleFlushTask(task); err != nil {
 			db.opt.Errorf("While trying to flush memtable: %v", err)
 			return err
 		}
diff --git a/db_test.go b/db_test.go
index 21293467d..0e357b82e 100644
--- a/db_test.go
+++ b/db_test.go
@@ -1529,7 +1529,7 @@ func TestGetSetDeadlock(t *testing.T) {
 
 	db, err := Open(DefaultOptions(dir).WithValueLogFileSize(1 << 20))
 	require.NoError(t, err)
-	defer func() { require.NoError(t, db.Close()) }()
+	defer db.Close()
 
 	val := make([]byte, 1<<19)
 	key := []byte("key1")
@@ -1571,7 +1571,7 @@ func TestWriteDeadlock(t *testing.T) {
 
 	db, err := Open(DefaultOptions(dir).WithValueLogFileSize(10 << 20))
 	require.NoError(t, err)
-	defer func() { require.NoError(t, db.Close()) }()
+	defer db.Close()
 	print := func(count *int) {
 		*count++
 		if *count%100 == 0 {
@@ -1951,7 +1951,7 @@ func ExampleOpen() {
 	if err != nil {
 		panic(err)
 	}
-	defer func() { y.Check(db.Close()) }()
+	defer db.Close()
 
 	err = db.View(func(txn *Txn) error {
 		_, err := txn.Get([]byte("key"))
@@ -2007,7 +2007,7 @@ func ExampleTxn_NewIterator() {
 	if err != nil {
 		panic(err)
 	}
-	defer func() { y.Check(db.Close()) }()
+	defer db.Close()
 
 	bkey := func(i int) []byte {
 		return []byte(fmt.Sprintf("%09d", i))
@@ -2027,7 +2027,8 @@ func ExampleTxn_NewIterator() {
 		}
 	}
 
-	if err := txn.Commit(); err != nil {
+	err = txn.Commit()
+	if err != nil {
 		panic(err)
 	}
 
@@ -2059,7 +2060,7 @@ func TestSyncForRace(t *testing.T) {
 
 	db, err := Open(DefaultOptions(dir).WithSyncWrites(false))
 	require.NoError(t, err)
-	defer func() { require.NoError(t, db.Close()) }()
+	defer db.Close()
 
 	closeChan := make(chan struct{})
 	doneChan := make(chan struct{})
@@ -2163,14 +2164,14 @@ func TestSyncForReadingTheEntriesThatWereSynced(t *testing.T) {
 
 func TestForceFlushMemtable(t *testing.T) {
 	dir, err := os.MkdirTemp("", "badger-test")
-	require.NoError(t, err, "temp dir for badger could not be created")
+	require.NoError(t, err, "temp dir for badger count not be created")
 
 	ops := getTestOptions(dir)
 	ops.ValueLogMaxEntries = 1
 
 	db, err := Open(ops)
 	require.NoError(t, err, "error while openning db")
-	defer func() { require.NoError(t, db.Close()) }()
+	defer db.Close()
 
 	for i := 0; i < 3; i++ {
 		err = db.Update(func(txn *Txn) error {
@@ -2304,7 +2305,7 @@ func TestMinCacheSize(t *testing.T) {
 
 func TestUpdateMaxCost(t *testing.T) {
 	dir, err := os.MkdirTemp("", "badger-test")
-	require.NoError(t, err, "temp dir for badger could not be created")
+	require.NoError(t, err, "temp dir for badger count not be created")
 	defer os.RemoveAll(dir)
 
 	ops := getTestOptions(dir).
@@ -2411,7 +2412,7 @@ func TestOpenDBReadOnly(t *testing.T) {
 
 func TestBannedPrefixes(t *testing.T) {
 	dir, err := os.MkdirTemp("", "badger-test")
-	require.NoError(t, err, "temp dir for badger could not be created")
+	require.NoError(t, err, "temp dir for badger count not be created")
 	defer os.RemoveAll(dir)
 
 	opt := getTestOptions(dir).WithNamespaceOffset(3)

From 4fac80618b0dc6f47b499042177fa57dee415891 Mon Sep 17 00:00:00 2001
From: Manish R Jain
Date: Mon, 26 Apr 2021 21:37:45 -0700
Subject: [PATCH 2/4] feat(Skiplist): Introduce a way to hand over skiplists to
 Badger (#1696)

In Dgraph, we already use the Raft write-ahead log. Also, when we commit
transactions, we update tens of thousands of keys in one go. To optimize this
write path, this PR introduces a way to directly hand over a Skiplist to
Badger, short-circuiting Badger's Value Log and WAL.

This feature allows Dgraph to generate Skiplists while processing mutations and
just hand them over to Badger during commits. It also accepts a callback which
can be run when the Skiplist is written to disk. This is useful for determining
when to create a snapshot in Dgraph.
---
 backup_test.go     |   2 +-
 db.go              | 134 +++++++++++++++++++++++++++++++++++++--------
 iterator.go        |   2 +-
 levels.go          |   4 +-
 levels_test.go     |  34 ++++++------
 managed_db_test.go |  48 ++++++++++++++++
 merge.go           |   2 +-
 options.go         |   2 +-
 structs.go         |   2 +-
 txn.go             |  26 ++++++++-
 value.go           |   2 +-
 value_test.go      |   2 +
 y/y.go             |   5 ++
 13 files changed, 215 insertions(+), 50 deletions(-)

diff --git a/backup_test.go b/backup_test.go
index 2be712e38..b151e39b4 100644
--- a/backup_test.go
+++ b/backup_test.go
@@ -435,7 +435,7 @@ func TestBackupLoadIncremental(t *testing.T) {
 			if err := txn.SetEntry(entry); err != nil {
 				return err
 			}
-			updates[i] = bitDiscardEarlierVersions
+			updates[i] = BitDiscardEarlierVersions
 		}
 		return nil
 	})
diff --git a/db.go b/db.go
index c9a6a20de..84d1403f7 100644
--- a/db.go
+++ b/db.go
@@ -783,16 +783,9 @@ var requestPool = sync.Pool{
 }
 
 func (db *DB) writeToLSM(b *request) error {
-	// We should check the length of b.Prts and b.Entries only when badger is not
-	// running in InMemory mode. In InMemory mode, we don't write anything to the
-	// value log and that's why the length of b.Ptrs will always be zero.
-	if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
-		return errors.Errorf("Ptrs and Entries don't match: %+v", b)
-	}
-
 	for i, entry := range b.Entries {
 		var err error
-		if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
+		if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
 			// Will include deletion / tombstone case.
 			err = db.mt.Put(entry.Key,
 				y.ValueStruct{
@@ -838,10 +831,13 @@ func (db *DB) writeRequests(reqs []*request) error {
 		}
 	}
 	db.opt.Debugf("writeRequests called. Writing to value log")
-	err := db.vlog.write(reqs)
-	if err != nil {
-		done(err)
-		return err
+	if !db.opt.managedTxns {
+		// Don't do value log writes in managed mode.
+		err := db.vlog.write(reqs)
+		if err != nil {
+			done(err)
+			return err
+		}
 	}
 
 	db.opt.Debugf("Writing to memtable")
@@ -852,6 +848,7 @@ func (db *DB) writeRequests(reqs []*request) error {
 		}
 		count += len(b.Entries)
 		var i uint64
+		var err error
 		for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
 			i++
 			if i%100 == 0 {
@@ -1035,16 +1032,61 @@ func (db *DB) ensureRoomForWrite() error {
 	}
 }
 
+func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
+	if !db.opt.managedTxns {
+		panic("Handover Skiplist is only available in managed mode.")
+	}
+	db.lock.Lock()
+	defer db.lock.Unlock()
+
+	// If we have some data in db.mt, we should push that first, so the ordering of writes is
+	// maintained.
+	if !db.mt.sl.Empty() {
+		sz := db.mt.sl.MemSize()
+		db.opt.Infof("Handover found %d B data in current memtable. Pushing to flushChan.", sz)
+		var err error
+		select {
+		case db.flushChan <- flushTask{mt: db.mt}:
+			db.imm = append(db.imm, db.mt)
+			db.mt, err = db.newMemTable()
+			if err != nil {
+				return y.Wrapf(err, "cannot push current memtable")
+			}
+		default:
+			return errNoRoom
+		}
+	}
+
+	mt := &memTable{sl: skl}
+	select {
+	case db.flushChan <- flushTask{mt: mt, cb: callback}:
+		db.imm = append(db.imm, mt)
+		return nil
+	default:
+		return errNoRoom
+	}
+}
+
 func arenaSize(opt Options) int64 {
 	return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
 }
 
+func (db *DB) NewSkiplist() *skl.Skiplist {
+	return skl.NewSkiplist(arenaSize(db.opt))
+}
+
 // buildL0Table builds a new table from the memtable.
 func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
-	iter := ft.mt.sl.NewIterator()
+	var iter y.Iterator
+	if ft.itr != nil {
+		iter = ft.itr
+	} else {
+		iter = ft.mt.sl.NewUniIterator(false)
+	}
 	defer iter.Close()
+
 	b := table.NewTableBuilder(bopts)
-	for iter.SeekToFirst(); iter.Valid(); iter.Next() {
+	for iter.Rewind(); iter.Valid(); iter.Next() {
 		if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
 			continue
 		}
@@ -1060,16 +1102,14 @@
 
 type flushTask struct {
 	mt           *memTable
+	cb           func()
+	itr          y.Iterator
 	dropPrefixes [][]byte
 }
 
 // handleFlushTask must be run serially.
 func (db *DB) handleFlushTask(ft flushTask) error {
-	// There can be a scenario, when empty memtable is flushed.
-	if ft.mt.sl.Empty() {
-		return nil
-	}
-
+	// ft.mt could be nil with ft.itr being the valid field.
 	bopts := buildTableOptions(db)
 	builder := buildL0Table(ft, bopts)
 	defer builder.Close()
@@ -1105,11 +1145,52 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
 	defer lc.Done()
 
+	var sz int64
+	var itrs []y.Iterator
+	var mts []*memTable
+	var cbs []func()
+	slurp := func() {
+		for {
+			select {
+			case more := <-db.flushChan:
+				if more.mt == nil {
+					return
+				}
+				sl := more.mt.sl
+				itrs = append(itrs, sl.NewUniIterator(false))
+				mts = append(mts, more.mt)
+				cbs = append(cbs, more.cb)
+
+				sz += sl.MemSize()
+				if sz > db.opt.MemTableSize {
+					return
+				}
+			default:
+				return
+			}
+		}
+	}
+
 	for ft := range db.flushChan {
 		if ft.mt == nil {
 			// We close db.flushChan now, instead of sending a nil ft.mt.
 			continue
 		}
+		sz = ft.mt.sl.MemSize()
+		// Reset of itrs, mts etc. is being done below.
+ y.AssertTrue(len(itrs) == 0 && len(mts) == 0 && len(cbs) == 0) + itrs = append(itrs, ft.mt.sl.NewUniIterator(false)) + mts = append(mts, ft.mt) + cbs = append(cbs, ft.cb) + + // Pick more memtables, so we can really fill up the L0 table. + slurp() + + // db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz) + ft.mt = nil + ft.itr = table.NewMergeIterator(itrs, false) + ft.cb = nil + for { err := db.handleFlushTask(ft) if err == nil { @@ -1120,17 +1201,26 @@ func (db *DB) flushMemtable(lc *z.Closer) error { // which would arrive here would match db.imm[0], because we acquire a // lock over DB when pushing to flushChan. // TODO: This logic is dirty AF. Any change and this could easily break. - y.AssertTrue(ft.mt == db.imm[0]) - db.imm = db.imm[1:] - ft.mt.DecrRef() // Return memory. + for _, mt := range mts { + y.AssertTrue(mt == db.imm[0]) + db.imm = db.imm[1:] + mt.DecrRef() // Return memory. + } db.lock.Unlock() + for _, cb := range cbs { + if cb != nil { + cb() + } + } break } // Encountered error. Retry indefinitely. db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err) time.Sleep(time.Second) } + // Reset everything. + itrs, mts, cbs, sz = itrs[:0], mts[:0], cbs[:0], 0 } return nil } diff --git a/iterator.go b/iterator.go index 0ff786db0..dfb72fc6d 100644 --- a/iterator.go +++ b/iterator.go @@ -133,7 +133,7 @@ func (item *Item) IsDeletedOrExpired() bool { // DiscardEarlierVersions returns whether the item was created with the // option to discard earlier versions of a key when multiple are available. func (item *Item) DiscardEarlierVersions() bool { - return item.meta&bitDiscardEarlierVersions > 0 + return item.meta&BitDiscardEarlierVersions > 0 } func (item *Item) yieldItemValue() ([]byte, func(), error) { diff --git a/levels.go b/levels.go index 48a954316..fd442417b 100644 --- a/levels.go +++ b/levels.go @@ -724,7 +724,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef, } lastKey = y.SafeCopy(lastKey, it.Key()) numVersions = 0 - firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0 + firstKeyHasDiscardSet = it.Value().Meta&BitDiscardEarlierVersions > 0 if len(tableKr.left) == 0 { tableKr.left = y.SafeCopy(tableKr.left, it.Key()) @@ -761,7 +761,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef, // - The `discardEarlierVersions` bit is set OR // - We've already processed `NumVersionsToKeep` number of versions // (including the current item being processed) - lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 || + lastValidVersion := vs.Meta&BitDiscardEarlierVersions > 0 || numVersions == s.kv.opt.NumVersionsToKeep if isExpired || lastValidVersion { diff --git a/levels_test.go b/levels_test.go index 29e1f833c..c93ea4dff 100644 --- a/levels_test.go +++ b/levels_test.go @@ -696,11 +696,11 @@ func TestDiscardFirstVersion(t *testing.T) { runBadgerTest(t, &opt, func(t *testing.T, db *DB) { l0 := []keyValVersion{{"foo", "bar", 1, 0}} - l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}} + l01 := []keyValVersion{{"foo", "bar", 2, BitDiscardEarlierVersions}} l02 := []keyValVersion{{"foo", "bar", 3, 0}} l03 := []keyValVersion{{"foo", "bar", 4, 0}} l04 := []keyValVersion{{"foo", "bar", 9, 0}} - l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}} + l05 := []keyValVersion{{"foo", "bar", 10, BitDiscardEarlierVersions}} // Level 0 has all the tables. 
 		createAndOpen(db, l0, 0)
@@ -731,11 +731,11 @@ func TestDiscardFirstVersion(t *testing.T) {
 		//	- Version 1 is below DiscardTS and below the first "bitDiscardEarlierVersions"
 		//	  marker so IT WILL BE REMOVED.
 		ExpectedKeys := []keyValVersion{
-			{"foo", "bar", 10, bitDiscardEarlierVersions},
+			{"foo", "bar", 10, BitDiscardEarlierVersions},
 			{"foo", "bar", 9, 0},
 			{"foo", "bar", 4, 0},
 			{"foo", "bar", 3, 0},
-			{"foo", "bar", 2, bitDiscardEarlierVersions}}
+			{"foo", "bar", 2, BitDiscardEarlierVersions}}
 
 		getAllAndCheck(t, db, ExpectedKeys)
 	})
@@ -1049,15 +1049,15 @@ func TestSameLevel(t *testing.T) {
 	opt.LmaxCompaction = true
 	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
 		l6 := []keyValVersion{
-			{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
+			{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
 			{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
 		}
 		l61 := []keyValVersion{
-			{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
+			{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
 			{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
 		}
 		l62 := []keyValVersion{
-			{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
+			{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
 			{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
 		}
 		createAndOpen(db, l6, 6)
@@ -1066,11 +1066,11 @@ func TestSameLevel(t *testing.T) {
 		require.NoError(t, db.lc.validate())
 
 		getAllAndCheck(t, db, []keyValVersion{
-			{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
+			{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
 			{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
-			{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
+			{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
 			{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
-			{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
+			{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
 			{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
 		})
 
@@ -1086,11 +1086,11 @@ func TestSameLevel(t *testing.T) {
 		db.SetDiscardTs(3)
 		require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
 		getAllAndCheck(t, db, []keyValVersion{
-			{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
+			{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
 			{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
-			{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
+			{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
 			{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
-			{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
+			{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
 			{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
 		})
 
@@ -1107,9 +1107,9 @@ func TestSameLevel(t *testing.T) {
 		cdef.t.baseLevel = 1
 		require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
 		getAllAndCheck(t, db, []keyValVersion{
-			{"A", "bar", 4, bitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
-			{"B", "bar", 4, bitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
-			{"C", "bar", 4, bitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
+			{"A", "bar", 4, BitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
+			{"B", "bar", 4, BitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
+			{"C", "bar", 4, BitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
 		require.NoError(t, db.lc.validate())
 	})
 }
@@ -1255,7 +1255,7 @@ func TestStaleDataCleanup(t *testing.T) {
 		for i := count; i > 0; i-- {
 			var meta byte
 			if i == 0 {
-				meta = bitDiscardEarlierVersions
+				meta = BitDiscardEarlierVersions
 			}
 			b.AddStaleKey(y.KeyWithTs(key, i), y.ValueStruct{Meta: meta, Value: val}, 0)
 		}
 	}
diff --git a/managed_db_test.go b/managed_db_test.go
index 58acbc8dc..2606d83e9 100644
--- a/managed_db_test.go
+++ b/managed_db_test.go
@@ -777,6 +777,54 @@ func TestWriteBatchDuplicate(t *testing.T) {
 	})
 }
 
+func TestWriteViaSkip(t *testing.T) {
+	key := func(i int) []byte {
+		return []byte(fmt.Sprintf("%10d", i))
+	}
+	val := func(i int) []byte {
+		return []byte(fmt.Sprintf("%128d", i))
+	}
+	opt := DefaultOptions("")
+	opt.managedTxns = true
+	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
+		s := db.NewSkiplist()
+		for i := 0; i < 100; i++ {
+			s.Put(y.KeyWithTs(key(i), math.MaxUint64), y.ValueStruct{Value: val(i)})
+		}
+		{
+			// Update key timestamps by directly changing them in the skiplist.
+			itr := s.NewUniIterator(false)
+			defer itr.Close()
+			itr.Rewind()
+			for itr.Valid() {
+				y.SetKeyTs(itr.Key(), 101)
+				itr.Next()
+			}
+		}
+
+		// Hand over skiplist to Badger.
+		require.NoError(t, db.HandoverSkiplist(s, nil))
+
+		// Read the data back.
+		txn := db.NewTransactionAt(101, false)
+		defer txn.Discard()
+		itr := txn.NewIterator(DefaultIteratorOptions)
+		defer itr.Close()
+
+		i := 0
+		for itr.Rewind(); itr.Valid(); itr.Next() {
+			item := itr.Item()
+			require.Equal(t, string(key(i)), string(item.Key()))
+			require.Equal(t, item.Version(), uint64(101))
+			valcopy, err := item.ValueCopy(nil)
+			require.NoError(t, err)
+			require.Equal(t, val(i), valcopy)
+			i++
+		}
+		require.Equal(t, 100, i)
+	})
+}
+
 func TestZeroDiscardStats(t *testing.T) {
 	N := uint64(10000)
 	populate := func(t *testing.T, db *DB) {
diff --git a/merge.go b/merge.go
index afc14df9b..07dd37485 100644
--- a/merge.go
+++ b/merge.go
@@ -105,7 +105,7 @@ func (op *MergeOperator) compact() error {
 		{
 			Key:   y.KeyWithTs(op.key, version),
 			Value: val,
-			meta:  bitDiscardEarlierVersions,
+			meta:  BitDiscardEarlierVersions,
 		},
 	}
 	// Write value back to the DB. It is important that we do not set the bitMergeEntry bit
diff --git a/options.go b/options.go
index 218b94772..d7b301b2a 100644
--- a/options.go
+++ b/options.go
@@ -139,7 +139,7 @@ func DefaultOptions(path string) Options {
 		NumCompactors:           4, // Run at least 2 compactors. Zero-th compactor prioritizes L0.
 		NumLevelZeroTables:      5,
 		NumLevelZeroTablesStall: 15,
-		NumMemtables:            5,
+		NumMemtables:            15,
 		BloomFalsePositive:      0.01,
 		BlockSize:               4 * 1024,
 		SyncWrites:              false,
diff --git a/structs.go b/structs.go
index 75aec4bc5..ba40b3196 100644
--- a/structs.go
+++ b/structs.go
@@ -196,7 +196,7 @@ func (e *Entry) WithMeta(meta byte) *Entry {
 // have a higher setting for NumVersionsToKeep (in Dgraph, we set it to infinity), you can use this
 // method to indicate that all the older versions can be discarded and removed during compactions.
 func (e *Entry) WithDiscard() *Entry {
-	e.meta = bitDiscardEarlierVersions
+	e.meta = BitDiscardEarlierVersions
 	return e
 }
diff --git a/txn.go b/txn.go
index 50d17a5bc..9f79c0bbf 100644
--- a/txn.go
+++ b/txn.go
@@ -348,9 +348,30 @@ func exceedsSize(prefix string, max int64, key []byte) error {
 		prefix, len(key), max, prefix, hex.Dump(key[:1<<10]))
 }
 
-func (txn *Txn) modify(e *Entry) error {
-	const maxKeySize = 65000
+const maxKeySize = 65000
+const maxValSize = 1 << 20
 
+func ValidEntry(db *DB, key, val []byte) error {
+	switch {
+	case len(key) == 0:
+		return ErrEmptyKey
+	case bytes.HasPrefix(key, badgerPrefix):
+		return ErrInvalidKey
+	case len(key) > maxKeySize:
+		// Key length can't be more than uint16, as determined by table::header. To
+		// keep things safe and allow badger move prefix and a timestamp suffix, let's
+		// cut it down to 65000, instead of using 65536.
+		return exceedsSize("Key", maxKeySize, key)
+	case int64(len(val)) > maxValSize:
+		return exceedsSize("Value", maxValSize, val)
+	}
+	if err := db.isBanned(key); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (txn *Txn) modify(e *Entry) error {
 	switch {
 	case !txn.update:
 		return ErrReadOnlyTxn
@@ -374,7 +395,6 @@ func (txn *Txn) modify(e *Entry) error {
 	if err := txn.db.isBanned(e.Key); err != nil {
 		return err
 	}
-
 	if err := txn.checkSize(e); err != nil {
 		return err
 	}
diff --git a/value.go b/value.go
index 80cc4c436..6028d1497 100644
--- a/value.go
+++ b/value.go
@@ -38,7 +38,7 @@ var maxVlogFileSize uint32 = math.MaxUint32
 const (
 	bitDelete                 byte = 1 << 0 // Set if the key has been deleted.
 	bitValuePointer           byte = 1 << 1 // Set if the value is NOT stored directly next to key.
-	bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
+	BitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
 	// Set if item shouldn't be discarded via compactions (used by merge operator)
 	bitMergeEntry byte = 1 << 3
 	// The MSB 2 bits are for transactions.
diff --git a/value_test.go b/value_test.go
index 7999cc968..1ec5cc939 100644
--- a/value_test.go
+++ b/value_test.go
@@ -125,6 +125,8 @@ func TestValueBasic(t *testing.T) {
 }
 
 func TestValueGCManaged(t *testing.T) {
+	t.Skipf("Value Log is not used in managed mode.")
+
 	dir, err := os.MkdirTemp("", "badger-test")
 	require.NoError(t, err)
 	defer removeDir(dir)
diff --git a/y/y.go b/y/y.go
index 761e87038..9a225af5d 100644
--- a/y/y.go
+++ b/y/y.go
@@ -105,6 +105,11 @@ func Copy(a []byte) []byte {
 	return b
 }
 
+func SetKeyTs(key []byte, ts uint64) {
+	start := len(key) - 8
+	binary.BigEndian.PutUint64(key[start:], math.MaxUint64-ts)
+}
+
 // KeyWithTs generates a new key by appending ts to key.
 func KeyWithTs(key []byte, ts uint64) []byte {
 	out := make([]byte, len(key)+8)

From 54d6ec5db1a78ae290562e8ddcc98aedfbdad530 Mon Sep 17 00:00:00 2001
From: Harshil Goel
Date: Thu, 3 Jul 2025 08:09:13 +0530
Subject: [PATCH 3/4] added changes

---
 skl/arena.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/skl/arena.go b/skl/arena.go
index 33c448482..ac655cf9f 100644
--- a/skl/arena.go
+++ b/skl/arena.go
@@ -6,6 +6,7 @@
 package skl
 
 import (
+	"fmt"
 	"sync/atomic"
 	"unsafe"
 
@@ -101,6 +102,7 @@ func (s *Arena) getNode(offset uint32) *node {
 
 // getKey returns byte slice at offset.
 func (s *Arena) getKey(offset uint32, size uint16) []byte {
+	fmt.Println("GET KEY", offset, size)
 	return s.buf[offset : offset+uint32(size)]
 }

From 64c6bd6af8912150793224172f0a088e8478d97b Mon Sep 17 00:00:00 2001
From: Harshil Goel
Date: Thu, 3 Jul 2025 23:21:53 +0530
Subject: [PATCH 4/4] added changes

---
 skl/arena.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/skl/arena.go b/skl/arena.go
index ac655cf9f..33c448482 100644
--- a/skl/arena.go
+++ b/skl/arena.go
@@ -6,7 +6,6 @@
 package skl
 
 import (
-	"fmt"
 	"sync/atomic"
 	"unsafe"
 
@@ -102,7 +101,6 @@ func (s *Arena) getNode(offset uint32) *node {
 
 // getKey returns byte slice at offset.
 func (s *Arena) getKey(offset uint32, size uint16) []byte {
-	fmt.Println("GET KEY", offset, size)
 	return s.buf[offset : offset+uint32(size)]
 }
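
Taken together, the series gives managed-mode callers a write path that skips both the value log and Badger's own WAL. Below is a minimal usage sketch of that handover API, assuming the github.com/dgraph-io/badger/v4 module path and OpenManaged to obtain a managed-mode DB (HandoverSkiplist panics outside managed mode, and opt.managedTxns is only settable in-package, as TestWriteViaSkip above does). The directory, key format, and commit timestamp 101 are illustrative only; HandoverSkiplist, NewSkiplist, y.KeyWithTs, and NewTransactionAt are the APIs from PATCH 2/4.

package main

import (
	"fmt"

	badger "github.com/dgraph-io/badger/v4"
	"github.com/dgraph-io/badger/v4/y"
)

func main() {
	// Managed mode: the caller owns timestamps and durability (e.g. a Raft WAL).
	db, err := badger.OpenManaged(badger.DefaultOptions("/tmp/badger-handover"))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// NewSkiplist sizes the arena the same way Badger sizes its own memtables
	// (MemTableSize plus batch overheads; see arenaSize in PATCH 2/4).
	s := db.NewSkiplist()
	for i := 0; i < 100; i++ {
		// Keys carry the commit timestamp (101 here). TestWriteViaSkip instead
		// writes at math.MaxUint64 and later rewrites timestamps with
		// y.SetKeyTs, which suits callers that learn the commit ts afterwards.
		k := y.KeyWithTs([]byte(fmt.Sprintf("key-%05d", i)), 101)
		s.Put(k, y.ValueStruct{Value: []byte(fmt.Sprintf("val-%05d", i))})
	}

	// Hand the skiplist over. The callback runs once it has been flushed to
	// L0, which is the signal Dgraph uses to decide when to take a snapshot.
	// A non-nil error means the flush channel is full; back off and retry.
	flushed := make(chan struct{})
	if err := db.HandoverSkiplist(s, func() { close(flushed) }); err != nil {
		panic(err)
	}
	<-flushed

	// The entries are readable at or above the commit timestamp.
	txn := db.NewTransactionAt(101, false)
	defer txn.Discard()
	item, err := txn.Get([]byte("key-00000"))
	if err != nil {
		panic(err)
	}
	v, _ := item.ValueCopy(nil)
	fmt.Printf("%s => %s (version %d)\n", item.Key(), v, item.Version())
}

Note the ordering guard in HandoverSkiplist itself: any data already sitting in db.mt is pushed to flushChan first, so handed-over writes can never be reordered ahead of earlier writes that went through the normal path.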