Skip to content

Commit af47fa7

Browse files
committed
fix demos
1 parent 1c75ffc commit af47fa7

File tree

7 files changed

+52
-43
lines changed

7 files changed

+52
-43
lines changed

src/main/java/examples/HttpSSEBridgeExample.java

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,14 @@
11
package examples;
22

33
import io.vertx.core.AbstractVerticle;
4-
import io.vertx.core.Handler;
54
import io.vertx.core.Promise;
65
import io.vertx.core.Vertx;
76
import io.vertx.core.eventbus.Message;
87
import io.vertx.core.http.HttpHeaders;
9-
import io.vertx.core.http.HttpServerRequest;
10-
import io.vertx.core.http.HttpServerResponse;
118
import io.vertx.core.json.JsonObject;
129
import io.vertx.ext.bridge.PermittedOptions;
1310
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
14-
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge;
15-
import io.vertx.ext.web.client.WebClient;
16-
import io.vertx.ext.web.codec.BodyCodec;
17-
18-
import static io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper.request;
11+
import io.vertx.ext.eventbus.bridge.tcp.impl.HttpJsonRPCStreamEventBusBridgeImpl;
1912

2013
public class HttpSSEBridgeExample extends AbstractVerticle {
2114

@@ -30,8 +23,8 @@ public void start(Promise<Void> start) {
3023
vertx.eventBus().consumer("echo", (Message<JsonObject> msg) -> msg.reply(msg.body()));
3124
vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));
3225

