Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# temporary files
*.swp
*~
*#

# executable
redax
Expand Down
240 changes: 121 additions & 119 deletions dispatcher/DAQController.py

Large diffs are not rendered by default.

173 changes: 97 additions & 76 deletions dispatcher/MongoConnect.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class MongoConnect(object):
D. Coderre, 12. Mar. 2019
D. Masson, 2019-2021
S. di Pede, 2020-2021
V. D'Andrea, Oct 2022

Brief: This code handles the mongo connectivity for both the DAQ
databases (the ones used for system-wide communication) and the
Expand Down Expand Up @@ -130,13 +131,13 @@ def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor,
self.dc = daq_config
self.hv_timeout_fix = {}
for detector in self.dc:
self.latest_status[detector] = {'readers': {}, 'controller': {}}
#self.latest_status[detector] = {'readers': {}, 'controller': {}}
for reader in self.dc[detector]['readers']:
self.latest_status[detector]['readers'][reader] = {}
#self.latest_status[detector]['readers'][reader] = {}
self.host_config[reader] = detector
self.hv_timeout_fix[reader] = now()
for controller in self.dc[detector]['controller']:
self.latest_status[detector]['controller'][controller] = {}
#self.latest_status[detector]['controller'][controller] = {}
self.host_config[controller] = detector
self.hv_timeout_fix[controller] = now()

Expand Down Expand Up @@ -213,8 +214,8 @@ def get_update(self, dc):
self.latest_status = dc

# Now compute aggregate status
return self.latest_status if self.aggregate_status() is None else None

return self.latest_status if self.aggregate_status() else None
def clear_error_timeouts(self):
self.error_sent = {}

