From 8156690b123b1a430dfb1765a762678dae5ae72b Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Sun, 25 Jan 2026 14:14:17 -0800 Subject: [PATCH] Add FunctionInvocationId gRPC metadata propagation for Azure Functions Adds support for propagating the Azure Functions invocation ID to the Durable Functions host via gRPC metadata, enabling correlation between worker-side function invocations and host-side orchestration events. - Added interceptor support to DurableTaskGrpcClientBuilder - Created FunctionInvocationIdInterceptor in azurefunctions module - Updated DurableClientContext to configure the interceptor automatically - Added unit tests for the interceptor Related to Azure/azure-functions-durable-extension#3317 --- CHANGELOG.md | 1 + .../azurefunctions/DurableClientContext.java | 23 +++- .../FunctionInvocationIdInterceptor.java | 45 ++++++++ .../FunctionInvocationIdInterceptorTests.java | 101 ++++++++++++++++++ .../durabletask/DurableTaskGrpcClient.java | 6 ++ .../DurableTaskGrpcClientBuilder.java | 32 ++++++ 6 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/FunctionInvocationIdInterceptor.java create mode 100644 azurefunctions/src/test/java/com/microsoft/durabletask/azurefunctions/internal/FunctionInvocationIdInterceptorTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 87977e6f..8d2483ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## Unreleased +* Add client operation correlation logging: `FunctionInvocationId` is now propagated via gRPC metadata to the host for client operations, enabling correlation with host logs. ## v1.6.2 * Fixing gRPC channel shutdown ([#249](https://github.com/microsoft/durabletask-java/pull/249)) diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java index a952db68..a1206572 100644 --- a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java +++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java @@ -11,6 +11,7 @@ import com.microsoft.durabletask.DurableTaskGrpcClientBuilder; import com.microsoft.durabletask.OrchestrationMetadata; import com.microsoft.durabletask.OrchestrationRuntimeStatus; +import com.microsoft.durabletask.azurefunctions.internal.FunctionInvocationIdInterceptor; import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; @@ -29,6 +30,7 @@ public class DurableClientContext { private String taskHubName; private String requiredQueryStringParameters; private DurableTaskClient client; + private String functionInvocationId; /** * Gets the name of the client binding's task hub. @@ -39,6 +41,18 @@ public String getTaskHubName() { return this.taskHubName; } + /** + * Sets the function invocation ID for correlation with host-side logs. + *

