Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
b09ecfb
feat: add destination readback introspection for smoke tests
devin-ai-integration[bot] Mar 23, 2026
8c0385b
fix: address CI lint and type check failures
devin-ai-integration[bot] Mar 23, 2026
4faa641
fix: use uv run ruff format for CI-compatible formatting
devin-ai-integration[bot] Mar 23, 2026
1a514c4
refactor: use cache's built-in normalizers and move query methods to …
devin-ai-integration[bot] Mar 23, 2026
6740c3d
refactor: add get_sql_cache() to Destination, simplify readback to us…
devin-ai-integration[bot] Mar 23, 2026
6fa7180
fix: case-insensitive key lookup for column stats across DBs
devin-ai-integration[bot] Mar 23, 2026
4885a16
refactor: eliminate destination_readback.py, fix key casing and secre…
devin-ai-integration[bot] Mar 23, 2026
fffbcea
style: convert docstring from reST to markdown syntax
devin-ai-integration[bot] Mar 23, 2026
cda5d81
fix: use positional aliases in column stats query to avoid truncation
devin-ai-integration[bot] Mar 23, 2026
d75b003
fix: quote table names in readback SQL queries for Snowflake compatib…
devin-ai-integration[bot] Mar 23, 2026
8ce95e4
fix: handle dict config in postgres_destination_to_cache for local re…
devin-ai-integration[bot] Mar 23, 2026
2b7d3ce
fix: case-insensitive row_count lookup + fallback to original stream …
devin-ai-integration[bot] Mar 23, 2026
40944f9
fix: move inline imports to top-level in CacheBase readback methods
devin-ai-integration[bot] Mar 23, 2026
08f1b4c
fix: import sorting in base.py + handle dict config and /local path i…
devin-ai-integration[bot] Mar 23, 2026
1cdd9b0
style: fix ruff format in duckdb_destination_to_cache
devin-ai-integration[bot] Mar 23, 2026
f07a223
fix: handle dict config in motherduck_destination_to_cache
devin-ai-integration[bot] Mar 23, 2026
76151ab
fix: strip destinationType key in bigquery and snowflake dest-to-cach…
devin-ai-integration[bot] Mar 23, 2026
08b64e4
fix: use dialect-aware identifier quoting in readback SQL (BigQuery b…
devin-ai-integration[bot] Mar 23, 2026
73993fa
refactor: move readback logic from CacheBase to SqlProcessorBase per …
devin-ai-integration[bot] Mar 23, 2026
f0b26d7
fix: restore CI-expected formatting in sql_processor.py
devin-ai-integration[bot] Mar 23, 2026
174bbfb
style: convert all docstrings from reST to Markdown formatting
devin-ai-integration[bot] Mar 23, 2026
1e5c45b
fix: make stats fields nullable, broaden readback exception handling …
devin-ai-integration[bot] Mar 23, 2026
a1187b4
docs: add Markdown docstring style guideline to CONTRIBUTING.md
devin-ai-integration[bot] Mar 23, 2026
8a5ea93
fix: pass schema_name through destination_to_cache, add db/schema to …
devin-ai-integration[bot] Mar 23, 2026
2aea3d0
refactor: improve get_column_info docstring, share inspector across t…
devin-ai-integration[bot] Mar 23, 2026
b86d561
refactor: make get_column_stats private (_get_column_stats)
devin-ai-integration[bot] Mar 23, 2026
8673f98
refactor: rename get_* to fetch_* for DB-hitting introspection methods
devin-ai-integration[bot] Mar 23, 2026
20acec8
refactor: replace _get_stream_names_from_source with Source.get_selec…
devin-ai-integration[bot] Mar 23, 2026
dbc48c2
refactor: delete DestinationReadbackResult, flatten table_statistics …
devin-ai-integration[bot] Mar 23, 2026
9e17822
refactor: make tables_not_found a dict mapping stream_name to expecte…
devin-ai-integration[bot] Mar 23, 2026
19045b5
refactor: add Destination.is_cache_supported, remove try/except from …
devin-ai-integration[bot] Mar 23, 2026
f425257
refactor: run readback regardless of write success (partial success s…
devin-ai-integration[bot] Mar 23, 2026
9ec7ac7
refactor: improve readback skip log message per review suggestion
devin-ai-integration[bot] Mar 23, 2026
18e1e99
refactor: simplify get_sql_cache signature, remove destination_name/c…
devin-ai-integration[bot] Mar 23, 2026
97c48c9
refactor: move translation files from destinations/ to caches/_utils/…
devin-ai-integration[bot] Mar 23, 2026
d894c8e
fix: add copyright notice to caches/_utils/__init__.py
devin-ai-integration[bot] Mar 23, 2026
8576cfc
refactor: promote inline imports to top-level now that circular dep i…
devin-ai-integration[bot] Mar 23, 2026
01a59e9
docs: fix stale docstrings - readback runs regardless of write success
devin-ai-integration[bot] Mar 23, 2026
18f980f
docs: fix remaining double backticks to use Markdown style
devin-ai-integration[bot] Mar 23, 2026
8323694
fix: wrap readback introspection in try/except to preserve structured…
devin-ai-integration[bot] Mar 23, 2026
a545987
fix: dispose engine after schema_name override to fix stale schema_tr…
devin-ai-integration[bot] Mar 23, 2026
a6902f1
fix: fall back to default schema when namespace schema has no tables
devin-ai-integration[bot] Mar 23, 2026
491551e
fix: override destination config schema key for namespace isolation
devin-ai-integration[bot] Mar 23, 2026
8ec0813
style: ruff format destination_smoke_tests.py
devin-ai-integration[bot] Mar 23, 2026
0eeb90d
fix: force disable_type_dedupe=false for readback and improve config …
devin-ai-integration[bot] Mar 23, 2026
c79c72e
refactor: remove config schema override — catalog namespace is suffic…
devin-ai-integration[bot] Mar 23, 2026
88f3d9e
fix: skip redundant config validation in _prepare_destination_config
devin-ai-integration[bot] Mar 23, 2026
006c891
fix: use plaintext password directly in snowflake_destination_to_cache
devin-ai-integration[bot] Mar 23, 2026
e9adb75
fix: use destination's credentials_json for BigQuery cache construction
devin-ai-integration[bot] Mar 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

Thank you for your interest in contributing to PyAirbyte!

## Docstring Style

Use **Markdown** formatting in all docstrings — not reStructuredText (reST).

- Use single backticks for inline code: `` `MyClass` ``, not double backticks (` ``MyClass`` `).
- Reference methods as `` `get_column_info()` ``, not `:meth:\`get_column_info\``.
- Use standard Markdown for emphasis, lists, and links.

## 🚀 Releasing

This project uses [`semantic-pr-release-drafter`](https://github.com/aaronsteers/semantic-pr-release-drafter) for automated release management. To release, simply click "`Edit`" on the latest release draft from the [releases page](https://github.com/airbytehq/PyAirbyte/releases), and then click "`Publish release`". This publish operation will trigger all necessary downstream publish operations.
Expand Down
126 changes: 116 additions & 10 deletions airbyte/_util/destination_smoke_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@

Smoke tests send synthetic data from the built-in smoke test source to a
destination connector and report whether the destination accepted the data
without errors. No readback or comparison is performed.
without errors.

When the destination has a compatible cache implementation, readback
introspection is automatically performed to produce stats on the written
data: table row counts, column names/types, and per-column null/non-null
counts.
"""

from __future__ import annotations

import logging
import time
from datetime import datetime, timezone
from pathlib import Path
Expand All @@ -22,12 +28,16 @@

from airbyte import get_source
from airbyte.exceptions import PyAirbyteInputError
from airbyte.shared.sql_processor import TableStatistics # noqa: TC001 # Pydantic needs at runtime


logger = logging.getLogger(__name__)


NAMESPACE_PREFIX = "zz_deleteme"
"""Prefix for auto-generated smoke test namespaces.

The ``zz_`` prefix sorts last alphabetically; ``deleteme`` signals the
The `zz_` prefix sorts last alphabetically; `deleteme` signals the
namespace is safe for automated cleanup.
"""

Expand All @@ -46,11 +56,11 @@ def generate_namespace(
) -> str:
"""Generate a smoke-test namespace.

Format: ``zz_deleteme_yyyymmdd_hhmm_<suffix>``.
The ``zz_`` prefix sorts last alphabetically and the ``deleteme``
Format: `zz_deleteme_yyyymmdd_hhmm_<suffix>`.
The `zz_` prefix sorts last alphabetically and the `deleteme`
token acts as a guard for automated cleanup scripts.

If *namespace_suffix* is not provided, ``smoke_test`` is used as the
If `namespace_suffix` is not provided, `smoke_test` is used as the
default suffix.
"""
suffix = namespace_suffix or DEFAULT_NAMESPACE_SUFFIX
Expand All @@ -59,6 +69,11 @@ def generate_namespace(
return f"{NAMESPACE_PREFIX}_{ts}_{suffix}"


# ---------------------------------------------------------------------------
# Smoke test result model
# ---------------------------------------------------------------------------


class DestinationSmokeTestResult(BaseModel):
"""Result of a destination smoke test run."""

Expand All @@ -83,6 +98,22 @@ class DestinationSmokeTestResult(BaseModel):
error: str | None = None
"""Error message if the smoke test failed."""

table_statistics: dict[str, TableStatistics] | None = None
"""Map of stream name to table statistics (row counts, columns, stats).

Populated when the destination has a compatible cache, regardless of
write success (to support partial-success inspection). `None` when
the destination does not have a compatible cache.
"""

tables_not_found: dict[str, str] | None = None
"""Stream names whose expected tables were not found in the destination.

Maps stream name to the expected (normalized) table name that was
looked up but not found. Populated alongside `table_statistics`.
`None` when readback was not performed.
"""


def get_smoke_test_source(
*,
Expand Down Expand Up @@ -183,6 +214,41 @@ def get_smoke_test_source(
)


def _prepare_destination_config(
destination: Destination,
) -> None:
"""Prepare the destination config for smoke testing.

The catalog namespace (set on each stream by the source) is the primary
mechanism that directs destinations to write into the test schema.
Modern destinations respect the catalog namespace without needing a
config-level schema override.

This function applies config-level tweaks that are *not* handled by
the catalog namespace:

- **Typing/deduplication enabled** — forces `disable_type_dedupe`
to `False` so the destination creates final typed tables (not just
raw staging). Readback introspection requires final tables.

Note: This mutates the destination's config in place.
"""
config = dict(destination.get_config())
changed = False

# Ensure typing and deduplication are enabled so that final tables
# (not just raw staging) are created for readback inspection.
if config.get("disable_type_dedupe"):
logger.info(
"Forcing 'disable_type_dedupe' to False so final tables are created.",
)
config["disable_type_dedupe"] = False
changed = True

if changed:
destination.set_config(config, validate=False)


def _sanitize_error(ex: Exception) -> str:
"""Extract an error message from an exception without leaking secrets.

Expand All @@ -208,9 +274,12 @@ def run_destination_smoke_test(
Sends synthetic test data from the smoke test source to the specified
destination and returns a structured result.

This function does NOT read back data from the destination or compare
results. It only verifies that the destination accepts the data without
errors.
When the destination has a compatible cache implementation, readback
introspection is automatically performed (even on write failure, to
support partial-success inspection).
The readback produces stats on the written data (table row counts,
column names/types, and per-column null/non-null counts) and is
included in the result as `table_statistics` and `tables_not_found`.

`destination` is a resolved `Destination` object ready for writing.

Expand All @@ -221,8 +290,8 @@ def run_destination_smoke_test(
- A comma-separated string or list of specific scenario names.

`namespace_suffix` is an optional suffix appended to the auto-generated
namespace. Defaults to ``smoke_test`` when not provided
(e.g. ``zz_deleteme_20260318_2256_smoke_test``).
namespace. Defaults to `smoke_test` when not provided
(e.g. `zz_deleteme_20260318_2256_smoke_test`).

`reuse_namespace` is an exact namespace string to reuse from a previous
run. When set, no new namespace is generated.
Expand All @@ -244,12 +313,21 @@ def run_destination_smoke_test(
custom_scenarios_file=custom_scenarios_file,
)

# Capture stream names for readback before the write consumes the source
stream_names = source_obj.get_selected_streams()

# Normalize scenarios to a display string
if isinstance(scenarios, list):
scenarios_str = ",".join(scenarios) if scenarios else "fast"
else:
scenarios_str = scenarios

# Prepare the destination config for smoke testing (e.g. ensure
# disable_type_dedupe is off so final tables are created for readback).
# The catalog namespace on each stream is the primary mechanism that
# directs the destination to write into the test schema.
_prepare_destination_config(destination)

start_time = time.monotonic()
success = False
error_message: str | None = None
Expand All @@ -267,6 +345,32 @@ def run_destination_smoke_test(

elapsed = time.monotonic() - start_time

# Perform readback introspection (runs even on write failure for partial-success support)
table_statistics: dict[str, TableStatistics] | None = None
tables_not_found: dict[str, str] | None = None
if destination.is_cache_supported:
try:
cache = destination.get_sql_cache(schema_name=namespace)
table_statistics = cache.fetch_table_statistics(stream_names)
tables_not_found = {
name: cache.processor.get_sql_table_name(name)
for name in stream_names
if name not in table_statistics
}
except Exception:
logger.warning(
"Readback failed for destination '%s'.",
destination.name,
exc_info=True,
)
else:
logger.info(
"Skipping table and column statistics retrieval for "
"destination '%s' because no SQL interface mapping has been "
"defined.",
destination.name,
)

return DestinationSmokeTestResult(
success=success,
destination=destination.name,
Expand All @@ -275,4 +379,6 @@ def run_destination_smoke_test(
scenarios_requested=scenarios_str,
elapsed_seconds=round(elapsed, 2),
error=error_message,
table_statistics=table_statistics,
tables_not_found=tables_not_found,
)
2 changes: 2 additions & 0 deletions airbyte/caches/_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Cache utility modules for translating between cache and destination configurations."""
Loading
Loading