diff --git a/src/main/java/io/nats/streaming/NatsStreaming.java b/src/main/java/io/nats/streaming/NatsStreaming.java index 4611419..edd74f6 100644 --- a/src/main/java/io/nats/streaming/NatsStreaming.java +++ b/src/main/java/io/nats/streaming/NatsStreaming.java @@ -20,6 +20,7 @@ public final class NatsStreaming { static final String ERR_SUB_REQ_TIMEOUT = PFX + "subscribe request timeout"; static final String ERR_UNSUB_REQ_TIMEOUT = PFX + "unsubscribe request timeout"; static final String ERR_CONNECTION_CLOSED = PFX + "connection closed"; + static final String ERR_PUB_TIMEOUT = PFX + "publish timeout"; static final String ERR_TIMEOUT = PFX + "publish ack timeout"; static final String ERR_BAD_ACK = PFX + "malformed ack"; static final String ERR_BAD_SUBSCRIPTION = PFX + "invalid subscription"; diff --git a/src/main/java/io/nats/streaming/Options.java b/src/main/java/io/nats/streaming/Options.java index b568ded..bff1581 100644 --- a/src/main/java/io/nats/streaming/Options.java +++ b/src/main/java/io/nats/streaming/Options.java @@ -16,12 +16,14 @@ public class Options { private final Duration ackTimeout; private final String discoverPrefix; private final int maxPubAcksInFlight; + private final Duration pubTimeout; // can be null, which means wait forever private Options(Builder builder) { this.natsUrl = builder.natsUrl; this.natsConn = builder.natsConn; this.connectTimeout = builder.connectTimeout; this.ackTimeout = builder.ackTimeout; + this.pubTimeout = builder.pubTimeout; this.discoverPrefix = builder.discoverPrefix; this.maxPubAcksInFlight = builder.maxPubAcksInFlight; } @@ -38,6 +40,11 @@ Duration getConnectTimeout() { return connectTimeout; } + Duration getPubTimeout() { + return pubTimeout; + } + + // can be null, which means wait forever Duration getAckTimeout() { return ackTimeout; } @@ -57,6 +64,7 @@ public static final class Builder implements Serializable { private transient io.nats.client.Connection natsConn; // A Connection is not Serializable private Duration connectTimeout = Duration.ofSeconds(NatsStreaming.DEFAULT_CONNECT_WAIT); private Duration ackTimeout = Duration.ofMillis(SubscriptionImpl.DEFAULT_ACK_WAIT); + private Duration pubTimeout = SubscriptionImpl.DEFAULT_PUB_WAIT > 0 ? Duration.ofMillis(SubscriptionImpl.DEFAULT_PUB_WAIT) : null; private String discoverPrefix = NatsStreaming.DEFAULT_DISCOVER_PREFIX; private int maxPubAcksInFlight = NatsStreaming.DEFAULT_MAX_PUB_ACKS_IN_FLIGHT; @@ -72,10 +80,18 @@ public Builder(Options template) { this.natsConn = template.natsConn; this.connectTimeout = template.connectTimeout; this.ackTimeout = template.ackTimeout; + this.pubTimeout = template.pubTimeout; this.discoverPrefix = template.discoverPrefix; this.maxPubAcksInFlight = template.maxPubAcksInFlight; } + // set specific publish timeout (time to block waiting for inflight slot) + // set to null to wait forever (default) + public Builder pubWait(Duration pubTimeout) { + this.pubTimeout = pubTimeout; + return this; + } + public Builder pubAckWait(Duration ackTimeout) { this.ackTimeout = ackTimeout; return this; diff --git a/src/main/java/io/nats/streaming/StreamingConnection.java b/src/main/java/io/nats/streaming/StreamingConnection.java index 39c8e47..3f33f1f 100644 --- a/src/main/java/io/nats/streaming/StreamingConnection.java +++ b/src/main/java/io/nats/streaming/StreamingConnection.java @@ -23,8 +23,9 @@ public interface StreamingConnection extends AutoCloseable { * @throws IOException if the publish operation is not successful * @throws InterruptedException if the calling thread is interrupted before the call completes * @throws IllegalStateException if the connection is closed + * @throws TimeoutException if the publish could not be completed after blocking for opts.getPubTimeout() */ - void publish(String subject, byte[] data) throws IOException, InterruptedException; + void publish(String subject, byte[] data) throws IOException, InterruptedException, TimeoutException; /** * Publishes the payload specified by {@code data} to the subject specified by {@code subject} @@ -39,10 +40,11 @@ public interface StreamingConnection extends AutoCloseable { * @throws IOException if an I/O exception is encountered * @throws InterruptedException if the calling thread is interrupted before the call completes * @throws IllegalStateException if the connection is closed + * @throws TimeoutException if the publish could not be completed after blocking for opts.getPubTimeout() * @see AckHandler */ String publish(String subject, byte[] data, AckHandler ah) - throws IOException, InterruptedException; + throws IOException, InterruptedException, TimeoutException; /** * Creates a {@link Subscription} with interest in a given subject, assigns the callback, and diff --git a/src/main/java/io/nats/streaming/StreamingConnectionFactory.java b/src/main/java/io/nats/streaming/StreamingConnectionFactory.java index 601d98f..df22590 100644 --- a/src/main/java/io/nats/streaming/StreamingConnectionFactory.java +++ b/src/main/java/io/nats/streaming/StreamingConnectionFactory.java @@ -16,6 +16,7 @@ * options. A client uses it to create a connection to the STAN streaming data system. */ public class StreamingConnectionFactory { + private Duration pubTimeout = null; // null pub timeout means forever private Duration ackTimeout = Duration.ofMillis(SubscriptionImpl.DEFAULT_ACK_WAIT); private Duration connectTimeout = Duration.ofSeconds(NatsStreaming .DEFAULT_CONNECT_WAIT); @@ -49,7 +50,33 @@ public StreamingConnection createConnection() throws IOException, InterruptedExc Options options() { return new Options.Builder().connectWait(connectTimeout).pubAckWait(ackTimeout) .discoverPrefix(discoverPrefix).maxPubAcksInFlight(maxPubAcksInFlight) - .natsConn(natsConn).natsUrl(natsUrl).build(); + .natsConn(natsConn).natsUrl(natsUrl).pubWait(pubTimeout).build(); + } + + /** + * Returns the pub timeout. + * + * @return the pubPubWait + */ + public Duration getPubTimeout() { return pubTimeout; } + + /** + * Sets the pub timeout duration. + * + * @param pubTimeout the pubWait to set. pass null for forever. + */ + public void setPubTimeout(Duration pubTimeout) { + this.pubTimeout = pubTimeout; + } + + /** + * Sets the pub timeout in the specified time unit. + * + * @param pubTimeout the pubWait to set + * @param unit the time unit to set + */ + public void setPubTimeout(long pubTimeout, TimeUnit unit) { + this.pubTimeout = Duration.ofMillis(unit.toMillis(pubTimeout)); } /** diff --git a/src/main/java/io/nats/streaming/StreamingConnectionImpl.java b/src/main/java/io/nats/streaming/StreamingConnectionImpl.java index 1e9a575..56416e0 100644 --- a/src/main/java/io/nats/streaming/StreamingConnectionImpl.java +++ b/src/main/java/io/nats/streaming/StreamingConnectionImpl.java @@ -215,6 +215,11 @@ public void close() throws IOException, InterruptedException { } } + // we won't get the acks back now we've unsubscribed, so clear the acks pending channel + // this also unblocks any threads that are trying to publish and are blocked on this channel + // they will fail to publish (due to the nats connection having been torn down) and error out + pubAckChan.clear(); + if (getHbSubscription() != null) { try { getHbSubscription().unsubscribe(); @@ -283,7 +288,7 @@ BlockingQueue createErrorChannel() { // Publish will publish to the cluster and wait for an ACK. @Override - public void publish(String subject, byte[] data) throws IOException, InterruptedException { + public void publish(String subject, byte[] data) throws IOException, InterruptedException, TimeoutException { final BlockingQueue ch = createErrorChannel(); publish(subject, data, null, ch); String err; @@ -303,25 +308,28 @@ public void publish(String subject, byte[] data) throws IOException, Interrupted */ @Override public String publish(String subject, byte[] data, AckHandler ah) throws IOException, - InterruptedException { + InterruptedException, TimeoutException { return publish(subject, data, ah, null); } private String publish(String subject, byte[] data, AckHandler ah, BlockingQueue ch) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { String subj; String ackSubject; Duration ackTimeout; + Duration pubTimeout; BlockingQueue pac; final AckClosure a; final PubMsg pe; String guid; byte[] bytes; + io.nats.client.Connection nc; a = createAckClosure(ah, ch); this.lock(); try { - if (getNatsConnection() == null) { + nc = getNatsConnection(); + if (nc == null) { throw new IllegalStateException(NatsStreaming.ERR_CONNECTION_CLOSED); } @@ -340,6 +348,7 @@ private String publish(String subject, byte[] data, AckHandler ah, BlockingQueue // snapshot ackSubject = this.ackSubject; ackTimeout = opts.getAckTimeout(); + pubTimeout = opts.getPubTimeout(); pac = pubAckChan; } finally { this.unlock(); @@ -347,7 +356,22 @@ private String publish(String subject, byte[] data, AckHandler ah, BlockingQueue // Use the buffered channel to control the number of outstanding acks. try { - pac.put(PubAck.getDefaultInstance()); + if (pubTimeout != null) { + if (!pac.offer(PubAck.getDefaultInstance(),pubTimeout.toMillis(),TimeUnit.MILLISECONDS)) { + // could not publish, too many in flight, escalate back to caller + this.lock(); + try { + // although we didn't make an entry into the pub ack channel, we did make one into the map, + // clean that up before throwing + pubAckMap.remove(guid); + } finally { + this.unlock(); + } + throw new TimeoutException(NatsStreaming.ERR_PUB_TIMEOUT); + } + } else { + pac.put(PubAck.getDefaultInstance()); + } } catch (InterruptedException e) { // TODO: Reevaluate this. // Eat this because you can't really do anything with it @@ -355,7 +379,7 @@ private String publish(String subject, byte[] data, AckHandler ah, BlockingQueue try { nc.publish(subj, ackSubject, bytes, true); - } catch (IOException e) { + } catch (Exception e) { removeAck(guid); throw (e); } diff --git a/src/main/java/io/nats/streaming/SubscriptionImpl.java b/src/main/java/io/nats/streaming/SubscriptionImpl.java index 5945485..2a06fdb 100644 --- a/src/main/java/io/nats/streaming/SubscriptionImpl.java +++ b/src/main/java/io/nats/streaming/SubscriptionImpl.java @@ -20,6 +20,7 @@ class SubscriptionImpl implements Subscription { + static final long DEFAULT_PUB_WAIT = -1; // -1 means block forever until can publish static final long DEFAULT_ACK_WAIT = 30 * 1000; static final int DEFAULT_MAX_IN_FLIGHT = 1024; diff --git a/src/test/java/io/nats/streaming/ITConnectionTest.java b/src/test/java/io/nats/streaming/ITConnectionTest.java index fb3a897..c37352e 100644 --- a/src/test/java/io/nats/streaming/ITConnectionTest.java +++ b/src/test/java/io/nats/streaming/ITConnectionTest.java @@ -1827,6 +1827,9 @@ public void testNoDuplicatesOnSubscriberStart() throws Exception { pubLatch.countDown(); } catch (IOException e) { e.printStackTrace(); + } catch (TimeoutException e) { + System.err.println("publish timed out"); + Thread.currentThread().interrupt(); } catch (InterruptedException e) { System.err.println("publish interrupted"); Thread.currentThread().interrupt();