diff --git a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java index 91151166b62..2b3e039e2b7 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java @@ -365,5 +365,10 @@ public final class ConfigDefaults { "$.Credentials.SessionToken", "$.InventoryConfigurationList[*].Destination.S3BucketDestination.Encryption.SSEKMS.KeyId"); + static final boolean DEFAULT_TRACE_AGENT_RETRY_ENABLED = true; + static final int DEFAULT_TRACE_AGENT_RETRY_QUEUE_SIZE = 5000; + static final long DEFAULT_TRACE_AGENT_RETRY_BACKOFF_INITIAL_MS = 1000L; + static final long DEFAULT_TRACE_AGENT_RETRY_BACKOFF_MAX_MS = 60000L; + private ConfigDefaults() {} } diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java index bc57f96612a..4edb870b9fb 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java @@ -165,5 +165,11 @@ public final class TracerConfig { "trace.cloud.payload.tagging.max-tags"; public static final String TRACE_SERVICE_DISCOVERY_ENABLED = "trace.service.discovery.enabled"; + public static final String TRACE_AGENT_RETRY_ENABLED = "trace.agent.retry.enabled"; + public static final String TRACE_AGENT_RETRY_QUEUE_SIZE = "trace.agent.retry.queue.size"; + public static final String TRACE_AGENT_RETRY_BACKOFF_INITIAL_MS = + "trace.agent.retry.backoff.initial.ms"; + public static final String TRACE_AGENT_RETRY_BACKOFF_MAX_MS = "trace.agent.retry.backoff.max.ms"; + private TracerConfig() {} } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/PayloadDispatcherImpl.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/PayloadDispatcherImpl.java index a0011216770..0f9cd3eb901 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/PayloadDispatcherImpl.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/PayloadDispatcherImpl.java @@ -6,6 +6,7 @@ import datadog.communication.serialization.FlushingBuffer; import datadog.communication.serialization.WritableFormatter; import datadog.communication.serialization.msgpack.MsgPackWriter; +import datadog.trace.api.Config; import datadog.trace.core.CoreSpan; import datadog.trace.core.monitor.HealthMetrics; import java.nio.ByteBuffer; @@ -24,6 +25,7 @@ public class PayloadDispatcherImpl implements ByteBufferConsumer, PayloadDispatc private final RemoteMapperDiscovery mapperDiscovery; private final HealthMetrics healthMetrics; private final Monitoring monitoring; + private final TraceRetryManager retryManager; private Recording batchTimer; private RemoteMapper mapper; @@ -41,6 +43,11 @@ public PayloadDispatcherImpl( this.api = api; this.healthMetrics = healthMetrics; this.monitoring = monitoring; + + // Initialize retry manager + Config config = Config.get(); + this.retryManager = new TraceRetryManager(api, config); + this.retryManager.start(); } @Override @@ -109,6 +116,14 @@ public void accept(int messageCount, ByteBuffer buffer) { healthMetrics.onSerialize(sizeInBytes); RemoteApi.Response response = api.sendSerializedTraces(payload); mapper.reset(); + + // Check if we should retry on 429 + if (retryManager != null && response.retryable()) { + // Agent returned 429 (backpressure), enqueue for retry + retryManager.enqueue(payload, 0); // 0 indicates first 
retry attempt + return; + } + if (response.success()) { if (log.isDebugEnabled()) { log.debug("Successfully sent {} traces to the API", messageCount); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/RemoteApi.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/RemoteApi.java index d2f3290700b..d99e38589df 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/RemoteApi.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/RemoteApi.java @@ -107,46 +107,64 @@ protected static String getResponseBody(okhttp3.Response response) { public static final class Response { /** Factory method for a successful request with a trivial response body */ public static Response success(final int status) { - return new Response(true, status, null, null); + return new Response(true, false, status, null, null, null); } /** Factory method for a successful request with a trivial response body */ public static Response success(final int status, String response) { - return new Response(true, status, null, response); + return new Response(true, false, status, null, response, null); } /** Factory method for a successful request will a malformed response body */ public static Response success(final int status, final Throwable exception) { - return new Response(true, status, exception, null); + return new Response(true, false, status, exception, null, null); } /** Factory method for a request that receive an error status in response */ public static Response failed(final int status) { - return new Response(false, status, null, null); + return new Response(false, false, status, null, null, null); } /** Factory method for a failed communication attempt */ public static Response failed(final Throwable exception) { - return new Response(false, null, exception, null); + return new Response(false, false, null, exception, null, null); + } + + /** Factory method for a 429 response that should be retried */ + public static Response retryable(final int status, final Payload payload) { + return new Response(false, true, status, null, null, payload); } private final boolean success; + private final boolean retryable; private final Integer status; private final Throwable exception; private final String response; + private final Payload payload; private Response( - final boolean success, final Integer status, final Throwable exception, String response) { + final boolean success, + final boolean retryable, + final Integer status, + final Throwable exception, + String response, + Payload payload) { this.success = success; + this.retryable = retryable; this.status = status; this.exception = exception; this.response = response; + this.payload = payload; } public boolean success() { return success; } + public boolean retryable() { + return retryable; + } + public OptionalInt status() { return status == null ? 
OptionalInt.empty() : OptionalInt.of(status); } @@ -158,5 +176,9 @@ public Optional exception() { public String response() { return response; } + + public Payload payload() { + return payload; + } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceRetryManager.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceRetryManager.java new file mode 100644 index 00000000000..508ea4759e0 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceRetryManager.java @@ -0,0 +1,214 @@ +package datadog.trace.common.writer; + +import datadog.trace.api.Config; +import java.io.Closeable; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages retry of trace payloads that receive a 429 (Too Many Requests) response from the agent. + * Uses a bounded queue and exponential backoff strategy. + */ +public class TraceRetryManager implements Closeable { + private static final Logger log = LoggerFactory.getLogger(TraceRetryManager.class); + + private final BlockingQueue retryQueue; + private final RemoteApi api; + private final Config config; + private final Thread retryWorker; + private volatile boolean running; + + // Telemetry counters + private final AtomicLong retriesEnqueued = new AtomicLong(0); + private final AtomicLong retriesDroppedQueueFull = new AtomicLong(0); + private final AtomicLong retriesResubmitted = new AtomicLong(0); + private final AtomicLong retriesExhausted = new AtomicLong(0); + private final AtomicLong retries429Count = new AtomicLong(0); + + public TraceRetryManager(RemoteApi api, Config config) { + this.api = api; + this.config = config; + this.retryQueue = new LinkedBlockingQueue<>(config.getRetryQueueSize()); + this.retryWorker = new Thread(this::processRetries, "dd-trace-retry-worker"); + this.retryWorker.setDaemon(true); + this.running = false; + } + + /** Start the retry worker thread */ + public void start() { + if (!running) { + running = true; + retryWorker.start(); + log.debug("TraceRetryManager started with queue size {}", config.getRetryQueueSize()); + } + } + + /** + * Enqueue a payload for retry with exponential backoff + * + * @param payload the payload to retry + * @param attemptCount the number of previous attempts (0 for first retry) + */ + public void enqueue(Payload payload, int attemptCount) { + long backoffMs = calculateBackoff(attemptCount); + long retryAfterMs = System.currentTimeMillis() + backoffMs; + RetryEntry entry = new RetryEntry(payload, retryAfterMs, attemptCount); + + if (!retryQueue.offer(entry)) { + retriesDroppedQueueFull.incrementAndGet(); + log.warn( + "Retry queue full (size={}), dropping payload with {} traces", + config.getRetryQueueSize(), + payload.traceCount()); + } else { + retriesEnqueued.incrementAndGet(); + log.debug( + "Enqueued payload for retry with backoff {}ms (attempt {}, {} traces)", + backoffMs, + attemptCount + 1, + payload.traceCount()); + } + } + + /** + * Calculate exponential backoff delay + * + * @param attemptCount the number of previous attempts + * @return backoff delay in milliseconds + */ + private long calculateBackoff(int attemptCount) { + long backoff = config.getRetryBackoffInitialMs() * (1L << attemptCount); + return Math.min(backoff, config.getRetryBackoffMaxMs()); + } + + /** Worker loop that drains the retry queue and resubmits payloads */ + private 
void processRetries() { + log.debug("Retry worker thread started"); + while (running) { + try { + RetryEntry entry = retryQueue.poll(1, TimeUnit.SECONDS); + if (entry != null) { + long now = System.currentTimeMillis(); + if (now < entry.retryAfterMs) { + // Not ready yet, sleep and re-enqueue + long sleepMs = entry.retryAfterMs - now; + Thread.sleep(sleepMs); + } + resubmit(entry); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.debug("Retry worker interrupted"); + break; + } + } + log.debug("Retry worker thread stopped"); + } + + /** + * Resubmit a payload from the retry queue + * + * @param entry the retry entry to resubmit + */ + private void resubmit(RetryEntry entry) { + try { + RemoteApi.Response response = api.sendSerializedTraces(entry.payload); + + if (response.retryable()) { + // Still getting 429, re-enqueue with increased backoff + retries429Count.incrementAndGet(); + log.debug( + "Retry received another 429 for {} traces (attempt {})", + entry.payload.traceCount(), + entry.attemptCount + 1); + enqueue(entry.payload, entry.attemptCount + 1); + } else if (response.success()) { + // Success! + retriesResubmitted.incrementAndGet(); + log.debug( + "Retry successful for {} traces after {} attempts", + entry.payload.traceCount(), + entry.attemptCount + 1); + } else { + // Other error, drop and log + retriesExhausted.incrementAndGet(); + log.warn( + "Retry exhausted for {} traces with status {} after {} attempts", + entry.payload.traceCount(), + response.status().isPresent() ? response.status().getAsInt() : "unknown", + entry.attemptCount + 1); + } + } catch (Exception e) { + retriesExhausted.incrementAndGet(); + log.warn( + "Exception during retry of {} traces after {} attempts", + entry.payload.traceCount(), + entry.attemptCount + 1, + e); + } + } + + @Override + public void close() { + log.debug("Shutting down TraceRetryManager"); + running = false; + retryWorker.interrupt(); + try { + retryWorker.join(5000); // Wait up to 5 seconds for graceful shutdown + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Interrupted while waiting for retry worker to stop"); + } + + // Log final telemetry + log.info( + "TraceRetryManager closed. 
Stats: enqueued={}, resubmitted={}, dropped_queue_full={}, exhausted={}, http_429={}", + retriesEnqueued.get(), + retriesResubmitted.get(), + retriesDroppedQueueFull.get(), + retriesExhausted.get(), + retries429Count.get()); + } + + // Telemetry accessors for testing and monitoring + public long getRetriesEnqueued() { + return retriesEnqueued.get(); + } + + public long getRetriesDroppedQueueFull() { + return retriesDroppedQueueFull.get(); + } + + public long getRetriesResubmitted() { + return retriesResubmitted.get(); + } + + public long getRetriesExhausted() { + return retriesExhausted.get(); + } + + public long getRetries429Count() { + return retries429Count.get(); + } + + public int getQueueSize() { + return retryQueue.size(); + } + + /** Internal class to hold retry state for a payload */ + private static class RetryEntry { + final Payload payload; + final long retryAfterMs; + final int attemptCount; + + RetryEntry(Payload payload, long retryAfterMs, int attemptCount) { + this.payload = payload; + this.retryAfterMs = retryAfterMs; + this.attemptCount = attemptCount; + } + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java index 645bbc4b9e9..70342910765 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java @@ -37,11 +37,19 @@ public class DDAgentApi extends RemoteApi { // rather it identifies this tracer as one which has computed top level status private static final String DATADOG_CLIENT_COMPUTED_TOP_LEVEL = "Datadog-Client-Computed-Top-Level"; + private static final String DATADOG_SEND_REAL_HTTP_STATUS = "Datadog-Send-Real-Http-Status"; private static final String X_DATADOG_TRACE_COUNT = "X-Datadog-Trace-Count"; private static final String DATADOG_DROPPED_TRACE_COUNT = "Datadog-Client-Dropped-P0-Traces"; private static final String DATADOG_DROPPED_SPAN_COUNT = "Datadog-Client-Dropped-P0-Spans"; private static final String DATADOG_AGENT_STATE = "Datadog-Agent-State"; + /** Result of submitting traces to the agent, distinguishing 429 for retry */ + public enum SubmitResult { + OK, + RETRY_429, + ERROR + } + private final List responseListeners = new ArrayList<>(); private final boolean metricsEnabled; @@ -104,6 +112,7 @@ public Response sendSerializedTraces(final Payload payload) { try { final Request request = prepareRequest(tracesUrl, headers) + .addHeader(DATADOG_SEND_REAL_HTTP_STATUS, "true") .addHeader(X_DATADOG_TRACE_COUNT, Integer.toString(payload.traceCount())) .addHeader(DATADOG_DROPPED_TRACE_COUNT, Long.toString(payload.droppedTraces())) .addHeader(DATADOG_DROPPED_SPAN_COUNT, Long.toString(payload.droppedSpans())) @@ -122,6 +131,12 @@ public Response sendSerializedTraces(final Payload payload) { try (final Recording recording = sendPayloadTimer.start(); final okhttp3.Response response = httpClient.newCall(request).execute()) { handleAgentChange(response.header(DATADOG_AGENT_STATE)); + if (response.code() == 429) { + // Agent is overloaded, return retryable response + log.debug( + "Trace agent returned 429 (Too Many Requests) for {} traces", payload.traceCount()); + return Response.retryable(response.code(), payload); + } if (response.code() != 200) { agentErrorCounter.incrementErrorCount(response.message(), payload.traceCount()); countAndLogFailedSend(payload.traceCount(), sizeInBytes, response, null); @@ -158,6 +173,23 @@ 
private void handleAgentChange(String state) { } } + /** + * Classifies a Response to determine if it should be retried, succeeded, or dropped. + * + * @param response the response from sendSerializedTraces + * @return OK if 200, RETRY_429 if 429, ERROR otherwise + */ + public SubmitResult classifyResponse(Response response) { + if (!response.success()) { + // Check if it's a 429 status that should be retried + if (response.status().isPresent() && response.status().getAsInt() == 429) { + return SubmitResult.RETRY_429; + } + return SubmitResult.ERROR; + } + return SubmitResult.OK; + } + @Override protected Logger getLogger() { return log; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/RetryEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/RetryEntry.java new file mode 100644 index 00000000000..e3a534d8ddb --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/RetryEntry.java @@ -0,0 +1,16 @@ +package datadog.trace.common.writer.ddagent; + +import datadog.trace.common.writer.Payload; + +/** Represents a payload queued for retry with backoff metadata */ +class RetryEntry { + final Payload payload; + final long retryAfterMs; + final int attemptCount; + + RetryEntry(Payload payload, long retryAfterMs, int attemptCount) { + this.payload = payload; + this.retryAfterMs = retryAfterMs; + this.attemptCount = attemptCount; + } +} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentApiTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentApiTest.groovy index 3342a975969..57f01867075 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentApiTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentApiTest.groovy @@ -448,6 +448,126 @@ class DDAgentApiTest extends DDCoreSpecification { } } + def "opt-in header is sent on all requests"() { + setup: + def headerReceived = null + def agent = httpServer { + handlers { + put("v0.4/traces") { + headerReceived = request.headers.get("Datadog-Send-Real-Http-Status") + response.status(200).send() + } + } + } + def client = createAgentApi(agent.address.toString())[1] + def payload = prepareTraces("v0.4/traces", []) + + when: + client.sendSerializedTraces(payload) + + then: + headerReceived == "true" + + cleanup: + agent.close() + } + + def "429 response is classified as RETRY_429"() { + setup: + def agent = httpServer { + handlers { + put("v0.3/traces") { + response.status(429).send() + } + put("v0.4/traces") { + response.status(429).send() + } + put("v0.5/traces") { + response.status(429).send() + } + } + } + def client = createAgentApi(agent.address.toString())[1] + def payload = prepareTraces("v0.4/traces", []) + + when: + def response = client.sendSerializedTraces(payload) + def result = client.classifyResponse(response) + + then: + !response.success() + response.status().asInt == 429 + result == DDAgentApi.SubmitResult.RETRY_429 + + cleanup: + agent.close() + } + + def "200 response is classified as OK"() { + setup: + def agent = httpServer { + handlers { + put("v0.3/traces") { + response.status(200).send() + } + put("v0.4/traces") { + response.status(200).send() + } + put("v0.5/traces") { + response.status(200).send() + } + } + } + def client = createAgentApi(agent.address.toString())[1] + def payload = prepareTraces("v0.4/traces", []) + + when: + def response = client.sendSerializedTraces(payload) + def result = client.classifyResponse(response) + + then: + response.success() 
+ response.status().asInt == 200 + result == DDAgentApi.SubmitResult.OK + + cleanup: + agent.close() + } + + def "non-429 error response is classified as ERROR"() { + setup: + def agent = httpServer { + handlers { + put("v0.3/traces") { + response.status(errorCode).send() + } + put("v0.4/traces") { + response.status(errorCode).send() + } + put("v0.5/traces") { + response.status(errorCode).send() + } + } + } + def client = createAgentApi(agent.address.toString())[1] + def payload = prepareTraces("v0.4/traces", []) + + when: + def response = client.sendSerializedTraces(payload) + def result = client.classifyResponse(response) + + then: + !response.success() + response.status().asInt == errorCode + result == DDAgentApi.SubmitResult.ERROR + + cleanup: + agent.close() + + where: + errorCode << [400, 404, 500, 503] + } + def createAgentApi(String url) { HttpUrl agentUrl = HttpUrl.get(url) OkHttpClient client = OkHttpUtils.buildHttpClient(agentUrl, 1000) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceRetryManagerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceRetryManagerTest.groovy new file mode 100644 index 00000000000..2e7d4a8c790 --- /dev/null +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceRetryManagerTest.groovy @@ -0,0 +1,183 @@ +package datadog.trace.common.writer + +import datadog.trace.api.Config +import datadog.trace.core.test.DDCoreSpecification +import spock.lang.Timeout + +import java.util.concurrent.atomic.AtomicInteger + +@Timeout(10) +class TraceRetryManagerTest extends DDCoreSpecification { + + def "enqueuing payload for retry increments counter"() { + setup: + def mockApi = Mock(RemoteApi) + def mockConfig = Mock(Config) { + getRetryQueueSize() >> 10 + getRetryBackoffInitialMs() >> 100L + getRetryBackoffMaxMs() >> 1000L + } + def retryManager = new TraceRetryManager(mockApi, mockConfig) + def payload = Mock(Payload) { + traceCount() >> 5 + } + + when: + retryManager.enqueue(payload, 0) + + then: + retryManager.retriesEnqueued == 1 + retryManager.queueSize == 1 + } + + def "full queue drops payloads and increments dropped counter"() { + setup: + def mockApi = Mock(RemoteApi) + def mockConfig = Mock(Config) { + getRetryQueueSize() >> 2 + getRetryBackoffInitialMs() >> 100L + getRetryBackoffMaxMs() >> 1000L + } + def retryManager = new TraceRetryManager(mockApi, mockConfig) + def payload1 = Mock(Payload) { traceCount() >> 1 } + def payload2 = Mock(Payload) { traceCount() >> 2 } + def payload3 = Mock(Payload) { traceCount() >> 3 } + + when: + retryManager.enqueue(payload1, 0) + retryManager.enqueue(payload2, 0) + retryManager.enqueue(payload3, 0) + + then: + retryManager.retriesEnqueued == 2 + retryManager.retriesDroppedQueueFull == 1 + retryManager.queueSize == 2 + } + + def "successful retry increments success counter"() { + setup: + def mockApi = Mock(RemoteApi) + def successResponse = RemoteApi.Response.success(200) + + mockApi.sendSerializedTraces(_) >> successResponse + + def mockConfig = Mock(Config) { + getRetryQueueSize() >> 10 + getRetryBackoffInitialMs() >> 50L + getRetryBackoffMaxMs() >> 1000L + } + def retryManager = new TraceRetryManager(mockApi, mockConfig) + def payload = Mock(Payload) { + traceCount() >> 5 + } + + retryManager.start() + + when: + retryManager.enqueue(payload, 0) + // Wait for retry to process + Thread.sleep(200) + + then: + retryManager.retriesResubmitted >= 1 + + cleanup: + retryManager.close() + } + + def "429 response triggers exponential backoff and re-enqueue"() { + 
setup: + def mockApi = Mock(RemoteApi) + def mockPayload = Mock(Payload) { + traceCount() >> 5 + } + def response429 = RemoteApi.Response.retryable(429, mockPayload) + def successResponse = RemoteApi.Response.success(200) + def callCount = new AtomicInteger(0) + + // First call returns 429, second call returns success + mockApi.sendSerializedTraces(_) >> { args -> + callCount.incrementAndGet() == 1 ? response429 : successResponse + } + + def mockConfig = Mock(Config) { + getRetryQueueSize() >> 10 + getRetryBackoffInitialMs() >> 50L + getRetryBackoffMaxMs() >> 200L + } + def retryManager = new TraceRetryManager(mockApi, mockConfig) + + retryManager.start() + + when: + retryManager.enqueue(mockPayload, 0) + // Wait for initial retry + re-queue + second retry + Thread.sleep(500) + + then: + callCount.get() >= 2 + retryManager.retries429Count >= 1 + retryManager.retriesResubmitted >= 1 + + cleanup: + retryManager.close() + } + + def "non-429 error drops payload and does not re-enqueue"() { + setup: + def mockApi = Mock(RemoteApi) + def errorResponse = RemoteApi.Response.failed(500) + def callCount = new AtomicInteger(0) + + mockApi.sendSerializedTraces(_) >> { + callCount.incrementAndGet() + return errorResponse + } + + def mockConfig = Mock(Config) { + getRetryQueueSize() >> 10 + getRetryBackoffInitialMs() >> 50L + getRetryBackoffMaxMs() >> 200L + } + def retryManager = new TraceRetryManager(mockApi, mockConfig) + def payload = Mock(Payload) { + traceCount() >> 5 + } + + retryManager.start() + + when: + retryManager.enqueue(payload, 0) + // Wait for retry to process + Thread.sleep(200) + + then: + callCount.get() == 1 + retryManager.retriesResubmitted == 0 + retryManager.retriesExhausted == 1 + retryManager.queueSize == 0 + + cleanup: + retryManager.close() + } + + def "close interrupts retry worker"() { + setup: + def mockApi = Mock(RemoteApi) + def mockConfig = Mock(Config) { + getRetryQueueSize() >> 10 + getRetryBackoffInitialMs() >> 100L + getRetryBackoffMaxMs() >> 1000L + } + def retryManager = new TraceRetryManager(mockApi, mockConfig) + + when: + retryManager.start() + Thread.sleep(50) + retryManager.close() + Thread.sleep(100) + + then: + !retryManager.@retryWorker.isAlive() + } +} diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index d8410864868..a30661f4b83 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -153,6 +153,10 @@ import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_128_BIT_TRACEID_GENERATION_ENABLED; import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_128_BIT_TRACEID_LOGGING_ENABLED; import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_PORT; +import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_RETRY_BACKOFF_INITIAL_MS; +import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_RETRY_BACKOFF_MAX_MS; +import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_RETRY_ENABLED; +import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_RETRY_QUEUE_SIZE; import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_V05_ENABLED; import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_ANALYTICS_ENABLED; import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_BAGGAGE_MAX_BYTES; @@ -606,6 +610,10 @@ import static datadog.trace.api.config.TracerConfig.TRACE_AGENT_ARGS; import static 
datadog.trace.api.config.TracerConfig.TRACE_AGENT_PATH; import static datadog.trace.api.config.TracerConfig.TRACE_AGENT_PORT; +import static datadog.trace.api.config.TracerConfig.TRACE_AGENT_RETRY_BACKOFF_INITIAL_MS; +import static datadog.trace.api.config.TracerConfig.TRACE_AGENT_RETRY_BACKOFF_MAX_MS; +import static datadog.trace.api.config.TracerConfig.TRACE_AGENT_RETRY_ENABLED; +import static datadog.trace.api.config.TracerConfig.TRACE_AGENT_RETRY_QUEUE_SIZE; import static datadog.trace.api.config.TracerConfig.TRACE_AGENT_URL; import static datadog.trace.api.config.TracerConfig.TRACE_ANALYTICS_ENABLED; import static datadog.trace.api.config.TracerConfig.TRACE_BAGGAGE_MAX_BYTES; @@ -807,6 +815,10 @@ public static String getHostName() { private final String agentUnixDomainSocket; private final String agentNamedPipe; private final int agentTimeout; + private final boolean retryEnabled; + private final int retryQueueSize; + private final long retryBackoffInitialMs; + private final long retryBackoffMaxMs; /** Should be set to {@code true} when running in agentless mode in a JVM without TLS */ private final boolean forceClearTextHttpForIntakeClient; @@ -1498,6 +1510,18 @@ private Config(final ConfigProvider configProvider, final InstrumenterConfig ins agentTimeout = configProvider.getInteger(AGENT_TIMEOUT, DEFAULT_AGENT_TIMEOUT); + retryEnabled = + configProvider.getBoolean(TRACE_AGENT_RETRY_ENABLED, DEFAULT_TRACE_AGENT_RETRY_ENABLED); + retryQueueSize = + configProvider.getInteger( + TRACE_AGENT_RETRY_QUEUE_SIZE, DEFAULT_TRACE_AGENT_RETRY_QUEUE_SIZE); + retryBackoffInitialMs = + configProvider.getLong( + TRACE_AGENT_RETRY_BACKOFF_INITIAL_MS, DEFAULT_TRACE_AGENT_RETRY_BACKOFF_INITIAL_MS); + retryBackoffMaxMs = + configProvider.getLong( + TRACE_AGENT_RETRY_BACKOFF_MAX_MS, DEFAULT_TRACE_AGENT_RETRY_BACKOFF_MAX_MS); + forceClearTextHttpForIntakeClient = configProvider.getBoolean(FORCE_CLEAR_TEXT_HTTP_FOR_INTAKE_CLIENT, false); @@ -3025,6 +3049,22 @@ public int getAgentTimeout() { return agentTimeout; } + public boolean isRetryEnabled() { + return retryEnabled; + } + + public int getRetryQueueSize() { + return retryQueueSize; + } + + public long getRetryBackoffInitialMs() { + return retryBackoffInitialMs; + } + + public long getRetryBackoffMaxMs() { + return retryBackoffMaxMs; + } + public boolean isForceClearTextHttpForIntakeClient() { return forceClearTextHttpForIntakeClient; }
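
Note on wiring: the null check added in PayloadDispatcherImpl.accept() ("retryManager != null && response.retryable()") suggests the retry manager is meant to be optional, but the constructor above always creates and starts it, and the new Config.isRetryEnabled() getter is never consulted. The fragment below is a minimal sketch of how the flag could gate construction inside the PayloadDispatcherImpl constructor; it is an assumption about the intended wiring, not what this patch currently does.

    // Sketch only: gate TraceRetryManager on the new trace.agent.retry.enabled flag.
    Config config = Config.get();
    if (config.isRetryEnabled()) {
      this.retryManager = new TraceRetryManager(api, config);
      this.retryManager.start();
    } else {
      // accept() already tolerates a null retryManager, so a disabled config can skip it entirely
      this.retryManager = null;
    }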
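
Backoff behaviour: TraceRetryManager.calculateBackoff doubles the delay on every attempt, starting from DEFAULT_TRACE_AGENT_RETRY_BACKOFF_INITIAL_MS (1 s) and capping at DEFAULT_TRACE_AGENT_RETRY_BACKOFF_MAX_MS (60 s). The standalone sketch below reproduces that schedule with the new defaults. The shift cap is a defensive guard added here for illustration only; the patch shifts by the raw attempt count, which could overflow long for extremely large attempt counts.

    // Minimal sketch of the schedule implemented by TraceRetryManager.calculateBackoff,
    // using the defaults introduced in ConfigDefaults. The shift cap is illustrative only.
    public class RetryBackoffSketch {
      static final long INITIAL_MS = 1000L;  // DEFAULT_TRACE_AGENT_RETRY_BACKOFF_INITIAL_MS
      static final long MAX_MS = 60_000L;    // DEFAULT_TRACE_AGENT_RETRY_BACKOFF_MAX_MS

      static long backoff(int attemptCount) {
        // guard against long overflow for very large attempt counts (not in the patch)
        int shift = Math.min(attemptCount, 16);
        return Math.min(INITIAL_MS * (1L << shift), MAX_MS);
      }

      public static void main(String[] args) {
        for (int attempt = 0; attempt < 8; attempt++) {
          System.out.println("attempt " + attempt + " -> wait " + backoff(attempt) + " ms");
        }
        // attempt 0 -> 1000, 1 -> 2000, 2 -> 4000, 3 -> 8000, 4 -> 16000, 5 -> 32000,
        // then capped at 60000 for every later attempt
      }
    }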
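
Configuration surface: the four new TracerConfig keys are resolved through ConfigProvider like the tracer's other settings and exposed via the Config getters added above. The dd.* system-property and DD_* environment-variable spellings in the comments below follow the tracer's usual key mapping and are assumptions; the patch itself only defines the raw keys and defaults. A hypothetical helper (not part of the patch) that dumps the effective values:

    import datadog.trace.api.Config;

    final class RetryConfigDump {
      public static void main(String[] args) {
        Config config = Config.get();
        System.out.printf(
            "retry enabled=%s queue=%d initialBackoffMs=%d maxBackoffMs=%d%n",
            config.isRetryEnabled(),           // dd.trace.agent.retry.enabled / DD_TRACE_AGENT_RETRY_ENABLED, default true
            config.getRetryQueueSize(),        // dd.trace.agent.retry.queue.size, default 5000
            config.getRetryBackoffInitialMs(), // dd.trace.agent.retry.backoff.initial.ms, default 1000
            config.getRetryBackoffMaxMs());    // dd.trace.agent.retry.backoff.max.ms, default 60000
      }
    }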