Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
237 commits
Select commit Hold shift + click to select a range
ea36004
Turning follow_external_dependency into bool or dict property
siklosid Sep 30, 2023
5113752
Handling the new dict format of follow_external_dependency
siklosid Sep 30, 2023
fc2a2e2
Changing test case to see if it handles sensor parameters properly
siklosid Sep 30, 2023
d0780dd
Merge pull request #23 from chocoapp/feature/sensor_parameters
siklosid Oct 9, 2023
db6dee2
upgrade version of tenacity
Oct 23, 2023
bd5adcf
Merge pull request #24 from chocoapp/feature/DATA-1562-upgrade-tenacity
claudiazi Oct 26, 2023
d463438
added new dbt config parser module
kiranvasudev Nov 15, 2023
a91a58f
added class with constructor
kiranvasudev Nov 15, 2023
102620e
added method to parse dbt model inputs
kiranvasudev Nov 15, 2023
c61e656
added functions to generate dagger input and outputs for dbt models
kiranvasudev Nov 15, 2023
29858a9
added fn to generate io for dbt task
kiranvasudev Nov 15, 2023
1a93f05
black format
kiranvasudev Nov 15, 2023
847f622
use new module in fn to generate configs
kiranvasudev Nov 15, 2023
104013a
changed output bucket
kiranvasudev Nov 15, 2023
de82c14
fixed import
kiranvasudev Nov 15, 2023
dcb6854
renamed functions and variables
kiranvasudev Nov 15, 2023
fb6f8f6
added type hints and docstrings
kiranvasudev Nov 15, 2023
06dc86b
changed how models are selected when config is generated
kiranvasudev Nov 15, 2023
19a13bc
black
kiranvasudev Nov 15, 2023
5f83797
fix project dir to load the correct profile
kiranvasudev Nov 16, 2023
209efd6
get external_location from config instead of unrendered config
kiranvasudev Nov 16, 2023
3c4e4c3
renamed variables for better understanding
kiranvasudev Nov 16, 2023
1bd525c
added doctest for fn
kiranvasudev Nov 16, 2023
4b651ad
added files for fixtures and tests
kiranvasudev Nov 16, 2023
7123cbc
added fixture for manifest and profiles
kiranvasudev Nov 16, 2023
77fb2b2
setUp test class
kiranvasudev Nov 16, 2023
47c0638
added test for get_dbt_model_parents
kiranvasudev Nov 16, 2023
4eced6b
added tests for generate_dagger_inputs
kiranvasudev Nov 16, 2023
aa767e8
added test for generate_dagger_outputs
kiranvasudev Nov 16, 2023
8cecbe6
fixed type hint
kiranvasudev Nov 16, 2023
04eafe6
fixed type hints
kiranvasudev Nov 16, 2023
745311d
refactored for simplicity
kiranvasudev Nov 17, 2023
b52e0b5
updated tests
kiranvasudev Nov 17, 2023
8e8c408
Merge remote-tracking branch 'origin/feature/dbt-task-split' into fea…
kiranvasudev Nov 17, 2023
6e7ced2
added dbt profile to default parameters for dbt task
kiranvasudev Nov 20, 2023
962ef70
Merge pull request #25 from chocoapp/feature/dbt-task-split
kiranvasudev Nov 20, 2023
e8cbc6f
added follow external dependency as true as default for athena task
kiranvasudev Nov 20, 2023
e62d35c
add fn to process seed input
kiranvasudev Nov 20, 2023
8de6821
refactor code to incorporate seeds
kiranvasudev Nov 20, 2023
5f26723
updates fixtures
kiranvasudev Nov 20, 2023
9acaff5
added test for dbt seed
kiranvasudev Nov 20, 2023
876010c
modified tests for model containing dbt seed as a dependency
kiranvasudev Nov 20, 2023
9cd0a51
refactor
kiranvasudev Nov 24, 2023
2e6abf6
updated tests and fixtures
kiranvasudev Nov 24, 2023
c652d56
changed name of seed task generating fn
kiranvasudev Nov 24, 2023
ac5f48f
removed unused line
kiranvasudev Nov 24, 2023
46a833d
added docstrings
kiranvasudev Nov 27, 2023
cdd0fea
removed unused test parameter
kiranvasudev Nov 27, 2023
829ac94
changed name of function for better understanding
kiranvasudev Nov 27, 2023
eaf4286
added test to check for de-duplication of inputs
kiranvasudev Nov 27, 2023
ab9a2c8
refactored getting model location function
kiranvasudev Nov 29, 2023
8ef7050
refactor dummy task generation
kiranvasudev Nov 29, 2023
65a07d7
generate inputs for intermediate models and updated tests
kiranvasudev Nov 29, 2023
e5afcfa
uncomment skipping local test
kiranvasudev Nov 29, 2023
e28a078
refactor generate_dagger_tasks fn to make recursive
kiranvasudev Nov 29, 2023
ab48229
updated fixtures and tests
kiranvasudev Nov 29, 2023
3c8c92c
Merge pull request #26 from chocoapp/feature/dbt-task-split
kiranvasudev Nov 30, 2023
d653977
bugfix
kiranvasudev Dec 4, 2023
48a102f
only return dummy when stg model
kiranvasudev Dec 4, 2023
f6c7226
adapted tests
kiranvasudev Dec 4, 2023
32fdd1f
Merge pull request #27 from chocoapp/fix/external-dag-sensors
kiranvasudev Dec 5, 2023
4ceca95
Merge pull request #28 from chocoapp/feature/dbt-task-split
kiranvasudev Dec 5, 2023
146846e
initialize dbt module only when its a dbt pipeline config
kiranvasudev Dec 6, 2023
fd2bd0f
format
kiranvasudev Dec 6, 2023
fb0c2e9
made logic to check for dbt task easier
kiranvasudev Dec 6, 2023
bd8e44c
Merge pull request #29 from chocoapp/feature/dbt-task-split
kiranvasudev Dec 6, 2023
a2b3fdd
fix: follow external dependency for staging models
kiranvasudev Dec 7, 2023
4f1783b
Merge pull request #30 from chocoapp/feature/dbt-task-split
kiranvasudev Dec 8, 2023
3653646
added logic to get parents of int models
kiranvasudev Feb 23, 2024
7dec65a
updated tests and fixtures
kiranvasudev Feb 23, 2024
750f53e
Merge pull request #31 from chocoapp/fix/dbt-add-core-sensor-to-int
kiranvasudev Feb 27, 2024
bdc3e57
Turing split_statements on by default
siklosid Mar 25, 2024
0e9dd3b
Merge pull request #32 from chocoapp/feature/psql_split_statements
siklosid Mar 26, 2024
96ad27b
Moving to python3.9; Upgrading airflow version; removing legacy postg…
siklosid Apr 13, 2024
3e62d78
Making sensor default args more flexible
siklosid Apr 15, 2024
41f1554
Upgrading python in CI
siklosid Apr 15, 2024
d8145ab
Adding graphviz dependency to test
siklosid Apr 15, 2024
7b9bd16
Merge pull request #33 from chocoapp/feature/DATA-1791_defferable_sen…
siklosid Apr 15, 2024
8cc2ec2
Upgrading some package versions to remove warnings
siklosid Apr 16, 2024
647a740
Merge pull request #34 from chocoapp/fix/update_packages
siklosid Apr 16, 2024
f56e6b6
feat: rename profile_name to target_name
Apr 17, 2024
52e7572
Merge pull request #35 from chocoapp/feature/DATA-1493-adjust-dbt-params
claudiazi Apr 17, 2024
1b99357
feat: register new databricks_io
Apr 26, 2024
e00f555
feat: refactor the DBTParseConfig to parse databricks-dbt manifest
Apr 26, 2024
9274fe2
feat: add unit test for databricks config parser
Apr 26, 2024
e951c47
chore: black
Apr 26, 2024
4567b3e
Switching to official batch operator
siklosid Apr 26, 2024
1c2ad82
feat: add another param in dbt task
Apr 26, 2024
a8e6471
Complete renaming of classes
siklosid Apr 26, 2024
3c5fd12
Merge pull request #36 from chocoapp/bugfix/DATA-1804_missing_batch_logs
siklosid Apr 26, 2024
7e29420
feat: refactor
Apr 26, 2024
432c32d
Merge remote-tracking branch 'origin/master' into feature/DATA-1811-d…
Apr 26, 2024
5659c68
feat: adjust the s3 tasks
Apr 26, 2024
81697d4
feat: adjust the _get_s3_task for different dbt adapters
Apr 29, 2024
7c1aa22
fix: define the correct target_config for databricks adapter
Apr 29, 2024
40b28ef
fix: _generate_dagger_output
Apr 29, 2024
9c0311f
extend: command
Apr 29, 2024
14f9d1f
fix: _generate_command
Apr 30, 2024
0d3947f
Merge pull request #37 from chocoapp/feature/DATA-1811-databricks-io
claudiazi Apr 30, 2024
f7cc983
fix: _get_model_data_location in DatabricksDBTConfigParser
Apr 30, 2024
1523648
Merge pull request #38 from chocoapp/feature/DATA-1811-databricks-io
claudiazi Apr 30, 2024
59cf847
fix: generate_task for dbt tasks
May 3, 2024
a25ccc9
Merge pull request #39 from chocoapp/feature/DATA-1811-databricks-io
claudiazi May 3, 2024
c44a7b5
Replacing string replacement with jinja in module processor
siklosid Jun 12, 2024
e687686
Merge pull request #40 from chocoapp/feature/DATA-1699_using_jinja_in…
siklosid Jun 12, 2024
1bc6bf2
feat: adjust the dbt config parser so that view/ephemeral staging lay…
Jun 17, 2024
22ca09b
feat: adjust io for the materalised staging model
Jun 18, 2024
f2f8015
feat: restructure the dbt dagger task input & output
Jun 19, 2024
81e75da
Merge pull request #41 from chocoapp/feat/DATA-1996-improve-dbt-parser
claudiazi Jun 19, 2024
698358d
Module generation with generalised jinja parameters
siklosid Jul 4, 2024
cd12dda
Now it's possible to assign task to task groups
siklosid Jul 4, 2024
0e6a6a6
Adding default value to the parameter
siklosid Jul 4, 2024
f041aa8
Merge pull request #42 from chocoapp/feature/DATA-2002_generalise_jin…
siklosid Jul 4, 2024
8d0e23f
removed custom dbt task generation logic from Module
kiranvasudev Nov 6, 2024
77f6026
add plugins path to dagger config
kiranvasudev Nov 7, 2024
fbb648b
added function to load plugins
kiranvasudev Nov 7, 2024
90bb16b
load plugins and render jinja
kiranvasudev Nov 7, 2024
1d09c57
iterate over multiple folders and their subfolders
kiranvasudev Nov 7, 2024
da0528a
exclude all files starting with __
kiranvasudev Nov 7, 2024
a8aeff7
added plugin to dagger config
kiranvasudev Nov 7, 2024
6addf98
added logging for plugins
kiranvasudev Nov 7, 2024
b8feafb
fix
kiranvasudev Nov 7, 2024
d8c6384
added tests for plugins
kiranvasudev Nov 7, 2024
a3d7f47
refactor code
kiranvasudev Nov 7, 2024
450b544
refactor tests
kiranvasudev Nov 7, 2024
98a6b55
exclude test folders from directory walk
kiranvasudev Nov 8, 2024
ab6ff15
remove unused imports
kiranvasudev Nov 8, 2024
0fa5389
update readme
kiranvasudev Nov 11, 2024
e7f08cd
fix readme
kiranvasudev Nov 12, 2024
7554e62
Merge pull request #43 from chocoapp/feature/plugin-dbt-parser
kiranvasudev Nov 13, 2024
7d871b6
Adding new reverse etl operator to dagger inherited from batch operator
siklosid Jan 2, 2025
bfceb81
Registering the new operator with dagger
siklosid Jan 2, 2025
0b2ac50
Small type fix to resolve broken cli help command
siklosid Jan 2, 2025
e414b84
Adding the possibility that inherited operator can overwrite attribut…
siklosid Jan 2, 2025
67a8142
Smaller fixes; syntax fix; Fixing command creation by extending the e…
siklosid Jan 2, 2025
0557bab
Adding dynamo and sns io types
siklosid Jan 3, 2025
4ddfc33
Adding dynamo and sns io types
siklosid Jan 3, 2025
8038d3c
Fixing input/output name for reverse etl so it matches the batch job …
siklosid Jan 3, 2025
60c4736
Handling hard wired constants as local parameters of the task
siklosid Jan 3, 2025
f64910f
Removing account_id from io; naming convention; fixing small issues
siklosid Jan 3, 2025
2ea6f7e
Fixing batch job name
siklosid Jan 3, 2025
85c9c2a
Removing choco specific parameters and moving them to conf file; Maki…
siklosid Jan 7, 2025
3c5a610
Adding unit tests and a small fix
siklosid Jan 7, 2025
f45c84a
Adding comments
siklosid Jan 7, 2025
76c774d
Merge pull request #44 from chocoapp/feature/data-2025_reverse_etl_op…
siklosid Jan 8, 2025
e62890b
feat: add application name to the spark job & add kill spark job when…
Jan 16, 2025
31085ac
fix: type of _execution_timeout
Jan 16, 2025
02283d8
fix: remove the wrong and uncessary function
Jan 16, 2025
c9a1c6f
fix: timeout logic
Jan 17, 2025
bfb935c
fix: add missing import
Jan 17, 2025
7fba8d5
fix: spark_app_name
Jan 17, 2025
37a31f8
fix: default spark_app_name
Jan 17, 2025
a3422db
feat: improve the logic to kill the spark job
Jan 20, 2025
6046a01
chore: black + add log + missing function
Jan 20, 2025
57c394b
chore: add info to debug
Jan 20, 2025
6fdb674
chore: add info to debug
Jan 20, 2025
a5d627c
chore: reformat
Jan 20, 2025
9ad0c1e
Merge pull request #45 from chocoapp/feature/DATA-2175-kill-timeout-s…
claudiazi Jan 21, 2025
3ef83b3
Bumping tenacity version
siklosid Feb 13, 2025
10f77cf
feat: add param _full_refresh in the reverse_etl task
Feb 19, 2025
08304e7
Merge pull request #48 from chocoapp/quickfix/bumping_tenacity_version
claudiazi Feb 19, 2025
3283180
Merge remote-tracking branch 'origin/master' into hotfix/reverse-etl-…
Feb 19, 2025
437f53a
Merge pull request #47 from chocoapp/hotfix/reverse-etl-full-refresh
claudiazi Feb 19, 2025
13ccf9e
Create soda runner
raimundovidaljunior Mar 18, 2025
2f7cc55
Fixing configs
raimundovidaljunior Mar 19, 2025
47f4d83
Remove unnecessary vars
raimundovidaljunior Mar 24, 2025
1c7802d
Merge pull request #49 from chocoapp/feature/DATA-2216-create-soda-ru…
raimundovidaljunior Mar 25, 2025
75bb61f
Adding column mapping and case conversion
raimundovidaljunior Mar 28, 2025
57f3c35
Adjust reverse_etl
raimundovidaljunior Apr 1, 2025
7e10650
fixing error
raimundovidaljunior Apr 1, 2025
78ce5b2
fixing custom columns
raimundovidaljunior Apr 1, 2025
f5b825d
Merge pull request #50 from chocoapp/feature/DATA-2232-Turn-on-revers…
raimundovidaljunior Apr 7, 2025
64bb822
Removing unnecessary inputs/outputs from command
raimundovidaljunior Apr 15, 2025
b264ea1
Merge pull request #51 from chocoapp/feature/DATA-2238-remove-unneces…
raimundovidaljunior Apr 15, 2025
0e008dc
feat: add param _columns_to_include and _columns_to_exclude in the re…
Apr 22, 2025
001f19c
chore: improve the naming of new params
Apr 23, 2025
b3a96dc
fix: error message for _input_table_columns_to_include and _input_tab…
Apr 23, 2025
18d0ca2
chore: improve the naming of params
Apr 23, 2025
6b1d3b5
Merge pull request #52 from chocoapp/feature/new-param-for-reverse-etl
claudiazi Apr 23, 2025
c775839
fix: wrong validator for full_refresh
Apr 25, 2025
a199e63
Merge pull request #53 from chocoapp/hotfix/validator-full-refresh-in…
claudiazi Apr 25, 2025
164f7bb
Adding critical test flag
raimundovidaljunior Apr 28, 2025
4adb4e6
ADding missing params
raimundovidaljunior Apr 28, 2025
713d99b
Merge pull request #54 from chocoapp/feature/DATA-2238-Add-critical-t…
raimundovidaljunior May 13, 2025
abd2b4c
feat: new param for s3 destination
Jun 4, 2025
9c90cab
feat: add region_name for s3 io
Jun 4, 2025
3fc8ffd
feat: make batch_size and num_threads optional
Jun 5, 2025
2b82499
Merge pull request #55 from chocoapp/feature/DATA-2281-reverse-etl-ne…
claudiazi Jun 10, 2025
cd80d5e
feat: convert soda to deferrable
Jun 18, 2025
289af26
Merge remote-tracking branch 'origin/master' into feature/DATA-2275-a…
Jun 18, 2025
f961d15
feat: convert soda to deferrable
Jun 19, 2025
1649376
feat: convert soda to deferrable
Jun 19, 2025
fca31a6
fix: add extra package
Jun 19, 2025
2caa823
Merge pull request #56 from chocoapp/feature/DATA-2275-adjsut-soda-to…
claudiazi Jun 24, 2025
4c009d3
feat: add real logs for deferrable batch job
Jun 24, 2025
7a5af26
feat: improve the log message
Jun 24, 2025
b38ede9
chore: decrease the size of logs
Jun 24, 2025
6dc02a8
Merge pull request #57 from chocoapp/feature/DATA-2275-adjsut-soda-to…
claudiazi Jun 24, 2025
9603fb5
upgrading flask and requests
raimundovidaljunior Jul 16, 2025
04b839a
Merge pull request #59 from chocoapp/feature/DATA-2322-upgrade-flask-…
raimundovidaljunior Jul 16, 2025
7e4b663
parse dagrun_timeout as a timedelta instead of int
kiranvasudev Aug 6, 2025
cd1c90e
feat: fetch the logs from cloudwatch before BatchOperator.execute_com…
Aug 7, 2025
6733f4a
feat: add fetch logs if the execute fails
Aug 7, 2025
093ad01
Merge pull request #60 from chocoapp/feature/airflow2.11
kiranvasudev Aug 7, 2025
abfeac2
feat: fetch the logs from cloudwatch before BatchOperator.execute_com…
Aug 7, 2025
a65e25e
feat: add fetch logs if the execute fails
Aug 7, 2025
d34624c
Merge remote-tracking branch 'origin/feature/DATA-2449-improve-failed…
Aug 8, 2025
007dcba
fix: handle all failure scenarios
Aug 8, 2025
78b4772
fix: overwrite resume_execution
Aug 8, 2025
892e618
fix: save job_id in xcom
Aug 8, 2025
3e6fe6d
chore: simplify the logic
Aug 8, 2025
8db638a
chore: clean the logic
Aug 8, 2025
881c1c0
Merge pull request #61 from chocoapp/feature/DATA-2449-improve-failed…
claudiazi Aug 8, 2025
925029f
Relaxing click version to avoid conflict with dbt
siklosid Oct 19, 2025
c7d8be0
Merge pull request #62 from chocoapp/feature/DATA-2528_relaxing_click…
siklosid Oct 30, 2025
29203d3
Upgrading local env
siklosid Nov 3, 2025
2f665c5
Fixing dependency issues
siklosid Nov 3, 2025
57d300a
Upgrading git workflow python
siklosid Nov 3, 2025
735279f
Bumping airflow version
siklosid Nov 3, 2025
d72b812
Fixing version mismatch between test and dev
siklosid Nov 3, 2025
08fb0ab
Fixing unit tests caused by airflow version bump
siklosid Nov 3, 2025
3afc530
Merge pull request #63 from chocoapp/feature/DATA-2563_python_upgrade…
siklosid Nov 4, 2025
f0640fd
Implement DLT creator
mrhallak Jan 9, 2026
1a7ff6e
Add databricks provider to Airflow dependencies
mrhallak Jan 9, 2026
435beed
Add databricks provider to production Airflow image
mrhallak Jan 9, 2026
1a82c9d
Remove plugin
mrhallak Jan 12, 2026
6cc7438
Add Claude to repository
mrhallak Jan 12, 2026
7329c97
Merge pull request #66 from chocoapp/feature/add-claude-to-repository
mrhallak Jan 12, 2026
bfeb772
Update the module to support yaml
mrhallak Jan 12, 2026
63d0345
Add comprehensive tests and improve Databricks DLT components
mrhallak Jan 13, 2026
40bdea4
Add coding standard to avoid getattr in CLAUDE.md
mrhallak Jan 13, 2026
99898b4
Revert dbt_config_parser.py changes
mrhallak Jan 13, 2026
596a1fb
Merge pull request #65 from chocoapp/DATA-2637
mrhallak Jan 13, 2026
a0286d3
Rename databricks_dlt to declarative_pipeline task type
mrhallak Jan 15, 2026
0e92e9c
Merge pull request #67 from chocoapp/DATA-2637
mrhallak Jan 15, 2026
6120683
Fix Slack alerts not firing for declarative pipeline tasks
mrhallak Feb 12, 2026
4a89d44
Merge pull request #68 from chocoapp/fix/declarative-pipeline-slack-a…
mrhallak Feb 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci-data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ jobs:
with:
persist-credentials: false

