diff --git a/clickhouse/README.md b/clickhouse/README.md index d5904502c12f8..f3ea2e0945a13 100644 --- a/clickhouse/README.md +++ b/clickhouse/README.md @@ -1,114 +1,135 @@ -# Agent Check: ClickHouse +# ClickHouse Integration ## Overview -This check monitors [ClickHouse][1] through the Datadog Agent. +The ClickHouse integration provides health and performance metrics for your ClickHouse database in near real-time. Visualize these metrics with the provided dashboard and create monitors to alert your team on ClickHouse states. -**Minimum Agent version:** 7.16.0 +Enable Database Monitoring (DBM) for enhanced insights into query performance and database health. In addition to the standard integration, Datadog DBM provides query-level metrics, live and historical query snapshots, and query explain plans. -## Setup +**Minimum Agent version:** 7.50.0 -Follow the instructions below to install and configure this check for an Agent running on a host. For containerized environments, see the [Autodiscovery Integration Templates][2] for guidance on applying these instructions. +## Setup ### Installation -The ClickHouse check is included in the [Datadog Agent][3] package. No additional installation is needed on your server. +The ClickHouse check is packaged with the Agent. To start gathering your ClickHouse metrics and logs, [install the Agent](https://docs.datadoghq.com/agent/). ### Configuration - - +#### Prepare ClickHouse -#### Host +To get started with the ClickHouse integration, create a `datadog` user with proper access to your ClickHouse server. -To configure this check for an Agent running on a host: +```sql +CREATE USER datadog IDENTIFIED BY ''; +GRANT SELECT ON system.* TO datadog; +GRANT SELECT ON information_schema.* TO datadog; +GRANT SHOW DATABASES ON *.* TO datadog; +GRANT SHOW TABLES ON *.* TO datadog; +GRANT SHOW COLUMNS ON *.* TO datadog; +``` -#### Metric collection +#### Configure the Agent -1. To start collecting your ClickHouse performance data, edit the `clickhouse.d/conf.yaml` file in the `conf.d/` folder at the root of your Agent's configuration directory. See the [sample clickhouse.d/conf.yaml][4] for all available configuration options. +Edit the `clickhouse.d/conf.yaml` file, in the `conf.d/` folder at the root of your Agent's configuration directory to start collecting your ClickHouse performance data. See the [sample clickhouse.d/conf.yaml](https://github.com/DataDog/integrations-core/blob/master/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example) for all available configuration options. -*Note*: This integration uses the official `clickhouse-connect` client to connect over HTTP. +```yaml +init_config: -2. [Restart the Agent][5]. +instances: + - server: localhost + port: 8123 + username: datadog + password: -##### Log collection + # Enable Database Monitoring + dbm: true -1. Collecting logs is disabled by default in the Datadog Agent, enable it in your `datadog.yaml` file: + # Query Metrics Configuration + query_metrics: + enabled: true + collection_interval: 60 - ```yaml - logs_enabled: true - ``` + # Query Samples Configuration + query_samples: + enabled: true + collection_interval: 10 -2. 
Add the log files you are interested in to your `clickhouse.d/conf.yaml` file to start collecting your ClickHouse logs:
+      # Activity snapshot configuration
+      activity_enabled: true
+      activity_collection_interval: 10
+      activity_max_rows: 1000
+```
-   ```yaml
-   logs:
-     - type: file
-       path: /var/log/clickhouse-server/clickhouse-server.log
-       source: clickhouse
-       service: "<SERVICE>"
-   ```
+#### Enable query_log
-   Change the `path` and `service` parameter values and configure them for your environment. See the [sample clickhouse.d/conf.yaml][4] for all available configuration options.
+For Database Monitoring features, you need to enable ClickHouse's `query_log`. Add this to your ClickHouse server configuration:
-3. [Restart the Agent][5].
+```xml
+<clickhouse>
+    <query_log>
+        <database>system</database>
+        <table>query_log</table>
+        <flush_interval_milliseconds>7500</flush_interval_milliseconds>
+    </query_log>
+</clickhouse>
+``` - - +[Restart the Agent](https://docs.datadoghq.com/agent/guide/agent-commands/#start-stop-and-restart-the-agent) to start sending ClickHouse metrics to Datadog. -#### Containerized +### Validation -For containerized environments, see the [Autodiscovery Integration Templates][2] for guidance on applying the parameters below. +[Run the Agent's status subcommand](https://docs.datadoghq.com/agent/guide/agent-commands/#agent-status-and-information) and look for `clickhouse` under the Checks section. -#### Metric collection +## Data Collected -| Parameter | Value | -|----------------------|------------------------------------------------------------| -| `` | `clickhouse` | -| `` | blank or `{}` | -| `` | `{"server": "%%host%%", "port": "%%port%%", "username": "", "password": ""}` | +### Metrics -##### Log collection +The ClickHouse integration collects a wide range of metrics from ClickHouse system tables. See [metadata.csv](https://github.com/DataDog/integrations-core/blob/master/clickhouse/metadata.csv) for a list of metrics provided by this integration. -Collecting logs is disabled by default in the Datadog Agent. To enable it, see [Kubernetes log collection][6]. +### Database Monitoring -| Parameter | Value | -|----------------|-------------------------------------------| -| `` | `{"source": "clickhouse", "service": ""}` | +When Database Monitoring is enabled, the integration collects: - - +- **Query Metrics**: Aggregated query performance metrics from `system.query_log` +- **Query Samples**: Execution plans for currently running queries from `system.processes` +- **Activity Snapshots**: Real-time view of active sessions and connections -### Validation +### Events -[Run the Agent's status subcommand][7] and look for `clickhouse` under the **Checks** section. +The ClickHouse check does not include any events. -## Data Collected +### Service Checks -### Metrics +**clickhouse.can_connect**: +Returns `CRITICAL` if the Agent cannot connect to ClickHouse, otherwise returns `OK`. -See [metadata.csv][8] for a list of metrics provided by this integration. +## Troubleshooting -### Events +### Connection Issues -The ClickHouse check does not include any events. +If you encounter connection errors: -### Service Checks +1. Verify ClickHouse is running and accessible on the configured host and port +2. Use port `8123` (HTTP interface) for the agent connection +3. Ensure the `datadog` user has the required permissions +4. Check firewall rules allow connections from the Agent -See [service_checks.json][9] for a list of service checks provided by this integration. +### Database Monitoring Not Collecting Data -## Troubleshooting +If DBM features are not working: + +1. Verify `dbm: true` is set in the configuration +2. Ensure `query_log` is enabled in ClickHouse server configuration +3. Check that the `datadog` user has SELECT permissions on `system.query_log` and `system.processes` +4. Review Agent logs for any errors + +For more troubleshooting help, contact [Datadog support](https://docs.datadoghq.com/help/). + +## Further Reading -Need help? Contact [Datadog support][10]. 
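If the steps above do not isolate the problem, one way to reproduce the integration's connection outside the Agent is with the same `clickhouse-connect` client the check uses. This is a minimal sketch; the host, port, and credentials are placeholders for your environment:

```python
import clickhouse_connect

# Connect the way the Agent does: over the HTTP interface (port 8123 by default).
client = clickhouse_connect.get_client(
    host="localhost", port=8123, username="datadog", password="<PASSWORD>"
)

# Basic connectivity check.
print(client.query("SELECT version()").result_rows)

# The system tables the integration and DBM features read from. Errors here
# usually point to missing grants or to query_log not being enabled.
print(client.query("SELECT count() FROM system.processes").result_rows)
print(client.query("SELECT count() FROM system.query_log").result_rows)
```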
+Additional helpful documentation, links, and articles: +- [Monitor ClickHouse with Datadog](https://www.datadoghq.com/blog/monitor-clickhouse/) +- [Database Monitoring](https://docs.datadoghq.com/database_monitoring/) -[1]: https://clickhouse.yandex -[2]: https://docs.datadoghq.com/agent/kubernetes/integrations/ -[3]: /account/settings/agent/latest -[4]: https://github.com/DataDog/integrations-core/blob/master/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example -[5]: https://docs.datadoghq.com/agent/guide/agent-commands/#start-stop-and-restart-the-agent -[6]: https://docs.datadoghq.com/agent/kubernetes/log/ -[7]: https://docs.datadoghq.com/agent/guide/agent-commands/#agent-status-and-information -[8]: https://github.com/DataDog/integrations-core/blob/master/clickhouse/metadata.csv -[9]: https://github.com/DataDog/integrations-core/blob/master/clickhouse/assets/service_checks.json -[10]: https://docs.datadoghq.com/help/ diff --git a/clickhouse/assets/configuration/spec.yaml b/clickhouse/assets/configuration/spec.yaml index 224d53ed9f9aa..ef3687c83926b 100644 --- a/clickhouse/assets/configuration/spec.yaml +++ b/clickhouse/assets/configuration/spec.yaml @@ -14,7 +14,6 @@ files: - template: instances options: - name: server - required: true description: The hostname used to connect to the system. value: type: string @@ -51,7 +50,7 @@ files: description: | The compression algorithm to use. The default is no compression. If br is specified, the brotli library must be installed separately. - + Valid values are: - lz4 - zstd @@ -73,6 +72,151 @@ files: value: type: boolean example: True + - name: dbm + description: | + Enable Database Monitoring (DBM) to collect query samples and execution plans. + This feature provides deep observability into query performance. + value: + type: boolean + example: false + - name: query_samples + description: | + Configuration for collecting query samples when Database Monitoring (DBM) is enabled. + Query samples provide insights into the queries being executed on your ClickHouse instance. + options: + - name: enabled + description: Enable collection of query samples. + value: + type: boolean + example: true + - name: collection_interval + description: | + The interval in seconds between query sample collections. + Lower values provide more granular data but increase overhead. + value: + type: number + example: 10 + - name: samples_per_hour_per_query + description: | + The maximum number of samples to collect per unique query signature per hour. + This helps limit the volume of data collected while still providing useful insights. + value: + type: number + example: 15 + - name: seen_samples_cache_maxsize + description: | + The maximum size of the cache used to track which query samples have been collected. + A larger cache can help avoid collecting duplicate samples. + value: + type: number + example: 10000 + - name: run_sync + description: | + Whether to run query sample collection synchronously in the check run. + Set to false (default) to run asynchronously in a separate thread. + value: + type: boolean + example: false + - name: activity_enabled + description: | + Enable collection of database activity snapshots. + Activity snapshots capture currently executing queries and active connections. + value: + type: boolean + example: true + - name: activity_collection_interval + description: | + The interval in seconds between activity snapshot collections. + Lower values capture more activity data but increase overhead. 
+ For fast ClickHouse queries, consider using 1-5 seconds. + value: + type: number + example: 10 + - name: activity_max_rows + description: | + The maximum number of active sessions to include in each activity snapshot. + value: + type: number + example: 1000 + - name: database_instance_collection_interval + hidden: true + description: | + Set the database instance collection interval (in seconds). The database instance collection sends + basic information about the database instance along with a signal that it still exists. + This collection does not involve any additional queries to the database. + value: + type: number + default: 300 + - name: aws + description: | + This block defines the configuration for AWS RDS and Aurora instances. + + Complete this section if you have installed the Datadog AWS Integration to enrich instances + with ClickHouse integration telemetry. + options: + - name: instance_endpoint + description: | + Equal to the Endpoint.Address of the instance the agent is connecting to. + This value is optional if the value of `server` is already configured to the instance endpoint. + + For more information on instance endpoints, + see the AWS docs https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_Endpoint.html + value: + type: string + example: mydb.cfxgae8cilcf.us-east-1.rds.amazonaws.com + - name: gcp + description: | + This block defines the configuration for Google Cloud SQL instances. + + Complete this section if you have installed the Datadog GCP Integration to enrich instances + with ClickHouse integration telemetry. + options: + - name: project_id + description: | + Equal to the GCP resource's project ID. + + For more information on project IDs, + see the GCP docs https://cloud.google.com/resource-manager/docs/creating-managing-projects + value: + type: string + example: foo-project + - name: instance_id + description: | + Equal to the GCP resource's instance ID. + + For more information on instance IDs, + see the GCP docs https://cloud.google.com/sql/docs/mysql/instance-settings#instance-id-2ndgen + value: + type: string + example: foo-database + - name: azure + description: | + This block defines the configuration for Azure Database for ClickHouse. + + Complete this section if you have installed the Datadog Azure Integration to enrich instances + with ClickHouse integration telemetry. + options: + - name: deployment_type + description: | + Equal to the deployment type for the managed database. + + For Azure, this is typically 'flexible_server' or 'single_server'. + value: + type: string + example: flexible_server + - name: fully_qualified_domain_name + description: | + Equal to the fully qualified domain name of the Azure database. + + This value is optional if the value of `server` is already configured to the fully qualified domain name. + value: + type: string + example: my-clickhouse.database.windows.net + - name: database_name + description: | + The database name for the Azure instance. 
+ value: + type: string - template: instances/db overrides: custom_queries.value.example: diff --git a/clickhouse/changelog.d/21773.added b/clickhouse/changelog.d/21773.added new file mode 100644 index 0000000000000..5aa2d9b6ea2b5 --- /dev/null +++ b/clickhouse/changelog.d/21773.added @@ -0,0 +1 @@ +Add Database Monitoring (DBM) support with query sample collection from system.query_log \ No newline at end of file diff --git a/clickhouse/datadog_checks/clickhouse/clickhouse.py b/clickhouse/datadog_checks/clickhouse/clickhouse.py index e263da55fb8f4..cb4da5d7783bd 100644 --- a/clickhouse/datadog_checks/clickhouse/clickhouse.py +++ b/clickhouse/datadog_checks/clickhouse/clickhouse.py @@ -1,14 +1,27 @@ # (C) Datadog, Inc. 2019-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +import json +from time import time + import clickhouse_connect +from cachetools import TTLCache from datadog_checks.base import AgentCheck, ConfigurationError, is_affirmative from datadog_checks.base.utils.db import QueryManager +from datadog_checks.base.utils.db.utils import default_json_event_encoding from . import queries +from .__about__ import __version__ +from .statement_samples import ClickhouseStatementSamples +from .statements import ClickhouseStatementMetrics from .utils import ErrorSanitizer +try: + import datadog_agent +except ImportError: + from datadog_checks.base.stubs import datadog_agent + class ClickhouseCheck(AgentCheck): __NAMESPACE__ = 'clickhouse' @@ -30,6 +43,48 @@ def __init__(self, name, init_config, instances): self._verify = self.instance.get('verify', True) self._tags = self.instance.get('tags', []) + # DBM-related properties + self._resolved_hostname = None + self._database_identifier = None + self._agent_hostname = None + + # Cloud metadata configuration + self._cloud_metadata = {} + + # AWS cloud metadata + aws_config = self.instance.get('aws', {}) + if aws_config.get('instance_endpoint'): + self._cloud_metadata['aws'] = {'instance_endpoint': aws_config['instance_endpoint']} + + # GCP cloud metadata + gcp_config = self.instance.get('gcp', {}) + if gcp_config.get('project_id') and gcp_config.get('instance_id'): + self._cloud_metadata['gcp'] = { + 'project_id': gcp_config['project_id'], + 'instance_id': gcp_config['instance_id'], + } + + # Azure cloud metadata + azure_config = self.instance.get('azure', {}) + if azure_config.get('deployment_type') and azure_config.get('fully_qualified_domain_name'): + self._cloud_metadata['azure'] = { + 'deployment_type': azure_config['deployment_type'], + 'fully_qualified_domain_name': azure_config['fully_qualified_domain_name'], + } + if azure_config.get('database_name'): + self._cloud_metadata['azure']['database_name'] = azure_config['database_name'] + + # Database instance metadata collection interval + self._database_instance_collection_interval = float( + self.instance.get('database_instance_collection_interval', 300) + ) + + # _database_instance_emitted: limit the collection and transmission of the database instance metadata + self._database_instance_emitted = TTLCache( + maxsize=1, + ttl=self._database_instance_collection_interval, + ) + # Add global tags self._tags.append('server:{}'.format(self._server)) self._tags.append('port:{}'.format(self._port)) @@ -58,11 +113,104 @@ def __init__(self, name, init_config, instances): ) self.check_initializations.append(self._query_manager.compile_queries) + # Initialize DBM components if enabled + self._dbm_enabled = is_affirmative(self.instance.get('dbm', False)) + + # 
Initialize query metrics (from system.query_log - analogous to pg_stat_statements) + self._query_metrics_config = self.instance.get('query_metrics', {}) + if self._dbm_enabled and self._query_metrics_config.get('enabled', True): + # Create a simple config object for query metrics + class QueryMetricsConfig: + def __init__(self, config_dict): + self.enabled = config_dict.get('enabled', True) + self.collection_interval = config_dict.get('collection_interval', 60) + self.run_sync = config_dict.get('run_sync', False) + self.full_statement_text_cache_max_size = config_dict.get( + 'full_statement_text_cache_max_size', 10000 + ) + self.full_statement_text_samples_per_hour_per_query = config_dict.get( + 'full_statement_text_samples_per_hour_per_query', 1 + ) + + self.statement_metrics = ClickhouseStatementMetrics(self, QueryMetricsConfig(self._query_metrics_config)) + else: + self.statement_metrics = None + + # Initialize query samples (from system.processes - analogous to pg_stat_activity) + self._query_samples_config = self.instance.get('query_samples', {}) + if self._dbm_enabled and self._query_samples_config.get('enabled', True): + # Create a simple config object for statement samples + class QuerySamplesConfig: + def __init__(self, config_dict): + self.enabled = config_dict.get('enabled', True) + self.collection_interval = config_dict.get('collection_interval', 10) + self.run_sync = config_dict.get('run_sync', False) + self.samples_per_hour_per_query = config_dict.get('samples_per_hour_per_query', 15) + self.seen_samples_cache_maxsize = config_dict.get('seen_samples_cache_maxsize', 10000) + # Activity snapshot configuration + self.activity_enabled = config_dict.get('activity_enabled', True) + self.activity_collection_interval = config_dict.get('activity_collection_interval', 10) + self.activity_max_rows = config_dict.get('activity_max_rows', 1000) + + self.statement_samples = ClickhouseStatementSamples(self, QuerySamplesConfig(self._query_samples_config)) + else: + self.statement_samples = None + + def _send_database_instance_metadata(self): + """Send database instance metadata to the metadata intake.""" + if self.database_identifier not in self._database_instance_emitted: + # Get the version for the metadata + version = None + try: + version_result = list(self.execute_query_raw('SELECT version()'))[0][0] + version = version_result + except Exception as e: + self.log.debug("Unable to fetch version for metadata: %s", e) + version = "unknown" + + event = { + "host": self.reported_hostname, + "port": self._port, + "database_instance": self.database_identifier, + "database_hostname": self.reported_hostname, + "agent_version": datadog_agent.get_version(), + "ddagenthostname": self.agent_hostname, + "dbms": "clickhouse", + "kind": "database_instance", + "collection_interval": self._database_instance_collection_interval, + "dbms_version": version, + "integration_version": __version__, + "tags": [t for t in self._tags if not t.startswith('db:')], + "timestamp": time() * 1000, + "metadata": { + "dbm": self._dbm_enabled, + "connection_host": self._server, + }, + } + + # Add cloud metadata if available + if self._cloud_metadata: + event["cloud_metadata"] = self._cloud_metadata + + self._database_instance_emitted[self.database_identifier] = event + self.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding)) + def check(self, _): self.connect() self._query_manager.execute() self.collect_version() + # Send database instance metadata + self._send_database_instance_metadata() + 
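        # Note: run_job_loop() only schedules/pings the DBM jobs. Unless run_sync is
        # set, the actual collection happens on background threads managed by
        # DBMAsyncJob, each rate-limited by its own collection_interval, so the main
        # check run is not blocked by query metrics or sample collection.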
+ # Run query metrics collection if DBM is enabled (from system.query_log) + if self.statement_metrics: + self.statement_metrics.run_job_loop(self._tags) + + # Run statement samples if DBM is enabled (from system.processes) + if self.statement_samples: + self.statement_samples.run_job_loop(self._tags) + @AgentCheck.metadata_entrypoint def collect_version(self): version = list(self.execute_query_raw('SELECT version()'))[0][0] @@ -75,6 +223,36 @@ def collect_version(self): def execute_query_raw(self, query): return self._client.query(query).result_rows + def _get_debug_tags(self): + """Return debug tags for metrics""" + return ['server:{}'.format(self._server)] + + @property + def reported_hostname(self): + """ + Get the hostname to be reported in metrics and events. + """ + if self._resolved_hostname is None: + self._resolved_hostname = self._server + return self._resolved_hostname + + @property + def agent_hostname(self): + """Get the agent hostname.""" + if self._agent_hostname is None: + self._agent_hostname = datadog_agent.get_hostname() + return self._agent_hostname + + @property + def database_identifier(self): + """ + Get a unique identifier for this database instance. + """ + if self._database_identifier is None: + # Create a unique identifier based on server, port, and database name + self._database_identifier = "{}:{}:{}".format(self._server, self._port, self._db) + return self._database_identifier + def validate_config(self): if not self._server: raise ConfigurationError('the `server` setting is required') @@ -132,3 +310,32 @@ def connect(self): else: self.service_check(self.SERVICE_CHECK_CONNECT, self.OK, tags=self._tags) self._client = client + + def create_dbm_client(self): + """ + Create a separate ClickHouse client for DBM async jobs. + This prevents concurrent query errors when multiple jobs run simultaneously. + """ + try: + client = clickhouse_connect.get_client( + host=self._server, + port=self._port, + username=self._user, + password=self._password, + database=self._db, + secure=self._tls_verify, + connect_timeout=self._connect_timeout, + send_receive_timeout=self._read_timeout, + client_name=f'datadog-dbm-{self.check_id}', + compress=self._compression, + ca_cert=self._tls_ca_cert, + verify=self._verify, + settings={}, + ) + return client + except Exception as e: + error = 'Unable to create DBM client: {}'.format( + self._error_sanitizer.clean(self._error_sanitizer.scrub(str(e))) + ) + self.log.warning(error) + raise diff --git a/clickhouse/datadog_checks/clickhouse/config_models/defaults.py b/clickhouse/datadog_checks/clickhouse/config_models/defaults.py index cda1a5793bb1c..787db6fbe2ca4 100644 --- a/clickhouse/datadog_checks/clickhouse/config_models/defaults.py +++ b/clickhouse/datadog_checks/clickhouse/config_models/defaults.py @@ -12,10 +12,18 @@ def instance_connect_timeout(): return 10 +def instance_database_instance_collection_interval(): + return 300 + + def instance_db(): return 'default' +def instance_dbm(): + return False + + def instance_disable_generic_tags(): return False diff --git a/clickhouse/datadog_checks/clickhouse/config_models/instance.py b/clickhouse/datadog_checks/clickhouse/config_models/instance.py index 6a6f674ab672e..8ada723f83c23 100644 --- a/clickhouse/datadog_checks/clickhouse/config_models/instance.py +++ b/clickhouse/datadog_checks/clickhouse/config_models/instance.py @@ -20,6 +20,24 @@ from . 
import defaults, validators +class Aws(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + instance_endpoint: Optional[str] = None + + +class Azure(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + database_name: Optional[str] = None + deployment_type: Optional[str] = None + fully_qualified_domain_name: Optional[str] = None + + class CustomQuery(BaseModel): model_config = ConfigDict( arbitrary_types_allowed=True, @@ -32,6 +50,15 @@ class CustomQuery(BaseModel): tags: Optional[tuple[str, ...]] = None +class Gcp(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + instance_id: Optional[str] = None + project_id: Optional[str] = None + + class MetricPatterns(BaseModel): model_config = ConfigDict( arbitrary_types_allowed=True, @@ -41,25 +68,46 @@ class MetricPatterns(BaseModel): include: Optional[tuple[str, ...]] = None +class QuerySamples(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + activity_collection_interval: Optional[float] = None + activity_enabled: Optional[bool] = None + activity_max_rows: Optional[float] = None + collection_interval: Optional[float] = None + enabled: Optional[bool] = None + run_sync: Optional[bool] = None + samples_per_hour_per_query: Optional[float] = None + seen_samples_cache_maxsize: Optional[float] = None + + class InstanceConfig(BaseModel): model_config = ConfigDict( validate_default=True, arbitrary_types_allowed=True, frozen=True, ) + aws: Optional[Aws] = None + azure: Optional[Azure] = None compression: Optional[str] = None connect_timeout: Optional[int] = None custom_queries: Optional[tuple[CustomQuery, ...]] = None + database_instance_collection_interval: Optional[float] = None db: Optional[str] = None + dbm: Optional[bool] = None disable_generic_tags: Optional[bool] = None empty_default_hostname: Optional[bool] = None + gcp: Optional[Gcp] = None metric_patterns: Optional[MetricPatterns] = None min_collection_interval: Optional[float] = None only_custom_queries: Optional[bool] = None password: Optional[str] = None port: Optional[int] = None + query_samples: Optional[QuerySamples] = None read_timeout: Optional[int] = None - server: str + server: Optional[str] = None service: Optional[str] = None tags: Optional[tuple[str, ...]] = None tls_ca_cert: Optional[str] = None diff --git a/clickhouse/datadog_checks/clickhouse/config_models/validators.py b/clickhouse/datadog_checks/clickhouse/config_models/validators.py index 1b99ebf855087..1d5891aedee8d 100644 --- a/clickhouse/datadog_checks/clickhouse/config_models/validators.py +++ b/clickhouse/datadog_checks/clickhouse/config_models/validators.py @@ -3,11 +3,19 @@ # Licensed under a 3-clause BSD style license (see LICENSE) # Here you can include additional config validators or transformers -# -# def initialize_instance(values, **kwargs): -# if 'my_option' not in values and 'my_legacy_option' in values: -# values['my_option'] = values['my_legacy_option'] -# if values.get('my_number') > 10: -# raise ValueError('my_number max value is 10, got %s' % str(values.get('my_number'))) -# -# return values + + +def initialize_instance(values, **kwargs): + """ + Initialize and validate instance configuration. + Maps 'host' to 'server' for backwards compatibility. 
+ """ + # Map 'host' to 'server' for backwards compatibility + if 'server' not in values and 'host' in values: + values['server'] = values['host'] + + # Validate that either server or host was provided + if 'server' not in values: + raise ValueError("Either 'server' or 'host' must be specified in the configuration") + + return values diff --git a/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example b/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example index 2df777c607797..5d761f2724ff9 100644 --- a/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example +++ b/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example @@ -24,10 +24,11 @@ init_config: # instances: - ## @param server - string - required + - + ## @param server - string - optional ## The hostname used to connect to the system. # - - server: + # server: ## @param port - integer - optional - default: 9000 ## The port used to connect to the system. @@ -86,6 +87,129 @@ instances: # # verify: true + ## @param dbm - boolean - optional - default: false + ## Enable Database Monitoring (DBM) to collect query samples and execution plans. + ## This feature provides deep observability into query performance. + # + # dbm: false + + ## Configuration for collecting query samples when Database Monitoring (DBM) is enabled. + ## Query samples provide insights into the queries being executed on your ClickHouse instance. + # + # query_samples: + + ## @param enabled - boolean - optional - default: true + ## Enable collection of query samples. + # + # enabled: true + + ## @param collection_interval - number - optional - default: 10 + ## The interval in seconds between query sample collections. + ## Lower values provide more granular data but increase overhead. + # + # collection_interval: 10 + + ## @param samples_per_hour_per_query - number - optional - default: 15 + ## The maximum number of samples to collect per unique query signature per hour. + ## This helps limit the volume of data collected while still providing useful insights. + # + # samples_per_hour_per_query: 15 + + ## @param seen_samples_cache_maxsize - number - optional - default: 10000 + ## The maximum size of the cache used to track which query samples have been collected. + ## A larger cache can help avoid collecting duplicate samples. + # + # seen_samples_cache_maxsize: 10000 + + ## @param run_sync - boolean - optional - default: false + ## Whether to run query sample collection synchronously in the check run. + ## Set to false (default) to run asynchronously in a separate thread. + # + # run_sync: false + + ## @param activity_enabled - boolean - optional - default: true + ## Enable collection of database activity snapshots. + ## Activity snapshots capture currently executing queries and active connections. + # + # activity_enabled: true + + ## @param activity_collection_interval - number - optional - default: 10 + ## The interval in seconds between activity snapshot collections. + ## Lower values capture more activity data but increase overhead. + ## For fast ClickHouse queries, consider using 1-5 seconds. + # + # activity_collection_interval: 10 + + ## @param activity_max_rows - number - optional - default: 1000 + ## The maximum number of active sessions to include in each activity snapshot. + # + # activity_max_rows: 1000 + + ## This block defines the configuration for AWS RDS and Aurora instances. + ## + ## Complete this section if you have installed the Datadog AWS Integration to enrich instances + ## with ClickHouse integration telemetry. 
+ # + # aws: + + ## @param instance_endpoint - string - optional - default: mydb.cfxgae8cilcf.us-east-1.rds.amazonaws.com + ## Equal to the Endpoint.Address of the instance the agent is connecting to. + ## This value is optional if the value of `server` is already configured to the instance endpoint. + ## + ## For more information on instance endpoints, + ## see the AWS docs https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_Endpoint.html + # + # instance_endpoint: mydb.cfxgae8cilcf.us-east-1.rds.amazonaws.com + + ## This block defines the configuration for Google Cloud SQL instances. + ## + ## Complete this section if you have installed the Datadog GCP Integration to enrich instances + ## with ClickHouse integration telemetry. + # + # gcp: + + ## @param project_id - string - optional - default: foo-project + ## Equal to the GCP resource's project ID. + ## + ## For more information on project IDs, + ## see the GCP docs https://cloud.google.com/resource-manager/docs/creating-managing-projects + # + # project_id: foo-project + + ## @param instance_id - string - optional - default: foo-database + ## Equal to the GCP resource's instance ID. + ## + ## For more information on instance IDs, + ## see the GCP docs https://cloud.google.com/sql/docs/mysql/instance-settings#instance-id-2ndgen + # + # instance_id: foo-database + + ## This block defines the configuration for Azure Database for ClickHouse. + ## + ## Complete this section if you have installed the Datadog Azure Integration to enrich instances + ## with ClickHouse integration telemetry. + # + # azure: + + ## @param deployment_type - string - optional - default: flexible_server + ## Equal to the deployment type for the managed database. + ## + ## For Azure, this is typically 'flexible_server' or 'single_server'. + # + # deployment_type: flexible_server + + ## @param fully_qualified_domain_name - string - optional - default: my-clickhouse.database.windows.net + ## Equal to the fully qualified domain name of the Azure database. + ## + ## This value is optional if the value of `server` is already configured to the fully qualified domain name. + # + # fully_qualified_domain_name: my-clickhouse.database.windows.net + + ## @param database_name - string - optional + ## The database name for the Azure instance. + # + # database_name: + ## @param only_custom_queries - boolean - optional - default: false ## Set this parameter to `true` if you want to skip the integration's default metrics collection. ## Only metrics specified in `custom_queries` will be collected. @@ -105,48 +229,51 @@ instances: ## Each query must have 2 fields, and can have a third optional field: ## ## 1. query - The SQL to execute. It can be a simple statement or a multi-line script. - ## Use the pipe `|` if you require a multi-line script. + ## Use the pipe `|` if you require a multi-line script. ## 2. columns - The list representing each column, ordered sequentially from left to right. - ## The number of columns must equal the number of columns returned in the query. - ## There are 2 required pieces of data: - ## 1. name - The suffix to append to `.` to form - ## the full metric name. If `type` is a `tag` type, this column is considered a tag and applied - ## to every metric collected by this particular query. - ## 2. type - The submission method (gauge, monotonic_count, etc.). - ## This can also be set to the following `tag` types to tag each metric in the row with the name - ## and value of the item in this column: - ## 1. tag - This is the default tag type - ## 2. 
tag_list - This allows multiple values to be attached to the tag name. For example: - ## ``` - ## query = { - ## "name": "example", - ## "query": "...", - ## "columns": [ - ## {"name": "server_tag", "type": "tag_list"}, - ## {"name": "foo", "type": "gauge"}, - ## ] - ## } - ## ``` - ## May result in: - ## ``` - ## gauge("foo", tags=["server_tag:us", "server_tag:primary", "server_tag:default"]) - ## gauge("foo", tags=["server_tag:eu"]) - ## gauge("foo", tags=["server_tag:eu", "server_tag:primary"]) - ## ``` - ## 3. tag_not_null - This only sets tags in the metric if the value is not null - ## You can use the `count` type to perform aggregation for queries that return multiple rows with - ## the same or no tags. - ## Columns without a name are ignored. To skip a column, enter: - ## ``` - ## - {} - ## ``` + ## The number of columns must equal the number of columns returned in the query. + ## There are 2 required pieces of data: + ## a. name - The suffix to append to `.` to form + ## the full metric name. If `type` is a `tag` type, this column is + ## considered a tag and applied to every + ## metric collected by this particular query. + ## b. type - The submission method (gauge, monotonic_count, etc.). + ## This can also be set to the following `tag` types to + ## tag each metric in the row with the name and value + ## of the item in this column: + ## i. tag - This is the default tag type + ## ii. tag_list - This allows multiple values to be attached + ## to the tag name. For example: + ## + ## query = { + ## "name": "example", + ## "query": "...", + ## "columns": [ + ## {"name": "server_tag", "type": "tag_list"}, + ## {"name": "foo", "type": "gauge"}, + ## ] + ## } + ## + ## May result in: + ## gauge("foo", tags=[ + ## "server_tag:us", + ## "server_tag:primary", + ## "server_tag:default" + ## ]) + ## gauge("foo", tags=["server_tag:eu"]) + ## gauge("foo", tags=["server_tag:eu", "server_tag:primary"]) + ## iii. tag_not_null - This only sets tags in the metric if the value is not null + ## You can use the `count` type to perform aggregation + ## for queries that return multiple rows with the same or no tags. + ## Columns without a name are ignored. To skip a column, enter: + ## - {} ## 3. tags (optional) - A list of tags to apply to each metric. ## 4. collection_interval (optional) - The frequency at which to collect the metrics. ## If collection_interval is not set, the query will be run every check run. - ## If the collection interval is less than check collection interval, the query will be run every check - ## run. - ## If the collection interval is greater than check collection interval, the query will NOT BE RUN - ## exactly at the collection interval. + ## If the collection interval is less than check collection interval, + ## the query will be run every check run. + ## If the collection interval is greater than check collection interval, + ## the query will NOT BE RUN exactly at the collection interval. ## The query will be run at the next check run after the collection interval has passed. ## 5. metric_prefix (optional) - The prefix to apply to each metric. # diff --git a/clickhouse/datadog_checks/clickhouse/statement_samples.py b/clickhouse/datadog_checks/clickhouse/statement_samples.py new file mode 100644 index 0000000000000..4f7d52ee9af90 --- /dev/null +++ b/clickhouse/datadog_checks/clickhouse/statement_samples.py @@ -0,0 +1,608 @@ +# (C) Datadog, Inc. 
2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from datadog_checks.clickhouse import ClickhouseCheck + +try: + import datadog_agent +except ImportError: + from datadog_checks.base.stubs import datadog_agent + +from datadog_checks.base.utils.common import to_native_string +from datadog_checks.base.utils.db.sql import compute_sql_signature +from datadog_checks.base.utils.db.utils import ( + DBMAsyncJob, + RateLimitingTTLCache, + default_json_event_encoding, + obfuscate_sql_with_metadata, +) +from datadog_checks.base.utils.serialization import json +from datadog_checks.base.utils.tracking import tracked_method + +# Query to get currently running/active queries from system.processes +# This is the ClickHouse equivalent of Postgres pg_stat_activity +# Note: result_rows, result_bytes, query_start_time, query_start_time_microseconds +# don't exist in ClickHouse (as of 24.11), so they're excluded +ACTIVE_QUERIES_QUERY = """ +SELECT + elapsed, + query_id, + query, + user, + read_rows, + read_bytes, + written_rows, + written_bytes, + memory_usage, + initial_query_id, + initial_user, + query_kind, + is_initial_query, + peak_memory_usage, + total_rows_approx, + client_name, + client_version_major, + client_version_minor, + client_version_patch, + current_database, + thread_ids, + address, + port, + client_hostname, + is_cancelled, + http_user_agent +FROM system.processes +WHERE query NOT LIKE '%system.processes%' + AND query NOT LIKE '%system.query_log%' + AND query != '' +""" + +# Query to get active connections aggregated by user, database, etc +# Similar to Postgres PG_ACTIVE_CONNECTIONS_QUERY +ACTIVE_CONNECTIONS_QUERY = """ +SELECT + user, + query_kind, + current_database, + count(*) as connections +FROM system.processes +WHERE query NOT LIKE '%system.processes%' +GROUP BY user, query_kind, current_database +""" + +# Columns from system.processes which correspond to attributes common to all databases +# and are therefore stored under other standard keys +system_processes_sample_exclude_keys = { + # we process & obfuscate this separately + 'query', + # stored separately in standard db fields + 'user', + 'query_id', + 'current_database', # stored as db.instance +} + + +def agent_check_getter(self): + return self._check + + +class ClickhouseStatementSamples(DBMAsyncJob): + """ + Collects statement samples from ClickHouse active queries (system.processes). + Similar to Postgres integration using pg_stat_activity. 
+ """ + + def __init__(self, check: ClickhouseCheck, config): + # Default collection interval if not specified + collection_interval = getattr(config, 'collection_interval', 10) + + super(ClickhouseStatementSamples, self).__init__( + check, + rate_limit=1 / collection_interval, + run_sync=getattr(config, 'run_sync', False), + enabled=getattr(config, 'enabled', True), + dbms="clickhouse", + min_collection_interval=check.check_interval if hasattr(check, 'check_interval') else 15, + expected_db_exceptions=(Exception,), + job_name="query-samples", + ) + self._check = check + self._config = config + self._tags_no_db = None + self.tags = None + + # Create a separate client for this DBM job to avoid concurrent query errors + self._db_client = None + + # Get obfuscator options from config if available + obfuscate_options = { + 'return_json_metadata': True, + 'collect_tables': True, + 'collect_commands': True, + 'collect_comments': True, + } + self._obfuscate_options = to_native_string(json.dumps(obfuscate_options)) + + # Rate limiters for query samples + self._seen_samples_ratelimiter = RateLimitingTTLCache( + maxsize=getattr(config, 'seen_samples_cache_maxsize', 10000), + ttl=60 * 60 / getattr(config, 'samples_per_hour_per_query', 15), + ) + + self._collection_interval = collection_interval + + # Activity snapshot collection configuration + self._activity_coll_enabled = getattr(config, 'activity_enabled', True) + self._activity_coll_interval = getattr(config, 'activity_collection_interval', 10) + self._activity_max_rows = getattr(config, 'activity_max_rows', 1000) + self._time_since_last_activity_event = 0 + + # Debug logging to verify config values + self._check.log.info( + "Activity config: enabled=%s, interval=%s, max_rows=%s", + self._activity_coll_enabled, + self._activity_coll_interval, + self._activity_max_rows, + ) + + def _dbtags(self, db, *extra_tags): + """ + Returns the default instance tags with the initial "db" tag replaced with the provided tag + """ + t = ["db:" + db] + if extra_tags: + t.extend(extra_tags) + if self._tags_no_db: + t.extend(self._tags_no_db) + return t + + def _get_debug_tags(self): + t = [] + if self._tags_no_db: + t.extend(self._tags_no_db) + return t + + def _report_check_hist_metrics(self, start_time, row_count, operation): + """ + Report histogram metrics for check operations + """ + elapsed_ms = (time.time() - start_time) * 1000 + self._check.histogram( + "dd.clickhouse.statement_samples.{}.time".format(operation), + elapsed_ms, + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + self._check.histogram( + "dd.clickhouse.statement_samples.{}.rows".format(operation), + row_count, + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _get_active_queries(self): + """ + Fetch currently running queries from system.processes + This is analogous to Postgres querying pg_stat_activity + """ + start_time = time.time() + + try: + # Use the dedicated client for this job + if self._db_client is None: + self._db_client = self._check.create_dbm_client() + result = self._db_client.query(ACTIVE_QUERIES_QUERY) + rows = result.result_rows + + self._report_check_hist_metrics(start_time, len(rows), "get_active_queries") + self._log.debug("Loaded %s rows from system.processes", len(rows)) + + return rows + except Exception as e: + self._log.exception("Failed to collect active queries: %s", str(e)) + # Reset client on error to force reconnect + self._db_client = None + 
self._check.count( + "dd.clickhouse.statement_samples.error", + 1, + tags=self.tags + ["error:active-queries-fetch"] + self._get_debug_tags(), + raw=True, + ) + return [] + + def _normalize_active_query_row(self, row): + """ + Normalize a row from system.processes into a standard format + """ + try: + ( + elapsed, + query_id, + query, + user, + read_rows, + read_bytes, + written_rows, + written_bytes, + memory_usage, + initial_query_id, + initial_user, + query_kind, + is_initial_query, + peak_memory_usage, + total_rows_approx, + client_name, + client_version_major, + client_version_minor, + client_version_patch, + current_database, + thread_ids, + address, + port, + client_hostname, + is_cancelled, + http_user_agent, + ) = row + + normalized_row = { + # Original fields + 'elapsed': float(elapsed) if elapsed else 0, + 'query_id': str(query_id), + 'query': str(query), + 'user': str(user), + 'read_rows': int(read_rows) if read_rows else 0, + 'read_bytes': int(read_bytes) if read_bytes else 0, + 'written_rows': int(written_rows) if written_rows else 0, + 'written_bytes': int(written_bytes) if written_bytes else 0, + 'memory_usage': int(memory_usage) if memory_usage else 0, + 'initial_query_id': str(initial_query_id) if initial_query_id else None, + 'initial_user': str(initial_user) if initial_user else None, + 'query_kind': str(query_kind) if query_kind else None, + 'is_initial_query': bool(is_initial_query) if is_initial_query is not None else True, + # New fields + 'peak_memory_usage': int(peak_memory_usage) if peak_memory_usage else 0, + 'total_rows_approx': int(total_rows_approx) if total_rows_approx else 0, + 'client_name': str(client_name) if client_name else None, + 'client_version_major': int(client_version_major) if client_version_major else None, + 'client_version_minor': int(client_version_minor) if client_version_minor else None, + 'client_version_patch': int(client_version_patch) if client_version_patch else None, + 'current_database': str(current_database) if current_database else None, + 'thread_ids': list(thread_ids) if thread_ids else [], + 'address': str(address) if address else None, + 'port': int(port) if port else None, + 'client_hostname': str(client_hostname) if client_hostname else None, + 'is_cancelled': bool(is_cancelled) if is_cancelled is not None else False, + 'http_user_agent': str(http_user_agent) if http_user_agent else None, + } + + return self._obfuscate_and_normalize_query(normalized_row) + except Exception as e: + self._log.warning("Failed to normalize active query row: %s, row: %s", str(e), row) + raise + + def _obfuscate_and_normalize_query(self, row): + """ + Obfuscate the query and compute its signature + """ + obfuscated_query = None + try: + statement = obfuscate_sql_with_metadata(row['query'], self._obfuscate_options) + obfuscated_query = statement['query'] + metadata = statement['metadata'] + + row['statement'] = obfuscated_query + row['dd_tables'] = metadata.get('tables', None) + row['dd_commands'] = metadata.get('commands', None) + row['dd_comments'] = metadata.get('comments', None) + + # Compute query signature + row['query_signature'] = compute_sql_signature(obfuscated_query) + + except Exception as e: + self._log.warning("Failed to obfuscate query: %s, query: %s", str(e), row.get('query', '')[:100]) + # On obfuscation error, we still want to emit the row + row['statement'] = None + row['query_signature'] = compute_sql_signature(row['query']) + row['dd_tables'] = None + row['dd_commands'] = None + row['dd_comments'] = None + + return row + + def 
_filter_and_normalize_statement_rows(self, rows): + """ + Filter and normalize rows from system.processes + """ + normalized_rows = [] + for row in rows: + try: + normalized_row = self._normalize_active_query_row(row) + if normalized_row and normalized_row.get('statement'): + normalized_rows.append(normalized_row) + except Exception as e: + self._log.debug("Failed to normalize row: %s", e) + + return normalized_rows + + def _get_active_connections(self): + """ + Get aggregated active connection counts from system.processes + Similar to Postgres _get_active_connections from pg_stat_activity + """ + try: + start_time = time.time() + + # Use the dedicated client for this job + if self._db_client is None: + self._db_client = self._check.create_dbm_client() + result = self._db_client.query(ACTIVE_CONNECTIONS_QUERY) + rows = result.result_rows + + elapsed_ms = (time.time() - start_time) * 1000 + self._log.debug("Retrieved %s connection aggregation rows in %.2f ms", len(rows), elapsed_ms) + + # Convert to list of dicts + connections = [] + for row in rows: + connections.append( + { + 'user': row[0], + 'query_kind': row[1], + 'current_database': row[2], + 'connections': row[3], + } + ) + + return connections + + except Exception as e: + self._log.warning("Failed to get active connections: %s", e) + # Reset client on error to force reconnect + self._db_client = None + return [] + + def _to_active_session(self, row): + """ + Convert a system.processes row to an active session + Similar to Postgres _to_active_session + """ + # Only include rows with successfully obfuscated statements + if not row.get('statement'): + return None + + # Remove null values and the raw query + active_row = {key: val for key, val in row.items() if val is not None and key != 'query'} + return active_row + + def _create_active_sessions(self, rows): + """ + Create active sessions from system.processes rows + Similar to Postgres _create_active_sessions + """ + active_sessions_count = 0 + for row in rows: + active_row = self._to_active_session(row) + if active_row: + active_sessions_count += 1 + yield active_row + if active_sessions_count >= self._activity_max_rows: + break + + def _create_activity_event(self, rows, active_connections): + """ + Create a database monitoring activity event + Similar to Postgres _create_activity_event + """ + self._time_since_last_activity_event = time.time() + active_sessions = [] + + for row in self._create_active_sessions(rows): + active_sessions.append(row) + + event = { + "host": self._check.reported_hostname, + "database_instance": self._check.database_identifier, + "ddagentversion": datadog_agent.get_version(), + "ddsource": "clickhouse", + "dbm_type": "activity", + "collection_interval": self._activity_coll_interval, + "ddtags": self._tags_no_db, + "timestamp": time.time() * 1000, + "cloud_metadata": getattr(self._check, 'cloud_metadata', {}), + 'service': getattr(self._config, 'service', None), + "clickhouse_activity": active_sessions, + "clickhouse_connections": active_connections, + } + return event + + def _report_activity_event(self): + """ + Check if we should report an activity event based on collection interval + Similar to Postgres _report_activity_event + """ + elapsed_s = time.time() - self._time_since_last_activity_event + if elapsed_s < self._activity_coll_interval or not self._activity_coll_enabled: + return False + return True + + @tracked_method(agent_check_getter=agent_check_getter) + def _collect_statement_samples(self): + """ + Main method to collect and submit statement 
samples from active queries + Similar to Postgres _collect_statement_samples + """ + start_time = time.time() + + # Get active queries from system.processes + rows = self._get_active_queries() + + self._log.info("Retrieved %s active queries for processing", len(rows)) + + # Normalize and filter rows + rows = self._filter_and_normalize_statement_rows(rows) + + submitted_count = 0 + skipped_count = 0 + error_count = 0 + + for row in rows: + try: + # Check if we should submit this sample based on rate limiting + query_signature = row.get('query_signature') + if not query_signature: + self._log.debug("Skipping row without query signature") + skipped_count += 1 + continue + + if not self._seen_samples_ratelimiter.acquire(query_signature): + skipped_count += 1 + continue + + # Create the event payload + event = self._create_sample_event(row) + + # Log the event payload for debugging + self._log.debug( + "Query sample event payload: ddsource=%s, query_signature=%s", + event.get('ddsource'), + query_signature[:50] if query_signature else 'N/A', + ) + + # Submit the event + event_json = json.dumps(event, default=default_json_event_encoding) + self._check.database_monitoring_query_sample(event_json) + submitted_count += 1 + self._log.debug("Submitted query sample for signature: %s", query_signature[:50]) + + except Exception as e: + error_count += 1 + self._log.exception("Error processing active query row: %s", e) + self._check.count( + "dd.clickhouse.statement_samples.error", + 1, + tags=self.tags + ["error:process-row"] + self._get_debug_tags(), + raw=True, + ) + + elapsed_ms = (time.time() - start_time) * 1000 + + self._log.info( + "Statement sample collection complete: submitted=%s, skipped=%s, errors=%s, elapsed_ms=%.2f", + submitted_count, + skipped_count, + error_count, + elapsed_ms, + ) + + # Report cache size metrics + self._check.gauge( + "dd.clickhouse.collect_statement_samples.seen_samples_cache.len", + len(self._seen_samples_ratelimiter), + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + + def _create_sample_event(self, row): + """ + Create a database monitoring query sample event (plan type) + Format follows Postgres integration pattern + This represents currently executing queries from system.processes + """ + # Use current_database from the query if available, fallback to check's default db + db = row.get('current_database') or self._check._db + + event = { + "host": self._check.reported_hostname, + "database_instance": self._check.database_identifier, + "ddagentversion": datadog_agent.get_version(), + "ddsource": "clickhouse", + "dbm_type": "plan", # Using "plan" type for query samples + "ddtags": ",".join(self._dbtags(db)), + "timestamp": int(time.time() * 1000), + "db": { + "instance": db, + "query_signature": row.get('query_signature'), + "statement": row.get('statement'), + "user": row.get('user'), + "metadata": { + "tables": row.get('dd_tables'), + "commands": row.get('dd_commands'), + "comments": row.get('dd_comments'), + }, + }, + "clickhouse": {k: v for k, v in row.items() if k not in system_processes_sample_exclude_keys}, + } + + # Add duration if available (elapsed time in seconds, convert to nanoseconds) + if row.get('elapsed'): + event['duration'] = int(row['elapsed'] * 1e9) + + return event + + def run_job(self): + """ + Main job execution method called by DBMAsyncJob + """ + # Filter out internal tags + self.tags = [t for t in self._check._tags if not t.startswith('dd.internal')] + self._tags_no_db = [t for t in self.tags if not t.startswith('db:')] + + # 
Check if we should collect activity snapshots + collect_activity = self._report_activity_event() + + # Always collect statement samples + self._collect_statement_samples() + + # Collect and submit activity event if it's time + if collect_activity: + try: + start_time = time.time() + + # Get active queries for activity snapshot + rows = self._get_active_queries() + self._log.info("DEBUG: Retrieved %s raw rows from system.processes", len(rows)) + rows = self._filter_and_normalize_statement_rows(rows) + self._log.info("DEBUG: After filtering/normalization: %s rows", len(rows)) + + # Get active connections aggregation + active_connections = self._get_active_connections() + + # Create and submit activity event + activity_event = self._create_activity_event(rows, active_connections) + self._log.info( + "DEBUG: Activity event has %s sessions", len(activity_event.get('clickhouse_activity', [])) + ) + self._check.database_monitoring_query_activity( + json.dumps(activity_event, default=default_json_event_encoding) + ) + + elapsed_ms = (time.time() - start_time) * 1000 + self._check.histogram( + "dd.clickhouse.collect_activity_snapshot.time", + elapsed_ms, + tags=self.tags + self._get_debug_tags(), + raw=True, + ) + + self._log.info( + "Activity snapshot collected and submitted: sessions=%s, connections=%s, elapsed_ms=%.2f", + len(activity_event.get('clickhouse_activity', [])), + len(activity_event.get('clickhouse_connections', [])), + elapsed_ms, + ) + + except Exception as e: + self._log.exception("Failed to collect activity snapshot: %s", e) + self._check.count( + "dd.clickhouse.statement_samples.error", + 1, + tags=self.tags + ["error:collect-activity-snapshot"] + self._get_debug_tags(), + raw=True, + ) diff --git a/clickhouse/datadog_checks/clickhouse/statements.py b/clickhouse/datadog_checks/clickhouse/statements.py new file mode 100644 index 0000000000000..5c3a73d1e011d --- /dev/null +++ b/clickhouse/datadog_checks/clickhouse/statements.py @@ -0,0 +1,426 @@ +# (C) Datadog, Inc. 2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from __future__ import annotations + +import copy +import time +from typing import TYPE_CHECKING + +from cachetools import TTLCache + +if TYPE_CHECKING: + from datadog_checks.clickhouse import ClickhouseCheck + +try: + import datadog_agent +except ImportError: + from datadog_checks.base.stubs import datadog_agent + +from datadog_checks.base.utils.common import to_native_string +from datadog_checks.base.utils.db.sql import compute_sql_signature +from datadog_checks.base.utils.db.statement_metrics import StatementMetrics +from datadog_checks.base.utils.db.utils import DBMAsyncJob, default_json_event_encoding, obfuscate_sql_with_metadata +from datadog_checks.base.utils.serialization import json +from datadog_checks.base.utils.tracking import tracked_method + +# Query to fetch aggregated metrics from system.query_log +# This is the ClickHouse equivalent of Postgres pg_stat_statements +# Note: We collect count() and sum() metrics which are treated as cumulative counters +# and then compute derivatives. We no longer collect avg/min/max/percentile as they +# don't make sense after derivative calculation (mean_time is computed from total_time/count). 
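# A minimal illustration of the derivative handling described above (assuming the
# _collect_metrics_rows helper returns dicts keyed by the SELECT aliases below):
# StatementMetrics keeps the previous snapshot per row key and emits only the
# per-interval deltas, so the first call just seeds the baseline.
#
#   state = StatementMetrics()
#   counters = ['execution_count', 'total_duration_ms', 'total_read_rows', 'total_read_bytes']
#   state.compute_derivative_rows(first_rows, counters, key=_row_key)   # -> [] (baseline)
#   deltas = state.compute_derivative_rows(next_rows, counters, key=_row_key)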
+STATEMENTS_QUERY = """ +SELECT + normalized_query_hash, + any(query) as query_text, + any(user) as user, + any(type) as query_type, + any(databases) as databases, + any(tables) as tables, + count() as execution_count, + sum(query_duration_ms) as total_duration_ms, + sum(read_rows) as total_read_rows, + sum(read_bytes) as total_read_bytes, + sum(written_rows) as total_written_rows, + sum(written_bytes) as total_written_bytes, + sum(result_rows) as total_result_rows, + sum(result_bytes) as total_result_bytes, + sum(memory_usage) as total_memory_usage, + max(memory_usage) as peak_memory_usage +FROM system.query_log +WHERE event_time >= now() - INTERVAL {collection_interval} SECOND + AND type IN ('QueryFinish', 'ExceptionWhileProcessing') + AND query NOT LIKE '%system.query_log%' + AND query NOT LIKE '%system.processes%' + AND query NOT LIKE '/* DDIGNORE */%' + AND query != '' + AND normalized_query_hash != 0 +GROUP BY normalized_query_hash +ORDER BY total_duration_ms DESC +LIMIT 10000 +""" + + +def agent_check_getter(self): + return self._check + + +def _row_key(row): + """ + :param row: a normalized row from system.query_log + :return: a tuple uniquely identifying this row + """ + return row['query_signature'], row.get('user', ''), row.get('databases', '') + + +class ClickhouseStatementMetrics(DBMAsyncJob): + """Collects telemetry for SQL statements from system.query_log""" + + def __init__(self, check: ClickhouseCheck, config): + collection_interval = float(getattr(config, 'collection_interval', 60)) + super(ClickhouseStatementMetrics, self).__init__( + check, + run_sync=getattr(config, 'run_sync', False), + enabled=getattr(config, 'enabled', True), + expected_db_exceptions=(Exception,), + min_collection_interval=check.check_interval if hasattr(check, 'check_interval') else 15, + dbms="clickhouse", + rate_limit=1 / float(collection_interval), + job_name="query-metrics", + ) + self._check = check + self._metrics_collection_interval = collection_interval + self._config = config + self._tags_no_db = None + self.tags = None + self._state = StatementMetrics() + + # Create a separate client for this DBM job to avoid concurrent query errors + self._db_client = None + + # Obfuscator options + obfuscate_options = { + 'return_json_metadata': True, + 'collect_tables': True, + 'collect_commands': True, + 'collect_comments': True, + } + self._obfuscate_options = to_native_string(json.dumps(obfuscate_options)) + + # full_statement_text_cache: limit the ingestion rate of full statement text events per query_signature + self._full_statement_text_cache = TTLCache( + maxsize=getattr(config, 'full_statement_text_cache_max_size', 10000), + ttl=60 * 60 / getattr(config, 'full_statement_text_samples_per_hour_per_query', 1), + ) + + def _execute_query(self, query): + """Execute a query and return the results using the dedicated client""" + if self._cancel_event.is_set(): + raise Exception("Job loop cancelled. 
Aborting query.") + try: + # Use the dedicated client for this job + if self._db_client is None: + self._db_client = self._check.create_dbm_client() + result = self._db_client.query(query) + return result.result_rows + except Exception as e: + self._log.warning("Failed to run query: %s", e) + # Reset client on error to force reconnect + self._db_client = None + raise e + + def run_job(self): + # do not emit any dd.internal metrics for DBM specific check code + self.tags = [t for t in self._tags if not t.startswith('dd.internal')] + self._tags_no_db = [t for t in self.tags if not t.startswith('db:')] + self.collect_per_statement_metrics() + + @tracked_method(agent_check_getter=agent_check_getter) + def collect_per_statement_metrics(self): + """ + Collect per-statement metrics from system.query_log and emit as: + 1. FQT events (dbm_type: fqt) - for query text catalog + 2. Query metrics payload - for time-series metrics + """ + try: + rows = self._collect_metrics_rows() + if not rows: + return + + # Emit FQT (Full Query Text) events + for event in self._rows_to_fqt_events(rows): + self._check.database_monitoring_query_sample(json.dumps(event, default=default_json_event_encoding)) + + # Prepare metrics payload wrapper + payload_wrapper = { + 'host': self._check.reported_hostname, + 'database_instance': self._check.database_identifier, + 'timestamp': time.time() * 1000, + 'min_collection_interval': self._metrics_collection_interval, + 'tags': self._tags_no_db, + 'ddagentversion': datadog_agent.get_version(), + 'clickhouse_version': self._get_clickhouse_version(), + } + + # Get query metrics payloads (may be split into multiple if too large) + payloads = self._get_query_metrics_payloads(payload_wrapper, rows) + + for payload in payloads: + payload_data = json.loads(payload) + num_rows = len(payload_data.get('clickhouse_rows', [])) + self._log.info( + "Submitting query metrics payload: %d bytes, %d rows, database_instance=%s", + len(payload), + num_rows, + payload_data.get('database_instance', 'MISSING'), + ) + self._check.database_monitoring_query_metrics(payload) + + except Exception: + self._log.exception('Unable to collect statement metrics due to an error') + return [] + + def _get_clickhouse_version(self): + """Get ClickHouse version string""" + try: + version_rows = self._check.execute_query_raw('SELECT version()') + if version_rows: + return str(version_rows[0][0]) + except Exception: + pass + return 'unknown' + + def _get_query_metrics_payloads(self, payload_wrapper, rows): + """ + Split rows into multiple payloads if needed to avoid exceeding size limits + """ + payloads = [] + max_size = 5 * 1024 * 1024 # 5MB limit + + queue = [rows] + while queue: + current = queue.pop() + if len(current) == 0: + continue + + payload = copy.deepcopy(payload_wrapper) + payload["clickhouse_rows"] = current + serialized_payload = json.dumps(payload, default=default_json_event_encoding) + size = len(serialized_payload) + + if size < max_size: + payloads.append(serialized_payload) + else: + if len(current) == 1: + self._log.warning( + "A single query is too large to send to Datadog. This query will be dropped. 
size=%d", + size, + ) + continue + mid = len(current) // 2 + queue.append(current[:mid]) + queue.append(current[mid:]) + + return payloads + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _load_query_log_statements(self): + """ + Load aggregated query metrics from system.query_log + This is analogous to Postgres loading from pg_stat_statements + """ + try: + query = STATEMENTS_QUERY.format(collection_interval=int(self._metrics_collection_interval)) + rows = self._execute_query(query) + + self._log.debug("Loaded %s rows from system.query_log", len(rows)) + + # Convert to list of dicts + result_rows = [] + for row in rows: + ( + normalized_query_hash, + query_text, + user, + query_type, + databases, + tables, + execution_count, + total_duration_ms, + total_read_rows, + total_read_bytes, + total_written_rows, + total_written_bytes, + total_result_rows, + total_result_bytes, + total_memory_usage, + peak_memory_usage, + ) = row + + result_rows.append( + { + 'normalized_query_hash': str(normalized_query_hash), + 'query': str(query_text) if query_text else '', + 'user': str(user) if user else '', + 'query_type': str(query_type) if query_type else '', + 'databases': str(databases[0]) if databases and len(databases) > 0 else '', + 'tables': tables if tables else [], + 'count': int(execution_count) if execution_count else 0, + 'total_time': float(total_duration_ms) if total_duration_ms else 0.0, + # Note: mean_time will be calculated after derivative calculation as total_time / count + # min_time, max_time, p95_time are not included because they are aggregates that + # don't make sense after taking derivatives + 'rows': int(total_result_rows) if total_result_rows else 0, + 'read_rows': int(total_read_rows) if total_read_rows else 0, + 'read_bytes': int(total_read_bytes) if total_read_bytes else 0, + 'written_rows': int(total_written_rows) if total_written_rows else 0, + 'written_bytes': int(total_written_bytes) if total_written_bytes else 0, + 'result_bytes': int(total_result_bytes) if total_result_bytes else 0, + 'memory_usage': int(total_memory_usage) if total_memory_usage else 0, + 'peak_memory_usage': int(peak_memory_usage) if peak_memory_usage else 0, + } + ) + + return result_rows + + except Exception as e: + self._log.exception("Failed to load statements from system.query_log: %s", e) + self._check.count( + "dd.clickhouse.statement_metrics.error", + 1, + tags=self.tags + ["error:query_log_load_failed"], + raw=True, + ) + return [] + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _collect_metrics_rows(self): + """ + Collect and normalize query metrics rows + """ + rows = self._load_query_log_statements() + if not rows: + return [] + + # Normalize queries (obfuscate SQL text) + rows = self._normalize_queries(rows) + + if not rows: + return [] + + # Get available metric columns + # Note: We only include counter metrics (count, totals) in derivative calculation. + # Aggregated metrics like mean_time, min_time, max_time, p95_time are excluded + # because taking derivatives of averages/percentiles is mathematically incorrect. 
+ available_columns = set(rows[0].keys()) + metric_columns = available_columns & { + 'count', + 'total_time', + 'rows', + 'read_rows', + 'read_bytes', + 'written_rows', + 'written_bytes', + 'result_bytes', + 'memory_usage', + 'peak_memory_usage', + } + + # Compute derivative rows (calculate deltas since last collection) + rows_before = len(rows) + rows = self._state.compute_derivative_rows(rows, metric_columns, key=_row_key, execution_indicators=['count']) + rows_after = len(rows) + + # Calculate mean_time from derivative values (total_time / count) + # This follows the same pattern as Postgres, MySQL, and SQL Server + for row in rows: + if row.get('count', 0) > 0: + row['mean_time'] = row.get('total_time', 0.0) / row['count'] + else: + row['mean_time'] = 0.0 + + self._log.info( + "Query metrics: loaded=%d rows, after_derivative=%d rows (filtered=%d)", + rows_before, + rows_after, + rows_before - rows_after, + ) + + self._check.gauge( + 'dd.clickhouse.queries.query_rows_raw', + len(rows), + tags=self.tags + self._check._get_debug_tags(), + raw=True, + ) + + return rows + + def _normalize_queries(self, rows): + """ + Normalize and obfuscate queries + """ + normalized_rows = [] + for row in rows: + normalized_row = dict(copy.copy(row)) + try: + query_text = row['query'] + statement = obfuscate_sql_with_metadata(query_text, self._obfuscate_options) + except Exception as e: + self._log.debug("Failed to obfuscate query | err=[%s]", e) + continue + + obfuscated_query = statement['query'] + normalized_row['query'] = obfuscated_query + normalized_row['query_signature'] = compute_sql_signature(obfuscated_query) + + metadata = statement['metadata'] + normalized_row['dd_tables'] = metadata.get('tables', None) + normalized_row['dd_commands'] = metadata.get('commands', None) + normalized_row['dd_comments'] = metadata.get('comments', None) + normalized_rows.append(normalized_row) + + return normalized_rows + + def _rows_to_fqt_events(self, rows): + """ + Generate FQT (Full Query Text) events for each unique query signature + These events provide the mapping from query_signature to actual SQL text + dbm_type: fqt + """ + for row in rows: + query_cache_key = _row_key(row) + if query_cache_key in self._full_statement_text_cache: + continue + self._full_statement_text_cache[query_cache_key] = True + + db = row.get('databases', 'default') + user = row.get('user', 'default') + + row_tags = self._tags_no_db + [ + "db:{}".format(db), + "user:{}".format(user), + ] + + yield { + "timestamp": time.time() * 1000, + "host": self._check.reported_hostname, + "database_instance": self._check.database_identifier, + "ddagentversion": datadog_agent.get_version(), + "ddsource": "clickhouse", + "ddtags": ",".join(row_tags), + "dbm_type": "fqt", + "db": { + "instance": db, + "query_signature": row['query_signature'], + "statement": row['query'], + "metadata": { + "tables": row['dd_tables'], + "commands": row['dd_commands'], + "comments": row['dd_comments'], + }, + }, + "clickhouse": { + "user": user, + "normalized_query_hash": row.get('normalized_query_hash'), + }, + } diff --git a/clickhouse/tests/test_clickhouse.py b/clickhouse/tests/test_clickhouse.py index 69dfbf2d01a06..6aa234751141e 100644 --- a/clickhouse/tests/test_clickhouse.py +++ b/clickhouse/tests/test_clickhouse.py @@ -73,3 +73,51 @@ def test_version_metadata(instance, datadog_agent, dd_run_check): datadog_agent.assert_metadata( 'test:123', {'version.scheme': 'calver', 'version.year': CLICKHOUSE_VERSION.split(".")[0]} ) + + +def 
test_database_instance_metadata(aggregator, instance, datadog_agent, dd_run_check): + """Test that database_instance metadata is sent correctly.""" + check = ClickhouseCheck('clickhouse', {}, [instance]) + check.check_id = 'test:456' + dd_run_check(check) + + # Get database monitoring metadata events + dbm_metadata = aggregator.get_event_platform_events("dbm-metadata") + + # Find the database_instance event + event = next((e for e in dbm_metadata if e['kind'] == 'database_instance'), None) + + assert event is not None, "database_instance metadata event should be sent" + assert event['dbms'] == 'clickhouse' + assert event['kind'] == 'database_instance' + assert event['database_instance'] == check.database_identifier + assert event['collection_interval'] == 300 + assert 'metadata' in event + assert 'dbm' in event['metadata'] + assert 'connection_host' in event['metadata'] + assert event['metadata']['connection_host'] == instance['server'] + + +def test_database_instance_metadata_with_cloud_metadata(aggregator, instance, datadog_agent, dd_run_check): + """Test that database_instance metadata includes cloud metadata when configured.""" + instance = instance.copy() + instance['aws'] = {'instance_endpoint': 'my-clickhouse.us-east-1.rds.amazonaws.com'} + instance['gcp'] = {'project_id': 'my-project', 'instance_id': 'my-instance'} + + check = ClickhouseCheck('clickhouse', {}, [instance]) + check.check_id = 'test:789' + dd_run_check(check) + + # Get database monitoring metadata events + dbm_metadata = aggregator.get_event_platform_events("dbm-metadata") + + # Find the database_instance event + event = next((e for e in dbm_metadata if e['kind'] == 'database_instance'), None) + + assert event is not None + assert 'cloud_metadata' in event + assert 'aws' in event['cloud_metadata'] + assert event['cloud_metadata']['aws']['instance_endpoint'] == 'my-clickhouse.us-east-1.rds.amazonaws.com' + assert 'gcp' in event['cloud_metadata'] + assert event['cloud_metadata']['gcp']['project_id'] == 'my-project' + assert event['cloud_metadata']['gcp']['instance_id'] == 'my-instance' diff --git a/clickhouse/tests/test_dbm_integration.py b/clickhouse/tests/test_dbm_integration.py new file mode 100644 index 0000000000000..fa498cfc63583 --- /dev/null +++ b/clickhouse/tests/test_dbm_integration.py @@ -0,0 +1,269 @@ +# (C) Datadog, Inc. 
2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import time +from copy import deepcopy + +import clickhouse_connect +import pytest + +from datadog_checks.clickhouse import ClickhouseCheck + + +@pytest.mark.integration +@pytest.mark.usefixtures('dd_environment') +class TestDBMIntegration: + """Integration tests for Database Monitoring (DBM) query samples""" + + def test_query_samples_are_collected(self, aggregator, instance): + """ + Test that query samples are actually collected and submitted when DBM is enabled + """ + # Configure instance with DBM enabled + instance_config = deepcopy(instance) + instance_config['dbm'] = True + instance_config['query_samples'] = { + 'enabled': True, + 'collection_interval': 1, # Collect every second for testing + 'samples_per_hour_per_query': 100, # Allow many samples for testing + } + + # Create check + check = ClickhouseCheck('clickhouse', {}, [instance_config]) + + # First, generate some queries in ClickHouse to populate query_log + client = clickhouse_connect.get_client( + host=instance_config['server'], + port=instance_config['port'], + username=instance_config['username'], + password=instance_config['password'], + ) + + # Run several different queries to populate query_log + test_queries = [ + "SELECT 1", + "SELECT count(*) FROM system.tables", + "SELECT name, engine FROM system.databases", + "SELECT version()", + "SELECT now()", + ] + + for query in test_queries: + try: + client.query(query) + except Exception as e: + print(f"Query '{query}' failed: {e}") + + # Wait a moment for queries to appear in query_log + time.sleep(2) + + # Verify there are queries in query_log + result = client.query("SELECT count(*) FROM system.query_log WHERE event_time > now() - INTERVAL 1 MINUTE") + query_log_count = result.result_rows[0][0] + print(f"Found {query_log_count} queries in query_log") + + # Run the check - this should collect samples + check.check(None) + + # Wait for async job to complete if running async + if check.statement_samples and hasattr(check.statement_samples, '_job_loop_future'): + if check.statement_samples._job_loop_future: + check.statement_samples._job_loop_future.result(timeout=5) + + # Run check again to ensure we collect samples + time.sleep(1) + check.check(None) + + # Verify metrics are being reported + aggregator.assert_metric('dd.clickhouse.collect_statement_samples.events_submitted.count') + aggregator.assert_metric('dd.clickhouse.get_query_log_samples.rows') + + # Get the count of submitted events + events_submitted = aggregator.metrics('dd.clickhouse.collect_statement_samples.events_submitted.count') + if events_submitted: + total_submitted = sum(m.value for m in events_submitted) + print(f"Total query samples submitted: {total_submitted}") + assert total_submitted > 0, "Expected at least one query sample to be submitted" + + def test_query_samples_disabled(self, aggregator, instance): + """ + Test that query samples are NOT collected when DBM is disabled + """ + # Configure instance with DBM disabled + instance_config = deepcopy(instance) + instance_config['dbm'] = False + + # Create check + check = ClickhouseCheck('clickhouse', {}, [instance_config]) + + # Verify statement_samples is None + assert check.statement_samples is None, "statement_samples should be None when DBM is disabled" + + # Run the check + check.check(None) + + # Verify no DBM metrics are reported + assert not aggregator.metrics('dd.clickhouse.collect_statement_samples.events_submitted.count'), ( + "No DBM metrics 
should be reported when DBM is disabled" + ) + + def test_query_samples_with_activity(self, aggregator, instance, dd_run_check): + """ + Test that query samples capture actual query activity with details + """ + # Configure instance with DBM enabled + instance_config = deepcopy(instance) + instance_config['dbm'] = True + instance_config['query_samples'] = { + 'enabled': True, + 'collection_interval': 1, + } + + # Create check + check = ClickhouseCheck('clickhouse', {}, [instance_config]) + + # Connect and run a distinctive query + client = clickhouse_connect.get_client( + host=instance_config['server'], + port=instance_config['port'], + username=instance_config['username'], + password=instance_config['password'], + ) + + # Run a query that will be easy to identify + distinctive_query = "SELECT 'DBM_TEST_QUERY' as test_column, count(*) FROM system.tables" + client.query(distinctive_query) + + # Wait for query to appear in query_log + time.sleep(2) + + # Verify the query is in query_log + result = client.query(""" + SELECT count(*) + FROM system.query_log + WHERE query LIKE '%DBM_TEST_QUERY%' + AND event_time > now() - INTERVAL 1 MINUTE + """) + assert result.result_rows[0][0] > 0, "Distinctive query should be in query_log" + + # Run the check + dd_run_check(check) + + # Wait for async processing + time.sleep(2) + + # Run check again + dd_run_check(check) + + # Verify we collected some rows + rows_metric = aggregator.metrics('dd.clickhouse.get_query_log_samples.rows') + if rows_metric: + total_rows = sum(m.value for m in rows_metric) + print(f"Total rows collected from query_log: {total_rows}") + assert total_rows > 0, "Should have collected rows from query_log" + + def test_query_samples_properties(self, instance): + """ + Test that required DBM properties are correctly set on the check + """ + # Configure instance with DBM enabled + instance_config = deepcopy(instance) + instance_config['dbm'] = True + instance_config['query_samples'] = { + 'enabled': True, + } + + # Create check + check = ClickhouseCheck('clickhouse', {}, [instance_config]) + + # Verify DBM properties exist + assert hasattr(check, 'reported_hostname'), "Check should have reported_hostname property" + assert hasattr(check, 'database_identifier'), "Check should have database_identifier property" + + # Verify properties return expected values + hostname = check.reported_hostname + db_id = check.database_identifier + + assert hostname is not None, "reported_hostname should not be None" + assert db_id is not None, "database_identifier should not be None" + assert check._server in hostname, "hostname should contain server name" + assert str(check._port) in db_id, "database_identifier should contain port" + assert check._db in db_id, "database_identifier should contain database name" + + print(f"reported_hostname: {hostname}") + print(f"database_identifier: {db_id}") + + def test_statement_samples_event_structure(self, instance): + """ + Test that the event structure for query samples is correct + """ + # Configure instance with DBM enabled + instance_config = deepcopy(instance) + instance_config['dbm'] = True + instance_config['query_samples'] = { + 'enabled': True, + } + + # Create check + check = ClickhouseCheck('clickhouse', {}, [instance_config]) + + # Create a mock row to test event creation + mock_row = { + 'timestamp': time.time(), + 'query_id': 'test-query-id', + 'query': 'SELECT * FROM system.tables WHERE name = ?', + 'statement': 'SELECT * FROM system.tables WHERE name = ?', + 'query_signature': 'test-signature', + 
'type': 'QueryFinish', + 'user': 'datadog', + 'duration_ms': 100, + 'read_rows': 10, + 'read_bytes': 1024, + 'written_rows': 0, + 'written_bytes': 0, + 'result_rows': 10, + 'result_bytes': 1024, + 'memory_usage': 2048, + 'exception': None, + 'dd_tables': ['system.tables'], + 'dd_commands': ['SELECT'], + 'dd_comments': [], + } + + # Create event + event = check.statement_samples._create_sample_event(mock_row) + + # Verify event structure + assert 'host' in event, "Event should have host field" + assert 'database_instance' in event, "Event should have database_instance field" + assert 'ddagentversion' in event, "Event should have ddagentversion field" + assert 'ddsource' in event, "Event should have ddsource field" + assert event['ddsource'] == 'clickhouse', "ddsource should be clickhouse" + assert 'dbm_type' in event, "Event should have dbm_type field" + assert event['dbm_type'] == 'plan', "dbm_type should be plan" + assert 'timestamp' in event, "Event should have timestamp field" + assert 'db' in event, "Event should have db field" + assert 'clickhouse' in event, "Event should have clickhouse field" + + # Verify db section + db_section = event['db'] + assert 'instance' in db_section + assert 'query_signature' in db_section + assert 'statement' in db_section + assert 'user' in db_section + assert 'metadata' in db_section + + # Verify clickhouse section + ch_section = event['clickhouse'] + assert 'query_id' in ch_section + assert 'type' in ch_section + assert 'duration_ms' in ch_section + assert 'read_rows' in ch_section + assert 'memory_usage' in ch_section + + # Verify duration is in nanoseconds + assert 'duration' in event + assert event['duration'] == 100 * 1e6 # 100ms in nanoseconds + + print("Event structure is valid!") + print(f"Event keys: {list(event.keys())}") diff --git a/clickhouse/tests/test_statement_samples.py b/clickhouse/tests/test_statement_samples.py new file mode 100644 index 0000000000000..4d43e5fd5f514 --- /dev/null +++ b/clickhouse/tests/test_statement_samples.py @@ -0,0 +1,372 @@ +# (C) Datadog, Inc. 
2025-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import time +from unittest import mock + +import pytest + +from datadog_checks.clickhouse import ClickhouseCheck +from datadog_checks.clickhouse.statement_samples import ClickhouseStatementSamples + + +@pytest.fixture +def instance_with_dbm(): + """Return a ClickHouse instance configuration with DBM enabled""" + return { + 'server': 'localhost', + 'port': 9000, + 'username': 'default', + 'password': '', + 'db': 'default', + 'dbm': True, + 'query_samples': { + 'enabled': True, + 'collection_interval': 10, + 'samples_per_hour_per_query': 15, + 'seen_samples_cache_maxsize': 10000, + 'run_sync': False, + }, + 'tags': ['test:clickhouse'], + } + + +@pytest.fixture +def check_with_dbm(instance_with_dbm): + """Return a ClickHouse check instance with DBM enabled""" + check = ClickhouseCheck('clickhouse', {}, [instance_with_dbm]) + return check + + +def test_statement_samples_initialization(check_with_dbm): + """Test that statement samples are properly initialized when DBM is enabled""" + assert check_with_dbm.statement_samples is not None + assert isinstance(check_with_dbm.statement_samples, ClickhouseStatementSamples) + assert check_with_dbm.statement_samples._config.enabled is True + assert check_with_dbm.statement_samples._config.collection_interval == 10 + + +def test_statement_samples_disabled(): + """Test that statement samples are not initialized when DBM is disabled""" + instance = { + 'server': 'localhost', + 'port': 9000, + 'username': 'default', + 'password': '', + 'db': 'default', + 'dbm': False, + 'tags': ['test:clickhouse'], + } + check = ClickhouseCheck('clickhouse', {}, [instance]) + assert check.statement_samples is None + + +def test_obfuscate_and_normalize_query(check_with_dbm): + """Test query obfuscation and normalization""" + statement_samples = check_with_dbm.statement_samples + + row = { + 'timestamp': time.time(), + 'query_id': 'test-query-id-123', + 'query': 'SELECT * FROM users WHERE user_id = 12345', + 'type': 'QueryFinish', + 'user': 'default', + 'duration_ms': 100, + 'read_rows': 1, + 'read_bytes': 100, + 'written_rows': 0, + 'written_bytes': 0, + 'result_rows': 1, + 'result_bytes': 100, + 'memory_usage': 1024, + 'exception': None, + } + + normalized_row = statement_samples._obfuscate_and_normalize_query(row) + + # Verify that query was obfuscated (literal should be replaced) + assert normalized_row['statement'] is not None + assert '12345' not in normalized_row['statement'] + assert normalized_row['query_signature'] is not None + + # Verify metadata was collected + assert 'dd_tables' in normalized_row + assert 'dd_commands' in normalized_row + assert 'dd_comments' in normalized_row + + +def test_normalize_query_log_row(check_with_dbm): + """Test normalization of query log rows""" + statement_samples = check_with_dbm.statement_samples + + # Simulate a row from system.query_log + row = ( + time.time(), # event_time + 'test-query-id-123', # query_id + 'SELECT count(*) FROM system.tables', # query + 'QueryFinish', # type + 'default', # user + 50, # query_duration_ms + 10, # read_rows + 1024, # read_bytes + 0, # written_rows + 0, # written_bytes + 1, # result_rows + 8, # result_bytes + 2048, # memory_usage + None, # exception + ) + + normalized_row = statement_samples._normalize_query_log_row(row) + + # Verify basic fields + assert normalized_row['query_id'] == 'test-query-id-123' + assert normalized_row['user'] == 'default' + assert normalized_row['type'] == 'QueryFinish' + 
assert normalized_row['duration_ms'] == 50 + assert normalized_row['read_rows'] == 10 + assert normalized_row['memory_usage'] == 2048 + + # Verify obfuscation occurred + assert normalized_row['statement'] is not None + assert normalized_row['query_signature'] is not None + + +def test_create_sample_event(check_with_dbm): + """Test creation of sample events for submission""" + statement_samples = check_with_dbm.statement_samples + + normalized_row = { + 'timestamp': time.time(), + 'query_id': 'test-query-id-123', + 'query': 'SELECT * FROM users', + 'statement': 'SELECT * FROM users', + 'query_signature': 'abc123', + 'type': 'QueryFinish', + 'user': 'default', + 'duration_ms': 100, + 'read_rows': 10, + 'read_bytes': 1024, + 'written_rows': 0, + 'written_bytes': 0, + 'result_rows': 10, + 'result_bytes': 1024, + 'memory_usage': 2048, + 'exception': None, + 'dd_tables': ['users'], + 'dd_commands': ['SELECT'], + 'dd_comments': [], + } + + event = statement_samples._create_sample_event(normalized_row) + + # Verify event structure + assert event['ddsource'] == 'clickhouse' + assert event['dbm_type'] == 'sample' + assert 'timestamp' in event + assert 'db' in event + assert event['db']['query_signature'] == 'abc123' + assert event['db']['statement'] == 'SELECT * FROM users' + assert event['db']['user'] == 'default' + + # Verify ClickHouse-specific fields + assert 'clickhouse' in event + assert event['clickhouse']['query_id'] == 'test-query-id-123' + assert event['clickhouse']['duration_ms'] == 100 + assert event['clickhouse']['read_rows'] == 10 + assert event['clickhouse']['memory_usage'] == 2048 + + # Verify duration is in nanoseconds + assert event['duration'] == 100 * 1e6 + + +def test_rate_limiting(check_with_dbm): + """Test that query sample rate limiting works correctly""" + statement_samples = check_with_dbm.statement_samples + + query_signature = 'test-signature-123' + + # First acquisition should succeed + assert statement_samples._seen_samples_ratelimiter.acquire(query_signature) is True + + # Immediate re-acquisition should fail due to rate limiting + assert statement_samples._seen_samples_ratelimiter.acquire(query_signature) is False + + +def test_query_log_query_format(): + """Test that the query log query is properly formatted""" + from datadog_checks.clickhouse.statement_samples import QUERY_LOG_QUERY + + # Verify query contains necessary clauses + assert 'system.query_log' in QUERY_LOG_QUERY + assert 'event_time' in QUERY_LOG_QUERY + assert 'query_id' in QUERY_LOG_QUERY + assert 'query' in QUERY_LOG_QUERY + assert 'query_duration_ms' in QUERY_LOG_QUERY + assert 'WHERE' in QUERY_LOG_QUERY + assert 'LIMIT' in QUERY_LOG_QUERY + + +def test_active_queries_query_format(): + """Test that the active queries query is properly formatted""" + from datadog_checks.clickhouse.statement_samples import ACTIVE_QUERIES_QUERY + + # Verify query contains necessary clauses + assert 'system.processes' in ACTIVE_QUERIES_QUERY + + # Original fields + assert 'query_id' in ACTIVE_QUERIES_QUERY + assert 'query' in ACTIVE_QUERIES_QUERY + assert 'elapsed' in ACTIVE_QUERIES_QUERY + assert 'memory_usage' in ACTIVE_QUERIES_QUERY + assert 'read_rows' in ACTIVE_QUERIES_QUERY + assert 'read_bytes' in ACTIVE_QUERIES_QUERY + assert 'written_rows' in ACTIVE_QUERIES_QUERY + assert 'written_bytes' in ACTIVE_QUERIES_QUERY + + # New enhanced fields + assert 'peak_memory_usage' in ACTIVE_QUERIES_QUERY + assert 'total_rows_approx' in ACTIVE_QUERIES_QUERY + assert 'result_rows' in ACTIVE_QUERIES_QUERY + assert 'result_bytes' 
in ACTIVE_QUERIES_QUERY + assert 'query_start_time' in ACTIVE_QUERIES_QUERY + assert 'query_start_time_microseconds' in ACTIVE_QUERIES_QUERY + assert 'client_name' in ACTIVE_QUERIES_QUERY + assert 'client_version_major' in ACTIVE_QUERIES_QUERY + assert 'client_version_minor' in ACTIVE_QUERIES_QUERY + assert 'client_version_patch' in ACTIVE_QUERIES_QUERY + assert 'current_database' in ACTIVE_QUERIES_QUERY + assert 'thread_ids' in ACTIVE_QUERIES_QUERY + assert 'address' in ACTIVE_QUERIES_QUERY + assert 'port' in ACTIVE_QUERIES_QUERY + assert 'client_hostname' in ACTIVE_QUERIES_QUERY + assert 'is_cancelled' in ACTIVE_QUERIES_QUERY + assert 'http_user_agent' in ACTIVE_QUERIES_QUERY + + +@mock.patch('datadog_checks.clickhouse.statement_samples.datadog_agent') +def test_metrics_reporting(mock_agent, check_with_dbm, aggregator): + """Test that statement samples report metrics correctly""" + statement_samples = check_with_dbm.statement_samples + statement_samples.tags = ['test:clickhouse', 'db:default'] + statement_samples._tags_no_db = ['test:clickhouse'] + + # Mock the agent version + mock_agent.get_version.return_value = '7.50.0' + + # Call the metrics reporting method + start_time = time.time() + statement_samples._report_check_hist_metrics(start_time, 10, "test_method") + + # Verify histograms were submitted + aggregator.assert_metric('dd.clickhouse.test_method.time') + aggregator.assert_metric('dd.clickhouse.test_method.rows') + + +def test_get_debug_tags(check_with_dbm): + """Test that debug tags are properly generated""" + statement_samples = check_with_dbm.statement_samples + debug_tags = statement_samples._get_debug_tags() + + # Verify debug tags contain server information + assert any('server:' in tag for tag in debug_tags) + + +def test_dbtags(check_with_dbm): + """Test that database tags are properly generated""" + statement_samples = check_with_dbm.statement_samples + statement_samples._tags_no_db = ['test:clickhouse', 'server:localhost'] + + db_tags = statement_samples._dbtags('testdb', 'extra:tag') + + # Verify tags include database, extra tags, and base tags + assert 'db:testdb' in db_tags + assert 'extra:tag' in db_tags + assert 'test:clickhouse' in db_tags + assert 'server:localhost' in db_tags + + +def test_normalize_active_query_row_with_all_fields(check_with_dbm): + """Test that all new fields are properly normalized""" + statement_samples = check_with_dbm.statement_samples + + # Create a mock row with all fields (31 fields total) + mock_row = ( + 1.234, # elapsed + 'abc-123-def', # query_id + 'SELECT * FROM users WHERE id = 1', # query + 'default', # user + 1000, # read_rows + 50000, # read_bytes + 0, # written_rows + 0, # written_bytes + 1048576, # memory_usage + 'abc-123-def', # initial_query_id + 'default', # initial_user + 'Select', # query_kind + 1, # is_initial_query + 2097152, # peak_memory_usage (NEW) + 1000000, # total_rows_approx (NEW) + 1000, # result_rows (NEW) + 45000, # result_bytes (NEW) + '2025-11-07 10:05:01', # query_start_time (NEW) + '2025-11-07 10:05:01.123456', # query_start_time_microseconds (NEW) + 'python-clickhouse-driver', # client_name (NEW) + 0, # client_version_major (NEW) + 2, # client_version_minor (NEW) + 4, # client_version_patch (NEW) + 'analytics', # current_database (NEW) + [123, 124, 125], # thread_ids (NEW) + '192.168.1.100', # address (NEW) + 54321, # port (NEW) + 'app-server-01', # client_hostname (NEW) + 0, # is_cancelled (NEW) + 'python-requests/2.28.0', # http_user_agent (NEW) + ) + + normalized_row = 
statement_samples._normalize_active_query_row(mock_row) + + # Verify original fields + assert normalized_row['elapsed'] == 1.234 + assert normalized_row['query_id'] == 'abc-123-def' + assert normalized_row['user'] == 'default' + assert normalized_row['read_rows'] == 1000 + assert normalized_row['read_bytes'] == 50000 + assert normalized_row['memory_usage'] == 1048576 + assert normalized_row['query_kind'] == 'Select' + assert normalized_row['is_initial_query'] is True + + # Verify new memory & performance fields + assert normalized_row['peak_memory_usage'] == 2097152 + assert normalized_row['total_rows_approx'] == 1000000 + assert normalized_row['result_rows'] == 1000 + assert normalized_row['result_bytes'] == 45000 + + # Verify new timing fields + assert normalized_row['query_start_time'] == '2025-11-07 10:05:01' + assert normalized_row['query_start_time_microseconds'] == '2025-11-07 10:05:01.123456' + + # Verify new client fields + assert normalized_row['client_name'] == 'python-clickhouse-driver' + assert normalized_row['client_version_major'] == 0 + assert normalized_row['client_version_minor'] == 2 + assert normalized_row['client_version_patch'] == 4 + assert normalized_row['client_hostname'] == 'app-server-01' + assert normalized_row['address'] == '192.168.1.100' + assert normalized_row['port'] == 54321 + + # Verify new database field + assert normalized_row['current_database'] == 'analytics' + + # Verify new thread fields + assert normalized_row['thread_ids'] == [123, 124, 125] + + # Verify new status fields + assert normalized_row['is_cancelled'] is False + + # Verify new HTTP field + assert normalized_row['http_user_agent'] == 'python-requests/2.28.0' + + # Verify obfuscation happened (statement should be set) + assert 'statement' in normalized_row + assert 'query_signature' in normalized_row