Skip to content

Commit 07af89b

Browse files
committed
[#2518] Test id generation with connection opened lazily
1 parent 6276906 commit 07af89b

File tree

1 file changed

+278
-0
lines changed

1 file changed

+278
-0
lines changed
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive;
7+
8+
import java.util.concurrent.CompletionStage;
9+
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.TimeUnit;
11+
12+
import org.hibernate.SessionFactory;
13+
import org.hibernate.boot.registry.StandardServiceRegistry;
14+
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
15+
import org.hibernate.cfg.Configuration;
16+
import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder;
17+
import org.hibernate.reactive.stage.Stage;
18+
import org.hibernate.reactive.util.impl.CompletionStages;
19+
import org.hibernate.reactive.vertx.VertxInstance;
20+
21+
import org.junit.jupiter.api.AfterAll;
22+
import org.junit.jupiter.api.BeforeAll;
23+
import org.junit.jupiter.api.Test;
24+
import org.junit.jupiter.api.TestInstance;
25+
import org.junit.jupiter.api.extension.ExtendWith;
26+
27+
import io.vertx.core.AbstractVerticle;
28+
import io.vertx.core.DeploymentOptions;
29+
import io.vertx.core.Promise;
30+
import io.vertx.core.Vertx;
31+
import io.vertx.core.VertxOptions;
32+
import io.vertx.junit5.Timeout;
33+
import io.vertx.junit5.VertxExtension;
34+
import io.vertx.junit5.VertxTestContext;
35+
import jakarta.persistence.Entity;
36+
import jakarta.persistence.GeneratedValue;
37+
import jakarta.persistence.Id;
38+
import jakarta.persistence.Table;
39+
40+
import static java.util.concurrent.TimeUnit.MINUTES;
41+
import static org.assertj.core.api.Assertions.fail;
42+
import static org.hibernate.cfg.AvailableSettings.SHOW_SQL;
43+
import static org.hibernate.reactive.BaseReactiveTest.setDefaultProperties;
44+
import static org.hibernate.reactive.provider.Settings.POOL_CONNECT_TIMEOUT;
45+
import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
46+
import static org.hibernate.reactive.util.impl.CompletionStages.loop;
47+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
48+
49+
/**
50+
* This is a multi-threaded stress test, intentionally consuming some time
51+
* that also opens the connection lazily.
52+
* The purpose is to verify that the sequence optimizer used by Hibernate Reactive
53+
* is indeed able to generate unique IDs backed by the database sequences, while
54+
* running multiple operations in different threads and on multiple Vert.x eventloops.
55+
* This is very similar to MultithreadedIdentityGenerationTest except it models
56+
* the full operations including the insert statements, while the latter focuses
57+
* on the generated IDs to be unique; it's useful to maintain both tests as:
58+
* - ID generation needs to be unique so it's good to stress that aspect
59+
* in isolation
60+
* - insert operations are downstream events, so this allows us to test that
61+
* such downstream events are not being unintentionally duplicated/dropped,
62+
* which could actually happen when the id generator triggers unintended
63+
* threading behaviours.
64+
*
65+
* N.B. We actually had a case in which the IDs were uniquely generated but the
66+
* downstream event was being processed twice (or more) concurrently, so it's
67+
* useful to have both integration tests.
68+
*
69+
* A typical reactive application will not require multiple threads, but we
70+
* specifically want to test for the case in which the single ID source is being
71+
* shared across multiple threads and also multiple eventloops.
72+
* @see MultithreadedInsertionTest
73+
*/
74+
@ExtendWith(VertxExtension.class)
75+
@TestInstance(TestInstance.Lifecycle.PER_METHOD)
76+
@Timeout(value = MultithreadedInsertionWithLazyConnectionTest.TIMEOUT_MINUTES, timeUnit = MINUTES)
77+
public class MultithreadedInsertionWithLazyConnectionTest {
78+
79+
/**
80+
* The number of threads should be higher than the default size of the connection pool so that
81+
* this test is also effective in detecting problems with resource starvation.
82+
*/
83+
private static final int N_THREADS = 12;
84+
private static final int ENTITIES_STORED_PER_THREAD = 2000;
85+
86+
//Should finish much sooner, but generating this amount of IDs could be slow on some CIs
87+
public static final int TIMEOUT_MINUTES = 10;
88+
89+
// Keeping this disabled because it generates a lot of queries
90+
private static final boolean LOG_SQL = false;
91+
92+
/**
93+
* If true, it will print info about the threads
94+
*/
95+
private static final boolean THREAD_PRETTY_MSG = true;
96+
97+
private static final Latch startLatch = new Latch( "start", N_THREADS );
98+
private static final Latch endLatch = new Latch( "end", N_THREADS );
99+
100+
private static Stage.SessionFactory stageSessionFactory;
101+
private static Vertx vertx;
102+
private static SessionFactory sessionFactory;
103+
104+
@BeforeAll
105+
public static void setupSessionFactory() {
106+
vertx = Vertx.vertx( getVertxOptions() );
107+
Configuration configuration = new Configuration();
108+
setDefaultProperties( configuration );
109+
configuration.addAnnotatedClass( EntityWithGeneratedId.class );
110+
configuration.setProperty( SHOW_SQL, String.valueOf( LOG_SQL ) );
111+
configuration.setProperty( POOL_CONNECT_TIMEOUT, String.valueOf( TIMEOUT_MINUTES * 60 * 1000 ) );
112+
StandardServiceRegistryBuilder builder = new ReactiveServiceRegistryBuilder()
113+
.applySettings( configuration.getProperties() )
114+
//Inject our custom vert.x instance:
115+
.addService( VertxInstance.class, () -> vertx );
116+
StandardServiceRegistry registry = builder.build();
117+
sessionFactory = configuration.buildSessionFactory( registry );
118+
stageSessionFactory = sessionFactory.unwrap( Stage.SessionFactory.class );
119+
}
120+
121+
private static VertxOptions getVertxOptions() {
122+
final VertxOptions vertxOptions = new VertxOptions();
123+
vertxOptions.setEventLoopPoolSize( N_THREADS );
124+
//We relax the blocked thread checks as we'll actually use latches to block them
125+
//intentionally for the purpose of the test; functionally this isn't required
126+
//but it's useful as self-test in the design of this, to ensure that the way
127+
//things are setup are indeed being run in multiple, separate threads.
128+
vertxOptions.setBlockedThreadCheckInterval( TIMEOUT_MINUTES );
129+
vertxOptions.setBlockedThreadCheckIntervalUnit( TimeUnit.MINUTES );
130+
return vertxOptions;
131+
}
132+
133+
@AfterAll
134+
public static void closeSessionFactory() {
135+
stageSessionFactory.close();
136+
}
137+
138+
@Test
139+
public void testIdentityGenerator(VertxTestContext context) {
140+
final DeploymentOptions deploymentOptions = new DeploymentOptions();
141+
deploymentOptions.setInstances( N_THREADS );
142+
143+
vertx
144+
.deployVerticle( InsertEntitiesVerticle::new, deploymentOptions )
145+
.onSuccess( res -> {
146+
endLatch.waitForEveryone();
147+
context.completeNow();
148+
} )
149+
.onFailure( context::failNow )
150+
.eventually( () -> vertx.close() );
151+
}
152+
153+
private static class InsertEntitiesVerticle extends AbstractVerticle {
154+
155+
int sequentialOperation = 0;
156+
157+
public InsertEntitiesVerticle() {
158+
}
159+
160+
@Override
161+
public void start(Promise<Void> startPromise) {
162+
startLatch.reached();
163+
startLatch.waitForEveryone();//Not essential, but to ensure a good level of parallelism
164+
final String initialThreadName = Thread.currentThread().getName();
165+
final Stage.Session session = stageSessionFactory.createSession();
166+
storeMultipleEntities( session )
167+
.handle( CompletionStages::handle )
168+
.thenCompose( handler -> session
169+
.close()
170+
.thenCompose( handler::getResultAsCompletionStage )
171+
)
172+
.whenComplete( (o, throwable) -> {
173+
endLatch.reached();
174+
if ( throwable != null ) {
175+
prettyOut( throwable.getMessage() );
176+
startPromise.fail( throwable );
177+
}
178+
else {
179+
if ( !initialThreadName.equals( Thread.currentThread().getName() ) ) {
180+
prettyOut( "Thread switch detected. Expecting " + initialThreadName + ", actual " + Thread.currentThread().getName() );
181+
startPromise.fail( "Thread switch detected!" );
182+
}
183+
else {
184+
startPromise.complete();
185+
}
186+
}
187+
} );
188+
}
189+
190+
private CompletionStage<Void> storeMultipleEntities(Stage.Session s) {
191+
return loop( 0, ENTITIES_STORED_PER_THREAD, index -> storeEntity( s ) );
192+
}
193+
194+
private CompletionStage<Void> storeEntity(Stage.Session s) {
195+
final Thread beforeOperationThread = Thread.currentThread();
196+
final int localVerticleOperationSequence = sequentialOperation++;
197+
final EntityWithGeneratedId entity = new EntityWithGeneratedId();
198+
entity.name = beforeOperationThread + "__" + localVerticleOperationSequence;
199+
200+
return s
201+
.withTransaction( t -> s.persist( entity ) )
202+
.thenCompose( v -> beforeOperationThread != Thread.currentThread()
203+
? failedFuture( new IllegalStateException( "Detected an unexpected switch of carrier threads!" ) )
204+
: voidFuture() );
205+
}
206+
207+
@Override
208+
public void stop() {
209+
prettyOut( "Verticle stopped " + super.toString() );
210+
}
211+
}
212+
213+
/**
214+
* Trivial entity using default id generation
215+
*/
216+
@Entity
217+
@Table(name = "Entity")
218+
private static class EntityWithGeneratedId {
219+
@Id
220+
@GeneratedValue
221+
Long id;
222+
223+
String name;
224+
225+
public EntityWithGeneratedId() {
226+
}
227+
}
228+
229+
/**
230+
* Custom latch which is rather verbose about threads reaching the milestones, to help verifying the design
231+
*/
232+
private static final class Latch {
233+
private final String label;
234+
private final CountDownLatch countDownLatch;
235+
236+
public Latch(String label, int membersCount) {
237+
this.label = label;
238+
this.countDownLatch = new CountDownLatch( membersCount );
239+
}
240+
241+
public void reached() {
242+
final long count = countDownLatch.getCount();
243+
countDownLatch.countDown();
244+
prettyOut( "Reached latch '" + label + "', current countdown is " + ( count - 1 ) );
245+
}
246+
247+
public void waitForEveryone() {
248+
try {
249+
boolean reachedZero = countDownLatch.await( TIMEOUT_MINUTES, MINUTES );
250+
if ( reachedZero ) {
251+
prettyOut( "Everyone has now breached '" + label + "'" );
252+
}
253+
else {
254+
fail( "Time out reached" );
255+
}
256+
}
257+
catch ( InterruptedException e ) {
258+
fail( e );
259+
}
260+
}
261+
}
262+
263+
private static void prettyOut(final String message) {
264+
if ( THREAD_PRETTY_MSG ) {
265+
final String threadName = Thread.currentThread().getName();
266+
final long l = System.currentTimeMillis();
267+
final long seconds = ( l / 1000 ) - initialSecond;
268+
//We prefix log messages by seconds since bootstrap; I'm preferring this over millisecond precision
269+
//as it's not very relevant to see exactly how long each stage took (it's actually distracting)
270+
//but it's more useful to group things coarsely when some lock or timeout introduces a significant
271+
//divide between some operations (when a starvation or timeout happens it takes some seconds).
272+
System.out.println( seconds + " - " + threadName + ": " + message );
273+
}
274+
}
275+
276+
private static final long initialSecond = ( System.currentTimeMillis() / 1000 );
277+
278+
}

0 commit comments

Comments
 (0)