Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,25 @@
from converter.cisu.resources_info.resources_info_cisu_constants import (
ResourcesInfoCISUConstants,
)
from converter.repositories.message_repository import get_last_rc_ri_by_case_id
from converter.cisu.resources_info.resources_info_cisu_helper import (
enrich_from_persisted,
)
from converter.repositories.message_repository import (
get_last_rc_ri_by_case_id,
get_rs_messages_by_case_id,
)
from converter.utils import get_field_value, set_value, delete_paths
import logging

logger = logging.getLogger(__name__)


class ConversionError(ValueError):
def __init__(self, message):
self.message = message
super().__init__(self.message)


class ResourceUpdateResult(TypedDict):
engaged_resources_updated: bool
modified_status_resources: List[Dict[str, Any]]
Expand All @@ -27,6 +39,11 @@ def get_rs_message_type(cls) -> str:
def get_cisu_message_type(cls) -> str:
return "resourcesInfoCisu"

@classmethod
def _get_latest_state(cls, states: list[dict]) -> dict:
"""Return the state with the most recent datetime from a list of states."""
return sorted(states, key=lambda x: x.get("datetime", ""))[-1]

@classmethod
def _build_rs_ri_from_cisu(cls, edxl_json: Dict[str, Any]) -> Dict[str, Any]:
"""Convert a RC-RI to a RS-RI, removing the position field from each resource."""
Expand Down Expand Up @@ -202,16 +219,41 @@ def from_cisu_to_rs(cls, edxl_json: Dict[str, Any]) -> List[Dict[str, Any]]:

@classmethod
def from_rs_to_cisu(cls, edxl_json: Dict[str, Any]) -> Dict[str, Any]:
"""
Mother function that takes a RS-RI message and returns a RC-RI message
This functions manages:
- taking the RS-RI as param
- fetches potential peristed RS-SR in the db
- merges different resources (following métier rules)
- makes the conversion from rs to cisu
- returns the converted message
"""

logger.info("Converting from RS to CISU format for Resources Info message.")
logger.debug(f"Message content: {edxl_json}")
output_json = cls.copy_rs_input_content(edxl_json)
output_use_case_json = cls.copy_rs_input_use_case_content(edxl_json)

case_id = get_field_value(output_use_case_json, "caseId")
_, persisted_rs_sr = get_rs_messages_by_case_id(case_id)
rs_sr_contents = [
pm.payload["content"][0]["jsonContent"]["embeddedJsonContent"]["message"][
"resourcesStatus"
]
for pm in persisted_rs_sr
]
enriched = enrich_from_persisted(output_use_case_json, rs_sr_contents)

return cls.convert_single_rs_ri(output_json, enriched)

@classmethod
def convert_single_rs_ri(
cls, output_json: Dict[str, Any], output_use_case_json: Dict[str, Any]
):
resources = get_field_value(
output_use_case_json, ResourcesInfoCISUConstants.RESOURCE_PATH
)

converted_resources = cls.convert_resources_to_cisu(resources)
converted_resources = cls._convert_resources_to_cisu(resources)

