diff --git a/Transport-agnostic-dynamo/comm.png b/Transport-agnostic-dynamo/comm.png
new file mode 100644
index 00000000..1403278a
Binary files /dev/null and b/Transport-agnostic-dynamo/comm.png differ
diff --git a/Transport-agnostic-dynamo/dyn_entities.png b/Transport-agnostic-dynamo/dyn_entities.png
new file mode 100644
index 00000000..7d3b6a09
Binary files /dev/null and b/Transport-agnostic-dynamo/dyn_entities.png differ
diff --git a/Transport-agnostic-dynamo/dynamo-entity.png b/Transport-agnostic-dynamo/dynamo-entity.png
new file mode 100644
index 00000000..a342b89c
Binary files /dev/null and b/Transport-agnostic-dynamo/dynamo-entity.png differ
diff --git a/Transport-agnostic-dynamo/generic_event_req.jpg b/Transport-agnostic-dynamo/generic_event_req.jpg
new file mode 100644
index 00000000..a82af26b
Binary files /dev/null and b/Transport-agnostic-dynamo/generic_event_req.jpg differ
diff --git a/Transport-agnostic-dynamo/net_manager.png b/Transport-agnostic-dynamo/net_manager.png
new file mode 100644
index 00000000..5c589bd8
Binary files /dev/null and b/Transport-agnostic-dynamo/net_manager.png differ
diff --git a/Transport-agnostic-dynamo/transport_agnostic_dynamo.md b/Transport-agnostic-dynamo/transport_agnostic_dynamo.md
new file mode 100644
index 00000000..d463baa7
--- /dev/null
+++ b/Transport-agnostic-dynamo/transport_agnostic_dynamo.md
@@ -0,0 +1,370 @@
# Dynamo runtime: Transport Agnostic Dynamo Pipelines
Status: Draft

Authors: [biswapanda](https://github.com/biswapanda)

Category: Architecture

Reviewers: [Ryan Olson](https://github.com/ryanolson), [Neelay Shah](https://github.com/nnshah1), [Graham King](https://github.com/grahamking), [Maksim Khadkevich](https://github.com/hutm), [Rudy Pei](https://github.com/PeaBrane), [Kyle Kranen](https://github.com/kkranen)


## Overview

The high-level goal is to decouple NATs (the transport and object store) from the dynamo runtime.

- introduce abstractions for the current NATs usages (e.g.
event plane, request plane & object store, etc) which can be used to plug in different implementations.

- deprecate the NATs object store and reduce dependencies on NATs.

## Requirements
- deliver messages across dynamo instances with an at-least-once delivery guarantee.
- switch between transports at runtime.
- support the long-term architecture goals for Dynamo GA.

### Transport Agnostic API

Dynamo communication primitives need to support:
- peer-to-peer (req/reply: request plane) and scoped broadcasts (event plane)
- communication regimes: single process, single node (multi-process) and multi-node
- transport options: NATs, raw TCP, ZMQ, HTTP SSE, gRPC, UCX active messaging
- cancellation support for the request/reply pattern
- events do not require a response

### Deprecate NATs Object store usage
 - Router snapshots are stored in the NATs object store.
 - Model files are stored in the NATs object store.

### Long term architectural goal support:

- separation of the Frontend (3-in-1 across in-process, same node or remote node)

- HTTP-based endpoint for one-off usage of a component (KV router, etc)

- batching/pipelining messages for requests/responses:
  - batching multiple requests together over a network round-trip yields a measurable performance benefit.

- Simplify `dynamo namespace` usage and the process hierarchy (namespace, component, etc)
  - `dynamo namespace` causes unnecessary cognitive complexity for end users.
  - Support mapping to more meaningful Grove concepts like PodClique, PodCliqueSet, etc.
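The transport-agnostic API requirement above can be made concrete with a small trait. The sketch below is illustrative only — the `Transport` trait, the `InProcessTransport` type, and their method signatures are assumptions for this document, not the actual Dynamo API; a real implementation would be async and would carry cancellation tokens and timeouts for the request/reply pattern.

```rust
use std::collections::HashMap;

// Hypothetical trait capturing the requirements above: the same application
// code should work whether messages travel in-process, over ZMQ, or over NATs.
pub trait Transport {
    /// Fire-and-forget broadcast: events do not require a response.
    fn publish(&mut self, topic: &str, payload: Vec<u8>) -> Result<(), String>;
    /// Peer-to-peer request/reply; a real implementation would also take
    /// a cancellation token and a timeout.
    fn request(&mut self, topic: &str, payload: Vec<u8>) -> Result<Vec<u8>, String>;
}

// In-process transport: the simplest of the three communication regimes
// (single process, single node, multi-node).
pub struct InProcessTransport {
    // Published messages, recorded per topic.
    pub published: HashMap<String, Vec<Vec<u8>>>,
}

impl InProcessTransport {
    pub fn new() -> Self {
        Self { published: HashMap::new() }
    }
}

impl Transport for InProcessTransport {
    fn publish(&mut self, topic: &str, payload: Vec<u8>) -> Result<(), String> {
        self.published.entry(topic.to_string()).or_default().push(payload);
        Ok(())
    }

    fn request(&mut self, topic: &str, payload: Vec<u8>) -> Result<Vec<u8>, String> {
        // Toy behavior: echo the payload back, as a stand-in for a remote reply.
        let _ = topic;
        Ok(payload)
    }
}
```

An in-process implementation like this covers the single-process regime; NATs, ZMQ, or HTTP SSE backends would implement the same trait for the multi-process and multi-node regimes.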
## Usage Patterns

More details in [NATs use cases](#nats-use-cases).

| Plane | Purpose / Protocol | Delivery Guarantee / Notes | Current NATs Usage Example |
|---------------|------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------|
| Request Plane | Protocol for request context, control messages, and request cancellation | At least once (with idempotent handlers); supports request/reply, cancellation, and timeouts | Service stats collection, direct requests |
| Event Plane | Delivery of KV events and other system events | At least once (preferred); events are idempotent, redelivered messages are ignored | KV router event publishing/consumption |
| Metrics | Collection and reporting of runtime/service metrics | At least once (metrics can be aggregated, duplicate-safe) | Service stats, health checks |
| Object Store | Ephemeral storage for model files and artifacts | At least once (object upload/download, may use JetStream Object Store) | Model file storage, file upload/download |

**Notes:**
- All planes should support pluggable transports (NATs, ZMQ, HTTP SSE, gRPC, UCX, etc).
- At-least-once delivery is preferred for reliability; consumers must be idempotent.
- The Request Plane requires a protocol for cancellation and context propagation.
- Object Store usage via NATs is being deprecated in favor of more direct or persistent solutions.


## Proposal
![Dynamo entities](./dyn_entities.png)

![Communication planes](./comm.png)

### Transport Agnostic API

![Generic event and request flow](./generic_event_req.jpg)

Use unique identifiers to identify the target entity.

`DynamoEntity`: a uniquely addressable compute unit (engine, instance, component, etc.).
- the dynamo application uses the communication primitives (publish, subscribe, request), which take an Entity to route messages to the target in a transport-agnostic manner.
  - an opaque `u128` identifier uniquely identifies a single instance or a collection of instances (component, namespace, etc)
  - each dynamo entity has a unique ID and can be used to communicate with other dynamo entities.
  - the entity can be used by `DynamoNetworkManager` to identify the target and establish a connection.
  - the discovery service API can be used to find participating dynamo entities.

`DynamoNetworkManager`: manages communication between dynamo entities.
  - manages listening ports and client connections to remote peers
  - responsible for serializing and deserializing messages for different transports
  - responsible for handling timeout errors, retries and cancellation for the request/reply pattern
  - responsible for sending and receiving messages
  - handles remote peer connections: in-process, local host or remote host

![DynamoNetworkManager](./net_manager.png)


High-level `DynamoNetworkManager` API:

The following high-level network manager interface will be available to the dynamo application layer for use by component authors.

```
publish(topic: &str, message: InputType, entity: &DynamoEntity) -> Result<(), Error>

subscribe(topic: &str,
          handler: fn(message: InputType) -> Result<(), Error>,
          entity: &DynamoEntity) -> Result<(), Error>


request(topic: &str, message: InputType, entity_id: u128) -> Future<Result<OutputType, Error>>
```

### PubSub interface
The event plane will use the pubsub interface to publish and subscribe to events.

#### topics
Topics are used to group events of the same nature, for example: "kv_events", "metrics", etc.

#### dynamo entity based filtering / scoping
Guiding principle: "flat is better than nested".
Inspired by the Kubernetes experience (selectors and labels) and MongoDB document filters.

A DynamoEntity can be used as the filtering criterion when publishing and subscribing to events.
This matching will be done at the network layer and is transparent to the application layer.

For example, an event published by publisher_entity will be delivered to subscriber_entity if the subscriber_entity's metadata matches (is a subset of) the publisher_entity's metadata.

- metadata is fetched from the service discovery layer; the wire protocol does not (de)serialize metadata.

- the dynamo entity is used as a filter to identify the target entities to publish/subscribe to, so this works in the absence of a dedicated broker service. This can be done in the network manager layer.

- perf: the matching logic can be cached in the network manager layer to avoid re-matching for each message. A service discovery watch can be used to invalidate the cache when the metadata changes.


```
publisher_entity: DynamoEntity{
    metadata: {"k1": "v1", "k2": "v2"}
}

// matches: {"k2": "v2"} is a subset of publisher_entity's metadata {"k1": "v1", "k2": "v2"}
subscriber_entity: DynamoEntity{
    metadata: {"k2": "v2"}
}

// matches: {"k1": "v1"} is a subset of publisher_entity's metadata {"k1": "v1", "k2": "v2"}
subscriber_entity: DynamoEntity{
    metadata: {"k1": "v1"}
}

// does not match
subscriber_entity: DynamoEntity{
    metadata: {"k1": "v2"}
}
```

### DynamoEntity

#### Current problems: dynamo namespace and hierarchy-based addressing scheme

- we can't support multiple models in a single dynamo namespace.
  Currently we use `dynamo_namespace.component.endpoint` as the addressing scheme to orchestrate communication (p2p/pubsub) across pods. This can cause cross-model communication because of the lack of scoping.

- the current 3-level hierarchy (namespace, component, endpoint) is not capable of addressing pods with Grove.
We need an extensible way to address specific pods within the [grove hierarchy](https://github.com/NVIDIA/grove/blob/bcc412140323d5d781c2d16c9828befa2e965bb8/docs/assets/multinode-disaggregated.excalidraw.png).

- Identify a specific instance or a collection of instances within a deployment based on a query / selector.


#### Solution

Improve the addressing scheme for communication (p2p/pubsub).

Use a proven data model to identify and query entities:
- Kubernetes labels and selectors
- SQL WHERE clauses and ORMs (SQLAlchemy, Django, etc)
- MongoDB document filters
- inodes and (parent) process IDs from operating systems as inspiration for representing hierarchy

DynamoEntity uniquely identifies a single instance or a collection of instances (currently component, namespace, etc) across a deployment using a selector.

1. Each dynamo process registers itself with the discovery service and advertises its metadata (similar to Kubernetes labels).

For example, a prefill worker in a deepseek deployment with model deepseek-r1-671b:

Corresponding JSON-serialized dynamo entity:
```json
{
  "id": "123",
  "entity_type": "instance",
  "name": "",
  "metadata": {
    "deployment_name": "deepseek",
    "model_name": "deepseek-r1-671b",
    "component": "prefill",
    "role": "leader",
    "rank": "0"
  }
}
```

The router can query the discovery service to find the prefill leader workers.
Query `DynamoEntity` in serialized form:
```rust
// Create a dynamo entity selecting all prefill leader workers
let prefill_leaders = DynamoEntity::from_selector({
    "deployment_name": "deepseek",
    "model_name": "deepseek-r1-671b",
    "component": "prefill",
    "role": "leader",
});
```

Decode workers can publish KV events to all router workers by specifying the collection entity:
```rust
// Create a dynamo entity selecting all router workers
let routers = DynamoEntity::from_selector({
    "deployment_name": "deepseek",
    "model_name": "deepseek-r1-671b",
    "component": "router",
});

// Publish KV events to all router workers
publish("kv_events", kv_block_removed_event, routers);
```
![DynamoEntity instances and collections](./dynamo-entity.png)

Collection entity types are useful and can be mapped to the [grove hierarchy](https://github.com/NVIDIA/grove/blob/bcc412140323d5d781c2d16c9828befa2e965bb8/docs/assets/multinode-disaggregated.excalidraw.png), e.g. PodClique and PodCliqueSet.

- the current `dynamo_namespace` maps to a PodCliqueSet; its children are components (PodCliques)

- the current `component` maps to a PodClique; its children are instances (Pods)

```rust
pub struct DynamoEntity {
    pub id: u128,
    pub entity_type: EntityType,
    pub name: Option<Arc<String>>,         // (Optional) human-readable name
    pub children: Vec<DynamoEntity>,       // (Optional) children for a collection
    pub metadata: HashMap<String, String>, // (Optional) metadata for matching
}

impl DynamoEntity {
    /// Create from a name with automatic hashing
    pub fn from_name(name: &str, entity_type: EntityType) -> Self {
        Self {
            id: hash_name(name),
            name: Some(Arc::new(name.to_string())),
            entity_type,
            children: Vec::new(),
            metadata: HashMap::new(),
        }
    }
}

enum EntityType {
    Instance,
    Collection,
}
```

`EntityType`: a single instance or a collection of instances (currently component, namespace, etc)

`name`: String (optional)
- Human-readable identifiers for debugging and logging
- Configuration files (YAML/JSON)
- Command-line interfaces
- Logging and observability

### Object Store Interface

// TODO: add a clean interface for the object store

### Implementation
- Phase 1
  * degraded feature set
  * users may opt not to use the KV router; routing is best effort
  * NATs remains in use
  * no HA guarantees for the router
  * operate without high availability with a single router
- Phase 2
  * explore additional transports
  * durability
  * exactly-once delivery

## Guiding principles

### Generic Messaging Protocol
Decouple the messaging protocol from the underlying transport, whether raw TCP, ZMQ, HTTP, gRPC, or UCX active messages.

Phased approach: start with NATs, ZMQ and HTTP SSE.
Later, incrementally expand to support more advanced transports, ensuring that the protocol remains adaptable to new requirements.

### Handshake and Closure Protocols
Robust handshake and closure protocols, using sentinels and message headers to signal end-of-stream or cancellation.
A common semantics for closing requests and handling errors will be generalized across the different transports.

### Multipart Message Structure
Use a multipart message structure, inspired by ZMQ's native multipart support, to encapsulate headers, body, and control signals (such as closure control signals or error notifications).

### Better Python-Rust Interoperability and Data Class Generation

Improve Python-Rust interoperability, focusing on auto-generating Python data classes from Rust structs using Pydantic.
This keeps message schemas aligned and reduces manual coding and serialization errors.


## Additional notes

## NATs use cases

### 1. NatsQueue python binding
- **Location**: `lib/bindings/python/rust/llm/nats.rs` (`NatsQueue`)
- **Functionality**:
  - Deprecated: we no longer use the `NatsQueue` python binding; we use the `NatsQueue` rust binding instead.
  - We can remove the python binding and its associated tests to simplify the codebase.

### 2. JetStream-backed Queue/Event Bus
- **Location**: `lib/runtime/src/transports/nats.rs` (`NatsQueue`)
- **Functionality**:
  - Stream creation per subject pattern `{stream_name}.*`
  - Publisher-only, worker-group, and broadcast consumer modes
  - Durable consumers with pull-based consumption
  - Administrative operations (purge, consumer management)

### 3. Event Publishing for KV Router
- **Location**: `lib/llm/src/kv_router/publisher.rs`
- **Functionality**:
  - Publishes KV cache events from ZMQ or direct sources
  - Uses the `EventPublisher` trait to send events

### 4. Event Consumption for KV Router
- **Location**: `lib/llm/src/kv_router/subscriber.rs`
- **Functionality**:
  - Consumes `RouterEvent` messages via durable consumers
  - Handles state snapshots and stream purging

### 5. Object Store (JetStream Object Store)
- **Location**: `lib/runtime/src/transports/nats.rs`
- **Functionality**:
  - File upload/download operations
  - Typed data serialization with bincode
  - Bucket management and cleanup

### 6. Key-Value Store (JetStream KV)
- **Location**: `lib/runtime/src/storage/key_value_store/nats.rs`
- **Functionality**:
  - Implements the `KeyValueStore` trait
  - CRUD operations with conflict resolution
  - Watch streams for real-time updates

### 7. Request/Reply Pattern
- **Location**: `lib/runtime/src/transports/nats.rs`
- **Functionality**:
  - Service stats collection via broadcast requests
  - Each service responds once to stats queries

### 8. KVBM NATs usage (todo)


## Message Delivery Guarantees

### At-least-once delivery (preferred)
- Messages are never lost.
- A message is delivered at least once to consumers.
- Consumers should be idempotent and able to handle duplicate messages.

### Exactly-once delivery
- Needs stateful tracking of messages and ack/nack coordination to guarantee that each message is processed exactly once.

### At-most-once delivery
- Message loss is possible.
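To illustrate why at-least-once delivery requires idempotent consumers, here is a minimal sketch of consumer-side deduplication. `IdempotentConsumer`, its message-id scheme, and the handler shape are hypothetical, not part of the Dynamo runtime; a production version would also bound or persist the set of seen ids.

```rust
use std::collections::HashSet;

// Sketch of an idempotent consumer for at-least-once delivery: redelivered
// messages carry the same id, so tracking seen ids makes duplicates harmless.
pub struct IdempotentConsumer {
    seen: HashSet<u64>,
    // Bodies of messages that were actually applied, for illustration.
    pub applied: Vec<String>,
}

impl IdempotentConsumer {
    pub fn new() -> Self {
        Self { seen: HashSet::new(), applied: Vec::new() }
    }

    /// Returns true if the message was applied, false if it was a duplicate.
    pub fn handle(&mut self, msg_id: u64, body: &str) -> bool {
        // HashSet::insert returns false if the id was already present.
        if !self.seen.insert(msg_id) {
            // Duplicate redelivery: acknowledge it but do not re-apply the effect.
            return false;
        }
        self.applied.push(body.to_string());
        true
    }
}
```

With this shape, the transport is free to redeliver a message after a lost ack: the effect is applied exactly once even though delivery is only at-least-once.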
\ No newline at end of file
diff --git a/brainstorm.md b/brainstorm.md
new file mode 100644
index 00000000..4fde3a55
--- /dev/null
+++ b/brainstorm.md
@@ -0,0 +1,56 @@

Chat w/ Graham
- Replace the model deployment card's nats:// URL fields with model express
- Direct calls frontend -> backend for the critical path of requests, replacing the NATS mailbox
- Metrics fetching: not sure yet.


### transports
  - ZeroMQ
  - Raw TCP
  - ZeroRPC
  - Redis
  - Kafka
  - SQS
  - GCP PubSub
  - Azure Service Bus

### Object store
  - S3
  - Redis
  - Shared filesystem

LLM-d KV cache manager:
https://github.com/llm-d/llm-d-kv-cache-manager/tree/main


1. Use RocksDB / local storage to persist messages on the producer side to guarantee at-least-once delivery.
2. RocksMQ

## KV Routers are stateful
- kv routers are stateful.
- Sharding
- Replication
- KV events are broadcast to all routers.


# LLM

Allow mini-batching of requests to the LLM.

https://github.com/pathwaycom/pathway?tab=readme-ov-file#deployment


## Rust <> Python interoperability

Use pydantic dataclasses to eliminate the manual code needed to match rust structs to python classes.