@@ -365,5 +365,10 @@ public final class ConfigDefaults {
"$.Credentials.SessionToken",
"$.InventoryConfigurationList[*].Destination.S3BucketDestination.Encryption.SSEKMS.KeyId");

static final boolean DEFAULT_TRACE_AGENT_RETRY_ENABLED = true;
static final int DEFAULT_TRACE_AGENT_RETRY_QUEUE_SIZE = 5000;
static final long DEFAULT_TRACE_AGENT_RETRY_BACKOFF_INITIAL_MS = 1000L;
static final long DEFAULT_TRACE_AGENT_RETRY_BACKOFF_MAX_MS = 60000L;

private ConfigDefaults() {}
}

@@ -165,5 +165,11 @@ public final class TracerConfig {
"trace.cloud.payload.tagging.max-tags";
public static final String TRACE_SERVICE_DISCOVERY_ENABLED = "trace.service.discovery.enabled";

public static final String TRACE_AGENT_RETRY_ENABLED = "trace.agent.retry.enabled";
public static final String TRACE_AGENT_RETRY_QUEUE_SIZE = "trace.agent.retry.queue.size";
public static final String TRACE_AGENT_RETRY_BACKOFF_INITIAL_MS =
"trace.agent.retry.backoff.initial.ms";
public static final String TRACE_AGENT_RETRY_BACKOFF_MAX_MS = "trace.agent.retry.backoff.max.ms";

private TracerConfig() {}
}
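
Note: the Config getters that TraceRetryManager calls later in this diff (getRetryQueueSize(), getRetryBackoffInitialMs(), getRetryBackoffMaxMs()) are not shown here. The standalone sketch below only illustrates how the new keys and defaults are expected to resolve, assuming dd-trace-java's usual "dd."-prefixed system-property convention; the class and helper names are illustrative, not part of the PR.

public final class RetryConfigSketch {
  // Reads "dd." + key from system properties, falling back to the compiled-in default
  static long readLong(String key, long defaultValue) {
    String value = System.getProperty("dd." + key); // e.g. -Ddd.trace.agent.retry.queue.size=10000
    return value != null ? Long.parseLong(value) : defaultValue;
  }

  public static void main(String[] args) {
    long queueSize = readLong("trace.agent.retry.queue.size", 5000L);
    long initialMs = readLong("trace.agent.retry.backoff.initial.ms", 1000L);
    long maxMs = readLong("trace.agent.retry.backoff.max.ms", 60000L);
    System.out.printf("retry queue=%d, backoff initial=%d ms, max=%d ms%n", queueSize, initialMs, maxMs);
  }
}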

@@ -6,6 +6,7 @@
import datadog.communication.serialization.FlushingBuffer;
import datadog.communication.serialization.WritableFormatter;
import datadog.communication.serialization.msgpack.MsgPackWriter;
import datadog.trace.api.Config;
import datadog.trace.core.CoreSpan;
import datadog.trace.core.monitor.HealthMetrics;
import java.nio.ByteBuffer;
@@ -24,6 +25,7 @@ public class PayloadDispatcherImpl implements ByteBufferConsumer, PayloadDispatc
private final RemoteMapperDiscovery mapperDiscovery;
private final HealthMetrics healthMetrics;
private final Monitoring monitoring;
private final TraceRetryManager retryManager;

private Recording batchTimer;
private RemoteMapper mapper;
@@ -41,6 +43,11 @@ public PayloadDispatcherImpl(
this.api = api;
this.healthMetrics = healthMetrics;
this.monitoring = monitoring;

// Initialize retry manager
Config config = Config.get();
this.retryManager = new TraceRetryManager(api, config);
this.retryManager.start();
}

@Override
@@ -109,6 +116,14 @@ public void accept(int messageCount, ByteBuffer buffer) {
healthMetrics.onSerialize(sizeInBytes);
RemoteApi.Response response = api.sendSerializedTraces(payload);
mapper.reset();

// Check if we should retry on 429
if (retryManager != null && response.retryable()) {
// Agent returned 429 (backpressure), enqueue for retry
retryManager.enqueue(payload, 0); // 0 indicates first retry attempt
return;
}

if (response.success()) {
if (log.isDebugEnabled()) {
log.debug("Successfully sent {} traces to the API", messageCount);
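
Note: TraceRetryManager implements Closeable and starts a daemon worker thread, but the dispatcher's shutdown path is not part of this diff. The fragment below is only a hypothetical illustration of closing the retry manager when the dispatcher itself is torn down; the method shown is not claimed to exist in PayloadDispatcherImpl.

  // Hypothetical shutdown wiring (not in this diff): stop the dd-trace-retry-worker thread
  // and emit the final retry counters when the dispatcher is closed.
  void shutdownRetryManager() {
    if (retryManager != null) {
      retryManager.close();
    }
  }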

@@ -107,46 +107,64 @@ protected static String getResponseBody(okhttp3.Response response) {
public static final class Response {
/** Factory method for a successful request with a trivial response body */
public static Response success(final int status) {
- return new Response(true, status, null, null);
+ return new Response(true, false, status, null, null, null);
}

/** Factory method for a successful request with a response body */
public static Response success(final int status, String response) {
- return new Response(true, status, null, response);
+ return new Response(true, false, status, null, response, null);
}

/** Factory method for a successful request with a malformed response body */
public static Response success(final int status, final Throwable exception) {
- return new Response(true, status, exception, null);
+ return new Response(true, false, status, exception, null, null);
}

/** Factory method for a request that received an error status in response */
public static Response failed(final int status) {
- return new Response(false, status, null, null);
+ return new Response(false, false, status, null, null, null);
}

/** Factory method for a failed communication attempt */
public static Response failed(final Throwable exception) {
- return new Response(false, null, exception, null);
+ return new Response(false, false, null, exception, null, null);
}

/** Factory method for a 429 response that should be retried */
public static Response retryable(final int status, final Payload payload) {
return new Response(false, true, status, null, null, payload);
}

private final boolean success;
private final boolean retryable;
private final Integer status;
private final Throwable exception;
private final String response;
private final Payload payload;

private Response(
- final boolean success, final Integer status, final Throwable exception, String response) {
+ final boolean success,
+ final boolean retryable,
+ final Integer status,
+ final Throwable exception,
+ String response,
+ Payload payload) {
this.success = success;
this.retryable = retryable;
this.status = status;
this.exception = exception;
this.response = response;
this.payload = payload;
}

public boolean success() {
return success;
}

public boolean retryable() {
return retryable;
}

public OptionalInt status() {
return status == null ? OptionalInt.empty() : OptionalInt.of(status);
}
@@ -158,5 +176,9 @@ public Optional<Throwable> exception() {
public String response() {
return response;
}

public Payload payload() {
return payload;
}
}
}
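
Note: the HTTP client change that actually produces a retryable Response is not shown in this diff. The fragment below is a hypothetical sketch of how a sender could classify an OkHttp response, handing the serialized payload back on 429 so it can be re-queued; the method and parameter names are illustrative only.

  static RemoteApi.Response classify(okhttp3.Response httpResponse, Payload payload) {
    if (httpResponse.code() == 429) {
      // Agent backpressure: return the payload so TraceRetryManager can re-enqueue it
      return RemoteApi.Response.retryable(429, payload);
    }
    return httpResponse.isSuccessful()
        ? RemoteApi.Response.success(httpResponse.code())
        : RemoteApi.Response.failed(httpResponse.code());
  }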

@@ -0,0 +1,214 @@
package datadog.trace.common.writer;

import datadog.trace.api.Config;
import java.io.Closeable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Manages retry of trace payloads that receive a 429 (Too Many Requests) response from the agent.
* Uses a bounded queue and exponential backoff strategy.
*/
public class TraceRetryManager implements Closeable {
private static final Logger log = LoggerFactory.getLogger(TraceRetryManager.class);

private final BlockingQueue<RetryEntry> retryQueue;
private final RemoteApi api;
private final Config config;
private final Thread retryWorker;
private volatile boolean running;

// Telemetry counters
private final AtomicLong retriesEnqueued = new AtomicLong(0);
private final AtomicLong retriesDroppedQueueFull = new AtomicLong(0);
private final AtomicLong retriesResubmitted = new AtomicLong(0);
private final AtomicLong retriesExhausted = new AtomicLong(0);
private final AtomicLong retries429Count = new AtomicLong(0);

public TraceRetryManager(RemoteApi api, Config config) {
this.api = api;
this.config = config;
this.retryQueue = new LinkedBlockingQueue<>(config.getRetryQueueSize());
this.retryWorker = new Thread(this::processRetries, "dd-trace-retry-worker");
this.retryWorker.setDaemon(true);
this.running = false;
}

/** Start the retry worker thread */
public void start() {
if (!running) {
running = true;
retryWorker.start();
log.debug("TraceRetryManager started with queue size {}", config.getRetryQueueSize());
}
}

/**
* Enqueue a payload for retry with exponential backoff
*
* @param payload the payload to retry
* @param attemptCount the number of previous attempts (0 for first retry)
*/
public void enqueue(Payload payload, int attemptCount) {
long backoffMs = calculateBackoff(attemptCount);
long retryAfterMs = System.currentTimeMillis() + backoffMs;
RetryEntry entry = new RetryEntry(payload, retryAfterMs, attemptCount);

if (!retryQueue.offer(entry)) {
retriesDroppedQueueFull.incrementAndGet();
log.warn(
"Retry queue full (size={}), dropping payload with {} traces",
config.getRetryQueueSize(),
payload.traceCount());
} else {
retriesEnqueued.incrementAndGet();
log.debug(
"Enqueued payload for retry with backoff {}ms (attempt {}, {} traces)",
backoffMs,
attemptCount + 1,
payload.traceCount());
}
}

/**
* Calculate exponential backoff delay
*
* @param attemptCount the number of previous attempts
* @return backoff delay in milliseconds
*/
private long calculateBackoff(int attemptCount) {
// Clamp the shift so many consecutive retries cannot overflow long before the max cap applies
int shift = Math.min(attemptCount, 20);
long backoff = config.getRetryBackoffInitialMs() * (1L << shift);
return Math.min(backoff, config.getRetryBackoffMaxMs());
}

/** Worker loop that drains the retry queue and resubmits payloads */
private void processRetries() {
log.debug("Retry worker thread started");
while (running) {
try {
RetryEntry entry = retryQueue.poll(1, TimeUnit.SECONDS);
if (entry != null) {
long now = System.currentTimeMillis();
if (now < entry.retryAfterMs) {
// Not ready yet, wait until the scheduled retry time before resubmitting
long sleepMs = entry.retryAfterMs - now;
Thread.sleep(sleepMs);
}
resubmit(entry);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.debug("Retry worker interrupted");
break;
}
}
log.debug("Retry worker thread stopped");
}

/**
* Resubmit a payload from the retry queue
*
* @param entry the retry entry to resubmit
*/
private void resubmit(RetryEntry entry) {
try {
RemoteApi.Response response = api.sendSerializedTraces(entry.payload);

if (response.retryable()) {
// Still getting 429, re-enqueue with increased backoff
retries429Count.incrementAndGet();
log.debug(
"Retry received another 429 for {} traces (attempt {})",
entry.payload.traceCount(),
entry.attemptCount + 1);
enqueue(entry.payload, entry.attemptCount + 1);
} else if (response.success()) {
// Success!
retriesResubmitted.incrementAndGet();
log.debug(
"Retry successful for {} traces after {} attempts",
entry.payload.traceCount(),
entry.attemptCount + 1);
} else {
// Other error, drop and log
retriesExhausted.incrementAndGet();
log.warn(
"Retry exhausted for {} traces with status {} after {} attempts",
entry.payload.traceCount(),
response.status().isPresent() ? response.status().getAsInt() : "unknown",
entry.attemptCount + 1);
}
} catch (Exception e) {
retriesExhausted.incrementAndGet();
log.warn(
"Exception during retry of {} traces after {} attempts",
entry.payload.traceCount(),
entry.attemptCount + 1,
e);
}
}

@Override
public void close() {
log.debug("Shutting down TraceRetryManager");
running = false;
retryWorker.interrupt();
try {
retryWorker.join(5000); // Wait up to 5 seconds for graceful shutdown
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while waiting for retry worker to stop");
}

// Log final telemetry
log.info(
"TraceRetryManager closed. Stats: enqueued={}, resubmitted={}, dropped_queue_full={}, exhausted={}, http_429={}",
retriesEnqueued.get(),
retriesResubmitted.get(),
retriesDroppedQueueFull.get(),
retriesExhausted.get(),
retries429Count.get());
}

// Telemetry accessors for testing and monitoring
public long getRetriesEnqueued() {
return retriesEnqueued.get();
}

public long getRetriesDroppedQueueFull() {
return retriesDroppedQueueFull.get();
}

public long getRetriesResubmitted() {
return retriesResubmitted.get();
}

public long getRetriesExhausted() {
return retriesExhausted.get();
}

public long getRetries429Count() {
return retries429Count.get();
}

public int getQueueSize() {
return retryQueue.size();
}

/** Internal class to hold retry state for a payload */
private static class RetryEntry {
final Payload payload;
final long retryAfterMs;
final int attemptCount;

RetryEntry(Payload payload, long retryAfterMs, int attemptCount) {
this.payload = payload;
this.retryAfterMs = retryAfterMs;
this.attemptCount = attemptCount;
}
}
}
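
With the defaults above (1 s initial backoff, 60 s cap), calculateBackoff doubles the delay on every failed attempt until the cap is reached. The standalone snippet below just reproduces that progression; the class name is illustrative, not part of the PR.

public final class BackoffProgression {
  public static void main(String[] args) {
    long initialMs = 1000L; // DEFAULT_TRACE_AGENT_RETRY_BACKOFF_INITIAL_MS
    long maxMs = 60000L;    // DEFAULT_TRACE_AGENT_RETRY_BACKOFF_MAX_MS
    for (int attempt = 0; attempt < 8; attempt++) {
      long backoff = Math.min(initialMs * (1L << Math.min(attempt, 20)), maxMs);
      // Prints: 1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000
      System.out.printf("attempt %d -> %d ms%n", attempt, backoff);
    }
  }
}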