
refactor: modernize balance sync worker - remove legacy code, batch UPDATE, structured logging, MT terminology#2034

Merged
ClaraTersi merged 31 commits into develop from feature/balance-sync-mt on Apr 6, 2026

Conversation

@ClaraTersi
Member

Summary

Comprehensive review and modernization of the balance sync pipeline (worker, collector, Redis adapter, PostgreSQL repository, and use case).

Changes

Legacy code removal (-1000+ lines)

  • Removed expiration-based sync path (waitForNextOrBackoff, waitUntilDue, processBalancesToExpire, processBalanceToExpire, shouldShutdown)
  • Removed individual Sync/SyncBalance methods (replaced by UpdateMany/SyncBalancesBatch)
  • Removed unconditional RemoveBalanceSyncKey and unschedule_synced_balance.lua in favor of batch-only conditional removal
  • Removed dead struct fields (redisConn, batchSize, maxWorkers) and deprecated BALANCE_SYNC_MAX_WORKERS env var
  • Removed test-only Size() method from collector

Performance

  • Replaced N individual UPDATEs inside a transaction with a single UPDATE ... FROM (VALUES ...) statement in UpdateMany (1 round-trip vs N)
  • Added deduplication by balance ID in UpdateMany to prevent version mismatch when the same ID appears multiple times

Observability

  • Migrated all fmt.Sprintf log calls to structured fields (libLog.Err, libLog.String, libLog.Int) across worker, collector, Redis adapter, and use case
  • Added span instrumentation to error paths that were missing it
  • Demoted idle/polling logs from INFO to DEBUG to reduce noise in production

Naming & conventions

  • Adopted MT suffix for multi-tenant code (runWorkerMT, NewBalanceSyncWorkerMT, isMTReady, mtEnabled)
  • Renamed files to dot-separated pattern (balance_sync.worker.go, balance_sync.collector.go)
  • Renamed Lua script: get_balances_near_expiration.lua → claim_balance_sync_keys.lua
  • Renamed processBalancesToExpireBatch → processSyncBatch, SyncBatch → UpdateMany
  • Renamed aggregator types with Sync prefix (InMemorySyncAggregator, SyncAggregator)

API cleanup

  • Moved flush callback from SetFlushCallback into Run() as required parameter
  • Removed idleBackoff dead field from collector constructor
  • Simplified worker constructors (removed conn and maxWorkers params)
  • Added FlushTimeout()/PollInterval() duration methods to BalanceSyncConfig

Documentation

  • Updated CLAUDE.md and PROJECT_RULES.md with current conventions (file naming, structured logging, MT terminology, balance sync worker description)
  • Added comprehensive code comments explaining non-obvious design decisions (ZSET score as concurrency token, stopAndDrain timer pattern, context.WithoutCancel in shutdown flush, etc.)
  • Standardized RedisRepository interface with doc comments on all 16 methods

Tests

  • Separated balance sync tests from RedisQueueConsumer tests into dedicated files
  • Removed ~8 duplicate test functions (isMTReady tested in 5+ places, waitOrDone duplicated)
  • Added unit tests for parseSyncKeysFromLuaResult resilience (bad score, odd elements, unexpected types)
  • Fixed test reliability: replaced time.Sleep with channel sync, added iteration index to property test subtests
  • Renamed test files to follow balance_sync.{type}_test.go convention

@ClaraTersi ClaraTersi requested a review from a team as a code owner April 5, 2026 21:21
@coderabbitai

coderabbitai bot commented Apr 5, 2026

Walkthrough

Refactors ledger balance sync from a max-concurrency worker to a dual-trigger batching collector with distributed Redis locking. Replaces per-balance Sync/SyncBatch APIs with UpdateMany using a single UPDATE ... FROM (VALUES ...) batch statement. Removes BALANCE_SYNC_MAX_WORKERS in favor of size/time/poll env vars; renames aggregator and updates logging.

Changes

