diff --git a/backend/internal/handler/ops_error_logger.go b/backend/internal/handler/ops_error_logger.go index 2f53d655e8..cb2fad5d88 100644 --- a/backend/internal/handler/ops_error_logger.go +++ b/backend/internal/handler/ops_error_logger.go @@ -31,6 +31,7 @@ const ( const ( opsErrorLogTimeout = 5 * time.Second opsErrorLogDrainTimeout = 10 * time.Second + opsErrorLogBatchWindow = 200 * time.Millisecond opsErrorLogMinWorkerCount = 4 opsErrorLogMaxWorkerCount = 32 @@ -38,6 +39,7 @@ const ( opsErrorLogQueueSizePerWorker = 128 opsErrorLogMinQueueSize = 256 opsErrorLogMaxQueueSize = 8192 + opsErrorLogBatchSize = 32 ) type opsErrorLogJob struct { @@ -82,27 +84,82 @@ func startOpsErrorLogWorkers() { for i := 0; i < workerCount; i++ { go func() { defer opsErrorLogWorkersWg.Done() - for job := range opsErrorLogQueue { - opsErrorLogQueueLen.Add(-1) - if job.ops == nil || job.entry == nil { - continue + for { + job, ok := <-opsErrorLogQueue + if !ok { + return } - func() { - defer func() { - if r := recover(); r != nil { - log.Printf("[OpsErrorLogger] worker panic: %v\n%s", r, debug.Stack()) + opsErrorLogQueueLen.Add(-1) + batch := make([]opsErrorLogJob, 0, opsErrorLogBatchSize) + batch = append(batch, job) + + timer := time.NewTimer(opsErrorLogBatchWindow) + batchLoop: + for len(batch) < opsErrorLogBatchSize { + select { + case nextJob, ok := <-opsErrorLogQueue: + if !ok { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + flushOpsErrorLogBatch(batch) + return } - }() - ctx, cancel := context.WithTimeout(context.Background(), opsErrorLogTimeout) - _ = job.ops.RecordError(ctx, job.entry, nil) - cancel() - opsErrorLogProcessed.Add(1) - }() + opsErrorLogQueueLen.Add(-1) + batch = append(batch, nextJob) + case <-timer.C: + break batchLoop + } + } + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + flushOpsErrorLogBatch(batch) } }() } } +func flushOpsErrorLogBatch(batch []opsErrorLogJob) { + if len(batch) == 0 { + return + } + defer func() { + if r := recover(); r != nil { + log.Printf("[OpsErrorLogger] worker panic: %v\n%s", r, debug.Stack()) + } + }() + + grouped := make(map[*service.OpsService][]*service.OpsInsertErrorLogInput, len(batch)) + var processed int64 + for _, job := range batch { + if job.ops == nil || job.entry == nil { + continue + } + grouped[job.ops] = append(grouped[job.ops], job.entry) + processed++ + } + if processed == 0 { + return + } + + for opsSvc, entries := range grouped { + if opsSvc == nil || len(entries) == 0 { + continue + } + ctx, cancel := context.WithTimeout(context.Background(), opsErrorLogTimeout) + _ = opsSvc.RecordErrorBatch(ctx, entries) + cancel() + } + opsErrorLogProcessed.Add(processed) +} + func enqueueOpsErrorLog(ops *service.OpsService, entry *service.OpsInsertErrorLogInput) { if ops == nil || entry == nil { return diff --git a/backend/internal/repository/ops_repo.go b/backend/internal/repository/ops_repo.go index 989573f2c9..02ca1a3b16 100644 --- a/backend/internal/repository/ops_repo.go +++ b/backend/internal/repository/ops_repo.go @@ -16,19 +16,7 @@ type opsRepository struct { db *sql.DB } -func NewOpsRepository(db *sql.DB) service.OpsRepository { - return &opsRepository{db: db} -} - -func (r *opsRepository) InsertErrorLog(ctx context.Context, input *service.OpsInsertErrorLogInput) (int64, error) { - if r == nil || r.db == nil { - return 0, fmt.Errorf("nil ops repository") - } - if input == nil { - return 0, fmt.Errorf("nil input") - } - - q := ` +const insertOpsErrorLogSQL = ` INSERT INTO ops_error_logs ( request_id, client_request_id, @@ -70,12 +58,77 @@ INSERT INTO ops_error_logs ( created_at ) VALUES ( $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37,$38 -) RETURNING id` +)` + +func NewOpsRepository(db *sql.DB) service.OpsRepository { + return &opsRepository{db: db} +} + +func (r *opsRepository) InsertErrorLog(ctx context.Context, input *service.OpsInsertErrorLogInput) (int64, error) { + if r == nil || r.db == nil { + return 0, fmt.Errorf("nil ops repository") + } + if input == nil { + return 0, fmt.Errorf("nil input") + } var id int64 err := r.db.QueryRowContext( ctx, - q, + insertOpsErrorLogSQL+" RETURNING id", + opsInsertErrorLogArgs(input)..., + ).Scan(&id) + if err != nil { + return 0, err + } + return id, nil +} + +func (r *opsRepository) BatchInsertErrorLogs(ctx context.Context, inputs []*service.OpsInsertErrorLogInput) (int64, error) { + if r == nil || r.db == nil { + return 0, fmt.Errorf("nil ops repository") + } + if len(inputs) == 0 { + return 0, nil + } + + tx, err := r.db.BeginTx(ctx, nil) + if err != nil { + return 0, err + } + defer func() { + if err != nil { + _ = tx.Rollback() + } + }() + + stmt, err := tx.PrepareContext(ctx, insertOpsErrorLogSQL) + if err != nil { + return 0, err + } + defer func() { + _ = stmt.Close() + }() + + var inserted int64 + for _, input := range inputs { + if input == nil { + continue + } + if _, err = stmt.ExecContext(ctx, opsInsertErrorLogArgs(input)...); err != nil { + return inserted, err + } + inserted++ + } + + if err = tx.Commit(); err != nil { + return inserted, err + } + return inserted, nil +} + +func opsInsertErrorLogArgs(input *service.OpsInsertErrorLogInput) []any { + return []any{ opsNullString(input.RequestID), opsNullString(input.ClientRequestID), opsNullInt64(input.UserID), @@ -114,11 +167,7 @@ INSERT INTO ops_error_logs ( input.IsRetryable, input.RetryCount, input.CreatedAt, - ).Scan(&id) - if err != nil { - return 0, err } - return id, nil } func (r *opsRepository) ListErrorLogs(ctx context.Context, filter *service.OpsErrorLogFilter) (*service.OpsErrorLogList, error) { diff --git a/backend/internal/repository/ops_write_pressure_integration_test.go b/backend/internal/repository/ops_write_pressure_integration_test.go new file mode 100644 index 0000000000..ebb7a84226 --- /dev/null +++ b/backend/internal/repository/ops_write_pressure_integration_test.go @@ -0,0 +1,79 @@ +//go:build integration + +package repository + +import ( + "context" + "testing" + "time" + + "github.com/Wei-Shaw/sub2api/internal/service" + "github.com/stretchr/testify/require" +) + +func TestOpsRepositoryBatchInsertErrorLogs(t *testing.T) { + ctx := context.Background() + _, _ = integrationDB.ExecContext(ctx, "TRUNCATE ops_error_logs RESTART IDENTITY") + + repo := NewOpsRepository(integrationDB).(*opsRepository) + now := time.Now().UTC() + inserted, err := repo.BatchInsertErrorLogs(ctx, []*service.OpsInsertErrorLogInput{ + { + RequestID: "batch-ops-1", + ErrorPhase: "upstream", + ErrorType: "upstream_error", + Severity: "error", + StatusCode: 429, + ErrorMessage: "rate limited", + CreatedAt: now, + }, + { + RequestID: "batch-ops-2", + ErrorPhase: "internal", + ErrorType: "api_error", + Severity: "error", + StatusCode: 500, + ErrorMessage: "internal error", + CreatedAt: now.Add(time.Millisecond), + }, + }) + require.NoError(t, err) + require.EqualValues(t, 2, inserted) + + var count int + require.NoError(t, integrationDB.QueryRowContext(ctx, "SELECT COUNT(*) FROM ops_error_logs WHERE request_id IN ('batch-ops-1', 'batch-ops-2')").Scan(&count)) + require.Equal(t, 2, count) +} + +func TestEnqueueSchedulerOutbox_DeduplicatesIdempotentEvents(t *testing.T) { + ctx := context.Background() + _, _ = integrationDB.ExecContext(ctx, "TRUNCATE scheduler_outbox RESTART IDENTITY") + + accountID := int64(12345) + require.NoError(t, enqueueSchedulerOutbox(ctx, integrationDB, service.SchedulerOutboxEventAccountChanged, &accountID, nil, nil)) + require.NoError(t, enqueueSchedulerOutbox(ctx, integrationDB, service.SchedulerOutboxEventAccountChanged, &accountID, nil, nil)) + + var count int + require.NoError(t, integrationDB.QueryRowContext(ctx, "SELECT COUNT(*) FROM scheduler_outbox WHERE event_type = $1", service.SchedulerOutboxEventAccountChanged).Scan(&count)) + require.Equal(t, 1, count) + + time.Sleep(schedulerOutboxDedupWindow + 150*time.Millisecond) + require.NoError(t, enqueueSchedulerOutbox(ctx, integrationDB, service.SchedulerOutboxEventAccountChanged, &accountID, nil, nil)) + require.NoError(t, integrationDB.QueryRowContext(ctx, "SELECT COUNT(*) FROM scheduler_outbox WHERE event_type = $1", service.SchedulerOutboxEventAccountChanged).Scan(&count)) + require.Equal(t, 2, count) +} + +func TestEnqueueSchedulerOutbox_DoesNotDeduplicateLastUsed(t *testing.T) { + ctx := context.Background() + _, _ = integrationDB.ExecContext(ctx, "TRUNCATE scheduler_outbox RESTART IDENTITY") + + accountID := int64(67890) + payload1 := map[string]any{"last_used": map[string]int64{"67890": 100}} + payload2 := map[string]any{"last_used": map[string]int64{"67890": 200}} + require.NoError(t, enqueueSchedulerOutbox(ctx, integrationDB, service.SchedulerOutboxEventAccountLastUsed, &accountID, nil, payload1)) + require.NoError(t, enqueueSchedulerOutbox(ctx, integrationDB, service.SchedulerOutboxEventAccountLastUsed, &accountID, nil, payload2)) + + var count int + require.NoError(t, integrationDB.QueryRowContext(ctx, "SELECT COUNT(*) FROM scheduler_outbox WHERE event_type = $1", service.SchedulerOutboxEventAccountLastUsed).Scan(&count)) + require.Equal(t, 2, count) +} diff --git a/backend/internal/repository/scheduler_outbox_repo.go b/backend/internal/repository/scheduler_outbox_repo.go index d7bc97dad2..4b9a9f58b1 100644 --- a/backend/internal/repository/scheduler_outbox_repo.go +++ b/backend/internal/repository/scheduler_outbox_repo.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "encoding/json" + "time" "github.com/Wei-Shaw/sub2api/internal/service" ) @@ -12,6 +13,8 @@ type schedulerOutboxRepository struct { db *sql.DB } +const schedulerOutboxDedupWindow = time.Second + func NewSchedulerOutboxRepository(db *sql.DB) service.SchedulerOutboxRepository { return &schedulerOutboxRepository{db: db} } @@ -88,9 +91,37 @@ func enqueueSchedulerOutbox(ctx context.Context, exec sqlExecutor, eventType str } payloadArg = encoded } - _, err := exec.ExecContext(ctx, ` + query := ` INSERT INTO scheduler_outbox (event_type, account_id, group_id, payload) VALUES ($1, $2, $3, $4) - `, eventType, accountID, groupID, payloadArg) + ` + args := []any{eventType, accountID, groupID, payloadArg} + if schedulerOutboxEventSupportsDedup(eventType) { + query = ` + INSERT INTO scheduler_outbox (event_type, account_id, group_id, payload) + SELECT $1, $2, $3, $4 + WHERE NOT EXISTS ( + SELECT 1 + FROM scheduler_outbox + WHERE event_type = $1 + AND account_id IS NOT DISTINCT FROM $2 + AND group_id IS NOT DISTINCT FROM $3 + AND created_at >= NOW() - make_interval(secs => $5) + ) + ` + args = append(args, schedulerOutboxDedupWindow.Seconds()) + } + _, err := exec.ExecContext(ctx, query, args...) return err } + +func schedulerOutboxEventSupportsDedup(eventType string) bool { + switch eventType { + case service.SchedulerOutboxEventAccountChanged, + service.SchedulerOutboxEventGroupChanged, + service.SchedulerOutboxEventFullRebuild: + return true + default: + return false + } +} diff --git a/backend/internal/service/ops_port.go b/backend/internal/service/ops_port.go index f3633eae92..0ce9d42591 100644 --- a/backend/internal/service/ops_port.go +++ b/backend/internal/service/ops_port.go @@ -7,6 +7,7 @@ import ( type OpsRepository interface { InsertErrorLog(ctx context.Context, input *OpsInsertErrorLogInput) (int64, error) + BatchInsertErrorLogs(ctx context.Context, inputs []*OpsInsertErrorLogInput) (int64, error) ListErrorLogs(ctx context.Context, filter *OpsErrorLogFilter) (*OpsErrorLogList, error) GetErrorLogByID(ctx context.Context, id int64) (*OpsErrorLogDetail, error) ListRequestDetails(ctx context.Context, filter *OpsRequestDetailFilter) ([]*OpsRequestDetail, int64, error) diff --git a/backend/internal/service/ops_repo_mock_test.go b/backend/internal/service/ops_repo_mock_test.go index e250dea3dd..c8c66ec639 100644 --- a/backend/internal/service/ops_repo_mock_test.go +++ b/backend/internal/service/ops_repo_mock_test.go @@ -7,6 +7,8 @@ import ( // opsRepoMock is a test-only OpsRepository implementation with optional function hooks. type opsRepoMock struct { + InsertErrorLogFn func(ctx context.Context, input *OpsInsertErrorLogInput) (int64, error) + BatchInsertErrorLogsFn func(ctx context.Context, inputs []*OpsInsertErrorLogInput) (int64, error) BatchInsertSystemLogsFn func(ctx context.Context, inputs []*OpsInsertSystemLogInput) (int64, error) ListSystemLogsFn func(ctx context.Context, filter *OpsSystemLogFilter) (*OpsSystemLogList, error) DeleteSystemLogsFn func(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error) @@ -14,9 +16,19 @@ type opsRepoMock struct { } func (m *opsRepoMock) InsertErrorLog(ctx context.Context, input *OpsInsertErrorLogInput) (int64, error) { + if m.InsertErrorLogFn != nil { + return m.InsertErrorLogFn(ctx, input) + } return 0, nil } +func (m *opsRepoMock) BatchInsertErrorLogs(ctx context.Context, inputs []*OpsInsertErrorLogInput) (int64, error) { + if m.BatchInsertErrorLogsFn != nil { + return m.BatchInsertErrorLogsFn(ctx, inputs) + } + return int64(len(inputs)), nil +} + func (m *opsRepoMock) ListErrorLogs(ctx context.Context, filter *OpsErrorLogFilter) (*OpsErrorLogList, error) { return &OpsErrorLogList{Errors: []*OpsErrorLog{}, Page: 1, PageSize: 20}, nil } diff --git a/backend/internal/service/ops_service.go b/backend/internal/service/ops_service.go index 767d1704b7..29f0aa8b50 100644 --- a/backend/internal/service/ops_service.go +++ b/backend/internal/service/ops_service.go @@ -121,15 +121,75 @@ func (s *OpsService) IsMonitoringEnabled(ctx context.Context) bool { } func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogInput, rawRequestBody []byte) error { - if entry == nil { + prepared, ok, err := s.prepareErrorLogInput(ctx, entry, rawRequestBody) + if err != nil { + log.Printf("[Ops] RecordError prepare failed: %v", err) + return err + } + if !ok { return nil } - if !s.IsMonitoringEnabled(ctx) { + + if _, err := s.opsRepo.InsertErrorLog(ctx, prepared); err != nil { + // Never bubble up to gateway; best-effort logging. + log.Printf("[Ops] RecordError failed: %v", err) + return err + } + return nil +} + +func (s *OpsService) RecordErrorBatch(ctx context.Context, entries []*OpsInsertErrorLogInput) error { + if len(entries) == 0 { return nil } - if s.opsRepo == nil { + prepared := make([]*OpsInsertErrorLogInput, 0, len(entries)) + for _, entry := range entries { + item, ok, err := s.prepareErrorLogInput(ctx, entry, nil) + if err != nil { + log.Printf("[Ops] RecordErrorBatch prepare failed: %v", err) + continue + } + if ok { + prepared = append(prepared, item) + } + } + if len(prepared) == 0 { return nil } + if len(prepared) == 1 { + _, err := s.opsRepo.InsertErrorLog(ctx, prepared[0]) + if err != nil { + log.Printf("[Ops] RecordErrorBatch single insert failed: %v", err) + } + return err + } + + if _, err := s.opsRepo.BatchInsertErrorLogs(ctx, prepared); err != nil { + log.Printf("[Ops] RecordErrorBatch failed, fallback to single inserts: %v", err) + var firstErr error + for _, entry := range prepared { + if _, insertErr := s.opsRepo.InsertErrorLog(ctx, entry); insertErr != nil { + log.Printf("[Ops] RecordErrorBatch fallback insert failed: %v", insertErr) + if firstErr == nil { + firstErr = insertErr + } + } + } + return firstErr + } + return nil +} + +func (s *OpsService) prepareErrorLogInput(ctx context.Context, entry *OpsInsertErrorLogInput, rawRequestBody []byte) (*OpsInsertErrorLogInput, bool, error) { + if entry == nil { + return nil, false, nil + } + if !s.IsMonitoringEnabled(ctx) { + return nil, false, nil + } + if s.opsRepo == nil { + return nil, false, nil + } // Ensure timestamps are always populated. if entry.CreatedAt.IsZero() { @@ -185,85 +245,88 @@ func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogIn } } - // Sanitize + serialize upstream error events list. - if len(entry.UpstreamErrors) > 0 { - const maxEvents = 32 - events := entry.UpstreamErrors - if len(events) > maxEvents { - events = events[len(events)-maxEvents:] - } + if err := sanitizeOpsUpstreamErrors(entry); err != nil { + return nil, false, err + } - sanitized := make([]*OpsUpstreamErrorEvent, 0, len(events)) - for _, ev := range events { - if ev == nil { - continue - } - out := *ev + return entry, true, nil +} - out.Platform = strings.TrimSpace(out.Platform) - out.UpstreamRequestID = truncateString(strings.TrimSpace(out.UpstreamRequestID), 128) - out.Kind = truncateString(strings.TrimSpace(out.Kind), 64) +func sanitizeOpsUpstreamErrors(entry *OpsInsertErrorLogInput) error { + if entry == nil || len(entry.UpstreamErrors) == 0 { + return nil + } - if out.AccountID < 0 { - out.AccountID = 0 - } - if out.UpstreamStatusCode < 0 { - out.UpstreamStatusCode = 0 - } - if out.AtUnixMs < 0 { - out.AtUnixMs = 0 - } + const maxEvents = 32 + events := entry.UpstreamErrors + if len(events) > maxEvents { + events = events[len(events)-maxEvents:] + } - msg := sanitizeUpstreamErrorMessage(strings.TrimSpace(out.Message)) - msg = truncateString(msg, 2048) - out.Message = msg + sanitized := make([]*OpsUpstreamErrorEvent, 0, len(events)) + for _, ev := range events { + if ev == nil { + continue + } + out := *ev - detail := strings.TrimSpace(out.Detail) - if detail != "" { - // Keep upstream detail small; request bodies are not stored here, only upstream error payloads. - sanitizedDetail, _ := sanitizeErrorBodyForStorage(detail, opsMaxStoredErrorBodyBytes) - out.Detail = sanitizedDetail - } else { - out.Detail = "" - } + out.Platform = strings.TrimSpace(out.Platform) + out.UpstreamRequestID = truncateString(strings.TrimSpace(out.UpstreamRequestID), 128) + out.Kind = truncateString(strings.TrimSpace(out.Kind), 64) - out.UpstreamRequestBody = strings.TrimSpace(out.UpstreamRequestBody) - if out.UpstreamRequestBody != "" { - // Reuse the same sanitization/trimming strategy as request body storage. - // Keep it small so it is safe to persist in ops_error_logs JSON. - sanitized, truncated, _ := sanitizeAndTrimRequestBody([]byte(out.UpstreamRequestBody), 10*1024) - if sanitized != "" { - out.UpstreamRequestBody = sanitized - if truncated { - out.Kind = strings.TrimSpace(out.Kind) - if out.Kind == "" { - out.Kind = "upstream" - } - out.Kind = out.Kind + ":request_body_truncated" + if out.AccountID < 0 { + out.AccountID = 0 + } + if out.UpstreamStatusCode < 0 { + out.UpstreamStatusCode = 0 + } + if out.AtUnixMs < 0 { + out.AtUnixMs = 0 + } + + msg := sanitizeUpstreamErrorMessage(strings.TrimSpace(out.Message)) + msg = truncateString(msg, 2048) + out.Message = msg + + detail := strings.TrimSpace(out.Detail) + if detail != "" { + // Keep upstream detail small; request bodies are not stored here, only upstream error payloads. + sanitizedDetail, _ := sanitizeErrorBodyForStorage(detail, opsMaxStoredErrorBodyBytes) + out.Detail = sanitizedDetail + } else { + out.Detail = "" + } + + out.UpstreamRequestBody = strings.TrimSpace(out.UpstreamRequestBody) + if out.UpstreamRequestBody != "" { + // Reuse the same sanitization/trimming strategy as request body storage. + // Keep it small so it is safe to persist in ops_error_logs JSON. + sanitizedBody, truncated, _ := sanitizeAndTrimRequestBody([]byte(out.UpstreamRequestBody), 10*1024) + if sanitizedBody != "" { + out.UpstreamRequestBody = sanitizedBody + if truncated { + out.Kind = strings.TrimSpace(out.Kind) + if out.Kind == "" { + out.Kind = "upstream" } - } else { - out.UpstreamRequestBody = "" + out.Kind = out.Kind + ":request_body_truncated" } + } else { + out.UpstreamRequestBody = "" } + } - // Drop fully-empty events (can happen if only status code was known). - if out.UpstreamStatusCode == 0 && out.Message == "" && out.Detail == "" { - continue - } - - evCopy := out - sanitized = append(sanitized, &evCopy) + // Drop fully-empty events (can happen if only status code was known). + if out.UpstreamStatusCode == 0 && out.Message == "" && out.Detail == "" { + continue } - entry.UpstreamErrorsJSON = marshalOpsUpstreamErrors(sanitized) - entry.UpstreamErrors = nil + evCopy := out + sanitized = append(sanitized, &evCopy) } - if _, err := s.opsRepo.InsertErrorLog(ctx, entry); err != nil { - // Never bubble up to gateway; best-effort logging. - log.Printf("[Ops] RecordError failed: %v", err) - return err - } + entry.UpstreamErrorsJSON = marshalOpsUpstreamErrors(sanitized) + entry.UpstreamErrors = nil return nil } diff --git a/backend/internal/service/ops_service_batch_test.go b/backend/internal/service/ops_service_batch_test.go new file mode 100644 index 0000000000..f3a14d7fdd --- /dev/null +++ b/backend/internal/service/ops_service_batch_test.go @@ -0,0 +1,103 @@ +package service + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestOpsServiceRecordErrorBatch_SanitizesAndBatches(t *testing.T) { + t.Parallel() + + var captured []*OpsInsertErrorLogInput + repo := &opsRepoMock{ + BatchInsertErrorLogsFn: func(ctx context.Context, inputs []*OpsInsertErrorLogInput) (int64, error) { + captured = append(captured, inputs...) + return int64(len(inputs)), nil + }, + } + svc := NewOpsService(repo, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + + msg := " upstream failed: https://example.com?access_token=secret-value " + detail := `{"authorization":"Bearer secret-token"}` + entries := []*OpsInsertErrorLogInput{ + { + ErrorBody: `{"error":"bad","access_token":"secret"}`, + UpstreamStatusCode: intPtr(-10), + UpstreamErrorMessage: strPtr(msg), + UpstreamErrorDetail: strPtr(detail), + UpstreamErrors: []*OpsUpstreamErrorEvent{ + { + AccountID: -2, + UpstreamStatusCode: 429, + Message: " token leaked ", + Detail: `{"refresh_token":"secret"}`, + UpstreamRequestBody: `{"api_key":"secret","messages":[{"role":"user","content":"hello"}]}`, + }, + }, + }, + { + ErrorPhase: "upstream", + ErrorType: "upstream_error", + CreatedAt: time.Now().UTC(), + }, + } + + require.NoError(t, svc.RecordErrorBatch(context.Background(), entries)) + require.Len(t, captured, 2) + + first := captured[0] + require.Equal(t, "internal", first.ErrorPhase) + require.Equal(t, "api_error", first.ErrorType) + require.Nil(t, first.UpstreamStatusCode) + require.NotNil(t, first.UpstreamErrorMessage) + require.NotContains(t, *first.UpstreamErrorMessage, "secret-value") + require.Contains(t, *first.UpstreamErrorMessage, "access_token=***") + require.NotNil(t, first.UpstreamErrorDetail) + require.NotContains(t, *first.UpstreamErrorDetail, "secret-token") + require.NotContains(t, first.ErrorBody, "secret") + require.Nil(t, first.UpstreamErrors) + require.NotNil(t, first.UpstreamErrorsJSON) + require.NotContains(t, *first.UpstreamErrorsJSON, "secret") + require.Contains(t, *first.UpstreamErrorsJSON, "[REDACTED]") + + second := captured[1] + require.Equal(t, "upstream", second.ErrorPhase) + require.Equal(t, "upstream_error", second.ErrorType) + require.False(t, second.CreatedAt.IsZero()) +} + +func TestOpsServiceRecordErrorBatch_FallsBackToSingleInsert(t *testing.T) { + t.Parallel() + + var ( + batchCalls int + singleCalls int + ) + repo := &opsRepoMock{ + BatchInsertErrorLogsFn: func(ctx context.Context, inputs []*OpsInsertErrorLogInput) (int64, error) { + batchCalls++ + return 0, errors.New("batch failed") + }, + InsertErrorLogFn: func(ctx context.Context, input *OpsInsertErrorLogInput) (int64, error) { + singleCalls++ + return int64(singleCalls), nil + }, + } + svc := NewOpsService(repo, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + + err := svc.RecordErrorBatch(context.Background(), []*OpsInsertErrorLogInput{ + {ErrorMessage: "first"}, + {ErrorMessage: "second"}, + }) + require.NoError(t, err) + require.Equal(t, 1, batchCalls) + require.Equal(t, 2, singleCalls) +} + +func strPtr(v string) *string { + return &v +}