From b4ff21c3cfb4b8f11b8f560d08bb8b8a7a64f86a Mon Sep 17 00:00:00 2001 From: Felipe Carrero Date: Tue, 12 Mar 2024 14:56:12 +0000 Subject: [PATCH 01/16] Adding agent for the Lightning Detector --- socs/agents/ld_monitor/__init__.py | 0 socs/agents/ld_monitor/agent.py | 423 +++++++++++++++++++++++++++++ 2 files changed, 423 insertions(+) create mode 100644 socs/agents/ld_monitor/__init__.py create mode 100644 socs/agents/ld_monitor/agent.py diff --git a/socs/agents/ld_monitor/__init__.py b/socs/agents/ld_monitor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py new file mode 100644 index 000000000..306dce19c --- /dev/null +++ b/socs/agents/ld_monitor/agent.py @@ -0,0 +1,423 @@ +import socket +import numpy +import time +import os +import argparse +import txaio +import yaml +from ocs import ocs_agent, site_config +from ocs.ocs_twisted import Pacemaker,TimeoutLock + +verbosity=False + +class ld_monitor: + """Receives and decodes data of the lightning detector via UDP + + Parameters + ---------- + host : str + Address of the computer reading the data. + port : int + Port of host where data will be received, default 1110. + verbose : boolean + Defines verbosity of the function (debug purposes). + + Attributes + ---------- + verbose : bool + Defines verbosity for debugging purposes + host : string + Defines the host where data will be received (where the agent is to be ran) + port : int + Port number in the local host to be bound to receive the data + sockopen : bool + Indicates when the socket is open + inittime : float + Logs the time at which initialization was carried out + data_dict : dictionary + Raw data received from the lightning detector + newdata_dict : dictioanry + The dictionary where new data is received + """ + + def __init__(self,port=1110,verbose=verbosity): + self.verbose=verbose + self.port=port + + # get localhost ip + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(("8.8.8.8", 80)) + self.host=s.getsockname()[0] + + if hasattr(self,'sockopen'): + self.sock.close() + + # open and bind socket to receive lightning detector data + try: + self.sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) + except: + print('Failed to create socket') + + try: + self.sock.bind((self.host,self.port)) + self.sockopen=True + self.inittime=time.time() + except: + print('Failed to bind socket') + + # initialize variables to account for absence of previous data + self.data_dict={ + 'd_type':numpy.nan, + 'field_value':numpy.nan, + 'rot_fault':0, + 'time_last':-1., + 'tsince_last':-1., + 'dist':-1, + 'unit_d':0, + 'high_field':-1, + 'hifield_value':-1000., + 'alarm_r':0, + 'alarm_o':0, + 'alarm_y':0, + 'delay_g':1, + 'clear':0, + 'r_timer':0, + 'o_timer':0, + 'y_timer':0, + 'g_timer':0, + 'allclear_timer':0, + 'faultcode':0 + } + + if self.verbose: + print('ld_monitor function monitor initialized') + + def read_data(self): + """ + Receives data from the lightning detector via UDP, + and returns a dictionary containing formatted data + """ + + self.data, _ = self.sock.recvfrom(1024) + self.data=self.data.decode('utf-8') + + # receiving an "e-fiel" sentence + if self.data[0]=='$': + data_split=self.data[1:].split(',') + rot_fault=int(data_split[1].split('*')[0]) + self.newdata_dict={ + 'd_type':0, + 'field_value':float(data_split[0]), + 'rot_fault':rot_fault + } + self.data_dict.update(self.newdata_dict) + return self.data_dict + + elif self.data[0]=='@': + param=self.data[1:3] + + match param: + # receiving a "lightning strike" sentence + case 'LI': + data_split=self.data.split(',')[1:] + if data_split[2].split('*')[0]=='Miles': + unit_d=0 + elif data_split[2].split('*')[0]=='Km': + unit_d=1 + + self.newdata_dict={ + 'd_type':1, + 'time_last':time.time(), + 'dist':int(data_split[1]), + 'unit_d':unit_d + } + self.data_dict.update(self.newdata_dict) + + return self.data_dict + + # receiving a "high e-field" sentence, account for 2 types + case 'HF': + data_split=self.data[1:].split(',') + if len(data_split)==1: + self.newdata_dict={ + 'd_type':2, + 'high_field':1, + 'hifield_value':float(self.data_dict['field_value']) + } + else: + self.newdata_dict={ + 'd_type':2, + 'hifield_value':float(data_split[1]) + } + self.data_dict.update(self.newdata_dict) + return self.data_dict + + # status sentence + case 'ST': + faultcode=int(self.data.split(',')[-1].split('*')[0],16) + data_split=[int(i) for i in self.data.split(',')[1:-1]] + + self.newdata_dict={ + 'd_type':3, + 'alarm_r':data_split[0], + 'alarm_o':data_split[1], + 'alarm_y':data_split[2], + 'delay_g':data_split[3], + 'clear':data_split[4], + 'r_timer':data_split[5], + 'o_timer':data_split[6], + 'y_timer':data_split[7], + 'g_timer':data_split[8], + 'allclear_timer':data_split[9], + 'faultcode':faultcode + } + + self.data_dict.update(self.newdata_dict) + return self.data_dict + + # disregard "alarm timers" sentence but update sentence type + case 'WT': + self.newdata_dict={'d_type':4} + self.data_dict.update(self.newdata_dict) + return self.data_dict + + def read_cycle(self): + """ + In each cycle data is read and then parsed following + the format required to publish data to the ocs feed + """ + try: + cycle_data={} + self.read_data() + + # updates time since last strike if previous strike data exists + if self.data_dict['time_last']==-1.: + self.data_dict['tsince_last']=-1. + else: + self.data_dict['tsince_last']=(time.time() + -self.data_dict['time_last']) + + # parse data to ocs agent feed format + for key in self.data_dict: + cycle_data[key]={'value':self.data_dict[key]} + + if self.verbose==True: + print(cycle_data) + return cycle_data + + except: + pass + if self.verbose: + print('Passing to next data iteration') + +class ld_monitorAgent: + """Monitor the Lightning Detector data via UDP. + + Parameters + ---------- + agent : OCSAgent + OCSAgent object which forms this Agent + unit : int + sample_interval : float + Time between samples in seconds. + + Attributes + ---------- + agent : OCSAgent + OCSAgent object which forms this Agent + take_data : bool + Tracks whether or not the agent is actively issuing SNMP GET commands + to the ibootbar. Setting to false stops sending commands. + log : txaio.tx.Logger + txaio logger object, created by the OCSAgent + """ + + def __init__(self, agent, unit=1, sample_interval=15.): + + self.unit = unit + self.agent: ocs_agent.OCSAgent = agent + self.log = agent.log + self.lock = TimeoutLock() + + self.pacemaker_freq = 1. / sample_interval + + self.initialized = False + self.take_data = False + + self.ld_monitor= None + + agg_params = { + 'frame_length':10*60 # [sec] + } + self.agent.register_feed('ld_monitor', + record=True, + agg_params=agg_params, + buffer_time=0) + + def _connect(self): + """connect() + Instantiates LD object and check if client is open + """ + self.ld_monitor= ld_monitor(verbose=verbosity) + self.initialized = True + + @ocs_agent.param('auto_acquire', default=False, type=bool) + def init_ld_monitor(self, session, params=None): + """ + Perform first time setup of the LD. + + Parameters: + auto_acquire (bool, optional): Starts data acquisition after + initialization if True. Defaults to False. + + """ + + if self.initialized: + return True, "Already initialized." + + with self.lock.acquire_timeout(3, job='init') as acquired: + if not acquired: + self.log.warn("Could not start init because " + "{} is already running".format(self.lock.job)) + return False, "Could not acquire lock." + + session.set_status('starting') + + self._connect() + if not self.initialized: + return False, 'Could not connect to LD' + + # Start data acquisition if requested + if params['auto_acquire']: + self.agent.start('acq') + return True, 'LD initialized.' + + @ocs_agent.param('_') + def acq(self, session, params=None): + """acq() + + Starts the data acquisition process + + """ + with self.lock.acquire_timeout(0, job='acq') as acquired: + if not acquired: + self.log.warn("Could not start acq because {} is already running" + .format(self.lock.job)) + return False, "Could not acquire lock." + + session.set_status('running') + + self.take_data = True + + session.data = {"fields": {}} + + pm = Pacemaker(self.pacemaker_freq) + while self.take_data: + pm.sleep() + + current_time = time.time() + data = { + 'timestamp': current_time, + 'connection': {}, + 'block_name': 'registers', + 'data': {} + } + if self.ld_monitor.sockopen==False: + self.initialized = False + + #Try to re-initialize if connection lost + if not self.initialized: + self._connect() + + # Only get readings if connected + if self.initialized: + session.data.update({'connection': {'last_attempt': time.time(), + 'connected': True}}) + + regdata = self.ld_monitor.read_cycle() + + if regdata: + for reg in regdata: + data['data'][reg] = regdata[reg]["value"] + field_dict = {reg: regdata[reg]['value']} + session.data['fields'].update(field_dict) + session.data.update({'timestamp': current_time}) + else: + self.log.info('Connection error or error in processing data.') + self.initialized = False + + # Continue trying to connect + if not self.initialized: + session.data.update({'connection': {'last_attempt': time.time(), + 'connected': False}}) + self.log.info('Trying to reconnect.') + continue + + for field, val in data['data'].items(): + _data = { + 'timestamp': current_time, + 'block_name': field, + 'data': {field: val} + } + self.agent.publish_to_feed('ld_monitor', _data) + + self.agent.feeds['ld_monitor'].flush_buffer() + + return True, 'Acquisition exited cleanly.' + + def _stop_acq(self, session, params=None): + """ + Stops acq process. + """ + if self.take_data: + self.take_data = False + return True, 'requested to stop taking data.' + else: + return False, 'acq is not currently running' + + +def make_parser(parser=None): + if parser is None: + parser = argparse.ArgumentParser() + + pgroup = parser.add_argument_group('Agent Options') + pgroup.add_argument("--unit", default=1, + help="unit to listen to.") + pgroup.add_argument('--mode', type=str, choices=['idle', 'init', 'acq'], + help="Starting action for the agent.") + pgroup.add_argument("--sample-interval", type=float, default=15., help="Time between samples in seconds.") + + return parser + + +def main(args=None): + # Start logging + txaio.start_logging(level=os.environ.get("LOGLEVEL", "info")) + + parser = make_parser() + + # Interpret options in the context of site_config. + args = site_config.parse_args(agent_class='ld_monitor', + parser=parser, + args=args) + + # Automatically acquire data if requested (default) + init_params = False + if args.mode == 'init': + init_params = {'auto_acquire': False} + elif args.mode == 'acq': + init_params = {'auto_acquire': True} + print('init_params', init_params) + agent, runner = ocs_agent.init_site_agent(args) + + p = ld_monitorAgent(agent, + unit=int(args.unit), + sample_interval=args.sample_interval) + agent.register_task('init_ld_monitor', p.init_ld_monitor, + startup=init_params) + agent.register_process('acq', p.acq, p._stop_acq) + runner.run(agent, auto_reconnect=True) + + +if __name__ == '__main__': + main() From 2c9719b7e1aae36c53dbd4bef36c4314639772b5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 12 Mar 2024 15:14:33 +0000 Subject: [PATCH 02/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- socs/agents/ld_monitor/agent.py | 253 ++++++++++++++++---------------- 1 file changed, 128 insertions(+), 125 deletions(-) diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py index 306dce19c..9fe7d9bd9 100644 --- a/socs/agents/ld_monitor/agent.py +++ b/socs/agents/ld_monitor/agent.py @@ -1,14 +1,16 @@ +import argparse +import os import socket -import numpy import time -import os -import argparse + +import numpy import txaio import yaml from ocs import ocs_agent, site_config -from ocs.ocs_twisted import Pacemaker,TimeoutLock +from ocs.ocs_twisted import Pacemaker, TimeoutLock + +verbosity = False -verbosity=False class ld_monitor: """Receives and decodes data of the lightning detector via UDP @@ -40,55 +42,55 @@ class ld_monitor: The dictionary where new data is received """ - def __init__(self,port=1110,verbose=verbosity): - self.verbose=verbose - self.port=port + def __init__(self, port=1110, verbose=verbosity): + self.verbose = verbose + self.port = port # get localhost ip with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.connect(("8.8.8.8", 80)) - self.host=s.getsockname()[0] - - if hasattr(self,'sockopen'): + self.host = s.getsockname()[0] + + if hasattr(self, 'sockopen'): self.sock.close() # open and bind socket to receive lightning detector data try: - self.sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) - except: + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + except BaseException: print('Failed to create socket') - + try: - self.sock.bind((self.host,self.port)) - self.sockopen=True - self.inittime=time.time() - except: + self.sock.bind((self.host, self.port)) + self.sockopen = True + self.inittime = time.time() + except BaseException: print('Failed to bind socket') - + # initialize variables to account for absence of previous data - self.data_dict={ - 'd_type':numpy.nan, - 'field_value':numpy.nan, - 'rot_fault':0, - 'time_last':-1., - 'tsince_last':-1., - 'dist':-1, - 'unit_d':0, - 'high_field':-1, - 'hifield_value':-1000., - 'alarm_r':0, - 'alarm_o':0, - 'alarm_y':0, - 'delay_g':1, - 'clear':0, - 'r_timer':0, - 'o_timer':0, - 'y_timer':0, - 'g_timer':0, - 'allclear_timer':0, - 'faultcode':0 - } - + self.data_dict = { + 'd_type': numpy.nan, + 'field_value': numpy.nan, + 'rot_fault': 0, + 'time_last': -1., + 'tsince_last': -1., + 'dist': -1, + 'unit_d': 0, + 'high_field': -1, + 'hifield_value': -1000., + 'alarm_r': 0, + 'alarm_o': 0, + 'alarm_y': 0, + 'delay_g': 1, + 'clear': 0, + 'r_timer': 0, + 'o_timer': 0, + 'y_timer': 0, + 'g_timer': 0, + 'allclear_timer': 0, + 'faultcode': 0 + } + if self.verbose: print('ld_monitor function monitor initialized') @@ -99,85 +101,85 @@ def read_data(self): """ self.data, _ = self.sock.recvfrom(1024) - self.data=self.data.decode('utf-8') - + self.data = self.data.decode('utf-8') + # receiving an "e-fiel" sentence - if self.data[0]=='$': - data_split=self.data[1:].split(',') - rot_fault=int(data_split[1].split('*')[0]) - self.newdata_dict={ - 'd_type':0, - 'field_value':float(data_split[0]), - 'rot_fault':rot_fault - } + if self.data[0] == '$': + data_split = self.data[1:].split(',') + rot_fault = int(data_split[1].split('*')[0]) + self.newdata_dict = { + 'd_type': 0, + 'field_value': float(data_split[0]), + 'rot_fault': rot_fault + } self.data_dict.update(self.newdata_dict) return self.data_dict - - elif self.data[0]=='@': - param=self.data[1:3] - + + elif self.data[0] == '@': + param = self.data[1:3] + match param: # receiving a "lightning strike" sentence case 'LI': - data_split=self.data.split(',')[1:] - if data_split[2].split('*')[0]=='Miles': - unit_d=0 - elif data_split[2].split('*')[0]=='Km': - unit_d=1 - - self.newdata_dict={ - 'd_type':1, - 'time_last':time.time(), - 'dist':int(data_split[1]), - 'unit_d':unit_d - } + data_split = self.data.split(',')[1:] + if data_split[2].split('*')[0] == 'Miles': + unit_d = 0 + elif data_split[2].split('*')[0] == 'Km': + unit_d = 1 + + self.newdata_dict = { + 'd_type': 1, + 'time_last': time.time(), + 'dist': int(data_split[1]), + 'unit_d': unit_d + } self.data_dict.update(self.newdata_dict) return self.data_dict - + # receiving a "high e-field" sentence, account for 2 types case 'HF': - data_split=self.data[1:].split(',') - if len(data_split)==1: - self.newdata_dict={ - 'd_type':2, - 'high_field':1, - 'hifield_value':float(self.data_dict['field_value']) - } + data_split = self.data[1:].split(',') + if len(data_split) == 1: + self.newdata_dict = { + 'd_type': 2, + 'high_field': 1, + 'hifield_value': float(self.data_dict['field_value']) + } else: - self.newdata_dict={ - 'd_type':2, - 'hifield_value':float(data_split[1]) - } + self.newdata_dict = { + 'd_type': 2, + 'hifield_value': float(data_split[1]) + } self.data_dict.update(self.newdata_dict) return self.data_dict - + # status sentence case 'ST': - faultcode=int(self.data.split(',')[-1].split('*')[0],16) - data_split=[int(i) for i in self.data.split(',')[1:-1]] - - self.newdata_dict={ - 'd_type':3, - 'alarm_r':data_split[0], - 'alarm_o':data_split[1], - 'alarm_y':data_split[2], - 'delay_g':data_split[3], - 'clear':data_split[4], - 'r_timer':data_split[5], - 'o_timer':data_split[6], - 'y_timer':data_split[7], - 'g_timer':data_split[8], - 'allclear_timer':data_split[9], - 'faultcode':faultcode - } - + faultcode = int(self.data.split(',')[-1].split('*')[0], 16) + data_split = [int(i) for i in self.data.split(',')[1:-1]] + + self.newdata_dict = { + 'd_type': 3, + 'alarm_r': data_split[0], + 'alarm_o': data_split[1], + 'alarm_y': data_split[2], + 'delay_g': data_split[3], + 'clear': data_split[4], + 'r_timer': data_split[5], + 'o_timer': data_split[6], + 'y_timer': data_split[7], + 'g_timer': data_split[8], + 'allclear_timer': data_split[9], + 'faultcode': faultcode + } + self.data_dict.update(self.newdata_dict) return self.data_dict # disregard "alarm timers" sentence but update sentence type case 'WT': - self.newdata_dict={'d_type':4} + self.newdata_dict = {'d_type': 4} self.data_dict.update(self.newdata_dict) return self.data_dict @@ -187,29 +189,30 @@ def read_cycle(self): the format required to publish data to the ocs feed """ try: - cycle_data={} + cycle_data = {} self.read_data() - + # updates time since last strike if previous strike data exists - if self.data_dict['time_last']==-1.: - self.data_dict['tsince_last']=-1. + if self.data_dict['time_last'] == -1.: + self.data_dict['tsince_last'] = -1. else: - self.data_dict['tsince_last']=(time.time() - -self.data_dict['time_last']) - + self.data_dict['tsince_last'] = (time.time() + - self.data_dict['time_last']) + # parse data to ocs agent feed format for key in self.data_dict: - cycle_data[key]={'value':self.data_dict[key]} - - if self.verbose==True: + cycle_data[key] = {'value': self.data_dict[key]} + + if self.verbose: print(cycle_data) return cycle_data - except: + except BaseException: pass if self.verbose: print('Passing to next data iteration') - + + class ld_monitorAgent: """Monitor the Lightning Detector data via UDP. @@ -233,7 +236,7 @@ class ld_monitorAgent: """ def __init__(self, agent, unit=1, sample_interval=15.): - + self.unit = unit self.agent: ocs_agent.OCSAgent = agent self.log = agent.log @@ -244,10 +247,10 @@ def __init__(self, agent, unit=1, sample_interval=15.): self.initialized = False self.take_data = False - self.ld_monitor= None + self.ld_monitor = None agg_params = { - 'frame_length':10*60 # [sec] + 'frame_length': 10 * 60 # [sec] } self.agent.register_feed('ld_monitor', record=True, @@ -258,7 +261,7 @@ def _connect(self): """connect() Instantiates LD object and check if client is open """ - self.ld_monitor= ld_monitor(verbose=verbosity) + self.ld_monitor = ld_monitor(verbose=verbosity) self.initialized = True @ocs_agent.param('auto_acquire', default=False, type=bool) @@ -295,9 +298,9 @@ def init_ld_monitor(self, session, params=None): @ocs_agent.param('_') def acq(self, session, params=None): """acq() - + Starts the data acquisition process - + """ with self.lock.acquire_timeout(0, job='acq') as acquired: if not acquired: @@ -322,10 +325,10 @@ def acq(self, session, params=None): 'block_name': 'registers', 'data': {} } - if self.ld_monitor.sockopen==False: + if not self.ld_monitor.sockopen: self.initialized = False - #Try to re-initialize if connection lost + # Try to re-initialize if connection lost if not self.initialized: self._connect() @@ -335,7 +338,7 @@ def acq(self, session, params=None): 'connected': True}}) regdata = self.ld_monitor.read_cycle() - + if regdata: for reg in regdata: data['data'][reg] = regdata[reg]["value"] @@ -352,7 +355,7 @@ def acq(self, session, params=None): 'connected': False}}) self.log.info('Trying to reconnect.') continue - + for field, val in data['data'].items(): _data = { 'timestamp': current_time, @@ -409,10 +412,10 @@ def main(args=None): init_params = {'auto_acquire': True} print('init_params', init_params) agent, runner = ocs_agent.init_site_agent(args) - + p = ld_monitorAgent(agent, - unit=int(args.unit), - sample_interval=args.sample_interval) + unit=int(args.unit), + sample_interval=args.sample_interval) agent.register_task('init_ld_monitor', p.init_ld_monitor, startup=init_params) agent.register_process('acq', p.acq, p._stop_acq) From 042f0b9f08b225fb2e75693a999f094d1ed9e0c7 Mon Sep 17 00:00:00 2001 From: Felipe Carrero Date: Tue, 12 Mar 2024 16:24:35 +0000 Subject: [PATCH 03/16] Removed unused yaml library from code --- socs/agents/ld_monitor/agent.py | 1 - 1 file changed, 1 deletion(-) diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py index 9fe7d9bd9..96994062d 100644 --- a/socs/agents/ld_monitor/agent.py +++ b/socs/agents/ld_monitor/agent.py @@ -5,7 +5,6 @@ import numpy import txaio -import yaml from ocs import ocs_agent, site_config from ocs.ocs_twisted import Pacemaker, TimeoutLock From 81f3604566bc83f914abac396da4347485a44b54 Mon Sep 17 00:00:00 2001 From: Felipe Carrero Date: Tue, 12 Mar 2024 18:29:59 +0000 Subject: [PATCH 04/16] replaced match function to comply with python3.8 --- socs/agents/ld_monitor/agent.py | 115 ++++++++++++++++---------------- 1 file changed, 57 insertions(+), 58 deletions(-) diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py index 96994062d..5ce2f637a 100644 --- a/socs/agents/ld_monitor/agent.py +++ b/socs/agents/ld_monitor/agent.py @@ -102,7 +102,7 @@ def read_data(self): self.data, _ = self.sock.recvfrom(1024) self.data = self.data.decode('utf-8') - # receiving an "e-fiel" sentence + # "e-field" sentence if self.data[0] == '$': data_split = self.data[1:].split(',') rot_fault = int(data_split[1].split('*')[0]) @@ -116,71 +116,70 @@ def read_data(self): elif self.data[0] == '@': param = self.data[1:3] + + # "lightning strike" sentence + if param=='LI': + data_split = self.data.split(',')[1:] + if data_split[2].split('*')[0] == 'Miles': + unit_d = 0 + elif data_split[2].split('*')[0] == 'Km': + unit_d = 1 + + self.newdata_dict = { + 'd_type': 1, + 'time_last': time.time(), + 'dist': int(data_split[1]), + 'unit_d': unit_d + } + self.data_dict.update(self.newdata_dict) - match param: - # receiving a "lightning strike" sentence - case 'LI': - data_split = self.data.split(',')[1:] - if data_split[2].split('*')[0] == 'Miles': - unit_d = 0 - elif data_split[2].split('*')[0] == 'Km': - unit_d = 1 + return self.data_dict + # "high e-field" sentence, account for 2 types + elif param=='HF': + data_split = self.data[1:].split(',') + if len(data_split) == 1: self.newdata_dict = { - 'd_type': 1, - 'time_last': time.time(), - 'dist': int(data_split[1]), - 'unit_d': unit_d - } - self.data_dict.update(self.newdata_dict) - - return self.data_dict - - # receiving a "high e-field" sentence, account for 2 types - case 'HF': - data_split = self.data[1:].split(',') - if len(data_split) == 1: - self.newdata_dict = { - 'd_type': 2, - 'high_field': 1, - 'hifield_value': float(self.data_dict['field_value']) - } - else: - self.newdata_dict = { - 'd_type': 2, - 'hifield_value': float(data_split[1]) + 'd_type': 2, + 'high_field': 1, + 'hifield_value': float(self.data_dict['field_value']) } - self.data_dict.update(self.newdata_dict) - return self.data_dict - - # status sentence - case 'ST': - faultcode = int(self.data.split(',')[-1].split('*')[0], 16) - data_split = [int(i) for i in self.data.split(',')[1:-1]] - + else: self.newdata_dict = { - 'd_type': 3, - 'alarm_r': data_split[0], - 'alarm_o': data_split[1], - 'alarm_y': data_split[2], - 'delay_g': data_split[3], - 'clear': data_split[4], - 'r_timer': data_split[5], - 'o_timer': data_split[6], - 'y_timer': data_split[7], - 'g_timer': data_split[8], - 'allclear_timer': data_split[9], - 'faultcode': faultcode + 'd_type': 2, + 'hifield_value': float(data_split[1]) + } + self.data_dict.update(self.newdata_dict) + return self.data_dict + + # "status" sentence + elif param=='ST': + faultcode = int(self.data.split(',')[-1].split('*')[0], 16) + data_split = [int(i) for i in self.data.split(',')[1:-1]] + + self.newdata_dict = { + 'd_type': 3, + 'alarm_r': data_split[0], + 'alarm_o': data_split[1], + 'alarm_y': data_split[2], + 'delay_g': data_split[3], + 'clear': data_split[4], + 'r_timer': data_split[5], + 'o_timer': data_split[6], + 'y_timer': data_split[7], + 'g_timer': data_split[8], + 'allclear_timer': data_split[9], + 'faultcode': faultcode } - self.data_dict.update(self.newdata_dict) - return self.data_dict + self.data_dict.update(self.newdata_dict) + return self.data_dict - # disregard "alarm timers" sentence but update sentence type - case 'WT': - self.newdata_dict = {'d_type': 4} - self.data_dict.update(self.newdata_dict) - return self.data_dict + # disregard "alarm timers" sentence but update sentence type + elif param=='WT': + self.newdata_dict = {'d_type': 4} + self.data_dict.update(self.newdata_dict) + return self.data_dict def read_cycle(self): """ From ca14faefa50e82b9bb1718a0dfe01f25a21d211b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 12 Mar 2024 18:31:15 +0000 Subject: [PATCH 05/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- socs/agents/ld_monitor/agent.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py index 5ce2f637a..c96035744 100644 --- a/socs/agents/ld_monitor/agent.py +++ b/socs/agents/ld_monitor/agent.py @@ -116,9 +116,9 @@ def read_data(self): elif self.data[0] == '@': param = self.data[1:3] - - # "lightning strike" sentence - if param=='LI': + + # "lightning strike" sentence + if param == 'LI': data_split = self.data.split(',')[1:] if data_split[2].split('*')[0] == 'Miles': unit_d = 0 @@ -130,30 +130,30 @@ def read_data(self): 'time_last': time.time(), 'dist': int(data_split[1]), 'unit_d': unit_d - } + } self.data_dict.update(self.newdata_dict) return self.data_dict # "high e-field" sentence, account for 2 types - elif param=='HF': + elif param == 'HF': data_split = self.data[1:].split(',') if len(data_split) == 1: self.newdata_dict = { 'd_type': 2, 'high_field': 1, 'hifield_value': float(self.data_dict['field_value']) - } + } else: self.newdata_dict = { 'd_type': 2, 'hifield_value': float(data_split[1]) - } + } self.data_dict.update(self.newdata_dict) return self.data_dict # "status" sentence - elif param=='ST': + elif param == 'ST': faultcode = int(self.data.split(',')[-1].split('*')[0], 16) data_split = [int(i) for i in self.data.split(',')[1:-1]] @@ -170,13 +170,13 @@ def read_data(self): 'g_timer': data_split[8], 'allclear_timer': data_split[9], 'faultcode': faultcode - } + } self.data_dict.update(self.newdata_dict) return self.data_dict # disregard "alarm timers" sentence but update sentence type - elif param=='WT': + elif param == 'WT': self.newdata_dict = {'d_type': 4} self.data_dict.update(self.newdata_dict) return self.data_dict From 4d0d2dd8990f3debd940880b02a91779f2409166 Mon Sep 17 00:00:00 2001 From: Felipe Carrero Date: Sun, 24 Mar 2024 16:27:54 +0000 Subject: [PATCH 06/16] Added modifications suggested by Brian Koopman --- socs/agents/ld_monitor/agent.py | 852 ++++++++++++++++---------------- 1 file changed, 428 insertions(+), 424 deletions(-) diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py index c96035744..7f9afe252 100644 --- a/socs/agents/ld_monitor/agent.py +++ b/socs/agents/ld_monitor/agent.py @@ -1,424 +1,428 @@ -import argparse -import os -import socket -import time - -import numpy -import txaio -from ocs import ocs_agent, site_config -from ocs.ocs_twisted import Pacemaker, TimeoutLock - -verbosity = False - - -class ld_monitor: - """Receives and decodes data of the lightning detector via UDP - - Parameters - ---------- - host : str - Address of the computer reading the data. - port : int - Port of host where data will be received, default 1110. - verbose : boolean - Defines verbosity of the function (debug purposes). - - Attributes - ---------- - verbose : bool - Defines verbosity for debugging purposes - host : string - Defines the host where data will be received (where the agent is to be ran) - port : int - Port number in the local host to be bound to receive the data - sockopen : bool - Indicates when the socket is open - inittime : float - Logs the time at which initialization was carried out - data_dict : dictionary - Raw data received from the lightning detector - newdata_dict : dictioanry - The dictionary where new data is received - """ - - def __init__(self, port=1110, verbose=verbosity): - self.verbose = verbose - self.port = port - - # get localhost ip - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(("8.8.8.8", 80)) - self.host = s.getsockname()[0] - - if hasattr(self, 'sockopen'): - self.sock.close() - - # open and bind socket to receive lightning detector data - try: - self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - except BaseException: - print('Failed to create socket') - - try: - self.sock.bind((self.host, self.port)) - self.sockopen = True - self.inittime = time.time() - except BaseException: - print('Failed to bind socket') - - # initialize variables to account for absence of previous data - self.data_dict = { - 'd_type': numpy.nan, - 'field_value': numpy.nan, - 'rot_fault': 0, - 'time_last': -1., - 'tsince_last': -1., - 'dist': -1, - 'unit_d': 0, - 'high_field': -1, - 'hifield_value': -1000., - 'alarm_r': 0, - 'alarm_o': 0, - 'alarm_y': 0, - 'delay_g': 1, - 'clear': 0, - 'r_timer': 0, - 'o_timer': 0, - 'y_timer': 0, - 'g_timer': 0, - 'allclear_timer': 0, - 'faultcode': 0 - } - - if self.verbose: - print('ld_monitor function monitor initialized') - - def read_data(self): - """ - Receives data from the lightning detector via UDP, - and returns a dictionary containing formatted data - """ - - self.data, _ = self.sock.recvfrom(1024) - self.data = self.data.decode('utf-8') - - # "e-field" sentence - if self.data[0] == '$': - data_split = self.data[1:].split(',') - rot_fault = int(data_split[1].split('*')[0]) - self.newdata_dict = { - 'd_type': 0, - 'field_value': float(data_split[0]), - 'rot_fault': rot_fault - } - self.data_dict.update(self.newdata_dict) - return self.data_dict - - elif self.data[0] == '@': - param = self.data[1:3] - - # "lightning strike" sentence - if param == 'LI': - data_split = self.data.split(',')[1:] - if data_split[2].split('*')[0] == 'Miles': - unit_d = 0 - elif data_split[2].split('*')[0] == 'Km': - unit_d = 1 - - self.newdata_dict = { - 'd_type': 1, - 'time_last': time.time(), - 'dist': int(data_split[1]), - 'unit_d': unit_d - } - self.data_dict.update(self.newdata_dict) - - return self.data_dict - - # "high e-field" sentence, account for 2 types - elif param == 'HF': - data_split = self.data[1:].split(',') - if len(data_split) == 1: - self.newdata_dict = { - 'd_type': 2, - 'high_field': 1, - 'hifield_value': float(self.data_dict['field_value']) - } - else: - self.newdata_dict = { - 'd_type': 2, - 'hifield_value': float(data_split[1]) - } - self.data_dict.update(self.newdata_dict) - return self.data_dict - - # "status" sentence - elif param == 'ST': - faultcode = int(self.data.split(',')[-1].split('*')[0], 16) - data_split = [int(i) for i in self.data.split(',')[1:-1]] - - self.newdata_dict = { - 'd_type': 3, - 'alarm_r': data_split[0], - 'alarm_o': data_split[1], - 'alarm_y': data_split[2], - 'delay_g': data_split[3], - 'clear': data_split[4], - 'r_timer': data_split[5], - 'o_timer': data_split[6], - 'y_timer': data_split[7], - 'g_timer': data_split[8], - 'allclear_timer': data_split[9], - 'faultcode': faultcode - } - - self.data_dict.update(self.newdata_dict) - return self.data_dict - - # disregard "alarm timers" sentence but update sentence type - elif param == 'WT': - self.newdata_dict = {'d_type': 4} - self.data_dict.update(self.newdata_dict) - return self.data_dict - - def read_cycle(self): - """ - In each cycle data is read and then parsed following - the format required to publish data to the ocs feed - """ - try: - cycle_data = {} - self.read_data() - - # updates time since last strike if previous strike data exists - if self.data_dict['time_last'] == -1.: - self.data_dict['tsince_last'] = -1. - else: - self.data_dict['tsince_last'] = (time.time() - - self.data_dict['time_last']) - - # parse data to ocs agent feed format - for key in self.data_dict: - cycle_data[key] = {'value': self.data_dict[key]} - - if self.verbose: - print(cycle_data) - return cycle_data - - except BaseException: - pass - if self.verbose: - print('Passing to next data iteration') - - -class ld_monitorAgent: - """Monitor the Lightning Detector data via UDP. - - Parameters - ---------- - agent : OCSAgent - OCSAgent object which forms this Agent - unit : int - sample_interval : float - Time between samples in seconds. - - Attributes - ---------- - agent : OCSAgent - OCSAgent object which forms this Agent - take_data : bool - Tracks whether or not the agent is actively issuing SNMP GET commands - to the ibootbar. Setting to false stops sending commands. - log : txaio.tx.Logger - txaio logger object, created by the OCSAgent - """ - - def __init__(self, agent, unit=1, sample_interval=15.): - - self.unit = unit - self.agent: ocs_agent.OCSAgent = agent - self.log = agent.log - self.lock = TimeoutLock() - - self.pacemaker_freq = 1. / sample_interval - - self.initialized = False - self.take_data = False - - self.ld_monitor = None - - agg_params = { - 'frame_length': 10 * 60 # [sec] - } - self.agent.register_feed('ld_monitor', - record=True, - agg_params=agg_params, - buffer_time=0) - - def _connect(self): - """connect() - Instantiates LD object and check if client is open - """ - self.ld_monitor = ld_monitor(verbose=verbosity) - self.initialized = True - - @ocs_agent.param('auto_acquire', default=False, type=bool) - def init_ld_monitor(self, session, params=None): - """ - Perform first time setup of the LD. - - Parameters: - auto_acquire (bool, optional): Starts data acquisition after - initialization if True. Defaults to False. - - """ - - if self.initialized: - return True, "Already initialized." - - with self.lock.acquire_timeout(3, job='init') as acquired: - if not acquired: - self.log.warn("Could not start init because " - "{} is already running".format(self.lock.job)) - return False, "Could not acquire lock." - - session.set_status('starting') - - self._connect() - if not self.initialized: - return False, 'Could not connect to LD' - - # Start data acquisition if requested - if params['auto_acquire']: - self.agent.start('acq') - return True, 'LD initialized.' - - @ocs_agent.param('_') - def acq(self, session, params=None): - """acq() - - Starts the data acquisition process - - """ - with self.lock.acquire_timeout(0, job='acq') as acquired: - if not acquired: - self.log.warn("Could not start acq because {} is already running" - .format(self.lock.job)) - return False, "Could not acquire lock." - - session.set_status('running') - - self.take_data = True - - session.data = {"fields": {}} - - pm = Pacemaker(self.pacemaker_freq) - while self.take_data: - pm.sleep() - - current_time = time.time() - data = { - 'timestamp': current_time, - 'connection': {}, - 'block_name': 'registers', - 'data': {} - } - if not self.ld_monitor.sockopen: - self.initialized = False - - # Try to re-initialize if connection lost - if not self.initialized: - self._connect() - - # Only get readings if connected - if self.initialized: - session.data.update({'connection': {'last_attempt': time.time(), - 'connected': True}}) - - regdata = self.ld_monitor.read_cycle() - - if regdata: - for reg in regdata: - data['data'][reg] = regdata[reg]["value"] - field_dict = {reg: regdata[reg]['value']} - session.data['fields'].update(field_dict) - session.data.update({'timestamp': current_time}) - else: - self.log.info('Connection error or error in processing data.') - self.initialized = False - - # Continue trying to connect - if not self.initialized: - session.data.update({'connection': {'last_attempt': time.time(), - 'connected': False}}) - self.log.info('Trying to reconnect.') - continue - - for field, val in data['data'].items(): - _data = { - 'timestamp': current_time, - 'block_name': field, - 'data': {field: val} - } - self.agent.publish_to_feed('ld_monitor', _data) - - self.agent.feeds['ld_monitor'].flush_buffer() - - return True, 'Acquisition exited cleanly.' - - def _stop_acq(self, session, params=None): - """ - Stops acq process. - """ - if self.take_data: - self.take_data = False - return True, 'requested to stop taking data.' - else: - return False, 'acq is not currently running' - - -def make_parser(parser=None): - if parser is None: - parser = argparse.ArgumentParser() - - pgroup = parser.add_argument_group('Agent Options') - pgroup.add_argument("--unit", default=1, - help="unit to listen to.") - pgroup.add_argument('--mode', type=str, choices=['idle', 'init', 'acq'], - help="Starting action for the agent.") - pgroup.add_argument("--sample-interval", type=float, default=15., help="Time between samples in seconds.") - - return parser - - -def main(args=None): - # Start logging - txaio.start_logging(level=os.environ.get("LOGLEVEL", "info")) - - parser = make_parser() - - # Interpret options in the context of site_config. - args = site_config.parse_args(agent_class='ld_monitor', - parser=parser, - args=args) - - # Automatically acquire data if requested (default) - init_params = False - if args.mode == 'init': - init_params = {'auto_acquire': False} - elif args.mode == 'acq': - init_params = {'auto_acquire': True} - print('init_params', init_params) - agent, runner = ocs_agent.init_site_agent(args) - - p = ld_monitorAgent(agent, - unit=int(args.unit), - sample_interval=args.sample_interval) - agent.register_task('init_ld_monitor', p.init_ld_monitor, - startup=init_params) - agent.register_process('acq', p.acq, p._stop_acq) - runner.run(agent, auto_reconnect=True) - - -if __name__ == '__main__': - main() +import argparse +import os +import socket +import time + +import numpy +import txaio +from ocs import ocs_agent, site_config +from ocs.ocs_twisted import Pacemaker, TimeoutLock + + +class LDMonitor: + """Receives and decodes data of the lightning detector via UDP + + Parameters + ---------- + host : str + Address of the computer reading the data. + port : int + Port of host where data will be received, default 1110. + + Attributes + ---------- + port : int + Port number in the local host to be bound to receive the data + log : txaio.tx.Logger + txaio logger object, created by the OCSAgent + sockopen : bool + Indicates when the socket is open + inittime : float + Logs the time at which initialization was carried out + data_dict : dictionary + Dictionary data stored from the lightning detector + newdata_dict : dictioanry + Dictionary where new data is received + """ + + def __init__(self, port=1110): + self.port = port + self.log = txaio.make_logger() + + # check if socket has been opened + if hasattr(self,'sockopen'): + if self.sockopen==True: + self.sock.close() + self.log.info('Socket closed preemptively') + + # open and bing socket to receieve lightning detector data + try: + self.log.info('Opening socket') + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) + except BaseException: + self.log.info('Failed to create socket') + + try: + self.log.info('Binding socket') + self.sock.bind(('', self.port)) + self.sockopen = True + self.inittime = time.time() + except BaseException: + self.log.info('Failed to bind socket') + + # initialize variables to account for absence of previous data + self.data_dict = { + 'd_type': numpy.nan, + 'field_value': numpy.nan, + 'rot_fault': 0, + 'time_last': -1., + 'tsince_last': -1., + 'dist': -1, + 'unit_d': 0, + 'high_field': -1, + 'hifield_value': -1000., + 'alarm_r': 0, + 'alarm_o': 0, + 'alarm_y': 0, + 'delay_g': 1, + 'clear': 0, + 'r_timer': 0, + 'o_timer': 0, + 'y_timer': 0, + 'g_timer': 0, + 'allclear_timer': 0, + 'faultcode': 0 + } + + self.log.info('LDMonitor function initialized') + + def read_data(self): + """ + Receives data from the lightning detector via UDP, + and returns a dictionary containing formatted data + """ + + self.data, _ = self.sock.recvfrom(1024) + self.data = self.data.decode('utf-8') + + # "e-field" sentence + if self.data[0] == '$': + data_split = self.data[1:].split(',') + rot_fault = int(data_split[1].split('*')[0]) + self.newdata_dict = { + 'd_type': 0, + 'field_value': float(data_split[0]), + 'rot_fault': rot_fault + } + self.data_dict.update(self.newdata_dict) + return self.data_dict + + elif self.data[0] == '@': + param = self.data[1:3] + + # "lightning strike" sentence + if param == 'LI': + data_split = self.data.split(',')[1:] + if data_split[2].split('*')[0] == 'Miles': + unit_d = 0 + elif data_split[2].split('*')[0] == 'Km': + unit_d = 1 + + self.newdata_dict = { + 'd_type': 1, + 'time_last': time.time(), + 'dist': int(data_split[1]), + 'unit_d': unit_d + } + self.data_dict.update(self.newdata_dict) + + self.log.info('Lightning strike detected!') + + return self.data_dict + + # "high e-field" sentence, account for 2 types + elif param == 'HF': + data_split = self.data[1:].split(',') + if len(data_split) == 1: + self.newdata_dict = { + 'd_type': 2, + 'high_field': 1, + 'hifield_value': float(self.data_dict['field_value']) + } + else: + self.newdata_dict = { + 'd_type': 2, + 'hifield_value': float(data_split[1]) + } + self.data_dict.update(self.newdata_dict) + return self.data_dict + + # "status" sentence + elif param == 'ST': + faultcode = int(self.data.split(',')[-1].split('*')[0], 16) + data_split = [int(i) for i in self.data.split(',')[1:-1]] + + self.newdata_dict = { + 'd_type': 3, + 'alarm_r': data_split[0], + 'alarm_o': data_split[1], + 'alarm_y': data_split[2], + 'delay_g': data_split[3], + 'clear': data_split[4], + 'r_timer': data_split[5], + 'o_timer': data_split[6], + 'y_timer': data_split[7], + 'g_timer': data_split[8], + 'allclear_timer': data_split[9], + 'faultcode': faultcode + } + + self.data_dict.update(self.newdata_dict) + return self.data_dict + + # disregard "alarm timers" sentence but update sentence type + elif param == 'WT': + self.newdata_dict = {'d_type': 4} + self.data_dict.update(self.newdata_dict) + return self.data_dict + + def read_cycle(self): + """ + In each cycle data is read and then parsed following + the format required to publish data to the ocs feed + """ + try: + self.read_data() + + # updates time since last strike if strike data exists + if self.data_dict['time_last'] == -1.: + self.data_dict['tsince_last'] = -1. + else: + self.data_dict['tsince_last'] = (time.time() + - self.data_dict['time_last']) + + return self.data_dict + + except BaseException: + pass + self.log.info('LD data read error, passing to next data iteration') + + +class LDMonitorAgent: + """Monitor the Lightning Detector data via UDP. + + Parameters + ---------- + agent : OCSAgent + OCSAgent object which forms this Agent + unit : int + sample_interval : float + Time between samples in seconds. + + Attributes + ---------- + agent : OCSAgent + OCSAgent object which forms this Agent + take_data : bool + Tracks whether or not the agent is actively issuing SNMP GET commands + to the ibootbar. Setting to false stops sending commands. + log : txaio.tx.Logger + txaio logger object, created by the OCSAgent + """ + + def __init__(self, agent, sample_interval=15.): + + self.agent: ocs_agent.OCSAgent = agent + self.log = agent.log + self.lock = TimeoutLock() + + self.pacemaker_freq = 1. / sample_interval + + self.initialized = False + self.take_data = False + + self.LDMonitor = None + + agg_params = { + 'frame_length': 10 * 60 # [sec] + } + self.agent.register_feed('LDMonitor', + record=True, + agg_params=agg_params, + buffer_time=0) + + def _connect(self): + """connect() + Instantiates LD object and check if client is open + """ + self.LDMonitor = LDMonitor() + self.initialized = True + + @ocs_agent.param('auto_acquire', default=False, type=bool) + def init_LDMonitor(self, session, params=None): + """ + **Task** - Perform first time setup of the LD. + + Parameters: + auto_acquire (bool, optional): Starts data acquisition after + initialization if True. Defaults to False. + + """ + + if self.initialized: + return True, "Already initialized." + + with self.lock.acquire_timeout(3, job='init') as acquired: + if not acquired: + self.log.warn("Could not start init because " + "{} is already running".format(self.lock.job)) + return False, "Could not acquire lock." + + session.set_status('starting') + + self._connect() + if not self.initialized: + return False, 'Could not connect to LD' + + # Start data acquisition if requested + if params['auto_acquire']: + self.agent.start('acq') + return True, 'LD initialized.' + + @ocs_agent.param('_') + def acq(self, session, params=None): + """acq() + + **Process** - Starts the data acquisition process + + Notes + _____ + The most recent data collected is stored in session data in the + structure:: + + >>> response.session['data'] + {'fields':{ + 'd_type': 3, 'field_value': 0.28, 'rot_fault': 0, + 'time_last': -1.0, 'tsince_last': -1.0, 'dist': -1, 'unit_d': 0, + 'high_field': -1, 'hifield_value': -1000.0, 'alarm_r': 0, + 'alarm_o': 0, 'alarm_y': 0, 'delay_g': 1, 'clear': 1, 'r_timer': 0, + 'o_timer': 0, 'y_timer': 0, 'g_timer': 0, 'allclear_timer': 0, + 'faultcode': 0 + }, + ... + 'connection': { + 'conn_timestamp': 1711285858.1063662, + 'connected': True}, 'data_timestamp': 1711285864.6254003 + } + } + + """ + with self.lock.acquire_timeout(0, job='acq') as acquired: + if not acquired: + self.log.warn("Could not start acq because {} is already running" + .format(self.lock.job)) + return False, "Could not acquire lock." + + session.set_status('running') + + self.take_data = True + + session.data = {"fields": {}} + + pm = Pacemaker(self.pacemaker_freq) + while self.take_data: + pm.sleep() + + current_time = time.time() + data = { + 'timestamp': current_time, + 'connection': {}, + 'block_name': 'registers', + 'data': {} + } + if not self.LDMonitor.sockopen: + self.initialized = False + + # Try to re-initialize if connection lost + if not self.initialized: + self._connect() + + # Only get readings if connected + if self.initialized: + session.data.update({'connection': {'conn_timestamp': self.LDMonitor.inittime, + 'connected': True}}) + + ld_data = self.LDMonitor.read_cycle() + + if ld_data: + for key, value in ld_data.items(): + data['data'][key]=value + session.data.update({'data_timestamp':current_time, + 'fields':ld_data}) + self.log.debug(ld_data) + else: + self.log.info('Connection error or error in processing data.') + self.initialized = False + + # Continue trying to connect + if not self.initialized: + session.data.update({'connection': {'last_attempt': time.time(), + 'connected': False}}) + self.log.info('Trying to reconnect.') + continue + + for field, val in data['data'].items(): + _data = { + 'timestamp': current_time, + 'block_name': field, + 'data': {field: val} + } + self.agent.publish_to_feed('LDMonitor', _data) + + self.agent.feeds['LDMonitor'].flush_buffer() + + return True, 'Acquisition exited cleanly.' + + def _stop_acq(self, session, params=None): + """ + Stops acq process. + """ + if self.take_data: + self.take_data = False + return True, 'requested to stop taking data.' + else: + return False, 'acq is not currently running' + + +def make_parser(parser=None): + if parser is None: + parser = argparse.ArgumentParser() + + pgroup = parser.add_argument_group('Agent Options') + pgroup.add_argument('--mode', type=str, choices=['idle', 'init', 'acq'], + help="Starting action for the agent.") + pgroup.add_argument("--sample-interval", type=float, default=.2, help="Time between samples in seconds.") + + return parser + + +def main(args=None): + # Start logging + txaio.start_logging(level=os.environ.get("LOGLEVEL", "info")) + + parser = make_parser() + + # Interpret options in the context of site_config. + args = site_config.parse_args(agent_class='LDMonitor', + parser=parser, + args=args) + + # Automatically acquire data if requested (default) + init_params = False + if args.mode == 'init': + init_params = {'auto_acquire': False} + elif args.mode == 'acq': + init_params = {'auto_acquire': True} + print('init_params', init_params) + agent, runner = ocs_agent.init_site_agent(args) + + p = LDMonitorAgent(agent,sample_interval=args.sample_interval) + agent.register_task('init_LDMonitor', p.init_LDMonitor, + startup=init_params) + agent.register_process('acq', p.acq, p._stop_acq) + runner.run(agent, auto_reconnect=True) + + +if __name__ == '__main__': + main() From a1e730298de2379e169d07fcb05b21a4102473f0 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 24 Mar 2024 16:29:22 +0000 Subject: [PATCH 07/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- socs/agents/ld_monitor/agent.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py index 7f9afe252..1fc21643a 100644 --- a/socs/agents/ld_monitor/agent.py +++ b/socs/agents/ld_monitor/agent.py @@ -40,16 +40,16 @@ def __init__(self, port=1110): self.log = txaio.make_logger() # check if socket has been opened - if hasattr(self,'sockopen'): - if self.sockopen==True: + if hasattr(self, 'sockopen'): + if self.sockopen: self.sock.close() self.log.info('Socket closed preemptively') - + # open and bing socket to receieve lightning detector data try: self.log.info('Opening socket') self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except BaseException: self.log.info('Failed to create socket') @@ -126,7 +126,7 @@ def read_data(self): 'unit_d': unit_d } self.data_dict.update(self.newdata_dict) - + self.log.info('Lightning strike detected!') return self.data_dict @@ -184,7 +184,7 @@ def read_cycle(self): """ try: self.read_data() - + # updates time since last strike if strike data exists if self.data_dict['time_last'] == -1.: self.data_dict['tsince_last'] = -1. @@ -285,7 +285,7 @@ def acq(self, session, params=None): """acq() **Process** - Starts the data acquisition process - + Notes _____ The most recent data collected is stored in session data in the @@ -299,7 +299,7 @@ def acq(self, session, params=None): 'alarm_o': 0, 'alarm_y': 0, 'delay_g': 1, 'clear': 1, 'r_timer': 0, 'o_timer': 0, 'y_timer': 0, 'g_timer': 0, 'allclear_timer': 0, 'faultcode': 0 - }, + }, ... 'connection': { 'conn_timestamp': 1711285858.1063662, @@ -344,12 +344,12 @@ def acq(self, session, params=None): 'connected': True}}) ld_data = self.LDMonitor.read_cycle() - + if ld_data: for key, value in ld_data.items(): - data['data'][key]=value - session.data.update({'data_timestamp':current_time, - 'fields':ld_data}) + data['data'][key] = value + session.data.update({'data_timestamp': current_time, + 'fields': ld_data}) self.log.debug(ld_data) else: self.log.info('Connection error or error in processing data.') @@ -417,7 +417,7 @@ def main(args=None): print('init_params', init_params) agent, runner = ocs_agent.init_site_agent(args) - p = LDMonitorAgent(agent,sample_interval=args.sample_interval) + p = LDMonitorAgent(agent, sample_interval=args.sample_interval) agent.register_task('init_LDMonitor', p.init_LDMonitor, startup=init_params) agent.register_process('acq', p.acq, p._stop_acq) From f575fd291d44be94a44a58184aabcf800f70d424 Mon Sep 17 00:00:00 2001 From: Felipe Carrero Date: Sun, 24 Mar 2024 18:19:37 +0000 Subject: [PATCH 08/16] another set of minor corrections after BK's review --- socs/agents/ld_monitor/agent.py | 1 - 1 file changed, 1 deletion(-) diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py index 7f9afe252..6263f19d5 100644 --- a/socs/agents/ld_monitor/agent.py +++ b/socs/agents/ld_monitor/agent.py @@ -328,7 +328,6 @@ def acq(self, session, params=None): data = { 'timestamp': current_time, 'connection': {}, - 'block_name': 'registers', 'data': {} } if not self.LDMonitor.sockopen: From 484f2223c24790b0693be886c68562f594cabc7a Mon Sep 17 00:00:00 2001 From: Felipe Carrero Date: Sun, 24 Mar 2024 19:43:30 +0000 Subject: [PATCH 09/16] final changes after PR review --- socs/agents/ld_monitor/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py index 4568e8674..8dea688d6 100644 --- a/socs/agents/ld_monitor/agent.py +++ b/socs/agents/ld_monitor/agent.py @@ -171,7 +171,7 @@ def read_data(self): self.data_dict.update(self.newdata_dict) return self.data_dict - # disregard "alarm timers" sentence but update sentence type + # disregard "alarm timers" sentence, but update sentence type elif param == 'WT': self.newdata_dict = {'d_type': 4} self.data_dict.update(self.newdata_dict) From 219bcc2129eb4653a726887d11125d3a347ee891 Mon Sep 17 00:00:00 2001 From: Felipe Carrero Date: Sun, 24 Mar 2024 20:16:09 +0000 Subject: [PATCH 10/16] removed unused docstrings --- socs/agents/ld_monitor/agent.py | 1 - 1 file changed, 1 deletion(-) diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py index 8dea688d6..d315708c0 100644 --- a/socs/agents/ld_monitor/agent.py +++ b/socs/agents/ld_monitor/agent.py @@ -206,7 +206,6 @@ class LDMonitorAgent: ---------- agent : OCSAgent OCSAgent object which forms this Agent - unit : int sample_interval : float Time between samples in seconds. From c8c765545088466f698a6b0af4a8ede461eb006b Mon Sep 17 00:00:00 2001 From: Felipe Carrero Date: Wed, 1 May 2024 20:48:14 +0000 Subject: [PATCH 11/16] Function name modifications on agent, added .rst --- socs/agents/ld_monitor/agent.py | 14 +++--- socs/agents/ld_monitor/ld_monitor.rst | 68 +++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 6 deletions(-) create mode 100644 socs/agents/ld_monitor/ld_monitor.rst diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py index d315708c0..d815cb615 100644 --- a/socs/agents/ld_monitor/agent.py +++ b/socs/agents/ld_monitor/agent.py @@ -236,7 +236,7 @@ def __init__(self, agent, sample_interval=15.): agg_params = { 'frame_length': 10 * 60 # [sec] } - self.agent.register_feed('LDMonitor', + self.agent.register_feed('ld_monitor', record=True, agg_params=agg_params, buffer_time=0) @@ -245,12 +245,14 @@ def _connect(self): """connect() Instantiates LD object and check if client is open """ + self.LDMonitor = LDMonitor() self.initialized = True @ocs_agent.param('auto_acquire', default=False, type=bool) - def init_LDMonitor(self, session, params=None): - """ + def init_ld_monitor(self, session, params=None): + """init_ld_monitor(auto_acquire=False) + **Task** - Perform first time setup of the LD. Parameters: @@ -366,9 +368,9 @@ def acq(self, session, params=None): 'block_name': field, 'data': {field: val} } - self.agent.publish_to_feed('LDMonitor', _data) + self.agent.publish_to_feed('ld_monitor', _data) - self.agent.feeds['LDMonitor'].flush_buffer() + self.agent.feeds['ld_monitor'].flush_buffer() return True, 'Acquisition exited cleanly.' @@ -416,7 +418,7 @@ def main(args=None): agent, runner = ocs_agent.init_site_agent(args) p = LDMonitorAgent(agent, sample_interval=args.sample_interval) - agent.register_task('init_LDMonitor', p.init_LDMonitor, + agent.register_task('init_ld_monitor', p.init_ld_monitor, startup=init_params) agent.register_process('acq', p.acq, p._stop_acq) runner.run(agent, auto_reconnect=True) diff --git a/socs/agents/ld_monitor/ld_monitor.rst b/socs/agents/ld_monitor/ld_monitor.rst new file mode 100644 index 000000000..6004878b3 --- /dev/null +++ b/socs/agents/ld_monitor/ld_monitor.rst @@ -0,0 +1,68 @@ +.. highlight:: rst + +======================== +Lightning Detector Agent +======================== + +The lightning detector agent communicates with the Lightning Detector System at the site and parses the data to obtain approximate lightning strike distances and standardized alarm levels. + +.. argparse:: + :module: socs.agents.ld_monitor.agent.LDMonitorAgent + :func: make_parser + :prog: agent.py + +Configuration File Examples +--------------------------- + +Below are configuration examples for the ocs config file and for running the +Agent in a docker container. + +OCS Site Config +```````````````` + +An example site-config-file block:: + + {'agent-class': 'LDMonitorAgent', + 'instance-id': 'ld_monitor', + 'arguments': [['--mode', 'acq']}, + +Docker Compose +`````````````` + +An example docker-compose configuration:: + + ocs-template: + image: simonsobs/ocs-lightning-detector:latest + hostname: ocs-docker + environment: + - LOGLEVEL=info + volumes: + - ${OCS_CONFIG_DIR}:/config + +Description +----------- + +The Lightning Detector System is connnected through serial communication with a dedicated PC at the site, in which a propietary application calculates approximate lightning strike distances and adjusts alarm levels accordingly. Data is parsed and the most important parameters are updated. The dedicated PC is continously running a script that streams the data via UDP to the client. + +Transmitted data +---------------- + +The lightning detector transmits its data in "sentences". There are 5 types of expected sentences: +-electric field +-lightning strike +-high-field +-status +-alarm timers +Electric field sentences report the electric field value measured by the Electric Field Mill in kV/m. Strike sentences include lightning strike distance and units (meters or miles) and is only transmitted if a strike is detected. High field sentences report an alarm status with respect to set thresholds of electric field. Status sentences include data such as alarms (red, orange, yellow), remaining timers, all clear status, fault codes, among others. Alarm timers sentences are disregarded, as its information is redundant. Each of the sentences' data is parsed and data updated to the feed. + +Agent API +--------- + +.. autoclass:: socs.agents.ld_monitor.agent.LDMonitorAgent + :members: + +Supporting APIs +--------------- + +.. autoclass:: socs.agents.ld_monitor.agent.LDMonitor + :members: From 80db4dbb76a56bfd6a81601eea915cd22b58a65e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 1 May 2024 20:50:20 +0000 Subject: [PATCH 12/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- socs/agents/ld_monitor/ld_monitor.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/socs/agents/ld_monitor/ld_monitor.rst b/socs/agents/ld_monitor/ld_monitor.rst index 6004878b3..72bc49514 100644 --- a/socs/agents/ld_monitor/ld_monitor.rst +++ b/socs/agents/ld_monitor/ld_monitor.rst @@ -47,7 +47,7 @@ The Lightning Detector System is connnected through serial communication with a Transmitted data ---------------- -The lightning detector transmits its data in "sentences". There are 5 types of expected sentences: +The lightning detector transmits its data in "sentences". There are 5 types of expected sentences: -electric field -lightning strike -high-field From 663ca2618916bcf45c2fa8ddd5f83b80200dc596 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 6 May 2024 23:58:48 -0400 Subject: [PATCH 13/16] Move rst file into docs directory --- {socs/agents/ld_monitor => docs/agents}/ld_monitor.rst | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {socs/agents/ld_monitor => docs/agents}/ld_monitor.rst (100%) diff --git a/socs/agents/ld_monitor/ld_monitor.rst b/docs/agents/ld_monitor.rst similarity index 100% rename from socs/agents/ld_monitor/ld_monitor.rst rename to docs/agents/ld_monitor.rst From e8e94ad7db8c2a379c651321c26daa7d96d7cfae Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 6 May 2024 23:59:05 -0400 Subject: [PATCH 14/16] Add docs page to index --- docs/index.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/index.rst b/docs/index.rst index 7d8e746dd..dcb51b8f3 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -62,6 +62,7 @@ API Reference Full API documentation for core parts of the SOCS library. agents/lakeshore372 agents/lakeshore425 agents/latrt_xy_stage + agents/ld_monitor agents/magpie agents/meinberg_m1000_agent agents/pfeiffer From fbe2ff1e7ed00763ecd4d5d9cfbb8da86b99d89b Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Tue, 7 May 2024 00:08:21 -0400 Subject: [PATCH 15/16] Fix build warnings and format docs --- docs/agents/ld_monitor.rst | 46 ++++++++++++++++++++++----------- socs/agents/ld_monitor/agent.py | 30 ++++++++++----------- 2 files changed, 46 insertions(+), 30 deletions(-) diff --git a/docs/agents/ld_monitor.rst b/docs/agents/ld_monitor.rst index 72bc49514..c82e9a982 100644 --- a/docs/agents/ld_monitor.rst +++ b/docs/agents/ld_monitor.rst @@ -4,10 +4,12 @@ Lightning Detector Agent ======================== -The lightning detector agent communicates with the Lightning Detector System at the site and parses the data to obtain approximate lightning strike distances and standardized alarm levels. +The lightning detector agent communicates with the Lightning Detector System at +the site and parses the data to obtain approximate lightning strike distances +and standardized alarm levels. .. argparse:: - :module: socs.agents.ld_monitor.agent.LDMonitorAgent + :module: socs.agents.ld_monitor.agent :func: make_parser :prog: agent.py @@ -23,8 +25,8 @@ OCS Site Config An example site-config-file block:: {'agent-class': 'LDMonitorAgent', - 'instance-id': 'ld_monitor', - 'arguments': [['--mode', 'acq']}, + 'instance-id': 'ld_monitor', + 'arguments': [['--mode', 'acq']}, Docker Compose `````````````` @@ -32,7 +34,7 @@ Docker Compose An example docker-compose configuration:: ocs-template: - image: simonsobs/ocs-lightning-detector:latest + image: simonsobs/socs:latest hostname: ocs-docker environment: - LOGLEVEL=info @@ -42,18 +44,32 @@ An example docker-compose configuration:: Description ----------- -The Lightning Detector System is connnected through serial communication with a dedicated PC at the site, in which a propietary application calculates approximate lightning strike distances and adjusts alarm levels accordingly. Data is parsed and the most important parameters are updated. The dedicated PC is continously running a script that streams the data via UDP to the client. +The Lightning Detector System is connnected through serial communication with a +dedicated PC at the site, in which a propietary application calculates +approximate lightning strike distances and adjusts alarm levels accordingly. +Data is parsed and the most important parameters are updated. The dedicated PC +is continously running a script that streams the data via UDP to the client. -Transmitted data ----------------- +Transmitted Data +```````````````` -The lightning detector transmits its data in "sentences". There are 5 types of expected sentences: --electric field --lightning strike --high-field --status --alarm timers -Electric field sentences report the electric field value measured by the Electric Field Mill in kV/m. Strike sentences include lightning strike distance and units (meters or miles) and is only transmitted if a strike is detected. High field sentences report an alarm status with respect to set thresholds of electric field. Status sentences include data such as alarms (red, orange, yellow), remaining timers, all clear status, fault codes, among others. Alarm timers sentences are disregarded, as its information is redundant. Each of the sentences' data is parsed and data updated to the feed. +The lightning detector transmits its data in "sentences". There are 5 types of +expected sentences: + +* electric field +* lightning strike +* high-field +* status +* alarm timers + +Electric field sentences report the electric field value measured by the +Electric Field Mill in kV/m. Strike sentences include lightning strike distance +and units (meters or miles) and is only transmitted if a strike is detected. +High field sentences report an alarm status with respect to set thresholds of +electric field. Status sentences include data such as alarms (red, orange, +yellow), remaining timers, all clear status, fault codes, among others. Alarm +timers sentences are disregarded, as its information is redundant. Each of the +sentences' data are parsed and published to the feed. Agent API --------- diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py index d815cb615..2816fa475 100644 --- a/socs/agents/ld_monitor/agent.py +++ b/socs/agents/ld_monitor/agent.py @@ -292,21 +292,21 @@ def acq(self, session, params=None): The most recent data collected is stored in session data in the structure:: - >>> response.session['data'] - {'fields':{ - 'd_type': 3, 'field_value': 0.28, 'rot_fault': 0, - 'time_last': -1.0, 'tsince_last': -1.0, 'dist': -1, 'unit_d': 0, - 'high_field': -1, 'hifield_value': -1000.0, 'alarm_r': 0, - 'alarm_o': 0, 'alarm_y': 0, 'delay_g': 1, 'clear': 1, 'r_timer': 0, - 'o_timer': 0, 'y_timer': 0, 'g_timer': 0, 'allclear_timer': 0, - 'faultcode': 0 - }, - ... - 'connection': { - 'conn_timestamp': 1711285858.1063662, - 'connected': True}, 'data_timestamp': 1711285864.6254003 - } - } + >>> response.session['data'] + {'fields':{ + 'd_type': 3, 'field_value': 0.28, 'rot_fault': 0, + 'time_last': -1.0, 'tsince_last': -1.0, 'dist': -1, 'unit_d': 0, + 'high_field': -1, 'hifield_value': -1000.0, 'alarm_r': 0, + 'alarm_o': 0, 'alarm_y': 0, 'delay_g': 1, 'clear': 1, 'r_timer': 0, + 'o_timer': 0, 'y_timer': 0, 'g_timer': 0, 'allclear_timer': 0, + 'faultcode': 0 + }, + ... + 'connection': { + 'conn_timestamp': 1711285858.1063662, + 'connected': True}, 'data_timestamp': 1711285864.6254003 + } + } """ with self.lock.acquire_timeout(0, job='acq') as acquired: From a278a1aa7be7729534b7c3be56078aa3af197ddf Mon Sep 17 00:00:00 2001 From: Felipe Carrero Date: Mon, 6 Jan 2025 15:53:13 +0000 Subject: [PATCH 16/16] fixed bug when reading lightning strike sentences --- socs/agents/ld_monitor/agent.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/socs/agents/ld_monitor/agent.py b/socs/agents/ld_monitor/agent.py index 2816fa475..f6d901c36 100644 --- a/socs/agents/ld_monitor/agent.py +++ b/socs/agents/ld_monitor/agent.py @@ -31,7 +31,7 @@ class LDMonitor: Logs the time at which initialization was carried out data_dict : dictionary Dictionary data stored from the lightning detector - newdata_dict : dictioanry + newdata_dict : dictionary Dictionary where new data is received """ @@ -114,15 +114,15 @@ def read_data(self): # "lightning strike" sentence if param == 'LI': data_split = self.data.split(',')[1:] - if data_split[2].split('*')[0] == 'Miles': + if data_split[1].split('*')[0] == 'Miles': unit_d = 0 - elif data_split[2].split('*')[0] == 'Km': + elif data_split[1].split('*')[0] == 'Km': unit_d = 1 self.newdata_dict = { 'd_type': 1, 'time_last': time.time(), - 'dist': int(data_split[1]), + 'dist': int(data_split[0]), 'unit_d': unit_d } self.data_dict.update(self.newdata_dict)