Cohort / File(s) Summary
Configuration & Docs
CLAUDE.md, docs/PROJECT_RULES.md, components/ledger/.env.example
Removed BALANCE_SYNC_MAX_WORKERS; added BALANCE_SYNC_BATCH_SIZE, BALANCE_SYNC_FLUSH_TIMEOUT_MS, BALANCE_SYNC_POLL_INTERVAL_MS. Updated file-naming guidance and logging conventions.
Postgres balance persistence
components/ledger/internal/adapters/postgres/balance/balance.postgresql.go, components/ledger/internal/adapters/postgres/balance/balance.postgresql_mock.go, components/ledger/internal/adapters/postgres/balance/balance.postgresql_integration_test.go
Removed Sync/SyncBatch; added UpdateMany that deduplicates by id/version and issues a single UPDATE ... FROM (VALUES ...) per batch. Updated mocks and integration tests.
Redis aggregation
components/ledger/internal/adapters/redis/transaction/balance/aggregation.go, .../aggregation_test.go
Renamed BalanceAggregator → SyncAggregator, InMemoryAggregator → InMemorySyncAggregator; adjusted span name and error messages; tests renamed.
Redis consumer API & parsing
components/ledger/internal/adapters/redis/transaction/consumer.redis.go, .../consumer.redis_mock.go, .../consumer.redis_test.go
Expanded RedisRepository interface (SetNX/MGet/Del/Incr/SetBytes/GetBytes/queue methods/ListBalanceByKey); removed RemoveBalanceSyncKey. Switched to claim_balance_sync_keys.lua, added parseSyncKeysFromLuaResult, and converted logs to structured form.
Lua scripts
components/ledger/internal/adapters/redis/transaction/scripts/claim_balance_sync_keys.lua, components/ledger/internal/adapters/redis/transaction/scripts/unschedule_synced_balance.lua
Added claim_balance_sync_keys.lua (claims due keys with locking and returns claimed entries). Removed unschedule_synced_balance.lua.
Collector implementation
components/ledger/internal/bootstrap/balance_sync.collector.go, components/ledger/internal/bootstrap/balance_sync.collector_test.go
Collector API changed: Run now accepts flushFn (removed SetFlushCallback), removed idleBackoff and Size(), adjusted fetch sizing, timer/drain and shutdown behavior, replaced waitOrShutdown with waitOrDone, and switched to structured logging. Tests updated.
New worker implementation
components/ledger/internal/bootstrap/balance_sync.worker.go, components/ledger/internal/bootstrap/balance_sync.worker_test.go, components/ledger/internal/bootstrap/balance_sync.worker_integration_test.go
Added BalanceSyncWorker and BalanceSyncConfig implementing dual-trigger batching (size + timeout), single-tenant and multi-tenant modes, key claiming/grouping by (orgID,ledgerID), per-batch UpdateMany, metrics and shutdown semantics. Tests/integration updated or added.
Bootstrap wiring / config
components/ledger/internal/bootstrap/config.go
Removed BalanceSyncMaxWorkers from Config, dropped redisConn param from initBalanceSyncWorker, and switched constructor calls to new worker APIs; startup logging converted to structured fields.
Removed legacy worker & tests
components/ledger/internal/bootstrap/balance.worker.go, components/ledger/internal/bootstrap/balance.worker_test.go, various workers_multitenant_* test files
Deleted legacy max-workers balance worker and many obsolete tests tied to the old worker model.
Use-case / command layer
components/ledger/internal/services/command/sync-balance.go, components/ledger/internal/services/command/sync-balance_test.go, components/ledger/internal/services/command/sync_balances_batch.go, components/ledger/internal/services/command/sync_balances_batch_test.go
Removed single-balance SyncBalance and its tests. SyncBalancesBatch now uses NewInMemorySyncAggregator() and calls BalanceRepo.UpdateMany(...); improved orphan-key detection and structured logging. Tests/mocks updated to expect UpdateMany.
Misc Redis tests & mocks
components/ledger/internal/adapters/redis/transaction/consumer.redis_atomic_state_test.go, components/ledger/internal/adapters/redis/transaction/consumer.redis_mock.go, other adjusted tests
Removed tests and mock support referencing RemoveBalanceSyncKey; added unit tests for parseSyncKeysFromLuaResult and adjusted script-name expectations.

