diff --git a/tfx_labs/Lab_1_Pipeline_in_Colab.ipynb b/tfx_labs/Lab_1_Pipeline_in_Colab.ipynb index 1783d9ae..33b6eafc 100644 --- a/tfx_labs/Lab_1_Pipeline_in_Colab.ipynb +++ b/tfx_labs/Lab_1_Pipeline_in_Colab.ipynb @@ -1 +1,1704 @@ -{"cells":[{"cell_type":"markdown","metadata":{"colab_type":"text","id":"qMj8ORjK27p9"},"source":["##### Copyright © 2019 The TensorFlow Authors."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"7JwKPOmN2-15"},"outputs":[],"source":"#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n# you may not use this file except in compliance with the License.\n# You may obtain a copy of the License at\n#\n# https://www.apache.org/licenses/LICENSE-2.0\n#\n# Unless required by applicable law or agreed to in writing, software\n# distributed under the License is distributed on an \"AS IS\" BASIS,\n# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n# See the License for the specific language governing permissions and\n# limitations under the License."},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"23R0Z9RojXYW"},"source":["# TFX — Running a simple pipeline manually in a Colab Notebook\n","\n","### Running a simple pipeline manually in a Colab Notebook\n","This notebook demonstrates how to use Jupyter/Colab notebooks for TFX iterative development. Here, we walk through the Chicago Taxi example in an interactive notebook.\n","\n","Working in an interactive notebook is a useful way to become familiar with the structure of a TFX pipeline. It's also useful when doing development of your own pipelines as a lightweight development environment, but you should be aware that there are differences in the way interactive notebooks are orchestrated, and how they access metadata artifacts.\n","\n","## Orchestration\n","\n","In a production deployment of TFX you will use an orchestrator such as Apache Airflow, Kubeflow, or Apache Beam. In an interactive notebook the notebook itself is the orchestrator, running each TFX component as you execute the notebook cells.\n","\n","## Metadata\n","\n","In a production deployment of TFX you will access metadata through the ML Metadata (MLMD) API. MLMD stores metadata properties in a database such as MySQL, and stores the metadata payloads in a persistent store such as on your filesystem. In an interactive notebook, both properties and payloads are stored in the /tmp directory on the Jupyter notebook or Colab server."]},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"2GivNBNYjb3b"},"source":["## Setup\n","First, we install the necessary packages, download data, import modules and set up paths.\n","\n","### Install TFX and TensorFlow\n","\n","> #### Note\n","> Because of some of the updates to packages you must use the button at the bottom of the output of this cell to restart the runtime. 
Following restart, you should rerun this cell."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"bRbSCw-U-h_F"},"outputs":[],"source":"!pip install -U tensorflow==1.15\n!pip install -U tfx\n!pip install -q -U pyarrow==0.14.1"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"N-ePgV0Lj68Q"},"source":["### Import packages\n","We import necessary packages, including standard TFX component classes."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"YIqpWK9efviJ"},"outputs":[],"source":"import os\nimport tempfile\nimport urllib\nimport pprint\n\nimport tensorflow.compat.v2 as tf\ntf.enable_v2_behavior()\ntf.get_logger().propagate = False\npp = pprint.PrettyPrinter()\n\nimport tfx\nfrom tfx.components.evaluator.component import Evaluator\nfrom tfx.components.example_gen.csv_example_gen.component import CsvExampleGen\nfrom tfx.components.example_validator.component import ExampleValidator\nfrom tfx.components.model_validator.component import ModelValidator\nfrom tfx.components.pusher.component import Pusher\nfrom tfx.components.schema_gen.component import SchemaGen\nfrom tfx.components.statistics_gen.component import StatisticsGen\nfrom tfx.components.trainer.component import Trainer\nfrom tfx.components.transform.component import Transform\nfrom tfx.orchestration.interactive.interactive_context import InteractiveContext\nfrom tfx.proto import evaluator_pb2\nfrom tfx.proto import pusher_pb2\nfrom tfx.proto import trainer_pb2\nfrom tfx.utils.dsl_utils import csv_input\n\nfrom tensorflow.core.example import example_pb2\nfrom tensorflow_metadata.proto.v0 import anomalies_pb2\nfrom tensorflow_metadata.proto.v0 import schema_pb2\nfrom tensorflow_metadata.proto.v0 import statistics_pb2\n\nimport tensorflow_transform as tft\nimport tensorflow_model_analysis as tfma\nimport tensorflow_data_validation as tfdv"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"jlhCYXop3vcd"},"source":["Check the versions"]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"XZY7Pnoxmoe8"},"outputs":[],"source":"print('TensorFlow version: {}'.format(tf.__version__))\nprint('TFX version: {}'.format(tfx.__version__))"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"n2cMMAbSkGfX"},"source":["### Download example data\n","We download the sample dataset for use in our TFX pipeline. We're working with the [Online News Popularity](https://archive.ics.uci.edu/ml/datasets/online+news+popularity) dataset, which summarizes a heterogeneous set of features about articles published by Mashable in a period of two years. 
The goal is to predict the number of shares in social networks (popularity)."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"BywX6OUEhAqn"},"outputs":[],"source":"# Download the example data.\nDATA_PATH = 'https://raw.githubusercontent.com/ageron/open-datasets/master/' \\\n 'online_news_popularity_for_course/online_news_popularity_for_course.csv'\n_data_root = tempfile.mkdtemp(prefix='tfx-data')\n_data_filepath = os.path.join(_data_root, \"data.csv\")\nurllib.request.urlretrieve(DATA_PATH, _data_filepath)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"V5RjduOX4us-"},"source":["Take a quick look at the CSV file."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"Hqn4wST2Bex5"},"outputs":[],"source":"!head {_data_filepath}"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"8ONIE_hdkPS4"},"source":["## Create the InteractiveContext\n","\n","An interactive context is used to provide global context when running a TFX pipeline in a notebook without using a runner or orchestrator such as Apache Airflow or Kubeflow. This style of development is only useful when developing the code for a pipeline, and cannot currently be used to deploy a working pipeline to production."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"0Rh6K5sUf9dd"},"outputs":[],"source":"# Here, we create an InteractiveContext using default parameters. This will\n# use a temporary directory with an ephemeral ML Metadata database instance.\n# To use your own pipeline root or database, the optional properties\n# `pipeline_root` and `metadata_connection_config` may be passed to\n# InteractiveContext.\ncontext = InteractiveContext()"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"HdQWxfsVkzdJ"},"source":["# Run TFX Components Interactively\n","\n","---\n","\n","In the cells that follow you will construct TFX components and run each one interactively within the InteractiveContext to obtain `ExecutionResult` objects. This mirrors the process of an orchestrator running components in a TFX DAG based on when the dependencies for each component are met."]},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"L9fwt9gQk3BR"},"source":["### The ExampleGen Component\n","In any ML development process the first step when starting code development is to ingest the training and test datasets. The `ExampleGen` component brings data into the TFX pipeline.\n","\n","Create an ExampleGen component and run it."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"PyXjuMt8f-9u"},"outputs":[],"source":"# Use the packaged CSV input data.\nexamples = csv_input(_data_root)\n\n# Brings data into the pipeline or otherwise joins/converts training data.\nexample_gen = CsvExampleGen(input_base=examples)\ncontext.run(example_gen)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"0SXc2OGnDWz5"},"source":["The component's outputs include 2 artifacts: the training examples and the eval examples (by default, split 2/3 training, 1/3 eval):"]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"XoFaWckTy8pL"},"outputs":[],"source":"for artifact in example_gen.outputs['examples'].get():\n print(artifact.split, artifact.uri)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"pfzF2WObDqs6"},"source":["Take a peek at the output training examples to see what they look like.\n","\n","1. 
Get the URI of the output artifact representing the training examples, which is a directory\n","1. Get the list of files in this directory (all compressed TFRecord files), and create a `TFRecordDataset` to read these files\n","1. Iterate over the first record and decode it using a `TFExampleDecoder` to check the results"]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"EEHi7dFLzZWP"},"outputs":[],"source":"train_uri = example_gen.outputs['examples'].get()[0].uri\ntfrecord_filenames = [os.path.join(train_uri, name)\n for name in os.listdir(train_uri)]\ndataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type=\"GZIP\")\ndecoder = tfdv.TFExampleDecoder()\nfor tfrecord in dataset.take(1):\n serialized_example = tfrecord.numpy()\n example = decoder.decode(serialized_example)\n pp.pprint(example)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"csM6BFhtk5Aa"},"source":["### The StatisticsGen Component\n","\n","The `StatisticsGen` component computes descriptive statistics for your dataset. The statistics that it generates can be visualized for review, and are used for example validation and to infer a schema.\n","\n","Create a StatisticsGen component and run it."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"MAscCCYWgA-9"},"outputs":[],"source":"# Computes statistics over data for visualization and example validation.\nstatistics_gen = StatisticsGen(\n input_data=example_gen.outputs['examples'])\ncontext.run(statistics_gen)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"j5LBVkeDEvZQ"},"source":["Again, let's take a peek at the output training artifact. Note that this time it is a TFRecord file containing a single record with a serialized `DatasetFeatureStatisticsList` protobuf:"]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"fHP1HKDc3EXY"},"outputs":[],"source":"train_uri = statistics_gen.outputs['output'].get()[0].uri\ntfrecord_filenames = [os.path.join(train_uri, name)\n for name in os.listdir(train_uri)]\ndataset = tf.data.TFRecordDataset(tfrecord_filenames)\nfor tfrecord in dataset.take(1):\n serialized_example = tfrecord.numpy()\n stats = statistics_pb2.DatasetFeatureStatisticsList()\n stats.ParseFromString(serialized_example)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"GRNfT5_aFGJC"},"source":["The stats can be visualized using the `tfdv.visualize_statistics()` function (we will look at this in more detail in a subsequent lab)."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"3i_3ntm3FEcK"},"outputs":[],"source":"tfdv.visualize_statistics(stats)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"HLKLTO9Nk60p"},"source":["### The SchemaGen Component\n","\n","The `SchemaGen` component generates a schema for your data based on the statistics from StatisticsGen. 
It tries to infer the data types of each of your features, and the ranges of legal values for categorical features.\n","\n","Create a SchemaGen component and run it."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"ygQvZ6hsiQ_J"},"outputs":[],"source":"# Generates schema based on statistics files.\ninfer_schema = SchemaGen(stats=statistics_gen.outputs['output'])\ncontext.run(infer_schema)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"kdtU3u01FR-2"},"source":["The generated artifact is just a `schema.pbtxt` containing a text representation of a `schema_pb2.Schema` protobuf:"]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"L6-tgKi6A_gK"},"outputs":[],"source":"train_uri = infer_schema.outputs['output'].get()[0].uri\nschema_filename = os.path.join(train_uri, \"schema.pbtxt\")\nschema = tfx.utils.io_utils.parse_pbtxt_file(file_name=schema_filename,\n message=schema_pb2.Schema())"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"FaSgx5qIFelw"},"source":["It can be visualized using `tfdv.display_schema()` (we will look at this in more detail in a subsequent lab):"]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"gycOsJIQFhi3"},"outputs":[],"source":"tfdv.display_schema(schema)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"V1qcUuO9k9f8"},"source":["### The ExampleValidator Component\n","\n","The `ExampleValidator` performs anomaly detection, based on the statistics from StatisticsGen and the schema from SchemaGen. It looks for problems such as missing values, values of the wrong type, or categorical values outside of the domain of acceptable values.\n","\n","Create an ExampleValidator component and run it."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"XRlRUuGgiXks"},"outputs":[],"source":"# Performs anomaly detection based on statistics and data schema.\nvalidate_stats = ExampleValidator(\n stats=statistics_gen.outputs['output'],\n schema=infer_schema.outputs['output'])\ncontext.run(validate_stats)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"gWUDXqADFp5U"},"source":["The output artifact of the `ExampleValidator` is an `anomalies.pbtxt` file describing an `anomalies_pb2.Anomalies` protobuf:"]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"DMX0LCyHCKGH"},"outputs":[],"source":"train_uri = validate_stats.outputs['output'].get()[0].uri\nanomalies_filename = os.path.join(train_uri, \"anomalies.pbtxt\")\nanomalies = tfx.utils.io_utils.parse_pbtxt_file(\n file_name=anomalies_filename,\n message=anomalies_pb2.Anomalies())"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"Z-nMbzFbF22_"},"source":["This can be visualized using the `tfdv.display_anomalies()` function (we will look at this in more details in a subsequent lab). Did it find any anomalies?"]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"IGfl7-8YF2Ej"},"outputs":[],"source":"tfdv.display_anomalies(anomalies)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"JPViEz5RlA36"},"source":["### The Transform Component\n","\n","The `Transform` component performs data transformations and feature engineering. The results include an input TensorFlow graph which is used during both training and serving to preprocess the data before training or inference. 
This graph becomes part of the SavedModel that is the result of model training. Since the same input graph is used for both training and serving, the preprocessing will always be the same, and only needs to be written once.\n","\n","The Transform component requires more code than many other components because of the arbitrary complexity of the feature engineering that you may need for the data and/or model that you're working with. It requires code files to be available which define the processing needed.\n","\n","Define some constants and functions for both the `Transform` component and the `Trainer` component. Define them in a Python module, in this case saved to disk using the `%%writefile` magic command since you are working in a notebook:"]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"FvZ6OFHDG2fe"},"outputs":[],"source":"_constants_module_file = 'online_news_constants.py'"},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"_GpU9-JNXw-_"},"outputs":[],"source":"%%writefile {_constants_module_file}\n\nDENSE_FLOAT_FEATURE_KEYS = [\n \"timedelta\", \"n_tokens_title\", \"n_tokens_content\",\n \"n_unique_tokens\", \"n_non_stop_words\", \"n_non_stop_unique_tokens\",\n \"n_hrefs\", \"n_self_hrefs\", \"n_imgs\", \"n_videos\", \"average_token_length\",\n \"n_keywords\", \"kw_min_min\", \"kw_max_min\", \"kw_avg_min\", \"kw_min_max\",\n \"kw_max_max\", \"kw_avg_max\", \"kw_min_avg\", \"kw_max_avg\", \"kw_avg_avg\",\n \"self_reference_min_shares\", \"self_reference_max_shares\",\n \"self_reference_avg_shares\", \"is_weekend\", \"global_subjectivity\",\n \"global_sentiment_polarity\", \"global_rate_positive_words\",\n \"global_rate_negative_words\", \"rate_positive_words\", \"rate_negative_words\",\n \"avg_positive_polarity\", \"min_positive_polarity\", \"max_positive_polarity\",\n \"avg_negative_polarity\", \"min_negative_polarity\", \"max_negative_polarity\",\n \"title_subjectivity\", \"title_sentiment_polarity\", \"abs_title_subjectivity\",\n \"abs_title_sentiment_polarity\"]\n\nVOCAB_FEATURE_KEYS = [\"data_channel\"]\n\nBUCKET_FEATURE_KEYS = [\"LDA_00\", \"LDA_01\", \"LDA_02\", \"LDA_03\", \"LDA_04\"]\n\nCATEGORICAL_FEATURE_KEYS = [\"weekday\"]\n\n# Categorical features are assumed to each have a maximum value in the dataset.\nMAX_CATEGORICAL_FEATURE_VALUES = [6]\n\n#UNUSED: date, slug\n\nLABEL_KEY = \"n_shares\"\nVOCAB_SIZE = 10\nOOV_SIZE = 5\nFEATURE_BUCKET_COUNT = 10\n\ndef transformed_name(key):\n return key + '_xf'"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"XUYeCayFG7kH"},"source":["Now let's define a module containing the `preprocessing_fn()` function that we will pass to the `Transform` component:"]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"7uuWiQbOG9ki"},"outputs":[],"source":"_transform_module_file = 'online_news_transform.py'"},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"v3EIuVQnBfH7"},"outputs":[],"source":"%%writefile {_transform_module_file}\n\nimport tensorflow.compat.v2 as tf\ntf.enable_v2_behavior()\n\nimport tensorflow_transform as tft\nfrom online_news_constants import *\n\ndef preprocessing_fn(inputs):\n \"\"\"tf.transform's callback function for preprocessing inputs.\n\n Args:\n inputs: map from feature keys to raw not-yet-transformed features.\n\n Returns:\n Map from string feature key to transformed feature operations.\n \"\"\"\n outputs = {}\n for key in 
DENSE_FLOAT_FEATURE_KEYS:\n # Preserve this feature as a dense float, setting nan's to the mean.\n outputs[transformed_name(key)] = tft.scale_to_z_score(\n _fill_in_missing(inputs[key]))\n\n for key in VOCAB_FEATURE_KEYS:\n # Build a vocabulary for this feature.\n outputs[transformed_name(key)] = tft.compute_and_apply_vocabulary(\n _fill_in_missing(inputs[key]),\n top_k=VOCAB_SIZE,\n num_oov_buckets=OOV_SIZE)\n\n for key in BUCKET_FEATURE_KEYS:\n outputs[transformed_name(key)] = tft.bucketize(\n _fill_in_missing(inputs[key]), FEATURE_BUCKET_COUNT,\n always_return_num_quantiles=False)\n\n for key in CATEGORICAL_FEATURE_KEYS:\n outputs[transformed_name(key)] = _fill_in_missing(inputs[key])\n\n # Was this passenger a big tipper?\n outputs[transformed_name(LABEL_KEY)] = _fill_in_missing(inputs[LABEL_KEY])\n\n return outputs\n\ndef _fill_in_missing(x):\n \"\"\"Replace missing values in a SparseTensor.\n\n Fills in missing values of `x` with '' or 0, and converts to a dense tensor.\n\n Args:\n x: A `SparseTensor` of rank 2. Its dense shape should have size at most 1\n in the second dimension.\n\n Returns:\n A rank 1 tensor where missing values of `x` have been filled in.\n \"\"\"\n default_value = '' if x.dtype == tf.string else 0\n return tf.squeeze(\n tf.sparse.to_dense(\n tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),\n default_value),\n axis=1)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"eeMVMafpHHX1"},"source":["Create and run the `Transform` component, referring to the files that were created above."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"jHfhth_GiZI9"},"outputs":[],"source":"# Performs transformations and feature engineering in training and serving.\ntransform = Transform(\n input_data=example_gen.outputs['examples'],\n schema=infer_schema.outputs['output'],\n module_file=_transform_module_file)\ncontext.run(transform)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"_jbZO1ykHOeG"},"source":["The `Transform` component has 2 types of outputs:\n","* `transform_output` is the graph that can perform the preprocessing operations (this graph will be included in the serving and evaluation models).\n","* `transformed_examples` represents the preprocessed training and evaluation data."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"j4UjersvAC7p"},"outputs":[],"source":"transform.outputs"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"wRFMlRcdHlQy"},"source":["Take a peek at the `transform_output` artifact: it points to a directory containing 3 subdirectories:"]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"E4I-cqfQQvaW"},"outputs":[],"source":"train_uri = transform.outputs['transform_output'].get()[0].uri\nos.listdir(train_uri)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"9374B4RpHzor"},"source":["The `transform_fn` subdirectory contains the actual preprocessing graph. The `metadata` subdirectory contains the schema of the original data. 
The `transformed_metadata` subdirectory contains the schema of the preprocessed data.\n","\n","Take a look at some of the transformed examples and check that they are indeed processed as intended."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"2zIepQhSQoPa"},"outputs":[],"source":"train_uri = transform.outputs['transformed_examples'].get()[1].uri\ntfrecord_filenames = [os.path.join(train_uri, name)\n for name in os.listdir(train_uri)]\ndataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type=\"GZIP\")\ndecoder = tfdv.TFExampleDecoder()\nfor tfrecord in dataset.take(3):\n serialized_example = tfrecord.numpy()\n example = decoder.decode(serialized_example)\n pp.pprint(example)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"OBJFtnl6lCg9"},"source":["### The Trainer Component\n","\n","The `Trainer` component trains models using TensorFlow.\n","\n","Create a Python module containing a `trainer_fn` function, which must return an estimator. If you prefer creating a Keras model, you can do so and then convert it to an estimator using `keras.model_to_estimator()`."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"d6QNYWc6PD_h"},"outputs":[],"source":"# Setup paths.\n_trainer_module_file = 'online_news_trainer.py'"},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"CaFFTBBeB4wf"},"outputs":[],"source":"%%writefile {_trainer_module_file}\n\nimport tensorflow.compat.v2 as tf\ntf.enable_v2_behavior()\n\nimport tensorflow_model_analysis as tfma\nimport tensorflow_transform as tft\nfrom tensorflow_transform.tf_metadata import schema_utils\n\nfrom online_news_constants import *\n\n\ndef transformed_names(keys):\n return [transformed_name(key) for key in keys]\n\n\n# Tf.Transform considers these features as \"raw\"\ndef _get_raw_feature_spec(schema):\n return schema_utils.schema_as_feature_spec(schema).feature_spec\n\n\ndef _gzip_reader_fn(filenames):\n \"\"\"Small utility returning a record reader that can read gzip'ed files.\"\"\"\n return tf.data.TFRecordDataset(\n filenames,\n compression_type='GZIP')\n\n\ndef _build_estimator(config, hidden_units=None, warm_start_from=None):\n \"\"\"Build an estimator for predicting the popularity of online news articles\n\n Args:\n config: tf.estimator.RunConfig defining the runtime environment for the\n estimator (including model_dir).\n hidden_units: [int], the layer sizes of the DNN (input layer first)\n warm_start_from: Optional directory to warm start from.\n\n Returns:\n The estimator that will be used for training and eval.\n \"\"\"\n real_valued_columns = [\n tf.feature_column.numeric_column(key, shape=())\n for key in transformed_names(DENSE_FLOAT_FEATURE_KEYS)\n ]\n categorical_columns = [\n tf.feature_column.categorical_column_with_identity(\n key, num_buckets=VOCAB_SIZE + OOV_SIZE, default_value=0)\n for key in transformed_names(VOCAB_FEATURE_KEYS)\n ]\n categorical_columns += [\n tf.feature_column.categorical_column_with_identity(\n key, num_buckets=FEATURE_BUCKET_COUNT, default_value=0)\n for key in transformed_names(BUCKET_FEATURE_KEYS)\n ]\n categorical_columns += [\n tf.feature_column.categorical_column_with_identity(\n key,\n num_buckets=num_buckets,\n default_value=0) for key, num_buckets in zip(\n transformed_names(CATEGORICAL_FEATURE_KEYS),\n MAX_CATEGORICAL_FEATURE_VALUES)\n ]\n return tf.estimator.DNNLinearCombinedRegressor(\n config=config,\n linear_feature_columns=categorical_columns,\n 
dnn_feature_columns=real_valued_columns,\n dnn_hidden_units=hidden_units or [100, 70, 50, 25],\n warm_start_from=warm_start_from)\n\n\ndef _example_serving_receiver_fn(tf_transform_output, schema):\n \"\"\"Build the serving in inputs.\n\n Args:\n tf_transform_output: A TFTransformOutput.\n schema: the schema of the input data.\n\n Returns:\n Tensorflow graph which parses examples, applying tf-transform to them.\n \"\"\"\n raw_feature_spec = _get_raw_feature_spec(schema)\n raw_feature_spec.pop(LABEL_KEY)\n\n raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(\n raw_feature_spec, default_batch_size=None)\n serving_input_receiver = raw_input_fn()\n\n transformed_features = tf_transform_output.transform_raw_features(\n serving_input_receiver.features)\n\n return tf.estimator.export.ServingInputReceiver(\n transformed_features, serving_input_receiver.receiver_tensors)\n\n\ndef _eval_input_receiver_fn(tf_transform_output, schema):\n \"\"\"Build everything needed for the tf-model-analysis to run the model.\n\n Args:\n tf_transform_output: A TFTransformOutput.\n schema: the schema of the input data.\n\n Returns:\n EvalInputReceiver function, which contains:\n - Tensorflow graph which parses raw untransformed features, applies the\n tf-transform preprocessing operators.\n - Set of raw, untransformed features.\n - Label against which predictions will be compared.\n \"\"\"\n # Notice that the inputs are raw features, not transformed features here.\n raw_feature_spec = _get_raw_feature_spec(schema)\n\n raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(\n raw_feature_spec, default_batch_size=None)\n serving_input_receiver = raw_input_fn()\n\n features = serving_input_receiver.features.copy()\n transformed_features = tf_transform_output.transform_raw_features(features)\n \n # NOTE: Model is driven by transformed features (since training works on the\n # materialized output of TFT, but slicing will happen on raw features.\n features.update(transformed_features)\n\n return tfma.export.EvalInputReceiver(\n features=features,\n receiver_tensors=serving_input_receiver.receiver_tensors,\n labels=transformed_features[transformed_name(LABEL_KEY)])\n\n\ndef _input_fn(filenames, tf_transform_output, batch_size=200):\n \"\"\"Generates features and labels for training or evaluation.\n\n Args:\n filenames: [str] list of CSV files to read data from.\n tf_transform_output: A TFTransformOutput.\n batch_size: int First dimension size of the Tensors returned by input_fn\n\n Returns:\n A (features, indices) tuple where features is a dictionary of\n Tensors, and indices is a single Tensor of label indices.\n \"\"\"\n transformed_feature_spec = (\n tf_transform_output.transformed_feature_spec().copy())\n\n dataset = tf.data.experimental.make_batched_features_dataset(\n filenames, batch_size, transformed_feature_spec, reader=_gzip_reader_fn)\n\n transformed_features = dataset.make_one_shot_iterator().get_next()\n # We pop the label because we do not want to use it as a feature while we're\n # training.\n return transformed_features, transformed_features.pop(\n transformed_name(LABEL_KEY))\n\n\n# TFX will call this function\ndef trainer_fn(hparams, schema):\n \"\"\"Build the estimator using the high level API.\n Args:\n hparams: Holds hyperparameters used to train the model as name/value pairs.\n schema: Holds the schema of the training examples.\n Returns:\n A dict of the following:\n - estimator: The estimator that will be used for training and eval.\n - train_spec: Spec for 
training.\n - eval_spec: Spec for eval.\n - eval_input_receiver_fn: Input function for eval.\n \"\"\"\n # Number of nodes in the first layer of the DNN\n first_dnn_layer_size = 100\n num_dnn_layers = 4\n dnn_decay_factor = 0.7\n\n train_batch_size = 40\n eval_batch_size = 40\n\n tf_transform_output = tft.TFTransformOutput(hparams.transform_output)\n\n train_input_fn = lambda: _input_fn(\n hparams.train_files,\n tf_transform_output,\n batch_size=train_batch_size)\n\n eval_input_fn = lambda: _input_fn(\n hparams.eval_files,\n tf_transform_output,\n batch_size=eval_batch_size)\n\n train_spec = tf.estimator.TrainSpec(\n train_input_fn,\n max_steps=hparams.train_steps)\n\n serving_receiver_fn = lambda: _example_serving_receiver_fn(\n tf_transform_output, schema)\n\n exporter = tf.estimator.FinalExporter('online-news', serving_receiver_fn)\n eval_spec = tf.estimator.EvalSpec(\n eval_input_fn,\n steps=hparams.eval_steps,\n exporters=[exporter],\n name='online-news-eval')\n\n run_config = tf.estimator.RunConfig(\n save_checkpoints_steps=999, keep_checkpoint_max=1)\n\n run_config = run_config.replace(model_dir=hparams.serving_model_dir)\n\n estimator = _build_estimator(\n # Construct layers sizes with exponetial decay\n hidden_units=[\n max(2, int(first_dnn_layer_size * dnn_decay_factor**i))\n for i in range(num_dnn_layers)\n ],\n config=run_config,\n warm_start_from=hparams.warm_start_from)\n\n # Create an input receiver for TFMA processing\n receiver_fn = lambda: _eval_input_receiver_fn(\n tf_transform_output, schema)\n\n return {\n 'estimator': estimator,\n 'train_spec': train_spec,\n 'eval_spec': eval_spec,\n 'eval_input_receiver_fn': receiver_fn\n }"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"GnLjStUJIoos"},"source":["Create and run the `Trainer` component, passing it the file that we created above."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"429-vvCWibO0"},"outputs":[],"source":"# Uses user-provided Python function that implements a model using TensorFlow's\n# Estimators API.\ntrainer = Trainer(\n module_file=_trainer_module_file,\n transformed_examples=transform.outputs['transformed_examples'],\n schema=infer_schema.outputs['output'],\n transform_output=transform.outputs['transform_output'],\n train_args=trainer_pb2.TrainArgs(num_steps=10000),\n eval_args=trainer_pb2.EvalArgs(num_steps=5000))\ncontext.run(trainer)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"ktJA8On9Iui7"},"source":["Take a peek at the trained model which was exported from `Trainer`."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"qDBZG9Oso-BD"},"outputs":[],"source":"train_uri = trainer.outputs['output'].get()[0].uri\nserving_model_path = os.path.join(train_uri, 'serving_model_dir', 'export', 'online-news')\nlatest_serving_model_path = os.path.join(serving_model_path, max(os.listdir(serving_model_path)))\nexported_model = tf.saved_model.load(latest_serving_model_path)"},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"KyT3ZVGCZWsj"},"outputs":[],"source":"exported_model.graph.get_operations()[:10] + [\"...\"]"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"XpF7caML7WLB"},"source":["## Analyze Training with TensorBoard\n","\n","Use [TensorBoard](https://www.tensorflow.org/tensorboard) to analyze the model training that was done in Trainer, and see how well our model 
trained."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"bjCXDSnX7mjQ"},"outputs":[],"source":"%load_ext tensorboard"},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"IGcJtyH87m68"},"outputs":[],"source":"%tensorboard --logdir {os.path.join(train_uri, 'serving_model_dir')}"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"FmPftrv0lEQy"},"source":["### The Evaluator Component\n","\n","The `Evaluator` component analyzes model performance using the TensorFlow Model Analysis library. It runs inference requests on particular subsets of the test dataset, based on which `slices` are defined by the developer. Knowing which slices should be analyzed requires domain knowledge of what is imporant in this particular use case or domain. The slice chosen for this example is `weekday`.\n","\n","Create and run an Evaluator component."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"Zjcx8g6mihSt"},"outputs":[],"source":"# Uses TFMA to compute a evaluation statistics over features of a model.\nmodel_analyzer = Evaluator(\n examples=example_gen.outputs['examples'],\n model_exports=trainer.outputs['output'],\n feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[\n evaluator_pb2.SingleSlicingSpec(\n column_for_slicing=['weekday'])\n ]))\ncontext.run(model_analyzer)"},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"M1PVK6IhI5uS"},"outputs":[],"source":"model_analyzer.outputs"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"SuKDt7398K6f"},"source":["Use the Evaluator results to generate model performance data which can be visualized. First create evaluation input data."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"zhir0gtOI615"},"outputs":[],"source":"import csv\nfrom tensorflow import python_io\nBASE_DIR = tempfile.mkdtemp()\n\nreader = csv.DictReader(open(_data_filepath))\nexamples = []\nfor line in reader:\n example = example_pb2.Example()\n for feature in schema.feature:\n key = feature.name\n if len(line[key]) > 0:\n if feature.type == schema_pb2.FLOAT:\n example.features.feature[key].float_list.value[:] = [float(line[key])]\n elif feature.type == schema_pb2.INT:\n example.features.feature[key].int64_list.value[:] = [int(line[key])]\n elif feature.type == schema_pb2.BYTES:\n example.features.feature[key].bytes_list.value[:] = [line[key].encode('utf8')]\n else:\n if feature.type == schema_pb2.FLOAT:\n example.features.feature[key].float_list.value[:] = []\n elif feature.type == schema_pb2.INT:\n example.features.feature[key].int64_list.value[:] = []\n elif feature.type == schema_pb2.BYTES:\n example.features.feature[key].bytes_list.value[:] = []\n examples.append(example)\n\nTFRecord_file = os.path.join(BASE_DIR, 'train_data.rio')\nwith python_io.TFRecordWriter(TFRecord_file) as writer:\n for example in examples:\n writer.write(example.SerializeToString())\n writer.flush()\n writer.close()\n\n!ls {TFRecord_file}"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"tu8wdXhh8f-6"},"source":["Run the analysis of a particular slice of data."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"fIRR31vI8kVR"},"outputs":[],"source":"def run_and_render(eval_model=None, slice_list=None, slice_idx=0):\n \"\"\"Runs the model analysis and renders the slicing metrics\n\n Args:\n eval_model: An instance of tf.saved_model 
saved with evaluation data\n slice_list: A list of tfma.slicer.SingleSliceSpec giving the slices\n slice_idx: An integer index into slice_list specifying the slice to use\n\n Returns:\n A SlicingMetricsViewer object if in Jupyter notebook; None if in Colab.\n \"\"\"\n eval_result = tfma.run_model_analysis(eval_shared_model=eval_model,\n data_location=TFRecord_file,\n file_format='tfrecords',\n slice_spec=slice_list,\n output_path='sample_data',\n extractors=None)\n return tfma.view.render_slicing_metrics(eval_result, slicing_spec=slice_list[slice_idx] if slice_list else None)\n\n# Load the TFMA results for the first training run\n# This will take a minute\neval_model_base_dir_0 = os.path.join(train_uri, 'eval_model_dir')\neval_model_dir_0 = os.path.join(eval_model_base_dir_0,\n max(os.listdir(eval_model_base_dir_0)))\neval_shared_model_0 = tfma.default_eval_shared_model(\n eval_saved_model_path=eval_model_dir_0)\n\n# Slice our data by the weekday feature\nslices = [tfma.slicer.SingleSliceSpec(columns=['weekday'])]\n\nrun_and_render(eval_model=eval_shared_model_0, slice_list=slices, slice_idx=0)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"ebQUT3Tv8uLR"},"source":["Print the slicing metrics."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"ya3NKcxO8vjA"},"outputs":[],"source":"evaluation_uri = model_analyzer.outputs['output'].get()[0].uri\neval_result = tfma.load_eval_result(evaluation_uri)\n\nprint('{}\\n\\nslicing_metrics:\\n'.format(eval_result))\n\nfor metric in eval_result.slicing_metrics:\n pp.pprint(metric)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"Sxi3qGc884-J"},"source":["Examine the output data."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"NxMTME3I85ca"},"outputs":[],"source":"eval_path_uri = model_analyzer.outputs['output'].get()[0].uri\ntfrecord_filenames = [os.path.join(eval_path_uri, name)\n for name in os.listdir(eval_path_uri)]\npp.pprint(tfrecord_filenames)\ndataset = tf.data.TFRecordDataset(tfrecord_filenames)\npp.pprint(dataset)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"76Mil-7FlF_y"},"source":["### The ModelValidator Component\n","\n","The `ModelValidator` component performs validation of your candidate model compared to the previously deployed model (if any) using criteria that you define, or to a baseline value. 
If the new model scores better than the previous model it will be \"blessed\" by ModelValidator, approving it for deployment."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"FXk1MA7sijCr"},"outputs":[],"source":"# Performs quality validation of a candidate model (compared to a baseline).\nmodel_validator = ModelValidator(\n examples=example_gen.outputs['examples'],\n model=trainer.outputs['output'])\ncontext.run(model_validator)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"FVOkA2MA9FtN"},"source":["Examine the output of ModelValidator."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"U-si25tpAQ0q"},"outputs":[],"source":"model_validator.outputs"},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"Z2RWZWD-AZ-u"},"outputs":[],"source":"blessing_uri = model_validator.outputs.blessing.get()[0].uri\n!ls -l {blessing_uri}"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"T8DYekCZlHfj"},"source":["### The Pusher Component\n","\n","The `Pusher` component checks whether a model has been \"blessed\", and if so, deploys it to production by pushing the model to a well known file destination."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"KvVasBxePW-n"},"outputs":[],"source":"# Setup serving path\n_serving_model_dir = os.path.join(tempfile.mkdtemp(),\n 'serving_model/online_news_simple')"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"7nDI_54N9Sbk"},"source":["Create and run a Pusher component."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"r45nQ69eikc9"},"outputs":[],"source":"# Checks whether the model passed the validation steps and pushes the model\n# to a file destination if check passed.\npusher = Pusher(\n model_export=trainer.outputs['output'],\n model_blessing=model_validator.outputs['blessing'],\n push_destination=pusher_pb2.PushDestination(\n filesystem=pusher_pb2.PushDestination.Filesystem(\n base_directory=_serving_model_dir)))\ncontext.run(pusher)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"fj6imIJx9YGD"},"source":["Examine the output of Pusher."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"gNvMj9AWsmSt"},"outputs":[],"source":"pusher.outputs"},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"4bphCjS-B4vv"},"outputs":[],"source":"push_uri = pusher.outputs.model_push.get()[0].uri\nlatest_version = max(os.listdir(push_uri))\nlatest_version_path = os.path.join(push_uri, latest_version)\nmodel = tf.saved_model.load(latest_version_path)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"e7yifs_19iQG"},"source":["Review the model signatures and methods."]},{"cell_type":"code","execution_count":0,"metadata":{"colab":{},"colab_type":"code","id":"hQAmjZ81B8xm"},"outputs":[],"source":"for item in model.signatures.items():\n pp.pprint(item)"},{"cell_type":"markdown","metadata":{"colab_type":"text","id":"NmT39IX-9rkb"},"source":["### Pipeline Complete!\n","\n","In this example you created a TFX pipeline in a Colab notebook, using the InteractiveContext. Along the way you learned about each of the standard TFX components, but if the standard components don't meet all of your needs you can create your own custom components! 
Custom components will be covered in a later lesson."]}],"nbformat":4,"nbformat_minor":2,"metadata":{"language_info":{"name":"python","codemirror_mode":{"name":"ipython","version":3}},"orig_nbformat":2,"file_extension":".py","mimetype":"text/x-python","name":"python","npconvert_exporter":"python","pygments_lexer":"ipython3","version":3}} \ No newline at end of file +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "name": "TFX Lab 1 – Pipeline in a Colab Notebook", + "provenance": [], + "private_outputs": true, + "collapsed_sections": [ + "qMj8ORjK27p9" + ] + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3" + } + }, + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "qMj8ORjK27p9", + "colab_type": "text" + }, + "source": [ + "##### Copyright © 2019 The TensorFlow Authors." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "7JwKPOmN2-15", + "colab_type": "code", + "colab": {} + }, + "source": [ + "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", + "# you may not use this file except in compliance with the License.\n", + "# You may obtain a copy of the License at\n", + "#\n", + "# https://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing, software\n", + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", + "# See the License for the specific language governing permissions and\n", + "# limitations under the License." + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "23R0Z9RojXYW", + "colab_type": "text" + }, + "source": [ + "# TFX — Running a simple pipeline manually in a Colab Notebook\n", + "\n", + "### Running a simple pipeline manually in a Colab Notebook\n", + "This notebook demonstrates how to use Jupyter/Colab notebooks for TFX iterative development. Here, we walk through an example based on the Online News Popularity dataset in an interactive notebook.\n", + "\n", + "Working in an interactive notebook is a useful way to become familiar with the structure of a TFX pipeline. It's also useful as a lightweight development environment when developing your own pipelines, but you should be aware that there are differences in the way interactive notebooks are orchestrated, and how they access metadata artifacts.\n", + "\n", + "## Orchestration\n", + "\n", + "In a production deployment of TFX you will use an orchestrator such as Apache Airflow, Kubeflow, or Apache Beam. In an interactive notebook the notebook itself is the orchestrator, running each TFX component as you execute the notebook cells.\n", + "\n", + "## Metadata\n", + "\n", + "In a production deployment of TFX you will access metadata through the ML Metadata (MLMD) API. MLMD stores metadata properties in a database such as MySQL, and stores the metadata payloads in a persistent store such as on your filesystem. In an interactive notebook, both properties and payloads are stored in the /tmp directory on the Jupyter notebook or Colab server.",
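+        "\n", +        "For example, once the `InteractiveContext` has been created (later in this notebook) and a few components have been run, you can query this ephemeral MLMD store directly. The following is a minimal sketch, assuming the `metadata_connection_config` attribute exposed by `InteractiveContext` and the `ml-metadata` package that is installed along with TFX:\n", +        "\n", +        "```python\n", +        "# Sketch: list the artifacts recorded in the ephemeral MLMD store.\n", +        "# Assumes `context = InteractiveContext()` exists and at least one\n", +        "# component has been run with `context.run(...)`.\n", +        "from ml_metadata.metadata_store import metadata_store\n", +        "\n", +        "store = metadata_store.MetadataStore(context.metadata_connection_config)\n", +        "for artifact in store.get_artifacts():\n", +        "    print(artifact.id, artifact.type_id, artifact.uri)\n", +        "```"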
+ ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "2GivNBNYjb3b", + "colab_type": "text" + }, + "source": [ + "## Setup\n", + "First, we install the necessary packages, download the data, import modules, and set up paths.\n", + "\n", + "### Install TFX and TensorFlow\n", + "\n", + "> #### Note\n", + "> Because some of the packages are updated, you must use the button at the bottom of the output of this cell to restart the runtime. After the restart, rerun this cell." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "bRbSCw-U-h_F", + "colab_type": "code", + "colab": {} + }, + "source": [ + "!pip install -q -U tensorflow==2.0.0 tfx==0.15.0rc0 pyarrow==0.14.1\n", + "!pip freeze" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "Sywq031PC7lf", + "colab_type": "text" + }, + "source": [ + "It is necessary to restart your runtime (using the `Runtime/Restart Runtime...` menu item) so that the new package versions can be picked up." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "N-ePgV0Lj68Q", + "colab_type": "text" + }, + "source": [ + "### Import packages\n", + "We import the necessary packages, including standard TFX component classes." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "YIqpWK9efviJ", + "colab_type": "code", + "colab": {} + }, + "source": [ + "import os\n", + "import tempfile\n", + "import urllib\n", + "import pprint\n", + "\n", + "import tensorflow as tf\n", + "tf.get_logger().propagate = False\n", + "pp = pprint.PrettyPrinter()\n", + "\n", + "import tfx\n", + "from tfx.components.evaluator.component import Evaluator\n", + "from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen\n", + "from tfx.components.example_validator.component import ExampleValidator\n", + "from tfx.components.model_validator.component import ModelValidator\n", + "from tfx.components.pusher.component import Pusher\n", + "from tfx.components.schema_gen.component import SchemaGen\n", + "from tfx.components.statistics_gen.component import StatisticsGen\n", + "from tfx.components.trainer.component import Trainer\n", + "from tfx.components.transform.component import Transform\n", + "from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext\n", + "from tfx.proto import evaluator_pb2\n", + "from tfx.proto import pusher_pb2\n", + "from tfx.proto import trainer_pb2\n", + "from tfx.utils.dsl_utils import csv_input\n", + "\n", + "from tensorflow.core.example import example_pb2\n", + "from tensorflow_metadata.proto.v0 import anomalies_pb2\n", + "from tensorflow_metadata.proto.v0 import schema_pb2\n", + "from tensorflow_metadata.proto.v0 import statistics_pb2\n", + "\n", + "import tensorflow_transform as tft\n", + "import tensorflow_model_analysis as tfma\n", + "import tensorflow_data_validation as tfdv" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "jlhCYXop3vcd", + "colab_type": "text" + }, + "source": [ + "Check the versions" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "XZY7Pnoxmoe8", + "colab_type": "code", + "colab": {} + }, + "source": [ + "print('TensorFlow version: {}'.format(tf.__version__))\n", + "print('TFX version: {}'.format(tfx.__version__))" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "n2cMMAbSkGfX", + "colab_type": "text" + }, + "source": [ + "### Download example data\n", + "We download the sample dataset for 
use in our TFX pipeline. We're working with a variant of the [Online News Popularity](https://archive.ics.uci.edu/ml/datasets/online+news+popularity) dataset, which summarizes a heterogeneous set of features about articles published by Mashable in a period of two years. The goal is to predict how popular the article will be on social networks. Specifically, in the original dataset the objective was to predict the number of times each article will be shared on social networks. In this variant, the goal is to predict the article's popularity percentile. For example, if the model predicts a score of 0.7, it expects the article to receive more shares than 70% of all articles." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "BywX6OUEhAqn", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Download the example data.\n", + "DATA_PATH = 'https://raw.githubusercontent.com/ageron/open-datasets/master/' \\\n", + " 'online_news_popularity_for_course/online_news_popularity_for_course.csv'\n", + "_data_root = tempfile.mkdtemp(prefix='tfx-data')\n", + "_data_filepath = os.path.join(_data_root, \"data.csv\")\n", + "urllib.request.urlretrieve(DATA_PATH, _data_filepath)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "V5RjduOX4us-", + "colab_type": "text" + }, + "source": [ + "Take a quick look at the CSV file." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "Hqn4wST2Bex5", + "colab_type": "code", + "colab": {} + }, + "source": [ + "!head {_data_filepath}" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "8ONIE_hdkPS4", + "colab_type": "text" + }, + "source": [ + "## Create the InteractiveContext\n", + "\n", + "An interactive context is used to provide global context when running a TFX pipeline in a notebook without using a runner or orchestrator such as Apache Airflow or Kubeflow. This style of development is only useful when developing the code for a pipeline, and cannot currently be used to deploy a working pipeline to production." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "0Rh6K5sUf9dd", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Here, we create an InteractiveContext using default parameters. This will\n", + "# use a temporary directory with an ephemeral ML Metadata database instance.\n", + "# To use your own pipeline root or database, the optional properties\n", + "# `pipeline_root` and `metadata_connection_config` may be passed to\n", + "# InteractiveContext.\n", + "context = InteractiveContext()" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "HdQWxfsVkzdJ", + "colab_type": "text" + }, + "source": [ + "# Run TFX Components Interactively\n", + "\n", + "---\n", + "\n", + "In the cells that follow you will construct TFX components and run each one interactively within the InteractiveContext to obtain `ExecutionResult` objects. This mirrors the process of an orchestrator running components in a TFX DAG based on when the dependencies for each component are met." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "L9fwt9gQk3BR", + "colab_type": "text" + }, + "source": [ + "### The ExampleGen Component\n", + "In any ML development process, the first step is to ingest the training and test datasets. 
The `ExampleGen` component brings data into the TFX pipeline.\n", + "\n", + "Create an ExampleGen component and run it." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "PyXjuMt8f-9u", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Use the packaged CSV input data.\n", + "examples = csv_input(_data_root)\n", + "\n", + "# Brings data into the pipeline or otherwise joins/converts training data.\n", + "example_gen = CsvExampleGen(input_base=examples)\n", + "context.run(example_gen)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "0SXc2OGnDWz5", + "colab_type": "text" + }, + "source": [ + "The component's outputs include 2 artifacts: the training examples and the eval examples (by default, split 2/3 training, 1/3 eval):" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "XoFaWckTy8pL", + "colab_type": "code", + "colab": {} + }, + "source": [ + "for artifact in example_gen.outputs['examples'].get():\n", + " print(artifact.split, artifact.uri)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "pfzF2WObDqs6", + "colab_type": "text" + }, + "source": [ + "Take a peek at the output training examples to see what they look like.\n", + "\n", + "1. Get the URI of the output artifact representing the training examples, which is a directory\n", + "1. Get the list of files in this directory (all compressed TFRecord files), and create a `TFRecordDataset` to read these files\n", + "1. Iterate over the first record and decode it using a `TFExampleDecoder` to check the results" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "EEHi7dFLzZWP", + "colab_type": "code", + "colab": {} + }, + "source": [ + "train_uri = example_gen.outputs['examples'].get()[0].uri\n", + "tfrecord_filenames = [os.path.join(train_uri, name)\n", + " for name in os.listdir(train_uri)]\n", + "dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type=\"GZIP\")\n", + "decoder = tfdv.TFExampleDecoder()\n", + "for tfrecord in dataset.take(1):\n", + " serialized_example = tfrecord.numpy()\n", + " example = decoder.decode(serialized_example)\n", + " pp.pprint(example)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "csM6BFhtk5Aa", + "colab_type": "text" + }, + "source": [ + "### The StatisticsGen Component\n", + "\n", + "The `StatisticsGen` component computes descriptive statistics for your dataset. The statistics that it generates can be visualized for review, and are used for example validation and to infer a schema.\n", + "\n", + "Create a StatisticsGen component and run it." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "MAscCCYWgA-9", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Computes statistics over data for visualization and example validation.\n", + "statistics_gen = StatisticsGen(\n", + " input_data=example_gen.outputs['examples'])\n", + "context.run(statistics_gen)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "j5LBVkeDEvZQ", + "colab_type": "text" + }, + "source": [ + "Again, let's take a peek at the output training artifact. 
Note that this time it is a TFRecord file containing a single record with a serialized `DatasetFeatureStatisticsList` protobuf:" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "fHP1HKDc3EXY", + "colab_type": "code", + "colab": {} + }, + "source": [ + "train_uri = statistics_gen.outputs['output'].get()[0].uri\n", + "tfrecord_filenames = [os.path.join(train_uri, name)\n", + " for name in os.listdir(train_uri)]\n", + "dataset = tf.data.TFRecordDataset(tfrecord_filenames)\n", + "for tfrecord in dataset.take(1):\n", + " serialized_example = tfrecord.numpy()\n", + " stats = statistics_pb2.DatasetFeatureStatisticsList()\n", + " stats.ParseFromString(serialized_example)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "GRNfT5_aFGJC", + "colab_type": "text" + }, + "source": [ + "The stats can be visualized using the `tfdv.visualize_statistics()` function (we will look at this in more detail in a subsequent lab)." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "3i_3ntm3FEcK", + "colab_type": "code", + "colab": {} + }, + "source": [ + "tfdv.visualize_statistics(stats)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "HLKLTO9Nk60p", + "colab_type": "text" + }, + "source": [ + "### The SchemaGen Component\n", + "\n", + "The `SchemaGen` component generates a schema for your data based on the statistics from StatisticsGen. It tries to infer the data types of each of your features, and the ranges of legal values for categorical features.\n", + "\n", + "Create a SchemaGen component and run it." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "ygQvZ6hsiQ_J", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Generates schema based on statistics files.\n", + "infer_schema = SchemaGen(stats=statistics_gen.outputs['output'])\n", + "context.run(infer_schema)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "kdtU3u01FR-2", + "colab_type": "text" + }, + "source": [ + "The generated artifact is just a `schema.pbtxt` containing a text representation of a `schema_pb2.Schema` protobuf:" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "L6-tgKi6A_gK", + "colab_type": "code", + "colab": {} + }, + "source": [ + "train_uri = infer_schema.outputs['output'].get()[0].uri\n", + "schema_filename = os.path.join(train_uri, \"schema.pbtxt\")\n", + "schema = tfx.utils.io_utils.parse_pbtxt_file(file_name=schema_filename,\n", + " message=schema_pb2.Schema())" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "FaSgx5qIFelw", + "colab_type": "text" + }, + "source": [ + "It can be visualized using `tfdv.display_schema()` (we will look at this in more detail in a subsequent lab):" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "gycOsJIQFhi3", + "colab_type": "code", + "colab": {} + }, + "source": [ + "tfdv.display_schema(schema)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "V1qcUuO9k9f8", + "colab_type": "text" + }, + "source": [ + "### The ExampleValidator Component\n", + "\n", + "The `ExampleValidator` performs anomaly detection, based on the statistics from StatisticsGen and the schema from SchemaGen. 
It looks for problems such as missing values, values of the wrong type, or categorical values outside of the domain of acceptable values.\n", +        "\n", +        "Create an ExampleValidator component and run it." +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "XRlRUuGgiXks", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "# Performs anomaly detection based on statistics and data schema.\n", +        "validate_stats = ExampleValidator(\n", +        "    stats=statistics_gen.outputs['output'],\n", +        "    schema=infer_schema.outputs['output'])\n", +        "context.run(validate_stats)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "gWUDXqADFp5U", +        "colab_type": "text" +      }, +      "source": [ +        "The output artifact of the `ExampleValidator` is an `anomalies.pbtxt` file describing an `anomalies_pb2.Anomalies` protobuf:" +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "DMX0LCyHCKGH", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "train_uri = validate_stats.outputs['output'].get()[0].uri\n", +        "anomalies_filename = os.path.join(train_uri, \"anomalies.pbtxt\")\n", +        "anomalies = tfx.utils.io_utils.parse_pbtxt_file(\n", +        "    file_name=anomalies_filename,\n", +        "    message=anomalies_pb2.Anomalies())" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "Z-nMbzFbF22_", +        "colab_type": "text" +      }, +      "source": [ +        "This can be visualized using the `tfdv.display_anomalies()` function (we will look at this in more detail in a subsequent lab). Did it find any anomalies?" +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "IGfl7-8YF2Ej", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "tfdv.display_anomalies(anomalies)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "JPViEz5RlA36", +        "colab_type": "text" +      }, +      "source": [ +        "### The Transform Component\n", +        "\n", +        "The `Transform` component performs data transformations and feature engineering. The results include an input TensorFlow graph which is used during both training and serving to preprocess the data before training or inference. This graph becomes part of the SavedModel that is the result of model training. Since the same input graph is used for both training and serving, the preprocessing will always be the same, and only needs to be written once.\n", +        "\n", +        "The Transform component requires more code than many other components because of the arbitrary complexity of the feature engineering that you may need for the data and/or model that you're working with. It requires code files that define the processing needed.\n", +        "\n", +        "Define some constants and functions for both the `Transform` component and the `Trainer` component. Define them in a Python module, in this case saved to disk using the `%%writefile` magic command since you are working in a notebook."
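+      ] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "zScoreSketchMd", +        "colab_type": "text" +      }, +      "source": [ +        "One transform we will use below, `tft.scale_to_z_score`, needs a full pass over the dataset to compute each feature's mean and standard deviation before any single instance can be transformed. Here is a minimal standalone sketch of that arithmetic, using NumPy and made-up values (an illustration only, not part of the pipeline):" +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "zScoreSketchCode", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "import numpy as np\n", +        "\n", +        "# Hypothetical feature values; the analyze pass computes the statistics...\n", +        "feature = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32)\n", +        "mean, std = feature.mean(), feature.std()\n", +        "\n", +        "# ...and the transform pass applies them to every instance.\n", +        "print((feature - mean) / std)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "constantsFileMd", +        "colab_type": "text" +      }, +      "source": [ +        "First, set the filename for the constants module:"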
+ ] + }, + { + "cell_type": "code", + "metadata": { + "id": "FvZ6OFHDG2fe", + "colab_type": "code", + "colab": {} + }, + "source": [ + "_constants_module_file = 'online_news_constants.py'" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "_GpU9-JNXw-_", + "colab_type": "code", + "colab": {} + }, + "source": [ + "%%writefile {_constants_module_file}\n", + "\n", + "DENSE_FLOAT_FEATURE_KEYS = [\n", + " \"timedelta\", \"n_tokens_title\", \"n_tokens_content\",\n", + " \"n_unique_tokens\", \"n_non_stop_words\", \"n_non_stop_unique_tokens\",\n", + " \"n_hrefs\", \"n_self_hrefs\", \"n_imgs\", \"n_videos\", \"average_token_length\",\n", + " \"n_keywords\", \"kw_min_min\", \"kw_max_min\", \"kw_avg_min\", \"kw_min_max\",\n", + " \"kw_max_max\", \"kw_avg_max\", \"kw_min_avg\", \"kw_max_avg\", \"kw_avg_avg\",\n", + " \"self_reference_min_shares\", \"self_reference_max_shares\",\n", + " \"self_reference_avg_shares\", \"is_weekend\", \"global_subjectivity\",\n", + " \"global_sentiment_polarity\", \"global_rate_positive_words\",\n", + " \"global_rate_negative_words\", \"rate_positive_words\", \"rate_negative_words\",\n", + " \"avg_positive_polarity\", \"min_positive_polarity\", \"max_positive_polarity\",\n", + " \"avg_negative_polarity\", \"min_negative_polarity\", \"max_negative_polarity\",\n", + " \"title_subjectivity\", \"title_sentiment_polarity\", \"abs_title_subjectivity\",\n", + " \"abs_title_sentiment_polarity\"]\n", + "\n", + "VOCAB_FEATURE_KEYS = [\"data_channel\"]\n", + "\n", + "BUCKET_FEATURE_KEYS = [\"LDA_00\", \"LDA_01\", \"LDA_02\", \"LDA_03\", \"LDA_04\"]\n", + "\n", + "CATEGORICAL_FEATURE_KEYS = [\"weekday\"]\n", + "\n", + "# Categorical features are assumed to each have a maximum value in the dataset.\n", + "MAX_CATEGORICAL_FEATURE_VALUES = [6]\n", + "\n", + "#UNUSED: date, slug\n", + "\n", + "LABEL_KEY = \"n_shares_percentile\"\n", + "VOCAB_SIZE = 10\n", + "OOV_SIZE = 5\n", + "FEATURE_BUCKET_COUNT = 10\n", + "\n", + "def transformed_name(key):\n", + " return key + '_xf'" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "XUYeCayFG7kH", + "colab_type": "text" + }, + "source": [ + "Now let's define a module containing the `preprocessing_fn()` function that we will pass to the `Transform` component:" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "7uuWiQbOG9ki", + "colab_type": "code", + "colab": {} + }, + "source": [ + "_transform_module_file = 'online_news_transform.py'" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "v3EIuVQnBfH7", + "colab_type": "code", + "colab": {} + }, + "source": [ + "%%writefile {_transform_module_file}\n", + "\n", + "import tensorflow.compat.v2 as tf\n", + "tf.enable_v2_behavior()\n", + "\n", + "import tensorflow_transform as tft\n", + "from online_news_constants import *\n", + "\n", + "def preprocessing_fn(inputs):\n", + " \"\"\"tf.transform's callback function for preprocessing inputs.\n", + "\n", + " Args:\n", + " inputs: map from feature keys to raw not-yet-transformed features.\n", + "\n", + " Returns:\n", + " Map from string feature key to transformed feature operations.\n", + " \"\"\"\n", + " outputs = {}\n", + " for key in DENSE_FLOAT_FEATURE_KEYS:\n", + " # Preserve this feature as a dense float, setting nan's to the mean.\n", + " outputs[transformed_name(key)] = tft.scale_to_z_score(\n", + " _fill_in_missing(inputs[key]))\n", + "\n", + " for key in VOCAB_FEATURE_KEYS:\n", + " # 
Build a vocabulary for this feature.\n", + " outputs[transformed_name(key)] = tft.compute_and_apply_vocabulary(\n", + " _fill_in_missing(inputs[key]),\n", + " top_k=VOCAB_SIZE,\n", + " num_oov_buckets=OOV_SIZE)\n", + "\n", + " for key in BUCKET_FEATURE_KEYS:\n", + " outputs[transformed_name(key)] = tft.bucketize(\n", + " _fill_in_missing(inputs[key]), FEATURE_BUCKET_COUNT,\n", + " always_return_num_quantiles=False)\n", + "\n", + " for key in CATEGORICAL_FEATURE_KEYS:\n", + " outputs[transformed_name(key)] = _fill_in_missing(inputs[key])\n", + "\n", + " # How popular is this article?\n", + " outputs[transformed_name(LABEL_KEY)] = _fill_in_missing(inputs[LABEL_KEY])\n", + "\n", + " return outputs\n", + "\n", + "def _fill_in_missing(x):\n", + " \"\"\"Replace missing values in a SparseTensor.\n", + "\n", + " Fills in missing values of `x` with '' or 0, and converts to a dense tensor.\n", + "\n", + " Args:\n", + " x: A `SparseTensor` of rank 2. Its dense shape should have size at most 1\n", + " in the second dimension.\n", + "\n", + " Returns:\n", + " A rank 1 tensor where missing values of `x` have been filled in.\n", + " \"\"\"\n", + " default_value = '' if x.dtype == tf.string else 0\n", + " return tf.squeeze(\n", + " tf.sparse.to_dense(\n", + " tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),\n", + " default_value),\n", + " axis=1)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "eeMVMafpHHX1", + "colab_type": "text" + }, + "source": [ + "Create and run the `Transform` component, referring to the files that were created above." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "jHfhth_GiZI9", + "colab_type": "code", + "colab": {} + }, + "source": [ + "# Performs transformations and feature engineering in training and serving.\n", + "transform = Transform(\n", + " input_data=example_gen.outputs['examples'],\n", + " schema=infer_schema.outputs['output'],\n", + " module_file=_transform_module_file)\n", + "context.run(transform)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "_jbZO1ykHOeG", + "colab_type": "text" + }, + "source": [ + "The `Transform` component has 2 types of outputs:\n", + "* `transform_output` is the graph that can perform the preprocessing operations (this graph will be included in the serving and evaluation models).\n", + "* `transformed_examples` represents the preprocessed training and evaluation data." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "j4UjersvAC7p", + "colab_type": "code", + "colab": {} + }, + "source": [ + "transform.outputs" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "wRFMlRcdHlQy", + "colab_type": "text" + }, + "source": [ + "Take a peek at the `transform_output` artifact: it points to a directory containing 3 subdirectories:" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "E4I-cqfQQvaW", + "colab_type": "code", + "colab": {} + }, + "source": [ + "train_uri = transform.outputs['transform_output'].get()[0].uri\n", + "os.listdir(train_uri)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "9374B4RpHzor", + "colab_type": "text" + }, + "source": [ + "The `transform_fn` subdirectory contains the actual preprocessing graph. The `metadata` subdirectory contains the schema of the original data. 
The `transformed_metadata` subdirectory contains the schema of the preprocessed data.\n", +        "\n", +        "Take a look at some of the transformed examples and check that they are indeed processed as intended." +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "2zIepQhSQoPa", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "train_uri = transform.outputs['transformed_examples'].get()[1].uri\n", +        "tfrecord_filenames = [os.path.join(train_uri, name)\n", +        "                      for name in os.listdir(train_uri)]\n", +        "dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type=\"GZIP\")\n", +        "decoder = tfdv.TFExampleDecoder()\n", +        "for tfrecord in dataset.take(3):\n", +        "  serialized_example = tfrecord.numpy()\n", +        "  example = decoder.decode(serialized_example)\n", +        "  pp.pprint(example)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "OBJFtnl6lCg9", +        "colab_type": "text" +      }, +      "source": [ +        "### The Trainer Component\n", +        "\n", +        "The `Trainer` component trains models using TensorFlow.\n", +        "\n", +        "Create a Python module containing a `trainer_fn` function, which must return an estimator. If you prefer creating a Keras model, you can do so and then convert it to an estimator using `tf.keras.estimator.model_to_estimator()`." +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "d6QNYWc6PD_h", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "# Setup paths.\n", +        "_trainer_module_file = 'online_news_trainer.py'" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "CaFFTBBeB4wf", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "%%writefile {_trainer_module_file}\n", +        "\n", +        "import tensorflow.compat.v2 as tf\n", +        "tf.enable_v2_behavior()\n", +        "\n", +        "import tensorflow_model_analysis as tfma\n", +        "import tensorflow_transform as tft\n", +        "from tensorflow_transform.tf_metadata import schema_utils\n", +        "\n", +        "from online_news_constants import *\n", +        "\n", +        "\n", +        "def transformed_names(keys):\n", +        "  return [transformed_name(key) for key in keys]\n", +        "\n", +        "\n", +        "# Tf.Transform considers these features as \"raw\"\n", +        "def _get_raw_feature_spec(schema):\n", +        "  return schema_utils.schema_as_feature_spec(schema).feature_spec\n", +        "\n", +        "\n", +        "def _gzip_reader_fn(filenames):\n", +        "  \"\"\"Small utility returning a record reader that can read gzip'ed files.\"\"\"\n", +        "  return tf.data.TFRecordDataset(\n", +        "      filenames,\n", +        "      compression_type='GZIP')\n", +        "\n", +        "\n", +        "def _build_estimator(config, hidden_units=None, warm_start_from=None):\n", +        "  \"\"\"Build an estimator for predicting the popularity of online news articles.\n", +        "\n", +        "  Args:\n", +        "    config: tf.estimator.RunConfig defining the runtime environment for the\n", +        "      estimator (including model_dir).\n", +        "    hidden_units: [int], the layer sizes of the DNN (input layer first)\n", +        "    warm_start_from: Optional directory to warm start from.\n", +        "\n", +        "  Returns:\n", +        "    The estimator that will be used for training and eval.\n", +        "  \"\"\"\n", +        "  real_valued_columns = [\n", +        "      tf.feature_column.numeric_column(key, shape=())\n", +        "      for key in transformed_names(DENSE_FLOAT_FEATURE_KEYS)\n", +        "  ]\n", +        "  categorical_columns = [\n", +        "      tf.feature_column.categorical_column_with_identity(\n", +        "          key, num_buckets=VOCAB_SIZE + OOV_SIZE, default_value=0)\n", +        "      for key in transformed_names(VOCAB_FEATURE_KEYS)\n", +        "  ]\n", +        "  categorical_columns += [\n", +        "      
tf.feature_column.categorical_column_with_identity(\n", +        "          key, num_buckets=FEATURE_BUCKET_COUNT, default_value=0)\n", +        "      for key in transformed_names(BUCKET_FEATURE_KEYS)\n", +        "  ]\n", +        "  categorical_columns += [\n", +        "      tf.feature_column.categorical_column_with_identity(\n", +        "          key,\n", +        "          num_buckets=num_buckets,\n", +        "          default_value=0) for key, num_buckets in zip(\n", +        "              transformed_names(CATEGORICAL_FEATURE_KEYS),\n", +        "              MAX_CATEGORICAL_FEATURE_VALUES)\n", +        "  ]\n", +        "  return tf.estimator.DNNLinearCombinedRegressor(\n", +        "      config=config,\n", +        "      linear_feature_columns=categorical_columns,\n", +        "      dnn_feature_columns=real_valued_columns,\n", +        "      dnn_hidden_units=hidden_units or [100, 70, 50, 25],\n", +        "      warm_start_from=warm_start_from)\n", +        "\n", +        "\n", +        "def _example_serving_receiver_fn(tf_transform_output, schema):\n", +        "  \"\"\"Build the serving inputs.\n", +        "\n", +        "  Args:\n", +        "    tf_transform_output: A TFTransformOutput.\n", +        "    schema: the schema of the input data.\n", +        "\n", +        "  Returns:\n", +        "    Tensorflow graph which parses examples, applying tf-transform to them.\n", +        "  \"\"\"\n", +        "  raw_feature_spec = _get_raw_feature_spec(schema)\n", +        "  raw_feature_spec.pop(LABEL_KEY)\n", +        "\n", +        "  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(\n", +        "      raw_feature_spec, default_batch_size=None)\n", +        "  serving_input_receiver = raw_input_fn()\n", +        "\n", +        "  transformed_features = tf_transform_output.transform_raw_features(\n", +        "      serving_input_receiver.features)\n", +        "\n", +        "  return tf.estimator.export.ServingInputReceiver(\n", +        "      transformed_features, serving_input_receiver.receiver_tensors)\n", +        "\n", +        "\n", +        "def _eval_input_receiver_fn(tf_transform_output, schema):\n", +        "  \"\"\"Build everything needed for the tf-model-analysis to run the model.\n", +        "\n", +        "  Args:\n", +        "    tf_transform_output: A TFTransformOutput.\n", +        "    schema: the schema of the input data.\n", +        "\n", +        "  Returns:\n", +        "    EvalInputReceiver function, which contains:\n", +        "      - Tensorflow graph which parses raw untransformed features, applies the\n", +        "        tf-transform preprocessing operators.\n", +        "      - Set of raw, untransformed features.\n", +        "      - Label against which predictions will be compared.\n", +        "  \"\"\"\n", +        "  # Notice that the inputs are raw features, not transformed features here.\n", +        "  raw_feature_spec = _get_raw_feature_spec(schema)\n", +        "\n", +        "  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(\n", +        "      raw_feature_spec, default_batch_size=None)\n", +        "  serving_input_receiver = raw_input_fn()\n", +        "\n", +        "  features = serving_input_receiver.features.copy()\n", +        "  transformed_features = tf_transform_output.transform_raw_features(features)\n", +        "  \n", +        "  # NOTE: Model is driven by transformed features (since training works on the\n", +        "  # materialized output of TFT), but slicing will happen on raw features.\n", +        "  features.update(transformed_features)\n", +        "\n", +        "  return tfma.export.EvalInputReceiver(\n", +        "      features=features,\n", +        "      receiver_tensors=serving_input_receiver.receiver_tensors,\n", +        "      labels=transformed_features[transformed_name(LABEL_KEY)])\n", +        "\n", +        "\n", +        "def _input_fn(filenames, tf_transform_output, batch_size=200):\n", +        "  \"\"\"Generates features and labels for training or evaluation.\n", +        "\n", +        "  Args:\n", +        "    filenames: [str] list of TFRecord files to read data from.\n", +        "    tf_transform_output: A TFTransformOutput.\n", +        "    batch_size: int First dimension size of the Tensors returned 
by input_fn\n", +        "\n", +        "  Returns:\n", +        "    A (features, indices) tuple where features is a dictionary of\n", +        "      Tensors, and indices is a single Tensor of label indices.\n", +        "  \"\"\"\n", +        "  transformed_feature_spec = (\n", +        "      tf_transform_output.transformed_feature_spec().copy())\n", +        "\n", +        "  dataset = tf.data.experimental.make_batched_features_dataset(\n", +        "      filenames, batch_size, transformed_feature_spec, reader=_gzip_reader_fn)\n", +        "\n", +        "  transformed_features = dataset.make_one_shot_iterator().get_next()\n", +        "  # We pop the label because we do not want to use it as a feature while we're\n", +        "  # training.\n", +        "  return transformed_features, transformed_features.pop(\n", +        "      transformed_name(LABEL_KEY))\n", +        "\n", +        "\n", +        "# TFX will call this function\n", +        "def trainer_fn(hparams, schema):\n", +        "  \"\"\"Build the estimator using the high level API.\n", +        "  Args:\n", +        "    hparams: Holds hyperparameters used to train the model as name/value pairs.\n", +        "    schema: Holds the schema of the training examples.\n", +        "  Returns:\n", +        "    A dict of the following:\n", +        "      - estimator: The estimator that will be used for training and eval.\n", +        "      - train_spec: Spec for training.\n", +        "      - eval_spec: Spec for eval.\n", +        "      - eval_input_receiver_fn: Input function for eval.\n", +        "  \"\"\"\n", +        "  # Number of nodes in the first layer of the DNN\n", +        "  first_dnn_layer_size = 100\n", +        "  num_dnn_layers = 4\n", +        "  dnn_decay_factor = 0.7\n", +        "\n", +        "  train_batch_size = 40\n", +        "  eval_batch_size = 40\n", +        "\n", +        "  tf_transform_output = tft.TFTransformOutput(hparams.transform_output)\n", +        "\n", +        "  train_input_fn = lambda: _input_fn(\n", +        "      hparams.train_files,\n", +        "      tf_transform_output,\n", +        "      batch_size=train_batch_size)\n", +        "\n", +        "  eval_input_fn = lambda: _input_fn(\n", +        "      hparams.eval_files,\n", +        "      tf_transform_output,\n", +        "      batch_size=eval_batch_size)\n", +        "\n", +        "  train_spec = tf.estimator.TrainSpec(\n", +        "      train_input_fn,\n", +        "      max_steps=hparams.train_steps)\n", +        "\n", +        "  serving_receiver_fn = lambda: _example_serving_receiver_fn(\n", +        "      tf_transform_output, schema)\n", +        "\n", +        "  exporter = tf.estimator.FinalExporter('online-news', serving_receiver_fn)\n", +        "  eval_spec = tf.estimator.EvalSpec(\n", +        "      eval_input_fn,\n", +        "      steps=hparams.eval_steps,\n", +        "      exporters=[exporter],\n", +        "      name='online-news-eval')\n", +        "\n", +        "  run_config = tf.estimator.RunConfig(\n", +        "      save_checkpoints_steps=999, keep_checkpoint_max=1)\n", +        "\n", +        "  run_config = run_config.replace(model_dir=hparams.serving_model_dir)\n", +        "\n", +        "  estimator = _build_estimator(\n", +        "      # Construct layer sizes with exponential decay\n", +        "      hidden_units=[\n", +        "          max(2, int(first_dnn_layer_size * dnn_decay_factor**i))\n", +        "          for i in range(num_dnn_layers)\n", +        "      ],\n", +        "      config=run_config,\n", +        "      warm_start_from=hparams.warm_start_from)\n", +        "\n", +        "  # Create an input receiver for TFMA processing\n", +        "  receiver_fn = lambda: _eval_input_receiver_fn(\n", +        "      tf_transform_output, schema)\n", +        "\n", +        "  return {\n", +        "      'estimator': estimator,\n", +        "      'train_spec': train_spec,\n", +        "      'eval_spec': eval_spec,\n", +        "      'eval_input_receiver_fn': receiver_fn\n", +        "  }" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "GnLjStUJIoos", +        "colab_type": "text" +      }, +      "source": [ +        "Create and run the `Trainer` component, passing it the file that we created above."
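+      ] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "decaySketchMd", +        "colab_type": "text" +      }, +      "source": [ +        "Before running it, as a quick aside, here is the arithmetic behind the exponentially decaying hidden-layer sizes that `trainer_fn` computes (a standalone sketch of the same expression; the constants are copied from the module above):" +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "decaySketchCode", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "first_dnn_layer_size = 100\n", +        "num_dnn_layers = 4\n", +        "dnn_decay_factor = 0.7\n", +        "\n", +        "# Mirrors the hidden_units expression in trainer_fn above.\n", +        "print([max(2, int(first_dnn_layer_size * dnn_decay_factor**i))\n", +        "       for i in range(num_dnn_layers)])" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "runTrainerMd", +        "colab_type": "text" +      }, +      "source": [ +        "Now run the Trainer:"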
+      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "429-vvCWibO0", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "# Uses user-provided Python function that implements a model using TensorFlow's\n", +        "# Estimators API.\n", +        "trainer = Trainer(\n", +        "    module_file=_trainer_module_file,\n", +        "    transformed_examples=transform.outputs['transformed_examples'],\n", +        "    schema=infer_schema.outputs['output'],\n", +        "    transform_output=transform.outputs['transform_output'],\n", +        "    train_args=trainer_pb2.TrainArgs(num_steps=10000),\n", +        "    eval_args=trainer_pb2.EvalArgs(num_steps=5000))\n", +        "context.run(trainer)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "ktJA8On9Iui7", +        "colab_type": "text" +      }, +      "source": [ +        "Take a peek at the trained model which was exported from `Trainer`." +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "qDBZG9Oso-BD", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "train_uri = trainer.outputs['output'].get()[0].uri\n", +        "serving_model_path = os.path.join(train_uri, 'serving_model_dir', 'export', 'online-news')\n", +        "latest_serving_model_path = os.path.join(serving_model_path, max(os.listdir(serving_model_path)))\n", +        "exported_model = tf.saved_model.load(latest_serving_model_path)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "KyT3ZVGCZWsj", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "exported_model.graph.get_operations()[:10] + [\"...\"]" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "XpF7caML7WLB", +        "colab_type": "text" +      }, +      "source": [ +        "## Analyze Training with TensorBoard\n", +        "\n", +        "Use [TensorBoard](https://www.tensorflow.org/tensorboard) to analyze the model training that was done in Trainer, and see how well our model trained." +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "bjCXDSnX7mjQ", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "%load_ext tensorboard" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "IGcJtyH87m68", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "%tensorboard --logdir {os.path.join(train_uri, 'serving_model_dir')}" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "FmPftrv0lEQy", +        "colab_type": "text" +      }, +      "source": [ +        "### The Evaluator Component\n", +        "\n", +        "The `Evaluator` component analyzes model performance using the TensorFlow Model Analysis library. It runs inference requests on particular subsets of the test dataset, based on the `slices` defined by the developer. Knowing which slices should be analyzed requires domain knowledge of what is important in this particular use case or domain. The slice chosen for this example is `weekday`.\n", +        "\n", +        "Create and run an Evaluator component."
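+      ] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "sliceSpecMd", +        "colab_type": "text" +      }, +      "source": [ +        "For reference, a `FeatureSlicingSpec` can hold several slice definitions at once. A hypothetical spec that slices by `weekday` alone and also by `weekday` crossed with `data_channel` would look like this (a sketch only; the Evaluator below uses just the single `weekday` slice):" +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "sliceSpecCode", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "# Hypothetical multi-slice spec; not used by the Evaluator below.\n", +        "multi_slicing_spec = evaluator_pb2.FeatureSlicingSpec(specs=[\n", +        "    evaluator_pb2.SingleSlicingSpec(column_for_slicing=['weekday']),\n", +        "    evaluator_pb2.SingleSlicingSpec(\n", +        "        column_for_slicing=['weekday', 'data_channel'])\n", +        "])\n", +        "print(multi_slicing_spec)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "runEvaluatorMd", +        "colab_type": "text" +      }, +      "source": [ +        "Now create and run the Evaluator with the single `weekday` slice:"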
+      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "Zjcx8g6mihSt", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "# Uses TFMA to compute evaluation statistics over features of a model.\n", +        "model_analyzer = Evaluator(\n", +        "    examples=example_gen.outputs['examples'],\n", +        "    model_exports=trainer.outputs['output'],\n", +        "    feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[\n", +        "        evaluator_pb2.SingleSlicingSpec(\n", +        "            column_for_slicing=['weekday'])\n", +        "    ]))\n", +        "context.run(model_analyzer)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "M1PVK6IhI5uS", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "model_analyzer.outputs" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "SuKDt7398K6f", +        "colab_type": "text" +      }, +      "source": [ +        "Use the Evaluator results to generate model performance data which can be visualized. First create evaluation input data." +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "zhir0gtOI615", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "import csv\n", +        "BASE_DIR = tempfile.mkdtemp()\n", +        "\n", +        "reader = csv.DictReader(open(_data_filepath))\n", +        "examples = []\n", +        "for line in reader:\n", +        "  example = example_pb2.Example()\n", +        "  for feature in schema.feature:\n", +        "    key = feature.name\n", +        "    if len(line[key]) > 0:\n", +        "      if feature.type == schema_pb2.FLOAT:\n", +        "        example.features.feature[key].float_list.value[:] = [float(line[key])]\n", +        "      elif feature.type == schema_pb2.INT:\n", +        "        example.features.feature[key].int64_list.value[:] = [int(line[key])]\n", +        "      elif feature.type == schema_pb2.BYTES:\n", +        "        example.features.feature[key].bytes_list.value[:] = [line[key].encode('utf8')]\n", +        "    else:\n", +        "      if feature.type == schema_pb2.FLOAT:\n", +        "        example.features.feature[key].float_list.value[:] = []\n", +        "      elif feature.type == schema_pb2.INT:\n", +        "        example.features.feature[key].int64_list.value[:] = []\n", +        "      elif feature.type == schema_pb2.BYTES:\n", +        "        example.features.feature[key].bytes_list.value[:] = []\n", +        "  examples.append(example)\n", +        "\n", +        "TFRecord_file = os.path.join(BASE_DIR, 'train_data.rio')\n", +        "# The with-block flushes and closes the writer when it exits.\n", +        "with tf.io.TFRecordWriter(TFRecord_file) as writer:\n", +        "  for example in examples:\n", +        "    writer.write(example.SerializeToString())\n", +        "\n", +        "!ls {TFRecord_file}" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "tu8wdXhh8f-6", +        "colab_type": "text" +      }, +      "source": [ +        "Run the analysis of a particular slice of data."
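+      ] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "recordCheckMd", +        "colab_type": "text" +      }, +      "source": [ +        "Before doing so, optionally read back the first record of the file written above to confirm that it parses (a quick sanity check, not required by the pipeline):" +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "recordCheckCode", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "# Read back one serialized tf.Example from the eval input file.\n", +        "check_dataset = tf.data.TFRecordDataset(TFRecord_file)\n", +        "for raw_record in check_dataset.take(1):\n", +        "  parsed = example_pb2.Example()\n", +        "  parsed.ParseFromString(raw_record.numpy())\n", +        "  print(sorted(parsed.features.feature)[:5])  # first few feature keys" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "runAnalysisMd", +        "colab_type": "text" +      }, +      "source": [ +        "Now run the analysis:"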
+      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "fIRR31vI8kVR", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "def run_and_render(eval_model=None, slice_list=None, slice_idx=0):\n", +        "  \"\"\"Runs the model analysis and renders the slicing metrics.\n", +        "\n", +        "  Args:\n", +        "    eval_model: An instance of tf.saved_model saved with evaluation data\n", +        "    slice_list: A list of tfma.slicer.SingleSliceSpec giving the slices\n", +        "    slice_idx: An integer index into slice_list specifying the slice to use\n", +        "\n", +        "  Returns:\n", +        "    A SlicingMetricsViewer object if in Jupyter notebook; None if in Colab.\n", +        "  \"\"\"\n", +        "  eval_result = tfma.run_model_analysis(eval_shared_model=eval_model,\n", +        "                                        data_location=TFRecord_file,\n", +        "                                        file_format='tfrecords',\n", +        "                                        slice_spec=slice_list,\n", +        "                                        output_path='sample_data',\n", +        "                                        extractors=None)\n", +        "  return tfma.view.render_slicing_metrics(eval_result, slicing_spec=slice_list[slice_idx] if slice_list else None)\n", +        "\n", +        "# Load the eval model from the first training run\n", +        "# This will take a minute\n", +        "eval_model_base_dir_0 = os.path.join(train_uri, 'eval_model_dir')\n", +        "eval_model_dir_0 = os.path.join(eval_model_base_dir_0,\n", +        "                                max(os.listdir(eval_model_base_dir_0)))\n", +        "eval_shared_model_0 = tfma.default_eval_shared_model(\n", +        "    eval_saved_model_path=eval_model_dir_0)\n", +        "\n", +        "# Slice our data by the weekday feature\n", +        "slices = [tfma.slicer.SingleSliceSpec(columns=['weekday'])]\n", +        "\n", +        "run_and_render(eval_model=eval_shared_model_0, slice_list=slices, slice_idx=0)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "ebQUT3Tv8uLR", +        "colab_type": "text" +      }, +      "source": [ +        "Print the slicing metrics." +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "ya3NKcxO8vjA", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "evaluation_uri = model_analyzer.outputs['output'].get()[0].uri\n", +        "eval_result = tfma.load_eval_result(evaluation_uri)\n", +        "\n", +        "print('{}\\n\\nslicing_metrics:\\n'.format(eval_result))\n", +        "\n", +        "for metric in eval_result.slicing_metrics:\n", +        "  pp.pprint(metric)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "Sxi3qGc884-J", +        "colab_type": "text" +      }, +      "source": [ +        "Examine the output data." +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "NxMTME3I85ca", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "eval_path_uri = model_analyzer.outputs['output'].get()[0].uri\n", +        "tfrecord_filenames = [os.path.join(eval_path_uri, name)\n", +        "                      for name in os.listdir(eval_path_uri)]\n", +        "pp.pprint(tfrecord_filenames)\n", +        "dataset = tf.data.TFRecordDataset(tfrecord_filenames)\n", +        "pp.pprint(dataset)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "76Mil-7FlF_y", +        "colab_type": "text" +      }, +      "source": [ +        "### The ModelValidator Component\n", +        "\n", +        "The `ModelValidator` component validates your candidate model against the previously deployed model (if any), or against a baseline value, using criteria that you define. If the new model scores better than the previous model it will be \"blessed\" by ModelValidator, approving it for deployment."
+      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "FXk1MA7sijCr", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "# Performs quality validation of a candidate model (compared to a baseline).\n", +        "model_validator = ModelValidator(\n", +        "    examples=example_gen.outputs['examples'],\n", +        "    model=trainer.outputs['output'])\n", +        "context.run(model_validator)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "FVOkA2MA9FtN", +        "colab_type": "text" +      }, +      "source": [ +        "Examine the output of ModelValidator." +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "U-si25tpAQ0q", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "model_validator.outputs" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "Z2RWZWD-AZ-u", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "blessing_uri = model_validator.outputs.blessing.get()[0].uri\n", +        "!ls -l {blessing_uri}" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "T8DYekCZlHfj", +        "colab_type": "text" +      }, +      "source": [ +        "### The Pusher Component\n", +        "\n", +        "The `Pusher` component checks whether a model has been \"blessed\", and if so, deploys it to production by pushing the model to a well-known file destination." +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "KvVasBxePW-n", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "# Setup serving path\n", +        "_serving_model_dir = os.path.join(tempfile.mkdtemp(),\n", +        "                                  'serving_model/online_news_simple')" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "7nDI_54N9Sbk", +        "colab_type": "text" +      }, +      "source": [ +        "Create and run a Pusher component." +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "r45nQ69eikc9", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "# Checks whether the model passed the validation steps and pushes the model\n", +        "# to a file destination if the check passed.\n", +        "pusher = Pusher(\n", +        "    model_export=trainer.outputs['output'],\n", +        "    model_blessing=model_validator.outputs['blessing'],\n", +        "    push_destination=pusher_pb2.PushDestination(\n", +        "        filesystem=pusher_pb2.PushDestination.Filesystem(\n", +        "            base_directory=_serving_model_dir)))\n", +        "context.run(pusher)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "fj6imIJx9YGD", +        "colab_type": "text" +      }, +      "source": [ +        "Examine the output of Pusher." +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "gNvMj9AWsmSt", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "pusher.outputs" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "4bphCjS-B4vv", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "push_uri = pusher.outputs.model_push.get()[0].uri\n", +        "latest_version = max(os.listdir(push_uri))\n", +        "latest_version_path = os.path.join(push_uri, latest_version)\n", +        "model = tf.saved_model.load(latest_version_path)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "e7yifs_19iQG", +        "colab_type": "text" +      }, +      "source": [ +        "Review the model signatures and methods."
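+      ] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "sigInspectMd", +        "colab_type": "text" +      }, +      "source": [ +        "Assuming the export includes the standard `serving_default` signature key (the usual default for estimator exports), the signature's input and output specs can be inspected directly, and should show that it expects serialized `tf.Example` protos:" +      ] +    }, +    { +      "cell_type": "code", +      "metadata": { +        "id": "sigInspectCode", +        "colab_type": "code", +        "colab": {} +      }, +      "source": [ +        "# Assumes a 'serving_default' signature is present in the pushed model.\n", +        "serving_signature = model.signatures['serving_default']\n", +        "print(serving_signature.structured_input_signature)\n", +        "print(serving_signature.structured_outputs)" +      ], +      "execution_count": 0, +      "outputs": [] +    }, +    { +      "cell_type": "markdown", +      "metadata": { +        "id": "sigListMd", +        "colab_type": "text" +      }, +      "source": [ +        "You can also list every signature along with its concrete function:"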
+ ] + }, + { + "cell_type": "code", + "metadata": { + "id": "hQAmjZ81B8xm", + "colab_type": "code", + "colab": {} + }, + "source": [ + "for item in model.signatures.items():\n", + " pp.pprint(item)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "NmT39IX-9rkb", + "colab_type": "text" + }, + "source": [ + "### Pipeline Complete!\n", + "\n", + "In this example you created a TFX pipeline in a Colab notebook, using the InteractiveContext. Along the way you learned about each of the standard TFX components, but if the standard components don't meet all of your needs you can create your own custom components! Custom components will be covered in a later lesson." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "4-UPMzyDZDps", + "colab_type": "code", + "colab": {} + }, + "source": [ + "" + ], + "execution_count": 0, + "outputs": [] + } + ] +} \ No newline at end of file