diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8b659fdf0..15ae64644 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -2,6 +2,14 @@ Thank you for your interest in contributing to PyAirbyte! +## Docstring Style + +Use **Markdown** formatting in all docstrings — not reStructuredText (reST). + +- Use single backticks for inline code: `` `MyClass` ``, not double backticks (` ``MyClass`` `). +- Reference methods as `` `get_column_info()` ``, not `` :meth:`get_column_info` ``. +- Use standard Markdown for emphasis, lists, and links. + ## 🚀 Releasing This project uses [`semantic-pr-release-drafter`](https://github.com/aaronsteers/semantic-pr-release-drafter) for automated release management. To release, simply click "`Edit`" on the latest release draft from the [releases page](https://github.com/airbytehq/PyAirbyte/releases), and then click "`Publish release`". This publish operation will trigger all necessary downstream publish operations. diff --git a/airbyte/_util/destination_smoke_tests.py b/airbyte/_util/destination_smoke_tests.py index 67c87adbb..8f6e6dffe 100644 --- a/airbyte/_util/destination_smoke_tests.py +++ b/airbyte/_util/destination_smoke_tests.py @@ -7,11 +7,17 @@ Smoke tests send synthetic data from the built-in smoke test source to a destination connector and report whether the destination accepted the data -without errors. No readback or comparison is performed. +without errors. + +When the destination has a compatible cache implementation, readback +introspection is automatically performed to produce stats on the written +data: table row counts, column names/types, and per-column null/non-null +counts. 
""" from __future__ import annotations +import logging import time from datetime import datetime, timezone from pathlib import Path @@ -22,12 +28,16 @@ from airbyte import get_source from airbyte.exceptions import PyAirbyteInputError +from airbyte.shared.sql_processor import TableStatistics # noqa: TC001 # Pydantic needs at runtime + + +logger = logging.getLogger(__name__) NAMESPACE_PREFIX = "zz_deleteme" """Prefix for auto-generated smoke test namespaces. -The ``zz_`` prefix sorts last alphabetically; ``deleteme`` signals the +The `zz_` prefix sorts last alphabetically; `deleteme` signals the namespace is safe for automated cleanup. """ @@ -46,11 +56,11 @@ def generate_namespace( ) -> str: """Generate a smoke-test namespace. - Format: ``zz_deleteme_yyyymmdd_hhmm_``. - The ``zz_`` prefix sorts last alphabetically and the ``deleteme`` + Format: `zz_deleteme_yyyymmdd_hhmm_`. + The `zz_` prefix sorts last alphabetically and the `deleteme` token acts as a guard for automated cleanup scripts. - If *namespace_suffix* is not provided, ``smoke_test`` is used as the + If `namespace_suffix` is not provided, `smoke_test` is used as the default suffix. """ suffix = namespace_suffix or DEFAULT_NAMESPACE_SUFFIX @@ -59,6 +69,11 @@ def generate_namespace( return f"{NAMESPACE_PREFIX}_{ts}_{suffix}" +# --------------------------------------------------------------------------- +# Smoke test result model +# --------------------------------------------------------------------------- + + class DestinationSmokeTestResult(BaseModel): """Result of a destination smoke test run.""" @@ -83,6 +98,22 @@ class DestinationSmokeTestResult(BaseModel): error: str | None = None """Error message if the smoke test failed.""" + table_statistics: dict[str, TableStatistics] | None = None + """Map of stream name to table statistics (row counts, columns, stats). + + Populated when the destination has a compatible cache, regardless of + write success (to support partial-success inspection). 
`None` when + the destination does not have a compatible cache. + """ + + tables_not_found: dict[str, str] | None = None + """Stream names whose expected tables were not found in the destination. + + Maps stream name to the expected (normalized) table name that was + looked up but not found. Populated alongside `table_statistics`. + `None` when readback was not performed. + """ + def get_smoke_test_source( *, @@ -183,6 +214,41 @@ def get_smoke_test_source( ) +def _prepare_destination_config( + destination: Destination, +) -> None: + """Prepare the destination config for smoke testing. + + The catalog namespace (set on each stream by the source) is the primary + mechanism that directs destinations to write into the test schema. + Modern destinations respect the catalog namespace without needing a + config-level schema override. + + This function applies config-level tweaks that are *not* handled by + the catalog namespace: + + - **Typing/deduplication enabled** — forces `disable_type_dedupe` + to `False` so the destination creates final typed tables (not just + raw staging). Readback introspection requires final tables. + + Note: This mutates the destination's config in place. + """ + config = dict(destination.get_config()) + changed = False + + # Ensure typing and deduplication are enabled so that final tables + # (not just raw staging) are created for readback inspection. + if config.get("disable_type_dedupe"): + logger.info( + "Forcing 'disable_type_dedupe' to False so final tables are created.", + ) + config["disable_type_dedupe"] = False + changed = True + + if changed: + destination.set_config(config, validate=False) + + def _sanitize_error(ex: Exception) -> str: """Extract an error message from an exception without leaking secrets. @@ -208,9 +274,12 @@ def run_destination_smoke_test( Sends synthetic test data from the smoke test source to the specified destination and returns a structured result. 
- This function does NOT read back data from the destination or compare - results. It only verifies that the destination accepts the data without - errors. + When the destination has a compatible cache implementation, readback + introspection is automatically performed (even on write failure, to + support partial-success inspection). + The readback produces stats on the written data (table row counts, + column names/types, and per-column null/non-null counts) and is + included in the result as `table_statistics` and `tables_not_found`. `destination` is a resolved `Destination` object ready for writing. @@ -221,8 +290,8 @@ def run_destination_smoke_test( - A comma-separated string or list of specific scenario names. `namespace_suffix` is an optional suffix appended to the auto-generated - namespace. Defaults to ``smoke_test`` when not provided - (e.g. ``zz_deleteme_20260318_2256_smoke_test``). + namespace. Defaults to `smoke_test` when not provided + (e.g. `zz_deleteme_20260318_2256_smoke_test`). `reuse_namespace` is an exact namespace string to reuse from a previous run. When set, no new namespace is generated. @@ -244,12 +313,21 @@ def run_destination_smoke_test( custom_scenarios_file=custom_scenarios_file, ) + # Capture stream names for readback before the write consumes the source + stream_names = source_obj.get_selected_streams() + # Normalize scenarios to a display string if isinstance(scenarios, list): scenarios_str = ",".join(scenarios) if scenarios else "fast" else: scenarios_str = scenarios + # Prepare the destination config for smoke testing (e.g. ensure + # disable_type_dedupe is off so final tables are created for readback). + # The catalog namespace on each stream is the primary mechanism that + # directs the destination to write into the test schema. 
+ _prepare_destination_config(destination) + start_time = time.monotonic() success = False error_message: str | None = None @@ -267,6 +345,32 @@ def run_destination_smoke_test( elapsed = time.monotonic() - start_time + # Perform readback introspection (runs even on write failure for partial-success support) + table_statistics: dict[str, TableStatistics] | None = None + tables_not_found: dict[str, str] | None = None + if destination.is_cache_supported: + try: + cache = destination.get_sql_cache(schema_name=namespace) + table_statistics = cache.fetch_table_statistics(stream_names) + tables_not_found = { + name: cache.processor.get_sql_table_name(name) + for name in stream_names + if name not in table_statistics + } + except Exception: + logger.warning( + "Readback failed for destination '%s'.", + destination.name, + exc_info=True, + ) + else: + logger.info( + "Skipping table and column statistics retrieval for " + "destination '%s' because no SQL interface mapping has been " + "defined.", + destination.name, + ) + return DestinationSmokeTestResult( success=success, destination=destination.name, @@ -275,4 +379,6 @@ def run_destination_smoke_test( scenarios_requested=scenarios_str, elapsed_seconds=round(elapsed, 2), error=error_message, + table_statistics=table_statistics, + tables_not_found=tables_not_found, ) diff --git a/airbyte/caches/_utils/__init__.py b/airbyte/caches/_utils/__init__.py new file mode 100644 index 000000000..1022a6f22 --- /dev/null +++ b/airbyte/caches/_utils/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+"""Cache utility modules for translating between cache and destination configurations.""" diff --git a/airbyte/destinations/_translate_cache_to_dest.py b/airbyte/caches/_utils/_cache_to_dest.py similarity index 100% rename from airbyte/destinations/_translate_cache_to_dest.py rename to airbyte/caches/_utils/_cache_to_dest.py diff --git a/airbyte/destinations/_translate_dest_to_cache.py b/airbyte/caches/_utils/_dest_to_cache.py similarity index 56% rename from airbyte/destinations/_translate_dest_to_cache.py rename to airbyte/caches/_utils/_dest_to_cache.py index 8f7055708..a5b4700dc 100644 --- a/airbyte/destinations/_translate_dest_to_cache.py +++ b/airbyte/caches/_utils/_dest_to_cache.py @@ -3,6 +3,9 @@ from __future__ import annotations +import os +import tempfile +from pathlib import Path from typing import TYPE_CHECKING, Any from airbyte_api.models import ( @@ -33,8 +36,24 @@ SNOWFLAKE_PASSWORD_SECRET_NAME = "SNOWFLAKE_PASSWORD" +_SUPPORTED_DESTINATION_TYPES: set[str] = { + "bigquery", + "duckdb", + "motherduck", + "postgres", + "snowflake", +} + + +def get_supported_destination_types() -> set[str]: + """Return the set of destination type identifiers that have cache support.""" + return _SUPPORTED_DESTINATION_TYPES + + def destination_to_cache( destination_configuration: api_util.DestinationConfiguration | dict[str, Any], + *, + schema_name: str | None = None, ) -> CacheBase: """Get the destination configuration from the cache.""" conversion_fn_map: dict[str, Callable[[Any], CacheBase]] = { @@ -71,7 +90,14 @@ def destination_to_cache( ) conversion_fn = conversion_fn_map[destination_type] - return conversion_fn(destination_configuration) + cache = conversion_fn(destination_configuration) + if schema_name is not None: + cache.schema_name = schema_name + # Force engine re-creation so the schema_translate_map picks up + # the overridden schema_name (the engine is lazily cached during + # __init__ with the original schema from the destination config). 
+ cache.processor.sql_config.dispose_engine() + return cache def bigquery_destination_to_cache( @@ -81,10 +107,38 @@ def bigquery_destination_to_cache( We may have to inject credentials, because they are obfuscated when config is returned from the REST API. + + When the destination config contains a plaintext `credentials_json` field + (the local `Destination.get_sql_cache()` path), the JSON is written to a + temporary file and used directly. Otherwise we fall back to the + `BIGQUERY_CREDENTIALS_PATH` secret/env-var (the cloud API path). """ - credentials_path = get_secret("BIGQUERY_CREDENTIALS_PATH") + # Extract credentials_json before converting to the Pydantic model, + # because DestinationBigquery may strip or obfuscate the field. + raw_credentials_json: str | None = None if isinstance(destination_configuration, dict): - destination_configuration = DestinationBigquery(**destination_configuration) + raw_credentials_json = destination_configuration.get("credentials_json") + filtered = { + k: v + for k, v in destination_configuration.items() + if k not in {"destinationType", "DESTINATION_TYPE"} + } + destination_configuration = DestinationBigquery(**filtered) + elif hasattr(destination_configuration, "credentials_json"): + raw_credentials_json = destination_configuration.credentials_json + + if raw_credentials_json and "****" not in raw_credentials_json: + # Plaintext credentials from a local destination config — write to + # a temp file so BigQueryCache can use it as a credentials path. + # The file is intentionally *not* deleted on close because the + # cache needs the path to remain valid after this function returns. + tmp_fd, tmp_path = tempfile.mkstemp(suffix=".json", prefix="bq_creds_") + Path(tmp_path).write_text(raw_credentials_json, encoding="utf-8") + # Close the file descriptor opened by mkstemp (write_text uses its own). 
+ os.close(tmp_fd) + credentials_path = tmp_path + else: + credentials_path = get_secret("BIGQUERY_CREDENTIALS_PATH") return BigQueryCache( project_name=destination_configuration.project_id, @@ -95,19 +149,47 @@ def bigquery_destination_to_cache( def duckdb_destination_to_cache( - destination_configuration: DestinationDuckdb, + destination_configuration: DestinationDuckdb | dict[str, Any], ) -> DuckDBCache: """Create a new DuckDB cache from the destination configuration.""" + if isinstance(destination_configuration, dict): + filtered = { + k: v + for k, v in destination_configuration.items() + if k not in {"destinationType", "DESTINATION_TYPE"} + } + destination_configuration = DestinationDuckdb(**filtered) + + db_path = destination_configuration.destination_path + + # The DuckDB destination Docker container mounts a host directory to + # `/local` inside the container. Paths written as `/local/foo.duckdb` + # actually live at `/destination-duckdb/foo.duckdb` on the + # host. Resolve the host-side path so the cache can open the file. 
+ if db_path.startswith(("/local/", "/local\\")): + from airbyte.constants import DEFAULT_PROJECT_DIR # noqa: PLC0415 + + host_path = str(DEFAULT_PROJECT_DIR / "destination-duckdb" / db_path[len("/local/") :]) + db_path = host_path + return DuckDBCache( - db_path=destination_configuration.destination_path, + db_path=db_path, schema_name=destination_configuration.schema or "main", ) def motherduck_destination_to_cache( - destination_configuration: DestinationDuckdb, + destination_configuration: DestinationDuckdb | dict[str, Any], ) -> MotherDuckCache: - """Create a new DuckDB cache from the destination configuration.""" + """Create a new MotherDuck cache from the destination configuration.""" + if isinstance(destination_configuration, dict): + filtered = { + k: v + for k, v in destination_configuration.items() + if k not in {"destinationType", "DESTINATION_TYPE"} + } + destination_configuration = DestinationDuckdb(**filtered) + if not destination_configuration.motherduck_api_key: raise ValueError("MotherDuck API key is required for MotherDuck cache.") @@ -119,9 +201,18 @@ def motherduck_destination_to_cache( def postgres_destination_to_cache( - destination_configuration: DestinationPostgres, + destination_configuration: DestinationPostgres | dict[str, Any], ) -> PostgresCache: """Create a new Postgres cache from the destination configuration.""" + if isinstance(destination_configuration, dict): + # Strip dispatch keys before constructing the model object. + filtered = { + k: v + for k, v in destination_configuration.items() + if k not in {"destinationType", "DESTINATION_TYPE"} + } + destination_configuration = DestinationPostgres(**filtered) + port: int = int(destination_configuration.port) if destination_configuration.port else 5432 if not destination_configuration.password: raise ValueError("Password is required for Postgres cache.") @@ -146,7 +237,12 @@ def snowflake_destination_to_cache( is returned from the REST API. 
""" if isinstance(destination_configuration, dict): - destination_configuration = DestinationSnowflake(**destination_configuration) + filtered = { + k: v + for k, v in destination_configuration.items() + if k not in {"destinationType", "DESTINATION_TYPE"} + } + destination_configuration = DestinationSnowflake(**filtered) snowflake_password: str | None = None if ( @@ -163,7 +259,10 @@ def snowflake_destination_to_cache( "Password is required for Snowflake cache, but it was not available." ) from ex else: - snowflake_password = get_secret(destination_password) + # The password is a plaintext value (e.g. from a local + # Destination's hydrated config). Use it directly instead + # of treating it as a secret name to look up. + snowflake_password = destination_password else: snowflake_password = get_secret(password_secret_name) diff --git a/airbyte/caches/base.py b/airbyte/caches/base.py index bb585184a..2683577d4 100644 --- a/airbyte/caches/base.py +++ b/airbyte/caches/base.py @@ -24,7 +24,7 @@ from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP from airbyte.datasets._sql import CachedDataset from airbyte.shared.catalog_providers import CatalogProvider -from airbyte.shared.sql_processor import SqlConfig +from airbyte.shared.sql_processor import SqlConfig, TableStatistics from airbyte.shared.state_writers import StdOutStateWriter @@ -439,6 +439,21 @@ def __iter__( # type: ignore [override] # Overriding Pydantic model method """Iterate over the streams in the cache.""" return ((name, dataset) for name, dataset in self.streams.items()) + def fetch_table_statistics( + self, + stream_names: list[str], + ) -> dict[str, TableStatistics]: + """Return table statistics for the given stream names. + + Delegates to `self.processor.fetch_table_statistics()` which queries + row counts, column info, and per-column null/non-null stats for each + stream. + + Returns a dict mapping stream name to a `TableStatistics` instance. 
+ Streams whose tables are not found are omitted from the result. + """ + return self.processor.fetch_table_statistics(stream_names) + def _write_airbyte_message_stream( self, stdin: IO[str] | AirbyteMessageIterator, diff --git a/airbyte/caches/bigquery.py b/airbyte/caches/bigquery.py index a6aaf71e1..fe5596f9f 100644 --- a/airbyte/caches/bigquery.py +++ b/airbyte/caches/bigquery.py @@ -22,13 +22,13 @@ from airbyte_api.models import DestinationBigquery from airbyte._processors.sql.bigquery import BigQueryConfig, BigQuerySqlProcessor +from airbyte.caches._utils._cache_to_dest import ( + bigquery_cache_to_destination_configuration, +) from airbyte.caches.base import ( CacheBase, ) from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE -from airbyte.destinations._translate_cache_to_dest import ( - bigquery_cache_to_destination_configuration, -) if TYPE_CHECKING: diff --git a/airbyte/caches/duckdb.py b/airbyte/caches/duckdb.py index 3f720e309..c27a661fa 100644 --- a/airbyte/caches/duckdb.py +++ b/airbyte/caches/duckdb.py @@ -23,8 +23,8 @@ from duckdb_engine import DuckDBEngineWarning from airbyte._processors.sql.duckdb import DuckDBConfig, DuckDBSqlProcessor +from airbyte.caches._utils._cache_to_dest import duckdb_cache_to_destination_configuration from airbyte.caches.base import CacheBase -from airbyte.destinations._translate_cache_to_dest import duckdb_cache_to_destination_configuration if TYPE_CHECKING: diff --git a/airbyte/caches/motherduck.py b/airbyte/caches/motherduck.py index 3fcff8e31..0cb6e70bb 100644 --- a/airbyte/caches/motherduck.py +++ b/airbyte/caches/motherduck.py @@ -26,10 +26,10 @@ from airbyte._processors.sql.duckdb import DuckDBConfig from airbyte._processors.sql.motherduck import MotherDuckSqlProcessor -from airbyte.caches.duckdb import DuckDBCache -from airbyte.destinations._translate_cache_to_dest import ( +from airbyte.caches._utils._cache_to_dest import ( motherduck_cache_to_destination_configuration, ) +from airbyte.caches.duckdb import 
DuckDBCache from airbyte.secrets import SecretString @@ -73,7 +73,7 @@ class MotherDuckCache(MotherDuckConfig, DuckDBCache): _sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor - paired_destination_name: ClassVar[str | None] = "destination-bigquery" + paired_destination_name: ClassVar[str | None] = "destination-motherduck" paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb @property diff --git a/airbyte/caches/postgres.py b/airbyte/caches/postgres.py index db66bbc7d..678baad8a 100644 --- a/airbyte/caches/postgres.py +++ b/airbyte/caches/postgres.py @@ -24,10 +24,10 @@ from airbyte_api.models import DestinationPostgres from airbyte._processors.sql.postgres import PostgresConfig, PostgresSqlProcessor -from airbyte.caches.base import CacheBase -from airbyte.destinations._translate_cache_to_dest import ( +from airbyte.caches._utils._cache_to_dest import ( postgres_cache_to_destination_configuration, ) +from airbyte.caches.base import CacheBase if TYPE_CHECKING: @@ -42,7 +42,7 @@ class PostgresCache(PostgresConfig, CacheBase): _sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor - paired_destination_name: ClassVar[str | None] = "destination-bigquery" + paired_destination_name: ClassVar[str | None] = "destination-postgres" paired_destination_config_class: ClassVar[type | None] = DestinationPostgres @property diff --git a/airbyte/caches/snowflake.py b/airbyte/caches/snowflake.py index 2bf5485cb..d2d9e1f4a 100644 --- a/airbyte/caches/snowflake.py +++ b/airbyte/caches/snowflake.py @@ -64,10 +64,10 @@ from airbyte_api.models import DestinationSnowflake from airbyte._processors.sql.snowflake import SnowflakeConfig, SnowflakeSqlProcessor -from airbyte.caches.base import CacheBase -from airbyte.destinations._translate_cache_to_dest import ( +from airbyte.caches._utils._cache_to_dest import ( snowflake_cache_to_destination_configuration, ) +from airbyte.caches.base import CacheBase from 
airbyte.shared.sql_processor import RecordDedupeMode, SqlProcessorBase @@ -78,7 +78,7 @@ class SnowflakeCache(SnowflakeConfig, CacheBase): _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor - paired_destination_name: ClassVar[str | None] = "destination-bigquery" + paired_destination_name: ClassVar[str | None] = "destination-snowflake" paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake @property diff --git a/airbyte/cli/pyab.py b/airbyte/cli/pyab.py index f71ca5b5a..f179ff307 100644 --- a/airbyte/cli/pyab.py +++ b/airbyte/cli/pyab.py @@ -729,9 +729,11 @@ def destination_smoke_test( failure patterns: type variations, null handling, naming edge cases, schema variations, and batch sizes. - This command does NOT read back data from the destination or compare - results. It only verifies that the destination accepts the data without - errors. + When the destination has a compatible cache implementation (DuckDB, + Postgres, Snowflake, BigQuery, MotherDuck), readback introspection + is automatically performed after the write, even if it fails. The readback + produces stats on the written data: table row counts, column + names/types, and per-column null/non-null counts. 
Usage examples: diff --git a/airbyte/cloud/sync_results.py b/airbyte/cloud/sync_results.py index c59ae2012..8920b9615 100644 --- a/airbyte/cloud/sync_results.py +++ b/airbyte/cloud/sync_results.py @@ -110,9 +110,9 @@ from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse from airbyte._util import api_util +from airbyte.caches._utils._dest_to_cache import destination_to_cache from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES from airbyte.datasets import CachedDataset -from airbyte.destinations._translate_dest_to_cache import destination_to_cache from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index ce391e109..0c540ffd7 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -14,6 +14,10 @@ from airbyte._message_iterators import AirbyteMessageIterator from airbyte._util.temp_files import as_temp_files from airbyte._writers.base import AirbyteWriterInterface +from airbyte.caches._utils._dest_to_cache import ( + destination_to_cache, + get_supported_destination_types, +) from airbyte.caches.util import get_default_cache from airbyte.progress import ProgressTracker from airbyte.results import ReadResult, WriteResult @@ -35,6 +39,9 @@ from airbyte.shared.state_writers import StateWriterBase +_CANONICAL_PREFIX = "destination-" + + class Destination(ConnectorBase, AirbyteWriterInterface): """A class representing a destination that can be called.""" @@ -61,6 +68,60 @@ def __init__( validate=validate, ) + @staticmethod + def _normalize_destination_name(name: str) -> str: + """Normalize a destination name to canonical form (`destination-`). + + Accepts either the short form (e.g. `snowflake`) or the canonical + form (e.g. `destination-snowflake`). 
+ """ + if not name.startswith(_CANONICAL_PREFIX): + return f"{_CANONICAL_PREFIX}{name}" + return name + + @property + def is_cache_supported(self) -> bool: + """Whether this destination has a compatible cache implementation. + + Returns `True` when `get_sql_cache()` is expected to succeed for + the destination's connector type. + """ + dest_type = self._normalize_destination_name( + self.name, + ).replace(_CANONICAL_PREFIX, "") + return dest_type in get_supported_destination_types() + + def get_sql_cache( + self, + *, + schema_name: str | None = None, + ) -> CacheBase: + """Return a SQL Cache for querying data written by this destination. + + This follows the same pattern as + `SyncResult.get_sql_cache()` in `airbyte.cloud.sync_results`: + it builds a cache from the destination's configuration using + `destination_to_cache()`. + + Args: + schema_name: Override the schema/namespace on the returned cache. + When `None` the cache uses the default schema from the + destination config. + + Raises: + ValueError: If the destination type is not supported. + """ + resolved_name = self._normalize_destination_name(self.name) + config = dict(self._hydrated_config) + + # Ensure the config carries a destinationType key so that + # destination_to_cache() can dispatch correctly. + if "destinationType" not in config and "DESTINATION_TYPE" not in config: + dest_type = resolved_name.replace(_CANONICAL_PREFIX, "") + config["destinationType"] = dest_type + + return destination_to_cache(config, schema_name=schema_name) + def write( # noqa: PLR0912, PLR0915 # Too many arguments/statements self, source_data: Source | ReadResult, diff --git a/airbyte/mcp/local.py b/airbyte/mcp/local.py index 7df085365..06631f55f 100644 --- a/airbyte/mcp/local.py +++ b/airbyte/mcp/local.py @@ -910,8 +910,12 @@ def destination_smoke_test( # noqa: PLR0913, PLR0917 type variations, null handling, naming edge cases, schema variations, and batch sizes. 
- This tool does NOT read back data from the destination or compare results. - It only verifies that the destination accepts the data without errors. + When the destination has a compatible cache implementation (DuckDB, + Postgres, Snowflake, BigQuery, MotherDuck), readback introspection is + automatically performed after the write, even if it fails. The readback produces + stats on the written data: table row counts, column names/types, and + per-column null/non-null counts. Results are included in the response + as `table_statistics` and `tables_not_found`. """ # Resolve destination config config_dict = resolve_connector_config( diff --git a/airbyte/shared/sql_processor.py b/airbyte/shared/sql_processor.py index 1663ad23d..add3e6592 100644 --- a/airbyte/shared/sql_processor.py +++ b/airbyte/shared/sql_processor.py @@ -84,6 +84,44 @@ class SQLRuntimeError(Exception): """Raised when an SQL operation fails.""" +class ColumnStatistics(BaseModel): + """Null/non-null statistics for a single column.""" + + column_name: str + """The column name as found in the destination.""" + + column_type: str + """The SQL data type name as reported by the database.""" + + null_count: int | None = None + """Number of NULL values in this column.""" + + non_null_count: int | None = None + """Number of non-NULL values in this column.""" + + total_count: int | None = None + """Total row count (null_count + non_null_count).""" + + +class TableStatistics(BaseModel): + """Statistics for a single table: row count, column info, and per-column stats.""" + + table_name: str + """The table name as found in the destination.""" + + database_name: str | None = None + """The database name where this table resides.""" + + schema_name: str | None = None + """The schema name where this table resides.""" + + row_count: int | None = None + """Number of rows found.""" + + column_statistics: list[ColumnStatistics] + """Per-column names, types, and null/non-null statistics.""" + + class SqlConfig(BaseModel, abc.ABC): 
"""Common configuration for SQL connections.""" @@ -1012,10 +1050,10 @@ def _append_temp_table_to_final_table( self._execute_sql( f""" INSERT INTO {self._fully_qualified(final_table_name)} ( - {f',{nl} '.join(columns)} + {f",{nl} ".join(columns)} ) SELECT - {f',{nl} '.join(columns)} + {f",{nl} ".join(columns)} FROM {self._fully_qualified(temp_table_name)} """, ) @@ -1040,8 +1078,7 @@ def _swap_temp_table_with_final_table( deletion_name = f"{final_table_name}_deleteme" commands = "\n".join( [ - f"ALTER TABLE {self._fully_qualified(final_table_name)} RENAME " - f"TO {deletion_name};", + f"ALTER TABLE {self._fully_qualified(final_table_name)} RENAME TO {deletion_name};", f"ALTER TABLE {self._fully_qualified(temp_table_name)} RENAME " f"TO {final_table_name};", f"DROP TABLE {self._fully_qualified(deletion_name)};", @@ -1081,10 +1118,10 @@ def _merge_temp_table_to_final_table( {set_clause} WHEN NOT MATCHED THEN INSERT ( - {f',{nl} '.join(columns)} + {f",{nl} ".join(columns)} ) VALUES ( - tmp.{f',{nl} tmp.'.join(columns)} + tmp.{f",{nl} tmp.".join(columns)} ); """, ) @@ -1179,3 +1216,179 @@ def _table_exists( Subclasses may override this method to provide a more efficient implementation. """ return table_name in self._get_tables_list() + + # ---- Table introspection helpers ---- + + def fetch_row_count( + self, + table_name: str, + ) -> int: + """Return the number of rows in the given table. + + Raises `SQLRuntimeError` if the table does not exist or the query + fails for any other reason. + """ + sql = f"SELECT COUNT(*) AS row_count FROM {self._fully_qualified(table_name)}" + result = self._execute_sql(sql) + row = result.mappings().fetchone() + if row is None: + return 0 + # Case-insensitive key lookup: Snowflake uppercases unquoted aliases. 
+ row_ci = {k.lower(): v for k, v in row.items()} + return int(row_ci.get("row_count", 0)) + + def fetch_column_info( + self, + table_name: str, + *, + inspector: Inspector | None = None, + ) -> list[dict[str, str]]: + """Return actual column names and types for the given table. + + This method differs from `_get_sql_column_definitions` in that it always + returns actual detected column types from the database. It will never + return previously-cached types or 'expected' types based on the stream + JSON schema. + + Each entry is a dict with `column_name` and `column_type` keys. + + Args: + table_name: The table to inspect. + inspector: An optional pre-created SQLAlchemy `Inspector` to reuse. + When inspecting many tables, passing a shared inspector avoids + creating a new one per call. + + Raises if the table does not exist or is not accessible. + """ + if inspector is None: + inspector = sqlalchemy.inspect(self.get_sql_engine()) + columns = inspector.get_columns(table_name, schema=self.sql_config.schema_name) + return [ + { + "column_name": col["name"], + "column_type": str(col["type"]), + } + for col in columns + ] + + def _get_column_stats( + self, + table_name: str, + columns: list[dict[str, str]], + ) -> list[dict[str, Any]]: + """Return per-column null/non-null counts for the given table. + + `columns` should be a list of dicts with at least a `column_name` + key (as returned by `fetch_column_info()`). + + Returns a list of dicts with `column_name`, `null_count`, + `non_null_count`, and `total_count` keys. + + Positional aliases (`nn_0`, `nn_1`, ...) are used instead of + column-name-derived aliases to avoid issues with databases that + truncate long identifiers (PostgreSQL: 63 chars). 
+ """ + if not columns: + return [] + + count_exprs: list[str] = [] + for idx, col in enumerate(columns): + col_name = col["column_name"] + quoted = self._quote_identifier(col_name) + count_exprs.append(f"COUNT({quoted}) AS nn_{idx}") + + count_exprs_str = ", ".join(count_exprs) + sql = ( + f"SELECT COUNT(*) AS total_rows, {count_exprs_str} " + f"FROM {self._fully_qualified(table_name)}" + ) + + result = self._execute_sql(sql) + row = result.mappings().fetchone() + if row is None: + return [] + + # Case-insensitive key lookup for cross-DB compatibility. + row_ci = {k.lower(): v for k, v in row.items()} + total_rows = int(row_ci.get("total_rows", 0)) + + stats = [] + for idx, col in enumerate(columns): + col_name = col["column_name"] + non_null_key = f"nn_{idx}" + val = row_ci.get(non_null_key) + non_null_count = int(val) if val is not None else 0 + stats.append( + { + "column_name": col_name, + "null_count": total_rows - non_null_count, + "non_null_count": non_null_count, + "total_count": total_rows, + } + ) + + return stats + + def fetch_table_statistics( + self, + stream_names: list[str], + ) -> dict[str, TableStatistics]: + """Return table statistics for the given stream names. + + For each stream, resolves the expected table name via the processor's + normalizer, queries row counts, column info, and per-column null/non-null + stats. + + If the normalized table name is not found, falls back to the original + stream name (some destinations preserve original casing). + + Returns a dict mapping stream name to a `TableStatistics` instance. + Streams whose tables are not found are omitted from the result. + """ + result: dict[str, TableStatistics] = {} + + # Share a single inspector across all tables to avoid repeated creation. + shared_inspector: Inspector = sqlalchemy.inspect(self.get_sql_engine()) + + for stream_name in stream_names: + expected_table = self.get_sql_table_name(stream_name) + + # Try the normalized name first, then fall back to original. 
+ table_name: str | None = None + for candidate in (expected_table, stream_name): + if self._table_exists(candidate): + table_name = candidate + break + + if table_name is None: + continue + + row_count = self.fetch_row_count(table_name) + columns = self.fetch_column_info(table_name, inspector=shared_inspector) + stats = self._get_column_stats(table_name, columns) + + # Merge column info and stats into ColumnStatistics objects. + stats_by_col = {s["column_name"]: s for s in stats} + col_statistics: list[ColumnStatistics] = [] + for col in columns: + col_name = col["column_name"] + col_stat = stats_by_col.get(col_name, {}) + col_statistics.append( + ColumnStatistics( + column_name=col_name, + column_type=col["column_type"], + null_count=col_stat.get("null_count", 0), + non_null_count=col_stat.get("non_null_count", 0), + total_count=col_stat.get("total_count", 0), + ) + ) + + result[stream_name] = TableStatistics( + table_name=table_name, + database_name=self.database_name, + schema_name=self.sql_config.schema_name, + row_count=row_count, + column_statistics=col_statistics, + ) + + return result