Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 235 additions & 10 deletions airbyte/_util/destination_smoke_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import annotations

import json
import logging
import time
from datetime import datetime, timezone
Expand Down Expand Up @@ -98,6 +99,13 @@ class DestinationSmokeTestResult(BaseModel):
error: str | None = None
"""Error message if the smoke test failed."""

preflight_passed: bool | None = None
"""Whether the preflight `basic_types` check passed.

`True` if the preflight check succeeded, `False` if it failed,
`None` if the preflight check was skipped.
"""

table_statistics: dict[str, TableStatistics] | None = None
"""Map of stream name to table statistics (row counts, columns, stats).

Expand All @@ -114,6 +122,14 @@ class DestinationSmokeTestResult(BaseModel):
`None` when readback was not performed.
"""

warnings: list[str] | None = None
"""Non-fatal warnings encountered during the smoke test.

Includes readback errors (e.g. cache connection failures) that
prevented `table_statistics` from being populated, as well as
any other non-fatal issues worth surfacing to the caller.
"""


def get_smoke_test_source(
*,
Expand Down Expand Up @@ -249,25 +265,201 @@ def _prepare_destination_config(
destination.set_config(config, validate=False)


def _extract_trace_error_from_log(ex: Exception) -> str | None:
"""Search the connector log file for a TRACE ERROR `internal_message`.

Many Java-based connectors emit a generic user-facing `message` in
their TRACE ERROR output (e.g. "Something went wrong in the connector")
while the actionable detail lives in the `internal_message` field.
PyAirbyte only captures the `message` in the exception chain, so this
helper reads the connector's log file to recover the `internal_message`.

Returns the `internal_message` string if found, or `None`.
"""
# Walk the exception chain looking for a log_file path
log_file: Path | None = None
current: Exception | None = ex
while current is not None:
if hasattr(current, "log_file") and current.log_file is not None:
log_file = current.log_file
break
current = getattr(current, "original_exception", None)

if log_file is None or not log_file.exists():
return None

# Scan the log file in reverse for the last TRACE ERROR with an
# internal_message. We only need the last one (the fatal error).
try:
lines = log_file.read_text(encoding="utf-8", errors="replace").splitlines()
except OSError:
return None

for line in reversed(lines):
# TRACE messages are logged as JSON-serialised Airbyte messages.
# They may be prefixed by a timestamp; look for the JSON payload.
json_start = line.find('{"type":"TRACE"')
if json_start == -1:
continue
try:
payload = json.loads(line[json_start:])
internal = payload.get("trace", {}).get("error", {}).get("internal_message")
if internal:
return str(internal)
except (json.JSONDecodeError, AttributeError):
continue

return None


def _sanitize_error(ex: Exception) -> str:
    """Extract an actionable error message from an exception without leaking secrets.

    Resolution order (first match wins):

    1. **Connector log file** - The TRACE ERROR `internal_message` from the
       connector's log is typically the most specific error available (e.g.
       `password authentication failed for user "bob"`). Many Java-based
       connectors emit a generic user-facing `message` while the real
       detail lives only in `internal_message`.
    2. **Exception chain** - Walks `original_exception` to find the deepest
       exception with a message more specific than the top-level wrapper.
    3. **Top-level exception** - Falls back to `get_message()` or `str()`.

    Uses `get_message()` when available (PyAirbyte exceptions) to avoid
    including full config/context in the error string.
    """

    def _message_of(exc: Exception) -> str:
        # PyAirbyte exceptions expose get_message(); standard exceptions
        # (ConnectionError, ValueError, etc.) fall back to str().
        return exc.get_message() if hasattr(exc, "get_message") else str(exc)

    prefix = type(ex).__name__

    # 1. The connector log file has the most detail — try it first.
    log_detail = _extract_trace_error_from_log(ex)
    if log_detail:
        return f"{prefix}: {log_detail}"

    # 2. Follow the original_exception chain to its deepest link.
    root = ex
    while getattr(root, "original_exception", None) is not None:
        root = root.original_exception

    if root is not ex:
        deep_msg = _message_of(root)
        # Only prefer the deep message when it adds detail over the wrapper.
        if deep_msg and deep_msg != _message_of(ex):
            return f"{prefix}: {deep_msg}"

    # 3. Fall back to the top-level exception's own message.
    return f"{prefix}: {_message_of(ex)}"


def run_destination_smoke_test(
PREFLIGHT_SCENARIO = "basic_types"
"""The predefined scenario whose schema/records are reused for the preflight check."""

PREFLIGHT_STREAM_NAME = "_preflight_basic_types"
"""Stream name used by the preflight write.

This is deliberately different from the predefined `basic_types` stream
so that the preflight data lands in its own table and never collides with
the main smoke-test run."""


def _build_preflight_scenario() -> dict[str, Any]:
"""Build the preflight custom scenario.

Returns a scenario dict that mirrors the predefined `basic_types`
scenario but with the stream name set to `PREFLIGHT_STREAM_NAME`
so the preflight data lands in its own table.

The schema and records are defined inline to avoid a circular import
from `airbyte.cli.smoke_test_source._scenarios`.
"""
return {
"name": PREFLIGHT_STREAM_NAME,
"description": f"Preflight check (based on '{PREFLIGHT_SCENARIO}').",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
"amount": {"type": "number"},
"is_active": {"type": "boolean"},
},
},
"primary_key": [["id"]],
"records": [
{"id": 1, "name": "Alice", "amount": 100.50, "is_active": True},
{"id": 2, "name": "Bob", "amount": 0.0, "is_active": False},
{"id": 3, "name": "", "amount": -99.99, "is_active": True},
],
}


def _run_preflight(
    *,
    destination: Destination,
    namespace: str,
) -> tuple[bool, str | None]:
    """Run the preflight connectivity check against *destination*.

    Writes a tiny dataset (modeled on the `basic_types` scenario) under
    the dedicated `PREFLIGHT_STREAM_NAME` stream so the preflight data
    can never collide with the main smoke-test run.

    Returns a `(passed, error_message)` tuple:

    * `(True, None)` when the preflight write succeeded.
    * `(False, '<sanitized error>')` when the write failed.

    Failures are logged but never raised, so the caller can return a
    structured result that includes the actionable connector error.
    """
    logger.info(
        "Running preflight check ('%s') for destination '%s'...",
        PREFLIGHT_STREAM_NAME,
        destination.name,
    )
    source = get_smoke_test_source(
        scenarios="",  # No predefined scenarios
        namespace=namespace,
        custom_scenarios=[_build_preflight_scenario()],
    )
    try:
        destination.write(
            source_data=source,
            cache=False,
            state_cache=False,
        )
    except Exception as write_ex:
        reason = _sanitize_error(write_ex)
        logger.warning(
            "Preflight check failed for destination '%s': %s",
            destination.name,
            reason,
        )
        return False, reason

    logger.info(
        "Preflight check passed for destination '%s'.",
        destination.name,
    )
    return True, None


def run_destination_smoke_test( # noqa: PLR0914
*,
destination: Destination,
scenarios: str | list[str] = "fast",
namespace_suffix: str | None = None,
reuse_namespace: str | None = None,
custom_scenarios: list[dict[str, Any]] | None = None,
custom_scenarios_file: str | None = None,
skip_preflight: bool = False,
) -> DestinationSmokeTestResult:
"""Run a smoke test against a destination connector.

Expand Down Expand Up @@ -300,12 +492,42 @@ def run_destination_smoke_test(

`custom_scenarios_file` is an optional path to a JSON/YAML file with
additional scenario definitions.

`skip_preflight` disables the automatic `basic_types` preflight check.
"""
# Determine namespace
namespace = reuse_namespace or generate_namespace(
namespace_suffix=namespace_suffix,
)

# Prepare the destination config for smoke testing (e.g. ensure
# disable_type_dedupe is off so final tables are created for readback).
_prepare_destination_config(destination)

# --- Preflight check ---------------------------------------------------
preflight_passed: bool | None = None
if not skip_preflight:
preflight_passed, preflight_error = _run_preflight(
destination=destination,
namespace=namespace,
)
if not preflight_passed:
return DestinationSmokeTestResult(
success=False,
destination=destination.name,
namespace=namespace,
records_delivered=0,
scenarios_requested=(
",".join(scenarios) if isinstance(scenarios, list) else scenarios
),
elapsed_seconds=0.0,
error=(
f"Preflight check failed for '{PREFLIGHT_STREAM_NAME}': "
f"{preflight_error or 'unknown error'}"
),
preflight_passed=False,
)

source_obj = get_smoke_test_source(
scenarios=scenarios,
namespace=namespace,
Expand All @@ -322,12 +544,6 @@ def run_destination_smoke_test(
else:
scenarios_str = scenarios

# Prepare the destination config for smoke testing (e.g. ensure
# disable_type_dedupe is off so final tables are created for readback).
# The catalog namespace on each stream is the primary mechanism that
# directs the destination to write into the test schema.
_prepare_destination_config(destination)

start_time = time.monotonic()
success = False
error_message: str | None = None
Expand All @@ -348,6 +564,7 @@ def run_destination_smoke_test(
# Perform readback introspection (runs even on write failure for partial-success support)
table_statistics: dict[str, TableStatistics] | None = None
tables_not_found: dict[str, str] | None = None
readback_warnings: list[str] = []
if destination.is_cache_supported:
try:
cache = destination.get_sql_cache(schema_name=namespace)
Expand All @@ -357,10 +574,16 @@ def run_destination_smoke_test(
for name in stream_names
if name not in table_statistics
}
except Exception:
except Exception as readback_ex:
sanitized_readback = _sanitize_error(readback_ex)
readback_msg = (
f"Readback failed for destination '{destination.name}': {sanitized_readback}"
)
readback_warnings.append(readback_msg)
logger.warning(
"Readback failed for destination '%s'.",
"Readback failed for destination '%s': %s",
destination.name,
sanitized_readback,
exc_info=True,
)
else:
Expand All @@ -379,6 +602,8 @@ def run_destination_smoke_test(
scenarios_requested=scenarios_str,
elapsed_seconds=round(elapsed, 2),
error=error_message,
preflight_passed=preflight_passed,
table_statistics=table_statistics,
tables_not_found=tables_not_found,
warnings=readback_warnings or None,
)
14 changes: 13 additions & 1 deletion airbyte/cli/pyab.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,17 @@ def sync(
"Useful for running a second test against an already-populated namespace."
),
)
def destination_smoke_test(
@click.option(
"--skip-preflight",
is_flag=True,
default=False,
help=(
"Skip the automatic preflight check that runs basic_types before "
"the requested scenarios. Use when you expect basic_types itself to fail "
"or want to save time on repeated runs."
),
)
def destination_smoke_test( # noqa: PLR0913
*,
destination: str,
config: str | None = None,
Expand All @@ -720,6 +730,7 @@ def destination_smoke_test(
custom_scenarios: str | None = None,
namespace_suffix: str | None = None,
reuse_namespace: str | None = None,
skip_preflight: bool = False,
) -> None:
"""Run smoke tests against a destination connector.

Expand Down Expand Up @@ -770,6 +781,7 @@ def destination_smoke_test(
namespace_suffix=namespace_suffix,
reuse_namespace=reuse_namespace,
custom_scenarios_file=custom_scenarios,
skip_preflight=skip_preflight,
)

click.echo(json.dumps(result.model_dump(), indent=2))
Expand Down
12 changes: 12 additions & 0 deletions airbyte/mcp/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,17 @@ def destination_smoke_test( # noqa: PLR0913, PLR0917
default=None,
),
],
skip_preflight: Annotated[
bool,
Field(
description=(
"Skip the automatic preflight check that runs basic_types before "
"the requested scenarios. Set to true when you expect basic_types "
"itself to fail or want to save time on repeated runs."
),
default=False,
),
],
) -> DestinationSmokeTestResult:
"""Run smoke tests against a destination connector.

Expand Down Expand Up @@ -949,6 +960,7 @@ def destination_smoke_test( # noqa: PLR0913, PLR0917
namespace_suffix=namespace_suffix,
reuse_namespace=reuse_namespace,
custom_scenarios=custom_scenarios,
skip_preflight=skip_preflight,
)


Expand Down
Loading