diff --git a/tfx_addons/predictions_to_bigquery/executor.py b/tfx_addons/predictions_to_bigquery/executor.py index 763e226f..a271d733 100644 --- a/tfx_addons/predictions_to_bigquery/executor.py +++ b/tfx_addons/predictions_to_bigquery/executor.py @@ -19,8 +19,7 @@ import datetime import os import re -from collections.abc import Mapping, Sequence -from typing import Any, List, Optional, Tuple, Union +from typing import Any, Optional, Union import apache_beam as beam import numpy as np @@ -31,7 +30,7 @@ from tensorflow_serving.apis import prediction_log_pb2 from tfx import types from tfx.dsl.components.base import base_beam_executor -from tfx.types import artifact_utils +from tfx.types import Artifact, artifact_utils # TODO(cezequiel): Move relevant functions in utils module here. from tfx_addons.predictions_to_bigquery import utils @@ -41,32 +40,50 @@ _DEFAULT_TIMESTRING_FORMAT = '%Y%m%d_%H%M%S' _REQUIRED_EXEC_PROPERTIES = ( 'bq_table_name', - 'bq_dataset', 'filter_threshold', - 'gcp_project', 'gcs_temp_dir', - 'vocab_label_file', ) _REGEX_CHARS_TO_REPLACE = re.compile(r'[^a-zA-Z0-9_]') +_REGEX_BQ_TABLE_NAME = re.compile(r'^[\w-]*:?[\w_]+\.[\w_]+$') -def _check_exec_properties(exec_properties: Mapping[str, Any]) -> None: +def _check_exec_properties(exec_properties: dict[str, Any]) -> None: for key in _REQUIRED_EXEC_PROPERTIES: if exec_properties[key] is None: raise ValueError(f'{key} must be set in exec_properties') -def _get_labels(transform_output_uri: str, vocab_file: str) -> Sequence[str]: - tf_transform_output = tft.TFTransformOutput(transform_output_uri) - tft_vocab = tf_transform_output.vocabulary_by_name(vocab_filename=vocab_file) +def _get_prediction_log_path(inference_results: list[Artifact]) -> str: + inference_results_uri = artifact_utils.get_single_uri(inference_results) + return f'{inference_results_uri}/*.gz' + + +def _get_tft_output( + transform_graph: Optional[list[Artifact]] = None +) -> Optional[tft.TFTransformOutput]: + if transform_graph is None: + return None + + transform_graph_uri = artifact_utils.get_single_uri(transform_graph) + return tft.TFTransformOutput(transform_graph_uri) + + +def _get_labels(tft_output: tft.TFTransformOutput, + vocab_file: str) -> list[str]: + tft_vocab = tft_output.vocabulary_by_name(vocab_filename=vocab_file) return [label.decode() for label in tft_vocab] -def _get_bq_table_name( - basename: str, - timestamp: Optional[datetime.datetime] = None, - timestring_format: Optional[str] = None, -) -> str: +def _check_bq_table_name(bq_table_name: str) -> None: + if _REGEX_BQ_TABLE_NAME.match(bq_table_name) is None: + raise ValueError('Invalid BigQuery table name.' 
+ ' Specify in either `PROJECT:DATASET.TABLE` or' + ' `DATASET.TABLE` format.') + + +def _add_bq_table_name_suffix(basename: str, + timestamp: Optional[datetime.datetime] = None, + timestring_format: Optional[str] = None) -> str: if timestamp is not None: timestring_format = timestring_format or _DEFAULT_TIMESTRING_FORMAT return basename + '_' + timestamp.strftime(timestring_format) @@ -74,37 +91,67 @@ def _get_bq_table_name( def _get_additional_bq_parameters( - expiration_days: Optional[int] = None, - table_partitioning: bool = False, -) -> Mapping[str, Any]: + table_expiration_days: Optional[int] = None, + table_partitioning: Optional[bool] = False, +) -> dict[str, Any]: output = {} if table_partitioning: time_partitioning = {'type': 'DAY'} logging.info('BigQuery table time partitioning set to DAY') - if expiration_days: - expiration_time_delta = datetime.timedelta(days=expiration_days) + if table_expiration_days: + expiration_time_delta = datetime.timedelta(days=table_expiration_days) expiration_milliseconds = expiration_time_delta.total_seconds() * 1000 logging.info( - f'BigQuery table partition expiration time set to {expiration_days}' - ' days') + f'BigQuery table expiration set to {table_expiration_days} days.') time_partitioning['expirationMs'] = expiration_milliseconds output['timePartitioning'] = time_partitioning return output -def _get_features( - *, - schema_uri: Optional[str] = None, +# TODO(cezequiel): Move to a separate module with called functions. +# pylint: disable=protected-access +def _parse_features_from_prediction_results( + prediction_log_path: str) -> dict[str, Any]: + filepath = tf.io.gfile.glob(prediction_log_path)[0] + compression_type = utils._get_compress_type(filepath) + dataset = tf.data.TFRecordDataset([filepath], + compression_type=compression_type) + + for bytes_record in dataset.take(1): + prediction_log = prediction_log_pb2.PredictionLog.FromString( + bytes_record.numpy()) + + example_bytes = ( + prediction_log.predict_log.request.inputs['examples'].string_val[0]) + example = tf.train.Example.FromString(example_bytes) + features = {} + + for name, feature_proto in example.features.feature.items(): + feature_dtype = utils._get_feature_type(feature=feature_proto) + feature = tf.io.VarLenFeature(dtype=feature_dtype) + features[name] = feature + + return features + + +def _get_schema_features( + schema: Optional[list[Artifact]] = None, + tft_output: Optional[tft.TFTransformOutput] = None, prediction_log_path: Optional[str] = None, -) -> Mapping[str, Any]: - if schema_uri: +) -> dict[str, Any]: + if schema is not None: + schema_uri = artifact_utils.get_single_uri(schema) schema_file = os.path.join(schema_uri, _SCHEMA_FILE_NAME) return utils.load_schema(schema_file) - if not prediction_log_path: - raise ValueError('Specify one of `schema_uri` or `prediction_log_path`.') + if tft_output is not None: + return tft_output.raw_feature_spec() - return utils.parse_schema(prediction_log_path) + if prediction_log_path is None: + raise ValueError( + 'Specify one of `schema`, `tft_output` or `prediction_log_path`.') + + return _parse_features_from_prediction_results(prediction_log_path) def _get_bq_field_name_from_key(key: str) -> str: @@ -112,8 +159,7 @@ def _get_bq_field_name_from_key(key: str) -> str: return re.sub('_+', '_', field_name).strip('_') -def _features_to_bq_schema(features: Mapping[str, Any], - required: bool = False): +def _features_to_bq_schema(features: dict[str, Any], required: bool = False): bq_schema_fields_ = utils.feature_to_bq_schema(features, 
required=required) bq_schema_fields = [] for field in bq_schema_fields_: @@ -128,8 +174,7 @@ def _features_to_bq_schema(features: Mapping[str, Any], def _tensor_to_native_python_value( - tensor: Union[tf.Tensor, tf.sparse.SparseTensor] -) -> Optional[Union[int, float, str]]: + tensor: Union[tf.Tensor, tf.sparse.SparseTensor]) -> Optional[Any]: """Converts a TF Tensor to a native Python value.""" if isinstance(tensor, tf.sparse.SparseTensor): values = tensor.values.numpy() @@ -139,7 +184,7 @@ def _tensor_to_native_python_value( return None values = np.squeeze(values) # Removes extra dimension, e.g. shape (n, 1). values = values.item() # Converts to native Python type - if isinstance(values, Sequence) and isinstance(values[0], bytes): + if isinstance(values, list) and isinstance(values[0], bytes): return [v.decode('utf-8') for v in values] if isinstance(values, bytes): return values.decode('utf-8') @@ -147,34 +192,35 @@ def _tensor_to_native_python_value( @beam.typehints.with_input_types(str) -@beam.typehints.with_output_types(beam.typehints.Iterable[Tuple[str, str, +@beam.typehints.with_output_types(beam.typehints.Iterable[tuple[str, str, Any]]) class FilterPredictionToDictFn(beam.DoFn): """Converts a PredictionLog proto to a dict.""" def __init__( self, - labels: List, - features: Any, + features: dict[str, tf.io.FixedLenFeature], timestamp: datetime.datetime, filter_threshold: float, + labels: Optional[list[str]] = None, score_multiplier: float = 1., ): super().__init__() - self._labels = labels self._features = features + self._timestamp = timestamp self._filter_threshold = filter_threshold + self._labels = labels self._score_multiplier = score_multiplier - self._timestamp = timestamp - def _parse_prediction(self, predictions: npt.ArrayLike): + def _parse_prediction( + self, predictions: npt.ArrayLike) -> tuple[Optional[str], float]: prediction_id = np.argmax(predictions) logging.debug("Prediction id: %s", prediction_id) logging.debug("Predictions: %s", predictions) - label = self._labels[prediction_id] + label = self._labels[prediction_id] if self._labels is not None else None score = predictions[0][prediction_id] return label, score - def _parse_example(self, serialized: bytes) -> Mapping[str, Any]: + def _parse_example(self, serialized: bytes) -> dict[str, Any]: parsed_example = tf.io.parse_example(serialized, self._features) output = {} for key, tensor in parsed_example.items(): @@ -191,17 +237,18 @@ def process(self, element, *args, **kwargs): # pylint: disable=missing-function del args, kwargs # unused parsed_prediction_scores = tf.make_ndarray( - element.predict_log.response.outputs["outputs"]) + element.predict_log.response.outputs['outputs']) label, score = self._parse_prediction(parsed_prediction_scores) if score >= self._filter_threshold: output = { - "category_label": label, # Workaround to issue with the score value having additional non-zero values # in higher decimal places. # e.g. 
0.8 -> 0.800000011920929
-          "score": round(score * self._score_multiplier, _DECIMAL_PLACES),
-          "datetime": self._timestamp,
+          'score': round(score * self._score_multiplier, _DECIMAL_PLACES),
+          'datetime': self._timestamp,
       }
