diff --git a/bigframes/bigquery/__init__.py b/bigframes/bigquery/__init__.py
index 150fe5efc0..e02e80cd1f 100644
--- a/bigframes/bigquery/__init__.py
+++ b/bigframes/bigquery/__init__.py
@@ -43,6 +43,7 @@
     st_regionstats,
     st_simplify,
 )
+from bigframes.bigquery._operations.io import load_data
 from bigframes.bigquery._operations.json import (
     json_extract,
     json_extract_array,
@@ -107,6 +108,8 @@
     struct,
     # table ops
     create_external_table,
+    # io ops
+    load_data,
 ]
 
 _module = sys.modules[__name__]
@@ -160,6 +163,8 @@
     "struct",
     # table ops
     "create_external_table",
+    # io ops
+    "load_data",
     # Modules / SQL namespaces
     "ai",
     "ml",
diff --git a/bigframes/bigquery/_operations/io.py b/bigframes/bigquery/_operations/io.py
new file mode 100644
index 0000000000..daf28e6aed
--- /dev/null
+++ b/bigframes/bigquery/_operations/io.py
@@ -0,0 +1,94 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+from typing import Mapping, Optional, Union
+
+import pandas as pd
+
+from bigframes.bigquery._operations.table import _get_table_metadata
+import bigframes.core.logging.log_adapter as log_adapter
+import bigframes.core.sql.io
+import bigframes.session
+
+
+@log_adapter.method_logger(custom_base_name="bigquery_io")
+def load_data(
+    table_name: str,
+    *,
+    write_disposition: str = "INTO",
+    columns: Optional[Mapping[str, str]] = None,
+    partition_by: Optional[list[str]] = None,
+    cluster_by: Optional[list[str]] = None,
+    table_options: Optional[Mapping[str, Union[str, int, float, bool, list]]] = None,
+    from_files_options: Mapping[str, Union[str, int, float, bool, list]],
+    with_partition_columns: Optional[Mapping[str, str]] = None,
+    connection_name: Optional[str] = None,
+    session: Optional[bigframes.session.Session] = None,
+) -> pd.Series:
+    """
+    Loads data into a BigQuery table.
+
+    See the `BigQuery LOAD DATA DDL syntax
+    <https://docs.cloud.google.com/bigquery/docs/reference/standard-sql/load-statements>`_
+    for additional reference.
+
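+    **Examples:**
+
+    A minimal sketch of loading a CSV file from Cloud Storage; the
+    destination table name is illustrative and the source file is the
+    public ``us-states`` sample used in the system tests:
+
+        >>> import bigframes.bigquery as bbq
+        >>> bbq.load_data(
+        ...     "my_project.my_dataset.us_states",
+        ...     columns={"name": "STRING", "post_abbr": "STRING"},
+        ...     from_files_options={
+        ...         "format": "CSV",
+        ...         "uris": ["gs://cloud-samples-data/bigquery/us-states/us-states.csv"],
+        ...         "skip_leading_rows": 1,
+        ...     },
+        ... )  # doctest: +SKIP
+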
+    Args:
+        table_name (str):
+            The name of the table in BigQuery.
+        write_disposition (str, default "INTO"):
+            Whether to replace the table if it already exists ("OVERWRITE") or append to it ("INTO").
+        columns (Mapping[str, str], optional):
+            The table's schema.
+        partition_by (list[str], optional):
+            A list of partition expressions to partition the table by. See https://docs.cloud.google.com/bigquery/docs/reference/standard-sql/load-statements#partition_expression.
+        cluster_by (list[str], optional):
+            A list of columns to cluster the table by.
+        table_options (Mapping[str, Union[str, int, float, bool, list]], optional):
+            The table options.
+        from_files_options (Mapping[str, Union[str, int, float, bool, list]]):
+            The options for loading data from files.
+        with_partition_columns (Mapping[str, str], optional):
+            The table's partition columns.
+        connection_name (str, optional):
+            The connection to use for the table.
+        session (bigframes.session.Session, optional):
+            The session to use. If not provided, the default session is used.
+
+    Returns:
+        pandas.Series:
+            A Series with object dtype containing the table metadata. Reference
+            the `BigQuery Table REST API reference
+            <https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table>`_
+            for available fields.
+    """
+    import bigframes.pandas as bpd
+
+    sql = bigframes.core.sql.io.load_data_ddl(
+        table_name=table_name,
+        write_disposition=write_disposition,
+        columns=columns,
+        partition_by=partition_by,
+        cluster_by=cluster_by,
+        table_options=table_options,
+        from_files_options=from_files_options,
+        with_partition_columns=with_partition_columns,
+        connection_name=connection_name,
+    )
+
+    if session is None:
+        bpd.read_gbq_query(sql)
+        session = bpd.get_global_session()
+    else:
+        session.read_gbq_query(sql)
+
+    return _get_table_metadata(bqclient=session.bqclient, table_name=table_name)
diff --git a/bigframes/core/sql/io.py b/bigframes/core/sql/io.py
new file mode 100644
index 0000000000..9e1a549a64
--- /dev/null
+++ b/bigframes/core/sql/io.py
@@ -0,0 +1,87 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+from typing import Mapping, Optional, Union
+
+
+def load_data_ddl(
+    table_name: str,
+    *,
+    write_disposition: str = "INTO",
+    columns: Optional[Mapping[str, str]] = None,
+    partition_by: Optional[list[str]] = None,
+    cluster_by: Optional[list[str]] = None,
+    table_options: Optional[Mapping[str, Union[str, int, float, bool, list]]] = None,
+    from_files_options: Mapping[str, Union[str, int, float, bool, list]],
+    with_partition_columns: Optional[Mapping[str, str]] = None,
+    connection_name: Optional[str] = None,
+) -> str:
+    """Generates the LOAD DATA DDL statement."""
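+    # Build the statement clause by clause, in the order the LOAD DATA
+    # grammar expects: schema, PARTITION BY, CLUSTER BY, OPTIONS,
+    # FROM FILES, WITH PARTITION COLUMNS, WITH CONNECTION.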
+    statement = ["LOAD DATA"]
+    statement.append(write_disposition)
+    statement.append(table_name)
+
+    if columns:
+        column_defs = ", ".join([f"{name} {typ}" for name, typ in columns.items()])
+        statement.append(f"({column_defs})")
+
+    if partition_by:
+        statement.append(f"PARTITION BY {', '.join(partition_by)}")
+
+    if cluster_by:
+        statement.append(f"CLUSTER BY {', '.join(cluster_by)}")
+
+    if table_options:
+        opts = []
+        for key, value in table_options.items():
+            if isinstance(value, str):
+                value_sql = repr(value)
+                opts.append(f"{key} = {value_sql}")
+            elif isinstance(value, bool):
+                opts.append(f"{key} = {str(value).upper()}")
+            elif isinstance(value, list):
+                list_str = ", ".join([repr(v) for v in value])
+                opts.append(f"{key} = [{list_str}]")
+            else:
+                opts.append(f"{key} = {value}")
+        options_str = ", ".join(opts)
+        statement.append(f"OPTIONS ({options_str})")
+
+    opts = []
+    for key, value in from_files_options.items():
+        if isinstance(value, str):
+            value_sql = repr(value)
+            opts.append(f"{key} = {value_sql}")
+        elif isinstance(value, bool):
+            opts.append(f"{key} = {str(value).upper()}")
+        elif isinstance(value, list):
+            list_str = ", ".join([repr(v) for v in value])
+            opts.append(f"{key} = [{list_str}]")
+        else:
+            opts.append(f"{key} = {value}")
+    options_str = ", ".join(opts)
+    statement.append(f"FROM FILES ({options_str})")
+
+    if with_partition_columns:
+        part_defs = ", ".join(
+            [f"{name} {typ}" for name, typ in with_partition_columns.items()]
+        )
+        statement.append(f"WITH PARTITION COLUMNS ({part_defs})")
+
+    if connection_name:
+        statement.append(f"WITH CONNECTION `{connection_name}`")
+
+    return " ".join(statement)
diff --git a/tests/system/large/bigquery/test_io.py b/tests/system/large/bigquery/test_io.py
new file mode 100644
index 0000000000..024c617470
--- /dev/null
+++ b/tests/system/large/bigquery/test_io.py
@@ -0,0 +1,39 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import bigframes.bigquery as bbq
+
+
+def test_load_data(session, dataset_id):
+    table_name = f"{dataset_id}.test_load_data"
+    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"
+
+    # Load the CSV data into the destination table.
+    table = bbq.load_data(
+        table_name,
+        columns={
+            "name": "STRING",
+            "post_abbr": "STRING",
+        },
+        from_files_options={"format": "CSV", "uris": [uri], "skip_leading_rows": 1},
+        session=session,
+    )
+    assert table is not None
+
+    # Read the table to verify
+    import bigframes.pandas as bpd
+
+    bf_df = bpd.read_gbq(table_name)
+    pd_df = bf_df.to_pandas()
+    assert len(pd_df) > 0
diff --git a/tests/unit/bigquery/_operations/test_io.py b/tests/unit/bigquery/_operations/test_io.py
new file mode 100644
index 0000000000..97b38f8649
--- /dev/null
+++ b/tests/unit/bigquery/_operations/test_io.py
@@ -0,0 +1,41 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from unittest import mock
+
+import pytest
+
+import bigframes.bigquery._operations.io
+import bigframes.core.sql.io
+import bigframes.session
+
+
+@pytest.fixture
+def mock_session():
+    return mock.create_autospec(spec=bigframes.session.Session)
+
+
+@mock.patch("bigframes.bigquery._operations.io._get_table_metadata")
+def test_load_data(get_table_metadata_mock, mock_session):
+    bigframes.bigquery._operations.io.load_data(
+        "my-project.my_dataset.my_table",
+        columns={"col1": "INT64", "col2": "STRING"},
+        from_files_options={"format": "CSV", "uris": ["gs://bucket/path*"]},
+        session=mock_session,
+    )
+    mock_session.read_gbq_query.assert_called_once()
+    generated_sql = mock_session.read_gbq_query.call_args[0][0]
+    expected = "LOAD DATA INTO my-project.my_dataset.my_table (col1 INT64, col2 STRING) FROM FILES (format = 'CSV', uris = ['gs://bucket/path*'])"
+    assert generated_sql == expected
+    get_table_metadata_mock.assert_called_once()
diff --git a/tests/unit/core/sql/test_io.py b/tests/unit/core/sql/test_io.py
new file mode 100644
index 0000000000..23e5f796e3
--- /dev/null
+++ b/tests/unit/core/sql/test_io.py
@@ -0,0 +1,90 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import bigframes.core.sql.io
+
+
+def test_load_data_ddl():
+    sql = bigframes.core.sql.io.load_data_ddl(
+        "my-project.my_dataset.my_table",
+        columns={"col1": "INT64", "col2": "STRING"},
+        from_files_options={"format": "CSV", "uris": ["gs://bucket/path*"]},
+    )
+    expected = "LOAD DATA INTO my-project.my_dataset.my_table (col1 INT64, col2 STRING) FROM FILES (format = 'CSV', uris = ['gs://bucket/path*'])"
+    assert sql == expected
+
+
+def test_load_data_ddl_overwrite():
+    sql = bigframes.core.sql.io.load_data_ddl(
+        "my-project.my_dataset.my_table",
+        write_disposition="OVERWRITE",
+        columns={"col1": "INT64", "col2": "STRING"},
+        from_files_options={"format": "CSV", "uris": ["gs://bucket/path*"]},
+    )
+    expected = "LOAD DATA OVERWRITE my-project.my_dataset.my_table (col1 INT64, col2 STRING) FROM FILES (format = 'CSV', uris = ['gs://bucket/path*'])"
+    assert sql == expected
+
+
+def test_load_data_ddl_with_partition_columns():
+    sql = bigframes.core.sql.io.load_data_ddl(
+        "my-project.my_dataset.my_table",
+        columns={"col1": "INT64", "col2": "STRING"},
+        with_partition_columns={"part1": "DATE", "part2": "STRING"},
+        from_files_options={"format": "CSV", "uris": ["gs://bucket/path*"]},
+    )
+    expected = "LOAD DATA INTO my-project.my_dataset.my_table (col1 INT64, col2 STRING) FROM FILES (format = 'CSV', uris = ['gs://bucket/path*']) WITH PARTITION COLUMNS (part1 DATE, part2 STRING)"
+    assert sql == expected
+
+
+def test_load_data_ddl_connection():
+    sql = bigframes.core.sql.io.load_data_ddl(
+        "my-project.my_dataset.my_table",
+        columns={"col1": "INT64", "col2": "STRING"},
+        connection_name="my-connection",
+        from_files_options={"format": "CSV", "uris": ["gs://bucket/path*"]},
+    )
+    expected = "LOAD DATA INTO my-project.my_dataset.my_table (col1 INT64, col2 STRING) FROM FILES (format = 'CSV', uris = ['gs://bucket/path*']) WITH CONNECTION `my-connection`"
+    assert sql == expected
+
+
+def test_load_data_ddl_partition_by():
+    sql = bigframes.core.sql.io.load_data_ddl(
+        "my-project.my_dataset.my_table",
+        columns={"col1": "INT64", "col2": "STRING"},
+        partition_by=["date_col"],
+        from_files_options={"format": "CSV", "uris": ["gs://bucket/path*"]},
+    )
+    expected = "LOAD DATA INTO my-project.my_dataset.my_table (col1 INT64, col2 STRING) PARTITION BY date_col FROM FILES (format = 'CSV', uris = ['gs://bucket/path*'])"
+    assert sql == expected
+
+
+def test_load_data_ddl_cluster_by():
+    sql = bigframes.core.sql.io.load_data_ddl(
+        "my-project.my_dataset.my_table",
+        columns={"col1": "INT64", "col2": "STRING"},
+        cluster_by=["cluster_col"],
+        from_files_options={"format": "CSV", "uris": ["gs://bucket/path*"]},
+    )
+    expected = "LOAD DATA INTO my-project.my_dataset.my_table (col1 INT64, col2 STRING) CLUSTER BY cluster_col FROM FILES (format = 'CSV', uris = ['gs://bucket/path*'])"
+    assert sql == expected
+
+
+def test_load_data_ddl_table_options():
+    sql = bigframes.core.sql.io.load_data_ddl(
+        "my-project.my_dataset.my_table",
+        columns={"col1": "INT64", "col2": "STRING"},
+        table_options={"description": "my table"},
+        from_files_options={"format": "CSV", "uris": ["gs://bucket/path*"]},
+    )
+    expected = "LOAD DATA INTO my-project.my_dataset.my_table (col1 INT64, col2 STRING) OPTIONS (description = 'my table') FROM FILES (format = 'CSV', uris = ['gs://bucket/path*'])"
+    assert sql == expected