Sequence Diagram(s)

sequenceDiagram
    participant Worker as BalanceSyncWorker
    participant Collector as BalanceSyncCollector
    participant Redis as Redis ZSET
    participant Lua as claim_balance_sync_keys.lua
    participant Aggregator as SyncAggregator
    participant DB as PostgreSQL

    Worker->>Collector: Run(ctx, flushFn, fetchKeys, waitForNext)
    activate Collector
    loop running
        Collector->>Redis: EVALSHA claim_balance_sync_keys.lua (limit)
        Redis->>Lua: execute script
        Lua-->>Redis: claimed members (member, score) with lock TTL
        Redis-->>Collector: []SyncKey
        Collector->>Collector: buffer keys
        alt buffer size ≥ batchSize or timer elapsed
            Collector->>Aggregator: Aggregate(buffered keys)
            Aggregator-->>Collector: []*AggregatedBalance
            Collector->>Worker: flushFn(grouped by orgID/ledgerID)
            Worker->>DB: UpdateMany(orgID, ledgerID, balances...)
            DB-->>Worker: RowsAffected
            Worker->>Redis: RemoveBalanceSyncKeysBatch(claimed keys)
            Redis-->>Worker: removed count
            Worker-->>Collector: flush result
            Collector->>Collector: reset buffer/timer
        else
            Collector->>Collector: wait (poll interval) or continue
        end
    end
    deactivate Collector

Comment @coderabbitai help to get the list of available commands and usage tips.

@lerian-studio
Contributor

This PR is very large (35 files, 6683 lines changed). Consider breaking it into smaller PRs for easier review.

@lerian-studio
Contributor

lerian-studio commented Apr 5, 2026

🔒 Security Scan Results — ledger

Trivy

Filesystem Scan

✅ No vulnerabilities or secrets found.

Docker Image Scan

✅ No vulnerabilities found.


Docker Hub Health Score Compliance

✅ Policies — 4/4 met

Policy Status
Default non-root user ✅ Passed
No fixable critical/high CVEs ✅ Passed
No high-profile vulnerabilities ✅ Passed
No AGPL v3 licenses ✅ Passed

🔍 View full scan logs

@lerian-studio
Contributor

lerian-studio commented Apr 5, 2026

📊 Unit Test Coverage Report: midaz-ledger

Metric Value
Overall Coverage 86.9% ✅ PASS
Threshold 85%

Coverage by Package

Package Coverage
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/http/in 86.5%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/mongodb/onboarding 66.7%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/mongodb/transaction 66.7%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/account 100.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/accounttype 66.7%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/asset 100.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/assetrate 100.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/balance 100.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/ledger 100.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/operation 90.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/operationroute 100.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/organization 100.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/portfolio 100.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/segment 100.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/transaction 97.4%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/postgres/transactionroute 100.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/rabbitmq 91.5%
github.com/LerianStudio/midaz/v3/components/ledger/internal/adapters/redis/transaction/balance 100.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/services/command 88.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/services/query 93.0%
github.com/LerianStudio/midaz/v3/components/ledger/internal/services 0.0%

Generated by Go PR Analysis workflow


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
components/ledger/internal/bootstrap/redis_consumer.worker_mt_fuzz_test.go (1)

117-133: 🧹 Nitpick | 🔵 Trivial

This worker fuzz branch never exercises isMTReady() == true.

tenantCache is never set here, so expectWorkerReady is effectively hard-wired to false and the positive predicate path cannot regress under fuzz coverage. Add a hasTenantCache dimension, or seed at least one non-nil cache case. The consumer half below has the same blind spot.

