diff --git a/.claude/commands/write-integration-tests.md b/.claude/commands/write-integration-tests.md new file mode 100644 index 000000000..352c57124 --- /dev/null +++ b/.claude/commands/write-integration-tests.md @@ -0,0 +1,627 @@ +# Write Integration Tests + +You are helping a developer write integration tests for an Atlan connector using the `BaseIntegrationTest` framework from `application_sdk`. + +## Your Job + +Generate a complete `tests/integration/test_{connector}_integration.py` file, plus `tests/integration/__init__.py` if it doesn't exist. + +## Step 1: Gather Context + +Before writing any tests, read these files to understand the connector: + +1. **`.env`** or **`.env.example`** — find `ATLAN_APPLICATION_NAME` and any `E2E_*` variables already defined +2. **`main.py`** — understand the app structure +3. **`pyproject.toml`** — check the project name for context +4. **Any existing `tests/integration/` files** — avoid duplicating work + +From the `.env`, extract: +- `ATLAN_APPLICATION_NAME` → this is the `APP_NAME` (e.g. `postgres`, `mysql`, `snowflake`) +- Default port for this connector type +- Any connector-specific credential fields (e.g. `sslmode`, `warehouse`, `role`) + +## Step 2: Understand the Framework + +### File Location +``` +tests/ +└── integration/ + ├── __init__.py (empty, just marks it as a package) + └── test_{app_name}_integration.py +``` + +### Import Pattern +```python +from application_sdk.test_utils.integration import ( + BaseIntegrationTest, + Scenario, + contains, + equals, + exists, + is_dict, + is_string, + is_true, + matches, + # add others as needed +) +``` + +### Class Structure +```python +class Test{ConnectorName}Integration(BaseIntegrationTest): + """Integration tests for {ConnectorName} connector. + + Credentials are auto-loaded from E2E_{APP_NAME}_* env vars. + Server URL is auto-discovered from ATLAN_APP_HTTP_HOST/PORT. + Each scenario becomes its own pytest test. 
+ """ + + # Fields merged with auto-discovered env creds for every scenario + default_credentials = { + "authType": "basic", + "type": "all", + # any other always-needed fields + } + + # Used for all preflight/workflow scenarios + default_metadata = { + "exclude-filter": "{}", + "include-filter": '{"^{db}$": ["^{schema}$"]}', + "temp-table-regex": "", + "extraction-method": "direct", + } + + # Used for all workflow scenarios + default_connection = { + "connection_name": "test_connection", + "connection_qualified_name": "default/{app_name}/test_integration", + } + + scenarios = [...] +``` + +### Scenario Fields + +| Field | Required | Description | +|-------|----------|-------------| +| `name` | Yes | Snake_case unique ID. Becomes `test_{name}` in pytest. | +| `api` | Yes | `"auth"`, `"preflight"`, or `"workflow"` | +| `assert_that` | Yes | Dict mapping dot-notation paths to predicates | +| `credentials` | No | Override auto-loaded creds. Use for negative tests only. | +| `metadata` | No | Per-scenario metadata override | +| `connection` | No | Per-scenario connection override | +| `description` | No | Human-readable description shown in output | +| `skip` | No | `True` to skip | +| `skip_reason` | No | Reason string shown when skipped | + +**Key rule:** Only set `credentials` on a Scenario when you want to **override** the auto-loaded ones (e.g. negative tests). For positive tests, omit `credentials` entirely. + +### Credential Auto-Discovery + +The framework reads `ATLAN_APPLICATION_NAME`, then loads all `E2E_{APP_NAME}_*` env vars: +``` +E2E_POSTGRES_USERNAME=postgres → {"username": "postgres"} +E2E_POSTGRES_HOST=localhost → {"host": "localhost"} +E2E_POSTGRES_PORT=5432 → {"port": 5432} ← auto-converted to int +``` +These are merged with `default_credentials` (class-level). `default_credentials` wins on conflicts. 
+ +### API Response Shapes + +**Auth** (`POST /workflows/v1/auth`): +```json +// Success +{"success": true, "message": "Authentication successful"} +// Failure +{"success": false, "error": "...", "details": "..."} +``` + +**Preflight** (`POST /workflows/v1/check`): +```json +// Success +{ + "success": true, + "data": { + "databaseSchemaCheck": {"success": true, "successMessage": "...", "failureMessage": ""}, + "tablesCheck": {"success": true, "successMessage": "Tables check successful. Table count: 42", "failureMessage": ""}, + "versionCheck": {"success": true, "successMessage": "...", "failureMessage": ""} + } +} +// Failure +{"success": false, "error": "...", "details": "..."} +``` + +**Workflow** (`POST /workflows/v1/start`): +```json +// Success +{"success": true, "message": "Workflow started successfully", "data": {"workflow_id": "...", "run_id": "..."}} +``` + +### Assertion Reference + +```python +# Basic +equals(True) # exact equality +not_equals("error") # inequality +exists() # not None +is_none() # is None +is_true() # truthy +is_false() # falsy + +# Collections +one_of(["a", "b"]) # value in list +contains("Table count:") # substring or item in collection +has_length(5) # len == 5 +is_empty() # empty +is_not_empty() # non-empty + +# Numeric +greater_than(0) +between(0, 100) + +# String +matches(r"^\d+\.\d+") # regex +starts_with("http") +ends_with(".csv") + +# Type +is_dict() +is_list() +is_string() +is_type(str) + +# Combinators +all_of(is_string(), is_not_empty()) +any_of(equals("ok"), equals("success")) +none_of(contains("error")) + +# Custom +custom(lambda x: x % 2 == 0, "is_even") +``` + +### Nested Path Access +Use dot notation to traverse response dicts: +```python +"data.databaseSchemaCheck.success" # → response["data"]["databaseSchemaCheck"]["success"] +"data.workflow_id" # → response["data"]["workflow_id"] +``` + +## Step 3: Generate Scenarios + +Write scenarios covering all three tiers. Mark clearly with comments. 
+ +### Auth Scenarios (minimum 3, target 7+) + +**Required:** +- `auth_valid_credentials` — valid creds succeed, message matches exactly +- `auth_response_structure` — response shape is correct (types) +- `auth_invalid_credentials` — completely wrong creds fail + +**Recommended:** +- `auth_wrong_password` — correct user, wrong password only +- `auth_wrong_host` — unreachable/nonexistent host +- `auth_wrong_database` — valid server, nonexistent database +- `auth_wrong_port` — valid host, wrong port + +**Connector-Specific** (add if relevant): +- SSL/TLS modes, IAM auth, OAuth, etc. — mark with `skip=True` if env might not support + +**For negative tests**, provide a full credentials dict (all required fields) with just one field wrong: +```python +valid_creds_base = { + "username": "{default_user}", + "password": "{default_pass}", # or a sensible default + "host": "localhost", + "port": {default_port}, + "database": "{test_db}", + "authType": "basic", + "type": "all", +} + +Scenario( + name="auth_wrong_password", + api="auth", + credentials={**valid_creds_base, "password": "definitely_wrong"}, + assert_that={"success": equals(False)}, + description="Correct user but wrong password fails", +), +``` + +### Preflight Scenarios (minimum 5, target 10+) + +**Required:** +- `preflight_valid_configuration` — all three sub-checks pass, data is dict +- `preflight_database_schema_check` — databaseSchemaCheck passes +- `preflight_tables_check` — tablesCheck passes, message contains "Table count:" +- `preflight_version_check` — versionCheck passes +- `preflight_invalid_credentials` — fails with bad creds + +**Recommended:** +- `preflight_nonexistent_database_in_filter` — filter refs a DB that doesn't exist → databaseSchemaCheck fails +- `preflight_nonexistent_schema_in_filter` — filter refs a schema that doesn't exist → databaseSchemaCheck fails +- `preflight_empty_include_filter` — `{}` include-filter still works +- `preflight_wildcard_schemas` — `"*"` for schemas works +- 
`preflight_multiple_schemas` — multiple schema patterns work + +**Optional:** +- `preflight_exclude_filter` — exclude filter removes schemas +- `preflight_temp_table_regex` — temp table regex accepted +- `preflight_tables_check_count_nonzero` — count > 0 +- `preflight_version_message_format` — message says "meets minimum" + +### Workflow Scenarios (minimum 2, target 5+) + +**Required:** +- `workflow_start_success` — all fields present, success +- `workflow_response_contains_ids` — IDs are strings + +**Recommended:** +- `workflow_invalid_credentials` — fails with bad creds +- `workflow_custom_connection_name` — custom connection name accepted +- `workflow_narrow_filter` — narrow include-filter works + +**Optional:** +- `workflow_wide_filter` — wildcard filter works +- `workflow_multiple_databases` — multi-db filter works + +## Step 4: Write the File + +### File Header + +```python +"""Integration tests for {ConnectorName} connector. + +Prerequisites: + 1. Set env vars in .env: + ATLAN_APPLICATION_NAME={app_name} + E2E_{APP_NAME}_USERNAME=... + E2E_{APP_NAME}_PASSWORD=... + E2E_{APP_NAME}_HOST=... + E2E_{APP_NAME}_PORT=... + E2E_{APP_NAME}_DATABASE=... + + 2. Start services: + uv run poe start-deps # Dapr + Temporal + uv run python main.py # App server + + 3. Run tests: + uv run pytest tests/integration/ -v + uv run pytest tests/integration/ -v -k "auth" + uv run pytest tests/integration/ -v -k "preflight" + uv run pytest tests/integration/ -v -k "workflow" +""" +``` + +### Negative Test Helper + +Define `valid_creds_base` at module level (before the class) with sensible placeholder values. Use it for all negative tests that mutate a single field. + +### Section Comments + +Organize scenarios with comments: +```python +# ================================================================= +# Auth Tests +# ================================================================= +# ... auth scenarios ... 
+ +# ================================================================= +# Preflight Tests +# ================================================================= +# ... preflight scenarios ... + +# ================================================================= +# Workflow Tests +# ================================================================= +# ... workflow scenarios ... +``` + +## Step 5: Create/Check `__init__.py` + +If `tests/integration/__init__.py` doesn't exist, create it as an empty file. + +## Running Tests + +After generating, remind the user: + +```bash +# Start dependencies (separate terminal) +uv run poe start-deps + +# Start app server (separate terminal) +uv run python main.py + +# Run all integration tests +uv run pytest tests/integration/ -v + +# Run by API type +uv run pytest tests/integration/ -v -k "auth" +uv run pytest tests/integration/ -v -k "preflight" +uv run pytest tests/integration/ -v -k "workflow" + +# Run a specific scenario +uv run pytest tests/integration/ -v -k "auth_valid_credentials" + +# Show full output (print statements) +uv run pytest tests/integration/ -v -s +``` + +## CI/CD Deployment — Ready-to-Use Workflow Templates + +After the tests pass locally, deploy them to CI. Below are complete, copy-paste workflow templates. 
+ +### Template 1: Standard Connector (Public Source — Postgres, Redshift, Snowflake) + +```yaml +# .github/workflows/integration-tests.yaml +name: Integration Tests + +on: + pull_request: + types: [labeled] + workflow_dispatch: + +jobs: + integration-test: + if: >- + github.event_name == 'workflow_dispatch' || + github.event.label.name == 'int-test' + runs-on: ubuntu-latest + timeout-minutes: 20 + concurrency: + group: integration-test-${{ github.ref }} + cancel-in-progress: true + permissions: + pull-requests: write + contents: write + statuses: write + + steps: + - name: Checkout PR branch + uses: actions/checkout@v4.0.0 + + - name: Install Dapr CLI + run: | + DAPR_VERSION="1.16.2" + wget -q https://github.com/dapr/cli/releases/download/v${DAPR_VERSION}/dapr_linux_amd64.tar.gz -O /tmp/dapr.tar.gz + tar -xzf /tmp/dapr.tar.gz -C /tmp + sudo mv /tmp/dapr /usr/local/bin/ + chmod +x /usr/local/bin/dapr + dapr init --runtime-version ${DAPR_VERSION} --slim + + - name: Install Temporal CLI + run: curl -sSf https://temporal.download/cli.sh | sh + + - name: Add Dapr and Temporal to PATH + run: | + echo "$HOME/.dapr/bin" >> $GITHUB_PATH + echo "$HOME/.temporalio/bin" >> $GITHUB_PATH + + - name: Setup Python, uv, and dependencies + uses: atlanhq/application-sdk/.github/actions/setup-deps@main + + - name: Download Dapr components + run: uv run poe download-components + + - name: Start Dapr + Temporal + run: | + uv run poe start-deps + sleep 5 + + - name: Start app server + env: + ATLAN_LOCAL_DEVELOPMENT: "true" + ATLAN_APPLICATION_NAME: {APP_NAME} # <-- CHANGE THIS + run: | + uv run python main.py & + echo "Waiting for app server on :8000..." 
+ for i in $(seq 1 60); do + if curl -sf http://localhost:8000/server/health > /dev/null 2>&1; then + echo "App server ready after ${i}s" + break + fi + if [ "$i" -eq 60 ]; then + echo "::error::App server failed to start within 60s" + exit 1 + fi + sleep 1 + done + + - name: Run integration tests + id: tests + env: + # <-- CHANGE THESE to match your connector's secrets + E2E_{APP_NAME}_HOST: ${{ secrets.{APP_NAME}_HOST }} + E2E_{APP_NAME}_PORT: "5432" + E2E_{APP_NAME}_USERNAME: ${{ secrets.{APP_NAME}_USERNAME }} + E2E_{APP_NAME}_PASSWORD: ${{ secrets.{APP_NAME}_PASSWORD }} + E2E_{APP_NAME}_DATABASE: "default" + ATLAN_LOCAL_DEVELOPMENT: "true" + ATLAN_APPLICATION_NAME: {APP_NAME} + run: | + mkdir -p results + set +e + uv run pytest tests/integration/ -v \ + --tb=short \ + --junit-xml=results/test-results.xml \ + 2>&1 | tee results/test-output.txt + TEST_EXIT_CODE=${PIPESTATUS[0]} + set -e + SUMMARY=$(grep -E "^(FAILED|ERROR|=)" results/test-output.txt | tail -1) + echo "summary=$SUMMARY" >> "$GITHUB_OUTPUT" + exit $TEST_EXIT_CODE + + - name: Upload test results + if: always() + uses: actions/upload-artifact@v4 + with: + name: integration-test-results + path: results/ + retention-days: 14 + + - name: Post PR comment + if: always() && github.event_name == 'pull_request' + uses: mshick/add-pr-comment@b8f338c590a895d50bcbfa6c5859251edc8952fc + with: + message-id: "integration_test_results" + message: | + ## Integration Test Results + **Status:** ${{ steps.tests.outcome == 'success' && 'Passed' || 'Failed' }} + **Summary:** `${{ steps.tests.outputs.summary || 'No summary available' }}` + **Run:** [${{ github.run_id }}](${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}) + continue-on-error: true + + - name: Set commit status + if: always() && github.event_name == 'pull_request' + uses: actions/github-script@v7 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + script: | + const state = '${{ steps.tests.outcome }}' === 'success' ? 
'success' : 'failure'; + await github.rest.repos.createCommitStatus({ + owner: context.repo.owner, + repo: context.repo.repo, + sha: context.payload.pull_request.head.sha, + state: state, + target_url: `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`, + description: state === 'success' ? 'Integration tests passed' : 'Integration tests failed', + context: 'integration-tests' + }); + continue-on-error: true + + - name: Cleanup + if: always() + run: | + kill $(lsof -t -i :8000) 2>/dev/null || true + uv run poe stop-deps || true +``` + +### Template 2: VPN-Protected Source (ClickHouse, Oracle, on-prem) + +Add these two steps **before** "Start Dapr + Temporal": + +```yaml + # Requires: GLOBALPROTECT_USERNAME, GLOBALPROTECT_PASSWORD (secrets) + # GLOBALPROTECT_PORTAL_URL (variable, e.g. vpn2.atlan.app) + - name: Connect to VPN (GlobalProtect) + uses: atlanhq/github-actions/globalprotect-connect-action@main + with: + portal-url: ${{ vars.GLOBALPROTECT_PORTAL_URL }} + username: ${{ secrets.GLOBALPROTECT_USERNAME }} + password: ${{ secrets.GLOBALPROTECT_PASSWORD }} + + - name: Verify VPN connectivity + run: | + echo "Testing source connectivity through VPN..." + curl -sk --connect-timeout 10 https://{YOUR_SOURCE_HOST}:{PORT} \ + && echo "Source reachable!" 
\ + || echo "Warning: source not reachable — tests may fail" +``` + +### Template 3: REST/PAT Auth Connector (Tableau, Salesforce) + +For connectors with camelCase credential fields, put them in `default_credentials` on the test class (env var auto-discovery lowercases everything): + +```python +class TestTableauIntegration(BaseIntegrationTest): + # CamelCase fields must be here, not in env vars + default_credentials = { + "authType": "personal_access_token", + "protocol": "https", + "defaultSite": os.environ.get("E2E_TABLEAU_DEFAULTSITE", ""), + } +``` + +And in the workflow YAML, only set the simple fields as env vars: +```yaml + env: + E2E_TABLEAU_HOST: ${{ secrets.TABLEAU_HOST }} + E2E_TABLEAU_PORT: "443" + E2E_TABLEAU_USERNAME: ${{ secrets.TABLEAU_PAT_TOKEN_NAME }} + E2E_TABLEAU_PASSWORD: ${{ secrets.TABLEAU_PAT_TOKEN_VALUE }} + E2E_TABLEAU_DEFAULTSITE: ${{ secrets.TABLEAU_SITE }} +``` + +### GitHub Secrets Checklist + +For each connector repo, add these secrets in Settings → Secrets → Actions: + +| Secret | Example | Required | +|--------|---------|----------| +| `{APP}_HOST` | `my-db.rds.amazonaws.com` | Yes | +| `{APP}_PORT` | `5432` | Only if non-standard | +| `{APP}_USERNAME` | `admin` | Yes | +| `{APP}_PASSWORD` | `secret123` | Yes | +| `{APP}_DATABASE` | `default` | Only if needed | +| `GLOBALPROTECT_USERNAME` | `john.doe` | Only for VPN sources | +| `GLOBALPROTECT_PASSWORD` | (system password) | Only for VPN sources | + +And one **variable** (Settings → Variables → Actions): + +| Variable | Value | Required | +|----------|-------|----------| +| `GLOBALPROTECT_PORTAL_URL` | `vpn2.atlan.app` | Only for VPN sources | + +### Reference Implementations + +These are live, working pipelines you can copy from: + +| Connector | Workflow File | Source Type | Demo PRs | +|-----------|--------------|-------------|----------| +| **Postgres** | 
[integration-tests.yaml](https://github.com/atlanhq/atlan-postgres-app/blob/demo/integration-tests-passing/.github/workflows/integration-tests.yaml) | SQL, public RDS | [#319](https://github.com/atlanhq/atlan-postgres-app/pull/319) (pass), [#320](https://github.com/atlanhq/atlan-postgres-app/pull/320) (fail) | +| **Tableau** | [integration-tests.yaml](https://github.com/atlanhq/atlan-tableau-app/blob/tests/integration-tests/.github/workflows/integration-tests.yaml) | REST, PAT auth | [#8](https://github.com/atlanhq/atlan-tableau-app/pull/8) (pass), [#9](https://github.com/atlanhq/atlan-tableau-app/pull/9) (fail) | +| **ClickHouse** | [integration-tests.yaml](https://github.com/atlanhq/atlan-clickhouse-app/blob/tests/integration-tests/.github/workflows/integration-tests.yaml) | SQL, VPN | [#28](https://github.com/atlanhq/atlan-clickhouse-app/pull/28) (pass), [#29](https://github.com/atlanhq/atlan-clickhouse-app/pull/29) (fail) | + +## Enable Merge Blocking (CRITICAL — DO NOT SKIP) + +**This step is mandatory.** Without it, the integration tests run but don't actually prevent broken code from being merged. + +1. Go to the repo → **Settings** → **Branches** → **Add branch protection rule** +2. Branch name pattern: `main` +3. Check **"Require status checks to pass before merging"** +4. Search for `integration-tests` and select it +5. Click **Save changes** + +**Why this matters:** With the new `publish.yaml` pipeline, merging to `main` automatically builds a container image and creates a release on the Global Marketplace. Without merge blocking, a broken PR goes straight from merge to production. The integration test status check is the gate that prevents this. 
+ +## Checklist Before Finishing + +- [ ] `tests/integration/__init__.py` exists +- [ ] Test file has docstring with prerequisites +- [ ] `valid_creds_base` defined at module level if negative tests use it +- [ ] All required auth scenarios present (3+) +- [ ] All required preflight scenarios present (5+) +- [ ] All required workflow scenarios present (2+) +- [ ] Recommended scenarios added with connector-specific details +- [ ] Skipped scenarios have `skip_reason` set +- [ ] `default_credentials` has connector-specific static fields (authType, type, etc.) +- [ ] `default_metadata` uses a real database/schema from the `.env` +- [ ] `default_connection` uses the correct `app_name` in `connection_qualified_name` +- [ ] No hardcoded passwords or secrets in positive test scenarios (those use auto-discovery) + +## IMPORTANT: After Everything Is Done — Prompt the User + +After all tests pass and the CI workflow is deployed, you MUST ask the user: + +--- + +**The integration tests are working and the CI workflow is deployed. There is one final critical step:** + +**You need to enable branch protection so that failing integration tests actually block merging.** + +Do you have admin access to this repo? If yes, go to: +> **Settings → Branches → Add branch protection rule** +> - Branch name pattern: `main` +> - Check "Require status checks to pass before merging" +> - Search for `integration-tests` and select it +> - Save + +If you don't have admin access, ask your team lead or repo owner to do this. It takes 30 seconds. + +**Without this step, the tests run but don't block anything.** Since `publish.yaml` auto-deploys on merge to main, broken code would go straight to production. This is the single most important configuration step. + +Would you like me to help you verify the branch protection rule is set up correctly? + +--- + +Do NOT skip this prompt. The entire value of the pipeline depends on merge blocking being enabled. 
diff --git a/application_sdk/test_utils/integration/__init__.py b/application_sdk/test_utils/integration/__init__.py new file mode 100644 index 000000000..5088ee7fb --- /dev/null +++ b/application_sdk/test_utils/integration/__init__.py @@ -0,0 +1,179 @@ +"""Integration testing framework for Apps-SDK. + +This module provides a declarative, data-driven approach to integration testing. +Developers define test scenarios as data, and the framework handles everything: +credential loading, server discovery, test execution, and assertion validation. + +Quick Start (zero boilerplate): + + 1. Set environment variables in .env: + ATLAN_APPLICATION_NAME=postgres + E2E_POSTGRES_USERNAME=user + E2E_POSTGRES_PASSWORD=pass + E2E_POSTGRES_HOST=localhost + E2E_POSTGRES_PORT=5432 + + 2. Define scenarios and a test class: + + >>> from application_sdk.test_utils.integration import ( + ... Scenario, BaseIntegrationTest, equals, exists, is_true, is_dict + ... ) + >>> + >>> class TestMyConnector(BaseIntegrationTest): + ... scenarios = [ + ... Scenario( + ... name="auth_works", + ... api="auth", + ... assert_that={"success": equals(True)}, + ... ), + ... Scenario( + ... name="auth_fails", + ... api="auth", + ... credentials={"username": "bad", "password": "wrong"}, + ... assert_that={"success": equals(False)}, + ... ), + ... Scenario( + ... name="preflight_works", + ... api="preflight", + ... metadata={"include-filter": '{"^mydb$": ["^public$"]}'}, + ... assert_that={"success": equals(True), "data": is_dict()}, + ... ), + ... ] + + 3. Run: pytest tests/integration/ -v + + That's it! Credentials are auto-loaded from E2E_* env vars. + Server URL is auto-discovered from ATLAN_APP_HTTP_HOST/PORT. + Each scenario becomes its own pytest test. 
+ +Supported APIs: +- auth: Test authentication (/workflows/v1/auth) +- metadata: Fetch metadata (/workflows/v1/metadata) +- preflight: Preflight checks (/workflows/v1/check) +- workflow: Start workflow (/workflows/v1/{endpoint}) +- config: Get/update workflow config (/workflows/v1/config/{id}) + +For detailed documentation, see: + docs/docs/guides/integration-testing.md +""" + +# ============================================================================= +# Models +# ============================================================================= + +from .assertions import ( # Basic assertions; Collection assertions; Numeric assertions; String assertions; Type assertions; Combinators; Custom + all_of, + any_of, + between, + contains, + custom, + ends_with, + equals, + exists, + greater_than, + greater_than_or_equal, + has_length, + is_dict, + is_empty, + is_false, + is_list, + is_none, + is_not_empty, + is_string, + is_true, + is_type, + less_than, + less_than_or_equal, + matches, + none_of, + not_contains, + not_equals, + not_one_of, + one_of, + starts_with, +) +from .client import IntegrationTestClient +from .comparison import ( + AssetDiff, + GapReport, + compare_metadata, + load_actual_output, + load_expected_data, +) +from .lazy import Lazy, evaluate_if_lazy, is_lazy, lazy +from .models import APIType, Scenario, ScenarioResult +from .runner import BaseIntegrationTest, generate_test_methods, parametrize_scenarios +from .validation import ( + format_validation_report, + get_normalised_dataframe, + get_schema_file_paths, + validate_with_pandera, +) + +# ============================================================================= +# Public API +# ============================================================================= + +__all__ = [ + # Models + "APIType", + "Scenario", + "ScenarioResult", + # Lazy evaluation + "Lazy", + "lazy", + "is_lazy", + "evaluate_if_lazy", + # Assertions - Basic + "equals", + "not_equals", + "exists", + "is_none", + "is_true", + 
"is_false", + # Assertions - Collections + "one_of", + "not_one_of", + "contains", + "not_contains", + "has_length", + "is_empty", + "is_not_empty", + # Assertions - Numeric + "greater_than", + "greater_than_or_equal", + "less_than", + "less_than_or_equal", + "between", + # Assertions - String + "matches", + "starts_with", + "ends_with", + # Assertions - Type + "is_type", + "is_dict", + "is_list", + "is_string", + # Assertions - Combinators + "all_of", + "any_of", + "none_of", + "custom", + # Metadata Comparison + "AssetDiff", + "GapReport", + "compare_metadata", + "load_actual_output", + "load_expected_data", + # Client + "IntegrationTestClient", + # Runner + "BaseIntegrationTest", + "generate_test_methods", + "parametrize_scenarios", + # Data Validation (Pandera) + "validate_with_pandera", + "format_validation_report", + "get_normalised_dataframe", + "get_schema_file_paths", +] diff --git a/application_sdk/test_utils/integration/assertions.py b/application_sdk/test_utils/integration/assertions.py new file mode 100644 index 000000000..a3b2e6886 --- /dev/null +++ b/application_sdk/test_utils/integration/assertions.py @@ -0,0 +1,839 @@ +"""Assertion DSL for integration testing. + +This module provides higher-order functions that return predicates for use +in scenario assertions. Each function returns a callable that takes an +actual value and returns True/False. + +The design follows functional programming principles: +- Higher-order functions: Functions that return functions +- Composability: Assertions can be combined using all_of/any_of +- Declarative: Describe what to check, not how + +Example: + >>> from application_sdk.test_utils.integration import Scenario, equals, exists, one_of + >>> + >>> Scenario( + ... name="auth_test", + ... api="auth", + ... args={"credentials": {...}}, + ... assert_that={ + ... "success": equals(True), + ... "data.user_id": exists(), + ... "data.role": one_of(["admin", "user"]), + ... } + ... 
) +""" + +import re +from typing import Any, Callable, List, Optional, Pattern, Union + +# Type alias for predicate functions +Predicate = Callable[[Any], bool] + + +# ============================================================================= +# Basic Assertions +# ============================================================================= + + +def equals(expected: Any, *, description: Optional[str] = None) -> Predicate: + """Assert that the actual value equals the expected value. + + Args: + expected: The expected value. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual == expected. + + Example: + >>> check = equals(True) + >>> check(True) # True + >>> check(False) # False + >>> check = equals(True, description="Auth should succeed") + """ + + def predicate(actual: Any) -> bool: + return actual == expected + + predicate.__doc__ = f"equals({expected!r})" + if description is not None: + predicate.description = description + return predicate + + +def not_equals(unexpected: Any, *, description: Optional[str] = None) -> Predicate: + """Assert that the actual value does not equal the unexpected value. + + Args: + unexpected: The value that should not match. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual != unexpected. + + Example: + >>> check = not_equals(None) + >>> check("value") # True + >>> check(None) # False + """ + + def predicate(actual: Any) -> bool: + return actual != unexpected + + predicate.__doc__ = f"not_equals({unexpected!r})" + if description is not None: + predicate.description = description + return predicate + + +def exists(*, description: Optional[str] = None) -> Predicate: + """Assert that the actual value is not None. + + Args: + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual is not None. 
+ + Example: + >>> check = exists() + >>> check("value") # True + >>> check(None) # False + """ + + def predicate(actual: Any) -> bool: + return actual is not None + + predicate.__doc__ = "exists()" + if description is not None: + predicate.description = description + return predicate + + +def is_none(*, description: Optional[str] = None) -> Predicate: + """Assert that the actual value is None. + + Args: + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual is None. + + Example: + >>> check = is_none() + >>> check(None) # True + >>> check("value") # False + """ + + def predicate(actual: Any) -> bool: + return actual is None + + predicate.__doc__ = "is_none()" + if description is not None: + predicate.description = description + return predicate + + +def is_true(*, description: Optional[str] = None) -> Predicate: + """Assert that the actual value is truthy. + + Args: + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if bool(actual) is True. + + Example: + >>> check = is_true() + >>> check(True) # True + >>> check(1) # True + >>> check("") # False + """ + + def predicate(actual: Any) -> bool: + return bool(actual) + + predicate.__doc__ = "is_true()" + if description is not None: + predicate.description = description + return predicate + + +def is_false(*, description: Optional[str] = None) -> Predicate: + """Assert that the actual value is falsy. + + Args: + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if bool(actual) is False. 
+ + Example: + >>> check = is_false() + >>> check(False) # True + >>> check(0) # True + >>> check("x") # False + """ + + def predicate(actual: Any) -> bool: + return not bool(actual) + + predicate.__doc__ = "is_false()" + if description is not None: + predicate.description = description + return predicate + + +# ============================================================================= +# Collection Assertions +# ============================================================================= + + +def one_of(options: List[Any], *, description: Optional[str] = None) -> Predicate: + """Assert that the actual value is one of the given options. + + Args: + options: List of valid values. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual is in options. + + Example: + >>> check = one_of(["admin", "user", "guest"]) + >>> check("admin") # True + >>> check("unknown") # False + """ + + def predicate(actual: Any) -> bool: + return actual in options + + predicate.__doc__ = f"one_of({options!r})" + if description is not None: + predicate.description = description + return predicate + + +def not_one_of(excluded: List[Any], *, description: Optional[str] = None) -> Predicate: + """Assert that the actual value is not one of the given values. + + Args: + excluded: List of values that should not match. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual is not in excluded. 
+ + Example: + >>> check = not_one_of(["error", "failed"]) + >>> check("success") # True + >>> check("error") # False + """ + + def predicate(actual: Any) -> bool: + return actual not in excluded + + predicate.__doc__ = f"not_one_of({excluded!r})" + if description is not None: + predicate.description = description + return predicate + + +def contains(item: Any, *, description: Optional[str] = None) -> Predicate: + """Assert that the actual value contains the given item. + + Works for strings (substring check) and collections (membership check). + + Args: + item: The item to search for. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if item is in actual. + + Example: + >>> check = contains("error") + >>> check("An error occurred") # True + >>> check("Success") # False + >>> + >>> check = contains(42) + >>> check([1, 42, 3]) # True + """ + + def predicate(actual: Any) -> bool: + try: + return item in actual + except TypeError: + return False + + predicate.__doc__ = f"contains({item!r})" + if description is not None: + predicate.description = description + return predicate + + +def not_contains(item: Any, *, description: Optional[str] = None) -> Predicate: + """Assert that the actual value does not contain the given item. + + Args: + item: The item that should not be present. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if item is not in actual. 
+ + Example: + >>> check = not_contains("password") + >>> check("user logged in") # True + >>> check("password: 123") # False + """ + + def predicate(actual: Any) -> bool: + try: + return item not in actual + except TypeError: + return True + + predicate.__doc__ = f"not_contains({item!r})" + if description is not None: + predicate.description = description + return predicate + + +def has_length(expected_length: int, *, description: Optional[str] = None) -> Predicate: + """Assert that the actual value has the expected length. + + Args: + expected_length: The expected length. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if len(actual) == expected_length. + + Example: + >>> check = has_length(3) + >>> check([1, 2, 3]) # True + >>> check("abc") # True + >>> check([1, 2]) # False + """ + + def predicate(actual: Any) -> bool: + try: + return len(actual) == expected_length + except TypeError: + return False + + predicate.__doc__ = f"has_length({expected_length})" + if description is not None: + predicate.description = description + return predicate + + +def is_empty(*, description: Optional[str] = None) -> Predicate: + """Assert that the actual value is empty. + + Args: + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual is empty. + + Example: + >>> check = is_empty() + >>> check([]) # True + >>> check("") # True + >>> check([1]) # False + """ + + def predicate(actual: Any) -> bool: + try: + return len(actual) == 0 + except TypeError: + return False + + predicate.__doc__ = "is_empty()" + if description is not None: + predicate.description = description + return predicate + + +def is_not_empty(*, description: Optional[str] = None) -> Predicate: + """Assert that the actual value is not empty. + + Args: + description: Optional human-readable explanation shown on failure. 
+ + Returns: + Predicate: A function that returns True if actual is not empty. + + Example: + >>> check = is_not_empty() + >>> check([1]) # True + >>> check("x") # True + >>> check([]) # False + """ + + def predicate(actual: Any) -> bool: + try: + return len(actual) > 0 + except TypeError: + return False + + predicate.__doc__ = "is_not_empty()" + if description is not None: + predicate.description = description + return predicate + + +# ============================================================================= +# Numeric Assertions +# ============================================================================= + + +def greater_than( + value: Union[int, float], *, description: Optional[str] = None +) -> Predicate: + """Assert that the actual value is greater than the given value. + + Args: + value: The value to compare against. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual > value. + + Example: + >>> check = greater_than(0) + >>> check(1) # True + >>> check(0) # False + >>> check(-1) # False + """ + + def predicate(actual: Any) -> bool: + try: + return actual > value + except TypeError: + return False + + predicate.__doc__ = f"greater_than({value})" + if description is not None: + predicate.description = description + return predicate + + +def greater_than_or_equal( + value: Union[int, float], *, description: Optional[str] = None +) -> Predicate: + """Assert that the actual value is greater than or equal to the given value. + + Args: + value: The value to compare against. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual >= value. 
+ + Example: + >>> check = greater_than_or_equal(0) + >>> check(1) # True + >>> check(0) # True + >>> check(-1) # False + """ + + def predicate(actual: Any) -> bool: + try: + return actual >= value + except TypeError: + return False + + predicate.__doc__ = f"greater_than_or_equal({value})" + if description is not None: + predicate.description = description + return predicate + + +def less_than( + value: Union[int, float], *, description: Optional[str] = None +) -> Predicate: + """Assert that the actual value is less than the given value. + + Args: + value: The value to compare against. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual < value. + + Example: + >>> check = less_than(10) + >>> check(5) # True + >>> check(10) # False + """ + + def predicate(actual: Any) -> bool: + try: + return actual < value + except TypeError: + return False + + predicate.__doc__ = f"less_than({value})" + if description is not None: + predicate.description = description + return predicate + + +def less_than_or_equal( + value: Union[int, float], *, description: Optional[str] = None +) -> Predicate: + """Assert that the actual value is less than or equal to the given value. + + Args: + value: The value to compare against. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual <= value. 
+ + Example: + >>> check = less_than_or_equal(10) + >>> check(5) # True + >>> check(10) # True + >>> check(11) # False + """ + + def predicate(actual: Any) -> bool: + try: + return actual <= value + except TypeError: + return False + + predicate.__doc__ = f"less_than_or_equal({value})" + if description is not None: + predicate.description = description + return predicate + + +def between( + min_value: Union[int, float], + max_value: Union[int, float], + *, + description: Optional[str] = None, +) -> Predicate: + """Assert that the actual value is between min and max (inclusive). + + Args: + min_value: The minimum value (inclusive). + max_value: The maximum value (inclusive). + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if min_value <= actual <= max_value. + + Example: + >>> check = between(1, 10) + >>> check(5) # True + >>> check(1) # True + >>> check(0) # False + """ + + def predicate(actual: Any) -> bool: + try: + return min_value <= actual <= max_value + except TypeError: + return False + + predicate.__doc__ = f"between({min_value}, {max_value})" + if description is not None: + predicate.description = description + return predicate + + +# ============================================================================= +# String Assertions +# ============================================================================= + + +def matches( + pattern: Union[str, Pattern], *, description: Optional[str] = None +) -> Predicate: + """Assert that the actual value matches the given regex pattern. + + Args: + pattern: A regex pattern string or compiled pattern. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual matches the pattern. 
+ + Example: + >>> check = matches(r"^[a-z]+$") + >>> check("hello") # True + >>> check("Hello") # False + >>> check("123") # False + """ + compiled = re.compile(pattern) if isinstance(pattern, str) else pattern + + def predicate(actual: Any) -> bool: + if actual is None: + return False + return compiled.match(str(actual)) is not None + + predicate.__doc__ = f"matches({pattern!r})" + if description is not None: + predicate.description = description + return predicate + + +def starts_with(prefix: str, *, description: Optional[str] = None) -> Predicate: + """Assert that the actual value starts with the given prefix. + + Args: + prefix: The expected prefix. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual starts with prefix. + + Example: + >>> check = starts_with("http") + >>> check("https://example.com") # True + >>> check("ftp://example.com") # False + """ + + def predicate(actual: Any) -> bool: + try: + return str(actual).startswith(prefix) + except (TypeError, AttributeError): + return False + + predicate.__doc__ = f"starts_with({prefix!r})" + if description is not None: + predicate.description = description + return predicate + + +def ends_with(suffix: str, *, description: Optional[str] = None) -> Predicate: + """Assert that the actual value ends with the given suffix. + + Args: + suffix: The expected suffix. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual ends with suffix. 
+ + Example: + >>> check = ends_with(".json") + >>> check("data.json") # True + >>> check("data.xml") # False + """ + + def predicate(actual: Any) -> bool: + try: + return str(actual).endswith(suffix) + except (TypeError, AttributeError): + return False + + predicate.__doc__ = f"ends_with({suffix!r})" + if description is not None: + predicate.description = description + return predicate + + +# ============================================================================= +# Type Assertions +# ============================================================================= + + +def is_type(expected_type: type, *, description: Optional[str] = None) -> Predicate: + """Assert that the actual value is an instance of the given type. + + Args: + expected_type: The expected type. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if isinstance(actual, expected_type). + + Example: + >>> check = is_type(str) + >>> check("hello") # True + >>> check(123) # False + """ + + def predicate(actual: Any) -> bool: + return isinstance(actual, expected_type) + + predicate.__doc__ = f"is_type({expected_type.__name__})" + if description is not None: + predicate.description = description + return predicate + + +def is_dict(*, description: Optional[str] = None) -> Predicate: + """Assert that the actual value is a dictionary. + + Args: + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual is a dict. + + Example: + >>> check = is_dict() + >>> check({"key": "value"}) # True + >>> check([1, 2, 3]) # False + """ + p = is_type(dict) + if description is not None: + p.description = description + return p + + +def is_list(*, description: Optional[str] = None) -> Predicate: + """Assert that the actual value is a list. + + Args: + description: Optional human-readable explanation shown on failure. 
+ + Returns: + Predicate: A function that returns True if actual is a list. + + Example: + >>> check = is_list() + >>> check([1, 2, 3]) # True + >>> check("abc") # False + """ + p = is_type(list) + if description is not None: + p.description = description + return p + + +def is_string(*, description: Optional[str] = None) -> Predicate: + """Assert that the actual value is a string. + + Args: + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if actual is a str. + + Example: + >>> check = is_string() + >>> check("hello") # True + >>> check(123) # False + """ + p = is_type(str) + if description is not None: + p.description = description + return p + + +# ============================================================================= +# Combinators (Compose Assertions) +# ============================================================================= + + +def all_of(*predicates: Predicate, description: Optional[str] = None) -> Predicate: + """Assert that all predicates pass. + + Args: + *predicates: Variable number of predicates to combine. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if all predicates pass. + + Example: + >>> check = all_of(exists(), is_string(), starts_with("http")) + >>> check("https://example.com") # True + >>> check(None) # False + """ + + def predicate(actual: Any) -> bool: + return all(p(actual) for p in predicates) + + predicate.__doc__ = f"all_of({len(predicates)} predicates)" + if description is not None: + predicate.description = description + return predicate + + +def any_of(*predicates: Predicate, description: Optional[str] = None) -> Predicate: + """Assert that at least one predicate passes. + + Args: + *predicates: Variable number of predicates to combine. + description: Optional human-readable explanation shown on failure. 
+ + Returns: + Predicate: A function that returns True if any predicate passes. + + Example: + >>> check = any_of(equals("admin"), equals("superuser")) + >>> check("admin") # True + >>> check("superuser") # True + >>> check("guest") # False + """ + + def predicate(actual: Any) -> bool: + return any(p(actual) for p in predicates) + + predicate.__doc__ = f"any_of({len(predicates)} predicates)" + if description is not None: + predicate.description = description + return predicate + + +def none_of(*predicates: Predicate, description: Optional[str] = None) -> Predicate: + """Assert that none of the predicates pass. + + Args: + *predicates: Variable number of predicates to combine. + description: Optional human-readable explanation shown on failure. + + Returns: + Predicate: A function that returns True if no predicate passes. + + Example: + >>> check = none_of(contains("error"), contains("fail")) + >>> check("success") # True + >>> check("error found") # False + """ + + def predicate(actual: Any) -> bool: + return not any(p(actual) for p in predicates) + + predicate.__doc__ = f"none_of({len(predicates)} predicates)" + if description is not None: + predicate.description = description + return predicate + + +# ============================================================================= +# Custom Assertion +# ============================================================================= + + +def custom(fn: Callable[[Any], bool], description: str = "custom") -> Predicate: + """Create a custom assertion from a user-provided function. + + Args: + fn: A function that takes the actual value and returns True/False. + description: Description for error messages. Also stored as + the predicate's description attribute for consistency + with other assertion functions. + + Returns: + Predicate: The function wrapped as a predicate. 
+ + Example: + >>> check = custom(lambda x: x % 2 == 0, "is_even") + >>> check(4) # True + >>> check(3) # False + """ + fn.__doc__ = description + fn.description = description + return fn diff --git a/application_sdk/test_utils/integration/client.py b/application_sdk/test_utils/integration/client.py new file mode 100644 index 000000000..360b2f40c --- /dev/null +++ b/application_sdk/test_utils/integration/client.py @@ -0,0 +1,391 @@ +"""API client wrapper for integration testing. + +This module provides a unified interface for calling the Core 3 APIs +(auth, preflight, workflow) during integration tests. + +It wraps the existing APIServerClient and provides: +1. A mapping from API type strings to client methods +2. Better error handling (returns response instead of asserting) +3. Support for dynamic workflow endpoints +""" + +from typing import Any, Callable, Dict, Optional +from urllib.parse import urljoin + +import requests + +from application_sdk.observability.logger_adaptor import get_logger + +logger = get_logger(__name__) + + +class IntegrationTestClient: + """Client for integration testing of the Core 3 APIs. + + This client wraps HTTP calls to the application server and provides + a unified interface for auth, preflight, and workflow operations. + + Unlike the E2E client which asserts on status codes, this client + returns the full response for the test framework to validate. + + Attributes: + host: The base URL of the application server. + version: API version prefix (default: "v1"). + workflow_endpoint: The endpoint for starting workflows (default: "/start"). + timeout: Request timeout in seconds. + + Example: + >>> client = IntegrationTestClient(host="http://localhost:8000") + >>> response = client.call_api("auth", {"credentials": {...}}) + >>> print(response["success"]) + """ + + def __init__( + self, + host: str, + version: str = "v1", + workflow_endpoint: str = "/start", + timeout: int = 30, + ): + """Initialize the integration test client. 
+ + Args: + host: The base URL of the application server. + version: API version prefix. + workflow_endpoint: The endpoint for starting workflows. + timeout: Request timeout in seconds. + """ + self.host = host + self.version = version + self.workflow_endpoint = workflow_endpoint + self.timeout = timeout + self.base_url = urljoin(host, f"workflows/{version}") + + def call_api( + self, + api: str, + args: Dict[str, Any], + endpoint_override: Optional[str] = None, + ) -> Dict[str, Any]: + """Call an API based on the API type. + + This is the main entry point for the test framework. It routes + the call to the appropriate API method based on the api parameter. + + Args: + api: The API type ("auth", "preflight", "workflow"). + args: The arguments to pass to the API. + endpoint_override: Optional override for the workflow endpoint. + + Returns: + Dict[str, Any]: The API response as a dictionary. + + Raises: + ValueError: If the API type is not supported. + requests.RequestException: If the HTTP request fails. + """ + api_lower = api.lower() + + if api_lower == "auth": + return self._call_auth(args) + elif api_lower == "metadata": + return self._call_metadata(args) + elif api_lower == "preflight": + return self._call_preflight(args) + elif api_lower == "workflow": + return self._call_workflow(args, endpoint_override) + elif api_lower == "config": + return self._call_config(args) + else: + raise ValueError( + f"Unsupported API type: '{api}'. " + f"Must be one of: auth, metadata, preflight, workflow, config" + ) + + def _call_auth(self, args: Dict[str, Any]) -> Dict[str, Any]: + """Call the authentication API. + + Args: + args: Must contain "credentials" key. + + Returns: + Dict[str, Any]: The API response. + """ + credentials = args.get("credentials", args) + return self._post("/auth", data=credentials) + + def _call_metadata(self, args: Dict[str, Any]) -> Dict[str, Any]: + """Call the metadata API. + + Args: + args: Must contain "credentials" key. 
+ + Returns: + Dict[str, Any]: The API response. + """ + credentials = args.get("credentials", args) + return self._post("/metadata", data=credentials) + + def _call_preflight(self, args: Dict[str, Any]) -> Dict[str, Any]: + """Call the preflight check API. + + Args: + args: Must contain "credentials" and "metadata" keys. + + Returns: + Dict[str, Any]: The API response. + """ + data = { + "credentials": args.get("credentials", {}), + "metadata": args.get("metadata", {}), + } + return self._post("/check", data=data) + + def _call_workflow( + self, + args: Dict[str, Any], + endpoint_override: Optional[str] = None, + ) -> Dict[str, Any]: + """Call the workflow start API. + + Args: + args: The workflow arguments (credentials, metadata, connection). + endpoint_override: Optional override for the workflow endpoint. + + Returns: + Dict[str, Any]: The API response. + """ + endpoint = endpoint_override or self.workflow_endpoint + return self._post(endpoint, data=args) + + def _call_config(self, args: Dict[str, Any]) -> Dict[str, Any]: + """Call the config GET or POST API. + + Args: + args: Must contain "config_action" ("get" or "update"), + "config_workflow_id", and optionally "config_payload". + + Returns: + Dict[str, Any]: The API response. + """ + action = args.get("config_action", "get") + workflow_id = args.get("config_workflow_id") + + if not workflow_id: + return { + "success": False, + "error": "config_workflow_id is required for config API calls", + } + + if action == "get": + return self.get_config(workflow_id) + elif action == "update": + payload = args.get("config_payload", {}) + return self.update_config(workflow_id, payload) + else: + return { + "success": False, + "error": f"Invalid config_action: '{action}'. Must be 'get' or 'update'", + } + + def get_config(self, workflow_id: str) -> Dict[str, Any]: + """Get the configuration for a workflow. + + Args: + workflow_id: The workflow ID. + + Returns: + Dict[str, Any]: The config response. 
+ """ + return self._get(f"/config/{workflow_id}") + + def update_config( + self, workflow_id: str, payload: Dict[str, Any] + ) -> Dict[str, Any]: + """Update the configuration for a workflow. + + Args: + workflow_id: The workflow ID. + payload: The config update payload (connection, metadata). + + Returns: + Dict[str, Any]: The config response. + """ + return self._post(f"/config/{workflow_id}", data=payload) + + def get_workflow_status( + self, + workflow_id: str, + run_id: str, + ) -> Dict[str, Any]: + """Get the status of a workflow execution. + + Args: + workflow_id: The workflow ID. + run_id: The run ID. + + Returns: + Dict[str, Any]: The workflow status response. + """ + return self._get(f"/status/{workflow_id}/{run_id}") + + def _get(self, endpoint: str) -> Dict[str, Any]: + """Make a GET request to the API. + + Args: + endpoint: The API endpoint (relative to base_url). + + Returns: + Dict[str, Any]: The response as a dictionary. + + Raises: + requests.RequestException: If the request fails. + """ + url = f"{self.base_url}{endpoint}" + logger.debug(f"GET {url}") + + try: + response = requests.get(url, timeout=self.timeout) + return self._handle_response(response) + except requests.ConnectionError as e: + logger.error(f"GET request failed - cannot connect to {url}: {e}") + return { + "success": False, + "error": { + "code": "CONNECTION_FAILED", + "message": ( + f"Cannot connect to server at {self.host}. " + f"Is the application running? 
Start it with: uv run python main.py" + ), + "details": str(e), + }, + } + except requests.Timeout as e: + logger.error(f"GET request timed out after {self.timeout}s: {e}") + return { + "success": False, + "error": { + "code": "REQUEST_TIMEOUT", + "message": f"Request to {url} timed out after {self.timeout}s", + "details": str(e), + }, + } + except requests.RequestException as e: + logger.error(f"GET request failed: {e}") + return { + "success": False, + "error": { + "code": "REQUEST_FAILED", + "message": str(e), + }, + } + + def _post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]: + """Make a POST request to the API. + + Args: + endpoint: The API endpoint (relative to base_url). + data: The request body. + + Returns: + Dict[str, Any]: The response as a dictionary. + + Raises: + requests.RequestException: If the request fails. + """ + url = f"{self.base_url}{endpoint}" + logger.debug(f"POST {url}") + + try: + response = requests.post(url, json=data, timeout=self.timeout) + return self._handle_response(response) + except requests.ConnectionError as e: + logger.error(f"POST request failed - cannot connect to {url}: {e}") + return { + "success": False, + "error": { + "code": "CONNECTION_FAILED", + "message": ( + f"Cannot connect to server at {self.host}. " + f"Is the application running? Start it with: uv run python main.py" + ), + "details": str(e), + }, + } + except requests.Timeout as e: + logger.error(f"POST request timed out after {self.timeout}s: {e}") + return { + "success": False, + "error": { + "code": "REQUEST_TIMEOUT", + "message": f"Request to {url} timed out after {self.timeout}s", + "details": str(e), + }, + } + except requests.RequestException as e: + logger.error(f"POST request failed: {e}") + return { + "success": False, + "error": { + "code": "REQUEST_FAILED", + "message": str(e), + }, + } + + def _handle_response(self, response: requests.Response) -> Dict[str, Any]: + """Handle the HTTP response and convert to dictionary. 
+ + Args: + response: The requests Response object. + + Returns: + Dict[str, Any]: The response as a dictionary with status info. + """ + try: + result = response.json() + except ValueError: + # Response is not JSON + result = { + "success": False, + "error": { + "code": "INVALID_RESPONSE", + "message": "Response is not valid JSON", + "body": response.text[:500] if response.text else None, + }, + } + + # Add HTTP status info if not present + if "_http_status" not in result: + result["_http_status"] = response.status_code + + return result + + +# ============================================================================= +# API Method Mapping (Higher-Order Function Pattern) +# ============================================================================= + +# Type alias for API method functions +APIMethod = Callable[[IntegrationTestClient, Dict[str, Any]], Dict[str, Any]] + + +def create_api_method_map() -> Dict[str, APIMethod]: + """Create a mapping of API types to client methods. + + This uses higher-order functions to create a flexible mapping + that can be extended or customized. + + Returns: + Dict[str, APIMethod]: Mapping of API type strings to callable methods. + """ + return { + "auth": lambda client, args: client._call_auth(args), + "metadata": lambda client, args: client._call_metadata(args), + "preflight": lambda client, args: client._call_preflight(args), + "workflow": lambda client, args: client._call_workflow(args), + "config": lambda client, args: client._call_config(args), + } + + +# Default API method map +API_METHODS = create_api_method_map() diff --git a/application_sdk/test_utils/integration/comparison.py b/application_sdk/test_utils/integration/comparison.py new file mode 100644 index 000000000..d8ba00cd3 --- /dev/null +++ b/application_sdk/test_utils/integration/comparison.py @@ -0,0 +1,475 @@ +"""Metadata comparison engine for integration testing. 
+ +This module compares actual extracted metadata against an expected baseline +and produces a gap report detailing missing assets, extra assets, and +attribute mismatches. + +The comparison is connector-agnostic — it works with any asset type +(Database, Schema, Table, Column, etc.) as long as assets follow the +standard structure with ``typeName``, ``attributes``, and ``customAttributes``. + +Example expected data JSON:: + + { + "Database": [ + {"attributes": {"name": "mydb", "connectorName": "postgres"}} + ], + "Table": [ + {"attributes": {"name": "orders", "columnCount": 6}} + ] + } + +Usage:: + + from application_sdk.test_utils.integration.comparison import ( + compare_metadata, load_expected_data, load_actual_output, + ) + + expected = load_expected_data("tests/expected/baseline.json") + actual = load_actual_output("/tmp/output", workflow_id, run_id) + report = compare_metadata(expected, actual, strict=True) + if report.has_gaps: + raise AssertionError(report.format_report()) +""" + +import os +from dataclasses import dataclass, field +from glob import glob +from typing import Any, Dict, List, Optional, Set + +import orjson + +from application_sdk.observability.logger_adaptor import get_logger + +logger = get_logger(__name__) + +# Fields that change between runs and should be ignored by default +DEFAULT_IGNORED_FIELDS: Set[str] = { + "qualifiedName", + "connectionQualifiedName", + "lastSyncWorkflowName", + "lastSyncRun", + "lastSyncRunAt", + "tenantId", + "connectionName", + "databaseQualifiedName", + "schemaQualifiedName", + "tableQualifiedName", + "viewQualifiedName", +} + +# Nested reference fields that contain run-specific qualified names +DEFAULT_IGNORED_NESTED_FIELDS: Set[str] = { + "atlanSchema", + "database", + "table", + "view", + "materialisedView", + "parentTable", + "tablePartition", +} + + +@dataclass +class AssetDiff: + """A single difference found between expected and actual metadata. 
+ + Attributes: + asset_type: The typeName of the asset (e.g., "Table", "Column"). + asset_name: The name of the asset from attributes.name. + diff_type: Category of difference — one of "missing", "extra", + "attribute_mismatch", "missing_attribute", "count_mismatch". + field: Dot-separated path to the differing field + (e.g., "attributes.columnCount"). None for asset-level diffs. + expected: The expected value. None for extra assets. + actual: The actual value. None for missing assets. + """ + + asset_type: str + asset_name: str + diff_type: str + field: Optional[str] = None + expected: Any = None + actual: Any = None + + def __str__(self) -> str: + if self.diff_type in ("missing", "extra"): + return f"[{self.diff_type.upper()}] {self.asset_type}/{self.asset_name}" + return ( + f"[{self.diff_type.upper()}] {self.asset_type}/{self.asset_name} " + f"-> {self.field}: expected={self.expected!r}, actual={self.actual!r}" + ) + + +@dataclass +class GapReport: + """Summary of all differences between expected and actual metadata. + + Attributes: + diffs: List of individual asset differences. + summary: Count of diffs by type. + """ + + diffs: List[AssetDiff] = field(default_factory=list) + summary: Dict[str, int] = field(default_factory=dict) + expected_file: Optional[str] = None + + @property + def has_gaps(self) -> bool: + """Return True if any differences were found.""" + return len(self.diffs) > 0 + + def format_report(self) -> str: + """Format the gap report as a human-readable string for pytest output.""" + if not self.has_gaps: + return "No gaps found — actual metadata matches expected." 
+ + lines = ["Metadata validation failed:", ""] + + # Show baseline file path if available + if self.expected_file: + lines.append(f"Expected baseline: {self.expected_file}") + lines.append("") + + # Summary + lines.append("Summary:") + for diff_type, count in sorted(self.summary.items()): + lines.append(f" {diff_type}: {count}") + lines.append("") + + # Group diffs by asset type + by_type: Dict[str, List[AssetDiff]] = {} + for diff in self.diffs: + by_type.setdefault(diff.asset_type, []).append(diff) + + for asset_type, type_diffs in sorted(by_type.items()): + lines.append(f"[{asset_type}]") + for diff in type_diffs: + lines.append(f" {diff}") + lines.append("") + + return "\n".join(lines) + + +def compare_metadata( + expected: Dict[str, List[Dict[str, Any]]], + actual: List[Dict[str, Any]], + strict: bool = True, + ignored_fields: Optional[Set[str]] = None, + expected_file: Optional[str] = None, +) -> GapReport: + """Compare actual extracted metadata against an expected baseline. + + Args: + expected: Expected metadata grouped by asset type. Keys are type names + (e.g., "Table"), values are lists of asset dicts with ``attributes`` + and optionally ``customAttributes``. + actual: List of actual extracted asset records, each with ``typeName``, + ``attributes``, and optionally ``customAttributes``. + strict: If True, extra assets in actual output that are not in the + expected data will be reported as gaps. + ignored_fields: Set of attribute field names to skip during comparison. + Defaults to ``DEFAULT_IGNORED_FIELDS``. + + Returns: + GapReport: A report of all differences found. 
def compare_metadata(
    expected: Dict[str, List[Dict[str, Any]]],
    actual: List[Dict[str, Any]],
    strict: bool = True,
    ignored_fields: Optional[Set[str]] = None,
    expected_file: Optional[str] = None,
) -> GapReport:
    """Compare actual extracted metadata against an expected baseline.

    Args:
        expected: Expected metadata grouped by asset type. Keys are type names
            (e.g., "Table"), values are lists of asset dicts with ``attributes``
            and optionally ``customAttributes``.
        actual: List of actual extracted asset records, each with ``typeName``,
            ``attributes``, and optionally ``customAttributes``.
        strict: If True, extra assets in actual output that are not in the
            expected data will be reported as gaps.
        ignored_fields: Set of attribute field names to skip during comparison.
            Defaults to ``DEFAULT_IGNORED_FIELDS``.
        expected_file: Optional path of the baseline file; recorded on the
            returned report so failure output can point at the source file.
            (Previously undocumented.)

    Returns:
        GapReport: A report of all differences found.
    """
    if ignored_fields is None:
        ignored_fields = DEFAULT_IGNORED_FIELDS

    report = GapReport(expected_file=expected_file)

    # Group actual assets by typeName for per-type comparison.
    actual_by_type: Dict[str, List[Dict[str, Any]]] = {}
    for record in actual:
        type_name = record.get("typeName", "Unknown")
        actual_by_type.setdefault(type_name, []).append(record)

    for asset_type, expected_assets in expected.items():
        actual_assets = actual_by_type.get(asset_type, [])

        # Per-type count check, reported once with asset_name="*".
        if len(expected_assets) != len(actual_assets):
            report.diffs.append(
                AssetDiff(
                    asset_type=asset_type,
                    asset_name="*",
                    diff_type="count_mismatch",
                    field="count",
                    expected=len(expected_assets),
                    actual=len(actual_assets),
                )
            )
            _increment_summary(report, "count_mismatch")

        # Lookup of actual assets keyed by (parent-qualified) name.
        actual_by_name: Dict[str, Dict[str, Any]] = {}
        for asset in actual_assets:
            name = _get_asset_name(asset)
            if name:
                actual_by_name[name] = asset

        # Check each expected asset against the actual lookup.
        expected_names: Set[str] = set()
        for expected_asset in expected_assets:
            name = _get_asset_name(expected_asset)
            if not name:
                # Unnamed expected assets cannot be matched; skip with a warning.
                logger.warning(
                    f"Expected {asset_type} asset has no attributes.name, skipping"
                )
                continue

            expected_names.add(name)
            actual_asset = actual_by_name.get(name)

            if actual_asset is None:
                report.diffs.append(
                    AssetDiff(
                        asset_type=asset_type,
                        asset_name=name,
                        diff_type="missing",
                    )
                )
                _increment_summary(report, "missing")
                continue

            # Compare attributes
            _compare_attributes(
                report=report,
                asset_type=asset_type,
                asset_name=name,
                expected_attrs=expected_asset.get("attributes", {}),
                actual_attrs=actual_asset.get("attributes", {}),
                prefix="attributes",
                ignored_fields=ignored_fields,
            )

            # Compare customAttributes only when the baseline defines some.
            expected_custom = expected_asset.get("customAttributes")
            actual_custom = actual_asset.get("customAttributes")
            if expected_custom:
                _compare_attributes(
                    report=report,
                    asset_type=asset_type,
                    asset_name=name,
                    expected_attrs=expected_custom,
                    actual_attrs=actual_custom or {},
                    prefix="customAttributes",
                    ignored_fields=ignored_fields,
                )

        # Strict mode: assets present in actual but absent from expected.
        if strict:
            for name in actual_by_name:
                if name not in expected_names:
                    report.diffs.append(
                        AssetDiff(
                            asset_type=asset_type,
                            asset_name=name,
                            diff_type="extra",
                        )
                    )
                    _increment_summary(report, "extra")

    # Strict mode: whole asset types present in actual but absent from expected.
    if strict:
        for asset_type in actual_by_type:
            if asset_type not in expected:
                for asset in actual_by_type[asset_type]:
                    name = _get_asset_name(asset) or ""
                    report.diffs.append(
                        AssetDiff(
                            asset_type=asset_type,
                            asset_name=name,
                            diff_type="extra",
                        )
                    )
                    _increment_summary(report, "extra")

    return report
