From a76db1f0bc4c1f101807b251bbaf0ef76653bb1b Mon Sep 17 00:00:00 2001 From: tendil Date: Thu, 12 Feb 2026 20:02:53 +0000 Subject: [PATCH 1/3] [ADD] auditlog_clickhouse_write: enable offloaded audit storage Audit tables can grow without bounds on production databases, causing bloat and adding latency to audited operations because logs are written synchronously in the same transaction. Keeping audit data in PostgreSQL also leaves an immutability gap for privileged users. Introduce a dedicated module to buffer audit payloads and export them asynchronously to ClickHouse, allowing audit storage to scale without slowing down business transactions and keeping audit data effectively immutable in a write-only store. Task: 5246 --- auditlog_clickhouse_write/README.rst | 172 +++++ auditlog_clickhouse_write/__init__.py | 1 + auditlog_clickhouse_write/__manifest__.py | 21 + .../data/auditlog_clickhouse_queue.xml | 27 + .../i18n/auditlog_clickhouse_write.pot | 335 +++++++++ auditlog_clickhouse_write/models/__init__.py | 4 + .../models/auditlog_clickhouse_config.py | 480 +++++++++++++ .../models/auditlog_log_buffer.py | 675 ++++++++++++++++++ .../models/auditlog_rule.py | 615 ++++++++++++++++ .../models/clickhouse_client.py | 62 ++ auditlog_clickhouse_write/pyproject.toml | 3 + auditlog_clickhouse_write/readme/CONFIGURE.md | 31 + auditlog_clickhouse_write/readme/CONTEXT.md | 5 + .../readme/CONTRIBUTORS.md | 4 + auditlog_clickhouse_write/readme/CREDITS.md | 3 + .../readme/DESCRIPTION.md | 4 + auditlog_clickhouse_write/readme/USAGE.md | 7 + .../security/ir.model.access.csv | 3 + .../static/description/index.html | 533 ++++++++++++++ auditlog_clickhouse_write/tests/__init__.py | 2 + auditlog_clickhouse_write/tests/common.py | 129 ++++ .../tests/test_auditlog_clickhouse_write.py | 348 +++++++++ .../tests/test_clickhouse_config.py | 67 ++ .../auditlog_clickhouse_config_views.xml | 115 +++ requirements.txt | 1 + 25 files changed, 3647 insertions(+) create mode 100644 
auditlog_clickhouse_write/README.rst create mode 100644 auditlog_clickhouse_write/__init__.py create mode 100644 auditlog_clickhouse_write/__manifest__.py create mode 100644 auditlog_clickhouse_write/data/auditlog_clickhouse_queue.xml create mode 100644 auditlog_clickhouse_write/i18n/auditlog_clickhouse_write.pot create mode 100644 auditlog_clickhouse_write/models/__init__.py create mode 100644 auditlog_clickhouse_write/models/auditlog_clickhouse_config.py create mode 100644 auditlog_clickhouse_write/models/auditlog_log_buffer.py create mode 100644 auditlog_clickhouse_write/models/auditlog_rule.py create mode 100644 auditlog_clickhouse_write/models/clickhouse_client.py create mode 100644 auditlog_clickhouse_write/pyproject.toml create mode 100644 auditlog_clickhouse_write/readme/CONFIGURE.md create mode 100644 auditlog_clickhouse_write/readme/CONTEXT.md create mode 100644 auditlog_clickhouse_write/readme/CONTRIBUTORS.md create mode 100644 auditlog_clickhouse_write/readme/CREDITS.md create mode 100644 auditlog_clickhouse_write/readme/DESCRIPTION.md create mode 100644 auditlog_clickhouse_write/readme/USAGE.md create mode 100644 auditlog_clickhouse_write/security/ir.model.access.csv create mode 100644 auditlog_clickhouse_write/static/description/index.html create mode 100644 auditlog_clickhouse_write/tests/__init__.py create mode 100644 auditlog_clickhouse_write/tests/common.py create mode 100644 auditlog_clickhouse_write/tests/test_auditlog_clickhouse_write.py create mode 100644 auditlog_clickhouse_write/tests/test_clickhouse_config.py create mode 100644 auditlog_clickhouse_write/views/auditlog_clickhouse_config_views.xml diff --git a/auditlog_clickhouse_write/README.rst b/auditlog_clickhouse_write/README.rst new file mode 100644 index 00000000000..99b11e1b252 --- /dev/null +++ b/auditlog_clickhouse_write/README.rst @@ -0,0 +1,172 @@ +============================= +Store Audit Log in Clickhouse +============================= + +.. 
+ !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! This file is generated by oca-gen-addon-readme !! + !! changes will be overwritten. !! + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! source digest: sha256:11aaa38bad24a890554c0d34d74d31e13b933facbba3fea31f4cbf22ae8fd842 + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +.. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png + :target: https://odoo-community.org/page/development-status + :alt: Beta +.. |badge2| image:: https://img.shields.io/badge/licence-AGPL--3-blue.png + :target: http://www.gnu.org/licenses/agpl-3.0-standalone.html + :alt: License: AGPL-3 +.. |badge3| image:: https://img.shields.io/badge/github-OCA%2Fserver--tools-lightgray.png?logo=github + :target: https://github.com/OCA/server-tools/tree/18.0/auditlog_clickhouse_write + :alt: OCA/server-tools +.. |badge4| image:: https://img.shields.io/badge/weblate-Translate%20me-F47D42.png + :target: https://translation.odoo-community.org/projects/server-tools-18-0/server-tools-18-0-auditlog_clickhouse_write + :alt: Translate me on Weblate +.. |badge5| image:: https://img.shields.io/badge/runboat-Try%20me-875A7B.png + :target: https://runboat.odoo-community.org/builds?repo=OCA/server-tools&target_branch=18.0 + :alt: Try me on Runboat + +|badge1| |badge2| |badge3| |badge4| |badge5| + +This module implements buffered asynchronous transfers audit of logs +from PostgreSQL to ClickHouse. Storing audit data in a columnar database +that is write-only prevents database bloat, makes audit records +effectively immutable, and allows for scaling to very large volumes of +logs without slowing down normal transactions. Audit logs are written +asynchronously to reduce the load on business operations. + +**Table of contents** + +.. contents:: + :local: + +Use Cases / Context +=================== + +The auditlog module stores audit data in PostgreSQL. 
In production +systems with extensive audit rules, these tables grow without limits, +causing three issues: + +- Database bloat; +- Immutability gap: Members of group_auditlog_manager (implied by + base.group_system) have full CRUD access to audit tables, allowing + audit records to be altered or deleted via UI, ORM, or SQL; +- Performance overhead: Audit logging runs synchronously in the same + transaction and performs multiple ORM create() calls, adding latency + to audited operations. + +Configuration +============= + +This module requires: + +- A reachable ClickHouse server. +- Python dependency ``clickhouse-driver`` available in the Odoo + environment. +- A ClickHouse database created in advance (the module does **not** + create databases/users/grants). +- A ClickHouse user with at least: + + - ``INSERT`` and ``CREATE TABLE`` privileges on the target database. + +.. + + ClickHouse installation (Docker guide): + ``https://clickhouse.com/docs/install/docker`` + +Steps: + +- Make sure ``clickhouse-driver`` is available in your system. +- Install the module. +- Configure the connection parameters in Odoo: + + - **Settings > Technical > Auditlog > Clickhouse configuration** + - Fill in the following parameters: + ++---------------------------------------+ +| Field | ++=======================================+ +| Hostname or IP | ++---------------------------------------+ +| TCP port | ++---------------------------------------+ +| ClickHouse database name | ++---------------------------------------+ +| ClickHouse user | ++---------------------------------------+ +| ClickHouse Password | ++---------------------------------------+ +| queue_job_batch_size (default = 1000) | ++---------------------------------------+ +| channel_id (default root) | ++---------------------------------------+ + +- Click **Test connection**. +- Optionally, click **Create Auditlog Tables** to create the tables in + the target database. 
+ +Usage +===== + +Once auditlog_clickhouse_write is installed and configured: + +- Users perform tracked operations (create, write, unlink, read, export) + on models with active auditlog.rule subscriptions. This behavior is + unchanged from the base auditlog module. +- Log data is serialized and stored in the local auditlog.log.buffer + table instantly. The standard auditlog tables are not populated. +- Every 5 minutes (default), the Cron job runs, pushes data to + ClickHouse, and cleans the local buffer. +- Data is permanently stored in ClickHouse and cannot be modified or + deleted via Odoo. + +Bug Tracker +=========== + +Bugs are tracked on `GitHub Issues `_. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +`feedback `_. + +Do not contact contributors directly about support or help with technical issues. + +Credits +======= + +Authors +------- + +* Cetmix + +Contributors +------------ + +- `Cetmix `__ + + - Ivan Sokolov + - George Smirnov + - Dmitry Meita + +Other credits +------------- + +The development of this module has been financially supported by: + +- Geschäftsstelle Sozialinfo + +Maintainers +----------- + +This module is maintained by the OCA. + +.. image:: https://odoo-community.org/logo.png + :alt: Odoo Community Association + :target: https://odoo-community.org + +OCA, or the Odoo Community Association, is a nonprofit organization whose +mission is to support the collaborative development of Odoo features and +promote its widespread use. + +This module is part of the `OCA/server-tools `_ project on GitHub. + +You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute. diff --git a/auditlog_clickhouse_write/__init__.py b/auditlog_clickhouse_write/__init__.py new file mode 100644 index 00000000000..0650744f6bc --- /dev/null +++ b/auditlog_clickhouse_write/__init__.py @@ -0,0 +1 @@ +from . 
import models diff --git a/auditlog_clickhouse_write/__manifest__.py b/auditlog_clickhouse_write/__manifest__.py new file mode 100644 index 00000000000..05e78aff82b --- /dev/null +++ b/auditlog_clickhouse_write/__manifest__.py @@ -0,0 +1,21 @@ +{ + "name": "Store Audit Log in Clickhouse", + "version": "18.0.1.0.0", + "summary": "Asynchronous audit log storage in ClickHouse", + "category": "Tools", + "license": "AGPL-3", + "author": "Odoo Community Association (OCA), Cetmix", + "website": "https://github.com/OCA/server-tools", + "depends": [ + "auditlog", + "queue_job", + ], + "external_dependencies": { + "python": ["clickhouse-driver"], + }, + "data": [ + "security/ir.model.access.csv", + "data/auditlog_clickhouse_queue.xml", + "views/auditlog_clickhouse_config_views.xml", + ], +} diff --git a/auditlog_clickhouse_write/data/auditlog_clickhouse_queue.xml b/auditlog_clickhouse_write/data/auditlog_clickhouse_queue.xml new file mode 100644 index 00000000000..6d5370020f7 --- /dev/null +++ b/auditlog_clickhouse_write/data/auditlog_clickhouse_queue.xml @@ -0,0 +1,27 @@ + + + Auditlog ClickHouse Write: enqueue buffer flush + + code + model._cron_flush_to_clickhouse() + 5 + minutes + True + + + + + Edit buffer flush schedule + ir.cron + + ir.actions.act_window + form + new + + diff --git a/auditlog_clickhouse_write/i18n/auditlog_clickhouse_write.pot b/auditlog_clickhouse_write/i18n/auditlog_clickhouse_write.pot new file mode 100644 index 00000000000..2079fb9ce46 --- /dev/null +++ b/auditlog_clickhouse_write/i18n/auditlog_clickhouse_write.pot @@ -0,0 +1,335 @@ +# Translation of Odoo Server. 
+# This file contains the translation of the following modules: +# * auditlog_clickhouse_write +# +msgid "" +msgstr "" +"Project-Id-Version: Odoo Server 18.0\n" +"Report-Msgid-Bugs-To: \n" +"POT-Creation-Date: 2026-03-05 18:40+0000\n" +"PO-Revision-Date: 2026-03-05 18:40+0000\n" +"Last-Translator: \n" +"Language-Team: \n" +"MIME-Version: 1.0\n" +"Content-Type: text/plain; charset=UTF-8\n" +"Content-Transfer-Encoding: \n" +"Plural-Forms: \n" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "" +"%s\n" +"\n" +"If you save this configuration as active, the currently active one will be deactivated:\n" +"- %s" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "" +"As soon as this connection to ClickHouse is activated, all log\n" +" entries from that moment will be stored in the configured\n" +" ClickHouse database. Only one connection can be active at a\n" +" time." +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "" +"As soon as this connection to ClickHouse is activated, all log entries from that moment will be stored in the configured ClickHouse database.\n" +"\n" +" Only one connection can be active at a time." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__attempt_count +msgid "Attempt Count" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model,name:auditlog_clickhouse_write.model_auditlog_rule +msgid "Auditlog - Rule" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model,name:auditlog_clickhouse_write.model_auditlog_log_buffer +msgid "Auditlog ClickHouse Buffer" +msgstr "" + +#. 
module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Auditlog ClickHouse Configuration" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model,name:auditlog_clickhouse_write.model_auditlog_clickhouse_config +msgid "Auditlog ClickHouse Write Configuration" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.actions.server,name:auditlog_clickhouse_write.ir_cron_auditlog_clickhouse_write_enqueue_flush_ir_actions_server +msgid "Auditlog ClickHouse Write: enqueue buffer flush" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "Auditlog tables were created (if they did not exist)." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__queue_batch_size +msgid "Batch size" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__queue_channel_id +msgid "Channel" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.actions.act_window,name:auditlog_clickhouse_write.action_auditlog_clickhouse_config +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_list +msgid "ClickHouse Configuration" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "ClickHouse activation" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.ui.menu,name:auditlog_clickhouse_write.menu_auditlog_clickhouse_config +msgid "ClickHouse configuration" +msgstr "" + +#. module: auditlog_clickhouse_write +#. 
odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "ClickHouse connection failed: %s" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__port +msgid "" +"ClickHouse native TCP port used by clickhouse-driver (default is 9000)." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__host +msgid "" +"ClickHouse server hostname or IP address. Must be reachable from the Odoo " +"server." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__user +msgid "" +"ClickHouse user name used for INSERT operations into auditlog tables. " +"Recommended: a dedicated user with INSERT-only privileges." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Configure action" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.actions.act_window,name:auditlog_clickhouse_write.action_configure_auditlog_clickhouse_write_flush_cron +msgid "Configure flush action" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Connection" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "Connection to ClickHouse is OK." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Create Auditlog Tables" +msgstr "" + +#. 
module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__create_uid +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__create_uid +msgid "Created by" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__create_date +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__create_date +msgid "Created on" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__database +msgid "Database name" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__display_name +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__display_name +msgid "Display Name" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.actions.act_window,name:auditlog_clickhouse_write.action_configure_auditlog_clickhouse_write_cron +msgid "Edit buffer flush schedule" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_log_buffer.py:0 +msgid "Error" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__error_message +msgid "Error Message" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Export Queue" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "Failed to create ClickHouse tables: %s" +msgstr "" + +#. 
module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_log_buffer.py:0 +msgid "Flushed to ClickHouse but failed to delete buffer rows: %s" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__host +msgid "Hostname or IP" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__id +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__id +msgid "ID" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__is_active +msgid "" +"If checked audit logs will be buffered locally and exported to ClickHouse. " +"Only one configuration can be active at a time." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__is_active +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_list +msgid "Is Active" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__write_uid +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__write_uid +msgid "Last Updated by" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__write_date +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__write_date +msgid "Last Updated on" +msgstr "" + +#. 
module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "" +"Logs are buffered in PostgreSQL and periodically flushed to\n" +" ClickHouse by cron." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__queue_batch_size +msgid "Maximum number of buffer rows processed per queue job run." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Notes" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__password +msgid "Password" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__password +msgid "Password for the ClickHouse user." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__payload_json +msgid "Payload Json" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_log_buffer.py:0 +msgid "Pending" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/clickhouse_client.py:0 +msgid "" +"Python package 'clickhouse-driver' is not available. Install it in the Odoo " +"environment to use ClickHouse storage." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__state +msgid "State" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "Success" +msgstr "" + +#. 
module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__port +msgid "TCP Port" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__database +msgid "" +"Target ClickHouse database where auditlog tables exist (or will be created " +"by the setup button)." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Test connection" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__user +msgid "User" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__queue_channel_id +msgid "queue_job channel used for export jobs." +msgstr "" diff --git a/auditlog_clickhouse_write/models/__init__.py b/auditlog_clickhouse_write/models/__init__.py new file mode 100644 index 00000000000..9791cf29bce --- /dev/null +++ b/auditlog_clickhouse_write/models/__init__.py @@ -0,0 +1,4 @@ +from . import auditlog_clickhouse_config +from . import clickhouse_client +from . import auditlog_log_buffer +from . 
import auditlog_rule

# --- models/auditlog_clickhouse_config.py ---

import logging
import re

from odoo import api, fields, models
from odoo.exceptions import UserError, ValidationError

from .clickhouse_client import get_clickhouse_client

_logger = logging.getLogger(__name__)