+ * Call this method before calling {@link #getClient()} to enable correlation + * between client operations and host-side logs. + * + * @param invocationId the Azure Functions invocation ID + */ + public void setFunctionInvocationId(String invocationId) { + this.functionInvocationId = invocationId; + } + /** * Gets the durable task client associated with the current function invocation. * @@ -56,7 +70,14 @@ public DurableTaskClient getClient() { throw new IllegalStateException("The client context RPC base URL was invalid!", ex); } - this.client = new DurableTaskGrpcClientBuilder().port(rpcURL.getPort()).build(); + DurableTaskGrpcClientBuilder builder = new DurableTaskGrpcClientBuilder().port(rpcURL.getPort()); + + // Add interceptor for function invocation ID correlation if set + if (this.functionInvocationId != null && !this.functionInvocationId.isEmpty()) { + builder.addInterceptor(new FunctionInvocationIdInterceptor(this.functionInvocationId)); + } + + this.client = builder.build(); return this.client; } diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/FunctionInvocationIdInterceptor.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/FunctionInvocationIdInterceptor.java new file mode 100644 index 00000000..539f0ee3 --- /dev/null +++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/FunctionInvocationIdInterceptor.java @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.durabletask.azurefunctions.internal; + +import io.grpc.*; + +/** + * A gRPC client interceptor that adds the Azure Functions invocation ID to outgoing calls + * for correlation with host-side logs. + */ +public final class FunctionInvocationIdInterceptor implements ClientInterceptor { + private static final String INVOCATION_ID_METADATA_KEY_NAME = "x-azure-functions-invocationid"; + private static final Metadata.Key INVOCATION_ID_KEY = + Metadata.Key.of(INVOCATION_ID_METADATA_KEY_NAME, Metadata.ASCII_STRING_MARSHALLER); + + private final String invocationId; + + /** + * Creates a new interceptor that will add the specified invocation ID to all gRPC calls. + * + * @param invocationId the Azure Functions invocation ID to add to calls + */ + public FunctionInvocationIdInterceptor(String invocationId) { + this.invocationId = invocationId; + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, + CallOptions callOptions, + Channel next) { + + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + if (invocationId != null && !invocationId.isEmpty()) { + headers.put(INVOCATION_ID_KEY, invocationId); + } + super.start(responseListener, headers); + } + }; + } +} diff --git a/azurefunctions/src/test/java/com/microsoft/durabletask/azurefunctions/internal/FunctionInvocationIdInterceptorTests.java b/azurefunctions/src/test/java/com/microsoft/durabletask/azurefunctions/internal/FunctionInvocationIdInterceptorTests.java new file mode 100644 index 00000000..be64e10f --- /dev/null +++ b/azurefunctions/src/test/java/com/microsoft/durabletask/azurefunctions/internal/FunctionInvocationIdInterceptorTests.java @@ -0,0 +1,101 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask.azurefunctions.internal; + +import io.grpc.*; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +/** + * Tests for FunctionInvocationIdInterceptor. + */ +public class FunctionInvocationIdInterceptorTests { + + private static final Metadata.Key INVOCATION_ID_KEY = + Metadata.Key.of("x-azure-functions-invocationid", Metadata.ASCII_STRING_MARSHALLER); + + @Test + public void interceptCall_addsInvocationIdToMetadata() { + // Arrange + String testInvocationId = "test-invocation-id-123"; + FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor(testInvocationId); + + Channel mockChannel = mock(Channel.class); + ClientCall mockCall = mock(ClientCall.class); + MethodDescriptor mockMethod = mock(MethodDescriptor.class); + CallOptions callOptions = CallOptions.DEFAULT; + + when(mockChannel.newCall(any(), any())).thenReturn(mockCall); + + // Act + ClientCall interceptedCall = interceptor.interceptCall(mockMethod, callOptions, mockChannel); + + // Assert - Start the call to trigger the metadata modification + Metadata headers = new Metadata(); + interceptedCall.start(mock(ClientCall.Listener.class), headers); + + // Verify the invocation ID was added to the headers + assertEquals(testInvocationId, headers.get(INVOCATION_ID_KEY)); + } + + @Test + public void interceptCall_withNullInvocationId_doesNotAddHeader() { + // Arrange + FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor(null); + + Channel mockChannel = mock(Channel.class); + ClientCall mockCall = mock(ClientCall.class); + MethodDescriptor mockMethod = mock(MethodDescriptor.class); + CallOptions callOptions = CallOptions.DEFAULT; + + when(mockChannel.newCall(any(), any())).thenReturn(mockCall); + + // Act + ClientCall interceptedCall = interceptor.interceptCall(mockMethod, callOptions, mockChannel); + + // Assert - Start the call to trigger the metadata modification + Metadata headers = new Metadata(); + interceptedCall.start(mock(ClientCall.Listener.class), headers); + + // Verify no invocation ID was added + assertNull(headers.get(INVOCATION_ID_KEY)); + } + + @Test + public void interceptCall_withEmptyInvocationId_doesNotAddHeader() { + // Arrange + FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor(""); + + Channel mockChannel = mock(Channel.class); + ClientCall mockCall = mock(ClientCall.class); + MethodDescriptor mockMethod = mock(MethodDescriptor.class); + CallOptions callOptions = CallOptions.DEFAULT; + + when(mockChannel.newCall(any(), any())).thenReturn(mockCall); + + // Act + ClientCall interceptedCall = interceptor.interceptCall(mockMethod, callOptions, mockChannel); + + // Assert - Start the call to trigger the metadata modification + Metadata headers = new Metadata(); + interceptedCall.start(mock(ClientCall.Listener.class), headers); + + // Verify no invocation ID was added + assertNull(headers.get(INVOCATION_ID_KEY)); + } + + @Test + public void constructor_acceptsValidInvocationId() { + // Act & Assert - no exception should be thrown + FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor("valid-id"); + assertNotNull(interceptor); + } + + @Test + public void constructor_acceptsNull() { + // Act & Assert - no exception should be thrown + FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor(null); + assertNotNull(interceptor); + } +} diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 52d072b8..84e7394a 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -58,6 +58,12 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { sidecarGrpcChannel = this.managedSidecarChannel; } + // Apply any interceptors that were configured in the builder + List interceptors = builder.getInterceptors(); + if (!interceptors.isEmpty()) { + sidecarGrpcChannel = ClientInterceptors.intercept(sidecarGrpcChannel, interceptors); + } + this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); } diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java index 1a1cb6f2..4a0e1b04 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java @@ -3,6 +3,11 @@ package com.microsoft.durabletask; import io.grpc.Channel; +import io.grpc.ClientInterceptor; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; /** * Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process @@ -13,6 +18,7 @@ public final class DurableTaskGrpcClientBuilder { int port; Channel channel; String defaultVersion; + List interceptors = new ArrayList<>(); /** * Sets the {@link DataConverter} to use for converting serializable data payloads. @@ -65,6 +71,32 @@ public DurableTaskGrpcClientBuilder defaultVersion(String defaultVersion) { return this; } + /** + * Adds a gRPC client interceptor to be applied to all gRPC calls made by the client. + *

+ * Interceptors can be used to add custom headers, logging, or other cross-cutting concerns + * to gRPC calls. Multiple interceptors can be added and will be applied in the order they + * were added. + * + * @param interceptor the gRPC client interceptor to add + * @return this builder object + */ + public DurableTaskGrpcClientBuilder addInterceptor(ClientInterceptor interceptor) { + if (interceptor != null) { + this.interceptors.add(interceptor); + } + return this; + } + + /** + * Gets the list of interceptors that have been added to this builder. + * + * @return an unmodifiable list of interceptors + */ + List getInterceptors() { + return Collections.unmodifiableList(this.interceptors); + } + /** * Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object. * @return a new {@link DurableTaskClient} object