+      if label is not None:
+        output['category_label'] = label
       output.update(
           self._parse_example(
               element.predict_log.request.inputs['examples'].string_val))
@@ -212,9 +259,9 @@ class Executor(base_beam_executor.BaseBeamExecutor):
   """Implements predictions-to-bigquery component logic."""
 
   def Do(
       self,
-      input_dict: Mapping[str, List[types.Artifact]],
-      output_dict: Mapping[str, List[types.Artifact]],
-      exec_properties: Mapping[str, Any],
+      input_dict: dict[str, list[types.Artifact]],
+      output_dict: dict[str, list[types.Artifact]],
+      exec_properties: dict[str, Any],
   ) -> None:
     """Do function for predictions_to_bq executor."""
 
@@ -223,36 +270,42 @@ def Do(
 
     # Check required keys set in exec_properties
     _check_exec_properties(exec_properties)
 
-    # get labels from tf transform generated vocab file
-    labels = _get_labels(
-        artifact_utils.get_single_uri(input_dict['transform_graph']),
-        exec_properties['vocab_label_file'],
-    )
-    logging.info(f"found the following labels from TFT vocab: {labels}")
-
-    # set BigQuery table name and timestamp suffix if specified.
-    bq_table_name = _get_bq_table_name(exec_properties['bq_table_name'],
-                                       timestamp,
-                                       exec_properties['table_suffix'])
-
-    # set prediction result file path and decoder
-    inference_results_uri = artifact_utils.get_single_uri(
-        input_dict["inference_results"])
-    prediction_log_path = f"{inference_results_uri}/*.gz"
+    # Get prediction log file path and decoder
+    prediction_log_path = _get_prediction_log_path(
+        input_dict['inference_results'])
     prediction_log_decoder = beam.coders.ProtoCoder(
         prediction_log_pb2.PredictionLog)
 
+    tft_output = _get_tft_output(input_dict.get('transform_graph'))
+
     # get schema features
-    features = _get_features(schema_uri=artifact_utils.get_single_uri(
-        input_dict["schema"]),
-                             prediction_log_path=prediction_log_path)
+    features = _get_schema_features(
+        schema=input_dict.get('schema'),
+        tft_output=tft_output,
+        prediction_log_path=prediction_log_path,
+    )
+
+    # Get label names from the TFTransformOutput object, if applicable.
+    # Note: the vocab file key may be present but unset, so check its value.
+    if (tft_output is not None
+        and exec_properties.get('vocab_label_file') is not None):
+      labels = _get_labels(tft_output, exec_properties['vocab_label_file'])
+      logging.info(f'Found the following labels from TFT vocab: {labels}.')
+    else:
+      labels = None
+      logging.info('No TFT output or vocab file given; no labels parsed.')
+
+    # Set BigQuery table name and timestamp suffix, if specified.
+ _check_bq_table_name(exec_properties['bq_table_name']) + bq_table_name = _add_bq_table_name_suffix( + exec_properties['bq_table_name'], timestamp, + exec_properties['table_time_suffix']) # generate bigquery schema from tf transform features bq_schema = _features_to_bq_schema(features) logging.info(f'generated bq_schema: {bq_schema}.') additional_bq_parameters = _get_additional_bq_parameters( - exec_properties.get('expiration_time_delta'), + exec_properties.get('table_expiration_days'), exec_properties.get('table_partitioning')) # run the Beam pipeline to write the inference data to bigquery @@ -262,14 +314,12 @@ def Do( prediction_log_path, coder=prediction_log_decoder) | 'Filter and Convert to Dict' >> beam.ParDo( FilterPredictionToDictFn( - labels=labels, features=features, timestamp=timestamp, - filter_threshold=exec_properties['filter_threshold'])) - | 'Write Dict to BQ' >> beam.io.gcp.bigquery.WriteToBigQuery( + filter_threshold=exec_properties['filter_threshold'], + labels=labels)) + | 'Write Dict to BQ' >> beam.io.WriteToBigQuery( table=bq_table_name, - dataset=exec_properties['bq_dataset'], - project=exec_properties['gcp_project'], schema=bq_schema, additional_bq_parameters=additional_bq_parameters, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, diff --git a/tfx_addons/predictions_to_bigquery/executor_test.py b/tfx_addons/predictions_to_bigquery/executor_test.py index 38447fb4..d1f01ebc 100644 --- a/tfx_addons/predictions_to_bigquery/executor_test.py +++ b/tfx_addons/predictions_to_bigquery/executor_test.py @@ -15,7 +15,8 @@ """Tests for executor.py.""" import datetime -from typing import Mapping, Sequence, Union +import pathlib +from typing import Union from unittest import mock import apache_beam as beam @@ -35,7 +36,7 @@ def _create_tf_example( - features: Mapping[str, Union[bytes, float, int]]) -> tf.train.Example: + features: dict[str, Union[bytes, float, int]]) -> tf.train.Example: tf_features = {} for key, value in features.items(): if isinstance(value, bytes): @@ -59,8 +60,8 @@ def _create_model_spec() -> model_pb2.ModelSpec: def _create_predict_request( - features: Mapping[str, Union[bytes, float, int]] -) -> predict_pb2.PredictRequest: + features: dict[str, Union[bytes, float, + int]]) -> predict_pb2.PredictRequest: tf_example = _create_tf_example(features) request_tensor_proto = tf.make_tensor_proto( values=tf_example.SerializeToString(), dtype=tf.string, shape=(1, )) @@ -73,7 +74,7 @@ def _create_predict_request( def _create_predict_response( - values: Sequence[float]) -> predict_pb2.PredictResponse: + values: list[float]) -> predict_pb2.PredictResponse: response_tensor_proto = tf.make_tensor_proto(values=values, dtype=tf.float32, shape=(1, len(values))) @@ -103,10 +104,10 @@ def setUp(self): self.filter_threshold = 0.5 self.dofn = executor.FilterPredictionToDictFn( - labels=self.labels, features=self.features, timestamp=self.timestamp, filter_threshold=self.filter_threshold, + labels=self.labels, ) def test_process(self): @@ -138,6 +139,30 @@ def test_process_below_threshold(self): with self.assertRaises(StopIteration): _ = next(self.dofn.process(element)) + def test_process_no_labels(self): + features = { + 'bytes_feature': tf.io.FixedLenFeature([], dtype=tf.string), + } + dofn = executor.FilterPredictionToDictFn( + features=features, + timestamp=self.timestamp, + filter_threshold=self.filter_threshold, + labels=None, + ) + element = _create_prediction_log( + request=_create_predict_request(features={ + 'bytes_feature': b'a', + }), + 
response=_create_predict_response([0.9]),
+    )
+    expected = {
+        'bytes_feature': 'a',
+        'score': 0.9,
+        'datetime': mock.ANY,
+    }
+    output = next(dofn.process(element))
+    self.assertEqual(expected, output)
+
 
 def _make_artifact(uri) -> types.Artifact:
   artifact = types.Artifact(metadata_store_pb2.ArtifactType())
