Skip to content
This repository was archived by the owner on Dec 31, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/java/io/nats/streaming/NatsStreaming.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public final class NatsStreaming {
static final String ERR_SUB_REQ_TIMEOUT = PFX + "subscribe request timeout";
static final String ERR_UNSUB_REQ_TIMEOUT = PFX + "unsubscribe request timeout";
static final String ERR_CONNECTION_CLOSED = PFX + "connection closed";
static final String ERR_PUB_TIMEOUT = PFX + "publish timeout";
static final String ERR_TIMEOUT = PFX + "publish ack timeout";
static final String ERR_BAD_ACK = PFX + "malformed ack";
static final String ERR_BAD_SUBSCRIPTION = PFX + "invalid subscription";
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/io/nats/streaming/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ public class Options {
private final Duration ackTimeout;
private final String discoverPrefix;
private final int maxPubAcksInFlight;
private final Duration pubTimeout; // can be null, which means wait forever

private Options(Builder builder) {
this.natsUrl = builder.natsUrl;
this.natsConn = builder.natsConn;
this.connectTimeout = builder.connectTimeout;
this.ackTimeout = builder.ackTimeout;
this.pubTimeout = builder.pubTimeout;
this.discoverPrefix = builder.discoverPrefix;
this.maxPubAcksInFlight = builder.maxPubAcksInFlight;
}
Expand All @@ -38,6 +40,11 @@ Duration getConnectTimeout() {
return connectTimeout;
}

Duration getPubTimeout() {
return pubTimeout;
}

// can be null, which means wait forever
Duration getAckTimeout() {
return ackTimeout;
}
Expand All @@ -57,6 +64,7 @@ public static final class Builder implements Serializable {
private transient io.nats.client.Connection natsConn; // A Connection is not Serializable
private Duration connectTimeout = Duration.ofSeconds(NatsStreaming.DEFAULT_CONNECT_WAIT);
private Duration ackTimeout = Duration.ofMillis(SubscriptionImpl.DEFAULT_ACK_WAIT);
private Duration pubTimeout = SubscriptionImpl.DEFAULT_PUB_WAIT > 0 ? Duration.ofMillis(SubscriptionImpl.DEFAULT_PUB_WAIT) : null;
private String discoverPrefix = NatsStreaming.DEFAULT_DISCOVER_PREFIX;
private int maxPubAcksInFlight = NatsStreaming.DEFAULT_MAX_PUB_ACKS_IN_FLIGHT;

Expand All @@ -72,10 +80,18 @@ public Builder(Options template) {
this.natsConn = template.natsConn;
this.connectTimeout = template.connectTimeout;
this.ackTimeout = template.ackTimeout;
this.pubTimeout = template.pubTimeout;
this.discoverPrefix = template.discoverPrefix;
this.maxPubAcksInFlight = template.maxPubAcksInFlight;
}

// set specific publish timeout (time to block waiting for inflight slot)
// set to null to wait forever (default)
public Builder pubWait(Duration pubTimeout) {
this.pubTimeout = pubTimeout;
return this;
}

public Builder pubAckWait(Duration ackTimeout) {
this.ackTimeout = ackTimeout;
return this;
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/nats/streaming/StreamingConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ public interface StreamingConnection extends AutoCloseable {
* @throws IOException if the publish operation is not successful
* @throws InterruptedException if the calling thread is interrupted before the call completes
* @throws IllegalStateException if the connection is closed
* @throws TimeoutException if the publish could not be completed after blocking for opts.getPubTimeout()
*/
void publish(String subject, byte[] data) throws IOException, InterruptedException;
void publish(String subject, byte[] data) throws IOException, InterruptedException, TimeoutException;

/**
* Publishes the payload specified by {@code data} to the subject specified by {@code subject}
Expand All @@ -39,10 +40,11 @@ public interface StreamingConnection extends AutoCloseable {
* @throws IOException if an I/O exception is encountered
* @throws InterruptedException if the calling thread is interrupted before the call completes
* @throws IllegalStateException if the connection is closed
* @throws TimeoutException if the publish could not be completed after blocking for opts.getPubTimeout()
* @see AckHandler
*/
String publish(String subject, byte[] data, AckHandler ah)
throws IOException, InterruptedException;
throws IOException, InterruptedException, TimeoutException;

/**
* Creates a {@link Subscription} with interest in a given subject, assigns the callback, and
Expand Down
29 changes: 28 additions & 1 deletion src/main/java/io/nats/streaming/StreamingConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* options. A client uses it to create a connection to the STAN streaming data system.
*/
public class StreamingConnectionFactory {
private Duration pubTimeout = null; // null pub timeout means forever
private Duration ackTimeout = Duration.ofMillis(SubscriptionImpl.DEFAULT_ACK_WAIT);
private Duration connectTimeout = Duration.ofSeconds(NatsStreaming
.DEFAULT_CONNECT_WAIT);
Expand Down Expand Up @@ -49,7 +50,33 @@ public StreamingConnection createConnection() throws IOException, InterruptedExc
Options options() {
return new Options.Builder().connectWait(connectTimeout).pubAckWait(ackTimeout)
.discoverPrefix(discoverPrefix).maxPubAcksInFlight(maxPubAcksInFlight)
.natsConn(natsConn).natsUrl(natsUrl).build();
.natsConn(natsConn).natsUrl(natsUrl).pubWait(pubTimeout).build();
}

/**
* Returns the pub timeout.
*
* @return the pubPubWait
*/
public Duration getPubTimeout() { return pubTimeout; }

/**
* Sets the pub timeout duration.
*
* @param pubTimeout the pubWait to set. pass null for forever.
*/
public void setPubTimeout(Duration pubTimeout) {
this.pubTimeout = pubTimeout;
}

/**
* Sets the pub timeout in the specified time unit.
*
* @param pubTimeout the pubWait to set
* @param unit the time unit to set
*/
public void setPubTimeout(long pubTimeout, TimeUnit unit) {
this.pubTimeout = Duration.ofMillis(unit.toMillis(pubTimeout));
}

/**
Expand Down
36 changes: 30 additions & 6 deletions src/main/java/io/nats/streaming/StreamingConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ public void close() throws IOException, InterruptedException {
}
}

// we won't get the acks back now we've unsubscribed, so clear the acks pending channel
// this also unblocks any threads that are trying to publish and are blocked on this channel
// they will fail to publish (due to the nats connection having been torn down) and error out
pubAckChan.clear();

if (getHbSubscription() != null) {
try {
getHbSubscription().unsubscribe();
Expand Down Expand Up @@ -283,7 +288,7 @@ BlockingQueue<String> createErrorChannel() {

// Publish will publish to the cluster and wait for an ACK.
@Override
public void publish(String subject, byte[] data) throws IOException, InterruptedException {
public void publish(String subject, byte[] data) throws IOException, InterruptedException, TimeoutException {
final BlockingQueue<String> ch = createErrorChannel();
publish(subject, data, null, ch);
String err;
Expand All @@ -303,25 +308,28 @@ public void publish(String subject, byte[] data) throws IOException, Interrupted
*/
@Override
public String publish(String subject, byte[] data, AckHandler ah) throws IOException,
InterruptedException {
InterruptedException, TimeoutException {
return publish(subject, data, ah, null);
}

private String publish(String subject, byte[] data, AckHandler ah, BlockingQueue<String> ch)
throws IOException, InterruptedException {
throws IOException, InterruptedException, TimeoutException {
String subj;
String ackSubject;
Duration ackTimeout;
Duration pubTimeout;
BlockingQueue<PubAck> pac;
final AckClosure a;
final PubMsg pe;
String guid;
byte[] bytes;
io.nats.client.Connection nc;

a = createAckClosure(ah, ch);
this.lock();
try {
if (getNatsConnection() == null) {
nc = getNatsConnection();
if (nc == null) {
throw new IllegalStateException(NatsStreaming.ERR_CONNECTION_CLOSED);
}

Expand All @@ -340,22 +348,38 @@ private String publish(String subject, byte[] data, AckHandler ah, BlockingQueue
// snapshot
ackSubject = this.ackSubject;
ackTimeout = opts.getAckTimeout();
pubTimeout = opts.getPubTimeout();
pac = pubAckChan;
} finally {
this.unlock();
}

// Use the buffered channel to control the number of outstanding acks.
try {
pac.put(PubAck.getDefaultInstance());
if (pubTimeout != null) {
if (!pac.offer(PubAck.getDefaultInstance(),pubTimeout.toMillis(),TimeUnit.MILLISECONDS)) {
// could not publish, too many in flight, escalate back to caller
this.lock();
try {
// although we didn't make an entry into the pub ack channel, we did make one into the map,
// clean that up before throwing
pubAckMap.remove(guid);
} finally {
this.unlock();
}
throw new TimeoutException(NatsStreaming.ERR_PUB_TIMEOUT);
}
} else {
pac.put(PubAck.getDefaultInstance());
}
} catch (InterruptedException e) {
// TODO: Reevaluate this.
// Eat this because you can't really do anything with it
}

try {
nc.publish(subj, ackSubject, bytes, true);
} catch (IOException e) {
} catch (Exception e) {
removeAck(guid);
throw (e);
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/streaming/SubscriptionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

class SubscriptionImpl implements Subscription {

static final long DEFAULT_PUB_WAIT = -1; // -1 means block forever until can publish
static final long DEFAULT_ACK_WAIT = 30 * 1000;
static final int DEFAULT_MAX_IN_FLIGHT = 1024;

Expand Down
3 changes: 3 additions & 0 deletions src/test/java/io/nats/streaming/ITConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1827,6 +1827,9 @@ public void testNoDuplicatesOnSubscriberStart() throws Exception {
pubLatch.countDown();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
System.err.println("publish timed out");
Thread.currentThread().interrupt();
} catch (InterruptedException e) {
System.err.println("publish interrupted");
Thread.currentThread().interrupt();
Expand Down