From 09d36b1b548cc43563ef916cfdb69fee882a9049 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 23:01:46 +0000 Subject: [PATCH 1/4] feat(cli,mcp): add namespace support to destination smoke tests - Generate zz_deleteme_yyyymmdd_hhmm namespaces for test isolation - Add --namespace-suffix for concurrent run disambiguation - Add --reuse-namespace for multi-test runs on same namespace - Inject namespace into smoke test source streams and records - Include namespace in DestinationSmokeTestResult Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 52 ++++++++++++++++++++++++ airbyte/cli/pyab.py | 31 ++++++++++++++ airbyte/cli/smoke_test_source/source.py | 17 +++++++- airbyte/mcp/local.py | 26 +++++++++++- 4 files changed, 124 insertions(+), 2 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 28995d226..984ce4790 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -13,6 +13,7 @@ from __future__ import annotations import time +from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING, Any @@ -23,11 +24,37 @@ from airbyte.exceptions import PyAirbyteInputError +NAMESPACE_PREFIX = "zz_deleteme" +"""Prefix for auto-generated smoke test namespaces. + +The ``zz_`` prefix sorts last alphabetically; ``deleteme`` signals the +namespace is safe for automated cleanup. +""" + + if TYPE_CHECKING: from airbyte.destinations.base import Destination from airbyte.sources.base import Source +def generate_namespace( + *, + namespace_suffix: str | None = None, +) -> str: + """Generate a smoke-test namespace. + + Format: ``zz_deleteme_yyyymmdd_hhmm`` with an optional ``_<suffix>``. + The ``zz_`` prefix sorts last alphabetically and the ``deleteme`` + token acts as a guard for automated cleanup scripts. 
+ """ + now = datetime.now(tz=timezone.utc) + ts = now.strftime("%Y%m%d_%H%M") + ns = f"{NAMESPACE_PREFIX}_{ts}" + if namespace_suffix: + ns = f"{ns}_{namespace_suffix}" + return ns + + class DestinationSmokeTestResult(BaseModel): """Result of a destination smoke test run.""" @@ -37,6 +64,9 @@ class DestinationSmokeTestResult(BaseModel): destination: str """The destination connector name.""" + namespace: str + """The namespace used for this smoke test run.""" + records_delivered: int """Total number of records delivered to the destination.""" @@ -53,6 +83,7 @@ class DestinationSmokeTestResult(BaseModel): def get_smoke_test_source( *, scenarios: str | list[str] = "fast", + namespace: str | None = None, custom_scenarios: list[dict[str, Any]] | None = None, custom_scenarios_file: str | None = None, ) -> Source: @@ -70,6 +101,9 @@ def get_smoke_test_source( `custom_scenarios` is an optional list of scenario dicts to inject directly. + `namespace` is an optional namespace to set on all streams. When provided, + the destination will write data into this namespace (schema, database, etc.). + `custom_scenarios_file` is an optional path to a JSON or YAML file containing additional scenario definitions. Each scenario should have `name`, `json_schema`, and optionally `records` and `primary_key`. 
@@ -134,6 +168,9 @@ def get_smoke_test_source( existing = source_config.get("custom_scenarios", []) source_config["custom_scenarios"] = existing + file_scenarios + if namespace: + source_config["namespace"] = namespace + return get_source( name="source-smoke-test", config=source_config, @@ -157,6 +194,8 @@ def run_destination_smoke_test( *, destination: Destination, scenarios: str | list[str] = "fast", + namespace_suffix: str | None = None, + reuse_namespace: str | None = None, custom_scenarios: list[dict[str, Any]] | None = None, custom_scenarios_file: str | None = None, ) -> DestinationSmokeTestResult: @@ -177,13 +216,25 @@ def run_destination_smoke_test( - `'all'`: runs every scenario including `large_batch_stream`. - A comma-separated string or list of specific scenario names. + `namespace_suffix` is an optional suffix appended to the auto-generated + namespace (e.g. ``zz_deleteme_20260318_2256_mysuffix``). + + `reuse_namespace` is an exact namespace string to reuse from a previous + run. When set, no new namespace is generated and cleanup is skipped. + `custom_scenarios` is an optional list of scenario dicts to inject. `custom_scenarios_file` is an optional path to a JSON/YAML file with additional scenario definitions. """ + # Determine namespace + namespace = reuse_namespace or generate_namespace( + namespace_suffix=namespace_suffix, + ) + source_obj = get_smoke_test_source( scenarios=scenarios, + namespace=namespace, custom_scenarios=custom_scenarios, custom_scenarios_file=custom_scenarios_file, ) @@ -214,6 +265,7 @@ def run_destination_smoke_test( return DestinationSmokeTestResult( success=success, destination=destination.name, + namespace=namespace, records_delivered=records_delivered, scenarios_requested=scenarios_str, elapsed_seconds=round(elapsed, 2), diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index b7d0abd80..adbcec0e9 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -690,6 +690,26 @@ def sync( "and 'primary_key'. 
These are unioned with the predefined scenarios." ), ) +@click.option( + "--namespace-suffix", + type=str, + default=None, + help=( + "Optional suffix appended to the auto-generated namespace. " + "The namespace format is 'zz_deleteme_yyyymmdd_hhmm_{suffix}'. " + "Use this to distinguish concurrent runs." + ), +) +@click.option( + "--reuse-namespace", + type=str, + default=None, + help=( + "Exact namespace to reuse from a previous run. " + "When set, no new namespace is generated and cleanup is skipped. " + "Useful for running a second test against an already-populated namespace." + ), +) def destination_smoke_test( *, destination: str, @@ -698,6 +718,8 @@ def destination_smoke_test( use_python: str | None = None, scenarios: str = "fast", custom_scenarios: str | None = None, + namespace_suffix: str | None = None, + reuse_namespace: str | None = None, ) -> None: """Run smoke tests against a destination connector. @@ -723,6 +745,13 @@ def destination_smoke_test( `pyab destination-smoke-test --destination=destination-snowflake --config=./secrets/snowflake.json --scenarios=all` + + `pyab destination-smoke-test --destination=destination-snowflake + --config=./secrets/snowflake.json --namespace-suffix=run2` + + `pyab destination-smoke-test --destination=destination-snowflake + --config=./secrets/snowflake.json + --reuse-namespace=zz_deleteme_20260318_2256` """ click.echo("Resolving destination...", file=sys.stderr) destination_obj = _resolve_destination_job( @@ -736,6 +765,8 @@ def destination_smoke_test( result = run_destination_smoke_test( destination=destination_obj, scenarios=scenarios, + namespace_suffix=namespace_suffix, + reuse_namespace=reuse_namespace, custom_scenarios_file=custom_scenarios, ) diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 76a340877..12e50f23b 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -54,11 +54,13 @@ def 
_build_streams_from_scenarios( scenarios: list[dict[str, Any]], + namespace: str | None = None, ) -> list[AirbyteStream]: """Build AirbyteStream objects from scenario definitions.""" return [ AirbyteStream( name=scenario["name"], + namespace=namespace, json_schema=scenario["json_schema"], supported_sync_modes=[SyncMode.full_refresh], source_defined_cursor=False, @@ -174,6 +176,16 @@ def spec( "items": {"type": "string"}, "default": [], }, + "namespace": { + "type": ["string", "null"], + "title": "Namespace", + "description": ( + "Namespace (schema/database) to set on all " + "streams. When provided, the destination will " + "write data into this namespace." + ), + "default": None, + }, }, }, ) @@ -320,7 +332,8 @@ def discover( ) -> AirbyteCatalog: """Return the catalog with all test scenario streams.""" scenarios = self._get_all_scenarios(config) - streams = _build_streams_from_scenarios(scenarios) + namespace = config.get("namespace") + streams = _build_streams_from_scenarios(scenarios, namespace=namespace) logger.info(f"Discovered {len(streams)} smoke test streams.") return AirbyteCatalog(streams=streams) @@ -368,11 +381,13 @@ def read( records = get_scenario_records(scenario) logger.info(f"Emitting {len(records)} records for stream '{stream_name}'.") + namespace = config.get("namespace") for record in records: yield AirbyteMessage( type=Type.RECORD, record=AirbyteRecordMessage( stream=stream_name, + namespace=namespace, data=record, emitted_at=now_ms, ), diff --git a/airbyte/mcp/local.py b/airbyte/mcp/local.py index 4f8d23aac..765b11c28 100644 --- a/airbyte/mcp/local.py +++ b/airbyte/mcp/local.py @@ -812,7 +812,7 @@ def run_sql_query( @mcp_tool( destructive=True, ) -def destination_smoke_test( +def destination_smoke_test( # noqa: PLR0913, PLR0917 destination_connector_name: Annotated[ str, Field( @@ -879,6 +879,28 @@ def destination_smoke_test( default=None, ), ], + namespace_suffix: Annotated[ + str | None, + Field( + description=( + "Optional suffix appended 
to the auto-generated namespace. " + "The namespace format is 'zz_deleteme_yyyymmdd_hhmm_{suffix}'. " + "Use this to distinguish concurrent runs." + ), + default=None, + ), + ], + reuse_namespace: Annotated[ + str | None, + Field( + description=( + "Exact namespace to reuse from a previous run. " + "When set, no new namespace is generated and cleanup is skipped. " + "Useful for running a second test against an already-populated namespace." + ), + default=None, + ), + ], ) -> DestinationSmokeTestResult: """Run smoke tests against a destination connector. @@ -920,6 +942,8 @@ def destination_smoke_test( return run_destination_smoke_test( destination=destination_obj, scenarios=resolved_scenarios, + namespace_suffix=namespace_suffix, + reuse_namespace=reuse_namespace, custom_scenarios=custom_scenarios, ) From 6c9013c25ac98bdb6ad1011d70f4868bf6bfd150 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 23:15:35 +0000 Subject: [PATCH 2/4] fix: default namespace_suffix to 'smoke_test' when not provided Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 17 +++++++++++------ airbyte/cli/pyab.py | 2 +- airbyte/mcp/local.py | 2 +- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 984ce4790..22210ed2f 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -31,6 +31,9 @@ namespace is safe for automated cleanup. """ +DEFAULT_NAMESPACE_SUFFIX = "smoke_test" +"""Default suffix appended when no explicit suffix is provided.""" + if TYPE_CHECKING: from airbyte.destinations.base import Destination @@ -43,16 +46,17 @@ def generate_namespace( ) -> str: """Generate a smoke-test namespace. - Format: ``zz_deleteme_yyyymmdd_hhmm`` with an optional ``_<suffix>``. + Format: ``zz_deleteme_yyyymmdd_hhmm_<suffix>``. 
The ``zz_`` prefix sorts last alphabetically and the ``deleteme`` token acts as a guard for automated cleanup scripts. + + If *namespace_suffix* is not provided, ``smoke_test`` is used as the + default suffix. """ + suffix = namespace_suffix or DEFAULT_NAMESPACE_SUFFIX now = datetime.now(tz=timezone.utc) ts = now.strftime("%Y%m%d_%H%M") - ns = f"{NAMESPACE_PREFIX}_{ts}" - if namespace_suffix: - ns = f"{ns}_{namespace_suffix}" - return ns + return f"{NAMESPACE_PREFIX}_{ts}_{suffix}" class DestinationSmokeTestResult(BaseModel): @@ -217,7 +221,8 @@ def run_destination_smoke_test( - A comma-separated string or list of specific scenario names. `namespace_suffix` is an optional suffix appended to the auto-generated - namespace (e.g. ``zz_deleteme_20260318_2256_mysuffix``). + namespace. Defaults to ``smoke_test`` when not provided + (e.g. ``zz_deleteme_20260318_2256_smoke_test``). `reuse_namespace` is an exact namespace string to reuse from a previous run. When set, no new namespace is generated and cleanup is skipped. diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index adbcec0e9..be31a8759 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -696,7 +696,7 @@ def sync( default=None, help=( "Optional suffix appended to the auto-generated namespace. " - "The namespace format is 'zz_deleteme_yyyymmdd_hhmm_{suffix}'. " + "Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). " "Use this to distinguish concurrent runs." ), ) diff --git a/airbyte/mcp/local.py b/airbyte/mcp/local.py index 765b11c28..80e54bbea 100644 --- a/airbyte/mcp/local.py +++ b/airbyte/mcp/local.py @@ -884,7 +884,7 @@ def destination_smoke_test( # noqa: PLR0913, PLR0917 Field( description=( "Optional suffix appended to the auto-generated namespace. " - "The namespace format is 'zz_deleteme_yyyymmdd_hhmm_{suffix}'. " + "Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). " "Use this to distinguish concurrent runs." 
), default=None, From 2b7b2a8d659209d9361f54395ef18e642b8a1747 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 23:21:05 +0000 Subject: [PATCH 3/4] fix: include namespace in stream status TRACE messages Co-Authored-By: AJ Steers --- airbyte/cli/smoke_test_source/source.py | 27 ++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 12e50f23b..46abc56a8 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -341,6 +341,7 @@ def _stream_status_message( self, stream_name: str, status: AirbyteStreamStatus, + namespace: str | None = None, ) -> AirbyteMessage: """Build an AirbyteMessage containing a stream status trace.""" return AirbyteMessage( @@ -349,7 +350,10 @@ def _stream_status_message( type=TraceType.STREAM_STATUS, emitted_at=time.time() * 1000, stream_status=AirbyteStreamStatusTraceMessage( - stream_descriptor=StreamDescriptor(name=stream_name), + stream_descriptor=StreamDescriptor( + name=stream_name, + namespace=namespace, + ), status=status, ), ), @@ -368,6 +372,8 @@ def read( scenario_map = {s["name"]: s for s in scenarios} now_ms = int(time.time() * 1000) + namespace = config.get("namespace") + for stream_name in selected_streams: scenario = scenario_map.get(stream_name) if not scenario: @@ -375,13 +381,20 @@ def read( continue # Emit STARTED status - yield self._stream_status_message(stream_name, AirbyteStreamStatus.STARTED) - yield self._stream_status_message(stream_name, AirbyteStreamStatus.RUNNING) + yield self._stream_status_message( + stream_name, + AirbyteStreamStatus.STARTED, + namespace=namespace, + ) + yield self._stream_status_message( + stream_name, + AirbyteStreamStatus.RUNNING, + namespace=namespace, + ) records = get_scenario_records(scenario) logger.info(f"Emitting {len(records)} records for stream 
'{stream_name}'.") - namespace = config.get("namespace") for record in records: yield AirbyteMessage( type=Type.RECORD, @@ -394,4 +407,8 @@ def read( ) # Emit COMPLETE status - yield self._stream_status_message(stream_name, AirbyteStreamStatus.COMPLETE) + yield self._stream_status_message( + stream_name, + AirbyteStreamStatus.COMPLETE, + namespace=namespace, + ) From a5fff9e86e25690e7b3adb29be31f87f62d6553a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 23:31:32 +0000 Subject: [PATCH 4/4] fix: remove misleading 'cleanup is skipped' wording (cleanup not yet implemented) Co-Authored-By: AJ Steers --- airbyte/_util/destination_smoke_tests.py | 2 +- airbyte/cli/pyab.py | 2 +- airbyte/mcp/local.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 22210ed2f..67c87adbb 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -225,7 +225,7 @@ def run_destination_smoke_test( (e.g. ``zz_deleteme_20260318_2256_smoke_test``). `reuse_namespace` is an exact namespace string to reuse from a previous - run. When set, no new namespace is generated and cleanup is skipped. + run. When set, no new namespace is generated. `custom_scenarios` is an optional list of scenario dicts to inject. diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index be31a8759..f71ca5b5a 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -706,7 +706,7 @@ def sync( default=None, help=( "Exact namespace to reuse from a previous run. " - "When set, no new namespace is generated and cleanup is skipped. " + "When set, no new namespace is generated. " "Useful for running a second test against an already-populated namespace." 
), ) diff --git a/airbyte/mcp/local.py b/airbyte/mcp/local.py index 80e54bbea..7df085365 100644 --- a/airbyte/mcp/local.py +++ b/airbyte/mcp/local.py @@ -895,7 +895,7 @@ def destination_smoke_test( # noqa: PLR0913, PLR0917 Field( description=( "Exact namespace to reuse from a previous run. " - "When set, no new namespace is generated and cleanup is skipped. " + "When set, no new namespace is generated. " "Useful for running a second test against an already-populated namespace." ), default=None,