From 5ba584aba6cba31e226cd9df83a14ef63f372912 Mon Sep 17 00:00:00 2001
From: Tom Augspurger
Date: Wed, 17 Dec 2025 19:27:40 -0800
Subject: [PATCH 1/4] Added a glossary

---
 docs/source/background/channels.md            | 18 ++--
 docs/source/background/nodes.md               | 10 +-
 .../source/background/shuffle-architecture.md | 94 +++++++++----------
 docs/source/background/streaming-engine.md    | 62 ++++--------
 docs/source/glossary.md                       | 86 +++++++++++++++++
 docs/source/index.md                          |  5 +-
 docs/source/quickstart.md                     |  4 +-
 7 files changed, 168 insertions(+), 111 deletions(-)
 create mode 100644 docs/source/glossary.md

diff --git a/docs/source/background/channels.md b/docs/source/background/channels.md
index ee92190e9..2c9b7d77e 100644
--- a/docs/source/background/channels.md
+++ b/docs/source/background/channels.md
@@ -1,7 +1,7 @@
 # Channels

-Channels are asynchronous messaging queue used to move messages between {term}`Node`s in the rapidsmpf streaming network.
+{term}`Channel`s are asynchronous messaging queues used to move {term}`Message`s between {term}`Node`s in the rapidsmpf streaming {term}`Network`.

 ```{image} ../_static/animation-legend.png
 :width: 320px
@@ -33,11 +33,11 @@ As buffers move through the graph, the channels (arrows) move from empty (dashed

 *fig: example streaming network with 3 Nodes and 2 Channels*

 Components:
-  • Node: Coroutine that processes messages
-  • Channel: Async queue connecting nodes
-  • Message: GPU Buffer with a CUDA Stream
+  • {term}`Node`: Coroutine that processes {term}`Message`s
+  • {term}`Channel`: Async queue connecting nodes
+  • {term}`Message`: GPU {term}`Buffer` with a CUDA Stream

-In the above graph, moving data in and out of channels on a single GPU should be relatively cheap, nearly free! This stratedy of using channels to move tasks/buffers is a core methodology for rapidsmpf to overlap: scans, compute, spilling, and communication.
+In the above graph, moving data in and out of {term}`Channel`s on a single GPU should be relatively cheap, nearly free! This strategy of using channels to move tasks/{term}`Buffer`s is a core methodology for rapidsmpf to overlap: scans, compute, {term}`spilling <Spilling>`, and communication.

 ## Backpressure

@@ -62,7 +62,7 @@ Producer Side: Consumer Side:
 Key Properties:
   • Non-blocking: Coroutines suspend, not threads
   • Backpressure: Slow consumers throttle producers
-  • Type-safe: Messages are type-erased but validated
+  • Type-safe: {term}`Message`s are type-erased but validated

 A Consumer is **"full"** when an internal ring_buffer `coro::ring_buffer rb_;` has reached capacity.

 for (int i = 0; i < n_producer; i++) {

 }
 ```

-Internally, when using a `throttle` a Node that writes into a channel must acquire a ticket granting permission to write before being able to. The write/send then returns a receipt that grants permission to release the ticket. The consumer of a throttled channel reads messages without issue. This means that the throttle is localised to the producer nodes.
+Internally, when using a `throttle`, a {term}`Node` that writes into a {term}`Channel` must first acquire a ticket granting permission to write. The write/send then returns a receipt that grants permission to release the ticket. The consumer of a throttled channel reads {term}`Message`s without issue. This means that the throttle is localised to the producer nodes.

-More simply, using a throttling adaptor limits the number messages a producer writes into a channel. This pattern is very useful for producer nodes where we want some amount of bounded concurrency in the tasks that might suspend before sending into a channel -- especially useful when trying to minimize the over-production of long-lived memory: reads/scans, shuffles, etc.
+More simply, a throttling adaptor limits the number of {term}`Message`s a producer may have outstanding in a {term}`Channel`. This pattern is very useful for producer {term}`Node`s where we want some amount of bounded concurrency in the tasks that might suspend before sending into a channel -- especially useful when trying to minimize the over-production of long-lived memory: reads/scans, shuffles, etc.

-eg. a source node that read files. `ThrottlingAdaptor` will allow the node to delay reading files, until it has acquired a ticket to send a message to the channel. In comparison, non-throttling channels will suspend during send by which time, the files have already loaded into the memory unnecessarily
+Consider, e.g., a source node that reads files: `ThrottlingAdaptor` allows the {term}`Node` to delay reading a file until it has acquired a ticket to send a {term}`Message` to the {term}`Channel`. In comparison, a non-throttling channel suspends during the send itself, by which time the file has already been loaded into memory unnecessarily.
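+
+To make the ticket/receipt mechanics concrete, the sketch below models the idea with plain `asyncio` primitives. This is illustrative only -- `Throttle`, the queue-as-channel wiring, and the `None` end-of-stream marker are hypothetical stand-ins, not the rapidsmpf API:
+
+```python
+import asyncio
+
+class Throttle:
+    """Illustrative ticket/receipt throttle (not the rapidsmpf API)."""
+
+    def __init__(self, max_outstanding: int) -> None:
+        self._tickets = asyncio.Semaphore(max_outstanding)
+
+    async def acquire_ticket(self) -> None:
+        await self._tickets.acquire()  # permission to produce one message
+
+    def release_receipt(self) -> None:
+        self._tickets.release()  # the receipt from a completed send
+
+async def producer(throttle: Throttle, ch: asyncio.Queue) -> None:
+    for i in range(6):
+        # Acquire the ticket *before* the expensive work, so at most
+        # `max_outstanding` unsent buffers ever exist at once.
+        await throttle.acquire_ticket()
+        buf = f"buffer-{i}"  # stand-in for an expensive file read
+        await ch.put(buf)  # send into the channel
+        throttle.release_receipt()
+    await ch.put(None)  # end-of-stream marker
+
+async def consumer(ch: asyncio.Queue) -> None:
+    while (buf := await ch.get()) is not None:
+        await asyncio.sleep(0.01)  # pretend to do work
+        print("consumed", buf)
+
+async def main() -> None:
+    ch: asyncio.Queue = asyncio.Queue(maxsize=2)
+    await asyncio.gather(producer(Throttle(2), ch), consumer(ch))
+
+asyncio.run(main())
+```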
diff --git a/docs/source/background/nodes.md b/docs/source/background/nodes.md
index e0e37fc93..432dae668 100644
--- a/docs/source/background/nodes.md
+++ b/docs/source/background/nodes.md
@@ -1,6 +1,6 @@
 # Nodes

-Nodes are coroutine-based asynchronous relational operators that read from
+{term}`Node`s are coroutine-based asynchronous relational operators that read from
 zero-or-more {term}`Channel`s and write to zero-or-more {term}`Channel`s within a {term}`Network`.

 **C++**
@@ -49,12 +49,12 @@ async def accumulator(ch_out, ch_in, msg):

 ## Node Types

-Nodes fall into two categories:
+{term}`Node`s fall into two categories:

 - Local Nodes: These include operations like filtering, projection, or column-wise transforms. They operate exclusively on local data and preserve CSP semantics.
-- Collective Nodes: These include operations like shuffle, join, groupby aggregations, etc. which require access to distributed data.
+- Collective Nodes: These include {term}`Collective Operation`s like {term}`Shuffler`, join, groupby aggregations, etc., which require access to distributed data.

-In the case of a collective nodes, remote communication is handled internally. For example, a shuffle node may need to access all partitions of a table, both local and remote, but this coordination and data exchange happens inside the CSP-process itself. As a reminder "Channels" are an abstraction and not used to serialize and pass data between workers
+In the case of collective {term}`Node`s, remote communication is handled internally. For example, a shuffle node may need to access all {term}`Partition`s of a table, both local and remote, but this coordination and data exchange happens inside the CSP-process itself. As a reminder, {term}`Channel`s are an abstraction and are not used to serialize and pass data between workers.

 This hybrid model, which combines a SPMD-style distribution model and a local CSP-style streaming model, offers several advantages:

@@ -64,4 +64,4 @@ This hybrid model, which combines a SPMD-style distribution model and a local CS

- It makes inter-worker parallelism explicit through SPMD-style communication.
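+
+As a sketch of how a collective node encapsulates communication, the following illustrative code keeps all channel traffic local while the inter-rank exchange happens inside the node. `FakeComm` and the queue-based channels are hypothetical stand-ins for this sketch, not the rapidsmpf API:
+
+```python
+import asyncio
+
+class FakeComm:
+    """Stand-in for a communicator (illustrative only)."""
+
+    async def allgather(self, value):
+        # A real communicator would exchange `value` with every rank;
+        # here we pretend two ranks contributed the same value.
+        return [value, value]
+
+async def allgather_node(ch_out: asyncio.Queue, ch_in: asyncio.Queue, comm: FakeComm):
+    # Reads and writes use *local* channels; the inter-rank exchange
+    # is performed internally via the communicator.
+    while (msg := await ch_in.get()) is not None:
+        gathered = await comm.allgather(msg)
+        await ch_out.put(gathered)
+    await ch_out.put(None)  # propagate end-of-stream
+```
+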
-For examples of communication nodes and collective operations please read the [shuffle architecture page](./shuffle-architecture.md)
+For examples of communication nodes and collective operations, please read the [shuffle architecture page](./shuffle-architecture.md).

diff --git a/docs/source/background/shuffle-architecture.md b/docs/source/background/shuffle-architecture.md
index 660c3447e..77cf4a604 100644
--- a/docs/source/background/shuffle-architecture.md
+++ b/docs/source/background/shuffle-architecture.md
@@ -7,9 +7,9 @@ machines. The key requirement is that there exist communication links between
 the GPUs.

 The core abstraction that encapsulates the set of processes that are
-executing collectively is a `Communicator`. This provides unique
-identifiers (termed `rank`s) to each process along with message-passing
-routes between them. We provide communicator implementations based either
+executing collectively is a {term}`Communicator`. This provides unique
+identifiers (termed {term}`Rank`s) to each process along with message-passing
+routes between them. We provide {term}`Communicator` implementations based either
 directly on [UCX](https://openucx.org/)/[UCXX](https://github.com/rapidsai/ucxx) or
 [MPI](https://www.mpi-forum.org). Message passing handles CPU and GPU data
 uniformly, the underlying transport takes care of choosing the appropriate
@@ -17,114 +17,114 @@ route.

 ## "Streaming" collective operations

-`rapidsmpf` provides collectives (i.e. communication
+`rapidsmpf` provides {term}`Collective Operation`s (i.e. communication
 primitives) that operate on "streaming" data. As a consequence, a round
 of collective communication proceeds in four stages:

-1. Participating ranks (defined by the `Communicator`) create a
+1. Participating {term}`Rank`s (defined by the {term}`Communicator`) create a
 collective object.
-2. Each rank independently _inserts_ zero-or-more data chunks into the
+2. Each {term}`Rank` independently _inserts_ zero-or-more data chunks into the
 collective object.
-3. Once a rank has inserted all data chunks, it inserts a _finish marker_.
-4. After insertion is finished, a rank can _extract_ data that is the
+3. Once a {term}`Rank` has inserted all data chunks, it inserts a _finish marker_.
+4. After insertion is finished, a {term}`Rank` can _extract_ data that is the
 result of the collective communication. This may block until data are
 ready.

-Collectives over subsets of all ranks in the program are enabled by
-creating a `Communicator` object that only contains the desired
-participating ranks.
+Collectives over subsets of all {term}`Rank`s in the program are enabled by
+creating a {term}`Communicator` object that only contains the desired
+participating {term}`Rank`s.

 Multiple collective operations can be live at the same time, they are
 each distinguished by a `tag`. This `tag` must be consistent across all
 participating ranks to line up the messages in the collective.

-Notice that we are not responsible for providing the output buffers that a
+Notice that we are not responsible for providing the output {term}`Buffer`s that a
 collective writes into. This is a consequence of the streaming design: to
-allocate output buffers of the correct size we would first have to see all
-inputs. Instead `rapidsmpf` is responsible for allocation of output buffers
-and spilling data from device to host if device memory is at a premium.
+allocate output {term}`Buffer`s of the correct size we would first have to see all
+inputs. Instead `rapidsmpf` is responsible for allocation of output {term}`Buffer`s
+and {term}`Spilling` data from device to host if device memory is at a premium.
 However, although `rapidsmpf` allocates outputs it never interprets your
 data: it just sends and receives bytes "as-is".
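+
+The following sketch models just this four-stage protocol in a single process, with hypothetical method names (`insert`, `insert_finished`, `extract`); it is an illustration of the protocol, not the rapidsmpf API:
+
+```python
+import queue
+
+class MiniCollective:
+    """Single-process stand-in for a streaming collective (illustrative).
+
+    A real collective routes chunks between ranks; this mini version
+    only models the insert / finish-marker / extract protocol.
+    """
+
+    def __init__(self) -> None:
+        self._chunks: "queue.Queue[bytes]" = queue.Queue()
+        self._finished = False
+
+    def insert(self, chunk: bytes) -> None:  # stage 2
+        assert not self._finished, "insert after the finish marker"
+        self._chunks.put(chunk)
+
+    def insert_finished(self) -> None:  # stage 3
+        self._finished = True
+
+    def extract(self) -> bytes:  # stage 4
+        # A real implementation may block until chunks have arrived
+        # from other ranks; here everything is already local.
+        return self._chunks.get()
+
+coll = MiniCollective()  # stage 1, executed on every rank
+for chunk in (b"a", b"b"):
+    coll.insert(chunk)
+coll.insert_finished()
+print(coll.extract(), coll.extract())
+```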

 ## Shuffles

-A key collective operation in large-scale data analytics is a "shuffle"
-(a generalised all-to-all). In a shuffle, every participating rank sends
-data to every other rank. We will walk through a high-level overview of the
+A key {term}`Collective Operation` in large-scale data analytics is a "shuffle"
+(a generalised all-to-all). In a shuffle, every participating {term}`Rank` sends
+data to every other {term}`Rank`. We will walk through a high-level overview of the
 steps in a shuffle using `rapidsmpf` to see how things fit together.

-Having created a collective shuffle operation (a `rapidsmpf::Shuffler`), at
+Having created a collective shuffle operation (a {term}`Shuffler`), at
 a high level, a shuffle operation involves these steps:

-1. [user code] Each rank *inserts* **chunks** of data to the Shuffler,
+1. [user code] Each {term}`Rank` *inserts* **chunks** of data into the {term}`Shuffler`,
 followed by a finish marker.
-2. [rapidsmpf] The Shuffler on that rank processes that chunk by either sending it to
- another rank or keeping it for itself.
-3. [user code] Each rank *extracts* chunks of data from each once it's
+2. [rapidsmpf] The {term}`Shuffler` on that {term}`Rank` processes that chunk by either sending it to
+ another {term}`Rank` or keeping it for itself.
+3. [user code] Each {term}`Rank` *extracts* chunks of data from the {term}`Shuffler` once they're
 ready.

-There are more details around how chunks are assigned to output ranks and how memory is
+There are more details around how chunks are assigned to output {term}`Rank`s and how memory is
 managed. But at a high level, your program is responsible for inserting chunks
 somewhere and extracting (the now shuffled) chunks once they've been moved to
-the correct rank.
+the correct {term}`Rank`.

-This diagram shows a network of with three ranks in the middle of a Shuffle operation.
+This diagram shows a {term}`Network` with three {term}`Rank`s in the middle of a Shuffle operation.

 ![A diagram showing a shuffle.](../_static/rapidsmpf-shuffler-transparent-fs8.png)

 As your program inserts chunks of data (see below), each chunk is assigned to
-a particular rank. In the diagram above, this is shown by color: each
-process (recall a process is uniquely identified by a `(rank,
-communicator)` pair) has a particular color (the color of its circle) and each chunk with that color will
-be sent to its matching rank. So, for example, all of the green chunks will be
+a particular {term}`Rank`. In the diagram above, this is shown by color: each
+process (recall a process is uniquely identified by a ({term}`Rank`,
+{term}`Communicator`) pair) has a particular color (the color of its circle) and each chunk with that color will
+be sent to its matching {term}`Rank`. So, for example, all of the green chunks will be
 extracted from the green process in the top-left. Note that the number of different
-chunk types (colors in this diagram) is typically larger than the number of ranks,
+chunk types (colors in this diagram) is typically larger than the number of {term}`Rank`s,
 and so each process will be responsible for multiple output chunk types.

 The process you insert the chunk on is responsible for getting the data to the
-correct output rank. It does so by placing the chunk in its **Outgoing** message
+correct output {term}`Rank`. It does so by placing the chunk in its **Outgoing** message
 box and then working to send it (shown by the black lines connecting the
 processes).

 Internally, the processes involved in a shuffle continuously

 - receive newly inserted chunks from your program
-- move chunks to their intended ranks
-- receive chunks from other ranks
+- move chunks to their intended {term}`Rank`s
+- receive chunks from other {term}`Rank`s
 - hand off *ready* chunks when your program extracts them

-During a shuffle, device memory might run low on more or more processes . `rapidsmpf` is able to *spill* chunks of data from device memory to a
+During a shuffle, device memory might run low on one or more processes. `rapidsmpf` is able to *spill* chunks of data from device memory to a
 larger pool (e.g. host memory). In the diagram above, this is shown by the
-hatched chunks.
+hatched chunks. See {term}`Spilling` for more details.

 ### Example: Shuffle a Table on a Column

-The `rapidsmpf` Shuffler operates on **chunks** of data, without really caring
+The `rapidsmpf` {term}`Shuffler` operates on **chunks** of data, without really caring
 what those bytes represent. But one common use case is shuffling a table on
 (the hash of) one or more columns. In this scenario, `rapidsmpf` can be used as
 part of a Shuffle Join implementation.

-This diagram shows multiple nodes working together to shuffle a large, logical
+This diagram shows multiple {term}`Rank`s working together to shuffle a large, logical
 Table.

 ![A diagram showing how to use rapidsmpf to shuffle a table.](../_static/rapidsmpf-shuffle-table-fs8.png)

-Suppose you have a large logical table that's split into a number of partitions.
+Suppose you have a large logical table that's split into a number of {term}`Partition`s.
 In the diagram above, this is shown as the different dashed boxes on the
-left-hand side. In this example, we've shown four partitions, but this could be
+left-hand side. In this example, we've shown four {term}`Partition`s, but this could be
 much larger. Each row in the table is assigned to some group (by the hash of
 the columns you're joining on, say), which is shown by the color of the row.

-Your program **inserts** data to the shuffler. In this case, it's inserting
+Your program **inserts** data into the {term}`Shuffler`. In this case, it's inserting
 chunks that represent pieces of the table that have been partitioned (by hash
 key) and packed into a chunk.

-Each rank involved in the shuffle knows which ranks are responsible for which
-hash keys. For example, rank 1 knows that it's responsible for the purple
-chunks, needs to send red chunks to rank 2, etc.
+Each {term}`Rank` involved in the shuffle knows which {term}`Rank`s are responsible for which
+hash keys. For example, {term}`Rank` 1 knows that it's responsible for the purple
+chunks, needs to send red chunks to {term}`Rank` 2, etc.

-Each input partition possibly includes data for each hash key. All the processes
+Each input {term}`Partition` possibly includes data for each hash key. All the processes
 involved in the shuffle move data to get all the chunks with a particular hash
-key to the correct rank (spilling if needed). This is shown in the middle
+key to the correct {term}`Rank` ({term}`Spilling` if needed). This is shown in the middle
 section.

 As chunks become "ready" (see above), your program can **extract** chunks and
 process them as necessary. This is shown on the right-hand side.
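+
+A minimal sketch of the partitioning step (plain Python, with a hypothetical `partition_rows` helper -- not the rapidsmpf API): each row is assigned to an owning rank by hashing the key column.
+
+```python
+import zlib
+from collections import defaultdict
+
+def partition_rows(rows, key_index, nranks):
+    """Group rows into per-rank chunks by a deterministic hash of the key.
+
+    Python's built-in hash() is salted per process, so a real shuffle must
+    use a hash that agrees across ranks (crc32 here, for illustration).
+    """
+    chunks = defaultdict(list)
+    for row in rows:
+        key = str(row[key_index]).encode()
+        rank = zlib.crc32(key) % nranks  # the rank that owns this hash key
+        chunks[rank].append(row)
+    return chunks  # each per-rank chunk would then be packed and inserted
+
+table = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
+print(partition_rows(table, key_index=0, nranks=2))
+```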

 ### Shuffle Statistics

-Shuffles can be configured to collect statistics, which can help you understand the performance of the system.
+Shuffles can be configured to collect {term}`Statistics`, which can help you understand the performance of the system.
 This table gives an overview of the different statistics collected.

 | Name | Type | Description |

diff --git a/docs/source/background/streaming-engine.md b/docs/source/background/streaming-engine.md
index f4ca74569..503ba58d9 100644
--- a/docs/source/background/streaming-engine.md
+++ b/docs/source/background/streaming-engine.md
@@ -24,24 +24,24 @@ memory at once.

 The abstract framework we use to describe task graphs is broadly that
 of Hoare's [Communicating Sequential Processes](https://en.wikipedia.org/wiki/Communicating_sequential_processes).
-Nodes (tasks) in the graph are long-lived that read from zero-or-more
+{term}`Node`s (tasks) in the graph are long-lived coroutines that read from zero-or-more
 {term}`Channel`s and write to zero-or-more channels. In this sense, the programming
 model is relatively close to that of
 [actors](https://en.wikipedia.org/wiki/Actor_model).

 The communication channels are bounded capacity, multi-producer
-multi-consumer queues. A node processing data from an input channel pulls
+multi-consumer queues. A {term}`Node` processing data from an input {term}`Channel` pulls
 data as necessary until the channel is empty, and can optionally signal
 that it needs no more data (thus shutting the producer down).

-Communication between tasks in the same process occurs through channels. In
+Communication between tasks in the same process occurs through {term}`Channel`s. In
 contrast communication between processes uses the lower-level rapidsmpf
 communication primitives. In this way, achieving forward progress of the
 task graph is a local property, as long as the logically collective
 semantics of individual tasks are obeyed internally.

 The recommended usage to target multiple GPUs is to have one process per
-GPU, tied together by a rapidsmpf communicator.
+GPU, tied together by a rapidsmpf {term}`Communicator`.

 ## Building task networks from query plans

 application specific intermediate representation, though one can write it
 by hand. For example, one can convert logical plans from query engines such
 as Polars, DuckDB, etc to a physical plan to be executed by rapidsmpf.

-A typical approach is to define one node in the graph for each physical
+A typical approach is to define one {term}`Node` in the graph for each physical
 operation in the query plan. Parallelism is obtained by using a
 multi-threaded executor to handle the concurrent tasks that thus result.

 library.
 ```
 +------+     +--------+     +--------+     +------+
 | Scan | --> | Select | --> | Filter | --> | Sink |
 +------+     +--------+     +--------+     +------+
 ```
-*A typical rapidsmpf network of nodes*
+*A typical rapidsmpf {term}`Network` of {term}`Node`s*

- Once constructed, the network of "nodes" and their connecting channels remains in place for the duration of the workflow. Each node continuously awaits new data, activating as soon as inputs are ready and forwarding results downstream via the channels to the next node(s) in the graph.
+ Once constructed, the {term}`Network` of {term}`Node`s and their connecting {term}`Channel`s remains in place for the duration of the workflow. Each node continuously awaits new data, activating as soon as inputs are ready and forwarding results downstream via the channels to the next node(s) in the graph.
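+
+The following sketch models such a network with coroutines and bounded queues standing in for nodes and channels (plain `asyncio`, with a hypothetical `None` end-of-stream marker -- not the rapidsmpf API):
+
+```python
+import asyncio
+
+async def scan(out: asyncio.Queue) -> None:
+    for i in range(4):
+        await out.put({"x": i, "y": i * 10})  # suspends when `out` is full
+    await out.put(None)  # end-of-stream marker
+
+async def select(inp: asyncio.Queue, out: asyncio.Queue) -> None:
+    while (msg := await inp.get()) is not None:
+        await out.put({"x": msg["x"]})  # project column x
+    await out.put(None)
+
+async def filter_node(inp: asyncio.Queue, out: asyncio.Queue) -> None:
+    while (msg := await inp.get()) is not None:
+        if msg["x"] % 2 == 0:  # keep even rows only
+            await out.put(msg)
+    await out.put(None)
+
+async def sink(inp: asyncio.Queue) -> None:
+    while (msg := await inp.get()) is not None:
+        print("sink:", msg)
+
+async def main() -> None:
+    # Three bounded queues stand in for channels; each node is a coroutine.
+    a, b, c = (asyncio.Queue(maxsize=2) for _ in range(3))
+    await asyncio.gather(scan(a), select(a, b), filter_node(b, c), sink(c))
+
+asyncio.run(main())
+```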

-## Definitions
-
-```{glossary}
-Network
- A graph of nodes and edges. Nodes are the relational operators on data and edges are the channels connecting the next operation in the workflow.
-
-Context
- Provides access to resources necessary for executing nodes:
- - Communicators (UCXX or MPI)
- - Thread pool executor
- - CUDA Memory (RMM)
- - rapidsmpf Buffer Resource (spillable)
-
-Buffer
- Raw memory buffers typically shared pointers from tabular data provided by cuDF.
-
- - Buffers are created most commonly during scan (read_parquet) operations but can also be created during joins and aggregations. When operating on multiple buffers either a new stream is created for the new buffer or re-use of an existing stream is attached the newly created buffer.
- - Buffers have an attached CUDA Stream maintained for the lifetime of the buffer.
-
-Message
- [Type-erased](https://en.wikipedia.org/wiki/Type_erasure) container for data payloads (shared memory pointers) including: cudf tables, buffers, and rapidsmpf internal data structures like packed data.
-
- - Messages also contain metadata: a sequence number.
- - Sequences _do not_ guarantee that chunks arrive in order but they do provide the order in which the data was created.
-
-Node
- Coroutine-based asynchronous relational operator: read, filter, select, join.
-
- - Nodes read from zero-or-more channels and write to zero-or-more channels.
- - Multiple Nodes can be executed concurrently.
- - Nodes can communicate directly using "streaming" collective operations such as shuffles and joins (see [Streaming collective operations](./shuffle-architecture.md#streaming-collective-operations)).
-
-Channel
- An asynchronous messaging queue used for communicating Messages between Nodes.
- - Provides backpressure to the network prevent over consumption of memory
- - Can be throttled to prevent over production of buffers which can useful when writing producer nodes that otherwise do not depend on an input channel.
- - Sending suspends when channel is "full".
- - Does not copy (de-)serialize, (un-)spill data
-
-```
+## Key Concepts
+
+The streaming engine is built around these core concepts (see the {doc}`/glossary` for complete definitions):
+
+- {term}`Network` - A graph of {term}`Node`s connected by {term}`Channel`s
+- {term}`Node` - Coroutine-based asynchronous operators (read, filter, select, join)
+- {term}`Channel` - Asynchronous messaging queues with backpressure
+- {term}`Message` - Type-erased containers for data payloads
+- {term}`Context` - Provides access to resources ({term}`Communicator`, {term}`BufferResource`, etc.)
+- {term}`Buffer` - Raw memory allocations with attached CUDA streams
diff --git a/docs/source/glossary.md b/docs/source/glossary.md
new file mode 100644
index 000000000..0972c9973
--- /dev/null
+++ b/docs/source/glossary.md
@@ -0,0 +1,86 @@
+# Glossary
+
+This glossary defines key concepts and terminology used throughout rapidsmpf.
+
+```{glossary}
+AllGather
+ A collective operation that gathers data from all ranks and distributes the combined result to every rank. Each rank contributes its local data, and after the operation completes, all ranks have a copy of the concatenated data from all participants.
+
+Buffer
+ A raw memory allocation that can reside in device (GPU), pinned host, or regular host memory.
Buffers have an attached CUDA stream maintained for the lifetime of the buffer. They are typically created during scan (read) operations or when new data is produced by joins and aggregations. + +BufferResource + A class that manages memory allocation and transfers between different memory types (device, pinned host, and host). All memory operations in rapidsmpf, such as those performed by the Shuffler, rely on a BufferResource for memory management. It handles memory reservations, spilling, and provides access to CUDA stream pools. + +Channel + An asynchronous messaging queue used for communicating {term}`Message`s between {term}`Node`s in a streaming network. Channels provide backpressure to prevent memory overconsumption by suspending senders when full. They do not copy, serialize, or spill data - they simply pass references between nodes. + +Collective Operation + A communication pattern that involves coordination across multiple ranks. Examples include {term}`Shuffler` (redistributing data by partition), {term}`AllGather` (gathering data from all ranks), and {term}`Fanout` (broadcasting to multiple channels). These operations are handled internally within nodes while maintaining CSP semantics. + +Communicator + An abstract interface for sending and receiving messages between ranks (processes/GPUs). Communicators support asynchronous operations, GPU data transfers, and custom logging. Implementations include UCXX (for UCX-based communication) and MPI backends. + +Context + The execution environment for {term}`Node`s in a streaming pipeline. A Context provides access to essential resources including: + - A {term}`Communicator` for inter-rank communication + - A {term}`BufferResource` for memory management + - A {term}`ProgressThread` for background operations + - A coroutine thread pool executor + - Configuration {term}`Options` + - {term}`Statistics` for performance tracking + +Fanout + A streaming operation that broadcasts messages from a single input {term}`Channel` to multiple output channels. Supports both bounded and unbounded policies for controlling message delivery. + +MemoryReservation + A token representing a promise of future memory allocation from a {term}`BufferResource`. Reservations must be obtained before allocating buffers, enabling the system to track memory usage and perform spilling when necessary. Reservations specify the memory type (device, pinned host, or host) and size. + +MemoryType + An enumeration specifying the location of memory: + - `DEVICE`: GPU memory + - `PINNED_HOST`: Page-locked host memory for efficient GPU transfers + - `HOST`: Regular system memory + +Message + A type-erased container for data payloads passed through {term}`Channel`s. Messages wrap arbitrary payload types (such as cuDF tables or buffers) along with metadata including a sequence number for ordering. Messages support deep-copy operations and can be spilled to different memory types when memory pressure occurs. + +Network + A directed graph of {term}`Node`s connected by {term}`Channel`s representing a streaming data processing pipeline. Nodes are relational operators on data, and channels are the edges connecting operations in the workflow. The network remains in place for the duration of a workflow, with nodes continuously processing data as it flows through. + +Node + A coroutine-based asynchronous operator in a streaming pipeline. Nodes read from zero or more input {term}`Channel`s, perform computation, and write to zero or more output channels. 
They can be local (operating on data within a single rank) or collective (coordinating across multiple ranks). Multiple nodes execute concurrently via a thread pool executor. + +Options + A configuration container that stores key-value pairs controlling rapidsmpf behavior. Options can be populated from environment variables (prefixed with `RAPIDSMPF_`) or set programmatically. Common options include logging verbosity, memory limits, and integration-specific settings. + +PackedData + Serialized (packed) data ready for transfer between ranks or for spilling to host memory. PackedData contains both metadata and the actual data buffers in a format that can be efficiently transmitted and later unpacked back into structured data like cuDF tables. + +Partition + A logical division of data assigned to a particular rank during shuffle operations. Data is partitioned using hash-based or custom partitioning schemes, with each partition identified by a unique partition ID (PartID). The {term}`Shuffler` redistributes partitions so that each rank receives all data belonging to its assigned partitions. + +Payload + A protocol that message payloads must implement to be sent through {term}`Channel`s. The protocol defines how to construct a payload from a {term}`Message` and how to insert a payload back into a message, enabling type-safe communication between nodes. + +ProgressThread + A background thread that executes registered progress functions in a loop. Used by the {term}`Shuffler` and other components to make continuous progress on asynchronous operations without blocking the main execution. Functions can be dynamically added and removed, and the thread can be paused and resumed. + +Rank + A unique integer identifier for a process in a distributed system, ranging from 0 to nranks-1. Each rank typically corresponds to one GPU. The rank is used to determine which partitions a process owns and to route messages between processes. + +Shuffler + A service for redistributing partitioned data across ranks. The Shuffler accepts packed data chunks, routes them to the appropriate destination ranks based on partition ownership, and allows extraction of completed partitions. It supports asynchronous operation with pipelining of insertions and extractions, and can spill data to host memory under memory pressure. + +SpillableMessages + A collection that manages {term}`Message`s that can be spilled to different memory types (typically from device to host memory) when GPU memory is scarce. Messages are inserted with a unique ID and can be extracted or spilled on demand, enabling out-of-core processing of data larger than GPU memory. + +SpillManager + A component that coordinates memory spilling across different parts of the system. The SpillManager maintains a registry of spill functions with priorities, and when memory pressure occurs, it invokes these functions to free up memory by moving data from device memory to host memory or storage. + +Spilling + The process of moving data from GPU (device) memory to host memory or storage when GPU memory is scarce. Spilling enables out-of-core processing where the working set exceeds available GPU memory. Data is later "unspilled" (moved back to GPU memory) when needed for computation. + +Statistics + A class for collecting and reporting performance metrics during rapidsmpf operations. Statistics tracks various counters (bytes transferred, operations performed, timing information) and can optionally profile memory allocations. 
Statistics can be enabled/disabled and provides a formatted report of collected metrics. +``` diff --git a/docs/source/index.md b/docs/source/index.md index 7042ab3e6..549be7746 100644 --- a/docs/source/index.md +++ b/docs/source/index.md @@ -3,8 +3,8 @@ Building high-performance GPU pipelines is hard. Each stage must move data efficiently between GPUs and processes, synchronize work, and manage limited device memory. RapidsMPF, including its nascent Streaming Engine and Out-of-Core (OOC) shuffle, provides a unified framework for asynchronous, multi-GPU pipelines using simple streaming primitives — {term}`Channel`s, {term}`Node`s, and {term}`Message`s — built on RAPIDS components: [rmm](https://docs.rapids.ai/api/rmm/nightly), [libcudf](https://docs.rapids.ai/api/libcudf/nightly/), and [ucxx](https://docs.rapids.ai/api/ucxx/nightly/). -RapidsMPF's design leverages Explicit Data Parallelism (SPMD-style coordination) combined with a local CSP-style streaming model, enabling the engine to overlap I/O, computation, and communication. This makes it possible to handle out-of-core processing efficiently and integrate seamlessly with frontend query engines such as Polars. -The result is clean, composable, and scalable GPU streaming — from single-node prototypes to large-scale, multi-GPU deployments. +RapidsMPF's design leverages Explicit Data Parallelism (SPMD-style coordination) combined with a local CSP-style streaming model, enabling the engine to overlap I/O, computation, and communication. This makes it possible to handle out-of-core processing efficiently (via {term}`Spilling`) and integrate seamlessly with frontend query engines such as Polars. +The result is clean, composable, and scalable GPU streaming — from single-node prototypes to large-scale, multi-GPU deployments. See the {doc}`glossary` for definitions of key concepts. Follow the RAPIDS [installation guide](https://docs.rapids.ai/install/) to install `rapidsmpf` and the necessary dependencies. @@ -16,4 +16,5 @@ caption: Contents: quickstart.md background/index.md api.md +glossary.md ``` diff --git a/docs/source/quickstart.md b/docs/source/quickstart.md index ecb07c866..6b1002cb6 100644 --- a/docs/source/quickstart.md +++ b/docs/source/quickstart.md @@ -40,8 +40,8 @@ the same partition. See [Dask Integration](#api-integration-dask) for more. ## Streaming Engine -Basic streaming pipeline example in Python. In this example we have 3 {term}`Node`s -in the network: push_to_channel->count_num_rows->pull_from_channel. +Basic streaming pipeline example in Python. In this example we have 3 {term}`Node`s +in the {term}`Network`: push_to_channel->count_num_rows->pull_from_channel. *note: push_to_channel/pull_from_channel are convenience functions which simulate scans/writes* From e7af4a916e246f268a91637f0b75a00a736feac0 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Thu, 18 Dec 2025 05:11:30 -0800 Subject: [PATCH 2/4] fixups --- cpp/include/rapidsmpf/allgather/allgather.hpp | 2 +- docs/source/glossary.md | 21 ++++++++----------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/cpp/include/rapidsmpf/allgather/allgather.hpp b/cpp/include/rapidsmpf/allgather/allgather.hpp index b1ef5b2d5..21cece782 100644 --- a/cpp/include/rapidsmpf/allgather/allgather.hpp +++ b/cpp/include/rapidsmpf/allgather/allgather.hpp @@ -363,7 +363,7 @@ class PostBox { * inverse bandwidth. 
Although the latency term is linear (rather than * logarithmic as is the case for Bruck's algorithm or recursive * doubling) MPI implementations typically observe that for large - * messages ring allgorithms perform better since message passing is + * messages ring algorithms perform better since message passing is * only nearest neighbour. */ class AllGather { diff --git a/docs/source/glossary.md b/docs/source/glossary.md index 0972c9973..22d5427f6 100644 --- a/docs/source/glossary.md +++ b/docs/source/glossary.md @@ -7,26 +7,26 @@ AllGather A collective operation that gathers data from all ranks and distributes the combined result to every rank. Each rank contributes its local data, and after the operation completes, all ranks have a copy of the concatenated data from all participants. Buffer - A raw memory allocation that can reside in device (GPU), pinned host, or regular host memory. Buffers have an attached CUDA stream maintained for the lifetime of the buffer. They are typically created during scan (read) operations or when new data is produced by joins and aggregations. + A raw memory allocation that can reside in device (GPU), pinned host, or regular host memory. Buffers have an attached CUDA stream maintained for the lifetime of the buffer. They are created through a {term}`BufferResource`. BufferResource A class that manages memory allocation and transfers between different memory types (device, pinned host, and host). All memory operations in rapidsmpf, such as those performed by the Shuffler, rely on a BufferResource for memory management. It handles memory reservations, spilling, and provides access to CUDA stream pools. Channel - An asynchronous messaging queue used for communicating {term}`Message`s between {term}`Node`s in a streaming network. Channels provide backpressure to prevent memory overconsumption by suspending senders when full. They do not copy, serialize, or spill data - they simply pass references between nodes. + An asynchronous messaging queue used for communicating {term}`Message`s between {term}`Node`s in a streaming network. Channels provide backpressure to prevent excessive memory consumption by suspending senders when full. They do not copy, serialize, or spill data - they simply pass references between nodes. Collective Operation - A communication pattern that involves coordination across multiple ranks. Examples include {term}`Shuffler` (redistributing data by partition), {term}`AllGather` (gathering data from all ranks), and {term}`Fanout` (broadcasting to multiple channels). These operations are handled internally within nodes while maintaining CSP semantics. + A communication pattern that involves coordination across multiple {term}`Rank`s. Examples include {term}`Shuffler` (redistributing data by partition), {term}`AllGather` (gathering data from all ranks), and {term}`Fanout` (broadcasting to multiple channels). Communicator - An abstract interface for sending and receiving messages between ranks (processes/GPUs). Communicators support asynchronous operations, GPU data transfers, and custom logging. Implementations include UCXX (for UCX-based communication) and MPI backends. + An abstract interface for sending and receiving messages between {term}`Rank`s (processes/GPUs). Communicators support asynchronous operations, GPU data transfers, and custom logging. rapidsmpf includes UCXX (for UCX-based communication) and MPI-based communicators. A single-process communicator can be used for testing. 
Context The execution environment for {term}`Node`s in a streaming pipeline. A Context provides access to essential resources including: - A {term}`Communicator` for inter-rank communication - A {term}`BufferResource` for memory management - A {term}`ProgressThread` for background operations - - A coroutine thread pool executor + - A thread pool for executing coroutines - Configuration {term}`Options` - {term}`Statistics` for performance tracking @@ -49,7 +49,7 @@ Network A directed graph of {term}`Node`s connected by {term}`Channel`s representing a streaming data processing pipeline. Nodes are relational operators on data, and channels are the edges connecting operations in the workflow. The network remains in place for the duration of a workflow, with nodes continuously processing data as it flows through. Node - A coroutine-based asynchronous operator in a streaming pipeline. Nodes read from zero or more input {term}`Channel`s, perform computation, and write to zero or more output channels. They can be local (operating on data within a single rank) or collective (coordinating across multiple ranks). Multiple nodes execute concurrently via a thread pool executor. + A coroutine-based asynchronous operator in a streaming pipeline. Nodes receive from zero or more input {term}`Channel`s, perform computation, and send to zero or more output channels. They can be local (operating on data within a single rank) or collective (coordinating across multiple ranks). Multiple nodes execute concurrently via a thread pool executor. Options A configuration container that stores key-value pairs controlling rapidsmpf behavior. Options can be populated from environment variables (prefixed with `RAPIDSMPF_`) or set programmatically. Common options include logging verbosity, memory limits, and integration-specific settings. @@ -58,16 +58,13 @@ PackedData Serialized (packed) data ready for transfer between ranks or for spilling to host memory. PackedData contains both metadata and the actual data buffers in a format that can be efficiently transmitted and later unpacked back into structured data like cuDF tables. Partition - A logical division of data assigned to a particular rank during shuffle operations. Data is partitioned using hash-based or custom partitioning schemes, with each partition identified by a unique partition ID (PartID). The {term}`Shuffler` redistributes partitions so that each rank receives all data belonging to its assigned partitions. - -Payload - A protocol that message payloads must implement to be sent through {term}`Channel`s. The protocol defines how to construct a payload from a {term}`Message` and how to insert a payload back into a message, enabling type-safe communication between nodes. + A logical division of data assigned to a particular rank during shuffle operations. Data is partitioned using hash-based or custom partitioning schemes, with each partition identified by a unique partition ID. The {term}`Shuffler` redistributes partitions so that each rank receives all data belonging to its assigned partitions. ProgressThread - A background thread that executes registered progress functions in a loop. Used by the {term}`Shuffler` and other components to make continuous progress on asynchronous operations without blocking the main execution. Functions can be dynamically added and removed, and the thread can be paused and resumed. + A background thread that executes registered progress functions in a loop. 
Used by the {term}`Shuffler` and other components to make continuous progress on asynchronous operations (including spilling and unspilling) without blocking the main execution. Functions can be dynamically added and removed, and the thread can be paused and resumed. Rank - A unique integer identifier for a process in a distributed system, ranging from 0 to nranks-1. Each rank typically corresponds to one GPU. The rank is used to determine which partitions a process owns and to route messages between processes. + A unique integer identifier for a process in a distributed system, ranging from 0 to nranks-1. Each rank typically corresponds to one GPU. The rank is used to determine how to distribute work among processes and how to route messages between processes. Shuffler A service for redistributing partitioned data across ranks. The Shuffler accepts packed data chunks, routes them to the appropriate destination ranks based on partition ownership, and allows extraction of completed partitions. It supports asynchronous operation with pipelining of insertions and extractions, and can spill data to host memory under memory pressure. From b4d966ea213b990c7f3662c0b9d3d6f4ca4f509e Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 23 Dec 2025 07:43:17 -0600 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Mads R. B. Kristensen --- docs/source/glossary.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/source/glossary.md b/docs/source/glossary.md index 22d5427f6..5780b6332 100644 --- a/docs/source/glossary.md +++ b/docs/source/glossary.md @@ -7,16 +7,16 @@ AllGather A collective operation that gathers data from all ranks and distributes the combined result to every rank. Each rank contributes its local data, and after the operation completes, all ranks have a copy of the concatenated data from all participants. Buffer - A raw memory allocation that can reside in device (GPU), pinned host, or regular host memory. Buffers have an attached CUDA stream maintained for the lifetime of the buffer. They are created through a {term}`BufferResource`. + A raw memory allocation that can reside in device (GPU), pinned host, or regular host memory. Buffers have an attached CUDA stream maintained for the lifetime of the buffer; all operations on the buffer are stream-ordered, including when the underlying storage is host memory. They are created through a {term}`BufferResource`. BufferResource A class that manages memory allocation and transfers between different memory types (device, pinned host, and host). All memory operations in rapidsmpf, such as those performed by the Shuffler, rely on a BufferResource for memory management. It handles memory reservations, spilling, and provides access to CUDA stream pools. Channel - An asynchronous messaging queue used for communicating {term}`Message`s between {term}`Node`s in a streaming network. Channels provide backpressure to prevent excessive memory consumption by suspending senders when full. They do not copy, serialize, or spill data - they simply pass references between nodes. + An asynchronous messaging queue used for communicating {term}`Message`s between {term}`Node`s within a single local network instance on a {term}`Rank`. Channels provide backpressure to prevent excessive memory consumption by suspending senders when full. They do not copy, serialize, spill, or transmit data across ranks; instead, they simply pass references between local nodes. 
Any inter-rank communication is handled explicitly by nodes via a {term}`Communicator`, outside the channel abstraction. Collective Operation - A communication pattern that involves coordination across multiple {term}`Rank`s. Examples include {term}`Shuffler` (redistributing data by partition), {term}`AllGather` (gathering data from all ranks), and {term}`Fanout` (broadcasting to multiple channels). + A communication pattern that involves coordination across multiple {term}`Rank`s and is performed within a {term}`Node`. Collective operations use a {term}`Communicator` to exchange data between ranks, while remaining fully encapsulated within the node’s execution. From the network’s perspective, a collective operation is part of a local node’s computation and does not alter the network topology or channel semantics. Examples include {term}`Shuffler` (redistributing data by partition) and {term}`AllGather` (gathering data from all ranks). Operations such as {term}`Fanout`, which broadcast to multiple channels within the local network, are **not** collective operations because they do not involve inter-rank communication. Communicator An abstract interface for sending and receiving messages between {term}`Rank`s (processes/GPUs). Communicators support asynchronous operations, GPU data transfers, and custom logging. rapidsmpf includes UCXX (for UCX-based communication) and MPI-based communicators. A single-process communicator can be used for testing. @@ -46,10 +46,10 @@ Message A type-erased container for data payloads passed through {term}`Channel`s. Messages wrap arbitrary payload types (such as cuDF tables or buffers) along with metadata including a sequence number for ordering. Messages support deep-copy operations and can be spilled to different memory types when memory pressure occurs. Network - A directed graph of {term}`Node`s connected by {term}`Channel`s representing a streaming data processing pipeline. Nodes are relational operators on data, and channels are the edges connecting operations in the workflow. The network remains in place for the duration of a workflow, with nodes continuously processing data as it flows through. + A directed graph of {term}`Node`s connected by {term}`Channel`s representing a streaming data processing pipeline local to a single {term}`Rank`. From the network’s point of view, all nodes and channels are local, even if individual nodes internally perform inter-rank communication. The network topology is identical on every participating rank, which ensures consistent execution semantics across the distributed system. The network remains in place for the duration of a workflow, with nodes continuously processing data as data flows through. Node - A coroutine-based asynchronous operator in a streaming pipeline. Nodes receive from zero or more input {term}`Channel`s, perform computation, and send to zero or more output channels. They can be local (operating on data within a single rank) or collective (coordinating across multiple ranks). Multiple nodes execute concurrently via a thread pool executor. + A coroutine-based asynchronous operator in a streaming pipeline. Nodes receive from zero or more input {term}`Channel`s, perform computation, and send to zero or more output channels. From the network’s perspective, nodes are always local operators. A node may internally use a {term}`Communicator` to perform inter-rank communication, but this coordination is fully encapsulated within the node and is not visible to the surrounding network or channels. 
Multiple nodes execute concurrently via a thread pool executor. Options A configuration container that stores key-value pairs controlling rapidsmpf behavior. Options can be populated from environment variables (prefixed with `RAPIDSMPF_`) or set programmatically. Common options include logging verbosity, memory limits, and integration-specific settings. From 0860225af3e049206ac973706350038e822a624e Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 23 Dec 2025 07:22:54 -0800 Subject: [PATCH 4/4] lint --- docs/source/glossary.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/glossary.md b/docs/source/glossary.md index 5780b6332..699ea8922 100644 --- a/docs/source/glossary.md +++ b/docs/source/glossary.md @@ -16,7 +16,7 @@ Channel An asynchronous messaging queue used for communicating {term}`Message`s between {term}`Node`s within a single local network instance on a {term}`Rank`. Channels provide backpressure to prevent excessive memory consumption by suspending senders when full. They do not copy, serialize, spill, or transmit data across ranks; instead, they simply pass references between local nodes. Any inter-rank communication is handled explicitly by nodes via a {term}`Communicator`, outside the channel abstraction. Collective Operation - A communication pattern that involves coordination across multiple {term}`Rank`s and is performed within a {term}`Node`. Collective operations use a {term}`Communicator` to exchange data between ranks, while remaining fully encapsulated within the node’s execution. From the network’s perspective, a collective operation is part of a local node’s computation and does not alter the network topology or channel semantics. Examples include {term}`Shuffler` (redistributing data by partition) and {term}`AllGather` (gathering data from all ranks). Operations such as {term}`Fanout`, which broadcast to multiple channels within the local network, are **not** collective operations because they do not involve inter-rank communication. + A communication pattern that involves coordination across multiple {term}`Rank`s and is performed within a {term}`Node`. Collective operations use a {term}`Communicator` to exchange data between ranks, while remaining fully encapsulated within the node's execution. From the network's perspective, a collective operation is part of a local node's computation and does not alter the network topology or channel semantics. Examples include {term}`Shuffler` (redistributing data by partition) and {term}`AllGather` (gathering data from all ranks). Operations such as {term}`Fanout`, which broadcast to multiple channels within the local network, are **not** collective operations because they do not involve inter-rank communication. Communicator An abstract interface for sending and receiving messages between {term}`Rank`s (processes/GPUs). Communicators support asynchronous operations, GPU data transfers, and custom logging. rapidsmpf includes UCXX (for UCX-based communication) and MPI-based communicators. A single-process communicator can be used for testing. @@ -46,10 +46,10 @@ Message A type-erased container for data payloads passed through {term}`Channel`s. Messages wrap arbitrary payload types (such as cuDF tables or buffers) along with metadata including a sequence number for ordering. Messages support deep-copy operations and can be spilled to different memory types when memory pressure occurs. 
Network - A directed graph of {term}`Node`s connected by {term}`Channel`s representing a streaming data processing pipeline local to a single {term}`Rank`. From the network’s point of view, all nodes and channels are local, even if individual nodes internally perform inter-rank communication. The network topology is identical on every participating rank, which ensures consistent execution semantics across the distributed system. The network remains in place for the duration of a workflow, with nodes continuously processing data as data flows through. + A directed graph of {term}`Node`s connected by {term}`Channel`s representing a streaming data processing pipeline local to a single {term}`Rank`. From the network's point of view, all nodes and channels are local, even if individual nodes internally perform inter-rank communication. The network topology is identical on every participating rank, which ensures consistent execution semantics across the distributed system. The network remains in place for the duration of a workflow, with nodes continuously processing data as data flows through. Node - A coroutine-based asynchronous operator in a streaming pipeline. Nodes receive from zero or more input {term}`Channel`s, perform computation, and send to zero or more output channels. From the network’s perspective, nodes are always local operators. A node may internally use a {term}`Communicator` to perform inter-rank communication, but this coordination is fully encapsulated within the node and is not visible to the surrounding network or channels. Multiple nodes execute concurrently via a thread pool executor. + A coroutine-based asynchronous operator in a streaming pipeline. Nodes receive from zero or more input {term}`Channel`s, perform computation, and send to zero or more output channels. From the network's perspective, nodes are always local operators. A node may internally use a {term}`Communicator` to perform inter-rank communication, but this coordination is fully encapsulated within the node and is not visible to the surrounding network or channels. Multiple nodes execute concurrently via a thread pool executor. Options A configuration container that stores key-value pairs controlling rapidsmpf behavior. Options can be populated from environment variables (prefixed with `RAPIDSMPF_`) or set programmatically. Common options include logging verbosity, memory limits, and integration-specific settings.