33-
// once we fix the interface we can avoid the casts
34-
Handler<HttpServerRequest> bridge = JsonRPCStreamEventBusBridge.httpSocketHandler(
26+
// use explicit class because SSE method is not on the interface currently
27+
HttpJsonRPCStreamEventBusBridgeImpl bridge = new HttpJsonRPCStreamEventBusBridgeImpl(
3528
vertx,
3629
new JsonRPCBridgeOptions()
3730
.addInboundPermitted(new PermittedOptions().setAddress("hello"))
@@ -43,8 +36,6 @@ public void start(Promise<Void> start) {
4336
null
4437
);
4538

46-
WebClient client = WebClient.create(vertx);
47-
4839
vertx
4940
.createHttpServer()
5041
.requestHandler(req -> {
@@ -54,19 +45,15 @@ public void start(Promise<Void> start) {
5445
req.response()
5546
.putHeader(HttpHeaders.CONTENT_TYPE, "text/html")
5647
.sendFile("sse.html");
57-
} else if ("/jsonrpc".equals(req.path())){
48+
} else if ("/jsonrpc".equals(req.path())) {
5849
bridge.handle(req);
59-
} else if ("/jsonrpc-sse".equals(req.path())) {
60-
HttpServerResponse resp = req.response().setChunked(true).putHeader("Content-Type", "text/event-stream");
61-
request(
62-
"register",
63-
(int) (Math.random() * 100_000),
64-
new JsonObject().put("address", "ping"),
65-
buffer -> client
66-
.post(8080, "localhost", "/jsonrpc")
67-
.as(BodyCodec.pipe(resp))
68-
.sendBuffer(buffer)
69-
);
50+
} else if ("/jsonrpc-sse".equals(req.path())) {
51+
JsonObject message = new JsonObject()
52+
.put("jsonrpc", "2.0")
53+
.put("method", "register")
54+
.put("id", (int) (Math.random() * 100_000))
55+
.put("params", new JsonObject().put("address", "ping"));
56+
bridge.handleSSE(req.response(), message);
7057
} else {
7158
req.response().setStatusCode(404).end("Not Found");
7259
}

src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCStreamEventBusBridge.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,6 @@
3232
*
3333
* @author <a href="mailto:plopes@redhat.com">Paulo Lopes</a>
3434
*/
35-
36-
// TODO: "extends Handler<NetSocket>" was a bad idea because it locks the implementation to TCP sockets. Instead we
37-
// should have explicit methods that either handle a NetSocket or a WebSocketBase:
38-
// handle(NetSocket socket) handle(WebSocketBase socket)
39-
// or: return a handler, e.g.:
40-
// Handler<WebSocketBase> webSocketHandler();
41-
// Handler<NetSocket> netSocketHandler();
42-
43-
// How about generifying this interface as in JsonRPCStreamEventBusBridge<T> extends Handler<T> ?
44-
// similarly create a base class for the impl and concrete impls for each protocol.
4535
@VertxGen
4636
public interface JsonRPCStreamEventBusBridge {
4737

src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/HttpJsonRPCStreamEventBusBridgeImpl.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ public void handle(HttpServerRequest socket) {
4646

4747
// TODO: body may be an array (batching)
4848
final JsonObject msg = new JsonObject(buffer);
49-
System.out.println(msg);
5049

5150
if (this.isInvalid(msg)) {
5251
return;
@@ -79,4 +78,44 @@ public void handle(HttpServerRequest socket) {
7978
// on failure
8079
() -> socket.response().setStatusCode(500).setStatusMessage("Internal Server Error").end());
8180
}
81+
82+
// TODO: discuss implications of accepting response here. bridge events may not be emitted.
83+
// but if accepting request cannot use handler as the request is usually empty and handler is
84+
// not invoked until data has been read. also same thing for other cases
85+
public void handleSSE(HttpServerResponse socket, JsonObject msg) {
86+
final Map<String, MessageConsumer<?>> registry = new ConcurrentHashMap<>();
87+
88+
socket.exceptionHandler(t -> {
89+
log.error(t.getMessage(), t);
90+
registry.values().forEach(MessageConsumer::unregister);
91+
registry.clear();
92+
});
93+
if (this.isInvalid(msg)) {
94+
return;
95+
}
96+
97+
HttpServerResponse response = socket
98+
.setChunked(true)
99+
.putHeader(HttpHeaders.CONTENT_TYPE, "text/event-stream")
100+
.endHandler(handler -> {
101+
registry.values().forEach(MessageConsumer::unregister);
102+
registry.clear();
103+
});
104+
105+
final String method = msg.getString("method");
106+
if (!method.equalsIgnoreCase("register")) {
107+
log.error("Invalid method for SSE!");
108+
return;
109+
}
110+
111+
final Object id = msg.getValue("id");
112+
Consumer<JsonObject> writer = payload -> {
113+
// TODO: Should we use id or address for event name?
114+
response.write("event: " + payload.getJsonObject("result").getString("address") + "\n");
115+
response.write("data: " + payload.encode() + "\n\n");
116+
};
117+
register(writer, id, msg, registry, replies);
118+
}
119+
120+
82121
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
});
1313
}
1414
</script>
15-
<title>Websockets Bridge Example</title>
15+
<title>HTTP Bridge Example</title>
1616
</head>
1717
<body>
1818
<input type="text" id="payload" value='{"jsonrpc":"2.0","method":"send","id":"2","params":{"address":"echo","body":"Hello World!"}}'> <input type="button" onclick="sendMsg()" value="Send message"/>

src/test/java/io/vertx/ext/eventbus/bridge/tcp/InteropWebSocketServer.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,11 @@
88
import io.vertx.core.http.HttpHeaders;
99
import io.vertx.core.http.HttpServerRequest;
1010
import io.vertx.core.http.HttpServerResponse;
11-
import io.vertx.core.http.WebSocketBase;
1211
import io.vertx.core.json.JsonObject;
13-
import io.vertx.core.net.NetSocket;
14-
import io.vertx.ext.bridge.BridgeOptions;
1512
import io.vertx.ext.bridge.PermittedOptions;
16-
import io.vertx.ext.eventbus.bridge.tcp.impl.JsonRPCStreamEventBusBridgeImpl;
17-
import io.vertx.ext.eventbus.bridge.tcp.impl.TCPJsonRPCStreamEventBusBridgeImpl;
1813
import io.vertx.ext.web.client.WebClient;
1914
import io.vertx.ext.web.codec.BodyCodec;
2015

21-
import java.util.Random;
22-
2316
import static io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper.request;
2417

2518
public class InteropWebSocketServer extends AbstractVerticle {

0 commit comments

Comments
 (0)