diff --git a/Pipfile.lock b/Pipfile.lock
index 41e71bb89..3eb4d9205 100644
--- a/Pipfile.lock
+++ b/Pipfile.lock
@@ -483,6 +483,19 @@
             "markers": "python_full_version >= '3.6.1'",
             "version": "==1.1.5"
         },
+        "prometheus-client": {
+            "hashes": [
+                "sha256:9da7b32f02439d8c04f7777021c304ed51d9ec180604700c1ba72a4d44dceb03",
+                "sha256:b08c34c328e1bf5961f0b4352668e6c8f145b4a087e09b7296ef62cbe4693d35"
+            ],
+            "version": "==0.9.0"
+        },
+        "prometheus-flask-exporter": {
+            "hashes": [
+                "sha256:01c8282eb695596531f29f4869c1b127e1918af4f191f1215c4b0b0d081f757e"
+            ],
+            "version": "==0.18.1"
+        },
         "protobuf": {
             "hashes": [
                 "sha256:0e247612fadda953047f53301a7b0407cb0c3cb4ae25a6fde661597a04039b3c",
diff --git a/lib/id3c/api/__init__.py b/lib/id3c/api/__init__.py
index 534617967..bb9f8eb4a 100644
--- a/lib/id3c/api/__init__.py
+++ b/lib/id3c/api/__init__.py
@@ -3,7 +3,7 @@
 """
 import logging
 from flask import Flask
-from . import config
+from . import config, metrics
 from .routes import blueprints
 
 
@@ -17,6 +17,8 @@ def create_app():
     for blueprint in blueprints:
         app.register_blueprint(blueprint)
 
+    metrics.register_app(app)
+
     LOG.debug(f"app root is {app.root_path}")
     LOG.debug(f"app static directory is {app.static_folder}")
 
diff --git a/lib/id3c/api/metrics.py b/lib/id3c/api/metrics.py
new file mode 100644
index 000000000..dc693c6d9
--- /dev/null
+++ b/lib/id3c/api/metrics.py
@@ -0,0 +1,41 @@
+"""
+Web API metrics, instrumenting the Flask app via prometheus_flask_exporter.
+"""
+import os
+from prometheus_client import CollectorRegistry, GCCollector, PlatformCollector, ProcessCollector
+import prometheus_flask_exporter
+import prometheus_flask_exporter.multiprocess
+
+from ..metrics import MultiProcessWriter
+
+
+if "prometheus_multiproc_dir" in os.environ:
+    FlaskMetrics = prometheus_flask_exporter.multiprocess.MultiprocessPrometheusMetrics
+    MULTIPROCESS = True
+
+else:
+    FlaskMetrics = prometheus_flask_exporter.PrometheusMetrics
+    MULTIPROCESS = False
+
+
+# This shared instance is imported by our routes and bound to the app from create_app().
+XXX = FlaskMetrics(
+    app = None,
+    path = None,
+    defaults_prefix = prometheus_flask_exporter.NO_PREFIX,
+    default_latency_as_histogram = False)
+
+
+def register_app(app):
+    XXX.init_app(app)
+
+    # XXX TODO FIXME needs to be postfork for a pre-forking server like uWSGI
+    if MULTIPROCESS:
+        registry = CollectorRegistry(auto_describe = True)
+
+        ProcessCollector(registry = registry)
+        PlatformCollector(registry = registry)
+        GCCollector(registry = registry)
+
+        writer = MultiProcessWriter(registry)
+        writer.start()
diff --git a/lib/id3c/api/routes.py b/lib/id3c/api/routes.py
index b582c482b..2e8df737c 100644
--- a/lib/id3c/api/routes.py
+++ b/lib/id3c/api/routes.py
@@ -4,8 +4,12 @@
 import json
 import logging
 import pkg_resources
-from flask import Blueprint, request, send_file
+import prometheus_client
+from flask import Blueprint, make_response, request, send_file
+
+from ..metrics import DatabaseCollector
 from . import datastore
+from .metrics import XXX
 from .utils.routes import authenticated_datastore_session_required, content_types_accepted, check_content_length
 
 
@@ -20,6 +24,28 @@
 ]
 
 
+# Metrics exposition endpoint
+@api_v1.route("/metrics", methods = ["GET"])
+@XXX.do_not_track()
+@authenticated_datastore_session_required
+def expose_metrics(*, session):
+    """
+    Exposes metrics for Prometheus.
+
+    Includes metrics collected from the Flask app, as well as the database.
+    """
+    registry = prometheus_client.CollectorRegistry(auto_describe = True)
+
+    # Collect metrics from the server-wide registry, potentially from multiple
+    # server processes via files in prometheus_multiproc_dir.
+    registry.register(XXX.registry)
+
+    # Collect metrics from the database using the authenticated session.
+    registry.register(DatabaseCollector(session))
+
+    return make_response(prometheus_client.make_wsgi_app(registry))
+
+
 @api_v1.route("/", methods = ['GET'])
 @api_unversioned.route("/", methods = ['GET'])
 def index():
diff --git a/lib/id3c/metrics.py b/lib/id3c/metrics.py
new file mode 100644
index 000000000..4a4e21f33
--- /dev/null
+++ b/lib/id3c/metrics.py
@@ -0,0 +1,111 @@
+"""
+Metrics handling functions.
+"""
+import logging
+import os
+import threading
+from prometheus_client import CollectorRegistry, REGISTRY as DEFAULT_REGISTRY
+from prometheus_client.core import GaugeMetricFamily
+from prometheus_client.values import ValueClass
+from psycopg2.errors import InsufficientPrivilege
+from time import sleep
+from .db import DatabaseSession
+from .utils import set_thread_name
+
+
+LOG = logging.getLogger(__name__)
+
+
+class DatabaseCollector:
+    """
+    Collects metrics from the database, using an existing *session*.
+    """
+    def __init__(self, session: DatabaseSession):
+        self.session = session
+
+
+    def collect(self):
+        """
+        Yields all database metric families, collected inside the session's
+        context manager.
+        """
+        with self.session:
+            yield from self.estimated_row_total()
+
+
+    def estimated_row_total(self):
+        """
+        Yields one gauge family of estimated row counts (``reltuples``) for
+        relkind 'r' tables in the receiving and warehouse schemas, labeled by
+        schema and table name.
+
+        Logs an error and yields nothing if the query raises
+        InsufficientPrivilege.
+        """
+        family = GaugeMetricFamily(
+            "id3c_estimated_row_total",
+            "Estimated number of rows in an ID3C database table",
+            labels = ("schema", "table"))
+
+        try:
+            metrics = self.session.fetch_all(
+                """
+                select
+                    ns.nspname as schema,
+                    c.relname as table,
+                    c.reltuples::bigint as estimated_row_count
+                from
+                    pg_catalog.pg_class c
+                    join pg_catalog.pg_namespace ns on (c.relnamespace = ns.oid)
+                where
+                    ns.nspname in ('receiving', 'warehouse') and
+                    c.relkind = 'r'
+                order by
+                    schema,
+                    "table"
+                """)
+        except InsufficientPrivilege as error:
+            LOG.error(f"Permission denied when collecting id3c_estimated_row_total metrics: {error}")
+            return
+
+        for metric in metrics:
+            family.add_metric((metric.schema, metric.table), metric.estimated_row_count)
+
+        yield family
+
+
+class MultiProcessWriter(threading.Thread):
+    """
+    Daemon thread which, every *interval* seconds, collects *registry* and
+    stores each sample's value through prometheus_client's ``ValueClass``.
+    """
+    def __init__(self, registry: CollectorRegistry = DEFAULT_REGISTRY, interval: int = 15):
+        super().__init__(name = "metrics writer", daemon = True)
+
+        self.registry = registry
+        self.interval = interval
+
+    def run(self):
+        set_thread_name(self)
+
+        while True:
+            for metric in self.registry.collect():
+                for sample in metric.samples:
+                    if metric.type == "gauge":
+                        # Metrics from GaugeMetricFamily will not have the
+                        # attribute set, for example.
+                        multiprocess_mode = getattr(metric, "_multiprocess_mode", "all")
+                    else:
+                        multiprocess_mode = ""
+
+                    value = ValueClass(
+                        metric.type,
+                        metric.name,
+                        sample.name,
+                        sample.labels.keys(),
+                        sample.labels.values(),
+                        multiprocess_mode = multiprocess_mode)
+
+                    value.set(sample.value)
+
+            sleep(self.interval)
diff --git a/lib/id3c/utils.py b/lib/id3c/utils.py
index 7a94d9eb9..c2dbba964 100644
--- a/lib/id3c/utils.py
+++ b/lib/id3c/utils.py
@@ -1,6 +1,8 @@
 """
 Utilities.
 """
+import ctypes
+import threading
 from typing import Any, Sequence, Union
 
 
@@ -94,3 +96,31 @@ def shorten(text, length, placeholder):
         return text[0:length - len(placeholder)] + placeholder
     else:
         return text
+
+
+# None until first use; then the loaded library, or False if loading failed.
+LIBCAP = None
+
+def set_thread_name(thread: threading.Thread):
+    """
+    Best-effort: sets the kernel-visible name of the calling thread to
+    *thread*'s name via ``prctl(PR_SET_NAME)``.
+
+    prctl(2) names the *calling* thread, so this should be called from
+    within the thread being named.  Does nothing if libcap.so.2 cannot be
+    loaded.
+    """
+    global LIBCAP
+
+    if LIBCAP is None:
+        try:
+            LIBCAP = ctypes.CDLL("libcap.so.2")
+        except OSError:
+            # ctypes raises OSError when the shared library can't be loaded.
+            LIBCAP = False
+
+    if not LIBCAP:
+        return
+
+    # From the prctl(2) manpage, PR_SET_NAME is 15.
+    LIBCAP.prctl(15, thread.name.encode())
diff --git a/setup.py b/setup.py
index fecd993fa..618baffff 100644
--- a/setup.py
+++ b/setup.py
@@ -71,6 +71,8 @@
         "more-itertools",
         "oauth2client >2.0.0,<4.0.0",
         "pandas >=1.0.1,<2",
+        "prometheus_client",
+        "prometheus_flask_exporter",
         "psycopg2 >=2.8,<3",
         "pyyaml",
         "requests",