From 2e509f671770b2eee26f938d833ed56bedf64fad Mon Sep 17 00:00:00 2001 From: Michael Gerdemann Date: Thu, 10 Oct 2024 11:11:28 +0200 Subject: [PATCH] fix(ocpp): Make websocket connection persistent --- packages/control/ocpp.py | 126 ++++++++++++++++----------------------- 1 file changed, 53 insertions(+), 73 deletions(-) diff --git a/packages/control/ocpp.py b/packages/control/ocpp.py index 64a565de98..200692b9c4 100644 --- a/packages/control/ocpp.py +++ b/packages/control/ocpp.py @@ -10,43 +10,38 @@ from control.optional_data import OptionalProtocol from modules.common.fault_state import FaultState - log = logging.getLogger(__name__) - class OcppMixin: + _ws: Optional[websockets.WebSocketClientProtocol] = None + def _get_formatted_time(self: OptionalProtocol) -> str: return datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") - def _process_call(self: OptionalProtocol, - chargebox_id: str, - fault_state: FaultState, - func: Callable) -> Optional[websockets.WebSocketClientProtocol]: - async def make_call() -> websockets.WebSocketClientProtocol: - async with websockets.connect(self.data.ocpp.url+chargebox_id, - subprotocols=[self.data.ocpp.version]) as ws: - try: - cp = OcppChargepoint(chargebox_id, ws, 2) - await cp.call(func) - except asyncio.exceptions.TimeoutError: - # log.exception("Erwarteter TimeOut StartTransaction") - pass - return ws - try: - if self.data.ocpp.active and chargebox_id: - return asyncio.run(make_call()) - except websockets.exceptions.InvalidStatusCode: - fault_state.warning(f"Chargebox ID {chargebox_id} konnte nicht im OCPP-Backend gefunden werden oder " - "URL des Backends ist falsch.") - return None + async def _get_connection(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState) -> Optional[websockets.WebSocketClientProtocol]: + if self._ws is None or not self._ws.open: + try: + self._ws = await websockets.connect( + self.data.ocpp.url + chargebox_id, + subprotocols=[self.data.ocpp.version] + ) + except websockets.exceptions.InvalidStatusCode: + fault_state.warning(f"Chargebox ID {chargebox_id} konnte nicht im OCPP-Backend gefunden werden oder URL des Backends ist falsch.") + self._ws = None + return self._ws + + async def _process_call(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState, func: Callable): + ws = await self._get_connection(chargebox_id, fault_state) + if ws: + try: + cp = OcppChargepoint(chargebox_id, ws, 2) + await cp.call(func) + except asyncio.exceptions.TimeoutError: + pass - def boot_notification(self: OptionalProtocol, - chargebox_id: str, - fault_state: FaultState, - model: str, - serial_number: str) -> Optional[int]: + async def boot_notification(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState, model: str, serial_number: str) -> Optional[int]: try: - self._process_call(chargebox_id, fault_state, call.BootNotification( + await self._process_call(chargebox_id, fault_state, call.BootNotification( charge_point_model=model, charge_point_vendor="openWB", firmware_version=data.data.system_data["system"].data["version"], @@ -55,72 +50,57 @@ def boot_notification(self: OptionalProtocol, except Exception as e: fault_state.from_exception(e) - def start_transaction(self: OptionalProtocol, - chargebox_id: str, - fault_state: FaultState, - connector_id: int, - id_tag: str, - imported: int) -> Optional[int]: + async def start_transaction(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState, connector_id: int, id_tag: str, imported: int) -> Optional[int]: try: - ws = self._process_call(chargebox_id, fault_state, call.StartTransaction( + await self._process_call(chargebox_id, fault_state, call.StartTransaction( connector_id=connector_id, id_tag=id_tag if id_tag else "", meter_start=int(imported), timestamp=self._get_formatted_time() )) - if ws: - tansaction_id = json.loads(ws.messages[0])[2]["transactionId"] - log.debug(f"Transaction ID: {tansaction_id} für Chargebox ID: {chargebox_id} mit Tag: {id_tag} und " - f"Zählerstand: {imported} erhalten.") - return tansaction_id + if self._ws and self._ws.messages: + transaction_id = json.loads(self._ws.messages[0])[2]["transactionId"] + log.debug(f"Transaction ID: {transaction_id} für Chargebox ID: {chargebox_id} mit Tag: {id_tag} und Zählerstand: {imported} erhalten.") + return transaction_id except Exception as e: fault_state.from_exception(e) return None - def transfer_values(self: OptionalProtocol, - chargebox_id: str, - fault_state: FaultState, - connector_id: int, - imported: int) -> None: + async def transfer_values(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState, connector_id: int, imported: int) -> None: try: - self._process_call(chargebox_id, fault_state, call.MeterValues( + await self._process_call(chargebox_id, fault_state, call.MeterValues( connector_id=connector_id, - meter_value=[{"timestamp": self._get_formatted_time(), - "sampledValue": [ - { - "value": f'{int(imported)}', - "context": "Sample.Periodic", - "format": "Raw", - "measurand": "Energy.Active.Import.Register", - "unit": "Wh" - }, - ]}], + meter_value=[{ + "timestamp": self._get_formatted_time(), + "sampledValue": [{ + "value": f'{int(imported)}', + "context": "Sample.Periodic", + "format": "Raw", + "measurand": "Energy.Active.Import.Register", + "unit": "Wh" + }] + }] )) log.debug(f"Zählerstand {imported} an Chargebox ID: {chargebox_id} übermittelt.") except Exception as e: fault_state.from_exception(e) - def send_heart_beat(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState) -> None: + async def send_heart_beat(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState) -> None: try: - self._process_call(chargebox_id, fault_state, call.Heartbeat()) + await self._process_call(chargebox_id, fault_state, call.Heartbeat()) log.debug(f"Heartbeat an Chargebox ID: {chargebox_id} gesendet.") except Exception as e: fault_state.from_exception(e) - def stop_transaction(self: OptionalProtocol, - chargebox_id: str, - fault_state: FaultState, - imported: int, - transaction_id: int, - id_tag: str) -> None: + async def stop_transaction(self: OptionalProtocol, chargebox_id: str, fault_state: FaultState, imported: int, transaction_id: int, id_tag: str) -> None: try: - self._process_call(chargebox_id, fault_state, call.StopTransaction(meter_stop=int(imported), - timestamp=self._get_formatted_time(), - transaction_id=transaction_id, - reason="EVDisconnected", - id_tag=id_tag if id_tag else "" - )) - log.debug(f"Transaction mit ID: {transaction_id} für Chargebox ID: {chargebox_id} mit Tag: {id_tag} und " - f"Zählerstand: {imported} beendet.") + await self._process_call(chargebox_id, fault_state, call.StopTransaction( + meter_stop=int(imported), + timestamp=self._get_formatted_time(), + transaction_id=transaction_id, + reason="EVDisconnected", + id_tag=id_tag if id_tag else "" + )) + log.debug(f"Transaction mit ID: {transaction_id} für Chargebox ID: {chargebox_id} mit Tag: {id_tag} und Zählerstand: {imported} beendet.") except Exception as e: fault_state.from_exception(e)