Skip to content

Commit 213f26b

Browse files
authored
Prefer pyarrow to read/write parquet; use arro3 as fallback (#598)
This is blocked on the next release of arro3
1 parent 2364d1b commit 213f26b

File tree

10 files changed

+138
-81
lines changed

10 files changed

+138
-81
lines changed

lonboard/_cli.py

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import json
24
import webbrowser
35
from pathlib import Path
@@ -22,7 +24,8 @@ def read_pyogrio(path: Path) -> Table:
2224
from pyogrio.raw import open_arrow
2325
except ImportError as e:
2426
raise ImportError(
25-
"pyogrio is a required dependency for the CLI. "
27+
"pyogrio is a required dependency for the CLI for reading data sources \n"
28+
"other than GeoParquet.\n"
2629
"Install with `pip install pyogrio`."
2730
) from e
2831

@@ -58,29 +61,55 @@ def read_pyogrio(path: Path) -> Table:
5861
return table.with_schema(new_schema)
5962

6063

61-
def read_geoparquet(path: Path) -> Table:
62-
"""Read GeoParquet file at path using pyarrow
64+
def read_parquet(path: Path) -> tuple[Table, dict]:
65+
"""Read Parquet file using either pyarrow or arro3.
66+
67+
arro3.io.read_parquet is not multi-threaded (as of arro3 0.2.1), so pyarrow can be
68+
up to 4x faster on an 8-core machine. Because of this, we prefer pyarrow if it's
69+
installed, and fall back to arro3 otherwise.
6370
6471
Args:
65-
path: Path to GeoParquet file
72+
path: path to Parquet file.
73+
74+
Raises:
75+
ValueError: if there's no GeoParquet metadata in the file
76+
77+
Returns:
78+
arro3 Table
6679
"""
6780
try:
6881
import pyarrow.parquet as pq
69-
except ImportError as e:
70-
raise ImportError(
71-
"pyarrow currently required for reading GeoParquet files.\n"
72-
"Run `pip install pyarrow`."
73-
) from e
7482

75-
file = pq.ParquetFile(path)
76-
geo_meta = file.metadata.metadata.get(b"geo")
77-
if not geo_meta:
78-
raise ValueError("Expected geo metadata in Parquet file")
83+
file = pq.ParquetFile(path)
84+
if b"geo" not in file.metadata.metadata:
85+
raise ValueError("Expected geo metadata in Parquet file")
86+
geo_meta = json.loads(file.metadata.metadata.get(b"geo"))
87+
88+
table = Table.from_arrow(file.read())
89+
90+
return table, geo_meta
7991

80-
pyarrow_table = file.read()
81-
table = Table.from_arrow(pyarrow_table)
92+
except ImportError:
93+
from arro3.io import read_parquet
8294

83-
geo_meta = json.loads(geo_meta)
95+
reader = read_parquet(path)
96+
97+
if "geo" not in reader.schema.metadata_str.keys():
98+
raise ValueError("Expected geo metadata in Parquet file")
99+
100+
table = reader.read_all()
101+
geo_meta = json.loads(table.schema.metadata_str["geo"])
102+
103+
return table, geo_meta
104+
105+
106+
def read_geoparquet(path: Path) -> Table:
107+
"""Read GeoParquet file at path using pyarrow or arro3.io
108+
109+
Args:
110+
path: Path to GeoParquet file
111+
"""
112+
table, geo_meta = read_parquet(path)
84113
geometry_column_name = geo_meta["primary_column"]
85114
geometry_column_index = [
86115
i for (i, name) in enumerate(table.schema.names) if name == geometry_column_name

lonboard/_geoarrow/_duckdb.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
from typing import TYPE_CHECKING, List, Optional, Union
66

77
import numpy as np
8-
from arro3.compute import struct_field
98
from arro3.core import (
109
Array,
1110
ChunkedArray,
1211
Field,
1312
Table,
1413
fixed_size_list_array,
1514
list_array,
15+
struct_field,
1616
)
1717

1818
from lonboard._constants import EXTENSION_NAME

lonboard/_geoarrow/ops/bbox.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77
from typing import Tuple
88

99
import numpy as np
10-
from arro3.compute import list_flatten
11-
from arro3.core import Array, ChunkedArray, DataType, Field
10+
from arro3.core import Array, ChunkedArray, DataType, Field, list_flatten
1211

1312
from lonboard._constants import EXTENSION_NAME
1413

lonboard/_geoarrow/ops/centroid.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@
66
from typing import Optional
77

88
import numpy as np
9-
from arro3.compute import list_flatten
10-
from arro3.core import Array, ChunkedArray, DataType, Field
9+
from arro3.core import Array, ChunkedArray, DataType, Field, list_flatten
1110

1211
from lonboard._constants import EXTENSION_NAME
1312

lonboard/_geoarrow/ops/coord_layout.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
from typing import Tuple
44

55
import numpy as np
6-
from arro3.compute import struct_field
76
from arro3.core import (
87
Array,
98
ChunkedArray,
109
DataType,
1110
Field,
1211
Table,
1312
fixed_size_list_array,
13+
struct_field,
1414
)
1515

1616
from lonboard._constants import EXTENSION_NAME

lonboard/_geoarrow/ops/reproject.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from warnings import warn
99

1010
import numpy as np
11-
from arro3.compute import list_flatten, list_offsets
1211
from arro3.core import (
1312
Array,
1413
ChunkedArray,
@@ -17,6 +16,8 @@
1716
Table,
1817
fixed_size_list_array,
1918
list_array,
19+
list_flatten,
20+
list_offsets,
2021
)
2122
from pyproj import CRS, Transformer
2223

lonboard/_serialization.py

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
66

77
import numpy as np
8-
from arro3.core import Array, ChunkedArray, Table
9-
from arro3.io import write_parquet
8+
from arro3.core import Array, ChunkedArray, RecordBatch, Table
109
from traitlets import TraitError
1110

1211
from lonboard.models import ViewState
@@ -27,28 +26,59 @@
2726
DEFAULT_MAX_NUM_CHUNKS = 32
2827

2928

29+
def write_parquet_batch(record_batch: RecordBatch) -> bytes:
30+
"""Write a RecordBatch to a Parquet file
31+
32+
We still use pyarrow.parquet.ParquetWriter if pyarrow is installed because pyarrow
33+
has better encoding defaults. So Parquet files written by pyarrow are smaller by
34+
default than files written by arro3.io.write_parquet.
35+
"""
36+
# Occasionally it's possible for there to be empty batches in the
37+
# pyarrow table. This will error when writing to parquet. We want to
38+
# give a more informative error.
39+
if record_batch.num_rows == 0:
40+
raise ValueError("Batch with 0 rows.")
41+
42+
try:
43+
import pyarrow as pa
44+
import pyarrow.parquet as pq
45+
46+
bio = BytesIO()
47+
with pq.ParquetWriter(
48+
bio,
49+
schema=pa.schema(record_batch.schema),
50+
compression=DEFAULT_PARQUET_COMPRESSION,
51+
compression_level=DEFAULT_PARQUET_COMPRESSION_LEVEL,
52+
) as writer:
53+
writer.write_batch(
54+
pa.record_batch(record_batch), row_group_size=record_batch.num_rows
55+
)
56+
57+
return bio.getvalue()
58+
59+
except ImportError:
60+
from arro3.io import write_parquet
61+
62+
compression_string = (
63+
f"{DEFAULT_PARQUET_COMPRESSION}({DEFAULT_PARQUET_COMPRESSION_LEVEL})"
64+
)
65+
bio = BytesIO()
66+
write_parquet(
67+
record_batch,
68+
bio,
69+
compression=compression_string,
70+
max_row_group_size=record_batch.num_rows,
71+
)
72+
73+
return bio.getvalue()
74+
75+
3076
def serialize_table_to_parquet(table: Table, *, max_chunksize: int) -> List[bytes]:
3177
buffers: List[bytes] = []
3278
assert max_chunksize > 0
3379

34-
compression_string = (
35-
f"{DEFAULT_PARQUET_COMPRESSION}({DEFAULT_PARQUET_COMPRESSION_LEVEL})"
36-
)
3780
for record_batch in table.rechunk(max_chunksize=max_chunksize).to_batches():
38-
with BytesIO() as bio:
39-
# Occasionally it's possible for there to be empty batches in the
40-
# pyarrow table. This will error when writing to parquet. We want to
41-
# give a more informative error.
42-
if record_batch.num_rows == 0:
43-
raise ValueError("Batch with 0 rows.")
44-
45-
write_parquet(
46-
table,
47-
bio,
48-
compression=compression_string,
49-
max_row_group_size=record_batch.num_rows,
50-
)
51-
buffers.append(bio.getvalue())
81+
buffers.append(write_parquet_batch(record_batch))
5282

5383
return buffers
5484

lonboard/_viz.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
)
1919

2020
import numpy as np
21-
from arro3.compute import struct_field
22-
from arro3.core import Array, ChunkedArray, Schema, Table
21+
from arro3.core import Array, ChunkedArray, Schema, Table, struct_field
2322

2423
from lonboard._compat import check_pandas_version
2524
from lonboard._constants import EXTENSION_NAME

poetry.lock

Lines changed: 34 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ classifiers = [
3232
[tool.poetry.dependencies]
3333
python = "^3.8"
3434
anywidget = "^0.9.0"
35-
arro3-core = "^0.2.1"
36-
arro3-io = "^0.2.1"
37-
arro3-compute = "^0.2.1"
35+
arro3-core = "^0.3.0-beta.1"
36+
arro3-io = "^0.3.0-beta.1"
37+
arro3-compute = "^0.3.0-beta.1"
3838
ipywidgets = ">=7.6.0"
3939
numpy = ">=1.14"
4040
# The same version pin as geopandas

0 commit comments

Comments (0)