Also applies to: 138-139

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/ledger/internal/bootstrap/redis_consumer.worker_mt_fuzz_test.go`
around lines 117 - 133, The fuzz test never sets worker.tenantCache so
isMTReady() can never be true; update the fuzz inputs to include a
hasTenantCache boolean (or seed a non-nil cache case) and when true assign a
valid tenant cache to worker.tenantCache before calling isMTReady(); do the same
change for the consumer branch later in the file to ensure both positive
predicate paths for NewBalanceSyncWorker.isMTReady() are exercised (reference
NewBalanceSyncWorker, worker.mtEnabled, worker.pgManager, worker.tenantCache,
and isMTReady()).
components/ledger/internal/bootstrap/config.go (1)

837-856: ⚠️ Potential issue | 🟡 Minor

Log the effective sync config, not the raw env values.

NewBalanceSyncWorker/NewBalanceSyncWorkerMT clamp <= 0 values to safe defaults, but this log still emits the pre-normalized syncCfg. With an invalid env value you'll report one configuration and run another, which makes startup diagnostics misleading.

📝 Proposed fix
 func initBalanceSyncWorker(opts *Options, cfg *Config, logger libLog.Logger, commandUC *command.UseCase, pgManager *tmpostgres.Manager, tenantServiceName string) *BalanceSyncWorker {
 	syncCfg := BalanceSyncConfig{
 		BatchSize:      cfg.BalanceSyncBatchSize,
 		FlushTimeoutMs: cfg.BalanceSyncFlushTimeoutMs,
 		PollIntervalMs: cfg.BalanceSyncPollIntervalMs,
 	}
@@
 	if opts != nil && opts.MultiTenantEnabled && opts.TenantCache != nil {
 		balanceSyncWorker = NewBalanceSyncWorkerMT(logger, commandUC, syncCfg, true, opts.TenantCache, pgManager, tenantServiceName)
 	} else {
 		balanceSyncWorker = NewBalanceSyncWorker(logger, commandUC, syncCfg)
 	}
 
+	effectiveCfg := balanceSyncWorker.syncConfig
 	logger.Log(context.Background(), libLog.LevelInfo, "BalanceSyncWorker enabled",
-		libLog.Int("batch_size", syncCfg.BatchSize),
-		libLog.Int("flush_timeout_ms", syncCfg.FlushTimeoutMs),
-		libLog.Int("poll_interval_ms", syncCfg.PollIntervalMs),
+		libLog.Int("batch_size", effectiveCfg.BatchSize),
+		libLog.Int("flush_timeout_ms", effectiveCfg.FlushTimeoutMs),
+		libLog.Int("poll_interval_ms", effectiveCfg.PollIntervalMs),
 	)
 
 	return balanceSyncWorker
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/ledger/internal/bootstrap/config.go` around lines 837 - 856, The
current startup log prints the pre-normalized syncCfg but
NewBalanceSyncWorker/NewBalanceSyncWorkerMT clamp invalid values, so replace the
log of syncCfg with the worker's effective config: after creating
balanceSyncWorker (via NewBalanceSyncWorker or NewBalanceSyncWorkerMT) obtain
the runtime-normalized values from the worker (e.g., add/consume an accessor
like BalanceSyncWorker.Config() or getters for
BatchSize/FlushTimeoutMs/PollIntervalMs) and log those values instead; remove or
update the existing logger.Log call that references the original syncCfg to use
the values returned by balanceSyncWorker so startup diagnostics reflect what
actually runs.
components/ledger/internal/bootstrap/balance_sync.collector.go (1)

108-123: 🧹 Nitpick | 🔵 Trivial

Keep the fetch-error backoff logs collector-scoped.