Expand All @@ -236,17 +237,16 @@ def aggregate_status(self):
apply to both
"""
now_time = time.time()
ret = None
aggstat = {
k:{ 'status': -1,
'detector': k,
'rate': 0,
'time': now(),
'buff': 0,
'mode': None,
'pll_unlocks': 0,
'number': -1}
for k in self.dc}
k:{ 'status': -1,
'detector': k,
'rate': 0,
'time': now(),
'buff': 0,
'mode': None,
'pll_unlocks': 0,
'number': -1}
for k in self.dc}
phys_stat = {k: [] for k in self.dc}
for detector in self.latest_status.keys():
# detector = logical
Expand Down Expand Up @@ -304,7 +304,7 @@ def aggregate_status(self):
else:
status_list = list(statuses.values())

# Now we aggregate the statuses
# Now we aggregate the statuses for the logical detectors
status = self.combine_statuses(status_list)

self.latest_status[detector]['status'] = status
Expand All @@ -316,10 +316,16 @@ def aggregate_status(self):
except Exception as e:
self.logger.error(f'DB snafu? Couldn\'t update aggregate status. '
f'{type(e)}, {e}')

return None
self.physical_status = phys_stat
return ret


# Aggregate status for the physical detectors
for logical in self.latest_status.keys():
for det in self.latest_status[logical]['detectors'].keys():
status = self.combine_statuses(phys_stat[det])
self.latest_status[logical]['detectors'][det]['status'] = status
return True

def combine_statuses(self, status_list):
# First, the "or" statuses
for stat in ['ARMING','ERROR','TIMEOUT','UNKNOWN']:
Expand Down Expand Up @@ -425,9 +431,9 @@ def is_linked(self, a, b):
self.logger.debug(f'{a} and {b} aren\'t link?? How this happen?? {mode_a} {detectors}')
return False

def get_super_detector(self):
def get_logical_detector(self):
"""
Get the Super Detector configuration
Get the Logical Detector configuration
if the detectors are in a compatible linked mode.
- case A: tpc, mv and nv all linked
- case B: tpc, mv and nv all un-linked
Expand All @@ -436,43 +442,56 @@ def get_super_detector(self):
- case E: tpc unlinked, mv and nv linked
We will check the compatibility of the linked mode for a pair of detectors per time.
"""
ret = {'tpc': {'controller': self.dc['tpc']['controller'][:],
'readers': self.dc['tpc']['readers'][:],
'detectors': ['tpc']}}

tpc = self.dc['tpc']
mv = self.dc['muon_veto']
nv = self.dc['neutron_veto']

tpc_mv = self.is_linked('tpc', 'muon_veto')
tpc_nv = self.is_linked('tpc', 'neutron_veto')
mv_nv = self.is_linked('muon_veto', 'neutron_veto')

# tpc and muon_veto linked mode
if tpc_mv:
# case A or C
ret['tpc']['controller'] += mv['controller']
ret['tpc']['readers'] += mv['readers']
ret['tpc']['detectors'] += ['muon_veto']
else:
# case B or E
ret['muon_veto'] = {'controller': mv['controller'][:],
'readers': mv['readers'][:],
'detectors': ['muon_veto']}
if tpc_nv:
# case A or D
ret['tpc']['controller'] += nv['controller'][:]
ret['tpc']['readers'] += nv['readers'][:]
ret['tpc']['detectors'] += ['neutron_veto']
elif mv_nv and not tpc_mv:

is_tpc_mv = self.is_linked('tpc', 'muon_veto')
is_tpc_nv = self.is_linked('tpc', 'neutron_veto')
is_mv_nv = self.is_linked('muon_veto', 'neutron_veto')

if is_tpc_mv and is_tpc_nv and is_mv_nv:
# case A
ret = {'all_linked': {'controller': tpc['controller'][:] + mv['controller'][:] + nv['controller'][:],
'readers': tpc['readers'][:] + mv['readers'][:] + nv['readers'][:],
'detectors': ['tpc','muon_veto','neutron_veto']}}
elif is_tpc_mv and not is_tpc_nv and not is_mv_nv:
# case C
ret = {'tpc_mv': {'controller': tpc['controller'][:] + mv['controller'][:],
'readers': tpc['readers'][:] + mv['readers'][:],
'detectors': ['tpc','muon_veto']},
'nv': {'controller': nv['controller'][:],
'readers': nv['readers'][:],
'detectors': ['neutron_veto']}}
elif is_tpc_nv and not is_tpc_mv and not is_mv_nv:
# case D
ret = {'tpc_nv': {'controller': tpc['controller'][:] + nv['controller'][:],
'readers': tpc['readers'][:] + nv['readers'][:],
'detectors': ['tpc','neutron_veto']},
'mv': {'controller': mv['controller'][:],
'readers': mv['readers'][:],
'detectors': ['muon_veto']}}
elif is_mv_nv and not is_tpc_mv and not is_tpc_nv:
# case E
ret['muon_veto']['controller'] += nv['controller'][:]
ret['muon_veto']['readers'] += nv['readers'][:]
ret['muon_veto']['detectors'] += ['neutron_veto']
ret = {'tpc': {'controller': tpc['controller'][:],
'readers': tpc['readers'][:],
'detectors': ['tpc']},
'mv_nv': {'controller': mv['controller'][:] + nv['controller'][:],
'readers': mv['readers'][:] + nv['readers'][:],
'detectors': ['muon_veto','neutron_veto']}}
else:
# case B or C
ret['neutron_veto'] = {'controller': nv['controller'][:],
'readers': nv['readers'][:],
'detectors': ['neutron_veto']}

# case B
ret = {'tpc': {'controller': tpc['controller'][:],
'readers': tpc['readers'][:],
'detectors': ['tpc']},
'mv': {'controller': mv['controller'][:],
'readers': mv['readers'][:],
'detectors': ['muon_veto']},
'nv': {'controller': nv['controller'][:],
'readers': nv['readers'][:],
'detectors': ['neutron_veto']}}

# convert the host lists to dics for later
for det in list(ret.keys()):
ret[det]['controller'] = {c:{} for c in ret[det]['controller']}
Expand Down Expand Up @@ -543,19 +562,20 @@ def get_next_run_number(self):
return 0
return list(cursor)[0]['number']+1

def set_stop_time(self, number, detectors, force):
def set_stop_time(self, number, logical, force):
"""
Sets the 'end' field of the run doc to the time when the STOP command was ack'd
"""
self.logger.info(f"Updating run {number} with end time ({detectors})")
self.logger.info(f"Updating run {number} with end time ({logical})")
if number == -1:
return
try:
time.sleep(0.5) # this number depends on the CC command polling time
if (endtime := self.get_ack_time(detectors, 'stop') ) is None:
if (endtime := self.get_ack_time(logical, 'stop') ) is None:
self.logger.debug(f'No end time found for run {number}')
endtime = now() -datetime.timedelta(seconds=1)
query = {"number": int(number), "end": None, 'detectors': detectors}
endtime = now() - datetime.timedelta(seconds=1)
det = list(latest_status[logical]['detectors'].keys())[0]
query = {"number": int(number), "end": None, 'detectors': det}
updates = {"$set": {"end": endtime}}
if force:
updates["$push"] = {"tags": {"name": "_messy", "user": "daq",
Expand All @@ -571,10 +591,11 @@ def set_stop_time(self, number, detectors, force):
]):
rate[doc['_id']] = {'avg': doc['avg'], 'max': doc['max']}
channels = set()
if 'tpc' in detectors:

if 'tpc' in latest_status[logical]['detectors'].keys():
# figure out which channels weren't running
readers = list(self.latest_status[detectors]['readers'].keys())
for doc in self.collections['node_status'].find({'host': {'$in': readers}, 'number': int(number)}):
readers = list(self.latest_status[logical]['readers'].keys())
for doc in self.collections['node_status'].find({'host': {'$in': readers},'number': int(number)}):
channels |= set(map(int, doc['channels'].keys()))
updates = {'rate': rate}
if len(channels):
Expand All @@ -589,25 +610,25 @@ def set_stop_time(self, number, detectors, force):
self.logger.error(f"Database having a moment, hope this doesn't crash. {type(e)}, {e}")
return

def get_ack_time(self, detector, command, recurse=True):
def get_ack_time(self, logical, command, recurse=True):
'''
Finds the time when specified detector's crate controller ack'd the specified command
'''
# the first cc is the "master", so its ack time is what counts
cc = list(self.latest_status[detector]['controller'].keys())[0]
cc = list(self.latest_status[logical]['controller'].keys())[0]
query = {'host': cc, f'acknowledged.{cc}': {'$ne': 0}, 'command': command}
sort = [('_id', -1)]
doc = self.collections['outgoing_commands'].find_one(query, sort=sort)
dt = (now() - doc['acknowledged'][cc].replace(tzinfo=pytz.utc)).total_seconds()
if dt > 30: # TODO make this a config value
if recurse:
# No way we found the correct command here, maybe we're too soon
self.logger.debug(f'Most recent ack for {detector}-{command} is {dt:.1f}?')
self.logger.debug(f'Most recent ack for {logical}-{command} is {dt:.1f}?')
time.sleep(2) # if in doubt
return self.get_ack_time(detector, command, False)
return self.get_ack_time(logical, command, False)
else:
# Welp
self.logger.debug(f'No recent ack time for {detector}-{command}')
self.logger.debug(f'No recent ack time for {logical}-{command}')
return None
return doc['acknowledged'][cc]

Expand Down Expand Up @@ -763,35 +784,35 @@ def get_run_start(self, number):
return self.run_start_cache[str(number)]
return None

def insert_run_doc(self, detector):
def insert_run_doc(self, logical):

if (number := self.get_next_run_number()) == NO_NEW_RUN:
self.logger.error("DB having a moment")
return -1
# the rundoc gets the physical detectors, not the logical
detectors = self.latest_status[detector]['detectors']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's going to complain in this line about detector. Should it be logical?


det = list(detectors.keys())[0]:
run_doc = {
"number": number,
'detectors': detectors,
'user': self.goal_state[detector]['user'],
'mode': self.goal_state[detector]['mode'],
'user': self.goal_state[det]['user'],
'mode': self.goal_state[det]['mode'],
'bootstrax': {'state': None},
'end': None
}

# If there's a source add the source. Also add the complete ini file.
cfg = self.get_run_mode(self.goal_state[detector]['mode'])
cfg = self.get_run_mode(self.goal_state[det]['mode'])
if cfg is not None and 'source' in cfg.keys():
run_doc['source'] = str(cfg['source'])
run_doc['daq_config'] = cfg

# If the user started the run with a comment add that too
if "comment" in self.goal_state[detector] and self.goal_state[detector]['comment'] != "":
if "comment" in self.goal_state[det] and self.goal_state[det]['comment'] != "":
run_doc['comments'] = [{
"user": self.goal_state[detector]['user'],
"user": self.goal_state[det]['user'],
"date": now(),
"comment": self.goal_state[detector]['comment']
"comment": self.goal_state[det]['comment']
}]

# Make a data entry so bootstrax can find the thing
Expand All @@ -805,7 +826,7 @@ def insert_run_doc(self, detector):
# The cc needs some time to get started
time.sleep(self.cc_start_wait)
try:
start_time = self.get_ack_time(detector, 'start')
start_time = self.get_ack_time(logical, 'start')
except Exception as e:
self.logger.error('Couldn\'t find start time ack')
start_time = None
Expand Down
4 changes: 2 additions & 2 deletions dispatcher/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ MasterDAQConfig = {
"neutron_veto": {
"controller": ["reader6_controller_0"],
"readers": ["reader6_reader_0", "reader6_reader_1"]
}
}
}
}

# Addresses for the VME crates
VMEConfig = {
Expand Down
22 changes: 11 additions & 11 deletions dispatcher/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,25 +90,25 @@ def main(config, control_mc, logger, daq_config, vme_config, SlackBot, runs_mc,
# Get most recent goal state from database. Users will update this from the website.
if (goal_state := mc.get_wanted_state()) is None:
continue
# Get the Super-Detector configuration
current_config = mc.get_super_detector()
# Get the Logical Detector configuration
current_config = mc.get_logical_detector()
# Get most recent check-in from all connected hosts
if (latest_status := mc.get_update(current_config)) is None:
continue

# Print an update
for detector in latest_status.keys():
state = 'ACTIVE' if goal_state[detector]['active'] == 'true' else 'INACTIVE'
msg = (f'The {detector} should be {state} and is '
f'{latest_status[detector]["status"].name}')
if latest_status[detector]['number'] != -1:
msg += f' ({latest_status[detector]["number"]})'
logger.debug(msg)
for logical in latest_status.keys():
for det in latest_status[logical]['detectors'].keys():
state = 'ACTIVE' if goal_state[det]['active'] == 'true' else 'INACTIVE'
msg = (f'{logical} {det} should be {state}: '
f'logical detector is {latest_status[logical]["status"]}, '
f'physical detector is {latest_status[logical]["detectors"][det]["status"]}')
if latest_status[logical]['number'] != -1:
msg += f' ({latest_status[logical]["number"]})'
logger.debug(msg)
msg = (f"Linking: tpc-mv: {mc.is_linked('tpc', 'muon_veto')}, "
f"tpc-nv: {mc.is_linked('tpc', 'neutron_veto')}, "
f"mv-nv: {mc.is_linked('muon_veto', 'neutron_veto')}")
logger.debug(msg)

# Decision time. Are we actually in our goal state? If not what should we do?
dc.solve_problem(latest_status, goal_state)

Expand Down
Loading