Generate parquet dataset failed for join #27

@zhuqi-lucas

Description

cc @MrPowers @SemyonSinchenko

Generating the parquet dataset works for the group-by benchmark, but fails for the join benchmark.

Details:
apache/datafusion#16804 (comment)

./bench.sh data h2o_medium_join_parquet
***************************
DataFusion Benchmark Runner and Data Generator
COMMAND: data
BENCHMARK: h2o_medium_join_parquet
DATA_DIR: /Users/zhuqi/arrow-datafusion/benchmarks/data
CARGO_COMMAND: cargo run --release
PREFER_HASH_JOIN: true
***************************
Found Python version 3.13, which is suitable.
Using Python command: /opt/homebrew/bin/python3
Installing falsa...
Generating h2o test data in /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o with size=MEDIUM and format=PARQUET
100 rows will be saved into: /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_1e2_0.parquet

100000 rows will be saved into: /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_1e5_0.parquet

100000000 rows will be saved into: /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_1e8_NA.parquet

An SMALL data schema is the following:
id1: int64 not null
id4: string
v2: double

An output format is PARQUET

Batch mode is supported.
In case of memory problems you can try to reduce a batch_size.


Working... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━   0% -:--:--
╭──────────────────────────────────────────────────────────────────────────────────────── Traceback (most recent call last) ─────────────────────────────────────────────────────────────────────────────────────────╮
│ /Users/zhuqi/arrow-datafusion/benchmarks/venv/lib/python3.13/site-packages/falsa/app.py:144 in join                                                                                                                │
│                                                                                                                                                                                                                    │
│   141)                                                                                                                                                                                                        │
│   142 │                                                                                                                                                                                                            │
│   143for batch in track(join_small.iter_batches(), total=len(join_small.batches)):                                                                                                                            │
│ ❱ 144 │   │   writer_small.write_batch(batch)                                                                                                                                                                      │
│   145 │   writer_small.close()                                                                                                                                                                                     │
│   146 │                                                                                                                                                                                                            │
│   147if data_format is Format.DELTA:                                                                                                                                                                          │
│                                                                                                                                                                                                                    │
│ ╭──────────────────────────────────────────────────────────────────────────────────────────────────── locals ────────────────────────────────────────────────────────────────────────────────────────────────────╮ │
│ │                batch = pyarrow.RecordBatch                                                                                                                                                                     │ │
│ │                        id1: int64 not null                                                                                                                                                                     │ │
│ │                        id4: string not null                                                                                                                                                                    │ │
│ │                        v2: double not null                                                                                                                                                                     │ │
│ │                        ----                                                                                                                                                                                    │ │
│ │                        id1: [12,77,106,10,52,105,29,64,46,51,...,110,82,72,8,1,104,69,5,44,25]                                                                                                                 │ │
│ │                        id4: ["id12","id77","id106","id10","id52","id105","id29","id64","id46","id51",...,"id110","id82","id72","id8","id1","id104","id69","id5","id44","id25"]                                 │ │
│ │                        v2:                                                                                                                                                                                     │ │
│ │                        [53.0075954693085,32.410072200393316,72.68372205230826,71.61363809771296,86.99915627358179,15.539006557813716,23.840799451398684,7.23383214431385,6.366591524991982,20.222312628293857… │ │
│ │           batch_size = 5000000                                                                                                                                                                                 │ │
│ │    data_filename_big = 'J1_1e8_1e8_NA.parquet'                                                                                                                                                                 │ │
│ │    data_filename_lhs = 'J1_1e8_NA_0.parquet'                                                                                                                                                                   │ │
│ │ data_filename_medium = 'J1_1e8_1e5_0.parquet'                                                                                                                                                                  │ │
│ │  data_filename_small = 'J1_1e8_1e2_0.parquet'                                                                                                                                                                  │ │
│ │          data_format = <Format.PARQUET: 'PARQUET'>                                                                                                                                                             │ │
│ │      generation_seed = 6839596180442651345                                                                                                                                                                     │ │
│ │             join_big = <falsa.local_fs.JoinBigGenerator object at 0x105e2fe00>                                                                                                                                 │ │
│ │             join_lhs = <falsa.local_fs.JoinLHSGenerator object at 0x105e2da90>                                                                                                                                 │ │
│ │          join_medium = <falsa.local_fs.JoinMediumGenerator object at 0x105e2fcb0>                                                                                                                              │ │
│ │           join_small = <falsa.local_fs.JoinSmallGenerator object at 0x105e2fb60>                                                                                                                               │ │
│ │                    k = 10                                                                                                                                                                                      │ │
│ │            keys_seed = 1026847926404610461                                                                                                                                                                     │ │
│ │                n_big = 100000000                                                                                                                                                                               │ │
│ │             n_medium = 100000                                                                                                                                                                                  │ │
│ │              n_small = 100                                                                                                                                                                                     │ │
│ │                  nas = 0                                                                                                                                                                                       │ │
│ │           output_big = PosixPath('/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_1e8_NA.parquet')                                                                                                    │ │
│ │           output_dir = PosixPath('/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o')                                                                                                                          │ │
│ │           output_lhs = PosixPath('/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_NA_0.parquet')                                                                                                      │ │
│ │        output_medium = PosixPath('/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_1e5_0.parquet')                                                                                                     │ │
│ │         output_small = PosixPath('/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e8_1e2_0.parquet')                                                                                                     │ │
│ │          path_prefix = '/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o'                                                                                                                                     │ │
│ │                 seed = 42                                                                                                                                                                                      │ │
│ │                 size = <Size.MEDIUM: 'MEDIUM'>                                                                                                                                                                 │ │
│ │         writer_small = <pyarrow.parquet.core.ParquetWriter object at 0x1067d4050>                                                                                                                              │ │
│ ╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
│                                                                                                                                                                                                                    │
│ /Users/zhuqi/arrow-datafusion/benchmarks/venv/lib/python3.13/site-packages/pyarrow/parquet/core.py:1089 in write_batch                                                                                             │
│                                                                                                                                                                                                                    │
│   1086 │   │   │   will be used instead.                                                                                                                                                                           │
│   1087 │   │   """                                                                                                                                                                                                 │
│   1088 │   │   table = pa.Table.from_batches([batch], batch.schema)                                                                                                                                                │
│ ❱ 1089 │   │   self.write_table(table, row_group_size)                                                                                                                                                             │
│   1090 │                                                                                                                                                                                                           │
│   1091 │   def write_table(self, table, row_group_size=None):                                                                                                                                                      │
│   1092 │   │   """                                                                                                                                                                                                 │
│                                                                                                                                                                                                                    │
│ ╭──────────────────────────────────────────────────────────────────────────────────────────────────── locals ────────────────────────────────────────────────────────────────────────────────────────────────────╮ │
│ │          batch = pyarrow.RecordBatch                                                                                                                                                                           │ │
│ │                  id1: int64 not null                                                                                                                                                                           │ │
│ │                  id4: string not null                                                                                                                                                                          │ │
│ │                  v2: double not null                                                                                                                                                                           │ │
│ │                  ----                                                                                                                                                                                          │ │
│ │                  id1: [12,77,106,10,52,105,29,64,46,51,...,110,82,72,8,1,104,69,5,44,25]                                                                                                                       │ │
│ │                  id4: ["id12","id77","id106","id10","id52","id105","id29","id64","id46","id51",...,"id110","id82","id72","id8","id1","id104","id69","id5","id44","id25"]                                       │ │
│ │                  v2:                                                                                                                                                                                           │ │
│ │                  [53.0075954693085,32.410072200393316,72.68372205230826,71.61363809771296,86.99915627358179,15.539006557813716,23.840799451398684,7.23383214431385,6.366591524991982,20.222312628293857,...,6… │ │
│ │ row_group_size = None                                                                                                                                                                                          │ │
│ │           self = <pyarrow.parquet.core.ParquetWriter object at 0x1067d4050>                                                                                                                                    │ │
│ │          table = pyarrow.Table                                                                                                                                                                                 │ │
│ │                  id1: int64 not null                                                                                                                                                                           │ │
│ │                  id4: string not null                                                                                                                                                                          │ │
│ │                  v2: double not null                                                                                                                                                                           │ │
│ │                  ----                                                                                                                                                                                          │ │
│ │                  id1: [[12,77,106,10,52,...,104,69,5,44,25]]                                                                                                                                                   │ │
│ │                  id4: [["id12","id77","id106","id10","id52",...,"id104","id69","id5","id44","id25"]]                                                                                                           │ │
│ │                  v2:                                                                                                                                                                                           │ │
│ │                  [[53.0075954693085,32.410072200393316,72.68372205230826,71.61363809771296,86.99915627358179,...,26.7118533955444,73.44416011403574,93.63022604514522,51.816253173876824,78.95727980955964]]   │ │
│ ╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
│                                                                                                                                                                                                                    │
│ /Users/zhuqi/arrow-datafusion/benchmarks/venv/lib/python3.13/site-packages/pyarrow/parquet/core.py:1113 in write_table                                                                                             │
│                                                                                                                                                                                                                    │
│   1110 │   │   │   msg = ('Table schema does not match schema used to create file: '                                                                                                                               │
│   1111 │   │   │   │      '\ntable:\n{!s} vs. \nfile:\n{!s}'                                                                                                                                                       │
│   1112 │   │   │   │      .format(table.schema, self.schema))                                                                                                                                                      │
│ ❱ 1113 │   │   │   raise ValueError(msg)                                                                                                                                                                           │
│   1114 │   │                                                                                                                                                                                                       │
│   1115 │   │   self.writer.write_table(table, row_group_size=row_group_size)                                                                                                                                       │
│   1116                                                                                                                                                                                                             │
│                                                                                                                                                                                                                    │
│ ╭──────────────────────────────────────────────────────────────────────────────────────────────────── locals ────────────────────────────────────────────────────────────────────────────────────────────────────╮ │
│ │            msg = 'Table schema does not match schema used to create file: \ntable:\nid1: int64 not n'+98                                                                                                       │ │
│ │ row_group_size = None                                                                                                                                                                                          │ │
│ │           self = <pyarrow.parquet.core.ParquetWriter object at 0x1067d4050>                                                                                                                                    │ │
│ │          table = pyarrow.Table                                                                                                                                                                                 │ │
│ │                  id1: int64 not null                                                                                                                                                                           │ │
│ │                  id4: string not null                                                                                                                                                                          │ │
│ │                  v2: double not null                                                                                                                                                                           │ │
│ │                  ----                                                                                                                                                                                          │ │
│ │                  id1: [[12,77,106,10,52,...,104,69,5,44,25]]                                                                                                                                                   │ │
│ │                  id4: [["id12","id77","id106","id10","id52",...,"id104","id69","id5","id44","id25"]]                                                                                                           │ │
│ │                  v2:                                                                                                                                                                                           │ │
│ │                  [[53.0075954693085,32.410072200393316,72.68372205230826,71.61363809771296,86.99915627358179,...,26.7118533955444,73.44416011403574,93.63022604514522,51.816253173876824,78.95727980955964]]   │ │
│ ╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
ValueError: Table schema does not match schema used to create file:
table:
id1: int64 not null
id4: string not null
v2: double not null vs.
file:
id1: int64 not null
id4: string
v2: double
