diff --git a/.github/workflows/tk-todo-check.yml b/.github/workflows/tk-todo-check.yml new file mode 100644 index 000000000..8c60a0b8c --- /dev/null +++ b/.github/workflows/tk-todo-check.yml @@ -0,0 +1,45 @@ +name: TK-TODO Check # IGNORE:TK - this workflow checks for TK-TODO markers + +on: + push: + branches: + - main + pull_request: {} + +permissions: + contents: read + +jobs: + tk-todo-check: # IGNORE:TK + name: Check for TK-TODO markers # IGNORE:TK + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Check for TK-TODO markers # IGNORE:TK + run: | + # Find all TK-TODO markers (case-insensitive) that don't have IGNORE:TK on the same line # IGNORE:TK + # Exit with error if any are found + + echo "Checking for TK-TODO markers..." # IGNORE:TK + + # Use git ls-files to only check tracked files (excludes gitignored files like sdks/) + # grep -i for case-insensitive matching + # grep -v for excluding lines with IGNORE:TK + # grep -n for line numbers + + FOUND_TODOS=$(git ls-files | xargs grep -i -n "TK-TODO" 2>/dev/null | grep -i -v "IGNORE:TK" || true) # IGNORE:TK + + if [ -n "$FOUND_TODOS" ]; then + echo "" + echo "ERROR: Found TK-TODO markers that must be resolved before merge:" # IGNORE:TK + echo "" + echo "$FOUND_TODOS" + echo "" + echo "To suppress a TK-TODO, add 'IGNORE:TK' (case-insensitive) on the same line." # IGNORE:TK + echo "" + exit 1 + else + echo "No unresolved TK-TODO markers found." # IGNORE:TK + fi diff --git a/airbyte/cli/__init__.py b/airbyte/cli/__init__.py new file mode 100644 index 000000000..e6423c8b2 --- /dev/null +++ b/airbyte/cli/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""CLI modules for PyAirbyte.""" diff --git a/airbyte/cli/universal_connector/Dockerfile b/airbyte/cli/universal_connector/Dockerfile new file mode 100644 index 000000000..9fb6ece10 --- /dev/null +++ b/airbyte/cli/universal_connector/Dockerfile @@ -0,0 +1,11 @@ +FROM docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73 + +WORKDIR /airbyte/integration_code + +COPY pyproject.toml uv.lock ./ +COPY airbyte ./airbyte + +RUN pip install --no-cache-dir . + +ENV AIRBYTE_ENTRYPOINT="destination-pyairbyte" +ENTRYPOINT ["destination-pyairbyte"] diff --git a/airbyte/cli/universal_connector/__init__.py b/airbyte/cli/universal_connector/__init__.py new file mode 100644 index 000000000..80ca11427 --- /dev/null +++ b/airbyte/cli/universal_connector/__init__.py @@ -0,0 +1,18 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Universal connectors using PyAirbyte as backends. + +.. warning:: + This module is experimental and subject to change without notice. + The APIs and behavior may be modified or removed in future versions. +""" + +from airbyte.cli.universal_connector.destination import DestinationPyAirbyteUniversal +from airbyte.cli.universal_connector.smoke_test_source import SourceSmokeTest +from airbyte.cli.universal_connector.source import SourcePyAirbyteUniversal + + +__all__ = [ + "DestinationPyAirbyteUniversal", + "SourcePyAirbyteUniversal", + "SourceSmokeTest", +] diff --git a/airbyte/cli/universal_connector/_source_smoke_test_scenarios.py b/airbyte/cli/universal_connector/_source_smoke_test_scenarios.py new file mode 100644 index 000000000..e0ec084ef --- /dev/null +++ b/airbyte/cli/universal_connector/_source_smoke_test_scenarios.py @@ -0,0 +1,490 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+"""Predefined smoke test scenarios for destination regression testing. + +Each scenario defines a stream name, JSON schema, optional primary key, +and either inline records or a record generator reference. +""" + +from __future__ import annotations + +import math +from typing import Any + + +_DEFAULT_LARGE_BATCH_COUNT = 1000 + +HIGH_VOLUME_SCENARIO_NAMES: set[str] = { + "large_batch_stream", +} + +PREDEFINED_SCENARIOS: list[dict[str, Any]] = [ + { + "name": "basic_types", + "description": "Covers fundamental column types: string, integer, number, boolean.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "amount": {"type": "number"}, + "is_active": {"type": "boolean"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "name": "Alice", "amount": 100.50, "is_active": True}, + {"id": 2, "name": "Bob", "amount": 0.0, "is_active": False}, + {"id": 3, "name": "", "amount": -99.99, "is_active": True}, + ], + }, + { + "name": "timestamp_types", + "description": "Covers date and timestamp formats including ISO 8601 variations.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "created_date": {"type": "string", "format": "date"}, + "updated_at": {"type": "string", "format": "date-time"}, + "epoch_seconds": {"type": "integer"}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "created_date": "2024-01-15", + "updated_at": "2024-01-15T10:30:00Z", + "epoch_seconds": 1705312200, + }, + { + "id": 2, + "created_date": "1970-01-01", + "updated_at": "1970-01-01T00:00:00+00:00", + "epoch_seconds": 0, + }, + { + "id": 3, + "created_date": "2099-12-31", + "updated_at": "2099-12-31T23:59:59.999999Z", + "epoch_seconds": 4102444799, + }, + ], + }, + { + "name": "large_decimals_and_numbers", + "description": ( + "Tests handling of very large numbers, " "high precision decimals, and boundary values." 
+ ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "big_integer": {"type": "integer"}, + "precise_decimal": {"type": "number"}, + "small_decimal": {"type": "number"}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "big_integer": 9999999999999999, + "precise_decimal": math.pi, + "small_decimal": 0.000001, + }, + { + "id": 2, + "big_integer": -9999999999999999, + "precise_decimal": -0.1, + "small_decimal": 1e-10, + }, + { + "id": 3, + "big_integer": 0, + "precise_decimal": 99999999.99999999, + "small_decimal": 0.0, + }, + ], + }, + { + "name": "nested_json_objects", + "description": "Tests nested object and array handling in destination columns.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "metadata": { + "type": "object", + "properties": { + "source": {"type": "string"}, + "tags": {"type": "array", "items": {"type": "string"}}, + }, + }, + "nested_deep": { + "type": "object", + "properties": { + "level1": { + "type": "object", + "properties": { + "level2": { + "type": "object", + "properties": { + "value": {"type": "string"}, + }, + }, + }, + }, + }, + }, + "items_array": { + "type": "array", + "items": { + "type": "object", + "properties": { + "sku": {"type": "string"}, + "qty": {"type": "integer"}, + }, + }, + }, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "metadata": {"source": "api", "tags": ["a", "b", "c"]}, + "nested_deep": {"level1": {"level2": {"value": "deep"}}}, + "items_array": [{"sku": "ABC", "qty": 10}], + }, + { + "id": 2, + "metadata": {"source": "manual", "tags": []}, + "nested_deep": {"level1": {"level2": {"value": ""}}}, + "items_array": [], + }, + ], + }, + { + "name": "null_handling", + "description": "Tests null values across all column types and patterns.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "nullable_string": {"type": ["null", "string"]}, + "nullable_integer": {"type": ["null", "integer"]}, + "nullable_number": {"type": ["null", "number"]}, + "nullable_boolean": {"type": ["null", "boolean"]}, + "nullable_object": { + "type": ["null", "object"], + "properties": {"key": {"type": "string"}}, + }, + "always_null": {"type": ["null", "string"]}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "nullable_string": "present", + "nullable_integer": 42, + "nullable_number": math.pi, + "nullable_boolean": True, + "nullable_object": {"key": "val"}, + "always_null": None, + }, + { + "id": 2, + "nullable_string": None, + "nullable_integer": None, + "nullable_number": None, + "nullable_boolean": None, + "nullable_object": None, + "always_null": None, + }, + { + "id": 3, + "nullable_string": "", + "nullable_integer": 0, + "nullable_number": 0.0, + "nullable_boolean": False, + "nullable_object": {}, + "always_null": None, + }, + ], + }, + { + "name": "column_naming_edge_cases", + "description": ("Tests special characters, casing, " "and reserved words in column names."), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "CamelCaseColumn": {"type": "string"}, + "ALLCAPS": {"type": "string"}, + "snake_case_column": {"type": "string"}, + "column-with-dashes": {"type": "string"}, + "column.with.dots": {"type": "string"}, + 
"column with spaces": {"type": "string"}, + "select": {"type": "string"}, + "from": {"type": "string"}, + "order": {"type": "string"}, + "group": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "CamelCaseColumn": "camel", + "ALLCAPS": "caps", + "snake_case_column": "snake", + "column-with-dashes": "dashes", + "column.with.dots": "dots", + "column with spaces": "spaces", + "select": "reserved_select", + "from": "reserved_from", + "order": "reserved_order", + "group": "reserved_group", + }, + ], + }, + { + "name": "table_naming_edge_cases", + "description": ("Stream with special characters in the name " "to test table naming."), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "value": "table_name_test"}, + ], + }, + { + "name": "CamelCaseStreamName", + "description": "Stream with CamelCase name to test case handling.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "value": "camel_case_stream_test"}, + ], + }, + { + "name": "wide_table_50_columns", + "description": "Tests a wide table with 50 columns.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + **{f"col_{i:03d}": {"type": ["null", "string"]} for i in range(1, 50)}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + **{f"col_{i:03d}": f"val_{i}" for i in range(1, 50)}, + }, + { + "id": 2, + **{f"col_{i:03d}": None for i in range(1, 50)}, + }, + ], + }, + { + "name": "empty_stream", + "description": ("A stream that emits zero records, " "testing empty dataset handling."), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [], + }, + { + "name": "single_record_stream", + "description": "A stream with exactly one record.", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "value": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + {"id": 1, "value": "only_record"}, + ], + }, + { + "name": "large_batch_stream", + "description": ( + "A stream that generates a configurable " "number of records for batch testing." + ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "value": {"type": "number"}, + "category": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "record_count": _DEFAULT_LARGE_BATCH_COUNT, + "record_generator": "large_batch", + "high_volume": True, + }, + { + "name": "unicode_and_special_strings", + "description": ( + "Tests unicode characters, emoji, escape " "sequences, and special string values." 
+ ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "unicode_text": {"type": "string"}, + "special_chars": {"type": "string"}, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "unicode_text": "Hello World", + "special_chars": "line1\nline2\ttab", + }, + { + "id": 2, + "unicode_text": "Caf\u00e9 na\u00efve r\u00e9sum\u00e9", + "special_chars": 'quote"inside', + }, + { + "id": 3, + "unicode_text": "\u4f60\u597d\u4e16\u754c", + "special_chars": "back\\slash", + }, + { + "id": 4, + "unicode_text": "\u0410\u0411\u0412\u0413", + "special_chars": "", + }, + ], + }, + { + "name": "schema_with_no_primary_key", + "description": ("A stream without a primary key, " "testing append-only behavior."), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "event_id": {"type": "string"}, + "event_type": {"type": "string"}, + "payload": {"type": "string"}, + }, + }, + "primary_key": None, + "records": [ + {"event_id": "evt_001", "event_type": "click", "payload": "{}"}, + {"event_id": "evt_001", "event_type": "click", "payload": "{}"}, + { + "event_id": "evt_002", + "event_type": "view", + "payload": '{"page": "home"}', + }, + ], + }, + { + "name": "long_column_names", + "description": ( + "Tests handling of very long column names " "that may exceed database limits." + ), + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "a_very_long_column_name_that_exceeds" + "_typical_database_limits_and_should_be" + "_truncated_or_handled_gracefully_by" + "_the_destination": { + "type": "string", + }, + "another_extremely_verbose_column_name" + "_designed_to_test_the_absolute_maximum" + "_length_that_any_reasonable_database" + "_would_support": { + "type": "string", + }, + }, + }, + "primary_key": [["id"]], + "records": [ + { + "id": 1, + "a_very_long_column_name_that_exceeds" + "_typical_database_limits_and_should_be" + "_truncated_or_handled_gracefully_by" + "_the_destination": "long_col_1", + "another_extremely_verbose_column_name" + "_designed_to_test_the_absolute_maximum" + "_length_that_any_reasonable_database" + "_would_support": "long_col_2", + }, + ], + }, +] + + +def generate_large_batch_records( + scenario: dict[str, Any], +) -> list[dict[str, Any]]: + """Generate records for the large_batch_stream scenario.""" + count = scenario.get("record_count", _DEFAULT_LARGE_BATCH_COUNT) + categories = ["cat_a", "cat_b", "cat_c", "cat_d", "cat_e"] + return [ + { + "id": i, + "name": f"record_{i:06d}", + "value": float(i) * 1.1, + "category": categories[i % len(categories)], + } + for i in range(1, count + 1) + ] + + +def get_scenario_records( + scenario: dict[str, Any], +) -> list[dict[str, Any]]: + """Get records for a scenario, using generator if specified.""" + if scenario.get("record_generator") == "large_batch": + return generate_large_batch_records(scenario) + return scenario.get("records", []) diff --git a/airbyte/cli/universal_connector/destination.py b/airbyte/cli/universal_connector/destination.py new file mode 100644 index 000000000..c8fe929a7 --- /dev/null +++ b/airbyte/cli/universal_connector/destination.py @@ -0,0 +1,336 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Universal destination implementation using PyAirbyte caches. + +.. warning:: + This module is experimental and subject to change without notice. 
+ The APIs and behavior may be modified or removed in future versions. +""" + +from __future__ import annotations + +import datetime +import json +import logging +import uuid +from collections import defaultdict +from typing import TYPE_CHECKING, Any + +from sqlalchemy import text + +from airbyte_cdk.destinations import Destination +from airbyte_cdk.models import ( + AirbyteConnectionStatus, + AirbyteMessage, + ConfiguredAirbyteCatalog, + ConnectorSpecification, + DestinationSyncMode, + Status, + Type, +) + +from airbyte.caches.bigquery import BigQueryCache +from airbyte.caches.duckdb import DuckDBCache +from airbyte.caches.motherduck import MotherDuckCache +from airbyte.caches.postgres import PostgresCache +from airbyte.caches.snowflake import SnowflakeCache +from airbyte.secrets.base import SecretString + + +if TYPE_CHECKING: + from collections.abc import Iterable, Mapping + + from sqlalchemy.engine import Engine + + from airbyte.caches.base import CacheBase + + +logger = logging.getLogger("airbyte") + + +DESTINATION_TYPE_DUCKDB = "duckdb" +DESTINATION_TYPE_POSTGRES = "postgres" +DESTINATION_TYPE_SNOWFLAKE = "snowflake" +DESTINATION_TYPE_BIGQUERY = "bigquery" +DESTINATION_TYPE_MOTHERDUCK = "motherduck" + + +class DestinationPyAirbyteUniversal(Destination): + """Universal destination that writes to any PyAirbyte-supported cache backend.""" + + def spec(self, logger: logging.Logger) -> ConnectorSpecification: # noqa: ARG002 + """Return the connector specification.""" + return ConnectorSpecification( + documentationUrl="https://docs.airbyte.com/integrations/destinations/pyairbyte-universal", + connectionSpecification={ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "PyAirbyte Universal Destination Spec", + "type": "object", + "required": ["destination_type"], + "properties": { + "destination_type": { + "type": "string", + "title": "Destination Type", + "description": "The type of destination to write to.", + "enum": ["duckdb", "postgres", "snowflake", "bigquery", "motherduck"], + }, + "duckdb": { + "type": "object", + "title": "DuckDB Configuration", + "properties": { + "db_path": {"type": "string", "default": "/local/pyairbyte.duckdb"}, + "schema_name": {"type": "string", "default": "main"}, + }, + }, + "postgres": { + "type": "object", + "title": "PostgreSQL Configuration", + "properties": { + "host": {"type": "string", "default": "localhost"}, + "port": {"type": "integer", "default": 5432}, + "username": {"type": "string"}, + "password": {"type": "string", "airbyte_secret": True}, + "database": {"type": "string"}, + "schema_name": {"type": "string", "default": "public"}, + }, + }, + "snowflake": { + "type": "object", + "title": "Snowflake Configuration", + "properties": { + "account": {"type": "string"}, + "username": {"type": "string"}, + "password": {"type": "string", "airbyte_secret": True}, + "warehouse": {"type": "string"}, + "database": {"type": "string"}, + "schema_name": {"type": "string", "default": "PUBLIC"}, + "role": {"type": "string"}, + }, + }, + "bigquery": { + "type": "object", + "title": "BigQuery Configuration", + "properties": { + "project_name": {"type": "string"}, + "dataset_name": {"type": "string"}, + "credentials_path": {"type": "string"}, + }, + }, + "motherduck": { + "type": "object", + "title": "MotherDuck Configuration", + "properties": { + "database": {"type": "string", "default": "my_db"}, + "schema_name": {"type": "string", "default": "main"}, + "api_key": {"type": "string", "airbyte_secret": True}, + }, + }, + }, + }, + ) + + def 
_get_cache(self, config: Mapping[str, Any]) -> CacheBase: + """Create and return the appropriate cache based on configuration.""" + destination_type = config.get("destination_type") + + if destination_type == DESTINATION_TYPE_DUCKDB: + duckdb_config = config.get("duckdb", {}) + return DuckDBCache( + db_path=duckdb_config.get("db_path", "/local/pyairbyte.duckdb"), + schema_name=duckdb_config.get("schema_name", "main"), + ) + + if destination_type == DESTINATION_TYPE_POSTGRES: + pg_config = config.get("postgres", {}) + return PostgresCache( + host=pg_config.get("host", "localhost"), + port=pg_config.get("port", 5432), + username=pg_config.get("username", "postgres"), + password=SecretString(pg_config.get("password", "")), + database=pg_config.get("database", "postgres"), + schema_name=pg_config.get("schema_name", "public"), + ) + + if destination_type == DESTINATION_TYPE_SNOWFLAKE: + sf_config = config.get("snowflake", {}) + password = sf_config.get("password") + return SnowflakeCache( + account=sf_config.get("account", ""), + username=sf_config.get("username", ""), + password=SecretString(password) if password else None, + warehouse=sf_config.get("warehouse", ""), + database=sf_config.get("database", ""), + schema_name=sf_config.get("schema_name", "PUBLIC"), + role=sf_config.get("role", ""), + ) + + if destination_type == DESTINATION_TYPE_BIGQUERY: + bq_config = config.get("bigquery", {}) + return BigQueryCache( + project_name=bq_config.get("project_name", ""), + dataset_name=bq_config.get("dataset_name", ""), + credentials_path=bq_config.get("credentials_path"), + ) + + if destination_type == DESTINATION_TYPE_MOTHERDUCK: + md_config = config.get("motherduck", {}) + return MotherDuckCache( + database=md_config.get("database", "my_db"), + schema_name=md_config.get("schema_name", "main"), + api_key=SecretString(md_config.get("api_key", "")), + ) + + raise ValueError(f"Unsupported destination type: {destination_type}") + + def check( + self, + logger: logging.Logger, # noqa: ARG002 + config: Mapping[str, Any], + ) -> AirbyteConnectionStatus: + """Test the connection to the destination.""" + cache = None + try: + cache = self._get_cache(config) + engine = cache.get_sql_engine() + with engine.connect() as conn: + conn.execute(text("SELECT 1")) + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + except Exception as e: + return AirbyteConnectionStatus( + status=Status.FAILED, message=f"Connection failed: {e!r}" + ) + finally: + if cache is not None: + cache.close() + + def write( + self, + config: Mapping[str, Any], + configured_catalog: ConfiguredAirbyteCatalog, + input_messages: Iterable[AirbyteMessage], + ) -> Iterable[AirbyteMessage]: + """Write data to the destination using PyAirbyte cache. + + This method processes messages in a streaming fashion, buffering records + and flushing on state messages to ensure fault tolerance. 
+ """ + cache = self._get_cache(config) + try: + streams = {s.stream.name for s in configured_catalog.streams} + schema_name = cache.schema_name + + logger.info(f"Starting write to PyAirbyte Universal with {len(streams)} streams") + + # Get SQL engine and ensure schema exists + engine = cache.get_sql_engine() + with engine.connect() as conn: + conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + conn.commit() # pyrefly: ignore[missing-attribute] + + # Create tables for each stream + for configured_stream in configured_catalog.streams: + name = configured_stream.stream.name + table_name = f"_airbyte_raw_{name}" + + with engine.connect() as conn: + if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite: + logger.info(f"Dropping table for overwrite: {table_name}") + conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}")) + conn.commit() # pyrefly: ignore[missing-attribute] + + # Create the raw table if needed + create_sql = f""" + CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ( + _airbyte_ab_id VARCHAR(36) PRIMARY KEY, + _airbyte_emitted_at TIMESTAMP, + _airbyte_data JSON + ) + """ + conn.execute(text(create_sql)) + conn.commit() # pyrefly: ignore[missing-attribute] + + # Buffer for records + buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list)) + + for message in input_messages: + if message.type == Type.STATE: + # Flush the buffer before yielding state + for stream_name in list(buffer.keys()): + self._flush_buffer( + engine=engine, + buffer=buffer, + schema_name=schema_name, + stream_name=stream_name, + ) + buffer = defaultdict(lambda: defaultdict(list)) + yield message + + elif message.type == Type.RECORD: + record = message.record + if record is None: + continue + stream_name = record.stream + if stream_name not in streams: + logger.debug(f"Stream {stream_name} not in configured streams, skipping") + continue + + # Add to buffer + buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4())) + # Use the record's original emitted_at timestamp if available, + # otherwise fall back to current time + emitted_at = ( + datetime.datetime.fromtimestamp( + record.emitted_at / 1000, tz=datetime.timezone.utc + ) + if record.emitted_at is not None + else datetime.datetime.now(datetime.timezone.utc) + ) + buffer[stream_name]["_airbyte_emitted_at"].append(emitted_at.isoformat()) + buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data)) + + else: + logger.debug(f"Message type {message.type} not handled, skipping") + + # Flush any remaining records + for stream_name in list(buffer.keys()): + self._flush_buffer( + engine=engine, + buffer=buffer, + schema_name=schema_name, + stream_name=stream_name, + ) + finally: + cache.close() + + def _flush_buffer( + self, + *, + engine: Engine, + buffer: dict[str, dict[str, list[Any]]], + schema_name: str, + stream_name: str, + ) -> None: + """Flush buffered records to the database.""" + if not buffer[stream_name]["_airbyte_ab_id"]: + return + + table_name = f"_airbyte_raw_{stream_name}" + entries = buffer[stream_name] + + logger.info(f"Flushing {len(entries['_airbyte_ab_id'])} records to {table_name}") + + with engine.connect() as conn: + for i in range(len(entries["_airbyte_ab_id"])): + insert_sql = text(f""" + INSERT INTO {schema_name}.{table_name} + (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data) + VALUES (:ab_id, :emitted_at, :data) + """) + conn.execute( + insert_sql, + { + "ab_id": entries["_airbyte_ab_id"][i], + "emitted_at": 
entries["_airbyte_emitted_at"][i], + "data": entries["_airbyte_data"][i], + }, + ) + conn.commit() # pyrefly: ignore[missing-attribute] diff --git a/airbyte/cli/universal_connector/run.py b/airbyte/cli/universal_connector/run.py new file mode 100644 index 000000000..6e1612a94 --- /dev/null +++ b/airbyte/cli/universal_connector/run.py @@ -0,0 +1,15 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Entry point for the PyAirbyte Universal destination.""" + +import sys + +from airbyte.cli.universal_connector import DestinationPyAirbyteUniversal + + +def run() -> None: + """Run the destination.""" + DestinationPyAirbyteUniversal().run(sys.argv[1:]) + + +if __name__ == "__main__": + run() diff --git a/airbyte/cli/universal_connector/run_smoke_test.py b/airbyte/cli/universal_connector/run_smoke_test.py new file mode 100644 index 000000000..774b4a29d --- /dev/null +++ b/airbyte/cli/universal_connector/run_smoke_test.py @@ -0,0 +1,17 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Entry point for the Smoke Test source.""" + +import sys + +from airbyte_cdk.entrypoint import launch + +from airbyte.cli.universal_connector.smoke_test_source import SourceSmokeTest + + +def run() -> None: + """Run the smoke test source.""" + launch(SourceSmokeTest(), sys.argv[1:]) + + +if __name__ == "__main__": + run() diff --git a/airbyte/cli/universal_connector/run_source.py b/airbyte/cli/universal_connector/run_source.py new file mode 100644 index 000000000..0a518a6be --- /dev/null +++ b/airbyte/cli/universal_connector/run_source.py @@ -0,0 +1,17 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Entry point for the PyAirbyte Universal source.""" + +import sys + +from airbyte_cdk.entrypoint import launch + +from airbyte.cli.universal_connector import SourcePyAirbyteUniversal + + +def run() -> None: + """Run the source.""" + launch(SourcePyAirbyteUniversal(), sys.argv[1:]) + + +if __name__ == "__main__": + run() diff --git a/airbyte/cli/universal_connector/smoke_test_source.py b/airbyte/cli/universal_connector/smoke_test_source.py new file mode 100644 index 000000000..0c19f744f --- /dev/null +++ b/airbyte/cli/universal_connector/smoke_test_source.py @@ -0,0 +1,322 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Smoke test source for destination regression testing. + +This source generates synthetic test data covering common edge cases +that break destinations: type variations, null handling, naming edge cases, +schema variations, and batch size variations. + +Predefined scenarios are always available. Additional scenarios can be +injected dynamically via the ``custom_scenarios`` config field. + +.. warning:: + This module is experimental and subject to change without notice. + The APIs and behavior may be modified or removed in future versions. 
+""" + +from __future__ import annotations + +import logging +import time +from typing import TYPE_CHECKING, Any + +from airbyte_cdk.models import ( + AirbyteCatalog, + AirbyteConnectionStatus, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConnectorSpecification, + Status, + SyncMode, + Type, +) +from airbyte_cdk.sources.source import Source + +from airbyte.cli.universal_connector._source_smoke_test_scenarios import ( + _DEFAULT_LARGE_BATCH_COUNT, + PREDEFINED_SCENARIOS, + get_scenario_records, +) + + +if TYPE_CHECKING: + from collections.abc import Iterable, Mapping + + +logger = logging.getLogger("airbyte") + + +def _build_streams_from_scenarios( + scenarios: list[dict[str, Any]], +) -> list[AirbyteStream]: + """Build AirbyteStream objects from scenario definitions.""" + return [ + AirbyteStream( + name=scenario["name"], + json_schema=scenario["json_schema"], + supported_sync_modes=[SyncMode.full_refresh], + source_defined_cursor=False, + source_defined_primary_key=scenario.get("primary_key"), + ) + for scenario in scenarios + ] + + +class SourceSmokeTest(Source): + """Smoke test source for destination regression testing. + + Generates synthetic data across predefined scenarios that cover + common destination failure patterns. Supports dynamic injection + of additional scenarios via the ``custom_scenarios`` config field. + """ + + def spec( + self, + logger: logging.Logger, # noqa: ARG002 + ) -> ConnectorSpecification: + """Return the connector specification.""" + return ConnectorSpecification( + documentationUrl="https://docs.airbyte.com/integrations/sources/smoke-test", + connectionSpecification={ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Smoke Test Source Spec", + "type": "object", + "required": [], + "properties": { + "custom_scenarios": { + "type": "array", + "title": "Custom Test Scenarios", + "description": ( + "Additional test scenarios to inject " + "at runtime. Each scenario defines a " + "stream name, JSON schema, and records." + ), + "items": { + "type": "object", + "required": [ + "name", + "json_schema", + "records", + ], + "properties": { + "name": { + "type": "string", + "description": ("Stream name for " "this scenario."), + }, + "description": { + "type": "string", + "description": ( + "Human-readable description " "of this scenario." + ), + }, + "json_schema": { + "type": "object", + "description": ("JSON schema for the stream."), + }, + "records": { + "type": "array", + "description": ("Records to emit " "for this stream."), + "items": {"type": "object"}, + }, + "primary_key": { + "type": ["array", "null"], + "description": ( + "Primary key definition " "(list of key paths) " "or null." + ), + "items": { + "type": "array", + "items": {"type": "string"}, + }, + }, + }, + }, + "default": [], + }, + "large_batch_record_count": { + "type": "integer", + "title": "Large Batch Record Count", + "description": ( + "Number of records to generate for " + "the large_batch_stream scenario. " + "Set to 0 to skip this stream." + ), + "default": 1000, + }, + "all_fast_streams": { + "type": "boolean", + "title": "All Fast Streams", + "description": ( + "Include all fast (non-high-volume) " "predefined streams." + ), + "default": True, + }, + "all_slow_streams": { + "type": "boolean", + "title": "All Slow Streams", + "description": ( + "Include all slow (high-volume) streams " + "such as large_batch_stream. These are " + "excluded by default to avoid incurring " + "the cost of large record sets." 
+ ), + "default": False, + }, + "scenario_filter": { + "type": "array", + "title": "Scenario Filter", + "description": ( + "Specific scenario names to include. " + "These are unioned with the boolean-driven " + "sets (deduped). If omitted or empty, " + "only the boolean flags control selection." + ), + "items": {"type": "string"}, + "default": [], + }, + }, + }, + ) + + def _get_all_scenarios( + self, + config: Mapping[str, Any], + ) -> list[dict[str, Any]]: + """Combine predefined and custom scenarios. + + Selection logic: + 1. Boolean flags control groups: ``all_fast_streams`` + (default true) enables non-high-volume scenarios, + ``all_slow_streams`` (default false) enables + high-volume scenarios. + 2. ``scenario_filter`` names are unioned with the boolean sets. + 3. Custom scenarios are always included. + 4. The final list is deduplicated by name. + """ + include_default = config.get("all_fast_streams", True) + include_high_volume = config.get("all_slow_streams", False) + scenario_filter: list[str] = config.get("scenario_filter", []) + explicit_names: set[str] = set(scenario_filter) + + large_batch_count = config.get( + "large_batch_record_count", + _DEFAULT_LARGE_BATCH_COUNT, + ) + + scenarios: list[dict[str, Any]] = [] + seen_names: set[str] = set() + + for scenario in PREDEFINED_SCENARIOS: + name = scenario["name"] + is_high_volume = scenario.get("high_volume", False) + + included_by_flag = (include_high_volume and is_high_volume) or ( + include_default and not is_high_volume + ) + if not included_by_flag and name not in explicit_names: + continue + + s = dict(scenario) + if name == "large_batch_stream" and large_batch_count != _DEFAULT_LARGE_BATCH_COUNT: + s["record_count"] = large_batch_count + + if name not in seen_names: + scenarios.append(s) + seen_names.add(name) + + custom = config.get("custom_scenarios", []) + if custom: + for cs in custom: + name = cs.get("name", "") + if not name or not cs.get("json_schema"): + continue + if name not in seen_names: + scenarios.append( + { + "name": name, + "description": cs.get( + "description", + "Custom injected scenario", + ), + "json_schema": cs["json_schema"], + "primary_key": cs.get("primary_key"), + "records": cs.get("records", []), + } + ) + seen_names.add(name) + + return scenarios + + def check( + self, + logger: logging.Logger, + config: Mapping[str, Any], + ) -> AirbyteConnectionStatus: + """Validate the configuration.""" + custom = config.get("custom_scenarios", []) + for i, scenario in enumerate(custom): + if not scenario.get("name"): + return AirbyteConnectionStatus( + status=Status.FAILED, + message=("Custom scenario at index " f"{i} is missing 'name'."), + ) + if not scenario.get("json_schema"): + return AirbyteConnectionStatus( + status=Status.FAILED, + message=( + f"Custom scenario " f"'{scenario['name']}' " f"is missing 'json_schema'." + ), + ) + + scenarios = self._get_all_scenarios(config) + if not scenarios: + return AirbyteConnectionStatus( + status=Status.FAILED, + message=("No scenarios available. 
" "Check scenario_filter config."), + ) + + logger.info("Smoke test source check passed " f"with {len(scenarios)} scenarios.") + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + + def discover( + self, + logger: logging.Logger, + config: Mapping[str, Any], + ) -> AirbyteCatalog: + """Return the catalog with all test scenario streams.""" + scenarios = self._get_all_scenarios(config) + streams = _build_streams_from_scenarios(scenarios) + logger.info(f"Discovered {len(streams)} smoke test streams.") + return AirbyteCatalog(streams=streams) + + def read( + self, + logger: logging.Logger, + config: Mapping[str, Any], + catalog: ConfiguredAirbyteCatalog, + state: list[Any] | None = None, # noqa: ARG002 + ) -> Iterable[AirbyteMessage]: + """Read records from selected smoke test streams.""" + selected_streams = {stream.stream.name for stream in catalog.streams} + scenarios = self._get_all_scenarios(config) + scenario_map = {s["name"]: s for s in scenarios} + now_ms = int(time.time() * 1000) + + for stream_name in selected_streams: + scenario = scenario_map.get(stream_name) + if not scenario: + logger.warning(f"Stream '{stream_name}' not found " f"in scenarios, skipping.") + continue + + records = get_scenario_records(scenario) + logger.info(f"Emitting {len(records)} records " f"for stream '{stream_name}'.") + + for record in records: + yield AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + stream=stream_name, + data=record, + emitted_at=now_ms, + ), + ) diff --git a/airbyte/cli/universal_connector/source.py b/airbyte/cli/universal_connector/source.py new file mode 100644 index 000000000..afd5baeb2 --- /dev/null +++ b/airbyte/cli/universal_connector/source.py @@ -0,0 +1,201 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Universal source implementation using PyAirbyte to wrap any source connector. + +.. warning:: + This module is experimental and subject to change without notice. + The APIs and behavior may be modified or removed in future versions. +""" + +from __future__ import annotations + +import json +import logging +from typing import TYPE_CHECKING, Any + +from airbyte_cdk.models import ( + AirbyteCatalog, + AirbyteConnectionStatus, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConnectorSpecification, + Status, + SyncMode, + Type, +) +from airbyte_cdk.sources.source import Source + +import airbyte as ab +from airbyte.progress import ProgressStyle, ProgressTracker + + +if TYPE_CHECKING: + from collections.abc import Iterable, Mapping + + +logger = logging.getLogger("airbyte") + + +class SourcePyAirbyteUniversal(Source): + """Universal source that wraps any PyAirbyte-supported source connector. + + This source acts as a proxy, using PyAirbyte to instantiate and run + any registered source connector based on the configuration provided. + """ + + def spec(self, logger: logging.Logger) -> ConnectorSpecification: # noqa: ARG002 + """Return the connector specification.""" + return ConnectorSpecification( + documentationUrl="https://docs.airbyte.com/integrations/sources/pyairbyte-universal", + connectionSpecification={ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "PyAirbyte Universal Source Spec", + "type": "object", + "required": ["source_name", "source_config"], + "properties": { + "source_name": { + "type": "string", + "title": "Source Connector Name", + "description": ( + "The name of the source connector to use " + "(e.g., 'source-github', 'source-postgres')." 
+ ), + "examples": ["source-github", "source-postgres", "source-stripe"], + }, + "source_config": { + "type": "object", + "title": "Source Configuration", + "description": "The configuration for the underlying source connector.", + "additionalProperties": True, + }, + "source_version": { + "type": "string", + "title": "Source Version", + "description": "Optional: specific version of the source connector to use.", + "default": "latest", + }, + }, + }, + ) + + def _get_pyairbyte_source( + self, + config: Mapping[str, Any], + ) -> ab.Source: + """Get a PyAirbyte source instance from the configuration.""" + source_name = config.get("source_name") + source_config = config.get("source_config", {}) + source_version = config.get("source_version", "latest") + + if not source_name: + raise ValueError("source_name is required in configuration") + + return ab.get_source( + source_name, + config=source_config, + version=source_version if source_version != "latest" else None, + install_if_missing=True, + ) + + def check( + self, + logger: logging.Logger, # noqa: ARG002 + config: Mapping[str, Any], + ) -> AirbyteConnectionStatus: + """Test the connection to the underlying source connector.""" + try: + source = self._get_pyairbyte_source(config) + source.check() + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + except ValueError as e: + return AirbyteConnectionStatus( + status=Status.FAILED, + message=str(e), + ) + except Exception as e: + return AirbyteConnectionStatus( + status=Status.FAILED, + message=f"Connection check failed: {e!r}", + ) + + def discover( + self, + logger: logging.Logger, # noqa: ARG002 + config: Mapping[str, Any], + ) -> AirbyteCatalog: + """Discover the catalog from the underlying source connector.""" + source = self._get_pyairbyte_source(config) + # Convert PyAirbyte catalog to CDK catalog format + # Serialize to dict with enum values as strings, then construct CDK objects + catalog_json = source.discovered_catalog.model_dump_json() + catalog_dict = json.loads(catalog_json) + + streams = [] + for stream_dict in catalog_dict.get("streams", []): + # Convert sync mode strings to SyncMode enums + sync_modes = [SyncMode(mode) for mode in stream_dict.get("supported_sync_modes", [])] + streams.append( + AirbyteStream( + name=stream_dict["name"], + json_schema=stream_dict.get("json_schema", {}), + supported_sync_modes=sync_modes, + source_defined_cursor=stream_dict.get("source_defined_cursor"), + default_cursor_field=stream_dict.get("default_cursor_field"), + source_defined_primary_key=stream_dict.get("source_defined_primary_key"), + namespace=stream_dict.get("namespace"), + ) + ) + return AirbyteCatalog(streams=streams) + + def read( + self, + logger: logging.Logger, # noqa: ARG002 + config: Mapping[str, Any], + catalog: ConfiguredAirbyteCatalog, + state: list[Any] | None = None, # noqa: ARG002 + ) -> Iterable[AirbyteMessage]: + """Read data from the underlying source connector. + + This method uses PyAirbyte's _read_with_catalog to stream + AirbyteMessages from the configured source connector. 
+ """ + source = self._get_pyairbyte_source(config) + + # Select the streams from the catalog + stream_names = [stream.stream.name for stream in catalog.streams] + source.select_streams(stream_names) + + # Get the configured catalog from PyAirbyte + configured_catalog = source.get_configured_catalog(streams=stream_names) + + # Use _read_with_catalog to get raw AirbyteMessages + progress_tracker = ProgressTracker( + ProgressStyle.PLAIN, + source=source, + cache=None, + destination=None, + expected_streams=stream_names, + ) + + # Convert PyAirbyte messages to CDK messages + # The types differ (airbyte_protocol.models vs airbyte_cdk.models) and enum values + # need to be serialized to strings for the CDK serializer + for message in source._read_with_catalog( # noqa: SLF001 + catalog=configured_catalog, + progress_tracker=progress_tracker, + ): + # Only yield RECORD messages - filter out LOG, TRACE, etc. + # The CDK will handle its own logging + if message.type.name == "RECORD" and message.record: + yield AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + stream=message.record.stream, + data=message.record.data, + emitted_at=message.record.emitted_at, + namespace=message.record.namespace, + ), + ) + # TODO: Add STATE message handling for incremental sync support. + # https://github.com/airbytehq/PyAirbyte/issues/987 diff --git a/pyproject.toml b/pyproject.toml index 4b6fa4e16..be1c4de4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,6 +80,9 @@ dev = [ pyairbyte = "airbyte.cli:cli" pyab = "airbyte.cli:cli" airbyte-mcp = "airbyte.mcp.server:main" +destination-pyairbyte = "airbyte.cli.universal_connector.run:run" +source-pyairbyte = "airbyte.cli.universal_connector.run_source:run" +source-smoke-test = "airbyte.cli.universal_connector.run_smoke_test:run" [build-system] requires = ["hatchling", "uv-dynamic-versioning"]