- name: Set up Python 3.7
- name: Set up Python 3.12
uses: actions/setup-python@v2
with:
python-version: 3.7
python-version: 3.12

- name: Install dependencies
run: |
Expand Down
49 changes: 49 additions & 0 deletions .github/workflows/claude.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
name: Claude Code

on:
issue_comment:
types: [created]
pull_request_review_comment:
types: [created]
issues:
types: [opened, assigned]
pull_request_review:
types: [submitted]

jobs:
claude:
if: |
(github.event_name == 'issue_comment' && contains(github.event.comment.body, '@claude')) ||
(github.event_name == 'pull_request_review_comment' && contains(github.event.comment.body, '@claude')) ||
(github.event_name == 'pull_request_review' && contains(github.event.review.body, '@claude')) ||
(github.event_name == 'issues' && (contains(github.event.issue.body, '@claude') || contains(github.event.issue.title, '@claude')))
runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: read
issues: read
id-token: write
actions: read # Required for Claude to read CI results on PRs
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 1

- name: Run Claude Code
id: claude
uses: anthropics/claude-code-action@v1
with:
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}

# This is an optional setting that allows Claude to read CI results on PRs
additional_permissions: |
actions: read

# Optional: Give a custom prompt to Claude. If this is not specified, Claude will perform the instructions specified in the comment that tagged it.
# prompt: 'Update the pull request description to include a summary of changes.'