if len(converted_resources) < 1:
raise ValueError(
Expand All @@ -228,26 +270,24 @@ def from_rs_to_cisu(cls, edxl_json: Dict[str, Any]) -> Dict[str, Any]:
return cls.format_cisu_output_json(output_json, output_use_case_json)

@classmethod
def convert_resources_to_cisu(
def _convert_resources_to_cisu(
cls, resources: list[Dict[str, Any]]
) -> list[Dict[str, Any]]:
converted_resources = []

for index, resource in enumerate(resources):
logger.debug(f"Processing resource: {resource}")
rs_vehicle_type = get_field_value(
resource, ResourcesInfoCISUConstants.VEHICLE_TYPE_PATH
)

cisu_vehicle_type = cls.translate_to_cisu_vehicle_type(rs_vehicle_type)
logger.debug("Processing resource: {resource}")

if not cisu_vehicle_type: # if we couldn't map the vehicleType on a SIS known type, we continue to filter the whole resource out
vehicle_type = cls.translate_to_cisu_vehicle_type(
get_field_value(resource, ResourcesInfoCISUConstants.VEHICLE_TYPE_PATH)
)
if vehicle_type is None:
continue

set_value(
resource,
ResourcesInfoCISUConstants.VEHICLE_TYPE_PATH,
cisu_vehicle_type,
vehicle_type,
)

current_resource_path = (
Expand All @@ -261,33 +301,31 @@ def convert_resources_to_cisu(
current_state_path,
)
cls.keep_last_state(resource)

delete_paths(resource, [ResourcesInfoCISUConstants.PATIENT_ID_KEY])

converted_resources.append(resource)

return converted_resources

@classmethod
def translate_to_cisu_vehicle_type(cls, rs_vehicle_type: str) -> str | None:
"""Translate a RS vehicle type to its CISU equivalent, or None if not mappable."""
if rs_vehicle_type.startswith(ResourcesInfoCISUConstants.VEHICLE_TYPE_SIS):
return ResourcesInfoCISUConstants.VEHICLE_TYPE_SIS
elif rs_vehicle_type.startswith(ResourcesInfoCISUConstants.VEHICLE_TYPE_SMUR):
if rs_vehicle_type.startswith(ResourcesInfoCISUConstants.VEHICLE_TYPE_SMUR):
return ResourcesInfoCISUConstants.VEHICLE_TYPE_SMUR
else:
logger.info(
"Removing resource because vehicleType '%s' is not supported",
rs_vehicle_type,
)
return None
logger.info("vehicleType '%s' is not mappable to CISU", rs_vehicle_type)
return None

@classmethod
def keep_last_state(cls, resource: Dict[str, Any]) -> None:
states = get_field_value(resource, ResourcesInfoCISUConstants.STATE_PATH)
if not states or len(states) == 0:
raise ValueError(
raise ConversionError(
"No states found in resource, mandatory for CISU conversion."
)

latest_state = sorted(states, key=lambda x: x.get("datetime", ""))[-1]
set_value(resource, ResourcesInfoCISUConstants.STATE_PATH, latest_state)
set_value(
resource,
ResourcesInfoCISUConstants.STATE_PATH,
cls._get_latest_state(states),
)
Original file line number Diff line number Diff line change
@@ -1,38 +1,38 @@
def merge_info_and_resources(
resources: list[dict],
resources_status_list: list[dict],
) -> list[dict]:
"""
Enrichit une liste de resources avec les états provenant des resources_status.
from converter.utils import get_field_value, set_value
from converter.cisu.resources_info.resources_info_cisu_constants import (
ResourcesInfoCISUConstants,
)

Args:
resources: liste de resources (issues du RS-RI déjà extraites)
resources_status_list: liste de resourceStatus (issues des RS-SR déjà extraites)

Returns:
- resources enrichies si un resource_status a été trouvé ; resource inchangée dans le cas contraire
"""
def enrich_from_persisted(rs_ri: dict, rs_sr_list: list[dict]) -> dict:
"""Enrich RS-RI resources with state from persisted RS-SR messages (in-place + return)."""
if not rs_sr_list:
return rs_ri

resource_state_by_resource_id: dict[str, dict] = {}

for resource_status in resources_status_list:
resource_id = resource_status.get("resourceId")
state = resource_status.get("state")

if resource_id is not None and state is not None:
resource_state_by_resource_id[resource_id] = state
resources = get_field_value(rs_ri, ResourcesInfoCISUConstants.RESOURCE_PATH)
sr_by_resource_id = {sr.get("resourceId"): sr for sr in rs_sr_list}

for resource in resources:
resource_id = resource.get("resourceId")
if isinstance(resource_id, str):
rs_sr_state = resource_state_by_resource_id.get(resource_id)
persisted_sr = sr_by_resource_id.get(resource_id)
if persisted_sr is None:
continue

persisted_state = persisted_sr.get("state")
if persisted_state is None:
continue

current_states = get_field_value(
resource, ResourcesInfoCISUConstants.STATE_PATH
)
if current_states is None:
set_value(
resource,
ResourcesInfoCISUConstants.STATE_PATH,
[persisted_state],
)
else:
rs_sr_state = None

# override or set state from RS-SR
if rs_sr_state is not None:
resource["state"] = [
rs_sr_state
] # we override the RS-RI state array by a single array with last state only
if persisted_state not in current_states:
current_states.append(persisted_state)

return resources
return rs_ri
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
from converter.cisu.base_cisu_converter import BaseCISUConverter
from converter.repositories.message_repository import (
get_last_rs_ri_by_case_id,
get_last_rs_sr_per_resource_by_case_id,
get_rs_messages_by_case_id,
)
from converter.cisu.resources_info.resources_info_cisu_helper import (
merge_info_and_resources,
enrich_from_persisted,
)
from converter.cisu.resources_info.resources_info_cisu_converter import (
ResourcesInfoCISUConverter,
)
from converter.cisu.resources_status.resources_status_constants import (
ResourcesStatusConstants,
)
from converter.cisu.resources_info.resources_info_cisu_constants import (
ResourcesInfoCISUConstants,
)

from typing import Any, Dict

from converter.utils import get_field_value, set_value
from converter.utils import get_field_value


class ResourcesStatusConverter(BaseCISUConverter):
Expand All @@ -37,31 +32,24 @@ def from_rs_to_cisu(
content = cls.copy_rs_input_use_case_content(edxl_json)
case_id = get_field_value(content, ResourcesStatusConstants.CASE_ID)

persisted_rs_ri = get_last_rs_ri_by_case_id(case_id)
if persisted_rs_ri is None: # No RS-RI persisted yet, we return an empty list
rs_ri_msg, persisted_rs_sr = get_rs_messages_by_case_id(case_id)
if rs_ri_msg is None:
return []

persisted_rs_sr_list = get_last_rs_sr_per_resource_by_case_id(case_id)
rs_ri = rs_ri_msg.payload

rs_ri = persisted_rs_ri.payload
rs_ri_content = ResourcesInfoCISUConverter.copy_rs_input_use_case_content(rs_ri)
rs_sr_content_list = [
cls.copy_rs_input_use_case_content(msg.payload)
for msg in persisted_rs_sr_list
rs_sr_contents = [
pm.payload["content"][0]["jsonContent"]["embeddedJsonContent"]["message"][
"resourcesStatus"
]
for pm in persisted_rs_sr
]
rs_sr_contents.append(content)

# merge RS-SRs in RS-RI
resources = get_field_value(
rs_ri_content, ResourcesInfoCISUConstants.RESOURCE_PATH
)

merged_resources = merge_info_and_resources(resources, rs_sr_content_list)

set_value(
rs_ri_content, ResourcesInfoCISUConstants.RESOURCE_PATH, merged_resources
)
merged_rs_ri = ResourcesInfoCISUConverter.format_rs_output_json(
rs_ri, rs_ri_content
output_json = ResourcesInfoCISUConverter.copy_rs_input_content(rs_ri)
rs_ri_use_case = ResourcesInfoCISUConverter.copy_rs_input_use_case_content(
rs_ri
)
enriched = enrich_from_persisted(rs_ri_use_case, rs_sr_contents)

return ResourcesInfoCISUConverter.from_rs_to_cisu(merged_rs_ri)
return ResourcesInfoCISUConverter.convert_single_rs_ri(output_json, enriched)
Loading
Loading