Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions airbyte/_util/destination_smoke_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from __future__ import annotations

import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any

Expand All @@ -23,11 +24,41 @@
from airbyte.exceptions import PyAirbyteInputError


NAMESPACE_PREFIX = "zz_deleteme"
"""Prefix for auto-generated smoke test namespaces.

The ``zz_`` prefix sorts last alphabetically; ``deleteme`` signals the
namespace is safe for automated cleanup.
"""

DEFAULT_NAMESPACE_SUFFIX = "smoke_test"
"""Default suffix appended when no explicit suffix is provided."""


if TYPE_CHECKING:
from airbyte.destinations.base import Destination
from airbyte.sources.base import Source


def generate_namespace(
*,
namespace_suffix: str | None = None,
) -> str:
"""Generate a smoke-test namespace.

Format: ``zz_deleteme_yyyymmdd_hhmm_<suffix>``.
The ``zz_`` prefix sorts last alphabetically and the ``deleteme``
token acts as a guard for automated cleanup scripts.

If *namespace_suffix* is not provided, ``smoke_test`` is used as the
default suffix.
"""
suffix = namespace_suffix or DEFAULT_NAMESPACE_SUFFIX
now = datetime.now(tz=timezone.utc)
ts = now.strftime("%Y%m%d_%H%M")
return f"{NAMESPACE_PREFIX}_{ts}_{suffix}"


class DestinationSmokeTestResult(BaseModel):
"""Result of a destination smoke test run."""

Expand All @@ -37,6 +68,9 @@ class DestinationSmokeTestResult(BaseModel):
destination: str
"""The destination connector name."""

namespace: str
"""The namespace used for this smoke test run."""

records_delivered: int
"""Total number of records delivered to the destination."""

