From 14c5e152274eced327fdf0ae1dd1f00313f1ee8c Mon Sep 17 00:00:00 2001 From: Fernando Date: Tue, 24 Mar 2026 15:16:38 -0300 Subject: [PATCH 1/9] ENH: (CLTARCH) Remove use of thread to run async methods. this code was developed with the help of Github Copilot. --- siriuspy/siriuspy/clientarch/client.py | 69 ++++++++------------------ 1 file changed, 22 insertions(+), 47 deletions(-) diff --git a/siriuspy/siriuspy/clientarch/client.py b/siriuspy/siriuspy/clientarch/client.py index dbd7387d5..ae2cbdb7e 100644 --- a/siriuspy/siriuspy/clientarch/client.py +++ b/siriuspy/siriuspy/clientarch/client.py @@ -11,9 +11,9 @@ import ssl as _ssl import urllib as _urllib from datetime import timedelta as _timedelta -from threading import Thread as _Thread from urllib.parse import quote as _quote +import nest_asyncio import numpy as _np import urllib3 as _urllib3 from aiohttp import ClientSession as _ClientSession @@ -40,7 +40,6 @@ def __init__(self, server_url=None, timeout=None): self.session = None self._timeout = timeout self._url = server_url or self.SERVER_URL - self._ret = None self._request_url = None # print('urllib3 InsecureRequestWarning disabled!') _urllib3.disable_warnings(_urllib3.exceptions.InsecureRequestWarning) @@ -97,13 +96,10 @@ def login(self, username, password): headers = {'User-Agent': 'Mozilla/5.0'} payload = {'username': username, 'password': password} url = self._create_url(method='login') - ret = self._run_async_event_loop( - self._create_session, - url, - headers=headers, - payload=payload, - ssl=False, + coro = self._create_session( + url, headers=headers, payload=payload, ssl=False ) + ret = self._run_sync_coro(coro) if ret is not None: self.session, authenticated = ret if authenticated: @@ -119,7 +115,8 @@ def login(self, username, password): def logout(self): """Close login session.""" if self.session: - resp = self._run_async_event_loop(self._close_session) + coro = self._close_session() + resp = self._run_sync_coro(coro) self.session = None return resp return None @@ -458,13 +455,12 @@ def _process_url_link_args(pvnames, pvoptnrpts, pvcolors, pvusediff): def _make_request(self, url, need_login=False, return_json=False): """Make request.""" self._request_url = url - response = self._run_async_event_loop( - self._handle_request, + coro = self._handle_request( url, return_json=return_json, need_login=need_login, ) - return response + return self._run_sync_coro(coro) def _create_url(self, method, **kwargs): """Create URL.""" @@ -480,40 +476,19 @@ def _create_url(self, method, **kwargs): return url # ---------- async methods ---------- - - def _run_async_event_loop(self, *args, **kwargs): - # NOTE: Run the asyncio commands in a separated Thread to isolate - # their EventLoop from the external environment (important for class - # to work within jupyter notebook environment). - _thread = _Thread( - target=self._thread_run_async_event_loop, - daemon=True, - args=args, - kwargs=kwargs, - ) - _thread.start() - _thread.join() - return self._ret - - def _thread_run_async_event_loop(self, func, *args, **kwargs): - """Get event loop.""" - close = False - try: - loop = _asyncio.get_event_loop() - except RuntimeError as error: - if 'no current event loop' in str(error): - loop = _asyncio.new_event_loop() - _asyncio.set_event_loop(loop) - close = True - else: - raise error + def _run_sync_coro(self, coro): + """Run an async coroutine synchronously, compatible with Jupyter.""" try: - self._ret = loop.run_until_complete(func(*args, **kwargs)) - except _asyncio.TimeoutError: - raise _exceptions.TimeoutError - - if close: - loop.close() + loop = _asyncio.get_running_loop() + try: + return loop.run_until_complete(coro) + except RuntimeError: + # Event loop already running (typical in Jupyter notebooks). + nest_asyncio.apply(loop) + return loop.run_until_complete(coro) + except RuntimeError: + # No running loop, create a new one + return _asyncio.run(coro) async def _handle_request(self, url, return_json=False, need_login=False): """Handle request.""" @@ -562,8 +537,8 @@ async def _get_request_response(self, url, session, return_json): except ValueError: _log.error(f'Error with URL {response.url}') response = None - except _asyncio.TimeoutError as err_msg: - raise _exceptions.TimeoutError(err_msg) + except _asyncio.TimeoutError as err: + raise _exceptions.TimeoutError from err return response async def _create_session(self, url, headers, payload, ssl): From 6299426af03a5ae71017ca67e669dd8a96f56375 Mon Sep 17 00:00:00 2001 From: Fernando Date: Tue, 24 Mar 2026 15:24:50 -0300 Subject: [PATCH 2/9] BUG: update requirements.txt with nest_asyncio. --- siriuspy/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/siriuspy/requirements.txt b/siriuspy/requirements.txt index d7d73e713..3ac3aa082 100644 --- a/siriuspy/requirements.txt +++ b/siriuspy/requirements.txt @@ -1,6 +1,7 @@ aiohttp>=3.7.4 bottleneck>=1.3.2 mathphys +nest_asyncio numpy<=1.23 scipy<=1.13 pyepics>=3.4.0 From eb1a25e1d60f8ec82d9e7cb585bd2e34971e15ac Mon Sep 17 00:00:00 2001 From: Fernando Date: Wed, 25 Mar 2026 11:24:06 -0300 Subject: [PATCH 3/9] STY: (CLTARCH) format code. --- siriuspy/siriuspy/clientarch/client.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/siriuspy/siriuspy/clientarch/client.py b/siriuspy/siriuspy/clientarch/client.py index ae2cbdb7e..c81645735 100644 --- a/siriuspy/siriuspy/clientarch/client.py +++ b/siriuspy/siriuspy/clientarch/client.py @@ -349,7 +349,7 @@ def gen_archviewer_url_link( time_ref=None, pvoptnrpts=None, pvcolors=None, - pvusediff=False + pvusediff=False, ): """Generate a Archiver Viewer URL for the given PVs. @@ -393,7 +393,8 @@ def gen_archviewer_url_link( # Thanks to Rafael Lyra for the basis of this implementation! archiver_viewer_url = _envars.SRVURL_ARCHIVER_VIEWER + '/?pvConfig=' args = ClientArchiver._process_url_link_args( - pvnames, pvoptnrpts, pvcolors, pvusediff) + pvnames, pvoptnrpts, pvcolors, pvusediff + ) pvoptnrpts, pvcolors, pvusediff = args pv_search = '' for idx in range(len(pvnames)): @@ -456,9 +457,7 @@ def _make_request(self, url, need_login=False, return_json=False): """Make request.""" self._request_url = url coro = self._handle_request( - url, - return_json=return_json, - need_login=need_login, + url, return_json=return_json, need_login=need_login ) return self._run_sync_coro(coro) From 7227a31cf953f2b06ed8157cad90ea832b89ae32 Mon Sep 17 00:00:00 2001 From: Fernando Date: Wed, 25 Mar 2026 11:30:21 -0300 Subject: [PATCH 4/9] MNT: (CLTARCH) Remove nest_asyncio; use thread based method. This code was done with the help of Google AI Mode. Since the nest_asyncio package was archived, we implemented again a solution based on running a different event loop in a new thread however, this thread is long lived and all tasks are submitted using asyncio.run_coroutine_threadsafe function. Which guarantees that exceptions will be passed to the end user and that there will be no infinite hangs. --- siriuspy/requirements.txt | 1 - siriuspy/siriuspy/clientarch/client.py | 91 ++++++++++++++++++++------ 2 files changed, 72 insertions(+), 20 deletions(-) diff --git a/siriuspy/requirements.txt b/siriuspy/requirements.txt index 3ac3aa082..d7d73e713 100644 --- a/siriuspy/requirements.txt +++ b/siriuspy/requirements.txt @@ -1,7 +1,6 @@ aiohttp>=3.7.4 bottleneck>=1.3.2 mathphys -nest_asyncio numpy<=1.23 scipy<=1.13 pyepics>=3.4.0 diff --git a/siriuspy/siriuspy/clientarch/client.py b/siriuspy/siriuspy/clientarch/client.py index c81645735..cf5c8ce03 100644 --- a/siriuspy/siriuspy/clientarch/client.py +++ b/siriuspy/siriuspy/clientarch/client.py @@ -8,15 +8,15 @@ import asyncio as _asyncio import logging as _log -import ssl as _ssl import urllib as _urllib from datetime import timedelta as _timedelta +from threading import Thread as _Thread from urllib.parse import quote as _quote -import nest_asyncio import numpy as _np import urllib3 as _urllib3 from aiohttp import ClientSession as _ClientSession + try: from lzstring import LZString as _LZString except: @@ -34,6 +34,10 @@ class ClientArchiver: SERVER_URL = _envars.SRVURL_ARCHIVER ENDPOINT = '/mgmt/bpl' + def __delete__(self): + """Turn off thread when deleting.""" + self.shutdown() + def __init__(self, server_url=None, timeout=None): """Initialize.""" timeout = timeout or ClientArchiver.DEFAULT_TIMEOUT @@ -41,17 +45,47 @@ def __init__(self, server_url=None, timeout=None): self._timeout = timeout self._url = server_url or self.SERVER_URL self._request_url = None - # print('urllib3 InsecureRequestWarning disabled!') + self._thread = self._loop = None + self.connect() _urllib3.disable_warnings(_urllib3.exceptions.InsecureRequestWarning) + def connect(self): + """Starts bg. event loop in a separate thread. + + Raises: + RuntimeError: when library is alread connected. + """ + if self._loop_alive(): + return + + self._loop = _asyncio.new_event_loop() + self._thread = _Thread(target=self._run_event_loop, daemon=True) + self._thread.start() + + def shutdown(self, timeout=5): + """Safely stops the bg. loop and waits for the thread to exit.""" + if not self._loop_alive(): + return + + # 1. Cancel all pending tasks in the loop (to avoid ResourceWarnings) + self._loop.call_soon_threadsafe(self._cancel_all_tasks) + + # 2. Schedule the loop to stop processing + self._loop.call_soon_threadsafe(self._loop.stop) + + # 3. Wait for the thread to actually finish + self._thread.join(timeout=timeout) + if self._thread.is_alive(): + print('Warning: Background thread did not stop in time.') + @property def connected(self): """Connected.""" + if not self._loop_alive(): + return False try: - status = _urllib.request.urlopen( - self._url, timeout=self._timeout, context=_ssl.SSLContext() - ).status - return status == 200 + resp = self._make_request(self._url, return_json=False) + return resp.status == 200 except _urllib.error.URLError: return False @@ -453,6 +487,31 @@ def _process_url_link_args(pvnames, pvoptnrpts, pvcolors, pvusediff): pvusediff = [pvusediff] * len(pvnames) return pvoptnrpts, pvcolors, pvusediff + def _loop_alive(self): + """Check if thread is alive and loop is running.""" + return ( + self._thread is not None + and self._thread.is_alive() + and self._loop.is_running() + ) + + def _cancel_all_tasks(self): + """Helper to cancel tasks (must be called from the loop's thread).""" + if hasattr(_asyncio, 'all_tasks'): + all_tasks = _asyncio.all_tasks(loop=self._loop) + else: # python 3.6 + all_tasks = _asyncio.Task.all_tasks(loop=self._loop) + + for task in all_tasks: + task.cancel() + + def _run_event_loop(self): + _asyncio.set_event_loop(self._loop) + try: + self._loop.run_forever() + finally: + self._loop.close() + def _make_request(self, url, need_login=False, return_json=False): """Make request.""" self._request_url = url @@ -474,20 +533,14 @@ def _create_url(self, method, **kwargs): url += '&'.join(['{}={}'.format(k, v) for k, v in kwargs.items()]) return url - # ---------- async methods ---------- def _run_sync_coro(self, coro): """Run an async coroutine synchronously, compatible with Jupyter.""" - try: - loop = _asyncio.get_running_loop() - try: - return loop.run_until_complete(coro) - except RuntimeError: - # Event loop already running (typical in Jupyter notebooks). - nest_asyncio.apply(loop) - return loop.run_until_complete(coro) - except RuntimeError: - # No running loop, create a new one - return _asyncio.run(coro) + if not self._thread.is_alive(): + raise RuntimeError('Library is shut down') + future = _asyncio.run_coroutine_threadsafe(coro, self._loop) + return future.result(timeout=self._timeout) + + # ---------- async methods ---------- async def _handle_request(self, url, return_json=False, need_login=False): """Handle request.""" From 07ae2c6a87b631a1d50d5fd54013f5207bd790b5 Mon Sep 17 00:00:00 2001 From: Fernando Date: Wed, 1 Apr 2026 15:31:04 -0300 Subject: [PATCH 5/9] BUG: (CLTARC.CLT) Refactors async request handling and error management. Simplifies request logic by removing the need for a return_json flag, centralizing response deserialization, and improving error handling for timeouts and payload issues. Enhances code clarity by isolating JSON/text parsing and standardizing return values. Prepares for more robust integration and debugging in asynchronous workflows. --- siriuspy/siriuspy/clientarch/client.py | 96 ++++++++++++-------------- 1 file changed, 45 insertions(+), 51 deletions(-) diff --git a/siriuspy/siriuspy/clientarch/client.py b/siriuspy/siriuspy/clientarch/client.py index cf5c8ce03..560283a27 100644 --- a/siriuspy/siriuspy/clientarch/client.py +++ b/siriuspy/siriuspy/clientarch/client.py @@ -15,7 +15,10 @@ import numpy as _np import urllib3 as _urllib3 -from aiohttp import ClientSession as _ClientSession +from aiohttp import ( + client_exceptions as _aio_exceptions, + ClientSession as _ClientSession +) try: from lzstring import LZString as _LZString @@ -84,8 +87,7 @@ def connected(self): if not self._loop_alive(): return False try: - resp = self._make_request(self._url, return_json=False) - return resp.status == 200 + return bool(self._make_request(self._url + '/mgmt')) except _urllib.error.URLError: return False @@ -160,7 +162,7 @@ def getPVsInfo(self, pvnames): if isinstance(pvnames, (list, tuple)): pvnames = ','.join(pvnames) url = self._create_url(method='getPVStatus', pv=pvnames) - resp = self._make_request(url, return_json=True) + resp = self._make_request(url) return None if not resp else resp def getAllPVs(self, pvnames): @@ -168,7 +170,7 @@ def getAllPVs(self, pvnames): if isinstance(pvnames, (list, tuple)): pvnames = ','.join(pvnames) url = self._create_url(method='getAllPVs', pv=pvnames, limit='-1') - resp = self._make_request(url, return_json=True) + resp = self._make_request(url) return None if not resp else resp def deletePVs(self, pvnames): @@ -184,7 +186,7 @@ def deletePVs(self, pvnames): def getPausedPVsReport(self): """Get Paused PVs Report.""" url = self._create_url(method='getPausedPVsReport') - resp = self._make_request(url, return_json=True) + resp = self._make_request(url) return None if not resp else resp def getRecentlyModifiedPVs(self, limit=None, epoch_time=True): @@ -198,7 +200,7 @@ def getRecentlyModifiedPVs(self, limit=None, epoch_time=True): if limit is not None: method += f'?limit={str(limit)}' url = self._create_url(method=method) - resp = self._make_request(url, return_json=True) + resp = self._make_request(url) # convert to epoch, if the case if resp and epoch_time: @@ -315,7 +317,7 @@ def getData( end = len(all_urls) pvn2idcs[pvname_orig[i]] = _np.arange(ini, end) - resps = self._make_request(all_urls, return_json=True) + resps = self._make_request(all_urls) if not resps: return None @@ -362,7 +364,7 @@ def getPVDetails(self, pvname, get_request_url=False): url = self._create_url(method='getPVDetails', pv=pvname) if get_request_url: return url - resp = self._make_request(url, return_json=True) + resp = self._make_request(url) return None if not resp else resp def switch_to_online_data(self): @@ -512,12 +514,10 @@ def _run_event_loop(self): finally: self._loop.close() - def _make_request(self, url, need_login=False, return_json=False): + def _make_request(self, url, need_login=False): """Make request.""" self._request_url = url - coro = self._handle_request( - url, return_json=return_json, need_login=need_login - ) + coro = self._handle_request(url, need_login=need_login) return self._run_sync_coro(coro) def _create_url(self, method, **kwargs): @@ -542,57 +542,51 @@ def _run_sync_coro(self, coro): # ---------- async methods ---------- - async def _handle_request(self, url, return_json=False, need_login=False): + async def _handle_request(self, url, need_login=False): """Handle request.""" if self.session is not None: - response = await self._get_request_response( - url, self.session, return_json - ) + response = await self._get_request_response(url, self.session) elif need_login: raise _exceptions.AuthenticationError('You need to login first.') else: async with _ClientSession() as sess: - response = await self._get_request_response( - url, sess, return_json - ) + response = await self._get_request_response(url, sess) return response - async def _get_request_response(self, url, session, return_json): + async def _get_request_response(self, url, session): """Get request response.""" + url = [url] if isinstance(url, str) else url try: - if isinstance(url, list): - response = await _asyncio.gather(*[ - session.get(u, ssl=False, timeout=self._timeout) - for u in url - ]) - if any([not r.ok for r in response]): - return None - if return_json: - jsons = list() - for res in response: - try: - data = await res.json() - jsons.append(data) - except ValueError: - _log.error(f'Error with URL {res.url}') - jsons.append(None) - response = jsons - else: - response = await session.get( - url, ssl=False, timeout=self._timeout - ) - if not response.ok: - return None - if return_json: - try: - response = await response.json() - except ValueError: - _log.error(f'Error with URL {response.url}') - response = None + response = await _asyncio.gather(*[ + self._fetch_url(session, u) for u in url + ]) except _asyncio.TimeoutError as err: - raise _exceptions.TimeoutError from err + raise _exceptions.TimeoutError( + 'Timeout reached. Try to increase `timeout`.' + ) from err + except _aio_exceptions.ClientPayloadError as err: + raise _exceptions.PayloadError( + "Payload Error. Increasing `timeout` won't help. " + 'Try:\n - decreasing `query_bin_interval`;' + '\n - or decrease the time interval for the aquisition;' + ) from err + + if len(url) == 1: + return response[0] return response + async def _fetch_url(self, session, url): + async with session.get(url, timeout=self._timeout) as response: + if response.status != 200: + return None + try: + return await response.json() + except _aio_exceptions.ContentTypeError: + return await response.text() + except ValueError: + _log.error('Error with URL %s', response.url) + return None + async def _create_session(self, url, headers, payload, ssl): """Create session and handle login.""" session = _ClientSession() From 49cc4fe58fa860594b5cf826f261eccf3470a510 Mon Sep 17 00:00:00 2001 From: Fernando Date: Wed, 1 Apr 2026 15:35:13 -0300 Subject: [PATCH 6/9] DOC: (CLTARCH.CLT) add documentation for catching ContentTypeError. --- siriuspy/siriuspy/clientarch/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/siriuspy/siriuspy/clientarch/client.py b/siriuspy/siriuspy/clientarch/client.py index 560283a27..8706dbca2 100644 --- a/siriuspy/siriuspy/clientarch/client.py +++ b/siriuspy/siriuspy/clientarch/client.py @@ -582,6 +582,7 @@ async def _fetch_url(self, session, url): try: return await response.json() except _aio_exceptions.ContentTypeError: + # for cases where response returns html (self.connected). return await response.text() except ValueError: _log.error('Error with URL %s', response.url) From f407bc58820d70ce2d18b39d04a3b20e1808e52f Mon Sep 17 00:00:00 2001 From: Fernando Date: Mon, 6 Apr 2026 10:34:32 -0300 Subject: [PATCH 7/9] BUG: (CLTARCH.CLT) Ensure ssl=False so it works on control room PCs, where IP address, instead of server name is defined. --- siriuspy/siriuspy/clientarch/client.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/siriuspy/siriuspy/clientarch/client.py b/siriuspy/siriuspy/clientarch/client.py index 8706dbca2..8c2a2cecf 100644 --- a/siriuspy/siriuspy/clientarch/client.py +++ b/siriuspy/siriuspy/clientarch/client.py @@ -17,7 +17,8 @@ import urllib3 as _urllib3 from aiohttp import ( client_exceptions as _aio_exceptions, - ClientSession as _ClientSession + ClientSession as _ClientSession, + TCPConnector as _TCPConn ) try: @@ -132,9 +133,7 @@ def login(self, username, password): headers = {'User-Agent': 'Mozilla/5.0'} payload = {'username': username, 'password': password} url = self._create_url(method='login') - coro = self._create_session( - url, headers=headers, payload=payload, ssl=False - ) + coro = self._create_session(url, headers=headers, payload=payload) ret = self._run_sync_coro(coro) if ret is not None: self.session, authenticated = ret @@ -549,7 +548,7 @@ async def _handle_request(self, url, need_login=False): elif need_login: raise _exceptions.AuthenticationError('You need to login first.') else: - async with _ClientSession() as sess: + async with _ClientSession(connector=_TCPConn(ssl=False)) as sess: response = await self._get_request_response(url, sess) return response @@ -588,11 +587,11 @@ async def _fetch_url(self, session, url): _log.error('Error with URL %s', response.url) return None - async def _create_session(self, url, headers, payload, ssl): + async def _create_session(self, url, headers, payload): """Create session and handle login.""" - session = _ClientSession() + session = _ClientSession(connector=_TCPConn(ssl=False)) async with session.post( - url, headers=headers, data=payload, ssl=ssl, timeout=self._timeout + url, headers=headers, data=payload, timeout=self._timeout ) as response: content = await response.content.read() authenticated = b'authenticated' in content From 8c10caaddaca1300a88fb762789360de77bdd5ac Mon Sep 17 00:00:00 2001 From: Fernando Date: Mon, 6 Apr 2026 10:35:22 -0300 Subject: [PATCH 8/9] BUG: (CLTARCH.CLT) Defines `"Host"` in header so that login works in control room (bug already in master). --- siriuspy/siriuspy/clientarch/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/siriuspy/siriuspy/clientarch/client.py b/siriuspy/siriuspy/clientarch/client.py index 8c2a2cecf..744b0294e 100644 --- a/siriuspy/siriuspy/clientarch/client.py +++ b/siriuspy/siriuspy/clientarch/client.py @@ -130,7 +130,7 @@ def last_requested_url(self): def login(self, username, password): """Open login session.""" - headers = {'User-Agent': 'Mozilla/5.0'} + headers = {'User-Agent': 'Mozilla/5.0', 'Host': 'cnpem.br'} payload = {'username': username, 'password': password} url = self._create_url(method='login') coro = self._create_session(url, headers=headers, payload=payload) From 46a68f9581cc3f0dc6c7a8d03e55ada23b50b52d Mon Sep 17 00:00:00 2001 From: Fernando Date: Mon, 6 Apr 2026 11:00:58 -0300 Subject: [PATCH 9/9] BUG: (CLTARCH.CLT) Fix problem with single URLs in request. --- siriuspy/siriuspy/clientarch/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/siriuspy/siriuspy/clientarch/client.py b/siriuspy/siriuspy/clientarch/client.py index 744b0294e..902584a50 100644 --- a/siriuspy/siriuspy/clientarch/client.py +++ b/siriuspy/siriuspy/clientarch/client.py @@ -554,7 +554,8 @@ async def _handle_request(self, url, need_login=False): async def _get_request_response(self, url, session): """Get request response.""" - url = [url] if isinstance(url, str) else url + single = isinstance(url, str) + url = [url] if single else url try: response = await _asyncio.gather(*[ self._fetch_url(session, u) for u in url @@ -570,7 +571,7 @@ async def _get_request_response(self, url, session): '\n - or decrease the time interval for the aquisition;' ) from err - if len(url) == 1: + if single: return response[0] return response