def load_expected_data(file_path: str) -> Dict[str, List[Dict[str, Any]]]:
    """Load expected metadata from a JSON file.

    The file must contain a single JSON object mapping asset type names
    (e.g., "Table") to lists of asset records.

    Args:
        file_path: Path to the expected data JSON file.

    Returns:
        Dict mapping asset type names to lists of asset dicts.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If the file content is not valid JSON or wrong structure.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Expected data file not found: {file_path}")

    with open(file_path, "rb") as fh:
        payload = orjson.loads(fh.read())

    # Top level must be an object (asset type -> list of assets).
    if not isinstance(payload, dict):
        raise ValueError(
            f"Expected data file must contain a JSON object mapping asset types "
            f"to lists, got {type(payload).__name__}"
        )

    # Every value must be a list of asset records.
    for asset_type, assets in payload.items():
        if not isinstance(assets, list):
            raise ValueError(
                f"Expected data for asset type '{asset_type}' must be a list, "
                f"got {type(assets).__name__}"
            )

    return payload
def load_actual_output(
    base_path: str,
    workflow_id: str,
    run_id: str,
    subdirectory: str = "transformed",
) -> List[Dict[str, Any]]:
    """Load all extracted metadata from the output directory.

    Reads ``*.json`` files (JSON Lines content, one record per line) from
    ``{base_path}/{workflow_id}/{run_id}/{subdirectory}/`` and returns all
    records as a flat list.

    Args:
        base_path: Base directory where connector writes extracted output.
        workflow_id: The workflow ID from the API response.
        run_id: The run ID from the API response.
        subdirectory: Subdirectory within the run output to read from.
            Defaults to "transformed". If the subdirectory exists, only
            files within it are read. If it does not exist, falls back
            to reading the entire run directory. Pass "" to always read
            the full directory.

    Returns:
        List of asset records (each with typeName, attributes, etc.).

    Raises:
        FileNotFoundError: If the output directory does not exist or no
            records were found in the searched directory.
    """
    output_dir = os.path.join(base_path, workflow_id, run_id)

    if not os.path.isdir(output_dir):
        raise FileNotFoundError(f"Extracted output directory not found: {output_dir}")

    # Prefer the subdirectory when present; otherwise fall back to the run dir.
    search_dir = output_dir
    if subdirectory:
        sub_path = os.path.join(output_dir, subdirectory)
        if os.path.isdir(sub_path):
            search_dir = sub_path
            logger.info(f"Reading actual output from subdirectory: {search_dir}")
        else:
            logger.warning(
                f"Subdirectory '{subdirectory}' not found in {output_dir}, "
                f"falling back to full directory"
            )

    records: List[Dict[str, Any]] = []
    json_files = glob(os.path.join(search_dir, "**", "*.json"), recursive=True)

    for json_file in sorted(json_files):
        with open(json_file, "rb") as f:
            for line in f:
                line = line.strip()
                if line:
                    records.append(orjson.loads(line))

    if not records:
        # Fix: report the directory that was actually searched — when the
        # subdirectory exists this is sub_path, not the run root, and the
        # old message pointed users at the wrong location.
        raise FileNotFoundError(
            f"No metadata records found in output directory: {search_dir}"
        )

    logger.info(f"Loaded {len(records)} actual metadata records from {output_dir}")
    return records
+ """ + attrs = asset.get("attributes", {}) + if not isinstance(attrs, dict): + return None + + name = attrs.get("name") + if name is None: + return None + + # Build composite key for child assets that have a parent reference + parent = attrs.get("tableName") or attrs.get("viewName") or attrs.get("parentName") + if parent: + return f"{parent}/{name}" + + return name + + +def _compare_attributes( + report: GapReport, + asset_type: str, + asset_name: str, + expected_attrs: Dict[str, Any], + actual_attrs: Dict[str, Any], + prefix: str, + ignored_fields: Set[str], +) -> None: + """Compare attributes between expected and actual, adding diffs to report.""" + if not expected_attrs: + return + + for key, expected_value in expected_attrs.items(): + if key in ignored_fields: + continue + + if key in DEFAULT_IGNORED_NESTED_FIELDS: + continue + + field_path = f"{prefix}.{key}" + + if key not in actual_attrs: + report.diffs.append( + AssetDiff( + asset_type=asset_type, + asset_name=asset_name, + diff_type="missing_attribute", + field=field_path, + expected=expected_value, + actual=None, + ) + ) + _increment_summary(report, "missing_attribute") + continue + + actual_value = actual_attrs[key] + + if expected_value != actual_value: + report.diffs.append( + AssetDiff( + asset_type=asset_type, + asset_name=asset_name, + diff_type="attribute_mismatch", + field=field_path, + expected=expected_value, + actual=actual_value, + ) + ) + _increment_summary(report, "attribute_mismatch") + + +def _increment_summary(report: GapReport, diff_type: str) -> None: + """Increment the count for a diff type in the report summary.""" + report.summary[diff_type] = report.summary.get(diff_type, 0) + 1 diff --git a/application_sdk/test_utils/integration/lazy.py b/application_sdk/test_utils/integration/lazy.py new file mode 100644 index 000000000..a8f5fc212 --- /dev/null +++ b/application_sdk/test_utils/integration/lazy.py @@ -0,0 +1,162 @@ +"""Lazy evaluation utilities for integration testing. 
T = TypeVar("T")