# Optional: Add claude_args to customize behavior and configuration
# See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md
# or https://code.claude.com/docs/en/cli-reference for available options
# claude_args: '--allowed-tools Bash(gh pr:*)'
98 changes: 98 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

Dagger is a configuration-driven framework that transforms YAML definitions into Apache Airflow DAGs. It uses dataset lineage (matching inputs/outputs) to automatically build dependency graphs across workflows.

## Common Commands

### Development Setup
```bash
make install-dev # Create venv, install package in editable mode with dev/test deps
source venv/bin/activate
```

### Testing
```bash
make test # Run all tests with coverage (sets AIRFLOW_HOME automatically)

# Run a single test file
AIRFLOW_HOME=$(pwd)/tests/fixtures/config_finder/root/ ENV=local pytest -s tests/path/to/test_file.py

# Run a specific test
AIRFLOW_HOME=$(pwd)/tests/fixtures/config_finder/root/ ENV=local pytest -s tests/path/to/test_file.py::test_function_name
```

### Linting
```bash
make lint # Run flake8 on dagger and tests directories
black dagger tests # Format code
```

### Local Airflow Testing
```bash
make test-airflow # Build and start Airflow in Docker (localhost:8080, user: dev_user, pass: dev_user)
make stop-airflow # Stop Airflow containers
```

