Skip to content

Commit c82d513

Browse files
committed
Add demos
1 parent 82de124 commit c82d513

File tree

12 files changed

+489
-502
lines changed

12 files changed

+489
-502
lines changed

pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@
8080
<dependency>
8181
<groupId>io.vertx</groupId>
8282
<artifactId>vertx-web-client</artifactId>
83-
<scope>test</scope>
83+
<!-- TODO: Change back to test scope once SSE demo can work without this or move SSE demo -->
84+
<!--<scope>test</scope>-->
8485
</dependency>
8586
<dependency>
8687
<groupId>junit</groupId>
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package examples;
2+
3+
import io.vertx.core.AbstractVerticle;
4+
import io.vertx.core.Handler;
5+
import io.vertx.core.Promise;
6+
import io.vertx.core.Vertx;
7+
import io.vertx.core.eventbus.Message;
8+
import io.vertx.core.http.HttpHeaders;
9+
import io.vertx.core.http.HttpServerRequest;
10+
import io.vertx.core.http.HttpServerResponse;
11+
import io.vertx.core.http.WebSocketBase;
12+
import io.vertx.core.json.JsonObject;
13+
import io.vertx.ext.bridge.PermittedOptions;
14+
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
15+
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge;
16+
17+
public class HttpBridgeExample extends AbstractVerticle {
18+
19+
public static void main(String[] args) {
20+
Vertx.vertx().deployVerticle(new HttpBridgeExample());
21+
}
22+
23+
@Override
24+
public void start(Promise<Void> start) {
25+
vertx.eventBus().consumer("hello", (Message<JsonObject> msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
26+
vertx.eventBus().consumer("echo", (Message<JsonObject> msg) -> msg.reply(msg.body()));
27+
vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));
28+
29+
JsonRPCBridgeOptions options = new JsonRPCBridgeOptions()
30+
.addInboundPermitted(new PermittedOptions().setAddress("hello"))
31+
.addInboundPermitted(new PermittedOptions().setAddress("echo"))
32+
.addInboundPermitted(new PermittedOptions().setAddress("ping"))
33+
.addOutboundPermitted(new PermittedOptions().setAddress("echo"))
34+
.addOutboundPermitted(new PermittedOptions().setAddress("hello"))
35+
.addOutboundPermitted(new PermittedOptions().setAddress("ping"));
36+
37+
Handler<HttpServerRequest> bridge = JsonRPCStreamEventBusBridge.httpSocketHandler(vertx, options, null);
38+
39+
vertx
40+
.createHttpServer()
41+
.requestHandler(req -> {
42+
if ("/".equals(req.path())) {
43+
req.response()
44+
.putHeader(HttpHeaders.CONTENT_TYPE, "text/html")
45+
.sendFile("http.html");
46+
} else if ("/jsonrpc".equals(req.path())){
47+
bridge.handle(req);
48+
} else {
49+
req.response().setStatusCode(404).end("Not Found");
50+
}
51+
})
52+
.listen(8080)
53+
.onFailure(start::fail)
54+
.onSuccess(server -> {
55+
System.out.println("Server listening at http://localhost:8080");
56+
start.complete();
57+
});
58+
59+
}
60+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package examples;
2+
3+
import io.vertx.core.AbstractVerticle;
4+
import io.vertx.core.Handler;
5+
import io.vertx.core.Promise;
6+
import io.vertx.core.Vertx;
7+
import io.vertx.core.eventbus.Message;
8+
import io.vertx.core.http.HttpHeaders;
9+
import io.vertx.core.http.HttpServerRequest;
10+
import io.vertx.core.http.HttpServerResponse;
11+
import io.vertx.core.json.JsonObject;
12+
import io.vertx.ext.bridge.PermittedOptions;
13+
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
14+
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge;
15+
import io.vertx.ext.web.client.WebClient;
16+
import io.vertx.ext.web.codec.BodyCodec;
17+
18+
import static io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper.request;
19+
20+
public class HttpSSEBridgeExample extends AbstractVerticle {
21+
22+
public static void main(String[] args) {
23+
Vertx.vertx().deployVerticle(new HttpSSEBridgeExample());
24+
}
25+
26+
@Override
27+
public void start(Promise<Void> start) {
28+
// just to have some messages flowing around
29+
vertx.eventBus().consumer("hello", (Message<JsonObject> msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
30+
vertx.eventBus().consumer("echo", (Message<JsonObject> msg) -> msg.reply(msg.body()));
31+
vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));
32+
33+
// once we fix the interface we can avoid the casts
34+
Handler<HttpServerRequest> bridge = JsonRPCStreamEventBusBridge.httpSocketHandler(
35+
vertx,
36+
new JsonRPCBridgeOptions()
37+
.addInboundPermitted(new PermittedOptions().setAddress("hello"))
38+
.addInboundPermitted(new PermittedOptions().setAddress("echo"))
39+
.addInboundPermitted(new PermittedOptions().setAddress("test"))
40+
.addOutboundPermitted(new PermittedOptions().setAddress("echo"))
41+
.addOutboundPermitted(new PermittedOptions().setAddress("test"))
42+
.addOutboundPermitted(new PermittedOptions().setAddress("ping")),
43+
null
44+
);
45+
46+
WebClient client = WebClient.create(vertx);
47+
48+
vertx
49+
.createHttpServer()
50+
.requestHandler(req -> {
51+
// this is where any http request will land
52+
// serve the base HTML application
53+
if ("/".equals(req.path())) {
54+
req.response()
55+
.putHeader(HttpHeaders.CONTENT_TYPE, "text/html")
56+
.sendFile("sse.html");
57+
} else if ("/jsonrpc".equals(req.path())){
58+
bridge.handle(req);
59+
} else if ("/jsonrpc-sse".equals(req.path())) {
60+
HttpServerResponse resp = req.response().setChunked(true).putHeader("Content-Type", "text/event-stream");
61+
request(
62+
"register",
63+
(int) (Math.random() * 100_000),
64+
new JsonObject().put("address", "ping"),
65+
buffer -> client
66+
.post(8080, "localhost", "/jsonrpc")
67+
.as(BodyCodec.pipe(resp))
68+
.sendBuffer(buffer)
69+
);
70+
} else {
71+
req.response().setStatusCode(404).end("Not Found");
72+
}
73+
})
74+
.listen(8080)
75+
.onFailure(start::fail)
76+
.onSuccess(server -> {
77+
System.out.println("Server listening at http://localhost:8080");
78+
start.complete();
79+
});
80+
}
81+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package examples;
2+
3+
import io.vertx.core.AbstractVerticle;
4+
import io.vertx.core.Handler;
5+
import io.vertx.core.Promise;
6+
import io.vertx.core.Vertx;
7+
import io.vertx.core.eventbus.Message;
8+
import io.vertx.core.http.HttpHeaders;
9+
import io.vertx.core.http.WebSocketBase;
10+
import io.vertx.core.json.JsonObject;
11+
import io.vertx.ext.bridge.PermittedOptions;
12+
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
13+
import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge;
14+
15+
public class WebsocketBridgeExample extends AbstractVerticle {
16+
17+
public static void main(String[] args) {
18+
Vertx.vertx().deployVerticle(new WebsocketBridgeExample());
19+
}
20+
21+
@Override
22+
public void start(Promise<Void> start) {
23+
vertx.eventBus().consumer("hello", (Message<JsonObject> msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
24+
vertx.eventBus().consumer("echo", (Message<JsonObject> msg) -> msg.reply(msg.body()));
25+
vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));
26+
27+
JsonRPCBridgeOptions options = new JsonRPCBridgeOptions()
28+
.addInboundPermitted(new PermittedOptions().setAddress("hello"))
29+
.addInboundPermitted(new PermittedOptions().setAddress("echo"))
30+
.addInboundPermitted(new PermittedOptions().setAddress("ping"))
31+
.addOutboundPermitted(new PermittedOptions().setAddress("echo"))
32+
.addOutboundPermitted(new PermittedOptions().setAddress("hello"))
33+
.addOutboundPermitted(new PermittedOptions().setAddress("ping"))
34+
// if set to false, then websockets messages are received on frontend as binary frames
35+
.setWebsocketsTextAsFrame(true);
36+
37+
Handler<WebSocketBase> bridge = JsonRPCStreamEventBusBridge.webSocketHandler(vertx, options, null);
38+
39+
vertx
40+
.createHttpServer()
41+
.requestHandler(req -> {
42+
// this is where any http request will land
43+
if ("/jsonrpc".equals(req.path())) {
44+
// we switch from HTTP to WebSocket
45+
req.toWebSocket()
46+
.onFailure(err -> {
47+
err.printStackTrace();
48+
req.response().setStatusCode(500).end(err.getMessage());
49+
})
50+
.onSuccess(bridge::handle);
51+
} else {
52+
// serve the base HTML application
53+
if ("/".equals(req.path())) {
54+
req.response()
55+
.putHeader(HttpHeaders.CONTENT_TYPE, "text/html")
56+
.sendFile("ws.html");
57+
} else {
58+
// 404 all the rest
59+
req.response().setStatusCode(404).end("Not Found");
60+
}
61+
}
62+
})
63+
.listen(8080)
64+
.onFailure(start::fail)
65+
.onSuccess(server -> {
66+
System.out.println("Server listening at http://localhost:8080");
67+
start.complete();
68+
});
69+
}
70+
}

src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCStreamEventBusBridge.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,14 @@ static Handler<NetSocket> netSocketHandler(Vertx vertx, JsonRPCBridgeOptions opt
5858
}
5959

6060
static Handler<WebSocketBase> webSocketHandler(Vertx vertx) {
61-
return webSocketHandler(vertx, null, null, false);
61+
return webSocketHandler(vertx, null, null);
6262
}
6363

6464
static Handler<WebSocketBase> webSocketHandler(Vertx vertx, JsonRPCBridgeOptions options) {
65-
return webSocketHandler(vertx, options, null, false);
65+
return webSocketHandler(vertx, options, null);
6666
}
6767
static Handler<WebSocketBase> webSocketHandler(Vertx vertx, JsonRPCBridgeOptions options, Handler<BridgeEvent<WebSocketBase>> eventHandler) {
68-
return new WebsocketJsonRPCStreamEventBusBridgeImpl(vertx, options, eventHandler, false);
69-
}
70-
71-
static Handler<WebSocketBase> webSocketHandler(Vertx vertx, JsonRPCBridgeOptions options, Handler<BridgeEvent<WebSocketBase>> eventHandler, boolean useText) {
72-
return new WebsocketJsonRPCStreamEventBusBridgeImpl(vertx, options, eventHandler, useText);
68+
return new WebsocketJsonRPCStreamEventBusBridgeImpl(vertx, options, eventHandler);
7369
}
7470

7571
static Handler<HttpServerRequest> httpSocketHandler(Vertx vertx) {

src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/HttpJsonRPCStreamEventBusBridgeImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ public void handle(HttpServerRequest socket) {
6565
Consumer<Buffer> writer;
6666
if (method.equals("register")) {
6767
response.setChunked(true);
68-
writer = response::write;
68+
writer = body -> {
69+
JsonObject object = body.toJsonObject();
70+
response.write("event: " + object.getJsonObject("result").getString("address") + "\n");
71+
response.write("data: " + object.encode() + "\n\n");
72+
};
6973
} else {
7074
writer = response::end;
7175
}

src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/WebsocketJsonRPCStreamEventBusBridgeImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@
1717
import java.util.function.Consumer;
1818

1919
public class WebsocketJsonRPCStreamEventBusBridgeImpl extends JsonRPCStreamEventBusBridgeImpl<WebSocketBase> {
20-
boolean useText;
2120

22-
public WebsocketJsonRPCStreamEventBusBridgeImpl(Vertx vertx, JsonRPCBridgeOptions options, Handler<BridgeEvent<WebSocketBase>> bridgeEventHandler, boolean useText) {
21+
public WebsocketJsonRPCStreamEventBusBridgeImpl(Vertx vertx, JsonRPCBridgeOptions options, Handler<BridgeEvent<WebSocketBase>> bridgeEventHandler) {
2322
super(vertx, options, bridgeEventHandler);
24-
this.useText = useText;
2523
}
2624

2725
@Override
@@ -35,7 +33,7 @@ public void handle(WebSocketBase socket) {
3533
final Map<String, Message<JsonObject>> replies = new ConcurrentHashMap<>();
3634

3735
Consumer<Buffer> consumer;
38-
if (useText) {
36+
if (options.getWebsocketsTextAsFrame()) {
3937
consumer = buffer -> socket.writeTextMessage(buffer.toString());
4038
} else {
4139
consumer = socket::writeBinaryMessage;

0 commit comments

Comments
 (0)