diff --git a/src/oci-cloud-mcp-server/README.md b/src/oci-cloud-mcp-server/README.md index f184fc5a..d3f4f0c4 100644 --- a/src/oci-cloud-mcp-server/README.md +++ b/src/oci-cloud-mcp-server/README.md @@ -5,6 +5,7 @@ This server provides tools to interact with Oracle Cloud Infrastructure (OCI) services using the official OCI Python SDK directly (no OCI CLI subprocess calls). It exposes generic tools that let you: - Invoke any OCI SDK client operation by fully-qualified client class and method name - Discover available operations for a given OCI client +- Inspect a specific client operation and return expected kwargs ## Running the server @@ -26,6 +27,8 @@ ORACLE_MCP_HOST= ORACLE_MCP_PORT= uvx oracle.o | --- | --- | | invoke_oci_api | Invoke an OCI Python SDK API via client and operation name. Example: client_fqn="oci.core.ComputeClient", operation="list_instances", params={"compartment_id": "ocid1.compartment.oc1..."} | | list_client_operations | List public callable operations for a given OCI client class (by fully-qualified name). | +| get_client_operation_details | Get details for one operation on a client, including `expected_kwargs` when available from SDK source. | +| list_oci_clients | List all available OCI Python SDK clients discoverable in the current environment. | ### invoke_oci_api @@ -58,9 +61,64 @@ Response (shape): ### list_client_operations - client_fqn: Fully-qualified client class name, e.g. `oci.identity.IdentityClient` +- name_regex (optional): Regex to filter operation names (Python `re` syntax; uses `re.search`). Returns a list of operations with a short summary extracted from docstrings when available. +Example usage (filtering only list operations): +```json +{ + "client_fqn": "oci.core.ComputeClient", + "name_regex": "^list_" +} +``` + +### list_oci_clients + +Returns a list of OCI SDK client classes available in the installed `oci` Python SDK. + +Example usage: +```json +{} +``` + +Response (shape): +```json +{ + "count": 2, + "clients": [ + { "client_fqn": "oci.core.ComputeClient", "module": "oci.core", "class": "ComputeClient" }, + { "client_fqn": "oci.identity.IdentityClient", "module": "oci.identity", "class": "IdentityClient" } + ] +} +``` + +### get_client_operation_details + +- client_fqn: Fully-qualified client class name, e.g. `oci.core.ComputeClient` +- operation: Client method/operation, e.g. `list_instances` + +Returns operation metadata plus `expected_kwargs` parsed from SDK method source when available. + +Example usage: +```json +{ + "client_fqn": "oci.core.ComputeClient", + "operation": "list_instances" +} +``` + +Response (shape): +```json +{ + "client_fqn": "oci.core.ComputeClient", + "operation": "list_instances", + "summary": "Lists the instances in the specified compartment.", + "params": "(self, compartment_id, **kwargs)", + "expected_kwargs": ["limit", "opc_request_id", "page", "sort_by", "sort_order"] +} +``` + ## Passing complex model parameters Many OCI SDK operations expect complex model instances (e.g., CreateVcnDetails) rather than raw dictionaries. @@ -98,6 +156,52 @@ This server now automatically constructs SDK model objects from JSON parameters - Nested dictionaries and lists inside such parameters are recursively coerced. For lists that do not obviously map to a model type, you can provide explicit hints. +- Nested fields can be provided in Python SDK snake_case; the server normalizes them to OCI wire keys + (for example: `source_type -> sourceType`, `tcp_options -> tcpOptions`, + `destination_port_range -> destinationPortRange`, `instance_options -> instanceOptions`) + before model deserialization. + +Common nested update examples: + +```json +{ + "client_fqn": "oci.core.ComputeClient", + "operation": "update_instance", + "params": { + "instance_id": "ocid1.instance.oc1..exampleuniqueID", + "update_instance_details": { + "instance_options": { + "are_legacy_imds_endpoints_disabled": true + } + } + } +} +``` + +```json +{ + "client_fqn": "oci.core.VirtualNetworkClient", + "operation": "update_security_list", + "params": { + "security_list_id": "ocid1.securitylist.oc1..exampleuniqueID", + "update_security_list_details": { + "ingress_security_rules": [ + { + "protocol": "6", + "source": "0.0.0.0/0", + "source_type": "CIDR_BLOCK", + "tcp_options": { + "destination_port_range": { + "min": 22, + "max": 22 + } + } + } + ] + } + } +} +``` Explicit model hints (optional): - __model: Simple class name in the client's models module (e.g., "CreateVcnDetails") diff --git a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/server.py b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/server.py index d8e07575..5fcb9294 100644 --- a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/server.py +++ b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/server.py @@ -7,6 +7,7 @@ import inspect import json import os +import pkgutil import re from importlib import import_module from logging import Logger @@ -29,16 +30,40 @@ invoking API clients and operations in-process (no CLI). - invoke_oci_api: Call any OCI SDK client operation by FQN and method. - list_client_operations: Discover available operations on a client. + - get_client_operation_details: Inspect one operation including expected kwargs. """, ) _user_agent_name = __project__.split("oracle.", 1)[1].split("-server", 1)[0] _ADDITIONAL_UA = f"{_user_agent_name}/{__version__}" +_STATEFUL_OPERATION_PREFIXES = ( + "create_", + "update_", + "delete_", + "launch_", + "terminate_", + "attach_", + "detach_", + "change_", + "move_", + "copy_", + "cancel_", + "restore_", + "export_", + "import_", + "start_", + "stop_", + "reboot_", + "reset_", + "patch_", + "bulk_", +) -# Backwards-compat pagination allowlist placeholder. -# Heuristic detection handles most cases; keep an empty set to avoid NameError -# in any residual fallback checks. -known_paginated: set = set() +# Backwards-compat pagination allowlist for operations whose pagination support +# is not always discoverable from signatures or generated source. +known_paginated: set = { + "get_rr_set", +} def _get_config_and_signer() -> Tuple[Dict[str, Any], Any]: @@ -97,9 +122,7 @@ def _import_client(client_fqn: str) -> Any: """ if "." not in client_fqn: raise ValueError("client_fqn must be a fully-qualified class name like 'oci.core.ComputeClient'") - module_name, class_name = client_fqn.rsplit(".", 1) - module = import_module(module_name) - cls = getattr(module, class_name) + cls = _load_client_class(client_fqn) if not inspect.isclass(cls): raise ValueError(f"{client_fqn} is not a class") config, signer = _get_config_and_signer() @@ -107,17 +130,409 @@ def _import_client(client_fqn: str) -> Any: return instance +def _load_client_class(client_fqn: str) -> type: + module_name, class_name = client_fqn.rsplit(".", 1) + module = import_module(module_name) + return getattr(module, class_name) + + +def _details_alias_keys(operation: str) -> Tuple[Optional[str], Optional[str]]: + if not operation.startswith(("create_", "update_")): + return None, None + _, _, op_rest = operation.partition("_") + if not op_rest: + return None, None + return f"{op_rest}_details", f"{operation}_details" + + +def _operation_suffix_details_alias_keys( + operation: str, +) -> Tuple[Optional[str], Optional[str]]: + if "_" not in operation: + return None, None + _, _, op_rest = operation.partition("_") + if not op_rest: + return None, None + return f"{op_rest}_details", f"{operation}_details" + + +def _apply_details_alias( + params: Dict[str, Any], operation: str, require_dst_in: Optional[set[str]] = None +) -> Dict[str, Any]: + src, dst = _details_alias_keys(operation) + if not src or not dst or src not in params or dst in params: + return dict(params) + if require_dst_in is not None and dst not in require_dst_in: + return dict(params) + updated = dict(params) + updated[dst] = updated.pop(src) + return updated + + +def _describe_callable(member: Callable[..., Any]) -> Tuple[str, str]: + try: + doc = (inspect.getdoc(member) or "").strip() + first_line = doc.splitlines()[0] if doc else "" + except Exception: + first_line = "" + + try: + params = str(inspect.signature(member)) + except Exception: + params = "" + + return first_line, params + + +def _validate_invoke_client_fqn(client_fqn: str) -> None: + if not isinstance(client_fqn, str) or not client_fqn.startswith("oci."): + raise ValueError("invoke_oci_api only allows OCI SDK client classes under the 'oci.' namespace") + if "." not in client_fqn: + raise ValueError("client_fqn must be a fully-qualified class name like 'oci.core.ComputeClient'") + class_name = client_fqn.rsplit(".", 1)[-1] + if not class_name.endswith("Client"): + raise ValueError("invoke_oci_api only allows OCI SDK client classes ending in 'Client'") + + +def _classify_invoke_error(operation: str, error: Exception) -> Dict[str, str]: + message = str(error) + lower_message = message.lower() + result = {"error": message} + + if not operation.startswith(_STATEFUL_OPERATION_PREFIXES): + return result + + if "invalid typeid provided" in lower_message or "missing 1 required positional argument" in lower_message: + result["error_category"] = "payload_shape" + result["error_hint"] = ( + "The request payload may not match the SDK shape expected by this operation. " + "Check the parameter names, required top-level details object, and nested model fields." + ) + return result + + if "incorrectstate" in lower_message or "lifecycle state" in lower_message or "not in the correct state" in lower_message: + result["error_category"] = "instance_lifecycle_state" + result["error_hint"] = ( + "The target resource may not be in a valid state for this operation. " + "Check the current resource state and retry when it is stable." + ) + return result + + shape_terms = ("shape_config", "shape config", "ocpus", "memory_in_gbs", "memoryingbs") + if "invalidparameter" in lower_message and any(term in lower_message for term in shape_terms): + result["error_category"] = "invalid_shape_config" + result["error_hint"] = ( + "One or more configuration values may be invalid for this resource. " + "Check the allowed values and constraints for the target operation." + ) + return result + + return result + + def _snake_to_camel(name: str) -> str: parts = name.split("_") return "".join(p.capitalize() for p in parts if p) +def _resolve_model_class_from_swagger_type( + swagger_type: str, models_module: Any +) -> Optional[type]: + """ + Resolve an OCI SDK model class from a swagger type string. + + Examples: + - "InstanceOptions" -> models.InstanceOptions + - "list[IngressSecurityRule]" -> models.IngressSecurityRule + - "dict(str, TcpOptions)" -> models.TcpOptions + """ + if not models_module or not isinstance(swagger_type, str): + return None + + t = swagger_type.strip() + + m_list = re.match(r"^list\[(.+)\]$", t) + if m_list: + return _resolve_model_class_from_swagger_type( + m_list.group(1).strip(), models_module + ) + + m_dict = re.match(r"^dict\(\s*[^,]+,\s*(.+)\)$", t) + if m_dict: + return _resolve_model_class_from_swagger_type( + m_dict.group(1).strip(), models_module + ) + + primitives = { + "str", + "int", + "float", + "bool", + "object", + "datetime", + "date", + } + if t in primitives: + return None + + class_name = t.rsplit(".", 1)[-1] + cls = _resolve_model_class(models_module, class_name) + return cls if inspect.isclass(cls) else None + + +def _resolve_discriminated_model_class( + swagger_type: str, payload: Dict[str, Any], models_module: Any +) -> Optional[type]: + """ + Resolve a more specific model class when payload contains a discriminator-like field. + + Example: + swagger_type = "InstanceSourceDetails", payload.source_type = "image" + -> InstanceSourceViaImageDetails + """ + if not models_module or not isinstance(payload, dict) or not isinstance(swagger_type, str): + return None + + if swagger_type == "SearchDetails": + discriminator = payload.get("type") + if not isinstance(discriminator, str): + if "query" in payload: + discriminator = "Structured" + elif "text" in payload: + discriminator = "FreeText" + known_search_types = { + "Structured": "StructuredSearchDetails", + "FreeText": "FreeTextSearchDetails", + } + cls_name = known_search_types.get(discriminator) + if cls_name: + cls = _resolve_model_class(models_module, cls_name) + if inspect.isclass(cls): + return cls + return None + + discriminator = payload.get("source_type") + if not isinstance(discriminator, str): + discriminator = payload.get("sourceType") + if not isinstance(discriminator, str): + return None + + norm = re.sub(r"[^a-zA-Z0-9]+", "_", discriminator).strip("_") + if not norm: + return None + discr_token = _snake_to_camel(norm.lower()) + + candidates: List[str] = [] + if swagger_type.endswith("Details"): + prefix = swagger_type[: -len("Details")] + candidates.append(f"{prefix}Via{discr_token}Details") + candidates.append(f"{prefix}{discr_token}Details") + + for cand in candidates: + cls = _resolve_model_class(models_module, cand) + if inspect.isclass(cls): + return cls + return None + + +def _apply_discriminator_defaults( + swagger_type: str, payload: Dict[str, Any] +) -> Dict[str, Any]: + if not isinstance(payload, dict) or not isinstance(swagger_type, str): + return payload + + updated = dict(payload) + if swagger_type == "SearchDetails" and "type" not in updated: + if "query" in updated: + updated["type"] = "Structured" + elif "text" in updated: + updated["type"] = "FreeText" + return updated + + +def _resolve_model_class_with_payload( + models_module: Any, swagger_type: str, payload: Dict[str, Any] +) -> Tuple[Optional[type], Dict[str, Any]]: + adjusted_payload = _apply_discriminator_defaults(swagger_type, payload) + cls = _resolve_discriminated_model_class(swagger_type, adjusted_payload, models_module) + if cls is None: + cls = _resolve_model_class_from_swagger_type(swagger_type, models_module) + return cls, adjusted_payload + + +def _normalize_value_for_swagger_type( + value: Any, swagger_type: str, models_module: Any, use_wire_keys: bool = False +) -> Any: + """ + Normalize nested dict/list payloads to SDK attribute_map keys based on swagger type. + """ + if not isinstance(swagger_type, str): + return value + + t = swagger_type.strip() + + if isinstance(value, list): + m_list = re.match(r"^list\[(.+)\]$", t) + if not m_list: + return value + item_type = m_list.group(1).strip() + return [ + _normalize_value_for_swagger_type( + item, item_type, models_module, use_wire_keys=use_wire_keys + ) + for item in value + ] + + if isinstance(value, dict): + value = _apply_discriminator_defaults(t, value) + m_dict = re.match(r"^dict\(\s*[^,]+,\s*(.+)\)$", t) + if m_dict: + val_type = m_dict.group(1).strip() + return { + k: _normalize_value_for_swagger_type( + v, val_type, models_module, use_wire_keys=use_wire_keys + ) + for k, v in value.items() + } + + nested_cls, value = _resolve_model_class_with_payload(models_module, t, value) + if nested_cls: + normalized = _normalize_mapping_for_model( + value, nested_cls, models_module, use_wire_keys=use_wire_keys + ) + # Prefer passing real nested model instances to parent constructors. + try: + return _construct_model_with_class(normalized, nested_cls, models_module) + except Exception: + return normalized + + return value + + +def _normalize_mapping_for_model( + payload: Dict[str, Any], + model_cls: type, + models_module: Any, + use_wire_keys: bool, +) -> Dict[str, Any]: + """ + Convert a model payload from snake_case attribute names to the SDK wire keys + defined in model_cls.attribute_map, recursively for nested model fields. + """ + if not isinstance(payload, dict): + return payload + + swagger_types, attribute_map = _get_model_schema(model_cls) + if not isinstance(swagger_types, dict): + return payload + if not isinstance(attribute_map, dict): + attribute_map = {} + + reverse_attribute_map = { + wire: attr for attr, wire in attribute_map.items() if isinstance(wire, str) + } + + normalized: Dict[str, Any] = {} + for in_key, raw_value in payload.items(): + attr_key = in_key if in_key in swagger_types else reverse_attribute_map.get(in_key) + if attr_key in swagger_types: + out_key = attribute_map.get(attr_key, in_key) if use_wire_keys else attr_key + normalized[out_key] = _normalize_value_for_swagger_type( + raw_value, + swagger_types[attr_key], + models_module, + use_wire_keys=use_wire_keys, + ) + else: + normalized[in_key] = raw_value + + return normalized + + +def _get_model_schema(model_cls: type) -> Tuple[Optional[Dict[str, str]], Optional[Dict[str, str]]]: + """ + Return (swagger_types, attribute_map) for an OCI model class. + + OCI SDK 2.160.0+ exposes these on instances, so instantiate the model + before reading its schema metadata. + """ + try: + inst = model_cls() + swagger_types = getattr(inst, "swagger_types", None) + attribute_map = getattr(inst, "attribute_map", None) + if isinstance(swagger_types, dict): + if not isinstance(attribute_map, dict): + attribute_map = {} + return swagger_types, attribute_map + except Exception: + pass + + return None, None + + +def _from_dict_if_available(model_cls: type, payload: Dict[str, Any]): + """ + Best-effort wrapper for SDKs that expose oci.util.from_dict. + OCI SDK 2.160.0 does not expose this symbol. + """ + from_dict = getattr(oci.util, "from_dict", None) + if callable(from_dict): + return from_dict(model_cls, payload) + raise AttributeError("oci.util.from_dict is not available") + + +def _to_model_attribute_map_keys( + payload: Dict[str, Any], model_cls: type, models_module: Any +) -> Dict[str, Any]: + """ + Convert to SDK wire keys (camelCase) using attribute_map. + Kept for compatibility with existing tests/helpers. + """ + return _normalize_mapping_for_model( + payload, model_cls, models_module, use_wire_keys=True + ) + + +def _to_model_attribute_keys( + payload: Dict[str, Any], model_cls: type, models_module: Any +) -> Dict[str, Any]: + """ + Convert payload keys to model attribute names (snake_case) recursively. + This is the most robust input shape for oci.util.from_dict/constructors. + """ + return _normalize_mapping_for_model( + payload, model_cls, models_module, use_wire_keys=False + ) + + def _import_models_module_from_client_fqn(client_fqn: str): try: - pkg = client_fqn.rsplit(".", 1)[0] - return import_module(f"{pkg}.models") + # Typical OCI client FQNs look like: + # oci.core.compute_client.ComputeClient + # and models live at: + # oci.core.models + parts = client_fqn.split(".") + candidates: List[str] = [] + + if len(parts) >= 4: + # preferred: drop "." + candidates.append(".".join(parts[:-2]) + ".models") + if len(parts) >= 3: + # fallback for simpler FQNs used in tests/mocks: "x.y.Client" -> "x.y.models" + candidates.append(".".join(parts[:-1]) + ".models") + + seen = set() + for mod_name in candidates: + if mod_name in seen: + continue + seen.add(mod_name) + try: + return import_module(mod_name) + except Exception: + continue except Exception: - return None + pass + return None def _resolve_model_class(models_module: Any, class_name: str): @@ -127,6 +542,63 @@ def _resolve_model_class(models_module: Any, class_name: str): return None +def _candidate_model_names_for_param( + param_name: str, operation_name: str +) -> List[str]: + names: List[str] = [] + if not isinstance(param_name, str) or not param_name: + return names + + suffixes = ("_details", "_config", "_configuration", "_source_details") + for suffix in suffixes: + if not param_name.endswith(suffix): + continue + names.append(_snake_to_camel(param_name)) + if suffix == "_details": + # add verb-specific class names only for canonical payload keys, + # e.g. vcn_details/create_vcn_details -> CreateVcnDetails. + op_verb, _, op_rest = operation_name.partition("_") + if op_verb in ("create", "update"): + if param_name in (f"{op_rest}_details", f"{operation_name}_details"): + base = _snake_to_camel(op_rest) + names.append(f"{op_verb.capitalize()}{base}Details") + break + + # de-duplicate while preserving order + seen = set() + out: List[str] = [] + for n in names: + if n not in seen: + out.append(n) + seen.add(n) + return out + + +def _operation_param_model_candidates( + method: Optional[Callable[..., Any]], operation_name: str +) -> Dict[str, List[str]]: + hints: Dict[str, List[str]] = {} + if method is None: + return hints + try: + sig = inspect.signature(method) + except Exception: + return hints + + for name, param in sig.parameters.items(): + if name in ("self", "kwargs"): + continue + if param.kind in ( + inspect.Parameter.VAR_POSITIONAL, + inspect.Parameter.VAR_KEYWORD, + ): + continue + cands = _candidate_model_names_for_param(name, operation_name) + if cands: + hints[name] = cands + return hints + + def _coerce_mapping_values( mapping: Dict[str, Any], models_module: Any, parent_prefix: Optional[str] = None ) -> Dict[str, Any]: @@ -141,15 +613,20 @@ def _coerce_mapping_values( out: Dict[str, Any] = {} for key, val in mapping.items(): if isinstance(val, dict): + if key == "source_details" or key.endswith("_source_details"): + # Keep source details as a plain mapping so parent swagger_types + + # discriminator normalization can resolve concrete models correctly. + out[key] = _coerce_mapping_values(val, models_module, parent_prefix=None) + continue candidates: List[str] = [] for s in suffixes: if key.endswith(s): base_camel = _snake_to_camel(key) - candidates.append(base_camel) # also try parent-prefixed variants like 'LaunchInstanceShapeConfigDetails' if parent_prefix: candidates.append(f"{parent_prefix}{base_camel}Details") candidates.append(f"{parent_prefix}{base_camel}") + candidates.append(base_camel) break out[key] = _construct_model_from_mapping(val, models_module, candidates) elif isinstance(val, list): @@ -193,55 +670,71 @@ def _construct_model_from_mapping( cls = getattr(mod, cls_name) if inspect.isclass(cls): try: - return oci.util.from_dict(cls, clean) + clean = _apply_discriminator_defaults(cls_name, clean) + return _build_model_instance(clean, cls, models_module) except Exception: - return cls(**clean) + pass except Exception: pass # try explicit simple class name within models module if models_module and isinstance(class_name, str): - cls = _resolve_model_class(models_module, class_name) + cls, clean = _resolve_model_class_with_payload(models_module, class_name, clean) if inspect.isclass(cls): - # filter unknown keys via swagger_types (when available) before constructing - filtered_clean = clean try: - swagger_types = getattr(cls, "swagger_types", None) - if isinstance(swagger_types, dict): - filtered_clean = {k: v for k, v in clean.items() if k in swagger_types} + return _build_model_instance(clean, cls, models_module) except Exception: - filtered_clean = clean - try: - return oci.util.from_dict(cls, filtered_clean) - except Exception: - try: - return cls(**filtered_clean) - except Exception: - pass + pass # try candidates derived from param name if models_module: for cand in candidate_classnames: - cls = _resolve_model_class(models_module, cand) + cls, clean = _resolve_model_class_with_payload(models_module, cand, clean) if inspect.isclass(cls): - # filter unknown keys before constructing (honor swagger_types when present) - filtered_clean = clean - try: - swagger_types = getattr(cls, "swagger_types", None) - if isinstance(swagger_types, dict): - filtered_clean = {k: v for k, v in clean.items() if k in swagger_types} - except Exception: - filtered_clean = clean try: - return oci.util.from_dict(cls, filtered_clean) + return _build_model_instance(clean, cls, models_module) except Exception: - try: - return cls(**filtered_clean) - except Exception: - continue + continue # fall back to original mapping return mapping -def _coerce_params_to_oci_models(client_fqn: str, operation: str, params: Dict[str, Any]) -> Dict[str, Any]: +def _construct_model_with_class(mapping: Dict[str, Any], cls: type, models_module: Any): + clean = {k: v for k, v in mapping.items() if not k.startswith("__")} + parent_prefix_hint: Optional[str] = None + cls_name = getattr(cls, "__name__", "") + if isinstance(cls_name, str) and cls_name: + if cls_name.endswith("Details"): + parent_prefix_hint = cls_name[: -len("Details")] + else: + parent_prefix_hint = cls_name + + clean = _coerce_mapping_values( + clean, models_module, parent_prefix=parent_prefix_hint + ) + + return _build_model_instance(clean, cls, models_module) + + +def _build_model_instance(mapping: Dict[str, Any], cls: type, models_module: Any): + normalized_clean = _to_model_attribute_keys(mapping, cls, models_module) + filtered_clean = normalized_clean + try: + swagger_types, _ = _get_model_schema(cls) + if isinstance(swagger_types, dict): + filtered_clean = { + k: v for k, v in normalized_clean.items() if k in swagger_types + } + except Exception: + filtered_clean = normalized_clean + try: + return _from_dict_if_available(cls, filtered_clean) + except Exception: + return cls(**filtered_clean) +def _coerce_params_to_oci_models( + client_fqn: str, + operation: str, + params: Dict[str, Any], + method: Optional[Callable[..., Any]] = None, +) -> Dict[str, Any]: """ Convert plain dict/list params to OCI model instances when appropriate. Heuristics: @@ -252,34 +745,60 @@ def _coerce_params_to_oci_models(client_fqn: str, operation: str, params: Dict[s if not params: return {} models_module = _import_models_module_from_client_fqn(client_fqn) + source_params = ( + _align_params_to_signature(method, operation, params) + if method is not None + else _apply_details_alias(params, operation) + ) + op_hints = _operation_param_model_candidates(method, operation) suffixes = ("_details", "_config", "_configuration", "_source_details") out: Dict[str, Any] = {} - for key, val in params.items(): + for key, val in source_params.items(): if isinstance(val, dict): - candidates: List[str] = [] + candidates: List[str] = list(op_hints.get(key, [])) dest_key = key for s in suffixes: if key.endswith(s): - candidates.append(_snake_to_camel(key)) + candidates.extend(_candidate_model_names_for_param(key, operation)) if s == "_details": - base = _snake_to_camel(key[: -len(s)]) - # try verb-specific model classes if this is a create_/update_ op - if operation.startswith("create_"): - candidates.append(f"Create{base}Details") - elif operation.startswith("update_"): - candidates.append(f"Update{base}Details") # rename "_details" to the SDK's expected # "create__details"/"update__details" - if operation.startswith("create_") or operation.startswith("update_"): - _, _, op_rest = operation.partition("_") - if key == f"{op_rest}_details": - # only rename to SDK-expected key when destination not already - # provided by caller - alt_key = f"{operation}_details" - if alt_key not in params: - dest_key = alt_key + src, dst = _details_alias_keys(operation) + if key == src and dst and dst not in params: + dest_key = dst break - out[dest_key] = _construct_model_from_mapping(val, models_module, candidates) + if dest_key != key: + candidates.extend(op_hints.get(dest_key, [])) + candidates.extend(_candidate_model_names_for_param(dest_key, operation)) + # de-duplicate while preserving order + seen = set() + deduped: List[str] = [] + for c in candidates: + if c not in seen: + deduped.append(c) + seen.add(c) + constructed = None + if models_module: + for cand in deduped: + cls, val = _resolve_model_class_with_payload( + models_module, cand, val + ) + if inspect.isclass(cls): + swagger_types, _ = _get_model_schema(cls) + if not isinstance(swagger_types, dict): + continue + try: + constructed = _construct_model_with_class( + val, cls, models_module + ) + break + except Exception: + continue + if constructed is None: + constructed = _construct_model_from_mapping( + val, models_module, deduped + ) + out[dest_key] = constructed elif isinstance(val, list): new_list: List[Any] = [] for item in val: @@ -296,13 +815,7 @@ def _coerce_params_to_oci_models(client_fqn: str, operation: str, params: Dict[s out[key] = val # final aliasing: if caller used "_details" and op is create_/update_, # rename to the SDK-expected "create__details"/"update__details". - if operation.startswith("create_") or operation.startswith("update_"): - _, _, op_rest = operation.partition("_") - src = f"{op_rest}_details" - dst = f"{operation}_details" - if src in out and dst not in out: - out[dst] = out.pop(src) - return out + return _apply_details_alias(out, operation) def _align_params_to_signature( @@ -316,20 +829,68 @@ def _align_params_to_signature( """ try: sig = inspect.signature(method) - param_names = set(sig.parameters.keys()) except Exception: return params - aligned = dict(params) - if operation_name.startswith("create_") or operation_name.startswith("update_"): - _, _, op_rest = operation_name.partition("_") - src = f"{op_rest}_details" - dst = f"{operation_name}_details" - if src in aligned and dst not in aligned and dst in param_names: - aligned[dst] = aligned.pop(src) + sig_params = dict(sig.parameters) + param_names = set(sig_params.keys()) + aligned = _apply_details_alias(aligned, operation_name, require_dst_in=param_names) + + expected_src, expected_dst = _operation_suffix_details_alias_keys(operation_name) + if ( + expected_src + and expected_dst + and expected_src in aligned + and expected_dst in param_names + and expected_dst not in aligned + ): + aligned[expected_dst] = aligned.pop(expected_src) + + # General fallback for SDK operations that use abbreviated id parameters + # (e.g., ig_id, rt_id). If exactly one caller *_id key does not match and + # exactly one signature *_id parameter is missing, remap it. + explicit_param_names = { + name + for name, p in sig_params.items() + if name != "self" + and p.kind + not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD) + } + unknown_id_keys = [ + k for k in aligned.keys() if k.endswith("_id") and k not in explicit_param_names + ] + missing_id_params = [ + p + for p in explicit_param_names + if p.endswith("_id") and p not in aligned + ] + if len(unknown_id_keys) == 1 and len(missing_id_params) == 1: + src_id = unknown_id_keys[0] + dst_id = missing_id_params[0] + aligned[dst_id] = aligned.pop(src_id) return aligned +def _coerce_params_for_invoke( + client_fqn: str, + operation: str, + params: Dict[str, Any], + method: Callable[..., Any], +) -> Dict[str, Any]: + """ + Call model coercion with the newer method hint when supported, while keeping + older test doubles that only accept the original 3-argument signature working. + """ + try: + return _coerce_params_to_oci_models( + client_fqn, operation, params, method=method + ) + except TypeError as e: + if "unexpected keyword argument 'method'" not in str(e): + raise + return _coerce_params_to_oci_models(client_fqn, operation, params) + + def _serialize_oci_data(data: Any) -> Any: """ Convert OCI SDK model objects or collections into JSON-serializable structures. @@ -343,6 +904,16 @@ def ensure_jsonable(obj: Any) -> Any: return [ensure_jsonable(x) for x in obj] if isinstance(obj, dict): return {k: ensure_jsonable(v) for k, v in obj.items()} + # OCI models in newer SDK versions expose swagger_types on instances. + swagger_types = getattr(obj, "swagger_types", None) + if isinstance(swagger_types, dict): + out: Dict[str, Any] = {} + for key in swagger_types.keys(): + try: + out[key] = ensure_jsonable(getattr(obj, key)) + except Exception: + continue + return out try: json.dumps(obj) return obj @@ -352,7 +923,7 @@ def ensure_jsonable(obj: Any) -> Any: try: converted = oci.util.to_dict(data) except Exception: - converted = data + converted = ensure_jsonable(data) return ensure_jsonable(converted) @@ -400,6 +971,9 @@ def _supports_pagination(method: Callable[..., Any], operation_name: str) -> boo - Method signature includes 'page' or 'limit' kwargs (explicit params). - Method accepts **kwargs and operation name indicates record/rrset style (DNS-like). """ + if operation_name.startswith(_STATEFUL_OPERATION_PREFIXES): + return False + try: if operation_name.startswith("list_"): return True @@ -411,14 +985,14 @@ def _supports_pagination(method: Callable[..., Any], operation_name: str) -> boo if ek and (("page" in ek) or ("limit" in ek)): return True - # inspect docstring for pagination-related params exposed in docs - # (e.g., ':param int limit:' or ':param str page:') try: if _docstring_mentions_pagination(method): return True except Exception: pass + if operation_name in known_paginated: + return True sig = inspect.signature(method) param_names = set(sig.parameters.keys()) if "page" in param_names or "limit" in param_names: @@ -439,43 +1013,36 @@ def _call_with_pagination_if_applicable( """ if _supports_pagination(method, operation_name): logger.info(f"Using paginator for operation {operation_name}") - response = oci.pagination.list_call_get_all_results(method, **params) - opc_request_id = None try: - opc_request_id = response.headers.get("opc-request-id") - except Exception: + response = oci.pagination.list_call_get_all_results(method, **params) opc_request_id = None - return response.data, opc_request_id + try: + opc_request_id = response.headers.get("opc-request-id") + except Exception: + opc_request_id = None + return response.data, opc_request_id + except Exception as e: + logger.warning( + f"Paginator path failed for {operation_name}; falling back to direct call: {e}" + ) # non-list operation; pre-alias known kwarg patterns and invoke, with fallback aliasing - call_params = dict(params) - if operation_name.startswith("create_") or operation_name.startswith("update_"): - _, _, op_rest = operation_name.partition("_") - src = f"{op_rest}_details" - dst = f"{operation_name}_details" - if src in call_params and dst not in call_params: - call_params[dst] = call_params.pop(src) + call_params = _apply_details_alias(params, operation_name) try: logger.debug(f"_call_with_pagination_if_applicable call_params keys: {list(call_params.keys())}") logger.debug(f"op: {operation_name}") response = method(**call_params) except TypeError as e: - # fallback: if user passed "_details" for a create_/update_ op, - # retry with "create__details"/"update__details" msg = str(e) - if "unexpected keyword argument" in msg and ( - operation_name.startswith("create_") or operation_name.startswith("update_") - ): + if "unexpected keyword argument" in msg: try: bad_kw = msg.split("'")[1] - _, _, op_rest = operation_name.partition("_") - expected_src = f"{op_rest}_details" - if bad_kw == expected_src and expected_src in call_params: - alt_key = f"{operation_name}_details" - new_params = dict(call_params) - new_params[alt_key] = new_params.pop(expected_src) - response = method(**new_params) + expected_src, _ = _details_alias_keys(operation_name) + if expected_src and bad_kw == expected_src and bad_kw in call_params: + retry_params = dict(call_params) + retry_params.pop(bad_kw, None) + response = method(**retry_params) else: raise except Exception: @@ -501,9 +1068,9 @@ def invoke_oci_api( client_fqn: Annotated[str, "Fully-qualified client class, e.g. 'oci.core.ComputeClient'"], operation: Annotated[str, "Client method/operation name, e.g. 'list_instances' or 'get_instance'"], params: Annotated[ - Dict[str, Any], + Optional[Dict[str, Any]], "Keyword arguments for the operation (JSON object). Use snake_case keys as in SDK.", - ] = {}, + ] = None, ) -> dict: """ Example: @@ -512,6 +1079,7 @@ def invoke_oci_api( params={'compartment_id': '', 'availability_domain': '...'} """ try: + _validate_invoke_client_fqn(client_fqn) client = _import_client(client_fqn) if not hasattr(client, operation): raise AttributeError(f"Operation '{operation}' not found on client '{client_fqn}'") @@ -521,42 +1089,15 @@ def invoke_oci_api( raise AttributeError(f"Attribute '{operation}' on client '{client_fqn}' is not callable") params = params or {} - # pre-normalize parameter key to the SDK-expected kw for create_/update_ ops, - # so downstream coercion and invocation consistently use the correct key. - normalized_params = dict(params) - if operation.startswith("create_") or operation.startswith("update_"): - _, _, op_rest = operation.partition("_") - src = f"{op_rest}_details" - dst = f"{operation}_details" - if src in normalized_params and dst not in normalized_params: - normalized_params[dst] = normalized_params.pop(src) - - coerced_params = _coerce_params_to_oci_models(client_fqn, operation, normalized_params) - # final kwarg aliasing at the top-level prior to invocation to ensure correct SDK kw - final_params = dict(coerced_params) - - final_params = _align_params_to_signature(method, operation, final_params) + coerced_params = _coerce_params_for_invoke( + client_fqn, operation, params, method + ) + final_params = _align_params_to_signature(method, operation, coerced_params) logger.debug(f"invoke_oci_api final_params keys: {list(final_params.keys())}") logger.debug(f"op: {operation}") - try: - data, opc_request_id = _call_with_pagination_if_applicable(method, final_params, operation) - except TypeError as e: - msg = str(e) - if "unexpected keyword argument" in msg and ( - operation.startswith("create_") or operation.startswith("update_") - ): - # last-chance aliasing retry at top level - _, _, op_rest = operation.partition("_") - src = f"{op_rest}_details" - dst = f"{operation}_details" - if src in final_params and dst not in final_params: - alt_params = dict(final_params) - alt_params[dst] = alt_params.pop(src) - data, opc_request_id = _call_with_pagination_if_applicable(method, alt_params, operation) - else: - raise - else: - raise + data, opc_request_id = _call_with_pagination_if_applicable( + method, final_params, operation + ) result = { "client": client_fqn, @@ -572,46 +1113,188 @@ def invoke_oci_api( except Exception as e: logger.error(f"Error invoking OCI API {client_fqn}.{operation}: {e}") + error_info = _classify_invoke_error(operation, e) return { "client": client_fqn, "operation": operation, "params": params or {}, - "error": str(e), + **error_info, } @mcp.tool(description="List public callable operations for a given OCI client class.") def list_client_operations( - client_fqn: Annotated[str, "Fully-qualified client class, e.g. 'oci.core.ComputeClient'"], + client_fqn: Annotated[ + str, "Fully-qualified client class, e.g. 'oci.core.ComputeClient'" + ], + name_regex: Annotated[ + Optional[str], + "Optional regex to filter operations by name (Python re syntax). Uses re.search.", + ] = None, ) -> dict: try: - module_name, class_name = client_fqn.rsplit(".", 1) - module = import_module(module_name) - cls = getattr(module, class_name) + cls = _load_client_class(client_fqn) if not inspect.isclass(cls): raise ValueError(f"{client_fqn} is not a class") + compiled: Optional[re.Pattern] = None + if name_regex: + try: + compiled = re.compile(name_regex) + except re.error as e: + raise ValueError(f"Invalid name_regex: {e}") + ops: List[dict] = [] for name, member in inspect.getmembers(cls, predicate=inspect.isfunction): if name.startswith("_"): continue - try: - doc = (inspect.getdoc(member) or "").strip() - first_line = doc.splitlines()[0] if doc else "" - params = inspect.signature(member) - except Exception: - first_line = "" - params = "" - ops.append({"name": name, "summary": first_line, "params": str(params)}) + if compiled and not compiled.search(name): + continue + first_line, params = _describe_callable(member) + ops.append({"name": name, "summary": first_line, "params": params}) - logger.info(f"Found {len(ops)} operations on {client_fqn}") + logger.info( + f"Found {len(ops)} operations on {client_fqn} (name_regex={name_regex!r})" + ) # return a mapping to avoid Pydantic RootModel list-wrapping - return {"operations": ops} + result = {"operations": ops} + if name_regex is not None: + result["name_regex"] = name_regex + return result except Exception as e: logger.error(f"Error listing operations for {client_fqn}: {e}") raise +@mcp.tool( + description="Get details for one OCI client operation, including expected_kwargs when available." +) +def get_client_operation_details( + client_fqn: Annotated[ + str, "Fully-qualified client class, e.g. 'oci.core.ComputeClient'" + ], + operation: Annotated[str, "Client method/operation name, e.g. 'list_instances'"], +) -> dict: + try: + cls = _load_client_class(client_fqn) + if not inspect.isclass(cls): + raise ValueError(f"{client_fqn} is not a class") + + if not hasattr(cls, operation): + raise AttributeError( + f"Operation '{operation}' not found on client '{client_fqn}'" + ) + + member = getattr(cls, operation) + if not callable(member): + raise AttributeError( + f"Attribute '{operation}' on client '{client_fqn}' is not callable" + ) + + first_line, params = _describe_callable(member) + + expected_kwargs = _extract_expected_kwargs_from_source(member) + if expected_kwargs is None: + expected_kwargs_list = [] + else: + expected_kwargs_list = sorted(expected_kwargs) + + return { + "client_fqn": client_fqn, + "operation": operation, + "summary": first_line, + "params": params, + "expected_kwargs": expected_kwargs_list, + } + except Exception as e: + logger.error( + f"Error getting operation details for {client_fqn}.{operation}: {e}" + ) + raise + + +def _discover_oci_clients() -> List[dict]: + """Discover available OCI Python SDK client classes. + + Returns a list of entries of the shape: + { + "client_fqn": "oci.core.ComputeClient", + "module": "oci.core", + "class": "ComputeClient" + } + + Detection: + - Walk submodules under `oci`. + - Import only modules whose name ends with `_client` (OCI SDK convention) + to avoid importing every single models module. + - Collect any public class ending with `Client`. + """ + clients: List[dict] = [] + + try: + def iter_oci_submodules(): + # walk packages recursively under `oci` but only import *_client modules + for modinfo in pkgutil.walk_packages(oci.__path__, prefix="oci."): + name = getattr(modinfo, "name", None) or modinfo[1] + if not isinstance(name, str): + continue + if not name.endswith("_client"): + continue + yield name + + seen: set[str] = set() + for module_name in iter_oci_submodules(): + try: + module = import_module(module_name) + except Exception: + continue + + for cls_name, member in inspect.getmembers(module, predicate=inspect.isclass): + if cls_name.startswith("_"): + continue + if cls_name == "BaseClient": + continue + if not cls_name.endswith("Client"): + continue + try: + src = inspect.getsource(member.__init__) + if "self.base_client" not in src: + continue + except Exception: + # If we can't inspect source, keep the client-like class rather than + # dropping a valid OCI client from discovery. + pass + + client_module = getattr(member, "__module__", None) + if not isinstance(client_module, str) or not client_module: + client_module = module_name + client_fqn = f"{client_module}.{cls_name}" + if client_fqn in seen: + continue + seen.add(client_fqn) + clients.append( + { + "client_fqn": client_fqn, + "module": module_name, + "class": cls_name, + } + ) + except Exception as e: + logger.error(f"Error discovering OCI SDK clients: {e}") + raise + + # Stable output for tests and user experience + clients.sort(key=lambda c: c["client_fqn"]) + return clients + + +@mcp.tool(description="List all available clients in the installed OCI Python SDK.") +def list_oci_clients() -> dict: + """Return a list of discoverable OCI SDK client classes.""" + clients = _discover_oci_clients() + return {"count": len(clients), "clients": clients} + + def main(): host = os.getenv("ORACLE_MCP_HOST") port = os.getenv("ORACLE_MCP_PORT") diff --git a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_bootstrap_and_introspection.py b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_bootstrap_and_introspection.py index 296527b7..7b7f6986 100644 --- a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_bootstrap_and_introspection.py +++ b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_bootstrap_and_introspection.py @@ -239,6 +239,25 @@ def create_vcn(vcn_details): # noqa: ARG001 out = _align_params_to_signature(create_vcn, "create_vcn", params) assert out == params + def test_does_not_remap_unrelated_details_key(self): + def create_vcn(create_vcn_details): # noqa: ARG001 + return None + + params = {"subnet_details": {"x": 1}} + out = _align_params_to_signature(create_vcn, "create_vcn", params) + assert out == params + + def test_remaps_non_create_update_operation_details_from_operation_suffix(self): + def move_compartment(move_compartment_details): # noqa: ARG001 + return None + + params = {"compartment_details": {"x": 1}} + out = _align_params_to_signature( + move_compartment, "move_compartment", params + ) + assert "move_compartment_details" in out + assert "compartment_details" not in out + class TestGetConfigAndSignerMoreBranches: def test_token_signer_build_failure_falls_back_to_api_key(self, monkeypatch): diff --git a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_edge_cases.py b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_edge_cases.py index dcf63231..af5d15b9 100644 --- a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_edge_cases.py +++ b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_edge_cases.py @@ -4,6 +4,7 @@ from fastmcp import Client from fastmcp.exceptions import ToolError from oracle.oci_cloud_mcp_server.server import ( + _align_params_to_signature, _call_with_pagination_if_applicable, _coerce_params_to_oci_models, _construct_model_from_mapping, @@ -11,6 +12,8 @@ _resolve_model_class, _serialize_oci_data, _supports_pagination, + _to_model_attribute_map_keys, + invoke_oci_api, mcp, ) @@ -38,6 +41,29 @@ def raising_from_dict(cls, data): # noqa: ARG001 assert isinstance(inst, MyDetails) assert inst._data == {"a": 1} + def test_candidate_ctor_works_without_from_dict_symbol(self, monkeypatch): + class MyDetails: + def __init__(self, **kwargs): + self.swagger_types = {"a": "int"} + self.attribute_map = {"a": "a"} + self._data = dict(kwargs) + + fake_models = SimpleNamespace(MyDetails=MyDetails) + from oracle.oci_cloud_mcp_server.server import oci as _oci + + monkeypatch.setattr( + _oci.util, + "from_dict", + lambda cls, data: (_ for _ in ()).throw(Exception("unexpected")), # noqa: ARG005 + raising=False, + ) + + inst = _construct_model_from_mapping( + {"a": 1, "b": 2}, fake_models, ["MyDetails"] + ) + assert isinstance(inst, MyDetails) + assert inst._data == {"a": 1} + class TestCoerceTopLevelConfigSuffix: def test_top_level_config_suffix_constructs_model(self, monkeypatch): @@ -98,7 +124,7 @@ def create_vcn(self, create_vcn_details): # noqa: ARG001 await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "create_vcn", "params": {"vcn_details": {"k": 42}}, }, @@ -110,6 +136,218 @@ def create_vcn(self, create_vcn_details): # noqa: ARG001 assert res["data"]["k"] == 42 +class TestNestedSnakeCaseNormalization: + def test_model_attribute_map_normalizes_nested_snake_case(self): + class PortRange: + def __init__(self, **kwargs): + self.swagger_types = {"min": "int", "max": "int"} + self.attribute_map = {"min": "min", "max": "max"} + self.kw = dict(kwargs) + + class TcpOptions: + def __init__(self, **kwargs): + self.swagger_types = {"destination_port_range": "PortRange"} + self.attribute_map = { + "destination_port_range": "destinationPortRange" + } + self.kw = dict(kwargs) + + class IngressSecurityRule: + def __init__(self, **kwargs): + self.swagger_types = { + "protocol": "str", + "source": "str", + "source_type": "str", + "tcp_options": "TcpOptions", + } + self.attribute_map = { + "protocol": "protocol", + "source": "source", + "source_type": "sourceType", + "tcp_options": "tcpOptions", + } + self.kw = dict(kwargs) + + class UpdateSecurityListDetails: + def __init__(self, **kwargs): + self.swagger_types = { + "ingress_security_rules": "list[IngressSecurityRule]" + } + self.attribute_map = { + "ingress_security_rules": "ingressSecurityRules" + } + self.kw = dict(kwargs) + + fake_models = SimpleNamespace( + PortRange=PortRange, + TcpOptions=TcpOptions, + IngressSecurityRule=IngressSecurityRule, + UpdateSecurityListDetails=UpdateSecurityListDetails, + ) + + out = _to_model_attribute_map_keys( + { + "ingress_security_rules": [ + { + "protocol": "6", + "source": "0.0.0.0/0", + "source_type": "CIDR_BLOCK", + "tcp_options": { + "destination_port_range": {"min": 22, "max": 22} + }, + } + ] + }, + UpdateSecurityListDetails, + fake_models, + ) + + assert "ingressSecurityRules" in out + rule = out["ingressSecurityRules"][0] + assert isinstance(rule, IngressSecurityRule) + assert "source_type" in rule.kw + assert "tcp_options" in rule.kw + assert isinstance(rule.kw["tcp_options"], TcpOptions) + assert "destination_port_range" in rule.kw["tcp_options"].kw + + def test_model_attribute_map_normalizes_route_rule_network_entity_id(self): + class RouteRule: + swagger_types = { + "destination": "str", + "destination_type": "str", + "network_entity_id": "str", + } + attribute_map = { + "destination": "destination", + "destination_type": "destinationType", + "network_entity_id": "networkEntityId", + } + + class UpdateRouteTableDetails: + swagger_types = {"route_rules": "list[RouteRule]"} + attribute_map = {"route_rules": "routeRules"} + + fake_models = SimpleNamespace( + RouteRule=RouteRule, UpdateRouteTableDetails=UpdateRouteTableDetails + ) + + out = _to_model_attribute_map_keys( + { + "route_rules": [ + { + "destination": "0.0.0.0/0", + "destination_type": "CIDR_BLOCK", + "network_entity_id": "ocid1.internetgateway.oc1..example", + } + ] + }, + UpdateRouteTableDetails, + fake_models, + ) + + assert "routeRules" in out + rr = out["routeRules"][0] + assert rr["destinationType"] == "CIDR_BLOCK" + assert rr["networkEntityId"] == "ocid1.internetgateway.oc1..example" + + def test_model_attribute_map_normalizes_launch_instance_source_details(self): + class InstanceSourceViaImageDetails: + swagger_types = {"source_type": "str", "image_id": "str"} + attribute_map = {"source_type": "sourceType", "image_id": "imageId"} + + class LaunchInstanceShapeConfigDetails: + swagger_types = {"ocpus": "float", "memory_in_gbs": "float"} + attribute_map = {"ocpus": "ocpus", "memory_in_gbs": "memoryInGBs"} + + class CreateVnicDetails: + swagger_types = {"subnet_id": "str", "assign_public_ip": "bool"} + attribute_map = {"subnet_id": "subnetId", "assign_public_ip": "assignPublicIp"} + + class LaunchInstanceDetails: + swagger_types = { + "availability_domain": "str", + "compartment_id": "str", + "shape": "str", + "shape_config": "LaunchInstanceShapeConfigDetails", + "source_details": "InstanceSourceViaImageDetails", + "create_vnic_details": "CreateVnicDetails", + } + attribute_map = { + "availability_domain": "availabilityDomain", + "compartment_id": "compartmentId", + "shape": "shape", + "shape_config": "shapeConfig", + "source_details": "sourceDetails", + "create_vnic_details": "createVnicDetails", + } + + fake_models = SimpleNamespace( + LaunchInstanceDetails=LaunchInstanceDetails, + InstanceSourceViaImageDetails=InstanceSourceViaImageDetails, + LaunchInstanceShapeConfigDetails=LaunchInstanceShapeConfigDetails, + CreateVnicDetails=CreateVnicDetails, + ) + + out = _to_model_attribute_map_keys( + { + "availability_domain": "AD-1", + "compartment_id": "ocid1.compartment.oc1..example", + "shape": "VM.Standard.A1.Flex", + "shape_config": {"ocpus": 1.0, "memory_in_gbs": 6.0}, + "source_details": { + "source_type": "image", + "image_id": "ocid1.image.oc1..example", + }, + "create_vnic_details": { + "subnet_id": "ocid1.subnet.oc1..example", + "assign_public_ip": True, + }, + }, + LaunchInstanceDetails, + fake_models, + ) + + assert out["availabilityDomain"] == "AD-1" + assert out["compartmentId"] == "ocid1.compartment.oc1..example" + assert out["shapeConfig"]["memoryInGBs"] == 6.0 + assert out["sourceDetails"]["sourceType"] == "image" + assert out["sourceDetails"]["imageId"] == "ocid1.image.oc1..example" + assert out["createVnicDetails"]["assignPublicIp"] is True + + def test_model_attribute_map_uses_source_details_discriminator(self): + class InstanceSourceDetails: + swagger_types = {"source_type": "str"} + attribute_map = {"source_type": "sourceType"} + + class InstanceSourceViaImageDetails: + swagger_types = {"source_type": "str", "image_id": "str"} + attribute_map = {"source_type": "sourceType", "image_id": "imageId"} + + class LaunchInstanceDetails: + swagger_types = {"source_details": "InstanceSourceDetails"} + attribute_map = {"source_details": "sourceDetails"} + + fake_models = SimpleNamespace( + InstanceSourceDetails=InstanceSourceDetails, + InstanceSourceViaImageDetails=InstanceSourceViaImageDetails, + LaunchInstanceDetails=LaunchInstanceDetails, + ) + + out = _to_model_attribute_map_keys( + { + "source_details": { + "source_type": "image", + "image_id": "ocid1.image.oc1..example", + } + }, + LaunchInstanceDetails, + fake_models, + ) + + assert out["sourceDetails"]["sourceType"] == "image" + assert out["sourceDetails"]["imageId"] == "ocid1.image.oc1..example" + + class TestListClientOperationsImportErrorTool: @pytest.mark.asyncio async def test_list_client_operations_import_error(self, monkeypatch): @@ -148,7 +386,7 @@ def get_list(self): res = ( await client.call_tool( "invoke_oci_api", - {"client_fqn": "x.y.FakeClient", "operation": "get_list"}, + {"client_fqn": "oci.fake.FakeClient", "operation": "get_list"}, ) ).data @@ -303,6 +541,412 @@ def __init__(self, **kwargs): assert isinstance(out["update_vcn_details"], UpdateVcnDetails) assert out["update_vcn_details"].kw == {"a": 2} + def test_method_hint_prefers_create_model_over_generic(self, monkeypatch): + class VcnDetails: + def __init__(self, **kwargs): + raise Exception("generic should not be selected") + + class CreateVcnDetails: + def __init__(self, **kwargs): + self.kw = dict(kwargs) + + fake_models = SimpleNamespace( + VcnDetails=VcnDetails, + CreateVcnDetails=CreateVcnDetails, + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_models_module_from_client_fqn", + lambda fqn: fake_models, + ) + + def create_vcn(create_vcn_details): # noqa: ARG001 + return None + + out = _coerce_params_to_oci_models( + "x.y.Client", + "create_vcn", + {"vcn_details": {"a": 1}}, + method=create_vcn, + ) + assert isinstance(out["create_vcn_details"], CreateVcnDetails) + assert out["create_vcn_details"].kw == {"a": 1} + + +class TestSchemaDrivenCoercion: + def test_update_route_table_uses_instance_level_model_schema(self, monkeypatch): + class RouteRule: + def __init__(self, **kwargs): + self.swagger_types = { + "destination": "str", + "destination_type": "str", + "network_entity_id": "str", + } + self.attribute_map = { + "destination": "destination", + "destination_type": "destinationType", + "network_entity_id": "networkEntityId", + } + self.kw = dict(kwargs) + + class UpdateRouteTableDetails: + def __init__(self, **kwargs): + self.swagger_types = {"route_rules": "list[RouteRule]"} + self.attribute_map = {"route_rules": "routeRules"} + self.kw = dict(kwargs) + + fake_models = SimpleNamespace( + RouteRule=RouteRule, + UpdateRouteTableDetails=UpdateRouteTableDetails, + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_models_module_from_client_fqn", + lambda fqn: fake_models, + ) + + out = _coerce_params_to_oci_models( + "x.y.Client", + "update_route_table", + { + "update_route_table_details": { + "route_rules": [ + { + "destination": "0.0.0.0/0", + "destination_type": "CIDR_BLOCK", + "network_entity_id": "ocid1.internetgateway.oc1..example", + } + ] + } + }, + ) + details = out["update_route_table_details"] + assert isinstance(details, UpdateRouteTableDetails) + rr = details.kw["route_rules"][0] + if isinstance(rr, dict): + assert rr["destination_type"] == "CIDR_BLOCK" + assert rr["network_entity_id"] == "ocid1.internetgateway.oc1..example" + else: + assert isinstance(rr, RouteRule) + assert rr.kw["destination_type"] == "CIDR_BLOCK" + assert rr.kw["network_entity_id"] == "ocid1.internetgateway.oc1..example" + + def test_update_route_table_with_real_sdk_models(self, monkeypatch): + from oci.core import models as core_models + + # OCI SDK 2.160.0 stores swagger_types/attribute_map on instances. + assert not hasattr(core_models.UpdateRouteTableDetails, "swagger_types") + + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_models_module_from_client_fqn", + lambda fqn: core_models, + ) + + out = _coerce_params_to_oci_models( + "oci.core.VirtualNetworkClient", + "update_route_table", + { + "update_route_table_details": { + "route_rules": [ + { + "destination": "0.0.0.0/0", + "destination_type": "CIDR_BLOCK", + "network_entity_id": "ocid1.internetgateway.oc1..example", + } + ] + } + }, + ) + details = out["update_route_table_details"] + assert isinstance(details, core_models.UpdateRouteTableDetails) + rr = details.route_rules[0] + rr_neid = rr["network_entity_id"] if isinstance(rr, dict) else rr.network_entity_id + assert rr_neid == "ocid1.internetgateway.oc1..example" + + def test_launch_instance_with_real_sdk_models_builds_typed_source_details( + self, monkeypatch + ): + from oci.core import models as core_models + + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_models_module_from_client_fqn", + lambda fqn: core_models, + ) + + out = _coerce_params_to_oci_models( + "oci.core.ComputeClient", + "launch_instance", + { + "launch_instance_details": { + "availability_domain": "AD-1", + "compartment_id": "ocid1.compartment.oc1..example", + "shape": "VM.Standard.A1.Flex", + "shape_config": {"ocpus": 1, "memory_in_gbs": 6}, + "source_details": { + "source_type": "image", + "image_id": "ocid1.image.oc1..example", + }, + "create_vnic_details": { + "subnet_id": "ocid1.subnet.oc1..example", + "assign_public_ip": True, + }, + } + }, + ) + + details = out["launch_instance_details"] + assert isinstance(details, core_models.LaunchInstanceDetails) + assert isinstance(details.shape_config, core_models.LaunchInstanceShapeConfigDetails) + assert isinstance(details.create_vnic_details, core_models.CreateVnicDetails) + assert isinstance( + details.source_details, core_models.InstanceSourceViaImageDetails + ) + assert details.source_details.source_type == "image" + + def test_update_route_table_plain_snake_case_coerces_nested_route_rules( + self, monkeypatch + ): + class RouteRule: + swagger_types = { + "destination": "str", + "destination_type": "str", + "network_entity_id": "str", + } + attribute_map = { + "destination": "destination", + "destination_type": "destinationType", + "network_entity_id": "networkEntityId", + } + + def __init__(self, **kwargs): + self.kw = dict(kwargs) + + class UpdateRouteTableDetails: + swagger_types = {"route_rules": "list[RouteRule]"} + attribute_map = {"route_rules": "routeRules"} + + def __init__(self, **kwargs): + self.kw = dict(kwargs) + + fake_models = SimpleNamespace( + RouteRule=RouteRule, + UpdateRouteTableDetails=UpdateRouteTableDetails, + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_models_module_from_client_fqn", + lambda fqn: fake_models, + ) + from oracle.oci_cloud_mcp_server.server import oci as _oci + + monkeypatch.setattr( + _oci.util, "from_dict", lambda cls, data: cls(**data), raising=False + ) + + out = _coerce_params_to_oci_models( + "x.y.Client", + "update_route_table", + { + "update_route_table_details": { + "route_rules": [ + { + "destination": "0.0.0.0/0", + "destination_type": "CIDR_BLOCK", + "network_entity_id": "ocid1.internetgateway.oc1..example", + } + ] + } + }, + ) + details = out["update_route_table_details"] + assert isinstance(details, UpdateRouteTableDetails) + rr = details.kw["route_rules"][0] + if isinstance(rr, dict): + assert rr["destination_type"] == "CIDR_BLOCK" + assert rr["network_entity_id"] == "ocid1.internetgateway.oc1..example" + else: + assert isinstance(rr, RouteRule) + assert rr.kw["destination_type"] == "CIDR_BLOCK" + assert rr.kw["network_entity_id"] == "ocid1.internetgateway.oc1..example" + + def test_launch_instance_plain_snake_case_no_explicit_model_hints(self, monkeypatch): + class LaunchInstanceShapeConfigDetails: + swagger_types = {"ocpus": "float", "memory_in_gbs": "float"} + attribute_map = {"ocpus": "ocpus", "memory_in_gbs": "memoryInGBs"} + + def __init__(self, **kwargs): + self.kw = dict(kwargs) + + class InstanceSourceViaImageDetails: + swagger_types = {"source_type": "str", "image_id": "str"} + attribute_map = {"source_type": "sourceType", "image_id": "imageId"} + + def __init__(self, **kwargs): + self.kw = dict(kwargs) + + class CreateVnicDetails: + swagger_types = {"subnet_id": "str", "assign_public_ip": "bool"} + attribute_map = {"subnet_id": "subnetId", "assign_public_ip": "assignPublicIp"} + + def __init__(self, **kwargs): + self.kw = dict(kwargs) + + class LaunchInstanceDetails: + swagger_types = { + "availability_domain": "str", + "compartment_id": "str", + "shape": "str", + "shape_config": "LaunchInstanceShapeConfigDetails", + "source_details": "InstanceSourceDetails", + "create_vnic_details": "CreateVnicDetails", + "metadata": "dict(str, str)", + } + attribute_map = { + "availability_domain": "availabilityDomain", + "compartment_id": "compartmentId", + "shape": "shape", + "shape_config": "shapeConfig", + "source_details": "sourceDetails", + "create_vnic_details": "createVnicDetails", + "metadata": "metadata", + } + + def __init__(self, **kwargs): + self.kw = dict(kwargs) + + fake_models = SimpleNamespace( + LaunchInstanceDetails=LaunchInstanceDetails, + LaunchInstanceShapeConfigDetails=LaunchInstanceShapeConfigDetails, + InstanceSourceViaImageDetails=InstanceSourceViaImageDetails, + CreateVnicDetails=CreateVnicDetails, + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_models_module_from_client_fqn", + lambda fqn: fake_models, + ) + from oracle.oci_cloud_mcp_server.server import oci as _oci + + monkeypatch.setattr( + _oci.util, "from_dict", lambda cls, data: cls(**data), raising=False + ) + + out = _coerce_params_to_oci_models( + "x.y.Client", + "launch_instance", + { + "launch_instance_details": { + "availability_domain": "AD-1", + "compartment_id": "ocid1.compartment.oc1..example", + "shape": "VM.Standard.A1.Flex", + "shape_config": {"ocpus": 1, "memory_in_gbs": 6}, + "source_details": { + "source_type": "image", + "image_id": "ocid1.image.oc1..example", + }, + "create_vnic_details": { + "subnet_id": "ocid1.subnet.oc1..example", + "assign_public_ip": True, + }, + "metadata": {"ssh_authorized_keys": "ssh-rsa AAA..."}, + } + }, + ) + + details = out["launch_instance_details"] + assert isinstance(details, LaunchInstanceDetails) + shape_cfg = details.kw["shape_config"] + src = details.kw["source_details"] + vnic = details.kw["create_vnic_details"] + + shape_mem = ( + shape_cfg["memory_in_gbs"] + if isinstance(shape_cfg, dict) + else shape_cfg.kw["memory_in_gbs"] + ) + src_type = src["source_type"] if isinstance(src, dict) else src.kw["source_type"] + src_img = src["image_id"] if isinstance(src, dict) else src.kw["image_id"] + assign_public = ( + vnic["assign_public_ip"] if isinstance(vnic, dict) else vnic.kw["assign_public_ip"] + ) + + assert shape_mem == 6 + assert src_type == "image" + assert src_img == "ocid1.image.oc1..example" + assert assign_public is True + if not isinstance(src, dict): + assert isinstance(src, InstanceSourceViaImageDetails) + + def test_search_resources_plain_query_defaults_to_structured_details( + self, monkeypatch + ): + from oci.resource_search import models as search_models + + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_models_module_from_client_fqn", + lambda fqn: search_models, + ) + + out = _coerce_params_to_oci_models( + "oci.resource_search.ResourceSearchClient", + "search_resources", + {"search_details": {"query": "query instance resources"}}, + ) + + details = out["search_details"] + assert isinstance(details, search_models.StructuredSearchDetails) + assert details.type == "Structured" + assert details.query == "query instance resources" + + def test_invoke_launch_instance_plain_dict_payload_coerces_models( + self, monkeypatch + ): + from oci.core import models as core_models + + captured: dict = {} + + class FakeComputeClient: + def launch_instance(self, launch_instance_details, **kwargs): # noqa: ARG002 + captured["details"] = launch_instance_details + return SimpleNamespace( + data={"id": "ocid1.instance.oc1..example"}, + headers={"opc-request-id": "req-123"}, + ) + + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_client", + lambda client_fqn: FakeComputeClient(), + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_models_module_from_client_fqn", + lambda fqn: core_models, + ) + + result = invoke_oci_api.fn( + "oci.core.ComputeClient", + "launch_instance", + { + "launch_instance_details": { + "availability_domain": "AD-1", + "compartment_id": "ocid1.compartment.oc1..example", + "display_name": "test", + "shape": "VM.Standard.A1.Flex", + "shape_config": {"ocpus": 1, "memory_in_gbs": 6}, + "source_details": { + "source_type": "image", + "image_id": "ocid1.image.oc1..example", + }, + "create_vnic_details": { + "subnet_id": "ocid1.subnet.oc1..example", + "assign_public_ip": True, + }, + } + }, + ) + + details = captured["details"] + assert isinstance(details, core_models.LaunchInstanceDetails) + assert isinstance(details.shape_config, core_models.LaunchInstanceShapeConfigDetails) + assert isinstance(details.create_vnic_details, core_models.CreateVnicDetails) + assert isinstance(details.source_details, core_models.InstanceSourceViaImageDetails) + assert result["opc_request_id"] == "req-123" + class TestListElementClassFqn: def test_list_item_with_class_fqn_constructs(self, monkeypatch): @@ -340,6 +984,21 @@ def test_list_of_scalars_passthrough(self): assert out["items"] == [1, 2, 3] +class TestSignatureAlignment: + def test_align_maps_single_unknown_id_to_single_expected_id(self): + def delete_internet_gateway(ig_id, **kwargs): # noqa: ARG001 + return None + + out = _align_params_to_signature( + delete_internet_gateway, + "delete_internet_gateway", + {"internet_gateway_id": "ocid1.internetgateway.oc1..example"}, + ) + assert "ig_id" in out + assert out["ig_id"] == "ocid1.internetgateway.oc1..example" + assert "internet_gateway_id" not in out + + class TestFqnCtorFallback: def test_construct_model_from_fqn_from_dict_failure_uses_ctor(self, monkeypatch): # module and class available by FQN @@ -443,6 +1102,256 @@ def __init__(self, **kwargs): assert isinstance(inst._data["shape_config"], InstanceShapeConfig) assert inst._data["shape_config"]._data["ocpus"] == 1 + def test_parent_prefix_preferred_over_generic_nested_candidate( + self, monkeypatch + ): + class LaunchInstanceDetails: + swagger_types = {"shape_config": "LaunchInstanceShapeConfigDetails"} + attribute_map = {"shape_config": "shapeConfig"} + + def __init__(self, **kwargs): + self._data = dict(kwargs) + + class ShapeConfig: + def __init__(self, **kwargs): + raise Exception("generic should not be used") + + class LaunchInstanceShapeConfigDetails: + def __init__(self, **kwargs): + self._data = dict(kwargs) + + fake_models = SimpleNamespace( + LaunchInstanceDetails=LaunchInstanceDetails, + ShapeConfig=ShapeConfig, + LaunchInstanceShapeConfigDetails=LaunchInstanceShapeConfigDetails, + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_models_module_from_client_fqn", + lambda fqn: fake_models, + ) + from oracle.oci_cloud_mcp_server.server import oci as _oci + + monkeypatch.setattr( + _oci.util, "from_dict", lambda cls, data: cls(**data), raising=False + ) + + out = _coerce_params_to_oci_models( + "x.y.Fake", + "launch_instance", + {"launch_instance_details": {"shape_config": {"ocpus": 1}}}, + ) + inst = out["launch_instance_details"] + assert isinstance(inst, LaunchInstanceDetails) + shape_cfg = inst._data["shape_config"] + if isinstance(shape_cfg, dict): + assert shape_cfg["ocpus"] == 1 + else: + assert isinstance(shape_cfg, LaunchInstanceShapeConfigDetails) + assert shape_cfg._data["ocpus"] == 1 + + def test_parent_prefix_avoids_generic_shape_config_in_construct_with_class( + self, monkeypatch + ): + class LaunchInstanceDetails: + swagger_types = {"shape_config": "LaunchInstanceShapeConfigDetails"} + attribute_map = {"shape_config": "shapeConfig"} + + def __init__(self, **kwargs): + self._data = dict(kwargs) + + class ShapeConfig: + # Generic class that accepts kwargs; prior behavior could select this, + # leading to malformed launch payloads for real SDK requests. + def __init__(self, **kwargs): + self._data = dict(kwargs) + + class LaunchInstanceShapeConfigDetails: + swagger_types = {"ocpus": "float"} + attribute_map = {"ocpus": "ocpus"} + + def __init__(self, **kwargs): + self._data = dict(kwargs) + + fake_models = SimpleNamespace( + LaunchInstanceDetails=LaunchInstanceDetails, + ShapeConfig=ShapeConfig, + LaunchInstanceShapeConfigDetails=LaunchInstanceShapeConfigDetails, + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_models_module_from_client_fqn", + lambda fqn: fake_models, + ) + from oracle.oci_cloud_mcp_server.server import _construct_model_with_class + from oracle.oci_cloud_mcp_server.server import oci as _oci + + monkeypatch.setattr( + _oci.util, "from_dict", lambda cls, data: cls(**data), raising=False + ) + + inst = _construct_model_with_class( + {"shape_config": {"ocpus": 1}}, + LaunchInstanceDetails, + fake_models, + ) + assert isinstance(inst, LaunchInstanceDetails) + shape_cfg = inst._data["shape_config"] + assert isinstance(shape_cfg, LaunchInstanceShapeConfigDetails) + assert shape_cfg._data["ocpus"] == 1 + + def test_source_details_stays_mapping_for_parent_discriminator_path( + self, monkeypatch + ): + class LaunchInstanceDetails: + swagger_types = {"source_details": "InstanceSourceDetails"} + attribute_map = {"source_details": "sourceDetails"} + + def __init__(self, **kwargs): + self._data = dict(kwargs) + + class SourceDetails: + # intentionally wrong candidate; this should not be chosen for source_details + def __init__(self, **kwargs): + self._data = dict(kwargs) + + fake_models = SimpleNamespace( + LaunchInstanceDetails=LaunchInstanceDetails, + SourceDetails=SourceDetails, + ) + + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_models_module_from_client_fqn", + lambda fqn: fake_models, + ) + from oracle.oci_cloud_mcp_server.server import oci as _oci + + monkeypatch.setattr( + _oci.util, "from_dict", lambda cls, data: cls(**data), raising=False + ) + + out = _coerce_params_to_oci_models( + "x.y.Fake", + "launch_instance", + { + "launch_instance_details": { + "source_details": { + "source_type": "image", + "image_id": "ocid1.image.oc1..example", + } + } + }, + ) + inst = out["launch_instance_details"] + assert isinstance(inst, LaunchInstanceDetails) + src = ( + inst._data["sourceDetails"] + if "sourceDetails" in inst._data + else inst._data["source_details"] + ) + assert isinstance(src, dict) + assert src["source_type"] == "image" + assert src["image_id"] == "ocid1.image.oc1..example" + + def test_construct_model_with_class_keeps_camelcase_keys_via_normalization( + self, monkeypatch + ): + class CreateSubnetDetails: + swagger_types = { + "compartment_id": "str", + "vcn_id": "str", + "dns_label": "str", + "prohibit_public_ip_on_vnic": "bool", + } + attribute_map = { + "compartment_id": "compartmentId", + "vcn_id": "vcnId", + "dns_label": "dnsLabel", + "prohibit_public_ip_on_vnic": "prohibitPublicIpOnVnic", + } + + def __init__(self, **kwargs): + self._data = dict(kwargs) + + fake_models = SimpleNamespace(CreateSubnetDetails=CreateSubnetDetails) + from oracle.oci_cloud_mcp_server.server import _construct_model_with_class + from oracle.oci_cloud_mcp_server.server import oci as _oci + + monkeypatch.setattr( + _oci.util, "from_dict", lambda cls, data: cls(**data), raising=False + ) + + inst = _construct_model_with_class( + { + "compartment_id": "ocid1.compartment.oc1..x", + "vcnId": "ocid1.vcn.oc1..x", + "dnsLabel": "edge1", + "prohibitPublicIpOnVnic": False, + }, + CreateSubnetDetails, + fake_models, + ) + assert isinstance(inst, CreateSubnetDetails) + assert inst._data["compartment_id"] == "ocid1.compartment.oc1..x" + assert inst._data["vcn_id"] == "ocid1.vcn.oc1..x" + assert inst._data["dns_label"] == "edge1" + assert inst._data["prohibit_public_ip_on_vnic"] is False + + def test_create_subnet_alias_with_mixed_key_styles_coerces_model(self, monkeypatch): + class CreateSubnetDetails: + swagger_types = { + "compartment_id": "str", + "vcn_id": "str", + "cidr_block": "str", + "display_name": "str", + "dns_label": "str", + "prohibit_public_ip_on_vnic": "bool", + } + attribute_map = { + "compartment_id": "compartmentId", + "vcn_id": "vcnId", + "cidr_block": "cidrBlock", + "display_name": "displayName", + "dns_label": "dnsLabel", + "prohibit_public_ip_on_vnic": "prohibitPublicIpOnVnic", + } + + def __init__(self, **kwargs): + self._data = dict(kwargs) + + fake_models = SimpleNamespace(CreateSubnetDetails=CreateSubnetDetails) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_models_module_from_client_fqn", + lambda fqn: fake_models, + ) + from oracle.oci_cloud_mcp_server.server import oci as _oci + + monkeypatch.setattr( + _oci.util, "from_dict", lambda cls, data: cls(**data), raising=False + ) + + out = _coerce_params_to_oci_models( + "x.y.Client", + "create_subnet", + { + "subnet_details": { + "compartment_id": "ocid1.compartment.oc1..x", + "vcnId": "ocid1.vcn.oc1..x", + "cidr_block": "10.0.1.0/24", + "display_name": "edge-subnet", + "dnsLabel": "edge1", + "prohibitPublicIpOnVnic": False, + } + }, + ) + + details = out["create_subnet_details"] + assert isinstance(details, CreateSubnetDetails) + assert details._data["compartment_id"] == "ocid1.compartment.oc1..x" + assert details._data["vcn_id"] == "ocid1.vcn.oc1..x" + assert details._data["cidr_block"] == "10.0.1.0/24" + assert details._data["display_name"] == "edge-subnet" + assert details._data["dns_label"] == "edge1" + assert details._data["prohibit_public_ip_on_vnic"] is False + class TestDunderMainExecution: def test_running_module_as_main_calls_mcp_run(self, monkeypatch): @@ -597,7 +1506,7 @@ def get_tuple(self): # noqa: ARG002 await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_tuple", "params": {}, }, @@ -625,6 +1534,19 @@ class P: assert isinstance(out, str) assert "P" in out + def test_serialize_model_when_to_dict_returns_original_model(self, monkeypatch): + from oracle.oci_cloud_mcp_server.server import oci as _oci + + class M: + def __init__(self): + self.swagger_types = {"a": "str"} + self.attribute_map = {"a": "a"} + self.a = "x" + + monkeypatch.setattr(_oci.util, "to_dict", lambda data: data, raising=False) + out = _serialize_oci_data(M()) + assert out == {"a": "x"} + class TestInvokeGenericException: @pytest.mark.asyncio @@ -650,7 +1572,7 @@ def get_crash(self): await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_crash", "params": {}, }, @@ -661,6 +1583,63 @@ def get_crash(self): assert "boom" in res["error"] +class TestInvokeErrorClassification: + @pytest.mark.parametrize( + ("operation", "params", "error_message", "expected_category", "expected_hint"), + [ + ( + "launch_instance", + {"launch_instance_details": {"shape": "VM.Standard.A1.Flex"}}, + "Invalid typeId provided ", + "payload_shape", + "may not match the SDK shape", + ), + ( + "update_instance", + {"instance_id": "ocid1.instance.oc1..example"}, + "InvalidParameter: shape_config ocpus must be between 1 and 4", + "invalid_shape_config", + "configuration values may be invalid", + ), + ( + "terminate_instance", + {"instance_id": "ocid1.instance.oc1..example"}, + "IncorrectState: Instance is not in the correct state for this operation", + "instance_lifecycle_state", + "may not be in a valid state", + ), + ], + ) + def test_invoke_error_gets_expected_hint( + self, + monkeypatch, + operation, + params, + error_message, + expected_category, + expected_hint, + ): + class FakeClient: + def __getattr__(self, name): + if name != operation: + raise AttributeError(name) + + def _op(**kwargs): # noqa: ARG001 + raise Exception(error_message) + + return _op + + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._import_client", + lambda client_fqn: FakeClient(), + ) + + res = invoke_oci_api.fn("oci.core.ComputeClient", operation, params) + + assert res["error_category"] == expected_category + assert expected_hint in res["error_hint"] + + class TestListPaginatorHeadersMissing: def test_list_headers_object_without_get(self, monkeypatch): # ensure list_* branch handles headers object without 'get' @@ -728,7 +1707,7 @@ def get_plain(self): res = ( await client.call_tool( "invoke_oci_api", - {"client_fqn": "x.y.FakeClient", "operation": "get_plain"}, + {"client_fqn": "oci.fake.FakeClient", "operation": "get_plain"}, ) ).data @@ -785,7 +1764,7 @@ def get(self): res = ( await client.call_tool( "invoke_oci_api", - {"client_fqn": "x.y.FakeClient", "operation": "get"}, + {"client_fqn": "oci.fake.FakeClient", "operation": "get"}, ) ).data @@ -884,4 +1863,108 @@ def get_zone_records(**kwargs): # noqa: ARG001 """ return None - assert _supports_pagination(get_zone_records, "get_zone_records") is True + assert _supports_pagination(get_zone_records, "list_zone_records") is True + + def test_supports_pagination_docstring_only_false_for_create(self): + def create_vcn(**kwargs): # noqa: ARG001 + """ + Create VCN. + + :param str page: mentioned in docs but not applicable + :param int limit: mentioned in docs but not applicable + """ + return None + + assert _supports_pagination(create_vcn, "create_vcn") is False + + def test_supports_pagination_docstring_only_false_for_unlisted_mutation_verbs(self): + def move_compartment(**kwargs): # noqa: ARG001 + """ + Move compartment. + + :param str page: mentioned in docs but not applicable + :param int limit: mentioned in docs but not applicable + """ + return None + + assert _supports_pagination(move_compartment, "move_compartment") is False + + +class TestPaginationFallback: + def test_paginator_failure_falls_back_to_direct_call(self, monkeypatch): + class FakeResponse: + def __init__(self): + self.data = {"ok": True} + self.headers = {"opc-request-id": "fallback-1"} + + def list_stuff(**kwargs): # noqa: ARG001 + return FakeResponse() + + from oracle.oci_cloud_mcp_server.server import oci as _oci + + def boom(*args, **kwargs): # noqa: ARG001 + raise AttributeError("object has no attribute items") + + monkeypatch.setattr( + _oci.pagination, "list_call_get_all_results", boom, raising=False + ) + + data, opc = _call_with_pagination_if_applicable(list_stuff, {}, "list_stuff") + assert data == {"ok": True} + assert opc == "fallback-1" + + def test_paginator_failure_falls_back_for_create_operation(self, monkeypatch): + class FakeResponse: + def __init__(self): + self.data = {"id": "ocid1.vcn.oc1..example"} + self.headers = {"opc-request-id": "fallback-create-1"} + + def create_vcn(**kwargs): # noqa: ARG001 + return FakeResponse() + + from oracle.oci_cloud_mcp_server.server import oci as _oci + + def boom(*args, **kwargs): # noqa: ARG001 + raise AttributeError("'Vcn' object has no attribute 'items'") + + monkeypatch.setattr( + _oci.pagination, "list_call_get_all_results", boom, raising=False + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._supports_pagination", + lambda method, op: True, # noqa: ARG005 + ) + + data, opc = _call_with_pagination_if_applicable(create_vcn, {}, "create_vcn") + assert data["id"] == "ocid1.vcn.oc1..example" + assert opc == "fallback-create-1" + + def test_create_operation_never_uses_paginator_heuristics(self, monkeypatch): + calls = {"paginator": 0} + + class FakeResponse: + def __init__(self): + self.data = {"id": "ocid1.vcn.oc1..example"} + self.headers = {"opc-request-id": "non-paged-create-1"} + + def create_vcn(**kwargs): # noqa: ARG001 + return FakeResponse() + + from oracle.oci_cloud_mcp_server.server import oci as _oci + + def paginator(*args, **kwargs): # noqa: ARG001 + calls["paginator"] += 1 + raise AssertionError("paginator should not be used for create operations") + + monkeypatch.setattr( + _oci.pagination, "list_call_get_all_results", paginator, raising=False + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._extract_expected_kwargs_from_source", + lambda method: {"page", "limit"}, # noqa: ARG005 + ) + + data, opc = _call_with_pagination_if_applicable(create_vcn, {}, "create_vcn") + assert data["id"] == "ocid1.vcn.oc1..example" + assert opc == "non-paged-create-1" + assert calls["paginator"] == 0 diff --git a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_model_coercion.py b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_model_coercion.py index 19084015..5cfa081f 100644 --- a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_model_coercion.py +++ b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_model_coercion.py @@ -44,9 +44,9 @@ def create_vcn(self, create_vcn_details): def import_side_effect(name): # return appropriate fake module depending on requested import - if name == "x.y": + if name == "oci.fake": return fake_client_module - if name == "x.y.models": + if name == "oci.fake.models": return fake_models_module raise ImportError(name) @@ -62,7 +62,7 @@ def import_side_effect(name): # note: We intentionally pass "vcn_details" (missing the "create_") # the server should rename it to "create_vcn_details" and coerce dict to model. payload = { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "create_vcn", "params": { "vcn_details": { @@ -79,7 +79,7 @@ async def run(): result = asyncio.run(run()) print("TOOL RESULT:", result) - assert result["client"] == "x.y.FakeClient" + assert result["client"] == "oci.fake.FakeClient" assert result["operation"] == "create_vcn" # original params echoed back assert "vcn_details" in result["params"] diff --git a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_models_and_serialization.py b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_models_and_serialization.py index 58ebc62c..92e1f714 100644 --- a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_models_and_serialization.py +++ b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_models_and_serialization.py @@ -39,6 +39,24 @@ def test_failure_returns_none(self, monkeypatch): mod = _import_models_module_from_client_fqn("x.y.ClientClass") assert mod is None + def test_oci_client_fqn_resolves_service_models_module(self, monkeypatch): + calls = [] + + def fake_import(name): + calls.append(name) + if name == "oci.core.models": + return SimpleNamespace(ok=True) + raise ImportError("nope") + + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server.import_module", fake_import + ) + mod = _import_models_module_from_client_fqn( + "oci.core.compute_client.ComputeClient" + ) + assert mod is not None + assert calls[0] == "oci.core.models" + class TestResolveModelClass: def test_not_found_returns_none(self): diff --git a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_pagination_and_invocation.py b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_pagination_and_invocation.py index f83eafdf..5ed7a948 100644 --- a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_pagination_and_invocation.py +++ b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_pagination_and_invocation.py @@ -54,7 +54,7 @@ def __init__(self, config, signer): # noqa: ARG002 await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "does_not_exist", "params": {}, }, @@ -84,7 +84,7 @@ def __init__(self, config, signer): # noqa: ARG002 await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_thing", "params": {}, }, @@ -222,7 +222,7 @@ def create_vcn(self, foo): # noqa: ARG002 await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "create_vcn", "params": {"vcn_details": {"x": 1}}, }, diff --git a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_server_extras.py b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_server_extras.py index 27cc006b..ed52b43d 100644 --- a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_server_extras.py +++ b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_server_extras.py @@ -12,13 +12,25 @@ from fastmcp.exceptions import ToolError from oracle.oci_cloud_mcp_server.server import ( _ADDITIONAL_UA, + _apply_discriminator_defaults, _align_params_to_signature, + _classify_invoke_error, + _coerce_params_for_invoke, _coerce_params_to_oci_models, + _construct_model_from_mapping, + _construct_model_with_class, + _discover_oci_clients, _get_config_and_signer, _import_client, _import_models_module_from_client_fqn, + _resolve_discriminated_model_class, + _resolve_model_class_from_swagger_type, _serialize_oci_data, + _validate_invoke_client_fqn, + get_client_operation_details, + invoke_oci_api, list_client_operations, + list_oci_clients, main, mcp, ) @@ -118,6 +130,74 @@ def create_vcn(create_vcn_details): # noqa: ARG001 assert "vcn_details" not in aligned +class TestInvokeValidationHelpers: + def test_validate_invoke_client_fqn_rejects_non_oci_namespace(self): + with pytest.raises(ValueError, match="under the 'oci\\.' namespace"): + _validate_invoke_client_fqn("x.y.FakeClient") + + def test_validate_invoke_client_fqn_rejects_non_client_suffix(self): + with pytest.raises(ValueError, match="ending in 'Client'"): + _validate_invoke_client_fqn("oci.core.Compute") + + def test_classify_invoke_error_adds_payload_shape_hint_for_stateful_op(self): + out = _classify_invoke_error( + "create_vcn", TypeError("missing 1 required positional argument: 'create_vcn_details'") + ) + assert out["error_category"] == "payload_shape" + assert "payload" in out["error_hint"].lower() + + def test_classify_invoke_error_does_not_add_hint_for_read_only_op(self): + out = _classify_invoke_error("get_vcn", RuntimeError("boom")) + assert out == {"error": "boom"} + + +class TestSwaggerTypeHelpers: + def test_resolve_model_class_from_swagger_type_handles_list_and_dict(self): + class TcpOptions: + pass + + fake_models = SimpleNamespace(TcpOptions=TcpOptions) + assert _resolve_model_class_from_swagger_type("list[TcpOptions]", fake_models) is TcpOptions + assert _resolve_model_class_from_swagger_type("dict(str, TcpOptions)", fake_models) is TcpOptions + + def test_resolve_model_class_from_swagger_type_returns_none_for_primitive(self): + fake_models = SimpleNamespace() + assert _resolve_model_class_from_swagger_type("str", fake_models) is None + + def test_apply_discriminator_defaults_for_search_details(self): + assert _apply_discriminator_defaults("SearchDetails", {"query": "x"})["type"] == "Structured" + assert _apply_discriminator_defaults("SearchDetails", {"text": "x"})["type"] == "FreeText" + + def test_resolve_discriminated_model_class_handles_search_details_query(self): + class StructuredSearchDetails: + pass + + fake_models = SimpleNamespace(StructuredSearchDetails=StructuredSearchDetails) + cls = _resolve_discriminated_model_class("SearchDetails", {"query": "foo"}, fake_models) + assert cls is StructuredSearchDetails + + def test_resolve_discriminated_model_class_returns_none_for_blank_source_type(self): + fake_models = SimpleNamespace() + cls = _resolve_discriminated_model_class( + "InstanceSourceDetails", {"source_type": "!!!"}, fake_models + ) + assert cls is None + + +class TestCoerceParamsForInvoke: + def test_re_raises_typeerror_when_not_method_kwarg_issue(self, monkeypatch): + def boom(*args, **kwargs): + raise TypeError("different type error") + + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._coerce_params_to_oci_models", + boom, + ) + + with pytest.raises(TypeError, match="different type error"): + _coerce_params_for_invoke("oci.fake.FakeClient", "get_thing", {}, lambda: None) + + class TestSerializeFallback: @pytest.mark.asyncio async def test_invoke_oci_api_serializes_unjsonable_objects_to_str(self): @@ -150,7 +230,7 @@ def get_weird(self, id): # noqa: ARG002 await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_weird", "params": {"id": "abc"}, }, @@ -168,6 +248,33 @@ class X: ... assert isinstance(s, str) assert "X" in s + def test_serialize_swagger_types_skips_missing_attribute(self, monkeypatch): + from oracle.oci_cloud_mcp_server.server import oci as _oci + + monkeypatch.setattr(_oci.util, "to_dict", lambda obj: obj, raising=False) + + class Model: + swagger_types = {"present": "str", "missing": "str"} + + def __init__(self): + self.present = "ok" + + assert _serialize_oci_data(Model()) == {"present": "ok"} + + +class TestInvokeClientFqnValidation: + def test_invoke_rejects_non_oci_client_fqn_before_import(self): + with patch("oracle.oci_cloud_mcp_server.server._import_client") as m_import: + res = invoke_oci_api.fn("x.y.FakeClient", "do_thing", {}) + + assert "error" in res + assert "only allows OCI SDK client classes" in res["error"] + m_import.assert_not_called() + + def test_validate_helper_rejects_non_oci_client_fqn(self): + with pytest.raises(ValueError, match="only allows OCI SDK client classes"): + _validate_invoke_client_fqn("x.y.FakeClient") + class TestListClientOperationsErrors: @pytest.mark.asyncio @@ -316,14 +423,14 @@ def __init__(self, config, signer): await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "does_not_exist", "params": {}, }, ) ).data - assert res["client"] == "x.y.FakeClient" + assert res["client"] == "oci.fake.FakeClient" assert res["operation"] == "does_not_exist" assert "error" in res assert "not found" in res["error"].lower() @@ -350,14 +457,14 @@ def __init__(self, config, signer): await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_thing", "params": {}, }, ) ).data - assert res["client"] == "x.y.FakeClient" + assert res["client"] == "oci.fake.FakeClient" assert res["operation"] == "get_thing" assert "error" in res assert "not callable" in res["error"].lower() @@ -577,7 +684,7 @@ def get_plain(self, id): # noqa: ARG002 await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_plain", "params": {"id": "1"}, }, @@ -603,7 +710,7 @@ async def test_invoke_oci_api_import_error_surfaces_as_error_payload(self): await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.Nope", + "client_fqn": "oci.nope.NopeClient", "operation": "get_thing", "params": {}, }, @@ -704,6 +811,52 @@ def sig_raises(_): assert "operations" in res2 +class TestListClientOperationsRegexFilter: + @pytest.mark.asyncio + async def test_name_regex_filters_results(self, monkeypatch): + class Klass: + def list_things(self): + """List things.""" + + def get_thing(self): + """Get thing.""" + + fake_module = SimpleNamespace(Klass=Klass) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server.import_module", lambda name: fake_module + ) + + async with Client(mcp) as client: + res = ( + await client.call_tool( + "list_client_operations", + {"client_fqn": "x.y.Klass", "name_regex": "^list_"}, + ) + ).data + + assert res["name_regex"] == "^list_" + names = [op["name"] for op in res["operations"]] + assert names == ["list_things"] + + @pytest.mark.asyncio + async def test_invalid_name_regex_raises_tool_error(self, monkeypatch): + class Klass: + def list_things(self): + return 1 + + fake_module = SimpleNamespace(Klass=Klass) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server.import_module", lambda name: fake_module + ) + + async with Client(mcp) as client: + with pytest.raises(ToolError): + await client.call_tool( + "list_client_operations", + {"client_fqn": "x.y.Klass", "name_regex": "["}, + ) + + class TestConstructModelFallback: def test_construct_model_returns_mapping_when_no_candidates(self): from oracle.oci_cloud_mcp_server.server import _construct_model_from_mapping @@ -759,7 +912,7 @@ def get_thing(self, id): # noqa: ARG002 await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_thing", "params": {"id": "1"}, }, @@ -837,67 +990,200 @@ def foo(self, a, b=1): # noqa: ARG002 assert "params" in entry and "(" in entry["params"] and ")" in entry["params"] -class TestFromDictSuccess: - def test_construct_model_from_class_name_from_dict_success(self, monkeypatch): - # ensure oci.util.from_dict success path is exercised for __model (simple class name) - class MyModel: - def __init__(self, **kwargs): - self._data = dict(kwargs) +class TestGetClientOperationDetails: + @pytest.mark.asyncio + async def test_returns_expected_kwargs_when_present(self, monkeypatch): + class Klass: + def list_things(self, compartment_id): # noqa: ARG002 + """List things.""" - fake_models = SimpleNamespace(MyModel=MyModel) + fake_module = SimpleNamespace(Klass=Klass) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server.import_module", lambda name: fake_module + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._extract_expected_kwargs_from_source", + lambda member: {"page", "limit", "opc_request_id"}, + ) - # inject a from_dict into oci.util that calls the constructor - from oracle.oci_cloud_mcp_server.server import oci as _oci + async with Client(mcp) as client: + res = ( + await client.call_tool( + "get_client_operation_details", + {"client_fqn": "x.y.Klass", "operation": "list_things"}, + ) + ).data - monkeypatch.setattr(_oci.util, "from_dict", lambda cls, data: cls(**data), raising=False) + assert res["client_fqn"] == "x.y.Klass" + assert res["operation"] == "list_things" + assert res["summary"] == "List things." + assert "(" in res["params"] and ")" in res["params"] + assert res["expected_kwargs"] == ["limit", "opc_request_id", "page"] - from oracle.oci_cloud_mcp_server.server import _construct_model_from_mapping + @pytest.mark.asyncio + async def test_returns_empty_expected_kwargs_when_source_unavailable(self, monkeypatch): + class Klass: + def get_thing(self, id): # noqa: ARG002 + """Get thing.""" - inst = _construct_model_from_mapping({"__model": "MyModel", "a": 1}, fake_models, []) - assert isinstance(inst, MyModel) - assert inst._data == {"a": 1} + fake_module = SimpleNamespace(Klass=Klass) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server.import_module", lambda name: fake_module + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._extract_expected_kwargs_from_source", + lambda member: None, + ) - def test_construct_model_from_candidate_from_dict_success(self, monkeypatch): - # ensure success path for candidate classnames (derived from param name) - class VcnDetails: - def __init__(self, **kwargs): - self._data = dict(kwargs) + async with Client(mcp) as client: + res = ( + await client.call_tool( + "get_client_operation_details", + {"client_fqn": "x.y.Klass", "operation": "get_thing"}, + ) + ).data - fake_models = SimpleNamespace(VcnDetails=VcnDetails) + assert res["expected_kwargs"] == [] - from oracle.oci_cloud_mcp_server.server import oci as _oci + def test_direct_invalid_fqn_raises(self): + with pytest.raises(Exception): + get_client_operation_details("InvalidFqn", "list_things") - monkeypatch.setattr(_oci.util, "from_dict", lambda cls, data: cls(**data), raising=False) + def test_direct_not_callable_raises(self): + class Klass: + get_thing = 123 - from oracle.oci_cloud_mcp_server.server import _construct_model_from_mapping + with patch("oracle.oci_cloud_mcp_server.server.import_module") as m_import: + m_import.return_value = SimpleNamespace(Klass=Klass) + with pytest.raises(Exception, match="not callable"): + get_client_operation_details("x.y.Klass", "get_thing") - inst = _construct_model_from_mapping({"a": 2}, fake_models, ["VcnDetails"]) - assert isinstance(inst, VcnDetails) - assert inst._data == {"a": 2} + def test_direct_falls_back_when_doc_and_signature_introspection_fail(self, monkeypatch): + class Klass: + def get_thing(self, thing_id): # noqa: ARG002 + return None - def test_construct_model_from_fqn_from_dict_success(self, monkeypatch): - # ensure success path for __model_fqn using from_dict + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server.import_module", + lambda name: SimpleNamespace(Klass=Klass), + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server.inspect.getdoc", + lambda member: (_ for _ in ()).throw(Exception("doc fail")), + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server.inspect.signature", + lambda member: (_ for _ in ()).throw(Exception("sig fail")), + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._extract_expected_kwargs_from_source", + lambda member: None, + ) + + out = get_client_operation_details.fn("x.y.Klass", "get_thing") + assert out["summary"] == "" + assert out["params"] == "" + assert out["expected_kwargs"] == [] + + +class TestFromDictSuccess: + @pytest.mark.parametrize( + ("payload", "models_factory", "candidate_classnames", "expected_class_name", "expected_attr", "expected_value"), + [ + ( + {"__model": "MyModel", "a": 1}, + lambda: SimpleNamespace( + MyModel=type( + "MyModel", + (), + {"__init__": lambda self, **kwargs: setattr(self, "_data", dict(kwargs))}, + ) + ), + [], + "MyModel", + "_data", + {"a": 1}, + ), + ( + {"a": 2}, + lambda: SimpleNamespace( + VcnDetails=type( + "VcnDetails", + (), + {"__init__": lambda self, **kwargs: setattr(self, "_data", dict(kwargs))}, + ) + ), + ["VcnDetails"], + "VcnDetails", + "_data", + {"a": 2}, + ), + ( + {"__model_fqn": "mymod2.Pear", "a": 3}, + lambda: None, + [], + "Pear", + "kw", + {"a": 3}, + ), + ], + ) + def test_construct_model_from_dict_success_paths( + self, + monkeypatch, + payload, + models_factory, + candidate_classnames, + expected_class_name, + expected_attr, + expected_value, + ): + from oracle.oci_cloud_mcp_server.server import oci as _oci import sys as _sys from types import ModuleType + monkeypatch.setattr(_oci.util, "from_dict", lambda cls, data: cls(**data), raising=False) + mod = ModuleType("mymod2") + pear_cls = type( + "Pear", + (), + {"__init__": lambda self, **kwargs: setattr(self, "kw", dict(kwargs))}, + ) + setattr(mod, "Pear", pear_cls) + _sys.modules["mymod2"] = mod + + fake_models = models_factory() + inst = _construct_model_from_mapping(payload, fake_models, candidate_classnames) + assert type(inst).__name__ == expected_class_name + assert getattr(inst, expected_attr) == expected_value - class Pear: + def test_construct_model_with_class_falls_back_when_get_model_schema_raises(self, monkeypatch): + class WeirdDetails: def __init__(self, **kwargs): self.kw = dict(kwargs) - setattr(mod, "Pear", Pear) - _sys.modules["mymod2"] = mod + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._to_model_attribute_keys", + lambda payload, cls, models_module: dict(payload), + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._get_model_schema", + lambda cls: (_ for _ in ()).throw(RuntimeError("no schema")), + ) from oracle.oci_cloud_mcp_server.server import oci as _oci - monkeypatch.setattr(_oci.util, "from_dict", lambda cls, data: cls(**data), raising=False) - - from oracle.oci_cloud_mcp_server.server import _construct_model_from_mapping + monkeypatch.setattr( + _oci.util, + "from_dict", + lambda cls, data: cls(**data), + raising=False, + ) - inst = _construct_model_from_mapping({"__model_fqn": "mymod2.Pear", "a": 3}, None, []) - assert isinstance(inst, Pear) - assert inst.kw == {"a": 3} + inst = _construct_model_with_class({"a": 1}, WeirdDetails, SimpleNamespace()) + assert isinstance(inst, WeirdDetails) + assert inst.kw == {"a": 1} class TestPaginationListPath: @@ -964,7 +1250,7 @@ def list_things(self, compartment_id): # noqa: ARG002 await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "list_things", "params": {"compartment_id": "ocid1.compartment..zzz"}, }, @@ -1117,7 +1403,7 @@ def create_vcn(self, create_vcn_details): # noqa: ARG001 await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "create_vcn", "params": {"vcn_details": {"ignored": True}}, }, @@ -1147,6 +1433,125 @@ class Z: assert "Z" in s +class TestDiscoverOciClients: + def test_discovers_client_classes_and_sorts(self, monkeypatch): + # Fake pkgutil.walk_packages output (matching `_client` modules) + class ModInfo: + def __init__(self, name): + self.name = name + + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server.pkgutil.walk_packages", + lambda path, prefix=None: [ + ModInfo("oci.core.compute_client"), + ModInfo("oci.core.virtual_network_client"), + ModInfo("oci.identity.identity_client"), + ModInfo("oci.identity.models"), # not a _client module + ], + ) + + class ComputeClient: + __module__ = "oci.core" + + def __init__(self, config, **kwargs): + self.base_client = object() + + class VirtualNetworkClient: + __module__ = "oci.core" + + def __init__(self, config, **kwargs): + self.base_client = object() + + class IdentityClient: + __module__ = "oci.identity" + + def __init__(self, config, **kwargs): + self.base_client = object() + + class ImportedClient: + __module__ = "oci.other" + + def __init__(self, config, **kwargs): + self.not_base_client = object() + + def fake_import(name): + if name == "oci.core.compute_client": + return SimpleNamespace( + ComputeClient=ComputeClient, + ImportedClient=ImportedClient, + SomethingElse=object, + ) + if name == "oci.core.virtual_network_client": + return SimpleNamespace(VirtualNetworkClient=VirtualNetworkClient) + if name == "oci.identity.identity_client": + return SimpleNamespace(IdentityClient=IdentityClient) + raise ImportError("nope") + + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server.import_module", fake_import + ) + + out = _discover_oci_clients() + fqns = [c["client_fqn"] for c in out] + + # should include three real clients + assert "oci.core.ComputeClient" in fqns + assert "oci.core.VirtualNetworkClient" in fqns + assert "oci.identity.IdentityClient" in fqns + # should NOT include client-like classes without composed base_client support + assert "oci.core.ImportedClient" not in fqns + # should be sorted + assert fqns == sorted(fqns) + + def test_import_failures_are_ignored(self, monkeypatch): + class ModInfo: + def __init__(self, name): + self.name = name + + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server.pkgutil.walk_packages", + lambda path, prefix=None: [ModInfo("oci.broken_client")], + ) + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server.import_module", + lambda name: (_ for _ in ()).throw(ImportError("boom")), + ) + + out = _discover_oci_clients() + assert out == [] + + +class TestListOciClientsTool: + @pytest.mark.asyncio + async def test_tool_returns_count_and_clients(self, monkeypatch): + monkeypatch.setattr( + "oracle.oci_cloud_mcp_server.server._discover_oci_clients", + lambda: [ + { + "client_fqn": "oci.core.ComputeClient", + "module": "oci.core", + "class": "ComputeClient", + }, + { + "client_fqn": "oci.identity.IdentityClient", + "module": "oci.identity", + "class": "IdentityClient", + }, + ], + ) + + async with Client(mcp) as client: + res = (await client.call_tool("list_oci_clients", {})).data + + assert res["count"] == 2 + assert isinstance(res["clients"], list) + assert res["clients"][0]["client_fqn"] == "oci.core.ComputeClient" + + # NOTE: `@mcp.tool` wraps functions into FastMCP `FunctionTool` objects, + # which are not directly callable. We validate tool behavior via the + # FastMCP client in async tests above. + + class TestInvokeUnexpectedKwOther: @pytest.mark.asyncio async def test_invoke_oci_api_unexpected_kw_non_matching_raises_error(self, monkeypatch): @@ -1172,7 +1577,7 @@ def get_thing(self, id): # noqa: ARG002 await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_thing", # wrong kw 'uuid' should produce an error "params": {"uuid": "x"}, diff --git a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_tools.py b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_tools.py index 84e1d8c5..cc09f7a2 100644 --- a/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_tools.py +++ b/src/oci-cloud-mcp-server/oracle/oci_cloud_mcp_server/tests/test_tools.py @@ -42,7 +42,7 @@ def _hidden(self): result = ( await client.call_tool( "list_client_operations", - {"client_fqn": "x.y.FakeClient"}, + {"client_fqn": "oci.fake.FakeClient"}, ) ).data ops = result.get("operations", result) if isinstance(result, dict) else result or [] @@ -83,14 +83,14 @@ def get_thing(self, id): await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_thing", "params": {"id": "abc"}, }, ) ).data - assert result["client"] == "x.y.FakeClient" + assert result["client"] == "oci.fake.FakeClient" assert result["operation"] == "get_thing" assert result["params"] == {"id": "abc"} assert result["opc_request_id"] == "req-123" @@ -130,14 +130,14 @@ def list_things(self, compartment_id): await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "list_things", "params": {"compartment_id": "ocid1.compartment..xyz"}, }, ) ).data - assert result["client"] == "x.y.FakeClient" + assert result["client"] == "oci.fake.FakeClient" assert result["operation"] == "list_things" assert isinstance(result["data"], list) assert len(result["data"]) == 3 @@ -178,14 +178,14 @@ def get_zone_records(self, zone_name, page=None, limit=None): await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_zone_records", "params": {"zone_name": "do-not-delete-me-testing-zone.example"}, }, ) ).data - assert result["client"] == "x.y.FakeClient" + assert result["client"] == "oci.fake.FakeClient" assert result["operation"] == "get_zone_records" assert isinstance(result["data"], list) assert len(result["data"]) == 4 @@ -224,14 +224,14 @@ def summarize_metrics(self, compartment_id): await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "summarize_metrics", "params": {"compartment_id": "ocid1.compartment..abc"}, }, ) ).data - assert result["client"] == "x.y.FakeClient" + assert result["client"] == "oci.fake.FakeClient" assert result["operation"] == "summarize_metrics" assert isinstance(result["data"], list) assert len(result["data"]) == 2 @@ -270,7 +270,7 @@ def get_rr_set(self, zone_name_or_id, domain, rtype, page=None, limit=None): await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_rr_set", "params": { "zone_name_or_id": "do-not-delete-me-testing-zone.example", @@ -281,7 +281,7 @@ def get_rr_set(self, zone_name_or_id, domain, rtype, page=None, limit=None): ) ).data - assert result["client"] == "x.y.FakeClient" + assert result["client"] == "oci.fake.FakeClient" assert result["operation"] == "get_rr_set" assert isinstance(result["data"], list) assert len(result["data"]) == 3 @@ -319,7 +319,7 @@ def get_config(self, id): await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_config", "params": {"id": "abc"}, }, @@ -328,7 +328,7 @@ def get_config(self, id): # ensure paginator was not invoked mock_pager.assert_not_called() - assert result["client"] == "x.y.FakeClient" + assert result["client"] == "oci.fake.FakeClient" assert result["operation"] == "get_config" assert result["data"] == {"ok": True, "id": "abc"} @@ -370,7 +370,7 @@ def get_widget(self, widget_id, **kwargs): # noqa: ARG002 await client.call_tool( "invoke_oci_api", { - "client_fqn": "x.y.FakeClient", + "client_fqn": "oci.fake.FakeClient", "operation": "get_widget", "params": {"widget_id": "w1"}, }, @@ -379,6 +379,6 @@ def get_widget(self, widget_id, **kwargs): # noqa: ARG002 # ensure paginator was not invoked for **kwargs-only non-DNS-like method mock_pager.assert_not_called() - assert result["client"] == "x.y.FakeClient" + assert result["client"] == "oci.fake.FakeClient" assert result["operation"] == "get_widget" assert result["data"] == {"id": "w1", "ok": True} diff --git a/src/oci-cloud-mcp-server/pyproject.toml b/src/oci-cloud-mcp-server/pyproject.toml index face7e1d..42c5bc34 100644 --- a/src/oci-cloud-mcp-server/pyproject.toml +++ b/src/oci-cloud-mcp-server/pyproject.toml @@ -11,7 +11,7 @@ authors = [ ] dependencies = [ "fastmcp==2.14.2", - "oci==2.160.0" + "oci>=2.160.0" ] classifiers = [ diff --git a/src/oci-cloud-mcp-server/uv.lock b/src/oci-cloud-mcp-server/uv.lock index d1b92ce8..d664129d 100644 --- a/src/oci-cloud-mcp-server/uv.lock +++ b/src/oci-cloud-mcp-server/uv.lock @@ -744,7 +744,7 @@ dev = [ [package.metadata] requires-dist = [ { name = "fastmcp", specifier = "==2.14.2" }, - { name = "oci", specifier = "==2.160.0" }, + { name = "oci", specifier = ">=2.160.0" }, ] [package.metadata.requires-dev]