waitOrDone in components/ledger/internal/bootstrap/balance_sync.worker.go:552-560 emits BalanceSyncWorker log messages. Reusing it here means collector fetch retries will be mislabeled in production, which undercuts the new structured logging. Either make that helper component-agnostic or keep the wait local to the collector.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/ledger/internal/bootstrap/balance_sync.collector.go` around lines
108 - 123, The collector currently calls waitOrDone(ctx, c.pollInterval,
c.logger) which emits BalanceSyncWorker logs, mislabeling collector
retry/backoff messages; replace that call with a collector-scoped wait that uses
c.logger and the collector identity (BalanceSyncCollector) or make waitOrDone
accept an explicit component label. Concretely, in the BalanceSyncCollector code
path (method on c) remove the waitOrDone call and implement a local
context-aware wait using c.pollInterval (select on ctx.Done() and
time.After(c.pollInterval)) so the log and any timeout handling are emitted via
c.logger and attributed to BalanceSyncCollector. Ensure you reference the
existing c.logger, c.pollInterval, and BalanceSyncCollector context when making
the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/ledger/internal/adapters/postgres/balance/balance.postgresql.go`:
- Around line 1451-1486: The UpdateMany implementation builds a VALUES list with
4 params per balance plus 3 shared params and must guard against PostgreSQL's
65,535 bind-parameter limit; add a constant like maxUpdateManyRows = (65535-3)/4
and check len(deduped) against it before constructing values (in the UpdateMany
function), returning an error if exceeded or alternatively implement chunking by
iterating over deduped in slices of maxUpdateManyRows and executing the same
query per chunk while accumulating affected rows; reference the deduped slice,
the VALUES construction loop (valuesClauses/args/paramIdx), and the
db.ExecContext call when adding the guard or chunking logic.

In
`@components/ledger/internal/bootstrap/balance_sync.worker_integration_test.go`:
- Around line 106-109: The test currently checks removal indirectly via
GetBalanceSyncKeys (which filters locked members), so replace or complement that
check with a direct Redis sorted-set assertion: call the Redis ZSCORE or ZRANK
on the schedule key for the balanceKey used in the test and assert the result is
nil/not found (or returns redis.Nil) after RemoveBalanceSyncKeysBatch() runs;
locate the test in balance_sync.worker_integration_test.go around the loop that
iterates keysAfter and add a direct ZSET query against the same schedule Redis
key to ensure RemoveBalanceSyncKeysBatch actually deleted the member rather than
just leaving a claim lock.

In `@components/ledger/internal/bootstrap/balance_sync.worker_mt_fuzz_test.go`:
- Around line 16-25: The fuzz test function name
FuzzNewBalanceSyncWorkerMT_MaxWorkers is outdated because NewBalanceSyncWorkerMT
no longer takes a MaxWorkers parameter; rename the test to match the current
constructor API (e.g., FuzzNewBalanceSyncWorkerMT) and update the comment text
to remove any reference to MaxWorkers so the corpus and go test -fuzz output are
not misleading; ensure you change the function identifier
(FuzzNewBalanceSyncWorkerMT_MaxWorkers) wherever referenced and keep the body
and verified properties unchanged except for the name/comment edits.

In `@components/ledger/internal/bootstrap/balance_sync.worker_test.go`:
- Around line 209-212: The loop in the test iterating over testCases contains
redundant per-iteration rebindings (tc := tc, orgID := orgID, ledgerID :=
ledgerID); remove these unnecessary assignments and use the loop variables
directly in the body (the for loop over testCases and any uses of tc, orgID,
ledgerID) since Go 1.22+ scopes loop variables per iteration.

In `@components/ledger/internal/bootstrap/balance_sync.worker.go`:
- Around line 416-431: The flush callback is incorrectly reporting "no work"
when keys are cleaned up; update the logic so any successful removal/consumption
of keys counts as progress. Specifically, when iterating groups from
groupKeysByOrgLedger and calling processSyncBatch (or when processSyncBatch
invokes RemoveBalanceSyncKeysBatch), ensure processSyncBatch returns true if it
performed any cleanup/consumption of keys (not only when new balances were
processed), and set processed = true in the caller whenever processSyncBatch
indicates cleanup; alternatively, detect RemoveBalanceSyncKeysBatch success in
the caller and set processed = true so the FlushFunc contract is honored.
- Around line 538-541: The log call in BalanceSyncWorker is using redundant
int(...) conversions on fields that are already int (result.KeysProcessed,
result.BalancesAggregated, result.BalancesSynced); remove the unnecessary casts
in the w.logger.Log invocation (keep libLog.Int("processed",
result.KeysProcessed), libLog.Int("aggregated", result.BalancesAggregated),
libLog.Int("synced", result.BalancesSynced)) so the types are correct and
golangci-lint/unconvert warnings are resolved.

