Skip to content

Commit d803133

Browse files
committed
db sync over mqtt
1 parent 7159d10 commit d803133

File tree

3 files changed

+98
-13
lines changed

3 files changed

+98
-13
lines changed

configuration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ def __init__(self):
66
self.mqtt_command_topic = ""
77
self.mqtt_response_topic = ""
88
self.mqtt_json_topic = ""
9+
self.mqtt_sync_request_topic = ""
910
self.mongo_url = ""
1011
self.mongo_collection = ""

mq.py

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import glob
2+
import sqlite3
13
import time
24
import os
35
from datetime import datetime
@@ -86,12 +88,13 @@ def run(self):
8688
def on_connect(self, client: mqtt.Client, userdata, flags, rc):
8789
self.send_msg("Well hello there")
8890
client.subscribe(self.configuration.mqtt_command_topic, qos=2)
91+
client.subscribe(self.configuration.mqtt_sync_request_topic, qos=1)
8992
self.ntp_update()
9093

9194
def on_message(self, client, userdata, message: mqtt.MQTTMessage):
92-
if message.topic == self.configuration.mqtt_command_topic:
93-
try:
94-
cmd_str = message.payload.decode()
95+
try:
96+
cmd_str = message.payload.decode()
97+
if message.topic == self.configuration.mqtt_command_topic:
9598
cmd_split = cmd_str.split(' ')
9699
if cmd_split[0] == "temp":
97100
temp_rate = self.fix_decimal(cmd_split[1])
@@ -105,13 +108,23 @@ def on_message(self, client, userdata, message: mqtt.MQTTMessage):
105108
if len(cmd_split) > 2:
106109
pulse_interval = int(cmd_split[2])
107110
self.set_insulin_bolus(bolus, pulse_interval)
111+
elif cmd_split[0] == "status":
112+
self.pod_check_event.set()
108113
elif cmd_split[0] == "reboot":
109114
self.send_msg("sir yes sir")
110115
os.system('sudo shutdown -r now')
111116
else:
112117
self.send_msg("lol what?")
113-
except:
114-
self.send_msg("that didn't seem right")
118+
elif message.topic == self.configuration.mqtt_sync_request_topic:
119+
if cmd_str == "latest":
120+
self.send_result(self.i_pod)
121+
else:
122+
spl = cmd_str.split(' ')
123+
pod_id = spl[0]
124+
req_ids = spl[1:]
125+
self.fill_request(pod_id, req_ids)
126+
except:
127+
self.send_msg("that didn't seem right")
115128

116129
def on_disconnect(self, client, userdata, rc):
117130
self.logger.info("Disconnected from mqtt server")
@@ -172,17 +185,26 @@ def pdm_loop_main(self):
172185
if not self.dry_run:
173186
self.i_pdm.deactivate_pod()
174187
self.send_msg("all is well, all is good")
175-
self.send_result(self.i_pod)
176188
check_wait = 3600
177189
except:
178190
self.send_msg("deactivation failed")
179191
check_wait = 1
192+
finally:
193+
self.send_result(self.i_pod)
180194
continue
181195

182196
try:
183197
self.send_msg("checking pod status")
184198
if not self.dry_run:
185-
self.i_pdm.update_status()
199+
try:
200+
self.i_pdm.update_status()
201+
except:
202+
self.send_msg("failed to get pod status")
203+
check_wait = 60
204+
continue
205+
finally:
206+
self.send_result(self.i_pod)
207+
186208
if self.i_pod.state_faulted:
187209
self.send_msg("pod is faulted! oh my")
188210
check_wait = 1
@@ -198,6 +220,8 @@ def pdm_loop_main(self):
198220
except:
199221
self.send_msg("couldn't reach pod I guess?")
200222
check_wait = 300
223+
finally:
224+
self.send_result(self.i_pod)
201225

202226
with self.pod_request_lock:
203227
if self.i_rate_requested is not None:
@@ -265,14 +289,15 @@ def fix_decimal(self, f):
265289

266290
def send_result(self, pod):
267291
msg = pod.GetString()
268-
self.logger.info(msg)
292+
if pod.pod_id is None:
293+
return
269294
self.client.publish(self.configuration.mqtt_json_topic,
270-
payload=msg, qos=2)
295+
payload=msg, qos=1)
271296

272297
def send_msg(self, msg):
273298
self.logger.info(msg)
274299
self.client.publish(self.configuration.mqtt_response_topic,
275-
payload="%s (%s UTC)" % (msg, datetime.utcnow().strftime("%H:%M:%S")), qos=2)
300+
payload=msg, qos=1)
276301

277302
def ntp_update(self):
278303
if self.dry_run:
@@ -292,6 +317,50 @@ def ntp_update(self):
292317
except:
293318
self.logger.info("update failed")
294319

