Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/dotc1z/c1file.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -313,6 +314,21 @@ func (c *C1File) init(ctx context.Context) error {
}
}

hasLockingPragma := false
for _, pragma := range c.pragmas {
pragmaName := strings.ToLower(pragma.name)
if pragmaName == "main.locking_mode" || pragmaName == "locking_mode" {
hasLockingPragma = true
break
}
}
if !hasLockingPragma {
_, err = c.db.ExecContext(ctx, "PRAGMA main.locking_mode = EXCLUSIVE")
if err != nil {
return err
}
}

for _, pragma := range c.pragmas {
_, err := c.db.ExecContext(ctx, fmt.Sprintf("PRAGMA %s = %s", pragma.name, pragma.value))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/dotc1z/session_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1690,7 +1690,7 @@ func TestC1FileSessionStore_ConcurrentAccess(t *testing.T) {
ctx := context.Background()
tempDir := filepath.Join(t.TempDir(), "test-session.c1z")

c1zFile, err := NewC1ZFile(ctx, tempDir, WithPragma("journal_mode", "WAL"))
c1zFile, err := NewC1ZFile(ctx, tempDir, WithPragma("journal_mode", "WAL"), WithPragma("main.locking_mode", "NORMAL"))
require.NoError(t, err)
defer c1zFile.Close()

Expand Down
86 changes: 69 additions & 17 deletions pkg/synccompactor/attached/attached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,63 @@ package attached
import (
"context"
"path/filepath"
"slices"
"testing"

v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2"
reader_v2 "github.com/conductorone/baton-sdk/pb/c1/reader/v2"
"github.com/conductorone/baton-sdk/pkg/connectorstore"
"github.com/conductorone/baton-sdk/pkg/dotc1z"
"github.com/stretchr/testify/require"
)

func putTestData(ctx context.Context, t *testing.T, db *dotc1z.C1File) {
err := db.PutResourceTypes(ctx, v2.ResourceType_builder{Id: "user", DisplayName: "User"}.Build())
require.NoError(t, err)
err = db.PutResourceTypes(ctx, v2.ResourceType_builder{Id: "group", DisplayName: "Group"}.Build())
require.NoError(t, err)
err = db.PutResources(ctx, v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "user1"}.Build(), DisplayName: "User1"}.Build())
require.NoError(t, err)
err = db.PutResources(ctx, v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "user2"}.Build(), DisplayName: "User2"}.Build())
require.NoError(t, err)
err = db.PutResources(ctx, v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "group", Resource: "group1"}.Build(), DisplayName: "Group1"}.Build())
require.NoError(t, err)
err = db.PutEntitlements(ctx, v2.Entitlement_builder{Id: "group1:member", DisplayName: "Member"}.Build())
require.NoError(t, err)
err = db.PutGrants(ctx, v2.Grant_builder{
Id: "group1:member:user1",
Principal: v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "user1"}.Build()}.Build(),
Entitlement: v2.Entitlement_builder{Id: "group1:member", DisplayName: "Member"}.Build(),
}.Build())
require.NoError(t, err)
err = db.PutGrants(ctx, v2.Grant_builder{
Id: "group1:member:user2", Principal: v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "user2"}.Build()}.Build(),
Entitlement: v2.Entitlement_builder{Id: "group1:member", DisplayName: "Member"}.Build(),
}.Build())
require.NoError(t, err)
}

func verifyTestData(ctx context.Context, t *testing.T, db *dotc1z.C1File) {
usersResp, err := db.ListResources(ctx, v2.ResourcesServiceListResourcesRequest_builder{
ResourceTypeId: "user",
}.Build())
require.NoError(t, err)
require.NotNil(t, usersResp)
users := usersResp.GetList()
require.Equal(t, 2, len(users))
require.Equal(t, "user1", users[0].GetId().GetResource())
require.Equal(t, "user2", users[1].GetId().GetResource())
}