class Lazy(Generic[T]):
    """Deferred computation with one-shot caching.

    The wrapped zero-argument callable is not invoked until ``evaluate()``
    is called; the result is then memoized and returned on every subsequent
    call until ``reset()`` clears the cache.

    Example:
        >>> value = Lazy(lambda: compute_expensive_thing())
        >>> value.evaluate()   # computes now
        >>> value.evaluate()   # served from cache
    """

    def __init__(self, fn: Callable[[], T]):
        """Wrap ``fn`` for deferred evaluation.

        Args:
            fn: A callable that takes no arguments and returns the value.

        Raises:
            TypeError: If ``fn`` is not callable.
        """
        if not callable(fn):
            raise TypeError("Lazy wrapper requires a callable")
        self._fn = fn
        self._cached: T = None  # type: ignore
        self._evaluated: bool = False

    def evaluate(self) -> T:
        """Invoke the wrapped callable once and return the (cached) result.

        Returns:
            T: The result of calling the wrapped function.

        Raises:
            Any exception raised by the wrapped function.
        """
        if self._evaluated:
            return self._cached
        self._cached = self._fn()
        self._evaluated = True
        return self._cached

    def is_evaluated(self) -> bool:
        """Return True once ``evaluate()`` has run at least once."""
        return self._evaluated

    def reset(self) -> None:
        """Drop the cached value so the next ``evaluate()`` recomputes it."""
        self._cached = None  # type: ignore
        self._evaluated = False

    def __repr__(self) -> str:
        """Short status string, e.g. ``Lazy(unevaluated)``."""
        return f"Lazy({'evaluated' if self._evaluated else 'unevaluated'})"