In `@components/ledger/internal/bootstrap/redis_consumer.worker_mt_test.go`:
- Around line 307-360: The test
TestRedisQueueConsumer_RunDispatchesBasedOnMultiTenantReady currently only calls
isMultiTenantReady and doesn't verify that Run() actually dispatches to the
correct branch; update the test to call consumer.Run(...) and make the selected
path observable by injecting a spy or stubbing the consumer's runSingleTenant
and runMultiTenant methods (or wrapping NewRedisQueueConsumer to set flags) so
you can assert which branch was executed; use the unique symbols
NewRedisQueueConsumer, Run, isMultiTenantReady, runSingleTenant and
runMultiTenant to locate and modify the test so it verifies Run() dispatch
rather than just the predicate.

---

Outside diff comments:
In `@components/ledger/internal/bootstrap/balance_sync.collector.go`:
- Around line 108-123: The collector currently calls waitOrDone(ctx,
c.pollInterval, c.logger) which emits BalanceSyncWorker logs, mislabeling
collector retry/backoff messages; replace that call with a collector-scoped wait
that uses c.logger and the collector identity (BalanceSyncCollector) or make
waitOrDone accept an explicit component label. Concretely, in the
BalanceSyncCollector code path (method on c) remove the waitOrDone call and
implement a local context-aware wait using c.pollInterval (select on ctx.Done()
and time.After(c.pollInterval)) so the log and any timeout handling are emitted
via c.logger and attributed to BalanceSyncCollector. Ensure you reference the
existing c.logger, c.pollInterval, and BalanceSyncCollector context when making
the change.

In `@components/ledger/internal/bootstrap/config.go`:
- Around line 837-856: The current startup log prints the pre-normalized syncCfg
but NewBalanceSyncWorker/NewBalanceSyncWorkerMT clamp invalid values, so replace
the log of syncCfg with the worker's effective config: after creating
balanceSyncWorker (via NewBalanceSyncWorker or NewBalanceSyncWorkerMT) obtain
the runtime-normalized values from the worker (e.g., add/consume an accessor
like BalanceSyncWorker.Config() or getters for
BatchSize/FlushTimeoutMs/PollIntervalMs) and log those values instead; remove or
update the existing logger.Log call that references the original syncCfg to use
the values returned by balanceSyncWorker so startup diagnostics reflect what
actually runs.

In `@components/ledger/internal/bootstrap/redis_consumer.worker_mt_fuzz_test.go`:
- Around line 117-133: The fuzz test never sets worker.tenantCache so
isMTReady() can never be true; update the fuzz inputs to include a
hasTenantCache boolean (or seed a non-nil cache case) and when true assign a
valid tenant cache to worker.tenantCache before calling isMTReady(); do the same
change for the consumer branch later in the file to ensure both positive
predicate paths for NewBalanceSyncWorker.isMTReady() are exercised (reference
NewBalanceSyncWorker, worker.mtEnabled, worker.pgManager, worker.tenantCache,
and isMTReady()).
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 0cef962c-d299-4f93-8f68-390aa3201dc5

📥 Commits

Reviewing files that changed from the base of the PR and between a838c5e and 51b8a39.

