From 769915be6814c94388bb9aa913f42b3dc7a6df26 Mon Sep 17 00:00:00 2001 From: Thomas Sibley Date: Thu, 18 Mar 2021 11:14:02 -0700 Subject: [PATCH 1/3] wip! --- Pipfile.lock | 7 +++++ lib/id3c/api/routes.py | 18 ++++++++++++- lib/id3c/metrics.py | 61 ++++++++++++++++++++++++++++++++++++++++++ setup.py | 1 + 4 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 lib/id3c/metrics.py diff --git a/Pipfile.lock b/Pipfile.lock index 41e71bb89..541140a66 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -483,6 +483,13 @@ "markers": "python_full_version >= '3.6.1'", "version": "==1.1.5" }, + "prometheus-client": { + "hashes": [ + "sha256:9da7b32f02439d8c04f7777021c304ed51d9ec180604700c1ba72a4d44dceb03", + "sha256:b08c34c328e1bf5961f0b4352668e6c8f145b4a087e09b7296ef62cbe4693d35" + ], + "version": "==0.9.0" + }, "protobuf": { "hashes": [ "sha256:0e247612fadda953047f53301a7b0407cb0c3cb4ae25a6fde661597a04039b3c", diff --git a/lib/id3c/api/routes.py b/lib/id3c/api/routes.py index b582c482b..99bc5e8b5 100644 --- a/lib/id3c/api/routes.py +++ b/lib/id3c/api/routes.py @@ -4,7 +4,8 @@ import json import logging import pkg_resources -from flask import Blueprint, request, send_file +from flask import Blueprint, make_response, request, send_file +from .. import metrics from . import datastore from .utils.routes import authenticated_datastore_session_required, content_types_accepted, check_content_length @@ -147,6 +148,21 @@ def receive_fhir(*, session): return "", 204 +@api_v1.route("/metrics", methods = ["GET"]) +@authenticated_datastore_session_required +def expose_metrics(*, session): + """ + Exposes metrics for Prometheus. + """ + # Make an ephemeral registry for metrics collected via the authenticated + # session. + registry = metrics.CollectorRegistry(auto_describe = True) + + registry.register(metrics.DatabaseCollector(session)) + + return make_response(metrics.make_wsgi_app(registry)) + + # Load all extra API routes from extensions # Needs to be at the end of route declarations to allow customization of # existing routes and avoid dependency errors diff --git a/lib/id3c/metrics.py b/lib/id3c/metrics.py new file mode 100644 index 000000000..df28c8c22 --- /dev/null +++ b/lib/id3c/metrics.py @@ -0,0 +1,61 @@ +""" +Metrics handling functions. +""" +import logging +import prometheus_client +from prometheus_client.core import GaugeMetricFamily +from psycopg2.errors import InsufficientPrivilege +from .db import DatabaseSession + + +LOG = logging.getLogger(__name__) + +CollectorRegistry = prometheus_client.CollectorRegistry + +make_wsgi_app = prometheus_client.make_wsgi_app + + +class DatabaseCollector: + """ + Collects metrics from the database, using an existing *session*. + """ + def __init__(self, session: DatabaseSession): + self.session = session + + + def collect(self): + with self.session: + yield from self.estimated_row_total() + + + def estimated_row_total(self): + family = GaugeMetricFamily( + "id3c_estimated_row_total", + "Estimated number of rows in an ID3C database table", + labels = ("schema", "table")) + + try: + metrics = self.session.fetch_all( + """ + select + ns.nspname as schema, + c.relname as table, + c.reltuples::bigint as estimated_row_count + from + pg_catalog.pg_class c + join pg_catalog.pg_namespace ns on (c.relnamespace = ns.oid) + where + ns.nspname in ('receiving', 'warehouse') and + c.relkind = 'r' + order by + schema, + "table" + """) + except InsufficientPrivilege as error: + LOG.error(f"Permission denied when collecting id3c_estimated_row_total metrics: {error}") + return + + for metric in metrics: + family.add_metric((metric.schema, metric.table), metric.estimated_row_count) + + yield family diff --git a/setup.py b/setup.py index fecd993fa..1ed851b54 100644 --- a/setup.py +++ b/setup.py @@ -71,6 +71,7 @@ "more-itertools", "oauth2client >2.0.0,<4.0.0", "pandas >=1.0.1,<2", + "prometheus_client", "psycopg2 >=2.8,<3", "pyyaml", "requests", From 9803e20afde6e2129667ff734977a1f684109c27 Mon Sep 17 00:00:00 2001 From: Thomas Sibley Date: Fri, 19 Mar 2021 13:15:36 -0700 Subject: [PATCH 2/3] wip! --- Pipfile.lock | 6 ++++++ lib/id3c/api/__init__.py | 4 +++- lib/id3c/api/metrics.py | 27 ++++++++++++++++++++++++++ lib/id3c/api/routes.py | 42 +++++++++++++++++++++++++--------------- lib/id3c/metrics.py | 5 ----- setup.py | 1 + 6 files changed, 63 insertions(+), 22 deletions(-) create mode 100644 lib/id3c/api/metrics.py diff --git a/Pipfile.lock b/Pipfile.lock index 541140a66..3eb4d9205 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -490,6 +490,12 @@ ], "version": "==0.9.0" }, + "prometheus-flask-exporter": { + "hashes": [ + "sha256:01c8282eb695596531f29f4869c1b127e1918af4f191f1215c4b0b0d081f757e" + ], + "version": "==0.18.1" + }, "protobuf": { "hashes": [ "sha256:0e247612fadda953047f53301a7b0407cb0c3cb4ae25a6fde661597a04039b3c", diff --git a/lib/id3c/api/__init__.py b/lib/id3c/api/__init__.py index 534617967..bb9f8eb4a 100644 --- a/lib/id3c/api/__init__.py +++ b/lib/id3c/api/__init__.py @@ -3,7 +3,7 @@ """ import logging from flask import Flask -from . import config +from . import config, metrics from .routes import blueprints @@ -17,6 +17,8 @@ def create_app(): for blueprint in blueprints: app.register_blueprint(blueprint) + metrics.register_app(app) + LOG.debug(f"app root is {app.root_path}") LOG.debug(f"app static directory is {app.static_folder}") diff --git a/lib/id3c/api/metrics.py b/lib/id3c/api/metrics.py new file mode 100644 index 000000000..a5381ac69 --- /dev/null +++ b/lib/id3c/api/metrics.py @@ -0,0 +1,27 @@ +""" +Web API metrics. +""" +import os +import prometheus_flask_exporter +import prometheus_flask_exporter.multiprocess + + +if "prometheus_multiproc_dir" in os.environ: + FlaskMetrics = prometheus_flask_exporter.multiprocess.MultiprocessPrometheusMetrics + MULTIPROCESS = True + +else: + FlaskMetrics = prometheus_flask_exporter.PrometheusMetrics + MULTIPROCESS = False + + +# This instance is used by both our routes and create_app(). +XXX = FlaskMetrics( + app = None, + path = None, + defaults_prefix = prometheus_flask_exporter.NO_PREFIX, + default_latency_as_histogram = False) + + +def register_app(app): + XXX.init_app(app) diff --git a/lib/id3c/api/routes.py b/lib/id3c/api/routes.py index 99bc5e8b5..2e8df737c 100644 --- a/lib/id3c/api/routes.py +++ b/lib/id3c/api/routes.py @@ -4,9 +4,12 @@ import json import logging import pkg_resources +import prometheus_client from flask import Blueprint, make_response, request, send_file -from .. import metrics + +from ..metrics import DatabaseCollector from . import datastore +from .metrics import XXX from .utils.routes import authenticated_datastore_session_required, content_types_accepted, check_content_length @@ -21,6 +24,28 @@ ] +# Metrics exposition endpoint +@api_v1.route("/metrics", methods = ["GET"]) +@XXX.do_not_track() +@authenticated_datastore_session_required +def expose_metrics(*, session): + """ + Exposes metrics for Prometheus. + + Includes metrics collected from the Flask app, as well as the database. + """ + registry = prometheus_client.CollectorRegistry(auto_describe = True) + + # Collect metrics from the server-wide registry, potentially from multiple + # server processes via files in prometheus_multiproc_dir. + registry.register(XXX.registry) + + # Collect metrics from the database using the authenticated session. + registry.register(DatabaseCollector(session)) + + return make_response(prometheus_client.make_wsgi_app(registry)) + + @api_v1.route("/", methods = ['GET']) @api_unversioned.route("/", methods = ['GET']) def index(): @@ -148,21 +173,6 @@ def receive_fhir(*, session): return "", 204 -@api_v1.route("/metrics", methods = ["GET"]) -@authenticated_datastore_session_required -def expose_metrics(*, session): - """ - Exposes metrics for Prometheus. - """ - # Make an ephemeral registry for metrics collected via the authenticated - # session. - registry = metrics.CollectorRegistry(auto_describe = True) - - registry.register(metrics.DatabaseCollector(session)) - - return make_response(metrics.make_wsgi_app(registry)) - - # Load all extra API routes from extensions # Needs to be at the end of route declarations to allow customization of # existing routes and avoid dependency errors diff --git a/lib/id3c/metrics.py b/lib/id3c/metrics.py index df28c8c22..ce51dc672 100644 --- a/lib/id3c/metrics.py +++ b/lib/id3c/metrics.py @@ -2,7 +2,6 @@ Metrics handling functions. """ import logging -import prometheus_client from prometheus_client.core import GaugeMetricFamily from psycopg2.errors import InsufficientPrivilege from .db import DatabaseSession @@ -10,10 +9,6 @@ LOG = logging.getLogger(__name__) -CollectorRegistry = prometheus_client.CollectorRegistry - -make_wsgi_app = prometheus_client.make_wsgi_app - class DatabaseCollector: """ diff --git a/setup.py b/setup.py index 1ed851b54..618baffff 100644 --- a/setup.py +++ b/setup.py @@ -72,6 +72,7 @@ "oauth2client >2.0.0,<4.0.0", "pandas >=1.0.1,<2", "prometheus_client", + "prometheus_flask_exporter", "psycopg2 >=2.8,<3", "pyyaml", "requests", From f3a7e4c06ed32ff13c542b292f15ed5deb8eb94e Mon Sep 17 00:00:00 2001 From: Thomas Sibley Date: Fri, 19 Mar 2021 13:15:48 -0700 Subject: [PATCH 3/3] wip! --- lib/id3c/api/metrics.py | 14 ++++++++++++++ lib/id3c/metrics.py | 39 +++++++++++++++++++++++++++++++++++++++ lib/id3c/utils.py | 20 ++++++++++++++++++++ 3 files changed, 73 insertions(+) diff --git a/lib/id3c/api/metrics.py b/lib/id3c/api/metrics.py index a5381ac69..dc693c6d9 100644 --- a/lib/id3c/api/metrics.py +++ b/lib/id3c/api/metrics.py @@ -2,9 +2,12 @@ Web API metrics. """ import os +from prometheus_client import CollectorRegistry, GCCollector, PlatformCollector, ProcessCollector import prometheus_flask_exporter import prometheus_flask_exporter.multiprocess +from ..metrics import MultiProcessWriter + if "prometheus_multiproc_dir" in os.environ: FlaskMetrics = prometheus_flask_exporter.multiprocess.MultiprocessPrometheusMetrics @@ -25,3 +28,14 @@ def register_app(app): XXX.init_app(app) + + # XXX TODO FIXME needs to be postfork for a pre-forking server like uWSGI + if MULTIPROCESS: + registry = CollectorRegistry(auto_describe = True) + + ProcessCollector(registry = registry) + PlatformCollector(registry = registry) + GCCollector(registry = registry) + + writer = MultiProcessWriter(registry) + writer.start() diff --git a/lib/id3c/metrics.py b/lib/id3c/metrics.py index ce51dc672..4a4e21f33 100644 --- a/lib/id3c/metrics.py +++ b/lib/id3c/metrics.py @@ -2,9 +2,15 @@ Metrics handling functions. """ import logging +import os +import threading +from prometheus_client import CollectorRegistry, REGISTRY as DEFAULT_REGISTRY from prometheus_client.core import GaugeMetricFamily +from prometheus_client.values import ValueClass from psycopg2.errors import InsufficientPrivilege +from time import sleep from .db import DatabaseSession +from .utils import set_thread_name LOG = logging.getLogger(__name__) @@ -54,3 +60,36 @@ def estimated_row_total(self): family.add_metric((metric.schema, metric.table), metric.estimated_row_count) yield family + + +class MultiProcessWriter(threading.Thread): + def __init__(self, registry: CollectorRegistry = DEFAULT_REGISTRY, interval: int = 15): + super().__init__(name = "metrics writer", daemon = True) + + self.registry = registry + self.interval = interval + + def run(self): + set_thread_name(self) + + while True: + for metric in self.registry.collect(): + for sample in metric.samples: + if metric.type == "gauge": + # Metrics from GaugeMetricFamily will not have the + # attribute set, for example. + multiprocess_mode = getattr(metric, "_multiprocess_mode", "all") + else: + multiprocess_mode = "" + + value = ValueClass( + metric.type, + metric.name, + sample.name, + sample.labels.keys(), + sample.labels.values(), + multiprocess_mode = multiprocess_mode) + + value.set(sample.value) + + sleep(self.interval) diff --git a/lib/id3c/utils.py b/lib/id3c/utils.py index 7a94d9eb9..c2dbba964 100644 --- a/lib/id3c/utils.py +++ b/lib/id3c/utils.py @@ -1,6 +1,8 @@ """ Utilities. """ +import ctypes +import threading from typing import Any, Sequence, Union @@ -94,3 +96,21 @@ def shorten(text, length, placeholder): return text[0:length - len(placeholder)] + placeholder else: return text + + +LIBCAP = None + +def set_thread_name(thread: threading.Thread): + global LIBCAP + + if LIBCAP is None: + try: + LIBCAP = ctypes.CDLL("libcap.so.2") + except: + LIBCAP = False + + if not LIBCAP: + return + + # From the prctl(2) manpage, PR_SET_NAME is 15. + LIBCAP.prctl(15, thread.name.encode())