def lazy(fn: Callable[[], T]) -> Lazy[T]:
    """Convenience constructor for :class:`Lazy`.

    Args:
        fn: A callable that takes no arguments and returns the value.

    Returns:
        Lazy[T]: A lazy wrapper around the function.
    """
    return Lazy(fn)


def is_lazy(value: Any) -> bool:
    """Return True when ``value`` is a :class:`Lazy` wrapper."""
    return isinstance(value, Lazy)


def evaluate_if_lazy(value: T) -> T:
    """Unwrap ``value`` if it is lazy; otherwise return it unchanged.

    Args:
        value: A value that may be a Lazy wrapper or a regular value.

    Returns:
        T: The evaluated value (if lazy) or the original value.
    """
    if is_lazy(value):
        return value.evaluate()
    return value
Must be one of: {valid_values}" + ) + + +# Type alias for assertion predicates (higher-order functions) +Predicate = Callable[[Any], bool] + +# Type alias for lazy evaluation wrapper (forward reference) +LazyValue = Any # Will be Lazy type from lazy.py + + +@dataclass +class Scenario: + """Represents a single integration test scenario. + + A scenario defines what API to test and what to assert on the response. + Credentials, metadata, and connection are auto-loaded from environment + variables unless explicitly overridden. + + Simplified Usage (recommended): + >>> Scenario( + ... name="auth_works", + ... api="auth", + ... assert_that={"success": equals(True)}, + ... ) + # Credentials auto-loaded from E2E_{APP_NAME}_* env vars + + Override credentials for negative tests: + >>> Scenario( + ... name="auth_fails", + ... api="auth", + ... credentials={"username": "bad", "password": "wrong"}, + ... assert_that={"success": equals(False)}, + ... ) + + Override metadata for preflight: + >>> Scenario( + ... name="preflight_custom", + ... api="preflight", + ... metadata={"include-filter": '{"^mydb$": ["^public$"]}'}, + ... assert_that={"success": equals(True)}, + ... ) + + Attributes: + name: Unique identifier for the scenario. + api: The API type to test ("auth", "metadata", "preflight", "workflow", "config"). + assert_that: Dictionary mapping response paths to assertion predicates. + credentials: Optional credentials override. If not provided, auto-loaded from env. + metadata: Optional metadata override. If not provided, uses class defaults. + connection: Optional connection override. If not provided, uses class defaults. + args: Full args override for backward compatibility. Takes precedence over + credentials/metadata/connection if provided. + endpoint: Optional override for the workflow endpoint (dynamic). + description: Optional human-readable description of what this tests. + skip: If True, this scenario will be skipped during test execution. 
@dataclass
class Scenario:
    """Declarative description of a single integration test case.

    A scenario names the API under test and the assertions to run on its
    response. Credentials, metadata, and connection details are auto-loaded
    from environment variables unless explicitly overridden, so positive
    tests usually set only ``name``, ``api``, and ``assert_that``, while
    negative tests override ``credentials`` with intentionally bad values.

    Attributes:
        name: Unique identifier; becomes ``test_{name}`` in pytest.
        api: API to test: "auth", "metadata", "preflight", "workflow", "config".
        assert_that: Mapping of dot-notation response paths to predicates.
        credentials: Optional override for auto-loaded credentials.
        metadata: Optional per-scenario metadata override.
        connection: Optional per-scenario connection override.
        args: Full args override (backward compat); takes precedence over
            credentials/metadata/connection when provided.
        endpoint: Optional override for the workflow endpoint (dynamic).
        description: Human-readable summary shown in test output.
        skip: If True, the scenario is skipped during execution.
        skip_reason: Reason shown when skipped.
        expected_data: Path to a JSON baseline of expected metadata; when set,
            the framework polls for workflow completion and diffs the actual
            extracted metadata against the baseline.
        extracted_output_base_path: Base dir of extracted output; falls back
            to the class-level attribute when unset.
        output_subdirectory: Subdirectory of the run output to search for
            JSONL files; "" searches the entire run directory.
        strict_comparison: If True, extra assets absent from the baseline
            fail the test.
        workflow_timeout: Seconds to wait for workflow completion.
        polling_interval: Seconds between workflow status polls.
        ignored_fields: Attribute names skipped during comparison (dynamic
            fields); a built-in default set is used when not provided.
        config_action: For config scenarios, "get" or "update".
        config_workflow_id: Workflow ID (string or callable) for config calls.
        config_payload: Payload to send for config updates.
        schema_base_path: Directory of pandera YAML schemas used to validate
            extracted output after workflow completion.
    """

    name: str
    api: str
    assert_that: Dict[str, Predicate]
    credentials: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    connection: Optional[Dict[str, Any]] = None
    args: Optional[Union[Dict[str, Any], LazyValue]] = None
    endpoint: Optional[str] = None
    description: str = ""
    skip: bool = False
    skip_reason: str = ""
    expected_data: Optional[str] = None
    extracted_output_base_path: Optional[str] = None
    output_subdirectory: str = "transformed"
    strict_comparison: bool = True
    workflow_timeout: int = 300
    polling_interval: int = 10
    ignored_fields: Optional[Set[str]] = None
    config_action: Optional[str] = None
    config_workflow_id: Optional[Any] = None
    config_payload: Optional[Dict[str, Any]] = None
    schema_base_path: Optional[str] = None

    def __post_init__(self):
        """Validate required fields and cross-field constraints."""
        if not self.name:
            raise ValueError("Scenario name cannot be empty")

        if not self.api:
            raise ValueError("Scenario api cannot be empty")

        # The api string must name a known APIType member.
        valid_apis = [e.value for e in APIType]
        if self.api.lower() not in valid_apis:
            raise ValueError(
                f"Invalid API type: '{self.api}'. Must be one of: {valid_apis}"
            )

        if not self.assert_that:
            raise ValueError("Scenario must have at least one assertion")

        # Baseline comparison is only meaningful for workflow scenarios.
        if self.expected_data and self.api.lower() != "workflow":
            raise ValueError(
                "expected_data can only be set for workflow scenarios, "
                f"but api is '{self.api}'"
            )

        # Fail fast at definition time if the baseline file is missing.
        if self.expected_data and not os.path.isfile(self.expected_data):
            raise FileNotFoundError(
                f"Scenario '{self.name}': expected_data file not found: "
                f"{self.expected_data}"
            )

        # Config scenarios additionally need an action and a workflow id.
        if self.api.lower() == "config":
            if self.config_action not in ("get", "update"):
                raise ValueError(
                    f"config_action must be 'get' or 'update' for config scenarios, "
                    f"got: {self.config_action!r}"
                )
            if self.config_workflow_id is None:
                raise ValueError("config_workflow_id is required for config scenarios")

    @property
    def api_type(self) -> APIType:
        """The API as an :class:`APIType` member."""
        return APIType.from_string(self.api)

    @property
    def uses_default_credentials(self) -> bool:
        """True when neither ``credentials`` nor ``args`` overrides env creds."""
        return self.credentials is None and self.args is None
@dataclass
class ScenarioResult:
    """Outcome of executing a single scenario.

    Attributes:
        scenario: The scenario that was executed.
        success: Whether all assertions passed.
        response: The raw API response.
        assertion_results: Mapping of assertion path to pass/fail details.
        error: Exception if the scenario failed unexpectedly.
        duration_ms: Wall-clock execution time in milliseconds.
    """

    scenario: "Scenario"
    success: bool
    response: Optional[Dict[str, Any]] = None
    assertion_results: Dict[str, Any] = field(default_factory=dict)
    error: Optional[Exception] = None
    duration_ms: float = 0.0

    def __str__(self) -> str:
        """One-line human-readable summary of the result."""
        label = "PASSED" if self.success else "FAILED"
        text = f"[{label}] {self.scenario.name}"
        if self.success:
            return text
        if self.error:
            return text + f" - Error: {self.error}"
        # No stored exception: list the assertion paths that did not pass.
        failing = [
            path
            for path, detail in self.assertion_results.items()
            if not detail.get("passed", False)
        ]
        return text + f" - Failed assertions: {failing}"
