Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 307 additions & 4 deletions internal/adapters/accountprovider/mojang.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package accountprovider

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/Amund211/flashlight/internal/constants"
Expand All @@ -19,7 +22,11 @@ import (
"go.opentelemetry.io/otel/trace"
)

const getAccountMinOperationTime = 150 * time.Millisecond
const (
getAccountMinOperationTime = 150 * time.Millisecond
batchSize = 10
batchTimeout = 50 * time.Millisecond
)

type HttpClient interface {
Do(req *http.Request) (*http.Response, error)
Expand All @@ -29,12 +36,45 @@ type RequestLimiter interface {
Limit(ctx context.Context, minOperationTime time.Duration, operation func(ctx context.Context)) bool
}

// usernameRequest represents a request to get account by username
type usernameRequest struct {
username string
response chan<- usernameResponse
}

// usernameResponse represents the response for a username request
type usernameResponse struct {
account domain.Account
err error
}

// uuidRequest represents a request to get account by UUID
type uuidRequest struct {
uuid string
response chan<- uuidResponse
}

// uuidResponse represents the response for a UUID request
type uuidResponse struct {
account domain.Account
err error
}

type Mojang struct {
httpClient HttpClient
limiter RequestLimiter
nowFunc func() time.Time

tracer trace.Tracer

// Channels for batching requests
usernameRequestChan chan usernameRequest
uuidRequestChan chan uuidRequest

// Synchronization for cleanup
shutdownOnce sync.Once
shutdownChan chan struct{}
wg sync.WaitGroup
}

func NewMojang(httpClient HttpClient, nowFunc func() time.Time, afterFunc func(time.Duration) <-chan time.Time) *Mojang {
Expand All @@ -50,27 +90,290 @@ func NewMojang(httpClient HttpClient, nowFunc func() time.Time, afterFunc func(t

tracer := otel.Tracer("flashlight/accountprovider/mojangaccountprovider")

return &Mojang{
m := &Mojang{
httpClient: httpClient,
limiter: limiter,
nowFunc: nowFunc,

tracer: tracer,

usernameRequestChan: make(chan usernameRequest, 100),
uuidRequestChan: make(chan uuidRequest, 100),
shutdownChan: make(chan struct{}),
}

// Start the batch processors
m.wg.Add(2)
go m.batchUsernameProcessor(afterFunc)
go m.batchUUIDProcessor(afterFunc)

return m
}

// Shutdown gracefully shuts down the batch processors
func (m *Mojang) Shutdown() {
m.shutdownOnce.Do(func() {
close(m.shutdownChan)
m.wg.Wait()
})
}

// batchUsernameProcessor processes username requests in batches
func (m *Mojang) batchUsernameProcessor(afterFunc func(time.Duration) <-chan time.Time) {
defer m.wg.Done()

var batch []usernameRequest
timer := afterFunc(batchTimeout)

for {
select {
case <-m.shutdownChan:
return
case req := <-m.usernameRequestChan:
batch = append(batch, req)
if len(batch) >= batchSize {
m.processBatchUsernames(batch)
batch = nil
timer = afterFunc(batchTimeout)
}
case <-timer:
if len(batch) > 0 {
m.processBatchUsernames(batch)
batch = nil
}
timer = afterFunc(batchTimeout)
}
}
}

// batchUUIDProcessor processes UUID requests in batches
func (m *Mojang) batchUUIDProcessor(afterFunc func(time.Duration) <-chan time.Time) {
defer m.wg.Done()

var batch []uuidRequest
timer := afterFunc(batchTimeout)

for {
select {
case <-m.shutdownChan:
return
case req := <-m.uuidRequestChan:
batch = append(batch, req)
if len(batch) >= batchSize {
m.processBatchUUIDs(batch)
batch = nil
timer = afterFunc(batchTimeout)
}
case <-timer:
if len(batch) > 0 {
m.processBatchUUIDs(batch)
batch = nil
}
timer = afterFunc(batchTimeout)
}
}
}

// processBatchUsernames processes a batch of username requests
func (m *Mojang) processBatchUsernames(batch []usernameRequest) {
// Use background context as batch processing runs independently of caller contexts
// Individual request contexts are not used here to avoid cancellation affecting the batch
ctx := context.Background()
ctx, span := m.tracer.Start(ctx, "Mojang.processBatchUsernames")
defer span.End()

usernames := make([]string, len(batch))
for i, req := range batch {
usernames[i] = req.username
}

accounts, err := m.bulkGetAccountsByUsername(ctx, usernames)
if err != nil {
// If bulk request fails, fall back to individual requests
for _, req := range batch {
account, err := m.getProfile(ctx, fmt.Sprintf("https://api.mojang.com/users/profiles/minecraft/%s", req.username))
req.response <- usernameResponse{account: account, err: err}
}
return
}

// Create a map for quick lookup (case-insensitive)
accountMap := make(map[string]domain.Account)
for _, account := range accounts {
// Store with lowercase key for case-insensitive lookup
accountMap[strings.ToLower(account.Username)] = account
}

// Send responses to each request
for _, req := range batch {
if account, found := accountMap[strings.ToLower(req.username)]; found {
req.response <- usernameResponse{account: account, err: nil}
} else {
req.response <- usernameResponse{err: domain.ErrUsernameNotFound}
}
}
}

// processBatchUUIDs processes a batch of UUID requests
func (m *Mojang) processBatchUUIDs(batch []uuidRequest) {
// Use background context as batch processing runs independently of caller contexts
// Individual request contexts are not used here to avoid cancellation affecting the batch
ctx := context.Background()
ctx, span := m.tracer.Start(ctx, "Mojang.processBatchUUIDs")
defer span.End()

// Note: Mojang doesn't have a bulk UUID endpoint, so we fall back to individual requests
for _, req := range batch {
account, err := m.getProfile(ctx, fmt.Sprintf("https://api.mojang.com/user/profile/%s", req.uuid))
req.response <- uuidResponse{account: account, err: err}
}
}

// bulkGetAccountsByUsername fetches multiple accounts by username using the bulk endpoint
func (m *Mojang) bulkGetAccountsByUsername(ctx context.Context, usernames []string) ([]domain.Account, error) {
ctx, span := m.tracer.Start(ctx, "Mojang.bulkGetAccountsByUsername")
defer span.End()

// Mojang bulk endpoint: POST https://api.mojang.com/profiles/minecraft
// Accepts up to 10 usernames in a JSON array
requestBody, err := json.Marshal(usernames)
if err != nil {
err := fmt.Errorf("failed to marshal usernames: %w", err)
reporting.Report(ctx, err)
return nil, err
}

req, err := http.NewRequest("POST", "https://api.mojang.com/profiles/minecraft", bytes.NewReader(requestBody))
if err != nil {
err := fmt.Errorf("failed to create request: %w", err)
reporting.Report(ctx, err)
return nil, err
}

req.Header.Set("User-Agent", constants.USER_AGENT)
req.Header.Set("Content-Type", "application/json")

var resp *http.Response
var data []byte
ran := m.limiter.Limit(ctx, getAccountMinOperationTime, func(ctx context.Context) {
ctx, span := m.tracer.Start(ctx, "MojangAPI.bulkGetAccountsByUsername")
defer span.End()

resp, err = m.httpClient.Do(req)
if err != nil {
err := fmt.Errorf("failed to send request: %w", err)
reporting.Report(ctx, err)
return
}

defer resp.Body.Close()
data, err = io.ReadAll(resp.Body)
if err != nil {
err := fmt.Errorf("failed to read response body: %w", err)
reporting.Report(ctx, err)
return
}
})
if !ran {
return nil, fmt.Errorf("%w: too many requests to mojang API", domain.ErrTemporarilyUnavailable)
}

if err != nil {
return nil, err
}

// Check status code
switch resp.StatusCode {
case http.StatusTooManyRequests,
http.StatusServiceUnavailable,
http.StatusGatewayTimeout:
return nil, fmt.Errorf("%w: mojang API returned status code %d", domain.ErrTemporarilyUnavailable, resp.StatusCode)
case http.StatusOK:
// Continue processing
default:
err := fmt.Errorf("unexpected status code %d from mojang bulk API", resp.StatusCode)
reporting.Report(ctx, err, map[string]string{
"data": string(data),
"status": strconv.Itoa(resp.StatusCode),
})
return nil, err
}

var responses []mojangResponse
if err := json.Unmarshal(data, &responses); err != nil {
err := fmt.Errorf("failed to parse mojang bulk response: %w", err)
reporting.Report(ctx, err)
return nil, err
}

queriedAt := m.nowFunc()
accounts := make([]domain.Account, 0, len(responses))
for _, response := range responses {
uuid, err := strutils.NormalizeUUID(response.UUID)
if err != nil {
err := fmt.Errorf("failed to normalize UUID from mojang: %w", err)
reporting.Report(ctx, err)
continue
}

accounts = append(accounts, domain.Account{
Username: response.Username,
UUID: uuid,
QueriedAt: queriedAt,
})
}

return accounts, nil
}

func (m *Mojang) GetAccountByUUID(ctx context.Context, uuid string) (domain.Account, error) {
ctx, span := m.tracer.Start(ctx, "Mojang.GetAccountByUUID")
defer span.End()

return m.getProfile(ctx, fmt.Sprintf("https://api.mojang.com/user/profile/%s", uuid))
responseChan := make(chan uuidResponse, 1)
req := uuidRequest{
uuid: uuid,
response: responseChan,
}

select {
case m.uuidRequestChan <- req:
// Request queued successfully
case <-ctx.Done():
return domain.Account{}, ctx.Err()
}

select {
case resp := <-responseChan:
return resp.account, resp.err
case <-ctx.Done():
return domain.Account{}, ctx.Err()
}
}

func (m *Mojang) GetAccountByUsername(ctx context.Context, username string) (domain.Account, error) {
ctx, span := m.tracer.Start(ctx, "Mojang.GetAccountByUsername")
defer span.End()

return m.getProfile(ctx, fmt.Sprintf("https://api.mojang.com/users/profiles/minecraft/%s", username))
responseChan := make(chan usernameResponse, 1)
req := usernameRequest{
username: username,
response: responseChan,
}

select {
case m.usernameRequestChan <- req:
// Request queued successfully
case <-ctx.Done():
return domain.Account{}, ctx.Err()
}

select {
case resp := <-responseChan:
return resp.account, resp.err
case <-ctx.Done():
return domain.Account{}, ctx.Err()
}
}

func (m *Mojang) getProfile(ctx context.Context, url string) (domain.Account, error) {
Expand Down
Loading