Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions dispatcher/DAQController.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class DAQController():
resetting of runs (the ~hourly stop/start) during normal operations.
"""

def __init__(self, config, daq_config, mongo_connector, logger, hypervisor):
def __init__(self, config, daq_config, mongo_connector, logger, hypervisor, logic_timer, gps_start_detectors):

self.mongo = mongo_connector
self.hypervisor = hypervisor
Expand All @@ -43,7 +43,7 @@ def __init__(self, config, daq_config, mongo_connector, logger, hypervisor):

# Timeout properties come from config
self.timeouts = {
k.lower() : int(config['%sCommandTimeout' % k])
k.lower() : int(config[f'{k}CommandTimeout'])
for k in ['Arm','Start','Stop']}
self.stop_retries = int(config['RetryReset'])

Expand All @@ -58,6 +58,9 @@ def __init__(self, config, daq_config, mongo_connector, logger, hypervisor):
self.start_cmd_delay = float(config['StartCmdDelay'])
self.stop_cmd_delay = float(config['StopCmdDelay'])

self.logic_timer = logic_timer
self.gps_start_detectors = gps_start_detectors

def solve_problem(self, latest_status, goal_state):
"""
This is sort of the whole thing that all the other code is supporting
Expand Down Expand Up @@ -154,7 +157,7 @@ def solve_problem(self, latest_status, goal_state):
# Deal separately with the TIMEOUT and ERROR statuses, by stopping the detector if needed
elif latest_status[det]['status'] == DAQ_STATUS.TIMEOUT:
self.logger.info(f"The {det} is in timeout, check timeouts")
self.logger.debug("Checking %s timeouts", det)
self.logger.debug(f"Checking {det} timeouts")
self.handle_timeout(detector=det)

elif latest_status[det]['status'] == DAQ_STATUS.ERROR:
Expand Down Expand Up @@ -214,17 +217,25 @@ def control_detector(self, command, detector, force=False):
gs = self.goal_state
if command == 'arm':
if self.one_detector_arming:
self.logger.info('Another detector already arming, can\'t arm %s' % detector)
self.logger.info(f"Another detector already arming, can't arm {detector}")
# this leads to run number overlaps
return 1
readers, cc = self.mongo.get_hosts_for_mode(gs[detector]['mode'])
hosts = (cc, readers)
delay = 0
self.one_detector_arming = True
elif command == 'start':
delay = self.start_cmd_delay
if detector in self.gps_start_detectors:
dt = self.mongo.time_to_next_gps()
if 1 < dt < self.logic_timer+1:
# can't start unless there's a GPS signal in the offing
# we give it at least 1 second
self.logger.debug(f'Waiting for GPS signal to start {detector}')
return 0
delay = 0
readers, cc = self.mongo.get_hosts_for_mode(ls[detector]['mode'])
hosts = (readers, cc)
delay = self.start_cmd_delay
#Reset arming timeout counter
self.missed_arm_cycles[detector]=0
else: # stop
Expand All @@ -249,8 +260,8 @@ def control_detector(self, command, detector, force=False):
return 0

else:
self.logger.debug('Can\'t send %s to %s, timeout at %i/%i' % (
command, detector, dt, self.timeouts[command]))
self.logger.debug(f'Can\'t send {command} to {detector}, '
f'timeout at {int(dt)}/{self.timeouts[command]}')
return 1
return 0

Expand Down Expand Up @@ -291,8 +302,8 @@ def check_timeouts(self, detector, command=None):
local_timeouts['stop'] = self.timeouts['stop']*(self.error_stop_count[detector]+1)

if dt < local_timeouts[command]:
self.logger.debug('%i is within the %i second timeout for a %s command' %
(dt, local_timeouts[command], command))
self.logger.debug(f'{int(dt)} is within the {local_timeouts[command]} '
f'second timeout for a {command} command')
else:
# timing out, maybe send stop?
if command == 'stop':
Expand Down Expand Up @@ -320,13 +331,12 @@ def check_timeouts(self, detector, command=None):
self.error_stop_count[detector] += 1
else:
self.mongo.log_error(
('%s took more than %i seconds to %s, indicating a possible timeout or error' %
(detector, self.timeouts[command], command)),
(f'{detector} took more than {self.timeouts[command]} seconds '
f'to {command}, indicating a possible timeout or error'),
'ERROR',
'%s_TIMEOUT' % command.upper())
f'{command.upper()}_TIMEOUT')
#Keep track of how often the arming sequence times out
if self.control_detector(detector=detector, command='stop') == 0:
# only increment the counter if we actually issued a STOP
if self.control_detector(detector=detector, command='stop') == 0: # only increment the counter if we actually issued a STOP
self.missed_arm_cycles[detector] += 1
self.logger.info(f'{detector} missed {self.missed_arm_cycles[detector]} arm cycles')
else:
Expand Down Expand Up @@ -358,8 +368,9 @@ def check_run_turnover(self, detector):
time_now = now()
run_length = int(self.goal_state[detector]['stop_after'])*60
run_duration = (time_now - start_time).total_seconds()
self.logger.debug('Checking run turnover for %s: %i/%i' % (detector, run_duration, run_length))
self.logger.debug(f'Checking run turnover for {detector}: '
f'{run_duration}/{run_length}')
if run_duration > run_length:
self.logger.info('Stopping run for %s' % detector)
self.logger.info(f'Stopping run for {detector}')
self.control_detector(detector=detector, command='stop')

