From 9dcdfca050fb3c2285ae85e4e1aa707f8212ac74 Mon Sep 17 00:00:00 2001 From: Ruoyu Liu Date: Thu, 24 Oct 2019 15:21:50 -0700 Subject: [PATCH] Adding modified Lab 8 that is compatible with TF 2.0 and TFX 0.15.0 --- ...8_\342\200\223_Custom_TFX_Component.ipynb" | 1371 +++++++++++++++++ 1 file changed, 1371 insertions(+) create mode 100644 "tfx_labs/Lab_8_\342\200\223_Custom_TFX_Component.ipynb" diff --git "a/tfx_labs/Lab_8_\342\200\223_Custom_TFX_Component.ipynb" "b/tfx_labs/Lab_8_\342\200\223_Custom_TFX_Component.ipynb" new file mode 100644 index 00000000..3611622f --- /dev/null +++ "b/tfx_labs/Lab_8_\342\200\223_Custom_TFX_Component.ipynb" @@ -0,0 +1,1371 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "name": "TFX Lab 8 – Custom TFX Component", + "provenance": [], + "private_outputs": true, + "collapsed_sections": [] + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3" + }, + "accelerator": "GPU" + }, + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "FSa4Ry187mLL", + "colab_type": "text" + }, + "source": [ + "##### Copyright © 2019 The TensorFlow Authors." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "7JwKPOmN2-15", + "colab_type": "code", + "colab": {} + }, + "source": [ + "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", + "# you may not use this file except in compliance with the License.\n", + "# You may obtain a copy of the License at\n", + "#\n", + "# https://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing, software\n", + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", + "# See the License for the specific language governing permissions and\n", + "# limitations under the License." + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "23R0Z9RojXYW", + "colab_type": "text" + }, + "source": [ + "# TFX – Building a Custom TFX Component\n", + "This notebook demonstrates how to build and use a custom component in your TFX pipeline. We will train an image classification model on the [UC Merced Land Use Dataset](http://weegee.vision.ucmerced.edu/datasets/landuse.html) of aerial pictures, using a custom component to perform image augmentation." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "2GivNBNYjb3b", + "colab_type": "text" + }, + "source": [ + "## Setup\n", + "First, we install the necessary packages, download data, import modules and set up paths." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "PNMMAVwGj2Sl", + "colab_type": "text" + }, + "source": [ + "### Install TFX and TensorFlow" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "9h_QV-UK3ZR1", + "colab_type": "code", + "colab": {} + }, + "source": [ + "!pip uninstall -y tensorflow\n", + "!pip uninstall -y tensorflow-estimator\n", + "!pip install -q -U tensorflow-gpu==2.0.0\n", + "!pip install -q -U tfx==0.15.0rc0\n", + "!pip install -q -U tensorflow_datasets\n", + "!pip install -q -U pyarrow==0.14.1\n", + "!pip install -q -U tensorflow-addons" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "4MXOiWAWFsoW", + "colab_type": "code", + "colab": {} + }, + "source": [ + "!pip list" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "colab_type": "text", + "id": "cZISQqjf6gyz" + }, + "source": [ + "### Import packages\n", + "We import the necessary packages, including standard TFX component classes." + ] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "7kMRHaij6gy0", + "colab": {} + }, + "source": [ + "import logging\n", + "import os\n", + "import tempfile\n", + "import urllib\n", + "\n", + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "\n", + "import tensorflow.compat.v2 as tf\n", + "tf.enable_v2_behavior()\n", + "tf.get_logger().setLevel(logging.INFO)\n", + "tf.get_logger().propagate = False\n", + "\n", + "keras = tf.keras\n", + "K = keras.backend\n", + "\n", + "import apache_beam as beam\n", + "\n", + "import tensorflow_data_validation as tfdv\n", + "import tensorflow_datasets as tfds\n", + "import tensorflow_model_analysis as tfma\n", + "from tensorflow_model_analysis.eval_saved_model.export import build_parsing_eval_input_receiver_fn\n", + "from tfx_bsl.coders import example_coder\n", + "\n", + "# tf.contrib was removed in TF 2; tensorflow-addons provides the rotate op\n", + "# instead, so fall back to it when tf.contrib is unavailable.\n", + "try:\n", + " from tensorflow.contrib.image import rotate\n", + "except ImportError:\n", + " from tensorflow_addons.image import rotate\n", + "\n", + "import tfx\n", + "\n", + "from tfx.components.base import base_component\n", + "from tfx.components.base import base_executor\n", + "from tfx.components.base import executor_spec\n", + "from tfx.components.evaluator.component import Evaluator\n", + "from tfx.components.example_gen.import_example_gen.component import ImportExampleGen\n", + "from tfx.components.example_validator.component import ExampleValidator\n", + "from tfx.components.model_validator.component import ModelValidator\n", + "from tfx.components.pusher.component import Pusher\n", + "from tfx.components.schema_gen.component import SchemaGen\n", + "from tfx.components.statistics_gen.component import StatisticsGen\n", + "from tfx.components.trainer.component import Trainer\n", + "from tfx.components.transform.component import Transform\n", + "\n", + "from tfx.orchestration import metadata\n", + "from tfx.orchestration import pipeline\n", + "from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner\n", + "from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext\n", + "\n", + "from tfx.proto import evaluator_pb2\n", + "from tfx.proto import example_gen_pb2\n", + "from tfx.proto import pusher_pb2\n", + "from tfx.proto import trainer_pb2\n", + "\n", + "from tfx.types import artifact_utils\n", + "from tfx.types import 
standard_artifacts\n", + "from tfx.types.component_spec import ChannelParameter\n", + "from tfx.types.component_spec import ExecutionParameter\n", + "\n", + "from tfx.utils.dsl_utils import external_input" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "JIoKX5nR96cz", + "colab_type": "text" + }, + "source": [ + "Check the versions:" + ] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "scyBC9jM6gy2", + "colab": {} + }, + "source": [ + "print('TensorFlow version: {}'.format(tf.__version__))\n", + "print('TFX version: {}'.format(tfx.__version__))" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "n2cMMAbSkGfX", + "colab_type": "text" + }, + "source": [ + "### Download example data\n", + "We download the sample dataset for use in our TFX pipeline. We use TFDS to load the *uc_merced* dataset for aerial image classification." + ] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "oEOFEIlX6gy7", + "colab": {} + }, + "source": [ + "train_set, ds_info = tfds.load(name=\"uc_merced\",\n", + " split=\"train\",\n", + " as_supervised=True,\n", + " with_info=True)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "7NtlqQDscWPh", + "colab_type": "code", + "colab": {} + }, + "source": [ + "ds_info" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "NIbU3lzGc71G", + "colab_type": "code", + "colab": {} + }, + "source": [ + "n_classes = ds_info.features['label'].num_classes\n", + "n_classes" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "cX4tKCUQdT-h", + "colab_type": "code", + "colab": {} + }, + "source": [ + "class_names = ds_info.features['label'].names\n", + "class_names" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "hfIiYkxSbsZh", + "colab_type": "code", + "colab": {} + }, + "source": [ + "num_rows, num_cols = 10, 5\n", + "plt.figure(figsize=(4 * num_cols, 4 * num_rows))\n", + "for index, (image, label) in enumerate(train_set.take(num_rows * num_cols)):\n", + " plt.subplot(num_rows, num_cols, index + 1)\n", + " plt.imshow(image)\n", + " plt.title(class_names[label])\n", + " plt.axis('off')\n", + "plt.show()" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "8mE3wtC15qZd", + "colab_type": "text" + }, + "source": [ + "Note that a few images are slightly smaller than 256x256:" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "q8zNfnxa5w_Q", + "colab_type": "code", + "colab": {} + }, + "source": [ + "for img, label in train_set:\n", + " if img.shape != (256, 256, 3):\n", + " print(img.shape)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "MWo6yMNAEOdT", + "colab_type": "text" + }, + "source": [ + "## Running the pipeline interactively" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "m7ptBxyFKF01", + "colab_type": "code", + "colab": {} + }, + "source": [ + "context = InteractiveContext()" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "mhpA6usY092w", + "colab_type": "code", + "colab": {} + }, + "source": [ + "HOME = os.path.expanduser('~')\n", + "examples_path = os.path.join(HOME, \"tensorflow_datasets\", \"uc_merced\", \"0.0.1\")\n", + 
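"# The TFDS version directory (\"0.0.1\" above) is an assumption for this TFDS\n", + "# release; if your copy of the dataset lives under a different version, check\n", + "# with: print(os.listdir(os.path.dirname(examples_path)))\n", + 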
"dataset = tf.data.TFRecordDataset(os.path.join(examples_path, \"uc_merced-train.tfrecord-00000-of-00001\"))\n", + "for tfrecord in dataset.take(1):\n", + " example = example_coder.ExampleToNumpyDict(tfrecord.numpy())\n", + " img = tf.io.decode_png(example['image'][0])" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "5LOiqEVNMU22", + "colab_type": "code", + "colab": {} + }, + "source": [ + "example" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "7S2vbU8zMEXH", + "colab_type": "code", + "colab": {} + }, + "source": [ + "plt.imshow(img)\n", + "plt.axis('off')\n", + "plt.show()" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "_CT0ZiEqN50o", + "colab_type": "code", + "colab": {} + }, + "source": [ + "img.shape" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "GyEUbh0e29kd", + "colab_type": "code", + "colab": {} + }, + "source": [ + "examples = external_input(examples_path)\n", + "\n", + "input_config = example_gen_pb2.Input(splits=[\n", + " example_gen_pb2.Input.Split(name='train', pattern='uc_merced-train*')])\n", + "#Or equivalently:\n", + "#input_config = tfx.components.example_gen.utils.make_default_input_config(\n", + "# split_pattern='uc_merced-train*')\n", + "\n", + "example_gen = ImportExampleGen(input_base=examples, input_config=input_config)\n", + "\n", + "context.run(example_gen)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "dqIsT6tMC1Dg", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Computes statistics over data for visualization and example validation.\n", + "statistics_gen = StatisticsGen(\n", + " input_data=example_gen.outputs['examples'])\n", + "context.run(statistics_gen)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "-7sPPh4ZDSie", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Generates schema based on statistics files.\n", + "infer_schema = SchemaGen(stats=statistics_gen.outputs['output'])\n", + "context.run(infer_schema)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "XGdkE2VjLj1u", + "colab_type": "code", + "colab": {} + }, + "source": [ + "from tensorflow_metadata.proto.v0 import schema_pb2\n", + "\n", + "train_uri = infer_schema.outputs['output'].get()[0].uri\n", + "schema_filename = os.path.join(train_uri, \"schema.pbtxt\")\n", + "schema = tfx.utils.io_utils.parse_pbtxt_file(file_name=schema_filename,\n", + " message=schema_pb2.Schema())" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "G9ONFGWSLwef", + "colab_type": "code", + "colab": {} + }, + "source": [ + "tfdv.display_schema(schema)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "nvRd3y40Env4", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Performs anomaly detection based on statistics and data schema.\n", + "validate_stats = ExampleValidator(\n", + " stats=statistics_gen.outputs['output'],\n", + " schema=infer_schema.outputs['output'])\n", + "#context.run(validate_stats) # TFDV does not support images yet" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "D9hSfui8OETe", + "colab": {} + }, + "source": [ + 
"\"\"\"Example of a custom TFX component for data augmentation.\n", + "This component along with other custom component related code will only serve as\n", + "an example and will not be supported by TFX team.\n", + "\"\"\"\n", + "class DataAugmentationComponentSpec(tfx.types.ComponentSpec):\n", + " \"\"\"ComponentSpec for custom TFX data augmentation component.\"\"\"\n", + "\n", + " PARAMETERS = {\n", + " 'max_rotation_angle': ExecutionParameter(type=float),\n", + " 'num_augmented_per_image': ExecutionParameter(type=int),\n", + " }\n", + " INPUTS = {\n", + " 'input_data': ChannelParameter(type=standard_artifacts.Examples),\n", + " }\n", + " OUTPUTS = {\n", + " 'augmented_data': ChannelParameter(type=standard_artifacts.Examples),\n", + " }\n", + "\n", + "def _dict_to_example(instance):\n", + " \"\"\"Decoded CSV to tf example.\"\"\"\n", + " feature = {}\n", + " for key, value in instance.items():\n", + " if value is None:\n", + " feature[key] = tf.train.Feature()\n", + " elif value.dtype == np.integer:\n", + " feature[key] = tf.train.Feature(\n", + " int64_list=tf.train.Int64List(value=value.tolist()))\n", + " elif value.dtype == np.float32:\n", + " feature[key] = tf.train.Feature(\n", + " float_list=tf.train.FloatList(value=value.tolist()))\n", + " else:\n", + " feature[key] = tf.train.Feature(\n", + " bytes_list=tf.train.BytesList(value=value.tolist()))\n", + " return tf.train.Example(features=tf.train.Features(feature=feature))\n", + "\n", + "\n", + "def _augment_image(example, max_rotation_angle, num_augmented_per_image):\n", + " image = tf.image.decode_png(example[\"image\"][0])\n", + " augmented_examples = [example.copy() for _ in range(num_augmented_per_image)]\n", + " for augmented_example in augmented_examples:\n", + " angle = tf.random.uniform([], -max_rotation_angle, +max_rotation_angle)\n", + " augmented_example[\"image\"] = np.array([tf.image.encode_png(\n", + " rotate(images=image, angles=angle)).numpy()])\n", + " return [example] + augmented_examples\n", + "\n", + "\n", + "class DataAugmentationExecutor(base_executor.BaseExecutor):\n", + " \"\"\"Executor for custom TFX image augmentation component.\"\"\"\n", + "\n", + " def Do(self, input_dict, output_dict, exec_properties):\n", + " \"\"\"Perform transformations to the images to augment the training set.\n", + " Args:\n", + " input_dict: Input dict from input key to a list of artifacts, including:\n", + " - input_data: transformed examples from the transform component.\n", + " output_dict: Output dict from key to a list of artifacts, including:\n", + " - augmented_data: augmented examples.\n", + " exec_properties: A dict of execution properties, including:\n", + " - max_rotation_angle: images will be rotated by a random angle between\n", + " —max_rotation_angle and +max_rotation_angle (in degrees)\n", + " - num_augmented_per_image: Number of augmented images.\n", + " Returns:\n", + " None\n", + " \"\"\"\n", + " self._log_startup(input_dict, output_dict, exec_properties)\n", + " input_examples_uri = artifact_utils.get_split_uri(\n", + " input_dict['input_data'], 'train')\n", + " output_examples_uri = artifact_utils.get_split_uri(\n", + " output_dict['augmented_data'], 'train')\n", + " with self._make_beam_pipeline() as pipeline:\n", + " raw_data = (\n", + " pipeline\n", + " | 'ReadTrainData' >> beam.io.ReadFromTFRecord(input_examples_uri)\n", + " | 'ParseExample' >> beam.Map(tfdv.TFExampleDecoder().decode)\n", + " | 'Augmentation' >> beam.ParDo(_augment_image, **exec_properties)\n", + " | 'DictToExample' >> 
beam.Map(_dict_to_example)\n", + " | 'SerializeExample' >> beam.Map(lambda x: x.SerializeToString())\n", + " | 'WriteAugmentedData' >> beam.io.WriteToTFRecord(\n", + " os.path.join(output_examples_uri, \"data_tfrecord\"),\n", + " file_name_suffix='.gz')\n", + " )\n", + " eval_input_examples_uri = artifact_utils.get_split_uri(\n", + " input_dict['input_data'], 'eval')\n", + " eval_output_examples_uri = artifact_utils.get_split_uri(\n", + " output_dict['augmented_data'], 'eval')\n", + " # The eval split is copied through unchanged: no augmentation at eval time.\n", + " with self._make_beam_pipeline() as pipeline:\n", + " raw_data = (\n", + " pipeline\n", + " | 'ReadEvalData' >> beam.io.ReadFromTFRecord(eval_input_examples_uri)\n", + " | 'WriteAugmentedData' >> beam.io.WriteToTFRecord(\n", + " os.path.join(eval_output_examples_uri, \"data_tfrecord\"),\n", + " file_name_suffix='.gz')\n", + " )\n", + "\n", + "class DataAugmentationComponent(base_component.BaseComponent):\n", + " \"\"\"Custom TFX image augmentation component.\n", + " This custom component applies random rotations to the training images to\n", + " augment the training set. The augmentation only takes place on the training\n", + " split, not during eval or serving.\n", + " \"\"\"\n", + " SPEC_CLASS = DataAugmentationComponentSpec\n", + " EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(DataAugmentationExecutor)\n", + "\n", + " def __init__(self,\n", + " input_data,\n", + " max_rotation_angle=10.,\n", + " num_augmented_per_image=2,\n", + " augmented_data=None,\n", + " instance_name=None):\n", + " \"\"\"Construct a DataAugmentationComponent.\n", + " Args:\n", + " input_data: A Channel of 'Examples' type, e.g. produced by an\n", + " ExampleGen component.\n", + " max_rotation_angle: images will be rotated by a random angle between\n", + " -max_rotation_angle and +max_rotation_angle (in degrees)\n", + " num_augmented_per_image: number of augmented images per original image,\n", + " not including the original image\n", + " augmented_data: A Channel of 'Examples' type for the output (augmented)\n", + " data.\n", + " instance_name: Optional unique instance name. 
Necessary if multiple\n", + " components of this class are declared in the same pipeline.\n", + " \"\"\"\n", + " augmented_data = augmented_data or tfx.types.Channel(\n", + " type=standard_artifacts.Examples,\n", + " artifacts=[standard_artifacts.Examples(split=\"train\"),\n", + " standard_artifacts.Examples(split=\"eval\")])\n", + " spec = DataAugmentationComponentSpec(\n", + " input_data=input_data,\n", + " max_rotation_angle=max_rotation_angle,\n", + " num_augmented_per_image=num_augmented_per_image,\n", + " augmented_data=augmented_data)\n", + " super().__init__(spec=spec, instance_name=instance_name)\n" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "I2T7buG3NXXJ", + "colab": {} + }, + "source": [ + "data_augmentation = DataAugmentationComponent(\n", + " input_data=example_gen.outputs['examples'],\n", + " max_rotation_angle=180.,\n", + " num_augmented_per_image=5)\n", + "\n", + "context.run(data_augmentation)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "ClBsJpJyNXXH", + "colab": {} + }, + "source": [ + "train_uri = data_augmentation.outputs['augmented_data'].get()[0].uri\n", + "tfrecord_filenames = [os.path.join(train_uri, name)\n", + " for name in os.listdir(train_uri)]\n", + "augmented_train_set = tf.data.TFRecordDataset(tfrecord_filenames,\n", + " compression_type=\"GZIP\")" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "BlsHJ4BqNXXE", + "colab": {} + }, + "source": [ + "num_rows, num_cols = 10, 5\n", + "plt.figure(figsize=(4 * num_cols, 4 * num_rows))\n", + "for index, tfrecord in enumerate(augmented_train_set.take(num_rows * num_cols)):\n", + " example = example_coder.ExampleToNumpyDict(tfrecord.numpy())\n", + " plt.subplot(num_rows, num_cols, index + 1)\n", + " plt.imshow(tf.image.decode_png(example[\"image\"][0]))\n", + " plt.title(class_names[example[\"label\"][0]])\n", + " plt.axis('off')\n", + "plt.show()" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "ad5JLpKbf6sN", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Set up paths.\n", + "_transform_module_file = 'uc_merced_transform.py'" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "Hoy57eb_SVQc", + "colab_type": "code", + "colab": {} + }, + "source": [ + "%%writefile {_transform_module_file}\n", + "\n", + "import tensorflow_transform as tft\n", + "import tensorflow.compat.v2 as tf\n", + "tf.enable_v2_behavior()\n", + "\n", + "LABEL_KEY = 'label'\n", + "\n", + "def transformed_name(name):\n", + " return name + '_xf'\n", + "\n", + "def preprocessing_fn(inputs):\n", + " \"\"\"tf.transform's callback function for preprocessing inputs.\n", + "\n", + " Args:\n", + " inputs: map from feature keys to raw not-yet-transformed features.\n", + "\n", + " Returns:\n", + " Map from string feature key to transformed feature operations.\n", + " \"\"\"\n", + " outputs = {}\n", + " for feature, value in inputs.items():\n", + " outputs[transformed_name(feature)] = _fill_in_missing(value)\n", + " return outputs\n", + "\n", + "def _fill_in_missing(x):\n", + " \"\"\"Replace missing values in a SparseTensor.\n", + "\n", + " Fills in missing values of `x` with '' or 0, and converts to a dense tensor.\n", + "\n", + " Args:\n", + " x: A `SparseTensor` of rank 2. 
Its dense shape should have size at most 1\n", + " in the second dimension.\n", + "\n", + " Returns:\n", + " A rank 1 tensor where missing values of `x` have been filled in.\n", + " \"\"\"\n", + " default_value = '' if x.dtype == tf.string else 0\n", + " return tf.squeeze(\n", + " tf.sparse.to_dense(\n", + " tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),\n", + " default_value),\n", + " axis=1)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "HEM3u1HvXm3s", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Performs transformations and feature engineering in training and serving.\n", + "transform = Transform(\n", + " input_data=data_augmentation.outputs['augmented_data'],\n", + " schema=infer_schema.outputs['output'],\n", + " module_file=_transform_module_file)\n", + "context.run(transform)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "4WpsuWAsSiUk", + "colab_type": "code", + "colab": {} + }, + "source": [ + "_trainer_module = 'uc_merced_trainer'\n", + "_trainer_module_file = _trainer_module + '.py'\n", + "_serving_model_dir = os.path.join(tempfile.mkdtemp(),\n", + " 'serving_model/uc_merced_simple')" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "CaFFTBBeB4wf", + "colab_type": "code", + "colab": {} + }, + "source": [ + "%%writefile {_trainer_module_file}\n", + "\n", + "import tensorflow.compat.v2 as tf\n", + "tf.enable_v2_behavior()\n", + "keras = tf.keras\n", + "import tensorflow_model_analysis as tfma\n", + "import tensorflow_transform as tft\n", + "from tensorflow_transform.tf_metadata import schema_utils\n", + "\n", + "LABEL_KEY = 'label'\n", + "DROP_FEATURES = [\"filename\"]\n", + "NUM_CLASSES = 21\n", + "\n", + "def transformed_name(name):\n", + " return name + '_xf'\n", + "\n", + "\n", + "# tf.Transform considers these features as \"raw\"\n", + "def _get_raw_feature_spec(schema):\n", + " return schema_utils.schema_as_feature_spec(schema).feature_spec\n", + "\n", + "\n", + "def _gzip_reader_fn(filenames):\n", + " \"\"\"Small utility returning a record reader that can read gzip'ed files.\"\"\"\n", + " return tf.data.TFRecordDataset(\n", + " filenames,\n", + " compression_type='GZIP')\n", + "\n", + "\n", + "@tf.function\n", + "def decode_and_resize(image):\n", + " return tf.image.resize(tf.io.decode_png(image), (256, 256))\n", + "\n", + "\n", + "@tf.function\n", + "def parse_png_images(png_images):\n", + " with tf.device(\"/cpu:0\"):\n", + " flattened = tf.reshape(png_images, [-1])\n", + " decoded = tf.map_fn(decode_and_resize, flattened, dtype=tf.float32)\n", + " reshaped = tf.reshape(decoded, [-1, 256, 256, 3])\n", + " return reshaped / 255.\n", + "\n", + "\n", + "def _build_estimator(config, num_filters=None):\n", + " \"\"\"Build an estimator for classifying uc_merced images.\n", + "\n", + " Args:\n", + " config: tf.estimator.RunConfig defining the runtime environment for the\n", + " estimator (including model_dir).\n", + " num_filters: [int], number of filters per Conv2D layer\n", + "\n", + " Returns:\n", + " The estimator that will be used for training and eval.\n", + " \"\"\"\n", + " model = keras.models.Sequential()\n", + " model.add(keras.layers.InputLayer(input_shape=[1], dtype=\"string\", name=\"image_xf\"))\n", + " model.add(keras.layers.Lambda(parse_png_images))\n", + " for filters in num_filters:\n", + " model.add(keras.layers.Conv2D(filters=filters, kernel_size=3, 
activation=\"relu\"))\n", + " model.add(keras.layers.MaxPool2D())\n", + " model.add(keras.layers.Flatten())\n", + " model.add(keras.layers.Dense(NUM_CLASSES, activation=\"softmax\"))\n", + " model.compile(loss=\"sparse_categorical_crossentropy\",\n", + " optimizer=\"adam\", metrics=[\"accuracy\"])\n", + " return tf.keras.estimator.model_to_estimator(\n", + " keras_model=model,\n", + " config=config,\n", + " custom_objects={\"parse_png_images\": parse_png_images})\n", + "\n", + "\n", + "def _example_serving_receiver_fn(tf_transform_output, schema):\n", + " \"\"\"Build the serving in inputs.\n", + "\n", + " Args:\n", + " tf_transform_output: A TFTransformOutput.\n", + " schema: the schema of the input data.\n", + "\n", + " Returns:\n", + " Tensorflow graph which parses examples, applying tf-transform to them.\n", + " \"\"\"\n", + " raw_feature_spec = _get_raw_feature_spec(schema)\n", + " raw_feature_spec.pop(LABEL_KEY)\n", + "\n", + " raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(\n", + " raw_feature_spec, default_batch_size=None)\n", + " serving_input_receiver = raw_input_fn()\n", + "\n", + " transformed_features = tf_transform_output.transform_raw_features(\n", + " serving_input_receiver.features)\n", + " for feature in DROP_FEATURES + [LABEL_KEY]:\n", + " transformed_features.pop(transformed_name(feature))\n", + "\n", + " return tf.estimator.export.ServingInputReceiver(\n", + " transformed_features, serving_input_receiver.receiver_tensors)\n", + "\n", + "\n", + "def _eval_input_receiver_fn(tf_transform_output, schema):\n", + " \"\"\"Build everything needed for the tf-model-analysis to run the model.\n", + "\n", + " Args:\n", + " tf_transform_output: A TFTransformOutput.\n", + " schema: the schema of the input data.\n", + "\n", + " Returns:\n", + " EvalInputReceiver function, which contains:\n", + " - Tensorflow graph which parses raw untransformed features, applies the\n", + " tf-transform preprocessing operators.\n", + " - Set of raw, untransformed features.\n", + " - Label against which predictions will be compared.\n", + " \"\"\"\n", + " # Notice that the inputs are raw features, not transformed features here.\n", + " raw_feature_spec = _get_raw_feature_spec(schema)\n", + "\n", + " \n", + " serialized_tf_example = tf.compat.v1.placeholder(\n", + " dtype=tf.string, shape=[None], name='input_example_tensor')\n", + "\n", + " # Add a parse_example operator to the tensorflow graph, which will parse\n", + " # raw, untransformed, tf examples.\n", + " features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)\n", + "\n", + " # Now that we have our raw examples, process them through the tf-transform\n", + " # function computed during the preprocessing step.\n", + " transformed_features = tf_transform_output.transform_raw_features(\n", + " features)\n", + "\n", + " # The key name MUST be 'examples'.\n", + " receiver_tensors = {'examples': serialized_tf_example}\n", + "\n", + " # NOTE: Model is driven by transformed features (since training works on the\n", + " # materialized output of TFT, but slicing will happen on raw features).\n", + " features.update(transformed_features)\n", + " for feature in DROP_FEATURES + [LABEL_KEY]:\n", + " if feature in features:\n", + " features.pop(feature)\n", + " if transformed_name(feature) in features:\n", + " features.pop(transformed_name(feature))\n", + " features.pop('image')\n", + " return tfma.export.EvalInputReceiver(\n", + " features=features,\n", + " receiver_tensors=receiver_tensors,\n", + " 
labels=transformed_features[transformed_name(LABEL_KEY)])\n", + "\n", + "\n", + "def _input_fn(filenames, tf_transform_output, batch_size=200):\n", + " \"\"\"Generates features and labels for training or evaluation.\n", + "\n", + " Args:\n", + " filenames: [str] list of gzipped TFRecord files to read data from.\n", + " tf_transform_output: A TFTransformOutput.\n", + " batch_size: int, first dimension size of the Tensors returned by input_fn\n", + "\n", + " Returns:\n", + " A (features, indices) tuple where features is a dictionary of\n", + " Tensors, and indices is a single Tensor of label indices.\n", + " \"\"\"\n", + " transformed_feature_spec = (\n", + " tf_transform_output.transformed_feature_spec().copy())\n", + " dataset = tf.data.experimental.make_batched_features_dataset(\n", + " filenames, batch_size, transformed_feature_spec, reader=_gzip_reader_fn)\n", + "\n", + " transformed_features = dataset.make_one_shot_iterator().get_next()\n", + "\n", + " for feature in DROP_FEATURES:\n", + " transformed_features.pop(transformed_name(feature))\n", + "\n", + " return transformed_features, transformed_features.pop(\n", + " transformed_name(LABEL_KEY))\n", + "\n", + "\n", + "# TFX will call this function\n", + "def trainer_fn(hparams, schema):\n", + " \"\"\"Build the estimator using the high level API.\n", + " Args:\n", + " hparams: Holds hyperparameters used to train the model as name/value pairs.\n", + " schema: Holds the schema of the training examples.\n", + " Returns:\n", + " A dict of the following:\n", + " - estimator: The estimator that will be used for training and eval.\n", + " - train_spec: Spec for training.\n", + " - eval_spec: Spec for eval.\n", + " - eval_input_receiver_fn: Input function for eval.\n", + " \"\"\"\n", + " train_batch_size = 40\n", + " eval_batch_size = 40\n", + " num_cnn_layers = 4\n", + " first_cnn_filters = 32\n", + "\n", + " tf_transform_output = tft.TFTransformOutput(hparams.transform_output)\n", + "\n", + " train_input_fn = lambda: _input_fn(\n", + " hparams.train_files,\n", + " tf_transform_output,\n", + " batch_size=train_batch_size)\n", + "\n", + " eval_input_fn = lambda: _input_fn(\n", + " hparams.eval_files,\n", + " tf_transform_output,\n", + " batch_size=eval_batch_size)\n", + "\n", + " train_spec = tf.estimator.TrainSpec(\n", + " train_input_fn,\n", + " max_steps=hparams.train_steps)\n", + "\n", + " serving_receiver_fn = lambda: _example_serving_receiver_fn(\n", + " tf_transform_output, schema)\n", + "\n", + " exporter = tf.estimator.FinalExporter('uc-merced', serving_receiver_fn)\n", + " eval_spec = tf.estimator.EvalSpec(\n", + " eval_input_fn,\n", + " steps=hparams.eval_steps,\n", + " exporters=[exporter],\n", + " name='uc-merced-eval')\n", + "\n", + " run_config = tf.estimator.RunConfig(\n", + " save_checkpoints_steps=999, keep_checkpoint_max=1)\n", + "\n", + " run_config = run_config.replace(model_dir=hparams.serving_model_dir)\n", + "\n", + " num_filters = [first_cnn_filters]\n", + " for layer_index in range(1, num_cnn_layers):\n", + " num_filters.append(num_filters[-1] * 2)\n", + "\n", + " estimator = _build_estimator(\n", + " config=run_config,\n", + " num_filters=num_filters)\n", + "\n", + " # Create an input receiver for TFMA processing\n", + " receiver_fn = lambda: _eval_input_receiver_fn(\n", + " tf_transform_output, schema)\n", + "\n", + " return {\n", + " 'estimator': estimator,\n", + " 'train_spec': train_spec,\n", + " 'eval_spec': eval_spec,\n", + " 'eval_input_receiver_fn': receiver_fn\n", + " }" + ], + "execution_count": 0, + "outputs": [] + }, + 
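{ + "cell_type": "markdown", + "metadata": { + "colab_type": "text" + }, + "source": [ + "Before running the `Trainer`, we can sanity-check the PNG decoding and scaling that `parse_png_images` applies inside the model's `Lambda` layer. This is a minimal sketch that reuses the `example` dict we decoded from a TFRecord earlier in this notebook:" + ] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "colab": {} + }, + "source": [ + "# A sketch of the per-image preprocessing performed by the model's Lambda layer.\n", + "# `example` was decoded from one TFRecord earlier in the notebook.\n", + "png_bytes = example['image'][0]\n", + "decoded = tf.io.decode_png(png_bytes)\n", + "resized = tf.image.resize(decoded, (256, 256)) / 255.\n", + "print(resized.shape, float(tf.reduce_min(resized)), float(tf.reduce_max(resized)))" + ], + "execution_count": 0, + "outputs": [] + }, + 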
{ + "cell_type": "code", + "metadata": { + "id": "Lzvb_f_sXfSq", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Uses user-provided Python function that implements a model using TensorFlow's\n", + "# Estimators API.\n", + "trainer = Trainer(\n", + " trainer_fn=\"{}.trainer_fn\".format(_trainer_module),\n", + " transformed_examples=transform.outputs['transformed_examples'],\n", + " schema=infer_schema.outputs['output'],\n", + " transform_output=transform.outputs['transform_output'],\n", + " train_args=trainer_pb2.TrainArgs(num_steps=200),\n", + " eval_args=trainer_pb2.EvalArgs(num_steps=100))\n", + "context.run(trainer)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "zcAop1M4cTbm", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Uses TFMA to compute a evaluation statistics over features of a model.\n", + "model_analyzer = Evaluator(\n", + " examples=example_gen.outputs.examples,\n", + " model_exports=trainer.outputs.output,\n", + " feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[\n", + " evaluator_pb2.SingleSlicingSpec(\n", + " column_for_slicing=['label_xf'])\n", + " ]))\n", + "context.run(model_analyzer)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "3LdLU9OUc4-O", + "colab_type": "code", + "colab": {} + }, + "source": [ + "evaluation_uri = model_analyzer.outputs['output'].get()[0].uri\n", + "eval_result = tfma.load_eval_result(evaluation_uri)\n", + "eval_result" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "TLVKECPicbYP", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Performs quality validation of a candidate model (compared to a baseline).\n", + "model_validator = ModelValidator(\n", + " examples=example_gen.outputs.examples, model=trainer.outputs.output)\n", + "context.run(model_validator)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "Y9uvF1k1eyQ3", + "colab_type": "code", + "colab": {} + }, + "source": [ + "blessing_uri = model_validator.outputs.blessing.get()[0].uri\n", + "!ls -l {blessing_uri}" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "ftNZByqDe-1H", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Setup serving path\n", + "_serving_model_dir = os.path.join(tempfile.mkdtemp(),\n", + " 'serving_model/uc_merced_simple')" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "-gJ1j6c4eiur", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Checks whether the model passed the validation steps and pushes the model\n", + "# to a file destination if check passed.\n", + "pusher = Pusher(\n", + " model_export=trainer.outputs.output,\n", + " model_blessing=model_validator.outputs.blessing,\n", + " push_destination=pusher_pb2.PushDestination(\n", + " filesystem=pusher_pb2.PushDestination.Filesystem(\n", + " base_directory=_serving_model_dir)))\n", + "context.run(pusher)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "colab_type": "text", + "id": "piMS6nwS6gzr" + }, + "source": [ + "## Create the pipeline using Beam orchestration" + ] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "ljp3p9Ge6gzs", + "colab": {} + }, + "source": [ + "_pipeline_name = 'uc_merced_beam'\n", + "\n", + "_pipeline_root = 
tempfile.mkdtemp(prefix='tfx-pipelines')\n", + "_pipeline_root = os.path.join(_pipeline_root, 'pipelines', _pipeline_name)\n", + "\n", + "# SQLite ML Metadata DB path.\n", + "_metadata_root = tempfile.mkdtemp(prefix='tfx-metadata')\n", + "_metadata_path = os.path.join(_metadata_root, 'metadata.db')" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "rp9j8M2s6gzt", + "colab": {} + }, + "source": [ + "def _create_pipeline(pipeline_name, pipeline_root, data_root,\n", + " transform_module_file, trainer_module_file,\n", + " serving_model_dir, metadata_path):\n", + " \"\"\"Implements the UC Merced classification pipeline with TFX.\"\"\"\n", + " examples = external_input(data_root)\n", + "\n", + " input_config = example_gen_pb2.Input(splits=[\n", + " example_gen_pb2.Input.Split(name='train', pattern='uc_merced-train*')])\n", + " #Or equivalently:\n", + " #input_config = tfx.components.example_gen.utils.make_default_input_config(\n", + " # split_pattern='uc_merced-train*')\n", + "\n", + " # Brings data into the pipeline or otherwise joins/converts training data.\n", + " example_gen = ImportExampleGen(input_base=examples, input_config=input_config)\n", + "\n", + " # Computes statistics over data for visualization and example validation.\n", + " statistics_gen = StatisticsGen(input_data=example_gen.outputs.examples)\n", + "\n", + " # Generates schema based on statistics files.\n", + " infer_schema = SchemaGen(\n", + " stats=statistics_gen.outputs.output)\n", + "\n", + " # Performs anomaly detection based on statistics and data schema.\n", + " validate_stats = ExampleValidator(\n", + " stats=statistics_gen.outputs.output, schema=infer_schema.outputs.output)\n", + "\n", + " # Performs data augmentation for training.\n", + " data_augmentation = DataAugmentationComponent(\n", + " input_data=example_gen.outputs.examples,\n", + " max_rotation_angle=180.,\n", + " num_augmented_per_image=5)\n", + "\n", + " # Performs transformations and feature engineering in training and serving.\n", + " transform = Transform(\n", + " input_data=data_augmentation.outputs.augmented_data,\n", + " schema=infer_schema.outputs.output,\n", + " module_file=transform_module_file)\n", + "\n", + " # Uses a user-provided Python function that implements a model using\n", + " # TensorFlow's Estimator API.\n", + " trainer = Trainer(\n", + " # NOTE: derives the fn path from the global _trainer_module rather than\n", + " # from the trainer_module_file argument.\n", + " trainer_fn='{}.trainer_fn'.format(_trainer_module),\n", + " transformed_examples=transform.outputs.transformed_examples,\n", + " schema=infer_schema.outputs.output,\n", + " transform_output=transform.outputs.transform_output,\n", + " train_args=trainer_pb2.TrainArgs(num_steps=200),\n", + " eval_args=trainer_pb2.EvalArgs(num_steps=100))\n", + "\n", + " # Uses TFMA to compute evaluation statistics over features of a model.\n", + " model_analyzer = Evaluator(\n", + " examples=example_gen.outputs.examples,\n", + " model_exports=trainer.outputs.output,\n", + " feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[\n", + " evaluator_pb2.SingleSlicingSpec(\n", + " column_for_slicing=['label_xf'])\n", + " ]))\n", + "\n", + " # Performs quality validation of a candidate model (compared to a baseline).\n", + " model_validator = ModelValidator(\n", + " examples=example_gen.outputs.examples, model=trainer.outputs.output)\n", + "\n", + " # Checks whether the model passed the validation steps and pushes the model\n", + " # to a file destination if the check passed.\n", + " pusher = Pusher(\n", + " model_export=trainer.outputs.output,\n",
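 + " # model_blessing below gates the push: the model is copied to the serving\n", + " # directory only if ModelValidator blessed it.\n",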
+ " model_blessing=model_validator.outputs.blessing,\n", + " push_destination=pusher_pb2.PushDestination(\n", + " filesystem=pusher_pb2.PushDestination.Filesystem(\n", + " base_directory=serving_model_dir)))\n", + "\n", + " components = [example_gen, statistics_gen, infer_schema,\n", + " # validate_stats, # Does not support images yet\n", + " data_augmentation, transform, trainer, model_analyzer,\n", + " model_validator, pusher]\n", + "\n", + " return pipeline.Pipeline(\n", + " pipeline_name=pipeline_name,\n", + " pipeline_root=pipeline_root,\n", + " components=components,\n", + " enable_cache=True,\n", + " metadata_connection_config=metadata.sqlite_metadata_connection_config(\n", + " metadata_path),\n", + " additional_pipeline_args={},\n", + " )" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "ooj_PnVV6gz0", + "colab": {} + }, + "source": [ + "from tfx.orchestration import pipeline\n", + "\n", + "uc_merced_pipeline = _create_pipeline(\n", + " pipeline_name=_pipeline_name,\n", + " pipeline_root=_pipeline_root,\n", + " data_root=examples_path,\n", + " transform_module_file=_transform_module_file,\n", + " trainer_module_file=_trainer_module_file,\n", + " serving_model_dir=_serving_model_dir,\n", + " metadata_path=_metadata_path)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "zGaj6eNe6gz3", + "colab": {} + }, + "source": [ + "BeamDagRunner().run(uc_merced_pipeline)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "xSOyzCc9y1iq", + "colab_type": "code", + "colab": {} + }, + "source": [ + "" + ], + "execution_count": 0, + "outputs": [] + } + ] +} \ No newline at end of file