From e12a32ab6430f1a8c14971ef30da874c530a3e83 Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Mon, 18 Dec 2017 21:51:37 +0100 Subject: [PATCH 01/23] merge master (#9) * typo in tz_localize * tmpo delete query * reset method * version bump * Split `SQL_TMPO_LAST` into `SQL_TMPO_LAST` (which now doesn't include data) and `SQL_TMPO_LAST_DATA` (which does include data). Sync uses the one without data to determine from where to start. * pd.tslib.Timestamp will be deprecated, use pd.Timestamp instead * `ix` is deprecated, changed to `loc` * upgrade of notebook * version bump * also update docstrings for Timestamp depreciation --- tmpo.ipynb | 232 ++++++++++++++++++++++++++++------------------- tmpo/__init__.py | 45 ++++++--- 2 files changed, 174 insertions(+), 103 deletions(-) diff --git a/tmpo.ipynb b/tmpo.ipynb index a85d2bd..8855169 100644 --- a/tmpo.ipynb +++ b/tmpo.ipynb @@ -1,95 +1,143 @@ { - "metadata": { - "name": "", - "signature": "sha256:ccf6e7833d8f67f7ad30cf0ce0c9299967deb314eaada60a808b4e7b06b7118b" - }, - "nbformat": 3, - "nbformat_minor": 0, - "worksheets": [ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "%matplotlib inline" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "token = \"b371402dc767cc83e41bc294b63f9586\"\n", + "house = {\n", + " \"cellar\": {\n", + " \"electricity\": \"fed676021dacaaf6a12a8dda7685be34\"\n", + " }\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "import os\n", + "import tmpo\n", + "\n", + "s = tmpo.Session(path=os.getcwd())\n", + "for room in house:\n", + " for sensor in house[room]:\n", + " s.add(house[room][sensor], token)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "s.debug = True" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "s.sync()" + ] + }, { - "cells": [ - { - "cell_type": "code", - "collapsed": false, - "input": [ - "%matplotlib inline" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "token = \"b371402dc767cc83e41bc294b63f9586\"\n", - "house = {\n", - " \"cellar\": {\n", - " \"electricity\": \"fed676021dacaaf6a12a8dda7685be34\"\n", - " }\n", - "}" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "import os\n", - "import tmpo\n", - "\n", - "s = tmpo.Session(path=os.getcwd())\n", - "for room in house:\n", - " for sensor in house[room]:\n", - " s.add(house[room][sensor], token)" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "s.debug = True" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "s.sync()" - ], - "language": "python", - "metadata": {}, - "outputs": [] - }, - { - "cell_type": "code", - "collapsed": false, - "input": [ - "import matplotlib.pyplot as plt\n", - "\n", - "room = \"cellar\"\n", - "plt.figure(figsize=(16,10))\n", - "ax1=plt.subplot()\n", - "ax1.grid()\n", - "ax1.set_ylabel(u'Wh')\n", - "df = 
s.dataframe([house[room][\"electricity\"]])\n", - "plt.title(room + \" electricity\")\n", - "plt.plot_date(df.index, df, fmt=\"r-\")" - ], - "language": "python", - "metadata": {}, - "outputs": [] - } - ], - "metadata": {} + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "import pandas as pd\n", + "start = pd.Timestamp('20160101')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "df = s.dataframe([house[room][\"electricity\"]], head=start)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false, + "scrolled": false + }, + "outputs": [], + "source": [ + "import matplotlib.pyplot as plt\n", + "\n", + "room = \"cellar\"\n", + "plt.figure(figsize=(16,10))\n", + "ax1=plt.subplot()\n", + "ax1.grid()\n", + "ax1.set_ylabel(u'Wh')\n", + "plt.title(room + \" electricity\")\n", + "df.plot(ax=ax1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] } - ] -} \ No newline at end of file + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.5.0" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/tmpo/__init__.py b/tmpo/__init__.py index 9acec67..ab1e46c 100644 --- a/tmpo/__init__.py +++ b/tmpo/__init__.py @@ -1,5 +1,5 @@ __title__ = "tmpo" -__version__ = "0.2.8" +__version__ = "0.2.10" __build__ = 0x000100 __author__ = "Bart Van Der Meerssche" __license__ = "MIT" @@ -43,6 +43,10 @@ DELETE FROM sensor WHERE sid = ?""" +SQL_TMPO_DEL = """ + DELETE FROM tmpo + WHERE sid = ?""" + SQL_SENSOR_ALL = """ SELECT sid FROM sensor @@ -81,6 +85,13 @@ ORDER BY rid ASC, lvl DESC, bid ASC""" SQL_TMPO_LAST = """ + SELECT rid, lvl, bid, ext + FROM tmpo + WHERE sid = ? + ORDER BY created DESC, lvl DESC + LIMIT 1""" + +SQL_TMPO_LAST_DATA = """ SELECT rid, lvl, bid, ext, data FROM tmpo WHERE sid = ? @@ -225,6 +236,19 @@ def remove(self, sid): SensorID """ self.dbcur.execute(SQL_SENSOR_DEL, (sid,)) + self.dbcur.execute(SQL_TMPO_DEL, (sid,)) + + @dbcon + def reset(self, sid): + """ + Removes all tmpo blocks for a given sensor, but keeps sensor table + intact, so sensor id and token remain in the database. 
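+        A subsequent sync() will re-download all blocks for this sensor.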
+ + Parameters + ---------- + sid : str + """ + self.dbcur.execute(SQL_TMPO_DEL, (sid,)) @dbcon def sync(self, *sids): @@ -243,7 +267,7 @@ def sync(self, *sids): self.dbcur.execute(SQL_TMPO_LAST, (sid,)) last = self.dbcur.fetchone() if last: - rid, lvl, bid, ext, blk = last + rid, lvl, bid, ext = last self._clean(sid, rid, lvl, bid) # prevent needless polling if time.time() < bid + 256: @@ -289,10 +313,10 @@ def series(self, sid, recycle_id=None, head=None, tail=None, ---------- sid : str recycle_id : optional - head : int | pandas.tslib.Timestamp, optional + head : int | pandas.Timestamp, optional Start of the interval default earliest available - tail : int | pandas.tslib.Timestamp, optional + tail : int | pandas.Timestamp, optional End of the interval default max epoch datetime : bool @@ -340,10 +364,10 @@ def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): Parameters ---------- sids : list[str] - head : int | pandas.tslib.Timestamp, optional + head : int | pandas.Timestamp, optional Start of the interval default earliest available - tail : int | pandas.tslib.Timestamp, optional + tail : int | pandas.Timestamp, optional End of the interval default max epoch datetime : bool @@ -396,7 +420,7 @@ def first_timestamp(self, sid, epoch=False): timestamp = first_block[2] if not epoch: timestamp = pd.Timestamp.utcfromtimestamp(timestamp) - timestamp = timestamp.tz.localize('UTC') + timestamp = timestamp.tz_localize('UTC') return timestamp def last_timestamp(self, sid, epoch=False): @@ -449,7 +473,7 @@ def last_datapoint(self, sid, epoch=False): @dbcon def _last_block(self, sid): - cur = self.dbcur.execute(SQL_TMPO_LAST, (sid,)) + cur = self.dbcur.execute(SQL_TMPO_LAST_DATA, (sid,)) row = cur.fetchone() if row is None: return None @@ -467,7 +491,7 @@ def _decompress_block(self, blk, ext): return jblk def _2epochs(self, time): - if isinstance(time, pd.tslib.Timestamp): + if isinstance(time, pd.Timestamp): return int(math.floor(time.value / 1e9)) elif isinstance(time, int): return time @@ -492,8 +516,7 @@ def _blk2series(self, ext, blk, head, tail): h = json.loads(m.group("h")) self._npdelta(pdsblk.index, h["head"][0]) self._npdelta(pdsblk, h["head"][1]) - # Use the built-in ix method to truncate - pdsblk_truncated = pdsblk.ix[head:tail] + pdsblk_truncated = pdsblk.loc[head:tail] return pdsblk_truncated def _npdelta(self, a, delta): From c5a26eb7249ca00da23b2297683e07a11a6aa482 Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Thu, 14 Jun 2018 11:49:18 +0200 Subject: [PATCH 02/23] put cert as file in package --- tmpo/.flukso.crt | 22 ++++++++++++++++++++++ tmpo/__init__.py | 31 +++---------------------------- 2 files changed, 25 insertions(+), 28 deletions(-) create mode 100644 tmpo/.flukso.crt diff --git a/tmpo/.flukso.crt b/tmpo/.flukso.crt new file mode 100644 index 0000000..b3c65ab --- /dev/null +++ b/tmpo/.flukso.crt @@ -0,0 +1,22 @@ + +-----BEGIN CERTIFICATE----- +MIIDfzCCAmegAwIBAgIJANYOkpI6yVcFMA0GCSqGSIb3DQEBBQUAMDMxCzAJBgNV +BAYTAkJFMQ8wDQYDVQQKEwZGbHVrc28xEzARBgNVBAMTCmZsdWtzby5uZXQwHhcN +MTAwNjAxMjE1ODAyWhcNMzUwNTI2MjE1ODAyWjAzMQswCQYDVQQGEwJCRTEPMA0G +A1UEChMGRmx1a3NvMRMwEQYDVQQDEwpmbHVrc28ubmV0MIIBIjANBgkqhkiG9w0B +AQEFAAOCAQ8AMIIBCgKCAQEA6CtNI3YrF/7Ak3etIe+XnL4HwJYki4PyaWI4S7W1 +49C9W5AEbEd7ufnsaku3eVxMqOP6b5L7MFpCCGDiM1Zt32yYAcL65eCrofZw1DE0 +SuWos0Z1P4y2rIUFHya8g8bUh7lUvq30IBgnnUh7Lo0eQT1XfnC/KMUnvseHI/iw +Y3HhYX+espsCPh1a0ATLlEk93XK99q/5mgojSGQxmwPj/91mOWmJOO4edEQAhK+u +t6wCNxZNnf9yyyzzLczwMytfrwBWJEJjJFTfr3JiEmHdl4dt7UiuElGLMr9dFhPV 
+12Bidxszov663ffUiIUmV/fkMWF1ZEWXFS0x+VJ52seChwIDAQABo4GVMIGSMB0G +A1UdDgQWBBQGMvERFrapN1lmOm9SVR8qB+uj/zBjBgNVHSMEXDBagBQGMvERFrap +N1lmOm9SVR8qB+uj/6E3pDUwMzELMAkGA1UEBhMCQkUxDzANBgNVBAoTBkZsdWtz +bzETMBEGA1UEAxMKZmx1a3NvLm5ldIIJANYOkpI6yVcFMAwGA1UdEwQFMAMBAf8w +DQYJKoZIhvcNAQEFBQADggEBAOZjgNoNhJLckVMEYZiYWqRDWeRPBkyGStCH93r3 +42PpuKDyysxI1ldLTcUpUSrs1AtdSIEiEahWr6zVW4QW4o9iqO905E03aTO86L+P +j7SIBPP01M2f70pHpnz+uH1MDxsarI96qllslWfymYI7c6yUN/VciWfNWa38nK1l +MiQJuDvElNy8aN1JJtXHFUQK/I8ois1ATT1rGAiqrkDZIm4pdDmqB/zLI3qIJf8o +cKIo2x/YkVhuDmIpU/XVA13csXrXU+CLfFyNdY1a/6Dhv2B4wG6J5RGuxWmA+Igg +TTysD+aqqzs8XstqDu/aLjMzFKMaXNvDoCbdFQGVXfx0F1A= +-----END CERTIFICATE----- \ No newline at end of file diff --git a/tmpo/__init__.py b/tmpo/__init__.py index ab1e46c..ebbbc19 100644 --- a/tmpo/__init__.py +++ b/tmpo/__init__.py @@ -5,29 +5,6 @@ __license__ = "MIT" __copyright__ = "Copyright 2017 Bart Van Der Meerssche" -FLUKSO_CRT = """ ------BEGIN CERTIFICATE----- -MIIDfzCCAmegAwIBAgIJANYOkpI6yVcFMA0GCSqGSIb3DQEBBQUAMDMxCzAJBgNV -BAYTAkJFMQ8wDQYDVQQKEwZGbHVrc28xEzARBgNVBAMTCmZsdWtzby5uZXQwHhcN -MTAwNjAxMjE1ODAyWhcNMzUwNTI2MjE1ODAyWjAzMQswCQYDVQQGEwJCRTEPMA0G -A1UEChMGRmx1a3NvMRMwEQYDVQQDEwpmbHVrc28ubmV0MIIBIjANBgkqhkiG9w0B -AQEFAAOCAQ8AMIIBCgKCAQEA6CtNI3YrF/7Ak3etIe+XnL4HwJYki4PyaWI4S7W1 -49C9W5AEbEd7ufnsaku3eVxMqOP6b5L7MFpCCGDiM1Zt32yYAcL65eCrofZw1DE0 -SuWos0Z1P4y2rIUFHya8g8bUh7lUvq30IBgnnUh7Lo0eQT1XfnC/KMUnvseHI/iw -Y3HhYX+espsCPh1a0ATLlEk93XK99q/5mgojSGQxmwPj/91mOWmJOO4edEQAhK+u -t6wCNxZNnf9yyyzzLczwMytfrwBWJEJjJFTfr3JiEmHdl4dt7UiuElGLMr9dFhPV -12Bidxszov663ffUiIUmV/fkMWF1ZEWXFS0x+VJ52seChwIDAQABo4GVMIGSMB0G -A1UdDgQWBBQGMvERFrapN1lmOm9SVR8qB+uj/zBjBgNVHSMEXDBagBQGMvERFrap -N1lmOm9SVR8qB+uj/6E3pDUwMzELMAkGA1UEBhMCQkUxDzANBgNVBAoTBkZsdWtz -bzETMBEGA1UEAxMKZmx1a3NvLm5ldIIJANYOkpI6yVcFMAwGA1UdEwQFMAMBAf8w -DQYJKoZIhvcNAQEFBQADggEBAOZjgNoNhJLckVMEYZiYWqRDWeRPBkyGStCH93r3 -42PpuKDyysxI1ldLTcUpUSrs1AtdSIEiEahWr6zVW4QW4o9iqO905E03aTO86L+P -j7SIBPP01M2f70pHpnz+uH1MDxsarI96qllslWfymYI7c6yUN/VciWfNWa38nK1l -MiQJuDvElNy8aN1JJtXHFUQK/I8ois1ATT1rGAiqrkDZIm4pdDmqB/zLI3qIJf8o -cKIo2x/YkVhuDmIpU/XVA13csXrXU+CLfFyNdY1a/6Dhv2B4wG6J5RGuxWmA+Igg -TTysD+aqqzs8XstqDu/aLjMzFKMaXNvDoCbdFQGVXfx0F1A= ------END CERTIFICATE-----""" - SQL_SENSOR_TABLE = """ CREATE TABLE IF NOT EXISTS sensor( sid TEXT, @@ -125,7 +102,6 @@ import os import sys -import io import math import time import sqlite3 @@ -193,15 +169,14 @@ def __init__(self, path=None, workers=16): else: self.home = os.path.join(path, ".tmpo") self.db = os.path.join(self.home, "tmpo.sqlite3") - self.crt = os.path.join(self.home, "flukso.crt") + + package_dir = os.path.dirname(__file__) + self.crt = os.path.join(package_dir, ".flukso.crt") self.host = "api.flukso.net" try: os.mkdir(self.home) except OSError: # dir exists pass - else: - with io.open(self.crt, "wb") as f: - f.write(FLUKSO_CRT.encode("ascii")) self.rqs = requests_futures.sessions.FuturesSession( executor=concurrent.futures.ThreadPoolExecutor( max_workers=workers)) From b885991c7faf05403ed41ed62a061a36825b6faf Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Thu, 14 Jun 2018 11:58:48 +0200 Subject: [PATCH 03/23] move code out of __init__.py --- tmpo/__init__.py | 559 +----------------------------------------- tmpo/sqlitesession.py | 558 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 559 insertions(+), 558 deletions(-) create mode 100644 tmpo/sqlitesession.py diff --git a/tmpo/__init__.py b/tmpo/__init__.py index ebbbc19..fca9c7f 100644 --- a/tmpo/__init__.py +++ b/tmpo/__init__.py @@ -5,561 +5,4 
@@ __license__ = "MIT" __copyright__ = "Copyright 2017 Bart Van Der Meerssche" -SQL_SENSOR_TABLE = """ - CREATE TABLE IF NOT EXISTS sensor( - sid TEXT, - token TEXT, - PRIMARY KEY(sid))""" - -SQL_SENSOR_INS = """ - INSERT INTO sensor - (sid, token) - VALUES (?, ?)""" - -SQL_SENSOR_DEL = """ - DELETE FROM sensor - WHERE sid = ?""" - -SQL_TMPO_DEL = """ - DELETE FROM tmpo - WHERE sid = ?""" - -SQL_SENSOR_ALL = """ - SELECT sid - FROM sensor - ORDER BY sid""" - -SQL_SENSOR_TOKEN = """ - SELECT token - FROM sensor - WHERE sid = ?""" - -SQL_TMPO_TABLE = """ - CREATE TABLE IF NOT EXISTS tmpo( - sid TEXT, - rid INTEGER, - lvl INTEGER, - bid INTEGER, - ext TEXT, - created REAL, - data BLOB, - PRIMARY KEY(sid, rid, lvl, bid))""" - -SQL_TMPO_INS = """ - INSERT INTO tmpo - (sid, rid, lvl, bid, ext, created, data) - VALUES (?, ?, ?, ?, ?, ?, ?)""" - -SQL_TMPO_CLEAN = """ - DELETE - FROM tmpo - WHERE sid = ? AND rid = ? AND lvl = ? AND bid <= ?""" - -SQL_TMPO_ALL = """ - SELECT sid, rid, lvl, bid, ext, created, data - FROM tmpo - WHERE sid = ? - ORDER BY rid ASC, lvl DESC, bid ASC""" - -SQL_TMPO_LAST = """ - SELECT rid, lvl, bid, ext - FROM tmpo - WHERE sid = ? - ORDER BY created DESC, lvl DESC - LIMIT 1""" - -SQL_TMPO_LAST_DATA = """ - SELECT rid, lvl, bid, ext, data - FROM tmpo - WHERE sid = ? - ORDER BY created DESC, lvl DESC - LIMIT 1""" - -SQL_TMPO_FIRST = """ - SELECT rid, lvl, bid - FROM tmpo - WHERE sid = ? - ORDER BY created ASC, lvl ASC - LIMIT 1""" - -SQL_TMPO_RID_MAX = """ - SELECT MAX(rid) - FROM tmpo - WHERE sid = ?""" - -API_TMPO_SYNC = "https://%s/sensor/%s/tmpo/sync" -API_TMPO_BLOCK = "https://%s/sensor/%s/tmpo/%d/%d/%d" - -HTTP_ACCEPT = { - "json": "application/json", - "gz": "application/gzip"} - -RE_JSON_BLK = r'^\{"h":(?P\{.+?\}),"t":(?P\[.+?\]),"v":(?P\[.+?\])\}$' -DBG_TMPO_REQUEST = "[r] time:%.3f sid:%s rid:%d lvl:%2d bid:%d" -DBG_TMPO_WRITE = "[w] time:%.3f sid:%s rid:%d lvl:%2d bid:%d size[B]:%d" -EPOCHS_MAX = 2147483647 - - -import os -import sys -import math -import time -import sqlite3 -import requests_futures.sessions -import concurrent.futures -import zlib -import re -import json -import numpy as np -import pandas as pd -from functools import wraps - - -def dbcon(func): - """Set up connection before executing function, commit and close connection - afterwards. 
Unless a connection already has been created.""" - @wraps(func) - def wrapper(*args, **kwargs): - self = args[0] - if self.dbcon is None: - # set up connection - self.dbcon = sqlite3.connect(self.db) - self.dbcur = self.dbcon.cursor() - self.dbcur.execute(SQL_SENSOR_TABLE) - self.dbcur.execute(SQL_TMPO_TABLE) - - # execute function - try: - result = func(*args, **kwargs) - except Exception as e: - # on exception, first close connection and then raise - self.dbcon.rollback() - self.dbcon.commit() - self.dbcon.close() - self.dbcon = None - self.dbcur = None - raise e - else: - # commit everything and close connection - self.dbcon.commit() - self.dbcon.close() - self.dbcon = None - self.dbcur = None - else: - result = func(*args, **kwargs) - return result - return wrapper - - -class Session(): - def __init__(self, path=None, workers=16): - """ - Parameters - ---------- - path : str, optional - location for the database - workers : int - default 16 - """ - self.debug = False - if path is None: - path = os.path.expanduser("~") # $HOME - if sys.platform == "win32": - self.home = os.path.join(path, "tmpo") - else: - self.home = os.path.join(path, ".tmpo") - self.db = os.path.join(self.home, "tmpo.sqlite3") - - package_dir = os.path.dirname(__file__) - self.crt = os.path.join(package_dir, ".flukso.crt") - self.host = "api.flukso.net" - try: - os.mkdir(self.home) - except OSError: # dir exists - pass - self.rqs = requests_futures.sessions.FuturesSession( - executor=concurrent.futures.ThreadPoolExecutor( - max_workers=workers)) - self.rqs.headers.update({"X-Version": "1.0"}) - self.dbcon = None - self.dbcur = None - - @dbcon - def add(self, sid, token): - """ - Add new sensor to the database - - Parameters - ---------- - sid : str - SensorId - token : str - """ - try: - self.dbcur.execute(SQL_SENSOR_INS, (sid, token)) - except sqlite3.IntegrityError: # sensor entry exists - pass - - @dbcon - def remove(self, sid): - """ - Remove sensor from the database - - Parameters - ---------- - sid : str - SensorID - """ - self.dbcur.execute(SQL_SENSOR_DEL, (sid,)) - self.dbcur.execute(SQL_TMPO_DEL, (sid,)) - - @dbcon - def reset(self, sid): - """ - Removes all tmpo blocks for a given sensor, but keeps sensor table - intact, so sensor id and token remain in the database. 
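-        A subsequent sync() will re-download all blocks for this sensor.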
- - Parameters - ---------- - sid : str - """ - self.dbcur.execute(SQL_TMPO_DEL, (sid,)) - - @dbcon - def sync(self, *sids): - """ - Synchronise data - - Parameters - ---------- - sids : list of str - SensorIDs to sync - Optional, leave empty to sync everything - """ - if sids == (): - sids = [sid for (sid,) in self.dbcur.execute(SQL_SENSOR_ALL)] - for sid in sids: - self.dbcur.execute(SQL_TMPO_LAST, (sid,)) - last = self.dbcur.fetchone() - if last: - rid, lvl, bid, ext = last - self._clean(sid, rid, lvl, bid) - # prevent needless polling - if time.time() < bid + 256: - return - else: - rid, lvl, bid = 0, 0, 0 - self._req_sync(sid, rid, lvl, bid) - - @dbcon - def list(self, *sids): - """ - List all tmpo-blocks in the database - - Parameters - ---------- - sids : list of str - SensorID's for which to list blocks - Optional, leave empty to get them all - - Returns - ------- - list[list[tuple]] - """ - if sids == (): - sids = [sid for (sid,) in self.dbcur.execute(SQL_SENSOR_ALL)] - slist = [] - for sid in sids: - tlist = [] - for tmpo in self.dbcur.execute(SQL_TMPO_ALL, (sid,)): - tlist.append(tmpo) - sid, rid, lvl, bid, ext, ctd, blk = tmpo - self._dprintf(DBG_TMPO_WRITE, ctd, sid, rid, lvl, bid, len(blk)) - slist.append(tlist) - return slist - - @dbcon - def series(self, sid, recycle_id=None, head=None, tail=None, - datetime=True): - """ - Create data Series - - Parameters - ---------- - sid : str - recycle_id : optional - head : int | pandas.Timestamp, optional - Start of the interval - default earliest available - tail : int | pandas.Timestamp, optional - End of the interval - default max epoch - datetime : bool - convert index to datetime - default True - - Returns - ------- - pandas.Series - """ - if head is None: - head = 0 - else: - head = self._2epochs(head) - - if tail is None: - tail = EPOCHS_MAX - else: - tail = self._2epochs(tail) - - if recycle_id is None: - self.dbcur.execute(SQL_TMPO_RID_MAX, (sid,)) - recycle_id = self.dbcur.fetchone()[0] - tlist = self.list(sid)[0] - srlist = [] - for _sid, rid, lvl, bid, ext, ctd, blk in tlist: - if (recycle_id == rid - and head < self._blocktail(lvl, bid) - and tail >= bid): - srlist.append(self._blk2series(ext, blk, head, tail)) - if len(srlist) > 0: - ts = pd.concat(srlist) - ts.name = sid - if datetime is True: - ts.index = pd.to_datetime(ts.index, unit="s", utc=True) - return ts - else: - return pd.Series([], name=sid) - - @dbcon - def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): - """ - Create data frame - - Parameters - ---------- - sids : list[str] - head : int | pandas.Timestamp, optional - Start of the interval - default earliest available - tail : int | pandas.Timestamp, optional - End of the interval - default max epoch - datetime : bool - convert index to datetime - default True - - Returns - ------- - pandas.DataFrame - """ - if head is None: - head = 0 - else: - head = self._2epochs(head) - - if tail is None: - tail = EPOCHS_MAX - else: - tail = self._2epochs(tail) - - series = [self.series(sid, head=head, tail=tail, datetime=False) - for sid in sids] - df = pd.concat(series, axis=1) - if datetime is True: - df.index = pd.to_datetime(df.index, unit="s", utc=True) - return df - - @dbcon - def first_timestamp(self, sid, epoch=False): - """ - Get the first available timestamp for a sensor - - Parameters - ---------- - sid : str - SensorID - epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp - - Returns - ------- - pd.Timestamp | int - """ - first_block = 
self.dbcur.execute(SQL_TMPO_FIRST, (sid,)).fetchone() - if first_block is None: - return None - - timestamp = first_block[2] - if not epoch: - timestamp = pd.Timestamp.utcfromtimestamp(timestamp) - timestamp = timestamp.tz_localize('UTC') - return timestamp - - def last_timestamp(self, sid, epoch=False): - """ - Get the theoretical last timestamp for a sensor - - Parameters - ---------- - sid : str - SensorID - epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp - - Returns - ------- - pd.Timestamp | int - """ - timestamp, value = self.last_datapoint(sid, epoch) - return timestamp - - def last_datapoint(self, sid, epoch=False): - """ - Parameters - ---------- - sid : str - SensorId - epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp - - Returns - ------- - pd.Timestamp | int, float - """ - block = self._last_block(sid) - if block is None: - return None, None - - header = block['h'] - timestamp, value = header['tail'] - - if not epoch: - timestamp = pd.Timestamp.utcfromtimestamp(timestamp) - timestamp = timestamp.tz_localize('UTC') - - return timestamp, value - - @dbcon - def _last_block(self, sid): - cur = self.dbcur.execute(SQL_TMPO_LAST_DATA, (sid,)) - row = cur.fetchone() - if row is None: - return None - - rid, lvl, bid, ext, blk = row - - jblk = self._decompress_block(blk, ext) - data = json.loads(jblk.decode('UTF-8')) - return data - - def _decompress_block(self, blk, ext): - if ext != "gz": - raise NotImplementedError("Compression type not supported in tmpo") - jblk = zlib.decompress(blk, zlib.MAX_WBITS | 16) # gzip decoding - return jblk - - def _2epochs(self, time): - if isinstance(time, pd.Timestamp): - return int(math.floor(time.value / 1e9)) - elif isinstance(time, int): - return time - else: - raise NotImplementedError("Time format not supported. " + - "Use epochs or a Pandas timestamp.") - - def _blk2series(self, ext, blk, head, tail): - jblk = self._decompress_block(blk, ext) - m = re.match(RE_JSON_BLK, jblk.decode("utf-8")) - pdjblk = '{"index":%s,"data":%s}' % (m.group("t"), m.group("v")) - try: - pdsblk = pd.read_json( - pdjblk, - typ="series", - dtype="float", - orient="split", - numpy=True, - date_unit="s") - except: - return pd.Series() - h = json.loads(m.group("h")) - self._npdelta(pdsblk.index, h["head"][0]) - self._npdelta(pdsblk, h["head"][1]) - pdsblk_truncated = pdsblk.loc[head:tail] - return pdsblk_truncated - - def _npdelta(self, a, delta): - """Numpy: Modifying Array Values - http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html""" - for x in np.nditer(a, op_flags=["readwrite"]): - delta += x - x[...] 
= delta - return a - - def _req_sync(self, sid, rid, lvl, bid): - self.dbcur.execute(SQL_SENSOR_TOKEN, (sid,)) - token, = self.dbcur.fetchone() - headers = { - "Accept": HTTP_ACCEPT["json"], - "X-Token": token} - params = { - "rid": rid, - "lvl": lvl, - "bid": bid} - f = self.rqs.get( - API_TMPO_SYNC % (self.host, sid), - headers=headers, - params=params, - verify=self.crt) - r = f.result() - r.raise_for_status() - fs = [] - for t in r.json(): - fs.append((t, self._req_block( - sid, token, t["rid"], t["lvl"], t["bid"], t["ext"]))) - for (t, f) in fs: - self._write_block( - f.result(), sid, t["rid"], t["lvl"], t["bid"], t["ext"]) - - def _req_block(self, sid, token, rid, lvl, bid, ext): - headers = { - "Accept": HTTP_ACCEPT["gz"], - "X-Token": token} - f = self.rqs.get( - API_TMPO_BLOCK % (self.host, sid, rid, lvl, bid), - headers=headers, - verify=self.crt) - self._dprintf(DBG_TMPO_REQUEST, time.time(), sid, rid, lvl, bid) - return f - - def _write_block(self, r, sid, rid, lvl, bid, ext): - blk = sqlite3.Binary(r.content) - now = time.time() - self.dbcur.execute(SQL_TMPO_INS, (sid, rid, lvl, bid, ext, now, blk)) - self._clean(sid, rid, lvl, bid) - self._dprintf(DBG_TMPO_WRITE, now, sid, rid, lvl, bid, len(blk)) - - def _clean(self, sid, rid, lvl, bid): - if lvl == 8: - return - lastchild = self._lastchild(lvl, bid) - self.dbcur.execute(SQL_TMPO_CLEAN, (sid, rid, lvl - 4, lastchild)) - self._clean(sid, rid, lvl - 4, lastchild) - - def _lastchild(self, lvl, bid): - delta = math.trunc(2 ** (lvl - 4)) - return bid + 15 * delta - - def _blocktail(self, lvl, bid): - delta = math.trunc(2 ** lvl) - return bid + delta - - def _dprintf(self, fmt, *args): - if self.debug: - print(fmt % args) +from .sqlitesession import SQLiteSession as Session \ No newline at end of file diff --git a/tmpo/sqlitesession.py b/tmpo/sqlitesession.py new file mode 100644 index 0000000..5a682f9 --- /dev/null +++ b/tmpo/sqlitesession.py @@ -0,0 +1,558 @@ +SQL_SENSOR_TABLE = """ + CREATE TABLE IF NOT EXISTS sensor( + sid TEXT, + token TEXT, + PRIMARY KEY(sid))""" + +SQL_SENSOR_INS = """ + INSERT INTO sensor + (sid, token) + VALUES (?, ?)""" + +SQL_SENSOR_DEL = """ + DELETE FROM sensor + WHERE sid = ?""" + +SQL_TMPO_DEL = """ + DELETE FROM tmpo + WHERE sid = ?""" + +SQL_SENSOR_ALL = """ + SELECT sid + FROM sensor + ORDER BY sid""" + +SQL_SENSOR_TOKEN = """ + SELECT token + FROM sensor + WHERE sid = ?""" + +SQL_TMPO_TABLE = """ + CREATE TABLE IF NOT EXISTS tmpo( + sid TEXT, + rid INTEGER, + lvl INTEGER, + bid INTEGER, + ext TEXT, + created REAL, + data BLOB, + PRIMARY KEY(sid, rid, lvl, bid))""" + +SQL_TMPO_INS = """ + INSERT INTO tmpo + (sid, rid, lvl, bid, ext, created, data) + VALUES (?, ?, ?, ?, ?, ?, ?)""" + +SQL_TMPO_CLEAN = """ + DELETE + FROM tmpo + WHERE sid = ? AND rid = ? AND lvl = ? AND bid <= ?""" + +SQL_TMPO_ALL = """ + SELECT sid, rid, lvl, bid, ext, created, data + FROM tmpo + WHERE sid = ? + ORDER BY rid ASC, lvl DESC, bid ASC""" + +SQL_TMPO_LAST = """ + SELECT rid, lvl, bid, ext + FROM tmpo + WHERE sid = ? + ORDER BY created DESC, lvl DESC + LIMIT 1""" + +SQL_TMPO_LAST_DATA = """ + SELECT rid, lvl, bid, ext, data + FROM tmpo + WHERE sid = ? + ORDER BY created DESC, lvl DESC + LIMIT 1""" + +SQL_TMPO_FIRST = """ + SELECT rid, lvl, bid + FROM tmpo + WHERE sid = ? 
+ ORDER BY created ASC, lvl ASC + LIMIT 1""" + +SQL_TMPO_RID_MAX = """ + SELECT MAX(rid) + FROM tmpo + WHERE sid = ?""" + +API_TMPO_SYNC = "https://%s/sensor/%s/tmpo/sync" +API_TMPO_BLOCK = "https://%s/sensor/%s/tmpo/%d/%d/%d" + +HTTP_ACCEPT = { + "json": "application/json", + "gz": "application/gzip"} + +RE_JSON_BLK = r'^\{"h":(?P\{.+?\}),"t":(?P\[.+?\]),"v":(?P\[.+?\])\}$' +DBG_TMPO_REQUEST = "[r] time:%.3f sid:%s rid:%d lvl:%2d bid:%d" +DBG_TMPO_WRITE = "[w] time:%.3f sid:%s rid:%d lvl:%2d bid:%d size[B]:%d" +EPOCHS_MAX = 2147483647 + + +import os +import sys +import math +import time +import sqlite3 +import requests_futures.sessions +import concurrent.futures +import zlib +import re +import json +import numpy as np +import pandas as pd +from functools import wraps + + +def dbcon(func): + """Set up connection before executing function, commit and close connection + afterwards. Unless a connection already has been created.""" + @wraps(func) + def wrapper(*args, **kwargs): + self = args[0] + if self.dbcon is None: + # set up connection + self.dbcon = sqlite3.connect(self.db) + self.dbcur = self.dbcon.cursor() + self.dbcur.execute(SQL_SENSOR_TABLE) + self.dbcur.execute(SQL_TMPO_TABLE) + + # execute function + try: + result = func(*args, **kwargs) + except Exception as e: + # on exception, first close connection and then raise + self.dbcon.rollback() + self.dbcon.commit() + self.dbcon.close() + self.dbcon = None + self.dbcur = None + raise e + else: + # commit everything and close connection + self.dbcon.commit() + self.dbcon.close() + self.dbcon = None + self.dbcur = None + else: + result = func(*args, **kwargs) + return result + return wrapper + + +class SQLiteSession(): + def __init__(self, path=None, workers=16): + """ + Parameters + ---------- + path : str, optional + location for the database + workers : int + default 16 + """ + self.debug = False + if path is None: + path = os.path.expanduser("~") # $HOME + if sys.platform == "win32": + self.home = os.path.join(path, "tmpo") + else: + self.home = os.path.join(path, ".tmpo") + self.db = os.path.join(self.home, "tmpo.sqlite3") + + package_dir = os.path.dirname(__file__) + self.crt = os.path.join(package_dir, ".flukso.crt") + self.host = "api.flukso.net" + try: + os.mkdir(self.home) + except OSError: # dir exists + pass + self.rqs = requests_futures.sessions.FuturesSession( + executor=concurrent.futures.ThreadPoolExecutor( + max_workers=workers)) + self.rqs.headers.update({"X-Version": "1.0"}) + self.dbcon = None + self.dbcur = None + + @dbcon + def add(self, sid, token): + """ + Add new sensor to the database + + Parameters + ---------- + sid : str + SensorId + token : str + """ + try: + self.dbcur.execute(SQL_SENSOR_INS, (sid, token)) + except sqlite3.IntegrityError: # sensor entry exists + pass + + @dbcon + def remove(self, sid): + """ + Remove sensor from the database + + Parameters + ---------- + sid : str + SensorID + """ + self.dbcur.execute(SQL_SENSOR_DEL, (sid,)) + self.dbcur.execute(SQL_TMPO_DEL, (sid,)) + + @dbcon + def reset(self, sid): + """ + Removes all tmpo blocks for a given sensor, but keeps sensor table + intact, so sensor id and token remain in the database. 
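+        A subsequent sync() will re-download all blocks for this sensor.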
+ + Parameters + ---------- + sid : str + """ + self.dbcur.execute(SQL_TMPO_DEL, (sid,)) + + @dbcon + def sync(self, *sids): + """ + Synchronise data + + Parameters + ---------- + sids : list of str + SensorIDs to sync + Optional, leave empty to sync everything + """ + if sids == (): + sids = [sid for (sid,) in self.dbcur.execute(SQL_SENSOR_ALL)] + for sid in sids: + self.dbcur.execute(SQL_TMPO_LAST, (sid,)) + last = self.dbcur.fetchone() + if last: + rid, lvl, bid, ext = last + self._clean(sid, rid, lvl, bid) + # prevent needless polling + if time.time() < bid + 256: + return + else: + rid, lvl, bid = 0, 0, 0 + self._req_sync(sid, rid, lvl, bid) + + @dbcon + def list(self, *sids): + """ + List all tmpo-blocks in the database + + Parameters + ---------- + sids : list of str + SensorID's for which to list blocks + Optional, leave empty to get them all + + Returns + ------- + list[list[tuple]] + """ + if sids == (): + sids = [sid for (sid,) in self.dbcur.execute(SQL_SENSOR_ALL)] + slist = [] + for sid in sids: + tlist = [] + for tmpo in self.dbcur.execute(SQL_TMPO_ALL, (sid,)): + tlist.append(tmpo) + sid, rid, lvl, bid, ext, ctd, blk = tmpo + self._dprintf(DBG_TMPO_WRITE, ctd, sid, rid, lvl, bid, len(blk)) + slist.append(tlist) + return slist + + @dbcon + def series(self, sid, recycle_id=None, head=None, tail=None, + datetime=True): + """ + Create data Series + + Parameters + ---------- + sid : str + recycle_id : optional + head : int | pandas.Timestamp, optional + Start of the interval + default earliest available + tail : int | pandas.Timestamp, optional + End of the interval + default max epoch + datetime : bool + convert index to datetime + default True + + Returns + ------- + pandas.Series + """ + if head is None: + head = 0 + else: + head = self._2epochs(head) + + if tail is None: + tail = EPOCHS_MAX + else: + tail = self._2epochs(tail) + + if recycle_id is None: + self.dbcur.execute(SQL_TMPO_RID_MAX, (sid,)) + recycle_id = self.dbcur.fetchone()[0] + tlist = self.list(sid)[0] + srlist = [] + for _sid, rid, lvl, bid, ext, ctd, blk in tlist: + if (recycle_id == rid + and head < self._blocktail(lvl, bid) + and tail >= bid): + srlist.append(self._blk2series(ext, blk, head, tail)) + if len(srlist) > 0: + ts = pd.concat(srlist) + ts.name = sid + if datetime is True: + ts.index = pd.to_datetime(ts.index, unit="s", utc=True) + return ts + else: + return pd.Series([], name=sid) + + @dbcon + def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): + """ + Create data frame + + Parameters + ---------- + sids : list[str] + head : int | pandas.Timestamp, optional + Start of the interval + default earliest available + tail : int | pandas.Timestamp, optional + End of the interval + default max epoch + datetime : bool + convert index to datetime + default True + + Returns + ------- + pandas.DataFrame + """ + if head is None: + head = 0 + else: + head = self._2epochs(head) + + if tail is None: + tail = EPOCHS_MAX + else: + tail = self._2epochs(tail) + + series = [self.series(sid, head=head, tail=tail, datetime=False) + for sid in sids] + df = pd.concat(series, axis=1) + if datetime is True: + df.index = pd.to_datetime(df.index, unit="s", utc=True) + return df + + @dbcon + def first_timestamp(self, sid, epoch=False): + """ + Get the first available timestamp for a sensor + + Parameters + ---------- + sid : str + SensorID + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int + """ + first_block = 
self.dbcur.execute(SQL_TMPO_FIRST, (sid,)).fetchone() + if first_block is None: + return None + + timestamp = first_block[2] + if not epoch: + timestamp = pd.Timestamp.utcfromtimestamp(timestamp) + timestamp = timestamp.tz_localize('UTC') + return timestamp + + def last_timestamp(self, sid, epoch=False): + """ + Get the theoretical last timestamp for a sensor + + Parameters + ---------- + sid : str + SensorID + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int + """ + timestamp, value = self.last_datapoint(sid, epoch) + return timestamp + + def last_datapoint(self, sid, epoch=False): + """ + Parameters + ---------- + sid : str + SensorId + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int, float + """ + block = self._last_block(sid) + if block is None: + return None, None + + header = block['h'] + timestamp, value = header['tail'] + + if not epoch: + timestamp = pd.Timestamp.utcfromtimestamp(timestamp) + timestamp = timestamp.tz_localize('UTC') + + return timestamp, value + + @dbcon + def _last_block(self, sid): + cur = self.dbcur.execute(SQL_TMPO_LAST_DATA, (sid,)) + row = cur.fetchone() + if row is None: + return None + + rid, lvl, bid, ext, blk = row + + jblk = self._decompress_block(blk, ext) + data = json.loads(jblk.decode('UTF-8')) + return data + + def _decompress_block(self, blk, ext): + if ext != "gz": + raise NotImplementedError("Compression type not supported in tmpo") + jblk = zlib.decompress(blk, zlib.MAX_WBITS | 16) # gzip decoding + return jblk + + def _2epochs(self, time): + if isinstance(time, pd.Timestamp): + return int(math.floor(time.value / 1e9)) + elif isinstance(time, int): + return time + else: + raise NotImplementedError("Time format not supported. " + + "Use epochs or a Pandas timestamp.") + + def _blk2series(self, ext, blk, head, tail): + jblk = self._decompress_block(blk, ext) + m = re.match(RE_JSON_BLK, jblk.decode("utf-8")) + pdjblk = '{"index":%s,"data":%s}' % (m.group("t"), m.group("v")) + try: + pdsblk = pd.read_json( + pdjblk, + typ="series", + dtype="float", + orient="split", + numpy=True, + date_unit="s") + except: + return pd.Series() + h = json.loads(m.group("h")) + self._npdelta(pdsblk.index, h["head"][0]) + self._npdelta(pdsblk, h["head"][1]) + pdsblk_truncated = pdsblk.loc[head:tail] + return pdsblk_truncated + + def _npdelta(self, a, delta): + """Numpy: Modifying Array Values + http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html""" + for x in np.nditer(a, op_flags=["readwrite"]): + delta += x + x[...] 
= delta + return a + + def _req_sync(self, sid, rid, lvl, bid): + self.dbcur.execute(SQL_SENSOR_TOKEN, (sid,)) + token, = self.dbcur.fetchone() + headers = { + "Accept": HTTP_ACCEPT["json"], + "X-Token": token} + params = { + "rid": rid, + "lvl": lvl, + "bid": bid} + f = self.rqs.get( + API_TMPO_SYNC % (self.host, sid), + headers=headers, + params=params, + verify=self.crt) + r = f.result() + r.raise_for_status() + fs = [] + for t in r.json(): + fs.append((t, self._req_block( + sid, token, t["rid"], t["lvl"], t["bid"], t["ext"]))) + for (t, f) in fs: + self._write_block( + f.result(), sid, t["rid"], t["lvl"], t["bid"], t["ext"]) + + def _req_block(self, sid, token, rid, lvl, bid, ext): + headers = { + "Accept": HTTP_ACCEPT["gz"], + "X-Token": token} + f = self.rqs.get( + API_TMPO_BLOCK % (self.host, sid, rid, lvl, bid), + headers=headers, + verify=self.crt) + self._dprintf(DBG_TMPO_REQUEST, time.time(), sid, rid, lvl, bid) + return f + + def _write_block(self, r, sid, rid, lvl, bid, ext): + blk = sqlite3.Binary(r.content) + now = time.time() + self.dbcur.execute(SQL_TMPO_INS, (sid, rid, lvl, bid, ext, now, blk)) + self._clean(sid, rid, lvl, bid) + self._dprintf(DBG_TMPO_WRITE, now, sid, rid, lvl, bid, len(blk)) + + def _clean(self, sid, rid, lvl, bid): + if lvl == 8: + return + lastchild = self._lastchild(lvl, bid) + self.dbcur.execute(SQL_TMPO_CLEAN, (sid, rid, lvl - 4, lastchild)) + self._clean(sid, rid, lvl - 4, lastchild) + + def _lastchild(self, lvl, bid): + delta = math.trunc(2 ** (lvl - 4)) + return bid + 15 * delta + + def _blocktail(self, lvl, bid): + delta = math.trunc(2 ** lvl) + return bid + delta + + def _dprintf(self, fmt, *args): + if self.debug: + print(fmt % args) From 0213bc60afa22d15d1809a428e18c43fd1f1da10 Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Thu, 14 Jun 2018 18:02:34 +0200 Subject: [PATCH 04/23] split apisession - sqlitesession --- tmpo/__init__.py | 5 +- tmpo/apisession.py | 387 ++++++++++++++++++++++++++++++++++++++++++ tmpo/sqlitesession.py | 193 ++++----------------- 3 files changed, 423 insertions(+), 162 deletions(-) create mode 100644 tmpo/apisession.py diff --git a/tmpo/__init__.py b/tmpo/__init__.py index fca9c7f..6033548 100644 --- a/tmpo/__init__.py +++ b/tmpo/__init__.py @@ -1,8 +1,9 @@ __title__ = "tmpo" -__version__ = "0.2.10" +__version__ = "0.3.0" __build__ = 0x000100 __author__ = "Bart Van Der Meerssche" __license__ = "MIT" __copyright__ = "Copyright 2017 Bart Van Der Meerssche" -from .sqlitesession import SQLiteSession as Session \ No newline at end of file +from .sqlitesession import SQLiteSession as Session +from .apisession import APISession diff --git a/tmpo/apisession.py b/tmpo/apisession.py new file mode 100644 index 0000000..09440aa --- /dev/null +++ b/tmpo/apisession.py @@ -0,0 +1,387 @@ +import os +import requests_futures.sessions +import concurrent.futures +import time +import math +import pandas as pd +import re +import zlib +import json +import numpy as np + +HTTP_ACCEPT = { + "json": "application/json", + "gz": "application/gzip"} + +API_TMPO_SYNC = "https://%s/sensor/%s/tmpo/sync" +API_TMPO_BLOCK = "https://%s/sensor/%s/tmpo/%d/%d/%d" + +DBG_TMPO_REQUEST = "[r] time:%.3f sid:%s rid:%d lvl:%2d bid:%d" +RE_JSON_BLK = r'^\{"h":(?P\{.+?\}),"t":(?P\[.+?\]),"v":(?P\[.+?\])\}$' +EPOCHS_MAX = 2147483647 + + +class APISession: + """ + Get Flukso Data (aka. 
TMPO Blocks) directly from the Flukso API + """ + def __init__(self, workers=16): + """ + Parameters + ---------- + workers : int + default 16 + """ + self.debug = False + + package_dir = os.path.dirname(__file__) + self.crt = os.path.join(package_dir, ".flukso.crt") + self.host = "api.flukso.net" + + self.rqs = requests_futures.sessions.FuturesSession( + executor=concurrent.futures.ThreadPoolExecutor( + max_workers=workers)) + self.rqs.headers.update({"X-Version": "1.0"}) + + def series(self, sid, token, recycle_id=None, head=None, tail=None, + datetime=True): + """ + Create data Series + + Parameters + ---------- + sid : str + token : str + recycle_id : optional + head : int | pandas.Timestamp, optional + Start of the interval + default earliest available + tail : int | pandas.Timestamp, optional + End of the interval + default max epoch + datetime : bool + convert index to datetime + default True + + Returns + ------- + pd.Series + """ + if head is None: + head = 0 + else: + head = self._2epochs(head) + + if tail is None: + tail = EPOCHS_MAX + else: + tail = self._2epochs(tail) + + blist = self._req_blocklist(sid=sid, token=token, rid=recycle_id) + blist = self._slice_blist(blist=blist, head=head, tail=tail) + blocks_futures = self._req_blocks(sid=sid, token=token, blist=blist) + + def parse_blocks(): + for h, future in blocks_futures: + r = future.result() + r.raise_for_status() + _ts = self._blk2series(ext=h['ext'], blk=r.content, + head=head, tail=tail) + yield _ts + + blocks = parse_blocks() + ts = pd.concat(blocks) + if ts.empty: + return pd.Series([], name=sid) + ts.name = sid + if datetime is True: + ts.index = pd.to_datetime(ts.index, unit='s', utc=True) + return ts + + def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): + """ + Create data frame + + Parameters + ---------- + sids : [(str, str)] + List of sensor, token tuples: [(sid, token), ...] 
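+            A token is required for every sensor, since APISession
+            keeps no local sensor table to look tokens up from.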
+ head : int | pandas.Timestamp, optional + Start of the interval + default earliest available + tail : int | pandas.Timestamp, optional + End of the interval + default max epoch + datetime : bool + convert index to datetime + default True + + Returns + ------- + pd.DataFrame + """ + series = (self.series(sid, token=token, head=head, tail=tail, datetime=False) + for sid, token in sids) + df = pd.concat(series, axis=1) + if datetime is True: + df.index = pd.to_datetime(df.index, unit="s", utc=True) + return df + + def first_timestamp(self, sid, token, epoch=False): + """ + Get the first available timestamp for a sensor + + Parameters + ---------- + sid : str + SensorID + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int + """ + blist = self._req_blocklist(sid=sid, token=token) + first = blist[0]['bid'] + if not epoch: + first = self._epoch2timestamp(first) + return first + + def last_timestamp(self, sid, token, epoch=False): + """ + Get the last timestamp for a sensor + + Parameters + ---------- + sid : str + SensorID + token : str + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int + """ + last, _v = self.last_datapoint(sid=sid, token=token, epoch=epoch) + return last + + def last_datapoint(self, sid, token, epoch=False): + """ + Parameters + ---------- + sid : str + SensorId + token : str + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + (pd.Timestamp, float) or (int, float) + """ + block = self._last_block(sid=sid, token=token) + if block is None: + return None, None + + header = block['h'] + timestamp, value = header['tail'] + + if not epoch: + timestamp = self._epoch2timestamp(timestamp) + + return timestamp, value + + def _last_block(self, sid, token): + blist = self._req_blocklist(sid=sid, token=token) + last = blist[-1] + bf = self._req_block(sid=sid, token=token, rid=last['rid'], + lvl=last['lvl'], bid=last['bid'], ext=last['ext']) + r = bf.result() + r.raise_for_status() + + jblk = self._decompress_block(blk=r.content, ext=last['ext']) + data = json.loads(jblk.decode('UTF-8')) + return data + + def _epoch2timestamp(self, epoch): + timestamp = pd.Timestamp.utcfromtimestamp(epoch) + timestamp = timestamp.tz_localize('UTC') + return timestamp + + def _2epochs(self, time): + if isinstance(time, pd.Timestamp): + return int(math.floor(time.value / 1e9)) + elif isinstance(time, int): + return time + else: + raise NotImplementedError("Time format not supported. 
" + + "Use epochs or a Pandas timestamp.") + + def _blk2series(self, ext, blk, head, tail): + jblk = self._decompress_block(blk, ext) + m = re.match(RE_JSON_BLK, jblk.decode("utf-8")) + pdjblk = '{"index":%s,"data":%s}' % (m.group("t"), m.group("v")) + try: + pdsblk = pd.read_json( + pdjblk, + typ="series", + dtype="float", + orient="split", + numpy=True, + date_unit="s") + except: + return pd.Series() + h = json.loads(m.group("h")) + self._npdelta(pdsblk.index, h["head"][0]) + self._npdelta(pdsblk, h["head"][1]) + pdsblk_truncated = pdsblk.loc[head:tail] + return pdsblk_truncated + + def _decompress_block(self, blk, ext): + if ext != "gz": + raise NotImplementedError("Compression type not supported in tmpo") + jblk = zlib.decompress(blk, zlib.MAX_WBITS | 16) # gzip decoding + return jblk + + def _req_blocklist(self, sid, token, rid=0, lvl=0, bid=0): + """ + Request the list of available blocks from the API + + Parameters + ---------- + sid : str + token : str + rid : int + lvl : int + bid : int + + Returns + ------- + [dict] + """ + headers = { + "Accept": HTTP_ACCEPT["json"], + "X-Token": token} + params = { + "rid": rid if rid else 0, + "lvl": lvl if rid else 0, + "bid": bid if rid else 0} + f = self.rqs.get( + API_TMPO_SYNC % ("api.flukso.net", sid), + headers=headers, + params=params, + verify=self.crt + ) + r = f.result() + r.raise_for_status() + j = r.json() + blist = sorted(j, key=lambda x: x['bid']) + return blist + + def _req_block(self, sid, token, rid, lvl, bid, ext): + """ + Request a block (as a request future) from the API + + Parameters + ---------- + sid : str + token : str + rid : int + lvl : int + bid : int + ext : str + + Returns + ------- + Response + """ + headers = { + "Accept": HTTP_ACCEPT["gz"], + "X-Token": token} + f = self.rqs.get( + API_TMPO_BLOCK % (self.host, sid, rid, lvl, bid), + headers=headers, + verify=self.crt) + self._dprintf(DBG_TMPO_REQUEST, time.time(), sid, rid, lvl, bid) + return f + + def _req_blocks(self, sid, token, blist): + """ + Request multiple blocks (as a list of request futures) + + Parameters + ---------- + sid : str + token : str + blist : [dict] + [ + {'rid': 0, 'lvl': 20, 'bid': 1506803712, 'ext': 'gz'}, + ... + ] + + Returns + ------- + (dict, Response) + """ + fs = [] + for t in blist: + fs.append( + (t, self._req_block(sid=sid, token=token, rid=t['rid'], + lvl=t['lvl'], bid=t['bid'], ext=t['ext']))) + return fs + + @staticmethod + def _npdelta(a, delta): + """Numpy: Modifying Array Values + http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html""" + for x in np.nditer(a, op_flags=["readwrite"]): + delta += x + x[...] = delta + return a + + def _lastchild(self, lvl, bid): + delta = math.trunc(2 ** (lvl - 4)) + return bid + 15 * delta + + def _blocktail(self, lvl, bid): + delta = math.trunc(2 ** lvl) + return bid + delta + + def _dprintf(self, fmt, *args): + if self.debug: + print(fmt % args) + + def _slice_blist(self, blist, head, tail): + """ + Slice a blist (block headers) so it contains all blocks with data + between head and tail + + Parameters + ---------- + blist : [dict] + [ + {'rid': 0, 'lvl': 20, 'bid': 1506803712, 'ext': 'gz'}, + ... 
+ ] + head : int + epochs + tail : int + epochs + + Returns + ------- + [dict] + """ + ret = [] + for b in blist: + if head < self._blocktail(lvl=b['lvl'], bid=b['bid']) and tail >= b['bid']: + ret.append(b) + return ret diff --git a/tmpo/sqlitesession.py b/tmpo/sqlitesession.py index 5a682f9..35ffc56 100644 --- a/tmpo/sqlitesession.py +++ b/tmpo/sqlitesession.py @@ -80,32 +80,17 @@ FROM tmpo WHERE sid = ?""" -API_TMPO_SYNC = "https://%s/sensor/%s/tmpo/sync" -API_TMPO_BLOCK = "https://%s/sensor/%s/tmpo/%d/%d/%d" - -HTTP_ACCEPT = { - "json": "application/json", - "gz": "application/gzip"} - -RE_JSON_BLK = r'^\{"h":(?P\{.+?\}),"t":(?P\[.+?\]),"v":(?P\[.+?\])\}$' -DBG_TMPO_REQUEST = "[r] time:%.3f sid:%s rid:%d lvl:%2d bid:%d" DBG_TMPO_WRITE = "[w] time:%.3f sid:%s rid:%d lvl:%2d bid:%d size[B]:%d" -EPOCHS_MAX = 2147483647 import os import sys -import math import time import sqlite3 -import requests_futures.sessions -import concurrent.futures -import zlib -import re import json -import numpy as np import pandas as pd from functools import wraps +from .apisession import APISession, EPOCHS_MAX def dbcon(func): @@ -144,17 +129,18 @@ def wrapper(*args, **kwargs): return wrapper -class SQLiteSession(): +class SQLiteSession(APISession): def __init__(self, path=None, workers=16): """ Parameters ---------- path : str, optional - location for the database + location for the sqlite database workers : int default 16 """ - self.debug = False + super(SQLiteSession, self).__init__(workers=workers) + if path is None: path = os.path.expanduser("~") # $HOME if sys.platform == "win32": @@ -163,17 +149,11 @@ def __init__(self, path=None, workers=16): self.home = os.path.join(path, ".tmpo") self.db = os.path.join(self.home, "tmpo.sqlite3") - package_dir = os.path.dirname(__file__) - self.crt = os.path.join(package_dir, ".flukso.crt") - self.host = "api.flukso.net" try: os.mkdir(self.home) except OSError: # dir exists pass - self.rqs = requests_futures.sessions.FuturesSession( - executor=concurrent.futures.ThreadPoolExecutor( - max_workers=workers)) - self.rqs.headers.update({"X-Version": "1.0"}) + self.dbcon = None self.dbcur = None @@ -273,7 +253,7 @@ def list(self, *sids): @dbcon def series(self, sid, recycle_id=None, head=None, tail=None, - datetime=True): + datetime=True, **kwargs): """ Create data Series @@ -324,14 +304,13 @@ def series(self, sid, recycle_id=None, head=None, tail=None, else: return pd.Series([], name=sid) - @dbcon def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): """ Create data frame Parameters ---------- - sids : list[str] + sids : [str] head : int | pandas.Timestamp, optional Start of the interval default earliest available @@ -344,27 +323,15 @@ def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): Returns ------- - pandas.DataFrame + pd.DataFrame """ - if head is None: - head = 0 - else: - head = self._2epochs(head) - - if tail is None: - tail = EPOCHS_MAX - else: - tail = self._2epochs(tail) - - series = [self.series(sid, head=head, tail=tail, datetime=False) - for sid in sids] - df = pd.concat(series, axis=1) - if datetime is True: - df.index = pd.to_datetime(df.index, unit="s", utc=True) + sids = [(sid, None) for sid in sids] + df = super(SQLiteSession, self).dataframe(sids=sids, head=head, + tail=tail, datetime=datetime) return df @dbcon - def first_timestamp(self, sid, epoch=False): + def first_timestamp(self, sid, epoch=False, **kwargs): """ Get the first available timestamp for a sensor @@ -387,60 +354,47 @@ def first_timestamp(self, sid, 
epoch=False): timestamp = first_block[2] if not epoch: - timestamp = pd.Timestamp.utcfromtimestamp(timestamp) - timestamp = timestamp.tz_localize('UTC') + timestamp = self._epoch2timestamp(timestamp) return timestamp - def last_timestamp(self, sid, epoch=False): + def last_timestamp(self, sid, epoch=False, **kwargs): """ - Get the theoretical last timestamp for a sensor + Get the last timestamp for a sensor Parameters ---------- sid : str - SensorID epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp + kwargs : dict Returns ------- - pd.Timestamp | int + """ - timestamp, value = self.last_datapoint(sid, epoch) - return timestamp + last = super(SQLiteSession, self).last_timestamp( + sid=sid, token=kwargs.get('token'), epoch=epoch) + return last - def last_datapoint(self, sid, epoch=False): + def last_datapoint(self, sid, epoch=False, **kwargs): """ + Get the last datapoint for a sensor + Parameters ---------- sid : str - SensorId epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp + kwargs : dict Returns ------- - pd.Timestamp | int, float + (pd.Timestamp, float) or (int, float) """ - block = self._last_block(sid) - if block is None: - return None, None - - header = block['h'] - timestamp, value = header['tail'] - - if not epoch: - timestamp = pd.Timestamp.utcfromtimestamp(timestamp) - timestamp = timestamp.tz_localize('UTC') - - return timestamp, value + dp = super(SQLiteSession, self).last_datapoint( + sid=sid, token=kwargs.get('token'), epoch=epoch) + return dp @dbcon - def _last_block(self, sid): + def _last_block(self, sid, **kwargs): cur = self.dbcur.execute(SQL_TMPO_LAST_DATA, (sid,)) row = cur.fetchone() if row is None: @@ -452,85 +406,16 @@ def _last_block(self, sid): data = json.loads(jblk.decode('UTF-8')) return data - def _decompress_block(self, blk, ext): - if ext != "gz": - raise NotImplementedError("Compression type not supported in tmpo") - jblk = zlib.decompress(blk, zlib.MAX_WBITS | 16) # gzip decoding - return jblk - - def _2epochs(self, time): - if isinstance(time, pd.Timestamp): - return int(math.floor(time.value / 1e9)) - elif isinstance(time, int): - return time - else: - raise NotImplementedError("Time format not supported. " + - "Use epochs or a Pandas timestamp.") - - def _blk2series(self, ext, blk, head, tail): - jblk = self._decompress_block(blk, ext) - m = re.match(RE_JSON_BLK, jblk.decode("utf-8")) - pdjblk = '{"index":%s,"data":%s}' % (m.group("t"), m.group("v")) - try: - pdsblk = pd.read_json( - pdjblk, - typ="series", - dtype="float", - orient="split", - numpy=True, - date_unit="s") - except: - return pd.Series() - h = json.loads(m.group("h")) - self._npdelta(pdsblk.index, h["head"][0]) - self._npdelta(pdsblk, h["head"][1]) - pdsblk_truncated = pdsblk.loc[head:tail] - return pdsblk_truncated - - def _npdelta(self, a, delta): - """Numpy: Modifying Array Values - http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html""" - for x in np.nditer(a, op_flags=["readwrite"]): - delta += x - x[...] 
= delta
-        return a
-
     def _req_sync(self, sid, rid, lvl, bid):
         self.dbcur.execute(SQL_SENSOR_TOKEN, (sid,))
         token, = self.dbcur.fetchone()
-        headers = {
-            "Accept": HTTP_ACCEPT["json"],
-            "X-Token": token}
-        params = {
-            "rid": rid,
-            "lvl": lvl,
-            "bid": bid}
-        f = self.rqs.get(
-            API_TMPO_SYNC % (self.host, sid),
-            headers=headers,
-            params=params,
-            verify=self.crt)
-        r = f.result()
-        r.raise_for_status()
-        fs = []
-        for t in r.json():
-            fs.append((t, self._req_block(
-                sid, token, t["rid"], t["lvl"], t["bid"], t["ext"])))
+        blist = self._req_blocklist(sid=sid, token=token, rid=rid, lvl=lvl,
+                                    bid=bid)
+        fs = self._req_blocks(sid=sid, token=token, blist=blist)
         for (t, f) in fs:
             self._write_block(
                 f.result(), sid, t["rid"], t["lvl"], t["bid"], t["ext"])
 
-    def _req_block(self, sid, token, rid, lvl, bid, ext):
-        headers = {
-            "Accept": HTTP_ACCEPT["gz"],
-            "X-Token": token}
-        f = self.rqs.get(
-            API_TMPO_BLOCK % (self.host, sid, rid, lvl, bid),
-            headers=headers,
-            verify=self.crt)
-        self._dprintf(DBG_TMPO_REQUEST, time.time(), sid, rid, lvl, bid)
-        return f
-
     def _write_block(self, r, sid, rid, lvl, bid, ext):
         blk = sqlite3.Binary(r.content)
         now = time.time()
@@ -544,15 +429,3 @@ def _clean(self, sid, rid, lvl, bid):
         lastchild = self._lastchild(lvl, bid)
         self.dbcur.execute(SQL_TMPO_CLEAN, (sid, rid, lvl - 4, lastchild))
         self._clean(sid, rid, lvl - 4, lastchild)
-
-    def _lastchild(self, lvl, bid):
-        delta = math.trunc(2 ** (lvl - 4))
-        return bid + 15 * delta
-
-    def _blocktail(self, lvl, bid):
-        delta = math.trunc(2 ** lvl)
-        return bid + delta
-
-    def _dprintf(self, fmt, *args):
-        if self.debug:
-            print(fmt % args)
Parameters - ---------- - sid : str - SensorID - """ - self.dbcur.execute(SQL_SENSOR_DEL, (sid,)) - self.dbcur.execute(SQL_TMPO_DEL, (sid,)) - - @dbcon - def reset(self, sid): - """ - Removes all tmpo blocks for a given sensor, but keeps sensor table - intact, so sensor id and token remain in the database. - - Parameters - ---------- - sid : str - """ - self.dbcur.execute(SQL_TMPO_DEL, (sid,)) - - @dbcon - def sync(self, *sids): - """ - Synchronise data - - Parameters - ---------- - sids : list of str - SensorIDs to sync - Optional, leave empty to sync everything - """ - if sids == (): - sids = [sid for (sid,) in self.dbcur.execute(SQL_SENSOR_ALL)] - for sid in sids: - self.dbcur.execute(SQL_TMPO_LAST, (sid,)) - last = self.dbcur.fetchone() - if last: - rid, lvl, bid, ext = last - self._clean(sid, rid, lvl, bid) - # prevent needless polling - if time.time() < bid + 256: - return - else: - rid, lvl, bid = 0, 0, 0 - self._req_sync(sid, rid, lvl, bid) - - @dbcon - def list(self, *sids): - """ - List all tmpo-blocks in the database - - Parameters - ---------- - sids : list of str - SensorID's for which to list blocks - Optional, leave empty to get them all - - Returns - ------- - list[list[tuple]] - """ - if sids == (): - sids = [sid for (sid,) in self.dbcur.execute(SQL_SENSOR_ALL)] - slist = [] - for sid in sids: - tlist = [] - for tmpo in self.dbcur.execute(SQL_TMPO_ALL, (sid,)): - tlist.append(tmpo) - sid, rid, lvl, bid, ext, ctd, blk = tmpo - self._dprintf(DBG_TMPO_WRITE, ctd, sid, rid, lvl, bid, len(blk)) - slist.append(tlist) - return slist - - @dbcon - def series(self, sid, recycle_id=None, head=None, tail=None, - datetime=True): - """ - Create data Series - - Parameters - ---------- - sid : str - recycle_id : optional - head : int | pandas.Timestamp, optional - Start of the interval - default earliest available - tail : int | pandas.Timestamp, optional - End of the interval - default max epoch - datetime : bool - convert index to datetime - default True - - Returns - ------- - pandas.Series - """ - if head is None: - head = 0 - else: - head = self._2epochs(head) - - if tail is None: - tail = EPOCHS_MAX - else: - tail = self._2epochs(tail) - - if recycle_id is None: - self.dbcur.execute(SQL_TMPO_RID_MAX, (sid,)) - recycle_id = self.dbcur.fetchone()[0] - tlist = self.list(sid)[0] - srlist = [] - for _sid, rid, lvl, bid, ext, ctd, blk in tlist: - if (recycle_id == rid - and head < self._blocktail(lvl, bid) - and tail >= bid): - srlist.append(self._blk2series(ext, blk, head, tail)) - if len(srlist) > 0: - ts = pd.concat(srlist) - ts.name = sid - if datetime is True: - ts.index = pd.to_datetime(ts.index, unit="s", utc=True) - return ts - else: - return pd.Series([], name=sid) - - @dbcon - def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): - """ - Create data frame - - Parameters - ---------- - sids : list[str] - head : int | pandas.Timestamp, optional - Start of the interval - default earliest available - tail : int | pandas.Timestamp, optional - End of the interval - default max epoch - datetime : bool - convert index to datetime - default True - - Returns - ------- - pandas.DataFrame - """ - if head is None: - head = 0 - else: - head = self._2epochs(head) - - if tail is None: - tail = EPOCHS_MAX - else: - tail = self._2epochs(tail) - - series = [self.series(sid, head=head, tail=tail, datetime=False) - for sid in sids] - df = pd.concat(series, axis=1) - if datetime is True: - df.index = pd.to_datetime(df.index, unit="s", utc=True) - return df - - @dbcon - def 
first_timestamp(self, sid, epoch=False): - """ - Get the first available timestamp for a sensor - - Parameters - ---------- - sid : str - SensorID - epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp - - Returns - ------- - pd.Timestamp | int - """ - first_block = self.dbcur.execute(SQL_TMPO_FIRST, (sid,)).fetchone() - if first_block is None: - return None - - timestamp = first_block[2] - if not epoch: - timestamp = pd.Timestamp.utcfromtimestamp(timestamp) - timestamp = timestamp.tz_localize('UTC') - return timestamp - - def last_timestamp(self, sid, epoch=False): - """ - Get the theoretical last timestamp for a sensor - - Parameters - ---------- - sid : str - SensorID - epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp - - Returns - ------- - pd.Timestamp | int - """ - timestamp, value = self.last_datapoint(sid, epoch) - return timestamp - - def last_datapoint(self, sid, epoch=False): - """ - Parameters - ---------- - sid : str - SensorId - epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp - - Returns - ------- - pd.Timestamp | int, float - """ - block = self._last_block(sid) - if block is None: - return None, None - - header = block['h'] - timestamp, value = header['tail'] - - if not epoch: - timestamp = pd.Timestamp.utcfromtimestamp(timestamp) - timestamp = timestamp.tz_localize('UTC') - - return timestamp, value - - @dbcon - def _last_block(self, sid): - cur = self.dbcur.execute(SQL_TMPO_LAST_DATA, (sid,)) - row = cur.fetchone() - if row is None: - return None - - rid, lvl, bid, ext, blk = row - - jblk = self._decompress_block(blk, ext) - data = json.loads(jblk.decode('UTF-8')) - return data - - def _decompress_block(self, blk, ext): - if ext != "gz": - raise NotImplementedError("Compression type not supported in tmpo") - jblk = zlib.decompress(blk, zlib.MAX_WBITS | 16) # gzip decoding - return jblk - - def _2epochs(self, time): - if isinstance(time, pd.Timestamp): - return int(math.floor(time.value / 1e9)) - elif isinstance(time, int): - return time - else: - raise NotImplementedError("Time format not supported. " + - "Use epochs or a Pandas timestamp.") - - def _blk2series(self, ext, blk, head, tail): - jblk = self._decompress_block(blk, ext) - m = re.match(RE_JSON_BLK, jblk.decode("utf-8")) - pdjblk = '{"index":%s,"data":%s}' % (m.group("t"), m.group("v")) - try: - pdsblk = pd.read_json( - pdjblk, - typ="series", - dtype="float", - orient="split", - numpy=True, - date_unit="s") - except: - return pd.Series() - h = json.loads(m.group("h")) - self._npdelta(pdsblk.index, h["head"][0]) - self._npdelta(pdsblk, h["head"][1]) - pdsblk_truncated = pdsblk.loc[head:tail] - return pdsblk_truncated - - def _npdelta(self, a, delta): - """Numpy: Modifying Array Values - http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html""" - for x in np.nditer(a, op_flags=["readwrite"]): - delta += x - x[...] 
= delta - return a - - def _req_sync(self, sid, rid, lvl, bid): - self.dbcur.execute(SQL_SENSOR_TOKEN, (sid,)) - token, = self.dbcur.fetchone() - headers = { - "Accept": HTTP_ACCEPT["json"], - "X-Token": token} - params = { - "rid": rid, - "lvl": lvl, - "bid": bid} - f = self.rqs.get( - API_TMPO_SYNC % (self.host, sid), - headers=headers, - params=params, - verify=self.crt) - r = f.result() - r.raise_for_status() - fs = [] - for t in r.json(): - fs.append((t, self._req_block( - sid, token, t["rid"], t["lvl"], t["bid"], t["ext"]))) - for (t, f) in fs: - self._write_block( - f.result(), sid, t["rid"], t["lvl"], t["bid"], t["ext"]) - - def _req_block(self, sid, token, rid, lvl, bid, ext): - headers = { - "Accept": HTTP_ACCEPT["gz"], - "X-Token": token} - f = self.rqs.get( - API_TMPO_BLOCK % (self.host, sid, rid, lvl, bid), - headers=headers, - verify=self.crt) - self._dprintf(DBG_TMPO_REQUEST, time.time(), sid, rid, lvl, bid) - return f - - def _write_block(self, r, sid, rid, lvl, bid, ext): - blk = sqlite3.Binary(r.content) - now = time.time() - self.dbcur.execute(SQL_TMPO_INS, (sid, rid, lvl, bid, ext, now, blk)) - self._clean(sid, rid, lvl, bid) - self._dprintf(DBG_TMPO_WRITE, now, sid, rid, lvl, bid, len(blk)) - - def _clean(self, sid, rid, lvl, bid): - if lvl == 8: - return - lastchild = self._lastchild(lvl, bid) - self.dbcur.execute(SQL_TMPO_CLEAN, (sid, rid, lvl - 4, lastchild)) - self._clean(sid, rid, lvl - 4, lastchild) - - def _lastchild(self, lvl, bid): - delta = math.trunc(2 ** (lvl - 4)) - return bid + 15 * delta - - def _blocktail(self, lvl, bid): - delta = math.trunc(2 ** lvl) - return bid + delta - - def _dprintf(self, fmt, *args): - if self.debug: - print(fmt % args) +from .sqlitesession import SQLiteSession as Session \ No newline at end of file diff --git a/tmpo/sqlitesession.py b/tmpo/sqlitesession.py new file mode 100644 index 0000000..5a682f9 --- /dev/null +++ b/tmpo/sqlitesession.py @@ -0,0 +1,558 @@ +SQL_SENSOR_TABLE = """ + CREATE TABLE IF NOT EXISTS sensor( + sid TEXT, + token TEXT, + PRIMARY KEY(sid))""" + +SQL_SENSOR_INS = """ + INSERT INTO sensor + (sid, token) + VALUES (?, ?)""" + +SQL_SENSOR_DEL = """ + DELETE FROM sensor + WHERE sid = ?""" + +SQL_TMPO_DEL = """ + DELETE FROM tmpo + WHERE sid = ?""" + +SQL_SENSOR_ALL = """ + SELECT sid + FROM sensor + ORDER BY sid""" + +SQL_SENSOR_TOKEN = """ + SELECT token + FROM sensor + WHERE sid = ?""" + +SQL_TMPO_TABLE = """ + CREATE TABLE IF NOT EXISTS tmpo( + sid TEXT, + rid INTEGER, + lvl INTEGER, + bid INTEGER, + ext TEXT, + created REAL, + data BLOB, + PRIMARY KEY(sid, rid, lvl, bid))""" + +SQL_TMPO_INS = """ + INSERT INTO tmpo + (sid, rid, lvl, bid, ext, created, data) + VALUES (?, ?, ?, ?, ?, ?, ?)""" + +SQL_TMPO_CLEAN = """ + DELETE + FROM tmpo + WHERE sid = ? AND rid = ? AND lvl = ? AND bid <= ?""" + +SQL_TMPO_ALL = """ + SELECT sid, rid, lvl, bid, ext, created, data + FROM tmpo + WHERE sid = ? + ORDER BY rid ASC, lvl DESC, bid ASC""" + +SQL_TMPO_LAST = """ + SELECT rid, lvl, bid, ext + FROM tmpo + WHERE sid = ? + ORDER BY created DESC, lvl DESC + LIMIT 1""" + +SQL_TMPO_LAST_DATA = """ + SELECT rid, lvl, bid, ext, data + FROM tmpo + WHERE sid = ? + ORDER BY created DESC, lvl DESC + LIMIT 1""" + +SQL_TMPO_FIRST = """ + SELECT rid, lvl, bid + FROM tmpo + WHERE sid = ? 
+ ORDER BY created ASC, lvl ASC + LIMIT 1""" + +SQL_TMPO_RID_MAX = """ + SELECT MAX(rid) + FROM tmpo + WHERE sid = ?""" + +API_TMPO_SYNC = "https://%s/sensor/%s/tmpo/sync" +API_TMPO_BLOCK = "https://%s/sensor/%s/tmpo/%d/%d/%d" + +HTTP_ACCEPT = { + "json": "application/json", + "gz": "application/gzip"} + +RE_JSON_BLK = r'^\{"h":(?P\{.+?\}),"t":(?P\[.+?\]),"v":(?P\[.+?\])\}$' +DBG_TMPO_REQUEST = "[r] time:%.3f sid:%s rid:%d lvl:%2d bid:%d" +DBG_TMPO_WRITE = "[w] time:%.3f sid:%s rid:%d lvl:%2d bid:%d size[B]:%d" +EPOCHS_MAX = 2147483647 + + +import os +import sys +import math +import time +import sqlite3 +import requests_futures.sessions +import concurrent.futures +import zlib +import re +import json +import numpy as np +import pandas as pd +from functools import wraps + + +def dbcon(func): + """Set up connection before executing function, commit and close connection + afterwards. Unless a connection already has been created.""" + @wraps(func) + def wrapper(*args, **kwargs): + self = args[0] + if self.dbcon is None: + # set up connection + self.dbcon = sqlite3.connect(self.db) + self.dbcur = self.dbcon.cursor() + self.dbcur.execute(SQL_SENSOR_TABLE) + self.dbcur.execute(SQL_TMPO_TABLE) + + # execute function + try: + result = func(*args, **kwargs) + except Exception as e: + # on exception, first close connection and then raise + self.dbcon.rollback() + self.dbcon.commit() + self.dbcon.close() + self.dbcon = None + self.dbcur = None + raise e + else: + # commit everything and close connection + self.dbcon.commit() + self.dbcon.close() + self.dbcon = None + self.dbcur = None + else: + result = func(*args, **kwargs) + return result + return wrapper + + +class SQLiteSession(): + def __init__(self, path=None, workers=16): + """ + Parameters + ---------- + path : str, optional + location for the database + workers : int + default 16 + """ + self.debug = False + if path is None: + path = os.path.expanduser("~") # $HOME + if sys.platform == "win32": + self.home = os.path.join(path, "tmpo") + else: + self.home = os.path.join(path, ".tmpo") + self.db = os.path.join(self.home, "tmpo.sqlite3") + + package_dir = os.path.dirname(__file__) + self.crt = os.path.join(package_dir, ".flukso.crt") + self.host = "api.flukso.net" + try: + os.mkdir(self.home) + except OSError: # dir exists + pass + self.rqs = requests_futures.sessions.FuturesSession( + executor=concurrent.futures.ThreadPoolExecutor( + max_workers=workers)) + self.rqs.headers.update({"X-Version": "1.0"}) + self.dbcon = None + self.dbcur = None + + @dbcon + def add(self, sid, token): + """ + Add new sensor to the database + + Parameters + ---------- + sid : str + SensorId + token : str + """ + try: + self.dbcur.execute(SQL_SENSOR_INS, (sid, token)) + except sqlite3.IntegrityError: # sensor entry exists + pass + + @dbcon + def remove(self, sid): + """ + Remove sensor from the database + + Parameters + ---------- + sid : str + SensorID + """ + self.dbcur.execute(SQL_SENSOR_DEL, (sid,)) + self.dbcur.execute(SQL_TMPO_DEL, (sid,)) + + @dbcon + def reset(self, sid): + """ + Removes all tmpo blocks for a given sensor, but keeps sensor table + intact, so sensor id and token remain in the database. 
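# A hedged usage sketch (editorial, not part of the patch) of the
# remove()/reset() split introduced here: reset() only empties the local
# tmpo block cache, so the sensor id and token survive and a later sync()
# re-downloads everything; remove() also drops the sensor row itself.
# The sensor id and token below are hypothetical placeholders, and a
# reachable Flukso API is assumed.
import tmpo

s = tmpo.Session()
s.add("sid0000000000000000000000000000aa", "tok0000000000000000000000aa")
s.sync()                                       # fill the local block cache
s.reset("sid0000000000000000000000000000aa")   # drop blocks, keep sid + token
s.sync()                                       # full re-download still works
s.remove("sid0000000000000000000000000000aa")  # drop the sensor row and its blocks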
+ + Parameters + ---------- + sid : str + """ + self.dbcur.execute(SQL_TMPO_DEL, (sid,)) + + @dbcon + def sync(self, *sids): + """ + Synchronise data + + Parameters + ---------- + sids : list of str + SensorIDs to sync + Optional, leave empty to sync everything + """ + if sids == (): + sids = [sid for (sid,) in self.dbcur.execute(SQL_SENSOR_ALL)] + for sid in sids: + self.dbcur.execute(SQL_TMPO_LAST, (sid,)) + last = self.dbcur.fetchone() + if last: + rid, lvl, bid, ext = last + self._clean(sid, rid, lvl, bid) + # prevent needless polling + if time.time() < bid + 256: + return + else: + rid, lvl, bid = 0, 0, 0 + self._req_sync(sid, rid, lvl, bid) + + @dbcon + def list(self, *sids): + """ + List all tmpo-blocks in the database + + Parameters + ---------- + sids : list of str + SensorID's for which to list blocks + Optional, leave empty to get them all + + Returns + ------- + list[list[tuple]] + """ + if sids == (): + sids = [sid for (sid,) in self.dbcur.execute(SQL_SENSOR_ALL)] + slist = [] + for sid in sids: + tlist = [] + for tmpo in self.dbcur.execute(SQL_TMPO_ALL, (sid,)): + tlist.append(tmpo) + sid, rid, lvl, bid, ext, ctd, blk = tmpo + self._dprintf(DBG_TMPO_WRITE, ctd, sid, rid, lvl, bid, len(blk)) + slist.append(tlist) + return slist + + @dbcon + def series(self, sid, recycle_id=None, head=None, tail=None, + datetime=True): + """ + Create data Series + + Parameters + ---------- + sid : str + recycle_id : optional + head : int | pandas.Timestamp, optional + Start of the interval + default earliest available + tail : int | pandas.Timestamp, optional + End of the interval + default max epoch + datetime : bool + convert index to datetime + default True + + Returns + ------- + pandas.Series + """ + if head is None: + head = 0 + else: + head = self._2epochs(head) + + if tail is None: + tail = EPOCHS_MAX + else: + tail = self._2epochs(tail) + + if recycle_id is None: + self.dbcur.execute(SQL_TMPO_RID_MAX, (sid,)) + recycle_id = self.dbcur.fetchone()[0] + tlist = self.list(sid)[0] + srlist = [] + for _sid, rid, lvl, bid, ext, ctd, blk in tlist: + if (recycle_id == rid + and head < self._blocktail(lvl, bid) + and tail >= bid): + srlist.append(self._blk2series(ext, blk, head, tail)) + if len(srlist) > 0: + ts = pd.concat(srlist) + ts.name = sid + if datetime is True: + ts.index = pd.to_datetime(ts.index, unit="s", utc=True) + return ts + else: + return pd.Series([], name=sid) + + @dbcon + def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): + """ + Create data frame + + Parameters + ---------- + sids : list[str] + head : int | pandas.Timestamp, optional + Start of the interval + default earliest available + tail : int | pandas.Timestamp, optional + End of the interval + default max epoch + datetime : bool + convert index to datetime + default True + + Returns + ------- + pandas.DataFrame + """ + if head is None: + head = 0 + else: + head = self._2epochs(head) + + if tail is None: + tail = EPOCHS_MAX + else: + tail = self._2epochs(tail) + + series = [self.series(sid, head=head, tail=tail, datetime=False) + for sid in sids] + df = pd.concat(series, axis=1) + if datetime is True: + df.index = pd.to_datetime(df.index, unit="s", utc=True) + return df + + @dbcon + def first_timestamp(self, sid, epoch=False): + """ + Get the first available timestamp for a sensor + + Parameters + ---------- + sid : str + SensorID + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int + """ + first_block = 
self.dbcur.execute(SQL_TMPO_FIRST, (sid,)).fetchone() + if first_block is None: + return None + + timestamp = first_block[2] + if not epoch: + timestamp = pd.Timestamp.utcfromtimestamp(timestamp) + timestamp = timestamp.tz_localize('UTC') + return timestamp + + def last_timestamp(self, sid, epoch=False): + """ + Get the theoretical last timestamp for a sensor + + Parameters + ---------- + sid : str + SensorID + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int + """ + timestamp, value = self.last_datapoint(sid, epoch) + return timestamp + + def last_datapoint(self, sid, epoch=False): + """ + Parameters + ---------- + sid : str + SensorId + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int, float + """ + block = self._last_block(sid) + if block is None: + return None, None + + header = block['h'] + timestamp, value = header['tail'] + + if not epoch: + timestamp = pd.Timestamp.utcfromtimestamp(timestamp) + timestamp = timestamp.tz_localize('UTC') + + return timestamp, value + + @dbcon + def _last_block(self, sid): + cur = self.dbcur.execute(SQL_TMPO_LAST_DATA, (sid,)) + row = cur.fetchone() + if row is None: + return None + + rid, lvl, bid, ext, blk = row + + jblk = self._decompress_block(blk, ext) + data = json.loads(jblk.decode('UTF-8')) + return data + + def _decompress_block(self, blk, ext): + if ext != "gz": + raise NotImplementedError("Compression type not supported in tmpo") + jblk = zlib.decompress(blk, zlib.MAX_WBITS | 16) # gzip decoding + return jblk + + def _2epochs(self, time): + if isinstance(time, pd.Timestamp): + return int(math.floor(time.value / 1e9)) + elif isinstance(time, int): + return time + else: + raise NotImplementedError("Time format not supported. " + + "Use epochs or a Pandas timestamp.") + + def _blk2series(self, ext, blk, head, tail): + jblk = self._decompress_block(blk, ext) + m = re.match(RE_JSON_BLK, jblk.decode("utf-8")) + pdjblk = '{"index":%s,"data":%s}' % (m.group("t"), m.group("v")) + try: + pdsblk = pd.read_json( + pdjblk, + typ="series", + dtype="float", + orient="split", + numpy=True, + date_unit="s") + except: + return pd.Series() + h = json.loads(m.group("h")) + self._npdelta(pdsblk.index, h["head"][0]) + self._npdelta(pdsblk, h["head"][1]) + pdsblk_truncated = pdsblk.loc[head:tail] + return pdsblk_truncated + + def _npdelta(self, a, delta): + """Numpy: Modifying Array Values + http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html""" + for x in np.nditer(a, op_flags=["readwrite"]): + delta += x + x[...] 
= delta + return a + + def _req_sync(self, sid, rid, lvl, bid): + self.dbcur.execute(SQL_SENSOR_TOKEN, (sid,)) + token, = self.dbcur.fetchone() + headers = { + "Accept": HTTP_ACCEPT["json"], + "X-Token": token} + params = { + "rid": rid, + "lvl": lvl, + "bid": bid} + f = self.rqs.get( + API_TMPO_SYNC % (self.host, sid), + headers=headers, + params=params, + verify=self.crt) + r = f.result() + r.raise_for_status() + fs = [] + for t in r.json(): + fs.append((t, self._req_block( + sid, token, t["rid"], t["lvl"], t["bid"], t["ext"]))) + for (t, f) in fs: + self._write_block( + f.result(), sid, t["rid"], t["lvl"], t["bid"], t["ext"]) + + def _req_block(self, sid, token, rid, lvl, bid, ext): + headers = { + "Accept": HTTP_ACCEPT["gz"], + "X-Token": token} + f = self.rqs.get( + API_TMPO_BLOCK % (self.host, sid, rid, lvl, bid), + headers=headers, + verify=self.crt) + self._dprintf(DBG_TMPO_REQUEST, time.time(), sid, rid, lvl, bid) + return f + + def _write_block(self, r, sid, rid, lvl, bid, ext): + blk = sqlite3.Binary(r.content) + now = time.time() + self.dbcur.execute(SQL_TMPO_INS, (sid, rid, lvl, bid, ext, now, blk)) + self._clean(sid, rid, lvl, bid) + self._dprintf(DBG_TMPO_WRITE, now, sid, rid, lvl, bid, len(blk)) + + def _clean(self, sid, rid, lvl, bid): + if lvl == 8: + return + lastchild = self._lastchild(lvl, bid) + self.dbcur.execute(SQL_TMPO_CLEAN, (sid, rid, lvl - 4, lastchild)) + self._clean(sid, rid, lvl - 4, lastchild) + + def _lastchild(self, lvl, bid): + delta = math.trunc(2 ** (lvl - 4)) + return bid + 15 * delta + + def _blocktail(self, lvl, bid): + delta = math.trunc(2 ** lvl) + return bid + delta + + def _dprintf(self, fmt, *args): + if self.debug: + print(fmt % args) From 255f686291f2be86cdbba491bf3ecc317aae73db Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Thu, 14 Jun 2018 18:02:34 +0200 Subject: [PATCH 07/23] split apisession - sqlitesession --- tmpo/__init__.py | 5 +- tmpo/apisession.py | 387 ++++++++++++++++++++++++++++++++++++++++++ tmpo/sqlitesession.py | 193 ++++----------------- 3 files changed, 423 insertions(+), 162 deletions(-) create mode 100644 tmpo/apisession.py diff --git a/tmpo/__init__.py b/tmpo/__init__.py index fca9c7f..6033548 100644 --- a/tmpo/__init__.py +++ b/tmpo/__init__.py @@ -1,8 +1,9 @@ __title__ = "tmpo" -__version__ = "0.2.10" +__version__ = "0.3.0" __build__ = 0x000100 __author__ = "Bart Van Der Meerssche" __license__ = "MIT" __copyright__ = "Copyright 2017 Bart Van Der Meerssche" -from .sqlitesession import SQLiteSession as Session \ No newline at end of file +from .sqlitesession import SQLiteSession as Session +from .apisession import APISession diff --git a/tmpo/apisession.py b/tmpo/apisession.py new file mode 100644 index 0000000..09440aa --- /dev/null +++ b/tmpo/apisession.py @@ -0,0 +1,387 @@ +import os +import requests_futures.sessions +import concurrent.futures +import time +import math +import pandas as pd +import re +import zlib +import json +import numpy as np + +HTTP_ACCEPT = { + "json": "application/json", + "gz": "application/gzip"} + +API_TMPO_SYNC = "https://%s/sensor/%s/tmpo/sync" +API_TMPO_BLOCK = "https://%s/sensor/%s/tmpo/%d/%d/%d" + +DBG_TMPO_REQUEST = "[r] time:%.3f sid:%s rid:%d lvl:%2d bid:%d" +RE_JSON_BLK = r'^\{"h":(?P\{.+?\}),"t":(?P\[.+?\]),"v":(?P\[.+?\])\}$' +EPOCHS_MAX = 2147483647 + + +class APISession: + """ + Get Flukso Data (aka. 
TMPO Blocks) directly from the Flukso API + """ + def __init__(self, workers=16): + """ + Parameters + ---------- + workers : int + default 16 + """ + self.debug = False + + package_dir = os.path.dirname(__file__) + self.crt = os.path.join(package_dir, ".flukso.crt") + self.host = "api.flukso.net" + + self.rqs = requests_futures.sessions.FuturesSession( + executor=concurrent.futures.ThreadPoolExecutor( + max_workers=workers)) + self.rqs.headers.update({"X-Version": "1.0"}) + + def series(self, sid, token, recycle_id=None, head=None, tail=None, + datetime=True): + """ + Create data Series + + Parameters + ---------- + sid : str + token : str + recycle_id : optional + head : int | pandas.Timestamp, optional + Start of the interval + default earliest available + tail : int | pandas.Timestamp, optional + End of the interval + default max epoch + datetime : bool + convert index to datetime + default True + + Returns + ------- + pd.Series + """ + if head is None: + head = 0 + else: + head = self._2epochs(head) + + if tail is None: + tail = EPOCHS_MAX + else: + tail = self._2epochs(tail) + + blist = self._req_blocklist(sid=sid, token=token, rid=recycle_id) + blist = self._slice_blist(blist=blist, head=head, tail=tail) + blocks_futures = self._req_blocks(sid=sid, token=token, blist=blist) + + def parse_blocks(): + for h, future in blocks_futures: + r = future.result() + r.raise_for_status() + _ts = self._blk2series(ext=h['ext'], blk=r.content, + head=head, tail=tail) + yield _ts + + blocks = parse_blocks() + ts = pd.concat(blocks) + if ts.empty: + return pd.Series([], name=sid) + ts.name = sid + if datetime is True: + ts.index = pd.to_datetime(ts.index, unit='s', utc=True) + return ts + + def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): + """ + Create data frame + + Parameters + ---------- + sids : [(str, str)] + List of sensor, token tuples: [(sid, token), ...] 
+ head : int | pandas.Timestamp, optional + Start of the interval + default earliest available + tail : int | pandas.Timestamp, optional + End of the interval + default max epoch + datetime : bool + convert index to datetime + default True + + Returns + ------- + pd.DataFrame + """ + series = (self.series(sid, token=token, head=head, tail=tail, datetime=False) + for sid, token in sids) + df = pd.concat(series, axis=1) + if datetime is True: + df.index = pd.to_datetime(df.index, unit="s", utc=True) + return df + + def first_timestamp(self, sid, token, epoch=False): + """ + Get the first available timestamp for a sensor + + Parameters + ---------- + sid : str + SensorID + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int + """ + blist = self._req_blocklist(sid=sid, token=token) + first = blist[0]['bid'] + if not epoch: + first = self._epoch2timestamp(first) + return first + + def last_timestamp(self, sid, token, epoch=False): + """ + Get the last timestamp for a sensor + + Parameters + ---------- + sid : str + SensorID + token : str + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + pd.Timestamp | int + """ + last, _v = self.last_datapoint(sid=sid, token=token, epoch=epoch) + return last + + def last_datapoint(self, sid, token, epoch=False): + """ + Parameters + ---------- + sid : str + SensorId + token : str + epoch : bool + default False + If True return as epoch + If False return as pd.Timestamp + + Returns + ------- + (pd.Timestamp, float) or (int, float) + """ + block = self._last_block(sid=sid, token=token) + if block is None: + return None, None + + header = block['h'] + timestamp, value = header['tail'] + + if not epoch: + timestamp = self._epoch2timestamp(timestamp) + + return timestamp, value + + def _last_block(self, sid, token): + blist = self._req_blocklist(sid=sid, token=token) + last = blist[-1] + bf = self._req_block(sid=sid, token=token, rid=last['rid'], + lvl=last['lvl'], bid=last['bid'], ext=last['ext']) + r = bf.result() + r.raise_for_status() + + jblk = self._decompress_block(blk=r.content, ext=last['ext']) + data = json.loads(jblk.decode('UTF-8')) + return data + + def _epoch2timestamp(self, epoch): + timestamp = pd.Timestamp.utcfromtimestamp(epoch) + timestamp = timestamp.tz_localize('UTC') + return timestamp + + def _2epochs(self, time): + if isinstance(time, pd.Timestamp): + return int(math.floor(time.value / 1e9)) + elif isinstance(time, int): + return time + else: + raise NotImplementedError("Time format not supported. 
" + + "Use epochs or a Pandas timestamp.") + + def _blk2series(self, ext, blk, head, tail): + jblk = self._decompress_block(blk, ext) + m = re.match(RE_JSON_BLK, jblk.decode("utf-8")) + pdjblk = '{"index":%s,"data":%s}' % (m.group("t"), m.group("v")) + try: + pdsblk = pd.read_json( + pdjblk, + typ="series", + dtype="float", + orient="split", + numpy=True, + date_unit="s") + except: + return pd.Series() + h = json.loads(m.group("h")) + self._npdelta(pdsblk.index, h["head"][0]) + self._npdelta(pdsblk, h["head"][1]) + pdsblk_truncated = pdsblk.loc[head:tail] + return pdsblk_truncated + + def _decompress_block(self, blk, ext): + if ext != "gz": + raise NotImplementedError("Compression type not supported in tmpo") + jblk = zlib.decompress(blk, zlib.MAX_WBITS | 16) # gzip decoding + return jblk + + def _req_blocklist(self, sid, token, rid=0, lvl=0, bid=0): + """ + Request the list of available blocks from the API + + Parameters + ---------- + sid : str + token : str + rid : int + lvl : int + bid : int + + Returns + ------- + [dict] + """ + headers = { + "Accept": HTTP_ACCEPT["json"], + "X-Token": token} + params = { + "rid": rid if rid else 0, + "lvl": lvl if rid else 0, + "bid": bid if rid else 0} + f = self.rqs.get( + API_TMPO_SYNC % ("api.flukso.net", sid), + headers=headers, + params=params, + verify=self.crt + ) + r = f.result() + r.raise_for_status() + j = r.json() + blist = sorted(j, key=lambda x: x['bid']) + return blist + + def _req_block(self, sid, token, rid, lvl, bid, ext): + """ + Request a block (as a request future) from the API + + Parameters + ---------- + sid : str + token : str + rid : int + lvl : int + bid : int + ext : str + + Returns + ------- + Response + """ + headers = { + "Accept": HTTP_ACCEPT["gz"], + "X-Token": token} + f = self.rqs.get( + API_TMPO_BLOCK % (self.host, sid, rid, lvl, bid), + headers=headers, + verify=self.crt) + self._dprintf(DBG_TMPO_REQUEST, time.time(), sid, rid, lvl, bid) + return f + + def _req_blocks(self, sid, token, blist): + """ + Request multiple blocks (as a list of request futures) + + Parameters + ---------- + sid : str + token : str + blist : [dict] + [ + {'rid': 0, 'lvl': 20, 'bid': 1506803712, 'ext': 'gz'}, + ... + ] + + Returns + ------- + (dict, Response) + """ + fs = [] + for t in blist: + fs.append( + (t, self._req_block(sid=sid, token=token, rid=t['rid'], + lvl=t['lvl'], bid=t['bid'], ext=t['ext']))) + return fs + + @staticmethod + def _npdelta(a, delta): + """Numpy: Modifying Array Values + http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html""" + for x in np.nditer(a, op_flags=["readwrite"]): + delta += x + x[...] = delta + return a + + def _lastchild(self, lvl, bid): + delta = math.trunc(2 ** (lvl - 4)) + return bid + 15 * delta + + def _blocktail(self, lvl, bid): + delta = math.trunc(2 ** lvl) + return bid + delta + + def _dprintf(self, fmt, *args): + if self.debug: + print(fmt % args) + + def _slice_blist(self, blist, head, tail): + """ + Slice a blist (block headers) so it contains all blocks with data + between head and tail + + Parameters + ---------- + blist : [dict] + [ + {'rid': 0, 'lvl': 20, 'bid': 1506803712, 'ext': 'gz'}, + ... 
+ ] + head : int + epochs + tail : int + epochs + + Returns + ------- + [dict] + """ + ret = [] + for b in blist: + if head < self._blocktail(lvl=b['lvl'], bid=b['bid']) and tail >= b['bid']: + ret.append(b) + return ret diff --git a/tmpo/sqlitesession.py b/tmpo/sqlitesession.py index 5a682f9..35ffc56 100644 --- a/tmpo/sqlitesession.py +++ b/tmpo/sqlitesession.py @@ -80,32 +80,17 @@ FROM tmpo WHERE sid = ?""" -API_TMPO_SYNC = "https://%s/sensor/%s/tmpo/sync" -API_TMPO_BLOCK = "https://%s/sensor/%s/tmpo/%d/%d/%d" - -HTTP_ACCEPT = { - "json": "application/json", - "gz": "application/gzip"} - -RE_JSON_BLK = r'^\{"h":(?P\{.+?\}),"t":(?P\[.+?\]),"v":(?P\[.+?\])\}$' -DBG_TMPO_REQUEST = "[r] time:%.3f sid:%s rid:%d lvl:%2d bid:%d" DBG_TMPO_WRITE = "[w] time:%.3f sid:%s rid:%d lvl:%2d bid:%d size[B]:%d" -EPOCHS_MAX = 2147483647 import os import sys -import math import time import sqlite3 -import requests_futures.sessions -import concurrent.futures -import zlib -import re import json -import numpy as np import pandas as pd from functools import wraps +from .apisession import APISession, EPOCHS_MAX def dbcon(func): @@ -144,17 +129,18 @@ def wrapper(*args, **kwargs): return wrapper -class SQLiteSession(): +class SQLiteSession(APISession): def __init__(self, path=None, workers=16): """ Parameters ---------- path : str, optional - location for the database + location for the sqlite database workers : int default 16 """ - self.debug = False + super(SQLiteSession, self).__init__(workers=workers) + if path is None: path = os.path.expanduser("~") # $HOME if sys.platform == "win32": @@ -163,17 +149,11 @@ def __init__(self, path=None, workers=16): self.home = os.path.join(path, ".tmpo") self.db = os.path.join(self.home, "tmpo.sqlite3") - package_dir = os.path.dirname(__file__) - self.crt = os.path.join(package_dir, ".flukso.crt") - self.host = "api.flukso.net" try: os.mkdir(self.home) except OSError: # dir exists pass - self.rqs = requests_futures.sessions.FuturesSession( - executor=concurrent.futures.ThreadPoolExecutor( - max_workers=workers)) - self.rqs.headers.update({"X-Version": "1.0"}) + self.dbcon = None self.dbcur = None @@ -273,7 +253,7 @@ def list(self, *sids): @dbcon def series(self, sid, recycle_id=None, head=None, tail=None, - datetime=True): + datetime=True, **kwargs): """ Create data Series @@ -324,14 +304,13 @@ def series(self, sid, recycle_id=None, head=None, tail=None, else: return pd.Series([], name=sid) - @dbcon def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): """ Create data frame Parameters ---------- - sids : list[str] + sids : [str] head : int | pandas.Timestamp, optional Start of the interval default earliest available @@ -344,27 +323,15 @@ def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): Returns ------- - pandas.DataFrame + pd.DataFrame """ - if head is None: - head = 0 - else: - head = self._2epochs(head) - - if tail is None: - tail = EPOCHS_MAX - else: - tail = self._2epochs(tail) - - series = [self.series(sid, head=head, tail=tail, datetime=False) - for sid in sids] - df = pd.concat(series, axis=1) - if datetime is True: - df.index = pd.to_datetime(df.index, unit="s", utc=True) + sids = [(sid, None) for sid in sids] + df = super(SQLiteSession, self).dataframe(sids=sids, head=head, + tail=tail, datetime=datetime) return df @dbcon - def first_timestamp(self, sid, epoch=False): + def first_timestamp(self, sid, epoch=False, **kwargs): """ Get the first available timestamp for a sensor @@ -387,60 +354,47 @@ def first_timestamp(self, sid, 
epoch=False): timestamp = first_block[2] if not epoch: - timestamp = pd.Timestamp.utcfromtimestamp(timestamp) - timestamp = timestamp.tz_localize('UTC') + timestamp = self._epoch2timestamp(timestamp) return timestamp - def last_timestamp(self, sid, epoch=False): + def last_timestamp(self, sid, epoch=False, **kwargs): """ - Get the theoretical last timestamp for a sensor + Get the last timestamp for a sensor Parameters ---------- sid : str - SensorID epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp + kwargs : dict Returns ------- - pd.Timestamp | int + """ - timestamp, value = self.last_datapoint(sid, epoch) - return timestamp + last = super(SQLiteSession, self).last_timestamp( + sid=sid, token=kwargs.get('token'), epoch=epoch) + return last - def last_datapoint(self, sid, epoch=False): + def last_datapoint(self, sid, epoch=False, **kwargs): """ + Get the last datapoint for a sensor + Parameters ---------- sid : str - SensorId epoch : bool - default False - If True return as epoch - If False return as pd.Timestamp + kwargs : dict Returns ------- - pd.Timestamp | int, float + (pd.Timestamp, float) or (int, float) """ - block = self._last_block(sid) - if block is None: - return None, None - - header = block['h'] - timestamp, value = header['tail'] - - if not epoch: - timestamp = pd.Timestamp.utcfromtimestamp(timestamp) - timestamp = timestamp.tz_localize('UTC') - - return timestamp, value + dp = super(SQLiteSession, self).last_datapoint( + sid=sid, token=kwargs.get('token'), epoch=epoch) + return dp @dbcon - def _last_block(self, sid): + def _last_block(self, sid, **kwargs): cur = self.dbcur.execute(SQL_TMPO_LAST_DATA, (sid,)) row = cur.fetchone() if row is None: @@ -452,85 +406,16 @@ def _last_block(self, sid): data = json.loads(jblk.decode('UTF-8')) return data - def _decompress_block(self, blk, ext): - if ext != "gz": - raise NotImplementedError("Compression type not supported in tmpo") - jblk = zlib.decompress(blk, zlib.MAX_WBITS | 16) # gzip decoding - return jblk - - def _2epochs(self, time): - if isinstance(time, pd.Timestamp): - return int(math.floor(time.value / 1e9)) - elif isinstance(time, int): - return time - else: - raise NotImplementedError("Time format not supported. " + - "Use epochs or a Pandas timestamp.") - - def _blk2series(self, ext, blk, head, tail): - jblk = self._decompress_block(blk, ext) - m = re.match(RE_JSON_BLK, jblk.decode("utf-8")) - pdjblk = '{"index":%s,"data":%s}' % (m.group("t"), m.group("v")) - try: - pdsblk = pd.read_json( - pdjblk, - typ="series", - dtype="float", - orient="split", - numpy=True, - date_unit="s") - except: - return pd.Series() - h = json.loads(m.group("h")) - self._npdelta(pdsblk.index, h["head"][0]) - self._npdelta(pdsblk, h["head"][1]) - pdsblk_truncated = pdsblk.loc[head:tail] - return pdsblk_truncated - - def _npdelta(self, a, delta): - """Numpy: Modifying Array Values - http://docs.scipy.org/doc/numpy/reference/arrays.nditer.html""" - for x in np.nditer(a, op_flags=["readwrite"]): - delta += x - x[...] 
= delta - return a - def _req_sync(self, sid, rid, lvl, bid): self.dbcur.execute(SQL_SENSOR_TOKEN, (sid,)) token, = self.dbcur.fetchone() - headers = { - "Accept": HTTP_ACCEPT["json"], - "X-Token": token} - params = { - "rid": rid, - "lvl": lvl, - "bid": bid} - f = self.rqs.get( - API_TMPO_SYNC % (self.host, sid), - headers=headers, - params=params, - verify=self.crt) - r = f.result() - r.raise_for_status() - fs = [] - for t in r.json(): - fs.append((t, self._req_block( - sid, token, t["rid"], t["lvl"], t["bid"], t["ext"]))) + blist = self._req_blocklist(sid=sid, token=token, rid=rid, lvl=lvl, + bid=bid) + fs = self._req_blocks(sid=sid, token=token, blist=blist) for (t, f) in fs: self._write_block( f.result(), sid, t["rid"], t["lvl"], t["bid"], t["ext"]) - def _req_block(self, sid, token, rid, lvl, bid, ext): - headers = { - "Accept": HTTP_ACCEPT["gz"], - "X-Token": token} - f = self.rqs.get( - API_TMPO_BLOCK % (self.host, sid, rid, lvl, bid), - headers=headers, - verify=self.crt) - self._dprintf(DBG_TMPO_REQUEST, time.time(), sid, rid, lvl, bid) - return f - def _write_block(self, r, sid, rid, lvl, bid, ext): blk = sqlite3.Binary(r.content) now = time.time() @@ -544,15 +429,3 @@ def _clean(self, sid, rid, lvl, bid): lastchild = self._lastchild(lvl, bid) self.dbcur.execute(SQL_TMPO_CLEAN, (sid, rid, lvl - 4, lastchild)) self._clean(sid, rid, lvl - 4, lastchild) - - def _lastchild(self, lvl, bid): - delta = math.trunc(2 ** (lvl - 4)) - return bid + 15 * delta - - def _blocktail(self, lvl, bid): - delta = math.trunc(2 ** lvl) - return bid + delta - - def _dprintf(self, fmt, *args): - if self.debug: - print(fmt % args) From 62fcee2b17dbad45fa7dd65d7408e624dd656c5f Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Mon, 18 Jun 2018 09:48:23 +0200 Subject: [PATCH 08/23] also import SQLiteSession --- tmpo/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tmpo/__init__.py b/tmpo/__init__.py index 6033548..5382e25 100644 --- a/tmpo/__init__.py +++ b/tmpo/__init__.py @@ -6,4 +6,5 @@ __copyright__ = "Copyright 2017 Bart Van Der Meerssche" from .sqlitesession import SQLiteSession as Session +from .sqlitesession import SQLiteSession from .apisession import APISession From 219b78feb1f06fc6c641c5fd55dc2e0ce2bd464d Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Mon, 18 Jun 2018 11:03:39 +0200 Subject: [PATCH 09/23] save sensorids & tokens in apisession to make inheritance easier --- tmpo/apisession.py | 46 +++++++++++++++++++++++--------- tmpo/sqlitesession.py | 62 ------------------------------------------- 2 files changed, 34 insertions(+), 74 deletions(-) diff --git a/tmpo/apisession.py b/tmpo/apisession.py index 09440aa..e811f7f 100644 --- a/tmpo/apisession.py +++ b/tmpo/apisession.py @@ -43,15 +43,29 @@ def __init__(self, workers=16): max_workers=workers)) self.rqs.headers.update({"X-Version": "1.0"}) - def series(self, sid, token, recycle_id=None, head=None, tail=None, - datetime=True): + self.sensors = {} + + def add(self, sid, token): """ - Create data Series + Add a sensor and token to the client, so you don't have to enter the + token for each call (also for compatibility with sqlitesession) Parameters ---------- sid : str token : str + """ + self.sensors.update({sid: token}) + + def series(self, sid, recycle_id=None, head=None, tail=None, + datetime=True, token=None): + """ + Create data Series + + Parameters + ---------- + sid : str + token : str, optional recycle_id : optional head : int | pandas.Timestamp, optional Start of the interval @@ -67,6 +81,8 @@ def series(self, 
sid, token, recycle_id=None, head=None, tail=None, ------- pd.Series """ + token = token if token else self.sensors.get(sid) + if head is None: head = 0 else: @@ -104,8 +120,8 @@ def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): Parameters ---------- - sids : [(str, str)] - List of sensor, token tuples: [(sid, token), ...] + sids : [str] + List of SensorID's head : int | pandas.Timestamp, optional Start of the interval default earliest available @@ -120,14 +136,14 @@ def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): ------- pd.DataFrame """ - series = (self.series(sid, token=token, head=head, tail=tail, datetime=False) - for sid, token in sids) + series = (self.series(sid, head=head, tail=tail, datetime=False) + for sid in sids) df = pd.concat(series, axis=1) if datetime is True: df.index = pd.to_datetime(df.index, unit="s", utc=True) return df - def first_timestamp(self, sid, token, epoch=False): + def first_timestamp(self, sid, token=None, epoch=False): """ Get the first available timestamp for a sensor @@ -135,6 +151,7 @@ def first_timestamp(self, sid, token, epoch=False): ---------- sid : str SensorID + token : str, optional epoch : bool default False If True return as epoch @@ -144,13 +161,15 @@ def first_timestamp(self, sid, token, epoch=False): ------- pd.Timestamp | int """ + token = token if token else self.sensors.get(sid) + blist = self._req_blocklist(sid=sid, token=token) first = blist[0]['bid'] if not epoch: first = self._epoch2timestamp(first) return first - def last_timestamp(self, sid, token, epoch=False): + def last_timestamp(self, sid, token=None, epoch=False): """ Get the last timestamp for a sensor @@ -158,6 +177,7 @@ def last_timestamp(self, sid, token, epoch=False): ---------- sid : str SensorID + token: str, optional token : str epoch : bool default False @@ -171,13 +191,13 @@ def last_timestamp(self, sid, token, epoch=False): last, _v = self.last_datapoint(sid=sid, token=token, epoch=epoch) return last - def last_datapoint(self, sid, token, epoch=False): + def last_datapoint(self, sid, token=None, epoch=False): """ Parameters ---------- sid : str SensorId - token : str + token : str, optional epoch : bool default False If True return as epoch @@ -199,7 +219,9 @@ def last_datapoint(self, sid, token, epoch=False): return timestamp, value - def _last_block(self, sid, token): + def _last_block(self, sid, token=None): + token = token if token else self.sensors.get(sid) + blist = self._req_blocklist(sid=sid, token=token) last = blist[-1] bf = self._req_block(sid=sid, token=token, rid=last['rid'], diff --git a/tmpo/sqlitesession.py b/tmpo/sqlitesession.py index 35ffc56..c61dd29 100644 --- a/tmpo/sqlitesession.py +++ b/tmpo/sqlitesession.py @@ -304,32 +304,6 @@ def series(self, sid, recycle_id=None, head=None, tail=None, else: return pd.Series([], name=sid) - def dataframe(self, sids, head=0, tail=EPOCHS_MAX, datetime=True): - """ - Create data frame - - Parameters - ---------- - sids : [str] - head : int | pandas.Timestamp, optional - Start of the interval - default earliest available - tail : int | pandas.Timestamp, optional - End of the interval - default max epoch - datetime : bool - convert index to datetime - default True - - Returns - ------- - pd.DataFrame - """ - sids = [(sid, None) for sid in sids] - df = super(SQLiteSession, self).dataframe(sids=sids, head=head, - tail=tail, datetime=datetime) - return df - @dbcon def first_timestamp(self, sid, epoch=False, **kwargs): """ @@ -357,42 +331,6 @@ def first_timestamp(self, sid, 
epoch=False, **kwargs): timestamp = self._epoch2timestamp(timestamp) return timestamp - def last_timestamp(self, sid, epoch=False, **kwargs): - """ - Get the last timestamp for a sensor - - Parameters - ---------- - sid : str - epoch : bool - kwargs : dict - - Returns - ------- - - """ - last = super(SQLiteSession, self).last_timestamp( - sid=sid, token=kwargs.get('token'), epoch=epoch) - return last - - def last_datapoint(self, sid, epoch=False, **kwargs): - """ - Get the last datapoint for a sensor - - Parameters - ---------- - sid : str - epoch : bool - kwargs : dict - - Returns - ------- - (pd.Timestamp, float) or (int, float) - """ - dp = super(SQLiteSession, self).last_datapoint( - sid=sid, token=kwargs.get('token'), epoch=epoch) - return dp - @dbcon def _last_block(self, sid, **kwargs): cur = self.dbcur.execute(SQL_TMPO_LAST_DATA, (sid,)) From 3d22c05da89b0597c7c7f57c4fcdc8dc66baad31 Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Mon, 18 Jun 2018 11:04:55 +0200 Subject: [PATCH 10/23] don't use get, to force error if token is not present --- tmpo/apisession.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tmpo/apisession.py b/tmpo/apisession.py index e811f7f..546a1b5 100644 --- a/tmpo/apisession.py +++ b/tmpo/apisession.py @@ -81,7 +81,7 @@ def series(self, sid, recycle_id=None, head=None, tail=None, ------- pd.Series """ - token = token if token else self.sensors.get(sid) + token = token if token else self.sensors[sid] if head is None: head = 0 @@ -161,7 +161,7 @@ def first_timestamp(self, sid, token=None, epoch=False): ------- pd.Timestamp | int """ - token = token if token else self.sensors.get(sid) + token = token if token else self.sensors[sid] blist = self._req_blocklist(sid=sid, token=token) first = blist[0]['bid'] @@ -220,7 +220,7 @@ def last_datapoint(self, sid, token=None, epoch=False): return timestamp, value def _last_block(self, sid, token=None): - token = token if token else self.sensors.get(sid) + token = token if token else self.sensors[sid] blist = self._req_blocklist(sid=sid, token=token) last = blist[-1] From b35bb209d6854f4d8440eb30c65923f0d796924c Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Mon, 18 Jun 2018 12:09:52 +0200 Subject: [PATCH 11/23] include cert in package data --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 3b536bc..4b23196 100644 --- a/setup.py +++ b/setup.py @@ -83,6 +83,7 @@ # Note: for creating the source distribution, they had to be included in the # MANIFEST.in as well. package_data={ - 'tmpo': ['tmpo.ipynb', 'LICENSE', 'README.md'], + 'tmpo': ['LICENSE', 'README.md', 'tmpo/.flukso.crt'], }, + include_package_data=True, ) From 71db2f62867087257b81817267044a461a172e41 Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Mon, 18 Jun 2018 12:11:45 +0200 Subject: [PATCH 12/23] try other paths --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 4b23196..cc8388e 100644 --- a/setup.py +++ b/setup.py @@ -83,7 +83,7 @@ # Note: for creating the source distribution, they had to be included in the # MANIFEST.in as well. 
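# A hedged sketch (editorial, not part of the patch) of why the cert must
# ship inside the package: since PATCH 05/23 the sessions resolve it with
# os.path.dirname(__file__), so an installed copy that lacks .flukso.crt
# breaks every HTTPS call. A quick check against an installed package:
import os
import tmpo

crt = os.path.join(os.path.dirname(tmpo.__file__), ".flukso.crt")
print(os.path.isfile(crt))  # True only when packaging shipped the cert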
package_data={ - 'tmpo': ['LICENSE', 'README.md', 'tmpo/.flukso.crt'], + 'tmpo': ['LICENSE', 'README.md', '.flukso.crt'], }, include_package_data=True, ) From d9d9238a992fd710941b0fcb5161c781827b976e Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Mon, 18 Jun 2018 12:13:01 +0200 Subject: [PATCH 13/23] include files outside of tmpo folder --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index cc8388e..9e5f2ad 100644 --- a/setup.py +++ b/setup.py @@ -83,7 +83,8 @@ # Note: for creating the source distribution, they had to be included in the # MANIFEST.in as well. package_data={ - 'tmpo': ['LICENSE', 'README.md', '.flukso.crt'], + '': ['LICENSE', 'README.md'], + 'tmpo': ['.flukso.crt'], }, include_package_data=True, ) From fce38d0e7be5e8b071463721f748175d9493671d Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Tue, 17 Jul 2018 12:24:09 +0200 Subject: [PATCH 14/23] check for empty block list --- tmpo/apisession.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tmpo/apisession.py b/tmpo/apisession.py index 546a1b5..511ca1c 100644 --- a/tmpo/apisession.py +++ b/tmpo/apisession.py @@ -95,6 +95,8 @@ def series(self, sid, recycle_id=None, head=None, tail=None, blist = self._req_blocklist(sid=sid, token=token, rid=recycle_id) blist = self._slice_blist(blist=blist, head=head, tail=tail) + if len(blist) == 0: + return pd.Series([], name=sid) blocks_futures = self._req_blocks(sid=sid, token=token, blist=blist) def parse_blocks(): From f74a6f3ec33662b1157f486f9493ad1825988351 Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Tue, 17 Jul 2018 12:30:02 +0200 Subject: [PATCH 15/23] check for empty block list --- tmpo/apisession.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tmpo/apisession.py b/tmpo/apisession.py index 511ca1c..1577e9a 100644 --- a/tmpo/apisession.py +++ b/tmpo/apisession.py @@ -166,6 +166,8 @@ def first_timestamp(self, sid, token=None, epoch=False): token = token if token else self.sensors[sid] blist = self._req_blocklist(sid=sid, token=token) + if len(blist) == 0: + return None first = blist[0]['bid'] if not epoch: first = self._epoch2timestamp(first) From 4d67569800d1951215ed4e0959e7f82d7fab960e Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Tue, 17 Jul 2018 12:31:46 +0200 Subject: [PATCH 16/23] version bump --- tmpo/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tmpo/__init__.py b/tmpo/__init__.py index 5382e25..6e11490 100644 --- a/tmpo/__init__.py +++ b/tmpo/__init__.py @@ -1,5 +1,5 @@ __title__ = "tmpo" -__version__ = "0.3.0" +__version__ = "0.3.1" __build__ = 0x000100 __author__ = "Bart Van Der Meerssche" __license__ = "MIT" From 1b468a382677b5d50de75ac4925c268595e01a45 Mon Sep 17 00:00:00 2001 From: Jan Pecinovsky Date: Tue, 2 Apr 2019 16:33:08 +0200 Subject: [PATCH 17/23] skip block when write causes an integrityerror --- tmpo/__init__.py | 2 +- tmpo/sqlitesession.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/tmpo/__init__.py b/tmpo/__init__.py index 6e11490..2e2beb8 100644 --- a/tmpo/__init__.py +++ b/tmpo/__init__.py @@ -1,5 +1,5 @@ __title__ = "tmpo" -__version__ = "0.3.1" +__version__ = "0.3.2" __build__ = 0x000100 __author__ = "Bart Van Der Meerssche" __license__ = "MIT" diff --git a/tmpo/sqlitesession.py b/tmpo/sqlitesession.py index c61dd29..b9944a4 100644 --- a/tmpo/sqlitesession.py +++ b/tmpo/sqlitesession.py @@ -357,7 +357,10 @@ def _req_sync(self, sid, rid, lvl, bid): def _write_block(self, r, sid, rid, lvl, bid, ext): blk = 
         now = time.time()
-        self.dbcur.execute(SQL_TMPO_INS, (sid, rid, lvl, bid, ext, now, blk))
+        try:
+            self.dbcur.execute(SQL_TMPO_INS, (sid, rid, lvl, bid, ext, now, blk))
+        except sqlite3.IntegrityError:
+            pass
         self._clean(sid, rid, lvl, bid)
         self._dprintf(DBG_TMPO_WRITE, now, sid, rid, lvl, bid, len(blk))

From bc1f41464f3a22ac35281d1ca44a007c93ca14a7 Mon Sep 17 00:00:00 2001
From: Jan Pecinovsky
Date: Thu, 29 Oct 2020 15:59:19 +0100
Subject: [PATCH 18/23] check if decompressed content is not None

---
 tmpo/__init__.py   | 2 +-
 tmpo/apisession.py | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/tmpo/__init__.py b/tmpo/__init__.py
index 2e2beb8..d49d359 100644
--- a/tmpo/__init__.py
+++ b/tmpo/__init__.py
@@ -1,5 +1,5 @@
 __title__ = "tmpo"
-__version__ = "0.3.2"
+__version__ = "0.3.3"
 __build__ = 0x000100
 __author__ = "Bart Van Der Meerssche"
 __license__ = "MIT"
diff --git a/tmpo/apisession.py b/tmpo/apisession.py
index 1577e9a..89365b7 100644
--- a/tmpo/apisession.py
+++ b/tmpo/apisession.py
@@ -254,6 +254,8 @@ def _2epochs(self, time):
     def _blk2series(self, ext, blk, head, tail):
         jblk = self._decompress_block(blk, ext)
         m = re.match(RE_JSON_BLK, jblk.decode("utf-8"))
+        if m is None:
+            return pd.Series()
         pdjblk = '{"index":%s,"data":%s}' % (m.group("t"), m.group("v"))
         try:
             pdsblk = pd.read_json(

From 53e8dc53562c021ec51cc70eab17a44b043ce1bf Mon Sep 17 00:00:00 2001
From: Jan Pecinovsky
Date: Fri, 15 Jan 2021 16:12:27 +0100
Subject: [PATCH 19/23] option to omit certificate

---
 tmpo/__init__.py      | 2 +-
 tmpo/apisession.py    | 8 +++++---
 tmpo/sqlitesession.py | 4 ++--
 3 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/tmpo/__init__.py b/tmpo/__init__.py
index d49d359..9907c44 100644
--- a/tmpo/__init__.py
+++ b/tmpo/__init__.py
@@ -1,5 +1,5 @@
 __title__ = "tmpo"
-__version__ = "0.3.3"
+__version__ = "0.3.4"
 __build__ = 0x000100
 __author__ = "Bart Van Der Meerssche"
 __license__ = "MIT"
diff --git a/tmpo/apisession.py b/tmpo/apisession.py
index 89365b7..baad8b7 100644
--- a/tmpo/apisession.py
+++ b/tmpo/apisession.py
@@ -25,7 +25,7 @@ class APISession:
     """
     Get Flukso Data (aka. TMPO Blocks) directly from the Flukso API
     """
-    def __init__(self, workers=16):
+    def __init__(self, workers=16, cert=None):
         """
         Parameters
         ----------
@@ -35,7 +35,10 @@ def __init__(self, workers=16):
         self.debug = False
 
         package_dir = os.path.dirname(__file__)
-        self.crt = os.path.join(package_dir, ".flukso.crt")
+        if cert != False:
+            self.crt = os.path.join(package_dir, ".flukso.crt")
+        else:
+            self.crt = cert
 
         self.host = "api.flukso.net"
         self.rqs = requests_futures.sessions.FuturesSession(
@@ -263,7 +266,6 @@ def _blk2series(self, ext, blk, head, tail):
                 typ="series",
                 dtype="float",
                 orient="split",
-                numpy=True,
                 date_unit="s")
         except:
             return pd.Series()
diff --git a/tmpo/sqlitesession.py b/tmpo/sqlitesession.py
index b9944a4..f9006fc 100644
--- a/tmpo/sqlitesession.py
+++ b/tmpo/sqlitesession.py
@@ -130,7 +130,7 @@ def wrapper(*args, **kwargs):
 
 class SQLiteSession(APISession):
 
-    def __init__(self, path=None, workers=16):
+    def __init__(self, path=None, workers=16, cert=None):
         """
         Parameters
         ----------
@@ -139,7 +139,7 @@ def __init__(self, path=None, workers=16):
         workers : int
             default 16
         """
-        super(SQLiteSession, self).__init__(workers=workers)
+        super(SQLiteSession, self).__init__(workers=workers, cert=cert)
 
         if path is None:
             path = os.path.expanduser("~")  # $HOME

From d0e8b56045cb094232e483ff540b2a4217416299 Mon Sep 17 00:00:00 2001
From: Jan Pecinovsky
Date: Tue, 30 Nov 2021 16:44:51 +0100
Subject: [PATCH 20/23] truncate error

---
 tmpo/apisession.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/tmpo/apisession.py b/tmpo/apisession.py
index baad8b7..7de4e61 100644
--- a/tmpo/apisession.py
+++ b/tmpo/apisession.py
@@ -272,7 +272,15 @@ def _blk2series(self, ext, blk, head, tail):
         h = json.loads(m.group("h"))
         self._npdelta(pdsblk.index, h["head"][0])
         self._npdelta(pdsblk, h["head"][1])
-        pdsblk_truncated = pdsblk.loc[head:tail]
+        pdsblk_truncated = pdjblk
+        try:
+            pdsblk_truncated = pdsblk_truncated.loc[head:]
+        except TypeError:
+            pass
+        try:
+            pdsblk_truncated = pdsblk_truncated.loc[:tail]
+        except TypeError:
+            pass
         return pdsblk_truncated
 
     def _decompress_block(self, blk, ext):

From a414bc598c47aa58e62fc01a29fa7576dfb34fad Mon Sep 17 00:00:00 2001
From: Jan Pecinovsky
Date: Tue, 30 Nov 2021 16:47:03 +0100
Subject: [PATCH 21/23] version bump

---
 tmpo/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tmpo/__init__.py b/tmpo/__init__.py
index 9907c44..deb8030 100644
--- a/tmpo/__init__.py
+++ b/tmpo/__init__.py
@@ -1,5 +1,5 @@
 __title__ = "tmpo"
-__version__ = "0.3.4"
+__version__ = "0.3.5"
 __build__ = 0x000100
 __author__ = "Bart Van Der Meerssche"
 __license__ = "MIT"

From b5da7ab5883bbf18d67222328f34a7af48700f14 Mon Sep 17 00:00:00 2001
From: Jan Pecinovsky
Date: Tue, 30 Nov 2021 16:49:26 +0100
Subject: [PATCH 22/23] typo fix and version bump

---
 tmpo/__init__.py   | 2 +-
 tmpo/apisession.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/tmpo/__init__.py b/tmpo/__init__.py
index deb8030..279c59e 100644
--- a/tmpo/__init__.py
+++ b/tmpo/__init__.py
@@ -1,5 +1,5 @@
 __title__ = "tmpo"
-__version__ = "0.3.5"
+__version__ = "0.3.6"
 __build__ = 0x000100
 __author__ = "Bart Van Der Meerssche"
 __license__ = "MIT"
diff --git a/tmpo/apisession.py b/tmpo/apisession.py
index 7de4e61..7c20a53 100644
--- a/tmpo/apisession.py
+++ b/tmpo/apisession.py
@@ -272,7 +272,7 @@ def _blk2series(self, ext, blk, head, tail):
         h = json.loads(m.group("h"))
         self._npdelta(pdsblk.index, h["head"][0])
         self._npdelta(pdsblk, h["head"][1])
-        pdsblk_truncated = pdjblk
+        pdsblk_truncated = pdsblk
         try:
             pdsblk_truncated = pdsblk_truncated.loc[head:]
         except TypeError:
             pass

From da6e815ab505a56270d97d24b0ec1902d8216fe9 Mon Sep 17 00:00:00 2001
From: Jan Pecinovsky
Date: Tue, 8 Feb 2022 18:23:41 +0100
Subject: [PATCH 23/23] version that does not require futures

---
 setup.py         | 2 +-
 tmpo/__init__.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/setup.py b/setup.py
index 9e5f2ad..0a7806c 100644
--- a/setup.py
+++ b/setup.py
@@ -75,7 +75,7 @@
 
     # List run-time dependencies here. These will be installed by pip when
     # your project is installed.
-    install_requires=['numpy', 'pandas', 'futures', 'requests_futures'],
+    install_requires=['numpy', 'pandas', 'requests_futures'],
 
     # If there are data files included in your packages that need to be
    # installed, specify them here. If using Python 2.6 or less, then these
diff --git a/tmpo/__init__.py b/tmpo/__init__.py
index 279c59e..b649c6b 100644
--- a/tmpo/__init__.py
+++ b/tmpo/__init__.py
@@ -1,5 +1,5 @@
 __title__ = "tmpo"
-__version__ = "0.3.6"
+__version__ = "0.3.7"
 __build__ = 0x000100
 __author__ = "Bart Van Der Meerssche"
 __license__ = "MIT"
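
A minimal usage sketch of the `cert` option introduced in PATCH 19/23 and
threaded through `SQLiteSession`. This is an illustrative sketch, not part of
the patches themselves: the sensor id and token below are hypothetical
placeholders, and it assumes `self.crt` is what the session hands to its HTTPS
requests for verification, so `cert=False` skips the bundled `.flukso.crt`:

    from tmpo.sqlitesession import SQLiteSession

    # cert=None (the default) keeps the packaged .flukso.crt;
    # cert=False stores False on self.crt, omitting the certificate
    s = SQLiteSession(path="/tmp", workers=4, cert=False)

    # hypothetical sensor id and token, for illustration only
    sid = "fed676021dacaaf6a12a8dda7685be34"
    s.add(sid, "0123456789abcdef0123456789abcdef")
    s.sync()

    # since PATCH 14/23 and 15/23, a sensor with no blocks yields an empty
    # pd.Series (and first_timestamp returns None) instead of raising
    ts = s.series(sid)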