diff --git a/internal/adapters/accountprovider/mojang.go b/internal/adapters/accountprovider/mojang.go index af56b447..95763bb7 100644 --- a/internal/adapters/accountprovider/mojang.go +++ b/internal/adapters/accountprovider/mojang.go @@ -1,6 +1,7 @@ package accountprovider import ( + "bytes" "context" "encoding/json" "errors" @@ -8,6 +9,8 @@ import ( "io" "net/http" "strconv" + "strings" + "sync" "time" "github.com/Amund211/flashlight/internal/constants" @@ -19,7 +22,11 @@ import ( "go.opentelemetry.io/otel/trace" ) -const getAccountMinOperationTime = 150 * time.Millisecond +const ( + getAccountMinOperationTime = 150 * time.Millisecond + batchSize = 10 + batchTimeout = 50 * time.Millisecond +) type HttpClient interface { Do(req *http.Request) (*http.Response, error) @@ -29,12 +36,45 @@ type RequestLimiter interface { Limit(ctx context.Context, minOperationTime time.Duration, operation func(ctx context.Context)) bool } +// usernameRequest represents a request to get account by username +type usernameRequest struct { + username string + response chan<- usernameResponse +} + +// usernameResponse represents the response for a username request +type usernameResponse struct { + account domain.Account + err error +} + +// uuidRequest represents a request to get account by UUID +type uuidRequest struct { + uuid string + response chan<- uuidResponse +} + +// uuidResponse represents the response for a UUID request +type uuidResponse struct { + account domain.Account + err error +} + type Mojang struct { httpClient HttpClient limiter RequestLimiter nowFunc func() time.Time tracer trace.Tracer + + // Channels for batching requests + usernameRequestChan chan usernameRequest + uuidRequestChan chan uuidRequest + + // Synchronization for cleanup + shutdownOnce sync.Once + shutdownChan chan struct{} + wg sync.WaitGroup } func NewMojang(httpClient HttpClient, nowFunc func() time.Time, afterFunc func(time.Duration) <-chan time.Time) *Mojang { @@ -50,27 +90,290 @@ func NewMojang(httpClient HttpClient, nowFunc func() time.Time, afterFunc func(t tracer := otel.Tracer("flashlight/accountprovider/mojangaccountprovider") - return &Mojang{ + m := &Mojang{ httpClient: httpClient, limiter: limiter, nowFunc: nowFunc, tracer: tracer, + + usernameRequestChan: make(chan usernameRequest, 100), + uuidRequestChan: make(chan uuidRequest, 100), + shutdownChan: make(chan struct{}), } + + // Start the batch processors + m.wg.Add(2) + go m.batchUsernameProcessor(afterFunc) + go m.batchUUIDProcessor(afterFunc) + + return m +} + +// Shutdown gracefully shuts down the batch processors +func (m *Mojang) Shutdown() { + m.shutdownOnce.Do(func() { + close(m.shutdownChan) + m.wg.Wait() + }) +} + +// batchUsernameProcessor processes username requests in batches +func (m *Mojang) batchUsernameProcessor(afterFunc func(time.Duration) <-chan time.Time) { + defer m.wg.Done() + + var batch []usernameRequest + timer := afterFunc(batchTimeout) + + for { + select { + case <-m.shutdownChan: + return + case req := <-m.usernameRequestChan: + batch = append(batch, req) + if len(batch) >= batchSize { + m.processBatchUsernames(batch) + batch = nil + timer = afterFunc(batchTimeout) + } + case <-timer: + if len(batch) > 0 { + m.processBatchUsernames(batch) + batch = nil + } + timer = afterFunc(batchTimeout) + } + } +} + +// batchUUIDProcessor processes UUID requests in batches +func (m *Mojang) batchUUIDProcessor(afterFunc func(time.Duration) <-chan time.Time) { + defer m.wg.Done() + + var batch []uuidRequest + timer := afterFunc(batchTimeout) + + for { + select { + case <-m.shutdownChan: + return + case req := <-m.uuidRequestChan: + batch = append(batch, req) + if len(batch) >= batchSize { + m.processBatchUUIDs(batch) + batch = nil + timer = afterFunc(batchTimeout) + } + case <-timer: + if len(batch) > 0 { + m.processBatchUUIDs(batch) + batch = nil + } + timer = afterFunc(batchTimeout) + } + } +} + +// processBatchUsernames processes a batch of username requests +func (m *Mojang) processBatchUsernames(batch []usernameRequest) { + // Use background context as batch processing runs independently of caller contexts + // Individual request contexts are not used here to avoid cancellation affecting the batch + ctx := context.Background() + ctx, span := m.tracer.Start(ctx, "Mojang.processBatchUsernames") + defer span.End() + + usernames := make([]string, len(batch)) + for i, req := range batch { + usernames[i] = req.username + } + + accounts, err := m.bulkGetAccountsByUsername(ctx, usernames) + if err != nil { + // If bulk request fails, fall back to individual requests + for _, req := range batch { + account, err := m.getProfile(ctx, fmt.Sprintf("https://api.mojang.com/users/profiles/minecraft/%s", req.username)) + req.response <- usernameResponse{account: account, err: err} + } + return + } + + // Create a map for quick lookup (case-insensitive) + accountMap := make(map[string]domain.Account) + for _, account := range accounts { + // Store with lowercase key for case-insensitive lookup + accountMap[strings.ToLower(account.Username)] = account + } + + // Send responses to each request + for _, req := range batch { + if account, found := accountMap[strings.ToLower(req.username)]; found { + req.response <- usernameResponse{account: account, err: nil} + } else { + req.response <- usernameResponse{err: domain.ErrUsernameNotFound} + } + } +} + +// processBatchUUIDs processes a batch of UUID requests +func (m *Mojang) processBatchUUIDs(batch []uuidRequest) { + // Use background context as batch processing runs independently of caller contexts + // Individual request contexts are not used here to avoid cancellation affecting the batch + ctx := context.Background() + ctx, span := m.tracer.Start(ctx, "Mojang.processBatchUUIDs") + defer span.End() + + // Note: Mojang doesn't have a bulk UUID endpoint, so we fall back to individual requests + for _, req := range batch { + account, err := m.getProfile(ctx, fmt.Sprintf("https://api.mojang.com/user/profile/%s", req.uuid)) + req.response <- uuidResponse{account: account, err: err} + } +} + +// bulkGetAccountsByUsername fetches multiple accounts by username using the bulk endpoint +func (m *Mojang) bulkGetAccountsByUsername(ctx context.Context, usernames []string) ([]domain.Account, error) { + ctx, span := m.tracer.Start(ctx, "Mojang.bulkGetAccountsByUsername") + defer span.End() + + // Mojang bulk endpoint: POST https://api.mojang.com/profiles/minecraft + // Accepts up to 10 usernames in a JSON array + requestBody, err := json.Marshal(usernames) + if err != nil { + err := fmt.Errorf("failed to marshal usernames: %w", err) + reporting.Report(ctx, err) + return nil, err + } + + req, err := http.NewRequest("POST", "https://api.mojang.com/profiles/minecraft", bytes.NewReader(requestBody)) + if err != nil { + err := fmt.Errorf("failed to create request: %w", err) + reporting.Report(ctx, err) + return nil, err + } + + req.Header.Set("User-Agent", constants.USER_AGENT) + req.Header.Set("Content-Type", "application/json") + + var resp *http.Response + var data []byte + ran := m.limiter.Limit(ctx, getAccountMinOperationTime, func(ctx context.Context) { + ctx, span := m.tracer.Start(ctx, "MojangAPI.bulkGetAccountsByUsername") + defer span.End() + + resp, err = m.httpClient.Do(req) + if err != nil { + err := fmt.Errorf("failed to send request: %w", err) + reporting.Report(ctx, err) + return + } + + defer resp.Body.Close() + data, err = io.ReadAll(resp.Body) + if err != nil { + err := fmt.Errorf("failed to read response body: %w", err) + reporting.Report(ctx, err) + return + } + }) + if !ran { + return nil, fmt.Errorf("%w: too many requests to mojang API", domain.ErrTemporarilyUnavailable) + } + + if err != nil { + return nil, err + } + + // Check status code + switch resp.StatusCode { + case http.StatusTooManyRequests, + http.StatusServiceUnavailable, + http.StatusGatewayTimeout: + return nil, fmt.Errorf("%w: mojang API returned status code %d", domain.ErrTemporarilyUnavailable, resp.StatusCode) + case http.StatusOK: + // Continue processing + default: + err := fmt.Errorf("unexpected status code %d from mojang bulk API", resp.StatusCode) + reporting.Report(ctx, err, map[string]string{ + "data": string(data), + "status": strconv.Itoa(resp.StatusCode), + }) + return nil, err + } + + var responses []mojangResponse + if err := json.Unmarshal(data, &responses); err != nil { + err := fmt.Errorf("failed to parse mojang bulk response: %w", err) + reporting.Report(ctx, err) + return nil, err + } + + queriedAt := m.nowFunc() + accounts := make([]domain.Account, 0, len(responses)) + for _, response := range responses { + uuid, err := strutils.NormalizeUUID(response.UUID) + if err != nil { + err := fmt.Errorf("failed to normalize UUID from mojang: %w", err) + reporting.Report(ctx, err) + continue + } + + accounts = append(accounts, domain.Account{ + Username: response.Username, + UUID: uuid, + QueriedAt: queriedAt, + }) + } + + return accounts, nil } func (m *Mojang) GetAccountByUUID(ctx context.Context, uuid string) (domain.Account, error) { ctx, span := m.tracer.Start(ctx, "Mojang.GetAccountByUUID") defer span.End() - return m.getProfile(ctx, fmt.Sprintf("https://api.mojang.com/user/profile/%s", uuid)) + responseChan := make(chan uuidResponse, 1) + req := uuidRequest{ + uuid: uuid, + response: responseChan, + } + + select { + case m.uuidRequestChan <- req: + // Request queued successfully + case <-ctx.Done(): + return domain.Account{}, ctx.Err() + } + + select { + case resp := <-responseChan: + return resp.account, resp.err + case <-ctx.Done(): + return domain.Account{}, ctx.Err() + } } func (m *Mojang) GetAccountByUsername(ctx context.Context, username string) (domain.Account, error) { ctx, span := m.tracer.Start(ctx, "Mojang.GetAccountByUsername") defer span.End() - return m.getProfile(ctx, fmt.Sprintf("https://api.mojang.com/users/profiles/minecraft/%s", username)) + responseChan := make(chan usernameResponse, 1) + req := usernameRequest{ + username: username, + response: responseChan, + } + + select { + case m.usernameRequestChan <- req: + // Request queued successfully + case <-ctx.Done(): + return domain.Account{}, ctx.Err() + } + + select { + case resp := <-responseChan: + return resp.account, resp.err + case <-ctx.Done(): + return domain.Account{}, ctx.Err() + } } func (m *Mojang) getProfile(ctx context.Context, url string) (domain.Account, error) { diff --git a/internal/adapters/accountprovider/mojang_test.go b/internal/adapters/accountprovider/mojang_test.go index 04063303..43286b67 100644 --- a/internal/adapters/accountprovider/mojang_test.go +++ b/internal/adapters/accountprovider/mojang_test.go @@ -2,9 +2,12 @@ package accountprovider import ( "bytes" + "encoding/json" "errors" + "fmt" "io" "net/http" + "sync" "testing" "time" @@ -182,4 +185,207 @@ func TestMojang(t *testing.T) { _, err = provider.GetAccountByUsername(ctx, "skydeath") require.ErrorIs(t, err, assert.AnError) + + provider.Shutdown() +} + +// batchMockedClient tracks requests and returns appropriate responses for batching tests +type batchMockedClient struct { + mu sync.Mutex + requestCount int + bulkRequests int +} + +func (m *batchMockedClient) Do(req *http.Request) (*http.Response, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.requestCount++ + + // Check if this is a bulk request + if req.Method == "POST" && req.URL.Path == "/profiles/minecraft" { + m.bulkRequests++ + // Parse the request body to get usernames + body, err := io.ReadAll(req.Body) + if err != nil { + return nil, err + } + var usernames []string + if err := json.Unmarshal(body, &usernames); err != nil { + return nil, err + } + + // Return bulk response + responses := make([]map[string]string, 0, len(usernames)) + for _, username := range usernames { + // For testing, we'll return a valid response for any username + responses = append(responses, map[string]string{ + "id": "a937646bf11544c38dbf9ae4a65669a0", + "name": username, + }) + } + responseData, err := json.Marshal(responses) + if err != nil { + return nil, err + } + return &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader(responseData)), + Header: make(http.Header), + }, nil + } + + // Single request + return &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(`{ + "id" : "a937646bf11544c38dbf9ae4a65669a0", + "name" : "Skydeath" +}`))), + Header: make(http.Header), + }, nil +} + +func TestMojangBatching(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + now := time.Date(2023, 10, 1, 12, 0, 0, 0, time.UTC) + mockedNow := func() time.Time { + return now + } + + client := &batchMockedClient{} + provider := NewMojang(client, mockedNow, time.After) + defer provider.Shutdown() + + // Test that multiple username requests are batched + t.Run("batch username requests", func(t *testing.T) { + client.mu.Lock() + initialRequests := client.requestCount + initialBulkRequests := client.bulkRequests + client.mu.Unlock() + + // Make 5 concurrent username requests + var wg sync.WaitGroup + usernames := []string{"user1", "user2", "user3", "user4", "user5"} + results := make([]domain.Account, len(usernames)) + errors := make([]error, len(usernames)) + + for i, username := range usernames { + wg.Add(1) + go func(idx int, uname string) { + defer wg.Done() + account, err := provider.GetAccountByUsername(ctx, uname) + results[idx] = account + errors[idx] = err + }(i, username) + } + + wg.Wait() + + // Verify all requests succeeded + for i, err := range errors { + require.NoError(t, err, "Request %d failed", i) + } + + // Give time for batch processing + time.Sleep(100 * time.Millisecond) + + client.mu.Lock() + totalRequests := client.requestCount - initialRequests + bulkRequests := client.bulkRequests - initialBulkRequests + client.mu.Unlock() + + // Should have batched into 1 bulk request + assert.Equal(t, 1, bulkRequests, "Expected 1 bulk request") + assert.Equal(t, 1, totalRequests, "Expected 1 total request") + }) + + // Test that requests exceeding batch size trigger multiple batches + t.Run("batch size overflow", func(t *testing.T) { + client.mu.Lock() + initialBulkRequests := client.bulkRequests + client.mu.Unlock() + + // Make 15 concurrent username requests (exceeds batch size of 10) + var wg sync.WaitGroup + for i := 0; i < 15; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + username := fmt.Sprintf("user%d", idx) + _, err := provider.GetAccountByUsername(ctx, username) + require.NoError(t, err) + }(i) + } + + wg.Wait() + + // Give time for batch processing + time.Sleep(100 * time.Millisecond) + + client.mu.Lock() + bulkRequests := client.bulkRequests - initialBulkRequests + client.mu.Unlock() + + // Should have batched into 2 bulk requests (10 + 5) + assert.Equal(t, 2, bulkRequests, "Expected 2 bulk requests for 15 items") + }) +} + +func TestMojangBatchingTimeout(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + now := time.Date(2023, 10, 1, 12, 0, 0, 0, time.UTC) + mockedNow := func() time.Time { + return now + } + + client := &batchMockedClient{} + provider := NewMojang(client, mockedNow, time.After) + defer provider.Shutdown() + + // Test that timeout triggers batch processing + client.mu.Lock() + initialBulkRequests := client.bulkRequests + client.mu.Unlock() + + // Make a single request that won't reach the batch size + account, err := provider.GetAccountByUsername(ctx, "singleuser") + require.NoError(t, err) + require.Equal(t, "singleuser", account.Username) + + // Wait for timeout to trigger + time.Sleep(100 * time.Millisecond) + + client.mu.Lock() + bulkRequests := client.bulkRequests - initialBulkRequests + client.mu.Unlock() + + // Should have sent 1 bulk request after timeout + assert.Equal(t, 1, bulkRequests, "Expected 1 bulk request after timeout") +} + +func TestMojangBatchingCaseInsensitive(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + now := time.Date(2023, 10, 1, 12, 0, 0, 0, time.UTC) + mockedNow := func() time.Time { + return now + } + + client := &batchMockedClient{} + provider := NewMojang(client, mockedNow, time.After) + defer provider.Shutdown() + + // Test case-insensitive username lookup + account, err := provider.GetAccountByUsername(ctx, "SkyDeath") + require.NoError(t, err) + // The bulk API returns the username as provided, but we should still find it + assert.NotEmpty(t, account.Username) }