📒 Files selected for processing (35)
  • CLAUDE.md
  • components/ledger/.env.example
  • components/ledger/internal/adapters/postgres/balance/balance.postgresql.go
  • components/ledger/internal/adapters/postgres/balance/balance.postgresql_integration_test.go
  • components/ledger/internal/adapters/postgres/balance/balance.postgresql_mock.go
  • components/ledger/internal/adapters/redis/transaction/balance/aggregation.go
  • components/ledger/internal/adapters/redis/transaction/balance/aggregation_test.go
  • components/ledger/internal/adapters/redis/transaction/consumer.redis.go
  • components/ledger/internal/adapters/redis/transaction/consumer.redis_atomic_state_test.go
  • components/ledger/internal/adapters/redis/transaction/consumer.redis_mock.go
  • components/ledger/internal/adapters/redis/transaction/consumer.redis_test.go
  • components/ledger/internal/adapters/redis/transaction/scripts/claim_balance_sync_keys.lua
  • components/ledger/internal/adapters/redis/transaction/scripts/unschedule_synced_balance.lua
  • components/ledger/internal/bootstrap/balance.worker.go
  • components/ledger/internal/bootstrap/balance.worker_test.go
  • components/ledger/internal/bootstrap/balance_sync.collector.go
  • components/ledger/internal/bootstrap/balance_sync.collector_test.go
  • components/ledger/internal/bootstrap/balance_sync.mt_lifecycle_test.go
  • components/ledger/internal/bootstrap/balance_sync.worker.go
  • components/ledger/internal/bootstrap/balance_sync.worker_integration_test.go
  • components/ledger/internal/bootstrap/balance_sync.worker_mt_fuzz_test.go
  • components/ledger/internal/bootstrap/balance_sync.worker_mt_property_test.go
  • components/ledger/internal/bootstrap/balance_sync.worker_mt_test.go
  • components/ledger/internal/bootstrap/balance_sync.worker_test.go
  • components/ledger/internal/bootstrap/config.go
  • components/ledger/internal/bootstrap/redis_consumer.worker_mt_fuzz_test.go
  • components/ledger/internal/bootstrap/redis_consumer.worker_mt_property_test.go
  • components/ledger/internal/bootstrap/redis_consumer.worker_mt_test.go
  • components/ledger/internal/bootstrap/workers_multitenant_property_test.go
  • components/ledger/internal/bootstrap/workers_multitenant_test.go
  • components/ledger/internal/services/command/sync-balance.go
  • components/ledger/internal/services/command/sync-balance_test.go
  • components/ledger/internal/services/command/sync_balances_batch.go
  • components/ledger/internal/services/command/sync_balances_batch_test.go
  • docs/PROJECT_RULES.md
💤 Files with no reviewable changes (10)
  • components/ledger/.env.example
  • components/ledger/internal/adapters/redis/transaction/consumer.redis_atomic_state_test.go
  • components/ledger/internal/adapters/redis/transaction/scripts/unschedule_synced_balance.lua
  • components/ledger/internal/adapters/redis/transaction/consumer.redis_mock.go
  • components/ledger/internal/services/command/sync-balance.go
  • components/ledger/internal/bootstrap/workers_multitenant_test.go
  • components/ledger/internal/bootstrap/balance.worker_test.go
  • components/ledger/internal/bootstrap/workers_multitenant_property_test.go
  • components/ledger/internal/services/command/sync-balance_test.go
  • components/ledger/internal/bootstrap/balance.worker.go

@lerian-studio
Contributor

This PR is very large (35 files, 6698 lines changed). Consider breaking it into smaller PRs for easier review.


@lerian-studio
Contributor

This PR is very large (35 files, 6700 lines changed). Consider breaking it into smaller PRs for easier review.


@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/ledger/internal/bootstrap/balance_sync.collector_test.go (1)

753-759: ⚠️ Potential issue | 🟡 Minor

Replace sleep-based synchronization in idle-resume test.

time.Sleep at Line 758 can make this test timing-sensitive in CI. Prefer waiting on a deterministic signal (fetchCallCount or a channel).

Proposed deterministic wait
  // Wait for collector to reach idle and call waitForNext
