diff --git a/siriuspy/siriuspy/clientarch/client.py b/siriuspy/siriuspy/clientarch/client.py index dbd7387d5..bf0163b4a 100644 --- a/siriuspy/siriuspy/clientarch/client.py +++ b/siriuspy/siriuspy/clientarch/client.py @@ -7,8 +7,9 @@ """ import asyncio as _asyncio +import getpass as _getpass import logging as _log -import ssl as _ssl +import math as _math import urllib as _urllib from datetime import timedelta as _timedelta from threading import Thread as _Thread @@ -16,55 +17,111 @@ import numpy as _np import urllib3 as _urllib3 -from aiohttp import ClientSession as _ClientSession +from aiohttp import ( + client_exceptions as _aio_exceptions, + ClientSession as _ClSession, + TCPConnector as _TCPConn +) +from mathphys.functions import get_namedtuple as _get_namedtuple + try: from lzstring import LZString as _LZString -except: +except ModuleNotFoundError: _LZString = None from .. import envars as _envars from . import exceptions as _exceptions -from .time import Time as _Time +from .time import get_time_intervals as _get_time_intervals, Time as _Time class ClientArchiver: """Archiver Data Fetcher class.""" - DEFAULT_TIMEOUT = 5.0 # [s] + DEF_QUERY_SPLIT_INTERVAL = 12 * 60 * 60 # 12h + DEF_QUERY_MAX_CONCURRENCY = 100 # maximum number of concurrent queries + DEFAULT_QUERY_TIMEOUT = 5.0 # [s] SERVER_URL = _envars.SRVURL_ARCHIVER ENDPOINT = '/mgmt/bpl' - def __init__(self, server_url=None, timeout=None): + _REPORTS = { + 'DisconnectedPVs': 'getCurrentlyDisconnectedPVs', + 'PausedPVs': 'getPausedPVsReport', + 'EventRate': 'getEventRateReport', + 'StorageRate': 'getStorageRateReport', + 'RecentlyAddedPVs': 'getRecentlyAddedPVs', + 'RecentlyModifiedPVs': 'getRecentlyModifiedPVs', + 'LostConnections': 'getLostConnectionsReport', + 'LastKnownTimestamps': 'getSilentPVsReport', + 'DroppedEventsWrongTimestamp': 'getPVsByDroppedEventsTimestamp', + 'DroppedEventsBufferOverflow': 'getPVsByDroppedEventsBuffer', + 'DroppedEventsTypeChange': 'getPVsByDroppedEventsTypeChange', + } + 
ReportTypes = _get_namedtuple( + 'ReportTypes', _REPORTS.keys(), _REPORTS.values() + ) + _PROC_TYPES = { + 'None_': '', + 'TotalCount': 'ncount', + 'Mean': 'mean', + 'Median': 'median', + 'STD': 'std', + 'Variance': 'variance', + 'Popvariance': 'popvariance', + 'Kurtosis': 'kurtosis', + 'Skewness': 'skewness', + 'Min': 'mini', + 'Max': 'maxi', + 'STDoverMean': 'jitter', + 'Count': 'count', + 'FirstSample': 'firstSample', + 'LastSample': 'lastSample', + 'FirstFill': 'firstFill', + 'LastFill': 'lastFill', + 'Linear': 'linear', + 'Loess': 'loess', + 'Optimized': 'optimized', + 'OptimLastSample': 'optimLastSample', + 'NthSample': 'nth', + 'SelectByChange': 'deadBand', + 'IgnoreOutliers': 'ignoreflyers', + 'Outliers': 'flyers', + } + ProcessingTypes = _get_namedtuple( + 'ProcessingTypes', _PROC_TYPES.keys(), _PROC_TYPES.values() + ) + + def __delete__(self): + """Turn off thread when deleting.""" + self.logout() + self.shutdown() + + def __init__(self, server_url=None, query_timeout=None): """Initialize.""" - timeout = timeout or ClientArchiver.DEFAULT_TIMEOUT + query_timeout = query_timeout or ClientArchiver.DEFAULT_QUERY_TIMEOUT self.session = None - self._timeout = timeout self._url = server_url or self.SERVER_URL - self._ret = None self._request_url = None - # print('urllib3 InsecureRequestWarning disabled!') + self._thread = self._loop = self._semaphore = None + self._query_timeout = query_timeout + self._query_split_interval = self.DEF_QUERY_SPLIT_INTERVAL + self._query_max_concurrency = self.DEF_QUERY_MAX_CONCURRENCY + self.connect() _urllib3.disable_warnings(_urllib3.exceptions.InsecureRequestWarning) @property def connected(self): - """Connected.""" + """Return whether the archiver is connected.""" + if not self._loop_alive(): + return False try: - status = _urllib.request.urlopen( - self._url, timeout=self._timeout, context=_ssl.SSLContext() - ).status - return status == 200 + return bool(self.make_request(self._url + '/mgmt')) except _urllib.error.URLError: 
return False @property - def timeout(self): - """Connection timeout.""" - return self._timeout - - @timeout.setter - def timeout(self, value): - """Set connection timeout.""" - self._timeout = float(value) + def last_requested_url(self): + """Return the last requested URL or URLs.""" + return self._request_url @property def server_url(self): @@ -87,23 +144,101 @@ def server_url(self, url): self.logout() self._url = url + # ------------ query related properties -------------- + @property - def last_requested_url(self): - """.""" - return self._request_url + def query_timeout(self): + """Request timeout for each query.""" + return self._query_timeout + + @query_timeout.setter + def query_timeout(self, value): + """Set request timeout for each query.""" + self._query_timeout = max(float(value), 0) + + @property + def query_split_interval(self): + """Queries larger than this interval will be split. + + If set to 0 or None, no splitting will be done. + """ + return self._query_split_interval + + @query_split_interval.setter + def query_split_interval(self, new_intvl): + if new_intvl is None: + new_intvl = 0 + if not isinstance(new_intvl, (float, int)): + raise _exceptions.TypeError( + 'expected argument of type float or int, got ' + + str(type(new_intvl)) + ) + self._query_split_interval = max(int(new_intvl), 0) + + @property + def query_max_concurrency(self): + """Maximum number of concurrent queries to server.""" + return self._query_max_concurrency + + @query_max_concurrency.setter + def query_max_concurrency(self, new_val): + if not isinstance(new_val, (float, int)): + raise _exceptions.TypeError( + 'expected argument of type float or int, got ' + + str(type(new_val)) + ) + self._query_max_concurrency = int(new_val) + + # ------------- methods to control client behavior -------------- + + def connect(self): + """Starts bg. 
event loop in a separate thread when in async mode.""" + if self._loop_alive(): + return + + self._loop = _asyncio.new_event_loop() + self._thread = _Thread(target=self._run_event_loop, daemon=True) + self._thread.start() + + def shutdown(self, timeout=5): + """Safely stops the bg. loop and waits for the thread to exit.""" + if not self._loop_alive(): + return + + # 1. Cancel all pending tasks in the loop (to avoid ResourceWarnings) + self._loop.call_soon_threadsafe(self._cancel_all_tasks) + + # 2. Schedule the loop to stop processing + self._loop.call_soon_threadsafe(self._loop.stop) + + # 3. Wait for the thread to actually finish + self._thread.join(timeout=timeout) + if self._thread.is_alive(): + print('Warning: Background thread did not stop in time.') - def login(self, username, password): - """Open login session.""" - headers = {'User-Agent': 'Mozilla/5.0'} + def login(self, username, password=None): + """Login to the Archiver server. + + Args: + username (str): Username to login. + password (str): Password to login. If not provided, it will be + (secretly) prompted in the console. + + Returns: + bool: True if login was successful, False otherwise. 
+ """ + if self.session is not None: + self.logout() + + if password is None: + password = _getpass.getpass( + prompt=f'Password for user {username}: ', stream=None + ) payload = {'username': username, 'password': password} url = self._create_url(method='login') - ret = self._run_async_event_loop( - self._create_session, - url, - headers=headers, - payload=payload, - ssl=False, - ) + + coro = self._create_session(url, payload=payload) + ret = self._run_sync_coro(coro) if ret is not None: self.session, authenticated = ret if authenticated: @@ -118,164 +253,574 @@ def login(self, username, password): def logout(self): """Close login session.""" - if self.session: - resp = self._run_async_event_loop(self._close_session) - self.session = None - return resp - return None - - def getPVsInfo(self, pvnames): - """Get PVs Info.""" - if isinstance(pvnames, (list, tuple)): - pvnames = ','.join(pvnames) - url = self._create_url(method='getPVStatus', pv=pvnames) - resp = self._make_request(url, return_json=True) + if self.session is None: + return + coro = self._close_session() + resp = self._run_sync_coro(coro) + self.session = None + return resp + + def switch_to_online_data(self): + """Switch to online data. + + Sets server URL to online data URL and logs out if needed. + """ + self.server_url = _envars.SRVURL_ARCHIVER + self.logout() + + def switch_to_offline_data(self): + """Switch to offline data. + + Sets server URL to offline data URL and logs out if needed. + """ + self.server_url = _envars.SRVURL_ARCHIVER_OFFLINE_DATA + self.logout() + + # ------------- methods to get PVs informations -------------- + + def get_pvs_info(self, wildcards='*', max_num_pvs=-1): + """Get PVs Info. + + Call method `getPVStatus` of the Archiver Appliance, which returns a + list of PVs matching the wildcards, with some details about each PV, + such as its type, connection status, etc. + + Args: + wildcards (str|list|tuple): Wildcards to match. 
+ max_num_pvs (int): Maximum number of PVs to return. + + Returns: + list: List of dictionary with PVs details. + + """ + if isinstance(wildcards, (list, tuple)): + wildcards = ','.join(wildcards) + + max_num_pvs = f'{int(max_num_pvs)}' + url = self._create_url( + method='getPVStatus', pv=wildcards, limit=max_num_pvs + ) + resp = self.make_request(url) return None if not resp else resp - def getAllPVs(self, pvnames): - """Get All PVs.""" - if isinstance(pvnames, (list, tuple)): - pvnames = ','.join(pvnames) - url = self._create_url(method='getAllPVs', pv=pvnames, limit='-1') - resp = self._make_request(url, return_json=True) + def get_all_pvs(self, wildcards='*', max_num_pvs=-1): + """Get All PVs matching wildcards. + + Call method `getAllPVs` of the Archiver Appliance, which returns a + list of PVs matching the wildcards. + + Args: + wildcards (str|list|tuple): Wildcards to match. + max_num_pvs (int): Maximum number of PVs to return. + + Returns: + list: List of dictionary with PVs details. + """ + if isinstance(wildcards, (list, tuple)): + wildcards = ','.join(wildcards) + + max_num_pvs = f'{int(max_num_pvs)}' + url = self._create_url( + method='getAllPVs', pv=wildcards, limit=max_num_pvs + ) + resp = self.make_request(url) return None if not resp else resp - def deletePVs(self, pvnames): - """Delete PVs.""" - if not isinstance(pvnames, (list, tuple)): - pvnames = (pvnames,) - for pvname in pvnames: - url = self._create_url( - method='deletePV', pv=pvname, deleteData='true' - ) - self._make_request(url, need_login=True) + def get_pv_details(self, pvname, get_request_url=False): + """Get PV Details. + + Call method `getPVDetails` of the Archiver Appliance, which returns + PVs details regarding its archiving status. + + Args: + pvname (str): Name of the PV to get details. + get_request_url (bool): Whether to only return request url. + + Returns: + list (None | list): List of dictionary with PVs details. 
+ """ + url = self._create_url(method='getPVDetails', pv=pvname) + if get_request_url: + return url + resp = self.make_request(url) + return None if not resp else resp + + def get_pv_type_info(self, pvname: str): + """Get PV Type Info. + + Call method `getPVTypeInfo` of the Archiver Appliance, which returns + Archiving information for a PV, such as its archiving policy. + + Args: + pvname (str): Name of the PV to get type info. + + Returns: + list: List of dictionary with PVs details. + + """ + url = self._create_url(method='getPVTypeInfo', pv=pvname) + resp = self.make_request(url) + return None if not resp else resp + + # ------------- methods to get appliance metrics -------------- + + def get_detailed_appliance_metrics(self): + """Get detailed appliance metrics for archiver appliance. + + Call method `getApplianceMetricsForAppliance` of the Archiver + Appliance, which returns a list of metrics for the archiver + appliance. + + Returns: + response (dict|None): Response of the request. + """ + url = self._create_url( + method='getApplianceMetricsForAppliance', + appliance='lnls_control_appliance_1', + ) + resp = self.make_request(url) + return None if not resp else resp + + def get_process_metrics_for_appliance(self): + """Get process metrics for archiver appliance. + + Call method `getProcessMetricsDataForAppliance` of the Archiver + Appliance, which returns a list of metrics for the processing consumed + by the archiver appliance. + + Returns: + response (dict|None): Response of the request. The metrics + that are returned in case of success are: + - system load in % + - engine heap in % + - etl heap in % + - retrieval heap in % + """ + url = self._create_url( + method='getProcessMetricsDataForAppliance', + appliance='lnls_control_appliance_1', + ) + resp = self.make_request(url) + return None if not resp else resp + + def get_report(self, report_name='PausedPVs', max_num_pvs=None): + """Get Paused PVs Report. + + Call report methods of the Archiver Appliance. 
Possible reports are: + - DisconnectedPVs --> `getCurrentlyDisconnectedPVs` + - PausedPVs --> `getPausedPVsReport` + - EventRate --> `getEventRateReport` + - StorageRate --> `getStorageRateReport` + - RecentlyAddedPVs --> `getRecentlyAddedPVs` + - RecentlyModifiedPVs --> `getRecentlyModifiedPVs` + - LostConnections --> `getLostConnectionsReport` + - LastKnownTimestamps --> `getSilentPVsReport` + - DroppedEventsWrongTimestamp --> `getPVsByDroppedEventsTimestamp` + - DroppedEventsBufferOverflow --> `getPVsByDroppedEventsBuffer` + - DroppedEventsTypeChange --> `getPVsByDroppedEventsTypeChange` + For details on the content of each report, please, refer to the + Archiver Appliance documentation. + + The results of each report will be unprocessed in a json dict. + In case you want a processed report, please, refer to the specific + methods for each report, such as `get_recently_modified_pvs` for the + `RecentlyModifiedPVs` report. + + Args: + report_name (str): Report name. Use self.ReportTypes to get + all available reports. + max_num_pvs (int): Maximum number of PVs to return. + + Returns: + dict: Report results. + + """ + method = getattr(self.ReportTypes, report_name) + if max_num_pvs is not None: + max_num_pvs = f'{int(max_num_pvs)}' + url = self._create_url(method=method, limit=max_num_pvs) + else: + url = self._create_url(method=method) - def getPausedPVsReport(self): - """Get Paused PVs Report.""" - url = self._create_url(method='getPausedPVsReport') - resp = self._make_request(url, return_json=True) + resp = self.make_request(url) return None if not resp else resp - def getRecentlyModifiedPVs(self, limit=None, epoch_time=True): + def get_recently_modified_pvs(self, max_num_pvs=None, epoch_time=True): """Get list of PVs with recently modified PVTypeInfo. Currently version of the epics archiver appliance returns pvname list from oldest to newest modified timestamps. + + Args: + max_num_pvs (int): Maximum number of PVs to return. 
+ epoch_time (bool): Convert timestamps to epoch. + + Returns: + list: List of dictionary with PVs details. """ - method = 'getRecentlyModifiedPVs' - # get data - if limit is not None: - method += f'?limit={str(limit)}' - url = self._create_url(method=method) - resp = self._make_request(url, return_json=True) + resp = self.get_report( + self, + report_name=self.ReportTypes.RecentlyModifiedPVs, + max_num_pvs=max_num_pvs, + ) # convert to epoch, if the case if resp and epoch_time: for item in resp: - modtime = item['modificationTime'][ - :-7 - ] # remove ISO8601 offset + modtime = item['modificationTime'][:-7] # rm. ISO8601 offset epoch_time = _Time.conv_to_epoch(modtime, '%b/%d/%Y %H:%M:%S') item['modificationTime'] = epoch_time return None if not resp else resp - def pausePVs(self, pvnames): - """Pause PVs.""" + # ------------- Management of PVs methods -------------- + + def delete_pvs(self, pvnames, delete_data=False): + """Delete PVs. + + Call method `deletePV` of the Archiver Appliance, which deletes PVs. + + This method requires that self.login() is called first. + + Args: + pvnames (str|list|tuple): PVs to delete. + delete_data (bool): Delete data associated with the PVs. + + Returns: + response (list): Response of the request for each PV. + """ + if not isinstance(pvnames, (list, tuple)): + pvnames = (pvnames,) + + delete_data = 'true' if delete_data else 'false' + ret = [] + for pvname in pvnames: + url = self._create_url( + method='deletePV', pv=pvname, deleteData=delete_data + ) + ret.append(self.make_request(url, need_login=True)) + return ret + + def pause_pvs(self, pvnames): + """Pause PVs. + + Call method `pauseArchivingPV` of the Archiver Appliance, which pauses + archiving for a PV. + + This method requires that self.login() is called first. + + Args: + pvnames (list|tuple): List of PVs to pause. + + Returns: + response (list): Response of the request for each PV. 
+ """ if not isinstance(pvnames, (list, tuple)): pvnames = (pvnames,) + ret = [] for pvname in pvnames: url = self._create_url(method='pauseArchivingPV', pv=pvname) - self._make_request(url, need_login=True) + ret.append(self.make_request(url, need_login=True)) + return ret + + def rename_pv(self, oldname, newname): + """Rename PVs. + + Call method `renamePV` of the Archiver Appliance, which renames a PV. + + This method requires that self.login() is called first. - def renamePV(self, oldname, newname): - """Rename PVs.""" + Args: + oldname (str): Old PV name. + newname (str): New PV name. + + Returns: + response (dict|None): Response of the request. + """ url = self._create_url(method='renamePV', pv=oldname, newname=newname) - return self._make_request(url, need_login=True) + return self.make_request(url, need_login=True) - def resumePVs(self, pvnames): - """Resume PVs.""" + def resume_pvs(self, pvnames): + """Resume PVs. + + Call method `resumeArchivingPV` of the Archiver Appliance, which + resumes archiving for a PV. + + This method requires that self.login() is called first. + + Args: + pvnames (list|tuple): List of PVs to resume. + + Returns: + response (list): Response of the request for each PV. + """ if not isinstance(pvnames, (list, tuple)): pvnames = (pvnames,) + ret = [] for pvname in pvnames: url = self._create_url(method='resumeArchivingPV', pv=pvname) - self._make_request(url, need_login=True) + ret.append(self.make_request(url, need_login=True)) + return ret + + # ------------- methods related to get_data -------------- - def getData( + def get_data( self, - pvname, + pvnames, timestamp_start, timestamp_stop, - process_type='', - interval=None, - stddev=None, - get_request_url=False, + query_split_interval=None, + proc_type='', + proc_type_param1=None, + proc_type_param2=3.0, ): """Get archiver data. - pvname -- name of pv. 
- timestamp_start -- timestamp of interval start - Example: '2019-05-23T13:32:27.570Z' - timestamp_stop -- timestamp of interval stop - Example: '2019-05-23T13:32:27.570Z' - process_type -- data processing type to use. Can be: - '', 'mean', 'median', 'std', 'variance', - 'popvariance', 'kurtosis', 'skewness' - 'mini', 'maxi', 'jitter', 'count', 'ncount', - 'firstSample', 'lastSample', 'firstFill', 'lastFill', - 'nth', 'ignoreflyers' or 'flyers' - interval -- interval of the bin of data, in seconds - stddev -- number of standard deviations. - argument used in processing 'ignoreflyers' and 'flyers'. + Args: + pvnames (str|list|tuple): names of the PVs. + timestamp_start (str|int|Time|list|tuple): start time for query. + If it is a list or tuple, all PVs will be queried for each of + the time intervals. In this case, it must have the same length + as `timestamp_stop`. + timestamp_stop (str|int|Time|list|tuple): stop time for query. + If it is a list or tuple, all PVs will be queried for each of + the time intervals. In this case, it must have the same length + as `timestamp_start`. + query_split_interval (int): overwrites `self.query_split_interval`. + Defaults to `self.query_split_interval`. Maximum interval for + queries. If + `timestamp_stop - timestamp_start > query_split_interval`, + it will be split into parallel queries. If query_split_interval<=0, + no splitting will be done. + proc_type (str): data processing type to use for query. Defaults to + ''. For details on each operator, please, refer to the section + Processing of data of the following page: + https://epicsarchiver.readthedocs.io/en/latest/user/userguide.html + + The options implemented here are: + + The options below do not take any aditional parameter: + '' --> No processing, raw data is returned. + 'ncount' --> total number of updates in the whole interval. + + All types of processing below, require an aditional parameter, + controlled by the input `proc_type_param1`. 
Then the + refered statistics will be performed within this interval: + 'mean' + 'median' + 'std' + 'variance' + 'popvariance' --> population variance. + 'kurtosis' + 'skewness' + 'mini' --> same as min, which is also accepted by the archiver. + 'maxi' --> same as max, which is also accepted by the archiver. + 'jitter' --> std / mean for each bin. + 'count' --> number of updates in each bin. + 'firstSample' + 'lastSample' + 'firstFill' --> see url for difference to `'firstSample'`. + 'lastFill' --> see url for difference to `'lastSample'`. + 'linear' --> not sure, look at the archiver docs. + 'loess' --> not sure, look at the archiver docs. + + The processing below also use an aditional parameter, but its + meaning is different from the statistics above: + 'optimized' --> the parameter means the total number of points + to be returned, instead of the time interval. + 'optimLastSample' --> close to 'opimized'. See docs for diff. + 'nth' --> return every nth sample. + 'deadBand' --> similar to ADEL. Only return when values change + by a certain amount. + + For both statistics below a second parameter is needed to configure + acquisition, controlled by `proc_type_param2`. This + parameter controls the number of standard deviations to consider + in the filtering bellow. The default of this parameter is 3.0: + 'ignoreflyers' --> whether to ignore outliers + 'flyers' --> only return outliers + + proc_type_param1 (int): First parameter for data processing. See + `proc_type` for more details. + proc_type_param2 (int): Second parameter for data processing. See + `proc_type` for more details. + + Returns: + dict: a dictionary with PV names as keys and data as values. 
+ """ - if isinstance(pvname, str): - pvname = [pvname] - if isinstance(timestamp_start, str): + if isinstance(pvnames, str): + pvnames = [pvnames] + + urls, pvn2idcs = self.get_request_url_for_get_data( + pvnames, + timestamp_start, + timestamp_stop, + query_split_interval=query_split_interval, + proc_type=proc_type, + proc_type_param1=proc_type_param1, + proc_type_param2=proc_type_param2, + return_pvn2idcs_dict=True, + ) + urls = [urls] if isinstance(urls, str) else urls + + resps = self.make_request(urls) + if not resps: + return None + + return self.process_resquest_of_get_data(pvnames, resps, pvn2idcs) + + def get_request_url_for_get_data( # noqa: C901 + self, + pvnames, + timestamp_start, + timestamp_stop, + query_split_interval=None, + proc_type=None, + proc_type_param1=None, + proc_type_param2=None, + return_pvn2idcs_dict=False, + ): + """Get url for data request in `get_data` function. + + Args: + pvnames (str|list|tuple): names of the PVs. + timestamp_start (str|int|Time|list|tuple): start time for query. + If it is a list or tuple, all PVs will be queried for each of + the time intervals. In this case, it must have the same length + as `timestamp_stop`. + timestamp_stop (str|int|Time|list|tuple): stop time for query. + If it is a list or tuple, all PVs will be queried for each of + the time intervals. In this case, it must have the same length + as `timestamp_start`. + query_split_interval (int): overwrites `self.query_split_interval`. + Defaults to `self.query_split_interval`. Maximum interval for + queries. If + `timestamp_stop - timestamp_start > query_split_interval`, + it will be split into parallel queries. If query_split_interval<=0, + no splitting will be done. + proc_type (str): data processing type to use for query. Defaults to + ''. 
For details on each operator, please, refer to the section + Processing of data of the following page: + https://epicsarchiver.readthedocs.io/en/latest/user/userguide.html + + The options implemented here are: + + The options below do not take any aditional parameter: + '' --> No processing, raw data is returned. + 'ncount' --> total number of updates in the whole interval. + + All types of processing below, require an aditional parameter, + controlled by the input `proc_type_param1`. Then the + refered statistics will be performed within this interval: + 'mean' + 'median' + 'std' + 'variance' + 'popvariance' --> population variance. + 'kurtosis' + 'skewness' + 'mini' --> same as min, which is also accepted by the archiver. + 'maxi' --> same as max, which is also accepted by the archiver. + 'jitter' --> std / mean for each bin. + 'count' --> number of updates in each bin. + 'firstSample' + 'lastSample' + 'firstFill' --> see url for difference to `'firstSample'`. + 'lastFill' --> see url for difference to `'lastSample'`. + 'linear' --> not sure, look at the archiver docs. + 'loess' --> not sure, look at the archiver docs. + + The processing below also use an aditional parameter, but its + meaning is different from the statistics above: + 'optimized' --> the parameter means the total number of points + to be returned, instead of the time interval. + 'optimLastSample' --> close to 'opimized'. See docs for diff. + 'nth' --> return every nth sample. + 'deadBand' --> similar to ADEL. Only return when values change + by a certain amount. + + For both statistics below a second parameter is needed to configure + acquisition, controlled by `proc_type_param2`. This + parameter controls the number of standard deviations to consider + in the filtering bellow. The default of this parameter is 3.0: + 'ignoreflyers' --> whether to ignore outliers + 'flyers' --> only return outliers + + proc_type_param1 (int): First parameter for data processing. See + `proc_type` for more details. 
+ proc_type_param2 (int): Second parameter for data processing. See + `proc_type` for more details. + return_pvn2idcs_dict (bool): whether to return a dictionary with + PV names as keys and indices as values. Defaults to False. + + Returns: + str|list|tuple: url or list of urls. + """ + if isinstance(pvnames, str): + pvnames = [pvnames] + + if not isinstance(timestamp_start, (list, tuple)): timestamp_start = [timestamp_start] - if isinstance(timestamp_stop, str): + if not isinstance(timestamp_stop, (list, tuple)): timestamp_stop = [timestamp_stop] - if not isinstance(timestamp_start, (list, tuple)) or not isinstance( - timestamp_stop, (list, tuple) - ): - raise _exceptions.TypeError( - "'timestampstart' and 'timestamp_stop' arguments must be " - 'timestamp strings or iterable.' - ) - pvname_orig = list(pvname) - if process_type: - process_str = process_type - if interval is not None: - process_str += '_' + str(int(interval)) - if 'flyers' in process_type and stddev is not None: - process_str += '_' + str(int(stddev)) - pvname = [process_str + '(' + pvn + ')' for pvn in pvname] + if len(timestamp_start) != len(timestamp_stop): + raise _exceptions.IndexError( + '`timestamp_start` and `timestamp_stop` must have same length.' + ) - if get_request_url: - tstart = _urllib.parse.quote(timestamp_start[0]) - tstop = _urllib.parse.quote(timestamp_stop[-1]) - url = [ - self._create_url( - method='getData.json', - pv=pvn, - **{'from': tstart, 'to': tstop}, - ) - for pvn in pvname - ] - return url[0] if len(pvname) == 1 else url + inter = self.query_split_interval + if query_split_interval is not None: + inter = query_split_interval + + tstamps_start = [] + tstamps_stop = [] + for tst, tsp in zip(timestamp_start, timestamp_stop): # noqa: B905 + try: + tst = _Time(tst) + tsp = _Time(tsp) + except (TypeError, ValueError) as err: + raise _exceptions.TypeError( + '`timestamp_start` and `timestamp_stop` must be either ' + 'timestamp string, integer timestamp or Time objects. 
' + 'Or an iterable of these objects.' + ) from err + tstarts, tstops = _get_time_intervals( + tst, tsp, inter, return_isoformat=True + ) + if isinstance(tstarts, (list, tuple)): + tstamps_start.extend(tstarts) + tstamps_stop.extend(tstops) + else: + tstamps_start.append(tstarts) + tstamps_stop.append(tstops) + + pvname_orig = list(pvnames) + if proc_type: + process_str = proc_type + if proc_type != 'ncount' and proc_type_param1 is not None: + if 'deadBand' in process_str: + decim = -int(_math.log10(abs(proc_type_param1))) + 1 + process_str += f'_{proc_type_param1:{max(0, decim)}f}' + else: + process_str += f'_{int(proc_type_param1):d}' + if 'flyers' in proc_type and proc_type_param2 is not None: + process_str += f'_{proc_type_param2:.2f}' + pvnames = [process_str + '(' + pvn + ')' for pvn in pvnames] pvn2idcs = dict() all_urls = list() - for i, pvn in enumerate(pvname): + for i, pvn in enumerate(pvnames): urls = [] - for tstart, tstop in zip(timestamp_start, timestamp_stop): + for tst, tsp in zip(tstamps_start, tstamps_stop): # noqa: B905 urls.append( self._create_url( method='getData.json', pv=pvn, **{ - 'from': _urllib.parse.quote(tstart), - 'to': _urllib.parse.quote(tstop), + 'from': _urllib.parse.quote(tst), + 'to': _urllib.parse.quote(tsp), }, ) ) @@ -284,10 +829,23 @@ def getData( end = len(all_urls) pvn2idcs[pvname_orig[i]] = _np.arange(ini, end) - resps = self._make_request(all_urls, return_json=True) - if not resps: - return None + all_urls = all_urls[0] if len(all_urls) == 1 else all_urls + if return_pvn2idcs_dict: + return all_urls, pvn2idcs + return all_urls + + def process_resquest_of_get_data(self, pvnames, resps, pvn2idcs): + """Process result of `self.get_data` request. + + Args: + pvnames (list): list of PV names envolved in request. + resps (dict): output of `self.make_request` called from + `self.get_data`. + pvn2idcs (dict): list of pvnames to indices in `resps`. + Returns: + pvn2resp (dict): dictionary with PVs data. 
+ """ pvn2resp = dict() for pvn, idcs in pvn2idcs.items(): _ts, _vs = _np.array([]), list() @@ -305,14 +863,12 @@ def getData( _st = _np.r_[_st, [v['status'] for v in data]] _sv = _np.r_[_sv, [v['severity'] for v in data]] if not _ts.size: - timestamp, value, status, severity = [None, None, None, None] + timestamp = value = status = severity = None else: _, _tsidx = _np.unique(_ts, return_index=True) - timestamp, status, severity = ( - _ts[_tsidx], - _st[_tsidx], - _sv[_tsidx], - ) + timestamp = _ts[_tsidx] + status = _st[_tsidx] + severity = _sv[_tsidx] value = [_vs[i] for i in _tsidx] pvn2resp[pvn] = dict( @@ -322,27 +878,27 @@ def getData( severity=severity, ) - if len(pvname) == 1: - return pvn2resp[pvname_orig[0]] + if len(pvnames) == 1: + return pvn2resp[pvnames[0]] return pvn2resp - def getPVDetails(self, pvname, get_request_url=False): - """Get PV Details.""" - url = self._create_url(method='getPVDetails', pv=pvname) - if get_request_url: - return url - resp = self._make_request(url, return_json=True) - return None if not resp else resp + # ------------- General purpose methods -------------- - def switch_to_online_data(self): - """.""" - self.server_url = _envars.SRVURL_ARCHIVER - self.session = None + def make_request(self, url, need_login=False): + """Make request. - def switch_to_offline_data(self): - """.""" - self.server_url = _envars.SRVURL_ARCHIVER_OFFLINE_DATA - self.session = None + Args: + url (str|list|tuple): url or list of urls to request. + need_login (bool): whether request requires login. + + Returns: + dict: dictionary with response. + """ + self._request_url = url + _log.debug('Number of urls: %d', len(url)) + + coro = self._handle_request_async(url, need_login=need_login) + return self._run_sync_coro(coro) @staticmethod def gen_archviewer_url_link( @@ -352,7 +908,7 @@ def gen_archviewer_url_link( time_ref=None, pvoptnrpts=None, pvcolors=None, - pvusediff=False + pvusediff=False, ): """Generate a Archiver Viewer URL for the given PVs. 
@@ -396,7 +952,8 @@ def gen_archviewer_url_link( # Thanks to Rafael Lyra for the basis of this implementation! archiver_viewer_url = _envars.SRVURL_ARCHIVER_VIEWER + '/?pvConfig=' args = ClientArchiver._process_url_link_args( - pvnames, pvoptnrpts, pvcolors, pvusediff) + pvnames, pvoptnrpts, pvcolors, pvusediff + ) pvoptnrpts, pvcolors, pvusediff = args pv_search = '' for idx in range(len(pvnames)): @@ -455,124 +1012,126 @@ def _process_url_link_args(pvnames, pvoptnrpts, pvcolors, pvusediff): pvusediff = [pvusediff] * len(pvnames) return pvoptnrpts, pvcolors, pvusediff - def _make_request(self, url, need_login=False, return_json=False): - """Make request.""" - self._request_url = url - response = self._run_async_event_loop( - self._handle_request, - url, - return_json=return_json, - need_login=need_login, + def _loop_alive(self): + """Check if thread is alive and loop is running.""" + return ( + self._thread is not None + and self._thread.is_alive() + and self._loop is not None + and self._loop.is_running() ) - return response + + def _cancel_all_tasks(self): + """Helper to cancel tasks (must be called from the loop's thread).""" + if hasattr(_asyncio, 'all_tasks'): + all_tasks = _asyncio.all_tasks(loop=self._loop) + else: # python 3.6 + all_tasks = _asyncio.Task.all_tasks(loop=self._loop) + + for task in all_tasks: + task.cancel() + + def _run_event_loop(self): + _asyncio.set_event_loop(self._loop) + try: + self._loop.run_forever() + finally: + self._loop.close() def _create_url(self, method, **kwargs): """Create URL.""" - url = self._url + url = self._url + self.ENDPOINT if method.startswith('getData.json'): - url += '/retrieval/data' - else: - url += self.ENDPOINT + url = self._url + '/retrieval/data' + url += '/' + method if kwargs: url += '?' 
- url += '&'.join(['{}={}'.format(k, v) for k, v in kwargs.items()]) + url += '&'.join([f'{k}={v}' for k, v in kwargs.items()]) return url - # ---------- async methods ---------- + def _run_sync_coro(self, coro): + """Run an async coroutine synchronously, compatible with Jupyter.""" + if not self._thread.is_alive(): + raise RuntimeError('Library is shut down') + future = _asyncio.run_coroutine_threadsafe(coro, self._loop) + return future.result() - def _run_async_event_loop(self, *args, **kwargs): - # NOTE: Run the asyncio commands in a separated Thread to isolate - # their EventLoop from the external environment (important for class - # to work within jupyter notebook environment). - _thread = _Thread( - target=self._thread_run_async_event_loop, - daemon=True, - args=args, - kwargs=kwargs, - ) - _thread.start() - _thread.join() - return self._ret - - def _thread_run_async_event_loop(self, func, *args, **kwargs): - """Get event loop.""" - close = False - try: - loop = _asyncio.get_event_loop() - except RuntimeError as error: - if 'no current event loop' in str(error): - loop = _asyncio.new_event_loop() - _asyncio.set_event_loop(loop) - close = True - else: - raise error - try: - self._ret = loop.run_until_complete(func(*args, **kwargs)) - except _asyncio.TimeoutError: - raise _exceptions.TimeoutError - - if close: - loop.close() + # ---------- async methods ---------- - async def _handle_request(self, url, return_json=False, need_login=False): + async def _handle_request_async(self, url, need_login=False): """Handle request.""" + self._semaphore = _asyncio.Semaphore(self._query_max_concurrency) if self.session is not None: - response = await self._get_request_response( - url, self.session, return_json - ) + response = await self._get_request_response(url, self.session) elif need_login: raise _exceptions.AuthenticationError('You need to login first.') else: - async with _ClientSession() as sess: - response = await self._get_request_response( - url, sess, return_json 
- ) + # NOTE: we need to define a connector with ssl=False so that url + # with IP address can be requested without SSL errors. + async with _ClSession(connector=_TCPConn(ssl=False)) as sess: + response = await self._get_request_response(url, sess) + self._semaphore = None return response - async def _get_request_response(self, url, session, return_json): + async def _get_request_response(self, url, session): """Get request response.""" + single = isinstance(url, str) + url = [url] if single else url try: - if isinstance(url, list): - response = await _asyncio.gather(*[ - session.get(u, ssl=False, timeout=self._timeout) - for u in url - ]) - if any([not r.ok for r in response]): + response = await _asyncio.gather(*[ + self._fetch_url(session, u) for u in url + ]) + except _asyncio.TimeoutError as err: + raise _exceptions.TimeoutError( + 'Timeout reached. Try to:\n - increase `query_timeout`;' + '\n - decrease `query_split_interval`;' + '\n - decrease the time interval for the aquisition;' + ) from err + except _aio_exceptions.ClientPayloadError as err: + raise _exceptions.PayloadError( + 'Payload Error. This is probably due to some bug in the ' + 'code or some unexpected response from the server.\n' + 'Please, report this to the developers with the traceback ' + 'and the query url.' 
+ ) from err + + if single: + return response[0] + return response + + async def _fetch_url(self, session, url): + async with self._semaphore: + _log.debug('Fetching URL: %s', url) + async with session.get(url, timeout=self._query_timeout) as resp: + if resp.status != 200: return None - if return_json: - jsons = list() - for res in response: - try: - data = await res.json() - jsons.append(data) - except ValueError: - _log.error(f'Error with URL {res.url}') - jsons.append(None) - response = jsons - else: - response = await session.get( - url, ssl=False, timeout=self._timeout - ) - if not response.ok: + try: + return await resp.json() + except _aio_exceptions.ContentTypeError: + # for cases where response returns html (self.connected). + return await resp.text() + except ValueError: + _log.error('Error with URL %s', resp.url) return None - if return_json: - try: - response = await response.json() - except ValueError: - _log.error(f'Error with URL {response.url}') - response = None - except _asyncio.TimeoutError as err_msg: - raise _exceptions.TimeoutError(err_msg) - return response - async def _create_session(self, url, headers, payload, ssl): + async def _create_session(self, url, payload): """Create session and handle login.""" - session = _ClientSession() + # NOTE: we need to define a connector with ssl=False so that url with + # IP address can be requested without SSL errors. This is needed in + # the control room, where the server is accessed through its IP + # address and not a domain name. + headers = { + 'User-Agent': 'Mozilla/5.0', + 'Host': 'cnpem.br', # NOTE: this is required (404 otherwise). 
+ 'content-type': 'application/x-www-form-urlencoded', + } + + session = _ClSession(connector=_TCPConn(ssl=False)) async with session.post( - url, headers=headers, data=payload, ssl=ssl, timeout=self._timeout + url, headers=headers, data=payload, timeout=self._query_timeout ) as response: - content = await response.content.read() + content = await response.read() authenticated = b'authenticated' in content return session, authenticated diff --git a/siriuspy/siriuspy/clientarch/devices.py b/siriuspy/siriuspy/clientarch/devices.py index e01969fc1..ea73b4cf0 100644 --- a/siriuspy/siriuspy/clientarch/devices.py +++ b/siriuspy/siriuspy/clientarch/devices.py @@ -40,7 +40,9 @@ def __init__(self, devname, propty='', connector=None): self._times = None self._values = None super().__init__(pvnames, connector=connector) - self._parallel_query_bin_interval = 3600 + self.query_split_interval = 3600 + self.processing_type = self.ProcessingTypes.Mean + self.processing_type_param1 = 1 @property def devnames(self): @@ -57,12 +59,12 @@ def values(self): """Return retrieved orbit interpolated values.""" return self._values - def update(self, mean_sec=None, parallel=True): + def update(self, query_timeout=None): """Update state by retrieving data.""" - super().update(mean_sec=mean_sec, parallel=parallel) + super().update(query_timeout=query_timeout) # interpolate data - self._times, self._values = self._interpolate_data(mean_sec) + self._times, self._values = self._interpolate_data() # --- private methods --- @@ -71,9 +73,10 @@ def _get_pvnames(self): pvnames = [] return devnames, pvnames - def _interpolate_data(self, mean_sec): + def _interpolate_data(self): # calc mean_sec if not passed nr_pvs = len(self._pvdata) + mean_sec = self.processing_type_param1 if mean_sec is None: mean_sec = sum( map( @@ -84,7 +87,7 @@ def _interpolate_data(self, mean_sec): mean_sec /= nr_pvs # times vector - t0_, t1_ = self.timestamp_start, self.timestamp_stop + t0_, t1_ = self.time_start.timestamp(), 
self.time_stop.timestamp() times = _np.arange(t0_, t1_, mean_sec) # builds orbit matrix using interpolation diff --git a/siriuspy/siriuspy/clientarch/exceptions.py b/siriuspy/siriuspy/clientarch/exceptions.py index 9d5939069..9a7e35957 100644 --- a/siriuspy/siriuspy/clientarch/exceptions.py +++ b/siriuspy/siriuspy/clientarch/exceptions.py @@ -2,6 +2,8 @@ import asyncio as _asyncio +import aiohttp.client_exceptions as _aio_excep + class ClientArchError(Exception): """ClientArch Abstract Exception.""" @@ -15,6 +17,10 @@ class TimeoutError(ClientArchError, _asyncio.TimeoutError): """ClientArch Timeout Exception.""" +class PayloadError(ClientArchError, _aio_excep.ClientPayloadError): + """ClientArch Payload Exception.""" + + class RuntimeError(ClientArchError, RuntimeError): """ClientArch Runtime Exception.""" diff --git a/siriuspy/siriuspy/clientarch/pvarch.py b/siriuspy/siriuspy/clientarch/pvarch.py index ff89062c0..35aada5b8 100644 --- a/siriuspy/siriuspy/clientarch/pvarch.py +++ b/siriuspy/siriuspy/clientarch/pvarch.py @@ -3,38 +3,35 @@ from copy import deepcopy as _dcopy import numpy as _np -from mathphys.functions import load_pickle as _load_pickle, \ +from mathphys.functions import ( + load_pickle as _load_pickle, save_pickle as _save_pickle +) from .. import envars as _envars from . 
import exceptions as _exceptions from .client import ClientArchiver as _ClientArchiver -from .time import get_time_intervals as _get_time_intervals, Time as _Time +from .time import Time as _Time class _Base: - DEF_PARALLEL_QUERY_BIN_INTERVAL = 12 * 60 * 60 # 12h - def __init__(self, connector=None, offline_data=False): self._connector = None - self._offline_data = offline_data - self._time_start = None - self._time_stop = None self.connector = connector - self.connect() + self.connect(offline_data=offline_data) @property def is_archived(self): """Is archived.""" self.connect() - return self.connector.getPVDetails(self.pvname) is not None + return self.connector.get_pv_details(self.pvname) is not None - def connect(self): + def connect(self, offline_data=False): """Connect.""" if self.connector is None: url_off = _envars.SRVURL_ARCHIVER_OFFLINE_DATA url_on = _envars.SRVURL_ARCHIVER - url = url_off if self._offline_data else url_on + url = url_off if offline_data else url_on self._connector = _ClientArchiver(server_url=url) @property @@ -57,18 +54,32 @@ def connector(self, conn): @property def is_offline_data(self): - """.""" - return self._offline_data + """Whether server url points to online or offline data. + + Return None in case the url is not recognized as either online or + offline. + """ + if self._connector.server_url == _envars.SRVURL_ARCHIVER_OFFLINE_DATA: + return True + elif self._connector.server_url == _envars.SRVURL_ARCHIVER: + return False + else: + return None @property - def timeout(self): - """Connection timeout.""" - return self.connector.timeout + def query_timeout(self): + """Request timeout for each query. + + This is a global setting for the connector, so all PVData objects + share it, but we allow it to be set through PVDataSet for convenience. 
+ + """ + return self.connector.query_timeout - @timeout.setter - def timeout(self, value): - """Set connection timeout.""" - self.connector.timeout = float(value) + @query_timeout.setter + def query_timeout(self, value): + """Set request timeout for each query.""" + self.connector.query_timeout = float(value) @property def connected(self): @@ -87,64 +98,6 @@ def switch_to_offline_data(self): if self.connector: self.connector.switch_to_offline_data() - @property - def timestamp_start(self): - """Timestamp start.""" - if not self._time_start: - return None - return self._time_start.timestamp() - - @timestamp_start.setter - def timestamp_start(self, new_timestamp): - if not isinstance(new_timestamp, (float, int)): - raise _exceptions.TypeError( - 'expected argument of type float or int, got ' - + str(type(new_timestamp)) - ) - self._time_start = _Time(timestamp=new_timestamp) - - @property - def time_start(self): - """Time start.""" - return self._time_start - - @time_start.setter - def time_start(self, new_time): - if not isinstance(new_time, _Time): - raise _exceptions.TypeError( - 'expected argument of type Time, got ' + str(type(new_time)) - ) - self._time_start = new_time - - @property - def timestamp_stop(self): - """Timestamp stop.""" - if not self._time_stop: - return None - return self._time_stop.timestamp() - - @timestamp_stop.setter - def timestamp_stop(self, new_timestamp): - if not isinstance(new_timestamp, (float, int)): - raise _exceptions.TypeError( - 'expected argument of type float or int, got ' - + str(type(new_timestamp)) - ) - self._time_stop = _Time(timestamp=new_timestamp) - - @property - def time_stop(self): - """Time stop.""" - return self._time_stop - - @time_stop.setter - def time_stop(self, new_time): - if not isinstance(new_time, _Time): - raise _exceptions.TypeError( - 'expected argument of type Time, got ' + str(type(new_time)) - ) - self._time_stop = new_time - def gen_archviewer_url_link( self, pvnames, @@ -153,7 +106,7 @@ def 
gen_archviewer_url_link( time_ref=None, pvoptnrpts=None, pvcolors=None, - pvusediff=False + pvusediff=False, ): """Generate a Archiver Viewer URL for the given PVs. @@ -191,7 +144,8 @@ def gen_archviewer_url_link( time_ref=time_ref, pvoptnrpts=pvoptnrpts, pvcolors=pvcolors, - pvusediff=pvusediff) + pvusediff=pvusediff, + ) return url @@ -236,24 +190,23 @@ def __init__(self, pvname, connector=None): def request_url(self): """.""" self.connect() - url = self.connector.getPVDetails(self.pvname, get_request_url=True) + url = self.connector.get_pv_details(self.pvname, get_request_url=True) return url - @property - def is_archived(self): + def update(self, query_timeout=None): # noqa: C901 """.""" self.connect() - data = self.connector.getPVDetails(self.pvname) - if not data: - return False - return True - def update(self, timeout=None): - """.""" - self.connect() - if timeout is not None: - self.timeout = timeout - data = self.connector.getPVDetails(self.pvname) + if query_timeout is not None: + query_timeout0 = self.query_timeout + self.query_timeout = query_timeout + + try: + data = self.connector.get_pv_details(self.pvname) + finally: + if query_timeout is not None: + self.query_timeout = query_timeout0 + if not data: return False for datum in data: @@ -274,6 +227,7 @@ def update(self, timeout=None): self.is_paused = value.lower() == 'yes' elif field == 'Is this PV currently connected?': self.is_connected = value.lower() == 'yes' + return True def __str__(self): @@ -306,6 +260,8 @@ def __str__(self): class PVData(_Base): """Archive PV Data.""" + ProcessingTypes = _ClientArchiver.ProcessingTypes + def __init__(self, pvname, connector=None, offline_data=False): """Initialize.""" super().__init__(connector, offline_data=offline_data) @@ -314,9 +270,65 @@ def __init__(self, pvname, connector=None, offline_data=False): self._value = None self._status = None self._severity = None - self._parallel_query_bin_interval = ( - _Base.DEF_PARALLEL_QUERY_BIN_INTERVAL + 
self._time_start = _Time.now() + self._time_stop = self._time_start + self._query_split_interval = self.connector.query_split_interval + self._processing_type = self.ProcessingTypes.None_ + self._processing_type_param1 = None + self._processing_type_param2 = 3.0 # number of sigma + + def __str__(self): + """.""" + stg = '' + stg += 'Connector Properties:\n' + stg += ' {:<30s}: {:d}\n'.format( + 'query_max_concurrency: ', self.query_max_concurrency + ) + stg += ' {:<30s}: {:.1f}\n'.format( + 'query_timeout [s]', self.query_timeout + ) + stg += '\nPV Data Properties:\n' + + tss = self.time_start.get_iso8601() + tsp = self.time_stop.get_iso8601() + stg += ' {:<30s}: {:}\n'.format('pvname', self.pvname) + stg += ' {:<30s}: {:}\n'.format('time_start', tss) + stg += ' {:<30s}: {:}\n'.format('time_stop', tsp) + stg += ' {:<30s}: {:d}\n'.format( + 'query_split_interval [s]', self.query_split_interval ) + prty = self.processing_type + pr1 = self.processing_type_param1 + stg += ' {:<30s}: {:}\n'.format( + 'processing_type', prty if prty else "''" + ) + if prty == self.ProcessingTypes.SelectByChange: + pr1 = 'None' if pr1 is None else f'{pr1:.2g}' + stg += ' {:<30s}: {:}\n'.format( + 'processing_type_param1 [val. 
units]', pr1 + ) + elif prty != self.ProcessingTypes.None_: + pr1 = 'None' if pr1 is None else f'{pr1:d}' + stg += ' {:<30s}: {:}\n'.format( + 'processing_type_param1 [s]', pr1 + ) + if prty in ( + self.ProcessingTypes.Outliers, + self.ProcessingTypes.IgnoreOutliers, + ): + stg += ' {:<30s}: {:.2g}\n'.format( + 'processing_type_param2 [std/mean]', + self.processing_type_param2, + ) + + stg += ' {:<30s}'.format('Data Length: ') + if self.timestamp is not None: + stg += '{:d}\n'.format(len(self.timestamp)) + else: + stg += 'Not loaded yet.\n' + return stg + + # -------- PV data properties -------- @property def pvname(self): @@ -325,29 +337,17 @@ def pvname(self): @property def request_url(self): - """Request url.""" + """Get request url.""" self.connect() - url = self.connector.getData( - self.pvname, - self.time_start.get_iso8601(), - self.time_stop.get_iso8601(), - get_request_url=True, + return self.connector.get_request_url_for_get_data( + self._pvname, + self.time_start, + self.time_stop, + query_split_interval=self.query_split_interval, + proc_type=self.processing_type, + proc_type_param1=self.processing_type_param1, + proc_type_param2=self.processing_type_param2, ) - return url - - @property - def parallel_query_bin_interval(self): - """Parallel query bin interval.""" - return self._parallel_query_bin_interval - - @parallel_query_bin_interval.setter - def parallel_query_bin_interval(self, new_intvl): - if not isinstance(new_intvl, (float, int)): - raise _exceptions.TypeError( - 'expected argument of type float or int, got ' - + str(type(new_intvl)) - ) - self._parallel_query_bin_interval = new_intvl @property def timestamp(self): @@ -369,35 +369,193 @@ def severity(self): """Severity data.""" return self._severity - def update(self, mean_sec=None, parallel=True, timeout=None): + # ------- PV data acquisition and processing properties -------- + + @property + def query_split_interval(self): + """Queries larger than this interval will be split. 
+ + If set to 0 or None, no splitting will be done. + """ + return self._query_split_interval + + @query_split_interval.setter + def query_split_interval(self, new_intvl): + if new_intvl is None: + new_intvl = 0 + if not isinstance(new_intvl, (float, int)): + raise _exceptions.TypeError( + 'expected argument of type float or int, got ' + + str(type(new_intvl)) + ) + self._query_split_interval = max(int(new_intvl), 0) + + @property + def query_max_concurrency(self): + """Query max concurrency. + + This is a global setting for the connector, so all PVData objects + share it, but we allow it to be set through PVDataSet for convenience. + + """ + return self.connector.query_max_concurrency + + @query_max_concurrency.setter + def query_max_concurrency(self, new_intvl): + self.connector.query_max_concurrency = new_intvl + + @property + def time_start(self): + """Time start. + + Return siriuspy.clientarch.time.Time object. + """ + return self._time_start + + @time_start.setter + def time_start(self, new_time): + """Accept any value that can be converted to a Time object.""" + self._time_start = _Time(new_time) + + @property + def time_stop(self): + """Time stop. + + Return siriuspy.clientarch.time.Time object. + """ + return self._time_stop + + @time_stop.setter + def time_stop(self, new_time): + """Accept any value that can be converted to a Time object.""" + self._time_stop = _Time(new_time) + + @property + def processing_type(self): + """Data processing type to use for query. + + For details on each operator, please, refer to the section + Processing of data of the following page: + https://epicsarchiver.readthedocs.io/en/latest/user/userguide.html + + The options implemented here are: + + The options below do not take any aditional parameter: + '' --> No processing, raw data is returned. + 'ncount' --> total number of updates in the whole interval. + + All types of processing below, require an aditional parameter, + controlled by the input `processing_type_param1`. 
Then the + referred statistics will be performed within this interval: + 'mean' + 'median' + 'std' + 'variance' + 'popvariance' --> population variance. + 'kurtosis' + 'skewness' + 'mini' --> same as min, which is also accepted by the archiver. + 'maxi' --> same as max, which is also accepted by the archiver. + 'jitter' --> std / mean for each bin. + 'count' --> number of updates in each bin. + 'firstSample' + 'lastSample' + 'firstFill' --> see url for difference to `'firstSample'`. + 'lastFill' --> see url for difference to `'lastSample'`. + 'linear' --> not sure, look at the archiver docs. + 'loess' --> not sure, look at the archiver docs. + + The processing below also uses an additional parameter, but its + meaning is different from the statistics above: + 'optimized' --> the parameter means the total number of points + to be returned, instead of the time interval. + 'optimLastSample' --> close to 'optimized'. See docs for diff. + 'nth' --> return every nth sample. + 'deadBand' --> similar to ADEL. Only return when values change + by a certain amount. + + For both statistics below a second parameter is needed to configure + acquisition, controlled by `processing_type_param2`. This + parameter controls the number of standard deviations to consider + in the filtering below. The default of this parameter is 3.0: + 'ignoreflyers' --> whether to ignore outliers + 'flyers' --> only return outliers + """ + return self._processing_type + + @processing_type.setter + def processing_type(self, new_type): + if not isinstance(new_type, str): + raise _exceptions.TypeError( + 'expected argument of type str, got ' + str(type(new_type)) + ) + elif new_type not in self.ProcessingTypes: + raise _exceptions.ValueError( + f'invalid processing type: {new_type}. Must be one of: ' + '`self.ProcessingTypes`.' + ) + self._processing_type = new_type + + @property + def processing_type_param1(self): + """Processing type param1.
+ + For most processing types, this is a time interval in seconds, but for + some types, it has a different meaning. Please, refer to the + documentation of `processing_type` for details. + """ + return self._processing_type_param1 + + @processing_type_param1.setter + def processing_type_param1(self, new_param): + if not isinstance(new_param, (int, float)): + raise _exceptions.TypeError( + 'expected argument of type int or float, got ' + + str(type(new_param)) + ) + self._processing_type_param1 = new_param + + @property + def processing_type_param2(self): + """Processing type param2. + + See docs for `processing_type`. For most processing types, this is not + used, but for some types, it controls the number of standard + deviations to consider in outlier filtering, with a default value of 3. + """ + return self._processing_type_param2 + + @processing_type_param2.setter + def processing_type_param2(self, new_param): + if not isinstance(new_param, (int, float)): + raise _exceptions.TypeError( + 'expected argument of type int or float, got ' + + str(type(new_param)) + ) + self._processing_type_param2 = new_param + + def update(self, query_timeout=None): """Update.""" self.connect() - if timeout is not None: - self.timeout = timeout - if None in (self.timestamp_start, self.timestamp_stop): - print('Start and stop timestamps not defined! 
Aborting.') - return - process_type = 'mean' if mean_sec is not None else '' - interval = self.parallel_query_bin_interval - if parallel: - timestamp_start, timestamp_stop = _get_time_intervals( + if query_timeout is not None: + query_timeout0 = self.query_timeout + self.query_timeout = query_timeout + + try: + data = self.connector.get_data( + self._pvname, self.time_start, self.time_stop, - interval, - return_isoformat=True, + query_split_interval=self.query_split_interval, + proc_type=self.processing_type, + proc_type_param1=self.processing_type_param1, + proc_type_param2=self.processing_type_param2, ) - else: - timestamp_start = self.time_start.get_iso8601() - timestamp_stop = self.time_stop.get_iso8601() + finally: + if query_timeout is not None: + self.query_timeout = query_timeout0 - data = self.connector.getData( - self._pvname, - timestamp_start, - timestamp_stop, - process_type=process_type, - interval=mean_sec, - ) if not data: return self.set_data(**data) @@ -410,7 +568,7 @@ def gen_archviewer_url_link( time_ref=None, pvoptnrpts=None, pvcolors=None, - pvusediff=False + pvusediff=False, ): """Generate a Archiver Viewer URL for the given PVs. @@ -452,7 +610,8 @@ def gen_archviewer_url_link( time_ref=time_ref, pvoptnrpts=pvoptnrpts, pvcolors=pvcolors, - pvusediff=pvusediff) + pvusediff=pvusediff, + ) return url def set_data(self, timestamp, value, status, severity): @@ -473,6 +632,12 @@ def to_dict(self): pvname (str): the name of the PV. timestamp_start (time.time): start of acquisition time. timestamp_stop (time.time): end of acquisition time. + query_split_interval (int): interval to split queries. + query_max_concurrency (int): max concurrency for queries. + query_timeout (float): timeout for queries. + processing_type (str): type of processing for queries. + processing_type_param1 (float or int): param 1 for processing. + processing_type_param2 (float or int): param 2 for processing. 
data (dict): dictionary with archiver data with fields: value (numpy.ndarray): values of the PV. timestamp (numpy.ndarray): timestamps of the PV. @@ -483,8 +648,14 @@ def to_dict(self): return dict( server_url=self.connector.server_url, pvname=self.pvname, - timestamp_start=self.timestamp_start, - timestamp_stop=self.timestamp_stop, + timestamp_start=self.time_start.timestamp(), + timestamp_stop=self.time_stop.timestamp(), + query_split_interval=self.query_split_interval, + query_max_concurrency=self.query_max_concurrency, + query_timeout=self.query_timeout, + processing_type=self.processing_type, + processing_type_param1=self.processing_type_param1, + processing_type_param2=self.processing_type_param2, data=dict( timestamp=self.timestamp, value=self.value, @@ -506,8 +677,14 @@ def from_dict(infos): """ pvdata = PVData(infos['pvname'], connector=infos['server_url']) - pvdata.timestamp_start = infos['timestamp_start'] - pvdata.timestamp_stop = infos['timestamp_stop'] + pvdata.time_start = infos['timestamp_start'] + pvdata.time_stop = infos['timestamp_stop'] + pvdata.query_split_interval = infos['query_split_interval'] + pvdata.query_max_concurrency = infos['query_max_concurrency'] + pvdata.query_timeout = infos['query_timeout'] + pvdata.processing_type = infos['processing_type'] + pvdata.processing_type_param1 = infos['processing_type_param1'] + pvdata.processing_type_param2 = infos['processing_type_param2'] pvdata.set_data(**infos['data']) return pvdata @@ -541,15 +718,73 @@ def from_pickle(fname): class PVDataSet(_Base): """A set of PVData objects.""" + ProcessingTypes = _ClientArchiver.ProcessingTypes + def __init__(self, pvnames, connector=None, offline_data=False): """Initialize.""" super().__init__(connector, offline_data=offline_data) self._pvnames = pvnames - self._parallel_query_bin_interval = ( - _Base.DEF_PARALLEL_QUERY_BIN_INTERVAL - ) self._pvdata = self._init_pvdatas(pvnames, self.connector) + def __str__(self): + """.""" + stg = '' + stg += 'Connector 
Properties:\n' + stg += ' {:<30s}: {:d}\n'.format( + 'query_max_concurrency', self.query_max_concurrency + ) + stg += ' {:<30s}: {:.1f}\n'.format( + 'query_timeout [s]', self.query_timeout + ) + stg += '\nPV Data Properties:\n' + tmpl = ' {:<30s} {:^30s} {:^30s} {:^15s} ' + tmpl += '{:^12s} {:^10s} {:^10s} {:^10s}\n' + stg += tmpl.format( + 'PV Name', + 'Time Start', + 'Time Stop', + 'Bin Interval', + 'Proc. Type', + 'Param1', + 'Param2', + 'Data Length', + ) + for pvn, pvd in self._pvdata.items(): + prty = pvd.processing_type + prty = prty if prty else "''" + + pr1 = pvd.processing_type_param1 + pr1s = 'None' if pr1 is None else f'{pr1:d}' + if prty != self.ProcessingTypes.SelectByChange: + pr1s = 'None' if pr1 is None else f'{pr1:.1g}' + elif prty != self.ProcessingTypes.None_: + pr1s = 'N/A' + + pr2 = 'N/A' + if prty in ( + self.ProcessingTypes.Outliers, + self.ProcessingTypes.IgnoreOutliers, + ): + pr2 = f'{pvd.processing_type_param2:.1g}' + + dlen = 'Not Loaded' + if pvd.timestamp is not None: + stg += f'{len(pvd.timestamp):d}' + + stg += tmpl.format( + pvn, + pvd.time_start.get_iso8601(), + pvd.time_stop.get_iso8601(), + f'{pvd.query_split_interval:d}', + prty, + pr1s, + pr2, + dlen, + ) + return stg + + # -------- Properties to control data acquisition and processing -------- + @property def pvnames(self): """PV names.""" @@ -560,12 +795,199 @@ def pvnames(self, new_pvnames): self._pvnames = new_pvnames self._pvdata = self._init_pvdatas(new_pvnames, self.connector) + @property + def query_split_interval(self): + """Queries larger than this interval will be split. + + If set to 0 or None, no splitting will be done. 
+ """ + qry = [self._pvdata[pvn].query_split_interval for pvn in self._pvnames] + if len(set(qry)) == 1: + return qry[0] + return qry + + @query_split_interval.setter + def query_split_interval(self, value): + if value is None: + value = 0 + if isinstance(value, (int, float)): + value = len(self._pvnames) * [int(value)] + if len(value) != len(self._pvnames): + raise ValueError('value must have the same length as pvnames') + + for pvn, val in zip(self._pvnames, value): # noqa: B905 + self._pvdata[pvn].query_split_interval = val + + @property + def query_max_concurrency(self): + """Query max concurrency. + + This is a global setting for the connector, so all PVData objects + share it, but we allow it to be set through PVDataSet for convenience. + + """ + return self.connector.query_max_concurrency + + @query_max_concurrency.setter + def query_max_concurrency(self, new_intvl): + self.connector.query_max_concurrency = new_intvl + + @property + def time_start(self): + """Start time.""" + tstt = [self._pvdata[pvn].time_start for pvn in self._pvnames] + if len(set(tstt)) == 1: + return tstt[0] + return tstt + + @time_start.setter + def time_start(self, value): + """Accept any value that can be converted to a Time object.""" + try: + value = _Time(value) + value = [value] * len(self._pvnames) + except Exception: # noqa: S110 + pass + if len(value) != len(self._pvnames): + raise ValueError('value must have the same length as pvnames') + + for pvn, val in zip(self._pvnames, value): # noqa: B905 + self._pvdata[pvn].time_start = val + + @property + def time_stop(self): + """Stop time.""" + tstt = [self._pvdata[pvn].time_stop for pvn in self._pvnames] + if len(set(tstt)) == 1: + return tstt[0] + return tstt + + @time_stop.setter + def time_stop(self, value): + """Accept any value that can be converted to a Time object.""" + try: + value = _Time(value) + value = [value] * len(self._pvnames) + except Exception: # noqa: S110 + pass + if len(value) != len(self._pvnames): + raise 
ValueError('value must have the same length as pvnames') + + for pvn, val in zip(self._pvnames, value): # noqa: B905 + self._pvdata[pvn].time_stop = val + + @property + def processing_type(self): + """Data processing type to use for query. + + For details on each operator, please, refer to the section + Processing of data of the following page: + https://epicsarchiver.readthedocs.io/en/latest/user/userguide.html + + The options implemented here are: + + The options below do not take any aditional parameter: + '' --> No processing, raw data is returned. + 'ncount' --> total number of updates in the whole interval. + + All types of processing below, require an aditional parameter, + controlled by the input `proc_type_param1`. Then the + refered statistics will be performed within this interval: + 'mean' + 'median' + 'std' + 'variance' + 'popvariance' --> population variance. + 'kurtosis' + 'skewness' + 'mini' --> same as min, which is also accepted by the archiver. + 'maxi' --> same as max, which is also accepted by the archiver. + 'jitter' --> std / mean for each bin. + 'count' --> number of updates in each bin. + 'firstSample' + 'lastSample' + 'firstFill' --> see url for difference to `'firstSample'`. + 'lastFill' --> see url for difference to `'lastSample'`. + 'linear' --> not sure, look at the archiver docs. + 'loess' --> not sure, look at the archiver docs. + + The processing below also use an aditional parameter, but its + meaning is different from the statistics above: + 'optimized' --> the parameter means the total number of points + to be returned, instead of the time interval. + 'optimLastSample' --> close to 'opimized'. See docs for diff. + 'nth' --> return every nth sample. + 'deadBand' --> similar to ADEL. Only return when values change + by a certain amount. + + For both statistics below a second parameter is needed to configure + acquisition, controlled by `proc_type_param2`. 
This + parameter controls the number of standard deviations to consider + in the filtering below. The default of this parameter is 3.0: + 'ignoreflyers' --> whether to ignore outliers + 'flyers' --> only return outliers + """ + proc = [self._pvdata[pvn].processing_type for pvn in self._pvnames] + if len(set(proc)) == 1: + return proc[0] + return proc + + @processing_type.setter + def processing_type(self, value): + if isinstance(value, str): + value = len(self._pvnames) * [value] + if len(value) != len(self._pvnames): + raise ValueError('value must have the same length as pvnames') + + for pvn, val in zip(self._pvnames, value): # noqa: B905 + self._pvdata[pvn].processing_type = val + + @property + def processing_type_param1(self): + """Processing type param1.""" + param = [ + self._pvdata[pvn].processing_type_param1 for pvn in self._pvnames + ] + if len(set(param)) == 1: + return param[0] + return param + + @processing_type_param1.setter + def processing_type_param1(self, value): + if value is None or isinstance(value, (int, float)): + value = len(self._pvnames) * [value] + if len(value) != len(self._pvnames): + raise ValueError('value must have the same length as pvnames') + + for pvn, val in zip(self._pvnames, value): # noqa: B905 + self._pvdata[pvn].processing_type_param1 = val + + @property + def processing_type_param2(self): + """Processing type param2.""" + param = [ + self._pvdata[pvn].processing_type_param2 for pvn in self._pvnames + ] + if len(set(param)) == 1: + return param[0] + return param + + @processing_type_param2.setter + def processing_type_param2(self, value): + if value is None or isinstance(value, (int, float)): + value = len(self._pvnames) * [value] + if len(value) != len(self._pvnames): + raise ValueError('value must have the same length as pvnames') + + for pvn, val in zip(self._pvnames, value): # noqa: B905 + self._pvdata[pvn].processing_type_param2 = val + @property + def is_archived(self): + """Is archived.""" + self.connect() + for pvn in 
self._pvnames: - if self.connector.getPVDetails(pvn) is None: + if self.connector.get_pv_details(pvn) is None: return False return True @@ -575,7 +997,7 @@ def not_archived(self): self.connect() not_archived = list() for pvn in self._pvnames: - if self.connector.getPVDetails(pvn) is None: + if self.connector.get_pv_details(pvn) is None: not_archived.append(pvn) return not_archived @@ -585,52 +1007,45 @@ def archived(self): archived = set(self._pvnames) - set(self.not_archived) return list(archived) - @property - def parallel_query_bin_interval(self): - """Parallel query bin interval.""" - return self._parallel_query_bin_interval - - @parallel_query_bin_interval.setter - def parallel_query_bin_interval(self, new_intvl): - if not isinstance(new_intvl, (float, int)): - raise _exceptions.TypeError( - 'expected argument of type float or int, got ' - + str(type(new_intvl)) - ) - self._parallel_query_bin_interval = new_intvl - for pvname in self._pvnames: - self._pvdata[ - pvname - ].parallel_query_bin_interval = self._parallel_query_bin_interval - - def update(self, mean_sec=None, parallel=True, timeout=None): + def update(self, query_timeout=None): """Update.""" self.connect() - if timeout is not None: - self.timeout = timeout - if None in (self.timestamp_start, self.timestamp_stop): - print('Start and stop timestamps not defined! 
Aborting.') - return - process_type = 'mean' if mean_sec is not None else '' - interval = self.parallel_query_bin_interval - if parallel: - timestamp_start, timestamp_stop = _get_time_intervals( - self.time_start, - self.time_stop, - interval, - return_isoformat=True, + if query_timeout is not None: + query_timeout0 = self.query_timeout + self.query_timeout = query_timeout + + all_urls = [] + pvn2idcs = dict() + for pvn in self._pvnames: + pvd = self._pvdata[pvn] + urls = self.connector.get_request_url_for_get_data( + pvn, + pvd.time_start, + pvd.time_stop, + query_split_interval=pvd.query_split_interval, + proc_type=pvd.processing_type, + proc_type_param1=pvd.processing_type_param1, + proc_type_param2=pvd.processing_type_param2, + return_pvn2idcs_dict=False, ) - else: - timestamp_start = self.time_start.get_iso8601() - timestamp_stop = self.time_stop.get_iso8601() - - data = self.connector.getData( - self._pvnames, - timestamp_start, - timestamp_stop, - process_type=process_type, - interval=mean_sec, + urls = [urls] if isinstance(urls, str) else urls + ini = len(all_urls) + all_urls.extend(urls) + end = len(all_urls) + pvn2idcs[pvn] = _np.arange(ini, end) + + try: + resps = self.connector.make_request(all_urls) + finally: + if query_timeout is not None: + self.query_timeout = query_timeout0 + + if not resps: + return None + + data = self.connector.process_resquest_of_get_data( + self._pvnames, resps, pvn2idcs ) if not data: @@ -649,7 +1064,7 @@ def gen_archviewer_url_link( time_ref=None, pvoptnrpts=None, pvcolors=None, - pvusediff=False + pvusediff=False, ): """Generate a Archiver Viewer URL for the given PVs. 
@@ -693,21 +1108,12 @@ def gen_archviewer_url_link( time_ref=time_ref, pvoptnrpts=pvoptnrpts, pvcolors=pvcolors, - pvusediff=pvusediff) + pvusediff=pvusediff, + ) return url def _init_pvdatas(self, pvnames, connector): - pvdata = dict() - for pvname in pvnames: - pvdata[pvname] = PVData(pvname, connector) - pvdata[ - pvname - ].parallel_query_bin_interval = self._parallel_query_bin_interval - if self.time_start is not None: - pvdata[pvname].time_start = self.time_start - if self.time_stop is not None: - pvdata[pvname].time_stop = self._time_stop - return pvdata + return {pvname: PVData(pvname, connector) for pvname in pvnames} def __getitem__(self, val): """Get item.""" @@ -740,12 +1146,7 @@ def to_dict(self): all PVs. Compatible input for PVData.from_dict. """ - data = dict( - server_url=self.connector.server_url, - pvnames=self.pvnames, - timestamp_start=self.timestamp_start, - timestamp_stop=self.timestamp_stop, - ) + data = dict(server_url=self.connector.server_url, pvnames=self.pvnames) data['pvdata_info'] = [self[pvn].to_dict() for pvn in self._pvnames] return data @@ -762,10 +1163,8 @@ def from_dict(info): """ pvdataset = PVDataSet(info['pvnames'], info['server_url']) - pvdataset.timestamp_start = info['timestamp_start'] - pvdataset.timestamp_stop = info['timestamp_stop'] for i, pvdata in enumerate(pvdataset): - pvdata.set_data(**info['pvdata_info'][i]['data']) + pvdata.from_dict(**info['pvdata_info'][i]) return pvdataset def to_pickle(self, fname, overwrite=False): diff --git a/siriuspy/siriuspy/clientarch/time.py b/siriuspy/siriuspy/clientarch/time.py index ef5472b17..8e2c19c08 100644 --- a/siriuspy/siriuspy/clientarch/time.py +++ b/siriuspy/siriuspy/clientarch/time.py @@ -3,21 +3,29 @@ from calendar import timegm as _timegm from datetime import datetime as _datetime, timedelta as _timedelta -from . import exceptions as _exceptions +import numpy as _np class Time(_datetime): - """Time conversion class. 
+ """Time class.""" - Usage options: + _DEFAULT_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%f' + + def __new__(cls, *args, **kwargs): # noqa: D417, C901 + """Create Time object. + + Usage options: + + Time(datetime) + datetime is a keyword/positional argument of datetime|Time class. Time(timestamp) - timestamp is a float/int keyword/positional argument. + timestamp is a float|int keyword/positional argument. Time(timestamp_string) Time(timestamp_string, timestamp_format='%Y-%m-%d %H:%M:%S.%f') - timestamp_string is a str keyword/positional argument. - timestamp_format is an optional keyword argument for string - formating. Defaults to '%Y-%m-%d %H:%M:%S.%f'. + `timestamp_string` is a str keyword/positional argument. + `timestamp_format` is an optional keyword argument for string + formating. Defaults to '%Y-%m-%d %H:%M:%S.%f' or iso8601. Time(year, month, day) Time(year, month, day, hour) @@ -29,68 +37,118 @@ class Time(_datetime): are integer keyword/positional arguments. tzinfo must be None or of a tzinfo subclass keyword/positional argument. - """ - _DEFAULT_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%f' - _DATETIME_ARGS = { - 'year', - 'month', - 'day', - 'hour', - 'minute', - 'second', - 'microsecond', - 'tzinfo', - } - - def __new__(cls, *args, **kwargs): - """New object.""" + Any of the above options (apart from the last) can be used with an + additional keyword argument for `tzinfo`. + + Args: + datetime (datetime|Time): keyword/positional argument. + timestamp (float|int): keyword/positional argument. + timestamp_string (str): keyword/positional argument. + timestamp_format (str): keyword argument for string formating. + year (int): keyword/positional argument. + month (int): keyword/positional argument. + day (int): keyword/positional argument. + hour (int): keyword/positional argument. + minute (int): keyword/positional argument. + second (int): keyword/positional argument. + microsecond (int): keyword/positional argument. 
+ tzinfo (tzinfo): keyword/positional argument. Defaults to None. + """ if not args and not kwargs: - raise _exceptions.TypeError( - 'no arguments found to build Time object' - ) + raise TypeError('no arguments found to build Time object') + if len(args) == 2: + kwargs['tzinfo'] = args[1] + args = args[:1] if len(args) == 1: - if isinstance(args[0], (float, int)): - return Time.fromtimestamp(args[0]) - if isinstance(args[0], str): - timestamp_format = ( - kwargs['timestamp_format'] - if 'timestamp_format' in kwargs - else Time._DEFAULT_TIMESTAMP_FORMAT - ) - return Time.strptime(args[0], timestamp_format) - raise _exceptions.TypeError( - f'argument of unexpected type {type(args[0])}' - ) - if len(kwargs) == 1: - if 'timestamp' in kwargs: - return Time.fromtimestamp(kwargs['timestamp']) - if 'timestamp_string' in kwargs: - return Time.strptime( - kwargs['timestamp_string'], Time._DEFAULT_TIMESTAMP_FORMAT + arg = args[0] + dic_ = { + 'timestamp': (int, float), + 'timestamp_string': (str,), + 'datetime': (_datetime,), + } + if not isinstance(arg, sum(dic_.values(), ())): + raise TypeError(f'Argument of unexpected type {type(arg)}') + + for key, typ in dic_.items(): + if isinstance(arg, typ) and key not in kwargs: + kwargs[key] = arg + break + else: + raise TypeError( + 'Conflicting positional and keyword arguments.' ) - if set(kwargs.keys()) & Time._DATETIME_ARGS: - raise _exceptions.TypeError( - 'missing input arguments, verify usage options.' + elif len(args) == 8: + if 'tzinfo' in kwargs: + raise TypeError( + 'Conflicting positional and keyword arguments.' 
) - raise _exceptions.TypeError(f'unexpected key argument {kwargs}') - if len(kwargs) == 2: - if set(kwargs.keys()) == {'timestamp_string', 'timestamp_format'}: - return Time.strptime( - kwargs['timestamp_string'], kwargs['timestamp_format'] + kwargs['tzinfo'] = args[7] + args = args[:7] + + if not {'timestamp', 'timestamp_string'} - kwargs.keys(): + raise TypeError('Conflicting positional and keyword arguments.') + + tz = kwargs.get('tzinfo') + tzl = _datetime.now().astimezone().tzinfo + if 'datetime' in kwargs: + dtim = kwargs['datetime'] + tz = tz or dtim.tzinfo or tzl + obj = super().fromtimestamp(dtim.timestamp(), tz=tz) + elif 'timestamp' in kwargs: + obj = super().fromtimestamp(kwargs['timestamp'], tz=tz or tzl) + elif 'timestamp_string' in kwargs: + ts_str = kwargs['timestamp_string'] + try: + ts_fmt = kwargs.get( + 'timestamp_format', Time._DEFAULT_TIMESTAMP_FORMAT ) - if set(kwargs.keys()) & Time._DATETIME_ARGS: - raise _exceptions.TypeError( - 'missing input arguments, verify usage options.' + obj = ( + super().strptime(ts_str, ts_fmt).replace(tzinfo=tz or tzl) ) - raise _exceptions.TypeError( - f'unexpected key arguments {list(kwargs.keys())}' + except ValueError: + import sys as _sys + if _sys.version_info <= (3, 8): + from dateutil import parser as _dateutil_parser + tim = _dateutil_parser.parse(ts_str) + else: + tim = super().fromisoformat(ts_str) + tz = tz or tim.tzinfo + obj = super().fromtimestamp(tim.timestamp(), tz=tz) + else: + kwargs.setdefault('tzinfo', tzl) + obj = super().__new__(cls, *args, **kwargs) + + # NOTE: This if is necessary for python versions prior to 3.9. + # in this cases, calling super().fromtimestamp with tzinfo returns + # an object of datetime class. 
+ if not isinstance(obj, cls): + return super().__new__( + cls, + obj.year, + obj.month, + obj.day, + obj.hour, + obj.minute, + obj.second, + obj.microsecond, + tzinfo=obj.tzinfo, ) - return super().__new__(cls, *args, **kwargs) + return obj def get_iso8601(self): """Get iso8601 format.""" - return self.astimezone().isoformat() + return self.astimezone(self.tzinfo).isoformat() + + @staticmethod + def now(tz=None): + """Get current time.""" + return _datetime.now(tz).astimezone(tz) + + @staticmethod + def utcnow(): + """Get current UTC time.""" + raise NotImplementedError('utcnow is not implemented for Time class.') def __add__(self, other): """Addition.""" @@ -121,29 +179,41 @@ def conv_to_epoch(time, datetime_format): def get_time_intervals( - time_start, time_stop, interval, return_isoformat=False + time_start: Time, + time_stop: Time, + interval: int, + return_isoformat=False, + tzinfo=None, ): - """Return intervals of 'interval' duration from time_start to time_stop.""" - if time_start + interval >= time_stop: - timestamp_start = ( - time_start.get_iso8601() if return_isoformat else time_start - ) - timestamp_stop = ( - time_stop.get_iso8601() if return_isoformat else time_stop - ) + """Break `time_start` to `time_stop` in intervals of `interval` seconds. + + Args: + time_start (Time): start time. + time_stop (Time): stop time. + interval (int|float|None): interval duration in seconds. + If <= 0 or None, no splitting will be done. + return_isoformat (bool): return in iso8601 format. + tzinfo (tzinfo): timezone info. Defaults to None, which means using + the timezone of `time_start`. + + Returns: + start_time (Time|str | list[Time|str]): start times. + stop_time (Time|str | list[Time|str]): stop times. 
+ """ + tzinfo = tzinfo or time_start.tzinfo + t_start = time_start.timestamp() + t_stop = time_stop.timestamp() + if interval is None or interval <= 0: + t_start = [t_start] + t_stop = [t_stop] else: - t_start = time_start - t_stop = t_start + interval - timestamp_start = [t_start] - timestamp_stop = [t_stop] - while t_stop < time_stop: - t_start += interval - t_stop = t_stop + interval - if t_stop + interval > time_stop: - t_stop = time_stop - timestamp_start.append(t_start) - timestamp_stop.append(t_stop) - if return_isoformat: - timestamp_start = [t.get_iso8601() for t in timestamp_start] - timestamp_stop = [t.get_iso8601() for t in timestamp_stop] - return timestamp_start, timestamp_stop + t_start = _np.arange(t_start, t_stop, int(interval)) + t_stop = _np.r_[t_start[1:], t_stop] + t_start = [Time(t, tzinfo=tzinfo) for t in t_start] + t_stop = [Time(t, tzinfo=tzinfo) for t in t_stop] + if return_isoformat: + t_start = [t.get_iso8601() for t in t_start] + t_stop = [t.get_iso8601() for t in t_stop] + if len(t_start) == 1: + return t_start[0], t_stop[0] + return t_start, t_stop diff --git a/siriuspy/siriuspy/currinfo/main.py b/siriuspy/siriuspy/currinfo/main.py index c85e58e64..74dcda0a0 100644 --- a/siriuspy/siriuspy/currinfo/main.py +++ b/siriuspy/siriuspy/currinfo/main.py @@ -55,8 +55,8 @@ def close(self): @staticmethod def _get_value_from_arch(pvname): carch = _ClientArch() - datetime = _datetime.now().isoformat() + '-03:00' - return carch.getData(pvname, datetime, datetime) + datetime = _datetime.now().astimezone().isoformat() + return carch.get_data(pvname, datetime, datetime) class _ASCurrInfoApp(_CurrInfoApp): diff --git a/siriuspy/siriuspy/machshift/gensumm_macreport.py b/siriuspy/siriuspy/machshift/gensumm_macreport.py index 7925aa7c9..89b0a20da 100644 --- a/siriuspy/siriuspy/machshift/gensumm_macreport.py +++ b/siriuspy/siriuspy/machshift/gensumm_macreport.py @@ -53,7 +53,7 @@ for intvl in intervals: macreports[intvl[0]] = MacReport() # 
macreports[intvl[0]].connector.server_url = 'https://archiver-temp.cnpem.br' # necessary for 2024 and early - macreports[intvl[0]].connector.timeout = 30 + macreports[intvl[0]].connector.query_timeout = 30 macreports[intvl[0]].time_start = intvl[0] macreports[intvl[0]].time_stop = intvl[1] macreports[intvl[0]].update() @@ -136,7 +136,7 @@ # programmed vs. delivered hours macr = MacReport() -macr.connector.timeout = 300 +macr.connector.query_timeout = 300 macr.time_start = Time(2024, 1, 1, 0, 0) macr.time_stop = Time(2024, 12, 31, 23, 59, 59) macr.update() diff --git a/siriuspy/siriuspy/machshift/macreport.py b/siriuspy/siriuspy/machshift/macreport.py index 258b5faba..2a264de5e 100644 --- a/siriuspy/siriuspy/machshift/macreport.py +++ b/siriuspy/siriuspy/machshift/macreport.py @@ -282,8 +282,8 @@ def __init__(self, connector=None, logger=None): self._init_connectors() # query data - self._time_start = None - self._time_stop = None + self._time_start = _Time.now() + self._time_stop = self._time_start # user shift stats self._usershift_progmd_time = None @@ -439,19 +439,6 @@ def logger(self, new_logger): datefmt='%F %T', level=_log.INFO, stream=_sys.stdout) - @property - def timestamp_start(self): - """Query interval start timestamp.""" - if not self._time_start: - return None - return self._time_start.timestamp() - - @timestamp_start.setter - def timestamp_start(self, new_timestamp): - if not isinstance(new_timestamp, (float, int)): - raise TypeError('expected argument of type float or int') - self._time_start = _Time(timestamp=new_timestamp) - @property def time_start(self): """Time start.""" @@ -459,22 +446,8 @@ def time_start(self): @time_start.setter def time_start(self, new_time): - if not isinstance(new_time, _Time): - raise TypeError('expected argument of type Time') - self._time_start = new_time - - @property - def timestamp_stop(self): - """Query interval stop timestamp.""" - if not self._time_stop: - return None - return self._time_stop.timestamp() - - 
@timestamp_stop.setter - def timestamp_stop(self, new_timestamp): - if not isinstance(new_timestamp, (float, int)): - raise TypeError('expected argument of type float or int') - self._time_stop = _Time(timestamp=new_timestamp) + """Accept any value that can be converted to a Time object.""" + self._time_start = _Time(new_time) @property def time_stop(self): @@ -483,9 +456,8 @@ def time_stop(self): @time_stop.setter def time_stop(self, new_time): - if not isinstance(new_time, _Time): - raise TypeError('expected argument of type Time') - self._time_stop = new_time + """Accept any value that can be converted to a Time object.""" + self._time_stop = _Time(new_time) # user shift stats @@ -1062,30 +1034,40 @@ def update(self): # current _t0 = _time.time() - self._pvdata[self._current_pv].parallel_query_bin_interval = 60*60*6 - self._pvdata[self._current_pv].update(MacReport.QUERY_AVG_TIME) + pvd = self._pvdata[self._current_pv] + pvd.query_split_interval = 60 * 60 * 6 + pvd.processing_type = pvd.ProcessingTypes.Mean + pvd.processing_type_param1 = MacReport.QUERY_AVG_TIME + pvd.update() + self._update_log(log_msg.format(self._current_pv, _time.time()-_t0)) # macshift, interlock and stability indicators for pvn in self._pvnames: if pvn == self._current_pv: continue - interval, parallel = None, False _t0 = _time.time() - self._pvdata[pvn].update(mean_sec=interval, parallel=parallel) + # Set query_split_interval for the rest of PVs to 0 to + # avoid multiple queries and speed up the process. + self._pvdata[pvn].query_split_interval = 0 + self._pvdata[pvn].update() self._update_log(log_msg.format(pvn, _time.time()-_t0)) # ps for group, pvdataset in self._pvdataset.items(): _t0 = _time.time() - pvdataset.update(parallel=False) + # Set query_split_interval for the rest of PVs to 0 to + # avoid multiple queries and speed up the process. 
+ pvdataset.query_split_interval = 0 + pvdataset.update() self._update_log(log_msg.format( - 'SI PS '+group.capitalize(), _time.time()-_t0)) + 'SI PS '+group.capitalize(), _time.time()-_t0) + ) self._compute_stats() def plot_raw_data(self): - """Plot raw data for period timestamp_start to timestamp_stop.""" + """Plot raw data for period time_start to time_stop.""" if not self._raw_data: print('No data to display. Call update() to get data.') return diff --git a/siriuspy/siriuspy/machshift/savedata_macreport.py b/siriuspy/siriuspy/machshift/savedata_macreport.py index 2a8848e51..3f55cba92 100644 --- a/siriuspy/siriuspy/machshift/savedata_macreport.py +++ b/siriuspy/siriuspy/machshift/savedata_macreport.py @@ -10,7 +10,7 @@ # get data from interval macr = MacReport() -macr.connector.timeout = 300 +macr.connector.query_timeout = 300 macr.time_start = time_start macr.time_stop = time_stop macr.update() diff --git a/siriuspy/siriuspy/pwrsupply/tests/Untitled.ipynb b/siriuspy/siriuspy/pwrsupply/tests/Untitled.ipynb index c5942f78b..2034c799e 100644 --- a/siriuspy/siriuspy/pwrsupply/tests/Untitled.ipynb +++ b/siriuspy/siriuspy/pwrsupply/tests/Untitled.ipynb @@ -101,10 +101,10 @@ "tstamp1 = ini_time.isoformat() + '-03:00'\n", "tstamp2 = end_time.isoformat() + '-03:00'\n", "\n", - "info11 = ca.getData(pvname11, tstamp1, tstamp2)\n", - "info12 = ca.getData(pvname12, tstamp1, tstamp2)\n", - "info13 = ca.getData(pvname13, tstamp1, tstamp2)\n", - "info14 = ca.getData(pvname14, tstamp1, tstamp2)\n", + "info11 = ca.get_data(pvname11, tstamp1, tstamp2)\n", + "info12 = ca.get_data(pvname12, tstamp1, tstamp2)\n", + "info13 = ca.get_data(pvname13, tstamp1, tstamp2)\n", + "info14 = ca.get_data(pvname14, tstamp1, tstamp2)\n", "time11, data11 = info11['timestamp'], info11['value']\n", "time12, data12 = info12['timestamp'], info12['value']\n", "time13, data13 = info13['timestamp'], info13['value']\n", diff --git a/siriuspy/tests/clientarch/__init__.py 
b/siriuspy/tests/clientarch/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/siriuspy/tests/clientarch/test_clientarch.py b/siriuspy/tests/clientarch/test_clientarch.py new file mode 100644 index 000000000..51230a4c5 --- /dev/null +++ b/siriuspy/tests/clientarch/test_clientarch.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python-sirius + +"""Test the archiver client class.""" + +import datetime +import traceback +from unittest import TestCase + +from siriuspy.clientarch.time import get_time_intervals, Time + + +class TestClientArchTime(TestCase): + """Test update and delete config meets requirements.""" + + def test_constructor(self): + """Test api.""" + tz_local = datetime.datetime.now().astimezone().tzinfo + tim_dt = datetime.datetime(2025, 1, 8, 10, 13, 14, 4587, tz_local) + tim_dt_naive = datetime.datetime(2025, 1, 8, 10, 13, 14, 4587) + try: + tim_naive = Time(2025, 1, 8) + tim_naive = Time(2025, 1, 8, 10) + tim_naive = Time(2025, 1, 8, 10, 13) + tim_naive = Time(2025, 1, 8, 10, 13, 14) + tim_naive = Time(2025, 1, 8, 10, 13, 14, 4587) + tim_naive = Time(tim_naive) + + tim_ts1 = Time(tim_naive.timestamp()) + tim_ts2 = Time( + tim_naive.strftime(tim_naive._DEFAULT_TIMESTAMP_FORMAT) + ) + tim_ts3 = Time(tim_naive.get_iso8601()) + tim_ts4 = Time(tim_dt_naive) + tim_ts5 = Time(tim_dt) + tim_ts6 = Time(tim_naive) + except Exception as err: + traceback.print_exc() + self.fail(err) + + self.assertEqual(tim_ts1, tim_naive) + self.assertEqual(tim_ts2, tim_naive) + self.assertEqual(tim_ts3, tim_naive) + self.assertEqual(tim_ts4, tim_naive) + self.assertEqual(tim_ts5, tim_naive) + self.assertEqual(tim_ts6, tim_naive) + + tz_info = datetime.timezone(datetime.timedelta(seconds=-1 * 3600)) + try: + tim = Time(2025, 1, 8, 10, 13, 14, 4587, tz_info) + tim = Time(2025, 1, 8, tzinfo=tz_info) + tim = Time(2025, 1, 8, 10, tzinfo=tz_info) + tim = Time(2025, 1, 8, 10, 13, tzinfo=tz_info) + tim = Time(2025, 1, 8, 10, 13, 14, tzinfo=tz_info) + tim = Time(2025, 1, 8, 
10, 13, 14, 4587, tzinfo=tz_info) + + tim_ts1 = Time(tim_naive.timestamp(), tzinfo=tz_info) + tim_ts2 = Time( + tim_naive.strftime(tim_naive._DEFAULT_TIMESTAMP_FORMAT), + tzinfo=tz_info, + ) + tim_ts3 = Time(tim_naive.get_iso8601(), tzinfo=tz_info) + tim_ts4 = Time(tim_dt) + tim_ts5 = Time(tim_dt_naive, tzinfo=tz_info) + tim_ts6 = Time(tim_naive, tzinfo=tz_info) + tim_ts7 = Time(tim) + except Exception as err: + traceback.print_exc() + self.fail(err) + + self.assertNotEqual(tim, tim_naive) + self.assertNotEqual(tim.timestamp(), tim_naive.timestamp()) + + self.assertNotEqual(tim_ts1, tim) + self.assertNotEqual(tim_ts1.timestamp(), tim.timestamp()) + + self.assertEqual(tim_ts2, tim) + + self.assertNotEqual(tim_ts3, tim) + self.assertNotEqual(tim_ts3.timestamp(), tim.timestamp()) + self.assertEqual(tim_ts1, tim_ts3) + + self.assertNotEqual(tim_ts4, tim) + self.assertNotEqual(tim_ts4.timestamp(), tim.timestamp()) + self.assertEqual(tim_ts3, tim_ts4) + + self.assertNotEqual(tim_ts5, tim) + self.assertNotEqual(tim_ts5.timestamp(), tim.timestamp()) + self.assertEqual(tim_ts4, tim_ts5) + + self.assertNotEqual(tim_ts6, tim) + self.assertNotEqual(tim_ts6.timestamp(), tim.timestamp()) + self.assertEqual(tim_ts5, tim_ts6) + + self.assertEqual(tim_ts7, tim) + + with self.assertRaises(ValueError): + Time('2025-01-ladieno') + with self.assertRaises(TypeError): + Time((tim,)) + ts_int = tim.timestamp() + ts_str = tim.get_iso8601() + with self.assertRaises(TypeError): + Time(ts_int, timestamp=ts_int) + with self.assertRaises(TypeError): + Time(ts_str, timestamp=ts_str) + with self.assertRaises(TypeError): + Time(ts_int, timestamp_string=ts_int) + with self.assertRaises(TypeError): + Time(ts_str, timestamp=ts_int) + with self.assertRaises(TypeError): + Time(timestamp=ts_int, timestamp_string=ts_str) + with self.assertRaises(TypeError): + Time(timestamp_string=ts_int) + with self.assertRaises(TypeError): + Time(timestamp=ts_str) + + def test_get_time_intervals(self): + """Test 
get_time_intervals.""" + tz_info = datetime.timezone(datetime.timedelta(seconds=-3 * 3600)) + time_start = Time(2026, 1, 13, 0, 0, 0, 345, tzinfo=tz_info) + time_stop = time_start + 24 * 3600 + interval = 3600 * 10 + + tst_corr = [ + '2026-01-13T00:00:00.000345-03:00', + '2026-01-13T10:00:00.000345-03:00', + '2026-01-13T20:00:00.000345-03:00', + ] + tsp_corr = [ + '2026-01-13T10:00:00.000345-03:00', + '2026-01-13T20:00:00.000345-03:00', + '2026-01-14T00:00:00.000345-03:00', + ] + tst, tsp = get_time_intervals( + time_start, + time_stop, + interval, + return_isoformat=True, + tzinfo=tz_info, + ) + self.assertEqual(tst, tst_corr) + self.assertEqual(tsp, tsp_corr) + + tst_corr = [Time(t) for t in tst_corr] + tsp_corr = [Time(t) for t in tsp_corr] + tst, tsp = get_time_intervals( + time_start, time_stop, interval, return_isoformat=False + ) + self.assertEqual(tst, tst_corr) + self.assertEqual(tsp, tsp_corr) + + time_stop = time_start + 4 * 3600 + tst, tsp = get_time_intervals( + time_start, time_stop, interval, return_isoformat=False + ) + self.assertEqual(tst, time_start) + self.assertEqual(tsp, time_stop) diff --git a/siriuspy/tests/currinfo/test_main.py b/siriuspy/tests/currinfo/test_main.py index 7a8102bc4..6778087f4 100755 --- a/siriuspy/tests/currinfo/test_main.py +++ b/siriuspy/tests/currinfo/test_main.py @@ -39,7 +39,7 @@ def setUp(self): "siriuspy.currinfo.main._ClientArch", autospec=True) self.addCleanup(ca_patcher.stop) self.mock_ca = ca_patcher.start() - self.mock_ca.return_value.getData.return_value = None + self.mock_ca.return_value.get_data.return_value = None pv_patcher = mock.patch( "siriuspy.currinfo.main._PV", autospec=True) self.addCleanup(pv_patcher.stop)