diff --git a/setup.py b/setup.py index 3b536bc..0a7806c 100644 --- a/setup.py +++ b/setup.py @@ -75,7 +75,7 @@ # List run-time dependencies here. These will be installed by pip when # your project is installed. - install_requires=['numpy', 'pandas', 'futures', 'requests_futures'], + install_requires=['numpy', 'pandas', 'requests_futures'], # If there are data files included in your packages that need to be # installed, specify them here. If using Python 2.6 or less, then these @@ -83,6 +83,8 @@ # Note: for creating the source distribution, they had to be included in the # MANIFEST.in as well. package_data={ - 'tmpo': ['tmpo.ipynb', 'LICENSE', 'README.md'], + '': ['LICENSE', 'README.md'], + 'tmpo': ['.flukso.crt'], }, + include_package_data=True, ) diff --git a/tmpo/.flukso.crt b/tmpo/.flukso.crt new file mode 100644 index 0000000..b3c65ab --- /dev/null +++ b/tmpo/.flukso.crt @@ -0,0 +1,22 @@ + +-----BEGIN CERTIFICATE----- +MIIDfzCCAmegAwIBAgIJANYOkpI6yVcFMA0GCSqGSIb3DQEBBQUAMDMxCzAJBgNV +BAYTAkJFMQ8wDQYDVQQKEwZGbHVrc28xEzARBgNVBAMTCmZsdWtzby5uZXQwHhcN +MTAwNjAxMjE1ODAyWhcNMzUwNTI2MjE1ODAyWjAzMQswCQYDVQQGEwJCRTEPMA0G +A1UEChMGRmx1a3NvMRMwEQYDVQQDEwpmbHVrc28ubmV0MIIBIjANBgkqhkiG9w0B +AQEFAAOCAQ8AMIIBCgKCAQEA6CtNI3YrF/7Ak3etIe+XnL4HwJYki4PyaWI4S7W1 +49C9W5AEbEd7ufnsaku3eVxMqOP6b5L7MFpCCGDiM1Zt32yYAcL65eCrofZw1DE0 +SuWos0Z1P4y2rIUFHya8g8bUh7lUvq30IBgnnUh7Lo0eQT1XfnC/KMUnvseHI/iw +Y3HhYX+espsCPh1a0ATLlEk93XK99q/5mgojSGQxmwPj/91mOWmJOO4edEQAhK+u +t6wCNxZNnf9yyyzzLczwMytfrwBWJEJjJFTfr3JiEmHdl4dt7UiuElGLMr9dFhPV +12Bidxszov663ffUiIUmV/fkMWF1ZEWXFS0x+VJ52seChwIDAQABo4GVMIGSMB0G +A1UdDgQWBBQGMvERFrapN1lmOm9SVR8qB+uj/zBjBgNVHSMEXDBagBQGMvERFrap +N1lmOm9SVR8qB+uj/6E3pDUwMzELMAkGA1UEBhMCQkUxDzANBgNVBAoTBkZsdWtz +bzETMBEGA1UEAxMKZmx1a3NvLm5ldIIJANYOkpI6yVcFMAwGA1UdEwQFMAMBAf8w +DQYJKoZIhvcNAQEFBQADggEBAOZjgNoNhJLckVMEYZiYWqRDWeRPBkyGStCH93r3 +42PpuKDyysxI1ldLTcUpUSrs1AtdSIEiEahWr6zVW4QW4o9iqO905E03aTO86L+P +j7SIBPP01M2f70pHpnz+uH1MDxsarI96qllslWfymYI7c6yUN/VciWfNWa38nK1l +MiQJuDvElNy8aN1JJtXHFUQK/I8ois1ATT1rGAiqrkDZIm4pdDmqB/zLI3qIJf8o +cKIo2x/YkVhuDmIpU/XVA13csXrXU+CLfFyNdY1a/6Dhv2B4wG6J5RGuxWmA+Igg +TTysD+aqqzs8XstqDu/aLjMzFKMaXNvDoCbdFQGVXfx0F1A= +-----END CERTIFICATE----- \ No newline at end of file diff --git a/tmpo/__init__.py b/tmpo/__init__.py index ab1e46c..b649c6b 100644 --- a/tmpo/__init__.py +++ b/tmpo/__init__.py @@ -1,590 +1,10 @@ __title__ = "tmpo" -__version__ = "0.2.10" +__version__ = "0.3.7" __build__ = 0x000100 __author__ = "Bart Van Der Meerssche" __license__ = "MIT" __copyright__ = "Copyright 2017 Bart Van Der Meerssche" -FLUKSO_CRT = """ ------BEGIN CERTIFICATE----- -MIIDfzCCAmegAwIBAgIJANYOkpI6yVcFMA0GCSqGSIb3DQEBBQUAMDMxCzAJBgNV -BAYTAkJFMQ8wDQYDVQQKEwZGbHVrc28xEzARBgNVBAMTCmZsdWtzby5uZXQwHhcN -MTAwNjAxMjE1ODAyWhcNMzUwNTI2MjE1ODAyWjAzMQswCQYDVQQGEwJCRTEPMA0G -A1UEChMGRmx1a3NvMRMwEQYDVQQDEwpmbHVrc28ubmV0MIIBIjANBgkqhkiG9w0B -AQEFAAOCAQ8AMIIBCgKCAQEA6CtNI3YrF/7Ak3etIe+XnL4HwJYki4PyaWI4S7W1 -49C9W5AEbEd7ufnsaku3eVxMqOP6b5L7MFpCCGDiM1Zt32yYAcL65eCrofZw1DE0 -SuWos0Z1P4y2rIUFHya8g8bUh7lUvq30IBgnnUh7Lo0eQT1XfnC/KMUnvseHI/iw -Y3HhYX+espsCPh1a0ATLlEk93XK99q/5mgojSGQxmwPj/91mOWmJOO4edEQAhK+u -t6wCNxZNnf9yyyzzLczwMytfrwBWJEJjJFTfr3JiEmHdl4dt7UiuElGLMr9dFhPV -12Bidxszov663ffUiIUmV/fkMWF1ZEWXFS0x+VJ52seChwIDAQABo4GVMIGSMB0G -A1UdDgQWBBQGMvERFrapN1lmOm9SVR8qB+uj/zBjBgNVHSMEXDBagBQGMvERFrap -N1lmOm9SVR8qB+uj/6E3pDUwMzELMAkGA1UEBhMCQkUxDzANBgNVBAoTBkZsdWtz -bzETMBEGA1UEAxMKZmx1a3NvLm5ldIIJANYOkpI6yVcFMAwGA1UdEwQFMAMBAf8w -DQYJKoZIhvcNAQEFBQADggEBAOZjgNoNhJLckVMEYZiYWqRDWeRPBkyGStCH93r3 
-42PpuKDyysxI1ldLTcUpUSrs1AtdSIEiEahWr6zVW4QW4o9iqO905E03aTO86L+P -j7SIBPP01M2f70pHpnz+uH1MDxsarI96qllslWfymYI7c6yUN/VciWfNWa38nK1l -MiQJuDvElNy8aN1JJtXHFUQK/I8ois1ATT1rGAiqrkDZIm4pdDmqB/zLI3qIJf8o -cKIo2x/YkVhuDmIpU/XVA13csXrXU+CLfFyNdY1a/6Dhv2B4wG6J5RGuxWmA+Igg -TTysD+aqqzs8XstqDu/aLjMzFKMaXNvDoCbdFQGVXfx0F1A= ------END CERTIFICATE-----""" - -SQL_SENSOR_TABLE = """ - CREATE TABLE IF NOT EXISTS sensor( - sid TEXT, - token TEXT, - PRIMARY KEY(sid))""" - -SQL_SENSOR_INS = """ - INSERT INTO sensor - (sid, token) - VALUES (?, ?)""" - -SQL_SENSOR_DEL = """ - DELETE FROM sensor - WHERE sid = ?""" - -SQL_TMPO_DEL = """ - DELETE FROM tmpo - WHERE sid = ?""" - -SQL_SENSOR_ALL = """ - SELECT sid - FROM sensor - ORDER BY sid""" - -SQL_SENSOR_TOKEN = """ - SELECT token - FROM sensor - WHERE sid = ?""" - -SQL_TMPO_TABLE = """ - CREATE TABLE IF NOT EXISTS tmpo( - sid TEXT, - rid INTEGER, - lvl INTEGER, - bid INTEGER, - ext TEXT, - created REAL, - data BLOB, - PRIMARY KEY(sid, rid, lvl, bid))""" - -SQL_TMPO_INS = """ - INSERT INTO tmpo - (sid, rid, lvl, bid, ext, created, data) - VALUES (?, ?, ?, ?, ?, ?, ?)""" - -SQL_TMPO_CLEAN = """ - DELETE - FROM tmpo - WHERE sid = ? AND rid = ? AND lvl = ? AND bid <= ?""" - -SQL_TMPO_ALL = """ - SELECT sid, rid, lvl, bid, ext, created, data - FROM tmpo - WHERE sid = ? - ORDER BY rid ASC, lvl DESC, bid ASC""" - -SQL_TMPO_LAST = """ - SELECT rid, lvl, bid, ext - FROM tmpo - WHERE sid = ? - ORDER BY created DESC, lvl DESC - LIMIT 1""" - -SQL_TMPO_LAST_DATA = """ - SELECT rid, lvl, bid, ext, data - FROM tmpo - WHERE sid = ? - ORDER BY created DESC, lvl DESC - LIMIT 1""" - -SQL_TMPO_FIRST = """ - SELECT rid, lvl, bid - FROM tmpo - WHERE sid = ? - ORDER BY created ASC, lvl ASC - LIMIT 1""" - -SQL_TMPO_RID_MAX = """ - SELECT MAX(rid) - FROM tmpo - WHERE sid = ?""" - -API_TMPO_SYNC = "https://%s/sensor/%s/tmpo/sync" -API_TMPO_BLOCK = "https://%s/sensor/%s/tmpo/%d/%d/%d" - -HTTP_ACCEPT = { - "json": "application/json", - "gz": "application/gzip"} - -RE_JSON_BLK = r'^\{"h":(?P<h>\{.+?\}),"t":(?P<t>\[.+?\]),"v":(?P<v>\[.+?\])\}$' -DBG_TMPO_REQUEST = "[r] time:%.3f sid:%s rid:%d lvl:%2d bid:%d" -DBG_TMPO_WRITE = "[w] time:%.3f sid:%s rid:%d lvl:%2d bid:%d size[B]:%d" -EPOCHS_MAX = 2147483647 - - -import os -import sys -import io -import math -import time -import sqlite3 -import requests_futures.sessions -import concurrent.futures -import zlib -import re -import json -import numpy as np -import pandas as pd -from functools import wraps - - -def dbcon(func): - """Set up connection before executing function, commit and close connection - afterwards.
Unless a connection already has been created.""" - @wraps(func) - def wrapper(*args, **kwargs): - self = args[0] - if self.dbcon is None: - # set up connection - self.dbcon = sqlite3.connect(self.db) - self.dbcur = self.dbcon.cursor() - self.dbcur.execute(SQL_SENSOR_TABLE) - self.dbcur.execute(SQL_TMPO_TABLE) - - # execute function - try: - result = func(*args, **kwargs) - except Exception as e: - # on exception, first close connection and then raise - self.dbcon.rollback() - self.dbcon.commit() - self.dbcon.close() - self.dbcon = None - self.dbcur = None - raise e - else: - # commit everything and close connection - self.dbcon.commit() - self.dbcon.close() - self.dbcon = None - self.dbcur = None - else: - result = func(*args, **kwargs) - return result - return wrapper - - -class Session(): - def __init__(self, path=None, workers=16): - """ - Parameters - ---------- - path : str, optional - location for the database - workers : int - default 16 - """ - self.debug = False - if path is None: - path = os.path.expanduser("~") # $HOME - if sys.platform == "win32": - self.home = os.path.join(path, "tmpo") - else: - self.home = os.path.join(path, ".tmpo") - self.db = os.path.join(self.home, "tmpo.sqlite3") - self.crt = os.path.join(self.home, "flukso.crt") - self.host = "api.flukso.net" - try: - os.mkdir(self.home) - except OSError: # dir exists - pass - else: - with io.open(self.crt, "wb") as f: - f.write(FLUKSO_CRT.encode("ascii")) - self.rqs = requests_futures.sessions.FuturesSession( - executor=concurrent.futures.ThreadPoolExecutor( - max_workers=workers)) - self.rqs.headers.update({"X-Version": "1.0"}) - self.dbcon = None - self.dbcur = None - - @dbcon - def add(self, sid, token): - """ - Add new sensor to the database - - Parameters - ---------- - sid : str - SensorId - token : str - """ - try: - self.dbcur.execute(SQL_SENSOR_INS, (sid, token)) - except sqlite3.IntegrityError: # sensor entry exists - pass - - @dbcon - def remove(self, sid): - """ - Remove sensor from the database - - Parameters - ---------- - sid : str - SensorID - """ - self.dbcur.execute(SQL_SENSOR_DEL, (sid,)) - self.dbcur.execute(SQL_TMPO_DEL, (sid,)) - - @dbcon - def reset(self, sid): - """ - Removes all tmpo blocks for a given sensor, but keeps sensor table - intact, so sensor id and token remain in the database. 
- - Parameters - ---------- - sid : str - """ - self.dbcur.execute(SQL_TMPO_DEL, (sid,)) - - @dbcon - def sync(self, *sids): - """ - Synchronise data - - Parameters - ---------- - sids : list of str - SensorIDs to sync - Optional, leave empty to sync everything - """ - if sids == (): - sids = [sid for (sid,) in self.dbcur.execute(SQL_SENSOR_ALL)] - for sid in sids: - self.dbcur.execute(SQL_TMPO_LAST, (sid,)) - last = self.dbcur.fetchone() - if last: - rid, lvl, bid, ext = last - self._clean(sid, rid, lvl, bid) - # prevent needless polling - if time.time() < bid + 256: - return - else: - rid, lvl, bid = 0, 0, 0 - self._req_sync(sid, rid, lvl, bid) - - @dbcon - def list(self, *sids): - """ - List all tmpo-blocks in the database - - Parameters - ---------- - sids : list of str - SensorID's for which to list blocks - Optional, leave empty to get them all - - Returns - ------- - list[list[tuple]] - """ - if sids == (): - sids = [sid for (sid,) in self.dbcur.execute(SQL_SENSOR_ALL)] - slist = [] - for sid in sids: - tlist = [] - for tmpo in self.dbcur.execute(SQL_TMPO_ALL, (sid,)): - tlist.append(tmpo) - sid, rid, lvl, bid, ext, ctd, blk = tmpo - self._dprintf(DBG_TMPO_WRITE, ctd, sid, rid, lvl, bid, len(blk)) - slist.append(tlist) - return slist - - @dbcon - def series(self, sid, recycle_id=None, head=None, tail=None, - datetime=True): - """ - Create data Series - - Parameters - ---------- - sid : str - recycle_id : optional - head : int | pandas.Timestamp, optional - Start of the interval - default earliest available - tail : int | pandas.Timestamp, optional - End of the interval - default max epoch - datetime : bool - convert index to datetime - default True - - Returns - ------- - pandas.Series - """ - if head is None: - head = 0 - else: - head = self._2epochs(head) - - if tail is None: - tail = EPOCHS_MAX - else: - tail = self._2epochs(tail) - - if recycle_id is None: - self.dbcur.execute(SQL_TMPO_RID_MAX, (sid,)) - recycle_id = self.dbcur.fetchone()[0] - tlist = self.list(sid)[0] - srlist = [] - for _sid, rid, lvl, bid, ext, ctd, blk in tlist: - if (recycle_id == rid - and head < self._blocktail(lvl, bid) - and tail >= bid): - srlist.append(self._blk2series(ext, blk, head, tail)) - if len(srlist) > 0: - ts = pd.concat(srlist) - ts.name = sid - if datetime is True: - ts.index = pd.to_datetime(ts.index, unit="s", utc=True) - return ts - else: - return pd.Series([], name=sid) - - @dbcon - def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): - """ - Create data frame - - Parameters - ---------- - sids : list[str] - head : int | pandas.Timestamp, optional - Start of the interval - default earliest available - tail : int | pandas.Timestamp, optional - End of the interval - default max epoch - datetime : bool - convert index to datetime - default True - - Returns - ------- - pandas.DataFrame - """ - if head is None: - head = 0 - else: - head = self._2epochs(head) - - if tail is None: - tail = EPOCHS_MAX - else: - tail = self._2epochs(tail) - - series = [self.series(sid, head=head, tail=tail, datetime=False) - for sid in sids] - df = pd.concat(series, axis=1) - if datetime is True: - df.index = pd.to_datetime(df.index, unit="s", utc=True) - return df - - @dbcon - def first_timestamp(self, sid, epoch=False): - """ - Get the first available timestamp for a sensor - - Parameters - ---------- - sid : str - SensorID - epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp - - Returns - ------- - pd.Timestamp | int - """ - first_block = 
self.dbcur.execute(SQL_TMPO_FIRST, (sid,)).fetchone() - if first_block is None: - return None - - timestamp = first_block[2] - if not epoch: - timestamp = pd.Timestamp.utcfromtimestamp(timestamp) - timestamp = timestamp.tz_localize('UTC') - return timestamp - - def last_timestamp(self, sid, epoch=False): - """ - Get the theoretical last timestamp for a sensor - - Parameters - ---------- - sid : str - SensorID - epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp - - Returns - ------- - pd.Timestamp | int - """ - timestamp, value = self.last_datapoint(sid, epoch) - return timestamp - - def last_datapoint(self, sid, epoch=False): - """ - Parameters - ---------- - sid : str - SensorId - epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp - - Returns - ------- - pd.Timestamp | int, float - """ - block = self._last_block(sid) - if block is None: - return None, None - - header = block['h'] - timestamp, value = header['tail'] - - if not epoch: - timestamp = pd.Timestamp.utcfromtimestamp(timestamp) - timestamp = timestamp.tz_localize('UTC') - - return timestamp, value - - @dbcon - def _last_block(self, sid): - cur = self.dbcur.execute(SQL_TMPO_LAST_DATA, (sid,)) - row = cur.fetchone() - if row is None: - return None - - rid, lvl, bid, ext, blk = row - - jblk = self._decompress_block(blk, ext) - data = json.loads(jblk.decode('UTF-8')) - return data - - def _decompress_block(self, blk, ext): - if ext != "gz": - raise NotImplementedError("Compression type not supported in tmpo") - jblk = zlib.decompress(blk, zlib.MAX_WBITS | 16) # gzip decoding - return jblk - - def _2epochs(self, time): - if isinstance(time, pd.Timestamp): - return int(math.floor(time.value / 1e9)) - elif isinstance(time, int): - return time - else: - raise NotImplementedError("Time format not supported. " + - "Use epochs or a Pandas timestamp.") - - def _blk2series(self, ext, blk, head, tail): - jblk = self._decompress_block(blk, ext) - m = re.match(RE_JSON_BLK, jblk.decode("utf-8")) - pdjblk = '{"index":%s,"data":%s}' % (m.group("t"), m.group("v")) - try: - pdsblk = pd.read_json( - pdjblk, - typ="series", - dtype="float", - orient="split", - numpy=True, - date_unit="s") - except: - return pd.Series() - h = json.loads(m.group("h")) - self._npdelta(pdsblk.index, h["head"][0]) - self._npdelta(pdsblk, h["head"][1]) - pdsblk_truncated = pdsblk.loc[head:tail] - return pdsblk_truncated - - def _npdelta(self, a, delta): - """Numpy: Modifying Array Values - http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html""" - for x in np.nditer(a, op_flags=["readwrite"]): - delta += x - x[...] 
= delta - return a - - def _req_sync(self, sid, rid, lvl, bid): - self.dbcur.execute(SQL_SENSOR_TOKEN, (sid,)) - token, = self.dbcur.fetchone() - headers = { - "Accept": HTTP_ACCEPT["json"], - "X-Token": token} - params = { - "rid": rid, - "lvl": lvl, - "bid": bid} - f = self.rqs.get( - API_TMPO_SYNC % (self.host, sid), - headers=headers, - params=params, - verify=self.crt) - r = f.result() - r.raise_for_status() - fs = [] - for t in r.json(): - fs.append((t, self._req_block( - sid, token, t["rid"], t["lvl"], t["bid"], t["ext"]))) - for (t, f) in fs: - self._write_block( - f.result(), sid, t["rid"], t["lvl"], t["bid"], t["ext"]) - - def _req_block(self, sid, token, rid, lvl, bid, ext): - headers = { - "Accept": HTTP_ACCEPT["gz"], - "X-Token": token} - f = self.rqs.get( - API_TMPO_BLOCK % (self.host, sid, rid, lvl, bid), - headers=headers, - verify=self.crt) - self._dprintf(DBG_TMPO_REQUEST, time.time(), sid, rid, lvl, bid) - return f - - def _write_block(self, r, sid, rid, lvl, bid, ext): - blk = sqlite3.Binary(r.content) - now = time.time() - self.dbcur.execute(SQL_TMPO_INS, (sid, rid, lvl, bid, ext, now, blk)) - self._clean(sid, rid, lvl, bid) - self._dprintf(DBG_TMPO_WRITE, now, sid, rid, lvl, bid, len(blk)) - - def _clean(self, sid, rid, lvl, bid): - if lvl == 8: - return - lastchild = self._lastchild(lvl, bid) - self.dbcur.execute(SQL_TMPO_CLEAN, (sid, rid, lvl - 4, lastchild)) - self._clean(sid, rid, lvl - 4, lastchild) - - def _lastchild(self, lvl, bid): - delta = math.trunc(2 ** (lvl - 4)) - return bid + 15 * delta - - def _blocktail(self, lvl, bid): - delta = math.trunc(2 ** lvl) - return bid + delta - - def _dprintf(self, fmt, *args): - if self.debug: - print(fmt % args) +from .sqlitesession import SQLiteSession as Session +from .sqlitesession import SQLiteSession +from .apisession import APISession diff --git a/tmpo/apisession.py b/tmpo/apisession.py new file mode 100644 index 0000000..7c20a53 --- /dev/null +++ b/tmpo/apisession.py @@ -0,0 +1,425 @@ +import os +import requests_futures.sessions +import concurrent.futures +import time +import math +import pandas as pd +import re +import zlib +import json +import numpy as np + +HTTP_ACCEPT = { + "json": "application/json", + "gz": "application/gzip"} + +API_TMPO_SYNC = "https://%s/sensor/%s/tmpo/sync" +API_TMPO_BLOCK = "https://%s/sensor/%s/tmpo/%d/%d/%d" + +DBG_TMPO_REQUEST = "[r] time:%.3f sid:%s rid:%d lvl:%2d bid:%d" +RE_JSON_BLK = r'^\{"h":(?P<h>\{.+?\}),"t":(?P<t>\[.+?\]),"v":(?P<v>\[.+?\])\}$' +EPOCHS_MAX = 2147483647 + + +class APISession: + """ + Get Flukso Data (a.k.a.
TMPO blocks) directly from the Flukso API + """ + def __init__(self, workers=16, cert=None): + """ + Parameters + ---------- + workers : int + default 16 + cert : bool, optional + pass False to disable TLS verification + default None uses the bundled Flukso certificate + """ + self.debug = False + + package_dir = os.path.dirname(__file__) + if cert is not False: + self.crt = os.path.join(package_dir, ".flukso.crt") + else: + self.crt = cert + self.host = "api.flukso.net" + + self.rqs = requests_futures.sessions.FuturesSession( + executor=concurrent.futures.ThreadPoolExecutor( + max_workers=workers)) + self.rqs.headers.update({"X-Version": "1.0"}) + + self.sensors = {} + + def add(self, sid, token): + """ + Add a sensor and token to the client, so you don't have to enter the + token for each call (also for compatibility with sqlitesession) + + Parameters + ---------- + sid : str + token : str + """ + self.sensors.update({sid: token}) + + def series(self, sid, recycle_id=None, head=None, tail=None, + datetime=True, token=None): + """ + Create data Series + + Parameters + ---------- + sid : str + token : str, optional + recycle_id : optional + head : int | pandas.Timestamp, optional + Start of the interval + default earliest available + tail : int | pandas.Timestamp, optional + End of the interval + default max epoch + datetime : bool + convert index to datetime + default True + + Returns + ------- + pd.Series + """ + token = token if token else self.sensors[sid] + + if head is None: + head = 0 + else: + head = self._2epochs(head) + + if tail is None: + tail = EPOCHS_MAX + else: + tail = self._2epochs(tail) + + blist = self._req_blocklist(sid=sid, token=token, rid=recycle_id) + blist = self._slice_blist(blist=blist, head=head, tail=tail) + if len(blist) == 0: + return pd.Series([], name=sid) + blocks_futures = self._req_blocks(sid=sid, token=token, blist=blist) + + def parse_blocks(): + for h, future in blocks_futures: + r = future.result() + r.raise_for_status() + _ts = self._blk2series(ext=h['ext'], blk=r.content, + head=head, tail=tail) + yield _ts + + blocks = parse_blocks() + ts = pd.concat(blocks) + if ts.empty: + return pd.Series([], name=sid) + ts.name = sid + if datetime is True: + ts.index = pd.to_datetime(ts.index, unit='s', utc=True) + return ts + + def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): + """ + Create data frame + + Parameters + ---------- + sids : [str] + List of SensorID's + head : int | pandas.Timestamp, optional + Start of the interval + default earliest available + tail : int | pandas.Timestamp, optional + End of the interval + default max epoch + datetime : bool + convert index to datetime + default True + + Returns + ------- + pd.DataFrame + """ + series = (self.series(sid, head=head, tail=tail, datetime=False) + for sid in sids) + df = pd.concat(series, axis=1) + if datetime is True: + df.index = pd.to_datetime(df.index, unit="s", utc=True) + return df + + def first_timestamp(self, sid, token=None, epoch=False): + """ + Get the first available timestamp for a sensor + + Parameters + ---------- + sid : str + SensorID + token : str, optional + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int + """ + token = token if token else self.sensors[sid] + + blist = self._req_blocklist(sid=sid, token=token) + if len(blist) == 0: + return None + first = blist[0]['bid'] + if not epoch: + first = self._epoch2timestamp(first) + return first + + def last_timestamp(self, sid, token=None, epoch=False): + """ + Get the last timestamp for a sensor + + Parameters + ---------- + sid : str + SensorID + 
token : str, optional + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int + """ + last, _v = self.last_datapoint(sid=sid, token=token, epoch=epoch) + return last + + def last_datapoint(self, sid, token=None, epoch=False): + """ + Parameters + ---------- + sid : str + SensorId + token : str, optional + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + (pd.Timestamp, float) or (int, float) + """ + block = self._last_block(sid=sid, token=token) + if block is None: + return None, None + + header = block['h'] + timestamp, value = header['tail'] + + if not epoch: + timestamp = self._epoch2timestamp(timestamp) + + return timestamp, value + + def _last_block(self, sid, token=None): + token = token if token else self.sensors[sid] + + blist = self._req_blocklist(sid=sid, token=token) + if len(blist) == 0: + return None + last = blist[-1] + bf = self._req_block(sid=sid, token=token, rid=last['rid'], + lvl=last['lvl'], bid=last['bid'], ext=last['ext']) + r = bf.result() + r.raise_for_status() + + jblk = self._decompress_block(blk=r.content, ext=last['ext']) + data = json.loads(jblk.decode('UTF-8')) + return data + + def _epoch2timestamp(self, epoch): + timestamp = pd.Timestamp.utcfromtimestamp(epoch) + timestamp = timestamp.tz_localize('UTC') + return timestamp + + def _2epochs(self, time): + if isinstance(time, pd.Timestamp): + return int(math.floor(time.value / 1e9)) + elif isinstance(time, int): + return time + else: + raise NotImplementedError("Time format not supported. " + + "Use epochs or a Pandas timestamp.") + + def _blk2series(self, ext, blk, head, tail): + jblk = self._decompress_block(blk, ext) + m = re.match(RE_JSON_BLK, jblk.decode("utf-8")) + if m is None: + return pd.Series() + pdjblk = '{"index":%s,"data":%s}' % (m.group("t"), m.group("v")) + try: + pdsblk = pd.read_json( + pdjblk, + typ="series", + dtype="float", + orient="split", + date_unit="s") + except ValueError: + return pd.Series() + h = json.loads(m.group("h")) + self._npdelta(pdsblk.index, h["head"][0]) + self._npdelta(pdsblk, h["head"][1]) + pdsblk_truncated = pdsblk + try: + pdsblk_truncated = pdsblk_truncated.loc[head:] + except TypeError: + pass + try: + pdsblk_truncated = pdsblk_truncated.loc[:tail] + except TypeError: + pass + return pdsblk_truncated + + def _decompress_block(self, blk, ext): + if ext != "gz": + raise NotImplementedError("Compression type not supported in tmpo") + jblk = zlib.decompress(blk, zlib.MAX_WBITS | 16) # gzip decoding + return jblk + + def _req_blocklist(self, sid, token, rid=0, lvl=0, bid=0): + """ + Request the list of available blocks from the API + + Parameters + ---------- + sid : str + token : str + rid : int + lvl : int + bid : int + + Returns + ------- + [dict] + """ + headers = { + "Accept": HTTP_ACCEPT["json"], + "X-Token": token} + params = { + "rid": rid if rid else 0, + "lvl": lvl if rid else 0, + "bid": bid if rid else 0} + f = self.rqs.get( + API_TMPO_SYNC % (self.host, sid), + headers=headers, + params=params, + verify=self.crt + ) + r = f.result() + r.raise_for_status() + j = r.json() + blist = sorted(j, key=lambda x: x['bid']) + return blist + + def _req_block(self, sid, token, rid, lvl, bid, ext): + """ + Request a block (as a request future) from the API + + Parameters + ---------- + sid : str + token : str + rid : int + lvl : int + bid : int + ext : str + + Returns + ------- + concurrent.futures.Future + """ + headers = { + "Accept": HTTP_ACCEPT["gz"], + "X-Token": token}
+ f = self.rqs.get( + API_TMPO_BLOCK % (self.host, sid, rid, lvl, bid), + headers=headers, + verify=self.crt) + self._dprintf(DBG_TMPO_REQUEST, time.time(), sid, rid, lvl, bid) + return f + + def _req_blocks(self, sid, token, blist): + """ + Request multiple blocks (as a list of request futures) + + Parameters + ---------- + sid : str + token : str + blist : [dict] + [ + {'rid': 0, 'lvl': 20, 'bid': 1506803712, 'ext': 'gz'}, + ... + ] + + Returns + ------- + [(dict, concurrent.futures.Future)] + """ + fs = [] + for t in blist: + fs.append( + (t, self._req_block(sid=sid, token=token, rid=t['rid'], + lvl=t['lvl'], bid=t['bid'], ext=t['ext']))) + return fs + + @staticmethod + def _npdelta(a, delta): + """Numpy: Modifying Array Values + http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html""" + for x in np.nditer(a, op_flags=["readwrite"]): + delta += x + x[...] = delta + return a + + def _lastchild(self, lvl, bid): + delta = math.trunc(2 ** (lvl - 4)) + return bid + 15 * delta + + def _blocktail(self, lvl, bid): + delta = math.trunc(2 ** lvl) + return bid + delta + + def _dprintf(self, fmt, *args): + if self.debug: + print(fmt % args) + + def _slice_blist(self, blist, head, tail): + """ + Slice a blist (block headers) so it contains all blocks with data + between head and tail + + Parameters + ---------- + blist : [dict] + [ + {'rid': 0, 'lvl': 20, 'bid': 1506803712, 'ext': 'gz'}, + ... + ] + head : int + epochs + tail : int + epochs + + Returns + ------- + [dict] + """ + ret = [] + for b in blist: + if head < self._blocktail(lvl=b['lvl'], bid=b['bid']) and tail >= b['bid']: + ret.append(b) + return ret diff --git a/tmpo/sqlitesession.py b/tmpo/sqlitesession.py new file mode 100644 index 0000000..f9006fc --- /dev/null +++ b/tmpo/sqlitesession.py @@ -0,0 +1,372 @@ +SQL_SENSOR_TABLE = """ + CREATE TABLE IF NOT EXISTS sensor( + sid TEXT, + token TEXT, + PRIMARY KEY(sid))""" + +SQL_SENSOR_INS = """ + INSERT INTO sensor + (sid, token) + VALUES (?, ?)""" + +SQL_SENSOR_DEL = """ + DELETE FROM sensor + WHERE sid = ?""" + +SQL_TMPO_DEL = """ + DELETE FROM tmpo + WHERE sid = ?""" + +SQL_SENSOR_ALL = """ + SELECT sid + FROM sensor + ORDER BY sid""" + +SQL_SENSOR_TOKEN = """ + SELECT token + FROM sensor + WHERE sid = ?""" + +SQL_TMPO_TABLE = """ + CREATE TABLE IF NOT EXISTS tmpo( + sid TEXT, + rid INTEGER, + lvl INTEGER, + bid INTEGER, + ext TEXT, + created REAL, + data BLOB, + PRIMARY KEY(sid, rid, lvl, bid))""" + +SQL_TMPO_INS = """ + INSERT INTO tmpo + (sid, rid, lvl, bid, ext, created, data) + VALUES (?, ?, ?, ?, ?, ?, ?)""" + +SQL_TMPO_CLEAN = """ + DELETE + FROM tmpo + WHERE sid = ? AND rid = ? AND lvl = ? AND bid <= ?""" + +SQL_TMPO_ALL = """ + SELECT sid, rid, lvl, bid, ext, created, data + FROM tmpo + WHERE sid = ? + ORDER BY rid ASC, lvl DESC, bid ASC""" + +SQL_TMPO_LAST = """ + SELECT rid, lvl, bid, ext + FROM tmpo + WHERE sid = ? + ORDER BY created DESC, lvl DESC + LIMIT 1""" + +SQL_TMPO_LAST_DATA = """ + SELECT rid, lvl, bid, ext, data + FROM tmpo + WHERE sid = ? + ORDER BY created DESC, lvl DESC + LIMIT 1""" + +SQL_TMPO_FIRST = """ + SELECT rid, lvl, bid + FROM tmpo + WHERE sid = ?
+ ORDER BY created ASC, lvl ASC + LIMIT 1""" + +SQL_TMPO_RID_MAX = """ + SELECT MAX(rid) + FROM tmpo + WHERE sid = ?""" + +DBG_TMPO_WRITE = "[w] time:%.3f sid:%s rid:%d lvl:%2d bid:%d size[B]:%d" + + +import os +import sys +import time +import sqlite3 +import json +import pandas as pd +from functools import wraps +from .apisession import APISession, EPOCHS_MAX + + +def dbcon(func): + """Set up connection before executing function, commit and close connection + afterwards, unless a connection has already been created.""" + @wraps(func) + def wrapper(*args, **kwargs): + self = args[0] + if self.dbcon is None: + # set up connection + self.dbcon = sqlite3.connect(self.db) + self.dbcur = self.dbcon.cursor() + self.dbcur.execute(SQL_SENSOR_TABLE) + self.dbcur.execute(SQL_TMPO_TABLE) + + # execute function + try: + result = func(*args, **kwargs) + except Exception as e: + # on exception, first close connection and then raise + self.dbcon.rollback() + self.dbcon.commit() + self.dbcon.close() + self.dbcon = None + self.dbcur = None + raise e + else: + # commit everything and close connection + self.dbcon.commit() + self.dbcon.close() + self.dbcon = None + self.dbcur = None + else: + result = func(*args, **kwargs) + return result + return wrapper + + +class SQLiteSession(APISession): + def __init__(self, path=None, workers=16, cert=None): + """ + Parameters + ---------- + path : str, optional + location for the sqlite database + workers : int + default 16 + cert : bool, optional + forwarded to APISession + pass False to disable TLS verification + default None uses the bundled Flukso certificate + """ + super(SQLiteSession, self).__init__(workers=workers, cert=cert) + + if path is None: + path = os.path.expanduser("~") # $HOME + if sys.platform == "win32": + self.home = os.path.join(path, "tmpo") + else: + self.home = os.path.join(path, ".tmpo") + self.db = os.path.join(self.home, "tmpo.sqlite3") + + try: + os.mkdir(self.home) + except OSError: # dir exists + pass + + self.dbcon = None + self.dbcur = None + + @dbcon + def add(self, sid, token): + """ + Add new sensor to the database + + Parameters + ---------- + sid : str + SensorId + token : str + """ + try: + self.dbcur.execute(SQL_SENSOR_INS, (sid, token)) + except sqlite3.IntegrityError: # sensor entry exists + pass + + @dbcon + def remove(self, sid): + """ + Remove sensor from the database + + Parameters + ---------- + sid : str + SensorID + """ + self.dbcur.execute(SQL_SENSOR_DEL, (sid,)) + self.dbcur.execute(SQL_TMPO_DEL, (sid,)) + + @dbcon + def reset(self, sid): + """ + Removes all tmpo blocks for a given sensor, but keeps sensor table + intact, so sensor id and token remain in the database.
+ + Parameters + ---------- + sid : str + """ + self.dbcur.execute(SQL_TMPO_DEL, (sid,)) + + @dbcon + def sync(self, *sids): + """ + Synchronise data + + Parameters + ---------- + sids : list of str + SensorIDs to sync + Optional, leave empty to sync everything + """ + if sids == (): + sids = [sid for (sid,) in self.dbcur.execute(SQL_SENSOR_ALL)] + for sid in sids: + self.dbcur.execute(SQL_TMPO_LAST, (sid,)) + last = self.dbcur.fetchone() + if last: + rid, lvl, bid, ext = last + self._clean(sid, rid, lvl, bid) + # prevent needless polling + if time.time() < bid + 256: + return + else: + rid, lvl, bid = 0, 0, 0 + self._req_sync(sid, rid, lvl, bid) + + @dbcon + def list(self, *sids): + """ + List all tmpo-blocks in the database + + Parameters + ---------- + sids : list of str + SensorID's for which to list blocks + Optional, leave empty to get them all + + Returns + ------- + list[list[tuple]] + """ + if sids == (): + sids = [sid for (sid,) in self.dbcur.execute(SQL_SENSOR_ALL)] + slist = [] + for sid in sids: + tlist = [] + for tmpo in self.dbcur.execute(SQL_TMPO_ALL, (sid,)): + tlist.append(tmpo) + sid, rid, lvl, bid, ext, ctd, blk = tmpo + self._dprintf(DBG_TMPO_WRITE, ctd, sid, rid, lvl, bid, len(blk)) + slist.append(tlist) + return slist + + @dbcon + def series(self, sid, recycle_id=None, head=None, tail=None, + datetime=True, **kwargs): + """ + Create data Series + + Parameters + ---------- + sid : str + recycle_id : optional + head : int | pandas.Timestamp, optional + Start of the interval + default earliest available + tail : int | pandas.Timestamp, optional + End of the interval + default max epoch + datetime : bool + convert index to datetime + default True + + Returns + ------- + pandas.Series + """ + if head is None: + head = 0 + else: + head = self._2epochs(head) + + if tail is None: + tail = EPOCHS_MAX + else: + tail = self._2epochs(tail) + + if recycle_id is None: + self.dbcur.execute(SQL_TMPO_RID_MAX, (sid,)) + recycle_id = self.dbcur.fetchone()[0] + tlist = self.list(sid)[0] + srlist = [] + for _sid, rid, lvl, bid, ext, ctd, blk in tlist: + if (recycle_id == rid + and head < self._blocktail(lvl, bid) + and tail >= bid): + srlist.append(self._blk2series(ext, blk, head, tail)) + if len(srlist) > 0: + ts = pd.concat(srlist) + ts.name = sid + if datetime is True: + ts.index = pd.to_datetime(ts.index, unit="s", utc=True) + return ts + else: + return pd.Series([], name=sid) + + @dbcon + def first_timestamp(self, sid, epoch=False, **kwargs): + """ + Get the first available timestamp for a sensor + + Parameters + ---------- + sid : str + SensorID + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int + """ + first_block = self.dbcur.execute(SQL_TMPO_FIRST, (sid,)).fetchone() + if first_block is None: + return None + + timestamp = first_block[2] + if not epoch: + timestamp = self._epoch2timestamp(timestamp) + return timestamp + + @dbcon + def _last_block(self, sid, **kwargs): + cur = self.dbcur.execute(SQL_TMPO_LAST_DATA, (sid,)) + row = cur.fetchone() + if row is None: + return None + + rid, lvl, bid, ext, blk = row + + jblk = self._decompress_block(blk, ext) + data = json.loads(jblk.decode('UTF-8')) + return data + + def _req_sync(self, sid, rid, lvl, bid): + self.dbcur.execute(SQL_SENSOR_TOKEN, (sid,)) + token, = self.dbcur.fetchone() + blist = self._req_blocklist(sid=sid, token=token, rid=rid, lvl=lvl, + bid=bid) + fs = self._req_blocks(sid=sid, token=token, blist=blist) + for (t, f) in fs: + 
self._write_block( + f.result(), sid, t["rid"], t["lvl"], t["bid"], t["ext"]) + + def _write_block(self, r, sid, rid, lvl, bid, ext): + blk = sqlite3.Binary(r.content) + now = time.time() + try: + self.dbcur.execute(SQL_TMPO_INS, (sid, rid, lvl, bid, ext, now, blk)) + except sqlite3.IntegrityError: + pass + self._clean(sid, rid, lvl, bid) + self._dprintf(DBG_TMPO_WRITE, now, sid, rid, lvl, bid, len(blk)) + + def _clean(self, sid, rid, lvl, bid): + if lvl == 8: + return + lastchild = self._lastchild(lvl, bid) + self.dbcur.execute(SQL_TMPO_CLEAN, (sid, rid, lvl - 4, lastchild)) + self._clean(sid, rid, lvl - 4, lastchild)
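
Note on the retention scheme behind `_clean`: a tmpo block at level `lvl` starts at epoch `bid` and covers `2 ** lvl` seconds, and each parent block merges the 16 children four levels below it. A minimal sketch of that arithmetic, derived from `_lastchild` and `_blocktail` in the patch (illustration only, not part of the diff):

# Block-arithmetic sketch; the semantics are inferred from the code above.
def blocktail(lvl, bid):
    return bid + 2 ** lvl  # first epoch after the block's data

def lastchild(lvl, bid):
    return bid + 15 * 2 ** (lvl - 4)  # bid of the 16th child, four levels down

# A lvl-20 block starting at bid=1506803712 (a multiple of 2**20) spans
# 2**20 s (~12 days); once it is stored, _clean deletes the lvl-16 children
# with bid <= lastchild(20, bid), then recurses down the hierarchy:
assert blocktail(20, 1506803712) == 1506803712 + 2 ** 20
assert lastchild(20, 1506803712) == 1506803712 + 15 * 2 ** 16

The recursion stops at level 8, which the code treats as the lowest block level.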
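For reviewers, a quick usage sketch of the two session types introduced by this patch (the sensor id and token below are placeholders, and the snippet assumes the package is installed with its bundled certificate):

# APISession fetches blocks straight from the Flukso API; nothing is cached.
from tmpo import APISession, Session  # Session is an alias for SQLiteSession

api = APISession()
api.add("0123456789abcdef0123456789abcdef", "mytoken")  # placeholder credentials
ts = api.series("0123456789abcdef0123456789abcdef")     # returns a pandas.Series

# SQLiteSession (the old Session) keeps the pre-patch workflow: blocks are
# synced into ~/.tmpo/tmpo.sqlite3 and reads are served from that local cache.
db = Session()
db.add("0123456789abcdef0123456789abcdef", "mytoken")
db.sync()
df = db.dataframe(["0123456789abcdef0123456789abcdef"])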