func TestAttachedCompactor(t *testing.T) {
ctx := context.Background()
ctx := t.Context()

// Create temporary files for base, applied, and dest databases
tmpDir := t.TempDir()
baseFile := filepath.Join(tmpDir, "base.c1z")
appliedFile := filepath.Join(tmpDir, "applied.c1z")

opts := []dotc1z.C1ZOption{
dotc1z.WithPragma("journal_mode", "WAL"),
dotc1z.WithTmpDir(tmpDir),
}

Expand All @@ -36,36 +76,43 @@ func TestAttachedCompactor(t *testing.T) {
require.NoError(t, err)

// Create applied database with some test data
appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...)
appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal"))
appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...)
Comment on lines +79 to +80
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for NewAttachedCompactor usage
echo "=== NewAttachedCompactor usage in production code ==="
rg -n 'NewAttachedCompactor' --type go -g '!*test*'

echo -e "\n=== All NewAttachedCompactor calls (including tests) ==="
rg -n 'NewAttachedCompactor' --type go -B2 -A5

echo -e "\n=== NewC1ZFile calls in production (non-test) code ==="
rg -n 'NewC1ZFile' --type go -g '!*test*' -B2 -A2

echo -e "\n=== Search for appliedOpts or similar pattern in production ==="
rg -n 'appliedDB.*NewC1ZFile|appliedOpts' --type go -g '!*test*' -B3 -A3

Repository: ConductorOne/baton-sdk

Length of output: 8036


🏁 Script executed:

#!/bin/bash
echo "=== compactor.go lines 150-170 (where compactedC1z is created) ==="
sed -n '150,170p' pkg/synccompactor/compactor.go

echo -e "\n=== compactor.go lines 280-320 (where applyFile is created and compactor used) ==="
sed -n '280,320p' pkg/synccompactor/compactor.go

echo -e "\n=== Check how opts is defined in doOneCompaction ==="
rg -n 'opts\s*:=|opts\s*=' pkg/synccompactor/compactor.go -B2 -A2 | head -40

Repository: ConductorOne/baton-sdk

Length of output: 2193


🏁 Script executed:

#!/bin/bash
echo "=== Search for WithReadOnly in c1file.go ==="
rg -n 'WithReadOnly|readOnly' pkg/dotc1z/c1file.go -B3 -A3

echo -e "\n=== Check c1file.go around journal_mode and locking_mode pragmas ==="
rg -n 'journal_mode|locking_mode' pkg/dotc1z/c1file.go -B2 -A2

Repository: ConductorOne/baton-sdk

Length of output: 2329


🏁 Script executed:

#!/bin/bash
echo "=== attached.go Compact method and ATTACH usage ==="
cat -n pkg/synccompactor/attached/attached.go | head -100

Repository: ConductorOne/baton-sdk

Length of output: 3823


🏁 Script executed:

#!/bin/bash
echo "=== Search for comments around locking or ATTACH in compactor.go ==="
sed -n '280,320p' pkg/synccompactor/compactor.go | head -50

echo -e "\n=== Check if there's any issue or TODO about locking_mode ==="
rg -n -i 'locking|attach.*locking|exclusive.*normal' pkg/synccompactor/ --type go -B2 -A2

Repository: ConductorOne/baton-sdk

Length of output: 4644


Production code should either apply the same locking_mode override to applied databases as tests do, or document why it's not needed.

The tests consistently override the applied database to use locking_mode=normal (lines 79-80, 133-134, 180-181, and in other test files), while the base database uses the default EXCLUSIVE mode (set at compactor.go:147). However, the production code in compactor.go:291-298 creates the applied database without this override, meaning it defaults to EXCLUSIVE locking mode just like the base database.

When NewAttachedCompactor performs the SQLite ATTACH operation (attached.go:50), both databases would have EXCLUSIVE locking in production but EXCLUSIVE and NORMAL in tests. This mismatch suggests either:

  1. The override is necessary for correct behavior during ATTACH operations and should be applied in production, or
  2. The applied database being ReadOnly makes the override unnecessary in practice, and tests are being overly cautious