@@ -146,7 +171,7 @@ def _make_artifact(uri) -> types.Artifact:
 
 
 def _make_artifact_mapping(
-    data_dict: Mapping[str, str]) -> Mapping[str, Sequence[types.Artifact]]:
+    data_dict: dict[str, str]) -> dict[str, list[types.Artifact]]:
   return {k: [_make_artifact(v)] for k, v in data_dict.items()}
 
 
@@ -168,23 +193,31 @@ def setUp(self):
         'gcs_temp_dir': 'gs://bucket/temp-dir',
-        'expiration_time_delta': 1,
+        'table_expiration_days': 1,
         'filter_threshold': 0.5,
-        'table_suffix': '%Y%m%d',
+        'table_time_suffix': '%Y%m%d',
         'table_partitioning': True,
         'vocab_label_file': 'vocab_file',
     }
-    self.executor = executor.Executor()
-
+    self.enter_context(
+        mock.patch.object(executor, '_get_prediction_log_path', autospec=True))
+    self.enter_context(
+        mock.patch.object(executor,
+                          '_get_tft_output',
+                          autospec=True,
+                          return_value=object()))
+    self.enter_context(
+        mock.patch.object(executor, '_get_schema_features', autospec=True))
     self.enter_context(
         mock.patch.object(executor, '_get_labels', autospec=True))
     self.enter_context(
-        mock.patch.object(executor, '_get_bq_table_name', autospec=True))
+        mock.patch.object(executor, '_check_bq_table_name', autospec=True))
+    self.enter_context(
+        mock.patch.object(executor, '_add_bq_table_name_suffix',
+                          autospec=True))
     self.enter_context(
         mock.patch.object(executor,
                           '_get_additional_bq_parameters',
                           autospec=True))
