@@ -51,37 +51,39 @@ private AsyncPublisher(Publisher<T> original, Executor executor) {
5151
5252 @ Override
5353 public void subscribe (Subscriber <? super T > s ) {
54- original . subscribe ( new AsyncSubscriber < T >( requireNonNull (s ), executor ) );
54+ AsyncSubscriber . wrapAndSubscribe ( original , requireNonNull (s ), executor );
5555 }
5656
5757 private static final class AsyncSubscriber <T > implements Subscriber <T > {
5858 private final BlockingQueue <Object > signalQueue = new LinkedBlockingQueue <Object >();
5959
60- private AsyncSubscriber (final Subscriber <? super T > original , final Executor executor ) {
60+ static <T > void wrapAndSubscribe (final Publisher <T > publisher ,
61+ final Subscriber <? super T > targetSubscriber , final Executor executor ) {
62+ final AsyncSubscriber <T > asyncSubscriber = new AsyncSubscriber <T >();
6163 try {
6264 executor .execute (new Runnable () {
6365 @ Override
6466 public void run () {
6567 for (; ; ) {
6668 try {
67- final Object signal = signalQueue .take ();
69+ final Object signal = asyncSubscriber . signalQueue .take ();
6870 if (signal instanceof Cancelled ) {
6971 return ;
7072 } else if (signal instanceof TerminalSignal ) {
7173 Thread .sleep (TERMINAL_DELAY_MS );
7274 TerminalSignal terminalSignal = (TerminalSignal ) signal ;
7375 if (terminalSignal .cause == null ) {
74- original .onComplete ();
76+ targetSubscriber .onComplete ();
7577 } else {
76- original .onError (terminalSignal .cause );
78+ targetSubscriber .onError (terminalSignal .cause );
7779 }
7880 return ;
7981 } else if (signal instanceof OnSubscribeSignal ) {
80- original .onSubscribe (((OnSubscribeSignal ) signal ).subscription );
82+ targetSubscriber .onSubscribe (((OnSubscribeSignal ) signal ).subscription );
8183 } else {
8284 @ SuppressWarnings ("unchecked" )
8385 final T onNextSignal = ((OnNextSignal <T >) signal ).onNext ;
84- original .onNext (onNextSignal );
86+ targetSubscriber .onNext (onNextSignal );
8587 }
8688 } catch (InterruptedException ex ) {
8789 throw new RuntimeException (ex );
@@ -90,7 +92,7 @@ public void run() {
9092 }
9193 });
9294 } catch (Throwable cause ) {
93- original .onSubscribe (new Subscription () {
95+ targetSubscriber .onSubscribe (new Subscription () {
9496 @ Override
9597 public void request (long n ) {
9698 }
@@ -99,8 +101,12 @@ public void request(long n) {
99101 public void cancel () {
100102 }
101103 });
102- original .onError (new IllegalStateException ("Executor rejected" , cause ));
104+ targetSubscriber .onError (new IllegalStateException ("Executor rejected" , cause ));
105+ // Publisher rejected the target subscriber and terminated it, don't continue to subscribe to avoid
106+ // duplicate termination.
107+ return ;
103108 }
109+ publisher .subscribe (asyncSubscriber );
104110 }
105111
106112 @ Override
0 commit comments