Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Unreleased
* Add client operation correlation logging: `FunctionInvocationId` is now propagated via gRPC metadata to the host for client operations, enabling correlation with host logs.

## v1.6.2
* Fixing gRPC channel shutdown ([#249](https://github.com/microsoft/durabletask-java/pull/249))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import com.microsoft.durabletask.azurefunctions.internal.FunctionInvocationIdInterceptor;

import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
Expand All @@ -29,6 +30,7 @@ public class DurableClientContext {
private String taskHubName;
private String requiredQueryStringParameters;
private DurableTaskClient client;
private String functionInvocationId;

/**
* Gets the name of the client binding's task hub.
Expand All @@ -39,6 +41,18 @@ public String getTaskHubName() {
return this.taskHubName;
}

/**
* Sets the function invocation ID for correlation with host-side logs.
* <p>
* Call this method before calling {@link #getClient()} to enable correlation
* between client operations and host-side logs.
*
* @param invocationId the Azure Functions invocation ID
*/
public void setFunctionInvocationId(String invocationId) {
this.functionInvocationId = invocationId;
}

/**
* Gets the durable task client associated with the current function invocation.
*
Expand All @@ -56,7 +70,14 @@ public DurableTaskClient getClient() {
throw new IllegalStateException("The client context RPC base URL was invalid!", ex);
}

this.client = new DurableTaskGrpcClientBuilder().port(rpcURL.getPort()).build();
DurableTaskGrpcClientBuilder builder = new DurableTaskGrpcClientBuilder().port(rpcURL.getPort());

// Add interceptor for function invocation ID correlation if set
if (this.functionInvocationId != null && !this.functionInvocationId.isEmpty()) {
builder.addInterceptor(new FunctionInvocationIdInterceptor(this.functionInvocationId));
}

this.client = builder.build();
return this.client;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.durabletask.azurefunctions.internal;

import io.grpc.*;

/**
* A gRPC client interceptor that adds the Azure Functions invocation ID to outgoing calls
* for correlation with host-side logs.
*/
public final class FunctionInvocationIdInterceptor implements ClientInterceptor {
private static final String INVOCATION_ID_METADATA_KEY_NAME = "x-azure-functions-invocationid";
private static final Metadata.Key<String> INVOCATION_ID_KEY =
Metadata.Key.of(INVOCATION_ID_METADATA_KEY_NAME, Metadata.ASCII_STRING_MARSHALLER);

private final String invocationId;

/**
* Creates a new interceptor that will add the specified invocation ID to all gRPC calls.
*
* @param invocationId the Azure Functions invocation ID to add to calls
*/
public FunctionInvocationIdInterceptor(String invocationId) {
this.invocationId = invocationId;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {

return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
if (invocationId != null && !invocationId.isEmpty()) {
headers.put(INVOCATION_ID_KEY, invocationId);
}
super.start(responseListener, headers);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.durabletask.azurefunctions.internal;

import io.grpc.*;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

/**
* Tests for FunctionInvocationIdInterceptor.
*/
public class FunctionInvocationIdInterceptorTests {

private static final Metadata.Key<String> INVOCATION_ID_KEY =
Metadata.Key.of("x-azure-functions-invocationid", Metadata.ASCII_STRING_MARSHALLER);

@Test
public void interceptCall_addsInvocationIdToMetadata() {
// Arrange
String testInvocationId = "test-invocation-id-123";
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor(testInvocationId);

Channel mockChannel = mock(Channel.class);
ClientCall<Object, Object> mockCall = mock(ClientCall.class);
MethodDescriptor<Object, Object> mockMethod = mock(MethodDescriptor.class);
CallOptions callOptions = CallOptions.DEFAULT;

when(mockChannel.newCall(any(), any())).thenReturn(mockCall);

// Act
ClientCall<Object, Object> interceptedCall = interceptor.interceptCall(mockMethod, callOptions, mockChannel);

// Assert - Start the call to trigger the metadata modification
Metadata headers = new Metadata();
interceptedCall.start(mock(ClientCall.Listener.class), headers);

// Verify the invocation ID was added to the headers
assertEquals(testInvocationId, headers.get(INVOCATION_ID_KEY));
}

@Test
public void interceptCall_withNullInvocationId_doesNotAddHeader() {
// Arrange
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor(null);

Channel mockChannel = mock(Channel.class);
ClientCall<Object, Object> mockCall = mock(ClientCall.class);
MethodDescriptor<Object, Object> mockMethod = mock(MethodDescriptor.class);
CallOptions callOptions = CallOptions.DEFAULT;

when(mockChannel.newCall(any(), any())).thenReturn(mockCall);

// Act
ClientCall<Object, Object> interceptedCall = interceptor.interceptCall(mockMethod, callOptions, mockChannel);

// Assert - Start the call to trigger the metadata modification
Metadata headers = new Metadata();
interceptedCall.start(mock(ClientCall.Listener.class), headers);

// Verify no invocation ID was added
assertNull(headers.get(INVOCATION_ID_KEY));
}

@Test
public void interceptCall_withEmptyInvocationId_doesNotAddHeader() {
// Arrange
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor("");

Channel mockChannel = mock(Channel.class);
ClientCall<Object, Object> mockCall = mock(ClientCall.class);
MethodDescriptor<Object, Object> mockMethod = mock(MethodDescriptor.class);
CallOptions callOptions = CallOptions.DEFAULT;

when(mockChannel.newCall(any(), any())).thenReturn(mockCall);

// Act
ClientCall<Object, Object> interceptedCall = interceptor.interceptCall(mockMethod, callOptions, mockChannel);

// Assert - Start the call to trigger the metadata modification
Metadata headers = new Metadata();
interceptedCall.start(mock(ClientCall.Listener.class), headers);

// Verify no invocation ID was added
assertNull(headers.get(INVOCATION_ID_KEY));
}

@Test
public void constructor_acceptsValidInvocationId() {
// Act & Assert - no exception should be thrown
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor("valid-id");
assertNotNull(interceptor);
}

@Test
public void constructor_acceptsNull() {
// Act & Assert - no exception should be thrown
FunctionInvocationIdInterceptor interceptor = new FunctionInvocationIdInterceptor(null);
assertNotNull(interceptor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
sidecarGrpcChannel = this.managedSidecarChannel;
}

// Apply any interceptors that were configured in the builder
List<ClientInterceptor> interceptors = builder.getInterceptors();
if (!interceptors.isEmpty()) {
sidecarGrpcChannel = ClientInterceptors.intercept(sidecarGrpcChannel, interceptors);
}

this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
package com.microsoft.durabletask;

import io.grpc.Channel;
import io.grpc.ClientInterceptor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process
Expand All @@ -13,6 +18,7 @@ public final class DurableTaskGrpcClientBuilder {
int port;
Channel channel;
String defaultVersion;
List<ClientInterceptor> interceptors = new ArrayList<>();

/**
* Sets the {@link DataConverter} to use for converting serializable data payloads.
Expand Down Expand Up @@ -65,6 +71,32 @@ public DurableTaskGrpcClientBuilder defaultVersion(String defaultVersion) {
return this;
}

/**
* Adds a gRPC client interceptor to be applied to all gRPC calls made by the client.
* <p>
* Interceptors can be used to add custom headers, logging, or other cross-cutting concerns
* to gRPC calls. Multiple interceptors can be added and will be applied in the order they
* were added.
*
* @param interceptor the gRPC client interceptor to add
* @return this builder object
*/
public DurableTaskGrpcClientBuilder addInterceptor(ClientInterceptor interceptor) {
if (interceptor != null) {
this.interceptors.add(interceptor);
}
return this;
}

/**
* Gets the list of interceptors that have been added to this builder.
*
* @return an unmodifiable list of interceptors
*/
List<ClientInterceptor> getInterceptors() {
return Collections.unmodifiableList(this.interceptors);
}

/**
* Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object.
* @return a new {@link DurableTaskClient} object
Expand Down
Loading