From 5f406d4da3bac4031a9593a22842479b5921d085 Mon Sep 17 00:00:00 2001
From: Jingyu Shao <62282696+jingyu-shao@users.noreply.github.com>
Date: Tue, 17 Mar 2020 15:49:32 -0700
Subject: [PATCH 1/3] Add ML Metadata demo via upload

---
 tfx_labs/ML_Metadata_Demo.ipynb | 1966 +++++++++++++++++++++++++++++++
 1 file changed, 1966 insertions(+)
 create mode 100644 tfx_labs/ML_Metadata_Demo.ipynb

diff --git a/tfx_labs/ML_Metadata_Demo.ipynb b/tfx_labs/ML_Metadata_Demo.ipynb
new file mode 100644
index 00000000..3e765cb0
--- /dev/null
+++ b/tfx_labs/ML_Metadata_Demo.ipynb
@@ -0,0 +1,1966 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "ML Metadata Demo.ipynb",
+      "provenance": [
+        {
+          "file_id": "1ZY7wMHZfxUw_9Rcd59dioJjsdkpD13CL",
+          "timestamp": 1544208251666
+        },
+        {
+          "file_id": "/piper/depot/google3/third_party/ml_metadata/google/demo/ML_Metadata_Demo.ipynb?workspaceId=martinz:martinzv-martinz-datum-metadata_swig_khaas-git5::citc",
+          "timestamp": 1544205632493
+        },
+        {
+          "file_id": "/piper/depot/google3/experimental/users/martinz/ML_Metadata_Demo.ipynb?workspaceId=martinz:martinzv-martinz-datum-metadata_swig_khaas-git5::citc",
+          "timestamp": 1544194187418
+        },
+        {
+          "file_id": "1fKKctiBMLTCoHMoRSwYYlouIvdT1zy7e",
+          "timestamp": 1544194098532
+        }
+      ],
+      "collapsed_sections": [],
+      "last_runtime": {
+        "build_target": "//intelligence/datum/prensor/colab:prensor_notebook",
+        "kind": "shared"
+      }
+    },
+    "kernelspec": {
+      "display_name": "Python 2",
+      "name": "python2"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "WeHoha1Xv3_o",
+        "colab_type": "text"
+      },
+      "source": [
+        "# Part 1: Basic Usage of ML Metadata\n",
+        "\n",
+        "Instructions to play with the demo:\n",
+        "* Step 1: Copy this notebook.\n",
+        "* Step 2: Connect to ML Metadata Demo by tfx-dev \n",
+        ">* Step 2, Option 2: If this doesn't work, join the MDB group tfx-dev.\n",
+        ">* Step 2, Option 3: If this doesn't work, run your own instance (a pool of 20 instances):\n",
+        ">>* Patch CL/223555619\n",
+        ">>* change user in third_party/ml_metadata/google/demo/colab_pool.borg\n",
+        ">>* blaze build -c opt third_party/ml_metadata/google/demo:notebook.par\n",
+        ">>* borgcfg third_party/ml_metadata/google/demo/colab_pool.borg reload\n",
+        "\n",
+        "* Step 3: Run the code blocks below."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "sBySs6_XwXBf",
+        "colab_type": "text"
+      },
+      "source": [
+        "## Import Packages"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "2s-RFSxVv2zn",
+        "colab_type": "code",
+        "colab": {}
+      },
+      "source": [
+        "from colabtools import adhoc_import\n",
+        "from google3.file.base import pywrapfile\n",
+        "from google3.file.recordio.python import recordio\n",
+        "import tensorflow_data_validation as tfdv\n",
+        "import os\n",
+        "import time\n",
+        "\n",
+        "from google3.third_party.ml_metadata.metadata_store import metadata_store\n",
+        "from google3.third_party.ml_metadata.proto import metadata_store_pb2\n",
+        "from google3.third_party.tensorflow_metadata.proto.v0 import statistics_pb2\n",
+        "from google3.third_party.tensorflow_metadata.proto.v0 import schema_pb2"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "iewNXtWiwlTF",
+        "colab_type": "text"
+      },
+      "source": [
+        "## Interaction with a Metadata Store\n",
+        "### [ConnectionConfig](https://cs.corp.google.com/piper///depot/google3/third_party/ml_metadata/proto/metadata_store.proto?rcl=224366994&l=219) specifies which of the supported physical backends a **MetadataStore** uses to store the metadata:\n",
+        "- Fake (in-memory db)\n",
+        "- SQLite (db file)\n",
+        "- MySQL (db server)"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "xa22tvSfw0WW",
+        "colab_type": "code",
+        "colab": {}
+      },
+      "source": [
+        "# Use a ConnectionConfig to create a store\n",
+        "connection_config = metadata_store_pb2.ConnectionConfig()\n",
+        "connection_config.fake_database.SetInParent()\n",
+        "store = metadata_store.MetadataStore(connection_config)"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "WXE_6VnHw-10",
+        "colab_type": "text"
+      },
+      "source": [
+        "### [MetadataStore](https://cs.corp.google.com/piper///depot/google3/third_party/ml_metadata/metadata_store/metadata_store.py?rcl=224393676&l=34) contains a list of APIs to create and manipulate metadata. The main concepts are:\n",
+        "- Types: \n",
+        "  * Types define the concepts of possible things in a pipeline, such as components and generated files.\n",
+        "  * One can define an **ArtifactType** to describe a set of files; for instance, in the current TFX:\n",
+        "    - Data can be viewed as a type of Artifact with properties such as _span_, _split_, and _version_.\n",
+        "    - Stats and Schema are other types of Artifact.\n",
+        "  * Similarly, one can define an **ExecutionType** to describe a set of similar component runs, e.g.:\n",
+        "    - A StatsGen run is a type of Execution: it always uses a Data-typed file and generates a Stats-typed file.\n",
+        "    - DataValidator may run in a mode that generates a Schema-typed file by looking at a Stats-typed file. It can also run in a mode that validates a Stats-typed file given a Schema-typed file, generating an Anomaly-typed file.\n",
+        "  * With a type associated with each pipeline artifact, an artifact becomes more than a file stored at some path. The pipeline's description and execution history can be captured in a structured way and used later for provenance tracking, analyzing errors in a run, and even auditing against policies.\n",
+        "- **Artifact** and **Execution**:\n",
+        "  * With the types in place, the actual files and component runs are produced when the pipeline runs.\n",
+        "  * The Metadata Store can ingest the component runs' history and the generated files, including their physical locations and related properties.\n",
+        "  * It provides transactional methods, allowing orchestration to rely on the atomicity of Metadata Store operations.\n",
+        "- **Event**\n",
+        "  * An Event describes the relationships between Artifacts and Executions, such as Input and Output.\n",
+        "\n",
+        "\n",
+        "In the next code blocks, we illustrate how to create and query these major concepts in the metadata store."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "ZmHsGMJQxOv8",
+        "colab_type": "text"
+      },
+      "source": [
+        "### a) Before the pipeline runs, register artifact and execution types"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "bQbt6GaNxUlK",
+        "colab_type": "code",
+        "colab": {}
+      },
+      "source": [
+        "# Create ArtifactTypes, e.g., DataSet and Statistics\n",
+        "data_type = metadata_store_pb2.ArtifactType()\n",
+        "data_type.name = \"DataSet\"\n",
+        "data_type.properties[\"span\"] = metadata_store_pb2.INT\n",
+        "data_type.properties[\"split\"] = metadata_store_pb2.STRING\n",
+        "data_type.properties[\"version\"] = metadata_store_pb2.INT\n",
+        "data_type_id = store.put_artifact_type(data_type)\n",
+        "\n",
+        "stats_type = metadata_store_pb2.ArtifactType()\n",
+        "stats_type.name = \"Statistics\"\n",
+        "stats_type.properties[\"state\"] = metadata_store_pb2.STRING\n",
+        "stats_type_id = store.put_artifact_type(stats_type)\n",
+        "\n",
+        "# Create an ExecutionType, e.g., StatsGen\n",
+        "statsgen_type = metadata_store_pb2.ExecutionType()\n",
+        "statsgen_type.name = \"StatsGen\"\n",
+        "statsgen_type.properties[\"state\"] = metadata_store_pb2.STRING\n",
+        "statsgen_type_id = store.put_execution_type(statsgen_type)"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "rUQ4O6Xuxeqo",
+        "colab_type": "text"
+      },
+      "source": [
+        "### b) During the pipeline run, track component run status, generated artifacts, and their lineage\n",
+        "1) Let's prepare some data first. 
**BE SURE TO CHANGE THE `suffix` variable below to your LDAP**:"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "RGV00x0yErCV",
+        "outputId": "3f6d90bc-cc24-4034-9b4c-fa03f9e43dfc",
+        "executionInfo": {
+          "status": "ok",
+          "timestamp": 1544235382341,
+          "user_tz": 480,
+          "elapsed": 10341,
+          "user": {
+            "displayName": "Neoklis Polyzotis",
+            "photoUrl": "https://lh4.googleusercontent.com/-Bg42oi48BB0/AAAAAAAAAAI/AAAAAAAAACo/DH07eAxbJTA/s64/photo.jpg",
+            "userId": "15618422560880532241"
+          }
+        },
+        "colab": {
+          "height": 52
+        }
+      },
+      "source": [
+        "suffix = 'test_sandbox'  # 'martinz_2' # Add your LDAP here, or your_ldap_2 if you want to start fresh\n",
+        "# Prepare data\n",
+        "BASE_DIR = \"/tmp/ml_metadata_demo_\" + suffix + \"/\"\n",
+        "train_path = BASE_DIR + \"train\"\n",
+        "test_path = BASE_DIR + \"test_path\"\n",
+        "\n",
+        "!fileutil mkdir -p $BASE_DIR\n",
+        "!fileutil cp -f /cns/is-d/home/mingzhong/bug_party/training_10k.tfrecord $train_path\n",
+        "!fileutil cp -f /cns/is-d/home/mingzhong/bug_party/test.tfrecord $test_path"
+      ],
+      "execution_count": 0,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "training_10k.tfrecord 100% |Goooooooooooogle| 8.90M 13.03M/s Time: 00:00:00\n",
+            "test.tfrecord 100% |Goooooooooooooooooooogle| 887.7K 2.99M/s Time: 00:00:00\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "6wc891niyA9d",
+        "colab_type": "text"
+      },
+      "source": [
+        "2) Let's track these two train/test files in the metadata store using the DataSet type we defined."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "yMv2DrcsyNaG",
+        "colab_type": "code",
+        "colab": {}
+      },
+      "source": [
+        "# Track the train/test data files as Artifacts in the metadata store.\n",
+        "def publish_data_artifact(store, uri, span, split, version):\n",
+        "  data_artifact = metadata_store_pb2.Artifact()\n",
+        "  data_artifact.uri = uri\n",
+        "  data_artifact.properties[\"span\"].int_value = span\n",
+        "  data_artifact.properties[\"split\"].string_value = split\n",
+        "  data_artifact.properties[\"version\"].int_value = version\n",
+        "  data_artifact.type_id = data_type_id\n",
+        "  [artifact_id] = store.put_artifacts([data_artifact])\n",
+        "  return artifact_id\n",
+        "\n",
+        "train_data_id = publish_data_artifact(store, train_path, 0, \"TRAIN\", 0)\n",
+        "train_id = train_data_id # TODO: remove this\n",
+        "test_data_id = publish_data_artifact(store, test_path, 0, \"TEST\", 0)\n",
+        "test_id = test_data_id # TODO: remove this"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "A0bzialZyhZG",
+        "colab_type": "text"
+      },
+      "source": [
+        "3) Let's check whether the two artifacts were actually stored properly.\n",
+        "\n",
+        "_Note_: the metadata store APIs return stored metadata models as protos defined in [metadata_store.proto](https://cs.corp.google.com/piper///depot/google3/third_party/ml_metadata/proto/metadata_store.proto)."
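+        ,
+        "\n",
+        "Beyond listing everything (next cell), a single artifact can also be fetched by its id; a minimal sketch, reusing the ids returned above:\n",
+        "\n",
+        "```python\n",
+        "# Fetch one artifact by id and sanity-check the stored properties.\n",
+        "[train_artifact] = store.get_artifacts_by_id([train_data_id])\n",
+        "assert train_artifact.uri == train_path\n",
+        "assert train_artifact.properties[\"split\"].string_value == \"TRAIN\"\n",
+        "```"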
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "JPaR9a4Zytm6",
+        "colab_type": "code",
+        "outputId": "daa3514f-0e56-451e-e4cd-ca687501d1cf",
+        "executionInfo": {
+          "status": "ok",
+          "timestamp": 1544235383954,
+          "user_tz": 480,
+          "elapsed": 680,
+          "user": {
+            "displayName": "Neoklis Polyzotis",
+            "photoUrl": "https://lh4.googleusercontent.com/-Bg42oi48BB0/AAAAAAAAAAI/AAAAAAAAACo/DH07eAxbJTA/s64/photo.jpg",
+            "userId": "15618422560880532241"
+          }
+        },
+        "colab": {
+          "height": 760
+        }
+      },
+      "source": [
+        "all_artifacts = store.get_artifacts()\n",
+        "print all_artifacts"
+      ],
+      "execution_count": 0,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "[id: 1\n",
+            "type_id: 1\n",
+            "uri: \"/tmp/ml_metadata_demo_test_sandbox/train\"\n",
+            "properties {\n",
+            "  key: \"span\"\n",
+            "  value {\n",
+            "    int_value: 0\n",
+            "  }\n",
+            "}\n",
+            "properties {\n",
+            "  key: \"split\"\n",
+            "  value {\n",
+            "    string_value: \"TRAIN\"\n",
+            "  }\n",
+            "}\n",
+            "properties {\n",
+            "  key: \"version\"\n",
+            "  value {\n",
+            "    int_value: 0\n",
+            "  }\n",
+            "}\n",
+            ", id: 2\n",
+            "type_id: 1\n",
+            "uri: \"/tmp/ml_metadata_demo_test_sandbox/test_path\"\n",
+            "properties {\n",
+            "  key: \"span\"\n",
+            "  value {\n",
+            "    int_value: 0\n",
+            "  }\n",
+            "}\n",
+            "properties {\n",
+            "  key: \"split\"\n",
+            "  value {\n",
+            "    string_value: \"TEST\"\n",
+            "  }\n",
+            "}\n",
+            "properties {\n",
+            "  key: \"version\"\n",
+            "  value {\n",
+            "    int_value: 0\n",
+            "  }\n",
+            "}\n",
+            "]\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "h3pPD5FqyyJ9",
+        "colab_type": "text"
+      },
+      "source": [
+        "4) Next, let's run the StatsGen component using TFDV, and illustrate the metadata ingestion calls future components would make. Please take a closer look at the inline comments in the following code blocks."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "-epgr7aFy4Z5",
+        "colab_type": "code",
+        "colab": {}
+      },
+      "source": [
+        "# a) To start the component run, the caller (orchestration engine) uses the\n",
+        "# metadata store to get the location of the artifact.\n",
+        "[training_data] = store.get_artifacts_by_id([train_data_id])\n",
+        "\n",
+        "# b) In this case, the tfdv statsgen works on the training data. We find it in\n",
+        "# the metadata store and pass its location (uri) to tfdv statsgen.\n",
+        "stats_file = tfdv.generate_statistics_from_tfrecord(training_data.uri)"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "bJ7enrMNy8hx",
+        "colab_type": "code",
+        "outputId": "97cfbf1d-7c88-4945-d0ea-81e5fe1d1563",
+        "executionInfo": {
+          "status": "ok",
+          "timestamp": 1544235439046,
+          "user_tz": 480,
+          "elapsed": 344,
+          "user": {
+            "displayName": "Neoklis Polyzotis",
+            "photoUrl": "https://lh4.googleusercontent.com/-Bg42oi48BB0/AAAAAAAAAAI/AAAAAAAAACo/DH07eAxbJTA/s64/photo.jpg",
+            "userId": "15618422560880532241"
+          }
+        },
+        "colab": {
+          "height": 276
+        }
+      },
+      "source": [
+        "# c) When a component publishes its Artifact (here, the stats file), the\n",
+        "# underlying implementation uses the metadata store to publish the file as\n",
+        "# an Artifact, so that it will be visible to the downstream components.\n",
+        "def publish_stats(statsgen_output, file_uri, user_properties):\n",
+        "  # when the output is ready, create an unpublished artifact\n",
+        "  stats_artifact = metadata_store_pb2.Artifact()\n",
+        "  stats_artifact.uri = file_uri\n",
+        "  stats_artifact.type_id = stats_type_id\n",
+        "  for name, value in user_properties.items():\n",
+        "    stats_artifact.custom_properties[name].string_value = value\n",
+        "  stats_artifact.properties[\"state\"].string_value = \"UNPUBLISHED\"\n",
+        "  # register it in the database, and then write to file\n",
+        "  [stats_artifact_id] = store.put_artifacts([stats_artifact])\n",
+        "  # write to disk; even if this fails, the file can still be GCed\n",
+        "  with recordio.RecordWriter(file_uri, \"a\") as output_file:\n",
+        "    output_file.WriteRecord(statsgen_output.SerializeToString())\n",
+        "  # once it finishes writing to disk, we update its state to COMPLETE so that\n",
+        "  # the following components can use it now.\n",
+        "  stats_artifact.id = stats_artifact_id\n",
+        "  stats_artifact.properties[\"state\"].string_value = \"COMPLETE\"\n",
+        "  return store.put_artifacts([stats_artifact])[0]\n",
+        "  \n",
+        "# Note: more user properties can be attached while publishing.\n",
+        "train_stats_id = publish_stats(stats_file, BASE_DIR + \"train_stats.pbtxt\", \\\n",
+        "                               {\"comment\": \"generated status for demo day\"})\n",
+        "\n",
+        "# Let's check the stored artifact\n",
+        "store.get_artifacts_by_id([train_stats_id])"
+      ],
+      "execution_count": 0,
+      "outputs": [
+        {
+          "output_type": "execute_result",
+          "data": {
+            "text/plain": [
+              "[id: 3\n",
+              " type_id: 2\n",
+              " uri: \"/tmp/ml_metadata_demo_test_sandbox/train_stats.pbtxt\"\n",
+              " properties {\n",
+              "   key: \"state\"\n",
+              "   value {\n",
+              "     string_value: \"COMPLETE\"\n",
+              "   }\n",
+              " }\n",
+              " custom_properties {\n",
+              "   key: \"comment\"\n",
+              "   value {\n",
+              "     string_value: \"generated status for demo day\"\n",
+              "   }\n",
+              " }]"
+            ]
+          },
+          "metadata": {
+            "tags": []
+          },
+          "execution_count": 30
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "RvA7RDv704pG",
+        "colab_type": "text"
+      },
+      "source": [
+        "5) In addition to keeping track of artifacts using the metadata store, the runs of components can be captured as Executions. Furthermore, the Input/Output lineage between component runs and their dependent artifacts can be captured by Events in the metadata. We still use the above StatsGen example to show how to use the MetadataStore APIs to connect the dots."
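+        ,
+        "\n",
+        "At its core, the pattern is a handful of writes. Below is a minimal sketch reusing the type and artifact ids created above; the next code cell walks through the fuller state machine with the DECLARED_* events:\n",
+        "\n",
+        "```python\n",
+        "# Register a run, then link its input and output artifacts via Events.\n",
+        "run = metadata_store_pb2.Execution()\n",
+        "run.type_id = statsgen_type_id\n",
+        "[run_id] = store.put_executions([run])\n",
+        "\n",
+        "input_event = metadata_store_pb2.Event()\n",
+        "input_event.artifact_id = train_data_id\n",
+        "input_event.execution_id = run_id\n",
+        "input_event.type = metadata_store_pb2.Event.INPUT\n",
+        "\n",
+        "output_event = metadata_store_pb2.Event()\n",
+        "output_event.artifact_id = train_stats_id\n",
+        "output_event.execution_id = run_id\n",
+        "output_event.type = metadata_store_pb2.Event.OUTPUT\n",
+        "store.put_events([input_event, output_event])\n",
+        "```"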
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "Fe_yeGZ6062k",
+        "colab_type": "code",
+        "outputId": "a7b652fd-7c51-41f9-99f1-53bd88123899",
+        "executionInfo": {
+          "status": "ok",
+          "timestamp": 1544235439984,
+          "user_tz": 480,
+          "elapsed": 623,
+          "user": {
+            "displayName": "Neoklis Polyzotis",
+            "photoUrl": "https://lh4.googleusercontent.com/-Bg42oi48BB0/AAAAAAAAAAI/AAAAAAAAACo/DH07eAxbJTA/s64/photo.jpg",
+            "userId": "15618422560880532241"
+          }
+        },
+        "colab": {
+          "height": 674
+        }
+      },
+      "source": [
+        "# Tracking runs as executions in MetadataStore\n",
+        "# Illustrating possible state transitions in a component run.\n",
+        "def component_run_with_metadata(execution_type_id, input_id, output_id):\n",
+        "  # 1. component begins, register the run in the metadata store\n",
+        "  component_run = metadata_store_pb2.Execution()\n",
+        "  component_run.type_id = execution_type_id\n",
+        "  component_run.properties[\"state\"].string_value = \"RUNNING\"\n",
+        "  [run_id] = store.put_executions([component_run])\n",
+        "  # 2. declare the artifact that will be processed in the run\n",
+        "  input_event = metadata_store_pb2.Event()\n",
+        "  input_event.artifact_id = input_id\n",
+        "  input_event.execution_id = run_id\n",
+        "  input_event.type = metadata_store_pb2.Event.DECLARED_INPUT\n",
+        "  store.put_events([input_event])\n",
+        "  # 3. component starts\n",
+        "  # ... finished reading the artifact from the storage\n",
+        "  # then it marks it as a real input, and starts processing\n",
+        "  input_event.type = metadata_store_pb2.Event.INPUT\n",
+        "  store.put_events([input_event])\n",
+        "  # ... processing the input\n",
+        "  # ... almost finished, before writing to disk\n",
+        "  # it declares an output: first create the Artifact and get its `output_id`\n",
+        "  output_event = metadata_store_pb2.Event()\n",
+        "  output_event.artifact_id = output_id\n",
+        "  output_event.execution_id = run_id\n",
+        "  output_event.type = metadata_store_pb2.Event.DECLARED_OUTPUT\n",
+        "  store.put_events([output_event])\n",
+        "  # 4. component publishes the output\n",
+        "  # ... write to disk\n",
+        "  # then change the output artifact to COMPLETE, and add an output event\n",
+        "  output_event.type = metadata_store_pb2.Event.OUTPUT\n",
+        "  store.put_events([output_event])\n",
+        "  # 5. component finishes, update its Execution\n",
+        "  component_run.id = run_id\n",
+        "  component_run.properties[\"state\"].string_value = \"COMPLETED\"\n",
+        "  return store.put_executions([component_run])[0]\n",
+        "\n",
+        "\n",
+        "run_id = component_run_with_metadata(statsgen_type_id, train_data_id, train_stats_id)\n",
+        "print \"The StatsGen Run in MetadataStore: \\n\"\n",
+        "print store.get_executions_by_id([run_id])\n",
+        "print \"\\nThe Associated Events of that StatsGen Run: \\n\"\n",
+        "for e in store.get_events_by_execution_ids([run_id]):\n",
+        "  print e\n",
+        "  print "
+      ],
+      "execution_count": 0,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "The StatsGen Run in MetadataStore: \n",
+            "\n",
+            "[id: 1\n",
+            "type_id: 3\n",
+            "properties {\n",
+            "  key: \"state\"\n",
+            "  value {\n",
+            "    string_value: \"COMPLETED\"\n",
+            "  }\n",
+            "}\n",
+            "]\n",
+            "\n",
+            "The Associated Events of that StatsGen Run: \n",
+            "\n",
+            "artifact_id: 1\n",
+            "execution_id: 1\n",
+            "type: DECLARED_INPUT\n",
+            "milliseconds_since_epoch: 1544235439831\n",
+            "\n",
+            "\n",
+            "artifact_id: 1\n",
+            "execution_id: 1\n",
+            "type: INPUT\n",
+            "milliseconds_since_epoch: 1544235439831\n",
+            "\n",
+            "\n",
+            "artifact_id: 3\n",
+            "execution_id: 1\n",
+            "type: DECLARED_OUTPUT\n",
+            "milliseconds_since_epoch: 1544235439831\n",
+            "\n",
+            "\n",
+            "artifact_id: 3\n",
+            "execution_id: 1\n",
+            "type: OUTPUT\n",
+            "milliseconds_since_epoch: 1544235439831\n",
+            "\n",
+            "\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "HaQnLTuM0Gdj",
+        "colab_type": "text"
+      },
+      "source": [
+        "# Part 2: ML Metadata in the Chicago Taxi notebook"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "h6ymxjfF6oIY",
+        "colab_type": "text"
+      },
+      "source": [
+        "First, **you need to run the code above before running the code below**.\n",
+        "\n",
+        "We begin with a few methods that represent a barebones orchestration system. Here, executions have one input and one output. We have also defined some basic types."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "-Pg1QEorthO7",
+        "colab": {}
+      },
+      "source": [
+        "# This cell defines a very primitive orchestration system, where executions\n",
+        "# have one input and one output, with basic support for RecordIO proto\n",
+        "# artifacts.\n",
+        "\n",
+        "# Notice that this cell is defining the types in the store. 
You can define\n",
+        "# whatever types with whatever properties you want.\n",
+        "\n",
+        "def create_metadata_store():\n",
+        "  \"\"\"Make a fake, local metadata store.\"\"\"\n",
+        "  connection_config = metadata_store_pb2.ConnectionConfig()\n",
+        "  connection_config.fake_database.SetInParent()\n",
+        "  return metadata_store.MetadataStore(connection_config)\n",
+        "\n",
+        "def get_schema_type(store):\n",
+        "  \"\"\"Gets the schema type ID, or creates one if it doesn't exist.\"\"\"\n",
+        "  artifact_type = metadata_store_pb2.ArtifactType()\n",
+        "  artifact_type.name = \"Schema\"\n",
+        "  artifact_type.properties[\"version\"] = metadata_store_pb2.INT\n",
+        "  return store.put_artifact_type(artifact_type)\n",
+        "\n",
+        "def get_data_type(store):\n",
+        "  \"\"\"Gets the data type ID, or creates one if it doesn't exist.\"\"\"\n",
+        "  artifact_type = metadata_store_pb2.ArtifactType()\n",
+        "  artifact_type.name = \"Data\"\n",
+        "  artifact_type.properties[\"span\"] = metadata_store_pb2.INT\n",
+        "  artifact_type.properties[\"split\"] = metadata_store_pb2.STRING\n",
+        "  artifact_type.properties[\"version\"] = metadata_store_pb2.INT\n",
+        "  return store.put_artifact_type(artifact_type)\n",
+        "\n",
+        "def get_stats_type(store):\n",
+        "  \"\"\"Gets the stats type ID, or creates one if it doesn't exist.\"\"\"\n",
+        "  artifact_type = metadata_store_pb2.ArtifactType()\n",
+        "  artifact_type.name = \"Stats\"\n",
+        "  return store.put_artifact_type(artifact_type)\n",
+        "\n",
+        "def get_stats_gen_type(store):\n",
+        "  \"\"\"Gets the type of a Stats execution.\"\"\"\n",
+        "  execution_type = metadata_store_pb2.ExecutionType()\n",
+        "  execution_type.name = \"Stats\"\n",
+        "  return store.put_execution_type(execution_type)\n",
+        "\n",
+        "def get_infer_schema_type(store):\n",
+        "  \"\"\"Gets the type of an InferSchema execution.\"\"\"\n",
+        "  execution_type = metadata_store_pb2.ExecutionType()\n",
+        "  execution_type.name = \"InferSchema\"\n",
+        "  return store.put_execution_type(execution_type)\n",
+        "\n",
+        "def get_stats_gen_execution(store):\n",
+        "  \"\"\"Returns a local stats gen execution object.\n",
+        "  \n",
+        "  The result can be put in the database, or used in is_already_run below.\n",
+        "  \"\"\"\n",
+        "  execution = metadata_store_pb2.Execution()\n",
+        "  execution.type_id = get_stats_gen_type(store)\n",
+        "  return execution\n",
+        "\n",
+        "def get_infer_schema_execution(store):\n",
+        "  \"\"\"Returns a local infer schema execution object.\n",
+        "  \n",
+        "  The result can be put in the database, or used in is_already_run below.\n",
+        "  \"\"\"\n",
+        "  execution = metadata_store_pb2.Execution()\n",
+        "  execution.type_id = get_infer_schema_type(store)\n",
+        "  return execution\n",
+        "\n",
+        "\n",
+        "##### A light API on top of metadata store. ####################################\n",
+        "\n",
+        "def put_event(store, execution_id, artifact_id, is_input):\n",
+        "  \"\"\"Commits a single event to the repository.\"\"\"\n",
+        "  event = metadata_store_pb2.Event()\n",
+        "  event.artifact_id = artifact_id\n",
+        "  event.execution_id = execution_id\n",
+        "  event.type = metadata_store_pb2.Event.DECLARED_INPUT if is_input else metadata_store_pb2.Event.DECLARED_OUTPUT\n",
+        "  store.put_events([event])\n",
+        "\n",
+        "def publish_execution(store, execution, input_artifact_ids, output_artifact_ids):\n",
+        "  [execution_id] = store.put_executions([execution])\n",
+        "  # This can also be done as a single transaction.\n",
+        "  # Note: paths on inputs and outputs are coming soon!\n",
+        "  # Exercise: have this method call put_events(...) once, so that all events\n",
+        "  # are completed as a single transaction.\n",
+        "  for x in input_artifact_ids:\n",
+        "    put_event(store, execution_id, x, True)\n",
+        "  for x in output_artifact_ids:\n",
+        "    put_event(store, execution_id, x, False)\n",
+        "\n",
+        "\n",
+        "# An example of how to use provenance to drive orchestration.\n",
+        "def get_old_execution_id_or_none(store, execution, input_artifact_id):\n",
+        "  \"\"\"Returns an earlier execution of the same type on this input, or None.\"\"\"\n",
+        "  # Find events with input_artifact_id.\n",
+        "  events = store.get_events_by_artifact_ids([input_artifact_id])\n",
+        "  # Get the events where it was an official input.\n",
+        "  input_events = filter(lambda e:e.type==metadata_store_pb2.Event.DECLARED_INPUT, events)\n",
+        "  if not input_events:\n",
+        "    return None\n",
+        "  # Get the executions corresponding to those inputs.\n",
+        "  old_executions = store.get_executions_by_id([e.execution_id for e in input_events])\n",
+        "\n",
+        "  for old in old_executions:\n",
+        "    # Also a good idea to check that the properties are equal.\n",
+        "    if old.type_id == execution.type_id:\n",
+        "      return old.id\n",
+        "  return None\n",
+        "\n",
+        "\n",
+        "# Tests if an execution has already been run.\n",
+        "def is_already_run(store, execution, input_artifact_id):\n",
+        "  \"\"\"Test if something was already run.\"\"\"\n",
+        "  return get_old_execution_id_or_none(store, execution, input_artifact_id) is not None\n",
+        "\n",
+        "# Get outputs of a previous execution.\n",
+        "def get_outputs(store, execution, input_artifact_id):\n",
+        "  \"\"\"Find any outputs from an earlier, similar execution.\"\"\"\n",
+        "  # Keep the id itself (not a boolean), so its events can be looked up below.\n",
+        "  old_execution_id = get_old_execution_id_or_none(store, execution, input_artifact_id)\n",
+        "  if old_execution_id is None:\n",
+        "    return None\n",
+        "\n",
+        "  # Find events with old_execution_id.\n",
+        "  events = store.get_events_by_execution_ids([old_execution_id])\n",
+        "  # Get the events where it was an official output.\n",
+        "  output_events = filter(lambda e:e.type==metadata_store_pb2.Event.DECLARED_OUTPUT, events)\n",
+        "  return [e.artifact_id for e in output_events]\n",
+        "\n",
+        "\n",
+        "\n",
+        "#### A fake stats execution. ####\n",
+        "\n",
+        "def run_fake_stats(store, data_artifact_id):\n",
+        "  \"\"\"A fake stats run.\"\"\"\n",
+        "  stats_gen_type = get_stats_gen_type(store)\n",
+        "  [data_artifact] = store.get_artifacts_by_id([data_artifact_id])\n",
+        "\n",
+        "  stats_artifact = metadata_store_pb2.Artifact()\n",
+        "  stats_artifact.type_id = get_stats_type(store)\n",
+        "  stats_gen_execution = get_stats_gen_execution(store)\n",
+        "  [stats_artifact_id] = store.put_artifacts([stats_artifact])\n",
+        "  publish_execution(store, get_stats_gen_execution(store), [data_artifact_id],\n",
+        "                    [stats_artifact_id])\n",
+        "  return stats_artifact_id\n",
+        "\n",
+        "\n",
+        "\n",
+        "def pretty_print_artifact(artifact, type_names):\n",
+        "  result = (\"{Type: \" + type_names[artifact.type_id] + \n",
+        "            \", ID: \" + str(artifact.id) +\n",
+        "            \" uri: \" + artifact.uri + \"{\")\n",
+        "  \n",
+        "  for k,v in artifact.properties.items():\n",
+        "    if v.HasField(\"int_value\"):\n",
+        "      result += k + \":\" + str(v.int_value) + \",\"\n",
+        "    if v.HasField(\"string_value\"):\n",
+        "      result += k + \":\" + v.string_value + \",\"\n",
+        "  result += \"}}\"\n",
+        "  print result\n",
+        "\n",
+        "def display_all_artifacts(store):\n",
+        "  all_artifacts = store.get_artifacts()\n",
+        "\n",
+        "  # This serves as a cache: we could also query the database directly\n",
+        "  # for the names of the types of the artifacts.\n",
+        "  type_names = {get_schema_type(store):\"Schema\",\n",
+        "                get_stats_type(store):\"Stats\",\n",
+        "                get_data_type(store):\"Data\"}\n",
+        "\n",
+        "\n",
+        "  # Displays all the artifacts.\n",
+        "  for artifact in all_artifacts:\n",
+        "    pretty_print_artifact(artifact, type_names)\n",
+        "\n",
+        "def display_all_executions(store):\n",
+        "  for execution in store.get_executions():\n",
+        "    print(execution)\n"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "Pe1wauMv5u8d"
+      },
+      "source": [
+        "We can also introduce methods for reading and writing various types of artifacts. These can be either general (as in publish_proto and get_proto) or specific (as in publish_stats or get_schema)."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "ExBj_NAy0EEN",
+        "colab": {}
+      },
+      "source": [
+        "# Here are some methods for publishing and getting protos from artifacts.\n",
+        "\n",
+        "def publish_proto(store, artifact, message):\n",
+        "  \"\"\"Serializes a proto to disk and registers it as an artifact.\"\"\"\n",
+        "  artifact_copy = metadata_store_pb2.Artifact()\n",
+        "  artifact_copy.type_id = artifact.type_id\n",
+        "  [artifact_id] = store.put_artifacts([artifact_copy])\n",
+        "  # Note the non-atomicity here. This can be solved with a state property.\n",
+        "  artifact_copy.MergeFrom(artifact)\n",
+        "  artifact_copy.uri = BASE_DIR + str(artifact_id)\n",
+        "  artifact_copy.id = artifact_id\n",
+        "  # Write the proto to disk.\n",
+        "  with recordio.RecordWriter(artifact_copy.uri, \"a\") as output_file:\n",
+        "    output_file.WriteRecord(message.SerializeToString())\n",
+        "  # Commit the whole artifact. Note that any properties that are changed are\n",
+        "  # updated, and any that are removed are deleted.\n",
+        "  store.put_artifacts([artifact_copy])\n",
+        "  return artifact_id\n",
+        "\n",
+        "# An example of generic code that can run on top of metadata.\n",
+        "def get_proto(store, artifact_id, message):\n",
+        "  \"\"\"Deserializes a proto, given an artifact ID.\"\"\"\n",
+        "  [artifact] = store.get_artifacts_by_id([artifact_id])\n",
+        "  with recordio.RecordReader(artifact.uri) as reader:\n",
+        "    # TODO(martinz): check that you read SOMETHING.\n",
+        "    for record in reader:\n",
+        "      message.ParseFromString(record)\n",
+        "\n",
+        "def publish_stats(store, stats_proto):\n",
+        "  \"\"\"Because stats has no properties except provenance, we can publish it directly.\"\"\"\n",
+        "  artifact = metadata_store_pb2.Artifact()\n",
+        "  artifact.type_id = get_stats_type(store)\n",
+        "  return publish_proto(store, artifact, stats_proto)\n",
+        "\n",
+        "def get_stats(store, stats_id):\n",
+        "  result = statistics_pb2.DatasetFeatureStatisticsList()\n",
+        "  get_proto(store, stats_id, result)\n",
+        "  return result\n",
+        "\n",
+        "def publish_schema(store, schema_proto):\n",
+        "  \"\"\"Here, we are ignoring the version.\"\"\"\n",
+        "  artifact = metadata_store_pb2.Artifact()\n",
+        "  artifact.type_id = get_schema_type(store)\n",
+        "  return publish_proto(store, artifact, schema_proto)\n",
+        "\n",
+        "def get_schema(store, schema_id):\n",
+        "  result = schema_pb2.Schema()\n",
+        "  get_proto(store, schema_id, result)\n",
+        "  return result\n",
+        "\n",
+        "def run_real_stats(store, data_id):\n",
+        "  [data_artifact] = store.get_artifacts_by_id([data_id])\n",
+        "  output_stats = tfdv.generate_statistics_from_tfrecord(data_artifact.uri)\n",
+        "  \n",
+        "  output_stats_id = publish_stats(store, output_stats)\n",
+        "  \n",
+        "  publish_execution(store, get_stats_gen_execution(store), [data_id],\n",
+        "                    [output_stats_id])\n",
+        "  return output_stats_id\n",
+        "\n",
+        "\n",
+        "def visualize_statistics(store, stats_id):\n",
+        "  tfdv.visualize_statistics(get_stats(store, stats_id))\n",
+        "\n",
+        "def infer_schema(store, stats_id):\n",
+        "  # TODO: check if this has already been run.\n",
+        "  \n",
+        "  schema_id = publish_schema(store, tfdv.infer_schema(get_stats(store, stats_id)))\n",
+        "  \n",
+        "  publish_execution(store, get_infer_schema_execution(store), [stats_id], [schema_id])\n",
+        "  return schema_id\n",
+        "\n",
+        "def display_schema(store, schema_id):\n",
+        "  tfdv.display_schema(get_schema(store, schema_id)) \n"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "uMgVhjMwqXpv",
+        "colab": {}
+      },
+      "source": [
+        "# No requirement to have properties correspond to their locations on disk.\n",
+        "train_id = publish_data_artifact(store, train_path, 0, \"TRAIN\", 0)\n",
+        "test_id = publish_data_artifact(store, test_path, 0, \"TEST\", 0)\n"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "pidQUEiErwSk"
+      },
+      "source": [
+        "This notebook describes how to explore and validate the Chicago Taxi dataset using TensorFlow Data Validation."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "fHTyLVRcrwS7"
+      },
+      "source": [
+        "## Memoization"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "5x-X03d3rwS9"
+      },
+      "source": [
+        "Here, we are running TFDV's generate_statistics_from_tfrecord. However, we have the capacity to return a cached result if we have run statistics before. Note that this directly relies on provenance, as opposed to directory structure."
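+        ,
+        "\n",
+        "Note that `get_old_execution_id_or_none` above treats any earlier execution of the same type as a cache hit. As its inline comment says, a stricter check would also compare the execution properties. A minimal sketch of such a helper (`executions_match` is hypothetical, not part of the API):\n",
+        "\n",
+        "```python\n",
+        "def executions_match(old_execution, new_execution):\n",
+        "  \"\"\"Hypothetical helper: a cache hit requires equal type AND properties.\"\"\"\n",
+        "  if old_execution.type_id != new_execution.type_id:\n",
+        "    return False\n",
+        "  # Proto map fields compare entry-by-entry; Value messages support ==.\n",
+        "  return dict(old_execution.properties) == dict(new_execution.properties)\n",
+        "```"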
+ ] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "executionInfo": { + "status": "ok", + "timestamp": 1544235499828, + "user_tz": 480, + "elapsed": 56763, + "user": { + "displayName": "Neoklis Polyzotis", + "photoUrl": "https://lh4.googleusercontent.com/-Bg42oi48BB0/AAAAAAAAAAI/AAAAAAAAACo/DH07eAxbJTA/s64/photo.jpg", + "userId": "15618422560880532241" + } + }, + "id": "QGt-VoWlrwS_", + "outputId": "21c5b265-cc09-4a72-de7e-3e00fbc75400", + "colab": { + "height": 1918 + } + }, + "source": [ + "def run_with_cache(store, data_id):\n", + " if is_already_run(store, get_stats_gen_execution(store), data_id):\n", + " print(\"returning cached result\")\n", + " [result] = get_outputs(store, get_stats_gen_execution(store), data_id)\n", + " return result\n", + " else:\n", + " print(\"returning real result\")\n", + " return run_real_stats(store, data_id)\n", + "\n", + "# Uncomment this to run with or without caching.\n", + "# Notice how only one cached result is returned.\n", + "stats_id = run_with_cache(store, train_id)\n", + "# stats_id = run_real_stats(store, train_id)\n", + "visualize_statistics(store, stats_id)" + ], + "execution_count": 0, + "outputs": [ + { + "output_type": "stream", + "text": [ + "returning real result\n" + ], + "name": "stdout" + }, + { + "output_type": "display_data", + "data": { + "text/html": [ + "\n", + " " + ], + "text/plain": [ + "" + ] + }, + "metadata": { + "tags": [] + } + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "colab_type": "text", + "id": "VqMhQs9T_PWX" + }, + "source": [ + "If you want to play around with this, you can:\n", + "* Add a property to the data artifact type that identifies the type of input.\n", + "* When run_real_stats is run, modify it to call generate_statistics_from_tf_record or generate_statistics_from_csv." + ] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "ew1F5VcArwTZ", + "colab": {} + }, + "source": [ + "schema_id = infer_schema(store, stats_id)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "colab_type": "text", + "id": "1hebajpyrwTf" + }, + "source": [ + "In general, TFDV uses conservative heuristics to infer stable data properties\n", + "from the statistics in order to avoid overfitting the schema to the specific\n", + "dataset. It is strongly advised to **review the inferred schema and refine\n", + "it as needed**, to capture any domain knowledge about the data that TFDV's\n", + "heuristics might have missed." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "XF_b6Qbo4_JK", + "colab_type": "text" + }, + "source": [ + "## Visualizing Artifacts\n", + "\n", + "In addition, we want to be able to visualize artifacts." + ] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "executionInfo": { + "status": "ok", + "timestamp": 1544235502071, + "user_tz": 480, + "elapsed": 605, + "user": { + "displayName": "Neoklis Polyzotis", + "photoUrl": "https://lh4.googleusercontent.com/-Bg42oi48BB0/AAAAAAAAAAI/AAAAAAAAACo/DH07eAxbJTA/s64/photo.jpg", + "userId": "15618422560880532241" + } + }, + "id": "KHa43qi2rwTg", + "outputId": "8aa635c4-c774-443e-86e8-d807b7a8299d", + "colab": { + "height": 1168 + } + }, + "source": [ + "display_schema(store, schema_id)" + ], + "execution_count": 0, + "outputs": [ + { + "output_type": "display_data", + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
TypePresenceValencyDomain
Feature name
'trip_id'BYTESrequired-
'tips'FLOATrequired-
'community_areas'BYTESoptionalsingle(-inf,inf)
'miles'FLOATrequired-
'end'BYTESoptionalsingle-
'pu_longitude'FLOAToptionalsingle-
'taxi_id'BYTESrequired-
'pay_type'STRINGrequired'pay_type'
'do_com_area'BYTESoptionalsingle(-inf,inf)
'total'FLOATrequired-
'fare'FLOATrequired-
'company'STRINGoptionalsingle'company'
'pu_latitude'FLOAToptionalsingle-
'start_hour'INTrequired-
'do_census_tract'BYTESoptionalsingle(-inf,inf)
'pu_com_area'BYTESoptionalsingle(-inf,inf)
'end_month'INToptionalsingle-
'do_location'BYTESoptionalsingle-
'pu_census_tract'BYTESoptionalsingle(-inf,inf)
'pu_location'BYTESoptionalsingle-
'end_hour'INToptionalsingle-
'do_longitude'FLOAToptionalsingle-
'dur'FLOAToptionalsingle-
'start_month'INTrequired-
'start'BYTESrequired-
'end_day'INToptionalsingle-
'extras'FLOATrequired-
'do_latitude'FLOAToptionalsingle-
'tolls'FLOATrequired-
'start_day'INTrequired-
\n", + "
" + ], + "text/plain": [ + " Type Presence Valency Domain\n", + "Feature name \n", + "'trip_id' BYTES required - \n", + "'tips' FLOAT required - \n", + "'community_areas' BYTES optional single (-inf,inf)\n", + "'miles' FLOAT required - \n", + "'end' BYTES optional single - \n", + "'pu_longitude' FLOAT optional single - \n", + "'taxi_id' BYTES required - \n", + "'pay_type' STRING required 'pay_type'\n", + "'do_com_area' BYTES optional single (-inf,inf)\n", + "'total' FLOAT required - \n", + "'fare' FLOAT required - \n", + "'company' STRING optional single 'company' \n", + "'pu_latitude' FLOAT optional single - \n", + "'start_hour' INT required - \n", + "'do_census_tract' BYTES optional single (-inf,inf)\n", + "'pu_com_area' BYTES optional single (-inf,inf)\n", + "'end_month' INT optional single - \n", + "'do_location' BYTES optional single - \n", + "'pu_census_tract' BYTES optional single (-inf,inf)\n", + "'pu_location' BYTES optional single - \n", + "'end_hour' INT optional single - \n", + "'do_longitude' FLOAT optional single - \n", + "'dur' FLOAT optional single - \n", + "'start_month' INT required - \n", + "'start' BYTES required - \n", + "'end_day' INT optional single - \n", + "'extras' FLOAT required - \n", + "'do_latitude' FLOAT optional single - \n", + "'tolls' FLOAT required - \n", + "'start_day' INT required - " + ] + }, + "metadata": { + "tags": [] + } + }, + { + "output_type": "display_data", + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Values
Domain
'pay_type''Cash', 'Credit Card', 'Dispute', 'No Charge', 'Pcard', 'Prcard', 'Unknown'
'company''0118 - 42111 Godfrey S.Awir', '1085 - 72312 N and W Cab Co', '1085 - N and W Cab Co', '1247 - 72807 Daniel Ayertey', '2092 - 61288 Sbeih company', '2733 - 74600 Benny Jona', '2733 - Benny Jona', '2809 - 95474 C & D Cab Co Inc.', '3141 - 87803 Zip Cab', '3152 - 97284 Crystal Abernathy', '3201 - C&D Cab Co Inc', '3385 - Eman Cab', '3591 - 63480 Chuks Cab', '3620 - 52292 David K. Cab Corp.', '3623 - 72222 Arrington Enterprises', '3897 - 57856 Ilie Malec', '4053 - Adwar H. Nikola', '4197 - 41842 Royal Star', '4197 - Royal Star', '4615 - 83503 Tyrone Henderson', '4615 - Tyrone Henderson', '4623 - 27290 Jay Kim', '4623 - Jay Kim', '5006 - 39261 Salifu Bawa', '5129 - 87128', '5724 - 75306 KYVI Cab Inc', '585 - 88805 Valley Cab Co', '5874 - 73628 Sergey Cab Corp.', '6488 - 83287 Zuha Taxi', '6747 - Mueen Abdalla', 'Blue Ribbon Taxi Association Inc.', 'Chicago Elite Cab Corp.', 'Chicago Elite Cab Corp. (Chicago Carriag', 'Chicago Medallion Leasing INC', 'Chicago Medallion Management', 'Choice Taxi Association', 'Dispatch Taxi Affiliation', 'KOAM Taxi Association', 'Northwest Management LLC', 'Taxi Affiliation Services', 'Top Cab Affiliation'
\n", + "
" + ], + "text/plain": [ + " Values\n", + "Domain \n", + "'pay_type' 'Cash', 'Credit Card', 'Dispute', 'No Charge', 'Pcard', 'Prcard', 'Unknown' \n", + "'company' '0118 - 42111 Godfrey S.Awir', '1085 - 72312 N and W Cab Co', '1085 - N and W Cab Co', '1247 - 72807 Daniel Ayertey', '2092 - 61288 Sbeih company', '2733 - 74600 Benny Jona', '2733 - Benny Jona', '2809 - 95474 C & D Cab Co Inc.', '3141 - 87803 Zip Cab', '3152 - 97284 Crystal Abernathy', '3201 - C&D Cab Co Inc', '3385 - Eman Cab', '3591 - 63480 Chuks Cab', '3620 - 52292 David K. Cab Corp.', '3623 - 72222 Arrington Enterprises', '3897 - 57856 Ilie Malec', '4053 - Adwar H. Nikola', '4197 - 41842 Royal Star', '4197 - Royal Star', '4615 - 83503 Tyrone Henderson', '4615 - Tyrone Henderson', '4623 - 27290 Jay Kim', '4623 - Jay Kim', '5006 - 39261 Salifu Bawa', '5129 - 87128', '5724 - 75306 KYVI Cab Inc', '585 - 88805 Valley Cab Co', '5874 - 73628 Sergey Cab Corp.', '6488 - 83287 Zuha Taxi', '6747 - Mueen Abdalla', 'Blue Ribbon Taxi Association Inc.', 'Chicago Elite Cab Corp.', 'Chicago Elite Cab Corp. (Chicago Carriag', 'Chicago Medallion Leasing INC', 'Chicago Medallion Management', 'Choice Taxi Association', 'Dispatch Taxi Affiliation', 'KOAM Taxi Association', 'Northwest Management LLC', 'Taxi Affiliation Services', 'Top Cab Affiliation'" + ] + }, + "metadata": { + "tags": [] + } + } + ] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "executionInfo": { + "status": "ok", + "timestamp": 1544235518768, + "user_tz": 480, + "elapsed": 16470, + "user": { + "displayName": "Neoklis Polyzotis", + "photoUrl": "https://lh4.googleusercontent.com/-Bg42oi48BB0/AAAAAAAAAAI/AAAAAAAAACo/DH07eAxbJTA/s64/photo.jpg", + "userId": "15618422560880532241" + } + }, + "id": "d3D2yS8jrwTs", + "outputId": "520f3498-2473-4ef5-9621-9bd444453ec3", + "colab": { + "height": 1918 + } + }, + "source": [ + "# Compute stats over eval data.\n", + "test_stats_id = run_with_cache(store, test_id)\n", + "\n", + "# We have written a method for visualizing statistics.\n", + "visualize_statistics(store, test_stats_id)" + ], + "execution_count": 0, + "outputs": [ + { + "output_type": "stream", + "text": [ + "returning real result\n" + ], + "name": "stdout" + }, + { + "output_type": "display_data", + "data": { + "text/html": [ + "\n", + " " + ], + "text/plain": [ + "" + ] + }, + "metadata": { + "tags": [] + } + } + ] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "executionInfo": { + "status": "ok", + "timestamp": 1544235521169, + "user_tz": 480, + "elapsed": 2257, + "user": { + "displayName": "Neoklis Polyzotis", + "photoUrl": "https://lh4.googleusercontent.com/-Bg42oi48BB0/AAAAAAAAAAI/AAAAAAAAACo/DH07eAxbJTA/s64/photo.jpg", + "userId": "15618422560880532241" + } + }, + "id": "hSPeSzBNrwTy", + "outputId": "f75a4f72-ad69-410e-df7e-695a3b86d19a", + "colab": { + "height": 1925 + } + }, + "source": [ + "# Compare stats of eval data with training data.\n", + "\n", + "# Here, we are grabbing protos from their ids, and passing them into a method.\n", + "\n", + "tfdv.visualize_statistics(lhs_statistics=get_stats(store, test_stats_id), rhs_statistics=get_stats(store, stats_id),\n", + " lhs_name='TEST_DATASET', rhs_name='TRAIN_DATASET')" + ], + "execution_count": 0, + "outputs": [ + { + "output_type": "display_data", + "data": { + "text/html": [ + "\n", + " " + ], + "text/plain": [ + "" + ] + }, + "metadata": { + "tags": [] + } + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "qr3cmnMf3NJ3", + "colab_type": "text" + 
+      },
+      "source": [
+        "## After the pipeline run, users can query the metadata store for artifacts, e.g., locations, properties, and lineage.\n",
+        "\n",
+        "For example, the following block uses the lineage from the data to find derived artifacts within one hop."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "VB3Qv5l43YSa",
+        "colab_type": "code",
+        "outputId": "a7fffe58-2bd3-4807-d106-9ea29705f838",
+        "executionInfo": {
+          "status": "ok",
+          "timestamp": 1544235522139,
+          "user_tz": 480,
+          "elapsed": 622,
+          "user": {
+            "displayName": "Neoklis Polyzotis",
+            "photoUrl": "https://lh4.googleusercontent.com/-Bg42oi48BB0/AAAAAAAAAAI/AAAAAAAAACo/DH07eAxbJTA/s64/photo.jpg",
+            "userId": "15618422560880532241"
+          }
+        },
+        "colab": {
+          "height": 259
+        }
+      },
+      "source": [
+        "def find_data(path):\n",
+        "  for artifact in store.get_artifacts():\n",
+        "    if artifact.uri == path:\n",
+        "      print \"  type: \", artifact.type_id\n",
+        "      print \"  uri: \", artifact.uri\n",
+        "      print \"  span: \", artifact.properties[\"span\"].int_value\n",
+        "      print \"  split: \", artifact.properties[\"split\"].string_value\n",
+        "      print \"  version: \", artifact.properties[\"version\"].int_value\n",
+        "      return artifact.id\n",
+        "  return -1\n",
+        "  \n",
+        "# 1. find the artifact\n",
+        "print \"Querying 1-hop dependencies of the training data:\"\n",
+        "print \"Found training data:\"\n",
+        "artifact_id = find_data(train_path)\n",
+        "\n",
+        "# 2. find the component runs which used this artifact\n",
+        "print \"Used by the following executions as input:\"\n",
+        "execution_ids = []\n",
+        "for event in store.get_events_by_artifact_ids([artifact_id]):\n",
+        "  if event.type == metadata_store_pb2.Event.INPUT:\n",
+        "    execution_ids.append(event.execution_id)\n",
+        "    print \"  - execution id: \", event.execution_id, \\\n",
+        "          \" (occurred at:\", time.ctime(int(event.milliseconds_since_epoch/1000)), \")\"\n",
+        "\n",
+        "# 3. find the output artifacts of those component runs\n",
+        "print \"The following artifacts are derived from it:\"\n",
+        "derived_artifact_ids = []\n",
+        "for event in store.get_events_by_execution_ids(execution_ids):\n",
+        "  if event.type == metadata_store_pb2.Event.OUTPUT:\n",
+        "    derived_artifact_ids.append(event.artifact_id)\n",
+        "    print \"  - artifact id: \", event.artifact_id, \" by execution id: \", event.execution_id\n",
+        "  \n",
+        "# 4. list the artifacts; if one is a Stats type, output it\n",
+        "found_latest_stats_artifact = None\n",
+        "print \"Artifact details:\"\n",
+        "for artifact in store.get_artifacts_by_id(derived_artifact_ids):\n",
+        "  print \"  path: \", artifact.uri\n",
+        "  print \"  is Stats Type: \", artifact.type_id == stats_type_id\n",
+        "  found_latest_stats_artifact = artifact"
+      ],
+      "execution_count": 0,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "Querying 1-hop dependencies of the training data:\n",
+            "Found training data:\n",
+            "  type:  1\n",
+            "  uri:  /tmp/ml_metadata_demo_test_sandbox/train\n",
+            "  span:  0\n",
+            "  split:  TRAIN\n",
+            "  version:  0\n",
+            "Used by the following executions as input:\n",
+            "  - execution id:  1  (occurred at: Fri Dec  7 18:17:19 2018 )\n",
+            "The following artifacts are derived from it:\n",
+            "  - artifact id:  3  by execution id:  1\n",
+            "Artifact details:\n",
+            "  path:  /tmp/ml_metadata_demo_test_sandbox/train_stats.pbtxt\n",
+            "  is Stats Type:  True\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "ri-4LxKq2xOu",
+        "colab_type": "text"
+      },
+      "source": [
+        "## More Than One Input Artifact\n",
+        "\n",
+        "Exercise: Make a method that records anomalies. Create a type for anomalies and a type for the validate_statistics execution. If time permits, add caching.\n",
+        "\n",
+        "Note: at present, we do not have named inputs in events. We plan to fix that very soon. Nonetheless, in this setting you can use the types to distinguish between the stats and the schema."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "lhma4EaWrwT6",
+        "colab": {}
+      },
+      "source": [
+        "\n",
+        "# Check eval data for errors by validating the eval data stats using the previously inferred schema.\n",
+        "# Note: here we are directly accessing the protos from the metadata and passing them to a method.\n",
+        "# Wee! Multiple layers of abstraction!\n",
+        "anomalies = tfdv.validate_statistics(get_stats(store, test_stats_id), get_schema(store, schema_id))"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "dpEKaCh-rwT_",
+        "colab": {}
+      },
+      "source": [
+        "def get_validate_statistics_type(store):\n",
+        "  raise Unimplemented()\n",
+        "\n",
+        "def publish_anomalies(store, anomalies):\n",
+        "  # See publish_schema and publish_stats\n",
+        "  raise Unimplemented()\n",
+        "  \n",
+        "def get_anomalies(store, anomalies_id):\n",
+        "  # See get_schema and get_stats\n",
+        "\n",
+        "  # Hint: result = anomalies_pb2.Anomalies()\n",
+        "  raise Unimplemented()\n",
+        "\n",
+        "def validate_statistics(store, stats_id, schema_id):\n",
+        "  raise Unimplemented()\n",
+        "  # Run validate_statistics, and record the result.\n",
+        "  # Hint: anomalies = tfdv.validate_statistics(get_stats(store, test_stats_id), get_schema(store, schema_id))\n",
+        "\n",
+        "\n",
+        "def display_anomalies(store, anomalies_id):\n",
+        "  # Hint: tfdv.display_anomalies(anomalies)\n",
+        "  raise Unimplemented()\n",
+        "\n",
+        "def get_anomalies_id_for_data(store, data_id):\n",
+        "  \"\"\"Gets the anomalies for the data, or None if it doesn't exist.\"\"\"\n",
+        "  # Hint: get the stats generated by the data.\n",
+        "  # Then get the anomalies generated by the model.\n",
+        "  # NOTE: the current checks for an execution only work for executions that\n",
+        "  # have one input. How can they be extended to executions with multiple inputs?\n",
+        "  raise Unimplemented()\n",
+        "\n",
+        "def display_anomalies_for_data(store, data_id):\n",
+        "  # Hint: get the right anomalies, then display them. 
Print an error if no\n", + " # anomalies exist.\n", + " raise Unimplemented()\n", + "\n", + "\n", + " " + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "yUzhJvego_Ie", + "colab_type": "text" + }, + "source": [ + "# Part 3: Adding Anomalies " + ] + }, + { + "cell_type": "markdown", + "metadata": { + "colab_type": "text", + "id": "R4YLJrNnrwUF" + }, + "source": [ + "The anomalies indicate that out of domain values were found for features `company` and `payment_type` in the stats in < 1% of the feature values. If this was expected, then the schema can be updated as follows." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "UyR0zo1bmVLZ", + "colab_type": "text" + }, + "source": [ + "" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "tDbaQCNpkiwx", + "colab_type": "code", + "outputId": "279182a3-6a5b-4f1a-b59e-1fadabf12ebc", + "executionInfo": { + "status": "error", + "timestamp": 1544235525202, + "user_tz": 480, + "elapsed": 1125, + "user": { + "displayName": "Neoklis Polyzotis", + "photoUrl": "https://lh4.googleusercontent.com/-Bg42oi48BB0/AAAAAAAAAAI/AAAAAAAAACo/DH07eAxbJTA/s64/photo.jpg", + "userId": "15618422560880532241" + } + }, + "colab": { + "height": 304 + } + }, + "source": [ + "anomalies_id = validate_statistics(store, test_stats_id, schema_id)\n", + "display_anomalies(anomalies_id)\n" + ], + "execution_count": 0, + "outputs": [ + { + "output_type": "error", + "ename": "NameError", + "evalue": "ignored", + "traceback": [ + "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m\n\u001b[1;31mNameError\u001b[0m Traceback (most recent call last)", + "\u001b[1;32m\u001b[0m in \u001b[0;36m\u001b[1;34m()\u001b[0m\n\u001b[1;32m----> 1\u001b[1;33m \u001b[0manomalies_id\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mvalidate_statistics\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mstore\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mtest_stats_id\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mschema_id\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 2\u001b[0m \u001b[0mdisplay_anomalies\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0manomalies_id\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n", + "\u001b[1;32m\u001b[0m in \u001b[0;36mvalidate_statistics\u001b[1;34m(store, stats_id, schema_id)\u001b[0m\n\u001b[0;32m 13\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 14\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mvalidate_statistics\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mstore\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mstats_id\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mschema_id\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 15\u001b[1;33m \u001b[1;32mraise\u001b[0m \u001b[0mUnimplemented\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 16\u001b[0m \u001b[1;31m# Run validate_statistics, and record the result.\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 17\u001b[0m \u001b[1;31m# Hint: anomalies = tfdv.validate_statistics(get_stats(store, test_stats_id), get_schema(store, schema_id))\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n", + "\u001b[1;31mNameError\u001b[0m: global name 'Unimplemented' is not defined" + ] + } + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "QLLR_Ev6lcVz", + "colab_type": "code", + "colab": {} + }, + "source": [ + "\n", + "\n", + "def get_anomalies_id_for_data(store, 
data_id):\n", + " \"\"\"Gets the anomalies for the data, or None if it doesn't exist.\"\"\"\n", + " # Hint: get the stats generated by the data.\n", + " # Then get the anomalies generated by the model.\n", + " # NOTE: the current checks for an execution only work for executions that\n", + " # have one input. How can they \n", + " raise Unimplemented()\n", + "\n", + "def display_anomalies_for_data(store, data_id):\n", + " # Hint: get the right anomalies, then display them. Print an error if no\n", + " # anomalies exist.\n", + " raise Unimplemented()\n" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "AeoHVJSurwUJ", + "colab": {} + }, + "source": [ + "# We can also operate directly on the protos.\n", + "def update_schema(store, schema_id):\n", + " schema = get_schema(store, schema_id)\n", + " \n", + " # Relax the minimum fraction of values that must come from the domain for feature company.\n", + " company = tfdv.get_feature(schema, 'company')\n", + " company.distribution_constraints.min_domain_mass = 0.9\n", + "\n", + " # Add new value to the domain of feature payment_type.\n", + " payment_type_domain = tfdv.get_domain(schema, 'payment_type')\n", + " payment_type_domain.value.append('Prcard')\n", + " # Return a new schema.\n", + " return publish_schema(store, schema)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "JLMNWJYarwUP", + "colab": {} + }, + "source": [ + "new_schema_id = update_schema(store, schema_id)\n", + "\n", + "\n", + "updated_anomalies_id = validate_statistics(store, test_stats_id, new_schema_id)" + ], + "execution_count": 0, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "colab_type": "code", + "id": "NPPalJMjrwUZ", + "outputId": "fd96d6d3-ddf7-4c7f-dcdc-37765e61c7a0", + "colab": {} + }, + "source": [ + "tfdv.display_anomalies(updated_anomalies)" + ], + "execution_count": 0, + "outputs": [ + { + "output_type": "display_data", + "data": { + "text/html": [ + "

No anomalies found.

" + ], + "text/plain": [ + "" + ] + }, + "metadata": { + "tags": [] + } + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "gSJxYZBZXSFi", + "colab_type": "text" + }, + "source": [ + "# Part 4: Safeguards\n", + "While the system is very flexible, there are a variety of safeguards. For example:\n", + "\n", + "1. You can't change the type of an artifact or execution when you update it. \n", + "2. (For now), you can't change a named type's properties.\n", + "3. You can't create an event that points to artifact IDs or executions that don't exist.\n", + "4. You can't set a (regular) property for an artifact where that property is not in the type.\n", + "\n", + "\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "colab_type": "text", + "id": "6aTbVf1IrwUr" + }, + "source": [ + "# Part 5: Build Your Own Orchestration System\n", + "\n", + "ML Metadata provides a basic interface for metadata, but it is intentionally not an orchestration system, and there are few limitations in the data model.\n", + "\n", + "1. Write a class that wraps metadata store and provides more restrictive API to the database.\n", + "\n", + "\n", + "```\n", + "class MyMetadataStore(object):\n", + " def __init__(self):\n", + " self._metadata_store = metadata_store.MetadataStore(...)\n", + "```\n", + "\n", + "\n", + "2. Add a state property to all the artifacts. Write a query that only selects artifacts where the state is live.\n", + "3. Add a timestamp property to all the artifacts. Write a \"current version\" query that selects the most recent schema. Hint: make your own create_artifact_type that appends the new properties?\n", + "3. Add a pipeline_id property to all the artifacts. When you create your wrapper class, set a pipeline ID. Write a variant of the API that limits you to only artifacts with the current pipeline ID (e.g., have a new get_artifacts() that calls the existing get_artifacts(), then filters out artifacts without the right ID: have a put_artifacts() that sets the pipeline ID before committing the artifacts, et cetera.) \n", + "\n", + "\n", + "\n" + ] + } + ] +} \ No newline at end of file From 4d91f13ee9af064e20acc9bd8ebe742eb461bb26 Mon Sep 17 00:00:00 2001 From: Jingyu Shao Date: Tue, 17 Mar 2020 23:10:26 +0000 Subject: [PATCH 2/3] Rename ML Metadata demo --- tfx_labs/{ML_Metadata_Demo.ipynb => Lab_11_ML_Metadata.ipynb} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tfx_labs/{ML_Metadata_Demo.ipynb => Lab_11_ML_Metadata.ipynb} (100%) diff --git a/tfx_labs/ML_Metadata_Demo.ipynb b/tfx_labs/Lab_11_ML_Metadata.ipynb similarity index 100% rename from tfx_labs/ML_Metadata_Demo.ipynb rename to tfx_labs/Lab_11_ML_Metadata.ipynb From f9177ef5e26bbba37f4a2aa3d3dd964b49395f12 Mon Sep 17 00:00:00 2001 From: Jingyu Shao <62282696+jingyu-shao@users.noreply.github.com> Date: Tue, 17 Mar 2020 16:52:28 -0700 Subject: [PATCH 3/3] Update README.md --- tfx_labs/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tfx_labs/README.md b/tfx_labs/README.md index 3c13958b..bb5416f1 100644 --- a/tfx_labs/README.md +++ b/tfx_labs/README.md @@ -54,3 +54,6 @@ We will train an image classification model on the [UC Merced Land Use Dataset]( This notebook describes graph regularization for sentiment classification by synthesizing a graph from input data. 
An end-to-end workflow using the [Neural Structured Learning](https://www.tensorflow.org/neural_structured_learning) framework is demonstrated in a TFX pipeline using several TFX custom components as well as a graph-regularized trainer component.
+### [Lab 11 – ML Metadata](https://colab.sandbox.google.com/github/tensorflow/workshops/blob/master/tfx_labs/Lab_11_ML_Metadata.ipynb)
+
+This notebook demonstrates the basic usage of ML Metadata. Specifically, it shows how to interact with the Metadata Store before, during, and after pipeline runs. We'll also see a use case where ML Metadata is used in the Chicago Taxi notebook!