Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions controllers/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,22 @@ func InitAuthConfig() {
iamsdk.InitConfig(iamEndpoint, clientId, clientSecret, "", iamOrganization, iamApplication)
application, err := iamsdk.GetApplication(iamApplication)
if err != nil {
panic(err)
fmt.Printf("[WARN] Failed to get IAM application %q: %v (auth features disabled)\n", iamApplication, err)
return
}
if application == nil {
panic(fmt.Errorf("The application: %s does not exist", iamApplication))
fmt.Printf("[WARN] IAM application %q does not exist (auth features disabled)\n", iamApplication)
return
}

cert, err := iamsdk.GetCert(application.Cert)
if err != nil {
panic(err)
fmt.Printf("[WARN] Failed to get cert %q for application %q: %v (auth features disabled)\n", application.Cert, iamApplication, err)
return
}
if cert == nil {
panic(fmt.Errorf("The cert: %s does not exist", application.Cert))
fmt.Printf("[WARN] Cert %q for application %q does not exist (auth features disabled)\n", application.Cert, iamApplication)
return
}

iamsdk.InitConfig(iamEndpoint, clientId, clientSecret, cert.Certificate, iamOrganization, iamApplication)
Expand Down
4 changes: 3 additions & 1 deletion controllers/openai_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,9 @@ func resolveConsoleKeys(org string) (publicKey, secretKey string) {
// (console-pk-{org} / console-sk-{org}), enabling each org to see their own usage
// in console.hanzo.ai. This is fire-and-forget — failures are silently ignored.
func recordTrace(record *usageRecord, startTime time.Time) {
// Write to ClickHouse via native ZAP if datastore is connected.
// Write billing record to ClickHouse for invoice reconciliation.
go zapWriteUsage(record, startTime)
// Write observability trace to ClickHouse via native ZAP.
go zapWriteTrace(record, startTime)

go func() {
Expand Down
92 changes: 92 additions & 0 deletions controllers/zap_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,98 @@ func zapWriteTrace(record *usageRecord, startTime time.Time) {
}
}

// ── ZAP billing record writer (datastore → ClickHouse) ──────────────────
//
// Writes billing/usage records to hanzo.cloud_usage for invoice reconciliation.
// Both Commerce and Console can query this table for unified billing views.

var usageTableCreated bool

func zapEnsureUsageTable() {
if usageTableCreated {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

err := object.ZapDatastoreExec(ctx, `
CREATE TABLE IF NOT EXISTS hanzo.cloud_usage (
id String,
timestamp DateTime,
owner String,
user_id String,
organization String,
model String,
provider String,
request_id String,
prompt_tokens UInt32,
completion_tokens UInt32,
total_tokens UInt32,
cache_read_tokens UInt32,
cache_write_tokens UInt32,
cost_cents UInt64,
currency String,
status String,
error_msg String,
is_premium UInt8,
is_stream UInt8,
client_ip String
) ENGINE = MergeTree()
ORDER BY (timestamp, organization, user_id)
TTL timestamp + INTERVAL 2 YEAR
`)
if err != nil {
logs.Warn("ZAP: failed to create cloud_usage table: %v", err)
return
}
usageTableCreated = true
}

func zapWriteUsage(record *usageRecord, startTime time.Time) {
if !object.DatastoreEnabled() {
return
}

zapEnsureUsageTable()

org := record.Organization
if org == "" {
org = record.Owner
}

costCents := calculateCostCentsWithCache(
record.Model, record.PromptTokens, record.CompletionTokens,
record.CacheReadTokens, record.CacheWriteTokens,
)

premium := uint8(0)
if record.Premium {
premium = 1
}
stream := uint8(0)
if record.Stream {
stream = 1
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := object.ZapDatastoreExec(ctx,
`INSERT INTO hanzo.cloud_usage (id, timestamp, owner, user_id, organization, model, provider, request_id, prompt_tokens, completion_tokens, total_tokens, cache_read_tokens, cache_write_tokens, cost_cents, currency, status, error_msg, is_premium, is_stream, client_ip) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
record.RequestID, startTime.UTC(),
record.Owner, record.User, org,
record.Model, record.Provider, record.RequestID,
record.PromptTokens, record.CompletionTokens, record.TotalTokens,
record.CacheReadTokens, record.CacheWriteTokens,
costCents, "usd",
record.Status, record.ErrorMsg,
premium, stream, record.ClientIP,
)
if err != nil {
logs.Warn("ZAP: usage write failed: %v", err)
}
}

// ── models.list ─────────────────────────────────────────────────────────

func zapListModelsHandler() (*zap.Message, error) {
Expand Down
Loading