diff --git a/controllers/account.go b/controllers/account.go index 06201e1b..e1c177c8 100644 --- a/controllers/account.go +++ b/controllers/account.go @@ -44,18 +44,22 @@ func InitAuthConfig() { iamsdk.InitConfig(iamEndpoint, clientId, clientSecret, "", iamOrganization, iamApplication) application, err := iamsdk.GetApplication(iamApplication) if err != nil { - panic(err) + fmt.Printf("[WARN] Failed to get IAM application %q: %v (auth features disabled)\n", iamApplication, err) + return } if application == nil { - panic(fmt.Errorf("The application: %s does not exist", iamApplication)) + fmt.Printf("[WARN] IAM application %q does not exist (auth features disabled)\n", iamApplication) + return } cert, err := iamsdk.GetCert(application.Cert) if err != nil { - panic(err) + fmt.Printf("[WARN] Failed to get cert %q for application %q: %v (auth features disabled)\n", application.Cert, iamApplication, err) + return } if cert == nil { - panic(fmt.Errorf("The cert: %s does not exist", application.Cert)) + fmt.Printf("[WARN] Cert %q for application %q does not exist (auth features disabled)\n", application.Cert, iamApplication) + return } iamsdk.InitConfig(iamEndpoint, clientId, clientSecret, cert.Certificate, iamOrganization, iamApplication) diff --git a/controllers/openai_api.go b/controllers/openai_api.go index ed232134..33dc1c43 100644 --- a/controllers/openai_api.go +++ b/controllers/openai_api.go @@ -470,7 +470,9 @@ func resolveConsoleKeys(org string) (publicKey, secretKey string) { // (console-pk-{org} / console-sk-{org}), enabling each org to see their own usage // in console.hanzo.ai. This is fire-and-forget — failures are silently ignored. func recordTrace(record *usageRecord, startTime time.Time) { - // Write to ClickHouse via native ZAP if datastore is connected. + // Write billing record to ClickHouse for invoice reconciliation. + go zapWriteUsage(record, startTime) + // Write observability trace to ClickHouse via native ZAP. go zapWriteTrace(record, startTime) go func() { diff --git a/controllers/zap_native.go b/controllers/zap_native.go index 2834d93b..d29e5348 100644 --- a/controllers/zap_native.go +++ b/controllers/zap_native.go @@ -169,6 +169,98 @@ func zapWriteTrace(record *usageRecord, startTime time.Time) { } } +// ── ZAP billing record writer (datastore → ClickHouse) ────────────────── +// +// Writes billing/usage records to hanzo.cloud_usage for invoice reconciliation. +// Both Commerce and Console can query this table for unified billing views. + +var usageTableCreated bool + +func zapEnsureUsageTable() { + if usageTableCreated { + return + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err := object.ZapDatastoreExec(ctx, ` + CREATE TABLE IF NOT EXISTS hanzo.cloud_usage ( + id String, + timestamp DateTime, + owner String, + user_id String, + organization String, + model String, + provider String, + request_id String, + prompt_tokens UInt32, + completion_tokens UInt32, + total_tokens UInt32, + cache_read_tokens UInt32, + cache_write_tokens UInt32, + cost_cents UInt64, + currency String, + status String, + error_msg String, + is_premium UInt8, + is_stream UInt8, + client_ip String + ) ENGINE = MergeTree() + ORDER BY (timestamp, organization, user_id) + TTL timestamp + INTERVAL 2 YEAR + `) + if err != nil { + logs.Warn("ZAP: failed to create cloud_usage table: %v", err) + return + } + usageTableCreated = true +} + +func zapWriteUsage(record *usageRecord, startTime time.Time) { + if !object.DatastoreEnabled() { + return + } + + zapEnsureUsageTable() + + org := record.Organization + if org == "" { + org = record.Owner + } + + costCents := calculateCostCentsWithCache( + record.Model, record.PromptTokens, record.CompletionTokens, + record.CacheReadTokens, record.CacheWriteTokens, + ) + + premium := uint8(0) + if record.Premium { + premium = 1 + } + stream := uint8(0) + if record.Stream { + stream = 1 + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err := object.ZapDatastoreExec(ctx, + `INSERT INTO hanzo.cloud_usage (id, timestamp, owner, user_id, organization, model, provider, request_id, prompt_tokens, completion_tokens, total_tokens, cache_read_tokens, cache_write_tokens, cost_cents, currency, status, error_msg, is_premium, is_stream, client_ip) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + record.RequestID, startTime.UTC(), + record.Owner, record.User, org, + record.Model, record.Provider, record.RequestID, + record.PromptTokens, record.CompletionTokens, record.TotalTokens, + record.CacheReadTokens, record.CacheWriteTokens, + costCents, "usd", + record.Status, record.ErrorMsg, + premium, stream, record.ClientIP, + ) + if err != nil { + logs.Warn("ZAP: usage write failed: %v", err) + } +} + // ── models.list ───────────────────────────────────────────────────────── func zapListModelsHandler() (*zap.Message, error) {