74 changes: 57 additions & 17 deletions dispatcher/MongoConnect.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class MongoConnect(object):

"""

def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor, testing=False):
def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor, testing=False, logic_timer, gps_start_detectors, gps_period):

# Define DB connectivity. Log is separate to make it easier to split off if needed
dbn = config['ControlDatabaseName']
Expand All @@ -75,6 +75,7 @@ def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor,
'log': self.dax_db['log'],
'options': self.dax_db['options'],
'run': self.runs_db[config['RunsDatabaseCollection']],
'gps': self.runs_db['gps_sync'],
}

self.error_sent = {}
Expand Down Expand Up @@ -147,6 +148,10 @@ def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor,
self.command_thread = threading.Thread(target=self.process_commands)
self.command_thread.start()

self.gps_start_detectors = gps_start_detectors
self.gps_period = gps_period
self.logic_timer = logic_timer

def quit(self):
self.run = False
try:
Expand Down Expand Up @@ -197,14 +202,15 @@ def get_update(self, dc):
each node we know about
"""
try:
sort=[('_id', -1)]
for detector in dc.keys():
for host in dc[detector]['readers'].keys():
doc = self.collections['node_status'].find_one({'host': host},
sort=[('_id', -1)])
sort=sort)
dc[detector]['readers'][host] = doc
for host in dc[detector]['controller'].keys():
doc = self.collections['node_status'].find_one({'host': host},
sort=[('_id', -1)])
sort=sort)
dc[detector]['controller'][host] = doc
except Exception as e:
self.logger.error(f'Got error while getting update: {type(e)}: {e}')
Expand All @@ -215,6 +221,22 @@ def get_update(self, dc):
# Now compute aggregate status
return self.latest_status if self.aggregate_status() is None else None

def time_to_next_gps(self):
"""
In how many seconds can we expect the next GPS signal?
"""
# how long ago was the most recent signal?
utc = self.get_gps_time()[2]
dt = now().timestamp() - utc
return self.gps_period - dt

def get_gps_time(self):
"""
Return the most recent GPS timestamp, and its UTC equivalent (approx)
"""
doc = self.collections['gps'].find_one({'channel': 0}, sort=[('_id', -1)])
return doc['gps_sec'], doc['gps_ns'], int(str(doc['_id'])[:8], 16)

def clear_error_timeouts(self):
self.error_sent = {}

Expand Down Expand Up @@ -487,14 +509,14 @@ def get_run_mode(self, mode):
return None
base_doc = self.collections['options'].find_one({'name': mode})
if base_doc is None:
self.log_error("Mode '%s' doesn't exist" % mode, "info", "info")
self.log_error(f"Mode '{mode}' doesn't exist", "info", "info")
return None
if 'includes' not in base_doc or len(base_doc['includes']) == 0:
return base_doc
try:
if self.collections['options'].count_documents({'name':
{'$in': base_doc['includes']}}) != len(base_doc['includes']):
self.log_error("At least one subconfig for mode '%s' doesn't exist" % mode, "WARNING", "WARNING")
self.log_error(f"At least one subconfig for mode '{mode}' doesn't exist", "WARNING", "WARNING")
return None
return list(self.collections["options"].aggregate([
{'$match': {'name': mode}},
Expand All @@ -507,7 +529,7 @@ def get_run_mode(self, mode):
{'$project': {'_id': 0, 'description': 0, 'includes': 0, 'subconfig': 0}},
]))[0]
except Exception as e:
self.logger.error("Got a %s exception in doc pulling: %s" % (type(e), e))
self.logger.error(f"Got a {type(e)} exception in doc pulling: {e}")
return None

def get_hosts_for_mode(self, mode, detector=None):
Expand Down Expand Up @@ -733,7 +755,7 @@ def log_error(self, message, priority, etype):
if ( (etype in self.error_sent and self.error_sent[etype] is not None) and
(etype in self.error_timeouts and self.error_timeouts[etype] is not None) and
(nowtime-self.error_sent[etype]).total_seconds() <= self.error_timeouts[etype]):
self.logger.debug("Could log error, but still in timeout for type %s"%etype)
self.logger.debug(f"Could log error, but still in timeout for type {type}")
return
self.error_sent[etype] = nowtime
try:
Expand All @@ -744,7 +766,7 @@ def log_error(self, message, priority, etype):
})
except Exception as e:
self.logger.error(f'Database error, can\'t issue error message: {type(e)}, {e}')
self.logger.info("Error message from dispatcher: %s" % (message))
self.logger.info(f"Error message from dispatcher: {message}")
return