### CLI
```bash
dagger --help
dagger list-tasks # Show available task types
dagger list-ios # Show available IO types
dagger init-pipeline # Create a new pipeline.yaml
dagger init-task --type=<task_type> # Add a task configuration
dagger init-io --type=<io_type> # Add an IO definition
dagger print-graph # Visualize dependency graph
```

## Architecture

### Core Flow
1. **ConfigFinder** discovers pipeline directories (each with `pipeline.yaml` + task YAML files)
2. **ConfigProcessor** loads YAML configs with environment variable support
3. **TaskFactory/IOFactory** use reflection to instantiate task/IO objects from YAML
4. **TaskGraph** builds a 3-layer graph: Pipeline → Task → Dataset nodes
5. **DagCreator** traverses the graph and generates Airflow DAGs using **OperatorFactory**

### Key Directories
- `dagger/pipeline/tasks/` - Task type definitions (DbtTask, SparkTask, AthenaTransformTask, etc.)
- `dagger/pipeline/ios/` - IO type definitions (S3, Redshift, Athena, Databricks, etc.)
- `dagger/dag_creator/airflow/operator_creators/` - One creator per task type, translates tasks to Airflow operators
- `dagger/graph/` - Graph construction from task inputs/outputs
- `dagger/config_finder/` - YAML discovery and loading
- `tests/fixtures/config_finder/root/dags/` - Example DAG configurations for testing

