diff --git a/.env.example b/.env.example index c0151a0be..e011e8c77 100644 --- a/.env.example +++ b/.env.example @@ -52,3 +52,9 @@ POWERDNS_SERVER_ID=localhost # Storage Configuration # STORAGE_SECRET is REQUIRED for presigned URL signing - must be set to a secure value STORAGE_SECRET=your-secure-storage-secret + +# Kubernetes Cluster Configuration +# CLOUD_API_URL is the public API URL used by k8s cluster components (CCM, autoscaler) +# For local development, use http://host.docker.internal:8080 +# For production, use https://api.yourdomain.com +CLOUD_API_URL=http://host.docker.internal:8080 diff --git a/Dockerfile b/Dockerfile index 42506d936..ee2ebbecb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -33,6 +33,8 @@ COPY --from=builder /bin/api /app/api COPY --from=builder /app/internal/repositories/postgres/migrations /app/migrations # Copy docs for swagger if needed COPY --from=builder /app/docs /app/docs +# Copy K8s templates for cluster provisioning +COPY --from=builder /app/internal/repositories/k8s/templates /app/internal/repositories/k8s/templates # Create data directory RUN mkdir -p /app/thecloud-data diff --git a/docker-compose.yml b/docker-compose.yml index bcd5185c4..a72f12b2c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -108,6 +108,7 @@ services: - POWERDNS_API_KEY=${POWERDNS_API_KEY} - STORAGE_SECRET=${STORAGE_SECRET} - DOCKER_DEFAULT_NETWORK=${DOCKER_DEFAULT_NETWORK:-cloud-network} + - CLOUD_API_URL=${CLOUD_API_URL:-http://host.docker.internal:8080} volumes: - /var/run/docker.sock:/var/run/docker.sock - /var/run/openvswitch:/var/run/openvswitch diff --git a/internal/repositories/postgres/cluster_repo.go b/internal/repositories/postgres/cluster_repo.go index 61543e519..7320de4ac 100644 --- a/internal/repositories/postgres/cluster_repo.go +++ b/internal/repositories/postgres/cluster_repo.go @@ -26,16 +26,16 @@ func NewClusterRepository(db DB) *ClusterRepository { func (r *ClusterRepository) Create(ctx context.Context, cluster *domain.Cluster) error { query := ` INSERT INTO clusters ( - id, user_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled, - network_isolation, pod_cidr, service_cidr, api_server_lb_address, - kubeconfig_encrypted, ssh_private_key_encrypted, join_token, - token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days, + id, user_id, tenant_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled, + network_isolation, pod_cidr, service_cidr, api_server_lb_address, + kubeconfig_encrypted, ssh_private_key_encrypted, join_token, + token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days, created_at, updated_at ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24) ` _, err := r.db.Exec(ctx, query, - cluster.ID, cluster.UserID, cluster.VpcID, cluster.Name, cluster.Version, + cluster.ID, cluster.UserID, cluster.TenantID, cluster.VpcID, cluster.Name, cluster.Version, string(cluster.Status), cluster.ControlPlaneIPs, cluster.WorkerCount, cluster.HAEnabled, cluster.NetworkIsolation, cluster.PodCIDR, cluster.ServiceCIDR, cluster.APIServerLBAddress, cluster.KubeconfigEncrypted, @@ -51,17 +51,18 @@ func (r *ClusterRepository) Create(ctx context.Context, cluster *domain.Cluster) func (r *ClusterRepository) GetByID(ctx context.Context, id uuid.UUID) (*domain.Cluster, error) { userID := appcontext.UserIDFromContext(ctx) + tenantID := appcontext.TenantIDFromContext(ctx) query := ` - SELECT - id, user_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled, - network_isolation, pod_cidr, service_cidr, api_server_lb_address, - kubeconfig_encrypted, ssh_private_key_encrypted, join_token, - token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days, + SELECT + id, user_id, tenant_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled, + network_isolation, pod_cidr, service_cidr, api_server_lb_address, + kubeconfig_encrypted, ssh_private_key_encrypted, join_token, + token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days, created_at, updated_at FROM clusters - WHERE id = $1 AND user_id = $2 + WHERE id = $1 AND tenant_id = $2 AND user_id = $3 ` - cluster, err := r.scanCluster(r.db.QueryRow(ctx, query, id, userID)) + cluster, err := r.scanCluster(r.db.QueryRow(ctx, query, id, tenantID, userID)) if err != nil || cluster == nil { return cluster, err } @@ -77,27 +78,28 @@ func (r *ClusterRepository) GetByID(ctx context.Context, id uuid.UUID) (*domain. } func (r *ClusterRepository) ListByUserID(ctx context.Context, userID uuid.UUID) ([]*domain.Cluster, error) { + tenantID := appcontext.TenantIDFromContext(ctx) query := ` - SELECT - id, user_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled, - network_isolation, pod_cidr, service_cidr, api_server_lb_address, - kubeconfig_encrypted, ssh_private_key_encrypted, join_token, - token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days, + SELECT + id, user_id, tenant_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled, + network_isolation, pod_cidr, service_cidr, api_server_lb_address, + kubeconfig_encrypted, ssh_private_key_encrypted, join_token, + token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days, created_at, updated_at FROM clusters - WHERE user_id = $1 + WHERE tenant_id = $1 AND user_id = $2 ORDER BY created_at DESC ` - return r.list(ctx, query, userID) + return r.list(ctx, query, tenantID, userID) } func (r *ClusterRepository) ListAll(ctx context.Context) ([]*domain.Cluster, error) { query := ` - SELECT - id, user_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled, - network_isolation, pod_cidr, service_cidr, api_server_lb_address, - kubeconfig_encrypted, ssh_private_key_encrypted, join_token, - token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days, + SELECT + id, user_id, tenant_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled, + network_isolation, pod_cidr, service_cidr, api_server_lb_address, + kubeconfig_encrypted, ssh_private_key_encrypted, join_token, + token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days, created_at, updated_at FROM clusters ORDER BY created_at DESC @@ -133,16 +135,17 @@ func (r *ClusterRepository) list(ctx context.Context, query string, args ...any) } func (r *ClusterRepository) Update(ctx context.Context, cluster *domain.Cluster) error { + tenantID := appcontext.TenantIDFromContext(ctx) query := ` UPDATE clusters - SET - vpc_id = $1, name = $2, version = $3, status = $4, control_plane_ips = $5, - worker_count = $6, ha_enabled = $7, network_isolation = $8, pod_cidr = $9, - service_cidr = $10, api_server_lb_address = $11, kubeconfig_encrypted = $12, - ssh_private_key_encrypted = $13, join_token = $14, token_expires_at = $15, - ca_cert_hash = $16, job_id = $17, backup_schedule = $18, backup_retention_days = $19, + SET + vpc_id = $1, name = $2, version = $3, status = $4, control_plane_ips = $5, + worker_count = $6, ha_enabled = $7, network_isolation = $8, pod_cidr = $9, + service_cidr = $10, api_server_lb_address = $11, kubeconfig_encrypted = $12, + ssh_private_key_encrypted = $13, join_token = $14, token_expires_at = $15, + ca_cert_hash = $16, job_id = $17, backup_schedule = $18, backup_retention_days = $19, updated_at = $20 - WHERE id = $21 AND user_id = $22 + WHERE id = $21 AND tenant_id = $22 AND user_id = $23 ` _, err := r.db.Exec(ctx, query, cluster.VpcID, cluster.Name, cluster.Version, string(cluster.Status), @@ -151,7 +154,7 @@ func (r *ClusterRepository) Update(ctx context.Context, cluster *domain.Cluster) cluster.APIServerLBAddress, cluster.KubeconfigEncrypted, cluster.SSHPrivateKeyEncrypted, cluster.JoinToken, cluster.TokenExpiresAt, cluster.CACertHash, cluster.JobID, cluster.BackupSchedule, cluster.BackupRetentionDays, - time.Now(), cluster.ID, cluster.UserID, + time.Now(), cluster.ID, tenantID, cluster.UserID, ) if err != nil { return errors.Wrap(errors.Internal, "failed to update cluster", err) @@ -161,8 +164,9 @@ func (r *ClusterRepository) Update(ctx context.Context, cluster *domain.Cluster) func (r *ClusterRepository) Delete(ctx context.Context, id uuid.UUID) error { userID := appcontext.UserIDFromContext(ctx) - query := `DELETE FROM clusters WHERE id = $1 AND user_id = $2` - _, err := r.db.Exec(ctx, query, id, userID) + tenantID := appcontext.TenantIDFromContext(ctx) + query := `DELETE FROM clusters WHERE id = $1 AND tenant_id = $2 AND user_id = $3` + _, err := r.db.Exec(ctx, query, id, tenantID, userID) if err != nil { return errors.Wrap(errors.Internal, "failed to delete cluster", err) } @@ -295,7 +299,7 @@ func (r *ClusterRepository) scanCluster(row pgx.Row) (*domain.Cluster, error) { var c domain.Cluster var status string err := row.Scan( - &c.ID, &c.UserID, &c.VpcID, &c.Name, &c.Version, &status, &c.ControlPlaneIPs, &c.WorkerCount, + &c.ID, &c.UserID, &c.TenantID, &c.VpcID, &c.Name, &c.Version, &status, &c.ControlPlaneIPs, &c.WorkerCount, &c.HAEnabled, &c.NetworkIsolation, &c.PodCIDR, &c.ServiceCIDR, &c.APIServerLBAddress, &c.KubeconfigEncrypted, &c.SSHPrivateKeyEncrypted, &c.JoinToken, &c.TokenExpiresAt, &c.CACertHash, &c.JobID, &c.BackupSchedule, &c.BackupRetentionDays, diff --git a/internal/repositories/postgres/migrations/100_add_clusters_tenant_id.down.sql b/internal/repositories/postgres/migrations/100_add_clusters_tenant_id.down.sql new file mode 100644 index 000000000..a82265833 --- /dev/null +++ b/internal/repositories/postgres/migrations/100_add_clusters_tenant_id.down.sql @@ -0,0 +1,7 @@ +-- Remove tenant_id column from clusters table +ALTER TABLE clusters DROP COLUMN tenant_id; + +-- Restore original index +DROP INDEX IF EXISTS idx_clusters_tenant_id; +DROP INDEX IF EXISTS idx_clusters_tenant_user; +CREATE INDEX idx_clusters_user_id ON clusters(user_id); diff --git a/internal/repositories/postgres/migrations/100_add_clusters_tenant_id.up.sql b/internal/repositories/postgres/migrations/100_add_clusters_tenant_id.up.sql new file mode 100644 index 000000000..96bf88147 --- /dev/null +++ b/internal/repositories/postgres/migrations/100_add_clusters_tenant_id.up.sql @@ -0,0 +1,21 @@ +-- Add tenant_id column to clusters table for proper tenant isolation +-- Step 1: Add tenant_id as nullable first +ALTER TABLE clusters ADD COLUMN tenant_id UUID; + +-- Step 2: Backfill tenant_id from users table for existing clusters +UPDATE clusters c +SET tenant_id = t.id +FROM users u +JOIN tenants t ON u.tenant_id = t.id +WHERE c.user_id = u.id AND c.tenant_id IS NULL; + +-- Step 3: Set default for any remaining (standalone/system clusters without user) +UPDATE clusters SET tenant_id = '00000000-0000-0000-0000-000000000000' WHERE tenant_id IS NULL; + +-- Step 4: Add NOT NULL constraint +ALTER TABLE clusters ALTER COLUMN tenant_id SET NOT NULL; + +-- Step 5: Create proper indexes +DROP INDEX IF EXISTS idx_clusters_user_id; +CREATE INDEX idx_clusters_tenant_id ON clusters(tenant_id); +CREATE INDEX idx_clusters_tenant_user ON clusters(tenant_id, user_id); diff --git a/internal/workers/cluster_worker.go b/internal/workers/cluster_worker.go index b6deab35c..02df69bd9 100644 --- a/internal/workers/cluster_worker.go +++ b/internal/workers/cluster_worker.go @@ -137,7 +137,7 @@ func (w *ClusterWorker) processJob(workerCtx context.Context, msg *ports.Durable } } - ctx := appcontext.WithUserID(workerCtx, job.UserID) + ctx := appcontext.WithUserID(appcontext.WithTenantID(workerCtx, job.TenantID), job.UserID) cluster, err := w.repo.GetByID(ctx, job.ClusterID) if err != nil {