
🔍 dataflow-agent

AI-powered data pipeline debugger

Python Gemini LangGraph License


An agentic CLI tool that uses Google Gemini and LangGraph to autonomously diagnose, explain, and fix broken or slow data pipelines.


dbt Airflow Prefect Apache Spark PostgreSQL Snowflake


What it does

dataflow-agent acts as an autonomous data engineering assistant. Point it at a broken pipeline — a dbt error log, a failing Airflow DAG, a Spark OOM crash — and it will:

  1. Read the logs and project files using its tool suite
  2. Reason over the evidence with Gemini running inside a LangGraph ReAct loop
  3. Explain the root cause in plain English
  4. Fix the broken code — showing a rich diff and asking for your confirmation before writing

Features

| Capability | Details |
|---|---|
| 🔎 Autonomous diagnosis | Reads logs, traverses project files, calls tools in sequence until it has enough evidence |
| 🛠 Interactive fixes | Rich unified diff preview with y/n confirmation before any file is modified |
| 🗄 Schema introspection | Queries live Postgres or Snowflake to validate column and table references |
| 📝 SQL analysis | sqlglot-powered parsing, linting, and optimization hints (indexes, skew, wildcards, etc.) |
| 💬 Chat mode | Multi-turn session with the full pipeline context retained across turns |
| 🧩 Framework-aware | Dedicated parsers for dbt artifacts, Airflow DAGs, Prefect flows, and Spark driver logs |
| 📊 dbt project profiler | Scans all SQL models, scores complexity (CTEs, JOINs, anti-patterns), ranks by risk |
| 🧪 dbt test generator | Infers and writes a schema.yml with not_null, unique, and FK tests from model SQL |
| 🔗 dbt lineage tracer | Traces upstream/downstream model dependencies from manifest.json or SQL ref() scan; optional AI impact analysis |

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                        dataflow-agent                           │
│                                                                 │
│   CLI (Typer + Rich)                                            │
│       │                                                         │
│       ▼                                                         │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                  LangGraph Agent Graph                  │    │
│  │                                                         │    │
│  │   START → context_loader → agent_node ⇄ tools_node      │    │
│  │                                │            │           │    │
│  │                           Gemini LLM   Tool Executor    │    │
│  │                                │            │           │    │
│  │                                └────────────┘           │    │
│  │                                      │                  │    │
│  │                                     END                 │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│   Tools available to the agent:                                 │
│   read_log · read_file · list_files · extract_errors            │
│   analyze_sql · explain_query · inspect_schema                  │
│   parse_dbt_manifest · generate_dbt_tests · trace_dbt_lineage   │
│   profile_dbt_project · parse_airflow_dag · parse_prefect_flow  │
│   parse_spark_log · write_fix                                   │
└─────────────────────────────────────────────────────────────────┘

The agent loops — calling tools, processing results, calling more tools — until Gemini decides it has enough information to produce a final diagnosis. No hardcoded logic, no fixed pipelines.
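Stripped of the LangGraph machinery, this control flow reduces to a plain ReAct-style loop. A minimal Python sketch with Gemini stubbed out; every name below is illustrative, not the project's actual API:

```python
# Minimal ReAct-style loop: call the model, execute any tool it requests,
# feed the result back, and stop when the model returns a final answer.
# `stub_model` stands in for the Gemini call the real agent makes.

def run_agent(model, tools, question):
    messages = [{"role": "user", "content": question}]
    while True:
        reply = model(messages)               # LLM decides: call a tool, or answer
        if reply.get("tool") is None:         # no tool requested -> final diagnosis
            return reply["content"]
        result = tools[reply["tool"]](**reply["args"])
        messages.append({"role": "tool", "content": result})

# Stub model: first asks to read the log, then produces a diagnosis.
def stub_model(messages):
    if not any(m["role"] == "tool" for m in messages):
        return {"tool": "read_log", "args": {"path": "dbt.log"}, "content": None}
    return {"tool": None, "content": "Column `discount_amount` does not exist."}

tools = {"read_log": lambda path: f"ERROR in {path}: column not found"}
print(run_agent(stub_model, tools, "Why did fct_orders fail?"))
```

The real tool replaces the while-loop with a LangGraph graph, which adds state persistence and the conditional agent_node ⇄ tools_node edges, but the termination condition is the same: the model stops requesting tools.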


Installation

Requirements: Python 3.11+, a free Gemini API key

# 1. Clone
git clone https://github.com/yourname/dataflow-agent
cd dataflow-agent

# 2. Virtual environment
python -m venv venv
source venv/bin/activate       # Windows: venv\Scripts\activate

# 3. Install
pip install -e .

# 4. Configure
cp .env.example .env
#  → open .env and paste your GEMINI_API_KEY

Configuration

# .env

# Required
GEMINI_API_KEY=your_gemini_api_key_here

# Optional — defaults shown
GEMINI_MODEL=gemini-2.5-flash

# Optional — for schema introspection
POSTGRES_URL=postgresql://user:password@localhost:5432/mydb

SNOWFLAKE_ACCOUNT=myorg-myaccount
SNOWFLAKE_USER=myuser
SNOWFLAKE_PASSWORD=mypassword
SNOWFLAKE_DATABASE=ANALYTICS
SNOWFLAKE_SCHEMA=PUBLIC
SNOWFLAKE_WAREHOUSE=COMPUTE_WH
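Only GEMINI_API_KEY is mandatory; the optional values fall back to the defaults shown above. A stdlib-only illustration of that precedence (the project itself loads these via pydantic + python-dotenv; `get_setting` and `DEFAULTS` are hypothetical names):

```python
import os

# A value set in the environment (or .env) wins; otherwise the documented default.
DEFAULTS = {"GEMINI_MODEL": "gemini-2.5-flash"}

def get_setting(name, required=False):
    value = os.environ.get(name, DEFAULTS.get(name))
    if required and value is None:
        raise RuntimeError(f"{name} is required (set it in .env)")
    return value

# GEMINI_API_KEY has no default, so this raises unless it is set:
#   get_setting("GEMINI_API_KEY", required=True)
print(get_setting("GEMINI_MODEL"))  # "gemini-2.5-flash" unless overridden
```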

Usage

diagnose — find the root cause

# dbt run log
dataflow-agent diagnose --framework dbt --log ./logs/dbt.log

# dbt log + full project (agent can traverse models, macros, etc.)
dataflow-agent diagnose --framework dbt \
  --log ./logs/dbt.log \
  --project ./my_dbt_project

# Airflow — DAG file + task log
dataflow-agent diagnose --framework airflow \
  --dag ./dags/my_dag.py \
  --log ./logs/task.log

# Spark — OOM, executor loss, shuffle failures
dataflow-agent diagnose --framework spark \
  --log ./logs/spark_driver.log

diagnose --fix — diagnose and repair

# Shows a rich diff, asks y/n before writing
dataflow-agent diagnose --framework dbt \
  --log ./logs/dbt.log \
  --project ./my_dbt_project \
  --fix

Example output:

╭─ Proposed change ──────────────────────────────────╮
│ --- a/models/marts/fct_orders.sql                  │
│ +++ b/models/marts/fct_orders.sql                  │
│ @@ -11,7 +11,7 @@                                  │
│ -    SUM(oi.discount_amount) AS total_discount,    │
│ +    SUM(oi.discount_pct)    AS total_discount,    │
╰────────────────────────────────────────────────────╯
Apply this fix? [y/N]:
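The preview above is a standard unified diff. A minimal sketch of how such a preview can be produced with stdlib difflib (`preview_fix` is a hypothetical helper, not the project's write_fix implementation):

```python
import difflib

def preview_fix(path, old_text, new_text):
    """Return a unified diff string for a proposed file change."""
    diff = difflib.unified_diff(
        old_text.splitlines(keepends=True),
        new_text.splitlines(keepends=True),
        fromfile=f"a/{path}",
        tofile=f"b/{path}",
    )
    return "".join(diff)

old = "SELECT\n    SUM(oi.discount_amount) AS total_discount,\n"
new = "SELECT\n    SUM(oi.discount_pct)    AS total_discount,\n"
print(preview_fix("models/marts/fct_orders.sql", old, new))
# The real tool renders the diff with Rich and gates the actual
# file write behind the y/n confirmation prompt shown above.
```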

optimize — find performance issues

dataflow-agent optimize --framework spark \
  --log ./logs/spark.log \
  --db postgres

chat — interactive session

# Full context retained across turns — ask follow-up questions
dataflow-agent chat --framework dbt --project ./my_dbt_project

dataflow-agent chat --framework spark --log ./logs/spark.log
╭─ Interactive Pipeline Chat ────────────────────────╮
│ Framework: dbt  |  Type exit to end the session.   │
╰────────────────────────────────────────────────────╯

You> Why did fct_orders fail?
Agent> The model failed because it references `discount_amount`, but
       the upstream `int_order_items` model exposes `discount_pct`...

You> What other models depend on fct_orders?
Agent> Based on the manifest, three models reference fct_orders: ...

profile — rank dbt models by complexity

Scans every .sql file under models/, scores each by CTEs, JOINs, subqueries, and anti-patterns, and returns a ranked report with LLM-generated refactoring recommendations.

dataflow-agent profile ./my_dbt_project

# Show top 20 models instead of the default 10
dataflow-agent profile ./my_dbt_project --top 20
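As a rough illustration of this kind of scoring, the heuristic below counts CTEs, JOINs, subqueries, and two common anti-patterns with regexes; the weights and patterns are invented, not the project's actual ones:

```python
import re

def score_model(sql: str) -> int:
    """Rough complexity score: more CTEs/JOINs/anti-patterns -> higher risk."""
    s = sql.upper()
    score = 0
    score += 2 * len(re.findall(r"\bWITH\b|,\s*\w+\s+AS\s*\(", s))  # CTEs
    score += 3 * s.count(" JOIN ")                                  # joins
    score += 2 * s.count("(SELECT")                                 # subqueries
    score += 5 * s.count("SELECT *")                                # wildcard anti-pattern
    score += 5 * s.count("DISTINCT")                                # often masks join fan-out
    return score

simple = "SELECT id FROM orders"
messy = ("WITH o AS (SELECT * FROM orders) "
         "SELECT DISTINCT o.id FROM o JOIN items ON o.id = items.order_id")
print(score_model(simple), score_model(messy))
```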

generate-tests — generate a schema.yml from model SQL

Infers not_null, unique, and foreign-key tests from a model's SQL and writes a ready-to-use schema.yml.

dataflow-agent generate-tests --model ./models/marts/fct_orders.sql

# Write to a specific output path
dataflow-agent generate-tests \
  --model ./models/marts/fct_orders.sql \
  --output ./models/marts/schema.yml
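A simplified sketch of the inference idea: columns that look like the model's primary key get not_null + unique, other *_id columns get not_null. The heuristics and the `infer_tests` helper are illustrative only; the real generator also proposes FK relationship tests:

```python
import re

def infer_tests(model_name: str, sql: str) -> str:
    """Emit a minimal schema.yml snippet from column aliases found in the SQL."""
    columns = re.findall(r"\bAS\s+(\w+)", sql, flags=re.IGNORECASE)
    pk_guess = model_name.split("_", 1)[-1].rstrip("s") + "_id"  # fct_orders -> order_id
    lines = ["version: 2", "models:", f"  - name: {model_name}", "    columns:"]
    for col in columns:
        lines.append(f"      - name: {col}")
        if col == pk_guess or col.endswith("_key"):
            tests = ["not_null", "unique"]      # looks like the primary key
        elif col.endswith("_id"):
            tests = ["not_null"]                # looks like a foreign key
        else:
            tests = []
        if tests:
            lines.append("        tests:")
            lines.extend(f"          - {t}" for t in tests)
    return "\n".join(lines)

sql = "SELECT o.id AS order_id, o.customer_id AS customer_id, SUM(x) AS total FROM ..."
print(infer_tests("fct_orders", sql))
```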

lineage — trace dbt model dependencies

Builds an upstream/downstream dependency graph from manifest.json (preferred) or by scanning ref()/source() calls in SQL files. No API key required unless --analyze is used.

# From manifest
dataflow-agent lineage fct_orders --manifest ./target/manifest.json

# From project directory (SQL scan fallback)
dataflow-agent lineage fct_orders --project ./my_dbt_project

# Upstream only, limited to 2 hops
dataflow-agent lineage fct_orders -m ./target/manifest.json -d upstream --depth 2

# AI-powered impact analysis (requires GEMINI_API_KEY)
dataflow-agent lineage fct_orders -m ./target/manifest.json --analyze
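With a manifest, the trace is essentially a bounded graph walk over manifest.json's parent_map. A simplified sketch, assuming the standard dbt manifest layout (`upstream` is a hypothetical helper):

```python
# In the real tool the map comes from json.load(...)["parent_map"]
# of target/manifest.json; a toy version is inlined below.

def upstream(parent_map, node, depth=None, _seen=None):
    """Recursively collect upstream dependencies, optionally hop-limited."""
    seen = _seen if _seen is not None else set()
    if depth is not None and depth <= 0:
        return seen
    for parent in parent_map.get(node, []):
        if parent not in seen:
            seen.add(parent)
            upstream(parent_map, parent,
                     None if depth is None else depth - 1, seen)
    return seen

parent_map = {
    "model.proj.fct_orders": ["model.proj.int_order_items"],
    "model.proj.int_order_items": ["model.proj.stg_orders"],
    "model.proj.stg_orders": ["source.proj.raw.orders"],
}
print(sorted(upstream(parent_map, "model.proj.fct_orders", depth=2)))
```

The --depth flag maps onto the hop limit, and downstream tracing is the same walk over the inverted map.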

--model — override the LLM

dataflow-agent diagnose --framework dbt --log ./dbt.log --model gemini-1.5-pro

Demo — try it now

The repo ships with realistic broken fixtures. No database needed.

# dbt: two failed models with column name mismatches
dataflow-agent diagnose --framework dbt \
  --log tests/fixtures/dbt_error.log

# Spark: executor OOM → stage failure → job abort
dataflow-agent diagnose --framework spark \
  --log tests/fixtures/spark_error.log

# Airflow: KeyError on a renamed column + catchup=True misconfiguration
dataflow-agent diagnose --framework airflow \
  --dag  tests/fixtures/broken_dag.py \
  --log  tests/fixtures/airflow_error.log

# Interactive chat about the broken Airflow DAG
dataflow-agent chat --framework airflow \
  --dag tests/fixtures/broken_dag.py

Project Structure

dataflow-agent/
├── dataflow_agent/
│   ├── cli.py                   # Typer CLI: diagnose · optimize · chat · profile · generate-tests · lineage · explain
│   ├── agent.py                 # LangGraph graph, state, nodes, chat loop, run_* entry points
│   ├── config.py                # Pydantic settings loaded from .env
│   ├── tools/
│   │   ├── log_reader.py        # read_log
│   │   ├── file_reader.py       # read_file · list_files
│   │   ├── file_writer.py       # write_fix  (Rich diff + y/n prompt)
│   │   ├── schema_inspector.py  # inspect_schema  (Postgres + Snowflake)
│   │   ├── sql_analyzer.py      # analyze_sql · explain_query
│   │   ├── dbt_profiler.py      # profile_dbt_project  (complexity ranking)
│   │   └── framework/
│   │       ├── dbt.py           # parse_dbt_manifest · generate_dbt_tests · trace_dbt_lineage
│   │       ├── airflow.py       # parse_airflow_dag
│   │       ├── prefect.py       # parse_prefect_flow
│   │       └── spark.py         # parse_spark_log
│   └── parsers/
│       └── error_extractor.py   # extract_errors  (framework-aware regex)
├── tests/
│   ├── fixtures/
│   │   ├── dbt_error.log        # dbt run with 2 failures + 5 skips
│   │   ├── airflow_error.log    # Airflow KeyError after 3 retry attempts
│   │   ├── spark_error.log      # Spark OOM → executor loss → job abort
│   │   ├── broken_dag.py        # Airflow DAG with column bug + catchup=True
│   │   ├── fct_orders.sql       # dbt model with wrong column reference
│   │   ├── dbt_model.sql        # dbt model fixture for test generation
│   │   └── manifest.json        # minimal dbt manifest with multi-layer lineage graph
│   └── test_tools.py            # 24 smoke tests (all passing)
├── .env.example
├── .gitignore
└── pyproject.toml

Supported Frameworks

| Framework | Log parsing | File parsing | Schema validation | SQL analysis |
|---|---|---|---|---|
| dbt | ✅ Run logs | ✅ manifest.json, run_results.json | ✅ | ✅ |
| Apache Airflow | ✅ Task logs | ✅ DAG .py files | ✅ | ✅ |
| Prefect | ✅ Flow run logs | ✅ Flow .py files | ✅ | ✅ |
| Apache Spark | ✅ Java exceptions, OOM, shuffle | ✅ PySpark scripts | ✅ | ✅ |
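Framework-aware log parsing largely comes down to per-framework regexes. A hypothetical sketch in the spirit of extract_errors, with invented patterns for the Spark row above (the real tool ships its own, more complete set):

```python
import re

# Illustrative per-framework error patterns, not the project's actual ones.
PATTERNS = {
    "spark": [
        r"java\.lang\.OutOfMemoryError.*",
        r"ExecutorLostFailure.*",
        r"org\.apache\.spark\.shuffle\.FetchFailedException.*",
    ],
    "dbt": [r"Database Error in model \w+.*", r"Compilation Error.*"],
}

def extract_errors(framework: str, log_text: str) -> list[str]:
    """Pull error lines matching the framework's known failure signatures."""
    hits = []
    for pattern in PATTERNS.get(framework, []):
        hits.extend(re.findall(pattern, log_text))
    return hits

log = ("24/01/15 INFO Scheduler: stage 4 started\n"
       "java.lang.OutOfMemoryError: Java heap space\n"
       "ExecutorLostFailure (executor 7 exited)\n")
print(extract_errors("spark", log))
```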

Tech Stack

| Layer | Library |
|---|---|
| LLM | google-generativeai / langchain-google-genai |
| Agent framework | langgraph |
| CLI | typer + rich |
| SQL parsing | sqlglot |
| DB connectors | psycopg2-binary, snowflake-connector-python |
| Config | pydantic + python-dotenv |

License

MIT — see LICENSE.
