diff --git a/ARIA_DABs/resources/workflows/case_linking_databricks_job.yml b/ARIA_DABs/resources/workflows/case_linking_databricks_job.yml index 20b7ef8a..c6d3cf06 100644 --- a/ARIA_DABs/resources/workflows/case_linking_databricks_job.yml +++ b/ARIA_DABs/resources/workflows/case_linking_databricks_job.yml @@ -18,5 +18,5 @@ resources: - task_key: DLT_creation_of_case_linking job_cluster_key: case_linking_databricks_job notebook_task: - notebook_path: "/Workspace/live/ACTIVE/MVP/Active_CCD_Publish_Case_Linking" + notebook_path: "/Workspace/live/ACTIVE/MVP/CaseLinking/Active_CCD_Publish_Case_Linking" source: WORKSPACE diff --git a/ARIA_DABs/resources/workflows/case_linking_results_databricks_job.yml b/ARIA_DABs/resources/workflows/case_linking_results_databricks_job.yml index e8c6bc71..06a219b6 100644 --- a/ARIA_DABs/resources/workflows/case_linking_results_databricks_job.yml +++ b/ARIA_DABs/resources/workflows/case_linking_results_databricks_job.yml @@ -12,7 +12,7 @@ resources: - task_key: active_ccd_results_case_linking job_cluster_key: ccd_case_linking_result_databricks_job notebook_task: - notebook_path: "/Workspace/live/ACTIVE/MVP/Active_CCD_Results_Case_Linking" + notebook_path: "/Workspace/live/ACTIVE/MVP/CaseLinking/Active_CCD_Results_Case_Linking" source: WORKSPACE queue: enabled: true diff --git a/AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py b/AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py index 56a48720..5a52f8ca 100644 --- a/AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py +++ b/AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py @@ -138,7 +138,7 @@ def submit_case_event(ccd_base_url, uid, jid, ctid, cid, etid, event_token, payl @retry_on_result( max_retries=2, base_delay=30, - max_delay=120, + max_delay=60, retry_on=lambda r: isinstance(r, dict) and r.get("Status") == "ERROR", ) def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overwrite=False): diff --git 
a/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py b/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py index f6b7bcf5..715a7741 100644 --- a/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py +++ b/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py @@ -1,8 +1,6 @@ import asyncio import azure.functions as func import logging -import logging.handlers -import queue as _queue import json import os diff --git a/AzureFunctions/ACTIVE/active_caselink_ccd/host.json b/AzureFunctions/ACTIVE/active_caselink_ccd/host.json index 8b4aa73d..bf9b0dd5 100644 --- a/AzureFunctions/ACTIVE/active_caselink_ccd/host.json +++ b/AzureFunctions/ACTIVE/active_caselink_ccd/host.json @@ -15,16 +15,5 @@ "extensionBundle": { "id": "Microsoft.Azure.Functions.ExtensionBundle", "version": "[4.*, 5.0.0)" - }, - // "scale": { - // "minInstances": 1, - // "maxInstances": 5, - // "maxBurst": 1 - // }, - "retry": { - "strategy": "exponentialBackoff", - "maxRetryCount": 3, - "minimumInterval": "00:00:02", - "maximumInterval": "00:00:10" } } diff --git a/AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py b/AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py index 75261e41..d71c9f31 100644 --- a/AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py +++ b/AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py @@ -2,6 +2,10 @@ import logging import requests from datetime import datetime, timezone +try: + from .retry_decorator import retry_on_result +except ImportError: + from retry_decorator import retry_on_result # tokenManager lives in the same package. When this module is imported by the # Functions host the package root will be `AzureFunctions.ACTIVE.active_ccd`. 
@@ -124,6 +128,12 @@ def submit_case(ccd_base_url, event_token, payloadData, jid, ctid, idam_token, u # caseNo = event.key, payloadData = event.value +@retry_on_result( + max_retries=2, + base_delay=30, + max_delay=60, + retry_on=lambda r: isinstance(r, dict) and r.get("Status") == "ERROR", +) def process_case(env, caseNo, payloadData, runId, state, PR_REFERENCE): print(f"Starting processing case for {caseNo}") diff --git a/AzureFunctions/ACTIVE/active_ccd/host.json b/AzureFunctions/ACTIVE/active_ccd/host.json index 65077e68..8dd458ac 100644 --- a/AzureFunctions/ACTIVE/active_ccd/host.json +++ b/AzureFunctions/ACTIVE/active_ccd/host.json @@ -12,11 +12,5 @@ "extensionBundle": { "id": "Microsoft.Azure.Functions.ExtensionBundle", "version": "[4.*, 5.0.0)" - }, - "retry": { - "strategy": "exponentialBackoff", - "maxRetryCount": 3, - "minimumInterval": "00:00:02", - "maximumInterval": "00:00:10" } } diff --git a/AzureFunctions/ACTIVE/active_ccd/retry_decorator.py b/AzureFunctions/ACTIVE/active_ccd/retry_decorator.py new file mode 100644 index 00000000..40dd06bd --- /dev/null +++ b/AzureFunctions/ACTIVE/active_ccd/retry_decorator.py @@ -0,0 +1,70 @@ +import time +import random +from functools import wraps + + +def retry_on_result( + max_retries: int = 3, + base_delay: float = 1.0, + max_delay: float = 60.0, + jitter: bool = True, + retry_on=None, +): + """ + Retry decorator with exponential backoff for sync functions intended + for use with asyncio.to_thread. Retries only when retry_on + predicate returns True β€” exceptions are not caught. + + Args: + max_retries: Number of retry attempts after the first failure. + base_delay: Initial delay in seconds (doubles each attempt). + max_delay: Upper cap on delay in seconds. + jitter: Randomise delay to 50-100% of computed value. + retry_on: Optional callable(result) -> bool. 
If provided, a + return value for which this returns True is treated as + a retryable failure (the result is returned as-is + after all attempts are exhausted). + + Usage: + @retry_on_result(max_retries=3, retry_on=lambda r: r.get("Status") == "ERROR") + def my_blocking_call(): + ... + + result = await asyncio.to_thread(my_blocking_call) + """ + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + last_result = None + + for attempt in range(max_retries + 1): + result = func(*args, **kwargs) + + if retry_on is not None and retry_on(result): + last_result = result + + if attempt == max_retries: + print( + f"{func.__name__} returned a retryable result after " + f"{max_retries + 1} attempts. Returning last result." + ) + return last_result + + delay = min(base_delay * (2 ** attempt), max_delay) + if jitter: + delay *= 0.5 + random.random() * 0.5 + + print( + f"{func.__name__} returned a retryable result " + f"(attempt {attempt + 1}/{max_retries + 1}). " + f"Retrying in {delay:.2f}s..." 
+ ) + time.sleep(delay) + continue + + return result + + return last_result + + return wrapper + return decorator diff --git a/AzureFunctions/ACTIVE/active_cdam/.funcignore b/AzureFunctions/ACTIVE/active_cdam/.funcignore new file mode 100644 index 00000000..9966315f --- /dev/null +++ b/AzureFunctions/ACTIVE/active_cdam/.funcignore @@ -0,0 +1,8 @@ +.git* +.vscode +__azurite_db*__.json +__blobstorage__ +__queuestorage__ +local.settings.json +test +.venv \ No newline at end of file diff --git a/AzureFunctions/ACTIVE/active_cdam/.gitignore b/AzureFunctions/ACTIVE/active_cdam/.gitignore new file mode 100644 index 00000000..7685fc4a --- /dev/null +++ b/AzureFunctions/ACTIVE/active_cdam/.gitignore @@ -0,0 +1,135 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. 
+# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don’t work, or not +# install all needed dependencies. +#Pipfile.lock + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Azure Functions artifacts +bin +obj +appsettings.json +local.settings.json + +# Azurite artifacts +__blobstorage__ +__queuestorage__ +__azurite_db*__.json +.python_packages \ No newline at end of file diff --git a/AzureFunctions/ACTIVE/active_cdam/__init__.py b/AzureFunctions/ACTIVE/active_cdam/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/AzureFunctions/ACTIVE/active_cdam/cdamFunctions.py b/AzureFunctions/ACTIVE/active_cdam/cdamFunctions.py new file mode 100644 index 00000000..12948170 --- /dev/null +++ b/AzureFunctions/ACTIVE/active_cdam/cdamFunctions.py @@ -0,0 +1,180 @@ +import requests +from azure.storage.blob import BlobServiceClient +from azure.identity import DefaultAzureCredential +from datetime import datetime, timezone +from urllib.parse import urlparse +try: + from .retry_decorator import retry_on_result +except ImportError: + from retry_decorator import retry_on_result + +# tokenManager lives in the same package. When this module is imported by the +# Functions host the package root will be `AzureFunctions.ACTIVE.active_cdam`. +# Use a robust import that works both when running under the Functions host +# (package import) and when running the module directly (script import). 
+try: + # package import when running under Functions host + from .cdam_tokenManager import IDAMTokenManager, S2S_Manager +except Exception: + # fallback when running as a script in the same folder + from cdam_tokenManager import IDAMTokenManager, S2S_Manager + +# Instantiate only one IDAMTokenManager instance per ccdFunctions import. +idam_token_mgr = IDAMTokenManager(env="sbox") + + +def upload_document(cdam_base_url, jid, ctid, cid, file_name, doc_binary, content_type, idam_token, s2s_token): + upload_document_endpoint = "/cases/documents" + upload_document_url = f"{cdam_base_url}{upload_document_endpoint}" + + headers = { + "Authorization": f"Bearer {idam_token}", # IDAM user JWT + "ServiceAuthorization": f"{s2s_token}", # service-to-service JWT + "Accept": "application/json" + } + + try: + body = { + "classification": "PUBLIC", + "caseTypeId": ctid, + "jurisdictionId": jid + } + + files = [ + ("files", (file_name, doc_binary, content_type)) + ] + + print(f"πŸ”’ Uploading document for CaseNo {cid}: upload_document_url = {upload_document_url}, headers = {headers}, body = {body}\n") + + response = requests.post(upload_document_url, headers=headers, data=body, files=files) + + print(f"πŸ”’ Upload document response status for {cid}: {response.status_code}:{response.text}\n") + return response + + except Exception as e: + print(f"❌ Network error while calling {upload_document_url}: {e}") + return None + + +@retry_on_result( + max_retries=2, + base_delay=30, + max_delay=60, + retry_on=lambda r: isinstance(r, dict) and r.get("Status") == "ERROR", +) +def process_event(env, caseNo, runId, file_name, file_url, file_content_type): + print(f"Starting document upload for {caseNo} for {file_name} using file path {file_url} with content type {file_content_type}") + + startDateTime = datetime.now(timezone.utc).isoformat() + + try: + idam_token, _ = idam_token_mgr.get_token() + except Exception as e: + result = { + "RunID": runId, + "CaseNo": caseNo, + "StartDateTime": 
startDateTime, + "EndDateTime": datetime.now(timezone.utc).isoformat(), + "Status": "ERROR", + "Error": f"failed to gather IDAM token: {e}", + "CDAMResponse": "" + } + return result + + try: + s2s_manager = S2S_Manager("sbox", 21) + s2s_token = s2s_manager.get_token() + except Exception as e: + result = { + "RunID": runId, + "CaseNo": caseNo, + "StartDateTime": startDateTime, + "EndDateTime": datetime.now(timezone.utc).isoformat(), + "Status": "ERROR", + "Error": f"failed to gather s2s token: {e}", + "CDAMResponse": "" + } + return result + + jid = "IA" + ctid = "Asylum" + + urls = { + "sbox": "http://ccd-case-document-am-api-aat.service.core-compute-aat.internal", # Default preview instances connect to AAT DM API. + "stg": "http://ccd-case-document-am-api-aat.service.core-compute-aat.internal", + "prod": None, + } + + try: + cdam_base_url = urls[env] + print(f"URL for {cdam_base_url}") + + except KeyError: + raise ValueError("Invalid environment") + + print(f"Getting binary for document at path: {file_url}") + try: + parsed_path = urlparse(file_url) + account_url = f"{parsed_path.scheme}://{parsed_path.netloc}" + + blob_path = parsed_path.path.lstrip("/") + container, _, blob = blob_path.partition("/") + + blob_service_client = BlobServiceClient(account_url, credential=DefaultAzureCredential()) + print(f"Successfully authenticated with storage account {account_url}.") + file_binary_in_bytes = blob_service_client.get_blob_client(container=container, blob=blob).download_blob().readall() + except Exception as e: + print(f"Error in downloading document binary: {e}") + result = { + "RunID": runId, + "CaseNo": caseNo, + "StartDateTime": startDateTime, + "EndDateTime": datetime.now(timezone.utc).isoformat(), + "Status": "ERROR", + "Error": f"Failed to read given blob: {e}", + "CDAMResponse": "" + } + + return result + print(f"Downloaded binary for document at path: {file_url}") + + # submit case + print("Starting CDAM upload") + upload_document_response = 
upload_document(cdam_base_url, jid, ctid, caseNo, file_name, file_binary_in_bytes, file_content_type, idam_token, s2s_token) + print(f"CDAM upload response = {upload_document_response}") + + if upload_document_response is None or upload_document_response.status_code not in {201, 200}: + if upload_document_response is not None: + status_code = upload_document_response.status_code + text = upload_document_response.text + else: + status_code = "N/A" + text = "No response from API" + + print(f"Document upload failed: {status_code} - {text}") + + result = { + "RunID": runId, + "CaseNo": caseNo, + "StartDateTime": startDateTime, + "EndDateTime": datetime.now(timezone.utc).isoformat(), + "Status": "ERROR", + "Error": f"Document upload failed: {status_code} - {text}", + "CDAMResponse": "" + } + + return result + + else: + result = { + "RunID": runId, + "CaseNo": caseNo, + "StartDateTime": startDateTime, + "EndDateTime": datetime.now(timezone.utc).isoformat(), + "Status": "SUCCESS", + "Error": None, + "CDAMResponse": upload_document_response.json() + } + + print(f"βœ… Case {caseNo} document uploaded successfully with document link: {upload_document_response.json()['links']['self']['href']}") + return result diff --git a/AzureFunctions/ACTIVE/active_cdam/cdam_tokenManager.py b/AzureFunctions/ACTIVE/active_cdam/cdam_tokenManager.py new file mode 100644 index 00000000..542f637f --- /dev/null +++ b/AzureFunctions/ACTIVE/active_cdam/cdam_tokenManager.py @@ -0,0 +1,219 @@ +import pyotp +import requests +import threading +from azure.identity import DefaultAzureCredential +from azure.keyvault.secrets import SecretClient +from datetime import datetime, timezone, timedelta + +class IDAMTokenManager: + def __init__(self, env: str, skew: int = 28792): + self.env = env + self._token = None + self._expiration_time = None + self._uid = None + self.skew = timedelta(seconds=int(skew)) + self._lock = threading.RLock() + + # ----- Environment config (host + key vault) ----- + + urls = { + 
"sbox": { + "idam_host": "https://idam-web-public.aat.platform.hmcts.net", + "key_vault": "ia-aat" + }, + "stg": { + "idam_host": "https://idam-api.aat.platform.hmcts.net", + "key_vault": "ia-aat" + }, + "prod": { + "idam_host": "", + "key_vault": "" + } + } + + self.idam_host = urls[self.env]["idam_host"] + key_vault = urls[self.env]["key_vault"] + + # ----- Secrets (fetched once per manager instance) ----- + # Databricks utility to retrieve secrets from Key Vault. + # need to change this to use kv client + credentials = DefaultAzureCredential() + kv_client = SecretClient(vault_url=f"https://{key_vault}.vault.azure.net/", credential=credentials) + self.client_id = kv_client.get_secret("idam-client-id").value + self.client_secret = kv_client.get_secret("idam-secret").value + self.username = kv_client.get_secret("system-username").value + self.password = kv_client.get_secret("system-password").value + # OAuth2 token endpoint (password grant). + self.token_url = f"{self.idam_host}/o/token" + self.uid_url = f"{self.idam_host}/o/userinfo" + # Requested scopes. 
+ self.scope = "openid profile roles" + + def _fetch_token(self): + data = { + "grant_type": "password", + "client_id": self.client_id, + "client_secret": self.client_secret, + "username": self.username, + "password": self.password, + "scope": self.scope, + } + + headers = {"Content-Type": "application/x-www-form-urlencoded"} + + idam_response = requests.post(self.token_url, headers=headers, data=data) + + if idam_response.status_code != 200: + raise RuntimeError(f"Token request failed: {idam_response.status_code} {idam_response.text}") + + payload = idam_response.json() + + idam_token = payload.get("access_token") + expires_in = payload.get("expires_in") + if not idam_token or not expires_in: + raise RuntimeError(f"Invalid token response: {payload}") + + now = datetime.now(timezone.utc) + + expiration_time = now + timedelta(seconds=int(expires_in)) + + uid = self._fetch_uid(idam_token) + + return idam_token, expiration_time, uid + + def _needs_refresh(self): + + if not self._token or not self._expiration_time: + return True + + return datetime.now(timezone.utc) >= (self._expiration_time - self.skew) + + def get_token(self): + if not self._needs_refresh(): + return self._token, self._uid + # use locking to only allow one block to refresh; + # other blocks will have to wait + with self._lock: + if self._needs_refresh(): + self._token, self._expiration_time, self._uid = self._fetch_token() + return self._token, self._uid + + def _fetch_uid(self, idam_token): + uid_headers = {"Authorization": f"Bearer {idam_token}"} + # make the get request to get the uid + try: + idam_response = requests.get(self.uid_url, headers=uid_headers) + except Exception as e: + print(f"UID request failed: {e}") + # safely convert to json + try: + payload = idam_response.json() + except ValueError: + raise RuntimeError(f"UID endpoint did not return valid JSON: {idam_response.text}") + # get the uid from the json + uid = payload.get("uid") + if not uid: + raise RuntimeError(f"UID missing in 
response: {payload}") + return uid + + def invalidate(self): + with self._lock: + self._token = None + self._expiration_time = None + self._uid = None + + +class S2S_Manager(): + def __init__(self, env: str, skew: int = 21000): + self.env = env + self._s2s_token = None + self.expire_time = None + self._lock = threading.RLock() + self._skew = skew + + # ----- Environment config (host + key vault) ----- + s2s_base = "http://rpe-service-auth-provider-aat.service.core-compute-aat.internal" + urls = { + "sbox": { + "s2s_host": s2s_base, + "s2s_ip": "http://10.10.143.250/", + "key_vault": "ia-aat"}, + "stg": { + "s2s_host": s2s_base, + "s2s_ip": None, + "key_vault": "ia-aat"}, + "prod": { + "s2s_host": None, + "s2s_ip": None, + "key_vault": None} + } + + self.s2s_host = urls[self.env]["s2s_host"] + self.s2s_ip = urls[self.env]["s2s_ip"] + key_vault = urls[self.env]["key_vault"] + + # ----- Secrets (fetched once per manager instance) ----- + # Databricks utility to retrieve secrets from Key Vault. 
+ # need to change this to use kv client + + credentials = DefaultAzureCredential() + kv_client = SecretClient(vault_url=f"https://{key_vault}.vault.azure.net/", credential=credentials) + + self._s2s_secret = kv_client.get_secret("s2s-secret").value + + self.url = f"{self.s2s_host}/lease" + self.s2s_microservice = "iac" + + def _fetch_s2s_token(self): + otp = pyotp.TOTP(self._s2s_secret).now() + # create payload + s2s_payload = { + "microservice": self.s2s_microservice, + "oneTimePassword": otp + } + # Attempt to make request to s2s endpoint + try: + now = datetime.now(timezone.utc) + s2s_response = requests.post( + self.url, + json=s2s_payload, + headers={ + # "Host": self.s2s_host, + "Content-Type": "application/json" + } + ) + except Exception as e: + raise EOFError(f"Error reuesting service to service token: {e}") + # Ensure you get a 200 response else raise an error + if s2s_response.status_code != 200: + raise RuntimeError(f"Error requesting service to service token: {s2s_response.text}") + + # Extract token from response + try: + payload = s2s_response.text.strip() + # s2s_token = payload.get("token") + except Exception as e: + raise RuntimeError(f"Error extracting service to service token: {e}") + + self.expire_time = now + timedelta(seconds=int(self._skew)) + self._s2s_token = payload + + return payload + + def get_token(self): + if self.expire_time and datetime.now(timezone.utc) < self.expire_time: + return self._s2s_token + with self._lock: + self._s2s_token = self._fetch_s2s_token() + return self._s2s_token + + +if __name__ == "__main__": + idam_token_mgr = IDAMTokenManager(env="sbox") + token, uid = idam_token_mgr.get_token() + print(f"IDAM Token: {token}") + print(f"UID: {uid}") + + s2s_manager = S2S_Manager("sbox", 21) + s2s_token = s2s_manager.get_token() + print(f"S2S Token: {s2s_token}") diff --git a/AzureFunctions/ACTIVE/active_cdam/function_app.py b/AzureFunctions/ACTIVE/active_cdam/function_app.py new file mode 100644 index 00000000..dbe05ed4 
--- /dev/null +++ b/AzureFunctions/ACTIVE/active_cdam/function_app.py @@ -0,0 +1,107 @@ +import asyncio +import azure.functions as func +import logging +import json +import os + +from azure.eventhub.aio import EventHubProducerClient +from azure.eventhub import EventData +from azure.identity.aio import DefaultAzureCredential +from azure.keyvault.secrets.aio import SecretClient +from typing import List + +try: + from .cdamFunctions import process_event +except Exception: + # Fallback for running the script directly during local debugging. + from cdamFunctions import process_event + +logging.getLogger().setLevel(logging.INFO) +logger = logging.getLogger(__name__) + +ENV = os.environ["ENVIRONMENT"] or "sbox" +LZ_KEY = os.environ["LZ_KEY"] +ARIA_NAME = "active" + +eventhub_name = f"evh-active-cdam-pub-{ENV}-{LZ_KEY}-uks-dlrm-01" +eventhub_connection = "sboxdlrmeventhubns_RootManageSharedAccessKey_EVENTHUB" + +app = func.FunctionApp() + + +@app.function_name("eventhub_trigger") +@app.event_hub_message_trigger( + arg_name="azeventhub", + event_hub_name=eventhub_name, + consumer_group='$Default', + connection=eventhub_connection, + starting_position="@latest", + cardinality='many', + max_batch_size=1, + data_type='binary' +) +async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]): + logger.info(f"Processing a batch of {len(azeventhub)} events") + + # Retrieve credentials + credential = DefaultAzureCredential() + logger.info("Connected to Azure Credentials") + + kv_url = f"https://ingest{LZ_KEY}-meta002-{ENV}.vault.azure.net" + kv_client = SecretClient(vault_url=kv_url, credential=credential) + logger.info(f"Connected to KeyVault: {kv_url}") + + results_eh_name = f"evh-active-cdam-res-{ENV}-{LZ_KEY}-uks-dlrm-01" + results_eh_key = await kv_client.get_secret(f"{results_eh_name}-key") + result_eh_secret_key = results_eh_key.value + logger.info("Acquired KV secret for Results Event Hub") + + res_eh_producer = 
EventHubProducerClient.from_connection_string(conn_str=result_eh_secret_key) + + async with res_eh_producer: + event_data_batch = await res_eh_producer.create_batch() + try: + for event in azeventhub: + try: + logger.info(f"Event received with partition key: {event.partition_key}") + + # Parse the payload + caseNo = event.partition_key + payload_str = event.get_body().decode('utf-8') + payload = json.loads(payload_str) + run_id = payload.get("RunID", None) + file_name = payload.get("FileName") + file_url = payload.get("FileURL") + file_content_type = payload.get("FileContentType", "text/html") # Default to HTML content type. + + result = await asyncio.to_thread(process_event, ENV, caseNo, run_id, file_name, file_url, file_content_type) + + # Mark processed if success + if result.get("Status") == "SUCCESS": + logger.info(f"CDAM upload document for: {caseNo} is successful.") + + result_json = json.dumps(result) + + try: + event_data_batch.add(EventData(result_json)) + except ValueError: + # If the batch is full, send it and create a new one + await res_eh_producer.send_batch(event_data_batch) + logger.info("Sent a batch of events to Results Event Hub") + event_data_batch = await res_eh_producer.create_batch() + event_data_batch.add(EventData(result_json)) + + except Exception as e: + logger.error(f"Error processing event for caseNo {caseNo}: {e}") + + # Send any remaining events in the batch + if len(event_data_batch) > 0: + await res_eh_producer.send_batch(event_data_batch) + logger.info("Sent the final batch of events to Results Event Hub") + + except Exception as e: + logger.error(f"Error in event hub processing batch: {e}") + finally: + # Clean up all clients + await kv_client.close() + await credential.close() diff --git a/AzureFunctions/ACTIVE/active_cdam/host.json b/AzureFunctions/ACTIVE/active_cdam/host.json new file mode 100644 index 00000000..bf9b0dd5 --- /dev/null +++ b/AzureFunctions/ACTIVE/active_cdam/host.json @@ -0,0 +1,19 @@ +{ + "version": "2.0", + 
"logging": { + "logLevel": { + "default": "Information" + }, + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "maxTelemetryItemsPerSecond" : 1000, + "excludedTypes": "Request" + } + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +} diff --git a/AzureFunctions/ACTIVE/active_cdam/requirements.txt b/AzureFunctions/ACTIVE/active_cdam/requirements.txt new file mode 100644 index 00000000..52a4cb69 --- /dev/null +++ b/AzureFunctions/ACTIVE/active_cdam/requirements.txt @@ -0,0 +1,37 @@ +# DO NOT include azure-functions-worker in this file +# The Python Worker is managed by Azure Functions platform +# Manually managing azure-functions-worker may cause unexpected issues + +azure-functions +aiohappyeyeballs==2.6.1 +aiohttp==3.11.18 +aiosignal==1.3.2 +attrs==25.3.0 +azure-core==1.34.0 +azure-eventhub==5.15.0 +azure-functions==1.23.0 +azure-identity==1.25.2 +azure-keyvault-secrets==4.8.0 +azure-storage-blob==12.20.0 +certifi==2025.4.26 +cffi==1.17.1 +charset-normalizer==3.4.2 +cryptography==44.0.3 +frozenlist==1.6.0 +idna==3.10 +isodate==0.7.2 +MarkupSafe==3.0.2 +msal==1.32.3 +msal-extensions==1.3.1 +multidict==6.4.4 +propcache==0.3.1 +pycparser==2.22 +PyJWT==2.10.1 +requests==2.32.3 +six==1.17.0 +tenacity==9.1.2 +typing_extensions==4.13.2 +urllib3==2.4.0 +Werkzeug==3.1.3 +yarl==1.20.0 +pyotp==2.8.0 \ No newline at end of file diff --git a/AzureFunctions/ACTIVE/active_cdam/retry_decorator.py b/AzureFunctions/ACTIVE/active_cdam/retry_decorator.py new file mode 100644 index 00000000..40dd06bd --- /dev/null +++ b/AzureFunctions/ACTIVE/active_cdam/retry_decorator.py @@ -0,0 +1,70 @@ +import time +import random +from functools import wraps + + +def retry_on_result( + max_retries: int = 3, + base_delay: float = 1.0, + max_delay: float = 60.0, + jitter: bool = True, + retry_on=None, +): + """ + Retry decorator with exponential backoff for sync functions intended + for use with 
asyncio.to_thread. Retries only when retry_on + predicate returns True β€” exceptions are not caught. + + Args: + max_retries: Number of retry attempts after the first failure. + base_delay: Initial delay in seconds (doubles each attempt). + max_delay: Upper cap on delay in seconds. + jitter: Randomise delay to 50-100% of computed value. + retry_on: Optional callable(result) -> bool. If provided, a + return value for which this returns True is treated as + a retryable failure (the result is returned as-is + after all attempts are exhausted). + + Usage: + @retry_on_result(max_retries=3, retry_on=lambda r: r.get("Status") == "ERROR") + def my_blocking_call(): + ... + + result = await asyncio.to_thread(my_blocking_call) + """ + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + last_result = None + + for attempt in range(max_retries + 1): + result = func(*args, **kwargs) + + if retry_on is not None and retry_on(result): + last_result = result + + if attempt == max_retries: + print( + f"{func.__name__} returned a retryable result after " + f"{max_retries + 1} attempts. Returning last result." + ) + return last_result + + delay = min(base_delay * (2 ** attempt), max_delay) + if jitter: + delay *= 0.5 + random.random() * 0.5 + + print( + f"{func.__name__} returned a retryable result " + f"(attempt {attempt + 1}/{max_retries + 1}). " + f"Retrying in {delay:.2f}s..." 
+ ) + time.sleep(delay) + continue + + return result + + return last_result + + return wrapper + return decorator diff --git a/Databricks/ACTIVE/MVP/CDAM/Active_CDAM_Publish.ipynb b/Databricks/ACTIVE/MVP/CDAM/Active_CDAM_Publish.ipynb new file mode 100644 index 00000000..8f3b3705 --- /dev/null +++ b/Databricks/ACTIVE/MVP/CDAM/Active_CDAM_Publish.ipynb @@ -0,0 +1,593 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "446e282f-e690-438d-ba84-041d67ad0e39", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "%pip install confluent-kafka # required by job cluster until we deploy via DABs" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "b6d3662f-e541-49ef-927f-1e7c6ae27334", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Import packages" + } + }, + "outputs": [], + "source": [ + "import logging\n", + "import json\n", + "import uuid\n", + "from pyspark.sql.functions import col, lit, to_json\n", + "from pyspark.sql.types import StructType, StructField, StringType\n", + "from confluent_kafka import KafkaException" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "0772806f-f2ca-4fc6-bf4c-f8784d5dabc4", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Initialise logging" + } + }, + "outputs": [], + "source": [ + "logger = logging.getLogger(\"DatabricksWorkflow\")\n", + "logger.setLevel(logging.INFO)\n", + "handler = logging.StreamHandler()\n", + "formatter = 
logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')\n", + "handler.setFormatter(formatter)\n", + "if not logger.hasHandlers():\n", + " logger.addHandler(handler)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "84acefcc-8861-4465-b331-f740d160460e", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Set-up configs" + } + }, + "outputs": [], + "source": [ + "# --- Load configuration JSON ---\n", + "config_path = \"dbfs:/configs/config.json\"\n", + "try:\n", + " config = spark.read.option(\"multiline\", \"true\").json(config_path)\n", + " logger.info(f\"Successfully read config file from {config_path}\")\n", + "except Exception as e:\n", + " logger.error(f\"Could not read config file at {config_path}: {e}\", exc_info=True)\n", + " raise FileNotFoundError(f\"Could not read config file at {config_path}: {e}\")\n", + "\n", + "# --- Extract environment and lz_key ---\n", + "try:\n", + " first_row = config.first()\n", + " env = first_row[\"env\"].strip().lower()\n", + " lz_key = first_row[\"lz_key\"].strip().lower()\n", + " logger.info(f\"Extracted configs: env={env}, lz_key={lz_key}\")\n", + "except Exception as e:\n", + " logger.error(f\"Missing expected keys 'env' or 'lz_key' in config file: {e}\", exc_info=True)\n", + " raise KeyError(f\"Missing expected keys 'env' or 'lz_key' in config file: {e}\")\n", + "\n", + "# --- Construct keyvault name ---\n", + "try:\n", + " keyvault_name = f\"ingest{lz_key}-meta002-{env}\"\n", + " logger.info(f\"Constructed keyvault name: {keyvault_name}\")\n", + "except Exception as e:\n", + " logger.error(f\"Error constructing keyvault name: {e}\", exc_info=True)\n", + " raise ValueError(f\"Error constructing keyvault name: {e}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + 
"application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "688eadb3-029f-4141-8725-2fb843088687", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Set-up OAuth" + } + }, + "outputs": [], + "source": [ + "\n", + "# --- Access the Service Principal secrets from Key Vault ---\n", + "try:\n", + " client_secret = dbutils.secrets.get(scope=keyvault_name, key='SERVICE-PRINCIPLE-CLIENT-SECRET')\n", + " logger.info(\"Successfully retrieved SERVICE-PRINCIPLE-CLIENT-SECRET from Key Vault\")\n", + "except Exception as e:\n", + " logger.error(f\"Could not retrieve 'SERVICE-PRINCIPLE-CLIENT-SECRET' from Key Vault '{keyvault_name}': {e}\", exc_info=True)\n", + " raise KeyError(f\"Could not retrieve 'SERVICE-PRINCIPLE-CLIENT-SECRET' from Key Vault '{keyvault_name}': {e}\")\n", + "\n", + "try:\n", + " tenant_id = dbutils.secrets.get(scope=keyvault_name, key='SERVICE-PRINCIPLE-TENANT-ID')\n", + " logger.info(\"Successfully retrieved SERVICE-PRINCIPLE-TENANT-ID from Key Vault\")\n", + "except Exception as e:\n", + " logger.error(f\"Could not retrieve 'SERVICE-PRINCIPLE-TENANT-ID' from Key Vault '{keyvault_name}': {e}\", exc_info=True)\n", + " raise KeyError(f\"Could not retrieve 'SERVICE-PRINCIPLE-TENANT-ID' from Key Vault '{keyvault_name}': {e}\")\n", + "\n", + "try:\n", + " client_id = dbutils.secrets.get(scope=keyvault_name, key='SERVICE-PRINCIPLE-CLIENT-ID')\n", + " logger.info(\"Successfully retrieved SERVICE-PRINCIPLE-CLIENT-ID from Key Vault\")\n", + "except Exception as e:\n", + " logger.error(f\"Could not retrieve 'SERVICE-PRINCIPLE-CLIENT-ID' from Key Vault '{keyvault_name}': {e}\", exc_info=True)\n", + " raise KeyError(f\"Could not retrieve 'SERVICE-PRINCIPLE-CLIENT-ID' from Key Vault '{keyvault_name}': {e}\")\n", + "\n", + "logger.info(\"βœ… Successfully retrieved all Service Principal secrets from Key Vault\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + 
"metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "ae0ba55d-393b-4eeb-b006-a2ea51811a19", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "# --- Parameterise containers ---\n", + "curated_storage_account = f\"ingest{lz_key}curated{env}\"\n", + "curated_container = \"gold\"\n", + "silver_curated_container = \"silver\"\n", + "\n", + "# --- Assign OAuth to storage accounts ---\n", + "storage_accounts = [curated_storage_account]\n", + "\n", + "for storage_account in storage_accounts:\n", + " try:\n", + " configs = {\n", + " f\"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net\": \"OAuth\",\n", + " f\"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net\":\n", + " \"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider\",\n", + " f\"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net\": client_id,\n", + " f\"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net\": client_secret,\n", + " f\"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net\":\n", + " f\"https://login.microsoftonline.com/{tenant_id}/oauth2/token\"\n", + " }\n", + "\n", + " for key, val in configs.items():\n", + " try:\n", + " spark.conf.set(key, val)\n", + " except Exception as e:\n", + " logger.error(f\"Failed to set Spark config '{key}' for storage account '{storage_account}': {e}\", exc_info=True)\n", + " raise RuntimeError(f\"Failed to set Spark config '{key}' for storage account '{storage_account}': {e}\")\n", + "\n", + " logger.info(f\"βœ… Successfully configured OAuth for storage account: {storage_account}\")\n", + "\n", + " except Exception as e:\n", + " logger.error(f\"Error configuring OAuth for storage account '{storage_account}': {e}\", exc_info=True)\n", + " raise RuntimeError(f\"Error configuring OAuth for 
storage account '{storage_account}': {e}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "e29f790a-cbd2-496f-ab33-a6945d7dd415", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Kafka configuration" + } + }, + "outputs": [], + "source": [ + "eh_kv_secret = dbutils.secrets.get(scope=keyvault_name, key=\"RootManageSharedAccessKey\")\n", + "\n", + "# Event Hub configurations\n", + "eventhubs_hostname = f\"ingest{lz_key}-integration-eventHubNamespace001-{env}.servicebus.windows.net:9093\"\n", + "conf = {\n", + " 'bootstrap.servers': eventhubs_hostname,\n", + " 'security.protocol': 'SASL_SSL',\n", + " 'sasl.mechanism': 'PLAIN',\n", + " 'sasl.username': '$ConnectionString',\n", + " 'sasl.password': eh_kv_secret,\n", + " 'retries': 5\n", + "}\n", + "broadcast_conf = sc.broadcast(conf)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "81149135-90dc-4e6a-9dd8-d2230813a0b4", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "state = \"CASE_LINKING\"\n", + "\n", + "try:\n", + " spark.sql(f\"DELETE FROM delta.`abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/APPEALS/{state}/publish_audit_db_eh` WHERE true\")\n", + " logger.info(f\"{state}: old data successfully deleted from publish_audit_db_eh\")\n", + "except Exception as e:\n", + " logger.warning(f\"{state}: Could not delete data from publish_audit_db_eh - {e}\")\n", + "\n", + "try:\n", + " spark.sql(f\"DELETE FROM delta.`abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/AUDIT/APPEALS/{state}/ack_audit` WHERE 
true\")\n", + " logger.info(f\"{state}: ack_audit data deleted\")\n", + "except Exception as e:\n", + " logger.warning(f\"{state}: no ack audit data exists to delete - {e}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "b5cdac48-5ad9-4527-88ff-23ec95e13438", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Process_partition function" + } + }, + "outputs": [], + "source": [ + "# --- Partition processing ---\n", + "def process_partition(partition):\n", + " from confluent_kafka import Producer\n", + " from datetime import datetime, timezone\n", + "\n", + " success_list = []\n", + " failure_list = []\n", + "\n", + " producer = Producer(**broadcast_conf.value)\n", + "\n", + " for row in partition:\n", + " if row[\"CCDCaseReferenceNumber\"] is None:\n", + " logger.warning(f\"Skipping row with missing CCD Case Reference for Case Linking: {row}\")\n", + " continue\n", + "\n", + " currentRunID = row[\"RunID\"]\n", + " currentCcdReference = row[\"CCDCaseReferenceNumber\"]\n", + " currentCaseLinkPayload = {\n", + " \"caseLinks\": row.asDict(recursive=True)[\"CaseLinkPayload\"]\n", + " }\n", + " processStartDateTime = datetime.now(timezone.utc).strftime(\"%Y-%m-%d %H:%M:%S.%f\")\n", + "\n", + " # Closure for callback to capture row-specific variables\n", + " def make_delivery_report(run_id, ccd_reference, payload):\n", + " caseLinks = payload.get(\"caseLinks\", [])\n", + " def delivery_report(err, msg):\n", + " key_str = msg.key().decode('utf-8') if msg.key() else \"Unknown\"\n", + " timestamp = datetime.now(timezone.utc).strftime(\"%Y-%m-%d %H:%M:%S.%f\")\n", + " if err:\n", + " failure_list.append((run_id, ccd_reference, len(caseLinks), processStartDateTime, timestamp, \"ERROR\", str(err)))\n", + " logger.error(f\"Message delivery failed for case linking {ccd_reference}: {err}\")\n", + 
" else:\n", + " success_list.append((run_id, ccd_reference, len(caseLinks), processStartDateTime, timestamp, \"SUCCESS\", \"\"))\n", + " logger.info(f\"Message delivered successfully for case linking {ccd_reference}\")\n", + " return delivery_report\n", + "\n", + " delivery_callback = make_delivery_report(currentRunID, currentCcdReference, currentCaseLinkPayload)\n", + "\n", + " # Produce to Kafka \n", + " try:\n", + " if currentCcdReference:\n", + " value = json.dumps({\n", + " \"RunID\": currentRunID,\n", + " \"CCDCaseReferenceNumber\": currentCcdReference,\n", + " \"CaseLinkPayload\": currentCaseLinkPayload\n", + " }).encode('utf-8')\n", + " else:\n", + " caseLinks = currentCaseLinkPayload.get(\"caseLinks\", [])\n", + " failure_list.append((currentRunID, currentCcdReference, len(caseLinks), processStartDateTime, datetime.now(timezone.utc).strftime(\"%Y-%m-%d %H:%M:%S.%f\"), \"ERROR\", \"Missing CCDCaseReferenceNumber\"))\n", + " continue\n", + "\n", + " try: \n", + " print(\"Sending message to Kafka for case linking...\")\n", + " producer.produce(\n", + " topic=f'evh-active-caselink-pub-{env}-{lz_key}-uks-dlrm-01',\n", + " key=currentCcdReference,\n", + " value=value,\n", + " callback=delivery_callback\n", + " )\n", + " \n", + " except KafkaException as e:\n", + " logger.error(f\"Kafka producer failed (check connectivity!): {e}\")\n", + "\n", + " except BufferError:\n", + " logger.error(\"Producer buffer full.\")\n", + "\n", + " # Flush producer at the end of partition\n", + " try:\n", + " producer.flush()\n", + " except Exception as e:\n", + " logger.error(f\"Flush error: {e}\")\n", + "\n", + " # Merge results\n", + " results = success_list + failure_list\n", + " return results\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "0b0f638a-1d76-42e9-8d06-23ec8c53d7a7", + "showTitle": true, + 
"tableResultSettingsMap": {}, + "title": "Process Kafka Events" + } + }, + "outputs": [], + "source": [ + "# --- Define schema for result DataFrame ---\n", + "result_schema = StructType([\n", + " StructField(\"RunID\", StringType(), True),\n", + " StructField(\"CCDCaseReferenceNumber\", StringType(), True),\n", + " StructField(\"CaseLinkCount\", StringType(), True),\n", + " StructField(\"StartDateTime\", StringType(), True),\n", + " StructField(\"EndDateTime\", StringType(), True),\n", + " StructField(\"Status\", StringType(), True),\n", + " StructField(\"Error\", StringType(), True)\n", + "])\n", + "\n", + "logger.info(f\"πŸ”„ Processing state: {state}\")\n", + "\n", + "silver_base_path = f\"abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/APPEALS/{state}/publish_audit_db_eh\"\n", + "\n", + "try:\n", + " payload_df = spark.read.table(f\"hive_metastore.ariadm_active_appeals.case_link_payloads\")\n", + "\n", + " # Generate unique RunID per batch\n", + " try:\n", + " logger.info(\"Attempting to get Databricks context...\")\n", + "\n", + " # Get the context JSON (string)\n", + " context_str = dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()\n", + " logger.debug(f\"Raw context JSON: {context_str}\")\n", + "\n", + " # Parse JSON into a dict\n", + " context = json.loads(context_str)\n", + " tags = context.get(\"tags\", {})\n", + "\n", + " # Pull jobRunId directly\n", + " run_id = tags.get(\"jobRunId\")\n", + " if run_id:\n", + " logger.info(f\"Using jobRunId from tags: {run_id}\")\n", + " else:\n", + " logger.warning(\"jobRunId not found in tags! 
Setting with UUID.\")\n", + " run_id = str(uuid.uuid4())\n", + " except Exception as e:\n", + " logger.error(f\"Exception retrieving Databricks jobRunId: {e}\")\n", + " run_id = None \n", + " \n", + " results_df = (\n", + " payload_df\n", + " .withColumn(\"RunID\", lit(run_id))\n", + " .withColumn(\"CCDCaseReferenceNumber\", col(\"CCDCaseReferenceNumberFrom\"))\n", + " .withColumn(\"CaseLinkPayload\", col(\"caseLinks\"))\n", + " .select('RunID', 'CCDCaseReferenceNumber', 'CaseLinkPayload')\n", + " .persist()\n", + " )\n", + "\n", + " record_count = results_df.count()\n", + " if record_count == 0:\n", + " print(f\"ℹ️ No data to process for state: {state}\")\n", + " else:\n", + " print(f\"πŸ“Š Found {record_count} records for state: {state}\")\n", + " optimized_df = results_df.repartition(1)\n", + "\n", + " # Map partitions and collect results \n", + " result_rdd = optimized_df.rdd.mapPartitions(process_partition).collect()\n", + " result_df = spark.createDataFrame(result_rdd, result_schema)\n", + "\n", + " # Write results incrementally to Delta \n", + " result_df.write.format(\"delta\") \\\n", + " .mode(\"append\") \\\n", + " .option(\"mergeSchema\", \"true\") \\\n", + " .save(silver_base_path)\n", + "\n", + " # Display results\n", + " display(result_df.select(\"RunID\", \"CCDCaseReferenceNumber\", \"CaseLinkCount\", \"StartDateTime\", \"EndDateTime\", \"Status\", \"Error\"))\n", + "\n", + " # Highlight failures\n", + " failed_df = result_df.filter(col(\"Status\") == \"ERROR\")\n", + " failed_count = failed_df.count()\n", + " if failed_count > 0:\n", + " logger.error(f\"⚠️ Found {failed_count} failed records for state: {state}\")\n", + " display(failed_df.select(\"RunID\", \"CCDCaseReferenceNumber\", \"CaseLinkCount\", \"StartDateTime\", \"EndDateTime\", \"Status\", \"Error\"))\n", + " else:\n", + " logger.info(f\"βœ… No failed records for state: {state}\")\n", + "\n", + " kafka_result_count = result_df.count()\n", + " logger.info(f\"πŸ“Š Kafka processing completed: 
{kafka_result_count} records for state: {state}\")\n", + " logger.info(f\"βœ… Successfully sent {record_count} records to Kafka for state: {state}\")\n", + "\n", + "except Exception as e:\n", + " logger.error(f\"❌ Error processing state {state}: {e}\")\n", + "\n", + "logger.info(f\"πŸŽ‰ Completed processing for state: {state}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "753f6003-b42f-4af3-a5a1-f870a8993e1a", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Copy data from individual state folder into grouped folder" + } + }, + "outputs": [], + "source": [ + "dbutils.fs.cp(f\"abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/APPEALS/{state}/publish_audit_db_eh\",\n", + " f\"abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/APPEALS/{state}/publish_payload_audit\", True)\n", + "\n", + "publish_payload = spark.read.format(\"delta\").load(f\"abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/APPEALS/{state}/publish_audit_db_eh\")\n", + "publish_payload.createOrReplaceTempView(\"publish_payload\")" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "implicitDf": true, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "767ff8ab-2c5d-4c8f-bdaf-d1dfd1e247e7", + "showTitle": false, + "tableResultSettingsMap": { + "0": { + "dataGridStateBlob": "{\"version\":1,\"tableState\":{\"columnPinning\":{\"left\":[\"#row_number#\"],\"right\":[]},\"columnSizing\":{},\"columnVisibility\":{}},\"settings\":{\"columns\":{}},\"syncTimestamp\":1763042629291}", + "filterBlob": null, + "queryPlanFiltersBlob": null, + "tableResultIndex": 0 + 
} + }, + "title": "" + } + }, + "outputs": [], + "source": [ + "%sql\n", + "SELECT * FROM publish_payload" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "5614f6fe-1de8-4af6-9359-c7d4e584fad4", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "dbutils.notebook.exit(f\"{state} notebook completed successfully\")\n", + "logger.info(f\"πŸŽ‰ Completed processing for state: {state}\")" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [], + "environmentMetadata": { + "base_environment": "", + "environment_version": "3" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "mostRecentlyExecutedCommandWithImplicitDF": { + "commandId": 7780900700361124, + "dataframes": [ + "_sqldf" + ] + }, + "pythonIndentUnit": 4 + }, + "notebookName": "Active_CCD_Publish_Case_Linking", + "widgets": {} + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/Databricks/ACTIVE/MVP/CDAM/Active_CDAM_Results.ipynb b/Databricks/ACTIVE/MVP/CDAM/Active_CDAM_Results.ipynb new file mode 100644 index 00000000..6d47d9e8 --- /dev/null +++ b/Databricks/ACTIVE/MVP/CDAM/Active_CDAM_Results.ipynb @@ -0,0 +1,369 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "39cd25ff-fdcd-4dbc-9268-72e247e475e3", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "%pip install confluent_kafka" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + 
"application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "9bb3c5cf-b229-4aee-a829-67b51da04fa3", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Import packages" + } + }, + "outputs": [], + "source": [ + "from pyspark.sql.functions import col, from_json\n", + "from pyspark.sql.types import StructType, StructField, StringType\n", + "\n", + "import logging\n", + "import traceback" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "0245e09f-90a2-4134-8fff-3a39d6fd5cc1", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Initialise logging" + } + }, + "outputs": [], + "source": [ + "logger = logging.getLogger(\"DatabricksWorkflow\")\n", + "logger.setLevel(logging.INFO)\n", + "handler = logging.StreamHandler()\n", + "formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')\n", + "handler.setFormatter(formatter)\n", + "if not logger.hasHandlers():\n", + " logger.addHandler(handler)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "a633bbde-145e-4063-a90a-13aa7107d099", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Set-up configs" + } + }, + "outputs": [], + "source": [ + "config_path = \"dbfs:/configs/config.json\"\n", + "try:\n", + " config = spark.read.option(\"multiline\", \"true\").json(config_path)\n", + " logger.info(f\"Successfully read config file from {config_path}\")\n", + "except Exception as e:\n", + " logger.error(f\"Could not read config file at {config_path}: {e}\", exc_info=True)\n", + " raise FileNotFoundError(f\"Could not read config file at {config_path}: 
{e}\")\n", + "\n", + "try:\n", + " first_row = config.first()\n", + " env = first_row[\"env\"].strip().lower()\n", + " lz_key = first_row[\"lz_key\"].strip().lower()\n", + " logger.info(f\"Extracted configs: env={env}, lz_key={lz_key}\")\n", + "except Exception as e:\n", + " logger.error(f\"Missing expected keys 'env' or 'lz_key' in config file: {e}\", exc_info=True)\n", + " raise KeyError(f\"Missing expected keys 'env' or 'lz_key' in config file: {e}\")\n", + "\n", + "try:\n", + " keyvault_name = f\"ingest{lz_key}-meta002-{env}\"\n", + " logger.info(f\"Constructed keyvault name: {keyvault_name}\")\n", + "except Exception as e:\n", + " logger.error(f\"Error constructing keyvault name: {e}\", exc_info=True)\n", + " raise ValueError(f\"Error constructing keyvault name: {e}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "dadbe708-794d-4c07-8922-afdef6c25f50", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Set-up OAuth" + } + }, + "outputs": [], + "source": [ + "\n", + "try:\n", + " client_secret = dbutils.secrets.get(scope=keyvault_name, key='SERVICE-PRINCIPLE-CLIENT-SECRET')\n", + " logger.info(\"Successfully retrieved SERVICE-PRINCIPLE-CLIENT-SECRET from Key Vault\")\n", + "except Exception as e:\n", + " logger.error(f\"Could not retrieve 'SERVICE-PRINCIPLE-CLIENT-SECRET' from Key Vault '{keyvault_name}': {e}\", exc_info=True)\n", + " raise KeyError(f\"Could not retrieve 'SERVICE-PRINCIPLE-CLIENT-SECRET' from Key Vault '{keyvault_name}': {e}\")\n", + "\n", + "try:\n", + " tenant_id = dbutils.secrets.get(scope=keyvault_name, key='SERVICE-PRINCIPLE-TENANT-ID')\n", + " logger.info(\"Successfully retrieved SERVICE-PRINCIPLE-TENANT-ID from Key Vault\")\n", + "except Exception as e:\n", + " logger.error(f\"Could not retrieve 'SERVICE-PRINCIPLE-TENANT-ID' from Key Vault 
'{keyvault_name}': {e}\", exc_info=True)\n", + " raise KeyError(f\"Could not retrieve 'SERVICE-PRINCIPLE-TENANT-ID' from Key Vault '{keyvault_name}': {e}\")\n", + "\n", + "try:\n", + " client_id = dbutils.secrets.get(scope=keyvault_name, key='SERVICE-PRINCIPLE-CLIENT-ID')\n", + " logger.info(\"Successfully retrieved SERVICE-PRINCIPLE-CLIENT-ID from Key Vault\")\n", + "except Exception as e:\n", + " logger.error(f\"Could not retrieve 'SERVICE-PRINCIPLE-CLIENT-ID' from Key Vault '{keyvault_name}': {e}\", exc_info=True)\n", + " raise KeyError(f\"Could not retrieve 'SERVICE-PRINCIPLE-CLIENT-ID' from Key Vault '{keyvault_name}': {e}\")\n", + "\n", + "logger.info(\"βœ… Successfully retrieved all Service Principal secrets from Key Vault\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "002e771a-b124-4cee-b3e5-8b0f9abd3bd9", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Assign OAuth" + } + }, + "outputs": [], + "source": [ + "# --- Parameterise containers ---\n", + "curated_storage_account = f\"ingest{lz_key}curated{env}\"\n", + "curated_container = \"gold\"\n", + "silver_curated_container = \"silver\"\n", + "checkpoint_storage_account = f\"ingest{lz_key}xcutting{env}\"\n", + "checkpoint_container = \"db-ack-checkpoint\"\n", + "\n", + "# --- Assign OAuth to storage accounts ---\n", + "storage_accounts = [curated_storage_account, checkpoint_storage_account]\n", + "\n", + "for storage_account in storage_accounts:\n", + " try:\n", + " configs = {\n", + " f\"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net\": \"OAuth\",\n", + " f\"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net\":\n", + " \"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider\",\n", + " f\"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net\": 
client_id,\n", + " f\"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net\": client_secret,\n", + " f\"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net\":\n", + " f\"https://login.microsoftonline.com/{tenant_id}/oauth2/token\"\n", + " }\n", + "\n", + " for key, val in configs.items():\n", + " try:\n", + " spark.conf.set(key, val)\n", + " except Exception as e:\n", + " logger.error(f\"Failed to set Spark config '{key}' for storage account '{storage_account}': {e}\", exc_info=True)\n", + " raise RuntimeError(f\"Failed to set Spark config '{key}' for storage account '{storage_account}': {e}\")\n", + "\n", + " logger.info(f\"βœ… Successfully configured OAuth for storage account: {storage_account}\")\n", + "\n", + " except Exception as e:\n", + " logger.error(f\"Error configuring OAuth for storage account '{storage_account}': {e}\", exc_info=True)\n", + " raise RuntimeError(f\"Error configuring OAuth for storage account '{storage_account}': {e}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "779690b2-69f5-4c95-bb38-6b0b091fd088", + "showTitle": true, + "tableResultSettingsMap": { + "0": { + "dataGridStateBlob": "{\"version\":1,\"tableState\":{\"columnPinning\":{\"left\":[\"#row_number#\"],\"right\":[]},\"columnSizing\":{},\"columnVisibility\":{}},\"settings\":{\"columns\":{}},\"syncTimestamp\":1762268157719}", + "filterBlob": "{\"version\":1,\"filterGroups\":[],\"syncTimestamp\":1762866819579}", + "queryPlanFiltersBlob": "[]", + "tableResultIndex": 0 + } + }, + "title": "Autoloader" + } + }, + "outputs": [], + "source": [ + "spark.conf.set(\"spark.databricks.delta.schema.autoMerge.enabled\", \"true\")\n", + "\n", + "state = \"CASE_LINKING\"\n", + "\n", + "# Kafka / paths setup as you have\n", + "data_path = 
f\"abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/AUDIT/APPEALS/{state}/ack_audit\"\n", + "checkpoint_path = f\"abfss://{checkpoint_container}@{checkpoint_storage_account}.dfs.core.windows.net/APPEALS/{state}/ACK/\"\n", + "\n", + "# Keep schema exactly as it exists in Pub notebook\n", + "result_schema = StructType([\n", + " StructField(\"RunID\", StringType(), True),\n", + " StructField(\"CCDCaseReferenceNumber\", StringType(), True),\n", + " StructField(\"CaseLinkCount\", StringType(), True),\n", + " StructField(\"StartDateTime\", StringType(), True),\n", + " StructField(\"EndDateTime\", StringType(), True),\n", + " StructField(\"Status\", StringType(), True),\n", + " StructField(\"Error\", StringType(), True)\n", + "])\n", + "\n", + "EH_NAMESPACE = f\"ingest{lz_key}-integration-eventHubNamespace001-{env}\"\n", + "EH_NAME = f\"evh-active-caselink-res-{env}-{lz_key}-uks-dlrm-01\"\n", + "\n", + "connection_string = dbutils.secrets.get(keyvault_name, \"RootManageSharedAccessKey\")\n", + "\n", + "KAFKA_OPTIONS = {\n", + " \"kafka.bootstrap.servers\": f\"{EH_NAMESPACE}.servicebus.windows.net:9093\",\n", + " \"subscribe\": EH_NAME,\n", + " \"startingOffsets\": \"latest\",\n", + " \"kafka.security.protocol\": \"SASL_SSL\",\n", + " \"failOnDataLoss\": \"false\",\n", + " \"kafka.sasl.mechanism\": \"PLAIN\",\n", + " \"kafka.sasl.jaas.config\": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{connection_string}\";'\n", + "}\n", + "\n", + "# Start streaming\n", + "try: \n", + " eventhubdf = spark.readStream.format(\"kafka\").options(**KAFKA_OPTIONS).load()\n", + "\n", + " # display(eventhubdf)\n", + "\n", + " parsed_df = (\n", + " eventhubdf\n", + " .select(col(\"value\").cast(\"string\").alias(\"json_str\"))\n", + " .select(from_json(col(\"json_str\"), result_schema).alias(\"json_obj\"))\n", + " .select(\"json_obj.*\")\n", + " )\n", + "\n", + " 
display(parsed_df)\n", + "\n", + " query = parsed_df.writeStream \\\n", + " .format(\"delta\") \\\n", + " .option(\"checkpointLocation\", checkpoint_path) \\\n", + " .outputMode(\"append\") \\\n", + " .start(data_path)\n", + " \n", + " logger.info(\"Streaming query started successfully. Waiting for data...\")\n", + "\n", + "except Exception as e:\n", + " logger.error(\"Error starting the stream: %s\", str(e))\n", + " logger.error(traceback.format_exc())\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "df90eeee-710e-4a2f-aa0a-6c3b70becf3f", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Gracefully exit notebook" + } + }, + "outputs": [], + "source": [ + "dbutils.notebook.exit(f\"Notebook completed successfully\")" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [ + { + "elements": [], + "globalVars": {}, + "guid": "", + "layoutOption": { + "grid": true, + "stack": true + }, + "nuid": "b96d44f6-1061-455e-960e-fc87e7965c4a", + "origId": 7780900700361136, + "title": "CCD Call Result Dashboard", + "version": "DashboardViewV1", + "width": 1440 + } + ], + "environmentMetadata": { + "base_environment": "", + "environment_version": "3" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "mostRecentlyExecutedCommandWithImplicitDF": { + "commandId": 7448024178269919, + "dataframes": [ + "_sqldf" + ] + }, + "pythonIndentUnit": 4 + }, + "notebookName": "Active_CCD_Results_Case_Linking", + "widgets": {} + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/Databricks/ACTIVE/MVP/Case_Linking_Tracking_Dashboard.ipynb b/Databricks/ACTIVE/MVP/CDAM/CDAM_Tracking_Dashboard.ipynb similarity index 100% rename from 
Databricks/ACTIVE/MVP/Case_Linking_Tracking_Dashboard.ipynb rename to Databricks/ACTIVE/MVP/CDAM/CDAM_Tracking_Dashboard.ipynb diff --git a/Databricks/ACTIVE/MVP/Active_CCD_Publish_Case_Linking.ipynb b/Databricks/ACTIVE/MVP/CaseLinking/Active_CCD_Publish_Case_Linking.ipynb similarity index 98% rename from Databricks/ACTIVE/MVP/Active_CCD_Publish_Case_Linking.ipynb rename to Databricks/ACTIVE/MVP/CaseLinking/Active_CCD_Publish_Case_Linking.ipynb index 89415ad8..8f3b3705 100644 --- a/Databricks/ACTIVE/MVP/Active_CCD_Publish_Case_Linking.ipynb +++ b/Databricks/ACTIVE/MVP/CaseLinking/Active_CCD_Publish_Case_Linking.ipynb @@ -273,7 +273,13 @@ " spark.sql(f\"DELETE FROM delta.`abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/APPEALS/{state}/publish_audit_db_eh` WHERE true\")\n", " logger.info(f\"{state}: old data successfully deleted from publish_audit_db_eh\")\n", "except Exception as e:\n", - " logger.warning(f\"{state}: Could not delete data from publish_audit_db_eh - {e}\")" + " logger.warning(f\"{state}: Could not delete data from publish_audit_db_eh - {e}\")\n", + "\n", + "try:\n", + " spark.sql(f\"DELETE FROM delta.`abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/AUDIT/APPEALS/{state}/ack_audit` WHERE true\")\n", + " logger.info(f\"{state}: ack_audit data deleted\")\n", + "except Exception as e:\n", + " logger.warning(f\"{state}: no ack audit data exists to delete - {e}\")" ] }, { diff --git a/Databricks/ACTIVE/MVP/Active_CCD_Results_Case_Linking.ipynb b/Databricks/ACTIVE/MVP/CaseLinking/Active_CCD_Results_Case_Linking.ipynb similarity index 98% rename from Databricks/ACTIVE/MVP/Active_CCD_Results_Case_Linking.ipynb rename to Databricks/ACTIVE/MVP/CaseLinking/Active_CCD_Results_Case_Linking.ipynb index 1c91c3fe..6d47d9e8 100644 --- a/Databricks/ACTIVE/MVP/Active_CCD_Results_Case_Linking.ipynb +++ 
b/Databricks/ACTIVE/MVP/CaseLinking/Active_CCD_Results_Case_Linking.ipynb @@ -240,9 +240,11 @@ "source": [ "spark.conf.set(\"spark.databricks.delta.schema.autoMerge.enabled\", \"true\")\n", "\n", + "state = \"CASE_LINKING\"\n", + "\n", "# Kafka / paths setup as you have\n", - "data_path = f\"abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/AUDIT/APPEALS/CASE_LINKING/ack_audit\"\n", - "checkpoint_path = f\"abfss://{checkpoint_container}@{checkpoint_storage_account}.dfs.core.windows.net/APPEALS/CASE_LINKING/ACK/\"\n", + "data_path = f\"abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/AUDIT/APPEALS/{state}/ack_audit\"\n", + "checkpoint_path = f\"abfss://{checkpoint_container}@{checkpoint_storage_account}.dfs.core.windows.net/APPEALS/{state}/ACK/\"\n", "\n", "# Keep schema exactly as it exists in Pub notebook\n", "result_schema = StructType([\n", diff --git a/Databricks/ACTIVE/MVP/CaseLinking/Case_Linking_Tracking_Dashboard.ipynb b/Databricks/ACTIVE/MVP/CaseLinking/Case_Linking_Tracking_Dashboard.ipynb new file mode 100644 index 00000000..677a7436 --- /dev/null +++ b/Databricks/ACTIVE/MVP/CaseLinking/Case_Linking_Tracking_Dashboard.ipynb @@ -0,0 +1,656 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "282a1963-cd5d-4076-a466-330fd4eed639", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "%pip install confluent_kafka" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "f4609d39-8e6c-41d5-9abf-392a9ee56455", + "showTitle": true, + "tableResultSettingsMap": {}, + 
"title": "Import Libraries" + } + }, + "outputs": [], + "source": [ + "from confluent_kafka import Producer\n", + "import json\n", + "from itertools import islice\n", + "import numpy as np\n", + "from pyspark.sql.functions import col, decode, split, element_at, udf, lit, reduce, from_json, regexp_replace, concat, when\n", + "import logging\n", + "from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DateType\n", + "import datetime\n", + "from pyspark.sql import SparkSession, DataFrame\n", + "from pyspark import SparkContext\n", + "import os\n", + "from functools import reduce\n", + "import time\n", + "import traceback" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "64cf17e7-1a72-4e5d-b50f-835f5f169017", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Initialise logging" + } + }, + "outputs": [], + "source": [ + "logger = logging.getLogger(\"DatabricksWorkflow\")\n", + "logger.setLevel(logging.INFO)\n", + "handler = logging.StreamHandler()\n", + "formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')\n", + "handler.setFormatter(formatter)\n", + "if not logger.hasHandlers():\n", + " logger.addHandler(handler)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "3010f725-1597-452d-89a1-d3636edff2ad", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Set-up cnfigs" + } + }, + "outputs": [], + "source": [ + "config_path = \"dbfs:/configs/config.json\"\n", + "try:\n", + " config = spark.read.option(\"multiline\", \"true\").json(config_path)\n", + " logger.info(f\"Successfully read config file from {config_path}\")\n", + "except Exception as e:\n", 
+ " logger.error(f\"Could not read config file at {config_path}: {e}\", exc_info=True)\n", + " raise FileNotFoundError(f\"Could not read config file at {config_path}: {e}\")\n", + "\n", + "try:\n", + " first_row = config.first()\n", + " env = first_row[\"env\"].strip().lower()\n", + " lz_key = first_row[\"lz_key\"].strip().lower()\n", + " logger.info(f\"Extracted configs: env={env}, lz_key={lz_key}\")\n", + "except Exception as e:\n", + " logger.error(f\"Missing expected keys 'env' or 'lz_key' in config file: {e}\", exc_info=True)\n", + " raise KeyError(f\"Missing expected keys 'env' or 'lz_key' in config file: {e}\")\n", + "\n", + "try:\n", + " keyvault_name = f\"ingest{lz_key}-meta002-{env}\"\n", + " logger.info(f\"Constructed keyvault name: {keyvault_name}\")\n", + "except Exception as e:\n", + " logger.error(f\"Error constructing keyvault name: {e}\", exc_info=True)\n", + " raise ValueError(f\"Error constructing keyvault name: {e}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "daf71926-d55f-4e3c-be2a-8ae9808b5390", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Set-up OAuth" + } + }, + "outputs": [], + "source": [ + "\n", + "try:\n", + " client_secret = dbutils.secrets.get(scope=keyvault_name, key='SERVICE-PRINCIPLE-CLIENT-SECRET')\n", + " logger.info(\"Successfully retrieved SERVICE-PRINCIPLE-CLIENT-SECRET from Key Vault\")\n", + "except Exception as e:\n", + " logger.error(f\"Could not retrieve 'SERVICE-PRINCIPLE-CLIENT-SECRET' from Key Vault '{keyvault_name}': {e}\", exc_info=True)\n", + " raise KeyError(f\"Could not retrieve 'SERVICE-PRINCIPLE-CLIENT-SECRET' from Key Vault '{keyvault_name}': {e}\")\n", + "\n", + "try:\n", + " tenant_id = dbutils.secrets.get(scope=keyvault_name, key='SERVICE-PRINCIPLE-TENANT-ID')\n", + " logger.info(\"Successfully retrieved 
SERVICE-PRINCIPLE-TENANT-ID from Key Vault\")\n", + "except Exception as e:\n", + " logger.error(f\"Could not retrieve 'SERVICE-PRINCIPLE-TENANT-ID' from Key Vault '{keyvault_name}': {e}\", exc_info=True)\n", + " raise KeyError(f\"Could not retrieve 'SERVICE-PRINCIPLE-TENANT-ID' from Key Vault '{keyvault_name}': {e}\")\n", + "\n", + "try:\n", + " client_id = dbutils.secrets.get(scope=keyvault_name, key='SERVICE-PRINCIPLE-CLIENT-ID')\n", + " logger.info(\"Successfully retrieved SERVICE-PRINCIPLE-CLIENT-ID from Key Vault\")\n", + "except Exception as e:\n", + " logger.error(f\"Could not retrieve 'SERVICE-PRINCIPLE-CLIENT-ID' from Key Vault '{keyvault_name}': {e}\", exc_info=True)\n", + " raise KeyError(f\"Could not retrieve 'SERVICE-PRINCIPLE-CLIENT-ID' from Key Vault '{keyvault_name}': {e}\")\n", + "\n", + "logger.info(\"βœ… Successfully retrieved all Service Principal secrets from Key Vault\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "881b1297-abd6-47b9-83f3-0111b4a47cd8", + "showTitle": true, + "tableResultSettingsMap": {}, + "title": "Assign OAuth" + } + }, + "outputs": [], + "source": [ + "# --- Parameterise containers ---\n", + "curated_storage_account = f\"ingest{lz_key}curated{env}\"\n", + "curated_container = \"gold\"\n", + "silver_curated_container = \"silver\"\n", + "\n", + "# --- Assign OAuth to storage accounts ---\n", + "storage_accounts = [curated_storage_account]\n", + "\n", + "for storage_account in storage_accounts:\n", + " try:\n", + " configs = {\n", + " f\"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net\": \"OAuth\",\n", + " f\"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net\":\n", + " \"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider\",\n", + " 
f\"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net\": client_id,\n", + " f\"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net\": client_secret,\n", + " f\"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net\":\n", + " f\"https://login.microsoftonline.com/{tenant_id}/oauth2/token\"\n", + " }\n", + "\n", + " for key, val in configs.items():\n", + " try:\n", + " spark.conf.set(key, val)\n", + " except Exception as e:\n", + " logger.error(f\"Failed to set Spark config '{key}' for storage account '{storage_account}': {e}\", exc_info=True)\n", + " raise RuntimeError(f\"Failed to set Spark config '{key}' for storage account '{storage_account}': {e}\")\n", + "\n", + " logger.info(f\"βœ… Successfully configured OAuth for storage account: {storage_account}\")\n", + "\n", + " except Exception as e:\n", + " logger.error(f\"Error configuring OAuth for storage account '{storage_account}': {e}\", exc_info=True)\n", + " raise RuntimeError(f\"Error configuring OAuth for storage account '{storage_account}': {e}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "86c6ca78-8d30-4ebc-a762-4adb3a11fdc6", + "showTitle": true, + "tableResultSettingsMap": { + "0": { + "dataGridStateBlob": "{\"version\":1,\"tableState\":{\"columnPinning\":{\"left\":[\"#row_number#\"],\"right\":[]},\"columnSizing\":{},\"columnVisibility\":{}},\"settings\":{\"columns\":{}},\"syncTimestamp\":1764156534467}", + "filterBlob": "{\"version\":1,\"filterGroups\":[],\"syncTimestamp\":1764851051152}", + "queryPlanFiltersBlob": "[]", + "tableResultIndex": 0 + } + }, + "title": "From CCD Publish Payload to CCD Call Result" + } + }, + "outputs": [ + { + "data": { + "text/plain": [ + "Databricks visualization. Run in Databricks to view." 
+ ] + }, + "metadata": { + "application/vnd.databricks.v1.subcommand+json": { + "baseErrorDetails": null, + "bindings": {}, + "collapsed": false, + "command": "%python\n__backend_agg_display_orig = display\n__backend_agg_dfs = []\ndef __backend_agg_display_new(df):\n __backend_agg_df_modules = [\"pandas.core.frame\", \"databricks.koalas.frame\", \"pyspark.sql.dataframe\", \"pyspark.pandas.frame\", \"pyspark.sql.connect.dataframe\"]\n if (type(df).__module__ in __backend_agg_df_modules and type(df).__name__ == 'DataFrame') or isinstance(df, list):\n __backend_agg_dfs.append(df)\n\ndisplay = __backend_agg_display_new\n\ndef __backend_agg_user_code_fn():\n import base64\n exec(base64.standard_b64decode(\"Y2NkX2Nhc2VfbGlua19wdWJsaXNoX3BheWxvYWRfcmVzdWx0ID0gc3BhcmsucmVhZC5mb3JtYXQoImRlbHRhIikubG9hZChmImFiZnNzOi8ve3NpbHZlcl9jdXJhdGVkX2NvbnRhaW5lcn1Ae2N1cmF0ZWRfc3RvcmFnZV9hY2NvdW50fS5kZnMuY29yZS53aW5kb3dzLm5ldC9BUklBRE0vQUNUSVZFL0NDRC9BUFBFQUxTL0NBU0VfTElOS0lORy9wdWJsaXNoX3BheWxvYWRfYXVkaXQiKQpjY2RfY2FzZV9saW5rX3B1Ymxpc2hfcGF5bG9hZF9yZXN1bHQuY3JlYXRlT3JSZXBsYWNlVGVtcFZpZXcoImNjZF9jYXNlX2xpbmtfcHVibGlzaF9wYXlsb2FkX3Jlc3VsdCIpCgpjY2RfY2FzZV9saW5rX2NhbGxfcmVzdWx0ID0gc3BhcmsucmVhZC5mb3JtYXQoImRlbHRhIikubG9hZChmImFiZnNzOi8vc2lsdmVyQGluZ2VzdHtsel9rZXl9Y3VyYXRlZHtlbnZ9LmRmcy5jb3JlLndpbmRvd3MubmV0L0FSSUFETS9BQ1RJVkUvQ0NEL0FVRElUL0FQUEVBTFMvQ0FTRV9MSU5LSU5HL2Fja19hdWRpdCIpCmNjZF9jYXNlX2xpbmtfY2FsbF9yZXN1bHQuY3JlYXRlT3JSZXBsYWNlVGVtcFZpZXcoImNjZF9jYXNlX2xpbmtfY2FsbF9yZXN1bHQiKQoKc3Bhcmsuc3FsKCIiIgogICAgU0VMRUNUIAogICAgICAgIENPQUxFU0NFKHQyLlJ1bklkLCB0MS5SdW5JZCkgQVMgUnVuSUQsCiAgICAgICAgQ09BTEVTQ0UodDIuQ0NEQ2FzZVJlZmVyZW5jZU51bWJlciwgdDEuQ0NEQ2FzZVJlZmVyZW5jZU51bWJlcikgYXMgYENDRCBDYXNlIFJlZmVyZW5jZSBOdW1iZXJgLAogICAgICAgIHQxLlN0YXR1cyBhcyBgQ2FzZSBMaW5rIFB1Ymxpc2ggU3RhdHVzYCwKICAgICAgICB0Mi5TdGF0dXMgYXMgYENhc2UgTGluayBDQ0QgQ2FsbCBTdGF0dXNgLAogICAgICAgIHQxLkNhc2VMaW5rQ291bnQgYXMgYENhc2UgTGluayBQdWJsaXNoIENhc2UgTGluayBDb3VudGAsCiAgICAgICAgdDEuU3RhcnREYXRlVGltZSBhcyBgQ2FzZSBMaW5rIFB1Ymxpc
2ggUHVibGlzaGluZyBEYXRlIFRpbWVgLAogICAgICAgIHQxLkVycm9yIGFzIGBDYXNlIExpbmsgUHVibGlzaCBFcnJvcmAsCiAgICAgICAgdDIuQ2FzZUxpbmtDb3VudCBhcyBgQ2FzZSBMaW5rIENDRCBDYWxsIEZ1bmN0aW9uIEFwcCBDYXNlIExpbmsgQ291bnRgLAogICAgICAgIHQyLlN0YXJ0RGF0ZVRpbWUgYXMgYENhc2UgTGluayBDQ0QgQ2FsbCBGdW5jdGlvbiBBcHAgU3RhcnQgRGF0ZSBUaW1lYCwKICAgICAgICB0Mi5FbmREYXRlVGltZSBhcyBgQ2FzZSBMaW5rIENDRCBDYWxsIEZ1bmN0aW9uIEFwcCBFbmQgRGF0ZSBUaW1lYCwKICAgICAgICB0Mi5FcnJvciBhcyBgQ2FzZSBMaW5rIENDRCBDYWxsIEZ1bmN0aW9uIEFwcCBFcnJvcmAKICAgIEZST00gY2NkX2Nhc2VfbGlua19wdWJsaXNoX3BheWxvYWRfcmVzdWx0IHQxCiAgICAgICAgRlVMTCBPVVRFUiBKT0lOIGNjZF9jYXNlX2xpbmtfY2FsbF9yZXN1bHQgdDIgT04gdDEuQ0NEQ2FzZVJlZmVyZW5jZU51bWJlciA9IHQyLkNDRENhc2VSZWZlcmVuY2VOdW1iZXIgQU5EIHQxLlJ1bklEID0gdDIuUnVuSUQKICAgIE9SREVSIEJZIGBDYXNlIExpbmsgUHVibGlzaCBQdWJsaXNoaW5nIERhdGUgVGltZWAgREVTQwoiIiIpLmRpc3BsYXkoKQo=\").decode())\n\ntry:\n # run user code\n __backend_agg_user_code_fn()\n\n #reset display function\n display = __backend_agg_display_orig\n\n if len(__backend_agg_dfs) > 0:\n # create a temp view\n if type(__backend_agg_dfs[0]).__module__ == \"databricks.koalas.frame\":\n # koalas dataframe\n __backend_agg_dfs[0].to_spark().createOrReplaceTempView(\"DatabricksViewbd615c2\")\n elif type(__backend_agg_dfs[0]).__module__ == \"pandas.core.frame\" or isinstance(__backend_agg_dfs[0], list):\n # pandas dataframe\n spark.createDataFrame(__backend_agg_dfs[0]).createOrReplaceTempView(\"DatabricksViewbd615c2\")\n else:\n __backend_agg_dfs[0].createOrReplaceTempView(\"DatabricksViewbd615c2\")\n #run backend agg\n display(spark.sql(\"\"\"WITH q AS (select * from DatabricksViewbd615c2) SELECT `State`,COUNT(*) `column_954ccfa6298`,`CCD Call Status` FROM q GROUP BY `CCD Call Status`,`State`\"\"\"))\n else:\n displayHTML(\"dataframe no longer exists. 
If you're using dataframe.display(), use display(dataframe) instead.\")\n\n\nfinally:\n spark.sql(\"drop view if exists DatabricksViewbd615c2\")\n display = __backend_agg_display_orig\n del __backend_agg_display_new\n del __backend_agg_display_orig\n del __backend_agg_dfs\n del __backend_agg_user_code_fn\n\n", + "commandTitle": "Visualization 1", + "commandType": "auto", + "commandVersion": 0, + "commentThread": [], + "commentsVisible": false, + "contentSha256Hex": null, + "customPlotOptions": { + "redashChart": [ + { + "key": "type", + "value": "CHART" + }, + { + "key": "options", + "value": { + "alignYAxesAtZero": true, + "coefficient": 1, + "columnConfigurationMap": { + "series": { + "column": "CCD Call Status", + "id": "column_954ccfa6297" + }, + "x": { + "column": "State", + "id": "column_954ccfa6295" + }, + "y": [ + { + "column": "*", + "id": "column_954ccfa6298", + "transform": "COUNT" + } + ] + }, + "dateTimeFormat": "DD/MM/YYYY HH:mm", + "direction": { + "type": "counterclockwise" + }, + "error_y": { + "type": "data", + "visible": true + }, + "globalSeriesType": "column", + "isAggregationOn": true, + "legend": { + "traceorder": "normal" + }, + "missingValuesAsZero": true, + "numberFormat": "0,0.[00000]", + "percentFormat": "0[.]00%", + "series": { + "error_y": { + "type": "data", + "visible": true + }, + "stacking": null + }, + "seriesOptions": { + "column_954ccfa6298": { + "type": "column", + "yAxis": 0 + } + }, + "showDataLabels": false, + "sizemode": "diameter", + "sortX": true, + "sortY": true, + "swappedAxes": true, + "textFormat": "", + "useAggregationsUi": true, + "valuesOptions": {}, + "version": 2, + "xAxis": { + "labels": { + "enabled": true + }, + "type": "-" + }, + "yAxis": [ + { + "type": "-" + }, + { + "opposite": true, + "type": "-" + } + ] + } + } + ] + }, + "datasetPreviewNameToCmdIdMap": {}, + "diffDeletes": [], + "diffInserts": [], + "displayType": "redashChart", + "error": null, + "errorDetails": null, + "errorSummary": null, + 
"errorTraceType": null, + "finishTime": 0, + "globalVars": {}, + "guid": "", + "height": "auto", + "hideCommandCode": false, + "hideCommandResult": false, + "iPythonMetadata": null, + "inputWidgets": {}, + "isLockedInExamMode": false, + "latestAssumeRoleInfo": null, + "latestUser": "a user", + "latestUserId": null, + "listResultMetadata": null, + "metadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "nuid": "3f4bc456-1bf9-4a63-a08d-ed455cb3cac9", + "origId": 0, + "parentHierarchy": [], + "pivotAggregation": null, + "pivotColumns": null, + "position": 8, + "resultDbfsErrorMessage": null, + "resultDbfsStatus": "INLINED_IN_TREE", + "results": null, + "showCommandTitle": false, + "startTime": 0, + "state": "input", + "streamStates": {}, + "subcommandOptions": { + "queryPlan": { + "groups": [ + { + "column": "State", + "type": "column" + }, + { + "column": "CCD Call Status", + "type": "column" + } + ], + "selects": [ + { + "column": "State", + "type": "column" + }, + { + "alias": "column_954ccfa6298", + "args": [ + { + "column": "*", + "type": "column" + } + ], + "function": "COUNT", + "type": "function" + }, + { + "column": "CCD Call Status", + "type": "column" + } + ] + } + }, + "submitTime": 0, + "subtype": "tableResultSubCmd.visualization", + "tableResultIndex": 0, + "tableResultSettingsMap": {}, + "useConsistentColors": false, + "version": "CommandV1", + "width": "auto", + "workflows": null, + "xColumns": null, + "yColumns": null + } + }, + "output_type": "display_data" + } + ], + "source": [ + "ccd_case_link_publish_payload_result = spark.read.format(\"delta\").load(f\"abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/APPEALS/CASE_LINKING/publish_payload_audit\")\n", + "ccd_case_link_publish_payload_result.createOrReplaceTempView(\"ccd_case_link_publish_payload_result\")\n", + "\n", + "ccd_case_link_call_result = 
spark.read.format(\"delta\").load(f\"abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/AUDIT/APPEALS/CASE_LINKING/ack_audit\")\n", + "ccd_case_link_call_result.createOrReplaceTempView(\"ccd_case_link_call_result\")\n", + "\n", + "spark.sql(\"\"\"\n", + " SELECT \n", + " COALESCE(t2.RunId, t1.RunId) AS RunID,\n", + " COALESCE(t2.CCDCaseReferenceNumber, t1.CCDCaseReferenceNumber) as `CCD Case Reference Number`,\n", + " t1.Status as `Case Link Publish Status`,\n", + " t2.Status as `Case Link CCD Call Status`,\n", + " t1.CaseLinkCount as `Case Link Publish Case Link Count`,\n", + " t1.StartDateTime as `Case Link Publish Publishing Date Time`,\n", + " t1.Error as `Case Link Publish Error`,\n", + " t2.CaseLinkCount as `Case Link CCD Call Function App Case Link Count`,\n", + " t2.StartDateTime as `Case Link CCD Call Function App Start Date Time`,\n", + " t2.EndDateTime as `Case Link CCD Call Function App End Date Time`,\n", + " t2.Error as `Case Link CCD Call Function App Error`\n", + " FROM ccd_case_link_publish_payload_result t1\n", + " FULL OUTER JOIN ccd_case_link_call_result t2 ON t1.CCDCaseReferenceNumber = t2.CCDCaseReferenceNumber AND t1.RunID = t2.RunID\n", + " ORDER BY `Case Link Publish Publishing Date Time` DESC\n", + "\"\"\").display()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "implicitDf": true, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "f6a073c1-bb35-499c-9497-24fe9d9be352", + "showTitle": true, + "tableResultSettingsMap": { + "0": { + "dataGridStateBlob": "{\"version\":1,\"tableState\":{\"columnPinning\":{\"left\":[\"#row_number#\"],\"right\":[]},\"columnSizing\":{\"CaseNo\":183},\"columnVisibility\":{}},\"settings\":{\"columns\":{}},\"syncTimestamp\":1773652553328}", + "filterBlob": "{\"version\":1,\"filterGroups\":[],\"syncTimestamp\":1768996115008}", + 
"queryPlanFiltersBlob": "[]", + "tableResultIndex": 0 + } + }, + "title": "From Segmentation to CCD Call Result" + } + }, + "outputs": [], + "source": [ + "case_link_groups = spark.read.table(f\"hive_metastore.ariadm_active_appeals.silver_case_link_groups\")\n", + "case_link_groups.createOrReplaceTempView(\"case_link_groups\")\n", + "case_link_mappings = spark.read.table(f\"hive_metastore.ariadm_active_appeals.aria_ccd_case_mappings\")\n", + "case_link_mappings.createOrReplaceTempView(\"case_link_mappings\")\n", + "case_link_combinations = spark.read.table(f\"hive_metastore.ariadm_active_appeals.case_link_combinations\")\n", + "case_link_combinations.createOrReplaceTempView(\"case_link_combinations\")\n", + "case_link_payloads = spark.read.table(f\"hive_metastore.ariadm_active_appeals.case_link_payloads\")\n", + "case_link_payloads.createOrReplaceTempView(\"case_link_payloads\")\n", + "\n", + "ccd_case_link_publish_payload_result = spark.read.format(\"delta\").load(f\"abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/APPEALS/CASE_LINKING/publish_payload_audit\")\n", + "ccd_case_link_publish_payload_result.createOrReplaceTempView(\"ccd_case_link_publish_payload_result\")\n", + "\n", + "ccd_case_link_call_result = spark.read.format(\"delta\").load(f\"abfss://{silver_curated_container}@{curated_storage_account}.dfs.core.windows.net/ARIADM/ACTIVE/CCD/AUDIT/APPEALS/CASE_LINKING/ack_audit\")\n", + "ccd_case_link_call_result.createOrReplaceTempView(\"ccd_case_link_call_result\")\n", + "\n", + "spark.sql(\"\"\"\n", + " SELECT \n", + " g.CaseNo AS CaseNo,\n", + " p.CCDCaseReferenceNumberFrom AS `CCD Case Reference Number`,\n", + " CASE WHEN g.CaseNo IS NOT NULL AND g.LinkNo IS NOT NULL\n", + " THEN 'YES'\n", + " ELSE 'NO'\n", + " END AS `ARIA Case Link Groups Exist`,\n", + " CASE WHEN m.CaseNo IS NOT NULL AND m.CCDCaseID IS NOT NULL AND m.CCDCaseID = t2.CCDCaseReferenceNumber\n", + " THEN 'SUCCESS'\n", + " ELSE 'ERROR'\n", + 
" END AS `ARIA-CCD Case Mappings Status`,\n", + " CASE WHEN c.CCDCaseReferenceNumberFrom IS NOT NULL AND c.CCDCaseReferenceNumberFrom = t2.CCDCaseReferenceNumber\n", + " THEN 'SUCCESS'\n", + " ELSE 'ERROR'\n", + " END AS `Case Link Combinations Status`,\n", + " CASE WHEN p.CCDCaseReferenceNumberFrom IS NOT NULL AND p.CCDCaseReferenceNumberFrom != '' THEN 'SUCCESS' ELSE 'ERROR' END AS `Case Link Payloads Status`,\n", + " t1.Status AS `Case Link Publish Status`,\n", + " t2.Status AS `Case Link CCD Call Status`,\n", + " CASE WHEN g.LinkNo IS NOT NULL\n", + " THEN COUNT(*) OVER (PARTITION BY g.LinkNo) - 1\n", + " ELSE 0\n", + " END AS `ARIA Case Link Groups Case Link Count`,\n", + " t1.CaseLinkCount as `Case Link Publish Case Link Count`,\n", + " t1.StartDateTime as `Case Link Publish Publishing Date Time`,\n", + " t1.Error as `Case Link Publish Error`,\n", + " t2.CaseLinkCount as `Case Link CCD Call Function App Case Link Count`,\n", + " t2.StartDateTime as `Case Link CCD Call Function App Start Date Time`,\n", + " t2.EndDateTime as `Case Link CCD Call Function App End Date Time`,\n", + " t2.Error as `Case Link CCD Call Function App Error`\n", + " FROM case_link_groups g\n", + " FULL OUTER JOIN case_link_mappings m ON g.CaseNo = m.CaseNo\n", + " FULL OUTER JOIN case_link_combinations c ON m.CaseNo = c.CaseNoFrom\n", + " FULL OUTER JOIN case_link_payloads p ON c.CCDCaseReferenceNumberFrom = p.CCDCaseReferenceNumberFrom\n", + " FULL OUTER JOIN ccd_case_link_publish_payload_result t1 ON p.CCDCaseReferenceNumberFrom = t1.CCDCaseReferenceNumber\n", + " FULL OUTER JOIN ccd_case_link_call_result t2 ON t1.CCDCaseReferenceNumber = t2.CCDCaseReferenceNumber AND t1.RunID = t2.RunID\n", + " ORDER BY `Case Link Publish Publishing Date Time` DESC\n", + "\"\"\").display()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + 
"nuid": "0614573a-f693-44fb-bbe2-6abcd7883060", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "dbutils.notebook.exit(\"Notebook completed successfully\")" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [ + { + "elements": [ + { + "dashboardResultIndex": 0, + "elementNUID": "f6a073c1-bb35-499c-9497-24fe9d9be352", + "elementType": "command", + "guid": "02ef1a54-7596-46ed-b272-8aafd38d23ad", + "options": null, + "position": { + "height": 6, + "width": 12, + "x": 0, + "y": 6, + "z": null + }, + "resultIndex": null + }, + { + "dashboardResultIndex": 0, + "elementNUID": "86c6ca78-8d30-4ebc-a762-4adb3a11fdc6", + "elementType": "command", + "guid": "06a416e4-707b-4bc4-b8d4-39453a39e79c", + "options": null, + "position": { + "height": 14, + "width": 24, + "x": 0, + "y": 12, + "z": null + }, + "resultIndex": null + }, + { + "dashboardResultIndex": null, + "elementNUID": "3f4bc456-1bf9-4a63-a08d-ed455cb3cac9", + "elementType": "command", + "guid": "77b9c1fb-dcdd-4abd-af4a-1ccded852386", + "options": null, + "position": { + "height": 6, + "width": 16, + "x": 0, + "y": 0, + "z": null + }, + "resultIndex": null + } + ], + "globalVars": {}, + "guid": "", + "layoutOption": { + "grid": true, + "stack": true + }, + "nuid": "460527ac-f6c6-44a4-bca1-ba672fcab2ae", + "origId": 5342013896403477, + "title": "Case Execution Tracking Dashboard", + "version": "DashboardViewV1", + "width": 2048 + } + ], + "environmentMetadata": { + "base_environment": "", + "environment_version": "4" + }, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "mostRecentlyExecutedCommandWithImplicitDF": { + "commandId": 8664186941464404, + "dataframes": [ + "_sqldf" + ] + }, + "pythonIndentUnit": 4 + }, + "notebookName": "Case_Linking_Tracking_Dashboard", + "widgets": {} + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + 
"nbformat_minor": 0 +} diff --git a/azure-pipelines_sbox_build.yml b/azure-pipelines_sbox_build.yml index e9c0def7..ad117bd0 100644 --- a/azure-pipelines_sbox_build.yml +++ b/azure-pipelines_sbox_build.yml @@ -20,7 +20,7 @@ parameters: service_connection: DTS-DATAINGEST-SBOX pool_name: 'hmcts-sds-ptl' segments: ['bails', 'apluta', 'td', 'joh', 'aplfpa', 'aplfta', 'sbails'] - states: ['active_ccd', 'active_caselink_ccd'] # Active Function State + states: ['active_ccd', 'active_caselink_ccd', 'active_cdam'] # Active Function State landing_zones: - landing_zone: '00' databricks_host: 'https://adb-3635282203417052.12.azuredatabricks.net' @@ -47,7 +47,7 @@ parameters: dependsOn: sbox_aria pool_name: 'hmcts-sds-ptl' segments: ['bails', 'apluta', 'td', 'joh', 'aplfpa', 'aplfta', 'sbails'] - states: ['active_ccd', 'active_caselink_ccd'] # Active Function State + states: ['active_ccd', 'active_caselink_ccd', 'active_cdam'] # Active Function State landing_zones: - landing_zone: '00' databricks_host: 'https://adb-4305432441461530.10.azuredatabricks.net' @@ -285,6 +285,7 @@ stages: dependsOn: - Deploy_Functions_${{ deployment.environment }}_LZ${{ lz.landing_zone }}_active_ccd - Deploy_Functions_${{ deployment.environment }}_LZ${{ lz.landing_zone }}_active_caselink_ccd + - Deploy_Functions_${{ deployment.environment }}_LZ${{ lz.landing_zone }}_active_cdam - Deploy_Functions_${{ deployment.environment }}_LZ${{ lz.landing_zone }}_bails - Deploy_Functions_${{ deployment.environment }}_LZ${{ lz.landing_zone }}_apluta - Deploy_Functions_${{ deployment.environment }}_LZ${{ lz.landing_zone }}_td diff --git a/tests/active/cdamFunctionApp/__init__.py b/tests/active/cdamFunctionApp/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/active/cdamFunctionApp/cdam_functions_test.py b/tests/active/cdamFunctionApp/cdam_functions_test.py new file mode 100644 index 00000000..fcdd814f --- /dev/null +++ b/tests/active/cdamFunctionApp/cdam_functions_test.py @@ -0,0 +1,370 
@@ +import pytest +from unittest.mock import patch, MagicMock + +# Patch Azure SDK clients before the module-level IDAMTokenManager(env="sbox") +# instantiation runs, so that importing cdamFunctions does not hit Key Vault. +with patch("azure.identity.DefaultAzureCredential"), \ + patch("azure.keyvault.secrets.SecretClient") as _mock_kv_cls: + _mock_kv_cls.return_value.get_secret.return_value.value = "fake-secret" + from AzureFunctions.ACTIVE.active_cdam.cdamFunctions import ( + upload_document, + process_event, + ) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def mock_response(status_code, json_data=None, text=""): + mock = MagicMock() + mock.status_code = status_code + mock.text = text + mock.json.return_value = json_data or {} + return mock + + +MODULE = "AzureFunctions.ACTIVE.active_cdam.cdamFunctions" +SLEEP_PATH = "AzureFunctions.ACTIVE.active_cdam.retry_decorator.time.sleep" + +UPLOAD_COMMON = dict( + cdam_base_url="http://ccd-case-document-am-api-aat.service.core-compute-aat.internal", + jid="IA", + ctid="Asylum", + cid="1234567890123456", + file_name="test_document.html", + doc_binary=b"document", + content_type="text/html", + idam_token="idam-token", + s2s_token="s2s-token", +) + +PROCESS_DEFAULTS = dict( + env="sbox", + caseNo="1234567890123456", + runId="run-001", + file_name="test.html", + file_url="https://mystorageaccount.blob.core.windows.net/mycontainer/path/to/test.html", + file_content_type="text/html", +) + + +# --------------------------------------------------------------------------- +# upload_document +# --------------------------------------------------------------------------- + +@patch("requests.post") +def test_upload_document_success_returns_response(mock_post): + mock_post.return_value = mock_response(201, {"documents": [{"id": "doc-123"}]}) + + response = upload_document(**UPLOAD_COMMON) + + assert 
response.status_code == 201 + + +@patch("requests.post") +def test_upload_document_posts_to_correct_url(mock_post): + mock_post.return_value = mock_response(201) + + upload_document(**UPLOAD_COMMON) + + expected_url = f"{UPLOAD_COMMON['cdam_base_url']}/cases/documents" + mock_post.assert_called_once() + assert mock_post.call_args.args[0] == expected_url + + +@patch("requests.post") +def test_upload_document_sends_correct_auth_headers(mock_post): + mock_post.return_value = mock_response(201) + + upload_document(**UPLOAD_COMMON) + + headers = mock_post.call_args.kwargs["headers"] + assert headers["Authorization"] == f"Bearer {UPLOAD_COMMON['idam_token']}" + assert headers["ServiceAuthorization"] == UPLOAD_COMMON["s2s_token"] + + +@patch("requests.post") +def test_upload_document_sends_correct_classification_body(mock_post): + mock_post.return_value = mock_response(201) + + upload_document(**UPLOAD_COMMON) + + body = mock_post.call_args.kwargs["data"] + assert body["classification"] == "PUBLIC" + assert body["caseTypeId"] == UPLOAD_COMMON["ctid"] + assert body["jurisdictionId"] == UPLOAD_COMMON["jid"] + + +@patch("requests.post") +def test_upload_document_includes_file_in_request(mock_post): + mock_post.return_value = mock_response(201) + + upload_document(**UPLOAD_COMMON) + + files = mock_post.call_args.kwargs["files"] + assert len(files) == 1 + field_name, (name, binary, content_type) = files[0] + assert field_name == "files" + assert name == UPLOAD_COMMON["file_name"] + assert binary == UPLOAD_COMMON["doc_binary"] + assert content_type == UPLOAD_COMMON["content_type"] + + +@patch("requests.post") +def test_upload_document_uses_provided_content_type(mock_post): + mock_post.return_value = mock_response(201) + + upload_document(**{**UPLOAD_COMMON, "content_type": "application/pdf"}) + + files = mock_post.call_args.kwargs["files"] + _field, (_name, _binary, ct) = files[0] + assert ct == "application/pdf" + + +@patch("requests.post", side_effect=Exception("Connection 
refused")) +def test_upload_document_returns_none_on_network_error(mock_post): + response = upload_document(**UPLOAD_COMMON) + + assert response is None + + +@patch("requests.post") +def test_upload_document_returns_non_201_response(mock_post): + mock_post.return_value = mock_response(400, text="Bad Request") + + response = upload_document(**UPLOAD_COMMON) + + assert response.status_code == 400 + + +# --------------------------------------------------------------------------- +# process_event fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture(autouse=True) +def no_retry_sleep(): + """Suppress retry backoff sleeps in all process_event tests.""" + with patch(SLEEP_PATH): + yield + + +@pytest.fixture +def mock_token_managers(): + """Patch the module-level idam_token_mgr and S2S_Manager.""" + with patch(f"{MODULE}.idam_token_mgr") as mock_idam, \ + patch(f"{MODULE}.S2S_Manager") as mock_s2s_cls: + + mock_idam.get_token.return_value = ("mock-idam-token", "uid-001") + + mock_s2s_inst = MagicMock() + mock_s2s_inst.get_token.return_value = "mock-s2s-token" + mock_s2s_cls.return_value = mock_s2s_inst + + yield mock_idam, mock_s2s_cls + + +@pytest.fixture +def mock_blob_client(): + """Patch BlobServiceClient to return fake document binary.""" + with patch(f"{MODULE}.BlobServiceClient") as mock_blob_cls: + mock_blob_instance = MagicMock() + mock_blob_cls.return_value = mock_blob_instance + mock_blob_instance.get_blob_client.return_value.readall.return_value = b"doc" + yield mock_blob_cls + + +# --------------------------------------------------------------------------- +# process_event β€” token acquisition +# --------------------------------------------------------------------------- + +def test_process_event_idam_token_failure_returns_error(): + with patch(f"{MODULE}.idam_token_mgr") as mock_idam: + mock_idam.get_token.side_effect = Exception("IDAM unreachable") + + result = process_event(**PROCESS_DEFAULTS) + + assert 
result["Status"] == "ERROR" + assert "IDAM" in result["Error"] or "s2s" in result["Error"] + assert result["CaseNo"] == PROCESS_DEFAULTS["caseNo"] + assert result["RunID"] == PROCESS_DEFAULTS["runId"] + + +def test_process_event_s2s_token_failure_returns_error(): + with patch(f"{MODULE}.idam_token_mgr") as mock_idam, \ + patch(f"{MODULE}.S2S_Manager") as mock_s2s_cls: + + mock_idam.get_token.return_value = ("tok", "uid") + mock_s2s_cls.side_effect = Exception("S2S unreachable") + + result = process_event(**PROCESS_DEFAULTS) + + assert result["Status"] == "ERROR" + assert result["CaseNo"] == PROCESS_DEFAULTS["caseNo"] + + +# --------------------------------------------------------------------------- +# process_event β€” invalid environment +# --------------------------------------------------------------------------- + +def test_process_event_invalid_env_raises_value_error(mock_token_managers, mock_blob_client): + with pytest.raises(ValueError, match="Invalid environment"): + process_event( + env="invalid_env", + caseNo="1234567890123456", + runId="run-001", + file_name="doc.html", + file_url="https://storage.blob.core.windows.net/container/blob", + file_content_type="text/html", + ) + + +# --------------------------------------------------------------------------- +# process_event β€” blob read failure +# --------------------------------------------------------------------------- + +@pytest.mark.usefixtures("mock_token_managers") +def test_process_event_blob_read_failure_returns_error(): + with patch(f"{MODULE}.BlobServiceClient") as mock_blob_cls: + mock_blob_cls.side_effect = Exception("Blob storage unavailable") + + result = process_event(**PROCESS_DEFAULTS) + + assert result["Status"] == "ERROR" + assert "blob" in result["Error"].lower() or "Failed" in result["Error"] + assert result["CaseNo"] == PROCESS_DEFAULTS["caseNo"] + + +# --------------------------------------------------------------------------- +# process_event β€” upload failure +# 
--------------------------------------------------------------------------- + +@pytest.mark.usefixtures("mock_token_managers") +@patch(f"{MODULE}.upload_document") +def test_process_event_upload_returns_none_returns_error(mock_upload, mock_blob_client): + mock_upload.return_value = None + + result = process_event(**PROCESS_DEFAULTS) + + assert result["Status"] == "ERROR" + assert "No response" in result["Error"] + + +@pytest.mark.usefixtures("mock_token_managers") +@patch(f"{MODULE}.upload_document") +def test_process_event_upload_non_2xx_returns_error(mock_upload, mock_blob_client): + mock_upload.return_value = mock_response(500, text="Internal Server Error") + + result = process_event(**PROCESS_DEFAULTS) + + assert result["Status"] == "ERROR" + assert "500" in result["Error"] + + +@pytest.mark.usefixtures("mock_token_managers") +@patch(f"{MODULE}.upload_document") +def test_process_event_upload_400_returns_error(mock_upload, mock_blob_client): + mock_upload.return_value = mock_response(400, text="Bad Request") + + result = process_event(**PROCESS_DEFAULTS) + + assert result["Status"] == "ERROR" + assert "400" in result["Error"] + + +# --------------------------------------------------------------------------- +# process_event β€” success +# --------------------------------------------------------------------------- + +@pytest.mark.usefixtures("mock_token_managers") +@patch(f"{MODULE}.upload_document") +def test_process_event_success_with_201(mock_upload, mock_blob_client): + doc_href = "http://cdam.example.com/documents/doc-abc" + mock_upload.return_value = mock_response( + 201, + json_data={"links": {"self": {"href": doc_href}}}, + ) + + result = process_event(**PROCESS_DEFAULTS) + + assert result["Status"] == "SUCCESS" + assert result["Error"] is None + assert result["CaseNo"] == PROCESS_DEFAULTS["caseNo"] + assert result["RunID"] == PROCESS_DEFAULTS["runId"] + + +@pytest.mark.usefixtures("mock_token_managers") +@patch(f"{MODULE}.upload_document") +def 
test_process_event_success_with_200(mock_upload, mock_blob_client): + doc_href = "http://cdam.example.com/documents/doc-abc" + mock_upload.return_value = mock_response( + 200, + json_data={"links": {"self": {"href": doc_href}}}, + ) + + result = process_event(**PROCESS_DEFAULTS) + + assert result["Status"] == "SUCCESS" + + +@pytest.mark.usefixtures("mock_token_managers") +@patch(f"{MODULE}.upload_document") +def test_process_event_result_contains_cdam_response(mock_upload, mock_blob_client): + cdam_resp = {"links": {"self": {"href": "http://example.com/doc"}}} + mock_upload.return_value = mock_response(201, json_data=cdam_resp) + + result = process_event(**PROCESS_DEFAULTS) + + assert result["CDAMResponse"] == cdam_resp + + +@pytest.mark.usefixtures("mock_token_managers") +@patch(f"{MODULE}.upload_document") +def test_process_event_passes_idam_token_string_not_tuple(mock_upload, mock_blob_client): + """idam_token passed to upload_document must be a plain string, not a tuple.""" + mock_upload.return_value = mock_response( + 201, + json_data={"links": {"self": {"href": "http://example.com/doc"}}}, + ) + + process_event(**PROCESS_DEFAULTS) + + # upload_document is called positionally; idam_token is at index 7 after the + # new content_type arg was inserted at index 6. 
+ call_args = mock_upload.call_args.args + idam_token_arg = call_args[7] + assert isinstance(idam_token_arg, str), ( + f"Expected idam_token to be a str, got {type(idam_token_arg)}: {idam_token_arg!r}" + ) + + +@pytest.mark.usefixtures("mock_token_managers") +@patch(f"{MODULE}.upload_document") +def test_process_event_forwards_file_content_type(mock_upload, mock_blob_client): + """file_content_type from the event payload must be forwarded to upload_document.""" + mock_upload.return_value = mock_response( + 201, + json_data={"links": {"self": {"href": "http://example.com/doc"}}}, + ) + + process_event(**{**PROCESS_DEFAULTS, "file_content_type": "application/pdf"}) + + call_args = mock_upload.call_args.args + # content_type is the 7th positional arg (index 6) + assert call_args[6] == "application/pdf" + + +@pytest.mark.usefixtures("mock_token_managers") +@patch(f"{MODULE}.upload_document") +def test_process_event_result_has_start_and_end_datetime(mock_upload, mock_blob_client): + mock_upload.return_value = mock_response( + 201, + json_data={"links": {"self": {"href": "http://example.com/doc"}}}, + ) + + result = process_event(**PROCESS_DEFAULTS) + + assert "StartDateTime" in result + assert "EndDateTime" in result + assert result["StartDateTime"] is not None + assert result["EndDateTime"] is not None diff --git a/tests/active/cdamFunctionApp/cdam_tokenManager_test.py b/tests/active/cdamFunctionApp/cdam_tokenManager_test.py new file mode 100644 index 00000000..4a6e18bf --- /dev/null +++ b/tests/active/cdamFunctionApp/cdam_tokenManager_test.py @@ -0,0 +1,519 @@ +import pytest +from datetime import datetime, timezone, timedelta +from unittest.mock import patch, MagicMock + +from AzureFunctions.ACTIVE.active_cdam.cdam_tokenManager import IDAMTokenManager, S2S_Manager + +MODULE = "AzureFunctions.ACTIVE.active_cdam.cdam_tokenManager" + + +# --------------------------------------------------------------------------- +# Helpers +# 
--------------------------------------------------------------------------- + +def make_kv_client(overrides=None): + """Return a mock SecretClient with configurable secret values.""" + secrets = { + "idam-client-id": "fake-client-id", + "idam-secret": "fake-client-secret", + "system-username": "fake-username", + "system-password": "fake-password", + "s2s-secret": "JBSWY3DPEHPK3PXP", # valid base32 for pyotp + } + if overrides: + secrets.update(overrides) + + def get_secret(name): + secret = MagicMock() + secret.value = secrets[name] + return secret + + kv = MagicMock() + kv.get_secret.side_effect = get_secret + return kv + + +def make_idam_manager(env="sbox", skew=28792, **kv_overrides): + """Instantiate IDAMTokenManager with Azure dependencies mocked.""" + with patch(f"{MODULE}.DefaultAzureCredential"), \ + patch(f"{MODULE}.SecretClient") as mock_kv_cls: + mock_kv_cls.return_value = make_kv_client(kv_overrides) + return IDAMTokenManager(env=env, skew=skew) + + +def make_s2s_manager(env="sbox", skew=21, **kv_overrides): + """Instantiate S2S_Manager with Azure dependencies mocked.""" + with patch(f"{MODULE}.DefaultAzureCredential"), \ + patch(f"{MODULE}.SecretClient") as mock_kv_cls: + mock_kv_cls.return_value = make_kv_client(kv_overrides) + return S2S_Manager(env=env, skew=skew) + + +def mock_token_response(status_code=200, access_token="tok-abc", expires_in=3600): + resp = MagicMock() + resp.status_code = status_code + resp.json.return_value = {"access_token": access_token, "expires_in": expires_in} + resp.text = "OK" + return resp + + +def mock_uid_response(uid="user-123"): + resp = MagicMock() + resp.json.return_value = {"uid": uid} + return resp + + +# --------------------------------------------------------------------------- +# IDAMTokenManager.__init__ +# --------------------------------------------------------------------------- + +def test_idam_init_sbox_sets_correct_idam_host(): + mgr = make_idam_manager(env="sbox") + assert mgr.idam_host == 
"https://idam-web-public.aat.platform.hmcts.net" + + +def test_idam_init_stg_sets_correct_idam_host(): + mgr = make_idam_manager(env="stg") + assert mgr.idam_host == "https://idam-api.aat.platform.hmcts.net" + + +def test_idam_init_key_vault_url_contains_ia_aat(): + with patch(f"{MODULE}.DefaultAzureCredential"), \ + patch(f"{MODULE}.SecretClient") as mock_kv_cls: + mock_kv_cls.return_value = make_kv_client() + IDAMTokenManager(env="sbox") + assert "ia-aat" in mock_kv_cls.call_args.kwargs["vault_url"] + + +def test_idam_init_fetches_all_required_secrets(): + with patch(f"{MODULE}.DefaultAzureCredential"), \ + patch(f"{MODULE}.SecretClient") as mock_kv_cls: + kv = make_kv_client() + mock_kv_cls.return_value = kv + IDAMTokenManager(env="sbox") + + fetched = [c.args[0] for c in kv.get_secret.call_args_list] + assert "idam-client-id" in fetched + assert "idam-secret" in fetched + assert "system-username" in fetched + assert "system-password" in fetched + + +def test_idam_init_stores_secrets_on_instance(): + mgr = make_idam_manager( + **{ + "idam-client-id": "my-client", + "idam-secret": "my-secret", + "system-username": "user@test.com", + "system-password": "p4ssw0rd", + } + ) + assert mgr.client_id == "my-client" + assert mgr.client_secret == "my-secret" + assert mgr.username == "user@test.com" + assert mgr.password == "p4ssw0rd" + + +def test_idam_init_token_state_is_none(): + mgr = make_idam_manager() + assert mgr._token is None + assert mgr._expiration_time is None + assert mgr._uid is None + + +def test_idam_init_token_url_and_uid_url_are_set(): + mgr = make_idam_manager(env="sbox") + assert mgr.token_url.endswith("/o/token") + assert mgr.uid_url.endswith("/o/userinfo") + assert mgr.idam_host in mgr.token_url + + +# --------------------------------------------------------------------------- +# IDAMTokenManager._needs_refresh +# --------------------------------------------------------------------------- + +def test_needs_refresh_when_token_is_none(): + mgr = 
make_idam_manager() + assert mgr._needs_refresh() is True + + +def test_needs_refresh_when_expiration_is_none(): + mgr = make_idam_manager() + mgr._token = "tok" + assert mgr._needs_refresh() is True + + +def test_needs_refresh_when_token_is_past_expiry(): + mgr = make_idam_manager(skew=0) + mgr._token = "tok" + mgr._expiration_time = datetime.now(timezone.utc) - timedelta(seconds=1) + assert mgr._needs_refresh() is True + + +def test_needs_refresh_when_within_skew_window(): + mgr = make_idam_manager(skew=3600) + mgr._token = "tok" + # Expires in 30 minutes, but skew is 1 hour β†’ still needs refresh + mgr._expiration_time = datetime.now(timezone.utc) + timedelta(minutes=30) + assert mgr._needs_refresh() is True + + +def test_no_refresh_needed_when_token_is_valid(): + mgr = make_idam_manager(skew=10) + mgr._token = "tok" + mgr._expiration_time = datetime.now(timezone.utc) + timedelta(hours=8) + assert mgr._needs_refresh() is False + + +# --------------------------------------------------------------------------- +# IDAMTokenManager._fetch_uid +# --------------------------------------------------------------------------- + +def test_fetch_uid_success(): + mgr = make_idam_manager() + with patch("requests.get", return_value=mock_uid_response("user-abc")): + uid = mgr._fetch_uid("some-idam-token") + assert uid == "user-abc" + + +def test_fetch_uid_uses_bearer_token_header(): + mgr = make_idam_manager() + with patch("requests.get", return_value=mock_uid_response()) as mock_get: + mgr._fetch_uid("my-token") + _, kwargs = mock_get.call_args + assert kwargs["headers"]["Authorization"] == "Bearer my-token" + + +def test_fetch_uid_raises_on_invalid_json(): + mgr = make_idam_manager() + bad_resp = MagicMock() + bad_resp.json.side_effect = ValueError("No JSON object") + bad_resp.text = "Not JSON" + with patch("requests.get", return_value=bad_resp): + with pytest.raises(RuntimeError, match="valid JSON"): + mgr._fetch_uid("some-token") + + +def 
test_fetch_uid_raises_when_uid_missing(): + mgr = make_idam_manager() + resp = MagicMock() + resp.json.return_value = {"sub": "something-else"} + with patch("requests.get", return_value=resp): + with pytest.raises(RuntimeError, match="UID missing"): + mgr._fetch_uid("some-token") + + +# --------------------------------------------------------------------------- +# IDAMTokenManager._fetch_token +# --------------------------------------------------------------------------- + +def test_fetch_token_success_returns_token_expiry_and_uid(): + mgr = make_idam_manager() + with patch("requests.post", return_value=mock_token_response(access_token="new-tok", expires_in=3600)), \ + patch("requests.get", return_value=mock_uid_response("uid-999")): + token, expiry, uid = mgr._fetch_token() + + assert token == "new-tok" + assert uid == "uid-999" + assert expiry > datetime.now(timezone.utc) + + +def test_fetch_token_posts_to_token_url(): + mgr = make_idam_manager() + with patch("requests.post", return_value=mock_token_response()) as mock_post, \ + patch("requests.get", return_value=mock_uid_response()): + mgr._fetch_token() + + assert mock_post.call_args.args[0] == mgr.token_url + + +def test_fetch_token_posts_password_grant_data(): + mgr = make_idam_manager() + with patch("requests.post", return_value=mock_token_response()) as mock_post, \ + patch("requests.get", return_value=mock_uid_response()): + mgr._fetch_token() + + data = mock_post.call_args.kwargs["data"] + assert data["grant_type"] == "password" + assert data["client_id"] == mgr.client_id + assert data["client_secret"] == mgr.client_secret + assert data["username"] == mgr.username + assert data["password"] == mgr.password + + +def test_fetch_token_raises_on_non_200(): + mgr = make_idam_manager() + bad_resp = MagicMock() + bad_resp.status_code = 401 + bad_resp.text = "Unauthorized" + with patch("requests.post", return_value=bad_resp): + with pytest.raises(RuntimeError, match="Token request failed"): + mgr._fetch_token() + 
+ +def test_fetch_token_raises_when_access_token_missing(): + mgr = make_idam_manager() + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"expires_in": 3600} # no access_token + with patch("requests.post", return_value=resp): + with pytest.raises(RuntimeError, match="Invalid token response"): + mgr._fetch_token() + + +def test_fetch_token_raises_when_expires_in_missing(): + mgr = make_idam_manager() + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"access_token": "tok"} # no expires_in + with patch("requests.post", return_value=resp): + with pytest.raises(RuntimeError, match="Invalid token response"): + mgr._fetch_token() + + +# --------------------------------------------------------------------------- +# IDAMTokenManager.get_token +# --------------------------------------------------------------------------- + +def test_get_token_returns_cached_when_valid(): + mgr = make_idam_manager(skew=10) + mgr._token = "cached-tok" + mgr._uid = "cached-uid" + mgr._expiration_time = datetime.now(timezone.utc) + timedelta(hours=8) + + token, uid = mgr.get_token() + + assert token == "cached-tok" + assert uid == "cached-uid" + + +def test_get_token_refreshes_when_expired(): + mgr = make_idam_manager(skew=0) + mgr._token = "old-tok" + mgr._uid = "old-uid" + mgr._expiration_time = datetime.now(timezone.utc) - timedelta(seconds=1) + + with patch("requests.post", return_value=mock_token_response(access_token="fresh-tok")), \ + patch("requests.get", return_value=mock_uid_response("fresh-uid")): + token, uid = mgr.get_token() + + assert token == "fresh-tok" + assert uid == "fresh-uid" + + +def test_get_token_does_not_call_fetch_when_valid(): + mgr = make_idam_manager(skew=10) + mgr._token = "valid-tok" + mgr._uid = "valid-uid" + mgr._expiration_time = datetime.now(timezone.utc) + timedelta(hours=8) + + with patch.object(mgr, "_fetch_token") as mock_fetch: + mgr.get_token() + + mock_fetch.assert_not_called() + + +# 
--------------------------------------------------------------------------- +# IDAMTokenManager.invalidate +# --------------------------------------------------------------------------- + +def test_invalidate_clears_token_uid_and_expiry(): + mgr = make_idam_manager(skew=10) + mgr._token = "tok" + mgr._uid = "uid" + mgr._expiration_time = datetime.now(timezone.utc) + timedelta(hours=1) + + mgr.invalidate() + + assert mgr._token is None + assert mgr._uid is None + assert mgr._expiration_time is None + + +def test_invalidate_causes_next_get_token_to_refresh(): + mgr = make_idam_manager(skew=10) + mgr._token = "tok" + mgr._uid = "uid" + mgr._expiration_time = datetime.now(timezone.utc) + timedelta(hours=8) + + assert mgr._needs_refresh() is False + mgr.invalidate() + assert mgr._needs_refresh() is True + + +# --------------------------------------------------------------------------- +# S2S_Manager.__init__ +# --------------------------------------------------------------------------- + +def test_s2s_init_sbox_sets_correct_host(): + mgr = make_s2s_manager(env="sbox") + assert "rpe-service-auth-provider-aat" in mgr.s2s_host + + +def test_s2s_init_fetches_s2s_secret(): + with patch(f"{MODULE}.DefaultAzureCredential"), \ + patch(f"{MODULE}.SecretClient") as mock_kv_cls: + kv = make_kv_client() + mock_kv_cls.return_value = kv + S2S_Manager(env="sbox", skew=21) + + fetched = [c.args[0] for c in kv.get_secret.call_args_list] + assert "s2s-secret" in fetched + + +def test_s2s_init_stores_secret_on_instance(): + mgr = make_s2s_manager(**{"s2s-secret": "MY_SECRET_VALUE"}) + assert mgr._s2s_secret == "MY_SECRET_VALUE" + + +def test_s2s_init_token_state_is_none(): + mgr = make_s2s_manager() + assert mgr._s2s_token is None + assert mgr.expire_time is None + + +def test_s2s_init_sets_lease_url(): + mgr = make_s2s_manager(env="sbox") + assert mgr.url == f"{mgr.s2s_host}/lease" + + +def test_s2s_init_microservice_is_iac(): + mgr = make_s2s_manager() + assert mgr.s2s_microservice == 
"iac" + + +# --------------------------------------------------------------------------- +# S2S_Manager._fetch_s2s_token +# --------------------------------------------------------------------------- + +def test_fetch_s2s_token_success_returns_token(): + mgr = make_s2s_manager(skew=21) + resp = MagicMock() + resp.status_code = 200 + resp.text = " s2s-jwt-value " + with patch("requests.post", return_value=resp), \ + patch(f"{MODULE}.pyotp.TOTP") as mock_totp: + mock_totp.return_value.now.return_value = "123456" + token = mgr._fetch_s2s_token() + + assert token == "s2s-jwt-value" + assert mgr._s2s_token == "s2s-jwt-value" + + +def test_fetch_s2s_token_sets_expire_time(): + mgr = make_s2s_manager(skew=21) + resp = MagicMock() + resp.status_code = 200 + resp.text = "tok" + before = datetime.now(timezone.utc) + with patch("requests.post", return_value=resp), \ + patch(f"{MODULE}.pyotp.TOTP") as mock_totp: + mock_totp.return_value.now.return_value = "000000" + mgr._fetch_s2s_token() + + assert mgr.expire_time is not None + assert mgr.expire_time > before + + +def test_fetch_s2s_token_posts_correct_microservice_and_otp(): + mgr = make_s2s_manager() + resp = MagicMock() + resp.status_code = 200 + resp.text = "tok" + with patch("requests.post", return_value=resp) as mock_post, \ + patch(f"{MODULE}.pyotp.TOTP") as mock_totp: + mock_totp.return_value.now.return_value = "777888" + mgr._fetch_s2s_token() + + body = mock_post.call_args.kwargs["json"] + assert body["microservice"] == "iac" + assert body["oneTimePassword"] == "777888" + + +def test_fetch_s2s_token_uses_s2s_secret_for_otp(): + mgr = make_s2s_manager(**{"s2s-secret": "TESTSECRET123456"}) + mgr._s2s_secret = "TESTSECRET123456" + resp = MagicMock() + resp.status_code = 200 + resp.text = "tok" + with patch("requests.post", return_value=resp), \ + patch(f"{MODULE}.pyotp.TOTP") as mock_totp: + mock_totp.return_value.now.return_value = "111111" + mgr._fetch_s2s_token() + + 
mock_totp.assert_called_once_with("TESTSECRET123456") + + +def test_fetch_s2s_token_raises_eoferror_on_network_error(): + mgr = make_s2s_manager() + with patch("requests.post", side_effect=Exception("Connection timeout")), \ + patch(f"{MODULE}.pyotp.TOTP") as mock_totp: + mock_totp.return_value.now.return_value = "123456" + with pytest.raises(EOFError, match="Error reuesting service to service token"): + mgr._fetch_s2s_token() + + +def test_fetch_s2s_token_raises_runtime_error_on_non_200(): + mgr = make_s2s_manager() + resp = MagicMock() + resp.status_code = 403 + resp.text = "Forbidden" + with patch("requests.post", return_value=resp), \ + patch(f"{MODULE}.pyotp.TOTP") as mock_totp: + mock_totp.return_value.now.return_value = "123456" + with pytest.raises(RuntimeError, match="Error requesting service to service token"): + mgr._fetch_s2s_token() + + +# --------------------------------------------------------------------------- +# S2S_Manager.get_token +# --------------------------------------------------------------------------- + +def test_s2s_get_token_fetches_when_no_cached_token(): + mgr = make_s2s_manager(skew=21) + resp = MagicMock() + resp.status_code = 200 + resp.text = "brand-new-s2s" + with patch("requests.post", return_value=resp), \ + patch(f"{MODULE}.pyotp.TOTP") as mock_totp: + mock_totp.return_value.now.return_value = "000000" + token = mgr.get_token() + + assert token == "brand-new-s2s" + + +def test_s2s_get_token_returns_cached_when_not_expired(): + mgr = make_s2s_manager(skew=21) + mgr._s2s_token = "cached-s2s" + mgr.expire_time = datetime.now(timezone.utc) + timedelta(hours=1) + + token = mgr.get_token() + + assert token == "cached-s2s" + + +def test_s2s_get_token_refreshes_when_expired(): + mgr = make_s2s_manager(skew=21) + mgr._s2s_token = "old-s2s" + mgr.expire_time = datetime.now(timezone.utc) - timedelta(seconds=1) + + resp = MagicMock() + resp.status_code = 200 + resp.text = "refreshed-s2s" + with patch("requests.post", return_value=resp), 
\ + patch(f"{MODULE}.pyotp.TOTP") as mock_totp: + mock_totp.return_value.now.return_value = "999999" + token = mgr.get_token() + + assert token == "refreshed-s2s" + + +def test_s2s_get_token_does_not_call_fetch_when_valid(): + mgr = make_s2s_manager(skew=21) + mgr._s2s_token = "valid-s2s" + mgr.expire_time = datetime.now(timezone.utc) + timedelta(hours=1) + + with patch.object(mgr, "_fetch_s2s_token") as mock_fetch: + mgr.get_token() + + mock_fetch.assert_not_called() diff --git a/tests/active/cdamFunctionApp/retry_decorator_test.py b/tests/active/cdamFunctionApp/retry_decorator_test.py new file mode 100644 index 00000000..067f8845 --- /dev/null +++ b/tests/active/cdamFunctionApp/retry_decorator_test.py @@ -0,0 +1,198 @@ +import pytest +from unittest.mock import patch + +from AzureFunctions.ACTIVE.active_cdam.retry_decorator import retry_on_result + +SLEEP_PATH = "AzureFunctions.ACTIVE.active_cdam.retry_decorator.time.sleep" + +RETRYABLE = lambda r: isinstance(r, dict) and r.get("Status") == "ERROR" + + +def make_results_fn(*results): + """Return a function that yields successive return values.""" + values = list(results) + + def fn(*args, **kwargs): + fn.call_count += 1 + return values.pop(0) if values else None + + fn.call_count = 0 + fn.__name__ = "fn" + fn.__doc__ = None + return fn + + +# --------------------------------------------------------------------------- +# Basic behaviour +# --------------------------------------------------------------------------- + +def test_success_on_first_attempt_no_sleep(): + """Function returns a non-retryable result first try β€” no retries, no sleep.""" + fn = make_results_fn("ok") + decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE)(fn) + + with patch(SLEEP_PATH) as mock_sleep: + result = decorated() + + assert result == "ok" + assert fn.call_count == 1 + mock_sleep.assert_not_called() + + +def test_no_retry_when_retry_on_is_none(): + """With retry_on=None, an ERROR result is returned immediately without 
retrying.""" + fn = make_results_fn({"Status": "ERROR"}) + decorated = retry_on_result(max_retries=3, retry_on=None)(fn) + + with patch(SLEEP_PATH) as mock_sleep: + result = decorated() + + assert result == {"Status": "ERROR"} + assert fn.call_count == 1 + mock_sleep.assert_not_called() + + +def test_retries_on_error_result_then_succeeds(): + """Function returns ERROR twice then SUCCESS; called 3 times total.""" + fn = make_results_fn({"Status": "ERROR"}, {"Status": "ERROR"}, {"Status": "SUCCESS"}) + decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE, base_delay=1.0, jitter=False)(fn) + + with patch(SLEEP_PATH): + result = decorated() + + assert result == {"Status": "SUCCESS"} + assert fn.call_count == 3 + + +def test_returns_last_error_result_after_all_retries_exhausted(): + """After max_retries+1 attempts all returning ERROR, the last result is returned.""" + fn = make_results_fn(*[{"Status": "ERROR"} for _ in range(3)]) + decorated = retry_on_result(max_retries=2, retry_on=RETRYABLE, jitter=False)(fn) + + with patch(SLEEP_PATH): + result = decorated() + + assert result["Status"] == "ERROR" + assert fn.call_count == 3 + + +def test_total_call_count_equals_max_retries_plus_one(): + """With max_retries=N, function is called N+1 times total.""" + fn = make_results_fn(*[{"Status": "ERROR"} for _ in range(5)]) + decorated = retry_on_result(max_retries=4, retry_on=RETRYABLE, jitter=False)(fn) + + with patch(SLEEP_PATH): + decorated() + + assert fn.call_count == 5 + + +def test_zero_retries_returns_error_result_immediately(): + """max_retries=0 means no retries; ERROR result returned after one attempt.""" + fn = make_results_fn({"Status": "ERROR"}) + decorated = retry_on_result(max_retries=0, retry_on=RETRYABLE)(fn) + + with patch(SLEEP_PATH) as mock_sleep: + result = decorated() + + assert result["Status"] == "ERROR" + assert fn.call_count == 1 + mock_sleep.assert_not_called() + + +def test_exceptions_are_not_caught(): + """Exceptions raised inside the 
wrapped function propagate immediately.""" + def fn(): + raise ValueError("boom") + + decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE)(fn) + + with patch(SLEEP_PATH): + with pytest.raises(ValueError, match="boom"): + decorated() + + +# --------------------------------------------------------------------------- +# Sleep / delay behaviour +# --------------------------------------------------------------------------- + +def test_no_sleep_after_final_failed_attempt(): + """sleep() is not called after the last failed attempt.""" + fn = make_results_fn(*[{"Status": "ERROR"}] * 3) + decorated = retry_on_result(max_retries=2, retry_on=RETRYABLE, base_delay=1.0, jitter=False)(fn) + + with patch(SLEEP_PATH) as mock_sleep: + decorated() + + # 3 attempts β†’ sleep after attempt 0 and 1 only + assert mock_sleep.call_count == 2 + + +def test_exponential_backoff_delay_without_jitter(): + """Delay doubles each attempt: base, 2*base, 4*base...""" + fn = make_results_fn(*[{"Status": "ERROR"}] * 4) + decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE, base_delay=1.0, max_delay=999, jitter=False)(fn) + + with patch(SLEEP_PATH) as mock_sleep: + decorated() + + actual_delays = [c.args[0] for c in mock_sleep.call_args_list] + assert actual_delays == [1.0, 2.0, 4.0] + + +def test_delay_capped_at_max_delay(): + """Delay never exceeds max_delay.""" + fn = make_results_fn(*[{"Status": "ERROR"}] * 6) + decorated = retry_on_result(max_retries=5, retry_on=RETRYABLE, base_delay=10.0, max_delay=15.0, jitter=False)(fn) + + with patch(SLEEP_PATH) as mock_sleep: + decorated() + + for c in mock_sleep.call_args_list: + assert c.args[0] <= 15.0 + + +def test_jitter_delay_within_expected_range(): + """With jitter=True, sleep delay is between 50% and 100% of computed value.""" + fn = make_results_fn({"Status": "ERROR"}, {"Status": "SUCCESS"}) + decorated = retry_on_result(max_retries=1, retry_on=RETRYABLE, base_delay=4.0, max_delay=999, jitter=True)(fn) + + with patch(SLEEP_PATH) 
as mock_sleep: + decorated() + + actual_delay = mock_sleep.call_args_list[0].args[0] + # base_delay * 2^0 = 4.0; jitter range: [2.0, 4.0] + assert 2.0 <= actual_delay <= 4.0 + + +# --------------------------------------------------------------------------- +# Argument forwarding & metadata +# --------------------------------------------------------------------------- + +def test_passes_args_and_kwargs_to_wrapped_function(): + """Positional and keyword arguments are forwarded correctly.""" + received = {} + + def fn(*args, **kwargs): + received["args"] = args + received["kwargs"] = kwargs + return "result" + + decorated = retry_on_result()(fn) + + with patch(SLEEP_PATH): + result = decorated(1, 2, key="val") + + assert result == "result" + assert received["args"] == (1, 2) + assert received["kwargs"] == {"key": "val"} + + +def test_preserves_function_name_and_docstring(): + """@wraps correctly preserves __name__ and __doc__.""" + @retry_on_result() + def my_func(): + """My docstring.""" + + assert my_func.__name__ == "my_func" + assert my_func.__doc__ == "My docstring." 
diff --git a/tests/active/functionApp/ccdFunctions_test.py b/tests/active/functionApp/ccdFunctions_test.py index 59d09619..89e2552e 100644 --- a/tests/active/functionApp/ccdFunctions_test.py +++ b/tests/active/functionApp/ccdFunctions_test.py @@ -255,3 +255,141 @@ def test_start_failure_returns_error(self): assert "StartResponse" not in result mock_validate.assert_not_called() mock_submit.assert_not_called() + + def test_start_returns_none_returns_error(self): + with ( + patch(PATCH_IDAM), + patch(PATCH_S2S) as mock_s2s_cls, + patch(PATCH_START, return_value=None), + patch(PATCH_VALIDATE) as mock_validate, + patch(PATCH_SUBMIT) as mock_submit, + patch(PATCH_IDAM_MGR) as mock_idam_mgr, + ): + mock_idam_mgr.get_token.return_value = ("idam-token", "uid-123") + mock_s2s_inst = Mock() + mock_s2s_inst.get_token.return_value = "s2s-token" + mock_s2s_cls.return_value = mock_s2s_inst + + from AzureFunctions.ACTIVE.active_ccd.ccdFunctions import process_case + + result = process_case( + env="sbox", + caseNo="CASE-006", + payloadData={}, + runId="run-6", + state="appealSubmitted", + PR_REFERENCE="pr-123", + ) + + assert result["Status"] == "ERROR" + assert "No response from API" in result["Error"] + mock_validate.assert_not_called() + mock_submit.assert_not_called() + + +class TestProcessCaseTokenFailures: + """process_case returns an error result when token acquisition fails.""" + + def test_idam_token_failure_returns_error(self): + with ( + patch(PATCH_IDAM), + patch(PATCH_S2S), + patch(PATCH_IDAM_MGR) as mock_idam_mgr, + ): + mock_idam_mgr.get_token.side_effect = Exception("IDAM unreachable") + + from AzureFunctions.ACTIVE.active_ccd.ccdFunctions import process_case + + result = process_case( + env="sbox", + caseNo="CASE-007", + payloadData={}, + runId="run-7", + state="appealSubmitted", + PR_REFERENCE="pr-123", + ) + + assert result["Status"] == "ERROR" + assert "IDAM" in result["Error"] + + def test_s2s_token_failure_returns_error(self): + with ( + patch(PATCH_IDAM), + 
patch(PATCH_S2S) as mock_s2s_cls, + patch(PATCH_IDAM_MGR) as mock_idam_mgr, + ): + mock_idam_mgr.get_token.return_value = ("idam-token", "uid-123") + mock_s2s_cls.return_value.get_token.side_effect = Exception("S2S unreachable") + + from AzureFunctions.ACTIVE.active_ccd.ccdFunctions import process_case + + result = process_case( + env="sbox", + caseNo="CASE-008", + payloadData={}, + runId="run-8", + state="appealSubmitted", + PR_REFERENCE="pr-123", + ) + + assert result["Status"] == "ERROR" + assert "s2s" in result["Error"] + + +class TestProcessCaseInvalidEnv: + """process_case raises ValueError for unknown environments.""" + + def test_invalid_env_raises_value_error(self): + with ( + patch(PATCH_IDAM), + patch(PATCH_S2S) as mock_s2s_cls, + patch(PATCH_IDAM_MGR) as mock_idam_mgr, + ): + mock_idam_mgr.get_token.return_value = ("idam-token", "uid-123") + mock_s2s_inst = Mock() + mock_s2s_inst.get_token.return_value = "s2s-token" + mock_s2s_cls.return_value = mock_s2s_inst + + from AzureFunctions.ACTIVE.active_ccd.ccdFunctions import process_case + + with pytest.raises(ValueError, match="Invalid environment"): + process_case( + env="unknown", + caseNo="CASE-009", + payloadData={}, + runId="run-9", + state="appealSubmitted", + PR_REFERENCE="pr-123", + ) + + +class TestProcessCaseSubmitNone: + """process_case returns an error result when submit_case returns None.""" + + def test_submit_returns_none_returns_error(self): + with ( + patch(PATCH_IDAM), + patch(PATCH_S2S) as mock_s2s_cls, + patch(PATCH_START, return_value=START_RESPONSE), + patch(PATCH_VALIDATE, return_value=VALIDATE_RESPONSE), + patch(PATCH_SUBMIT, return_value=None), + patch(PATCH_IDAM_MGR) as mock_idam_mgr, + ): + mock_idam_mgr.get_token.return_value = ("idam-token", "uid-123") + mock_s2s_inst = Mock() + mock_s2s_inst.get_token.return_value = "s2s-token" + mock_s2s_cls.return_value = mock_s2s_inst + + from AzureFunctions.ACTIVE.active_ccd.ccdFunctions import process_case + + result = process_case( + 
env="sbox", + caseNo="CASE-010", + payloadData={"appealReferenceNumber": "HU/010/2024"}, + runId="run-10", + state="appealSubmitted", + PR_REFERENCE="pr-123", + ) + + assert result["Status"] == "ERROR" + assert "No response from API" in result["Error"]