
feat: readback introspection for non-database destinations via paired sources #1004

@devin-ai-integration

Description


Context

PR #1000 added readback introspection for SQL-based destinations by leveraging PyAirbyte's existing cache implementations (Postgres, Snowflake, BigQuery, DuckDB, MotherDuck). This covers ~90-95% of destination usage by volume but leaves non-database destinations without any readback capability.

For destinations that don't support a SQL interface — or where building a full cache implementation isn't justified — we need an alternative approach for e2e smoke test readback.

Proposal: Paired Source Readback

Instead of mapping destination configs to cache configs, we map destination configs to source configs. After writing data to the destination, we use the paired source to read it back and compare.

Two cases to handle

Case 1: An existing Airbyte source is available

Some destinations have a natural source counterpart already published in the Airbyte connector catalog:

| Destination | Paired Source | Notes |
| --- | --- | --- |
| destination-s3 | source-s3 | Config mapping: bucket, region, credentials |
| destination-gcs | source-gcs | Config mapping: bucket, credentials |
| destination-azure-blob-storage | source-azure-blob-storage | Config mapping: account, container, credentials |
| destination-elasticsearch | source-elasticsearch | Config mapping: endpoint, auth |
| destination-mongodb | source-mongodb | Config mapping: connection string, database |

For these, we need:

  • A config translation layer (similar to _dest_to_cache.py) that maps destination config fields to the corresponding source config fields
  • A paired_source_name registry that maps destination names to source names
  • Logic to run the paired source after the write, targeting the same namespace/prefix, and compare output
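The "run the paired source and compare" step ultimately reduces to a pure comparison over records. A minimal sketch (the `compare_readback` helper is hypothetical; plain dicts stand in for whatever record shape the read result yields):

```python
from typing import Any


def compare_readback(
    written: dict[str, list[dict[str, Any]]],
    read_back: dict[str, list[dict[str, Any]]],
) -> list[str]:
    """Compare per-stream record counts and field names; return mismatch messages."""
    problems: list[str] = []
    for stream, expected in written.items():
        actual = read_back.get(stream, [])
        if len(actual) != len(expected):
            problems.append(
                f"{stream}: wrote {len(expected)} records, read back {len(actual)}"
            )
        # Field-name comparison: every field we wrote should come back.
        expected_fields = {key for record in expected for key in record}
        actual_fields = {key for record in actual for key in record}
        if missing := expected_fields - actual_fields:
            problems.append(f"{stream}: fields missing after readback: {sorted(missing)}")
    return problems
```

An empty return value means the readback matched on these two dimensions; deeper value-level checks could be layered on top.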

Case 2: No existing source — use a declarative YAML source definition

Some destinations (especially vector DBs) have no published source connector:

| Destination | Status | Approach |
| --- | --- | --- |
| destination-pinecone | No source exists | Declarative YAML source |
| destination-weaviate | No source exists | Declarative YAML source |
| destination-milvus | No source exists | Declarative YAML source |
| destination-qdrant | No source exists | Declarative YAML source |
| destination-chroma | No source exists | Declarative YAML source |
| destination-typesense | No source exists | Declarative YAML source |

For these, we create minimal YAML source definitions built on the Declarative CDK, living within the PyAirbyte repo (e.g. under airbyte/_sources/yaml/). These YAML files:

  • Are not published to the connector registry — they're internal to PyAirbyte
  • Define just enough to read back the data that was written (list records, basic pagination)
  • Are paired to destination names via a mapping dict

Proposed Architecture

```
Destination.get_readback_source(schema_name=...) -> Source
    |
    ├── Case A: SQL cache exists → use existing cache-based readback (PR #1000)
    ├── Case B: Paired published source exists → translate config, return Source
    └── Case C: Paired YAML source exists → load YAML, translate config, return Source
```
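The dispatch above can be sketched as a simple ordered lookup (a sketch only; the set arguments and return labels are illustrative, not existing PyAirbyte names):

```python
def choose_readback_path(
    destination_name: str,
    sql_cache_destinations: set[str],
    published_pairs: set[str],
    yaml_pairs: set[str],
) -> str:
    """Pick which readback path applies for a destination.

    Order matters: the SQL-cache path (PR #1000) wins when available,
    since it gives the richest introspection.
    """
    if destination_name in sql_cache_destinations:
        return "cache"  # Case A
    if destination_name in published_pairs:
        return "published-source"  # Case B
    if destination_name in yaml_pairs:
        return "yaml-source"  # Case C
    raise ValueError(f"No readback path for destination: {destination_name}")
```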

Config translation

Similar to the existing *_destination_to_cache() functions in caches/_utils/_dest_to_cache.py, we need *_destination_to_source() translation functions. Example for S3:

```python
def s3_destination_to_source(
    destination_configuration: dict[str, Any],
) -> dict[str, Any]:
    """Translate destination-s3 config to source-s3 config."""
    return {
        "bucket": destination_configuration["s3_bucket_name"],
        "aws_access_key_id": destination_configuration.get("access_key_id"),
        "aws_secret_access_key": destination_configuration.get("secret_access_key"),
        "region_name": destination_configuration.get("s3_bucket_region", "us-east-1"),
        "path_prefix": destination_configuration.get("s3_bucket_path", ""),
        # Source needs to know the output format the destination used
        "format": _infer_source_format(destination_configuration),
    }
```

YAML source definition (example for Pinecone)

A minimal declarative YAML source that can list vectors from a Pinecone index:

```yaml
# airbyte/_sources/yaml/source-pinecone-readback.yaml
version: "0.1.0"
type: DeclarativeSource
streams:
  - type: DeclarativeStream
    name: "vectors"
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://{{ config.index_host }}"
        path: "/query"
        http_method: POST
        authenticator:
          type: ApiKeyAuthenticator
          api_key: "{{ config.api_key }}"
          inject_into:
            type: RequestOption
            inject_into: header
            field_name: "Api-Key"
```
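The matching `pinecone_destination_to_source` translator referenced by the registry could be as simple as the following sketch. Field names on both sides are assumptions (the destination side loosely follows destination-pinecone's nested `indexing` block; the source side is whatever the YAML expects), and the index host may need to be resolved via Pinecone's describe-index API rather than copied from the config:

```python
from typing import Any


def pinecone_destination_to_source(
    destination_configuration: dict[str, Any],
) -> dict[str, Any]:
    """Translate destination-pinecone config into the readback YAML source's config.

    Sketch only: field names are assumptions, not taken from the published spec.
    """
    indexing = destination_configuration.get("indexing", {})
    return {
        "api_key": indexing.get("pinecone_key"),
        "index": indexing.get("index"),
        # Assumption: the caller resolves the index host separately if absent.
        "index_host": indexing.get("index_host"),
    }
```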

Registry

```python
# Mapping of destination names to paired source info
_DESTINATION_TO_SOURCE_INFO: dict[str, SourceInfo] = {
    "s3": SourceInfo(source_name="source-s3", config_translator=s3_destination_to_source),
    "gcs": SourceInfo(source_name="source-gcs", config_translator=gcs_destination_to_source),
    "pinecone": SourceInfo(
        yaml_path="airbyte/_sources/yaml/source-pinecone-readback.yaml",
        config_translator=pinecone_destination_to_source,
    ),
    # ...
}
```

Readback comparison

For non-SQL destinations, the readback comparison is different from the SQL case:

  • We can't query row counts or column types via SQL
  • Instead, we read all records back through the paired source and compare:
    • Record count per stream
    • Field names present in returned records
    • Null/non-null field counts (same stat as the SQL path, computed in Python from the records)
  • The result model (TableStatistics / ColumnStatistics) can be reused — the stats are the same, just computed differently
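Computed in Python, the per-column stat could look like this sketch (`column_stats` is a hypothetical helper; the mapping of its output onto the PR #1000 result models is assumed, not specified here):

```python
from typing import Any


def column_stats(records: list[dict[str, Any]]) -> dict[str, dict[str, int]]:
    """Compute per-field null/non-null counts from read-back records.

    Mirrors the stats the SQL path gets from the warehouse, but computed
    in Python over the paired source's output.
    """
    fields = {key for record in records for key in record}
    stats: dict[str, dict[str, int]] = {}
    for field in fields:
        # A field absent from a record counts as null, same as a stored NULL.
        non_null = sum(1 for record in records if record.get(field) is not None)
        stats[field] = {
            "non_null_count": non_null,
            "null_count": len(records) - non_null,
        }
    return stats
```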

Implementation phases

  1. Phase 1: Framework — Destination.get_readback_source(), config translation interface, source info registry
  2. Phase 2: Published source pairs — S3, GCS, Azure Blob, Elasticsearch, MongoDB
  3. Phase 3: YAML source definitions — Pinecone, Weaviate, Milvus, etc.
  4. Phase 4: Integration into run_destination_smoke_test() alongside the existing cache-based path

Related
