diff --git a/README.md b/README.md index ac42c3e7..402f5e15 100644 --- a/README.md +++ b/README.md @@ -10,25 +10,25 @@ SIG TFX-Addons is a community-led open source project. As such, the project depe ## Maintainership The maintainers of TensorFlow Addons can be found in the [CODEOWNERS](https://github.com/tensorflow/tfx-addons/blob/main/CODEOWNERS) file of the repo. If you would -like to maintain something, please feel free to submit a PR. We encourage multiple +like to maintain something, please feel free to submit a PR. We encourage multiple owners for all submodules. ## Installation -TFX Addons is available on PyPI for all OS. To install the latest version, +TFX Addons is available on PyPI for all OS. To install the latest version, run the following: ``` pip install tfx-addons ``` -To ensure you have a compatible version of dependencies for any given project, +To ensure you have a compatible version of dependencies for any given project, you can specify the project name as an extra requirement during install: ``` pip install tfx-addons[feast_examplegen,schema_curation] -``` +``` To use TFX Addons: @@ -45,19 +45,19 @@ tfxa.feast_examplegen.FeastExampleGen(...) ## TFX Addons projects -* [tfxa.feast_examplegen](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/feast_examplegen) +* [tfxa.feast_examplegen](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/feast_examplegen) * [tfxa.feature_selection](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/feature_selection) * [tfxa.firebase_publisher](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/firebase_publisher) * [tfxa.huggingface_pusher](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/huggingface_pusher) -* [tfxa.message_exit_handler](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/message_exit_handler) -* [tfxa.mlmd_client](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/mlmd_client) +* [tfxa.message_exit_handler](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/message_exit_handler) +* [tfxa.mlmd_client](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/mlmd_client) * [tfxa.model_card_generator](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/model_card_generator) -* [tfxa.pandas_transform](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/pandas_transform) -* [tfxa.predictions_to_bigquery](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/predictions_to_bigquery) +* [tfxa.pandas_transform](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/pandas_transform) +* [tfxa.predictions_to_bigquery](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/predictions_to_bigquery) * [tfxa.sampling](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/sampling) -* [tfxa.schema_curation](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/schema_curation) +* [tfxa.schema_curation](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/schema_curation) * [tfxa.xgboost_evaluator](https://github.com/tensorflow/tfx-addons/tree/main/tfx_addons/xgboost_evaluator) - + Check out [proposals](https://github.com/tensorflow/tfx-addons/tree/main/proposals) for a list of existing or upcoming projects proposals for TFX Addons. 
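As a minimal, hedged sketch of how the table written by the PredictionsToBigQuery component introduced by this change could be read back for analysis with the google-cloud-bigquery client; the table name below is a placeholder, while `category_label`, `score`, and `datetime` are the fields the component's executor emits:

```python
# Sketch only: assumes the `google-cloud-bigquery` package is installed and
# that `my_project.my_dataset.predictions_20230101` stands in for the table
# name reported in the component's `bigquery_export` output.
from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT category_label, score, datetime
    FROM `my_project.my_dataset.predictions_20230101`
    WHERE score >= 0.8
    ORDER BY score DESC
    LIMIT 10
"""
# Iterate over the top-scoring predictions exported by the component.
for row in client.query(query).result():
    print(row.category_label, row.score, row.datetime)
```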
diff --git a/setup.py b/setup.py index 5db798e3..56e98a80 100644 --- a/setup.py +++ b/setup.py @@ -57,7 +57,7 @@ def get_long_description(): return fp.read() -TESTS_REQUIRE = ["pytest", "pylint", "pre-commit", "isort", "yapf"] +TESTS_REQUIRE = ["pytest", "pylint", "pre-commit", "isort", "yapf", "absl-py"] PKG_REQUIRES = get_pkg_metadata() EXTRAS_REQUIRE = PKG_REQUIRES.copy() diff --git a/tfx_addons/predictions_to_bigquery/Dockerfile b/tfx_addons/predictions_to_bigquery/Dockerfile new file mode 100644 index 00000000..50cfe5b0 --- /dev/null +++ b/tfx_addons/predictions_to_bigquery/Dockerfile @@ -0,0 +1,24 @@ +# Copyright 2023 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +ARG PLATFORM=cpu + +FROM gcr.io/tfx-oss-public/tfx:latest + +WORKDIR /tfx-addons +RUN mkdir -p /tfx-addons/tfx_addons +ADD __init__.py /tfx-addons/tfx_addons +COPY ./ ./tfx_addons/predictions_to_bigquery + +ENV PYTHONPATH="/tfx-addons:${PYTHONPATH}" diff --git a/tfx_addons/predictions_to_bigquery/README.md b/tfx_addons/predictions_to_bigquery/README.md new file mode 100644 index 00000000..c9d5da0d --- /dev/null +++ b/tfx_addons/predictions_to_bigquery/README.md @@ -0,0 +1,128 @@ +# Prediction results to BigQuery component + +[![TensorFlow](https://img.shields.io/badge/TFX-orange)](https://www.tensorflow.org/tfx) + +## Project Description + +This component exports prediction results from BulkInferrer to a BigQuery +table. +The BigQuery table schema can be generated from one of the following sources: +1. From SchemaGen component output +2. From Transform component output +3. From BulkInferrer component output (i.e. prediction results) + +If both SchemaGen and Transform outputs are passed to the component, +the SchemaGen output will take priority. Using SchemaGen to generate +the BigQuery schema is recommended. + +If the Transform output channel is passed to the component without the +SchemaGen output, the BigQuery schema will be derived from the pre-transform +metadata schema generated by Transform. Note that the metadata schema may +include a label key, which may not be present in the BulkInferrer prediction +results. Therefore, this option may not work for unlabeled data. + +If neither the SchemaGen nor Transform outputs are passed to the component, +the BigQuery schema will be parsed from the BulkInferrer prediction results +themselves, which contain tf.Example protos. + +Prediction string labels from the BulkInferrer output may be derived by passing a `vocab_label_file` execution parameter to the component. This will only work +if the Transform component output is passed and the `vocab_label_file` +is present in the Transform output. + +## Project Use-Case(s) + +The main use case for this component is to enable export of model prediction +results into a BigQuery table for further data analysis. The exported table will +contain the model predictions and their corresponding inputs. 
If the input +data is labeled, this would allow users to compare labels and corresponding predictions. + +## Project Implementation + +The PredictionsToBigQuery component uses Beam to process the prediction results +from BulkInferrer and export them to a BigQuery table. + +The BigQuery table name is passed as a parameter by the user; the user +can also choose to have the component append a timestamp to the end of the table name. + +The component output is the fully qualified BigQuery table name where the inference results are stored, and it can be accessed through the `bigquery_export` key. The same table name is also stored as a custom property +of the `bigquery_export` artifact. + +### Usage example + +```python + +from tfx import v1 as tfx +import tfx_addons as tfxa + +... + +predictions_to_bigquery = tfxa.predictions_to_bigquery.PredictionsToBigQuery( + inference_results=bulk_inferrer.outputs['inference_result'], + schema=schema_gen.outputs['schema'], + transform_graph=transform.outputs['transform_graph'], + bq_table_name='my_bigquery_table', + gcs_temp_dir='gs://bucket/temp-dir', + vocab_label_file='Label', +) +``` + +Refer to `integration_test.py` for tests that demonstrate how to use the +component. + +For a description of the inputs and execution parameters of the component, +refer to the `component.py` file. + +## Project Dependencies + +See `version.py` in the top repo directory for component dependencies. + +## Testing + +Each Python module has a corresponding unit test file ending in `_test.py`. + +An integration test is also available and requires use of a Google Cloud +project. Additional instructions for running the integration test can be found in `integration_test.py`. + +Some tests use Abseil's `absltest` module. +Install the package using pip: +```bash +pip install absl-py +``` + +### Test coverage + +Test coverage can be generated using the `coverage` package: +```bash +pip install coverage +``` + +To get test code coverage on the component code, run the following from the +top directory of the tfx-addons repository: + +```bash +coverage run -m unittest discover -s tfx_addons/predictions_to_bigquery -p '*_test.py' +``` + +Generate a summary report in the terminal: +```bash +coverage report -m + +``` +Generate an HTML report that also details missed lines: +```bash +coverage html -d /tmp/htmlcov +``` + +If working on a remote machine, the HTML coverage report can be viewed +by launching a web server: +```bash +pushd /tmp/htmlcov +python -m http.server 8000 # or another unused port number +``` + +## Project team +- Hannes Hapke (@hanneshapke, Digits Financial Inc.) +- Carlos Ezequiel (@cfezequiel, Google) +- Michael Sherman (@michaelwsherman, Google) +- Robert Crowe (@rcrowe-google, Google) +- Gerard Casas Saez (@casassg, Cash App) diff --git a/tfx_addons/predictions_to_bigquery/component.py b/tfx_addons/predictions_to_bigquery/component.py index c89c8ca9..15163401 100644 --- a/tfx_addons/predictions_to_bigquery/component.py +++ b/tfx_addons/predictions_to_bigquery/component.py @@ -1,23 +1,20 @@ # Copyright 2023 The TensorFlow Authors. All Rights Reserved. # -# Licensed under the Apache License, Version 2.0 (the "License"); +# Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, +# distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== # This code was originally written by Hannes Hapke (Digits Financial Inc.) # on Feb. 6, 2023. -""" -Digits Prediction-to-BigQuery: Functionality to write prediction results usually - from a BulkInferrer to BigQuery. -""" +"""Predictions-to-bigquery component spec.""" from typing import Optional @@ -26,103 +23,109 @@ from tfx.types import standard_artifacts from tfx.types.component_spec import ChannelParameter, ExecutionParameter -from .executor import Executor as AnnotateUnlabeledCategoryDataExecutor +from tfx_addons.predictions_to_bigquery import executor -_MIN_THRESHOLD = 0.8 -_VOCAB_FILE = "vocab_label_txt" +_MIN_THRESHOLD = 0.0 # pylint: disable=missing-class-docstring -class AnnotateUnlabeledCategoryDataComponentSpec(types.ComponentSpec): +class PredictionsToBigQueryComponentSpec(types.ComponentSpec): PARAMETERS = { - # These are parameters that will be passed in the call to - # create an instance of this component. - "vocab_label_file": ExecutionParameter(type=str), - "bq_table_name": ExecutionParameter(type=str), - "filter_threshold": ExecutionParameter(type=float), - "table_suffix": ExecutionParameter(type=str), - "table_partitioning": ExecutionParameter(type=bool), - "expiration_time_delta": ExecutionParameter(type=int), + 'bq_table_name': ExecutionParameter(type=str), + 'gcs_temp_dir': ExecutionParameter(type=str), + 'table_expiration_days': ExecutionParameter(type=int), + 'filter_threshold': ExecutionParameter(type=float), + 'table_partitioning': ExecutionParameter(type=bool), + 'table_time_suffix': ExecutionParameter(type=str, optional=True), + 'vocab_label_file': ExecutionParameter(type=str, optional=True), } INPUTS = { - # This will be a dictionary with input artifacts, including URIs - "transform_graph": - ChannelParameter(type=standard_artifacts.TransformGraph), - "inference_results": - ChannelParameter(type=standard_artifacts.InferenceResult), - "schema": - ChannelParameter(type=standard_artifacts.Schema), + 'inference_results': + (ChannelParameter(type=standard_artifacts.InferenceResult)), + 'schema': (ChannelParameter(type=standard_artifacts.Schema, + optional=True)), + 'transform_graph': + (ChannelParameter(type=standard_artifacts.TransformGraph, + optional=True)), } OUTPUTS = { - "bigquery_export": ChannelParameter(type=standard_artifacts.String), + 'bigquery_export': ChannelParameter(type=standard_artifacts.String), } -class AnnotateUnlabeledCategoryDataComponent(base_component.BaseComponent): - """ - AnnotateUnlabeledCategoryData Component. - - The component takes the following input artifacts: - * Inference results: InferenceResult - * Transform graph: TransformGraph - * Schema: Schema (optional) if not present, the component will determine - the schema (only predtion supported at the moment) - - The component takes the following parameters: - * vocab_label_file: str - The file name of the file containing the - vocabulary labels (produced by TFT). 
- * bq_table_name: str - The name of the BigQuery table to write the results - to. - * filter_threshold: float - The minimum probability threshold for a - prediction to be considered a positive, thrustworthy prediction. - Default is 0.8. - * table_suffix: str (optional) - If provided, the generated datetime string - will be added the BigQuery table name as suffix. The default is %Y%m%d. - * table_partitioning: bool - Whether to partition the table by DAY. If True, - the generated BigQuery table will be partition by date. If False, no - partitioning will be applied. Default is True. - * expiration_time_delta: int (optional) - The number of seconds after which - the table will expire. +class PredictionsToBigQuery(base_component.BaseComponent): + """Predictions to BigQuery TFX component. - The component produces the following output artifacts: - * bigquery_export: String - The URI of the BigQuery table containing the - results. - """ + Exports BulkInferrer inference_results data to a BigQuery table. + """ - SPEC_CLASS = AnnotateUnlabeledCategoryDataComponentSpec - EXECUTOR_SPEC = executor_spec.BeamExecutorSpec( - AnnotateUnlabeledCategoryDataExecutor) + SPEC_CLASS = PredictionsToBigQueryComponentSpec + EXECUTOR_SPEC = executor_spec.BeamExecutorSpec(executor.Executor) def __init__( self, - inference_results: types.Channel = None, - transform_graph: types.Channel = None, - bq_table_name: str = None, - vocab_label_file: str = _VOCAB_FILE, + inference_results: types.Channel, + bq_table_name: str, + gcs_temp_dir: str, + bigquery_export: Optional[types.Channel] = None, + transform_graph: Optional[types.Channel] = None, + schema: Optional[types.Channel] = None, + table_expiration_days: Optional[int] = 0, filter_threshold: float = _MIN_THRESHOLD, - table_suffix: str = "%Y%m%d", table_partitioning: bool = True, - schema: Optional[types.Channel] = None, - expiration_time_delta: Optional[int] = 0, - bigquery_export: Optional[types.Channel] = None, - ): - + table_time_suffix: Optional[str] = None, + vocab_label_file: Optional[str] = None, + ) -> None: + """Initialize the component. + + Args: + inference_results: TFX input channel for inference results. + bq_table_name: BigQuery table name in either PROJECT:DATASET.TABLE. + or DATASET.TABLE formats. + bigquery_export: TFX output channel containing BigQuery table name + where the results are stored. + The output table name will have the following format: + _ + where `bq_table_name` is argument of the same name and timestamp + is a timestamp string having the format given by `table_time_suffix` + argument. + transform_graph: TFX input channel containing TFTransform output + directory. + If specified, and `schema` is not specified, the prediction + input schema shall be derived from this channel. + schema: TFX input channel for the schema, which is primarily + generated by the SchemaGen component. + If specified, the prediction input schema shall be derived from this + channel. + table_expiration_days: Expiration in number of days from current time of + the output BigQuery table. + If not specified, the table does not expire by default. + filter_threshold: Prediction threshold to use to filter prediction scores. + Outputs that are below this threshold are discarded. + table_partitioning: If set to True, partition table. + See: https://cloud.google.com/bigquery/docs/partitioned-tables + table_time_suffix: Time format for table suffix in Linux strftime format. + Example: '%Y%m%d'. 
+ vocab_label_file: Name of the TF Transform vocabulary file for mapping + string labels into integer IDs. If specified, this would be used to + get back string labels from predicted label IDs. + """ bigquery_export = bigquery_export or types.Channel( type=standard_artifacts.String) - schema = schema or types.Channel(type=standard_artifacts.Schema()) - spec = AnnotateUnlabeledCategoryDataComponentSpec( + spec = PredictionsToBigQueryComponentSpec( inference_results=inference_results, + bq_table_name=bq_table_name, + gcs_temp_dir=gcs_temp_dir, + bigquery_export=bigquery_export, transform_graph=transform_graph, schema=schema, - bq_table_name=bq_table_name, - vocab_label_file=vocab_label_file, + table_expiration_days=table_expiration_days, filter_threshold=filter_threshold, - table_suffix=table_suffix, table_partitioning=table_partitioning, - expiration_time_delta=expiration_time_delta, - bigquery_export=bigquery_export, + table_time_suffix=table_time_suffix, + vocab_label_file=vocab_label_file, ) super().__init__(spec=spec) diff --git a/tfx_addons/predictions_to_bigquery/test_component.py b/tfx_addons/predictions_to_bigquery/component_test.py similarity index 51% rename from tfx_addons/predictions_to_bigquery/test_component.py rename to tfx_addons/predictions_to_bigquery/component_test.py index f07bae40..dd8d29b1 100644 --- a/tfx_addons/predictions_to_bigquery/test_component.py +++ b/tfx_addons/predictions_to_bigquery/component_test.py @@ -1,52 +1,67 @@ # Copyright 2023 The TensorFlow Authors. All Rights Reserved. # -# Licensed under the Apache License, Version 2.0 (the "License"); +# Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, +# distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== # This code was originally written by Hannes Hapke (Digits Financial Inc.) # on Feb. 6, 2023. -""" -Tests around Digits Prediction-to-BigQuery component. -""" +"""Tests for component.py.""" + +import unittest -import tensorflow as tf from tfx.types import channel_utils, standard_artifacts -from . 
import component +from tfx_addons.predictions_to_bigquery import component -class ComponentTest(tf.test.TestCase): +class ComponentTest(unittest.TestCase): def setUp(self): - super(ComponentTest, self).setUp() + super().setUp() self._transform_graph = channel_utils.as_channel( [standard_artifacts.TransformGraph()]) self._inference_results = channel_utils.as_channel( [standard_artifacts.InferenceResult()]) self._schema = channel_utils.as_channel([standard_artifacts.Schema()]) - def testConstruct(self): - # not a real test, just checking if if the component can be - # instantiated - _ = component.AnnotateUnlabeledCategoryDataComponent( + def testInit(self): + component_instance = component.PredictionsToBigQuery( transform_graph=self._transform_graph, inference_results=self._inference_results, schema=self._schema, - bq_table_name="gcp_project:bq_database.table", - vocab_label_file="vocab_txt", + bq_table_name='gcp_project:bq_database.table', + gcs_temp_dir='gs://bucket/temp-dir', + vocab_label_file='vocab_txt', filter_threshold=0.1, - table_suffix="%Y", table_partitioning=False, + table_time_suffix='%Y%m%d', ) + self.assertCountEqual({ + 'inference_results', + 'schema', + 'transform_graph', + }, component_instance.inputs.keys()) + self.assertCountEqual({'bigquery_export'}, + component_instance.outputs.keys()) + self.assertCountEqual( + { + 'bq_table_name', + 'gcs_temp_dir', + 'table_expiration_days', + 'filter_threshold', + 'table_partitioning', + 'table_time_suffix', + 'vocab_label_file', + }, component_instance.exec_properties.keys()) -if __name__ == "__main__": - tf.test.main() +if __name__ == '__main__': + unittest.main() diff --git a/tfx_addons/predictions_to_bigquery/executor.py b/tfx_addons/predictions_to_bigquery/executor.py index cd722167..2a971250 100644 --- a/tfx_addons/predictions_to_bigquery/executor.py +++ b/tfx_addons/predictions_to_bigquery/executor.py @@ -17,9 +17,8 @@ """Implements executor to write BulkInferrer prediction results to BigQuery.""" import datetime -import os import re -from typing import Any, List, Mapping, Optional, Sequence, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union import apache_beam as beam import numpy as np @@ -30,42 +29,61 @@ from tensorflow_serving.apis import prediction_log_pb2 from tfx import types from tfx.dsl.components.base import base_beam_executor -from tfx.types import artifact_utils +from tfx.types import Artifact, artifact_utils -# TODO(cezequiel): Move relevant functions in utils module here. from tfx_addons.predictions_to_bigquery import utils -_SCHEMA_FILE_NAME = "schema.pbtxt" _DECIMAL_PLACES = 6 _DEFAULT_TIMESTRING_FORMAT = '%Y%m%d_%H%M%S' _REQUIRED_EXEC_PROPERTIES = ( 'bq_table_name', - 'bq_dataset', 'filter_threshold', - 'gcp_project', 'gcs_temp_dir', - 'vocab_label_file', ) -_REGEX_CHARS_TO_REPLACE = re.compile(r'[^a-zA-Z0-9_]') +# Regular expression to check for a proper BigQuery table name, i.e. +# [PROJECT:]DATASET.TABLE, +# where specifying GCP PROJECT is optional. 
+_REGEX_BQ_TABLE_NAME = re.compile(r'^[\w-]*:?[\w_]+\.[\w_]+$') -def _check_exec_properties(exec_properties: Mapping[str, Any]) -> None: + +def _check_exec_properties(exec_properties: Dict[str, Any]) -> None: for key in _REQUIRED_EXEC_PROPERTIES: if exec_properties[key] is None: raise ValueError(f'{key} must be set in exec_properties') -def _get_labels(transform_output_uri: str, vocab_file: str) -> Sequence[str]: - tf_transform_output = tft.TFTransformOutput(transform_output_uri) - tft_vocab = tf_transform_output.vocabulary_by_name(vocab_filename=vocab_file) +def _get_prediction_log_path(inference_results: List[Artifact]) -> str: + inference_results_uri = artifact_utils.get_single_uri(inference_results) + return f'{inference_results_uri}/*.gz' + + +def _get_tft_output( + transform_graph: Optional[List[Artifact]] = None +) -> Optional[tft.TFTransformOutput]: + if not transform_graph: + return None + + transform_graph_uri = artifact_utils.get_single_uri(transform_graph) + return tft.TFTransformOutput(transform_graph_uri) + + +def _get_labels(tft_output: tft.TFTransformOutput, + vocab_file: str) -> List[str]: + tft_vocab = tft_output.vocabulary_by_name(vocab_filename=vocab_file) return [label.decode() for label in tft_vocab] -def _get_bq_table_name( - basename: str, - timestamp: Optional[datetime.datetime] = None, - timestring_format: Optional[str] = None, -) -> str: +def _check_bq_table_name(bq_table_name: str) -> None: + if _REGEX_BQ_TABLE_NAME.match(bq_table_name) is None: + raise ValueError('Invalid BigQuery table name.' + ' Specify in either `PROJECT:DATASET.TABLE` or' + ' `DATASET.TABLE` format.') + + +def _add_bq_table_name_suffix(basename: str, + timestamp: Optional[datetime.datetime] = None, + timestring_format: Optional[str] = None) -> str: if timestamp is not None: timestring_format = timestring_format or _DEFAULT_TIMESTRING_FORMAT return basename + '_' + timestamp.strftime(timestring_format) @@ -73,72 +91,39 @@ def _get_bq_table_name( def _get_additional_bq_parameters( - expiration_days: Optional[int] = None, - table_partitioning: bool = False, -) -> Mapping[str, Any]: + table_expiration_days: Optional[int] = None, + table_partitioning: Optional[bool] = False, +) -> Dict[str, Any]: output = {} if table_partitioning: time_partitioning = {'type': 'DAY'} logging.info('BigQuery table time partitioning set to DAY') - if expiration_days: - expiration_time_delta = datetime.timedelta(days=expiration_days) + if table_expiration_days: + expiration_time_delta = datetime.timedelta(days=table_expiration_days) expiration_milliseconds = expiration_time_delta.total_seconds() * 1000 logging.info( - f'BigQuery table partition expiration time set to {expiration_days}' - ' days') + f'BigQuery table expiration set to {table_expiration_days} days.') time_partitioning['expirationMs'] = expiration_milliseconds output['timePartitioning'] = time_partitioning return output -def _get_features( - *, - schema_uri: Optional[str] = None, - prediction_log_path: Optional[str] = None, -) -> Mapping[str, Any]: - if schema_uri: - schema_file = os.path.join(schema_uri, _SCHEMA_FILE_NAME) - return utils.load_schema(schema_file) - - if not prediction_log_path: - raise ValueError('Specify one of `schema_uri` or `prediction_log_path`.') - - return utils.parse_schema(prediction_log_path) - - -def _get_bq_field_name_from_key(key: str) -> str: - field_name = _REGEX_CHARS_TO_REPLACE.sub('_', key) - return re.sub('_+', '_', field_name).strip('_') - - -def _features_to_bq_schema(features: Mapping[str, Any], - required: bool = 
False): - bq_schema_fields_ = utils.feature_to_bq_schema(features, required=required) - bq_schema_fields = [] - for field in bq_schema_fields_: - field['name'] = _get_bq_field_name_from_key(field['name']) - bq_schema_fields.append(field) - bq_schema_fields.extend( - utils.create_annotation_fields(label_field_name="category_label", - score_field_name="score", - required=required, - add_datetime_field=True)) - return {"fields": bq_schema_fields} - - def _tensor_to_native_python_value( - tensor: Union[tf.Tensor, tf.sparse.SparseTensor] -) -> Optional[Union[int, float, str]]: + tensor: Union[tf.Tensor, tf.sparse.SparseTensor]) -> Optional[Any]: """Converts a TF Tensor to a native Python value.""" if isinstance(tensor, tf.sparse.SparseTensor): values = tensor.values.numpy() else: values = tensor.numpy() - if not values: + if not np.any(values): return None - values = np.squeeze(values) # Removes extra dimension, e.g. shape (n, 1). - values = values.item() # Converts to native Python type - if isinstance(values, Sequence) and isinstance(values[0], bytes): + # Removes any extra dimension, e.g. shape (n, 1). + values = np.squeeze(values) + try: + values = values.item() # Convert to single Python value. + except ValueError: + values = list(values) + if isinstance(values, list) and isinstance(values[0], bytes): return [v.decode('utf-8') for v in values] if isinstance(values, bytes): return values.decode('utf-8') @@ -152,37 +137,38 @@ class FilterPredictionToDictFn(beam.DoFn): """Converts a PredictionLog proto to a dict.""" def __init__( self, - labels: List, - features: Any, + features: Dict[str, tf.io.FixedLenFeature], timestamp: datetime.datetime, filter_threshold: float, + labels: Optional[List[str]] = None, score_multiplier: float = 1., ): super().__init__() - self._labels = labels self._features = features + self._timestamp = timestamp self._filter_threshold = filter_threshold + self._labels = labels self._score_multiplier = score_multiplier - self._timestamp = timestamp - def _parse_prediction(self, predictions: npt.ArrayLike): + def _parse_prediction( + self, predictions: npt.ArrayLike) -> Tuple[Optional[str], float]: prediction_id = np.argmax(predictions) logging.debug("Prediction id: %s", prediction_id) logging.debug("Predictions: %s", predictions) - label = self._labels[prediction_id] + label = self._labels[prediction_id] if self._labels is not None else None score = predictions[0][prediction_id] return label, score - def _parse_example(self, serialized: bytes) -> Mapping[str, Any]: + def _parse_example(self, serialized: bytes) -> Dict[str, Any]: parsed_example = tf.io.parse_example(serialized, self._features) output = {} for key, tensor in parsed_example.items(): - field = _get_bq_field_name_from_key(key) value = _tensor_to_native_python_value(tensor) # To add a null value to BigQuery from JSON, omit the key,value pair # with null value. if value is None: continue + field = utils.get_bq_field_name_from_key(key) output[field] = value return output @@ -190,17 +176,18 @@ def process(self, element, *args, **kwargs): # pylint: disable=missing-function del args, kwargs # unused parsed_prediction_scores = tf.make_ndarray( - element.predict_log.response.outputs["outputs"]) + element.predict_log.response.outputs['outputs']) label, score = self._parse_prediction(parsed_prediction_scores) if score >= self._filter_threshold: output = { - "category_label": label, # Workaround to issue with the score value having additional non-zero values # in higher decimal places. # e.g. 
0.8 -> 0.800000011920929 - "score": round(score * self._score_multiplier, _DECIMAL_PLACES), - "datetime": self._timestamp, + 'score': round(score * self._score_multiplier, _DECIMAL_PLACES), + 'datetime': self._timestamp, } + if label is not None: + output['category_label'] = label output.update( self._parse_example( element.predict_log.request.inputs['examples'].string_val)) @@ -211,64 +198,69 @@ class Executor(base_beam_executor.BaseBeamExecutor): """Implements predictions-to-bigquery component logic.""" def Do( self, - input_dict: Mapping[str, List[types.Artifact]], - output_dict: Mapping[str, List[types.Artifact]], - exec_properties: Mapping[str, Any], + input_dict: Dict[str, List[types.Artifact]], + output_dict: Dict[str, List[types.Artifact]], + exec_properties: Dict[str, Any], ) -> None: """Do function for predictions_to_bq executor.""" - timestamp = datetime.datetime.now().replace(second=0, microsecond=0) - - # Check required keys set in exec_properties + # Check required keys set in exec_properties. _check_exec_properties(exec_properties) - # get labels from tf transform generated vocab file - labels = _get_labels( - artifact_utils.get_single_uri(input_dict['transform_graph']), - exec_properties['vocab_label_file'], - ) - logging.info(f"found the following labels from TFT vocab: {labels}") - - # set BigQuery table name and timestamp suffix if specified. - bq_table_name = _get_bq_table_name(exec_properties['bq_table_name'], - timestamp, - exec_properties['table_suffix']) - - # set prediction result file path and decoder - inference_results_uri = artifact_utils.get_single_uri( - input_dict["inference_results"]) - prediction_log_path = f"{inference_results_uri}/*.gz" + # Get prediction log file path and decoder. + prediction_log_path = _get_prediction_log_path( + input_dict['inference_results']) prediction_log_decoder = beam.coders.ProtoCoder( prediction_log_pb2.PredictionLog) - # get schema features - features = _get_features(schema_uri=artifact_utils.get_single_uri( - input_dict["schema"]), - prediction_log_path=prediction_log_path) + tft_output = _get_tft_output(input_dict.get('transform_graph')) - # generate bigquery schema from tf transform features - bq_schema = _features_to_bq_schema(features) + # Get schema features + features = utils.get_feature_spec( + schema=input_dict.get('schema'), + tft_output=tft_output, + prediction_log_path=prediction_log_path, + ) + + # Get label names from TFTransformOutput object, if applicable. + if tft_output is not None and 'vocab_label_file' in exec_properties: + label_key = exec_properties['vocab_label_file'] + labels = _get_labels(tft_output, label_key) + logging.info(f'Found the following labels from TFT vocab: {labels}.') + else: + labels = None + logging.info('No TFTransform output given; no labels parsed.') + + # Set BigQuery table name and timestamp suffix if specified. + _check_bq_table_name(exec_properties['bq_table_name']) + timestamp = datetime.datetime.now().replace(second=0, microsecond=0) + bq_table_name = _add_bq_table_name_suffix( + exec_properties['bq_table_name'], timestamp, + exec_properties.get('table_time_suffix')) + + # Generate bigquery schema from tf transform features. 
+ add_label_field = labels is not None + bq_schema = utils.feature_spec_to_bq_schema( + features, add_label_field=add_label_field) logging.info(f'generated bq_schema: {bq_schema}.') additional_bq_parameters = _get_additional_bq_parameters( - exec_properties.get('expiration_time_delta'), + exec_properties.get('table_expiration_days'), exec_properties.get('table_partitioning')) - # run the Beam pipeline to write the inference data to bigquery + # Run the Beam pipeline to write the inference data to bigquery. with self._make_beam_pipeline() as pipeline: _ = (pipeline | 'Read Prediction Log' >> beam.io.ReadFromTFRecord( prediction_log_path, coder=prediction_log_decoder) | 'Filter and Convert to Dict' >> beam.ParDo( FilterPredictionToDictFn( - labels=labels, features=features, timestamp=timestamp, - filter_threshold=exec_properties['filter_threshold'])) - | 'Write Dict to BQ' >> beam.io.gcp.bigquery.WriteToBigQuery( + filter_threshold=exec_properties['filter_threshold'], + labels=labels)) + | 'Write Dict to BQ' >> beam.io.WriteToBigQuery( table=bq_table_name, - dataset=exec_properties['bq_dataset'], - project=exec_properties['gcp_project'], schema=bq_schema, additional_bq_parameters=additional_bq_parameters, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, @@ -279,4 +271,6 @@ def Do( output_dict['bigquery_export']) bigquery_export.set_string_custom_property('generated_bq_table_name', bq_table_name) + with tf.io.gfile.GFile(bigquery_export.uri, 'w') as output_file: + output_file.write(bq_table_name) logging.info(f'Annotated data exported to {bq_table_name}') diff --git a/tfx_addons/predictions_to_bigquery/executor_test.py b/tfx_addons/predictions_to_bigquery/executor_test.py index 38447fb4..55a9ea6f 100644 --- a/tfx_addons/predictions_to_bigquery/executor_test.py +++ b/tfx_addons/predictions_to_bigquery/executor_test.py @@ -15,7 +15,7 @@ """Tests for executor.py.""" import datetime -from typing import Mapping, Sequence, Union +from typing import Union from unittest import mock import apache_beam as beam @@ -35,7 +35,7 @@ def _create_tf_example( - features: Mapping[str, Union[bytes, float, int]]) -> tf.train.Example: + features: dict[str, Union[bytes, float, int]]) -> tf.train.Example: tf_features = {} for key, value in features.items(): if isinstance(value, bytes): @@ -59,8 +59,8 @@ def _create_model_spec() -> model_pb2.ModelSpec: def _create_predict_request( - features: Mapping[str, Union[bytes, float, int]] -) -> predict_pb2.PredictRequest: + features: dict[str, Union[bytes, float, + int]]) -> predict_pb2.PredictRequest: tf_example = _create_tf_example(features) request_tensor_proto = tf.make_tensor_proto( values=tf_example.SerializeToString(), dtype=tf.string, shape=(1, )) @@ -73,7 +73,7 @@ def _create_predict_request( def _create_predict_response( - values: Sequence[float]) -> predict_pb2.PredictResponse: + values: list[float]) -> predict_pb2.PredictResponse: response_tensor_proto = tf.make_tensor_proto(values=values, dtype=tf.float32, shape=(1, len(values))) @@ -103,10 +103,10 @@ def setUp(self): self.filter_threshold = 0.5 self.dofn = executor.FilterPredictionToDictFn( - labels=self.labels, features=self.features, timestamp=self.timestamp, filter_threshold=self.filter_threshold, + labels=self.labels, ) def test_process(self): @@ -138,6 +138,30 @@ def test_process_below_threshold(self): with self.assertRaises(StopIteration): _ = next(self.dofn.process(element)) + def test_process_no_labels(self): + features = { + 'bytes_feature': tf.io.FixedLenFeature([], dtype=tf.string), + 
} + dofn = executor.FilterPredictionToDictFn( + features=features, + timestamp=self.timestamp, + filter_threshold=self.filter_threshold, + labels=None, + ) + element = _create_prediction_log( + request=_create_predict_request(features={ + 'bytes_feature': b'a', + }), + response=_create_predict_response([0.9]), + ) + expected = { + 'bytes_feature': 'a', + 'score': 0.9, + 'datetime': mock.ANY, + } + output = next(dofn.process(element)) + self.assertEqual(expected, output) + def _make_artifact(uri) -> types.Artifact: artifact = types.Artifact(metadata_store_pb2.ArtifactType()) @@ -146,7 +170,7 @@ def _make_artifact(uri) -> types.Artifact: def _make_artifact_mapping( - data_dict: Mapping[str, str]) -> Mapping[str, Sequence[types.Artifact]]: + data_dict: dict[str, str]) -> dict[str, list[types.Artifact]]: return {k: [_make_artifact(v)] for k, v in data_dict.items()} @@ -168,39 +192,48 @@ def setUp(self): 'gcs_temp_dir': 'gs://bucket/temp-dir', 'expiration_time_delta': 1, 'filter_threshold': 0.5, - 'table_suffix': '%Y%m%d', + 'table_time_suffix': '%Y%m%d', 'table_partitioning': True, 'vocab_label_file': 'vocab_file', } - self.executor = executor.Executor() - + self.enter_context( + mock.patch.object(executor, '_get_prediction_log_path', autospec=True)) + self.enter_context( + mock.patch.object(executor, + '_get_tft_output', + autospec=True, + return_value=object())) + self.enter_context( + mock.patch.object(utils, 'get_feature_spec', autospec=True)) self.enter_context( mock.patch.object(executor, '_get_labels', autospec=True)) self.enter_context( - mock.patch.object(executor, '_get_bq_table_name', autospec=True)) + mock.patch.object(executor, '_check_bq_table_name', autospec=True)) + self.enter_context( + mock.patch.object(executor, '_add_bq_table_name_suffix', + autospec=True)) self.enter_context( mock.patch.object(executor, '_get_additional_bq_parameters', autospec=True)) self.enter_context( - mock.patch.object(executor, '_get_features', autospec=True)) - self.enter_context( - mock.patch.object(executor, '_features_to_bq_schema', autospec=True)) + mock.patch.object(utils, 'feature_spec_to_bq_schema', autospec=True)) self.mock_read_from_tfrecord = self.enter_context( mock.patch.object(beam.io, 'ReadFromTFRecord', autospec=True)) self.mock_pardo = self.enter_context( mock.patch.object(beam, 'ParDo', autospec=True)) self.mock_write_to_bigquery = self.enter_context( - mock.patch.object(beam.io.gcp.bigquery, - 'WriteToBigQuery', - autospec=True)) + mock.patch.object(beam.io, 'WriteToBigQuery', autospec=True)) self.enter_context( mock.patch.object(types.Artifact, 'set_string_custom_property', autospec=True)) + self.enter_context(mock.patch.object(tf.io.gfile, 'GFile', autospec=True)) + + self.executor = executor.Executor() def test_Do(self): self.executor.Do(self.input_dict, self.output_dict, self.exec_properties) @@ -215,30 +248,67 @@ def test_Do(self): class ExecutorModuleTest(parameterized.TestCase): """Tests for executor module-level functions.""" + def test_get_prediction_log_path(self): + inference_results = [_make_artifact('inference_results')] + expected = 'inference_results/*.gz' + output = executor._get_prediction_log_path(inference_results) + self.assertEqual(expected, output) + + @parameterized.named_parameters([ + ('no_inference_results', False), + ('inference_results', True), + ]) + def test_get_tft_output(self, has_transform_graph): + if has_transform_graph: + transform_graph = [_make_artifact('transform_graph')] + mock_tftransform_output = self.enter_context( + mock.patch.object(tft, 
'TFTransformOutput', autospec=True)) + + _ = executor._get_tft_output(transform_graph) + + mock_tftransform_output.assert_called_once() + + else: + output = executor._get_tft_output(None) + self.assertIsNone(output) + def test_get_labels(self): mock_tftransform_output = self.enter_context( mock.patch.object(tft, 'TFTransformOutput', autospec=True)) mock_vocabulary_by_name = ( mock_tftransform_output.return_value.vocabulary_by_name) mock_vocabulary_by_name.return_value = [b'a', b'b'] - - transform_output_uri = '/path/to/transform_output' vocab_file = 'vocab' + tft_output = tft.TFTransformOutput('uri') - output = executor._get_labels(transform_output_uri, vocab_file) + output = executor._get_labels(tft_output, vocab_file) self.assertEqual(['a', 'b'], output) - mock_tftransform_output.assert_called_once_with(transform_output_uri) mock_vocabulary_by_name.assert_called_once_with(vocab_file) + @parameterized.named_parameters([ + ('project_dataset_table', 'gcp_project:bq_dataset.bq_table_name', True), + ('dataset_table', 'bq_dataset.bq_table_name', True), + ('table_only', 'bq_table_name', False) + ]) + def test_check_bq_table_name(self, bq_table_name, is_ok): + if is_ok: + try: + executor._check_bq_table_name(bq_table_name) + except ValueError: + self.fail('ValueError was raised unexpectedly.') + else: + with self.assertRaises(ValueError): + executor._check_bq_table_name(bq_table_name) + @parameterized.named_parameters([('no_timestamp', None, None), ('timestamp_no_format', _TIMESTAMP, None), ('timestamp_format', _TIMESTAMP, '%Y%m%d')]) - def test_get_bq_table_name(self, timestamp, timestring_format): + def test_add_bq_table_name_suffix(self, timestamp, timestring_format): basename = 'bq_table' - output = executor._get_bq_table_name(basename, timestamp, - timestring_format) + output = executor._add_bq_table_name_suffix(basename, timestamp, + timestring_format) if timestamp is None: expected = basename @@ -258,8 +328,8 @@ def test_get_bq_table_name(self, timestamp, timestring_format): ('table_partitioning_only', None, True), ('expiration_table_partitioning', 2, True), ]) - def test_get_additiona_bq_parameters(self, expiration_days, - table_partitioning): + def test_get_additional_bq_parameters(self, expiration_days, + table_partitioning): output = executor._get_additional_bq_parameters(expiration_days, table_partitioning) @@ -278,65 +348,20 @@ def test_get_additiona_bq_parameters(self, expiration_days, } self.assertEqual(expected, output) - @parameterized.named_parameters([ - ('error_no_input', None, None), - ('schema_uri_only', 'uri', None), - ('prediction_log_path', None, 'path'), - ('schema_uri_prediction_log_path', 'uri', 'path'), - ]) - def test_get_features(self, schema_uri, prediction_log_path): - schema = { - 'feature': tf.io.FixedLenFeature([], dtype=tf.int64), - } - mock_load_schema = self.enter_context( - mock.patch.object(utils, - 'load_schema', - autospec=True, - return_value=schema)) - mock_parse_schema = self.enter_context( - mock.patch.object(utils, - 'parse_schema', - autopspec=True, - return_value=schema)) - - if schema_uri is None and prediction_log_path is None: - with self.assertRaises(ValueError): - _ = executor._get_features(schema_uri=schema_uri, - prediction_log_path=prediction_log_path) - - else: - output = executor._get_features(schema_uri=schema_uri, - prediction_log_path=prediction_log_path) - - if schema_uri: - mock_load_schema.assert_called_once_with(mock.ANY) - mock_parse_schema.assert_not_called() - elif prediction_log_path: - mock_load_schema.assert_not_called() - 
mock_parse_schema.assert_called_once_with(prediction_log_path) - - self.assertEqual(schema, output) - - def test_features_to_bq_schema(self): - mock_feature_to_bq_schema = self.enter_context( - mock.patch.object(utils, 'feature_to_bq_schema', autospec=True)) - mock_create_annotation_fields = self.enter_context( - mock.patch.object(utils, - 'create_annotation_fields', - autospec=True, - return_value={})) - - features = { - 'feature': tf.io.FixedLenFeature([], dtype=tf.int64), + def test_check_exec_properties_error_key_not_found(self): + exec_properties = { + 'bq_table_name': None, + 'filter_threshold': 0.1, + 'gcs_temp_dir': 'dir', } - required = True - - output = executor._features_to_bq_schema(features, required) + with self.assertRaises(ValueError): + executor._check_exec_properties(exec_properties) - self.assertIn('fields', output) - mock_feature_to_bq_schema.assert_called_once_with(features, - required=required) - mock_create_annotation_fields.assert_called_once() + def test_tensor_to_native_python_value_bytes_list(self): + tensor = tf.constant([b'1', b'2', b'3']) + expected = ['1', '2', '3'] + output = executor._tensor_to_native_python_value(tensor) + self.assertEqual(expected, output) if __name__ == '__main__': diff --git a/tfx_addons/predictions_to_bigquery/integration_test.py b/tfx_addons/predictions_to_bigquery/integration_test.py new file mode 100644 index 00000000..73784bc9 --- /dev/null +++ b/tfx_addons/predictions_to_bigquery/integration_test.py @@ -0,0 +1,583 @@ +# Copyright 2023 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Integration test for PredictionsToBigQuery component. + +Prerequisites: + +The following environmental variables should be defined. + + GOOGLE_CLOUD_PROJECT: environmental variable must be set containing + the GCP project ID to be used for testing. + + GCS_TEMP_DIR: Cloud Storage URI to use for handling temporary files as part + of the BigQuery export process. e.g. `gs://path/to/temp/dir`. + + GCP_SERVICE_ACCOUNT_EMAIL: Service account address to use for Vertex AI + pipeline runs. The service account should be have access to Cloud + Storage and Vertex AI. Local test runs may still work without this variable. + + GCP_COMPONENT_IMAGE: Docker image repository name that would be used for + Vertex AI Pipelines integration testing. The Dockerfile associated with + this component will create a custom TFX image with the component that + should be uploaded to Artifact Registry. + A new Docker image should be uploaded whenever there are any changes + to the non-test module files of this component. + + +The following Google Cloud APIs should be enabled + + BigQuery API: For generating the BigQuery table output of this component. + + Vertex AI API: For running TFX pipeline jobs in Vertex. + + Artifact Registry API: For storing the Docker image to be used in order + to run a TFX pipeline with this component in Vertex AI. 
+ +Vertex AI test: + +The `ComponentIntegrationTest` test class has a test to run the component +in Vertex AI Pipelines. The test is skipped by default, since it can take +several minutes to complete. You can comment out the skip decorator +(i.e. `@absltest.skip(...)`) and add similar decorators to other tests that +you don't want to run. +""" + +import datetime +import json +import logging +import os +import pathlib +import shutil +import subprocess +import tempfile +from typing import List + +import tensorflow as tf +from absl.testing import absltest, parameterized +from google.api_core import exceptions +from google.cloud import aiplatform, bigquery +from google.cloud.aiplatform import pipeline_jobs +from ml_metadata.proto import metadata_store_pb2 +from tfx import types +from tfx import v1 as tfx +from tfx.dsl.component.experimental import container_component, placeholders +from tfx.dsl.components.base import base_node +from tfx.proto import example_gen_pb2 +from tfx.types.standard_artifacts import Model, String, TransformGraph + +from tfx_addons.predictions_to_bigquery import component, executor + +_GOOGLE_CLOUD_PROJECT = os.environ['GOOGLE_CLOUD_PROJECT'] +_GCS_TEMP_DIR = os.environ['GCS_TEMP_DIR'] +_GCP_SERVICE_ACCOUNT_EMAIL = os.environ.get('GCP_SERVICE_ACCOUNT_EMAIL') +_GCP_COMPONENT_IMAGE = os.environ['GCP_COMPONENT_IMAGE'] + +_BQ_TABLE_EXPIRATION_DATE = datetime.datetime.now() + datetime.timedelta( + days=1) + +_TEST_DATA_DIR = pathlib.Path('tfx_addons/predictions_to_bigquery/testdata') + + +def _make_artifact(uri: pathlib.Path) -> types.Artifact: + artifact = types.Artifact(metadata_store_pb2.ArtifactType()) + artifact.uri = str(uri) + return artifact + + +def _make_artifact_mapping( + data_dict: dict[str, pathlib.Path]) -> dict[str, list[types.Artifact]]: + return {k: [_make_artifact(v)] for k, v in data_dict.items()} + + +class ExecutorBigQueryTest(absltest.TestCase): + """Tests executor pipeline exporting predictions to a BigQuery table. + + This test generates a BigQuery table with an expiration date of 1 day. + """ + def _assert_bq_table_exists(self, full_bq_table_name): + full_bq_table_name = full_bq_table_name.replace(':', '.') + try: + self.client.get_table(full_bq_table_name) + except exceptions.NotFound as e: + self.fail(f'BigQuery table not found: {full_bq_table_name} . 
' + f'Reason: {e} .') + + def _expire_table(self, full_bq_table_name): + try: + table = self.client.get_table(full_bq_table_name) + except (ValueError, exceptions.NotFound): + logging.warning('Unable to read table: %s', full_bq_table_name) + else: + table.expires = _BQ_TABLE_EXPIRATION_DATE + self.client.update_table(table, ['expires']) + + def setUp(self): + super().setUp() + self.test_data_dir = _TEST_DATA_DIR / 'sample-tfx-output' + self.input_dict = _make_artifact_mapping({ + 'transform_graph': + (self.test_data_dir / 'Transform/transform_graph/5'), + 'inference_results': + (self.test_data_dir / 'BulkInferrer/inference_result/7'), + 'schema': + (self.test_data_dir / 'Transform/transform_graph/5/metadata'), + }) + self.gcp_project = _GOOGLE_CLOUD_PROJECT + self.bq_dataset = 'executor_bigquery_test_dataset' + self.bq_table_name = f'{self.gcp_project}:{self.bq_dataset}.predictions' + self.client = bigquery.Client() + self.client.create_dataset(dataset=self.bq_dataset, exists_ok=True) + self.exec_properties = { + 'bq_table_name': self.bq_table_name, + 'table_expiration_days': 5, + 'filter_threshold': 0.5, + 'gcs_temp_dir': _GCS_TEMP_DIR, + 'table_partitioning': False, + 'table_time_suffix': '%Y%m%d%H%M%S', + 'vocab_label_file': 'Species', + } + self.generated_bq_table_name = None + + self.executor = executor.Executor() + + def tearDown(self): + super().tearDown() + if self.generated_bq_table_name: + self._expire_table(self.generated_bq_table_name) + + def test_Do(self): + with tempfile.NamedTemporaryFile() as output_file: + output_dict = _make_artifact_mapping( + {'bigquery_export': pathlib.Path(output_file.name)}) + + self.executor.Do(self.input_dict, output_dict, self.exec_properties) + + with open(output_file.name, encoding='utf-8') as input_file: + self.generated_bq_table_name = input_file.read() + + self.generated_bq_table_name = (str( + self.generated_bq_table_name).replace(':', '.')) + self._assert_bq_table_exists(self.generated_bq_table_name) + + +def _gcs_path_exists(gcs_path: str) -> bool: + files = tf.io.gfile.glob(gcs_path + '/*') + return bool(files) + + +def _copy_local_dir_to_gcs(local_dir: str, gcs_path: str): + subprocess.check_call(f'gsutil -m cp -r {local_dir} {gcs_path}', shell=True) + + +@tfx.dsl.components.component +def _transform_function_component( + transform_graph: tfx.dsl.components.OutputArtifact[TransformGraph], + transform_dir: tfx.dsl.components.Parameter[str], +): + """TFX Transform component stub.""" + os.makedirs(transform_graph.uri, exist_ok=True) + shutil.copytree(transform_dir, transform_graph.uri, dirs_exist_ok=True) + + +def _create_transform_container_component_class(): + return container_component.create_container_component( + name='TransformContainerComponent', + inputs={}, + outputs={ + 'transform_graph': TransformGraph, + }, + parameters={ + 'transform_dir': str, + }, + image='google/cloud-sdk:latest', + command=[ + 'sh', + '-exc', + ''' + transform_dir="$0" + transform_graph_uri="$1" + gsutil cp -r $transform_dir/* $transform_graph_uri/ + ''', + placeholders.InputValuePlaceholder('transform_dir'), + placeholders.OutputUriPlaceholder('transform_graph'), + ], + ) + + +def _transform_component(transform_dir: str): + if transform_dir.startswith('gs://'): + transform_class = _create_transform_container_component_class() + transform = transform_class(transform_dir=transform_dir) + else: + transform = _transform_function_component(transform_dir=transform_dir) + return transform + + +@tfx.dsl.components.component +def _saved_model_function_component( + 
model: tfx.dsl.components.OutputArtifact[Model], + saved_model_dir: tfx.dsl.components.Parameter[str], +): + """Creates a component that outputs a TF saved model.""" + target_dir = os.path.join(model.uri, 'Format-Serving') + os.makedirs(target_dir, exist_ok=True) + shutil.copytree(saved_model_dir, target_dir, dirs_exist_ok=True) + + +def _create_saved_model_container_component_class(): + return container_component.create_container_component( + name='SavedModelContainerComponent', + inputs={}, + outputs={ + 'model': Model, + }, + parameters={ + 'saved_model_dir': str, + }, + image='google/cloud-sdk:latest', + command=[ + 'sh', + '-exc', + ''' + saved_model_dir="$0" + model_uri="$1" + gsutil cp -r $saved_model_dir $model_uri/ + ''', + placeholders.InputValuePlaceholder('saved_model_dir'), + placeholders.OutputUriPlaceholder('model'), + ], + ) + + +def _saved_model_component(saved_model_dir: str): + if saved_model_dir.startswith('gs://'): + saved_model_component_class = ( + _create_saved_model_container_component_class()) + saved_model = saved_model_component_class(saved_model_dir=saved_model_dir) + else: + saved_model = _saved_model_function_component( + saved_model_dir=saved_model_dir) + return saved_model + + +@tfx.dsl.components.component +def _get_output_function_component( + bigquery_export: tfx.dsl.components.InputArtifact[String], + output_filepath: tfx.dsl.components.Parameter[str], +): + """Copies component-under-test output to `output_filepath`.""" + with tf.io.gfile.GFile(bigquery_export.uri) as input_file: + bq_table_name = input_file.read() + with tf.io.gfile.GFile(output_filepath, 'w') as output_file: + output = { + 'generated_bq_table_name': bq_table_name, + } + json.dump(output, output_file) + + +def _create_get_output_container_component_class(): + return container_component.create_container_component( + name='BigQueryExportContainerComponent', + inputs={ + 'bigquery_export': String, + }, + parameters={ + 'output_path': str, + }, + image='google/cloud-sdk:latest', + command=[ + 'sh', + '-exc', + ''' + apt install -y jq + bigquery_export_uri="$0" + local_bigquery_export_path=$(mktemp) + local_output_path=$(mktemp) + output_path="$1" + gsutil cp $bigquery_export_uri $local_bigquery_export_path + bq_table_name=$(cat $local_bigquery_export_path) + jq --null-input \ + --arg bq_table_name "$bq_table_name" \ + '{"generated_bq_table_name": $bq_table_name}' \ + > $local_output_path + gsutil cp -r $local_output_path $output_path + ''', + placeholders.InputUriPlaceholder('bigquery_export'), + placeholders.InputValuePlaceholder('output_path'), + ], + ) + + +def _get_output_component(output_channel, output_file): + if output_file.startswith('gs://'): + get_output_class = _create_get_output_container_component_class() + output_component = get_output_class(bigquery_export=output_channel, + output_path=output_file) + else: + output_component = _get_output_function_component( + bigquery_export=output_channel, output_filepath=output_file) + return output_component + + +class ComponentIntegrationTest(parameterized.TestCase): + """Tests component integration with other TFX components/services. + + This test generates a BigQuery table with an expiration date of 1 day. 
+ """ + def setUp(self): + super().setUp() + # Pipeline config + self.pipeline_name = 'component-integration-test' + self.test_file = 'test-tiny.csv' + self.gcs_temp_dir = _GCS_TEMP_DIR + self.dataset_name = 'penguins-dataset' + self.saved_model_path = 'sample-tfx-output/Trainer/model/6/Format-Serving' + self.transform_path = 'sample-tfx-output/Transform/transform_graph/5' + + # Vertex Pipeline config + self.service_account = _GCP_SERVICE_ACCOUNT_EMAIL + self.location = os.environ.get('GCP_REGION') or 'us-central1' + + # GCP config + self.gcp_project = _GOOGLE_CLOUD_PROJECT + self.bq_dataset = 'component_integration_test_dataset' + self.bq_table_name = f'{self.gcp_project}:{self.bq_dataset}.predictions' + self.client = bigquery.Client() + self.client.create_dataset(dataset=self.bq_dataset, exists_ok=True) + + # Test config + self.generated_bq_table_name = None + + def tearDown(self): + super().tearDown() + if self.generated_bq_table_name is not None: + self._expire_table(self.generated_bq_table_name) + + def _add_test_label_to_table(self, table): + labels = {'test_method_name': self._testMethodName} + table.labels = labels + self.client.update_table(table, ['labels']) + + def _expire_table(self, full_bq_table_name): + full_bq_table_name = full_bq_table_name.replace(':', '.') + try: + table = self.client.get_table(full_bq_table_name) + except (ValueError, exceptions.NotFound): + logging.warning('Unable to read table: %s', full_bq_table_name) + else: + table.expires = _BQ_TABLE_EXPIRATION_DATE + table = self.client.update_table(table, ['expires']) + self._add_test_label_to_table(table) + + def _create_gcs_tempfile(self) -> str: + timestamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S') + self.gcs_temp_file = os.path.join(_GCS_TEMP_DIR, 'pipeline-outputs', + f'output-{timestamp}') + return self.gcs_temp_file + + def _create_upstream_component_map(self, use_gcs=False): + if use_gcs: + gcs_test_data_dir = os.path.join(_GCS_TEMP_DIR, _TEST_DATA_DIR.stem) + if not _gcs_path_exists(gcs_test_data_dir): + # Copy test files to GCS + # NOTE: If local `testdata` files are updated, forcing a copy to the + # GCS mirror directory may be needed. 
+ _copy_local_dir_to_gcs(str(_TEST_DATA_DIR), _GCS_TEMP_DIR) + dataset_dir = os.path.join(gcs_test_data_dir, self.dataset_name) + saved_model_dir = os.path.join(gcs_test_data_dir, self.saved_model_path) + transform_dir = os.path.join(gcs_test_data_dir, self.transform_path) + else: + dataset_dir = str(_TEST_DATA_DIR / self.dataset_name) + saved_model_dir = str(_TEST_DATA_DIR / self.saved_model_path) + transform_dir = str(_TEST_DATA_DIR / self.transform_path) + + test_split = example_gen_pb2.Input.Split(name='test', + pattern=f'test/{self.test_file}') + example_gen = tfx.components.CsvExampleGen( + input_base=dataset_dir, + input_config=example_gen_pb2.Input( + splits=[test_split])).with_id('UnlabeledExampleGen') + + transform = _transform_component(transform_dir=transform_dir) + + statistics_gen = tfx.components.StatisticsGen( + examples=example_gen.outputs['examples'], ) + + schema_gen = tfx.components.SchemaGen( + statistics=statistics_gen.outputs['statistics'], ) + + saved_model = _saved_model_component(saved_model_dir) + + bulk_inferrer = tfx.components.BulkInferrer( + examples=example_gen.outputs['examples'], + model=saved_model.outputs['model'], + data_spec=tfx.proto.DataSpec(), + model_spec=tfx.proto.ModelSpec(), + ) + + return { + 'example_gen': example_gen, + 'statistics_gen': statistics_gen, + 'schema_gen': schema_gen, + 'transform': transform, + 'saved_model': saved_model, + 'bulk_inferrer': bulk_inferrer, + } + + def _create_pipeline(self, + component_under_test: base_node.BaseNode, + upstream_components: List[base_node.BaseNode], + output_file: str, + pipeline_dir: str, + metadata_connection_config=None): + output_component = _get_output_component( + component_under_test.outputs['bigquery_export'], output_file) + components = (upstream_components + + [component_under_test, output_component]) + return tfx.dsl.Pipeline( + pipeline_name=self.pipeline_name, + pipeline_root=pipeline_dir, + components=components, + metadata_connection_config=metadata_connection_config) + + def _run_local_pipeline(self, pipeline): + assert pipeline.metadata_connection_config is not None + return tfx.orchestration.LocalDagRunner().run(pipeline) + + def _run_vertex_pipeline(self, pipeline): + pipeline_definition_file = os.path.join( + '/tmp', f'{self.pipeline_name}-pipeline.json') + runner = tfx.orchestration.experimental.KubeflowV2DagRunner( + config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig( + default_image=_GCP_COMPONENT_IMAGE), + output_filename=pipeline_definition_file) + runner.run(pipeline) + + aiplatform.init(project=_GOOGLE_CLOUD_PROJECT, location=self.location) + job = pipeline_jobs.PipelineJob(template_path=pipeline_definition_file, + display_name=self.pipeline_name) + return job.run(service_account=self.service_account, sync=True) + + def _check_output(self, output_file: str): + with tf.io.gfile.GFile(output_file) as output_file_handler: + output = json.load(output_file_handler) + + self.generated_bq_table_name = output['generated_bq_table_name'] + self.assertStartsWith(self.generated_bq_table_name, self.bq_table_name) + + @parameterized.named_parameters([ + ( + 'inference_results_only', + False, + False, + ), + ('inference_results_schema', True, False), + ('inference_results_transform', False, True), + ('inference_results_schema_transform', True, True), + ]) + def test_local_pipeline(self, add_schema, add_transform): + """Tests component using a local pipeline runner.""" + upstream = self._create_upstream_component_map() + upstream_components = [ + upstream['example_gen'], + 
upstream['saved_model'], + upstream['bulk_inferrer'], + ] + + if add_schema: + upstream_components.append(upstream['statistics_gen']) + upstream_components.append(upstream['schema_gen']) + schema = upstream['schema_gen'].outputs['schema'] + else: + schema = None + + if add_transform: + transform_graph = upstream['transform'].outputs['transform_graph'] + upstream_components.append(upstream['transform']) + vocab_label_file = 'Species' + else: + transform_graph = None + vocab_label_file = None + + component_under_test = component.PredictionsToBigQuery( + inference_results=( + upstream['bulk_inferrer'].outputs['inference_result']), + transform_graph=transform_graph, + schema=schema, + bq_table_name=self.bq_table_name, + gcs_temp_dir=self.gcs_temp_dir, + vocab_label_file=vocab_label_file, + ) + + with tempfile.TemporaryDirectory() as pipeline_dir, \ + tempfile.NamedTemporaryFile() as output_file, \ + tempfile.NamedTemporaryFile() as metadata_path: + metadata_connection_config = ( + tfx.orchestration.metadata.sqlite_metadata_connection_config( + metadata_path.name)) + + pipeline = self._create_pipeline( + component_under_test, + upstream_components, + output_file.name, + pipeline_dir, + metadata_connection_config, + ) + self._run_local_pipeline(pipeline) + + self._check_output(output_file.name) + + @absltest.skip('long-running test') + def test_vertex_pipeline(self): + """Tests component using Vertex AI Pipelines. + + This tests the case where a Transform component is used for the input + schema. + """ + upstream = self._create_upstream_component_map(use_gcs=True) + upstream_components = [ + upstream['example_gen'], + upstream['transform'], + upstream['saved_model'], + upstream['bulk_inferrer'], + ] + transform_graph = upstream['transform'].outputs['transform_graph'] + vocab_label_file = 'Species' + + component_under_test = component.PredictionsToBigQuery( + inference_results=( + upstream['bulk_inferrer'].outputs['inference_result']), + transform_graph=transform_graph, + bq_table_name=self.bq_table_name, + gcs_temp_dir=self.gcs_temp_dir, + vocab_label_file=vocab_label_file, + ) + + output_file = self._create_gcs_tempfile() + pipeline_dir = os.path.join(_GCS_TEMP_DIR, 'pipeline-root') + + pipeline = self._create_pipeline( + component_under_test, + upstream_components, + output_file, + pipeline_dir, + ) + self._run_vertex_pipeline(pipeline) + + self._check_output(output_file) + + +if __name__ == '__main__': + absltest.main() diff --git a/tfx_addons/predictions_to_bigquery/testdata/penguins-dataset/test/test-tiny.csv b/tfx_addons/predictions_to_bigquery/testdata/penguins-dataset/test/test-tiny.csv new file mode 100644 index 00000000..086441e9 --- /dev/null +++ b/tfx_addons/predictions_to_bigquery/testdata/penguins-dataset/test/test-tiny.csv @@ -0,0 +1,4 @@ +studyName,Sample Number,Species,Region,Island,Stage,Individual ID,Clutch Completion,Date Egg,Culmen Length (mm),Culmen Depth (mm),Flipper Length (mm),Body Mass (g),Sex,Delta 15 N (o/oo),Delta 13 C (o/oo),Comments +PAL0708,1,Adelie Penguin (Pygoscelis adeliae),Anvers,Torgersen,"Adult, 1 Egg Stage",N1A1,Yes,11/11/07,39.1,18.7,181,3750,MALE,,,Not enough blood for isotopes. 
+PAL0809,44,Chinstrap penguin (Pygoscelis antarctica),Anvers,Dream,"Adult, 1 Egg Stage",N75A2,Yes,11/14/08,45.5,17,196,3500,FEMALE,9.36493,-24.66259, +PAL0910,124,Gentoo penguin (Pygoscelis papua),Anvers,Biscoe,"Adult, 1 Egg Stage",N43A2,Yes,11/22/09,49.9,16.1,213,5400,MALE,8.3639,-26.15531, diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/BulkInferrer/inference_result/7/prediction_logs-00000-of-00001.gz b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/BulkInferrer/inference_result/7/prediction_logs-00000-of-00001.gz new file mode 100644 index 00000000..058b96b6 Binary files /dev/null and b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/BulkInferrer/inference_result/7/prediction_logs-00000-of-00001.gz differ diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/assets/Species b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/assets/Species new file mode 100644 index 00000000..d919d4f2 --- /dev/null +++ b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/assets/Species @@ -0,0 +1,3 @@ +Adelie Penguin (Pygoscelis adeliae) +Gentoo penguin (Pygoscelis papua) +Chinstrap penguin (Pygoscelis antarctica) diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/fingerprint.pb b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/fingerprint.pb new file mode 100644 index 00000000..079090b8 Binary files /dev/null and b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/fingerprint.pb differ diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/keras_metadata.pb b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/keras_metadata.pb new file mode 100644 index 00000000..6832974f --- /dev/null +++ b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/keras_metadata.pb @@ -0,0 +1,13 @@ + +À3root"_tf_keras_network*ž3{"name": "model_1", "trainable": true, "expects_training_arg": true, "dtype": "float32", "batch_input_shape": null, "must_restore_from_config": false, "preserve_input_structure_in_config": false, "autocast": false, "class_name": "Functional", "config": {"name": "model_1", "layers": [{"class_name": "InputLayer", "config": {"batch_input_shape": {"class_name": "__tuple__", "items": [null, 1]}, "dtype": "float32", "sparse": false, "ragged": false, "name": "culmen_length_mm"}, "name": "culmen_length_mm", "inbound_nodes": []}, {"class_name": "InputLayer", "config": {"batch_input_shape": {"class_name": "__tuple__", "items": [null, 1]}, "dtype": "float32", "sparse": false, "ragged": false, "name": "culmen_depth_mm"}, "name": "culmen_depth_mm", "inbound_nodes": []}, {"class_name": "InputLayer", "config": {"batch_input_shape": {"class_name": "__tuple__", "items": [null, 1]}, "dtype": "float32", "sparse": false, "ragged": false, "name": "flipper_length_mm"}, "name": "flipper_length_mm", "inbound_nodes": []}, {"class_name": "InputLayer", "config": {"batch_input_shape": {"class_name": "__tuple__", "items": [null, 1]}, "dtype": "float32", "sparse": false, "ragged": false, "name": "body_mass_g"}, "name": "body_mass_g", "inbound_nodes": []}, {"class_name": "Concatenate", "config": {"name": "concatenate_1", "trainable": true, "dtype": "float32", "axis": 
-1}, "name": "concatenate_1", "inbound_nodes": [[["culmen_length_mm", 0, 0, {}], ["culmen_depth_mm", 0, 0, {}], ["flipper_length_mm", 0, 0, {}], ["body_mass_g", 0, 0, {}]]]}, {"class_name": "Dense", "config": {"name": "dense_3", "trainable": true, "dtype": "float32", "units": 8, "activation": "relu", "use_bias": true, "kernel_initializer": {"class_name": "GlorotUniform", "config": {"seed": null}}, "bias_initializer": {"class_name": "Zeros", "config": {}}, "kernel_regularizer": null, "bias_regularizer": null, "activity_regularizer": null, "kernel_constraint": null, "bias_constraint": null}, "name": "dense_3", "inbound_nodes": [[["concatenate_1", 0, 0, {}]]]}, {"class_name": "Dense", "config": {"name": "dense_4", "trainable": true, "dtype": "float32", "units": 8, "activation": "relu", "use_bias": true, "kernel_initializer": {"class_name": "GlorotUniform", "config": {"seed": null}}, "bias_initializer": {"class_name": "Zeros", "config": {}}, "kernel_regularizer": null, "bias_regularizer": null, "activity_regularizer": null, "kernel_constraint": null, "bias_constraint": null}, "name": "dense_4", "inbound_nodes": [[["dense_3", 0, 0, {}]]]}, {"class_name": "Dense", "config": {"name": "dense_5", "trainable": true, "dtype": "float32", "units": 3, "activation": "linear", "use_bias": true, "kernel_initializer": {"class_name": "GlorotUniform", "config": {"seed": null}}, "bias_initializer": {"class_name": "Zeros", "config": {}}, "kernel_regularizer": null, "bias_regularizer": null, "activity_regularizer": null, "kernel_constraint": null, "bias_constraint": null}, "name": "dense_5", "inbound_nodes": [[["dense_4", 0, 0, {}]]]}, {"class_name": "TensorFlowTransform>TransformFeaturesLayer", "config": {"layer was saved without config": true}, "name": "transform_features_layer_1", "inbound_nodes": []}], "input_layers": [["culmen_length_mm", 0, 0], ["culmen_depth_mm", 0, 0], ["flipper_length_mm", 0, 0], ["body_mass_g", 0, 0]], "output_layers": [["dense_5", 0, 0]]}, "shared_object_id": 14, "input_spec": [{"class_name": "InputSpec", "config": {"dtype": null, "shape": {"class_name": "__tuple__", "items": [null, 1]}, "ndim": 2, "max_ndim": null, "min_ndim": null, "axes": {}}}, {"class_name": "InputSpec", "config": {"dtype": null, "shape": {"class_name": "__tuple__", "items": [null, 1]}, "ndim": 2, "max_ndim": null, "min_ndim": null, "axes": {}}}, {"class_name": "InputSpec", "config": {"dtype": null, "shape": {"class_name": "__tuple__", "items": [null, 1]}, "ndim": 2, "max_ndim": null, "min_ndim": null, "axes": {}}}, {"class_name": "InputSpec", "config": {"dtype": null, "shape": {"class_name": "__tuple__", "items": [null, 1]}, "ndim": 2, "max_ndim": null, "min_ndim": null, "axes": {}}}], "build_input_shape": [{"class_name": "TensorShape", "items": [null, 1]}, {"class_name": "TensorShape", "items": [null, 1]}, {"class_name": "TensorShape", "items": [null, 1]}, {"class_name": "TensorShape", "items": [null, 1]}], "is_graph_network": true, "full_save_spec": {"class_name": "__tuple__", "items": [[[{"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "float32", "culmen_length_mm"]}, {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "float32", "culmen_depth_mm"]}, {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "float32", "flipper_length_mm"]}, {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", 
"serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "float32", "body_mass_g"]}]], {}]}, "save_spec": [{"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "float32", "culmen_length_mm"]}, {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "float32", "culmen_depth_mm"]}, {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "float32", "flipper_length_mm"]}, {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "float32", "body_mass_g"]}], "keras_version": "2.11.0", "backend": "tensorflow", "model_config": {"class_name": "Functional"}, "training_config": {"loss": {"class_name": "SparseCategoricalCrossentropy", "config": {"reduction": "auto", "name": "sparse_categorical_crossentropy", "from_logits": true, "ignore_class": null}, "shared_object_id": 19}, "metrics": [[{"class_name": "SparseCategoricalAccuracy", "config": {"name": "sparse_categorical_accuracy", "dtype": "float32"}, "shared_object_id": 20}]], "weighted_metrics": null, "loss_weights": null, "optimizer_config": {"class_name": "Custom>Adam", "config": {"name": "Adam", "weight_decay": null, "clipnorm": null, "global_clipnorm": null, "clipvalue": null, "use_ema": false, "ema_momentum": 0.99, "ema_overwrite_frequency": null, "jit_compile": false, "is_legacy_optimizer": false, "learning_rate": 0.009999999776482582, "beta_1": 0.9, "beta_2": 0.999, "epsilon": 1e-07, "amsgrad": false}}}}2 +ˆ root.layer-0"_tf_keras_input_layer*Ø{"class_name": "InputLayer", "name": "culmen_length_mm", "dtype": "float32", "sparse": false, "ragged": false, "batch_input_shape": {"class_name": "__tuple__", "items": [null, 1]}, "config": {"batch_input_shape": {"class_name": "__tuple__", "items": [null, 1]}, "dtype": "float32", "sparse": false, "ragged": false, "name": "culmen_length_mm"}}2 +† root.layer-1"_tf_keras_input_layer*Ö{"class_name": "InputLayer", "name": "culmen_depth_mm", "dtype": "float32", "sparse": false, "ragged": false, "batch_input_shape": {"class_name": "__tuple__", "items": [null, 1]}, "config": {"batch_input_shape": {"class_name": "__tuple__", "items": [null, 1]}, "dtype": "float32", "sparse": false, "ragged": false, "name": "culmen_depth_mm"}}2 +Š root.layer-2"_tf_keras_input_layer*Ú{"class_name": "InputLayer", "name": "flipper_length_mm", "dtype": "float32", "sparse": false, "ragged": false, "batch_input_shape": {"class_name": "__tuple__", "items": [null, 1]}, "config": {"batch_input_shape": {"class_name": "__tuple__", "items": [null, 1]}, "dtype": "float32", "sparse": false, "ragged": false, "name": "flipper_length_mm"}}2 +þ root.layer-3"_tf_keras_input_layer*Î{"class_name": "InputLayer", "name": "body_mass_g", "dtype": "float32", "sparse": false, "ragged": false, "batch_input_shape": {"class_name": "__tuple__", "items": [null, 1]}, "config": {"batch_input_shape": {"class_name": "__tuple__", "items": [null, 1]}, "dtype": "float32", "sparse": false, "ragged": false, "name": "body_mass_g"}}2 +˜ root.layer-4"_tf_keras_layer*î{"name": "concatenate_1", "trainable": true, "expects_training_arg": false, "dtype": "float32", "batch_input_shape": null, "stateful": false, "must_restore_from_config": false, "preserve_input_structure_in_config": false, "autocast": true, "class_name": "Concatenate", "config": {"name": "concatenate_1", "trainable": true, "dtype": 
"float32", "axis": -1}, "inbound_nodes": [[["culmen_length_mm", 0, 0, {}], ["culmen_depth_mm", 0, 0, {}], ["flipper_length_mm", 0, 0, {}], ["body_mass_g", 0, 0, {}]]], "shared_object_id": 4, "build_input_shape": [{"class_name": "TensorShape", "items": [null, 1]}, {"class_name": "TensorShape", "items": [null, 1]}, {"class_name": "TensorShape", "items": [null, 1]}, {"class_name": "TensorShape", "items": [null, 1]}]}2 +³root.layer_with_weights-0"_tf_keras_layer*ü{"name": "dense_3", "trainable": true, "expects_training_arg": false, "dtype": "float32", "batch_input_shape": null, "stateful": false, "must_restore_from_config": false, "preserve_input_structure_in_config": false, "autocast": true, "class_name": "Dense", "config": {"name": "dense_3", "trainable": true, "dtype": "float32", "units": 8, "activation": "relu", "use_bias": true, "kernel_initializer": {"class_name": "GlorotUniform", "config": {"seed": null}, "shared_object_id": 5}, "bias_initializer": {"class_name": "Zeros", "config": {}, "shared_object_id": 6}, "kernel_regularizer": null, "bias_regularizer": null, "activity_regularizer": null, "kernel_constraint": null, "bias_constraint": null}, "inbound_nodes": [[["concatenate_1", 0, 0, {}]]], "shared_object_id": 7, "input_spec": {"class_name": "InputSpec", "config": {"dtype": null, "shape": null, "ndim": null, "max_ndim": null, "min_ndim": 2, "axes": {"-1": 4}}, "shared_object_id": 21}, "build_input_shape": {"class_name": "TensorShape", "items": [null, 4]}}2 +®root.layer_with_weights-1"_tf_keras_layer*÷{"name": "dense_4", "trainable": true, "expects_training_arg": false, "dtype": "float32", "batch_input_shape": null, "stateful": false, "must_restore_from_config": false, "preserve_input_structure_in_config": false, "autocast": true, "class_name": "Dense", "config": {"name": "dense_4", "trainable": true, "dtype": "float32", "units": 8, "activation": "relu", "use_bias": true, "kernel_initializer": {"class_name": "GlorotUniform", "config": {"seed": null}, "shared_object_id": 8}, "bias_initializer": {"class_name": "Zeros", "config": {}, "shared_object_id": 9}, "kernel_regularizer": null, "bias_regularizer": null, "activity_regularizer": null, "kernel_constraint": null, "bias_constraint": null}, "inbound_nodes": [[["dense_3", 0, 0, {}]]], "shared_object_id": 10, "input_spec": {"class_name": "InputSpec", "config": {"dtype": null, "shape": null, "ndim": null, "max_ndim": null, "min_ndim": 2, "axes": {"-1": 8}}, "shared_object_id": 22}, "build_input_shape": {"class_name": "TensorShape", "items": [null, 8]}}2 +²root.layer_with_weights-2"_tf_keras_layer*û{"name": "dense_5", "trainable": true, "expects_training_arg": false, "dtype": "float32", "batch_input_shape": null, "stateful": false, "must_restore_from_config": false, "preserve_input_structure_in_config": false, "autocast": true, "class_name": "Dense", "config": {"name": "dense_5", "trainable": true, "dtype": "float32", "units": 3, "activation": "linear", "use_bias": true, "kernel_initializer": {"class_name": "GlorotUniform", "config": {"seed": null}, "shared_object_id": 11}, "bias_initializer": {"class_name": "Zeros", "config": {}, "shared_object_id": 12}, "kernel_regularizer": null, "bias_regularizer": null, "activity_regularizer": null, "kernel_constraint": null, "bias_constraint": null}, "inbound_nodes": [[["dense_4", 0, 0, {}]]], "shared_object_id": 13, "input_spec": {"class_name": "InputSpec", "config": {"dtype": null, "shape": null, "ndim": null, "max_ndim": null, "min_ndim": 2, "axes": {"-1": 8}}, "shared_object_id": 23}, 
"build_input_shape": {"class_name": "TensorShape", "items": [null, 8]}}2 +€6  root.layer-8"_tf_keras_model*Ö5{"name": "transform_features_layer_1", "trainable": false, "expects_training_arg": false, "dtype": "float32", "batch_input_shape": null, "must_restore_from_config": false, "preserve_input_structure_in_config": false, "autocast": true, "class_name": "TensorFlowTransform>TransformFeaturesLayer", "config": {"layer was saved without config": true}, "build_input_shape": {"Body Mass (g)": {"class_name": "TensorShape", "items": [null, null]}, "Comments": {"class_name": "TensorShape", "items": [null, null]}, "Culmen Depth (mm)": {"class_name": "TensorShape", "items": [null, null]}, "Culmen Length (mm)": {"class_name": "TensorShape", "items": [null, null]}, "Delta 13 C (o/oo)": {"class_name": "TensorShape", "items": [null, null]}, "Delta 15 N (o/oo)": {"class_name": "TensorShape", "items": [null, null]}, "Flipper Length (mm)": {"class_name": "TensorShape", "items": [null, null]}, "Sex": {"class_name": "TensorShape", "items": [null, null]}, "Clutch Completion": {"class_name": "TensorShape", "items": [null, 1]}, "Date Egg": {"class_name": "TensorShape", "items": [null, 1]}, "Individual ID": {"class_name": "TensorShape", "items": [null, 1]}, "Island": {"class_name": "TensorShape", "items": [null, 1]}, "Region": {"class_name": "TensorShape", "items": [null, 1]}, "Sample Number": {"class_name": "TensorShape", "items": [null, 1]}, "Stage": {"class_name": "TensorShape", "items": [null, 1]}, "studyName": {"class_name": "TensorShape", "items": [null, 1]}}, "is_graph_network": false, "full_save_spec": {"class_name": "__tuple__", "items": [[{"Body Mass (g)": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "int64"]}, "Comments": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "string"]}, "Culmen Depth (mm)": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "float32"]}, "Culmen Length (mm)": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "float32"]}, "Delta 13 C (o/oo)": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "float32"]}, "Delta 15 N (o/oo)": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "float32"]}, "Flipper Length (mm)": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "int64"]}, "Sex": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "string"]}, "Clutch Completion": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "Clutch Completion"]}, "Date Egg": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "Date Egg"]}, "Individual ID": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "Individual ID"]}, "Island": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": 
[{"class_name": "TensorShape", "items": [null, 1]}, "string", "Island"]}, "Region": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "Region"]}, "Sample Number": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "int64", "Sample Number"]}, "Stage": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "Stage"]}, "studyName": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "studyName"]}}], {}]}, "save_spec": {"Body Mass (g)": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "int64"]}, "Comments": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "string"]}, "Culmen Depth (mm)": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "float32"]}, "Culmen Length (mm)": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "float32"]}, "Delta 13 C (o/oo)": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "float32"]}, "Delta 15 N (o/oo)": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "float32"]}, "Flipper Length (mm)": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "int64"]}, "Sex": {"class_name": "TypeSpec", "type_spec": "tf.SparseTensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, null]}, "string"]}, "Clutch Completion": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "Clutch Completion"]}, "Date Egg": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "Date Egg"]}, "Individual ID": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "Individual ID"]}, "Island": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "Island"]}, "Region": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "Region"]}, "Sample Number": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "int64", "Sample Number"]}, "Stage": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "Stage"]}, "studyName": {"class_name": "TypeSpec", "type_spec": "tf.TensorSpec", "serialized": [{"class_name": "TensorShape", "items": [null, 1]}, "string", "studyName"]}}, "keras_version": "2.11.0", "backend": "tensorflow", "model_config": {"class_name": "TransformFeaturesLayer"}}2 +¹vroot.keras_api.metrics.0"_tf_keras_metric*‚{"class_name": "Mean", "name": "loss", "dtype": "float32", "config": 
{"name": "loss", "dtype": "float32"}, "shared_object_id": 24}2 +üwroot.keras_api.metrics.1"_tf_keras_metric*Å{"class_name": "SparseCategoricalAccuracy", "name": "sparse_categorical_accuracy", "dtype": "float32", "config": {"name": "sparse_categorical_accuracy", "dtype": "float32"}, "shared_object_id": 20}2 \ No newline at end of file diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/saved_model.pb b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/saved_model.pb new file mode 100644 index 00000000..81c87235 Binary files /dev/null and b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/saved_model.pb differ diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/variables/variables.data-00000-of-00001 b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/variables/variables.data-00000-of-00001 new file mode 100644 index 00000000..530bfdb7 Binary files /dev/null and b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/variables/variables.data-00000-of-00001 differ diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/variables/variables.index b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/variables/variables.index new file mode 100644 index 00000000..c25f85b5 Binary files /dev/null and b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Trainer/model/6/Format-Serving/variables/variables.index differ diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/metadata/schema.pbtxt b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/metadata/schema.pbtxt new file mode 100644 index 00000000..332bbd3c --- /dev/null +++ b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/metadata/schema.pbtxt @@ -0,0 +1,290 @@ +feature { + name: "Body Mass (g)" + type: INT + presence { + min_fraction: 1.0 + min_count: 1 + } +} +feature { + name: "Clutch Completion" + type: BYTES + domain: "Clutch Completion" + presence { + min_fraction: 1.0 + min_count: 1 + } + shape { + dim { + size: 1 + } + } +} +feature { + name: "Comments" + type: BYTES + domain: "Comments" + presence { + min_fraction: 1.0 + min_count: 1 + } +} +feature { + name: "Culmen Depth (mm)" + type: FLOAT + presence { + min_fraction: 1.0 + min_count: 1 + } +} +feature { + name: "Culmen Length (mm)" + type: FLOAT + presence { + min_fraction: 1.0 + min_count: 1 + } +} +feature { + name: "Date Egg" + type: BYTES + domain: "Date Egg" + presence { + min_fraction: 1.0 + min_count: 1 + } + shape { + dim { + size: 1 + } + } +} +feature { + name: "Delta 13 C (o/oo)" + type: FLOAT + presence { + min_fraction: 1.0 + min_count: 1 + } +} +feature { + name: "Delta 15 N (o/oo)" + type: FLOAT + presence { + min_fraction: 1.0 + min_count: 1 + } +} +feature { + name: "Flipper Length (mm)" + type: INT + presence { + min_fraction: 1.0 + min_count: 1 + } +} +feature { + name: "Individual ID" + type: BYTES + presence { + min_fraction: 1.0 + min_count: 1 + } + shape { + dim { + size: 1 + } + } +} +feature { + name: "Island" + type: BYTES + domain: "Island" + presence { + min_fraction: 1.0 + min_count: 1 + } + shape { + dim { + size: 1 + } + } +} +feature { + name: "Region" 
+ type: BYTES + domain: "Region" + presence { + min_fraction: 1.0 + min_count: 1 + } + shape { + dim { + size: 1 + } + } +} +feature { + name: "Sample Number" + type: INT + presence { + min_fraction: 1.0 + min_count: 1 + } + shape { + dim { + size: 1 + } + } +} +feature { + name: "Sex" + type: BYTES + domain: "Sex" + presence { + min_fraction: 1.0 + min_count: 1 + } +} +feature { + name: "Species" + type: BYTES + domain: "Species" + presence { + min_fraction: 1.0 + min_count: 1 + } + shape { + dim { + size: 1 + } + } +} +feature { + name: "Stage" + type: BYTES + domain: "Stage" + presence { + min_fraction: 1.0 + min_count: 1 + } + shape { + dim { + size: 1 + } + } +} +feature { + name: "studyName" + type: BYTES + domain: "studyName" + presence { + min_fraction: 1.0 + min_count: 1 + } + shape { + dim { + size: 1 + } + } +} +string_domain { + name: "Clutch Completion" + value: "No" + value: "Yes" +} +string_domain { + name: "Comments" + value: "Adult not sampled." + value: "Nest never observed with full clutch." + value: "Nest never observed with full clutch. Not enough blood for isotopes." + value: "No blood sample obtained for sexing." + value: "No blood sample obtained." + value: "Not enough blood for isotopes." + value: "Sexing primers did not amplify. Not enough blood for isotopes." +} +string_domain { + name: "Date Egg" + value: "11/10/07" + value: "11/10/08" + value: "11/10/09" + value: "11/11/07" + value: "11/11/08" + value: "11/12/07" + value: "11/12/09" + value: "11/13/07" + value: "11/13/08" + value: "11/13/09" + value: "11/14/08" + value: "11/15/07" + value: "11/15/08" + value: "11/15/09" + value: "11/16/07" + value: "11/16/09" + value: "11/17/08" + value: "11/17/09" + value: "11/18/09" + value: "11/19/07" + value: "11/19/09" + value: "11/2/08" + value: "11/20/09" + value: "11/21/07" + value: "11/21/09" + value: "11/22/07" + value: "11/22/09" + value: "11/23/09" + value: "11/24/08" + value: "11/25/08" + value: "11/25/09" + value: "11/26/07" + value: "11/27/07" + value: "11/27/09" + value: "11/28/07" + value: "11/29/07" + value: "11/3/08" + value: "11/30/07" + value: "11/4/08" + value: "11/6/08" + value: "11/7/08" + value: "11/8/08" + value: "11/9/07" + value: "11/9/08" + value: "11/9/09" + value: "12/1/09" + value: "12/3/07" + value: "11/14/09" + value: "11/18/07" + value: "11/5/08" +} +string_domain { + name: "Island" + value: "Biscoe" + value: "Dream" + value: "Torgersen" +} +string_domain { + name: "Region" + value: "Anvers" +} +string_domain { + name: "Sex" + value: "FEMALE" + value: "MALE" + value: "." 
+} +string_domain { + name: "Species" + value: "Adelie Penguin (Pygoscelis adeliae)" + value: "Chinstrap penguin (Pygoscelis antarctica)" + value: "Gentoo penguin (Pygoscelis papua)" +} +string_domain { + name: "Stage" + value: "Adult, 1 Egg Stage" +} +string_domain { + name: "studyName" + value: "PAL0708" + value: "PAL0809" + value: "PAL0910" +} diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/assets/Species b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/assets/Species new file mode 100644 index 00000000..d919d4f2 --- /dev/null +++ b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/assets/Species @@ -0,0 +1,3 @@ +Adelie Penguin (Pygoscelis adeliae) +Gentoo penguin (Pygoscelis papua) +Chinstrap penguin (Pygoscelis antarctica) diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/fingerprint.pb b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/fingerprint.pb new file mode 100644 index 00000000..d8f6e4a8 Binary files /dev/null and b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/fingerprint.pb differ diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/saved_model.pb b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/saved_model.pb new file mode 100644 index 00000000..c03d97d8 Binary files /dev/null and b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/saved_model.pb differ diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/variables/variables.data-00000-of-00001 b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/variables/variables.data-00000-of-00001 new file mode 100644 index 00000000..dfb85a80 Binary files /dev/null and b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/variables/variables.data-00000-of-00001 differ diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/variables/variables.index b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/variables/variables.index new file mode 100644 index 00000000..b238b806 Binary files /dev/null and b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transform_fn/variables/variables.index differ diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transformed_metadata/asset_map b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transformed_metadata/asset_map new file mode 100644 index 00000000..3b7264a4 --- /dev/null +++ b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transformed_metadata/asset_map @@ -0,0 +1 @@ +{"Species": "Species"} \ No newline at end of file diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transformed_metadata/schema.pbtxt 
b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transformed_metadata/schema.pbtxt new file mode 100644 index 00000000..d3845d77 --- /dev/null +++ b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/Transform/transform_graph/5/transformed_metadata/schema.pbtxt @@ -0,0 +1,33 @@ +feature { + name: "body_mass_g" + type: INT +} +feature { + name: "culmen_depth_mm" + type: FLOAT +} +feature { + name: "culmen_length_mm" + type: FLOAT +} +feature { + name: "flipper_length_mm" + type: INT +} +feature { + name: "species" + type: INT + int_domain { + min: 0 + max: 2 + is_categorical: true + } + presence { + min_fraction: 1.0 + } + shape { + dim { + size: 1 + } + } +} diff --git a/tfx_addons/predictions_to_bigquery/utils.py b/tfx_addons/predictions_to_bigquery/utils.py index ee79b126..b0cfe635 100644 --- a/tfx_addons/predictions_to_bigquery/utils.py +++ b/tfx_addons/predictions_to_bigquery/utils.py @@ -14,41 +14,46 @@ # ============================================================================== # This code was originally written by Hannes Hapke (Digits Financial Inc.) # on Feb. 6, 2023. -""" -Util functions for the Digits Prediction-to-BigQuery component. -""" +"""Schema parsing and conversion routines.""" -import glob -from typing import Any, Dict, List +# TODO(cezequiel): Rename file to schema_utils.py + +import os +import re +from typing import Any, Dict, List, Optional, Union -import numpy as np import tensorflow as tf import tensorflow_transform as tft -from absl import logging from google.protobuf import text_format from tensorflow.python.lib.io import file_io from tensorflow_metadata.proto.v0 import schema_pb2 +from tensorflow_serving.apis import prediction_log_pb2 +from tfx.types import Artifact, artifact_utils +FeatureSpec = Dict[str, Union[tf.io.FixedLenFeature, tf.io.VarLenFeature]] +BigQuerySchema = Dict[str, Any] -def load_schema(input_path: str) -> Dict: - """ - Loads a TFX schema from a file and returns schema object. +_SCHEMA_FILE_NAME = "schema.pbtxt" +_REGEX_CHARS_TO_REPLACE = re.compile(r'[^a-zA-Z0-9_]') - Args: - input_path: Path to the file containing the schema. - Returns: - A schema object. - """ +def _get_feature_spec_from_schema_file(input_path: str) -> FeatureSpec: + """Loads a TFX schema from a file and parses it into a TF feature spec. + + Args: + input_path: Path to the `_SCHEMA_FILE_NAME` file. + Returns: + A `FeatureSpec` object. + """ schema = schema_pb2.Schema() schema_text = file_io.read_file_to_string(input_path) text_format.Parse(schema_text, schema) - return tft.tf_metadata.schema_utils.schema_as_feature_spec( - schema).feature_spec + return ( + tft.tf_metadata.schema_utils.schema_as_feature_spec(schema).feature_spec) -def _get_compress_type(file_path): +def _get_compress_type(file_path: str) -> Optional[str]: magic_bytes = { b'x\x01': 'ZLIB', b'x^': 'ZLIB', @@ -57,7 +62,9 @@ def _get_compress_type(file_path): b'\x1f\x8b': 'GZIP' } - two_bytes = open(file_path, 'rb').read(2) + with tf.io.gfile.GFile(file_path, 'rb') as input_file: + two_bytes = input_file.read(2) + return magic_bytes.get(two_bytes) @@ -83,86 +90,69 @@ def _get_feature_type(feature=None, type_=None): return None -def parse_schema(prediction_log_path: str, - compression_type: str = 'auto') -> Dict: - """Parses feature schema from predictions.""" - - features = {} +def _get_feature_spec_from_prediction_results( + prediction_log_path: str) -> FeatureSpec: + """Parses a TensorFlow feature spec from BulkInferrer prediction results. 
- file_paths = glob.glob(prediction_log_path) - if compression_type == 'auto': - compression_type = _get_compress_type(file_paths[0]) + Args: + prediction_log_path: Path containing BulkInferrer prediction results. - dataset = tf.data.TFRecordDataset(file_paths, + Returns: + A `FeatureSpec` object. + """ + filepath = tf.io.gfile.glob(prediction_log_path)[0] + compression_type = _get_compress_type(filepath) + dataset = tf.data.TFRecordDataset([filepath], compression_type=compression_type) - serialized = next(iter(dataset.map(lambda serialized: serialized))) - seq_ex = tf.train.SequenceExample.FromString(serialized.numpy()) - - if seq_ex.feature_lists.feature_list: - raise NotImplementedError("FeatureLists aren't supported at the moment.") + for bytes_record in dataset.take(1): + prediction_log = prediction_log_pb2.PredictionLog.FromString( + bytes_record.numpy()) - for key, feature in seq_ex.context.feature.items(): - features[key] = tf.io.FixedLenFeature((), - _get_feature_type(feature=feature)) - return features + example_bytes = ( + prediction_log.predict_log.request.inputs['examples'].string_val[0]) + example = tf.train.Example.FromString(example_bytes) + features = {} + for name, feature_proto in example.features.feature.items(): + feature_dtype = _get_feature_type(feature=feature_proto) + feature = tf.io.VarLenFeature(dtype=feature_dtype) + features[name] = feature -def convert_python_numpy_to_bq_type(python_type: Any) -> str: - """ - Converts a python type to a BigQuery type. + return features - Args: - python_type: A python type. - Returns: - A BigQuery type. - """ - if isinstance(python_type, (int, np.int64)): - return "INTEGER" - elif isinstance(python_type, (float, np.float32)): - return "FLOAT" - elif isinstance(python_type, (str, bytes)): - return "STRING" - elif isinstance(python_type, (bool, np.bool)): - return "BOOLEAN" - else: - raise ValueError("Unsupported type: {python_type}") +def get_feature_spec( + schema: Optional[List[Artifact]] = None, + tft_output: Optional[tft.TFTransformOutput] = None, + prediction_log_path: Optional[str] = None, +) -> Dict[str, Any]: + """Returns a TensorFlow feature spec representing the input data schema. + Specify one of `schema`, `tft_output`, `prediction_log_path` as the source + for the data schema. -def convert_single_value_to_native_py_value(tensor: Any) -> str: + Args: + schema: Artifact containing the URI to a `_SCHEMA_FILENAME` file. + tft_output: TensorFlow Transform output path. + prediction_log_path: Path to a TFRecord file containing inference results. """ - Converts a Python value to a native Python value. + if schema: # note: schema can be an empty list + schema_uri = artifact_utils.get_single_uri(schema) + schema_file = os.path.join(schema_uri, _SCHEMA_FILE_NAME) + return _get_feature_spec_from_schema_file(schema_file) - Args: - value: A value. + if tft_output is not None: + return tft_output.raw_feature_spec() - Returns: - Value casted to native Python type. 
- """ + if prediction_log_path is None: + raise ValueError( + 'Specify one of `schema`, `tft_output` or `prediction_log_path`.') - if isinstance(tensor, tf.sparse.SparseTensor): - value = tensor.values.numpy()[0] - logging.debug(f"sparse value: {value}") - else: - value = tensor.numpy()[0] - logging.debug(f"dense value: {value}") - - if isinstance(value, (int, np.int64, np.int32)): - return int(value) - elif isinstance(value, (float, np.float32, np.float64)): - return float(value) - elif isinstance(value, str): - return value - elif isinstance(value, bytes): - return value.decode("utf-8") - elif isinstance(value, (bool, np.bool)): - return bool(value) - else: - raise ValueError(f"Unsupported value type: {value} of type {type(value)}") + return _get_feature_spec_from_prediction_results(prediction_log_path) -def convert_tensorflow_dtype_to_bq_type(tf_dtype: tf.dtypes.DType) -> str: +def _convert_tensorflow_dtype_to_bq_type(tf_dtype: tf.dtypes.DType) -> str: """ Converts a tensorflow dtype to a BigQuery type string. @@ -184,64 +174,103 @@ def convert_tensorflow_dtype_to_bq_type(tf_dtype: tf.dtypes.DType) -> str: raise ValueError(f"Unsupported type: {tf_dtype}") -def feature_to_bq_schema(features: Dict[str, Any], - required: bool = True) -> List[Dict]: - """ - Convert a list of features to a list of BigQuery schema fields. +def get_bq_field_name_from_key(key: str) -> str: + field_name = _REGEX_CHARS_TO_REPLACE.sub('_', key) + return re.sub('_+', '_', field_name).strip('_') - Args: - features: A list of features. - required: Whether the field is required. - Returns: - A list of BigQuery schema fields. - """ +def _feature_spec_to_bq_schema_fields(feature_spec: FeatureSpec, + required: bool = True) -> List[Dict]: + """Convert a TensorFlow feature spec to a list of BigQuery schema fields. + + Args: + feature_spec: TensorFlow feature spec. + required: Whether the field is required. + + Returns: + A list of BigQuery schema fields. + """ return [{ - "name": feature_name, - "type": convert_tensorflow_dtype_to_bq_type(feature_def.dtype), + "name": get_bq_field_name_from_key(feature_name), + "type": _convert_tensorflow_dtype_to_bq_type(feature_def.dtype), "mode": "REQUIRED" if required else "NULLABLE", - } for feature_name, feature_def in features.items()] + } for feature_name, feature_def in feature_spec.items()] -def create_annotation_fields( - label_field_name: str = "category_label", - score_field_name: str = "score", +def _create_annotation_fields( + *, required: bool = True, + add_label_field: bool = False, add_datetime_field: bool = True, ) -> List[Dict]: - """ - Create a list of BigQuery schema fields for the annotation fields. + """Creates annotation fields in BigQuery schema format. - Args: - label_field_name: The name of the label field. - score_field_name: The name of the score field. - required: Whether the fields are required. - add_datetime_field: Whether to add a datetime field. + This function creates the following fields: score, category_label, and + datetime, where the last two are optional. - Returns: - A list of BigQuery schema fields. - """ + Args: + required: Whether the field is required or not. + add_label_field: If true, add a field representing the label of the + model prediction input. + add_datetime_field: Whether to add a datetime field representing the + data creation timestamp. - label_field = { - "name": label_field_name, - "type": "STRING", - "mode": "REQUIRED" if required else "NULLABLE", - } + Returns: + A list of the BigQuery schema fields. 
+ """ + + fields = [] + if add_label_field: + label_field = { + 'name': 'category_label', + 'type': 'STRING', + 'mode': 'REQUIRED' if required else 'NULLABLE', + } + fields.append(label_field) score_field = { - "name": score_field_name, - "type": "FLOAT", - "mode": "REQUIRED" if required else "NULLABLE", + 'name': 'score', + 'type': 'FLOAT', + 'mode': 'REQUIRED' if required else 'NULLABLE', } - - fields = [label_field, score_field] + fields.append(score_field) if add_datetime_field: datetime_field = { - "name": "datetime", - "type": "TIMESTAMP", - "mode": "REQUIRED" if required else "NULLABLE", + 'name': 'datetime', + 'type': 'TIMESTAMP', + 'mode': 'REQUIRED' if required else 'NULLABLE', } fields.append(datetime_field) return fields + + +def feature_spec_to_bq_schema( + feature_spec: FeatureSpec, + required: bool = False, + add_label_field: bool = False, + add_datetime_field: bool = True, +) -> BigQuerySchema: + """Converts a TensorFlow feature spec into a BigQuery schema. + + Args: + feature_spec: TensorFlow feature spec. + required: If True, mark BigQuery fields as required (i.e. not nullable). + add_label_field: If true, add a field representing the label of the + model prediction input. + add_datetime_field: Whether to add a datetime field representing the + data creation timestamp. + + Returns: + A `BigQuerySchema` object. + """ + bq_schema_fields = _feature_spec_to_bq_schema_fields(feature_spec, + required=required) + bq_schema_fields.extend( + _create_annotation_fields( + required=required, + add_label_field=add_label_field, + add_datetime_field=add_datetime_field, + )) + return {"fields": bq_schema_fields} diff --git a/tfx_addons/predictions_to_bigquery/utils_test.py b/tfx_addons/predictions_to_bigquery/utils_test.py new file mode 100644 index 00000000..ae9d7fc1 --- /dev/null +++ b/tfx_addons/predictions_to_bigquery/utils_test.py @@ -0,0 +1,180 @@ +# Copyright 2023 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== +"""Tests for utils.py""" + +import pathlib +from unittest import mock + +import tensorflow as tf +import tensorflow_transform as tft +from absl.testing import absltest, parameterized +from ml_metadata.proto import metadata_store_pb2 +from tfx import types + +from tfx_addons.predictions_to_bigquery import utils + + +def _make_artifact(uri) -> types.Artifact: + artifact = types.Artifact(metadata_store_pb2.ArtifactType()) + artifact.uri = uri + return artifact + + +# pylint: disable=protected-access +class UtilsTest(parameterized.TestCase): + """Tests for utils module functions.""" + def test_get_features_from_prediction_results(self): + test_data_dir = pathlib.Path( + 'tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output') + prediction_log_path = (test_data_dir / + 'BulkInferrer/inference_result/7/*.gz') + output = utils._get_feature_spec_from_prediction_results( + str(prediction_log_path)) + self.assertIn('Culmen Depth (mm)', output) + self.assertEqual(tf.float32, output['Culmen Depth (mm)'].dtype) + + @parameterized.named_parameters([ + ('error_no_inputs', False, False, False), + ('schema', True, False, False), + ('tft_output', False, True, False), + ('prediction_log_path', False, False, True), + ]) + def test_get_feature_spec(self, has_schema, has_tft_output, + has_prediction_log_path): + mock_load_schema = self.enter_context( + mock.patch.object(utils, + '_get_feature_spec_from_schema_file', + autospec=True, + return_value=has_schema)) + mock_raw_feature_spec = self.enter_context( + mock.patch.object(tft.TFTransformOutput, + 'raw_feature_spec', + autospec=True)) + mock_parse_features_from_prediction_results = self.enter_context( + mock.patch.object(utils, + '_get_feature_spec_from_prediction_results', + autospec=True, + return_value=has_schema)) + + if (has_schema is None and has_tft_output is None + and has_prediction_log_path is None): + with self.assertRaises(ValueError): + _ = utils.get_feature_spec(has_schema, has_tft_output, + has_prediction_log_path) + return + + if has_schema: + schema = [_make_artifact('schema_uri')] + _ = utils.get_feature_spec(schema, None, None) + mock_load_schema.assert_called_once() + + elif has_tft_output: + tft_output = tft.TFTransformOutput('uri') + _ = utils.get_feature_spec(None, tft_output, None) + mock_raw_feature_spec.assert_called_once() + + elif has_prediction_log_path: + prediction_log_path = 'path' + _ = utils.get_feature_spec(None, None, prediction_log_path) + mock_parse_features_from_prediction_results.assert_called_once() + + else: + with self.assertRaises(ValueError): + _ = utils.get_feature_spec(None, None, None) + + @parameterized.named_parameters([ + ('no_label_field', False), + ('with_label_field', True), + ]) + def test_feature_spec_to_bq_schema(self, add_label_field): + feature_spec: utils.FeatureSpec = { + 'Some Feature': tf.io.FixedLenFeature([], dtype=tf.int64), + } + required = True + if add_label_field: + expected = { + 'fields': [ + { + 'name': 'Some_Feature', + 'type': 'INTEGER', + 'mode': 'REQUIRED', + }, + { + 'name': 'category_label', + 'type': 'STRING', + 'mode': 'REQUIRED', + }, + { + 'name': 'score', + 'type': 'FLOAT', + 'mode': 'REQUIRED', + }, + { + 'name': 'datetime', + 'type': 'TIMESTAMP', + 'mode': 'REQUIRED', + }, + ] + } + else: + expected = { + 'fields': [ + { + 'name': 'Some_Feature', + 'type': 'INTEGER', + 'mode': 'REQUIRED', + }, + { + 'name': 'score', + 'type': 'FLOAT', + 'mode': 'REQUIRED', + }, + { + 'name': 
'datetime', + 'type': 'TIMESTAMP', + 'mode': 'REQUIRED', + }, + ] + } + + output = utils.feature_spec_to_bq_schema(feature_spec, + required, + add_label_field=add_label_field) + + self.assertEqual(expected, output) + + @parameterized.named_parameters([ + ('none', None, None, None), + ('int', None, int, tf.int64), + ]) + def test_get_feature_type(self, feature, type_, expected): + output = utils._get_feature_type(feature=feature, type_=type_) + self.assertEqual(expected, output) + + @parameterized.named_parameters([ + ('unsupported', None, None), + ('boolean', tf.bool, 'BOOLEAN'), + ]) + def test_convert_tensorflow_dtype_to_bq_type(self, tf_dtype, expected): + if tf_dtype is not None: + output = utils._convert_tensorflow_dtype_to_bq_type(tf_dtype) + self.assertEqual(expected, output) + else: + with self.assertRaises(ValueError): + _ = utils._convert_tensorflow_dtype_to_bq_type(tf_dtype) + + +if __name__ == '__main__': + absltest.main()
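
For reference, a minimal sketch of how the refactored `utils` helpers introduced in this change compose: `get_feature_spec` resolves the input schema from one of its three sources (checked in the order `schema`, then `tft_output`, then `prediction_log_path`), and `feature_spec_to_bq_schema` turns the resulting feature spec into a BigQuery schema dict of the form `{"fields": [...]}`. This is illustrative only and is not the component's executor code; the GCP project, dataset and table names, the Transform output path, and the `google-cloud-bigquery` calls used to materialize the table are all assumptions made for the example.

```
# Hypothetical usage sketch of the utils API from this change.
# Only the two `utils.*` calls come from the diff; every name, path,
# and the BigQuery client usage below is an assumption for illustration.

import tensorflow_transform as tft
from google.cloud import bigquery

from tfx_addons.predictions_to_bigquery import utils

# 1. Derive a TF feature spec from one of the supported sources.
#    A SchemaGen artifact list or a BulkInferrer prediction-log glob
#    could be passed instead of the Transform output used here.
tft_output = tft.TFTransformOutput(
    '/path/to/Transform/transform_graph/5')  # assumed path
feature_spec = utils.get_feature_spec(
    schema=None, tft_output=tft_output, prediction_log_path=None)

# 2. Convert the feature spec into a BigQuery schema dict.
#    Field names are sanitized by get_bq_field_name_from_key,
#    e.g. 'Culmen Depth (mm)' becomes 'Culmen_Depth_mm'.
bq_schema = utils.feature_spec_to_bq_schema(
    feature_spec, required=False, add_label_field=True)

# 3. Materialize the schema with the BigQuery client (the component's
#    executor performs the actual export; this only shows the shape of
#    the schema dict).
client = bigquery.Client(project='my-gcp-project')  # assumed project
fields = [
    bigquery.SchemaField.from_api_repr(field)
    for field in bq_schema['fields']
]
table = bigquery.Table('my-gcp-project.my_dataset.predictions',
                       schema=fields)  # assumed table name
client.create_table(table, exists_ok=True)
```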