Commit bd1f99d

authored
refine global aggregation (#490)
1 parent d6a486a commit bd1f99d

1 file changed

+79
-9
lines changed

docs/global-aggregation.md

Lines changed: 79 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,33 +1,103 @@
11
# Global Aggregation
22

3-
## Overview
3+
## Overview
44

5-
In Timeplus, we define global aggregation as an aggregation query without using streaming windows like tumble, hop. Unlike streaming window aggregation, global streaming aggregation doesn't slice
6-
the unbound streaming data into windows according to timestamp, instead it processes the unbounded streaming data as one huge big global window. Due to this property, Timeplus for now can't
7-
recycle in-memory aggregation states / results according to timestamp for global aggregation.
5+
**Global aggregation** refers to running an aggregation query **without using streaming windows** such as `TUMBLE`, `HOP`, or `SESSION`.
6+
7+
Unlike windowed aggregations, global aggregation does not slice unbounded streaming data into time-based windows. Instead, it treats the entire unbounded stream as a **single global window**.
8+
9+
With global aggregation:
10+
- The query continuously updates aggregation results over all incoming data.
11+
- Users don’t need to worry about **late events**, since there are no time windows to close.
12+
13+
## Syntax
814

915
```sql
1016
SELECT <column_name1>, <column_name2>, <aggr_function>
1117
FROM <stream_name>
12-
[WHERE clause]
13-
EMIT PERIODIC [<n><UNIT>]
18+
[WHERE <condition>]
19+
GROUP BY col1, col2, ...
20+
EMIT PERIODIC <n><UNIT>
1421
```
1522

16-
`PERIODIC <n><UNIT>` tells Timeplus to emit the aggregation periodically. `UNIT` can be ms(millisecond), s(second), m(minute),h(hour),d(day).`<n>` shall be an integer greater than 0.
23+
The `EMIT PERIODIC <n><UNIT>` clause tells Timeplus to periodically emit aggregation results.
24+
- `<n>` must be an integer greater than 0.
25+
- `<UNIT>` can be one of:
1726

18-
Examples
27+
- ms (milliseconds)
28+
- s (seconds)
29+
- m (minutes)
30+
- h (hours)
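
To make the `<n><UNIT>` rules concrete, here is a minimal Python sketch that validates an interval string and converts it to milliseconds. The helper `parse_emit_interval` is hypothetical and is not part of Timeplus; it only covers the four units listed above and the rule that `<n>` must be an integer greater than 0.

```python
import re

# Milliseconds per unit, for the units listed above (illustrative only).
UNIT_MS = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000}


def parse_emit_interval(text):
    """Convert an interval such as '5s' or '500ms' to milliseconds."""
    match = re.fullmatch(r"([0-9]+)(ms|s|m|h)", text)
    if match is None:
        raise ValueError(f"expected <n><UNIT>, got {text!r}")
    n = int(match.group(1))
    if n <= 0:
        raise ValueError("<n> must be an integer greater than 0")
    return n * UNIT_MS[match.group(2)]


interval_ms = parse_emit_interval("5s")
```

With this sketch, `EMIT PERIODIC 5s` corresponds to an interval of 5000 milliseconds, while inputs such as `0s` or `5x` are rejected.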
1931

32+
**Examples:**
2033
```sql
2134
SELECT device, count(*)
2235
FROM device_utils
2336
WHERE cpu_usage > 99
37+
GROUP BY device
2438
EMIT PERIODIC 5s
2539
```
2640

27-
Like in [Streaming Tail](/query-syntax#streaming-tailing), Timeplus continuously monitors new events in the stream `device_utils`, does the filtering and then continuously does **incremental** count aggregation. Whenever the specified delay interval is up, project the current aggregation result to clients.
41+
In this example:
42+
- The query continuously monitors new events in the stream `device_utils`.
43+
- It filters rows where `cpu_usage > 99`.
44+
- An **incremental count aggregation** is maintained per `device`.
45+
- Every **5 seconds**, the current aggregation result is emitted to clients.
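
The behaviour described in this list can be modelled in a few lines of Python. This is a conceptual sketch only, not how Timeplus is implemented: the function `global_count`, the tuple layout of the events, and the choice to emit a snapshot at every 5-second boundary are all assumptions made for illustration. The point it shows is that the per-device state is never reset between emits, so each emit reports the running total over the whole stream.

```python
from collections import defaultdict


def global_count(events, emit_every_s=5, threshold=99):
    """Model of a global count with periodic emit.

    `events` is a list of (arrival_time_s, device, cpu_usage) tuples,
    sorted by arrival time (a hypothetical layout for this sketch).
    """
    counts = defaultdict(int)  # aggregation state, kept across emits
    emitted = []
    next_emit = emit_every_s
    for arrival_s, device, cpu_usage in events:
        # Emit a snapshot for every interval boundary passed so far.
        while arrival_s >= next_emit:
            emitted.append((next_emit, dict(counts)))
            next_emit += emit_every_s
        if cpu_usage > threshold:  # WHERE cpu_usage > 99
            counts[device] += 1    # incremental count per GROUP BY key
    # Snapshot at the boundary that follows the last event.
    emitted.append((next_emit, dict(counts)))
    return emitted


events = [
    (1, "dev1", 99.5),
    (2, "dev2", 50.0),
    (4, "dev1", 100.0),
    (7, "dev2", 99.9),
]
snapshots = global_count(events)
```

In this model the first snapshot (at 5 seconds) contains only `dev1` with a count of 2, and the second (at 10 seconds) still reports `dev1` as 2 alongside the new `dev2` count of 1, because earlier results are carried forward rather than discarded.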
46+
47+
## TTL of Aggregation Keys
48+
49+
Global aggregation does not automatically garbage-collect intermediate aggregation states after each emit.
50+
If the grouping keys grow monotonically over time (for example, when timestamps are part of the key), the number of aggregation states keeps increasing and memory usage can **grow without bound**.
51+
52+
To address this challenge, you can use a **hybrid hash table** for aggregation states:
53+
- Hot keys are kept in memory.
54+
- Cold keys are spilled to disk using an LRU-like algorithm.
55+
- Combined with a **TTL for keys**, this approach provides the best of both worlds:
56+
- Handles very late events.
57+
- Prevents unbounded memory growth.
58+
59+
**Example:**
60+
61+
```sql
62+
CREATE STREAM device_utils(
63+
location string,
64+
device string,
65+
cpu float32
66+
) SETTINGS shards=3;
67+
68+
SELECT
69+
to_start_of_interval(_tp_time, 5m) AS bucket_window_start,
70+
location,
71+
device,
72+
min(cpu),
73+
max(cpu),
74+
avg(cpu)
75+
FROM device_utils
76+
SHUFFLE BY location
77+
GROUP BY bucket_window_start, location, device
78+
EMIT ON UPDATE WITH BATCH 1s
79+
SETTINGS
80+
num_target_shards=8,
81+
default_hash_table='hybrid',
82+
max_hot_keys=100000,
83+
aggregate_state_ttl_sec=3600;
84+
```
85+
86+
- This query performs a **global aggregation** to calculate CPU metrics (min, max, and avg) in **5-minute buckets per location and device**.
87+
- The grouping key includes `bucket_window_start`, which increases monotonically with time.
88+
- The hybrid hash table is enabled via `default_hash_table='hybrid'`.
89+
- Keeps up to `100,000` hot keys in memory per substream.
90+
- Cold keys are spilled to disk automatically.
91+
- The TTL is set to `3600` seconds (`aggregate_state_ttl_sec=3600`):
92+
- Keys not updated for an hour are garbage-collected from disk.
93+
- Prevents infinite state accumulation.
94+
- Data shuffling is enabled (`SHUFFLE BY location`) for better **parallelism and memory efficiency**.
95+
- See [Data Shuffle](/shuffle-data) for more details.
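
The interplay between hot keys, spilling, and TTL can be illustrated with a toy Python model. Everything here is an assumption for illustration: the class `HybridAggState` is hypothetical, a plain dict stands in for disk, a simple count stands in for the aggregate state, and expired keys are dropped from both stores. It is not Timeplus's implementation and its parameters only mirror `max_hot_keys` and `aggregate_state_ttl_sec` by name.

```python
from collections import OrderedDict


class HybridAggState:
    """Toy model: LRU-ordered hot keys plus a cold store with TTL."""

    def __init__(self, max_hot_keys, ttl_sec):
        self.max_hot_keys = max_hot_keys
        self.ttl_sec = ttl_sec
        self.hot = OrderedDict()  # key -> (count, last_update_s)
        self.cold = {}            # stands in for on-disk storage

    def update(self, key, now_s):
        # Load the existing state from memory or "disk", or start fresh.
        if key in self.hot:
            count, _ = self.hot.pop(key)
        elif key in self.cold:
            count, _ = self.cold.pop(key)
        else:
            count = 0
        self.hot[key] = (count + 1, now_s)  # most recently used goes last
        # Spill least recently used keys once the hot set is full.
        while len(self.hot) > self.max_hot_keys:
            old_key, state = self.hot.popitem(last=False)
            self.cold[old_key] = state
        self.expire(now_s)

    def expire(self, now_s):
        # Drop keys that have not been updated within the TTL.
        for store in (self.hot, self.cold):
            stale = [k for k, (_, ts) in store.items()
                     if now_s - ts >= self.ttl_sec]
            for k in stale:
                del store[k]


state = HybridAggState(max_hot_keys=2, ttl_sec=3600)
state.update("a", 0)
state.update("b", 10)
state.update("c", 20)    # "a" is spilled to the cold store
after_spill = (list(state.hot), dict(state.cold))
state.update("a", 30)    # a late event for "a": restored with its old count
state.update("c", 4000)  # "a" and "b" are now older than the TTL
```

The late update to `"a"` finds its spilled state and continues counting from it, which is how the approach tolerates very late events, while the final update shows stale keys being removed so the state does not accumulate indefinitely.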
2896

2997
## Emit Policies
3098

99+
Global aggregation supports different emit policies that control when intermediate results are pushed out.
100+
31101
### EMIT PERIODIC {#emit_periodic}
32102

33103
`PERIODIC <n><UNIT>` tells Timeplus to emit the aggregation periodically. `UNIT` can be `ms` (millisecond), `s` (second), `m` (minute), `h` (hour), or `d` (day). `<n>` must be an integer greater than 0.
