From 1d99afd964e9e9df0f7914d2f175d81846026fba Mon Sep 17 00:00:00 2001 From: Marc Neudert Date: Wed, 9 Nov 2022 18:36:18 +0100 Subject: [PATCH] Support configuring default connection query language --- CHANGELOG.md | 5 ++ lib/instream/connection/config.ex | 13 ++++ lib/instream/connection/query_runner_v1.ex | 14 ++--- lib/instream/connection/query_runner_v2.ex | 6 +- lib/instream/query/headers.ex | 2 +- lib/instream/query/url.ex | 57 ++++++++++------- test/influxdb_v1/connection_test.exs | 72 +++++++++++++++++++++ test/influxdb_v2/connection_test.exs | 73 ++++++++++++++++++++++ 8 files changed, 208 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bbf723..73dc963 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## v2.3.0-dev + +- Enhancements + - The default query language can be configured at the connection level. If no per-query option is given it will be used instead of the default for your `:version` configuration (defaults are `:flux` for `:v2` and `:influxql` for `:v1`) + ## v2.2.0 (2022-10-16) - Enhancements diff --git a/lib/instream/connection/config.ex b/lib/instream/connection/config.ex index e933c3c..0388f39 100644 --- a/lib/instream/connection/config.ex +++ b/lib/instream/connection/config.ex @@ -168,6 +168,19 @@ defmodule Instream.Connection.Config do `:query` (InfluxDB v1). If nothing or an invalid value is given the connection will be made using `:basic` authentication. + ## Query Language + + By default Instream uses the default query language based on the configured + connection version (`:influxql` for `:v1`, `:flux` for `:v2`). + + You can change this default in your configuration: + + config :my_app, MyConnection, + query_language: :influxql # or :flux + + This configuration will then be used if you are not passing a specific + `:query_language` option for your query. + ## Point Writer If you are using the regular line protocol writer `Instream.Writer.Line` diff --git a/lib/instream/connection/query_runner_v1.ex b/lib/instream/connection/query_runner_v1.ex index 2a36442..e1a0a5a 100644 --- a/lib/instream/connection/query_runner_v1.ex +++ b/lib/instream/connection/query_runner_v1.ex @@ -63,8 +63,8 @@ defmodule Instream.Connection.QueryRunnerV1 do headers = Headers.assemble(config, opts) http_opts = http_opts(config, opts) - body = read_body(query, opts) - method = read_method(opts) + body = read_body(query, config, opts) + method = read_method(config, opts) url = read_url(conn, query, opts) {query_time, response} = @@ -208,15 +208,15 @@ defmodule Instream.Connection.QueryRunnerV1 do defp log(_, _), do: :ok - defp read_body(query, opts) do - case opts[:query_language] do + defp read_body(query, config, opts) do + case opts[:query_language] || config[:query_language] do :flux -> query _ -> "" end end - defp read_method(opts) do - case opts[:query_language] do + defp read_method(config, opts) do + case opts[:query_language] || config[:query_language] do :flux -> :post _ -> opts[:method] || :get end @@ -226,7 +226,7 @@ defmodule Instream.Connection.QueryRunnerV1 do config = conn.config() url = URL.query(config, opts) - case opts[:query_language] do + case opts[:query_language] || config[:query_language] do :flux -> url diff --git a/lib/instream/connection/query_runner_v2.ex b/lib/instream/connection/query_runner_v2.ex index 742de74..52a100d 100644 --- a/lib/instream/connection/query_runner_v2.ex +++ b/lib/instream/connection/query_runner_v2.ex @@ -206,7 +206,9 @@ defmodule Instream.Connection.QueryRunnerV2 do defp log(_, _), do: :ok defp read_body(conn, query, opts) do - case opts[:query_language] do + config = conn.config() + + case opts[:query_language] || config[:query_language] do :influxql -> "" @@ -228,7 +230,7 @@ defmodule Instream.Connection.QueryRunnerV2 do config = conn.config() url = URL.query(config, opts) - case opts[:query_language] do + case opts[:query_language] || config[:query_language] do :influxql -> case opts[:params] do params when is_map(params) -> diff --git a/lib/instream/query/headers.ex b/lib/instream/query/headers.ex index af509d6..dd1261b 100644 --- a/lib/instream/query/headers.ex +++ b/lib/instream/query/headers.ex @@ -8,7 +8,7 @@ defmodule Instream.Query.Headers do def assemble(config, options \\ []) do assemble_auth(config[:auth]) ++ assemble_encoding(options[:result_as]) ++ - assemble_language(config[:version], options[:query_language]) + assemble_language(config[:version], options[:query_language] || config[:query_language]) end @doc """ diff --git a/lib/instream/query/url.ex b/lib/instream/query/url.ex index 25adb44..6700e51 100644 --- a/lib/instream/query/url.ex +++ b/lib/instream/query/url.ex @@ -24,30 +24,12 @@ defmodule Instream.Query.URL do """ @spec query(Keyword.t(), Keyword.t()) :: String.t() def query(config, opts) do - case {config[:version], opts[:query_language]} do - {:v2, :influxql} -> - config - |> url("query") - |> append_param("db", opts[:database] || config[:database]) - |> append_param("epoch", encode_precision(opts[:precision])) - - {:v2, _} -> - config - |> url("api/v2/query") - |> append_param("org", opts[:org] || config[:org]) - - {:v1, :flux} -> - config - |> url("api/v2/query") - |> append_param("db", opts[:database] || config[:database]) - |> append_param("epoch", encode_precision(opts[:precision])) - - {:v1, _} -> - config - |> url("query") - |> append_param("db", opts[:database] || config[:database]) - |> append_param("epoch", encode_precision(opts[:precision])) - end + query_url( + config[:version], + opts[:query_language] || config[:query_language], + config, + opts + ) end @doc """ @@ -113,6 +95,33 @@ defmodule Instream.Query.URL do defp encode_precision(:nanosecond), do: "ns" defp encode_precision(_), do: "" + defp query_url(:v2, :influxql, config, opts) do + config + |> url("query") + |> append_param("db", opts[:database] || config[:database]) + |> append_param("epoch", encode_precision(opts[:precision])) + end + + defp query_url(:v2, _, config, opts) do + config + |> url("api/v2/query") + |> append_param("org", opts[:org] || config[:org]) + end + + defp query_url(:v1, :flux, config, opts) do + config + |> url("api/v2/query") + |> append_param("db", opts[:database] || config[:database]) + |> append_param("epoch", encode_precision(opts[:precision])) + end + + defp query_url(:v1, _, config, opts) do + config + |> url("query") + |> append_param("db", opts[:database] || config[:database]) + |> append_param("epoch", encode_precision(opts[:precision])) + end + defp url(config, endpoint) do url = [ diff --git a/test/influxdb_v1/connection_test.exs b/test/influxdb_v1/connection_test.exs index 12b06e3..31114ae 100644 --- a/test/influxdb_v1/connection_test.exs +++ b/test/influxdb_v1/connection_test.exs @@ -19,6 +19,26 @@ defmodule Instream.InfluxDBv1.ConnectionTest do ] end + defmodule QueryLanguageConnection do + use Instream.Connection, + otp_app: :instream, + config: [ + init: {__MODULE__.Initializer, :init} + ] + + defmodule Initializer do + def init(conn) do + config = + Keyword.merge( + Application.get_env(:instream, TestConnection), + query_language: :flux + ) + + Application.put_env(:instream, conn, config) + end + end + end + defmodule TestSeries do use Instream.Series @@ -160,4 +180,56 @@ defmodule Instream.InfluxDBv1.ConnectionTest do assert @tags == values_tags assert 0 < length(value_rows) end + + describe "query language from connection config" do + test "use default from config" do + start_supervised!(QueryLanguageConnection) + + measurement = "default_query_language_config" + + :ok = + QueryLanguageConnection.write([ + %{ + measurement: measurement, + fields: %{value: 42} + } + ]) + + result = + QueryLanguageConnection.query(""" + from(bucket:"test_database/autogen") + |> range(start: -5m) + |> filter(fn: (r) => r._measurement == "#{measurement}") + |> last() + """) + + assert [ + %{ + "_field" => "value", + "_measurement" => ^measurement, + "_value" => 42, + "result" => "_result" + } + ] = result + end + + test "override default config" do + start_supervised!(QueryLanguageConnection) + + measurement = "default_query_language_override" + + :ok = + QueryLanguageConnection.write([ + %{ + measurement: measurement, + fields: %{value: 42} + } + ]) + + assert %{results: [%{series: [%{name: ^measurement, values: [[_, 42]]}]}]} = + QueryLanguageConnection.query("SELECT LAST(value) FROM #{measurement}", + query_language: :influxql + ) + end + end end diff --git a/test/influxdb_v2/connection_test.exs b/test/influxdb_v2/connection_test.exs index 5225f30..4a3c56c 100644 --- a/test/influxdb_v2/connection_test.exs +++ b/test/influxdb_v2/connection_test.exs @@ -19,6 +19,26 @@ defmodule Instream.InfluxDBv2.ConnectionTest do ] end + defmodule QueryLanguageConnection do + use Instream.Connection, + otp_app: :instream, + config: [ + init: {__MODULE__.Initializer, :init} + ] + + defmodule Initializer do + def init(conn) do + config = + Keyword.merge( + Application.get_env(:instream, TestConnection), + query_language: :influxql + ) + + Application.put_env(:instream, conn, config) + end + end + end + defmodule TestSeries do use Instream.Series @@ -167,4 +187,57 @@ defmodule Instream.InfluxDBv2.ConnectionTest do assert %{results: [%{series: [%{name: "params", values: [[_, ^test_field]]}]}]} = TestConnection.query(query, query_language: :influxql, params: params) end + + describe "query language from connection config" do + test "use default from config" do + start_supervised!(QueryLanguageConnection) + + measurement = "default_query_language_config" + + :ok = + QueryLanguageConnection.write([ + %{ + measurement: measurement, + fields: %{value: 42} + } + ]) + + assert %{results: [%{series: [%{name: ^measurement, values: [[_, 42]]}]}]} = + QueryLanguageConnection.query("SELECT LAST(value) FROM #{measurement}") + end + + test "override default config" do + start_supervised!(QueryLanguageConnection) + + measurement = "default_query_language_override" + + :ok = + QueryLanguageConnection.write([ + %{ + measurement: measurement, + fields: %{value: 42} + } + ]) + + result = + QueryLanguageConnection.query( + """ + from(bucket:"#{QueryLanguageConnection.config(:bucket)}") + |> range(start: -5m) + |> filter(fn: (r) => r._measurement == "#{measurement}") + |> last() + """, + query_language: :flux + ) + + assert [ + %{ + "_field" => "value", + "_measurement" => ^measurement, + "_value" => 42, + "result" => "_result" + } + ] = result + end + end end