2 changes: 1 addition & 1 deletion cpp/include/rapidsmpf/allgather/allgather.hpp
@@ -363,7 +363,7 @@ class PostBox {
* inverse bandwidth. Although the latency term is linear (rather than
* logarithmic as is the case for Bruck's algorithm or recursive
* doubling) MPI implementations typically observe that for large
* messages ring allgorithms perform better since message passing is
* messages ring algorithms perform better since message passing is
* only nearest neighbour.
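*
* As a rough guide, in the usual alpha-beta cost model (alpha: per-message
* latency, beta: inverse bandwidth) a ring allgather of n total bytes over
* p ranks costs roughly
*   (p - 1) * alpha + ((p - 1) / p) * n * beta,
* while the logarithmic algorithms cost roughly
*   ceil(log2(p)) * alpha + ((p - 1) / p) * n * beta.
* The bandwidth terms match; the ring trades a larger latency term for
* strictly nearest-neighbour traffic.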
*/
class AllGather {
18 changes: 9 additions & 9 deletions docs/source/background/channels.md
@@ -1,7 +1,7 @@
# Channels


Channels are asynchronous messaging queue used to move messages between {term}`Node`s in the rapidsmpf streaming network.
{term}`Channel`s are asynchronous messaging queues used to move {term}`Message`s between {term}`Node`s in the rapidsmpf streaming {term}`Network`.

```{image} ../_static/animation-legend.png
:width: 320px
@@ -33,11 +33,11 @@ As buffers move through the graph, the channels (arrows) move from empty (dashed
*fig: example streaming network with 3 Nodes and 2 Channels*

Components:
• Node: Coroutine that processes messages
• Channel: Async queue connecting nodes
• Message: GPU Buffer with a CUDA Stream
{term}`Node`: Coroutine that processes {term}`Message`s
{term}`Channel`: Async queue connecting nodes
{term}`Message`: GPU {term}`Buffer` with a CUDA Stream

In the above graph, moving data in and out of channels on a single GPU should be relatively cheap, nearly free! This stratedy of using channels to move tasks/buffers is a core methodology for rapidsmpf to overlap: scans, compute, spilling, and communication.
In the above graph, moving data in and out of {term}`Channel`s on a single GPU should be relatively cheap, nearly free! This strategy of using channels to move tasks/{term}`Buffer`s is a core methodology for rapidsmpf to overlap: scans, compute, {term}`spilling`, and communication.

## Backpressure

@@ -62,7 +62,7 @@ Producer Side: Consumer Side:
Key Properties:
• Non-blocking: Coroutines suspend, not threads
• Backpressure: Slow consumers throttle producers
• Type-safe: Messages are type-erased but validated
• Type-safe: {term}`Message`s are type-erased but validated

A Consumer is **"full"** when an internal ring_buffer `coro::ring_buffer<Message, 1> rb_;` has reached capacity.
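
The same backpressure effect can be sketched with plain `asyncio` (an analogy only, not the rapidsmpf API): a queue of capacity one plays the role of the channel's internal ring buffer, so a producer that gets ahead of its consumer simply suspends on the send.

```python
import asyncio


async def producer(ch: asyncio.Queue) -> None:
    for i in range(4):
        print(f"producing message {i}")
        # With maxsize=1 this put() suspends until the consumer has taken
        # the previous message: the slow consumer throttles the producer.
        await ch.put(i)
    await ch.put(None)  # end-of-stream marker


async def consumer(ch: asyncio.Queue) -> None:
    while (msg := await ch.get()) is not None:
        await asyncio.sleep(0.1)  # stand-in for slow downstream work
        print(f"consumed message {msg}")


async def main() -> None:
    ch = asyncio.Queue(maxsize=1)  # capacity one, like the ring_buffer above
    await asyncio.gather(producer(ch), consumer(ch))


asyncio.run(main())
```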

@@ -88,8 +88,8 @@ for (int i = 0; i < n_producer; i++) {
}
```

Internally, when using a `throttle` a Node that writes into a channel must acquire a ticket granting permission to write before being able to. The write/send then returns a receipt that grants permission to release the ticket. The consumer of a throttled channel reads messages without issue. This means that the throttle is localised to the producer nodes.
Internally, when using a `throttle`, a {term}`Node` that writes into a {term}`Channel` must first acquire a ticket granting permission to write. The write/send then returns a receipt that grants permission to release the ticket. The consumer of a throttled channel reads {term}`Message`s without issue. This means that the throttle is localised to the producer nodes.

More simply, using a throttling adaptor limits the number messages a producer writes into a channel. This pattern is very useful for producer nodes where we want some amount of bounded concurrency in the tasks that might suspend before sending into a channel -- especially useful when trying to minimize the over-production of long-lived memory: reads/scans, shuffles, etc.
More simply, a throttling adaptor bounds the number of {term}`Message`s a producer can have in flight to a {term}`Channel` at any one time. This pattern is very useful for producer {term}`Node`s where we want bounded concurrency in tasks that might suspend before sending into a channel -- especially when trying to minimize the over-production of long-lived memory: reads/scans, shuffles, etc.

eg. a source node that read files. `ThrottlingAdaptor` will allow the node to delay reading files, until it has acquired a ticket to send a message to the channel. In comparison, non-throttling channels will suspend during send by which time, the files have already loaded into the memory unnecessarily
For example, a source node that reads files: a `ThrottlingAdaptor` allows the {term}`Node` to delay reading a file until it has acquired a ticket to send a {term}`Message` to the {term}`Channel`. In comparison, a non-throttled channel only suspends during the send itself, by which time the file has already been loaded into memory unnecessarily.
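
As a loose sketch of the ticket idea in plain `asyncio` (the names and the semaphore are illustrative stand-ins, not how `ThrottlingAdaptor` is implemented): the semaphore acts as the pool of tickets, and the expensive read is deferred until a ticket is held, bounding how many reads are in flight at once.

```python
import asyncio


async def read_and_send(path: str, ch: asyncio.Queue, tickets: asyncio.Semaphore) -> None:
    # Acquire a "ticket" *before* the expensive read, so at most two files
    # are ever sitting in memory waiting to go into the channel.
    async with tickets:
        await asyncio.sleep(0.1)  # stand-in for actually reading the file
        await ch.put(f"contents of {path}")


async def sink(ch: asyncio.Queue, n: int) -> None:
    for _ in range(n):
        print("got", await ch.get())


async def main() -> None:
    ch = asyncio.Queue()            # the channel
    tickets = asyncio.Semaphore(2)  # the throttle: at most 2 in-flight reads
    paths = [f"file_{i}.parquet" for i in range(6)]
    await asyncio.gather(sink(ch, len(paths)),
                         *(read_and_send(p, ch, tickets) for p in paths))


asyncio.run(main())
```
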
10 changes: 5 additions & 5 deletions docs/source/background/nodes.md
@@ -1,6 +1,6 @@
# Nodes

Nodes are coroutine-based asynchronous relational operators that read from
{term}`Node`s are coroutine-based asynchronous relational operators that read from
zero-or-more {term}`Channel`s and write to zero-or-more {term}`Channel`s within a {term}`Network`.

**C++**
@@ -49,12 +49,12 @@ async def accumulator(ch_out, ch_in, msg):

## Node Types

Nodes fall into two categories:
{term}`Node`s fall into two categories:
- Local Nodes: These include operations like filtering, projection, or column-wise transforms. They operate exclusively on local data and preserve CSP semantics.

- Collective Nodes: These include operations like shuffle, join, groupby aggregations, etc. which require access to distributed data.
- Collective Nodes: These include {term}`Collective Operation`s like {term}`Shuffler`, join, groupby aggregations, etc. which require access to distributed data.

In the case of a collective nodes, remote communication is handled internally. For example, a shuffle node may need to access all partitions of a table, both local and remote, but this coordination and data exchange happens inside the CSP-process itself. As a reminder "Channels" are an abstraction and not used to serialize and pass data between workers
In the case of collective {term}`Node`s, remote communication is handled internally. For example, a shuffle node may need to access all {term}`Partition`s of a table, both local and remote, but this coordination and data exchange happens inside the CSP-process itself. As a reminder, {term}`Channel`s are an abstraction and are not used to serialize and pass data between workers.
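
To make the local case concrete, here is a minimal sketch in plain Python (channels modelled as `asyncio` queues; none of these names are the rapidsmpf API): a filter node reads from one channel, applies a purely local predicate, and writes to another, with no communication involved.

```python
import asyncio

END = object()  # end-of-stream sentinel


async def filter_node(ch_in: asyncio.Queue, ch_out: asyncio.Queue) -> None:
    """A 'local' node: element-wise work on local data only."""
    while (msg := await ch_in.get()) is not END:
        if msg % 2 == 0:  # stand-in for a filter/projection on a table chunk
            await ch_out.put(msg)
    await ch_out.put(END)  # propagate shutdown downstream


async def source(ch_out: asyncio.Queue) -> None:
    for i in range(6):
        await ch_out.put(i)
    await ch_out.put(END)


async def sink(ch_in: asyncio.Queue) -> None:
    while (msg := await ch_in.get()) is not END:
        print("kept", msg)


async def main() -> None:
    a, b = asyncio.Queue(maxsize=1), asyncio.Queue(maxsize=1)
    await asyncio.gather(source(a), filter_node(a, b), sink(b))


asyncio.run(main())
```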

This hybrid model, which combines a SPMD-style distribution model and a local CSP-style streaming model, offers several advantages:

@@ -64,4 +64,4 @@ This hybrid model, which combines a SPMD-style distribution model and a local CS

- It makes inter-worker parallelism explicit through SPMD-style communication.

For examples of communication nodes and collective operations please read the [shuffle architecture page](./shuffle-architecture.md)
For examples of communication nodes and collective operations please read the [shuffle architecture page](./shuffle-architecture.md).
94 changes: 47 additions & 47 deletions docs/source/background/shuffle-architecture.md
@@ -7,132 +7,132 @@ machines. The key requirement is that there exist communication links
between the GPUs.

The core abstraction that encapsulates the set of processes that are
executing collectively is a `Communicator`. This provides unique
identifiers (termed `rank`s) to each process along with message-passing
routes between them. We provide communicator implementations based either
executing collectively is a {term}`Communicator`. This provides unique
identifiers (termed {term}`Rank`s) to each process along with message-passing
routes between them. We provide {term}`Communicator` implementations based either
directly on [UCX](https://openucx.org/)/[UCXX](https://github.com/rapidsai/ucxx) or
[MPI](https://www.mpi-forum.org). Message passing handles CPU and GPU data
uniformly; the underlying transport takes care of choosing the appropriate
route.

## "Streaming" collective operations

`rapidsmpf` provides collectives (i.e. communication
`rapidsmpf` provides {term}`Collective Operation`s (i.e. communication
primitives) that operate on "streaming" data. As a consequence, a
round of collective communication proceeds in four stages:

1. Participating ranks (defined by the `Communicator`) create a
1. Participating {term}`Rank`s (defined by the {term}`Communicator`) create a
collective object.
2. Each rank independently _inserts_ zero-or-more data chunks into the
2. Each {term}`Rank` independently _inserts_ zero-or-more data chunks into the
collective object.
3. Once a rank has inserted all data chunks, it inserts a _finish marker_.
4. After insertion is finished, a rank can _extract_ data that is the
3. Once a {term}`Rank` has inserted all data chunks, it inserts a _finish marker_.
4. After insertion is finished, a {term}`Rank` can _extract_ data that is the
result of the collective communication. This may block until data are
ready.
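
To make the protocol shape concrete, here is a deliberately degenerate, single-process sketch (no communication happens, and none of these names are the real rapidsmpf API); it models only the insert / finish / extract handshake of the four stages above.

```python
class ToyCollective:
    """Single-rank stand-in: models only the insert/finish/extract protocol."""

    def __init__(self) -> None:
        self._chunks: list[bytes] = []
        self._finished = False

    def insert(self, chunk: bytes) -> None:  # stage 2
        assert not self._finished, "no inserts after the finish marker"
        self._chunks.append(chunk)

    def insert_finished(self) -> None:  # stage 3
        self._finished = True

    def extract(self) -> list[bytes]:  # stage 4 (may block in the real thing)
        assert self._finished, "extract completes only once insertion is done"
        return list(self._chunks)


# Stage 1 in the real library: every participating rank constructs the
# collective (with a matching tag); here there is only one "rank".
coll = ToyCollective()
for part in (b"abc", b"defg"):
    coll.insert(part)
coll.insert_finished()
print(coll.extract())
```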

Collectives over subsets of all ranks in the program are enabled by
creating a `Communicator` object that only contains the desired
participating ranks.
Collectives over subsets of all {term}`Rank`s in the program are enabled by
creating a {term}`Communicator` object that only contains the desired
participating {term}`Rank`s.

Multiple collective operations can be live at the same time; they are each
distinguished by a `tag`. This `tag` must be consistent across all
participating ranks to line up the messages in the collective.

Notice that we are not responsible for providing the output buffers that a
Notice that your program is not responsible for providing the output {term}`Buffer`s that a
collective writes into. This is a consequence of the streaming design: to
allocate output buffers of the correct size we would first have to see all
inputs. Instead `rapidsmpf` is responsible for allocation of output buffers
and spilling data from device to host if device memory is at a premium.
allocate output {term}`Buffer`s of the correct size, one would first have to see all
inputs. Instead `rapidsmpf` is responsible for allocation of output {term}`Buffer`s
and {term}`Spilling` data from device to host if device memory is at a premium.
However, although `rapidsmpf` allocates outputs it never interprets your
data: it just sends and receives bytes "as-is".

## Shuffles

A key collective operation in large-scale data analytics is a "shuffle"
(a generalised all-to-all). In a shuffle, every participating rank sends
data to every other rank. We will walk through a high-level overview of the
A key {term}`Collective Operation` in large-scale data analytics is a "shuffle"
(a generalised all-to-all). In a shuffle, every participating {term}`Rank` sends
data to every other {term}`Rank`. We will walk through a high-level overview of the
steps in a shuffle using `rapidsmpf` to see how things fit together.

Having created a collective shuffle operation (a `rapidsmpf::Shuffler`), at
Once a collective shuffle operation (a {term}`Shuffler`) has been created, at
a high level, a shuffle operation involves these steps:

1. [user code] Each rank *inserts* **chunks** of data to the Shuffler,
1. [user code] Each {term}`Rank` *inserts* **chunks** of data into the {term}`Shuffler`,
followed by a finish marker.
2. [rapidsmpf] The Shuffler on that rank processes that chunk by either sending it to
another rank or keeping it for itself.
3. [user code] Each rank *extracts* chunks of data from each once it's
2. [rapidsmpf] The {term}`Shuffler` on that {term}`Rank` processes that chunk by either sending it to
another {term}`Rank` or keeping it for itself.
3. [user code] Each {term}`Rank` *extracts* chunks of data from the {term}`Shuffler` as each becomes
ready.

There are more details around how chunks are assigned to output ranks and how memory is
There are more details around how chunks are assigned to output {term}`Rank`s and how memory is
managed. But at a high level, your program is responsible for inserting chunks
somewhere and extracting (the now shuffled) chunks once they've been moved to
the correct rank.
the correct {term}`Rank`.

This diagram shows a network of with three ranks in the middle of a Shuffle operation.
This diagram shows a {term}`Network` with three {term}`Rank`s in the middle of a Shuffle operation.

![A diagram showing a shuffle.](../_static/rapidsmpf-shuffler-transparent-fs8.png)

As your program inserts chunks of data (see below), each chunk is assigned to
a particular rank. In the diagram above, this is shown by color: each
process (recall a process is uniquely identified by a `(rank,
communicator)` pair) has a particular color (the color of its circle) and each chunk with that color will
be sent to its matching rank. So, for example, all of the green chunks will be
a particular {term}`Rank`. In the diagram above, this is shown by color: each
process (recall a process is uniquely identified by a ({term}`Rank`,
{term}`Communicator`) pair) has a particular color (the color of its circle) and each chunk with that color will
be sent to its matching {term}`Rank`. So, for example, all of the green chunks will be
extracted from the green process in the top-left. Note that the number of different
chunk types (colors in this diagram) is typically larger than the number of ranks,
chunk types (colors in this diagram) is typically larger than the number of {term}`Rank`s,
and so each process will be responsible for multiple output chunk types.

The process you insert the chunk on is responsible for getting the data to the
correct output rank. It does so by placing the chunk in its **Outgoing** message
correct output {term}`Rank`. It does so by placing the chunk in its **Outgoing** message
box and then working to send it (shown by the black lines connecting the processes).

Internally, the processes involved in a shuffle continuously

- receive newly inserted chunks from your program
- move chunks to their intended ranks
- receive chunks from other ranks
- move chunks to their intended {term}`Rank`s
- receive chunks from other {term}`Rank`s
- hand off *ready* chunks when your program extracts them

During a shuffle, device memory might run low on more or more processes . `rapidsmpf` is able to *spill* chunks of data from device memory to a
During a shuffle, device memory might run low on one or more processes. `rapidsmpf` is able to *spill* chunks of data from device memory to a
larger pool (e.g. host memory). In the diagram above, this is shown by the
hatched chunks.
hatched chunks. See {term}`Spilling` for more details.

### Example: Shuffle a Table on a Column

The `rapidsmpf` Shuffler operates on **chunks** of data, without really caring
The `rapidsmpf` {term}`Shuffler` operates on **chunks** of data, without really caring
what those bytes represent. But one common use case is shuffling a table on (the
hash of) one or more columns. In this scenario, `rapidsmpf` can be used as part
of a Shuffle Join implementation.

This diagram shows multiple nodes working together to shuffle a large, logical
This diagram shows multiple {term}`Rank`s working together to shuffle a large, logical
Table.

![A diagram showing how to use rapidsmpf to shuffle a table.](../_static/rapidsmpf-shuffle-table-fs8.png)

Suppose you have a large logical table that's split into a number of partitions.
Suppose you have a large logical table that's split into a number of {term}`Partition`s.
In the diagram above, this is shown as the different dashed boxes on the
left-hand side. In this example, we've shown four partitions, but this could be
left-hand side. In this example, we've shown four {term}`Partition`s, but this could be
much larger. Each row in the table is assigned to some group (by the hash of the
columns you're joining on, say), which is shown by the color of the row.

Your program **inserts** data to the shuffler. In this case, it's inserting
Your program **inserts** data into the {term}`Shuffler`. In this case, it's inserting
chunks that represent pieces of the table that have been partitioned (by hash
key) and packed into a chunk.

Each rank involved in the shuffle knows which ranks are responsible for which
hash keys. For example, rank 1 knows that it's responsible for the purple
chunks, needs to send red chunks to rank 2, etc.
Each {term}`Rank` involved in the shuffle knows which {term}`Rank`s are responsible for which
hash keys. For example, {term}`Rank` 1 knows that it's responsible for the purple
chunks, needs to send red chunks to {term}`Rank` 2, etc.

Each input partition possibly includes data for each hash key. All the processes
Each input {term}`Partition` possibly includes data for each hash key. All the processes
involved in the shuffle move data to get all the chunks with a particular hash
key to the correct rank (spilling if needed). This is shown in the middle
key to the correct {term}`Rank` ({term}`Spilling` if needed). This is shown in the middle
section.

As chunks become "ready" (see above), your program can **extract** chunks and
process them as necessary. This is shown on the right-hand side.
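
The data movement in this example can be mimicked with ordinary Python containers: a toy, single-process sketch of hash partitioning and the insert/extract handshake, with no real Shuffler, Communicator, or GPU memory involved (all names below are invented for illustration).

```python
from collections import defaultdict

N_RANKS = 3  # pretend communicator size
rows = [("apple", 1), ("kiwi", 2), ("pear", 3), ("apple", 4), ("kiwi", 5)]


def owner(key: str) -> int:
    # One illustrative way to assign hash keys to ranks (consistent within a
    # single Python process; a real deployment would use a stable hash).
    return hash(key) % N_RANKS


# "Insert": each rank partitions its local rows by owning rank and hands the
# resulting chunks to the shuffler, which routes them across the network.
mailboxes: dict[int, list[tuple[str, int]]] = defaultdict(list)
for row in rows:
    mailboxes[owner(row[0])].append(row)

# "Extract": every rank ends up holding all the rows for the keys it owns.
for rank in range(N_RANKS):
    print(f"rank {rank} extracted: {mailboxes[rank]}")
```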

### Shuffle Statistics

Shuffles can be configured to collect statistics, which can help you understand the performance of the system.
Shuffles can be configured to collect {term}`Statistics`, which can help you understand the performance of the system.
This table gives an overview of the different statistics collected.

| Name | Type | Description |