diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 1fbd3acc..04dbc836 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -7,6 +7,7 @@ import ( "net/http" "os" + rbg "github.com/bcfre/rbg-api/api/workloads/v1alpha1" kedav1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ray "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" zaplog "go.uber.org/zap" @@ -220,6 +221,7 @@ func main() { {ray.SchemeGroupVersion, constants.RayClusterKind, ray.AddToScheme}, {knservingv1.SchemeGroupVersion, constants.KnativeServiceKind, knservingv1.AddToScheme}, {lws.SchemeGroupVersion, constants.LWSKind, lws.AddToScheme}, + {rbg.SchemeGroupVersion, constants.RBGKind, rbg.AddToScheme}, {volcano.SchemeGroupVersion, constants.VolcanoQueueKind, volcano.AddToScheme}, {volcanobatch.SchemeGroupVersion, constants.VolcanoJobKind, volcanobatch.AddToScheme}, {kedav1.SchemeGroupVersion, constants.KEDAScaledObjectKind, kedav1.AddToScheme}, diff --git a/config/samples/rbg/rbg-qwen3-0-6b.yaml b/config/samples/rbg/rbg-qwen3-0-6b.yaml new file mode 100644 index 00000000..ef3e153a --- /dev/null +++ b/config/samples/rbg/rbg-qwen3-0-6b.yaml @@ -0,0 +1,119 @@ +# apiVersion: ome.io/v1beta1 +# kind: InferenceService +# metadata: +# name: qwen3-0-6b +# # namespace: qwen3-0-6b +# annotations: +# ome.io/deploymentMode: "RoleBasedGroup" +# spec: +# model: +# name: qwen3-0-6b +# runtime: +# name: srt-qwen3-0-6b +# # router: +# # minReplicas: 1 +# # maxReplicas: 1 +# engine: +# minReplicas: 1 +# maxReplicas: 1 +--- +apiVersion: ome.io/v1beta1 +kind: InferenceService +metadata: + name: qwen3-0-6b-pd + annotations: + ome.io/deploymentMode: "RoleBasedGroup" +spec: + model: + name: qwen3-0-6b + runtime: + name: srt-qwen3-0-6b-pd + # router: + # minReplicas: 2 + # maxReplicas: 2 + engine: + minReplicas: 2 + maxReplicas: 2 + runner: + name: ome-container + # image: docker.io/lmsysorg/sglang:v0.5.4.post3-cu129-amd64 + # image: docker.io/lmsysorg/sglang:v0.5.5.post3-cu129-amd64 + # image: nginx:latest + env: + - name: test4 + value: test4 + # command: + # - sh + # - -c + # - "sleep infinity" + decoder: + minReplicas: 2 + maxReplicas: 2 + runner: + name: ome-container + # image: docker.io/lmsysorg/sglang:v0.5.4.post3-cu129-amd64 + image: nginx:latest-not-exist-4 + # command: + # - sh + # - -c + # - "sleep infinity" +--- +# apiVersion: ome.io/v1beta1 +# kind: InferenceService +# metadata: +# name: qwen3-0-6b-pd +# annotations: +# ome.io/deploymentMode: "RoleBasedGroup" +# spec: +# model: +# name: qwen3-0-6b +# runtime: +# name: srt-qwen3-0-6b-pd +# # router: +# # minReplicas: 2 +# # maxReplicas: 2 +# engine: +# minReplicas: 2 +# maxReplicas: 2 +# # runner: +# # name: ome-container +# # image: docker.io/lmsysorg/sglang:v0.5.4.post3-cu129-amd64 +# # image: docker.io/lmsysorg/sglang:v0.5.5.post3-cu129-amd64 +# # image: nginx:latest +# # command: +# # - sh +# # - -c +# # - "sleep infinity" +# decoder: +# minReplicas: 2 +# maxReplicas: 2 +# # runner: +# # name: ome-container +# # image: docker.io/lmsysorg/sglang:v0.5.4.post3-cu129-amd64 +# # image: nginx:latest +# # command: +# # - sh +# # - -c +# # - "sleep infinity" +--- +# apiVersion: ome.io/v1beta1 +# kind: InferenceService +# metadata: +# name: qwen3-0-6b-mn-pd +# # namespace: qwen3-0-6b +# annotations: +# ome.io/deploymentMode: "RoleBasedGroup" +# spec: +# model: +# name: qwen3-0-6b +# runtime: +# name: srt-qwen3-0-6b-mn-pd +# router: +# minReplicas: 1 +# maxReplicas: 1 +# engine: +# minReplicas: 1 +# maxReplicas: 1 +# decoder: +# minReplicas: 1 +# maxReplicas: 1 \ No newline at end of file diff --git a/go.mod b/go.mod index eba3b8b8..f3d53f74 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/sgl-project/ome -go 1.25 +go 1.25.5 require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.2 @@ -12,9 +12,10 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.87.3 github.com/aws/aws-sdk-go-v2/service/sts v1.38.2 github.com/aws/smithy-go v1.24.0 + github.com/bcfre/rbg-api v0.5.0 github.com/fsnotify/fsnotify v1.9.0 github.com/gin-gonic/gin v1.10.0 - github.com/go-logr/logr v1.4.2 + github.com/go-logr/logr v1.4.3 github.com/go-playground/validator/v10 v10.20.0 github.com/google/go-cmp v0.7.0 github.com/google/uuid v1.6.0 @@ -40,7 +41,7 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/oauth2 v0.29.0 golang.org/x/sys v0.39.0 - golang.org/x/term v0.33.0 + golang.org/x/term v0.37.0 gomodules.xyz/jsonpatch/v2 v2.4.0 google.golang.org/api v0.231.0 google.golang.org/protobuf v1.36.10 @@ -53,7 +54,7 @@ require ( k8s.io/client-go v0.33.7 k8s.io/klog/v2 v2.130.1 k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff - k8s.io/utils v0.0.0-20241210054802-24370beab758 + k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 knative.dev/pkg v0.0.0-20231115001034-97c7258e3a98 knative.dev/serving v0.39.3 sigs.k8s.io/controller-runtime v0.19.7 @@ -106,7 +107,7 @@ require ( github.com/emicklei/go-restful/v3 v3.12.1 // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fxamacker/cbor/v2 v2.7.0 // indirect + github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-kit/log v0.2.1 // indirect @@ -150,7 +151,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/openzipkin/zipkin-go v0.4.2 // indirect @@ -189,13 +190,13 @@ require ( go.uber.org/dig v1.18.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.8.0 // indirect - golang.org/x/crypto v0.40.0 // indirect + golang.org/x/crypto v0.44.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect - golang.org/x/net v0.42.0 // indirect - golang.org/x/sync v0.16.0 // indirect - golang.org/x/text v0.28.0 // indirect + golang.org/x/net v0.47.0 // indirect + golang.org/x/sync v0.18.0 // indirect + golang.org/x/text v0.31.0 // indirect golang.org/x/time v0.11.0 // indirect - golang.org/x/tools v0.35.0 // indirect + golang.org/x/tools v0.38.0 // indirect google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250414145226-207652e42e2e // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197 // indirect @@ -209,7 +210,7 @@ require ( k8s.io/component-base v0.33.7 // indirect knative.dev/networking v0.0.0-20231115015815-3af9769712cd // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect - sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect + sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect ) diff --git a/go.sum b/go.sum index 32cb246e..d4361696 100644 --- a/go.sum +++ b/go.sum @@ -108,6 +108,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.38.2 h1:YZPjhyaGzhDQEvsffDEcpycq49nl github.com/aws/aws-sdk-go-v2/service/sts v1.38.2/go.mod h1:2dIN8qhQfv37BdUYGgEC8Q3tteM3zFxTI1MLO2O3J3c= github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk= github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= +github.com/bcfre/rbg-api v0.5.0 h1:gHCYiyC/jonPMrqEyAeqL2/Mj9aUEJooggmQjMCeo4Q= +github.com/bcfre/rbg-api v0.5.0/go.mod h1:XdqxR8LVaCrZfnZcnr7vonIlptqIiIHjNLPJdgsUqgg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -172,8 +174,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= -github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= -github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= +github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= +github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -197,8 +199,8 @@ github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KE github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= -github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= @@ -402,8 +404,9 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= @@ -620,8 +623,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= -golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= +golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU= +golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -654,8 +657,8 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.26.0 h1:EGMPT//Ezu+ylkCijjPc+f4Aih7sZvaAr+O3EHBxvZg= -golang.org/x/mod v0.26.0/go.mod h1:/j6NAhSk8iQ723BGAUyoAcn7SlD7s15Dp9Nd/SfeaFQ= +golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= +golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -691,8 +694,8 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= -golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -713,8 +716,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= -golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -766,8 +769,8 @@ golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg= -golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0= +golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= +golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -775,8 +778,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= -golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -824,8 +827,8 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.35.0 h1:mBffYraMEf7aa0sB+NuKnuCy8qI/9Bughn8dC2Gu5r0= -golang.org/x/tools v0.35.0/go.mod h1:NKdj5HkL/73byiZSJjqJgKn3ep7KjFkBOkR/Hps3VPw= +golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= +golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -977,15 +980,12 @@ k8s.io/client-go v0.33.7 h1:sEcU4syZnbwaiGDctJE6G/IKsuays3wjEWGuyrD7M8c= k8s.io/client-go v0.33.7/go.mod h1:0MEM10zY5dGdc3FdkyNCTKXiTr8P+2Vj65njzvE0Vhw= k8s.io/component-base v0.33.7 h1:r3xd2l2lngeiOrQhpnD7CYtgbbrTDBnO3qyDUUfwTXw= k8s.io/component-base v0.33.7/go.mod h1:3v7hH1NvNLID9BUBAR/FqM9StQ/Sa4yBDxEzE1yvGFg= -k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 h1:pWEwq4Asjm4vjW7vcsmijwBhOr1/shsbSYiWXmNGlks= -k8s.io/gengo/v2 v2.0.0-20240911193312-2b36238f13e9 h1:si3PfKm8dDYxgfbeA6orqrtLkvvIeH8UqffFJDl0bz4= -k8s.io/gengo/v2 v2.0.0-20240911193312-2b36238f13e9/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff h1:/usPimJzUKKu+m+TE36gUyGcf03XZEP0ZIKgKj35LS4= k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff/go.mod h1:5jIi+8yX4RIb8wk3XwBo5Pq2ccx4FP10ohkbSKCZoK8= -k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0= -k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 h1:SjGebBtkBqHFOli+05xYbK8YF1Dzkbzn+gDM4X9T4Ck= +k8s.io/utils v0.0.0-20251002143259-bc988d571ff4/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= knative.dev/networking v0.0.0-20231115015815-3af9769712cd h1:VDtYz+hybqIAEp8NM2tAi2QV4D8Cc5DWLoXLi5IcZjE= knative.dev/networking v0.0.0-20231115015815-3af9769712cd/go.mod h1:HQ3rA7qrKVWvZUl6GGQefn/PzNXlX4e94KpbwBEjFcQ= knative.dev/pkg v0.0.0-20231115001034-97c7258e3a98 h1:uvOLwp5Ar7oJlaYEszh51CemuZc1sRRI14xzKhUEF3U= @@ -1003,8 +1003,8 @@ sigs.k8s.io/controller-runtime v0.19.7 h1:DLABZfMr20A+AwCZOHhcbcu+TqBXnJZaVBri9K sigs.k8s.io/controller-runtime v0.19.7/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4= sigs.k8s.io/gateway-api v1.2.1 h1:fZZ/+RyRb+Y5tGkwxFKuYuSRQHu9dZtbjenblleOLHM= sigs.k8s.io/gateway-api v1.2.1/go.mod h1:EpNfEXNjiYfUJypf0eZ0P5iXA9ekSGWaS1WgPaM42X0= -sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= -sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= +sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5EXP7sU1kvOlxwZh5txg= +sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= sigs.k8s.io/kueue v0.10.6 h1:r/ULCI0wXYvkbl0vliq8aS+AvOVhW2dlxzpRMudhNJ8= sigs.k8s.io/kueue v0.10.6/go.mod h1:3yzOvGI0sPOC3VL1ihVIrzc8mkSyCVTL+SrouewwRWw= sigs.k8s.io/lws v0.5.1 h1:eaeMNkP0manRluQZLN32atoULaGrzP611gSLdFaHZs4= diff --git a/oeps/0006-workload-policy-layer/README.md b/oeps/0006-workload-policy-layer/README.md new file mode 100644 index 00000000..cef5f6d9 --- /dev/null +++ b/oeps/0006-workload-policy-layer/README.md @@ -0,0 +1,757 @@ +# OEP-0006: 工作负载策略层与统一工作负载管理 + + +- [概述](#概述) +- [动机](#动机) + - [目标](#目标) + - [非目标](#非目标) +- [提案](#提案) + - [用户故事](#用户故事) + - [注意事项与约束](#注意事项与约束) + - [风险与缓解措施](#风险与缓解措施) +- [设计细节](#设计细节) + - [架构概览](#架构概览) + - [核心接口设计](#核心接口设计) + - [策略选择机制](#策略选择机制) + - [RBG策略实现](#rbg策略实现) + - [组件配置提取](#组件配置提取) + - [测试计划](#测试计划) + - [成熟度标准](#成熟度标准) + - [Alpha阶段](#alpha阶段) + - [Beta阶段](#beta阶段) + - [GA阶段](#ga阶段) +- [实现历史](#实现历史) +- [缺点](#缺点) +- [备选方案](#备选方案) + + +## 概述 + +本提案引入了工作负载策略层(Workload Policy Layer),为OME提供统一的工作负载管理抽象。该架构通过策略模式将工作负载管理逻辑与角色调谐逻辑解耦,支持单角色独立部署(SingleComponent)和All-in-One工作负载部署(如RoleBasedGroup)两种模式。在保持原有角色工作负载独立调谐能力的前提下,新增了对RBG(RoleBasedGroup)工作负载类型的支持,实现多组件统一编排。 + +## 动机 + +在引入工作负载策略层之前,OME的工作负载部署逻辑紧耦合在各个角色的Reconciler中,每个组件独立创建Deployment、Service、Knative Service等资源。这种架构存在以下问题: + +1. **可运维性不足**:不支持镜像原地升级:现有工作负载在镜像更新时必须重建 Pod,导致服务中断或延迟,拖慢整体发布效率。 +缺少指定预热控制能力。 +2. **多角色协同能力弱**:缺乏跨角色统一编排:各角色独立调谐,无法感知彼此的版本、状态或依赖关系。在扩容或升级时,不能保证角色间的版本一致性或比例协调(比如“先升 A 再升 B”、“A:B = 2:1”等策略),容易引发兼容性问题或服务异常。 + +随着分布式高性能推理对不同推理角色协同需求的增加,我们需要在OME中引入一个All-in-One工作负载(RoleBasedGroup)的支持,以一个全局视角,更精细地调谐所有角色。 + +### 目标 + +- 建立清晰的工作负载策略层抽象,将工作负载管理与角色逻辑解耦 +- 支持多种工作负载策略的动态选择和切换 +- 实现RBG All-in-One工作负载类型的Alpha版本支持 +- 保持对现有单组件独立部署模式的完全向后兼容 +- 提供可扩展的框架,便于未来添加新的工作负载策略(如Grove等) +- 保持原有组件调谐逻辑不变,仅在工作负载编排层面进行抽象 + +### 非目标 + +- 完整实现RBG的所有功能特性(当前为Alpha版本) +- 支持微调模型在RBG模式下的部署 +- 实现RBG模式下对外服务暴露逻辑 +- 修改或重构现有组件的核心调谐逻辑 +- 实现工作负载的跨策略迁移工具 +- 提供工作负载策略的性能优化和资源调度优化 + +## 提案 + +我们提出引入工作负载策略层架构,通过以下关键组件实现统一的工作负载管理: + +1. **统一策略接口(WorkloadStrategy)**:定义所有工作负载策略必须实现的标准接口 +2. **策略管理器(WorkloadStrategyManager)**:负责策略的注册、选择和管理 +3. **组件配置提取器(ComponentConfigExtractor)**:标准化组件配置的提取机制 +4. **单组件策略(SingleComponentStrategy)**:保持现有单组件独立部署行为,作为默认策略 +5. **RBG策略(RBGStrategy)**:实现RoleBasedGroup All-in-One工作负载支持 + +### 用户故事 + +**故事1:作为平台运维人员**,我希望能够将多个推理组件(Engine、Decoder、Router)作为一个整体进行部署和管理,简化运维复杂度,并利用RBG提供的统一调度和资源管理能力。 + +**故事2:服务发布人员**,我希望能够尽快完成服务发布,降低实例由于磁盘准备、IP申请等环境准备动作导致的运维耗时。 + +**故事3:作为开发者**,我希望在不修改现有代码的情况下,通过配置注解即可切换不同的工作负载部署模式,测试不同策略的效果。 + +**故事4:作为系统架构师**,我希望能够方便地扩展新的工作负载类型(如Grove),而无需修改核心组件代码。 + +**故事5:作为现有用户**,我希望升级到新版本后,我的现有InferenceService能够无缝继续工作,不受新架构的影响。 + +### 注意事项与约束 + +1. **Alpha版本限制**:当前RBG策略实现仅支持基础功能,不支持微调模型更新和Ingress服务暴露 +2. **部署模式兼容性**:RBG策略仅支持RawDeployment和MultiNode部署模式,不支持Serverless模式 +3. **组件依赖关系**:所有组件必须实现ComponentConfigExtractor接口才能被RBG策略使用 +4. **注解驱动**:策略选择通过InferenceService的annotations实现,需要正确配置`ome.io/deploymentMode`注解 + +### 风险与缓解措施 + +**风险**:破坏现有用户的部署 +**缓解**:SingleComponentStrategy作为默认策略,确保未配置特殊注解的InferenceService行为不变;增加充分的单元测试和集成测试验证向后兼容性。 + +**风险**:工作负载策略选择错误导致部署失败 +**缓解**:在策略选择时进行严格的部署模式验证;提供清晰的错误信息和事件记录;实现策略适用性检查机制。 + +**风险**:RBG Alpha版本功能不完整影响用户体验 +**缓解**:明确文档说明Alpha版本的限制;提供清晰的功能支持矩阵;在Beta阶段补充完整功能。 + +**风险**:组件配置提取失败导致RBG创建错误 +**缓解**:实现健壮的错误处理和日志记录;提供配置验证机制;在组件接口中明确定义配置提取的契约。 + +## 设计细节 + +### 架构概览 + +工作负载策略层位于InferenceService Controller和具体资源创建之间,作为中间抽象层协调工作负载的部署策略。 + +```mermaid +graph TB + subgraph "InferenceService Controller" + ISVCReconciler[ISVC Reconciler] + StrategyManager[Strategy Manager] + end + + subgraph "Workload Strategy Layer" + SingleStrategy[Single Component Strategy] + RBGStrategy[RBG Strategy] + FutureStrategy[Future Strategies...] + end + + subgraph "Component Layer" + EngineComponent[Engine Component] + DecoderComponent[Decoder Component] + RouterComponent[Router Component] + ConfigExtractor[ComponentConfigExtractor] + end + + subgraph "Resource Reconcilers" + RBGReconciler[RBG Reconciler] + RawDeploymentReconciler[Deployment Reconciler] + KnativeReconciler[Knative Reconciler] + MultiNodeReconciler[MultiNode Reconciler] + HPAReconciler[HPA Reconciler] + PDBReconciler[PDB Reconciler] + RBACReconciler[RBAC Reconciler] + end + + subgraph "Kubernetes Resources" + RBGResource[RoleBasedGroup] + DeploymentResource[Deployment] + KnativeResource[Knative Service] + RayClusterResource[RayCluster] + HPAResource[HPA] + PDBResource[PDB] + end + + ISVCReconciler -->|Select Strategy| StrategyManager + StrategyManager -->|Return Strategy| SingleStrategy + StrategyManager -->|Return Strategy| RBGStrategy + + SingleStrategy -->|Reconcile Independently| EngineComponent + SingleStrategy -->|Reconcile Independently| DecoderComponent + SingleStrategy -->|Reconcile Independently| RouterComponent + + RBGStrategy -->|Extract Config| ConfigExtractor + ConfigExtractor -.->|Implemented by| EngineComponent + ConfigExtractor -.->|Implemented by| DecoderComponent + ConfigExtractor -.->|Implemented by| RouterComponent + + RBGStrategy -->|Create/Update| RBGReconciler + RBGStrategy -->|Create HPA| HPAReconciler + RBGStrategy -->|Create RBAC| RBACReconciler + + EngineComponent -->|Single Mode| RawDeploymentReconciler + EngineComponent -->|Single Mode| KnativeReconciler + DecoderComponent -->|Single Mode| MultiNodeReconciler + + RBGReconciler -->|Manage| RBGResource + RawDeploymentReconciler -->|Manage| DeploymentResource + KnativeReconciler -->|Manage| KnativeResource + MultiNodeReconciler -->|Manage| RayClusterResource + HPAReconciler -->|Manage| HPAResource + PDBReconciler -->|Manage| PDBResource +``` + +**核心设计原则**: + +1. **策略与组件分离**:工作负载策略专注于如何组织和编排组件,组件专注于自身的配置和资源定义 +2. **接口驱动**:通过明确的接口定义策略和组件之间的契约 +3. **可插拔架构**:新策略可通过实现标准接口并注册到管理器中轻松添加 +4. **向后兼容**:SingleComponentStrategy完全保留原有行为,作为默认策略 + +### 核心接口设计 + +#### WorkloadStrategy 接口 + +定义所有工作负载策略必须实现的统一接口: + +```go +type WorkloadStrategy interface { + // GetStrategyName 返回策略名称 + GetStrategyName() string + + // IsApplicable 判断该策略是否适用于当前InferenceService + // 通过检查annotations和其他条件来决定 + IsApplicable(isvc *v1beta1.InferenceService, annotations map[string]string) bool + + // ValidateDeploymentModes 验证部署模式是否被该策略支持 + // 不同策略支持的部署模式可能不同 + ValidateDeploymentModes(modes *ComponentDeploymentModes) error + + // ReconcileWorkload 执行工作负载调谐 + // 这是策略的核心方法,负责创建和更新工作负载资源 + ReconcileWorkload(ctx context.Context, request *WorkloadReconcileRequest) (ctrl.Result, error) +} +``` + +| 方法 | 说明 | 职责 | +|------|------|------| +| GetStrategyName | 返回策略名称 | 用于日志记录和识别 | +| IsApplicable | 判断策略是否适用 | 根据InferenceService的annotations和状态决定是否选择此策略 | +| ValidateDeploymentModes | 验证部署模式兼容性 | 检查组件的部署模式是否被该策略支持 | +| ReconcileWorkload | 执行工作负载调谐 | 核心方法,负责创建和更新工作负载资源 | + +#### ComponentConfigExtractor 接口 + +定义组件配置提取的标准接口,用于工作负载策略从角色组件获取必要配置: + +```go +type ComponentConfigExtractor interface { + // GetPodSpec 返回主PodSpec + GetPodSpec(isvc *v1beta1.InferenceService) (*corev1.PodSpec, error) + + // GetWorkerPodSpec 返回Worker PodSpec(MultiNode部署) + // 如果组件不支持worker pods则返回nil + GetWorkerPodSpec(isvc *v1beta1.InferenceService) (*corev1.PodSpec, error) + + // GetObjectMeta 返回组件的ObjectMeta(名称、标签、注解) + GetObjectMeta(isvc *v1beta1.InferenceService) (metav1.ObjectMeta, error) + + // GetComponentExtension 返回组件扩展配置 + GetComponentExtension() *v1beta1.ComponentExtensionSpec + + // GetWorkerSize 返回worker数量(MultiNode部署) + // 如果组件不支持workers则返回0 + GetWorkerSize() int +} +``` + +| 方法 | 说明 | 返回值 | +|------|------|--------| +| GetPodSpec | 获取主PodSpec | 用于RawDeployment模式或Leader节点 | +| GetWorkerPodSpec | 获取Worker PodSpec | 用于MultiNode模式的Worker节点 | +| GetObjectMeta | 获取元数据 | 包含名称、标签、注解等 | +| GetComponentExtension | 获取组件扩展配置 | 包含副本数、HPA、PDB等配置 | +| GetWorkerSize | 获取Worker数量 | 用于MultiNode模式 | + +#### WorkloadReconcileRequest 结构 + +封装工作负载调谐所需的完整上下文信息: + +```go +type WorkloadReconcileRequest struct { + // InferenceService实例 + InferenceService *v1beta1.InferenceService + + // 基础模型信息 + BaseModel *v1beta1.BaseModelSpec + BaseModelMeta *metav1.ObjectMeta + + // 运行时信息 + Runtime *v1beta1.ServingRuntimeSpec + RuntimeName string + + // 合并后的角色规格 + MergedEngine *v1beta1.EngineSpec + MergedDecoder *v1beta1.DecoderSpec + MergedRouter *v1beta1.RouterSpec + + // 部署模式配置 + DeploymentModes *ComponentDeploymentModes + + // 组件构建工厂 + ComponentBuilderFactory *components.ComponentBuilderFactory + + // 其他上下文信息... +} +``` + +| 字段 | 类型 | 说明 | +|------|------|------| +| InferenceService | *v1beta1.InferenceService | InferenceService实例 | +| BaseModel | *v1beta1.BaseModelSpec | 基础模型规格 | +| Runtime | *v1beta1.ServingRuntimeSpec | 运行时规格 | +| MergedEngine | *v1beta1.EngineSpec | 合并后的Engine配置 | +| MergedDecoder | *v1beta1.DecoderSpec | 合并后的Decoder配置 | +| MergedRouter | *v1beta1.RouterSpec | 合并后的Router配置 | +| DeploymentModes | *ComponentDeploymentModes | 各组件的部署模式 | +| ComponentBuilderFactory | *components.ComponentBuilderFactory | 组件构建工厂 | + +### 策略选择机制 + +策略选择流程在InferenceService Controller的Reconcile方法中执行: + +```mermaid +flowchart TD + Start[开始调谐] --> GetAnnotations[获取InferenceService Annotations] + GetAnnotations --> SelectStrategy[调用StrategyManager.SelectStrategy] + + SelectStrategy --> CheckRegistered{是否有已注册策略?} + CheckRegistered -->|否| Error1[返回错误: 无可用策略] + CheckRegistered -->|是| IterateStrategies[遍历所有策略] + + IterateStrategies --> IsApplicable{策略.IsApplicable?} + IsApplicable -->|是| ValidateMode[策略.ValidateDeploymentModes] + IsApplicable -->|否| NextStrategy[检查下一个策略] + + ValidateMode --> ValidResult{验证通过?} + ValidResult -->|是| StrategySelected[策略选定] + ValidResult -->|否| Error2[返回错误: 部署模式不兼容] + + NextStrategy --> MoreStrategies{还有其他策略?} + MoreStrategies -->|是| IsApplicable + MoreStrategies -->|否| Error3[返回错误: 无适用策略] + + StrategySelected --> ReconcileWorkload[调用策略.ReconcileWorkload] + ReconcileWorkload --> End[完成] + + Error1 --> End + Error2 --> End + Error3 --> End +``` + +**策略注册顺序**: + +策略的注册顺序决定了选择优先级。当前注册顺序为: + +1. RBGStrategy(优先级高) +2. SingleComponentStrategy(默认策略,优先级低) + +**适用性判断规则**: + +| 策略 | 适用条件 | 注解要求 | +|------|----------|----------| +| RBGStrategy | `ome.io/deploymentMode: RoleBasedGroup` | 必须显式指定 | +| SingleComponentStrategy | 未指定特殊部署模式 | 无需注解,作为默认策略 | + +### RBG策略实现 + +RBG策略将多个组件打包到一个RoleBasedGroup资源中进行统一管理。 + +#### RBG资源结构映射 + +InferenceService组件到RBG Role的映射关系: + +| InferenceService组件 | RBG Role名称 | 部署模式 | 备注 | +|---------------------|-------------|---------|------| +| Engine | engine | RawDeployment 或 MultiNode | 必选组件 | +| Decoder | decoder | RawDeployment 或 MultiNode | 可选组件,PD分离时使用 | +| Router | router | RawDeployment | 可选组件,路由层 | + +#### RBG调谐流程 + +```mermaid +sequenceDiagram + participant ISVCController as ISVC Controller + participant RBGStrategy as RBG Strategy + participant ComponentBuilder as Component Builder + participant ConfigExtractor as Config Extractor + participant RBGReconciler as RBG Reconciler + participant HPAReconciler as HPA Reconciler + participant K8sAPI as Kubernetes API + + ISVCController->>RBGStrategy: ReconcileWorkload(request) + + Note over RBGStrategy: 阶段1: 构建组件配置 + + RBGStrategy->>ComponentBuilder: CreateEngineComponent() + ComponentBuilder-->>RBGStrategy: engineComponent + RBGStrategy->>ConfigExtractor: GetPodSpec(isvc) + ConfigExtractor-->>RBGStrategy: enginePodSpec + RBGStrategy->>ConfigExtractor: GetObjectMeta(isvc) + ConfigExtractor-->>RBGStrategy: engineMetadata + RBGStrategy->>ConfigExtractor: GetComponentExtension() + ConfigExtractor-->>RBGStrategy: engineExtension + + Note over RBGStrategy: 对Decoder和Router重复上述过程 + + RBGStrategy->>ComponentBuilder: CreateDecoderComponent() + RBGStrategy->>ConfigExtractor: Extract decoder configs + RBGStrategy->>ComponentBuilder: CreateRouterComponent() + RBGStrategy->>ConfigExtractor: Extract router configs + + Note over RBGStrategy: 阶段2: 创建RBAC资源 + + RBGStrategy->>RBGStrategy: reconcileRBAC(componentConfigs) + RBGStrategy->>K8sAPI: Create/Update ServiceAccount, Role, RoleBinding + + Note over RBGStrategy: 阶段3: 调谐RBG资源 + + RBGStrategy->>RBGReconciler: Reconcile(isvc, componentConfigs) + RBGReconciler->>RBGReconciler: buildRoles(componentConfigs) + RBGReconciler->>K8sAPI: Get existing RBG + + alt RBG不存在 + RBGReconciler->>RBGReconciler: createRBG(isvc, roles) + RBGReconciler->>K8sAPI: Create RBG + else RBG已存在 + RBGReconciler->>RBGReconciler: updateRBG(existing, roles) + Note over RBGReconciler: 保留现有副本数配置 + RBGReconciler->>K8sAPI: Update RBG + end + + RBGReconciler-->>RBGStrategy: Result + + Note over RBGStrategy: 阶段4: 创建HPA资源 + + loop 为每个Role创建HPA + RBGStrategy->>HPAReconciler: Reconcile(hpa) + HPAReconciler->>K8sAPI: Create/Update HPA + end + + RBGStrategy-->>ISVCController: Result +``` + +#### ComponentConfig数据结构 + +用于在RBG策略和RBG Reconciler之间传递组件配置: + +```go +type ComponentConfig struct { + // ComponentType指定组件类型(Engine/Decoder/Router) + ComponentType v1beta1.ComponentType + + // DeploymentMode指定该组件的部署模式(RawDeployment或MultiNode) + DeploymentMode constants.DeploymentModeType + + // PodSpec用于RawDeployment模式或作为MultiNode模式的基础模板 + PodSpec *corev1.PodSpec + + // LeaderPodSpec用于MultiNode模式的leader节点 + LeaderPodSpec *corev1.PodSpec + + // WorkerPodSpec用于MultiNode模式的worker节点 + WorkerPodSpec *corev1.PodSpec + + // WorkerSize指定worker节点数量(MultiNode模式) + WorkerSize int + + // ComponentExtensionSpec包含副本数、扩缩容等配置 + ComponentExtensionSpec *v1beta1.ComponentExtensionSpec + + // ObjectMeta包含名称、标签、注解等元数据 + ObjectMeta metav1.ObjectMeta +} +``` + +| 字段 | 类型 | 说明 | +|------|------|------| +| ComponentType | v1beta1.ComponentType | 组件类型(Engine/Decoder/Router) | +| DeploymentMode | constants.DeploymentModeType | 部署模式(RawDeployment/MultiNode) | +| PodSpec | *corev1.PodSpec | 主PodSpec,用于RawDeployment或Leader | +| LeaderPodSpec | *corev1.PodSpec | Leader节点PodSpec,用于MultiNode | +| WorkerPodSpec | *corev1.PodSpec | Worker节点PodSpec,用于MultiNode | +| WorkerSize | int | Worker节点数量 | +| ComponentExtensionSpec | *v1beta1.ComponentExtensionSpec | 副本数、HPA、PDB等扩展配置 | +| ObjectMeta | metav1.ObjectMeta | 名称、标签、注解等元数据 | + +#### Role构建逻辑 + +RBG Reconciler根据ComponentConfig构建RoleSpec: + +**RawDeployment模式**: +- 创建单个Template +- 将PodSpec转换为PodTemplateSpec +- 配置副本数和标签 + +**MultiNode模式**: +- 创建LeaderWorkerSet配置 +- 设置Size = 1个Leader + N个Worker +- 分别构建Leader和Worker的PodTemplateSpec +- 配置Ray相关标签(ray.io/node-type) + +**副本数管理**: +- 初始创建时使用ComponentExtensionSpec.MinReplicas +- 更新时保留现有副本数(可能被HPA修改) +- HPA独立管理每个Role的副本数 + +### 组件配置提取 + +所有需要支持RBG部署的角色必须实现ComponentConfigExtractor接口。 + +#### 提取流程 + +```mermaid +flowchart LR + A[RBG Strategy] --> B{组件类型} + B -->|Engine| C[Engine Component] + B -->|Decoder| D[Decoder Component] + B -->|Router| E[Router Component] + + C --> F[GetPodSpec] + C --> G[GetWorkerPodSpec] + C --> H[GetObjectMeta] + C --> I[GetComponentExtension] + C --> J[GetWorkerSize] + + D --> F + D --> G + D --> H + D --> I + D --> J + + E --> F + E --> G + E --> H + E --> I + E --> J + + F --> K[ComponentConfig] + G --> K + H --> K + I --> K + J --> K + + K --> L[RBG Reconciler] +``` + +#### 配置提取示例 + +以Engine组件为例,配置提取包括: + +**PodSpec提取**: +- 容器定义(镜像、命令、参数) +- 资源请求和限制 +- 环境变量 +- 卷挂载 +- 节点选择器和容忍度 +- Affinity规则 + +**ObjectMeta提取**: +- 名称:基于InferenceService名称生成 +- 标签:包含组件类型、模型名称、运行时等 +- 注解:传递必要的配置信息 + +**ComponentExtension提取**: +- MinReplicas:最小副本数 +- MaxReplicas:最大副本数(用于HPA) +- HPA配置:指标类型、目标值等 +- PDB配置:最小可用数、最大不可用数 +- 资源策略 + +### 测试计划 + +#### 单元测试 + +**工作负载策略层**: +- `pkg/controller/v1beta1/inferenceservice/workload/manager_test.go`:策略管理器测试 + - 测试策略注册 + - 测试策略选择逻辑 + - 测试错误处理 +- `pkg/controller/v1beta1/inferenceservice/workload/single_component_strategy_test.go`:单组件策略测试 + - 测试适用性判断 + - 测试部署模式验证 + - 测试向后兼容性 +- `pkg/controller/v1beta1/inferenceservice/workload/rbg_strategy_test.go`:RBG策略测试 + - 测试适用性判断 + - 测试部署模式验证 + - 测试组件配置提取 + - 测试RBG资源创建和更新 + +**RBG Reconciler**: +- `pkg/controller/v1beta1/inferenceservice/reconcilers/rbg/rbg_reconciler_test.go`:RBG调谐逻辑测试 + - 测试RBG创建 + - 测试RBG更新(保留副本数) + - 测试错误处理 +- `pkg/controller/v1beta1/inferenceservice/reconcilers/rbg/role_builder_test.go`:Role构建逻辑测试 + - 测试RawDeployment模式Role构建 + - 测试MultiNode模式Role构建 + - 测试标签和注解传递 + +**组件配置提取**: +- 验证每个组件正确实现ComponentConfigExtractor接口 +- 测试配置提取的完整性和正确性 +- 测试错误场景处理 + +**测试覆盖率目标**:>80% + +#### 集成测试 + +**策略选择测试**: +- 验证默认情况下选择SingleComponentStrategy +- 验证配置RBG注解后选择RBGStrategy +- 验证部署模式不兼容时正确报错 + +**RBG部署测试**: +- 测试仅Engine组件的RBG部署 +- 测试Engine + Decoder的PD分离RBG部署 +- 测试Engine + Router的RBG部署 +- 测试Engine + Decoder + Router的完整RBG部署 + +**副本数管理测试**: +- 验证初始创建时使用正确的副本数 +- 验证更新时保留HPA修改的副本数 +- 验证HPA正确管理各Role的副本数 + +**向后兼容性测试**: +- 验证现有InferenceService在升级后继续正常工作 +- 验证所有原有部署模式功能完整保留 + +#### 端到端测试 + +**真实场景测试**: +- 部署实际的推理服务(如LLaMA模型) +- 测试RBG模式下的流量路由 +- 验证扩缩容行为 +- 测试故障恢复能力 + +**性能测试**: +- 对比SingleComponent和RBG模式的性能差异 +- 验证RBG模式不引入显著性能开销 + +### 成熟度标准 + +#### Alpha阶段 + +**功能范围**: +- ✅ 工作负载策略层核心接口定义 +- ✅ SingleComponentStrategy实现(保持现有行为) +- ✅ RBGStrategy基础实现 +- ✅ 支持Engine、Decoder、Router组件的RBG部署 +- ✅ 支持RawDeployment和MultiNode部署模式 +- ✅ ComponentConfigExtractor接口实现 +- ✅ RBG Reconciler实现 +- ✅ HPA资源管理(每个Role独立HPA) +- ✅ RBAC资源管理 + +**测试要求**: +- 单元测试覆盖率 >70% +- 基础集成测试通过 + +**文档要求**: +- 架构设计文档 +- 接口定义文档 +- Alpha版本限制说明 + +**已知限制**: +- ❌ 不支持微调模型 +- ❌ 不支持模型更新 +- ❌ 不支持Ingress服务暴露 + +#### Beta阶段 + +**功能增强**: +- 支持微调模型在RBG模式下的部署 +- 实现Ingress服务暴露 +- 优化策略选择逻辑 + +**测试要求**: +- 单元测试覆盖率 >80% +- 完整的集成测试套件 +- 端到端测试覆盖主要场景 + +**文档要求**: +- 用户使用指南 +- 策略扩展开发指南 + +**稳定性要求**: +- 在生产环境中试运行 +- 收集用户反馈并修复问题 + +#### GA阶段 + +**功能完善**: +- 所有计划功能完全实现 +- 完善的错误处理和恢复机制 +- 完整的可观测性支持(指标、日志、事件) + +**测试要求**: +- 单元测试覆盖率 >85% +- 完整的测试套件覆盖所有场景 + +**文档要求**: +- 完整的产品文档 +- API参考文档 +- 迁移指南 + +**生产就绪**: +- 在多个生产环境验证 +- 性能满足生产要求 +- 稳定性满足SLA要求 + +## 实现历史 + +- 2025-01-13:提案初稿创建 +- 2025-01-13:工作负载策略层核心接口设计完成 +- 2025-01-13:SingleComponentStrategy实现完成(保持现有行为) +- 2025-01-13:RBGStrategy Alpha版本实现完成 +- 2025-01-13:ComponentConfigExtractor接口定义和实现 +- 2025-01-13:RBG Reconciler实现完成 +- 2025-01-13:Alpha版本集成到主分支 + +## 缺点 + +1. **架构复杂度增加**:引入新的抽象层增加了系统整体复杂度,开发者需要理解策略层的概念 +2. **学习曲线**:组件开发者需要学习和实现ComponentConfigExtractor接口 +3. **调试难度**:工作负载问题的调试需要在策略层和组件层之间定位,增加了排查难度 +4. **Alpha版本功能限制**:当前RBG实现不支持微调模型和Ingress,限制了使用场景 +5. **测试维护成本**:需要同时维护策略层和组件层的测试,增加了测试工作量 +6. **潜在的性能开销**:额外的抽象层可能引入轻微的性能开销 + +## 备选方案 + +### 方案1:继续单组件独立部署模式 + +**描述**:不引入工作负载策略层,继续在每个组件中独立管理部署逻辑。 + +**优点**: +- 架构简单,易于理解 +- 无需额外的抽象层 +- 现有代码无需修改 + +**缺点**: +- 无法支持All-in-One工作负载类型(如RBG) +- 扩展新的工作负载策略需要修改核心代码 +- 资源管理缺乏全局视图 + +**拒绝原因**:无法满足对RBG等统一工作负载管理的需求,限制了系统的扩展性。 + +### 方案2:直接在Controller中硬编码RBG逻辑 + +**描述**:在InferenceService Controller中通过if-else分支直接判断是否使用RBG部署。 + +**优点**: +- 实现简单直接 +- 无需设计复杂的接口和抽象 +- 快速交付功能 + +**缺点**: +- 违反开闭原则,每增加新的工作负载类型都需要修改Controller代码 +- 代码可维护性差,容易产生技术债务 +- 难以测试和验证 +- 扩展性极差 + +**拒绝原因**:虽然实现快速,但会导致严重的技术债务,不利于长期维护和扩展。 + +### 方案3:完全重构组件架构 + +**描述**:彻底重构组件的设计,将所有工作负载逻辑完全抽离到策略层,组件仅提供配置。 + +**优点**: +- 架构最清晰 +- 组件和工作负载完全解耦 +- 最大化的灵活性 + +**缺点**: +- 重构成本极高,需要大量时间 +- 风险大,可能引入回归问题 +- 对现有用户影响大,可能破坏兼容性 +- 需要长期的迁移过程 + +**拒绝原因**:重构成本和风险过高,且对现有系统影响过大,不适合当前阶段。 + +--- + +**最终选择**:本提案采用工作负载策略层架构,在保持现有系统稳定性的前提下,通过清晰的抽象和接口设计,实现了对RBG等新型工作负载的支持,并为未来的扩展提供了良好的基础。 diff --git a/oeps/0006-workload-policy-layer/oep.yaml b/oeps/0006-workload-policy-layer/oep.yaml new file mode 100644 index 00000000..973d0529 --- /dev/null +++ b/oeps/0006-workload-policy-layer/oep.yaml @@ -0,0 +1,24 @@ +title: Workload Policy Layer and Unified Workload Management +oep-number: 0006 +authors: + - "@bcfre" +status: implementable +creation-date: 2025-01-13 +reviewers: + - TBD +approvers: + - TBD + +see-also: +replaces: +superseded-by: + +# The target maturity stage. Valid values are "alpha", "beta", "stable" +stage: alpha + +# Metrics to monitor feature adoption and performance +metrics: + - workload-strategy-selection-count + - rbg-deployment-count + - strategy-reconcile-duration + - component-config-extraction-errors diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 7a937666..213de84f 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -420,12 +420,13 @@ const ( PDDisaggregated DeploymentModeType = "PDDisaggregated" MultiNode DeploymentModeType = "MultiNode" VirtualDeployment DeploymentModeType = "VirtualDeployment" + RoleBasedGroup DeploymentModeType = "RoleBasedGroup" ) // IsValid checks if the deployment mode is valid func (d DeploymentModeType) IsValid() bool { switch d { - case Serverless, RawDeployment, MultiNodeRayVLLM, MultiNode, VirtualDeployment: + case Serverless, RawDeployment, MultiNodeRayVLLM, MultiNode, VirtualDeployment, RoleBasedGroup: return true default: return false @@ -459,6 +460,7 @@ const ( KEDAScaledObjectKind = "ScaledObject" VolcanoJobKind = "Job" LWSKind = "LeaderWorkerSet" + RBGKind = "RoleBasedGroup" GatewayKind = "Gateway" ServiceKind = "Service" ) diff --git a/pkg/controller/v1beta1/inferenceservice/components/component.go b/pkg/controller/v1beta1/inferenceservice/components/component.go index 25bbee40..1de37e69 100644 --- a/pkg/controller/v1beta1/inferenceservice/components/component.go +++ b/pkg/controller/v1beta1/inferenceservice/components/component.go @@ -1,6 +1,7 @@ package components import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -28,6 +29,27 @@ type ComponentConfig interface { ValidateSpec() error } +// ComponentConfigExtractor defines methods for extracting configuration from components +// This is used by all in one workload strategies (e.g., RBGStrategy) to obtain component configurations +type ComponentConfigExtractor interface { + // GetPodSpec returns the main PodSpec for this component + GetPodSpec(isvc *v1beta1.InferenceService) (*corev1.PodSpec, error) + + // GetWorkerPodSpec returns the Worker PodSpec for MultiNode deployments + // Returns nil for components that don't support worker pods + GetWorkerPodSpec(isvc *v1beta1.InferenceService) (*corev1.PodSpec, error) + + // GetObjectMeta returns the ObjectMeta (name, labels, annotations) for this component + GetObjectMeta(isvc *v1beta1.InferenceService) (metav1.ObjectMeta, error) + + // GetComponentExtension returns the component extension configuration + GetComponentExtension() *v1beta1.ComponentExtensionSpec + + // GetWorkerSize returns the worker size for MultiNode deployments + // Returns 0 for components that don't support workers + GetWorkerSize() int +} + // PodSpecProvider defines the interface for providing pod specifications type PodSpecProvider interface { // GetPodSpec returns the pod spec for the component diff --git a/pkg/controller/v1beta1/inferenceservice/components/decoder.go b/pkg/controller/v1beta1/inferenceservice/components/decoder.go index dfe2e83f..5615d4d7 100644 --- a/pkg/controller/v1beta1/inferenceservice/components/decoder.go +++ b/pkg/controller/v1beta1/inferenceservice/components/decoder.go @@ -399,3 +399,44 @@ func (d *Decoder) ValidateSpec() error { // Add more validation logic as needed return nil } + +// ============ ComponentConfigExtractor Interface Implementation ============ + +// GetPodSpec implements ComponentConfigExtractor interface +// Returns the main PodSpec for the Decoder component +func (d *Decoder) GetPodSpec(isvc *v1beta1.InferenceService) (*v1.PodSpec, error) { + objectMeta, err := d.reconcileObjectMeta(isvc) + if err != nil { + return nil, err + } + return d.reconcilePodSpec(isvc, &objectMeta) +} + +// GetWorkerPodSpec implements ComponentConfigExtractor interface +// Returns the Worker PodSpec for MultiNode deployments +func (d *Decoder) GetWorkerPodSpec(isvc *v1beta1.InferenceService) (*v1.PodSpec, error) { + objectMeta, err := d.reconcileObjectMeta(isvc) + if err != nil { + return nil, err + } + return d.reconcileWorkerPodSpec(isvc, &objectMeta) +} + +// GetObjectMeta implements ComponentConfigExtractor interface +// Returns the ObjectMeta (name, labels, annotations) for the Decoder component +func (d *Decoder) GetObjectMeta(isvc *v1beta1.InferenceService) (metav1.ObjectMeta, error) { + return d.reconcileObjectMeta(isvc) +} + +// GetComponentExtension implements ComponentConfigExtractor interface +// Returns the component extension configuration +func (d *Decoder) GetComponentExtension() *v1beta1.ComponentExtensionSpec { + if d.decoderSpec == nil { + return nil + } + return &d.decoderSpec.ComponentExtensionSpec +} + +func (d *Decoder) GetWorkerSize() int { + return d.getWorkerSize() +} diff --git a/pkg/controller/v1beta1/inferenceservice/components/engine.go b/pkg/controller/v1beta1/inferenceservice/components/engine.go index d86fc511..6019021b 100644 --- a/pkg/controller/v1beta1/inferenceservice/components/engine.go +++ b/pkg/controller/v1beta1/inferenceservice/components/engine.go @@ -394,3 +394,42 @@ func (e *Engine) ValidateSpec() error { // Add more validation logic as needed return nil } + +// GetPodSpec implements ComponentConfigExtractor interface +// Returns the main PodSpec for the Engine component +func (e *Engine) GetPodSpec(isvc *v1beta1.InferenceService) (*v1.PodSpec, error) { + objectMeta, err := e.reconcileObjectMeta(isvc) + if err != nil { + return nil, err + } + return e.reconcilePodSpec(isvc, &objectMeta) +} + +// GetWorkerPodSpec implements ComponentConfigExtractor interface +// Returns the Worker PodSpec for MultiNode deployments +func (e *Engine) GetWorkerPodSpec(isvc *v1beta1.InferenceService) (*v1.PodSpec, error) { + objectMeta, err := e.reconcileObjectMeta(isvc) + if err != nil { + return nil, err + } + return e.reconcileWorkerPodSpec(isvc, &objectMeta) +} + +// GetObjectMeta implements ComponentConfigExtractor interface +// Returns the ObjectMeta (name, labels, annotations) for the Engine component +func (e *Engine) GetObjectMeta(isvc *v1beta1.InferenceService) (metav1.ObjectMeta, error) { + return e.reconcileObjectMeta(isvc) +} + +// GetComponentExtension implements ComponentConfigExtractor interface +// Returns the component extension configuration +func (e *Engine) GetComponentExtension() *v1beta1.ComponentExtensionSpec { + if e.engineSpec == nil { + return nil + } + return &e.engineSpec.ComponentExtensionSpec +} + +func (e *Engine) GetWorkerSize() int { + return e.getWorkerSize() +} diff --git a/pkg/controller/v1beta1/inferenceservice/components/router.go b/pkg/controller/v1beta1/inferenceservice/components/router.go index ae58953a..cf7e9b72 100644 --- a/pkg/controller/v1beta1/inferenceservice/components/router.go +++ b/pkg/controller/v1beta1/inferenceservice/components/router.go @@ -288,3 +288,42 @@ func (r *Router) ValidateSpec() error { // Add more validation logic as needed return nil } + +// ============ ComponentConfigExtractor Interface Implementation ============ + +// GetPodSpec implements ComponentConfigExtractor interface +// Returns the main PodSpec for the Router component +func (r *Router) GetPodSpec(isvc *v1beta1.InferenceService) (*v1.PodSpec, error) { + objectMeta, err := r.reconcileObjectMeta(isvc) + if err != nil { + return nil, err + } + return r.reconcilePodSpec(isvc, &objectMeta) +} + +// GetWorkerPodSpec implements ComponentConfigExtractor interface +// Router does not support worker pods, always returns nil +func (r *Router) GetWorkerPodSpec(isvc *v1beta1.InferenceService) (*v1.PodSpec, error) { + return nil, nil +} + +// GetObjectMeta implements ComponentConfigExtractor interface +// Returns the ObjectMeta (name, labels, annotations) for the Router component +func (r *Router) GetObjectMeta(isvc *v1beta1.InferenceService) (metav1.ObjectMeta, error) { + return r.reconcileObjectMeta(isvc) +} + +// GetComponentExtension implements ComponentConfigExtractor interface +// Returns the component extension configuration +func (r *Router) GetComponentExtension() *v1beta1.ComponentExtensionSpec { + if r.routerSpec == nil { + return nil + } + return &r.routerSpec.ComponentExtensionSpec +} + +// GetWorkerSize implements ComponentConfigExtractor interface +// Router does not support workers, always returns 0 +func (r *Router) GetWorkerSize() int { + return 0 +} diff --git a/pkg/controller/v1beta1/inferenceservice/controller.go b/pkg/controller/v1beta1/inferenceservice/controller.go index 87216112..361493a6 100644 --- a/pkg/controller/v1beta1/inferenceservice/controller.go +++ b/pkg/controller/v1beta1/inferenceservice/controller.go @@ -8,6 +8,7 @@ import ( policyv1 "k8s.io/api/policy/v1" + rbg "github.com/bcfre/rbg-api/api/workloads/v1alpha1" "github.com/go-logr/logr" kedav1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/pkg/errors" @@ -44,6 +45,7 @@ import ( multimodelconfig "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/reconcilers/modelconfig" "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/status" isvcutils "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/utils" + "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/workload" "github.com/sgl-project/ome/pkg/runtimeselector" "github.com/sgl-project/ome/pkg/utils" ) @@ -90,6 +92,9 @@ import ( // +kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/status,verbs=get;update;patch // +kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/finalizers,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=workloads.x-k8s.io,resources=rolebasedgroups,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=workloads.x-k8s.io,resources=rolebasedgroups/status,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=workloads.x-k8s.io,resources=rolebasedgroups/finalizers,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;patch;delete // InferenceServiceState describes the Readiness of the InferenceService @@ -112,6 +117,7 @@ type InferenceServiceReconciler struct { StatusManager *status.StatusReconciler RuntimeSelector runtimeselector.Selector AcceleratorClassSelector acceleratorclassselector.Selector + StrategyManager *workload.WorkloadStrategyManager } func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -202,9 +208,6 @@ func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Req // for NewInferenceServicesConfig. We will use that existing isvcConfig. componentBuilderFactory := components.NewComponentBuilderFactory(r.Client, r.Clientset, r.Scheme, isvcConfig) - // Determine which components to reconcile based on the spec - var reconcilers []components.Component - // Migrate predictor spec to new architecture if needed if err := r.migratePredictorToNewArchitecture(isvc); err != nil { r.Log.Error(err, "Failed to migrate predictor spec", "namespace", isvc.Namespace, "inferenceService", isvc.Name) @@ -281,79 +284,78 @@ func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Req r.Log.Info("PD-disaggregated deployment detected", "namespace", isvc.Namespace, "inferenceService", isvc.Name) } - // Step 5: Create reconcilers based on merged specs + // Step 5: Select workload strategy + strategy, err := r.StrategyManager.SelectStrategy(isvc, annotations) + if err != nil { + r.Log.Error(err, "Failed to select workload strategy") + r.Recorder.Eventf(isvc, v1.EventTypeWarning, "StrategySelectionFailed", "Failed to select workload strategy: %v", err) + return reconcile.Result{}, err + } + + r.Log.Info("Selected workload strategy", + "strategy", strategy.GetStrategyName(), + "namespace", isvc.Namespace, + "inferenceService", isvc.Name) + + // Step 6: Validate deployment modes + deploymentModes := &workload.ComponentDeploymentModes{ + Engine: engineDeploymentMode, + Decoder: decoderDeploymentMode, + Router: routerDeploymentMode, + } + + if err := strategy.ValidateDeploymentModes(deploymentModes); err != nil { + r.Log.Error(err, "Deployment mode validation failed") + r.Recorder.Eventf(isvc, v1.EventTypeWarning, "InvalidDeploymentMode", err.Error()) + return reconcile.Result{}, err + } + + // Step 7: Get AcceleratorClass and SupportedModelFormat for each component + var engineAC *v1beta1.AcceleratorClassSpec + var engineAcName string + var engineSupportedModelFormats *v1beta1.SupportedModelFormat if mergedEngine != nil { - engineAC, engineAcName, err := r.AcceleratorClassSelector.GetAcceleratorClass(ctx, isvc, rt, v1beta1.EngineComponent) + engineAC, engineAcName, err = r.AcceleratorClassSelector.GetAcceleratorClass(ctx, isvc, rt, v1beta1.EngineComponent) if err != nil { r.Log.Error(err, "Failed to get accelerator class for engine component", "Name", isvc.Name) r.Recorder.Eventf(isvc, v1.EventTypeWarning, "AcceleratorClassError", "Failed to get accelerator class for engine: %v", err) return reconcile.Result{}, err } - engineSupportedModelFormats := r.RuntimeSelector.GetSupportedModelFormat(ctx, rt, baseModel, userSpecifiedRuntime) - r.Log.Info("Creating engine reconciler", - "deploymentMode", engineDeploymentMode, - "namespace", isvc.Namespace, - "inferenceService", isvc.Name, - "acceleratorClass", engineAcName) - - engineReconciler := componentBuilderFactory.CreateEngineComponent( - engineDeploymentMode, - baseModel, - baseModelMeta, - mergedEngine, - rt, - rtName, - engineSupportedModelFormats, - engineAC, - engineAcName, - ) - reconcilers = append(reconcilers, engineReconciler) + engineSupportedModelFormats = r.RuntimeSelector.GetSupportedModelFormat(ctx, rt, baseModel, userSpecifiedRuntime) } + var decoderAC *v1beta1.AcceleratorClassSpec + var decoderAcName string + var decoderSupportedModelFormats *v1beta1.SupportedModelFormat if mergedDecoder != nil { - decoderAC, decoderAcName, err := r.AcceleratorClassSelector.GetAcceleratorClass(ctx, isvc, rt, v1beta1.DecoderComponent) + decoderAC, decoderAcName, err = r.AcceleratorClassSelector.GetAcceleratorClass(ctx, isvc, rt, v1beta1.DecoderComponent) if err != nil { r.Log.Error(err, "Failed to get accelerator class for decoder component", "Name", isvc.Name) r.Recorder.Eventf(isvc, v1.EventTypeWarning, "AcceleratorClassError", "Failed to get accelerator class for decoder: %v", err) return reconcile.Result{}, err } - decoderSupportedModelFormats := r.RuntimeSelector.GetSupportedModelFormat(ctx, rt, baseModel, userSpecifiedRuntime) - r.Log.Info("Creating decoder reconciler", - "deploymentMode", decoderDeploymentMode, - "namespace", isvc.Namespace, - "inferenceService", isvc.Name, - "acceleratorClass", decoderAcName) - - decoderReconciler := componentBuilderFactory.CreateDecoderComponent( - decoderDeploymentMode, - baseModel, - baseModelMeta, - mergedDecoder, - rt, - rtName, - decoderSupportedModelFormats, - decoderAC, - decoderAcName, - ) - reconcilers = append(reconcilers, decoderReconciler) - } - - // Add Router reconciler if merged router spec exists (using new v2 Router) - if mergedRouter != nil { - r.Log.Info("Creating router reconciler", - "deploymentMode", routerDeploymentMode, // Using the determined router deployment mode - "namespace", isvc.Namespace, - "inferenceService", isvc.Name) - - routerReconciler := componentBuilderFactory.CreateRouterComponent( - routerDeploymentMode, // Using the determined router deployment mode - baseModel, - baseModelMeta, - mergedRouter, // Using the merged router spec instead of isvc.Spec.Router - rt, - rtName, - ) - reconcilers = append(reconcilers, routerReconciler) + decoderSupportedModelFormats = r.RuntimeSelector.GetSupportedModelFormat(ctx, rt, baseModel, userSpecifiedRuntime) + } + + // Step 7: Build WorkloadReconcileRequest + request := &workload.WorkloadReconcileRequest{ + InferenceService: isvc, + BaseModel: baseModel, + BaseModelMeta: baseModelMeta, + Runtime: rt, + RuntimeName: rtName, + MergedEngine: mergedEngine, + MergedDecoder: mergedDecoder, + MergedRouter: mergedRouter, + DeploymentModes: deploymentModes, + ComponentBuilderFactory: componentBuilderFactory, + UserSpecifiedRuntime: userSpecifiedRuntime, + EngineAcceleratorClass: engineAC, + EngineAcceleratorClassName: engineAcName, + DecoderAcceleratorClass: decoderAC, + DecoderAcceleratorClassName: decoderAcName, + EngineSupportedModelFormat: engineSupportedModelFormats, + DecoderSupportedModelFormat: decoderSupportedModelFormats, } // Determine the correct ingress deployment mode using the same logic as ingress reconciler @@ -366,24 +368,28 @@ func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Req ingressDeploymentMode = engineDeploymentMode } - r.Log.Info("Determined ingress deployment mode", + r.Log.Info("Workload reconciliation completed", + "strategy", strategy.GetStrategyName(), "ingressDeploymentMode", ingressDeploymentMode, "namespace", isvc.Namespace, "inferenceService", isvc.Name) - // Step 6: Run all reconcilers - for _, reconciler := range reconcilers { - result, err := reconciler.Reconcile(isvc) - if err != nil { - r.Log.Error(err, "Failed to reconcile component", - "component", fmt.Sprintf("%T", reconciler), - "namespace", isvc.Namespace, - "inferenceService", isvc.Name) - return result, err - } - if result.Requeue || result.RequeueAfter > 0 { - return result, nil - } + // Step 8: Execute workload reconciliation + result, err = strategy.ReconcileWorkload(ctx, request) + if err != nil { + r.Log.Error(err, "Failed to reconcile workload") + r.Recorder.Eventf(isvc, v1.EventTypeWarning, "WorkloadReconcileFailed", "Failed to reconcile workload: %v", err) + return reconcile.Result{}, err + } + + // If requeue is needed, return immediately + if result.Requeue || result.RequeueAfter > 0 { + r.Log.Info("Workload reconciliation requires requeue", + "namespace", isvc.Namespace, + "inferenceService", isvc.Name, + "requeue", result.Requeue, + "requeueAfter", result.RequeueAfter) + return result, nil } // Now reconcile ingress and external service after components have created their services @@ -615,6 +621,23 @@ func (r *InferenceServiceReconciler) SetupWithManager(mgr ctrl.Manager, deployCo // Initialize AcceleratorClassSelector r.AcceleratorClassSelector = acceleratorclassselector.New(mgr.GetClient()) + // Initialize WorkloadStrategyManager + r.StrategyManager = workload.NewWorkloadStrategyManager(r.Log) + + // Register RBG Strategy + rbgStrategy := workload.NewRBGStrategy(mgr.GetClient(), r.Clientset, mgr.GetScheme(), r.Log) + if err := r.StrategyManager.RegisterStrategy(rbgStrategy); err != nil { + return err + } + + // Register SingleComponent Strategy (as default, registered last) + singleStrategy := workload.NewSingleComponentStrategy(r.Log) + if err := r.StrategyManager.RegisterStrategy(singleStrategy); err != nil { + return err + } + + r.Log.Info("Registered workload strategies", "strategies", r.StrategyManager.ListStrategies()) + ksvcFound, err := utils.IsCrdAvailable(r.ClientConfig, knservingv1.SchemeGroupVersion.String(), constants.KnativeServiceKind) if err != nil { return err @@ -635,6 +658,12 @@ func (r *InferenceServiceReconciler) SetupWithManager(mgr ctrl.Manager, deployCo return err } + // 检查RBG CRD是否可用 + rbgFound, err := utils.IsCrdAvailable(r.ClientConfig, rbg.SchemeGroupVersion.String(), "RoleBasedGroup") + if err != nil { + return err + } + kedaFound, err := utils.IsCrdAvailable(r.ClientConfig, kedav1.SchemeGroupVersion.String(), constants.KEDAScaledObjectKind) if err != nil { return err @@ -674,6 +703,12 @@ func (r *InferenceServiceReconciler) SetupWithManager(mgr ctrl.Manager, deployCo r.Log.Info("The InferenceService controller won't watch leaderworkerset.x-k8s.io/v1/LeaderWorkerSet resources because the CRD is not available.") } + if rbgFound { + ctrlBuilder = ctrlBuilder.Owns(&rbg.RoleBasedGroup{}) + } else { + r.Log.Info("The InferenceService controller won't watch RoleBasedGroup resources because the CRD is not available.") + } + if vsFound && !ingressConfig.DisableIstioVirtualHost { ctrlBuilder = ctrlBuilder.Owns(&istioclientv1beta1.VirtualService{}) } else { diff --git a/pkg/controller/v1beta1/inferenceservice/reconcilers/hpa/hpa_reconciler.go b/pkg/controller/v1beta1/inferenceservice/reconcilers/hpa/hpa_reconciler.go index f0727cc7..01f19e5e 100644 --- a/pkg/controller/v1beta1/inferenceservice/reconcilers/hpa/hpa_reconciler.go +++ b/pkg/controller/v1beta1/inferenceservice/reconcilers/hpa/hpa_reconciler.go @@ -49,18 +49,17 @@ func createHPA(componentMeta metav1.ObjectMeta, maxReplicas := calculateMaxReplicas(componentExt, minReplicas) metrics := getHPAMetrics(componentMeta, componentExt) + // 根据部署模式确定ScaleTargetRef + scaleTargetRef := getScaleTargetRef(componentMeta) + return &autoscalingv2.HorizontalPodAutoscaler{ ObjectMeta: componentMeta, Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ - ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{ - APIVersion: "apps/v1", - Kind: "Deployment", - Name: componentMeta.Name, - }, - MinReplicas: &minReplicas, - MaxReplicas: maxReplicas, - Metrics: metrics, - Behavior: &autoscalingv2.HorizontalPodAutoscalerBehavior{}, + ScaleTargetRef: scaleTargetRef, + MinReplicas: &minReplicas, + MaxReplicas: maxReplicas, + Metrics: metrics, + Behavior: &autoscalingv2.HorizontalPodAutoscalerBehavior{}, }, } } @@ -72,6 +71,30 @@ func calculateMinReplicas(componentExt *v1beta1.ComponentExtensionSpec) int32 { return int32(*componentExt.MinReplicas) } +// getScaleTargetRef 根据部署模式返回HPA的ScaleTargetRef +// 如果是RoleBasedGroup模式,返回RoleBasedGroupScalingAdapter +// 否则返回默认的Deployment +func getScaleTargetRef(componentMeta metav1.ObjectMeta) autoscalingv2.CrossVersionObjectReference { + // 检查是否为RoleBasedGroup部署模式 + if deploymentMode, exists := componentMeta.Annotations[constants.DeploymentMode]; exists { + if deploymentMode == string(constants.RoleBasedGroup) { + // RoleBasedGroup模式使用RoleBasedGroupScalingAdapter + return autoscalingv2.CrossVersionObjectReference{ + APIVersion: "workloads.x-k8s.io/v1alpha1", + Kind: "RoleBasedGroupScalingAdapter", + Name: componentMeta.Name, + } + } + } + + // 默认使用Deployment + return autoscalingv2.CrossVersionObjectReference{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: componentMeta.Name, + } +} + func calculateMaxReplicas(componentExt *v1beta1.ComponentExtensionSpec, minReplicas int32) int32 { maxReplicas := int32(componentExt.MaxReplicas) if maxReplicas < minReplicas { diff --git a/pkg/controller/v1beta1/inferenceservice/reconcilers/hpa/hpa_reconciler_test.go b/pkg/controller/v1beta1/inferenceservice/reconcilers/hpa/hpa_reconciler_test.go index ce674273..0b4957e8 100644 --- a/pkg/controller/v1beta1/inferenceservice/reconcilers/hpa/hpa_reconciler_test.go +++ b/pkg/controller/v1beta1/inferenceservice/reconcilers/hpa/hpa_reconciler_test.go @@ -86,6 +86,24 @@ func TestCreateHPA(t *testing.T) { ScaleMetric: &memoryResource, }, }, + "rbgdeploymenthpa": { + objectMeta: metav1.ObjectMeta{ + Name: "rbg-engine", + Namespace: "rbg-namespace", + Annotations: map[string]string{ + constants.DeploymentMode: string(constants.RoleBasedGroup), + }, + Labels: map[string]string{ + "component": "engine", + }, + }, + componentExt: &v1beta1.ComponentExtensionSpec{ + MinReplicas: isvc.GetIntReference(2), + MaxReplicas: 8, + ScaleTarget: isvc.GetIntReference(70), + ScaleMetric: &cpuResource, + }, + }, } defaultminreplicas := int32(1) @@ -94,6 +112,8 @@ func TestCreateHPA(t *testing.T) { igutilization := int32(30) predictorminreplicas := int32(5) predictorutilization := int32(50) + rbgminreplicas := int32(2) + rbgutilization := int32(70) expectedHPASpecs := map[string]*autoscalingv2.HorizontalPodAutoscaler{ "igdefaulthpa": { @@ -194,6 +214,31 @@ func TestCreateHPA(t *testing.T) { Behavior: &autoscalingv2.HorizontalPodAutoscalerBehavior{}, }, }, + "rbgdeploymenthpa": { + ObjectMeta: testInput["rbgdeploymenthpa"].objectMeta, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{ + APIVersion: "workloads.x-k8s.io/v1alpha1", + Kind: "RoleBasedGroupScalingAdapter", + Name: testInput["rbgdeploymenthpa"].objectMeta.Name, + }, + MinReplicas: &rbgminreplicas, + MaxReplicas: 8, + Metrics: []autoscalingv2.MetricSpec{ + { + Type: autoscalingv2.ResourceMetricSourceType, + Resource: &autoscalingv2.ResourceMetricSource{ + Name: v1.ResourceName("cpu"), + Target: autoscalingv2.MetricTarget{ + Type: "Utilization", + AverageUtilization: &rbgutilization, + }, + }, + }, + }, + Behavior: &autoscalingv2.HorizontalPodAutoscalerBehavior{}, + }, + }, } tests := []struct { @@ -241,6 +286,14 @@ func TestCreateHPA(t *testing.T) { }, expected: expectedHPASpecs["predictordefaulthpa"], }, + { + name: "rbg deployment mode hpa", + args: args{ + objectMeta: testInput["rbgdeploymenthpa"].objectMeta, + componentExt: testInput["rbgdeploymenthpa"].componentExt, + }, + expected: expectedHPASpecs["rbgdeploymenthpa"], + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/controller/v1beta1/inferenceservice/reconcilers/keda/keda_reconciler.go b/pkg/controller/v1beta1/inferenceservice/reconcilers/keda/keda_reconciler.go index c6a8c59b..72c83adf 100644 --- a/pkg/controller/v1beta1/inferenceservice/reconcilers/keda/keda_reconciler.go +++ b/pkg/controller/v1beta1/inferenceservice/reconcilers/keda/keda_reconciler.go @@ -64,6 +64,8 @@ func createScaledObject( maxReplicas := calculateMaxReplicas(componentExt, minReplicas) triggers := getScaledObjectTriggers(componentMeta, inferenceServiceSpec) + scaleTargetRef := getScaleTargetRef(componentMeta) + return &kedav1.ScaledObject{ ObjectMeta: metav1.ObjectMeta{ Name: utils.GetScaledObjectName(componentMeta.Name), @@ -72,11 +74,7 @@ func createScaledObject( Annotations: componentMeta.Annotations, }, Spec: kedav1.ScaledObjectSpec{ - ScaleTargetRef: &kedav1.ScaleTarget{ - APIVersion: "apps/v1", - Kind: "Deployment", - Name: componentMeta.Name, - }, + ScaleTargetRef: &scaleTargetRef, MinReplicaCount: &minReplicas, MaxReplicaCount: &maxReplicas, Triggers: triggers, @@ -84,6 +82,30 @@ func createScaledObject( } } +// getScaleTargetRef 根据部署模式返回HPA的ScaleTargetRef +// 如果是RoleBasedGroup模式,返回RoleBasedGroupScalingAdapter +// 否则返回默认的Deployment +func getScaleTargetRef(componentMeta metav1.ObjectMeta) kedav1.ScaleTarget { + // 检查是否为RoleBasedGroup部署模式 + if deploymentMode, exists := componentMeta.Annotations[constants.DeploymentMode]; exists { + if deploymentMode == string(constants.RoleBasedGroup) { + // RoleBasedGroup模式使用RoleBasedGroupScalingAdapter + return kedav1.ScaleTarget{ + APIVersion: "workloads.x-k8s.io/v1alpha1", + Kind: "RoleBasedGroupScalingAdapter", + Name: componentMeta.Name, + } + } + } + + // 默认使用Deployment + return kedav1.ScaleTarget{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: componentMeta.Name, + } +} + // calculateMinReplicas calculates the minimum replicas func calculateMinReplicas(componentExt *v1beta1.ComponentExtensionSpec) int32 { if componentExt.MinReplicas != nil && *componentExt.MinReplicas > 0 { diff --git a/pkg/controller/v1beta1/inferenceservice/reconcilers/rbg/rbg_reconciler.go b/pkg/controller/v1beta1/inferenceservice/reconcilers/rbg/rbg_reconciler.go new file mode 100644 index 00000000..912e409a --- /dev/null +++ b/pkg/controller/v1beta1/inferenceservice/reconcilers/rbg/rbg_reconciler.go @@ -0,0 +1,268 @@ +package rbg + +import ( + "context" + "fmt" + + workloadsv1alpha1 "github.com/bcfre/rbg-api/api/workloads/v1alpha1" + "github.com/go-logr/logr" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + "github.com/sgl-project/ome/pkg/apis/ome/v1beta1" + "github.com/sgl-project/ome/pkg/constants" +) + +var log = ctrl.Log.WithName("RBGReconciler") + +// RBGReconciler负责管理RoleBasedGroup资源 +type RBGReconciler struct { + client client.Client + scheme *runtime.Scheme + log logr.Logger +} + +// NewRBGReconciler创建新的RBG Reconciler实例 +func NewRBGReconciler(client client.Client, scheme *runtime.Scheme) *RBGReconciler { + return &RBGReconciler{ + client: client, + scheme: scheme, + log: log, + } +} + +// Reconcile协调RBG资源 +// 输入: InferenceService实例和各Component的配置 +// 输出: RBGResult包含创建/更新的RBG资源和状态 +func (r *RBGReconciler) Reconcile( + ctx context.Context, + isvc *v1beta1.InferenceService, + components map[v1beta1.ComponentType]*ComponentConfig, +) (ctrl.Result, error) { + r.log.Info("Reconciling RBG", + "namespace", isvc.Namespace, + "name", isvc.Name, + "componentCount", len(components)) + + // 1. 为每个Component生成RoleSpec + roles, err := r.buildRoles(components) + if err != nil { + r.log.Error(err, "Failed to build roles") + return ctrl.Result{}, fmt.Errorf("failed to build roles: %w", err) + } + + // 2. 获取或创建RBG资源 + rbgName := isvc.Name + rbgNamespace := isvc.Namespace + + // 使用类型化对象处理RBG资源 + existingRBG := &workloadsv1alpha1.RoleBasedGroup{} + err = r.client.Get(ctx, types.NamespacedName{ + Name: rbgName, + Namespace: rbgNamespace, + }, existingRBG) + + var rbg *workloadsv1alpha1.RoleBasedGroup + if err != nil { + if apierrors.IsNotFound(err) { + // RBG不存在,创建新的 + rbg, err = r.createRBG(ctx, isvc, roles) + if err != nil { + r.log.Error(err, "Failed to create RBG") + return ctrl.Result{}, fmt.Errorf("failed to create RBG: %w", err) + } + r.log.Info("Created RBG successfully", "name", rbgName) + } else { + // 其他错误 + r.log.Error(err, "Failed to get RBG") + return ctrl.Result{}, fmt.Errorf("failed to get RBG: %w", err) + } + } else { + // RBG已存在,更新 + rbg, err = r.updateRBG(ctx, existingRBG, roles) + if err != nil { + r.log.Error(err, "Failed to update RBG") + return ctrl.Result{}, fmt.Errorf("failed to update RBG: %w", err) + } + r.log.Info("Updated RBG successfully", "name", rbgName) + } + // todo: update isvc status + r.log.Info(rbg.Name) + + // 4. 返回结果 + return ctrl.Result{}, nil +} + +// buildRoles为所有Component构建RoleSpec +func (r *RBGReconciler) buildRoles(components map[v1beta1.ComponentType]*ComponentConfig) ([]workloadsv1alpha1.RoleSpec, error) { + roles := make([]workloadsv1alpha1.RoleSpec, 0, len(components)) + + for componentType, config := range components { + r.log.Info("Building role for component", + "componentType", componentType, + "deploymentMode", config.DeploymentMode) + + role, err := buildRoleSpec(config) + if err != nil { + return nil, fmt.Errorf("failed to build role for %s: %w", componentType, err) + } + roles = append(roles, *role) + } + + return roles, nil +} + +// createRBG创建新的RBG资源 +func (r *RBGReconciler) createRBG(ctx context.Context, isvc *v1beta1.InferenceService, roles []workloadsv1alpha1.RoleSpec) (*workloadsv1alpha1.RoleBasedGroup, error) { + // 提取所有角色名称用于协调配置 + roleNames := make([]string, 0, len(roles)) + for _, role := range roles { + roleNames = append(roleNames, role.Name) + } + + // 构建协调配置:设置所有角色间的更新 skew 最大为 1% + coordinationRequirements := r.buildCoordinationRequirements(roleNames) + + // 使用类型化对象创建RBG资源 + rbg := &workloadsv1alpha1.RoleBasedGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: isvc.Name, + Namespace: isvc.Namespace, + Labels: r.buildRBGLabels(isvc), + }, + Spec: workloadsv1alpha1.RoleBasedGroupSpec{ + Roles: roles, + CoordinationRequirements: coordinationRequirements, + }, + } + + // 为每个Role设置默认的RolloutStrategy + for i := range rbg.Spec.Roles { + if rbg.Spec.Roles[i].RolloutStrategy == nil { + rbg.Spec.Roles[i].RolloutStrategy = &workloadsv1alpha1.RolloutStrategy{ + Type: workloadsv1alpha1.RollingUpdateStrategyType, + } + } + if rbg.Spec.Roles[i].RolloutStrategy.RollingUpdate == nil { + rbg.Spec.Roles[i].RolloutStrategy.RollingUpdate = &workloadsv1alpha1.RollingUpdate{} + } + rbg.Spec.Roles[i].RolloutStrategy.RollingUpdate.Type = workloadsv1alpha1.InPlaceIfPossibleUpdateStrategyType + } + + // 设置OwnerReference,实现级联删除 + if err := controllerutil.SetControllerReference(isvc, rbg, r.scheme); err != nil { + return nil, fmt.Errorf("failed to set owner reference: %w", err) + } + + // 创建RBG资源 + if err := r.client.Create(ctx, rbg); err != nil { + return nil, fmt.Errorf("failed to create RBG resource: %w", err) + } + + return rbg, nil +} + +// updateRBG更新现有RBG资源 +// 注意:保留现有的Replicas配置,因为HPA可能正在管理副本数 +func (r *RBGReconciler) updateRBG(ctx context.Context, existing *workloadsv1alpha1.RoleBasedGroup, newRoles []workloadsv1alpha1.RoleSpec) (*workloadsv1alpha1.RoleBasedGroup, error) { + // 保留现有各个Role的Replicas配置 + // 构建一个map以便快速查找现有Role的Replicas + existingReplicasMap := make(map[string]*int32) + for _, existingRole := range existing.Spec.Roles { + if existingRole.Replicas != nil { + existingReplicasMap[existingRole.Name] = existingRole.Replicas + } + } + + // 更新newRoles,保留现有的Replicas值 + for i := range newRoles { + if existingReplicas, exists := existingReplicasMap[newRoles[i].Name]; exists { + // 使用现有的Replicas值,而不是新提供的值 + newRoles[i].Replicas = existingReplicas + r.log.V(1).Info("Preserving existing replicas for role", + "roleName", newRoles[i].Name, + "replicas", *existingReplicas) + } + } + + // 提取所有角色名称用于协调配置 + roleNames := make([]string, 0, len(newRoles)) + for _, role := range newRoles { + roleNames = append(roleNames, role.Name) + } + + // 构建协调配置:设置所有角色间的更新 skew 最大为 1% + coordinationRequirements := r.buildCoordinationRequirements(roleNames) + + // 更新Spec + existing.Spec.Roles = newRoles + existing.Spec.CoordinationRequirements = coordinationRequirements + + // 为每个Role设置默认的RolloutStrategy + for i := range existing.Spec.Roles { + if existing.Spec.Roles[i].RolloutStrategy == nil { + existing.Spec.Roles[i].RolloutStrategy = &workloadsv1alpha1.RolloutStrategy{ + Type: workloadsv1alpha1.RollingUpdateStrategyType, + } + } + if existing.Spec.Roles[i].RolloutStrategy.RollingUpdate == nil { + existing.Spec.Roles[i].RolloutStrategy.RollingUpdate = &workloadsv1alpha1.RollingUpdate{} + } + existing.Spec.Roles[i].RolloutStrategy.RollingUpdate.Type = workloadsv1alpha1.InPlaceIfPossibleUpdateStrategyType + } + + // 更新资源 + if err := r.client.Update(ctx, existing); err != nil { + return nil, fmt.Errorf("failed to update RBG resource: %w", err) + } + + return existing, nil +} + +// buildRBGLabels构建RBG的标签 +func (r *RBGReconciler) buildRBGLabels(isvc *v1beta1.InferenceService) map[string]string { + labels := make(map[string]string) + + // 复制InferenceService的标签 + if isvc.Labels != nil { + for k, v := range isvc.Labels { + labels[k] = v + } + } + + // 添加标识标签 + labels["app"] = isvc.Name + labels["inferenceservice"] = isvc.Name + labels["deploymentMode"] = string(constants.RoleBasedGroup) + + return labels +} + +// buildCoordinationRequirements构建角色间的协调配置 +// 设置所有角色间的更新 skew 最大为 1%,确保滚动更新时各角色保持同步 +func (r *RBGReconciler) buildCoordinationRequirements(roleNames []string) []workloadsv1alpha1.Coordination { + // 如果只有一个角色,不需要协调配置 + if len(roleNames) <= 1 { + return nil + } + + // 设置 MaxSkew 为 1% + maxSkew := "1%" + + coordination := workloadsv1alpha1.Coordination{ + Name: "all-roles-coordination", + Roles: roleNames, + Strategy: &workloadsv1alpha1.CoordinationStrategy{ + RollingUpdate: &workloadsv1alpha1.CoordinationRollingUpdate{ + MaxSkew: &maxSkew, + }, + }, + } + + return []workloadsv1alpha1.Coordination{coordination} +} diff --git a/pkg/controller/v1beta1/inferenceservice/reconcilers/rbg/role_builder.go b/pkg/controller/v1beta1/inferenceservice/reconcilers/rbg/role_builder.go new file mode 100644 index 00000000..fab767bb --- /dev/null +++ b/pkg/controller/v1beta1/inferenceservice/reconcilers/rbg/role_builder.go @@ -0,0 +1,184 @@ +package rbg + +import ( + "encoding/json" + + workloadsv1alpha1 "github.com/bcfre/rbg-api/api/workloads/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/sgl-project/ome/pkg/apis/ome/v1beta1" + "github.com/sgl-project/ome/pkg/constants" +) + +// buildRoleSpec根据ComponentConfig构建RoleSpec +func buildRoleSpec(config *ComponentConfig) (*workloadsv1alpha1.RoleSpec, error) { + roleName := string(config.ComponentType) + + // 获取副本数 + replicas := getReplicasCount(config.ComponentExtensionSpec) + + // 构建Role基础配置 + role := &workloadsv1alpha1.RoleSpec{ + Name: roleName, + Labels: buildRoleLabels(config), + Annotations: buildRoleAnnotations(config), + Replicas: replicas, + ScalingAdapter: &workloadsv1alpha1.ScalingAdapter{Enable: true}, + } + + role.Labels["instance.rolebasedgroup.workloads.x-k8s.io/pattern"] = "Deployment" + role.Labels["app"] = constants.TruncateNameWithMaxLength(config.ObjectMeta.Name, 63) + + role.Workload = workloadsv1alpha1.WorkloadSpec{ + APIVersion: "workloads.x-k8s.io/v1alpha1", + Kind: "InstanceSet", + } + // role.Workload = workloadsv1alpha1.WorkloadSpec{ + // APIVersion: "leaderworkerset.x-k8s.io/v1", + // Kind: "LeaderWorkerSet", + // } + + // 根据部署模式配置Workload和Template + switch config.DeploymentMode { + case constants.RawDeployment: + role.Template = buildPodTemplateSpec(config) + + case constants.MultiNode: + role.LeaderWorkerSet = buildLeaderWorkerTemplate(config) + + default: + role.Template = buildPodTemplateSpec(config) + } + + return role, nil +} + +// getReplicasCount获取副本数 +func getReplicasCount(spec *v1beta1.ComponentExtensionSpec) *int32 { + if spec == nil || spec.MinReplicas == nil { + defaultReplicas := int32(1) + return &defaultReplicas + } + replicas := int32(*spec.MinReplicas) + return &replicas +} + +// buildRoleLabels构建Role的标签 +func buildRoleLabels(config *ComponentConfig) map[string]string { + labels := make(map[string]string) + + // 复制ObjectMeta中的标签 + if config.ObjectMeta.Labels != nil { + for k, v := range config.ObjectMeta.Labels { + labels[k] = v + } + } + + // 复制ComponentExtensionSpec中的标签 + if config.ComponentExtensionSpec != nil && config.ComponentExtensionSpec.Labels != nil { + for k, v := range config.ComponentExtensionSpec.Labels { + labels[k] = v + } + } + + // 添加component标识标签 + labels["component"] = string(config.ComponentType) + + return labels +} + +// buildRoleAnnotations构建Role的注解 +func buildRoleAnnotations(config *ComponentConfig) map[string]string { + annotations := make(map[string]string) + + // 复制ObjectMeta中的注解 + if config.ObjectMeta.Annotations != nil { + for k, v := range config.ObjectMeta.Annotations { + annotations[k] = v + } + } + + // 复制ComponentExtensionSpec中的注解 + if config.ComponentExtensionSpec != nil && config.ComponentExtensionSpec.Annotations != nil { + for k, v := range config.ComponentExtensionSpec.Annotations { + annotations[k] = v + } + } + + return annotations +} + +// buildPodTemplateSpec构建PodTemplateSpec +func buildPodTemplateSpec(config *ComponentConfig) *corev1.PodTemplateSpec { + if config.PodSpec == nil { + return nil + } + + label := buildRoleLabels(config) + label["app"] = constants.TruncateNameWithMaxLength(config.ObjectMeta.Name, 63) + + return &corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: label, + Annotations: buildRoleAnnotations(config), + }, + Spec: *config.PodSpec, + } +} + +// buildLeaderWorkerTemplate构建LeaderWorkerTemplate配置 +func buildLeaderWorkerTemplate(config *ComponentConfig) *workloadsv1alpha1.LeaderWorkerTemplate { + if config.WorkerSize <= 0 { + // 如果没有指定worker数量,默认使用1个leader + 0个worker + config.WorkerSize = 0 + } + + // size = 1个leader + N个worker + size := int32(config.WorkerSize + 1) + + lwt := &workloadsv1alpha1.LeaderWorkerTemplate{ + Size: &size, + } + + // 构建Leader PodTemplateSpec + if config.LeaderPodSpec != nil { + leaderTemplate := &corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: buildRoleLabels(config), + Annotations: buildRoleAnnotations(config), + }, + Spec: *config.LeaderPodSpec, + } + leaderTemplate.Labels["app"] = constants.GetRawServiceLabel(config.ObjectMeta.Name) + leaderTemplate.Labels["ray.io/node-type"] = "head" + // 转换为RawExtension + leaderJSON, err := json.Marshal(leaderTemplate) + if err == nil { + lwt.PatchLeaderTemplate = &runtime.RawExtension{ + Raw: leaderJSON, + } + } + } + + // 构建Worker PodTemplateSpec + if config.WorkerPodSpec != nil { + workerTemplate := &corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: buildRoleLabels(config), + Annotations: buildRoleAnnotations(config), + }, + Spec: *config.WorkerPodSpec, + } + // 转换为RawExtension + workerJSON, err := json.Marshal(workerTemplate) + if err == nil { + lwt.PatchWorkerTemplate = &runtime.RawExtension{ + Raw: workerJSON, + } + } + } + + return lwt +} diff --git a/pkg/controller/v1beta1/inferenceservice/reconcilers/rbg/types.go b/pkg/controller/v1beta1/inferenceservice/reconcilers/rbg/types.go new file mode 100644 index 00000000..34884f24 --- /dev/null +++ b/pkg/controller/v1beta1/inferenceservice/reconcilers/rbg/types.go @@ -0,0 +1,32 @@ +package rbg + +import ( + workloadsv1alpha1 "github.com/bcfre/rbg-api/api/workloads/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/sgl-project/ome/pkg/apis/ome/v1beta1" + "github.com/sgl-project/ome/pkg/constants" +) + +var _ = &workloadsv1alpha1.RoleBasedGroup{} + +// ComponentConfig包含单个Component的所有配置 +type ComponentConfig struct { + // ComponentType指定组件类型(Engine/Decoder/Router) + ComponentType v1beta1.ComponentType + // DeploymentMode指定该组件的部署模式(RawDeployment或MultiNode) + DeploymentMode constants.DeploymentModeType + // PodSpec用于RawDeployment模式或作为MultiNode模式的基础模板 + PodSpec *corev1.PodSpec + // LeaderPodSpec用于MultiNode模式的leader节点 + LeaderPodSpec *corev1.PodSpec + // WorkerPodSpec用于MultiNode模式的worker节点 + WorkerPodSpec *corev1.PodSpec + // WorkerSize指定worker节点数量(MultiNode模式) + WorkerSize int + // ComponentExtensionSpec包含副本数、扩缩容等配置 + ComponentExtensionSpec *v1beta1.ComponentExtensionSpec + // ObjectMeta包含名称、标签、注解等元数据 + ObjectMeta metav1.ObjectMeta +} diff --git a/pkg/controller/v1beta1/inferenceservice/status/status_reconciler.go b/pkg/controller/v1beta1/inferenceservice/status/status_reconciler.go index 8fd4852a..c592e811 100644 --- a/pkg/controller/v1beta1/inferenceservice/status/status_reconciler.go +++ b/pkg/controller/v1beta1/inferenceservice/status/status_reconciler.go @@ -2,7 +2,9 @@ package status import ( "reflect" + "strconv" + rbgapi "github.com/bcfre/rbg-api/api/workloads/v1alpha1" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "knative.dev/pkg/apis" @@ -81,6 +83,48 @@ func (sr *StatusReconciler) PropagateMultiNodeStatus( status.ObservedGeneration = lws.Generation } +// PropagateRBGStatus propagates status from RBG for a specific component +func (sr *StatusReconciler) PropagateRBGRoleStatus( + status *v1beta1.InferenceServiceStatus, + component v1beta1.ComponentType, + roleReady bool, + rbg *rbgapi.RoleBasedGroup, + url *apis.URL) { + + statusSpec := sr.initializeComponentStatus(status, component) + + statusSpec.LatestCreatedRevision = strconv.FormatInt(rbg.GetGeneration(), 10) + + // Set URL if RBG role is ready + if roleReady { + statusSpec.URL = url + } + + readyCondition := sr.getReadyConditionsMap()[component] + + // Create condition based on RBG role status + var componentCondition *apis.Condition + if roleReady { + componentCondition = &apis.Condition{ + Type: readyCondition, + Status: v1.ConditionTrue, + Reason: "RBGRoleReady", + Message: "RBG role is ready", + } + } else { + componentCondition = &apis.Condition{ + Type: readyCondition, + Status: v1.ConditionFalse, + Reason: "RBGRoleNotReady", + Message: "RBG role is not ready", + } + } + + sr.setCondition(status, readyCondition, componentCondition) + status.Components[component] = statusSpec + // Note: ObservedGeneration will be set from RBG resource metadata +} + // PropagateMultiNodeRayVLLMStatus propagates status from multiple deployments func (sr *StatusReconciler) PropagateMultiNodeRayVLLMStatus( status *v1beta1.InferenceServiceStatus, diff --git a/pkg/controller/v1beta1/inferenceservice/workload/manager.go b/pkg/controller/v1beta1/inferenceservice/workload/manager.go new file mode 100644 index 00000000..ec15ca0f --- /dev/null +++ b/pkg/controller/v1beta1/inferenceservice/workload/manager.go @@ -0,0 +1,126 @@ +package workload + +import ( + "fmt" + "sync" + + "github.com/go-logr/logr" + + "github.com/sgl-project/ome/pkg/apis/ome/v1beta1" +) + +// WorkloadStrategyManager 管理和选择工作负载策略 +type WorkloadStrategyManager struct { + strategies map[string]WorkloadStrategy + mu sync.RWMutex + log logr.Logger +} + +// NewWorkloadStrategyManager 创建策略管理器实例 +func NewWorkloadStrategyManager(log logr.Logger) *WorkloadStrategyManager { + return &WorkloadStrategyManager{ + strategies: make(map[string]WorkloadStrategy), + log: log, + } +} + +// RegisterStrategy 注册工作负载策略 +func (m *WorkloadStrategyManager) RegisterStrategy(strategy WorkloadStrategy) error { + if strategy == nil { + return fmt.Errorf("cannot register nil strategy") + } + + name := strategy.GetStrategyName() + if name == "" { + return fmt.Errorf("strategy name cannot be empty") + } + + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.strategies[name]; exists { + return fmt.Errorf("strategy %s is already registered", name) + } + + m.strategies[name] = strategy + m.log.Info("Registered workload strategy", "strategy", name) + return nil +} + +// SelectStrategy 根据条件选择合适的策略 +// 策略选择流程: +// 1. 检查annotations中是否指定了deploymentMode +// 2. 遍历所有已注册的策略,找到第一个IsApplicable返回true的策略 +// 3. 如果没有找到适用的策略,返回错误 +func (m *WorkloadStrategyManager) SelectStrategy(isvc *v1beta1.InferenceService, annotations map[string]string) (WorkloadStrategy, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + if len(m.strategies) == 0 { + return nil, fmt.Errorf("no workload strategies registered") + } + + // 遍历所有策略,找到第一个适用的 + // 注意:策略的注册顺序很重要,应该先注册特殊策略(如RBG),最后注册默认策略(SingleComponent) + for name, strategy := range m.strategies { + if strategy.IsApplicable(isvc, annotations) { + m.log.V(1).Info("Selected workload strategy", + "strategy", name, + "namespace", isvc.Namespace, + "inferenceService", isvc.Name) + return strategy, nil + } + } + + // 如果没有策略适用,返回错误 + return nil, fmt.Errorf("no applicable workload strategy found for InferenceService %s/%s", isvc.Namespace, isvc.Name) +} + +// GetStrategy 根据名称获取策略实例 +func (m *WorkloadStrategyManager) GetStrategy(name string) (WorkloadStrategy, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + strategy, exists := m.strategies[name] + if !exists { + return nil, fmt.Errorf("strategy %s not found", name) + } + + return strategy, nil +} + +// ListStrategies 列出所有已注册的策略 +func (m *WorkloadStrategyManager) ListStrategies() []string { + m.mu.RLock() + defer m.mu.RUnlock() + + names := make([]string, 0, len(m.strategies)) + for name := range m.strategies { + names = append(names, name) + } + + return names +} + +// UnregisterStrategy 取消注册策略(主要用于测试) +func (m *WorkloadStrategyManager) UnregisterStrategy(name string) error { + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.strategies[name]; !exists { + return fmt.Errorf("strategy %s not found", name) + } + + delete(m.strategies, name) + m.log.Info("Unregistered workload strategy", "strategy", name) + return nil +} + +// Clear 清空所有已注册的策略(主要用于测试) +func (m *WorkloadStrategyManager) Clear() { + m.mu.Lock() + defer m.mu.Unlock() + + m.strategies = make(map[string]WorkloadStrategy) + m.log.Info("Cleared all workload strategies") +} diff --git a/pkg/controller/v1beta1/inferenceservice/workload/rbg_strategy.go b/pkg/controller/v1beta1/inferenceservice/workload/rbg_strategy.go new file mode 100644 index 00000000..dc865286 --- /dev/null +++ b/pkg/controller/v1beta1/inferenceservice/workload/rbg_strategy.go @@ -0,0 +1,549 @@ +package workload + +import ( + "context" + "fmt" + + rbgapi "github.com/bcfre/rbg-api/api/workloads/v1alpha1" + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + knapis "knative.dev/pkg/apis" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + "github.com/sgl-project/ome/pkg/apis/ome/v1beta1" + "github.com/sgl-project/ome/pkg/constants" + "github.com/sgl-project/ome/pkg/controller/v1beta1/controllerconfig" + "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/components" + "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/reconcilers/autoscaler" + "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/reconcilers/ingress/services" + "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/reconcilers/pdb" + "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/reconcilers/rbac" + rbgpkg "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/reconcilers/rbg" + "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/status" + isvcutils "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/utils" +) + +// RBGStrategy 实现RBG (RoleBasedGroup) All-in-One工作负载策略 +// 将多个组件(Engine、Decoder、Router)打包到一个RBG资源中 +type RBGStrategy struct { + client client.Client + clientset kubernetes.Interface + scheme *runtime.Scheme + log logr.Logger +} + +// NewRBGStrategy 创建RBG策略实例 +func NewRBGStrategy(client client.Client, clientset kubernetes.Interface, scheme *runtime.Scheme, log logr.Logger) *RBGStrategy { + return &RBGStrategy{ + client: client, + clientset: clientset, + scheme: scheme, + log: log, + } +} + +// GetStrategyName 返回策略名称 +func (r *RBGStrategy) GetStrategyName() string { + return "RBG" +} + +// IsApplicable 判断该策略是否适用 +// RBG策略必须通过annotation显式指定 +func (r *RBGStrategy) IsApplicable(isvc *v1beta1.InferenceService, annotations map[string]string) bool { + if mode, found := annotations[constants.DeploymentMode]; found { + return mode == string(constants.RoleBasedGroup) + } + return false +} + +// ValidateDeploymentModes 验证部署模式 +// RBG策略仅支持RawDeployment和MultiNode +func (r *RBGStrategy) ValidateDeploymentModes(modes *ComponentDeploymentModes) error { + // 检查Engine部署模式(如果存在) + if modes.Engine != "" { + if modes.Engine != constants.RawDeployment && modes.Engine != constants.MultiNode { + return fmt.Errorf("RBG mode only supports RawDeployment and MultiNode deployment modes for engine, got %s", modes.Engine) + } + } + + // 检查Decoder部署模式(如果存在) + if modes.Decoder != "" { + if modes.Decoder != constants.RawDeployment && modes.Decoder != constants.MultiNode { + return fmt.Errorf("RBG mode only supports RawDeployment and MultiNode deployment modes for decoder, got %s", modes.Decoder) + } + } + + // 检查Router部署模式(如果存在) + if modes.Router != "" { + if modes.Router != constants.RawDeployment { + return fmt.Errorf("RBG mode only supports RawDeployment deployment mode for router, got %s", modes.Router) + } + } + + return nil +} + +// ReconcileWorkload 执行工作负载调谐 +// 使用ComponentConfigExtractor接口从组件提取配置,然后调用RBGReconciler +func (r *RBGStrategy) ReconcileWorkload(ctx context.Context, request *WorkloadReconcileRequest) (ctrl.Result, error) { + r.log.Info("Reconciling with RBG strategy", + "namespace", request.InferenceService.Namespace, + "inferenceService", request.InferenceService.Name) + + // 收集ComponentConfig + componentConfigs := make(map[v1beta1.ComponentType]*rbgpkg.ComponentConfig) + + // 构建Engine ComponentConfig + if request.MergedEngine != nil { + r.log.Info("Building engine component config for RBG", + "deploymentMode", request.DeploymentModes.Engine) + + engineReconciler := request.ComponentBuilderFactory.CreateEngineComponent( + request.DeploymentModes.Engine, + request.BaseModel, + request.BaseModelMeta, + request.MergedEngine, + request.Runtime, + request.RuntimeName, + request.EngineSupportedModelFormat, + request.EngineAcceleratorClass, + request.EngineAcceleratorClassName, + ) + + // 类型断言为Engine组件并提取配置 + engineComponent, ok := engineReconciler.(components.ComponentConfigExtractor) + if !ok { + return ctrl.Result{}, fmt.Errorf("engine component does not implement ComponentConfigExtractor") + } + + engineConfig, err := r.buildComponentConfig(ctx, engineComponent, request.InferenceService, v1beta1.EngineComponent, request.DeploymentModes.Engine) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to build engine config: %w", err) + } + componentConfigs[v1beta1.EngineComponent] = engineConfig + } + + // 构建Decoder ComponentConfig + if request.MergedDecoder != nil { + r.log.Info("Building decoder component config for RBG", + "deploymentMode", request.DeploymentModes.Decoder) + + decoderReconciler := request.ComponentBuilderFactory.CreateDecoderComponent( + request.DeploymentModes.Decoder, + request.BaseModel, + request.BaseModelMeta, + request.MergedDecoder, + request.Runtime, + request.RuntimeName, + request.DecoderSupportedModelFormat, + request.DecoderAcceleratorClass, + request.DecoderAcceleratorClassName, + ) + + // 类型断言为Decoder组件并提取配置 + decoderComponent, ok := decoderReconciler.(components.ComponentConfigExtractor) + if !ok { + return ctrl.Result{}, fmt.Errorf("decoder component does not implement ComponentConfigExtractor") + } + + decoderConfig, err := r.buildComponentConfig(ctx, decoderComponent, request.InferenceService, v1beta1.DecoderComponent, request.DeploymentModes.Decoder) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to build decoder config: %w", err) + } + componentConfigs[v1beta1.DecoderComponent] = decoderConfig + } + + // 构建Router ComponentConfig + if request.MergedRouter != nil { + r.log.Info("Building router component config for RBG", + "deploymentMode", request.DeploymentModes.Router) + + routerReconciler := request.ComponentBuilderFactory.CreateRouterComponent( + request.DeploymentModes.Router, + request.BaseModel, + request.BaseModelMeta, + request.MergedRouter, + request.Runtime, + request.RuntimeName, + ) + + // 类型断言为Router组件并提取配置 + routerComponent, ok := routerReconciler.(components.ComponentConfigExtractor) + if !ok { + return ctrl.Result{}, fmt.Errorf("router component does not implement ComponentConfigExtractor") + } + + routerConfig, err := r.buildComponentConfig(ctx, routerComponent, request.InferenceService, v1beta1.RouterComponent, request.DeploymentModes.Router) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to build router config: %w", err) + } + componentConfigs[v1beta1.RouterComponent] = routerConfig + } + + // 阶段1: 创建RBAC资源(仅Router需要) + if err := r.reconcileRBAC(ctx, request.InferenceService, componentConfigs); err != nil { + r.log.Error(err, "Failed to reconcile RBAC resources") + return ctrl.Result{}, fmt.Errorf("failed to reconcile RBAC: %w", err) + } + + // 阶段2: 创建RBG Reconciler + rbgReconciler := rbgpkg.NewRBGReconciler(r.client, r.scheme) + + // 调谐RBG资源 + result, err := rbgReconciler.Reconcile(ctx, request.InferenceService, componentConfigs) + if err != nil { + r.log.Error(err, "Failed to reconcile RBG") + return result, fmt.Errorf("failed to reconcile RBG: %w", err) + } + if result.Requeue || result.RequeueAfter > 0 { + return result, nil + } + + // 阶段3: 创建HPA资源(为每个Role创建独立HPA) + if err := r.reconcileHPA(ctx, request.InferenceService, componentConfigs); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to reconcile HPA: %w", err) + } + + // 阶段4: 创建PDB资源(为每个Role创建独立PDB) + if err := r.reconcilePDB(ctx, request.InferenceService, componentConfigs); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to reconcile PDB: %w", err) + } + + if err := r.propagateStatus(request.InferenceService, componentConfigs); err != nil { + r.log.Error(err, "Failed to propagate status, but continuing", + "namespace", request.InferenceService.Namespace, + "inferenceService", request.InferenceService.Name) + return ctrl.Result{}, fmt.Errorf("failed to reconcile Status: %w", err) + } + + return ctrl.Result{}, nil +} + +// buildComponentConfig 从组件提取配置构建RBG ComponentConfig +func (r *RBGStrategy) buildComponentConfig( + ctx context.Context, + component components.ComponentConfigExtractor, + isvc *v1beta1.InferenceService, + componentType v1beta1.ComponentType, + deploymentMode constants.DeploymentModeType, +) (*rbgpkg.ComponentConfig, error) { + // 提取ObjectMeta + objectMeta, err := component.GetObjectMeta(isvc) + if err != nil { + return nil, fmt.Errorf("failed to get object meta: %w", err) + } + + // 提取PodSpec + podSpec, err := component.GetPodSpec(isvc) + if err != nil { + return nil, fmt.Errorf("failed to get pod spec: %w", err) + } + + // 提取WorkerPodSpec(如果支持) + workerPodSpec, err := component.GetWorkerPodSpec(isvc) + if err != nil { + return nil, fmt.Errorf("failed to get worker pod spec: %w", err) + } + + // 提取ComponentExtension + componentExt := component.GetComponentExtension() + + // 提取WorkerSize + workerSize := component.GetWorkerSize() + + return &rbgpkg.ComponentConfig{ + ComponentType: componentType, + DeploymentMode: deploymentMode, + PodSpec: podSpec, + LeaderPodSpec: podSpec, // For MultiNode, leader uses the main PodSpec + WorkerPodSpec: workerPodSpec, + WorkerSize: workerSize, + ComponentExtensionSpec: componentExt, + ObjectMeta: objectMeta, + }, nil +} + +// propagateStatus 状态传播到InferenceService +// 从RBG资源读取状态并传播到各组件状态和条件 +func (r *RBGStrategy) propagateStatus(isvc *v1beta1.InferenceService, componentConfig map[v1beta1.ComponentType]*rbgpkg.ComponentConfig) error { + ctx := context.TODO() + + // 创建StatusReconciler + statusReconciler := status.NewStatusReconciler() + + // 读取RBG资源 + rbg := &rbgapi.RoleBasedGroup{} + err := r.client.Get(ctx, types.NamespacedName{ + Name: isvc.Name, + Namespace: isvc.Namespace, + }, rbg) + if err != nil { + return fmt.Errorf("failed to get RBG status: %w", err) + } + + // 2. 遍历每个组件,传播组件状态和条件 + componentList := make([]v1beta1.ComponentType, 0, len(componentConfig)) + for componentType, config := range componentConfig { + componentList = append(componentList, componentType) + + // 从cRBG状态中查找对应的Role状态 + var roleStatus *rbgapi.RoleStatus + for i := range rbg.Status.RoleStatuses { + if rbg.Status.RoleStatuses[i].Name == string(componentType) { + roleStatus = &rbg.Status.RoleStatuses[i] + break + } + } + + if roleStatus == nil { + r.log.V(1).Info("No role status found for component, just initializing", + "componentType", componentType) + statusReconciler.InitializeComponentCondition(&isvc.Status, componentType) + continue + } + + // 构造URL + url, err := createRawURL(r.clientset, config.ObjectMeta) + if err != nil { + return err + } + + roleReady := false + if roleStatus.Replicas == roleStatus.ReadyReplicas && roleStatus.Replicas == roleStatus.UpdatedReplicas { + roleReady = true + } + + // 调用StatusReconciler传播组件状态 + statusReconciler.PropagateRBGRoleStatus( + &isvc.Status, + componentType, + roleReady, + rbg, + url, + ) + + r.log.V(1).Info("Propagated RBG status for component", + "componentType", componentType, + "replicas", roleStatus.Replicas, + "readyReplicas", roleStatus.ReadyReplicas, + "url", url.String()) + } + + // 3. 遍历每个组件,更新模型状态 + for componentType, config := range componentConfig { + // 获取组件的状态规范 + statusSpec := isvc.Status.Components[componentType] + + // 根据部署模式确定Pod标签 + var podLabelKey, podLabelValue string + // if config.DeploymentMode == constants.RawDeployment { + // // RawDeployment模式:使用app标签 + // podLabelKey = "app" + // podLabelValue = config.ObjectMeta.Name + // } else if config.DeploymentMode == constants.MultiNode { + // // MultiNode模式:使用LeaderWorkerSet标签 + // podLabelKey = "leaderworkerset.sigs.k8s.io/name" + // podLabelValue = config.ObjectMeta.Name + // } else { + // r.log.V(1).Info("Unsupported deployment mode for pod query", + // "componentType", componentType, + // "deploymentMode", config.DeploymentMode) + // continue + // } + podLabelKey = "app" + podLabelValue = config.ObjectMeta.Name + + // 查询Pod列表 + pods, err := isvcutils.ListPodsByLabel(r.client, isvc.Namespace, podLabelKey, podLabelValue) + if err != nil { + r.log.Error(err, "Failed to list pods for component", + "componentType", componentType, + "labelKey", podLabelKey, + "labelValue", podLabelValue) + continue + } + + // 传播模型状态 + statusReconciler.PropagateModelStatus(&isvc.Status, statusSpec, pods, true) + + r.log.V(1).Info("Propagated model status for component", + "componentType", componentType, + "podCount", len(pods.Items)) + } + + // 4. 聚合整体就绪条件 + if len(componentList) > 0 { + statusReconciler.PropagateCrossComponentStatus(&isvc.Status, componentList, "Ready") + r.log.Info("Propagated cross-component Ready status", + "componentCount", len(componentList)) + } + + isvc.Status.ObservedGeneration = rbg.Generation + + r.log.Info("Successfully propagated RBG status to InferenceService", + "namespace", isvc.Namespace, + "inferenceService", isvc.Name, + "componentCount", len(componentList)) + + return nil +} + +// reconcileRBAC 为Router组件创建RBAC资源 +func (r *RBGStrategy) reconcileRBAC(ctx context.Context, isvc *v1beta1.InferenceService, componentConfigs map[v1beta1.ComponentType]*rbgpkg.ComponentConfig) error { + // 检查是否存在Router组件 + routerConfig, hasRouter := componentConfigs[v1beta1.RouterComponent] + if !hasRouter { + r.log.V(1).Info("No router component, skipping RBAC creation") + return nil + } + + r.log.Info("Creating RBAC resources for router component", + "namespace", isvc.Namespace, + "inferenceService", isvc.Name) + + // 创建RBAC Reconciler + rbacReconciler := rbac.NewRBACReconciler( + r.client, + r.scheme, + routerConfig.ObjectMeta, + v1beta1.RouterComponent, + isvc, + ) + + // 调用Reconcile创建ServiceAccount、Role、RoleBinding + if err := rbacReconciler.Reconcile(); err != nil { + return fmt.Errorf("failed to create RBAC resources for router: %w", err) + } + + // 更新Router的PodSpec,设置ServiceAccountName + serviceAccountName := rbacReconciler.GetServiceAccountName() + routerConfig.PodSpec.ServiceAccountName = serviceAccountName + + r.log.Info("Successfully created RBAC resources for router", + "serviceAccountName", serviceAccountName) + + return nil +} + +// reconcileHPA 为每个Role创建独立的HPA资源 +func (r *RBGStrategy) reconcileHPA(ctx context.Context, isvc *v1beta1.InferenceService, componentConfigs map[v1beta1.ComponentType]*rbgpkg.ComponentConfig) error { + r.log.Info("Creating HPA resources for all roles", + "namespace", isvc.Namespace, + "inferenceService", isvc.Name) + + // 构建InferenceServiceSpec用于AutoscalerClass判断 + inferenceServiceSpec := &v1beta1.InferenceServiceSpec{ + Predictor: v1beta1.PredictorSpec{ + ComponentExtensionSpec: v1beta1.ComponentExtensionSpec{}, + }, + KedaConfig: isvc.Spec.KedaConfig, + } + + // 遍历所有组件,为每个组件创建HPA + for componentType, config := range componentConfigs { + r.log.V(1).Info("Creating HPA for component", + "componentType", componentType, + "name", config.ObjectMeta.Name) + + // 更新inferenceServiceSpec中的ComponentExtensionSpec + inferenceServiceSpec.Predictor.ComponentExtensionSpec = *config.ComponentExtensionSpec + + // 创建Autoscaler Reconciler(会根据AutoscalerClass创建HPA或KEDA) + autoscalerReconciler, err := autoscaler.NewAutoscalerReconciler( + r.client, + r.clientset, + r.scheme, + config.ObjectMeta, + inferenceServiceSpec, + ) + if err != nil { + r.log.Error(err, "Failed to create autoscaler reconciler", + "componentType", componentType) + continue + } + + // 设置OwnerReference + if err := autoscalerReconciler.Autoscaler.SetControllerReferences(isvc, r.scheme); err != nil { + r.log.Error(err, "Failed to set owner reference for autoscaler", + "componentType", componentType) + continue + } + + // 调用Reconcile创建HPA/KEDA资源 + if err := autoscalerReconciler.Reconcile(); err != nil { + r.log.Error(err, "Failed to reconcile autoscaler", + "componentType", componentType) + continue + } + + r.log.Info("Successfully created autoscaler for component", + "componentType", componentType, + "name", config.ObjectMeta.Name) + } + + return nil +} + +// reconcilePDB 为每个Role创建独立的PDB资源 +func (r *RBGStrategy) reconcilePDB(ctx context.Context, isvc *v1beta1.InferenceService, componentConfigs map[v1beta1.ComponentType]*rbgpkg.ComponentConfig) error { + r.log.Info("Creating PDB resources for all roles", + "namespace", isvc.Namespace, + "inferenceService", isvc.Name) + + // 遍历所有组件,为每个组件创建PDB + for componentType, config := range componentConfigs { + r.log.V(1).Info("Creating PDB for component", + "componentType", componentType, + "name", config.ObjectMeta.Name) + + // 创建PDB Reconciler + pdbReconciler := pdb.NewPDBReconciler( + r.client, + r.scheme, + config.ObjectMeta, + config.ComponentExtensionSpec, + ) + + // 设置OwnerReference + if err := controllerutil.SetControllerReference(isvc, pdbReconciler.PDB, r.scheme); err != nil { + r.log.Error(err, "Failed to set owner reference for PDB", + "componentType", componentType) + continue + } + + // 调用Reconcile创建PDB资源 + if _, err := pdbReconciler.Reconcile(); err != nil { + r.log.Error(err, "Failed to reconcile PDB", + "componentType", componentType) + continue + } + + r.log.Info("Successfully created PDB for component", + "componentType", componentType, + "name", config.ObjectMeta.Name) + } + + return nil +} + +func createRawURL(clientset kubernetes.Interface, metadata metav1.ObjectMeta) (*knapis.URL, error) { + ingressConfig, err := controllerconfig.NewIngressConfig(clientset) + if err != nil { + return nil, err + } + + domainService := services.NewDomainService() + url := &knapis.URL{} + url.Scheme = "http" + url.Host, err = domainService.GenerateDomainName(fmt.Sprintf("s-%s", metadata.Name), metadata, ingressConfig) + if err != nil { + return nil, fmt.Errorf("failed creating host name: %w", err) + } + + return url, nil +} diff --git a/pkg/controller/v1beta1/inferenceservice/workload/single_component_strategy.go b/pkg/controller/v1beta1/inferenceservice/workload/single_component_strategy.go new file mode 100644 index 00000000..bdba7e0f --- /dev/null +++ b/pkg/controller/v1beta1/inferenceservice/workload/single_component_strategy.go @@ -0,0 +1,144 @@ +package workload + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/sgl-project/ome/pkg/apis/ome/v1beta1" + "github.com/sgl-project/ome/pkg/constants" + "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/components" +) + +// SingleComponentStrategy 实现单组件独立部署策略 +// 这是默认策略,每个组件独立调谐,不使用All-in-One工作负载 +type SingleComponentStrategy struct { + log logr.Logger +} + +// NewSingleComponentStrategy 创建单组件策略实例 +func NewSingleComponentStrategy(log logr.Logger) *SingleComponentStrategy { + return &SingleComponentStrategy{ + log: log, + } +} + +// GetStrategyName 返回策略名称 +func (s *SingleComponentStrategy) GetStrategyName() string { + return "SingleComponent" +} + +// IsApplicable 判断该策略是否适用 +// 单组件策略是默认策略,当没有指定其他All-in-One工作负载时使用 +func (s *SingleComponentStrategy) IsApplicable(isvc *v1beta1.InferenceService, annotations map[string]string) bool { + // 检查是否指定了RoleBasedGroup模式 + if mode, found := annotations[constants.DeploymentMode]; found { + if mode == string(constants.RoleBasedGroup) { + return false + } + } + + // 默认情况下,单组件策略总是适用 + return true +} + +// ValidateDeploymentModes 验证部署模式 +// 单组件策略支持所有部署模式 +func (s *SingleComponentStrategy) ValidateDeploymentModes(modes *ComponentDeploymentModes) error { + // 单组件策略支持所有部署模式,无需验证 + return nil +} + +// ReconcileWorkload 执行工作负载调谐 +// 复用现有的组件Reconcile逻辑 +func (s *SingleComponentStrategy) ReconcileWorkload(ctx context.Context, request *WorkloadReconcileRequest) (ctrl.Result, error) { + s.log.Info("Reconciling with SingleComponent strategy", + "namespace", request.InferenceService.Namespace, + "inferenceService", request.InferenceService.Name) + + var reconcilers []components.Component + + // 创建Engine组件 + if request.MergedEngine != nil { + s.log.Info("Creating engine reconciler", + "deploymentMode", request.DeploymentModes.Engine, + "namespace", request.InferenceService.Namespace, + "inferenceService", request.InferenceService.Name) + + engineReconciler := request.ComponentBuilderFactory.CreateEngineComponent( + request.DeploymentModes.Engine, + request.BaseModel, + request.BaseModelMeta, + request.MergedEngine, + request.Runtime, + request.RuntimeName, + request.EngineSupportedModelFormat, + request.EngineAcceleratorClass, + request.EngineAcceleratorClassName, + ) + reconcilers = append(reconcilers, engineReconciler) + } + + // 创建Decoder组件 + if request.MergedDecoder != nil { + s.log.Info("Creating decoder reconciler", + "deploymentMode", request.DeploymentModes.Decoder, + "namespace", request.InferenceService.Namespace, + "inferenceService", request.InferenceService.Name) + + decoderReconciler := request.ComponentBuilderFactory.CreateDecoderComponent( + request.DeploymentModes.Decoder, + request.BaseModel, + request.BaseModelMeta, + request.MergedDecoder, + request.Runtime, + request.RuntimeName, + request.DecoderSupportedModelFormat, + request.DecoderAcceleratorClass, + request.DecoderAcceleratorClassName, + ) + reconcilers = append(reconcilers, decoderReconciler) + } + + // 创建Router组件 + if request.MergedRouter != nil { + s.log.Info("Creating router reconciler", + "deploymentMode", request.DeploymentModes.Router, + "namespace", request.InferenceService.Namespace, + "inferenceService", request.InferenceService.Name) + + routerReconciler := request.ComponentBuilderFactory.CreateRouterComponent( + request.DeploymentModes.Router, + request.BaseModel, + request.BaseModelMeta, + request.MergedRouter, + request.Runtime, + request.RuntimeName, + ) + reconcilers = append(reconcilers, routerReconciler) + } + + // 运行所有reconcilers + for _, reconciler := range reconcilers { + result, err := reconciler.Reconcile(request.InferenceService) + if err != nil { + s.log.Error(err, "Failed to reconcile component", + "component", fmt.Sprintf("%T", reconciler), + "namespace", request.InferenceService.Namespace, + "inferenceService", request.InferenceService.Name) + return result, err + } + if result.Requeue || result.RequeueAfter > 0 { + return result, nil + } + } + + s.log.Info("SingleComponent strategy reconciliation completed", + "namespace", request.InferenceService.Namespace, + "inferenceService", request.InferenceService.Name) + + return reconcile.Result{}, nil +} diff --git a/pkg/controller/v1beta1/inferenceservice/workload/strategy.go b/pkg/controller/v1beta1/inferenceservice/workload/strategy.go new file mode 100644 index 00000000..ea9bcd5a --- /dev/null +++ b/pkg/controller/v1beta1/inferenceservice/workload/strategy.go @@ -0,0 +1,40 @@ +package workload + +import ( + "context" + + "github.com/sgl-project/ome/pkg/apis/ome/v1beta1" + ctrl "sigs.k8s.io/controller-runtime" +) + +// WorkloadStrategy 定义工作负载策略的统一接口 +// 所有工作负载策略(单组件、RBG、JobSet等)都需要实现此接口 +type WorkloadStrategy interface { + // GetStrategyName 返回策略名称 + GetStrategyName() string + + // IsApplicable 判断该策略是否适用于当前InferenceService + // 通过检查annotations和其他条件来决定 + IsApplicable(isvc *v1beta1.InferenceService, annotations map[string]string) bool + + // ValidateDeploymentModes 验证部署模式是否被该策略支持 + // 不同策略支持的部署模式可能不同 + ValidateDeploymentModes(modes *ComponentDeploymentModes) error + + // ReconcileWorkload 执行工作负载调谐 + // 这是策略的核心方法,负责创建和更新工作负载资源 + ReconcileWorkload(ctx context.Context, request *WorkloadReconcileRequest) (ctrl.Result, error) +} + +// WorkloadStatus 定义工作负载状态的统一接口 +// 用于抽象不同工作负载类型的状态信息 +type WorkloadStatus interface { + // GetComponentStatus 获取指定组件的状态 + GetComponentStatus(componentType v1beta1.ComponentType) *v1beta1.ComponentStatusSpec + + // IsReady 判断整体是否就绪 + IsReady() bool + + // GetConditions 获取状态条件 + GetConditions() []interface{} +} diff --git a/pkg/controller/v1beta1/inferenceservice/workload/types.go b/pkg/controller/v1beta1/inferenceservice/workload/types.go new file mode 100644 index 00000000..7abca2c0 --- /dev/null +++ b/pkg/controller/v1beta1/inferenceservice/workload/types.go @@ -0,0 +1,55 @@ +package workload + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/sgl-project/ome/pkg/apis/ome/v1beta1" + "github.com/sgl-project/ome/pkg/constants" + "github.com/sgl-project/ome/pkg/controller/v1beta1/inferenceservice/components" +) + +// WorkloadReconcileRequest 封装工作负载调谐请求参数 +// 包含执行工作负载调谐所需的所有信息 +type WorkloadReconcileRequest struct { + // InferenceService实例 + InferenceService *v1beta1.InferenceService + + // 基础模型信息 + BaseModel *v1beta1.BaseModelSpec + BaseModelMeta *metav1.ObjectMeta + + // 运行时信息 + Runtime *v1beta1.ServingRuntimeSpec + RuntimeName string + + // 合并后的组件规格 + MergedEngine *v1beta1.EngineSpec + MergedDecoder *v1beta1.DecoderSpec + MergedRouter *v1beta1.RouterSpec + + // 部署模式配置 + DeploymentModes *ComponentDeploymentModes + + // 组件构建工厂 + ComponentBuilderFactory *components.ComponentBuilderFactory + + // 是否用户指定运行时 + UserSpecifiedRuntime bool + + // AcceleratorClass信息(如果需要) + EngineAcceleratorClass *v1beta1.AcceleratorClassSpec + EngineAcceleratorClassName string + DecoderAcceleratorClass *v1beta1.AcceleratorClassSpec + DecoderAcceleratorClassName string + + // SupportedModelFormat信息(如果需要) + EngineSupportedModelFormat *v1beta1.SupportedModelFormat + DecoderSupportedModelFormat *v1beta1.SupportedModelFormat +} + +// ComponentDeploymentModes 封装各组件的部署模式 +type ComponentDeploymentModes struct { + Engine constants.DeploymentModeType + Decoder constants.DeploymentModeType + Router constants.DeploymentModeType +}