diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..901bcaa2 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,26 @@ +.git +.gitignore +.venv +__pycache__ +*.pyc +*.pyo +.mypy_cache +.ruff_cache +.pytest_cache +*.egg-info +dist +build +.env +.env.* +operator/ +docs/ +setup/ +test_agent.py +test_agent_problem_sets.json +*.md +hack/ +.github/ +.idea/ +Makefile +deploy/ +evaluator/datasets/swebench_verified/repos/ diff --git a/.github/workflows/build-images.yml b/.github/workflows/build-images.yml new file mode 100644 index 00000000..aac2718e --- /dev/null +++ b/.github/workflows/build-images.yml @@ -0,0 +1,67 @@ +name: Build Images + +on: + push: + branches: + - main + tags: + - "v*" + pull_request: + branches: + - main + +permissions: + contents: read + packages: write + +jobs: + build: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + - image: screener + dockerfile: validator/Dockerfile + context: . + - image: operator + dockerfile: operator/Dockerfile + context: operator + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to GHCR + if: github.event_name != 'pull_request' + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract Docker metadata + id: meta + uses: docker/metadata-action@v5 + with: + images: ghcr.io/${{ github.repository }}/${{ matrix.image }} + tags: | + type=ref,event=branch + type=ref,event=pr + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + type=sha + + - name: Build and push + uses: docker/build-push-action@v6 + with: + context: ${{ matrix.context }} + file: ${{ matrix.dockerfile }} + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max diff --git 
a/api/endpoints/screener.py b/api/endpoints/screener.py new file mode 100644 index 00000000..3a379b7b --- /dev/null +++ b/api/endpoints/screener.py @@ -0,0 +1,23 @@ +from fastapi import APIRouter, Query, HTTPException +from pydantic import BaseModel + +from models.evaluation_set import EvaluationSetGroup +from queries.agent import get_queue_depth, get_active_evaluation_count + +router = APIRouter() + + +class QueueDepthResponse(BaseModel): + depth: int + stage: str + active: int + + +@router.get("/queue-depth", response_model=QueueDepthResponse) +async def queue_depth( + stage: str = Query(..., pattern="^(screener_1|screener_2|validator)$"), +): + group = EvaluationSetGroup(stage) + depth = await get_queue_depth(group) + active = await get_active_evaluation_count(group) + return QueueDepthResponse(depth=depth, stage=stage, active=active) diff --git a/api/src/main.py b/api/src/main.py index 4e90f925..7e6dfee1 100644 --- a/api/src/main.py +++ b/api/src/main.py @@ -25,6 +25,7 @@ from api.endpoints.scoring import router as scoring_router from api.endpoints.statistics import router as statistics_router from api.endpoints.retrieval import router as retrieval_router +from api.endpoints.screener import router as screener_router @@ -114,6 +115,7 @@ async def lifespan(app: FastAPI): app.include_router(evaluation_run_router, prefix="/evaluation-run") app.include_router(evaluations_router, prefix="/evaluation") app.include_router(statistics_router, prefix="/statistics") +app.include_router(screener_router, prefix="/screener") if __name__ == "__main__": diff --git a/evaluator/sandbox/Dockerfile b/evaluator/sandbox/Dockerfile index d217c88a..b6dfe7a0 100644 --- a/evaluator/sandbox/Dockerfile +++ b/evaluator/sandbox/Dockerfile @@ -32,7 +32,9 @@ RUN curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.3/install.sh | b ln -sf "$NVM_DIR/versions/node/$NODE_VERSION/bin/npm" /usr/local/bin/npm && \ ln -sf "$NVM_DIR/versions/node/$NODE_VERSION/bin/npx" /usr/local/bin/npx && \ ln 
-sf "$NVM_DIR/versions/node/$NODE_VERSION/bin/node" /usr/local/bin/node && \ - ln -sf "$NVM_DIR/versions/node/$NODE_VERSION/lib/node_modules" /usr/local/lib/node_modules + ln -sf "$NVM_DIR/versions/node/$NODE_VERSION/lib/node_modules" /usr/local/lib/node_modules && \ + chmod -R a+rX /root/.nvm && \ + chmod a+rx /root ENV NODE_PATH=/usr/local/lib/node_modules diff --git a/evaluator/sandbox/proxy/Dockerfile b/evaluator/sandbox/proxy/Dockerfile index 20b23685..5b77f42b 100644 --- a/evaluator/sandbox/proxy/Dockerfile +++ b/evaluator/sandbox/proxy/Dockerfile @@ -1,7 +1,12 @@ FROM nginx:alpine +USER root + RUN apk add --no-cache gettext +RUN mkdir -p /sandbox-proxy && chmod 0777 /sandbox-proxy COPY nginx.conf.template /tmp/nginx.conf.template -CMD ["/bin/sh", "-c", "envsubst '${GATEWAY_URL} ${GATEWAY_HOST}' < /tmp/nginx.conf.template > /etc/nginx/nginx.conf && exec nginx -g 'daemon off;'"] \ No newline at end of file +USER nginx + +CMD ["/bin/sh", "-c", "envsubst '${GATEWAY_URL} ${GATEWAY_HOST}' < /tmp/nginx.conf.template > /sandbox-proxy/nginx.conf && exec nginx -c /sandbox-proxy/nginx.conf -g 'daemon off;'"] \ No newline at end of file diff --git a/evaluator/sandbox/proxy/nginx.conf.template b/evaluator/sandbox/proxy/nginx.conf.template index 1cb85f47..fd4b0f60 100644 --- a/evaluator/sandbox/proxy/nginx.conf.template +++ b/evaluator/sandbox/proxy/nginx.conf.template @@ -1,26 +1,52 @@ +pid /sandbox-proxy/nginx.pid; + events {} http { # Use a public DNS resolver for upstream name resolution resolver 1.1.1.1 1.0.0.1 ipv6=off; + client_body_temp_path /sandbox-proxy/client_temp; + proxy_temp_path /sandbox-proxy/proxy_temp; + fastcgi_temp_path /sandbox-proxy/fastcgi_temp; + uwsgi_temp_path /sandbox-proxy/uwsgi_temp; + scgi_temp_path /sandbox-proxy/scgi_temp; + + limit_req_zone $binary_remote_addr zone=agent:10m rate=30r/s; + server { - listen 80; + listen 8080; + + proxy_set_header Host $GATEWAY_HOST; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header 
X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + proxy_connect_timeout 300s; + proxy_send_timeout 300s; + proxy_read_timeout 300s; + + # Ensure TLS SNI to Cloudflare/origin matches the hostname + #proxy_ssl_server_name on; + #proxy_ssl_name $GATEWAY_HOST; + + location /api/inference { + limit_req zone=agent burst=10 nodelay; + proxy_pass $GATEWAY_URL/api/inference; + } + + location /api/embedding { + limit_req zone=agent burst=10 nodelay; + proxy_pass $GATEWAY_URL/api/embedding; + } + + location /api/usage { + limit_req zone=agent burst=10 nodelay; + proxy_pass $GATEWAY_URL/api/usage; + } location / { - proxy_pass $GATEWAY_URL; - proxy_set_header Host $GATEWAY_HOST; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - - proxy_connect_timeout 300s; - proxy_send_timeout 300s; - proxy_read_timeout 300s; - - # Ensure TLS SNI to Cloudflare/origin matches the hostname - proxy_ssl_server_name on; - proxy_ssl_name $GATEWAY_HOST; + return 403 "Forbidden"; } } -} \ No newline at end of file +} diff --git a/evaluator/sandbox/sandbox_manager.py b/evaluator/sandbox/sandbox_manager.py index cbc5ee63..4853858e 100644 --- a/evaluator/sandbox/sandbox_manager.py +++ b/evaluator/sandbox/sandbox_manager.py @@ -14,7 +14,7 @@ SANDBOX_NETWORK_NAME = f"{DOCKER_PREFIX}-sandbox-network" SANDBOX_PROXY_HOST = f"{DOCKER_PREFIX}-sandbox-proxy" -SANDBOX_PROXY_PORT = 80 +SANDBOX_PROXY_PORT = 8080 @@ -135,7 +135,11 @@ def initialize_sandbox( elif script_extension == ".js": command = f"node /sandbox/{script_name} 2>&1" - # Create Docker container + for root, dirs, files in os.walk(temp_dir): + os.chmod(root, 0o777) + for fname in files: + os.chmod(os.path.join(root, fname), 0o666) + container = get_docker_client().containers.run( name=name, image=f"{DOCKER_PREFIX}-sandbox-image", @@ -143,12 +147,22 @@ def initialize_sandbox( network=SANDBOX_NETWORK_NAME, 
environment={ "PYTHONUNBUFFERED": "1", - "PYTHONDONTWRITEBYTECODE": "1", # No __pycache__ + "PYTHONDONTWRITEBYTECODE": "1", + "HOME": "/tmp", "SANDBOX_PROXY_URL": f"http://{SANDBOX_PROXY_HOST}:{SANDBOX_PROXY_PORT}", + "GIT_CONFIG_COUNT": "1", + "GIT_CONFIG_KEY_0": "safe.directory", + "GIT_CONFIG_VALUE_0": "/sandbox/repo", **env_vars }, command=command, - detach=True + detach=True, + user="65534", + read_only=True, + tmpfs={"/tmp": "size=64m,mode=1777"}, + pids_limit=256, + security_opt=["no-new-privileges"], + cap_drop=["ALL"], ) return Sandbox( diff --git a/operator/.dockerignore b/operator/.dockerignore new file mode 100644 index 00000000..fac5ce78 --- /dev/null +++ b/operator/.dockerignore @@ -0,0 +1,2 @@ +.git +*.md diff --git a/operator/Dockerfile b/operator/Dockerfile new file mode 100644 index 00000000..c75170b2 --- /dev/null +++ b/operator/Dockerfile @@ -0,0 +1,12 @@ +FROM golang:1.25 AS modules +WORKDIR /app +COPY go.mod go.sum* ./ +RUN go mod download + +FROM modules AS builder +COPY . . 
+RUN CGO_ENABLED=0 go build -o manager ./cmd/main.go + +FROM gcr.io/distroless/static:nonroot +COPY --from=builder /app/manager /manager +ENTRYPOINT ["/manager"] diff --git a/operator/api/v1alpha1/evaluatorpool_types.go b/operator/api/v1alpha1/evaluatorpool_types.go new file mode 100644 index 00000000..4ad0cbb1 --- /dev/null +++ b/operator/api/v1alpha1/evaluatorpool_types.go @@ -0,0 +1,94 @@ +package v1alpha1 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + ConditionReady = "Ready" + ConditionAPIReachable = "APIReachable" + ConditionSecretReady = "SecretReady" + ConditionNodesAvailable = "NodesAvailable" + ConditionScalingActive = "ScalingActive" + ConditionDegraded = "Degraded" +) + +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:subresource:scale:specpath=.spec.scaling.minReplicas,statuspath=.status.currentReplicas +// +kubebuilder:resource:shortName=ep,categories={ridges} +// +kubebuilder:printcolumn:name="Stage",type=string,JSONPath=`.spec.stage` +// +kubebuilder:printcolumn:name="Desired",type=integer,JSONPath=`.status.desiredReplicas` +// +kubebuilder:printcolumn:name="Current",type=integer,JSONPath=`.status.currentReplicas` +// +kubebuilder:printcolumn:name="Queue",type=integer,JSONPath=`.status.lastQueueDepth` +// +kubebuilder:printcolumn:name="Ready",type=string,JSONPath=`.status.conditions[?(@.type=="Ready")].status` +// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` + +type EvaluatorPool struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec EvaluatorPoolSpec `json:"spec,omitempty"` + Status EvaluatorPoolStatus `json:"status,omitempty"` +} + +type EvaluatorPoolSpec struct { + // +kubebuilder:validation:Enum=screener_1;screener_2 + Stage string `json:"stage"` + + Scaling EvaluatorPoolScaling `json:"scaling"` + + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:default=30 + 
PollingIntervalSeconds int32 `json:"pollingIntervalSeconds,omitempty"` + + // +optional + Resources *EvaluatorPoolResources `json:"resources,omitempty"` +} + +type EvaluatorPoolResources struct { + // +optional + Screener *corev1.ResourceRequirements `json:"screener,omitempty"` + // +optional + Dind *corev1.ResourceRequirements `json:"dind,omitempty"` +} + +type EvaluatorPoolScaling struct { + // +kubebuilder:validation:Minimum=0 + // +kubebuilder:default=0 + MinReplicas int32 `json:"minReplicas,omitempty"` + + // +kubebuilder:validation:Minimum=1 + MaxReplicas int32 `json:"maxReplicas"` + + // +kubebuilder:validation:Minimum=0 + // +kubebuilder:default=600 + ScaleDownStabilizationSeconds int32 `json:"scaleDownStabilizationSeconds,omitempty"` +} + +type EvaluatorPoolStatus struct { + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + CurrentReplicas int32 `json:"currentReplicas"` + DesiredReplicas int32 `json:"desiredReplicas"` + LastQueueDepth int32 `json:"lastQueueDepth"` + LastScaleUpTime *metav1.Time `json:"lastScaleUpTime,omitempty"` + LastScaleDownTime *metav1.Time `json:"lastScaleDownTime,omitempty"` + LastPollTime *metav1.Time `json:"lastPollTime,omitempty"` + + // +listType=map + // +listMapKey=type + Conditions []metav1.Condition `json:"conditions,omitempty"` +} + +// +kubebuilder:object:root=true + +type EvaluatorPoolList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []EvaluatorPool `json:"items"` +} + +func init() { + SchemeBuilder.Register(&EvaluatorPool{}, &EvaluatorPoolList{}) +} diff --git a/operator/api/v1alpha1/groupversion_info.go b/operator/api/v1alpha1/groupversion_info.go new file mode 100644 index 00000000..46e7ff3d --- /dev/null +++ b/operator/api/v1alpha1/groupversion_info.go @@ -0,0 +1,14 @@ +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/scheme" +) + +var ( + GroupVersion = schema.GroupVersion{Group: 
"ridges.ai", Version: "v1alpha1"} + + SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} + + AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/operator/api/v1alpha1/zz_generated.deepcopy.go b/operator/api/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 00000000..93f20386 --- /dev/null +++ b/operator/api/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,152 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// Code generated by controller-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +func (in *EvaluatorPool) DeepCopyInto(out *EvaluatorPool) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) +} + +func (in *EvaluatorPool) DeepCopy() *EvaluatorPool { + if in == nil { + return nil + } + out := new(EvaluatorPool) + in.DeepCopyInto(out) + return out +} + +func (in *EvaluatorPool) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +func (in *EvaluatorPoolList) DeepCopyInto(out *EvaluatorPoolList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]EvaluatorPool, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +func (in *EvaluatorPoolList) DeepCopy() *EvaluatorPoolList { + if in == nil { + return nil + } + out := new(EvaluatorPoolList) + in.DeepCopyInto(out) + return out +} + +func (in *EvaluatorPoolList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +func (in *EvaluatorPoolSpec) DeepCopyInto(out *EvaluatorPoolSpec) { + *out = *in + out.Scaling = in.Scaling + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = 
new(EvaluatorPoolResources) + (*in).DeepCopyInto(*out) + } +} + +func (in *EvaluatorPoolSpec) DeepCopy() *EvaluatorPoolSpec { + if in == nil { + return nil + } + out := new(EvaluatorPoolSpec) + in.DeepCopyInto(out) + return out +} + +func (in *EvaluatorPoolResources) DeepCopyInto(out *EvaluatorPoolResources) { + *out = *in + if in.Screener != nil { + in, out := &in.Screener, &out.Screener + *out = new(corev1.ResourceRequirements) + (*in).DeepCopyInto(*out) + } + if in.Dind != nil { + in, out := &in.Dind, &out.Dind + *out = new(corev1.ResourceRequirements) + (*in).DeepCopyInto(*out) + } +} + +func (in *EvaluatorPoolResources) DeepCopy() *EvaluatorPoolResources { + if in == nil { + return nil + } + out := new(EvaluatorPoolResources) + in.DeepCopyInto(out) + return out +} + +func (in *EvaluatorPoolScaling) DeepCopyInto(out *EvaluatorPoolScaling) { + *out = *in +} + +func (in *EvaluatorPoolScaling) DeepCopy() *EvaluatorPoolScaling { + if in == nil { + return nil + } + out := new(EvaluatorPoolScaling) + in.DeepCopyInto(out) + return out +} + +func (in *EvaluatorPoolStatus) DeepCopyInto(out *EvaluatorPoolStatus) { + *out = *in + if in.LastScaleUpTime != nil { + in, out := &in.LastScaleUpTime, &out.LastScaleUpTime + *out = (*in).DeepCopy() + } + if in.LastScaleDownTime != nil { + in, out := &in.LastScaleDownTime, &out.LastScaleDownTime + *out = (*in).DeepCopy() + } + if in.LastPollTime != nil { + in, out := &in.LastPollTime, &out.LastPollTime + *out = (*in).DeepCopy() + } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]metav1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +func (in *EvaluatorPoolStatus) DeepCopy() *EvaluatorPoolStatus { + if in == nil { + return nil + } + out := new(EvaluatorPoolStatus) + in.DeepCopyInto(out) + return out +} diff --git a/operator/cmd/main.go b/operator/cmd/main.go new file mode 100644 index 00000000..c6f2282a --- /dev/null +++ b/operator/cmd/main.go @@ 
-0,0 +1,126 @@ +package main + +import ( + "flag" + "os" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + ridgesv1alpha1 "github.com/ridgesai/ridges/operator/api/v1alpha1" + "github.com/ridgesai/ridges/operator/internal/controller" +) + +var scheme = runtime.NewScheme() + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(ridgesv1alpha1.AddToScheme(scheme)) +} + +func getNamespace() string { + if ns := os.Getenv("POD_NAMESPACE"); ns != "" { + return ns + } + if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { + return string(data) + } + return "default" +} + +func main() { + var metricsAddr string + var probeAddr string + var enableLeaderElection bool + + flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") + flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") + flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager.") + + opts := zap.Options{Development: true} + opts.BindFlags(flag.CommandLine) + flag.Parse() + + ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) + setupLog := ctrl.Log.WithName("setup") + + namespace := getNamespace() + setupLog.Info("operator namespace resolved", "namespace", namespace) + + required := []string{ + "RIDGES_API_URL", "PLATFORM_URL", "INFERENCE_GATEWAY_URL", + "SCREENER_IMAGE", "SCREENER_SECRET_NAME", + } + for _, key := range required { + if os.Getenv(key) == "" { + 
setupLog.Error(nil, "required env var is missing", "var", key) + os.Exit(1) + } + } + + cfg := controller.OperatorConfig{ + RidgesAPIURL: os.Getenv("RIDGES_API_URL"), + PlatformURL: os.Getenv("PLATFORM_URL"), + InferenceGatewayURL: os.Getenv("INFERENCE_GATEWAY_URL"), + ScreenerImage: os.Getenv("SCREENER_IMAGE"), + ScreenerSecretName: os.Getenv("SCREENER_SECRET_NAME"), + CommitHash: os.Getenv("COMMIT_HASH"), + ImagePullSecret: os.Getenv("IMAGE_PULL_SECRET"), + } + + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + Scheme: scheme, + Metrics: metricsserver.Options{ + BindAddress: metricsAddr, + }, + HealthProbeBindAddress: probeAddr, + LeaderElection: enableLeaderElection, + LeaderElectionID: "ridges-operator.ridges.ai", + LeaderElectionNamespace: namespace, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{ + namespace: {}, + }, + ByObject: map[client.Object]cache.ByObject{ + &corev1.Node{}: {}, + }, + }, + }) + if err != nil { + setupLog.Error(err, "unable to start manager") + os.Exit(1) + } + + if err := (&controller.EvaluatorPoolReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Config: cfg, + Recorder: mgr.GetEventRecorderFor("ridges-operator"), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "EvaluatorPool") + os.Exit(1) + } + + if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { + setupLog.Error(err, "unable to set up health check") + os.Exit(1) + } + if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { + setupLog.Error(err, "unable to set up ready check") + os.Exit(1) + } + + setupLog.Info("starting manager") + if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + setupLog.Error(err, "problem running manager") + os.Exit(1) + } +} diff --git a/operator/go.mod b/operator/go.mod new file mode 100644 index 00000000..0cea348d --- /dev/null +++ b/operator/go.mod @@ -0,0 +1,79 @@ +module 
github.com/ridgesai/ridges/operator + +go 1.25.6 + +require ( + github.com/google/go-containerregistry v0.21.2 + github.com/google/go-containerregistry/pkg/authn/kubernetes v0.0.0-20260302220502-9e0ccb0a7240 + github.com/prometheus/client_golang v1.20.5 + k8s.io/api v0.35.1 + k8s.io/apimachinery v0.35.1 + k8s.io/client-go v0.35.1 + sigs.k8s.io/controller-runtime v0.20.2 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/containerd/stargz-snapshotter/estargz v0.18.2 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/docker/cli v29.2.1+incompatible // indirect + github.com/docker/distribution v2.8.3+incompatible // indirect + github.com/docker/docker-credential-helpers v0.9.3 // indirect + github.com/emicklei/go-restful/v3 v3.12.2 // indirect + github.com/evanphx/json-patch/v5 v5.9.11 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/fxamacker/cbor/v2 v2.9.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/zapr v1.3.0 // indirect + github.com/go-openapi/jsonpointer v0.21.1 // indirect + github.com/go-openapi/jsonreference v0.21.0 // indirect + github.com/go-openapi/swag v0.23.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/btree v1.1.3 // indirect + github.com/google/gnostic-models v0.7.0 // indirect + github.com/google/go-cmp v0.7.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.18.4 // indirect + github.com/mailru/easyjson v0.9.0 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 
// indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/spf13/pflag v1.0.9 // indirect + github.com/vbatts/tar-split v0.12.2 // indirect + github.com/x448/float16 v0.8.4 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/net v0.47.0 // indirect + golang.org/x/oauth2 v0.35.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/term v0.37.0 // indirect + golang.org/x/text v0.31.0 // indirect + golang.org/x/time v0.11.0 // indirect + gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect + gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/apiextensions-apiserver v0.32.1 // indirect + k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect + k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect + sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect + sigs.k8s.io/randfill v1.0.0 // indirect + sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect + sigs.k8s.io/yaml v1.6.0 // indirect +) diff --git a/operator/go.sum b/operator/go.sum new file mode 100644 index 00000000..32226c9e --- /dev/null +++ b/operator/go.sum @@ -0,0 +1,219 @@ +github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= +github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= 
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/containerd/stargz-snapshotter/estargz v0.18.2 h1:yXkZFYIzz3eoLwlTUZKz2iQ4MrckBxJjkmD16ynUTrw= +github.com/containerd/stargz-snapshotter/estargz v0.18.2/go.mod h1:XyVU5tcJ3PRpkA9XS2T5us6Eg35yM0214Y+wvrZTBrY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/cli v29.2.1+incompatible h1:n3Jt0QVCN65eiVBoUTZQM9mcQICCJt3akW4pKAbKdJg= +github.com/docker/cli v29.2.1+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= +github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk= +github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= +github.com/docker/docker-credential-helpers v0.9.3 h1:gAm/VtF9wgqJMoxzT3Gj5p4AqIjCBS4wrsOh9yRqcz8= +github.com/docker/docker-credential-helpers v0.9.3/go.mod h1:x+4Gbw9aGmChi3qTLZj8Dfn0TD20M/fuWy0E5+WDeCo= +github.com/emicklei/go-restful/v3 v3.12.2 h1:DhwDP0vY3k8ZzE0RunuJy8GhNpPL6zqLkDf9B/a0/xU= +github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k= +github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= +github.com/evanphx/json-patch/v5 v5.9.11 
h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU= +github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= +github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= +github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg= +github.com/go-openapi/jsonpointer v0.21.1 h1:whnzv/pNXtK2FbX/W9yJfRmE2gsmkfahjMKB0fZvcic= +github.com/go-openapi/jsonpointer v0.21.1/go.mod h1:50I1STOfbY1ycR8jGz8DaMeLCdXiI6aDteEdRNNzpdk= +github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= +github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4= +github.com/go-openapi/swag v0.23.1 h1:lpsStH0n2ittzTnbaSloVZLuB5+fvSY/+hnagBjSNZU= +github.com/go-openapi/swag v0.23.1/go.mod h1:STZs8TbRvEQQKUA+JZNAm3EWlgaOBGpyFDqQnDHMef0= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= +github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= +github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo= 
+github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/go-containerregistry v0.21.2 h1:vYaMU4nU55JJGFC9JR/s8NZcTjbE9DBBbvusTW9NeS0= +github.com/google/go-containerregistry v0.21.2/go.mod h1:ctO5aCaewH4AK1AumSF5DPW+0+R+d2FmylMJdp5G7p0= +github.com/google/go-containerregistry/pkg/authn/kubernetes v0.0.0-20260302220502-9e0ccb0a7240 h1:83ZjkRIoUu7aBtTXFRoUib0as76RziudB8pSEdR+t+M= +github.com/google/go-containerregistry/pkg/authn/kubernetes v0.0.0-20260302220502-9e0ccb0a7240/go.mod h1:tHI2pZM69kTLaqiCqf0UETRmNw5p5jpMGq0We2j1V2E= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J0b1vyeLSOYI8bm5wbJM/8yDe8= +github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.18.4 
h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4= +github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/onsi/ginkgo/v2 v2.27.2 
h1:LzwLj0b89qtIy6SSASkzlNvX6WktqurSHwkk2ipF/Ns= +github.com/onsi/ginkgo/v2 v2.27.2/go.mod h1:ArE1D/XhNXBXCBkKOLkbsb2c81dQHCRcF5zwn/ykDRo= +github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A= +github.com/onsi/gomega v1.38.2/go.mod h1:W2MJcYxRGV63b418Ai34Ud0hEdTVXq9NW9+Sx6uXf3k= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= +github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= +github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rogpeppe/go-internal v1.14.1 
h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= +github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/vbatts/tar-split v0.12.2 h1:w/Y6tjxpeiFMR47yzZPlPj/FcPLpXbTUi/9H7d3CPa4= +github.com/vbatts/tar-split v0.12.2/go.mod h1:eF6B6i6ftWQcDqEn3/iGFRFRo8cBIMSJVOpnNdfTMFA= +github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= +github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 
h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= +go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= +golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ= +golang.org/x/oauth2 v0.35.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync 
v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= +golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= +golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod 
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= +golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw= +gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/evanphx/json-patch.v4 v4.13.0 h1:czT3CmqEaQ1aanPc5SdlgQrrEIb8w/wwCvWWnfEbYzo= +gopkg.in/evanphx/json-patch.v4 v4.13.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.1.0 h1:rVV8Tcg/8jHUkPUorwjaMTtemIMVXfIPKiOqnhEhakk= 
+gotest.tools/v3 v3.1.0/go.mod h1:fHy7eyTmJFO5bQbUsEGQ1v4m2J3Jz9eWL54TP2/ZuYQ= +k8s.io/api v0.35.1 h1:0PO/1FhlK/EQNVK5+txc4FuhQibV25VLSdLMmGpDE/Q= +k8s.io/api v0.35.1/go.mod h1:28uR9xlXWml9eT0uaGo6y71xK86JBELShLy4wR1XtxM= +k8s.io/apiextensions-apiserver v0.32.1 h1:hjkALhRUeCariC8DiVmb5jj0VjIc1N0DREP32+6UXZw= +k8s.io/apiextensions-apiserver v0.32.1/go.mod h1:sxWIGuGiYov7Io1fAS2X06NjMIk5CbRHc2StSmbaQto= +k8s.io/apimachinery v0.35.1 h1:yxO6gV555P1YV0SANtnTjXYfiivaTPvCTKX6w6qdDsU= +k8s.io/apimachinery v0.35.1/go.mod h1:jQCgFZFR1F4Ik7hvr2g84RTJSZegBc8yHgFWKn//hns= +k8s.io/client-go v0.35.1 h1:+eSfZHwuo/I19PaSxqumjqZ9l5XiTEKbIaJ+j1wLcLM= +k8s.io/client-go v0.35.1/go.mod h1:1p1KxDt3a0ruRfc/pG4qT/3oHmUj1AhSHEcxNSGg+OA= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 h1:Y3gxNAuB0OBLImH611+UDZcmKS3g6CthxToOb37KgwE= +k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912/go.mod h1:kdmbQkyfwUagLfXIad1y2TdrjPFWp2Q89B3qkRwf/pQ= +k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 h1:SjGebBtkBqHFOli+05xYbK8YF1Dzkbzn+gDM4X9T4Ck= +k8s.io/utils v0.0.0-20251002143259-bc988d571ff4/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/controller-runtime v0.20.2 h1:/439OZVxoEc02psi1h4QO3bHzTgu49bb347Xp4gW1pc= +sigs.k8s.io/controller-runtime v0.20.2/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= +sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5EXP7sU1kvOlxwZh5txg= +sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= +sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= +sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= +sigs.k8s.io/structured-merge-diff/v6 v6.3.0 h1:jTijUJbW353oVOd9oTlifJqOGEkUw2jB/fXCbTiQEco= +sigs.k8s.io/structured-merge-diff/v6 v6.3.0/go.mod 
h1:M3W8sfWvn2HhQDIbGWj3S099YozAsymCo/wrT5ohRUE= +sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs= +sigs.k8s.io/yaml v1.6.0/go.mod h1:796bPqUfzR/0jLAl6XjHl3Ck7MiyVv8dbTdyT3/pMf4= diff --git a/operator/internal/controller/evaluatorpool_controller.go b/operator/internal/controller/evaluatorpool_controller.go new file mode 100644 index 00000000..e5cbe358 --- /dev/null +++ b/operator/internal/controller/evaluatorpool_controller.go @@ -0,0 +1,475 @@ +package controller + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "net/http" + "time" + + ridgesv1alpha1 "github.com/ridgesai/ridges/operator/api/v1alpha1" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + policyv1 "k8s.io/api/policy/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/metrics" + + "github.com/prometheus/client_golang/prometheus" +) + +// OperatorConfig holds operator-level env-var configuration. 
+type OperatorConfig struct { + RidgesAPIURL string + PlatformURL string + InferenceGatewayURL string + ScreenerImage string + ScreenerSecretName string + CommitHash string // optional: screener code commit passed to screener pods + ImagePullSecret string // optional: secret name for pulling images from a private registry +} + +// +kubebuilder:rbac:groups=ridges.ai,resources=evaluatorpools,verbs=get;list;watch +// +kubebuilder:rbac:groups=ridges.ai,resources=evaluatorpools/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=ridges.ai,resources=evaluatorpools/scale,verbs=get;update;patch +// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch +// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch +// +kubebuilder:rbac:groups="",resources=nodes,verbs=list +// +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=networking.k8s.io,resources=networkpolicies,verbs=get;list;watch;create;update;patch;delete + +type EvaluatorPoolReconciler struct { + client.Client + Scheme *runtime.Scheme + Config OperatorConfig + Recorder record.EventRecorder +} + +var ( + metricQueueDepth = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "evaluatorpool_queue_depth", + Help: "Current queue depth per EvaluatorPool", + }, []string{"evaluatorpool"}) + + metricDesiredReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "evaluatorpool_desired_replicas", + Help: "Desired replicas per EvaluatorPool", + }, []string{"evaluatorpool"}) + + metricPreflightStatus = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "evaluatorpool_preflight_status", + Help: "Preflight check status (1=ok, 0=fail) per EvaluatorPool and condition", + }, []string{"evaluatorpool", "condition"}) + + metricDriftCorrections = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: 
"evaluatorpool_drift_corrections_total", + Help: "Total drift corrections applied", + }, []string{"evaluatorpool"}) + + metricScalingErrors = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "evaluatorpool_scaling_errors_total", + Help: "Total scaling errors", + }, []string{"evaluatorpool"}) +) + +func init() { + metrics.Registry.MustRegister( + metricQueueDepth, + metricDesiredReplicas, + metricPreflightStatus, + metricDriftCorrections, + metricScalingErrors, + ) +} + +func (r *EvaluatorPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + var ep ridgesv1alpha1.EvaluatorPool + if err := r.Get(ctx, req.NamespacedName, &ep); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + allPreflightOK := r.runPreflightChecks(ctx, &ep) + + desired := buildStatefulSet(&ep, r.Config) + if err := controllerutil.SetControllerReference(&ep, desired, r.Scheme); err != nil { + return ctrl.Result{}, fmt.Errorf("setting owner ref on StatefulSet: %w", err) + } + + desiredHash := podSpecHash(desired.Spec.Template.Spec) + if desired.Spec.Template.Annotations == nil { + desired.Spec.Template.Annotations = map[string]string{} + } + desired.Spec.Template.Annotations["ridges.ai/pod-spec-hash"] = desiredHash + + var existing appsv1.StatefulSet + err := r.Get(ctx, types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, &existing) + if apierrors.IsNotFound(err) { + logger.Info("creating StatefulSet", "name", desired.Name) + if err := r.Create(ctx, desired); err != nil { + return ctrl.Result{}, fmt.Errorf("creating StatefulSet: %w", err) + } + r.Recorder.Eventf(&ep, corev1.EventTypeNormal, "Created", "Created StatefulSet %s", desired.Name) + } else if err != nil { + return ctrl.Result{}, fmt.Errorf("getting StatefulSet: %w", err) + } else { + existingHash := existing.Spec.Template.Annotations["ridges.ai/pod-spec-hash"] + replicasDrift := 
existing.Spec.Replicas == nil || *existing.Spec.Replicas != *desired.Spec.Replicas + + if existingHash != desiredHash || replicasDrift { + existing.Spec.Replicas = desired.Spec.Replicas + existing.Spec.Template = desired.Spec.Template + if err := r.Update(ctx, &existing); err != nil { + return ctrl.Result{}, fmt.Errorf("updating StatefulSet: %w", err) + } + logger.Info("drift corrected on StatefulSet", "name", desired.Name, + "hashChanged", existingHash != desiredHash, "replicasDrift", replicasDrift) + r.Recorder.Eventf(&ep, corev1.EventTypeWarning, "DriftCorrected", "StatefulSet %s was out of spec", desired.Name) + metricDriftCorrections.WithLabelValues(ep.Name).Inc() + } + } + + if err := r.ensurePDB(ctx, &ep); err != nil { + logger.Error(err, "ensuring PDB") + } + + if err := r.ensureNetworkPolicy(ctx, &ep); err != nil { + logger.Error(err, "ensuring NetworkPolicy") + } + + if allPreflightOK { + queueDepth, activeCount, err := r.fetchQueueDepth(ctx, &ep) + if err != nil { + logger.Error(err, "fetching queue depth") + r.Recorder.Eventf(&ep, corev1.EventTypeWarning, "QueueDepthError", "Failed to fetch queue depth: %v", err) + metricScalingErrors.WithLabelValues(ep.Name).Inc() + } else { + ep.Status.LastQueueDepth = queueDepth + ep.Status.LastPollTime = nowTime() + metricQueueDepth.WithLabelValues(ep.Name).Set(float64(queueDepth)) + + newDesired := r.computeDesiredReplicas(ctx, &ep, queueDepth, activeCount) + oldDesired := ep.Status.DesiredReplicas + + if newDesired != oldDesired { + if newDesired > oldDesired { + r.Recorder.Eventf(&ep, corev1.EventTypeNormal, "ScaleUp", + "Scaling from %d to %d (queue depth: %d, active: %d)", oldDesired, newDesired, queueDepth, activeCount) + ep.Status.LastScaleUpTime = nowTime() + } else { + r.Recorder.Eventf(&ep, corev1.EventTypeNormal, "ScaleDown", + "Scaling from %d to %d (queue depth: %d, active: %d)", oldDesired, newDesired, queueDepth, activeCount) + ep.Status.LastScaleDownTime = nowTime() + } + ep.Status.DesiredReplicas = 
newDesired + } + metricDesiredReplicas.WithLabelValues(ep.Name).Set(float64(newDesired)) + + meta.SetStatusCondition(&ep.Status.Conditions, metav1.Condition{ + Type: ridgesv1alpha1.ConditionScalingActive, + Status: metav1.ConditionTrue, + ObservedGeneration: ep.Generation, + Reason: "Polling", + Message: fmt.Sprintf("Queue depth: %d, active: %d, desired replicas: %d", queueDepth, activeCount, ep.Status.DesiredReplicas), + }) + } + } + + var latestSTS appsv1.StatefulSet + if err := r.Get(ctx, types.NamespacedName{Name: ep.Name, Namespace: ep.Namespace}, &latestSTS); err == nil { + ep.Status.CurrentReplicas = latestSTS.Status.ReadyReplicas + } + + ready := allPreflightOK && ep.Status.CurrentReplicas > 0 + readyStatus := metav1.ConditionFalse + readyReason := "NotReady" + readyMessage := "One or more preflight checks failed or no ready replicas" + if ready { + readyStatus = metav1.ConditionTrue + readyReason = "AllChecksPass" + readyMessage = "All preflight checks pass and replicas are ready" + } + meta.SetStatusCondition(&ep.Status.Conditions, metav1.Condition{ + Type: ridgesv1alpha1.ConditionReady, + Status: readyStatus, + ObservedGeneration: ep.Generation, + Reason: readyReason, + Message: readyMessage, + }) + + ep.Status.ObservedGeneration = ep.Generation + if err := r.Status().Update(ctx, &ep); err != nil { + return ctrl.Result{}, fmt.Errorf("updating EvaluatorPool status: %w", err) + } + + requeue := time.Duration(ep.Spec.PollingIntervalSeconds) * time.Second + return ctrl.Result{RequeueAfter: requeue}, nil +} + +func (r *EvaluatorPoolReconciler) runPreflightChecks(ctx context.Context, ep *ridgesv1alpha1.EvaluatorPool) bool { + allOK := true + + secretOK := r.checkSecret(ctx, ep) + if !secretOK { + allOK = false + } + metricPreflightStatus.WithLabelValues(ep.Name, ridgesv1alpha1.ConditionSecretReady).Set(boolToFloat(secretOK)) + + apiOK := r.checkAPIReachable(ctx, ep) + if !apiOK { + allOK = false + } + metricPreflightStatus.WithLabelValues(ep.Name, 
ridgesv1alpha1.ConditionAPIReachable).Set(boolToFloat(apiOK)) + + nodesOK := r.checkNodesAvailable(ctx, ep) + if !nodesOK { + allOK = false + } + metricPreflightStatus.WithLabelValues(ep.Name, ridgesv1alpha1.ConditionNodesAvailable).Set(boolToFloat(nodesOK)) + + return allOK +} + +func (r *EvaluatorPoolReconciler) checkSecret(ctx context.Context, ep *ridgesv1alpha1.EvaluatorPool) bool { + var secret corev1.Secret + err := r.Get(ctx, types.NamespacedName{Name: r.Config.ScreenerSecretName, Namespace: ep.Namespace}, &secret) + if err != nil { + meta.SetStatusCondition(&ep.Status.Conditions, metav1.Condition{ + Type: ridgesv1alpha1.ConditionSecretReady, + Status: metav1.ConditionFalse, + ObservedGeneration: ep.Generation, + Reason: "SecretMissing", + Message: fmt.Sprintf("Secret %q not found. Create it from validator/.env: make dev-secrets -n %s", + r.Config.ScreenerSecretName, ep.Namespace), + }) + r.Recorder.Eventf(ep, corev1.EventTypeWarning, "PreflightFailed", "Secret %s not found", r.Config.ScreenerSecretName) + return false + } + + if _, ok := secret.Data["SCREENER_PASSWORD"]; !ok { + meta.SetStatusCondition(&ep.Status.Conditions, metav1.Condition{ + Type: ridgesv1alpha1.ConditionSecretReady, + Status: metav1.ConditionFalse, + ObservedGeneration: ep.Generation, + Reason: "SecretKeyMissing", + Message: fmt.Sprintf("Secret %q exists but missing 'SCREENER_PASSWORD' key", r.Config.ScreenerSecretName), + }) + return false + } + + meta.SetStatusCondition(&ep.Status.Conditions, metav1.Condition{ + Type: ridgesv1alpha1.ConditionSecretReady, + Status: metav1.ConditionTrue, + ObservedGeneration: ep.Generation, + Reason: "SecretFound", + Message: "Secret exists with required keys", + }) + return true +} + +func (r *EvaluatorPoolReconciler) checkAPIReachable(ctx context.Context, ep *ridgesv1alpha1.EvaluatorPool) bool { + url := fmt.Sprintf("%s/screener/queue-depth?stage=%s", r.Config.RidgesAPIURL, ep.Spec.Stage) + client := &http.Client{Timeout: 5 * time.Second} + resp, err 
:= client.Get(url) + if err != nil { + meta.SetStatusCondition(&ep.Status.Conditions, metav1.Condition{ + Type: ridgesv1alpha1.ConditionAPIReachable, + Status: metav1.ConditionFalse, + ObservedGeneration: ep.Generation, + Reason: "Unreachable", + Message: fmt.Sprintf("Cannot reach Ridges API at %s: %v. Check RIDGES_API_URL and network.", r.Config.RidgesAPIURL, err), + }) + r.Recorder.Eventf(ep, corev1.EventTypeWarning, "PreflightFailed", "Ridges API unreachable: %v", err) + return false + } + resp.Body.Close() + + meta.SetStatusCondition(&ep.Status.Conditions, metav1.Condition{ + Type: ridgesv1alpha1.ConditionAPIReachable, + Status: metav1.ConditionTrue, + ObservedGeneration: ep.Generation, + Reason: "Reachable", + Message: "Ridges API is reachable", + }) + return true +} + +func (r *EvaluatorPoolReconciler) checkNodesAvailable(ctx context.Context, ep *ridgesv1alpha1.EvaluatorPool) bool { + var nodes corev1.NodeList + if err := r.List(ctx, &nodes, client.MatchingLabels{"node.cluster.x-k8s.io/ridges-evaluator": "true"}); err != nil { + meta.SetStatusCondition(&ep.Status.Conditions, metav1.Condition{ + Type: ridgesv1alpha1.ConditionNodesAvailable, + Status: metav1.ConditionFalse, + ObservedGeneration: ep.Generation, + Reason: "ListError", + Message: fmt.Sprintf("Failed to list nodes: %v", err), + }) + return false + } + + if len(nodes.Items) == 0 { + meta.SetStatusCondition(&ep.Status.Conditions, metav1.Condition{ + Type: ridgesv1alpha1.ConditionNodesAvailable, + Status: metav1.ConditionFalse, + ObservedGeneration: ep.Generation, + Reason: "NoNodes", + Message: "No nodes with label node.cluster.x-k8s.io/ridges-evaluator=true. 
Label a node: kubectl label node node.cluster.x-k8s.io/ridges-evaluator=true", + }) + r.Recorder.Eventf(ep, corev1.EventTypeWarning, "PreflightFailed", "No DinD-capable nodes found") + return false + } + + meta.SetStatusCondition(&ep.Status.Conditions, metav1.Condition{ + Type: ridgesv1alpha1.ConditionNodesAvailable, + Status: metav1.ConditionTrue, + ObservedGeneration: ep.Generation, + Reason: "NodesFound", + Message: fmt.Sprintf("%d DinD-capable node(s) available", len(nodes.Items)), + }) + return true +} + +func (r *EvaluatorPoolReconciler) ensurePDB(ctx context.Context, ep *ridgesv1alpha1.EvaluatorPool) error { + desired := &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: ep.Name, + Namespace: ep.Namespace, + Labels: buildLabels(ep), + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MaxUnavailable: intstrPtr(1), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app.kubernetes.io/instance": ep.Name, + }, + }, + }, + } + + if err := controllerutil.SetControllerReference(ep, desired, r.Scheme); err != nil { + return err + } + + var existing policyv1.PodDisruptionBudget + err := r.Get(ctx, types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, &existing) + if apierrors.IsNotFound(err) { + return r.Create(ctx, desired) + } else if err != nil { + return err + } + + existing.Spec = desired.Spec + return r.Update(ctx, &existing) +} + +func (r *EvaluatorPoolReconciler) ensureNetworkPolicy(ctx context.Context, ep *ridgesv1alpha1.EvaluatorPool) error { + dnsPort := intstr.FromInt32(53) + httpPort := intstr.FromInt32(80) + httpsPort := intstr.FromInt32(443) + apiPort := intstr.FromInt32(8000) + gatewayPort := intstr.FromInt32(8080) + udp := corev1.ProtocolUDP + tcp := corev1.ProtocolTCP + + desired := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: ep.Name, + Namespace: ep.Namespace, + Labels: buildLabels(ep), + }, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{ 
+ MatchLabels: map[string]string{ + "app.kubernetes.io/instance": ep.Name, + }, + }, + PolicyTypes: []networkingv1.PolicyType{networkingv1.PolicyTypeEgress}, + Egress: []networkingv1.NetworkPolicyEgressRule{ + { + Ports: []networkingv1.NetworkPolicyPort{ + {Port: &dnsPort, Protocol: &udp}, + }, + }, + { + Ports: []networkingv1.NetworkPolicyPort{ + {Port: &httpPort, Protocol: &tcp}, + {Port: &httpsPort, Protocol: &tcp}, + }, + }, + { + Ports: []networkingv1.NetworkPolicyPort{ + {Port: &apiPort, Protocol: &tcp}, + {Port: &gatewayPort, Protocol: &tcp}, + }, + To: []networkingv1.NetworkPolicyPeer{ + { + NamespaceSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "kubernetes.io/metadata.name": ep.Namespace, + }, + }, + }, + }, + }, + }, + }, + } + + if err := controllerutil.SetControllerReference(ep, desired, r.Scheme); err != nil { + return err + } + + var existing networkingv1.NetworkPolicy + err := r.Get(ctx, types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, &existing) + if apierrors.IsNotFound(err) { + return r.Create(ctx, desired) + } else if err != nil { + return err + } + + existing.Spec = desired.Spec + return r.Update(ctx, &existing) +} + +func (r *EvaluatorPoolReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&ridgesv1alpha1.EvaluatorPool{}). + Owns(&appsv1.StatefulSet{}). + Owns(&policyv1.PodDisruptionBudget{}). + Owns(&networkingv1.NetworkPolicy{}). 
+ Complete(r) +} + +func intstrPtr(val int32) *intstr.IntOrString { + v := intstr.FromInt32(val) + return &v +} + +func boolToFloat(b bool) float64 { + if b { + return 1 + } + return 0 +} + +func podSpecHash(spec corev1.PodSpec) string { + data, _ := json.Marshal(spec) + sum := sha256.Sum256(data) + return fmt.Sprintf("%x", sum)[:16] +} diff --git a/operator/internal/controller/pod_builder.go b/operator/internal/controller/pod_builder.go new file mode 100644 index 00000000..b66bffcf --- /dev/null +++ b/operator/internal/controller/pod_builder.go @@ -0,0 +1,319 @@ +package controller + +import ( + "strings" + + ridgesv1alpha1 "github.com/ridgesai/ridges/operator/api/v1alpha1" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +func buildStatefulSet(ep *ridgesv1alpha1.EvaluatorPool, cfg OperatorConfig) *appsv1.StatefulSet { + labels := buildLabels(ep) + replicas := ep.Status.DesiredReplicas + terminationGrace := int64(900) + automountSA := false + + priorityClass := "screener-1-priority" + if ep.Spec.Stage == "screener_2" { + priorityClass = "screener-2-priority" + } + + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: ep.Name, + Namespace: ep.Namespace, + Labels: labels, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + ServiceName: ep.Name, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app.kubernetes.io/instance": ep.Name, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + TerminationGracePeriodSeconds: &terminationGrace, + AutomountServiceAccountToken: &automountSA, + PriorityClassName: priorityClass, + ImagePullSecrets: imagePullSecrets(cfg), + SecurityContext: &corev1.PodSecurityContext{ + FSGroup: int64Ptr(1000), + SeccompProfile: &corev1.SeccompProfile{ + Type: 
corev1.SeccompProfileTypeRuntimeDefault, + }, + }, + NodeSelector: map[string]string{ + "node.cluster.x-k8s.io/ridges-evaluator": "true", + }, + Containers: []corev1.Container{ + buildScreenerContainer(ep, cfg), + buildDindContainer(ep, cfg), + }, + Volumes: []corev1.Volume{ + { + Name: "workspace", + VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{ + SizeLimit: quantityPtr("10Gi"), + }}, + }, + { + Name: "tmp", + VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{ + Medium: corev1.StorageMediumMemory, + SizeLimit: quantityPtr("64Mi"), + }}, + }, + { + Name: "home", + VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{ + SizeLimit: quantityPtr("64Mi"), + }}, + }, + { + Name: "app-logs", + VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{ + SizeLimit: quantityPtr("1Gi"), + }}, + }, + }, + }, + }, + VolumeClaimTemplates: []corev1.PersistentVolumeClaim{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "docker-storage", + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{ + corev1.ReadWriteOnce, + }, + Resources: corev1.VolumeResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse("50Gi"), + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "swebench-repos", + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{ + corev1.ReadWriteOnce, + }, + Resources: corev1.VolumeResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse("5Gi"), + }, + }, + }, + }, + }, + }, + } + + return sts +} + +func buildScreenerContainer(ep *ridgesv1alpha1.EvaluatorPool, cfg OperatorConfig) corev1.Container { + runAsUser := int64(1000) + allowPrivEsc := false + readOnlyRoot := true + + envFrom := []corev1.EnvFromSource{ + { + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: 
cfg.ScreenerSecretName}, + }, + }, + } + + env := []corev1.EnvVar{ + {Name: "DIND_RESOLVE_GATEWAY", Value: "true"}, + {Name: "MODE", Value: "screener"}, + {Name: "DOCKER_HOST", Value: "tcp://localhost:2375"}, + {Name: "TMPDIR", Value: "/workspace/tmp"}, + {Name: "RIDGES_INFERENCE_GATEWAY_URL", Value: cfg.InferenceGatewayURL}, + {Name: "RIDGES_PLATFORM_URL", Value: cfg.PlatformURL}, + } + if cfg.CommitHash != "" { + env = append(env, corev1.EnvVar{Name: "COMMIT_HASH", Value: cfg.CommitHash}) + } + + return corev1.Container{ + Name: "screener", + Image: cfg.ScreenerImage, + ImagePullPolicy: corev1.PullAlways, + EnvFrom: envFrom, + Env: env, + SecurityContext: &corev1.SecurityContext{ + RunAsUser: &runAsUser, + AllowPrivilegeEscalation: &allowPrivEsc, + ReadOnlyRootFilesystem: &readOnlyRoot, + Capabilities: &corev1.Capabilities{ + Drop: []corev1.Capability{"ALL"}, + }, + }, + Lifecycle: &corev1.Lifecycle{ + PreStop: &corev1.LifecycleHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"sh", "-c", "sleep 5"}, + }, + }, + }, + Resources: screenerResources(ep), + VolumeMounts: []corev1.VolumeMount{ + {Name: "workspace", MountPath: "/workspace"}, + {Name: "tmp", MountPath: "/tmp"}, + {Name: "home", MountPath: "/home/screener"}, + {Name: "app-logs", MountPath: "/app/logs"}, + {Name: "swebench-repos", MountPath: "/app/evaluator/datasets/swebench_verified/repos"}, + }, + StartupProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/readyz", + Port: intstr.FromInt32(8080), + }, + }, + FailureThreshold: 360, + PeriodSeconds: 10, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/readyz", + Port: intstr.FromInt32(8080), + }, + }, + PeriodSeconds: 10, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + Port: intstr.FromInt32(8080), + }, + }, + PeriodSeconds: 30, + FailureThreshold: 3, 
+ }, + } +} + +func buildDindContainer(ep *ridgesv1alpha1.EvaluatorPool, cfg OperatorConfig) corev1.Container { + privileged := true + + dindFlags := "--userns-remap=default --host=tcp://127.0.0.1:2375 --storage-driver=overlay2" + + return corev1.Container{ + Name: "dind", + Image: "docker:27-dind", + ImagePullPolicy: corev1.PullAlways, + Command: []string{"/bin/sh", "-c", strings.TrimSpace(` +grep -q '^dockremap:' /etc/group || addgroup -S dockremap +grep -q '^dockremap:' /etc/passwd || adduser -S -G dockremap dockremap +grep -q 'dockremap:100000' /etc/subuid || echo 'dockremap:100000:65536' >> /etc/subuid +grep -q 'dockremap:100000' /etc/subgid || echo 'dockremap:100000:65536' >> /etc/subgid +dockerd-entrypoint.sh dockerd ` + dindFlags + ` & +DPID=$! +trap 'kill -TERM $DPID; wait $DPID' TERM +until docker info >/dev/null 2>&1; do sleep 1; done +iptables -I DOCKER-USER -d 169.254.169.254/32 -j DROP 2>/dev/null || iptables-legacy -I DOCKER-USER -d 169.254.169.254/32 -j DROP 2>/dev/null || true +wait $DPID +`)}, + SecurityContext: &corev1.SecurityContext{ + Privileged: &privileged, + }, + Env: []corev1.EnvVar{ + {Name: "DOCKER_HOST", Value: "tcp://127.0.0.1:2375"}, + {Name: "DOCKER_TLS_CERTDIR", Value: ""}, + }, + Resources: dindResources(ep), + VolumeMounts: []corev1.VolumeMount{ + {Name: "docker-storage", MountPath: "/var/lib/docker"}, + {Name: "workspace", MountPath: "/workspace"}, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"docker", "info"}, + }, + }, + PeriodSeconds: 10, + FailureThreshold: 30, + }, + } +} + +func buildLabels(ep *ridgesv1alpha1.EvaluatorPool) map[string]string { + return map[string]string{ + "app.kubernetes.io/name": "ridges-screener", + "app.kubernetes.io/instance": ep.Name, + "app.kubernetes.io/component": "screener", + "app.kubernetes.io/part-of": "ridges", + "app.kubernetes.io/managed-by": "ridges-operator", + } +} + +func stsName(ep 
*ridgesv1alpha1.EvaluatorPool) string {
+	return ep.Name
+}
+
+func screenerResources(ep *ridgesv1alpha1.EvaluatorPool) corev1.ResourceRequirements {
+	defaults := corev1.ResourceRequirements{
+		Requests: corev1.ResourceList{
+			corev1.ResourceCPU:    resource.MustParse("2"),
+			corev1.ResourceMemory: resource.MustParse("4Gi"),
+		},
+		Limits: corev1.ResourceList{
+			corev1.ResourceMemory: resource.MustParse("8Gi"),
+		},
+	}
+	if ep.Spec.Resources != nil && ep.Spec.Resources.Screener != nil {
+		return *ep.Spec.Resources.Screener
+	}
+	return defaults
+}
+
+func dindResources(ep *ridgesv1alpha1.EvaluatorPool) corev1.ResourceRequirements {
+	defaults := corev1.ResourceRequirements{
+		Requests: corev1.ResourceList{
+			corev1.ResourceCPU:    resource.MustParse("2"),
+			corev1.ResourceMemory: resource.MustParse("4Gi"),
+		},
+		Limits: corev1.ResourceList{
+			corev1.ResourceMemory: resource.MustParse("16Gi"),
+		},
+	}
+	if ep.Spec.Resources != nil && ep.Spec.Resources.Dind != nil {
+		return *ep.Spec.Resources.Dind
+	}
+	return defaults
+}
+
+func quantityPtr(s string) *resource.Quantity {
+	q := resource.MustParse(s)
+	return &q
+}
+
+func int64Ptr(i int64) *int64 {
+	return &i
+}
+
+func imagePullSecrets(cfg OperatorConfig) []corev1.LocalObjectReference {
+	if cfg.ImagePullSecret == "" {
+		return nil
+	}
+	return []corev1.LocalObjectReference{{Name: cfg.ImagePullSecret}}
+}
diff --git a/operator/internal/controller/scaler.go b/operator/internal/controller/scaler.go
new file mode 100644
index 00000000..46759c4c
--- /dev/null
+++ b/operator/internal/controller/scaler.go
@@ -0,0 +1,123 @@
+package controller
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	ridgesv1alpha1 "github.com/ridgesai/ridges/operator/api/v1alpha1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+type queueDepthResponse struct {
+	Depth  int32  `json:"depth"`
+	Stage  string `json:"stage"`
+	Active int32  `json:"active"`
+}
+
+// fetchQueueDepth asks the platform API how many agents are queued and how
+// many evaluations are in flight for this pool's stage.
+func (r *EvaluatorPoolReconciler) fetchQueueDepth(ctx context.Context, ep *ridgesv1alpha1.EvaluatorPool) (int32, int32, error) {
+	url := fmt.Sprintf("%s/screener/queue-depth?stage=%s", r.Config.RidgesAPIURL, ep.Spec.Stage)
+
+	// Tie the request to the reconcile context so an in-flight scrape is
+	// cancelled when the controller shuts down or the reconcile times out.
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		return 0, 0, fmt.Errorf("building request for %s: %w", url, err)
+	}
+	client := &http.Client{Timeout: 10 * time.Second}
+	resp, err := client.Do(req)
+	if err != nil {
+		return 0, 0, fmt.Errorf("GET %s: %w", url, err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		body, _ := io.ReadAll(resp.Body)
+		return 0, 0, fmt.Errorf("GET %s returned %d: %s", url, resp.StatusCode, string(body))
+	}
+
+	var qd queueDepthResponse
+	if err := json.NewDecoder(resp.Body).Decode(&qd); err != nil {
+		return 0, 0, fmt.Errorf("decoding queue depth response: %w", err)
+	}
+
+	return qd.Depth, qd.Active, nil
+}
+
+// computeDesiredReplicas turns queue depth and in-flight work into a replica
+// count, clamped to [MinReplicas, MaxReplicas] with scale-down damping.
+func (r *EvaluatorPoolReconciler) computeDesiredReplicas(
+	ctx context.Context,
+	ep *ridgesv1alpha1.EvaluatorPool,
+	queueDepth int32,
+	activeCount int32,
+) int32 {
+	logger := log.FromContext(ctx)
+	floor := ep.Spec.Scaling.MinReplicas
+	ceil := ep.Spec.Scaling.MaxReplicas
+
+	// Queue depth = agents waiting for a screener.
+	// Active count = evaluations currently being processed (1 per screener).
+	// Total demand = waiting + in-progress.
+	desired := queueDepth + activeCount
+	if desired < floor {
+		desired = floor
+	}
+	if desired > ceil {
+		desired = ceil
+	}
+
+	current := ep.Status.DesiredReplicas
+
+	if desired < current {
+		if activeCount > 0 {
+			logger.V(1).Info("scale-down blocked: evaluations in progress",
+				"activeCount", activeCount,
+				"keeping", current,
+			)
+			return current
+		}
+
+		stabilization := time.Duration(ep.Spec.Scaling.ScaleDownStabilizationSeconds) * time.Second
+
+		if ep.Status.LastScaleUpTime != nil {
+			if elapsed := time.Since(ep.Status.LastScaleUpTime.Time); elapsed < stabilization {
+				logger.V(1).Info("scale-down blocked: recent scale-up",
+					"elapsed", elapsed,
+					"stabilization", stabilization,
+					"keeping", current,
+				)
+				return current
+			}
+		}
+
+		if ep.Status.LastScaleDownTime != nil {
+			if elapsed := time.Since(ep.Status.LastScaleDownTime.Time); elapsed < stabilization {
+				logger.V(1).Info("scale-down stabilization active",
+					"elapsed", elapsed,
+					"stabilization", stabilization,
+					"keeping", current,
+				)
+				return current
+			}
+		}
+
+		if desired < current-1 {
+			desired = current - 1
+		}
+	}
+
+	return desired
+}
+
+// nowTime returns the current time as a *metav1.Time for status fields.
+func nowTime() *metav1.Time {
+	t := metav1.Now()
+	return &t
+}
diff --git a/queries/agent.py b/queries/agent.py
index 3d17382d..06aa7d13 100644
--- a/queries/agent.py
+++ b/queries/agent.py
@@ -228,6 +228,30 @@ async def get_top_agents(
     return [AgentScored(**agent) for agent in results]
 
 
+@db_operation
+async def get_queue_depth(conn: DatabaseConnection, queue_stage: EvaluationSetGroup) -> int:
+    """Count agents waiting in the queue table for the given stage."""
+    # Identifiers cannot be bound as query parameters; interpolation is safe
+    # here only because queue_stage is a closed enum, never user input.
+    queue_to_query = f"{queue_stage.value}_queue"
+    result = await conn.fetchval(f"SELECT COUNT(*) FROM {queue_to_query}")
+    return result or 0
+
+
+@db_operation
+async def get_active_evaluation_count(conn: DatabaseConnection, queue_stage: EvaluationSetGroup) -> int:
+    """Count evaluations currently running for the given stage."""
+    result = await conn.fetchval(
+        """
+        SELECT COUNT(*) FROM evaluations_hydrated
+        WHERE status = 'running'
+        AND evaluation_set_group = $1::EvaluationSetGroup
+        """,
+        queue_stage.value
+    )
+    return result or
0 + + @db_operation async def get_agents_in_queue(conn: DatabaseConnection, queue_stage: EvaluationSetGroup) -> list[Agent]: # TODO ALEX from ADAM: Modify this in the view itself rather than branching explicitly here. diff --git a/validator/Dockerfile b/validator/Dockerfile new file mode 100644 index 00000000..cc834420 --- /dev/null +++ b/validator/Dockerfile @@ -0,0 +1,42 @@ +FROM ghcr.io/astral-sh/uv:latest AS uv +FROM docker:27-cli AS docker-cli + +FROM alpine/git:latest AS git-shallow +WORKDIR /repo +COPY .git .git +RUN git clone --depth 1 --no-checkout file:///repo /shallow + +FROM python:3.12-slim-bookworm AS deps +WORKDIR /app +COPY --from=uv /uv /bin/uv +COPY pyproject.toml uv.lock ./ +RUN uv sync --frozen --no-dev --no-cache + +FROM python:3.12-slim-bookworm AS runtime +RUN apt-get update && \ + apt-get install -y --no-install-recommends git && \ + rm -rf /var/lib/apt/lists/* +RUN useradd -u 1000 -m screener && \ + git config --system user.email "screener@ridges.ai" && \ + git config --system user.name "ridges-screener" && \ + git config --system safe.directory /app +COPY --from=docker-cli /usr/local/bin/docker /usr/local/bin/docker +COPY --from=deps /app/.venv /app/.venv +COPY api/ /app/api/ +COPY evaluator/ /app/evaluator/ +COPY models/ /app/models/ +COPY queries/ /app/queries/ +COPY utils/ /app/utils/ +COPY validator/ /app/validator/ +COPY pyproject.toml /app/ +COPY --from=git-shallow /shallow/.git /app/.git +WORKDIR /app +ENV PATH="/app/.venv/bin:$PATH" \ + PYTHONUNBUFFERED=1 +COPY validator/entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh && \ + mkdir -p /app/logs && chown 1000:1000 /app/logs && \ + mkdir -p /app/evaluator/datasets/swebench_verified/repos && \ + chown -R 1000:1000 /app/evaluator/datasets +USER 1000 +ENTRYPOINT ["/entrypoint.sh"] diff --git a/validator/Dockerfile.dockerignore b/validator/Dockerfile.dockerignore new file mode 100644 index 00000000..d6d5562c --- /dev/null +++ b/validator/Dockerfile.dockerignore @@ -0,0 +1,26 @@ 
+.gitignore +.venv +__pycache__ +*.pyc +*.pyo +.mypy_cache +.ruff_cache +.pytest_cache +*.egg-info +dist +build +.env +.env.* +operator/ +docs/ +setup/ +test_agent.py +test_agent_problem_sets.json +*.md +deploy/ +hack/ +.github/ +.idea/ +Makefile +inference_gateway/ +evaluator/datasets/swebench_verified/repos/ diff --git a/validator/entrypoint.sh b/validator/entrypoint.sh new file mode 100644 index 00000000..506a6c90 --- /dev/null +++ b/validator/entrypoint.sh @@ -0,0 +1,20 @@ +#!/bin/sh +export SCREENER_NAME="${SCREENER_NAME:-$HOSTNAME}" +mkdir -p /workspace/tmp + +# Resolve the gateway hostname to a ClusterIP so the +# DinD-internal proxy container can reach it (no K8s DNS inside DinD). +if [ "$DIND_RESOLVE_GATEWAY" = "true" ] && [ -n "$RIDGES_INFERENCE_GATEWAY_URL" ]; then + GW_HOST=$(echo "$RIDGES_INFERENCE_GATEWAY_URL" | sed -E 's|^https?://||;s|[:/].*||') + GW_IP=$(getent hosts "$GW_HOST" | awk '{print $1}') + if [ -n "$GW_IP" ]; then + export RIDGES_INFERENCE_GATEWAY_URL=$(echo "$RIDGES_INFERENCE_GATEWAY_URL" | sed "s|$GW_HOST|$GW_IP|") + export RIDGES_INFERENCE_GATEWAY_HOST="$GW_HOST" + echo "Resolved gateway $GW_HOST -> $GW_IP" + fi +fi + +trap 'kill -TERM $PID; wait $PID' TERM +python -m validator.main & +PID=$! +wait $PID diff --git a/validator/health_server.py b/validator/health_server.py new file mode 100644 index 00000000..c28990c2 --- /dev/null +++ b/validator/health_server.py @@ -0,0 +1,55 @@ +"""Lightweight HTTP health server for K8s probes. 
+ +Exposes: + GET /readyz -> 200 once ready, 503 before or during shutdown + GET /healthz -> 200 always (event loop responsiveness = liveness) +""" + +import asyncio +import utils.logger as logger + + +class HealthServer: + def __init__(self, port: int = 8080): + self._port = port + self._ready = False + + async def start(self) -> None: + server = await asyncio.start_server(self._handle, "0.0.0.0", self._port) + asyncio.create_task(server.serve_forever()) + logger.info(f"Health server listening on :{self._port}") + + def mark_ready(self) -> None: + self._ready = True + + def mark_shutting_down(self) -> None: + self._ready = False + + async def _handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + request_line = await reader.readline() + while True: + line = await reader.readline() + if line == b"\r\n" or line == b"" or line == b"\n": + break + + path = "" + if request_line: + parts = request_line.decode(errors="replace").split() + if len(parts) >= 2: + path = parts[1] + + if path == "/readyz": + ok = self._ready + elif path == "/healthz": + ok = True + else: + writer.write(b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n") + await writer.drain() + writer.close() + return + + status = "200 OK" if ok else "503 Service Unavailable" + body = "ok" if ok else "not ready" + writer.write(f"HTTP/1.1 {status}\r\nContent-Length: {len(body)}\r\n\r\n{body}".encode()) + await writer.drain() + writer.close() diff --git a/validator/main.py b/validator/main.py index d5f84ab6..c4a7f614 100644 --- a/validator/main.py +++ b/validator/main.py @@ -5,6 +5,7 @@ import sys import time import httpx +import signal import random import asyncio import pathlib @@ -18,6 +19,7 @@ from utils.git import COMMIT_HASH, reset_local_repo from evaluator.models import EvaluationRunException from utils.system_metrics import get_system_metrics +from validator.health_server import HealthServer from models.evaluation_set import EvaluationSetProblem from 
evaluator.sandbox.sandbox_manager import SandboxManager from validator.http_utils import get_ridges_platform, post_ridges_platform @@ -38,6 +40,10 @@ # The sandbox manager and problem suites sandbox_manager = None problem_suites = [] +health = None + +# Graceful shutdown: set by SIGTERM handler +_shutdown_event = None @@ -55,18 +61,23 @@ async def disconnect(reason: str): logger.error(traceback.format_exc()) os._exit(1) - - # A loop that sends periodic heartbeats to the Ridges platform async def send_heartbeat_loop(): try: logger.info("Starting send heartbeat loop...") - while True: + while not _shutdown_event.is_set(): logger.info("Sending heartbeat...") system_metrics = await get_system_metrics() await post_ridges_platform("/validator/heartbeat", ValidatorHeartbeatRequest(system_metrics=system_metrics), bearer_token=session_id, quiet=2, timeout=5) - await asyncio.sleep(config.SEND_HEARTBEAT_INTERVAL_SECONDS) + try: + await asyncio.wait_for(_shutdown_event.wait(), timeout=config.SEND_HEARTBEAT_INTERVAL_SECONDS) + break + except asyncio.TimeoutError: + pass + logger.info("Heartbeat loop stopped (shutdown)") except Exception as e: + if _shutdown_event and _shutdown_event.is_set(): + return logger.error(f"Error in send_heartbeat_loop(): {type(e).__name__}: {e}") logger.error(traceback.format_exc()) os._exit(1) @@ -305,8 +316,24 @@ async def main(): global max_evaluation_run_log_size_bytes global sandbox_manager global problem_suites + global health + global _shutdown_event + + health = HealthServer() + await health.start() + _shutdown_event = asyncio.Event() + # SIGTERM handler: mark not-ready immediately so K8s stops sending + # traffic, then signal the main loop to drain and exit. 
+ def _handle_sigterm(sig, frame): + logger.info("Received SIGTERM, initiating graceful shutdown...") + health.mark_shutting_down() + asyncio.get_event_loop().call_soon_threadsafe(_shutdown_event.set) + + signal.signal(signal.SIGTERM, _handle_sigterm) + + logger.info(f"Commit SHA: {COMMIT_HASH}") # Register with the Ridges platform, yielding us a session ID logger.info("Registering validator...") @@ -350,25 +377,21 @@ async def main(): logger.info(f" Running Evaluation Timeout: {running_eval_timeout_seconds} second(s)") logger.info(f" Max Evaluation Run Log Size: {max_evaluation_run_log_size_bytes} byte(s)") - - # Create the sandbox manager sandbox_manager = SandboxManager(config.RIDGES_INFERENCE_GATEWAY_URL) # Load all problem suites problem_suites = [POLYGLOT_PY_SUITE, POLYGLOT_JS_SUITE, SWEBENCH_VERIFIED_SUITE] - - # Get all the problems in the latest set latest_set_problems_data = await get_ridges_platform("/evaluation-sets/all-latest-set-problems", quiet=1) latest_set_problems = [EvaluationSetProblem(**prob) for prob in latest_set_problems_data] latest_set_problem_names = list({prob.problem_name for prob in latest_set_problems}) - + # Prebuild the images for the SWE-Bench Verified problems SWEBENCH_VERIFIED_SUITE.prebuild_problem_images(latest_set_problem_names) - + health.mark_ready() # Start the send heartbeat loop asyncio.create_task(send_heartbeat_loop()) @@ -377,27 +400,35 @@ async def main(): # Start the set weights loop asyncio.create_task(set_weights_loop()) - - - # Loop forever, just keep requesting evaluations and running them - while True: + # Main evaluation loop: exits when shutdown event is set. + # If idle, wakes immediately on SIGTERM. If running an evaluation, + # finishes the current one before exiting. 
+ while not _shutdown_event.is_set(): logger.info("Requesting an evaluation...") request_evaluation_response_data = await post_ridges_platform("/validator/request-evaluation", ValidatorRequestEvaluationRequest(), bearer_token=session_id, quiet=1) - # If no evaluation is available, wait and try again if request_evaluation_response_data is None: logger.info(f"No evaluations available. Waiting for {config.REQUEST_EVALUATION_INTERVAL_SECONDS} seconds...") - await asyncio.sleep(config.REQUEST_EVALUATION_INTERVAL_SECONDS) + try: + await asyncio.wait_for( + _shutdown_event.wait(), + timeout=config.REQUEST_EVALUATION_INTERVAL_SECONDS + ) + except asyncio.TimeoutError: + pass continue await _run_evaluation(ValidatorRequestEvaluationResponse(**request_evaluation_response_data)) + logger.info("Shutdown: disconnecting from platform...") + await disconnect("SIGTERM") if __name__ == "__main__": try: asyncio.run(main()) + logger.info("Shutdown complete") except KeyboardInterrupt: logger.warning("Keyboard interrupt") asyncio.run(disconnect("Keyboard interrupt"))