Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 143 additions & 5 deletions internal/strategy/git/backend.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package git

import (
"bufio"
"context"
"log/slog"
"net/http"
"net/http/cgi" //nolint:gosec // CVE-2016-5386 only affects Go < 1.6.3
"os"
"os/exec"
"path/filepath"
"strings"
"time"

"github.com/alecthomas/errors"

Expand All @@ -17,7 +20,8 @@ import (

// serveFromBackend serves a Git request using git http-backend.
func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, c *clone) {
logger := logging.FromContext(r.Context())
ctx := r.Context()
logger := logging.FromContext(ctx)

gitPath, err := exec.LookPath("git")
if err != nil {
Expand Down Expand Up @@ -92,21 +96,155 @@ func (s *Strategy) executeClone(ctx context.Context, c *clone) error {
func (s *Strategy) executeFetch(ctx context.Context, c *clone) error {
logger := logging.FromContext(ctx)

// Try to acquire the semaphore
select {
case <-c.fetchSem:
// We acquired the semaphore, perform the fetch
defer func() {
// Release the semaphore
c.fetchSem <- struct{}{}
}()
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "context cancelled before acquiring fetch semaphore")
default:
// Semaphore is held by another goroutine, wait for it
logger.DebugContext(ctx, "Fetch already in progress, waiting")
select {
case <-c.fetchSem:
// Fetch completed by another goroutine, release and return
c.fetchSem <- struct{}{}
return nil
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "context cancelled while waiting for fetch")
}
}

// #nosec G204 - c.path is controlled by us
// Configure git for large repositories to avoid network buffer issues
// Use 'remote update' for mirror clones to properly handle ref updates and pruning
cmd := exec.CommandContext(ctx, "git", "-C", c.path,
"-c", "http.postBuffer=524288000", // 500MB buffer
"-c", "http.lowSpeedLimit=1000", // 1KB/s minimum speed
"-c", "http.lowSpeedTime=600", // 10 minute timeout at low speed
"fetch", "--all")
"remote", "update", "--prune")
output, err := cmd.CombinedOutput()
if err != nil {
logger.ErrorContext(ctx, "git fetch failed",
logger.ErrorContext(ctx, "git remote update failed",
slog.String("error", err.Error()),
slog.String("output", string(output)))
return errors.Wrap(err, "git fetch")
return errors.Wrap(err, "git remote update")
}

logger.DebugContext(ctx, "git fetch succeeded", slog.String("output", string(output)))
logger.DebugContext(ctx, "git remote update succeeded", slog.String("output", string(output)))
return nil
}

// ensureRefsUpToDate checks if upstream has refs we don't have and fetches if needed.
// Uses a short-lived cache to avoid excessive ls-remote calls.
func (s *Strategy) ensureRefsUpToDate(ctx context.Context, c *clone) error {
logger := logging.FromContext(ctx)

c.mu.Lock()
// Check if we've done a recent ref check
if c.refCheckValid && time.Since(c.lastRefCheck) < s.config.RefCheckInterval {
c.mu.Unlock()
logger.DebugContext(ctx, "Skipping ref check, recently checked",
slog.Duration("since_last_check", time.Since(c.lastRefCheck)))
return nil
}
c.lastRefCheck = time.Now()
c.mu.Unlock()

logger.DebugContext(ctx, "Checking upstream for new refs",
slog.String("upstream", c.upstreamURL))

// Get local refs
localRefs, err := s.getLocalRefs(ctx, c)
if err != nil {
return errors.Wrap(err, "get local refs")
}

// Get upstream refs
upstreamRefs, err := s.getUpstreamRefs(ctx, c)
if err != nil {
return errors.Wrap(err, "get upstream refs")
}

// Check if upstream has any refs we don't have or refs that have been updated
// Skip peeled refs (refs ending in ^{}) as they're not real refs
needsFetch := false
for ref, upstreamSHA := range upstreamRefs {
// Skip peeled tag refs like refs/tags/v1.0.0^{}
if strings.HasSuffix(ref, "^{}") {
continue
}
localSHA, exists := localRefs[ref]
if !exists || localSHA != upstreamSHA {
logger.DebugContext(ctx, "Upstream ref differs from local",
slog.String("ref", ref),
slog.String("upstream_sha", upstreamSHA),
slog.String("local_sha", localSHA))
needsFetch = true
break
}
}

if !needsFetch {
c.mu.Lock()
c.refCheckValid = true
c.mu.Unlock()
logger.DebugContext(ctx, "No upstream changes detected")
return nil
}

logger.InfoContext(ctx, "Upstream has new or updated refs, fetching")
err = s.executeFetch(ctx, c)
if err == nil {
c.mu.Lock()
c.refCheckValid = true
c.mu.Unlock()
}
return err
}

// getLocalRefs returns a map of ref names to SHAs for the local clone.
func (s *Strategy) getLocalRefs(ctx context.Context, c *clone) (map[string]string, error) {
// #nosec G204 - c.path is controlled by us
// Use --head to include HEAD symbolic ref
cmd := exec.CommandContext(ctx, "git", "-C", c.path, "show-ref", "--head")
output, err := cmd.CombinedOutput()
if err != nil {
return nil, errors.Wrap(err, "git show-ref")
}

return ParseGitRefs(output), nil
}

// getUpstreamRefs returns a map of ref names to SHAs for the upstream repository.
func (s *Strategy) getUpstreamRefs(ctx context.Context, c *clone) (map[string]string, error) {
// #nosec G204 - c.upstreamURL is controlled by us
cmd := exec.CommandContext(ctx, "git", "ls-remote", c.upstreamURL)
output, err := cmd.CombinedOutput()
if err != nil {
return nil, errors.Wrap(err, "git ls-remote")
}

return ParseGitRefs(output), nil
}

// ParseGitRefs parses the output of git show-ref or git ls-remote.
// Format: <SHA> <ref>.
func ParseGitRefs(output []byte) map[string]string {
refs := make(map[string]string)
scanner := bufio.NewScanner(strings.NewReader(string(output)))
for scanner.Scan() {
line := scanner.Text()
parts := strings.Fields(line)
if len(parts) >= 2 {
sha := parts[0]
ref := parts[1]
refs[ref] = sha
}
}
return refs
}
42 changes: 33 additions & 9 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ func init() {

// Config for the Git strategy.
type Config struct {
MirrorRoot string `hcl:"mirror-root" help:"Directory to store git mirrors." required:""`
FetchInterval time.Duration `hcl:"fetch-interval,optional" help:"How often to fetch from upstream in minutes." default:"15m"`
MirrorRoot string `hcl:"mirror-root" help:"Directory to store git mirrors." required:""`
FetchInterval time.Duration `hcl:"fetch-interval,optional" help:"How often to fetch from upstream in minutes." default:"15m"`
RefCheckInterval time.Duration `hcl:"ref-check-interval,optional" help:"How long to cache ref checks." default:"10s"`
}

// cloneState represents the current state of a bare clone.
Expand All @@ -41,11 +42,14 @@ const (

// clone represents a bare clone of an upstream repository.
type clone struct {
mu sync.RWMutex
state cloneState
path string
upstreamURL string
lastFetch time.Time
mu sync.RWMutex
state cloneState
path string
upstreamURL string
lastFetch time.Time
lastRefCheck time.Time
refCheckValid bool
fetchSem chan struct{} // Semaphore to coordinate fetch operations
}

// Strategy implements a protocol-aware Git caching proxy.
Expand All @@ -70,6 +74,10 @@ func New(ctx context.Context, config Config, cache cache.Cache, mux strategy.Mux
config.FetchInterval = 15 * time.Minute
}

if config.RefCheckInterval == 0 {
config.RefCheckInterval = 10 * time.Second
}

if err := os.MkdirAll(config.MirrorRoot, 0o750); err != nil {
return nil, errors.Wrap(err, "create mirror root directory")
}
Expand Down Expand Up @@ -100,7 +108,8 @@ func New(ctx context.Context, config Config, cache cache.Cache, mux strategy.Mux

logger.InfoContext(ctx, "Git strategy initialized",
"mirror_root", config.MirrorRoot,
"fetch_interval", config.FetchInterval)
"fetch_interval", config.FetchInterval,
"ref_check_interval", config.RefCheckInterval)

return s, nil
}
Expand Down Expand Up @@ -143,9 +152,20 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
state := c.state
c.mu.RUnlock()

// Check if this is an info/refs request (ref discovery)
isInfoRefs := strings.HasSuffix(pathValue, "/info/refs")

switch state {
case stateReady:
// Check if we need to fetch updates
// For info/refs requests, ensure we have the latest refs from upstream
if isInfoRefs {
if err := s.ensureRefsUpToDate(ctx, c); err != nil {
logger.WarnContext(ctx, "Failed to ensure refs up to date",
slog.String("error", err.Error()))
// Continue serving even if ref check fails
}
}
// Also do background fetch if interval has passed
s.maybeBackgroundFetch(ctx, c)
s.serveFromBackend(w, r, c)

Expand Down Expand Up @@ -198,6 +218,7 @@ func (s *Strategy) getOrCreateClone(ctx context.Context, upstreamURL string) *cl
state: stateEmpty,
path: clonePath,
upstreamURL: upstreamURL,
fetchSem: make(chan struct{}, 1),
}

// Check if clone already exists on disk (from previous run)
Expand All @@ -207,6 +228,9 @@ func (s *Strategy) getOrCreateClone(ctx context.Context, upstreamURL string) *cl
slog.String("path", clonePath))
}

// Initialize semaphore as available
c.fetchSem <- struct{}{}

s.clones[upstreamURL] = c
return c
}
Expand Down
54 changes: 54 additions & 0 deletions internal/strategy/git/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,57 @@ func TestIntegrationWithMockUpstream(t *testing.T) {
assert.NotZero(t, mux.handlers["GET /git/{host}/{path...}"])
assert.NotZero(t, mux.handlers["POST /git/{host}/{path...}"])
}

