2 changes: 1 addition & 1 deletion backup_test.go
@@ -435,7 +435,7 @@ func TestBackupLoadIncremental(t *testing.T) {
if err := txn.SetEntry(entry); err != nil {
return err
}
- updates[i] = bitDiscardEarlierVersions
+ updates[i] = BitDiscardEarlierVersions
}
return nil
})
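The only change above is exporting bitDiscardEarlierVersions as BitDiscardEarlierVersions. That lets code outside the package, such as a caller preparing a skiplist for handover (see db.go below), mark a key so that compactions may drop its earlier versions. A minimal sketch, assuming badger's y package and a skiplist sl built for handover; the key, timestamp and value are invented for illustration:

    // Hypothetical: tag the entry so earlier versions of this key can be
    // discarded during compaction, using the newly exported constant.
    sl.Put(y.KeyWithTs([]byte("profile/42"), commitTs), y.ValueStruct{
        Value: []byte("latest-state"),
        Meta:  badger.BitDiscardEarlierVersions,
    })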
214 changes: 158 additions & 56 deletions db.go
@@ -102,7 +102,7 @@ type DB struct {
lc *levelsController
vlog valueLog
writeCh chan *request
- flushChan chan *memTable // For flushing memtables.
+ flushChan chan flushTask // For flushing memtables.
closeOnce sync.Once // For closing DB only once.

blockWrites atomic.Int32
@@ -233,7 +233,7 @@ func Open(opt Options) (*DB, error) {

db := &DB{
imm: make([]*memTable, 0, opt.NumMemtables),
- flushChan: make(chan *memTable, opt.NumMemtables),
+ flushChan: make(chan flushTask, opt.NumMemtables),
writeCh: make(chan *request, kvWriteChCapacity),
opt: opt,
manifest: manifestFile,
@@ -347,11 +347,11 @@ func Open(opt Options) (*DB, error) {

db.closers.memtable = z.NewCloser(1)
go func() {
- db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
+ _ = db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
}()
// Flush them to disk asap.
for _, mt := range db.imm {
- db.flushChan <- mt
+ db.flushChan <- flushTask{mt: mt}
}
}
// We do increment nextTxnTs below. So, no need to do it here.
@@ -565,12 +565,12 @@ func (db *DB) close() (err error) {
} else {
db.opt.Debugf("Flushing memtable")
for {
- pushedMemTable := func() bool {
+ pushedFlushTask := func() bool {
db.lock.Lock()
defer db.lock.Unlock()
y.AssertTrue(db.mt != nil)
select {
- case db.flushChan <- db.mt:
+ case db.flushChan <- flushTask{mt: db.mt}:
db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
db.mt = nil // Will segfault if we try writing!
db.opt.Debugf("pushed to flush chan\n")
@@ -583,7 +583,7 @@ }
}
return false
}()
- if pushedMemTable {
+ if pushedFlushTask {
break
}
time.Sleep(10 * time.Millisecond)
@@ -783,16 +783,9 @@ var requestPool = sync.Pool{
}

func (db *DB) writeToLSM(b *request) error {
- // We should check the length of b.Prts and b.Entries only when badger is not
- // running in InMemory mode. In InMemory mode, we don't write anything to the
- // value log and that's why the length of b.Ptrs will always be zero.
- if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
- return errors.Errorf("Ptrs and Entries don't match: %+v", b)
- }
-
for i, entry := range b.Entries {
var err error
- if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
+ if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
// Will include deletion / tombstone case.
err = db.mt.Put(entry.Key,
y.ValueStruct{
@@ -838,10 +831,13 @@ func (db *DB) writeRequests(reqs []*request) error {
}
}
db.opt.Debugf("writeRequests called. Writing to value log")
- err := db.vlog.write(reqs)
- if err != nil {
- done(err)
- return err
+ if !db.opt.managedTxns {
+ // Don't do value log writes in managed mode.
+ err := db.vlog.write(reqs)
+ if err != nil {
+ done(err)
+ return err
+ }
}

db.opt.Debugf("Writing to memtable")
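The writeToLSM and writeRequests hunks work together: in managed mode the value log is bypassed, so b.Ptrs is never populated, which is why the old Ptrs/Entries length check above had to go and why managed-mode entries always take the inline branch. A condensed paraphrase of the resulting write path; the vlog-pointer branch is reconstructed from the unchanged parts of the file that this diff elides, so treat it as illustrative rather than verbatim:

    if !db.opt.managedTxns {
        // Only normal-mode writes touch the value log.
        if err := db.vlog.write(reqs); err != nil {
            return err
        }
    }
    for i, entry := range b.Entries {
        var err error
        if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
            // Value stored inline in the memtable: always the case in managed mode.
            err = db.mt.Put(entry.Key, y.ValueStruct{Value: entry.Value, Meta: entry.meta})
        } else {
            // Value stays in the value log; store an encoded pointer to it.
            err = db.mt.Put(entry.Key, y.ValueStruct{
                Value: b.Ptrs[i].Encode(),
                Meta:  entry.meta | bitValuePointer,
            })
        }
        if err != nil {
            return y.Wrapf(err, "while writing to memTable")
        }
    }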
@@ -1019,7 +1015,7 @@ func (db *DB) ensureRoomForWrite() error {
}

select {
- case db.flushChan <- db.mt:
+ case db.flushChan <- flushTask{mt: db.mt}:
db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
db.mt.sl.MemSize(), len(db.flushChan))
// We manage to push this task. Let's modify imm.
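Note the select/default idiom here: the memtable goes onto flushChan only if the channel has room, so the caller fails fast with errNoRoom instead of blocking while holding db.lock. The same pattern appears in close() above and in HandoverSkiplist below. A generic, self-contained sketch of the idiom, with all names invented:

    package main

    import "errors"

    var errNoRoom = errors.New("no room for write")

    // tryEnqueue attempts a non-blocking send. A full channel returns
    // errNoRoom immediately rather than stalling the caller.
    func tryEnqueue(ch chan<- string, task string) error {
        select {
        case ch <- task:
            return nil
        default:
            return errNoRoom
        }
    }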
@@ -1036,17 +1032,62 @@
}
}

+ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
+ if !db.opt.managedTxns {
+ panic("Handover Skiplist is only available in managed mode.")
+ }
+ db.lock.Lock()
+ defer db.lock.Unlock()
+
+ // If we have some data in db.mt, we should push that first, so the ordering of writes is
+ // maintained.
+ if !db.mt.sl.Empty() {
+ sz := db.mt.sl.MemSize()
+ db.opt.Infof("Handover found %d B data in current memtable. Pushing to flushChan.", sz)
+ var err error
+ select {
+ case db.flushChan <- flushTask{mt: db.mt}:
+ db.imm = append(db.imm, db.mt)
+ db.mt, err = db.newMemTable()
+ if err != nil {
+ return y.Wrapf(err, "cannot push current memtable")
+ }
+ default:
+ return errNoRoom
+ }
+ }
+
+ mt := &memTable{sl: skl}
+ select {
+ case db.flushChan <- flushTask{mt: mt, cb: callback}:
+ db.imm = append(db.imm, mt)
+ return nil
+ default:
+ return errNoRoom
+ }
+ }

func arenaSize(opt Options) int64 {
return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
}

+ func (db *DB) NewSkiplist() *skl.Skiplist {
+ return skl.NewSkiplist(arenaSize(db.opt))
+ }
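Together, NewSkiplist and HandoverSkiplist let a managed-mode caller assemble a sorted batch off to the side and hand it to Badger as a ready-made memtable, bypassing the write channel and the value log entirely. A usage sketch under stated assumptions: the DB was opened with managed transactions, keys carry timestamps via y.KeyWithTs, the retry loop papers over a full flushChan, and imports (badger/v3 plus its y subpackage, fmt, log, time) are elided:

    func handoverExample(db *badger.DB, commitTs uint64) error {
        sl := db.NewSkiplist()
        for i := 0; i < 1000; i++ {
            key := y.KeyWithTs([]byte(fmt.Sprintf("key-%06d", i)), commitTs)
            sl.Put(key, y.ValueStruct{Value: []byte("payload")})
        }
        // The callback fires once the skiplist has been flushed to level 0.
        for {
            err := db.HandoverSkiplist(sl, func() { log.Println("skiplist flushed") })
            if err == nil {
                return nil
            }
            // Most likely the flush channel was full (errNoRoom); wait and retry.
            time.Sleep(10 * time.Millisecond)
        }
    }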

// buildL0Table builds a new table from the memtable.
- func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder {
+ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
+ var iter y.Iterator
+ if ft.itr != nil {
+ iter = ft.itr
+ } else {
+ iter = ft.mt.sl.NewUniIterator(false)
+ }
defer iter.Close()

b := table.NewTableBuilder(bopts)
for iter.Rewind(); iter.Valid(); iter.Next() {
- if len(dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), dropPrefixes) {
+ if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
continue
}
vs := iter.Value()
@@ -1056,15 +1097,21 @@ func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *
}
b.Add(iter.Key(), iter.Value(), vp.Len)
}

return b
}

- // handleMemTableFlush must be run serially.
- func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
+ type flushTask struct {
+ mt *memTable // Memtable to flush; nil when a merged itr is supplied instead.
+ cb func() // Called once the task has been flushed (used by HandoverSkiplist).
+ itr y.Iterator // If set, used instead of mt.sl; a merge over several memtables.
+ dropPrefixes [][]byte // Keys with any of these prefixes are dropped at flush time.
+ }
+
+ // handleFlushTask must be run serially.
+ func (db *DB) handleFlushTask(ft flushTask) error {
+ // ft.mt could be nil with ft.itr being the valid field.
bopts := buildTableOptions(db)
- itr := mt.sl.NewUniIterator(false)
- builder := buildL0Table(itr, nil, bopts)
+ builder := buildL0Table(ft, bopts)
defer builder.Close()

// buildL0Table can return nil if none of the items in the skiplist are
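Since ft.mt may be nil, it helps to see the two shapes of flushTask that reach this function. A hypothetical sketch, variable names invented:

    // From ensureRoomForWrite, HandoverSkiplist or DropPrefix: one memtable.
    single := flushTask{mt: fullMemTable, dropPrefixes: filtered}
    // From flushMemtable below: several memtables pre-merged into one iterator;
    // mt is nil and the per-task callbacks are tracked separately.
    batched := flushTask{itr: table.NewMergeIterator(itrs, false)}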
@@ -1093,39 +1140,89 @@ func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
return err
}

- // flushMemtable must keep running until we send it an empty memtable. If there
- // are errors during handling the memtable flush, we'll retry indefinitely.
- func (db *DB) flushMemtable(lc *z.Closer) {
+ // flushMemtable must keep running until we send it an empty flushTask. If there
+ // are errors during handling the flush task, we'll retry indefinitely.
+ func (db *DB) flushMemtable(lc *z.Closer) error {
defer lc.Done()

- for mt := range db.flushChan {
- if mt == nil {
+ var sz int64
+ var itrs []y.Iterator
+ var mts []*memTable
+ var cbs []func()
+ slurp := func() {
+ for {
+ select {
+ case more := <-db.flushChan:
+ if more.mt == nil {
+ return
+ }
+ sl := more.mt.sl
+ itrs = append(itrs, sl.NewUniIterator(false))
+ mts = append(mts, more.mt)
+ cbs = append(cbs, more.cb)
+
+ sz += sl.MemSize()
+ if sz > db.opt.MemTableSize {
+ return
+ }
+ default:
+ return
+ }
+ }
+ }
+
+ for ft := range db.flushChan {
+ if ft.mt == nil {
+ // We close db.flushChan now, instead of sending a nil ft.mt.
continue
}
+ sz = ft.mt.sl.MemSize()
+ // Reset of itrs, mts etc. is being done below.
+ y.AssertTrue(len(itrs) == 0 && len(mts) == 0 && len(cbs) == 0)
+ itrs = append(itrs, ft.mt.sl.NewUniIterator(false))
+ mts = append(mts, ft.mt)
+ cbs = append(cbs, ft.cb)
+
+ // Pick more memtables, so we can really fill up the L0 table.
+ slurp()
+
+ // db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
+ ft.mt = nil
+ ft.itr = table.NewMergeIterator(itrs, false)
+ ft.cb = nil

for {
- if err := db.handleMemTableFlush(mt, nil); err != nil {
- // Encountered error. Retry indefinitely.
- db.opt.Errorf("error flushing memtable to disk: %v, retrying", err)
- time.Sleep(time.Second)
- continue
- }
+ err := db.handleFlushTask(ft)
+ if err == nil {
+ // Update s.imm. Need a lock.
+ db.lock.Lock()
+ // This is a single-threaded operation. ft.mt corresponds to the head of
+ // db.imm list. Once we flush it, we advance db.imm. The next ft.mt
+ // which would arrive here would match db.imm[0], because we acquire a
+ // lock over DB when pushing to flushChan.
+ // TODO: This logic is dirty AF. Any change and this could easily break.
+ for _, mt := range mts {
+ y.AssertTrue(mt == db.imm[0])
+ db.imm = db.imm[1:]
+ mt.DecrRef() // Return memory.
+ }
+ db.lock.Unlock()

- // Update s.imm. Need a lock.
- db.lock.Lock()
- // This is a single-threaded operation. mt corresponds to the head of
- // db.imm list. Once we flush it, we advance db.imm. The next mt
- // which would arrive here would match db.imm[0], because we acquire a
- // lock over DB when pushing to flushChan.
- // TODO: This logic is dirty AF. Any change and this could easily break.
- y.AssertTrue(mt == db.imm[0])
- db.imm = db.imm[1:]
- mt.DecrRef() // Return memory.
- // unlock
- db.lock.Unlock()
- break
+ for _, cb := range cbs {
+ if cb != nil {
+ cb()
+ }
+ }
+ break
}
+ // Encountered error. Retry indefinitely.
+ db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
+ time.Sleep(time.Second)
}
+ // Reset everything.
+ itrs, mts, cbs, sz = itrs[:0], mts[:0], cbs[:0], 0
}
+ return nil
}

func exists(path string) (bool, error) {
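flushMemtable now drains additional tasks off flushChan ("slurp") until it has gathered roughly MemTableSize worth of skiplists, then merges their iterators so that several small memtables become a single, fuller L0 table, which in turn means less compaction pressure. A sketch of the merge step using badger's internal packages, assuming the v3 import paths; the source skiplists slA and slB are invented:

    // Each picked memtable contributes one iterator, as in the loop above.
    itrs := []y.Iterator{
        slA.NewUniIterator(false),
        slB.NewUniIterator(false),
    }
    merged := table.NewMergeIterator(itrs, false) // one globally sorted stream
    defer merged.Close()
    for merged.Rewind(); merged.Valid(); merged.Next() {
        fmt.Printf("%s\n", y.ParseKey(merged.Key()))
    }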
@@ -1545,10 +1642,10 @@ func (db *DB) startCompactions() {
func (db *DB) startMemoryFlush() {
// Start memory flusher.
if db.closers.memtable != nil {
- db.flushChan = make(chan *memTable, db.opt.NumMemtables)
+ db.flushChan = make(chan flushTask, db.opt.NumMemtables)
db.closers.memtable = z.NewCloser(1)
go func() {
- db.flushMemtable(db.closers.memtable)
+ _ = db.flushMemtable(db.closers.memtable)
}()
}
}
@@ -1651,7 +1748,7 @@ func (db *DB) prepareToDrop() (func(), error) {
panic("Attempting to drop data in read-only mode.")
}
// In order prepare for drop, we need to block the incoming writes and
- // write it to db. Then, flush all the pending memtable. So that, we
+ // write it to db. Then, flush all the pending flush tasks. So that, we
// don't miss any entries.
if err := db.blockWrite(); err != nil {
return func() {}, err
@@ -1700,7 +1797,7 @@ func (db *DB) dropAll() (func(), error) {
if err != nil {
return f, err
}
- // prepareToDrop will stop all the incoming write and flushes any pending memtables.
+ // prepareToDrop will stop all the incoming writes and flushes any pending flush tasks.
// Before we drop, we'll stop the compaction because all the data is going to
// be deleted.
db.stopCompactions()
@@ -1782,8 +1879,13 @@ func (db *DB) DropPrefix(prefixes ...[]byte) error {
memtable.DecrRef()
continue
}
+ task := flushTask{
+ mt: memtable,
+ dropPrefixes: filtered,
+ }
db.opt.Debugf("Flushing memtable")
- if err := db.handleMemTableFlush(memtable, filtered); err != nil {
+ if err := db.handleFlushTask(task); err != nil {
db.opt.Errorf("While trying to flush memtable: %v", err)
return err
}
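DropPrefix now routes through the same flushTask path, with dropPrefixes filtering matching keys out of each memtable as it is flushed. The public API is unchanged; a small usage sketch with invented prefixes:

    // Everything under these prefixes disappears; the filtering happens in the
    // flushes and compactions that DropPrefix triggers internally.
    if err := db.DropPrefix([]byte("tmp/"), []byte("cache/")); err != nil {
        log.Fatalf("DropPrefix: %v", err)
    }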