Skip to content

Commit caa1722

Browse files
authored
feat: query timeout improvement (#265)
* feat: (WIP) add writeTimeout and queryTimeout to ClientConfig. Properties added. Next, apply in transports. * feat: (WIP) adding new writeTimeout and queryTimeout config properties. * chore: checkstyle cleanup * tests: set timeouts for timeout tests to shorter value for CircleCI * tests: improve test of cloned GrpcCallOptions object. * tests: improve timeout exceeded test. * docs: update javadoc comments and add timeout example. * docs: update CHANGELOG.md
1 parent 92efe4b commit caa1722

File tree

11 files changed

+675
-9
lines changed

11 files changed

+675
-9
lines changed

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,18 @@
22

33
### Features
44

5+
1. [#265](https://github.com/InfluxCommunity/influxdb3-java/pull/265) Add more precise timeout properties to `ClientConfig`.
6+
1. Current property `timeout` is deprecated, as it applies only to the Write API and can be confusing to some users.
7+
2. Two new properties are added, along with getters and similar setters in the `ClientConfig.Builder`.
8+
1. `writeTimeout` - a `java.time.Duration` that applies only to the Write API.
9+
2. `queryTimeout` - a `java.time.Duration` used to calculate deadlines when using the Query API.
10+
3. These properties can also be defined when creating a client using environment variables. Respectively:
11+
1. `INFLUX_WRITE_TIMEOUT` - a positive integer. The time unit is in seconds.
12+
2. `INFLUX_QUERY_TIMEOUT` - a positive integer. The time unit is in seconds.
13+
4. These properties can also be defined when creating a client using system properties. Respectively:
14+
1. `influx.writeTimeout` - a positive integer. The time unit is in seconds.
15+
2. `influx.queryTimeout` - a positive integer. The time unit is in seconds.
16+
517
### CI
618

719
1. [#266](https://github.com/InfluxCommunity/influxdb3-java/pull/266) Add tests for arm64 CircleCI.
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy
5+
* of this software and associated documentation files (the "Software"), to deal
6+
* in the Software without restriction, including without limitation the rights
7+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
* copies of the Software, and to permit persons to whom the Software is
9+
* furnished to do so, subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in
12+
* all copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
* THE SOFTWARE.
21+
*/
22+
package com.influxdb.v3;
23+
24+
import java.time.Duration;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.stream.Stream;
27+
28+
import com.influxdb.v3.client.InfluxDBClient;
29+
import com.influxdb.v3.client.PointValues;
30+
import com.influxdb.v3.client.config.ClientConfig;
31+
32+
/**
33+
* This example shows how to set universal timeouts for writes and queries.
34+
* <p>
35+
* The example depends on the "influxdb3-java" module and this module should be built first
36+
* by running "mvn install" in the root directory.
37+
*/
38+
public final class TimeoutsExample {
39+
40+
public static String resolveProperty(final String property, final String fallback) {
41+
return System.getProperty(property, System.getenv(property)) == null
42+
? fallback : System.getProperty(property, System.getenv(property));
43+
}
44+
45+
private TimeoutsExample() { }
46+
47+
public static void main(final String[] args) {
48+
// timeout to use for writes. Experiment with lower values to see timeout exceptions.
49+
Duration writeTimeout = Duration.ofMillis(5000L);
50+
// timeout to use for queries. Experiment with lower values to see timeout exceptions.
51+
Duration queryTimeout = Duration.ofMillis(5000L);
52+
53+
String host = resolveProperty("INFLUX_HOST", "http://localhost:8181");
54+
String token = resolveProperty("INFLUX_TOKEN", "my-token");
55+
String database = resolveProperty("INFLUX_DATABASE", "my-database");
56+
57+
String measurement = "timeout_example";
58+
59+
ClientConfig config = new ClientConfig.Builder()
60+
.host(host)
61+
.token(token.toCharArray())
62+
.database(database)
63+
.writeTimeout(writeTimeout) // set timeout to be used with the Write API
64+
.queryTimeout(queryTimeout) // set timeout to be used with the Query API
65+
.build();
66+
67+
try (InfluxDBClient client = InfluxDBClient.getInstance(config)) {
68+
client.writeRecord(String.format("%s,id=0001 temp=30.14,ticks=42i", measurement));
69+
70+
TimeUnit.SECONDS.sleep(1);
71+
String sql = String.format("SELECT * FROM %s ORDER BY time DESC", measurement);
72+
try (Stream<PointValues> values = client.queryPoints(sql)) {
73+
values.forEach(pv -> {
74+
String sv = measurement + ","
75+
+ " id: " + pv.getTag("id") + ","
76+
+ " fVal: " + pv.getFloatField("temp") + ","
77+
+ " iVal: " + pv.getIntegerField("ticks") + ","
78+
+ " " + pv.getTimestamp();
79+
System.out.println(sv);
80+
});
81+
}
82+
} catch (Exception e) {
83+
throw new RuntimeException(e);
84+
}
85+
}
86+
}

src/main/java/com/influxdb/v3/client/config/ClientConfig.java

Lines changed: 99 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,11 @@
5454
* <li><code>defaultTags</code> - defaultTags added when writing points to InfluxDB</li>
5555
* <li><code>gzipThreshold</code> - threshold when gzip compression is used for writing points to InfluxDB</li>
5656
* <li><code>writeNoSync</code> - skip waiting for WAL persistence on write</li>
57-
* <li><code>responseTimeout</code> - timeout when connecting to InfluxDB</li>
57+
* <li><code>timeout</code> - <i>deprecated in 1.4.0</i> timeout when connecting to InfluxDB,
58+
* please use more informative properties <code>writeTimeout</code> and <code>queryTimeout</code></li>
59+
* <li><code>writeTimeout</code> - timeout when writing data to InfluxDB</li>
60+
* <li><code>queryTimeout</code> - timeout used to calculate a default gRPC deadline when querying InfluxDB.
61+
* Can be <code>null</code>, in which case queries can potentially run forever.</li>
5862
* <li><code>allowHttpRedirects</code> - allow redirects for InfluxDB connections</li>
5963
* <li><code>disableServerCertificateValidation</code> -
6064
* disable server certificate validation for HTTPS connections
@@ -99,7 +103,10 @@ public final class ClientConfig {
99103
private final Integer gzipThreshold;
100104
private final Boolean writeNoSync;
101105
private final Map<String, String> defaultTags;
106+
@Deprecated
102107
private final Duration timeout;
108+
private final Duration writeTimeout;
109+
private final Duration queryTimeout;
103110
private final Boolean allowHttpRedirects;
104111
private final Boolean disableServerCertificateValidation;
105112
private final String proxyUrl;
@@ -203,15 +210,42 @@ public Map<String, String> getDefaultTags() {
203210
}
204211

205212
/**
206-
* Gets the default timeout to use for the API calls. Default to '10 seconds'.
213+
* Gets the default timeout to use for write API calls.
214+
* Defaults to '{@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT} seconds'.
215+
* <p>
216+
* Deprecated in v1.4.0. Please use more informative <code>getWriteTimeout()</code>.
207217
*
208-
* @return the default timeout to use for the API calls
218+
* @return the default timeout to use for write API calls
219+
* @see #getWriteTimeout()
209220
*/
210221
@Nonnull
222+
@Deprecated
211223
public Duration getTimeout() {
212224
return timeout;
213225
}
214226

227+
/**
228+
* Gets the default timeout to use for REST Write API calls. Default is
229+
* {@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT} seconds.
230+
*
231+
* @return the default timeout to use for REST Write API calls.
232+
*/
233+
@Nonnull
234+
public Duration getWriteTimeout() {
235+
return writeTimeout;
236+
}
237+
238+
/**
239+
* Gets the default timeout Duration to use for calculating a gRPC Deadline when making Query API calls.
240+
* Can be null, in which case queries can potentially wait or run forever.
241+
*
242+
* @return the default timeout Duration to use for Query API calls.
243+
*/
244+
@Nullable
245+
public Duration getQueryTimeout() {
246+
return queryTimeout;
247+
}
248+
215249
/**
216250
* Gets the automatically following HTTP 3xx redirects. Default to 'false'.
217251
*
@@ -312,6 +346,8 @@ public boolean equals(final Object o) {
312346
&& Objects.equals(writeNoSync, that.writeNoSync)
313347
&& Objects.equals(defaultTags, that.defaultTags)
314348
&& Objects.equals(timeout, that.timeout)
349+
&& Objects.equals(writeTimeout, that.writeTimeout)
350+
&& Objects.equals(queryTimeout, that.queryTimeout)
315351
&& Objects.equals(allowHttpRedirects, that.allowHttpRedirects)
316352
&& Objects.equals(disableServerCertificateValidation, that.disableServerCertificateValidation)
317353
&& Objects.equals(proxy, that.proxy)
@@ -325,7 +361,7 @@ public boolean equals(final Object o) {
325361
public int hashCode() {
326362
return Objects.hash(host, Arrays.hashCode(token), authScheme, organization,
327363
database, writePrecision, gzipThreshold, writeNoSync,
328-
timeout, allowHttpRedirects, disableServerCertificateValidation,
364+
timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation,
329365
proxy, proxyUrl, authenticator, headers,
330366
defaultTags, sslRootsFilePath);
331367
}
@@ -340,6 +376,8 @@ public String toString() {
340376
.add("gzipThreshold=" + gzipThreshold)
341377
.add("writeNoSync=" + writeNoSync)
342378
.add("timeout=" + timeout)
379+
.add("writeTimeout=" + writeTimeout)
380+
.add("queryTimeout=" + queryTimeout)
343381
.add("allowHttpRedirects=" + allowHttpRedirects)
344382
.add("disableServerCertificateValidation=" + disableServerCertificateValidation)
345383
.add("proxy=" + proxy)
@@ -366,7 +404,10 @@ public static final class Builder {
366404
private Integer gzipThreshold;
367405
private Boolean writeNoSync;
368406
private Map<String, String> defaultTags;
407+
@Deprecated
369408
private Duration timeout;
409+
private Duration writeTimeout;
410+
private Duration queryTimeout;
370411
private Boolean allowHttpRedirects;
371412
private Boolean disableServerCertificateValidation;
372413
private ProxySelector proxy;
@@ -496,18 +537,55 @@ public Builder defaultTags(@Nullable final Map<String, String> defaultTags) {
496537
}
497538

498539
/**
499-
* Sets the default timeout to use for the API calls. Default to '10 seconds'.
540+
* Sets the default timeout to use for Write API calls. Defaults to
541+
* '{@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT} seconds'.
542+
* <p>
543+
* Deprecated in v1.4.0. This setter is superseded by the clearer <code>writeTimeout()</code>.
500544
*
501-
* @param timeout default timeout to use for the API calls. Default to '10 seconds'.
545+
* @param timeout default timeout to use for Write API calls. Default to
546+
* ''{@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT} seconds'.
502547
* @return this
548+
*
549+
* @see #writeTimeout(Duration writeTimeout)
503550
*/
551+
@Deprecated
504552
@Nonnull
505553
public Builder timeout(@Nullable final Duration timeout) {
506554

507555
this.timeout = timeout;
508556
return this;
509557
}
510558

559+
/**
560+
* Sets the default writeTimeout to use for Write API calls in the REST client.
561+
* Default is {@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT}
562+
*
563+
* @param writeTimeout default timeout to use for REST API write calls. Default is
564+
* {@value com.influxdb.v3.client.write.WriteOptions#DEFAULT_WRITE_TIMEOUT}
565+
* @return - this
566+
*/
567+
@Nonnull
568+
public Builder writeTimeout(@Nullable final Duration writeTimeout) {
569+
570+
this.writeTimeout = writeTimeout;
571+
return this;
572+
}
573+
574+
/**
575+
* Sets standard query timeout used to calculate a gRPC deadline when making Query API calls.
576+
* If <code>null</code>, queries can potentially wait or run forever.
577+
*
578+
* @param queryTimeout default timeout used to calculate deadline for Query API calls.
579+
* If <code>null</code>, queries can potentially wait or run forever.
580+
* Default value is <code>null</code>.
581+
* @return this
582+
*/
583+
@Nonnull
584+
public Builder queryTimeout(@Nullable final Duration queryTimeout) {
585+
this.queryTimeout = queryTimeout;
586+
return this;
587+
}
588+
511589
/**
512590
* Sets the automatically following HTTP 3xx redirects. Default to 'false'.
513591
*
@@ -719,6 +797,16 @@ public ClientConfig build(@Nonnull final Map<String, String> env, final Properti
719797
if (writeNoSync != null) {
720798
this.writeNoSync(Boolean.parseBoolean(writeNoSync));
721799
}
800+
final String writeTimeout = get.apply("INFLUX_WRITE_TIMEOUT", "influx.writeTimeout");
801+
if (writeTimeout != null) {
802+
long to = Long.parseLong(writeTimeout);
803+
this.writeTimeout(Duration.ofSeconds(to));
804+
}
805+
final String queryTimeout = get.apply("INFLUX_QUERY_TIMEOUT", "influx.queryTimeout");
806+
if (queryTimeout != null) {
807+
long to = Long.parseLong(queryTimeout);
808+
this.queryTimeout(Duration.ofSeconds(to));
809+
}
722810

723811
return new ClientConfig(this);
724812
}
@@ -761,7 +849,11 @@ private ClientConfig(@Nonnull final Builder builder) {
761849
gzipThreshold = builder.gzipThreshold != null ? builder.gzipThreshold : WriteOptions.DEFAULT_GZIP_THRESHOLD;
762850
writeNoSync = builder.writeNoSync != null ? builder.writeNoSync : WriteOptions.DEFAULT_NO_SYNC;
763851
defaultTags = builder.defaultTags;
764-
timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(10);
852+
timeout = builder.timeout != null ? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT);
853+
writeTimeout = builder.writeTimeout != null
854+
? builder.writeTimeout : builder.timeout != null
855+
? builder.timeout : Duration.ofSeconds(WriteOptions.DEFAULT_WRITE_TIMEOUT);
856+
queryTimeout = builder.queryTimeout;
765857
allowHttpRedirects = builder.allowHttpRedirects != null ? builder.allowHttpRedirects : false;
766858
disableServerCertificateValidation = builder.disableServerCertificateValidation != null
767859
? builder.disableServerCertificateValidation : false;

src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,13 +288,46 @@ public Builder withMaxInboundMessageSize(@Nonnull final Integer maxInboundMessag
288288
return this;
289289
}
290290

291+
/**
292+
* Helper method to clone already existing gRPC options.
293+
*
294+
* @param grpcCallOptions = options to copy
295+
* @return this
296+
*/
297+
public Builder fromGrpcCallOptions(@Nonnull final GrpcCallOptions grpcCallOptions) {
298+
if (grpcCallOptions.getDeadline() != null) {
299+
this.deadline = grpcCallOptions.getDeadline();
300+
}
301+
if (grpcCallOptions.getExecutor() != null) {
302+
this.executor = grpcCallOptions.getExecutor();
303+
}
304+
if (grpcCallOptions.getCompressorName() != null) {
305+
this.compressorName = grpcCallOptions.getCompressorName();
306+
}
307+
if (grpcCallOptions.getWaitForReady() != null) {
308+
this.waitForReady = grpcCallOptions.getWaitForReady();
309+
}
310+
if (grpcCallOptions.getMaxInboundMessageSize() != null) {
311+
this.maxInboundMessageSize = grpcCallOptions.getMaxInboundMessageSize();
312+
}
313+
if (grpcCallOptions.getMaxOutboundMessageSize() != null) {
314+
this.maxOutboundMessageSize = grpcCallOptions.getMaxOutboundMessageSize();
315+
}
316+
return this;
317+
}
318+
291319
/**
292320
* Sets the maximum allowed message size acceptable sent to the remote peer.
321+
* <p>
322+
* Note: this property leads to grpc-java issue 12109 and can lead to the connection hanging indefinitely.
323+
* See (<a href="https://github.com/grpc/grpc-java/issues/12109">grpc-java 12109</a>)
293324
*
294325
* @param maxOutboundMessageSize The maximum message send size
295326
* @return this
296327
*/
297328
public Builder withMaxOutboundMessageSize(@Nonnull final Integer maxOutboundMessageSize) {
329+
// TODO remove warning about issue 12109 in javadoc above,
330+
// once 12109 is resolved and dependencies are updated.
298331
this.maxOutboundMessageSize = maxOutboundMessageSize;
299332
return this;
300333
}

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.util.Objects;
32+
import java.util.concurrent.TimeUnit;
3233
import java.util.logging.Logger;
3334
import java.util.stream.Collectors;
3435
import java.util.stream.IntStream;
@@ -37,6 +38,7 @@
3738
import javax.annotation.Nonnull;
3839
import javax.annotation.Nullable;
3940

41+
import io.grpc.Deadline;
4042
import io.netty.handler.codec.http.HttpMethod;
4143
import io.netty.handler.codec.http.HttpResponseStatus;
4244
import org.apache.arrow.flight.CallOption;
@@ -396,6 +398,13 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
396398
Arguments.checkNotNull(parameters, "parameters");
397399
Arguments.checkNotNull(options, "options");
398400

401+
if (options.grpcCallOptions().getDeadline() == null && config.getQueryTimeout() != null) {
402+
options.setGrpcCallOptions(new GrpcCallOptions.Builder()
403+
.fromGrpcCallOptions(options.grpcCallOptions())
404+
.withDeadline(Deadline.after(config.getQueryTimeout().toMillis(), TimeUnit.MILLISECONDS))
405+
.build());
406+
}
407+
399408
if (closed) {
400409
throw new IllegalStateException("InfluxDBClient has been closed.");
401410
}
@@ -413,6 +422,7 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
413422
});
414423

415424
CallOption[] callOptions = options.grpcCallOptions().getCallOptions();
425+
416426
return flightSqlClient.execute(
417427
query,
418428
database,

src/main/java/com/influxdb/v3/client/internal/RestClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void checkServerTrusted(
101101

102102
// timeout and redirects
103103
HttpClient.Builder builder = HttpClient.newBuilder()
104-
.connectTimeout(config.getTimeout())
104+
.connectTimeout(config.getWriteTimeout())
105105
.followRedirects(config.getAllowHttpRedirects()
106106
? HttpClient.Redirect.NORMAL : HttpClient.Redirect.NEVER);
107107

0 commit comments

Comments
 (0)