func TestParseGitRefs(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{})
_ = ctx

tests := []struct {
name string
output string
expected map[string]string
}{
{
name: "MultipleRefs",
output: `abc123def456 refs/heads/main
789ghi012jkl refs/heads/develop
mno345pqr678 refs/tags/v1.0.0`,
expected: map[string]string{
"refs/heads/main": "abc123def456",
"refs/heads/develop": "789ghi012jkl",
"refs/tags/v1.0.0": "mno345pqr678",
},
},
{
name: "EmptyOutput",
output: "",
expected: map[string]string{},
},
{
name: "SingleRef",
output: `abc123def456 refs/heads/main
`,
expected: map[string]string{
"refs/heads/main": "abc123def456",
},
},
{
name: "WithPeeledRefs",
output: `e93f19bd6cab17c507792599b8a22f7b567ef516 refs/tags/v1.2.1
babfaf8dee0baa09c56d1a2ec5623b60d900518b refs/tags/v1.2.1^{}
abc123def456 refs/heads/main`,
expected: map[string]string{
"refs/tags/v1.2.1": "e93f19bd6cab17c507792599b8a22f7b567ef516",
"refs/tags/v1.2.1^{}": "babfaf8dee0baa09c56d1a2ec5623b60d900518b",
"refs/heads/main": "abc123def456",
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := git.ParseGitRefs([]byte(tt.output))
assert.Equal(t, tt.expected, result)
})
}
}