From 0ef147745407a3a91cc4965e7d66d93597f499f5 Mon Sep 17 00:00:00 2001 From: ovresko <32199620+ovresko@users.noreply.github.com> Date: Tue, 11 Jan 2022 08:19:26 -0500 Subject: [PATCH 1/4] Create client_async.py Support for async with aioredis 1.3.1 --- redistimeseries/client_async/client_async.py | 510 +++++++++++++++++++ 1 file changed, 510 insertions(+) create mode 100644 redistimeseries/client_async/client_async.py diff --git a/redistimeseries/client_async/client_async.py b/redistimeseries/client_async/client_async.py new file mode 100644 index 0000000..f0a3372 --- /dev/null +++ b/redistimeseries/client_async/client_async.py @@ -0,0 +1,510 @@ +from aioredis import Redis + +class TSInfo(object): + rules = [] + labels = [] + sourceKey = None + chunk_count = None + memory_usage = None + total_samples = None + retention_msecs = None + last_time_stamp = None + first_time_stamp = None + # As of RedisTimeseries >= v1.4 max_samples_per_chunk is deprecated in favor of chunk_size + max_samples_per_chunk = None + chunk_size = None + duplicate_policy = None + + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.rules = response['rules'] + self.sourceKey = response['sourceKey'] + self.chunkCount = response['chunkCount'] + self.memory_usage = response['memoryUsage'] + self.total_samples = response['totalSamples'] + self.labels = list_to_dict(response['labels']) + self.retention_msecs = response['retentionTime'] + self.lastTimeStamp = response['lastTimestamp'] + self.first_time_stamp = response['firstTimestamp'] + if 'maxSamplesPerChunk' in response: + self.max_samples_per_chunk = response['maxSamplesPerChunk'] + self.chunk_size = self.max_samples_per_chunk * 16 # backward compatible changes + if 'chunkSize' in response: + self.chunk_size = response['chunkSize'] + if 'duplicatePolicy' in response: + self.duplicate_policy = response['duplicatePolicy'] + if type(self.duplicate_policy) == bytes: + self.duplicate_policy = self.duplicate_policy.decode() + +def list_to_dict(aList): + return {nativestr(aList[i][0]):nativestr(aList[i][1]) + for i in range(len(aList))} + +def parse_range(response): + return [tuple((l[0], float(l[1]))) for l in response] + +def parse_m_range(response): + res = [] + for item in response: + res.append({ nativestr(item[0]) : [list_to_dict(item[1]), + parse_range(item[2])]}) + return res + +def bool_ok(response): + return nativestr(response) == 'OK' + +def parse_get(response): + if response == []: + return None + return (int(response[0]), float(response[1])) + +def parse_m_get(response): + res = [] + for item in response: + if item[2] == []: + res.append({ nativestr(item[0]) : [list_to_dict(item[1]), None, None]}) + else: + res.append({ nativestr(item[0]) : [list_to_dict(item[1]), + int(item[2][0]), float(item[2][1])]}) + + return res + +def parseToList(response): + res = [] + for item in response: + res.append(nativestr(item)) + return res + +class Client(object): #changed from StrictRedis + """ + This class subclasses redis-py's `Redis` and implements + RedisTimeSeries's commands (prefixed with "ts"). + The client allows to interact with RedisTimeSeries and use all of + it's functionality. + """ + + CREATE_CMD = 'TS.CREATE' + ALTER_CMD = 'TS.ALTER' + ADD_CMD = 'TS.ADD' + MADD_CMD = 'TS.MADD' + INCRBY_CMD = 'TS.INCRBY' + DECRBY_CMD = 'TS.DECRBY' + CREATERULE_CMD = 'TS.CREATERULE' + DELETERULE_CMD = 'TS.DELETERULE' + RANGE_CMD = 'TS.RANGE' + REVRANGE_CMD = 'TS.REVRANGE' + MRANGE_CMD = 'TS.MRANGE' + MREVRANGE_CMD = 'TS.MREVRANGE' + GET_CMD = 'TS.GET' + MGET_CMD = 'TS.MGET' + INFO_CMD = 'TS.INFO' + QUERYINDEX_CMD = 'TS.QUERYINDEX' + redis:Redis=None + @classmethod + async def create(conn:Redis): + """ + Creates a new RedisTimeSeries client. + """ + self = Client() + self.redis = conn + + # Set the module commands' callbacks + self.MODULE_CALLBACKS = { + self.CREATE_CMD : bool_ok, + self.ALTER_CMD : bool_ok, + self.CREATERULE_CMD : bool_ok, + self.DELETERULE_CMD : bool_ok, + self.RANGE_CMD : parse_range, + self.REVRANGE_CMD: parse_range, + self.MRANGE_CMD : parse_m_range, + self.MREVRANGE_CMD: parse_m_range, + self.GET_CMD : parse_get, + self.MGET_CMD : parse_m_get, + self.INFO_CMD : TSInfo, + self.QUERYINDEX_CMD : parseToList, + } + # for k in MODULE_CALLBACKS: + # self.redis.set_response_callback(k, MODULE_CALLBACKS[k]) + + @staticmethod + def appendUncompressed(params, uncompressed): + if uncompressed: + params.extend(['UNCOMPRESSED']) + + @staticmethod + def appendWithLabels(params, with_labels): + if with_labels: + params.extend(['WITHLABELS']) + + @staticmethod + def appendRetention(params, retention): + if retention is not None: + params.extend(['RETENTION', retention]) + + @staticmethod + def appendLabels(params, labels): + if labels: + params.append('LABELS') + for k, v in labels.items(): + params.extend([k,v]) + + @staticmethod + def appendCount(params, count): + if count is not None: + params.extend(['COUNT', count]) + + @staticmethod + def appendTimestamp(params, timestamp): + if timestamp is not None: + params.extend(['TIMESTAMP', timestamp]) + + @staticmethod + def appendAggregation(params, aggregation_type, + bucket_size_msec): + params.append('AGGREGATION') + params.extend([aggregation_type, bucket_size_msec]) + + @staticmethod + def appendChunkSize(params, chunk_size): + if chunk_size is not None: + params.extend(['CHUNK_SIZE', chunk_size]) + + @staticmethod + def appendDuplicatePolicy(params, command, duplicate_policy): + if duplicate_policy is not None: + if command == 'TS.ADD': + params.extend(['ON_DUPLICATE', duplicate_policy]) + else: + params.extend(['DUPLICATE_POLICY', duplicate_policy]) + + def _execute(self,cmd:str,response): + if response is not None and cmd in self.CREATE_CMD: + return self.CREATE_CMD[cmd](response) + return response + + async def create(self, key, **kwargs): + """ + Create a new time-series. + + Args: + key: time-series key + retention_msecs: Maximum age for samples compared to last event time (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default. + Adding this flag will keep data in an uncompressed form. Compression not only saves + memory but usually improve performance due to lower number of memory accesses + labels: Set of label-value pairs that represent metadata labels of the key. + chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. + You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + duplicate_policy: since RedisTimeSeries v1.4 you can specify the duplicate sample policy ( Configure what to do on duplicate sample. ) + Can be one of: + - 'block': an error will occur for any out of order sample + - 'first': ignore the new value + - 'last': override with latest value + - 'min': only override if the value is lower than the existing value + - 'max': only override if the value is higher than the existing value + When this is not set, the server-wide default will be used. + """ + retention_msecs = kwargs.get('retention_msecs', None) + uncompressed = kwargs.get('uncompressed', False) + labels = kwargs.get('labels', {}) + chunk_size = kwargs.get('chunk_size', None) + duplicate_policy = kwargs.get('duplicate_policy', None) + params = [key] + self.appendRetention(params, retention_msecs) + self.appendUncompressed(params, uncompressed) + self.appendChunkSize(params, chunk_size) + self.appendDuplicatePolicy(params, self.CREATE_CMD, duplicate_policy) + self.appendLabels(params, labels) + + return self._execute(self.CREATE_CMD,await self.redis.execute(self.CREATE_CMD, *params)) + + async def alter(self, key, **kwargs): + """ + Update the retention, labels of an existing key. The parameters + are the same as TS.CREATE. + """ + retention_msecs = kwargs.get('retention_msecs', None) + labels = kwargs.get('labels', {}) + duplicate_policy = kwargs.get('duplicate_policy', None) + params = [key] + self.appendRetention(params, retention_msecs) + self.appendDuplicatePolicy(params, self.ALTER_CMD, duplicate_policy) + self.appendLabels(params, labels) + + return self._execute(self.ALTER_CMD,await self.redis.execute(self.ALTER_CMD, *params)) + # return await self.redis.execute(self.ALTER_CMD, *params) + + async def add(self, key, timestamp, value, **kwargs): + """ + Append (or create and append) a new sample to the series. + + Args: + key: time-series key + timestamp: timestamp of the sample. * can be used for automatic timestamp (using the system clock). + value: numeric data value of the sample + retention_msecs: Maximum age for samples compared to last event time (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default. + Adding this flag will keep data in an uncompressed form. Compression not only saves + memory but usually improve performance due to lower number of memory accesses + labels: Set of label-value pairs that represent metadata labels of the key. + chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. + You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + duplicate_policy: since RedisTimeSeries v1.4 you can specify the duplicate sample policy ( Configure what to do on duplicate sample. ) + Can be one of: + - 'block': an error will occur for any out of order sample + - 'first': ignore the new value + - 'last': override with latest value + - 'min': only override if the value is lower than the existing value + - 'max': only override if the value is higher than the existing value + When this is not set, the server-wide default will be used. + """ + retention_msecs = kwargs.get('retention_msecs', None) + uncompressed = kwargs.get('uncompressed', False) + labels = kwargs.get('labels', {}) + chunk_size = kwargs.get('chunk_size', None) + duplicate_policy = kwargs.get('duplicate_policy', None) + params = [key, timestamp, value] + self.appendRetention(params, retention_msecs) + self.appendUncompressed(params, uncompressed) + self.appendChunkSize(params, chunk_size) + self.appendDuplicatePolicy(params, self.ADD_CMD, duplicate_policy) + self.appendLabels(params, labels) + return self._execute(self.ADD_CMD,await self.redis.execute(self.ADD_CMD, *params)) + # return await self.redis.execute(self.ADD_CMD, *params) + + async def madd(self, ktv_tuples): + """ + Appends (or creates and appends) a new ``value`` to series + ``key`` with ``timestamp``. Expects a list of ``tuples`` as + (``key``,``timestamp``, ``value``). Return value is an + array with timestamps of insertions. + """ + params = [] + for ktv in ktv_tuples: + for item in ktv: + params.append(item) + + return self._execute(self.MADD_CMD,await self.redis.execute(self.MADD_CMD, *params)) + # return await self.redis.execute(self.MADD_CMD, *params) + + async def incrby(self, key, value, **kwargs): + """ + Increment (or create an time-series and increment) the latest sample's of a series. + This command can be used as a counter or gauge that automatically gets history as a time series. + + Args: + key: time-series key + value: numeric data value of the sample + timestamp: timestamp of the sample. None can be used for automatic timestamp (using the system clock). + retention_msecs: Maximum age for samples compared to last event time (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default. + Adding this flag will keep data in an uncompressed form. Compression not only saves + memory but usually improve performance due to lower number of memory accesses + labels: Set of label-value pairs that represent metadata labels of the key. + chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. + You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + """ + timestamp = kwargs.get('timestamp', None) + retention_msecs = kwargs.get('retention_msecs', None) + uncompressed = kwargs.get('uncompressed', False) + labels = kwargs.get('labels', {}) + chunk_size = kwargs.get('chunk_size', None) + params = [key, value] + self.appendTimestamp(params, timestamp) + self.appendRetention(params, retention_msecs) + self.appendUncompressed(params, uncompressed) + self.appendChunkSize(params, chunk_size) + self.appendLabels(params, labels) + return self._execute(self.INCRBY_CMD,await self.redis.execute(self.INCRBY_CMD, *params)) + # return self.redis.execute(self.INCRBY_CMD, *params) + + async def decrby(self, key, value, **kwargs): + """ + Decrement (or create an time-series and decrement) the latest sample's of a series. + This command can be used as a counter or gauge that automatically gets history as a time series. + + Args: + key: time-series key + value: numeric data value of the sample + timestamp: timestamp of the sample. None can be used for automatic timestamp (using the system clock). + retention_msecs: Maximum age for samples compared to last event time (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default. + Adding this flag will keep data in an uncompressed form. Compression not only saves + memory but usually improve performance due to lower number of memory accesses + labels: Set of label-value pairs that represent metadata labels of the key. + chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. + You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + """ + timestamp = kwargs.get('timestamp', None) + retention_msecs = kwargs.get('retention_msecs', None) + uncompressed = kwargs.get('uncompressed', False) + labels = kwargs.get('labels', {}) + chunk_size = kwargs.get('chunk_size', None) + params = [key, value] + self.appendTimestamp(params, timestamp) + self.appendRetention(params, retention_msecs) + self.appendUncompressed(params, uncompressed) + self.appendChunkSize(params, chunk_size) + self.appendLabels(params, labels) + return self._execute(self.DECRBY_CMD,await self.redis.execute(self.DECRBY_CMD, *params)) + # return self.redis.execute(self.DECRBY_CMD, *params) + + async def createrule(self, source_key, dest_key, + aggregation_type, bucket_size_msec): + """ + Creates a compaction rule from values added to ``source_key`` + into ``dest_key``. Aggregating for ``bucket_size_msec`` where an + ``aggregation_type`` can be ['avg', 'sum', 'min', 'max', + 'range', 'count', 'first', 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + """ + params=[source_key, dest_key] + self.appendAggregation(params, aggregation_type, bucket_size_msec) + return self._execute(self.CREATERULE_CMD,await self.redis.execute(self.CREATERULE_CMD, *params)) + # return self.redis.execute(self.CREATERULE_CMD, *params) + + async def deleterule(self, source_key, dest_key): + """Deletes a compaction rule""" + return self._execute(self.DELETERULE_CMD,await self.redis.execute(self.DELETERULE_CMD, source_key, dest_key)) + # return self.redis.execute(self.DELETERULE_CMD, source_key, dest_key) + + async def __range_params(self, key, from_time, to_time, count, aggregation_type, bucket_size_msec): + """ + Internal method to create TS.RANGE and TS.REVRANGE arguments + """ + params = [key, from_time, to_time] + self.appendCount(params, count) + if aggregation_type is not None: + self.appendAggregation(params, aggregation_type, bucket_size_msec) + return params + + async def range(self, key, from_time, to_time, count=None, + aggregation_type=None, bucket_size_msec=0): + """ + Query a range in forward direction for a specific time-serie. + + Args: + key: Key name for timeseries. + from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. + count: Optional maximum number of returned results. + aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', + 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + bucket_size_msec: Time bucket for aggregation in milliseconds. + """ + params = self.__range_params(key, from_time, to_time, count, aggregation_type, bucket_size_msec) + return self._execute(self.RANGE_CMD,await self.redis.execute(self.RANGE_CMD, *params)) + # return self.redis.execute(self.RANGE_CMD, *params) + + async def revrange(self, key, from_time, to_time, count=None, + aggregation_type=None, bucket_size_msec=0): + """ + Query a range in reverse direction for a specific time-serie. + Note: This command is only available since RedisTimeSeries >= v1.4 + + Args: + key: Key name for timeseries. + from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. + count: Optional maximum number of returned results. + aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', + 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + bucket_size_msec: Time bucket for aggregation in milliseconds. + """ + params = self.__range_params(key, from_time, to_time, count, aggregation_type, bucket_size_msec) + return self._execute(self.REVRANGE_CMD,await self.redis.execute(self.REVRANGE_CMD, *params)) + # return self.redis.execute(self.REVRANGE_CMD, *params) + + + async def __mrange_params(self, aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels): + """ + Internal method to create TS.MRANGE and TS.MREVRANGE arguments + """ + params = [from_time, to_time] + self.appendCount(params, count) + if aggregation_type is not None: + self.appendAggregation(params, aggregation_type, bucket_size_msec) + self.appendWithLabels(params, with_labels) + params.extend(['FILTER']) + params += filters + return params + + async def mrange(self, from_time, to_time, filters, count=None, + aggregation_type=None, bucket_size_msec=0, with_labels=False): + """ + Query a range across multiple time-series by filters in forward direction. + + Args: + from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. + filters: filter to match the time-series labels. + count: Optional maximum number of returned results. + aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', + 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + bucket_size_msec: Time bucket for aggregation in milliseconds. + with_labels: Include in the reply the label-value pairs that represent metadata labels of the time-series. + If this argument is not set, by default, an empty Array will be replied on the labels array position. + """ + params = self.__mrange_params(aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels) + return self._execute(self.MRANGE_CMD,await self.redis.execute(self.MRANGE_CMD, *params)) + # return self.redis.execute(self.MRANGE_CMD, *params) + + async def mrevrange(self, from_time, to_time, filters, count=None, + aggregation_type=None, bucket_size_msec=0, with_labels=False): + """ + Query a range across multiple time-series by filters in reverse direction. + + Args: + from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. + filters: filter to match the time-series labels. + count: Optional maximum number of returned results. + aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', + 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + bucket_size_msec: Time bucket for aggregation in milliseconds. + with_labels: Include in the reply the label-value pairs that represent metadata labels of the time-series. + If this argument is not set, by default, an empty Array will be replied on the labels array position. + """ + params = self.__mrange_params(aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels) + return self._execute(self.MREVRANGE_CMD,await self.redis.execute(self.MREVRANGE_CMD, *params)) + # return self.redis.execute(self.MREVRANGE_CMD, *params) + + async def get(self, key): + """Gets the last sample of ``key``""" + return self._execute(self.GET_CMD,await self.redis.execute(self.GET_CMD, key)) + # return self.redis.execute(self.GET_CMD, key) + + async def mget(self, filters, with_labels=False): + """Get the last samples matching the specific ``filter``.""" + params = [] + self.appendWithLabels(params, with_labels) + params.extend(['FILTER']) + params += filters + return self._execute(self.MGET_CMD,await self.redis.execute(self.MGET_CMD, *params)) + # return self.redis.execute(self.MGET_CMD, *params) + + async def info(self, key): + """Gets information of ``key``""" + return self._execute(self.INFO_CMD,await self.redis.execute(self.INFO_CMD, key)) + # return self.redis.execute(self.INFO_CMD, key) + + async def queryindex(self, filters): + """Get all the keys matching the ``filter`` list.""" + return self._execute(self.QUERYINDEX_CMD,await self.redis.execute(self.QUERYINDEX_CMD, *filters)) + # return self.redis.execute(self.QUERYINDEX_CMD, *filters) + + async def pipeline(self, transaction=True, shard_hint=None): + """ + Return a new pipeline object that can queue multiple commands for + later execution. ``transaction`` indicates whether all commands + should be executed atomically. Apart from making a group of operations + atomic, pipelines are useful for reducing the back-and-forth overhead + between the client and server. + Overridden in order to provide the right client through the pipeline. + """ + return self.redis.pipeline() + From bfbff350713486215706ab3b20862a062cfa6207 Mon Sep 17 00:00:00 2001 From: ovresko <32199620+ovresko@users.noreply.github.com> Date: Tue, 11 Jan 2022 08:20:04 -0500 Subject: [PATCH 2/4] Delete redistimeseries/client_async directory --- redistimeseries/client_async/client_async.py | 510 ------------------- 1 file changed, 510 deletions(-) delete mode 100644 redistimeseries/client_async/client_async.py diff --git a/redistimeseries/client_async/client_async.py b/redistimeseries/client_async/client_async.py deleted file mode 100644 index f0a3372..0000000 --- a/redistimeseries/client_async/client_async.py +++ /dev/null @@ -1,510 +0,0 @@ -from aioredis import Redis - -class TSInfo(object): - rules = [] - labels = [] - sourceKey = None - chunk_count = None - memory_usage = None - total_samples = None - retention_msecs = None - last_time_stamp = None - first_time_stamp = None - # As of RedisTimeseries >= v1.4 max_samples_per_chunk is deprecated in favor of chunk_size - max_samples_per_chunk = None - chunk_size = None - duplicate_policy = None - - - def __init__(self, args): - response = dict(zip(map(nativestr, args[::2]), args[1::2])) - self.rules = response['rules'] - self.sourceKey = response['sourceKey'] - self.chunkCount = response['chunkCount'] - self.memory_usage = response['memoryUsage'] - self.total_samples = response['totalSamples'] - self.labels = list_to_dict(response['labels']) - self.retention_msecs = response['retentionTime'] - self.lastTimeStamp = response['lastTimestamp'] - self.first_time_stamp = response['firstTimestamp'] - if 'maxSamplesPerChunk' in response: - self.max_samples_per_chunk = response['maxSamplesPerChunk'] - self.chunk_size = self.max_samples_per_chunk * 16 # backward compatible changes - if 'chunkSize' in response: - self.chunk_size = response['chunkSize'] - if 'duplicatePolicy' in response: - self.duplicate_policy = response['duplicatePolicy'] - if type(self.duplicate_policy) == bytes: - self.duplicate_policy = self.duplicate_policy.decode() - -def list_to_dict(aList): - return {nativestr(aList[i][0]):nativestr(aList[i][1]) - for i in range(len(aList))} - -def parse_range(response): - return [tuple((l[0], float(l[1]))) for l in response] - -def parse_m_range(response): - res = [] - for item in response: - res.append({ nativestr(item[0]) : [list_to_dict(item[1]), - parse_range(item[2])]}) - return res - -def bool_ok(response): - return nativestr(response) == 'OK' - -def parse_get(response): - if response == []: - return None - return (int(response[0]), float(response[1])) - -def parse_m_get(response): - res = [] - for item in response: - if item[2] == []: - res.append({ nativestr(item[0]) : [list_to_dict(item[1]), None, None]}) - else: - res.append({ nativestr(item[0]) : [list_to_dict(item[1]), - int(item[2][0]), float(item[2][1])]}) - - return res - -def parseToList(response): - res = [] - for item in response: - res.append(nativestr(item)) - return res - -class Client(object): #changed from StrictRedis - """ - This class subclasses redis-py's `Redis` and implements - RedisTimeSeries's commands (prefixed with "ts"). - The client allows to interact with RedisTimeSeries and use all of - it's functionality. - """ - - CREATE_CMD = 'TS.CREATE' - ALTER_CMD = 'TS.ALTER' - ADD_CMD = 'TS.ADD' - MADD_CMD = 'TS.MADD' - INCRBY_CMD = 'TS.INCRBY' - DECRBY_CMD = 'TS.DECRBY' - CREATERULE_CMD = 'TS.CREATERULE' - DELETERULE_CMD = 'TS.DELETERULE' - RANGE_CMD = 'TS.RANGE' - REVRANGE_CMD = 'TS.REVRANGE' - MRANGE_CMD = 'TS.MRANGE' - MREVRANGE_CMD = 'TS.MREVRANGE' - GET_CMD = 'TS.GET' - MGET_CMD = 'TS.MGET' - INFO_CMD = 'TS.INFO' - QUERYINDEX_CMD = 'TS.QUERYINDEX' - redis:Redis=None - @classmethod - async def create(conn:Redis): - """ - Creates a new RedisTimeSeries client. - """ - self = Client() - self.redis = conn - - # Set the module commands' callbacks - self.MODULE_CALLBACKS = { - self.CREATE_CMD : bool_ok, - self.ALTER_CMD : bool_ok, - self.CREATERULE_CMD : bool_ok, - self.DELETERULE_CMD : bool_ok, - self.RANGE_CMD : parse_range, - self.REVRANGE_CMD: parse_range, - self.MRANGE_CMD : parse_m_range, - self.MREVRANGE_CMD: parse_m_range, - self.GET_CMD : parse_get, - self.MGET_CMD : parse_m_get, - self.INFO_CMD : TSInfo, - self.QUERYINDEX_CMD : parseToList, - } - # for k in MODULE_CALLBACKS: - # self.redis.set_response_callback(k, MODULE_CALLBACKS[k]) - - @staticmethod - def appendUncompressed(params, uncompressed): - if uncompressed: - params.extend(['UNCOMPRESSED']) - - @staticmethod - def appendWithLabels(params, with_labels): - if with_labels: - params.extend(['WITHLABELS']) - - @staticmethod - def appendRetention(params, retention): - if retention is not None: - params.extend(['RETENTION', retention]) - - @staticmethod - def appendLabels(params, labels): - if labels: - params.append('LABELS') - for k, v in labels.items(): - params.extend([k,v]) - - @staticmethod - def appendCount(params, count): - if count is not None: - params.extend(['COUNT', count]) - - @staticmethod - def appendTimestamp(params, timestamp): - if timestamp is not None: - params.extend(['TIMESTAMP', timestamp]) - - @staticmethod - def appendAggregation(params, aggregation_type, - bucket_size_msec): - params.append('AGGREGATION') - params.extend([aggregation_type, bucket_size_msec]) - - @staticmethod - def appendChunkSize(params, chunk_size): - if chunk_size is not None: - params.extend(['CHUNK_SIZE', chunk_size]) - - @staticmethod - def appendDuplicatePolicy(params, command, duplicate_policy): - if duplicate_policy is not None: - if command == 'TS.ADD': - params.extend(['ON_DUPLICATE', duplicate_policy]) - else: - params.extend(['DUPLICATE_POLICY', duplicate_policy]) - - def _execute(self,cmd:str,response): - if response is not None and cmd in self.CREATE_CMD: - return self.CREATE_CMD[cmd](response) - return response - - async def create(self, key, **kwargs): - """ - Create a new time-series. - - Args: - key: time-series key - retention_msecs: Maximum age for samples compared to last event time (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default. - Adding this flag will keep data in an uncompressed form. Compression not only saves - memory but usually improve performance due to lower number of memory accesses - labels: Set of label-value pairs that represent metadata labels of the key. - chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. - You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). - duplicate_policy: since RedisTimeSeries v1.4 you can specify the duplicate sample policy ( Configure what to do on duplicate sample. ) - Can be one of: - - 'block': an error will occur for any out of order sample - - 'first': ignore the new value - - 'last': override with latest value - - 'min': only override if the value is lower than the existing value - - 'max': only override if the value is higher than the existing value - When this is not set, the server-wide default will be used. - """ - retention_msecs = kwargs.get('retention_msecs', None) - uncompressed = kwargs.get('uncompressed', False) - labels = kwargs.get('labels', {}) - chunk_size = kwargs.get('chunk_size', None) - duplicate_policy = kwargs.get('duplicate_policy', None) - params = [key] - self.appendRetention(params, retention_msecs) - self.appendUncompressed(params, uncompressed) - self.appendChunkSize(params, chunk_size) - self.appendDuplicatePolicy(params, self.CREATE_CMD, duplicate_policy) - self.appendLabels(params, labels) - - return self._execute(self.CREATE_CMD,await self.redis.execute(self.CREATE_CMD, *params)) - - async def alter(self, key, **kwargs): - """ - Update the retention, labels of an existing key. The parameters - are the same as TS.CREATE. - """ - retention_msecs = kwargs.get('retention_msecs', None) - labels = kwargs.get('labels', {}) - duplicate_policy = kwargs.get('duplicate_policy', None) - params = [key] - self.appendRetention(params, retention_msecs) - self.appendDuplicatePolicy(params, self.ALTER_CMD, duplicate_policy) - self.appendLabels(params, labels) - - return self._execute(self.ALTER_CMD,await self.redis.execute(self.ALTER_CMD, *params)) - # return await self.redis.execute(self.ALTER_CMD, *params) - - async def add(self, key, timestamp, value, **kwargs): - """ - Append (or create and append) a new sample to the series. - - Args: - key: time-series key - timestamp: timestamp of the sample. * can be used for automatic timestamp (using the system clock). - value: numeric data value of the sample - retention_msecs: Maximum age for samples compared to last event time (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default. - Adding this flag will keep data in an uncompressed form. Compression not only saves - memory but usually improve performance due to lower number of memory accesses - labels: Set of label-value pairs that represent metadata labels of the key. - chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. - You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). - duplicate_policy: since RedisTimeSeries v1.4 you can specify the duplicate sample policy ( Configure what to do on duplicate sample. ) - Can be one of: - - 'block': an error will occur for any out of order sample - - 'first': ignore the new value - - 'last': override with latest value - - 'min': only override if the value is lower than the existing value - - 'max': only override if the value is higher than the existing value - When this is not set, the server-wide default will be used. - """ - retention_msecs = kwargs.get('retention_msecs', None) - uncompressed = kwargs.get('uncompressed', False) - labels = kwargs.get('labels', {}) - chunk_size = kwargs.get('chunk_size', None) - duplicate_policy = kwargs.get('duplicate_policy', None) - params = [key, timestamp, value] - self.appendRetention(params, retention_msecs) - self.appendUncompressed(params, uncompressed) - self.appendChunkSize(params, chunk_size) - self.appendDuplicatePolicy(params, self.ADD_CMD, duplicate_policy) - self.appendLabels(params, labels) - return self._execute(self.ADD_CMD,await self.redis.execute(self.ADD_CMD, *params)) - # return await self.redis.execute(self.ADD_CMD, *params) - - async def madd(self, ktv_tuples): - """ - Appends (or creates and appends) a new ``value`` to series - ``key`` with ``timestamp``. Expects a list of ``tuples`` as - (``key``,``timestamp``, ``value``). Return value is an - array with timestamps of insertions. - """ - params = [] - for ktv in ktv_tuples: - for item in ktv: - params.append(item) - - return self._execute(self.MADD_CMD,await self.redis.execute(self.MADD_CMD, *params)) - # return await self.redis.execute(self.MADD_CMD, *params) - - async def incrby(self, key, value, **kwargs): - """ - Increment (or create an time-series and increment) the latest sample's of a series. - This command can be used as a counter or gauge that automatically gets history as a time series. - - Args: - key: time-series key - value: numeric data value of the sample - timestamp: timestamp of the sample. None can be used for automatic timestamp (using the system clock). - retention_msecs: Maximum age for samples compared to last event time (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default. - Adding this flag will keep data in an uncompressed form. Compression not only saves - memory but usually improve performance due to lower number of memory accesses - labels: Set of label-value pairs that represent metadata labels of the key. - chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. - You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). - """ - timestamp = kwargs.get('timestamp', None) - retention_msecs = kwargs.get('retention_msecs', None) - uncompressed = kwargs.get('uncompressed', False) - labels = kwargs.get('labels', {}) - chunk_size = kwargs.get('chunk_size', None) - params = [key, value] - self.appendTimestamp(params, timestamp) - self.appendRetention(params, retention_msecs) - self.appendUncompressed(params, uncompressed) - self.appendChunkSize(params, chunk_size) - self.appendLabels(params, labels) - return self._execute(self.INCRBY_CMD,await self.redis.execute(self.INCRBY_CMD, *params)) - # return self.redis.execute(self.INCRBY_CMD, *params) - - async def decrby(self, key, value, **kwargs): - """ - Decrement (or create an time-series and decrement) the latest sample's of a series. - This command can be used as a counter or gauge that automatically gets history as a time series. - - Args: - key: time-series key - value: numeric data value of the sample - timestamp: timestamp of the sample. None can be used for automatic timestamp (using the system clock). - retention_msecs: Maximum age for samples compared to last event time (in milliseconds). - If None or 0 is passed then the series is not trimmed at all. - uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default. - Adding this flag will keep data in an uncompressed form. Compression not only saves - memory but usually improve performance due to lower number of memory accesses - labels: Set of label-value pairs that represent metadata labels of the key. - chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. - You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). - """ - timestamp = kwargs.get('timestamp', None) - retention_msecs = kwargs.get('retention_msecs', None) - uncompressed = kwargs.get('uncompressed', False) - labels = kwargs.get('labels', {}) - chunk_size = kwargs.get('chunk_size', None) - params = [key, value] - self.appendTimestamp(params, timestamp) - self.appendRetention(params, retention_msecs) - self.appendUncompressed(params, uncompressed) - self.appendChunkSize(params, chunk_size) - self.appendLabels(params, labels) - return self._execute(self.DECRBY_CMD,await self.redis.execute(self.DECRBY_CMD, *params)) - # return self.redis.execute(self.DECRBY_CMD, *params) - - async def createrule(self, source_key, dest_key, - aggregation_type, bucket_size_msec): - """ - Creates a compaction rule from values added to ``source_key`` - into ``dest_key``. Aggregating for ``bucket_size_msec`` where an - ``aggregation_type`` can be ['avg', 'sum', 'min', 'max', - 'range', 'count', 'first', 'last', 'std.p', 'std.s', 'var.p', 'var.s'] - """ - params=[source_key, dest_key] - self.appendAggregation(params, aggregation_type, bucket_size_msec) - return self._execute(self.CREATERULE_CMD,await self.redis.execute(self.CREATERULE_CMD, *params)) - # return self.redis.execute(self.CREATERULE_CMD, *params) - - async def deleterule(self, source_key, dest_key): - """Deletes a compaction rule""" - return self._execute(self.DELETERULE_CMD,await self.redis.execute(self.DELETERULE_CMD, source_key, dest_key)) - # return self.redis.execute(self.DELETERULE_CMD, source_key, dest_key) - - async def __range_params(self, key, from_time, to_time, count, aggregation_type, bucket_size_msec): - """ - Internal method to create TS.RANGE and TS.REVRANGE arguments - """ - params = [key, from_time, to_time] - self.appendCount(params, count) - if aggregation_type is not None: - self.appendAggregation(params, aggregation_type, bucket_size_msec) - return params - - async def range(self, key, from_time, to_time, count=None, - aggregation_type=None, bucket_size_msec=0): - """ - Query a range in forward direction for a specific time-serie. - - Args: - key: Key name for timeseries. - from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). - to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. - count: Optional maximum number of returned results. - aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', - 'last', 'std.p', 'std.s', 'var.p', 'var.s'] - bucket_size_msec: Time bucket for aggregation in milliseconds. - """ - params = self.__range_params(key, from_time, to_time, count, aggregation_type, bucket_size_msec) - return self._execute(self.RANGE_CMD,await self.redis.execute(self.RANGE_CMD, *params)) - # return self.redis.execute(self.RANGE_CMD, *params) - - async def revrange(self, key, from_time, to_time, count=None, - aggregation_type=None, bucket_size_msec=0): - """ - Query a range in reverse direction for a specific time-serie. - Note: This command is only available since RedisTimeSeries >= v1.4 - - Args: - key: Key name for timeseries. - from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). - to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. - count: Optional maximum number of returned results. - aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', - 'last', 'std.p', 'std.s', 'var.p', 'var.s'] - bucket_size_msec: Time bucket for aggregation in milliseconds. - """ - params = self.__range_params(key, from_time, to_time, count, aggregation_type, bucket_size_msec) - return self._execute(self.REVRANGE_CMD,await self.redis.execute(self.REVRANGE_CMD, *params)) - # return self.redis.execute(self.REVRANGE_CMD, *params) - - - async def __mrange_params(self, aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels): - """ - Internal method to create TS.MRANGE and TS.MREVRANGE arguments - """ - params = [from_time, to_time] - self.appendCount(params, count) - if aggregation_type is not None: - self.appendAggregation(params, aggregation_type, bucket_size_msec) - self.appendWithLabels(params, with_labels) - params.extend(['FILTER']) - params += filters - return params - - async def mrange(self, from_time, to_time, filters, count=None, - aggregation_type=None, bucket_size_msec=0, with_labels=False): - """ - Query a range across multiple time-series by filters in forward direction. - - Args: - from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). - to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. - filters: filter to match the time-series labels. - count: Optional maximum number of returned results. - aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', - 'last', 'std.p', 'std.s', 'var.p', 'var.s'] - bucket_size_msec: Time bucket for aggregation in milliseconds. - with_labels: Include in the reply the label-value pairs that represent metadata labels of the time-series. - If this argument is not set, by default, an empty Array will be replied on the labels array position. - """ - params = self.__mrange_params(aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels) - return self._execute(self.MRANGE_CMD,await self.redis.execute(self.MRANGE_CMD, *params)) - # return self.redis.execute(self.MRANGE_CMD, *params) - - async def mrevrange(self, from_time, to_time, filters, count=None, - aggregation_type=None, bucket_size_msec=0, with_labels=False): - """ - Query a range across multiple time-series by filters in reverse direction. - - Args: - from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). - to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. - filters: filter to match the time-series labels. - count: Optional maximum number of returned results. - aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', - 'last', 'std.p', 'std.s', 'var.p', 'var.s'] - bucket_size_msec: Time bucket for aggregation in milliseconds. - with_labels: Include in the reply the label-value pairs that represent metadata labels of the time-series. - If this argument is not set, by default, an empty Array will be replied on the labels array position. - """ - params = self.__mrange_params(aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels) - return self._execute(self.MREVRANGE_CMD,await self.redis.execute(self.MREVRANGE_CMD, *params)) - # return self.redis.execute(self.MREVRANGE_CMD, *params) - - async def get(self, key): - """Gets the last sample of ``key``""" - return self._execute(self.GET_CMD,await self.redis.execute(self.GET_CMD, key)) - # return self.redis.execute(self.GET_CMD, key) - - async def mget(self, filters, with_labels=False): - """Get the last samples matching the specific ``filter``.""" - params = [] - self.appendWithLabels(params, with_labels) - params.extend(['FILTER']) - params += filters - return self._execute(self.MGET_CMD,await self.redis.execute(self.MGET_CMD, *params)) - # return self.redis.execute(self.MGET_CMD, *params) - - async def info(self, key): - """Gets information of ``key``""" - return self._execute(self.INFO_CMD,await self.redis.execute(self.INFO_CMD, key)) - # return self.redis.execute(self.INFO_CMD, key) - - async def queryindex(self, filters): - """Get all the keys matching the ``filter`` list.""" - return self._execute(self.QUERYINDEX_CMD,await self.redis.execute(self.QUERYINDEX_CMD, *filters)) - # return self.redis.execute(self.QUERYINDEX_CMD, *filters) - - async def pipeline(self, transaction=True, shard_hint=None): - """ - Return a new pipeline object that can queue multiple commands for - later execution. ``transaction`` indicates whether all commands - should be executed atomically. Apart from making a group of operations - atomic, pipelines are useful for reducing the back-and-forth overhead - between the client and server. - Overridden in order to provide the right client through the pipeline. - """ - return self.redis.pipeline() - From c8f5e1d11060c5daa6ee35bf8af0cb275c928543 Mon Sep 17 00:00:00 2001 From: ovresko <32199620+ovresko@users.noreply.github.com> Date: Tue, 11 Jan 2022 08:20:28 -0500 Subject: [PATCH 3/4] Create client_async.py Support for aioredis 1.3.1 and async await --- redistimeseries/client_async.py | 510 ++++++++++++++++++++++++++++++++ 1 file changed, 510 insertions(+) create mode 100644 redistimeseries/client_async.py diff --git a/redistimeseries/client_async.py b/redistimeseries/client_async.py new file mode 100644 index 0000000..f0a3372 --- /dev/null +++ b/redistimeseries/client_async.py @@ -0,0 +1,510 @@ +from aioredis import Redis + +class TSInfo(object): + rules = [] + labels = [] + sourceKey = None + chunk_count = None + memory_usage = None + total_samples = None + retention_msecs = None + last_time_stamp = None + first_time_stamp = None + # As of RedisTimeseries >= v1.4 max_samples_per_chunk is deprecated in favor of chunk_size + max_samples_per_chunk = None + chunk_size = None + duplicate_policy = None + + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.rules = response['rules'] + self.sourceKey = response['sourceKey'] + self.chunkCount = response['chunkCount'] + self.memory_usage = response['memoryUsage'] + self.total_samples = response['totalSamples'] + self.labels = list_to_dict(response['labels']) + self.retention_msecs = response['retentionTime'] + self.lastTimeStamp = response['lastTimestamp'] + self.first_time_stamp = response['firstTimestamp'] + if 'maxSamplesPerChunk' in response: + self.max_samples_per_chunk = response['maxSamplesPerChunk'] + self.chunk_size = self.max_samples_per_chunk * 16 # backward compatible changes + if 'chunkSize' in response: + self.chunk_size = response['chunkSize'] + if 'duplicatePolicy' in response: + self.duplicate_policy = response['duplicatePolicy'] + if type(self.duplicate_policy) == bytes: + self.duplicate_policy = self.duplicate_policy.decode() + +def list_to_dict(aList): + return {nativestr(aList[i][0]):nativestr(aList[i][1]) + for i in range(len(aList))} + +def parse_range(response): + return [tuple((l[0], float(l[1]))) for l in response] + +def parse_m_range(response): + res = [] + for item in response: + res.append({ nativestr(item[0]) : [list_to_dict(item[1]), + parse_range(item[2])]}) + return res + +def bool_ok(response): + return nativestr(response) == 'OK' + +def parse_get(response): + if response == []: + return None + return (int(response[0]), float(response[1])) + +def parse_m_get(response): + res = [] + for item in response: + if item[2] == []: + res.append({ nativestr(item[0]) : [list_to_dict(item[1]), None, None]}) + else: + res.append({ nativestr(item[0]) : [list_to_dict(item[1]), + int(item[2][0]), float(item[2][1])]}) + + return res + +def parseToList(response): + res = [] + for item in response: + res.append(nativestr(item)) + return res + +class Client(object): #changed from StrictRedis + """ + This class subclasses redis-py's `Redis` and implements + RedisTimeSeries's commands (prefixed with "ts"). + The client allows to interact with RedisTimeSeries and use all of + it's functionality. + """ + + CREATE_CMD = 'TS.CREATE' + ALTER_CMD = 'TS.ALTER' + ADD_CMD = 'TS.ADD' + MADD_CMD = 'TS.MADD' + INCRBY_CMD = 'TS.INCRBY' + DECRBY_CMD = 'TS.DECRBY' + CREATERULE_CMD = 'TS.CREATERULE' + DELETERULE_CMD = 'TS.DELETERULE' + RANGE_CMD = 'TS.RANGE' + REVRANGE_CMD = 'TS.REVRANGE' + MRANGE_CMD = 'TS.MRANGE' + MREVRANGE_CMD = 'TS.MREVRANGE' + GET_CMD = 'TS.GET' + MGET_CMD = 'TS.MGET' + INFO_CMD = 'TS.INFO' + QUERYINDEX_CMD = 'TS.QUERYINDEX' + redis:Redis=None + @classmethod + async def create(conn:Redis): + """ + Creates a new RedisTimeSeries client. + """ + self = Client() + self.redis = conn + + # Set the module commands' callbacks + self.MODULE_CALLBACKS = { + self.CREATE_CMD : bool_ok, + self.ALTER_CMD : bool_ok, + self.CREATERULE_CMD : bool_ok, + self.DELETERULE_CMD : bool_ok, + self.RANGE_CMD : parse_range, + self.REVRANGE_CMD: parse_range, + self.MRANGE_CMD : parse_m_range, + self.MREVRANGE_CMD: parse_m_range, + self.GET_CMD : parse_get, + self.MGET_CMD : parse_m_get, + self.INFO_CMD : TSInfo, + self.QUERYINDEX_CMD : parseToList, + } + # for k in MODULE_CALLBACKS: + # self.redis.set_response_callback(k, MODULE_CALLBACKS[k]) + + @staticmethod + def appendUncompressed(params, uncompressed): + if uncompressed: + params.extend(['UNCOMPRESSED']) + + @staticmethod + def appendWithLabels(params, with_labels): + if with_labels: + params.extend(['WITHLABELS']) + + @staticmethod + def appendRetention(params, retention): + if retention is not None: + params.extend(['RETENTION', retention]) + + @staticmethod + def appendLabels(params, labels): + if labels: + params.append('LABELS') + for k, v in labels.items(): + params.extend([k,v]) + + @staticmethod + def appendCount(params, count): + if count is not None: + params.extend(['COUNT', count]) + + @staticmethod + def appendTimestamp(params, timestamp): + if timestamp is not None: + params.extend(['TIMESTAMP', timestamp]) + + @staticmethod + def appendAggregation(params, aggregation_type, + bucket_size_msec): + params.append('AGGREGATION') + params.extend([aggregation_type, bucket_size_msec]) + + @staticmethod + def appendChunkSize(params, chunk_size): + if chunk_size is not None: + params.extend(['CHUNK_SIZE', chunk_size]) + + @staticmethod + def appendDuplicatePolicy(params, command, duplicate_policy): + if duplicate_policy is not None: + if command == 'TS.ADD': + params.extend(['ON_DUPLICATE', duplicate_policy]) + else: + params.extend(['DUPLICATE_POLICY', duplicate_policy]) + + def _execute(self,cmd:str,response): + if response is not None and cmd in self.CREATE_CMD: + return self.CREATE_CMD[cmd](response) + return response + + async def create(self, key, **kwargs): + """ + Create a new time-series. + + Args: + key: time-series key + retention_msecs: Maximum age for samples compared to last event time (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default. + Adding this flag will keep data in an uncompressed form. Compression not only saves + memory but usually improve performance due to lower number of memory accesses + labels: Set of label-value pairs that represent metadata labels of the key. + chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. + You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + duplicate_policy: since RedisTimeSeries v1.4 you can specify the duplicate sample policy ( Configure what to do on duplicate sample. ) + Can be one of: + - 'block': an error will occur for any out of order sample + - 'first': ignore the new value + - 'last': override with latest value + - 'min': only override if the value is lower than the existing value + - 'max': only override if the value is higher than the existing value + When this is not set, the server-wide default will be used. + """ + retention_msecs = kwargs.get('retention_msecs', None) + uncompressed = kwargs.get('uncompressed', False) + labels = kwargs.get('labels', {}) + chunk_size = kwargs.get('chunk_size', None) + duplicate_policy = kwargs.get('duplicate_policy', None) + params = [key] + self.appendRetention(params, retention_msecs) + self.appendUncompressed(params, uncompressed) + self.appendChunkSize(params, chunk_size) + self.appendDuplicatePolicy(params, self.CREATE_CMD, duplicate_policy) + self.appendLabels(params, labels) + + return self._execute(self.CREATE_CMD,await self.redis.execute(self.CREATE_CMD, *params)) + + async def alter(self, key, **kwargs): + """ + Update the retention, labels of an existing key. The parameters + are the same as TS.CREATE. + """ + retention_msecs = kwargs.get('retention_msecs', None) + labels = kwargs.get('labels', {}) + duplicate_policy = kwargs.get('duplicate_policy', None) + params = [key] + self.appendRetention(params, retention_msecs) + self.appendDuplicatePolicy(params, self.ALTER_CMD, duplicate_policy) + self.appendLabels(params, labels) + + return self._execute(self.ALTER_CMD,await self.redis.execute(self.ALTER_CMD, *params)) + # return await self.redis.execute(self.ALTER_CMD, *params) + + async def add(self, key, timestamp, value, **kwargs): + """ + Append (or create and append) a new sample to the series. + + Args: + key: time-series key + timestamp: timestamp of the sample. * can be used for automatic timestamp (using the system clock). + value: numeric data value of the sample + retention_msecs: Maximum age for samples compared to last event time (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default. + Adding this flag will keep data in an uncompressed form. Compression not only saves + memory but usually improve performance due to lower number of memory accesses + labels: Set of label-value pairs that represent metadata labels of the key. + chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. + You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + duplicate_policy: since RedisTimeSeries v1.4 you can specify the duplicate sample policy ( Configure what to do on duplicate sample. ) + Can be one of: + - 'block': an error will occur for any out of order sample + - 'first': ignore the new value + - 'last': override with latest value + - 'min': only override if the value is lower than the existing value + - 'max': only override if the value is higher than the existing value + When this is not set, the server-wide default will be used. + """ + retention_msecs = kwargs.get('retention_msecs', None) + uncompressed = kwargs.get('uncompressed', False) + labels = kwargs.get('labels', {}) + chunk_size = kwargs.get('chunk_size', None) + duplicate_policy = kwargs.get('duplicate_policy', None) + params = [key, timestamp, value] + self.appendRetention(params, retention_msecs) + self.appendUncompressed(params, uncompressed) + self.appendChunkSize(params, chunk_size) + self.appendDuplicatePolicy(params, self.ADD_CMD, duplicate_policy) + self.appendLabels(params, labels) + return self._execute(self.ADD_CMD,await self.redis.execute(self.ADD_CMD, *params)) + # return await self.redis.execute(self.ADD_CMD, *params) + + async def madd(self, ktv_tuples): + """ + Appends (or creates and appends) a new ``value`` to series + ``key`` with ``timestamp``. Expects a list of ``tuples`` as + (``key``,``timestamp``, ``value``). Return value is an + array with timestamps of insertions. + """ + params = [] + for ktv in ktv_tuples: + for item in ktv: + params.append(item) + + return self._execute(self.MADD_CMD,await self.redis.execute(self.MADD_CMD, *params)) + # return await self.redis.execute(self.MADD_CMD, *params) + + async def incrby(self, key, value, **kwargs): + """ + Increment (or create an time-series and increment) the latest sample's of a series. + This command can be used as a counter or gauge that automatically gets history as a time series. + + Args: + key: time-series key + value: numeric data value of the sample + timestamp: timestamp of the sample. None can be used for automatic timestamp (using the system clock). + retention_msecs: Maximum age for samples compared to last event time (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default. + Adding this flag will keep data in an uncompressed form. Compression not only saves + memory but usually improve performance due to lower number of memory accesses + labels: Set of label-value pairs that represent metadata labels of the key. + chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. + You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + """ + timestamp = kwargs.get('timestamp', None) + retention_msecs = kwargs.get('retention_msecs', None) + uncompressed = kwargs.get('uncompressed', False) + labels = kwargs.get('labels', {}) + chunk_size = kwargs.get('chunk_size', None) + params = [key, value] + self.appendTimestamp(params, timestamp) + self.appendRetention(params, retention_msecs) + self.appendUncompressed(params, uncompressed) + self.appendChunkSize(params, chunk_size) + self.appendLabels(params, labels) + return self._execute(self.INCRBY_CMD,await self.redis.execute(self.INCRBY_CMD, *params)) + # return self.redis.execute(self.INCRBY_CMD, *params) + + async def decrby(self, key, value, **kwargs): + """ + Decrement (or create an time-series and decrement) the latest sample's of a series. + This command can be used as a counter or gauge that automatically gets history as a time series. + + Args: + key: time-series key + value: numeric data value of the sample + timestamp: timestamp of the sample. None can be used for automatic timestamp (using the system clock). + retention_msecs: Maximum age for samples compared to last event time (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + uncompressed: since RedisTimeSeries v1.2, both timestamps and values are compressed by default. + Adding this flag will keep data in an uncompressed form. Compression not only saves + memory but usually improve performance due to lower number of memory accesses + labels: Set of label-value pairs that represent metadata labels of the key. + chunk_size: Each time-serie uses chunks of memory of fixed size for time series samples. + You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + """ + timestamp = kwargs.get('timestamp', None) + retention_msecs = kwargs.get('retention_msecs', None) + uncompressed = kwargs.get('uncompressed', False) + labels = kwargs.get('labels', {}) + chunk_size = kwargs.get('chunk_size', None) + params = [key, value] + self.appendTimestamp(params, timestamp) + self.appendRetention(params, retention_msecs) + self.appendUncompressed(params, uncompressed) + self.appendChunkSize(params, chunk_size) + self.appendLabels(params, labels) + return self._execute(self.DECRBY_CMD,await self.redis.execute(self.DECRBY_CMD, *params)) + # return self.redis.execute(self.DECRBY_CMD, *params) + + async def createrule(self, source_key, dest_key, + aggregation_type, bucket_size_msec): + """ + Creates a compaction rule from values added to ``source_key`` + into ``dest_key``. Aggregating for ``bucket_size_msec`` where an + ``aggregation_type`` can be ['avg', 'sum', 'min', 'max', + 'range', 'count', 'first', 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + """ + params=[source_key, dest_key] + self.appendAggregation(params, aggregation_type, bucket_size_msec) + return self._execute(self.CREATERULE_CMD,await self.redis.execute(self.CREATERULE_CMD, *params)) + # return self.redis.execute(self.CREATERULE_CMD, *params) + + async def deleterule(self, source_key, dest_key): + """Deletes a compaction rule""" + return self._execute(self.DELETERULE_CMD,await self.redis.execute(self.DELETERULE_CMD, source_key, dest_key)) + # return self.redis.execute(self.DELETERULE_CMD, source_key, dest_key) + + async def __range_params(self, key, from_time, to_time, count, aggregation_type, bucket_size_msec): + """ + Internal method to create TS.RANGE and TS.REVRANGE arguments + """ + params = [key, from_time, to_time] + self.appendCount(params, count) + if aggregation_type is not None: + self.appendAggregation(params, aggregation_type, bucket_size_msec) + return params + + async def range(self, key, from_time, to_time, count=None, + aggregation_type=None, bucket_size_msec=0): + """ + Query a range in forward direction for a specific time-serie. + + Args: + key: Key name for timeseries. + from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. + count: Optional maximum number of returned results. + aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', + 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + bucket_size_msec: Time bucket for aggregation in milliseconds. + """ + params = self.__range_params(key, from_time, to_time, count, aggregation_type, bucket_size_msec) + return self._execute(self.RANGE_CMD,await self.redis.execute(self.RANGE_CMD, *params)) + # return self.redis.execute(self.RANGE_CMD, *params) + + async def revrange(self, key, from_time, to_time, count=None, + aggregation_type=None, bucket_size_msec=0): + """ + Query a range in reverse direction for a specific time-serie. + Note: This command is only available since RedisTimeSeries >= v1.4 + + Args: + key: Key name for timeseries. + from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. + count: Optional maximum number of returned results. + aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', + 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + bucket_size_msec: Time bucket for aggregation in milliseconds. + """ + params = self.__range_params(key, from_time, to_time, count, aggregation_type, bucket_size_msec) + return self._execute(self.REVRANGE_CMD,await self.redis.execute(self.REVRANGE_CMD, *params)) + # return self.redis.execute(self.REVRANGE_CMD, *params) + + + async def __mrange_params(self, aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels): + """ + Internal method to create TS.MRANGE and TS.MREVRANGE arguments + """ + params = [from_time, to_time] + self.appendCount(params, count) + if aggregation_type is not None: + self.appendAggregation(params, aggregation_type, bucket_size_msec) + self.appendWithLabels(params, with_labels) + params.extend(['FILTER']) + params += filters + return params + + async def mrange(self, from_time, to_time, filters, count=None, + aggregation_type=None, bucket_size_msec=0, with_labels=False): + """ + Query a range across multiple time-series by filters in forward direction. + + Args: + from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. + filters: filter to match the time-series labels. + count: Optional maximum number of returned results. + aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', + 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + bucket_size_msec: Time bucket for aggregation in milliseconds. + with_labels: Include in the reply the label-value pairs that represent metadata labels of the time-series. + If this argument is not set, by default, an empty Array will be replied on the labels array position. + """ + params = self.__mrange_params(aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels) + return self._execute(self.MRANGE_CMD,await self.redis.execute(self.MRANGE_CMD, *params)) + # return self.redis.execute(self.MRANGE_CMD, *params) + + async def mrevrange(self, from_time, to_time, filters, count=None, + aggregation_type=None, bucket_size_msec=0, with_labels=False): + """ + Query a range across multiple time-series by filters in reverse direction. + + Args: + from_time: Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. + filters: filter to match the time-series labels. + count: Optional maximum number of returned results. + aggregation_type: Optional aggregation type. Can be one of ['avg', 'sum', 'min', 'max', 'range', 'count', 'first', + 'last', 'std.p', 'std.s', 'var.p', 'var.s'] + bucket_size_msec: Time bucket for aggregation in milliseconds. + with_labels: Include in the reply the label-value pairs that represent metadata labels of the time-series. + If this argument is not set, by default, an empty Array will be replied on the labels array position. + """ + params = self.__mrange_params(aggregation_type, bucket_size_msec, count, filters, from_time, to_time, with_labels) + return self._execute(self.MREVRANGE_CMD,await self.redis.execute(self.MREVRANGE_CMD, *params)) + # return self.redis.execute(self.MREVRANGE_CMD, *params) + + async def get(self, key): + """Gets the last sample of ``key``""" + return self._execute(self.GET_CMD,await self.redis.execute(self.GET_CMD, key)) + # return self.redis.execute(self.GET_CMD, key) + + async def mget(self, filters, with_labels=False): + """Get the last samples matching the specific ``filter``.""" + params = [] + self.appendWithLabels(params, with_labels) + params.extend(['FILTER']) + params += filters + return self._execute(self.MGET_CMD,await self.redis.execute(self.MGET_CMD, *params)) + # return self.redis.execute(self.MGET_CMD, *params) + + async def info(self, key): + """Gets information of ``key``""" + return self._execute(self.INFO_CMD,await self.redis.execute(self.INFO_CMD, key)) + # return self.redis.execute(self.INFO_CMD, key) + + async def queryindex(self, filters): + """Get all the keys matching the ``filter`` list.""" + return self._execute(self.QUERYINDEX_CMD,await self.redis.execute(self.QUERYINDEX_CMD, *filters)) + # return self.redis.execute(self.QUERYINDEX_CMD, *filters) + + async def pipeline(self, transaction=True, shard_hint=None): + """ + Return a new pipeline object that can queue multiple commands for + later execution. ``transaction`` indicates whether all commands + should be executed atomically. Apart from making a group of operations + atomic, pipelines are useful for reducing the back-and-forth overhead + between the client and server. + Overridden in order to provide the right client through the pipeline. + """ + return self.redis.pipeline() + From 1f95528f03444a05c9cf44d97a6d7472c299f261 Mon Sep 17 00:00:00 2001 From: ovresko <32199620+ovresko@users.noreply.github.com> Date: Tue, 11 Jan 2022 08:24:20 -0500 Subject: [PATCH 4/4] typo --- redistimeseries/client_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redistimeseries/client_async.py b/redistimeseries/client_async.py index f0a3372..4165965 100644 --- a/redistimeseries/client_async.py +++ b/redistimeseries/client_async.py @@ -179,8 +179,8 @@ def appendDuplicatePolicy(params, command, duplicate_policy): params.extend(['DUPLICATE_POLICY', duplicate_policy]) def _execute(self,cmd:str,response): - if response is not None and cmd in self.CREATE_CMD: - return self.CREATE_CMD[cmd](response) + if response is not None and cmd in self.MODULE_CALLBACKS: + return self.MODULE_CALLBACKS[cmd](response) return response async def create(self, key, **kwargs):