From 6e19b02a0da8b249a6c95fa3449cc24a2a6d8295 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 18:02:01 +0000 Subject: [PATCH 1/2] feat(mcp): Add tools to modify stream sync settings and refresh catalog on existing connections Adds two new MCP tools: - refresh_connection_catalog: Triggers a discover operation on a connection's source and updates the catalog with latest stream definitions and sync modes - set_stream_sync_mode: Safely changes the sync mode for a specific stream on a connection, with validation that the mode is supported Core logic lives in CloudConnection (connections.py) and api_util.py, with MCP tools as thin wrappers per the presentation layer pattern. Closes #993 Co-Authored-By: AJ Steers --- airbyte/_util/api_util.py | 36 ++++++++++ airbyte/cloud/connections.py | 121 +++++++++++++++++++++++++++++++++ airbyte/mcp/cloud.py | 125 +++++++++++++++++++++++++++++++++++ 3 files changed, 282 insertions(+) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 650de345a..9dab3f4ff 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -2235,6 +2235,42 @@ def get_connection_catalog( ) +def get_refreshed_connection_catalog( + connection_id: str, + *, + api_root: str, + client_id: SecretString | None, + client_secret: SecretString | None, + bearer_token: SecretString | None, +) -> dict[str, Any]: + """Get the configured catalog for a connection with a refreshed schema from the source. + + Uses the Config API endpoint: POST /v1/web_backend/connections/get + with ``withRefreshedCatalog: true``, which triggers a discover operation + on the connection's source and returns the updated catalog. + + This is equivalent to clicking "Refresh source schema" in the Airbyte UI. + + Args: + connection_id: The connection ID to get catalog for. + api_root: The API root URL. + client_id: OAuth client ID. + client_secret: OAuth client secret. + bearer_token: Bearer token for authentication (alternative to client credentials). + + Returns: + Dictionary containing the connection info with refreshed syncCatalog. + """ + return _make_config_api_request( + path="/web_backend/connections/get", + json={"connectionId": connection_id, "withRefreshedCatalog": True}, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + bearer_token=bearer_token, + ) + + def replace_connection_catalog( connection_id: str, configured_catalog_dict: dict[str, Any], diff --git a/airbyte/cloud/connections.py b/airbyte/cloud/connections.py index cfa3149a1..45beff755 100644 --- a/airbyte/cloud/connections.py +++ b/airbyte/cloud/connections.py @@ -648,6 +648,127 @@ def import_raw_catalog(self, catalog: dict[str, Any]) -> None: bearer_token=self.workspace.bearer_token, ) + def refresh_catalog(self) -> dict[str, Any]: + """Refresh the connection's catalog by re-discovering the source schema. + + Triggers a discover operation on the connection's source connector, then + replaces the connection's catalog with the refreshed result. This is + equivalent to clicking "Refresh source schema" in the Airbyte UI. + + This is useful after pinning a new connector version that advertises + new streams or updated sync mode support. + + Returns: + The refreshed syncCatalog dict. + + Raises: + PyAirbyteInputError: If the refreshed catalog is empty or missing. + """ + refreshed_response = api_util.get_refreshed_connection_catalog( + connection_id=self.connection_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, + ) + refreshed_catalog = refreshed_response.get("syncCatalog") + if not refreshed_catalog: + raise PyAirbyteInputError( + message="Refreshed catalog is empty.", + context={"connection_id": self.connection_id}, + ) + + api_util.replace_connection_catalog( + connection_id=self.connection_id, + configured_catalog_dict=refreshed_catalog, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + bearer_token=self.workspace.bearer_token, + ) + return refreshed_catalog + + def set_stream_sync_mode( + self, + stream_name: str, + *, + sync_mode: str, + destination_sync_mode: str | None = None, + cursor_field: str | None = None, + ) -> None: + """Set the sync mode for a specific stream on this connection. + + Safely modifies only the specified stream in the connection's syncCatalog. + Validates that the requested sync mode is supported by the stream before + applying the change. + + Args: + stream_name: The name of the stream to modify. + sync_mode: The source sync mode to set (``"incremental"`` or ``"full_refresh"``). + destination_sync_mode: The destination sync mode to set + (``"append"``, ``"overwrite"``, or ``"append_dedup"``). If not provided, + the existing destination sync mode is preserved. + cursor_field: The cursor field to use for incremental syncs. Required when + switching to incremental mode if the stream does not have a default + cursor. If not provided, the existing cursor field is preserved. + + Raises: + PyAirbyteInputError: If the stream is not found in the catalog, or if the + requested sync mode is not in the stream's ``supportedSyncModes``. + """ + catalog = self.dump_raw_catalog() + if not catalog or "streams" not in catalog: + raise PyAirbyteInputError( + message="Connection catalog is empty or missing.", + context={"connection_id": self.connection_id}, + ) + + # Find the target stream + target_entry: dict[str, Any] | None = None + for entry in catalog["streams"]: + stream_def = entry.get("stream", {}) + if stream_def.get("name") == stream_name: + target_entry = entry + break + + if target_entry is None: + available_streams = [e.get("stream", {}).get("name") for e in catalog["streams"]] + raise PyAirbyteInputError( + message=f"Stream '{stream_name}' not found in connection catalog.", + context={ + "connection_id": self.connection_id, + "available_streams": available_streams, + }, + ) + + # Validate the sync mode is supported + stream_def = target_entry.get("stream", {}) + supported_sync_modes: list[str] = stream_def.get("supportedSyncModes", []) + if sync_mode not in supported_sync_modes: + raise PyAirbyteInputError( + message=(f"Sync mode '{sync_mode}' is not supported by stream '{stream_name}'."), + context={ + "stream_name": stream_name, + "requested_sync_mode": sync_mode, + "supported_sync_modes": supported_sync_modes, + }, + ) + + # Update the stream config + config = target_entry.get("config", {}) + config["syncMode"] = sync_mode + + if destination_sync_mode is not None: + config["destinationSyncMode"] = destination_sync_mode + + if cursor_field is not None: + config["cursorField"] = [cursor_field] + + target_entry["config"] = config + + # Save the updated catalog + self.import_raw_catalog(catalog) + def rename(self, name: str) -> CloudConnection: """Rename the connection. diff --git a/airbyte/mcp/cloud.py b/airbyte/mcp/cloud.py index 594bee0db..1df17dfd9 100644 --- a/airbyte/mcp/cloud.py +++ b/airbyte/mcp/cloud.py @@ -2586,6 +2586,131 @@ def update_cloud_connection( ) +@mcp_tool( + destructive=True, + open_world=True, + extra_help_text=CLOUD_AUTH_TIP_TEXT, +) +def refresh_connection_catalog( + ctx: Context, + connection_id: Annotated[ + str, + Field(description="The ID of the connection to refresh the catalog for."), + ], + *, + workspace_id: Annotated[ + str | None, + Field( + description=WORKSPACE_ID_TIP_TEXT, + default=None, + ), + ], +) -> str: + """Refresh the catalog for a connection by re-discovering the source schema. + + Triggers a discover operation on the connection's source connector and updates + the connection's catalog with the latest stream definitions and supported sync + modes. This is equivalent to clicking "Refresh source schema" in the Airbyte UI. + + This is useful after pinning a new connector version that advertises new streams + or updated sync mode support. + """ + check_guid_created_in_session(connection_id) + workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) + connection = workspace.get_connection(connection_id=connection_id) + + refreshed_catalog = connection.refresh_catalog() + stream_count = len(refreshed_catalog.get("streams", [])) + + return ( + f"Successfully refreshed catalog for connection '{connection_id}'. " + f"Catalog now contains {stream_count} stream(s). " + f"URL: {connection.connection_url}" + ) + + +@mcp_tool( + destructive=True, + open_world=True, + extra_help_text=CLOUD_AUTH_TIP_TEXT, +) +def set_stream_sync_mode( + ctx: Context, + connection_id: Annotated[ + str, + Field(description="The ID of the connection to modify."), + ], + stream_name: Annotated[ + str, + Field(description="The name of the stream to change the sync mode for."), + ], + sync_mode: Annotated[ + Literal["incremental", "full_refresh"], + Field(description="The source sync mode to set: 'incremental' or 'full_refresh'."), + ], + *, + destination_sync_mode: Annotated[ + Literal["append", "overwrite", "append_dedup"] | None, + Field( + description=( + "The destination sync mode to set: 'append', 'overwrite', or 'append_dedup'. " + "If not provided, the existing destination sync mode is preserved." + ), + default=None, + ), + ], + cursor_field: Annotated[ + str | None, + Field( + description=( + "The cursor field to use for incremental syncs. " + "Required when switching to incremental mode if the stream does not have " + "a default cursor. If not provided, the existing cursor field is preserved." + ), + default=None, + ), + ], + workspace_id: Annotated[ + str | None, + Field( + description=WORKSPACE_ID_TIP_TEXT, + default=None, + ), + ], +) -> str: + """Set the sync mode for a specific stream on a connection. + + Safely modifies only the specified stream in the connection's syncCatalog. + Validates that the requested sync mode is supported by the stream before + applying the change. + + This is useful when switching a stream from full_refresh to incremental + (or vice versa) after a connector version upgrade that adds incremental support. + """ + check_guid_created_in_session(connection_id) + workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) + connection = workspace.get_connection(connection_id=connection_id) + + connection.set_stream_sync_mode( + stream_name=stream_name, + sync_mode=sync_mode, + destination_sync_mode=destination_sync_mode, + cursor_field=cursor_field, + ) + + return ( + f"Successfully set sync mode for stream '{stream_name}' " + f"on connection '{connection_id}' to '{sync_mode}'" + + ( + f" with destination sync mode '{destination_sync_mode}'" + if destination_sync_mode + else "" + ) + + (f" and cursor field '{cursor_field}'" if cursor_field else "") + + f". URL: {connection.connection_url}" + ) + + @mcp_tool( read_only=True, idempotent=True, From e6fce67aa9112487e07f9b1922177ef1bf43d772 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 18:16:59 +0000 Subject: [PATCH 2/2] fix: Add namespace disambiguation and cursor field guard for set_stream_sync_mode Addresses CodeRabbit review feedback: - Add stream_namespace parameter to disambiguate same-named streams in different namespaces. Raises PyAirbyteInputError when name is ambiguous. - Add fail-fast guard when switching to incremental mode without a usable cursor field (no existing cursor, no default cursor, no source-defined cursor). Co-Authored-By: AJ Steers --- airbyte/cloud/connections.py | 65 ++++++++++++++++++++++++++++++------ airbyte/mcp/cloud.py | 11 ++++++ 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/airbyte/cloud/connections.py b/airbyte/cloud/connections.py index 45beff755..47a029ca5 100644 --- a/airbyte/cloud/connections.py +++ b/airbyte/cloud/connections.py @@ -693,6 +693,7 @@ def set_stream_sync_mode( stream_name: str, *, sync_mode: str, + stream_namespace: str | None = None, destination_sync_mode: str | None = None, cursor_field: str | None = None, ) -> None: @@ -705,6 +706,8 @@ def set_stream_sync_mode( Args: stream_name: The name of the stream to modify. sync_mode: The source sync mode to set (``"incremental"`` or ``"full_refresh"``). + stream_namespace: The namespace of the stream to modify. If not provided and + multiple streams share the same name, a ``PyAirbyteInputError`` is raised. destination_sync_mode: The destination sync mode to set (``"append"``, ``"overwrite"``, or ``"append_dedup"``). If not provided, the existing destination sync mode is preserved. @@ -713,8 +716,10 @@ def set_stream_sync_mode( cursor. If not provided, the existing cursor field is preserved. Raises: - PyAirbyteInputError: If the stream is not found in the catalog, or if the - requested sync mode is not in the stream's ``supportedSyncModes``. + PyAirbyteInputError: If the stream is not found in the catalog, the requested + sync mode is not in the stream's ``supportedSyncModes``, the stream name + is ambiguous (multiple matches without a namespace), or incremental mode + is requested without a usable cursor field. """ catalog = self.dump_raw_catalog() if not catalog or "streams" not in catalog: @@ -723,16 +728,21 @@ def set_stream_sync_mode( context={"connection_id": self.connection_id}, ) - # Find the target stream - target_entry: dict[str, Any] | None = None + # Find all matching streams by name (and optionally namespace) + matching_entries: list[dict[str, Any]] = [] for entry in catalog["streams"]: stream_def = entry.get("stream", {}) - if stream_def.get("name") == stream_name: - target_entry = entry - break - - if target_entry is None: - available_streams = [e.get("stream", {}).get("name") for e in catalog["streams"]] + if stream_def.get("name") != stream_name: + continue + if stream_namespace is not None and stream_def.get("namespace") != stream_namespace: + continue + matching_entries.append(entry) + + if not matching_entries: + available_streams = [ + f"{e.get('stream', {}).get('namespace', '')}.{e.get('stream', {}).get('name', '')}" + for e in catalog["streams"] + ] raise PyAirbyteInputError( message=f"Stream '{stream_name}' not found in connection catalog.", context={ @@ -741,6 +751,22 @@ def set_stream_sync_mode( }, ) + if len(matching_entries) > 1: + matching_namespaces = [e.get("stream", {}).get("namespace") for e in matching_entries] + raise PyAirbyteInputError( + message=( + f"Stream name '{stream_name}' is ambiguous " + f"({len(matching_entries)} matches found)." + ), + context={ + "connection_id": self.connection_id, + "stream_name": stream_name, + "matching_namespaces": matching_namespaces, + }, + ) + + target_entry = matching_entries[0] + # Validate the sync mode is supported stream_def = target_entry.get("stream", {}) supported_sync_modes: list[str] = stream_def.get("supportedSyncModes", []) @@ -755,7 +781,24 @@ def set_stream_sync_mode( ) # Update the stream config - config = target_entry.get("config", {}) + config = target_entry.get("config") or {} + + # Guard: fail fast if switching to incremental without a usable cursor + if sync_mode == "incremental": + existing_cursor = config.get("cursorField") or stream_def.get("defaultCursorField") + source_defined_cursor = bool(stream_def.get("sourceDefinedCursor")) + if cursor_field is None and not existing_cursor and not source_defined_cursor: + raise PyAirbyteInputError( + message=( + f"Stream '{stream_name}' needs a cursor field before switching " + "to incremental sync mode." + ), + context={ + "connection_id": self.connection_id, + "stream_name": stream_name, + }, + ) + config["syncMode"] = sync_mode if destination_sync_mode is not None: diff --git a/airbyte/mcp/cloud.py b/airbyte/mcp/cloud.py index 1df17dfd9..0375204ce 100644 --- a/airbyte/mcp/cloud.py +++ b/airbyte/mcp/cloud.py @@ -2649,6 +2649,16 @@ def set_stream_sync_mode( Field(description="The source sync mode to set: 'incremental' or 'full_refresh'."), ], *, + stream_namespace: Annotated[ + str | None, + Field( + description=( + "The namespace of the stream to modify. Required when multiple streams " + "share the same name but differ by namespace." + ), + default=None, + ), + ], destination_sync_mode: Annotated[ Literal["append", "overwrite", "append_dedup"] | None, Field( @@ -2694,6 +2704,7 @@ def set_stream_sync_mode( connection.set_stream_sync_mode( stream_name=stream_name, sync_mode=sync_mode, + stream_namespace=stream_namespace, destination_sync_mode=destination_sync_mode, cursor_field=cursor_field, )