diff --git a/pkg/dotc1z/c1file.go b/pkg/dotc1z/c1file.go index 548968ce6..ee1f16bfc 100644 --- a/pkg/dotc1z/c1file.go +++ b/pkg/dotc1z/c1file.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "sync" "time" @@ -313,6 +314,21 @@ func (c *C1File) init(ctx context.Context) error { } } + hasLockingPragma := false + for _, pragma := range c.pragmas { + pragmaName := strings.ToLower(pragma.name) + if pragmaName == "main.locking_mode" || pragmaName == "locking_mode" { + hasLockingPragma = true + break + } + } + if !hasLockingPragma { + _, err = c.db.ExecContext(ctx, "PRAGMA main.locking_mode = EXCLUSIVE") + if err != nil { + return err + } + } + for _, pragma := range c.pragmas { _, err := c.db.ExecContext(ctx, fmt.Sprintf("PRAGMA %s = %s", pragma.name, pragma.value)) if err != nil { diff --git a/pkg/dotc1z/session_store_test.go b/pkg/dotc1z/session_store_test.go index 4131301f8..8d78d202c 100644 --- a/pkg/dotc1z/session_store_test.go +++ b/pkg/dotc1z/session_store_test.go @@ -1690,7 +1690,7 @@ func TestC1FileSessionStore_ConcurrentAccess(t *testing.T) { ctx := context.Background() tempDir := filepath.Join(t.TempDir(), "test-session.c1z") - c1zFile, err := NewC1ZFile(ctx, tempDir, WithPragma("journal_mode", "WAL")) + c1zFile, err := NewC1ZFile(ctx, tempDir, WithPragma("journal_mode", "WAL"), WithPragma("main.locking_mode", "NORMAL")) require.NoError(t, err) defer c1zFile.Close() diff --git a/pkg/synccompactor/attached/attached_test.go b/pkg/synccompactor/attached/attached_test.go index e11997ea2..62b9f1189 100644 --- a/pkg/synccompactor/attached/attached_test.go +++ b/pkg/synccompactor/attached/attached_test.go @@ -3,15 +3,56 @@ package attached import ( "context" "path/filepath" + "slices" "testing" + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" + reader_v2 "github.com/conductorone/baton-sdk/pb/c1/reader/v2" "github.com/conductorone/baton-sdk/pkg/connectorstore" "github.com/conductorone/baton-sdk/pkg/dotc1z" "github.com/stretchr/testify/require" ) +func putTestData(ctx context.Context, t *testing.T, db *dotc1z.C1File) { + err := db.PutResourceTypes(ctx, v2.ResourceType_builder{Id: "user", DisplayName: "User"}.Build()) + require.NoError(t, err) + err = db.PutResourceTypes(ctx, v2.ResourceType_builder{Id: "group", DisplayName: "Group"}.Build()) + require.NoError(t, err) + err = db.PutResources(ctx, v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "user1"}.Build(), DisplayName: "User1"}.Build()) + require.NoError(t, err) + err = db.PutResources(ctx, v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "user2"}.Build(), DisplayName: "User2"}.Build()) + require.NoError(t, err) + err = db.PutResources(ctx, v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "group", Resource: "group1"}.Build(), DisplayName: "Group1"}.Build()) + require.NoError(t, err) + err = db.PutEntitlements(ctx, v2.Entitlement_builder{Id: "group1:member", DisplayName: "Member"}.Build()) + require.NoError(t, err) + err = db.PutGrants(ctx, v2.Grant_builder{ + Id: "group1:member:user1", + Principal: v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "user1"}.Build()}.Build(), + Entitlement: v2.Entitlement_builder{Id: "group1:member", DisplayName: "Member"}.Build(), + }.Build()) + require.NoError(t, err) + err = db.PutGrants(ctx, v2.Grant_builder{ + Id: "group1:member:user2", Principal: v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "user2"}.Build()}.Build(), + Entitlement: v2.Entitlement_builder{Id: "group1:member", DisplayName: "Member"}.Build(), + }.Build()) + require.NoError(t, err) +} + +func verifyTestData(ctx context.Context, t *testing.T, db *dotc1z.C1File) { + usersResp, err := db.ListResources(ctx, v2.ResourcesServiceListResourcesRequest_builder{ + ResourceTypeId: "user", + }.Build()) + require.NoError(t, err) + require.NotNil(t, usersResp) + users := usersResp.GetList() + require.Equal(t, 2, len(users)) + require.Equal(t, "user1", users[0].GetId().GetResource()) + require.Equal(t, "user2", users[1].GetId().GetResource()) +} + func TestAttachedCompactor(t *testing.T) { - ctx := context.Background() + ctx := t.Context() // Create temporary files for base, applied, and dest databases tmpDir := t.TempDir() @@ -19,7 +60,6 @@ func TestAttachedCompactor(t *testing.T) { appliedFile := filepath.Join(tmpDir, "applied.c1z") opts := []dotc1z.C1ZOption{ - dotc1z.WithPragma("journal_mode", "WAL"), dotc1z.WithTmpDir(tmpDir), } @@ -36,7 +76,8 @@ func TestAttachedCompactor(t *testing.T) { require.NoError(t, err) // Create applied database with some test data - appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...) + appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal")) + appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...) require.NoError(t, err) defer appliedDB.Close() @@ -44,6 +85,8 @@ func TestAttachedCompactor(t *testing.T) { _, err = appliedDB.StartNewSync(ctx, connectorstore.SyncTypePartial, "") require.NoError(t, err) + putTestData(ctx, t, appliedDB) + err = appliedDB.EndSync(ctx) require.NoError(t, err) @@ -51,13 +94,18 @@ func TestAttachedCompactor(t *testing.T) { err = compactor.Compact(ctx) require.NoError(t, err) - // Verify that compaction completed without errors - // The actual verification would depend on having test data, - // but this test verifies that the compaction workflow works + baseSync, err := baseDB.GetLatestFinishedSync(ctx, reader_v2.SyncsReaderServiceGetLatestFinishedSyncRequest_builder{ + SyncType: string(connectorstore.SyncTypeAny), + }.Build()) + require.NoError(t, err) + require.NotNil(t, baseSync) + require.Equal(t, connectorstore.SyncTypeFull, connectorstore.SyncType(baseSync.GetSync().GetSyncType())) + + verifyTestData(ctx, t, baseDB) } func TestAttachedCompactorMixedSyncTypes(t *testing.T) { - ctx := context.Background() + ctx := t.Context() // Create temporary files for base, applied, and dest databases tmpDir := t.TempDir() @@ -65,7 +113,6 @@ func TestAttachedCompactorMixedSyncTypes(t *testing.T) { appliedFile := filepath.Join(tmpDir, "applied.c1z") opts := []dotc1z.C1ZOption{ - dotc1z.WithPragma("journal_mode", "WAL"), dotc1z.WithTmpDir(tmpDir), } @@ -83,7 +130,8 @@ func TestAttachedCompactorMixedSyncTypes(t *testing.T) { require.NoError(t, err) // Create applied database with an incremental sync - appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...) + appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal")) + appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...) require.NoError(t, err) defer appliedDB.Close() @@ -92,6 +140,8 @@ func TestAttachedCompactorMixedSyncTypes(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, appliedSyncID) + putTestData(ctx, t, appliedDB) + err = appliedDB.EndSync(ctx) require.NoError(t, err) @@ -99,15 +149,11 @@ func TestAttachedCompactorMixedSyncTypes(t *testing.T) { err = compactor.Compact(ctx) require.NoError(t, err) - // Verify that compaction completed without errors - // This test specifically verifies that: - // - Base full sync was correctly identified - // - Applied incremental sync was correctly identified and used - // - The compaction worked with mixed sync types + verifyTestData(ctx, t, baseDB) } func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) { - ctx := context.Background() + ctx := t.Context() // Create temporary files for base, applied, and dest databases tmpDir := t.TempDir() @@ -115,7 +161,6 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) { appliedFile := filepath.Join(tmpDir, "applied.c1z") opts := []dotc1z.C1ZOption{ - dotc1z.WithPragma("journal_mode", "WAL"), dotc1z.WithTmpDir(tmpDir), } @@ -132,7 +177,8 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) { require.NoError(t, err) // Create applied database with multiple syncs of different types - appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...) + appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal")) + appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...) require.NoError(t, err) defer appliedDB.Close() @@ -141,6 +187,8 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, firstAppliedSyncID) + putTestData(ctx, t, appliedDB) + err = appliedDB.EndSync(ctx) require.NoError(t, err) @@ -149,6 +197,8 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, secondAppliedSyncID) + putTestData(ctx, t, appliedDB) + err = appliedDB.EndSync(ctx) require.NoError(t, err) @@ -156,6 +206,8 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) { err = compactor.Compact(ctx) require.NoError(t, err) + verifyTestData(ctx, t, baseDB) + // Verify that compaction completed without errors // This test verifies that the latest sync (incremental) was used from applied // even though there was an earlier full sync diff --git a/pkg/synccompactor/attached/comprehensive_test.go b/pkg/synccompactor/attached/comprehensive_test.go index 29fba83f2..721d06b54 100644 --- a/pkg/synccompactor/attached/comprehensive_test.go +++ b/pkg/synccompactor/attached/comprehensive_test.go @@ -1,8 +1,8 @@ package attached import ( - "context" "path/filepath" + "slices" "testing" v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" @@ -19,7 +19,7 @@ import ( // 4. Overlapping data where applied is newer // It also verifies that all data ends up with the correct destination sync ID. func TestAttachedCompactorComprehensiveScenarios(t *testing.T) { - ctx := context.Background() + ctx := t.Context() // Create temporary files for base, applied, and dest databases tmpDir := t.TempDir() @@ -122,7 +122,8 @@ func TestAttachedCompactorComprehensiveScenarios(t *testing.T) { require.NoError(t, err) // ========= Create applied database ========= - appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...) + appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal")) + appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...) require.NoError(t, err) defer appliedDB.Close() diff --git a/pkg/synccompactor/attached/timestamp_test.go b/pkg/synccompactor/attached/timestamp_test.go index 60eba22dd..2d96783dd 100644 --- a/pkg/synccompactor/attached/timestamp_test.go +++ b/pkg/synccompactor/attached/timestamp_test.go @@ -1,8 +1,8 @@ package attached import ( - "context" "path/filepath" + "slices" "testing" "time" @@ -16,7 +16,7 @@ import ( // TestDiscoveredAtMergeLogic specifically tests the discovered_at timestamp comparison // by creating two separate scenarios with controlled timing. func TestDiscoveredAtMergeLogic(t *testing.T) { - ctx := context.Background() + ctx := t.Context() // Test Case 1: Applied is newer (natural case) t.Run("AppliedNewer", func(t *testing.T) { @@ -59,7 +59,8 @@ func TestDiscoveredAtMergeLogic(t *testing.T) { time.Sleep(10 * time.Millisecond) // Create applied database (newer timestamps) - appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...) + appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal")) + appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...) require.NoError(t, err) defer appliedDB.Close() @@ -107,7 +108,8 @@ func TestDiscoveredAtMergeLogic(t *testing.T) { } // Create applied database first (will have older timestamps) - appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...) + appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal")) + appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...) require.NoError(t, err) defer appliedDB.Close() diff --git a/pkg/synccompactor/compactor.go b/pkg/synccompactor/compactor.go index 017c22ef3..d4bc108cc 100644 --- a/pkg/synccompactor/compactor.go +++ b/pkg/synccompactor/compactor.go @@ -159,7 +159,12 @@ func (c *Compactor) Compact(ctx context.Context) (*CompactableSync, error) { l.Error("doOneCompaction failed: could not create c1z file", zap.Error(err)) return nil, err } - defer func() { _ = c.compactedC1z.Close() }() + defer func() { + err := c.compactedC1z.Close() + if err != nil { + l.Error("compactor: error closing compacted c1z", zap.Error(err), zap.String("compacted_c1z_file", destFilePath)) + } + }() newSyncId, err := c.compactedC1z.StartNewSync(ctx, connectorstore.SyncTypePartial, "") if err != nil { return nil, fmt.Errorf("failed to start new sync: %w", err) @@ -287,10 +292,6 @@ func (c *Compactor) doOneCompaction(ctx context.Context, cs *CompactableSync) er dotc1z.WithTmpDir(c.tmpDir), dotc1z.WithDecoderOptions(dotc1z.WithDecoderConcurrency(-1)), dotc1z.WithReadOnly(true), - // We're only reading, so it's safe to use these pragmas. - dotc1z.WithPragma("synchronous", "OFF"), - dotc1z.WithPragma("journal_mode", "OFF"), - dotc1z.WithPragma("locking_mode", "EXCLUSIVE"), ) if err != nil { return err