diff --git a/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java b/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java index c669ab7..17f5eef 100644 --- a/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java +++ b/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java @@ -5,9 +5,11 @@ import com.uid2.shared.Utils; import com.uid2.shared.optout.OptOutEntry; import com.uid2.shared.optout.OptOutUtils; +import io.micrometer.core.instrument.Metrics; import io.netty.handler.codec.http.HttpMethod; import io.vertx.core.Future; import io.vertx.core.Vertx; +import io.micrometer.core.instrument.Timer; import org.apache.http.client.utils.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,6 +18,7 @@ import java.net.URISyntaxException; import java.net.http.HttpRequest; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; public class OptOutPartnerEndpoint implements IOptOutPartnerEndpoint { @@ -30,9 +33,14 @@ public class OptOutPartnerEndpoint implements IOptOutPartnerEndpoint { private final EndpointConfig config; private final RetryingWebClient retryingClient; + private final Timer timer; public OptOutPartnerEndpoint(Vertx vertx, EndpointConfig config) { this.config = config; + this.timer = Timer.builder("uid2.optout.deltasend_successfulchunktime_ms") + .description("Timer for each HTTP connection that successfully transfers part of a delta to a partner") + .tag("remote_partner", this.name()) + .register(Metrics.globalRegistry); this.retryingClient = new RetryingWebClient(vertx, config.url(), config.method(), config.retryCount(), config.retryBackoffMs()); } @@ -43,6 +51,7 @@ public String name() { @Override public Future send(OptOutEntry entry) { + long startTimeMs = System.currentTimeMillis(); return this.retryingClient.send( (URI uri, HttpMethod method) -> { URIBuilder uriBuilder = new URIBuilder(uri); @@ -85,6 +94,8 @@ public Future send(OptOutEntry entry) { } if (SUCCESS_STATUS_CODES.contains(resp.statusCode())) { + long finishTimeMs = System.currentTimeMillis(); + timer.record(finishTimeMs - startTimeMs, TimeUnit.MILLISECONDS); return true; } diff --git a/src/main/java/com/uid2/optout/web/RetryingWebClient.java b/src/main/java/com/uid2/optout/web/RetryingWebClient.java index b69a3cf..d7dba0c 100644 --- a/src/main/java/com/uid2/optout/web/RetryingWebClient.java +++ b/src/main/java/com/uid2/optout/web/RetryingWebClient.java @@ -12,6 +12,7 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.BiFunction; @@ -19,15 +20,20 @@ public class RetryingWebClient { private static final Logger LOGGER = LoggerFactory.getLogger(RetryingWebClient.class); private final URI uri; private final HttpMethod method; + private final long resultTimeoutMs; private final int retryCount; private final int retryBackoffMs; private final HttpClient httpClient; private Vertx vertx; public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCount, int retryBackoffMs) { + this(vertx, uri, method, retryCount, retryBackoffMs, 5*60*1000); + } + public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCount, int retryBackoffMs, long resultTimeoutMs) { this.vertx = vertx; this.uri = URI.create(uri); this.method = method; + this.resultTimeoutMs = resultTimeoutMs; this.httpClient = HttpClient.newHttpClient(); this.retryCount = retryCount; @@ -42,7 +48,8 @@ public Future send(BiFunction requestCreator Promise promise = Promise.promise(); HttpRequest request = requestCreator.apply(this.uri, this.method); - CompletableFuture> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()); + CompletableFuture> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .orTimeout(this.resultTimeoutMs, TimeUnit.MILLISECONDS); asyncResponse.thenAccept(response -> { try { diff --git a/src/test/java/com/uid2/optout/web/RetryingWebClientTest.java b/src/test/java/com/uid2/optout/web/RetryingWebClientTest.java index 94c6695..aa87619 100644 --- a/src/test/java/com/uid2/optout/web/RetryingWebClientTest.java +++ b/src/test/java/com/uid2/optout/web/RetryingWebClientTest.java @@ -1,7 +1,11 @@ package com.uid2.optout.web; import io.netty.handler.codec.http.HttpMethod; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.After; @@ -11,8 +15,11 @@ import java.net.URI; import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.util.Random; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; @RunWith(VertxUnitRunner.class) public class RetryingWebClientTest { @@ -34,6 +41,13 @@ public void setup(TestContext ctx) { // pick a random code and respond with it int statusCode = Integer.valueOf(statusCodes[rand.nextInt(statusCodes.length)]); req.response().setStatusCode(statusCode).end(); + } else if (subPath.startsWith("delayed")) { + vertx.setTimer(1000, id -> { + try { + req.response().setStatusCode(200).end(); + } + catch (Exception ex) {} + }); } else { int statusCode = Integer.valueOf(subPath); req.response().setStatusCode(statusCode).end(); @@ -175,4 +189,36 @@ private void expectImmediateFailure_withNonRetryErrors(TestContext ctx, HttpMeth })); } } + + public Function assertStatusCodeFactory(TestContext ctx, int code) { + return result -> { + ctx.assertEquals(code, result.statusCode()); + return code == result.statusCode(); + }; + } + public Handler> ensureAsyncExceptionFactory(TestContext ctx, Class exceptionClass) { + return ctx.asyncAssertFailure(cause -> { + ctx.assertTrue(cause.getClass() == exceptionClass, "Expected a " + exceptionClass.toString() + " but got a " + cause); + }); + } + + @Test + public void longRequest_longerTimeout_expectSuccess(TestContext ctx) { + testDelayedResponse(ctx, assertStatusCodeFactory(ctx, 200), 1500) + .onComplete(ctx.asyncAssertSuccess()); + } + + @Test + public void longRequest_shorterTimeout_expectFailure(TestContext ctx) { + testDelayedResponse(ctx, req -> true, 500) + .onComplete(ensureAsyncExceptionFactory(ctx, TimeoutException.class)); + } + + private Future testDelayedResponse(TestContext ctx, Function assertion, int resultTimeoutMs) { + Async async = ctx.async(); + + RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/delayed", HttpMethod.GET, 0, 0, resultTimeoutMs); + return c.send((URI uri, HttpMethod method) -> HttpRequest.newBuilder().uri(uri).method(method.toString(), HttpRequest.BodyPublishers.noBody()).build(), assertion) + .andThen(r -> async.complete()); + } }