Skip to content

Commit 00d47b8

Browse files
committed
Simplify handleSSE
1 parent 22c4663 commit 00d47b8

File tree

4 files changed

+50
-77
lines changed

4 files changed

+50
-77
lines changed

pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@
8080
<dependency>
8181
<groupId>io.vertx</groupId>
8282
<artifactId>vertx-web-client</artifactId>
83-
<!-- TODO: Change back to test scope once SSE demo can work without this or move SSE demo -->
84-
<!--<scope>test</scope>-->
83+
<scope>test</scope>
8584
</dependency>
8685
<dependency>
8786
<groupId>junit</groupId>

src/main/java/examples/HttpSSEBridgeExample.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55
import io.vertx.core.Vertx;
66
import io.vertx.core.eventbus.Message;
77
import io.vertx.core.http.HttpHeaders;
8+
import io.vertx.core.http.HttpServerResponse;
89
import io.vertx.core.json.JsonObject;
910
import io.vertx.ext.bridge.PermittedOptions;
1011
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
1112
import io.vertx.ext.eventbus.bridge.tcp.impl.HttpJsonRPCStreamEventBusBridgeImpl;
1213

14+
import java.util.function.Consumer;
15+
1316
public class HttpSSEBridgeExample extends AbstractVerticle {
1417

1518
public static void main(String[] args) {
@@ -33,7 +36,7 @@ public void start(Promise<Void> start) {
3336
.addOutboundPermitted(new PermittedOptions().setAddress("echo"))
3437
.addOutboundPermitted(new PermittedOptions().setAddress("test"))
3538
.addOutboundPermitted(new PermittedOptions().setAddress("ping")),
36-
null
39+
event -> event.complete(true)
3740
);
3841

3942
vertx
@@ -48,12 +51,7 @@ public void start(Promise<Void> start) {
4851
} else if ("/jsonrpc".equals(req.path())) {
4952
bridge.handle(req);
5053
} else if ("/jsonrpc-sse".equals(req.path())) {
51-
JsonObject message = new JsonObject()
52-
.put("jsonrpc", "2.0")
53-
.put("method", "register")
54-
.put("id", (int) (Math.random() * 100_000))
55-
.put("params", new JsonObject().put("address", "ping"));
56-
bridge.handleSSE(req.response(), message);
54+
bridge.handleSSE(req, (int) (Math.random() * 100_000), new JsonObject().put("address", "ping"));
5755
} else {
5856
req.response().setStatusCode(404).end("Not Found");
5957
}

src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/HttpJsonRPCStreamEventBusBridgeImpl.java

Lines changed: 39 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@
22

33
import io.vertx.core.Handler;
44
import io.vertx.core.Vertx;
5-
import io.vertx.core.buffer.Buffer;
65
import io.vertx.core.eventbus.Message;
76
import io.vertx.core.eventbus.MessageConsumer;
87
import io.vertx.core.http.HttpHeaders;
98
import io.vertx.core.http.HttpServerRequest;
109
import io.vertx.core.http.HttpServerResponse;
11-
import io.vertx.core.json.Json;
1210
import io.vertx.core.json.JsonObject;
1311
import io.vertx.ext.bridge.BridgeEventType;
1412
import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent;
@@ -35,9 +33,6 @@ public void handle(HttpServerRequest socket) {
3533
() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CREATED, null, socket),
3634
// on success
3735
() -> {
38-
// TODO: make these maps persistent across requests otherwise replies won't work because
39-
// http client cannot reply again in the same request after receiving a response and has
40-
// to make a new request.
4136
final Map<String, MessageConsumer<?>> registry = new ConcurrentHashMap<>();
4237

4338
socket.exceptionHandler(t -> {
@@ -68,10 +63,7 @@ public void handle(HttpServerRequest socket) {
6863
Consumer<JsonObject> writer;
6964
if (method.equals("register")) {
7065
response.setChunked(true);
71-
writer = payload -> {
72-
response.write("event: " + payload.getJsonObject("result").getString("address") + "\n");
73-
response.write("data: " + payload.encode() + "\n\n");
74-
};
66+
writer = payload -> response.write(payload.encode());
7567
} else {
7668
writer = payload -> response.end(payload.encode());
7769
}
@@ -82,42 +74,44 @@ public void handle(HttpServerRequest socket) {
8274
() -> socket.response().setStatusCode(500).setStatusMessage("Internal Server Error").end());
8375
}
8476

85-
// TODO: discuss implications of accepting response here. bridge events may not be emitted.
86-
// but if accepting request cannot use handler as the request is usually empty and handler is
87-
// not invoked until data has been read. also same thing for other cases
88-
public void handleSSE(HttpServerResponse socket, JsonObject msg) {
89-
final Map<String, MessageConsumer<?>> registry = new ConcurrentHashMap<>();
90-
91-
socket.exceptionHandler(t -> {
92-
log.error(t.getMessage(), t);
93-
registry.values().forEach(MessageConsumer::unregister);
94-
registry.clear();
95-
});
96-
if (this.isInvalid(msg)) {
97-
return;
98-
}
99-
100-
HttpServerResponse response = socket
101-
.setChunked(true)
102-
.putHeader(HttpHeaders.CONTENT_TYPE, "text/event-stream")
103-
.endHandler(handler -> {
104-
registry.values().forEach(MessageConsumer::unregister);
105-
registry.clear();
106-
});
107-
108-
final String method = msg.getString("method");
109-
if (!method.equalsIgnoreCase("register")) {
110-
log.error("Invalid method for SSE!");
111-
return;
112-
}
113-
114-
final Object id = msg.getValue("id");
115-
Consumer<JsonObject> writer = payload -> {
116-
// TODO: Should we use id or address for event name?
117-
response.write("event: " + payload.getJsonObject("result").getString("address") + "\n");
118-
response.write("data: " + payload.encode() + "\n\n");
119-
};
120-
register(writer, id, msg, registry, replies);
77+
// TODO: Discuss. Currently we are only adding such methods because SSE doesn't have a body, maybe we could
78+
// instead mandate some query params in the request to signal SSE. but bodyHandler is not invoked
79+
// in that case so how to handle the request. endHandler or check query params first before applying
80+
// bodyHandler ?
81+
public void handleSSE(HttpServerRequest socket, Object id, JsonObject msg) {
82+
checkCallHook(
83+
// process the new socket according to the event handler
84+
() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CREATED, null, socket),
85+
() -> {
86+
final Map<String, MessageConsumer<?>> registry = new ConcurrentHashMap<>();
87+
88+
socket.exceptionHandler(t -> {
89+
log.error(t.getMessage(), t);
90+
registry.values().forEach(MessageConsumer::unregister);
91+
registry.clear();
92+
});
93+
94+
HttpServerResponse response = socket.response().setChunked(true).putHeader(HttpHeaders.CONTENT_TYPE,
95+
"text/event-stream").endHandler(handler -> {
96+
checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CLOSED, null, socket));
97+
registry.values().forEach(MessageConsumer::unregister);
98+
registry.clear();
99+
});
100+
101+
Consumer<JsonObject> writer = payload -> {
102+
JsonObject result = payload.getJsonObject("result");
103+
if (result != null) {
104+
String address = result.getString("address");
105+
if (address != null) {
106+
response.write("event: " + address + "\n");
107+
response.write("data: " + payload.encode() + "\n\n");
108+
}
109+
}
110+
};
111+
register(writer, id, msg, registry, replies);
112+
},
113+
() -> socket.response().setStatusCode(500).setStatusMessage("Internal Server Error").end()
114+
);
121115
}
122116

123117

src/test/java/io/vertx/ext/eventbus/bridge/tcp/InteropWebSocketServer.java

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,13 @@
11
package io.vertx.ext.eventbus.bridge.tcp;
22

33
import io.vertx.core.AbstractVerticle;
4-
import io.vertx.core.Handler;
54
import io.vertx.core.Promise;
65
import io.vertx.core.Vertx;
76
import io.vertx.core.eventbus.Message;
87
import io.vertx.core.http.HttpHeaders;
9-
import io.vertx.core.http.HttpServerRequest;
10-
import io.vertx.core.http.HttpServerResponse;
11-
import io.vertx.core.http.HttpServerResponse;
128
import io.vertx.core.json.JsonObject;
139
import io.vertx.ext.bridge.PermittedOptions;
14-
import io.vertx.ext.web.client.WebClient;
15-
import io.vertx.ext.web.codec.BodyCodec;
16-
17-
import static io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper.request;
10+
import io.vertx.ext.eventbus.bridge.tcp.impl.HttpJsonRPCStreamEventBusBridgeImpl;
1811

1912
public class InteropWebSocketServer extends AbstractVerticle {
2013

@@ -31,8 +24,7 @@ public void start(Promise<Void> start) {
3124
vertx.eventBus().consumer("echo", (Message<JsonObject> msg) -> msg.reply(msg.body()));
3225
vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));
3326

34-
// once we fix the interface we can avoid the casts
35-
Handler<HttpServerRequest> bridge = JsonRPCStreamEventBusBridge.httpSocketHandler(
27+
HttpJsonRPCStreamEventBusBridgeImpl bridge = (HttpJsonRPCStreamEventBusBridgeImpl) JsonRPCStreamEventBusBridge.httpSocketHandler(
3628
vertx,
3729
new JsonRPCBridgeOptions()
3830
.addInboundPermitted(new PermittedOptions().setAddress("hello"))
@@ -41,11 +33,9 @@ public void start(Promise<Void> start) {
4133
.addOutboundPermitted(new PermittedOptions().setAddress("echo"))
4234
.addOutboundPermitted(new PermittedOptions().setAddress("test"))
4335
.addOutboundPermitted(new PermittedOptions().setAddress("ping")),
44-
null
36+
null
4537
);
4638

47-
WebClient client = WebClient.create(vertx);
48-
4939
vertx
5040
.createHttpServer()
5141
.requestHandler(req -> {
@@ -58,16 +48,8 @@ public void start(Promise<Void> start) {
5848
} else if ("/jsonrpc".equals(req.path())){
5949
bridge.handle(req);
6050
} else if ("/jsonrpc-sse".equals(req.path())) {
61-
HttpServerResponse resp = req.response().setChunked(true).putHeader("Content-Type", "text/event-stream");
62-
request(
63-
"register",
64-
(int) (Math.random() * 100_000),
65-
new JsonObject().put("address", "ping"),
66-
buffer -> client
67-
.post(8080, "localhost", "/jsonrpc")
68-
.as(BodyCodec.pipe(resp))
69-
.sendBuffer(buffer)
70-
);
51+
JsonObject params = new JsonObject().put("params", new JsonObject().put("address", "ping"));
52+
bridge.handleSSE(req, (int) (Math.random() * 100_000), params);
7153
} else {
7254
req.response().setStatusCode(404).end("Not Found");
7355
}

0 commit comments

Comments
 (0)