# Credentials auto-loaded from E2E_{APP_NAME}_* env vars +""" + +import os +import time +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Type + +import orjson +import pytest +import requests as http_requests + +from application_sdk.observability.logger_adaptor import get_logger + +from .client import IntegrationTestClient +from .comparison import compare_metadata, load_actual_output, load_expected_data +from .lazy import evaluate_if_lazy +from .models import APIType, Scenario, ScenarioResult +from .validation import format_validation_report, validate_with_pandera + +logger = get_logger(__name__) + +# Sentinel value to distinguish "not provided" from None +_NOT_PROVIDED = object() + + +def _auto_discover_credentials(scenario_name: str = "") -> Dict[str, Any]: + """Auto-discover credentials from E2E_* environment variables. + + Checks for scenario-specific env vars first, then falls back to + app-level defaults. + + Resolution order: + 1. E2E_{SCENARIO_NAME}_* (scenario-specific) + 2. E2E_{APP_NAME}_* (app-level default) + + For example, with ATLAN_APPLICATION_NAME=postgres and + scenario_name="preflight_missing_permissions": + + Scenario-specific (checked first): + E2E_PREFLIGHT_MISSING_PERMISSIONS_USERNAME=restricted_user + + App-level fallback: + E2E_POSTGRES_USERNAME=admin + E2E_POSTGRES_PASSWORD=secret + E2E_POSTGRES_PORT=5432 + + Result: {"username": "restricted_user", "password": "secret", "port": 5432} + + Args: + scenario_name: The scenario name used to look up scenario-specific + env vars. If empty, only app-level defaults are returned. + + Returns: + Dict[str, Any]: Auto-discovered credentials from env vars. + """ + app_name = os.getenv("ATLAN_APPLICATION_NAME", "").upper() + if not app_name: + logger.warning( + "ATLAN_APPLICATION_NAME not set. Cannot auto-discover credentials. " + "Set it in your .env file or environment." 
+ ) + return {} + + # Collect app-level defaults: E2E_{APP_NAME}_* + app_prefix = f"E2E_{app_name}_" + app_credentials = _collect_env_credentials(app_prefix) + + if app_credentials: + logger.info( + f"Auto-discovered {len(app_credentials)} credential fields " + f"from {app_prefix}* env vars: {list(app_credentials.keys())}" + ) + else: + logger.warning( + f"No {app_prefix}* environment variables found. " + f"Set them in your .env file or environment." + ) + + # Check for scenario-specific overrides: E2E_{SCENARIO_NAME}_* + if scenario_name: + scenario_prefix = f"E2E_{scenario_name.upper()}_" + scenario_credentials = _collect_env_credentials(scenario_prefix) + + if scenario_credentials: + logger.info( + f"Found {len(scenario_credentials)} scenario-specific credential " + f"fields from {scenario_prefix}* env vars: " + f"{list(scenario_credentials.keys())}" + ) + # Scenario-specific vars override app-level defaults + return {**app_credentials, **scenario_credentials} + + return app_credentials + + +def _collect_env_credentials(prefix: str) -> Dict[str, Any]: + """Collect credentials from environment variables matching a prefix. + + Args: + prefix: The env var prefix to match (e.g. "E2E_POSTGRES_"). + + Returns: + Dict[str, Any]: Credentials extracted from matching env vars. + """ + credentials: Dict[str, Any] = {} + + for key, value in os.environ.items(): + if key.startswith(prefix): + field_name = key[len(prefix) :].lower() + # Auto-convert numeric values (e.g., port) + if value.isdigit(): + value = int(value) + credentials[field_name] = value + + return credentials + + +def _auto_discover_server() -> str: + """Auto-discover the app server URL from environment variables. + + Reads ATLAN_APP_HTTP_HOST and ATLAN_APP_HTTP_PORT to build the server URL. + + Returns: + str: The server URL (e.g., "http://localhost:8000"). 
+ """ + host = os.getenv("ATLAN_APP_HTTP_HOST", "localhost") + port = os.getenv("ATLAN_APP_HTTP_PORT", "8000") + # 0.0.0.0 isn't reachable from the test client, use localhost instead + if host == "0.0.0.0": + host = "localhost" + return f"http://{host}:{port}" + + +def _check_server_health(server_url: str, timeout: int = 5) -> bool: + """Check if the application server is running and reachable. + + Args: + server_url: The server URL to check. + timeout: Timeout in seconds. + + Returns: + bool: True if server is reachable, False otherwise. + """ + try: + response = http_requests.get(f"{server_url}/server/health", timeout=timeout) + return response.status_code == 200 + except http_requests.ConnectionError: + return False + except Exception: + # Server is reachable but health endpoint might not exist + # That's OK - at least the server is running + return True + + +class BaseIntegrationTest: + """Base class for integration tests. + + Subclasses just define scenarios and the framework handles everything else: + credentials from env vars, server discovery, test method generation, + and assertion validation. + + Minimal Example: + >>> class TestMyConnector(BaseIntegrationTest): + ... scenarios = [ + ... Scenario(name="auth_works", api="auth", + ... assert_that={"success": equals(True)}) + ... ] + + Class Attributes: + scenarios: List of Scenario objects to execute. + server_host: Base URL of the app server (auto-discovered from env if not set). + server_version: API version prefix (default: "v1"). + workflow_endpoint: Default endpoint for workflow API (default: "/start"). + timeout: Request timeout in seconds (default: 30). + default_credentials: Extra credential fields merged with auto-discovered ones. + default_metadata: Default metadata for preflight/workflow tests. + default_connection: Default connection info for workflow tests. + skip_server_check: Set True to skip the server health check. 
        extracted_output_base_path: Base directory where connector writes extracted
            output. Used for metadata output validation when scenarios set expected_data.

    Hooks:
        setup_test_environment: Called before any tests run.
        cleanup_test_environment: Called after all tests complete.
        build_credentials: Transform auto-discovered credentials before use.
        before_scenario: Called before each scenario.
        after_scenario: Called after each scenario.
    """

    # Scenario definitions - subclasses should override
    scenarios: List[Scenario] = []

    # Server configuration (auto-discovered from env if not set)
    server_host: str = ""
    server_version: str = "v1"
    workflow_endpoint: str = "/start"
    timeout: int = 30

    # Default values merged with auto-discovered credentials
    default_credentials: Dict[str, Any] = {}
    default_metadata: Dict[str, Any] = {}
    default_connection: Dict[str, Any] = {}

    # Skip server health check (useful for debugging)
    skip_server_check: bool = False

    # Base path for extracted output (used by metadata output validation)
    extracted_output_base_path: str = ""

    # Base path for pandera YAML schemas (used by data validation)
    schema_base_path: str = ""

    # Internal state
    client: IntegrationTestClient
    _results: List[ScenarioResult]

    def __init_subclass__(cls, **kwargs):
        """Auto-generate individual test methods for each scenario.

        This runs when a subclass is defined, creating ``test_{scenario.name}``
        methods so pytest shows each scenario as a separate test.
        """
        super().__init_subclass__(**kwargs)

        # Only generate if the subclass defines its own scenarios
        # (inherited scenario lists were already expanded on the parent class).
        if "scenarios" in cls.__dict__ and cls.scenarios:
            _generate_individual_tests(cls)

    @classmethod
    def setup_class(cls) -> None:
        """Set up the test class before any tests run.

        This method:
        1. Auto-discovers server URL and credentials from env vars
        2. Checks if the server is running
        3. Initializes the API client
        4. Calls the setup_test_environment hook
        """
        # Auto-discover server URL if not explicitly set
        if not cls.server_host:
            cls.server_host = _auto_discover_server()
            logger.info(f"Auto-discovered server: {cls.server_host}")

        # Server health check — fail fast with an actionable message rather
        # than letting every scenario error out on connection failures.
        if not cls.skip_server_check:
            if not _check_server_health(cls.server_host):
                pytest.fail(
                    f"\n{'=' * 60}\n"
                    f"SERVER NOT RUNNING at {cls.server_host}\n"
                    f"{'=' * 60}\n"
                    f"Start the application server before running integration tests:\n"
                    f"  uv run python main.py\n"
                    f"{'=' * 60}"
                )

        # Validate that app-level credentials are discoverable (early warning)
        _auto_discover_credentials()

        # Initialize the client
        cls.client = IntegrationTestClient(
            host=cls.server_host,
            version=cls.server_version,
            workflow_endpoint=cls.workflow_endpoint,
            timeout=cls.timeout,
        )

        # Initialize results tracking
        cls._results = []

        # Call user-defined setup hook
        if hasattr(cls, "setup_test_environment"):
            logger.info("Running setup_test_environment hook")
            cls.setup_test_environment()

    @classmethod
    def teardown_class(cls) -> None:
        """Tear down the test class after all tests complete."""
        logger.info(f"Tearing down integration test class: {cls.__name__}")

        # Call user-defined cleanup hook
        if hasattr(cls, "cleanup_test_environment"):
            logger.info("Running cleanup_test_environment hook")
            cls.cleanup_test_environment()

        # Log summary
        if cls._results:
            passed = sum(1 for r in cls._results if r.success)
            failed = sum(1 for r in cls._results if not r.success)
            total = len(cls._results)
            logger.info(
                f"Integration test summary: {passed}/{total} passed"
                + (f", {failed} failed" if failed else "")
            )

        # Write machine-readable summary for CI.
        # Best-effort: a summary-write failure must not mask test results.
        try:
            cls._write_summary()
        except Exception as e:
            logger.warning(f"Failed to write test summary: {e}")

    @classmethod
    def _write_summary(cls) -> Optional[str]:
        """Write a machine-readable JSON summary of all test results.
