| 
5 | 5 |  */  | 
6 | 6 | package org.hibernate.reactive;  | 
7 | 7 | 
 
  | 
 | 8 | +import java.util.ArrayList;  | 
8 | 9 | import java.util.Collection;  | 
9 | 10 | import java.util.List;  | 
10 | 11 | import java.util.Objects;  | 
11 | 12 | import java.util.Queue;  | 
12 | 13 | import java.util.concurrent.CompletableFuture;  | 
 | 14 | +import java.util.concurrent.CompletionStage;  | 
13 | 15 | import java.util.concurrent.ConcurrentLinkedQueue;  | 
14 | 16 | import java.util.concurrent.CountDownLatch;  | 
15 | 17 | import java.util.concurrent.ExecutorService;  | 
16 | 18 | import java.util.concurrent.Executors;  | 
17 |  | -import java.util.stream.IntStream;  | 
 | 19 | + | 
 | 20 | + | 
18 | 21 | 
 
  | 
19 | 22 | import org.junit.jupiter.api.Test;  | 
20 | 23 | 
 
  | 
 | 
27 | 30 | import jakarta.persistence.Id;  | 
28 | 31 | import jakarta.persistence.Table;  | 
29 | 32 | 
 
  | 
30 |  | -import static java.util.Arrays.stream;  | 
31 | 33 | import static java.util.concurrent.CompletableFuture.allOf;  | 
32 | 34 | import static java.util.concurrent.CompletableFuture.runAsync;  | 
33 |  | -import static java.util.stream.Stream.concat;  | 
34 | 35 | import static org.assertj.core.api.Assertions.assertThat;  | 
 | 36 | +import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;  | 
35 | 37 | 
 
  | 