320+
def fill_request(self, pod_id, req_ids):
321+
db_path = self.find_db_path(pod_id)
322+
if db_path is None:
323+
self.send_msg("but I can't?")
324+
return
325+
326+
with sqlite3.connect(db_path) as conn:
327+
for req_id in req_ids:
328+
req_id = int(req_id)
329+
cursor = conn.execute("SELECT rowid, timestamp, pod_json FROM pod_history WHERE rowid = " + str(req_id))
330+
row = cursor.fetchone()
331+
if row is not None:
332+
js = json.loads(row[2])
333+
js["pod_id"] = pod_id
334+
js["last_command_db_id"] = row[0]
335+
js["last_command_db_ts"] = row[1]
336+
337+
self.client.publish(self.configuration.mqtt_json_topic,
338+
payload=json.dumps(js), qos=0)
339+
cursor.close()
340+
341+
def find_db_path(self, pod_id):
342+
self.i_pod._fix_pod_id()
343+
if self.i_pod.pod_id == pod_id:
344+
return "/home/pi/omnipy/data/pod.db"
345+
346+
found_db_path=None
347+
for db_path in glob.glob("/home/pi/omnipy/data/*.db"):
348+
with sqlite3.connect(db_path) as conn:
349+
cursor = conn.execute("SELECT pod_json FROM pod_history WHERE pod_state > 0 LIMIT 1")
350+
row = cursor.fetchone()
351+
if row is not None:
352+
js = json.loads(row[0])
353+
if "pod_id" not in js or js["pod_id"] is None:
354+
found_id = "L" + str(js["id_lot"]) + "T" + str(js["id_t"])
355+
else:
356+
found_id = js["pod_id"]
357+
358+
if found_id == pod_id:
359+
found_db_path = db_path
360+
break
361+
cursor.close()
362+
return found_db_path
363+
295364
if __name__ == '__main__':
296365
operator = MqOperator()
297366
operator.run()

podcomm/pod.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ class Pod:
77
def __init__(self):
88
self.db_migrated = False
99

10+
self.pod_id = None
1011
self.id_lot = None
1112
self.id_t = None
1213
self.id_version_pm = None
@@ -67,6 +68,7 @@ def __init__(self):
6768

6869
self.last_command = None
6970
self.last_command_db_id = None
71+
self.last_command_db_ts = None
7072
self.last_enacted_temp_basal_start = None
7173
self.last_enacted_temp_basal_duration = None
7274
self.last_enacted_temp_basal_amount = None
@@ -75,6 +77,8 @@ def __init__(self):
7577

7678

7779
def Save(self, save_as = None):
80+
self._fix_pod_id()
81+
7882
if save_as is not None:
7983
self.path = save_as + POD_FILE_SUFFIX
8084
self.path_db = save_as + POD_DB_SUFFIX
@@ -84,7 +88,7 @@ def Save(self, save_as = None):
8488
self.path_db = POD_FILE + POD_DB_SUFFIX
8589

8690
try:
87-
self.last_command_db_id = self.log()
91+
self.last_command_db_id, self.last_command_db_ts = self.log()
8892
except:
8993
pass
9094

@@ -110,6 +114,9 @@ def Load(path, db_path=None):
110114

111115
p.id_lot = d.get("id_lot", None)
112116
p.id_t = d.get("id_t", None)
117+
p.pod_id = d.get("pod_id", None)
118+
p._fix_pod_id()
119+
113120
p.id_version_pm = d.get("id_version_pm", None)
114121
p.id_version_pi = d.get("id_version_pi", None)
115122
p.id_version_unknown_byte = d.get("id_version_unknown_byte", None)
@@ -178,6 +185,7 @@ def is_active(self):
178185

179186

180187
def __str__(self):
188+
self._fix_pod_id()
181189
return json.dumps(self.__dict__, indent=4, sort_keys=True)
182190

183191
def _get_conn(self):
@@ -208,19 +216,21 @@ def _ensure_db_structure(self):
208216

209217
def log(self):
210218
try:
219+
self._fix_pod_id()
211220
self._ensure_db_structure()
212221
with self._get_conn() as conn:
213222
sql = """ INSERT INTO pod_history (timestamp, pod_state, pod_minutes, pod_last_command,
214223
insulin_delivered, insulin_canceled, insulin_reservoir, pod_json)
215224
VALUES(?,?,?,?,?,?,?,?) """
216225

217-
values = (time.time(), self.state_progress, self.state_active_minutes,
226+
ts = time.time()
227+
values = (ts, self.state_progress, self.state_active_minutes,
218228
str(self.last_command), self.insulin_delivered, self.insulin_canceled, self.insulin_reservoir,
219229
json.dumps(self.__dict__, indent=4, sort_keys=True))
220230

221231
c = conn.cursor()
222232
c.execute(sql, values)
223-
return c.lastrowid
233+
return c.lastrowid, ts
224234
except:
225235
getLogger().exception("Error while writing to database")
226236

@@ -251,3 +261,8 @@ def get_history(self):
251261
# print(row[4])
252262
except:
253263
getLogger().exception("Error while writing to database")
264+
265+
def _fix_pod_id(self):
266+
if self.pod_id is None:
267+
if self.id_t is not None and self.id_lot is not None:
268+
self.pod_id = "L" + str(self.id_lot) + "T" + str(self.id_t)

0 commit comments

Comments
 (0)