Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,9 @@ POWERDNS_SERVER_ID=localhost
# Storage Configuration
# STORAGE_SECRET is REQUIRED for presigned URL signing - must be set to a secure value
STORAGE_SECRET=your-secure-storage-secret

# Kubernetes Cluster Configuration
# CLOUD_API_URL is the public API URL used by k8s cluster components (CCM, autoscaler)
# For local development, use http://host.docker.internal:8080
# For production, use https://api.yourdomain.com
CLOUD_API_URL=http://host.docker.internal:8080
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ COPY --from=builder /bin/api /app/api
COPY --from=builder /app/internal/repositories/postgres/migrations /app/migrations
# Copy docs for swagger if needed
COPY --from=builder /app/docs /app/docs
# Copy K8s templates for cluster provisioning
COPY --from=builder /app/internal/repositories/k8s/templates /app/internal/repositories/k8s/templates

# Create data directory
RUN mkdir -p /app/thecloud-data
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ services:
- POWERDNS_API_KEY=${POWERDNS_API_KEY}
- STORAGE_SECRET=${STORAGE_SECRET}
- DOCKER_DEFAULT_NETWORK=${DOCKER_DEFAULT_NETWORK:-cloud-network}
- CLOUD_API_URL=${CLOUD_API_URL:-http://host.docker.internal:8080}
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /var/run/openvswitch:/var/run/openvswitch
Expand Down
76 changes: 40 additions & 36 deletions internal/repositories/postgres/cluster_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ func NewClusterRepository(db DB) *ClusterRepository {
func (r *ClusterRepository) Create(ctx context.Context, cluster *domain.Cluster) error {
query := `
INSERT INTO clusters (
id, user_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled,
network_isolation, pod_cidr, service_cidr, api_server_lb_address,
kubeconfig_encrypted, ssh_private_key_encrypted, join_token,
token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days,
id, user_id, tenant_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled,
network_isolation, pod_cidr, service_cidr, api_server_lb_address,
kubeconfig_encrypted, ssh_private_key_encrypted, join_token,
token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days,
created_at, updated_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)
`
_, err := r.db.Exec(ctx, query,
cluster.ID, cluster.UserID, cluster.VpcID, cluster.Name, cluster.Version,
cluster.ID, cluster.UserID, cluster.TenantID, cluster.VpcID, cluster.Name, cluster.Version,
string(cluster.Status), cluster.ControlPlaneIPs, cluster.WorkerCount, cluster.HAEnabled,
cluster.NetworkIsolation, cluster.PodCIDR, cluster.ServiceCIDR,
cluster.APIServerLBAddress, cluster.KubeconfigEncrypted,
Expand All @@ -51,17 +51,18 @@ func (r *ClusterRepository) Create(ctx context.Context, cluster *domain.Cluster)

func (r *ClusterRepository) GetByID(ctx context.Context, id uuid.UUID) (*domain.Cluster, error) {
userID := appcontext.UserIDFromContext(ctx)
tenantID := appcontext.TenantIDFromContext(ctx)
query := `
SELECT
id, user_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled,
network_isolation, pod_cidr, service_cidr, api_server_lb_address,
kubeconfig_encrypted, ssh_private_key_encrypted, join_token,
token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days,
SELECT
id, user_id, tenant_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled,
network_isolation, pod_cidr, service_cidr, api_server_lb_address,
kubeconfig_encrypted, ssh_private_key_encrypted, join_token,
token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days,
created_at, updated_at
FROM clusters
WHERE id = $1 AND user_id = $2
WHERE id = $1 AND tenant_id = $2 AND user_id = $3
`
cluster, err := r.scanCluster(r.db.QueryRow(ctx, query, id, userID))
cluster, err := r.scanCluster(r.db.QueryRow(ctx, query, id, tenantID, userID))
if err != nil || cluster == nil {
return cluster, err
}
Expand All @@ -77,27 +78,28 @@ func (r *ClusterRepository) GetByID(ctx context.Context, id uuid.UUID) (*domain.
}

func (r *ClusterRepository) ListByUserID(ctx context.Context, userID uuid.UUID) ([]*domain.Cluster, error) {
tenantID := appcontext.TenantIDFromContext(ctx)
query := `
SELECT
id, user_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled,
network_isolation, pod_cidr, service_cidr, api_server_lb_address,
kubeconfig_encrypted, ssh_private_key_encrypted, join_token,
token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days,
SELECT
id, user_id, tenant_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled,
network_isolation, pod_cidr, service_cidr, api_server_lb_address,
kubeconfig_encrypted, ssh_private_key_encrypted, join_token,
token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days,
created_at, updated_at
FROM clusters
WHERE user_id = $1
WHERE tenant_id = $1 AND user_id = $2
ORDER BY created_at DESC
`
return r.list(ctx, query, userID)
return r.list(ctx, query, tenantID, userID)
}

func (r *ClusterRepository) ListAll(ctx context.Context) ([]*domain.Cluster, error) {
query := `
SELECT
id, user_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled,
network_isolation, pod_cidr, service_cidr, api_server_lb_address,
kubeconfig_encrypted, ssh_private_key_encrypted, join_token,
token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days,
SELECT
id, user_id, tenant_id, vpc_id, name, version, status, control_plane_ips, worker_count, ha_enabled,
network_isolation, pod_cidr, service_cidr, api_server_lb_address,
kubeconfig_encrypted, ssh_private_key_encrypted, join_token,
token_expires_at, ca_cert_hash, job_id, backup_schedule, backup_retention_days,
created_at, updated_at
FROM clusters
ORDER BY created_at DESC
Expand Down Expand Up @@ -133,16 +135,17 @@ func (r *ClusterRepository) list(ctx context.Context, query string, args ...any)
}

func (r *ClusterRepository) Update(ctx context.Context, cluster *domain.Cluster) error {
tenantID := appcontext.TenantIDFromContext(ctx)
query := `
UPDATE clusters
SET
vpc_id = $1, name = $2, version = $3, status = $4, control_plane_ips = $5,
worker_count = $6, ha_enabled = $7, network_isolation = $8, pod_cidr = $9,
service_cidr = $10, api_server_lb_address = $11, kubeconfig_encrypted = $12,
ssh_private_key_encrypted = $13, join_token = $14, token_expires_at = $15,
ca_cert_hash = $16, job_id = $17, backup_schedule = $18, backup_retention_days = $19,
SET
vpc_id = $1, name = $2, version = $3, status = $4, control_plane_ips = $5,
worker_count = $6, ha_enabled = $7, network_isolation = $8, pod_cidr = $9,
service_cidr = $10, api_server_lb_address = $11, kubeconfig_encrypted = $12,
ssh_private_key_encrypted = $13, join_token = $14, token_expires_at = $15,
ca_cert_hash = $16, job_id = $17, backup_schedule = $18, backup_retention_days = $19,
updated_at = $20
WHERE id = $21 AND user_id = $22
WHERE id = $21 AND tenant_id = $22 AND user_id = $23
`
_, err := r.db.Exec(ctx, query,
cluster.VpcID, cluster.Name, cluster.Version, string(cluster.Status),
Expand All @@ -151,7 +154,7 @@ func (r *ClusterRepository) Update(ctx context.Context, cluster *domain.Cluster)
cluster.APIServerLBAddress, cluster.KubeconfigEncrypted,
cluster.SSHPrivateKeyEncrypted, cluster.JoinToken, cluster.TokenExpiresAt,
cluster.CACertHash, cluster.JobID, cluster.BackupSchedule, cluster.BackupRetentionDays,
time.Now(), cluster.ID, cluster.UserID,
time.Now(), cluster.ID, tenantID, cluster.UserID,
)
if err != nil {
return errors.Wrap(errors.Internal, "failed to update cluster", err)
Expand All @@ -161,8 +164,9 @@ func (r *ClusterRepository) Update(ctx context.Context, cluster *domain.Cluster)

func (r *ClusterRepository) Delete(ctx context.Context, id uuid.UUID) error {
userID := appcontext.UserIDFromContext(ctx)
query := `DELETE FROM clusters WHERE id = $1 AND user_id = $2`
_, err := r.db.Exec(ctx, query, id, userID)
tenantID := appcontext.TenantIDFromContext(ctx)
query := `DELETE FROM clusters WHERE id = $1 AND tenant_id = $2 AND user_id = $3`
_, err := r.db.Exec(ctx, query, id, tenantID, userID)
if err != nil {
return errors.Wrap(errors.Internal, "failed to delete cluster", err)
}
Expand Down Expand Up @@ -295,7 +299,7 @@ func (r *ClusterRepository) scanCluster(row pgx.Row) (*domain.Cluster, error) {
var c domain.Cluster
var status string
err := row.Scan(
&c.ID, &c.UserID, &c.VpcID, &c.Name, &c.Version, &status, &c.ControlPlaneIPs, &c.WorkerCount,
&c.ID, &c.UserID, &c.TenantID, &c.VpcID, &c.Name, &c.Version, &status, &c.ControlPlaneIPs, &c.WorkerCount,
&c.HAEnabled, &c.NetworkIsolation, &c.PodCIDR, &c.ServiceCIDR,
&c.APIServerLBAddress, &c.KubeconfigEncrypted, &c.SSHPrivateKeyEncrypted,
&c.JoinToken, &c.TokenExpiresAt, &c.CACertHash, &c.JobID, &c.BackupSchedule, &c.BackupRetentionDays,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Remove tenant_id column from clusters table
ALTER TABLE clusters DROP COLUMN tenant_id;

-- Restore original index
DROP INDEX IF EXISTS idx_clusters_tenant_id;
DROP INDEX IF EXISTS idx_clusters_tenant_user;
CREATE INDEX idx_clusters_user_id ON clusters(user_id);
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Add tenant_id column to clusters table for proper tenant isolation
-- Step 1: Add tenant_id as nullable first
ALTER TABLE clusters ADD COLUMN tenant_id UUID;

-- Step 2: Backfill tenant_id from users table for existing clusters
UPDATE clusters c
SET tenant_id = t.id
FROM users u
JOIN tenants t ON u.tenant_id = t.id
WHERE c.user_id = u.id AND c.tenant_id IS NULL;

-- Step 3: Set default for any remaining (standalone/system clusters without user)
UPDATE clusters SET tenant_id = '00000000-0000-0000-0000-000000000000' WHERE tenant_id IS NULL;

-- Step 4: Add NOT NULL constraint
ALTER TABLE clusters ALTER COLUMN tenant_id SET NOT NULL;

-- Step 5: Create proper indexes
DROP INDEX IF EXISTS idx_clusters_user_id;
CREATE INDEX idx_clusters_tenant_id ON clusters(tenant_id);
CREATE INDEX idx_clusters_tenant_user ON clusters(tenant_id, user_id);
2 changes: 1 addition & 1 deletion internal/workers/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (w *ClusterWorker) processJob(workerCtx context.Context, msg *ports.Durable
}
}

ctx := appcontext.WithUserID(workerCtx, job.UserID)
ctx := appcontext.WithUserID(appcontext.WithTenantID(workerCtx, job.TenantID), job.UserID)

cluster, err := w.repo.GetByID(ctx, job.ClusterID)
if err != nil {
Expand Down
Loading