diff --git a/src/oci-ag-mcp-server/.env.example b/src/oci-ag-mcp-server/.env.example new file mode 100644 index 00000000..076ada74 --- /dev/null +++ b/src/oci-ag-mcp-server/.env.example @@ -0,0 +1,11 @@ +# ---------- MCP Authentication (OIDC) ---------- +OCI_CONFIG_URL= +OCI_MCP_CLIENT_ID= +OCI_MCP_CLIENT_SECRET= + +# ---------- Access Governance API (Client Credentials) ---------- +OCI_AG_CLIENT_ID= +OCI_AG_CLIENT_SECRET= +OCI_TOKEN_URL= +AG_BASE_URL= +AG_SCOPE= diff --git a/src/oci-ag-mcp-server/README.md b/src/oci-ag-mcp-server/README.md new file mode 100644 index 00000000..4f9adb98 --- /dev/null +++ b/src/oci-ag-mcp-server/README.md @@ -0,0 +1,90 @@ +# Oracle Access Governance MCP Server + +## Overview + +The Oracle Access Governance MCP Server exposes Access Governance (AG) capabilities as MCP tools, enabling secure interaction from MCP-compatible clients (e.g., Claude, custom clients etc.). + +It integrates with OCI IAM (Identity Domains) using OAuth 2.0 (OIDC) to authenticate MCP clients. All tool executions require a valid token issued by OCI IAM. + +### Flow +1. User authenticates via OCI IAM (OIDC) +2. MCP server receives an access/ID token +3. Token is validated +4. Authenticated requests are allowed to invoke MCP tools +5. Server uses client credentials flow to call OCI AG APIs + +--- + +## Setup + +### 1. Clone the repository + +``` +git clone https://github.com/anuj-git1412/oci-ag-mcp-server.git +cd oci-ag-mcp-server +``` + +### 2. Configure OAuth (OCI IAM) + +Setting up authentication requires registering a confidential client in OCI IAM domain. +The application must include the following redirect URI: + +``` +http://localhost:8000/mcp/auth/callback +``` + +Register a second client app for access to AG APIs. + +--- + +### 3. Create environment configuration + +``` +cp .env.example .env +``` + +Update `.env` with your values: + +``` +# ---------- MCP Authentication (OIDC) ---------- +OCI_CONFIG_URL= +OCI_MCP_CLIENT_ID= +OCI_MCP_CLIENT_SECRET= + +# ---------- Access Governance API (Client Credentials) ---------- +OCI_AG_CLIENT_ID= +OCI_AG_CLIENT_SECRET= +OCI_TOKEN_URL= +AG_BASE_URL= +AG_SCOPE= +``` + +## Running the Server + +``` +uvx oracle.oci-ag-mcp-server +``` + +--- + +## Available Tools + +| Tool Name | Description | +|------------------------------|------------------------------------------------------------------------------| +| `list_identities` | Retrieve all identities (users) from the AG environment. | +| `list_identity_collections` | Retrieve all identity collections (groups of users) from the AG environment. | +| `create_identity_collection` | Creates a new identity collection in the AG environment. | +| `list_access_bundles` | Retrieve all access bundles from the AG environment. | +| `list_orchestrated_systems` | Retrieve all orchestrated systems from the AG environment. | +| `list_access_requests` | Retrieve all access requests from the AG environment. | +| `create_access_request` | Creates a new access request in the AG environment. | +| `health_check` | Returns basic health status. | +--- + +## License + +Copyright (c) 2026 Oracle and/or its affiliates. + +Licensed under the Universal Permissive License v1.0: +https://oss.oracle.com/licenses/upl/ + diff --git a/src/oci-ag-mcp-server/examples/ag_mcp_test_client.py b/src/oci-ag-mcp-server/examples/ag_mcp_test_client.py new file mode 100644 index 00000000..1f4218d9 --- /dev/null +++ b/src/oci-ag-mcp-server/examples/ag_mcp_test_client.py @@ -0,0 +1,169 @@ +import asyncio +import json +from fastmcp import Client +from fastmcp.client.auth import OAuth + + +# ---------- AUTH ---------- + +class DebugOAuth(OAuth): + async def redirect_handler(self, authorization_url: str): + return await super().redirect_handler(authorization_url) + +oauth = DebugOAuth(scopes=["openid", "profile", "email", "groups", "approles", "get_approles"]) + + +# ---------- CONTEXT ---------- + +CONTEXT: dict = {} + + +def store_result(key, result): + if not result.structured_content: + return + + content = result.structured_content + + if isinstance(content, dict) and "result" in content: + CONTEXT[key] = content["result"] + else: + CONTEXT[key] = content + + +def get_first_id(key): + items = CONTEXT.get(key, []) + if not items: + raise Exception(f"No data found for {key}") + return items[0]["id"] + + +# ---------- TOOL EXECUTION ---------- + +async def run_tool(client, tool_name, payload=None): + payload = payload or {} + + result = await client.call_tool(tool_name, payload) + + if result.structured_content: + print(json.dumps(result.structured_content, indent=2)) + else: + print(result) + + return result + +# ---------- FLOWS ---------- + +async def health_check_flow(client): + await run_tool(client, "health_check") + + +async def list_identity_collections_flow(client): + res = await run_tool(client, "list_identity_collections") + store_result("collections", res) + + +async def list_identities_flow(client): + res = await run_tool(client, "list_identities") + store_result("identities", res) + + +async def list_access_bundles_flow(client): + await run_tool(client, "list_access_bundles") + + +async def list_orchestrated_systems_flow(client): + await run_tool(client, "list_orchestrated_systems") + + +async def list_access_requests_flow(client): + await run_tool(client, "list_access_requests") + + +async def create_identity_collection_flow(client): + display_name = input("Collection Display Name: ") + owner = input("Owner (email or name): ") + included_raw = input("Included identities (comma separated, optional): ") + + included = [x.strip() for x in included_raw.split(",") if x.strip()] + + await run_tool( + client, + "create_identity_collection", + { + "display_name": display_name, + "owner": owner, + "included_identities": included, + }, + ) + + +async def create_access_request_flow(client): + justification = input("Justification: ") + created_by = input("Requester (name/email): ") + + beneficiaries_raw = input("Beneficiaries (comma separated): ") + bundles_raw = input("Access Bundles (comma separated): ") + + beneficiaries = [x.strip() for x in beneficiaries_raw.split(",") if x.strip()] + bundles = [x.strip() for x in bundles_raw.split(",") if x.strip()] + + await run_tool( + client, + "create_access_request", + { + "justification": justification, + "created_by_user": created_by, + "beneficiaries": beneficiaries, + "access_bundles": bundles, + }, + ) + + +# ---------- MENU ---------- + +FLOWS = { + "1": ("Health Check", health_check_flow), + "2": ("List Identity Collections", list_identity_collections_flow), + "3": ("List Identities", list_identities_flow), + "4": ("Create Identity Collection", create_identity_collection_flow), + "5": ("List Access Bundles", list_access_bundles_flow), + "6": ("List Orchestrated Systems", list_orchestrated_systems_flow), + "7": ("List Access Requests", list_access_requests_flow), + "8": ("Create Access Request", create_access_request_flow), +} + + +# ---------- MAIN ---------- + +async def main(): + client = Client( + "http://localhost:8000/mcp", + auth=oauth, + ) + + async with client: + await client.list_tools() + + while True: + print("\n=== ACTION MENU ===") + for k, v in FLOWS.items(): + print(f"{k}. {v[0]}") + print("0. Exit") + + choice = input("Select action: ").strip() + + if choice == "0": + break + + if choice not in FLOWS: + continue + + try: + _, flow = FLOWS[choice] + await flow(client) + except Exception: + pass + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/oci-ag-mcp-server/oracle/__init__.py b/src/oci-ag-mcp-server/oracle/__init__.py new file mode 100644 index 00000000..d08e3e1c --- /dev/null +++ b/src/oci-ag-mcp-server/oracle/__init__.py @@ -0,0 +1,5 @@ +""" +Copyright (c) 2026, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" \ No newline at end of file diff --git a/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/__init__.py b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/__init__.py new file mode 100644 index 00000000..d08e3e1c --- /dev/null +++ b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/__init__.py @@ -0,0 +1,5 @@ +""" +Copyright (c) 2026, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" \ No newline at end of file diff --git a/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/ag_client.py b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/ag_client.py new file mode 100644 index 00000000..1c090339 --- /dev/null +++ b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/ag_client.py @@ -0,0 +1,156 @@ +""" +Copyright (c) 2026, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +import os +import time +import aiohttp + +from .consts import REQUEST_TIMEOUT, AG_API_VERSION + + +class AccessGovernanceClient: + def __init__(self): + self.base_url = os.getenv("AG_BASE_URL") + self.token_url = os.getenv("OCI_TOKEN_URL") + self.client_id = os.getenv("OCI_AG_CLIENT_ID") + self.client_secret = os.getenv("OCI_AG_CLIENT_SECRET") + self.scope = os.getenv("AG_SCOPE") + + self._token: str | None = None + self._token_expiry: float = 0 + + # ---------- INIT ---------- + + def _validate_env(self): + required = { + "AG_BASE_URL": self.base_url, + "OCI_TOKEN_URL": self.token_url, + "OCI_AG_CLIENT_ID": self.client_id, + "OCI_AG_CLIENT_SECRET": self.client_secret, + "AG_SCOPE": self.scope, + } + + missing = [k for k, v in required.items() if not v] + if missing: + raise ValueError( + f"Missing required environment variables: {', '.join(missing)}" + ) + + # ---------- SESSION ---------- + + def _session(self): + return aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT) + ) + + # ---------- TOKEN ---------- + + async def _get_token(self) -> str: + if self._token and time.time() < self._token_expiry - 30: + return self._token + + async with self._session() as session: + async with session.post( + self.token_url, + data={"grant_type": "client_credentials", "scope": self.scope}, + auth=aiohttp.BasicAuth(self.client_id, self.client_secret), + ) as resp: + resp.raise_for_status() + data = await resp.json() + + self._token = data["access_token"] + self._token_expiry = time.time() + data.get("expires_in", 3600) + + return self._token + + # ---------- REQUEST ---------- + + async def _request( + self, + method: str, + path: str, + json: dict | None = None, + ): + token = await self._get_token() + + async with self._session() as session: + async with session.request( + method, + f"{self.base_url}{path}", + headers={"Authorization": f"Bearer {token}"}, + json=json, + ) as resp: + resp.raise_for_status() + return await resp.json() + + # ---------- PATH ---------- + + def _path(self, service: str, resource: str) -> str: + return f"/access-governance/{service}/{AG_API_VERSION}/{resource}" + + # ---------- API METHODS ---------- + + async def list_identities(self): + return await self._request( + "GET", + self._path("identities", "identities"), + ) + + async def get_identity(self, identity_id: str): + return await self._request( + "GET", + self._path("identities", f"identities/{identity_id}"), + ) + + async def list_identity_collections(self): + return await self._request( + "GET", + self._path("access-controls", "identityCollections"), + ) + + async def create_identity_collection(self, payload: dict): + return await self._request( + "POST", + self._path("access-controls", "identityCollections"), + json=payload, + ) + + async def list_access_reviews(self): + return await self._request( + "GET", + self._path("access-reviews", "accessReviews/identity"), + ) + + async def get_access_review(self, review_id: str): + return await self._request( + "GET", + self._path("access-reviews", f"accessReviews/{review_id}"), + ) + + async def list_orchestrated_systems(self): + return await self._request( + "GET", + self._path("service-administration", "orchestratedSystems"), + ) + + async def list_access_requests(self): + return await self._request( + "GET", + self._path("access-controls", "accessRequests"), + ) + + async def list_access_bundles(self): + return await self._request( + "GET", + self._path("access-controls", "accessBundles"), + ) + + async def create_access_request(self, payload: dict): + return await self._request( + "POST", + self._path("access-controls", "accessRequests"), + json=payload, + ) \ No newline at end of file diff --git a/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/auth.py b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/auth.py new file mode 100644 index 00000000..569627d6 --- /dev/null +++ b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/auth.py @@ -0,0 +1,27 @@ +""" +Copyright (c) 2026, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +import os + +from fastmcp.server.auth.providers.oci import OCIProvider + +# ---------- AUTH PROVIDER ---------- + +class CustomOCIProvider(OCIProvider): + + def _prepare_scopes_for_token_exchange(self, scopes: list[str]) -> list[str]: + return [] + +def get_auth_provider(): + + return CustomOCIProvider( + config_url=os.getenv("OCI_CONFIG_URL"), + client_id=os.getenv("OCI_MCP_CLIENT_ID"), + client_secret=os.getenv("OCI_MCP_CLIENT_SECRET"), + base_url="http://localhost:8000", + redirect_path="/mcp/auth/callback", + required_scopes=["openid", "get_approles", "approles"], + ) \ No newline at end of file diff --git a/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/consts.py b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/consts.py new file mode 100644 index 00000000..84d8ff9f --- /dev/null +++ b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/consts.py @@ -0,0 +1,8 @@ +""" +Copyright (c) 2026, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +AG_API_VERSION = "20250331" +REQUEST_TIMEOUT = 30 \ No newline at end of file diff --git a/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/models.py b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/models.py new file mode 100644 index 00000000..526b82c7 --- /dev/null +++ b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/models.py @@ -0,0 +1,137 @@ +""" +Copyright (c) 2026, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +from pydantic import BaseModel, Field +from typing import Optional + + +# ----------- MODELS ----------- + +class Identity(BaseModel): + id: str = Field(..., description="Unique identifier of the identity") + name: Optional[str] = Field( + None, + description="Display name of the identity", + ) + + +class IdentityCollection(BaseModel): + id: str = Field( + ..., + description="Unique identifier of the identity collection", + ) + name: str = Field( + ..., + description="Name of the identity collection", + ) + + +class AccessBundle(BaseModel): + id: str = Field( + ..., + description="Unique identifier of the access bundle", + ) + name: str = Field( + ..., + description="Name of the access bundle", + ) + + +class OrchestratedSystem(BaseModel): + id: str = Field( + ..., + description="Unique identifier of the orchestrated system", + ) + name: str = Field( + ..., + description="Name of the orchestrated system", + ) + type: Optional[str] = Field( + None, + description="Type of the orchestrated system", + ) + + +class AccessRequest(BaseModel): + id: str = Field( + ..., + description="Unique identifier of the access request", + ) + justification: Optional[str] = Field( + None, + description="Justification provided for the access request", + ) + requestStatus: Optional[str] = Field( + None, + description="Current status of the access request", + ) + timeCreated: Optional[str] = Field( + None, + description="Timestamp when the access request was created", + ) + timeUpdated: Optional[str] = Field( + None, + description="Timestamp when the access request was last updated", + ) + + +# ----------- MAPPERS ----------- + +def map_identity(raw: dict) -> Identity: + return Identity( + id=raw.get("id"), + name=raw.get("name"), + ) + + +def map_identity_collection(data: dict) -> IdentityCollection: + name = ( + data.get("displayName") + or data.get("name") + or "Unknown" + ) + + return IdentityCollection( + id=data.get("id"), + name=name, + ) + + +def map_access_bundle(raw: dict) -> AccessBundle: + name = ( + raw.get("displayName") + or raw.get("name") + or "Unknown" + ) + + return AccessBundle( + id=raw.get("id"), + name=name, + ) + + +def map_orchestrated_system(raw: dict) -> OrchestratedSystem: + name = ( + raw.get("displayName") + or raw.get("name") + or "Unknown" + ) + + return OrchestratedSystem( + id=raw.get("id"), + name=name, + type=raw.get("type"), + ) + + +def map_access_request(raw: dict) -> AccessRequest: + return AccessRequest( + id=raw.get("id"), + justification=raw.get("justification"), + requestStatus=raw.get("requestStatus"), + timeCreated=raw.get("timeCreated"), + timeUpdated=raw.get("timeUpdated"), + ) \ No newline at end of file diff --git a/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/server.py b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/server.py new file mode 100644 index 00000000..e3bda76b --- /dev/null +++ b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/server.py @@ -0,0 +1,233 @@ +""" +Copyright (c) 2026, Oracle and/or its affiliates. +Licensed under the Universal Permissive License v1.0 as shown at +https://oss.oracle.com/licenses/upl. +""" + +from dotenv import load_dotenv +from fastmcp import FastMCP +from pydantic import Field + +from .auth import get_auth_provider +from .ag_client import AccessGovernanceClient +from .models import ( + map_identity, + map_identity_collection, + map_access_bundle, + map_orchestrated_system, + map_access_request, +) + +# ---------- ENV ---------- + +load_dotenv() + +# ---------- MCP INIT ---------- + +mcp = FastMCP( + name="oci-ag-mcp-server", + auth=get_auth_provider() +) + +client = AccessGovernanceClient() + +# ---------- TOOLS ---------- + + +@mcp.tool( + name="health_check", + description="Check if the MCP server is running and reachable. Returns basic health status." +) +async def health_check() -> dict: + return {"status": "Healthy"} + + +@mcp.tool( + name="list_identities", + description="Retrieve all identities (users) available in Access Governance." +) +async def list_identities() -> list[dict]: + data = await client.list_identities() + return [map_identity(d).model_dump() for d in data.get("items", [])] + + +@mcp.tool( + name="list_identity_collections", + description="Retrieve all identity collections (groups of users)." +) +async def list_identity_collections() -> list[dict]: + data = await client.list_identity_collections() + return [map_identity_collection(d).model_dump() for d in data.get("items", [])] + + +@mcp.tool( + name="create_identity_collection", + description="Create a new identity collection (group of users)." +) +async def create_identity_collection( + display_name: str = Field( + ..., + description="Display name of the identity collection" + ), + owner: str = Field( + ..., + description="Owner (user name or ID)" + ), + included_identities: list[str] | None = Field( + None, + description="Optional list of identities to include" + ), +) -> dict: + + included_identities = included_identities or [] + owner_identity = await _resolve_identity(owner) + + included = [ + { + "id": resolved["id"], + "name": resolved["name"], + } + for x in included_identities + for resolved in [await _resolve_identity(x)] + ] + + payload = { + "name": _generate_name(display_name), + "displayName": display_name, + "description": _generate_description(display_name), + "includedIdentities": included, + "excludedIdentities": [], + "owners": [ + { + "id": owner_identity["id"], + "name": owner_identity["name"], + "isPrimary": True, + } + ], + "tags": _generate_tags(display_name), + "isManagedAtOrchestratedSystem": False, + } + + return await client.create_identity_collection(payload) + + +@mcp.tool( + name="list_access_bundles", + description="Retrieve all access bundles." +) +async def list_access_bundles() -> list[dict]: + data = await client.list_access_bundles() + return [map_access_bundle(d).model_dump() for d in data.get("items", [])] + + +@mcp.tool( + name="list_orchestrated_systems", + description="Retrieve all orchestrated systems." +) +async def list_orchestrated_systems() -> list[dict]: + data = await client.list_orchestrated_systems() + return [map_orchestrated_system(d).model_dump() for d in data.get("items", [])] + + +@mcp.tool( + name="list_access_requests", + description="Retrieve all access requests." +) +async def list_access_requests() -> list[dict]: + data = await client.list_access_requests() + return [map_access_request(d).model_dump() for d in data.get("items", [])] + + +@mcp.tool( + name="create_access_request", + description="Create a new access request." +) +async def create_access_request( + justification: str = Field(..., description="Reason for requesting access"), + beneficiaries: list[str] = Field(..., description="Users receiving access"), + access_bundles: list[str] = Field(..., description="Access bundles to assign"), + created_by_user: str = Field(..., description="Requester"), +) -> dict: + created_by = await _resolve_identity(created_by_user) + + identities = [(await _resolve_identity(b))["id"] for b in beneficiaries] + bundles = [(await _resolve_access_bundle(b))["id"] for b in access_bundles] + + payload = { + "justification": justification, + "createdBy": created_by["id"], + "accessBundles": bundles, + "identities": identities, + } + + return await client.create_access_request(payload) + + +# ---------- HELPERS ---------- + +IDENTITY_CACHE = None +ACCESS_BUNDLE_CACHE = None + + +async def _get_identities(): + global IDENTITY_CACHE + if IDENTITY_CACHE is None: + data = await client.list_identities() + IDENTITY_CACHE = [map_identity(d) for d in data.get("items", [])] + return IDENTITY_CACHE + + +async def _resolve_identity(identifier: str) -> dict: + identities = await _get_identities() + identifier = identifier.lower() + + for i in identities: + if identifier in {(i.id or "").lower(), (i.name or "").lower()}: + return {"id": i.id, "name": i.name} + + raise ValueError(f"Identity not found: {identifier}") + + +async def _get_access_bundles(): + global ACCESS_BUNDLE_CACHE + if ACCESS_BUNDLE_CACHE is None: + data = await client.list_access_bundles() + ACCESS_BUNDLE_CACHE = [map_access_bundle(d) for d in data.get("items", [])] + return ACCESS_BUNDLE_CACHE + + +async def _resolve_access_bundle(name: str) -> dict: + bundles = await _get_access_bundles() + name = name.lower() + + for b in bundles: + if name in {(b.id or "").lower(), (b.name or "").lower()}: + return {"id": b.id, "name": b.name} + + raise ValueError(f"Access bundle not found: {name}") + + +def _generate_name(display_name: str) -> str: + return display_name.lower().replace(" ", "_") + + +def _generate_description(display_name: str) -> str: + return f"Identity collection for {display_name}" + + +def _generate_tags(display_name: str) -> list[str]: + return [_generate_name(display_name)] + + +# ---------- RUN ---------- + +def main(): + mcp.run( + transport="http", + host="localhost", + port=8000, + ) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/tests/test_ag_models.py b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/tests/test_ag_models.py new file mode 100644 index 00000000..d44bd142 --- /dev/null +++ b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/tests/test_ag_models.py @@ -0,0 +1,87 @@ +import pytest +from pydantic import ValidationError + +from oracle.oci_ag_mcp_server.models import ( + map_identity, + map_identity_collection, + map_access_bundle, + map_orchestrated_system, + map_access_request, + Identity, + IdentityCollection, + AccessBundle, + OrchestratedSystem, + AccessRequest +) + + +class TestAGModels: + + # ---------- Identity ---------- + + def test_map_identity(self): + data = {"id": "id1", "name": "User1"} + + result = map_identity(data) + + assert isinstance(result, Identity) + assert result.id == "id1" + assert result.name == "User1" + + + # ---------- IdentityCollection ---------- + + def test_map_identity_collection(self): + data = {"id": "col1", "name": "Collection1"} + + result = map_identity_collection(data) + + assert isinstance(result, IdentityCollection) + assert result.id == "col1" + assert result.name == "Collection1" + + + # ---------- Access Bundle ---------- + + def test_map_access_bundle(self): + data = {"id": "b1", "displayName": "Bundle1"} + + result = map_access_bundle(data) + assert isinstance(result, AccessBundle) + assert result.id == "b1" + assert result.name == "Bundle1" + + + # ---------- Orchestrated System ---------- + + def test_map_orchestrated_system(self): + data = {"id": "sys1", "displayName": "System1", "type": "APP"} + + result = map_orchestrated_system(data) + + assert isinstance(result, OrchestratedSystem) + assert result.id == "sys1" + assert result.name == "System1" + assert result.type == "APP" + + + # ---------- Access Request ---------- + + def test_map_access_request(self): + data = { + "id": "req1", + "justification": "testing", + "requestStatus": "PENDING", + "timeCreated": "2025-03-20", + "timeUpdated": "2025-03-20" + } + + result = map_access_request(data) + + assert isinstance(result, AccessRequest) + + assert result.id == "req1" + assert result.justification == "testing" + assert result.requestStatus == "PENDING" + assert result.timeCreated == "2025-03-20" + assert result.timeUpdated == "2025-03-20" \ No newline at end of file diff --git a/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/tests/test_ag_tools.py b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/tests/test_ag_tools.py new file mode 100644 index 00000000..c58ba0c1 --- /dev/null +++ b/src/oci-ag-mcp-server/oracle/oci_ag_mcp_server/tests/test_ag_tools.py @@ -0,0 +1,145 @@ +import pytest +from unittest.mock import AsyncMock, patch +from fastmcp import Client +import oracle.oci_ag_mcp_server.server as server + +class TestAGTools: + + @pytest.mark.asyncio + @patch("oracle.oci_ag_mcp_server.server.client") + async def test_health_check(self, mock_client): + async with Client(server.mcp) as client: + result = (await client.call_tool("health_check", {})).structured_content + assert result["status"] == "Healthy" + + @pytest.mark.asyncio + @patch("oracle.oci_ag_mcp_server.server.client") + async def test_list_identities(self, mock_client): + mock_client.list_identities = AsyncMock( + return_value={"items": [{"id": "id1", "name": "User1"}]} + ) + + async with Client(server.mcp) as client: + result = (await client.call_tool("list_identities", {})).structured_content + items = result["result"] + + assert len(items) == 1 + assert items[0]["id"] == "id1" + + @pytest.mark.asyncio + @patch("oracle.oci_ag_mcp_server.server.client") + async def test_list_identity_collections(self, mock_client): + mock_client.list_identity_collections = AsyncMock( + return_value={"items": [{"id": "col1", "name": "Collection1"}]} + ) + + async with Client(server.mcp) as client: + result = (await client.call_tool("list_identity_collections", {})).structured_content + items = result["result"] + + assert len(items) == 1 + assert items[0]["id"] == "col1" + + @pytest.mark.asyncio + @patch("oracle.oci_ag_mcp_server.server._resolve_identity") + @patch("oracle.oci_ag_mcp_server.server.client") + async def test_create_identity_collection( + self, + mock_client, + mock_resolve_identity + ): + mock_resolve_identity.return_value = {"id": "id1", "name": "User1"} + + mock_client.create_identity_collection = AsyncMock( + return_value={"id": "col1", "name": "test"} + ) + + async with Client(server.mcp) as client: + result = ( + await client.call_tool( + "create_identity_collection", + { + "display_name": "Test Collection", + "owner": "User1", + "included_identities": ["User1"] + } + ) + ).structured_content + + assert result["id"] == "col1" + + # ---------------- NEW TOOLS ---------------- + + @pytest.mark.asyncio + @patch("oracle.oci_ag_mcp_server.server.client") + async def test_list_access_bundles(self, mock_client): + mock_client.list_access_bundles = AsyncMock( + return_value={"items": [{"id": "b1", "displayName": "Bundle1"}]} + ) + + async with Client(server.mcp) as client: + result = (await client.call_tool("list_access_bundles", {})).structured_content + items = result["result"] + + assert len(items) == 1 + assert items[0]["id"] == "b1" + + @pytest.mark.asyncio + @patch("oracle.oci_ag_mcp_server.server.client") + async def test_list_orchestrated_systems(self, mock_client): + mock_client.list_orchestrated_systems = AsyncMock( + return_value={"items": [{"id": "sys1", "displayName": "System1"}]} + ) + + async with Client(server.mcp) as client: + result = (await client.call_tool("list_orchestrated_systems", {})).structured_content + items = result["result"] + + assert len(items) == 1 + assert items[0]["id"] == "sys1" + + @pytest.mark.asyncio + @patch("oracle.oci_ag_mcp_server.server.client") + async def test_list_access_requests(self, mock_client): + mock_client.list_access_requests = AsyncMock( + return_value={"items": [{"id": "req1", "status": "PENDING"}]} + ) + + async with Client(server.mcp) as client: + result = (await client.call_tool("list_access_requests", {})).structured_content + items = result["result"] + + assert len(items) == 1 + assert items[0]["id"] == "req1" + + @pytest.mark.asyncio + @patch("oracle.oci_ag_mcp_server.server._resolve_access_bundle") + @patch("oracle.oci_ag_mcp_server.server._resolve_identity") + @patch("oracle.oci_ag_mcp_server.server.client") + async def test_create_access_request( + self, + mock_client, + mock_resolve_identity, + mock_resolve_bundle, + ): + mock_resolve_identity.return_value = {"id": "user1", "name": "User1"} + mock_resolve_bundle.return_value = {"id": "bundle1", "name": "Bundle1"} + + mock_client.create_access_request = AsyncMock( + return_value={"id": "req1"} + ) + + async with Client(server.mcp) as client: + result = ( + await client.call_tool( + "create_access_request", + { + "justification": "test", + "created_by_user": "User1", + "beneficiaries": ["User1"], + "access_bundles": ["Bundle1"] + } + ) + ).structured_content + + assert result["id"] == "req1" \ No newline at end of file diff --git a/src/oci-ag-mcp-server/pyproject.toml b/src/oci-ag-mcp-server/pyproject.toml new file mode 100644 index 00000000..e6f7b42c --- /dev/null +++ b/src/oci-ag-mcp-server/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "oracle.oci-ag-mcp-server" +version = "0.1.0" +description = "Access Governance MCP server" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "aiohttp>=3.13.3", + "fastmcp>=3.1.1", + "pydantic>=2.12.5", + "python-dotenv>=1.2.2", +] + +[project.scripts] +"oracle.oci-ag-mcp-server" = "oracle.oci_ag_mcp_server.server:main" + +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +where = ["."] + +[tool.pytest.ini_options] +pythonpath = ["."] \ No newline at end of file