class AuditlogClickhouseConfig(models.Model):
    """
    ClickHouse connection configuration for auditlog_clickhouse_write.

    Business rules:
    - Only one configuration can be active at a time.
    - UI provides tools to test the connection and (optionally) create tables.

    Notes:
    - As soon as a configuration becomes active, audit log entries will be stored
      in the configured ClickHouse database from that moment.
    """

    _name = "auditlog.clickhouse.config"
    _description = "Auditlog ClickHouse Write Configuration"
    _rec_name = "display_name"

    # Connection defaults; 9000 is the clickhouse-driver native TCP port.
    DEFAULT_PORT = 9000
    DEFAULT_DB = "odoo_audit"
    # Identifier whitelist used by _check_database to keep DDL/DML injection-safe.
    DB_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
    DEFAULT_USER = "odoo_audit_writer"
    DEFAULT_QUEUE_BATCH_SIZE = 1000

    is_active = fields.Boolean(
        help=(
            "If checked audit logs will be buffered locally and exported to ClickHouse."
            " Only one configuration can be active at a time."
        ),
    )
    host = fields.Char(
        string="Hostname or IP",
        required=True,
        help=(
            "ClickHouse server hostname or IP address. "
            "Must be reachable from the Odoo server."
        ),
    )
    port = fields.Integer(
        string="TCP Port",
        required=True,
        default=DEFAULT_PORT,
        help=(
            "ClickHouse native TCP port used by clickhouse-driver " "(default is 9000)."
        ),
    )
    database = fields.Char(
        string="Database name",
        required=True,
        default=DEFAULT_DB,
        help=(
            "Target ClickHouse database where auditlog tables exist "
            "(or will be created by the setup button)."
        ),
    )
    user = fields.Char(
        required=True,
        default=DEFAULT_USER,
        help=(
            "ClickHouse user name used for INSERT operations into auditlog tables. "
            "Recommended: a dedicated user with INSERT-only privileges."
        ),
    )
    password = fields.Char(
        help="Password for the ClickHouse user.",
    )

    queue_batch_size = fields.Integer(
        string="Batch size",
        default=DEFAULT_QUEUE_BATCH_SIZE,
        required=True,
        help="Maximum number of buffer rows processed per queue job run.",
    )

    def _default_queue_channel(self):
        """Return default queue_job channel (root).

        :return: Root queue.job.channel record or empty recordset
        :rtype: odoo.models.BaseModel
        """
        Channel = self.env["queue.job.channel"].sudo()
        channel = Channel.search([("complete_name", "=", "root")], limit=1)
        return channel

    queue_channel_id = fields.Many2one(
        comodel_name="queue.job.channel",
        string="Channel",
        required=True,
        default=lambda self: self._default_queue_channel(),
        ondelete="restrict",
        help="queue_job channel used for export jobs.",
    )

    @api.depends("host", "port", "database", "user", "is_active")
    def _compute_display_name(self):
        """Compute human-readable configuration name.

        Format:
            host:port/database (user) [active]
        """
        for rec in self:
            base = (
                f"{rec.host or ''}:{rec.port or ''}/"
                f"{rec.database or ''} ({rec.user or ''})"
            )
            rec.display_name = f"{base} [active]" if rec.is_active else base

    @api.model
    def get_active_config(self):
        """Return the currently active ClickHouse configuration.

        :return: Active configuration record or None
        :rtype: Optional[AuditlogClickhouseConfig]
        """
        config = self.search([("is_active", "=", True)], limit=1)
        _logger.debug(
            "auditlog_clickhouse_write: get_active_config -> %s",
            config.id if config else None,
        )
        return config

    def _deactivate_other_configs(self):
        """Deactivate all other active configurations.

        Ensures single-active configuration rule.
        """
        other_configs = self.search(
            [("is_active", "=", True), ("id", "not in", self.ids)]
        )
        if other_configs:
            _logger.info(
                "auditlog_clickhouse_write: deactivating "
                "other configs %s (activated=%s)",
                other_configs.ids,
                self.ids,
            )
            other_configs.write({"is_active": False})

    @api.onchange("is_active")
    def _onchange_is_active(self):
        """Display warning when activating configuration.

        Warns user that:
        - Logs will start exporting immediately.
        - Another active config will be deactivated.

        :return: Warning action dictionary or None
        :rtype: Optional[Dict[str, Any]]
        """
        for rec in self:
            # Only warn on a genuine pending activation (not already active).
            if not rec.is_active or (rec._origin and rec._origin.is_active):
                continue

            disclaimer = rec.env._(
                "As soon as this connection to ClickHouse is activated, all log entries"
                " from that moment will be stored in the configured ClickHouse"
                " database.\n\n Only one connection can be active at a time."
            )

            domain = [("is_active", "=", True)]
            if rec.id:
                domain.append(("id", "!=", rec.id))

            other = rec.env["auditlog.clickhouse.config"].sudo().search(domain, limit=1)
            if other:
                message = rec.env._(
                    "%s\n\nIf you save this configuration as active, "
                    "the currently active one will be deactivated:\n- %s"
                ) % (disclaimer, other.display_name)
                return {
                    "warning": {
                        "title": rec.env._("ClickHouse activation"),
                        "message": message,
                    }
                }

            return {
                "warning": {
                    "title": rec.env._("ClickHouse activation"),
                    "message": disclaimer,
                }
            }

    @api.model_create_multi
    def create(self, vals_list):
        """Create configuration records.

        Enforces single active configuration rule after creation.

        :param vals_list: List of record values
        :type vals_list: List[Dict[str, Any]]

        :return: Created records
        :rtype: AuditlogClickhouseConfig
        """
        records = super().create(vals_list)
        active_records = records.filtered("is_active")
        if active_records:
            _logger.info(
                "auditlog_clickhouse_write: created active config(s) %s",
                active_records.ids,
            )
            active_records._deactivate_other_configs()
        else:
            _logger.debug(
                "auditlog_clickhouse_write: created config(s) %s", records.ids
            )
        return records

    def write(self, vals):
        """Update configuration record(s).

        If activation flag is enabled, ensures other configs are deactivated.

        :param vals: Field values to update
        :type vals: Dict[str, Any]

        :return: True if write succeeds
        :rtype: bool
        """
        turning_on = vals.get("is_active") is True
        result = super().write(vals)

        if turning_on:
            activated = self.filtered("is_active")
            _logger.info(
                "auditlog_clickhouse_write: activated config(s) %s (via write)",
                activated.ids,
            )
            activated._deactivate_other_configs()
        else:
            _logger.debug(
                "auditlog_clickhouse_write: updated config(s) %s (vals=%s)",
                self.ids,
                sorted(vals.keys()),
            )

        return result

    @api.constrains("database")
    def _check_database(self):
        """
        Validate CH database identifier to prevent SQL injection/malformed queries.

        :raises ValidationError: If the DB name does not match the allowed pattern.
        """
        for rec in self:
            db = rec.database or ""
            if not self.DB_NAME_RE.match(db):
                raise ValidationError(
                    rec.env._(
                        "Invalid database name. Allowed characters: "
                        "letters, digits, underscore. "
                        "Must start with a letter or underscore."
                    )
                )

    def action_test_connection(self):
        """Test ClickHouse connectivity using simple SELECT query.

        :raises UserError: If connection fails

        :return: Odoo notification action
        :rtype: Dict[str, Any]
        """
        self.ensure_one()
        _logger.info(
            "auditlog_clickhouse_write: testing connection "
            "(config=%s host=%s port=%s db=%s user=%s)",
            self.id,
            self.host,
            self.port,
            self.database,
            self.user,
        )

        client = self._get_client()
        try:
            client.execute("SELECT 1")
        except Exception as exc:
            _logger.exception(
                "auditlog_clickhouse_write: connection test FAILED "
                "(config=%s host=%s port=%s db=%s user=%s)",
                self.id,
                self.host,
                self.port,
                self.database,
                self.user,
            )
            raise UserError(
                self.env._("ClickHouse connection failed: %s") % exc
            ) from exc

        _logger.info(
            "auditlog_clickhouse_write: connection test OK "
            "(config=%s host=%s port=%s db=%s user=%s)",
            self.id,
            self.host,
            self.port,
            self.database,
            self.user,
        )

        return self._notify(
            title=self.env._("Success"),
            message=self.env._("Connection to ClickHouse is OK."),
            notif_type="success",
        )

    def action_create_auditlog_tables(self):
        """Create required ClickHouse tables if not present.

        Executes predefined DDL statements.

        :raises UserError: If DDL execution fails

        :return: Odoo notification action
        :rtype: Dict[str, Any]
        """
        self.ensure_one()
        _logger.info(
            "auditlog_clickhouse_write: creating tables (config=%s db=%s host=%s:%s)",
            self.id,
            self.database,
            self.host,
            self.port,
        )

        client = self._get_client()
        try:
            for statement in self._get_clickhouse_ddl():
                # Log a single-line, truncated preview of each DDL statement.
                preview = " ".join(statement.strip().splitlines())[:120]
                _logger.debug(
                    "auditlog_clickhouse_write: executing DDL (config=%s): %s...",
                    self.id,
                    preview,
                )
                client.execute(statement)
        except Exception as exc:
            _logger.exception(
                "auditlog_clickhouse_write: create tables FAILED "
                "(config=%s db=%s host=%s:%s)",
                self.id,
                self.database,
                self.host,
                self.port,
            )
            raise UserError(
                self.env._("Failed to create ClickHouse tables: %s") % exc
            ) from exc

        _logger.info(
            "auditlog_clickhouse_write: create tables OK (config=%s db=%s)",
            self.id,
            self.database,
        )

        return self._notify(
            title=self.env._("Success"),
            message=self.env._("Auditlog tables were created (if they did not exist)."),
            notif_type="success",
        )

    def _get_client(self):
        """Build clickhouse-driver client from configuration.

        :return: Configured ClickHouse client instance
        :rtype: clickhouse_driver.Client
        """
        self.ensure_one()
        _logger.debug(
            "auditlog_clickhouse_write: building client "
            "(config=%s host=%s port=%s db=%s user=%s)",
            self.id,
            self.host,
            self.port,
            self.database,
            self.user,
        )
        return get_clickhouse_client(
            host=self.host,
            port=self.port,
            database=self.database,
            user=self.user,
            password=self.password,
        )

    def _get_clickhouse_ddl(self):
        """Return ClickHouse DDL statements.

        Includes:
        - auditlog_log
        - auditlog_log_line

        Safe to run repeatedly (CREATE TABLE IF NOT EXISTS). The database name
        is interpolated directly, which is safe because _check_database
        restricts it to a strict identifier pattern.

        :return: List of DDL SQL statements
        :rtype: List[str]
        """
        self.ensure_one()
        db_name = self.database

        return [
            f"""
            CREATE TABLE IF NOT EXISTS {db_name}.auditlog_log
            (
                id Int64,
                name Nullable(String),
                model_id Int32,
                model_name Nullable(String),
                model_model String,
                res_id Nullable(Int64),
                res_ids Nullable(String),
                user_id Int32,
                method String,
                http_request_id Nullable(Int64),
                http_session_id Nullable(Int64),
                log_type Nullable(String),
                create_date DateTime64(3, 'UTC'),
                create_uid Int32,
                write_date Nullable(DateTime64(3, 'UTC')),
                write_uid Nullable(Int32)
            )
            ENGINE = MergeTree
            ORDER BY (create_date, id)
            """,
            f"""
            CREATE TABLE IF NOT EXISTS {db_name}.auditlog_log_line
            (
                id Int64,
                log_id Int64,
                field_id Int32,
                field_name Nullable(String),
                field_description Nullable(String),
                old_value Nullable(String),
                new_value Nullable(String),
                old_value_text Nullable(String),
                new_value_text Nullable(String),
                create_date DateTime64(3, 'UTC'),
                create_uid Int32,
                write_date Nullable(DateTime64(3, 'UTC')),
                write_uid Nullable(Int32)
            )
            ENGINE = MergeTree
            ORDER BY (create_date, id)
            """,
        ]

    @staticmethod
    def _notify(*, title, message, notif_type="info"):
        """Build standard Odoo display_notification action.

        :param title: Notification title
        :type title: str
        :param message: Notification message
        :type message: str
        :param notif_type: Notification type, defaults to "info"
        :type notif_type: str, optional

        :return: Odoo client action dictionary
        :rtype: Dict[str, Any]
        """
        return {
            "type": "ir.actions.client",
            "tag": "display_notification",
            "params": {
                "title": title,
                "message": message,
                "type": notif_type,
                "sticky": False,
            },
        }


# --- models/auditlog_log_buffer.py ---

import json
import logging
from datetime import datetime, timezone
from typing import Any

from dateutil import parser as dt_parser

from odoo import api, fields, models
from odoo.tools import SQL

from odoo.addons.queue_job.exception import RetryableJobError

_logger = logging.getLogger(__name__)

# Type aliases for payload dicts and prepared ClickHouse row tuples.
JsonMapping = dict[str, Any]
ChRow = tuple[Any, ...]


