dbt-core-interface is an MIT-licensed, high-level wrapper for dbt-core that can be used to drive third-party integrations such as servers, CI automation, and DWH automation without duplicating boilerplate.

z3z1ma/dbt-core-interface

DBT-CORE-INTERFACE

Lightweight, thread-safe, multi-project Python interface to dbt-core

Built with the tools and technologies:

TOML, Rich, Ruff, GNU Bash, FastAPI, Pytest, Docker, Python, GitHub Actions, uv

Overview

dbt-core-interface is a lightweight, high-performance Python interface for working directly with dbt-core (v1.8+). It lets developers manage and run dbt projects entirely in memory through a Python API, enabling runtime SQL compilation, macro evaluation, and SQLFluff linting and formatting, either in-process or through the bundled FastAPI server.

It supports dynamic multi-project environments, automatic re-parsing, file watchers, and asynchronous usage. It is the foundation for more complex interfaces such as dbt-fastapi and is designed for rapidly prototyping ideas outside the constraints of the dbt-core repo itself.


Features

  • 🧐 In-memory dbt-core 1.8+ interface with full RuntimeConfig hydration
  • ⚡ Fast, thread-safe SQL compilation and execution via FastAPI
  • 🔬 Interactive linting and formatting with SQLFluff
  • 🌐 Live REST API server via FastAPI
  • 🌍 Supports multiple projects simultaneously using DbtProjectContainer
  • 🚀 Dynamic macro parsing, Jinja rendering, manifest manipulation
  • 🔄 Background file watching for auto-reparsing
  • ⚖ Direct dbt command passthrough (e.g. run, test, docs serve)
  • 🔍 Automated data quality monitoring and alerting

Requirements

  • Python 3.9+
  • dbt-core >= 1.8.0

Install via PyPI:

pip install dbt-core-interface

Usage

Programmatic

from pathlib import Path

from dbt_core_interface import DbtProject

# Load your project
project = DbtProject(project_dir="/path/to/dbt_project")

# Run a simple SQL query
res = project.execute_sql("SELECT current_date AS today")
print(res.table)

# Compile SQL (but don't run it)
compiled = project.compile_sql("SELECT * FROM {{ ref('my_model') }}")
print(compiled.compiled_code)

# Execute a ref() lookup
node = project.ref("my_model")
print(node.resource_type, node.name)

# Load a source node
source = project.source("my_source", "my_table")
print(source.description)

# Incrementally parse the project
project.parse_project(write_manifest=True)

# Re-parse a specific path
project.parse_paths("models/my_model.sql")

# Compile a node from path
node = project.get_node_by_path("models/my_model.sql")
compiled = project.compile_node(node)
print(compiled.compiled_code)

# Run a dbt command programmatically
project.run("-s +orders")
project.test()

# SQLFluff linting
lint_result = project.lint(sql="select 1 AS foo")
lint_result = project.lint(sql=Path("models/my_model.sql"))
print(lint_result)

# SQLFluff formatting
success, formatted_sql = project.format(sql="Select * FROM orders as o")
success, formatted_sql = project.format(sql=Path("models/my_model.sql"))
print(formatted_sql)

# Use the DbtProjectContainer to manage multiple projects
from dbt_core_interface import DbtProjectContainer

container = DbtProjectContainer()
container.create_project(project_dir="/path/to/dbt_project_1")
container.create_project(project_dir="/path/to/dbt_project_2")
print(container.registered_projects())

Server Mode (FastAPI)

Run:

python -m dbt_core_interface.server --host 0.0.0.0 --port 8581

Register a project:

curl -X POST 'http://localhost:8581/register?project_dir=/your/dbt_project'

Compile SQL:

curl -X POST 'http://localhost:8581/compile' \
     -H 'X-dbt-Project: /your/dbt_project' \
     -d 'select * from {{ ref("orders") }}'
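
The same compile call can be made from Python without extra dependencies. The following is a minimal sketch using only the standard library; the helper name `build_compile_request` is hypothetical, and the endpoint, header, and body simply mirror the curl example above. The response format is not described here, so the sketch leaves the send step commented out and just prints the raw body.

```python
import urllib.request


def build_compile_request(base_url: str, project_dir: str, sql: str) -> urllib.request.Request:
    """Build (but do not send) a POST /compile request for a registered project.

    Hypothetical helper: it mirrors the curl example, nothing more.
    """
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/compile",
        data=sql.encode("utf-8"),  # raw SQL as the request body, like curl -d
        headers={"X-dbt-Project": project_dir},  # selects the registered project
        method="POST",
    )


req = build_compile_request(
    "http://localhost:8581",
    "/your/dbt_project",
    'select * from {{ ref("orders") }}',
)

# To send it against a running server:
# with urllib.request.urlopen(req, timeout=15) as resp:
#     print(resp.read().decode("utf-8"))
```

Building the request separately from sending it makes the call easy to inspect or log before it reaches the server.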

Client Usage

Run the server and use the bundled client to interact with it:

from dbt_core_interface.client import DbtInterfaceClient, ServerError

client = DbtInterfaceClient(
    project_dir="/path/to/project",
    profiles_dir="/path/to/profiles_dir",  # directory containing profiles.yml
    target="dev",
    base_url="http://localhost:8581",
    timeout=(5.0, 15.0)
)

# Health & heartbeat
print(client.health_check())  # {'status': 'ok', ...}
print(client.heartbeat())     # {'alive': True, 'uptime': ...}

# Run SQL with a row limit; passing path lets {{ this }} resolve to that model
result = client.run_sql(
    "SELECT * FROM {{ this }} ORDER BY id",
    limit=500,
    path="models/my_model.sql",
)
print(result.table.rows)

# Compile without execution
comp = client.compile_sql("SELECT * FROM {{ ref('users') }}")
print(comp.compiled_code)

# Lint & format
lint = client.lint_sql(raw_sql="select * from {{ ref('users') }}")
print(lint.violations)
fmt = client.format_sql(raw_sql="select * from {{ ref('users') }}")
print(fmt.formatted_code)

# Arbitrary dbt command
docs = client.command("docs", "generate")
print(docs)

# On object deletion, project is unregistered automatically
del client

Data Quality Monitoring

dbt-core-interface includes automated data quality monitoring and alerting capabilities:

from dbt_core_interface import DbtProject, RowCountCheck, NullPercentageCheck
from dbt_core_interface import Severity, WebhookAlertChannel

# Load your project
project = DbtProject(project_dir="/path/to/dbt_project")

# Access the quality monitor
monitor = project.quality_monitor

# Add quality checks to your models
monitor.add_check(
    "my_model",
    RowCountCheck(
        name="row_count_validation",
        min_rows=1,
        max_rows=1000000,
        severity=Severity.ERROR,
    )
)

monitor.add_check(
    "my_model",
    NullPercentageCheck(
        name="null_id_check",
        column_name="id",
        max_null_percentage=0.0,
        severity=Severity.CRITICAL,
    )
)

# Add alert channels
monitor.add_alert_channel(
    WebhookAlertChannel(url="https://hooks.slack.com/services/YOUR/WEBHOOK/URL")
)

# Run all quality checks
results = monitor.run_checks(model_name="my_model")
for result in results:
    print(f"{result.check_name}: {result.status} - {result.message}")

# Run checks for all models
all_results = monitor.run_checks()

Available Check Types

  • RowCountCheck: Validate row counts are within min/max bounds
  • NullPercentageCheck: Ensure null percentage in a column is acceptable
  • DuplicateCheck: Detect duplicate rows based on key columns
  • ValueRangeCheck: Verify numeric values are within expected range
  • CustomSqlCheck: Define custom SQL-based validation logic
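
To make the first two check types concrete, here is a rough sketch of the kind of query such checks boil down to, run against an in-memory SQLite table. It is not the library's implementation: the helper names `row_count` and `null_percentage` are hypothetical, and it assumes the null threshold is expressed as a percentage from 0 to 100, which the text above does not confirm.

```python
import sqlite3


def row_count(conn: sqlite3.Connection, table: str) -> int:
    """Return the number of rows in `table` (hypothetical helper)."""
    return conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()[0]


def null_percentage(conn: sqlite3.Connection, table: str, column: str) -> float:
    """Return the share of NULLs in `column` as a percentage (assumed 0-100 scale)."""
    total, nulls = conn.execute(
        f'SELECT COUNT(*), SUM(CASE WHEN "{column}" IS NULL THEN 1 ELSE 0 END) '
        f'FROM "{table}"'
    ).fetchone()
    return 0.0 if total == 0 else 100.0 * nulls / total


# Illustrative data only: four rows, two of them with a NULL email.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_model (id INTEGER, email TEXT)")
conn.executemany(
    "INSERT INTO my_model VALUES (?, ?)",
    [(1, "a@example.com"), (2, None), (3, "c@example.com"), (4, None)],
)

rows = row_count(conn, "my_model")
null_pct = null_percentage(conn, "my_model", "email")

rows_ok = 1 <= rows <= 1_000_000  # same bounds as the row-count example above
nulls_ok = null_pct <= 0.0        # a zero-tolerance null threshold
print(rows, null_pct, rows_ok, nulls_ok)
```

With this data the row-count bound passes while the zero-tolerance null check on `email` fails, which is the situation a severity level and alert channel are meant to surface.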

Server API Endpoints

When running the FastAPI server, use these endpoints for quality monitoring:

# Add a quality check
curl -X POST 'http://localhost:8581/api/v1/quality/checks' \
  -H 'X-dbt-Project: /your/dbt_project' \
  -d '{
    "name": "row_count_check",
    "check_type": "row_count",
    "model_name": "my_model",
    "severity": "warning",
    "config": {"min_rows": 1, "max_rows": 1000000}
  }'

# List all quality checks
curl 'http://localhost:8581/api/v1/quality/checks?project_dir=/your/dbt_project'

# Run quality checks
curl -X POST 'http://localhost:8581/api/v1/quality/run?project_dir=/your/dbt_project&model_name=my_model'

# Add an alert channel
curl -X POST 'http://localhost:8581/api/v1/quality/alerts' \
  -H 'X-dbt-Project: /your/dbt_project' \
  -d '{
    "channel_type": "webhook",
    "config": {"url": "https://hooks.slack.com/..."}
  }'
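
The "add a quality check" request can also be assembled in Python. This is a sketch with the standard library only; `build_add_check_request` is a hypothetical helper, the payload fields are copied from the curl example above, and the explicit `Content-Type: application/json` header is an assumption that the curl example does not show.

```python
import json
import urllib.request


def build_add_check_request(
    base_url: str,
    project_dir: str,
    name: str,
    check_type: str,
    model_name: str,
    severity: str,
    config: dict,
) -> urllib.request.Request:
    """Build (but do not send) a POST request that registers a quality check."""
    payload = {
        "name": name,
        "check_type": check_type,
        "model_name": model_name,
        "severity": severity,
        "config": config,
    }
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/quality/checks",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "X-dbt-Project": project_dir,
            "Content-Type": "application/json",  # assumption, not in the curl example
        },
        method="POST",
    )


check_req = build_add_check_request(
    "http://localhost:8581",
    "/your/dbt_project",
    name="row_count_check",
    check_type="row_count",
    model_name="my_model",
    severity="warning",
    config={"min_rows": 1, "max_rows": 1000000},
)

# To send it against a running server:
# with urllib.request.urlopen(check_req, timeout=15) as resp:
#     print(resp.read().decode("utf-8"))
```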

Generic Test Library

dbt-core-interface includes a comprehensive library of reusable generic dbt tests that can be easily configured via YAML schema files:

from dbt_core_interface import DbtProject, GenericTestLibrary

# Load your project
project = DbtProject(project_dir="/path/to/dbt_project")

# Initialize the test library
library = GenericTestLibrary(project)

# List all available tests
for test in library.list_tests():
    print(f"{test.name}: {test.description}")

# Generate schema.yml with suggested tests
columns = {"id": {}, "email": {}, "status": {}}
schema_yml = library.generate_schema_yml("users", columns)
print(schema_yml)

Available Generic Tests

  • unique: Ensures a column has unique values (no duplicates)
  • not_null: Ensures a column has no null values
  • relationships: Ensures referential integrity between tables
  • accepted_values: Ensures column values are from a specified list
  • recency: Checks data freshness within a time window
  • cardinality_equals: Ensures distinct count matches another table

Example YAML Configuration

version: 2

models:
  - name: users
    columns:
      - name: id
        tests:
          - unique
          - not_null

      - name: email
        tests:
          - unique
          - not_null

      - name: status
        tests:
          - accepted_values:
              values: ['active', 'inactive', 'pending']

Auto-Suggest Tests

The library can automatically suggest appropriate tests based on column naming patterns:

# Get suggestions for specific columns
for col_name in ["id", "user_id", "email", "status"]:
    suggestions = library.suggest_tests_for_column(col_name)
    for suggestion in suggestions:
        print(f"{col_name}: {suggestion.test_type.value}")
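
As an illustration of what suggesting tests from naming patterns can look like, here is a small standalone sketch. The rules and the `suggest_tests` function are hypothetical and chosen only for the example; the library's actual patterns may differ.

```python
import re

# Hypothetical naming rules: (pattern, tests to suggest). Not the library's rules.
RULES = [
    (re.compile(r"^id$"), ["unique", "not_null"]),           # primary key
    (re.compile(r"_id$"), ["not_null", "relationships"]),    # foreign key
    (re.compile(r"email"), ["unique", "not_null"]),
    (re.compile(r"^(status|state|type)$"), ["accepted_values"]),
]


def suggest_tests(column_name: str) -> list[str]:
    """Return test names whose pattern matches the column, without duplicates."""
    name = column_name.lower()
    suggestions: list[str] = []
    for pattern, tests in RULES:
        if pattern.search(name):
            for test in tests:
                if test not in suggestions:
                    suggestions.append(test)
    return suggestions


for col in ["id", "user_id", "email", "status", "created_at"]:
    print(col, suggest_tests(col))
```

Columns that match no rule, such as `created_at` here, get an empty list, so suggestions stay opt-in rather than guessing.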

For detailed documentation, see src/dbt_core_interface/generic_tests/docs.md.


License

This project is licensed under the MIT License. See the LICENSE file for more info.


Acknowledgments

Thanks to the dbt-core maintainers and contributors whose work makes this project possible.
