Skip to content

Commit 789a996

Browse files
committed
Benchmark old and new approaches at different levels of concurrency
1 parent 9309773 commit 789a996

File tree

4 files changed

+328
-0
lines changed

4 files changed

+328
-0
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package datadog.trace.bootstrap;
2+
3+
import static java.util.concurrent.TimeUnit.MICROSECONDS;
4+
import static org.openjdk.jmh.annotations.Mode.AverageTime;
5+
6+
import datadog.trace.bootstrap.weakmap.WeakMapContextStore;
7+
import datadog.trace.bootstrap.weakmap.WeakMaps;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.concurrent.atomic.AtomicInteger;
11+
import java.util.function.BiFunction;
12+
import org.openjdk.jmh.annotations.Benchmark;
13+
import org.openjdk.jmh.annotations.BenchmarkMode;
14+
import org.openjdk.jmh.annotations.Fork;
15+
import org.openjdk.jmh.annotations.Level;
16+
import org.openjdk.jmh.annotations.OutputTimeUnit;
17+
import org.openjdk.jmh.annotations.Param;
18+
import org.openjdk.jmh.annotations.Scope;
19+
import org.openjdk.jmh.annotations.Setup;
20+
import org.openjdk.jmh.annotations.State;
21+
import org.openjdk.jmh.annotations.Threads;
22+
import org.openjdk.jmh.infra.Blackhole;
23+
24+
@State(Scope.Benchmark)
25+
@BenchmarkMode(AverageTime)
26+
@OutputTimeUnit(MICROSECONDS)
27+
@SuppressWarnings({"unused", "rawtypes", "unchecked"})
28+
public class WeakContextStoreBenchmark {
29+
private static final int NUM_STORES = 3;
30+
31+
@Param({"false", "true"})
32+
public boolean global;
33+
34+
private BiFunction[] stores;
35+
36+
private Map<String, String> kvs;
37+
38+
private AtomicInteger threadNumber;
39+
40+
@Setup(Level.Trial)
41+
public void setup() {
42+
WeakMaps.registerAsSupplier();
43+
44+
stores = new BiFunction[NUM_STORES];
45+
for (int i = 0; i < NUM_STORES; i++) {
46+
stores[i] = global ? globalWeakStorePutIfAbsent(i) : weakMapStorePutIfAbsent(i);
47+
}
48+
49+
kvs = new HashMap<>();
50+
for (int i = 0; i < 1_000; i++) {
51+
kvs.put("key_" + i, "value_" + i);
52+
}
53+
54+
threadNumber = new AtomicInteger();
55+
}
56+
57+
@Benchmark
58+
@Fork(value = 1)
59+
@Threads(value = 1)
60+
public void singleThreaded(Blackhole blackhole) {
61+
test(blackhole);
62+
}
63+
64+
@Benchmark
65+
@Fork(value = 1)
66+
@Threads(value = 10)
67+
public void multiThreaded10(Blackhole blackhole) {
68+
test(blackhole);
69+
}
70+
71+
@Benchmark
72+
@Fork(value = 1)
73+
@Threads(value = 100)
74+
public void multiThreaded100(Blackhole blackhole) {
75+
test(blackhole);
76+
}
77+
78+
private void test(Blackhole blackhole) {
79+
// assign each benchmark thread a single store to operate on during the benchmark
80+
// the number of concurrent requests to a store goes up as more threads are added
81+
BiFunction store = stores[threadNumber.getAndIncrement() % NUM_STORES];
82+
for (Map.Entry e : kvs.entrySet()) {
83+
blackhole.consume(store.apply(e.getKey(), e.getValue()));
84+
}
85+
}
86+
87+
private static BiFunction globalWeakStorePutIfAbsent(int storeId) {
88+
return (k, v) -> GlobalWeakContextStore.weakPutIfAbsent(k, storeId, v);
89+
}
90+
91+
private static BiFunction weakMapStorePutIfAbsent(int storeId) {
92+
return new WeakMapContextStore<>(storeId)::putIfAbsent;
93+
}
94+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package datadog.trace.bootstrap.weakmap;
2+
3+
import java.util.function.Function;
4+
5+
public interface WeakMap<K, V> {
6+
int size();
7+
8+
boolean containsKey(K target);
9+
10+
V get(K key);
11+
12+
void put(K key, V value);
13+
14+
void putIfAbsent(K key, V value);
15+
16+
V computeIfAbsent(K key, Function<? super K, ? extends V> supplier);
17+
18+
V remove(K key);
19+
20+
abstract class Supplier {
21+
private static volatile Supplier SUPPLIER;
22+
23+
protected abstract <K, V> WeakMap<K, V> get();
24+
25+
public static <K, V> WeakMap<K, V> newWeakMap() {
26+
return SUPPLIER.get();
27+
}
28+
29+
public static synchronized void registerIfAbsent(Supplier supplier) {
30+
if (null == SUPPLIER) {
31+
SUPPLIER = supplier;
32+
}
33+
}
34+
}
35+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package datadog.trace.bootstrap.weakmap;
2+
3+
import datadog.trace.bootstrap.ContextStore;
4+
5+
/**
6+
* Weak {@link ContextStore} that acts as a fall-back when field-injection isn't possible.
7+
*
8+
* <p>This class should be created lazily because it uses weak maps with background cleanup.
9+
*/
10+
public final class WeakMapContextStore<K, V> implements ContextStore<K, V> {
11+
private static final int DEFAULT_MAX_SIZE = 50_000;
12+
13+
private final int maxSize;
14+
private final WeakMap<Object, Object> map = WeakMap.Supplier.newWeakMap();
15+
16+
public WeakMapContextStore(int maxSize) {
17+
this.maxSize = maxSize;
18+
}
19+
20+
public WeakMapContextStore() {
21+
this(DEFAULT_MAX_SIZE);
22+
}
23+
24+
@Override
25+
@SuppressWarnings("unchecked")
26+
public V get(final K key) {
27+
return (V) map.get(key);
28+
}
29+
30+
@Override
31+
public void put(final K key, final V context) {
32+
if (map.size() < maxSize) {
33+
map.put(key, context);
34+
}
35+
}
36+
37+
@Override
38+
public V putIfAbsent(final K key, final V context) {
39+
V existingContext = get(key);
40+
if (null == existingContext) {
41+
// This whole part with using synchronized is only because
42+
// we want to avoid prematurely calling the factory if
43+
// someone else is doing a putIfAbsent at the same time.
44+
// There is still the possibility that there is a concurrent
45+
// call to put that will win, but that is indistinguishable
46+
// from the put happening right after the putIfAbsent.
47+
synchronized (map) {
48+
existingContext = get(key);
49+
if (null == existingContext) {
50+
existingContext = context;
51+
put(key, existingContext);
52+
}
53+
}
54+
}
55+
return existingContext;
56+
}
57+
58+
@Override
59+
public V putIfAbsent(final K key, final Factory<V> contextFactory) {
60+
return computeIfAbsent(key, contextFactory);
61+
}
62+
63+
@Override
64+
public V computeIfAbsent(K key, KeyAwareFactory<? super K, V> contextFactory) {
65+
V existingContext = get(key);
66+
if (null == existingContext) {
67+
// This whole part with using synchronized is only because
68+
// we want to avoid prematurely calling the factory if
69+
// someone else is doing a putIfAbsent at the same time.
70+
// There is still the possibility that there is a concurrent
71+
// call to put that will win, but that is indistinguishable
72+
// from the put happening right after the putIfAbsent.
73+
synchronized (map) {
74+
existingContext = get(key);
75+
if (null == existingContext) {
76+
existingContext = contextFactory.create(key);
77+
put(key, existingContext);
78+
}
79+
}
80+
}
81+
return existingContext;
82+
}
83+
84+
@Override
85+
@SuppressWarnings("unchecked")
86+
public V remove(final K key) {
87+
return (V) map.remove(key);
88+
}
89+
90+
// Package reachable for testing
91+
int size() {
92+
return map.size();
93+
}
94+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package datadog.trace.bootstrap.weakmap;
2+
3+
import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap;
4+
import datadog.trace.api.Platform;
5+
import datadog.trace.util.AgentTaskScheduler;
6+
import datadog.trace.util.AgentTaskScheduler.Task;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.function.Function;
9+
10+
public class WeakMaps {
11+
private static final long CLEAN_FREQUENCY_SECONDS = 1;
12+
13+
public static <K, V> WeakMap<K, V> newWeakMap() {
14+
final WeakConcurrentMap<K, V> map = new WeakConcurrentMap<>(false, true);
15+
if (!Platform.isNativeImageBuilder()) {
16+
AgentTaskScheduler.get()
17+
.weakScheduleAtFixedRate(
18+
MapCleaningTask.INSTANCE,
19+
map,
20+
CLEAN_FREQUENCY_SECONDS,
21+
CLEAN_FREQUENCY_SECONDS,
22+
TimeUnit.SECONDS);
23+
}
24+
return new Adapter<>(map);
25+
}
26+
27+
private WeakMaps() {}
28+
29+
public static void registerAsSupplier() {
30+
WeakMap.Supplier.registerIfAbsent(
31+
new WeakMap.Supplier() {
32+
@Override
33+
protected <K, V> WeakMap<K, V> get() {
34+
return WeakMaps.newWeakMap();
35+
}
36+
});
37+
}
38+
39+
// Important to use explicit class to avoid implicit hard references to target
40+
private static class MapCleaningTask implements Task<WeakConcurrentMap<?, ?>> {
41+
static final MapCleaningTask INSTANCE = new MapCleaningTask();
42+
43+
@Override
44+
public void run(final WeakConcurrentMap<?, ?> target) {
45+
target.expungeStaleEntries();
46+
}
47+
}
48+
49+
private static class Adapter<K, V> implements WeakMap<K, V> {
50+
private final WeakConcurrentMap<K, V> map;
51+
52+
private Adapter(final WeakConcurrentMap<K, V> map) {
53+
this.map = map;
54+
}
55+
56+
@Override
57+
public int size() {
58+
return map.approximateSize();
59+
}
60+
61+
@Override
62+
public boolean containsKey(final K key) {
63+
return map.containsKey(key);
64+
}
65+
66+
@Override
67+
public V get(final K key) {
68+
return map.get(key);
69+
}
70+
71+
@Override
72+
public void put(final K key, final V value) {
73+
if (null != value) {
74+
map.put(key, value);
75+
} else {
76+
map.remove(key); // WeakConcurrentMap doesn't accept null values
77+
}
78+
}
79+
80+
@Override
81+
public void putIfAbsent(final K key, final V value) {
82+
map.putIfAbsent(key, value);
83+
}
84+
85+
@Override
86+
public V computeIfAbsent(final K key, final Function<? super K, ? extends V> supplier) {
87+
V value = map.get(key);
88+
if (null == value) {
89+
synchronized (this) {
90+
value = map.get(key);
91+
if (null == value) {
92+
value = supplier.apply(key);
93+
map.put(key, value);
94+
}
95+
}
96+
}
97+
return value;
98+
}
99+
100+
@Override
101+
public V remove(K key) {
102+
return map.remove(key);
103+
}
104+
}
105+
}

0 commit comments

Comments
 (0)