Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions pkg/ccl/storageccl/engineccl/encrypted_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func TestPebbleEncryption(t *testing.T) {

func() {
// Initialize the filesystem env.
settings := cluster.MakeTestingClusterSettings()
env, err := fs.InitEnvFromStoreSpec(
ctx,
base.StoreSpec{
Expand All @@ -255,12 +256,15 @@ func TestPebbleEncryption(t *testing.T) {
EncryptionOptions: encOptions,
StickyVFSID: stickyVFSID,
},
fs.ReadWrite,
fs.EnvConfig{
RW: fs.ReadWrite,
Version: settings.Version,
},
stickyRegistry, /* sticky registry */
nil, /* statsCollector */
)
require.NoError(t, err)
db, err := storage.Open(ctx, env, cluster.MakeTestingClusterSettings())
db, err := storage.Open(ctx, env, settings)
require.NoError(t, err)
defer db.Close()

Expand Down Expand Up @@ -295,6 +299,7 @@ func TestPebbleEncryption(t *testing.T) {

func() {
// Initialize the filesystem env again, replaying the file registries.
settings := cluster.MakeTestingClusterSettings()
env, err := fs.InitEnvFromStoreSpec(
ctx,
base.StoreSpec{
Expand All @@ -303,12 +308,15 @@ func TestPebbleEncryption(t *testing.T) {
EncryptionOptions: encOptions,
StickyVFSID: stickyVFSID,
},
fs.ReadWrite,
fs.EnvConfig{
RW: fs.ReadWrite,
Version: settings.Version,
},
stickyRegistry, /* sticky registry */
nil, /* statsCollector */
)
require.NoError(t, err)
db, err := storage.Open(ctx, env, cluster.MakeTestingClusterSettings())
db, err := storage.Open(ctx, env, settings)
require.NoError(t, err)
defer db.Close()
require.Equal(t, []byte("a"), storageutils.MVCCGetRaw(t, db, storageutils.PointKey("a", 0)))
Expand Down Expand Up @@ -382,6 +390,7 @@ func TestPebbleEncryption2(t *testing.T) {

// Initialize the filesystem env.
ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
env, err := fs.InitEnvFromStoreSpec(
ctx,
base.StoreSpec{
Expand All @@ -390,12 +399,15 @@ func TestPebbleEncryption2(t *testing.T) {
EncryptionOptions: encOptions,
StickyVFSID: stickyVFSID,
},
fs.ReadWrite,
fs.EnvConfig{
RW: fs.ReadWrite,
Version: settings.Version,
},
stickyRegistry, /* sticky registry */
nil, /* statsCollector */
)
require.NoError(t, err)
db, err := storage.Open(ctx, env, cluster.MakeTestingClusterSettings())
db, err := storage.Open(ctx, env, settings)
require.NoError(t, err)
defer db.Close()

Expand Down
2 changes: 2 additions & 0 deletions pkg/cli/auto_decrypt_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@ func TestAutoDecryptFS(t *testing.T) {
expected := `
$TMPDIR/path1: mkdir-all: 0777
$TMPDIR/path1: lock: LOCK
$TMPDIR/path1: open: STORAGE_MIN_VERSION
$TMPDIR/path1: mkdir-all: $TMPDIR/path1 0755
$TMPDIR/path1: create: $TMPDIR/path1/bar
$TMPDIR/path1: close: $TMPDIR/path1/bar
$TMPDIR/foo/path2: mkdir-all: 0777
$TMPDIR/foo/path2: lock: LOCK
$TMPDIR/foo/path2: open: STORAGE_MIN_VERSION
$TMPDIR/foo/path2: mkdir-all: $TMPDIR/foo/path2 0755
$TMPDIR/foo/path2: create: $TMPDIR/foo/path2/baz
$TMPDIR/foo/path2: close: $TMPDIR/foo/path2/baz
Expand Down
6 changes: 4 additions & 2 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (f *keyFormat) Type() string {
// engine, they should manually open it using storage.Open. The returned Env has
// 1 reference and the caller must ensure it's closed.
func OpenFilesystemEnv(dir string, rw fs.RWMode) (*fs.Env, error) {
envConfig := fs.EnvConfig{RW: rw}
envConfig := fs.EnvConfig{RW: rw, Version: serverCfg.Settings.Version}
if err := fillEncryptionOptionsForStore(dir, &envConfig); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1743,7 +1743,9 @@ func pebbleCryptoInitializer(ctx context.Context) {
encryptedPaths = append(encryptedPaths, spec.Path)
}
resolveFn := func(dir string) (*fs.Env, error) {
var envConfig fs.EnvConfig
envConfig := fs.EnvConfig{
Version: serverCfg.Settings.Version,
}
if err := fillEncryptionOptionsForStore(dir, &envConfig); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ go_library(
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/ttl/ttlbase",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
"//pkg/testutils",
"//pkg/testutils/fingerprintutils",
"//pkg/testutils/floatcmp",
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/versionupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/testutils/release"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -281,7 +281,7 @@ func makeVersionFixtureAndFatal(
// #54761.
c.Run(ctx, option.WithNodes(c.Node(1)), "cp", "{store-dir}/cluster-bootstrapped", "{store-dir}/"+name)
// Similar to the above - newer versions require the min version file to open a store.
c.Run(ctx, option.WithNodes(c.All()), "cp", fmt.Sprintf("{store-dir}/%s", storage.MinVersionFilename), "{store-dir}/"+name)
c.Run(ctx, option.WithNodes(c.All()), "cp", fmt.Sprintf("{store-dir}/%s", fs.MinVersionFilename), "{store-dir}/"+name)
c.Run(ctx, option.WithNodes(c.All()), "tar", "-C", "{store-dir}/"+name, "-czf", "{log-dir}/"+name+".tgz", ".")
t.Fatalf(`successfully created checkpoints; failing test on purpose.

Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/client_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ func verifyStatsOnServers(
// To recompute the metrics, we need an open engine. Open the
// Engine again in read-only mode (leaving the rest of the
// Server stopped) to compute MVCC stats.
env, err := fs.InitEnvFromStoreSpec(ctx, specs[storeIdx], fs.ReadOnly, stickyRegistry, nil /* statsCollector */)
env, err := fs.InitEnvFromStoreSpec(ctx, specs[storeIdx], fs.EnvConfig{
RW: fs.ReadOnly,
Version: s.GetStoreConfig().Settings.Version,
}, stickyRegistry, nil /* statsCollector */)
if err != nil {
t.Fatal(err)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,10 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
// VFS to verify its contents.
ctx := context.Background()
memFS := stickyVFSRegistry.Get(vfsIDs[i])
env, err := fs.InitEnv(ctx, memFS, cps[0], fs.EnvConfig{RW: fs.ReadOnly}, nil /* statsCollector */)
settings := cluster.MakeClusterSettings()
env, err := fs.InitEnv(ctx, memFS, cps[0], fs.EnvConfig{RW: fs.ReadOnly, Version: settings.Version}, nil /* statsCollector */)
require.NoError(t, err)
cpEng, err := storage.Open(ctx, env, cluster.MakeClusterSettings(),
cpEng, err := storage.Open(ctx, env, settings,
storage.ForTesting, storage.MustExist, storage.CacheSize(1<<20))
if err != nil {
require.NoError(t, err)
Expand Down
13 changes: 9 additions & 4 deletions pkg/kv/kvserver/logstore/sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,12 @@ func TestSideloadStorageSync(t *testing.T) {
// able to emulate crash restart by rolling it back to last synced state.
ctx := context.Background()
memFS := vfs.NewCrashableMem()
env, err := fs.InitEnv(ctx, memFS, "", fs.EnvConfig{}, nil /* statsCollector */)
settings := cluster.MakeTestingClusterSettings()
env, err := fs.InitEnv(ctx, memFS, "", fs.EnvConfig{
Version: settings.Version,
}, nil /* statsCollector */)
require.NoError(t, err)
eng, err := storage.Open(ctx, env, cluster.MakeTestingClusterSettings(), storage.ForTesting)
eng, err := storage.Open(ctx, env, settings, storage.ForTesting)
require.NoError(t, err)
ss := newTestingSideloadStorage(eng)

Expand All @@ -595,9 +598,11 @@ func TestSideloadStorageSync(t *testing.T) {
// Reset filesystem to the last synced state.

// Emulate process restart. Load from the last synced state.
env, err = fs.InitEnv(ctx, crashFS, "", fs.EnvConfig{}, nil /* statsCollector */)
env, err = fs.InitEnv(ctx, crashFS, "", fs.EnvConfig{
Version: settings.Version,
}, nil /* statsCollector */)
require.NoError(t, err)
eng, err = storage.Open(ctx, env, cluster.MakeTestingClusterSettings(), storage.ForTesting)
eng, err = storage.Open(ctx, env, settings, storage.ForTesting)
require.NoError(t, err)
defer eng.Close()
ss = newTestingSideloadStorage(eng)
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,10 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
stickyRegistry = serverKnobs.StickyVFSRegistry
}

storeEnvs, err := fs.InitEnvsFromStoreSpecs(ctx, cfg.Stores.Specs, fs.ReadWrite, stickyRegistry, cfg.DiskWriteStats)
storeEnvs, err := fs.InitEnvsFromStoreSpecs(ctx, cfg.Stores.Specs, fs.EnvConfig{
RW: fs.ReadWrite,
Version: cfg.Settings.Version,
}, stickyRegistry, cfg.DiskWriteStats)
if err != nil {
return Engines{}, err
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/bench_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,13 @@ func buildInitialState(
// or not, we build the conditions using an in-memory engine for
// performance.
buildFS = vfs.NewMem()
env, err := fs.InitEnv(ctx, buildFS, "", fs.EnvConfig{}, nil /* statsCollector */)
settings := cluster.MakeClusterSettings()
env, err := fs.InitEnv(ctx, buildFS, "", fs.EnvConfig{
Version: settings.Version,
}, nil /* statsCollector */)
require.NoError(b, err)

e, err := Open(ctx, env, cluster.MakeClusterSettings(), opts...)
e, err := Open(ctx, env, settings, opts...)
require.NoError(b, err)
require.NoError(b, initial.Build(ctx, b, e))
e.Close()
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,10 @@ func TestCreateCheckpoint(t *testing.T) {
}

func mustInitTestEnv(t testing.TB, baseFS vfs.FS, dir string) *fs.Env {
e, err := fs.InitEnv(context.Background(), baseFS, dir, fs.EnvConfig{}, nil /* statsCollector */)
settings := cluster.MakeTestingClusterSettings()
e, err := fs.InitEnv(context.Background(), baseFS, dir, fs.EnvConfig{
Version: settings.Version,
}, nil /* statsCollector */)
require.NoError(t, err)
return e
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/fs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"encryption_at_rest.go",
"file_registry.go",
"fs.go",
"min_version.go",
"sticky_vfs.go",
"temp_dir.go",
"test_utils.go",
Expand All @@ -16,7 +17,10 @@ go_library(
deps = [
"//pkg/base",
"//pkg/cli/exit",
"//pkg/clusterversion",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/storage/disk",
"//pkg/storage/enginepb",
"//pkg/storage/storageconfig",
Expand Down
44 changes: 44 additions & 0 deletions pkg/storage/fs/file_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,3 +752,47 @@ func TestFileRegistryBlockedWriteAllowsRead(t *testing.T) {
fs.WaitForBlockAndUnblock()
require.NoError(t, registry.Close())
}

func TestSafeWriteToUnencryptedFile(t *testing.T) {
defer leaktest.AfterTest(t)()

// Use an in-memory FS that strictly enforces syncs.
mem := vfs.NewCrashableMem()
syncDir := func(dir string) {
fdir, err := mem.OpenDir(dir)
require.NoError(t, err)
require.NoError(t, fdir.Sync())
require.NoError(t, fdir.Close())
}
readFile := func(mem *vfs.MemFS, filename string) []byte {
f, err := mem.Open("foo/bar")
require.NoError(t, err)
b, err := io.ReadAll(f)
require.NoError(t, err)
require.NoError(t, f.Close())
return b
}

require.NoError(t, mem.MkdirAll("foo", os.ModePerm))
syncDir("")
f, err := mem.Create("foo/bar", UnspecifiedWriteCategory)
require.NoError(t, err)
_, err = io.WriteString(f, "Hello world")
require.NoError(t, err)
require.NoError(t, f.Sync())
require.NoError(t, f.Close())
syncDir("foo")

// Discard any unsynced writes to make sure we set up the test
// preconditions correctly.
crashFS := mem.CrashClone(vfs.CrashCloneCfg{})
require.Equal(t, []byte("Hello world"), readFile(crashFS, "foo/bar"))

// Use SafeWriteToUnencryptedFile to atomically, durably change the contents of the
// file.
require.NoError(t, SafeWriteToUnencryptedFile(crashFS, "foo", "foo/bar", []byte("Hello everyone"), UnspecifiedWriteCategory))

// Discard any unsynced writes.
crashFS = crashFS.CrashClone(vfs.CrashCloneCfg{})
require.Equal(t, []byte("Hello everyone"), readFile(crashFS, "foo/bar"))
}
Loading
Loading