diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 8f6e6dffe..8990d6021 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -17,6 +17,7 @@ from __future__ import annotations +import json import logging import time from datetime import datetime, timezone @@ -98,6 +99,13 @@ class DestinationSmokeTestResult(BaseModel): error: str | None = None """Error message if the smoke test failed.""" + preflight_passed: bool | None = None + """Whether the preflight `basic_types` check passed. + + `True` if the preflight check succeeded, `False` if it failed, + `None` if the preflight check was skipped. + """ + table_statistics: dict[str, TableStatistics] | None = None """Map of stream name to table statistics (row counts, columns, stats). @@ -114,6 +122,14 @@ class DestinationSmokeTestResult(BaseModel): `None` when readback was not performed. """ + warnings: list[str] | None = None + """Non-fatal warnings encountered during the smoke test. + + Includes readback errors (e.g. cache connection failures) that + prevented `table_statistics` from being populated, as well as + any other non-fatal issues worth surfacing to the caller. + """ + def get_smoke_test_source( *, @@ -249,18 +265,193 @@ def _prepare_destination_config( destination.set_config(config, validate=False) +def _extract_trace_error_from_log(ex: Exception) -> str | None: + """Search the connector log file for a TRACE ERROR `internal_message`. + + Many Java-based connectors emit a generic user-facing `message` in + their TRACE ERROR output (e.g. "Something went wrong in the connector") + while the actionable detail lives in the `internal_message` field. + PyAirbyte only captures the `message` in the exception chain, so this + helper reads the connector's log file to recover the `internal_message`. + + Returns the `internal_message` string if found, or `None`. + """ + # Walk the exception chain looking for a log_file path + log_file: Path | None = None + current: Exception | None = ex + while current is not None: + if hasattr(current, "log_file") and current.log_file is not None: + log_file = current.log_file + break + current = getattr(current, "original_exception", None) + + if log_file is None or not log_file.exists(): + return None + + # Scan the log file in reverse for the last TRACE ERROR with an + # internal_message. We only need the last one (the fatal error). + try: + lines = log_file.read_text(encoding="utf-8", errors="replace").splitlines() + except OSError: + return None + + for line in reversed(lines): + # TRACE messages are logged as JSON-serialised Airbyte messages. + # They may be prefixed by a timestamp; look for the JSON payload. + json_start = line.find('{"type":"TRACE"') + if json_start == -1: + continue + try: + payload = json.loads(line[json_start:]) + internal = payload.get("trace", {}).get("error", {}).get("internal_message") + if internal: + return str(internal) + except (json.JSONDecodeError, AttributeError): + continue + + return None + + def _sanitize_error(ex: Exception) -> str: - """Extract an error message from an exception without leaking secrets. + """Extract an actionable error message from an exception without leaking secrets. + + Resolution order (first match wins): + + 1. **Connector log file** - The TRACE ERROR `internal_message` from the + connector's log is typically the most specific error available (e.g. + `password authentication failed for user "bob"`). Many Java-based + connectors emit a generic user-facing `message` while the real + detail lives only in `internal_message`. + 2. **Exception chain** - Walks `original_exception` to find the deepest + exception with a message more specific than the top-level wrapper. + 3. **Top-level exception** - Falls back to `get_message()` or `str()`. Uses `get_message()` when available (PyAirbyte exceptions) to avoid including full config/context in the error string. """ + # 1. Try the connector log file first — it has the most detail. + trace_msg = _extract_trace_error_from_log(ex) + if trace_msg: + return f"{type(ex).__name__}: {trace_msg}" + + # 2. Walk original_exception chain to find the deepest specific message. + deepest = ex + while hasattr(deepest, "original_exception") and deepest.original_exception is not None: + deepest = deepest.original_exception + + if deepest is not ex: + deep_msg = ( + deepest.get_message() + if hasattr(deepest, "get_message") + else str(deepest) # Standard exceptions (ConnectionError, ValueError, etc.) + ) + # Only use the deep message if it's more specific than the wrapper + wrapper_msg = ex.get_message() if hasattr(ex, "get_message") else str(ex) + if deep_msg and deep_msg != wrapper_msg: + return f"{type(ex).__name__}: {deep_msg}" + + # 3. Fall back to the top-level exception message. if hasattr(ex, "get_message"): return f"{type(ex).__name__}: {ex.get_message()}" return f"{type(ex).__name__}: {ex}" -def run_destination_smoke_test( +PREFLIGHT_SCENARIO = "basic_types" +"""The predefined scenario whose schema/records are reused for the preflight check.""" + +PREFLIGHT_STREAM_NAME = "_preflight_basic_types" +"""Stream name used by the preflight write. + +This is deliberately different from the predefined `basic_types` stream +so that the preflight data lands in its own table and never collides with +the main smoke-test run.""" + + +def _build_preflight_scenario() -> dict[str, Any]: + """Build the preflight custom scenario. + + Returns a scenario dict that mirrors the predefined `basic_types` + scenario but with the stream name set to `PREFLIGHT_STREAM_NAME` + so the preflight data lands in its own table. + + The schema and records are defined inline to avoid a circular import + from `airbyte.cli.smoke_test_source._scenarios`. + """ + return { + "name": PREFLIGHT_STREAM_NAME, + "description": f"Preflight check (based on '{PREFLIGHT_SCENARIO}').", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "amount": {"type": "number"}, + "is_active": {"type": "boolean"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "name": "Alice", "amount": 100.50, "is_active": True}, + {"id": 2, "name": "Bob", "amount": 0.0, "is_active": False}, + {"id": 3, "name": "", "amount": -99.99, "is_active": True}, + ], + } + + +def _run_preflight( + *, + destination: Destination, + namespace: str, +) -> tuple[bool, str | None]: + """Run the preflight connectivity check. + + Writes a small dataset (based on the `basic_types` scenario) using a + dedicated stream name (`PREFLIGHT_STREAM_NAME`) so the preflight + data never collides with the main smoke-test run. + + Returns a tuple of `(passed, error_message)`. + + * `(True, None)` when the preflight write succeeded. + * `(False, '')` when the write failed. + + Failures are logged but not raised so the caller can return a + structured result that includes the actionable connector error. + """ + logger.info( + "Running preflight check ('%s') for destination '%s'...", + PREFLIGHT_STREAM_NAME, + destination.name, + ) + preflight_scenario = _build_preflight_scenario() + preflight_source = get_smoke_test_source( + scenarios="", # No predefined scenarios + namespace=namespace, + custom_scenarios=[preflight_scenario], + ) + try: + destination.write( + source_data=preflight_source, + cache=False, + state_cache=False, + ) + except Exception as ex: + sanitized = _sanitize_error(ex) + logger.warning( + "Preflight check failed for destination '%s': %s", + destination.name, + sanitized, + ) + return False, sanitized + + logger.info( + "Preflight check passed for destination '%s'.", + destination.name, + ) + return True, None + + +def run_destination_smoke_test( # noqa: PLR0914 *, destination: Destination, scenarios: str | list[str] = "fast", @@ -268,6 +459,7 @@ def run_destination_smoke_test( reuse_namespace: str | None = None, custom_scenarios: list[dict[str, Any]] | None = None, custom_scenarios_file: str | None = None, + skip_preflight: bool = False, ) -> DestinationSmokeTestResult: """Run a smoke test against a destination connector. @@ -300,12 +492,42 @@ def run_destination_smoke_test( `custom_scenarios_file` is an optional path to a JSON/YAML file with additional scenario definitions. + + `skip_preflight` disables the automatic `basic_types` preflight check. """ # Determine namespace namespace = reuse_namespace or generate_namespace( namespace_suffix=namespace_suffix, ) + # Prepare the destination config for smoke testing (e.g. ensure + # disable_type_dedupe is off so final tables are created for readback). + _prepare_destination_config(destination) + + # --- Preflight check --------------------------------------------------- + preflight_passed: bool | None = None + if not skip_preflight: + preflight_passed, preflight_error = _run_preflight( + destination=destination, + namespace=namespace, + ) + if not preflight_passed: + return DestinationSmokeTestResult( + success=False, + destination=destination.name, + namespace=namespace, + records_delivered=0, + scenarios_requested=( + ",".join(scenarios) if isinstance(scenarios, list) else scenarios + ), + elapsed_seconds=0.0, + error=( + f"Preflight check failed for '{PREFLIGHT_STREAM_NAME}': " + f"{preflight_error or 'unknown error'}" + ), + preflight_passed=False, + ) + source_obj = get_smoke_test_source( scenarios=scenarios, namespace=namespace, @@ -322,12 +544,6 @@ def run_destination_smoke_test( else: scenarios_str = scenarios - # Prepare the destination config for smoke testing (e.g. ensure - # disable_type_dedupe is off so final tables are created for readback). - # The catalog namespace on each stream is the primary mechanism that - # directs the destination to write into the test schema. - _prepare_destination_config(destination) - start_time = time.monotonic() success = False error_message: str | None = None @@ -348,6 +564,7 @@ def run_destination_smoke_test( # Perform readback introspection (runs even on write failure for partial-success support) table_statistics: dict[str, TableStatistics] | None = None tables_not_found: dict[str, str] | None = None + readback_warnings: list[str] = [] if destination.is_cache_supported: try: cache = destination.get_sql_cache(schema_name=namespace) @@ -357,10 +574,16 @@ def run_destination_smoke_test( for name in stream_names if name not in table_statistics } - except Exception: + except Exception as readback_ex: + sanitized_readback = _sanitize_error(readback_ex) + readback_msg = ( + f"Readback failed for destination '{destination.name}': {sanitized_readback}" + ) + readback_warnings.append(readback_msg) logger.warning( - "Readback failed for destination '%s'.", + "Readback failed for destination '%s': %s", destination.name, + sanitized_readback, exc_info=True, ) else: @@ -379,6 +602,8 @@ def run_destination_smoke_test( scenarios_requested=scenarios_str, elapsed_seconds=round(elapsed, 2), error=error_message, + preflight_passed=preflight_passed, table_statistics=table_statistics, tables_not_found=tables_not_found, + warnings=readback_warnings or None, ) diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index f179ff307..2d16d0f21 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -710,7 +710,17 @@ def sync( "Useful for running a second test against an already-populated namespace." ), ) -def destination_smoke_test( +@click.option( + "--skip-preflight", + is_flag=True, + default=False, + help=( + "Skip the automatic preflight check that runs basic_types before " + "the requested scenarios. Use when you expect basic_types itself to fail " + "or want to save time on repeated runs." + ), +) +def destination_smoke_test( # noqa: PLR0913 *, destination: str, config: str | None = None, @@ -720,6 +730,7 @@ def destination_smoke_test( custom_scenarios: str | None = None, namespace_suffix: str | None = None, reuse_namespace: str | None = None, + skip_preflight: bool = False, ) -> None: """Run smoke tests against a destination connector. @@ -770,6 +781,7 @@ def destination_smoke_test( namespace_suffix=namespace_suffix, reuse_namespace=reuse_namespace, custom_scenarios_file=custom_scenarios, + skip_preflight=skip_preflight, ) click.echo(json.dumps(result.model_dump(), indent=2)) diff --git a/airbyte/mcp/local.py b/airbyte/mcp/local.py index 06631f55f..d9bbb801f 100644 --- a/airbyte/mcp/local.py +++ b/airbyte/mcp/local.py @@ -901,6 +901,17 @@ def destination_smoke_test( # noqa: PLR0913, PLR0917 default=None, ), ], + skip_preflight: Annotated[ + bool, + Field( + description=( + "Skip the automatic preflight check that runs basic_types before " + "the requested scenarios. Set to true when you expect basic_types " + "itself to fail or want to save time on repeated runs." + ), + default=False, + ), + ], ) -> DestinationSmokeTestResult: """Run smoke tests against a destination connector. @@ -949,6 +960,7 @@ def destination_smoke_test( # noqa: PLR0913, PLR0917 namespace_suffix=namespace_suffix, reuse_namespace=reuse_namespace, custom_scenarios=custom_scenarios, + skip_preflight=skip_preflight, )