+ + Writes to the path specified by INTEGRATION_TEST_SUMMARY_PATH env var, + defaulting to ./integration-test-summary.json. + + Returns: + The path the summary was written to, or None if no results. + """ + if not cls._results: + return None + + summary_path = os.getenv( + "INTEGRATION_TEST_SUMMARY_PATH", + "./integration-test-summary.json", + ) + + passed = sum(1 for r in cls._results if r.success) + failed = sum( + 1 + for r in cls._results + if not r.success + and r.error + and not isinstance(r.error, pytest.skip.Exception) + ) + skipped = sum( + 1 + for r in cls._results + if r.error and isinstance(r.error, pytest.skip.Exception) + ) + total = len(cls._results) + + scenarios_data = [] + for result in cls._results: + scenario_entry: Dict[str, Any] = { + "name": result.scenario.name, + "api": result.scenario.api, + "status": "passed" + if result.success + else ( + "skipped" + if result.error and isinstance(result.error, pytest.skip.Exception) + else "failed" + ), + "duration_ms": round(result.duration_ms, 2), + "description": result.scenario.description or "", + } + + if result.assertion_results: + scenario_entry["assertions"] = result.assertion_results + + if not result.success and result.error: + scenario_entry["error"] = str(result.error) + + if result.scenario.expected_data: + scenario_entry["metadata_validation"] = { + "expected_file": result.scenario.expected_data, + "strict": result.scenario.strict_comparison, + } + + scenarios_data.append(scenario_entry) + + summary = { + "app_name": os.getenv("ATLAN_APPLICATION_NAME", "unknown"), + "timestamp": datetime.now(timezone.utc).isoformat(), + "server_url": cls.server_host, + "test_class": cls.__name__, + "total": total, + "passed": passed, + "failed": failed, + "skipped": skipped, + "scenarios": scenarios_data, + } + + summary_dir = os.path.dirname(os.path.abspath(summary_path)) + if summary_dir: + os.makedirs(summary_dir, exist_ok=True) + with open(summary_path, "wb") as f: + f.write(orjson.dumps(summary, 
option=orjson.OPT_INDENT_2)) + + logger.info(f"Integration test summary written to {summary_path}") + return summary_path + + def _build_scenario_args(self, scenario: Scenario) -> Dict[str, Any]: + """Build the API args for a scenario. + + Priority order for credentials: + 1. scenario.args (full override, backward compat) - used as-is + 2. scenario.credentials (explicit dict override) + 3. cls.default_credentials (class-level defaults) + 4. E2E_{SCENARIO_NAME}_* env vars (scenario-specific) + 5. E2E_{APP_NAME}_* env vars (app-level defaults) + + Args: + scenario: The scenario to build args for. + + Returns: + Dict[str, Any]: The complete args dict for the API call. + """ + # If scenario has explicit args (backward compat), use those + if scenario.args is not None: + return evaluate_if_lazy(scenario.args) + + # Build credentials: env vars -> class defaults -> scenario overrides + if scenario.credentials is not None: + # Scenario provides explicit credentials - use as-is + credentials = scenario.credentials + else: + # Discover credentials with scenario-specific overrides + env_credentials = _auto_discover_credentials(scenario.name) + # Merge env vars + class defaults + credentials = {**env_credentials, **self.default_credentials} + # Apply build_credentials hook if defined + if hasattr(self, "build_credentials") and callable(self.build_credentials): + credentials = self.build_credentials(credentials) + + args = {"credentials": credentials} + + # Add metadata for preflight and workflow + if scenario.api.lower() in ("preflight", "workflow"): + if scenario.metadata is not None: + metadata = scenario.metadata + else: + metadata = {**self.default_metadata} + args["metadata"] = metadata + + # Add connection for workflow + if scenario.api.lower() == "workflow": + if scenario.connection is not None: + connection = scenario.connection + else: + connection = {**self.default_connection} + args["connection"] = connection + + # Config API — pass action, workflow_id, and optional 
payload + if scenario.api.lower() == "config": + workflow_id = scenario.config_workflow_id + # Support callable workflow_id (e.g., lambda reading shared state) + if callable(workflow_id): + workflow_id = workflow_id() + args["config_action"] = scenario.config_action + args["config_workflow_id"] = workflow_id + if scenario.config_payload is not None: + args["config_payload"] = scenario.config_payload + + return args + + def _execute_scenario(self, scenario: Scenario) -> ScenarioResult: + """Execute a single scenario and return the result. + + Args: + scenario: The scenario to execute. + + Returns: + ScenarioResult: The result of the scenario execution. + """ + logger.info(f"Executing scenario: {scenario.name}") + + # Check if scenario should be skipped + if scenario.skip: + logger.info(f"Skipping scenario: {scenario.name} - {scenario.skip_reason}") + pytest.skip(scenario.skip_reason or "Scenario marked as skip") + + start_time = time.time() + result = ScenarioResult(scenario=scenario, success=False) + + try: + # Call before_scenario hook if defined + if hasattr(self, "before_scenario"): + self.before_scenario(scenario) + + # Step 1: Build args (auto-fill from env if needed) + args = self._build_scenario_args(scenario) + logger.debug(f"Built args for {scenario.name}") + + # Step 2: Call the API + endpoint = scenario.endpoint or self.workflow_endpoint + response = self.client.call_api( + api=scenario.api, + args=args, + endpoint_override=endpoint if scenario.api == "workflow" else None, + ) + result.response = response + logger.debug(f"API response for {scenario.name}: {response}") + + # Step 3: Validate assertions with rich error messages + assertion_results = self._validate_assertions( + response, scenario.assert_that + ) + result.assertion_results = assertion_results + + # Check if all assertions passed + all_passed = all(r["passed"] for r in assertion_results.values()) + result.success = all_passed + + if not all_passed: + failed_details = [] + for path, detail in 
assertion_results.items(): + if not detail["passed"]: + desc_suffix = "" + if detail.get("description"): + desc_suffix = f"\n \u2192 {detail['description']}" + failed_details.append( + f" - {path}: expected {detail['expected']}, " + f"got {detail['actual']!r}{desc_suffix}" + ) + error_msg = ( + f"Assertions failed for scenario '{scenario.name}':\n" + + "\n".join(failed_details) + ) + logger.error(error_msg) + raise AssertionError(error_msg) + + # Step 4: Poll for workflow completion if any output validation is needed + needs_metadata = ( + scenario.expected_data and scenario.api_type == APIType.WORKFLOW + ) + schema_path = scenario.schema_base_path or self.schema_base_path + needs_pandera = bool(schema_path) and scenario.api_type == APIType.WORKFLOW + + if needs_metadata or needs_pandera: + self._ensure_workflow_completed(scenario, response) + + # Step 5: Validate metadata output if expected_data is set + if needs_metadata: + self._validate_workflow_output(scenario, response) + + # Step 6: Validate data with pandera if schema_base_path is set + if needs_pandera: + self._validate_pandera_schemas(scenario, response, schema_path) + + logger.info(f"Scenario {scenario.name} passed") + + except Exception as e: + result.error = e + result.success = False + if not isinstance(e, (AssertionError, pytest.skip.Exception)): + logger.error(f"Scenario {scenario.name} failed with error: {e}") + raise + + finally: + result.duration_ms = (time.time() - start_time) * 1000 + self._results.append(result) + + # Call after_scenario hook if defined + if hasattr(self, "after_scenario"): + self.after_scenario(scenario, result) + + return result + + def _validate_assertions( + self, + response: Dict[str, Any], + assertions: Dict[str, Any], + ) -> Dict[str, Dict[str, Any]]: + """Validate all assertions against the response. + + Returns rich results with actual/expected values for error messages. + + Args: + response: The API response dictionary. 
+ assertions: Dictionary mapping paths to predicates. + + Returns: + Dict[str, Dict]: Dict mapping paths to {passed, actual, expected}. + """ + results = {} + + for path, predicate in assertions.items(): + actual = self._get_nested_value(response, path) + expected_desc = getattr(predicate, "__doc__", str(predicate)) + custom_desc = getattr(predicate, "description", None) + + try: + passed = predicate(actual) + result_entry = { + "passed": passed, + "actual": actual, + "expected": expected_desc, + } + if custom_desc: + result_entry["description"] = custom_desc + results[path] = result_entry + if not passed: + logger.debug( + f"Assertion failed: {path} - " + f"expected {expected_desc}, got {actual!r}" + ) + except Exception as e: + logger.error(f"Assertion error for {path}: {e}") + result_entry = { + "passed": False, + "actual": actual, + "expected": expected_desc, + "error": str(e), + } + if custom_desc: + result_entry["description"] = custom_desc + results[path] = result_entry + + return results + + def _ensure_workflow_completed( + self, scenario: Scenario, response: Dict[str, Any] + ) -> None: + """Poll for workflow completion. Called before any output validation. + + Args: + scenario: The scenario being executed. + response: The workflow start API response containing workflow_id/run_id. + + Raises: + AssertionError: If workflow doesn't complete successfully. 
+ """ + data = response.get("data", {}) + workflow_id = data.get("workflow_id") + run_id = data.get("run_id") + + if not workflow_id or not run_id: + raise AssertionError( + f"Cannot validate workflow output for scenario '{scenario.name}': " + f"response missing workflow_id or run_id" + ) + + logger.info( + f"Waiting for workflow completion: {workflow_id}/{run_id} " + f"(timeout={scenario.workflow_timeout}s)" + ) + final_status = self._poll_workflow_completion( + workflow_id=workflow_id, + run_id=run_id, + timeout=scenario.workflow_timeout, + interval=scenario.polling_interval, + ) + + if final_status != "COMPLETED": + raise AssertionError( + f"Workflow did not complete successfully for scenario " + f"'{scenario.name}': status={final_status}" + ) + + def _validate_workflow_output( + self, scenario: Scenario, response: Dict[str, Any] + ) -> None: + """Validate workflow output against expected metadata baseline. + + Assumes workflow has already completed (called after _ensure_workflow_completed). + + Args: + scenario: The scenario with expected_data set. + response: The workflow start API response containing workflow_id/run_id. + + Raises: + AssertionError: If metadata validation fails. 
+ """ + data = response.get("data", {}) + workflow_id = data.get("workflow_id") + run_id = data.get("run_id") + + # Resolve extracted output base path + base_path = ( + scenario.extracted_output_base_path or self.extracted_output_base_path + ) + if not base_path: + raise AssertionError( + f"Cannot validate workflow output for scenario '{scenario.name}': " + f"extracted_output_base_path not set on scenario or test class" + ) + + # Load actual and expected data + logger.info(f"Loading actual output from {base_path}/{workflow_id}/{run_id}") + actual = load_actual_output( + base_path, + workflow_id, + run_id, + subdirectory=scenario.output_subdirectory, + ) + + logger.info(f"Loading expected data from {scenario.expected_data}") + expected = load_expected_data(scenario.expected_data) + + # Compare + gap_report = compare_metadata( + expected=expected, + actual=actual, + strict=scenario.strict_comparison, + ignored_fields=scenario.ignored_fields, + expected_file=scenario.expected_data, + ) + + if gap_report.has_gaps: + raise AssertionError( + f"Metadata validation failed for scenario '{scenario.name}' " + f"(baseline: {scenario.expected_data}):\n\n" + + gap_report.format_report() + ) + + logger.info( + f"Metadata validation passed for scenario '{scenario.name}': " + f"{len(actual)} assets match expected baseline" + ) + + def _validate_pandera_schemas( + self, + scenario: Scenario, + response: Dict[str, Any], + schema_base_path: str, + ) -> None: + """Validate extracted workflow output against pandera YAML schemas. + + This runs after the workflow completes and validates the actual + extracted data against pandera schemas for column types, value + ranges, and record counts. + + Args: + scenario: The scenario with schema_base_path set. + response: The workflow start API response containing workflow_id/run_id. + schema_base_path: Path to directory containing pandera YAML schemas. + + Raises: + AssertionError: If pandera validation fails. 
+ """ + data = response.get("data", {}) + workflow_id = data.get("workflow_id") + run_id = data.get("run_id") + + if not workflow_id or not run_id: + raise AssertionError( + f"Cannot validate pandera schemas for scenario '{scenario.name}': " + f"response missing workflow_id or run_id" + ) + + # Build the extracted output path + base_path = ( + scenario.extracted_output_base_path or self.extracted_output_base_path + ) + if not base_path: + raise AssertionError( + f"Cannot validate pandera schemas for scenario '{scenario.name}': " + f"extracted_output_base_path not set on scenario or test class" + ) + + extracted_output_path = f"{base_path}/{workflow_id}/{run_id}" + + logger.info( + f"Running pandera validation for scenario '{scenario.name}' " + f"using schemas from {schema_base_path}" + ) + + results = validate_with_pandera( + schema_base_path=schema_base_path, + extracted_output_path=extracted_output_path, + subdirectory=scenario.output_subdirectory, + ) + + # Check if any validations failed + failures = [r for r in results if not r["success"]] + if failures: + report = format_validation_report(results) + raise AssertionError( + f"Pandera validation failed for scenario '{scenario.name}':\n\n" + + report + ) + + total_records = sum(r["record_count"] for r in results) + logger.info( + f"Pandera validation passed for scenario '{scenario.name}': " + f"{len(results)} schemas, {total_records} total records validated" + ) + + def _poll_workflow_completion( + self, + workflow_id: str, + run_id: str, + timeout: int, + interval: int, + ) -> str: + """Poll the workflow status until completion or timeout. + + Args: + workflow_id: The workflow ID. + run_id: The run ID. + timeout: Maximum seconds to wait. + interval: Seconds between polls. + + Returns: + str: The final workflow status (e.g., "COMPLETED", "FAILED"). + + Raises: + TimeoutError: If the workflow does not complete within the timeout. 
+ """ + start_time = time.time() + + while True: + status_response = self.client.get_workflow_status(workflow_id, run_id) + + if not status_response.get("success", False): + logger.warning(f"Workflow status check failed: {status_response}") + # Continue polling — transient failures are possible + else: + current_status = status_response.get("data", {}).get("status", "") + logger.debug(f"Workflow status: {current_status}") + + if current_status != "RUNNING": + return current_status + + elapsed = time.time() - start_time + if elapsed > timeout: + raise TimeoutError( + f"Workflow {workflow_id}/{run_id} did not complete " + f"within {timeout}s (elapsed: {elapsed:.0f}s)" + ) + + time.sleep(interval) + + def _get_nested_value(self, data: Dict[str, Any], path: str) -> Any: + """Get a value from a nested dictionary using dot notation. + + Args: + data: The dictionary to search. + path: Dot-separated path (e.g., "data.workflow_id"). + + Returns: + Any: The value at the path, or None if not found. + """ + if not path: + return data + + parts = path.split(".") + current = data + + for part in parts: + if current is None: + return None + + if isinstance(current, dict): + current = current.get(part) + elif isinstance(current, list) and part.isdigit(): + index = int(part) + current = current[index] if 0 <= index < len(current) else None + else: + return None + + return current + + def _run_all_scenarios(self) -> None: + """Execute all scenarios sequentially (backward compatibility). + + Not prefixed with test_ so pytest does not discover it. + Only used when __init_subclass__ did not generate individual tests. + """ + if not self.scenarios: + pytest.skip("No scenarios defined") + + for scenario in self.scenarios: + self._execute_scenario(scenario) + + +def _generate_individual_tests(test_class: Type[BaseIntegrationTest]) -> None: + """Generate individual test methods for each scenario on the class. + + Each scenario becomes test_ so pytest shows them separately. 
+ + Args: + test_class: The test class to add methods to. + """ + for scenario in test_class.scenarios: + method_name = f"test_{scenario.name}" + + def make_test(s: Scenario): + def test_method(self): + self._execute_scenario(s) + + test_method.__doc__ = s.description or f"Test scenario: {s.name}" + return test_method + + setattr(test_class, method_name, make_test(scenario)) + + +# ============================================================================= +# Public API (backward compat) +# ============================================================================= + + +def generate_test_methods(test_class: Type[BaseIntegrationTest]) -> None: + """Generate individual test methods for each scenario. + + NOTE: This is now done automatically via __init_subclass__ when you + define scenarios on your test class. You don't need to call this manually. + + Args: + test_class: The test class to add methods to. + """ + _generate_individual_tests(test_class) + + +def parametrize_scenarios(scenarios: List[Scenario]): + """Create a pytest parametrize decorator for scenarios. + + Args: + scenarios: List of scenarios to parametrize. + + Returns: + A pytest.mark.parametrize decorator. + """ + return pytest.mark.parametrize( + "scenario", + scenarios, + ids=[s.name for s in scenarios], + ) diff --git a/application_sdk/test_utils/integration/validation.py b/application_sdk/test_utils/integration/validation.py new file mode 100644 index 000000000..3decc9c5a --- /dev/null +++ b/application_sdk/test_utils/integration/validation.py @@ -0,0 +1,289 @@ +"""Pandera-based data validation for integration testing. + +This module provides schema-based validation of extracted output files +using pandera YAML schemas. It validates that the actual data produced +by a workflow conforms to expected column types, value ranges, and +record counts. + +This is separate from `comparison.py` (which diffs metadata assets) — +pandera validates column-level schema on raw output files. 
+ +Usage: + Define pandera schemas as YAML files in a directory structure + that mirrors the extracted output structure: + + tests/integration/schema/ + Database/schema.yaml + Table/schema.yaml + Column/schema.yaml + + Then set `schema_base_path` on the scenario or test class: + + >>> Scenario( + ... name="workflow_with_validation", + ... api="workflow", + ... schema_base_path="tests/integration/schema", + ... assert_that={"success": equals(True)}, + ... ) +""" + +import os +from glob import glob +from typing import Any, Dict, List + +import orjson +import pandas as pd +import pandera.extensions as extensions +from pandera.io import from_yaml + +from application_sdk.observability.logger_adaptor import get_logger + +logger = get_logger(__name__) + + +# ============================================================================= +# Custom Pandera Check Methods +# ============================================================================= + + +@extensions.register_check_method(statistics=["expected_record_count"]) +def check_record_count_ge(df: pd.DataFrame, *, expected_record_count: int) -> bool: + """Validate that a DataFrame has at least the expected number of records. + + This is registered as a custom pandera check method that can be used + in YAML schema files: + + checks: + check_record_count_ge: + expected_record_count: 10 + + Args: + df: The DataFrame to validate. + expected_record_count: Minimum expected row count. + + Returns: + bool: True if the record count is sufficient. + + Raises: + ValueError: If the record count is below the expected minimum. 
+ """ + if df.shape[0] >= expected_record_count: + return True + raise ValueError( + f"Expected record count >= {expected_record_count}, got: {df.shape[0]}" + ) + + +# ============================================================================= +# Data Loading +# ============================================================================= + + +def get_normalised_dataframe(extracted_file_path: str) -> pd.DataFrame: + """Read extracted output files and normalize into a DataFrame. + + Supports JSON (line-delimited) and Parquet files. All files in the + directory tree are merged into a single DataFrame. + + Args: + extracted_file_path: Directory containing extracted output files. + + Returns: + pd.DataFrame: Normalized DataFrame with all records. + + Raises: + FileNotFoundError: If no data files are found. + """ + data: List[Dict[str, Any]] = [] + + # Search for JSON and parquet files + json_files = glob(f"{extracted_file_path}/**/*.json", recursive=True) + parquet_files = glob(f"{extracted_file_path}/**/*.parquet", recursive=True) + files_list = json_files or parquet_files + + for f_name in files_list or []: + if f_name.endswith(".parquet"): + df = pd.read_parquet(f_name) + data.extend(df.to_dict(orient="records")) + elif f_name.endswith(".json"): + with open(f_name, "rb") as f: + data.extend([orjson.loads(line) for line in f]) + + if not data: + raise FileNotFoundError( + f"No data found in extracted directory: {extracted_file_path}" + ) + + return pd.json_normalize(data) + + +def get_schema_file_paths(schema_base_path: str) -> List[str]: + """Find all pandera YAML schema files in the given directory. + + Recursively searches for .yaml and .yml files. + + Args: + schema_base_path: Root directory containing schema files. + + Returns: + List[str]: Sorted list of schema file paths. + + Raises: + FileNotFoundError: If no schema files are found. 
+ """ + search_pattern = f"{schema_base_path}/**/*" + yaml_files = glob(f"{search_pattern}.yaml", recursive=True) + glob( + f"{search_pattern}.yml", recursive=True + ) + + if not yaml_files: + raise FileNotFoundError(f"No pandera schema files found in: {schema_base_path}") + + return sorted(yaml_files) + + +# ============================================================================= +# Validation +# ============================================================================= + + +def validate_with_pandera( + schema_base_path: str, + extracted_output_path: str, + subdirectory: str = "transformed", +) -> List[Dict[str, Any]]: + """Validate extracted output against pandera YAML schemas. + + For each schema file found under `schema_base_path`, the validator: + 1. Derives the corresponding extracted output subdirectory + 2. Loads and normalizes the extracted data into a DataFrame + 3. Validates the DataFrame against the pandera schema + + The schema file path relative to `schema_base_path` determines which + subdirectory of extracted output to validate. For example: + + Schema: tests/integration/schema/Database/schema.yaml + Output: {extracted_output_path}/{subdirectory}/Database/ + + Args: + schema_base_path: Root directory containing pandera YAML schemas. + extracted_output_path: Root directory of extracted workflow output + (typically: base_path/workflow_id/run_id). + subdirectory: Subdirectory within output path to search. + Defaults to "transformed". + + Returns: + List of validation result dicts, each containing: + - schema_file: Path to the schema file + - entity: The entity type (e.g., "Database", "Table") + - success: Whether validation passed + - error: Error message if validation failed (None if passed) + - record_count: Number of records validated + + Raises: + FileNotFoundError: If schema_base_path doesn't exist or has no schemas. 
+ """ + if not os.path.exists(schema_base_path): + raise FileNotFoundError(f"Schema base path not found: {schema_base_path}") + + schema_files = get_schema_file_paths(schema_base_path) + results: List[Dict[str, Any]] = [] + + output_base = ( + os.path.join(extracted_output_path, subdirectory) + if subdirectory + else extracted_output_path + ) + + for schema_file in schema_files: + # Derive the entity type from the schema file path + # e.g., "tests/schema/Database/schema.yaml" -> "Database" + relative_path = schema_file.replace(schema_base_path, "") + entity = ( + relative_path.replace(".yaml", "") + .replace(".yml", "") + .strip(os.sep) + .split(os.sep)[0] + if os.sep in relative_path.strip(os.sep) + else os.path.splitext(os.path.basename(schema_file))[0] + ) + + # Derive the extracted data path + # Schema path relative to base determines output subdirectory + extracted_path_suffix = ( + relative_path.replace(".yaml", "").replace(".yml", "").strip(os.sep) + ) + # Remove the schema filename part (e.g., "Database/schema" -> "Database") + if os.sep in extracted_path_suffix: + extracted_path_suffix = os.path.dirname(extracted_path_suffix) + + extracted_file_path = os.path.join(output_base, extracted_path_suffix) + + result: Dict[str, Any] = { + "schema_file": schema_file, + "entity": entity, + "success": False, + "error": None, + "record_count": 0, + } + + try: + logger.info(f"Validating {entity} data against {schema_file}") + + # Load pandera schema from YAML + schema = from_yaml(schema_file) + + # Load and normalize extracted data + dataframe = get_normalised_dataframe(extracted_file_path) + result["record_count"] = len(dataframe) + + # Validate with lazy error reporting + schema.validate(dataframe, lazy=True) + + result["success"] = True + logger.info( + f"Validation passed for {entity}: " + f"{result['record_count']} records validated" + ) + + except FileNotFoundError as e: + result["error"] = str(e) + logger.warning(f"Skipping {entity} validation: {e}") + except 
Exception as e: + result["error"] = str(e) + logger.error(f"Validation failed for {entity}: {e}") + + results.append(result) + + return results + + +def format_validation_report(results: List[Dict[str, Any]]) -> str: + """Format pandera validation results into a human-readable report. + + Args: + results: List of validation result dicts from validate_with_pandera. + + Returns: + str: Formatted report string. + """ + lines = ["Pandera Data Validation Report:", ""] + passed = sum(1 for r in results if r["success"]) + failed = sum(1 for r in results if not r["success"]) + total = len(results) + + lines.append(f"Summary: {passed}/{total} passed, {failed} failed") + lines.append("") + + for result in results: + status = "PASS" if result["success"] else "FAIL" + line = f" [{status}] {result['entity']} ({result['record_count']} records)" + if result["error"]: + # Truncate long error messages + error_preview = result["error"][:200] + if len(result["error"]) > 200: + error_preview += "..." + line += f"\n Error: {error_preview}" + lines.append(line) + + return "\n".join(lines) diff --git a/docs/guides/integration-testing.md b/docs/guides/integration-testing.md new file mode 100644 index 000000000..716e549b3 --- /dev/null +++ b/docs/guides/integration-testing.md @@ -0,0 +1,547 @@ +# Integration Testing Guide + +This guide explains how to write integration tests for your connector using the Apps-SDK integration testing framework. + +## Overview + +The integration testing framework provides a **declarative, data-driven** approach to testing. Instead of writing procedural test code, you define **scenarios** that specify: + +- What API to test +- What inputs to provide +- What outputs to expect + +The framework handles the rest: calling APIs, validating assertions, and reporting results. + +## Why Use This Framework? 
+ +### For External Developers + +- **Easy to Use**: Define scenarios as data, not code +- **Minimal Python Knowledge**: Just fill in the template +- **Comprehensive Coverage**: Test auth, preflight, and workflow APIs +- **Consistent Quality**: Same test structure across all connectors + +### For TDD (Test-Driven Development) + +- **Scenarios = Specification**: Write what should happen before implementing +- **Fast Feedback**: Run tests frequently during development +- **Regression Prevention**: Ensure changes don't break existing functionality + +## Quick Start + +### Step 1: Copy the Example + +```bash +cp -r tests/integration/_example tests/integration/my_connector +``` + +### Step 2: Define Your Scenarios + +Edit `scenarios.py`: + +```python +from application_sdk.test_utils.integration import ( + Scenario, lazy, equals, exists +) + +def load_credentials(): + return { + "host": os.getenv("MY_DB_HOST"), + "username": os.getenv("MY_DB_USER"), + "password": os.getenv("MY_DB_PASSWORD"), + } + +scenarios = [ + Scenario( + name="auth_valid", + api="auth", + args=lazy(lambda: {"credentials": load_credentials()}), + assert_that={"success": equals(True)} + ), +] +``` + +### Step 3: Create Test Class + +Edit `test_integration.py`: + +```python +from application_sdk.test_utils.integration import BaseIntegrationTest +from .scenarios import scenarios + +class MyConnectorTest(BaseIntegrationTest): + scenarios = scenarios + server_host = "http://localhost:8000" +``` + +### Step 4: Run Tests + +```bash +export MY_DB_HOST=localhost +export MY_DB_USER=test +export MY_DB_PASSWORD=secret +export APP_SERVER_URL=http://localhost:8000 + +pytest tests/integration/my_connector/ -v +``` + +## Core Concepts + +### Scenarios + +A **Scenario** defines a single test case: + +```python +Scenario( + name="auth_valid_credentials", # Unique identifier + api="auth", # API to test + args={"credentials": {...}}, # Input arguments + assert_that={"success": equals(True)} # Expected outcomes +) +``` + 
+### Supported APIs + +| API | Endpoint | Purpose | +|-----|----------|---------| +| `auth` | `/workflows/v1/auth` | Test authentication | +| `preflight` | `/workflows/v1/check` | Validate configuration | +| `workflow` | `/workflows/v1/{endpoint}` | Start workflow | + +### Lazy Evaluation + +Use `lazy()` to defer computation until test execution: + +```python +# BAD: Loads at import time (fails if env vars missing) +args={"credentials": load_credentials()} + +# GOOD: Loads when test runs +args=lazy(lambda: {"credentials": load_credentials()}) +``` + +Benefits: +- Tests can be defined in one environment, run in another +- Credentials loaded only when needed +- Values cached after first evaluation + +### Assertion DSL + +The assertion DSL provides **higher-order functions** that return predicates: + +```python +from application_sdk.test_utils.integration import ( + equals, exists, one_of, contains, greater_than +) + +assert_that = { + "success": equals(True), + "data.workflow_id": exists(), + "data.status": one_of(["RUNNING", "COMPLETED"]), + "message": contains("successful"), + "data.count": greater_than(0), +} +``` + +## Assertion Reference + +### Basic Assertions + +| Function | Description | Example | +|----------|-------------|---------| +| `equals(value)` | Exact equality | `equals(True)` | +| `not_equals(value)` | Not equal | `not_equals(None)` | +| `exists()` | Not None | `exists()` | +| `is_none()` | Is None | `is_none()` | +| `is_true()` | Truthy value | `is_true()` | +| `is_false()` | Falsy value | `is_false()` | + +### Collection Assertions + +| Function | Description | Example | +|----------|-------------|---------| +| `one_of(list)` | Value in list | `one_of(["a", "b"])` | +| `not_one_of(list)` | Value not in list | `not_one_of(["error"])` | +| `contains(item)` | Contains item | `contains("success")` | +| `not_contains(item)` | Doesn't contain | `not_contains("error")` | +| `has_length(n)` | Length equals n | `has_length(3)` | +| `is_empty()` | Empty 
collection | `is_empty()` | +| `is_not_empty()` | Non-empty | `is_not_empty()` | + +### Numeric Assertions + +| Function | Description | Example | +|----------|-------------|---------| +| `greater_than(n)` | Greater than | `greater_than(0)` | +| `greater_than_or_equal(n)` | >= | `greater_than_or_equal(1)` | +| `less_than(n)` | Less than | `less_than(100)` | +| `less_than_or_equal(n)` | <= | `less_than_or_equal(10)` | +| `between(min, max)` | In range | `between(1, 10)` | + +### String Assertions + +| Function | Description | Example | +|----------|-------------|---------| +| `matches(pattern)` | Regex match | `matches(r"^[a-z]+$")` | +| `starts_with(prefix)` | Starts with | `starts_with("http")` | +| `ends_with(suffix)` | Ends with | `ends_with(".json")` | + +### Type Assertions + +| Function | Description | Example | +|----------|-------------|---------| +| `is_type(type)` | Instance check | `is_type(str)` | +| `is_dict()` | Is dictionary | `is_dict()` | +| `is_list()` | Is list | `is_list()` | +| `is_string()` | Is string | `is_string()` | + +### Combinators + +Combine multiple assertions: + +```python +from application_sdk.test_utils.integration import all_of, any_of, none_of + +# All must pass +"data.name": all_of(exists(), is_string(), is_not_empty()) + +# At least one must pass +"data.role": any_of(equals("admin"), equals("superuser")) + +# None should pass +"message": none_of(contains("error"), contains("fail")) +``` + +### Custom Assertions + +Create your own: + +```python +from application_sdk.test_utils.integration import custom + +# Using custom() +"data.count": custom(lambda x: x % 2 == 0, "is_even") + +# Or directly as a lambda +"data.value": lambda x: x > 0 and x < 100 +``` + +## Writing Effective Scenarios + +### Auth Scenarios + +Test different authentication methods and edge cases: + +```python +auth_scenarios = [ + # Valid credentials + Scenario( + name="auth_valid", + api="auth", + args=lazy(lambda: {"credentials": load_credentials()}), + 
assert_that={"success": equals(True)} + ), + + # Invalid password + Scenario( + name="auth_invalid_password", + api="auth", + args=lazy(lambda: { + "credentials": {**load_credentials(), "password": "wrong"} + }), + assert_that={"success": equals(False)} + ), + + # Empty credentials + Scenario( + name="auth_empty", + api="auth", + args={"credentials": {}}, + assert_that={"success": equals(False)} + ), +] +``` + +### Preflight Scenarios + +Test configuration validation: + +```python +preflight_scenarios = [ + # Valid configuration + Scenario( + name="preflight_valid", + api="preflight", + args=lazy(lambda: { + "credentials": load_credentials(), + "metadata": {"databases": ["TEST_DB"]} + }), + assert_that={"success": equals(True)} + ), + + # Non-existent database + Scenario( + name="preflight_bad_database", + api="preflight", + args=lazy(lambda: { + "credentials": load_credentials(), + "metadata": {"databases": ["NONEXISTENT"]} + }), + assert_that={"success": equals(False)} + ), +] +``` + +### Workflow Scenarios + +Test workflow execution: + +```python +workflow_scenarios = [ + # Successful workflow + Scenario( + name="workflow_success", + api="workflow", + args=lazy(lambda: { + "credentials": load_credentials(), + "metadata": {"databases": ["TEST_DB"]}, + "connection": {"name": "test_conn"} + }), + assert_that={ + "success": equals(True), + "data.workflow_id": exists(), + "data.run_id": exists(), + } + ), +] +``` + +## Test Class Configuration + +### Basic Configuration + +```python +class MyConnectorTest(BaseIntegrationTest): + scenarios = scenarios + server_host = "http://localhost:8000" + server_version = "v1" + workflow_endpoint = "/start" + timeout = 30 +``` + +### Dynamic Workflow Endpoint + +If your workflow endpoint is different from `/start`: + +```python +class MyConnectorTest(BaseIntegrationTest): + scenarios = scenarios + workflow_endpoint = "/extract" # Custom endpoint +``` + +Or per-scenario: + +```python +Scenario( + name="workflow_custom_endpoint", + 
api="workflow", + endpoint="/custom/start", # Override for this scenario + args={...}, + assert_that={...} +) +``` + +### Setup and Teardown Hooks + +```python +class MyConnectorTest(BaseIntegrationTest): + scenarios = scenarios + + @classmethod + def setup_test_environment(cls): + """Called before any tests run.""" + # Create test database, schema, etc. + cls.db = create_database_connection() + cls.db.execute("CREATE SCHEMA test_schema") + + @classmethod + def cleanup_test_environment(cls): + """Called after all tests complete.""" + # Drop test database, clean up + cls.db.execute("DROP SCHEMA test_schema CASCADE") + cls.db.close() + + def before_scenario(self, scenario): + """Called before each scenario.""" + print(f"Running: {scenario.name}") + + def after_scenario(self, scenario, result): + """Called after each scenario.""" + status = "PASSED" if result.success else "FAILED" + print(f"{scenario.name}: {status}") +``` + +## Running Tests + +### Basic Execution + +```bash +# All integration tests +pytest tests/integration/ -v + +# Specific connector +pytest tests/integration/my_connector/ -v + +# Single scenario +pytest tests/integration/my_connector/ -v -k "auth_valid" +``` + +### With Logging + +```bash +# INFO level +pytest tests/integration/ -v --log-cli-level=INFO + +# DEBUG level (shows API responses) +pytest tests/integration/ -v --log-cli-level=DEBUG +``` + +### Skip Slow Tests + +Mark scenarios to skip: + +```python +Scenario( + name="workflow_large_extraction", + api="workflow", + args={...}, + assert_that={...}, + skip=True, + skip_reason="Takes too long for CI" +) +``` + +## Best Practices + +### 1. Use Lazy Evaluation for Credentials + +```python +# Always use lazy() for credentials +args=lazy(lambda: {"credentials": load_credentials()}) +``` + +### 2. 
Test Negative Cases + +Don't just test the happy path: + +```python +scenarios = [ + # Happy path + Scenario(name="auth_valid", ...), + + # Negative cases + Scenario(name="auth_invalid_password", ...), + Scenario(name="auth_empty_credentials", ...), + Scenario(name="auth_missing_username", ...), +] +``` + +### 3. Use Descriptive Names + +```python +# Good names +"auth_invalid_password" +"preflight_missing_permissions" +"workflow_large_dataset" + +# Bad names +"test_1" +"scenario_a" +``` + +### 4. Document Complex Scenarios + +```python +Scenario( + name="preflight_partial_permissions", + description="Test when user has read but not write permissions", + api="preflight", + args={...}, + assert_that={...} +) +``` + +### 5. Clean Up Test Data + +Use hooks to manage test data: + +```python +@classmethod +def setup_test_environment(cls): + cls.test_data = create_test_data() + +@classmethod +def cleanup_test_environment(cls): + delete_test_data(cls.test_data) +``` + +## Troubleshooting + +### "Server not available" + +Check server is running: +```bash +curl http://localhost:8000/server/health +``` + +### "Credentials not loading" + +Verify environment variables: +```bash +env | grep MY_DB_ +``` + +### "Assertion failed" + +Run with debug logging: +```bash +pytest -v --log-cli-level=DEBUG +``` + +### "Timeout" + +Increase timeout: +```python +class MyTest(BaseIntegrationTest): + timeout = 60 # Increase from default 30 +``` + +## Example Directory Structure + +``` +tests/integration/ +├── __init__.py +├── conftest.py # Shared fixtures +├── README.md +├── _example/ # Reference example +│ ├── __init__.py +│ ├── conftest.py +│ ├── scenarios.py +│ ├── test_integration.py +│ └── README.md +└── my_connector/ # Your connector tests + ├── __init__.py + ├── conftest.py + ├── scenarios.py + └── test_integration.py +``` + +## Summary + +1. **Copy the example**: Start from `tests/integration/_example/` +2. **Define scenarios**: Edit `scenarios.py` with your test cases +3. 
**Create test class**: Inherit from `BaseIntegrationTest` +4. **Set environment variables**: Configure credentials +5. **Run tests**: `pytest tests/integration/my_connector/ -v` + +The framework handles the complexity of API calls, response validation, and reporting. You focus on defining what to test and what to expect. diff --git a/test-rules.md b/test-rules.md new file mode 100644 index 000000000..e8f6ae786 --- /dev/null +++ b/test-rules.md @@ -0,0 +1,41 @@ +Write Unit Test for each Activity + +No side-effects allowed (mock these behaviours) + +No API calls + +No File Read/Writes + +No DB Read/Writes + +Cover the following: + +Null values + +Empty values - str, dict, list, etc + +Expected Exceptions raised on negative behaviours + +Resides in the App's codebase and run in CI/CD in App's releases + + +# Writing Integration Tests + + +Write Integration Tests for the entire App + +Use real Source systems and Local Object Store and Secret Store + +Cover the following: + +Connector's Authentication Types + +Connector's Extraction Types + +Preflight Checks API + +Filter Metadata API + +Special Configs not exposed via UI + +Resides in the App's codebase and run in CI/CD in App's releases \ No newline at end of file diff --git a/tests/integration/README.md b/tests/integration/README.md new file mode 100644 index 000000000..711868668 --- /dev/null +++ b/tests/integration/README.md @@ -0,0 +1,254 @@ +# Integration Tests + +This directory contains integration tests for Apps-SDK connectors. + +## Quick Start + +### 1. Copy the Example + +```bash +cp -r tests/integration/_example tests/integration/my_connector +``` + +### 2. 
Configure Your Scenarios

Edit `tests/integration/my_connector/scenarios.py`:

```python
from application_sdk.test_utils.integration import Scenario, lazy, equals, exists

def load_my_credentials():
    return {
        "host": os.getenv("MY_DB_HOST"),
        "username": os.getenv("MY_DB_USER"),
        "password": os.getenv("MY_DB_PASSWORD"),
    }

scenarios = [
    Scenario(
        name="auth_valid",
        api="auth",
        args=lazy(lambda: {"credentials": load_my_credentials()}),
        assert_that={"success": equals(True)}
    ),
    # Add more scenarios...
]
```

### 3. Set Environment Variables

```bash
export ATLAN_APPLICATION_NAME=postgres
export E2E_POSTGRES_HOST=localhost
export E2E_POSTGRES_USERNAME=admin
export E2E_POSTGRES_PASSWORD=secret
export E2E_POSTGRES_PORT=5432
```

#### Per-Scenario Credentials

Scenarios that need different credentials (e.g. a restricted user for permission
tests) can use scenario-specific env vars. The framework checks for these first,
then falls back to the app-level defaults.

**Naming convention:**

| Scope | Pattern | Example |
|---|---|---|
| Scenario-specific (checked first) | `E2E_{SCENARIO_NAME}_{FIELD}` | `E2E_PREFLIGHT_MISSING_PERMISSIONS_USERNAME` |
| App-level default (fallback) | `E2E_{APP_NAME}_{FIELD}` | `E2E_POSTGRES_USERNAME` |

Scenario names are matched in uppercase with underscores (matching the `name`
field on the `Scenario` object).

```bash
# App-level defaults (used by most scenarios)
export E2E_POSTGRES_USERNAME=admin
export E2E_POSTGRES_PASSWORD=secret

# Scenario-specific overrides (only for preflight_missing_permissions)
export E2E_PREFLIGHT_MISSING_PERMISSIONS_USERNAME=restricted_user
export E2E_PREFLIGHT_MISSING_PERMISSIONS_PASSWORD=restricted_pass
```

```python
Scenario(
    name="preflight_missing_permissions",
    api="preflight",
    # No credentials= override needed — framework auto-resolves:
    # 1. Looks for E2E_PREFLIGHT_MISSING_PERMISSIONS_* env vars
    # 2. 
Falls back to E2E_POSTGRES_* defaults + metadata={"databases": ["mydb"]}, + assert_that={"success": equals(False)}, +) +``` + +**Priority order for credential resolution:** + +1. `scenario.credentials` — explicit dict on the Scenario (highest) +2. `cls.default_credentials` — class-level defaults on the test class +3. `E2E_{SCENARIO_NAME}_*` env vars — scenario-specific overrides +4. `E2E_{APP_NAME}_*` env vars — app-level defaults (lowest) + +### 4. Run Tests + +```bash +# Run all integration tests +pytest tests/integration/ -v + +# Run specific connector tests +pytest tests/integration/my_connector/ -v + +# Run with verbose output +pytest tests/integration/my_connector/ -v --log-cli-level=INFO +``` + +## Directory Structure + +``` +tests/integration/ +├── __init__.py +├── conftest.py # Shared fixtures +├── README.md # This file +└── _example/ # Reference example (copy this) + ├── README.md + ├── scenarios.py # Scenario definitions + ├── test_integration.py + └── conftest.py +``` + +## Writing Scenarios + +A scenario defines: +- **name**: Unique identifier +- **api**: Which API to test ("auth", "preflight", "workflow") +- **args**: Input arguments (can be lazy-evaluated) +- **assert_that**: Expected outcomes using assertion DSL + +### Example Scenarios + +```python +from application_sdk.test_utils.integration import ( + Scenario, lazy, equals, exists, one_of, is_not_empty +) + +scenarios = [ + # Valid authentication + Scenario( + name="auth_valid_credentials", + api="auth", + args=lazy(lambda: {"credentials": load_credentials()}), + assert_that={ + "success": equals(True), + "message": equals("Authentication successful"), + } + ), + + # Invalid authentication + Scenario( + name="auth_invalid_credentials", + api="auth", + args={"credentials": {"username": "wrong", "password": "wrong"}}, + assert_that={ + "success": equals(False), + } + ), + + # Preflight check + Scenario( + name="preflight_valid", + api="preflight", + args=lazy(lambda: { + "credentials": 
load_credentials(), + "metadata": {"databases": ["TEST_DB"]} + }), + assert_that={ + "success": equals(True), + } + ), + + # Workflow execution + Scenario( + name="workflow_full_extraction", + api="workflow", + args=lazy(lambda: { + "credentials": load_credentials(), + "metadata": {"databases": ["TEST_DB"]}, + "connection": {"name": "test_conn"} + }), + assert_that={ + "success": equals(True), + "data.workflow_id": exists(), + } + ), +] +``` + +## Assertion DSL Reference + +### Basic Assertions +- `equals(value)` - Exact equality +- `not_equals(value)` - Not equal +- `exists()` - Not None +- `is_none()` - Is None +- `is_true()` - Truthy +- `is_false()` - Falsy + +### Collection Assertions +- `one_of([...])` - Value in list +- `contains(item)` - Contains item +- `has_length(n)` - Length equals n +- `is_empty()` - Empty collection +- `is_not_empty()` - Non-empty + +### String Assertions +- `matches(regex)` - Regex match +- `starts_with(prefix)` - Starts with +- `ends_with(suffix)` - Ends with + +### Numeric Assertions +- `greater_than(n)` - Greater than +- `less_than(n)` - Less than +- `between(min, max)` - In range + +### Combinators +- `all_of(p1, p2, ...)` - All pass +- `any_of(p1, p2, ...)` - Any passes +- `none_of(p1, p2, ...)` - None pass + +### Custom +- `custom(fn)` - User function + +## Lazy Evaluation + +Use `lazy()` to defer credential loading until test execution: + +```python +# BAD: Loads immediately at import time +args={"credentials": load_credentials()} # Fails if env vars not set + +# GOOD: Loads when test runs +args=lazy(lambda: {"credentials": load_credentials()}) # Deferred +``` + +## Troubleshooting + +### Tests not finding server +- Ensure server is running: `curl http://localhost:8000/server/health` +- Check APP_SERVER_URL environment variable + +### Credentials not loading +- Verify environment variables are set +- Use `lazy()` for deferred loading + +### Assertions failing +- Check response structure with `--log-cli-level=DEBUG` +- Verify 
paths in assert_that match response keys + +## Best Practices + +1. **Use lazy() for credentials** - Don't load at import time +2. **Test negative cases** - Invalid credentials, missing data +3. **Keep scenarios focused** - One thing per scenario +4. **Use descriptive names** - `auth_invalid_password` not `test_1` +5. **Clean up test data** - Use setup/teardown hooks diff --git a/tests/integration/_example/README.md b/tests/integration/_example/README.md new file mode 100644 index 000000000..8e7bee91b --- /dev/null +++ b/tests/integration/_example/README.md @@ -0,0 +1,286 @@ +# Example Integration Test + +This is a complete, working example of integration tests using the Apps-SDK +integration testing framework. Copy this directory as a starting point for +your connector's integration tests. + +## Quick Start + +### 1. Set Environment Variables + +```bash +# Server configuration +export APP_SERVER_URL=http://localhost:8000 + +# Database credentials (customize for your connector) +export EXAMPLE_DB_HOST=localhost +export EXAMPLE_DB_PORT=5432 +export EXAMPLE_DB_USER=test_user +export EXAMPLE_DB_PASSWORD=test_password +export EXAMPLE_DB_NAME=test_db +``` + +### 2. Start Your Application Server + +Make sure your connector application is running: + +```bash +# Example: Start your connector +python examples/application_sql.py +``` + +### 3. 
Run Tests + +```bash +# Run all example tests +pytest tests/integration/_example/ -v + +# Run with logging +pytest tests/integration/_example/ -v --log-cli-level=INFO + +# Run specific scenario type +pytest tests/integration/_example/ -v -k "auth" + +# Run single scenario +pytest tests/integration/_example/ -v -k "auth_valid_credentials" +``` + +## Files in This Directory + +``` +_example/ +├── README.md # This file +├── __init__.py # Package marker +├── conftest.py # Example-specific fixtures +├── scenarios.py # Scenario definitions +└── test_integration.py # Test class +``` + +### scenarios.py + +Defines all test scenarios using the declarative format: + +```python +from application_sdk.test_utils.integration import Scenario, lazy, equals + +scenarios = [ + Scenario( + name="auth_valid_credentials", + api="auth", + args=lazy(lambda: {"credentials": load_credentials()}), + assert_that={"success": equals(True)} + ), + # ... more scenarios +] +``` + +### test_integration.py + +The test class that runs scenarios: + +```python +from application_sdk.test_utils.integration import BaseIntegrationTest +from .scenarios import scenarios + +class ExampleIntegrationTest(BaseIntegrationTest): + scenarios = scenarios + server_host = "http://localhost:8000" +``` + +### conftest.py + +Pytest fixtures specific to this example: + +- `example_credentials` - Load credentials from env +- `skip_if_no_server` - Skip if server unavailable +- `skip_if_no_database` - Skip if database unavailable + +## Customizing for Your Connector + +### Step 1: Copy the Directory + +```bash +cp -r tests/integration/_example tests/integration/my_connector +``` + +### Step 2: Update Credential Loading + +Edit `scenarios.py` to load your connector's credentials: + +```python +def load_credentials(): + return { + # Your credential fields + "account": os.getenv("SNOWFLAKE_ACCOUNT"), + "username": os.getenv("SNOWFLAKE_USER"), + "password": os.getenv("SNOWFLAKE_PASSWORD"), + "warehouse": 
os.getenv("SNOWFLAKE_WAREHOUSE"), + } +``` + +### Step 3: Define Your Scenarios + +Add scenarios for your connector's specific behavior: + +```python +scenarios = [ + # Auth with OAuth + Scenario( + name="auth_oauth", + api="auth", + args=lazy(lambda: { + "credentials": { + "client_id": os.getenv("CLIENT_ID"), + "client_secret": os.getenv("CLIENT_SECRET"), + } + }), + assert_that={"success": equals(True)} + ), + + # Preflight with specific config + Scenario( + name="preflight_with_warehouse", + api="preflight", + args=lazy(lambda: { + "credentials": load_credentials(), + "metadata": { + "warehouse": "COMPUTE_WH", + "databases": ["TEST_DB"], + } + }), + assert_that={"success": equals(True)} + ), +] +``` + +### Step 4: Update Test Class + +Edit `test_integration.py`: + +```python +class MyConnectorIntegrationTest(BaseIntegrationTest): + scenarios = scenarios + server_host = os.getenv("APP_SERVER_URL", "http://localhost:8000") + workflow_endpoint = "/extract" # If different from /start + + @classmethod + def setup_test_environment(cls): + # Create test data + pass + + @classmethod + def cleanup_test_environment(cls): + # Clean up test data + pass +``` + +### Step 5: Update Fixtures + +Edit `conftest.py` with your connector-specific fixtures. + +## Scenario Categories + +### Authentication Scenarios + +Test different authentication methods and edge cases: + +- Valid credentials +- Invalid credentials +- Missing fields +- Expired tokens (if applicable) +- Different auth methods (OAuth, API key, etc.) 
+ +### Preflight Scenarios + +Test configuration validation: + +- Valid configuration +- Invalid credentials +- Missing permissions +- Non-existent resources +- Edge case configurations + +### Workflow Scenarios + +Test workflow execution: + +- Successful extraction +- Partial failures +- Invalid configurations +- Timeout handling + +## Assertion Examples + +```python +from application_sdk.test_utils.integration import ( + equals, exists, one_of, contains, is_not_empty, + greater_than, matches, all_of, any_of +) + +assert_that = { + # Basic equality + "success": equals(True), + + # Check existence + "data.workflow_id": exists(), + + # Check in list + "data.status": one_of(["RUNNING", "COMPLETED"]), + + # String contains + "message": contains("successful"), + + # Numeric comparison + "data.count": greater_than(0), + + # Regex match + "data.id": matches(r"^[a-f0-9-]+$"), + + # Combined assertions + "data.name": all_of(exists(), is_not_empty()), +} +``` + +## Troubleshooting + +### "Server not available" + +Ensure your application server is running: + +```bash +curl http://localhost:8000/server/health +``` + +### "Credentials not loading" + +Check environment variables are set: + +```bash +env | grep EXAMPLE_ +``` + +### "Assertion failed" + +Run with debug logging: + +```bash +pytest tests/integration/_example/ -v --log-cli-level=DEBUG +``` + +### "Timeout" + +Increase the timeout in test class: + +```python +class MyTest(BaseIntegrationTest): + timeout = 60 # Increase from default 30 +``` + +## Best Practices + +1. **Use lazy() for credentials** - Prevents failures at import time +2. **Test negative cases** - Invalid inputs, missing permissions +3. **Clean up test data** - Use setup/cleanup hooks +4. **Use descriptive names** - `auth_expired_token` not `test_3` +5. **Document scenarios** - Use the description field +6. 
**Skip unavailable tests** - Use skip/skip_reason fields diff --git a/tests/integration/_example/__init__.py b/tests/integration/_example/__init__.py new file mode 100644 index 000000000..6bf35f210 --- /dev/null +++ b/tests/integration/_example/__init__.py @@ -0,0 +1,4 @@ +"""Example integration test for reference. + +Copy this directory as a starting point for your connector's integration tests. +""" diff --git a/tests/integration/_example/conftest.py b/tests/integration/_example/conftest.py new file mode 100644 index 000000000..02349196f --- /dev/null +++ b/tests/integration/_example/conftest.py @@ -0,0 +1,115 @@ +"""Fixtures specific to the example integration tests. + +This file contains pytest fixtures that are only used by the +example integration tests. Copy and modify for your connector. +""" + +import os +from typing import Any, Dict + +import pytest + + +@pytest.fixture(scope="module") +def example_credentials() -> Dict[str, Any]: + """Provide example credentials for testing. + + Returns: + Dict[str, Any]: Example credentials from environment. + """ + return { + "host": os.getenv("EXAMPLE_DB_HOST", "localhost"), + "port": int(os.getenv("EXAMPLE_DB_PORT", "5432")), + "username": os.getenv("EXAMPLE_DB_USER", "test_user"), + "password": os.getenv("EXAMPLE_DB_PASSWORD", "test_password"), + "database": os.getenv("EXAMPLE_DB_NAME", "test_db"), + } + + +@pytest.fixture(scope="module") +def example_metadata() -> Dict[str, Any]: + """Provide example metadata configuration. + + Returns: + Dict[str, Any]: Example metadata configuration. + """ + return { + "databases": [os.getenv("EXAMPLE_DB_NAME", "test_db")], + "include_schemas": ["public"], + } + + +@pytest.fixture(scope="module") +def example_connection() -> Dict[str, Any]: + """Provide example connection configuration. + + Returns: + Dict[str, Any]: Example connection configuration. 
+ """ + return { + "connection_name": "example_test_connection", + "qualified_name": "default/example/test", + } + + +@pytest.fixture +def skip_if_no_server(): + """Skip test if server is not available. + + Usage: + def test_something(skip_if_no_server): + # Test will be skipped if server not available + ... + """ + import requests + + server_url = os.getenv("APP_SERVER_URL", "http://localhost:8000") + + try: + response = requests.get(f"{server_url}/server/health", timeout=5) + if response.status_code != 200: + pytest.skip(f"Server not healthy at {server_url}") + except requests.RequestException: + pytest.skip(f"Server not available at {server_url}") + + +@pytest.fixture +def skip_if_no_database(): + """Skip test if database is not available. + + Customize this fixture for your connector's database. + + Usage: + def test_something(skip_if_no_database): + # Test will be skipped if database not available + ... + """ + # Example: Check if we can connect to the database + # Customize this for your connector + host = os.getenv("EXAMPLE_DB_HOST") + if not host: + pytest.skip("Database host not configured (EXAMPLE_DB_HOST)") + + # Add actual connection check if needed + # try: + # conn = connect_to_database(...) + # conn.close() + # except Exception: + # pytest.skip("Could not connect to database") + + +# ============================================================================= +# README for this file +# ============================================================================= +# +# This conftest.py is specific to the _example integration tests. +# When you copy this directory for your connector: +# +# 1. Rename fixtures (example_* -> your_connector_*) +# 2. Update credential loading to match your connector +# 3. Update skip conditions for your external dependencies +# 4. 
Add any connector-specific fixtures you need +# +# The parent conftest.py (tests/integration/conftest.py) provides +# shared fixtures like load_credentials_from_env() that are available +# to all integration tests. diff --git a/tests/integration/_example/scenarios.py b/tests/integration/_example/scenarios.py new file mode 100644 index 000000000..4ac115095 --- /dev/null +++ b/tests/integration/_example/scenarios.py @@ -0,0 +1,198 @@ +"""Example scenario definitions for integration testing. + +This file demonstrates how to define test scenarios using the +integration testing framework. + +With the simplified framework, you typically don't need helper functions. +Credentials are auto-loaded from E2E_{APP_NAME}_* environment variables. + +Usage: + 1. Copy this file to your connector's test directory + 2. Define scenarios with assertions + 3. Run: pytest tests/integration/ -v +""" + +from application_sdk.test_utils.integration import ( + Scenario, + all_of, + contains, + equals, + exists, + is_not_empty, + is_string, + one_of, +) + +# ============================================================================= +# Auth Scenarios +# ============================================================================= + +auth_scenarios = [ + # Valid credentials - auto-loaded from E2E_* env vars + Scenario( + name="auth_valid_credentials", + api="auth", + assert_that={ + "success": equals(True), + "message": all_of(is_string(), is_not_empty()), + }, + description="Test authentication with valid credentials", + ), + # Invalid credentials - override to test failure + Scenario( + name="auth_invalid_credentials", + api="auth", + credentials={ + "host": "invalid_host", + "port": 9999, + "username": "invalid_user", + "password": "invalid_password", + "database": "invalid_db", + }, + assert_that={ + "success": equals(False), + }, + description="Test authentication with invalid credentials", + ), + # Empty credentials - should fail + Scenario( + name="auth_empty_credentials", + 
api="auth", + credentials={}, + assert_that={ + "success": equals(False), + }, + description="Test authentication with empty credentials", + ), +] + + +# ============================================================================= +# Preflight Scenarios +# ============================================================================= + +preflight_scenarios = [ + # Valid configuration - credentials auto-loaded + Scenario( + name="preflight_valid_config", + api="preflight", + metadata={ + "databases": ["test_db"], + "include_schemas": ["public"], + "exclude_tables": [], + }, + assert_that={ + "success": equals(True), + }, + description="Test preflight check with valid configuration", + ), + # Invalid credentials + Scenario( + name="preflight_invalid_credentials", + api="preflight", + credentials={ + "host": "invalid_host", + "port": 9999, + "username": "invalid_user", + "password": "invalid_password", + "database": "invalid_db", + }, + metadata={ + "databases": ["test_db"], + "include_schemas": ["public"], + }, + assert_that={ + "success": equals(False), + }, + description="Test preflight check with invalid credentials", + ), +] + + +# ============================================================================= +# Workflow Scenarios +# ============================================================================= + +workflow_scenarios = [ + # Valid workflow - credentials auto-loaded + Scenario( + name="workflow_valid_execution", + api="workflow", + metadata={ + "databases": ["test_db"], + "include_schemas": ["public"], + }, + connection={ + "connection_name": "example_test_connection", + "qualified_name": "default/example/test", + }, + assert_that={ + "success": equals(True), + "message": contains("successfully"), + "data.workflow_id": exists(), + "data.run_id": exists(), + }, + description="Test workflow execution with valid configuration", + ), + # Workflow with metadata output validation + # After the workflow completes, the framework compares the actual + # 
extracted metadata against the expected baseline JSON file. + # Uncomment and adapt for your connector: + # + # Scenario( + # name="workflow_with_metadata_validation", + # api="workflow", + # metadata={ + # "databases": ["test_db"], + # "include_schemas": ["public"], + # }, + # connection={ + # "connection_name": "example_test_connection", + # "qualified_name": "default/example/test", + # }, + # expected_data="tests/integration/_example/expected/baseline.json", + # strict_comparison=True, + # workflow_timeout=300, + # polling_interval=10, + # assert_that={ + # "success": equals(True), + # "data.workflow_id": exists(), + # }, + # description="Workflow with metadata output validation against baseline", + # ), + # Invalid credentials + Scenario( + name="workflow_invalid_credentials", + api="workflow", + credentials={ + "host": "invalid_host", + "username": "invalid_user", + "password": "invalid_password", + }, + metadata={ + "databases": ["test_db"], + }, + connection={ + "connection_name": "example_test_connection", + "qualified_name": "default/example/test", + }, + assert_that={ + "success": one_of([True, False]), + }, + description="Test workflow with invalid credentials", + ), +] + + +# ============================================================================= +# All Scenarios +# ============================================================================= + +scenarios = auth_scenarios + preflight_scenarios + workflow_scenarios + +__all__ = [ + "scenarios", + "auth_scenarios", + "preflight_scenarios", + "workflow_scenarios", +] diff --git a/tests/integration/_example/test_integration.py b/tests/integration/_example/test_integration.py new file mode 100644 index 000000000..449713f7c --- /dev/null +++ b/tests/integration/_example/test_integration.py @@ -0,0 +1,36 @@ +"""Example integration test class. + +This file demonstrates how to create an integration test class +using the Apps-SDK integration testing framework. + +Prerequisites: + 1. 
Set environment variables: + ATLAN_APPLICATION_NAME=your_app + E2E_YOUR_APP_HOST=localhost + E2E_YOUR_APP_USERNAME=test + E2E_YOUR_APP_PASSWORD=secret + + 2. Start application server: + uv run python main.py + + 3. Run tests: + pytest tests/integration/ -v + pytest tests/integration/ -v -k "auth_valid" +""" + +from application_sdk.test_utils.integration import BaseIntegrationTest + +from .scenarios import scenarios + + +class ExampleIntegrationTest(BaseIntegrationTest): + """Integration tests for the example connector. + + Just define scenarios - the framework handles everything: + - Credentials auto-loaded from E2E_* env vars + - Server URL auto-discovered from ATLAN_APP_HTTP_HOST/PORT + - Individual pytest tests auto-generated per scenario + - Rich assertion error messages + """ + + scenarios = scenarios diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 000000000..130a8f7d4 --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,83 @@ +"""Shared fixtures and configuration for integration tests. + +This module provides pytest fixtures that are available to all +integration tests in this directory. +""" + +import os +from typing import Any, Dict + +import pytest + + +@pytest.fixture(scope="session") +def server_host() -> str: + """Get the application server host from environment. + + Returns: + str: The server URL (default: http://localhost:8000). + """ + return os.getenv("APP_SERVER_URL", "http://localhost:8000") + + +@pytest.fixture(scope="session") +def integration_test_config() -> Dict[str, Any]: + """Get integration test configuration from environment. + + Returns: + Dict[str, Any]: Configuration dictionary. 
+ """ + return { + "server_host": os.getenv("APP_SERVER_URL", "http://localhost:8000"), + "server_version": os.getenv("APP_SERVER_VERSION", "v1"), + "workflow_endpoint": os.getenv("WORKFLOW_ENDPOINT", "/start"), + "timeout": int(os.getenv("INTEGRATION_TEST_TIMEOUT", "30")), + } + + +def load_credentials_from_env(prefix: str) -> Dict[str, Any]: + """Load credentials from environment variables with a given prefix. + + This helper function collects all environment variables that start + with the given prefix and creates a credentials dictionary. + + Args: + prefix: The environment variable prefix (e.g., "POSTGRES"). + + Returns: + Dict[str, Any]: Credentials dictionary. + + Example: + # With environment variables: + # POSTGRES_HOST=localhost + # POSTGRES_PORT=5432 + # POSTGRES_USER=test + + >>> creds = load_credentials_from_env("POSTGRES") + >>> creds + {"host": "localhost", "port": "5432", "user": "test"} + """ + credentials = {} + prefix_upper = prefix.upper() + + for key, value in os.environ.items(): + if key.startswith(f"{prefix_upper}_"): + # Remove prefix and convert to lowercase + cred_key = key[len(prefix_upper) + 1 :].lower() + credentials[cred_key] = value + + return credentials + + +@pytest.fixture +def load_creds(): + """Fixture that returns the load_credentials_from_env function. + + This allows test modules to use the credential loader as a fixture. 
+ + Example: + def test_something(load_creds): + creds = load_creds("MY_APP") + assert "username" in creds + """ + return load_credentials_from_env diff --git a/tests/unit/test_utils/__init__.py b/tests/unit/test_utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/test_utils/integration/__init__.py b/tests/unit/test_utils/integration/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/test_utils/integration/test_comparison.py b/tests/unit/test_utils/integration/test_comparison.py new file mode 100644 index 000000000..30f6ca5b3 --- /dev/null +++ b/tests/unit/test_utils/integration/test_comparison.py @@ -0,0 +1,413 @@ +"""Tests for the metadata comparison engine.""" + +import json +import os +import tempfile + +import pytest + +from application_sdk.test_utils.integration.comparison import ( + AssetDiff, + GapReport, + compare_metadata, + load_actual_output, + load_expected_data, +) + + +class TestCompareMetadata: + """Tests for the compare_metadata function.""" + + def test_identical_data_no_gaps(self): + """Identical expected and actual data produces no gaps.""" + expected = { + "Table": [ + {"attributes": {"name": "orders", "columnCount": 6}}, + {"attributes": {"name": "users", "columnCount": 3}}, + ] + } + actual = [ + {"typeName": "Table", "attributes": {"name": "orders", "columnCount": 6}}, + {"typeName": "Table", "attributes": {"name": "users", "columnCount": 3}}, + ] + + report = compare_metadata(expected, actual) + assert not report.has_gaps + + def test_missing_asset_detected(self): + """An asset in expected but not in actual is reported as missing.""" + expected = { + "Table": [ + {"attributes": {"name": "orders"}}, + {"attributes": {"name": "users"}}, + ] + } + actual = [ + {"typeName": "Table", "attributes": {"name": "orders"}}, + ] + + report = compare_metadata(expected, actual) + assert report.has_gaps + + missing = [d for d in report.diffs if d.diff_type == "missing"] + assert 
len(missing) == 1 + assert missing[0].asset_name == "users" + assert missing[0].asset_type == "Table" + + def test_extra_asset_strict_mode(self): + """Extra assets in actual output fail the test in strict mode.""" + expected = { + "Table": [ + {"attributes": {"name": "orders"}}, + ] + } + actual = [ + {"typeName": "Table", "attributes": {"name": "orders"}}, + {"typeName": "Table", "attributes": {"name": "extra_table"}}, + ] + + report = compare_metadata(expected, actual, strict=True) + assert report.has_gaps + + extra = [d for d in report.diffs if d.diff_type == "extra"] + assert len(extra) == 1 + assert extra[0].asset_name == "extra_table" + + def test_extra_asset_lenient_mode(self): + """Extra assets in actual output are ignored in lenient mode.""" + expected = { + "Table": [ + {"attributes": {"name": "orders"}}, + ] + } + actual = [ + {"typeName": "Table", "attributes": {"name": "orders"}}, + {"typeName": "Table", "attributes": {"name": "extra_table"}}, + ] + + report = compare_metadata(expected, actual, strict=False) + # Only count_mismatch, no "extra" diffs + extra = [d for d in report.diffs if d.diff_type == "extra"] + assert len(extra) == 0 + + def test_attribute_mismatch_detected(self): + """Differing attribute values are reported as attribute_mismatch.""" + expected = { + "Table": [ + {"attributes": {"name": "orders", "columnCount": 6}}, + ] + } + actual = [ + { + "typeName": "Table", + "attributes": {"name": "orders", "columnCount": 10}, + }, + ] + + report = compare_metadata(expected, actual) + assert report.has_gaps + + mismatches = [d for d in report.diffs if d.diff_type == "attribute_mismatch"] + assert len(mismatches) == 1 + assert mismatches[0].field == "attributes.columnCount" + assert mismatches[0].expected == 6 + assert mismatches[0].actual == 10 + + def test_missing_attribute_detected(self): + """An attribute in expected but absent in actual is reported.""" + expected = { + "Table": [ + {"attributes": {"name": "orders", "description": "Order 
table"}}, + ] + } + actual = [ + {"typeName": "Table", "attributes": {"name": "orders"}}, + ] + + report = compare_metadata(expected, actual) + assert report.has_gaps + + missing_attrs = [d for d in report.diffs if d.diff_type == "missing_attribute"] + assert len(missing_attrs) == 1 + assert missing_attrs[0].field == "attributes.description" + + def test_count_mismatch_reported(self): + """Different asset counts are reported as count_mismatch.""" + expected = { + "Table": [ + {"attributes": {"name": "orders"}}, + {"attributes": {"name": "users"}}, + {"attributes": {"name": "products"}}, + ] + } + actual = [ + {"typeName": "Table", "attributes": {"name": "orders"}}, + ] + + report = compare_metadata(expected, actual) + count_diffs = [d for d in report.diffs if d.diff_type == "count_mismatch"] + assert len(count_diffs) == 1 + assert count_diffs[0].expected == 3 + assert count_diffs[0].actual == 1 + + def test_ignored_fields_skipped(self): + """Fields in the ignored set are not compared.""" + expected = { + "Table": [ + { + "attributes": { + "name": "orders", + "qualifiedName": "old/path/orders", + "columnCount": 6, + } + }, + ] + } + actual = [ + { + "typeName": "Table", + "attributes": { + "name": "orders", + "qualifiedName": "new/path/orders", + "columnCount": 6, + }, + }, + ] + + report = compare_metadata(expected, actual) + assert not report.has_gaps + + def test_custom_ignored_fields(self): + """Custom ignored_fields set overrides defaults.""" + expected = { + "Table": [ + {"attributes": {"name": "orders", "columnCount": 6}}, + ] + } + actual = [ + { + "typeName": "Table", + "attributes": {"name": "orders", "columnCount": 10}, + }, + ] + + # Ignore columnCount + report = compare_metadata(expected, actual, ignored_fields={"columnCount"}) + assert not report.has_gaps + + def test_custom_attributes_compared(self): + """customAttributes are compared when present in expected.""" + expected = { + "Table": [ + { + "attributes": {"name": "orders"}, + "customAttributes": 
{"table_type": "TABLE"}, + }, + ] + } + actual = [ + { + "typeName": "Table", + "attributes": {"name": "orders"}, + "customAttributes": {"table_type": "VIEW"}, + }, + ] + + report = compare_metadata(expected, actual) + assert report.has_gaps + mismatches = [d for d in report.diffs if d.diff_type == "attribute_mismatch"] + assert len(mismatches) == 1 + assert mismatches[0].field == "customAttributes.table_type" + + def test_extra_asset_type_strict_mode(self): + """Asset types in actual but not in expected are flagged in strict mode.""" + expected = { + "Table": [{"attributes": {"name": "orders"}}], + } + actual = [ + {"typeName": "Table", "attributes": {"name": "orders"}}, + {"typeName": "Column", "attributes": {"name": "order_id"}}, + ] + + report = compare_metadata(expected, actual, strict=True) + extra = [ + d + for d in report.diffs + if d.diff_type == "extra" and d.asset_type == "Column" + ] + assert len(extra) == 1 + + def test_nested_reference_fields_ignored(self): + """Nested reference fields like atlanSchema are skipped by default.""" + expected = { + "Table": [ + { + "attributes": { + "name": "orders", + "columnCount": 6, + "atlanSchema": { + "typeName": "Schema", + "uniqueAttributes": {"qualifiedName": "old/path"}, + }, + } + }, + ] + } + actual = [ + { + "typeName": "Table", + "attributes": { + "name": "orders", + "columnCount": 6, + "atlanSchema": { + "typeName": "Schema", + "uniqueAttributes": {"qualifiedName": "new/path"}, + }, + }, + }, + ] + + report = compare_metadata(expected, actual) + assert not report.has_gaps + + def test_multiple_asset_types(self): + """Comparison works across multiple asset types.""" + expected = { + "Database": [{"attributes": {"name": "mydb"}}], + "Table": [ + {"attributes": {"name": "orders", "columnCount": 6}}, + {"attributes": {"name": "users", "columnCount": 3}}, + ], + } + actual = [ + {"typeName": "Database", "attributes": {"name": "mydb"}}, + {"typeName": "Table", "attributes": {"name": "orders", "columnCount": 6}}, 
+ {"typeName": "Table", "attributes": {"name": "users", "columnCount": 3}}, + ] + + report = compare_metadata(expected, actual) + assert not report.has_gaps + + def test_empty_expected_data(self): + """Empty expected data with actual assets reports extras in strict mode.""" + expected = {} + actual = [ + {"typeName": "Table", "attributes": {"name": "orders"}}, + ] + + report = compare_metadata(expected, actual, strict=True) + assert report.has_gaps + + def test_empty_actual_data(self): + """Empty actual data with expected assets reports missing.""" + expected = { + "Table": [{"attributes": {"name": "orders"}}], + } + actual = [] + + report = compare_metadata(expected, actual) + assert report.has_gaps + missing = [d for d in report.diffs if d.diff_type == "missing"] + assert len(missing) == 1 + + +class TestGapReport: + """Tests for GapReport formatting.""" + + def test_no_gaps_message(self): + """Empty report produces a clean message.""" + report = GapReport() + assert "No gaps found" in report.format_report() + + def test_format_report_includes_summary(self): + """Report includes summary counts.""" + report = GapReport( + diffs=[ + AssetDiff("Table", "orders", "missing"), + AssetDiff("Table", "users", "extra"), + ], + summary={"missing": 1, "extra": 1}, + ) + output = report.format_report() + assert "missing: 1" in output + assert "extra: 1" in output + + def test_format_report_groups_by_type(self): + """Report groups diffs by asset type.""" + report = GapReport( + diffs=[ + AssetDiff("Table", "orders", "missing"), + AssetDiff("Column", "col1", "extra"), + ], + summary={"missing": 1, "extra": 1}, + ) + output = report.format_report() + assert "[Table]" in output + assert "[Column]" in output + + +class TestLoadExpectedData: + """Tests for load_expected_data.""" + + def test_load_valid_file(self): + """Valid JSON file loads correctly.""" + data = {"Table": [{"attributes": {"name": "orders"}}]} + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", 
delete=False) as f: + json.dump(data, f) + f.flush() + result = load_expected_data(f.name) + + os.unlink(f.name) + assert result == data + + def test_file_not_found(self): + """Missing file raises FileNotFoundError.""" + with pytest.raises(FileNotFoundError): + load_expected_data("/nonexistent/path.json") + + def test_invalid_json_structure(self): + """Non-dict JSON raises ValueError.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump([1, 2, 3], f) + f.flush() + + with pytest.raises(ValueError, match="JSON object"): + load_expected_data(f.name) + + os.unlink(f.name) + + +class TestLoadActualOutput: + """Tests for load_actual_output.""" + + def test_load_jsonl_files(self): + """JSONL files in output directory are loaded.""" + with tempfile.TemporaryDirectory() as tmpdir: + workflow_dir = os.path.join(tmpdir, "wf1", "run1", "table") + os.makedirs(workflow_dir) + + records = [ + {"typeName": "Table", "attributes": {"name": "orders"}}, + {"typeName": "Table", "attributes": {"name": "users"}}, + ] + with open(os.path.join(workflow_dir, "table.json"), "wb") as f: + for r in records: + f.write(json.dumps(r).encode() + b"\n") + + result = load_actual_output(tmpdir, "wf1", "run1") + assert len(result) == 2 + assert result[0]["attributes"]["name"] == "orders" + + def test_directory_not_found(self): + """Missing directory raises FileNotFoundError.""" + with pytest.raises(FileNotFoundError): + load_actual_output("/nonexistent", "wf1", "run1") + + def test_empty_directory(self): + """Empty directory raises FileNotFoundError.""" + with tempfile.TemporaryDirectory() as tmpdir: + workflow_dir = os.path.join(tmpdir, "wf1", "run1") + os.makedirs(workflow_dir) + + with pytest.raises(FileNotFoundError, match="No metadata records"): + load_actual_output(tmpdir, "wf1", "run1")