Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 100 additions & 7 deletions runner/arazzo_runner/executor/operation_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import jsonpointer

from arazzo_runner.auth.models import SecurityOption, SecurityRequirement
from arazzo_runner.evaluator import ExpressionEvaluator
from arazzo_runner.models import ExecutionState

# Configure logging
logger = logging.getLogger("arazzo-runner.executor")
Expand Down Expand Up @@ -39,6 +41,47 @@ def find_by_id(self, operation_id: str) -> dict | None:
Returns:
Dictionary with operation details or None if not found
"""
# Special handling: support references in Arazzo specs like
# $sourceDescriptions.<source_name>.<operationId>
if isinstance(operation_id, str) and operation_id.startswith("$sourceDescriptions."):
# parse into three parts: prefix, source_name, operation_name
parts = operation_id.split(".", 2)
if len(parts) == 3:
_, source_name, target_op = parts
source_desc = self.source_descriptions.get(source_name)
if source_desc:
paths = source_desc.get("paths", {})
for path, path_item in paths.items():
for method, operation in path_item.items():
if (
method in ["get", "post", "put", "delete", "patch"]
and operation.get("operationId") == target_op
):
try:
servers = source_desc.get("servers")
if not servers or not isinstance(servers, list):
raise ValueError(
"Missing or invalid 'servers' list in OpenAPI spec."
)
base_url = servers[0].get("url")
if not base_url or not isinstance(base_url, str):
raise ValueError(
"Missing or invalid 'url' in the first server object."
)
except (IndexError, ValueError) as e:
raise ValueError(
f"Could not determine base URL from OpenAPI spec servers: {e}"
) from e

return {
"source": source_name,
"path": path,
"method": method,
"url": base_url + path,
"operation": operation,
}

# Default: search all source descriptions for an operation with matching operationId
for source_name, source_desc in self.source_descriptions.items():
# Search through paths and operations
paths = source_desc.get("paths", {})
Expand Down Expand Up @@ -311,6 +354,9 @@ def _extract_path_method_with_regex(

# Decode the path (replace ~1 with / and ~0 with ~)
decoded_path = encoded_path.replace("~1", "/").replace("~0", "~")
# Normalize leading slashes: decoded_path may start with multiple slashes
# because encoded_path often begins with a leading '/'. Ensure a single leading slash.
decoded_path = "/" + decoded_path.lstrip("/")
Copy link

Copilot AI Aug 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This path normalization logic is duplicated from existing code and could cause unexpected behavior. The comment mentions "multiple slashes" but the logic strips all leading slashes then adds one back, which could incorrectly modify paths that legitimately start with multiple slashes. Consider extracting this to a dedicated path normalization utility function to ensure consistent behavior across the codebase.

Copilot uses AI. Check for mistakes.
logger.debug(f"Decoded path: {decoded_path}")

# Try to find the operation in the source description
Expand Down Expand Up @@ -749,11 +795,58 @@ def get_operations_for_workflow(self, workflow: dict) -> list[dict]:
if op_info:
operations.append(op_info)
elif "operationPath" in step:
# operationPath format: <source>#<json_pointer>
match = re.match(r"([^#]+)#(.+)", step["operationPath"])
if match:
source_url, json_pointer = match.groups()
op_info = self.find_by_path(source_url, json_pointer)
if op_info:
operations.append(op_info)
# operationPath may be <source>#<json_pointer> or a runtime expression
# referencing a sourceDescription. We evaluate any braced expressions
# using the shared ExpressionEvaluator and then map the resolved
# left-hand value to a source name (by name or by matching the
# source's declared `url`) before delegating to find_by_path.
op_path = step["operationPath"]
if not isinstance(op_path, str):
continue

left, json_pointer = (op_path.split("#", 1) + [""])[0:2]
resolved_left = left.strip()

# Evaluate embedded braced runtime expressions using central evaluator
if "{" in resolved_left and "}" in resolved_left:
eval_state = ExecutionState(workflow_id="__internal__")

def _eval_braced(m: re.Match) -> str:
expr = m.group(1)
Comment on lines +812 to +815
Copy link

Copilot AI Aug 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating an ExecutionState with a hardcoded workflow_id "internal" for expression evaluation may not provide the proper context for runtime expressions. This could lead to incorrect evaluation results if the expressions depend on actual workflow state. Consider passing the actual workflow context or creating a more appropriate evaluation context.

Suggested change
eval_state = ExecutionState(workflow_id="__internal__")
def _eval_braced(m: re.Match) -> str:
expr = m.group(1)
workflow_id = workflow.get("id", "__internal__")
eval_state = ExecutionState(workflow_id=workflow_id)
def _eval_braced(m: re.Match) -> str:

Copilot uses AI. Check for mistakes.
try:
val = ExpressionEvaluator.evaluate_expression(
expr, eval_state, self.source_descriptions
)
except Exception:
val = None
return "" if val is None else str(val)

resolved_left = re.sub(r"\{(\$[^}]+)\}", _eval_braced, resolved_left)

# Resolve a source name: prefer explicit $sourceDescriptions.<name>,
# otherwise match by declared url or exact name
source_name = None
if resolved_left.startswith("$sourceDescriptions."):
parts = resolved_left.split(".", 2)
source_name = parts[1] if len(parts) >= 2 else None
else:
source_name = next(
(
name
for name, desc in self.source_descriptions.items()
if desc.get("url") and (
desc.get("url") == resolved_left
or resolved_left.endswith(desc.get("url"))
or desc.get("url") in resolved_left
or resolved_left in desc.get("url")
)
),
None,
)
Comment on lines +834 to +845
Copy link

Copilot AI Aug 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The URL matching logic is overly permissive and could lead to false positives. Using substring matching (in operator) on both directions could match unintended sources. For example, if one source has URL "api.com" and another has "myapi.com", both would match "api.com". Consider using more precise matching logic such as exact matches or proper URL parsing.

Copilot uses AI. Check for mistakes.
if source_name is None and resolved_left in self.source_descriptions:
source_name = resolved_left

op_info = self.find_by_path(source_name or resolved_left, json_pointer)
if op_info:
operations.append(op_info)
return operations
102 changes: 102 additions & 0 deletions runner/tests/executor/test_operation_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,5 +371,107 @@ def test_get_security_requirements_for_openapi_operation_basic():
]


def test_find_by_id_with_source_descriptions_reference():
"""Ensure operationId references using $sourceDescriptions.<name>.<op> resolve to the correct source."""
source_descriptions = {
"pet-coupons": {
"servers": [{"url": "http://pet.example"}],
"paths": {
"/pet/{petId}/coupons": {
"get": {"operationId": "findPetsByTags", "summary": "Find pets"}
}
},
},
"other": {
"servers": [{"url": "http://other.example"}],
"paths": {"/items": {"get": {"operationId": "listItems"}}},
},
}

finder = OperationFinder(source_descriptions)

op_ref = "$sourceDescriptions.pet-coupons.findPetsByTags"
op_info = finder.find_by_id(op_ref)
assert op_info is not None
assert op_info["source"] == "pet-coupons"
assert op_info["path"] == "/pet/{petId}/coupons"
assert op_info["method"] == "get"


def test_find_by_id_with_missing_source_falls_back():
"""If the referenced source doesn't exist, find_by_id should fall back to global search or return None."""
source_descriptions = {
"api": {
"servers": [{"url": "http://localhost"}],
"paths": {"/foo": {"get": {"operationId": "op1"}}},
}
}

