
Commit 500fca9

Streaming join (#545)
1 parent 76d50ff commit 500fca9

18 files changed: +939 −11 lines changed

docs/bidirectional-join.md

Lines changed: 263 additions & 0 deletions
@@ -0,0 +1,263 @@
# Bidirectional Join

**Bidirectional Join** allows two streams to be joined bidirectionally — i.e., rows from both the **left** and **right** streams are buffered and joined against each other so that matches produced by either side are emitted. This is different from typical enrichment joins (where only the left stream probes a static or changing RHS). Bidirectional joins are useful for exploratory queries, historical+real-time consistency checks, and scenarios where both sides evolve and you need to materialize the full cross-match over time.

**Bidirectional Join** can be applied in two distinct scenarios:

1. **Finite Cardinality with Data Mutations**. This includes joins where data updates or replacements occur, but the key space remains bounded:
   - Mutable Stream ⨝ Mutable Stream
   - Versioned-KV Stream ⨝ Versioned-KV Stream
   - Changelog-KV Stream ⨝ Changelog-KV Stream
2. **Unbounded Append-Only Data**. This applies to joins where both sides continuously append new records without updates:
   - Append-only Stream ⨝ Append-only Stream

## Syntax

```sql
SELECT
    *
FROM left_stream [LEFT | RIGHT | INNER | FULL] JOIN right_stream
ON left_stream.key = right_stream.key
```

## Bidirectional Join with Data Mutation

In this mode, **both the left and right streams** can mutate over time. Timeplus buffers data from both sides and continuously updates the join state as new events arrive. For each join key, only the **latest version** of the data is retained — effectively maintaining a **real-time snapshot** of the key/value tables on both sides, similar to an OLTP system.

As new updates arrive, outdated (overridden) records are **garbage collected** to keep memory usage bounded. When the set of joined keys is finite, the buffered state remains stable and bounded over time.

Internally, the join process follows the principle of **“baseline + incremental updates”**:

- The system first loads a **baseline snapshot** (latest compacted data) from historical storage.
- Then it connects to real-time stream events to **apply incremental changes** and keep the in-memory state up to date.
- Both left and right streams maintain their own hash tables that evolve as new records arrive.
- When a key is updated, Timeplus emits **retraction events** to cancel previous joined results and then produces **new joined rows** to reflect the latest state.

This ensures that any downstream aggregate or materialized view based on the join remains **consistent and correct**.

When the primary key space is finite (finite cardinality), the corresponding joined data set also remains bounded.

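The same "baseline + incremental updates" mechanics apply to the other bounded-key stream types listed above. The following is only a hedged sketch of a Versioned-KV ⨝ Versioned-KV join; the stream and column names are hypothetical and separate from the mutable-stream example below.

```sql
-- Minimal sketch, assuming versioned-KV streams keyed by a single column.
CREATE STREAM left_vkv (k string, v int)
PRIMARY KEY (k)
SETTINGS mode = 'versioned_kv';

CREATE STREAM right_vkv (kk string, vv int)
PRIMARY KEY (kk)
SETTINGS mode = 'versioned_kv';

-- Only the latest version per key on each side participates in the join.
SELECT * FROM left_vkv JOIN right_vkv ON left_vkv.k = right_vkv.kk;
```
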
### Example

```sql
CREATE MUTABLE STREAM left_mu (
    i int,
    k string,
    k1 string
)
PRIMARY KEY (k, k1);

CREATE MUTABLE STREAM right_mu (
    ii int,
    kk string,
    kk1 string
)
PRIMARY KEY (kk, kk1);

SELECT * FROM left_mu JOIN right_mu ON left_mu.k = right_mu.kk;
```

The following diagram illustrates how the above **Mutable Stream ⨝ Mutable Stream** join works when the join uses a **partial primary key** — for example, the primary key is `(k, k1)` but the join key is only `k`.

![BidirectionalJoinMutableStream](/img/bidirectional-join-mutable-stream.svg)

In the diagram above, the join key value `k1` appears multiple times in both streams:

- In the **left hash table**, `k1` maps to three unique primary keys:
  `(k1, kk1)`, `(k1, kk2)`, `(k1, kk3)`
- In the **right hash table**, `k1` maps to two unique primary keys:
  `(k1, kk4)` and `(k1, kk5)`

When joined, they produce **3 × 2 = 6 joined rows**.

Now, suppose the user inserts a new record into the right stream which updates primary key `(k1, kk5)`:

```sql
INSERT INTO right_stream VALUES (k1, kk5, v55);
```

Here’s how the **internal retraction and update process** unfolds:

1. Timeplus looks up `(k1, kk5)` in an *auxiliary* hash table created for the join and finds the existing value `(k1, kk5, v5)`.
2. The old row is **retracted** as:
   ```text
   (k1, kk5, v5, -1)
   ```
   and emitted downstream.
3. The right side replaces the old value with the new one in the auxiliary hash table:
   ```text
   (k1, kk5, v5) → (k1, kk5, v55)
   ```
4. A new **update event** is emitted:
   ```text
   (k1, kk5, v55, +1)
   ```

Next, both the retraction and update rows are joined with the **left hash table**:

**Retraction phase**:
The retraction `(k1, kk5, v5, -1)` joins with all three left-side records:

```text
(k1, kk1, v1, right.k1, right.kk5, right.v5, -1)
(k1, kk2, v2, right.k1, right.kk5, right.v5, -1)
(k1, kk3, v3, right.k1, right.kk5, right.v5, -1)
```

**Update phase**:
The update `(k1, kk5, v55, +1)` joins with the same three left-side records:

```text
(k1, kk1, v1, right.k1, right.kk5, right.v55, +1)
(k1, kk2, v2, right.k1, right.kk5, right.v55, +1)
(k1, kk3, v3, right.k1, right.kk5, right.v55, +1)
```

Finally, the right-side hash table updates its internal mapping:

```text
k1 → (kk5, v55)
```

This process ensures downstream consumers always see a consistent and up-to-date join result, even as both sides continuously mutate. The following diagram illustrates this retract-and-update process.

![BidirectionalJoinMutableStreamRetract](/img/bidirectional-join-mutable-stream-retract.svg)

**Run the join and aggregation with concrete data samples**:

```sql
-- Perform a bidirectional join on partial primary key between the two streams
-- and observe the join results and the retraction process
-- in a different console (console-1)
SELECT *, _tp_delta
FROM left_mu
JOIN right_mu
ON left_mu.k = right_mu.kk
EMIT CHANGELOG;

-- Aggregate results to observe join output in a different console (console-2)
SELECT
    count(),
    min(i), max(i), avg(i),
    min(ii), max(ii), avg(ii)
FROM left_mu
JOIN right_mu
ON left_mu.k = right_mu.kk;

-- Insert initial rows
INSERT INTO left_mu(i, k, k1) VALUES (1, 'a', 'b');
INSERT INTO right_mu(ii, kk, kk1) VALUES (11, 'a', 'bb');

-- Initial join results in console-1
-- ┌─i─┬─k─┬─k1─┬────────────────_tp_time─┬─ii─┬─kk─┬─kk1─┬───────right_mu._tp_time─┬─_tp_delta─┬─_tp_delta─┐
-- │ 1 │ a │ b │ 2025-10-25 00:20:10.032 │ 11 │ a │ bb │ 2025-10-25 00:20:15.236 │ 1 │ 1 │
-- └───┴───┴────┴─────────────────────────┴────┴────┴─────┴─────────────────────────┴───────────┴───────────┘

-- Initial aggregation results in console-2
-- ┌─count()─┬─min(i)─┬─max(i)─┬─avg(i)─┬─min(ii)─┬─max(ii)─┬─avg(ii)─┐
-- │ 1 │ 1 │ 1 │ 1 │ 11 │ 11 │ 11 │
-- └─────────┴────────┴────────┴────────┴─────────┴─────────┴─────────┘

-- Update existing rows to trigger retract/update behavior
INSERT INTO left_mu(i, k, k1) VALUES (2, 'a', 'b');

-- Retract and update join results in console-1
-- ┌─i─┬─k─┬─k1─┬────────────────_tp_time─┬─ii─┬─kk─┬─kk1─┬───────right_mu._tp_time─┬─_tp_delta─┬─_tp_delta─┐
-- │ 1 │ a │ b │ 2025-10-25 00:20:10.032 │ 11 │ a │ bb │ 2025-10-25 00:20:15.236 │ -1 │ -1 │
-- └───┴───┴────┴─────────────────────────┴────┴────┴─────┴─────────────────────────┴───────────┴───────────┘
-- ┌─i─┬─k─┬─k1─┬────────────────_tp_time─┬─ii─┬─kk─┬─kk1─┬───────right_mu._tp_time─┬─_tp_delta─┬─_tp_delta─┐
-- │ 2 │ a │ b │ 2025-10-25 00:20:31.836 │ 11 │ a │ bb │ 2025-10-25 00:20:15.236 │ 1 │ 1 │
-- └───┴───┴────┴─────────────────────────┴────┴────┴─────┴─────────────────────────┴───────────┴───────────┘

-- Retract and update aggregation results in console-2
-- ┌─count()─┬─min(i)─┬─max(i)─┬─avg(i)─┬─min(ii)─┬─max(ii)─┬─avg(ii)─┐
-- │ 1 │ 2 │ 2 │ 2 │ 11 │ 11 │ 11 │
-- └─────────┴────────┴────────┴────────┴─────────┴─────────┴─────────┘

INSERT INTO right_mu(ii, kk, kk1) VALUES (22, 'a', 'bb');

-- More retract and update join results in console-1
-- ┌─i─┬─k─┬─k1─┬────────────────_tp_time─┬─ii─┬─kk─┬─kk1─┬───────right_mu._tp_time─┬─_tp_delta─┬─_tp_delta─┐
-- │ 2 │ a │ b │ 2025-10-25 00:20:31.836 │ 11 │ a │ bb │ 2025-10-25 00:20:15.236 │ -1 │ -1 │
-- └───┴───┴────┴─────────────────────────┴────┴────┴─────┴─────────────────────────┴───────────┴───────────┘
-- ┌─i─┬─k─┬─k1─┬────────────────_tp_time─┬─ii─┬─kk─┬─kk1─┬───────right_mu._tp_time─┬─_tp_delta─┬─_tp_delta─┐
-- │ 2 │ a │ b │ 2025-10-25 00:20:31.836 │ 22 │ a │ bb │ 2025-10-25 00:20:36.827 │ 1 │ 1 │
-- └───┴───┴────┴─────────────────────────┴────┴────┴─────┴─────────────────────────┴───────────┴───────────┘

-- More retract and update aggregation results in console-2
-- ┌─count()─┬─min(i)─┬─max(i)─┬─avg(i)─┬─min(ii)─┬─max(ii)─┬─avg(ii)─┐
-- │ 1 │ 2 │ 2 │ 2 │ 22 │ 22 │ 22 │
-- └─────────┴────────┴────────┴────────┴─────────┴─────────┴─────────┘

-- Compare the streaming aggregation results in console-2 with
-- the results of this historical aggregation query; they should be the same
SELECT
    count(),
    min(i), max(i), avg(i),
    min(ii), max(ii), avg(ii)
FROM table(left_mu) AS left_mu
JOIN table(right_mu) AS right_mu
ON left_mu.k = right_mu.kk;

-- Historical aggregation results
-- ┌─count()─┬─min(i)─┬─max(i)─┬─avg(i)─┬─min(ii)─┬─max(ii)─┬─avg(ii)─┐
-- │ 1 │ 2 │ 2 │ 2 │ 22 │ 22 │ 22 │
-- └─────────┴────────┴────────┴────────┴─────────┴─────────┴─────────┘
```

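To keep such a continuously corrected result around for other queries, the join can be wrapped in a materialized view. The sketch below is only illustrative: it reuses the `left_mu`/`right_mu` streams above, and the view name `mv_join_agg` is hypothetical.

```sql
-- Minimal sketch: materialize the aggregation over the bidirectional join.
-- Retraction (-1) and update (+1) events keep the aggregates consistent
-- as either side mutates.
CREATE MATERIALIZED VIEW mv_join_agg AS
SELECT
    count() AS cnt,
    min(i) AS min_i, max(i) AS max_i,
    min(ii) AS min_ii, max(ii) AS max_ii
FROM left_mu
JOIN right_mu
ON left_mu.k = right_mu.kk;

-- Query the materialized snapshot at any time
SELECT * FROM table(mv_join_agg);
```
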
### Memory Efficiency

**Bidirectional Join with Data Mutation** can still consume significant memory when the **cardinality** of the join keys is very high. To mitigate this, you can enable a **hybrid hash join**, which keeps **hot keys in memory** while **spilling cold keys to disk**, achieving a balance between performance and memory efficiency.

**Example:**

```sql
SELECT *
FROM left_mu
JOIN right_mu
ON left_mu.k = right_mu.kk
SETTINGS default_hash_join = 'hybrid';
```

If the same query includes an **aggregation** and the aggregation’s cardinality is also large, you can enable **hybrid aggregation** by setting `default_hash_table='hybrid'`. This allows the aggregation hash table to spill to disk when memory thresholds are reached.

**Example**:

```sql
SELECT k, k1, kk, kk1, count()
FROM left_mu
JOIN right_mu
ON left_mu.k = right_mu.kk
GROUP BY k, k1, kk, kk1
SETTINGS default_hash_join='hybrid', default_hash_table='hybrid';
```

## Bidirectional Join Without Data Mutation

**Append-only ⨝ Append-only (Experimental)**

In this mode, both the **left** and **right** input streams are *append-only*, meaning that no data mutations or updates occur after insertion. Since a bidirectional join needs to buffer **all** source data from both sides to match possible keys, this leads to an **unbounded data growth problem** as the streams continue to append data indefinitely.

This join type is currently **experimental** and best suited for **ad-hoc analysis** or exploratory workloads in the Timeplus console, where users can quickly visualize or test streaming joins.

Internally, Timeplus uses a query setting called **`join_max_buffered_bytes`** to control the maximum amount of buffered source data. Once this limit is reached, the system will **abort the query** to prevent memory exhaustion.

Even if the join key space is finite, the joined value combinations can still grow without bound since every new record is treated as a unique event. In the future, Timeplus may enhance this join type by adding **automatic garbage collection** for stale or expired join data, enabling more stable long-running global joins.

The following diagram illustrates this join behavior at a high level:

![BidirectionalJoinAppendOnlyStream](/img/bidirectional-join-append-only.svg)

### Example

```sql
CREATE STREAM left_append(i int, k string);
CREATE STREAM right_append(ii int, kk string);

SELECT *
FROM left_append
JOIN right_append
ON left_append.k = right_append.kk
SETTINGS join_max_buffered_bytes=102400000;

INSERT INTO left_append(i, k) VALUES (1, 'a');
INSERT INTO right_append(ii, kk) VALUES (22, 'a');
```

docs/bidirectional-range-join.md

Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
# Bidirectional Range Join

**Bidirectional Range Join** is an enhancement of the regular bidirectional join designed to reduce memory consumption and improve performance when dealing with large or unbounded data streams.

For append-only streams, while a standard bidirectional join buffers **all** records from both sides, a **range-based** approach **buckets data by time or value ranges**, limiting how much data must be retained in memory at any given moment.

## Syntax

```sql
SELECT *
FROM left_append_stream
[LEFT | INNER] JOIN right_append_stream
ON left_append_stream.key = right_append_stream.key AND date_diff_within([<left_timestamp_col>, <right_timestamp_col>], <time_range>);
```

The `date_diff_within(...)` clause defines the time range condition for joining records across streams.

- `left_timestamp_col` / `right_timestamp_col` — Timestamp or numeric columns used to compute the range difference.
  If omitted, the system uses `_tp_time` for both streams by default.
- `time_range` — The maximum time difference allowed between the two sides for them to be considered a match
  (e.g., `2m`, `5s`, `1h`).

**Example Variants**:
```sql
-- Use default timestamp columns (_tp_time)
ON left_append_stream.key = right_append_stream.key AND date_diff_within(2m);

-- Specify explicit timestamp columns
ON left_append_stream.key = right_append_stream.key
AND date_diff_within(left_append_stream.event_time, right_append_stream.event_time, 10s);

-- Use non-time-based numeric ranges
ON left_append_stream.key = right_append_stream.key
AND left_append_stream.seq < right_append_stream.seq + 100;
```

## Example

```sql
CREATE STREAM left_range(i int, k string);
CREATE STREAM right_range(ii int, kk string);

-- Run bidirectional range join in another console (console-1)
SELECT * FROM left_range JOIN right_range ON left_range.k = right_range.kk AND date_diff_within(30s);

INSERT INTO left_range(i, k, _tp_time) VALUES (1, 'a', '2023-07-01 00:01:00');
INSERT INTO right_range(ii, kk, _tp_time) VALUES (22, 'a', '2023-07-01 00:00:50');

-- We will observe these join results in console-1
-- ┌─i─┬─k─┬────────────────_tp_time─┬─ii─┬─kk─┬────right_range._tp_time─┐
-- │ 1 │ a │ 2023-07-01 00:01:00.000 │ 22 │ a │ 2023-07-01 00:00:50.000 │
-- └───┴───┴─────────────────────────┴────┴────┴─────────────────────────┘
```

At runtime, the engine performs the following:

1. **Buckets Source Data**
   - Each side’s records are grouped into **range buckets** based on the time or numeric field defined in the join condition.
   - For example, if the range is `30s`, records are partitioned into 30-second intervals.
2. **Builds Range-Specific Hash Tables**
   - For each active range bucket, a **bidirectional hash table** is built to store the records from both sides.
3. **Performs Range-Aware Joins**
   When a new record arrives, the system:
   - Determines which **range bucket(s)** it belongs to.
   - Probes the corresponding hash tables in relevant ranges to find matching records.
   - Emits joined results immediately when matches are found.
4. **Evicts Old Ranges**
   - Once a range bucket exceeds the configured window or retention period, which is determined by an internal combined watermark, its buffered data is **evicted**, freeing memory.

The following diagram depicts this join behavior.

![BidirectionalRangeJoin](/img/bidirectional-range-join.svg)
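
As a complementary sketch, the explicit-column form of `date_diff_within` shown in the syntax variants can be exercised end to end. The streams and their `event_time` columns below are hypothetical and not part of the example above.

```sql
-- Minimal sketch: range join on user-supplied event-time columns
-- instead of the default _tp_time.
CREATE STREAM left_evt(i int, k string, event_time datetime64(3));
CREATE STREAM right_evt(ii int, kk string, event_time datetime64(3));

SELECT *
FROM left_evt
JOIN right_evt
ON left_evt.k = right_evt.kk
AND date_diff_within(left_evt.event_time, right_evt.event_time, 10s);
```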

docs/sql-create-dictionary.md renamed to docs/dictionary.md

Lines changed: 0 additions & 4 deletions
@@ -562,10 +562,6 @@ Then you can query the dictionary with the `dict_get` function:
 
 ```sql
 SELECT dict_get('dict_pg_table','desc','192.168.1.1');
-```
-
-## Limitations
-* Creating a dictionary from PostgreSQL is not supported.
 
 ## See also
 * [SHOW DICTIONARIES](/sql-show-dictionaries) - Show dictionaries
