From a0743c67001bbdf8d16b5f8567d4f443edf68c2d Mon Sep 17 00:00:00 2001 From: Avnish327030 Date: Fri, 15 Apr 2022 13:32:06 +0530 Subject: [PATCH 1/3] added a new component --- .gitignore | 5 ++- setup.py | 33 +++++++++-------- tfx_addons/components_addons/README.md | 33 +++++++++++++++++ tfx_addons/components_addons/demo.py | 12 ++++++ .../components_addons/types/__init__.py | 0 .../types/standard_component_specs.py | 37 +++++++++++++++++++ 6 files changed, 104 insertions(+), 16 deletions(-) create mode 100644 tfx_addons/components_addons/README.md create mode 100644 tfx_addons/components_addons/demo.py create mode 100644 tfx_addons/components_addons/types/__init__.py create mode 100644 tfx_addons/components_addons/types/standard_component_specs.py diff --git a/.gitignore b/.gitignore index a847376d..3c33eb59 100644 --- a/.gitignore +++ b/.gitignore @@ -37,4 +37,7 @@ env/* #Editor .idea/* -.vscode/* \ No newline at end of file +.vscode/* +venv/* + +tfx_addons*/ \ No newline at end of file diff --git a/setup.py b/setup.py index 1341b708..1e23d532 100644 --- a/setup.py +++ b/setup.py @@ -22,19 +22,19 @@ def get_project_version(): - # Version - extracted_version = {} - base_dir = os.path.dirname(os.path.abspath(__file__)) - with open(os.path.join(base_dir, "tfx_addons", "version.py")) as fp: - exec(fp.read(), extracted_version) # pylint: disable=exec-used + # Version + extracted_version = {} + base_dir = os.path.dirname(os.path.abspath(__file__)) + with open(os.path.join(base_dir, "tfx_addons", "version.py")) as fp: + exec(fp.read(), extracted_version) # pylint: disable=exec-used - return extracted_version + return extracted_version def get_long_description(): - base_dir = os.path.dirname(os.path.abspath(__file__)) - with open(os.path.join(base_dir, "README.md")) as fp: - return fp.read() + base_dir = os.path.dirname(os.path.abspath(__file__)) + with open(os.path.join(base_dir, "README.md")) as fp: + return fp.read() version = get_project_version() @@ -53,7 +53,7 @@ def get_long_description(): PKG_REQUIRES = { # Add dependencies here for your project. Avoid using install_requires. "mlmd_client": - [required_ml_pipelines_sdk_version, required_ml_metadata_version], + [required_ml_pipelines_sdk_version, required_ml_metadata_version], "schema_curation": [ required_tfx_version, ], @@ -66,7 +66,9 @@ def get_long_description(): "kfp>=1.8,<1.9", "slackclient>=2.9.0", "pydantic>=1.8.0", + ], + "components":["tfx==1.7.1"] } EXTRAS_REQUIRE = PKG_REQUIRES.copy() EXTRAS_REQUIRE["all"] = list( @@ -88,11 +90,12 @@ def get_long_description(): }, extras_require=EXTRAS_REQUIRE, tests_require=TESTS_REQUIRE, - packages=find_namespace_packages(include=[ - # Add here new library package - "tfx_addons", - ] + [f"tfx_addons.{m}.*" - for m in PKG_REQUIRES] + [f"tfx_addons.{m}" for m in PKG_REQUIRES]), + install_requires = ['tfx==1.6.1', 'apache-airflow'], + packages=find_namespace_packages(include=["tfx==1.7.1" + # Add here new library package + "tfx_addons", + ] + [f"tfx_addons.{m}.*" + for m in PKG_REQUIRES] + [f"tfx_addons.{m}" for m in PKG_REQUIRES]), classifiers=[ "Intended Audience :: Developers", "Intended Audience :: Education", diff --git a/tfx_addons/components_addons/README.md b/tfx_addons/components_addons/README.md new file mode 100644 index 00000000..eff1ee31 --- /dev/null +++ b/tfx_addons/components_addons/README.md @@ -0,0 +1,33 @@ +TFX RemoteZipCsvExampleGen component. + + The RemoteZipCsvExampleGen component takes zipped csv file from https or http url and generates train + and eval examples for downstream components. + + The RemoteZipCsvExampleGen encodes column values to tf.Example int/float/byte feature. + For the case when there's missing cells, the RemoteZipCsvExampleGen uses: + -- tf.train.Feature(`type`_list=tf.train.`type`List(value=[])), when the + `type` can be inferred. + -- tf.train.Feature() when it cannot infer the `type` from the column. + + Note that the type inferring will be per input split. If input isn't a single + split, users need to ensure the column types align in each pre-splits. + + For example, given the following csv rows of a split: + + header:A,B,C,D + row1: 1,,x,0.1 + row2: 2,,y,0.2 + row3: 3,,,0.3 + row4: + + The output example will be + example1: 1(int), empty feature(no type), x(string), 0.1(float) + example2: 2(int), empty feature(no type), x(string), 0.2(float) + example3: 3(int), empty feature(no type), empty list(string), 0.3(float) + + Note that the empty feature is `tf.train.Feature()` while empty list string + feature is `tf.train.Feature(bytes_list=tf.train.BytesList(value=[]))`. + + Component `outputs` contains: + - `examples`: Channel of type `standard_artifacts.Examples` for output train + and eval examples. diff --git a/tfx_addons/components_addons/demo.py b/tfx_addons/components_addons/demo.py new file mode 100644 index 00000000..49a01ad1 --- /dev/null +++ b/tfx_addons/components_addons/demo.py @@ -0,0 +1,12 @@ +from tfx_addons.components_addons.components import RemoteZipCsvExampleGen +import os + +# temp location to perform downloading and extraction +INPUT_BASE = os.getcwd() +# file url to download all the file +URL = "https://files.consumerfinance.gov/ccdb/complaints.csv.zip" + +remote_zip_csv_example_gen = RemoteZipCsvExampleGen( + input_base=INPUT_BASE, + zip_file_uri=URL +) diff --git a/tfx_addons/components_addons/types/__init__.py b/tfx_addons/components_addons/types/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tfx_addons/components_addons/types/standard_component_specs.py b/tfx_addons/components_addons/types/standard_component_specs.py new file mode 100644 index 00000000..044d7a17 --- /dev/null +++ b/tfx_addons/components_addons/types/standard_component_specs.py @@ -0,0 +1,37 @@ +from tfx.proto import example_gen_pb2 + +from tfx.proto import range_config_pb2 + +from tfx.types import standard_artifacts, standard_component_specs +from tfx.types.component_spec import ChannelParameter +from tfx.types.component_spec import ComponentSpec +from tfx.types.component_spec import ExecutionParameter + +REMOTE_ZIP_FILE_URI_KEY = "zip_file_uri" + + +class RemoteZipFileBasedExampleGenSpec(ComponentSpec): + """File-based ExampleGen component spec.""" + + PARAMETERS = { + standard_component_specs.INPUT_BASE_KEY: + ExecutionParameter(type=str), + REMOTE_ZIP_FILE_URI_KEY: + ExecutionParameter(type=str), + standard_component_specs.INPUT_CONFIG_KEY: + ExecutionParameter(type=example_gen_pb2.Input), + standard_component_specs.OUTPUT_CONFIG_KEY: + ExecutionParameter(type=example_gen_pb2.Output), + standard_component_specs.OUTPUT_DATA_FORMAT_KEY: + ExecutionParameter(type=int), # example_gen_pb2.PayloadFormat enum. + standard_component_specs.OUTPUT_FILE_FORMAT_KEY: + ExecutionParameter(type=int), # example_gen_pb2.FileFormat enum. + standard_component_specs.CUSTOM_CONFIG_KEY: + ExecutionParameter(type=example_gen_pb2.CustomConfig, optional=True), + standard_component_specs.RANGE_CONFIG_KEY: + ExecutionParameter(type=range_config_pb2.RangeConfig, optional=True), + } + INPUTS = {} + OUTPUTS = { + standard_component_specs.EXAMPLES_KEY: ChannelParameter(type=standard_artifacts.Examples), + } From ede9bd4069ed6dc7a5801d08cbdad054453344f6 Mon Sep 17 00:00:00 2001 From: Avnish327030 Date: Fri, 15 Apr 2022 13:34:21 +0530 Subject: [PATCH 2/3] updated readme file --- tfx_addons/components_addons/README.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tfx_addons/components_addons/README.md b/tfx_addons/components_addons/README.md index eff1ee31..7a557562 100644 --- a/tfx_addons/components_addons/README.md +++ b/tfx_addons/components_addons/README.md @@ -31,3 +31,20 @@ TFX RemoteZipCsvExampleGen component. Component `outputs` contains: - `examples`: Channel of type `standard_artifacts.Examples` for output train and eval examples. + +Sample example is given below for implementation +```commandline +from tfx_addons.components_addons.components import RemoteZipCsvExampleGen +import os + +# temp location to perform downloading and extraction +INPUT_BASE = os.getcwd() +# file url to download all the file +URL = "https://files.consumerfinance.gov/ccdb/complaints.csv.zip" + +remote_zip_csv_example_gen = RemoteZipCsvExampleGen( + input_base=INPUT_BASE, + zip_file_uri=URL +) + +``` \ No newline at end of file From 5d6d38c4d6538aa97907d9e59aeb86197a5d31ef Mon Sep 17 00:00:00 2001 From: Avnish327030 Date: Fri, 15 Apr 2022 19:24:47 +0530 Subject: [PATCH 3/3] added new component --- .gitignore | 2 +- .../components_addons/components/__init__.py | 2 + .../components/example_gen/__init__.py | 0 .../components/example_gen/component.py | 102 +++++++++++++ .../remote_zip_example_gen/__init__.py | 0 .../remote_zip_example_gen/component.py | 94 ++++++++++++ .../remote_zip_example_gen/executor.py | 142 ++++++++++++++++++ 7 files changed, 341 insertions(+), 1 deletion(-) create mode 100644 tfx_addons/components_addons/components/__init__.py create mode 100644 tfx_addons/components_addons/components/example_gen/__init__.py create mode 100644 tfx_addons/components_addons/components/example_gen/component.py create mode 100644 tfx_addons/components_addons/components/example_gen/remote_zip_example_gen/__init__.py create mode 100644 tfx_addons/components_addons/components/example_gen/remote_zip_example_gen/component.py create mode 100644 tfx_addons/components_addons/components/example_gen/remote_zip_example_gen/executor.py diff --git a/.gitignore b/.gitignore index 3c33eb59..2e433af7 100644 --- a/.gitignore +++ b/.gitignore @@ -40,4 +40,4 @@ env/* .vscode/* venv/* -tfx_addons*/ \ No newline at end of file +tfx_addons.egg-info/ \ No newline at end of file diff --git a/tfx_addons/components_addons/components/__init__.py b/tfx_addons/components_addons/components/__init__.py new file mode 100644 index 00000000..2ad4d5d4 --- /dev/null +++ b/tfx_addons/components_addons/components/__init__.py @@ -0,0 +1,2 @@ +from tfx_addons.components_addons.components.example_gen.remote_zip_example_gen.component import RemoteZipCsvExampleGen + diff --git a/tfx_addons/components_addons/components/example_gen/__init__.py b/tfx_addons/components_addons/components/example_gen/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tfx_addons/components_addons/components/example_gen/component.py b/tfx_addons/components_addons/components/example_gen/component.py new file mode 100644 index 00000000..ea396123 --- /dev/null +++ b/tfx_addons/components_addons/components/example_gen/component.py @@ -0,0 +1,102 @@ +from typing import Optional, Union + +from tfx import types +from tfx.components.example_gen import driver +from tfx.components.example_gen import utils +from tfx.dsl.components.base import base_beam_component +from tfx.dsl.components.base import base_beam_executor +from tfx.dsl.components.base import executor_spec +from tfx.orchestration import data_types +from tfx.proto import example_gen_pb2 +from tfx.proto import range_config_pb2 +from tfx.types import standard_artifacts +from census_consumer_complaint_types.types import RemoteZipFileBasedExampleGenSpec + + +class RemoteZipFileBasedExampleGen(base_beam_component.BaseBeamComponent): + """A TFX component to ingest examples from a file system. + + The RemoteZipFileBasedExampleGen component is an API for getting zip + file-based available at HTTP urlrecords into TFX pipelines. It consumes + external files to generate examples which will + be used by other internal components like StatisticsGen or Trainers. The + component will also convert the input data into + [tf.record](https://www.tensorflow.org/tutorials/load_data/tf_records) + and generate train and eval example splits for downstream components. + + ## Example + ``` + _taxi_root = os.path.join(os.environ['HOME'], 'taxi') + _data_root = os.path.join(_taxi_root, 'data', 'simple') + _zip_uri = "https://xyz//abz.csv.zip" + # Brings data into the pipeline or otherwise joins/converts training data. + example_gen = RemoteZipFileBasedExample(input_base=_data_root,zip_file_uri="") + ``` + + Component `outputs` contains: + - `examples`: Channel of type `standard_artifacts.Examples` for output train + and eval examples. + """ + + SPEC_CLASS = RemoteZipFileBasedExampleGenSpec + + # EXECUTOR_SPEC should be overridden by subclasses. + EXECUTOR_SPEC = executor_spec.BeamExecutorSpec( + base_beam_executor.BaseBeamExecutor) + DRIVER_CLASS = driver.FileBasedDriver + + def __init__( + self, + input_base: Optional[str] = None, + zip_file_uri: Optional[str] = None, + input_config: Optional[Union[example_gen_pb2.Input, + data_types.RuntimeParameter]] = None, + output_config: Optional[Union[example_gen_pb2.Output, + data_types.RuntimeParameter]] = None, + custom_config: Optional[Union[example_gen_pb2.CustomConfig, + data_types.RuntimeParameter]] = None, + range_config: Optional[Union[range_config_pb2.RangeConfig, + data_types.RuntimeParameter]] = None, + output_data_format: Optional[int] = example_gen_pb2.FORMAT_TF_EXAMPLE, + output_file_format: Optional[int] = example_gen_pb2.FORMAT_TFRECORDS_GZIP, + custom_executor_spec: Optional[executor_spec.ExecutorSpec] = None): + """Construct a FileBasedExampleGen component. + + Args: + input_base: an extract directory containing the CSV files after extraction of downloaded zip file. + zip_file_uri: Remote Zip file uri to download compressed zip csv file + input_config: An + [`example_gen_pb2.Input`](https://github.com/tensorflow/tfx/blob/master/tfx/proto/example_gen.proto) + instance, providing input configuration. If unset, input files will be + treated as a single split. + output_config: An example_gen_pb2.Output instance, providing the output + configuration. If unset, default splits will be 'train' and + 'eval' with size 2:1. + custom_config: An optional example_gen_pb2.CustomConfig instance, + providing custom configuration for executor. + range_config: An optional range_config_pb2.RangeConfig instance, + specifying the range of span values to consider. If unset, driver will + default to searching for latest span with no restrictions. + output_data_format: Payload format of generated data in output artifact, + one of example_gen_pb2.PayloadFormat enum. + output_file_format: File format of generated data in output artifact, + one of example_gen_pb2.FileFormat enum. + custom_executor_spec: Optional custom executor spec overriding the default + executor spec specified in the component attribute. + """ + # Configure inputs and outputs. + input_config = input_config or utils.make_default_input_config() + output_config = output_config or utils.make_default_output_config( + input_config) + example_artifacts = types.Channel(type=standard_artifacts.Examples) + spec = RemoteZipFileBasedExampleGenSpec( + input_base=input_base, + zip_file_uri=zip_file_uri, + input_config=input_config, + output_config=output_config, + custom_config=custom_config, + range_config=range_config, + output_data_format=output_data_format, + output_file_format=output_file_format, + examples=example_artifacts) + super().__init__(spec=spec, custom_executor_spec=custom_executor_spec) diff --git a/tfx_addons/components_addons/components/example_gen/remote_zip_example_gen/__init__.py b/tfx_addons/components_addons/components/example_gen/remote_zip_example_gen/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tfx_addons/components_addons/components/example_gen/remote_zip_example_gen/component.py b/tfx_addons/components_addons/components/example_gen/remote_zip_example_gen/component.py new file mode 100644 index 00000000..6a1c76ce --- /dev/null +++ b/tfx_addons/components_addons/components/example_gen/remote_zip_example_gen/component.py @@ -0,0 +1,94 @@ +# Copyright 2019 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""TFX RemoteZipCsvExampleGen component definition.""" + +from typing import Optional, Union + +from census_consumer_complaint_custom_component.example_gen.remote_zip_csv_example_gen import executor +from census_consumer_complaint_custom_component.component import RemoteZipFileBasedExampleGen +from tfx.dsl.components.base import executor_spec +from tfx.orchestration import data_types +from tfx.proto import example_gen_pb2 +from tfx.proto import range_config_pb2 + + +class RemoteZipCsvExampleGen(RemoteZipFileBasedExampleGen): # pylint: disable=protected-access + """Official TFX RemoteZipCsvExampleGen component. + + The remotezipcsv examplegen component takes zip file url of zip compressed csv data, and generates train + and eval examples for downstream components. + + The remotezipcsv examplegen encodes column values to tf.Example int/float/byte feature. + For the case when there's missing cells, the csv examplegen uses: + -- tf.train.Feature(`type`_list=tf.train.`type`List(value=[])), when the + `type` can be inferred. + -- tf.train.Feature() when it cannot infer the `type` from the column. + + Note that the type inferring will be per input split. If input isn't a single + split, users need to ensure the column types align in each pre-splits. + + For example, given the following csv rows of a split: + + header:A,B,C,D + row1: 1,,x,0.1 + row2: 2,,y,0.2 + row3: 3,,,0.3 + row4: + + The output example will be + example1: 1(int), empty feature(no type), x(string), 0.1(float) + example2: 2(int), empty feature(no type), x(string), 0.2(float) + example3: 3(int), empty feature(no type), empty list(string), 0.3(float) + + Note that the empty feature is `tf.train.Feature()` while empty list string + feature is `tf.train.Feature(bytes_list=tf.train.BytesList(value=[]))`. + + Component `outputs` contains: + - `examples`: Channel of type `standard_artifacts.Examples` for output train + and eval examples. + """ + + EXECUTOR_SPEC = executor_spec.BeamExecutorSpec(executor.Executor) + + def __init__( + self, + input_base: Optional[str] = None, + zip_file_uri: Optional[str] = None, + input_config: Optional[Union[example_gen_pb2.Input, + data_types.RuntimeParameter]] = None, + output_config: Optional[Union[example_gen_pb2.Output, + data_types.RuntimeParameter]] = None, + range_config: Optional[Union[range_config_pb2.RangeConfig, + data_types.RuntimeParameter]] = None): + """Construct a RemoteZipCsvExampleGen component. + + Args: + input_base: an extract directory containing the CSV files after extraction of downloaded zip file. + zip_file_uri: Remote Zip file uri to download compressed zip csv file + input_config: An example_gen_pb2.Input instance, providing input + configuration. If unset, the files under input_base will be treated as a + single split. + output_config: An example_gen_pb2.Output instance, providing output + configuration. If unset, default splits will be 'train' and 'eval' with + size 2:1. + range_config: An optional range_config_pb2.RangeConfig instance, + specifying the range of span values to consider. If unset, driver will + default to searching for latest span with no restrictions. + """ + super().__init__( + input_base=input_base, + zip_file_uri=zip_file_uri, + input_config=input_config, + output_config=output_config, + range_config=range_config) diff --git a/tfx_addons/components_addons/components/example_gen/remote_zip_example_gen/executor.py b/tfx_addons/components_addons/components/example_gen/remote_zip_example_gen/executor.py new file mode 100644 index 00000000..7d5cfd15 --- /dev/null +++ b/tfx_addons/components_addons/components/example_gen/remote_zip_example_gen/executor.py @@ -0,0 +1,142 @@ +# Copyright 2019 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Remote Zip csv based TFX example gen executor.""" +import datetime +import os +from typing import Any, Dict, Union +from tfx.dsl.io import fileio +from tfx.components.example_gen.csv_example_gen.executor import _CsvToExample +import pandas as pd +from absl import logging +import apache_beam as beam +import tensorflow as tf +from zipfile import ZipFile +from tfx.components.example_gen.base_example_gen_executor import BaseExampleGenExecutor +from tfx.types import standard_component_specs +from tfx_addons.components_addons.types.standard_component_specs import REMOTE_ZIP_FILE_URI_KEY + + +def download_dataset(zip_file_uri: str, download_dir: str) -> str: + """Downloads a dataset from a given uri and saves it to a Download directory. + + Args: + zip_file_uri: The uri of the dataset to download. + download_dir: The directory to save the dataset to. + + Returns: + The path to the downloaded dataset. + + Raises: + ValueError: If the dataset cannot be downloaded. + """ + + try: + print("Downloading file in ", download_dir) + # Creating download_dir if not exists + if os.path.exists(download_dir): + shutil.rmtree(download_dir) + os.makedirs(download_dir, exist_ok=True) + # Obtaining zip file path to download zip file + zip_file_path = os.path.join(download_dir, os.path.basename(zip_file_uri)) + request.urlretrieve(zip_file_uri, zip_file_path) + return zip_file_path + except Exception as e: + raise ValueError('Failed to download dataset from {}. {}'.format(zip_file_uri, e)) + + +def extract_zip_file(zip_file_path: str, extract_dir: str, zip_file_read_mode: str = "r") -> None: + """Extracts a zip file to a directory. + + Args: + zip_file_path: The path to the zip file. + output_dir: The directory to extract the zip file to. + """ + try: + print("Extracting file in ", extract_dir) + os.makedirs(extract_dir, exist_ok=True) + with ZipFile(zip_file_path, zip_file_read_mode) as zip_file: + zip_file.extractall(extract_dir) + except Exception as e: + raise e + + +@beam.ptransform_fn +@beam.typehints.with_input_types(beam.Pipeline) +@beam.typehints.with_output_types(tf.train.Example) +def _ZipToExample( # pylint: disable=invalid-name + pipeline: beam.Pipeline, exec_properties: Dict[str, Any], + split_pattern: str) -> beam.pvalue.PCollection: + """Read remote zip csv files and transform to TF examples. + + Note that each input split will be transformed by this function separately. + + Args: + pipeline: beam pipeline. + exec_properties: A dict of execution properties. + - input_base: input dir that contains Avro data. + split_pattern: Split.pattern in Input config, glob relative file pattern + that maps to input files with root directory given by input_base. + + Returns: + PCollection of TF examples. + """ + # directory to extract zip file + + input_base_uri = os.path.join(exec_properties[standard_component_specs.INPUT_BASE_KEY]) + + # remote zip file uri to download zip file + zip_file_uri = exec_properties[REMOTE_ZIP_FILE_URI_KEY] + + # downloading zip file from zip file uri into input_base_uri location + zip_file_path = download_dataset(zip_file_uri, input_base_uri) + + # extract zip files at input_base_uri + extract_zip_file(zip_file_path, input_base_uri) + os.remove(zip_file_path) + return _CsvToExample(exec_properties=exec_properties, split_pattern=split_pattern).expand(pipeline=pipeline) + + +class Executor(BaseExampleGenExecutor): + """TFX example gen executor for processing remote zip csv format. + + Data type conversion: + integer types will be converted to tf.train.Feature with tf.train.Int64List. + float types will be converted to tf.train.Feature with tf.train.FloatList. + string types will be converted to tf.train.Feature with tf.train.BytesList + and utf-8 encoding. + + Note that, + Single value will be converted to a list of that single value. + Missing value will be converted to empty tf.train.Feature(). + + For details, check the dict_to_example function in example_gen.utils. + + + Example usage: + + from tfx.components.base import executor_spec + from tfx.components.example_gen.component import + FileBasedExampleGen + from tfx.components.example_gen.custom_executors import + avro_executor + + example_gen = FileBasedExampleGen( + input_base=avro_dir_path, + custom_executor_spec=executor_spec.ExecutorClassSpec( + avro_executor.Executor)) + """ + + def GetInputSourceToExamplePTransform(self) -> beam.PTransform: + """Returns PTransform for avro to TF examples.""" + return _ZipToExample