class AuditlogLogBuffer(models.Model):
    """
    Buffered audit log payloads waiting to be flushed into ClickHouse.

    Each record stores a pre-built payload produced by the auditlog.rule override.
    Export is asynchronous:

    - A cron enqueues a queue_job.
    - The queue_job locks pending buffer rows (FOR UPDATE SKIP LOCKED),
      converts payloads to ClickHouse tuples and inserts them in batches.
    - Successfully flushed buffer rows are removed from PostgreSQL.

    Design notes:
    - This model is an internal queue; no user-facing ACLs should be provided.
    - queue_job provides retries/backoff when ClickHouse is slow/unavailable.
    """

    _name = "auditlog.log.buffer"
    _description = "Auditlog ClickHouse Buffer"
    _order = "create_date asc, id asc"

    STATE_PENDING = "pending"
    STATE_ERROR = "error"

    # Column order MUST match CREATE TABLE schema and inserted tuples.
    _CH_LOG_COLUMNS: tuple[str, ...] = (
        "id",
        "name",
        "model_id",
        "model_name",
        "model_model",
        "res_id",
        "res_ids",
        "user_id",
        "method",
        "http_request_id",
        "http_session_id",
        "log_type",
        "create_date",
        "create_uid",
        "write_date",
        "write_uid",
    )
    _CH_LINE_COLUMNS: tuple[str, ...] = (
        "id",
        "log_id",
        "field_id",
        "field_name",
        "field_description",
        "old_value",
        "new_value",
        "old_value_text",
        "new_value_text",
        "create_date",
        "create_uid",
        "write_date",
        "write_uid",
    )

    _INVALID_PAYLOAD_MESSAGE = (
        "Invalid payload structure (expected object with 'log' and 'lines')."
    )

    @api.model
    def _selection_state(self):
        """Return centralized state selection values.

        :return: List of (value, label) tuples
        :rtype: List[Tuple[str, str]]
        """
        return [
            (self.STATE_PENDING, self.env._("Pending")),
            (self.STATE_ERROR, self.env._("Error")),
        ]

    payload_json = fields.Json(required=True)
    state = fields.Selection(
        selection=lambda self: self._selection_state(),
        default=lambda self: self.STATE_PENDING,
        required=True,
        index=True,
    )
    attempt_count = fields.Integer(default=0, required=True)
    error_message = fields.Text()

    @staticmethod
    def _to_ch_nullable_string(value):
        """Convert value to ClickHouse Nullable(String).

        Rules:
        - None/False -> None
        - str -> unchanged
        - list/dict/tuple -> JSON string
        - other -> str(value)

        :param value: Any value
        :type value: Any
        :return: Nullable string value
        :rtype: Optional[str]
        """
        if value is None or value is False:
            return None
        if isinstance(value, str):
            return value
        if isinstance(value, (dict | list | tuple)):
            return json.dumps(value, ensure_ascii=False, default=str)
        return str(value)

    @staticmethod
    def _to_ch_datetime_utc(value):
        """Convert value to timezone-aware UTC datetime.

        Normalizes to UTC for ClickHouse DateTime64(3, 'UTC').

        :param value: Incoming datetime or string
        :type value: Any
        :return: UTC-aware datetime or None
        :rtype: Optional[datetime]
        """
        if not value:
            return None

        if isinstance(value, datetime):
            parsed = value
        else:
            # Accept trailing 'Z' as UTC for ISO-8601 strings.
            raw = str(value).strip().replace("Z", "+00:00")
            try:
                parsed = dt_parser.parse(raw)
            except (ValueError, TypeError, OverflowError):
                # Fallback: Odoo parser usually returns naive datetime.
                parsed = fields.Datetime.from_string(value)

        # Naive datetimes are assumed to already be UTC (Odoo convention).
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    @staticmethod
    def _ch_format_in_list(values):
        """Format a Python iterable as a ClickHouse IN (...) list.

        Used to build a safe-ish `IN (...)` fragment for identifiers when
        parametrization cannot be applied (e.g. identifiers in ClickHouse SQL).

        Rules:
        - None values are removed.
        - If all values are ints -> rendered without quotes.
        - Otherwise values are rendered as single-quoted strings with minimal
          escaping for backslashes and single quotes.

        :param values: Iterable of values (ints/strings) to include in IN list.
        :type values: Iterable[Any]
        :return: Comma-separated list content (without surrounding parentheses).
        :rtype: str
        """
        cleaned = [v for v in values if v is not None]
        if not cleaned:
            return ""
        # ints -> no quotes
        if all(isinstance(v, int) for v in cleaned):
            return ", ".join(str(v) for v in cleaned)

        # everything else -> single-quoted
        def _q(v):
            s = str(v).replace("\\", "\\\\").replace("'", "\\'")
            return f"'{s}'"

        return ", ".join(_q(v) for v in cleaned)

    def _filter_existing_rows(self, client, config, table_name, rows):
        """Filter out rows already present in ClickHouse by id.

        This helper makes inserts idempotent on retry: if part of a multi-table
        insert succeeds (e.g. auditlog_log) and another part fails (e.g. lines),
        the next retry must not re-insert already stored rows.

        :param client: ClickHouse client.
        :type client: clickhouse_driver.Client
        :param config: Active configuration (provides database name).
        :type config: AuditlogClickhouseConfig
        :param table_name: ClickHouse table name (without database prefix).
        :type table_name: str
        :param rows: Prepared ClickHouse rows.
        :type rows: List[ChRow]
        :return: Rows that are not yet present in ClickHouse.
        :rtype: List[ChRow]
        """
        if not rows:
            return rows

        # row[0] is the id column by _CH_*_COLUMNS convention.
        ids = [row[0] for row in rows]
        in_list = self._ch_format_in_list(ids)
        if not in_list:
            return rows

        query = (
            f"SELECT id FROM {config.database}.{table_name} " f"WHERE id IN ({in_list})"
        )
        existing = client.execute(query) or []
        existing_ids = {row[0] for row in existing}
        if not existing_ids:
            return rows
        return [row for row in rows if row[0] not in existing_ids]

    def _set_error(self, message):
        """Mark buffer records as error and increment attempt counter.

        :param message: Error message
        :type message: str
        """
        for rec in self:
            rec.write(
                {
                    "state": self.STATE_ERROR,
                    "attempt_count": rec.attempt_count + 1,
                    "error_message": message,
                }
            )

    @api.model
    def _lock_pending_buffers(self, batch_size):
        """Fetch and lock pending buffers using FOR UPDATE SKIP LOCKED.

        Prevents concurrent workers from selecting the same rows.

        :param batch_size: Maximum number of rows to fetch
        :type batch_size: int
        :return: Locked buffer recordset
        :rtype: AuditlogLogBuffer
        """
        query = SQL(
            """
            SELECT id
            FROM %s
            WHERE state = %s
            ORDER BY id
            FOR UPDATE SKIP LOCKED
            LIMIT %s
            """,
            SQL.identifier(self._table),
            self.STATE_PENDING,
            batch_size,
        )
        self.env.cr.execute(query)
        ids = [row[0] for row in self.env.cr.fetchall()]
        return self.browse(ids)

    @api.model
    def _cron_flush_to_clickhouse(self, batch_size=None):
        """Schedule queue_job to flush buffered rows.

        Does not perform ClickHouse INSERT directly.

        :param batch_size: Optional override batch size
        :type batch_size: Optional[int]
        :return: True (cron compatibility)
        :rtype: bool
        """
        config = self.env["auditlog.clickhouse.config"].sudo().get_active_config()
        if not config:
            _logger.warning(
                "auditlog_clickhouse_write: cron flush " "skipped (no active config)"
            )
            return True

        # Hard fallback to 1000 if both override and config batch size are falsy.
        effective_batch = int(batch_size or config.queue_batch_size or 0) or 1000

        if not self.sudo().search([("state", "=", self.STATE_PENDING)], limit=1):
            _logger.debug(
                "auditlog_clickhouse_write: cron flush skipped (no pending buffers)"
            )
            return True

        channel_name = (
            config.queue_channel_id.complete_name
            if config.queue_channel_id
            and getattr(config.queue_channel_id, "complete_name", None)
            else "root"
        )

        _logger.info(
            "auditlog_clickhouse_write: enqueue flush job "
            "(config=%s channel=%s batch_size=%s)",
            config.id,
            channel_name,
            effective_batch,
        )

        self.sudo().with_delay(
            channel=channel_name,
            description=f"auditlog_clickhouse_write: "
            f"flush buffers (config={config.id})",
        )._job_flush_to_clickhouse(config.id, effective_batch)

        return True

    @api.model
    def _get_active_config_for_job(self, config_id):
        """Return active config for job execution.

        :param config_id: Configuration record ID
        :type config_id: int
        :return: Active config or None
        :rtype: Optional[AuditlogClickhouseConfig]
        """
        config = self.env["auditlog.clickhouse.config"].sudo().browse(config_id)
        if not config or not config.exists() or not config.is_active:
            _logger.info(
                "auditlog_clickhouse_write: job skipped "
                "(config missing or not active) (config_id=%s)",
                config_id,
            )
            return None
        return config

    @classmethod
    def _payload_is_valid(cls, payload):
        """Validate payload structure before processing.

        Ensures minimal required fields exist to avoid endless retry loops.

        :param payload: Parsed JSON payload
        :type payload: Any
        :return: True if valid
        :rtype: bool
        """
        if not isinstance(payload, dict):
            return False

        log_data = payload.get("log")
        lines_data = payload.get("lines")

        if not isinstance(log_data, dict) or not isinstance(lines_data, list):
            return False

        # Minimal required log fields (to avoid CH insert failures forever)
        required = (
            "id",
            "model_id",
            "model_model",
            "user_id",
            "method",
            "create_date",
            "create_uid",
        )
        for key in required:
            # NOTE(review): falsy check — a legitimate 0 value would be
            # rejected here; presumably ids/uids are always positive.
            if not log_data.get(key):
                return False

        # Lines must be a list of dicts (if any line is broken -> whole payload invalid)
        return all(isinstance(line, dict) for line in lines_data)

    def _collect_rows_from_buffers(self, buffers):
        """Extract ClickHouse rows from buffer payloads.

        :param buffers: Buffer recordset
        :type buffers: AuditlogLogBuffer
        :return: (valid_buffers, invalid_buffers, log_rows, line_rows)
        :rtype: Tuple[AuditlogLogBuffer, AuditlogLogBuffer, List[ChRow], List[ChRow]]
        """
        log_rows: list[ChRow] = []
        line_rows: list[ChRow] = []
        invalid_buffers = self.browse()

        for rec in buffers:
            payload = rec.payload_json

            if not self._payload_is_valid(payload):
                invalid_buffers |= rec
                continue

            log_data = payload["log"]
            lines_data = payload["lines"]

            log_rows.append(self._build_ch_log_row(log_data))
            for line_data in lines_data:
                line_rows.append(self._build_ch_line_row(line_data))

        valid_buffers = buffers - invalid_buffers
        return valid_buffers, invalid_buffers, log_rows, line_rows

    def _mark_invalid_buffers(self, invalid_buffers, config):
        """Mark invalid payload buffers as error.

        :param invalid_buffers: Buffers to mark
        :type invalid_buffers: AuditlogLogBuffer
        :param config: Active configuration
        :type config: AuditlogClickhouseConfig
        """
        if not invalid_buffers:
            return
        invalid_buffers._set_error(self.env._(self._INVALID_PAYLOAD_MESSAGE))
        _logger.warning(
            "auditlog_clickhouse_write: invalid payloads=%s (marked error) (config=%s)",
            len(invalid_buffers),
            config.id,
        )

    def _insert_rows_to_clickhouse(
        self, client, config, log_rows, line_rows, valid_buffers
    ):
        """Insert prepared rows into ClickHouse.

        Raises RetryableJobError on failure.

        :param client: ClickHouse client
        :type client: clickhouse_driver.Client
        :param config: Active configuration
        :type config: AuditlogClickhouseConfig
        :param log_rows: Rows for auditlog_log
        :type log_rows: List[ChRow]
        :param line_rows: Rows for auditlog_log_line
        :type line_rows: List[ChRow]
        :param valid_buffers: Successfully processed buffers
        :type valid_buffers: AuditlogLogBuffer
        """
        log_rows_to_insert = log_rows
        line_rows_to_insert = line_rows
        # Dedup only when both tables receive rows: partial failure (logs
        # inserted, lines failed) is the only retry path that can re-insert.
        # NOTE(review): a manual requeue of error-state buffers with no lines
        # would bypass this filter — confirm that flow cannot happen.
        if log_rows and line_rows:
            log_rows_to_insert = self._filter_existing_rows(
                client, config, "auditlog_log", log_rows
            )
            line_rows_to_insert = self._filter_existing_rows(
                client, config, "auditlog_log_line", line_rows
            )

        if not log_rows_to_insert and not line_rows_to_insert:
            _logger.warning(
                "auditlog_clickhouse_write: nothing to insert "
                "(config_id=%s buffers=%s)",
                config.id,
                len(valid_buffers),
            )
            return

        try:
            if log_rows_to_insert:
                client.execute(
                    f"INSERT INTO {config.database}.auditlog_log ("
                    f"{', '.join(self._CH_LOG_COLUMNS)}) VALUES",
                    log_rows_to_insert,
                )
            if line_rows_to_insert:
                client.execute(
                    f"INSERT INTO {config.database}.auditlog_log_line ("
                    f"{', '.join(self._CH_LINE_COLUMNS)}) VALUES",
                    line_rows_to_insert,
                )
        except Exception as exc:
            _logger.exception(
                "auditlog_clickhouse_write: INSERT failed (will retry) "
                "(config=%s buffers=%s logs=%s lines=%s)",
                config.id,
                len(valid_buffers),
                len(log_rows_to_insert),
                len(line_rows_to_insert),
            )
            raise RetryableJobError(
                f"ClickHouse insert failed: {exc}",
                seconds=60,
            ) from exc

    def _delete_flushed_buffers(self, valid_buffers, config):
        """Delete flushed buffer rows from PostgreSQL.

        If deletion fails, mark them as error.

        :param valid_buffers: Buffers to delete
        :type valid_buffers: AuditlogLogBuffer
        :param config: Active configuration
        :type config: AuditlogClickhouseConfig
        """
        if not valid_buffers:
            _logger.warning(
                "auditlog_clickhouse_write: no flushed "
                "buffers to delete (config_id=%s)",
                config.id,
            )
            return

        try:
            valid_buffers.unlink()
        except Exception as exc:
            _logger.exception(
                "auditlog_clickhouse_write: failed to delete flushed buffers "
                "(config=%s buffers=%s)",
                config.id,
                len(valid_buffers),
            )
            valid_buffers._set_error(
                self.env._("Flushed to ClickHouse but failed to delete buffer rows: %s")
                % exc
            )
        else:
            _logger.info(
                "auditlog_clickhouse_write: job flushed batch "
                "(config=%s flushed_buffers=%s)",
                config.id,
                len(valid_buffers),
            )

    def _enqueue_next_flush_job_if_needed(self, config, batch_size):
        """Schedule next flush job if pending buffers remain.

        :param config: Active configuration
        :type config: AuditlogClickhouseConfig
        :param batch_size: Batch size
        :type batch_size: int
        """
        if not self.sudo().search([("state", "=", self.STATE_PENDING)], limit=1):
            return

        channel_name = (
            config.queue_channel_id.complete_name
            if config.queue_channel_id
            and getattr(config.queue_channel_id, "complete_name", None)
            else "root"
        )
        _logger.debug(
            "auditlog_clickhouse_write: more pending buffers detected, "
            "enqueue next job (config=%s channel=%s batch_size=%s)",
            config.id,
            channel_name,
            batch_size,
        )
        # NOTE: pass config.id (not recordset) to keep queue_job args JSON-serializable
        # and re-check config existence/is_active at execution time.
        self.sudo().with_delay(
            channel=channel_name,
            description=f"auditlog_clickhouse_write: "
            f"flush buffers (config={config.id})",
        )._job_flush_to_clickhouse(config.id, int(batch_size))

    @api.model
    def _job_flush_to_clickhouse(self, config_id, batch_size):
        """Queue job: flush one batch of buffers into ClickHouse.

        Steps:
        - Lock pending buffers
        - Validate payloads
        - Build CH rows
        - INSERT into ClickHouse (retryable)
        - Delete flushed buffers
        - Mark invalid buffers
        - Enqueue next batch if needed

        :param config_id: Active config ID
        :type config_id: int
        :param batch_size: Batch size
        :type batch_size: int
        """
        config = self._get_active_config_for_job(config_id)
        if not config:
            return

        pending_buffers = self.sudo()._lock_pending_buffers(int(batch_size))
        if not pending_buffers:
            _logger.debug(
                "auditlog_clickhouse_write: job no-op (no pending buffers) (config=%s)",
                config.id,
            )
            return

        valid_buffers, invalid_buffers, log_rows, line_rows = (
            self._collect_rows_from_buffers(pending_buffers)
        )

        # Nothing valid: just mark invalids and exit successfully.
        if not valid_buffers:
            self._mark_invalid_buffers(invalid_buffers, config)
            return

        client = config._get_client()
        self._insert_rows_to_clickhouse(
            client=client,
            config=config,
            log_rows=log_rows,
            line_rows=line_rows,
            valid_buffers=valid_buffers,
        )

        # Delete flushed buffers; if deletion fails,
        # mark them as error to avoid re-inserts.
        self._delete_flushed_buffers(valid_buffers, config)

        # Mark invalid ones only after successful CH insert
        # (so RetryableJobError doesn't rollback the marking)
        self._mark_invalid_buffers(invalid_buffers, config)

        # Continue draining queue
        self._enqueue_next_flush_job_if_needed(config, int(batch_size))

    @classmethod
    def _build_ch_log_row(cls, log_data):
        """Convert payload['log'] dict to ClickHouse tuple.

        Order must match _CH_LOG_COLUMNS.

        NOTE(review): a JSON `false` in res_id/http_*_id/write_uid maps to 0
        (not NULL) because only `is not None` selects the NULL branch —
        confirm payload builders never emit False for these keys.

        :param log_data: Log dictionary
        :type log_data: Dict[str, Any]
        :return: ClickHouse row tuple
        :rtype: ChRow
        """
        return (
            int(log_data.get("id") or 0),
            cls._to_ch_nullable_string(log_data.get("name")),
            int(log_data.get("model_id") or 0),
            cls._to_ch_nullable_string(log_data.get("model_name")),
            (log_data.get("model_model") or "unknown"),
            int(log_data.get("res_id") or 0)
            if log_data.get("res_id") is not None
            else None,
            cls._to_ch_nullable_string(log_data.get("res_ids")),
            int(log_data.get("user_id") or 0),
            (log_data.get("method") or "unknown"),
            int(log_data.get("http_request_id") or 0)
            if log_data.get("http_request_id") is not None
            else None,
            int(log_data.get("http_session_id") or 0)
            if log_data.get("http_session_id") is not None
            else None,
            cls._to_ch_nullable_string(log_data.get("log_type")),
            cls._to_ch_datetime_utc(log_data.get("create_date")),
            int(log_data.get("create_uid") or 0),
            cls._to_ch_datetime_utc(log_data.get("write_date")),
            int(log_data.get("write_uid") or 0)
            if log_data.get("write_uid") is not None
            else None,
        )

    @classmethod
    def _build_ch_line_row(cls, line_data):
        """Convert
payload['lines'][] dict to ClickHouse tuple.

        Order must match _CH_LINE_COLUMNS.

        :param line_data: Line dictionary
        :type line_data: Dict[str, Any]
        :return: ClickHouse row tuple
        :rtype: ChRow
        """
        return (
            int(line_data.get("id") or 0),
            int(line_data.get("log_id") or 0),
            int(line_data.get("field_id") or 0),
            cls._to_ch_nullable_string(line_data.get("field_name")),
            cls._to_ch_nullable_string(line_data.get("field_description")),
            cls._to_ch_nullable_string(line_data.get("old_value")),
            cls._to_ch_nullable_string(line_data.get("new_value")),
            cls._to_ch_nullable_string(line_data.get("old_value_text")),
            cls._to_ch_nullable_string(line_data.get("new_value_text")),
            cls._to_ch_datetime_utc(line_data.get("create_date")),
            int(line_data.get("create_uid") or 0),
            cls._to_ch_datetime_utc(line_data.get("write_date")),
            int(line_data.get("write_uid") or 0)
            if line_data.get("write_uid") is not None
            else None,
        )


# --- models/auditlog_rule.py ---

import logging
import time
from collections.abc import Mapping
from datetime import date, datetime, timezone
from decimal import Decimal
from typing import Any, TypedDict

from odoo import models

from odoo.addons.auditlog.models.rule import EMPTY_DICT, FIELDS_BLACKLIST, DictDiffer

_logger = logging.getLogger(__name__)


class _PayloadLog(TypedDict, total=False):
    """Structured payload for auditlog_log row before buffering."""

    # NOTE(review): ids are typed str here while the buffer coerces them with
    # int(...) and _next_ids yields ints — confirm which representation is
    # actually stored in the JSON payload.
    id: str
    name: str | None
    model_id: int
    model_name: str | None
    model_model: str
    res_id: int | None
    res_ids: str | None
    user_id: int
    method: str
    http_request_id: int | None
    http_session_id: int | None
    log_type: str | None
    create_date: str
    create_uid: int


class _PayloadLine(TypedDict, total=False):
    """Structured payload for auditlog_log_line row before buffering."""

    id: str
    log_id: str
    field_id: int
    field_name: str | None
    field_description: str | None
    old_value: Any | None
    new_value: Any | None
    old_value_text: Any | None
    new_value_text: Any | None
    create_date: str
    create_uid: int


class _Payload(TypedDict):
    """Full payload stored in auditlog.log.buffer."""

    log: _PayloadLog
    lines: list[_PayloadLine]


def _json_sanitize(obj):
    """Convert value to JSON-serializable structure.

    Handles:
    - datetime/date -> ISO string
    - Decimal -> float
    - bytes -> UTF-8 string
    - recordsets -> list of ids
    - mappings/sequences -> recursively sanitized
    - unknown types -> string representation

    :param obj: Arbitrary value
    :type obj: Any
    :return: JSON-safe value
    :rtype: Any
    """
    if obj is None or isinstance(obj, (str | int | float | bool)):
        return obj

    if isinstance(obj, (datetime | date)):
        return obj.isoformat()

    if isinstance(obj, Decimal):
        return float(obj)

    if isinstance(obj, bytes):
        return obj.decode("utf-8", errors="replace")

    if isinstance(obj, models.BaseModel):
        return list(obj.ids)

    if isinstance(obj, Mapping):
        return {str(k): _json_sanitize(v) for k, v in obj.items()}

    if isinstance(obj, (list | tuple | set)):
        return [_json_sanitize(v) for v in obj]

    return str(obj)


