Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions sei-db/sc/memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,15 +836,9 @@ func (db *DB) Close() error {
errs := []error{}
db.pruneSnapshotLock.Lock()
defer db.pruneSnapshotLock.Unlock()
// Close stream handler
db.logger.Info("Closing stream handler...")
if db.streamHandler != nil {
err := db.streamHandler.Close()
errs = append(errs, err)
db.streamHandler = nil
}

// Close rewrite channel
// Close rewrite channel FIRST - must wait for background goroutine to finish
// before closing streamHandler, as the goroutine uses db.streamHandler
db.logger.Info("Closing rewrite channel...")
if db.snapshotRewriteChan != nil {
db.snapshotRewriteCancelFunc()
Expand All @@ -854,8 +848,12 @@ func (db *DB) Close() error {
db.logger.Error("snapshot rewrite failed during close", "error", result.err)
}
}
db.snapshotRewriteChan = nil
db.snapshotRewriteCancelFunc = nil
}

// Close stream handler AFTER background goroutine finishes
db.logger.Info("Closing stream handler...")
if db.streamHandler != nil {
errs = append(errs, db.streamHandler.Close())
}

errs = append(errs, db.MultiTree.Close())
Expand All @@ -865,8 +863,13 @@ func (db *DB) Close() error {
if db.fileLock != nil {
errs = append(errs, db.fileLock.Unlock())
errs = append(errs, db.fileLock.Destroy())
db.fileLock = nil
}

// Nil out references as the last step
db.snapshotRewriteChan = nil
db.snapshotRewriteCancelFunc = nil
db.streamHandler = nil
db.fileLock = nil
db.logger.Info("Closed memiavl db.")
return errorutils.Join(errs...)
}
Expand Down
12 changes: 12 additions & 0 deletions sei-db/ss/pebbledb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ func (itr *iterator) nextForward() {
// Move the iterator to the closest version to the desired version, so we
// append the current iterator key to the prefix and seek to that key.
itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1))
if !itr.valid {
// SeekLT failed to find a key, iterator is now invalid
return
}

tmpKey, tmpKeyVersion, ok := SplitMVCCKey(itr.source.Key())
if !ok {
Expand All @@ -187,6 +191,10 @@ func (itr *iterator) nextForward() {
if bytes.Equal(tmpKey, currKey) {
if itr.source.NextPrefix() {
itr.nextForward()
if !itr.valid {
// Recursive call invalidated the iterator
return
}

_, tmpKeyVersion, ok = SplitMVCCKey(itr.source.Key())
if !ok {
Expand Down Expand Up @@ -262,6 +270,10 @@ func (itr *iterator) nextReverse() {
// Move the iterator to the closest version to the desired version, so we
// append the current iterator key to the prefix and seek to that key.
itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1))
if !itr.valid {
// SeekLT failed to find a key, iterator is now invalid
return
}

_, tmpKeyVersion, ok := SplitMVCCKey(itr.source.Key())
if !ok {
Expand Down
Loading
Loading