### Adding a New Task Type
1. Create task definition in `dagger/pipeline/tasks/` (subclass of Task)
2. Create any needed IOs in `dagger/pipeline/ios/` (if new data sources)
3. Create operator creator in `dagger/dag_creator/airflow/operator_creators/`
4. Register in `dagger/dag_creator/airflow/operator_factory.py`

### Configuration Files
- `pipeline.yaml` - Pipeline metadata (owner, schedule, alerts, airflow_parameters)
- `[taskname].yaml` - Task configs (type, inputs, outputs, task-specific params)
- `dagger_config.yaml` - System config (Neo4j, Elasticsearch, Spark settings)

### Key Patterns
- **Factory Pattern**: TaskFactory/IOFactory auto-discover types via reflection
- **Strategy Pattern**: OperatorCreator subclasses handle task-specific operator creation
- **Dataset Aliasing**: IO `alias()` method enables automatic dependency detection across pipelines

## Coding Standards

### Avoid getattr
Do not use `getattr` for accessing task or IO properties. Instead, define explicit properties on the class. This ensures:
- Type safety and IDE autocompletion
- Clear interface contracts
- Easier debugging and testing

```python
# Bad - avoid this pattern
value = getattr(self._task, 'some_property', default)

# Good - use explicit properties
value = self._task.some_property # Property defined on task class
```
11 changes: 5 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,16 @@ install: clean ## install the package to the active Python's site-packages