Expand All @@ -53,6 +87,7 @@ class DestinationSmokeTestResult(BaseModel):
def get_smoke_test_source(
*,
scenarios: str | list[str] = "fast",
namespace: str | None = None,
custom_scenarios: list[dict[str, Any]] | None = None,
custom_scenarios_file: str | None = None,
) -> Source:
Expand All @@ -70,6 +105,9 @@ def get_smoke_test_source(

`custom_scenarios` is an optional list of scenario dicts to inject directly.

`namespace` is an optional namespace to set on all streams. When provided,
the destination will write data into this namespace (schema, database, etc.).

`custom_scenarios_file` is an optional path to a JSON or YAML file containing
additional scenario definitions. Each scenario should have `name`, `json_schema`,
and optionally `records` and `primary_key`.
Expand Down Expand Up @@ -134,6 +172,9 @@ def get_smoke_test_source(
existing = source_config.get("custom_scenarios", [])
source_config["custom_scenarios"] = existing + file_scenarios

if namespace:
source_config["namespace"] = namespace

return get_source(
name="source-smoke-test",
config=source_config,
Expand All @@ -157,6 +198,8 @@ def run_destination_smoke_test(
*,
destination: Destination,
scenarios: str | list[str] = "fast",
namespace_suffix: str | None = None,
reuse_namespace: str | None = None,
custom_scenarios: list[dict[str, Any]] | None = None,
custom_scenarios_file: str | None = None,
) -> DestinationSmokeTestResult:
Expand All @@ -177,13 +220,26 @@ def run_destination_smoke_test(
- `'all'`: runs every scenario including `large_batch_stream`.
- A comma-separated string or list of specific scenario names.

`namespace_suffix` is an optional suffix appended to the auto-generated
namespace. Defaults to ``smoke_test`` when not provided
(e.g. ``zz_deleteme_20260318_2256_smoke_test``).

`reuse_namespace` is an exact namespace string to reuse from a previous
run. When set, no new namespace is generated.

`custom_scenarios` is an optional list of scenario dicts to inject.

`custom_scenarios_file` is an optional path to a JSON/YAML file with
additional scenario definitions.
"""
# Determine namespace
namespace = reuse_namespace or generate_namespace(
namespace_suffix=namespace_suffix,
)

source_obj = get_smoke_test_source(
scenarios=scenarios,
namespace=namespace,
custom_scenarios=custom_scenarios,
custom_scenarios_file=custom_scenarios_file,
)
Expand Down Expand Up @@ -214,6 +270,7 @@ def run_destination_smoke_test(
return DestinationSmokeTestResult(
success=success,
destination=destination.name,
namespace=namespace,
records_delivered=records_delivered,
scenarios_requested=scenarios_str,
elapsed_seconds=round(elapsed, 2),
Expand Down
31 changes: 31 additions & 0 deletions airbyte/cli/pyab.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,26 @@ def sync(
"and 'primary_key'. These are unioned with the predefined scenarios."
),
)
@click.option(
"--namespace-suffix",
type=str,
default=None,
help=(
"Optional suffix appended to the auto-generated namespace. "
"Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). "
"Use this to distinguish concurrent runs."
),
)
@click.option(
"--reuse-namespace",
type=str,
default=None,
help=(
"Exact namespace to reuse from a previous run. "
"When set, no new namespace is generated. "
"Useful for running a second test against an already-populated namespace."
),
)
def destination_smoke_test(
*,
destination: str,
Expand All @@ -698,6 +718,8 @@ def destination_smoke_test(
use_python: str | None = None,
scenarios: str = "fast",
custom_scenarios: str | None = None,
namespace_suffix: str | None = None,
reuse_namespace: str | None = None,
) -> None:
"""Run smoke tests against a destination connector.

Expand All @@ -723,6 +745,13 @@ def destination_smoke_test(

`pyab destination-smoke-test --destination=destination-snowflake
--config=./secrets/snowflake.json --scenarios=all`

`pyab destination-smoke-test --destination=destination-snowflake
--config=./secrets/snowflake.json --namespace-suffix=run2`

`pyab destination-smoke-test --destination=destination-snowflake
--config=./secrets/snowflake.json
--reuse-namespace=zz_deleteme_20260318_2256`
"""
click.echo("Resolving destination...", file=sys.stderr)
destination_obj = _resolve_destination_job(
Expand All @@ -736,6 +765,8 @@ def destination_smoke_test(
result = run_destination_smoke_test(
destination=destination_obj,
scenarios=scenarios,
namespace_suffix=namespace_suffix,
reuse_namespace=reuse_namespace,
custom_scenarios_file=custom_scenarios,
)

Expand Down
42 changes: 37 additions & 5 deletions airbyte/cli/smoke_test_source/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@

def _build_streams_from_scenarios(
scenarios: list[dict[str, Any]],
namespace: str | None = None,
) -> list[AirbyteStream]:
"""Build AirbyteStream objects from scenario definitions."""
return [
AirbyteStream(
name=scenario["name"],
namespace=namespace,
json_schema=scenario["json_schema"],
supported_sync_modes=[SyncMode.full_refresh],
source_defined_cursor=False,
Expand Down Expand Up @@ -174,6 +176,16 @@ def spec(
"items": {"type": "string"},
"default": [],
},
"namespace": {
"type": ["string", "null"],
"title": "Namespace",
"description": (
"Namespace (schema/database) to set on all "
"streams. When provided, the destination will "
"write data into this namespace."
),
"default": None,
},
},
},
)
Expand Down Expand Up @@ -320,14 +332,16 @@ def discover(
) -> AirbyteCatalog:
"""Return the catalog with all test scenario streams."""
scenarios = self._get_all_scenarios(config)
streams = _build_streams_from_scenarios(scenarios)
namespace = config.get("namespace")
streams = _build_streams_from_scenarios(scenarios, namespace=namespace)
logger.info(f"Discovered {len(streams)} smoke test streams.")
return AirbyteCatalog(streams=streams)

def _stream_status_message(
self,
stream_name: str,
status: AirbyteStreamStatus,
namespace: str | None = None,
) -> AirbyteMessage:
"""Build an AirbyteMessage containing a stream status trace."""
return AirbyteMessage(
Expand All @@ -336,7 +350,10 @@ def _stream_status_message(
type=TraceType.STREAM_STATUS,
emitted_at=time.time() * 1000,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(name=stream_name),
stream_descriptor=StreamDescriptor(
name=stream_name,
namespace=namespace,
),
status=status,
),
),
Expand All @@ -355,15 +372,25 @@ def read(
scenario_map = {s["name"]: s for s in scenarios}
now_ms = int(time.time() * 1000)

namespace = config.get("namespace")

for stream_name in selected_streams:
scenario = scenario_map.get(stream_name)
if not scenario:
logger.warning(f"Stream '{stream_name}' not found in scenarios, skipping.")
continue

# Emit STARTED status
yield self._stream_status_message(stream_name, AirbyteStreamStatus.STARTED)
yield self._stream_status_message(stream_name, AirbyteStreamStatus.RUNNING)
yield self._stream_status_message(
stream_name,
AirbyteStreamStatus.STARTED,
namespace=namespace,
)
yield self._stream_status_message(
stream_name,
AirbyteStreamStatus.RUNNING,
namespace=namespace,
)

records = get_scenario_records(scenario)
logger.info(f"Emitting {len(records)} records for stream '{stream_name}'.")
Expand All @@ -373,10 +400,15 @@ def read(
type=Type.RECORD,
record=AirbyteRecordMessage(
stream=stream_name,
namespace=namespace,
data=record,
emitted_at=now_ms,
),
)

# Emit COMPLETE status
yield self._stream_status_message(stream_name, AirbyteStreamStatus.COMPLETE)
yield self._stream_status_message(
stream_name,
AirbyteStreamStatus.COMPLETE,
namespace=namespace,
)
26 changes: 25 additions & 1 deletion airbyte/mcp/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ def run_sql_query(
@mcp_tool(
destructive=True,
)
def destination_smoke_test(
def destination_smoke_test( # noqa: PLR0913, PLR0917
destination_connector_name: Annotated[
str,
Field(
Expand Down Expand Up @@ -879,6 +879,28 @@ def destination_smoke_test(
default=None,
),
],
namespace_suffix: Annotated[
str | None,
Field(
description=(
"Optional suffix appended to the auto-generated namespace. "
"Defaults to 'smoke_test' (format: 'zz_deleteme_yyyymmdd_hhmm_{suffix}'). "
"Use this to distinguish concurrent runs."
),
default=None,
),
],
reuse_namespace: Annotated[
str | None,
Field(
description=(
"Exact namespace to reuse from a previous run. "
"When set, no new namespace is generated. "
"Useful for running a second test against an already-populated namespace."
),
default=None,
),
],
) -> DestinationSmokeTestResult:
"""Run smoke tests against a destination connector.

Expand Down Expand Up @@ -920,6 +942,8 @@ def destination_smoke_test(
return run_destination_smoke_test(
destination=destination_obj,
scenarios=resolved_scenarios,
namespace_suffix=namespace_suffix,
reuse_namespace=reuse_namespace,
custom_scenarios=custom_scenarios,
)

Expand Down
Loading