class AuditlogRule(models.Model):
    _inherit = "auditlog.rule"

    def _next_ids(self, seq_name: str, count: int) -> list[int]:
        """Draw `count` fresh ids from the PostgreSQL sequence `seq_name`."""
        if count <= 0:
            return []
        self.env.cr.execute(
            "SELECT nextval(%s::regclass) FROM generate_series(1, %s)",
            (seq_name, count),
        )
        return [row[0] for row in self.env.cr.fetchall()]

    def _get_rule_settings(self, model_id):
        """Return rule settings for given model.

        Computes:
        - Union of excluded field names
        - Whether record capture is enabled

        Result is cached on registry pool to avoid repeated DB lookups.

        NOTE(review): cache keys embed rule write_date stamps, so stale
        entries are never evicted — the pool dict can grow unbounded on a
        long-lived registry with frequently edited rules.

        :param model_id: ir.model record ID
        :type model_id: int
        :return: (excluded_fields_set, capture_record_flag)
        :rtype: Tuple[Set[str], bool]
        """
        if not hasattr(self.pool, "_auditlog_clickhouse_write_rule_cache"):
            self.pool._auditlog_clickhouse_write_rule_cache = {}
        cache = self.pool._auditlog_clickhouse_write_rule_cache

        rules = self.sudo().filtered(lambda r: r.model_id.id == model_id)
        if not rules:
            domain = [("model_id", "=", model_id)]
            if "state" in self._fields:
                domain.append(("state", "=", "subscribed"))
            rules = self.sudo().search(domain)

        stamp = tuple(
            (
                rule.id,
                bool(rule.capture_record),
                tuple(sorted(rule.fields_to_exclude_ids.ids)),
                rule.write_date or rule.create_date,
            )
            for rule in rules.sorted("id")
        )
        key = (model_id, stamp)
        if key in cache:
            return cache[key]

        excluded: set[str] = set(FIELDS_BLACKLIST)
        capture_record = False

        if len(rules) > 1:
            _logger.warning(
                "auditlog_clickhouse_write: multiple rules found for model_id=%s "
                "(rules=%s); using union of excluded fields and any(capture_record).",
                model_id,
                rules.ids,
            )
        for rule in rules:
            excluded |= set(rule.fields_to_exclude_ids.mapped("name"))
            capture_record = capture_record or bool(rule.capture_record)

        cache[key] = (excluded, capture_record)
        return cache[key]

    def _get_audit_model_id(self, res_model):
        """Resolve ir.model ID for given model name.

        Prefers auditlog in-memory cache, falls back to ir.model lookup.

        :param res_model: Technical model name (e.g. "res.partner")
        :type res_model: str
        :return: ir.model record ID
        :rtype: int
        """
        model_id = getattr(self.pool, "_auditlog_model_cache", {}).get(res_model)
        if model_id:
            return int(model_id)
        return int(self.env["ir.model"].sudo()._get(res_model).id)

    def _dump_payload_json(self, payload):
        """Sanitize payload for storage in fields.Json column.
+ + :param payload: Raw payload dictionary + :type payload: Dict[str, Any] + :return: JSON-serializable payload + :rtype: Dict[str, Any] + """ + return _json_sanitize(payload) + + def _get_http_ids(self): + """Return current auditlog HTTP identifiers (same as base auditlog). + + :return: (http_request_id, http_session_id) + :rtype: Tuple[Optional[int], Optional[int]] + """ + http_request_id = ( + self.env["auditlog.http.request"].current_http_request() or None + ) + http_session_id = ( + self.env["auditlog.http.session"].current_http_session() or None + ) + return http_request_id, http_session_id + + def _build_base_log( + self, + *, + uid, + method, + model_id, + now_iso, + log_type, + ): + """Build base log mapping shared across payloads. + + :param uid: User ID performing operation + :type uid: int + :param method: ORM method name + :type method: str + :param model_id: ir.model record ID + :type model_id: int + :param now_iso: UTC ISO timestamp with milliseconds + :type now_iso: str + :param log_type: Audit log type (from rule/additional values) + :type log_type: Any + :return: Base log dict + :rtype: Dict[str, Any] + """ + model_rec = self.env["ir.model"].sudo().browse(model_id) + http_request_id, http_session_id = self._get_http_ids() + + return { + "model_id": int(model_id), + "model_name": model_rec.name, + "model_model": model_rec.model, + "user_id": int(uid), + "method": method, + "http_request_id": http_request_id, + "http_session_id": http_session_id, + "log_type": log_type, + "create_date": now_iso, + "create_uid": int(uid), + "write_date": None, + "write_uid": None, + } + + def _get_buffer_model(self): + """Return auditlog.log.buffer model used to store payloads. 
+ + :return: auditlog.log.buffer model recordset (sudo + tracking_disable) + :rtype: odoo.models.BaseModel + """ + return ( + self.env["auditlog.log.buffer"].sudo().with_context(tracking_disable=True) + ) + + def _buffer_export_data_payload( + self, + *, + buffer_model, + base_log, + res_model, + res_ids, + started, + ): + """Store export_data payload (no lines) into the buffer. + + :param buffer_model: auditlog.log.buffer model recordset + :type buffer_model: odoo.models.BaseModel + :param base_log: Base log mapping + :type base_log: Dict[str, Any] + :param res_model: Model technical name + :type res_model: str + :param res_ids: Record IDs affected + :type res_ids: Sequence[int] + :param started: monotonic() timestamp when create_logs started + :type started: float + """ + log_id = int(self._next_ids("auditlog_log_id_seq", 1)[0]) + payload: _Payload = { + "log": { + "id": log_id, + "name": res_model, + "res_id": None, + "res_ids": str(list(res_ids)), + **base_log, + }, + "lines": [], + } + buffer_model.create([{"payload_json": self._dump_payload_json(payload)}]) + _logger.debug( + "auditlog_clickhouse_write: create_logs end export_data (elapsed=%.3fs)", + time.monotonic() - started, + ) + + def _select_line_builder_and_sources( + self, + *, + method, + include_lines_on_unlink, + old_values, + new_values, + ): + """Select line builder callback and value sources based on method. 
+ + :param method: ORM method name + :type method: str + :param include_lines_on_unlink: Whether unlink should include lines + :type include_lines_on_unlink: bool + :param old_values: Values before change + :type old_values: Mapping[int, Mapping[str, Any]] + :param new_values: Values after change + :type new_values: Mapping[int, Mapping[str, Any]] + :return: (line_builder, values_src) + :rtype: Tuple[Optional[Callable], Tuple] + """ + if method == "create": + return self._prepare_log_line_vals_on_create, (new_values,) + if method == "read": + return self._prepare_log_line_vals_on_read, (old_values,) + if method == "write": + return self._prepare_log_line_vals_on_write, (old_values, new_values) + if include_lines_on_unlink: + return self._prepare_log_line_vals_on_read, (old_values,) + return None, () + + @staticmethod + def _fields_list_for_method( + *, + method, + include_lines_on_unlink, + diff: DictDiffer, + old_values, + res_id, + ): + """Return list of fields to build lines for given method. + + :param method: ORM method name + :type method: str + :param include_lines_on_unlink: Whether unlink should include lines + :type include_lines_on_unlink: bool + :param diff: DictDiffer for old/new values + :type diff: DictDiffer + :param old_values: Old values mapping + :type old_values: Mapping[int, Mapping[str, Any]] + :param res_id: Current record ID + :type res_id: int + :return: Iterable of field names + :rtype: Any + """ + if method == "create": + return diff.added() + if method == "read" or include_lines_on_unlink: + return old_values.get(res_id, EMPTY_DICT).keys() + if method == "write": + return diff.changed() + return () + + def _build_payloads_for_records( + self, + *, + uid, + res_model, + res_ids, + method, + model_id, + model_rs, + log_type, + now_iso, + base_log, + fields_to_exclude_set, + old_values, + new_values, + line_builder, + values_src, + include_lines_on_unlink, + ): + """Build (log, lines) payloads for each record. 
+ + :return: (payloads, total_lines) + :rtype: Tuple[List[Tuple[_PayloadLog, List[_PayloadLine]]], int] + """ + log_ids = self._next_ids("auditlog_log_id_seq", len(res_ids)) + payloads: list[tuple[_PayloadLog, list[_PayloadLine]]] = [] + total_lines = 0 + + for idx, res_id in enumerate(res_ids): + log_id = int(log_ids[idx]) + record = model_rs.browse(res_id) + + log: _PayloadLog = { + "id": log_id, + "name": record.display_name, + "res_id": int(res_id), + "res_ids": None, + **base_log, + } + + diff = DictDiffer( + dict(new_values.get(res_id, EMPTY_DICT)), + dict(old_values.get(res_id, EMPTY_DICT)), + ) + + fields_list = self._fields_list_for_method( + method=method, + include_lines_on_unlink=include_lines_on_unlink, + diff=diff, + old_values=old_values, + res_id=res_id, + ) + + lines: list[_PayloadLine] = [] + if line_builder: + one_source = method in ("create", "read") or include_lines_on_unlink + log_ctx = {"res_id": res_id, "model_id": model_id, "log_type": log_type} + + for field_name in fields_list: + if field_name in fields_to_exclude_set: + continue + field = self._get_field(model_id, field_name) + if not field: + continue + + if one_source: + vals = line_builder(log_ctx, field, values_src[0]) + else: + vals = line_builder( + log_ctx, field, values_src[0], values_src[1] + ) + + lines.append( + { + "id": 0, + "log_id": log_id, + "field_id": int(field["id"]), + "field_name": field.get("name"), + "field_description": field.get("field_description"), + "old_value": vals.get("old_value"), + "new_value": vals.get("new_value"), + "old_value_text": vals.get("old_value_text"), + "new_value_text": vals.get("new_value_text"), + "create_date": now_iso, + "create_uid": int(uid), + "write_date": None, + "write_uid": None, + } + ) + + payloads.append((log, lines)) + total_lines += len(lines) + + return payloads, total_lines + + def _build_buffer_vals_from_payloads( + self, + *, + payloads, + total_lines, + method, + ): + """Assign line IDs and build buffer create vals. 
+ + :param payloads: List of (log, lines) payloads + :type payloads: List[Tuple[_PayloadLog, List[_PayloadLine]]] + :param total_lines: Total number of line items across all payloads + :type total_lines: int + :param method: ORM method name + :type method: str + :return: Values list for auditlog.log.buffer.create() + :rtype: List[Dict[str, Any]] + """ + line_ids: list[int] = self._next_ids("auditlog_log_line_id_seq", total_lines) + pos = 0 + + buffer_vals_list: list[dict[str, Any]] = [] + for log, lines in payloads: + for line in lines: + line["id"] = int(line_ids[pos]) + pos += 1 + + if method == "unlink" or lines: + buffer_vals_list.append( + { + "payload_json": self._dump_payload_json( + {"log": log, "lines": lines} + ) + } + ) + + return buffer_vals_list + + def create_logs( + self, + uid, + res_model, + res_ids, + method, + old_values=None, + new_values=None, + additional_log_values=None, + ): + """Override auditlog log creation to buffer payloads for ClickHouse. + + Behavior: + - If no active ClickHouse config -> fallback to parent implementation. + - Otherwise: + - Build structured payload (log + lines) + - Sanitize to JSON + - Store in auditlog.log.buffer + - Export to ClickHouse is performed asynchronously via queue_job. 
+ + Supported methods: + - create, read, write unlink export_data + + :param uid: User ID performing operation + :type uid: int + :param res_model: Model technical name + :type res_model: str + :param res_ids: Record IDs affected + :type res_ids: Sequence[int] + :param method: ORM method name + :type method: str + :param old_values: Values before change + :type old_values: Optional[Mapping[int, Mapping[str, Any]]] + :param new_values: Values after change + :type new_values: Optional[Mapping[int, Mapping[str, Any]]] + :param additional_log_values: Extra audit metadata + :type additional_log_values: Optional[Mapping[str, Any]] + """ + config = self.env["auditlog.clickhouse.config"].sudo().get_active_config() + if not config: + return super().create_logs( + uid, + res_model, + res_ids, + method, + old_values=old_values, + new_values=new_values, + additional_log_values=additional_log_values, + ) + + started = time.monotonic() + old_values = old_values or EMPTY_DICT + new_values = new_values or EMPTY_DICT + additional_log_values = dict(additional_log_values or {}) + log_type = additional_log_values.get("log_type") + + model_id = self._get_audit_model_id(res_model) + model_rs = self.env[res_model] + fields_to_exclude_set, capture_record = self._get_rule_settings(model_id) + + now_iso = datetime.now(timezone.utc).isoformat(timespec="milliseconds") + base_log = self._build_base_log( + uid=int(uid), + method=method, + model_id=int(model_id), + now_iso=now_iso, + log_type=log_type, + ) + buffer_model = self._get_buffer_model() + + # export_data is special (no lines) + if method == "export_data": + self._buffer_export_data_payload( + buffer_model=buffer_model, + base_log=base_log, + res_model=res_model, + res_ids=res_ids, + started=started, + ) + return + + include_lines_on_unlink = method == "unlink" and capture_record + line_builder, values_src = self._select_line_builder_and_sources( + method=method, + include_lines_on_unlink=include_lines_on_unlink, + 
old_values=old_values, + new_values=new_values, + ) + + payloads, total_lines = self._build_payloads_for_records( + uid=int(uid), + res_model=res_model, + res_ids=res_ids, + method=method, + model_id=int(model_id), + model_rs=model_rs, + log_type=log_type, + now_iso=now_iso, + base_log=base_log, + fields_to_exclude_set=fields_to_exclude_set, + old_values=old_values, + new_values=new_values, + line_builder=line_builder, + values_src=values_src, + include_lines_on_unlink=include_lines_on_unlink, + ) + + buffer_vals_list = self._build_buffer_vals_from_payloads( + payloads=payloads, + total_lines=total_lines, + method=method, + ) + + if buffer_vals_list: + buffer_model.create(buffer_vals_list) + + _logger.debug( + "auditlog_clickhouse_write: create_logs end (model=%s method=%s res_ids=%s " + "payloads=%s lines=%s elapsed=%.3fs)", + res_model, + method, + len(res_ids), + len(buffer_vals_list), + total_lines, + time.monotonic() - started, + ) diff --git a/auditlog_clickhouse_write/models/clickhouse_client.py b/auditlog_clickhouse_write/models/clickhouse_client.py new file mode 100644 index 00000000000..cf02786dc15 --- /dev/null +++ b/auditlog_clickhouse_write/models/clickhouse_client.py @@ -0,0 +1,62 @@ +from odoo import _ +from odoo.exceptions import UserError + +try: + from clickhouse_driver import Client as ClickHouseClient +except ImportError: + ClickHouseClient = None + + +def _require_driver() -> None: + """Ensure clickhouse-driver is available in the current environment. + + :raises UserError: If clickhouse-driver is not installed. + """ + if ClickHouseClient is None: + raise UserError( + _( + "Python package 'clickhouse-driver' is not available. " + "Install it in the Odoo environment to use ClickHouse storage." + ) + ) + + +def get_clickhouse_client( + *, + host, + port, + database, + user, + password=None, + settings=None, +) -> "ClickHouseClient": + """Create and return a clickhouse-driver Client instance. + + Uses native TCP protocol. 
+ + :param host: ClickHouse hostname or IP + :type host: str + :param port: Native TCP port + :type port: int + :param database: Default database + :type database: str + :param user: ClickHouse username + :type user: str + :param password: ClickHouse password, optional + :type password: Optional[str] + :param settings: Optional clickhouse-driver settings mapping + :type settings: Optional[Mapping[str, Any]] + :return: Configured ClickHouse client + :rtype: clickhouse_driver.Client + :raises UserError: If clickhouse-driver is not installed + """ + _require_driver() + # `settings` is passed as-is to clickhouse-driver, keep it optional and immutable. + return ClickHouseClient( + host=host, + port=port, + database=database, + user=user, + password=password or "", + settings=dict(settings or {}), + ) diff --git a/auditlog_clickhouse_write/pyproject.toml b/auditlog_clickhouse_write/pyproject.toml new file mode 100644 index 00000000000..4231d0cccb3 --- /dev/null +++ b/auditlog_clickhouse_write/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["whool"] +build-backend = "whool.buildapi" diff --git a/auditlog_clickhouse_write/readme/CONFIGURE.md b/auditlog_clickhouse_write/readme/CONFIGURE.md new file mode 100644 index 00000000000..42f9462aef7 --- /dev/null +++ b/auditlog_clickhouse_write/readme/CONFIGURE.md @@ -0,0 +1,31 @@ +This module requires: + +- A reachable ClickHouse server. +- Python dependency `clickhouse-driver` available in the Odoo environment. +- A ClickHouse database created in advance (the module does **not** create databases/users/grants). +- A ClickHouse user with at least: + - `INSERT` and `CREATE TABLE` privileges on the target database. + +> ClickHouse installation (Docker guide): +> `https://clickhouse.com/docs/install/docker` + +Steps: + +- Make sure `clickhouse-driver` is available in your system. +- Install the module. 
+- Configure the connection parameters in Odoo: + - **Settings > Technical > Auditlog > Clickhouse configuration** + - Fill in the following parameters: + +| Field | +|:-----| +| Hostname or IP | +| TCP port | +| ClickHouse database name | +| ClickHouse user | +| ClickHouse Password | +| queue_job_batch_size (default = 1000) | +| channel_id (default root) | + +- Click **Test connection**. +- Optionally, click **Create Auditlog Tables** to create the tables in the target database. diff --git a/auditlog_clickhouse_write/readme/CONTEXT.md b/auditlog_clickhouse_write/readme/CONTEXT.md new file mode 100644 index 00000000000..af8999013bb --- /dev/null +++ b/auditlog_clickhouse_write/readme/CONTEXT.md @@ -0,0 +1,5 @@ +The auditlog module stores audit data in PostgreSQL. In production systems with extensive audit rules, these tables grow without limits, causing three issues: + +- Database bloat; +- Immutability gap: Members of group_auditlog_manager (implied by base.group_system) have full CRUD access to audit tables, allowing audit records to be altered or deleted via UI, ORM, or SQL; +- Performance overhead: Audit logging runs synchronously in the same transaction and performs multiple ORM create() calls, adding latency to audited operations. 
diff --git a/auditlog_clickhouse_write/readme/CONTRIBUTORS.md b/auditlog_clickhouse_write/readme/CONTRIBUTORS.md new file mode 100644 index 00000000000..f2ec264300f --- /dev/null +++ b/auditlog_clickhouse_write/readme/CONTRIBUTORS.md @@ -0,0 +1,4 @@ +- [Cetmix](https://cetmix.com/) + - Ivan Sokolov + - George Smirnov + - Dmitry Meita diff --git a/auditlog_clickhouse_write/readme/CREDITS.md b/auditlog_clickhouse_write/readme/CREDITS.md new file mode 100644 index 00000000000..ce94d167cfd --- /dev/null +++ b/auditlog_clickhouse_write/readme/CREDITS.md @@ -0,0 +1,3 @@ +The development of this module has been financially supported by: + +- Geschäftsstelle Sozialinfo diff --git a/auditlog_clickhouse_write/readme/DESCRIPTION.md b/auditlog_clickhouse_write/readme/DESCRIPTION.md new file mode 100644 index 00000000000..74bf7db4fd2 --- /dev/null +++ b/auditlog_clickhouse_write/readme/DESCRIPTION.md @@ -0,0 +1,4 @@ +This module implements buffered asynchronous transfers audit of logs from PostgreSQL to ClickHouse. +Storing audit data in a columnar database that is write-only prevents database bloat, makes audit records effectively +immutable, and allows for scaling to very large volumes of logs without slowing down normal transactions. +Audit logs are written asynchronously to reduce the load on business operations. diff --git a/auditlog_clickhouse_write/readme/USAGE.md b/auditlog_clickhouse_write/readme/USAGE.md new file mode 100644 index 00000000000..588ba32ef32 --- /dev/null +++ b/auditlog_clickhouse_write/readme/USAGE.md @@ -0,0 +1,7 @@ +Once auditlog_clickhouse_write is installed and configured: + +- Users perform tracked operations (create, write, unlink, read, export) on models with active auditlog.rule subscriptions. + This behavior is unchanged from the base auditlog module. +- Log data is serialized and stored in the local auditlog.log.buffer table instantly. The standard auditlog tables are not populated. 
+- Every 5 minutes (default), the Cron job runs, pushes data to ClickHouse, and cleans the local buffer. +- Data is permanently stored in ClickHouse and cannot be modified or deleted via Odoo. diff --git a/auditlog_clickhouse_write/security/ir.model.access.csv b/auditlog_clickhouse_write/security/ir.model.access.csv new file mode 100644 index 00000000000..22a4f6ee8dd --- /dev/null +++ b/auditlog_clickhouse_write/security/ir.model.access.csv @@ -0,0 +1,3 @@ +id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink +access_auditlog_clickhouse_config_manager,auditlog.clickhouse.config manager,model_auditlog_clickhouse_config,auditlog.group_auditlog_manager,1,1,1,1 +access_auditlog_log_buffer_system_read,auditlog.log.buffer system read,model_auditlog_log_buffer,base.group_system,1,0,0,0 diff --git a/auditlog_clickhouse_write/static/description/index.html b/auditlog_clickhouse_write/static/description/index.html new file mode 100644 index 00000000000..a8696ef1ca0 --- /dev/null +++ b/auditlog_clickhouse_write/static/description/index.html @@ -0,0 +1,533 @@ + + + + + +Store Audit Log in Clickhouse + + + +
+

Store Audit Log in Clickhouse

+ + +

Beta License: AGPL-3 OCA/server-tools Translate me on Weblate Try me on Runboat

+

This module implements buffered asynchronous transfer of audit logs
+from PostgreSQL to ClickHouse. Storing audit data in a columnar database
+that is write-only prevents database bloat, makes audit records
+effectively immutable, and allows for scaling to very large volumes of
+logs without slowing down normal transactions. Audit logs are written
+asynchronously to reduce the load on business operations.

+

Table of contents

+ +
+

Use Cases / Context

+

The auditlog module stores audit data in PostgreSQL. In production +systems with extensive audit rules, these tables grow without limits, +causing three issues:

+
    +
  • Database bloat;
  • +
  • Immutability gap: Members of group_auditlog_manager (implied by +base.group_system) have full CRUD access to audit tables, allowing +audit records to be altered or deleted via UI, ORM, or SQL;
  • +
  • Performance overhead: Audit logging runs synchronously in the same +transaction and performs multiple ORM create() calls, adding latency +to audited operations.
  • +
+
+
+

Configuration

+

This module requires:

+
    +
  • A reachable ClickHouse server.
  • +
  • Python dependency clickhouse-driver available in the Odoo +environment.
  • +
  • A ClickHouse database created in advance (the module does not +create databases/users/grants).
  • +
  • A ClickHouse user with at least:
      +
    • INSERT and CREATE TABLE privileges on the target database.
    • +
    +
  • +
+ +
+ClickHouse installation (Docker guide): +https://clickhouse.com/docs/install/docker
+

Steps:

+
    +
  • Make sure clickhouse-driver is available in your system.
  • +
  • Install the module.
  • +
  • Configure the connection parameters in Odoo:
      +
    • Settings > Technical > Auditlog > Clickhouse configuration
    • +
    • Fill in the following parameters:
    • +
    +
  • +
+ +++ + + + + + + + + + + + + + + + + + + + + +
Field
Hostname or IP
TCP port
ClickHouse database name
ClickHouse user
ClickHouse Password
queue_job_batch_size (default = 1000)
channel_id (default root)
+
    +
  • Click Test connection.
  • +
  • Optionally, click Create Auditlog Tables to create the tables in +the target database.
  • +
+
+
+

Usage

+

Once auditlog_clickhouse_write is installed and configured:

+
    +
  • Users perform tracked operations (create, write, unlink, read, export) +on models with active auditlog.rule subscriptions. This behavior is +unchanged from the base auditlog module.
  • +
  • Log data is serialized and stored in the local auditlog.log.buffer +table instantly. The standard auditlog tables are not populated.
  • +
  • Every 5 minutes (default), the Cron job runs, pushes data to +ClickHouse, and cleans the local buffer.
  • +
  • Data is permanently stored in ClickHouse and cannot be modified or +deleted via Odoo.
  • +
+
+
+

Bug Tracker

+

Bugs are tracked on GitHub Issues. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +feedback.

+

Do not contact contributors directly about support or help with technical issues.

+
+
+

Credits

+
+

Authors

+
    +
  • Cetmix
  • +
+
+
+

Contributors

+
    +
  • Cetmix
      +
    • Ivan Sokolov
    • +
    • George Smirnov
    • +
    • Dmitry Meita
    • +
    +
  • +
+
+
+

Other credits

+

The development of this module has been financially supported by:

+
    +
  • Geschäftsstelle Sozialinfo
  • +
+
+
+

Maintainers

+

This module is maintained by the OCA.

+ +Odoo Community Association + +

OCA, or the Odoo Community Association, is a nonprofit organization whose +mission is to support the collaborative development of Odoo features and +promote its widespread use.

+

This module is part of the OCA/server-tools project on GitHub.

+

You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute.

+
+
+
+ + diff --git a/auditlog_clickhouse_write/tests/__init__.py b/auditlog_clickhouse_write/tests/__init__.py new file mode 100644 index 00000000000..4b4eb8888e9 --- /dev/null +++ b/auditlog_clickhouse_write/tests/__init__.py @@ -0,0 +1,2 @@ +from . import test_auditlog_clickhouse_write +from . import test_clickhouse_config diff --git a/auditlog_clickhouse_write/tests/common.py b/auditlog_clickhouse_write/tests/common.py new file mode 100644 index 00000000000..532bd3becf8 --- /dev/null +++ b/auditlog_clickhouse_write/tests/common.py @@ -0,0 +1,129 @@ +import contextlib +import re +from unittest.mock import patch + +from odoo.addons.auditlog.tests.common import AuditLogRuleCommon + + +class DummyClickHouseClient: + """Tiny fake clickhouse client collecting execute() calls.""" + + def __init__( + self, *, raise_on_insert: bool = False, raise_on_line_insert_once: bool = False + ): + self.raise_on_insert = raise_on_insert + self.raise_on_line_insert_once = raise_on_line_insert_once + self._line_failed_once = False + + self.calls = [] # list[(query, params)] + self.log_ids = set() + + def _parse_in_ids(self, query): + m = re.search(r"\bIN\s*\(([^)]*)\)", query, flags=re.IGNORECASE) + if not m: + return set() + raw = m.group(1).strip() + if not raw: + return set() + ids = set() + for part in raw.split(","): + p = part.strip().strip("'") + if not p: + continue + # tests use ints + try: + ids.add(int(p)) + except ValueError: + ids.add(p) + return ids + + def execute(self, query, params=None): + self.calls.append((query, params)) + q = (query or "").strip() + q_up = q.upper() + + if q_up.startswith("SELECT 1"): + return [(1,)] + + if q_up.startswith("SELECT ID FROM") and "AUDITLOG_LOG" in q_up: + wanted = self._parse_in_ids(q) + existing = sorted(self.log_ids.intersection(wanted)) + return [(x,) for x in existing] + + if self.raise_on_insert and "INSERT INTO" in q_up: + raise Exception("Simulated ClickHouse insert error") + + if "INSERT INTO" in q_up and "AUDITLOG_LOG_LINE" in 
q_up: + if self.raise_on_line_insert_once and not self._line_failed_once: + self._line_failed_once = True + raise Exception("Simulated ClickHouse line insert error") + return [] + + if ( + "INSERT INTO" in q_up + and "AUDITLOG_LOG" in q_up + and "AUDITLOG_LOG_LINE" not in q_up + ): + # collect inserted ids (1st tuple element) + if params: + for row in params: + self.log_ids.add(row[0]) + return [] + + return [] + + +class AuditLogClickhouseCommon(AuditLogRuleCommon): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._cleanup_clickhouse_test_data() + + @classmethod + def tearDownClass(cls): + try: + cls._cleanup_clickhouse_test_data() + finally: + super().tearDownClass() + + @classmethod + def _cleanup_clickhouse_test_data(cls): + """Ensure clean state for configs and buffer across suites.""" + cls.env["auditlog.clickhouse.config"].sudo().search([]).write( + {"is_active": False} + ) + cls.env["auditlog.log.buffer"].sudo().search([]).unlink() + + @classmethod + def create_config(cls, **vals): + """Create ClickHouse config with minimal defaults for tests.""" + defaults = { + "host": "localhost", + "port": 9000, + "database": "db", + "user": "user", + "password": "pass", + "is_active": False, + } + defaults.update(vals) + return ( + cls.env["auditlog.clickhouse.config"] + .with_context(tracking_disable=True) + .create(defaults) + ) + + @contextlib.contextmanager + def _patched_clickhouse_client(self, *, raise_on_insert: bool = False): + """Patch ClickHouse client getter so tests don't require real ClickHouse.""" + dummy = DummyClickHouseClient(raise_on_insert=raise_on_insert) + target = ( + "odoo.addons.auditlog_clickhouse_write.models." 
+ "auditlog_clickhouse_config.get_clickhouse_client" + ) + with patch(target, autospec=True, return_value=dummy): + yield dummy + + def _parse_payloads(self): + """Return list of decoded payload dicts from buffer (oldest first).""" + buf = self.env["auditlog.log.buffer"].sudo().search([], order="id asc") + return [rec.payload_json for rec in buf] diff --git a/auditlog_clickhouse_write/tests/test_auditlog_clickhouse_write.py b/auditlog_clickhouse_write/tests/test_auditlog_clickhouse_write.py new file mode 100644 index 00000000000..a1a140274a3 --- /dev/null +++ b/auditlog_clickhouse_write/tests/test_auditlog_clickhouse_write.py @@ -0,0 +1,348 @@ +from odoo.tests import tagged +from odoo.tools import mute_logger + +from odoo.addons.queue_job.exception import RetryableJobError + +from .common import AuditLogClickhouseCommon, DummyClickHouseClient + + +@tagged("-at_install", "post_install") +class TestAuditlogClickhouseBuffer(AuditLogClickhouseCommon): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.groups_model_id = cls.env.ref("base.model_res_groups").id + + # Rule for groups: full logging + cls.groups_rule = cls.create_rule( + { + "name": "testrule groups clickhouse", + "model_id": cls.groups_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "log_export_data": True, + "log_type": "full", + "capture_record": False, + } + ) + + # Active config to enable buffering + cls.config = cls.create_config(is_active=True) + + def setUp(self): + super().setUp() + self.groups_rule.subscribe() + + def test_01_create_writes_to_buffer_not_auditlog_tables(self): + buf = self.env["auditlog.log.buffer"].sudo() + log_model = self.env["auditlog.log"] + + start_buf = buf.search_count([]) + start_logs = log_model.search_count([("model_id", "=", self.groups_model_id)]) + + group = ( + self.env["res.groups"] + .with_context(tracking_disable=True) + .create({"name": "ch_test_group_1"}) + ) + + self.assertEqual( + 
log_model.search_count([("model_id", "=", self.groups_model_id)]) + - start_logs, + 0, + "auditlog.log must NOT be written by auditlog_clickhouse_write", + ) + self.assertEqual(buf.search_count([]) - start_buf, 1) + + payload = buf.search([], order="id desc", limit=1).payload_json + self.assertEqual(payload["log"]["method"], "create") + self.assertEqual(payload["log"]["model_id"], self.groups_model_id) + self.assertEqual(payload["log"]["res_id"], group.id) + + def test_02_write_creates_lines(self): + buf = self.env["auditlog.log.buffer"].sudo() + start_buf = buf.search_count([]) + + group = ( + self.env["res.groups"] + .with_context(tracking_disable=True) + .create({"name": "CH Group"}) + ) + group.with_context(tracking_disable=True).write({"name": "CH Group v2"}) + + self.assertGreater(buf.search_count([]), start_buf) + + payload = buf.search([], order="id desc", limit=1).payload_json + self.assertEqual(payload["log"]["method"], "write") + self.assertEqual(payload["log"]["model_model"], "res.groups") + + field_names = {line.get("field_name") for line in payload["lines"]} + self.assertIn("name", field_names) + + def test_03_export_data_creates_single_payload_no_lines(self): + buf = self.env["auditlog.log.buffer"].sudo() + start_buf = buf.search_count([]) + + self.env["res.groups"].search([]).export_data(["name"]) + + self.assertEqual(buf.search_count([]) - start_buf, 1) + payload = buf.search([], order="id desc", limit=1).payload_json + self.assertEqual(payload["log"]["method"], "export_data") + self.assertEqual(payload["lines"], []) + + def test_04_unlink_is_logged(self): + buf = self.env["auditlog.log.buffer"].sudo() + start_buf = buf.search_count([]) + + g = ( + self.env["res.groups"] + .with_context(tracking_disable=True) + .create({"name": "ch_test_group_unlink"}) + ) + g.unlink() + + self.assertGreater(buf.search_count([]), start_buf) + payload = buf.search([], order="id desc", limit=1).payload_json + self.assertEqual(payload["log"]["method"], "unlink") + 
self.assertIsInstance(payload["lines"], list) + + +@tagged("-at_install", "post_install") +class TestAuditlogClickhouseQueueJobs(AuditLogClickhouseCommon): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.partner_model_id = cls.env.ref("base.model_res_partner").id + cls.rule = cls.create_rule( + { + "name": "testrule partner clickhouse queue", + "model_id": cls.partner_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "log_type": "full", + } + ) + cls.config = cls.create_config(is_active=True) + + def setUp(self): + super().setUp() + self.rule.subscribe() + + def test_01_cron_enqueues_job_and_does_not_flush_inline(self): + """ + Cron must only enqueue queue.job (no direct ClickHouse INSERTs here). + """ + buf = self.env["auditlog.log.buffer"].sudo() + job_model = self.env["queue.job"].sudo() + + partner = ( + self.env["res.partner"] + .with_context(tracking_disable=True) + .create({"name": "Cron Enqueue Test"}) + ) + partner.with_context(tracking_disable=True).write( + {"name": "Cron Enqueue Test v2"} + ) + + self.assertGreater(buf.search_count([]), 0) + + start_jobs = job_model.search_count([]) + res = buf._cron_flush_to_clickhouse() # uses config.queue_batch_size + + self.assertTrue(res) + self.assertEqual( + job_model.search_count([]) - start_jobs, + 1, + "Cron must enqueue exactly one job", + ) + + job = job_model.search([], order="id desc", limit=1) + self.assertEqual(job.model_name, "auditlog.log.buffer") + self.assertEqual(job.method_name, "_job_flush_to_clickhouse") + self.assertEqual(job.args[0], self.config.id) + self.assertEqual(job.args[1], self.config.queue_batch_size) + + expected_channel = ( + self.config.queue_channel_id.complete_name + if self.config.queue_channel_id + else "root" + ) + self.assertEqual(job.channel, expected_channel) + + def test_02_cron_skips_when_no_pending_buffers(self): + buf = self.env["auditlog.log.buffer"].sudo() + job_model = self.env["queue.job"].sudo() + + # 
Ensure no pending buffers + buf.search([]).unlink() + + start_jobs = job_model.search_count([]) + res = buf._cron_flush_to_clickhouse() + + self.assertTrue(res) + self.assertEqual( + job_model.search_count([]) - start_jobs, 0, "No pending buffers -> no job" + ) + + def test_03_cron_skips_without_active_config(self): + self.env["auditlog.clickhouse.config"].search([]).write({"is_active": False}) + + buf = self.env["auditlog.log.buffer"].sudo() + job_model = self.env["queue.job"].sudo() + + start_jobs = job_model.search_count([]) + rec = buf.create( + {"payload_json": {"log": {}, "lines": []}, "state": buf.STATE_PENDING} + ) + + with mute_logger( + "odoo.addons.auditlog_clickhouse_write.models.auditlog_log_buffer" + ): + res = buf._cron_flush_to_clickhouse() + + self.assertTrue(res) + self.assertEqual( + job_model.search_count([]) - start_jobs, 0, "No active config -> no job" + ) + + rec.invalidate_recordset() + self.assertEqual(rec.state, buf.STATE_PENDING) + self.assertFalse(rec.error_message) + + def test_04_job_flush_success_deletes_buffers_and_calls_insert(self): + buf = self.env["auditlog.log.buffer"].sudo() + + partner = ( + self.env["res.partner"] + .with_context(tracking_disable=True) + .create({"name": "Job Flush OK"}) + ) + partner.with_context(tracking_disable=True).write({"name": "Job Flush OK v2"}) + + self.assertGreater(buf.search_count([]), 0) + + with self._patched_clickhouse_client() as dummy: + buf._job_flush_to_clickhouse(self.config.id, self.config.queue_batch_size) + + self.assertEqual( + buf.search_count([]), + 0, + "Buffers must be removed after successful job flush", + ) + + insert_calls = [ + q for (q, _params) in dummy.calls if "INSERT INTO" in (q or "").upper() + ] + self.assertTrue(insert_calls, "Job must insert into ClickHouse") + + def test_05_job_invalid_payload_marks_error_and_keeps_row(self): + buf = self.env["auditlog.log.buffer"].sudo() + + # Invalid structure for payload_json (Json field accepts string, + # but our code expects 
mapping with log/lines) + rec = buf.create( + {"payload_json": "NOT A JSON OBJECT", "state": buf.STATE_PENDING} + ) + + with mute_logger( + "odoo.addons.auditlog_clickhouse_write.models.auditlog_log_buffer" + ): + buf._job_flush_to_clickhouse(self.config.id, batch_size=10) + + rec.invalidate_recordset() + self.assertEqual(rec.state, buf.STATE_ERROR) + self.assertTrue(rec.error_message) + self.assertGreaterEqual(rec.attempt_count, 1) + + def test_06_retry_after_partial_insert_does_not_duplicate_log_rows(self): + buf = self.env["auditlog.log.buffer"].sudo() + buf.search([]).unlink() + + partner = ( + self.env["res.partner"] + .with_context(tracking_disable=True) + .create({"name": "Partial insert"}) + ) + partner.with_context(tracking_disable=True).write({"name": "Partial insert v2"}) + + pending = buf.search([("state", "=", buf.STATE_PENDING)], order="id asc") + valid_buffers, invalid_buffers, log_rows, line_rows = ( + buf._collect_rows_from_buffers(pending) + ) + + self.assertTrue(log_rows) + self.assertTrue(line_rows) + + dummy = DummyClickHouseClient(raise_on_line_insert_once=True) + + # 1st try: expected failure -> mute ERROR traceback + with mute_logger( + "odoo.addons.auditlog_clickhouse_write.models.auditlog_log_buffer" + ): + with self.assertRaises(RetryableJobError): + buf._insert_rows_to_clickhouse( + client=dummy, + config=self.config, + log_rows=log_rows, + line_rows=line_rows, + valid_buffers=valid_buffers, + ) + + # 2nd try: should pass + buf._insert_rows_to_clickhouse( + client=dummy, + config=self.config, + log_rows=log_rows, + line_rows=line_rows, + valid_buffers=valid_buffers, + ) + + log_inserts = [ + q + for (q, _p) in dummy.calls + if "INSERT INTO" in (q or "").upper() + and "AUDITLOG_LOG" in (q or "").upper() + and "AUDITLOG_LOG_LINE" not in (q or "").upper() + ] + self.assertEqual( + len(log_inserts), 1, "Log rows must not be inserted twice on retry" + ) + + +@tagged("-at_install", "post_install") +class 
TestAuditlogClickhouseRuleCache(AuditLogClickhouseCommon): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.partner_model_id = cls.env.ref("base.model_res_partner").id + cls.rule = cls.create_rule( + { + "name": "testrule cache invalidation", + "model_id": cls.partner_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "log_type": "full", + "capture_record": False, + } + ) + + def test_01_cache_updates_when_rule_changes_but_id_same(self): + # isolate cache for this test + if hasattr(self.env.registry, "_auditlog_clickhouse_write_rule_cache"): + self.env.registry._auditlog_clickhouse_write_rule_cache = {} + + excluded_1, capture_1 = self.rule._get_rule_settings(self.partner_model_id) + self.assertFalse(capture_1) + + # change rule with same id + self.rule.write({"capture_record": True}) + self.rule.invalidate_recordset() + + excluded_2, capture_2 = self.rule._get_rule_settings(self.partner_model_id) + self.assertTrue(capture_2) diff --git a/auditlog_clickhouse_write/tests/test_clickhouse_config.py b/auditlog_clickhouse_write/tests/test_clickhouse_config.py new file mode 100644 index 00000000000..d6083b8c1b9 --- /dev/null +++ b/auditlog_clickhouse_write/tests/test_clickhouse_config.py @@ -0,0 +1,67 @@ +from odoo.tests import tagged +from odoo.tools import mute_logger + +from .common import AuditLogClickhouseCommon + + +@tagged("-at_install", "post_install") +class TestAuditlogClickhouseConfig(AuditLogClickhouseCommon): + def test_01_single_active_on_create(self): + cfg1 = self.create_config(is_active=True, host="h1") + cfg2 = self.create_config(is_active=True, host="h2") + + cfg1.invalidate_recordset() + cfg2.invalidate_recordset() + + active = self.env["auditlog.clickhouse.config"].search( + [("is_active", "=", True)] + ) + self.assertEqual(len(active), 1) + self.assertTrue(cfg2.is_active) + self.assertFalse(cfg1.is_active) + + def test_02_single_active_on_write(self): + cfg1 = 
self.create_config(is_active=False, host="h1") + cfg2 = self.create_config(is_active=True, host="h2") + + cfg1.write({"is_active": True}) + cfg1.invalidate_recordset() + cfg2.invalidate_recordset() + + active = self.env["auditlog.clickhouse.config"].search( + [("is_active", "=", True)] + ) + self.assertEqual(len(active), 1) + self.assertTrue(cfg1.is_active) + self.assertFalse(cfg2.is_active) + + def test_03_test_connection_uses_client(self): + cfg = self.create_config(is_active=True) + + # Without patch, get_clickhouse_client may + # raise if clickhouse-driver isn't installed + with self._patched_clickhouse_client() as dummy: + action = cfg.action_test_connection() + + self.assertTrue(action) + self.assertTrue(any("SELECT 1" in (q or "") for (q, params) in dummy.calls)) + + def test_04_cron_skips_without_active_config(self): + self.env["auditlog.clickhouse.config"].search([]).write({"is_active": False}) + + buf = self.env["auditlog.log.buffer"].sudo() + rec = buf.create({"payload_json": "NOT A JSON", "state": buf.STATE_PENDING}) + + with self._patched_clickhouse_client() as dummy: + with mute_logger( + "odoo.addons.auditlog_clickhouse_write.models.auditlog_log_buffer" + ): + res = buf._cron_flush_to_clickhouse(batch_size=10) + + self.assertTrue(res) + + rec.invalidate_recordset() + self.assertEqual(rec.state, buf.STATE_PENDING) + self.assertFalse(rec.error_message) + + self.assertFalse(dummy.calls) diff --git a/auditlog_clickhouse_write/views/auditlog_clickhouse_config_views.xml b/auditlog_clickhouse_write/views/auditlog_clickhouse_config_views.xml new file mode 100644 index 00000000000..9c311160a22 --- /dev/null +++ b/auditlog_clickhouse_write/views/auditlog_clickhouse_config_views.xml @@ -0,0 +1,115 @@ + + + Configure flush action + ir.cron + + ir.actions.act_window + form + new + + + + auditlog.clickhouse.config.form + auditlog.clickhouse.config + +
+
+
+ + + + + + + + + + + + + + + + + + + +
+
+
+ + +
+ Logs are buffered in PostgreSQL and periodically flushed to + ClickHouse by cron. +
+
+
+
+
+
+ + + auditlog.clickhouse.config.list + auditlog.clickhouse.config + + + + + + + + + + + + + + ClickHouse Configuration + auditlog.clickhouse.config + list,form + + + +
diff --git a/requirements.txt b/requirements.txt index 5d1fefa6f23..d3e18b80a6d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ # generated from manifests external_dependencies +clickhouse-driver cryptography dataclasses odoo_test_helper From feebd8f18ecce262612165dddb224c5a2119b9ac Mon Sep 17 00:00:00 2001 From: tendil Date: Wed, 11 Mar 2026 14:22:48 +0000 Subject: [PATCH 2/3] [FIX] auditlog_clickhouse_write: chunk existing row checks Split ClickHouse existing-row checks into chunks to avoid oversized SELECT ... IN (...) queries during large imports. Previously, retry-safe deduplication could build a very large IN clause when checking already inserted log or log line ids. This caused ClickHouse to fail with "Max query size exceeded" and the queue job stopped processing buffered auditlog rows. The fix keeps deduplication logic but executes existence checks in smaller chunks, which preserves idempotent retries and prevents query size errors on bulk imports. Task: 5246 --- .../models/auditlog_log_buffer.py | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/auditlog_clickhouse_write/models/auditlog_log_buffer.py b/auditlog_clickhouse_write/models/auditlog_log_buffer.py index 92878098745..f7429a352ea 100644 --- a/auditlog_clickhouse_write/models/auditlog_log_buffer.py +++ b/auditlog_clickhouse_write/models/auditlog_log_buffer.py @@ -39,6 +39,7 @@ class AuditlogLogBuffer(models.Model): STATE_PENDING = "pending" STATE_ERROR = "error" + EXISTING_ROWS_CHUNK_SIZE = 2000 # Column order MUST match CREATE TABLE schema and inserted tuples. _CH_LOG_COLUMNS: tuple[str, ...] = ( @@ -184,6 +185,18 @@ def _q(v): return ", ".join(_q(v) for v in cleaned) + @staticmethod + def _chunk_values(values, chunk_size): + """Yield chunks from a sequence. + + :param list values: Source values to split into chunks. + :param int chunk_size: Maximum size of one chunk. + :yield: Chunk of source values. 
+ :rtype: list + """ + for index in range(0, len(values), chunk_size): + yield values[index : index + chunk_size] + def _filter_existing_rows(self, client, config, table_name, rows): """Filter out rows already present in ClickHouse by id. @@ -191,6 +204,9 @@ def _filter_existing_rows(self, client, config, table_name, rows): insert succeeds (e.g. auditlog_log) and another part fails (e.g. lines), the next retry must not re-insert already stored rows. + The existence check is executed in chunks to avoid oversized ClickHouse + queries caused by a very large ``IN (...)`` clause. + :param client: ClickHouse client. :type client: clickhouse_driver.Client :param config: Active configuration (provides database name). @@ -205,16 +221,21 @@ def _filter_existing_rows(self, client, config, table_name, rows): if not rows: return rows - ids = [row[0] for row in rows] - in_list = self._ch_format_in_list(ids) - if not in_list: + ids = list(dict.fromkeys(row[0] for row in rows if row[0] is not None)) + if not ids: return rows - query = ( - f"SELECT id FROM {config.database}.{table_name} " f"WHERE id IN ({in_list})" - ) - existing = client.execute(query) or [] - existing_ids = {row[0] for row in existing} + existing_ids = set() + for ids_chunk in self._chunk_values(ids, self.EXISTING_ROWS_CHUNK_SIZE): + in_list = self._ch_format_in_list(ids_chunk) + if not in_list: + continue + query = ( + f"SELECT id FROM {config.database}.{table_name} " + f"WHERE id IN ({in_list})" + ) + existing = client.execute(query) or [] + existing_ids.update(row[0] for row in existing) if not existing_ids: return rows return [row for row in rows if row[0] not in existing_ids] @@ -433,10 +454,11 @@ def _insert_rows_to_clickhouse( """ log_rows_to_insert = log_rows line_rows_to_insert = line_rows - if log_rows and line_rows: + if log_rows: log_rows_to_insert = self._filter_existing_rows( client, config, "auditlog_log", log_rows ) + if line_rows: line_rows_to_insert = self._filter_existing_rows( client, config, 
"auditlog_log_line", line_rows ) From b7137b6f9ebba61cdb1032fad42222838c4392fb Mon Sep 17 00:00:00 2001 From: tendil Date: Thu, 12 Mar 2026 12:33:47 +0000 Subject: [PATCH 3/3] [FIX] auditlog_clickhouse_write: store HTTP audit relations Store auditlog HTTP session and request data in ClickHouse together with audit logs and log lines. Previously only auditlog_log and auditlog_log_line were exported. This made ClickHouse-backed logs incomplete for read scenarios that depend on http_session_id and http_request_id, especially after recreating the Odoo database. Also extend buffer processing and tests to handle HTTP-related rows and keep retry-safe deduplication for the additional tables. Task: 5246 --- auditlog_clickhouse_write/models/__init__.py | 2 + .../models/auditlog_clickhouse_config.py | 115 ++++++++---- .../models/auditlog_http_request.py | 140 ++++++++++++++ .../models/auditlog_http_session.py | 125 +++++++++++++ .../models/auditlog_log_buffer.py | 177 +++++++++++++++++- .../models/auditlog_rule.py | 145 ++++++++++++-- auditlog_clickhouse_write/tests/common.py | 82 ++++++-- .../tests/test_auditlog_clickhouse_write.py | 174 ++++++++++++++++- .../tests/test_clickhouse_config.py | 18 ++ 9 files changed, 898 insertions(+), 80 deletions(-) create mode 100644 auditlog_clickhouse_write/models/auditlog_http_request.py create mode 100644 auditlog_clickhouse_write/models/auditlog_http_session.py diff --git a/auditlog_clickhouse_write/models/__init__.py b/auditlog_clickhouse_write/models/__init__.py index 9791cf29bce..a6c3d5c3e64 100644 --- a/auditlog_clickhouse_write/models/__init__.py +++ b/auditlog_clickhouse_write/models/__init__.py @@ -2,3 +2,5 @@ from . import clickhouse_client from . import auditlog_log_buffer from . import auditlog_rule +from . import auditlog_http_session +from . 
import auditlog_http_request diff --git a/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py b/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py index 1057f96ef73..5b50802a281 100644 --- a/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py +++ b/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py @@ -410,48 +410,81 @@ def _get_clickhouse_ddl(self): return [ f""" - CREATE TABLE IF NOT EXISTS {db_name}.auditlog_log - ( - id Int64, - name Nullable(String), - model_id Int32, - model_name Nullable(String), - model_model String, - res_id Nullable(Int64), - res_ids Nullable(String), - user_id Int32, - method String, - http_request_id Nullable(Int64), - http_session_id Nullable(Int64), - log_type Nullable(String), - create_date DateTime64(3, 'UTC'), - create_uid Int32, - write_date Nullable(DateTime64(3, 'UTC')), - write_uid Nullable(Int32) - ) - ENGINE = MergeTree - ORDER BY (create_date, id) - """, + CREATE TABLE IF NOT EXISTS {db_name}.auditlog_http_session + ( + id Int64, + user_id Nullable(Int32), + create_uid Nullable(Int32), + write_uid Nullable(Int32), + display_name Nullable(String), + name Nullable(String), + create_date DateTime64(3, 'UTC'), + write_date Nullable(DateTime64(3, 'UTC')) + ) + ENGINE = MergeTree + ORDER BY (create_date, id) + """, f""" - CREATE TABLE IF NOT EXISTS {db_name}.auditlog_log_line - ( - id Int64, - log_id Int64, - field_id Int32, - field_name Nullable(String), - field_description Nullable(String), - old_value Nullable(String), - new_value Nullable(String), - old_value_text Nullable(String), - new_value_text Nullable(String), - create_date DateTime64(3, 'UTC'), - create_uid Int32, - write_date Nullable(DateTime64(3, 'UTC')), - write_uid Nullable(Int32) - ) - ENGINE = MergeTree - ORDER BY (create_date, id) - """, + CREATE TABLE IF NOT EXISTS {db_name}.auditlog_http_request + ( + id Int64, + user_id Nullable(Int32), + http_session_id Nullable(Int64), + create_uid Nullable(Int32), + write_uid 
Nullable(Int32), + display_name Nullable(String), + name Nullable(String), + root_url Nullable(String), + user_context Nullable(String), + create_date DateTime64(3, 'UTC'), + write_date Nullable(DateTime64(3, 'UTC')) + ) + ENGINE = MergeTree + ORDER BY (create_date, id) + """, + f""" + CREATE TABLE IF NOT EXISTS {db_name}.auditlog_log + ( + id Int64, + name Nullable(String), + model_id Int32, + model_name Nullable(String), + model_model String, + res_id Nullable(Int64), + res_ids Nullable(String), + user_id Int32, + method String, + http_request_id Nullable(Int64), + http_session_id Nullable(Int64), + log_type Nullable(String), + create_date DateTime64(3, 'UTC'), + create_uid Int32, + write_date Nullable(DateTime64(3, 'UTC')), + write_uid Nullable(Int32) + ) + ENGINE = MergeTree + ORDER BY (create_date, id) + """, + f""" + CREATE TABLE IF NOT EXISTS {db_name}.auditlog_log_line + ( + id Int64, + log_id Int64, + field_id Int32, + field_name Nullable(String), + field_description Nullable(String), + old_value Nullable(String), + new_value Nullable(String), + old_value_text Nullable(String), + new_value_text Nullable(String), + create_date DateTime64(3, 'UTC'), + create_uid Int32, + write_date Nullable(DateTime64(3, 'UTC')), + write_uid Nullable(Int32) + ) + ENGINE = MergeTree + ORDER BY (create_date, id) + """, ] @staticmethod diff --git a/auditlog_clickhouse_write/models/auditlog_http_request.py b/auditlog_clickhouse_write/models/auditlog_http_request.py new file mode 100644 index 00000000000..fa98584b3c8 --- /dev/null +++ b/auditlog_clickhouse_write/models/auditlog_http_request.py @@ -0,0 +1,140 @@ +# Copyright (C) 2026 Cetmix OÜ +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). 
+ +from datetime import datetime, timezone + +from odoo import api, fields, models +from odoo.http import request + + +class AuditlogHTTPRequest(models.Model): + _inherit = "auditlog.http.request" + + @api.model + def _is_clickhouse_write_enabled(self): + """Return whether ClickHouse write mode is active.""" + return bool(self.env["auditlog.clickhouse.config"].sudo().get_active_config()) + + @api.model + def _ensure_http_request_sequence(self): + """Ensure PostgreSQL sequence exists for HTTP request identifiers.""" + self.env.cr.execute( + "CREATE SEQUENCE IF NOT EXISTS auditlog_http_request_id_seq" + ) + + @api.model + def _next_http_request_id(self): + """Return next PostgreSQL sequence value for HTTP request payload.""" + self._ensure_http_request_sequence() + self.env.cr.execute("SELECT nextval('auditlog_http_request_id_seq')") + return int(self.env.cr.fetchone()[0]) + + @api.model + def _build_http_request_payload(self, request_id): + """Build in-memory payload for current HTTP request.""" + now_dt = datetime.now(timezone.utc) + now_iso = now_dt.isoformat(timespec="milliseconds") + uid = request.uid or self.env.uid or 1 + path = request.httprequest.path + display_name = f"{path or '?'} ({fields.Datetime.to_string(now_dt)})" + session_id = self.env["auditlog.http.session"].current_http_session() or None + return { + "id": int(request_id), + "user_id": int(uid) if uid else None, + "http_session_id": int(session_id) if session_id else None, + "create_uid": int(uid) if uid else None, + "write_uid": None, + "display_name": display_name, + "name": path, + "root_url": request.httprequest.url_root, + "user_context": request.context, + "create_date": now_iso, + "write_date": None, + } + + @api.model + def _get_cached_http_request_payload(self, request_id=None): + """Return cached current-request HTTP request payload if available.""" + if request and getattr(request, "httprequest", None): + payload = getattr( + request.httprequest, "auditlog_http_request_payload", None 
+ ) + if payload and (request_id is None or payload["id"] == request_id): + return payload + return None + + @api.model + def current_http_request(self): + """Return current HTTP request ID. + + In ClickHouse write mode, avoid ORM create() on auditlog_http_request + and store the row as an in-memory payload cached on request.httprequest. + """ + if not self._is_clickhouse_write_enabled(): + return super().current_http_request() + + if not request: + return False + + httprequest = request.httprequest + if not httprequest: + return False + + payload = getattr(httprequest, "auditlog_http_request_payload", None) + if payload: + return payload["id"] + + request_id = getattr(httprequest, "auditlog_http_request_id", None) + if not request_id: + request_id = self._next_http_request_id() + httprequest.auditlog_http_request_id = request_id + + payload = self._build_http_request_payload(request_id) + httprequest.auditlog_http_request_payload = payload + return request_id + + @api.model + def get_clickhouse_payload(self, request_id=None): + """Return HTTP request payload from cache or local database. 
+ + :param request_id: Target HTTP request ID + :type request_id: Optional[int] + :return: Serialized HTTP request payload or None + :rtype: Optional[dict] + """ + payload = self._get_cached_http_request_payload(request_id) + if payload: + return payload + + if not request_id: + return None + + http_request = self.sudo().browse(request_id) + if not http_request.exists(): + return None + + return { + "id": int(http_request.id), + "user_id": int(http_request.user_id.id) if http_request.user_id else None, + "http_session_id": ( + int(http_request.http_session_id.id) + if http_request.http_session_id + else None + ), + "create_uid": int(http_request.create_uid.id) + if http_request.create_uid + else None, + "write_uid": int(http_request.write_uid.id) + if http_request.write_uid + else None, + "display_name": http_request.display_name, + "name": http_request.name, + "root_url": http_request.root_url, + "user_context": http_request.user_context, + "create_date": http_request.create_date.isoformat() + if http_request.create_date + else None, + "write_date": http_request.write_date.isoformat() + if http_request.write_date + else None, + } diff --git a/auditlog_clickhouse_write/models/auditlog_http_session.py b/auditlog_clickhouse_write/models/auditlog_http_session.py new file mode 100644 index 00000000000..d9c6c019b55 --- /dev/null +++ b/auditlog_clickhouse_write/models/auditlog_http_session.py @@ -0,0 +1,125 @@ +# Copyright (C) 2026 Cetmix OÜ +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). 
+ +from datetime import datetime, timezone + +from odoo import api, fields, models +from odoo.http import request + + +class AuditlogHTTPSession(models.Model): + _inherit = "auditlog.http.session" + + @api.model + def _is_clickhouse_write_enabled(self): + """Return whether ClickHouse write mode is active.""" + return bool(self.env["auditlog.clickhouse.config"].sudo().get_active_config()) + + @api.model + def _ensure_http_session_sequence(self): + """Ensure PostgreSQL sequence exists for HTTP session identifiers.""" + self.env.cr.execute( + "CREATE SEQUENCE IF NOT EXISTS auditlog_http_session_id_seq" + ) + + @api.model + def _next_http_session_id(self): + """Return next PostgreSQL sequence value for HTTP session payload.""" + self._ensure_http_session_sequence() + self.env.cr.execute("SELECT nextval('auditlog_http_session_id_seq')") + return int(self.env.cr.fetchone()[0]) + + @api.model + def _build_http_session_payload(self, session_id): + """Build in-memory payload for current HTTP session.""" + now_dt = datetime.now(timezone.utc) + now_iso = now_dt.isoformat(timespec="milliseconds") + uid = request.uid or self.env.uid or 1 + user = self.env["res.users"].sudo().browse(uid) + user_name = user.name if user.exists() else "?" 
+ display_name = f"{user_name} ({fields.Datetime.to_string(now_dt)})" + + return { + "id": int(session_id), + "user_id": int(uid) if uid else None, + "create_uid": int(uid) if uid else None, + "write_uid": None, + "display_name": display_name, + "name": request.session.sid, + "create_date": now_iso, + "write_date": None, + } + + @api.model + def _get_cached_http_session_payload(self, session_id=None): + """Return cached current-request HTTP session payload if available.""" + if request and getattr(request, "session", None): + payload = getattr(request.session, "auditlog_http_session_payload", None) + if payload and (session_id is None or payload["id"] == session_id): + return payload + return None + + @api.model + def current_http_session(self): + """Return current HTTP session ID. + + In ClickHouse write mode, avoid ORM create() on auditlog_http_session + and store the row as an in-memory payload cached on request.session. + """ + if not self._is_clickhouse_write_enabled(): + return super().current_http_session() + + if not request: + return False + + httpsession = request.session + if not httpsession: + return False + + payload = getattr(httpsession, "auditlog_http_session_payload", None) + if payload: + return payload["id"] + + session_id = getattr(httpsession, "auditlog_http_session_id", None) + if not session_id: + session_id = self._next_http_session_id() + httpsession.auditlog_http_session_id = session_id + + payload = self._build_http_session_payload(session_id) + httpsession.auditlog_http_session_payload = payload + return session_id + + @api.model + def get_clickhouse_payload(self, session_id=None): + """Return HTTP session payload from cache or local database. 
+ + :param session_id: Target HTTP session ID + :type session_id: Optional[int] + :return: Serialized HTTP session payload or None + :rtype: Optional[dict] + """ + payload = self._get_cached_http_session_payload(session_id) + if payload: + return payload + + if not session_id: + return None + + session = self.sudo().browse(session_id) + if not session.exists(): + return None + + return { + "id": int(session.id), + "user_id": int(session.user_id.id) if session.user_id else None, + "create_uid": int(session.create_uid.id) if session.create_uid else None, + "write_uid": int(session.write_uid.id) if session.write_uid else None, + "display_name": session.display_name, + "name": session.name, + "create_date": session.create_date.isoformat() + if session.create_date + else None, + "write_date": session.write_date.isoformat() + if session.write_date + else None, + } diff --git a/auditlog_clickhouse_write/models/auditlog_log_buffer.py b/auditlog_clickhouse_write/models/auditlog_log_buffer.py index f7429a352ea..c628aeec9d3 100644 --- a/auditlog_clickhouse_write/models/auditlog_log_buffer.py +++ b/auditlog_clickhouse_write/models/auditlog_log_buffer.py @@ -75,6 +75,29 @@ class AuditlogLogBuffer(models.Model): "write_date", "write_uid", ) + _CH_HTTP_SESSION_COLUMNS: tuple[str, ...] = ( + "id", + "user_id", + "create_uid", + "write_uid", + "display_name", + "name", + "create_date", + "write_date", + ) + _CH_HTTP_REQUEST_COLUMNS: tuple[str, ...] = ( + "id", + "user_id", + "http_session_id", + "create_uid", + "write_uid", + "display_name", + "name", + "root_url", + "user_context", + "create_date", + "write_date", + ) _INVALID_PAYLOAD_MESSAGE = ( "Invalid payload structure (expected object with 'log' and 'lines')." 
@@ -367,9 +390,15 @@ def _payload_is_valid(cls, payload): log_data = payload.get("log") lines_data = payload.get("lines") + http_session_data = payload.get("http_session") + http_request_data = payload.get("http_request") if not isinstance(log_data, dict) or not isinstance(lines_data, list): return False + if http_session_data is not None and not isinstance(http_session_data, dict): + return False + if http_request_data is not None and not isinstance(http_request_data, dict): + return False # Minimal required log fields (to avoid CH insert failures forever) required = ( @@ -393,9 +422,25 @@ def _collect_rows_from_buffers(self, buffers): :param buffers: Buffer recordset :type buffers: AuditlogLogBuffer - :return: (valid_buffers, invalid_buffers, log_rows, line_rows) - :rtype: Tuple[AuditlogLogBuffer, AuditlogLogBuffer, List[ChRow], List[ChRow]] + :return: ( + valid_buffers, + invalid_buffers, + http_session_rows, + http_request_rows, + log_rows, + line_rows, + ) + :rtype: Tuple[ + AuditlogLogBuffer, + AuditlogLogBuffer, + List[ChRow], + List[ChRow], + List[ChRow], + List[ChRow], + ] """ + http_session_rows: list[ChRow] = [] + http_request_rows: list[ChRow] = [] log_rows: list[ChRow] = [] line_rows: list[ChRow] = [] invalid_buffers = self.browse() @@ -407,15 +452,33 @@ def _collect_rows_from_buffers(self, buffers): invalid_buffers |= rec continue + http_session_data = payload.get("http_session") + http_request_data = payload.get("http_request") log_data = payload["log"] lines_data = payload["lines"] + if http_session_data: + http_session_rows.append( + self._build_ch_http_session_row(http_session_data) + ) + if http_request_data: + http_request_rows.append( + self._build_ch_http_request_row(http_request_data) + ) + log_rows.append(self._build_ch_log_row(log_data)) for line_data in lines_data: line_rows.append(self._build_ch_line_row(line_data)) valid_buffers = buffers - invalid_buffers - return valid_buffers, invalid_buffers, log_rows, line_rows + return ( + 
valid_buffers, + invalid_buffers, + http_session_rows, + http_request_rows, + log_rows, + line_rows, + ) def _mark_invalid_buffers(self, invalid_buffers, config): """Mark invalid payload buffers as error. @@ -435,7 +498,14 @@ def _mark_invalid_buffers(self, invalid_buffers, config): ) def _insert_rows_to_clickhouse( - self, client, config, log_rows, line_rows, valid_buffers + self, + client, + config, + http_session_rows, + http_request_rows, + log_rows, + line_rows, + valid_buffers, ): """Insert prepared rows into ClickHouse. @@ -452,8 +522,18 @@ def _insert_rows_to_clickhouse( :param valid_buffers: Successfully processed buffers :type valid_buffers: AuditlogLogBuffer """ + http_session_rows_to_insert = http_session_rows + http_request_rows_to_insert = http_request_rows log_rows_to_insert = log_rows line_rows_to_insert = line_rows + if http_session_rows: + http_session_rows_to_insert = self._filter_existing_rows( + client, config, "auditlog_http_session", http_session_rows + ) + if http_request_rows: + http_request_rows_to_insert = self._filter_existing_rows( + client, config, "auditlog_http_request", http_request_rows + ) if log_rows: log_rows_to_insert = self._filter_existing_rows( client, config, "auditlog_log", log_rows @@ -463,7 +543,12 @@ def _insert_rows_to_clickhouse( client, config, "auditlog_log_line", line_rows ) - if not log_rows_to_insert and not line_rows_to_insert: + if ( + not http_session_rows_to_insert + and not http_request_rows_to_insert + and not log_rows_to_insert + and not line_rows_to_insert + ): _logger.warning( "auditlog_clickhouse_write: nothing to insert " "(config_id=%s buffers=%s)", @@ -473,6 +558,18 @@ def _insert_rows_to_clickhouse( return try: + if http_session_rows_to_insert: + client.execute( + f"INSERT INTO {config.database}.auditlog_http_session (" + f"{', '.join(self._CH_HTTP_SESSION_COLUMNS)}) VALUES", + http_session_rows_to_insert, + ) + if http_request_rows_to_insert: + client.execute( + f"INSERT INTO 
{config.database}.auditlog_http_request (" + f"{', '.join(self._CH_HTTP_REQUEST_COLUMNS)}) VALUES", + http_request_rows_to_insert, + ) if log_rows_to_insert: client.execute( f"INSERT INTO {config.database}.auditlog_log (" @@ -600,9 +697,14 @@ def _job_flush_to_clickhouse(self, config_id, batch_size): ) return - valid_buffers, invalid_buffers, log_rows, line_rows = ( - self._collect_rows_from_buffers(pending_buffers) - ) + ( + valid_buffers, + invalid_buffers, + http_session_rows, + http_request_rows, + log_rows, + line_rows, + ) = self._collect_rows_from_buffers(pending_buffers) # Nothing valid: just mark invalids and exit successfully. if not valid_buffers: @@ -613,6 +715,8 @@ def _job_flush_to_clickhouse(self, config_id, batch_size): self._insert_rows_to_clickhouse( client=client, config=config, + http_session_rows=http_session_rows, + http_request_rows=http_request_rows, log_rows=log_rows, line_rows=line_rows, valid_buffers=valid_buffers, @@ -629,6 +733,63 @@ def _job_flush_to_clickhouse(self, config_id, batch_size): # Continue draining queue self._enqueue_next_flush_job_if_needed(config, int(batch_size)) + @classmethod + def _build_ch_http_session_row(cls, session_data): + """Convert payload['http_session'] dict to ClickHouse tuple. 
+ + :param session_data: HTTP session dictionary + :type session_data: Dict[str, Any] + :return: ClickHouse row tuple + :rtype: ChRow + """ + return ( + int(session_data.get("id") or 0), + int(session_data.get("user_id") or 0) + if session_data.get("user_id") is not None + else None, + int(session_data.get("create_uid") or 0) + if session_data.get("create_uid") is not None + else None, + int(session_data.get("write_uid") or 0) + if session_data.get("write_uid") is not None + else None, + cls._to_ch_nullable_string(session_data.get("display_name")), + cls._to_ch_nullable_string(session_data.get("name")), + cls._to_ch_datetime_utc(session_data.get("create_date")), + cls._to_ch_datetime_utc(session_data.get("write_date")), + ) + + @classmethod + def _build_ch_http_request_row(cls, request_data): + """Convert payload['http_request'] dict to ClickHouse tuple. + + :param request_data: HTTP request dictionary + :type request_data: Dict[str, Any] + :return: ClickHouse row tuple + :rtype: ChRow + """ + return ( + int(request_data.get("id") or 0), + int(request_data.get("user_id") or 0) + if request_data.get("user_id") is not None + else None, + int(request_data.get("http_session_id") or 0) + if request_data.get("http_session_id") is not None + else None, + int(request_data.get("create_uid") or 0) + if request_data.get("create_uid") is not None + else None, + int(request_data.get("write_uid") or 0) + if request_data.get("write_uid") is not None + else None, + cls._to_ch_nullable_string(request_data.get("display_name")), + cls._to_ch_nullable_string(request_data.get("name")), + cls._to_ch_nullable_string(request_data.get("root_url")), + cls._to_ch_nullable_string(request_data.get("user_context")), + cls._to_ch_datetime_utc(request_data.get("create_date")), + cls._to_ch_datetime_utc(request_data.get("write_date")), + ) + @classmethod def _build_ch_log_row(cls, log_data): """Convert payload['log'] dict to ClickHouse tuple. 
diff --git a/auditlog_clickhouse_write/models/auditlog_rule.py b/auditlog_clickhouse_write/models/auditlog_rule.py index 803045143a4..8439bc0cb20 100644 --- a/auditlog_clickhouse_write/models/auditlog_rule.py +++ b/auditlog_clickhouse_write/models/auditlog_rule.py @@ -47,11 +47,40 @@ class _PayloadLine(TypedDict, total=False): create_uid: int -class _Payload(TypedDict): +class _PayloadHttpSession(TypedDict, total=False): + """Structured payload for auditlog_http_session row before buffering.""" + + id: int + name: str | None + user_id: int | None + create_date: str | None + create_uid: int | None + write_date: str | None + write_uid: int | None + + +class _PayloadHttpRequest(TypedDict, total=False): + """Structured payload for auditlog_http_request row before buffering.""" + + id: int + name: str | None + root_url: str | None + user_id: int | None + http_session_id: int | None + user_context: Any | None + create_date: str | None + create_uid: int | None + write_date: str | None + write_uid: int | None + + +class _Payload(TypedDict, total=False): """Full payload stored in auditlog.log.buffer.""" log: _PayloadLog lines: list[_PayloadLine] + http_session: _PayloadHttpSession | None + http_request: _PayloadHttpRequest | None def _json_sanitize(obj): @@ -161,6 +190,34 @@ def _get_rule_settings(self, model_id): cache[key] = (excluded, capture_record) return cache[key] + def _serialize_http_session(self, session_id): + """Serialize auditlog.http.session for ClickHouse buffering. + + :param session_id: HTTP session record ID + :type session_id: Optional[int] + :return: Serialized HTTP session payload or None + :rtype: Optional[_PayloadHttpSession] + """ + if not session_id: + return None + return ( + self.env["auditlog.http.session"].sudo().get_clickhouse_payload(session_id) + ) + + def _serialize_http_request(self, request_id): + """Serialize auditlog.http.request for ClickHouse buffering. 
+ + :param request_id: HTTP request record ID + :type request_id: Optional[int] + :return: Serialized HTTP request payload or None + :rtype: Optional[_PayloadHttpRequest] + """ + if not request_id: + return None + return ( + self.env["auditlog.http.request"].sudo().get_clickhouse_payload(request_id) + ) + def _get_audit_model_id(self, res_model): """Resolve ir.model ID for given model name. @@ -284,6 +341,7 @@ def _buffer_export_data_payload( **base_log, }, "lines": [], + **self._build_http_related_payload(base_log), } buffer_model.create([{"payload_json": self._dump_payload_json(payload)}]) _logger.debug( @@ -373,13 +431,51 @@ def _build_payloads_for_records( values_src, include_lines_on_unlink, ): - """Build (log, lines) payloads for each record. + """Build buffered payload structures for each processed record. - :return: (payloads, total_lines) - :rtype: Tuple[List[Tuple[_PayloadLog, List[_PayloadLine]]], int] + Each returned item contains: + - log payload + - line payloads + - related HTTP payloads (session/request) + + :param uid: User ID performing operation. + :type uid: int + :param res_model: Model technical name. + :type res_model: str + :param res_ids: Record IDs affected. + :type res_ids: Sequence[int] + :param method: ORM method name. + :type method: str + :param model_id: ir.model record ID. + :type model_id: int + :param model_rs: Target model recordset. + :type model_rs: odoo.models.BaseModel + :param log_type: Audit log type. + :type log_type: Any + :param now_iso: UTC ISO timestamp with milliseconds. + :type now_iso: str + :param base_log: Base log mapping. + :type base_log: Dict[str, Any] + :param fields_to_exclude_set: Field names excluded from logging. + :type fields_to_exclude_set: Set[str] + :param old_values: Values before change. + :type old_values: Mapping[int, Mapping[str, Any]] + :param new_values: Values after change. + :type new_values: Mapping[int, Mapping[str, Any]] + :param line_builder: Callback to build line values. 
+ :type line_builder: Optional[Callable] + :param values_src: Source value mappings for line building. + :type values_src: Tuple + :param include_lines_on_unlink: Whether unlink should include lines. + :type include_lines_on_unlink: bool + :return: Tuple of payload tuples and total line count. + :rtype: Tuple[ + List[Tuple[_PayloadLog, List[_PayloadLine], Dict[str, Any]]], + int, + ] """ log_ids = self._next_ids("auditlog_log_id_seq", len(res_ids)) - payloads: list[tuple[_PayloadLog, list[_PayloadLine]]] = [] + payloads: list[tuple[_PayloadLog, list[_PayloadLine], dict[str, Any]]] = [] total_lines = 0 for idx, res_id in enumerate(res_ids): @@ -444,7 +540,8 @@ def _build_payloads_for_records( } ) - payloads.append((log, lines)) + http_related = self._build_http_related_payload(log) + payloads.append((log, lines, http_related)) total_lines += len(lines) return payloads, total_lines @@ -458,20 +555,23 @@ def _build_buffer_vals_from_payloads( ): """Assign line IDs and build buffer create vals. - :param payloads: List of (log, lines) payloads - :type payloads: List[Tuple[_PayloadLog, List[_PayloadLine]]] - :param total_lines: Total number of line items across all payloads + Line identifiers are allocated in one batch and injected into line payloads + before serializing the final JSON stored in ``auditlog.log.buffer``. + + :param payloads: List of ``(log, lines, http_related)`` payload tuples. + :type payloads: List[Tuple[_PayloadLog, List[_PayloadLine], Dict[str, Any]]] + :param total_lines: Total number of line items across all payloads. :type total_lines: int - :param method: ORM method name + :param method: ORM method name. :type method: str - :return: Values list for auditlog.log.buffer.create() + :return: Values list for ``auditlog.log.buffer.create()``. 
:rtype: List[Dict[str, Any]] """ line_ids: list[int] = self._next_ids("auditlog_log_line_id_seq", total_lines) pos = 0 buffer_vals_list: list[dict[str, Any]] = [] - for log, lines in payloads: + for log, lines, http_related in payloads: for line in lines: line["id"] = int(line_ids[pos]) pos += 1 @@ -480,13 +580,32 @@ def _build_buffer_vals_from_payloads( buffer_vals_list.append( { "payload_json": self._dump_payload_json( - {"log": log, "lines": lines} + { + "log": log, + "lines": lines, + **http_related, + } ) } ) return buffer_vals_list + def _build_http_related_payload(self, base_log): + """Build HTTP related payload blocks for current audit log entry. + + :param base_log: Base log mapping + :type base_log: Dict[str, Any] + :return: Mapping with serialized http_session/http_request payloads + :rtype: Dict[str, Any] + """ + http_session = self._serialize_http_session(base_log.get("http_session_id")) + http_request = self._serialize_http_request(base_log.get("http_request_id")) + return { + "http_session": http_session, + "http_request": http_request, + } + def create_logs( self, uid, diff --git a/auditlog_clickhouse_write/tests/common.py b/auditlog_clickhouse_write/tests/common.py index 532bd3becf8..35fa294f3f0 100644 --- a/auditlog_clickhouse_write/tests/common.py +++ b/auditlog_clickhouse_write/tests/common.py @@ -17,6 +17,9 @@ def __init__( self.calls = [] # list[(query, params)] self.log_ids = set() + self.line_ids = set() + self.http_session_ids = set() + self.http_request_ids = set() def _parse_in_ids(self, query): m = re.search(r"\bIN\s*\(([^)]*)\)", query, flags=re.IGNORECASE) @@ -37,39 +40,88 @@ def _parse_in_ids(self, query): ids.add(p) return ids - def execute(self, query, params=None): - self.calls.append((query, params)) - q = (query or "").strip() - q_up = q.upper() + @staticmethod + def _is_select_ids_query(q_up, table_name): + return q_up.startswith("SELECT ID FROM") and table_name in q_up + + @staticmethod + def _is_insert_query(q_up, table_name): 
+ return "INSERT INTO" in q_up and table_name in q_up + + def _select_existing_ids(self, query, stored_ids): + wanted = self._parse_in_ids(query) + existing = sorted(stored_ids.intersection(wanted)) + return [(row_id,) for row_id in existing] + + @staticmethod + def _collect_inserted_ids(params, target_set): + if not params: + return + for row in params: + target_set.add(row[0]) + def _handle_select(self, q, q_up): if q_up.startswith("SELECT 1"): return [(1,)] - if q_up.startswith("SELECT ID FROM") and "AUDITLOG_LOG" in q_up: - wanted = self._parse_in_ids(q) - existing = sorted(self.log_ids.intersection(wanted)) - return [(x,) for x in existing] + if self._is_select_ids_query(q_up, "AUDITLOG_HTTP_SESSION"): + return self._select_existing_ids(q, self.http_session_ids) + + if self._is_select_ids_query(q_up, "AUDITLOG_HTTP_REQUEST"): + return self._select_existing_ids(q, self.http_request_ids) + + if self._is_select_ids_query(q_up, "AUDITLOG_LOG_LINE"): + return self._select_existing_ids(q, self.line_ids) + + if ( + self._is_select_ids_query(q_up, "AUDITLOG_LOG") + and "AUDITLOG_LOG_LINE" not in q_up + ): + return self._select_existing_ids(q, self.log_ids) + + return None + def _handle_insert(self, q_up, params): if self.raise_on_insert and "INSERT INTO" in q_up: raise Exception("Simulated ClickHouse insert error") - if "INSERT INTO" in q_up and "AUDITLOG_LOG_LINE" in q_up: + if self._is_insert_query(q_up, "AUDITLOG_LOG_LINE"): if self.raise_on_line_insert_once and not self._line_failed_once: self._line_failed_once = True raise Exception("Simulated ClickHouse line insert error") + self._collect_inserted_ids(params, self.line_ids) + return [] + + if self._is_insert_query(q_up, "AUDITLOG_HTTP_SESSION"): + self._collect_inserted_ids(params, self.http_session_ids) + return [] + + if self._is_insert_query(q_up, "AUDITLOG_HTTP_REQUEST"): + self._collect_inserted_ids(params, self.http_request_ids) return [] if ( - "INSERT INTO" in q_up - and "AUDITLOG_LOG" in q_up + 
self._is_insert_query(q_up, "AUDITLOG_LOG") and "AUDITLOG_LOG_LINE" not in q_up ): - # collect inserted ids (1st tuple element) - if params: - for row in params: - self.log_ids.add(row[0]) + self._collect_inserted_ids(params, self.log_ids) return [] + return None + + def execute(self, query, params=None): + self.calls.append((query, params)) + q = (query or "").strip() + q_up = q.upper() + + select_result = self._handle_select(q, q_up) + if select_result is not None: + return select_result + + insert_result = self._handle_insert(q_up, params) + if insert_result is not None: + return insert_result + return [] diff --git a/auditlog_clickhouse_write/tests/test_auditlog_clickhouse_write.py b/auditlog_clickhouse_write/tests/test_auditlog_clickhouse_write.py index a1a140274a3..5a9dc37c746 100644 --- a/auditlog_clickhouse_write/tests/test_auditlog_clickhouse_write.py +++ b/auditlog_clickhouse_write/tests/test_auditlog_clickhouse_write.py @@ -61,6 +61,24 @@ def test_01_create_writes_to_buffer_not_auditlog_tables(self): self.assertEqual(payload["log"]["model_id"], self.groups_model_id) self.assertEqual(payload["log"]["res_id"], group.id) + def test_01b_create_payload_contains_http_related_data(self): + buf = self.env["auditlog.log.buffer"].sudo() + + self.env["res.groups"].with_context(tracking_disable=True).create( + {"name": "ch_test_group_http_related"} + ) + + payload = buf.search([], order="id desc", limit=1).payload_json + + self.assertIn("http_session", payload) + self.assertIn("http_request", payload) + self.assertTrue( + payload["http_session"] is None or isinstance(payload["http_session"], dict) + ) + self.assertTrue( + payload["http_request"] is None or isinstance(payload["http_request"], dict) + ) + def test_02_write_creates_lines(self): buf = self.env["auditlog.log.buffer"].sudo() start_buf = buf.search_count([]) @@ -238,6 +256,9 @@ def test_04_job_flush_success_deletes_buffers_and_calls_insert(self): q for (q, _params) in dummy.calls if "INSERT INTO" in (q or 
"").upper() ] self.assertTrue(insert_calls, "Job must insert into ClickHouse") + insert_sql = "\n".join(insert_calls).upper() + self.assertIn("AUDITLOG_LOG", insert_sql) + self.assertIn("AUDITLOG_LOG_LINE", insert_sql) def test_05_job_invalid_payload_marks_error_and_keeps_row(self): buf = self.env["auditlog.log.buffer"].sudo() @@ -270,9 +291,14 @@ def test_06_retry_after_partial_insert_does_not_duplicate_log_rows(self): partner.with_context(tracking_disable=True).write({"name": "Partial insert v2"}) pending = buf.search([("state", "=", buf.STATE_PENDING)], order="id asc") - valid_buffers, invalid_buffers, log_rows, line_rows = ( - buf._collect_rows_from_buffers(pending) - ) + ( + valid_buffers, + invalid_buffers, + http_session_rows, + http_request_rows, + log_rows, + line_rows, + ) = buf._collect_rows_from_buffers(pending) self.assertTrue(log_rows) self.assertTrue(line_rows) @@ -287,6 +313,8 @@ def test_06_retry_after_partial_insert_does_not_duplicate_log_rows(self): buf._insert_rows_to_clickhouse( client=dummy, config=self.config, + http_session_rows=http_session_rows, + http_request_rows=http_request_rows, log_rows=log_rows, line_rows=line_rows, valid_buffers=valid_buffers, @@ -296,6 +324,8 @@ def test_06_retry_after_partial_insert_does_not_duplicate_log_rows(self): buf._insert_rows_to_clickhouse( client=dummy, config=self.config, + http_session_rows=http_session_rows, + http_request_rows=http_request_rows, log_rows=log_rows, line_rows=line_rows, valid_buffers=valid_buffers, @@ -312,6 +342,144 @@ def test_06_retry_after_partial_insert_does_not_duplicate_log_rows(self): len(log_inserts), 1, "Log rows must not be inserted twice on retry" ) + def test_07_retry_does_not_duplicate_http_related_rows(self): + buf = self.env["auditlog.log.buffer"].sudo() + buf.search([]).unlink() + + rec = buf.create( + { + "payload_json": { + "http_session": { + "id": 101, + "name": "sess-101", + "user_id": self.env.uid, + "create_date": "2026-03-12T10:00:00+00:00", + "create_uid": 
self.env.uid, + "write_date": None, + "write_uid": None, + }, + "http_request": { + "id": 201, + "name": "/web", + "root_url": "http://localhost:8069/", + "user_id": self.env.uid, + "http_session_id": 101, + "user_context": "{}", + "create_date": "2026-03-12T10:00:00+00:00", + "create_uid": self.env.uid, + "write_date": None, + "write_uid": None, + }, + "log": { + "id": 301, + "name": "Test log", + "model_id": self.env.ref("base.model_res_partner").id, + "model_name": "Contact", + "model_model": "res.partner", + "res_id": 1, + "res_ids": None, + "user_id": self.env.uid, + "method": "write", + "http_request_id": 201, + "http_session_id": 101, + "log_type": "full", + "create_date": "2026-03-12T10:00:00+00:00", + "create_uid": self.env.uid, + "write_date": None, + "write_uid": None, + }, + "lines": [ + { + "id": 401, + "log_id": 301, + "field_id": self.env["ir.model.fields"] + .search( + [("model", "=", "res.partner"), ("name", "=", "name")], + limit=1, + ) + .id, + "field_name": "name", + "field_description": "Name", + "old_value": "Old", + "new_value": "New", + "old_value_text": None, + "new_value_text": None, + "create_date": "2026-03-12T10:00:00+00:00", + "create_uid": self.env.uid, + "write_date": None, + "write_uid": None, + } + ], + }, + "state": buf.STATE_PENDING, + } + ) + + pending = buf.search([("id", "=", rec.id), ("state", "=", buf.STATE_PENDING)]) + ( + valid_buffers, + invalid_buffers, + http_session_rows, + http_request_rows, + log_rows, + line_rows, + ) = buf._collect_rows_from_buffers(pending) + + self.assertTrue(http_session_rows) + self.assertTrue(http_request_rows) + self.assertTrue(log_rows) + self.assertTrue(line_rows) + + dummy = DummyClickHouseClient(raise_on_line_insert_once=True) + + with mute_logger( + "odoo.addons.auditlog_clickhouse_write.models.auditlog_log_buffer" + ): + with self.assertRaises(RetryableJobError): + buf._insert_rows_to_clickhouse( + client=dummy, + config=self.config, + http_session_rows=http_session_rows, + 
http_request_rows=http_request_rows, + log_rows=log_rows, + line_rows=line_rows, + valid_buffers=valid_buffers, + ) + + buf._insert_rows_to_clickhouse( + client=dummy, + config=self.config, + http_session_rows=http_session_rows, + http_request_rows=http_request_rows, + log_rows=log_rows, + line_rows=line_rows, + valid_buffers=valid_buffers, + ) + + session_inserts = [ + q + for (q, _p) in dummy.calls + if "INSERT INTO" in (q or "").upper() + and "AUDITLOG_HTTP_SESSION" in (q or "").upper() + ] + request_inserts = [ + q + for (q, _p) in dummy.calls + if "INSERT INTO" in (q or "").upper() + and "AUDITLOG_HTTP_REQUEST" in (q or "").upper() + ] + log_inserts = [ + q + for (q, _p) in dummy.calls + if "INSERT INTO" in (q or "").upper() + and "AUDITLOG_LOG" in (q or "").upper() + and "AUDITLOG_LOG_LINE" not in (q or "").upper() + ] + + self.assertEqual(len(session_inserts), 1) + self.assertEqual(len(request_inserts), 1) + self.assertEqual(len(log_inserts), 1) + @tagged("-at_install", "post_install") class TestAuditlogClickhouseRuleCache(AuditLogClickhouseCommon): diff --git a/auditlog_clickhouse_write/tests/test_clickhouse_config.py b/auditlog_clickhouse_write/tests/test_clickhouse_config.py index d6083b8c1b9..907573d5de0 100644 --- a/auditlog_clickhouse_write/tests/test_clickhouse_config.py +++ b/auditlog_clickhouse_write/tests/test_clickhouse_config.py @@ -65,3 +65,21 @@ def test_04_cron_skips_without_active_config(self): self.assertFalse(rec.error_message) self.assertFalse(dummy.calls) + + def test_05_create_tables_includes_http_relations(self): + cfg = self.create_config(is_active=True) + + with self._patched_clickhouse_client() as dummy: + cfg.action_create_auditlog_tables() + + ddl_calls = [ + q + for (q, _params) in dummy.calls + if "CREATE TABLE IF NOT EXISTS" in (q or "") + ] + ddl_sql = "\n".join(ddl_calls) + + self.assertIn("auditlog_http_session", ddl_sql) + self.assertIn("auditlog_http_request", ddl_sql) + self.assertIn("auditlog_log", ddl_sql) + 
self.assertIn("auditlog_log_line", ddl_sql)