install-dev: clean ## install the package to the active Python's site-packages
virtualenv -p python3 venv; \
virtualenv -p python3.12 venv; \
source venv/bin/activate; \
python -m pip install --upgrade pip; \
python setup.py install; \
python -m pip install --upgrade pip setuptools wheel; \
pip install -e . ; \
pip install -r reqs/dev.txt -r reqs/test.txt
SYSTEM_VERSION_COMPAT=0 CFLAGS='-std=c++20' pip install -r reqs/dev.txt -r reqs/test.txt

install-test: clean ## install the package to the active Python's site-packages
virtualenv -p python3 venv; \
virtualenv -p python3.12 venv; \
source venv/bin/activate; \
python -m pip install --upgrade pip; \
python -m pip install --upgrade pip setuptools wheel; \
pip install -r reqs/test.txt -r reqs/base.txt

install-ui: clean ## install the package to the active Python's site-packages
Expand Down
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,41 @@ flowchart TD;

```

Plugins for dagger
-------

### Overview
Dagger now supports a plugin system that allows users to extend its functionality by adding custom Python classes. These plugins are integrated into the Jinja2 templating engine, enabling dynamic rendering of task configuration templates.
### Purpose
The plugin system allows users to define Python classes that can be loaded into the Jinja2 environment. When functions from these classes are invoked within a task configuration template, they are rendered dynamically using Jinja2. This feature enhances the flexibility of task configurations by allowing custom logic to be embedded directly in the templates.

### Usage
1. **Creating a Plugin:** To create a new plugin, define a Python class in a folder(for example `plugins/sample_plugin/sample_plugin.py`) with the desired methods. For example:
```python
class MyCustomPlugin:
def generate_input(self, branch_name):
return [{"name": f"{branch_name}", "type": "dummy"}]
```
This class defines a `generate_input` method that takes the branch_name from the module config and returns a dummy dagger task.

2. **Loading the Plugin into Dagger:** To load this plugin into Dagger's Jinja2 environment, you need to register it in your `dagger_config.yaml`:
```yaml
# pipeline.yaml
plugin:
paths:
- plugins # all Python classes within this path will be loaded into the Jinja environment
```

3. **Using Plugin Methods in Templates:** Once the plugin is loaded, you can call its methods from within any Jinja2 template in your task configurations:
```yaml
# task_configuration.yaml
type: batch
description: sample task
inputs: # format: list | Use dagger init-io cli
{{ MyCustomPlugin.generate_input("dummy_input") }}
```



Credits
-------
Expand Down
36 changes: 34 additions & 2 deletions dagger/cli/module.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,49 @@
import json

import click
import yaml

from dagger.utilities.module import Module
from dagger.utils import Printer


def parse_key_value(ctx, param, value):
"""Parse key=value pairs where value is a path to JSON or YAML file.

Args:
ctx: Click context.
param: Click parameter.
value: List of key=value pairs.

Returns:
Dictionary mapping variable names to parsed file contents.
"""
if not value:
return {}
key_value_dict = {}
for pair in value:
try:
key, val_file_path = pair.split('=', 1)
with open(val_file_path, 'r') as f:
if val_file_path.endswith(('.yaml', '.yml')):
val = yaml.safe_load(f)
else:
val = json.load(f)
key_value_dict[key] = val
except ValueError:
raise click.BadParameter(f"Key-value pair '{pair}' is not in the format key=value")
return key_value_dict

@click.command()
@click.option("--config_file", "-c", help="Path to module config file")
@click.option("--target_dir", "-t", help="Path to directory to generate the task configs to")
def generate_tasks(config_file: str, target_dir: str) -> None:
@click.option("--jinja_parameters", "-j", callback=parse_key_value, multiple=True, default=None, help="Jinja parameters file in the format: <var_name>=<path to json/yaml file>")
def generate_tasks(config_file: str, target_dir: str, jinja_parameters: dict) -> None:
"""
Generating tasks for a module based on config
"""

module = Module(config_file, target_dir)
module = Module(config_file, target_dir, jinja_parameters)
module.generate_task_configs()

Printer.print_success("Tasks are successfully generated")
Expand Down
25 changes: 21 additions & 4 deletions dagger/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
# Airflow parameters
airflow_config = config.get('airflow', None) or {}
WITH_DATA_NODES = airflow_config.get('with_data_nodes', False)
EXTERNAL_SENSOR_POKE_INTERVAL = airflow_config.get('external_sensor_poke_interval', 600)
EXTERNAL_SENSOR_TIMEOUT = airflow_config.get('external_sensor_timeout', 28800)
EXTERNAL_SENSOR_MODE = airflow_config.get('external_sensor_mode', 'reschedule')
EXTERNAL_SENSOR_DEFAULT_ARGS = airflow_config.get('external_sensor_default_args', {})
IS_DUMMY_OPERATOR_SHORT_CIRCUIT = airflow_config.get('is_dummy_operator_short_circuit', False)

# Neo4j parameters
Expand Down Expand Up @@ -100,4 +98,23 @@
# Alert parameters
alert_config = config.get('alert', None) or {}
SLACK_TOKEN = alert_config.get('slack_token', None)
DEFAULT_ALERT = alert_config.get('default_alert', {"type": "slack", "channel": "#airflow-jobs", "mentions": None})
DEFAULT_ALERT = alert_config.get('default_alert', {"type": "slack", "channel": "#airflow-jobs", "mentions": None})

# Plugin parameters
plugin_config = config.get('plugin', None) or {}
PLUGIN_DIRS = [os.path.join(AIRFLOW_HOME, path) for path in plugin_config.get('paths', [])]
logging.info(f"All Python classes will be loaded as plugins from the following directories: {PLUGIN_DIRS}")

# ReverseETL parameters
reverse_etl_config = config.get('reverse_etl', None) or {}
REVERSE_ETL_DEFAULT_JOB_NAME = reverse_etl_config.get('default_job_name', None)
REVERSE_ETL_DEFAULT_EXECUTABLE_PREFIX = reverse_etl_config.get('default_executable_prefix', None)
REVERSE_ETL_DEFAULT_EXECUTABLE = reverse_etl_config.get('default_executable', None)

# Soda parameters
soda_config = config.get('soda', None) or {}
SODA_DEFAULT_JOB_NAME = soda_config.get('default_job_name', None)
SODA_DEFAULT_EXECUTABLE_PREFIX = soda_config.get('default_executable_prefix', None)
SODA_DEFAULT_EXECUTABLE = soda_config.get('default_executable', None)
SODA_DEFAULT_OUTPUT_TABLE = soda_config.get('default_output_table', None)
SODA_DEFAULT_OUTPUT_S3_PATH = soda_config.get('default_output_s3_path', None)
28 changes: 16 additions & 12 deletions dagger/dag_creator/airflow/dag_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _get_external_task_sensor_name_dict(self, from_task_id: str) -> dict:
"external_sensor_name": f"{from_pipeline_name}-{from_task_name}-sensor",
}

def _get_external_task_sensor(self, from_task_id: str, to_task_id: str) -> ExternalTaskSensor:
def _get_external_task_sensor(self, from_task_id: str, to_task_id: str, follow_external_dependency: dict) -> ExternalTaskSensor:
"""
create an object of external task sensor for a specific from_task_id and to_task_id
"""
Expand All @@ -72,6 +72,9 @@ def _get_external_task_sensor(self, from_task_id: str, to_task_id: str) -> Exter

to_pipe_id = self._task_graph.get_node(to_task_id).obj.pipeline.name

extra_args = conf.EXTERNAL_SENSOR_DEFAULT_ARGS.copy()
extra_args.update(follow_external_dependency)

return ExternalTaskSensor(
dag=self._dags[to_pipe_id],
task_id=external_sensor_name,
Expand All @@ -80,9 +83,7 @@ def _get_external_task_sensor(self, from_task_id: str, to_task_id: str) -> Exter
execution_date_fn=self._get_execution_date_fn(
from_pipeline_schedule, to_pipeline_schedule
),
mode=conf.EXTERNAL_SENSOR_MODE,
poke_interval=conf.EXTERNAL_SENSOR_POKE_INTERVAL,
timeout=conf.EXTERNAL_SENSOR_TIMEOUT,
**extra_args
)

def _create_control_flow_task(self, pipe_id, dag):
Expand Down Expand Up @@ -135,6 +136,7 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node:
to_task_ids: The IDs of the tasks to which the edge connects.
node: The current node in a task graph.
"""

from_pipe = (
self._task_graph.get_node(from_task_id).obj.pipeline_name if from_task_id else None
)
Expand All @@ -143,7 +145,7 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node:
to_pipe = self._task_graph.get_node(to_task_id).obj.pipeline_name
if from_pipe and from_pipe == to_pipe:
self._tasks[from_task_id] >> self._tasks[to_task_id]
elif from_pipe and from_pipe != to_pipe and edge_properties.follow_external_dependency:
elif from_pipe and from_pipe != to_pipe and edge_properties.follow_external_dependency is not None:
from_schedule = self._task_graph.get_node(from_task_id).obj.pipeline.schedule
to_schedule = self._task_graph.get_node(to_task_id).obj.pipeline.schedule
if not from_schedule.startswith("@") and not to_schedule.startswith("@"):
Expand All @@ -155,15 +157,17 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node:
not in self._sensor_dict.get(to_pipe, dict()).keys()
):
external_task_sensor = self._get_external_task_sensor(
from_task_id, to_task_id
from_task_id, to_task_id, edge_properties.follow_external_dependency
)
self._sensor_dict[to_pipe] = {

if self._sensor_dict.get(to_pipe) is None:
self._sensor_dict[to_pipe] = {}

self._sensor_dict[to_pipe].update({
external_task_sensor_name: external_task_sensor
}
(
self._tasks[self._get_control_flow_task_id(to_pipe)]
>> external_task_sensor
)
})

self._tasks[self._get_control_flow_task_id(to_pipe)] >> external_task_sensor
self._sensor_dict[to_pipe][external_task_sensor_name] >> self._tasks[to_task_id]
else:
self._tasks[self._get_control_flow_task_id(to_pipe)] >> self._tasks[to_task_id]
Expand Down
Loading
Loading