-    self.enter_context(
-        mock.patch.object(executor, '_get_features', autospec=True))
     self.enter_context(
         mock.patch.object(executor, '_features_to_bq_schema', autospec=True))
@@ -193,15 +226,15 @@ def setUp(self):
     self.mock_pardo = self.enter_context(
         mock.patch.object(beam, 'ParDo', autospec=True))
     self.mock_write_to_bigquery = self.enter_context(
-        mock.patch.object(beam.io.gcp.bigquery,
-                          'WriteToBigQuery',
-                          autospec=True))
+        mock.patch.object(beam.io, 'WriteToBigQuery', autospec=True))
     self.enter_context(
         mock.patch.object(types.Artifact,
                           'set_string_custom_property',
                           autospec=True))
 
+    self.executor = executor.Executor()
+
   def test_Do(self):
     self.executor.Do(self.input_dict, self.output_dict, self.exec_properties)
 
@@ -215,30 +248,67 @@ def test_Do(self):
 
 class ExecutorModuleTest(parameterized.TestCase):
   """Tests for executor module-level functions."""
 
+  def test_get_prediction_log_path(self):
+    inference_results = [_make_artifact('inference_results')]
+    expected = 'inference_results/*.gz'
+    output = executor._get_prediction_log_path(inference_results)
+    self.assertEqual(expected, output)
+
+  @parameterized.named_parameters([
+      ('no_transform_graph', False),
+      ('transform_graph', True),
+  ])
+  def test_get_tft_output(self, has_transform_graph):
+    if has_transform_graph:
+      transform_graph = [_make_artifact('transform_graph')]
+      mock_tftransform_output = self.enter_context(
+          mock.patch.object(tft, 'TFTransformOutput', autospec=True))
+
+      _ = executor._get_tft_output(transform_graph)
+
+      mock_tftransform_output.assert_called_once()
+
+    else:
+      output = executor._get_tft_output(None)
+      self.assertIsNone(output)
+
   def test_get_labels(self):
     mock_tftransform_output = self.enter_context(
        mock.patch.object(tft, 'TFTransformOutput', autospec=True))
     mock_vocabulary_by_name = (
         mock_tftransform_output.return_value.vocabulary_by_name)
     mock_vocabulary_by_name.return_value = [b'a', b'b']
