diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 650de345a..9dab3f4ff 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -2235,6 +2235,42 @@ def get_connection_catalog( ) +def get_refreshed_connection_catalog( + connection_id: str, + *, + api_root: str, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, +) -> dict[str, Any]: + """Get the configured catalog for a connection with a refreshed schema from the source. + + Uses the Config API endpoint: POST /v1/web_backend/connections/get + with ``withRefreshedCatalog: true``, which triggers a discover operation + on the connection's source and returns the updated catalog. + + This is equivalent to clicking "Refresh source schema" in the Airbyte UI. + + Args: + connection_id: The connection ID to get catalog for. + api_root: The API root URL. + client_id: OAuth client ID. + client_secret: OAuth client secret. + bearer_token: Bearer token for authentication (alternative to client credentials). + + Returns: + Dictionary containing the connection info with refreshed syncCatalog. + """ + return _make_config_api_request( + path="/web_backend/connections/get", + json={"connectionId": connection_id, "withRefreshedCatalog": True}, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + bearer_token=bearer_token, + ) + + def replace_connection_catalog( connection_id: str, configured_catalog_dict: dict[str, Any], diff --git a/airbyte/cloud/connections.py b/airbyte/cloud/connections.py index cfa3149a1..47a029ca5 100644 --- a/airbyte/cloud/connections.py +++ b/airbyte/cloud/connections.py @@ -648,6 +648,170 @@ def import_raw_catalog(self, catalog: dict[str, Any]) -> None: bearer_token=self.workspace.bearer_token, ) + def refresh_catalog(self) -> dict[str, Any]: + """Refresh the connection's catalog by re-discovering the source schema. + + Triggers a discover operation on the connection's source connector, then + replaces the connection's catalog with the refreshed result. This is + equivalent to clicking "Refresh source schema" in the Airbyte UI. + + This is useful after pinning a new connector version that advertises + new streams or updated sync mode support. + + Returns: + The refreshed syncCatalog dict. + + Raises: + PyAirbyteInputError: If the refreshed catalog is empty or missing. + """ + refreshed_response = api_util.get_refreshed_connection_catalog( + connection_id=self.connection_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, + ) + refreshed_catalog = refreshed_response.get("syncCatalog") + if not refreshed_catalog: + raise PyAirbyteInputError( + message="Refreshed catalog is empty.", + context={"connection_id": self.connection_id}, + ) + + api_util.replace_connection_catalog( + connection_id=self.connection_id, + configured_catalog_dict=refreshed_catalog, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, + ) + return refreshed_catalog + + def set_stream_sync_mode( + self, + stream_name: str, + *, + sync_mode: str, + stream_namespace: str | None = None, + destination_sync_mode: str | None = None, + cursor_field: str | None = None, + ) -> None: + """Set the sync mode for a specific stream on this connection. + + Safely modifies only the specified stream in the connection's syncCatalog. + Validates that the requested sync mode is supported by the stream before + applying the change. + + Args: + stream_name: The name of the stream to modify. + sync_mode: The source sync mode to set (``"incremental"`` or ``"full_refresh"``). + stream_namespace: The namespace of the stream to modify. If not provided and + multiple streams share the same name, a ``PyAirbyteInputError`` is raised. + destination_sync_mode: The destination sync mode to set + (``"append"``, ``"overwrite"``, or ``"append_dedup"``). If not provided, + the existing destination sync mode is preserved. + cursor_field: The cursor field to use for incremental syncs. Required when + switching to incremental mode if the stream does not have a default + cursor. If not provided, the existing cursor field is preserved. + + Raises: + PyAirbyteInputError: If the stream is not found in the catalog, the requested + sync mode is not in the stream's ``supportedSyncModes``, the stream name + is ambiguous (multiple matches without a namespace), or incremental mode + is requested without a usable cursor field. + """ + catalog = self.dump_raw_catalog() + if not catalog or "streams" not in catalog: + raise PyAirbyteInputError( + message="Connection catalog is empty or missing.", + context={"connection_id": self.connection_id}, + ) + + # Find all matching streams by name (and optionally namespace) + matching_entries: list[dict[str, Any]] = [] + for entry in catalog["streams"]: + stream_def = entry.get("stream", {}) + if stream_def.get("name") != stream_name: + continue + if stream_namespace is not None and stream_def.get("namespace") != stream_namespace: + continue + matching_entries.append(entry) + + if not matching_entries: + available_streams = [ + f"{e.get('stream', {}).get('namespace', '')}.{e.get('stream', {}).get('name', '')}" + for e in catalog["streams"] + ] + raise PyAirbyteInputError( + message=f"Stream '{stream_name}' not found in connection catalog.", + context={ + "connection_id": self.connection_id, + "available_streams": available_streams, + }, + ) + + if len(matching_entries) > 1: + matching_namespaces = [e.get("stream", {}).get("namespace") for e in matching_entries] + raise PyAirbyteInputError( + message=( + f"Stream name '{stream_name}' is ambiguous " + f"({len(matching_entries)} matches found)." + ), + context={ + "connection_id": self.connection_id, + "stream_name": stream_name, + "matching_namespaces": matching_namespaces, + }, + ) + + target_entry = matching_entries[0] + + # Validate the sync mode is supported + stream_def = target_entry.get("stream", {}) + supported_sync_modes: list[str] = stream_def.get("supportedSyncModes", []) + if sync_mode not in supported_sync_modes: + raise PyAirbyteInputError( + message=(f"Sync mode '{sync_mode}' is not supported by stream '{stream_name}'."), + context={ + "stream_name": stream_name, + "requested_sync_mode": sync_mode, + "supported_sync_modes": supported_sync_modes, + }, + ) + + # Update the stream config + config = target_entry.get("config") or {} + + # Guard: fail fast if switching to incremental without a usable cursor + if sync_mode == "incremental": + existing_cursor = config.get("cursorField") or stream_def.get("defaultCursorField") + source_defined_cursor = bool(stream_def.get("sourceDefinedCursor")) + if cursor_field is None and not existing_cursor and not source_defined_cursor: + raise PyAirbyteInputError( + message=( + f"Stream '{stream_name}' needs a cursor field before switching " + "to incremental sync mode." + ), + context={ + "connection_id": self.connection_id, + "stream_name": stream_name, + }, + ) + + config["syncMode"] = sync_mode + + if destination_sync_mode is not None: + config["destinationSyncMode"] = destination_sync_mode + + if cursor_field is not None: + config["cursorField"] = [cursor_field] + + target_entry["config"] = config + + # Save the updated catalog + self.import_raw_catalog(catalog) + def rename(self, name: str) -> CloudConnection: """Rename the connection. diff --git a/airbyte/mcp/cloud.py b/airbyte/mcp/cloud.py index 594bee0db..0375204ce 100644 --- a/airbyte/mcp/cloud.py +++ b/airbyte/mcp/cloud.py @@ -2586,6 +2586,142 @@ def update_cloud_connection( ) +@mcp_tool( + destructive=True, + open_world=True, + extra_help_text=CLOUD_AUTH_TIP_TEXT, +) +def refresh_connection_catalog( + ctx: Context, + connection_id: Annotated[ + str, + Field(description="The ID of the connection to refresh the catalog for."), + ], + *, + workspace_id: Annotated[ + str | None, + Field( + description=WORKSPACE_ID_TIP_TEXT, + default=None, + ), + ], +) -> str: + """Refresh the catalog for a connection by re-discovering the source schema. + + Triggers a discover operation on the connection's source connector and updates + the connection's catalog with the latest stream definitions and supported sync + modes. This is equivalent to clicking "Refresh source schema" in the Airbyte UI. + + This is useful after pinning a new connector version that advertises new streams + or updated sync mode support. + """ + check_guid_created_in_session(connection_id) + workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) + connection = workspace.get_connection(connection_id=connection_id) + + refreshed_catalog = connection.refresh_catalog() + stream_count = len(refreshed_catalog.get("streams", [])) + + return ( + f"Successfully refreshed catalog for connection '{connection_id}'. " + f"Catalog now contains {stream_count} stream(s). " + f"URL: {connection.connection_url}" + ) + + +@mcp_tool( + destructive=True, + open_world=True, + extra_help_text=CLOUD_AUTH_TIP_TEXT, +) +def set_stream_sync_mode( + ctx: Context, + connection_id: Annotated[ + str, + Field(description="The ID of the connection to modify."), + ], + stream_name: Annotated[ + str, + Field(description="The name of the stream to change the sync mode for."), + ], + sync_mode: Annotated[ + Literal["incremental", "full_refresh"], + Field(description="The source sync mode to set: 'incremental' or 'full_refresh'."), + ], + *, + stream_namespace: Annotated[ + str | None, + Field( + description=( + "The namespace of the stream to modify. Required when multiple streams " + "share the same name but differ by namespace." + ), + default=None, + ), + ], + destination_sync_mode: Annotated[ + Literal["append", "overwrite", "append_dedup"] | None, + Field( + description=( + "The destination sync mode to set: 'append', 'overwrite', or 'append_dedup'. " + "If not provided, the existing destination sync mode is preserved." + ), + default=None, + ), + ], + cursor_field: Annotated[ + str | None, + Field( + description=( + "The cursor field to use for incremental syncs. " + "Required when switching to incremental mode if the stream does not have " + "a default cursor. If not provided, the existing cursor field is preserved." + ), + default=None, + ), + ], + workspace_id: Annotated[ + str | None, + Field( + description=WORKSPACE_ID_TIP_TEXT, + default=None, + ), + ], +) -> str: + """Set the sync mode for a specific stream on a connection. + + Safely modifies only the specified stream in the connection's syncCatalog. + Validates that the requested sync mode is supported by the stream before + applying the change. + + This is useful when switching a stream from full_refresh to incremental + (or vice versa) after a connector version upgrade that adds incremental support. + """ + check_guid_created_in_session(connection_id) + workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) + connection = workspace.get_connection(connection_id=connection_id) + + connection.set_stream_sync_mode( + stream_name=stream_name, + sync_mode=sync_mode, + stream_namespace=stream_namespace, + destination_sync_mode=destination_sync_mode, + cursor_field=cursor_field, + ) + + return ( + f"Successfully set sync mode for stream '{stream_name}' " + f"on connection '{connection_id}' to '{sync_mode}'" + + ( + f" with destination sync mode '{destination_sync_mode}'" + if destination_sync_mode + else "" + ) + + (f" and cursor field '{cursor_field}'" if cursor_field else "") + + f". URL: {connection.connection_url}" + ) + + @mcp_tool( read_only=True, idempotent=True,