- time.Sleep(200 * time.Millisecond)
+ require.Eventually(t, func() bool {
+   return fetchCallCount.Load() >= 1
+ }, 3*time.Second, 10*time.Millisecond, "collector should perform initial fetch before resume")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/ledger/internal/bootstrap/balance_sync.collector_test.go` around
lines 753 - 759, The test uses time.Sleep to wait for the collector to become
idle; replace this flaky sleep with a deterministic wait by blocking on a signal
or counter used in the test (e.g., wait for fetchCallCount to reach the expected
value or receive from waitCh) after starting c.Run(ctx, rec.record, fetchFn,
waitForNextResume(waitCh)). Specifically, remove the time.Sleep(200 *
time.Millisecond) and instead wait until the fetchCallCount (or the channel
waitCh) indicates the fetch has been invoked the expected number of times (with
a small context/timeout to avoid hanging); keep the existing c.Run/close(done)
semantics and use the same test-level timeout to fail fast in CI.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@components/ledger/internal/bootstrap/balance_sync.collector_test.go`:
- Around line 753-759: The test uses time.Sleep to wait for the collector to
become idle; replace this flaky sleep with a deterministic wait by blocking on a
signal or counter used in the test (e.g., wait for fetchCallCount to reach the
expected value or receive from waitCh) after starting c.Run(ctx, rec.record,
fetchFn, waitForNextResume(waitCh)). Specifically, remove the time.Sleep(200 *
time.Millisecond) and instead wait until the fetchCallCount (or the channel
waitCh) indicates the fetch has been invoked the expected number of times (with
a small context/timeout to avoid hanging); keep the existing c.Run/close(done)
semantics and use the same test-level timeout to fail fast in CI.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: ea46e4cd-e2ec-42d6-8ab7-953c34e50bd9

📥 Commits

Reviewing files that changed from the base of the PR and between de399b9 and 724479d.

📒 Files selected for processing (1)
  • components/ledger/internal/bootstrap/balance_sync.collector_test.go


@gandalf-at-lerian gandalf-at-lerian left a comment


Solid refactor. +2501/−4199 across 35 files and it reads cleaner than the original — that's the mark of good work.

What's good:

  • The UPDATE ... FROM (VALUES ...) in UpdateMany is a real performance win. N round-trips → 1. The client-side dedup by balance ID before the query is correctly motivated — without it, PG would join one target row to multiple source rows non-deterministically.
  • Removing Sync (individual), RemoveBalanceSyncKey, and unschedule_synced_balance.lua in favor of batch-only paths simplifies the mental model. One path to understand instead of two.
  • The collector API change (flush callback as a Run() parameter instead of SetFlushCallback) eliminates the footgun of forgetting to set it. Good call.
  • Structured logging migration from fmt.Sprintf → typed fields is the right pattern for production. The demotion of idle/polling logs to DEBUG will reduce noise significantly.
  • Poison record cleanup is belt-and-suspenders: worker catches unparseable keys in groupKeysByOrgLedger, use case catches expired/unparseable in orphanedKeys. Both paths ZREM.
  • MT reconciliation with 4 phases (reap dead → stop removed → start new → wait for removed) is clean. Non-blocking cancel + batched wait is the right pattern.
  • extractIDsFromMember — allocation-free UUID scanner, position-independent. Neat.
  • Property tests and fuzz tests for the new paths are a welcome addition.

Minor nits (non-blocking):

  1. parseSyncKeysFromLuaResult(res any, logger libLog.Logger, ctx context.Context): ctx should be the first parameter per Go convention and your own PROJECT_RULES.md (Context: Always first param). Swap to (ctx context.Context, res any, logger libLog.Logger).
  2. claim_balance_sync_keys.lua is missing a trailing newline (return claimed with no \n). Most editors and linters prefer POSIX-compliant final newline.

Both are trivial — not worth holding the PR for.

@ClaraTersi ClaraTersi merged commit 39cd486 into develop Apr 6, 2026
27 checks passed
@ClaraTersi ClaraTersi deleted the feature/balance-sync-mt branch April 6, 2026 21:36