From 88336576de0edff757157d75db37f327ad2a5461 Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Wed, 23 Apr 2025 01:10:11 -0400 Subject: [PATCH 1/9] update to use new InfluxDB3 source --- python/sources/influxdb_3/library.json | 102 +++++++++-- python/sources/influxdb_3/main.py | 195 +++++++-------------- python/sources/influxdb_3/requirements.txt | 6 +- 3 files changed, 161 insertions(+), 142 deletions(-) diff --git a/python/sources/influxdb_3/library.json b/python/sources/influxdb_3/library.json index a44a34304..16c83942b 100644 --- a/python/sources/influxdb_3/library.json +++ b/python/sources/influxdb_3/library.json @@ -18,15 +18,7 @@ "Type": "EnvironmentVariable", "InputType": "OutputTopic", "Description": "This is the Kafka topic that will receive the query results", - "DefaultValue": "influxdbv3-data", - "Required": true - }, - { - "Name": "task_interval", - "Type": "EnvironmentVariable", - "InputType": "FreeText", - "Description": "Interval to run query. Must be within the InfluxDB notation; 1s, 1m, 1h, 1d, 1w, 1y", - "DefaultValue": "5m", + "DefaultValue": "influxdbv3-data-source", "Required": true }, { @@ -62,11 +54,97 @@ "Required": true }, { - "Name": "INFLUXDB_MEASUREMENT_NAME", + "Name": "INFLUXDB_QUERY_MEASUREMENTS", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The measurements to query. If None, all measurements will be processed.", + "Required": false + }, + { + "Name": "INFLUXDB_RECORD_KEY_COLUMN", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The InfluxDB record column used for the Kafka message key, else uses the measurement's name", + "Required": false + }, + { + "Name": "INFLUXDB_RECORD_TIMESTAMP_COLUMN", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The InfluxDB record column used for the Kafka timestamp, else uses Kafka default (produce time)", + "Required": false + }, + { + "Name": "INFLUXDB_RECORD_MEASUREMENT_COLUMN", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The column name used for inserting the measurement name, else uses '_measurement'", + "Required": false + }, + { + "Name": "INFLUXDB_QUERY_SQL", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "A custom SQL query to use for retrieving data from InfluxDB, else uses default", + "Required": false + }, + { + "Name": "INFLUXDB_QUERY_START_DATE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The RFC3339-formatted start time for querying InfluxDB, else uses current runtime", + "Required": false + }, + { + "Name": "INFLUXDB_QUERY_END_DATE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The RFC3339-formatted end time for querying InfluxDB, else runs indefinitely for 1 measurement only", + "Required": false + }, + { + "Name": "INFLUXDB_QUERY_TIME_DELTA", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Time interval for batching queries, e.g., '5m' for 5 minutes", + "Required": false + }, + { + "Name": "INFLUXDB_QUERY_MAX_RETRIES", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Maximum number of retries for querying or producing (with multiplicative backoff)", + "Required": false + }, + { + "Name": "INFLUXDB_QUERY_DELAY_SECONDS", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Add an optional delay (in seconds) between producing batches", + "Required": false + }, + { + "Name": "CONSUMER_GROUP_NAME", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The name of the consumer group to use when consuming from Kafka", + "DefaultValue": "influxdb-sink", + "Required": true + }, + { + "Name": "BUFFER_SIZE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The number of records that sink holds before flush data to the InfluxDb", + "DefaultValue": "1000", + "Required": false + }, + { + "Name": "BUFFER_TIMEOUT", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The InfluxDB measurement to read data from. If not specified, the name of the output topic will be used", - "DefaultValue": "", + "Description": "The number of seconds that sink holds before flush data to the InfluxDb", + "DefaultValue": "1", "Required": false } ], diff --git a/python/sources/influxdb_3/main.py b/python/sources/influxdb_3/main.py index 7910df06f..4500836b0 100644 --- a/python/sources/influxdb_3/main.py +++ b/python/sources/influxdb_3/main.py @@ -1,138 +1,79 @@ -# Import utility modules +from dateutil import parser import os -import random -import json -import logging -from time import sleep +import inspect -# import vendor-specific libraries from quixstreams import Application -from quixstreams.models.serializers.quix import JSONSerializer, SerializationContext -import influxdb_client_3 as InfluxDBClient3 +from quixstreams.sources.community.influxdb3 import InfluxDB3Source # for local dev, load env vars from a .env file from dotenv import load_dotenv load_dotenv() -# Initialize logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -# Create a Quix Application -app = Application(use_changelog_topics=False) - -# Define a serializer for messages, using JSON Serializer for ease -serializer = JSONSerializer() - -# Define the topic using the "output" environment variable -topic_name = os.environ["output"] -topic = app.topic(topic_name) - -influxdb3_client = InfluxDBClient3.InfluxDBClient3(token=os.environ["INFLUXDB_TOKEN"], - host=os.environ["INFLUXDB_HOST"], - org=os.environ["INFLUXDB_ORG"], - database=os.environ["INFLUXDB_DATABASE"]) - -measurement_name = os.environ.get("INFLUXDB_MEASUREMENT_NAME", os.environ["output"]) -interval = os.environ.get("task_interval", "5m") - -# Global variable to control the main loop's execution -run = True - -# InfluxDB interval-to-seconds conversion dictionary -UNIT_SECONDS = { - "s": 1, - "m": 60, - "h": 3600, - "d": 86400, - "w": 604800, - "y": 31536000, -} - -# Helper function to convert time intervals (like 1h, 2m) into seconds for easier processing. -# This function is useful for determining the frequency of certain operations. -def interval_to_seconds(interval: str) -> int: - try: - return int(interval[:-1]) * UNIT_SECONDS[interval[-1]] - except ValueError as e: - if "invalid literal" in str(e): - raise ValueError( - "interval format is {int}{unit} i.e. '10h'; " - f"valid units: {list(UNIT_SECONDS.keys())}") - except KeyError: - raise ValueError( - f"Unknown interval unit: {interval[-1]}; " - f"valid units: {list(UNIT_SECONDS.keys())}") - -interval_seconds = interval_to_seconds(interval) - -# Function to fetch data from InfluxDB and send it to Quix -# It runs in a continuous loop, periodically fetching data based on the interval. -def get_data(): - # Run in a loop until the main thread is terminated - while run: - try: - query_definition = f'SELECT * FROM "{measurement_name}" WHERE time >= now() - {interval}' - print(f"Sending query {query_definition}") - # Query InfluxDB 3.0 using influxql or sql - table = influxdb3_client.query( - query=query_definition, - mode="pandas", - language="influxql") - - table = table.drop(columns=["iox::measurement"]) - - # If there are rows to write to the stream at this time - if not table.empty: - # Convert to JSON for JSON-to-bytes serializer - json_result = table.to_json(orient='records', date_format='iso') - yield json_result - print(f"Query retrieved {table.size} records") - else: - print("No new data to publish.") - - # Wait for the next interval - sleep(interval_seconds) - - except Exception as e: - print("query failed", flush=True) - print(f"error: {e}", flush=True) - sleep(1) - -def main(): + +def get_kwargs_defaults() -> dict[str, Any]: """ - Read data from the Query and publish it to Kafka + Gets the default kwargs of MongoDBSink so they can be passed in instances + where the user did not provide an environment variable. """ + params = inspect.signature(InfluxDB3Source.__init__).parameters.values() + return { + param.name: param.default for param in params if + param.default is not inspect.Parameter.empty + } + + +def _measurements(): + if measurements := os.getenv("INFLUXDB_QUERY_MEASUREMENTS"): + return measurements.split(',') + + +def _key_setter(): + if column := os.getenv("INFLUXDB_RECORD_KEY_COLUMN"): + return lambda record: record[column] + + +def _timestamp_setter(): + if column := os.getenv("INFLUXDB_RECORD_TIMESTAMP_COLUMN"): + return lambda record: record[column] + + +def _start_date(): + if date := os.getenv("INFLUXDB_QUERY_START_DATE"): + return parser.parse(date) + + +def _end_date(): + if date := os.getenv("INFLUXDB_QUERY_END_DATE"): + return parser.parse(date) + + +kwargs_defaults = get_kwargs_defaults() +influxdb3_source = InfluxDB3Source( + host=os.environ["INFLUXDB_HOST"], + token=os.environ["INFLUXDB_TOKEN"], + organization_id=os.environ["INFLUXDB_ORG"], + database=os.environ["INFLUXDB_DATABASE"], + measurements=_measurements() or kwargs_defaults["measurements"], + key_setter=_key_setter() or kwargs_defaults["key_setter"], + timestamp_setter=_timestamp_setter() or kwargs_defaults["timestamp_setter"], + measurement_column_name=os.getenv("INFLUXDB_RECORD_MEASUREMENT_COLUMN") or kwargs_defaults["measurement_column_name"], + sql_query=os.getenv("INFLUXDB_QUERY_SQL") or kwargs_defaults["sql_query"], + start_date=_start_date() or kwargs_defaults["start_date"], + end_date=_end_date() or kwargs_defaults["end_date"], + time_delta=os.getenv("INFLUXDB_QUERY_TIME_DELTA") or kwargs_defaults["time_delta"], + max_retries=int(retries) if (retries := os.getenv("INFLUXDB_QUERY_MAX_RETRIES")) is not None else kwargs_defaults["max_retries"], + delay=float(delay) if (delay := os.getenv("INFLUXDB_QUERY_DELAY_SECONDS")) is not None else kwargs_defaults["delay"], +) + +# Create a Quix platform-specific application instead +app = Application( + consumer_group=os.environ.get("CONSUMER_GROUP_NAME", "influxdb-data-source"), + auto_offset_reset="earliest", + commit_every=int(os.environ.get("BUFFER_SIZE", "1000")), + commit_interval=float(os.environ.get("BUFFER_DELAY", "1")), +) +sdf = app.add_source(influxdb3_source, topic=app.topic(name=os.environ["output"])) + - # Create a pre-configured Producer object. - # Producer is already setup to use Quix brokers. - # It will also ensure that the topics exist before producing to them if - # Application.Quix is initialized with "auto_create_topics=True". - - with app.get_producer() as producer: - for res in get_data(): - # Parse the JSON string into a Python object - records = json.loads(res) - for index, obj in enumerate(records): - # Generate a unique message_key for each row - # Change to a tag name if you want to aggregate data by a specific tag such as "SensorID"—e.g. message_key = obj['SensorID'] - message_key = f"INFLUX_DATA_{str(random.randint(1, 100)).zfill(3)}_{index}" - logger.info(f"Produced message with key:{message_key}, value:{obj}") - - # Serialize row value to bytes - serialized_value = serializer( - value=obj, ctx=SerializationContext(topic=topic.name, field="value") - ) - - # publish the data to the topic - producer.produce( - topic=topic.name, - key=message_key, - value=serialized_value, - ) - -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - print("Exiting.") \ No newline at end of file +if __name__ == '__main__': + app.run() diff --git a/python/sources/influxdb_3/requirements.txt b/python/sources/influxdb_3/requirements.txt index 6bd4e7c57..17f65149c 100644 --- a/python/sources/influxdb_3/requirements.txt +++ b/python/sources/influxdb_3/requirements.txt @@ -1,3 +1,3 @@ -quixstreams==3.23.1 -influxdb3-python==0.3.6 -python-dotenv \ No newline at end of file +quixstreams[influxdb3] +python-dotenv +dateutil \ No newline at end of file From 60d63194dfa56e62d43c13a85b82a2b4e828cbbf Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Thu, 24 Apr 2025 20:46:23 -0400 Subject: [PATCH 2/9] update readme and clean up some things --- python/sources/influxdb_3/README.md | 40 ++++++++++++++++------ python/sources/influxdb_3/dockerfile | 9 ++++- python/sources/influxdb_3/library.json | 28 +++++++-------- python/sources/influxdb_3/requirements.txt | 3 +- 4 files changed, 54 insertions(+), 26 deletions(-) diff --git a/python/sources/influxdb_3/README.md b/python/sources/influxdb_3/README.md index e2610b68c..1c8e242c1 100644 --- a/python/sources/influxdb_3/README.md +++ b/python/sources/influxdb_3/README.md @@ -1,10 +1,14 @@ # InfluxDB v3 -[This connector](https://github.com/quixio/quix-samples/tree/main/python/sources/influxdb_3) demonstrates how to use the InfluxDB v3 query API to periodically query InfluxDB and publish the results to a Kafka topic. +[This connector](https://github.com/quixio/quix-samples/tree/main/python/sources/influxdb_3) demonstrates how to use the InfluxDB v3 query API to periodically +query InfluxDB3 and publish the results to a Kafka topic. + +To learn more about how it functions, [check out the underlying +Quix Streams `InfluxDB3Source`](https://quix.io/docs/quix-streams/connectors/sources/influxdb3-source.html). ## How to run -Create a [Quix](https://portal.cloud.quix.io/signup?utm_campaign=github) account or log-in and visit the `Connectors` tab to use this connector. +Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log-in and visit the `Connectors` tab to use this connector. Clicking `Set up connector` allows you to enter your connection details and runtime parameters. @@ -17,17 +21,33 @@ Then either: The connector uses the following environment variables: -- **output**: This is the output topic that will receive the stream (Default: `influxdb`, Required: `True`) -- **task_interval**: Interval to run query. Must be within the InfluxDB notation; 1s, 1m, 1h, 1d, 1w, 1mo, 1y (Default: `5m`, Required: `True`) -- **INFLUXDB_HOST**: Host address for the InfluxDB instance. (Default: `eu-central-1-1.aws.cloud2.influxdata.com`, Required: `True`) -- **INFLUXDB_TOKEN**: Authentication token to access InfluxDB. (Default: ``, Required: `True`) -- **INFLUXDB_ORG**: Organization name in InfluxDB. (Default: ``, Required: `False`) -- **INFLUXDB_DATABASE**: Database name in InfluxDB where data is stored. (Default: ``, Required: `True`) -- **INFLUXDB_MEASUREMENT_NAME**: The InfluxDB measurement to read data from. If not specified, the name of the output topic will be used (Default: ``, Required: `False`) +### Required + +- `output`: The Kafka topic that will receive the query results. +- `INFLUXDB_HOST`: Host address for the InfluxDB instance. +- `INFLUXDB_TOKEN`: Authentication token to access InfluxDB. +- `INFLUXDB_ORG`: Organization name in InfluxDB. +- `INFLUXDB_DATABASE`: Database name in InfluxDB where data is stored. + +### Optional + +- `INFLUXDB_QUERY_MEASUREMENTS`: The measurements to query. If left None, all measurements will be processed. +- `INFLUXDB_RECORD_TIMESTAMP_COLUMN`: The InfluxDB record column used for the Kafka timestamp, else uses Kafka default (produce time). +- `INFLUXDB_RECORD_MEASUREMENT_COLUMN`: The column name used for inserting the measurement name, else uses `'_measurement'`. +- `INFLUXDB_QUERY_SQL`: A custom SQL query to use for retrieving data from InfluxDB, else uses default. +- `INFLUXDB_QUERY_START_DATE`: The RFC3339-formatted start time for querying InfluxDB, else uses current runtime. +- `INFLUXDB_QUERY_END_DATE`: The RFC3339-formatted end time for querying InfluxDB, else runs indefinitely for 1 measurement only. +- `INFLUXDB_QUERY_TIME_DELTA`: Time interval for batching queries, e.g., `'5m'` for 5 minutes. +- `INFLUXDB_QUERY_MAX_RETRIES`: Maximum number of retries for querying or producing (with multiplicative backoff). +- `INFLUXDB_QUERY_DELAY_SECONDS`: Add an optional delay (in seconds) between producing batches +- `CONSUMER_GROUP_NAME`: The name of the consumer group to use when consuming from Kafka. +- `BUFFER_SIZE`: The number of records that sink holds before flush data to InfluxDb. +- `BUFFER_TIMEOUT`: The number of seconds that sink holds before flush data to the InfluxDb. ## Requirements / Prerequisites -You will need to have an InfluxDB 3.0 instance available and an API authentication token. +You will need to have an InfluxDB 3.0 instance available and an API authentication token ( +unless otherwise noted). ## Contribute diff --git a/python/sources/influxdb_3/dockerfile b/python/sources/influxdb_3/dockerfile index 692316bbb..a7dbc6ee2 100644 --- a/python/sources/influxdb_3/dockerfile +++ b/python/sources/influxdb_3/dockerfile @@ -1,5 +1,12 @@ FROM python:3.12.5-slim-bookworm - + +# TODO: remove this RUN block when done doing "@ git+" install in requirements.txt +# This should be done BEFORE merging PR +RUN apt-get update && \ + apt-get install -y git && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + # Set environment variables for non-interactive setup and unbuffered output ENV DEBIAN_FRONTEND=noninteractive \ PYTHONUNBUFFERED=1 \ diff --git a/python/sources/influxdb_3/library.json b/python/sources/influxdb_3/library.json index 16c83942b..89173ab74 100644 --- a/python/sources/influxdb_3/library.json +++ b/python/sources/influxdb_3/library.json @@ -17,7 +17,7 @@ "Name": "output", "Type": "EnvironmentVariable", "InputType": "OutputTopic", - "Description": "This is the Kafka topic that will receive the query results", + "Description": "The Kafka topic that will receive the query results", "DefaultValue": "influxdbv3-data-source", "Required": true }, @@ -57,77 +57,77 @@ "Name": "INFLUXDB_QUERY_MEASUREMENTS", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The measurements to query. If None, all measurements will be processed.", + "Description": "The measurements to query. If left None, all measurements will be processed.", "Required": false }, { "Name": "INFLUXDB_RECORD_KEY_COLUMN", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The InfluxDB record column used for the Kafka message key, else uses the measurement's name", + "Description": "The InfluxDB record column used for the Kafka message key, else uses the measurement's name.", "Required": false }, { "Name": "INFLUXDB_RECORD_TIMESTAMP_COLUMN", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The InfluxDB record column used for the Kafka timestamp, else uses Kafka default (produce time)", + "Description": "The InfluxDB record column used for the Kafka timestamp, else uses Kafka default (produce time).", "Required": false }, { "Name": "INFLUXDB_RECORD_MEASUREMENT_COLUMN", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The column name used for inserting the measurement name, else uses '_measurement'", + "Description": "The column name used for inserting the measurement name, else uses '_measurement'.", "Required": false }, { "Name": "INFLUXDB_QUERY_SQL", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "A custom SQL query to use for retrieving data from InfluxDB, else uses default", + "Description": "A custom SQL query to use for retrieving data from InfluxDB, else uses default.", "Required": false }, { "Name": "INFLUXDB_QUERY_START_DATE", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The RFC3339-formatted start time for querying InfluxDB, else uses current runtime", + "Description": "The RFC3339-formatted start time for querying InfluxDB, else uses current runtime.", "Required": false }, { "Name": "INFLUXDB_QUERY_END_DATE", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The RFC3339-formatted end time for querying InfluxDB, else runs indefinitely for 1 measurement only", + "Description": "The RFC3339-formatted end time for querying InfluxDB, else runs indefinitely for 1 measurement only.", "Required": false }, { "Name": "INFLUXDB_QUERY_TIME_DELTA", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "Time interval for batching queries, e.g., '5m' for 5 minutes", + "Description": "Time interval for batching queries, e.g., '5m' for 5 minutes.", "Required": false }, { "Name": "INFLUXDB_QUERY_MAX_RETRIES", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "Maximum number of retries for querying or producing (with multiplicative backoff)", + "Description": "Maximum number of retries for querying or producing (with multiplicative backoff).", "Required": false }, { "Name": "INFLUXDB_QUERY_DELAY_SECONDS", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "Add an optional delay (in seconds) between producing batches", + "Description": "Add an optional delay (in seconds) between producing batches.", "Required": false }, { "Name": "CONSUMER_GROUP_NAME", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The name of the consumer group to use when consuming from Kafka", + "Description": "The name of the consumer group to use when consuming from Kafka.", "DefaultValue": "influxdb-sink", "Required": true }, @@ -135,7 +135,7 @@ "Name": "BUFFER_SIZE", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The number of records that sink holds before flush data to the InfluxDb", + "Description": "The number of records that sink holds before flush data to InfluxDb.", "DefaultValue": "1000", "Required": false }, @@ -143,7 +143,7 @@ "Name": "BUFFER_TIMEOUT", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The number of seconds that sink holds before flush data to the InfluxDb", + "Description": "The number of seconds that sink holds before flush data to the InfluxDb.", "DefaultValue": "1", "Required": false } diff --git a/python/sources/influxdb_3/requirements.txt b/python/sources/influxdb_3/requirements.txt index 17f65149c..1ac74f798 100644 --- a/python/sources/influxdb_3/requirements.txt +++ b/python/sources/influxdb_3/requirements.txt @@ -1,3 +1,4 @@ -quixstreams[influxdb3] +# TODO: remove "@ git+" version of install before merging PR +quixstreams[influxdb3] @ git+https://github.com/quixio/quix-streams.git@source/influxdb python-dotenv dateutil \ No newline at end of file From 5b29260e2b88bc9bc12b02f2baa3c5ad894fde26 Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Fri, 25 Apr 2025 01:36:06 -0400 Subject: [PATCH 3/9] add details for connecting --- python/sources/influxdb_3/README.md | 17 +++++++++++++++++ python/sources/influxdb_3/library.json | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/python/sources/influxdb_3/README.md b/python/sources/influxdb_3/README.md index 1c8e242c1..dfebf1b81 100644 --- a/python/sources/influxdb_3/README.md +++ b/python/sources/influxdb_3/README.md @@ -6,6 +6,21 @@ query InfluxDB3 and publish the results to a Kafka topic. To learn more about how it functions, [check out the underlying Quix Streams `InfluxDB3Source`](https://quix.io/docs/quix-streams/connectors/sources/influxdb3-source.html). + + +## Using with a Quix Cloud InfluxDB3 Service + +This deployment will work seamlessly with a [Quix Cloud InfluxDB3 service](https://github.com/quixio/quix-samples/tree/main/docker/influxdb_3). + +Simply provide the following arguments when setting up this connector: + +```shell +INFLUXDB_HOST="http://influxdb3:80" +INFLUXDB_ORG="" # required, but ignored +INFLUXDB_TOKEN="" # required, but ignored +``` + + ## How to run Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log-in and visit the `Connectors` tab to use this connector. @@ -17,6 +32,8 @@ Then either: * or click `Customise connector` to inspect or alter the code before deployment. + + ## Environment Variables The connector uses the following environment variables: diff --git a/python/sources/influxdb_3/library.json b/python/sources/influxdb_3/library.json index 89173ab74..d7b885186 100644 --- a/python/sources/influxdb_3/library.json +++ b/python/sources/influxdb_3/library.json @@ -26,7 +26,7 @@ "Type": "EnvironmentVariable", "InputType": "FreeText", "Description": "Host address for the InfluxDB instance.", - "DefaultValue": "eu-central-1-1.aws.cloud2.influxdata.com", + "DefaultValue": "http://influxdb3:80", "Required": true }, { From 4ce99edb50040d236753fd634e27e70141b91f0e Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Fri, 25 Apr 2025 01:42:22 -0400 Subject: [PATCH 4/9] adjust secret field --- python/sources/influxdb_3/library.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/sources/influxdb_3/library.json b/python/sources/influxdb_3/library.json index d7b885186..8871b850b 100644 --- a/python/sources/influxdb_3/library.json +++ b/python/sources/influxdb_3/library.json @@ -32,7 +32,7 @@ { "Name": "INFLUXDB_TOKEN", "Type": "EnvironmentVariable", - "InputType": "FreeText", + "InputType": "Secret", "Description": "Authentication token to access InfluxDB.", "DefaultValue": "", "Required": true From 3b4f220fa01f5a16259b4019da8de04bc55510ca Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Mon, 28 Apr 2025 11:24:17 -0400 Subject: [PATCH 5/9] fix requirement --- python/sources/influxdb_3/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/sources/influxdb_3/requirements.txt b/python/sources/influxdb_3/requirements.txt index 1ac74f798..fcb3c57f0 100644 --- a/python/sources/influxdb_3/requirements.txt +++ b/python/sources/influxdb_3/requirements.txt @@ -1,4 +1,4 @@ # TODO: remove "@ git+" version of install before merging PR quixstreams[influxdb3] @ git+https://github.com/quixio/quix-streams.git@source/influxdb python-dotenv -dateutil \ No newline at end of file +python-dateutil \ No newline at end of file From 81d0748374b9941befa52791c74c3873a90c59ba Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Mon, 28 Apr 2025 11:29:02 -0400 Subject: [PATCH 6/9] fix typing --- python/sources/influxdb_3/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/sources/influxdb_3/main.py b/python/sources/influxdb_3/main.py index 4500836b0..e8466f6be 100644 --- a/python/sources/influxdb_3/main.py +++ b/python/sources/influxdb_3/main.py @@ -1,6 +1,7 @@ from dateutil import parser import os import inspect +from typing import Any from quixstreams import Application from quixstreams.sources.community.influxdb3 import InfluxDB3Source From 5a27f7917e6aea54a668dcbe008b1a1c5dbbf8b2 Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Mon, 28 Apr 2025 11:38:51 -0400 Subject: [PATCH 7/9] fix bad default value --- python/sources/influxdb_3/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/sources/influxdb_3/main.py b/python/sources/influxdb_3/main.py index e8466f6be..9ffd1731a 100644 --- a/python/sources/influxdb_3/main.py +++ b/python/sources/influxdb_3/main.py @@ -62,8 +62,8 @@ def _end_date(): start_date=_start_date() or kwargs_defaults["start_date"], end_date=_end_date() or kwargs_defaults["end_date"], time_delta=os.getenv("INFLUXDB_QUERY_TIME_DELTA") or kwargs_defaults["time_delta"], - max_retries=int(retries) if (retries := os.getenv("INFLUXDB_QUERY_MAX_RETRIES")) is not None else kwargs_defaults["max_retries"], - delay=float(delay) if (delay := os.getenv("INFLUXDB_QUERY_DELAY_SECONDS")) is not None else kwargs_defaults["delay"], + max_retries=int(retries) if (retries := os.getenv("INFLUXDB_QUERY_MAX_RETRIES", '')) != '' else kwargs_defaults["max_retries"], + delay=float(delay) if (delay := os.getenv("INFLUXDB_QUERY_DELAY_SECONDS", '')) != '' else kwargs_defaults["delay"], ) # Create a Quix platform-specific application instead From 30498a882c96e06d44729140e7140f4d35579ebd Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Tue, 1 Jul 2025 03:53:24 -0400 Subject: [PATCH 8/9] update to latest quixstreams (tested) --- python/sources/influxdb_3/dockerfile | 7 ------- python/sources/influxdb_3/requirements.txt | 3 +-- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/python/sources/influxdb_3/dockerfile b/python/sources/influxdb_3/dockerfile index a7dbc6ee2..752b6e836 100644 --- a/python/sources/influxdb_3/dockerfile +++ b/python/sources/influxdb_3/dockerfile @@ -1,12 +1,5 @@ FROM python:3.12.5-slim-bookworm -# TODO: remove this RUN block when done doing "@ git+" install in requirements.txt -# This should be done BEFORE merging PR -RUN apt-get update && \ - apt-get install -y git && \ - apt-get clean && \ - rm -rf /var/lib/apt/lists/* - # Set environment variables for non-interactive setup and unbuffered output ENV DEBIAN_FRONTEND=noninteractive \ PYTHONUNBUFFERED=1 \ diff --git a/python/sources/influxdb_3/requirements.txt b/python/sources/influxdb_3/requirements.txt index fcb3c57f0..db213e73e 100644 --- a/python/sources/influxdb_3/requirements.txt +++ b/python/sources/influxdb_3/requirements.txt @@ -1,4 +1,3 @@ -# TODO: remove "@ git+" version of install before merging PR -quixstreams[influxdb3] @ git+https://github.com/quixio/quix-streams.git@source/influxdb +quixstreams[influxdb3]==3.17.0 python-dotenv python-dateutil \ No newline at end of file From 89b27dcf539f0ac72d7f05944ed2f94979f4aa5d Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Thu, 16 Oct 2025 16:18:14 -0400 Subject: [PATCH 9/9] rebase and small fixes --- python/sources/influxdb_3/library.json | 2 +- python/sources/influxdb_3/requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/sources/influxdb_3/library.json b/python/sources/influxdb_3/library.json index 8871b850b..d592769fd 100644 --- a/python/sources/influxdb_3/library.json +++ b/python/sources/influxdb_3/library.json @@ -140,7 +140,7 @@ "Required": false }, { - "Name": "BUFFER_TIMEOUT", + "Name": "BUFFER_DELAY", "Type": "EnvironmentVariable", "InputType": "FreeText", "Description": "The number of seconds that sink holds before flush data to the InfluxDb.", diff --git a/python/sources/influxdb_3/requirements.txt b/python/sources/influxdb_3/requirements.txt index db213e73e..bd989a0e8 100644 --- a/python/sources/influxdb_3/requirements.txt +++ b/python/sources/influxdb_3/requirements.txt @@ -1,3 +1,3 @@ -quixstreams[influxdb3]==3.17.0 +quixstreams[influxdb3]==3.23.1 python-dotenv python-dateutil \ No newline at end of file