Skip to content

Commit e1b4abd

Browse files
authored
Merge pull request #54 from salesforce/feature/context
Feature/context
2 parents 0ebb44f + c41f503 commit e1b4abd

12 files changed

+1244
-0
lines changed
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright (c) 2017, salesforce.com, inc.
3+
* All rights reserved.
4+
* Licensed under the BSD 3-Clause license.
5+
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
6+
*/
7+
8+
package com.salesforce.grpc.contrib.context;
9+
10+
import io.grpc.Context;
11+
import io.grpc.Metadata;
12+
13+
import javax.annotation.Nullable;
14+
import javax.annotation.concurrent.NotThreadSafe;
15+
import java.util.Set;
16+
17+
import static com.google.common.base.Preconditions.*;
18+
19+
20+
/**
21+
* {@code AmbientContext} is entry point for working with the ambient context managed by {@link AmbientContextClientInterceptor}
22+
* and {@link AmbientContextServerInterceptor}. The interface for this class is very similar to gRPC's {@code Metadata}
23+
* class.
24+
*
25+
* <p>This class is not thread safe, implementations should ensure that ambient context reads and writes do
26+
* not occur in multiple threads concurrently.
27+
*
28+
* <p>See package javadoc for more info.
29+
*/
30+
@NotThreadSafe
31+
public final class AmbientContext {
32+
static final Context.Key<AmbientContext> DATA_KEY = Context.key("AmbientContext");
33+
34+
/**
35+
* Attaches an empty ambient context to the provided gRPC {@code Context}.
36+
*
37+
* @throws IllegalStateException if an ambient context has already been attached to the
38+
* provided gRPC {@code Context}.
39+
*/
40+
public static Context initialize(Context context) {
41+
checkNotNull(context, "context");
42+
checkState(DATA_KEY.get(context) == null,
43+
"AmbientContext has already been created in the scope of the current context");
44+
return context.withValue(DATA_KEY, new AmbientContext());
45+
}
46+
47+
/**
48+
* Returns the ambient context attached to the current gRPC {@code Context}.
49+
*
50+
* @throws IllegalStateException if no ambient context is attached to the current gRPC {@code Context}.
51+
*/
52+
public static AmbientContext current() {
53+
checkState(DATA_KEY.get() != null,
54+
"AmbientContext has not yet been created in the scope of the current context");
55+
return DATA_KEY.get();
56+
}
57+
58+
/**
59+
* @return true if an {@code AmbientContext} is attached to the current gRPC context.
60+
*/
61+
public static boolean isPresent() {
62+
return DATA_KEY.get() != null;
63+
}
64+
65+
private Metadata contextMetadata;
66+
private Object freezeKey = null;
67+
68+
AmbientContext() {
69+
this.contextMetadata = new Metadata();
70+
}
71+
72+
/**
73+
* Copy constructor.
74+
*/
75+
AmbientContext(AmbientContext other) {
76+
this();
77+
this.contextMetadata.merge(other.contextMetadata);
78+
}
79+
80+
/**
81+
* Makes the AmbientContext as read-only, preventing any further modification. A "freeze key" is returned, which
82+
* can be used to {@link #thaw(Object)} the AmbientContext in the future.
83+
*
84+
* <p>{@code freeze()} and {@code thaw()} are typically used to mark the ambient context read-only when the
85+
* interceptor chain completes.
86+
*
87+
* @return a "freeze key" that can be used passed to {@link #thaw(Object)}
88+
*
89+
* @throws IllegalStateException if the AmbientContext is already frozen
90+
*/
91+
public Object freeze() {
92+
checkState(!isFrozen(), "AmbientContext already frozen. Cannot freeze() twice.");
93+
freezeKey = new Object();
94+
return freezeKey;
95+
}
96+
97+
/**
98+
* Makes the AmbientContext mutable again, after {@link #freeze()} has been called. A "freeze key" is needed to
99+
* unfreeze the AmbientContext, ensuring only the code that froze the context can subsequently thaw it.
100+
*
101+
* <p>{@code freeze()} and {@code thaw()} are typically used to mark the ambient context read-only when the
102+
* interceptor chain completes.
103+
*
104+
* @param freezeKey the "freeze key" returned by {@link #freeze()}
105+
*
106+
* @throws IllegalStateException if the AmbientContext has not yet been frozen
107+
* @throws IllegalArgumentException if the {@code freezeKey} is incorrect
108+
*/
109+
public void thaw(Object freezeKey) {
110+
checkState(isFrozen(), "AmbientContext is not frozen. Cannot thaw().");
111+
checkArgument(this.freezeKey == freezeKey,
112+
"The provided freezeKey is not the same object returned by freeze()");
113+
this.freezeKey = null;
114+
}
115+
116+
/**
117+
* Similar to {@link #initialize(Context)}, {@code fork()} attaches a shallow clone of this {@code AmbientContext}
118+
* to a provided gRPC {@code Context}. Use {@code fork()} when you want create a temporary context scope.
119+
*
120+
* @param context
121+
* @return
122+
*/
123+
public Context fork(Context context) {
124+
return context.withValue(DATA_KEY, new AmbientContext(this));
125+
}
126+
127+
/**
128+
* @return true of the AmbientContext has been frozen
129+
*/
130+
public boolean isFrozen() {
131+
return freezeKey != null;
132+
}
133+
134+
private void checkFreeze() {
135+
checkState(freezeKey == null, "AmbientContext cannot be modified while frozen");
136+
}
137+
138+
/**
139+
* Returns true if a value is defined for the given key.
140+
*
141+
* <p>This is done by linear search, so if it is followed by {@link #get} or {@link #getAll},
142+
* prefer calling them directly and checking the return value against {@code null}.
143+
*/
144+
public boolean containsKey(Metadata.Key<?> key) {
145+
return contextMetadata.containsKey(key);
146+
}
147+
148+
/**
149+
* Remove all values for the given key without returning them. This is a minor performance
150+
* optimization if you do not need the previous values.
151+
*
152+
* @throws IllegalStateException if the AmbientContext is frozen
153+
*/
154+
public <T> void discardAll(Metadata.Key<T> key) {
155+
checkFreeze();
156+
contextMetadata.discardAll(key);
157+
}
158+
159+
/**
160+
* Returns the last ambient context entry added with the name 'name' parsed as T.
161+
*
162+
* @return the parsed metadata entry or null if there are none.
163+
*/
164+
@Nullable
165+
public <T> T get(Metadata.Key<T> key) {
166+
return contextMetadata.get(key);
167+
}
168+
169+
/**
170+
* Returns all the ambient context entries named 'name', in the order they were received, parsed as T, or
171+
* null if there are none. The iterator is not guaranteed to be "live." It may or may not be
172+
* accurate if the ambient context is mutated.
173+
*/
174+
@Nullable
175+
public <T> Iterable<T> getAll(final Metadata.Key<T> key) {
176+
return contextMetadata.getAll(key);
177+
}
178+
179+
/**
180+
* Returns set of all keys in store.
181+
*
182+
* @return unmodifiable Set of keys
183+
*/
184+
public Set<String> keys() {
185+
return contextMetadata.keys();
186+
}
187+
188+
/**
189+
* Adds the {@code key, value} pair. If {@code key} already has values, {@code value} is added to
190+
* the end. Duplicate values for the same key are permitted.
191+
*
192+
* @throws NullPointerException if key or value is null
193+
* @throws IllegalStateException if the AmbientContext is frozen
194+
*/
195+
public <T> void put(Metadata.Key<T> key, T value) {
196+
checkFreeze();
197+
contextMetadata.put(key, value);
198+
}
199+
200+
/**
201+
* Removes the first occurrence of {@code value} for {@code key}.
202+
*
203+
* @param key key for value
204+
* @param value value
205+
* @return {@code true} if {@code value} removed; {@code false} if {@code value} was not present
206+
*
207+
* @throws NullPointerException if {@code key} or {@code value} is null
208+
* @throws IllegalStateException if the AmbientContext is frozen
209+
*/
210+
public <T> boolean remove(Metadata.Key<T> key, T value) {
211+
checkFreeze();
212+
return contextMetadata.remove(key, value);
213+
}
214+
215+
/**
216+
* Remove all values for the given key. If there were no values, {@code null} is returned.
217+
*
218+
* @throws IllegalStateException if the AmbientContext is frozen
219+
*/
220+
public <T> Iterable<T> removeAll(Metadata.Key<T> key) {
221+
checkFreeze();
222+
return contextMetadata.removeAll(key);
223+
}
224+
225+
@Override
226+
public String toString() {
227+
return (isFrozen() ? "[FROZEN] " : "[THAWED] ") + contextMetadata.toString();
228+
}
229+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (c) 2017, salesforce.com, inc.
3+
* All rights reserved.
4+
* Licensed under the BSD 3-Clause license.
5+
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
6+
*/
7+
8+
package com.salesforce.grpc.contrib.context;
9+
10+
import io.grpc.*;
11+
12+
/**
13+
* {@code AmbientContextClientInterceptor} transparently deserializes prefixed request headers into an ambient context.
14+
* Header values can be accessed using the {@link AmbientContext} class.
15+
*
16+
* <p>Each {@code AmbientContextClientInterceptor} marshals headers with a know prefix. If multiple prefixes are needed,
17+
* add multiple {@code AmbientContextClientInterceptor} instances to the gRPC interceptor chain.
18+
*
19+
* <p>See package javadoc for more info.
20+
*/
21+
public class AmbientContextClientInterceptor implements ClientInterceptor {
22+
private String headerPrefix;
23+
24+
/**
25+
* Constructs an {@code AmbientContextClientInterceptor} that marshals request headers with a know prefix into the
26+
* {@link AmbientContext}.
27+
*
28+
* @param headerPrefix the header prefix to marshal.
29+
*/
30+
public AmbientContextClientInterceptor(String headerPrefix) {
31+
this.headerPrefix = headerPrefix;
32+
}
33+
34+
@Override
35+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
36+
if (AmbientContext.isPresent()) {
37+
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
38+
@Override
39+
public void start(Listener<RespT> responseListener, Metadata headers) {
40+
AmbientContext ctx = AmbientContext.current();
41+
if (ctx != null) {
42+
for (String keyString : ctx.keys()) {
43+
if (keyString.startsWith(headerPrefix)) {
44+
if (keyString.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
45+
Metadata.Key<byte[]> key = Metadata.Key.of(keyString, Metadata.BINARY_BYTE_MARSHALLER);
46+
Iterable<byte[]> values = ctx.getAll(key);
47+
if (values != null) {
48+
for (byte[] value : values) {
49+
headers.put(key, value);
50+
}
51+
}
52+
} else {
53+
Metadata.Key<String> key = Metadata.Key.of(keyString, Metadata.ASCII_STRING_MARSHALLER);
54+
Iterable<String> values = ctx.getAll(key);
55+
if (values != null) {
56+
for (String value : values) {
57+
headers.put(key, value);
58+
}
59+
}
60+
}
61+
}
62+
}
63+
}
64+
super.start(responseListener, headers);
65+
}
66+
};
67+
} else {
68+
// Noop if ambient context is absent
69+
return next.newCall(method, callOptions);
70+
}
71+
}
72+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright (c) 2017, salesforce.com, inc.
3+
* All rights reserved.
4+
* Licensed under the BSD 3-Clause license.
5+
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
6+
*/
7+
8+
package com.salesforce.grpc.contrib.context;
9+
10+
import io.grpc.*;
11+
12+
/**
13+
* Some uses of ambient context, like distributed tracing, break down if a service fails to propagate the context
14+
* to downstream services. This typically happens when multi-threading code fails to correctly transfer the gRPC
15+
* context to worker threads.
16+
*
17+
* <p>{@code AmbientContextEnforcerClientInterceptor} is used to enforce context propagation by <i>catastrophically</i>
18+
* failing downstream service calls if the ambient context is missing or incomplete.
19+
*/
20+
public class AmbientContextEnforcerClientInterceptor implements ClientInterceptor {
21+
private String[] requiredContextKeys = {};
22+
23+
/**
24+
* Constructs an {@code AmbientContextEnforcerClientInterceptor} with no required context keys.
25+
*/
26+
public AmbientContextEnforcerClientInterceptor() {
27+
}
28+
29+
/**
30+
* Constructs an {@code AmbientContextEnforcerClientInterceptor} with a set of required context keys.
31+
*/
32+
public AmbientContextEnforcerClientInterceptor(String... requiredContextKeys) {
33+
this.requiredContextKeys = requiredContextKeys;
34+
}
35+
36+
/**
37+
* MissingAmbientContextException is thrown to indicate a breakdown in context continuity between services.
38+
*/
39+
public static class MissingAmbientContextException extends RuntimeException {
40+
public MissingAmbientContextException(String message) {
41+
super(message);
42+
}
43+
}
44+
45+
@Override
46+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
47+
// Throw if ambient context is missing
48+
if (!AmbientContext.isPresent()) {
49+
throw missingContextException();
50+
}
51+
52+
// Throw if required context keys are missing
53+
for (String requiredKey : requiredContextKeys) {
54+
if (!AmbientContext.current().keys().contains(requiredKey)) {
55+
throw incompleteContextException(requiredKey);
56+
}
57+
}
58+
59+
return next.newCall(method, callOptions);
60+
}
61+
62+
/**
63+
* Override this method to change the exception type or message thrown when the ambient context is missing
64+
* entirely, perhaps to reference your own internal documentation.
65+
*
66+
* @return a RuntimeException
67+
*/
68+
protected RuntimeException missingContextException() {
69+
return new MissingAmbientContextException("No AmbientContext is attached to the current gRPC Context. " +
70+
"Make sure Context is correctly transferred between worker threads using Context.wrap() or " +
71+
"Context.currentContextExecutor().");
72+
}
73+
74+
/**
75+
* Override this method to change the exception type or message thrown when the ambient context is missing a
76+
* required context key, perhaps to reference your own internal documentation.
77+
*
78+
* @return a RuntimeException
79+
*/
80+
protected RuntimeException incompleteContextException(String missingKey) {
81+
return new MissingAmbientContextException("The AmbientContext is missing a required context key: " + missingKey);
82+
}
83+
}

0 commit comments

Comments
 (0)