diff --git a/helper/raftutil/state.go b/helper/raftutil/state.go index b50173c76b9..498f905b5d8 100644 --- a/helper/raftutil/state.go +++ b/helper/raftutil/state.go @@ -13,6 +13,7 @@ import ( "time" "github.com/hashicorp/go-msgpack/v2/codec" + "github.com/hashicorp/nomad/helper/raftwalmetadata" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb/v2" @@ -78,7 +79,7 @@ func raftStateInfoBoltDB(p string) (store RaftStore, firstIdx uint64, lastIdx ui } func raftStateInfoWAL(p string) (store RaftStore, firstIdx uint64, lastIdx uint64, err error) { - s, err := raftwal.Open(p) + s, err := raftwal.Open(p, raftwal.WithMetaStore(&raftwalmetadata.FileMetaDB{})) if err != nil { return nil, 0, 0, fmt.Errorf("failed to open WAL logs: %v", err) } diff --git a/helper/raftwalmetadata/raft_wal_file_metadata.go b/helper/raftwalmetadata/raft_wal_file_metadata.go new file mode 100644 index 00000000000..999dda32b56 --- /dev/null +++ b/helper/raftwalmetadata/raft_wal_file_metadata.go @@ -0,0 +1,261 @@ +// Copyright IBM Corp. 2020, 2025 +// SPDX-License-Identifier: BUSL-1.1 + +package raftwalmetadata + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/hashicorp/raft-wal/metadb" + "github.com/hashicorp/raft-wal/types" +) + +const ( + // FileMetaDBFileName is the name of the JSON metadata file. + FileMetaDBFileName = "wal-meta.json" +) + +// persistedData is the JSON-serialisable envelope written to FileMetaDBFileName. +// Both halves of the MetaStore interface are flushed together in one atomic +// write so the file is always self-consistent. +// +// Stable-store keys are stored as plain Go strings (string([]byte)). All keys +// used by hashicorp/raft are valid UTF-8, which is a requirement of the JSON +// object-key encoding. The []byte values are base64-encoded automatically by +// encoding/json. +type persistedData struct { + State types.PersistentState `json:"state"` + Stable map[string][]byte `json:"stable"` +} + +// FileMetaDB implements types.MetaStore with a single pretty-printed JSON file +// written via an atomic rename sequence: +// +// 1. Serialise all state to .tmp +// 2. fsync the temp file (data durable) +// 3. rename(tmp → final) (POSIX-atomic) +// 4. fsync the parent directory (rename durable) +// +// The entire dataset is kept in memory after Load returns, so read operations +// (GetStable) never touch the disk. Write operations (CommitState, SetStable) +// always flush the complete, consistent state in one shot. +// +// Because the file is plain JSON it can be inspected with ordinary text tools +// (cat, jq, …). +type FileMetaDB struct { + mu sync.RWMutex + dir string + state types.PersistentState + stable map[string][]byte // nil → not yet loaded / already closed +} + +// Load implements types.MetaStore. +// +// It is safe to call Load more than once with the same directory; subsequent +// calls are no-ops that return the current in-memory state. Calling Load with +// a different directory after a successful Load returns an error. +// +// If a BoltDB metadata file (wal-meta.db) is found in dir but no JSON file +// exists yet, Load returns an error rather than silently discarding the +// existing segment list (which would cause the WAL to delete all segment +// files on startup). Migrate the metadata first, or open the WAL with +// WithMetaStore(&metadb.BoltMetaDB{}) to continue using BoltDB. +func (db *FileMetaDB) Load(dir string) (types.PersistentState, error) { + db.mu.Lock() + defer db.mu.Unlock() + + // Already open from the same dir: return the cached state. + if db.stable != nil { + if db.dir != dir { + return types.PersistentState{}, fmt.Errorf( + "can't load dir %s, already open in dir %s", dir, db.dir) + } + return db.state, nil + } + + // Confirm the directory exists before touching any files inside it. + if _, err := os.Stat(dir); err != nil { + return types.PersistentState{}, err + } + + mainPath := filepath.Join(dir, FileMetaDBFileName) + + // Detect an existing BoltDB deployment. Silently returning an empty + // PersistentState here would cause the WAL to delete all segment files, + // so we fail loudly and direct the operator to migrate. + if _, err := os.Stat(filepath.Join(dir, metadb.FileName)); err == nil { + if _, err := os.Stat(mainPath); errors.Is(err, os.ErrNotExist) { + return types.PersistentState{}, fmt.Errorf( + "found existing BoltDB metadata file %q but no JSON metadata "+ + "file %q: migrate the metadata store before switching to "+ + "FileMetaDB, or open the WAL with "+ + "WithMetaStore(&metadb.BoltMetaDB{}) to keep using BoltDB", + metadb.FileName, FileMetaDBFileName) + } + } + + db.dir = dir + db.stable = make(map[string][]byte) + + // Remove any temp file left behind by a previously-crashed write. + os.Remove(filepath.Join(dir, FileMetaDBFileName+".tmp")) + + data, err := os.ReadFile(mainPath) + if errors.Is(err, os.ErrNotExist) { + // Fresh directory — caller receives a zero-value PersistentState and + // the WAL will initialise itself from scratch. + return db.state, nil + } + if err != nil { + db.dir, db.stable = "", nil + return types.PersistentState{}, fmt.Errorf( + "failed to read %s: %w", FileMetaDBFileName, err) + } + + var pd persistedData + if err := json.Unmarshal(data, &pd); err != nil { + db.dir, db.stable = "", nil + return types.PersistentState{}, fmt.Errorf( + "%w: failed to parse %s: %s", types.ErrCorrupt, FileMetaDBFileName, err) + } + + db.state = pd.State + if pd.Stable != nil { + db.stable = pd.Stable + } + return db.state, nil +} + +// CommitState implements types.MetaStore. +func (db *FileMetaDB) CommitState(state types.PersistentState) error { + db.mu.Lock() + defer db.mu.Unlock() + + if db.stable == nil { + return metadb.ErrUnintialized + } + db.state = state + return db.persist() +} + +// GetStable implements types.MetaStore. Safe for concurrent use with all other +// methods. +func (db *FileMetaDB) GetStable(key []byte) ([]byte, error) { + db.mu.RLock() + defer db.mu.RUnlock() + + if db.stable == nil { + return nil, metadb.ErrUnintialized + } + val := db.stable[string(key)] + if val == nil { + return nil, nil + } + // Return an independent copy: the caller must not be able to mutate the + // in-memory stable store through the returned slice. + cp := make([]byte, len(val)) + copy(cp, val) + return cp, nil +} + +// SetStable implements types.MetaStore. Safe for concurrent use with all other +// methods. +func (db *FileMetaDB) SetStable(key []byte, value []byte) error { + db.mu.Lock() + defer db.mu.Unlock() + + if db.stable == nil { + return metadb.ErrUnintialized + } + if value == nil { + delete(db.stable, string(key)) + } else { + // Store a defensive copy so that later mutations of the caller's slice + // don't silently corrupt our in-memory state. + cp := make([]byte, len(value)) + copy(cp, value) + db.stable[string(key)] = cp + } + return db.persist() +} + +// Close implements io.Closer. +func (db *FileMetaDB) Close() error { + db.mu.Lock() + defer db.mu.Unlock() + db.dir = "" + db.stable = nil + db.state = types.PersistentState{} + return nil +} + +// persist serialises the complete in-memory state and writes it to disk via +// an atomic rename. It must be called with db.mu held for writing. +// +// The four-step sequence provides the following crash guarantees: +// - Crash before step 1: old file (if any) is still valid. +// - Crash during step 1 or 2: temp file is incomplete; old file is intact. +// The stale temp file is removed on the next Load. +// - Crash during step 3: POSIX rename(2) is atomic — either the old name or +// the new name is visible; both refer to a fully-written, fsynced file. +// - Crash after step 3: new file is durable; step 4 may need to be +// replayed by the OS journal but the data itself is safe. +func (db *FileMetaDB) persist() error { + data, err := json.Marshal(persistedData{ + State: db.state, + Stable: db.stable, + }) + if err != nil { + return fmt.Errorf("failed to encode metadata: %w", err) + } + + tmpPath := filepath.Join(db.dir, FileMetaDBFileName+".tmp") + mainPath := filepath.Join(db.dir, FileMetaDBFileName) + + // Step 1: write new state to a temp file. + f, err := os.OpenFile(tmpPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644) + if err != nil { + return fmt.Errorf("failed to create temp metadata file: %w", err) + } + if _, err := f.Write(data); err != nil { + f.Close() + os.Remove(tmpPath) + return fmt.Errorf("failed to write metadata: %w", err) + } + + // Step 2: fsync the temp file so the bytes are durable before we rename. + if err := f.Sync(); err != nil { + f.Close() + os.Remove(tmpPath) + return fmt.Errorf("failed to sync metadata file: %w", err) + } + if err := f.Close(); err != nil { + os.Remove(tmpPath) + return fmt.Errorf("failed to close temp metadata file: %w", err) + } + + // Step 3: atomically replace the canonical file. + if err := os.Rename(tmpPath, mainPath); err != nil { + os.Remove(tmpPath) + return fmt.Errorf("failed to commit metadata file: %w", err) + } + + // Step 4: fsync the parent directory so the rename directory entry is + // durable. Without this a crash could leave the directory pointing to the + // old file even though the new one was fully written. + dirF, err := os.Open(db.dir) + if err != nil { + return fmt.Errorf("failed to open dir for sync: %w", err) + } + syncErr := dirF.Sync() + closeErr := dirF.Close() + if syncErr != nil { + return fmt.Errorf("failed to sync directory: %w", syncErr) + } + return closeErr +} diff --git a/helper/raftwalmetadata/raft_wal_file_metadata_test.go b/helper/raftwalmetadata/raft_wal_file_metadata_test.go new file mode 100644 index 00000000000..691ecd90b55 --- /dev/null +++ b/helper/raftwalmetadata/raft_wal_file_metadata_test.go @@ -0,0 +1,301 @@ +// Copyright IBM Corp. 2020, 2025 +// SPDX-License-Identifier: BUSL-1.1 + +package raftwalmetadata + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/hashicorp/raft-wal/metadb" + "github.com/hashicorp/raft-wal/types" + "github.com/shoenig/test/must" +) + +func makeState(nSegs int) *types.PersistentState { + startIdx := 1000 + perSegment := 100 + startID := 1234 + + startTime := time.Now().UTC().Round(time.Second).Add(time.Duration(-1*nSegs) * time.Minute) + + state := &types.PersistentState{ + NextSegmentID: uint64(startID + nSegs), + } + + for i := 0; i < (nSegs - 1); i++ { + si := types.SegmentInfo{ + ID: uint64(startID + i), + BaseIndex: uint64(startIdx + (i * perSegment)), + MinIndex: uint64(startIdx + (i * perSegment)), + MaxIndex: uint64(startIdx + ((i + 1) * perSegment) - 1), + Codec: 1, + IndexStart: 123456, + CreateTime: startTime.Add(time.Duration(i) * time.Minute), + SealTime: startTime.Add(time.Duration(i+1) * time.Minute), + SizeLimit: 64 * 1024 * 1024, + } + state.Segments = append(state.Segments, si) + } + if nSegs > 0 { + // Append an unsealed tail + i := nSegs - 1 + si := types.SegmentInfo{ + ID: uint64(startID + i), + BaseIndex: uint64(startIdx + (i * perSegment)), + MinIndex: uint64(startIdx + (i * perSegment)), + Codec: 1, + CreateTime: startTime.Add(time.Duration(i) * time.Minute), + SizeLimit: 64 * 1024 * 1024, + } + state.Segments = append(state.Segments, si) + } + return state +} + +func TestFileMetaDB(t *testing.T) { + cases := []struct { + name string + writeState *types.PersistentState + writeStable map[string][]byte + }{ + { + name: "basic storage", + writeState: makeState(4), + writeStable: map[string][]byte{ + "CurrentTerm": {0, 0, 0, 0, 0, 0, 0, 5}, + "LastVoteTerm": {0, 0, 0, 0, 0, 0, 0, 5}, + "LastVoteCand": []byte("server1"), + }, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "raft-wal-file-test-*") + must.NoError(t, err) + defer os.RemoveAll(tmpDir) + + { + // Open a fresh DB, confirm it's empty, write state and stable values. + var db FileMetaDB + gotState, err := db.Load(tmpDir) + must.NoError(t, err) + defer db.Close() + + must.Eq(t, 0, int(gotState.NextSegmentID)) + must.SliceEmpty(t, gotState.Segments) + + if tc.writeState != nil { + must.NoError(t, db.CommitState(*tc.writeState)) + } + for k, v := range tc.writeStable { + must.NoError(t, db.SetStable([]byte(k), v)) + } + + // Close and re-open to verify persistence. + db.Close() + } + + var db FileMetaDB + gotState, err := db.Load(tmpDir) + must.NoError(t, err) + defer db.Close() + + must.Eq(t, *tc.writeState, gotState) + + for k, v := range tc.writeStable { + got, err := db.GetStable([]byte(k)) + must.NoError(t, err) + must.Eq(t, v, got) + } + }) + } +} + +func TestFileMetaDBErrors(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "raft-wal-file-test-*") + must.NoError(t, err) + defer os.RemoveAll(tmpDir) + + var db FileMetaDB + + // Calling anything before Load is an error. + must.ErrorIs(t, db.CommitState(types.PersistentState{NextSegmentID: 1234}), metadb.ErrUnintialized) + + _, err = db.GetStable([]byte("foo")) + must.ErrorIs(t, err, metadb.ErrUnintialized) + + err = db.SetStable([]byte("foo"), []byte("bar")) + must.ErrorIs(t, err, metadb.ErrUnintialized) + + // Loading twice from the same dir is OK. + _, err = db.Load(tmpDir) + must.NoError(t, err) + _, err = db.Load(tmpDir) + must.NoError(t, err) + + // Loading from a different dir is not. + tmpDir2, err := os.MkdirTemp("", "raft-wal-file-test-*") + must.NoError(t, err) + defer os.RemoveAll(tmpDir2) + + _, err = db.Load(tmpDir2) + must.ErrorContains(t, err, "already open in dir") + + // Loading from a non-existent dir is an error. + var db2 FileMetaDB + _, err = db2.Load("fake-dir-that-does-not-exist") + must.ErrorContains(t, err, "no such file or directory") +} + +// TestFileMetaDBRoundTrip verifies that Close followed by Load correctly +// reloads all state from disk. +func TestFileMetaDBRoundTrip(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "raft-wal-file-test-*") + must.NoError(t, err) + defer os.RemoveAll(tmpDir) + + state := makeState(3) + + var db FileMetaDB + _, err = db.Load(tmpDir) + must.NoError(t, err) + must.NoError(t, db.CommitState(*state)) + must.NoError(t, db.SetStable([]byte("CurrentTerm"), []byte{0, 0, 0, 0, 0, 0, 0, 7})) + db.Close() + + // Re-open and verify both halves round-tripped correctly. + var db2 FileMetaDB + got, err := db2.Load(tmpDir) + must.NoError(t, err) + defer db2.Close() + must.Eq(t, *state, got) + + term, err := db2.GetStable([]byte("CurrentTerm")) + must.NoError(t, err) + must.Eq(t, []byte{0, 0, 0, 0, 0, 0, 0, 7}, term) +} + +// TestFileMetaDBSetStableDelete checks that passing nil to SetStable removes +// the key, and that GetStable on a missing key returns nil without error. +func TestFileMetaDBSetStableDelete(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "raft-wal-file-test-*") + must.NoError(t, err) + defer os.RemoveAll(tmpDir) + + var db FileMetaDB + _, err = db.Load(tmpDir) + must.NoError(t, err) + defer db.Close() + + key := []byte("mykey") + + // Key absent → nil, no error. + got, err := db.GetStable(key) + must.NoError(t, err) + must.Nil(t, got) + + // Write then read back. + must.NoError(t, db.SetStable(key, []byte("myvalue"))) + got, err = db.GetStable(key) + must.NoError(t, err) + must.Eq(t, []byte("myvalue"), got) + + // Delete by passing nil. + must.NoError(t, db.SetStable(key, nil)) + got, err = db.GetStable(key) + must.NoError(t, err) + must.Nil(t, got) + + // Deletion must be persisted across a close/reopen. + db.Close() + var db2 FileMetaDB + _, err = db2.Load(tmpDir) + must.NoError(t, err) + defer db2.Close() + got, err = db2.GetStable(key) + must.NoError(t, err) + must.Nil(t, got) +} + +// TestFileMetaDBBoltDetection verifies that Load returns an informative error +// when a wal-meta.db file exists but wal-meta.json does not, preventing silent +// data loss during a BoltDB → FileMetaDB migration. +func TestFileMetaDBBoltDetection(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "raft-wal-file-test-*") + must.NoError(t, err) + defer os.RemoveAll(tmpDir) + + // Simulate an existing BoltDB deployment. + must.NoError(t, os.WriteFile( + filepath.Join(tmpDir, metadb.FileName), []byte("fake bolt data"), 0o644, + )) + + var db FileMetaDB + _, err = db.Load(tmpDir) + must.Error(t, err) + must.ErrorContains(t, err, "BoltDB metadata file") + must.ErrorContains(t, err, "migrate") + + // Once wal-meta.json also exists the error must not fire (both files can + // legitimately coexist right after a migration before the old file is + // cleaned up). + must.NoError(t, os.WriteFile( + filepath.Join(tmpDir, FileMetaDBFileName), []byte(`{"state":{},"stable":{}}`), 0o644, + )) + var db2 FileMetaDB + _, err = db2.Load(tmpDir) + must.NoError(t, err) + db2.Close() +} + +// TestFileMetaDBGetStableIsolation verifies that the slice returned by +// GetStable is an independent copy: mutating it must not affect stored state. +func TestFileMetaDBGetStableIsolation(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "raft-wal-file-test-*") + must.NoError(t, err) + defer os.RemoveAll(tmpDir) + + var db FileMetaDB + _, err = db.Load(tmpDir) + must.NoError(t, err) + defer db.Close() + + original := []byte{1, 2, 3} + must.NoError(t, db.SetStable([]byte("k"), original)) + + got, err := db.GetStable([]byte("k")) + must.NoError(t, err) + must.Eq(t, original, got) + + // Mutating the returned slice must not affect the stored value. + got[0] = 99 + got2, err := db.GetStable([]byte("k")) + must.NoError(t, err) + must.Eq(t, original, got2) +} + +// TestFileMetaDBStaleTempCleanup verifies that a leftover .tmp file from a +// previously-crashed write does not prevent Load from succeeding. +func TestFileMetaDBStaleTempCleanup(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "raft-wal-file-test-*") + must.NoError(t, err) + defer os.RemoveAll(tmpDir) + + // Simulate a stale temp file (e.g. crash between write and rename). + stale := filepath.Join(tmpDir, FileMetaDBFileName+".tmp") + must.NoError(t, os.WriteFile(stale, []byte("incomplete garbage"), 0o644)) + + var db FileMetaDB + _, err = db.Load(tmpDir) + must.NoError(t, err) // must not fail + db.Close() + + // The stale file must have been removed. + _, err = os.Stat(stale) + must.ErrorIs(t, err, os.ErrNotExist) +} diff --git a/nomad/server.go b/nomad/server.go index 987efee78b6..d25bd29b303 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -40,6 +40,7 @@ import ( "github.com/hashicorp/nomad/helper/goruntime" "github.com/hashicorp/nomad/helper/group" "github.com/hashicorp/nomad/helper/pool" + "github.com/hashicorp/nomad/helper/raftwalmetadata" "github.com/hashicorp/nomad/helper/tlsutil" "github.com/hashicorp/nomad/lib/auth/oidc" "github.com/hashicorp/nomad/nomad/auth" @@ -1610,6 +1611,7 @@ func (s *Server) openRaftWAL(dir string) (*raftwal.WAL, error) { raftwal.WithLogger(s.logger.Named("wal")), raftwal.WithSegmentSize(s.config.RaftLogStoreConfig.WALSegmentSize), raftwal.WithMetricsCollector(mc), + raftwal.WithMetaStore(&raftwalmetadata.FileMetaDB{}), ) if err != nil { return nil, err