From b4191fd882d0cf7a5c783b82240b647b49a4037c Mon Sep 17 00:00:00 2001
From: Pedram Pejman
Date: Mon, 7 Oct 2019 20:16:36 -0400
Subject: [PATCH] Adds step 8 notebook; changes container to share host's
 network

---
 tfx_and_serving/docker-compose.yaml   |   1 +
 tfx_and_serving/notebooks/step8.ipynb | 265 ++++++++++++++++++++++++++
 2 files changed, 266 insertions(+)
 create mode 100644 tfx_and_serving/notebooks/step8.ipynb

diff --git a/tfx_and_serving/docker-compose.yaml b/tfx_and_serving/docker-compose.yaml
index 37b93330..f0b26cd6 100644
--- a/tfx_and_serving/docker-compose.yaml
+++ b/tfx_and_serving/docker-compose.yaml
@@ -4,6 +4,7 @@ services:
     build:
       context: ./buildfiles
       dockerfile: Dockerfile
+    network_mode: "host"
     image: gcr.io/tfx-oss-public/tfx-and-serving:latest
     ports:
       - "4040:4040" # SparkContext
diff --git a/tfx_and_serving/notebooks/step8.ipynb b/tfx_and_serving/notebooks/step8.ipynb
new file mode 100644
index 00000000..f907f3bd
--- /dev/null
+++ b/tfx_and_serving/notebooks/step8.ipynb
@@ -0,0 +1,265 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "step8.ipynb",
+      "provenance": [],
+      "collapsed_sections": []
+    },
+    "kernelspec": {
+      "display_name": "Python 3",
+      "name": "python3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "c_SJpWiSsEie"
+      },
+      "source": [
+        "# Step 8: Use model to perform inference\n",
+        "\n",
+        "Use example data stored on disk to perform inference with your model by sending REST requests to TensorFlow Serving."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "Hgj2EoGU6689",
+        "colab": {}
+      },
+      "source": [
+        "\"\"\"A client for serving the chicago_taxi workshop example locally.\"\"\"\n",
+        "\n",
+        "from __future__ import absolute_import\n",
+        "from __future__ import division\n",
+        "from __future__ import print_function\n",
+        "\n",
+        "import argparse\n",
+        "import base64\n",
+        "import json\n",
+        "import os\n",
+        "import subprocess\n",
+        "import tempfile\n",
+        "\n",
+        "import requests\n",
+        "import tensorflow as tf\n",
+        "import tfx_utils\n",
+        "from tfx.utils import io_utils\n",
+        "from tensorflow_metadata.proto.v0 import schema_pb2\n",
+        "\n",
+        "from tensorflow_transform import coders as tft_coders\n",
+        "from tensorflow_transform.tf_metadata import dataset_metadata\n",
+        "from tensorflow_transform.tf_metadata import dataset_schema\n",
+        "from tensorflow_transform.tf_metadata import schema_utils\n",
+        "\n",
+        "from google.protobuf import text_format\n",
+        "\n",
+        "from tensorflow.python.lib.io import file_io  # pylint: disable=g-direct-tensorflow-import\n",
+        "from tfx.examples.chicago_taxi.trainer import taxi\n",
+        "\n",
+        "_INFERENCE_TIMEOUT_SECONDS = 5.0\n",
+        "_PIPELINE_NAME = 'taxi'\n",
+        "_LABEL_KEY = 'tips'"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "gbDJpeYtoynU"
+      },
+      "source": [
+        "The data that we will use to send requests to our model is stored on disk in [CSV](https://en.wikipedia.org/wiki/Comma-separated_values) format; we will convert these examples to [TensorFlow Example](https://www.tensorflow.org/api_docs/python/tf/train/Example) protos to send to the model being served by TensorFlow Serving.\n",
+        "\n",
+        "Construct the following two utility functions:\n",
+        "\n",
+        "* `_make_csv_coder`, which creates a coder that will decode a single row from the CSV data file and output a tf.transform encoded dict.\n",
+        "* `_make_proto_coder`, which creates a coder that will encode a tf.transform encoded dict into a serialized TF Example.\n",
+        "\n"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "ETyVOHa669Gh",
+        "colab": {}
+      },
+      "source": [
+        "def _get_raw_feature_spec(schema):\n",
+        "  \"\"\"Return the raw feature spec for a given schema.\"\"\"\n",
+        "  return schema_utils.schema_as_feature_spec(schema).feature_spec\n",
+        "\n",
+        "\n",
+        "def _make_proto_coder(schema):\n",
+        "  \"\"\"Return a coder that encodes tf.transform dicts as serialized TF Examples.\"\"\"\n",
+        "  raw_feature_spec = _get_raw_feature_spec(schema)\n",
+        "  raw_schema = dataset_schema.from_feature_spec(raw_feature_spec)\n",
+        "  return tft_coders.ExampleProtoCoder(raw_schema)\n",
+        "\n",
+        "\n",
+        "def _make_csv_coder(schema, column_names):\n",
+        "  \"\"\"Return a coder that decodes CSV rows into tf.transform dicts.\"\"\"\n",
+        "  raw_feature_spec = _get_raw_feature_spec(schema)\n",
+        "  parsing_schema = dataset_schema.from_feature_spec(raw_feature_spec)\n",
+        "  return tft_coders.CsvCoder(column_names, parsing_schema)"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
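+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "coder_toy_example"
+      },
+      "source": [
+        "To see what these coders produce, here is a minimal, illustrative sketch (not part of the client above): it hand-writes a two-column feature spec, decodes one CSV row into a tf.transform encoded dict, and then encodes that dict as a serialized TF Example. The `_toy_*` names are hypothetical and unrelated to the taxi schema."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "coder_toy_example_code",
+        "colab": {}
+      },
+      "source": [
+        "# Illustrative only: round-trip one CSV row through the two coder types\n",
+        "# used above, built from a tiny hand-written feature spec.\n",
+        "_toy_feature_spec = {\n",
+        "    'trip_miles': tf.FixedLenFeature([], tf.float32),\n",
+        "    'company': tf.FixedLenFeature([], tf.string),\n",
+        "}\n",
+        "_toy_schema = dataset_schema.from_feature_spec(_toy_feature_spec)\n",
+        "_toy_csv_coder = tft_coders.CsvCoder(['trip_miles', 'company'], _toy_schema)\n",
+        "_toy_proto_coder = tft_coders.ExampleProtoCoder(_toy_schema)\n",
+        "\n",
+        "decoded = _toy_csv_coder.decode('1.5,Flash Cab')\n",
+        "print(decoded)  # a dict keyed by column name\n",
+        "print(_toy_proto_coder.encode(decoded))  # serialized tf.train.Example bytes"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },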
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "RBOVwebUs6hz"
+      },
+      "source": [
+        "Implement a routine that reads examples from a CSV file and, for each example, sends an inference request containing a base64 encoding of the serialized TF Example."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "zlArlZKo69Og",
+        "colab": {}
+      },
+      "source": [
+        "def do_inference(server_addr, model_name, examples_file, num_examples, schema):\n",
+        "  \"\"\"Sends requests to the model and prints the results.\n",
+        "\n",
+        "  Args:\n",
+        "    server_addr: network address of the model server in \"host:port\" format\n",
+        "    model_name: name of the model as understood by the model server\n",
+        "    examples_file: path to a csv file containing examples, with the first\n",
+        "      line assumed to have the column headers\n",
+        "    num_examples: number of requests to send to the server\n",
+        "    schema: a Schema describing the input data\n",
+        "  \"\"\"\n",
+        "  filtered_features = [\n",
+        "      feature for feature in schema.feature if feature.name != _LABEL_KEY\n",
+        "  ]\n",
+        "  del schema.feature[:]\n",
+        "  schema.feature.extend(filtered_features)\n",
+        "\n",
+        "  column_names = io_utils.load_csv_column_names(examples_file)\n",
+        "  csv_coder = _make_csv_coder(schema, column_names)\n",
+        "  proto_coder = _make_proto_coder(schema)\n",
+        "\n",
+        "  input_file = open(examples_file, 'r')\n",
+        "  input_file.readline()  # skip header line\n",
+        "\n",
+        "  serialized_examples = []\n",
+        "  for _ in range(num_examples):\n",
+        "    one_line = input_file.readline()\n",
+        "    if not one_line:\n",
+        "      print('End of example file reached')\n",
+        "      break\n",
+        "    one_example = csv_coder.decode(one_line)\n",
+        "    serialized_example = proto_coder.encode(one_example)\n",
+        "    serialized_examples.append(serialized_example)\n",
+        "\n",
+        "  parsed_server_addr = server_addr.split(':')\n",
+        "  host = parsed_server_addr[0]\n",
+        "  port = parsed_server_addr[1]\n",
+        "  json_examples = []\n",
+        "\n",
+        "  for serialized_example in serialized_examples:\n",
+        "    # The encoding follows the guidelines in:\n",
+        "    # https://www.tensorflow.org/tfx/serving/api_rest\n",
+        "    example_bytes = base64.b64encode(serialized_example).decode('utf-8')\n",
+        "    predict_request = '{ \"b64\": \"%s\" }' % example_bytes\n",
+        "    json_examples.append(predict_request)\n",
+        "\n",
+        "  json_request = '{ \"instances\": [' + ','.join(map(str, json_examples)) + ']}'\n",
+        "\n",
+        "  server_url = 'http://' + host + ':' + port + '/v1/models/' + model_name + ':predict'\n",
+        "  response = requests.post(\n",
+        "      server_url, data=json_request, timeout=_INFERENCE_TIMEOUT_SECONDS)\n",
+        "  response.raise_for_status()\n",
+        "  prediction = response.json()\n",
+        "  print(json.dumps(prediction, indent=4))"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
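+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "rest_payload_example"
+      },
+      "source": [
+        "The request body that `do_inference` assembles follows the [TensorFlow Serving REST API](https://www.tensorflow.org/tfx/serving/api_rest): a JSON object with an `instances` list, in which each binary example is wrapped as a `{\"b64\": ...}` object. The sketch below is illustrative only; it uses a fake stand-in for a serialized example to show the exact shape that gets POSTed to the `:predict` endpoint."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "rest_payload_example_code",
+        "colab": {}
+      },
+      "source": [
+        "# Illustrative only: the shape of the REST request body built above,\n",
+        "# shown for a single fake serialized example.\n",
+        "fake_serialized_example = b'not-a-real-example'  # stand-in for proto_coder.encode(...)\n",
+        "body = {\n",
+        "    'instances': [\n",
+        "        {'b64': base64.b64encode(fake_serialized_example).decode('utf-8')}\n",
+        "    ]\n",
+        "}\n",
+        "print(json.dumps(body, indent=2))\n",
+        "# This body would be POSTed to http://<host>:<port>/v1/models/<model_name>:predict"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },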
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "FRBwM_-Ytuo5"
+      },
+      "source": [
+        "Open the metadata store, obtain the URI of the schema that TFDV inferred for your data, fetch the schema file, and parse it into a `Schema` object."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "dYAuE2A47Dt0",
+        "colab": {}
+      },
+      "source": [
+        "def _make_schema(pipeline_name):\n",
+        "  \"\"\"Reads and constructs the schema object for the provided pipeline.\n",
+        "\n",
+        "  Args:\n",
+        "    pipeline_name: The name of the pipeline for which the TFX Metadata Store\n",
+        "      has a Schema.\n",
+        "\n",
+        "  Returns:\n",
+        "    An instance of Schema. An AssertionError is raised if more or fewer than\n",
+        "    one schema was found for the given pipeline.\n",
+        "  \"\"\"\n",
+        "  db_path = os.path.join(os.environ['HOME'], 'airflow/tfx/metadata/',\n",
+        "                         pipeline_name, 'metadata.db')\n",
+        "  store = tfx_utils.TFXReadonlyMetadataStore.from_sqlite_db(db_path)\n",
+        "  schemas = store.get_artifacts_of_type_df(tfx_utils.TFXArtifactTypes.SCHEMA)\n",
+        "  assert len(schemas.URI) == 1\n",
+        "  schema_uri = schemas.URI.iloc[0] + 'schema.pbtxt'\n",
+        "  schema = schema_pb2.Schema()\n",
+        "  contents = file_io.read_file_to_string(schema_uri)\n",
+        "  text_format.Parse(contents, schema)\n",
+        "  return schema"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "x6l3ZBafybWC"
+      },
+      "source": [
+        "Use the utilities that we have defined to send a series of inference requests to the model being served by TensorFlow Serving, which is listening on the host's network interface."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab_type": "code",
+        "id": "AmgLWbAOAcQy",
+        "colab": {}
+      },
+      "source": [
+        "do_inference(server_addr='127.0.0.1:8501',\n",
+        "             model_name=_PIPELINE_NAME,\n",
+        "             examples_file='/root/airflow/data/taxi_data/data.csv',\n",
+        "             num_examples=3,\n",
+        "             schema=_make_schema(_PIPELINE_NAME))"
+      ],
+      "execution_count": 0,
+      "outputs": []
+    }
+  ]
+}