From aa7c583d8f811d74452c5e8fc03ab55ebd42e91f Mon Sep 17 00:00:00 2001 From: John Floren Date: Tue, 4 May 2021 08:41:20 -0700 Subject: [PATCH 1/6] Add test for issue #63 --- issues_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/issues_test.go b/issues_test.go index 5bf9e35..4c21de9 100644 --- a/issues_test.go +++ b/issues_test.go @@ -189,3 +189,55 @@ func TestIssue40(t *testing.T) { // is no room in the cache for this entry and it panics. d.Read(k2) } + +// Test issue #63, where a reader obtained from ReadStream will start +// to return invalid data if WriteStream is called before you finish +// reading. +func TestIssue63(t *testing.T) { + var ( + basePath = "test-data" + ) + // Simplest transform function: put all the data files into the base dir. + flatTransform := func(s string) []string { return []string{} } + + // Initialize a new diskv store, rooted at "my-data-dir", + // with no cache. + d := New(Options{ + BasePath: basePath, + Transform: flatTransform, + CacheSizeMax: 0, + }) + + defer d.EraseAll() + + // Write a big entry + k1 := "key1" + d1 := make([]byte, 1024*1024) + rand.Read(d1) + d.Write(k1, d1) + + // Open a reader. We set the direct flag to be sure we're going straight to disk. + s1, err := d.ReadStream(k1, true) + if err != nil { + t.Fatal(err) + } + + // Now generate a second big entry and put it in the *same* key + d2 := make([]byte, 1024*1024) + rand.Read(d2) + d.Write(k1, d2) + + // Now read from that stream we opened + out, err := ioutil.ReadAll(s1) + if err != nil { + t.Fatal(err) + } + if len(out) != len(d1) { + t.Fatalf("Invalid read: got %v bytes expected %v\n", len(out), len(d1)) + } + for i := range out { + if out[i] != d1[i] { + t.Fatalf("Output differs from expected at byte %v", i) + } + } +} \ No newline at end of file From ff85ff09d901fc3e5cdbe0b53cda3565b0e34bc8 Mon Sep 17 00:00:00 2001 From: John Floren Date: Thu, 6 May 2021 13:15:06 -0700 Subject: [PATCH 2/6] Implement fix for #63, read streams getting invalid data after write. This reworks the createKeyFileWithLock function (used only by writeStreamWithLock) so it always writes to a temporary file, which is renamed by writeStreamWithLock when complete. A deferred recover is added to clean up the temporary file if writing fails. This essentially deprecates the TempDir option by making all writes atomic. --- diskv.go | 55 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/diskv.go b/diskv.go index 9f07b85..b5e0cbb 100644 --- a/diskv.go +++ b/diskv.go @@ -9,11 +9,13 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "os" "path/filepath" "strings" "sync" "syscall" + "time" ) const ( @@ -40,6 +42,11 @@ var ( errImportDirectory = errors.New("can't import a directory") ) +func init() { + // Make sure we have good random numbers + rand.Seed(time.Now().UnixNano()) +} + // TransformFunction transforms a key into a slice of strings, with each // element in the slice representing a directory in the file path where the // key's entry will eventually be stored. @@ -76,6 +83,7 @@ type Options struct { CacheSizeMax uint64 // bytes PathPerm os.FileMode FilePerm os.FileMode + // Note: TempDir is deprecated, all writes are now atomic. // If TempDir is set, it will enable filesystem atomic writes by // writing temporary files to that location before being moved // to BasePath. @@ -196,28 +204,18 @@ func (d *Diskv) WriteStream(key string, r io.Reader, sync bool) error { return d.writeStreamWithLock(pathKey, r, sync) } -// createKeyFileWithLock either creates the key file directly, or -// creates a temporary file in TempDir if it is set. +// createKeyFileWithLock creates the key file with a random extension. This +// will be automatically renamed by writeStreamWithLock once the write has been +// completed. This solves issue #63, where calling ReadStream, then updating the +// key before reading completes, leads to the reader getting invalid data. func (d *Diskv) createKeyFileWithLock(pathKey *PathKey) (*os.File, error) { - if d.TempDir != "" { - if err := os.MkdirAll(d.TempDir, d.PathPerm); err != nil { - return nil, fmt.Errorf("temp mkdir: %s", err) - } - f, err := ioutil.TempFile(d.TempDir, "") - if err != nil { - return nil, fmt.Errorf("temp file: %s", err) - } - - if err := os.Chmod(f.Name(), d.FilePerm); err != nil { - f.Close() // error deliberately ignored - os.Remove(f.Name()) // error deliberately ignored - return nil, fmt.Errorf("chmod: %s", err) - } - return f, nil - } - - mode := os.O_WRONLY | os.O_CREATE | os.O_TRUNC // overwrite if exists - f, err := os.OpenFile(d.completeFilename(pathKey), mode, d.FilePerm) + // Figure out the path and append a random number + path := fmt.Sprintf("%s.%d", d.completeFilename(pathKey), rand.Int()) + // It's incredibly unlikely that the destination file will exist, but + // we want to be absolutely sure: O_EXCL means we'll get an error if the + // file already exists. + mode := os.O_WRONLY | os.O_CREATE | os.O_EXCL + f, err := os.OpenFile(path, mode, d.FilePerm) if err != nil { return nil, fmt.Errorf("open file: %s", err) } @@ -226,14 +224,27 @@ func (d *Diskv) createKeyFileWithLock(pathKey *PathKey) (*os.File, error) { // writeStream does no input validation checking. func (d *Diskv) writeStreamWithLock(pathKey *PathKey, r io.Reader, sync bool) error { + // fullPath is the on-disk location of the key + fullPath := d.completeFilename(pathKey) + if err := d.ensurePathWithLock(pathKey); err != nil { return fmt.Errorf("ensure path: %s", err) } + // createKeyFileWithLock gives us a temporary file we can write to. + // We'll move it when we're all done. f, err := d.createKeyFileWithLock(pathKey) if err != nil { return fmt.Errorf("create key file: %s", err) } + // In case something bad happens, we want to delete the temporary file, + // lest we leave junk in the store. + defer func() { + if r := recover(); r != nil { + os.Remove(f.Name()) + panic(r) + } + }() wc := io.WriteCloser(&nopWriteCloser{f}) if d.Compression != nil { @@ -269,7 +280,7 @@ func (d *Diskv) writeStreamWithLock(pathKey *PathKey, r io.Reader, sync bool) er return fmt.Errorf("file close: %s", err) } - fullPath := d.completeFilename(pathKey) + // Move the temporary file to the final location. if f.Name() != fullPath { if err := os.Rename(f.Name(), fullPath); err != nil { os.Remove(f.Name()) // error deliberately ignored From 7f654d3bf876a44828917bed0197c04b1de55929 Mon Sep 17 00:00:00 2001 From: John Floren Date: Wed, 18 Aug 2021 10:00:51 -0700 Subject: [PATCH 3/6] eliminate init function, make a random source inside Diskv struct --- diskv.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/diskv.go b/diskv.go index b5e0cbb..19dd4cf 100644 --- a/diskv.go +++ b/diskv.go @@ -42,11 +42,6 @@ var ( errImportDirectory = errors.New("can't import a directory") ) -func init() { - // Make sure we have good random numbers - rand.Seed(time.Now().UnixNano()) -} - // TransformFunction transforms a key into a slice of strings, with each // element in the slice representing a directory in the file path where the // key's entry will eventually be stored. @@ -104,6 +99,7 @@ type Diskv struct { mu sync.RWMutex cache map[string][]byte cacheSize uint64 + rnd *rand.Rand } // New returns an initialized Diskv structure, ready to use. @@ -140,6 +136,7 @@ func New(o Options) *Diskv { Options: o, cache: map[string][]byte{}, cacheSize: 0, + rnd: rand.New(rand.NewSource(time.Now().UnixNano())), } if d.Index != nil && d.IndexLess != nil { @@ -210,7 +207,7 @@ func (d *Diskv) WriteStream(key string, r io.Reader, sync bool) error { // key before reading completes, leads to the reader getting invalid data. func (d *Diskv) createKeyFileWithLock(pathKey *PathKey) (*os.File, error) { // Figure out the path and append a random number - path := fmt.Sprintf("%s.%d", d.completeFilename(pathKey), rand.Int()) + path := fmt.Sprintf("%s.%d", d.completeFilename(pathKey), d.rnd.Int()) // It's incredibly unlikely that the destination file will exist, but // we want to be absolutely sure: O_EXCL means we'll get an error if the // file already exists. From 823a6a35ba87e72a130824807277be0dc815ce76 Mon Sep 17 00:00:00 2001 From: John Floren Date: Wed, 18 Aug 2021 11:20:27 -0700 Subject: [PATCH 4/6] Put the temporary files used for atomic writes into a subdirectory of BasePath. The key walking functions have been modified to ignore everything inside that directory, so getting a list of keys while a WriteStream is in progress will not include the temporary files. --- basic_test.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++ diskv.go | 47 ++++++++++++++++++---------------------- issues_test.go | 2 +- 3 files changed, 80 insertions(+), 27 deletions(-) diff --git a/basic_test.go b/basic_test.go index a2cb88b..505afaf 100644 --- a/basic_test.go +++ b/basic_test.go @@ -3,6 +3,7 @@ package diskv import ( "bytes" "errors" + "io" "math/rand" "regexp" "strings" @@ -428,3 +429,60 @@ func TestHybridStore(t *testing.T) { } } + +// Make sure that temporary files used for atomic writes never +// show up in the key listing +func TestIgnoreAtomicTempFiles(t *testing.T) { + var ( + basePath = "test-data" + ) + // Simplest transform function: put all the data files into the base dir. + flatTransform := func(s string) []string { return []string{} } + + // Initialize a new diskv store, rooted at "my-data-dir", + // with no cache. + d := New(Options{ + BasePath: basePath, + Transform: flatTransform, + CacheSizeMax: 0, + }) + + // Write something in so everything is set up + d.Write("foo", []byte("bar")) + + // Start to write an entry using a stream, but do not + // put anything into it yet! + key := "key1" + data := make([]byte, 1024*1024) + rand.Read(data) + + // Get a pipe + rdr, wtr := io.Pipe() + + // Start the write + go d.WriteStream(key, rdr, true) + + // Now list keys: there should be 1 key. + keys := d.Keys(nil) + var count int + for _ = range keys { + count++ + } + if count != 1 { + t.Fatalf("Expected 1 key, got %d", count) + } + + // Now complete the write + wtr.Write(data) + wtr.Close() + + // And make sure we see exactly two keys + keys = d.Keys(nil) + for _ = range keys { + count++ + } + if count != 2 { + t.Fatalf("Expected 2 keys, got %d", count) + } + d.EraseAll() +} \ No newline at end of file diff --git a/diskv.go b/diskv.go index 19dd4cf..ed9f346 100644 --- a/diskv.go +++ b/diskv.go @@ -22,6 +22,8 @@ const ( defaultBasePath = "diskv" defaultFilePerm os.FileMode = 0666 defaultPathPerm os.FileMode = 0777 + + DefaultAtomicPrefix = ".diskv_atomic_temp" ) // PathKey represents a string key that has been transformed to @@ -79,12 +81,11 @@ type Options struct { PathPerm os.FileMode FilePerm os.FileMode // Note: TempDir is deprecated, all writes are now atomic. - // If TempDir is set, it will enable filesystem atomic writes by - // writing temporary files to that location before being moved - // to BasePath. - // Note that TempDir MUST be on the same device/partition as - // BasePath. TempDir string + // AtomicPrefix sets the name of a directory which will be created + // within BasePath to store temporary files for atomic writes. + // It defaults to DefaultAtomicPrefix; you probably don't need to change it. + AtomicPrefix string Index Index IndexLess LessFunction @@ -109,6 +110,9 @@ func New(o Options) *Diskv { if o.BasePath == "" { o.BasePath = defaultBasePath } + if o.AtomicPrefix == "" { + o.AtomicPrefix = DefaultAtomicPrefix + } if o.AdvancedTransform == nil { if o.Transform == nil { @@ -201,24 +205,6 @@ func (d *Diskv) WriteStream(key string, r io.Reader, sync bool) error { return d.writeStreamWithLock(pathKey, r, sync) } -// createKeyFileWithLock creates the key file with a random extension. This -// will be automatically renamed by writeStreamWithLock once the write has been -// completed. This solves issue #63, where calling ReadStream, then updating the -// key before reading completes, leads to the reader getting invalid data. -func (d *Diskv) createKeyFileWithLock(pathKey *PathKey) (*os.File, error) { - // Figure out the path and append a random number - path := fmt.Sprintf("%s.%d", d.completeFilename(pathKey), d.rnd.Int()) - // It's incredibly unlikely that the destination file will exist, but - // we want to be absolutely sure: O_EXCL means we'll get an error if the - // file already exists. - mode := os.O_WRONLY | os.O_CREATE | os.O_EXCL - f, err := os.OpenFile(path, mode, d.FilePerm) - if err != nil { - return nil, fmt.Errorf("open file: %s", err) - } - return f, nil -} - // writeStream does no input validation checking. func (d *Diskv) writeStreamWithLock(pathKey *PathKey, r io.Reader, sync bool) error { // fullPath is the on-disk location of the key @@ -228,9 +214,10 @@ func (d *Diskv) writeStreamWithLock(pathKey *PathKey, r io.Reader, sync bool) er return fmt.Errorf("ensure path: %s", err) } - // createKeyFileWithLock gives us a temporary file we can write to. + // Get a temporary file we can write to. // We'll move it when we're all done. - f, err := d.createKeyFileWithLock(pathKey) + d.ensureAtomicTempDir() + f, err := ioutil.TempFile(d.atomicTempPath(), pathKey.FileName) if err != nil { return fmt.Errorf("create key file: %s", err) } @@ -604,7 +591,7 @@ func (d *Diskv) walker(c chan<- string, prefix string, cancel <-chan struct{}) f key := d.InverseTransform(pathKey) - if info.IsDir() || !strings.HasPrefix(key, prefix) { + if info.IsDir() || !strings.HasPrefix(key, prefix) || strings.HasPrefix(dir, d.AtomicPrefix) { return nil // "pass" } @@ -635,6 +622,14 @@ func (d *Diskv) completeFilename(pathKey *PathKey) string { return filepath.Join(d.pathFor(pathKey), pathKey.FileName) } +func (d *Diskv) ensureAtomicTempDir() error { + return os.MkdirAll(d.atomicTempPath(), d.PathPerm) +} + +func (d *Diskv) atomicTempPath() string { + return filepath.Join(d.BasePath, d.AtomicPrefix) +} + // cacheWithLock attempts to cache the given key-value pair in the store's // cache. It can fail if the value is larger than the cache's maximum size. func (d *Diskv) cacheWithLock(key string, val []byte) error { diff --git a/issues_test.go b/issues_test.go index 4c21de9..db5fe7a 100644 --- a/issues_test.go +++ b/issues_test.go @@ -240,4 +240,4 @@ func TestIssue63(t *testing.T) { t.Fatalf("Output differs from expected at byte %v", i) } } -} \ No newline at end of file +} From 1bf728ee5b207776ea7fc87f04b27a1a531f8510 Mon Sep 17 00:00:00 2001 From: John Floren Date: Wed, 18 Aug 2021 11:26:45 -0700 Subject: [PATCH 5/6] remove recover and add some cleanup --- diskv.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/diskv.go b/diskv.go index ed9f346..b083c70 100644 --- a/diskv.go +++ b/diskv.go @@ -147,6 +147,11 @@ func New(o Options) *Diskv { d.Index.Initialize(d.IndexLess, d.Keys(nil)) } + // Just in case there were any failures during writes previously, we + // remove the atomic write directory (and any temp files within it). + // The directory will be created the first time we do a Write. + os.RemoveAll(d.atomicTempPath()) + return d } @@ -221,14 +226,6 @@ func (d *Diskv) writeStreamWithLock(pathKey *PathKey, r io.Reader, sync bool) er if err != nil { return fmt.Errorf("create key file: %s", err) } - // In case something bad happens, we want to delete the temporary file, - // lest we leave junk in the store. - defer func() { - if r := recover(); r != nil { - os.Remove(f.Name()) - panic(r) - } - }() wc := io.WriteCloser(&nopWriteCloser{f}) if d.Compression != nil { From aceff81d7c577f654cbd31f9e9aa10f24f496cda Mon Sep 17 00:00:00 2001 From: John Floren Date: Wed, 18 Aug 2021 11:27:37 -0700 Subject: [PATCH 6/6] remove extraneous if check --- diskv.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/diskv.go b/diskv.go index b083c70..46241a6 100644 --- a/diskv.go +++ b/diskv.go @@ -262,11 +262,9 @@ func (d *Diskv) writeStreamWithLock(pathKey *PathKey, r io.Reader, sync bool) er } // Move the temporary file to the final location. - if f.Name() != fullPath { - if err := os.Rename(f.Name(), fullPath); err != nil { - os.Remove(f.Name()) // error deliberately ignored - return fmt.Errorf("rename: %s", err) - } + if err := os.Rename(f.Name(), fullPath); err != nil { + os.Remove(f.Name()) // error deliberately ignored + return fmt.Errorf("rename: %s", err) } if d.Index != nil {