feat(tenant-manager): add TenantDiscovery orchestrator for background workers #415
Description
Context
Consumer services have two types of components that need to know which tenants are active:
- **HTTP handlers** — the tenant ID comes from the JWT in the request. The `TenantMiddleware` resolves connections via lazy-load + Pub/Sub cache. This works because each request carries its own tenant context.
- **Background workers** (e.g., `BalanceSyncWorker`, cron jobs) — run outside the HTTP request cycle. No JWT, no request, no lazy-load trigger. They need to iterate all active tenants to process each one. Today the `BalanceSyncWorker` in midaz calls `GetActiveTenantsByService` but has no Pub/Sub listener — it doesn't know when tenants are added/removed in real time.
The building blocks already exist in lib-commons but each consumer has to wire them manually, duplicating ~50 lines of bootstrap code across services.
Problem
- Background workers that don't share the HTTP server's `TenantCache` have no way to discover tenants in real time
- After boot, if a tenant is associated/suspended/removed, the worker doesn't find out until the next polling cycle (or restart)
- Each consumer duplicates the Redis client setup + event listener + cache + bootstrap wiring
- No single entry point combines bootstrap (`GET /tenants/active`) with real-time updates (Pub/Sub)
Solution
Add a `TenantDiscovery` orchestrator in `commons/tenant-manager/discovery/` that combines:
- **Bootstrap**: calls `GET /tenants/active?service={service}` to populate the cache on start
- **Real-time**: starts `NewTenantEventListener` for Pub/Sub updates (add/remove/suspend)
- **Cache**: maintains an in-memory list of active tenant IDs, always up to date
Proposed API

```go
package discovery

import (
	"context"

	"github.com/LerianStudio/lib-commons/v4/commons/log" // assumed path for log.Logger
	tmclient "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/client"
	tmredis "github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/redis"
)

type Config struct {
	ServiceName string                          // APPLICATION_NAME (e.g., "ledger")
	TMClient    *tmclient.Client                // tenant-manager HTTP client (for bootstrap)
	RedisConfig tmredis.TenantPubSubRedisConfig // Redis Pub/Sub config
	Logger      log.Logger
}

type TenantDiscovery struct{ /* ... */ }

// NewTenantDiscovery creates the orchestrator. Does NOT start anything yet.
func NewTenantDiscovery(cfg Config) (*TenantDiscovery, error)

// Start bootstraps from /tenants/active and starts the Pub/Sub listener.
//  1. GET /tenants/active?service={serviceName} → populates cache
//  2. NewTenantEventListener → subscribes to tenant-events:*
// Returns after bootstrap completes. Pub/Sub runs in a background goroutine.
func (d *TenantDiscovery) Start(ctx context.Context) error

// ActiveTenantIDs returns the current list of active tenant IDs from cache.
// Safe for concurrent use. Workers call this on each tick to iterate tenants.
func (d *TenantDiscovery) ActiveTenantIDs() []string

// OnTenantAdded registers a callback for when a new tenant is discovered.
func (d *TenantDiscovery) OnTenantAdded(fn func(ctx context.Context, tenantID string))

// OnTenantRemoved registers a callback for when a tenant is removed/suspended.
func (d *TenantDiscovery) OnTenantRemoved(fn func(ctx context.Context, tenantID string))

// Close stops the Pub/Sub listener and cleans up.
func (d *TenantDiscovery) Close() error
```

Usage in a background worker
```go
// Worker bootstrap
discovery, err := tmdiscovery.NewTenantDiscovery(tmdiscovery.Config{
	ServiceName: cfg.ApplicationName,
	TMClient:    tenantClient,
	RedisConfig: tmredis.TenantPubSubRedisConfig{
		Host:     cfg.MultiTenantRedisHost,
		Port:     cfg.MultiTenantRedisPort,
		Password: cfg.MultiTenantRedisPassword,
		TLS:      cfg.MultiTenantRedisTLS,
	},
	Logger: logger,
})
if err != nil {
	logger.Fatalf("tenant discovery init: %v", err)
}
if err := discovery.Start(ctx); err != nil {
	logger.Fatalf("tenant discovery start: %v", err) // fail-fast on bootstrap errors
}
defer discovery.Close()

// Worker loop
for {
	for _, tenantID := range discovery.ActiveTenantIDs() {
		conn, err := pgManager.GetConnection(ctx, tenantID)
		if err != nil {
			logger.Warnf("skipping tenant %s: %v", tenantID, err)
			continue
		}
		processBalances(ctx, conn)
	}
	time.Sleep(interval)
}
```

Internal flow
```text
Start()
├── GET /tenants/active?service=ledger
│   └── populate cache: [org_01A, org_01B, org_01C]
└── NewTenantEventListener(redis, dispatcher)
    └── PSUBSCRIBE tenant-events:*
        ├── tenant.service.associated    → add to cache + OnTenantAdded callback
        ├── tenant.service.disassociated → remove from cache + OnTenantRemoved callback
        ├── tenant.suspended             → remove from cache + OnTenantRemoved callback
        └── tenant.credentials.rotated   → reload via /connections
```
Components to reuse (already in lib-commons)

| Component | Package | Purpose |
|---|---|---|
| `NewTenantPubSubRedisClient` | `commons/tenant-manager/redis` | Redis client for Pub/Sub |
| `NewTenantEventListener` | `commons/tenant-manager/event` | Pub/Sub subscriber |
| `EventDispatcher` | `commons/tenant-manager/event` | Routes events to handlers |
| `TenantCache` | `commons/tenant-manager/cache` | In-memory cache with TTL |
| `Client.GetActiveTenantsByService` | `commons/tenant-manager/client` | Bootstrap via HTTP |
Consumers that will use this

| Consumer | Service | Current approach | After |
|---|---|---|---|
| `BalanceSyncWorker` (midaz ledger) | ledger | `GetActiveTenantsByService` polling, no Pub/Sub | `TenantDiscovery` |
| Future cron jobs | any | Would need to duplicate wiring | `TenantDiscovery` |
| Fetcher worker | fetcher | Not yet multi-tenant | `TenantDiscovery` when ready |
Acceptance criteria
- `NewTenantDiscovery(cfg)` validates required fields
- `Start(ctx)` calls `GetActiveTenantsByService` and populates the cache
- `Start(ctx)` creates and starts `NewTenantEventListener`
- `ActiveTenantIDs()` returns the current tenant list from cache (concurrency-safe)
- `OnTenantAdded` / `OnTenantRemoved` callbacks fire on Pub/Sub events
- `Close()` stops the listener and cleans up
- If Redis Pub/Sub is unavailable at start, logs a warning and continues in bootstrap-only mode
- If `GetActiveTenantsByService` fails at start, returns an error (fail-fast)
- Unit tests with mock Redis + mock HTTP server
- `.env.reference` updated if new envs are needed (none expected — reuses existing `MULTI_TENANT_REDIS_*` + `MULTI_TENANT_URL`)