
Commit e97ed57

kosiew, kylebarron, and timsaucer authored
Add Arrow C streaming, DataFrame iteration, and OOM-safe streaming execution (#1222)
* feat: add streaming utilities, range support, and improve async handling in DataFrame
  - Add `range` method to SessionContext and iterator support to DataFrame
  - Introduce `spawn_stream` utility and refactor async execution for better signal handling
  - Add tests for `KeyboardInterrupt` in `__arrow_c_stream__` and incremental DataFrame streaming
  - Improve memory usage tracking in tests with psutil
  - Update DataFrame docs with PyArrow streaming section and enhance `__arrow_c_stream__` documentation
  - Replace Tokio runtime creation with `spawn_stream` in PySessionContext
  - Bump datafusion packages to 49.0.1 and update dependencies
  - Remove unused imports and restore main Cargo.toml
* refactor: improve DataFrame streaming, memory management, and error handling
  - Refactor record batch streaming to use `poll_next_batch` for clearer error handling
  - Improve `spawn_future`/`spawn_stream` functions for better Python exception integration and code reuse
  - Update `datafusion` and `datafusion-ffi` dependencies to 49.0.2
  - Fix PyArrow `RecordBatchReader` import to use `_import_from_c_capsule` for safer memory handling
  - Refactor `ArrowArrayStream` handling to use `PyCapsule` with destructor for improved memory management
  - Refactor projection initialization in `PyDataFrame` for clarity
  - Move `range` functionality into `_testing.py` helper
  - Rename test column in `test_table_from_batches_stream` for accuracy
  - Add tests for `RecordBatchReader` and enhance DataFrame stream handling
* feat: enhance DataFrame streaming and improve robustness, tests, and docs
  - Preserve partition order in DataFrame streaming and update related tests
  - Add tests for record batch ordering and DataFrame batch iteration
  - Improve `drop_stream` to correctly handle PyArrow ownership transfer and null pointers
  - Replace `assert` with `debug_assert` for safer ArrowArrayStream validation
  - Add documentation for `poll_next_batch` in PyRecordBatchStream
  - Refactor tests to use `fail_collect` fixture for DataFrame collect
  - Refactor `range_table` return type to `DataFrame` for clearer type hints
  - Minor cleanup in SessionContext (remove extra blank line)
* feat: add testing utilities for DataFrame range generation
* feat: ensure proper resource management in DataFrame streaming
* refactor: replace spawn_stream and spawn_streams with spawn_future for consistency
* feat: add test for Arrow C stream schema selection in DataFrame
* test: rename and extend test_arrow_c_stream_to_table to include RecordBatchReader validation
* test: add validation for schema mismatch in Arrow C stream
* fix Ruff errors
* Update docs/source/user-guide/dataframe/index.rst (Co-authored-by: Kyle Barron <kylebarron2@gmail.com>)
* test: add batch iteration test for DataFrame
* refactor: simplify stream capsule creation in PyDataFrame
* refactor: enhance stream capsule management in PyDataFrame
* refactor: enhance DataFrame and RecordBatchStream iteration support
* refactor: improve docstrings for DataFrame and RecordBatchStream methods
* refactor: add to_record_batch_stream method and improve iteration support in DataFrame
* test: update test_iter_batches_dataframe to assert RecordBatch type and conversion
* fix: update table creation from batches to use to_pyarrow conversion
* test: add test_iter_returns_datafusion_recordbatch to verify RecordBatch type
* docs: clarify RecordBatch reference and add PyArrow conversion example
* test: improve test_iter_batches_dataframe to validate RecordBatch conversion
* test: enhance test_arrow_c_stream_to_table_and_reader for batch equality validation
* Shelve unrelated changes
* Fix documentation to reference datafusion.RecordBatch instead of pyarrow.RecordBatch
* Remove redundant to_record_batch_stream method from DataFrame class
* Refactor Arrow stream creation in PyDataFrame to use PyCapsule directly
* Add `once_cell` dependency and refactor Arrow array stream capsule name handling
* Add `cstr` dependency and refactor Arrow array stream capsule name handling
* Refactor test_iter_returns_datafusion_recordbatch to use RecordBatch directly
* Add streaming execution examples to DataFrame documentation
* Rename `to_record_batch_stream` to `execute_stream` and update references in the codebase; mark the old method as deprecated.
* Clean up formatting in Cargo.toml for improved readability
* Refactor Cargo.toml for improved formatting and readability
* Update python/tests/test_io.py (Co-authored-by: Kyle Barron <kylebarron2@gmail.com>)
* Update python/datafusion/dataframe.py (Co-authored-by: Kyle Barron <kylebarron2@gmail.com>)
* Refactor test_table_from_batches_stream to use pa.table for improved clarity
* Remove deprecated to_record_batch_stream method; use execute_stream instead
* Add example for concurrent processing of partitioned streams using asyncio
* Update documentation to reflect changes in execute_stream return type and usage
* Update PyArrow streaming example to use pa.table for eager collection
* Enhance documentation for DataFrame streaming API, clarifying schema handling and limitations
* Clarify behavior of __arrow_c_stream__ execution, emphasizing incremental batch processing and memory efficiency
* Add note on limitations of `arrow::compute::cast` for schema transformations
* Update python/tests/test_io.py (Co-authored-by: Kyle Barron <kylebarron2@gmail.com>)
* Rename test function for clarity: update `test_table_from_batches_stream` to `test_table_from_arrow_c_stream`
* Update python/datafusion/dataframe.py (Co-authored-by: Kyle Barron <kylebarron2@gmail.com>)
* Add documentation note for Arrow C Data Interface PyCapsule in DataFrame class
* Enhance documentation on zero-copy streaming to Arrow-based Python libraries, clarifying the protocol and adding implementation-agnostic notes.
* Fix formatting of section header for zero-copy streaming in DataFrame documentation
* Refine zero-copy streaming documentation by removing outdated information about eager conversion, emphasizing on-demand batch processing to prevent memory exhaustion.
* Add alternative method for creating RecordBatchReader from Arrow C stream
* Refactor tests to use RecordBatchReader.from_stream instead of deprecated _import_from_c_capsule method
* Replace deprecated _import_from_c_capsule method with from_stream for RecordBatchReader in test_arrow_c_stream_schema_selection
* Update test description for arrow_c_stream_large_dataset to clarify streaming method and usage of public API
* Add comments to clarify RSS measurement in test_arrow_c_stream_large_dataset
* Fix ruff errors
* Update async iterator implementation in DataFrame to ensure compatibility with Python < 3.10
* Fix async iterator implementation in DataFrame for compatibility with Python < 3.10
* fix typo
* Fix formatting in DataFrame documentation and add example usage for Arrow integration
* fix: correct formatting in documentation for RecordBatchStream
* refactor: remove unused import from errors module in dataframe.rs
* Simplified the streaming protocol description by removing the clause about arbitrarily large results while keeping the paragraph smooth.
* Updated the Arrow streaming documentation to describe incremental execution, remove the note block, and highlight lazy batch retrieval when using __arrow_c_stream__
* Replaced the DataFrame.__arrow_c_stream__ docstring example with a link to the Apache Arrow streaming documentation for practical guidance.
* fix: update user guide links in DataFrame class documentation for clarity
* minor ruff change

---------

Co-authored-by: Kyle Barron <kylebarron2@gmail.com>
Co-authored-by: Tim Saucer <timsaucer@gmail.com>
1 parent d7e137e commit e97ed57
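
For orientation, here is a minimal usage sketch of the API surface this commit adds — DataFrame iteration, `execute_stream()`, and the `__arrow_c_stream__` protocol consumed through PyArrow. The in-memory table, its name, and the column values are illustrative assumptions rather than part of the change, and a recent PyArrow (with `RecordBatchReader.from_stream`) is assumed:

    import pyarrow as pa
    from datafusion import SessionContext

    ctx = SessionContext()
    # Hypothetical example table; any registered table or query result behaves the same.
    df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]}, name="example")

    # Iterate the DataFrame directly; each item is a datafusion.RecordBatch.
    for batch in df:
        print(batch.to_pyarrow().num_rows)

    # Pull batches explicitly from a RecordBatchStream.
    for batch in df.execute_stream():
        ...  # batches arrive incrementally rather than as one collected result

    # Hand the DataFrame to PyArrow via the Arrow C stream protocol.
    reader = pa.RecordBatchReader.from_stream(df)
    print(reader.read_all())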

File tree

14 files changed: +743 additions, -77 deletions

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 33 additions & 8 deletions
@@ -26,17 +26,34 @@ readme = "README.md"
 license = "Apache-2.0"
 edition = "2021"
 rust-version = "1.78"
-include = ["/src", "/datafusion", "/LICENSE.txt", "build.rs", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
+include = [
+    "/src",
+    "/datafusion",
+    "/LICENSE.txt",
+    "build.rs",
+    "pyproject.toml",
+    "Cargo.toml",
+    "Cargo.lock",
+]

 [features]
 default = ["mimalloc"]
-protoc = [ "datafusion-substrait/protoc" ]
+protoc = ["datafusion-substrait/protoc"]
 substrait = ["dep:datafusion-substrait"]

 [dependencies]
-tokio = { version = "1.47", features = ["macros", "rt", "rt-multi-thread", "sync"] }
-pyo3 = { version = "0.25", features = ["extension-module", "abi3", "abi3-py310"] }
-pyo3-async-runtimes = { version = "0.25", features = ["tokio-runtime"]}
+tokio = { version = "1.47", features = [
+    "macros",
+    "rt",
+    "rt-multi-thread",
+    "sync",
+] }
+pyo3 = { version = "0.25", features = [
+    "extension-module",
+    "abi3",
+    "abi3-py310",
+] }
+pyo3-async-runtimes = { version = "0.25", features = ["tokio-runtime"] }
 pyo3-log = "0.12.4"
 arrow = { version = "56", features = ["pyarrow"] }
 datafusion = { version = "50", features = ["avro", "unicode_expressions"] }
@@ -45,16 +62,24 @@ datafusion-proto = { version = "50" }
 datafusion-ffi = { version = "50" }
 prost = "0.13.1" # keep in line with `datafusion-substrait`
 uuid = { version = "1.18", features = ["v4"] }
-mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
+mimalloc = { version = "0.1", optional = true, default-features = false, features = [
+    "local_dynamic_tls",
+] }
 async-trait = "0.1.89"
 futures = "0.3"
-object_store = { version = "0.12.4", features = ["aws", "gcp", "azure", "http"] }
+cstr = "0.2"
+object_store = { version = "0.12.4", features = [
+    "aws",
+    "gcp",
+    "azure",
+    "http",
+] }
 url = "2"
 log = "0.4.27"
 parking_lot = "0.12"

 [build-dependencies]
-prost-types = "0.13.1" # keep in line with `datafusion-substrait`
+prost-types = "0.13.1" # keep in line with `datafusion-substrait`
 pyo3-build-config = "0.25"

 [lib]

docs/source/user-guide/dataframe/index.rst

Lines changed: 109 additions & 1 deletion
@@ -196,10 +196,118 @@ To materialize the results of your DataFrame operations:

     # Display results
     df.show()  # Print tabular format to console
-
+
     # Count rows
     count = df.count()

+Zero-copy streaming to Arrow-based Python libraries
+---------------------------------------------------
+
+DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
+zero-copy, lazy streaming into Arrow-based Python libraries. With the streaming
+protocol, batches are produced on demand.
+
+.. note::
+
+   The protocol is implementation-agnostic and works with any Python library
+   that understands the Arrow C streaming interface (for example, PyArrow
+   or other Arrow-compatible implementations). The sections below provide a
+   short PyArrow-specific example and general guidance for other
+   implementations.
+
+PyArrow
+-------
+
+.. code-block:: python
+
+    import pyarrow as pa
+
+    # Create a PyArrow RecordBatchReader without materializing all batches
+    reader = pa.RecordBatchReader.from_stream(df)
+    for batch in reader:
+        ...  # process each batch as it is produced
+
+DataFrames are also iterable, yielding :class:`datafusion.RecordBatch`
+objects lazily so you can loop over results directly without importing
+PyArrow:
+
+.. code-block:: python
+
+    for batch in df:
+        ...  # each batch is a ``datafusion.RecordBatch``
+
+Each batch exposes ``to_pyarrow()``, allowing conversion to a PyArrow
+table. ``pa.table(df)`` collects the entire DataFrame eagerly into a
+PyArrow table:
+
+.. code-block:: python
+
+    import pyarrow as pa
+    table = pa.table(df)
+
+Asynchronous iteration is supported as well, allowing integration with
+``asyncio`` event loops:
+
+.. code-block:: python
+
+    async for batch in df:
+        ...  # process each batch as it is produced
+
+To work with the stream directly, use ``execute_stream()``, which returns a
+:class:`~datafusion.RecordBatchStream`.
+
+.. code-block:: python
+
+    stream = df.execute_stream()
+    for batch in stream:
+        ...
+
+Execute as Stream
+^^^^^^^^^^^^^^^^^
+
+For finer control over streaming execution, use
+:py:meth:`~datafusion.DataFrame.execute_stream` to obtain a
+:py:class:`datafusion.RecordBatchStream`:
+
+.. code-block:: python
+
+    stream = df.execute_stream()
+    for batch in stream:
+        ...  # process each batch as it is produced
+
+.. tip::
+
+   To get a PyArrow reader instead, call
+   ``pa.RecordBatchReader.from_stream(df)``.
+
+When partition boundaries are important,
+:py:meth:`~datafusion.DataFrame.execute_stream_partitioned`
+returns an iterable of :py:class:`datafusion.RecordBatchStream` objects, one per
+partition:
+
+.. code-block:: python
+
+    for stream in df.execute_stream_partitioned():
+        for batch in stream:
+            ...  # each stream yields RecordBatches
+
+To process partitions concurrently, first collect the streams into a list
+and then poll each one in a separate ``asyncio`` task:
+
+.. code-block:: python
+
+    import asyncio
+
+    async def consume(stream):
+        async for batch in stream:
+            ...
+
+    streams = list(df.execute_stream_partitioned())
+    await asyncio.gather(*(consume(s) for s in streams))
+
+See :doc:`../io/arrow` for additional details on the Arrow interface.
+
 HTML Rendering
 --------------
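
The new section above stresses that batches are produced on demand. As one hedged illustration of that property (not part of this diff), results can be spilled to an Arrow IPC stream file one batch at a time, keeping peak memory near the size of a single batch; ``df`` is assumed to be an existing DataFusion DataFrame and the output path is hypothetical:

    import pyarrow as pa
    import pyarrow.ipc as ipc

    # Stream the DataFrame's batches straight into an IPC stream file.
    reader = pa.RecordBatchReader.from_stream(df)
    with pa.OSFile("results.arrows", "wb") as sink:
        with ipc.new_stream(sink, reader.schema) as writer:
            for batch in reader:
                writer.write_batch(batch)  # only one batch held in memory at a time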

docs/source/user-guide/io/arrow.rst

Lines changed: 6 additions & 4 deletions
@@ -60,14 +60,16 @@ Exporting from DataFusion
 DataFusion DataFrames implement ``__arrow_c_stream__`` PyCapsule interface, so any
 Python library that accepts these can import a DataFusion DataFrame directly.

-.. warning::
-    It is important to note that this will cause the DataFrame execution to happen, which may be
-    a time consuming task. That is, you will cause a
-    :py:func:`datafusion.dataframe.DataFrame.collect` operation call to occur.
+Invoking ``__arrow_c_stream__`` triggers execution of the underlying query, but
+batches are yielded incrementally rather than materialized all at once in memory.
+Consumers can process the stream as it arrives. The stream executes lazily,
+letting downstream readers pull batches on demand.


 .. ipython:: python

+    from datafusion import col, lit
+
     df = df.select((col("a") * lit(1.5)).alias("c"), lit("df").alias("d"))
     pa.table(df)
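
A small, hedged sketch of the incremental behaviour described above: a consumer can pull a single batch from the stream and stop, rather than collecting the full result. ``df`` is assumed to be a DataFusion DataFrame; note that pipeline-breaking operators such as full sorts or aggregations may still need to read all of their input before the first batch is available:

    import pyarrow as pa

    reader = pa.RecordBatchReader.from_stream(df)
    first = reader.read_next_batch()  # pulls only the first batch from the stream
    print(first.num_rows)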

python/datafusion/dataframe.py

Lines changed: 46 additions & 10 deletions
@@ -22,7 +22,7 @@
 from __future__ import annotations

 import warnings
-from collections.abc import Iterable, Sequence
+from collections.abc import AsyncIterator, Iterable, Iterator, Sequence
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -50,7 +50,7 @@
     sort_list_to_raw_sort_list,
 )
 from datafusion.plan import ExecutionPlan, LogicalPlan
-from datafusion.record_batch import RecordBatchStream
+from datafusion.record_batch import RecordBatch, RecordBatchStream

 if TYPE_CHECKING:
     import pathlib
@@ -304,6 +304,9 @@ def __init__(
 class DataFrame:
     """Two dimensional table representation of data.

+    DataFrame objects are iterable; iterating over a DataFrame yields
+    :class:`datafusion.RecordBatch` instances lazily.
+
     See :ref:`user_guide_concepts` in the online documentation for more information.
     """

@@ -332,7 +335,7 @@ def into_view(self, temporary: bool = False) -> Table:
         return _Table(self.df.into_view(temporary))

     def __getitem__(self, key: str | list[str]) -> DataFrame:
-        """Return a new :py:class`DataFrame` with the specified column or columns.
+        """Return a new :py:class:`DataFrame` with the specified column or columns.

         Args:
             key: Column name or list of column names to select.
@@ -1291,21 +1294,54 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
         return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))

     def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
-        """Export an Arrow PyCapsule Stream.
+        """Export the DataFrame as an Arrow C Stream.
+
+        The DataFrame is executed using DataFusion's streaming APIs and exposed via
+        Arrow's C Stream interface. Record batches are produced incrementally, so the
+        full result set is never materialized in memory.

-        This will execute and collect the DataFrame. We will attempt to respect the
-        requested schema, but only trivial transformations will be applied such as only
-        returning the fields listed in the requested schema if their data types match
-        those in the DataFrame.
+        When ``requested_schema`` is provided, DataFusion applies only simple
+        projections such as selecting a subset of existing columns or reordering
+        them. Column renaming, computed expressions, or type coercion are not
+        supported through this interface.

         Args:
-            requested_schema: Attempt to provide the DataFrame using this schema.
+            requested_schema: Either a :py:class:`pyarrow.Schema` or an Arrow C
+                Schema capsule (``PyCapsule``) produced by
+                ``schema._export_to_c_capsule()``. The DataFrame will attempt to
+                align its output with the fields and order specified by this schema.

         Returns:
-            Arrow PyCapsule object.
+            Arrow ``PyCapsule`` object representing an ``ArrowArrayStream``.
+
+        For practical usage patterns, see the Apache Arrow streaming
+        documentation: https://arrow.apache.org/docs/python/ipc.html#streaming.
+
+        For details on DataFusion's Arrow integration and DataFrame streaming,
+        see the user guide (user-guide/io/arrow and user-guide/dataframe/index).
+
+        Notes:
+            The Arrow C Data Interface PyCapsule details are documented by Apache
+            Arrow and can be found at:
+            https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
         """
+        # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
+        # ``execute_stream_partitioned`` under the hood to stream batches while
+        # preserving the original partition order.
         return self.df.__arrow_c_stream__(requested_schema)

+    def __iter__(self) -> Iterator[RecordBatch]:
+        """Return an iterator over this DataFrame's record batches."""
+        return iter(self.execute_stream())
+
+    def __aiter__(self) -> AsyncIterator[RecordBatch]:
+        """Return an async iterator over this DataFrame's record batches.
+
+        We're using __aiter__ because we support Python < 3.10 where aiter() is not
+        available.
+        """
+        return self.execute_stream().__aiter__()
+
     def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
         """Apply a function to the current DataFrame which returns another DataFrame.
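
The reworked ``__arrow_c_stream__`` docstring above states that ``requested_schema`` honours only simple projections (column subsets and reordering). Below is a hedged sketch of how that path might be exercised from PyArrow, assuming a recent PyArrow whose ``RecordBatchReader.from_stream`` accepts a ``schema`` argument and a DataFrame with an int64 column named "a" (both assumptions, not taken from this diff):

    import pyarrow as pa

    # Request only column "a"; the type must match the DataFrame's own schema,
    # since renames, computed columns, and casts are not supported here.
    subset = pa.schema([pa.field("a", pa.int64())])
    reader = pa.RecordBatchReader.from_stream(df, schema=subset)
    for batch in reader:
        ...  # each batch contains only column "a"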

python/datafusion/record_batch.py

Lines changed: 24 additions & 4 deletions
@@ -46,6 +46,26 @@ def to_pyarrow(self) -> pa.RecordBatch:
         """Convert to :py:class:`pa.RecordBatch`."""
         return self.record_batch.to_pyarrow()

+    def __arrow_c_array__(
+        self, requested_schema: object | None = None
+    ) -> tuple[object, object]:
+        """Export the record batch via the Arrow C Data Interface.
+
+        This allows zero-copy interchange with libraries that support the
+        `Arrow PyCapsule interface <https://arrow.apache.org/docs/format/
+        CDataInterface/PyCapsuleInterface.html>`_.
+
+        Args:
+            requested_schema: Attempt to provide the record batch using this
+                schema. Only straightforward projections such as column
+                selection or reordering are applied.
+
+        Returns:
+            Two Arrow PyCapsule objects representing the ``ArrowArray`` and
+            ``ArrowSchema``.
+        """
+        return self.record_batch.__arrow_c_array__(requested_schema)
+

 class RecordBatchStream:
     """This class represents a stream of record batches.
@@ -63,19 +83,19 @@ def next(self) -> RecordBatch:
         return next(self)

     async def __anext__(self) -> RecordBatch:
-        """Async iterator function."""
+        """Return the next :py:class:`RecordBatch` in the stream asynchronously."""
         next_batch = await self.rbs.__anext__()
         return RecordBatch(next_batch)

     def __next__(self) -> RecordBatch:
-        """Iterator function."""
+        """Return the next :py:class:`RecordBatch` in the stream."""
         next_batch = next(self.rbs)
         return RecordBatch(next_batch)

     def __aiter__(self) -> typing_extensions.Self:
-        """Async iterator function."""
+        """Return an asynchronous iterator over record batches."""
         return self

     def __iter__(self) -> typing_extensions.Self:
-        """Iterator function."""
+        """Return an iterator over record batches."""
         return self
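
With ``__arrow_c_array__`` added above, a ``datafusion.RecordBatch`` can be handed straight to libraries that understand the Arrow PyCapsule interface, alongside the existing ``to_pyarrow()`` conversion. A hedged sketch, assuming ``df`` is a DataFusion DataFrame and a PyArrow version that accepts PyCapsule-protocol objects in ``pa.record_batch`` (fall back to ``to_pyarrow()`` otherwise):

    import pyarrow as pa

    for batch in df:                          # datafusion.RecordBatch instances
        explicit = batch.to_pyarrow()         # explicit conversion
        via_capsule = pa.record_batch(batch)  # import through __arrow_c_array__
        assert explicit.equals(via_capsule)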
