From 7f1db548cfc207ded1ba6134ad12d458c5cdf223 Mon Sep 17 00:00:00 2001 From: sangeetashivaji Date: Tue, 4 Nov 2025 14:35:21 -0500 Subject: [PATCH 01/10] Add dbm integration for clickhouse --- clickhouse/assets/configuration/spec.yaml | 48 +- clickhouse/changelog.d/21773.added | 1 + .../datadog_checks/clickhouse/clickhouse.py | 49 ++ .../clickhouse/config_models/defaults.py | 4 + .../clickhouse/config_models/instance.py | 16 +- .../clickhouse/config_models/validators.py | 24 +- .../clickhouse/data/conf.yaml.example | 124 +++-- .../clickhouse/statement_samples.py | 443 ++++++++++++++++++ clickhouse/test_query_samples.py | 212 +++++++++ clickhouse/tests/test_dbm_integration.py | 272 +++++++++++ clickhouse/tests/test_statement_samples.py | 261 +++++++++++ 11 files changed, 1403 insertions(+), 51 deletions(-) create mode 100644 clickhouse/changelog.d/21773.added create mode 100644 clickhouse/datadog_checks/clickhouse/statement_samples.py create mode 100644 clickhouse/test_query_samples.py create mode 100644 clickhouse/tests/test_dbm_integration.py create mode 100644 clickhouse/tests/test_statement_samples.py diff --git a/clickhouse/assets/configuration/spec.yaml b/clickhouse/assets/configuration/spec.yaml index 224d53ed9f9aa..2e5d4ce9c6a6e 100644 --- a/clickhouse/assets/configuration/spec.yaml +++ b/clickhouse/assets/configuration/spec.yaml @@ -14,7 +14,6 @@ files: - template: instances options: - name: server - required: true description: The hostname used to connect to the system. value: type: string @@ -51,7 +50,7 @@ files: description: | The compression algorithm to use. The default is no compression. If br is specified, the brotli library must be installed separately. - + Valid values are: - lz4 - zstd @@ -73,6 +72,51 @@ files: value: type: boolean example: True + - name: dbm + description: | + Enable Database Monitoring (DBM) to collect query samples and execution plans. + This feature provides deep observability into query performance. + value: + type: boolean + example: false + - name: query_samples + description: | + Configuration for collecting query samples when Database Monitoring (DBM) is enabled. + Query samples provide insights into the queries being executed on your ClickHouse instance. + options: + - name: enabled + description: Enable collection of query samples. + value: + type: boolean + example: true + - name: collection_interval + description: | + The interval in seconds between query sample collections. + Lower values provide more granular data but increase overhead. + value: + type: number + example: 10 + - name: samples_per_hour_per_query + description: | + The maximum number of samples to collect per unique query signature per hour. + This helps limit the volume of data collected while still providing useful insights. + value: + type: number + example: 15 + - name: seen_samples_cache_maxsize + description: | + The maximum size of the cache used to track which query samples have been collected. + A larger cache can help avoid collecting duplicate samples. + value: + type: number + example: 10000 + - name: run_sync + description: | + Whether to run query sample collection synchronously in the check run. + Set to false (default) to run asynchronously in a separate thread. 
+ value: + type: boolean + example: false - template: instances/db overrides: custom_queries.value.example: diff --git a/clickhouse/changelog.d/21773.added b/clickhouse/changelog.d/21773.added new file mode 100644 index 0000000000000..5aa2d9b6ea2b5 --- /dev/null +++ b/clickhouse/changelog.d/21773.added @@ -0,0 +1 @@ +Add Database Monitoring (DBM) support with query sample collection from system.query_log \ No newline at end of file diff --git a/clickhouse/datadog_checks/clickhouse/clickhouse.py b/clickhouse/datadog_checks/clickhouse/clickhouse.py index e263da55fb8f4..d9e47375e03ef 100644 --- a/clickhouse/datadog_checks/clickhouse/clickhouse.py +++ b/clickhouse/datadog_checks/clickhouse/clickhouse.py @@ -7,6 +7,7 @@ from datadog_checks.base.utils.db import QueryManager from . import queries +from .statement_samples import ClickhouseStatementSamples from .utils import ErrorSanitizer @@ -30,6 +31,10 @@ def __init__(self, name, init_config, instances): self._verify = self.instance.get('verify', True) self._tags = self.instance.get('tags', []) + # DBM-related properties + self._resolved_hostname = None + self._database_identifier = None + # Add global tags self._tags.append('server:{}'.format(self._server)) self._tags.append('port:{}'.format(self._port)) @@ -58,11 +63,32 @@ def __init__(self, name, init_config, instances): ) self.check_initializations.append(self._query_manager.compile_queries) + # Initialize statement samples if DBM is enabled + self._dbm_enabled = is_affirmative(self.instance.get('dbm', False)) + self._query_samples_config = self.instance.get('query_samples', {}) + if self._dbm_enabled and self._query_samples_config.get('enabled', True): + # Create a simple config object for statement samples + class QuerySamplesConfig: + def __init__(self, config_dict): + self.enabled = config_dict.get('enabled', True) + self.collection_interval = config_dict.get('collection_interval', 10) + self.run_sync = config_dict.get('run_sync', False) + self.samples_per_hour_per_query = config_dict.get('samples_per_hour_per_query', 15) + self.seen_samples_cache_maxsize = config_dict.get('seen_samples_cache_maxsize', 10000) + + self.statement_samples = ClickhouseStatementSamples(self, QuerySamplesConfig(self._query_samples_config)) + else: + self.statement_samples = None + def check(self, _): self.connect() self._query_manager.execute() self.collect_version() + # Run statement samples if DBM is enabled + if self.statement_samples: + self.statement_samples.run_job_loop(self._tags) + @AgentCheck.metadata_entrypoint def collect_version(self): version = list(self.execute_query_raw('SELECT version()'))[0][0] @@ -75,6 +101,29 @@ def collect_version(self): def execute_query_raw(self, query): return self._client.query(query).result_rows + def _get_debug_tags(self): + """Return debug tags for metrics""" + return ['server:{}'.format(self._server)] + + @property + def reported_hostname(self): + """ + Get the hostname to be reported in metrics and events. + """ + if self._resolved_hostname is None: + self._resolved_hostname = self._server + return self._resolved_hostname + + @property + def database_identifier(self): + """ + Get a unique identifier for this database instance. 
+ """ + if self._database_identifier is None: + # Create a unique identifier based on server, port, and database name + self._database_identifier = "{}:{}:{}".format(self._server, self._port, self._db) + return self._database_identifier + def validate_config(self): if not self._server: raise ConfigurationError('the `server` setting is required') diff --git a/clickhouse/datadog_checks/clickhouse/config_models/defaults.py b/clickhouse/datadog_checks/clickhouse/config_models/defaults.py index cda1a5793bb1c..590aed6495569 100644 --- a/clickhouse/datadog_checks/clickhouse/config_models/defaults.py +++ b/clickhouse/datadog_checks/clickhouse/config_models/defaults.py @@ -16,6 +16,10 @@ def instance_db(): return 'default' +def instance_dbm(): + return False + + def instance_disable_generic_tags(): return False diff --git a/clickhouse/datadog_checks/clickhouse/config_models/instance.py b/clickhouse/datadog_checks/clickhouse/config_models/instance.py index 6a6f674ab672e..8d34440aef3ad 100644 --- a/clickhouse/datadog_checks/clickhouse/config_models/instance.py +++ b/clickhouse/datadog_checks/clickhouse/config_models/instance.py @@ -41,6 +41,18 @@ class MetricPatterns(BaseModel): include: Optional[tuple[str, ...]] = None +class QuerySamples(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + collection_interval: Optional[float] = None + enabled: Optional[bool] = None + run_sync: Optional[bool] = None + samples_per_hour_per_query: Optional[float] = None + seen_samples_cache_maxsize: Optional[float] = None + + class InstanceConfig(BaseModel): model_config = ConfigDict( validate_default=True, @@ -51,6 +63,7 @@ class InstanceConfig(BaseModel): connect_timeout: Optional[int] = None custom_queries: Optional[tuple[CustomQuery, ...]] = None db: Optional[str] = None + dbm: Optional[bool] = None disable_generic_tags: Optional[bool] = None empty_default_hostname: Optional[bool] = None metric_patterns: Optional[MetricPatterns] = None @@ -58,8 +71,9 @@ class InstanceConfig(BaseModel): only_custom_queries: Optional[bool] = None password: Optional[str] = None port: Optional[int] = None + query_samples: Optional[QuerySamples] = None read_timeout: Optional[int] = None - server: str + server: Optional[str] = None service: Optional[str] = None tags: Optional[tuple[str, ...]] = None tls_ca_cert: Optional[str] = None diff --git a/clickhouse/datadog_checks/clickhouse/config_models/validators.py b/clickhouse/datadog_checks/clickhouse/config_models/validators.py index 1b99ebf855087..1d5891aedee8d 100644 --- a/clickhouse/datadog_checks/clickhouse/config_models/validators.py +++ b/clickhouse/datadog_checks/clickhouse/config_models/validators.py @@ -3,11 +3,19 @@ # Licensed under a 3-clause BSD style license (see LICENSE) # Here you can include additional config validators or transformers -# -# def initialize_instance(values, **kwargs): -# if 'my_option' not in values and 'my_legacy_option' in values: -# values['my_option'] = values['my_legacy_option'] -# if values.get('my_number') > 10: -# raise ValueError('my_number max value is 10, got %s' % str(values.get('my_number'))) -# -# return values + + +def initialize_instance(values, **kwargs): + """ + Initialize and validate instance configuration. + Maps 'host' to 'server' for backwards compatibility. 
+ """ + # Map 'host' to 'server' for backwards compatibility + if 'server' not in values and 'host' in values: + values['server'] = values['host'] + + # Validate that either server or host was provided + if 'server' not in values: + raise ValueError("Either 'server' or 'host' must be specified in the configuration") + + return values diff --git a/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example b/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example index 2df777c607797..1140bf85f453d 100644 --- a/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example +++ b/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example @@ -24,10 +24,11 @@ init_config: # instances: - ## @param server - string - required + - + ## @param server - string - optional ## The hostname used to connect to the system. # - - server: + # server: ## @param port - integer - optional - default: 9000 ## The port used to connect to the system. @@ -86,6 +87,46 @@ instances: # # verify: true + ## @param dbm - boolean - optional - default: false + ## Enable Database Monitoring (DBM) to collect query samples and execution plans. + ## This feature provides deep observability into query performance. + # + # dbm: false + + ## Configuration for collecting query samples when Database Monitoring (DBM) is enabled. + ## Query samples provide insights into the queries being executed on your ClickHouse instance. + # + # query_samples: + + ## @param enabled - boolean - optional - default: true + ## Enable collection of query samples. + # + # enabled: true + + ## @param collection_interval - number - optional - default: 10 + ## The interval in seconds between query sample collections. + ## Lower values provide more granular data but increase overhead. + # + # collection_interval: 10 + + ## @param samples_per_hour_per_query - number - optional - default: 15 + ## The maximum number of samples to collect per unique query signature per hour. + ## This helps limit the volume of data collected while still providing useful insights. + # + # samples_per_hour_per_query: 15 + + ## @param seen_samples_cache_maxsize - number - optional - default: 10000 + ## The maximum size of the cache used to track which query samples have been collected. + ## A larger cache can help avoid collecting duplicate samples. + # + # seen_samples_cache_maxsize: 10000 + + ## @param run_sync - boolean - optional - default: false + ## Whether to run query sample collection synchronously in the check run. + ## Set to false (default) to run asynchronously in a separate thread. + # + # run_sync: false + ## @param only_custom_queries - boolean - optional - default: false ## Set this parameter to `true` if you want to skip the integration's default metrics collection. ## Only metrics specified in `custom_queries` will be collected. @@ -105,48 +146,51 @@ instances: ## Each query must have 2 fields, and can have a third optional field: ## ## 1. query - The SQL to execute. It can be a simple statement or a multi-line script. - ## Use the pipe `|` if you require a multi-line script. + ## Use the pipe `|` if you require a multi-line script. ## 2. columns - The list representing each column, ordered sequentially from left to right. - ## The number of columns must equal the number of columns returned in the query. - ## There are 2 required pieces of data: - ## 1. name - The suffix to append to `.` to form - ## the full metric name. If `type` is a `tag` type, this column is considered a tag and applied - ## to every metric collected by this particular query. - ## 2. 
type - The submission method (gauge, monotonic_count, etc.). - ## This can also be set to the following `tag` types to tag each metric in the row with the name - ## and value of the item in this column: - ## 1. tag - This is the default tag type - ## 2. tag_list - This allows multiple values to be attached to the tag name. For example: - ## ``` - ## query = { - ## "name": "example", - ## "query": "...", - ## "columns": [ - ## {"name": "server_tag", "type": "tag_list"}, - ## {"name": "foo", "type": "gauge"}, - ## ] - ## } - ## ``` - ## May result in: - ## ``` - ## gauge("foo", tags=["server_tag:us", "server_tag:primary", "server_tag:default"]) - ## gauge("foo", tags=["server_tag:eu"]) - ## gauge("foo", tags=["server_tag:eu", "server_tag:primary"]) - ## ``` - ## 3. tag_not_null - This only sets tags in the metric if the value is not null - ## You can use the `count` type to perform aggregation for queries that return multiple rows with - ## the same or no tags. - ## Columns without a name are ignored. To skip a column, enter: - ## ``` - ## - {} - ## ``` + ## The number of columns must equal the number of columns returned in the query. + ## There are 2 required pieces of data: + ## a. name - The suffix to append to `.` to form + ## the full metric name. If `type` is a `tag` type, this column is + ## considered a tag and applied to every + ## metric collected by this particular query. + ## b. type - The submission method (gauge, monotonic_count, etc.). + ## This can also be set to the following `tag` types to + ## tag each metric in the row with the name and value + ## of the item in this column: + ## i. tag - This is the default tag type + ## ii. tag_list - This allows multiple values to be attached + ## to the tag name. For example: + ## + ## query = { + ## "name": "example", + ## "query": "...", + ## "columns": [ + ## {"name": "server_tag", "type": "tag_list"}, + ## {"name": "foo", "type": "gauge"}, + ## ] + ## } + ## + ## May result in: + ## gauge("foo", tags=[ + ## "server_tag:us", + ## "server_tag:primary", + ## "server_tag:default" + ## ]) + ## gauge("foo", tags=["server_tag:eu"]) + ## gauge("foo", tags=["server_tag:eu", "server_tag:primary"]) + ## iii. tag_not_null - This only sets tags in the metric if the value is not null + ## You can use the `count` type to perform aggregation + ## for queries that return multiple rows with the same or no tags. + ## Columns without a name are ignored. To skip a column, enter: + ## - {} ## 3. tags (optional) - A list of tags to apply to each metric. ## 4. collection_interval (optional) - The frequency at which to collect the metrics. ## If collection_interval is not set, the query will be run every check run. - ## If the collection interval is less than check collection interval, the query will be run every check - ## run. - ## If the collection interval is greater than check collection interval, the query will NOT BE RUN - ## exactly at the collection interval. + ## If the collection interval is less than check collection interval, + ## the query will be run every check run. + ## If the collection interval is greater than check collection interval, + ## the query will NOT BE RUN exactly at the collection interval. ## The query will be run at the next check run after the collection interval has passed. ## 5. metric_prefix (optional) - The prefix to apply to each metric. 
# diff --git a/clickhouse/datadog_checks/clickhouse/statement_samples.py b/clickhouse/datadog_checks/clickhouse/statement_samples.py new file mode 100644 index 0000000000000..9e827e823f4a7 --- /dev/null +++ b/clickhouse/datadog_checks/clickhouse/statement_samples.py @@ -0,0 +1,443 @@ +# (C) Datadog, Inc. 2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +from cachetools import TTLCache + +if TYPE_CHECKING: + from datadog_checks.clickhouse import ClickhouseCheck + +try: + import datadog_agent +except ImportError: + from datadog_checks.base.stubs import datadog_agent + +from datadog_checks.base.utils.common import to_native_string +from datadog_checks.base.utils.db.sql import compute_sql_signature +from datadog_checks.base.utils.db.utils import ( + DBMAsyncJob, + RateLimitingTTLCache, + default_json_event_encoding, + obfuscate_sql_with_metadata, +) +from datadog_checks.base.utils.serialization import json +from datadog_checks.base.utils.tracking import tracked_method + +# Query to get recent queries from system.query_log +# We collect queries that have finished execution and have sufficient information +QUERY_LOG_QUERY = """ +SELECT + event_time, + query_id, + query, + type, + user, + query_duration_ms, + read_rows, + read_bytes, + written_rows, + written_bytes, + result_rows, + result_bytes, + memory_usage, + exception +FROM system.query_log +WHERE + type IN ('QueryFinish', 'ExceptionWhileProcessing') + AND event_time >= toDateTime('{start_time}') + AND event_time < toDateTime('{end_time}') + AND query NOT LIKE '%system.query_log%' + AND query NOT LIKE '%system.processes%' +ORDER BY event_time DESC +LIMIT {limit} +""" + +# Query to get active queries from system.processes +ACTIVE_QUERIES_QUERY = """ +SELECT + elapsed, + query_id, + query, + user, + read_rows, + read_bytes, + written_rows, + written_bytes, + memory_usage, + initial_query_id +FROM system.processes +WHERE query NOT LIKE '%%system.processes%%' +""" + + +def agent_check_getter(self): + return self._check + + +class ClickhouseStatementSamples(DBMAsyncJob): + """ + Collects statement samples from ClickHouse query logs. 
+ """ + + def __init__(self, check: ClickhouseCheck, config): + # Default collection interval if not specified + collection_interval = getattr(config, 'collection_interval', 10) + + super(ClickhouseStatementSamples, self).__init__( + check, + rate_limit=1 / collection_interval, + run_sync=getattr(config, 'run_sync', False), + enabled=getattr(config, 'enabled', True), + dbms="clickhouse", + min_collection_interval=check.check_interval if hasattr(check, 'check_interval') else 15, + expected_db_exceptions=(Exception,), + job_name="query-samples", + ) + self._check = check + self._config = config + self._tags_no_db = None + self.tags = None + self._last_collection_timestamp = None + + # Get obfuscator options from config if available + obfuscate_options = { + 'return_json_metadata': True, + 'collect_tables': True, + 'collect_commands': True, + 'collect_comments': True, + } + self._obfuscate_options = to_native_string(json.dumps(obfuscate_options)) + + # Rate limiters for query samples + self._seen_samples_ratelimiter = RateLimitingTTLCache( + maxsize=getattr(config, 'seen_samples_cache_maxsize', 10000), + ttl=60 * 60 / getattr(config, 'samples_per_hour_per_query', 15), + ) + + # Cache for storing query execution plans + self._explain_plan_cache = TTLCache( + maxsize=1000, + ttl=3600, # 1 hour TTL + ) + + self._collection_interval = collection_interval + + def _dbtags(self, db, *extra_tags): + """ + Returns the default instance tags with the initial "db" tag replaced with the provided tag + """ + t = ["db:" + db] + if extra_tags: + t.extend(extra_tags) + if self._tags_no_db: + t.extend(self._tags_no_db) + return t + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _get_query_log_samples(self): + """ + Fetch recent queries from system.query_log + """ + start_time = time.time() + + # Calculate time window for query collection + end_time_ts = time.time() + if self._last_collection_timestamp is None: + # First run: collect queries from the last collection interval + start_time_ts = end_time_ts - self._collection_interval + else: + start_time_ts = self._last_collection_timestamp + + # Convert to datetime strings for ClickHouse + from datetime import datetime + + start_time_dt = datetime.fromtimestamp(start_time_ts) + end_time_dt = datetime.fromtimestamp(end_time_ts) + + params = { + 'start_time': start_time_dt.strftime('%Y-%m-%d %H:%M:%S'), + 'end_time': end_time_dt.strftime('%Y-%m-%d %H:%M:%S'), + 'limit': 100, + } + + try: + # Execute query using the check's client + query = QUERY_LOG_QUERY.format(**params) + self._log.debug("Executing query log query: %s", query) + rows = self._check.execute_query_raw(query) + + self._last_collection_timestamp = end_time_ts + + self._report_check_hist_metrics(start_time, len(rows), "get_query_log_samples") + self._log.info("Loaded %s rows from system.query_log", len(rows)) + + return rows + except Exception as e: + self._log.exception("Failed to collect query log samples: %s", str(e)) + self._check.count( + "dd.clickhouse.statement_samples.error", + 1, + tags=self.tags + ["error:query-log-fetch"] + self._get_debug_tags(), + raw=True, + ) + return [] + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _get_active_queries(self): + """ + Fetch currently running queries from system.processes + """ + start_time = time.time() + + try: + rows = self._check.execute_query_raw(ACTIVE_QUERIES_QUERY) + + self._report_check_hist_metrics(start_time, len(rows), "get_active_queries") + 
self._log.debug("Loaded %s rows from system.processes", len(rows)) + + return rows + except Exception as e: + self._log.warning("Failed to collect active queries: %s", str(e)) + return [] + + def _normalize_query_log_row(self, row): + """ + Normalize a row from system.query_log into a standard format + """ + try: + ( + event_time, + query_id, + query, + query_type, + user, + query_duration_ms, + read_rows, + read_bytes, + written_rows, + written_bytes, + result_rows, + result_bytes, + memory_usage, + exception, + ) = row + + normalized_row = { + 'timestamp': event_time, + 'query_id': str(query_id), + 'query': str(query), + 'type': str(query_type), + 'user': str(user), + 'duration_ms': float(query_duration_ms) if query_duration_ms else 0, + 'read_rows': int(read_rows) if read_rows else 0, + 'read_bytes': int(read_bytes) if read_bytes else 0, + 'written_rows': int(written_rows) if written_rows else 0, + 'written_bytes': int(written_bytes) if written_bytes else 0, + 'result_rows': int(result_rows) if result_rows else 0, + 'result_bytes': int(result_bytes) if result_bytes else 0, + 'memory_usage': int(memory_usage) if memory_usage else 0, + 'exception': str(exception) if exception else None, + } + + return self._obfuscate_and_normalize_query(normalized_row) + except Exception as e: + self._log.warning("Failed to normalize query log row: %s, row: %s", str(e), row) + raise + + def _obfuscate_and_normalize_query(self, row): + """ + Obfuscate the query and compute its signature + """ + obfuscated_query = None + try: + statement = obfuscate_sql_with_metadata(row['query'], self._obfuscate_options) + obfuscated_query = statement['query'] + metadata = statement['metadata'] + row['query_signature'] = compute_sql_signature(obfuscated_query) + row['dd_tables'] = metadata.get('tables', None) + row['dd_commands'] = metadata.get('commands', None) + row['dd_comments'] = metadata.get('comments', None) + except Exception as e: + self._log.debug("Failed to obfuscate query: %s", e) + self._check.count( + "dd.clickhouse.statement_samples.error", + 1, + tags=self.tags + ["error:sql-obfuscate"] + self._check._get_debug_tags(), + raw=True, + ) + # Use a default query signature if obfuscation fails + row['query_signature'] = compute_sql_signature(row['query'][:100]) + + row['statement'] = obfuscated_query + return row + + def _get_debug_tags(self): + return self._check._get_debug_tags() if hasattr(self._check, '_get_debug_tags') else [] + + def _report_check_hist_metrics(self, start_time, row_len, method_name): + """ + Report histogram metrics for check operations + """ + elapsed_ms = (time.time() - start_time) * 1000 + self._check.histogram( + f"dd.clickhouse.{method_name}.time", + elapsed_ms, + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + self._check.histogram( + f"dd.clickhouse.{method_name}.rows", + row_len, + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + + @tracked_method(agent_check_getter=agent_check_getter) + def _collect_statement_samples(self): + """ + Main method to collect and submit statement samples + """ + start_time = time.time() + + # Get query log samples + rows = self._get_query_log_samples() + + self._log.info("Retrieved %s query log samples for processing", len(rows)) + + submitted_count = 0 + skipped_count = 0 + error_count = 0 + + for row in rows: + try: + normalized_row = self._normalize_query_log_row(row) + + # Check if we should submit this sample based on rate limiting + query_signature = normalized_row.get('query_signature') + if not query_signature: + 
self._log.debug("Skipping row without query signature") + skipped_count += 1 + continue + + if not self._seen_samples_ratelimiter.acquire(query_signature): + skipped_count += 1 + continue + + # Create the event payload + event = self._create_sample_event(normalized_row) + + # Submit the event + self._check.database_monitoring_query_sample(json.dumps(event, default=default_json_event_encoding)) + submitted_count += 1 + self._log.debug("Submitted query sample for signature: %s", query_signature[:50]) + + except Exception as e: + error_count += 1 + self._log.exception("Error processing query log row: %s", e) + self._check.count( + "dd.clickhouse.statement_samples.error", + 1, + tags=self.tags + ["error:process-row"] + self._get_debug_tags(), + raw=True, + ) + + elapsed_ms = (time.time() - start_time) * 1000 + + self._log.info( + "Statement sample collection complete: submitted=%s, skipped=%s, errors=%s, elapsed_ms=%.2f", + submitted_count, skipped_count, error_count, elapsed_ms + ) + + self._check.histogram( + "dd.clickhouse.collect_statement_samples.time", + elapsed_ms, + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + self._check.count( + "dd.clickhouse.collect_statement_samples.events_submitted.count", + submitted_count, + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + self._check.count( + "dd.clickhouse.collect_statement_samples.events_skipped.count", + skipped_count, + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + self._check.count( + "dd.clickhouse.collect_statement_samples.events_errors.count", + error_count, + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + self._check.gauge( + "dd.clickhouse.collect_statement_samples.seen_samples_cache.len", + len(self._seen_samples_ratelimiter), + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + + def _create_sample_event(self, row): + """ + Create a database monitoring query sample event + """ + db = self._check._db + + event = { + "host": self._check.reported_hostname, + "database_instance": self._check.database_identifier, + "ddagentversion": datadog_agent.get_version(), + "ddsource": "clickhouse", + "dbm_type": "sample", + "ddtags": ",".join(self._dbtags(db)), + "timestamp": int(time.time() * 1000), + "db": { + "instance": db, + "query_signature": row.get('query_signature'), + "statement": row.get('statement'), + "user": row.get('user'), + "metadata": { + "tables": row.get('dd_tables'), + "commands": row.get('dd_commands'), + "comments": row.get('dd_comments'), + }, + }, + "clickhouse": { + "query_id": row.get('query_id'), + "type": row.get('type'), + "duration_ms": row.get('duration_ms'), + "read_rows": row.get('read_rows'), + "read_bytes": row.get('read_bytes'), + "written_rows": row.get('written_rows'), + "written_bytes": row.get('written_bytes'), + "result_rows": row.get('result_rows'), + "result_bytes": row.get('result_bytes'), + "memory_usage": row.get('memory_usage'), + }, + } + + # Add exception information if present + if row.get('exception'): + event['clickhouse']['exception'] = row['exception'] + + # Add duration if available + if row.get('duration_ms'): + event['duration'] = int(row['duration_ms'] * 1e6) # Convert to nanoseconds + + return event + + def run_job(self): + """ + Main job execution method called by DBMAsyncJob + """ + # Filter out internal tags + self.tags = [t for t in self._check._tags if not t.startswith('dd.internal')] + self._tags_no_db = [t for t in self.tags if not t.startswith('db:')] + + self._collect_statement_samples() diff --git 
a/clickhouse/test_query_samples.py b/clickhouse/test_query_samples.py new file mode 100644 index 0000000000000..9e299aa025df0 --- /dev/null +++ b/clickhouse/test_query_samples.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 +""" +Test script to verify ClickHouse query samples are being collected correctly. +Run this inside the datadog-agent container. +""" + +import sys +import time +import subprocess +import json + + +def run_command(cmd): + """Execute a shell command and return output.""" + result = subprocess.run(cmd, shell=True, capture_output=True, text=True) + return result.stdout, result.stderr, result.returncode + + +def test_clickhouse_query_log(): + """Test if ClickHouse has query log data.""" + print("=" * 60) + print("TEST 1: Checking ClickHouse query_log table") + print("=" * 60) + + query = """ + SELECT count(*) as query_count + FROM system.query_log + WHERE event_time > now() - INTERVAL 60 SECOND + """ + + cmd = f"clickhouse-client --host clickhouse-primary --port 9000 --user datadog --password datadog --query \"{query}\"" + stdout, stderr, code = run_command(cmd) + + if code != 0: + print(f"❌ FAILED: Cannot query ClickHouse: {stderr}") + return False + + count = int(stdout.strip()) + print(f"✅ Query log has {count} queries in the last 60 seconds") + + if count == 0: + print("⚠️ WARNING: No queries found. Make sure the orders app is running.") + return False + + return True + + +def test_agent_check(): + """Test if the ClickHouse integration check runs successfully.""" + print("\n" + "=" * 60) + print("TEST 2: Running ClickHouse agent check") + print("=" * 60) + + cmd = "agent check clickhouse -l info 2>&1 | tail -50" + stdout, stderr, code = run_command(cmd) + + if "Running check clickhouse" in stdout: + print("✅ ClickHouse check is running") + else: + print("❌ FAILED: ClickHouse check not running properly") + print(stdout[-500:]) # Last 500 chars + return False + + if "error" in stdout.lower() and "statement_sample" in stdout.lower(): + print("⚠️ WARNING: Errors found in check output:") + print(stdout) + return False + + return True + + +def test_agent_logs(): + """Check agent logs for statement sample activity.""" + print("\n" + "=" * 60) + print("TEST 3: Checking agent logs for query samples") + print("=" * 60) + + # Wait a bit for samples to be collected + print("Waiting 15 seconds for sample collection...") + time.sleep(15) + + cmd = "tail -100 /var/log/datadog/agent.log | grep -i 'statement_sample\\|query_sample\\|query log samples' | tail -20" + stdout, stderr, code = run_command(cmd) + + if not stdout: + print("⚠️ No statement sample logs found") + print("Checking for any ClickHouse logs...") + cmd = "tail -100 /var/log/datadog/agent.log | grep -i clickhouse | tail -10" + stdout, _, _ = run_command(cmd) + print(stdout if stdout else "No ClickHouse logs found") + return False + + print("✅ Found statement sample activity in logs:") + print(stdout) + + # Check for success indicators + if "Loaded" in stdout and "rows from system.query_log" in stdout: + print("✅ Query log samples are being fetched") + return True + + if "submitted" in stdout.lower(): + print("✅ Samples are being submitted") + return True + + return False + + +def test_metrics(): + """Check if statement sample metrics are being reported.""" + print("\n" + "=" * 60) + print("TEST 4: Checking statement sample metrics") + print("=" * 60) + + cmd = "agent status 2>&1 | grep -A 20 clickhouse | grep -i 'sample\\|query_log'" + stdout, stderr, code = run_command(cmd) + + if stdout: + print("✅ Found ClickHouse 
metrics:") + print(stdout) + return True + else: + print("⚠️ No statement sample metrics found in agent status") + return False + + +def show_sample_queries(): + """Show sample queries from query_log.""" + print("\n" + "=" * 60) + print("BONUS: Sample queries in ClickHouse query_log") + print("=" * 60) + + query = """ + SELECT + event_time, + user, + query_duration_ms, + substring(query, 1, 100) as query_preview + FROM system.query_log + WHERE query NOT LIKE '%system.query_log%' + AND event_time > now() - INTERVAL 60 SECOND + ORDER BY event_time DESC + LIMIT 10 + FORMAT Pretty + """ + + cmd = f"clickhouse-client --host clickhouse-primary --port 9000 --user datadog --password datadog --query \"{query}\"" + stdout, stderr, code = run_command(cmd) + + if code == 0: + print(stdout) + else: + print(f"Could not fetch sample queries: {stderr}") + + +def main(): + """Run all tests.""" + print("\n" + "=" * 60) + print("CLICKHOUSE QUERY SAMPLES VERIFICATION") + print("=" * 60) + + tests = [ + ("ClickHouse Query Log", test_clickhouse_query_log), + ("Agent Check", test_agent_check), + ("Agent Logs", test_agent_logs), + ("Metrics", test_metrics), + ] + + results = [] + for test_name, test_func in tests: + try: + result = test_func() + results.append((test_name, result)) + except Exception as e: + print(f"❌ FAILED: {test_name} - {e}") + results.append((test_name, False)) + + # Show sample queries + try: + show_sample_queries() + except Exception as e: + print(f"Could not show sample queries: {e}") + + # Summary + print("\n" + "=" * 60) + print("TEST SUMMARY") + print("=" * 60) + + passed = sum(1 for _, result in results if result) + total = len(results) + + for test_name, result in results: + status = "✅ PASS" if result else "❌ FAIL" + print(f"{status}: {test_name}") + + print(f"\nTotal: {passed}/{total} tests passed") + + if passed == total: + print("\n🎉 All tests passed! Query samples should be working.") + else: + print("\n⚠️ Some tests failed. Check the output above for details.") + print("\nTroubleshooting tips:") + print("1. Make sure orders app is running and generating queries") + print("2. Check that DBM is enabled in clickhouse.yaml") + print("3. Verify datadog user has permissions on system.query_log") + print("4. Check agent logs: tail -f /var/log/datadog/agent.log | grep clickhouse") + + return 0 if passed == total else 1 + + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/clickhouse/tests/test_dbm_integration.py b/clickhouse/tests/test_dbm_integration.py new file mode 100644 index 0000000000000..49b368d9eda6b --- /dev/null +++ b/clickhouse/tests/test_dbm_integration.py @@ -0,0 +1,272 @@ +# (C) Datadog, Inc. 
2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import time +from copy import deepcopy + +import clickhouse_connect +import pytest + +from datadog_checks.clickhouse import ClickhouseCheck + +from .common import CONFIG + + +@pytest.mark.integration +@pytest.mark.usefixtures('dd_environment') +class TestDBMIntegration: + """Integration tests for Database Monitoring (DBM) query samples""" + + def test_query_samples_are_collected(self, aggregator, instance): + """ + Test that query samples are actually collected and submitted when DBM is enabled + """ + # Configure instance with DBM enabled + instance_config = deepcopy(instance) + instance_config['dbm'] = True + instance_config['query_samples'] = { + 'enabled': True, + 'collection_interval': 1, # Collect every second for testing + 'samples_per_hour_per_query': 100, # Allow many samples for testing + } + + # Create check + check = ClickhouseCheck('clickhouse', {}, [instance_config]) + + # First, generate some queries in ClickHouse to populate query_log + client = clickhouse_connect.get_client( + host=instance_config['server'], + port=instance_config['port'], + username=instance_config['username'], + password=instance_config['password'], + ) + + # Run several different queries to populate query_log + test_queries = [ + "SELECT 1", + "SELECT count(*) FROM system.tables", + "SELECT name, engine FROM system.databases", + "SELECT version()", + "SELECT now()", + ] + + for query in test_queries: + try: + client.query(query) + except Exception as e: + print(f"Query '{query}' failed: {e}") + + # Wait a moment for queries to appear in query_log + time.sleep(2) + + # Verify there are queries in query_log + result = client.query("SELECT count(*) FROM system.query_log WHERE event_time > now() - INTERVAL 1 MINUTE") + query_log_count = result.result_rows[0][0] + print(f"Found {query_log_count} queries in query_log") + + # Run the check - this should collect samples + check.check(None) + + # Wait for async job to complete if running async + if check.statement_samples and hasattr(check.statement_samples, '_job_loop_future'): + if check.statement_samples._job_loop_future: + check.statement_samples._job_loop_future.result(timeout=5) + + # Run check again to ensure we collect samples + time.sleep(1) + check.check(None) + + # Verify metrics are being reported + aggregator.assert_metric('dd.clickhouse.collect_statement_samples.events_submitted.count') + aggregator.assert_metric('dd.clickhouse.get_query_log_samples.rows') + + # Get the count of submitted events + events_submitted = aggregator.metrics('dd.clickhouse.collect_statement_samples.events_submitted.count') + if events_submitted: + total_submitted = sum(m.value for m in events_submitted) + print(f"Total query samples submitted: {total_submitted}") + assert total_submitted > 0, "Expected at least one query sample to be submitted" + + def test_query_samples_disabled(self, aggregator, instance): + """ + Test that query samples are NOT collected when DBM is disabled + """ + # Configure instance with DBM disabled + instance_config = deepcopy(instance) + instance_config['dbm'] = False + + # Create check + check = ClickhouseCheck('clickhouse', {}, [instance_config]) + + # Verify statement_samples is None + assert check.statement_samples is None, "statement_samples should be None when DBM is disabled" + + # Run the check + check.check(None) + + # Verify no DBM metrics are reported + assert not aggregator.metrics( + 
'dd.clickhouse.collect_statement_samples.events_submitted.count' + ), "No DBM metrics should be reported when DBM is disabled" + + def test_query_samples_with_activity(self, aggregator, instance, dd_run_check): + """ + Test that query samples capture actual query activity with details + """ + # Configure instance with DBM enabled + instance_config = deepcopy(instance) + instance_config['dbm'] = True + instance_config['query_samples'] = { + 'enabled': True, + 'collection_interval': 1, + } + + # Create check + check = ClickhouseCheck('clickhouse', {}, [instance_config]) + + # Connect and run a distinctive query + client = clickhouse_connect.get_client( + host=instance_config['server'], + port=instance_config['port'], + username=instance_config['username'], + password=instance_config['password'], + ) + + # Run a query that will be easy to identify + distinctive_query = "SELECT 'DBM_TEST_QUERY' as test_column, count(*) FROM system.tables" + client.query(distinctive_query) + + # Wait for query to appear in query_log + time.sleep(2) + + # Verify the query is in query_log + result = client.query(""" + SELECT count(*) + FROM system.query_log + WHERE query LIKE '%DBM_TEST_QUERY%' + AND event_time > now() - INTERVAL 1 MINUTE + """) + assert result.result_rows[0][0] > 0, "Distinctive query should be in query_log" + + # Run the check + dd_run_check(check) + + # Wait for async processing + time.sleep(2) + + # Run check again + dd_run_check(check) + + # Verify we collected some rows + rows_metric = aggregator.metrics('dd.clickhouse.get_query_log_samples.rows') + if rows_metric: + total_rows = sum(m.value for m in rows_metric) + print(f"Total rows collected from query_log: {total_rows}") + assert total_rows > 0, "Should have collected rows from query_log" + + def test_query_samples_properties(self, instance): + """ + Test that required DBM properties are correctly set on the check + """ + # Configure instance with DBM enabled + instance_config = deepcopy(instance) + instance_config['dbm'] = True + instance_config['query_samples'] = { + 'enabled': True, + } + + # Create check + check = ClickhouseCheck('clickhouse', {}, [instance_config]) + + # Verify DBM properties exist + assert hasattr(check, 'reported_hostname'), "Check should have reported_hostname property" + assert hasattr(check, 'database_identifier'), "Check should have database_identifier property" + + # Verify properties return expected values + hostname = check.reported_hostname + db_id = check.database_identifier + + assert hostname is not None, "reported_hostname should not be None" + assert db_id is not None, "database_identifier should not be None" + assert check._server in hostname, "hostname should contain server name" + assert str(check._port) in db_id, "database_identifier should contain port" + assert check._db in db_id, "database_identifier should contain database name" + + print(f"reported_hostname: {hostname}") + print(f"database_identifier: {db_id}") + + def test_statement_samples_event_structure(self, instance): + """ + Test that the event structure for query samples is correct + """ + # Configure instance with DBM enabled + instance_config = deepcopy(instance) + instance_config['dbm'] = True + instance_config['query_samples'] = { + 'enabled': True, + } + + # Create check + check = ClickhouseCheck('clickhouse', {}, [instance_config]) + + # Create a mock row to test event creation + mock_row = { + 'timestamp': time.time(), + 'query_id': 'test-query-id', + 'query': 'SELECT * FROM system.tables WHERE name = ?', + 'statement': 'SELECT 
* FROM system.tables WHERE name = ?', + 'query_signature': 'test-signature', + 'type': 'QueryFinish', + 'user': 'datadog', + 'duration_ms': 100, + 'read_rows': 10, + 'read_bytes': 1024, + 'written_rows': 0, + 'written_bytes': 0, + 'result_rows': 10, + 'result_bytes': 1024, + 'memory_usage': 2048, + 'exception': None, + 'dd_tables': ['system.tables'], + 'dd_commands': ['SELECT'], + 'dd_comments': [], + } + + # Create event + event = check.statement_samples._create_sample_event(mock_row) + + # Verify event structure + assert 'host' in event, "Event should have host field" + assert 'database_instance' in event, "Event should have database_instance field" + assert 'ddagentversion' in event, "Event should have ddagentversion field" + assert 'ddsource' in event, "Event should have ddsource field" + assert event['ddsource'] == 'clickhouse', "ddsource should be clickhouse" + assert 'dbm_type' in event, "Event should have dbm_type field" + assert event['dbm_type'] == 'sample', "dbm_type should be sample" + assert 'timestamp' in event, "Event should have timestamp field" + assert 'db' in event, "Event should have db field" + assert 'clickhouse' in event, "Event should have clickhouse field" + + # Verify db section + db_section = event['db'] + assert 'instance' in db_section + assert 'query_signature' in db_section + assert 'statement' in db_section + assert 'user' in db_section + assert 'metadata' in db_section + + # Verify clickhouse section + ch_section = event['clickhouse'] + assert 'query_id' in ch_section + assert 'type' in ch_section + assert 'duration_ms' in ch_section + assert 'read_rows' in ch_section + assert 'memory_usage' in ch_section + + # Verify duration is in nanoseconds + assert 'duration' in event + assert event['duration'] == 100 * 1e6 # 100ms in nanoseconds + + print("Event structure is valid!") + print(f"Event keys: {list(event.keys())}") + diff --git a/clickhouse/tests/test_statement_samples.py b/clickhouse/tests/test_statement_samples.py new file mode 100644 index 0000000000000..a80aeb56c2f8b --- /dev/null +++ b/clickhouse/tests/test_statement_samples.py @@ -0,0 +1,261 @@ +# (C) Datadog, Inc. 
2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import time +from unittest import mock + +import pytest + +from datadog_checks.clickhouse import ClickhouseCheck +from datadog_checks.clickhouse.statement_samples import ClickhouseStatementSamples + + +@pytest.fixture +def instance_with_dbm(): + """Return a ClickHouse instance configuration with DBM enabled""" + return { + 'server': 'localhost', + 'port': 9000, + 'username': 'default', + 'password': '', + 'db': 'default', + 'dbm': True, + 'query_samples': { + 'enabled': True, + 'collection_interval': 10, + 'samples_per_hour_per_query': 15, + 'seen_samples_cache_maxsize': 10000, + 'run_sync': False, + }, + 'tags': ['test:clickhouse'], + } + + +@pytest.fixture +def check_with_dbm(instance_with_dbm): + """Return a ClickHouse check instance with DBM enabled""" + check = ClickhouseCheck('clickhouse', {}, [instance_with_dbm]) + return check + + +def test_statement_samples_initialization(check_with_dbm): + """Test that statement samples are properly initialized when DBM is enabled""" + assert check_with_dbm.statement_samples is not None + assert isinstance(check_with_dbm.statement_samples, ClickhouseStatementSamples) + assert check_with_dbm.statement_samples._config.enabled is True + assert check_with_dbm.statement_samples._config.collection_interval == 10 + + +def test_statement_samples_disabled(): + """Test that statement samples are not initialized when DBM is disabled""" + instance = { + 'server': 'localhost', + 'port': 9000, + 'username': 'default', + 'password': '', + 'db': 'default', + 'dbm': False, + 'tags': ['test:clickhouse'], + } + check = ClickhouseCheck('clickhouse', {}, [instance]) + assert check.statement_samples is None + + +def test_obfuscate_and_normalize_query(check_with_dbm): + """Test query obfuscation and normalization""" + statement_samples = check_with_dbm.statement_samples + + row = { + 'timestamp': time.time(), + 'query_id': 'test-query-id-123', + 'query': 'SELECT * FROM users WHERE user_id = 12345', + 'type': 'QueryFinish', + 'user': 'default', + 'duration_ms': 100, + 'read_rows': 1, + 'read_bytes': 100, + 'written_rows': 0, + 'written_bytes': 0, + 'result_rows': 1, + 'result_bytes': 100, + 'memory_usage': 1024, + 'exception': None, + } + + normalized_row = statement_samples._obfuscate_and_normalize_query(row) + + # Verify that query was obfuscated (literal should be replaced) + assert normalized_row['statement'] is not None + assert '12345' not in normalized_row['statement'] + assert normalized_row['query_signature'] is not None + + # Verify metadata was collected + assert 'dd_tables' in normalized_row + assert 'dd_commands' in normalized_row + assert 'dd_comments' in normalized_row + + +def test_normalize_query_log_row(check_with_dbm): + """Test normalization of query log rows""" + statement_samples = check_with_dbm.statement_samples + + # Simulate a row from system.query_log + row = ( + time.time(), # event_time + 'test-query-id-123', # query_id + 'SELECT count(*) FROM system.tables', # query + 'QueryFinish', # type + 'default', # user + 50, # query_duration_ms + 10, # read_rows + 1024, # read_bytes + 0, # written_rows + 0, # written_bytes + 1, # result_rows + 8, # result_bytes + 2048, # memory_usage + None, # exception + ) + + normalized_row = statement_samples._normalize_query_log_row(row) + + # Verify basic fields + assert normalized_row['query_id'] == 'test-query-id-123' + assert normalized_row['user'] == 'default' + assert normalized_row['type'] == 'QueryFinish' + 
assert normalized_row['duration_ms'] == 50 + assert normalized_row['read_rows'] == 10 + assert normalized_row['memory_usage'] == 2048 + + # Verify obfuscation occurred + assert normalized_row['statement'] is not None + assert normalized_row['query_signature'] is not None + + +def test_create_sample_event(check_with_dbm): + """Test creation of sample events for submission""" + statement_samples = check_with_dbm.statement_samples + + normalized_row = { + 'timestamp': time.time(), + 'query_id': 'test-query-id-123', + 'query': 'SELECT * FROM users', + 'statement': 'SELECT * FROM users', + 'query_signature': 'abc123', + 'type': 'QueryFinish', + 'user': 'default', + 'duration_ms': 100, + 'read_rows': 10, + 'read_bytes': 1024, + 'written_rows': 0, + 'written_bytes': 0, + 'result_rows': 10, + 'result_bytes': 1024, + 'memory_usage': 2048, + 'exception': None, + 'dd_tables': ['users'], + 'dd_commands': ['SELECT'], + 'dd_comments': [], + } + + event = statement_samples._create_sample_event(normalized_row) + + # Verify event structure + assert event['ddsource'] == 'clickhouse' + assert event['dbm_type'] == 'sample' + assert 'timestamp' in event + assert 'db' in event + assert event['db']['query_signature'] == 'abc123' + assert event['db']['statement'] == 'SELECT * FROM users' + assert event['db']['user'] == 'default' + + # Verify ClickHouse-specific fields + assert 'clickhouse' in event + assert event['clickhouse']['query_id'] == 'test-query-id-123' + assert event['clickhouse']['duration_ms'] == 100 + assert event['clickhouse']['read_rows'] == 10 + assert event['clickhouse']['memory_usage'] == 2048 + + # Verify duration is in nanoseconds + assert event['duration'] == 100 * 1e6 + + +def test_rate_limiting(check_with_dbm): + """Test that query sample rate limiting works correctly""" + statement_samples = check_with_dbm.statement_samples + + query_signature = 'test-signature-123' + + # First acquisition should succeed + assert statement_samples._seen_samples_ratelimiter.acquire(query_signature) is True + + # Immediate re-acquisition should fail due to rate limiting + assert statement_samples._seen_samples_ratelimiter.acquire(query_signature) is False + + +def test_query_log_query_format(): + """Test that the query log query is properly formatted""" + from datadog_checks.clickhouse.statement_samples import QUERY_LOG_QUERY + + # Verify query contains necessary clauses + assert 'system.query_log' in QUERY_LOG_QUERY + assert 'event_time' in QUERY_LOG_QUERY + assert 'query_id' in QUERY_LOG_QUERY + assert 'query' in QUERY_LOG_QUERY + assert 'query_duration_ms' in QUERY_LOG_QUERY + assert 'WHERE' in QUERY_LOG_QUERY + assert 'LIMIT' in QUERY_LOG_QUERY + + +def test_active_queries_query_format(): + """Test that the active queries query is properly formatted""" + from datadog_checks.clickhouse.statement_samples import ACTIVE_QUERIES_QUERY + + # Verify query contains necessary clauses + assert 'system.processes' in ACTIVE_QUERIES_QUERY + assert 'query_id' in ACTIVE_QUERIES_QUERY + assert 'query' in ACTIVE_QUERIES_QUERY + assert 'elapsed' in ACTIVE_QUERIES_QUERY + assert 'memory_usage' in ACTIVE_QUERIES_QUERY + + +@mock.patch('datadog_checks.clickhouse.statement_samples.datadog_agent') +def test_metrics_reporting(mock_agent, check_with_dbm, aggregator): + """Test that statement samples report metrics correctly""" + statement_samples = check_with_dbm.statement_samples + statement_samples.tags = ['test:clickhouse', 'db:default'] + statement_samples._tags_no_db = ['test:clickhouse'] + + # Mock the agent version + 
mock_agent.get_version.return_value = '7.50.0' + + # Call the metrics reporting method + start_time = time.time() + statement_samples._report_check_hist_metrics(start_time, 10, "test_method") + + # Verify histograms were submitted + aggregator.assert_metric('dd.clickhouse.test_method.time') + aggregator.assert_metric('dd.clickhouse.test_method.rows') + + +def test_get_debug_tags(check_with_dbm): + """Test that debug tags are properly generated""" + statement_samples = check_with_dbm.statement_samples + debug_tags = statement_samples._get_debug_tags() + + # Verify debug tags contain server information + assert any('server:' in tag for tag in debug_tags) + + +def test_dbtags(check_with_dbm): + """Test that database tags are properly generated""" + statement_samples = check_with_dbm.statement_samples + statement_samples._tags_no_db = ['test:clickhouse', 'server:localhost'] + + db_tags = statement_samples._dbtags('testdb', 'extra:tag') + + # Verify tags include database, extra tags, and base tags + assert 'db:testdb' in db_tags + assert 'extra:tag' in db_tags + assert 'test:clickhouse' in db_tags + assert 'server:localhost' in db_tags From 8b11eb47846c8b34036b7b250da811141ef58a88 Mon Sep 17 00:00:00 2001 From: sangeetashivaji Date: Wed, 5 Nov 2025 14:45:02 -0500 Subject: [PATCH 02/10] Update --- clickhouse/datadog_checks/clickhouse/statement_samples.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/clickhouse/datadog_checks/clickhouse/statement_samples.py b/clickhouse/datadog_checks/clickhouse/statement_samples.py index 9e827e823f4a7..ec05437017a8e 100644 --- a/clickhouse/datadog_checks/clickhouse/statement_samples.py +++ b/clickhouse/datadog_checks/clickhouse/statement_samples.py @@ -330,8 +330,14 @@ def _collect_statement_samples(self): # Create the event payload event = self._create_sample_event(normalized_row) + # Log the event payload for debugging + self._log.info("Query sample event payload: ddsource=%s, query_signature=%s", + event.get('ddsource'), query_signature[:50] if query_signature else 'N/A') + # Submit the event - self._check.database_monitoring_query_sample(json.dumps(event, default=default_json_event_encoding)) + event_json = json.dumps(event, default=default_json_event_encoding) + self._log.debug("Full event JSON (first 500 chars): %s", event_json[:500]) + self._check.database_monitoring_query_sample(event_json) submitted_count += 1 self._log.debug("Submitted query sample for signature: %s", query_signature[:50]) From b118eb21bcdb56b5ead2ba160d194a7fdd2f7822 Mon Sep 17 00:00:00 2001 From: sangeetashivaji Date: Wed, 5 Nov 2025 15:48:36 -0500 Subject: [PATCH 03/10] Update to the correct implementation --- .../clickhouse/statement_samples.py | 277 ++++++------------ 1 file changed, 92 insertions(+), 185 deletions(-) diff --git a/clickhouse/datadog_checks/clickhouse/statement_samples.py b/clickhouse/datadog_checks/clickhouse/statement_samples.py index ec05437017a8e..6d6c81a9b8e5f 100644 --- a/clickhouse/datadog_checks/clickhouse/statement_samples.py +++ b/clickhouse/datadog_checks/clickhouse/statement_samples.py @@ -27,36 +27,8 @@ from datadog_checks.base.utils.serialization import json from datadog_checks.base.utils.tracking import tracked_method -# Query to get recent queries from system.query_log -# We collect queries that have finished execution and have sufficient information -QUERY_LOG_QUERY = """ -SELECT - event_time, - query_id, - query, - type, - user, - query_duration_ms, - read_rows, - read_bytes, - written_rows, - written_bytes, - 
result_rows, - result_bytes, - memory_usage, - exception -FROM system.query_log -WHERE - type IN ('QueryFinish', 'ExceptionWhileProcessing') - AND event_time >= toDateTime('{start_time}') - AND event_time < toDateTime('{end_time}') - AND query NOT LIKE '%system.query_log%' - AND query NOT LIKE '%system.processes%' -ORDER BY event_time DESC -LIMIT {limit} -""" - -# Query to get active queries from system.processes +# Query to get currently running/active queries from system.processes +# This is the ClickHouse equivalent of Postgres pg_stat_activity ACTIVE_QUERIES_QUERY = """ SELECT elapsed, @@ -68,11 +40,26 @@ written_rows, written_bytes, memory_usage, - initial_query_id + initial_query_id, + initial_user, + query_kind, + is_initial_query FROM system.processes -WHERE query NOT LIKE '%%system.processes%%' +WHERE query NOT LIKE '%system.processes%' + AND query NOT LIKE '%system.query_log%' + AND query != '' """ +# Columns from system.processes which correspond to attributes common to all databases +# and are therefore stored under other standard keys +system_processes_sample_exclude_keys = { + # we process & obfuscate this separately + 'query', + # stored separately + 'user', + 'query_id', +} + def agent_check_getter(self): return self._check @@ -80,7 +67,8 @@ def agent_check_getter(self): class ClickhouseStatementSamples(DBMAsyncJob): """ - Collects statement samples from ClickHouse query logs. + Collects statement samples from ClickHouse active queries (system.processes). + Similar to Postgres integration using pg_stat_activity. """ def __init__(self, check: ClickhouseCheck, config): @@ -101,7 +89,6 @@ def __init__(self, check: ClickhouseCheck, config): self._config = config self._tags_no_db = None self.tags = None - self._last_collection_timestamp = None # Get obfuscator options from config if available obfuscate_options = { @@ -118,12 +105,6 @@ def __init__(self, check: ClickhouseCheck, config): ttl=60 * 60 / getattr(config, 'samples_per_hour_per_query', 15), ) - # Cache for storing query execution plans - self._explain_plan_cache = TTLCache( - maxsize=1000, - ttl=3600, # 1 hour TTL - ) - self._collection_interval = collection_interval def _dbtags(self, db, *extra_tags): @@ -137,59 +118,17 @@ def _dbtags(self, db, *extra_tags): t.extend(self._tags_no_db) return t - @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) - def _get_query_log_samples(self): - """ - Fetch recent queries from system.query_log - """ - start_time = time.time() - - # Calculate time window for query collection - end_time_ts = time.time() - if self._last_collection_timestamp is None: - # First run: collect queries from the last collection interval - start_time_ts = end_time_ts - self._collection_interval - else: - start_time_ts = self._last_collection_timestamp - - # Convert to datetime strings for ClickHouse - from datetime import datetime - - start_time_dt = datetime.fromtimestamp(start_time_ts) - end_time_dt = datetime.fromtimestamp(end_time_ts) - - params = { - 'start_time': start_time_dt.strftime('%Y-%m-%d %H:%M:%S'), - 'end_time': end_time_dt.strftime('%Y-%m-%d %H:%M:%S'), - 'limit': 100, - } - - try: - # Execute query using the check's client - query = QUERY_LOG_QUERY.format(**params) - self._log.debug("Executing query log query: %s", query) - rows = self._check.execute_query_raw(query) - - self._last_collection_timestamp = end_time_ts - - self._report_check_hist_metrics(start_time, len(rows), "get_query_log_samples") - self._log.info("Loaded %s rows from system.query_log", 
len(rows)) - - return rows - except Exception as e: - self._log.exception("Failed to collect query log samples: %s", str(e)) - self._check.count( - "dd.clickhouse.statement_samples.error", - 1, - tags=self.tags + ["error:query-log-fetch"] + self._get_debug_tags(), - raw=True, - ) - return [] + def _get_debug_tags(self): + t = [] + if self._tags_no_db: + t.extend(self._tags_no_db) + return t @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _get_active_queries(self): """ Fetch currently running queries from system.processes + This is analogous to Postgres querying pg_stat_activity """ start_time = time.time() @@ -201,51 +140,55 @@ def _get_active_queries(self): return rows except Exception as e: - self._log.warning("Failed to collect active queries: %s", str(e)) + self._log.exception("Failed to collect active queries: %s", str(e)) + self._check.count( + "dd.clickhouse.statement_samples.error", + 1, + tags=self.tags + ["error:active-queries-fetch"] + self._get_debug_tags(), + raw=True, + ) return [] - def _normalize_query_log_row(self, row): + def _normalize_active_query_row(self, row): """ - Normalize a row from system.query_log into a standard format + Normalize a row from system.processes into a standard format """ try: ( - event_time, + elapsed, query_id, query, - query_type, user, - query_duration_ms, read_rows, read_bytes, written_rows, written_bytes, - result_rows, - result_bytes, memory_usage, - exception, + initial_query_id, + initial_user, + query_kind, + is_initial_query, ) = row normalized_row = { - 'timestamp': event_time, + 'elapsed': float(elapsed) if elapsed else 0, 'query_id': str(query_id), 'query': str(query), - 'type': str(query_type), 'user': str(user), - 'duration_ms': float(query_duration_ms) if query_duration_ms else 0, 'read_rows': int(read_rows) if read_rows else 0, 'read_bytes': int(read_bytes) if read_bytes else 0, 'written_rows': int(written_rows) if written_rows else 0, 'written_bytes': int(written_bytes) if written_bytes else 0, - 'result_rows': int(result_rows) if result_rows else 0, - 'result_bytes': int(result_bytes) if result_bytes else 0, 'memory_usage': int(memory_usage) if memory_usage else 0, - 'exception': str(exception) if exception else None, + 'initial_query_id': str(initial_query_id) if initial_query_id else None, + 'initial_user': str(initial_user) if initial_user else None, + 'query_kind': str(query_kind) if query_kind else None, + 'is_initial_query': bool(is_initial_query) if is_initial_query is not None else True, } return self._obfuscate_and_normalize_query(normalized_row) except Exception as e: - self._log.warning("Failed to normalize query log row: %s, row: %s", str(e), row) + self._log.warning("Failed to normalize active query row: %s, row: %s", str(e), row) raise def _obfuscate_and_normalize_query(self, row): @@ -257,56 +200,58 @@ def _obfuscate_and_normalize_query(self, row): statement = obfuscate_sql_with_metadata(row['query'], self._obfuscate_options) obfuscated_query = statement['query'] metadata = statement['metadata'] - row['query_signature'] = compute_sql_signature(obfuscated_query) + + row['statement'] = obfuscated_query row['dd_tables'] = metadata.get('tables', None) row['dd_commands'] = metadata.get('commands', None) row['dd_comments'] = metadata.get('comments', None) + + # Compute query signature + row['query_signature'] = compute_sql_signature(obfuscated_query) + except Exception as e: - self._log.debug("Failed to obfuscate query: %s", e) - self._check.count( - 
"dd.clickhouse.statement_samples.error", - 1, - tags=self.tags + ["error:sql-obfuscate"] + self._check._get_debug_tags(), - raw=True, + self._log.warning( + "Failed to obfuscate query: %s, query: %s", str(e), row.get('query', '')[:100] ) - # Use a default query signature if obfuscation fails - row['query_signature'] = compute_sql_signature(row['query'][:100]) + # On obfuscation error, we still want to emit the row + row['statement'] = None + row['query_signature'] = compute_sql_signature(row['query']) + row['dd_tables'] = None + row['dd_commands'] = None + row['dd_comments'] = None - row['statement'] = obfuscated_query return row - def _get_debug_tags(self): - return self._check._get_debug_tags() if hasattr(self._check, '_get_debug_tags') else [] - - def _report_check_hist_metrics(self, start_time, row_len, method_name): + def _filter_and_normalize_statement_rows(self, rows): """ - Report histogram metrics for check operations + Filter and normalize rows from system.processes """ - elapsed_ms = (time.time() - start_time) * 1000 - self._check.histogram( - f"dd.clickhouse.{method_name}.time", - elapsed_ms, - tags=self.tags + self._get_debug_tags(), - raw=True, - ) - self._check.histogram( - f"dd.clickhouse.{method_name}.rows", - row_len, - tags=self.tags + self._get_debug_tags(), - raw=True, - ) + normalized_rows = [] + for row in rows: + try: + normalized_row = self._normalize_active_query_row(row) + if normalized_row and normalized_row.get('statement'): + normalized_rows.append(normalized_row) + except Exception as e: + self._log.debug("Failed to normalize row: %s", e) + + return normalized_rows @tracked_method(agent_check_getter=agent_check_getter) def _collect_statement_samples(self): """ - Main method to collect and submit statement samples + Main method to collect and submit statement samples from active queries + Similar to Postgres _collect_statement_samples """ start_time = time.time() - # Get query log samples - rows = self._get_query_log_samples() + # Get active queries from system.processes + rows = self._get_active_queries() - self._log.info("Retrieved %s query log samples for processing", len(rows)) + self._log.info("Retrieved %s active queries for processing", len(rows)) + + # Normalize and filter rows + rows = self._filter_and_normalize_statement_rows(rows) submitted_count = 0 skipped_count = 0 @@ -314,10 +259,8 @@ def _collect_statement_samples(self): for row in rows: try: - normalized_row = self._normalize_query_log_row(row) - # Check if we should submit this sample based on rate limiting - query_signature = normalized_row.get('query_signature') + query_signature = row.get('query_signature') if not query_signature: self._log.debug("Skipping row without query signature") skipped_count += 1 @@ -328,22 +271,21 @@ def _collect_statement_samples(self): continue # Create the event payload - event = self._create_sample_event(normalized_row) + event = self._create_sample_event(row) # Log the event payload for debugging - self._log.info("Query sample event payload: ddsource=%s, query_signature=%s", + self._log.debug("Query sample event payload: ddsource=%s, query_signature=%s", event.get('ddsource'), query_signature[:50] if query_signature else 'N/A') # Submit the event event_json = json.dumps(event, default=default_json_event_encoding) - self._log.debug("Full event JSON (first 500 chars): %s", event_json[:500]) self._check.database_monitoring_query_sample(event_json) submitted_count += 1 self._log.debug("Submitted query sample for signature: %s", query_signature[:50]) except 
Exception as e: error_count += 1 - self._log.exception("Error processing query log row: %s", e) + self._log.exception("Error processing active query row: %s", e) self._check.count( "dd.clickhouse.statement_samples.error", 1, @@ -358,30 +300,7 @@ def _collect_statement_samples(self): submitted_count, skipped_count, error_count, elapsed_ms ) - self._check.histogram( - "dd.clickhouse.collect_statement_samples.time", - elapsed_ms, - tags=self.tags + self._get_debug_tags(), - raw=True, - ) - self._check.count( - "dd.clickhouse.collect_statement_samples.events_submitted.count", - submitted_count, - tags=self.tags + self._get_debug_tags(), - raw=True, - ) - self._check.count( - "dd.clickhouse.collect_statement_samples.events_skipped.count", - skipped_count, - tags=self.tags + self._get_debug_tags(), - raw=True, - ) - self._check.count( - "dd.clickhouse.collect_statement_samples.events_errors.count", - error_count, - tags=self.tags + self._get_debug_tags(), - raw=True, - ) + # Report cache size metrics self._check.gauge( "dd.clickhouse.collect_statement_samples.seen_samples_cache.len", len(self._seen_samples_ratelimiter), @@ -392,6 +311,7 @@ def _collect_statement_samples(self): def _create_sample_event(self, row): """ Create a database monitoring query sample event + Format follows Postgres integration pattern """ db = self._check._db @@ -400,7 +320,7 @@ def _create_sample_event(self, row): "database_instance": self._check.database_identifier, "ddagentversion": datadog_agent.get_version(), "ddsource": "clickhouse", - "dbm_type": "sample", + "dbm_type": "plan", # Using "plan" type like Postgres (even without actual explain plans) "ddtags": ",".join(self._dbtags(db)), "timestamp": int(time.time() * 1000), "db": { @@ -415,26 +335,13 @@ def _create_sample_event(self, row): }, }, "clickhouse": { - "query_id": row.get('query_id'), - "type": row.get('type'), - "duration_ms": row.get('duration_ms'), - "read_rows": row.get('read_rows'), - "read_bytes": row.get('read_bytes'), - "written_rows": row.get('written_rows'), - "written_bytes": row.get('written_bytes'), - "result_rows": row.get('result_rows'), - "result_bytes": row.get('result_bytes'), - "memory_usage": row.get('memory_usage'), + k: v for k, v in row.items() if k not in system_processes_sample_exclude_keys }, } - # Add exception information if present - if row.get('exception'): - event['clickhouse']['exception'] = row['exception'] - - # Add duration if available - if row.get('duration_ms'): - event['duration'] = int(row['duration_ms'] * 1e6) # Convert to nanoseconds + # Add duration if available (elapsed time in seconds, convert to nanoseconds) + if row.get('elapsed'): + event['duration'] = int(row['elapsed'] * 1e9) return event From 51ea3b6f52734729e193c6959eb57fcc316a1bb0 Mon Sep 17 00:00:00 2001 From: sangeetashivaji Date: Thu, 6 Nov 2025 14:53:05 -0500 Subject: [PATCH 04/10] Update implementation --- clickhouse/README.md | 114 ----- .../datadog_checks/clickhouse/clickhouse.py | 60 ++- .../clickhouse/statement_samples.py | 220 +++++++++- .../datadog_checks/clickhouse/statements.py | 407 ++++++++++++++++++ clickhouse/test_query_samples.py | 212 --------- clickhouse/tests/test_dbm_integration.py | 11 +- 6 files changed, 675 insertions(+), 349 deletions(-) delete mode 100644 clickhouse/README.md create mode 100644 clickhouse/datadog_checks/clickhouse/statements.py delete mode 100644 clickhouse/test_query_samples.py diff --git a/clickhouse/README.md b/clickhouse/README.md deleted file mode 100644 index d5904502c12f8..0000000000000 --- 
a/clickhouse/README.md +++ /dev/null @@ -1,114 +0,0 @@ -# Agent Check: ClickHouse - -## Overview - -This check monitors [ClickHouse][1] through the Datadog Agent. - -**Minimum Agent version:** 7.16.0 - -## Setup - -Follow the instructions below to install and configure this check for an Agent running on a host. For containerized environments, see the [Autodiscovery Integration Templates][2] for guidance on applying these instructions. - -### Installation - -The ClickHouse check is included in the [Datadog Agent][3] package. No additional installation is needed on your server. - -### Configuration - - - - -#### Host - -To configure this check for an Agent running on a host: - -#### Metric collection - -1. To start collecting your ClickHouse performance data, edit the `clickhouse.d/conf.yaml` file in the `conf.d/` folder at the root of your Agent's configuration directory. See the [sample clickhouse.d/conf.yaml][4] for all available configuration options. - -*Note*: This integration uses the official `clickhouse-connect` client to connect over HTTP. - -2. [Restart the Agent][5]. - -##### Log collection - -1. Collecting logs is disabled by default in the Datadog Agent, enable it in your `datadog.yaml` file: - - ```yaml - logs_enabled: true - ``` - -2. Add the log files you are interested in to your `clickhouse.d/conf.yaml` file to start collecting your ClickHouse logs: - - ```yaml - logs: - - type: file - path: /var/log/clickhouse-server/clickhouse-server.log - source: clickhouse - service: "" - ``` - - Change the `path` and `service` parameter values and configure them for your environment. See the [sample clickhouse.d/conf.yaml][4] for all available configuration options. - -3. [Restart the Agent][5]. - - - - -#### Containerized - -For containerized environments, see the [Autodiscovery Integration Templates][2] for guidance on applying the parameters below. - -#### Metric collection - -| Parameter | Value | -|----------------------|------------------------------------------------------------| -| `` | `clickhouse` | -| `` | blank or `{}` | -| `` | `{"server": "%%host%%", "port": "%%port%%", "username": "", "password": ""}` | - -##### Log collection - -Collecting logs is disabled by default in the Datadog Agent. To enable it, see [Kubernetes log collection][6]. - -| Parameter | Value | -|----------------|-------------------------------------------| -| `` | `{"source": "clickhouse", "service": ""}` | - - - - -### Validation - -[Run the Agent's status subcommand][7] and look for `clickhouse` under the **Checks** section. - -## Data Collected - -### Metrics - -See [metadata.csv][8] for a list of metrics provided by this integration. - -### Events - -The ClickHouse check does not include any events. - -### Service Checks - -See [service_checks.json][9] for a list of service checks provided by this integration. - -## Troubleshooting - -Need help? Contact [Datadog support][10]. 
- - -[1]: https://clickhouse.yandex -[2]: https://docs.datadoghq.com/agent/kubernetes/integrations/ -[3]: /account/settings/agent/latest -[4]: https://github.com/DataDog/integrations-core/blob/master/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example -[5]: https://docs.datadoghq.com/agent/guide/agent-commands/#start-stop-and-restart-the-agent -[6]: https://docs.datadoghq.com/agent/kubernetes/log/ -[7]: https://docs.datadoghq.com/agent/guide/agent-commands/#agent-status-and-information -[8]: https://github.com/DataDog/integrations-core/blob/master/clickhouse/metadata.csv -[9]: https://github.com/DataDog/integrations-core/blob/master/clickhouse/assets/service_checks.json -[10]: https://docs.datadoghq.com/help/ diff --git a/clickhouse/datadog_checks/clickhouse/clickhouse.py b/clickhouse/datadog_checks/clickhouse/clickhouse.py index d9e47375e03ef..d8f94de1b567b 100644 --- a/clickhouse/datadog_checks/clickhouse/clickhouse.py +++ b/clickhouse/datadog_checks/clickhouse/clickhouse.py @@ -8,6 +8,7 @@ from . import queries from .statement_samples import ClickhouseStatementSamples +from .statements import ClickhouseStatementMetrics from .utils import ErrorSanitizer @@ -63,8 +64,30 @@ def __init__(self, name, init_config, instances): ) self.check_initializations.append(self._query_manager.compile_queries) - # Initialize statement samples if DBM is enabled + # Initialize DBM components if enabled self._dbm_enabled = is_affirmative(self.instance.get('dbm', False)) + + # Initialize query metrics (from system.query_log - analogous to pg_stat_statements) + self._query_metrics_config = self.instance.get('query_metrics', {}) + if self._dbm_enabled and self._query_metrics_config.get('enabled', True): + # Create a simple config object for query metrics + class QueryMetricsConfig: + def __init__(self, config_dict): + self.enabled = config_dict.get('enabled', True) + self.collection_interval = config_dict.get('collection_interval', 60) + self.run_sync = config_dict.get('run_sync', False) + self.full_statement_text_cache_max_size = config_dict.get( + 'full_statement_text_cache_max_size', 10000 + ) + self.full_statement_text_samples_per_hour_per_query = config_dict.get( + 'full_statement_text_samples_per_hour_per_query', 1 + ) + + self.statement_metrics = ClickhouseStatementMetrics(self, QueryMetricsConfig(self._query_metrics_config)) + else: + self.statement_metrics = None + + # Initialize query samples (from system.processes - analogous to pg_stat_activity) self._query_samples_config = self.instance.get('query_samples', {}) if self._dbm_enabled and self._query_samples_config.get('enabled', True): # Create a simple config object for statement samples @@ -85,7 +108,11 @@ def check(self, _): self._query_manager.execute() self.collect_version() - # Run statement samples if DBM is enabled + # Run query metrics collection if DBM is enabled (from system.query_log) + if self.statement_metrics: + self.statement_metrics.run_job_loop(self._tags) + + # Run statement samples if DBM is enabled (from system.processes) if self.statement_samples: self.statement_samples.run_job_loop(self._tags) @@ -181,3 +208,32 @@ def connect(self): else: self.service_check(self.SERVICE_CHECK_CONNECT, self.OK, tags=self._tags) self._client = client + + def create_dbm_client(self): + """ + Create a separate ClickHouse client for DBM async jobs. + This prevents concurrent query errors when multiple jobs run simultaneously. 
+ """ + try: + client = clickhouse_connect.get_client( + host=self._server, + port=self._port, + username=self._user, + password=self._password, + database=self._db, + secure=self._tls_verify, + connect_timeout=self._connect_timeout, + send_receive_timeout=self._read_timeout, + client_name=f'datadog-dbm-{self.check_id}', + compress=self._compression, + ca_cert=self._tls_ca_cert, + verify=self._verify, + settings={}, + ) + return client + except Exception as e: + error = 'Unable to create DBM client: {}'.format( + self._error_sanitizer.clean(self._error_sanitizer.scrub(str(e))) + ) + self.log.warning(error) + raise diff --git a/clickhouse/datadog_checks/clickhouse/statement_samples.py b/clickhouse/datadog_checks/clickhouse/statement_samples.py index 6d6c81a9b8e5f..b2b59c0e1ebdd 100644 --- a/clickhouse/datadog_checks/clickhouse/statement_samples.py +++ b/clickhouse/datadog_checks/clickhouse/statement_samples.py @@ -6,8 +6,6 @@ import time from typing import TYPE_CHECKING -from cachetools import TTLCache - if TYPE_CHECKING: from datadog_checks.clickhouse import ClickhouseCheck @@ -50,6 +48,18 @@ AND query != '' """ +# Query to get active connections aggregated by user, database, etc +# Similar to Postgres PG_ACTIVE_CONNECTIONS_QUERY +ACTIVE_CONNECTIONS_QUERY = """ +SELECT + user, + query_kind, + count(*) as connections +FROM system.processes +WHERE query NOT LIKE '%system.processes%' +GROUP BY user, query_kind +""" + # Columns from system.processes which correspond to attributes common to all databases # and are therefore stored under other standard keys system_processes_sample_exclude_keys = { @@ -90,6 +100,9 @@ def __init__(self, check: ClickhouseCheck, config): self._tags_no_db = None self.tags = None + # Create a separate client for this DBM job to avoid concurrent query errors + self._db_client = None + # Get obfuscator options from config if available obfuscate_options = { 'return_json_metadata': True, @@ -107,6 +120,12 @@ def __init__(self, check: ClickhouseCheck, config): self._collection_interval = collection_interval + # Activity snapshot collection configuration + self._activity_coll_enabled = getattr(config, 'activity_enabled', True) + self._activity_coll_interval = getattr(config, 'activity_collection_interval', 10) + self._activity_max_rows = getattr(config, 'activity_max_rows', 1000) + self._time_since_last_activity_event = 0 + def _dbtags(self, db, *extra_tags): """ Returns the default instance tags with the initial "db" tag replaced with the provided tag @@ -124,6 +143,24 @@ def _get_debug_tags(self): t.extend(self._tags_no_db) return t + def _report_check_hist_metrics(self, start_time, row_count, operation): + """ + Report histogram metrics for check operations + """ + elapsed_ms = (time.time() - start_time) * 1000 + self._check.histogram( + "dd.clickhouse.statement_samples.{}.time".format(operation), + elapsed_ms, + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + self._check.histogram( + "dd.clickhouse.statement_samples.{}.rows".format(operation), + row_count, + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) def _get_active_queries(self): """ @@ -133,7 +170,11 @@ def _get_active_queries(self): start_time = time.time() try: - rows = self._check.execute_query_raw(ACTIVE_QUERIES_QUERY) + # Use the dedicated client for this job + if self._db_client is None: + self._db_client = self._check.create_dbm_client() + result = self._db_client.query(ACTIVE_QUERIES_QUERY) + rows = 
result.result_rows self._report_check_hist_metrics(start_time, len(rows), "get_active_queries") self._log.debug("Loaded %s rows from system.processes", len(rows)) @@ -141,6 +182,8 @@ def _get_active_queries(self): return rows except Exception as e: self._log.exception("Failed to collect active queries: %s", str(e)) + # Reset client on error to force reconnect + self._db_client = None self._check.count( "dd.clickhouse.statement_samples.error", 1, @@ -210,9 +253,7 @@ def _obfuscate_and_normalize_query(self, row): row['query_signature'] = compute_sql_signature(obfuscated_query) except Exception as e: - self._log.warning( - "Failed to obfuscate query: %s, query: %s", str(e), row.get('query', '')[:100] - ) + self._log.warning("Failed to obfuscate query: %s, query: %s", str(e), row.get('query', '')[:100]) # On obfuscation error, we still want to emit the row row['statement'] = None row['query_signature'] = compute_sql_signature(row['query']) @@ -237,6 +278,106 @@ def _filter_and_normalize_statement_rows(self, rows): return normalized_rows + def _get_active_connections(self): + """ + Get aggregated active connection counts from system.processes + Similar to Postgres _get_active_connections from pg_stat_activity + """ + try: + start_time = time.time() + + # Use the dedicated client for this job + if self._db_client is None: + self._db_client = self._check.create_dbm_client() + result = self._db_client.query(ACTIVE_CONNECTIONS_QUERY) + rows = result.result_rows + + elapsed_ms = (time.time() - start_time) * 1000 + self._log.debug("Retrieved %s connection aggregation rows in %.2f ms", len(rows), elapsed_ms) + + # Convert to list of dicts + connections = [] + for row in rows: + connections.append( + { + 'user': row[0], + 'query_kind': row[1], + 'connections': row[2], + } + ) + + return connections + + except Exception as e: + self._log.warning("Failed to get active connections: %s", e) + # Reset client on error to force reconnect + self._db_client = None + return [] + + def _to_active_session(self, row): + """ + Convert a system.processes row to an active session + Similar to Postgres _to_active_session + """ + # Filter out non-active queries + if not row.get('query') or not row.get('statement'): + return None + + # Remove null values and the raw query + active_row = {key: val for key, val in row.items() if val is not None and key != 'query'} + return active_row + + def _create_active_sessions(self, rows): + """ + Create active sessions from system.processes rows + Similar to Postgres _create_active_sessions + """ + active_sessions_count = 0 + for row in rows: + active_row = self._to_active_session(row) + if active_row: + active_sessions_count += 1 + yield active_row + if active_sessions_count >= self._activity_max_rows: + break + + def _create_activity_event(self, rows, active_connections): + """ + Create a database monitoring activity event + Similar to Postgres _create_activity_event + """ + self._time_since_last_activity_event = time.time() + active_sessions = [] + + for row in self._create_active_sessions(rows): + active_sessions.append(row) + + event = { + "host": self._check.reported_hostname, + "database_instance": self._check.database_identifier, + "ddagentversion": datadog_agent.get_version(), + "ddsource": "clickhouse", + "dbm_type": "activity", + "collection_interval": self._activity_coll_interval, + "ddtags": self._tags_no_db, + "timestamp": time.time() * 1000, + "cloud_metadata": getattr(self._check, 'cloud_metadata', {}), + 'service': getattr(self._config, 'service', None), + 
"clickhouse_activity": active_sessions, + "clickhouse_connections": active_connections, + } + return event + + def _report_activity_event(self): + """ + Check if we should report an activity event based on collection interval + Similar to Postgres _report_activity_event + """ + elapsed_s = time.time() - self._time_since_last_activity_event + if elapsed_s < self._activity_coll_interval or not self._activity_coll_enabled: + return False + return True + @tracked_method(agent_check_getter=agent_check_getter) def _collect_statement_samples(self): """ @@ -274,8 +415,11 @@ def _collect_statement_samples(self): event = self._create_sample_event(row) # Log the event payload for debugging - self._log.debug("Query sample event payload: ddsource=%s, query_signature=%s", - event.get('ddsource'), query_signature[:50] if query_signature else 'N/A') + self._log.debug( + "Query sample event payload: ddsource=%s, query_signature=%s", + event.get('ddsource'), + query_signature[:50] if query_signature else 'N/A', + ) # Submit the event event_json = json.dumps(event, default=default_json_event_encoding) @@ -297,7 +441,10 @@ def _collect_statement_samples(self): self._log.info( "Statement sample collection complete: submitted=%s, skipped=%s, errors=%s, elapsed_ms=%.2f", - submitted_count, skipped_count, error_count, elapsed_ms + submitted_count, + skipped_count, + error_count, + elapsed_ms, ) # Report cache size metrics @@ -310,8 +457,9 @@ def _collect_statement_samples(self): def _create_sample_event(self, row): """ - Create a database monitoring query sample event + Create a database monitoring query sample event (plan type) Format follows Postgres integration pattern + This represents currently executing queries from system.processes """ db = self._check._db @@ -320,7 +468,7 @@ def _create_sample_event(self, row): "database_instance": self._check.database_identifier, "ddagentversion": datadog_agent.get_version(), "ddsource": "clickhouse", - "dbm_type": "plan", # Using "plan" type like Postgres (even without actual explain plans) + "dbm_type": "plan", # Using "plan" type for query samples "ddtags": ",".join(self._dbtags(db)), "timestamp": int(time.time() * 1000), "db": { @@ -334,9 +482,7 @@ def _create_sample_event(self, row): "comments": row.get('dd_comments'), }, }, - "clickhouse": { - k: v for k, v in row.items() if k not in system_processes_sample_exclude_keys - }, + "clickhouse": {k: v for k, v in row.items() if k not in system_processes_sample_exclude_keys}, } # Add duration if available (elapsed time in seconds, convert to nanoseconds) @@ -353,4 +499,50 @@ def run_job(self): self.tags = [t for t in self._check._tags if not t.startswith('dd.internal')] self._tags_no_db = [t for t in self.tags if not t.startswith('db:')] + # Check if we should collect activity snapshots + collect_activity = self._report_activity_event() + + # Always collect statement samples self._collect_statement_samples() + + # Collect and submit activity event if it's time + if collect_activity: + try: + start_time = time.time() + + # Get active queries for activity snapshot + rows = self._get_active_queries() + rows = self._filter_and_normalize_statement_rows(rows) + + # Get active connections aggregation + active_connections = self._get_active_connections() + + # Create and submit activity event + activity_event = self._create_activity_event(rows, active_connections) + self._check.database_monitoring_query_activity( + json.dumps(activity_event, default=default_json_event_encoding) + ) + + elapsed_ms = (time.time() - start_time) * 
1000 + self._check.histogram( + "dd.clickhouse.collect_activity_snapshot.time", + elapsed_ms, + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + + self._log.info( + "Activity snapshot collected and submitted: sessions=%s, connections=%s, elapsed_ms=%.2f", + len(activity_event.get('clickhouse_activity', [])), + len(activity_event.get('clickhouse_connections', [])), + elapsed_ms, + ) + + except Exception as e: + self._log.exception("Failed to collect activity snapshot: %s", e) + self._check.count( + "dd.clickhouse.statement_samples.error", + 1, + tags=self.tags + ["error:collect-activity-snapshot"] + self._get_debug_tags(), + raw=True, + ) diff --git a/clickhouse/datadog_checks/clickhouse/statements.py b/clickhouse/datadog_checks/clickhouse/statements.py new file mode 100644 index 0000000000000..8ab22ef3492b3 --- /dev/null +++ b/clickhouse/datadog_checks/clickhouse/statements.py @@ -0,0 +1,407 @@ +# (C) Datadog, Inc. 2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from __future__ import annotations + +import copy +import time +from typing import TYPE_CHECKING + +from cachetools import TTLCache + +if TYPE_CHECKING: + from datadog_checks.clickhouse import ClickhouseCheck + +try: + import datadog_agent +except ImportError: + from datadog_checks.base.stubs import datadog_agent + +from datadog_checks.base.utils.common import to_native_string +from datadog_checks.base.utils.db.sql import compute_sql_signature +from datadog_checks.base.utils.db.statement_metrics import StatementMetrics +from datadog_checks.base.utils.db.utils import DBMAsyncJob, default_json_event_encoding, obfuscate_sql_with_metadata +from datadog_checks.base.utils.serialization import json +from datadog_checks.base.utils.tracking import tracked_method + +# Query to fetch aggregated metrics from system.query_log +# This is the ClickHouse equivalent of Postgres pg_stat_statements +STATEMENTS_QUERY = """ +SELECT + normalized_query_hash, + any(query) as query_text, + any(user) as user, + any(type) as query_type, + any(databases) as databases, + any(tables) as tables, + count() as execution_count, + sum(query_duration_ms) as total_duration_ms, + avg(query_duration_ms) as avg_duration_ms, + min(query_duration_ms) as min_duration_ms, + max(query_duration_ms) as max_duration_ms, + quantile(0.95)(query_duration_ms) as p95_duration_ms, + sum(read_rows) as total_read_rows, + sum(read_bytes) as total_read_bytes, + sum(written_rows) as total_written_rows, + sum(written_bytes) as total_written_bytes, + sum(result_rows) as total_result_rows, + sum(result_bytes) as total_result_bytes, + sum(memory_usage) as total_memory_usage, + max(memory_usage) as peak_memory_usage +FROM system.query_log +WHERE event_time >= now() - INTERVAL {collection_interval} SECOND + AND type IN ('QueryFinish', 'ExceptionWhileProcessing') + AND query NOT LIKE '%system.query_log%' + AND query NOT LIKE '%system.processes%' + AND query NOT LIKE '/* DDIGNORE */%' + AND query != '' + AND normalized_query_hash != 0 +GROUP BY normalized_query_hash +ORDER BY total_duration_ms DESC +LIMIT 10000 +""" + + +def agent_check_getter(self): + return self._check + + +def _row_key(row): + """ + :param row: a normalized row from system.query_log + :return: a tuple uniquely identifying this row + """ + return row['query_signature'], row.get('user', ''), row.get('databases', '') + + +class ClickhouseStatementMetrics(DBMAsyncJob): + """Collects telemetry for SQL statements from system.query_log""" + + def __init__(self, check: 
ClickhouseCheck, config): + collection_interval = float(getattr(config, 'collection_interval', 60)) + super(ClickhouseStatementMetrics, self).__init__( + check, + run_sync=getattr(config, 'run_sync', False), + enabled=getattr(config, 'enabled', True), + expected_db_exceptions=(Exception,), + min_collection_interval=check.check_interval if hasattr(check, 'check_interval') else 15, + dbms="clickhouse", + rate_limit=1 / float(collection_interval), + job_name="query-metrics", + ) + self._check = check + self._metrics_collection_interval = collection_interval + self._config = config + self._tags_no_db = None + self.tags = None + self._state = StatementMetrics() + + # Create a separate client for this DBM job to avoid concurrent query errors + self._db_client = None + + # Obfuscator options + obfuscate_options = { + 'return_json_metadata': True, + 'collect_tables': True, + 'collect_commands': True, + 'collect_comments': True, + } + self._obfuscate_options = to_native_string(json.dumps(obfuscate_options)) + + # full_statement_text_cache: limit the ingestion rate of full statement text events per query_signature + self._full_statement_text_cache = TTLCache( + maxsize=getattr(config, 'full_statement_text_cache_max_size', 10000), + ttl=60 * 60 / getattr(config, 'full_statement_text_samples_per_hour_per_query', 1), + ) + + def _execute_query(self, query): + """Execute a query and return the results using the dedicated client""" + if self._cancel_event.is_set(): + raise Exception("Job loop cancelled. Aborting query.") + try: + # Use the dedicated client for this job + if self._db_client is None: + self._db_client = self._check.create_dbm_client() + result = self._db_client.query(query) + return result.result_rows + except Exception as e: + self._log.warning("Failed to run query: %s", e) + # Reset client on error to force reconnect + self._db_client = None + raise e + + def run_job(self): + # do not emit any dd.internal metrics for DBM specific check code + self.tags = [t for t in self._tags if not t.startswith('dd.internal')] + self._tags_no_db = [t for t in self.tags if not t.startswith('db:')] + self.collect_per_statement_metrics() + + @tracked_method(agent_check_getter=agent_check_getter) + def collect_per_statement_metrics(self): + """ + Collect per-statement metrics from system.query_log and emit as: + 1. FQT events (dbm_type: fqt) - for query text catalog + 2. 
Query metrics payload - for time-series metrics + """ + try: + rows = self._collect_metrics_rows() + if not rows: + return + + # Emit FQT (Full Query Text) events + for event in self._rows_to_fqt_events(rows): + self._check.database_monitoring_query_sample(json.dumps(event, default=default_json_event_encoding)) + + # Prepare metrics payload wrapper + payload_wrapper = { + 'host': self._check.reported_hostname, + 'timestamp': time.time() * 1000, + 'min_collection_interval': self._metrics_collection_interval, + 'tags': self._tags_no_db, + 'ddagentversion': datadog_agent.get_version(), + 'clickhouse_version': self._get_clickhouse_version(), + } + + # Get query metrics payloads (may be split into multiple if too large) + payloads = self._get_query_metrics_payloads(payload_wrapper, rows) + + for payload in payloads: + self._check.database_monitoring_query_metrics(payload) + + except Exception: + self._log.exception('Unable to collect statement metrics due to an error') + return [] + + def _get_clickhouse_version(self): + """Get ClickHouse version string""" + try: + version_rows = self._check.execute_query_raw('SELECT version()') + if version_rows: + return str(version_rows[0][0]) + except Exception: + pass + return 'unknown' + + def _get_query_metrics_payloads(self, payload_wrapper, rows): + """ + Split rows into multiple payloads if needed to avoid exceeding size limits + """ + payloads = [] + max_size = 5 * 1024 * 1024 # 5MB limit + + queue = [rows] + while queue: + current = queue.pop() + if len(current) == 0: + continue + + payload = copy.deepcopy(payload_wrapper) + payload["clickhouse_rows"] = current + serialized_payload = json.dumps(payload, default=default_json_event_encoding) + size = len(serialized_payload) + + if size < max_size: + payloads.append(serialized_payload) + else: + if len(current) == 1: + self._log.warning( + "A single query is too large to send to Datadog. This query will be dropped. 
size=%d", + size, + ) + continue + mid = len(current) // 2 + queue.append(current[:mid]) + queue.append(current[mid:]) + + return payloads + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _load_query_log_statements(self): + """ + Load aggregated query metrics from system.query_log + This is analogous to Postgres loading from pg_stat_statements + """ + try: + query = STATEMENTS_QUERY.format(collection_interval=int(self._metrics_collection_interval)) + rows = self._execute_query(query) + + self._log.debug("Loaded %s rows from system.query_log", len(rows)) + + # Convert to list of dicts + result_rows = [] + for row in rows: + ( + normalized_query_hash, + query_text, + user, + query_type, + databases, + tables, + execution_count, + total_duration_ms, + avg_duration_ms, + min_duration_ms, + max_duration_ms, + p95_duration_ms, + total_read_rows, + total_read_bytes, + total_written_rows, + total_written_bytes, + total_result_rows, + total_result_bytes, + total_memory_usage, + peak_memory_usage, + ) = row + + result_rows.append( + { + 'normalized_query_hash': str(normalized_query_hash), + 'query': str(query_text) if query_text else '', + 'user': str(user) if user else '', + 'query_type': str(query_type) if query_type else '', + 'databases': str(databases[0]) if databases and len(databases) > 0 else '', + 'tables': tables if tables else [], + 'calls': int(execution_count) if execution_count else 0, + 'total_time': float(total_duration_ms) if total_duration_ms else 0.0, + 'mean_time': float(avg_duration_ms) if avg_duration_ms else 0.0, + 'min_time': float(min_duration_ms) if min_duration_ms else 0.0, + 'max_time': float(max_duration_ms) if max_duration_ms else 0.0, + 'p95_time': float(p95_duration_ms) if p95_duration_ms else 0.0, + 'rows': int(total_result_rows) if total_result_rows else 0, + 'read_rows': int(total_read_rows) if total_read_rows else 0, + 'read_bytes': int(total_read_bytes) if total_read_bytes else 0, + 'written_rows': int(total_written_rows) if total_written_rows else 0, + 'written_bytes': int(total_written_bytes) if total_written_bytes else 0, + 'result_bytes': int(total_result_bytes) if total_result_bytes else 0, + 'memory_usage': int(total_memory_usage) if total_memory_usage else 0, + 'peak_memory_usage': int(peak_memory_usage) if peak_memory_usage else 0, + } + ) + + return result_rows + + except Exception as e: + self._log.exception("Failed to load statements from system.query_log: %s", e) + self._check.count( + "dd.clickhouse.statement_metrics.error", + 1, + tags=self.tags + ["error:query_log_load_failed"], + raw=True, + ) + return [] + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _collect_metrics_rows(self): + """ + Collect and normalize query metrics rows + """ + rows = self._load_query_log_statements() + if not rows: + return [] + + # Normalize queries (obfuscate SQL text) + rows = self._normalize_queries(rows) + + if not rows: + return [] + + # Get available metric columns + available_columns = set(rows[0].keys()) + metric_columns = available_columns & { + 'calls', + 'total_time', + 'mean_time', + 'min_time', + 'max_time', + 'p95_time', + 'rows', + 'read_rows', + 'read_bytes', + 'written_rows', + 'written_bytes', + 'result_bytes', + 'memory_usage', + 'peak_memory_usage', + } + + # Compute derivative rows (calculate deltas since last collection) + rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key, execution_indicators=['calls']) + + self._check.gauge( + 
'dd.clickhouse.queries.query_rows_raw', + len(rows), + tags=self.tags + self._check._get_debug_tags(), + raw=True, + ) + + return rows + + def _normalize_queries(self, rows): + """ + Normalize and obfuscate queries + """ + normalized_rows = [] + for row in rows: + normalized_row = dict(copy.copy(row)) + try: + query_text = row['query'] + statement = obfuscate_sql_with_metadata(query_text, self._obfuscate_options) + except Exception as e: + self._log.debug("Failed to obfuscate query | err=[%s]", e) + continue + + obfuscated_query = statement['query'] + normalized_row['query'] = obfuscated_query + normalized_row['query_signature'] = compute_sql_signature(obfuscated_query) + + metadata = statement['metadata'] + normalized_row['dd_tables'] = metadata.get('tables', None) + normalized_row['dd_commands'] = metadata.get('commands', None) + normalized_row['dd_comments'] = metadata.get('comments', None) + normalized_rows.append(normalized_row) + + return normalized_rows + + def _rows_to_fqt_events(self, rows): + """ + Generate FQT (Full Query Text) events for each unique query signature + These events provide the mapping from query_signature to actual SQL text + dbm_type: fqt + """ + for row in rows: + query_cache_key = _row_key(row) + if query_cache_key in self._full_statement_text_cache: + continue + self._full_statement_text_cache[query_cache_key] = True + + db = row.get('databases', 'default') + user = row.get('user', 'default') + + row_tags = self._tags_no_db + [ + "db:{}".format(db), + "user:{}".format(user), + ] + + yield { + "timestamp": time.time() * 1000, + "host": self._check.reported_hostname, + "database_instance": self._check.database_identifier, + "ddagentversion": datadog_agent.get_version(), + "ddsource": "clickhouse", + "ddtags": ",".join(row_tags), + "dbm_type": "fqt", + "db": { + "instance": db, + "query_signature": row['query_signature'], + "statement": row['query'], + "metadata": { + "tables": row['dd_tables'], + "commands": row['dd_commands'], + "comments": row['dd_comments'], + }, + }, + "clickhouse": { + "user": user, + "normalized_query_hash": row.get('normalized_query_hash'), + }, + } diff --git a/clickhouse/test_query_samples.py b/clickhouse/test_query_samples.py deleted file mode 100644 index 9e299aa025df0..0000000000000 --- a/clickhouse/test_query_samples.py +++ /dev/null @@ -1,212 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script to verify ClickHouse query samples are being collected correctly. -Run this inside the datadog-agent container. -""" - -import sys -import time -import subprocess -import json - - -def run_command(cmd): - """Execute a shell command and return output.""" - result = subprocess.run(cmd, shell=True, capture_output=True, text=True) - return result.stdout, result.stderr, result.returncode - - -def test_clickhouse_query_log(): - """Test if ClickHouse has query log data.""" - print("=" * 60) - print("TEST 1: Checking ClickHouse query_log table") - print("=" * 60) - - query = """ - SELECT count(*) as query_count - FROM system.query_log - WHERE event_time > now() - INTERVAL 60 SECOND - """ - - cmd = f"clickhouse-client --host clickhouse-primary --port 9000 --user datadog --password datadog --query \"{query}\"" - stdout, stderr, code = run_command(cmd) - - if code != 0: - print(f"❌ FAILED: Cannot query ClickHouse: {stderr}") - return False - - count = int(stdout.strip()) - print(f"✅ Query log has {count} queries in the last 60 seconds") - - if count == 0: - print("⚠️ WARNING: No queries found. 
Make sure the orders app is running.") - return False - - return True - - -def test_agent_check(): - """Test if the ClickHouse integration check runs successfully.""" - print("\n" + "=" * 60) - print("TEST 2: Running ClickHouse agent check") - print("=" * 60) - - cmd = "agent check clickhouse -l info 2>&1 | tail -50" - stdout, stderr, code = run_command(cmd) - - if "Running check clickhouse" in stdout: - print("✅ ClickHouse check is running") - else: - print("❌ FAILED: ClickHouse check not running properly") - print(stdout[-500:]) # Last 500 chars - return False - - if "error" in stdout.lower() and "statement_sample" in stdout.lower(): - print("⚠️ WARNING: Errors found in check output:") - print(stdout) - return False - - return True - - -def test_agent_logs(): - """Check agent logs for statement sample activity.""" - print("\n" + "=" * 60) - print("TEST 3: Checking agent logs for query samples") - print("=" * 60) - - # Wait a bit for samples to be collected - print("Waiting 15 seconds for sample collection...") - time.sleep(15) - - cmd = "tail -100 /var/log/datadog/agent.log | grep -i 'statement_sample\\|query_sample\\|query log samples' | tail -20" - stdout, stderr, code = run_command(cmd) - - if not stdout: - print("⚠️ No statement sample logs found") - print("Checking for any ClickHouse logs...") - cmd = "tail -100 /var/log/datadog/agent.log | grep -i clickhouse | tail -10" - stdout, _, _ = run_command(cmd) - print(stdout if stdout else "No ClickHouse logs found") - return False - - print("✅ Found statement sample activity in logs:") - print(stdout) - - # Check for success indicators - if "Loaded" in stdout and "rows from system.query_log" in stdout: - print("✅ Query log samples are being fetched") - return True - - if "submitted" in stdout.lower(): - print("✅ Samples are being submitted") - return True - - return False - - -def test_metrics(): - """Check if statement sample metrics are being reported.""" - print("\n" + "=" * 60) - print("TEST 4: Checking statement sample metrics") - print("=" * 60) - - cmd = "agent status 2>&1 | grep -A 20 clickhouse | grep -i 'sample\\|query_log'" - stdout, stderr, code = run_command(cmd) - - if stdout: - print("✅ Found ClickHouse metrics:") - print(stdout) - return True - else: - print("⚠️ No statement sample metrics found in agent status") - return False - - -def show_sample_queries(): - """Show sample queries from query_log.""" - print("\n" + "=" * 60) - print("BONUS: Sample queries in ClickHouse query_log") - print("=" * 60) - - query = """ - SELECT - event_time, - user, - query_duration_ms, - substring(query, 1, 100) as query_preview - FROM system.query_log - WHERE query NOT LIKE '%system.query_log%' - AND event_time > now() - INTERVAL 60 SECOND - ORDER BY event_time DESC - LIMIT 10 - FORMAT Pretty - """ - - cmd = f"clickhouse-client --host clickhouse-primary --port 9000 --user datadog --password datadog --query \"{query}\"" - stdout, stderr, code = run_command(cmd) - - if code == 0: - print(stdout) - else: - print(f"Could not fetch sample queries: {stderr}") - - -def main(): - """Run all tests.""" - print("\n" + "=" * 60) - print("CLICKHOUSE QUERY SAMPLES VERIFICATION") - print("=" * 60) - - tests = [ - ("ClickHouse Query Log", test_clickhouse_query_log), - ("Agent Check", test_agent_check), - ("Agent Logs", test_agent_logs), - ("Metrics", test_metrics), - ] - - results = [] - for test_name, test_func in tests: - try: - result = test_func() - results.append((test_name, result)) - except Exception as e: - print(f"❌ FAILED: {test_name} - {e}") - 
results.append((test_name, False)) - - # Show sample queries - try: - show_sample_queries() - except Exception as e: - print(f"Could not show sample queries: {e}") - - # Summary - print("\n" + "=" * 60) - print("TEST SUMMARY") - print("=" * 60) - - passed = sum(1 for _, result in results if result) - total = len(results) - - for test_name, result in results: - status = "✅ PASS" if result else "❌ FAIL" - print(f"{status}: {test_name}") - - print(f"\nTotal: {passed}/{total} tests passed") - - if passed == total: - print("\n🎉 All tests passed! Query samples should be working.") - else: - print("\n⚠️ Some tests failed. Check the output above for details.") - print("\nTroubleshooting tips:") - print("1. Make sure orders app is running and generating queries") - print("2. Check that DBM is enabled in clickhouse.yaml") - print("3. Verify datadog user has permissions on system.query_log") - print("4. Check agent logs: tail -f /var/log/datadog/agent.log | grep clickhouse") - - return 0 if passed == total else 1 - - -if __name__ == "__main__": - sys.exit(main()) - diff --git a/clickhouse/tests/test_dbm_integration.py b/clickhouse/tests/test_dbm_integration.py index 49b368d9eda6b..fa498cfc63583 100644 --- a/clickhouse/tests/test_dbm_integration.py +++ b/clickhouse/tests/test_dbm_integration.py @@ -9,8 +9,6 @@ from datadog_checks.clickhouse import ClickhouseCheck -from .common import CONFIG - @pytest.mark.integration @pytest.mark.usefixtures('dd_environment') @@ -105,9 +103,9 @@ def test_query_samples_disabled(self, aggregator, instance): check.check(None) # Verify no DBM metrics are reported - assert not aggregator.metrics( - 'dd.clickhouse.collect_statement_samples.events_submitted.count' - ), "No DBM metrics should be reported when DBM is disabled" + assert not aggregator.metrics('dd.clickhouse.collect_statement_samples.events_submitted.count'), ( + "No DBM metrics should be reported when DBM is disabled" + ) def test_query_samples_with_activity(self, aggregator, instance, dd_run_check): """ @@ -242,7 +240,7 @@ def test_statement_samples_event_structure(self, instance): assert 'ddsource' in event, "Event should have ddsource field" assert event['ddsource'] == 'clickhouse', "ddsource should be clickhouse" assert 'dbm_type' in event, "Event should have dbm_type field" - assert event['dbm_type'] == 'sample', "dbm_type should be sample" + assert event['dbm_type'] == 'plan', "dbm_type should be plan" assert 'timestamp' in event, "Event should have timestamp field" assert 'db' in event, "Event should have db field" assert 'clickhouse' in event, "Event should have clickhouse field" @@ -269,4 +267,3 @@ def test_statement_samples_event_structure(self, instance): print("Event structure is valid!") print(f"Event keys: {list(event.keys())}") - From 15b1ff85666f8c5ff02b3d7f1d2f7683e2817e82 Mon Sep 17 00:00:00 2001 From: sangeetashivaji Date: Thu, 6 Nov 2025 16:09:43 -0500 Subject: [PATCH 05/10] Update --- clickhouse/README.md | 135 ++++++++++++++++++ .../clickhouse/statement_samples.py | 4 +- 2 files changed, 137 insertions(+), 2 deletions(-) create mode 100644 clickhouse/README.md diff --git a/clickhouse/README.md b/clickhouse/README.md new file mode 100644 index 0000000000000..f3ea2e0945a13 --- /dev/null +++ b/clickhouse/README.md @@ -0,0 +1,135 @@ +# ClickHouse Integration + +## Overview + +The ClickHouse integration provides health and performance metrics for your ClickHouse database in near real-time. 
Visualize these metrics with the provided dashboard and create monitors to alert your team on ClickHouse states. + +Enable Database Monitoring (DBM) for enhanced insights into query performance and database health. In addition to the standard integration, Datadog DBM provides query-level metrics, live and historical query snapshots, and query explain plans. + +**Minimum Agent version:** 7.50.0 + +## Setup + +### Installation + +The ClickHouse check is packaged with the Agent. To start gathering your ClickHouse metrics and logs, [install the Agent](https://docs.datadoghq.com/agent/). + +### Configuration + +#### Prepare ClickHouse + +To get started with the ClickHouse integration, create a `datadog` user with proper access to your ClickHouse server. + +```sql +CREATE USER datadog IDENTIFIED BY ''; +GRANT SELECT ON system.* TO datadog; +GRANT SELECT ON information_schema.* TO datadog; +GRANT SHOW DATABASES ON *.* TO datadog; +GRANT SHOW TABLES ON *.* TO datadog; +GRANT SHOW COLUMNS ON *.* TO datadog; +``` + +#### Configure the Agent + +Edit the `clickhouse.d/conf.yaml` file, in the `conf.d/` folder at the root of your Agent's configuration directory to start collecting your ClickHouse performance data. See the [sample clickhouse.d/conf.yaml](https://github.com/DataDog/integrations-core/blob/master/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example) for all available configuration options. + +```yaml +init_config: + +instances: + - server: localhost + port: 8123 + username: datadog + password: + + # Enable Database Monitoring + dbm: true + + # Query Metrics Configuration + query_metrics: + enabled: true + collection_interval: 60 + + # Query Samples Configuration + query_samples: + enabled: true + collection_interval: 10 + + # Activity snapshot configuration + activity_enabled: true + activity_collection_interval: 10 + activity_max_rows: 1000 +``` + +#### Enable query_log + +For Database Monitoring features, you need to enable ClickHouse's `query_log`. Add this to your ClickHouse server configuration: + +```xml + + + system + query_log
+        <flush_interval_milliseconds>7500</flush_interval_milliseconds>
+    </query_log>
+</clickhouse>
+``` + +[Restart the Agent](https://docs.datadoghq.com/agent/guide/agent-commands/#start-stop-and-restart-the-agent) to start sending ClickHouse metrics to Datadog. + +### Validation + +[Run the Agent's status subcommand](https://docs.datadoghq.com/agent/guide/agent-commands/#agent-status-and-information) and look for `clickhouse` under the Checks section. + +## Data Collected + +### Metrics + +The ClickHouse integration collects a wide range of metrics from ClickHouse system tables. See [metadata.csv](https://github.com/DataDog/integrations-core/blob/master/clickhouse/metadata.csv) for a list of metrics provided by this integration. + +### Database Monitoring + +When Database Monitoring is enabled, the integration collects: + +- **Query Metrics**: Aggregated query performance metrics from `system.query_log` +- **Query Samples**: Execution plans for currently running queries from `system.processes` +- **Activity Snapshots**: Real-time view of active sessions and connections + +### Events + +The ClickHouse check does not include any events. + +### Service Checks + +**clickhouse.can_connect**: +Returns `CRITICAL` if the Agent cannot connect to ClickHouse, otherwise returns `OK`. + +## Troubleshooting + +### Connection Issues + +If you encounter connection errors: + +1. Verify ClickHouse is running and accessible on the configured host and port +2. Use port `8123` (HTTP interface) for the agent connection +3. Ensure the `datadog` user has the required permissions +4. Check firewall rules allow connections from the Agent + +### Database Monitoring Not Collecting Data + +If DBM features are not working: + +1. Verify `dbm: true` is set in the configuration +2. Ensure `query_log` is enabled in ClickHouse server configuration +3. Check that the `datadog` user has SELECT permissions on `system.query_log` and `system.processes` +4. Review Agent logs for any errors + +For more troubleshooting help, contact [Datadog support](https://docs.datadoghq.com/help/). 
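+
+If DBM data is still missing after the checks above, you can confirm the `datadog` user's access directly. The snippet below is a minimal sketch; it assumes you connect with the `datadog` user's credentials (for example through `clickhouse-client`) and that the default system table names are in use:
+
+```sql
+-- Both statements should return a count rather than an access-denied error.
+SELECT count() FROM system.query_log WHERE event_time > now() - INTERVAL 5 MINUTE;
+SELECT count() FROM system.processes;
+
+-- Review the grants currently applied to the datadog user.
+SHOW GRANTS FOR datadog;
+```
+
+If either statement fails with a permission error, re-run the `GRANT SELECT ON system.*` statement from the setup section above and restart the Agent.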
+ +## Further Reading + +Additional helpful documentation, links, and articles: + +- [Monitor ClickHouse with Datadog](https://www.datadoghq.com/blog/monitor-clickhouse/) +- [Database Monitoring](https://docs.datadoghq.com/database_monitoring/) + diff --git a/clickhouse/datadog_checks/clickhouse/statement_samples.py b/clickhouse/datadog_checks/clickhouse/statement_samples.py index b2b59c0e1ebdd..6dcfc79d69800 100644 --- a/clickhouse/datadog_checks/clickhouse/statement_samples.py +++ b/clickhouse/datadog_checks/clickhouse/statement_samples.py @@ -319,8 +319,8 @@ def _to_active_session(self, row): Convert a system.processes row to an active session Similar to Postgres _to_active_session """ - # Filter out non-active queries - if not row.get('query') or not row.get('statement'): + # Only include rows with successfully obfuscated statements + if not row.get('statement'): return None # Remove null values and the raw query From 0b09f49482676076bccd596b8b32a7bcebbdbab7 Mon Sep 17 00:00:00 2001 From: sangeetashivaji Date: Thu, 6 Nov 2025 22:02:27 -0500 Subject: [PATCH 06/10] Update --- clickhouse/assets/configuration/spec.yaml | 21 +++++++++++++++++++ .../clickhouse/config_models/instance.py | 3 +++ .../clickhouse/data/conf.yaml.example | 18 ++++++++++++++++ .../clickhouse/statement_samples.py | 8 +++++++ 4 files changed, 50 insertions(+) diff --git a/clickhouse/assets/configuration/spec.yaml b/clickhouse/assets/configuration/spec.yaml index 2e5d4ce9c6a6e..8f9b8ec744da5 100644 --- a/clickhouse/assets/configuration/spec.yaml +++ b/clickhouse/assets/configuration/spec.yaml @@ -117,6 +117,27 @@ files: value: type: boolean example: false + - name: activity_enabled + description: | + Enable collection of database activity snapshots. + Activity snapshots capture currently executing queries and active connections. + value: + type: boolean + example: true + - name: activity_collection_interval + description: | + The interval in seconds between activity snapshot collections. + Lower values capture more activity data but increase overhead. + For fast ClickHouse queries, consider using 1-5 seconds. + value: + type: number + example: 10 + - name: activity_max_rows + description: | + The maximum number of active sessions to include in each activity snapshot. + value: + type: number + example: 1000 - template: instances/db overrides: custom_queries.value.example: diff --git a/clickhouse/datadog_checks/clickhouse/config_models/instance.py b/clickhouse/datadog_checks/clickhouse/config_models/instance.py index 8d34440aef3ad..6a25ddd694be3 100644 --- a/clickhouse/datadog_checks/clickhouse/config_models/instance.py +++ b/clickhouse/datadog_checks/clickhouse/config_models/instance.py @@ -46,6 +46,9 @@ class QuerySamples(BaseModel): arbitrary_types_allowed=True, frozen=True, ) + activity_collection_interval: Optional[float] = None + activity_enabled: Optional[bool] = None + activity_max_rows: Optional[float] = None collection_interval: Optional[float] = None enabled: Optional[bool] = None run_sync: Optional[bool] = None diff --git a/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example b/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example index 1140bf85f453d..0f98da6559455 100644 --- a/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example +++ b/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example @@ -127,6 +127,24 @@ instances: # # run_sync: false + ## @param activity_enabled - boolean - optional - default: true + ## Enable collection of database activity snapshots. 
+ ## Activity snapshots capture currently executing queries and active connections. + # + # activity_enabled: true + + ## @param activity_collection_interval - number - optional - default: 10 + ## The interval in seconds between activity snapshot collections. + ## Lower values capture more activity data but increase overhead. + ## For fast ClickHouse queries, consider using 1-5 seconds. + # + # activity_collection_interval: 10 + + ## @param activity_max_rows - number - optional - default: 1000 + ## The maximum number of active sessions to include in each activity snapshot. + # + # activity_max_rows: 1000 + ## @param only_custom_queries - boolean - optional - default: false ## Set this parameter to `true` if you want to skip the integration's default metrics collection. ## Only metrics specified in `custom_queries` will be collected. diff --git a/clickhouse/datadog_checks/clickhouse/statement_samples.py b/clickhouse/datadog_checks/clickhouse/statement_samples.py index 6dcfc79d69800..9f33c6d1d8c55 100644 --- a/clickhouse/datadog_checks/clickhouse/statement_samples.py +++ b/clickhouse/datadog_checks/clickhouse/statement_samples.py @@ -125,6 +125,14 @@ def __init__(self, check: ClickhouseCheck, config): self._activity_coll_interval = getattr(config, 'activity_collection_interval', 10) self._activity_max_rows = getattr(config, 'activity_max_rows', 1000) self._time_since_last_activity_event = 0 + + # Debug logging to verify config values + self._check.log.info( + "Activity config: enabled=%s, interval=%s, max_rows=%s", + self._activity_coll_enabled, + self._activity_coll_interval, + self._activity_max_rows + ) def _dbtags(self, db, *extra_tags): """ From 959cd716cad439ba4d8a9a013c8720a9d59e14bf Mon Sep 17 00:00:00 2001 From: sangeetashivaji Date: Tue, 11 Nov 2025 06:53:34 +0530 Subject: [PATCH 07/10] Update --- .../datadog_checks/clickhouse/clickhouse.py | 4 + .../clickhouse/statement_samples.py | 78 ++++++++++-- clickhouse/tests/test_statement_samples.py | 111 ++++++++++++++++++ 3 files changed, 185 insertions(+), 8 deletions(-) diff --git a/clickhouse/datadog_checks/clickhouse/clickhouse.py b/clickhouse/datadog_checks/clickhouse/clickhouse.py index d8f94de1b567b..f4760ca364646 100644 --- a/clickhouse/datadog_checks/clickhouse/clickhouse.py +++ b/clickhouse/datadog_checks/clickhouse/clickhouse.py @@ -98,6 +98,10 @@ def __init__(self, config_dict): self.run_sync = config_dict.get('run_sync', False) self.samples_per_hour_per_query = config_dict.get('samples_per_hour_per_query', 15) self.seen_samples_cache_maxsize = config_dict.get('seen_samples_cache_maxsize', 10000) + # Activity snapshot configuration + self.activity_enabled = config_dict.get('activity_enabled', True) + self.activity_collection_interval = config_dict.get('activity_collection_interval', 10) + self.activity_max_rows = config_dict.get('activity_max_rows', 1000) self.statement_samples = ClickhouseStatementSamples(self, QuerySamplesConfig(self._query_samples_config)) else: diff --git a/clickhouse/datadog_checks/clickhouse/statement_samples.py b/clickhouse/datadog_checks/clickhouse/statement_samples.py index 9f33c6d1d8c55..350eff37ccde7 100644 --- a/clickhouse/datadog_checks/clickhouse/statement_samples.py +++ b/clickhouse/datadog_checks/clickhouse/statement_samples.py @@ -41,7 +41,24 @@ initial_query_id, initial_user, query_kind, - is_initial_query + is_initial_query, + peak_memory_usage, + total_rows_approx, + result_rows, + result_bytes, + query_start_time, + query_start_time_microseconds, + client_name, + client_version_major, + 
client_version_minor, + client_version_patch, + current_database, + thread_ids, + address, + port, + client_hostname, + is_cancelled, + http_user_agent FROM system.processes WHERE query NOT LIKE '%system.processes%' AND query NOT LIKE '%system.query_log%' @@ -54,10 +71,11 @@ SELECT user, query_kind, + current_database, count(*) as connections FROM system.processes WHERE query NOT LIKE '%system.processes%' -GROUP BY user, query_kind +GROUP BY user, query_kind, current_database """ # Columns from system.processes which correspond to attributes common to all databases @@ -65,9 +83,10 @@ system_processes_sample_exclude_keys = { # we process & obfuscate this separately 'query', - # stored separately + # stored separately in standard db fields 'user', 'query_id', + 'current_database', # stored as db.instance } @@ -125,13 +144,13 @@ def __init__(self, check: ClickhouseCheck, config): self._activity_coll_interval = getattr(config, 'activity_collection_interval', 10) self._activity_max_rows = getattr(config, 'activity_max_rows', 1000) self._time_since_last_activity_event = 0 - + # Debug logging to verify config values self._check.log.info( - "Activity config: enabled=%s, interval=%s, max_rows=%s", + "Activity config: enabled=%s, interval=%s, max_rows=%s", self._activity_coll_enabled, self._activity_coll_interval, - self._activity_max_rows + self._activity_max_rows, ) def _dbtags(self, db, *extra_tags): @@ -219,9 +238,27 @@ def _normalize_active_query_row(self, row): initial_user, query_kind, is_initial_query, + peak_memory_usage, + total_rows_approx, + result_rows, + result_bytes, + query_start_time, + query_start_time_microseconds, + client_name, + client_version_major, + client_version_minor, + client_version_patch, + current_database, + thread_ids, + address, + port, + client_hostname, + is_cancelled, + http_user_agent, ) = row normalized_row = { + # Original fields 'elapsed': float(elapsed) if elapsed else 0, 'query_id': str(query_id), 'query': str(query), @@ -235,6 +272,26 @@ def _normalize_active_query_row(self, row): 'initial_user': str(initial_user) if initial_user else None, 'query_kind': str(query_kind) if query_kind else None, 'is_initial_query': bool(is_initial_query) if is_initial_query is not None else True, + # New fields + 'peak_memory_usage': int(peak_memory_usage) if peak_memory_usage else 0, + 'total_rows_approx': int(total_rows_approx) if total_rows_approx else 0, + 'result_rows': int(result_rows) if result_rows else 0, + 'result_bytes': int(result_bytes) if result_bytes else 0, + 'query_start_time': str(query_start_time) if query_start_time else None, + 'query_start_time_microseconds': str(query_start_time_microseconds) + if query_start_time_microseconds + else None, + 'client_name': str(client_name) if client_name else None, + 'client_version_major': int(client_version_major) if client_version_major else None, + 'client_version_minor': int(client_version_minor) if client_version_minor else None, + 'client_version_patch': int(client_version_patch) if client_version_patch else None, + 'current_database': str(current_database) if current_database else None, + 'thread_ids': list(thread_ids) if thread_ids else [], + 'address': str(address) if address else None, + 'port': int(port) if port else None, + 'client_hostname': str(client_hostname) if client_hostname else None, + 'is_cancelled': bool(is_cancelled) if is_cancelled is not None else False, + 'http_user_agent': str(http_user_agent) if http_user_agent else None, } return self._obfuscate_and_normalize_query(normalized_row) @@ 
-310,7 +367,8 @@ def _get_active_connections(self): { 'user': row[0], 'query_kind': row[1], - 'connections': row[2], + 'current_database': row[2], + 'connections': row[3], } ) @@ -469,7 +527,8 @@ def _create_sample_event(self, row): Format follows Postgres integration pattern This represents currently executing queries from system.processes """ - db = self._check._db + # Use current_database from the query if available, fallback to check's default db + db = row.get('current_database') or self._check._db event = { "host": self._check.reported_hostname, @@ -520,13 +579,16 @@ def run_job(self): # Get active queries for activity snapshot rows = self._get_active_queries() + self._log.info("DEBUG: Retrieved %s raw rows from system.processes", len(rows)) rows = self._filter_and_normalize_statement_rows(rows) + self._log.info("DEBUG: After filtering/normalization: %s rows", len(rows)) # Get active connections aggregation active_connections = self._get_active_connections() # Create and submit activity event activity_event = self._create_activity_event(rows, active_connections) + self._log.info("DEBUG: Activity event has %s sessions", len(activity_event.get('clickhouse_activity', []))) self._check.database_monitoring_query_activity( json.dumps(activity_event, default=default_json_event_encoding) ) diff --git a/clickhouse/tests/test_statement_samples.py b/clickhouse/tests/test_statement_samples.py index a80aeb56c2f8b..4d43e5fd5f514 100644 --- a/clickhouse/tests/test_statement_samples.py +++ b/clickhouse/tests/test_statement_samples.py @@ -213,10 +213,35 @@ def test_active_queries_query_format(): # Verify query contains necessary clauses assert 'system.processes' in ACTIVE_QUERIES_QUERY + + # Original fields assert 'query_id' in ACTIVE_QUERIES_QUERY assert 'query' in ACTIVE_QUERIES_QUERY assert 'elapsed' in ACTIVE_QUERIES_QUERY assert 'memory_usage' in ACTIVE_QUERIES_QUERY + assert 'read_rows' in ACTIVE_QUERIES_QUERY + assert 'read_bytes' in ACTIVE_QUERIES_QUERY + assert 'written_rows' in ACTIVE_QUERIES_QUERY + assert 'written_bytes' in ACTIVE_QUERIES_QUERY + + # New enhanced fields + assert 'peak_memory_usage' in ACTIVE_QUERIES_QUERY + assert 'total_rows_approx' in ACTIVE_QUERIES_QUERY + assert 'result_rows' in ACTIVE_QUERIES_QUERY + assert 'result_bytes' in ACTIVE_QUERIES_QUERY + assert 'query_start_time' in ACTIVE_QUERIES_QUERY + assert 'query_start_time_microseconds' in ACTIVE_QUERIES_QUERY + assert 'client_name' in ACTIVE_QUERIES_QUERY + assert 'client_version_major' in ACTIVE_QUERIES_QUERY + assert 'client_version_minor' in ACTIVE_QUERIES_QUERY + assert 'client_version_patch' in ACTIVE_QUERIES_QUERY + assert 'current_database' in ACTIVE_QUERIES_QUERY + assert 'thread_ids' in ACTIVE_QUERIES_QUERY + assert 'address' in ACTIVE_QUERIES_QUERY + assert 'port' in ACTIVE_QUERIES_QUERY + assert 'client_hostname' in ACTIVE_QUERIES_QUERY + assert 'is_cancelled' in ACTIVE_QUERIES_QUERY + assert 'http_user_agent' in ACTIVE_QUERIES_QUERY @mock.patch('datadog_checks.clickhouse.statement_samples.datadog_agent') @@ -259,3 +284,89 @@ def test_dbtags(check_with_dbm): assert 'extra:tag' in db_tags assert 'test:clickhouse' in db_tags assert 'server:localhost' in db_tags + + +def test_normalize_active_query_row_with_all_fields(check_with_dbm): + """Test that all new fields are properly normalized""" + statement_samples = check_with_dbm.statement_samples + + # Create a mock row with all fields (31 fields total) + mock_row = ( + 1.234, # elapsed + 'abc-123-def', # query_id + 'SELECT * FROM users WHERE id = 1', # query 
+ 'default', # user + 1000, # read_rows + 50000, # read_bytes + 0, # written_rows + 0, # written_bytes + 1048576, # memory_usage + 'abc-123-def', # initial_query_id + 'default', # initial_user + 'Select', # query_kind + 1, # is_initial_query + 2097152, # peak_memory_usage (NEW) + 1000000, # total_rows_approx (NEW) + 1000, # result_rows (NEW) + 45000, # result_bytes (NEW) + '2025-11-07 10:05:01', # query_start_time (NEW) + '2025-11-07 10:05:01.123456', # query_start_time_microseconds (NEW) + 'python-clickhouse-driver', # client_name (NEW) + 0, # client_version_major (NEW) + 2, # client_version_minor (NEW) + 4, # client_version_patch (NEW) + 'analytics', # current_database (NEW) + [123, 124, 125], # thread_ids (NEW) + '192.168.1.100', # address (NEW) + 54321, # port (NEW) + 'app-server-01', # client_hostname (NEW) + 0, # is_cancelled (NEW) + 'python-requests/2.28.0', # http_user_agent (NEW) + ) + + normalized_row = statement_samples._normalize_active_query_row(mock_row) + + # Verify original fields + assert normalized_row['elapsed'] == 1.234 + assert normalized_row['query_id'] == 'abc-123-def' + assert normalized_row['user'] == 'default' + assert normalized_row['read_rows'] == 1000 + assert normalized_row['read_bytes'] == 50000 + assert normalized_row['memory_usage'] == 1048576 + assert normalized_row['query_kind'] == 'Select' + assert normalized_row['is_initial_query'] is True + + # Verify new memory & performance fields + assert normalized_row['peak_memory_usage'] == 2097152 + assert normalized_row['total_rows_approx'] == 1000000 + assert normalized_row['result_rows'] == 1000 + assert normalized_row['result_bytes'] == 45000 + + # Verify new timing fields + assert normalized_row['query_start_time'] == '2025-11-07 10:05:01' + assert normalized_row['query_start_time_microseconds'] == '2025-11-07 10:05:01.123456' + + # Verify new client fields + assert normalized_row['client_name'] == 'python-clickhouse-driver' + assert normalized_row['client_version_major'] == 0 + assert normalized_row['client_version_minor'] == 2 + assert normalized_row['client_version_patch'] == 4 + assert normalized_row['client_hostname'] == 'app-server-01' + assert normalized_row['address'] == '192.168.1.100' + assert normalized_row['port'] == 54321 + + # Verify new database field + assert normalized_row['current_database'] == 'analytics' + + # Verify new thread fields + assert normalized_row['thread_ids'] == [123, 124, 125] + + # Verify new status fields + assert normalized_row['is_cancelled'] is False + + # Verify new HTTP field + assert normalized_row['http_user_agent'] == 'python-requests/2.28.0' + + # Verify obfuscation happened (statement should be set) + assert 'statement' in normalized_row + assert 'query_signature' in normalized_row From 943fd353b8472ac9a238f68611234185aef556bd Mon Sep 17 00:00:00 2001 From: sangeetashivaji Date: Sat, 15 Nov 2025 11:25:50 +0530 Subject: [PATCH 08/10] Update --- .../clickhouse/statement_samples.py | 16 ++-------------- .../datadog_checks/clickhouse/statements.py | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/clickhouse/datadog_checks/clickhouse/statement_samples.py b/clickhouse/datadog_checks/clickhouse/statement_samples.py index 350eff37ccde7..db1170c76772b 100644 --- a/clickhouse/datadog_checks/clickhouse/statement_samples.py +++ b/clickhouse/datadog_checks/clickhouse/statement_samples.py @@ -27,6 +27,8 @@ # Query to get currently running/active queries from system.processes # This is the ClickHouse equivalent of Postgres pg_stat_activity 
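+# (Illustrative debugging aid, not used by the check itself: running
+#  `SELECT query_id, elapsed, user, query FROM system.processes` in clickhouse-client
+#  shows the raw table this sampler filters, which helps when activity payloads come back empty.)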
+# Note: result_rows, result_bytes, query_start_time, query_start_time_microseconds +# don't exist in ClickHouse (as of 24.11), so they're excluded ACTIVE_QUERIES_QUERY = """ SELECT elapsed, @@ -44,10 +46,6 @@ is_initial_query, peak_memory_usage, total_rows_approx, - result_rows, - result_bytes, - query_start_time, - query_start_time_microseconds, client_name, client_version_major, client_version_minor, @@ -240,10 +238,6 @@ def _normalize_active_query_row(self, row): is_initial_query, peak_memory_usage, total_rows_approx, - result_rows, - result_bytes, - query_start_time, - query_start_time_microseconds, client_name, client_version_major, client_version_minor, @@ -275,12 +269,6 @@ def _normalize_active_query_row(self, row): # New fields 'peak_memory_usage': int(peak_memory_usage) if peak_memory_usage else 0, 'total_rows_approx': int(total_rows_approx) if total_rows_approx else 0, - 'result_rows': int(result_rows) if result_rows else 0, - 'result_bytes': int(result_bytes) if result_bytes else 0, - 'query_start_time': str(query_start_time) if query_start_time else None, - 'query_start_time_microseconds': str(query_start_time_microseconds) - if query_start_time_microseconds - else None, 'client_name': str(client_name) if client_name else None, 'client_version_major': int(client_version_major) if client_version_major else None, 'client_version_minor': int(client_version_minor) if client_version_minor else None, diff --git a/clickhouse/datadog_checks/clickhouse/statements.py b/clickhouse/datadog_checks/clickhouse/statements.py index 8ab22ef3492b3..b361e10425388 100644 --- a/clickhouse/datadog_checks/clickhouse/statements.py +++ b/clickhouse/datadog_checks/clickhouse/statements.py @@ -155,6 +155,7 @@ def collect_per_statement_metrics(self): # Prepare metrics payload wrapper payload_wrapper = { 'host': self._check.reported_hostname, + 'database_instance': self._check.database_identifier, 'timestamp': time.time() * 1000, 'min_collection_interval': self._metrics_collection_interval, 'tags': self._tags_no_db, @@ -166,6 +167,14 @@ def collect_per_statement_metrics(self): payloads = self._get_query_metrics_payloads(payload_wrapper, rows) for payload in payloads: + payload_data = json.loads(payload) + num_rows = len(payload_data.get('clickhouse_rows', [])) + self._log.info( + "Submitting query metrics payload: %d bytes, %d rows, database_instance=%s", + len(payload), + num_rows, + payload_data.get('database_instance', 'MISSING') + ) self._check.database_monitoring_query_metrics(payload) except Exception: @@ -325,7 +334,16 @@ def _collect_metrics_rows(self): } # Compute derivative rows (calculate deltas since last collection) + rows_before = len(rows) rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key, execution_indicators=['calls']) + rows_after = len(rows) + + self._log.info( + "Query metrics: loaded=%d rows, after_derivative=%d rows (filtered=%d)", + rows_before, + rows_after, + rows_before - rows_after + ) self._check.gauge( 'dd.clickhouse.queries.query_rows_raw', From c96290387144b6dc91372da22e0a39a62e98245a Mon Sep 17 00:00:00 2001 From: sangeetashivaji Date: Mon, 17 Nov 2025 23:34:20 +0530 Subject: [PATCH 09/10] Add new metadata payloads --- clickhouse/assets/configuration/spec.yaml | 79 +++++++++++++++ .../datadog_checks/clickhouse/clickhouse.py | 98 +++++++++++++++++++ .../clickhouse/config_models/defaults.py | 4 + .../clickhouse/config_models/instance.py | 31 ++++++ .../clickhouse/data/conf.yaml.example | 65 ++++++++++++ .../clickhouse/statement_samples.py | 4 +- 
.../datadog_checks/clickhouse/statements.py | 4 +- clickhouse/tests/test_clickhouse.py | 48 +++++++++ 8 files changed, 330 insertions(+), 3 deletions(-) diff --git a/clickhouse/assets/configuration/spec.yaml b/clickhouse/assets/configuration/spec.yaml index 8f9b8ec744da5..ef3687c83926b 100644 --- a/clickhouse/assets/configuration/spec.yaml +++ b/clickhouse/assets/configuration/spec.yaml @@ -138,6 +138,85 @@ files: value: type: number example: 1000 + - name: database_instance_collection_interval + hidden: true + description: | + Set the database instance collection interval (in seconds). The database instance collection sends + basic information about the database instance along with a signal that it still exists. + This collection does not involve any additional queries to the database. + value: + type: number + default: 300 + - name: aws + description: | + This block defines the configuration for AWS RDS and Aurora instances. + + Complete this section if you have installed the Datadog AWS Integration to enrich instances + with ClickHouse integration telemetry. + options: + - name: instance_endpoint + description: | + Equal to the Endpoint.Address of the instance the agent is connecting to. + This value is optional if the value of `server` is already configured to the instance endpoint. + + For more information on instance endpoints, + see the AWS docs https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_Endpoint.html + value: + type: string + example: mydb.cfxgae8cilcf.us-east-1.rds.amazonaws.com + - name: gcp + description: | + This block defines the configuration for Google Cloud SQL instances. + + Complete this section if you have installed the Datadog GCP Integration to enrich instances + with ClickHouse integration telemetry. + options: + - name: project_id + description: | + Equal to the GCP resource's project ID. + + For more information on project IDs, + see the GCP docs https://cloud.google.com/resource-manager/docs/creating-managing-projects + value: + type: string + example: foo-project + - name: instance_id + description: | + Equal to the GCP resource's instance ID. + + For more information on instance IDs, + see the GCP docs https://cloud.google.com/sql/docs/mysql/instance-settings#instance-id-2ndgen + value: + type: string + example: foo-database + - name: azure + description: | + This block defines the configuration for Azure Database for ClickHouse. + + Complete this section if you have installed the Datadog Azure Integration to enrich instances + with ClickHouse integration telemetry. + options: + - name: deployment_type + description: | + Equal to the deployment type for the managed database. + + For Azure, this is typically 'flexible_server' or 'single_server'. + value: + type: string + example: flexible_server + - name: fully_qualified_domain_name + description: | + Equal to the fully qualified domain name of the Azure database. + + This value is optional if the value of `server` is already configured to the fully qualified domain name. + value: + type: string + example: my-clickhouse.database.windows.net + - name: database_name + description: | + The database name for the Azure instance. 
+ value: + type: string - template: instances/db overrides: custom_queries.value.example: diff --git a/clickhouse/datadog_checks/clickhouse/clickhouse.py b/clickhouse/datadog_checks/clickhouse/clickhouse.py index f4760ca364646..cb4da5d7783bd 100644 --- a/clickhouse/datadog_checks/clickhouse/clickhouse.py +++ b/clickhouse/datadog_checks/clickhouse/clickhouse.py @@ -1,16 +1,27 @@ # (C) Datadog, Inc. 2019-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +import json +from time import time + import clickhouse_connect +from cachetools import TTLCache from datadog_checks.base import AgentCheck, ConfigurationError, is_affirmative from datadog_checks.base.utils.db import QueryManager +from datadog_checks.base.utils.db.utils import default_json_event_encoding from . import queries +from .__about__ import __version__ from .statement_samples import ClickhouseStatementSamples from .statements import ClickhouseStatementMetrics from .utils import ErrorSanitizer +try: + import datadog_agent +except ImportError: + from datadog_checks.base.stubs import datadog_agent + class ClickhouseCheck(AgentCheck): __NAMESPACE__ = 'clickhouse' @@ -35,6 +46,44 @@ def __init__(self, name, init_config, instances): # DBM-related properties self._resolved_hostname = None self._database_identifier = None + self._agent_hostname = None + + # Cloud metadata configuration + self._cloud_metadata = {} + + # AWS cloud metadata + aws_config = self.instance.get('aws', {}) + if aws_config.get('instance_endpoint'): + self._cloud_metadata['aws'] = {'instance_endpoint': aws_config['instance_endpoint']} + + # GCP cloud metadata + gcp_config = self.instance.get('gcp', {}) + if gcp_config.get('project_id') and gcp_config.get('instance_id'): + self._cloud_metadata['gcp'] = { + 'project_id': gcp_config['project_id'], + 'instance_id': gcp_config['instance_id'], + } + + # Azure cloud metadata + azure_config = self.instance.get('azure', {}) + if azure_config.get('deployment_type') and azure_config.get('fully_qualified_domain_name'): + self._cloud_metadata['azure'] = { + 'deployment_type': azure_config['deployment_type'], + 'fully_qualified_domain_name': azure_config['fully_qualified_domain_name'], + } + if azure_config.get('database_name'): + self._cloud_metadata['azure']['database_name'] = azure_config['database_name'] + + # Database instance metadata collection interval + self._database_instance_collection_interval = float( + self.instance.get('database_instance_collection_interval', 300) + ) + + # _database_instance_emitted: limit the collection and transmission of the database instance metadata + self._database_instance_emitted = TTLCache( + maxsize=1, + ttl=self._database_instance_collection_interval, + ) # Add global tags self._tags.append('server:{}'.format(self._server)) @@ -107,11 +156,53 @@ def __init__(self, config_dict): else: self.statement_samples = None + def _send_database_instance_metadata(self): + """Send database instance metadata to the metadata intake.""" + if self.database_identifier not in self._database_instance_emitted: + # Get the version for the metadata + version = None + try: + version_result = list(self.execute_query_raw('SELECT version()'))[0][0] + version = version_result + except Exception as e: + self.log.debug("Unable to fetch version for metadata: %s", e) + version = "unknown" + + event = { + "host": self.reported_hostname, + "port": self._port, + "database_instance": self.database_identifier, + "database_hostname": self.reported_hostname, + "agent_version": 
datadog_agent.get_version(), + "ddagenthostname": self.agent_hostname, + "dbms": "clickhouse", + "kind": "database_instance", + "collection_interval": self._database_instance_collection_interval, + "dbms_version": version, + "integration_version": __version__, + "tags": [t for t in self._tags if not t.startswith('db:')], + "timestamp": time() * 1000, + "metadata": { + "dbm": self._dbm_enabled, + "connection_host": self._server, + }, + } + + # Add cloud metadata if available + if self._cloud_metadata: + event["cloud_metadata"] = self._cloud_metadata + + self._database_instance_emitted[self.database_identifier] = event + self.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding)) + def check(self, _): self.connect() self._query_manager.execute() self.collect_version() + # Send database instance metadata + self._send_database_instance_metadata() + # Run query metrics collection if DBM is enabled (from system.query_log) if self.statement_metrics: self.statement_metrics.run_job_loop(self._tags) @@ -145,6 +236,13 @@ def reported_hostname(self): self._resolved_hostname = self._server return self._resolved_hostname + @property + def agent_hostname(self): + """Get the agent hostname.""" + if self._agent_hostname is None: + self._agent_hostname = datadog_agent.get_hostname() + return self._agent_hostname + @property def database_identifier(self): """ diff --git a/clickhouse/datadog_checks/clickhouse/config_models/defaults.py b/clickhouse/datadog_checks/clickhouse/config_models/defaults.py index 590aed6495569..787db6fbe2ca4 100644 --- a/clickhouse/datadog_checks/clickhouse/config_models/defaults.py +++ b/clickhouse/datadog_checks/clickhouse/config_models/defaults.py @@ -12,6 +12,10 @@ def instance_connect_timeout(): return 10 +def instance_database_instance_collection_interval(): + return 300 + + def instance_db(): return 'default' diff --git a/clickhouse/datadog_checks/clickhouse/config_models/instance.py b/clickhouse/datadog_checks/clickhouse/config_models/instance.py index 6a25ddd694be3..8ada723f83c23 100644 --- a/clickhouse/datadog_checks/clickhouse/config_models/instance.py +++ b/clickhouse/datadog_checks/clickhouse/config_models/instance.py @@ -20,6 +20,24 @@ from . 
import defaults, validators +class Aws(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + instance_endpoint: Optional[str] = None + + +class Azure(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + database_name: Optional[str] = None + deployment_type: Optional[str] = None + fully_qualified_domain_name: Optional[str] = None + + class CustomQuery(BaseModel): model_config = ConfigDict( arbitrary_types_allowed=True, @@ -32,6 +50,15 @@ class CustomQuery(BaseModel): tags: Optional[tuple[str, ...]] = None +class Gcp(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + instance_id: Optional[str] = None + project_id: Optional[str] = None + + class MetricPatterns(BaseModel): model_config = ConfigDict( arbitrary_types_allowed=True, @@ -62,13 +89,17 @@ class InstanceConfig(BaseModel): arbitrary_types_allowed=True, frozen=True, ) + aws: Optional[Aws] = None + azure: Optional[Azure] = None compression: Optional[str] = None connect_timeout: Optional[int] = None custom_queries: Optional[tuple[CustomQuery, ...]] = None + database_instance_collection_interval: Optional[float] = None db: Optional[str] = None dbm: Optional[bool] = None disable_generic_tags: Optional[bool] = None empty_default_hostname: Optional[bool] = None + gcp: Optional[Gcp] = None metric_patterns: Optional[MetricPatterns] = None min_collection_interval: Optional[float] = None only_custom_queries: Optional[bool] = None diff --git a/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example b/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example index 0f98da6559455..5d761f2724ff9 100644 --- a/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example +++ b/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example @@ -145,6 +145,71 @@ instances: # # activity_max_rows: 1000 + ## This block defines the configuration for AWS RDS and Aurora instances. + ## + ## Complete this section if you have installed the Datadog AWS Integration to enrich instances + ## with ClickHouse integration telemetry. + # + # aws: + + ## @param instance_endpoint - string - optional - default: mydb.cfxgae8cilcf.us-east-1.rds.amazonaws.com + ## Equal to the Endpoint.Address of the instance the agent is connecting to. + ## This value is optional if the value of `server` is already configured to the instance endpoint. + ## + ## For more information on instance endpoints, + ## see the AWS docs https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_Endpoint.html + # + # instance_endpoint: mydb.cfxgae8cilcf.us-east-1.rds.amazonaws.com + + ## This block defines the configuration for Google Cloud SQL instances. + ## + ## Complete this section if you have installed the Datadog GCP Integration to enrich instances + ## with ClickHouse integration telemetry. + # + # gcp: + + ## @param project_id - string - optional - default: foo-project + ## Equal to the GCP resource's project ID. + ## + ## For more information on project IDs, + ## see the GCP docs https://cloud.google.com/resource-manager/docs/creating-managing-projects + # + # project_id: foo-project + + ## @param instance_id - string - optional - default: foo-database + ## Equal to the GCP resource's instance ID. + ## + ## For more information on instance IDs, + ## see the GCP docs https://cloud.google.com/sql/docs/mysql/instance-settings#instance-id-2ndgen + # + # instance_id: foo-database + + ## This block defines the configuration for Azure Database for ClickHouse. 
+ ## + ## Complete this section if you have installed the Datadog Azure Integration to enrich instances + ## with ClickHouse integration telemetry. + # + # azure: + + ## @param deployment_type - string - optional - default: flexible_server + ## Equal to the deployment type for the managed database. + ## + ## For Azure, this is typically 'flexible_server' or 'single_server'. + # + # deployment_type: flexible_server + + ## @param fully_qualified_domain_name - string - optional - default: my-clickhouse.database.windows.net + ## Equal to the fully qualified domain name of the Azure database. + ## + ## This value is optional if the value of `server` is already configured to the fully qualified domain name. + # + # fully_qualified_domain_name: my-clickhouse.database.windows.net + + ## @param database_name - string - optional + ## The database name for the Azure instance. + # + # database_name: + ## @param only_custom_queries - boolean - optional - default: false ## Set this parameter to `true` if you want to skip the integration's default metrics collection. ## Only metrics specified in `custom_queries` will be collected. diff --git a/clickhouse/datadog_checks/clickhouse/statement_samples.py b/clickhouse/datadog_checks/clickhouse/statement_samples.py index db1170c76772b..4f7d52ee9af90 100644 --- a/clickhouse/datadog_checks/clickhouse/statement_samples.py +++ b/clickhouse/datadog_checks/clickhouse/statement_samples.py @@ -576,7 +576,9 @@ def run_job(self): # Create and submit activity event activity_event = self._create_activity_event(rows, active_connections) - self._log.info("DEBUG: Activity event has %s sessions", len(activity_event.get('clickhouse_activity', []))) + self._log.info( + "DEBUG: Activity event has %s sessions", len(activity_event.get('clickhouse_activity', [])) + ) self._check.database_monitoring_query_activity( json.dumps(activity_event, default=default_json_event_encoding) ) diff --git a/clickhouse/datadog_checks/clickhouse/statements.py b/clickhouse/datadog_checks/clickhouse/statements.py index b361e10425388..786f2490e48c8 100644 --- a/clickhouse/datadog_checks/clickhouse/statements.py +++ b/clickhouse/datadog_checks/clickhouse/statements.py @@ -173,7 +173,7 @@ def collect_per_statement_metrics(self): "Submitting query metrics payload: %d bytes, %d rows, database_instance=%s", len(payload), num_rows, - payload_data.get('database_instance', 'MISSING') + payload_data.get('database_instance', 'MISSING'), ) self._check.database_monitoring_query_metrics(payload) @@ -342,7 +342,7 @@ def _collect_metrics_rows(self): "Query metrics: loaded=%d rows, after_derivative=%d rows (filtered=%d)", rows_before, rows_after, - rows_before - rows_after + rows_before - rows_after, ) self._check.gauge( diff --git a/clickhouse/tests/test_clickhouse.py b/clickhouse/tests/test_clickhouse.py index 69dfbf2d01a06..6aa234751141e 100644 --- a/clickhouse/tests/test_clickhouse.py +++ b/clickhouse/tests/test_clickhouse.py @@ -73,3 +73,51 @@ def test_version_metadata(instance, datadog_agent, dd_run_check): datadog_agent.assert_metadata( 'test:123', {'version.scheme': 'calver', 'version.year': CLICKHOUSE_VERSION.split(".")[0]} ) + + +def test_database_instance_metadata(aggregator, instance, datadog_agent, dd_run_check): + """Test that database_instance metadata is sent correctly.""" + check = ClickhouseCheck('clickhouse', {}, [instance]) + check.check_id = 'test:456' + dd_run_check(check) + + # Get database monitoring metadata events + dbm_metadata = aggregator.get_event_platform_events("dbm-metadata") + + # 
Find the database_instance event + event = next((e for e in dbm_metadata if e['kind'] == 'database_instance'), None) + + assert event is not None, "database_instance metadata event should be sent" + assert event['dbms'] == 'clickhouse' + assert event['kind'] == 'database_instance' + assert event['database_instance'] == check.database_identifier + assert event['collection_interval'] == 300 + assert 'metadata' in event + assert 'dbm' in event['metadata'] + assert 'connection_host' in event['metadata'] + assert event['metadata']['connection_host'] == instance['server'] + + +def test_database_instance_metadata_with_cloud_metadata(aggregator, instance, datadog_agent, dd_run_check): + """Test that database_instance metadata includes cloud metadata when configured.""" + instance = instance.copy() + instance['aws'] = {'instance_endpoint': 'my-clickhouse.us-east-1.rds.amazonaws.com'} + instance['gcp'] = {'project_id': 'my-project', 'instance_id': 'my-instance'} + + check = ClickhouseCheck('clickhouse', {}, [instance]) + check.check_id = 'test:789' + dd_run_check(check) + + # Get database monitoring metadata events + dbm_metadata = aggregator.get_event_platform_events("dbm-metadata") + + # Find the database_instance event + event = next((e for e in dbm_metadata if e['kind'] == 'database_instance'), None) + + assert event is not None + assert 'cloud_metadata' in event + assert 'aws' in event['cloud_metadata'] + assert event['cloud_metadata']['aws']['instance_endpoint'] == 'my-clickhouse.us-east-1.rds.amazonaws.com' + assert 'gcp' in event['cloud_metadata'] + assert event['cloud_metadata']['gcp']['project_id'] == 'my-project' + assert event['cloud_metadata']['gcp']['instance_id'] == 'my-instance' From 3d763127d05ce7154c97c8a1b69d92f480f73fbb Mon Sep 17 00:00:00 2001 From: sangeetashivaji Date: Thu, 20 Nov 2025 22:19:33 +0530 Subject: [PATCH 10/10] Fix avg & count metrics --- .../datadog_checks/clickhouse/statements.py | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/clickhouse/datadog_checks/clickhouse/statements.py b/clickhouse/datadog_checks/clickhouse/statements.py index 786f2490e48c8..5c3a73d1e011d 100644 --- a/clickhouse/datadog_checks/clickhouse/statements.py +++ b/clickhouse/datadog_checks/clickhouse/statements.py @@ -26,6 +26,9 @@ # Query to fetch aggregated metrics from system.query_log # This is the ClickHouse equivalent of Postgres pg_stat_statements +# Note: We collect count() and sum() metrics which are treated as cumulative counters +# and then compute derivatives. We no longer collect avg/min/max/percentile as they +# don't make sense after derivative calculation (mean_time is computed from total_time/count). 
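+# Worked example with illustrative numbers: if a query signature reported count=100 and
+# total_duration_ms=500 on the previous run and count=160 / total_duration_ms=800 now, the
+# derivative row carries count=60 and total_time=300, so mean_time = 300 / 60 = 5 ms per
+# execution for that interval. Re-averaging a stored avg_duration_ms column cannot produce
+# this per-interval value, which is why the avg/min/max/percentile columns were dropped.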
STATEMENTS_QUERY = """ SELECT normalized_query_hash, @@ -36,10 +39,6 @@ any(tables) as tables, count() as execution_count, sum(query_duration_ms) as total_duration_ms, - avg(query_duration_ms) as avg_duration_ms, - min(query_duration_ms) as min_duration_ms, - max(query_duration_ms) as max_duration_ms, - quantile(0.95)(query_duration_ms) as p95_duration_ms, sum(read_rows) as total_read_rows, sum(read_bytes) as total_read_bytes, sum(written_rows) as total_written_rows, @@ -248,10 +247,6 @@ def _load_query_log_statements(self): tables, execution_count, total_duration_ms, - avg_duration_ms, - min_duration_ms, - max_duration_ms, - p95_duration_ms, total_read_rows, total_read_bytes, total_written_rows, @@ -270,12 +265,11 @@ def _load_query_log_statements(self): 'query_type': str(query_type) if query_type else '', 'databases': str(databases[0]) if databases and len(databases) > 0 else '', 'tables': tables if tables else [], - 'calls': int(execution_count) if execution_count else 0, + 'count': int(execution_count) if execution_count else 0, 'total_time': float(total_duration_ms) if total_duration_ms else 0.0, - 'mean_time': float(avg_duration_ms) if avg_duration_ms else 0.0, - 'min_time': float(min_duration_ms) if min_duration_ms else 0.0, - 'max_time': float(max_duration_ms) if max_duration_ms else 0.0, - 'p95_time': float(p95_duration_ms) if p95_duration_ms else 0.0, + # Note: mean_time will be calculated after derivative calculation as total_time / count + # min_time, max_time, p95_time are not included because they are aggregates that + # don't make sense after taking derivatives 'rows': int(total_result_rows) if total_result_rows else 0, 'read_rows': int(total_read_rows) if total_read_rows else 0, 'read_bytes': int(total_read_bytes) if total_read_bytes else 0, @@ -315,14 +309,13 @@ def _collect_metrics_rows(self): return [] # Get available metric columns + # Note: We only include counter metrics (count, totals) in derivative calculation. + # Aggregated metrics like mean_time, min_time, max_time, p95_time are excluded + # because taking derivatives of averages/percentiles is mathematically incorrect. available_columns = set(rows[0].keys()) metric_columns = available_columns & { - 'calls', + 'count', 'total_time', - 'mean_time', - 'min_time', - 'max_time', - 'p95_time', 'rows', 'read_rows', 'read_bytes', @@ -335,9 +328,17 @@ def _collect_metrics_rows(self): # Compute derivative rows (calculate deltas since last collection) rows_before = len(rows) - rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key, execution_indicators=['calls']) + rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key, execution_indicators=['count']) rows_after = len(rows) + # Calculate mean_time from derivative values (total_time / count) + # This follows the same pattern as Postgres, MySQL, and SQL Server + for row in rows: + if row.get('count', 0) > 0: + row['mean_time'] = row.get('total_time', 0.0) / row['count'] + else: + row['mean_time'] = 0.0 + self._log.info( "Query metrics: loaded=%d rows, after_derivative=%d rows (filtered=%d)", rows_before,