Skip to content

Commit 353e99a

Browse files
committed
Rework ZeroMqMessageProducerTests for XPUB
* Use an `XPUB` socket to receive subscriptions before publishing. This makes the test more robust and less blocked for that `Thread.sleep()`
1 parent 7072cf1 commit 353e99a

File tree

1 file changed

+5
-6
lines changed

1 file changed

+5
-6
lines changed

spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,11 @@ void testMessageProducerForPair() {
9494
}
9595

9696
@Test
97-
void testMessageProducerForPubSubReceiveRaw() throws InterruptedException {
97+
void testMessageProducerForPubSubReceiveRaw() {
9898
String socketAddress = "inproc://messageProducer.test";
99-
ZMQ.Socket socket = CONTEXT.createSocket(SocketType.PUB);
99+
ZMQ.Socket socket = CONTEXT.createSocket(SocketType.XPUB);
100100
socket.bind(socketAddress);
101+
socket.setReceiveTimeOut(10_000);
101102

102103
FluxMessageChannel outputChannel = new FluxMessageChannel();
103104

@@ -126,17 +127,15 @@ void testMessageProducerForPubSubReceiveRaw() throws InterruptedException {
126127
messageProducer.afterPropertiesSet();
127128
messageProducer.start();
128129

129-
// Give it some time to connect and subscribe
130-
Thread.sleep(2000);
130+
assertThat(socket.recv()).isNotNull();
131131

132132
ZMsg msg = ZMsg.newStringMsg("test");
133133
msg.wrap(new ZFrame("testTopic"));
134134
msg.send(socket);
135135

136136
messageProducer.subscribeToTopics("other");
137137

138-
// Give it some time to connect and subscribe
139-
Thread.sleep(2000);
138+
assertThat(socket.recv()).isNotNull();
140139

141140
msg = ZMsg.newStringMsg("test");
142141
msg.wrap(new ZFrame("otherTopic"));

0 commit comments

Comments
 (0)