Clarify the intent and ensure consistency between test and production behavior.

require.NoError(t, err)
defer appliedDB.Close()

// Start sync and add some applied data
_, err = appliedDB.StartNewSync(ctx, connectorstore.SyncTypePartial, "")
require.NoError(t, err)

putTestData(ctx, t, appliedDB)

err = appliedDB.EndSync(ctx)
require.NoError(t, err)

compactor := NewAttachedCompactor(baseDB, appliedDB)
err = compactor.Compact(ctx)
require.NoError(t, err)

// Verify that compaction completed without errors
// The actual verification would depend on having test data,
// but this test verifies that the compaction workflow works
baseSync, err := baseDB.GetLatestFinishedSync(ctx, reader_v2.SyncsReaderServiceGetLatestFinishedSyncRequest_builder{
SyncType: string(connectorstore.SyncTypeAny),
}.Build())
require.NoError(t, err)
require.NotNil(t, baseSync)
require.Equal(t, connectorstore.SyncTypeFull, connectorstore.SyncType(baseSync.GetSync().GetSyncType()))

verifyTestData(ctx, t, baseDB)
}

func TestAttachedCompactorMixedSyncTypes(t *testing.T) {
ctx := context.Background()
ctx := t.Context()

// Create temporary files for base, applied, and dest databases
tmpDir := t.TempDir()
baseFile := filepath.Join(tmpDir, "base.c1z")
appliedFile := filepath.Join(tmpDir, "applied.c1z")

opts := []dotc1z.C1ZOption{
dotc1z.WithPragma("journal_mode", "WAL"),
dotc1z.WithTmpDir(tmpDir),
}

Expand All @@ -83,7 +130,8 @@ func TestAttachedCompactorMixedSyncTypes(t *testing.T) {
require.NoError(t, err)

// Create applied database with an incremental sync
appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...)
appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal"))
appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...)
require.NoError(t, err)
defer appliedDB.Close()

Expand All @@ -92,30 +140,27 @@ func TestAttachedCompactorMixedSyncTypes(t *testing.T) {
require.NoError(t, err)
require.NotEmpty(t, appliedSyncID)

putTestData(ctx, t, appliedDB)

err = appliedDB.EndSync(ctx)
require.NoError(t, err)

compactor := NewAttachedCompactor(baseDB, appliedDB)
err = compactor.Compact(ctx)
require.NoError(t, err)

// Verify that compaction completed without errors
// This test specifically verifies that:
// - Base full sync was correctly identified
// - Applied incremental sync was correctly identified and used
// - The compaction worked with mixed sync types
verifyTestData(ctx, t, baseDB)
}

func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) {
ctx := context.Background()
ctx := t.Context()

// Create temporary files for base, applied, and dest databases
tmpDir := t.TempDir()
baseFile := filepath.Join(tmpDir, "base.c1z")
appliedFile := filepath.Join(tmpDir, "applied.c1z")

opts := []dotc1z.C1ZOption{
dotc1z.WithPragma("journal_mode", "WAL"),
dotc1z.WithTmpDir(tmpDir),
}

Expand All @@ -132,7 +177,8 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) {
require.NoError(t, err)

// Create applied database with multiple syncs of different types
appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...)
appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal"))
appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...)
require.NoError(t, err)
defer appliedDB.Close()

