diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 28995d226..67c87adbb 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -13,6 +13,7 @@ from __future__ import annotations import time +from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING, Any @@ -23,11 +24,41 @@ from airbyte.exceptions import PyAirbyteInputError +NAMESPACE_PREFIX = "zz_deleteme" +"""Prefix for auto-generated smoke test namespaces. + +The ``zz_`` prefix sorts last alphabetically; ``deleteme`` signals the +namespace is safe for automated cleanup. +""" + +DEFAULT_NAMESPACE_SUFFIX = "smoke_test" +"""Default suffix appended when no explicit suffix is provided.""" + + if TYPE_CHECKING: from airbyte.destinations.base import Destination from airbyte.sources.base import Source +def generate_namespace( + *, + namespace_suffix: str | None = None, +) -> str: + """Generate a smoke-test namespace. + + Format: ``zz_deleteme_yyyymmdd_hhmm_``. + The ``zz_`` prefix sorts last alphabetically and the ``deleteme`` + token acts as a guard for automated cleanup scripts. + + If *namespace_suffix* is not provided, ``smoke_test`` is used as the + default suffix. + """ + suffix = namespace_suffix or DEFAULT_NAMESPACE_SUFFIX + now = datetime.now(tz=timezone.utc) + ts = now.strftime("%Y%m%d_%H%M") + return f"{NAMESPACE_PREFIX}_{ts}_{suffix}" + + class DestinationSmokeTestResult(BaseModel): """Result of a destination smoke test run.""" @@ -37,6 +68,9 @@ class DestinationSmokeTestResult(BaseModel): destination: str """The destination connector name.""" + namespace: str + """The namespace used for this smoke test run.""" + records_delivered: int """Total number of records delivered to the destination.""" @@ -53,6 +87,7 @@ class DestinationSmokeTestResult(BaseModel): def get_smoke_test_source( *, scenarios: str | list[str] = "fast", + namespace: str | None = None, custom_scenarios: list[dict[str, Any]] | None = None, custom_scenarios_file: str | None = None, ) -> Source: @@ -70,6 +105,9 @@ def get_smoke_test_source( `custom_scenarios` is an optional list of scenario dicts to inject directly. + `namespace` is an optional namespace to set on all streams. When provided, + the destination will write data into this namespace (schema, database, etc.). + `custom_scenarios_file` is an optional path to a JSON or YAML file containing additional scenario definitions. Each scenario should have `name`, `json_schema`, and optionally `records` and `primary_key`. @@ -134,6 +172,9 @@ def get_smoke_test_source( existing = source_config.get("custom_scenarios", []) source_config["custom_scenarios"] = existing + file_scenarios + if namespace: + source_config["namespace"] = namespace + return get_source( name="source-smoke-test", config=source_config, @@ -157,6 +198,8 @@ def run_destination_smoke_test( *, destination: Destination, scenarios: str | list[str] = "fast", + namespace_suffix: str | None = None, + reuse_namespace: str | None = None, custom_scenarios: list[dict[str, Any]] | None = None, custom_scenarios_file: str | None = None, ) -> DestinationSmokeTestResult: @@ -177,13 +220,26 @@ def run_destination_smoke_test( - `'all'`: runs every scenario including `large_batch_stream`. - A comma-separated string or list of specific scenario names. + `namespace_suffix` is an optional suffix appended to the auto-generated + namespace. Defaults to ``smoke_test`` when not provided + (e.g. ``zz_deleteme_20260318_2256_smoke_test``). + + `reuse_namespace` is an exact namespace string to reuse from a previous + run. When set, no new namespace is generated. + `custom_scenarios` is an optional list of scenario dicts to inject. `custom_scenarios_file` is an optional path to a JSON/YAML file with additional scenario definitions. """ + # Determine namespace + namespace = reuse_namespace or generate_namespace( + namespace_suffix=namespace_suffix, + ) + source_obj = get_smoke_test_source( scenarios=scenarios, + namespace=namespace, custom_scenarios=custom_scenarios, custom_scenarios_file=custom_scenarios_file, ) @@ -214,6 +270,7 @@ def run_destination_smoke_test( return DestinationSmokeTestResult( success=success, destination=destination.name, + namespace=namespace, records_delivered=records_delivered, scenarios_requested=scenarios_str, elapsed_seconds=round(elapsed, 2), diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index b7d0abd80..f71ca5b5a 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -690,6 +690,26 @@ def sync( "and 'primary_key'. These are unioned with the predefined scenarios." ), ) +@click.option( + "--namespace-suffix", + type=str, + default=None, + help=( + "Optional suffix appended to the auto-generated namespace. " + "Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). " + "Use this to distinguish concurrent runs." + ), +) +@click.option( + "--reuse-namespace", + type=str, + default=None, + help=( + "Exact namespace to reuse from a previous run. " + "When set, no new namespace is generated. " + "Useful for running a second test against an already-populated namespace." + ), +) def destination_smoke_test( *, destination: str, @@ -698,6 +718,8 @@ def destination_smoke_test( use_python: str | None = None, scenarios: str = "fast", custom_scenarios: str | None = None, + namespace_suffix: str | None = None, + reuse_namespace: str | None = None, ) -> None: """Run smoke tests against a destination connector. @@ -723,6 +745,13 @@ def destination_smoke_test( `pyab destination-smoke-test --destination=destination-snowflake --config=./secrets/snowflake.json --scenarios=all` + + `pyab destination-smoke-test --destination=destination-snowflake + --config=./secrets/snowflake.json --namespace-suffix=run2` + + `pyab destination-smoke-test --destination=destination-snowflake + --config=./secrets/snowflake.json + --reuse-namespace=zz_deleteme_20260318_2256` """ click.echo("Resolving destination...", file=sys.stderr) destination_obj = _resolve_destination_job( @@ -736,6 +765,8 @@ def destination_smoke_test( result = run_destination_smoke_test( destination=destination_obj, scenarios=scenarios, + namespace_suffix=namespace_suffix, + reuse_namespace=reuse_namespace, custom_scenarios_file=custom_scenarios, ) diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 76a340877..46abc56a8 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -54,11 +54,13 @@ def _build_streams_from_scenarios( scenarios: list[dict[str, Any]], + namespace: str | None = None, ) -> list[AirbyteStream]: """Build AirbyteStream objects from scenario definitions.""" return [ AirbyteStream( name=scenario["name"], + namespace=namespace, json_schema=scenario["json_schema"], supported_sync_modes=[SyncMode.full_refresh], source_defined_cursor=False, @@ -174,6 +176,16 @@ def spec( "items": {"type": "string"}, "default": [], }, + "namespace": { + "type": ["string", "null"], + "title": "Namespace", + "description": ( + "Namespace (schema/database) to set on all " + "streams. When provided, the destination will " + "write data into this namespace." + ), + "default": None, + }, }, }, ) @@ -320,7 +332,8 @@ def discover( ) -> AirbyteCatalog: """Return the catalog with all test scenario streams.""" scenarios = self._get_all_scenarios(config) - streams = _build_streams_from_scenarios(scenarios) + namespace = config.get("namespace") + streams = _build_streams_from_scenarios(scenarios, namespace=namespace) logger.info(f"Discovered {len(streams)} smoke test streams.") return AirbyteCatalog(streams=streams) @@ -328,6 +341,7 @@ def _stream_status_message( self, stream_name: str, status: AirbyteStreamStatus, + namespace: str | None = None, ) -> AirbyteMessage: """Build an AirbyteMessage containing a stream status trace.""" return AirbyteMessage( @@ -336,7 +350,10 @@ def _stream_status_message( type=TraceType.STREAM_STATUS, emitted_at=time.time() * 1000, stream_status=AirbyteStreamStatusTraceMessage( - stream_descriptor=StreamDescriptor(name=stream_name), + stream_descriptor=StreamDescriptor( + name=stream_name, + namespace=namespace, + ), status=status, ), ), @@ -355,6 +372,8 @@ def read( scenario_map = {s["name"]: s for s in scenarios} now_ms = int(time.time() * 1000) + namespace = config.get("namespace") + for stream_name in selected_streams: scenario = scenario_map.get(stream_name) if not scenario: @@ -362,8 +381,16 @@ def read( continue # Emit STARTED status - yield self._stream_status_message(stream_name, AirbyteStreamStatus.STARTED) - yield self._stream_status_message(stream_name, AirbyteStreamStatus.RUNNING) + yield self._stream_status_message( + stream_name, + AirbyteStreamStatus.STARTED, + namespace=namespace, + ) + yield self._stream_status_message( + stream_name, + AirbyteStreamStatus.RUNNING, + namespace=namespace, + ) records = get_scenario_records(scenario) logger.info(f"Emitting {len(records)} records for stream '{stream_name}'.") @@ -373,10 +400,15 @@ def read( type=Type.RECORD, record=AirbyteRecordMessage( stream=stream_name, + namespace=namespace, data=record, emitted_at=now_ms, ), ) # Emit COMPLETE status - yield self._stream_status_message(stream_name, AirbyteStreamStatus.COMPLETE) + yield self._stream_status_message( + stream_name, + AirbyteStreamStatus.COMPLETE, + namespace=namespace, + ) diff --git a/airbyte/mcp/local.py b/airbyte/mcp/local.py index 4f8d23aac..7df085365 100644 --- a/airbyte/mcp/local.py +++ b/airbyte/mcp/local.py @@ -812,7 +812,7 @@ def run_sql_query( @mcp_tool( destructive=True, ) -def destination_smoke_test( +def destination_smoke_test( # noqa: PLR0913, PLR0917 destination_connector_name: Annotated[ str, Field( @@ -879,6 +879,28 @@ def destination_smoke_test( default=None, ), ], + namespace_suffix: Annotated[ + str | None, + Field( + description=( + "Optional suffix appended to the auto-generated namespace. " + "Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). " + "Use this to distinguish concurrent runs." + ), + default=None, + ), + ], + reuse_namespace: Annotated[ + str | None, + Field( + description=( + "Exact namespace to reuse from a previous run. " + "When set, no new namespace is generated. " + "Useful for running a second test against an already-populated namespace." + ), + default=None, + ), + ], ) -> DestinationSmokeTestResult: """Run smoke tests against a destination connector. @@ -920,6 +942,8 @@ def destination_smoke_test( return run_destination_smoke_test( destination=destination_obj, scenarios=resolved_scenarios, + namespace_suffix=namespace_suffix, + reuse_namespace=reuse_namespace, custom_scenarios=custom_scenarios, )