From 1f73a32fb50a28aa7c694c0bf6a412866f505772 Mon Sep 17 00:00:00 2001
From: Noel Gomez
Date: Wed, 24 Dec 2025 12:56:30 -0800
Subject: [PATCH 1/5] Add sample dag for metadata db cleanup

---
 .../dags/other_examples/airflow_db_cleanup.py | 202 ++++++++++++++++++
 1 file changed, 202 insertions(+)
 create mode 100644 orchestrate/dags/other_examples/airflow_db_cleanup.py

diff --git a/orchestrate/dags/other_examples/airflow_db_cleanup.py b/orchestrate/dags/other_examples/airflow_db_cleanup.py
new file mode 100644
index 00000000..9c7a1aed
--- /dev/null
+++ b/orchestrate/dags/other_examples/airflow_db_cleanup.py
@@ -0,0 +1,202 @@
+"""
+## Airflow Database Cleanup DAG
+
+This DAG automates the cleanup of Airflow's metadata database using the `airflow db clean` command.
+
+### Why is this needed?
+Airflow retains all historical task data indefinitely. Over time, tables like `dag_run`, `job`,
+`log`, `task_instance`, and `xcom` can grow significantly, causing:
+- Delayed task scheduling and DAG parsing
+- Sluggish UI performance
+- Increased storage costs
+
+### How it works
+1. **db_cleanup**: Executes `airflow db clean` to remove old records
+2. **drop_archive_tables**: Removes temporary archive tables created during cleanup
+
+### Parameters
+- **clean_before_timestamp**: Delete records older than this date (default: 180 days ago)
+- **tables**: Which metadata tables to clean (default: all eligible tables)
+- **dry_run**: Preview SQL commands without executing (default: True for safety)
+
+### Usage
+1. Trigger the DAG manually from the Airflow UI
+2. Configure parameters as needed
+3. For first run, use `dry_run=True` to preview changes
+4. Review logs, then run with `dry_run=False` to execute cleanup
+"""
+
+from datetime import datetime, timedelta
+
+from airflow.decorators import dag, task
+from airflow.models.param import Param
+
+from orchestrate.utils import datacoves_utils
+
+
+# Tables eligible for cleanup via `airflow db clean`
+CLEANABLE_TABLES = [
+    "callback_request",
+    "dag_run",
+    "dataset_event",
+    "import_error",
+    "job",
+    "log",
+    "rendered_task_instance_fields",
+    "sla_miss",
+    "task_fail",
+    "task_instance",
+    "task_reschedule",
+    "trigger",
+    "xcom",
+]
+
+
+def get_default_clean_before_date() -> str:
+    """Returns date 180 days ago as default cleanup cutoff."""
+    return (datetime.now() - timedelta(days=180)).strftime("%Y-%m-%d")
+
+
+@dag(
+    doc_md=__doc__,
+    catchup=False,
+    default_args=datacoves_utils.set_default_args(
+        owner="Noel Gomez",
+        owner_email="noel@example.com"
+    ),
+    # Manual trigger only - no schedule
+    schedule=None,
+    description="Clean up Airflow metadata database to improve performance",
+    tags=["maintenance", "cleanup", "database"],
+)
+def airflow_db_cleanup():
+
+    @task()
+    def db_cleanup(
+        clean_before_timestamp: str,
+        tables: list,
+        dry_run: bool,
+    ) -> dict:
+        """
+        Execute the airflow db clean command.
+
+        This removes old records from metadata tables while preserving
+        the most recent data needed for operations.
+        """
+        import subprocess
+
+        # Build the command
+        cmd = [
+            "airflow",
+            "db",
+            "clean",
+            "--clean-before-timestamp",
+            clean_before_timestamp,
+            "--skip-archive",
+            "--yes",
+        ]
+
+        # Add tables to clean
+        for table in tables:
+            cmd.extend(["--tables", table])
+
+        # Add dry run flag if enabled
+        if dry_run:
+            cmd.append("--dry-run")
+
+        print(f"Executing command: {' '.join(cmd)}")
+
+        # Execute the command
+        result = subprocess.run(
+            cmd,
+            capture_output=True,
+            text=True,
+            check=False,
+        )
+
+        print(result.stdout)
+        if result.stderr:
+            print(result.stderr)
+
+        if result.returncode != 0:
+            raise Exception(
+                f"airflow db clean failed with return code {result.returncode}"
+            )
+
+        return {
+            "status": "success",
+            "dry_run": dry_run,
+            "clean_before": clean_before_timestamp,
+            "tables_cleaned": tables,
+        }
+
+    @task(trigger_rule="all_done")
+    def drop_archive_tables(cleanup_result: dict) -> str:
+        """
+        Drop any archive tables created during cleanup.
+
+        Archive tables are temporary tables that may be left behind if
+        the cleanup process times out or fails. This task ensures they
+        are removed.
+
+        Uses trigger_rule='all_done' to run even if db_cleanup fails.
+        """
+        import subprocess
+
+        cmd = [
+            "airflow",
+            "db",
+            "drop-archived",
+            "--yes",
+        ]
+
+        print(f"Executing command: {' '.join(cmd)}")
+
+        result = subprocess.run(
+            cmd,
+            capture_output=True,
+            text=True,
+            check=False,
+        )
+
+        print(result.stdout)
+        if result.stderr:
+            print(result.stderr)
+
+        if result.returncode != 0:
+            raise Exception(
+                f"airflow db drop-archived failed with return code {result.returncode}"
+            )
+
+        return "Archive tables dropped successfully"
+
+    # Execute tasks
+    cleanup_result = db_cleanup(
+        clean_before_timestamp="{{ params.clean_before_timestamp }}",
+        tables="{{ params.tables }}",
+        dry_run="{{ params.dry_run }}",
+    )
+
+    drop_archive_tables(cleanup_result)
+
+
+airflow_db_cleanup.params = {
+    "clean_before_timestamp": Param(
+        default=get_default_clean_before_date(),
+        type="string",
+        format="date",
+        description="Delete records older than this date (YYYY-MM-DD)",
+    ),
+    "tables": Param(
+        default=CLEANABLE_TABLES,
+        type="array",
+        description="Metadata tables to clean (leave default for all eligible tables)",
+    ),
+    "dry_run": Param(
+        default=True,
+        type="boolean",
+        description="Preview SQL commands without executing (recommended for first run)",
+    ),
+}
+
+airflow_db_cleanup()

From c7f52d9ece96478b684fece5764fadb3c92ea9fc Mon Sep 17 00:00:00 2001
From: Noel Gomez
Date: Wed, 24 Dec 2025 13:16:02 -0800
Subject: [PATCH 2/5] Update cleanup dag

---
 .../dags/other_examples/airflow_db_cleanup.py | 37 +++++++++----------
 1 file changed, 18 insertions(+), 19 deletions(-)

diff --git a/orchestrate/dags/other_examples/airflow_db_cleanup.py b/orchestrate/dags/other_examples/airflow_db_cleanup.py
index 9c7a1aed..6abe9800 100644
--- a/orchestrate/dags/other_examples/airflow_db_cleanup.py
+++ b/orchestrate/dags/other_examples/airflow_db_cleanup.py
@@ -68,6 +68,24 @@ def get_default_clean_before_date() -> str:
     schedule=None,
     description="Clean up Airflow metadata database to improve performance",
     tags=["maintenance", "cleanup", "database"],
+    params={
+        "clean_before_timestamp": Param(
+            default=get_default_clean_before_date(),
+            type="string",
+            format="date",
+            description="Delete records older than this date (YYYY-MM-DD)",
+        ),
+        "tables": Param(
+            default=CLEANABLE_TABLES,
+            type="array",
+            description="Metadata tables to clean (leave default for all eligible tables)",
+        ),
+        "dry_run": Param(
+            default=True,
+            type="boolean",
+            description="Preview SQL commands without executing (recommended for first run)",
+        ),
+    },
 )
 def airflow_db_cleanup():
 
@@ -180,23 +198,4 @@ def drop_archive_tables(cleanup_result: dict) -> str:
     drop_archive_tables(cleanup_result)
 
 
-airflow_db_cleanup.params = {
-    "clean_before_timestamp": Param(
-        default=get_default_clean_before_date(),
-        type="string",
-        format="date",
-        description="Delete records older than this date (YYYY-MM-DD)",
-    ),
-    "tables": Param(
-        default=CLEANABLE_TABLES,
-        type="array",
-        description="Metadata tables to clean (leave default for all eligible tables)",
-    ),
-    "dry_run": Param(
-        default=True,
-        type="boolean",
-        description="Preview SQL commands without executing (recommended for first run)",
-    ),
-}
-
 airflow_db_cleanup()

From bc6fac00e50d74c2dc0fbc25caf1fbec60307358 Mon Sep 17 00:00:00 2001
From: Noel Gomez
Date: Wed, 24 Dec 2025 13:32:13 -0800
Subject: [PATCH 3/5] Update cleanup dag

---
 orchestrate/dags/other_examples/airflow_db_cleanup.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/orchestrate/dags/other_examples/airflow_db_cleanup.py b/orchestrate/dags/other_examples/airflow_db_cleanup.py
index 6abe9800..ac043817 100644
--- a/orchestrate/dags/other_examples/airflow_db_cleanup.py
+++ b/orchestrate/dags/other_examples/airflow_db_cleanup.py
@@ -68,6 +68,7 @@ def get_default_clean_before_date() -> str:
     schedule=None,
     description="Clean up Airflow metadata database to improve performance",
     tags=["maintenance", "cleanup", "database"],
+    render_template_as_native_obj=True,
     params={
         "clean_before_timestamp": Param(
             default=get_default_clean_before_date(),

From 6d9abe50993415d40e6ac76b2a520cda8890cdb3 Mon Sep 17 00:00:00 2001
From: Noel Gomez
Date: Fri, 26 Dec 2025 14:04:07 -0800
Subject: [PATCH 4/5] update validations and clean up dag

---
 .../dags/other_examples/airflow_db_cleanup.py | 13 ++++++++-----
 orchestrate/test_dags/validate_dags.py        | 17 +++++++++--------
 2 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/orchestrate/dags/other_examples/airflow_db_cleanup.py b/orchestrate/dags/other_examples/airflow_db_cleanup.py
index ac043817..1a8231ba 100644
--- a/orchestrate/dags/other_examples/airflow_db_cleanup.py
+++ b/orchestrate/dags/other_examples/airflow_db_cleanup.py
@@ -2,6 +2,7 @@
 ## Airflow Database Cleanup DAG
 
 This DAG automates the cleanup of Airflow's metadata database using the `airflow db clean` command.
+To learn more check out https://www.astronomer.io/docs/learn/2.x/cleanup-dag-tutorial
 
 ### Why is this needed?
 Airflow retains all historical task data indefinitely. Over time, tables like `dag_run`, `job`,
@@ -33,30 +34,29 @@
 
 from orchestrate.utils import datacoves_utils
 
-
 # Tables eligible for cleanup via `airflow db clean`
 CLEANABLE_TABLES = [
     "callback_request",
+    "dag",
     "dag_run",
     "dataset_event",
     "import_error",
     "job",
     "log",
-    "rendered_task_instance_fields",
+    "session",
     "sla_miss",
     "task_fail",
     "task_instance",
+    "task_instance_history",
     "task_reschedule",
     "trigger",
-    "xcom",
+    "xcom"
 ]
 
-
 def get_default_clean_before_date() -> str:
     """Returns date 180 days ago as default cleanup cutoff."""
     return (datetime.now() - timedelta(days=180)).strftime("%Y-%m-%d")
 
-
 @dag(
     doc_md=__doc__,
     catchup=False,
@@ -87,6 +87,9 @@ def get_default_clean_before_date() -> str:
             description="Preview SQL commands without executing (recommended for first run)",
         ),
     },
+
+    description="Clean up Airflow metadata database to improve performance",
+    tags=["maintenance"],
 )
 def airflow_db_cleanup():
 
diff --git a/orchestrate/test_dags/validate_dags.py b/orchestrate/test_dags/validate_dags.py
index e8398dc2..9ee44ee1 100644
--- a/orchestrate/test_dags/validate_dags.py
+++ b/orchestrate/test_dags/validate_dags.py
@@ -9,14 +9,15 @@
 from airflow.models import DagBag
 
 APPROVED_TAGS = {'extract_and_load',
-                 'transform',
-                 'python_script',
-                 'ms_teams_notification',
-                 'slack_notification',
-                 'marketing_automation',
-                 'update_catalog',
-                 'parameters',
-                 'sample'}
+                 'maintenance',
+                 'marketing_automation',
+                 'ms_teams_notification',
+                 'parameters',
+                 'python_script',
+                 'sample',
+                 'slack_notification',
+                 'transform',
+                 'update_catalog'}
 
 ALLOWED_OPERATORS = [
     "_PythonDecoratedOperator", # this allows the @task decorator

From 6ff7701f5a2b6615986d6d7ad285c48b2c5f2fee Mon Sep 17 00:00:00 2001
From: Noel Gomez
Date: Fri, 26 Dec 2025 14:07:53 -0800
Subject: [PATCH 5/5] remove duplicate description

---
 orchestrate/dags/other_examples/airflow_db_cleanup.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/orchestrate/dags/other_examples/airflow_db_cleanup.py b/orchestrate/dags/other_examples/airflow_db_cleanup.py
index 1a8231ba..091697f4 100644
--- a/orchestrate/dags/other_examples/airflow_db_cleanup.py
+++ b/orchestrate/dags/other_examples/airflow_db_cleanup.py
@@ -66,8 +66,6 @@ def get_default_clean_before_date() -> str:
     ),
     # Manual trigger only - no schedule
     schedule=None,
-    description="Clean up Airflow metadata database to improve performance",
-    tags=["maintenance", "cleanup", "database"],
     render_template_as_native_obj=True,
     params={
         "clean_before_timestamp": Param(
@@ -87,7 +85,6 @@ def get_default_clean_before_date() -> str:
             description="Preview SQL commands without executing (recommended for first run)",
         ),
     },
-
     description="Clean up Airflow metadata database to improve performance",
     tags=["maintenance"],
 )
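
Usage note: the patches above only add and refine the DAG; running it is still a manual step. Besides the UI flow described in the DAG docstring, a dry run can usually also be kicked off from the command line. A minimal sketch, assuming the Airflow CLI is available in the environment and that `dag_run_conf_overrides_params` is enabled so conf keys override the matching params:

    # Preview what would be deleted (dry_run stays true, matching the param default)
    airflow dags trigger airflow_db_cleanup --conf '{"dry_run": true}'

    # After reviewing the task logs, execute the actual cleanup
    airflow dags trigger airflow_db_cleanup --conf '{"dry_run": false}'

Starting with dry_run=true mirrors the safety default baked into the DAG params.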