- - transform_output_uri = '/path/to/transform_output' vocab_file = 'vocab' + tft_output = tft.TFTransformOutput('uri') - output = executor._get_labels(transform_output_uri, vocab_file) + output = executor._get_labels(tft_output, vocab_file) self.assertEqual(['a', 'b'], output) - mock_tftransform_output.assert_called_once_with(transform_output_uri) mock_vocabulary_by_name.assert_called_once_with(vocab_file) + @parameterized.named_parameters([ + ('project_dataset_table', 'gcp_project:bq_dataset.bq_table_name', True), + ('dataset_table', 'bq_dataset.bq_table_name', True), + ('table_only', 'bq_table_name', False) + ]) + def test_check_bq_table_name(self, bq_table_name, is_ok): + if is_ok: + try: + executor._check_bq_table_name(bq_table_name) + except ValueError: + self.fail('ValueError was raised unexpectedly.') + else: + with self.assertRaises(ValueError): + executor._check_bq_table_name(bq_table_name) + @parameterized.named_parameters([('no_timestamp', None, None), ('timestamp_no_format', _TIMESTAMP, None), ('timestamp_format', _TIMESTAMP, '%Y%m%d')]) - def test_get_bq_table_name(self, timestamp, timestring_format): + def test_add_bq_table_name_suffix(self, timestamp, timestring_format): basename = 'bq_table' - output = executor._get_bq_table_name(basename, timestamp, - timestring_format) + output = executor._add_bq_table_name_suffix(basename, timestamp, + timestring_format) if timestamp is None: expected = basename @@ -258,8 +328,8 @@ def test_get_bq_table_name(self, timestamp, timestring_format): ('table_partitioning_only', None, True), ('expiration_table_partitioning', 2, True), ]) - def test_get_additiona_bq_parameters(self, expiration_days, - table_partitioning): + def test_get_additional_bq_parameters(self, expiration_days, + table_partitioning): output = executor._get_additional_bq_parameters(expiration_days, table_partitioning) @@ -278,44 +348,60 @@ def test_get_additiona_bq_parameters(self, expiration_days, } self.assertEqual(expected, output) + def test_parse_features_from_prediction_results(self): + test_data_dir = pathlib.Path( + 'tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output') + prediction_log_path = (test_data_dir / + 'BulkInferrer/inference_result/7/*.gz') + output = executor._parse_features_from_prediction_results( + str(prediction_log_path)) + self.assertIn('Culmen Depth (mm)', output) + self.assertEqual(tf.float32, output['Culmen Depth (mm)'].dtype) + @parameterized.named_parameters([ - ('error_no_input', None, None), - ('schema_uri_only', 'uri', None), - ('prediction_log_path', None, 'path'), - ('schema_uri_prediction_log_path', 'uri', 'path'), + ('error_no_inputs', False, False, False), + ('schema', True, False, False), + ('tft_output', False, True, False), + ('prediction_log_path', False, False, True), ]) - def test_get_features(self, schema_uri, prediction_log_path): - schema = { - 'feature': tf.io.FixedLenFeature([], dtype=tf.int64), - } + def test_get_schema_features(self, has_schema, has_tft_output, + has_prediction_log_path): mock_load_schema = self.enter_context( mock.patch.object(utils, 'load_schema', autospec=True, - return_value=schema)) - mock_parse_schema = self.enter_context( - mock.patch.object(utils, - 'parse_schema', - autopspec=True, - return_value=schema)) + return_value=has_schema)) + mock_raw_feature_spec = self.enter_context( + mock.patch.object(tft.TFTransformOutput, + 'raw_feature_spec', + autospec=True)) + mock_parse_features_from_prediction_results = self.enter_context( + mock.patch.object(executor, + 
'_parse_features_from_prediction_results',
+                          autospec=True,
+                          return_value=has_schema))
 
