
Commit d6a486a

shuffle by (#488)
1 parent 7d3977f commit d6a486a

5 files changed: +2000, -0 lines

docs/shuffle-data.md

Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
# Shuffle Data

**Shuffle** is the process of redistributing upstream (multi-shard) data into multiple downstream substreams based on one of the group-by key columns. Each substream can then be processed independently, without requiring coordination or state merging during finalization.

Internally, the streaming watermark is also maintained per substream. This is useful when event timestamps drift: after shuffling, timestamps can be realigned.

Shuffling provides two key benefits:

- Better parallelization: Each substream can be processed independently.
- Improved memory efficiency: Especially effective when group-by key cardinality is high and source keys are spread across different shards.

By default, Timeplus does not shuffle data. To enable it, you must use the **`SHUFFLE BY`** clause in SQL.
## Syntax

```sql
SELECT ...
FROM ...
SHUFFLE BY col1, ...
GROUP BY col1, col2, ...
EMIT ...
SETTINGS num_target_shards=<num-sub-streams>
```
> Note: The columns in the `SHUFFLE BY` clause must be a subset of the `GROUP BY` columns to ensure correct aggregation results.
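As a quick sketch of this rule (using the `device_utils` stream defined in the example below):

```sql
-- OK: the shuffle key {location} is a subset of the GROUP BY keys
-- {location, device}, so each group lands in exactly one substream.
SELECT location, device, max(cpu)
FROM device_utils
SHUFFLE BY location
GROUP BY location, device;

-- NOT OK: cpu is not a GROUP BY key. Rows belonging to the same
-- (location, device) group could be routed to different substreams,
-- producing partial aggregates.
-- SHUFFLE BY cpu
```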
**Example**:

```sql
CREATE STREAM device_utils(
    location string,
    device string,
    cpu float32
);

SELECT
    location,
    device,
    min(cpu),
    max(cpu),
    avg(cpu)
FROM device_utils
SHUFFLE BY location
GROUP BY location, device
EMIT ON UPDATE WITH BATCH 1s;
```
In this example:

- The input stream `device_utils` has a single source shard.
- The `SHUFFLE BY location` clause splits the data into multiple coarse-grained substreams.
- All events for the same location are guaranteed to reside in exactly one substream.
- After shuffling, the CPU utilization aggregations (min, max, avg) can be computed independently per substream.

This enables better parallelization, since the one-to-many fan-out creates multiple substreams that can be processed in separate threads. (A conceptual routing sketch follows the plan diagram below.)
The internal query plan for the above example looks like this:

![ShufflePipelineOne](/img/shuffle-pipeline-one-to-many.svg)
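Conceptually, each event is routed by a hash of the shuffle key. The sketch below is an illustration only: it assumes `weak_hash32` is callable as a scalar function (this page only shows it inside a `sharding_expr`), and it is not the engine's actual routing code.

```sql
-- With 8 target substreams, an event's substream can be pictured as a
-- hash of the shuffle key modulo the substream count. For a power-of-2
-- count, this modulo reduces to a cheap bitmask internally.
SELECT
    location,
    weak_hash32(location) % 8 AS hypothetical_substream_id
FROM device_utils;
```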
## Control the Fan-Out

By default, the system automatically determines the number of substreams after a shuffle. This default may not be optimal, especially on nodes with many CPUs.

To customize this behavior, use the **`num_target_shards`** setting to control the number of target substreams. If it is not specified, the system typically chooses a value equal to the number of CPUs on the node.
**Example: Many-to-Many Data Shuffle**

```sql
CREATE STREAM device_utils(
    location string,
    device string,
    cpu float32
) SETTINGS shards=3;

SELECT
    location,
    device,
    min(cpu),
    max(cpu),
    avg(cpu)
FROM device_utils
SHUFFLE BY location
GROUP BY location, device
EMIT ON UPDATE WITH BATCH 1s
SETTINGS num_target_shards=8;
```
The internal query plan for the above query looks like this:

![ShufflePipelineMany](/img/shuffle-pipeline-many-to-many.svg)

:::info
The `num_target_shards` value is always rounded **up to the nearest power of 2** for better shuffle performance. For example, specifying `5` will be rounded up to `8`.
:::
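A minimal sketch of this rounding behavior (the query itself is hypothetical; the stream is the one defined above):

```sql
-- num_target_shards=5 is rounded up to the next power of 2, so this
-- query actually fans out into 8 substreams.
SELECT location, count()
FROM device_utils
SHUFFLE BY location
GROUP BY location
EMIT ON UPDATE WITH BATCH 1s
SETTINGS num_target_shards=5;
```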
## Data Already Shuffled in Storage

In some cases, data is **already partitioned in storage** (for example, data correctly partitioned across Kafka partitions). Performing an additional shuffle in these scenarios introduces unnecessary overhead.

To avoid this, you can enable **independent and parallel shard processing** with:

```sql
SETTINGS allow_independent_shard_processing=true
```
**Example:**

```sql
CREATE STREAM device_utils(
    location string,
    device string,
    cpu float32
)
SETTINGS shards=3, sharding_expr='weak_hash32((location, device))';

SELECT
    location,
    device,
    min(cpu),
    max(cpu),
    avg(cpu)
FROM device_utils
GROUP BY location, device
EMIT ON UPDATE WITH BATCH 1s
SETTINGS allow_independent_shard_processing=true;
```
In this example:

- The stream `device_utils` is created with **3 shards** and a custom sharding expression on `(location, device)`.
- Since the data is already partitioned correctly, additional shuffling can be skipped for this aggregation (see the caveat sketch after the plan diagram below).
- Aggregations (min, max, avg) can be computed directly and in parallel across shards.

The internal query plan for the above query looks like this:

![ShufflePipelineInStorage](/img/shuffle-pipeline-in-storage.svg)
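One caveat worth spelling out (a reasoning sketch based on the subset rule above, not a statement from the engine's documentation): independent shard processing is only safe when every `GROUP BY` group is confined to a single shard. With the `(location, device)` sharding expression above, grouping by `location` alone would not satisfy this:

```sql
-- Likely unsafe: device_utils is sharded by (location, device), so rows
-- for one location can live on several shards. Processing shards
-- independently would then emit partial per-location aggregates.
SELECT location, avg(cpu)
FROM device_utils
GROUP BY location
EMIT ON UPDATE WITH BATCH 1s
SETTINGS allow_independent_shard_processing=true;
```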

sidebars.js

Lines changed: 5 additions & 0 deletions
```diff
@@ -260,6 +260,11 @@ const sidebars = {
       },
     ]
   },
+  {
+    type: "doc",
+    id: "shuffle-data",
+    label: "Shuffle Data"
+  },
   {
     type: "doc",
     id: "jit",
```
Lines changed: 174 additions & 0 deletions