From bd54d6f1f916d93ac5bbb6c3a934e0f19c7bf93d Mon Sep 17 00:00:00 2001 From: Geoff Greer Date: Wed, 31 Dec 2025 14:15:41 -0800 Subject: [PATCH] Default to exclusive locks on c1zs. Except for compaction, nothing should be writing to the same c1 file at the same time. Also improve some of the compaction tests. Now they insert data and validate the results. --- pkg/dotc1z/c1file.go | 16 ++++ pkg/dotc1z/session_store_test.go | 2 +- pkg/synccompactor/attached/attached_test.go | 86 +++++++++++++++---- .../attached/comprehensive_test.go | 7 +- pkg/synccompactor/attached/timestamp_test.go | 10 ++- pkg/synccompactor/compactor.go | 11 +-- 6 files changed, 102 insertions(+), 30 deletions(-) diff --git a/pkg/dotc1z/c1file.go b/pkg/dotc1z/c1file.go index 548968ce6..ee1f16bfc 100644 --- a/pkg/dotc1z/c1file.go +++ b/pkg/dotc1z/c1file.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "sync" "time" @@ -313,6 +314,21 @@ func (c *C1File) init(ctx context.Context) error { } } + hasLockingPragma := false + for _, pragma := range c.pragmas { + pragmaName := strings.ToLower(pragma.name) + if pragmaName == "main.locking_mode" || pragmaName == "locking_mode" { + hasLockingPragma = true + break + } + } + if !hasLockingPragma { + _, err = c.db.ExecContext(ctx, "PRAGMA main.locking_mode = EXCLUSIVE") + if err != nil { + return err + } + } + for _, pragma := range c.pragmas { _, err := c.db.ExecContext(ctx, fmt.Sprintf("PRAGMA %s = %s", pragma.name, pragma.value)) if err != nil { diff --git a/pkg/dotc1z/session_store_test.go b/pkg/dotc1z/session_store_test.go index 4131301f8..8d78d202c 100644 --- a/pkg/dotc1z/session_store_test.go +++ b/pkg/dotc1z/session_store_test.go @@ -1690,7 +1690,7 @@ func TestC1FileSessionStore_ConcurrentAccess(t *testing.T) { ctx := context.Background() tempDir := filepath.Join(t.TempDir(), "test-session.c1z") - c1zFile, err := NewC1ZFile(ctx, tempDir, WithPragma("journal_mode", "WAL")) + c1zFile, err := NewC1ZFile(ctx, tempDir, WithPragma("journal_mode", "WAL"), WithPragma("main.locking_mode", "NORMAL")) require.NoError(t, err) defer c1zFile.Close() diff --git a/pkg/synccompactor/attached/attached_test.go b/pkg/synccompactor/attached/attached_test.go index e11997ea2..62b9f1189 100644 --- a/pkg/synccompactor/attached/attached_test.go +++ b/pkg/synccompactor/attached/attached_test.go @@ -3,15 +3,56 @@ package attached import ( "context" "path/filepath" + "slices" "testing" + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" + reader_v2 "github.com/conductorone/baton-sdk/pb/c1/reader/v2" "github.com/conductorone/baton-sdk/pkg/connectorstore" "github.com/conductorone/baton-sdk/pkg/dotc1z" "github.com/stretchr/testify/require" ) +func putTestData(ctx context.Context, t *testing.T, db *dotc1z.C1File) { + err := db.PutResourceTypes(ctx, v2.ResourceType_builder{Id: "user", DisplayName: "User"}.Build()) + require.NoError(t, err) + err = db.PutResourceTypes(ctx, v2.ResourceType_builder{Id: "group", DisplayName: "Group"}.Build()) + require.NoError(t, err) + err = db.PutResources(ctx, v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "user1"}.Build(), DisplayName: "User1"}.Build()) + require.NoError(t, err) + err = db.PutResources(ctx, v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "user2"}.Build(), DisplayName: "User2"}.Build()) + require.NoError(t, err) + err = db.PutResources(ctx, v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "group", Resource: "group1"}.Build(), DisplayName: "Group1"}.Build()) + require.NoError(t, err) + err = db.PutEntitlements(ctx, v2.Entitlement_builder{Id: "group1:member", DisplayName: "Member"}.Build()) + require.NoError(t, err) + err = db.PutGrants(ctx, v2.Grant_builder{ + Id: "group1:member:user1", + Principal: v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "user1"}.Build()}.Build(), + Entitlement: v2.Entitlement_builder{Id: "group1:member", DisplayName: "Member"}.Build(), + }.Build()) + require.NoError(t, err) + err = db.PutGrants(ctx, v2.Grant_builder{ + Id: "group1:member:user2", Principal: v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "user2"}.Build()}.Build(), + Entitlement: v2.Entitlement_builder{Id: "group1:member", DisplayName: "Member"}.Build(), + }.Build()) + require.NoError(t, err) +} + +func verifyTestData(ctx context.Context, t *testing.T, db *dotc1z.C1File) { + usersResp, err := db.ListResources(ctx, v2.ResourcesServiceListResourcesRequest_builder{ + ResourceTypeId: "user", + }.Build()) + require.NoError(t, err) + require.NotNil(t, usersResp) + users := usersResp.GetList() + require.Equal(t, 2, len(users)) + require.Equal(t, "user1", users[0].GetId().GetResource()) + require.Equal(t, "user2", users[1].GetId().GetResource()) +} + func TestAttachedCompactor(t *testing.T) { - ctx := context.Background() + ctx := t.Context() // Create temporary files for base, applied, and dest databases tmpDir := t.TempDir() @@ -19,7 +60,6 @@ func TestAttachedCompactor(t *testing.T) { appliedFile := filepath.Join(tmpDir, "applied.c1z") opts := []dotc1z.C1ZOption{ - dotc1z.WithPragma("journal_mode", "WAL"), dotc1z.WithTmpDir(tmpDir), } @@ -36,7 +76,8 @@ func TestAttachedCompactor(t *testing.T) { require.NoError(t, err) // Create applied database with some test data - appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...) + appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal")) + appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...) require.NoError(t, err) defer appliedDB.Close() @@ -44,6 +85,8 @@ func TestAttachedCompactor(t *testing.T) { _, err = appliedDB.StartNewSync(ctx, connectorstore.SyncTypePartial, "") require.NoError(t, err) + putTestData(ctx, t, appliedDB) + err = appliedDB.EndSync(ctx) require.NoError(t, err) @@ -51,13 +94,18 @@ func TestAttachedCompactor(t *testing.T) { err = compactor.Compact(ctx) require.NoError(t, err) - // Verify that compaction completed without errors - // The actual verification would depend on having test data, - // but this test verifies that the compaction workflow works + baseSync, err := baseDB.GetLatestFinishedSync(ctx, reader_v2.SyncsReaderServiceGetLatestFinishedSyncRequest_builder{ + SyncType: string(connectorstore.SyncTypeAny), + }.Build()) + require.NoError(t, err) + require.NotNil(t, baseSync) + require.Equal(t, connectorstore.SyncTypeFull, connectorstore.SyncType(baseSync.GetSync().GetSyncType())) + + verifyTestData(ctx, t, baseDB) } func TestAttachedCompactorMixedSyncTypes(t *testing.T) { - ctx := context.Background() + ctx := t.Context() // Create temporary files for base, applied, and dest databases tmpDir := t.TempDir() @@ -65,7 +113,6 @@ func TestAttachedCompactorMixedSyncTypes(t *testing.T) { appliedFile := filepath.Join(tmpDir, "applied.c1z") opts := []dotc1z.C1ZOption{ - dotc1z.WithPragma("journal_mode", "WAL"), dotc1z.WithTmpDir(tmpDir), } @@ -83,7 +130,8 @@ func TestAttachedCompactorMixedSyncTypes(t *testing.T) { require.NoError(t, err) // Create applied database with an incremental sync - appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...) + appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal")) + appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...) require.NoError(t, err) defer appliedDB.Close() @@ -92,6 +140,8 @@ func TestAttachedCompactorMixedSyncTypes(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, appliedSyncID) + putTestData(ctx, t, appliedDB) + err = appliedDB.EndSync(ctx) require.NoError(t, err) @@ -99,15 +149,11 @@ func TestAttachedCompactorMixedSyncTypes(t *testing.T) { err = compactor.Compact(ctx) require.NoError(t, err) - // Verify that compaction completed without errors - // This test specifically verifies that: - // - Base full sync was correctly identified - // - Applied incremental sync was correctly identified and used - // - The compaction worked with mixed sync types + verifyTestData(ctx, t, baseDB) } func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) { - ctx := context.Background() + ctx := t.Context() // Create temporary files for base, applied, and dest databases tmpDir := t.TempDir() @@ -115,7 +161,6 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) { appliedFile := filepath.Join(tmpDir, "applied.c1z") opts := []dotc1z.C1ZOption{ - dotc1z.WithPragma("journal_mode", "WAL"), dotc1z.WithTmpDir(tmpDir), } @@ -132,7 +177,8 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) { require.NoError(t, err) // Create applied database with multiple syncs of different types - appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...) + appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal")) + appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...) require.NoError(t, err) defer appliedDB.Close() @@ -141,6 +187,8 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, firstAppliedSyncID) + putTestData(ctx, t, appliedDB) + err = appliedDB.EndSync(ctx) require.NoError(t, err) @@ -149,6 +197,8 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, secondAppliedSyncID) + putTestData(ctx, t, appliedDB) + err = appliedDB.EndSync(ctx) require.NoError(t, err) @@ -156,6 +206,8 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) { err = compactor.Compact(ctx) require.NoError(t, err) + verifyTestData(ctx, t, baseDB) + // Verify that compaction completed without errors // This test verifies that the latest sync (incremental) was used from applied // even though there was an earlier full sync diff --git a/pkg/synccompactor/attached/comprehensive_test.go b/pkg/synccompactor/attached/comprehensive_test.go index 29fba83f2..721d06b54 100644 --- a/pkg/synccompactor/attached/comprehensive_test.go +++ b/pkg/synccompactor/attached/comprehensive_test.go @@ -1,8 +1,8 @@ package attached import ( - "context" "path/filepath" + "slices" "testing" v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" @@ -19,7 +19,7 @@ import ( // 4. Overlapping data where applied is newer // It also verifies that all data ends up with the correct destination sync ID. func TestAttachedCompactorComprehensiveScenarios(t *testing.T) { - ctx := context.Background() + ctx := t.Context() // Create temporary files for base, applied, and dest databases tmpDir := t.TempDir() @@ -122,7 +122,8 @@ func TestAttachedCompactorComprehensiveScenarios(t *testing.T) { require.NoError(t, err) // ========= Create applied database ========= - appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...) + appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal")) + appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...) require.NoError(t, err) defer appliedDB.Close() diff --git a/pkg/synccompactor/attached/timestamp_test.go b/pkg/synccompactor/attached/timestamp_test.go index 60eba22dd..2d96783dd 100644 --- a/pkg/synccompactor/attached/timestamp_test.go +++ b/pkg/synccompactor/attached/timestamp_test.go @@ -1,8 +1,8 @@ package attached import ( - "context" "path/filepath" + "slices" "testing" "time" @@ -16,7 +16,7 @@ import ( // TestDiscoveredAtMergeLogic specifically tests the discovered_at timestamp comparison // by creating two separate scenarios with controlled timing. func TestDiscoveredAtMergeLogic(t *testing.T) { - ctx := context.Background() + ctx := t.Context() // Test Case 1: Applied is newer (natural case) t.Run("AppliedNewer", func(t *testing.T) { @@ -59,7 +59,8 @@ func TestDiscoveredAtMergeLogic(t *testing.T) { time.Sleep(10 * time.Millisecond) // Create applied database (newer timestamps) - appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...) + appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal")) + appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...) require.NoError(t, err) defer appliedDB.Close() @@ -107,7 +108,8 @@ func TestDiscoveredAtMergeLogic(t *testing.T) { } // Create applied database first (will have older timestamps) - appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...) + appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal")) + appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...) require.NoError(t, err) defer appliedDB.Close() diff --git a/pkg/synccompactor/compactor.go b/pkg/synccompactor/compactor.go index 017c22ef3..d4bc108cc 100644 --- a/pkg/synccompactor/compactor.go +++ b/pkg/synccompactor/compactor.go @@ -159,7 +159,12 @@ func (c *Compactor) Compact(ctx context.Context) (*CompactableSync, error) { l.Error("doOneCompaction failed: could not create c1z file", zap.Error(err)) return nil, err } - defer func() { _ = c.compactedC1z.Close() }() + defer func() { + err := c.compactedC1z.Close() + if err != nil { + l.Error("compactor: error closing compacted c1z", zap.Error(err), zap.String("compacted_c1z_file", destFilePath)) + } + }() newSyncId, err := c.compactedC1z.StartNewSync(ctx, connectorstore.SyncTypePartial, "") if err != nil { return nil, fmt.Errorf("failed to start new sync: %w", err) @@ -287,10 +292,6 @@ func (c *Compactor) doOneCompaction(ctx context.Context, cs *CompactableSync) er dotc1z.WithTmpDir(c.tmpDir), dotc1z.WithDecoderOptions(dotc1z.WithDecoderConcurrency(-1)), dotc1z.WithReadOnly(true), - // We're only reading, so it's safe to use these pragmas. - dotc1z.WithPragma("synchronous", "OFF"), - dotc1z.WithPragma("journal_mode", "OFF"), - dotc1z.WithPragma("locking_mode", "EXCLUSIVE"), ) if err != nil { return err