finder = OperationFinder(source_descriptions)

# Reference a non-existent source; should not raise, and should try global search (no match -> None)
op_ref = "$sourceDescriptions.nonexistent.op1"
op_info = finder.find_by_id(op_ref)
assert op_info is None


def test_get_operations_for_workflow_with_operationPath_runtime_expression():
"""Ensure get_operations_for_workflow handles operationPath runtime expressions
that reference a sourceDescriptions entry combined with a JSON Pointer.
"""
source_descriptions = {
"pet-coupons": {
"servers": [{"url": "http://pet.example"}],
"paths": {
"/pet/findByTags": {
"get": {"operationId": "findPetsByTags", "summary": "Find pets"}
}
},
}
}

finder = OperationFinder(source_descriptions)

wf = {"steps": [{"operationPath": "$sourceDescriptions.pet-coupons#/paths/~1pet~1findByTags/get"}]}

ops = finder.get_operations_for_workflow(wf)
assert isinstance(ops, list)
assert len(ops) == 1
op = ops[0]
assert op["source"] == "pet-coupons"
assert op["path"] == "/pet/findByTags"
assert op["method"] == "get"


def test_get_operations_for_workflow_with_braced_runtime_expression():
"""Ensure get_operations_for_workflow evaluates braced runtime expressions
embedded in operationPath, e.g. '{$sourceDescriptions.pet-coupons.url}#/paths/~1pet~1findByTags/get'
"""
source_descriptions = {
"pet-coupons": {
"url": "pet.example", # using a simple url attribute to exercise .url evaluation
"servers": [{"url": "http://pet.example"}],
"paths": {
"/pet/findByTags": {
"get": {"operationId": "findPetsByTags", "summary": "Find pets"}
}
},
}
}

finder = OperationFinder(source_descriptions)

# Example operationPath with braced runtime expression referencing the sourceDescriptions url
wf = {"steps": [{"operationPath": "{$sourceDescriptions.pet-coupons.url}#/paths/~1pet~1findByTags/get"}]}

ops = finder.get_operations_for_workflow(wf)
assert isinstance(ops, list)
assert len(ops) == 1
op = ops[0]
assert op["source"] == "pet-coupons"
assert op["path"] == "/pet/findByTags"
assert op["method"] == "get"


if __name__ == "__main__":
unittest.main()