From a2246018d9eae1cf88586ef7ec7b669843d9f777 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 19:23:55 +0000 Subject: [PATCH 1/5] feat(cli,mcp): add destination-smoke-test command and MCP tool Adds a new `pyab destination-smoke-test` CLI command and a corresponding `destination_smoke_test` MCP tool that send synthetic smoke test data to a destination connector and report success/failure. Both use the existing SourceSmokeTest (15 predefined scenarios) via PyAirbyte's Destination.write() orchestration. Supports: - Destination resolution (name, Docker image, local executable) - Destination config via file, inline YAML, or secret reference - Scenario filtering via --scenario-filter - Large batch inclusion via --include-large-batch - Custom scenario injection via --custom-scenarios (CLI) or custom_scenarios param (MCP) - Structured JSON output with success/failure, record counts, timing, errors Co-Authored-By: AJ Steers --- airbyte/cli/pyab.py | 209 +++++++++++++++++++++++++++++++++++++++++++ airbyte/mcp/local.py | 182 +++++++++++++++++++++++++++++++++++++ 2 files changed, 391 insertions(+) diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index 0fa5f5d5c..f8d71e70f 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -60,8 +60,10 @@ from __future__ import annotations +import json import re import sys +import time from pathlib import Path from typing import TYPE_CHECKING, Any @@ -631,6 +633,212 @@ def sync( ) +def _get_smoke_test_source( + *, + scenario_filter: str | None = None, + include_large_batch: bool = False, + custom_scenarios: str | None = None, +) -> Source: + """Create a smoke test source with the given configuration. + + The smoke test source generates synthetic data across predefined scenarios + that cover common destination failure patterns. + + Args: + scenario_filter: Optional comma-separated list of scenario names to include. + If not provided, all fast (non-high-volume) scenarios are included. + include_large_batch: Whether to include the large_batch_stream scenario. + custom_scenarios: Optional path to a JSON or YAML file containing additional + scenario definitions. Each scenario should have 'name', 'json_schema', + and optionally 'records' and 'primary_key'. + """ + source_config: dict[str, Any] = { + "all_fast_streams": True, + "all_slow_streams": include_large_batch, + } + if scenario_filter: + source_config["scenario_filter"] = [s.strip() for s in scenario_filter.split(",")] + if custom_scenarios: + custom_path = Path(custom_scenarios) + if not custom_path.exists(): + raise PyAirbyteInputError( + message="Custom scenarios file not found.", + input_value=str(custom_path), + ) + loaded = yaml.safe_load(custom_path.read_text(encoding="utf-8")) + if isinstance(loaded, list): + source_config["custom_scenarios"] = loaded + elif isinstance(loaded, dict) and "custom_scenarios" in loaded: + source_config["custom_scenarios"] = loaded["custom_scenarios"] + else: + raise PyAirbyteInputError( + message=( + "Custom scenarios file must contain a list of scenarios " + "or a dict with a 'custom_scenarios' key." + ), + input_value=str(custom_path), + ) + + return get_source( + name="source-smoke-test", + config=source_config, + streams="*", + local_executable="source-smoke-test", + ) + + +@click.command(name="destination-smoke-test") +@click.option( + "--destination", + type=str, + required=True, + help=( + "The destination connector to test. Can be a connector name " + "(e.g. 'destination-snowflake'), a Docker image with tag " + "(e.g. 'airbyte/destination-snowflake:3.14.0'), or a path to a local executable." + ), +) +@click.option( + "--config", + type=str, + help="The destination configuration. " + CONFIG_HELP, +) +@click.option( + "--pip-url", + type=str, + help="Optional pip URL for the destination (Python connectors only). " + PIP_URL_HELP, +) +@click.option( + "--use-python", + type=str, + help=( + "Python interpreter specification. Use 'true' for current Python, " + "'false' for Docker, a path for specific interpreter, or a version " + "string for uv-managed Python (e.g., '3.11', 'python3.12')." + ), +) +@click.option( + "--scenario-filter", + type=str, + default=None, + help=( + "Comma-separated list of smoke test scenario names to run. " + "If not provided, all fast (non-high-volume) scenarios are included. " + "Available scenarios include: basic_types_stream, timestamp_stream, " + "large_decimal_stream, nested_json_stream, null_values_stream, " + "column_name_edge_cases, table_name_with_dots, table_name_with_spaces, " + "CamelCaseStream, wide_table_50_cols, empty_stream, single_record_stream, " + "unicode_special_strings, no_pk_stream, long_column_names." + ), +) +@click.option( + "--include-large-batch", + is_flag=True, + default=False, + help="Include the large_batch_stream scenario (1000 records by default).", +) +@click.option( + "--custom-scenarios", + type=str, + default=None, + help=( + "Path to a JSON or YAML file containing additional custom test scenarios. " + "Each scenario should define 'name', 'json_schema', and optionally 'records' " + "and 'primary_key'. These are unioned with the predefined scenarios." + ), +) +def destination_smoke_test( + *, + destination: str, + config: str | None = None, + pip_url: str | None = None, + use_python: str | None = None, + scenario_filter: str | None = None, + include_large_batch: bool = False, + custom_scenarios: str | None = None, +) -> None: + r"""Run smoke tests against a destination connector. + + Sends synthetic test data from the smoke test source to the specified + destination and reports success or failure. The smoke test source + generates data across predefined scenarios covering common destination + failure patterns: type variations, null handling, naming edge cases, + schema variations, and batch sizes. + + This command does NOT read back data from the destination or compare + results. It only verifies that the destination accepts the data without + errors. + + Examples: + \b + # Test with a Docker destination: + pyab destination-smoke-test \\ + --destination=airbyte/destination-dev-null:latest + + \b + # Test with config file: + pyab destination-smoke-test \\ + --destination=destination-snowflake \\ + --config=./secrets/snowflake.json + + \b + # Run only specific scenarios: + pyab destination-smoke-test \\ + --destination=destination-motherduck \\ + --config='{motherduck_api_key: "SECRET:MOTHERDUCK_API_KEY"}' \\ + --scenario-filter=basic_types_stream,null_values_stream + """ + click.echo("Resolving destination...", file=sys.stderr) + destination_obj = _resolve_destination_job( + destination=destination, + config=config, + pip_url=pip_url, + use_python=use_python, + ) + + click.echo("Creating smoke test source...", file=sys.stderr) + source_obj = _get_smoke_test_source( + scenario_filter=scenario_filter, + include_large_batch=include_large_batch, + custom_scenarios=custom_scenarios, + ) + + click.echo("Running destination smoke test...", file=sys.stderr) + start_time = time.monotonic() + success = False + error_message: str | None = None + records_delivered = 0 + try: + write_result = destination_obj.write( + source_data=source_obj, + cache=False, + state_cache=False, + ) + records_delivered = write_result.processed_records + success = True + except Exception as ex: + error_message = str(ex) + click.echo(f"Smoke test FAILED: {ex}", file=sys.stderr) + + elapsed = time.monotonic() - start_time + + result = { + "success": success, + "destination": _get_connector_name(destination), + "records_delivered": records_delivered, + "scenarios_requested": scenario_filter or "all_fast", + "include_large_batch": include_large_batch, + "elapsed_seconds": round(elapsed, 2), + } + if error_message: + result["error"] = error_message + + click.echo(json.dumps(result, indent=2)) + + if not success: + sys.exit(1) + + @click.group() def cli() -> None: """@private PyAirbyte CLI.""" @@ -640,6 +848,7 @@ def cli() -> None: cli.add_command(validate) cli.add_command(benchmark) cli.add_command(sync) +cli.add_command(destination_smoke_test) if __name__ == "__main__": cli() diff --git a/airbyte/mcp/local.py b/airbyte/mcp/local.py index 56216ff95..85ea36b18 100644 --- a/airbyte/mcp/local.py +++ b/airbyte/mcp/local.py @@ -2,6 +2,7 @@ """Local MCP operations.""" import sys +import time import traceback from itertools import islice from pathlib import Path @@ -14,6 +15,7 @@ from airbyte import get_source from airbyte._util.meta import is_docker_installed from airbyte.caches.util import get_default_cache +from airbyte.destinations.util import get_destination from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings from airbyte.registry import get_connector_metadata from airbyte.secrets.config import _get_secret_sources @@ -804,6 +806,186 @@ def run_sql_query( del cache # Ensure the cache is closed properly +class DestinationSmokeTestResult(BaseModel): + """Result of a destination smoke test run.""" + + success: bool + """Whether the smoke test passed (destination accepted all data without errors).""" + destination: str + """The destination connector name.""" + records_delivered: int + """Total number of records delivered to the destination.""" + scenarios_requested: str + """Which scenarios were requested ('all_fast' or a comma-separated filter).""" + include_large_batch: bool + """Whether the large_batch_stream scenario was included.""" + elapsed_seconds: float + """Time taken for the smoke test in seconds.""" + error: str | None = None + """Error message if the smoke test failed.""" + + +@mcp_tool( + destructive=False, +) +def destination_smoke_test( + destination_connector_name: Annotated[ + str, + Field( + description=( + "The name of the destination connector to test " + "(e.g. 'destination-snowflake', 'destination-motherduck')." + ), + ), + ], + config: Annotated[ + dict | str | None, + Field( + description=( + "The destination configuration as a dict object or JSON string. " + "Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead." + ), + default=None, + ), + ], + config_file: Annotated[ + str | Path | None, + Field( + description="Path to a YAML or JSON file containing the destination configuration.", + default=None, + ), + ], + config_secret_name: Annotated[ + str | None, + Field( + description="The name of the secret containing the destination configuration.", + default=None, + ), + ], + scenario_filter: Annotated[ + list[str] | str | None, + Field( + description=( + "Specific scenario names to run. If not provided, all fast (non-high-volume) " + "scenarios are included. Can be a list of names or a comma-separated string." + ), + default=None, + ), + ], + include_large_batch: Annotated[ + bool, + Field( + description="Whether to include the large_batch_stream scenario (1000 records).", + default=False, + ), + ], + custom_scenarios: Annotated[ + list[dict[str, Any]] | None, + Field( + description=( + "Additional custom test scenarios to inject. Each scenario should define " + "'name', 'json_schema', and optionally 'records' and 'primary_key'. " + "These are unioned with the predefined scenarios." + ), + default=None, + ), + ], + docker_image: Annotated[ + str | None, + Field( + description=( + "Optional Docker image override for the destination connector " + "(e.g. 'airbyte/destination-snowflake:3.14.0')." + ), + default=None, + ), + ], +) -> DestinationSmokeTestResult: + """Run smoke tests against a destination connector. + + Sends synthetic test data from the smoke test source to the specified + destination and reports success or failure. The smoke test source generates + data across predefined scenarios covering common destination failure patterns: + type variations, null handling, naming edge cases, schema variations, and + batch sizes. + + This tool does NOT read back data from the destination or compare results. + It only verifies that the destination accepts the data without errors. + """ + # Resolve destination config + config_dict = resolve_connector_config( + config=config, + config_file=config_file, + config_secret_name=config_secret_name, + ) + + # Set up destination + destination_kwargs: dict[str, Any] = { + "name": destination_connector_name, + "config": config_dict, + } + if docker_image: + destination_kwargs["docker_image"] = docker_image + elif is_docker_installed(): + destination_kwargs["docker_image"] = True + + destination_obj = get_destination(**destination_kwargs) + + # Set up smoke test source config + source_config: dict[str, Any] = { + "all_fast_streams": True, + "all_slow_streams": include_large_batch, + } + if scenario_filter: + resolved_filter = resolve_list_of_strings(scenario_filter) + if resolved_filter: + source_config["scenario_filter"] = resolved_filter + if custom_scenarios: + source_config["custom_scenarios"] = custom_scenarios + + source_obj = get_source( + name="source-smoke-test", + config=source_config, + streams="*", + local_executable="source-smoke-test", + ) + + # Run the smoke test + start_time = time.monotonic() + success = False + error_message: str | None = None + records_delivered = 0 + try: + write_result = destination_obj.write( + source_data=source_obj, + cache=False, + state_cache=False, + ) + records_delivered = write_result.processed_records + success = True + except Exception as ex: + error_message = str(ex) + + elapsed = time.monotonic() - start_time + + scenarios_str: str + if scenario_filter: + resolved = resolve_list_of_strings(scenario_filter) + scenarios_str = ",".join(resolved) if resolved else "all_fast" + else: + scenarios_str = "all_fast" + + return DestinationSmokeTestResult( + success=success, + destination=destination_connector_name, + records_delivered=records_delivered, + scenarios_requested=scenarios_str, + include_large_batch=include_large_batch, + elapsed_seconds=round(elapsed, 2), + error=error_message, + ) + + def register_local_tools(app: FastMCP) -> None: """Register local tools with the FastMCP app. From e97743d0b546d927015e05bdd4259db2ac49e7e9 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 19:35:09 +0000 Subject: [PATCH 2/5] fix: address review comments - rename to --scenarios, markdown docstrings, sanitize errors, destructive=True Co-Authored-By: AJ Steers --- airbyte/cli/pyab.py | 90 ++++++++++++++++++-------------------------- airbyte/mcp/local.py | 48 +++++++++++------------ 2 files changed, 59 insertions(+), 79 deletions(-) diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index f8d71e70f..42563f64e 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -635,8 +635,7 @@ def sync( def _get_smoke_test_source( *, - scenario_filter: str | None = None, - include_large_batch: bool = False, + scenarios: str = "all", custom_scenarios: str | None = None, ) -> Source: """Create a smoke test source with the given configuration. @@ -644,20 +643,20 @@ def _get_smoke_test_source( The smoke test source generates synthetic data across predefined scenarios that cover common destination failure patterns. - Args: - scenario_filter: Optional comma-separated list of scenario names to include. - If not provided, all fast (non-high-volume) scenarios are included. - include_large_batch: Whether to include the large_batch_stream scenario. - custom_scenarios: Optional path to a JSON or YAML file containing additional - scenario definitions. Each scenario should have 'name', 'json_schema', - and optionally 'records' and 'primary_key'. + `scenarios` controls which scenarios to run: `'all'` runs all scenarios + (including large batch), or provide a comma-separated list of scenario names. + + `custom_scenarios` is an optional path to a JSON or YAML file containing additional + scenario definitions. Each scenario should have `name`, `json_schema`, + and optionally `records` and `primary_key`. """ + is_all = scenarios.strip().lower() == "all" source_config: dict[str, Any] = { - "all_fast_streams": True, - "all_slow_streams": include_large_batch, + "all_fast_streams": is_all, + "all_slow_streams": is_all, } - if scenario_filter: - source_config["scenario_filter"] = [s.strip() for s in scenario_filter.split(",")] + if not is_all: + source_config["scenario_filter"] = [s.strip() for s in scenarios.split(",") if s.strip()] if custom_scenarios: custom_path = Path(custom_scenarios) if not custom_path.exists(): @@ -718,25 +717,20 @@ def _get_smoke_test_source( ), ) @click.option( - "--scenario-filter", + "--scenarios", type=str, - default=None, + default="all", help=( - "Comma-separated list of smoke test scenario names to run. " - "If not provided, all fast (non-high-volume) scenarios are included. " - "Available scenarios include: basic_types_stream, timestamp_stream, " + "Which smoke test scenarios to run. Use 'all' (default) to run every " + "predefined scenario including large batch, or provide a comma-separated " + "list of scenario names. " + "Available scenarios: basic_types_stream, timestamp_stream, " "large_decimal_stream, nested_json_stream, null_values_stream, " "column_name_edge_cases, table_name_with_dots, table_name_with_spaces, " "CamelCaseStream, wide_table_50_cols, empty_stream, single_record_stream, " - "unicode_special_strings, no_pk_stream, long_column_names." + "unicode_special_strings, no_pk_stream, long_column_names, large_batch_stream." ), ) -@click.option( - "--include-large-batch", - is_flag=True, - default=False, - help="Include the large_batch_stream scenario (1000 records by default).", -) @click.option( "--custom-scenarios", type=str, @@ -753,11 +747,10 @@ def destination_smoke_test( config: str | None = None, pip_url: str | None = None, use_python: str | None = None, - scenario_filter: str | None = None, - include_large_batch: bool = False, + scenarios: str = "all", custom_scenarios: str | None = None, ) -> None: - r"""Run smoke tests against a destination connector. + """Run smoke tests against a destination connector. Sends synthetic test data from the smoke test source to the specified destination and reports success or failure. The smoke test source @@ -769,24 +762,15 @@ def destination_smoke_test( results. It only verifies that the destination accepts the data without errors. - Examples: - \b - # Test with a Docker destination: - pyab destination-smoke-test \\ - --destination=airbyte/destination-dev-null:latest - - \b - # Test with config file: - pyab destination-smoke-test \\ - --destination=destination-snowflake \\ - --config=./secrets/snowflake.json - - \b - # Run only specific scenarios: - pyab destination-smoke-test \\ - --destination=destination-motherduck \\ - --config='{motherduck_api_key: "SECRET:MOTHERDUCK_API_KEY"}' \\ - --scenario-filter=basic_types_stream,null_values_stream + Usage examples: + + `pyab destination-smoke-test --destination=destination-dev-null` + + `pyab destination-smoke-test --destination=destination-snowflake + --config=./secrets/snowflake.json` + + `pyab destination-smoke-test --destination=destination-motherduck + --scenarios=basic_types_stream,null_values_stream` """ click.echo("Resolving destination...", file=sys.stderr) destination_obj = _resolve_destination_job( @@ -798,8 +782,7 @@ def destination_smoke_test( click.echo("Creating smoke test source...", file=sys.stderr) source_obj = _get_smoke_test_source( - scenario_filter=scenario_filter, - include_large_batch=include_large_batch, + scenarios=scenarios, custom_scenarios=custom_scenarios, ) @@ -817,17 +800,18 @@ def destination_smoke_test( records_delivered = write_result.processed_records success = True except Exception as ex: - error_message = str(ex) - click.echo(f"Smoke test FAILED: {ex}", file=sys.stderr) + error_message = ( + f"{type(ex).__name__}: {ex.get_message() if hasattr(ex, 'get_message') else ex}" + ) + click.echo(f"Smoke test FAILED: {error_message}", file=sys.stderr) elapsed = time.monotonic() - start_time result = { "success": success, - "destination": _get_connector_name(destination), + "destination": destination_obj.name, "records_delivered": records_delivered, - "scenarios_requested": scenario_filter or "all_fast", - "include_large_batch": include_large_batch, + "scenarios_requested": scenarios, "elapsed_seconds": round(elapsed, 2), } if error_message: diff --git a/airbyte/mcp/local.py b/airbyte/mcp/local.py index 85ea36b18..50d180cbc 100644 --- a/airbyte/mcp/local.py +++ b/airbyte/mcp/local.py @@ -816,9 +816,7 @@ class DestinationSmokeTestResult(BaseModel): records_delivered: int """Total number of records delivered to the destination.""" scenarios_requested: str - """Which scenarios were requested ('all_fast' or a comma-separated filter).""" - include_large_batch: bool - """Whether the large_batch_stream scenario was included.""" + """Which scenarios were requested ('all' or a comma-separated list).""" elapsed_seconds: float """Time taken for the smoke test in seconds.""" error: str | None = None @@ -826,7 +824,7 @@ class DestinationSmokeTestResult(BaseModel): @mcp_tool( - destructive=False, + destructive=True, ) def destination_smoke_test( destination_connector_name: Annotated[ @@ -862,21 +860,15 @@ def destination_smoke_test( default=None, ), ], - scenario_filter: Annotated[ - list[str] | str | None, + scenarios: Annotated[ + list[str] | str, Field( description=( - "Specific scenario names to run. If not provided, all fast (non-high-volume) " - "scenarios are included. Can be a list of names or a comma-separated string." + "Which scenarios to run. Use 'all' (default) to run every predefined " + "scenario including large batch, or provide a list of scenario names " + "or a comma-separated string." ), - default=None, - ), - ], - include_large_batch: Annotated[ - bool, - Field( - description="Whether to include the large_batch_stream scenario (1000 records).", - default=False, + default="all", ), ], custom_scenarios: Annotated[ @@ -932,12 +924,13 @@ def destination_smoke_test( destination_obj = get_destination(**destination_kwargs) # Set up smoke test source config + is_all = isinstance(scenarios, str) and scenarios.strip().lower() == "all" source_config: dict[str, Any] = { - "all_fast_streams": True, - "all_slow_streams": include_large_batch, + "all_fast_streams": is_all, + "all_slow_streams": is_all, } - if scenario_filter: - resolved_filter = resolve_list_of_strings(scenario_filter) + if not is_all: + resolved_filter = resolve_list_of_strings(scenarios) if resolved_filter: source_config["scenario_filter"] = resolved_filter if custom_scenarios: @@ -964,23 +957,26 @@ def destination_smoke_test( records_delivered = write_result.processed_records success = True except Exception as ex: - error_message = str(ex) + error_message = ( + f"{type(ex).__name__}: {ex.get_message()}" + if hasattr(ex, "get_message") + else f"{type(ex).__name__}: {ex}" + ) elapsed = time.monotonic() - start_time scenarios_str: str - if scenario_filter: - resolved = resolve_list_of_strings(scenario_filter) - scenarios_str = ",".join(resolved) if resolved else "all_fast" + if is_all: + scenarios_str = "all" else: - scenarios_str = "all_fast" + resolved = resolve_list_of_strings(scenarios) + scenarios_str = ",".join(resolved) if resolved else "all" return DestinationSmokeTestResult( success=success, destination=destination_connector_name, records_delivered=records_delivered, scenarios_requested=scenarios_str, - include_large_batch=include_large_batch, elapsed_seconds=round(elapsed, 2), error=error_message, ) From 93f021aaead7042e2db3a4f48e7311a8a781a3e1 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 19:43:08 +0000 Subject: [PATCH 3/5] refactor: extract shared module airbyte/_util/destination_smoke_tests.py - Move DestinationSmokeTestResult, get_smoke_test_source, run_destination_smoke_test into shared module used by both CLI and MCP - Fix incorrect scenario names in CLI help text (bug: names didn't match actual scenarios) - Remove duplicated orchestration logic from CLI and MCP - CLI and MCP are now thin presentation layers over the shared implementation Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 199 +++++++++++++++++++++++ airbyte/cli/pyab.py | 112 ++----------- airbyte/mcp/local.py | 87 ++-------- 3 files changed, 229 insertions(+), 169 deletions(-) create mode 100644 airbyte/_util/destination_smoke_tests.py diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py new file mode 100644 index 000000000..91c7441c8 --- /dev/null +++ b/airbyte/_util/destination_smoke_tests.py @@ -0,0 +1,199 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Shared implementation for destination smoke tests. + +This module provides the core logic for running smoke tests against destination +connectors. It is used by both the CLI (`pyab destination-smoke-test`) and the +MCP tool (`destination_smoke_test`). + +Smoke tests send synthetic data from the built-in smoke test source to a +destination connector and report whether the destination accepted the data +without errors. No readback or comparison is performed. +""" + +from __future__ import annotations + +import time +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import yaml +from pydantic import BaseModel + +from airbyte import get_source +from airbyte.exceptions import PyAirbyteInputError + + +if TYPE_CHECKING: + from airbyte.destinations.base import Destination + from airbyte.sources.base import Source + + +class DestinationSmokeTestResult(BaseModel): + """Result of a destination smoke test run.""" + + success: bool + """Whether the smoke test passed (destination accepted all data without errors).""" + + destination: str + """The destination connector name.""" + + records_delivered: int + """Total number of records delivered to the destination.""" + + scenarios_requested: str + """Which scenarios were requested ('all' or a comma-separated list).""" + + elapsed_seconds: float + """Time taken for the smoke test in seconds.""" + + error: str | None = None + """Error message if the smoke test failed.""" + + +def get_smoke_test_source( + *, + scenarios: str | list[str] = "all", + custom_scenarios: list[dict[str, Any]] | None = None, + custom_scenarios_file: str | None = None, +) -> Source: + """Create a smoke test source with the given configuration. + + The smoke test source generates synthetic data across predefined scenarios + that cover common destination failure patterns. + + `scenarios` controls which scenarios to run: `'all'` runs all scenarios + (including large batch), or provide a comma-separated string or list of + scenario names. + + `custom_scenarios` is an optional list of scenario dicts to inject directly. + + `custom_scenarios_file` is an optional path to a JSON or YAML file containing + additional scenario definitions. Each scenario should have `name`, `json_schema`, + and optionally `records` and `primary_key`. + """ + scenarios_str = ",".join(scenarios) if isinstance(scenarios, list) else scenarios + is_all = scenarios_str.strip().lower() == "all" + source_config: dict[str, Any] = { + "all_fast_streams": is_all, + "all_slow_streams": is_all, + } + if not is_all: + if isinstance(scenarios, list): + source_config["scenario_filter"] = [s.strip() for s in scenarios if s.strip()] + else: + source_config["scenario_filter"] = [ + s.strip() for s in scenarios.split(",") if s.strip() + ] + + # Handle custom scenarios passed as a list of dicts (MCP path) + if custom_scenarios: + source_config["custom_scenarios"] = custom_scenarios + + # Handle custom scenarios from a file path (CLI path) + if custom_scenarios_file: + custom_path = Path(custom_scenarios_file) + if not custom_path.exists(): + raise PyAirbyteInputError( + message="Custom scenarios file not found.", + input_value=str(custom_path), + ) + loaded = yaml.safe_load(custom_path.read_text(encoding="utf-8")) + if isinstance(loaded, list): + file_scenarios = loaded + elif isinstance(loaded, dict) and "custom_scenarios" in loaded: + file_scenarios = loaded["custom_scenarios"] + else: + raise PyAirbyteInputError( + message=( + "Custom scenarios file must contain a list of scenarios " + "or a dict with a 'custom_scenarios' key." + ), + input_value=str(custom_path), + ) + # Merge with any directly-provided custom scenarios + existing = source_config.get("custom_scenarios", []) + source_config["custom_scenarios"] = existing + file_scenarios + + return get_source( + name="source-smoke-test", + config=source_config, + streams="*", + local_executable="source-smoke-test", + ) + + +def _sanitize_error(ex: Exception) -> str: + """Extract an error message from an exception without leaking secrets. + + Uses `get_message()` when available (PyAirbyte exceptions) to avoid + including full config/context in the error string. + """ + if hasattr(ex, "get_message"): + return f"{type(ex).__name__}: {ex.get_message()}" + return f"{type(ex).__name__}: {ex}" + + +def run_destination_smoke_test( + *, + destination: Destination, + scenarios: str | list[str] = "all", + custom_scenarios: list[dict[str, Any]] | None = None, + custom_scenarios_file: str | None = None, +) -> DestinationSmokeTestResult: + """Run a smoke test against a destination connector. + + Sends synthetic test data from the smoke test source to the specified + destination and returns a structured result. + + This function does NOT read back data from the destination or compare + results. It only verifies that the destination accepts the data without + errors. + + `destination` is a resolved `Destination` object ready for writing. + + `scenarios` controls which predefined scenarios to run: `'all'` runs + every scenario (including large batch), or provide a comma-separated + string or list of specific scenario names. + + `custom_scenarios` is an optional list of scenario dicts to inject. + + `custom_scenarios_file` is an optional path to a JSON/YAML file with + additional scenario definitions. + """ + source_obj = get_smoke_test_source( + scenarios=scenarios, + custom_scenarios=custom_scenarios, + custom_scenarios_file=custom_scenarios_file, + ) + + # Normalize scenarios to a display string + if isinstance(scenarios, list): + scenarios_str = ",".join(scenarios) if scenarios else "all" + else: + scenarios_str = scenarios + + start_time = time.monotonic() + success = False + error_message: str | None = None + records_delivered = 0 + try: + write_result = destination.write( + source_data=source_obj, + cache=False, + state_cache=False, + ) + records_delivered = write_result.processed_records + success = True + except Exception as ex: + error_message = _sanitize_error(ex) + + elapsed = time.monotonic() - start_time + + return DestinationSmokeTestResult( + success=success, + destination=destination.name, + records_delivered=records_delivered, + scenarios_requested=scenarios_str, + elapsed_seconds=round(elapsed, 2), + error=error_message, + ) diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index 42563f64e..c79286224 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -63,13 +63,13 @@ import json import re import sys -import time from pathlib import Path from typing import TYPE_CHECKING, Any import click import yaml +from airbyte._util.destination_smoke_tests import run_destination_smoke_test from airbyte.destinations.util import get_destination, get_noop_destination from airbyte.exceptions import PyAirbyteInputError from airbyte.secrets.util import get_secret @@ -633,59 +633,6 @@ def sync( ) -def _get_smoke_test_source( - *, - scenarios: str = "all", - custom_scenarios: str | None = None, -) -> Source: - """Create a smoke test source with the given configuration. - - The smoke test source generates synthetic data across predefined scenarios - that cover common destination failure patterns. - - `scenarios` controls which scenarios to run: `'all'` runs all scenarios - (including large batch), or provide a comma-separated list of scenario names. - - `custom_scenarios` is an optional path to a JSON or YAML file containing additional - scenario definitions. Each scenario should have `name`, `json_schema`, - and optionally `records` and `primary_key`. - """ - is_all = scenarios.strip().lower() == "all" - source_config: dict[str, Any] = { - "all_fast_streams": is_all, - "all_slow_streams": is_all, - } - if not is_all: - source_config["scenario_filter"] = [s.strip() for s in scenarios.split(",") if s.strip()] - if custom_scenarios: - custom_path = Path(custom_scenarios) - if not custom_path.exists(): - raise PyAirbyteInputError( - message="Custom scenarios file not found.", - input_value=str(custom_path), - ) - loaded = yaml.safe_load(custom_path.read_text(encoding="utf-8")) - if isinstance(loaded, list): - source_config["custom_scenarios"] = loaded - elif isinstance(loaded, dict) and "custom_scenarios" in loaded: - source_config["custom_scenarios"] = loaded["custom_scenarios"] - else: - raise PyAirbyteInputError( - message=( - "Custom scenarios file must contain a list of scenarios " - "or a dict with a 'custom_scenarios' key." - ), - input_value=str(custom_path), - ) - - return get_source( - name="source-smoke-test", - config=source_config, - streams="*", - local_executable="source-smoke-test", - ) - - @click.command(name="destination-smoke-test") @click.option( "--destination", @@ -724,11 +671,12 @@ def _get_smoke_test_source( "Which smoke test scenarios to run. Use 'all' (default) to run every " "predefined scenario including large batch, or provide a comma-separated " "list of scenario names. " - "Available scenarios: basic_types_stream, timestamp_stream, " - "large_decimal_stream, nested_json_stream, null_values_stream, " - "column_name_edge_cases, table_name_with_dots, table_name_with_spaces, " - "CamelCaseStream, wide_table_50_cols, empty_stream, single_record_stream, " - "unicode_special_strings, no_pk_stream, long_column_names, large_batch_stream." + "Available scenarios: basic_types, timestamp_types, " + "large_decimals_and_numbers, nested_json_objects, null_handling, " + "column_naming_edge_cases, table_naming_edge_cases, " + "CamelCaseStreamName, wide_table_50_columns, empty_stream, " + "single_record_stream, unicode_and_special_strings, " + "schema_with_no_primary_key, long_column_names, large_batch_stream." ), ) @click.option( @@ -770,7 +718,7 @@ def destination_smoke_test( --config=./secrets/snowflake.json` `pyab destination-smoke-test --destination=destination-motherduck - --scenarios=basic_types_stream,null_values_stream` + --scenarios=basic_types,null_handling` """ click.echo("Resolving destination...", file=sys.stderr) destination_obj = _resolve_destination_job( @@ -780,46 +728,18 @@ def destination_smoke_test( use_python=use_python, ) - click.echo("Creating smoke test source...", file=sys.stderr) - source_obj = _get_smoke_test_source( + click.echo("Running destination smoke test...", file=sys.stderr) + result = run_destination_smoke_test( + destination=destination_obj, scenarios=scenarios, - custom_scenarios=custom_scenarios, + custom_scenarios_file=custom_scenarios, ) - click.echo("Running destination smoke test...", file=sys.stderr) - start_time = time.monotonic() - success = False - error_message: str | None = None - records_delivered = 0 - try: - write_result = destination_obj.write( - source_data=source_obj, - cache=False, - state_cache=False, - ) - records_delivered = write_result.processed_records - success = True - except Exception as ex: - error_message = ( - f"{type(ex).__name__}: {ex.get_message() if hasattr(ex, 'get_message') else ex}" - ) - click.echo(f"Smoke test FAILED: {error_message}", file=sys.stderr) - - elapsed = time.monotonic() - start_time - - result = { - "success": success, - "destination": destination_obj.name, - "records_delivered": records_delivered, - "scenarios_requested": scenarios, - "elapsed_seconds": round(elapsed, 2), - } - if error_message: - result["error"] = error_message - - click.echo(json.dumps(result, indent=2)) + click.echo(json.dumps(result.model_dump(), indent=2)) - if not success: + if not result.success: + if result.error: + click.echo(f"Smoke test FAILED: {result.error}", file=sys.stderr) sys.exit(1) diff --git a/airbyte/mcp/local.py b/airbyte/mcp/local.py index 50d180cbc..85d5675a4 100644 --- a/airbyte/mcp/local.py +++ b/airbyte/mcp/local.py @@ -2,7 +2,6 @@ """Local MCP operations.""" import sys -import time import traceback from itertools import islice from pathlib import Path @@ -13,6 +12,10 @@ from pydantic import BaseModel, Field from airbyte import get_source +from airbyte._util.destination_smoke_tests import ( + DestinationSmokeTestResult, + run_destination_smoke_test, +) from airbyte._util.meta import is_docker_installed from airbyte.caches.util import get_default_cache from airbyte.destinations.util import get_destination @@ -806,23 +809,6 @@ def run_sql_query( del cache # Ensure the cache is closed properly -class DestinationSmokeTestResult(BaseModel): - """Result of a destination smoke test run.""" - - success: bool - """Whether the smoke test passed (destination accepted all data without errors).""" - destination: str - """The destination connector name.""" - records_delivered: int - """Total number of records delivered to the destination.""" - scenarios_requested: str - """Which scenarios were requested ('all' or a comma-separated list).""" - elapsed_seconds: float - """Time taken for the smoke test in seconds.""" - error: str | None = None - """Error message if the smoke test failed.""" - - @mcp_tool( destructive=True, ) @@ -923,62 +909,17 @@ def destination_smoke_test( destination_obj = get_destination(**destination_kwargs) - # Set up smoke test source config - is_all = isinstance(scenarios, str) and scenarios.strip().lower() == "all" - source_config: dict[str, Any] = { - "all_fast_streams": is_all, - "all_slow_streams": is_all, - } - if not is_all: - resolved_filter = resolve_list_of_strings(scenarios) - if resolved_filter: - source_config["scenario_filter"] = resolved_filter - if custom_scenarios: - source_config["custom_scenarios"] = custom_scenarios - - source_obj = get_source( - name="source-smoke-test", - config=source_config, - streams="*", - local_executable="source-smoke-test", - ) - - # Run the smoke test - start_time = time.monotonic() - success = False - error_message: str | None = None - records_delivered = 0 - try: - write_result = destination_obj.write( - source_data=source_obj, - cache=False, - state_cache=False, - ) - records_delivered = write_result.processed_records - success = True - except Exception as ex: - error_message = ( - f"{type(ex).__name__}: {ex.get_message()}" - if hasattr(ex, "get_message") - else f"{type(ex).__name__}: {ex}" - ) - - elapsed = time.monotonic() - start_time - - scenarios_str: str - if is_all: - scenarios_str = "all" + # Resolve scenarios for the shared helper + resolved_scenarios: str | list[str] + if isinstance(scenarios, str): + resolved_scenarios = scenarios else: - resolved = resolve_list_of_strings(scenarios) - scenarios_str = ",".join(resolved) if resolved else "all" - - return DestinationSmokeTestResult( - success=success, - destination=destination_connector_name, - records_delivered=records_delivered, - scenarios_requested=scenarios_str, - elapsed_seconds=round(elapsed, 2), - error=error_message, + resolved_scenarios = resolve_list_of_strings(scenarios) or "all" + + return run_destination_smoke_test( + destination=destination_obj, + scenarios=resolved_scenarios, + custom_scenarios=custom_scenarios, ) From c725831825dcc2e88da54d9a25222d30f76003e5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 19:51:17 +0000 Subject: [PATCH 4/5] feat: use 'fast' and 'all' as scenario keywords, default to 'fast' Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 52 +++++++++++++++++------- airbyte/cli/pyab.py | 14 ++++--- airbyte/mcp/local.py | 7 ++-- 3 files changed, 50 insertions(+), 23 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 91c7441c8..28995d226 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -52,7 +52,7 @@ class DestinationSmokeTestResult(BaseModel): def get_smoke_test_source( *, - scenarios: str | list[str] = "all", + scenarios: str | list[str] = "fast", custom_scenarios: list[dict[str, Any]] | None = None, custom_scenarios_file: str | None = None, ) -> Source: @@ -61,9 +61,12 @@ def get_smoke_test_source( The smoke test source generates synthetic data across predefined scenarios that cover common destination failure patterns. - `scenarios` controls which scenarios to run: `'all'` runs all scenarios - (including large batch), or provide a comma-separated string or list of - scenario names. + `scenarios` controls which scenarios to run: + + - `'fast'` (default): runs all fast (non-high-volume) predefined scenarios, + excluding `large_batch_stream`. + - `'all'`: runs every predefined scenario including `large_batch_stream`. + - A comma-separated string or list of specific scenario names. `custom_scenarios` is an optional list of scenario dicts to inject directly. @@ -71,13 +74,30 @@ def get_smoke_test_source( additional scenario definitions. Each scenario should have `name`, `json_schema`, and optionally `records` and `primary_key`. """ + # Normalize empty list to "fast" (default) + if isinstance(scenarios, list) and not scenarios: + scenarios = "fast" + scenarios_str = ",".join(scenarios) if isinstance(scenarios, list) else scenarios - is_all = scenarios_str.strip().lower() == "all" - source_config: dict[str, Any] = { - "all_fast_streams": is_all, - "all_slow_streams": is_all, - } - if not is_all: + keyword = scenarios_str.strip().lower() + is_all = keyword == "all" + is_fast = keyword == "fast" + + if is_all: + source_config: dict[str, Any] = { + "all_fast_streams": True, + "all_slow_streams": True, + } + elif is_fast: + source_config: dict[str, Any] = { + "all_fast_streams": True, + "all_slow_streams": False, + } + else: + source_config: dict[str, Any] = { + "all_fast_streams": False, + "all_slow_streams": False, + } if isinstance(scenarios, list): source_config["scenario_filter"] = [s.strip() for s in scenarios if s.strip()] else: @@ -136,7 +156,7 @@ def _sanitize_error(ex: Exception) -> str: def run_destination_smoke_test( *, destination: Destination, - scenarios: str | list[str] = "all", + scenarios: str | list[str] = "fast", custom_scenarios: list[dict[str, Any]] | None = None, custom_scenarios_file: str | None = None, ) -> DestinationSmokeTestResult: @@ -151,9 +171,11 @@ def run_destination_smoke_test( `destination` is a resolved `Destination` object ready for writing. - `scenarios` controls which predefined scenarios to run: `'all'` runs - every scenario (including large batch), or provide a comma-separated - string or list of specific scenario names. + `scenarios` controls which predefined scenarios to run: + + - `'fast'` (default): runs all fast (non-high-volume) predefined scenarios. + - `'all'`: runs every scenario including `large_batch_stream`. + - A comma-separated string or list of specific scenario names. `custom_scenarios` is an optional list of scenario dicts to inject. @@ -168,7 +190,7 @@ def run_destination_smoke_test( # Normalize scenarios to a display string if isinstance(scenarios, list): - scenarios_str = ",".join(scenarios) if scenarios else "all" + scenarios_str = ",".join(scenarios) if scenarios else "fast" else: scenarios_str = scenarios diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index c79286224..b7d0abd80 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -666,11 +666,12 @@ def sync( @click.option( "--scenarios", type=str, - default="all", + default="fast", help=( - "Which smoke test scenarios to run. Use 'all' (default) to run every " - "predefined scenario including large batch, or provide a comma-separated " - "list of scenario names. " + "Which smoke test scenarios to run. " + "Use 'fast' (default) for all fast predefined scenarios " + "(excludes large_batch_stream), 'all' for every predefined scenario " + "including large batch, or provide a comma-separated list of scenario names. " "Available scenarios: basic_types, timestamp_types, " "large_decimals_and_numbers, nested_json_objects, null_handling, " "column_naming_edge_cases, table_naming_edge_cases, " @@ -695,7 +696,7 @@ def destination_smoke_test( config: str | None = None, pip_url: str | None = None, use_python: str | None = None, - scenarios: str = "all", + scenarios: str = "fast", custom_scenarios: str | None = None, ) -> None: """Run smoke tests against a destination connector. @@ -719,6 +720,9 @@ def destination_smoke_test( `pyab destination-smoke-test --destination=destination-motherduck --scenarios=basic_types,null_handling` + + `pyab destination-smoke-test --destination=destination-snowflake + --config=./secrets/snowflake.json --scenarios=all` """ click.echo("Resolving destination...", file=sys.stderr) destination_obj = _resolve_destination_job( diff --git a/airbyte/mcp/local.py b/airbyte/mcp/local.py index 85d5675a4..4f8d23aac 100644 --- a/airbyte/mcp/local.py +++ b/airbyte/mcp/local.py @@ -850,11 +850,12 @@ def destination_smoke_test( list[str] | str, Field( description=( - "Which scenarios to run. Use 'all' (default) to run every predefined " + "Which scenarios to run. Use 'fast' (default) for all fast predefined " + "scenarios (excludes large_batch_stream), 'all' for every predefined " "scenario including large batch, or provide a list of scenario names " "or a comma-separated string." ), - default="all", + default="fast", ), ], custom_scenarios: Annotated[ @@ -914,7 +915,7 @@ def destination_smoke_test( if isinstance(scenarios, str): resolved_scenarios = scenarios else: - resolved_scenarios = resolve_list_of_strings(scenarios) or "all" + resolved_scenarios = resolve_list_of_strings(scenarios) or "fast" return run_destination_smoke_test( destination=destination_obj, From f624c99beac93078760b9e9ef5fe461a19eaed6a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 20:08:31 +0000 Subject: [PATCH 5/5] fix(smoke-test-source): emit stream status TRACE messages for JDK destinations Co-Authored-By: AJ Steers --- airbyte/cli/smoke_test_source/source.py | 33 +++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 5ed3f2a75..76a340877 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -25,10 +25,15 @@ AirbyteMessage, AirbyteRecordMessage, AirbyteStream, + AirbyteStreamStatus, + AirbyteStreamStatusTraceMessage, + AirbyteTraceMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Status, + StreamDescriptor, SyncMode, + TraceType, Type, ) from airbyte_cdk.sources.source import Source @@ -262,8 +267,7 @@ def _validate_custom_scenarios( return f"Custom scenario at index {i} is missing 'name'." if not isinstance(scenario.get("json_schema"), dict): return ( - f"Custom scenario '{scenario['name']}' must provide " - "'json_schema' as an object." + f"Custom scenario '{scenario['name']}' must provide 'json_schema' as an object." ) if "records" in scenario: if not isinstance(scenario["records"], list): @@ -320,6 +324,24 @@ def discover( logger.info(f"Discovered {len(streams)} smoke test streams.") return AirbyteCatalog(streams=streams) + def _stream_status_message( + self, + stream_name: str, + status: AirbyteStreamStatus, + ) -> AirbyteMessage: + """Build an AirbyteMessage containing a stream status trace.""" + return AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=time.time() * 1000, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name=stream_name), + status=status, + ), + ), + ) + def read( self, logger: logging.Logger, @@ -339,6 +361,10 @@ def read( logger.warning(f"Stream '{stream_name}' not found in scenarios, skipping.") continue + # Emit STARTED status + yield self._stream_status_message(stream_name, AirbyteStreamStatus.STARTED) + yield self._stream_status_message(stream_name, AirbyteStreamStatus.RUNNING) + records = get_scenario_records(scenario) logger.info(f"Emitting {len(records)} records for stream '{stream_name}'.") @@ -351,3 +377,6 @@ def read( emitted_at=now_ms, ), ) + + # Emit COMPLETE status + yield self._stream_status_message(stream_name, AirbyteStreamStatus.COMPLETE)