Expand All @@ -141,6 +187,8 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) {
require.NoError(t, err)
require.NotEmpty(t, firstAppliedSyncID)

putTestData(ctx, t, appliedDB)

err = appliedDB.EndSync(ctx)
require.NoError(t, err)

Expand All @@ -149,13 +197,17 @@ func TestAttachedCompactorUsesLatestAppliedSyncOfAnyType(t *testing.T) {
require.NoError(t, err)
require.NotEmpty(t, secondAppliedSyncID)

putTestData(ctx, t, appliedDB)

err = appliedDB.EndSync(ctx)
require.NoError(t, err)

compactor := NewAttachedCompactor(baseDB, appliedDB)
err = compactor.Compact(ctx)
require.NoError(t, err)

verifyTestData(ctx, t, baseDB)

// Verify that compaction completed without errors
// This test verifies that the latest sync (incremental) was used from applied
// even though there was an earlier full sync
Expand Down
7 changes: 4 additions & 3 deletions pkg/synccompactor/attached/comprehensive_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package attached

import (
"context"
"path/filepath"
"slices"
"testing"

v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2"
Expand All @@ -19,7 +19,7 @@ import (
// 4. Overlapping data where applied is newer
// It also verifies that all data ends up with the correct destination sync ID.
func TestAttachedCompactorComprehensiveScenarios(t *testing.T) {
ctx := context.Background()
ctx := t.Context()

// Create temporary files for base, applied, and dest databases
tmpDir := t.TempDir()
Expand Down Expand Up @@ -122,7 +122,8 @@ func TestAttachedCompactorComprehensiveScenarios(t *testing.T) {
require.NoError(t, err)

// ========= Create applied database =========
appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...)
appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal"))
appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...)
require.NoError(t, err)
defer appliedDB.Close()

Expand Down
10 changes: 6 additions & 4 deletions pkg/synccompactor/attached/timestamp_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package attached

import (
"context"
"path/filepath"
"slices"
"testing"
"time"

Expand All @@ -16,7 +16,7 @@ import (
// TestDiscoveredAtMergeLogic specifically tests the discovered_at timestamp comparison
// by creating two separate scenarios with controlled timing.
func TestDiscoveredAtMergeLogic(t *testing.T) {
ctx := context.Background()
ctx := t.Context()

// Test Case 1: Applied is newer (natural case)
t.Run("AppliedNewer", func(t *testing.T) {
Expand Down Expand Up @@ -59,7 +59,8 @@ func TestDiscoveredAtMergeLogic(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// Create applied database (newer timestamps)
appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...)
appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal"))
appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...)
require.NoError(t, err)
defer appliedDB.Close()

Expand Down Expand Up @@ -107,7 +108,8 @@ func TestDiscoveredAtMergeLogic(t *testing.T) {
}

// Create applied database first (will have older timestamps)
appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, opts...)
appliedOpts := append(slices.Clone(opts), dotc1z.WithPragma("locking_mode", "normal"))
appliedDB, err := dotc1z.NewC1ZFile(ctx, appliedFile, appliedOpts...)
require.NoError(t, err)
defer appliedDB.Close()

Expand Down
11 changes: 6 additions & 5 deletions pkg/synccompactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,12 @@ func (c *Compactor) Compact(ctx context.Context) (*CompactableSync, error) {
l.Error("doOneCompaction failed: could not create c1z file", zap.Error(err))
return nil, err
}
defer func() { _ = c.compactedC1z.Close() }()
defer func() {
err := c.compactedC1z.Close()
if err != nil {
l.Error("compactor: error closing compacted c1z", zap.Error(err), zap.String("compacted_c1z_file", destFilePath))
}
}()
newSyncId, err := c.compactedC1z.StartNewSync(ctx, connectorstore.SyncTypePartial, "")
if err != nil {
return nil, fmt.Errorf("failed to start new sync: %w", err)
Expand Down Expand Up @@ -287,10 +292,6 @@ func (c *Compactor) doOneCompaction(ctx context.Context, cs *CompactableSync) er
dotc1z.WithTmpDir(c.tmpDir),
dotc1z.WithDecoderOptions(dotc1z.WithDecoderConcurrency(-1)),
dotc1z.WithReadOnly(true),
// We're only reading, so it's safe to use these pragmas.
dotc1z.WithPragma("synchronous", "OFF"),
dotc1z.WithPragma("journal_mode", "OFF"),
dotc1z.WithPragma("locking_mode", "EXCLUSIVE"),
)
if err != nil {
return err
Expand Down
Loading