Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions db/query.mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,17 @@ FROM chunks
WHERE id = ?;

-- name: GetChunksByNarFileID :many
-- Returns every chunk belonging to the given NAR file, including the
-- on-disk compressed_size, ordered by position in the original stream.
SELECT c.id, c.hash, c.size, c.compressed_size, c.created_at, c.updated_at
FROM chunks c
INNER JOIN nar_file_chunks nfc ON c.id = nfc.chunk_id
WHERE nfc.nar_file_id = ?
ORDER BY nfc.chunk_index;

-- name: CreateChunk :execresult
INSERT INTO chunks (
hash, size
hash, size, compressed_size
) VALUES (
?, ?
?, ?, ?
)
ON DUPLICATE KEY UPDATE
id = LAST_INSERT_ID(id),
Expand Down
6 changes: 3 additions & 3 deletions db/query.postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -300,17 +300,17 @@ FROM chunks
WHERE id = $1;

-- name: GetChunksByNarFileID :many
-- Returns every chunk belonging to the given NAR file, including the
-- on-disk compressed_size, ordered by position in the original stream.
SELECT c.id, c.hash, c.size, c.compressed_size, c.created_at, c.updated_at
FROM chunks c
INNER JOIN nar_file_chunks nfc ON c.id = nfc.chunk_id
WHERE nfc.nar_file_id = $1
ORDER BY nfc.chunk_index;

-- name: CreateChunk :one
INSERT INTO chunks (
hash, size
hash, size, compressed_size
) VALUES (
$1, $2
$1, $2, $3
)
ON CONFLICT(hash) DO UPDATE SET
updated_at = CURRENT_TIMESTAMP
Expand Down
6 changes: 3 additions & 3 deletions db/query.sqlite.sql
Original file line number Diff line number Diff line change
Expand Up @@ -286,17 +286,17 @@ FROM chunks
WHERE id = ?;

-- name: GetChunksByNarFileID :many
-- Returns every chunk belonging to the given NAR file, including the
-- on-disk compressed_size, ordered by position in the original stream.
SELECT c.id, c.hash, c.size, c.compressed_size, c.created_at, c.updated_at
FROM chunks c
INNER JOIN nar_file_chunks nfc ON c.id = nfc.chunk_id
WHERE nfc.nar_file_id = ?
ORDER BY nfc.chunk_index;

-- name: CreateChunk :one
INSERT INTO chunks (
hash, size
hash, size, compressed_size
) VALUES (
?, ?
?, ?, ?
)
ON CONFLICT(hash) DO UPDATE SET
updated_at = CURRENT_TIMESTAMP
Expand Down
9 changes: 6 additions & 3 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,14 +1346,16 @@ func (c *Cache) storeNarWithCDC(ctx context.Context, tempPath string, narURL *na
}

// Store in chunkStore if new
_, _, err = chunkStore.PutChunk(ctx, chunkMetadata.Hash, chunkMetadata.Data)
_, compressedSize, err := chunkStore.PutChunk(ctx, chunkMetadata.Hash, chunkMetadata.Data)
if err != nil {
chunkMetadata.Free()

return 0, fmt.Errorf("error storing chunk: %w", err)
}

chunkMetadata.Free()
//nolint:gosec // G115: Chunk size is small enough to fit in uint32
chunkMetadata.CompressedSize = uint32(compressedSize)

totalSize += int64(chunkMetadata.Size)

