diff --git a/auditlog_clickhouse/README.rst b/auditlog_clickhouse/README.rst
new file mode 100644
index 00000000000..25e07217fc6
--- /dev/null
+++ b/auditlog_clickhouse/README.rst
@@ -0,0 +1,166 @@
+===================================
+Audit Log ClickHouse store and read
+===================================
+
+..
+   !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+   !! This file is generated by oca-gen-addon-readme !!
+   !! changes will be overwritten.                   !!
+   !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+   !! source digest: sha256:11aaa38bad24a890554c0d34d74d31e13b933facbba3fea31f4cbf22ae8fd842
+   !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+
+.. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png
+    :target: https://odoo-community.org/page/development-status
+    :alt: Beta
+.. |badge2| image:: https://img.shields.io/badge/licence-AGPL--3-blue.png
+    :target: http://www.gnu.org/licenses/agpl-3.0-standalone.html
+    :alt: License: AGPL-3
+.. |badge3| image:: https://img.shields.io/badge/github-OCA%2Fserver--tools-lightgray.png?logo=github
+    :target: https://github.com/OCA/server-tools/tree/18.0/auditlog_clickhouse
+    :alt: OCA/server-tools
+.. |badge4| image:: https://img.shields.io/badge/weblate-Translate%20me-F47D42.png
+    :target: https://translation.odoo-community.org/projects/server-tools-18-0/server-tools-18-0-auditlog_clickhouse
+    :alt: Translate me on Weblate
+.. |badge5| image:: https://img.shields.io/badge/runboat-Try%20me-875A7B.png
+    :target: https://runboat.odoo-community.org/builds?repo=OCA/server-tools&target_branch=18.0
+    :alt: Try me on Runboat
+
+|badge1| |badge2| |badge3| |badge4| |badge5|
+
+This module implements buffered, asynchronous transfer of audit logs
+from PostgreSQL to ClickHouse. Storing audit data in an append-only
+columnar database prevents database bloat, makes audit records
+effectively immutable, and scales to very large volumes of logs
+without slowing down normal transactions.
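The buffered-flush idea can be sketched in a few lines of Python. This is illustrative only: ``flush_buffer`` and the stub client below are assumed names, not this module's actual API, and in the module the export runs inside queue_job workers against a real ``clickhouse_driver.Client``.

```python
import json

def flush_buffer(rows, client, batch_size=1000):
    """Push buffered payloads to ClickHouse in batches; return rows sent.

    Hypothetical sketch: `rows` mimics auditlog.log.buffer records that
    carry a serialized payload in a `payload_json` column.
    """
    sent = 0
    for start in range(0, len(rows), batch_size):
        batch = [
            json.loads(r["payload_json"])
            for r in rows[start:start + batch_size]
        ]
        # With clickhouse-driver, INSERT data is passed as the second
        # argument, e.g. client.execute("INSERT INTO t (x) VALUES", batch)
        client.execute("INSERT INTO auditlog_log VALUES", batch)
        sent += len(batch)
    return sent

class StubClient:
    """Stands in for clickhouse_driver.Client so the sketch runs offline."""
    def __init__(self):
        self.calls = []

    def execute(self, query, data=None):
        # Record (query, row count) instead of talking to a server.
        self.calls.append((query, len(data or [])))
```

With the default batch size of 1000, a buffer of 2500 rows would be drained in three INSERT round trips; the stub makes that batching visible without a ClickHouse server.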
Audit logs are written +asynchronously to reduce the load on business operations. Audit logs +stored in ClickHouse are displayed in standard Odoo audit log views +(logs, log lines, forms with detailed log information) without any +changes to existing view definitions. + +**Table of contents** + +.. contents:: + :local: + +Use Cases / Context +=================== + +The auditlog module stores audit data in PostgreSQL. In production +systems with extensive audit rules, these tables grow without limits, +causing three issues: + +- Database bloat; +- Immutability gap: Members of group_auditlog_manager (implied by + base.group_system) have full CRUD access to audit tables, allowing + audit records to be altered or deleted via UI, ORM, or SQL; +- Performance overhead: Audit logging runs synchronously in the same + transaction and performs multiple ORM create() calls, adding latency + to audited operations. + +Configuration +============= + +This module requires: + +- A reachable ClickHouse server. +- Python dependency ``clickhouse-driver`` available in the Odoo + environment. +- A ClickHouse database created in advance (the module does **not** + create databases/users/grants). +- A ClickHouse user with at least: + + - ``INSERT`` and ``CREATE TABLE`` privileges on the target database. + +- The ``pg_clickhouse`` extension installed on the PostgreSQL server. + +Steps: + +- Make sure ``clickhouse-driver`` is available in your system. +- Install the module. 
+- Configure the connection parameters in Odoo:
+
+  - **Settings > Technical > Auditlog > Clickhouse configuration**
+  - Fill in the following parameters:
+
++---------------------------------------+
+| Field                                 |
++=======================================+
+| Hostname or IP                        |
++---------------------------------------+
+| TCP port                              |
++---------------------------------------+
+| ClickHouse database name              |
++---------------------------------------+
+| ClickHouse user                       |
++---------------------------------------+
+| ClickHouse password                   |
++---------------------------------------+
+| queue_job_batch_size (default = 1000) |
++---------------------------------------+
+| channel_id (default root)             |
++---------------------------------------+
+
+- Click **Test connection**.
+- Optionally, click **Create Auditlog Tables** to create the tables in
+  the target database.
+- Click **Setup FDW read** to configure the Foreign Data Wrapper so that
+  standard Odoo audit log views read data directly from ClickHouse.
+
+Usage
+=====
+
+Once auditlog_clickhouse is installed and configured:
+
+- Users perform tracked operations (create, write, unlink, read, export)
+  on models with active auditlog.rule subscriptions. This behavior is
+  unchanged from the base auditlog module.
+- Log data is serialized and stored in the local auditlog.log.buffer
+  table immediately. The standard auditlog tables are not populated.
+- Every 5 minutes (by default), a cron job pushes buffered data to
+  ClickHouse and cleans the local buffer.
+- Data is permanently stored in ClickHouse and cannot be modified or
+  deleted via Odoo.
+
+All standard Odoo audit log views work as expected: logs, log lines,
+and forms with detailed log information display data from ClickHouse.
+Search, filtering, and grouping (by user, model, date, session, query)
+work through FDW, with the query forwarded to ClickHouse. The “View
+logs” quick access button in audited model forms works as expected.
+Audit logs are read-only.
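The read-only guarantee can be pictured with a minimal, framework-free sketch. The names below are illustrative; in the module itself this is an Odoo model override that raises ``UserError`` with the message shown.

```python
# Framework-free stand-in for the module's read-only guard. The real
# implementation overrides write()/unlink() on the Odoo log models;
# AuditLogRecord and ReadOnlyLogError are hypothetical names.
class ReadOnlyLogError(Exception):
    pass

class AuditLogRecord:
    """A flushed audit log entry: readable, never mutable."""

    def __init__(self, payload):
        self._payload = dict(payload)

    def read(self):
        # Reading is always allowed; return a copy so callers cannot
        # mutate the stored payload in place.
        return dict(self._payload)

    def write(self, vals):
        raise ReadOnlyLogError(
            "Audit logs are read-only (stored in ClickHouse)."
        )

    def unlink(self):
        raise ReadOnlyLogError(
            "Audit logs are read-only (stored in ClickHouse)."
        )
```

Any attempt to modify or delete an entry raises, while reads keep working, mirroring the behaviour described above.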
Attempting to modify or delete a log entry +from the user interface raises an error. + +Bug Tracker +=========== + +Bugs are tracked on `GitHub Issues `_. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +`feedback `_. + +Do not contact contributors directly about support or help with technical issues. + +Credits +======= + +Authors +------- + +* Cetmix + +Maintainers +----------- + +This module is maintained by the OCA. + +.. image:: https://odoo-community.org/logo.png + :alt: Odoo Community Association + :target: https://odoo-community.org + +OCA, or the Odoo Community Association, is a nonprofit organization whose +mission is to support the collaborative development of Odoo features and +promote its widespread use. + +This module is part of the `OCA/server-tools `_ project on GitHub. + +You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute. diff --git a/auditlog_clickhouse/__init__.py b/auditlog_clickhouse/__init__.py new file mode 100644 index 00000000000..0650744f6bc --- /dev/null +++ b/auditlog_clickhouse/__init__.py @@ -0,0 +1 @@ +from . 
import models diff --git a/auditlog_clickhouse/__manifest__.py b/auditlog_clickhouse/__manifest__.py new file mode 100644 index 00000000000..e9a41e8b228 --- /dev/null +++ b/auditlog_clickhouse/__manifest__.py @@ -0,0 +1,21 @@ +{ + "name": "Audit Log ClickHouse store and read", + "version": "18.0.1.0.0", + "summary": "Asynchronous audit log storage in ClickHouse", + "category": "Tools", + "license": "AGPL-3", + "author": "Odoo Community Association (OCA), Cetmix", + "website": "https://github.com/OCA/server-tools", + "depends": [ + "auditlog", + "queue_job", + ], + "external_dependencies": { + "python": ["clickhouse-driver"], + }, + "data": [ + "security/ir.model.access.csv", + "data/auditlog_clickhouse_queue.xml", + "views/auditlog_clickhouse_config_views.xml", + ], +} diff --git a/auditlog_clickhouse/data/auditlog_clickhouse_queue.xml b/auditlog_clickhouse/data/auditlog_clickhouse_queue.xml new file mode 100644 index 00000000000..c5968f58514 --- /dev/null +++ b/auditlog_clickhouse/data/auditlog_clickhouse_queue.xml @@ -0,0 +1,27 @@ + + + Auditlog ClickHouse: enqueue buffer flush + + code + model._cron_flush_to_clickhouse() + 5 + minutes + True + + + + + Edit buffer flush schedule + ir.cron + + ir.actions.act_window + form + new + + diff --git a/auditlog_clickhouse/i18n/auditlog_clickhouse.pot b/auditlog_clickhouse/i18n/auditlog_clickhouse.pot new file mode 100644 index 00000000000..a302b7b5e05 --- /dev/null +++ b/auditlog_clickhouse/i18n/auditlog_clickhouse.pot @@ -0,0 +1,392 @@ +# Translation of Odoo Server. +# This file contains the translation of the following modules: +# * auditlog_clickhouse +# +msgid "" +msgstr "" +"Project-Id-Version: Odoo Server 18.0\n" +"Report-Msgid-Bugs-To: \n" +"POT-Creation-Date: 2026-02-25 21:35+0000\n" +"PO-Revision-Date: 2026-02-25 21:35+0000\n" +"Last-Translator: \n" +"Language-Team: \n" +"MIME-Version: 1.0\n" +"Content-Type: text/plain; charset=UTF-8\n" +"Content-Transfer-Encoding: \n" +"Plural-Forms: \n" + +#. 
module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "" +"%s\n" +"\n" +"If you save this configuration as active, the currently active one will be deactivated:\n" +"- %s" +msgstr "" + +#. module: auditlog_clickhouse +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse.view_auditlog_clickhouse_config_form +msgid "" +"As soon as this connection to ClickHouse is activated, all log\n" +" entries from that moment will be stored in the configured\n" +" ClickHouse database. Only one connection can be active at a\n" +" time." +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "" +"As soon as this connection to ClickHouse is activated, all log entries from that moment will be stored in the configured ClickHouse database.\n" +"\n" +" Only one connection can be active at a time." +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_log_buffer__attempt_count +msgid "Attempt Count" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_readonly.py:0 +msgid "Audit logs are read-only (stored in ClickHouse)." +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model,name:auditlog_clickhouse.model_auditlog_log +msgid "Auditlog - Log" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model,name:auditlog_clickhouse.model_auditlog_log_line +msgid "Auditlog - Log details (fields updated)" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model,name:auditlog_clickhouse.model_auditlog_rule +msgid "Auditlog - Rule" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model,name:auditlog_clickhouse.model_auditlog_log_buffer +msgid "Auditlog ClickHouse Buffer" +msgstr "" + +#. 
module: auditlog_clickhouse +#: model:ir.model,name:auditlog_clickhouse.model_auditlog_clickhouse_config +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse.view_auditlog_clickhouse_config_form +msgid "Auditlog ClickHouse Configuration" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.actions.server,name:auditlog_clickhouse.ir_cron_auditlog_clickhouse_enqueue_flush_ir_actions_server +msgid "Auditlog ClickHouse: enqueue buffer flush" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "Auditlog tables were created (if they did not exist)." +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__queue_batch_size +msgid "Batch size" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__queue_channel_id +msgid "Channel" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.actions.act_window,name:auditlog_clickhouse.action_auditlog_clickhouse_config +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse.view_auditlog_clickhouse_config_list +msgid "ClickHouse Configuration" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "ClickHouse activation" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.ui.menu,name:auditlog_clickhouse.menu_auditlog_clickhouse_config +msgid "ClickHouse configuration" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "ClickHouse connection failed: %s" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,help:auditlog_clickhouse.field_auditlog_clickhouse_config__port +msgid "" +"ClickHouse native TCP port used by clickhouse-driver (default is 9000)." 
+msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,help:auditlog_clickhouse.field_auditlog_clickhouse_config__host +msgid "" +"ClickHouse server hostname or IP address. Must be reachable from the Odoo " +"server." +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,help:auditlog_clickhouse.field_auditlog_clickhouse_config__user +msgid "" +"ClickHouse user name used for INSERT operations into auditlog tables. " +"Recommended: a dedicated user with INSERT-only privileges." +msgstr "" + +#. module: auditlog_clickhouse +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse.view_auditlog_clickhouse_config_form +msgid "Configure action" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.actions.act_window,name:auditlog_clickhouse.action_configure_auditlog_clickhouse_flush_cron +msgid "Configure flush action" +msgstr "" + +#. module: auditlog_clickhouse +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse.view_auditlog_clickhouse_config_form +msgid "Connection" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "Connection to ClickHouse is OK." +msgstr "" + +#. module: auditlog_clickhouse +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse.view_auditlog_clickhouse_config_form +msgid "Create Auditlog Tables" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__create_uid +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_log_buffer__create_uid +msgid "Created by" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__create_date +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_log_buffer__create_date +msgid "Created on" +msgstr "" + +#. 
module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__database +msgid "Database name" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__display_name +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_log_buffer__display_name +msgid "Display Name" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.actions.act_window,name:auditlog_clickhouse.action_configure_auditlog_clickhouse_cron +msgid "Edit buffer flush schedule" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_log_buffer.py:0 +msgid "Error" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_log_buffer__error_message +msgid "Error Message" +msgstr "" + +#. module: auditlog_clickhouse +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse.view_auditlog_clickhouse_config_form +msgid "Export Queue" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__fdw_enabled +msgid "FDW enabled" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "FDW server and user mapping were configured." +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "Failed to create ClickHouse tables: %s" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "Failed to create/alter FDW server: %s" +msgstr "" + +#. module: auditlog_clickhouse +#. 
odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "Failed to create/alter user mapping: %s" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_log_buffer.py:0 +msgid "Flushed to ClickHouse but failed to delete buffer rows: %s" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "Host is required." +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__host +msgid "Hostname or IP" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__id +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_log_buffer__id +msgid "ID" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,help:auditlog_clickhouse.field_auditlog_clickhouse_config__is_active +msgid "" +"If checked audit logs will be buffered locally and exported to ClickHouse. " +"Only one configuration can be active at a time." +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__is_active +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse.view_auditlog_clickhouse_config_list +msgid "Is Active" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__write_uid +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_log_buffer__write_uid +msgid "Last Updated by" +msgstr "" + +#. 
module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__write_date +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_log_buffer__write_date +msgid "Last Updated on" +msgstr "" + +#. module: auditlog_clickhouse +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse.view_auditlog_clickhouse_config_form +msgid "" +"Logs are buffered in PostgreSQL and periodically flushed to\n" +" ClickHouse by cron." +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,help:auditlog_clickhouse.field_auditlog_clickhouse_config__queue_batch_size +msgid "Maximum number of buffer rows processed per queue job run." +msgstr "" + +#. module: auditlog_clickhouse +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse.view_auditlog_clickhouse_config_form +msgid "Notes" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__password +msgid "Password" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,help:auditlog_clickhouse.field_auditlog_clickhouse_config__password +msgid "Password for the ClickHouse user." +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_log_buffer__payload_json +msgid "Payload Json" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_log_buffer.py:0 +msgid "Pending" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/clickhouse_client.py:0 +msgid "" +"Python package 'clickhouse-driver' is not available. Install it in the Odoo " +"environment to use ClickHouse storage." +msgstr "" + +#. module: auditlog_clickhouse +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse.view_auditlog_clickhouse_config_form +msgid "Setup FDW read" +msgstr "" + +#. 
module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_log_buffer__state +msgid "State" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "Success" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__port +msgid "TCP Port" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,help:auditlog_clickhouse.field_auditlog_clickhouse_config__database +msgid "" +"Target ClickHouse database where auditlog tables exist (or will be created " +"by the setup button)." +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,help:auditlog_clickhouse.field_auditlog_clickhouse_config__fdw_enabled +msgid "Technical flag set after configuring pg_clickhouse FDW objects." +msgstr "" + +#. module: auditlog_clickhouse +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse.view_auditlog_clickhouse_config_form +msgid "Test connection" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,field_description:auditlog_clickhouse.field_auditlog_clickhouse_config__user +msgid "User" +msgstr "" + +#. module: auditlog_clickhouse +#. odoo-python +#: code:addons/auditlog_clickhouse/models/auditlog_clickhouse_config.py:0 +msgid "pg_clickhouse extension is not available: %s" +msgstr "" + +#. module: auditlog_clickhouse +#: model:ir.model.fields,help:auditlog_clickhouse.field_auditlog_clickhouse_config__queue_channel_id +msgid "queue_job channel used for export jobs." +msgstr "" diff --git a/auditlog_clickhouse/models/__init__.py b/auditlog_clickhouse/models/__init__.py new file mode 100644 index 00000000000..d29210bc110 --- /dev/null +++ b/auditlog_clickhouse/models/__init__.py @@ -0,0 +1,5 @@ +from . import auditlog_clickhouse_config +from . import clickhouse_client +from . import auditlog_log_buffer +from . 
import auditlog_rule +from . import auditlog_readonly diff --git a/auditlog_clickhouse/models/auditlog_clickhouse_config.py b/auditlog_clickhouse/models/auditlog_clickhouse_config.py new file mode 100644 index 00000000000..0485c6ac8d9 --- /dev/null +++ b/auditlog_clickhouse/models/auditlog_clickhouse_config.py @@ -0,0 +1,751 @@ +import logging +from typing import Any, Optional + +from odoo import api, fields, models +from odoo.exceptions import UserError +from odoo.tools import SQL + +from .clickhouse_client import get_clickhouse_client + +_logger = logging.getLogger(__name__) + + +class AuditlogClickhouseConfig(models.Model): + """ + ClickHouse connection configuration for auditlog_clickhouse. + + Business rules: + - Only one configuration can be active at a time. + - UI provides tools to test the connection and (optionally) create tables. + + Notes: + - As soon as a configuration becomes active, audit log entries will be stored + in the configured ClickHouse database from that moment. + """ + + _name = "auditlog.clickhouse.config" + _description = "Auditlog ClickHouse Configuration" + _rec_name = "display_name" + + FDW_SERVER = "auditlog_clickhouse_srv" + DEFAULT_PORT = 9000 + DEFAULT_DB = "odoo_audit" + DEFAULT_USER = "odoo_audit_writer" + DEFAULT_QUEUE_BATCH_SIZE = 1000 + + is_active = fields.Boolean( + help=( + "If checked audit logs will be buffered locally and exported to ClickHouse." + " Only one configuration can be active at a time." + ), + ) + host = fields.Char( + string="Hostname or IP", + required=True, + help=( + "ClickHouse server hostname or IP address. " + "Must be reachable from the Odoo server." + ), + ) + port = fields.Integer( + string="TCP Port", + required=True, + default=DEFAULT_PORT, + help=( + "ClickHouse native TCP port used by clickhouse-driver " "(default is 9000)." 
+ ), + ) + database = fields.Char( + string="Database name", + required=True, + default=DEFAULT_DB, + help=( + "Target ClickHouse database where auditlog tables exist " + "(or will be created by the setup button)." + ), + ) + user = fields.Char( + required=True, + default=DEFAULT_USER, + help=( + "ClickHouse user name used for INSERT operations into auditlog tables. " + "Recommended: a dedicated user with INSERT-only privileges." + ), + ) + password = fields.Char( + help="Password for the ClickHouse user.", + ) + + queue_batch_size = fields.Integer( + string="Batch size", + default=DEFAULT_QUEUE_BATCH_SIZE, + required=True, + help="Maximum number of buffer rows processed per queue job run.", + ) + + def _default_queue_channel(self): + Channel = self.env["queue.job.channel"].sudo() + return Channel.search([("complete_name", "=", "root")], limit=1) + + queue_channel_id = fields.Many2one( + comodel_name="queue.job.channel", + string="Channel", + required=True, + default=_default_queue_channel, + ondelete="restrict", + help="queue_job channel used for export jobs.", + ) + + fdw_enabled = fields.Boolean( + string="FDW enabled", + readonly=True, + help="Technical flag set after configuring pg_clickhouse FDW objects.", + ) + + _sql_constraints = [ + ( + "auditlog_clickhouse_queue_batch_size_positive", + "CHECK(queue_batch_size > 0)", + "Batch size must be greater than 0.", + ), + ] + + @api.depends("host", "port", "database", "user", "is_active") + def _compute_display_name(self): + for rec in self: + base = ( + f"{rec.host or ''}:{rec.port or ''}/" + f"{rec.database or ''} ({rec.user or ''})" + ) + rec.display_name = f"{base} [active]" if rec.is_active else base + + @api.model + def get_active_config(self) -> Optional["AuditlogClickhouseConfig"]: + """Return the currently active configuration (if any).""" + config = self.search([("is_active", "=", True)], limit=1) + _logger.debug( + "auditlog_clickhouse: get_active_config -> %s", + config.id if config else None, + ) + 
return config + + def _deactivate_other_configs(self) -> None: + """ + Silently deactivate all other active configurations. + + Called after the current record(s) become active to keep the + "single active" rule without DB constraint errors. + """ + other_configs = self.search( + [("is_active", "=", True), ("id", "not in", self.ids)] + ) + if other_configs: + _logger.info( + "auditlog_clickhouse: deactivating other configs %s (activated=%s)", + other_configs.ids, + self.ids, + ) + other_configs.write({"is_active": False}) + + @api.onchange("is_active") + def _onchange_is_active(self): + """ + Show disclaimer immediately when user enables the checkbox. + + If another active configuration exists, also warn that it will be + deactivated after saving. + """ + for rec in self: + if not rec.is_active or (rec._origin and rec._origin.is_active): + continue + + disclaimer = rec.env._( + "As soon as this connection to ClickHouse is activated, all log entries" + " from that moment will be stored in the configured ClickHouse" + " database.\n\n Only one connection can be active at a time." + ) + + domain = [("is_active", "=", True)] + if rec.id: + domain.append(("id", "!=", rec.id)) + + other = rec.env["auditlog.clickhouse.config"].sudo().search(domain, limit=1) + if other: + message = rec.env._( + "%s\n\nIf you save this configuration as active, " + "the currently active one will be deactivated:\n- %s" + ) % (disclaimer, other.display_name) + return { + "warning": { + "title": rec.env._("ClickHouse activation"), + "message": message, + } + } + + return { + "warning": { + "title": rec.env._("ClickHouse activation"), + "message": disclaimer, + } + } + + @api.model_create_multi + def create(self, vals_list: list[dict[str, Any]]): + """ + Enforce single active config on creation. + + If any newly created record is active, deactivate all other active + configs after the create succeeds. 
+ """ + records = super().create(vals_list) + active_records = records.filtered("is_active") + if active_records: + _logger.info( + "auditlog_clickhouse: created active config(s) %s", + active_records.ids, + ) + active_records._deactivate_other_configs() + else: + _logger.debug("auditlog_clickhouse: created config(s) %s", records.ids) + return records + + def write(self, vals: dict[str, Any]) -> bool: + """ + Enforce single active config on update. + + If this write enables the current record(s), deactivate all other active + configs after the write succeeds. + """ + turning_on = vals.get("is_active") is True + result = super().write(vals) + + if turning_on: + activated = self.filtered("is_active") + _logger.info( + "auditlog_clickhouse: activated config(s) %s (via write)", + activated.ids, + ) + activated._deactivate_other_configs() + else: + _logger.debug( + "auditlog_clickhouse: updated config(s) %s (vals=%s)", + self.ids, + sorted(vals.keys()), + ) + + return result + + def action_test_connection(self) -> dict[str, Any]: + """UI button: verify ClickHouse connectivity with a trivial query.""" + self.ensure_one() + _logger.info( + "auditlog_clickhouse: testing connection " + "(config=%s host=%s port=%s db=%s user=%s)", + self.id, + self.host, + self.port, + self.database, + self.user, + ) + + client = self._get_client() + try: + client.execute("SELECT 1") + except Exception as exc: + _logger.exception( + "auditlog_clickhouse: connection test FAILED " + "(config=%s host=%s port=%s db=%s user=%s)", + self.id, + self.host, + self.port, + self.database, + self.user, + ) + raise UserError( + self.env._("ClickHouse connection failed: %s") % exc + ) from exc + + _logger.info( + "auditlog_clickhouse: connection test OK " + "(config=%s host=%s port=%s db=%s user=%s)", + self.id, + self.host, + self.port, + self.database, + self.user, + ) + + return self._notify( + title=self.env._("Success"), + message=self.env._("Connection to ClickHouse is OK."), + notif_type="success", 
+ ) + + def action_create_auditlog_tables(self) -> dict[str, Any]: + """ + UI button: create ClickHouse tables if they do not exist. + + Important: + - This is optional. In production you may point to an existing DB. + - Database must already exist. + - We intentionally do not create users/grants in this project. + """ + self.ensure_one() + _logger.info( + "auditlog_clickhouse: creating tables (config=%s db=%s host=%s:%s)", + self.id, + self.database, + self.host, + self.port, + ) + + client = self._get_client() + try: + for statement in self._get_clickhouse_ddl(): + preview = " ".join(statement.strip().splitlines())[:120] + _logger.debug( + "auditlog_clickhouse: executing DDL (config=%s): %s...", + self.id, + preview, + ) + client.execute(statement) + except Exception as exc: + _logger.exception( + "auditlog_clickhouse: create tables FAILED " + "(config=%s db=%s host=%s:%s)", + self.id, + self.database, + self.host, + self.port, + ) + raise UserError( + self.env._("Failed to create ClickHouse tables: %s") % exc + ) from exc + + _logger.info( + "auditlog_clickhouse: create tables OK (config=%s db=%s)", + self.id, + self.database, + ) + + return self._notify( + title=self.env._("Success"), + message=self.env._("Auditlog tables were created (if they did not exist)."), + notif_type="success", + ) + + def _get_client(self): + """Build a clickhouse-driver client from the current record values.""" + self.ensure_one() + _logger.debug( + "auditlog_clickhouse: building client " + "(config=%s host=%s port=%s db=%s user=%s)", + self.id, + self.host, + self.port, + self.database, + self.user, + ) + return get_clickhouse_client( + host=self.host, + port=self.port, + database=self.database, + user=self.user, + password=self.password, + ) + + def _get_clickhouse_ddl(self) -> list[str]: + """ + Return ClickHouse DDL statements for required objects. + + Schema is based on the reference provided in the task. Engines/ORDER BY + are chosen as safe defaults for append-only workloads. 
+ """ + self.ensure_one() + db_name = self.database + + return [ + f""" + CREATE TABLE IF NOT EXISTS {db_name}.auditlog_log + ( + id Int64, + name Nullable(String), + model_id Int32, + model_name Nullable(String), + model_model String, + res_id Nullable(Int64), + res_ids Nullable(String), + user_id Int32, + method String, + http_request_id Nullable(Int64), + http_session_id Nullable(Int64), + log_type Nullable(String), + create_date DateTime64(3, 'UTC'), + create_uid Int32, + write_date Nullable(DateTime64(3, 'UTC')), + write_uid Nullable(Int32) + ) + ENGINE = MergeTree + ORDER BY (create_date, id) + """, + f""" + CREATE TABLE IF NOT EXISTS {db_name}.auditlog_log_line + ( + id Int64, + log_id Int64, + field_id Int32, + field_name Nullable(String), + field_description Nullable(String), + old_value Nullable(String), + new_value Nullable(String), + old_value_text Nullable(String), + new_value_text Nullable(String), + create_date DateTime64(3, 'UTC'), + create_uid Int32, + write_date Nullable(DateTime64(3, 'UTC')), + write_uid Nullable(Int32) + ) + ENGINE = MergeTree + ORDER BY (create_date, id) + """, + ] + + def _fdw_server_exists(self) -> bool: + self.env.cr.execute( + "SELECT 1 FROM pg_foreign_server WHERE srvname = %s", + (self.FDW_SERVER,), + ) + return bool(self.env.cr.fetchone()) + + def _fdw_user_mapping_exists(self) -> bool: + # pg_user_mappings: srvname, usename (view) + self.env.cr.execute( + "SELECT 1 FROM pg_user_mappings " + "WHERE srvname = %s AND usename = current_user", + (self.FDW_SERVER,), + ) + return bool(self.env.cr.fetchone()) + + def action_setup_fdw_read(self): + """UI button: configure pg_clickhouse FDW server + user mapping.""" + self.ensure_one() + + try: + self.env.cr.execute("CREATE EXTENSION IF NOT EXISTS pg_clickhouse") + except Exception as exc: + raise UserError( + self.env._("pg_clickhouse extension is not available: %s") % exc + ) from exc + + driver = "binary" + host = (self.host or "").strip() + if not host: + raise 
UserError(self.env._("Host is required.")) + port = int(self.port or 0) or self.DEFAULT_PORT + port_opt = str(port) + dbname = (self.database or "").strip() or self.DEFAULT_DB + + try: + if self._fdw_server_exists(): + self.env.cr.execute( + SQL( + """ + ALTER SERVER %s OPTIONS ( + SET driver %s, + SET host %s, + SET port %s, + SET dbname %s + ) + """, + SQL.identifier(self.FDW_SERVER), + driver, + host, + port_opt, + dbname, + ) + ) + else: + self.env.cr.execute( + SQL( + """ + CREATE SERVER %s + FOREIGN DATA WRAPPER clickhouse_fdw + OPTIONS ( + driver %s, + host %s, + port %s, + dbname %s + ) + """, + SQL.identifier(self.FDW_SERVER), + driver, + host, + port_opt, + dbname, + ) + ) + except Exception as exc: + raise UserError( + self.env._("Failed to create/alter FDW server: %s") % exc + ) from exc + + ch_user = (self.user or "default").strip() or "default" + ch_pass = self.password or "" + + try: + if self._fdw_user_mapping_exists(): + self.env.cr.execute( + SQL( + """ + ALTER USER MAPPING FOR CURRENT_USER + SERVER %s + OPTIONS ( + SET user %s, + SET password %s + ) + """, + SQL.identifier(self.FDW_SERVER), + ch_user, + ch_pass, + ) + ) + else: + self.env.cr.execute( + SQL( + """ + CREATE USER MAPPING FOR CURRENT_USER + SERVER %s + OPTIONS ( + user %s, + password %s + ) + """, + SQL.identifier(self.FDW_SERVER), + ch_user, + ch_pass, + ) + ) + except Exception as exc: + raise UserError( + self.env._("Failed to create/alter user mapping: %s") % exc + ) from exc + + self._swap_auditlog_tables_to_fdw() + + self.write({"fdw_enabled": True}) + return self._notify( + title=self.env._("Success"), + message=self.env._("FDW server and user mapping were configured."), + notif_type="success", + ) + + def _relation_kind(self, schema: str, name: str) -> str | None: + """Return pg_class.relkind for schema.name, or None if missing.""" + self.env.cr.execute("SELECT to_regclass(%s)", (f"{schema}.{name}",)) + reg = self.env.cr.fetchone()[0] + if not reg: + return None + 
self.env.cr.execute( + """ + SELECT c.relkind + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = %s + AND c.relname = %s + """, + (schema, name), + ) + row = self.env.cr.fetchone() + return row[0] if row else None + + def _drop_foreign_table_if_exists(self, schema: str, name: str): + kind = self._relation_kind(schema, name) + if kind == "f": + self.env.cr.execute( + SQL( + "DROP FOREIGN TABLE %s.%s", + SQL.identifier(schema), + SQL.identifier(name), + ) + ) + + def _rename_table_if_exists(self, schema: str, name: str, new_name: str): + kind = self._relation_kind(schema, name) + if kind == "r": # ordinary table + self.env.cr.execute( + SQL( + "ALTER TABLE %s.%s RENAME TO %s", + SQL.identifier(schema), + SQL.identifier(name), + SQL.identifier(new_name), + ) + ) + + def _ensure_sequences(self): + # needed for integer ids if PG tables are swapped away + self.env.cr.execute("CREATE SEQUENCE IF NOT EXISTS auditlog_log_id_seq") + self.env.cr.execute("CREATE SEQUENCE IF NOT EXISTS auditlog_log_line_id_seq") + + def _create_foreign_tables(self, schema: str): + db_opt = (self.database or "").strip() + + # auditlog_log + self.env.cr.execute( + SQL( + """ + CREATE FOREIGN TABLE %s.%s ( + id bigint, + create_date timestamp, + create_uid integer, + write_date timestamp, + write_uid integer, + name text, + model_id integer, + model_name text, + model_model text, + res_id bigint, + res_ids text, + user_id integer, + method text, + http_session_id integer, + http_request_id integer, + log_type text + ) + SERVER %s + OPTIONS (table_name %s, database %s) + """, + SQL.identifier(schema), + SQL.identifier("auditlog_log"), + SQL.identifier(self.FDW_SERVER), + "auditlog_log", + db_opt, + ) + ) + + # auditlog_log_line + self.env.cr.execute( + SQL( + """ + CREATE FOREIGN TABLE %s.%s ( + id bigint, + create_date timestamp, + create_uid integer, + write_date timestamp, + write_uid integer, + field_id integer, + log_id bigint, + old_value text, + new_value 
text, + old_value_text text, + new_value_text text, + field_name text, + field_description text + ) + SERVER %s + OPTIONS (table_name %s, database %s) + """, + SQL.identifier(schema), + SQL.identifier("auditlog_log_line"), + SQL.identifier(self.FDW_SERVER), + "auditlog_log_line", + db_opt, + ) + ) + + def _recreate_auditlog_log_line_view(self, schema: str): + # Odoo model auditlog.log.line.view expects this view name. + # Drop first to avoid old OID dependencies when swapping tables. + self.env.cr.execute( + SQL( + "DROP VIEW IF EXISTS %s.%s", + SQL.identifier(schema), + SQL.identifier("auditlog_log_line_view"), + ) + ) + self.env.cr.execute( + SQL( + """ + CREATE VIEW %s.%s AS + SELECT alogl.id, + alogl.create_date, + alogl.create_uid, + alogl.write_uid, + alogl.write_date, + alogl.field_id, + alogl.log_id, + alogl.old_value, + alogl.new_value, + alogl.old_value_text, + alogl.new_value_text, + alogl.field_name, + alogl.field_description, + alog.name, + alog.model_id, + alog.model_name, + alog.model_model, + alog.res_id, + alog.user_id, + alog.method, + alog.http_session_id, + alog.http_request_id, + alog.log_type + FROM %s.%s alogl + JOIN %s.%s alog ON alog.id = alogl.log_id + """, + SQL.identifier(schema), + SQL.identifier("auditlog_log_line_view"), + SQL.identifier(schema), + SQL.identifier("auditlog_log_line"), + SQL.identifier(schema), + SQL.identifier("auditlog_log"), + ) + ) + + def _swap_auditlog_tables_to_fdw(self): + """Make auditlog read from ClickHouse through pg_clickhouse foreign tables.""" + self.ensure_one() + schema = "public" + + # 1) Drop SQL view first (it binds to old table OIDs) + self.env.cr.execute( + SQL( + "DROP VIEW IF EXISTS %s.%s", + SQL.identifier(schema), + SQL.identifier("auditlog_log_line_view"), + ) + ) + + # 2) If foreign tables already exist, drop them (safe; data is in ClickHouse) + self._drop_foreign_table_if_exists(schema, "auditlog_log_line") + self._drop_foreign_table_if_exists(schema, "auditlog_log") + + # 3) If ordinary 
tables exist, rename to backup (keep local history) + self._rename_table_if_exists( + schema, "auditlog_log_line", "auditlog_log_line_pg_backup" + ) + self._rename_table_if_exists(schema, "auditlog_log", "auditlog_log_pg_backup") + + # 4) Ensure sequences (needed by our ClickHouse write path) + self._ensure_sequences() + + # 5) Create foreign tables + self._create_foreign_tables(schema) + + # 6) Recreate view that auditlog uses for details + self._recreate_auditlog_log_line_view(schema) + + @staticmethod + def _notify( + *, title: str, message: str, notif_type: str = "info" + ) -> dict[str, Any]: + """Return standard Odoo UI notification action.""" + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": title, + "message": message, + "type": notif_type, + "sticky": False, + }, + } diff --git a/auditlog_clickhouse/models/auditlog_log_buffer.py b/auditlog_clickhouse/models/auditlog_log_buffer.py new file mode 100644 index 00000000000..240b82bddf9 --- /dev/null +++ b/auditlog_clickhouse/models/auditlog_log_buffer.py @@ -0,0 +1,484 @@ +import json +import logging +from datetime import datetime, timezone +from typing import Any + +from dateutil import parser as dt_parser + +from odoo import api, fields, models +from odoo.tools import SQL + +from odoo.addons.queue_job.exception import RetryableJobError + +_logger = logging.getLogger(__name__) + +JsonMapping = dict[str, Any] +ChRow = tuple[Any, ...] + + +class AuditlogLogBuffer(models.Model): + """ + Buffered audit log payloads waiting to be flushed into ClickHouse. + + Each record stores a pre-built payload produced by the auditlog.rule override. + Export is asynchronous: + + - A cron enqueues a queue_job. + - The queue_job locks pending buffer rows (FOR UPDATE SKIP LOCKED), + converts payloads to ClickHouse tuples and inserts them in batches. + - Successfully flushed buffer rows are removed from PostgreSQL. 
+ + Design notes: + - This model is an internal queue; no user-facing ACLs should be provided. + - queue_job provides retries/backoff when ClickHouse is slow/unavailable. + """ + + _name = "auditlog.log.buffer" + _description = "Auditlog ClickHouse Buffer" + _order = "create_date asc, id asc" + + STATE_PENDING = "pending" + STATE_ERROR = "error" + + # Column order MUST match CREATE TABLE schema and inserted tuples. + _CH_LOG_COLUMNS: tuple[str, ...] = ( + "id", + "name", + "model_id", + "model_name", + "model_model", + "res_id", + "res_ids", + "user_id", + "method", + "http_request_id", + "http_session_id", + "log_type", + "create_date", + "create_uid", + "write_date", + "write_uid", + ) + _CH_LINE_COLUMNS: tuple[str, ...] = ( + "id", + "log_id", + "field_id", + "field_name", + "field_description", + "old_value", + "new_value", + "old_value_text", + "new_value_text", + "create_date", + "create_uid", + "write_date", + "write_uid", + ) + + _INVALID_PAYLOAD_MESSAGE = ( + "Invalid payload structure (expected object with 'log' and 'lines')." + ) + + @api.model + def _selection_state(self) -> list[tuple[str, str]]: + """Centralized selection for `state`.""" + return [ + (self.STATE_PENDING, self.env._("Pending")), + (self.STATE_ERROR, self.env._("Error")), + ] + + payload_json = fields.Json(required=True) + state = fields.Selection( + selection=lambda self: self._selection_state(), + default=lambda self: self.STATE_PENDING, + required=True, + index=True, + ) + attempt_count = fields.Integer(default=0, required=True) + error_message = fields.Text() + + @staticmethod + def _to_ch_nullable_string(value: Any) -> str | None: + """ + Convert value into ClickHouse Nullable(String). 
+ + - None/False -> None + - str -> as is + - list/dict/tuple -> JSON string (unicode preserved) + - other -> str(value) + """ + if value is None or value is False: + return None + if isinstance(value, str): + return value + if isinstance(value, (dict | list | tuple)): + return json.dumps(value, ensure_ascii=False, default=str) + return str(value) + + @staticmethod + def _to_ch_datetime_utc(value: Any) -> datetime | None: + """ + Convert incoming value to tz-aware UTC datetime. + + We normalize to UTC to keep consistent semantics for ClickHouse + DateTime64(3, 'UTC'). + """ + if not value: + return None + + if isinstance(value, datetime): + parsed = value + else: + raw = str(value).strip().replace("Z", "+00:00") + try: + parsed = dt_parser.parse(raw) + except (ValueError, TypeError, OverflowError): + # Fallback: Odoo parser usually returns naive datetime. + parsed = fields.Datetime.from_string(value) + + if parsed.tzinfo is None: + return parsed.replace(tzinfo=timezone.utc) + return parsed.astimezone(timezone.utc) + + def _set_error(self, message: str) -> None: + """ + Mark records as error + increment attempt_count + store error_message. + + We update per-record to ensure attempt_count increments correctly. + """ + for rec in self: + rec.write( + { + "state": self.STATE_ERROR, + "attempt_count": rec.attempt_count + 1, + "error_message": message, + } + ) + + @api.model + def _lock_pending_buffers(self, batch_size: int) -> "AuditlogLogBuffer": + """ + Fetch up to `batch_size` pending buffers and lock them (FOR UPDATE SKIP LOCKED). + + This prevents concurrent workers/jobs from selecting the same rows and + inserting duplicates into ClickHouse. 
+ """ + query = SQL( + """ + SELECT id + FROM %s + WHERE state = %s + ORDER BY id + FOR UPDATE SKIP LOCKED + LIMIT %s + """, + SQL.identifier(self._table), + self.STATE_PENDING, + batch_size, + ) + self.env.cr.execute(query) + ids = [row[0] for row in self.env.cr.fetchall()] + return self.browse(ids) + + @api.model + def _cron_flush_to_clickhouse(self, batch_size: int | None = None) -> bool: + """ + Enqueue a queue_job to flush buffered rows into ClickHouse. + + This cron does not perform ClickHouse INSERTs directly. It only schedules + a job, so that queue_job can handle retries and high load. + + :param batch_size: optional override; if not provided, + uses config.queue_batch_size. + :return: True (cron compatibility). + """ + config = self.env["auditlog.clickhouse.config"].sudo().get_active_config() + if not config: + _logger.debug("auditlog_clickhouse: cron flush skipped (no active config)") + return True + + effective_batch = int(batch_size or config.queue_batch_size or 0) or 1000 + + if not self.sudo().search([("state", "=", self.STATE_PENDING)], limit=1): + _logger.debug( + "auditlog_clickhouse: cron flush skipped (no pending buffers)" + ) + return True + + channel_name = ( + config.queue_channel_id.complete_name + if config.queue_channel_id + and getattr(config.queue_channel_id, "complete_name", None) + else "root" + ) + + _logger.info( + "auditlog_clickhouse: enqueue flush job " + "(config=%s channel=%s batch_size=%s)", + config.id, + channel_name, + effective_batch, + ) + + self.sudo().with_delay( + channel=channel_name, + description=f"auditlog_clickhouse: flush buffers (config={config.id})", + )._job_flush_to_clickhouse(config.id, effective_batch) + + return True + + @api.model + def _get_active_config_for_job(self, config_id: int): + config = self.env["auditlog.clickhouse.config"].sudo().browse(config_id) + if not config or not config.exists() or not config.is_active: + _logger.info( + "auditlog_clickhouse: job skipped " + "(config missing or not 
active) (config_id=%s)", + config_id, + ) + return None + return config + + @classmethod + def _payload_is_valid(cls, payload: Any) -> bool: + """Strict-enough validation to avoid endless RetryableJobError loops.""" + if not isinstance(payload, dict): + return False + + log_data = payload.get("log") + lines_data = payload.get("lines") + + if not isinstance(log_data, dict) or not isinstance(lines_data, list): + return False + + # Minimal required log fields (to avoid CH insert failures forever) + required = ( + "id", + "model_id", + "model_model", + "user_id", + "method", + "create_date", + "create_uid", + ) + for key in required: + if not log_data.get(key): + return False + + # Lines must be a list of dicts (if any line is broken -> whole payload invalid) + return all(isinstance(line, dict) for line in lines_data) + + def _collect_rows_from_buffers(self, buffers): + """Return (valid_buffers, invalid_buffers, log_rows, line_rows).""" + log_rows: list[ChRow] = [] + line_rows: list[ChRow] = [] + invalid_buffers = self.browse() + + for rec in buffers: + payload = rec.payload_json + + if not self._payload_is_valid(payload): + invalid_buffers |= rec + continue + + log_data = payload["log"] + lines_data = payload["lines"] + + log_rows.append(self._build_ch_log_row(log_data)) + for line_data in lines_data: + line_rows.append(self._build_ch_line_row(line_data)) + + valid_buffers = buffers - invalid_buffers + return valid_buffers, invalid_buffers, log_rows, line_rows + + def _mark_invalid_buffers(self, invalid_buffers, config) -> None: + if not invalid_buffers: + return + invalid_buffers._set_error(self.env._(self._INVALID_PAYLOAD_MESSAGE)) + _logger.warning( + "auditlog_clickhouse: invalid payloads=%s (marked error) (config=%s)", + len(invalid_buffers), + config.id, + ) + + def _insert_rows_to_clickhouse( + self, client, config, log_rows, line_rows, valid_buffers + ): + try: + if log_rows: + client.execute( + f"INSERT INTO {config.database}.auditlog_log (" + f"{', 
'.join(self._CH_LOG_COLUMNS)}) VALUES", + log_rows, + ) + if line_rows: + client.execute( + f"INSERT INTO {config.database}.auditlog_log_line (" + f"{', '.join(self._CH_LINE_COLUMNS)}) VALUES", + line_rows, + ) + except Exception as exc: + _logger.exception( + "auditlog_clickhouse: INSERT failed (will retry) " + "(config=%s buffers=%s logs=%s lines=%s)", + config.id, + len(valid_buffers), + len(log_rows), + len(line_rows), + ) + raise RetryableJobError( + f"ClickHouse insert failed: {exc}", + seconds=60, + ) from exc + + def _delete_flushed_buffers(self, valid_buffers, config) -> None: + try: + valid_buffers.unlink() + except Exception as exc: + _logger.exception( + "auditlog_clickhouse: failed to delete flushed buffers " + "(config=%s buffers=%s)", + config.id, + len(valid_buffers), + ) + valid_buffers._set_error( + self.env._("Flushed to ClickHouse but failed to delete buffer rows: %s") + % exc + ) + else: + _logger.info( + "auditlog_clickhouse: job flushed batch " + "(config=%s flushed_buffers=%s)", + config.id, + len(valid_buffers), + ) + + def _enqueue_next_flush_job_if_needed(self, config, batch_size: int) -> None: + if not self.sudo().search([("state", "=", self.STATE_PENDING)], limit=1): + return + + channel_name = ( + config.queue_channel_id.complete_name + if config.queue_channel_id + and getattr(config.queue_channel_id, "complete_name", None) + else "root" + ) + _logger.debug( + "auditlog_clickhouse: more pending buffers detected, enqueue next job " + "(config=%s channel=%s batch_size=%s)", + config.id, + channel_name, + batch_size, + ) + self.sudo().with_delay( + channel=channel_name, + description=f"auditlog_clickhouse: flush buffers (config={config.id})", + )._job_flush_to_clickhouse(config.id, int(batch_size)) + + @api.model + def _job_flush_to_clickhouse(self, config_id: int, batch_size: int) -> None: + """ + Queue job: flush one batch of pending buffers into ClickHouse. 
+ + - Locks pending buffers (SKIP LOCKED) + - Validates payload structure + - Builds CH rows + - INSERTs into CH (retryable) + - Deletes flushed buffers + - Marks invalid payloads as error (non-retryable) + - Enqueues next job if more pending exist + """ + config = self._get_active_config_for_job(config_id) + if not config: + return + + pending_buffers = self.sudo()._lock_pending_buffers(int(batch_size)) + if not pending_buffers: + _logger.debug( + "auditlog_clickhouse: job no-op (no pending buffers) (config=%s)", + config.id, + ) + return + + valid_buffers, invalid_buffers, log_rows, line_rows = ( + self._collect_rows_from_buffers(pending_buffers) + ) + + # Nothing valid: just mark invalids and exit successfully. + if not valid_buffers: + self._mark_invalid_buffers(invalid_buffers, config) + return + + client = config._get_client() + self._insert_rows_to_clickhouse( + client=client, + config=config, + log_rows=log_rows, + line_rows=line_rows, + valid_buffers=valid_buffers, + ) + + # Delete flushed buffers; if deletion fails, + # mark them as error to avoid re-inserts. 
+ self._delete_flushed_buffers(valid_buffers, config) + + # Mark invalid ones only after successful CH insert + # (so RetryableJobError doesn't rollback the marking) + self._mark_invalid_buffers(invalid_buffers, config) + + # Continue draining queue + self._enqueue_next_flush_job_if_needed(config, int(batch_size)) + + @classmethod + def _build_ch_log_row(cls, log_data: JsonMapping) -> ChRow: + """Convert payload['log'] dict into CH tuple (order matches _CH_LOG_COLUMNS).""" + return ( + int(log_data.get("id") or 0), + cls._to_ch_nullable_string(log_data.get("name")), + int(log_data.get("model_id") or 0), + cls._to_ch_nullable_string(log_data.get("model_name")), + (log_data.get("model_model") or "unknown"), + int(log_data.get("res_id") or 0) + if log_data.get("res_id") is not None + else None, + cls._to_ch_nullable_string(log_data.get("res_ids")), + int(log_data.get("user_id") or 0), + (log_data.get("method") or "unknown"), + int(log_data.get("http_request_id") or 0) + if log_data.get("http_request_id") is not None + else None, + int(log_data.get("http_session_id") or 0) + if log_data.get("http_session_id") is not None + else None, + cls._to_ch_nullable_string(log_data.get("log_type")), + cls._to_ch_datetime_utc(log_data.get("create_date")), + int(log_data.get("create_uid") or 0), + cls._to_ch_datetime_utc(log_data.get("write_date")), + int(log_data.get("write_uid") or 0) + if log_data.get("write_uid") is not None + else None, + ) + + @classmethod + def _build_ch_line_row(cls, line_data: JsonMapping) -> ChRow: + """ + Convert payload['lines'][] dict into CH + tuple (order matches _CH_LINE_COLUMNS). 
+ """ + return ( + int(line_data.get("id") or 0), + int(line_data.get("log_id") or 0), + int(line_data.get("field_id") or 0), + cls._to_ch_nullable_string(line_data.get("field_name")), + cls._to_ch_nullable_string(line_data.get("field_description")), + cls._to_ch_nullable_string(line_data.get("old_value")), + cls._to_ch_nullable_string(line_data.get("new_value")), + cls._to_ch_nullable_string(line_data.get("old_value_text")), + cls._to_ch_nullable_string(line_data.get("new_value_text")), + cls._to_ch_datetime_utc(line_data.get("create_date")), + int(line_data.get("create_uid") or 0), + cls._to_ch_datetime_utc(line_data.get("write_date")), + int(line_data.get("write_uid") or 0) + if line_data.get("write_uid") is not None + else None, + ) diff --git a/auditlog_clickhouse/models/auditlog_readonly.py b/auditlog_clickhouse/models/auditlog_readonly.py new file mode 100644 index 00000000000..1ea8ff3320e --- /dev/null +++ b/auditlog_clickhouse/models/auditlog_readonly.py @@ -0,0 +1,53 @@ +from odoo import api, models +from odoo.exceptions import UserError + + +def _is_clickhouse_readonly_mode(env) -> bool: + """Return True when ClickHouse mode is active and FDW read is enabled.""" + config = env["auditlog.clickhouse.config"].sudo().get_active_config() + return bool(config and config.fdw_enabled) + + +def _raise_clickhouse_readonly(env) -> None: + """Raise a localized UserError for read-only audit log mode.""" + raise UserError(env._("Audit logs are read-only (stored in ClickHouse).")) + + +class AuditlogLogReadonly(models.Model): + _inherit = "auditlog.log" + + @api.model_create_multi + def create(self, vals_list): + if _is_clickhouse_readonly_mode(self.env): + _raise_clickhouse_readonly(self.env) + return super().create(vals_list) + + def write(self, vals): + if _is_clickhouse_readonly_mode(self.env): + _raise_clickhouse_readonly(self.env) + return super().write(vals) + + def unlink(self): + if _is_clickhouse_readonly_mode(self.env): + _raise_clickhouse_readonly(self.env) 
+        return super().unlink()
+
+
+class AuditlogLogLineReadonly(models.Model):
+    _inherit = "auditlog.log.line"
+
+    @api.model_create_multi
+    def create(self, vals_list):
+        if _is_clickhouse_readonly_mode(self.env):
+            _raise_clickhouse_readonly(self.env)
+        return super().create(vals_list)
+
+    def write(self, vals):
+        if _is_clickhouse_readonly_mode(self.env):
+            _raise_clickhouse_readonly(self.env)
+        return super().write(vals)
+
+    def unlink(self):
+        if _is_clickhouse_readonly_mode(self.env):
+            _raise_clickhouse_readonly(self.env)
+        return super().unlink()
diff --git a/auditlog_clickhouse/models/auditlog_rule.py b/auditlog_clickhouse/models/auditlog_rule.py
new file mode 100644
index 00000000000..d3e11bdd9c9
--- /dev/null
+++ b/auditlog_clickhouse/models/auditlog_rule.py
@@ -0,0 +1,362 @@
+import logging
+import time
+from collections.abc import Mapping, Sequence
+from datetime import date, datetime, timezone
+from decimal import Decimal
+from typing import Any, TypedDict
+
+from odoo import models
+
+from odoo.addons.auditlog.models.rule import EMPTY_DICT, FIELDS_BLACKLIST, DictDiffer
+
+_logger = logging.getLogger(__name__)
+
+
+class _PayloadLog(TypedDict, total=False):
+    id: int
+    name: str | None
+    model_id: int
+    model_name: str | None
+    model_model: str
+    res_id: int | None
+    res_ids: str | None
+    user_id: int
+    method: str
+    http_request_id: int | None
+    http_session_id: int | None
+    log_type: str | None
+    create_date: str
+    create_uid: int
+
+
+class _PayloadLine(TypedDict, total=False):
+    id: int
+    log_id: int
+    field_id: int
+    field_name: str | None
+    field_description: str | None
+    old_value: Any | None
+    new_value: Any | None
+    old_value_text: Any | None
+    new_value_text: Any | None
+    create_date: str
+    create_uid: int
+
+
+class _Payload(TypedDict):
+    log: _PayloadLog
+    lines: list[_PayloadLine]
+
+
+def _json_sanitize(obj: Any) -> Any:
+    """
+    Convert values to JSON-serializable structures.
+ + This is used when writing payloads into a `fields.Json` column: + - datetime/date -> ISO string + - recordsets -> list of ids + - mappings/sequences -> recursively sanitized + - other unknown types -> string representation + """ + if obj is None or isinstance(obj, (str | int | float | bool)): + return obj + + if isinstance(obj, (datetime | date)): + return obj.isoformat() + + if isinstance(obj, Decimal): + return float(obj) + + if isinstance(obj, bytes): + return obj.decode("utf-8", errors="replace") + + if isinstance(obj, models.BaseModel): + return list(obj.ids) + + if isinstance(obj, Mapping): + return {str(k): _json_sanitize(v) for k, v in obj.items()} + + if isinstance(obj, (list | tuple | set)): + return [_json_sanitize(v) for v in obj] + + return str(obj) + + +class AuditlogRule(models.Model): + _inherit = "auditlog.rule" + + def _next_ids(self, seq_name: str, count: int) -> list[int]: + if count <= 0: + return [] + self.env.cr.execute( + "SELECT nextval(%s::regclass) FROM generate_series(1, %s)", + (seq_name, count), + ) + return [row[0] for row in self.env.cr.fetchall()] + + def _get_rule_settings(self, model_id: int) -> tuple[set[str], bool]: + """Return (fields_to_exclude_set, capture_record) for the given model_id. + + We cache the result on the registry pool to avoid + a DB hit for every audited call. + Cache is naturally reset on registry reload + (auditlog invalidates registry on rule changes). 
+ """ + cache: dict[tuple[int, tuple[int, ...]], tuple[set[str], bool]] = getattr( + self.pool, "_auditlog_clickhouse_rule_cache", {} + ) + if not hasattr(self.pool, "_auditlog_clickhouse_rule_cache"): + self.pool._auditlog_clickhouse_rule_cache = cache + + rules = self.filtered(lambda r: r.model_id.id == model_id) + if not rules: + domain = [("model_id", "=", model_id)] + if "state" in self._fields: + domain.append(("state", "=", "subscribed")) + rules = self.sudo().search(domain) + + key = (model_id, tuple(sorted(rules.ids))) + if key in cache: + return cache[key] + + excluded: set[str] = set(FIELDS_BLACKLIST) + capture_record = False + + if len(rules) > 1: + _logger.warning( + "auditlog_clickhouse: multiple rules found for model_id=%s (rules=%s); " + "using union of excluded fields and any(capture_record).", + model_id, + rules.ids, + ) + for rule in rules: + excluded |= set(rule.fields_to_exclude_ids.mapped("name")) + capture_record = capture_record or bool(rule.capture_record) + + cache[key] = (excluded, capture_record) + return cache[key] + + def _get_audit_model_id(self, res_model: str) -> int: + """ + Resolve `ir.model` id for a given model name. + + Prefer auditlog's in-memory model cache (filled by auditlog hooks) to avoid + extra DB lookups. If cache is missing, fall back to `ir.model._get()`. + + Args: + res_model: Technical model name (e.g. "res.partner"). + + Returns: + The `ir.model` record id for the given model name. 
+ """ + model_id = getattr(self.pool, "_auditlog_model_cache", {}).get(res_model) + if model_id: + return int(model_id) + return int(self.env["ir.model"].sudo()._get(res_model).id) + + def _dump_payload_json(self, payload: dict[str, Any]) -> dict[str, Any]: + return _json_sanitize(payload) + + # flake8: noqa: C901 + def create_logs( + self, + uid: int, + res_model: str, + res_ids: Sequence[int], + method: str, + old_values: Mapping[int, Mapping[str, Any]] | None = None, + new_values: Mapping[int, Mapping[str, Any]] | None = None, + additional_log_values: Mapping[str, Any] | None = None, + ) -> None: + config = self.env["auditlog.clickhouse.config"].sudo().get_active_config() + if not config: + return super().create_logs( + uid, + res_model, + res_ids, + method, + old_values=old_values, + new_values=new_values, + additional_log_values=additional_log_values, + ) + + started = time.monotonic() + old_values = old_values or EMPTY_DICT + new_values = new_values or EMPTY_DICT + additional_log_values = dict(additional_log_values or {}) + log_type = additional_log_values.get("log_type") + + model_id = self._get_audit_model_id(res_model) + model_rs = self.env[res_model] + fields_to_exclude_set, capture_record = self._get_rule_settings(model_id) + + now_iso = datetime.now(timezone.utc).isoformat(timespec="milliseconds") + model_rec = self.env["ir.model"].sudo().browse(model_id) + + # IMPORTANT: do it like auditlog does (not from additional_log_values) + http_request_id = ( + self.env["auditlog.http.request"].current_http_request() or None + ) + http_session_id = ( + self.env["auditlog.http.session"].current_http_session() or None + ) + + base_log: dict[str, Any] = { + "model_id": int(model_id), + "model_name": model_rec.name, + "model_model": model_rec.model, + "user_id": int(uid), + "method": method, + "http_request_id": http_request_id, + "http_session_id": http_session_id, + "log_type": log_type, + "create_date": now_iso, + "create_uid": int(uid), + "write_date": None, + 
"write_uid": None, + } + + buffer_model = ( + self.env["auditlog.log.buffer"].sudo().with_context(tracking_disable=True) + ) + + # export_data is special (no lines) + if method == "export_data": + log_id = int(self._next_ids("auditlog_log_id_seq", 1)[0]) + payload: _Payload = { + "log": { + "id": log_id, + "name": res_model, + "res_id": None, + "res_ids": str(list(res_ids)), + **base_log, + }, + "lines": [], + } + buffer_model.create([{"payload_json": self._dump_payload_json(payload)}]) + _logger.debug( + "auditlog_clickhouse: create_logs end export_data (elapsed=%.3fs)", + time.monotonic() - started, + ) + return + + include_lines_on_unlink = method == "unlink" and capture_record + + if method == "create": + line_builder = self._prepare_log_line_vals_on_create + values_src = (new_values,) + elif method == "read": + line_builder = self._prepare_log_line_vals_on_read + values_src = (old_values,) + elif method == "write": + line_builder = self._prepare_log_line_vals_on_write + values_src = (old_values, new_values) + elif include_lines_on_unlink: + line_builder = self._prepare_log_line_vals_on_read + values_src = (old_values,) + else: + line_builder = None + values_src = () + + log_ids = self._next_ids("auditlog_log_id_seq", len(res_ids)) + payloads: list[tuple[_PayloadLog, list[_PayloadLine]]] = [] + total_lines = 0 + + for idx, res_id in enumerate(res_ids): + log_id = int(log_ids[idx]) + record = model_rs.browse(res_id) + + log: _PayloadLog = { + "id": log_id, + "name": record.display_name, + "res_id": int(res_id), + "res_ids": None, + **base_log, + } + + diff = DictDiffer( + dict(new_values.get(res_id, EMPTY_DICT)), + dict(old_values.get(res_id, EMPTY_DICT)), + ) + + if method == "create": + fields_list = diff.added() + elif method == "read" or include_lines_on_unlink: + fields_list = old_values.get(res_id, EMPTY_DICT).keys() + elif method == "write": + fields_list = diff.changed() + else: + fields_list = () + + lines: list[_PayloadLine] = [] + if line_builder: + 
one_source = method in ("create", "read") or include_lines_on_unlink + log_ctx = {"res_id": res_id, "model_id": model_id, "log_type": log_type} + + for field_name in fields_list: + if field_name in fields_to_exclude_set: + continue + field = self._get_field(model_id, field_name) + if not field: + continue + + if one_source: + vals = line_builder(log_ctx, field, values_src[0]) + else: + vals = line_builder( + log_ctx, field, values_src[0], values_src[1] + ) + + lines.append( + { + "id": 0, + "log_id": log_id, + "field_id": int(field["id"]), + "field_name": field.get("name"), + "field_description": field.get("field_description"), + "old_value": vals.get("old_value"), + "new_value": vals.get("new_value"), + "old_value_text": vals.get("old_value_text"), + "new_value_text": vals.get("new_value_text"), + "create_date": now_iso, + "create_uid": int(uid), + "write_date": None, + "write_uid": None, + } + ) + + payloads.append((log, lines)) + total_lines += len(lines) + + # Assign line ids in one batch (Int64) + line_ids: list[int] = self._next_ids("auditlog_log_line_id_seq", total_lines) + pos = 0 + + buffer_vals_list: list[dict[str, Any]] = [] + for log, lines in payloads: + for line in lines: + line["id"] = int(line_ids[pos]) + pos += 1 + + if method == "unlink" or lines: + buffer_vals_list.append( + { + "payload_json": self._dump_payload_json( + {"log": log, "lines": lines} + ) + } + ) + + if buffer_vals_list: + buffer_model.create(buffer_vals_list) + + _logger.debug( + "auditlog_clickhouse: create_logs end (model=%s method=%s res_ids=%s " + "payloads=%s lines=%s elapsed=%.3fs)", + res_model, + method, + len(res_ids), + len(buffer_vals_list), + total_lines, + time.monotonic() - started, + ) diff --git a/auditlog_clickhouse/models/clickhouse_client.py b/auditlog_clickhouse/models/clickhouse_client.py new file mode 100644 index 00000000000..a481807c17e --- /dev/null +++ b/auditlog_clickhouse/models/clickhouse_client.py @@ -0,0 +1,58 @@ +from collections.abc import Mapping 
+from typing import Any + +from odoo import _ +from odoo.exceptions import UserError + +try: + from clickhouse_driver import Client as ClickHouseClient +except ImportError: + ClickHouseClient = None + + +def _require_driver() -> None: + """Ensure clickhouse-driver is importable in the current Odoo environment.""" + if ClickHouseClient is None: + raise UserError( + _( + "Python package 'clickhouse-driver' is not available. " + "Install it in the Odoo environment to use ClickHouse storage." + ) + ) + + +def get_clickhouse_client( + *, + host: str, + port: int, + database: str, + user: str, + password: str | None = None, + settings: Mapping[str, Any] | None = None, +) -> "ClickHouseClient": + """Create and return a ClickHouse client (clickhouse-driver). + + Args: + host: ClickHouse host or IP. + port: ClickHouse TCP port (native protocol). + database: Default database to use for queries. + user: ClickHouse username. + password: ClickHouse password (optional). + settings: Optional clickhouse-driver settings dict. + + Returns: + clickhouse_driver.Client instance configured for native TCP protocol. + + Raises: + UserError: If the clickhouse-driver package is not installed. + """ + _require_driver() + # `settings` is passed as-is to clickhouse-driver, keep it optional and immutable. 
+ return ClickHouseClient( + host=host, + port=port, + database=database, + user=user, + password=password or "", + settings=dict(settings or {}), + ) diff --git a/auditlog_clickhouse/pyproject.toml b/auditlog_clickhouse/pyproject.toml new file mode 100644 index 00000000000..4231d0cccb3 --- /dev/null +++ b/auditlog_clickhouse/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["whool"] +build-backend = "whool.buildapi" diff --git a/auditlog_clickhouse/readme/CONFIGURE.md b/auditlog_clickhouse/readme/CONFIGURE.md new file mode 100644 index 00000000000..520579c4464 --- /dev/null +++ b/auditlog_clickhouse/readme/CONFIGURE.md @@ -0,0 +1,30 @@ +This module requires: + +- A reachable ClickHouse server. +- Python dependency `clickhouse-driver` available in the Odoo environment. +- A ClickHouse database created in advance (the module does **not** create databases/users/grants). +- A ClickHouse user with at least: + - `INSERT` and `CREATE TABLE` privileges on the target database. +- The `pg_clickhouse` extension installed on the PostgreSQL server. + +Steps: + +- Make sure `clickhouse-driver` is available in your system. +- Install the module. +- Configure the connection parameters in Odoo: + - **Settings > Technical > Auditlog > Clickhouse configuration** + - Fill in the following parameters: + +| Field | +|:-----| +| Hostname or IP | +| TCP port | +| ClickHouse database name | +| ClickHouse user | +| ClickHouse Password | +| queue_job_batch_size (default = 1000) | +| channel_id (default root) | + +- Click **Test connection**. +- Optionally, click **Create Auditlog Tables** to create the tables in the target database. +- Click **Setup FDW read** to configure the Foreign Data Wrapper so that standard Odoo audit log views read data directly from ClickHouse. 
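Before relying on the **Test connection** button, the prerequisites can be checked with a short standalone probe. This is only a sketch: the host, port, and credentials below are placeholders, and `connect_timeout` is added here merely to fail fast — it is not something this module configures for you.

```python
# Hypothetical connectivity probe mirroring the module's "Test connection"
# button. All connection parameters below are illustrative placeholders.
try:
    from clickhouse_driver import Client
except ImportError:
    Client = None


def check_clickhouse(host="localhost", port=9000, database="default",
                     user="default", password=""):
    """Return True if ClickHouse answers SELECT 1, False otherwise."""
    if Client is None:
        return False  # clickhouse-driver is not installed
    try:
        client = Client(host=host, port=port, database=database,
                        user=user, password=password, connect_timeout=1)
        return client.execute("SELECT 1") == [(1,)]
    except Exception:
        return False
```

Run this once in the Odoo environment; a `False` result means either the driver is missing or the server is unreachable with the given credentials.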
diff --git a/auditlog_clickhouse/readme/CONTEXT.md b/auditlog_clickhouse/readme/CONTEXT.md new file mode 100644 index 00000000000..af8999013bb --- /dev/null +++ b/auditlog_clickhouse/readme/CONTEXT.md @@ -0,0 +1,5 @@ +The auditlog module stores audit data in PostgreSQL. In production systems with extensive audit rules, these tables grow without limits, causing three issues: + +- Database bloat; +- Immutability gap: Members of group_auditlog_manager (implied by base.group_system) have full CRUD access to audit tables, allowing audit records to be altered or deleted via UI, ORM, or SQL; +- Performance overhead: Audit logging runs synchronously in the same transaction and performs multiple ORM create() calls, adding latency to audited operations. diff --git a/auditlog_clickhouse/readme/DESCRIPTION.md b/auditlog_clickhouse/readme/DESCRIPTION.md new file mode 100644 index 00000000000..39ff8261df2 --- /dev/null +++ b/auditlog_clickhouse/readme/DESCRIPTION.md @@ -0,0 +1,6 @@ +This module implements buffered asynchronous transfers of audit logs from PostgreSQL to ClickHouse. +Storing audit data in a columnar database that is write-only prevents database bloat, makes audit records effectively +immutable, and allows for scaling to very large volumes of logs without slowing down normal transactions. +Audit logs are written asynchronously to reduce the load on business operations. +Audit logs stored in ClickHouse are displayed in standard Odoo audit log views (logs, log lines, +forms with detailed log information) without any changes to existing view definitions. diff --git a/auditlog_clickhouse/readme/USAGE.md b/auditlog_clickhouse/readme/USAGE.md new file mode 100644 index 00000000000..459df10b451 --- /dev/null +++ b/auditlog_clickhouse/readme/USAGE.md @@ -0,0 +1,12 @@ +Once auditlog_clickhouse is installed and configured: + +- Users perform tracked operations (create, write, unlink, read, export) on models with active auditlog.rule subscriptions. 
+ This behavior is unchanged from the base auditlog module. +- Log data is serialized and stored in the local auditlog.log.buffer table instantly. The standard auditlog tables are not populated. +- Every 5 minutes (default), the Cron job runs, pushes data to ClickHouse, and cleans the local buffer. +- Data is permanently stored in ClickHouse and cannot be modified or deleted via Odoo. + +All standard Odoo audit log views work as expected - logs, log lines, and forms with detailed log data display data from ClickHouse. +Search, filtering, and grouping (by user, model, date, session, query) work through FDW with the query being forwarded to ClickHouse. +The “View logs” quick access button in audited model forms works as expected. +Audit logs are read-only. Attempting to modify or delete a log entry from the user interface raises an error. diff --git a/auditlog_clickhouse/security/ir.model.access.csv b/auditlog_clickhouse/security/ir.model.access.csv new file mode 100644 index 00000000000..7fe73ca3790 --- /dev/null +++ b/auditlog_clickhouse/security/ir.model.access.csv @@ -0,0 +1,3 @@ +id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink +access_auditlog_clickhouse_config_manager,auditlog.clickhouse.config manager,model_auditlog_clickhouse_config,auditlog.group_auditlog_manager,1,1,1,1 +access_auditlog_log_buffer_no_one,auditlog.log.buffer (no one),model_auditlog_log_buffer,base.group_no_one,1,1,1,1 diff --git a/auditlog_clickhouse/static/description/index.html b/auditlog_clickhouse/static/description/index.html new file mode 100644 index 00000000000..09c5da766a7 --- /dev/null +++ b/auditlog_clickhouse/static/description/index.html @@ -0,0 +1,522 @@ + + + + + +Audit Log ClickHouse store and read + + + +
+

Audit Log ClickHouse store and read

+ + +

Beta License: AGPL-3 OCA/server-tools Translate me on Weblate Try me on Runboat

+

This module implements buffered asynchronous transfers of audit logs
from PostgreSQL to ClickHouse. Storing audit data in a write-only
columnar database prevents database bloat, makes audit records
effectively immutable, and scales to very large log volumes without
slowing down normal transactions. Audit logs are written asynchronously
to reduce the load on business operations. Audit logs stored in
ClickHouse are displayed in the standard Odoo audit log views (logs, log
lines, forms with detailed log information) without any changes to
existing view definitions.

+

Table of contents

+ +
+

Use Cases / Context

+

The auditlog module stores audit data in PostgreSQL. In production
systems with extensive audit rules, these tables grow without limit,
causing three issues:

+
  • Database bloat;
  • Immutability gap: members of group_auditlog_manager (implied by
base.group_system) have full CRUD access to audit tables, allowing
audit records to be altered or deleted via UI, ORM, or SQL;
  • Performance overhead: audit logging runs synchronously in the same
transaction and performs multiple ORM create() calls, adding latency
to audited operations.
+
+

Configuration

+

This module requires:

+
  • A reachable ClickHouse server.
  • Python dependency clickhouse-driver available in the Odoo
environment.
  • A ClickHouse database created in advance (the module does not
create databases/users/grants).
  • A ClickHouse user with at least:
      • INSERT and CREATE TABLE privileges on the target database.
  • The pg_clickhouse extension installed on the PostgreSQL server.

Steps:

+
  • Make sure clickhouse-driver is available in your system.
  • Install the module.
  • Configure the connection parameters in Odoo:
      • Settings > Technical > Auditlog > Clickhouse configuration
      • Fill in the following parameters:
Field
Hostname or IP
TCP port
ClickHouse database name
ClickHouse user
ClickHouse Password
queue_job_batch_size (default = 1000)
channel_id (default root)
+
  • Click Test connection.
  • Optionally, click Create Auditlog Tables to create the tables in
the target database.
  • Click Setup FDW read to configure the Foreign Data Wrapper so that
standard Odoo audit log views read data directly from ClickHouse.
+
+

Usage

+

Once auditlog_clickhouse is installed and configured:

+
  • Users perform tracked operations (create, write, unlink, read, export)
on models with active auditlog.rule subscriptions. This behavior is
unchanged from the base auditlog module.
  • Log data is serialized and stored in the local auditlog.log.buffer
table instantly. The standard auditlog tables are not populated.
  • Every 5 minutes (default), the cron job runs, pushes data to
ClickHouse, and cleans the local buffer.
  • Data is permanently stored in ClickHouse and cannot be modified or
deleted via Odoo.

All standard Odoo audit log views work as expected: logs, log lines,
and forms with detailed log data display data from ClickHouse. Search,
filtering, and grouping (by user, model, date, session, query) work
through the FDW, with queries forwarded to ClickHouse. The “View
logs” quick-access button in audited model forms works as expected.
Audit logs are read-only: attempting to modify or delete a log entry
from the user interface raises an error.

+
+
+

Bug Tracker

+

Bugs are tracked on GitHub Issues.
In case of trouble, please check there if your issue has already been reported.
If you spotted it first, help us to smash it by providing detailed and
welcome feedback.

+

Do not contact contributors directly about support or help with technical issues.

+
+
+

Credits

+
+

Authors

+
  • Cetmix
+
+

Maintainers

+

This module is maintained by the OCA.

Odoo Community Association

OCA, or the Odoo Community Association, is a nonprofit organization whose
mission is to support the collaborative development of Odoo features and
promote its widespread use.

+

This module is part of the OCA/server-tools project on GitHub.

+

You are welcome to contribute. To learn how, please visit https://odoo-community.org/page/Contribute.

+
+
+
+ + diff --git a/auditlog_clickhouse/tests/__init__.py b/auditlog_clickhouse/tests/__init__.py new file mode 100644 index 00000000000..ca453c38480 --- /dev/null +++ b/auditlog_clickhouse/tests/__init__.py @@ -0,0 +1,2 @@ +from . import test_auditlog_clickhouse +from . import test_clickhouse_config diff --git a/auditlog_clickhouse/tests/common.py b/auditlog_clickhouse/tests/common.py new file mode 100644 index 00000000000..defb67cdd21 --- /dev/null +++ b/auditlog_clickhouse/tests/common.py @@ -0,0 +1,77 @@ +import contextlib +from unittest.mock import patch + +from odoo.addons.auditlog.tests.common import AuditLogRuleCommon + + +class DummyClickHouseClient: + """Tiny fake clickhouse client collecting execute() calls.""" + + def __init__(self, *, raise_on_insert: bool = False): + self.raise_on_insert = raise_on_insert + self.calls = [] # list[(query, params)] + + def execute(self, query, params=None): + self.calls.append((query, params)) + q = (query or "").strip().upper() + if q.startswith("SELECT"): + return [(1,)] + if self.raise_on_insert and "INSERT INTO" in q: + raise Exception("Simulated ClickHouse insert error") + return [] + + +class AuditLogClickhouseCommon(AuditLogRuleCommon): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._cleanup_clickhouse_test_data() + + @classmethod + def tearDownClass(cls): + try: + cls._cleanup_clickhouse_test_data() + finally: + super().tearDownClass() + + @classmethod + def _cleanup_clickhouse_test_data(cls): + """Ensure clean state for configs and buffer across suites.""" + cls.env["auditlog.clickhouse.config"].sudo().search([]).write( + {"is_active": False} + ) + cls.env["auditlog.log.buffer"].sudo().search([]).unlink() + + @classmethod + def create_config(cls, **vals): + """Create ClickHouse config with minimal defaults for tests.""" + defaults = { + "host": "localhost", + "port": 9000, + "database": "db", + "user": "user", + "password": "pass", + "is_active": False, + } + defaults.update(vals) + return ( + 
cls.env["auditlog.clickhouse.config"] + .with_context(tracking_disable=True) + .create(defaults) + ) + + @contextlib.contextmanager + def _patched_clickhouse_client(self, *, raise_on_insert: bool = False): + """Patch ClickHouse client getter so tests don't require real ClickHouse.""" + dummy = DummyClickHouseClient(raise_on_insert=raise_on_insert) + target = ( + "odoo.addons.auditlog_clickhouse.models." + "auditlog_clickhouse_config.get_clickhouse_client" + ) + with patch(target, autospec=True, return_value=dummy): + yield dummy + + def _parse_payloads(self): + """Return list of decoded payload dicts from buffer (oldest first).""" + buf = self.env["auditlog.log.buffer"].sudo().search([], order="id asc") + return [rec.payload_json for rec in buf] diff --git a/auditlog_clickhouse/tests/test_auditlog_clickhouse.py b/auditlog_clickhouse/tests/test_auditlog_clickhouse.py new file mode 100644 index 00000000000..b3b568d43cf --- /dev/null +++ b/auditlog_clickhouse/tests/test_auditlog_clickhouse.py @@ -0,0 +1,252 @@ +from odoo.tests import tagged +from odoo.tools import mute_logger + +from .common import AuditLogClickhouseCommon + + +@tagged("-at_install", "post_install") +class TestAuditlogClickhouseBuffer(AuditLogClickhouseCommon): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.groups_model_id = cls.env.ref("base.model_res_groups").id + + # Rule for groups: full logging + cls.groups_rule = cls.create_rule( + { + "name": "testrule groups clickhouse", + "model_id": cls.groups_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "log_export_data": True, + "log_type": "full", + "capture_record": False, + } + ) + + # Active config to enable buffering + cls.config = cls.create_config(is_active=True) + + def setUp(self): + super().setUp() + self.groups_rule.subscribe() + + def test_01_create_writes_to_buffer_not_auditlog_tables(self): + buf = self.env["auditlog.log.buffer"].sudo() + log_model = 
self.env["auditlog.log"] + + start_buf = buf.search_count([]) + start_logs = log_model.search_count([("model_id", "=", self.groups_model_id)]) + + group = ( + self.env["res.groups"] + .with_context(tracking_disable=True) + .create({"name": "ch_test_group_1"}) + ) + + self.assertEqual( + log_model.search_count([("model_id", "=", self.groups_model_id)]) + - start_logs, + 0, + "auditlog.log must NOT be written by auditlog_clickhouse", + ) + self.assertEqual(buf.search_count([]) - start_buf, 1) + + payload = buf.search([], order="id desc", limit=1).payload_json + self.assertEqual(payload["log"]["method"], "create") + self.assertEqual(payload["log"]["model_id"], self.groups_model_id) + self.assertEqual(payload["log"]["res_id"], group.id) + + def test_02_write_creates_lines(self): + buf = self.env["auditlog.log.buffer"].sudo() + start_buf = buf.search_count([]) + + group = ( + self.env["res.groups"] + .with_context(tracking_disable=True) + .create({"name": "CH Group"}) + ) + group.with_context(tracking_disable=True).write({"name": "CH Group v2"}) + + self.assertGreater(buf.search_count([]), start_buf) + + payload = buf.search([], order="id desc", limit=1).payload_json + self.assertEqual(payload["log"]["method"], "write") + self.assertEqual(payload["log"]["model_model"], "res.groups") + + field_names = {line.get("field_name") for line in payload["lines"]} + self.assertIn("name", field_names) + + def test_03_export_data_creates_single_payload_no_lines(self): + buf = self.env["auditlog.log.buffer"].sudo() + start_buf = buf.search_count([]) + + self.env["res.groups"].search([]).export_data(["name"]) + + self.assertEqual(buf.search_count([]) - start_buf, 1) + payload = buf.search([], order="id desc", limit=1).payload_json + self.assertEqual(payload["log"]["method"], "export_data") + self.assertEqual(payload["lines"], []) + + def test_04_unlink_is_logged(self): + buf = self.env["auditlog.log.buffer"].sudo() + start_buf = buf.search_count([]) + + g = ( + self.env["res.groups"] 
+ .with_context(tracking_disable=True) + .create({"name": "ch_test_group_unlink"}) + ) + g.unlink() + + self.assertGreater(buf.search_count([]), start_buf) + payload = buf.search([], order="id desc", limit=1).payload_json + self.assertEqual(payload["log"]["method"], "unlink") + self.assertIsInstance(payload["lines"], list) + + +@tagged("-at_install", "post_install") +class TestAuditlogClickhouseQueueJobs(AuditLogClickhouseCommon): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.partner_model_id = cls.env.ref("base.model_res_partner").id + cls.rule = cls.create_rule( + { + "name": "testrule partner clickhouse queue", + "model_id": cls.partner_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "log_type": "full", + } + ) + cls.config = cls.create_config(is_active=True) + + def setUp(self): + super().setUp() + self.rule.subscribe() + + def test_01_cron_enqueues_job_and_does_not_flush_inline(self): + """ + Cron must only enqueue queue.job (no direct ClickHouse INSERTs here). 
+ """ + buf = self.env["auditlog.log.buffer"].sudo() + job_model = self.env["queue.job"].sudo() + + partner = ( + self.env["res.partner"] + .with_context(tracking_disable=True) + .create({"name": "Cron Enqueue Test"}) + ) + partner.with_context(tracking_disable=True).write( + {"name": "Cron Enqueue Test v2"} + ) + + self.assertGreater(buf.search_count([]), 0) + + start_jobs = job_model.search_count([]) + res = buf._cron_flush_to_clickhouse() # uses config.queue_batch_size + + self.assertTrue(res) + self.assertEqual( + job_model.search_count([]) - start_jobs, + 1, + "Cron must enqueue exactly one job", + ) + + job = job_model.search([], order="id desc", limit=1) + self.assertEqual(job.model_name, "auditlog.log.buffer") + self.assertEqual(job.method_name, "_job_flush_to_clickhouse") + self.assertEqual(job.args[0], self.config.id) + self.assertEqual(job.args[1], self.config.queue_batch_size) + + expected_channel = ( + self.config.queue_channel_id.complete_name + if self.config.queue_channel_id + else "root" + ) + self.assertEqual(job.channel, expected_channel) + + def test_02_cron_skips_when_no_pending_buffers(self): + buf = self.env["auditlog.log.buffer"].sudo() + job_model = self.env["queue.job"].sudo() + + # Ensure no pending buffers + buf.search([]).unlink() + + start_jobs = job_model.search_count([]) + res = buf._cron_flush_to_clickhouse() + + self.assertTrue(res) + self.assertEqual( + job_model.search_count([]) - start_jobs, 0, "No pending buffers -> no job" + ) + + def test_03_cron_skips_without_active_config(self): + self.env["auditlog.clickhouse.config"].search([]).write({"is_active": False}) + + buf = self.env["auditlog.log.buffer"].sudo() + job_model = self.env["queue.job"].sudo() + + start_jobs = job_model.search_count([]) + rec = buf.create( + {"payload_json": {"log": {}, "lines": []}, "state": buf.STATE_PENDING} + ) + + res = buf._cron_flush_to_clickhouse() + + self.assertTrue(res) + self.assertEqual( + job_model.search_count([]) - start_jobs, 0, "No 
active config -> no job" + ) + + rec.invalidate_recordset() + self.assertEqual(rec.state, buf.STATE_PENDING) + self.assertFalse(rec.error_message) + + def test_04_job_flush_success_deletes_buffers_and_calls_insert(self): + buf = self.env["auditlog.log.buffer"].sudo() + + partner = ( + self.env["res.partner"] + .with_context(tracking_disable=True) + .create({"name": "Job Flush OK"}) + ) + partner.with_context(tracking_disable=True).write({"name": "Job Flush OK v2"}) + + self.assertGreater(buf.search_count([]), 0) + + with self._patched_clickhouse_client() as dummy: + buf._job_flush_to_clickhouse(self.config.id, self.config.queue_batch_size) + + self.assertEqual( + buf.search_count([]), + 0, + "Buffers must be removed after successful job flush", + ) + + insert_calls = [ + q for (q, _params) in dummy.calls if "INSERT INTO" in (q or "").upper() + ] + self.assertTrue(insert_calls, "Job must insert into ClickHouse") + + def test_05_job_invalid_payload_marks_error_and_keeps_row(self): + buf = self.env["auditlog.log.buffer"].sudo() + + # Invalid structure for payload_json (Json field accepts string, + # but our code expects mapping with log/lines) + rec = buf.create( + {"payload_json": "NOT A JSON OBJECT", "state": buf.STATE_PENDING} + ) + + with mute_logger("odoo.addons.auditlog_clickhouse.models.auditlog_log_buffer"): + buf._job_flush_to_clickhouse(self.config.id, batch_size=10) + + rec.invalidate_recordset() + self.assertEqual(rec.state, buf.STATE_ERROR) + self.assertTrue(rec.error_message) + self.assertGreaterEqual(rec.attempt_count, 1) diff --git a/auditlog_clickhouse/tests/test_clickhouse_config.py b/auditlog_clickhouse/tests/test_clickhouse_config.py new file mode 100644 index 00000000000..cf5bee29835 --- /dev/null +++ b/auditlog_clickhouse/tests/test_clickhouse_config.py @@ -0,0 +1,67 @@ +from odoo.tests import tagged +from odoo.tools import mute_logger + +from .common import AuditLogClickhouseCommon + + +@tagged("-at_install", "post_install") +class 
TestAuditlogClickhouseConfig(AuditLogClickhouseCommon): + def test_01_single_active_on_create(self): + cfg1 = self.create_config(is_active=True, host="h1") + cfg2 = self.create_config(is_active=True, host="h2") + + cfg1.invalidate_recordset() + cfg2.invalidate_recordset() + + active = self.env["auditlog.clickhouse.config"].search( + [("is_active", "=", True)] + ) + self.assertEqual(len(active), 1) + self.assertTrue(cfg2.is_active) + self.assertFalse(cfg1.is_active) + + def test_02_single_active_on_write(self): + cfg1 = self.create_config(is_active=False, host="h1") + cfg2 = self.create_config(is_active=True, host="h2") + + cfg1.write({"is_active": True}) + cfg1.invalidate_recordset() + cfg2.invalidate_recordset() + + active = self.env["auditlog.clickhouse.config"].search( + [("is_active", "=", True)] + ) + self.assertEqual(len(active), 1) + self.assertTrue(cfg1.is_active) + self.assertFalse(cfg2.is_active) + + def test_03_test_connection_uses_client(self): + cfg = self.create_config(is_active=True) + + # Without patch, get_clickhouse_client may + # raise if clickhouse-driver isn't installed + with self._patched_clickhouse_client() as dummy: + action = cfg.action_test_connection() + + self.assertTrue(action) + self.assertTrue(any("SELECT 1" in (q or "") for (q, params) in dummy.calls)) + + def test_04_cron_skips_without_active_config(self): + self.env["auditlog.clickhouse.config"].search([]).write({"is_active": False}) + + buf = self.env["auditlog.log.buffer"].sudo() + rec = buf.create({"payload_json": "NOT A JSON", "state": buf.STATE_PENDING}) + + with self._patched_clickhouse_client() as dummy: + with mute_logger( + "odoo.addons.auditlog_clickhouse.models.auditlog_log_buffer" + ): + res = buf._cron_flush_to_clickhouse(batch_size=10) + + self.assertTrue(res) + + rec.invalidate_recordset() + self.assertEqual(rec.state, buf.STATE_PENDING) + self.assertFalse(rec.error_message) + + self.assertFalse(dummy.calls) diff --git 
a/auditlog_clickhouse/views/auditlog_clickhouse_config_views.xml b/auditlog_clickhouse/views/auditlog_clickhouse_config_views.xml new file mode 100644 index 00000000000..c724ccd7762 --- /dev/null +++ b/auditlog_clickhouse/views/auditlog_clickhouse_config_views.xml @@ -0,0 +1,121 @@ + + + Configure flush action + ir.cron + + ir.actions.act_window + form + new + + + + auditlog.clickhouse.config.form + auditlog.clickhouse.config + +
+
+
+ + + + + + + + + + + + + + + + + + + +
+
+
+ + +
+ Logs are buffered in PostgreSQL and periodically flushed to + ClickHouse by cron. +
+
+
+
+
+
+ + + auditlog.clickhouse.config.list + auditlog.clickhouse.config + + + + + + + + + + + + + + ClickHouse Configuration + auditlog.clickhouse.config + list,form + + + +
diff --git a/requirements.txt b/requirements.txt index 5d1fefa6f23..d3e18b80a6d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ # generated from manifests external_dependencies +clickhouse-driver cryptography dataclasses odoo_test_helper
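For orientation, each row in the buffer holds one JSON document per audit event. The sketch below shows an assumed shape of such a payload — a `log` dict plus a `lines` list, with key names taken from the test assertions above; the concrete values are made up and the exact schema remains defined by the module itself.

```python
import json

# Illustrative shape of one buffered audit payload, inferred from the tests
# (payload["log"]["method"], payload["lines"][n]["field_name"], ...).
# Keys and values here are assumptions, not the module's authoritative schema.
payload = {
    "log": {
        "method": "write",
        "model_id": 42,              # hypothetical ir.model id
        "model_model": "res.groups",
        "res_id": 7,
    },
    "lines": [
        {
            "field_name": "name",
            "old_value_text": "CH Group",
            "new_value_text": "CH Group v2",
        }
    ],
}

# The buffer stores this as JSON text; a flush job decodes it back
# before batching the rows into ClickHouse INSERTs.
encoded = json.dumps(payload)
decoded = json.loads(encoded)
assert decoded["log"]["method"] == "write"
assert [line["field_name"] for line in decoded["lines"]] == ["name"]
```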