
Streaming Q4 implementation #710

Open
TomAugspurger wants to merge 85 commits into rapidsai:main from TomAugspurger:tom/streaming-q4

Conversation

@TomAugspurger
Contributor

This implements TPC-H query 4 using rapidsmpf.

The most notable thing about q4 is the left semi join / WHERE EXISTS between the two tables:

q = (
    # SQL exists translates to semi join in Polars API
    orders.join(
        (lineitem.filter(pl.col("l_commitdate") < pl.col("l_receiptdate"))),
        left_on="o_orderkey",
        right_on="l_orderkey",
        how="semi",
    )
    ...
)

I want to think a bit more about how to do this. Right now, I've implemented a version that broadcasts the smaller orders table and shuffles the larger lineitem table. We're currently unable to reuse the filtered joiner across chunks: cudf only supports probing with the left table (the smaller one in our case, orders), and the build table must be the right table (the larger one in our case, lineitem). So I think we have to shuffle lineitem; otherwise we'd get incorrect results (a key in orders matching a key in lineitem multiple times, once per partition).
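To make the correctness argument concrete, here's a minimal host-side sketch of the broadcast-plus-shuffle idea, in plain C++ with std::unordered_set rather than the rapidsmpf/cudf implementation (table contents, partition count, and names are illustrative): because lineitem is hash-partitioned on l_orderkey, every copy of a given key lands in exactly one partition, so a broadcast orders key can be emitted at most once even though the same orders table is probed against every partition.

```cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <unordered_set>
#include <vector>

// Sketch of a broadcast left semi join over a shuffled right table:
// lineitem keys are hash-partitioned on l_orderkey, orders is broadcast to
// every partition, and each partition emits the orders keys it contains.
// Because a given l_orderkey lives in exactly one partition, each orders key
// is emitted at most once, so concatenating per-partition results never
// duplicates an orders row.
int main() {
    std::vector<long> orders_keys{1, 2, 3, 4};
    std::vector<long> lineitem_keys{2, 2, 3, 3, 3, 7};  // duplicate keys are fine
    std::size_t const num_partitions = 2;

    // Shuffle (hash-partition) lineitem on the join key.
    std::vector<std::unordered_set<long>> partitions(num_partitions);
    for (long key : lineitem_keys) {
        partitions[std::hash<long>{}(key) % num_partitions].insert(key);
    }

    // Broadcast orders: probe the same orders keys against every partition.
    std::vector<long> semi_join_result;
    for (auto const& part : partitions) {
        for (long key : orders_keys) {
            if (part.count(key) != 0) {
                semi_join_result.push_back(key);
            }
        }
    }

    for (long key : semi_join_result) {
        std::cout << key << '\n';  // 2 and 3, each printed exactly once
    }
}
```

If lineitem were instead split into arbitrary chunks without the shuffle, the same orders key could match in several chunks, and the concatenated semi-join output would contain duplicate orders rows.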

@copy-pr-bot

copy-pr-bot bot commented Dec 4, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

@TomAugspurger added the improvement (Improves an existing functionality) and non-breaking (Introduces a non-breaking change) labels on Dec 4, 2025
@TomAugspurger marked this pull request as ready for review on December 5, 2025 at 14:47
@TomAugspurger requested review from a team as code owners on December 5, 2025 at 14:47
@TomAugspurger changed the title from "WIP: Streaming Q4 implementation" to "Streaming Q4 implementation" on Dec 5, 2025
@TomAugspurger
Contributor Author

a338046 uses cudf::ast::column_name_reference instead of column_reference so that o_orderdate, which isn't used outside of the filter, doesn't need to be included in the output; the filter is now applied in read_parquet.
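For context, a sketch of the name-based-filter pattern, assuming libcudf's parquet reader filter option; this is not the code in a338046, and the file path, output column list, and choice of q4's lineitem predicate (l_commitdate < l_receiptdate) here are illustrative. With cudf::ast::column_name_reference the filter columns are resolved by name against the file schema, so they don't have to appear among the selected output columns.

```cpp
#include <cudf/ast/expressions.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>

#include <iostream>
#include <string>

// Sketch: build an AST predicate with column_name_reference so the filter
// columns are read only for filtering and are not materialized in the output.
int main(int argc, char** argv) {
    if (argc < 2) {
        std::cerr << "usage: " << argv[0] << " <lineitem.parquet>\n";
        return 1;
    }
    std::string const path = argv[1];

    auto commit  = cudf::ast::column_name_reference{"l_commitdate"};
    auto receipt = cudf::ast::column_name_reference{"l_receiptdate"};
    auto pred    = cudf::ast::operation{cudf::ast::ast_operator::LESS, commit, receipt};

    auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info{path})
                       .columns({"l_orderkey"})  // only the join key is materialized
                       .filter(pred)
                       .build();

    auto result = cudf::io::read_parquet(options);
    std::cout << "rows passing the filter: " << result.tbl->num_rows() << "\n";
    return 0;
}
```

With a plain column_reference the predicate is expressed in terms of the selected columns, which is why the filter column previously had to be included in the output.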

Contributor

@wence- left a comment


A few more comments; I think this looks pretty good now!

Comment on lines +159 to +160
streaming::TableChunk const& left_chunk,
streaming::TableChunk&& right_chunk,
Contributor


Why do you take left_chunk by const reference, but right_chunk by rvalue reference (i.e. the caller must move it)?

Contributor Author


I'm not really sure (I'm reading up on the semantics of the two now).

We use this streaming::TableChunk&& type for the other functions (inner_join_chunk), so I suspect I was trying to match that. But when doing a broadcast left semi join, the left_chunk is reused many times, once per right-hand chunk, which I think means it can't be moved.
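A tiny standalone sketch of that distinction, using a hypothetical Chunk type rather than rapidsmpf's streaming::TableChunk: the right-hand chunk is consumed once per call and can be moved from, while the broadcast left-hand chunk is passed in again on every iteration, so it has to remain valid and is taken by const reference.

```cpp
#include <utility>
#include <vector>

// Hypothetical stand-in for streaming::TableChunk, purely illustrative.
struct Chunk {
    std::vector<int> rows;
};

// right_chunk is consumed exactly once per call, so taking it by rvalue
// reference lets the callee steal its storage.  left_chunk is only read, and
// the same object is passed again for every right-hand chunk, so it must be
// taken by const reference: a moved-from chunk could not be reused.
Chunk semi_join_chunk(Chunk const& left_chunk, Chunk&& right_chunk) {
    Chunk out = std::move(right_chunk);  // take ownership of the right chunk
    (void)left_chunk;                    // ... filter `out` against left_chunk here ...
    return out;
}

int main() {
    Chunk orders{{1, 2, 3}};                       // broadcast table, reused
    std::vector<Chunk> lineitem{{{1}}, {{2, 4}}};  // streamed chunks

    for (auto& chunk : lineitem) {
        // `orders` is reused on every iteration; only `chunk` is given up.
        auto result = semi_join_chunk(orders, std::move(chunk));
        (void)result;
    }
}
```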

@TomAugspurger
Contributor Author

TomAugspurger commented Jan 30, 2026

I included some changes to our cpp/scripts/ndsh.py to make testing this easier:

python cpp/scripts/ndsh.py run-and-validate --input-dir scale-1 --output-dir validation --benchmark-dir "cpp/build/benchmarks/ndsh/" --benchmark-args='--no-pinned-host-memory' --reuse-expected -q 4


Labels

improvement (Improves an existing functionality), non-breaking (Introduces a non-breaking change)