-    if schema_uri is None and prediction_log_path is None:
+    if not (has_schema or has_tft_output or has_prediction_log_path):
+      # With no schema source given, a ValueError is expected.
       with self.assertRaises(ValueError):
-        _ = executor._get_features(schema_uri=schema_uri,
-                                   prediction_log_path=prediction_log_path)
+        _ = executor._get_schema_features(schema=None, tft_output=None,
+                                          prediction_log_path=None)
+      return
 
-    else:
-      output = executor._get_features(schema_uri=schema_uri,
-                                      prediction_log_path=prediction_log_path)
+    if has_schema:
+      schema = [_make_artifact('schema_uri')]
+      _ = executor._get_schema_features(schema, None, None)
+      mock_load_schema.assert_called_once()
 
-      if schema_uri:
-        mock_load_schema.assert_called_once_with(mock.ANY)
-        mock_parse_schema.assert_not_called()
-      elif prediction_log_path:
-        mock_load_schema.assert_not_called()
-        mock_parse_schema.assert_called_once_with(prediction_log_path)
+    elif has_tft_output:
+      tft_output = tft.TFTransformOutput('uri')
+      _ = executor._get_schema_features(None, tft_output, None)
+      mock_raw_feature_spec.assert_called_once()
 
-      self.assertEqual(schema, output)
+    else:
+      prediction_log_path = 'path'
+      _ = executor._get_schema_features(None, None, prediction_log_path)
+      mock_parse_features_from_prediction_results.assert_called_once()
 
   def test_features_to_bq_schema(self):
     mock_feature_to_bq_schema = self.enter_context(
diff --git a/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/BulkInferrer/inference_result/7/prediction_logs-00000-of-00001.gz b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/BulkInferrer/inference_result/7/prediction_logs-00000-of-00001.gz
new file mode 100644
index 00000000..058b96b6
Binary files /dev/null and b/tfx_addons/predictions_to_bigquery/testdata/sample-tfx-output/BulkInferrer/inference_result/7/prediction_logs-00000-of-00001.gz differ