diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py new file mode 100644 index 000000000..28995d226 --- /dev/null +++ b/airbyte/_util/destination_smoke_tests.py @@ -0,0 +1,221 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Shared implementation for destination smoke tests. + +This module provides the core logic for running smoke tests against destination +connectors. It is used by both the CLI (`pyab destination-smoke-test`) and the +MCP tool (`destination_smoke_test`). + +Smoke tests send synthetic data from the built-in smoke test source to a +destination connector and report whether the destination accepted the data +without errors. No readback or comparison is performed. +""" + +from __future__ import annotations + +import time +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import yaml +from pydantic import BaseModel + +from airbyte import get_source +from airbyte.exceptions import PyAirbyteInputError + + +if TYPE_CHECKING: + from airbyte.destinations.base import Destination + from airbyte.sources.base import Source + + +class DestinationSmokeTestResult(BaseModel): + """Result of a destination smoke test run.""" + + success: bool + """Whether the smoke test passed (destination accepted all data without errors).""" + + destination: str + """The destination connector name.""" + + records_delivered: int + """Total number of records delivered to the destination.""" + + scenarios_requested: str + """Which scenarios were requested ('all' or a comma-separated list).""" + + elapsed_seconds: float + """Time taken for the smoke test in seconds.""" + + error: str | None = None + """Error message if the smoke test failed.""" + + +def get_smoke_test_source( + *, + scenarios: str | list[str] = "fast", + custom_scenarios: list[dict[str, Any]] | None = None, + custom_scenarios_file: str | None = None, +) -> Source: + """Create a smoke test source with the given configuration. + + The smoke test source generates synthetic data across predefined scenarios + that cover common destination failure patterns. + + `scenarios` controls which scenarios to run: + + - `'fast'` (default): runs all fast (non-high-volume) predefined scenarios, + excluding `large_batch_stream`. + - `'all'`: runs every predefined scenario including `large_batch_stream`. + - A comma-separated string or list of specific scenario names. + + `custom_scenarios` is an optional list of scenario dicts to inject directly. + + `custom_scenarios_file` is an optional path to a JSON or YAML file containing + additional scenario definitions. Each scenario should have `name`, `json_schema`, + and optionally `records` and `primary_key`. + """ + # Normalize empty list to "fast" (default) + if isinstance(scenarios, list) and not scenarios: + scenarios = "fast" + + scenarios_str = ",".join(scenarios) if isinstance(scenarios, list) else scenarios + keyword = scenarios_str.strip().lower() + is_all = keyword == "all" + is_fast = keyword == "fast" + + if is_all: + source_config: dict[str, Any] = { + "all_fast_streams": True, + "all_slow_streams": True, + } + elif is_fast: + source_config: dict[str, Any] = { + "all_fast_streams": True, + "all_slow_streams": False, + } + else: + source_config: dict[str, Any] = { + "all_fast_streams": False, + "all_slow_streams": False, + } + if isinstance(scenarios, list): + source_config["scenario_filter"] = [s.strip() for s in scenarios if s.strip()] + else: + source_config["scenario_filter"] = [ + s.strip() for s in scenarios.split(",") if s.strip() + ] + + # Handle custom scenarios passed as a list of dicts (MCP path) + if custom_scenarios: + source_config["custom_scenarios"] = custom_scenarios + + # Handle custom scenarios from a file path (CLI path) + if custom_scenarios_file: + custom_path = Path(custom_scenarios_file) + if not custom_path.exists(): + raise PyAirbyteInputError( + message="Custom scenarios file not found.", + input_value=str(custom_path), + ) + loaded = yaml.safe_load(custom_path.read_text(encoding="utf-8")) + if isinstance(loaded, list): + file_scenarios = loaded + elif isinstance(loaded, dict) and "custom_scenarios" in loaded: + file_scenarios = loaded["custom_scenarios"] + else: + raise PyAirbyteInputError( + message=( + "Custom scenarios file must contain a list of scenarios " + "or a dict with a 'custom_scenarios' key." + ), + input_value=str(custom_path), + ) + # Merge with any directly-provided custom scenarios + existing = source_config.get("custom_scenarios", []) + source_config["custom_scenarios"] = existing + file_scenarios + + return get_source( + name="source-smoke-test", + config=source_config, + streams="*", + local_executable="source-smoke-test", + ) + + +def _sanitize_error(ex: Exception) -> str: + """Extract an error message from an exception without leaking secrets. + + Uses `get_message()` when available (PyAirbyte exceptions) to avoid + including full config/context in the error string. + """ + if hasattr(ex, "get_message"): + return f"{type(ex).__name__}: {ex.get_message()}" + return f"{type(ex).__name__}: {ex}" + + +def run_destination_smoke_test( + *, + destination: Destination, + scenarios: str | list[str] = "fast", + custom_scenarios: list[dict[str, Any]] | None = None, + custom_scenarios_file: str | None = None, +) -> DestinationSmokeTestResult: + """Run a smoke test against a destination connector. + + Sends synthetic test data from the smoke test source to the specified + destination and returns a structured result. + + This function does NOT read back data from the destination or compare + results. It only verifies that the destination accepts the data without + errors. + + `destination` is a resolved `Destination` object ready for writing. + + `scenarios` controls which predefined scenarios to run: + + - `'fast'` (default): runs all fast (non-high-volume) predefined scenarios. + - `'all'`: runs every scenario including `large_batch_stream`. + - A comma-separated string or list of specific scenario names. + + `custom_scenarios` is an optional list of scenario dicts to inject. + + `custom_scenarios_file` is an optional path to a JSON/YAML file with + additional scenario definitions. + """ + source_obj = get_smoke_test_source( + scenarios=scenarios, + custom_scenarios=custom_scenarios, + custom_scenarios_file=custom_scenarios_file, + ) + + # Normalize scenarios to a display string + if isinstance(scenarios, list): + scenarios_str = ",".join(scenarios) if scenarios else "fast" + else: + scenarios_str = scenarios + + start_time = time.monotonic() + success = False + error_message: str | None = None + records_delivered = 0 + try: + write_result = destination.write( + source_data=source_obj, + cache=False, + state_cache=False, + ) + records_delivered = write_result.processed_records + success = True + except Exception as ex: + error_message = _sanitize_error(ex) + + elapsed = time.monotonic() - start_time + + return DestinationSmokeTestResult( + success=success, + destination=destination.name, + records_delivered=records_delivered, + scenarios_requested=scenarios_str, + elapsed_seconds=round(elapsed, 2), + error=error_message, + ) diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index 0fa5f5d5c..b7d0abd80 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -60,6 +60,7 @@ from __future__ import annotations +import json import re import sys from pathlib import Path @@ -68,6 +69,7 @@ import click import yaml +from airbyte._util.destination_smoke_tests import run_destination_smoke_test from airbyte.destinations.util import get_destination, get_noop_destination from airbyte.exceptions import PyAirbyteInputError from airbyte.secrets.util import get_secret @@ -631,6 +633,120 @@ def sync( ) +@click.command(name="destination-smoke-test") +@click.option( + "--destination", + type=str, + required=True, + help=( + "The destination connector to test. Can be a connector name " + "(e.g. 'destination-snowflake'), a Docker image with tag " + "(e.g. 'airbyte/destination-snowflake:3.14.0'), or a path to a local executable." + ), +) +@click.option( + "--config", + type=str, + help="The destination configuration. " + CONFIG_HELP, +) +@click.option( + "--pip-url", + type=str, + help="Optional pip URL for the destination (Python connectors only). " + PIP_URL_HELP, +) +@click.option( + "--use-python", + type=str, + help=( + "Python interpreter specification. Use 'true' for current Python, " + "'false' for Docker, a path for specific interpreter, or a version " + "string for uv-managed Python (e.g., '3.11', 'python3.12')." + ), +) +@click.option( + "--scenarios", + type=str, + default="fast", + help=( + "Which smoke test scenarios to run. " + "Use 'fast' (default) for all fast predefined scenarios " + "(excludes large_batch_stream), 'all' for every predefined scenario " + "including large batch, or provide a comma-separated list of scenario names. " + "Available scenarios: basic_types, timestamp_types, " + "large_decimals_and_numbers, nested_json_objects, null_handling, " + "column_naming_edge_cases, table_naming_edge_cases, " + "CamelCaseStreamName, wide_table_50_columns, empty_stream, " + "single_record_stream, unicode_and_special_strings, " + "schema_with_no_primary_key, long_column_names, large_batch_stream." + ), +) +@click.option( + "--custom-scenarios", + type=str, + default=None, + help=( + "Path to a JSON or YAML file containing additional custom test scenarios. " + "Each scenario should define 'name', 'json_schema', and optionally 'records' " + "and 'primary_key'. These are unioned with the predefined scenarios." + ), +) +def destination_smoke_test( + *, + destination: str, + config: str | None = None, + pip_url: str | None = None, + use_python: str | None = None, + scenarios: str = "fast", + custom_scenarios: str | None = None, +) -> None: + """Run smoke tests against a destination connector. + + Sends synthetic test data from the smoke test source to the specified + destination and reports success or failure. The smoke test source + generates data across predefined scenarios covering common destination + failure patterns: type variations, null handling, naming edge cases, + schema variations, and batch sizes. + + This command does NOT read back data from the destination or compare + results. It only verifies that the destination accepts the data without + errors. + + Usage examples: + + `pyab destination-smoke-test --destination=destination-dev-null` + + `pyab destination-smoke-test --destination=destination-snowflake + --config=./secrets/snowflake.json` + + `pyab destination-smoke-test --destination=destination-motherduck + --scenarios=basic_types,null_handling` + + `pyab destination-smoke-test --destination=destination-snowflake + --config=./secrets/snowflake.json --scenarios=all` + """ + click.echo("Resolving destination...", file=sys.stderr) + destination_obj = _resolve_destination_job( + destination=destination, + config=config, + pip_url=pip_url, + use_python=use_python, + ) + + click.echo("Running destination smoke test...", file=sys.stderr) + result = run_destination_smoke_test( + destination=destination_obj, + scenarios=scenarios, + custom_scenarios_file=custom_scenarios, + ) + + click.echo(json.dumps(result.model_dump(), indent=2)) + + if not result.success: + if result.error: + click.echo(f"Smoke test FAILED: {result.error}", file=sys.stderr) + sys.exit(1) + + @click.group() def cli() -> None: """@private PyAirbyte CLI.""" @@ -640,6 +756,7 @@ def cli() -> None: cli.add_command(validate) cli.add_command(benchmark) cli.add_command(sync) +cli.add_command(destination_smoke_test) if __name__ == "__main__": cli() diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 5ed3f2a75..76a340877 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -25,10 +25,15 @@ AirbyteMessage, AirbyteRecordMessage, AirbyteStream, + AirbyteStreamStatus, + AirbyteStreamStatusTraceMessage, + AirbyteTraceMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Status, + StreamDescriptor, SyncMode, + TraceType, Type, ) from airbyte_cdk.sources.source import Source @@ -262,8 +267,7 @@ def _validate_custom_scenarios( return f"Custom scenario at index {i} is missing 'name'." if not isinstance(scenario.get("json_schema"), dict): return ( - f"Custom scenario '{scenario['name']}' must provide " - "'json_schema' as an object." + f"Custom scenario '{scenario['name']}' must provide 'json_schema' as an object." ) if "records" in scenario: if not isinstance(scenario["records"], list): @@ -320,6 +324,24 @@ def discover( logger.info(f"Discovered {len(streams)} smoke test streams.") return AirbyteCatalog(streams=streams) + def _stream_status_message( + self, + stream_name: str, + status: AirbyteStreamStatus, + ) -> AirbyteMessage: + """Build an AirbyteMessage containing a stream status trace.""" + return AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + emitted_at=time.time() * 1000, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor(name=stream_name), + status=status, + ), + ), + ) + def read( self, logger: logging.Logger, @@ -339,6 +361,10 @@ def read( logger.warning(f"Stream '{stream_name}' not found in scenarios, skipping.") continue + # Emit STARTED status + yield self._stream_status_message(stream_name, AirbyteStreamStatus.STARTED) + yield self._stream_status_message(stream_name, AirbyteStreamStatus.RUNNING) + records = get_scenario_records(scenario) logger.info(f"Emitting {len(records)} records for stream '{stream_name}'.") @@ -351,3 +377,6 @@ def read( emitted_at=now_ms, ), ) + + # Emit COMPLETE status + yield self._stream_status_message(stream_name, AirbyteStreamStatus.COMPLETE) diff --git a/airbyte/mcp/local.py b/airbyte/mcp/local.py index 56216ff95..4f8d23aac 100644 --- a/airbyte/mcp/local.py +++ b/airbyte/mcp/local.py @@ -12,8 +12,13 @@ from pydantic import BaseModel, Field from airbyte import get_source +from airbyte._util.destination_smoke_tests import ( + DestinationSmokeTestResult, + run_destination_smoke_test, +) from airbyte._util.meta import is_docker_installed from airbyte.caches.util import get_default_cache +from airbyte.destinations.util import get_destination from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings from airbyte.registry import get_connector_metadata from airbyte.secrets.config import _get_secret_sources @@ -804,6 +809,121 @@ def run_sql_query( del cache # Ensure the cache is closed properly +@mcp_tool( + destructive=True, +) +def destination_smoke_test( + destination_connector_name: Annotated[ + str, + Field( + description=( + "The name of the destination connector to test " + "(e.g. 'destination-snowflake', 'destination-motherduck')." + ), + ), + ], + config: Annotated[ + dict | str | None, + Field( + description=( + "The destination configuration as a dict object or JSON string. " + "Must not contain hardcoded secrets; use secret_reference::ENV_VAR_NAME instead." + ), + default=None, + ), + ], + config_file: Annotated[ + str | Path | None, + Field( + description="Path to a YAML or JSON file containing the destination configuration.", + default=None, + ), + ], + config_secret_name: Annotated[ + str | None, + Field( + description="The name of the secret containing the destination configuration.", + default=None, + ), + ], + scenarios: Annotated[ + list[str] | str, + Field( + description=( + "Which scenarios to run. Use 'fast' (default) for all fast predefined " + "scenarios (excludes large_batch_stream), 'all' for every predefined " + "scenario including large batch, or provide a list of scenario names " + "or a comma-separated string." + ), + default="fast", + ), + ], + custom_scenarios: Annotated[ + list[dict[str, Any]] | None, + Field( + description=( + "Additional custom test scenarios to inject. Each scenario should define " + "'name', 'json_schema', and optionally 'records' and 'primary_key'. " + "These are unioned with the predefined scenarios." + ), + default=None, + ), + ], + docker_image: Annotated[ + str | None, + Field( + description=( + "Optional Docker image override for the destination connector " + "(e.g. 'airbyte/destination-snowflake:3.14.0')." + ), + default=None, + ), + ], +) -> DestinationSmokeTestResult: + """Run smoke tests against a destination connector. + + Sends synthetic test data from the smoke test source to the specified + destination and reports success or failure. The smoke test source generates + data across predefined scenarios covering common destination failure patterns: + type variations, null handling, naming edge cases, schema variations, and + batch sizes. + + This tool does NOT read back data from the destination or compare results. + It only verifies that the destination accepts the data without errors. + """ + # Resolve destination config + config_dict = resolve_connector_config( + config=config, + config_file=config_file, + config_secret_name=config_secret_name, + ) + + # Set up destination + destination_kwargs: dict[str, Any] = { + "name": destination_connector_name, + "config": config_dict, + } + if docker_image: + destination_kwargs["docker_image"] = docker_image + elif is_docker_installed(): + destination_kwargs["docker_image"] = True + + destination_obj = get_destination(**destination_kwargs) + + # Resolve scenarios for the shared helper + resolved_scenarios: str | list[str] + if isinstance(scenarios, str): + resolved_scenarios = scenarios + else: + resolved_scenarios = resolve_list_of_strings(scenarios) or "fast" + + return run_destination_smoke_test( + destination=destination_obj, + scenarios=resolved_scenarios, + custom_scenarios=custom_scenarios, + ) + + def register_local_tools(app: FastMCP) -> None: """Register local tools with the FastMCP app.