36 | 38 | public class CancelSignalTest extends BaseReactiveTest {  | 
37 | 39 | 	private static final Logger LOG = Logger.getLogger( CancelSignalTest.class );  | 
38 | 40 | 
 
  | 
 | 41 | +	private static final int EXECUTION_SIZE = 10;  | 
 | 42 | + | 
39 | 43 | 	@Override  | 
40 | 44 | 	protected Collection<Class<?>> annotatedEntities() {  | 
41 | 45 | 		return List.of( GuineaPig.class );  | 
42 | 46 | 	}  | 
43 | 47 | 
 
  | 
 | 48 | +	@Override  | 
 | 49 | +	public CompletionStage<Void> deleteEntities(Class<?>... entities) {  | 
 | 50 | +		// We don't need to delete anything  | 
 | 51 | +		return voidFuture();  | 
 | 52 | +	}  | 
 | 53 | + | 
44 | 54 | 	@Test  | 
45 | 55 | 	public void cleanupConnectionWhenCancelSignal(VertxTestContext context) {  | 
46 | 56 | 		// larger than 'sql pool size' to check entering the 'pool waiting queue'  | 
47 |  | -		int executeSize = 10;  | 
48 | 57 | 		CountDownLatch firstSessionWaiter = new CountDownLatch( 1 );  | 
49 | 58 | 		Queue<Cancellable> cancellableQueue = new ConcurrentLinkedQueue<>();  | 
50 | 59 | 
 
  | 
51 |  | -		ExecutorService withSessionExecutor = Executors.newFixedThreadPool( executeSize );  | 
52 |  | -		// Create some jobs that are going to be cancelled asynchronously  | 
53 |  | -		CompletableFuture[] withSessionFutures = IntStream  | 
54 |  | -				.range( 0, executeSize )  | 
55 |  | -				.mapToObj( i -> runAsync(  | 
56 |  | -						() -> {  | 
57 |  | -							CountDownLatch countDownLatch = new CountDownLatch( 1 );  | 
58 |  | -							Cancellable cancellable = getMutinySessionFactory()  | 
59 |  | -									.withSession( s -> {  | 
60 |  | -										LOG.debug( "start withSession: " + i );  | 
61 |  | -										sleep( 100 );  | 
62 |  | -										firstSessionWaiter.countDown();  | 
63 |  | -										return s.find( GuineaPig.class, 1 );  | 
64 |  | -									} )  | 
65 |  | -									.onTermination().invoke( () -> {  | 
66 |  | -										countDownLatch.countDown();  | 
67 |  | -										LOG.debug( "future " + i + " terminated" );  | 
68 |  | -									} )  | 
69 |  | -									.subscribe().with( item -> LOG.debug( "end withSession: "  + i  ) );  | 
70 |  | -							cancellableQueue.add( cancellable );  | 
71 |  | -							await( countDownLatch );  | 
72 |  | -						},  | 
73 |  | -						withSessionExecutor  | 
74 |  | -				) )  | 
75 |  | -				.toArray( CompletableFuture[]::new );  | 
76 |  | - | 
77 |  | -		// Create jobs that are going to cancel the previous ones  | 
78 |  | -		ExecutorService cancelExecutor = Executors.newFixedThreadPool( executeSize );  | 
79 |  | -		CompletableFuture[] cancelFutures = IntStream  | 
80 |  | -				.range( 0, executeSize )  | 
81 |  | -				.mapToObj( i -> runAsync(  | 
82 |  | -						() -> {  | 
83 |  | -							await( firstSessionWaiter );  | 
84 |  | -							cancellableQueue.poll().cancel();  | 
85 |  | -							sleep( 500 );  | 
86 |  | -						},  | 
87 |  | -						cancelExecutor  | 
88 |  | -				) )  | 
89 |  | -				.toArray( CompletableFuture[]::new );  | 
90 |  | - | 
91 |  | -		CompletableFuture<Void> allFutures = allOf( concat( stream( withSessionFutures ), stream( cancelFutures ) )  | 
92 |  | -						.toArray( CompletableFuture[]::new )  | 
93 |  | -		);  | 
 | 60 | +		final List<CompletableFuture<?>> allFutures = new ArrayList<>();  | 
 | 61 | + | 
 | 62 | +		ExecutorService withSessionExecutor = Executors.newFixedThreadPool( EXECUTION_SIZE );  | 
 | 63 | +		for ( int j = 0; j < EXECUTION_SIZE; j++ ) {  | 
 | 64 | +			final int i = j;  | 
 | 65 | +			allFutures.add( runAsync( () -> {  | 
 | 66 | +						  CountDownLatch countDownLatch = new CountDownLatch( 1 );  | 
 | 67 | +						  Cancellable cancellable = getMutinySessionFactory()  | 
 | 68 | +								  .withSession( s -> {  | 
 | 69 | +									  LOG.info( "start withSession: " + i );  | 
 | 70 | +									  sleep( 100 );  | 
 | 71 | +									  firstSessionWaiter.countDown();  | 
 | 72 | +									  return s.find( GuineaPig.class, 1 );  | 
 | 73 | +								  } )  | 
 | 74 | +								  .onCancellation().invoke( () -> {  | 
 | 75 | +									  LOG.info( "future " + i + " cancelled" );  | 
 | 76 | +									  countDownLatch.countDown();  | 
 | 77 | +								  } )  | 
 | 78 | +								  .subscribe()  | 
 | 79 | +								  // We cancelled the job, it shouldn't really finish  | 
 | 80 | +								  .with( item -> LOG.info( "end withSession: "  + i  ) );  | 
 | 81 | +						  cancellableQueue.add( cancellable );  | 
 | 82 | +						  await( countDownLatch );  | 
 | 83 | +					  },  | 
 | 84 | +					  withSessionExecutor  | 
 | 85 | +			) );  | 
 | 86 | +		}  | 
94 | 87 | 
 
  | 
95 |  | -		// Test that there shouldn't be any pending process  | 
96 |  | -		test( context, allFutures.thenAccept( x -> assertThat( sqlPendingMetric() ).isEqualTo( 0.0 ) ) );  | 
 | 88 | +		ExecutorService cancelExecutor = Executors.newFixedThreadPool( EXECUTION_SIZE );  | 
 | 89 | +		for ( int i = 0; i < EXECUTION_SIZE; i++ ) {  | 
 | 90 | +			allFutures.add( runAsync( () -> {  | 
 | 91 | +						  await( firstSessionWaiter );  | 
 | 92 | +						  cancellableQueue.poll().cancel();  | 
 | 93 | +						  sleep( 500 );  | 
 | 94 | +					  },  | 
 | 95 | +					  cancelExecutor  | 
 | 96 | +			) );  | 
 | 97 | +		}  | 
 | 98 | + | 
 | 99 | +		test(  | 
 | 100 | +				context, allOf( allFutures.toArray( new CompletableFuture<?>[0] ) )  | 
 | 101 | +						.thenAccept( x -> assertThat( sqlPendingMetric() ).isEqualTo( 0.0 ) )  | 
 | 102 | +		);  | 
97 | 103 | 	}  | 
98 | 104 | 
 
  | 
99 | 105 | 	private static double sqlPendingMetric() {  | 
 | 
0 commit comments