-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathhandler.py
More file actions
57 lines (40 loc) · 1.61 KB
/
handler.py
File metadata and controls
57 lines (40 loc) · 1.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from datetime import time
from flask_restful import Resource
import flask
def _defaultAddMeasurementFunc(device, timestamp, type,value, units):
pass
class UploadMeasurementHandler(Resource):
def __init__(self, addMeasurementFunc = _defaultAddMeasurementFunc) -> None:
super().__init__()
self._addMeasurement = addMeasurementFunc
def post(self):
json_input = flask.request.get_json()
deviceId = json_input["device"]
timestamp = json_input["when"]
measurementType = json_input["body"]["type"]
value = json_input["body"]["value"]
units = json_input["body"]["units"]
self._addMeasurement(deviceId, timestamp, measurementType, value, units)
return {"message": "uploaded sensor data"}, 200
def _defaultAddAlertFunc(device, timestamp, type, message):
pass
def _defaultGetAlertFunc(limit=None):
pass
class UploadAlertHandler(Resource):
def __init__(self,
addAlertFunc = _defaultAddAlertFunc,
getAlertFunc=_defaultGetAlertFunc) -> None:
super().__init__()
self._addAlert = addAlertFunc
self._getAlert = getAlertFunc
def post(self):
json_input = flask.request.get_json()
deviceId = json_input["device"]
timestamp = json_input["when"]
alertType = json_input["body"]["type"]
message = json_input["body"]["message"]
self._addAlert(deviceId, timestamp, alertType, message)
return {"message": "uploaded alert"}, 200
def get(self):
val = self._getAlert(limit=50)
return (val, 200,)