Expand Down Expand Up @@ -1415,8 +1417,9 @@ func (c *Cache) recordChunkBatch(ctx context.Context, narFileID int64, startInde
for i, chunkMetadata := range batch {
// Create or increment ref count.
ch, err := qtx.CreateChunk(ctx, database.CreateChunkParams{
Hash: chunkMetadata.Hash,
Size: chunkMetadata.Size,
Hash: chunkMetadata.Hash,
Size: chunkMetadata.Size,
CompressedSize: chunkMetadata.CompressedSize,
})
if err != nil {
return fmt.Errorf("error creating chunk record: %w", err)
Expand Down
64 changes: 64 additions & 0 deletions pkg/cache/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kalbasit/ncps/pkg/database"
"github.com/kalbasit/ncps/pkg/nar"
"github.com/kalbasit/ncps/pkg/storage/chunk"
)
Expand Down Expand Up @@ -51,6 +52,7 @@ func runCDCTestSuite(t *testing.T, factory cacheFactory) {
t.Run("Mixed Mode", testCDCMixedMode(factory))
t.Run("GetNarInfo with CDC chunks", testCDCGetNarInfo(factory))
t.Run("Client Disconnect No Goroutine Leak", testCDCClientDisconnectNoGoroutineLeak(factory))
t.Run("chunks are stored compressed", testCDCChunksAreCompressed(factory))
}

func testCDCPutAndGet(factory cacheFactory) func(*testing.T) {
Expand Down Expand Up @@ -311,3 +313,65 @@ func testCDCClientDisconnectNoGoroutineLeak(factory cacheFactory) func(*testing.
"Goroutine leak detected: baseline=%d, final=%d", baselineGoroutines, finalGoroutines)
}
}

// testCDCChunksAreCompressed verifies that CDC chunks are stored compressed:
// every recorded compressed_size must be positive, the sum of raw chunk sizes
// must equal the original content length, and for highly compressible input
// the total compressed size must be strictly smaller than the raw total.
// It then reassembles the NAR to prove compression is transparent to readers.
func testCDCChunksAreCompressed(factory cacheFactory) func(*testing.T) {
	return func(t *testing.T) {
		t.Parallel()

		ctx := context.Background()

		c, db, _, dir, _, cleanup := factory(t)
		t.Cleanup(cleanup)

		// Initialize a local chunk store under the test directory.
		chunkStoreDir := filepath.Join(dir, "chunks-store")
		chunkStore, err := chunk.NewLocalStore(chunkStoreDir)
		require.NoError(t, err)

		c.SetChunkStore(chunkStore)
		err = c.SetCDCConfiguration(true, 1024, 4096, 8192) // Small sizes for testing
		require.NoError(t, err)

		// Use highly compressible data (repeated bytes) so the compressed
		// size is guaranteed to come out smaller than the raw size.
		content := strings.Repeat("compressible", 1000)
		nu := nar.URL{Hash: "testnar-compress", Compression: nar.CompressionTypeNone}

		r := io.NopCloser(strings.NewReader(content))
		err = c.PutNar(ctx, nu, r)
		require.NoError(t, err)

		// Verify chunks exist in DB and have compressed_size set.
		narFile, err := db.GetNarFileByHashAndCompressionAndQuery(ctx, database.GetNarFileByHashAndCompressionAndQueryParams{
			Hash:        nu.Hash,
			Compression: nu.Compression.String(),
			Query:       nu.Query.Encode(),
		})
		require.NoError(t, err)

		chunks, err := db.GetChunksByNarFileID(ctx, narFile.ID)
		require.NoError(t, err)
		require.NotEmpty(t, chunks, "should have chunks in the database")

		// Loop variable is named "ch" (not "chunk") to avoid shadowing the
		// imported chunk package used above for chunk.NewLocalStore.
		var totalSize, totalCompressedSize int64
		for _, ch := range chunks {
			totalSize += int64(ch.Size)
			totalCompressedSize += int64(ch.CompressedSize)
			assert.Positive(t, ch.CompressedSize, "compressed size should be positive")
		}

		assert.Equal(t, int64(len(content)), totalSize, "sum of chunk sizes should equal original content size")
		assert.Less(t, totalCompressedSize, totalSize,
			"total compressed size should be less than total original size for compressible data")

		// Verify reassembly to ensure compression is transparent.
		size, rc, err := c.GetNar(ctx, nu)
		require.NoError(t, err)

		defer rc.Close()

		data, err := io.ReadAll(rc)
		require.NoError(t, err)
		assert.Equal(t, content, string(data), "decompressed data should match original")
		assert.Equal(t, int64(len(content)), size, "size should match original content size")
	}
}
9 changes: 5 additions & 4 deletions pkg/chunker/chunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (

// Chunk represents a single content-defined chunk.
//
// Size is the uncompressed length of Data; CompressedSize is the length of
// the chunk as stored on disk by the chunk store.
type Chunk struct {
	Hash           string // BLAKE3 hash of chunk content
	Offset         int64  // Offset in original stream
	Size           uint32 // Chunk size in bytes (uncompressed)
	CompressedSize uint32 // Compressed chunk size in bytes
	Data           []byte // Chunk data

	free func() // function to return Data to the pool
}
Expand Down
13 changes: 3 additions & 10 deletions pkg/database/generated_models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/database/generated_querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions pkg/database/generated_wrapper_mysql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions pkg/database/generated_wrapper_postgres.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions pkg/database/generated_wrapper_sqlite.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/database/mysqldb/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading