Skip to content

Commit 902d56d

Browse files
feat: add in process channel support (#31)
* feat: add in process channel support * test: add tests for inprocess and copy constructor
1 parent 84e5de1 commit 902d56d

File tree

5 files changed

+200
-13
lines changed

5 files changed

+200
-13
lines changed

grpc-client-utils/build.gradle.kts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,16 @@ dependencies {
1111
because("https://snyk.io/vuln/SNYK-JAVA-IONETTY-2812456")
1212
}
1313
}
14-
api(platform("io.grpc:grpc-bom:1.45.1"))
14+
api(platform("io.grpc:grpc-bom:1.47.0"))
1515
api("io.grpc:grpc-context")
1616
api("io.grpc:grpc-api")
1717

1818
implementation(project(":grpc-context-utils"))
1919
implementation("org.slf4j:slf4j-api:1.7.36")
20+
implementation("io.grpc:grpc-core")
2021

21-
annotationProcessor("org.projectlombok:lombok:1.18.22")
22-
compileOnly("org.projectlombok:lombok:1.18.22")
22+
annotationProcessor("org.projectlombok:lombok:1.18.24")
23+
compileOnly("org.projectlombok:lombok:1.18.24")
2324

2425
testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
2526
testImplementation("org.mockito:mockito-core:4.4.0")

grpc-client-utils/src/main/java/org/hypertrace/core/grpcutils/client/GrpcChannelRegistry.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,25 @@
99
import java.util.Objects;
1010
import java.util.concurrent.ConcurrentHashMap;
1111
import java.util.concurrent.TimeUnit;
12+
import java.util.function.Supplier;
1213
import org.slf4j.Logger;
1314
import org.slf4j.LoggerFactory;
1415

1516
public class GrpcChannelRegistry {
1617
private static final Logger LOG = LoggerFactory.getLogger(GrpcChannelRegistry.class);
17-
private final Map<String, ManagedChannel> channelMap = new ConcurrentHashMap<>();
18+
private final Map<String, ManagedChannel> channelMap;
1819
private volatile boolean isShutdown = false;
1920

21+
public GrpcChannelRegistry(GrpcChannelRegistry sourceRegistry) {
22+
// Copy constructor
23+
this.isShutdown = sourceRegistry.isShutdown();
24+
this.channelMap = new ConcurrentHashMap<>(sourceRegistry.channelMap);
25+
}
26+
27+
public GrpcChannelRegistry() {
28+
this.channelMap = new ConcurrentHashMap<>();
29+
}
30+
2031
/**
2132
* Use either {@link #forSecureAddress(String, int)} or {@link #forPlaintextAddress(String, int)}
2233
*/
@@ -30,44 +41,56 @@ public ManagedChannel forSecureAddress(String host, int port) {
3041
}
3142

3243
public ManagedChannel forSecureAddress(String host, int port, GrpcChannelConfig config) {
33-
assert !this.isShutdown;
44+
assert !this.isShutdown();
3445
String channelId = this.getChannelId(host, port, false, config);
35-
return this.channelMap.computeIfAbsent(
36-
channelId, unused -> this.buildNewChannel(host, port, false, config));
46+
return this.getOrComputeChannel(
47+
channelId, () -> this.buildNewChannel(host, port, false, config));
3748
}
3849

3950
public ManagedChannel forPlaintextAddress(String host, int port) {
4051
return forPlaintextAddress(host, port, GrpcChannelConfig.builder().build());
4152
}
4253

4354
public ManagedChannel forPlaintextAddress(String host, int port, GrpcChannelConfig config) {
44-
assert !this.isShutdown;
55+
assert !this.isShutdown();
4556
String channelId = this.getChannelId(host, port, true, config);
46-
return this.channelMap.computeIfAbsent(
47-
channelId, unused -> this.buildNewChannel(host, port, true, config));
57+
return this.getOrComputeChannel(
58+
channelId, () -> this.buildNewChannel(host, port, true, config));
4859
}
4960

5061
private ManagedChannel buildNewChannel(
5162
String host, int port, boolean isPlaintext, GrpcChannelConfig config) {
5263
LOG.info("Creating new channel {}", this.getChannelId(host, port, isPlaintext, config));
5364

54-
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(host, port);
65+
ManagedChannelBuilder<?> builder = this.getBuilderForAddress(host, port);
5566
if (isPlaintext) {
5667
builder.usePlaintext();
5768
}
69+
return this.configureAndBuildChannel(builder, config);
70+
}
71+
72+
ManagedChannelBuilder<?> getBuilderForAddress(String host, int port) {
73+
return ManagedChannelBuilder.forAddress(host, port);
74+
}
5875

76+
ManagedChannel configureAndBuildChannel(
77+
ManagedChannelBuilder<?> builder, GrpcChannelConfig config) {
5978
if (config.getMaxInboundMessageSize() != null) {
6079
builder.maxInboundMessageSize(config.getMaxInboundMessageSize());
6180
}
6281
return builder.build();
6382
}
6483

65-
private String getChannelId(
66-
String host, int port, boolean isPlaintext, GrpcChannelConfig config) {
84+
String getChannelId(String host, int port, boolean isPlaintext, GrpcChannelConfig config) {
6785
String securePrefix = isPlaintext ? "plaintext" : "secure";
6886
return securePrefix + ":" + host + ":" + port + ":" + Objects.hash(config);
6987
}
7088

89+
final ManagedChannel getOrComputeChannel(
90+
String channelId, Supplier<ManagedChannel> channelSupplier) {
91+
return this.channelMap.computeIfAbsent(channelId, unused -> channelSupplier.get());
92+
}
93+
7194
/**
7295
* Shuts down channels using a default deadline of 1 minute.
7396
*
@@ -109,6 +132,10 @@ public void shutdown(Deadline deadline) {
109132
this.channelMap.clear();
110133
}
111134

135+
public boolean isShutdown() {
136+
return this.isShutdown;
137+
}
138+
112139
private void initiateChannelShutdown(String channelId, ManagedChannel managedChannel) {
113140
LOG.info("Starting shutdown for channel [{}]", channelId);
114141
managedChannel.shutdown();
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package org.hypertrace.core.grpcutils.client;
2+
3+
import io.grpc.ManagedChannel;
4+
import io.grpc.ManagedChannelBuilder;
5+
import io.grpc.inprocess.InProcessChannelBuilder;
6+
import java.util.Collections;
7+
import java.util.Map;
8+
import java.util.Objects;
9+
import java.util.Optional;
10+
11+
public class InProcessGrpcChannelRegistry extends GrpcChannelRegistry {
12+
private final Map<String, String> authorityToInProcessNamedOverride;
13+
14+
public InProcessGrpcChannelRegistry() {
15+
this(Collections.emptyMap());
16+
}
17+
18+
public InProcessGrpcChannelRegistry(Map<String, String> authorityToInProcessNamedOverride) {
19+
this.authorityToInProcessNamedOverride = authorityToInProcessNamedOverride;
20+
}
21+
22+
public InProcessGrpcChannelRegistry(GrpcChannelRegistry sourceRegistry) {
23+
super(sourceRegistry);
24+
if (sourceRegistry instanceof InProcessGrpcChannelRegistry) {
25+
this.authorityToInProcessNamedOverride =
26+
Map.copyOf(
27+
((InProcessGrpcChannelRegistry) sourceRegistry).authorityToInProcessNamedOverride);
28+
} else {
29+
this.authorityToInProcessNamedOverride = Collections.emptyMap();
30+
}
31+
}
32+
33+
public ManagedChannel forName(String name) {
34+
return this.forName(name, GrpcChannelConfig.builder().build());
35+
}
36+
37+
public ManagedChannel forName(String name, GrpcChannelConfig config) {
38+
assert !this.isShutdown();
39+
return this.getOrComputeChannel(
40+
this.getInProcessChannelId(name, config), () -> this.buildInProcessChannel(name, config));
41+
}
42+
43+
String getChannelId(String host, int port, boolean isPlaintext, GrpcChannelConfig config) {
44+
return this.getInProcessOverrideNameForHostAndPort(host, port)
45+
.map(name -> this.getInProcessChannelId(name, config))
46+
.orElseGet(() -> super.getChannelId(host, port, isPlaintext, config));
47+
}
48+
49+
@Override
50+
ManagedChannelBuilder<?> getBuilderForAddress(String host, int port) {
51+
return this.getInProcessOverrideNameForHostAndPort(host, port)
52+
.<ManagedChannelBuilder<?>>map(InProcessChannelBuilder::forName)
53+
.orElseGet(() -> super.getBuilderForAddress(host, port));
54+
}
55+
56+
ManagedChannel buildInProcessChannel(String name, GrpcChannelConfig config) {
57+
return this.configureAndBuildChannel(InProcessChannelBuilder.forName(name), config);
58+
}
59+
60+
String getInProcessChannelId(String name, GrpcChannelConfig config) {
61+
return "inprocess:" + name + ":" + Objects.hash(config);
62+
}
63+
64+
private Optional<String> getInProcessOverrideNameForHostAndPort(String host, int port) {
65+
return Optional.ofNullable(
66+
this.authorityToInProcessNamedOverride.get(this.toAuthority(host, port)));
67+
}
68+
69+
private String toAuthority(String host, int port) {
70+
return host + ":" + port;
71+
}
72+
}

grpc-client-utils/src/test/java/org/hypertrace/core/grpcutils/client/GrpcChannelRegistryTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.grpc.Deadline.Ticker;
1717
import io.grpc.ManagedChannel;
1818
import io.grpc.ManagedChannelBuilder;
19+
import java.util.Map;
1920
import java.util.concurrent.TimeUnit;
2021
import org.junit.jupiter.api.BeforeEach;
2122
import org.junit.jupiter.api.Test;
@@ -127,4 +128,13 @@ void throwsIfNewChannelRequestedAfterShutdown() {
127128
assertThrows(AssertionError.class, () -> this.channelRegistry.forPlaintextAddress("foo", 1000));
128129
assertThrows(AssertionError.class, () -> this.channelRegistry.forSecureAddress("foo", 1000));
129130
}
131+
132+
@Test
133+
void copyConstructorReusesExistingChannels() {
134+
GrpcChannelRegistry firstRegistry = new GrpcChannelRegistry();
135+
136+
Channel firstChannel = firstRegistry.forSecureAddress("foo", 1000);
137+
138+
assertSame(firstChannel, new GrpcChannelRegistry(firstRegistry).forSecureAddress("foo", 1000));
139+
}
130140
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package org.hypertrace.core.grpcutils.client;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertNotSame;
5+
import static org.junit.jupiter.api.Assertions.assertSame;
6+
7+
import io.grpc.Channel;
8+
import java.util.Map;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
12+
class InProcessGrpcChannelRegistryTest {
13+
InProcessGrpcChannelRegistry channelRegistry;
14+
15+
@BeforeEach
16+
void beforeEach() {
17+
this.channelRegistry = new InProcessGrpcChannelRegistry();
18+
}
19+
20+
@Test
21+
void createsNewChannelsAsRequested() {
22+
// Regular channel
23+
assertEquals("foo:1000", this.channelRegistry.forPlaintextAddress("foo", 1000).authority());
24+
assertEquals("foo:1000", this.channelRegistry.forSecureAddress("foo", 1000).authority());
25+
// In process runs on localhost
26+
assertEquals("localhost", this.channelRegistry.forName("inprocess-name").authority());
27+
}
28+
29+
@Test
30+
void reusesInProcessChannels() {
31+
assertSame(
32+
this.channelRegistry.forName("inprocess-name"),
33+
this.channelRegistry.forName("inprocess-name"));
34+
35+
// But not if their config differs
36+
assertNotSame(
37+
this.channelRegistry.forName("inprocess-name"),
38+
this.channelRegistry.forName(
39+
"inprocess-name", GrpcChannelConfig.builder().maxInboundMessageSize(100).build()));
40+
}
41+
42+
@Test
43+
void overridesAuthorityByConfig() {
44+
this.channelRegistry = new InProcessGrpcChannelRegistry(Map.of("foo:1000", "inprocess-name"));
45+
assertSame(
46+
this.channelRegistry.forSecureAddress("foo", 1000),
47+
this.channelRegistry.forName("inprocess-name"));
48+
assertSame(
49+
this.channelRegistry.forPlaintextAddress("foo", 1000),
50+
this.channelRegistry.forName("inprocess-name"));
51+
// Also works with custom config
52+
53+
assertSame(
54+
this.channelRegistry.forSecureAddress(
55+
"foo", 1000, GrpcChannelConfig.builder().maxInboundMessageSize(100).build()),
56+
this.channelRegistry.forName(
57+
"inprocess-name", GrpcChannelConfig.builder().maxInboundMessageSize(100).build()));
58+
59+
// but custom config shouldn't match the default config ones
60+
assertNotSame(
61+
this.channelRegistry.forSecureAddress("foo", 1000),
62+
this.channelRegistry.forName(
63+
"inprocess-name", GrpcChannelConfig.builder().maxInboundMessageSize(100).build()));
64+
}
65+
66+
@Test
67+
void copyConstructorReusesExistingChannelsAndOverrides() {
68+
InProcessGrpcChannelRegistry firstRegistry =
69+
new InProcessGrpcChannelRegistry(Map.of("foo:1000", "inprocess-name"));
70+
71+
Channel firstChannel = firstRegistry.forName("inprocess-name");
72+
73+
assertSame(
74+
firstChannel,
75+
new InProcessGrpcChannelRegistry(firstRegistry).forSecureAddress("foo", 1000));
76+
}
77+
}

0 commit comments

Comments
 (0)