Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ _help:

# Run tests
test:
@gotestsum --hide-summary output,skipped --format-hide-empty-pkg ${CI:+--format github-actions} ./... -- -race -timeout 30s
@gotestsum --hide-summary output,skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 30s

# Lint code
lint:
Expand Down
26 changes: 16 additions & 10 deletions internal/strategy/git/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/gitclone"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/strategy/git"
Expand Down Expand Up @@ -56,11 +57,12 @@ func TestIntegrationGitCloneViaProxy(t *testing.T) {
assert.NoError(t, err)

// Create the git strategy
mux := http.NewServeMux()
strategy, err := git.New(ctx, git.Config{
gc := gitclone.NewManagerProvider(ctx, gitclone.Config{
MirrorRoot: clonesDir,
FetchInterval: 15,
}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux)
})
mux := http.NewServeMux()
strategy, err := git.New(ctx, git.Config{}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, gc)
assert.NoError(t, err)
assert.NotZero(t, strategy)

Expand Down Expand Up @@ -134,11 +136,13 @@ func TestIntegrationGitFetchViaProxy(t *testing.T) {
err := os.MkdirAll(workDir, 0o750)
assert.NoError(t, err)

mux := http.NewServeMux()
_, err = git.New(ctx, git.Config{
gc := gitclone.NewManagerProvider(ctx, gitclone.Config{
MirrorRoot: clonesDir,
FetchInterval: 15,
}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux)
})

mux := http.NewServeMux()
_, err = git.New(ctx, git.Config{}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, gc)
assert.NoError(t, err)

server := testServerWithLogging(ctx, mux)
Expand Down Expand Up @@ -214,10 +218,11 @@ func TestIntegrationPushForwardsToUpstream(t *testing.T) {
defer upstreamServer.Close()

mux := http.NewServeMux()
_, err = git.New(ctx, git.Config{
gc := gitclone.NewManagerProvider(ctx, gitclone.Config{
MirrorRoot: clonesDir,
FetchInterval: 15,
}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux)
})
_, err = git.New(ctx, git.Config{}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, gc)
assert.NoError(t, err)

server := testServerWithLogging(ctx, mux)
Expand Down Expand Up @@ -307,10 +312,11 @@ func TestIntegrationSpoolReusesDuringClone(t *testing.T) {
var upstreamUploadPackRequests atomic.Int32

mux := http.NewServeMux()
strategy, err := git.New(ctx, git.Config{
gc := gitclone.NewManagerProvider(ctx, gitclone.Config{
MirrorRoot: clonesDir,
FetchInterval: 15,
}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux)
})
strategy, err := git.New(ctx, git.Config{}, jobscheduler.New(ctx, jobscheduler.Config{}), nil, mux, gc)
assert.NoError(t, err)

strategy.SetHTTPTransport(&countingTransport{
Expand Down
37 changes: 24 additions & 13 deletions internal/strategy/git/spool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ var ErrSpoolFailed = errors.New("spool failed before response started")
// allowing one writer and multiple concurrent readers. Readers follow the writer,
// blocking when caught up until the write completes.
type ResponseSpool struct {
mu sync.Mutex
cond *sync.Cond
filePath string
file *os.File
status int
headers http.Header
written int64
complete bool
err error
readers sync.WaitGroup
mu sync.Mutex
cond *sync.Cond
filePath string
file *os.File
status int
headers http.Header
written int64
complete bool
err error
readerCount int
}

func NewResponseSpool(filePath string) (*ResponseSpool, error) {
Expand Down Expand Up @@ -102,8 +102,15 @@ func (rs *ResponseSpool) Failed() bool {

// ServeTo streams the spooled response to w, blocking when caught up to the writer.
func (rs *ResponseSpool) ServeTo(w http.ResponseWriter) error {
rs.readers.Add(1)
defer rs.readers.Done()
rs.mu.Lock()
rs.readerCount++
rs.mu.Unlock()
defer func() {
rs.mu.Lock()
rs.readerCount--
rs.cond.Broadcast()
rs.mu.Unlock()
}()

// Wait for headers to be available.
rs.mu.Lock()
Expand Down Expand Up @@ -168,7 +175,11 @@ func (rs *ResponseSpool) ServeTo(w http.ResponseWriter) error {

// WaitForReaders blocks until all active spool readers have finished.
func (rs *ResponseSpool) WaitForReaders() {
rs.readers.Wait()
rs.mu.Lock()
defer rs.mu.Unlock()
for rs.readerCount > 0 {
rs.cond.Wait()
}
}

// SpoolTeeWriter wraps an http.ResponseWriter to capture the response into a spool
Expand Down