def get_run_start(self, number):
Expand Down Expand Up @@ -777,10 +799,11 @@ def insert_run_doc(self, detector):
'user': self.goal_state[detector]['user'],
'mode': self.goal_state[detector]['mode'],
'bootstrax': {'state': None},
'end': None
'end': None,
'gps_start': [0,0],
}

# If there's a source add the source. Also add the complete ini file.
# If there's a source add the source. Also add the complete config
cfg = self.get_run_mode(self.goal_state[detector]['mode'])
if cfg is not None and 'source' in cfg.keys():
run_doc['source'] = str(cfg['source'])
Expand All @@ -802,13 +825,30 @@ def insert_run_doc(self, detector):
'location': cfg['strax_output_path']
}]

# The cc needs some time to get started
time.sleep(self.cc_start_wait)
try:
start_time = self.get_ack_time(detector, 'start')
except Exception as e:
self.logger.error('Couldn\'t find start time ack')
start_time = None
if detector in self.gps_start_detectors:
# wait for GPS signal to come along
time.sleep(self.time_to_next_gps()+0.5)
try:
gps_sec, gps_ns, utc = self.get_gps_time()
if (now().timestamp() - utc) < self.logic_timer:
# we got the freshest entry
start_time = datetime.datetime.fromtimestamp(gps_sec + gps_ns/1e9,
tz=pytz.utc)
run_doc['gps_start'] = [gps_sec, gps_ns]
else:
self.logger.error(f'GPS timestamp too old? {gps_sec}, {gps_ns}, {utc}')
start_time = None
except Exception as e:
self.logger.error('Couldn\'t find a start time')
start_time = None
else:
# The cc needs some time to get started
time.sleep(self.cc_start_wait)
try:
start_time = self.get_ack_time(detector, 'start')
except Exception as e:
self.logger.error('Couldn\'t find start time ack')
start_time = None

if start_time is None:
start_time = now()-datetime.timedelta(seconds=2)
Expand Down
11 changes: 10 additions & 1 deletion dispatcher/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ LogName = dispatcher

# Poll frequency in seconds for main program loop. It makes
# some sense to make other time-based options multiples of
# this, though not required
# this, though not required. Do not make this an even divisor
# of the GPS period or you could deadlock
PollFrequency = 4

# How long since a client's last check-in until we consider
Expand All @@ -20,6 +21,11 @@ ControlDatabaseName = daq
RunsDatabaseName = xenonnt
RunsDatabaseCollection = runs

# these detectors start in sync with the GPS
StartWithGPS = tpc
# period of GPS signals
GPSPeriod = 10

# Timeouts (seconds to wait until we assume it failed)
# Give Arm command some time in case baselines noisy
ArmCommandTimeout = 60
Expand All @@ -42,6 +48,7 @@ HypervisorNuclearTimeout = 1800
# Time between reader and CC start (can be float)
# THIS IS AN IMPORTANT VALUE, DON'T CHANGE IT UNLESS YOU KNOW WHAT YOU'RE DOING
# 20210308 - Darryl
# 20220414 - Only meaningful for detectors not starting with GPS signals
StartCmdDelay = 1
# Time between CC and reader stop (less important, also can be float)
StopCmdDelay = 5
Expand Down Expand Up @@ -88,6 +95,8 @@ TimeoutActionThreshold = 10
ControlDatabaseName = test
RunsDatabaseName = test
RunsDatabaseCollection = runs
StartWithGPS=
GPSPeriod=10
ArmCommandTimeout = 60
MaxArmCycles=3
StartCommandTimeout = 20
Expand Down
8 changes: 5 additions & 3 deletions dispatcher/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def setup():

def main(config, control_mc, logger, daq_config, vme_config, SlackBot, runs_mc, args):
sh = daqnt.SignalHandler()
sleep_period = int(config['PollFrequency'])

# Declare necessary classes
hv = Hypervisor(
Expand All @@ -73,14 +74,15 @@ def main(config, control_mc, logger, daq_config, vme_config, SlackBot, runs_mc,
testing=args.test,
slackbot=SlackBot,
)
mc = MongoConnect(config, daq_config, logger, control_mc, runs_mc, hv, args.test)
dc = DAQController(config, daq_config, mc, logger, hv)
gps_start = config['StartWithGPS'].split()
gps_period = int(config['GPSPeriod')
mc = MongoConnect(config, daq_config, logger, control_mc, runs_mc, hv, args.test, sleep_period, gps_start, gps_period)
dc = DAQController(config, daq_config, mc, logger, hv, sleep_period, gps_start)

# connect the triangle
hv.mongo_connect = mc
hv.daq_controller = dc

sleep_period = int(config['PollFrequency'])

last_loop_dt = 0

Expand Down