From ab5c061f3e9c1102dc9ed78fbb7ae4272e309e9a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 19:26:44 +0000 Subject: [PATCH 1/9] feat: Add preflight check to destination smoke test tool - Run basic_types scenario before requested scenarios to validate credentials and connectivity - Return early with structured error if preflight fails - Add skip_preflight parameter to core function, MCP tool, and CLI - Improve error surfacing by walking exception chain to find the actual connector error message instead of generic wrapper - Add preflight_passed field to DestinationSmokeTestResult model Closes #1006 Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 109 +++++++++++++++++++++-- airbyte/cli/pyab.py | 14 ++- airbyte/mcp/local.py | 12 +++ 3 files changed, 127 insertions(+), 8 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 8f6e6dffe..cf1535ff6 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -98,6 +98,13 @@ class DestinationSmokeTestResult(BaseModel): error: str | None = None """Error message if the smoke test failed.""" + preflight_passed: bool | None = None + """Whether the preflight `basic_types` check passed. + + `True` if the preflight check succeeded, `False` if it failed, + `None` if the preflight check was skipped. + """ + table_statistics: dict[str, TableStatistics] | None = None """Map of stream name to table statistics (row counts, columns, stats). @@ -250,16 +257,77 @@ def _prepare_destination_config( def _sanitize_error(ex: Exception) -> str: - """Extract an error message from an exception without leaking secrets. + """Extract an actionable error message from an exception without leaking secrets. + + Walks the exception chain (`original_exception`, then `__cause__`) to + find the most specific error message available, which is typically the + actual connector error rather than the generic wrapper message. Uses `get_message()` when available (PyAirbyte exceptions) to avoid including full config/context in the error string. """ + # Walk original_exception chain to find the deepest specific message + deepest = ex + while hasattr(deepest, "original_exception") and deepest.original_exception is not None: + deepest = deepest.original_exception + + # If we found a deeper exception with a specific message, use it + if deepest is not ex and hasattr(deepest, "get_message"): + deep_msg = deepest.get_message() + # Only use the deep message if it's more specific than the wrapper + if deep_msg and hasattr(ex, "get_message") and deep_msg != ex.get_message(): + return f"{type(ex).__name__}: {deep_msg}" + if hasattr(ex, "get_message"): return f"{type(ex).__name__}: {ex.get_message()}" return f"{type(ex).__name__}: {ex}" +PREFLIGHT_SCENARIO = "basic_types" +"""The scenario used as a preflight connectivity/credential check.""" + + +def _run_preflight( + *, + destination: Destination, + namespace: str, +) -> bool: + """Run the preflight `basic_types` scenario to validate connectivity. + + Returns `True` if the preflight write succeeded, `False` otherwise. + Failures are logged but not raised so the caller can return a + structured result. + """ + logger.info( + "Running preflight check ('%s') for destination '%s'...", + PREFLIGHT_SCENARIO, + destination.name, + ) + preflight_source = get_smoke_test_source( + scenarios=[PREFLIGHT_SCENARIO], + namespace=namespace, + ) + try: + destination.write( + source_data=preflight_source, + cache=False, + state_cache=False, + ) + except Exception as ex: + logger.warning( + "Preflight check failed for destination '%s': %s", + destination.name, + _sanitize_error(ex), + ) + return False + + logger.info( + "Preflight check passed for destination '%s'.", + destination.name, + ) + return True + + def run_destination_smoke_test( *, destination: Destination, @@ -268,6 +336,7 @@ def run_destination_smoke_test( reuse_namespace: str | None = None, custom_scenarios: list[dict[str, Any]] | None = None, custom_scenarios_file: str | None = None, + skip_preflight: bool = False, ) -> DestinationSmokeTestResult: """Run a smoke test against a destination connector. @@ -300,12 +369,43 @@ def run_destination_smoke_test( `custom_scenarios_file` is an optional path to a JSON/YAML file with additional scenario definitions. + + `skip_preflight` disables the automatic `basic_types` preflight check. """ # Determine namespace namespace = reuse_namespace or generate_namespace( namespace_suffix=namespace_suffix, ) + # Prepare the destination config for smoke testing (e.g. ensure + # disable_type_dedupe is off so final tables are created for readback). + _prepare_destination_config(destination) + + # --- Preflight check --------------------------------------------------- + preflight_passed: bool | None = None + if not skip_preflight: + preflight_passed = _run_preflight( + destination=destination, + namespace=namespace, + ) + if not preflight_passed: + return DestinationSmokeTestResult( + success=False, + destination=destination.name, + namespace=namespace, + records_delivered=0, + scenarios_requested=( + ",".join(scenarios) if isinstance(scenarios, list) else scenarios + ), + elapsed_seconds=0.0, + error=( + f"Preflight check failed: {PREFLIGHT_SCENARIO} scenario could not " + "write to destination. Verify credentials and connectivity " + "before testing other scenarios." + ), + preflight_passed=False, + ) + source_obj = get_smoke_test_source( scenarios=scenarios, namespace=namespace, @@ -322,12 +422,6 @@ def run_destination_smoke_test( else: scenarios_str = scenarios - # Prepare the destination config for smoke testing (e.g. ensure - # disable_type_dedupe is off so final tables are created for readback). - # The catalog namespace on each stream is the primary mechanism that - # directs the destination to write into the test schema. - _prepare_destination_config(destination) - start_time = time.monotonic() success = False error_message: str | None = None @@ -379,6 +473,7 @@ def run_destination_smoke_test( scenarios_requested=scenarios_str, elapsed_seconds=round(elapsed, 2), error=error_message, + preflight_passed=preflight_passed, table_statistics=table_statistics, tables_not_found=tables_not_found, ) diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index f179ff307..2d16d0f21 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -710,7 +710,17 @@ def sync( "Useful for running a second test against an already-populated namespace." ), ) -def destination_smoke_test( +@click.option( + "--skip-preflight", + is_flag=True, + default=False, + help=( + "Skip the automatic preflight check that runs basic_types before " + "the requested scenarios. Use when you expect basic_types itself to fail " + "or want to save time on repeated runs." + ), +) +def destination_smoke_test( # noqa: PLR0913 *, destination: str, config: str | None = None, @@ -720,6 +730,7 @@ def destination_smoke_test( custom_scenarios: str | None = None, namespace_suffix: str | None = None, reuse_namespace: str | None = None, + skip_preflight: bool = False, ) -> None: """Run smoke tests against a destination connector. @@ -770,6 +781,7 @@ def destination_smoke_test( namespace_suffix=namespace_suffix, reuse_namespace=reuse_namespace, custom_scenarios_file=custom_scenarios, + skip_preflight=skip_preflight, ) click.echo(json.dumps(result.model_dump(), indent=2)) diff --git a/airbyte/mcp/local.py b/airbyte/mcp/local.py index 06631f55f..d9bbb801f 100644 --- a/airbyte/mcp/local.py +++ b/airbyte/mcp/local.py @@ -901,6 +901,17 @@ def destination_smoke_test( # noqa: PLR0913, PLR0917 default=None, ), ], + skip_preflight: Annotated[ + bool, + Field( + description=( + "Skip the automatic preflight check that runs basic_types before " + "the requested scenarios. Set to true when you expect basic_types " + "itself to fail or want to save time on repeated runs." + ), + default=False, + ), + ], ) -> DestinationSmokeTestResult: """Run smoke tests against a destination connector. @@ -949,6 +960,7 @@ def destination_smoke_test( # noqa: PLR0913, PLR0917 namespace_suffix=namespace_suffix, reuse_namespace=reuse_namespace, custom_scenarios=custom_scenarios, + skip_preflight=skip_preflight, ) From c72dbcb86cfdd6e37bb042c8cb1195f29af69c17 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 19:33:58 +0000 Subject: [PATCH 2/9] feat: Add warnings field for readback errors and improve error chain walking - Add 'warnings' field to DestinationSmokeTestResult to surface readback failures (e.g. cache connection issues) that cause table_statistics to be null - Improve _sanitize_error to handle standard Python exceptions (ConnectionError, ValueError, etc.) in the exception chain, not just PyAirbyte exceptions with get_message() Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 31 ++++++++++++++++++------ 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index cf1535ff6..03a170f0f 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -121,6 +121,14 @@ class DestinationSmokeTestResult(BaseModel): `None` when readback was not performed. """ + warnings: list[str] | None = None + """Non-fatal warnings encountered during the smoke test. + + Includes readback errors (e.g. cache connection failures) that + prevented `table_statistics` from being populated, as well as + any other non-fatal issues worth surfacing to the caller. + """ + def get_smoke_test_source( *, @@ -272,8 +280,12 @@ def _sanitize_error(ex: Exception) -> str: deepest = deepest.original_exception # If we found a deeper exception with a specific message, use it - if deepest is not ex and hasattr(deepest, "get_message"): - deep_msg = deepest.get_message() + if deepest is not ex: + deep_msg = ( + deepest.get_message() + if hasattr(deepest, "get_message") + else str(deepest) # Standard exceptions (ConnectionError, ValueError, etc.) + ) # Only use the deep message if it's more specific than the wrapper if deep_msg and hasattr(ex, "get_message") and deep_msg != ex.get_message(): return f"{type(ex).__name__}: {deep_msg}" @@ -328,7 +340,7 @@ def _run_preflight( return True -def run_destination_smoke_test( +def run_destination_smoke_test( # noqa: PLR0914 *, destination: Destination, scenarios: str | list[str] = "fast", @@ -442,6 +454,7 @@ def run_destination_smoke_test( # Perform readback introspection (runs even on write failure for partial-success support) table_statistics: dict[str, TableStatistics] | None = None tables_not_found: dict[str, str] | None = None + readback_warnings: list[str] = [] if destination.is_cache_supported: try: cache = destination.get_sql_cache(schema_name=namespace) @@ -451,12 +464,13 @@ def run_destination_smoke_test( for name in stream_names if name not in table_statistics } - except Exception: - logger.warning( - "Readback failed for destination '%s'.", - destination.name, - exc_info=True, + except Exception as readback_ex: + readback_msg = ( + f"Readback failed for destination '{destination.name}': " + f"{_sanitize_error(readback_ex)}" ) + readback_warnings.append(readback_msg) + logger.warning(readback_msg, exc_info=True) else: logger.info( "Skipping table and column statistics retrieval for " @@ -476,4 +490,5 @@ def run_destination_smoke_test( preflight_passed=preflight_passed, table_statistics=table_statistics, tables_not_found=tables_not_found, + warnings=readback_warnings or None, ) From 0c851c2955d7ccf7b2930187f679d11693502572 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 19:42:13 +0000 Subject: [PATCH 3/9] fix: Surface actual connector error in preflight failure result _run_preflight() now returns (bool, str | None) so the caller includes the sanitized connector error in the structured result instead of a generic 'verify credentials' message. Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 03a170f0f..56428c01a 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -303,12 +303,16 @@ def _run_preflight( *, destination: Destination, namespace: str, -) -> bool: +) -> tuple[bool, str | None]: """Run the preflight `basic_types` scenario to validate connectivity. - Returns `True` if the preflight write succeeded, `False` otherwise. + Returns a tuple of ``(passed, error_message)``. + + * ``(True, None)`` when the preflight write succeeded. + * ``(False, '')`` when the write failed. + Failures are logged but not raised so the caller can return a - structured result. + structured result that includes the actionable connector error. """ logger.info( "Running preflight check ('%s') for destination '%s'...", @@ -326,18 +330,19 @@ def _run_preflight( state_cache=False, ) except Exception as ex: + sanitized = _sanitize_error(ex) logger.warning( "Preflight check failed for destination '%s': %s", destination.name, - _sanitize_error(ex), + sanitized, ) - return False + return False, sanitized logger.info( "Preflight check passed for destination '%s'.", destination.name, ) - return True + return True, None def run_destination_smoke_test( # noqa: PLR0914 @@ -396,7 +401,7 @@ def run_destination_smoke_test( # noqa: PLR0914 # --- Preflight check --------------------------------------------------- preflight_passed: bool | None = None if not skip_preflight: - preflight_passed = _run_preflight( + preflight_passed, preflight_error = _run_preflight( destination=destination, namespace=namespace, ) @@ -411,9 +416,8 @@ def run_destination_smoke_test( # noqa: PLR0914 ), elapsed_seconds=0.0, error=( - f"Preflight check failed: {PREFLIGHT_SCENARIO} scenario could not " - "write to destination. Verify credentials and connectivity " - "before testing other scenarios." + f"Preflight check failed for '{PREFLIGHT_SCENARIO}': " + f"{preflight_error or 'unknown error'}" ), preflight_passed=False, ) From 192a65c0298e6f9fe9b8c060310a2ae7f35b3c9d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 20:34:04 +0000 Subject: [PATCH 4/9] feat: Extract TRACE ERROR internal_message from connector log for better error surfacing Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 72 ++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 6 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 56428c01a..4e4c6df67 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -17,6 +17,7 @@ from __future__ import annotations +import json import logging import time from datetime import datetime, timezone @@ -264,22 +265,80 @@ def _prepare_destination_config( destination.set_config(config, validate=False) +def _extract_trace_error_from_log(ex: Exception) -> str | None: + """Search the connector log file for a TRACE ERROR ``internal_message``. + + Many Java-based connectors emit a generic user-facing ``message`` in + their TRACE ERROR output (e.g. "Something went wrong in the connector") + while the actionable detail lives in the ``internal_message`` field. + PyAirbyte only captures the ``message`` in the exception chain, so this + helper reads the connector's log file to recover the ``internal_message``. + + Returns the ``internal_message`` string if found, or ``None``. + """ + # Walk the exception chain looking for a log_file path + log_file: Path | None = None + current: Exception | None = ex + while current is not None: + if hasattr(current, "log_file") and current.log_file is not None: + log_file = current.log_file + break + current = getattr(current, "original_exception", None) + + if log_file is None or not log_file.exists(): + return None + + # Scan the log file in reverse for the last TRACE ERROR with an + # internal_message. We only need the last one (the fatal error). + try: + lines = log_file.read_text().splitlines() + except OSError: + return None + + for line in reversed(lines): + # TRACE messages are logged as JSON-serialised Airbyte messages. + # They may be prefixed by a timestamp; look for the JSON payload. + json_start = line.find('{"type":"TRACE"') + if json_start == -1: + continue + try: + payload = json.loads(line[json_start:]) + internal = payload.get("trace", {}).get("error", {}).get("internal_message") + if internal: + return str(internal) + except (json.JSONDecodeError, AttributeError): + continue + + return None + + def _sanitize_error(ex: Exception) -> str: """Extract an actionable error message from an exception without leaking secrets. - Walks the exception chain (`original_exception`, then `__cause__`) to - find the most specific error message available, which is typically the - actual connector error rather than the generic wrapper message. + Resolution order (first match wins): - Uses `get_message()` when available (PyAirbyte exceptions) to avoid + 1. **Connector log file** - The TRACE ERROR ``internal_message`` from the + connector's log is typically the most specific error available (e.g. + ``password authentication failed for user "bob"``). Many Java-based + connectors emit a generic user-facing ``message`` while the real + detail lives only in ``internal_message``. + 2. **Exception chain** - Walks ``original_exception`` to find the deepest + exception with a message more specific than the top-level wrapper. + 3. **Top-level exception** - Falls back to ``get_message()`` or ``str()``. + + Uses ``get_message()`` when available (PyAirbyte exceptions) to avoid including full config/context in the error string. """ - # Walk original_exception chain to find the deepest specific message + # 1. Try the connector log file first — it has the most detail. + trace_msg = _extract_trace_error_from_log(ex) + if trace_msg: + return f"{type(ex).__name__}: {trace_msg}" + + # 2. Walk original_exception chain to find the deepest specific message. deepest = ex while hasattr(deepest, "original_exception") and deepest.original_exception is not None: deepest = deepest.original_exception - # If we found a deeper exception with a specific message, use it if deepest is not ex: deep_msg = ( deepest.get_message() @@ -290,6 +349,7 @@ def _sanitize_error(ex: Exception) -> str: if deep_msg and hasattr(ex, "get_message") and deep_msg != ex.get_message(): return f"{type(ex).__name__}: {deep_msg}" + # 3. Fall back to the top-level exception message. if hasattr(ex, "get_message"): return f"{type(ex).__name__}: {ex.get_message()}" return f"{type(ex).__name__}: {ex}" From a89d3b6d930eb6803e07bcde088006e84c9acf57 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 20:50:43 +0000 Subject: [PATCH 5/9] fix: Use dedicated stream name for preflight to avoid collision with main run Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 58 ++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 5 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 4e4c6df67..fa877c231 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -28,6 +28,7 @@ from pydantic import BaseModel from airbyte import get_source +from airbyte.cli.smoke_test_source._scenarios import PREDEFINED_SCENARIOS from airbyte.exceptions import PyAirbyteInputError from airbyte.shared.sql_processor import TableStatistics # noqa: TC001 # Pydantic needs at runtime @@ -356,7 +357,48 @@ def _sanitize_error(ex: Exception) -> str: PREFLIGHT_SCENARIO = "basic_types" -"""The scenario used as a preflight connectivity/credential check.""" +"""The predefined scenario whose schema/records are reused for the preflight check.""" + +PREFLIGHT_STREAM_NAME = "_preflight_basic_types" +"""Stream name used by the preflight write. + +This is deliberately different from the predefined ``basic_types`` stream +so that the preflight data lands in its own table and never collides with +the main smoke-test run.""" + + +def _build_preflight_scenario() -> dict[str, Any]: + """Build the preflight custom scenario from the predefined ``basic_types`` scenario. + + Returns a scenario dict identical to ``basic_types`` but with the stream + name set to :data:`PREFLIGHT_STREAM_NAME` so the preflight data lands in + its own table. + """ + for scenario in PREDEFINED_SCENARIOS: + if scenario["name"] == PREFLIGHT_SCENARIO: + return { + "name": PREFLIGHT_STREAM_NAME, + "description": f"Preflight check (based on '{PREFLIGHT_SCENARIO}').", + "json_schema": scenario["json_schema"], + "primary_key": scenario.get("primary_key"), + "records": list(scenario.get("records", [])), + } + + # Fallback: if basic_types is somehow missing, use a minimal schema. + return { + "name": PREFLIGHT_STREAM_NAME, + "description": "Preflight connectivity check.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [{"id": 1, "value": "preflight"}], + } def _run_preflight( @@ -364,7 +406,11 @@ def _run_preflight( destination: Destination, namespace: str, ) -> tuple[bool, str | None]: - """Run the preflight `basic_types` scenario to validate connectivity. + """Run the preflight connectivity check. + + Writes a small dataset (based on the ``basic_types`` scenario) using a + dedicated stream name (:data:`PREFLIGHT_STREAM_NAME`) so the preflight + data never collides with the main smoke-test run. Returns a tuple of ``(passed, error_message)``. @@ -376,12 +422,14 @@ def _run_preflight( """ logger.info( "Running preflight check ('%s') for destination '%s'...", - PREFLIGHT_SCENARIO, + PREFLIGHT_STREAM_NAME, destination.name, ) + preflight_scenario = _build_preflight_scenario() preflight_source = get_smoke_test_source( - scenarios=[PREFLIGHT_SCENARIO], + scenarios=[], # No predefined scenarios namespace=namespace, + custom_scenarios=[preflight_scenario], ) try: destination.write( @@ -476,7 +524,7 @@ def run_destination_smoke_test( # noqa: PLR0914 ), elapsed_seconds=0.0, error=( - f"Preflight check failed for '{PREFLIGHT_SCENARIO}': " + f"Preflight check failed for '{PREFLIGHT_STREAM_NAME}': " f"{preflight_error or 'unknown error'}" ), preflight_passed=False, From 7eafafef32f29777756782e0bf7cfa726c8d3813 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 20:53:39 +0000 Subject: [PATCH 6/9] fix: Remove circular import from _scenarios module The top-level import of PREDEFINED_SCENARIOS from airbyte.cli.smoke_test_source._scenarios caused a circular import that broke test collection. Replace with inline scenario data (mirrors the basic_types schema) to avoid the cycle. Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 35 +++++++++++------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index fa877c231..3e602a909 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -28,7 +28,6 @@ from pydantic import BaseModel from airbyte import get_source -from airbyte.cli.smoke_test_source._scenarios import PREDEFINED_SCENARIOS from airbyte.exceptions import PyAirbyteInputError from airbyte.shared.sql_processor import TableStatistics # noqa: TC001 # Pydantic needs at runtime @@ -368,36 +367,34 @@ def _sanitize_error(ex: Exception) -> str: def _build_preflight_scenario() -> dict[str, Any]: - """Build the preflight custom scenario from the predefined ``basic_types`` scenario. + """Build the preflight custom scenario. - Returns a scenario dict identical to ``basic_types`` but with the stream - name set to :data:`PREFLIGHT_STREAM_NAME` so the preflight data lands in - its own table. - """ - for scenario in PREDEFINED_SCENARIOS: - if scenario["name"] == PREFLIGHT_SCENARIO: - return { - "name": PREFLIGHT_STREAM_NAME, - "description": f"Preflight check (based on '{PREFLIGHT_SCENARIO}').", - "json_schema": scenario["json_schema"], - "primary_key": scenario.get("primary_key"), - "records": list(scenario.get("records", [])), - } + Returns a scenario dict that mirrors the predefined ``basic_types`` + scenario but with the stream name set to :data:`PREFLIGHT_STREAM_NAME` + so the preflight data lands in its own table. - # Fallback: if basic_types is somehow missing, use a minimal schema. + The schema and records are defined inline to avoid a circular import + from :mod:`airbyte.cli.smoke_test_source._scenarios`. + """ return { "name": PREFLIGHT_STREAM_NAME, - "description": "Preflight connectivity check.", + "description": f"Preflight check (based on '{PREFLIGHT_SCENARIO}').", "json_schema": { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { "id": {"type": "integer"}, - "value": {"type": "string"}, + "name": {"type": "string"}, + "amount": {"type": "number"}, + "is_active": {"type": "boolean"}, }, }, "primary_key": [["id"]], - "records": [{"id": 1, "value": "preflight"}], + "records": [ + {"id": 1, "name": "Alice", "amount": 100.50, "is_active": True}, + {"id": 2, "name": "Bob", "amount": 0.0, "is_active": False}, + {"id": 3, "name": "", "amount": -99.99, "is_active": True}, + ], } From 7ed0018f46633f38c5a0b5231bc90a95bd060a80 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 21:00:00 +0000 Subject: [PATCH 7/9] fix: Prevent preflight from expanding to all fast scenarios and use static logger template Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 3e602a909..69c6d1da7 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -424,7 +424,7 @@ def _run_preflight( ) preflight_scenario = _build_preflight_scenario() preflight_source = get_smoke_test_source( - scenarios=[], # No predefined scenarios + scenarios="", # No predefined scenarios namespace=namespace, custom_scenarios=[preflight_scenario], ) @@ -574,12 +574,17 @@ def run_destination_smoke_test( # noqa: PLR0914 if name not in table_statistics } except Exception as readback_ex: + sanitized_readback = _sanitize_error(readback_ex) readback_msg = ( - f"Readback failed for destination '{destination.name}': " - f"{_sanitize_error(readback_ex)}" + f"Readback failed for destination '{destination.name}': {sanitized_readback}" ) readback_warnings.append(readback_msg) - logger.warning(readback_msg, exc_info=True) + logger.warning( + "Readback failed for destination '%s': %s", + destination.name, + sanitized_readback, + exc_info=True, + ) else: logger.info( "Skipping table and column statistics retrieval for " From 3a3aa2c9fb1f6cb2b8d796a8ed828c7d195985f0 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 21:01:34 +0000 Subject: [PATCH 8/9] fix: Handle wrapper comparison for standard exceptions and harden log file reading Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 69c6d1da7..462f0efd7 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -291,7 +291,7 @@ def _extract_trace_error_from_log(ex: Exception) -> str | None: # Scan the log file in reverse for the last TRACE ERROR with an # internal_message. We only need the last one (the fatal error). try: - lines = log_file.read_text().splitlines() + lines = log_file.read_text(encoding="utf-8", errors="replace").splitlines() except OSError: return None @@ -346,7 +346,8 @@ def _sanitize_error(ex: Exception) -> str: else str(deepest) # Standard exceptions (ConnectionError, ValueError, etc.) ) # Only use the deep message if it's more specific than the wrapper - if deep_msg and hasattr(ex, "get_message") and deep_msg != ex.get_message(): + wrapper_msg = ex.get_message() if hasattr(ex, "get_message") else str(ex) + if deep_msg and deep_msg != wrapper_msg: return f"{type(ex).__name__}: {deep_msg}" # 3. Fall back to the top-level exception message. From e100ed25939e57facda9233257b2fe866df6cdfe Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 21:03:40 +0000 Subject: [PATCH 9/9] docs: Use Markdown single backticks in docstrings per project conventions Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 44 ++++++++++++------------ 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 462f0efd7..8990d6021 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -266,15 +266,15 @@ def _prepare_destination_config( def _extract_trace_error_from_log(ex: Exception) -> str | None: - """Search the connector log file for a TRACE ERROR ``internal_message``. + """Search the connector log file for a TRACE ERROR `internal_message`. - Many Java-based connectors emit a generic user-facing ``message`` in + Many Java-based connectors emit a generic user-facing `message` in their TRACE ERROR output (e.g. "Something went wrong in the connector") - while the actionable detail lives in the ``internal_message`` field. - PyAirbyte only captures the ``message`` in the exception chain, so this - helper reads the connector's log file to recover the ``internal_message``. + while the actionable detail lives in the `internal_message` field. + PyAirbyte only captures the `message` in the exception chain, so this + helper reads the connector's log file to recover the `internal_message`. - Returns the ``internal_message`` string if found, or ``None``. + Returns the `internal_message` string if found, or `None`. """ # Walk the exception chain looking for a log_file path log_file: Path | None = None @@ -317,16 +317,16 @@ def _sanitize_error(ex: Exception) -> str: Resolution order (first match wins): - 1. **Connector log file** - The TRACE ERROR ``internal_message`` from the + 1. **Connector log file** - The TRACE ERROR `internal_message` from the connector's log is typically the most specific error available (e.g. - ``password authentication failed for user "bob"``). Many Java-based - connectors emit a generic user-facing ``message`` while the real - detail lives only in ``internal_message``. - 2. **Exception chain** - Walks ``original_exception`` to find the deepest + `password authentication failed for user "bob"`). Many Java-based + connectors emit a generic user-facing `message` while the real + detail lives only in `internal_message`. + 2. **Exception chain** - Walks `original_exception` to find the deepest exception with a message more specific than the top-level wrapper. - 3. **Top-level exception** - Falls back to ``get_message()`` or ``str()``. + 3. **Top-level exception** - Falls back to `get_message()` or `str()`. - Uses ``get_message()`` when available (PyAirbyte exceptions) to avoid + Uses `get_message()` when available (PyAirbyte exceptions) to avoid including full config/context in the error string. """ # 1. Try the connector log file first — it has the most detail. @@ -362,7 +362,7 @@ def _sanitize_error(ex: Exception) -> str: PREFLIGHT_STREAM_NAME = "_preflight_basic_types" """Stream name used by the preflight write. -This is deliberately different from the predefined ``basic_types`` stream +This is deliberately different from the predefined `basic_types` stream so that the preflight data lands in its own table and never collides with the main smoke-test run.""" @@ -370,12 +370,12 @@ def _sanitize_error(ex: Exception) -> str: def _build_preflight_scenario() -> dict[str, Any]: """Build the preflight custom scenario. - Returns a scenario dict that mirrors the predefined ``basic_types`` - scenario but with the stream name set to :data:`PREFLIGHT_STREAM_NAME` + Returns a scenario dict that mirrors the predefined `basic_types` + scenario but with the stream name set to `PREFLIGHT_STREAM_NAME` so the preflight data lands in its own table. The schema and records are defined inline to avoid a circular import - from :mod:`airbyte.cli.smoke_test_source._scenarios`. + from `airbyte.cli.smoke_test_source._scenarios`. """ return { "name": PREFLIGHT_STREAM_NAME, @@ -406,14 +406,14 @@ def _run_preflight( ) -> tuple[bool, str | None]: """Run the preflight connectivity check. - Writes a small dataset (based on the ``basic_types`` scenario) using a - dedicated stream name (:data:`PREFLIGHT_STREAM_NAME`) so the preflight + Writes a small dataset (based on the `basic_types` scenario) using a + dedicated stream name (`PREFLIGHT_STREAM_NAME`) so the preflight data never collides with the main smoke-test run. - Returns a tuple of ``(passed, error_message)``. + Returns a tuple of `(passed, error_message)`. - * ``(True, None)`` when the preflight write succeeded. - * ``(False, '')`` when the write failed. + * `(True, None)` when the preflight write succeeded. + * `(False, '')` when the write failed. Failures are logged but not raised so the caller can return a structured result that includes the actionable connector error.