|
3 | 3 |
|
4 | 4 | The data model consists of tables, records, columns.
|
5 | 5 | """
|
| 6 | +import codecs |
| 7 | +import csv |
| 8 | +from http.client import HTTPResponse |
6 | 9 | from json import JSONEncoder
|
| 10 | +from typing import List, Iterator |
| 11 | +from influxdb_client.rest import _UTF_8_encoding |
7 | 12 |
|
8 | 13 |
|
9 | 14 | class FluxStructure:
|
@@ -137,3 +142,145 @@ def __str__(self):
|
137 | 142 | def __repr__(self):
|
138 | 143 | """Format for inspection."""
|
139 | 144 | return f"<{type(self).__name__}: field={self.values.get('_field')}, value={self.values.get('_value')}>"
|
| 145 | + |
| 146 | + |
| 147 | +class TableList(List[FluxTable]): |
| 148 | + """:class:`~influxdb_client.client.flux_table.FluxTable` list with additionally functional to better handle of query result.""" # noqa: E501 |
| 149 | + |
| 150 | + def to_values(self, columns: List['str'] = None) -> List[List[object]]: |
| 151 | + """ |
| 152 | + Serialize query results to a flattened list of values. |
| 153 | +
|
| 154 | + :param columns: if not ``None`` then only specified columns are presented in results |
| 155 | + :return: :class:`~list` of values |
| 156 | +
|
| 157 | + Output example: |
| 158 | +
|
| 159 | + .. code-block:: python |
| 160 | +
|
| 161 | + [ |
| 162 | + ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3], |
| 163 | + ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3], |
| 164 | + ... |
| 165 | + ] |
| 166 | +
|
| 167 | + Configure required columns: |
| 168 | +
|
| 169 | + .. code-block:: python |
| 170 | +
|
| 171 | + from influxdb_client import InfluxDBClient |
| 172 | +
|
| 173 | + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: |
| 174 | +
|
| 175 | + # Query: using Table structure |
| 176 | + tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)') |
| 177 | +
|
| 178 | + # Serialize to values |
| 179 | + output = tables.to_values(columns=['location', '_time', '_value']) |
| 180 | + print(output) |
| 181 | + """ |
| 182 | + |
| 183 | + def filter_values(record): |
| 184 | + if columns is not None: |
| 185 | + return [record.values.get(k) for k in columns] |
| 186 | + return record.values.values() |
| 187 | + |
| 188 | + return self._to_values(filter_values) |
| 189 | + |
| 190 | + def to_json(self, columns: List['str'] = None, **kwargs) -> str: |
| 191 | + """ |
| 192 | + Serialize query results to a JSON formatted :class:`~str`. |
| 193 | +
|
| 194 | + :param columns: if not ``None`` then only specified columns are presented in results |
| 195 | + :return: :class:`~str` |
| 196 | +
|
| 197 | + The query results is flattened to array: |
| 198 | +
|
| 199 | + .. code-block:: javascript |
| 200 | +
|
| 201 | + [ |
| 202 | + { |
| 203 | + "_measurement": "mem", |
| 204 | + "_start": "2021-06-23T06:50:11.897825+00:00", |
| 205 | + "_stop": "2021-06-25T06:50:11.897825+00:00", |
| 206 | + "_time": "2020-02-27T16:20:00.897825+00:00", |
| 207 | + "region": "north", |
| 208 | + "_field": "usage", |
| 209 | + "_value": 15 |
| 210 | + }, |
| 211 | + { |
| 212 | + "_measurement": "mem", |
| 213 | + "_start": "2021-06-23T06:50:11.897825+00:00", |
| 214 | + "_stop": "2021-06-25T06:50:11.897825+00:00", |
| 215 | + "_time": "2020-02-27T16:20:01.897825+00:00", |
| 216 | + "region": "west", |
| 217 | + "_field": "usage", |
| 218 | + "_value": 10 |
| 219 | + }, |
| 220 | + ... |
| 221 | + ] |
| 222 | +
|
| 223 | + The JSON format could be configured via ``**kwargs`` arguments: |
| 224 | +
|
| 225 | + .. code-block:: python |
| 226 | +
|
| 227 | + from influxdb_client import InfluxDBClient |
| 228 | +
|
| 229 | + with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: |
| 230 | +
|
| 231 | + # Query: using Table structure |
| 232 | + tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)') |
| 233 | +
|
| 234 | + # Serialize to JSON |
| 235 | + output = tables.to_json(indent=5) |
| 236 | + print(output) |
| 237 | +
|
| 238 | + For all available options see - `json.dump <https://docs.python.org/3/library/json.html#json.dump>`_. |
| 239 | + """ |
| 240 | + if 'indent' not in kwargs: |
| 241 | + kwargs['indent'] = 2 |
| 242 | + |
| 243 | + def filter_values(record): |
| 244 | + if columns is not None: |
| 245 | + return {k: v for (k, v) in record.values.items() if k in columns} |
| 246 | + return record.values |
| 247 | + |
| 248 | + import json |
| 249 | + return json.dumps(self._to_values(filter_values), cls=FluxStructureEncoder, **kwargs) |
| 250 | + |
| 251 | + def _to_values(self, mapping): |
| 252 | + return [mapping(record) for table in self for record in table.records] |
| 253 | + |
| 254 | + |
| 255 | +class CSVIterator(Iterator[List[str]]): |
| 256 | + """:class:`Iterator[List[str]]` with additionally functional to better handle of query result.""" |
| 257 | + |
| 258 | + def __init__(self, response: HTTPResponse) -> None: |
| 259 | + """Initialize ``csv.reader``.""" |
| 260 | + self.delegate = csv.reader(codecs.iterdecode(response, _UTF_8_encoding)) |
| 261 | + |
| 262 | + def __iter__(self): |
| 263 | + """Return an iterator object.""" |
| 264 | + return self.delegate.__iter__() |
| 265 | + |
| 266 | + def __next__(self): |
| 267 | + """Retrieve the next item from the iterator.""" |
| 268 | + return self.delegate.__next__() |
| 269 | + |
| 270 | + def to_values(self) -> List[List[str]]: |
| 271 | + """ |
| 272 | + Serialize query results to a flattened list of values. |
| 273 | +
|
| 274 | + :return: :class:`~list` of values |
| 275 | +
|
| 276 | + Output example: |
| 277 | +
|
| 278 | + .. code-block:: python |
| 279 | +
|
| 280 | + [ |
| 281 | + ['New York', '2022-06-14T08:00:51.749072045Z', '24.3'], |
| 282 | + ['Prague', '2022-06-14T08:00:51.749072045Z', '25.3'], |
| 283 | + ... |
| 284 | + ] |
| 285 | + """ |
| 286 | + return list(self.delegate) |
0 commit comments