diff --git a/Cargo.lock b/Cargo.lock index f8fce91e3e..215d06903b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -226,6 +226,18 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-broadcast" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-channel" version = "2.5.0" @@ -578,6 +590,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "backon" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -2393,6 +2416,8 @@ dependencies = [ "humantime", "inotify", "jsonschema", + "k8s-openapi", + "kube", "local-ip-address", "log", "nid", @@ -3363,6 +3388,18 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.27" @@ -3531,6 +3568,26 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "home" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" +dependencies = [ + "windows-sys 0.61.0", +] + +[[package]] +name = "hostname" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" +dependencies = [ + "cfg-if 1.0.3", + "libc", + "windows-link 0.1.3", +] + [[package]] name = "hound" version = "3.5.1" @@ -3703,6 +3760,7 @@ dependencies = [ "http 1.3.1", "hyper 1.7.0", "hyper-util", + "log", "rustls", "rustls-native-certs 0.8.1", "rustls-pki-types", @@ -4270,6 +4328,18 @@ dependencies = [ "unicode-general-category", ] +[[package]] +name = "json-patch" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f300e415e2134745ef75f04562dd0145405c2f7fd92065db029ac4b16b57fe90" +dependencies = [ + "jsonptr", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "json5" version = "0.4.1" @@ -4281,6 +4351,29 @@ dependencies = [ "serde", ] +[[package]] +name = "jsonpath-rust" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c00ae348f9f8fd2d09f82a98ca381c60df9e0820d8d79fce43e649b4dc3128b" +dependencies = [ + "pest", + "pest_derive", + "regex", + "serde_json", + "thiserror 2.0.16", +] + +[[package]] +name = "jsonptr" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5a3cc660ba5d72bce0b3bb295bf20847ccbb40fd423f3f05b61273672e561fe" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "jsonschema" version = "0.17.1" @@ -4321,6 +4414,18 @@ dependencies = [ "rayon", ] +[[package]] +name = "k8s-openapi" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d13f06d5326a915becaffabdfab75051b8cdc260c2a5c06c0e90226ede89a692" +dependencies = [ + "base64 0.22.1", + "chrono", + "serde", + "serde_json", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -4331,6 +4436,115 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "kube" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e7bb0b6a46502cc20e4575b6ff401af45cfea150b34ba272a3410b78aa014e" +dependencies = [ + "k8s-openapi", + "kube-client", + "kube-core", + "kube-derive", + "kube-runtime", +] + +[[package]] +name = "kube-client" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4987d57a184d2b5294fdad3d7fc7f278899469d21a4da39a8f6ca16426567a36" +dependencies = [ + "base64 0.22.1", + "bytes", + "chrono", + "either", + "futures", + "home", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.7.0", + "hyper-rustls", + "hyper-timeout", + "hyper-util", + "jsonpath-rust", + "k8s-openapi", + "kube-core", + "pem", + "rustls", + "secrecy", + "serde", + "serde_json", + "serde_yaml", + "thiserror 2.0.16", + "tokio", + "tokio-util", + "tower 0.5.2", + "tower-http", + "tracing", +] + +[[package]] +name = "kube-core" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914bbb770e7bb721a06e3538c0edd2babed46447d128f7c21caa68747060ee73" +dependencies = [ + "chrono", + "derive_more 2.0.1", + "form_urlencoded", + "http 1.3.1", + "json-patch", + "k8s-openapi", + "schemars 1.0.4", + "serde", + "serde-value", + "serde_json", + "thiserror 2.0.16", +] + +[[package]] +name = "kube-derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03dee8252be137772a6ab3508b81cd797dee62ee771112a2453bc85cbbe150d2" +dependencies = [ + "darling 0.21.3", + "proc-macro2", + "quote", + "serde", + "serde_json", + "syn 2.0.106", +] + +[[package]] +name = "kube-runtime" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6aea4de4b562c5cc89ab10300bb63474ae1fa57ff5a19275f2e26401a323e3fd" +dependencies = [ + "ahash", + "async-broadcast", + "async-stream", + "backon", + "educe", + "futures", + "hashbrown 0.15.5", + "hostname", + "json-patch", + "k8s-openapi", + "kube-client", + "parking_lot", + "pin-project", + "serde", + "serde_json", + "thiserror 2.0.16", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "kvbm-py3" version = "0.7.0" @@ -4967,7 +5181,7 @@ dependencies = [ "num-traits", "objc", "once_cell", - "ordered-float", + "ordered-float 5.1.0", "parking_lot", "radix_trie", "rand 0.9.2", @@ -5816,6 +6030,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-float" version = "5.1.0" @@ -5921,6 +6144,16 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -7566,7 +7799,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615" dependencies = [ "dyn-clone", - "schemars_derive", + "schemars_derive 0.8.22", "serde", "serde_json", ] @@ -7591,6 +7824,7 @@ checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" dependencies = [ "dyn-clone", "ref-cast", + "schemars_derive 1.0.4", "serde", "serde_json", ] @@ -7607,6 +7841,18 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "schemars_derive" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.106", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -7748,6 +7994,16 @@ dependencies = [ "typeid", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float 2.10.1", + "serde", +] + [[package]] name = "serde_cbor" version = "0.11.2" @@ -8927,6 +9183,7 @@ dependencies = [ "futures-sink", "futures-util", "pin-project-lite", + "slab", "tokio", ] @@ -9243,12 +9500,14 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ + "base64 0.22.1", "bitflags 2.9.4", "bytes", "futures-util", "http 1.3.1", "http-body 1.0.1", "iri-string", + "mime", "pin-project-lite", "tower 0.5.2", "tower-layer", diff --git a/deploy/discovery/README.md b/deploy/discovery/README.md new file mode 100644 index 0000000000..b76c577123 --- /dev/null +++ b/deploy/discovery/README.md @@ -0,0 +1,57 @@ + + +# Dynamo Service Discovery + +## Overview + +By default, Dynamo discovers endpoints and model cards through etcd. An experimental Kubernetes backend is available for discovery that uses native Kubernetes EndpointSlices, eliminating the dependency on etcd. + +**Using DynamoGraphDeployment (Recommended):** + +When deploying with the Dynamo operator, simply add the annotation to your DGD manifest: + +```yaml +metadata: + annotations: + nvidia.com/dynamo-discover-backend: kubernetes +``` + +The operator will automatically configure the required EndpointSlices, labels, and pod environment variables. See [`dgd.yaml`](./dgd.yaml) for a complete example. + +## Environment Variables + +| **Variable** | **Description** | **Default** | +| ------------ | --------------- | ----------- | +| `DYN_DISCOVERY_BACKEND` | Discovery backend (`kv_store` for etcd or `kubernetes` for experimental EndpointSlice-based discovery) | `kv_store` | + +## Metadata Endpoint + +The Kubernetes backend exposes a `/metadata` endpoint on each pod that returns registered discovery information. This is used by the system status server to expose the discovery information to the clients on the discovery plane. + +### Example Request + +```bash +curl -s localhost:9090/metadata | jq +``` + +### Example Response + +```json +{ + "endpoints": { + "vllm-disagg/backend/generate": { + "component": "backend", + "endpoint": "generate", + "instance_id": 12345678901234567890, + "namespace": "vllm-disagg", + "transport": { + "nats_tcp": "vllm-disagg_backend.generate-abc123" + } + } + }, + "model_cards": {} +} +``` diff --git a/deploy/discovery/dgd.yaml b/deploy/discovery/dgd.yaml new file mode 100644 index 0000000000..1440fe99d9 --- /dev/null +++ b/deploy/discovery/dgd.yaml @@ -0,0 +1,110 @@ + +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +apiVersion: nvidia.com/v1alpha1 +kind: DynamoGraphDeployment +metadata: + name: dynamo + labels: + nvidia.com/dynamo-discovery-backend: kubernetes +spec: + envs: + - name: DYN_LOG + value: "debug" + services: + Frontend: + dynamoNamespace: dynamo + componentType: frontend + replicas: 1 + envs: + - name: DYN_SYSTEM_PORT + value: "9090" + extraPodSpec: + mainContainer: + image: ${IMAGE} + VllmDecodeWorker: + dynamoNamespace: vllm-disagg + componentType: backend + replicas: 1 + resources: + limits: + gpu: "1" + envs: + - name: DYN_SYSTEM_PORT + value: "9090" + - name: DYN_SYSTEM_STARTING_HEALTH_STATUS + value: "notready" + - name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS + value: "[\"generate\", \"clear_kv_blocks\"]" + readinessProbe: + httpGet: + path: /health + port: 9090 + initialDelaySeconds: 60 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 60 + livenessProbe: + httpGet: + path: /live + port: 9090 + initialDelaySeconds: 60 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 60 + extraPodSpec: + terminationGracePeriodSeconds: 120 + mainContainer: + image: ${IMAGE} + workingDir: /workspace/components/backends/vllm + command: + - python3 + - -m + - dynamo.vllm + args: + - --model + - Qwen/Qwen3-0.6B + VllmPrefillWorker: + dynamoNamespace: vllm-disagg + componentType: prefill + replicas: 1 + resources: + limits: + gpu: "1" + envs: + - name: DYN_SYSTEM_PORT + value: "9090" + - name: DYN_SYSTEM_STARTING_HEALTH_STATUS + value: "notready" + - name: DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS + value: "[\"generate\", \"clear_kv_blocks\"]" + readinessProbe: + httpGet: + path: /health + port: 9090 + initialDelaySeconds: 60 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 60 + livenessProbe: + httpGet: + path: /live + port: 9090 + initialDelaySeconds: 60 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 60 + extraPodSpec: + terminationGracePeriodSeconds: 120 + mainContainer: + image: ${IMAGE} + workingDir: /workspace/components/backends/vllm + command: + - python3 + - -m + - dynamo.vllm + args: + - --model + - Qwen/Qwen3-0.6B + - --is-prefill-worker \ No newline at end of file diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index b4d4ec3ea9..8ac7b954aa 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -55,6 +55,12 @@ dependencies = [ "equator", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -173,6 +179,18 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-broadcast" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-channel" version = "2.5.0" @@ -467,6 +485,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "backon" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -1144,8 +1173,18 @@ version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.20.11", + "darling_macro 0.20.11", +] + +[[package]] +name = "darling" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" +dependencies = [ + "darling_core 0.21.3", + "darling_macro 0.21.3", ] [[package]] @@ -1162,13 +1201,38 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "darling_core" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.106", +] + [[package]] name = "darling_macro" version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ - "darling_core", + "darling_core 0.20.11", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "darling_macro" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" +dependencies = [ + "darling_core 0.21.3", "quote", "syn 2.0.106", ] @@ -1265,7 +1329,7 @@ version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" dependencies = [ - "darling", + "darling 0.20.11", "proc-macro2", "quote", "syn 2.0.106", @@ -1287,7 +1351,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" dependencies = [ - "derive_more-impl", + "derive_more-impl 1.0.0", +] + +[[package]] +name = "derive_more" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +dependencies = [ + "derive_more-impl 2.0.1", ] [[package]] @@ -1302,6 +1375,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "derive_more-impl" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "dialoguer" version = "0.11.0" @@ -1607,6 +1691,8 @@ dependencies = [ "futures", "humantime", "inotify", + "k8s-openapi", + "kube", "local-ip-address", "log", "nid", @@ -1621,6 +1707,7 @@ dependencies = [ "rand 0.9.2", "rayon", "regex", + "reqwest", "serde", "serde_json", "socket2 0.5.10", @@ -2387,6 +2474,18 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.4.12" @@ -2441,6 +2540,8 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ + "allocator-api2", + "equivalent", "foldhash", ] @@ -2503,6 +2604,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "hostname" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" +dependencies = [ + "cfg-if 1.0.3", + "libc", + "windows-link 0.1.3", +] + [[package]] name = "http" version = "1.3.1" @@ -2587,6 +2699,7 @@ dependencies = [ "http", "hyper", "hyper-util", + "log", "rustls", "rustls-native-certs 0.8.1", "rustls-pki-types", @@ -3073,6 +3186,18 @@ dependencies = [ "unicode-general-category", ] +[[package]] +name = "json-patch" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f300e415e2134745ef75f04562dd0145405c2f7fd92065db029ac4b16b57fe90" +dependencies = [ + "jsonptr", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "json5" version = "0.4.1" @@ -3084,6 +3209,29 @@ dependencies = [ "serde", ] +[[package]] +name = "jsonpath-rust" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c00ae348f9f8fd2d09f82a98ca381c60df9e0820d8d79fce43e649b4dc3128b" +dependencies = [ + "pest", + "pest_derive", + "regex", + "serde_json", + "thiserror 2.0.16", +] + +[[package]] +name = "jsonptr" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5a3cc660ba5d72bce0b3bb295bf20847ccbb40fd423f3f05b61273672e561fe" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "jwalk" version = "0.8.1" @@ -3094,6 +3242,18 @@ dependencies = [ "rayon", ] +[[package]] +name = "k8s-openapi" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d13f06d5326a915becaffabdfab75051b8cdc260c2a5c06c0e90226ede89a692" +dependencies = [ + "base64 0.22.1", + "chrono", + "serde", + "serde_json", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -3104,6 +3264,115 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "kube" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e7bb0b6a46502cc20e4575b6ff401af45cfea150b34ba272a3410b78aa014e" +dependencies = [ + "k8s-openapi", + "kube-client", + "kube-core", + "kube-derive", + "kube-runtime", +] + +[[package]] +name = "kube-client" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4987d57a184d2b5294fdad3d7fc7f278899469d21a4da39a8f6ca16426567a36" +dependencies = [ + "base64 0.22.1", + "bytes", + "chrono", + "either", + "futures", + "home", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-timeout", + "hyper-util", + "jsonpath-rust", + "k8s-openapi", + "kube-core", + "pem", + "rustls", + "secrecy", + "serde", + "serde_json", + "serde_yaml", + "thiserror 2.0.16", + "tokio", + "tokio-util", + "tower", + "tower-http", + "tracing", +] + +[[package]] +name = "kube-core" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914bbb770e7bb721a06e3538c0edd2babed46447d128f7c21caa68747060ee73" +dependencies = [ + "chrono", + "derive_more 2.0.1", + "form_urlencoded", + "http", + "json-patch", + "k8s-openapi", + "schemars 1.0.4", + "serde", + "serde-value", + "serde_json", + "thiserror 2.0.16", +] + +[[package]] +name = "kube-derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03dee8252be137772a6ab3508b81cd797dee62ee771112a2453bc85cbbe150d2" +dependencies = [ + "darling 0.21.3", + "proc-macro2", + "quote", + "serde", + "serde_json", + "syn 2.0.106", +] + +[[package]] +name = "kube-runtime" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6aea4de4b562c5cc89ab10300bb63474ae1fa57ff5a19275f2e26401a323e3fd" +dependencies = [ + "ahash", + "async-broadcast", + "async-stream", + "backon", + "educe", + "futures", + "hashbrown 0.15.5", + "hostname", + "json-patch", + "k8s-openapi", + "kube-client", + "parking_lot", + "pin-project", + "serde", + "serde_json", + "thiserror 2.0.16", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "lalrpop-util" version = "0.20.2" @@ -3284,7 +3553,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d149aaa2965d70381709d9df4c7ee1fc0de1c614a4efc2ee356f5e43d68749f8" dependencies = [ - "derive_more", + "derive_more 1.0.0", "malachite", "num-integer", "num-traits", @@ -3993,6 +4262,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.7.3" @@ -4073,6 +4351,16 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -5397,10 +5685,23 @@ checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" dependencies = [ "dyn-clone", "ref-cast", + "schemars_derive", "serde", "serde_json", ] +[[package]] +name = "schemars_derive" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.106", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -5473,10 +5774,11 @@ checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc" [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ + "serde_core", "serde_derive", ] @@ -5491,11 +5793,41 @@ dependencies = [ "typeid", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float", + "serde", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", @@ -5601,7 +5933,7 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f" dependencies = [ - "darling", + "darling 0.20.11", "proc-macro2", "quote", "syn 2.0.106", @@ -6197,6 +6529,7 @@ dependencies = [ "futures-sink", "futures-util", "pin-project-lite", + "slab", "tokio", ] @@ -6426,12 +6759,14 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ + "base64 0.22.1", "bitflags 2.9.3", "bytes", "futures-util", "http", "http-body", "iri-string", + "mime", "pin-project-lite", "tower", "tower-layer", @@ -6895,7 +7230,7 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7df16e474ef958526d1205f6dda359fdfab79d9aa6d54bafcb92dcd07673dca" dependencies = [ - "darling", + "darling 0.20.11", "once_cell", "proc-macro-error2", "proc-macro2", diff --git a/lib/runtime/Cargo.toml b/lib/runtime/Cargo.toml index 343ee01f34..42770ef103 100644 --- a/lib/runtime/Cargo.toml +++ b/lib/runtime/Cargo.toml @@ -39,6 +39,7 @@ humantime = { workspace = true } parking_lot = { workspace = true } prometheus = { workspace = true } rand = { workspace = true } +reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } @@ -75,6 +76,10 @@ regex = { version = "1" } socket2 = { version = "0.5.8" } tokio-rayon = { version = "2.1" } +# Kubernetes discovery backend +kube = { version = "2.0.1", default-features = false, features = ["runtime", "derive", "client", "rustls-tls", "aws-lc-rs"] } +k8s-openapi = { version = "0.26.0", features = ["v1_32"] } + [dev-dependencies] assert_matches = { version = "1.5.0" } criterion = { version = "0.5", features = ["async_tokio"] } diff --git a/lib/runtime/src/discovery/kube.rs b/lib/runtime/src/discovery/kube.rs new file mode 100644 index 0000000000..25b05fb83a --- /dev/null +++ b/lib/runtime/src/discovery/kube.rs @@ -0,0 +1,296 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +mod daemon; +mod utils; + +pub use utils::hash_pod_name; + +use daemon::DiscoveryDaemon; +use utils::PodInfo; + +use crate::CancellationToken; +use crate::discovery::{ + Discovery, DiscoveryEvent, DiscoveryInstance, DiscoveryMetadata, DiscoveryQuery, DiscoverySpec, + DiscoveryStream, MetadataSnapshot, +}; +use anyhow::Result; +use async_trait::async_trait; +use kube::Client as KubeClient; +use std::collections::HashSet; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// Kubernetes-based discovery client +#[derive(Clone)] +pub struct KubeDiscoveryClient { + instance_id: u64, + metadata: Arc>, + metadata_watch: tokio::sync::watch::Receiver>, +} + +impl KubeDiscoveryClient { + /// Create a new Kubernetes discovery client + /// + /// # Arguments + /// * `metadata` - Shared metadata store (also used by system server) + /// * `cancel_token` - Cancellation token for shutdown + pub async fn new( + metadata: Arc>, + cancel_token: CancellationToken, + ) -> Result { + let pod_info = PodInfo::from_env()?; + let instance_id = hash_pod_name(&pod_info.pod_name); + + tracing::info!( + "Initializing KubeDiscoveryClient: pod_name={}, instance_id={:x}, namespace={}", + pod_info.pod_name, + instance_id, + pod_info.pod_namespace + ); + + let kube_client = KubeClient::try_default() + .await + .map_err(|e| anyhow::anyhow!("Failed to create Kubernetes client: {}", e))?; + + // Create watch channel with initial empty snapshot + let (watch_tx, watch_rx) = tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty())); + + // Create and spawn daemon + let daemon = DiscoveryDaemon::new(kube_client, pod_info, cancel_token)?; + + tokio::spawn(async move { + if let Err(e) = daemon.run(watch_tx).await { + tracing::error!("Discovery daemon failed: {}", e); + } + }); + + tracing::info!("Discovery daemon started"); + + Ok(Self { + instance_id, + metadata, + metadata_watch: watch_rx, + }) + } +} + +#[async_trait] +impl Discovery for KubeDiscoveryClient { + fn instance_id(&self) -> u64 { + self.instance_id + } + + async fn register(&self, spec: DiscoverySpec) -> Result { + let instance_id = self.instance_id(); + let instance = spec.with_instance_id(instance_id); + + tracing::debug!( + "Registering instance: {:?} with instance_id={:x}", + instance, + instance_id + ); + + // Write to local metadata + let mut metadata = self.metadata.write().await; + match &instance { + DiscoveryInstance::Endpoint(inst) => { + tracing::info!( + "Registered endpoint: namespace={}, component={}, endpoint={}, instance_id={:x}", + inst.namespace, + inst.component, + inst.endpoint, + instance_id + ); + metadata.register_endpoint(instance.clone())?; + } + DiscoveryInstance::Model { + namespace, + component, + endpoint, + .. + } => { + tracing::info!( + "Registered model card: namespace={}, component={}, endpoint={}, instance_id={:x}", + namespace, + component, + endpoint, + instance_id + ); + metadata.register_model_card(instance.clone())?; + } + } + + Ok(instance) + } + + async fn list(&self, query: DiscoveryQuery) -> Result> { + tracing::debug!("KubeDiscoveryClient::list called with query={:?}", query); + + // Get current snapshot (may be empty if daemon hasn't fetched yet) + let snapshot = self.metadata_watch.borrow().clone(); + + tracing::debug!( + "List using snapshot seq={} with {} instances", + snapshot.sequence, + snapshot.instances.len() + ); + + // Filter snapshot by query + let instances = snapshot.filter(&query); + + tracing::info!( + "KubeDiscoveryClient::list returning {} instances for query={:?}", + instances.len(), + query + ); + + Ok(instances) + } + + async fn list_and_watch( + &self, + query: DiscoveryQuery, + cancel_token: Option, + ) -> Result { + use tokio::sync::mpsc; + + tracing::info!( + "KubeDiscoveryClient::list_and_watch started for query={:?}", + query + ); + + // Clone the watch receiver + let mut watch_rx = self.metadata_watch.clone(); + + // Create output stream + let (event_tx, event_rx) = mpsc::unbounded_channel(); + + // Generate unique stream identifier for tracing + let stream_id = uuid::Uuid::new_v4(); + + // Spawn task to process snapshots + tokio::spawn(async move { + let mut known_instances = HashSet::::new(); + + tracing::debug!( + stream_id = %stream_id, + "Watch started for query={:?}", + query + ); + + loop { + // Wait for next snapshot or cancellation + let watch_result = if let Some(ref token) = cancel_token { + tokio::select! { + result = watch_rx.changed() => result, + _ = token.cancelled() => { + tracing::info!( + stream_id = %stream_id, + "Watch cancelled via cancel token" + ); + break; + } + } + } else { + watch_rx.changed().await + }; + + match watch_result { + Ok(()) => { + // Get latest snapshot + let snapshot = watch_rx.borrow_and_update().clone(); + + // Filter snapshot by query + let current_instances: HashSet = snapshot + .instances + .iter() + .filter_map(|(&instance_id, metadata)| { + let filtered = metadata.filter(&query); + if !filtered.is_empty() { + Some(instance_id) + } else { + None + } + }) + .collect(); + + // Compute diff + let added: Vec = current_instances + .difference(&known_instances) + .copied() + .collect(); + + let removed: Vec = known_instances + .difference(¤t_instances) + .copied() + .collect(); + + // Only log if there are changes + if !added.is_empty() || !removed.is_empty() { + tracing::debug!( + stream_id = %stream_id, + seq = snapshot.sequence, + added = added.len(), + removed = removed.len(), + total = current_instances.len(), + "Watch detected changes" + ); + } + + // Emit Added events + for instance_id in added { + if let Some(metadata) = snapshot.instances.get(&instance_id) { + let instances = metadata.filter(&query); + for instance in instances { + tracing::info!( + stream_id = %stream_id, + instance_id = format!("{:x}", instance.instance_id()), + "Emitting Added event" + ); + if event_tx.send(Ok(DiscoveryEvent::Added(instance))).is_err() { + tracing::debug!( + stream_id = %stream_id, + "Watch receiver dropped" + ); + return; + } + } + } + } + + // Emit Removed events + for instance_id in removed { + tracing::info!( + stream_id = %stream_id, + instance_id = format!("{:x}", instance_id), + "Emitting Removed event" + ); + if event_tx + .send(Ok(DiscoveryEvent::Removed(instance_id))) + .is_err() + { + tracing::debug!(stream_id = %stream_id, "Watch receiver dropped"); + return; + } + } + + // Update known set + known_instances = current_instances; + } + Err(_) => { + tracing::info!( + stream_id = %stream_id, + "Watch channel closed (daemon stopped)" + ); + break; + } + } + } + }); + + // Convert receiver to stream + let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(event_rx); + Ok(Box::pin(stream)) + } +} diff --git a/lib/runtime/src/discovery/kube/daemon.rs b/lib/runtime/src/discovery/kube/daemon.rs new file mode 100644 index 0000000000..4eee1287ad --- /dev/null +++ b/lib/runtime/src/discovery/kube/daemon.rs @@ -0,0 +1,312 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::CancellationToken; +use crate::discovery::{DiscoveryMetadata, MetadataSnapshot}; +use anyhow::Result; +use futures::StreamExt; +use k8s_openapi::api::discovery::v1::EndpointSlice; +use kube::{ + Api, Client as KubeClient, + runtime::{WatchStreamExt, reflector, watcher, watcher::Config}, +}; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use tokio::sync::RwLock; + +use super::utils::{PodInfo, extract_endpoint_info, hash_pod_name}; + +const SNAPSHOT_POLL_INTERVAL_MS: u64 = 5000; +const MAX_CONCURRENT_FETCHES: usize = 20; +const METADATA_FETCH_TIMEOUT_SECS: u64 = 5; + +/// Discovers and aggregates metadata from pods in the cluster +#[derive(Clone)] +pub(super) struct DiscoveryDaemon { + /// Kubernetes client + kube_client: KubeClient, + /// HTTP client for fetching remote metadata + http_client: reqwest::Client, + /// Cache of remote pod metadata (instance_id -> metadata) + cache: Arc>>>, + // This pod's info + pod_info: PodInfo, + cancel_token: CancellationToken, +} + +impl DiscoveryDaemon { + pub fn new( + kube_client: KubeClient, + pod_info: PodInfo, + cancel_token: CancellationToken, + ) -> Result { + let http_client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(METADATA_FETCH_TIMEOUT_SECS)) + .build() + .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {}", e))?; + + Ok(Self { + kube_client, + http_client, + cache: Arc::new(RwLock::new(HashMap::new())), + pod_info, + cancel_token, + }) + } + + /// Run the discovery daemon + pub async fn run( + self, + watch_tx: tokio::sync::watch::Sender>, + ) -> Result<()> { + tracing::info!("Discovery daemon starting"); + + // Create reflector for ALL EndpointSlices in our namespace + let endpoint_slices: Api = + Api::namespaced(self.kube_client.clone(), &self.pod_info.pod_namespace); + + let (reader, writer) = reflector::store(); + + // Apply label selector to only watch discovery-enabled EndpointSlices + let watch_config = + Config::default().labels("nvidia.com/dynamo-discovery-backend=kubernetes"); + + tracing::info!( + "Daemon watching EndpointSlices with label: nvidia.com/dynamo-discovery-backend=kubernetes" + ); + + // Spawn reflector task (runs independently) + let reflector_stream = reflector(writer, watcher(endpoint_slices, watch_config)) + .default_backoff() + .touched_objects() + .for_each(|res| { + match res { + Ok(obj) => { + tracing::debug!( + slice_name = obj.metadata.name.as_deref().unwrap_or("unknown"), + "Daemon reflector updated EndpointSlice" + ); + } + Err(e) => { + tracing::warn!("Daemon reflector error: {}", e); + } + } + futures::future::ready(()) + }); + + tokio::spawn(reflector_stream); + + // Polling loop + let mut sequence = 0u64; + let mut prev_instance_ids: HashSet = HashSet::new(); + let mut interval = + tokio::time::interval(std::time::Duration::from_millis(SNAPSHOT_POLL_INTERVAL_MS)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = interval.tick() => { + match self.aggregate_snapshot(&reader, sequence).await { + Ok(snapshot) => { + // Compare instance IDs to detect changes + let current_instance_ids: HashSet = + snapshot.instances.keys().copied().collect(); + + let instances_changed = current_instance_ids != prev_instance_ids; + + if instances_changed { + // Compute what was added and removed + let added: Vec = current_instance_ids + .difference(&prev_instance_ids) + .copied() + .collect(); + + let removed: Vec = prev_instance_ids + .difference(¤t_instance_ids) + .copied() + .collect(); + + tracing::info!( + "Daemon snapshot (seq={}): instances changed, total={}, added=[{}], removed=[{}]", + sequence, + current_instance_ids.len(), + added.iter().map(|id| format!("{:x}", id)).collect::>().join(", "), + removed.iter().map(|id| format!("{:x}", id)).collect::>().join(", ") + ); + + // Prune cache for removed instances + if !removed.is_empty() { + self.prune_cache(&removed).await; + } + + // Broadcast the snapshot (only when changed) + if watch_tx.send(Arc::new(snapshot)).is_err() { + tracing::debug!("No watch subscribers, daemon stopping"); + break; + } + + prev_instance_ids = current_instance_ids; + } else { + tracing::trace!( + "Daemon snapshot (seq={}): no changes, {} instances", + sequence, + current_instance_ids.len() + ); + } + + sequence += 1; + } + Err(e) => { + tracing::error!("Failed to aggregate snapshot: {}", e); + // Continue on errors - don't crash daemon + } + } + } + _ = self.cancel_token.cancelled() => { + tracing::info!("Discovery daemon received cancellation"); + break; + } + } + } + + tracing::info!("Discovery daemon stopped"); + Ok(()) + } + + /// Aggregate metadata from all pods into a snapshot + async fn aggregate_snapshot( + &self, + reader: &reflector::Store, + sequence: u64, + ) -> Result { + let start = std::time::Instant::now(); + + // Extract ALL ready endpoints (instance_id, pod_name, pod_ip) directly from reflector + let all_endpoints: Vec<(u64, String, String)> = reader + .state() + .iter() + .flat_map(|arc_slice| extract_endpoint_info(arc_slice.as_ref())) + .collect(); + + tracing::trace!( + "Daemon found {} ready endpoints to fetch", + all_endpoints.len() + ); + + // Concurrent fetch: Fetch metadata for all endpoints in parallel + let fetch_futures = all_endpoints + .into_iter() + .map(|(instance_id, pod_name, pod_ip)| { + let daemon = self.clone(); + async move { + match daemon.fetch_metadata(&pod_name, &pod_ip).await { + Ok(metadata) => Some((instance_id, metadata)), + Err(e) => { + tracing::warn!( + "Failed to fetch metadata for pod {} (instance_id={:x}): {}", + pod_name, + instance_id, + e + ); + None + } + } + } + }); + + // Execute fetches concurrently with bounded parallelism + let results: Vec<_> = futures::stream::iter(fetch_futures) + .buffer_unordered(MAX_CONCURRENT_FETCHES) + .collect() + .await; + + // Build the snapshot + let instances: HashMap> = + results.into_iter().flatten().collect(); + + let elapsed = start.elapsed(); + + tracing::trace!( + "Daemon snapshot complete (seq={}): {} instances in {:?}", + sequence, + instances.len(), + elapsed + ); + + Ok(MetadataSnapshot { + instances, + sequence, + timestamp: std::time::Instant::now(), + }) + } + + /// Fetch metadata for a single pod (with caching) + async fn fetch_metadata(&self, pod_name: &str, pod_ip: &str) -> Result> { + let instance_id = hash_pod_name(pod_name); + + // Check cache + { + let cache = self.cache.read().await; + if let Some(cached) = cache.get(&instance_id) { + tracing::trace!( + "Cache hit for pod_name={}, instance_id={:x}", + pod_name, + instance_id + ); + return Ok(cached.clone()); + } + } + + // Cache miss: fetch from HTTP + let url = format!("http://{}:{}/metadata", pod_ip, self.pod_info.system_port); + + tracing::debug!("Fetching metadata from {url}"); + + let response = self + .http_client + .get(&url) + .send() + .await + .map_err(|e| anyhow::anyhow!("Failed to fetch metadata from {}: {}", url, e))?; + + let metadata: DiscoveryMetadata = response + .json() + .await + .map_err(|e| anyhow::anyhow!("Failed to parse metadata from {}: {}", url, e))?; + + let metadata = Arc::new(metadata); + + // Cache it + { + let mut cache = self.cache.write().await; + // Check again in case another task inserted while we were fetching + if let Some(existing) = cache.get(&instance_id) { + tracing::debug!( + "Another task cached metadata for instance_id={:x} while we were fetching", + instance_id + ); + return Ok(existing.clone()); + } + + cache.insert(instance_id, metadata.clone()); + + tracing::debug!( + "Cached metadata for pod_name={}, instance_id={:x}", + pod_name, + instance_id + ); + } + + Ok(metadata) + } + + /// Prune cache entries for removed instances + async fn prune_cache(&self, removed_ids: &[u64]) { + let mut cache = self.cache.write().await; + for id in removed_ids { + if cache.remove(id).is_some() { + tracing::debug!("Pruned cache for removed instance_id={:x}", id); + } + } + } +} diff --git a/lib/runtime/src/discovery/kube/utils.rs b/lib/runtime/src/discovery/kube/utils.rs new file mode 100644 index 0000000000..6e1c5bee94 --- /dev/null +++ b/lib/runtime/src/discovery/kube/utils.rs @@ -0,0 +1,85 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use k8s_openapi::api::discovery::v1::EndpointSlice; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; + +/// Hash a pod name to get a consistent instance ID +pub fn hash_pod_name(pod_name: &str) -> u64 { + let mut hasher = DefaultHasher::new(); + pod_name.hash(&mut hasher); + hasher.finish() +} + +/// Extract endpoint information from an EndpointSlice +/// Returns (instance_id, pod_name, pod_ip) tuples for ready endpoints +pub(super) fn extract_endpoint_info(slice: &EndpointSlice) -> Vec<(u64, String, String)> { + let mut result = Vec::new(); + + let endpoints = &slice.endpoints; + + for endpoint in endpoints { + // Check if endpoint is ready + let is_ready = endpoint + .conditions + .as_ref() + .and_then(|c| c.ready) + .unwrap_or(false); + + if !is_ready { + continue; + } + + // Get pod name from targetRef + let pod_name = match endpoint.target_ref.as_ref() { + Some(target_ref) => target_ref.name.as_deref().unwrap_or(""), + None => continue, + }; + + if pod_name.is_empty() { + continue; + } + + let instance_id = hash_pod_name(pod_name); + + // Get first IP only (avoid duplicate instance IDs) + if let Some(ip) = endpoint.addresses.first() { + result.push((instance_id, pod_name.to_string(), ip.clone())); + } + } + + result +} + +/// Pod information extracted from environment +#[derive(Debug, Clone)] +pub(super) struct PodInfo { + pub pod_name: String, + pub pod_namespace: String, + pub system_port: u16, +} + +impl PodInfo { + /// Discover pod information from environment variables + pub fn from_env() -> Result { + let pod_name = std::env::var("POD_NAME") + .map_err(|_| anyhow::anyhow!("POD_NAME environment variable not set"))?; + + let pod_namespace = std::env::var("POD_NAMESPACE").unwrap_or_else(|_| { + tracing::warn!("POD_NAMESPACE not set, defaulting to 'default'"); + "default".to_string() + }); + + // Read system server port from config + let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default(); + let system_port = config.system_port as u16; + + Ok(Self { + pod_name, + pod_namespace, + system_port, + }) + } +} diff --git a/lib/runtime/src/discovery/metadata.rs b/lib/runtime/src/discovery/metadata.rs new file mode 100644 index 0000000000..86437383f5 --- /dev/null +++ b/lib/runtime/src/discovery/metadata.rs @@ -0,0 +1,318 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use std::collections::HashMap; +use std::sync::Arc; + +use super::{DiscoveryInstance, DiscoveryQuery}; + +/// Key for organizing metadata internally +/// Format: "namespace/component/endpoint" +fn make_endpoint_key(namespace: &str, component: &str, endpoint: &str) -> String { + format!("{namespace}/{component}/{endpoint}") +} + +/// Metadata stored on each pod and exposed via HTTP endpoint +/// This struct holds all discovery registrations for this pod instance +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DiscoveryMetadata { + /// Registered endpoint instances (key: "namespace/component/endpoint") + endpoints: HashMap, + /// Registered model card instances (key: "namespace/component/endpoint") + model_cards: HashMap, +} + +impl DiscoveryMetadata { + /// Create a new empty metadata store + pub fn new() -> Self { + Self { + endpoints: HashMap::new(), + model_cards: HashMap::new(), + } + } + + /// Register an endpoint instance + pub fn register_endpoint(&mut self, instance: DiscoveryInstance) -> Result<()> { + if let DiscoveryInstance::Endpoint(ref inst) = instance { + let key = make_endpoint_key(&inst.namespace, &inst.component, &inst.endpoint); + self.endpoints.insert(key, instance); + Ok(()) + } else { + anyhow::bail!("Cannot register non-endpoint instance as endpoint") + } + } + + /// Register a model card instance + pub fn register_model_card(&mut self, instance: DiscoveryInstance) -> Result<()> { + if let DiscoveryInstance::Model { + ref namespace, + ref component, + ref endpoint, + .. + } = instance + { + let key = make_endpoint_key(namespace, component, endpoint); + self.model_cards.insert(key, instance); + Ok(()) + } else { + anyhow::bail!("Cannot register non-model-card instance as model card") + } + } + + /// Get all registered endpoints + pub fn get_all_endpoints(&self) -> Vec { + self.endpoints.values().cloned().collect() + } + + /// Get all registered model cards + pub fn get_all_model_cards(&self) -> Vec { + self.model_cards.values().cloned().collect() + } + + /// Get all registered instances (endpoints and model cards) + pub fn get_all(&self) -> Vec { + self.endpoints + .values() + .chain(self.model_cards.values()) + .cloned() + .collect() + } + + /// Filter this metadata by query + pub fn filter(&self, query: &DiscoveryQuery) -> Vec { + let all_instances = match query { + DiscoveryQuery::AllEndpoints + | DiscoveryQuery::NamespacedEndpoints { .. } + | DiscoveryQuery::ComponentEndpoints { .. } + | DiscoveryQuery::Endpoint { .. } => self.get_all_endpoints(), + + DiscoveryQuery::AllModels + | DiscoveryQuery::NamespacedModels { .. } + | DiscoveryQuery::ComponentModels { .. } + | DiscoveryQuery::EndpointModels { .. } => self.get_all_model_cards(), + }; + + filter_instances(all_instances, query) + } +} + +impl Default for DiscoveryMetadata { + fn default() -> Self { + Self::new() + } +} + +/// Filter instances by query predicate +fn filter_instances( + instances: Vec, + query: &DiscoveryQuery, +) -> Vec { + match query { + DiscoveryQuery::AllEndpoints | DiscoveryQuery::AllModels => instances, + + DiscoveryQuery::NamespacedEndpoints { namespace } => instances + .into_iter() + .filter(|inst| match inst { + DiscoveryInstance::Endpoint(i) => &i.namespace == namespace, + _ => false, + }) + .collect(), + + DiscoveryQuery::ComponentEndpoints { + namespace, + component, + } => instances + .into_iter() + .filter(|inst| match inst { + DiscoveryInstance::Endpoint(i) => { + &i.namespace == namespace && &i.component == component + } + _ => false, + }) + .collect(), + + DiscoveryQuery::Endpoint { + namespace, + component, + endpoint, + } => instances + .into_iter() + .filter(|inst| match inst { + DiscoveryInstance::Endpoint(i) => { + &i.namespace == namespace + && &i.component == component + && &i.endpoint == endpoint + } + _ => false, + }) + .collect(), + + DiscoveryQuery::NamespacedModels { namespace } => instances + .into_iter() + .filter(|inst| match inst { + DiscoveryInstance::Model { namespace: ns, .. } => ns == namespace, + _ => false, + }) + .collect(), + + DiscoveryQuery::ComponentModels { + namespace, + component, + } => instances + .into_iter() + .filter(|inst| match inst { + DiscoveryInstance::Model { + namespace: ns, + component: comp, + .. + } => ns == namespace && comp == component, + _ => false, + }) + .collect(), + + DiscoveryQuery::EndpointModels { + namespace, + component, + endpoint, + } => instances + .into_iter() + .filter(|inst| match inst { + DiscoveryInstance::Model { + namespace: ns, + component: comp, + endpoint: ep, + .. + } => ns == namespace && comp == component && ep == endpoint, + _ => false, + }) + .collect(), + } +} + +/// Snapshot of all discovered instances and their metadata +#[derive(Clone, Debug)] +pub struct MetadataSnapshot { + /// Map of instance_id -> metadata + pub instances: HashMap>, + /// Sequence number for debugging + pub sequence: u64, + /// Timestamp for observability + pub timestamp: std::time::Instant, +} + +impl MetadataSnapshot { + pub fn empty() -> Self { + Self { + instances: HashMap::new(), + sequence: 0, + timestamp: std::time::Instant::now(), + } + } + + /// Filter all instances in the snapshot by query + pub fn filter(&self, query: &DiscoveryQuery) -> Vec { + self.instances + .values() + .flat_map(|metadata| metadata.filter(query)) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::component::{Instance, TransportType}; + + #[test] + fn test_metadata_serde() { + let mut metadata = DiscoveryMetadata::new(); + + // Add an endpoint + let instance = DiscoveryInstance::Endpoint(Instance { + namespace: "test".to_string(), + component: "comp1".to_string(), + endpoint: "ep1".to_string(), + instance_id: 123, + transport: TransportType::NatsTcp("nats://localhost:4222".to_string()), + }); + + metadata.register_endpoint(instance).unwrap(); + + // Serialize + let json = serde_json::to_string(&metadata).unwrap(); + + // Deserialize + let deserialized: DiscoveryMetadata = serde_json::from_str(&json).unwrap(); + + assert_eq!(deserialized.endpoints.len(), 1); + assert_eq!(deserialized.model_cards.len(), 0); + } + + #[tokio::test] + async fn test_concurrent_registration() { + use tokio::sync::RwLock; + + let metadata = Arc::new(RwLock::new(DiscoveryMetadata::new())); + + // Spawn multiple tasks registering concurrently + let handles: Vec<_> = (0..10) + .map(|i| { + let metadata = metadata.clone(); + tokio::spawn(async move { + let mut meta = metadata.write().await; + let instance = DiscoveryInstance::Endpoint(Instance { + namespace: "test".to_string(), + component: "comp1".to_string(), + endpoint: format!("ep{}", i), + instance_id: i, + transport: TransportType::NatsTcp("nats://localhost:4222".to_string()), + }); + meta.register_endpoint(instance).unwrap(); + }) + }) + .collect(); + + // Wait for all to complete + for handle in handles { + handle.await.unwrap(); + } + + // Verify all registrations succeeded + let meta = metadata.read().await; + assert_eq!(meta.endpoints.len(), 10); + } + + #[tokio::test] + async fn test_metadata_accessors() { + let mut metadata = DiscoveryMetadata::new(); + + // Register endpoints + for i in 0..3 { + let instance = DiscoveryInstance::Endpoint(Instance { + namespace: "test".to_string(), + component: "comp1".to_string(), + endpoint: format!("ep{}", i), + instance_id: i, + transport: TransportType::NatsTcp("nats://localhost:4222".to_string()), + }); + metadata.register_endpoint(instance).unwrap(); + } + + // Register model cards + for i in 0..2 { + let instance = DiscoveryInstance::Model { + namespace: "test".to_string(), + component: "comp1".to_string(), + endpoint: format!("ep{}", i), + instance_id: i, + card_json: serde_json::json!({"model": "test"}), + }; + metadata.register_model_card(instance).unwrap(); + } + + assert_eq!(metadata.get_all_endpoints().len(), 3); + assert_eq!(metadata.get_all_model_cards().len(), 2); + assert_eq!(metadata.get_all().len(), 5); + } +} diff --git a/lib/runtime/src/discovery/mod.rs b/lib/runtime/src/discovery/mod.rs index 3b10856e0d..8ac31e7c66 100644 --- a/lib/runtime/src/discovery/mod.rs +++ b/lib/runtime/src/discovery/mod.rs @@ -8,10 +8,17 @@ use serde::{Deserialize, Serialize}; use std::pin::Pin; use tokio_util::sync::CancellationToken; +mod metadata; +pub use metadata::{DiscoveryMetadata, MetadataSnapshot}; + mod mock; pub use mock::{MockDiscovery, SharedMockRegistry}; mod kv_store; pub use kv_store::KVStoreDiscovery; + +mod kube; +pub use kube::{KubeDiscoveryClient, hash_pod_name}; + pub mod utils; use crate::component::TransportType; pub use utils::watch_and_extract_field; diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 00277a9c7a..7eeadfdd01 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -49,6 +49,10 @@ pub struct DistributedRuntime { // Service discovery client discovery_client: Arc, + // Discovery metadata (only used for Kubernetes backend) + // Shared with system status server to expose via /metadata endpoint + discovery_metadata: Option>>, + // local registry for components // the registry allows us to use share runtime resources across instances of the same component object. // take for example two instances of a client to the same remote component. The registry allows us to use @@ -134,13 +138,37 @@ impl DistributedRuntime { let nats_client_for_metrics = nats_client.clone(); - // Initialize discovery backed by KV store - let discovery_client = { - use crate::discovery::KVStoreDiscovery; - Arc::new(KVStoreDiscovery::new( - store.clone(), - runtime.primary_token(), - )) as Arc + // Initialize discovery client based on backend configuration + let discovery_backend = + std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string()); + + let (discovery_client, discovery_metadata) = match discovery_backend.as_str() { + "kubernetes" => { + tracing::info!("Initializing Kubernetes discovery backend"); + let metadata = Arc::new(tokio::sync::RwLock::new( + crate::discovery::DiscoveryMetadata::new(), + )); + let client = crate::discovery::KubeDiscoveryClient::new( + metadata.clone(), + runtime.primary_token(), + ) + .await + .inspect_err( + |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"), + )?; + (Arc::new(client) as Arc, Some(metadata)) + } + _ => { + tracing::info!("Initializing KV store discovery backend"); + use crate::discovery::KVStoreDiscovery; + ( + Arc::new(KVStoreDiscovery::new( + store.clone(), + runtime.primary_token(), + )) as Arc, + None, + ) + } }; let distributed_runtime = Self { @@ -151,6 +179,7 @@ impl DistributedRuntime { tcp_server: Arc::new(OnceCell::new()), system_status_server: Arc::new(OnceLock::new()), discovery_client, + discovery_metadata, component_registry: component::Registry::new(), is_static, instance_sources: Arc::new(Mutex::new(HashMap::new())), @@ -194,6 +223,7 @@ impl DistributedRuntime { port, cancel_token, Arc::new(distributed_runtime.clone()), + distributed_runtime.discovery_metadata.clone(), ) .await { @@ -283,7 +313,7 @@ impl DistributedRuntime { } pub fn connection_id(&self) -> u64 { - self.store.connection_id() + self.discovery_client.instance_id() } pub fn shutdown(&self) { diff --git a/lib/runtime/src/system_status_server.rs b/lib/runtime/src/system_status_server.rs index 2ebe20cdc5..d0a46f01dc 100644 --- a/lib/runtime/src/system_status_server.rs +++ b/lib/runtime/src/system_status_server.rs @@ -56,18 +56,33 @@ impl Clone for SystemStatusServerInfo { pub struct SystemStatusState { // global drt registry is for printing out the entire Prometheus format output root_drt: Arc, + // Discovery metadata (only for Kubernetes backend) + discovery_metadata: Option>>, } impl SystemStatusState { /// Create new system status server state with the provided distributed runtime - pub fn new(drt: Arc) -> anyhow::Result { - Ok(Self { root_drt: drt }) + pub fn new( + drt: Arc, + discovery_metadata: Option>>, + ) -> anyhow::Result { + Ok(Self { + root_drt: drt, + discovery_metadata, + }) } /// Get a reference to the distributed runtime pub fn drt(&self) -> &crate::DistributedRuntime { &self.root_drt } + + /// Get a reference to the discovery metadata if available + pub fn discovery_metadata( + &self, + ) -> Option<&Arc>> { + self.discovery_metadata.as_ref() + } } /// Start system status server with metrics support @@ -76,9 +91,10 @@ pub async fn spawn_system_status_server( port: u16, cancel_token: CancellationToken, drt: Arc, + discovery_metadata: Option>>, ) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> { // Create system status server state with the provided distributed runtime - let server_state = Arc::new(SystemStatusState::new(drt)?); + let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?); let health_path = server_state .drt() .system_health() @@ -114,6 +130,13 @@ pub async fn spawn_system_status_server( move || metrics_handler(state) }), ) + .route( + "/metadata", + get({ + let state = Arc::clone(&server_state); + move || metadata_handler(state) + }), + ) .fallback(|| async { tracing::info!("[fallback handler] called"); (StatusCode::NOT_FOUND, "Route not found").into_response() @@ -207,6 +230,42 @@ async fn metrics_handler(state: Arc) -> impl IntoResponse { (StatusCode::OK, response) } +/// Metadata handler +#[tracing::instrument(skip_all, level = "trace")] +async fn metadata_handler(state: Arc) -> impl IntoResponse { + // Check if discovery metadata is available + let metadata = match state.discovery_metadata() { + Some(metadata) => metadata, + None => { + tracing::debug!("Metadata endpoint called but no discovery metadata available"); + return ( + StatusCode::NOT_FOUND, + "Discovery metadata not available".to_string(), + ) + .into_response(); + } + }; + + // Read the metadata + let metadata_guard = metadata.read().await; + + // Serialize to JSON + match serde_json::to_string(&*metadata_guard) { + Ok(json) => { + tracing::trace!("Returning metadata: {} bytes", json.len()); + (StatusCode::OK, json).into_response() + } + Err(e) => { + tracing::error!("Failed to serialize metadata: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to serialize metadata".to_string(), + ) + .into_response() + } + } +} + // Regular tests: cargo test system_status_server --lib #[cfg(test)] mod tests {