From c9285f8f7d47c718710f9b8b997e1faad7a0bc39 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 02:20:02 +0000 Subject: [PATCH 01/12] feat(destinations): add universal destination using PyAirbyte caches This adds a new universal destination connector that can write to any PyAirbyte-supported cache backend (DuckDB, Postgres, Snowflake, BigQuery, MotherDuck). The destination implements the Airbyte CDK Destination interface and uses PyAirbyte's cache system to handle the actual writing. Configuration allows selecting the destination type and passing backend-specific settings. Co-Authored-By: AJ Steers --- airbyte/destinations/universal/Dockerfile | 11 + airbyte/destinations/universal/__init__.py | 9 + airbyte/destinations/universal/destination.py | 248 ++++++++++++++++++ airbyte/destinations/universal/run.py | 15 ++ 4 files changed, 283 insertions(+) create mode 100644 airbyte/destinations/universal/Dockerfile create mode 100644 airbyte/destinations/universal/__init__.py create mode 100644 airbyte/destinations/universal/destination.py create mode 100644 airbyte/destinations/universal/run.py diff --git a/airbyte/destinations/universal/Dockerfile b/airbyte/destinations/universal/Dockerfile new file mode 100644 index 000000000..026a1366c --- /dev/null +++ b/airbyte/destinations/universal/Dockerfile @@ -0,0 +1,11 @@ +FROM docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73 + +WORKDIR /airbyte/integration_code + +COPY pyproject.toml poetry.lock ./ +COPY airbyte ./airbyte + +RUN pip install --no-cache-dir . + +ENV AIRBYTE_ENTRYPOINT="python -m airbyte.destinations.universal.run" +ENTRYPOINT ["python", "-m", "airbyte.destinations.universal.run"] diff --git a/airbyte/destinations/universal/__init__.py b/airbyte/destinations/universal/__init__.py new file mode 100644 index 000000000..ba3cc4024 --- /dev/null +++ b/airbyte/destinations/universal/__init__.py @@ -0,0 +1,9 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Universal destination using PyAirbyte caches as backends.""" + +from airbyte.destinations.universal.destination import DestinationPyAirbyteUniversal + + +__all__ = [ + "DestinationPyAirbyteUniversal", +] diff --git a/airbyte/destinations/universal/destination.py b/airbyte/destinations/universal/destination.py new file mode 100644 index 000000000..3d0fc3a14 --- /dev/null +++ b/airbyte/destinations/universal/destination.py @@ -0,0 +1,248 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Universal destination implementation using PyAirbyte caches.""" + +from __future__ import annotations + +import datetime +import json +import logging +import uuid +from collections import defaultdict +from typing import TYPE_CHECKING, Any + +from sqlalchemy import text + +from airbyte_cdk.destinations import Destination +from airbyte_cdk.models import ( + AirbyteConnectionStatus, + AirbyteMessage, + ConfiguredAirbyteCatalog, + DestinationSyncMode, + Status, + Type, +) + +from airbyte.caches.bigquery import BigQueryCache +from airbyte.caches.duckdb import DuckDBCache +from airbyte.caches.motherduck import MotherDuckCache +from airbyte.caches.postgres import PostgresCache +from airbyte.caches.snowflake import SnowflakeCache +from airbyte.secrets.base import SecretString + + +if TYPE_CHECKING: + from collections.abc import Iterable, Mapping + + from sqlalchemy.engine import Engine + + from airbyte.caches.base import CacheBase + + +logger = logging.getLogger("airbyte") + + +DESTINATION_TYPE_DUCKDB = "duckdb" +DESTINATION_TYPE_POSTGRES = "postgres" +DESTINATION_TYPE_SNOWFLAKE = "snowflake" +DESTINATION_TYPE_BIGQUERY = "bigquery" +DESTINATION_TYPE_MOTHERDUCK = "motherduck" + + +class DestinationPyAirbyteUniversal(Destination): + """Universal destination that writes to any PyAirbyte-supported cache backend.""" + + def _get_cache(self, config: Mapping[str, Any]) -> CacheBase: + """Create and return the appropriate cache based on configuration.""" + destination_type = config.get("destination_type") + + if destination_type == DESTINATION_TYPE_DUCKDB: + duckdb_config = config.get("duckdb", {}) + return DuckDBCache( + db_path=duckdb_config.get("db_path", "/local/pyairbyte.duckdb"), + schema_name=duckdb_config.get("schema_name", "main"), + ) + + if destination_type == DESTINATION_TYPE_POSTGRES: + pg_config = config.get("postgres", {}) + return PostgresCache( + host=pg_config.get("host", "localhost"), + port=pg_config.get("port", 5432), + username=pg_config.get("username", "postgres"), + password=SecretString(pg_config.get("password", "")), + database=pg_config.get("database", "postgres"), + schema_name=pg_config.get("schema_name", "public"), + ) + + if destination_type == DESTINATION_TYPE_SNOWFLAKE: + sf_config = config.get("snowflake", {}) + password = sf_config.get("password") + return SnowflakeCache( + account=sf_config.get("account", ""), + username=sf_config.get("username", ""), + password=SecretString(password) if password else None, + warehouse=sf_config.get("warehouse", ""), + database=sf_config.get("database", ""), + schema_name=sf_config.get("schema_name", "PUBLIC"), + role=sf_config.get("role", ""), + ) + + if destination_type == DESTINATION_TYPE_BIGQUERY: + bq_config = config.get("bigquery", {}) + return BigQueryCache( + project_name=bq_config.get("project_name", ""), + dataset_name=bq_config.get("dataset_name", ""), + credentials_path=bq_config.get("credentials_path"), + ) + + if destination_type == DESTINATION_TYPE_MOTHERDUCK: + md_config = config.get("motherduck", {}) + return MotherDuckCache( + database=md_config.get("database", "my_db"), + schema_name=md_config.get("schema_name", "main"), + api_key=SecretString(md_config.get("api_key", "")), + ) + + raise ValueError(f"Unsupported destination type: {destination_type}") + + def check( + self, + logger: logging.Logger, # noqa: ARG002 + config: Mapping[str, Any], + ) -> AirbyteConnectionStatus: + """Test the connection to the destination.""" + try: + cache = self._get_cache(config) + engine = cache.get_sql_engine() + with engine.connect() as conn: + conn.execute(text("SELECT 1")) + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + except Exception as e: + return AirbyteConnectionStatus( + status=Status.FAILED, message=f"Connection failed: {e!r}" + ) + + def write( + self, + config: Mapping[str, Any], + configured_catalog: ConfiguredAirbyteCatalog, + input_messages: Iterable[AirbyteMessage], + ) -> Iterable[AirbyteMessage]: + """Write data to the destination using PyAirbyte cache. + + This method processes messages in a streaming fashion, buffering records + and flushing on state messages to ensure fault tolerance. + """ + cache = self._get_cache(config) + streams = {s.stream.name for s in configured_catalog.streams} + schema_name = cache.schema_name + + logger.info(f"Starting write to PyAirbyte Universal with {len(streams)} streams") + + # Get SQL engine and ensure schema exists + engine = cache.get_sql_engine() + with engine.connect() as conn: + conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + conn.commit() # pyrefly: ignore[missing-attribute] + + # Create tables for each stream + for configured_stream in configured_catalog.streams: + name = configured_stream.stream.name + table_name = f"_airbyte_raw_{name}" + + with engine.connect() as conn: + if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite: + logger.info(f"Dropping table for overwrite: {table_name}") + conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}")) + conn.commit() # pyrefly: ignore[missing-attribute] + + # Create the raw table if needed + create_sql = f""" + CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ( + _airbyte_ab_id VARCHAR(36) PRIMARY KEY, + _airbyte_emitted_at TIMESTAMP, + _airbyte_data JSON + ) + """ + conn.execute(text(create_sql)) + conn.commit() # pyrefly: ignore[missing-attribute] + + # Buffer for records + buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list)) + + for message in input_messages: + if message.type == Type.STATE: + # Flush the buffer before yielding state + for stream_name in list(buffer.keys()): + self._flush_buffer( + engine=engine, + buffer=buffer, + schema_name=schema_name, + stream_name=stream_name, + ) + buffer = defaultdict(lambda: defaultdict(list)) + yield message + + elif message.type == Type.RECORD: + record = message.record + if record is None: + continue + stream_name = record.stream + if stream_name not in streams: + logger.debug(f"Stream {stream_name} not in configured streams, skipping") + continue + + # Add to buffer + buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4())) + buffer[stream_name]["_airbyte_emitted_at"].append( + datetime.datetime.now(datetime.timezone.utc).isoformat() + ) + buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data)) + + else: + logger.debug(f"Message type {message.type} not handled, skipping") + + # Flush any remaining records + for stream_name in list(buffer.keys()): + self._flush_buffer( + engine=engine, + buffer=buffer, + schema_name=schema_name, + stream_name=stream_name, + ) + + # Close the cache + cache.close() + + def _flush_buffer( + self, + *, + engine: Engine, + buffer: dict[str, dict[str, list[Any]]], + schema_name: str, + stream_name: str, + ) -> None: + """Flush buffered records to the database.""" + if not buffer[stream_name]["_airbyte_ab_id"]: + return + + table_name = f"_airbyte_raw_{stream_name}" + entries = buffer[stream_name] + + logger.info(f"Flushing {len(entries['_airbyte_ab_id'])} records to {table_name}") + + with engine.connect() as conn: + for i in range(len(entries["_airbyte_ab_id"])): + insert_sql = text(f""" + INSERT INTO {schema_name}.{table_name} + (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data) + VALUES (:ab_id, :emitted_at, :data) + """) + conn.execute( + insert_sql, + { + "ab_id": entries["_airbyte_ab_id"][i], + "emitted_at": entries["_airbyte_emitted_at"][i], + "data": entries["_airbyte_data"][i], + }, + ) + conn.commit() # pyrefly: ignore[missing-attribute] diff --git a/airbyte/destinations/universal/run.py b/airbyte/destinations/universal/run.py new file mode 100644 index 000000000..bd59e92d3 --- /dev/null +++ b/airbyte/destinations/universal/run.py @@ -0,0 +1,15 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Entry point for the PyAirbyte Universal destination.""" + +import sys + +from airbyte.destinations.universal import DestinationPyAirbyteUniversal + + +def run() -> None: + """Run the destination.""" + DestinationPyAirbyteUniversal().run(sys.argv[1:]) + + +if __name__ == "__main__": + run() From dbe98840d3ddf8ac4595379dddba4d989e93e670 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 07:02:35 +0000 Subject: [PATCH 02/12] refactor: move universal destination to airbyte/cli/universal_connector/ and add CLI entrypoint Co-Authored-By: AJ Steers --- airbyte/cli/__init__.py | 2 ++ .../universal => cli/universal_connector}/Dockerfile | 4 ++-- airbyte/cli/universal_connector/__init__.py | 9 +++++++++ .../universal => cli/universal_connector}/destination.py | 0 .../universal => cli/universal_connector}/run.py | 2 +- airbyte/destinations/universal/__init__.py | 9 --------- pyproject.toml | 1 + 7 files changed, 15 insertions(+), 12 deletions(-) create mode 100644 airbyte/cli/__init__.py rename airbyte/{destinations/universal => cli/universal_connector}/Dockerfile (64%) create mode 100644 airbyte/cli/universal_connector/__init__.py rename airbyte/{destinations/universal => cli/universal_connector}/destination.py (100%) rename airbyte/{destinations/universal => cli/universal_connector}/run.py (78%) delete mode 100644 airbyte/destinations/universal/__init__.py diff --git a/airbyte/cli/__init__.py b/airbyte/cli/__init__.py new file mode 100644 index 000000000..e6423c8b2 --- /dev/null +++ b/airbyte/cli/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""CLI modules for PyAirbyte.""" diff --git a/airbyte/destinations/universal/Dockerfile b/airbyte/cli/universal_connector/Dockerfile similarity index 64% rename from airbyte/destinations/universal/Dockerfile rename to airbyte/cli/universal_connector/Dockerfile index 026a1366c..8ef6cf3cd 100644 --- a/airbyte/destinations/universal/Dockerfile +++ b/airbyte/cli/universal_connector/Dockerfile @@ -7,5 +7,5 @@ COPY airbyte ./airbyte RUN pip install --no-cache-dir . -ENV AIRBYTE_ENTRYPOINT="python -m airbyte.destinations.universal.run" -ENTRYPOINT ["python", "-m", "airbyte.destinations.universal.run"] +ENV AIRBYTE_ENTRYPOINT="destination-pyairbyte" +ENTRYPOINT ["destination-pyairbyte"] diff --git a/airbyte/cli/universal_connector/__init__.py b/airbyte/cli/universal_connector/__init__.py new file mode 100644 index 000000000..8a4f170d7 --- /dev/null +++ b/airbyte/cli/universal_connector/__init__.py @@ -0,0 +1,9 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Universal destination connector using PyAirbyte caches as backends.""" + +from airbyte.cli.universal_connector.destination import DestinationPyAirbyteUniversal + + +__all__ = [ + "DestinationPyAirbyteUniversal", +] diff --git a/airbyte/destinations/universal/destination.py b/airbyte/cli/universal_connector/destination.py similarity index 100% rename from airbyte/destinations/universal/destination.py rename to airbyte/cli/universal_connector/destination.py diff --git a/airbyte/destinations/universal/run.py b/airbyte/cli/universal_connector/run.py similarity index 78% rename from airbyte/destinations/universal/run.py rename to airbyte/cli/universal_connector/run.py index bd59e92d3..6e1612a94 100644 --- a/airbyte/destinations/universal/run.py +++ b/airbyte/cli/universal_connector/run.py @@ -3,7 +3,7 @@ import sys -from airbyte.destinations.universal import DestinationPyAirbyteUniversal +from airbyte.cli.universal_connector import DestinationPyAirbyteUniversal def run() -> None: diff --git a/airbyte/destinations/universal/__init__.py b/airbyte/destinations/universal/__init__.py deleted file mode 100644 index ba3cc4024..000000000 --- a/airbyte/destinations/universal/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (c) 2024 Airbyte, Inc., all rights reserved. -"""Universal destination using PyAirbyte caches as backends.""" - -from airbyte.destinations.universal.destination import DestinationPyAirbyteUniversal - - -__all__ = [ - "DestinationPyAirbyteUniversal", -] diff --git a/pyproject.toml b/pyproject.toml index 4b6fa4e16..8aa0d9f58 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,6 +80,7 @@ dev = [ pyairbyte = "airbyte.cli:cli" pyab = "airbyte.cli:cli" airbyte-mcp = "airbyte.mcp.server:main" +destination-pyairbyte = "airbyte.cli.universal_connector.run:run" [build-system] requires = ["hatchling", "uv-dynamic-versioning"] From 561093bd462a3e048fc3d13b8a42e5a5c2e27dff Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 07:09:29 +0000 Subject: [PATCH 03/12] feat(sources): add universal source connector using PyAirbyte - Add SourcePyAirbyteUniversal class that wraps any PyAirbyte source - Add CLI entrypoint 'source-pyairbyte' for the universal source - Fix Dockerfile to use uv.lock instead of poetry.lock - Use proper CDK AirbyteEntrypoint.launch() pattern for sources Co-Authored-By: AJ Steers --- airbyte/cli/universal_connector/Dockerfile | 2 +- airbyte/cli/universal_connector/__init__.py | 4 +- airbyte/cli/universal_connector/run_source.py | 17 ++ airbyte/cli/universal_connector/source.py | 156 ++++++++++++++++++ pyproject.toml | 1 + 5 files changed, 178 insertions(+), 2 deletions(-) create mode 100644 airbyte/cli/universal_connector/run_source.py create mode 100644 airbyte/cli/universal_connector/source.py diff --git a/airbyte/cli/universal_connector/Dockerfile b/airbyte/cli/universal_connector/Dockerfile index 8ef6cf3cd..9fb6ece10 100644 --- a/airbyte/cli/universal_connector/Dockerfile +++ b/airbyte/cli/universal_connector/Dockerfile @@ -2,7 +2,7 @@ FROM docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473 WORKDIR /airbyte/integration_code -COPY pyproject.toml poetry.lock ./ +COPY pyproject.toml uv.lock ./ COPY airbyte ./airbyte RUN pip install --no-cache-dir . diff --git a/airbyte/cli/universal_connector/__init__.py b/airbyte/cli/universal_connector/__init__.py index 8a4f170d7..6fef95681 100644 --- a/airbyte/cli/universal_connector/__init__.py +++ b/airbyte/cli/universal_connector/__init__.py @@ -1,9 +1,11 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. -"""Universal destination connector using PyAirbyte caches as backends.""" +"""Universal connectors using PyAirbyte as backends.""" from airbyte.cli.universal_connector.destination import DestinationPyAirbyteUniversal +from airbyte.cli.universal_connector.source import SourcePyAirbyteUniversal __all__ = [ "DestinationPyAirbyteUniversal", + "SourcePyAirbyteUniversal", ] diff --git a/airbyte/cli/universal_connector/run_source.py b/airbyte/cli/universal_connector/run_source.py new file mode 100644 index 000000000..0a518a6be --- /dev/null +++ b/airbyte/cli/universal_connector/run_source.py @@ -0,0 +1,17 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Entry point for the PyAirbyte Universal source.""" + +import sys + +from airbyte_cdk.entrypoint import launch + +from airbyte.cli.universal_connector import SourcePyAirbyteUniversal + + +def run() -> None: + """Run the source.""" + launch(SourcePyAirbyteUniversal(), sys.argv[1:]) + + +if __name__ == "__main__": + run() diff --git a/airbyte/cli/universal_connector/source.py b/airbyte/cli/universal_connector/source.py new file mode 100644 index 000000000..eea0c12c9 --- /dev/null +++ b/airbyte/cli/universal_connector/source.py @@ -0,0 +1,156 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Universal source implementation using PyAirbyte to wrap any source connector.""" + +from __future__ import annotations + +import datetime +import logging +from typing import TYPE_CHECKING, Any + +from airbyte_cdk.models import ( + AirbyteCatalog, + AirbyteConnectionStatus, + AirbyteMessage, + AirbyteRecordMessage, + ConfiguredAirbyteCatalog, + ConnectorSpecification, + Status, + Type, +) +from airbyte_cdk.sources.source import Source + +import airbyte as ab + + +if TYPE_CHECKING: + from collections.abc import Iterable, Mapping + + +logger = logging.getLogger("airbyte") + + +class SourcePyAirbyteUniversal(Source): + """Universal source that wraps any PyAirbyte-supported source connector. + + This source acts as a proxy, using PyAirbyte to instantiate and run + any registered source connector based on the configuration provided. + """ + + def spec(self, logger: logging.Logger) -> ConnectorSpecification: # noqa: ARG002 + """Return the connector specification.""" + return ConnectorSpecification( + documentationUrl="https://docs.airbyte.com/integrations/sources/pyairbyte-universal", + connectionSpecification={ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "PyAirbyte Universal Source Spec", + "type": "object", + "required": ["source_name", "source_config"], + "properties": { + "source_name": { + "type": "string", + "title": "Source Connector Name", + "description": ( + "The name of the source connector to use " + "(e.g., 'source-github', 'source-postgres')." + ), + "examples": ["source-github", "source-postgres", "source-stripe"], + }, + "source_config": { + "type": "object", + "title": "Source Configuration", + "description": "The configuration for the underlying source connector.", + "additionalProperties": True, + }, + "source_version": { + "type": "string", + "title": "Source Version", + "description": "Optional: specific version of the source connector to use.", + "default": "latest", + }, + }, + }, + ) + + def _get_pyairbyte_source( + self, + config: Mapping[str, Any], + ) -> ab.Source: + """Get a PyAirbyte source instance from the configuration.""" + source_name = config.get("source_name") + source_config = config.get("source_config", {}) + source_version = config.get("source_version", "latest") + + if not source_name: + raise ValueError("source_name is required in configuration") + + return ab.get_source( + source_name, + config=source_config, + version=source_version if source_version != "latest" else None, + install_if_missing=True, + ) + + def check( + self, + logger: logging.Logger, # noqa: ARG002 + config: Mapping[str, Any], + ) -> AirbyteConnectionStatus: + """Test the connection to the underlying source connector.""" + try: + source = self._get_pyairbyte_source(config) + source.check() + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + except ValueError as e: + return AirbyteConnectionStatus( + status=Status.FAILED, + message=str(e), + ) + except Exception as e: + return AirbyteConnectionStatus( + status=Status.FAILED, + message=f"Connection check failed: {e!r}", + ) + + def discover( + self, + logger: logging.Logger, # noqa: ARG002 + config: Mapping[str, Any], + ) -> AirbyteCatalog: + """Discover the catalog from the underlying source connector.""" + source = self._get_pyairbyte_source(config) + # The catalog types are compatible at runtime but differ in type annotations + return source.discovered_catalog # pyrefly: ignore[bad-return] + + def read( + self, + logger: logging.Logger, + config: Mapping[str, Any], + catalog: ConfiguredAirbyteCatalog, + state: list[Any] | None = None, # noqa: ARG002 + ) -> Iterable[AirbyteMessage]: + """Read data from the underlying source connector. + + This method uses PyAirbyte's get_records functionality to stream + records from the configured source connector. + """ + source = self._get_pyairbyte_source(config) + + # Select the streams from the catalog + stream_names = [stream.stream.name for stream in catalog.streams] + source.select_streams(stream_names) + + # Use get_records to iterate through each stream and yield AirbyteMessages + for stream_name in stream_names: + logger.info(f"Reading stream: {stream_name}") + for record in source.get_records(stream_name): + # Convert the record to an AirbyteMessage + yield AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + stream=stream_name, + data=dict(record), + emitted_at=int( + datetime.datetime.now(datetime.timezone.utc).timestamp() * 1000 + ), + ), + ) diff --git a/pyproject.toml b/pyproject.toml index 8aa0d9f58..99bdd3b4f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,6 +81,7 @@ pyairbyte = "airbyte.cli:cli" pyab = "airbyte.cli:cli" airbyte-mcp = "airbyte.mcp.server:main" destination-pyairbyte = "airbyte.cli.universal_connector.run:run" +source-pyairbyte = "airbyte.cli.universal_connector.run_source:run" [build-system] requires = ["hatchling", "uv-dynamic-versioning"] From 9e195873db3fbb7b8fd081a571f23f706fefef11 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 07:28:14 +0000 Subject: [PATCH 04/12] fix: add spec() method to destination and fix source read() for CLI piping Co-Authored-By: AJ Steers --- .../cli/universal_connector/destination.py | 72 +++++++++++++++++++ airbyte/cli/universal_connector/source.py | 72 ++++++++++++++----- 2 files changed, 128 insertions(+), 16 deletions(-) diff --git a/airbyte/cli/universal_connector/destination.py b/airbyte/cli/universal_connector/destination.py index 3d0fc3a14..db82178f5 100644 --- a/airbyte/cli/universal_connector/destination.py +++ b/airbyte/cli/universal_connector/destination.py @@ -17,6 +17,7 @@ AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, + ConnectorSpecification, DestinationSyncMode, Status, Type, @@ -51,6 +52,77 @@ class DestinationPyAirbyteUniversal(Destination): """Universal destination that writes to any PyAirbyte-supported cache backend.""" + def spec(self, logger: logging.Logger) -> ConnectorSpecification: # noqa: ARG002 + """Return the connector specification.""" + return ConnectorSpecification( + documentationUrl="https://docs.airbyte.com/integrations/destinations/pyairbyte-universal", + connectionSpecification={ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "PyAirbyte Universal Destination Spec", + "type": "object", + "required": ["destination_type"], + "properties": { + "destination_type": { + "type": "string", + "title": "Destination Type", + "description": "The type of destination to write to.", + "enum": ["duckdb", "postgres", "snowflake", "bigquery", "motherduck"], + }, + "duckdb": { + "type": "object", + "title": "DuckDB Configuration", + "properties": { + "db_path": {"type": "string", "default": "/local/pyairbyte.duckdb"}, + "schema_name": {"type": "string", "default": "main"}, + }, + }, + "postgres": { + "type": "object", + "title": "PostgreSQL Configuration", + "properties": { + "host": {"type": "string", "default": "localhost"}, + "port": {"type": "integer", "default": 5432}, + "username": {"type": "string"}, + "password": {"type": "string", "airbyte_secret": True}, + "database": {"type": "string"}, + "schema_name": {"type": "string", "default": "public"}, + }, + }, + "snowflake": { + "type": "object", + "title": "Snowflake Configuration", + "properties": { + "account": {"type": "string"}, + "username": {"type": "string"}, + "password": {"type": "string", "airbyte_secret": True}, + "warehouse": {"type": "string"}, + "database": {"type": "string"}, + "schema_name": {"type": "string", "default": "PUBLIC"}, + "role": {"type": "string"}, + }, + }, + "bigquery": { + "type": "object", + "title": "BigQuery Configuration", + "properties": { + "project_name": {"type": "string"}, + "dataset_name": {"type": "string"}, + "credentials_path": {"type": "string"}, + }, + }, + "motherduck": { + "type": "object", + "title": "MotherDuck Configuration", + "properties": { + "database": {"type": "string", "default": "my_db"}, + "schema_name": {"type": "string", "default": "main"}, + "api_key": {"type": "string", "airbyte_secret": True}, + }, + }, + }, + }, + ) + def _get_cache(self, config: Mapping[str, Any]) -> CacheBase: """Create and return the appropriate cache based on configuration.""" destination_type = config.get("destination_type") diff --git a/airbyte/cli/universal_connector/source.py b/airbyte/cli/universal_connector/source.py index eea0c12c9..db516d63d 100644 --- a/airbyte/cli/universal_connector/source.py +++ b/airbyte/cli/universal_connector/source.py @@ -3,7 +3,7 @@ from __future__ import annotations -import datetime +import json import logging from typing import TYPE_CHECKING, Any @@ -12,14 +12,17 @@ AirbyteConnectionStatus, AirbyteMessage, AirbyteRecordMessage, + AirbyteStream, ConfiguredAirbyteCatalog, ConnectorSpecification, Status, + SyncMode, Type, ) from airbyte_cdk.sources.source import Source import airbyte as ab +from airbyte.progress import ProgressStyle, ProgressTracker if TYPE_CHECKING: @@ -118,20 +121,39 @@ def discover( ) -> AirbyteCatalog: """Discover the catalog from the underlying source connector.""" source = self._get_pyairbyte_source(config) - # The catalog types are compatible at runtime but differ in type annotations - return source.discovered_catalog # pyrefly: ignore[bad-return] + # Convert PyAirbyte catalog to CDK catalog format + # Serialize to dict with enum values as strings, then construct CDK objects + catalog_json = source.discovered_catalog.model_dump_json() + catalog_dict = json.loads(catalog_json) + + streams = [] + for stream_dict in catalog_dict.get("streams", []): + # Convert sync mode strings to SyncMode enums + sync_modes = [SyncMode(mode) for mode in stream_dict.get("supported_sync_modes", [])] + streams.append( + AirbyteStream( + name=stream_dict["name"], + json_schema=stream_dict.get("json_schema", {}), + supported_sync_modes=sync_modes, + source_defined_cursor=stream_dict.get("source_defined_cursor"), + default_cursor_field=stream_dict.get("default_cursor_field"), + source_defined_primary_key=stream_dict.get("source_defined_primary_key"), + namespace=stream_dict.get("namespace"), + ) + ) + return AirbyteCatalog(streams=streams) def read( self, - logger: logging.Logger, + logger: logging.Logger, # noqa: ARG002 config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: list[Any] | None = None, # noqa: ARG002 ) -> Iterable[AirbyteMessage]: """Read data from the underlying source connector. - This method uses PyAirbyte's get_records functionality to stream - records from the configured source connector. + This method uses PyAirbyte's _read_with_catalog to stream + AirbyteMessages from the configured source connector. """ source = self._get_pyairbyte_source(config) @@ -139,18 +161,36 @@ def read( stream_names = [stream.stream.name for stream in catalog.streams] source.select_streams(stream_names) - # Use get_records to iterate through each stream and yield AirbyteMessages - for stream_name in stream_names: - logger.info(f"Reading stream: {stream_name}") - for record in source.get_records(stream_name): - # Convert the record to an AirbyteMessage + # Get the configured catalog from PyAirbyte + configured_catalog = source.get_configured_catalog(streams=stream_names) + + # Use _read_with_catalog to get raw AirbyteMessages + progress_tracker = ProgressTracker( + ProgressStyle.PLAIN, + source=source, + cache=None, + destination=None, + expected_streams=stream_names, + ) + + # Convert PyAirbyte messages to CDK messages + # The types differ (airbyte_protocol.models vs airbyte_cdk.models) and enum values + # need to be serialized to strings for the CDK serializer + for message in source._read_with_catalog( # noqa: SLF001 + catalog=configured_catalog, + progress_tracker=progress_tracker, + ): + # Only yield RECORD messages - filter out LOG, TRACE, etc. + # The CDK will handle its own logging + if message.type.name == "RECORD" and message.record: yield AirbyteMessage( type=Type.RECORD, record=AirbyteRecordMessage( - stream=stream_name, - data=dict(record), - emitted_at=int( - datetime.datetime.now(datetime.timezone.utc).timestamp() * 1000 - ), + stream=message.record.stream, + data=message.record.data, + emitted_at=message.record.emitted_at, + namespace=message.record.namespace, ), ) + # Skip STATE messages for now - they require per-stream format conversion + # which is complex and not needed for basic full-refresh syncs From 4e326cdbe65535283d0f7b4b558267dc8dd6e560 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 29 Jan 2026 07:45:48 +0000 Subject: [PATCH 05/12] ci: add TK-TODO comment for STATE handling and import TK-TODO CI check Co-Authored-By: AJ Steers --- .github/workflows/tk-todo-check.yml | 45 +++++++++++++++++++++++ airbyte/cli/universal_connector/source.py | 5 ++- 2 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/tk-todo-check.yml diff --git a/.github/workflows/tk-todo-check.yml b/.github/workflows/tk-todo-check.yml new file mode 100644 index 000000000..8c60a0b8c --- /dev/null +++ b/.github/workflows/tk-todo-check.yml @@ -0,0 +1,45 @@ +name: TK-TODO Check # IGNORE:TK - this workflow checks for TK-TODO markers + +on: + push: + branches: + - main + pull_request: {} + +permissions: + contents: read + +jobs: + tk-todo-check: # IGNORE:TK + name: Check for TK-TODO markers # IGNORE:TK + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Check for TK-TODO markers # IGNORE:TK + run: | + # Find all TK-TODO markers (case-insensitive) that don't have IGNORE:TK on the same line # IGNORE:TK + # Exit with error if any are found + + echo "Checking for TK-TODO markers..." # IGNORE:TK + + # Use git ls-files to only check tracked files (excludes gitignored files like sdks/) + # grep -i for case-insensitive matching + # grep -v for excluding lines with IGNORE:TK + # grep -n for line numbers + + FOUND_TODOS=$(git ls-files | xargs grep -i -n "TK-TODO" 2>/dev/null | grep -i -v "IGNORE:TK" || true) # IGNORE:TK + + if [ -n "$FOUND_TODOS" ]; then + echo "" + echo "ERROR: Found TK-TODO markers that must be resolved before merge:" # IGNORE:TK + echo "" + echo "$FOUND_TODOS" + echo "" + echo "To suppress a TK-TODO, add 'IGNORE:TK' (case-insensitive) on the same line." # IGNORE:TK + echo "" + exit 1 + else + echo "No unresolved TK-TODO markers found." # IGNORE:TK + fi diff --git a/airbyte/cli/universal_connector/source.py b/airbyte/cli/universal_connector/source.py index db516d63d..a419fb815 100644 --- a/airbyte/cli/universal_connector/source.py +++ b/airbyte/cli/universal_connector/source.py @@ -192,5 +192,6 @@ def read( namespace=message.record.namespace, ), ) - # Skip STATE messages for now - they require per-stream format conversion - # which is complex and not needed for basic full-refresh syncs + # TK-TODO: Add STATE message handling for incremental sync support. + # STATE messages require per-stream format conversion which is complex. + # For now, only full-refresh syncs are supported. From ab6ed5272ac6f66da9026f22ab444fdd9dd08617 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 06:39:30 +0000 Subject: [PATCH 06/12] fix: add resource cleanup (try/finally) to check() and write() methods Co-Authored-By: AJ Steers --- .../cli/universal_connector/destination.py | 148 +++++++++--------- 1 file changed, 76 insertions(+), 72 deletions(-) diff --git a/airbyte/cli/universal_connector/destination.py b/airbyte/cli/universal_connector/destination.py index db82178f5..a3aee1057 100644 --- a/airbyte/cli/universal_connector/destination.py +++ b/airbyte/cli/universal_connector/destination.py @@ -182,6 +182,7 @@ def check( config: Mapping[str, Any], ) -> AirbyteConnectionStatus: """Test the connection to the destination.""" + cache = None try: cache = self._get_cache(config) engine = cache.get_sql_engine() @@ -192,6 +193,9 @@ def check( return AirbyteConnectionStatus( status=Status.FAILED, message=f"Connection failed: {e!r}" ) + finally: + if cache is not None: + cache.close() def write( self, @@ -205,85 +209,85 @@ def write( and flushing on state messages to ensure fault tolerance. """ cache = self._get_cache(config) - streams = {s.stream.name for s in configured_catalog.streams} - schema_name = cache.schema_name - - logger.info(f"Starting write to PyAirbyte Universal with {len(streams)} streams") - - # Get SQL engine and ensure schema exists - engine = cache.get_sql_engine() - with engine.connect() as conn: - conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - conn.commit() # pyrefly: ignore[missing-attribute] + try: + streams = {s.stream.name for s in configured_catalog.streams} + schema_name = cache.schema_name - # Create tables for each stream - for configured_stream in configured_catalog.streams: - name = configured_stream.stream.name - table_name = f"_airbyte_raw_{name}" + logger.info(f"Starting write to PyAirbyte Universal with {len(streams)} streams") + # Get SQL engine and ensure schema exists + engine = cache.get_sql_engine() with engine.connect() as conn: - if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite: - logger.info(f"Dropping table for overwrite: {table_name}") - conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}")) - conn.commit() # pyrefly: ignore[missing-attribute] - - # Create the raw table if needed - create_sql = f""" - CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ( - _airbyte_ab_id VARCHAR(36) PRIMARY KEY, - _airbyte_emitted_at TIMESTAMP, - _airbyte_data JSON - ) - """ - conn.execute(text(create_sql)) + conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) conn.commit() # pyrefly: ignore[missing-attribute] - # Buffer for records - buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list)) - - for message in input_messages: - if message.type == Type.STATE: - # Flush the buffer before yielding state - for stream_name in list(buffer.keys()): - self._flush_buffer( - engine=engine, - buffer=buffer, - schema_name=schema_name, - stream_name=stream_name, + # Create tables for each stream + for configured_stream in configured_catalog.streams: + name = configured_stream.stream.name + table_name = f"_airbyte_raw_{name}" + + with engine.connect() as conn: + if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite: + logger.info(f"Dropping table for overwrite: {table_name}") + conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}")) + conn.commit() # pyrefly: ignore[missing-attribute] + + # Create the raw table if needed + create_sql = f""" + CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ( + _airbyte_ab_id VARCHAR(36) PRIMARY KEY, + _airbyte_emitted_at TIMESTAMP, + _airbyte_data JSON ) - buffer = defaultdict(lambda: defaultdict(list)) - yield message - - elif message.type == Type.RECORD: - record = message.record - if record is None: - continue - stream_name = record.stream - if stream_name not in streams: - logger.debug(f"Stream {stream_name} not in configured streams, skipping") - continue - - # Add to buffer - buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4())) - buffer[stream_name]["_airbyte_emitted_at"].append( - datetime.datetime.now(datetime.timezone.utc).isoformat() - ) - buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data)) - - else: - logger.debug(f"Message type {message.type} not handled, skipping") - - # Flush any remaining records - for stream_name in list(buffer.keys()): - self._flush_buffer( - engine=engine, - buffer=buffer, - schema_name=schema_name, - stream_name=stream_name, - ) + """ + conn.execute(text(create_sql)) + conn.commit() # pyrefly: ignore[missing-attribute] - # Close the cache - cache.close() + # Buffer for records + buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list)) + + for message in input_messages: + if message.type == Type.STATE: + # Flush the buffer before yielding state + for stream_name in list(buffer.keys()): + self._flush_buffer( + engine=engine, + buffer=buffer, + schema_name=schema_name, + stream_name=stream_name, + ) + buffer = defaultdict(lambda: defaultdict(list)) + yield message + + elif message.type == Type.RECORD: + record = message.record + if record is None: + continue + stream_name = record.stream + if stream_name not in streams: + logger.debug(f"Stream {stream_name} not in configured streams, skipping") + continue + + # Add to buffer + buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4())) + buffer[stream_name]["_airbyte_emitted_at"].append( + datetime.datetime.now(datetime.timezone.utc).isoformat() + ) + buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data)) + + else: + logger.debug(f"Message type {message.type} not handled, skipping") + + # Flush any remaining records + for stream_name in list(buffer.keys()): + self._flush_buffer( + engine=engine, + buffer=buffer, + schema_name=schema_name, + stream_name=stream_name, + ) + finally: + cache.close() def _flush_buffer( self, From 74dbb59d48d88333f0af2673430867a35338c4c7 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 06:45:11 +0000 Subject: [PATCH 07/12] fix: use record's original emitted_at timestamp instead of current time Co-Authored-By: AJ Steers --- airbyte/cli/universal_connector/destination.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/airbyte/cli/universal_connector/destination.py b/airbyte/cli/universal_connector/destination.py index a3aee1057..731185fdb 100644 --- a/airbyte/cli/universal_connector/destination.py +++ b/airbyte/cli/universal_connector/destination.py @@ -270,9 +270,16 @@ def write( # Add to buffer buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4())) - buffer[stream_name]["_airbyte_emitted_at"].append( - datetime.datetime.now(datetime.timezone.utc).isoformat() + # Use the record's original emitted_at timestamp if available, + # otherwise fall back to current time + emitted_at = ( + datetime.datetime.fromtimestamp( + record.emitted_at / 1000, tz=datetime.timezone.utc + ) + if record.emitted_at is not None + else datetime.datetime.now(datetime.timezone.utc) ) + buffer[stream_name]["_airbyte_emitted_at"].append(emitted_at.isoformat()) buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data)) else: From f405021ea161c46d34684efeee613046c39014fe Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 07:55:39 +0000 Subject: [PATCH 08/12] refactor: rename CLI entrypoints to destination-smoke-test and source-smoke-test Co-Authored-By: AJ Steers --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 99bdd3b4f..737987965 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,8 +80,8 @@ dev = [ pyairbyte = "airbyte.cli:cli" pyab = "airbyte.cli:cli" airbyte-mcp = "airbyte.mcp.server:main" -destination-pyairbyte = "airbyte.cli.universal_connector.run:run" -source-pyairbyte = "airbyte.cli.universal_connector.run_source:run" +destination-smoke-test = "airbyte.cli.universal_connector.run:run" +source-smoke-test = "airbyte.cli.universal_connector.run_source:run" [build-system] requires = ["hatchling", "uv-dynamic-versioning"] From a3ebe6e7d2f7c19a9e3c5ce0c068b9cbffa56fa3 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 07:57:55 +0000 Subject: [PATCH 09/12] fix: update Dockerfile entrypoint to destination-smoke-test Co-Authored-By: AJ Steers --- airbyte/cli/universal_connector/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte/cli/universal_connector/Dockerfile b/airbyte/cli/universal_connector/Dockerfile index 9fb6ece10..99ee0b24a 100644 --- a/airbyte/cli/universal_connector/Dockerfile +++ b/airbyte/cli/universal_connector/Dockerfile @@ -7,5 +7,5 @@ COPY airbyte ./airbyte RUN pip install --no-cache-dir . -ENV AIRBYTE_ENTRYPOINT="destination-pyairbyte" -ENTRYPOINT ["destination-pyairbyte"] +ENV AIRBYTE_ENTRYPOINT="destination-smoke-test" +ENTRYPOINT ["destination-smoke-test"] From ef338c45204c0b2c31c9e558aa4253edd0fd9b0a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 08:04:46 +0000 Subject: [PATCH 10/12] revert: restore CLI entrypoints to destination-pyairbyte/source-pyairbyte Co-Authored-By: AJ Steers --- airbyte/cli/universal_connector/Dockerfile | 4 ++-- pyproject.toml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte/cli/universal_connector/Dockerfile b/airbyte/cli/universal_connector/Dockerfile index 99ee0b24a..9fb6ece10 100644 --- a/airbyte/cli/universal_connector/Dockerfile +++ b/airbyte/cli/universal_connector/Dockerfile @@ -7,5 +7,5 @@ COPY airbyte ./airbyte RUN pip install --no-cache-dir . -ENV AIRBYTE_ENTRYPOINT="destination-smoke-test" -ENTRYPOINT ["destination-smoke-test"] +ENV AIRBYTE_ENTRYPOINT="destination-pyairbyte" +ENTRYPOINT ["destination-pyairbyte"] diff --git a/pyproject.toml b/pyproject.toml index 737987965..99bdd3b4f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,8 +80,8 @@ dev = [ pyairbyte = "airbyte.cli:cli" pyab = "airbyte.cli:cli" airbyte-mcp = "airbyte.mcp.server:main" -destination-smoke-test = "airbyte.cli.universal_connector.run:run" -source-smoke-test = "airbyte.cli.universal_connector.run_source:run" +destination-pyairbyte = "airbyte.cli.universal_connector.run:run" +source-pyairbyte = "airbyte.cli.universal_connector.run_source:run" [build-system] requires = ["hatchling", "uv-dynamic-versioning"] From 60c502b86589c4e6f2cae62a210c4911afab5706 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 26 Feb 2026 09:04:34 -0800 Subject: [PATCH 11/12] feat(connectors): add smoke test source for destination regression testing (#982) Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- airbyte/cli/universal_connector/__init__.py | 2 + .../_source_smoke_test_scenarios.py | 490 ++++++++++++++++++ .../cli/universal_connector/run_smoke_test.py | 17 + .../universal_connector/smoke_test_source.py | 318 ++++++++++++ airbyte/cli/universal_connector/source.py | 5 +- pyproject.toml | 1 + 6 files changed, 830 insertions(+), 3 deletions(-) create mode 100644 airbyte/cli/universal_connector/_source_smoke_test_scenarios.py create mode 100644 airbyte/cli/universal_connector/run_smoke_test.py create mode 100644 airbyte/cli/universal_connector/smoke_test_source.py diff --git a/airbyte/cli/universal_connector/__init__.py b/airbyte/cli/universal_connector/__init__.py index 6fef95681..a2c8ab01d 100644 --- a/airbyte/cli/universal_connector/__init__.py +++ b/airbyte/cli/universal_connector/__init__.py @@ -2,10 +2,12 @@ """Universal connectors using PyAirbyte as backends.""" from airbyte.cli.universal_connector.destination import DestinationPyAirbyteUniversal +from airbyte.cli.universal_connector.smoke_test_source import SourceSmokeTest from airbyte.cli.universal_connector.source import SourcePyAirbyteUniversal __all__ = [ "DestinationPyAirbyteUniversal", "SourcePyAirbyteUniversal", + "SourceSmokeTest", ] diff --git a/airbyte/cli/universal_connector/_source_smoke_test_scenarios.py b/airbyte/cli/universal_connector/_source_smoke_test_scenarios.py new file mode 100644 index 000000000..e0ec084ef --- /dev/null +++ b/airbyte/cli/universal_connector/_source_smoke_test_scenarios.py @@ -0,0 +1,490 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Predefined smoke test scenarios for destination regression testing. + +Each scenario defines a stream name, JSON schema, optional primary key, +and either inline records or a record generator reference. +""" + +from __future__ import annotations + +import math +from typing import Any + + +_DEFAULT_LARGE_BATCH_COUNT = 1000 + +HIGH_VOLUME_SCENARIO_NAMES: set[str] = { + "large_batch_stream", +} + +PREDEFINED_SCENARIOS: list[dict[str, Any]] = [ + { + "name": "basic_types", + "description": "Covers fundamental column types: string, integer, number, boolean.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "amount": {"type": "number"}, + "is_active": {"type": "boolean"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "name": "Alice", "amount": 100.50, "is_active": True}, + {"id": 2, "name": "Bob", "amount": 0.0, "is_active": False}, + {"id": 3, "name": "", "amount": -99.99, "is_active": True}, + ], + }, + { + "name": "timestamp_types", + "description": "Covers date and timestamp formats including ISO 8601 variations.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "created_date": {"type": "string", "format": "date"}, + "updated_at": {"type": "string", "format": "date-time"}, + "epoch_seconds": {"type": "integer"}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "created_date": "2024-01-15", + "updated_at": "2024-01-15T10:30:00Z", + "epoch_seconds": 1705312200, + }, + { + "id": 2, + "created_date": "1970-01-01", + "updated_at": "1970-01-01T00:00:00+00:00", + "epoch_seconds": 0, + }, + { + "id": 3, + "created_date": "2099-12-31", + "updated_at": "2099-12-31T23:59:59.999999Z", + "epoch_seconds": 4102444799, + }, + ], + }, + { + "name": "large_decimals_and_numbers", + "description": ( + "Tests handling of very large numbers, " "high precision decimals, and boundary values." + ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "big_integer": {"type": "integer"}, + "precise_decimal": {"type": "number"}, + "small_decimal": {"type": "number"}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "big_integer": 9999999999999999, + "precise_decimal": math.pi, + "small_decimal": 0.000001, + }, + { + "id": 2, + "big_integer": -9999999999999999, + "precise_decimal": -0.1, + "small_decimal": 1e-10, + }, + { + "id": 3, + "big_integer": 0, + "precise_decimal": 99999999.99999999, + "small_decimal": 0.0, + }, + ], + }, + { + "name": "nested_json_objects", + "description": "Tests nested object and array handling in destination columns.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "metadata": { + "type": "object", + "properties": { + "source": {"type": "string"}, + "tags": {"type": "array", "items": {"type": "string"}}, + }, + }, + "nested_deep": { + "type": "object", + "properties": { + "level1": { + "type": "object", + "properties": { + "level2": { + "type": "object", + "properties": { + "value": {"type": "string"}, + }, + }, + }, + }, + }, + }, + "items_array": { + "type": "array", + "items": { + "type": "object", + "properties": { + "sku": {"type": "string"}, + "qty": {"type": "integer"}, + }, + }, + }, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "metadata": {"source": "api", "tags": ["a", "b", "c"]}, + "nested_deep": {"level1": {"level2": {"value": "deep"}}}, + "items_array": [{"sku": "ABC", "qty": 10}], + }, + { + "id": 2, + "metadata": {"source": "manual", "tags": []}, + "nested_deep": {"level1": {"level2": {"value": ""}}}, + "items_array": [], + }, + ], + }, + { + "name": "null_handling", + "description": "Tests null values across all column types and patterns.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "nullable_string": {"type": ["null", "string"]}, + "nullable_integer": {"type": ["null", "integer"]}, + "nullable_number": {"type": ["null", "number"]}, + "nullable_boolean": {"type": ["null", "boolean"]}, + "nullable_object": { + "type": ["null", "object"], + "properties": {"key": {"type": "string"}}, + }, + "always_null": {"type": ["null", "string"]}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "nullable_string": "present", + "nullable_integer": 42, + "nullable_number": math.pi, + "nullable_boolean": True, + "nullable_object": {"key": "val"}, + "always_null": None, + }, + { + "id": 2, + "nullable_string": None, + "nullable_integer": None, + "nullable_number": None, + "nullable_boolean": None, + "nullable_object": None, + "always_null": None, + }, + { + "id": 3, + "nullable_string": "", + "nullable_integer": 0, + "nullable_number": 0.0, + "nullable_boolean": False, + "nullable_object": {}, + "always_null": None, + }, + ], + }, + { + "name": "column_naming_edge_cases", + "description": ("Tests special characters, casing, " "and reserved words in column names."), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "CamelCaseColumn": {"type": "string"}, + "ALLCAPS": {"type": "string"}, + "snake_case_column": {"type": "string"}, + "column-with-dashes": {"type": "string"}, + "column.with.dots": {"type": "string"}, + "column with spaces": {"type": "string"}, + "select": {"type": "string"}, + "from": {"type": "string"}, + "order": {"type": "string"}, + "group": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "CamelCaseColumn": "camel", + "ALLCAPS": "caps", + "snake_case_column": "snake", + "column-with-dashes": "dashes", + "column.with.dots": "dots", + "column with spaces": "spaces", + "select": "reserved_select", + "from": "reserved_from", + "order": "reserved_order", + "group": "reserved_group", + }, + ], + }, + { + "name": "table_naming_edge_cases", + "description": ("Stream with special characters in the name " "to test table naming."), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "value": "table_name_test"}, + ], + }, + { + "name": "CamelCaseStreamName", + "description": "Stream with CamelCase name to test case handling.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "value": "camel_case_stream_test"}, + ], + }, + { + "name": "wide_table_50_columns", + "description": "Tests a wide table with 50 columns.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + **{f"col_{i:03d}": {"type": ["null", "string"]} for i in range(1, 50)}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + **{f"col_{i:03d}": f"val_{i}" for i in range(1, 50)}, + }, + { + "id": 2, + **{f"col_{i:03d}": None for i in range(1, 50)}, + }, + ], + }, + { + "name": "empty_stream", + "description": ("A stream that emits zero records, " "testing empty dataset handling."), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [], + }, + { + "name": "single_record_stream", + "description": "A stream with exactly one record.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "value": "only_record"}, + ], + }, + { + "name": "large_batch_stream", + "description": ( + "A stream that generates a configurable " "number of records for batch testing." + ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "value": {"type": "number"}, + "category": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "record_count": _DEFAULT_LARGE_BATCH_COUNT, + "record_generator": "large_batch", + "high_volume": True, + }, + { + "name": "unicode_and_special_strings", + "description": ( + "Tests unicode characters, emoji, escape " "sequences, and special string values." + ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "unicode_text": {"type": "string"}, + "special_chars": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "unicode_text": "Hello World", + "special_chars": "line1\nline2\ttab", + }, + { + "id": 2, + "unicode_text": "Caf\u00e9 na\u00efve r\u00e9sum\u00e9", + "special_chars": 'quote"inside', + }, + { + "id": 3, + "unicode_text": "\u4f60\u597d\u4e16\u754c", + "special_chars": "back\\slash", + }, + { + "id": 4, + "unicode_text": "\u0410\u0411\u0412\u0413", + "special_chars": "", + }, + ], + }, + { + "name": "schema_with_no_primary_key", + "description": ("A stream without a primary key, " "testing append-only behavior."), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "event_id": {"type": "string"}, + "event_type": {"type": "string"}, + "payload": {"type": "string"}, + }, + }, + "primary_key": None, + "records": [ + {"event_id": "evt_001", "event_type": "click", "payload": "{}"}, + {"event_id": "evt_001", "event_type": "click", "payload": "{}"}, + { + "event_id": "evt_002", + "event_type": "view", + "payload": '{"page": "home"}', + }, + ], + }, + { + "name": "long_column_names", + "description": ( + "Tests handling of very long column names " "that may exceed database limits." + ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "a_very_long_column_name_that_exceeds" + "_typical_database_limits_and_should_be" + "_truncated_or_handled_gracefully_by" + "_the_destination": { + "type": "string", + }, + "another_extremely_verbose_column_name" + "_designed_to_test_the_absolute_maximum" + "_length_that_any_reasonable_database" + "_would_support": { + "type": "string", + }, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "a_very_long_column_name_that_exceeds" + "_typical_database_limits_and_should_be" + "_truncated_or_handled_gracefully_by" + "_the_destination": "long_col_1", + "another_extremely_verbose_column_name" + "_designed_to_test_the_absolute_maximum" + "_length_that_any_reasonable_database" + "_would_support": "long_col_2", + }, + ], + }, +] + + +def generate_large_batch_records( + scenario: dict[str, Any], +) -> list[dict[str, Any]]: + """Generate records for the large_batch_stream scenario.""" + count = scenario.get("record_count", _DEFAULT_LARGE_BATCH_COUNT) + categories = ["cat_a", "cat_b", "cat_c", "cat_d", "cat_e"] + return [ + { + "id": i, + "name": f"record_{i:06d}", + "value": float(i) * 1.1, + "category": categories[i % len(categories)], + } + for i in range(1, count + 1) + ] + + +def get_scenario_records( + scenario: dict[str, Any], +) -> list[dict[str, Any]]: + """Get records for a scenario, using generator if specified.""" + if scenario.get("record_generator") == "large_batch": + return generate_large_batch_records(scenario) + return scenario.get("records", []) diff --git a/airbyte/cli/universal_connector/run_smoke_test.py b/airbyte/cli/universal_connector/run_smoke_test.py new file mode 100644 index 000000000..774b4a29d --- /dev/null +++ b/airbyte/cli/universal_connector/run_smoke_test.py @@ -0,0 +1,17 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Entry point for the Smoke Test source.""" + +import sys + +from airbyte_cdk.entrypoint import launch + +from airbyte.cli.universal_connector.smoke_test_source import SourceSmokeTest + + +def run() -> None: + """Run the smoke test source.""" + launch(SourceSmokeTest(), sys.argv[1:]) + + +if __name__ == "__main__": + run() diff --git a/airbyte/cli/universal_connector/smoke_test_source.py b/airbyte/cli/universal_connector/smoke_test_source.py new file mode 100644 index 000000000..abbcfac26 --- /dev/null +++ b/airbyte/cli/universal_connector/smoke_test_source.py @@ -0,0 +1,318 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Smoke test source for destination regression testing. + +This source generates synthetic test data covering common edge cases +that break destinations: type variations, null handling, naming edge cases, +schema variations, and batch size variations. + +Predefined scenarios are always available. Additional scenarios can be +injected dynamically via the ``custom_scenarios`` config field. +""" + +from __future__ import annotations + +import logging +import time +from typing import TYPE_CHECKING, Any + +from airbyte_cdk.models import ( + AirbyteCatalog, + AirbyteConnectionStatus, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConnectorSpecification, + Status, + SyncMode, + Type, +) +from airbyte_cdk.sources.source import Source + +from airbyte.cli.universal_connector._source_smoke_test_scenarios import ( + _DEFAULT_LARGE_BATCH_COUNT, + PREDEFINED_SCENARIOS, + get_scenario_records, +) + + +if TYPE_CHECKING: + from collections.abc import Iterable, Mapping + + +logger = logging.getLogger("airbyte") + + +def _build_streams_from_scenarios( + scenarios: list[dict[str, Any]], +) -> list[AirbyteStream]: + """Build AirbyteStream objects from scenario definitions.""" + return [ + AirbyteStream( + name=scenario["name"], + json_schema=scenario["json_schema"], + supported_sync_modes=[SyncMode.full_refresh], + source_defined_cursor=False, + source_defined_primary_key=scenario.get("primary_key"), + ) + for scenario in scenarios + ] + + +class SourceSmokeTest(Source): + """Smoke test source for destination regression testing. + + Generates synthetic data across predefined scenarios that cover + common destination failure patterns. Supports dynamic injection + of additional scenarios via the ``custom_scenarios`` config field. + """ + + def spec( + self, + logger: logging.Logger, # noqa: ARG002 + ) -> ConnectorSpecification: + """Return the connector specification.""" + return ConnectorSpecification( + documentationUrl="https://docs.airbyte.com/integrations/sources/smoke-test", + connectionSpecification={ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Smoke Test Source Spec", + "type": "object", + "required": [], + "properties": { + "custom_scenarios": { + "type": "array", + "title": "Custom Test Scenarios", + "description": ( + "Additional test scenarios to inject " + "at runtime. Each scenario defines a " + "stream name, JSON schema, and records." + ), + "items": { + "type": "object", + "required": [ + "name", + "json_schema", + "records", + ], + "properties": { + "name": { + "type": "string", + "description": ("Stream name for " "this scenario."), + }, + "description": { + "type": "string", + "description": ( + "Human-readable description " "of this scenario." + ), + }, + "json_schema": { + "type": "object", + "description": ("JSON schema for the stream."), + }, + "records": { + "type": "array", + "description": ("Records to emit " "for this stream."), + "items": {"type": "object"}, + }, + "primary_key": { + "type": ["array", "null"], + "description": ( + "Primary key definition " "(list of key paths) " "or null." + ), + "items": { + "type": "array", + "items": {"type": "string"}, + }, + }, + }, + }, + "default": [], + }, + "large_batch_record_count": { + "type": "integer", + "title": "Large Batch Record Count", + "description": ( + "Number of records to generate for " + "the large_batch_stream scenario. " + "Set to 0 to skip this stream." + ), + "default": 1000, + }, + "all_fast_streams": { + "type": "boolean", + "title": "All Fast Streams", + "description": ( + "Include all fast (non-high-volume) " "predefined streams." + ), + "default": True, + }, + "all_slow_streams": { + "type": "boolean", + "title": "All Slow Streams", + "description": ( + "Include all slow (high-volume) streams " + "such as large_batch_stream. These are " + "excluded by default to avoid incurring " + "the cost of large record sets." + ), + "default": False, + }, + "scenario_filter": { + "type": "array", + "title": "Scenario Filter", + "description": ( + "Specific scenario names to include. " + "These are unioned with the boolean-driven " + "sets (deduped). If omitted or empty, " + "only the boolean flags control selection." + ), + "items": {"type": "string"}, + "default": [], + }, + }, + }, + ) + + def _get_all_scenarios( + self, + config: Mapping[str, Any], + ) -> list[dict[str, Any]]: + """Combine predefined and custom scenarios. + + Selection logic: + 1. Boolean flags control groups: ``all_fast_streams`` + (default true) enables non-high-volume scenarios, + ``all_slow_streams`` (default false) enables + high-volume scenarios. + 2. ``scenario_filter`` names are unioned with the boolean sets. + 3. Custom scenarios are always included. + 4. The final list is deduplicated by name. + """ + include_default = config.get("all_fast_streams", True) + include_high_volume = config.get("all_slow_streams", False) + scenario_filter: list[str] = config.get("scenario_filter", []) + explicit_names: set[str] = set(scenario_filter) + + large_batch_count = config.get( + "large_batch_record_count", + _DEFAULT_LARGE_BATCH_COUNT, + ) + + scenarios: list[dict[str, Any]] = [] + seen_names: set[str] = set() + + for scenario in PREDEFINED_SCENARIOS: + name = scenario["name"] + is_high_volume = scenario.get("high_volume", False) + + included_by_flag = (include_high_volume and is_high_volume) or ( + include_default and not is_high_volume + ) + if not included_by_flag and name not in explicit_names: + continue + + s = dict(scenario) + if name == "large_batch_stream" and large_batch_count != _DEFAULT_LARGE_BATCH_COUNT: + s["record_count"] = large_batch_count + + if name not in seen_names: + scenarios.append(s) + seen_names.add(name) + + custom = config.get("custom_scenarios", []) + if custom: + for cs in custom: + name = cs.get("name", "") + if not name or not cs.get("json_schema"): + continue + if name not in seen_names: + scenarios.append( + { + "name": name, + "description": cs.get( + "description", + "Custom injected scenario", + ), + "json_schema": cs["json_schema"], + "primary_key": cs.get("primary_key"), + "records": cs.get("records", []), + } + ) + seen_names.add(name) + + return scenarios + + def check( + self, + logger: logging.Logger, + config: Mapping[str, Any], + ) -> AirbyteConnectionStatus: + """Validate the configuration.""" + custom = config.get("custom_scenarios", []) + for i, scenario in enumerate(custom): + if not scenario.get("name"): + return AirbyteConnectionStatus( + status=Status.FAILED, + message=("Custom scenario at index " f"{i} is missing 'name'."), + ) + if not scenario.get("json_schema"): + return AirbyteConnectionStatus( + status=Status.FAILED, + message=( + f"Custom scenario " f"'{scenario['name']}' " f"is missing 'json_schema'." + ), + ) + + scenarios = self._get_all_scenarios(config) + if not scenarios: + return AirbyteConnectionStatus( + status=Status.FAILED, + message=("No scenarios available. " "Check scenario_filter config."), + ) + + logger.info("Smoke test source check passed " f"with {len(scenarios)} scenarios.") + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + + def discover( + self, + logger: logging.Logger, + config: Mapping[str, Any], + ) -> AirbyteCatalog: + """Return the catalog with all test scenario streams.""" + scenarios = self._get_all_scenarios(config) + streams = _build_streams_from_scenarios(scenarios) + logger.info(f"Discovered {len(streams)} smoke test streams.") + return AirbyteCatalog(streams=streams) + + def read( + self, + logger: logging.Logger, + config: Mapping[str, Any], + catalog: ConfiguredAirbyteCatalog, + state: list[Any] | None = None, # noqa: ARG002 + ) -> Iterable[AirbyteMessage]: + """Read records from selected smoke test streams.""" + selected_streams = {stream.stream.name for stream in catalog.streams} + scenarios = self._get_all_scenarios(config) + scenario_map = {s["name"]: s for s in scenarios} + now_ms = int(time.time() * 1000) + + for stream_name in selected_streams: + scenario = scenario_map.get(stream_name) + if not scenario: + logger.warning(f"Stream '{stream_name}' not found " f"in scenarios, skipping.") + continue + + records = get_scenario_records(scenario) + logger.info(f"Emitting {len(records)} records " f"for stream '{stream_name}'.") + + for record in records: + yield AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + stream=stream_name, + data=record, + emitted_at=now_ms, + ), + ) diff --git a/airbyte/cli/universal_connector/source.py b/airbyte/cli/universal_connector/source.py index a419fb815..8eb7a0cd5 100644 --- a/airbyte/cli/universal_connector/source.py +++ b/airbyte/cli/universal_connector/source.py @@ -192,6 +192,5 @@ def read( namespace=message.record.namespace, ), ) - # TK-TODO: Add STATE message handling for incremental sync support. - # STATE messages require per-stream format conversion which is complex. - # For now, only full-refresh syncs are supported. + # TODO: Add STATE message handling for incremental sync support. + # https://github.com/airbytehq/PyAirbyte/issues/987 diff --git a/pyproject.toml b/pyproject.toml index 99bdd3b4f..be1c4de4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -82,6 +82,7 @@ pyab = "airbyte.cli:cli" airbyte-mcp = "airbyte.mcp.server:main" destination-pyairbyte = "airbyte.cli.universal_connector.run:run" source-pyairbyte = "airbyte.cli.universal_connector.run_source:run" +source-smoke-test = "airbyte.cli.universal_connector.run_smoke_test:run" [build-system] requires = ["hatchling", "uv-dynamic-versioning"] From 271673c478f6e150f2cfca5f1d02888273d0d2a4 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 17:12:22 +0000 Subject: [PATCH 12/12] docs: add experimental warning to universal connector modules Co-Authored-By: AJ Steers --- airbyte/cli/universal_connector/__init__.py | 7 ++++++- airbyte/cli/universal_connector/destination.py | 7 ++++++- airbyte/cli/universal_connector/smoke_test_source.py | 4 ++++ airbyte/cli/universal_connector/source.py | 7 ++++++- 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/airbyte/cli/universal_connector/__init__.py b/airbyte/cli/universal_connector/__init__.py index a2c8ab01d..80ca11427 100644 --- a/airbyte/cli/universal_connector/__init__.py +++ b/airbyte/cli/universal_connector/__init__.py @@ -1,5 +1,10 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. -"""Universal connectors using PyAirbyte as backends.""" +"""Universal connectors using PyAirbyte as backends. + +.. warning:: + This module is experimental and subject to change without notice. + The APIs and behavior may be modified or removed in future versions. +""" from airbyte.cli.universal_connector.destination import DestinationPyAirbyteUniversal from airbyte.cli.universal_connector.smoke_test_source import SourceSmokeTest diff --git a/airbyte/cli/universal_connector/destination.py b/airbyte/cli/universal_connector/destination.py index 731185fdb..c8fe929a7 100644 --- a/airbyte/cli/universal_connector/destination.py +++ b/airbyte/cli/universal_connector/destination.py @@ -1,5 +1,10 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. -"""Universal destination implementation using PyAirbyte caches.""" +"""Universal destination implementation using PyAirbyte caches. + +.. warning:: + This module is experimental and subject to change without notice. + The APIs and behavior may be modified or removed in future versions. +""" from __future__ import annotations diff --git a/airbyte/cli/universal_connector/smoke_test_source.py b/airbyte/cli/universal_connector/smoke_test_source.py index abbcfac26..0c19f744f 100644 --- a/airbyte/cli/universal_connector/smoke_test_source.py +++ b/airbyte/cli/universal_connector/smoke_test_source.py @@ -7,6 +7,10 @@ Predefined scenarios are always available. Additional scenarios can be injected dynamically via the ``custom_scenarios`` config field. + +.. warning:: + This module is experimental and subject to change without notice. + The APIs and behavior may be modified or removed in future versions. """ from __future__ import annotations diff --git a/airbyte/cli/universal_connector/source.py b/airbyte/cli/universal_connector/source.py index 8eb7a0cd5..afd5baeb2 100644 --- a/airbyte/cli/universal_connector/source.py +++ b/airbyte/cli/universal_connector/source.py @@ -1,5 +1,10 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. -"""Universal source implementation using PyAirbyte to wrap any source connector.""" +"""Universal source implementation using PyAirbyte to wrap any source connector. + +.. warning:: + This module is experimental and subject to change without notice. + The APIs and behavior may be modified or removed in future versions. +""" from __future__ import annotations