From afb786404c8cc8fcbabbc6a3f60dcd3fbea1c74d Mon Sep 17 00:00:00 2001 From: mfioren Date: Wed, 5 Feb 2025 10:48:22 +0000 Subject: [PATCH 1/5] updated dbesm sim --- simulators/dbesm/__init__.py | 211 ++++++++++++++++- tests/test_dbesm.py | 437 ++++++++++++++++++++++++++++++++++- 2 files changed, 642 insertions(+), 6 deletions(-) diff --git a/simulators/dbesm/__init__.py b/simulators/dbesm/__init__.py index 0fdf2b1..11c3644 100644 --- a/simulators/dbesm/__init__.py +++ b/simulators/dbesm/__init__.py @@ -34,7 +34,13 @@ class System(ListeningSystem): 'DBE GETCFG': '_get_cfg', 'DBE SETDBEATT': '_set_dbeatt', 'DBE GETDBEATT': '_get_dbeatt', - 'DBE FIRM': '_get_firm' + 'DBE GETFIRM': '_get_firm', + 'DBE SETDBEAMP': '_set_dbeamp', + 'DBE GETDBEAMP': '_get_dbeamp', + 'DBE SETDBEEQ': '_set_dbeeq', + 'DBE GETDBEEQ': '_get_dbeeq', + 'DBE SETDBEBPF': '_set_dbebpf', + 'DBE GETDBEBPF': '_get_dbebpf' } errors = { @@ -546,6 +552,209 @@ def _get_firm(self, params): retval += f'Prog=DBESM, Rev=rev {selected_board["FIRM"]}' return retval + '\x0D\x0A' + def _set_dbeamp(self, params): + retval = '' + boardx = {} + selected_boards = [] + out_idx = [] + if len(params) != 3: + return self._error(params[0], 1001) + b, d = zip(*self.outputs) + for i, v in enumerate(d): + if v == params[1]: + out_idx.append(i) + boardx = next((sub for sub in self.boards + if int(sub['Address']) == int(b[i])), None) + selected_boards.append(boardx) + if len(selected_boards) == 0: + return self._error(params[0], 1015) + else: + brd, a = zip(*self.amps_in_boards) + for board in selected_boards: + b_idx = selected_boards.index(board) + print(brd) + print(board["AMP"]) + print(board["AMP"][a[out_idx[b_idx]]]) + if (float(params[2]) not in [0,1]): + retval += (f'ERR DBE {params[1]} BOARD ' + f'{board["Address"]} value out of range\n') + elif board["Status"] != 0: + retval += (f'ERR DBE {params[1]} BOARD ' + f'{board["Address"]} unreachable\n') + else: + board["AMP"][a[out_idx[b_idx]]] = float(params[2]) + retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n' + return retval[:-1] + '\x0D\x0A' + + def _get_dbeamp(self, params): + retval = '' + boardx = {} + selected_boards = [] + out_idx = [] + if len(params) != 2: + return self._error(params[0], 1001) + b, d = zip(*self.outputs) + for i, v in enumerate(d): + if v == params[1]: + out_idx.append(i) + boardx = next((sub for sub in self.boards + if int(sub['Address']) == int(b[i])), None) + selected_boards.append(boardx) + if len(selected_boards) == 0: + return self._error(params[0], 1015) + else: + brd, a = zip(*self.amps_in_boards) + for board in selected_boards: + b_idx = selected_boards.index(board) + print(brd) + print(board["AMP"]) + print(board["AMP"][a[out_idx[b_idx]]]) + + if board["Status"] != 0: + retval += (f'ERR DBE {params[1]} BOARD ' + f'{board["Address"]} unreachable\n') + else: + retval += (f'ACK {params[1]} BOARD {board["Address"]} ' + f'AMP {a[out_idx[b_idx]]} VALUE ' + f'{board["AMP"][a[out_idx[b_idx]]]}\n') + return retval[:-1] + '\x0D\x0A' + + + def _set_dbeeq(self, params): + retval = '' + boardx = {} + selected_boards = [] + out_idx = [] + if len(params) != 3: + return self._error(params[0], 1001) + b, d = zip(*self.outputs) + for i, v in enumerate(d): + if v == params[1]: + out_idx.append(i) + boardx = next((sub for sub in self.boards + if int(sub['Address']) == int(b[i])), None) + selected_boards.append(boardx) + if len(selected_boards) == 0: + return self._error(params[0], 1015) + else: + brd, a = zip(*self.eqs_in_boards) + for board in selected_boards: + b_idx = selected_boards.index(board) + print(brd) + print(board["EQ"]) + print(board["EQ"][a[out_idx[b_idx]]]) + if (float(params[2]) not in [0,1]): + retval += (f'ERR DBE {params[1]} BOARD ' + f'{board["Address"]} value out of range\n') + elif board["Status"] != 0: + retval += (f'ERR DBE {params[1]} BOARD ' + f'{board["Address"]} unreachable\n') + else: + board["EQ"][a[out_idx[b_idx]]] = float(params[2]) + retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n' + return retval[:-1] + '\x0D\x0A' + + def _get_dbeeq(self, params): + retval = '' + boardx = {} + selected_boards = [] + out_idx = [] + if len(params) != 2: + return self._error(params[0], 1001) + b, d = zip(*self.outputs) + for i, v in enumerate(d): + if v == params[1]: + out_idx.append(i) + boardx = next((sub for sub in self.boards + if int(sub['Address']) == int(b[i])), None) + selected_boards.append(boardx) + if len(selected_boards) == 0: + return self._error(params[0], 1015) + else: + brd, a = zip(*self.eqs_in_boards) + for board in selected_boards: + b_idx = selected_boards.index(board) + print(brd) + print(board["EQ"]) + print(board["EQ"][a[out_idx[b_idx]]]) + + if board["Status"] != 0: + retval += (f'ERR DBE {params[1]} BOARD ' + f'{board["Address"]} unreachable\n') + else: + retval += (f'ACK {params[1]} BOARD {board["Address"]} ' + f'EQ {a[out_idx[b_idx]]} VALUE ' + f'{board["EQ"][a[out_idx[b_idx]]]}\n') + return retval[:-1] + '\x0D\x0A' + + + def _set_dbebpf(self, params): + retval = '' + boardx = {} + selected_boards = [] + out_idx = [] + if len(params) != 3: + return self._error(params[0], 1001) + b, d = zip(*self.outputs) + for i, v in enumerate(d): + if v == params[1]: + out_idx.append(i) + boardx = next((sub for sub in self.boards + if int(sub['Address']) == int(b[i])), None) + selected_boards.append(boardx) + if len(selected_boards) == 0: + return self._error(params[0], 1015) + else: + brd, a = zip(*self.bpfs_in_boards) + for board in selected_boards: + b_idx = selected_boards.index(board) + print(brd) + print(board["BPF"]) + print(board["BPF"][a[out_idx[b_idx]]]) + if (float(params[2]) not in [0,1]): + retval += (f'ERR DBE {params[1]} BOARD ' + f'{board["Address"]} value out of range\n') + elif board["Status"] != 0: + retval += (f'ERR DBE {params[1]} BOARD ' + f'{board["Address"]} unreachable\n') + else: + board["BPF"][a[out_idx[b_idx]]] = float(params[2]) + retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n' + return retval[:-1] + '\x0D\x0A' + + def _get_dbebpf(self, params): + retval = '' + boardx = {} + selected_boards = [] + out_idx = [] + if len(params) != 2: + return self._error(params[0], 1001) + b, d = zip(*self.outputs) + for i, v in enumerate(d): + if v == params[1]: + out_idx.append(i) + boardx = next((sub for sub in self.boards + if int(sub['Address']) == int(b[i])), None) + selected_boards.append(boardx) + if len(selected_boards) == 0: + return self._error(params[0], 1015) + else: + brd, a = zip(*self.bpfs_in_boards) + for board in selected_boards: + b_idx = selected_boards.index(board) + print(brd) + print(board["BPF"]) + print(board["BPF"][a[out_idx[b_idx]]]) + + if board["Status"] != 0: + retval += (f'ERR DBE {params[1]} BOARD ' + f'{board["Address"]} unreachable\n') + else: + retval += (f'ACK {params[1]} BOARD {board["Address"]} ' + f'BPF {a[out_idx[b_idx]]} VALUE ' + f'{board["BPF"][a[out_idx[b_idx]]]}\n') + return retval[:-1] + '\x0D\x0A' + def _error(self, device_code, error_code, board_address=None): error_string = self.errors.get(error_code) if error_code == 1001: diff --git a/tests/test_dbesm.py b/tests/test_dbesm.py index 8ff1281..3cd5779 100644 --- a/tests/test_dbesm.py +++ b/tests/test_dbesm.py @@ -303,14 +303,14 @@ def test_nak_mode_NotInt(self, obs_mode='MFS_7'): self.assertEqual(response, 'NAK unknown command\x0D\x0A') def test_firm_ok(self, board=0): - message = f"DBE FIRM BOARD {board}\x0D\x0A" + message = f"DBE GETFIRM BOARD {board}\x0D\x0A" response = self._send(message) print(response) self.assertRegex(response, 'ACK\nBOARD [0-9]+ ' 'Prog=DBESM, Rev=rev [0-9]+.[0-9]+_[A-Za-z]+_[A-Za-z]+\r\n') def test_firm_boardErr(self, board=0): - message = f"DBE FIRM BOARD {board}\x0D\x0A" + message = f"DBE GETFIRM BOARD {board}\x0D\x0A" self._disable_board(board) response = self._send(message) print(response) @@ -318,20 +318,20 @@ def test_firm_boardErr(self, board=0): 'unreachable\x0D\x0A') def test_firm_noBoard(self, board=999): - message = f"DBE FIRM BOARD {board}\x0D\x0A" + message = f"DBE GETFIRM BOARD {board}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, f'ERR DBE BOARD {board} ' 'not existing\x0D\x0A') def test_nak_firm_missing(self): - message = "DBE FIRM BOARD\x0D\x0A" + message = "DBE GETFIRM BOARD\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') def test_nak_firm_NotInt(self): - message = "DBE FIRM BOARD BOARD\x0D\x0A" + message = "DBE GETFIRM BOARD BOARD\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') @@ -997,5 +997,432 @@ def test_nak_getdbeatt(self): self.assertEqual(response, 'NAK unknown command\x0D\x0A') +# SETDBEAMP + + + def test_setdbeamp_single_on(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEAMP {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + + def test_setdbeamp_single_off(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEAMP {out_dbe} 0\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + + def test_setdbeamp_mult_on(self, out_dbe='prova'): + message = f"DBE SETDBEAMP {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + + def test_setdbeamp_mult_off(self, out_dbe='prova'): + message = f"DBE SETDBEAMP {out_dbe} 0\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + + def test_setdbeamp_single_IntValErr(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEAMP {out_dbe} 2\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + + def test_setdbeamp_single_FloatValErr(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEAMP {out_dbe} 1.5\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + + def test_setdbeamp_mult_IntValErr(self, out_dbe='prova'): + message = f"DBE SETDBEAMP {out_dbe} 2\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + + def test_setdbeamp_mult_FloatValErr(self, out_dbe='prova'): + message = f"DBE SETDBEAMP {out_dbe} 1.5\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + + def test_setdbeamp_boardErr(self, out_dbe='SARDA_14', err_board=3): + self._disable_board(err_board) + message = f"DBE SETDBEAMP {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD {err_board}' + ' unreachable\r\n') + + def test_setdbeamp_outErr(self, out_dbe='NOTHING'): + message = f"DBE SETDBEAMP {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'ERR DBE Output not existing\r\n') + + def test_setdbeamp_boardOutErr(self, out_dbe='NOTHING', err_board=3): + self._disable_board(err_board) + message = f"DBE SETDBEAMP {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'ERR DBE Output not existing\r\n') + + def test_setdbeamp_boardValErr(self, out_dbe='SARDA_14', err_board=3): + self._disable_board(err_board) + message = f"DBE SETDBEAMP {out_dbe} 2\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD {err_board}' + ' value out of range\r\n') + + def test_nak_setdbeamp(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEAMP {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'NAK unknown command\x0D\x0A') + +# GETDBEAMP + + def test_getdbeamp_single(self, out_dbe='1_DBBC2'): + message = f"DBE GETDBEAMP {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' + 'AMP [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + + def test_getdbeamp_mult(self, out_dbe='prova'): + message = f"DBE GETDBEAMP {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' + 'AMP [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' + 'AMP [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + + def test_getdbeamp_boardErr(self, out_dbe='SARDA_14', err_board=3): + self._disable_board(err_board) + message = f"DBE GETDBEAMP {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD {err_board}' + ' unreachable\r\n') + + def test_getdbeamp_outErr(self, out_dbe='NOTHING'): + message = f"DBE GETDBEAMP {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'ERR DBE Output not existing\r\n') + + def test_getdbeamp_boardOutErr(self, out_dbe='NOTHING', err_board=3): + self._disable_board(err_board) + message = f"DBE GETDBEAMP {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'ERR DBE Output not existing\r\n') + + def test_nak_getdbeamp(self): + message = "DBE GETDBEAMP\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'NAK unknown command\x0D\x0A') + + +# SETDBEEQ + + + def test_setdbeeq_single_on(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEEQ {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + + def test_setdbeeq_single_off(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEEQ {out_dbe} 0\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + + def test_setdbeeq_mult_on(self, out_dbe='prova'): + message = f"DBE SETDBEEQ {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + + def test_setdbeeq_mult_off(self, out_dbe='prova'): + message = f"DBE SETDBEEQ {out_dbe} 0\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + + def test_setdbeeq_single_IntValErr(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEEQ {out_dbe} 2\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + + def test_setdbeeq_single_FloatValErr(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEEQ {out_dbe} 1.5\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + + def test_setdbeeq_mult_IntValErr(self, out_dbe='prova'): + message = f"DBE SETDBEEQ {out_dbe} 2\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + + def test_setdbeeq_mult_FloatValErr(self, out_dbe='prova'): + message = f"DBE SETDBEEQ {out_dbe} 1.5\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + + def test_setdbeeq_boardErr(self, out_dbe='SARDA_14', err_board=3): + self._disable_board(err_board) + message = f"DBE SETDBEEQ {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD {err_board}' + ' unreachable\r\n') + + def test_setdbeeq_outErr(self, out_dbe='NOTHING'): + message = f"DBE SETDBEEQ {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'ERR DBE Output not existing\r\n') + + def test_setdbeeq_boardOutErr(self, out_dbe='NOTHING', err_board=3): + self._disable_board(err_board) + message = f"DBE SETDBEEQ {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'ERR DBE Output not existing\r\n') + + def test_setdbeeq_boardValErr(self, out_dbe='SARDA_14', err_board=3): + self._disable_board(err_board) + message = f"DBE SETDBEEQ {out_dbe} 2\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD {err_board}' + ' value out of range\r\n') + + def test_nak_setdbeeq(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEEQ {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'NAK unknown command\x0D\x0A') + +# GETDBEEQ + + def test_getdbeeq_single(self, out_dbe='1_DBBC2'): + message = f"DBE GETDBEEQ {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' + 'EQ [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + + def test_getdbeeq_mult(self, out_dbe='prova'): + message = f"DBE GETDBEEQ {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' + 'EQ [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' + 'EQ [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + + def test_getdbeeq_boardErr(self, out_dbe='SARDA_14', err_board=3): + self._disable_board(err_board) + message = f"DBE GETDBEEQ {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD {err_board}' + ' unreachable\r\n') + + def test_getdbeeq_outErr(self, out_dbe='NOTHING'): + message = f"DBE GETDBEEQ {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'ERR DBE Output not existing\r\n') + + def test_getdbeeq_boardOutErr(self, out_dbe='NOTHING', err_board=3): + self._disable_board(err_board) + message = f"DBE GETDBEEQ {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'ERR DBE Output not existing\r\n') + + def test_nak_getdbeeq(self): + message = "DBE GETDBEEQ\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'NAK unknown command\x0D\x0A') + + +# SETDBEBPF + + + def test_setdbebpf_single_on(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEBPF {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + + def test_setdbebpf_single_off(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEBPF {out_dbe} 0\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + + def test_setdbebpf_mult_on(self, out_dbe='prova'): + message = f"DBE SETDBEBPF {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + + def test_setdbebpf_mult_off(self, out_dbe='prova'): + message = f"DBE SETDBEBPF {out_dbe} 0\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + self.assertRegex(response, f'DBE {out_dbe} BOARD [0-9]+ ACK\r\n') + + def test_setdbebpf_single_IntValErr(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEBPF {out_dbe} 2\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + + def test_setdbebpf_single_FloatValErr(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEBPF {out_dbe} 1.5\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + + def test_setdbebpf_mult_IntValErr(self, out_dbe='prova'): + message = f"DBE SETDBEBPF {out_dbe} 2\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + + def test_setdbebpf_mult_FloatValErr(self, out_dbe='prova'): + message = f"DBE SETDBEBPF {out_dbe} 1.5\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' + '[0-9]+ value out of range\r\n') + + def test_setdbebpf_boardErr(self, out_dbe='SARDA_14', err_board=3): + self._disable_board(err_board) + message = f"DBE SETDBEBPF {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD {err_board}' + ' unreachable\r\n') + + def test_setdbebpf_outErr(self, out_dbe='NOTHING'): + message = f"DBE SETDBEBPF {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'ERR DBE Output not existing\r\n') + + def test_setdbebpf_boardOutErr(self, out_dbe='NOTHING', err_board=3): + self._disable_board(err_board) + message = f"DBE SETDBEBPF {out_dbe} 1\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'ERR DBE Output not existing\r\n') + + def test_setdbebpf_boardValErr(self, out_dbe='SARDA_14', err_board=3): + self._disable_board(err_board) + message = f"DBE SETDBEBPF {out_dbe} 2\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD {err_board}' + ' value out of range\r\n') + + def test_nak_setdbebpf(self, out_dbe='1_DBBC2'): + message = f"DBE SETDBEBPF {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'NAK unknown command\x0D\x0A') + +# GETDBEBPF + + def test_getdbebpf_single(self, out_dbe='1_DBBC2'): + message = f"DBE GETDBEBPF {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' + 'BPF [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + + def test_getdbebpf_mult(self, out_dbe='prova'): + message = f"DBE GETDBEBPF {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' + 'BPF [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' + 'BPF [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + + def test_getdbebpf_boardErr(self, out_dbe='SARDA_14', err_board=3): + self._disable_board(err_board) + message = f"DBE GETDBEBPF {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertRegex(response, f'ERR DBE {out_dbe} BOARD {err_board}' + ' unreachable\r\n') + + def test_getdbebpf_outErr(self, out_dbe='NOTHING'): + message = f"DBE GETDBEBPF {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'ERR DBE Output not existing\r\n') + + def test_getdbebpf_boardOutErr(self, out_dbe='NOTHING', err_board=3): + self._disable_board(err_board) + message = f"DBE GETDBEBPF {out_dbe}\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'ERR DBE Output not existing\r\n') + + def test_nak_getdbebpf(self): + message = "DBE GETDBEBPF\x0D\x0A" + response = self._send(message) + print(response) + self.assertEqual(response, 'NAK unknown command\x0D\x0A') + + + if __name__ == '__main__': unittest.main() From 81976fcd3ece0f0cdeafcadbe778dfb32111b60e Mon Sep 17 00:00:00 2001 From: mfioren Date: Wed, 5 Feb 2025 19:06:03 +0000 Subject: [PATCH 2/5] Updated dbesm sim, test --- simulators/dbesm/__init__.py | 183 ++++++++++++++++------------- tests/test_dbesm.py | 215 +++++++++++++++++++---------------- 2 files changed, 221 insertions(+), 177 deletions(-) diff --git a/simulators/dbesm/__init__.py b/simulators/dbesm/__init__.py index 11c3644..6f02792 100644 --- a/simulators/dbesm/__init__.py +++ b/simulators/dbesm/__init__.py @@ -70,20 +70,27 @@ class System(ListeningSystem): 'MFS_7', ] - out_boards = ['0', '0', '0', '1', '1', '2', '2', '3', '3'] + out_boards = ['1', '1', '1', '2', '2', '3', '3', '4', '4'] out_dbe = ['1_DBBC2', 'prova', 'SARDA_01', 'prova', 'prova2', 'Space_Debris', 'prova', 'prova2', 'SARDA_14'] outputs = list(zip(out_boards, out_dbe)) out_att = [11, 3, 2, 1, 8, 1, 5, 16, 7] + out_amp = [3, 9, 1, 1, 1, 8, 6, 5, 4] + out_eq = [5, 6, 4, 3, 3, 9, 4, 7, 1] + out_bpf = [7, 5, 6, 5, 5, 2, 6, 9, 1] atts_in_boards = list(zip(out_boards, out_att)) + amps_in_boards = list(zip(out_boards, out_amp)) + eqs_in_boards = list(zip(out_boards, out_eq)) + bpfs_in_boards = list(zip(out_boards, out_bpf)) def __init__(self): self.msg = '' # Status -1 -> board not available + # Status >1 -> temp sensor not present (timeout) self.boards = [ { - 'Address': '3', + 'Address': '1', "Status": 0, "Configuration": "default", "REG": self._init_reg(), @@ -111,7 +118,7 @@ def __init__(self): "FIRM": "0.116_NEW_win" }, { - 'Address': '1', + 'Address': '3', "Status": 0, "Configuration": "default", "REG": self._init_reg(), @@ -125,7 +132,7 @@ def __init__(self): "FIRM": "0.116_NEW_win" }, { - 'Address': '0', + 'Address': '4', "Status": 0, "Configuration": "default", "REG": self._init_reg(), @@ -150,7 +157,7 @@ def _init_att(self): return att def parse(self, byte): - print(byte) + #print(byte) if byte == self.tail: msg = self.msg[:-1] self.msg = '' @@ -197,12 +204,13 @@ def _set_allmode(self, params): return self._error(params[0], 1001) elif params[1] not in self.obs_mode: return self._error(params[0], 1003) - for key in self.boards: - if key['Status'] != 0: - retval += f'BOARD {key["Address"]} ERR DBE BOARD unreachable\n' + for board in self.boards: + if board['Status'] == 1: + retval += (f'BOARD {self.boards.index(board)+1} ' + f'ERR DBE BOARD unreachable\n') else: - key["Configuration"] = params[1] - retval += f'BOARD {key["Address"]} ACK\n' + board["Configuration"] = params[1] + retval += f'BOARD {self.boards.index(board)+1} ACK\n' return retval[:-1] + '\x0D\x0A' def _set_mode(self, params): @@ -210,7 +218,7 @@ def _set_mode(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if int(sub['Address']) == int(params[2])), None) + if (self.boards.index(sub)+1) == int(params[2])), None) except ValueError: return self._error(params[0], 1001) @@ -218,7 +226,7 @@ def _set_mode(self, params): return self._error(params[0], 1007, params[2]) elif params[3] not in self.obs_mode: return self._error(params[0], 1003) - elif selected_board["Status"] != 0: + elif selected_board["Status"] == 1: return self._error(params[0], 1005, params[2]) else: selected_board["Configuration"] = params[3] @@ -230,9 +238,9 @@ def _store_allmode(self, params): return self._error(params[0], 1001) elif params[1] in self.obs_mode: return self._error(params[0], 1008) - for key in self.boards: - if key['Status'] != 0: - err.append(key['Address']) + for board in self.boards: + if board['Status'] == 1: + err.append(str(self.boards.index(board)+1)) if len(err) > 0: if len(err) == 1: return self._error(params[0], 1005, err[0]) @@ -256,16 +264,16 @@ def _get_status(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if int(sub['Address']) == int(params[2])), None) + if (self.boards.index(sub)+1) == int(params[2])), None) except ValueError: return self._error(params[0], 1001) if selected_board is None: return self._error(params[0], 1007, params[2]) - elif selected_board["Status"] != 0: + elif selected_board["Status"] == 1: return self._error(params[0], 1005, params[2]) retval = self.ack + '\n' - retval += f'BOARD {selected_board["Address"]}\n\n' + retval += f'BOARD {self.boards.index(selected_board)+1}\n\n' retval += f'REG=[ {" ".join(map(str,selected_board["REG"]))} ]\n\n' retval += f'ATT=[ {" ".join(map(str,selected_board["ATT"])) } ]' return retval + '\x0D\x0A' @@ -275,14 +283,14 @@ def _set_att(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if int(sub['Address']) == int(params[3])), None) + if (self.boards.index(sub)+1) == int(params[3])), None) if selected_board is None: return self._error(params[0], 1007, params[3]) elif int(params[1]) not in list(range(0, 17)): return self._error(params[0], 1010, params[1]) elif float(params[5]) not in list(numpy.arange(0, 32, 0.5)): - return self._error(params[0], 1011, selected_board["Address"]) - elif selected_board["Status"] != 0: + return self._error(params[0], 1011, params[3]) + elif selected_board["Status"] == 1: return self._error(params[0], 1005, params[3]) else: selected_board["ATT"][int(params[1])] = float(params[5]) @@ -295,14 +303,14 @@ def _set_amp(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if int(sub['Address']) == int(params[3])), None) + if (self.boards.index(sub)+1) == int(params[3])), None) if selected_board is None: return self._error(params[0], 1007, params[3]) elif int(params[1]) not in list(range(1, 11)): return self._error(params[0], 1012, params[1]) elif float(params[5]) not in [0, 1]: - return self._error(params[0], 1011, selected_board["Address"]) - elif selected_board["Status"] != 0: + return self._error(params[0], 1011, params[3]) + elif selected_board["Status"] == 1: return self._error(params[0], 1005, params[3]) else: selected_board["AMP"][int(params[1]) - 1] = params[5] @@ -315,14 +323,14 @@ def _set_eq(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if int(sub['Address']) == int(params[3])), None) + if (self.boards.index(sub)+1) == int(params[3])), None) if selected_board is None: return self._error(params[0], 1007, params[3]) elif int(params[1]) not in list(range(1, 11)): return self._error(params[0], 1013, params[1]) elif float(params[5]) not in [0, 1]: - return self._error(params[0], 1011, selected_board["Address"]) - elif selected_board["Status"] != 0: + return self._error(params[0], 1011, params[3]) + elif selected_board["Status"] == 1: return self._error(params[0], 1005, params[3]) else: selected_board["EQ"][int(params[1]) - 1] = params[5] @@ -336,7 +344,7 @@ def _set_bpf(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if int(sub['Address']) == int(params[3])), None) + if (self.boards.index(sub)+1) == int(params[3])), None) if selected_board is None: return self._error(params[0], 1007, params[3]) elif params[1] not in channels: # accept 1, 1a NOT a1, 1aa, a, 1a2 @@ -354,8 +362,8 @@ def _set_bpf(self, params): else: return self._error(params[0], 1014, params[1]) elif float(params[5]) not in [0, 1]: - return self._error(params[0], 1011, selected_board["Address"]) - elif selected_board["Status"] != 0: + return self._error(params[0], 1011, params[3]) + elif selected_board["Status"] == 1: return self._error(params[0], 1005, params[3]) else: channel = -1 @@ -376,13 +384,17 @@ def _all_diag(self, params): return self._error(params[0], 1001) else: for board in self.boards: - retval += f'BOARD {board["Address"]} ' - if board['Status'] == 0: + retval += f'BOARD {self.boards.index(board)+1} ' + if board["Status"] == 0: retval += self.ack + '\n' retval += f'5V {board["5V"]} 3V3 {board["3V3"]}\n' retval += f'T0 {board["T0"]}\n\n' - else: + elif int(board["Status"]) == 1: retval += 'ERR DBE BOARD unreachable\n\n' + elif int(board["Status"]) > 1: + retval += self.ack + '\n' + retval += f'5V {board["5V"]} 3V3 {board["3V3"]}\n' + retval += f'temp sensor not present\n\n' return retval[:-2] + '\x0D\x0A' def _diag(self, params): @@ -390,20 +402,26 @@ def _diag(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if int(sub['Address']) == int(params[2])), None) + if (self.boards.index(sub)+1) == int(params[2])), None) except ValueError: return self._error(params[0], 1001) if selected_board is None: return self._error(params[0], 1007, params[2]) - elif selected_board["Status"] != 0: + elif selected_board["Status"] == 1: return self._error(params[0], 1005, params[2]) - else: + elif selected_board["Status"] == 0: retval = self.ack + '\n' - retval += f'BOARD {selected_board["Address"]}\n\n' + retval += f'BOARD {self.boards.index(selected_board)+1}\n\n' retval += f'5V {selected_board["5V"]}' retval += f' 3V3 {selected_board["3V3"]}\n' retval += f'T0 {selected_board["T0"]}' + elif selected_board["Status"] > 1: + retval = self.ack + '\n' + retval += f'BOARD {self.boards.index(selected_board)+1}\n\n' + retval += f'5V {selected_board["5V"]}' + retval += f' 3V3 {selected_board["3V3"]}\n' + retval += f'temp sensor not present' return retval + '\x0D\x0A' def _set_status(self, params): @@ -411,7 +429,7 @@ def _set_status(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if int(sub['Address']) == int(params[2])), None) + if (self.boards.index(sub)+1) == int(params[2])), None) if selected_board is None: return self._error(params[0], 1007, params[2]) @@ -426,17 +444,17 @@ def _get_comp(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if int(sub['Address']) == int(params[2])), None) + if (self.boards.index(sub)+1) == int(params[2])), None) except ValueError: return self._error(params[0], 1001) if selected_board is None: return self._error(params[0], 1007, params[2]) - elif selected_board["Status"] != 0: + elif selected_board["Status"] == 1: return self._error(params[0], 1005, params[2]) else: retval = 'ACK\n' - retval += f'BOARD {params[2]}\n\n' + retval += f'BOARD {self.boards.index(selected_board)+1}\n\n' retval += f'AMP=[ {" ".join(map(str,selected_board["AMP"]))} ]\n' retval += f'EQ=[ {" ".join(map(str,selected_board["EQ"]))} ]\n' retval += f'BPF=[ {" ".join(map(str,selected_board["BPF"])) } ]' @@ -449,8 +467,8 @@ def _get_cfg(self, params): else: retval += self.ack + '\n' for board in self.boards: - retval += f'BOARD {board["Address"]} ' - if board['Status'] == 0: + retval += f'BOARD {self.boards.index(board)+1} ' + if (board['Status'] == 0 or (board['Status'] > 1)): retval += f'{board["Configuration"]}\n\n' else: retval += 'ERR DBE BOARD unreachable\n\n' @@ -476,28 +494,29 @@ def _set_dbeatt(self, params): brd, a = zip(*self.atts_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - print(brd) - print(board["ATT"]) - print(board["ATT"][a[out_idx[b_idx]]]) + #print(brd) + #print(board["ATT"]) + #print(board["ATT"][a[out_idx[b_idx]]]) if ((params[2] == '+3' or params[2] == '-3') and - not (0 <= (float(board["ATT"][a[out_idx[b_idx]]]) + - float(params[2])) <= 31.5)): + not (0 <= (float(board["ATT"][a[out_idx[b_idx]]]) + + float(params[2])) <= 31.5)): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} value out of range\n') + f'{self.boards.index(board)+1} value out of range\n') elif (params[2] != '+3' and params[2] != '-3' and float(params[2]) not in list(numpy.arange(0, 32, 0.5))): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} value out of range\n') - elif board["Status"] != 0: + f'{self.boards.index(board)+1} value out of range\n') + elif ((board["Status"] == 1) or (board["Address"] not in brd)): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} unreachable\n') + f'{self.boards.index(board)+1} unreachable\n') else: if params[2] == '+3' or params[2] == '-3': board["ATT"][a[out_idx[b_idx]]] += float(params[2]) else: board["ATT"][a[out_idx[b_idx]]] = float(params[2]) - retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n' + retval += (f'DBE {params[1]} BOARD ' + f'{self.boards.index(board)+1} ACK\n') return retval[:-1] + '\x0D\x0A' def _get_dbeatt(self, params): @@ -520,17 +539,17 @@ def _get_dbeatt(self, params): brd, a = zip(*self.atts_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - print(brd) - print(board["ATT"]) - print(board["ATT"][a[out_idx[b_idx]]]) + #print(brd) + #print(board["ATT"]) + #print(board["ATT"][a[out_idx[b_idx]]]) - if board["Status"] != 0: + if ((board["Status"] == 1) or (board["Address"] not in brd)): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} unreachable\n') + f'{self.boards.index(board)+1} unreachable\n') else: - retval += (f'ACK {params[1]} BOARD {board["Address"]} ' - f'ATT {a[out_idx[b_idx]]} VALUE ' - f'{board["ATT"][a[out_idx[b_idx]]]}\n') + retval += (f'ACK {params[1]} BOARD ' + f'{self.boards.index(board)+1} ATT {a[out_idx[b_idx]]} ' + f'VALUE {board["ATT"][a[out_idx[b_idx]]]}\n') return retval[:-1] + '\x0D\x0A' def _get_firm(self, params): @@ -538,17 +557,17 @@ def _get_firm(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if int(sub['Address']) == int(params[2])), None) + if (self.boards.index(sub)+1) == int(params[2])), None) except ValueError: return self._error(params[0], 1001) if selected_board is None: return self._error(params[0], 1007, params[2]) - elif selected_board["Status"] != 0: + elif selected_board["Status"] == 1: return self._error(params[0], 1005, params[2]) else: retval = self.ack + '\n' - retval += f'BOARD {selected_board["Address"]} ' + retval += f'BOARD {self.boards.index(selected_board)} ' retval += f'Prog=DBESM, Rev=rev {selected_board["FIRM"]}' return retval + '\x0D\x0A' @@ -572,9 +591,9 @@ def _set_dbeamp(self, params): brd, a = zip(*self.amps_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - print(brd) - print(board["AMP"]) - print(board["AMP"][a[out_idx[b_idx]]]) + #print(brd) + #print(board["AMP"]) + #print(board["AMP"][a[out_idx[b_idx]]]) if (float(params[2]) not in [0,1]): retval += (f'ERR DBE {params[1]} BOARD ' f'{board["Address"]} value out of range\n') @@ -606,9 +625,9 @@ def _get_dbeamp(self, params): brd, a = zip(*self.amps_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - print(brd) - print(board["AMP"]) - print(board["AMP"][a[out_idx[b_idx]]]) + #print(brd) + #print(board["AMP"]) + #print(board["AMP"][a[out_idx[b_idx]]]) if board["Status"] != 0: retval += (f'ERR DBE {params[1]} BOARD ' @@ -640,9 +659,9 @@ def _set_dbeeq(self, params): brd, a = zip(*self.eqs_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - print(brd) - print(board["EQ"]) - print(board["EQ"][a[out_idx[b_idx]]]) + #print(brd) + #print(board["EQ"]) + #print(board["EQ"][a[out_idx[b_idx]]]) if (float(params[2]) not in [0,1]): retval += (f'ERR DBE {params[1]} BOARD ' f'{board["Address"]} value out of range\n') @@ -674,9 +693,9 @@ def _get_dbeeq(self, params): brd, a = zip(*self.eqs_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - print(brd) - print(board["EQ"]) - print(board["EQ"][a[out_idx[b_idx]]]) + #print(brd) + #print(board["EQ"]) + #print(board["EQ"][a[out_idx[b_idx]]]) if board["Status"] != 0: retval += (f'ERR DBE {params[1]} BOARD ' @@ -708,9 +727,9 @@ def _set_dbebpf(self, params): brd, a = zip(*self.bpfs_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - print(brd) - print(board["BPF"]) - print(board["BPF"][a[out_idx[b_idx]]]) + #print(brd) + #print(board["BPF"]) + #print(board["BPF"][a[out_idx[b_idx]]]) if (float(params[2]) not in [0,1]): retval += (f'ERR DBE {params[1]} BOARD ' f'{board["Address"]} value out of range\n') @@ -742,9 +761,9 @@ def _get_dbebpf(self, params): brd, a = zip(*self.bpfs_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - print(brd) - print(board["BPF"]) - print(board["BPF"][a[out_idx[b_idx]]]) + #print(brd) + #print(board["BPF"]) + #print(board["BPF"][a[out_idx[b_idx]]]) if board["Status"] != 0: retval += (f'ERR DBE {params[1]} BOARD ' diff --git a/tests/test_dbesm.py b/tests/test_dbesm.py index 3cd5779..8a08fc5 100644 --- a/tests/test_dbesm.py +++ b/tests/test_dbesm.py @@ -17,8 +17,12 @@ def _disable_board(self, err_board=3): disable_msg = f'DBE SETSTATUS BOARD {err_board} VALUE 1\x0D\x0A' self._send(disable_msg) + def _noTemp_board(self, err_board=3): + noTemp_msg = f'DBE SETSTATUS BOARD {err_board} VALUE 2\x0D\x0A' + self._send(noTemp_msg) + def _test_all_boards(self, response, err_board=999, diag=False): - for board in range(0, 4): + for board in range(1, 5): if diag: if board != err_board: self.assertRegex(response, f'BOARD {board} ACK\n5V ' @@ -59,43 +63,43 @@ def test_nak_all_2spaces(self): print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_nak_set_2spaces(self, ch=0, board=0, val=0.0): + def test_nak_set_2spaces(self, ch=0, board=1, val=0.0): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_nak_set_3spaces(self, ch=0, board=0, val=0.0): + def test_nak_set_3spaces(self, ch=0, board=1, val=0.0): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_nak_set_space_end(self, ch=0, board=0, val=0.0): + def test_nak_set_space_end(self, ch=0, board=1, val=0.0): message = f"DBE SETATT {ch} BOARD {board} VALUE {val} \x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_nak_set_2spaces_end(self, ch=0, board=0, val=0.0): + def test_nak_set_2spaces_end(self, ch=0, board=1, val=0.0): message = f"DBE SETATT {ch} BOARD {board} VALUE {val} \x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_nak_set_3spaces_end(self, ch=0, board=0, val=0.0): + def test_nak_set_3spaces_end(self, ch=0, board=1, val=0.0): message = f"DBE SETATT {ch} BOARD {board} VALUE {val} \x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_nak_mispell1(self, ch=0, board=0, val=0.0): + def test_nak_mispell1(self, ch=0, board=1, val=0.0): message = f"DBE SETATT {ch} OARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_nak_mispell2(self, ch=0, board=0, val=0.0): + def test_nak_mispell2(self, ch=0, board=1, val=0.0): message = f"DBE SETATT {ch} BOARD {board} VALE {val}\x0D\x0A" response = self._send(message) print(response) @@ -116,7 +120,7 @@ def test_setstatus_noBoard(self, board=999, val=1): self.assertEqual(response, f'ERR DBE BOARD {board} ' 'not existing\x0D\x0A') - def test_nak_setstaus_missing(self, board=0): + def test_nak_setstatus_missing(self, board=1): message = f"DBE SETSTATUS BOARD {board} VALUE\x0D\x0A" response = self._send(message) print(response) @@ -128,7 +132,7 @@ def test_nak_setstaus_notIntBoard(self, board='BOARD', val=1): print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_nak_setstaus_notIntVal(self, board=0, val='VALUE'): + def test_nak_setstaus_notIntVal(self, board=1, val='VALUE'): message = f"DBE SETSTATUS BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) @@ -151,6 +155,19 @@ def test_all_diag_boardErr(self, err_board=3): print(response) self._test_all_boards(response, err_board, True) + def test_all_diag_noTemp(self, err_board=3): + message = "DBE ReadALLDIAG\x0D\x0A" + self._noTemp_board(err_board) + response = self._send(message) + print(response) + for board in range(1, 5): + if board != err_board: + self.assertRegex(response, f'BOARD {board} ACK\n5V ' + '[0-9][.][0-9]+ 3V3 [0-9][.][0-9]+\nT0 [0-9]+[.][0-9]+') + else: + self.assertRegex(response, f'BOARD {board} ACK\n5V ' + '[0-9][.][0-9]+ 3V3 [0-9][.][0-9]+\ntemp sensor not present') + def test_nak_alldiag(self): message = "DBE ReadALLDIAG ?????\x0D\x0A" response = self._send(message) @@ -232,8 +249,8 @@ def test_storeallmode_2boardErr(self, obs_mode='CustomCfg', self._disable_board(err_board2) response = self._send(message) print(response) - self.assertEqual(response, f'ERR DBE BOARD {err_board1} ' - f'{err_board2} unreachable\x0D\x0A') + self.assertEqual(response, f'ERR DBE BOARD {err_board2} ' + f'{err_board1} unreachable\x0D\x0A') def test_storeallmode_cfgErr(self, obs_mode='MFS_7'): message = f"DBE STOREALLMODE {obs_mode}\x0D\x0A" @@ -256,13 +273,13 @@ def test_nak_storeall_missing(self): # MODE, FIRM - def test_mode_ok(self, board=0, obs_mode='MFS_7'): + def test_mode_ok(self, board=1, obs_mode='MFS_7'): message = f"DBE MODE BOARD {board} {obs_mode}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ACK\x0D\x0A') - def test_mode_boardErr(self, board=0, obs_mode='MFS_7'): + def test_mode_boardErr(self, board=1, obs_mode='MFS_7'): message = f"DBE MODE BOARD {board} {obs_mode}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -270,13 +287,13 @@ def test_mode_boardErr(self, board=0, obs_mode='MFS_7'): self.assertEqual(response, f'ERR DBE BOARD {board} ' 'unreachable\x0D\x0A') - def test_mode_cfgErr(self, board=0, obs_mode='PIPPO'): + def test_mode_cfgErr(self, board=1, obs_mode='PIPPO'): message = f"DBE MODE BOARD {board} {obs_mode}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE CFG file not existing\x0D\x0A') - def test_mode_bothErr(self, board=0, obs_mode='PIPPO'): + def test_mode_bothErr(self, board=1, obs_mode='PIPPO'): message = f"DBE MODE BOARD {board} {obs_mode}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -302,14 +319,14 @@ def test_nak_mode_NotInt(self, obs_mode='MFS_7'): print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_firm_ok(self, board=0): + def test_firm_ok(self, board=1): message = f"DBE GETFIRM BOARD {board}\x0D\x0A" response = self._send(message) print(response) self.assertRegex(response, 'ACK\nBOARD [0-9]+ ' 'Prog=DBESM, Rev=rev [0-9]+.[0-9]+_[A-Za-z]+_[A-Za-z]+\r\n') - def test_firm_boardErr(self, board=0): + def test_firm_boardErr(self, board=1): message = f"DBE GETFIRM BOARD {board}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -338,14 +355,14 @@ def test_nak_firm_NotInt(self): # DIAG, GETSTATUS, GETCOMP - def test_diag_ok(self, board=0): + def test_diag_ok(self, board=1): message = f"DBE ReadDIAG BOARD {board}\x0D\x0A" response = self._send(message) print(response) self.assertRegex(response, f'ACK\nBOARD {board}\n\n5V [0-9][.][0-9]+ ' '3V3 [0-9][.][0-9]+\nT0 [0-9]+[.][0-9]+\r\n') - def test_diag_boardErr(self, board=0): + def test_diag_boardErr(self, board=1): message = f"DBE ReadDIAG BOARD {board}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -360,6 +377,14 @@ def test_diag_noBoard(self, board=999): self.assertEqual(response, f'ERR DBE BOARD {board} ' 'not existing\x0D\x0A') + def test_diag_noTemp(self, board=1): + message = f"DBE ReadDIAG BOARD {board}\x0D\x0A" + self._noTemp_board(board) + response = self._send(message) + print(response) + self.assertRegex(response, f'ACK\nBOARD {board}\n\n5V [0-9][.][0-9]+ ' + '3V3 [0-9][.][0-9]+\ntemp sensor not present\r\n') + def test_nak_diag_missing(self): message = "DBE ReadDIAG BOARD\x0D\x0A" response = self._send(message) @@ -372,7 +397,7 @@ def test_nak_diag_NotInt(self): print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_getstatus_ok(self, board=0): + def test_getstatus_ok(self, board=1): message = f"DBE GETSTATUS BOARD {board}\x0D\x0A" response = self._send(message) print(response) @@ -384,7 +409,7 @@ def test_getstatus_ok(self, board=0): '[0-9]+[.][05] [0-9]+[.][05] [0-9]+[.][05] [0-9]+[.][05] ' '[0-9]+[.][05] ]\r\n') - def test_getstatus_boardErr(self, board=0): + def test_getstatus_boardErr(self, board=1): message = f"DBE GETSTATUS BOARD {board}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -411,7 +436,7 @@ def test_nak_getstatus_NotInt(self): print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_getcomp_ok(self, board=0): + def test_getcomp_ok(self, board=1): message = f"DBE GETCOMP BOARD {board}\x0D\x0A" response = self._send(message) print(response) @@ -424,7 +449,7 @@ def test_getcomp_ok(self, board=0): self.assertEqual(len(response[76:96].split(" ")), 11) self.assertEqual(len(response[75:96]), 21) - def test_getcomp_boardErr(self, board=0): + def test_getcomp_boardErr(self, board=1): message = f"DBE GETCOMP BOARD {board}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -453,43 +478,43 @@ def test_nak_getcomp_NotInt(self): # SETATT - def test_setatt_ok(self, ch=0, board=0, val=0.0): + def test_setatt_ok(self, ch=0, board=1, val=0.0): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ACK\x0D\x0A') - def test_setatt_ok_2(self, ch=0, board=0, val=.5): + def test_setatt_ok_2(self, ch=0, board=1, val=.5): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ACK\x0D\x0A') - def test_setatt_ok_3(self, ch=0, board=0, val=1.000): + def test_setatt_ok_3(self, ch=0, board=1, val=1.000): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ACK\x0D\x0A') - def test_setatt_ok_4(self, ch=0, board=0, val=1): + def test_setatt_ok_4(self, ch=0, board=1, val=1): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ACK\x0D\x0A') - def test_setatt_chErr1(self, ch=17, board=0, val=0.0): + def test_setatt_chErr1(self, ch=17, board=1, val=0.0): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, f'ERR DBE ATT {ch} not existing\x0D\x0A') - def test_setatt_chErr2(self, ch=0.0, board=0, val=0.0): + def test_setatt_chErr2(self, ch=0.0, board=1, val=0.0): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_setatt_boardErr(self, ch=0, board=0, val=0.0): + def test_setatt_boardErr(self, ch=0, board=1, val=0.0): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -497,32 +522,32 @@ def test_setatt_boardErr(self, ch=0, board=0, val=0.0): self.assertEqual(response, f'ERR DBE BOARD {board} ' 'unreachable\x0D\x0A') - def test_setatt_valErr1(self, ch=0, board=0, val=0.6): + def test_setatt_valErr1(self, ch=0, board=1, val=0.6): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE value out of range\x0D\x0A') - def test_setatt_valErr2(self, ch=0, board=0, val=32.0): + def test_setatt_valErr2(self, ch=0, board=1, val=32.0): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE value out of range\x0D\x0A') - def test_setatt_ChValErr(self, ch=17, board=0, val=0.6): + def test_setatt_ChValErr(self, ch=17, board=1, val=0.6): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, f'ERR DBE ATT {ch} not existing\x0D\x0A') - def test_setatt_BoardValErr(self, ch=1, board=0, val=0.6): + def test_setatt_BoardValErr(self, ch=1, board=1, val=0.6): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" self._disable_board(board) response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE value out of range\x0D\x0A') - def test_setatt_AllErr(self, ch=17, board=0, val=0.6): + def test_setatt_AllErr(self, ch=17, board=1, val=0.6): message = f"DBE SETATT {ch} BOARD {board} VALUE {val}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -557,25 +582,25 @@ def test_nak_setatt_NotInt(self): # SETAMP - def test_setamp_ok(self, ch=1, board=0, val=0): + def test_setamp_ok(self, ch=1, board=1, val=0): message = f"DBE SETAMP {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ACK\x0D\x0A') - def test_setamp_chErr1(self, ch=11, board=0, val=0): + def test_setamp_chErr1(self, ch=11, board=1, val=0): message = f"DBE SETAMP {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, f'ERR DBE AMP {ch} not existing\x0D\x0A') - def test_setamp_chErr2(self, ch=1.0, board=0, val=0): + def test_setamp_chErr2(self, ch=1.0, board=1, val=0): message = f"DBE SETAMP {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_setamp_boardErr(self, ch=1, board=0, val=0): + def test_setamp_boardErr(self, ch=1, board=1, val=0): message = f"DBE SETAMP {ch} BOARD {board} VALUE {val}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -583,25 +608,25 @@ def test_setamp_boardErr(self, ch=1, board=0, val=0): self.assertEqual(response, f'ERR DBE BOARD {board} ' 'unreachable\x0D\x0A') - def test_setamp_valErr1(self, ch=1, board=0, val=0.6): + def test_setamp_valErr1(self, ch=1, board=1, val=0.6): message = f"DBE SETAMP {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE value out of range\x0D\x0A') - def test_setamp_valErr2(self, ch=1, board=0, val=2): + def test_setamp_valErr2(self, ch=1, board=1, val=2): message = f"DBE SETAMP {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE value out of range\x0D\x0A') - def test_setamp_ChValErr(self, ch=11, board=0, val=2): + def test_setamp_ChValErr(self, ch=11, board=1, val=2): message = f"DBE SETAMP {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, f'ERR DBE AMP {ch} not existing\x0D\x0A') - def test_setamp_AllErr(self, ch=11, board=0, val=2): + def test_setamp_AllErr(self, ch=11, board=1, val=2): message = f"DBE SETAMP {ch} BOARD {board} VALUE {val}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -629,25 +654,25 @@ def test_nak_setamp_NotInt(self): # SETEQ - def test_seteq_ok(self, ch=1, board=0, val=0): + def test_seteq_ok(self, ch=1, board=1, val=0): message = f"DBE SETEQ {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ACK\x0D\x0A') - def test_seteq_chErr1(self, ch=11, board=0, val=0): + def test_seteq_chErr1(self, ch=11, board=1, val=0): message = f"DBE SETEQ {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, f'ERR DBE EQ {ch} not existing\x0D\x0A') - def test_seteq_chErr2(self, ch=1.0, board=0, val=0): + def test_seteq_chErr2(self, ch=1.0, board=1, val=0): message = f"DBE SETEQ {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_seteq_boardErr(self, ch=1, board=0, val=0): + def test_seteq_boardErr(self, ch=1, board=1, val=0): message = f"DBE SETEQ {ch} BOARD {board} VALUE {val}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -655,25 +680,25 @@ def test_seteq_boardErr(self, ch=1, board=0, val=0): self.assertEqual(response, f'ERR DBE BOARD {board} ' 'unreachable\x0D\x0A') - def test_seteq_valErr1(self, ch=1, board=0, val=0.6): + def test_seteq_valErr1(self, ch=1, board=1, val=0.6): message = f"DBE SETEQ {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE value out of range\x0D\x0A') - def test_seteq_valErr2(self, ch=1, board=0, val=2): + def test_seteq_valErr2(self, ch=1, board=1, val=2): message = f"DBE SETEQ {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE value out of range\x0D\x0A') - def test_seteq_ChValErr(self, ch=11, board=0, val=2): + def test_seteq_ChValErr(self, ch=11, board=1, val=2): message = f"DBE SETEQ {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, f'ERR DBE EQ {ch} not existing\x0D\x0A') - def test_seteq_AllErr(self, ch=11, board=0, val=2): + def test_seteq_AllErr(self, ch=11, board=1, val=2): message = f"DBE SETEQ {ch} BOARD {board} VALUE {val}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -701,61 +726,61 @@ def test_nak_seteq_NotInt(self): # SETBPF - def test_setbpf_ok_1a(self, ch='1a', board=0, val=0): + def test_setbpf_ok_1a(self, ch='1a', board=1, val=0): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ACK\x0D\x0A') - def test_setbpf_ok_1b(self, ch='1b', board=0, val=0): + def test_setbpf_ok_1b(self, ch='1b', board=1, val=0): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ACK\x0D\x0A') - def test_setbpf_ok_2(self, ch='2', board=0, val=0): + def test_setbpf_ok_2(self, ch='2', board=1, val=0): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ACK\x0D\x0A') - def test_setbpf_chErr1(self, ch=11, board=0, val=0): + def test_setbpf_chErr1(self, ch=11, board=1, val=0): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, f'ERR DBE BPF {ch} not existing\x0D\x0A') - def test_setbpf_chErr2(self, ch='111a', board=0, val=0): + def test_setbpf_chErr2(self, ch='111a', board=1, val=0): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, f'ERR DBE BPF {ch} not existing\x0D\x0A') - def test_setbpf_chErr3(self, ch='1a2', board=0, val=0): + def test_setbpf_chErr3(self, ch='1a2', board=1, val=0): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_setbpf_chErr4(self, ch='a1', board=0, val=0): + def test_setbpf_chErr4(self, ch='a1', board=1, val=0): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_setbpf_chErr5(self, ch='1aa', board=0, val=0): + def test_setbpf_chErr5(self, ch='1aa', board=1, val=0): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_setbpf_chErr6(self, ch='a', board=0, val=0): + def test_setbpf_chErr6(self, ch='a', board=1, val=0): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - def test_setbpf_boardErr(self, ch='1a', board=0, val=0): + def test_setbpf_boardErr(self, ch='1a', board=1, val=0): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -763,25 +788,25 @@ def test_setbpf_boardErr(self, ch='1a', board=0, val=0): self.assertEqual(response, f'ERR DBE BOARD {board} ' 'unreachable\x0D\x0A') - def test_setbpf_valErr1(self, ch='1a', board=0, val=0.6): + def test_setbpf_valErr1(self, ch='1a', board=1, val=0.6): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE value out of range\x0D\x0A') - def test_setbpf_valErr2(self, ch='1a', board=0, val=2): + def test_setbpf_valErr2(self, ch='1a', board=1, val=2): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE value out of range\x0D\x0A') - def test_setbpf_ChValErr(self, ch=11, board=0, val=2): + def test_setbpf_ChValErr(self, ch=11, board=1, val=2): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, f'ERR DBE BPF {ch} not existing\x0D\x0A') - def test_setbpf_AllErr(self, ch=11, board=0, val=2): + def test_setbpf_AllErr(self, ch=11, board=1, val=2): message = f"DBE SETBPF {ch} BOARD {board} VALUE {val}\x0D\x0A" self._disable_board(board) response = self._send(message) @@ -916,7 +941,7 @@ def test_setdbeatt_mult_ValErr(self, out_dbe='prova'): self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' '[0-9]+ value out of range\r\n') - def test_setdbeatt_boardErr(self, out_dbe='SARDA_14', err_board=3): + def test_setdbeatt_boardErr(self, out_dbe='SARDA_14', err_board=4): self._disable_board(err_board) message = f"DBE SETDBEATT {out_dbe} 1.0\x0D\x0A" response = self._send(message) @@ -930,14 +955,14 @@ def test_setdbeatt_outErr(self, out_dbe='NOTHING'): print(response) self.assertEqual(response, 'ERR DBE Output not existing\r\n') - def test_setdbeatt_boardOutErr(self, out_dbe='NOTHING', err_board=3): + def test_setdbeatt_boardOutErr(self, out_dbe='NOTHING', err_board=4): self._disable_board(err_board) message = f"DBE SETDBEATT {out_dbe} 1.0\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE Output not existing\r\n') - def test_setdbeatt_boardValErr(self, out_dbe='SARDA_14', err_board=3): + def test_setdbeatt_boardValErr(self, out_dbe='SARDA_14', err_board=4): self._disable_board(err_board) message = f"DBE SETDBEATT {out_dbe} 32\x0D\x0A" response = self._send(message) @@ -969,7 +994,7 @@ def test_getdbeatt_mult(self, out_dbe='prova'): self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' 'ATT [0-9]+ VALUE [0-9]+.[0-9]+\r\n') - def test_getdbeatt_boardErr(self, out_dbe='SARDA_14', err_board=3): + def test_getdbeatt_boardErr(self, out_dbe='SARDA_14', err_board=4): self._disable_board(err_board) message = f"DBE GETDBEATT {out_dbe}\x0D\x0A" response = self._send(message) @@ -983,7 +1008,7 @@ def test_getdbeatt_outErr(self, out_dbe='NOTHING'): print(response) self.assertEqual(response, 'ERR DBE Output not existing\r\n') - def test_getdbeatt_boardOutErr(self, out_dbe='NOTHING', err_board=3): + def test_getdbeatt_boardOutErr(self, out_dbe='NOTHING', err_board=4): self._disable_board(err_board) message = f"DBE GETDBEATT {out_dbe}\x0D\x0A" response = self._send(message) @@ -1058,7 +1083,7 @@ def test_setdbeamp_mult_FloatValErr(self, out_dbe='prova'): self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' '[0-9]+ value out of range\r\n') - def test_setdbeamp_boardErr(self, out_dbe='SARDA_14', err_board=3): + def test_setdbeamp_boardErr(self, out_dbe='SARDA_14', err_board=4): self._disable_board(err_board) message = f"DBE SETDBEAMP {out_dbe} 1\x0D\x0A" response = self._send(message) @@ -1072,14 +1097,14 @@ def test_setdbeamp_outErr(self, out_dbe='NOTHING'): print(response) self.assertEqual(response, 'ERR DBE Output not existing\r\n') - def test_setdbeamp_boardOutErr(self, out_dbe='NOTHING', err_board=3): + def test_setdbeamp_boardOutErr(self, out_dbe='NOTHING', err_board=4): self._disable_board(err_board) message = f"DBE SETDBEAMP {out_dbe} 1\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE Output not existing\r\n') - def test_setdbeamp_boardValErr(self, out_dbe='SARDA_14', err_board=3): + def test_setdbeamp_boardValErr(self, out_dbe='SARDA_14', err_board=4): self._disable_board(err_board) message = f"DBE SETDBEAMP {out_dbe} 2\x0D\x0A" response = self._send(message) @@ -1100,18 +1125,18 @@ def test_getdbeamp_single(self, out_dbe='1_DBBC2'): response = self._send(message) print(response) self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' - 'AMP [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + 'AMP [0-9]+ VALUE [0-9]+\r\n') def test_getdbeamp_mult(self, out_dbe='prova'): message = f"DBE GETDBEAMP {out_dbe}\x0D\x0A" response = self._send(message) print(response) self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' - 'AMP [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + 'AMP [0-9]+ VALUE [0-9]+\r\n') self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' - 'AMP [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + 'AMP [0-9]+ VALUE [0-9]+\r\n') - def test_getdbeamp_boardErr(self, out_dbe='SARDA_14', err_board=3): + def test_getdbeamp_boardErr(self, out_dbe='SARDA_14', err_board=4): self._disable_board(err_board) message = f"DBE GETDBEAMP {out_dbe}\x0D\x0A" response = self._send(message) @@ -1125,7 +1150,7 @@ def test_getdbeamp_outErr(self, out_dbe='NOTHING'): print(response) self.assertEqual(response, 'ERR DBE Output not existing\r\n') - def test_getdbeamp_boardOutErr(self, out_dbe='NOTHING', err_board=3): + def test_getdbeamp_boardOutErr(self, out_dbe='NOTHING', err_board=4): self._disable_board(err_board) message = f"DBE GETDBEAMP {out_dbe}\x0D\x0A" response = self._send(message) @@ -1200,7 +1225,7 @@ def test_setdbeeq_mult_FloatValErr(self, out_dbe='prova'): self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' '[0-9]+ value out of range\r\n') - def test_setdbeeq_boardErr(self, out_dbe='SARDA_14', err_board=3): + def test_setdbeeq_boardErr(self, out_dbe='SARDA_14', err_board=4): self._disable_board(err_board) message = f"DBE SETDBEEQ {out_dbe} 1\x0D\x0A" response = self._send(message) @@ -1214,14 +1239,14 @@ def test_setdbeeq_outErr(self, out_dbe='NOTHING'): print(response) self.assertEqual(response, 'ERR DBE Output not existing\r\n') - def test_setdbeeq_boardOutErr(self, out_dbe='NOTHING', err_board=3): + def test_setdbeeq_boardOutErr(self, out_dbe='NOTHING', err_board=4): self._disable_board(err_board) message = f"DBE SETDBEEQ {out_dbe} 1\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE Output not existing\r\n') - def test_setdbeeq_boardValErr(self, out_dbe='SARDA_14', err_board=3): + def test_setdbeeq_boardValErr(self, out_dbe='SARDA_14', err_board=4): self._disable_board(err_board) message = f"DBE SETDBEEQ {out_dbe} 2\x0D\x0A" response = self._send(message) @@ -1242,18 +1267,18 @@ def test_getdbeeq_single(self, out_dbe='1_DBBC2'): response = self._send(message) print(response) self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' - 'EQ [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + 'EQ [0-9]+ VALUE [0-9]+\r\n') def test_getdbeeq_mult(self, out_dbe='prova'): message = f"DBE GETDBEEQ {out_dbe}\x0D\x0A" response = self._send(message) print(response) self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' - 'EQ [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + 'EQ [0-9]+ VALUE [0-9]+\r\n') self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' - 'EQ [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + 'EQ [0-9]+ VALUE [0-9]+\r\n') - def test_getdbeeq_boardErr(self, out_dbe='SARDA_14', err_board=3): + def test_getdbeeq_boardErr(self, out_dbe='SARDA_14', err_board=4): self._disable_board(err_board) message = f"DBE GETDBEEQ {out_dbe}\x0D\x0A" response = self._send(message) @@ -1267,7 +1292,7 @@ def test_getdbeeq_outErr(self, out_dbe='NOTHING'): print(response) self.assertEqual(response, 'ERR DBE Output not existing\r\n') - def test_getdbeeq_boardOutErr(self, out_dbe='NOTHING', err_board=3): + def test_getdbeeq_boardOutErr(self, out_dbe='NOTHING', err_board=4): self._disable_board(err_board) message = f"DBE GETDBEEQ {out_dbe}\x0D\x0A" response = self._send(message) @@ -1342,7 +1367,7 @@ def test_setdbebpf_mult_FloatValErr(self, out_dbe='prova'): self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' '[0-9]+ value out of range\r\n') - def test_setdbebpf_boardErr(self, out_dbe='SARDA_14', err_board=3): + def test_setdbebpf_boardErr(self, out_dbe='SARDA_14', err_board=4): self._disable_board(err_board) message = f"DBE SETDBEBPF {out_dbe} 1\x0D\x0A" response = self._send(message) @@ -1356,14 +1381,14 @@ def test_setdbebpf_outErr(self, out_dbe='NOTHING'): print(response) self.assertEqual(response, 'ERR DBE Output not existing\r\n') - def test_setdbebpf_boardOutErr(self, out_dbe='NOTHING', err_board=3): + def test_setdbebpf_boardOutErr(self, out_dbe='NOTHING', err_board=4): self._disable_board(err_board) message = f"DBE SETDBEBPF {out_dbe} 1\x0D\x0A" response = self._send(message) print(response) self.assertEqual(response, 'ERR DBE Output not existing\r\n') - def test_setdbebpf_boardValErr(self, out_dbe='SARDA_14', err_board=3): + def test_setdbebpf_boardValErr(self, out_dbe='SARDA_14', err_board=4): self._disable_board(err_board) message = f"DBE SETDBEBPF {out_dbe} 2\x0D\x0A" response = self._send(message) @@ -1384,18 +1409,18 @@ def test_getdbebpf_single(self, out_dbe='1_DBBC2'): response = self._send(message) print(response) self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' - 'BPF [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + 'BPF [0-9]+ VALUE [0-9]+\r\n') def test_getdbebpf_mult(self, out_dbe='prova'): message = f"DBE GETDBEBPF {out_dbe}\x0D\x0A" response = self._send(message) print(response) self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' - 'BPF [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + 'BPF [0-9]+ VALUE [0-9]+\r\n') self.assertRegex(response, f'ACK {out_dbe} BOARD [0-9]+ ' - 'BPF [0-9]+ VALUE [0-9]+.[0-9]+\r\n') + 'BPF [0-9]+ VALUE [0-9]+\r\n') - def test_getdbebpf_boardErr(self, out_dbe='SARDA_14', err_board=3): + def test_getdbebpf_boardErr(self, out_dbe='SARDA_14', err_board=4): self._disable_board(err_board) message = f"DBE GETDBEBPF {out_dbe}\x0D\x0A" response = self._send(message) @@ -1409,7 +1434,7 @@ def test_getdbebpf_outErr(self, out_dbe='NOTHING'): print(response) self.assertEqual(response, 'ERR DBE Output not existing\r\n') - def test_getdbebpf_boardOutErr(self, out_dbe='NOTHING', err_board=3): + def test_getdbebpf_boardOutErr(self, out_dbe='NOTHING', err_board=4): self._disable_board(err_board) message = f"DBE GETDBEBPF {out_dbe}\x0D\x0A" response = self._send(message) From 78ca87dc7a7ead5ad5a16139f78b44e30189eeeb Mon Sep 17 00:00:00 2001 From: mfioren Date: Wed, 5 Feb 2025 20:43:40 +0000 Subject: [PATCH 3/5] Updated and linted dbesm sim, test --- simulators/dbesm/__init__.py | 119 ++++++++++++++++++----------------- tests/test_dbesm.py | 18 ++---- 2 files changed, 65 insertions(+), 72 deletions(-) diff --git a/simulators/dbesm/__init__.py b/simulators/dbesm/__init__.py index 6f02792..26e599a 100644 --- a/simulators/dbesm/__init__.py +++ b/simulators/dbesm/__init__.py @@ -157,7 +157,7 @@ def _init_att(self): return att def parse(self, byte): - #print(byte) +# print(byte) if byte == self.tail: msg = self.msg[:-1] self.msg = '' @@ -218,7 +218,7 @@ def _set_mode(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if (self.boards.index(sub)+1) == int(params[2])), None) + if (self.boards.index(sub) + 1) == int(params[2])), None) except ValueError: return self._error(params[0], 1001) @@ -240,7 +240,7 @@ def _store_allmode(self, params): return self._error(params[0], 1008) for board in self.boards: if board['Status'] == 1: - err.append(str(self.boards.index(board)+1)) + err.append(str(self.boards.index(board) + 1)) if len(err) > 0: if len(err) == 1: return self._error(params[0], 1005, err[0]) @@ -264,7 +264,7 @@ def _get_status(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if (self.boards.index(sub)+1) == int(params[2])), None) + if (self.boards.index(sub) + 1) == int(params[2])), None) except ValueError: return self._error(params[0], 1001) @@ -283,7 +283,7 @@ def _set_att(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if (self.boards.index(sub)+1) == int(params[3])), None) + if (self.boards.index(sub) + 1) == int(params[3])), None) if selected_board is None: return self._error(params[0], 1007, params[3]) elif int(params[1]) not in list(range(0, 17)): @@ -303,7 +303,7 @@ def _set_amp(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if (self.boards.index(sub)+1) == int(params[3])), None) + if (self.boards.index(sub) + 1) == int(params[3])), None) if selected_board is None: return self._error(params[0], 1007, params[3]) elif int(params[1]) not in list(range(1, 11)): @@ -323,7 +323,7 @@ def _set_eq(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if (self.boards.index(sub)+1) == int(params[3])), None) + if (self.boards.index(sub) + 1) == int(params[3])), None) if selected_board is None: return self._error(params[0], 1007, params[3]) elif int(params[1]) not in list(range(1, 11)): @@ -344,7 +344,7 @@ def _set_bpf(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if (self.boards.index(sub)+1) == int(params[3])), None) + if (self.boards.index(sub) + 1) == int(params[3])), None) if selected_board is None: return self._error(params[0], 1007, params[3]) elif params[1] not in channels: # accept 1, 1a NOT a1, 1aa, a, 1a2 @@ -394,7 +394,7 @@ def _all_diag(self, params): elif int(board["Status"]) > 1: retval += self.ack + '\n' retval += f'5V {board["5V"]} 3V3 {board["3V3"]}\n' - retval += f'temp sensor not present\n\n' + retval += 'temp sensor not present\n\n' return retval[:-2] + '\x0D\x0A' def _diag(self, params): @@ -402,7 +402,7 @@ def _diag(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if (self.boards.index(sub)+1) == int(params[2])), None) + if (self.boards.index(sub) + 1) == int(params[2])), None) except ValueError: return self._error(params[0], 1001) @@ -421,7 +421,7 @@ def _diag(self, params): retval += f'BOARD {self.boards.index(selected_board)+1}\n\n' retval += f'5V {selected_board["5V"]}' retval += f' 3V3 {selected_board["3V3"]}\n' - retval += f'temp sensor not present' + retval += 'temp sensor not present' return retval + '\x0D\x0A' def _set_status(self, params): @@ -429,7 +429,7 @@ def _set_status(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if (self.boards.index(sub)+1) == int(params[2])), None) + if (self.boards.index(sub) + 1) == int(params[2])), None) if selected_board is None: return self._error(params[0], 1007, params[2]) @@ -444,7 +444,7 @@ def _get_comp(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if (self.boards.index(sub)+1) == int(params[2])), None) + if (self.boards.index(sub) + 1) == int(params[2])), None) except ValueError: return self._error(params[0], 1001) @@ -486,7 +486,7 @@ def _set_dbeatt(self, params): if v == params[1]: out_idx.append(i) boardx = next((sub for sub in self.boards - if int(sub['Address']) == int(b[i])), None) + if (self.boards.index(sub) + 1) == int(b[i])), None) selected_boards.append(boardx) if len(selected_boards) == 0: return self._error(params[0], 1015) @@ -494,9 +494,9 @@ def _set_dbeatt(self, params): brd, a = zip(*self.atts_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - #print(brd) - #print(board["ATT"]) - #print(board["ATT"][a[out_idx[b_idx]]]) +# print(brd) +# print(board["ATT"]) +# print(board["ATT"][a[out_idx[b_idx]]]) if ((params[2] == '+3' or params[2] == '-3') and not (0 <= (float(board["ATT"][a[out_idx[b_idx]]]) + float(params[2])) <= 31.5)): @@ -531,7 +531,7 @@ def _get_dbeatt(self, params): if v == params[1]: out_idx.append(i) boardx = next((sub for sub in self.boards - if int(sub['Address']) == int(b[i])), None) + if (self.boards.index(sub) + 1) == int(b[i])), None) selected_boards.append(boardx) if len(selected_boards) == 0: return self._error(params[0], 1015) @@ -539,9 +539,9 @@ def _get_dbeatt(self, params): brd, a = zip(*self.atts_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - #print(brd) - #print(board["ATT"]) - #print(board["ATT"][a[out_idx[b_idx]]]) +# print(brd) +# print(board["ATT"]) +# print(board["ATT"][a[out_idx[b_idx]]]) if ((board["Status"] == 1) or (board["Address"] not in brd)): retval += (f'ERR DBE {params[1]} BOARD ' @@ -557,7 +557,7 @@ def _get_firm(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if (self.boards.index(sub)+1) == int(params[2])), None) + if (self.boards.index(sub) +1 ) == int(params[2])), None) except ValueError: return self._error(params[0], 1001) @@ -583,7 +583,7 @@ def _set_dbeamp(self, params): if v == params[1]: out_idx.append(i) boardx = next((sub for sub in self.boards - if int(sub['Address']) == int(b[i])), None) + if (self.boards.index(sub) + 1) == int(b[i])), None) selected_boards.append(boardx) if len(selected_boards) == 0: return self._error(params[0], 1015) @@ -591,18 +591,19 @@ def _set_dbeamp(self, params): brd, a = zip(*self.amps_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - #print(brd) - #print(board["AMP"]) - #print(board["AMP"][a[out_idx[b_idx]]]) - if (float(params[2]) not in [0,1]): +# print(brd) +# print(board["AMP"]) +# print(board["AMP"][a[out_idx[b_idx]]]) + if (float(params[2]) not in [0, 1]): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} value out of range\n') - elif board["Status"] != 0: + f'{self.boards.index(board)+1} value out of range\n') + elif ((board["Status"] == 1) or (board["Address"] not in brd)): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} unreachable\n') + f'{self.boards.index(board)+1} unreachable\n') else: board["AMP"][a[out_idx[b_idx]]] = float(params[2]) - retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n' + retval += (f'DBE {params[1]} BOARD ' + f'{self.boards.index(board)+1} ACK\n') return retval[:-1] + '\x0D\x0A' def _get_dbeamp(self, params): @@ -617,7 +618,7 @@ def _get_dbeamp(self, params): if v == params[1]: out_idx.append(i) boardx = next((sub for sub in self.boards - if int(sub['Address']) == int(b[i])), None) + if (self.boards.index(sub) + 1) == int(b[i])), None) selected_boards.append(boardx) if len(selected_boards) == 0: return self._error(params[0], 1015) @@ -629,13 +630,13 @@ def _get_dbeamp(self, params): #print(board["AMP"]) #print(board["AMP"][a[out_idx[b_idx]]]) - if board["Status"] != 0: + if ((board["Status"] == 1) or (board["Address"] not in brd)): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} unreachable\n') + f'{self.boards.index(board)+1} unreachable\n') else: - retval += (f'ACK {params[1]} BOARD {board["Address"]} ' - f'AMP {a[out_idx[b_idx]]} VALUE ' - f'{board["AMP"][a[out_idx[b_idx]]]}\n') + retval += (f'ACK {params[1]} BOARD ' + f'{self.boards.index(board)+1} AMP {a[out_idx[b_idx]]} ' + f'VALUE {board["AMP"][a[out_idx[b_idx]]]}\n') return retval[:-1] + '\x0D\x0A' @@ -651,7 +652,7 @@ def _set_dbeeq(self, params): if v == params[1]: out_idx.append(i) boardx = next((sub for sub in self.boards - if int(sub['Address']) == int(b[i])), None) + if (self.boards.index(sub) + 1) == int(b[i])), None) selected_boards.append(boardx) if len(selected_boards) == 0: return self._error(params[0], 1015) @@ -659,15 +660,15 @@ def _set_dbeeq(self, params): brd, a = zip(*self.eqs_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - #print(brd) - #print(board["EQ"]) - #print(board["EQ"][a[out_idx[b_idx]]]) - if (float(params[2]) not in [0,1]): +# print(brd) +# print(board["EQ"]) +# print(board["EQ"][a[out_idx[b_idx]]]) + if (float(params[2]) not in [0, 1]): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} value out of range\n') - elif board["Status"] != 0: + f'{self.boards.index(board)+1} value out of range\n') + elif ((board["Status"] == 1) or (board["Address"] not in brd)): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} unreachable\n') + f'{self.boards.index(board)+1} unreachable\n') else: board["EQ"][a[out_idx[b_idx]]] = float(params[2]) retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n' @@ -685,7 +686,7 @@ def _get_dbeeq(self, params): if v == params[1]: out_idx.append(i) boardx = next((sub for sub in self.boards - if int(sub['Address']) == int(b[i])), None) + if (self.boards.index(sub) + 1) == int(b[i])), None) selected_boards.append(boardx) if len(selected_boards) == 0: return self._error(params[0], 1015) @@ -697,9 +698,9 @@ def _get_dbeeq(self, params): #print(board["EQ"]) #print(board["EQ"][a[out_idx[b_idx]]]) - if board["Status"] != 0: + if ((board["Status"] == 1) or (board["Address"] not in brd)): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} unreachable\n') + f'{self.boards.index(board)+1} unreachable\n') else: retval += (f'ACK {params[1]} BOARD {board["Address"]} ' f'EQ {a[out_idx[b_idx]]} VALUE ' @@ -719,7 +720,7 @@ def _set_dbebpf(self, params): if v == params[1]: out_idx.append(i) boardx = next((sub for sub in self.boards - if int(sub['Address']) == int(b[i])), None) + if (self.boards.index(sub) + 1) == int(b[i])), None) selected_boards.append(boardx) if len(selected_boards) == 0: return self._error(params[0], 1015) @@ -730,12 +731,12 @@ def _set_dbebpf(self, params): #print(brd) #print(board["BPF"]) #print(board["BPF"][a[out_idx[b_idx]]]) - if (float(params[2]) not in [0,1]): + if (float(params[2]) not in [0, 1]): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} value out of range\n') - elif board["Status"] != 0: + f'{self.boards.index(board)+1} value out of range\n') + elif ((board["Status"] == 1) or (board["Address"] not in brd)): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} unreachable\n') + f'{self.boards.index(board)+1} unreachable\n') else: board["BPF"][a[out_idx[b_idx]]] = float(params[2]) retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n' @@ -753,7 +754,7 @@ def _get_dbebpf(self, params): if v == params[1]: out_idx.append(i) boardx = next((sub for sub in self.boards - if int(sub['Address']) == int(b[i])), None) + if (self.boards.index(sub) + 1) == int(b[i])), None) selected_boards.append(boardx) if len(selected_boards) == 0: return self._error(params[0], 1015) @@ -761,13 +762,13 @@ def _get_dbebpf(self, params): brd, a = zip(*self.bpfs_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - #print(brd) - #print(board["BPF"]) - #print(board["BPF"][a[out_idx[b_idx]]]) +# print(brd) +# print(board["BPF"]) +# print(board["BPF"][a[out_idx[b_idx]]]) - if board["Status"] != 0: + if ((board["Status"] == 1) or (board["Address"] not in brd)): retval += (f'ERR DBE {params[1]} BOARD ' - f'{board["Address"]} unreachable\n') + f'{self.boards.index(board)+1} unreachable\n') else: retval += (f'ACK {params[1]} BOARD {board["Address"]} ' f'BPF {a[out_idx[b_idx]]} VALUE ' diff --git a/tests/test_dbesm.py b/tests/test_dbesm.py index 8a08fc5..bd7f09d 100644 --- a/tests/test_dbesm.py +++ b/tests/test_dbesm.py @@ -18,8 +18,8 @@ def _disable_board(self, err_board=3): self._send(disable_msg) def _noTemp_board(self, err_board=3): - noTemp_msg = f'DBE SETSTATUS BOARD {err_board} VALUE 2\x0D\x0A' - self._send(noTemp_msg) + noTemp_msg = f'DBE SETSTATUS BOARD {err_board} VALUE 2\x0D\x0A' + self._send(noTemp_msg) def _test_all_boards(self, response, err_board=999, diag=False): for board in range(1, 5): @@ -1021,10 +1021,8 @@ def test_nak_getdbeatt(self): print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - # SETDBEAMP - def test_setdbeamp_single_on(self, out_dbe='1_DBBC2'): message = f"DBE SETDBEAMP {out_dbe} 1\x0D\x0A" response = self._send(message) @@ -1064,7 +1062,7 @@ def test_setdbeamp_single_FloatValErr(self, out_dbe='1_DBBC2'): print(response) self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' '[0-9]+ value out of range\r\n') - + def test_setdbeamp_mult_IntValErr(self, out_dbe='prova'): message = f"DBE SETDBEAMP {out_dbe} 2\x0D\x0A" response = self._send(message) @@ -1163,10 +1161,8 @@ def test_nak_getdbeamp(self): print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - # SETDBEEQ - def test_setdbeeq_single_on(self, out_dbe='1_DBBC2'): message = f"DBE SETDBEEQ {out_dbe} 1\x0D\x0A" response = self._send(message) @@ -1206,7 +1202,7 @@ def test_setdbeeq_single_FloatValErr(self, out_dbe='1_DBBC2'): print(response) self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' '[0-9]+ value out of range\r\n') - + def test_setdbeeq_mult_IntValErr(self, out_dbe='prova'): message = f"DBE SETDBEEQ {out_dbe} 2\x0D\x0A" response = self._send(message) @@ -1305,10 +1301,8 @@ def test_nak_getdbeeq(self): print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - # SETDBEBPF - def test_setdbebpf_single_on(self, out_dbe='1_DBBC2'): message = f"DBE SETDBEBPF {out_dbe} 1\x0D\x0A" response = self._send(message) @@ -1348,7 +1342,7 @@ def test_setdbebpf_single_FloatValErr(self, out_dbe='1_DBBC2'): print(response) self.assertRegex(response, f'ERR DBE {out_dbe} BOARD ' '[0-9]+ value out of range\r\n') - + def test_setdbebpf_mult_IntValErr(self, out_dbe='prova'): message = f"DBE SETDBEBPF {out_dbe} 2\x0D\x0A" response = self._send(message) @@ -1447,7 +1441,5 @@ def test_nak_getdbebpf(self): print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') - - if __name__ == '__main__': unittest.main() From 9f132fe6b94d1c9a20761da1a44938aa213a22c1 Mon Sep 17 00:00:00 2001 From: mfioren Date: Tue, 11 Mar 2025 14:18:31 +0000 Subject: [PATCH 4/5] Fix #310, first dbesm working implementation --- simulators/dbesm/__init__.py | 36 +++++++++++++++++------------------- tests/test_dbesm.py | 1 + 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/simulators/dbesm/__init__.py b/simulators/dbesm/__init__.py index 26e599a..c550fa9 100644 --- a/simulators/dbesm/__init__.py +++ b/simulators/dbesm/__init__.py @@ -157,7 +157,7 @@ def _init_att(self): return att def parse(self, byte): -# print(byte) + # print(byte) if byte == self.tail: msg = self.msg[:-1] self.msg = '' @@ -557,7 +557,7 @@ def _get_firm(self, params): return self._error(params[0], 1001) try: selected_board = next((sub for sub in self.boards - if (self.boards.index(sub) +1 ) == int(params[2])), None) + if (self.boards.index(sub) + 1) == int(params[2])), None) except ValueError: return self._error(params[0], 1001) @@ -591,9 +591,9 @@ def _set_dbeamp(self, params): brd, a = zip(*self.amps_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) -# print(brd) -# print(board["AMP"]) -# print(board["AMP"][a[out_idx[b_idx]]]) + # print(brd) + # print(board["AMP"]) + # print(board["AMP"][a[out_idx[b_idx]]]) if (float(params[2]) not in [0, 1]): retval += (f'ERR DBE {params[1]} BOARD ' f'{self.boards.index(board)+1} value out of range\n') @@ -626,9 +626,9 @@ def _get_dbeamp(self, params): brd, a = zip(*self.amps_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - #print(brd) - #print(board["AMP"]) - #print(board["AMP"][a[out_idx[b_idx]]]) + # print(brd) + # print(board["AMP"]) + # print(board["AMP"][a[out_idx[b_idx]]]) if ((board["Status"] == 1) or (board["Address"] not in brd)): retval += (f'ERR DBE {params[1]} BOARD ' @@ -639,7 +639,6 @@ def _get_dbeamp(self, params): f'VALUE {board["AMP"][a[out_idx[b_idx]]]}\n') return retval[:-1] + '\x0D\x0A' - def _set_dbeeq(self, params): retval = '' boardx = {} @@ -660,9 +659,9 @@ def _set_dbeeq(self, params): brd, a = zip(*self.eqs_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) -# print(brd) -# print(board["EQ"]) -# print(board["EQ"][a[out_idx[b_idx]]]) + # print(brd) + # print(board["EQ"]) + # print(board["EQ"][a[out_idx[b_idx]]]) if (float(params[2]) not in [0, 1]): retval += (f'ERR DBE {params[1]} BOARD ' f'{self.boards.index(board)+1} value out of range\n') @@ -694,9 +693,9 @@ def _get_dbeeq(self, params): brd, a = zip(*self.eqs_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - #print(brd) - #print(board["EQ"]) - #print(board["EQ"][a[out_idx[b_idx]]]) + # print(brd) + # print(board["EQ"]) + # print(board["EQ"][a[out_idx[b_idx]]]) if ((board["Status"] == 1) or (board["Address"] not in brd)): retval += (f'ERR DBE {params[1]} BOARD ' @@ -707,7 +706,6 @@ def _get_dbeeq(self, params): f'{board["EQ"][a[out_idx[b_idx]]]}\n') return retval[:-1] + '\x0D\x0A' - def _set_dbebpf(self, params): retval = '' boardx = {} @@ -728,9 +726,9 @@ def _set_dbebpf(self, params): brd, a = zip(*self.bpfs_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) - #print(brd) - #print(board["BPF"]) - #print(board["BPF"][a[out_idx[b_idx]]]) + # print(brd) + # print(board["BPF"]) + # print(board["BPF"][a[out_idx[b_idx]]]) if (float(params[2]) not in [0, 1]): retval += (f'ERR DBE {params[1]} BOARD ' f'{self.boards.index(board)+1} value out of range\n') diff --git a/tests/test_dbesm.py b/tests/test_dbesm.py index bd7f09d..7e515bf 100644 --- a/tests/test_dbesm.py +++ b/tests/test_dbesm.py @@ -1441,5 +1441,6 @@ def test_nak_getdbebpf(self): print(response) self.assertEqual(response, 'NAK unknown command\x0D\x0A') + if __name__ == '__main__': unittest.main() From 2e36a9dc0954ecfa72b208ec1c86f424df498e26 Mon Sep 17 00:00:00 2001 From: Giuseppe Carboni Date: Tue, 11 Mar 2025 16:47:25 +0100 Subject: [PATCH 5/5] Fixed Python 3.13.2 linter errors --- simulators/dbesm/__init__.py | 180 +++++++++++++++++++++-------------- 1 file changed, 111 insertions(+), 69 deletions(-) diff --git a/simulators/dbesm/__init__.py b/simulators/dbesm/__init__.py index c550fa9..5787307 100644 --- a/simulators/dbesm/__init__.py +++ b/simulators/dbesm/__init__.py @@ -206,11 +206,13 @@ def _set_allmode(self, params): return self._error(params[0], 1003) for board in self.boards: if board['Status'] == 1: - retval += (f'BOARD {self.boards.index(board)+1} ' - f'ERR DBE BOARD unreachable\n') + retval += ( + f'BOARD {self.boards.index(board) + 1} ' + f'ERR DBE BOARD unreachable\n' + ) else: board["Configuration"] = params[1] - retval += f'BOARD {self.boards.index(board)+1} ACK\n' + retval += f'BOARD {self.boards.index(board) + 1} ACK\n' return retval[:-1] + '\x0D\x0A' def _set_mode(self, params): @@ -273,9 +275,9 @@ def _get_status(self, params): elif selected_board["Status"] == 1: return self._error(params[0], 1005, params[2]) retval = self.ack + '\n' - retval += f'BOARD {self.boards.index(selected_board)+1}\n\n' - retval += f'REG=[ {" ".join(map(str,selected_board["REG"]))} ]\n\n' - retval += f'ATT=[ {" ".join(map(str,selected_board["ATT"])) } ]' + retval += f'BOARD {self.boards.index(selected_board) + 1}\n\n' + retval += f'REG=[ {" ".join(map(str, selected_board["REG"]))} ]\n\n' + retval += f'ATT=[ {" ".join(map(str, selected_board["ATT"]))} ]' return retval + '\x0D\x0A' def _set_att(self, params): @@ -384,7 +386,7 @@ def _all_diag(self, params): return self._error(params[0], 1001) else: for board in self.boards: - retval += f'BOARD {self.boards.index(board)+1} ' + retval += f'BOARD {self.boards.index(board) + 1} ' if board["Status"] == 0: retval += self.ack + '\n' retval += f'5V {board["5V"]} 3V3 {board["3V3"]}\n' @@ -412,15 +414,15 @@ def _diag(self, params): return self._error(params[0], 1005, params[2]) elif selected_board["Status"] == 0: retval = self.ack + '\n' - retval += f'BOARD {self.boards.index(selected_board)+1}\n\n' - retval += f'5V {selected_board["5V"]}' - retval += f' 3V3 {selected_board["3V3"]}\n' + retval += f'BOARD {self.boards.index(selected_board) + 1}\n\n' + retval += f'5V {selected_board["5V"]} ' + retval += f'3V3 {selected_board["3V3"]}\n' retval += f'T0 {selected_board["T0"]}' elif selected_board["Status"] > 1: retval = self.ack + '\n' - retval += f'BOARD {self.boards.index(selected_board)+1}\n\n' - retval += f'5V {selected_board["5V"]}' - retval += f' 3V3 {selected_board["3V3"]}\n' + retval += f'BOARD {self.boards.index(selected_board) + 1}\n\n' + retval += f'5V {selected_board["5V"]} ' + retval += f'3V3 {selected_board["3V3"]}\n' retval += 'temp sensor not present' return retval + '\x0D\x0A' @@ -454,10 +456,10 @@ def _get_comp(self, params): return self._error(params[0], 1005, params[2]) else: retval = 'ACK\n' - retval += f'BOARD {self.boards.index(selected_board)+1}\n\n' - retval += f'AMP=[ {" ".join(map(str,selected_board["AMP"]))} ]\n' - retval += f'EQ=[ {" ".join(map(str,selected_board["EQ"]))} ]\n' - retval += f'BPF=[ {" ".join(map(str,selected_board["BPF"])) } ]' + retval += f'BOARD {self.boards.index(selected_board) + 1}\n\n' + retval += f'AMP=[ {" ".join(map(str, selected_board["AMP"]))} ]\n' + retval += f'EQ=[ {" ".join(map(str, selected_board["EQ"]))} ]\n' + retval += f'BPF=[ {" ".join(map(str, selected_board["BPF"]))} ]' return retval + '\x0D\x0A' def _get_cfg(self, params): @@ -467,7 +469,7 @@ def _get_cfg(self, params): else: retval += self.ack + '\n' for board in self.boards: - retval += f'BOARD {self.boards.index(board)+1} ' + retval += f'BOARD {self.boards.index(board) + 1} ' if (board['Status'] == 0 or (board['Status'] > 1)): retval += f'{board["Configuration"]}\n\n' else: @@ -494,29 +496,37 @@ def _set_dbeatt(self, params): brd, a = zip(*self.atts_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) -# print(brd) -# print(board["ATT"]) -# print(board["ATT"][a[out_idx[b_idx]]]) + # print(brd) + # print(board["ATT"]) + # print(board["ATT"][a[out_idx[b_idx]]]) if ((params[2] == '+3' or params[2] == '-3') and not (0 <= (float(board["ATT"][a[out_idx[b_idx]]]) + float(params[2])) <= 31.5)): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} value out of range\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} value out of range\n' + ) elif (params[2] != '+3' and params[2] != '-3' and float(params[2]) not in list(numpy.arange(0, 32, 0.5))): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} value out of range\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} value out of range\n' + ) elif ((board["Status"] == 1) or (board["Address"] not in brd)): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} unreachable\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} unreachable\n' + ) else: if params[2] == '+3' or params[2] == '-3': board["ATT"][a[out_idx[b_idx]]] += float(params[2]) else: board["ATT"][a[out_idx[b_idx]]] = float(params[2]) - retval += (f'DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} ACK\n') + retval += ( + f'DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} ACK\n' + ) return retval[:-1] + '\x0D\x0A' def _get_dbeatt(self, params): @@ -539,17 +549,22 @@ def _get_dbeatt(self, params): brd, a = zip(*self.atts_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) -# print(brd) -# print(board["ATT"]) -# print(board["ATT"][a[out_idx[b_idx]]]) + # print(brd) + # print(board["ATT"]) + # print(board["ATT"][a[out_idx[b_idx]]]) if ((board["Status"] == 1) or (board["Address"] not in brd)): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} unreachable\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} unreachable\n' + ) else: - retval += (f'ACK {params[1]} BOARD ' - f'{self.boards.index(board)+1} ATT {a[out_idx[b_idx]]} ' - f'VALUE {board["ATT"][a[out_idx[b_idx]]]}\n') + retval += ( + f'ACK {params[1]} BOARD ' + f'{self.boards.index(board) + 1} ' + f'ATT {a[out_idx[b_idx]]} ' + f'VALUE {board["ATT"][a[out_idx[b_idx]]]}\n' + ) return retval[:-1] + '\x0D\x0A' def _get_firm(self, params): @@ -595,15 +610,21 @@ def _set_dbeamp(self, params): # print(board["AMP"]) # print(board["AMP"][a[out_idx[b_idx]]]) if (float(params[2]) not in [0, 1]): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} value out of range\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} value out of range\n' + ) elif ((board["Status"] == 1) or (board["Address"] not in brd)): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} unreachable\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} unreachable\n' + ) else: board["AMP"][a[out_idx[b_idx]]] = float(params[2]) - retval += (f'DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} ACK\n') + retval += ( + f'DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} ACK\n' + ) return retval[:-1] + '\x0D\x0A' def _get_dbeamp(self, params): @@ -631,12 +652,17 @@ def _get_dbeamp(self, params): # print(board["AMP"][a[out_idx[b_idx]]]) if ((board["Status"] == 1) or (board["Address"] not in brd)): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} unreachable\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} unreachable\n' + ) else: - retval += (f'ACK {params[1]} BOARD ' - f'{self.boards.index(board)+1} AMP {a[out_idx[b_idx]]} ' - f'VALUE {board["AMP"][a[out_idx[b_idx]]]}\n') + retval += ( + f'ACK {params[1]} BOARD ' + f'{self.boards.index(board) + 1} ' + f'AMP {a[out_idx[b_idx]]} ' + f'VALUE {board["AMP"][a[out_idx[b_idx]]]}\n' + ) return retval[:-1] + '\x0D\x0A' def _set_dbeeq(self, params): @@ -663,11 +689,15 @@ def _set_dbeeq(self, params): # print(board["EQ"]) # print(board["EQ"][a[out_idx[b_idx]]]) if (float(params[2]) not in [0, 1]): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} value out of range\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} value out of range\n' + ) elif ((board["Status"] == 1) or (board["Address"] not in brd)): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} unreachable\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} unreachable\n' + ) else: board["EQ"][a[out_idx[b_idx]]] = float(params[2]) retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n' @@ -698,12 +728,16 @@ def _get_dbeeq(self, params): # print(board["EQ"][a[out_idx[b_idx]]]) if ((board["Status"] == 1) or (board["Address"] not in brd)): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} unreachable\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} unreachable\n' + ) else: - retval += (f'ACK {params[1]} BOARD {board["Address"]} ' - f'EQ {a[out_idx[b_idx]]} VALUE ' - f'{board["EQ"][a[out_idx[b_idx]]]}\n') + retval += ( + f'ACK {params[1]} BOARD {board["Address"]} ' + f'EQ {a[out_idx[b_idx]]} VALUE ' + f'{board["EQ"][a[out_idx[b_idx]]]}\n' + ) return retval[:-1] + '\x0D\x0A' def _set_dbebpf(self, params): @@ -730,11 +764,15 @@ def _set_dbebpf(self, params): # print(board["BPF"]) # print(board["BPF"][a[out_idx[b_idx]]]) if (float(params[2]) not in [0, 1]): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} value out of range\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} value out of range\n' + ) elif ((board["Status"] == 1) or (board["Address"] not in brd)): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} unreachable\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} unreachable\n' + ) else: board["BPF"][a[out_idx[b_idx]]] = float(params[2]) retval += f'DBE {params[1]} BOARD {board["Address"]} ACK\n' @@ -760,17 +798,21 @@ def _get_dbebpf(self, params): brd, a = zip(*self.bpfs_in_boards) for board in selected_boards: b_idx = selected_boards.index(board) -# print(brd) -# print(board["BPF"]) -# print(board["BPF"][a[out_idx[b_idx]]]) + # print(brd) + # print(board["BPF"]) + # print(board["BPF"][a[out_idx[b_idx]]]) if ((board["Status"] == 1) or (board["Address"] not in brd)): - retval += (f'ERR DBE {params[1]} BOARD ' - f'{self.boards.index(board)+1} unreachable\n') + retval += ( + f'ERR DBE {params[1]} BOARD ' + f'{self.boards.index(board) + 1} unreachable\n' + ) else: - retval += (f'ACK {params[1]} BOARD {board["Address"]} ' - f'BPF {a[out_idx[b_idx]]} VALUE ' - f'{board["BPF"][a[out_idx[b_idx]]]}\n') + retval += ( + f'ACK {params[1]} BOARD {board["Address"]} ' + f'BPF {a[out_idx[b_idx]]} VALUE ' + f'{board["BPF"][a[out_idx[b_idx]]]}\n' + ) return retval[:-1] + '\x0D\x0A' def _error(self, device_code, error_code, board_address=None):