From 171f16025d66aa809026924fbfac170c3b1aa370 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 07:54:47 +0000 Subject: [PATCH 1/9] feat(cli): extract smoke-test source as dedicated cli submodule Co-Authored-By: AJ Steers --- airbyte/cli/__init__.py | 2 + airbyte/cli/smoke_test_source/__init__.py | 18 + airbyte/cli/smoke_test_source/_scenarios.py | 490 ++++++++++++++++++++ airbyte/cli/smoke_test_source/run.py | 17 + airbyte/cli/smoke_test_source/source.py | 316 +++++++++++++ pyproject.toml | 1 + 6 files changed, 844 insertions(+) create mode 100644 airbyte/cli/__init__.py create mode 100644 airbyte/cli/smoke_test_source/__init__.py create mode 100644 airbyte/cli/smoke_test_source/_scenarios.py create mode 100644 airbyte/cli/smoke_test_source/run.py create mode 100644 airbyte/cli/smoke_test_source/source.py diff --git a/airbyte/cli/__init__.py b/airbyte/cli/__init__.py new file mode 100644 index 000000000..e6423c8b2 --- /dev/null +++ b/airbyte/cli/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""CLI modules for PyAirbyte.""" diff --git a/airbyte/cli/smoke_test_source/__init__.py b/airbyte/cli/smoke_test_source/__init__.py new file mode 100644 index 000000000..4b26a03f3 --- /dev/null +++ b/airbyte/cli/smoke_test_source/__init__.py @@ -0,0 +1,18 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Smoke test source for destination regression testing. + +This module provides a synthetic data source that generates test data +covering common edge cases that break destinations: type variations, +null handling, naming edge cases, schema variations, and batch sizes. + +.. warning:: + This module is experimental and subject to change without notice. + The APIs and behavior may be modified or removed in future versions. +""" + +from airbyte.cli.smoke_test_source.source import SourceSmokeTest + + +__all__ = [ + "SourceSmokeTest", +] diff --git a/airbyte/cli/smoke_test_source/_scenarios.py b/airbyte/cli/smoke_test_source/_scenarios.py new file mode 100644 index 000000000..e0ec084ef --- /dev/null +++ b/airbyte/cli/smoke_test_source/_scenarios.py @@ -0,0 +1,490 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Predefined smoke test scenarios for destination regression testing. + +Each scenario defines a stream name, JSON schema, optional primary key, +and either inline records or a record generator reference. +""" + +from __future__ import annotations + +import math +from typing import Any + + +_DEFAULT_LARGE_BATCH_COUNT = 1000 + +HIGH_VOLUME_SCENARIO_NAMES: set[str] = { + "large_batch_stream", +} + +PREDEFINED_SCENARIOS: list[dict[str, Any]] = [ + { + "name": "basic_types", + "description": "Covers fundamental column types: string, integer, number, boolean.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "amount": {"type": "number"}, + "is_active": {"type": "boolean"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "name": "Alice", "amount": 100.50, "is_active": True}, + {"id": 2, "name": "Bob", "amount": 0.0, "is_active": False}, + {"id": 3, "name": "", "amount": -99.99, "is_active": True}, + ], + }, + { + "name": "timestamp_types", + "description": "Covers date and timestamp formats including ISO 8601 variations.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "created_date": {"type": "string", "format": "date"}, + "updated_at": {"type": "string", "format": "date-time"}, + "epoch_seconds": {"type": "integer"}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "created_date": "2024-01-15", + "updated_at": "2024-01-15T10:30:00Z", + "epoch_seconds": 1705312200, + }, + { + "id": 2, + "created_date": "1970-01-01", + "updated_at": "1970-01-01T00:00:00+00:00", + "epoch_seconds": 0, + }, + { + "id": 3, + "created_date": "2099-12-31", + "updated_at": "2099-12-31T23:59:59.999999Z", + "epoch_seconds": 4102444799, + }, + ], + }, + { + "name": "large_decimals_and_numbers", + "description": ( + "Tests handling of very large numbers, " "high precision decimals, and boundary values." + ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "big_integer": {"type": "integer"}, + "precise_decimal": {"type": "number"}, + "small_decimal": {"type": "number"}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "big_integer": 9999999999999999, + "precise_decimal": math.pi, + "small_decimal": 0.000001, + }, + { + "id": 2, + "big_integer": -9999999999999999, + "precise_decimal": -0.1, + "small_decimal": 1e-10, + }, + { + "id": 3, + "big_integer": 0, + "precise_decimal": 99999999.99999999, + "small_decimal": 0.0, + }, + ], + }, + { + "name": "nested_json_objects", + "description": "Tests nested object and array handling in destination columns.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "metadata": { + "type": "object", + "properties": { + "source": {"type": "string"}, + "tags": {"type": "array", "items": {"type": "string"}}, + }, + }, + "nested_deep": { + "type": "object", + "properties": { + "level1": { + "type": "object", + "properties": { + "level2": { + "type": "object", + "properties": { + "value": {"type": "string"}, + }, + }, + }, + }, + }, + }, + "items_array": { + "type": "array", + "items": { + "type": "object", + "properties": { + "sku": {"type": "string"}, + "qty": {"type": "integer"}, + }, + }, + }, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "metadata": {"source": "api", "tags": ["a", "b", "c"]}, + "nested_deep": {"level1": {"level2": {"value": "deep"}}}, + "items_array": [{"sku": "ABC", "qty": 10}], + }, + { + "id": 2, + "metadata": {"source": "manual", "tags": []}, + "nested_deep": {"level1": {"level2": {"value": ""}}}, + "items_array": [], + }, + ], + }, + { + "name": "null_handling", + "description": "Tests null values across all column types and patterns.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "nullable_string": {"type": ["null", "string"]}, + "nullable_integer": {"type": ["null", "integer"]}, + "nullable_number": {"type": ["null", "number"]}, + "nullable_boolean": {"type": ["null", "boolean"]}, + "nullable_object": { + "type": ["null", "object"], + "properties": {"key": {"type": "string"}}, + }, + "always_null": {"type": ["null", "string"]}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "nullable_string": "present", + "nullable_integer": 42, + "nullable_number": math.pi, + "nullable_boolean": True, + "nullable_object": {"key": "val"}, + "always_null": None, + }, + { + "id": 2, + "nullable_string": None, + "nullable_integer": None, + "nullable_number": None, + "nullable_boolean": None, + "nullable_object": None, + "always_null": None, + }, + { + "id": 3, + "nullable_string": "", + "nullable_integer": 0, + "nullable_number": 0.0, + "nullable_boolean": False, + "nullable_object": {}, + "always_null": None, + }, + ], + }, + { + "name": "column_naming_edge_cases", + "description": ("Tests special characters, casing, " "and reserved words in column names."), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "CamelCaseColumn": {"type": "string"}, + "ALLCAPS": {"type": "string"}, + "snake_case_column": {"type": "string"}, + "column-with-dashes": {"type": "string"}, + "column.with.dots": {"type": "string"}, + "column with spaces": {"type": "string"}, + "select": {"type": "string"}, + "from": {"type": "string"}, + "order": {"type": "string"}, + "group": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "CamelCaseColumn": "camel", + "ALLCAPS": "caps", + "snake_case_column": "snake", + "column-with-dashes": "dashes", + "column.with.dots": "dots", + "column with spaces": "spaces", + "select": "reserved_select", + "from": "reserved_from", + "order": "reserved_order", + "group": "reserved_group", + }, + ], + }, + { + "name": "table_naming_edge_cases", + "description": ("Stream with special characters in the name " "to test table naming."), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "value": "table_name_test"}, + ], + }, + { + "name": "CamelCaseStreamName", + "description": "Stream with CamelCase name to test case handling.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "value": "camel_case_stream_test"}, + ], + }, + { + "name": "wide_table_50_columns", + "description": "Tests a wide table with 50 columns.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + **{f"col_{i:03d}": {"type": ["null", "string"]} for i in range(1, 50)}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + **{f"col_{i:03d}": f"val_{i}" for i in range(1, 50)}, + }, + { + "id": 2, + **{f"col_{i:03d}": None for i in range(1, 50)}, + }, + ], + }, + { + "name": "empty_stream", + "description": ("A stream that emits zero records, " "testing empty dataset handling."), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [], + }, + { + "name": "single_record_stream", + "description": "A stream with exactly one record.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "value": "only_record"}, + ], + }, + { + "name": "large_batch_stream", + "description": ( + "A stream that generates a configurable " "number of records for batch testing." + ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "value": {"type": "number"}, + "category": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "record_count": _DEFAULT_LARGE_BATCH_COUNT, + "record_generator": "large_batch", + "high_volume": True, + }, + { + "name": "unicode_and_special_strings", + "description": ( + "Tests unicode characters, emoji, escape " "sequences, and special string values." + ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "unicode_text": {"type": "string"}, + "special_chars": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "unicode_text": "Hello World", + "special_chars": "line1\nline2\ttab", + }, + { + "id": 2, + "unicode_text": "Caf\u00e9 na\u00efve r\u00e9sum\u00e9", + "special_chars": 'quote"inside', + }, + { + "id": 3, + "unicode_text": "\u4f60\u597d\u4e16\u754c", + "special_chars": "back\\slash", + }, + { + "id": 4, + "unicode_text": "\u0410\u0411\u0412\u0413", + "special_chars": "", + }, + ], + }, + { + "name": "schema_with_no_primary_key", + "description": ("A stream without a primary key, " "testing append-only behavior."), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "event_id": {"type": "string"}, + "event_type": {"type": "string"}, + "payload": {"type": "string"}, + }, + }, + "primary_key": None, + "records": [ + {"event_id": "evt_001", "event_type": "click", "payload": "{}"}, + {"event_id": "evt_001", "event_type": "click", "payload": "{}"}, + { + "event_id": "evt_002", + "event_type": "view", + "payload": '{"page": "home"}', + }, + ], + }, + { + "name": "long_column_names", + "description": ( + "Tests handling of very long column names " "that may exceed database limits." + ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "a_very_long_column_name_that_exceeds" + "_typical_database_limits_and_should_be" + "_truncated_or_handled_gracefully_by" + "_the_destination": { + "type": "string", + }, + "another_extremely_verbose_column_name" + "_designed_to_test_the_absolute_maximum" + "_length_that_any_reasonable_database" + "_would_support": { + "type": "string", + }, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "a_very_long_column_name_that_exceeds" + "_typical_database_limits_and_should_be" + "_truncated_or_handled_gracefully_by" + "_the_destination": "long_col_1", + "another_extremely_verbose_column_name" + "_designed_to_test_the_absolute_maximum" + "_length_that_any_reasonable_database" + "_would_support": "long_col_2", + }, + ], + }, +] + + +def generate_large_batch_records( + scenario: dict[str, Any], +) -> list[dict[str, Any]]: + """Generate records for the large_batch_stream scenario.""" + count = scenario.get("record_count", _DEFAULT_LARGE_BATCH_COUNT) + categories = ["cat_a", "cat_b", "cat_c", "cat_d", "cat_e"] + return [ + { + "id": i, + "name": f"record_{i:06d}", + "value": float(i) * 1.1, + "category": categories[i % len(categories)], + } + for i in range(1, count + 1) + ] + + +def get_scenario_records( + scenario: dict[str, Any], +) -> list[dict[str, Any]]: + """Get records for a scenario, using generator if specified.""" + if scenario.get("record_generator") == "large_batch": + return generate_large_batch_records(scenario) + return scenario.get("records", []) diff --git a/airbyte/cli/smoke_test_source/run.py b/airbyte/cli/smoke_test_source/run.py new file mode 100644 index 000000000..299e254bd --- /dev/null +++ b/airbyte/cli/smoke_test_source/run.py @@ -0,0 +1,17 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Entry point for the Smoke Test source.""" + +import sys + +from airbyte_cdk.entrypoint import launch + +from airbyte.cli.smoke_test_source.source import SourceSmokeTest + + +def run() -> None: + """Run the smoke test source.""" + launch(SourceSmokeTest(), sys.argv[1:]) + + +if __name__ == "__main__": + run() diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py new file mode 100644 index 000000000..0d4b1cb99 --- /dev/null +++ b/airbyte/cli/smoke_test_source/source.py @@ -0,0 +1,316 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Smoke test source for destination regression testing. + +This source generates synthetic test data covering common edge cases +that break destinations: type variations, null handling, naming edge cases, +schema variations, and batch size variations. + +Predefined scenarios are always available. Additional scenarios can be +injected dynamically via the ``custom_scenarios`` config field. + +.. warning:: + This module is experimental and subject to change without notice. + The APIs and behavior may be modified or removed in future versions. +""" + +from __future__ import annotations + +import logging +import time +from typing import TYPE_CHECKING, Any + +from airbyte_cdk.models import ( + AirbyteCatalog, + AirbyteConnectionStatus, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConnectorSpecification, + Status, + SyncMode, + Type, +) +from airbyte_cdk.sources.source import Source + +from airbyte.cli.smoke_test_source._scenarios import ( + _DEFAULT_LARGE_BATCH_COUNT, + PREDEFINED_SCENARIOS, + get_scenario_records, +) + + +if TYPE_CHECKING: + from collections.abc import Iterable, Mapping + + +logger = logging.getLogger("airbyte") + + +def _build_streams_from_scenarios( + scenarios: list[dict[str, Any]], +) -> list[AirbyteStream]: + """Build AirbyteStream objects from scenario definitions.""" + return [ + AirbyteStream( + name=scenario["name"], + json_schema=scenario["json_schema"], + supported_sync_modes=[SyncMode.full_refresh], + source_defined_cursor=False, + source_defined_primary_key=scenario.get("primary_key"), + ) + for scenario in scenarios + ] + + +class SourceSmokeTest(Source): + """Smoke test source for destination regression testing. + + Generates synthetic data across predefined scenarios that cover + common destination failure patterns. Supports dynamic injection + of additional scenarios via the ``custom_scenarios`` config field. + """ + + def spec( + self, + logger: logging.Logger, # noqa: ARG002 + ) -> ConnectorSpecification: + """Return the connector specification.""" + return ConnectorSpecification( + documentationUrl="https://docs.airbyte.com/integrations/sources/smoke-test", + connectionSpecification={ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Smoke Test Source Spec", + "type": "object", + "required": [], + "properties": { + "custom_scenarios": { + "type": "array", + "title": "Custom Test Scenarios", + "description": ( + "Additional test scenarios to inject " + "at runtime. Each scenario defines a " + "stream name, JSON schema, and records." + ), + "items": { + "type": "object", + "required": [ + "name", + "json_schema", + "records", + ], + "properties": { + "name": { + "type": "string", + "description": "Stream name for this scenario.", + }, + "description": { + "type": "string", + "description": ("Human-readable description of this scenario."), + }, + "json_schema": { + "type": "object", + "description": "JSON schema for the stream.", + }, + "records": { + "type": "array", + "description": "Records to emit for this stream.", + "items": {"type": "object"}, + }, + "primary_key": { + "type": ["array", "null"], + "description": ( + "Primary key definition (list of key paths) or null." + ), + "items": { + "type": "array", + "items": {"type": "string"}, + }, + }, + }, + }, + "default": [], + }, + "large_batch_record_count": { + "type": "integer", + "title": "Large Batch Record Count", + "description": ( + "Number of records to generate for " + "the large_batch_stream scenario. " + "Set to 0 to skip this stream." + ), + "default": 1000, + }, + "all_fast_streams": { + "type": "boolean", + "title": "All Fast Streams", + "description": ("Include all fast (non-high-volume) predefined streams."), + "default": True, + }, + "all_slow_streams": { + "type": "boolean", + "title": "All Slow Streams", + "description": ( + "Include all slow (high-volume) streams " + "such as large_batch_stream. These are " + "excluded by default to avoid incurring " + "the cost of large record sets." + ), + "default": False, + }, + "scenario_filter": { + "type": "array", + "title": "Scenario Filter", + "description": ( + "Specific scenario names to include. " + "These are unioned with the boolean-driven " + "sets (deduped). If omitted or empty, " + "only the boolean flags control selection." + ), + "items": {"type": "string"}, + "default": [], + }, + }, + }, + ) + + def _get_all_scenarios( + self, + config: Mapping[str, Any], + ) -> list[dict[str, Any]]: + """Combine predefined and custom scenarios. + + Selection logic: + 1. Boolean flags control groups: ``all_fast_streams`` + (default true) enables non-high-volume scenarios, + ``all_slow_streams`` (default false) enables + high-volume scenarios. + 2. ``scenario_filter`` names are unioned with the boolean sets. + 3. Custom scenarios are always included. + 4. The final list is deduplicated by name. + """ + include_default = config.get("all_fast_streams", True) + include_high_volume = config.get("all_slow_streams", False) + scenario_filter: list[str] = config.get("scenario_filter", []) + explicit_names: set[str] = set(scenario_filter) + + large_batch_count = config.get( + "large_batch_record_count", + _DEFAULT_LARGE_BATCH_COUNT, + ) + + scenarios: list[dict[str, Any]] = [] + seen_names: set[str] = set() + + for scenario in PREDEFINED_SCENARIOS: + name = scenario["name"] + is_high_volume = scenario.get("high_volume", False) + + included_by_flag = (include_high_volume and is_high_volume) or ( + include_default and not is_high_volume + ) + if not included_by_flag and name not in explicit_names: + continue + + s = dict(scenario) + if name == "large_batch_stream" and large_batch_count != _DEFAULT_LARGE_BATCH_COUNT: + s["record_count"] = large_batch_count + + if name not in seen_names: + scenarios.append(s) + seen_names.add(name) + + custom = config.get("custom_scenarios", []) + if custom: + for cs in custom: + name = cs.get("name", "") + if not name or not cs.get("json_schema"): + continue + if name not in seen_names: + scenarios.append( + { + "name": name, + "description": cs.get( + "description", + "Custom injected scenario", + ), + "json_schema": cs["json_schema"], + "primary_key": cs.get("primary_key"), + "records": cs.get("records", []), + } + ) + seen_names.add(name) + + return scenarios + + def check( + self, + logger: logging.Logger, + config: Mapping[str, Any], + ) -> AirbyteConnectionStatus: + """Validate the configuration.""" + custom = config.get("custom_scenarios", []) + for i, scenario in enumerate(custom): + if not scenario.get("name"): + return AirbyteConnectionStatus( + status=Status.FAILED, + message=f"Custom scenario at index {i} is missing 'name'.", + ) + if not scenario.get("json_schema"): + return AirbyteConnectionStatus( + status=Status.FAILED, + message=(f"Custom scenario '{scenario['name']}' is missing 'json_schema'."), + ) + + scenarios = self._get_all_scenarios(config) + if not scenarios: + return AirbyteConnectionStatus( + status=Status.FAILED, + message="No scenarios available. Check scenario_filter config.", + ) + + logger.info(f"Smoke test source check passed with {len(scenarios)} scenarios.") + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + + def discover( + self, + logger: logging.Logger, + config: Mapping[str, Any], + ) -> AirbyteCatalog: + """Return the catalog with all test scenario streams.""" + scenarios = self._get_all_scenarios(config) + streams = _build_streams_from_scenarios(scenarios) + logger.info(f"Discovered {len(streams)} smoke test streams.") + return AirbyteCatalog(streams=streams) + + def read( + self, + logger: logging.Logger, + config: Mapping[str, Any], + catalog: ConfiguredAirbyteCatalog, + state: list[Any] | None = None, # noqa: ARG002 + ) -> Iterable[AirbyteMessage]: + """Read records from selected smoke test streams.""" + selected_streams = {stream.stream.name for stream in catalog.streams} + scenarios = self._get_all_scenarios(config) + scenario_map = {s["name"]: s for s in scenarios} + now_ms = int(time.time() * 1000) + + for stream_name in selected_streams: + scenario = scenario_map.get(stream_name) + if not scenario: + logger.warning(f"Stream '{stream_name}' not found in scenarios, skipping.") + continue + + records = get_scenario_records(scenario) + logger.info(f"Emitting {len(records)} records for stream '{stream_name}'.") + + for record in records: + yield AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + stream=stream_name, + data=record, + emitted_at=now_ms, + ), + ) diff --git a/pyproject.toml b/pyproject.toml index 4b6fa4e16..27c3343eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,6 +80,7 @@ dev = [ pyairbyte = "airbyte.cli:cli" pyab = "airbyte.cli:cli" airbyte-mcp = "airbyte.mcp.server:main" +source-smoke-test = "airbyte.cli.smoke_test_source.run:run" [build-system] requires = ["hatchling", "uv-dynamic-versioning"] From eb1f8c4a970c12abf0b9c8e686ee012b60b4bfc5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 08:01:59 +0000 Subject: [PATCH 2/9] fix: make records optional in custom_scenarios spec schema Co-Authored-By: AJ Steers --- airbyte/cli/smoke_test_source/source.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 0d4b1cb99..681d103bc 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -97,7 +97,6 @@ def spec( "required": [ "name", "json_schema", - "records", ], "properties": { "name": { From 4ba67fad5b1dc2fb1180cb9e595d4a7f68134be4 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 08:07:17 +0000 Subject: [PATCH 3/9] fix: update large_batch_record_count description to match behavior Co-Authored-By: AJ Steers --- airbyte/cli/smoke_test_source/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 681d103bc..3bcddb305 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -136,7 +136,7 @@ def spec( "description": ( "Number of records to generate for " "the large_batch_stream scenario. " - "Set to 0 to skip this stream." + "Set to 0 to emit no records for this stream." ), "default": 1000, }, From 24dd46238830b2878647302f7d18f5ff35ef6f76 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 08:15:47 +0000 Subject: [PATCH 4/9] fix: move cli.py to cli/pyab.py to resolve package shadowing and add config type validation Co-Authored-By: AJ Steers --- airbyte/cli/__init__.py | 5 +++++ airbyte/{cli.py => cli/pyab.py} | 2 +- airbyte/cli/smoke_test_source/source.py | 25 ++++++++++++++++++++++--- pyproject.toml | 4 ++-- 4 files changed, 30 insertions(+), 6 deletions(-) rename airbyte/{cli.py => cli/pyab.py} (99%) diff --git a/airbyte/cli/__init__.py b/airbyte/cli/__init__.py index e6423c8b2..aa98d792d 100644 --- a/airbyte/cli/__init__.py +++ b/airbyte/cli/__init__.py @@ -1,2 +1,7 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. """CLI modules for PyAirbyte.""" + +from airbyte.cli.pyab import cli + + +__all__ = ["cli"] diff --git a/airbyte/cli.py b/airbyte/cli/pyab.py similarity index 99% rename from airbyte/cli.py rename to airbyte/cli/pyab.py index 22c157344..fb99bc53c 100644 --- a/airbyte/cli.py +++ b/airbyte/cli/pyab.py @@ -170,7 +170,7 @@ def _is_executable_path(connector_str: str) -> bool: def _get_connector_name(connector: str) -> str: if _is_docker_image(connector): - return connector.split(":")[0].split("/")[-1] + return connector.split(":", maxsplit=1)[0].rsplit("/", maxsplit=1)[-1] return connector diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 3bcddb305..2cfd4090b 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -190,7 +190,12 @@ def _get_all_scenarios( """ include_default = config.get("all_fast_streams", True) include_high_volume = config.get("all_slow_streams", False) - scenario_filter: list[str] = config.get("scenario_filter", []) + raw_scenario_filter = config.get("scenario_filter", []) + scenario_filter: list[str] = ( + [name for name in raw_scenario_filter if isinstance(name, str)] + if isinstance(raw_scenario_filter, list) + else [] + ) explicit_names: set[str] = set(scenario_filter) large_batch_count = config.get( @@ -219,9 +224,12 @@ def _get_all_scenarios( scenarios.append(s) seen_names.add(name) - custom = config.get("custom_scenarios", []) + raw_custom = config.get("custom_scenarios", []) + custom = raw_custom if isinstance(raw_custom, list) else [] if custom: for cs in custom: + if not isinstance(cs, dict): + continue name = cs.get("name", "") if not name or not cs.get("json_schema"): continue @@ -248,8 +256,19 @@ def check( config: Mapping[str, Any], ) -> AirbyteConnectionStatus: """Validate the configuration.""" - custom = config.get("custom_scenarios", []) + raw_custom = config.get("custom_scenarios", []) + if not isinstance(raw_custom, list): + return AirbyteConnectionStatus( + status=Status.FAILED, + message="'custom_scenarios' must be an array of objects.", + ) + custom = raw_custom for i, scenario in enumerate(custom): + if not isinstance(scenario, dict): + return AirbyteConnectionStatus( + status=Status.FAILED, + message=f"Custom scenario at index {i} must be an object.", + ) if not scenario.get("name"): return AirbyteConnectionStatus( status=Status.FAILED, diff --git a/pyproject.toml b/pyproject.toml index 27c3343eb..0ab95fe8c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,8 +77,8 @@ dev = [ ] [project.scripts] -pyairbyte = "airbyte.cli:cli" -pyab = "airbyte.cli:cli" +pyairbyte = "airbyte.cli.pyab:cli" +pyab = "airbyte.cli.pyab:cli" airbyte-mcp = "airbyte.mcp.server:main" source-smoke-test = "airbyte.cli.smoke_test_source.run:run" From 2bf78614932f7617f664c0b29fccf5733cf87a72 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 08:20:14 +0000 Subject: [PATCH 5/9] fix: add __main__.py to restore python -m airbyte.cli invocation Co-Authored-By: AJ Steers --- airbyte/cli/__main__.py | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 airbyte/cli/__main__.py diff --git a/airbyte/cli/__main__.py b/airbyte/cli/__main__.py new file mode 100644 index 000000000..e002b2212 --- /dev/null +++ b/airbyte/cli/__main__.py @@ -0,0 +1,8 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Allow running the CLI via python -m airbyte.cli.""" + +from airbyte.cli.pyab import cli + + +if __name__ == "__main__": + cli() From b6ab7102372670462a0d816045d38aa9f8c12331 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 08:24:32 +0000 Subject: [PATCH 6/9] fix: update docs/generate.py to reference new cli/pyab.py path Co-Authored-By: AJ Steers --- docs/generate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/generate.py b/docs/generate.py index a659034ec..a3bfa91b9 100755 --- a/docs/generate.py +++ b/docs/generate.py @@ -18,7 +18,7 @@ def run() -> None: """Generate docs for all public modules in PyAirbyte and save them to docs/generated.""" - public_modules = ["airbyte", "airbyte/cli.py"] + public_modules = ["airbyte", "airbyte/cli/pyab.py"] # recursively delete the docs/generated folder if it exists if pathlib.Path("docs/generated").exists(): From 5a39c797d1065b009696dc2fc6f3be696a6c3d1c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 08:33:56 +0000 Subject: [PATCH 7/9] fix: remove __main__.py and python -m airbyte.cli reference per review Co-Authored-By: AJ Steers --- airbyte/cli/__main__.py | 8 -------- airbyte/cli/pyab.py | 1 - 2 files changed, 9 deletions(-) delete mode 100644 airbyte/cli/__main__.py diff --git a/airbyte/cli/__main__.py b/airbyte/cli/__main__.py deleted file mode 100644 index e002b2212..000000000 --- a/airbyte/cli/__main__.py +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright (c) 2024 Airbyte, Inc., all rights reserved. -"""Allow running the CLI via python -m airbyte.cli.""" - -from airbyte.cli.pyab import cli - - -if __name__ == "__main__": - cli() diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index fb99bc53c..0fa5f5d5c 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -9,7 +9,6 @@ These are equivalent: ```bash -python -m airbyte.cli --help pyairbyte --help pyab --help ``` From 660e438e73cd60556d0d593550af34fa3784f4db Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 18:07:46 +0000 Subject: [PATCH 8/9] fix: harden check() validation for json_schema and records types Co-Authored-By: AJ Steers --- airbyte/cli/smoke_test_source/source.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index 2cfd4090b..ab91622b8 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -274,11 +274,32 @@ def check( status=Status.FAILED, message=f"Custom scenario at index {i} is missing 'name'.", ) - if not scenario.get("json_schema"): + if not isinstance(scenario.get("json_schema"), dict): return AirbyteConnectionStatus( status=Status.FAILED, - message=(f"Custom scenario '{scenario['name']}' is missing 'json_schema'."), + message=( + f"Custom scenario '{scenario['name']}' must provide " + "'json_schema' as an object." + ), ) + if "records" in scenario: + if not isinstance(scenario["records"], list): + return AirbyteConnectionStatus( + status=Status.FAILED, + message=( + f"Custom scenario '{scenario['name']}' has invalid 'records': " + "expected an array of objects." + ), + ) + for j, record in enumerate(scenario["records"]): + if not isinstance(record, dict): + return AirbyteConnectionStatus( + status=Status.FAILED, + message=( + f"Custom scenario '{scenario['name']}' record at index {j} " + "must be an object." + ), + ) scenarios = self._get_all_scenarios(config) if not scenarios: From 3052f253716201effe64f3c7f23a3a510917b9e3 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 18:09:47 +0000 Subject: [PATCH 9/9] fix: extract _validate_custom_scenarios to fix PLR0911 (too many returns) Co-Authored-By: AJ Steers --- airbyte/cli/smoke_test_source/source.py | 74 ++++++++++++------------- 1 file changed, 36 insertions(+), 38 deletions(-) diff --git a/airbyte/cli/smoke_test_source/source.py b/airbyte/cli/smoke_test_source/source.py index ab91622b8..5ed3f2a75 100644 --- a/airbyte/cli/smoke_test_source/source.py +++ b/airbyte/cli/smoke_test_source/source.py @@ -250,6 +250,35 @@ def _get_all_scenarios( return scenarios + @staticmethod + def _validate_custom_scenarios( + scenarios: list[Any], + ) -> str | None: + """Validate custom scenario entries, returning an error message or None.""" + for i, scenario in enumerate(scenarios): + if not isinstance(scenario, dict): + return f"Custom scenario at index {i} must be an object." + if not scenario.get("name"): + return f"Custom scenario at index {i} is missing 'name'." + if not isinstance(scenario.get("json_schema"), dict): + return ( + f"Custom scenario '{scenario['name']}' must provide " + "'json_schema' as an object." + ) + if "records" in scenario: + if not isinstance(scenario["records"], list): + return ( + f"Custom scenario '{scenario['name']}' has invalid 'records': " + "expected an array of objects." + ) + for j, record in enumerate(scenario["records"]): + if not isinstance(record, dict): + return ( + f"Custom scenario '{scenario['name']}' record at index {j} " + "must be an object." + ) + return None + def check( self, logger: logging.Logger, @@ -262,44 +291,13 @@ def check( status=Status.FAILED, message="'custom_scenarios' must be an array of objects.", ) - custom = raw_custom - for i, scenario in enumerate(custom): - if not isinstance(scenario, dict): - return AirbyteConnectionStatus( - status=Status.FAILED, - message=f"Custom scenario at index {i} must be an object.", - ) - if not scenario.get("name"): - return AirbyteConnectionStatus( - status=Status.FAILED, - message=f"Custom scenario at index {i} is missing 'name'.", - ) - if not isinstance(scenario.get("json_schema"), dict): - return AirbyteConnectionStatus( - status=Status.FAILED, - message=( - f"Custom scenario '{scenario['name']}' must provide " - "'json_schema' as an object." - ), - ) - if "records" in scenario: - if not isinstance(scenario["records"], list): - return AirbyteConnectionStatus( - status=Status.FAILED, - message=( - f"Custom scenario '{scenario['name']}' has invalid 'records': " - "expected an array of objects." - ), - ) - for j, record in enumerate(scenario["records"]): - if not isinstance(record, dict): - return AirbyteConnectionStatus( - status=Status.FAILED, - message=( - f"Custom scenario '{scenario['name']}' record at index {j} " - "must be an object." - ), - ) + + error = self._validate_custom_scenarios(raw_custom) + if error: + return AirbyteConnectionStatus( + status=Status.FAILED, + message=error, + ) scenarios = self._get_all_scenarios(config) if not scenarios: