1 change: 1 addition & 0 deletions tfx_and_serving/docker-compose.yaml
@@ -4,6 +4,7 @@ services:
     build:
       context: ./buildfiles
       dockerfile: Dockerfile
+    network_mode: "host"
     image: gcr.io/tfx-oss-public/tfx-and-serving:latest
     ports:
       - "4040:4040" # SparkContext
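Note: with `network_mode: "host"` the container shares the host's network stack (Linux only), so the notebook and TensorFlow Serving can reach each other on 127.0.0.1; published `ports:` mappings are ignored in this mode. A sketch of the resulting service block (the service name and surrounding lines are assumed, since the full file is not shown here):

services:
  tfx:
    build:
      context: ./buildfiles
      dockerfile: Dockerfile
    network_mode: "host"
    image: gcr.io/tfx-oss-public/tfx-and-serving:latest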
265 changes: 265 additions & 0 deletions tfx_and_serving/notebooks/step8.ipynb
@@ -0,0 +1,265 @@
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "step8.ipynb",
"provenance": [],
"collapsed_sections": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "c_SJpWiSsEie"
},
"source": [
"# Step 8: Use model to perform inference\n",
"\n",
"Use example data stored on disk to perform inference with your model by sending REST requests to Tesnorflow Serving."
]
},
{
"cell_type": "code",
"metadata": {
"colab_type": "code",
"id": "Hgj2EoGU6689",
"colab": {}
},
"source": [
"\"\"\"A client for serving the chicago_taxi workshop example locally.\"\"\"\n",
"\n",
"from __future__ import absolute_import\n",
"from __future__ import division\n",
"from __future__ import print_function\n",
"\n",
"import argparse\n",
"import base64\n",
"import json\n",
"import os\n",
"import subprocess\n",
"import tempfile\n",
"\n",
"import requests\n",
"import tensorflow as tf\n",
"import tfx_utils\n",
"from tfx.utils import io_utils\n",
"from tensorflow_metadata.proto.v0 import schema_pb2\n",
"\n",
"from tensorflow_transform import coders as tft_coders\n",
"from tensorflow_transform.tf_metadata import dataset_metadata\n",
"from tensorflow_transform.tf_metadata import dataset_schema\n",
"from tensorflow_transform.tf_metadata import schema_utils\n",
"\n",
"from google.protobuf import text_format\n",
"\n",
"from tensorflow.python.lib.io import file_io # pylint: disable=g-direct-tensorflow-import\n",
"from tfx.examples.chicago_taxi.trainer import taxi\n",
"\n",
"_INFERENCE_TIMEOUT_SECONDS = 5.0\n",
"_PIPELINE_NAME = 'taxi'\n",
"_LABEL_KEY = 'tips'"
],
"execution_count": 0,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "gbDJpeYtoynU"
},
"source": [
"The data that we will use to send requests to our model is stored on disk in [csv](https://en.wikipedia.org/wiki/Comma-separated_values) format; we will convert these examples to [Tensorflow Example](https://www.tensorflow.org/api_docs/python/tf/train/Example) to send to our model being served by Tensorflow Serving.\n",
"\n",
"Construct the following two utility functions:\n",
"\n",
"* `_make_proto_coder` which creates a coder that will decode a single row from the CSV data file and output a tf.transform encoded dict.\n",
"* `_make_csv_coder` which creates a coder that will encode a tf.transform encoded dict object into a TF Example.\n",
"\n"
]
},
{
"cell_type": "code",
"metadata": {
"colab_type": "code",
"id": "ETyVOHa669Gh",
"colab": {}
},
"source": [
"def _get_raw_feature_spec(schema):\n",
" \"\"\"Return raw feature spec for a given schema.\"\"\"\n",
" return schema_utils.schema_as_feature_spec(schema).feature_spec\n",
"\n",
"\n",
"def _make_proto_coder(schema):\n",
" \"\"\"Return a coder for tf.transform to read TF Examples.\"\"\"\n",
" raw_feature_spec = _get_raw_feature_spec(schema)\n",
" raw_schema = dataset_schema.from_feature_spec(raw_feature_spec)\n",
" return tft_coders.ExampleProtoCoder(raw_schema)\n",
"\n",
"\n",
"def _make_csv_coder(schema, column_names):\n",
" \"\"\"Return a coder for tf.transform to read csv files.\"\"\"\n",
" raw_feature_spec = _get_raw_feature_spec(schema)\n",
" parsing_schema = dataset_schema.from_feature_spec(raw_feature_spec)\n",
" return tft_coders.CsvCoder(column_names, parsing_schema)"
],
"execution_count": 0,
"outputs": []
},
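{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick, illustrative sanity check, the two coders can round-trip a single CSV row into a serialized TF Example. This sketch uses a made-up two-column schema (`trip_miles`, `company`) rather than the taxi schema, and assumes tensorflow_transform's `schema_utils.schema_from_feature_spec` helper is available:"
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# Illustrative only: toy schema and row, not part of the pipeline.\n",
"_toy_spec = {\n",
"    'trip_miles': tf.io.FixedLenFeature([], tf.float32),\n",
"    'company': tf.io.FixedLenFeature([], tf.string),\n",
"}\n",
"_toy_schema = schema_utils.schema_from_feature_spec(_toy_spec)\n",
"\n",
"_csv_coder = _make_csv_coder(_toy_schema, ['trip_miles', 'company'])\n",
"_proto_coder = _make_proto_coder(_toy_schema)\n",
"\n",
"# CSV row -> feature dict -> serialized tf.train.Example.\n",
"_decoded = _csv_coder.decode('1.5,Flash Cab')\n",
"_serialized = _proto_coder.encode(_decoded)\n",
"print(tf.train.Example.FromString(_serialized))"
],
"execution_count": 0,
"outputs": []
},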
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "RBOVwebUs6hz"
},
"source": [
"Implement routine to read examples from a CSV file and for each example, send an inference request containing a base-64 encoding of the serialized TF Example."
]
},
{
"cell_type": "code",
"metadata": {
"colab_type": "code",
"id": "zlArlZKo69Og",
"colab": {}
},
"source": [
"def do_inference(server_addr, model_name, examples_file, num_examples, schema):\n",
" \"\"\"Sends requests to the model and prints the results.\n",
" Args:\n",
" server_addr: network address of model server in \"host:port\" format\n",
" model_name: name of the model as understood by the model server\n",
" examples_file: path to csv file containing examples, with the first line\n",
" assumed to have the column headers\n",
" num_examples: number of requests to send to the server\n",
" schema: a Schema describing the input data\n",
" Returns:\n",
" Response from model server\n",
" \"\"\"\n",
" filtered_features = [\n",
" feature for feature in schema.feature if feature.name != _LABEL_KEY\n",
" ]\n",
" del schema.feature[:]\n",
" schema.feature.extend(filtered_features)\n",
"\n",
" column_names = io_utils.load_csv_column_names(examples_file)\n",
" csv_coder = _make_csv_coder(schema, column_names)\n",
" proto_coder = _make_proto_coder(schema)\n",
"\n",
" input_file = open(examples_file, 'r')\n",
" input_file.readline() # skip header line\n",
"\n",
" serialized_examples = []\n",
" for _ in range(num_examples):\n",
" one_line = input_file.readline()\n",
" if not one_line:\n",
" print('End of example file reached')\n",
" break\n",
" one_example = csv_coder.decode(one_line)\n",
"\n",
" serialized_example = proto_coder.encode(one_example)\n",
" serialized_examples.append(serialized_example)\n",
"\n",
" parsed_server_addr = server_addr.split(':')\n",
"\n",
" host=parsed_server_addr[0]\n",
" port=parsed_server_addr[1]\n",
" json_examples = []\n",
"\n",
" for serialized_example in serialized_examples:\n",
" # The encoding follows the guidelines in:\n",
" # https://www.tensorflow.org/tfx/serving/api_rest\n",
" example_bytes = base64.b64encode(serialized_example).decode('utf-8')\n",
" predict_request = '{ \"b64\": \"%s\" }' % example_bytes\n",
" json_examples.append(predict_request)\n",
"\n",
" json_request = '{ \"instances\": [' + ','.join(map(str, json_examples)) + ']}'\n",
"\n",
" server_url = 'http://' + host + ':' + port + '/v1/models/' + model_name + ':predict'\n",
" response = requests.post(\n",
" server_url, data=json_request, timeout=_INFERENCE_TIMEOUT_SECONDS)\n",
" response.raise_for_status()\n",
" prediction = response.json()\n",
" print(json.dumps(prediction, indent=4))"
],
"execution_count": 0,
"outputs": []
},
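{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, the request body built above follows the TensorFlow Serving REST API linked in the comments: a JSON object with an `instances` list, in which each serialized Example is wrapped in a `{\"b64\": ...}` object. A standalone sketch of that shape, using dummy bytes rather than a real Example:"
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# Dummy payload, for illustration only.\n",
"_dummy_bytes = base64.b64encode(b'serialized tf.Example bytes').decode('utf-8')\n",
"_body = json.dumps({'instances': [{'b64': _dummy_bytes}]})\n",
"# POST _body to http://<host>:<port>/v1/models/<model_name>:predict\n",
"print(_body)"
],
"execution_count": 0,
"outputs": []
},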
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "FRBwM_-Ytuo5"
},
"source": [
"Open the metadata store, obtain the URI for the schema of your model, as inferred by TF DV, fetch the schema file and parse it into a `Schema` object."
]
},
{
"cell_type": "code",
"metadata": {
"colab_type": "code",
"id": "dYAuE2A47Dt0",
"colab": {}
},
"source": [
"def _make_schema(pipeline_name):\n",
" \"\"\"Reads and constructs schema object for provided pipeline.\n",
"\n",
" Args:\n",
" pipeline_name: The name of the pipeline for which TFX Metadata Store has Schema.\n",
"\n",
" Returns:\n",
" An instance of Schema or raises Exception if more or fewer than one schema\n",
" was found for the given pipeline.\n",
" \"\"\"\n",
" db_path = os.path.join(os.environ['HOME'], 'airflow/tfx/metadata/', pipeline_name, 'metadata.db')\n",
" store = tfx_utils.TFXReadonlyMetadataStore.from_sqlite_db(db_path)\n",
" schemas = store.get_artifacts_of_type_df(tfx_utils.TFXArtifactTypes.SCHEMA)\n",
" assert len(schemas.URI) == 1\n",
" schema_uri = schemas.URI.iloc[0] + 'schema.pbtxt'\n",
" schema = schema_pb2.Schema()\n",
" contents = file_io.read_file_to_string(schema_uri)\n",
" text_format.Parse(contents, schema)\n",
" return schema"
],
"execution_count": 0,
"outputs": []
},
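{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick check (assuming the pipeline has already run, so the metadata DB and schema artifact exist at the expected paths), you can list the feature names and types in the parsed `Schema` proto:"
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# Inspect the schema fetched from the metadata store.\n",
"_schema = _make_schema(_PIPELINE_NAME)\n",
"for _feature in _schema.feature:\n",
"  print(_feature.name, schema_pb2.FeatureType.Name(_feature.type))"
],
"execution_count": 0,
"outputs": []
},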
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "x6l3ZBafybWC"
},
"source": [
"Use the utilities that we have defined to send a series of inference requests to the model being served by Tensorflow Serving listening on the host's network interface."
]
},
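{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before sending inference requests, you can optionally confirm that the model is loaded and servable via TensorFlow Serving's model status endpoint (a GET on `/v1/models/<model_name>`):"
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# Optional health check against the model status endpoint.\n",
"_status = requests.get('http://127.0.0.1:8501/v1/models/' + _PIPELINE_NAME,\n",
"                       timeout=_INFERENCE_TIMEOUT_SECONDS)\n",
"print(json.dumps(_status.json(), indent=4))"
],
"execution_count": 0,
"outputs": []
},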
{
"cell_type": "code",
"metadata": {
"colab_type": "code",
"id": "AmgLWbAOAcQy",
"colab": {}
},
"source": [
"do_inference(server_addr='127.0.0.1:8501',\n",
" model_name=_PIPELINE_NAME,\n",
" examples_file='/root/airflow/data/taxi_data/data.csv',\n",
" num_examples=3,\n",
" schema=_make_schema(_PIPELINE_NAME))"
],
"execution_count": 0,
"outputs": []
}
]
}