diff --git a/.editorconfig b/.editorconfig
index fff7c5f..1cd5d67 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -7,3 +7,8 @@ indent_size = 2
trim_trailing_whitespace = true
end_of_line = lf
insert_final_newline = true
+
+[**/examples/**.java]
+# 84 looks like a odd number, however
+# it accounts for 4 spaces (class and example method indentation)
+max_line_length = 84
diff --git a/pom.xml b/pom.xml
index 8cdf9d3..c306ac7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,11 @@
io.vertxvertx-bridge-common
- ${project.version}
+
+
+
+ io.vertx
+ vertx-json-schema
@@ -73,6 +77,11 @@
test-jartest
+
+ io.vertx
+ vertx-web-client
+ test
+ junitjunit
diff --git a/src/client/nodejs/package.json b/src/client/nodejs/package.json
index 13a1a99..645ea79 100644
--- a/src/client/nodejs/package.json
+++ b/src/client/nodejs/package.json
@@ -11,4 +11,4 @@
"scripts": {
"test": "mocha ./test/index.js"
}
-}
\ No newline at end of file
+}
diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc
index 910eeb3..7a52ca9 100644
--- a/src/main/asciidoc/index.adoc
+++ b/src/main/asciidoc/index.adoc
@@ -1,7 +1,7 @@
= Vert.x-tcp-eventbus-bridge
:toc: left
-Vert.x-tcp-eventbus-bridge is a TCP bridge to Vert.x EventBus. To use this project, add the following
+Vert.x-tcp-eventbus-bridge is a bridge to Vert.x EventBus. To use this project, add the following
dependency to the _dependencies_ section of your build descriptor:
Maven (in your `pom.xml`):
@@ -26,14 +26,102 @@ The TCP EventBus bridge is built on top of TCP, meaning that any application tha
create TCP sockets can interact with a remote Vert.x instance via its event bus.
The main use case for the TCP bridge _versus_ the SockJS bridge is for applications that are more
-resource-constrained and that need to be lightweight since the whole HTTP WebSockets is replaced with plain TCP sockets.
+resource-constrained and that need to be lightweight since the whole SockJS is replaced with plain sockets.
It remains of course useful even for applications that don't have tight resource constraints:
the protocol is simple enough to efficiently provide an integration interface with non-JVM
applications.
-The protocol has been kept as simple as possible and communications use Frames both ways.
-The structure of a Frame looks like this:
+== JSON-RPC2
+
+The current bridge is designed around https://www.jsonrpc.org/specification[json-rpc2]. This allows developers to connect from any platform or programming language with support for this simple format.
+
+JSON-RPC is a stateless, light-weight remote procedure call (RPC) protocol. This means that concepts such as `client` or `server` are not defined. In fact an application communicating using JSON-RPC can act as:
+
+1. client
+2. server
+3. client-and-server
+
+=== JSON-RPC and TCP Sockets
+
+The first mode of operations for the bridge is to allow straight forward communication between two applicatitons capable of doing simple TCP socket connections.
+
+First start by creating a bridge in vert.x and allow communications to the `in` and `out` addresses:
+
+[source,$lang]
+----
+{@link examples.TCPBridgeExamples#example2}
+----
+
+You can test the application by writing to the socket directly from your terminal:
+
+[source,bash]
+----
+echo '{"jsonrpc":"2.0","method":"send","id":"1","params":{"address":"in","body":"Hello World!"}}' |\
+netcat localhost 7000
+----
+
+Or using a favourite json-rpc library of your choice.
+
+=== JSON-RPC and Web-Sockets
+
+The bridge can work exaclty the same way as with TCP, but using WebSockets instead. The use is very similar:
+
+[source,$lang]
+----
+{@link examples.WebsocketBridgeExample#example1}
+----
+
+This example may resemble more complex, but in fact it is very similar to the previous one. The major differences are that, with websockets, a http server must be created and a protocol upgrade should be put in place to upgrade from HTTP to WS. The upgrade should happen at a well known path (omitted in the example for brevity).
+
+The second difference is that we allow both text or binary WS frames (as configured in the options). This is to allow all kinds of integrations. For simple HTML integrations, text frames can be simpler to use, but nothing really forbids its use from a browser either.
+
+[source,html]
+----
+
+
+
+
+
+
+
+
+
+----
+
+=== JSON-RPC and HTTP
+
+The previous transports allow bi-directional messaging, however HTTP is built on top of the request-response style of protocols. When using the bridge in this mode, only a single request can be performed per request. It should be very common to find this pattern on json-rpc libraries. The setup would be as:
+
+[source,$lang]
+----
+{@link examples.HttpBridgeExample#example1}
+----
+
+Just like the previous example, the bridge should be used at a specific path, but this is omitted for brevity of the example.
+
+=== JSON-RPC and HTTP SSE
+
+HTTP also supports server sent events. In this mode a single request can receive a stream of responses. Due to the way SSE works, the stream is always from server to client. Our bridge also allows this by specifying the SSE mode:
+
+TODO: This is still incomplete (API done but public interface missing).
+
+== LEGACY TCP
+
+Previously, the protocol was as simple as possible and communications used frames both ways.
+The structure of a Frame looked like this:
----
<{
diff --git a/src/main/java/examples/HttpBridgeExample.java b/src/main/java/examples/HttpBridgeExample.java
new file mode 100644
index 0000000..f85e516
--- /dev/null
+++ b/src/main/java/examples/HttpBridgeExample.java
@@ -0,0 +1,77 @@
+package examples;
+
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.bridge.PermittedOptions;
+import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
+import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge;
+
+public class HttpBridgeExample extends AbstractVerticle {
+
+ public static void main(String[] args) {
+ Vertx.vertx().deployVerticle(new HttpBridgeExample());
+ }
+
+ @Override
+ public void start(Promise start) {
+ vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
+ vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body()));
+ vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));
+
+ JsonRPCBridgeOptions options = new JsonRPCBridgeOptions()
+ .addInboundPermitted(new PermittedOptions().setAddress("hello"))
+ .addInboundPermitted(new PermittedOptions().setAddress("echo"))
+ .addInboundPermitted(new PermittedOptions().setAddress("ping"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("echo"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("hello"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("ping"));
+
+ Handler bridge = JsonRPCStreamEventBusBridge.httpSocketHandler(vertx, options);
+
+ vertx
+ .createHttpServer()
+ .requestHandler(req -> {
+ if ("/".equals(req.path())) {
+ req.response()
+ .putHeader(HttpHeaders.CONTENT_TYPE, "text/html")
+ .sendFile("http.html");
+ } else if ("/jsonrpc".equals(req.path())) {
+ bridge.handle(req);
+ } else {
+ req.response().setStatusCode(404).end("Not Found");
+ }
+ })
+ .listen(8080)
+ .onFailure(start::fail)
+ .onSuccess(server -> {
+ System.out.println("Server listening at http://localhost:8080");
+ start.complete();
+ });
+ }
+
+ public void example1() {
+
+ JsonRPCBridgeOptions options = new JsonRPCBridgeOptions()
+ .addInboundPermitted(new PermittedOptions().setAddress("in"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("out"));
+
+ vertx
+ .createHttpServer()
+ .requestHandler(req -> {
+ JsonRPCStreamEventBusBridge.httpSocketHandler(
+ vertx,
+ options)
+ .handle(req);
+ })
+ .listen(8080)
+ .onSuccess(server -> {
+ // server is ready!
+ });
+ }
+}
diff --git a/src/main/java/examples/HttpSSEBridgeExample.java b/src/main/java/examples/HttpSSEBridgeExample.java
new file mode 100644
index 0000000..3bb02ac
--- /dev/null
+++ b/src/main/java/examples/HttpSSEBridgeExample.java
@@ -0,0 +1,87 @@
+package examples;
+
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.bridge.PermittedOptions;
+import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
+import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge;
+import io.vertx.ext.eventbus.bridge.tcp.impl.HttpJsonRPCStreamEventBusBridgeImpl;
+
+import java.util.UUID;
+
+public class HttpSSEBridgeExample extends AbstractVerticle {
+
+ public static void main(String[] args) {
+ Vertx.vertx().deployVerticle(new HttpSSEBridgeExample());
+ }
+
+ @Override
+ public void start(Promise start) {
+ // just to have some messages flowing around
+ vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
+ vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body()));
+ vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));
+
+ // use explicit class because SSE method is not on the interface currently
+ HttpJsonRPCStreamEventBusBridgeImpl bridge = new HttpJsonRPCStreamEventBusBridgeImpl(
+ vertx,
+ new JsonRPCBridgeOptions()
+ .addInboundPermitted(new PermittedOptions().setAddress("hello"))
+ .addInboundPermitted(new PermittedOptions().setAddress("echo"))
+ .addInboundPermitted(new PermittedOptions().setAddress("test"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("echo"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("test"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("ping")),
+ event -> event.complete(true)
+ );
+
+ vertx
+ .createHttpServer()
+ .requestHandler(req -> {
+ // this is where any http request will land
+ // serve the base HTML application
+ if ("/".equals(req.path())) {
+ req.response()
+ .putHeader(HttpHeaders.CONTENT_TYPE, "text/html")
+ .sendFile("sse.html");
+ } else if ("/jsonrpc".equals(req.path())) {
+ bridge.handle(req);
+ } else if ("/jsonrpc-sse".equals(req.path())) {
+ bridge.handleSSE(req, UUID.randomUUID().toString(), new JsonObject().put("params", new JsonObject().put("address", "ping")));
+ } else {
+ req.response().setStatusCode(404).end("Not Found");
+ }
+ })
+ .listen(8080)
+ .onFailure(start::fail)
+ .onSuccess(server -> {
+ System.out.println("Server listening at http://localhost:8080");
+ start.complete();
+ });
+ }
+
+// public void example1() {
+//
+// JsonRPCBridgeOptions options = new JsonRPCBridgeOptions()
+// .addInboundPermitted(new PermittedOptions().setAddress("in"))
+// .addOutboundPermitted(new PermittedOptions().setAddress("out"));
+//
+// vertx
+// .createHttpServer()
+// .requestHandler(req -> {
+// JsonRPCStreamEventBusBridge.httpSocketHandler(
+// vertx,
+// options)
+// .handleSSE(req);
+// })
+// .listen(8080)
+// .onSuccess(server -> {
+// // server is ready!
+// });
+// }
+
+}
diff --git a/src/main/java/examples/TCPBridgeExamples.java b/src/main/java/examples/TCPBridgeExamples.java
index fc62529..7ff6221 100644
--- a/src/main/java/examples/TCPBridgeExamples.java
+++ b/src/main/java/examples/TCPBridgeExamples.java
@@ -20,10 +20,11 @@
import io.vertx.docgen.Source;
import io.vertx.ext.bridge.BridgeOptions;
import io.vertx.ext.bridge.PermittedOptions;
+import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
+import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge;
import io.vertx.ext.eventbus.bridge.tcp.TcpEventBusBridge;
/**
- *
* @author Paulo Lopes
*/
@Source
@@ -32,10 +33,10 @@ public class TCPBridgeExamples {
public void example1(Vertx vertx) {
TcpEventBusBridge bridge = TcpEventBusBridge.create(
- vertx,
- new BridgeOptions()
- .addInboundPermitted(new PermittedOptions().setAddress("in"))
- .addOutboundPermitted(new PermittedOptions().setAddress("out")));
+ vertx,
+ new BridgeOptions()
+ .addInboundPermitted(new PermittedOptions().setAddress("in"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("out")));
bridge.listen(7000, res -> {
if (res.succeeded()) {
@@ -46,4 +47,21 @@ public void example1(Vertx vertx) {
});
}
+
+ public void example2(Vertx vertx) {
+
+ vertx
+ .createNetServer()
+ .connectHandler(
+ JsonRPCStreamEventBusBridge.netSocketHandler(
+ vertx,
+ new JsonRPCBridgeOptions()
+ .addInboundPermitted(new PermittedOptions().setAddress("in"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("out")))
+ ).listen(7000)
+ .onSuccess(server -> {
+ // server is ready!
+ });
+
+ }
}
diff --git a/src/main/java/examples/WebsocketBridgeExample.java b/src/main/java/examples/WebsocketBridgeExample.java
new file mode 100644
index 0000000..36800f4
--- /dev/null
+++ b/src/main/java/examples/WebsocketBridgeExample.java
@@ -0,0 +1,101 @@
+package examples;
+
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.WebSocketBase;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.bridge.PermittedOptions;
+import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
+import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge;
+
+public class WebsocketBridgeExample extends AbstractVerticle {
+
+ public static void main(String[] args) {
+ Vertx.vertx().deployVerticle(new WebsocketBridgeExample());
+ }
+
+ @Override
+ public void start(Promise start) {
+ vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
+ vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body()));
+ vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));
+
+ JsonRPCBridgeOptions options = new JsonRPCBridgeOptions()
+ .addInboundPermitted(new PermittedOptions().setAddress("hello"))
+ .addInboundPermitted(new PermittedOptions().setAddress("echo"))
+ .addInboundPermitted(new PermittedOptions().setAddress("ping"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("echo"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("hello"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("ping"))
+ // if set to false, then websockets messages are received on frontend as binary frames
+ .setWebsocketsTextAsFrame(true);
+
+ Handler bridge = JsonRPCStreamEventBusBridge.webSocketHandler(vertx, options, null);
+
+ vertx
+ .createHttpServer()
+ .requestHandler(req -> {
+ // this is where any http request will land
+ if ("/jsonrpc".equals(req.path())) {
+ // we switch from HTTP to WebSocket
+ req.toWebSocket()
+ .onFailure(err -> {
+ err.printStackTrace();
+ req.response().setStatusCode(500).end(err.getMessage());
+ })
+ .onSuccess(bridge::handle);
+ } else {
+ // serve the base HTML application
+ if ("/".equals(req.path())) {
+ req.response()
+ .putHeader(HttpHeaders.CONTENT_TYPE, "text/html")
+ .sendFile("ws.html");
+ } else {
+ // 404 all the rest
+ req.response().setStatusCode(404).end("Not Found");
+ }
+ }
+ })
+ .listen(8080)
+ .onFailure(start::fail)
+ .onSuccess(server -> {
+ System.out.println("Server listening at http://localhost:8080");
+ start.complete();
+ });
+ }
+
+ public void example1(Vertx vertx) {
+
+ vertx
+ .createHttpServer()
+ .requestHandler(req -> {
+ // this is where any http request will be handled
+
+ // perform a protocol upgrade
+ req.toWebSocket()
+ .onFailure(err -> {
+ err.printStackTrace();
+ req.response().setStatusCode(500).end(err.getMessage());
+ })
+ .onSuccess(ws -> {
+ JsonRPCBridgeOptions options = new JsonRPCBridgeOptions()
+ .addInboundPermitted(new PermittedOptions().setAddress("in"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("out"))
+ // if set to false, then websockets messages are received
+ // on frontend as binary frames
+ .setWebsocketsTextAsFrame(true);
+
+ JsonRPCStreamEventBusBridge.webSocketHandler(vertx, options).handle(ws);
+ });
+ })
+ .listen(8080)
+ .onSuccess(server -> {
+ // server is ready!
+ });
+
+ }
+}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/BridgeEvent.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/BridgeEvent.java
index 1485c22..3bedd21 100644
--- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/BridgeEvent.java
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/BridgeEvent.java
@@ -22,6 +22,7 @@
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetSocket;
+import io.vertx.ext.bridge.BaseBridgeEvent;
/**
* Represents an event that occurs on the event bus bridge.
@@ -31,7 +32,7 @@
* @author Tim Fox
*/
@VertxGen
-public interface BridgeEvent extends io.vertx.ext.bridge.BaseBridgeEvent {
+public interface BridgeEvent extends BaseBridgeEvent {
/**
* Get the raw JSON message for the event. This will be null for SOCKET_CREATED or SOCKET_CLOSED events as there is
@@ -41,7 +42,10 @@ public interface BridgeEvent extends io.vertx.ext.bridge.BaseBridgeEvent {
* @return this reference, so it can be used fluently
*/
@Fluent
- BridgeEvent setRawMessage(JsonObject message);
+ BridgeEvent setRawMessage(JsonObject message);
+
+ // TODO: this will cause problems with WebSockets as they don't share any common base interface
+ // this will be a breaking change to users, as the return type is now generic, is this OK?
/**
* Get the SockJSSocket instance corresponding to the event
@@ -49,5 +53,5 @@ public interface BridgeEvent extends io.vertx.ext.bridge.BaseBridgeEvent {
* @return the SockJSSocket instance
*/
@CacheReturn
- NetSocket socket();
+ T socket();
}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCBridgeOptions.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCBridgeOptions.java
new file mode 100644
index 0000000..f4efa81
--- /dev/null
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCBridgeOptions.java
@@ -0,0 +1,127 @@
+package io.vertx.ext.eventbus.bridge.tcp;
+
+import io.vertx.codegen.annotations.DataObject;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.bridge.BridgeOptions;
+import io.vertx.ext.bridge.BridgeOptionsConverter;
+import io.vertx.ext.bridge.PermittedOptions;
+
+import java.util.List;
+
+@DataObject(generateConverter = true)
+public class JsonRPCBridgeOptions extends BridgeOptions {
+ private boolean websocketsTextAsFrame;
+
+ public JsonRPCBridgeOptions() {}
+
+ /**
+ * Creates a new instance of {@link JsonRPCBridgeOptions} by copying the content of another {@link JsonRPCBridgeOptions}
+ *
+ * @param that the {@link JsonRPCBridgeOptions} to copy.
+ */
+ public JsonRPCBridgeOptions(JsonRPCBridgeOptions that) {
+ super(that);
+ this.websocketsTextAsFrame = that.websocketsTextAsFrame;
+ }
+
+ /**
+ * Creates a new instance of {@link JsonRPCBridgeOptions} from its JSON representation.
+ * This method uses the generated converter.
+ *
+ * @param json the serialized {@link JsonRPCBridgeOptions}
+ * @see BridgeOptionsConverter
+ */
+ public JsonRPCBridgeOptions(JsonObject json) {
+ JsonRPCBridgeOptionsConverter.fromJson(json, this);
+ }
+
+ /**
+ * Serializes the current {@link JsonRPCBridgeOptions} to JSON. This method uses the generated converter.
+ *
+ * @return the serialized object
+ */
+ public JsonObject toJson() {
+ JsonObject json = new JsonObject();
+ JsonRPCBridgeOptionsConverter.toJson(this, json);
+ return json;
+ }
+
+ /**
+ * Sets whether to use text format for websockets frames for the current {@link JsonRPCBridgeOptions}.
+ *
+ * @param websocketsTextAsFrame the choice whether to use text format
+ * @return the current {@link JsonRPCBridgeOptions}.
+ */
+ public JsonRPCBridgeOptions setWebsocketsTextAsFrame(boolean websocketsTextAsFrame) {
+ this.websocketsTextAsFrame = websocketsTextAsFrame;
+ return this;
+ }
+
+ /**
+ * @return whether to use text format for websockets frames.
+ */
+ public boolean getWebsocketsTextAsFrame() {
+ return websocketsTextAsFrame;
+ }
+
+
+ /**
+ * Adds an inbound permitted option to the current {@link JsonRPCBridgeOptions}.
+ *
+ * @param permitted the inbound permitted
+ * @return the current {@link JsonRPCBridgeOptions}.
+ */
+ public JsonRPCBridgeOptions addInboundPermitted(PermittedOptions permitted) {
+ super.addInboundPermitted(permitted);
+ return this;
+ }
+
+ /**
+ * @return the list of inbound permitted options. Empty if none.
+ */
+ public List getInboundPermitteds() {
+ return super.getInboundPermitteds();
+ }
+
+ /**
+ * Sets the list of inbound permitted options.
+ *
+ * @param inboundPermitted the list to use, must not be {@link null}. This method use the direct list reference
+ * (and doesn't create a copy).
+ * @return the current {@link JsonRPCBridgeOptions}.
+ */
+ public JsonRPCBridgeOptions setInboundPermitteds(List inboundPermitted) {
+ super.setInboundPermitteds(inboundPermitted);
+ return this;
+ }
+
+ /**
+ * Adds an outbound permitted option to the current {@link JsonRPCBridgeOptions}.
+ *
+ * @param permitted the outbound permitted
+ * @return the current {@link JsonRPCBridgeOptions}.
+ */
+ public JsonRPCBridgeOptions addOutboundPermitted(PermittedOptions permitted) {
+ super.addOutboundPermitted(permitted);
+ return this;
+ }
+
+ /**
+ * @return the list of outbound permitted options. Empty if none.
+ */
+ public List getOutboundPermitteds() {
+ return super.getOutboundPermitteds();
+ }
+
+ /**
+ * Sets the list of outbound permitted options.
+ *
+ * @param outboundPermitted the list to use, must not be {@link null}. This method use the direct list reference
+ * (and doesn't create a copy).
+ * @return the current {@link JsonRPCBridgeOptions}.
+ */
+ public JsonRPCBridgeOptions setOutboundPermitteds(List outboundPermitted) {
+ super.setOutboundPermitteds(outboundPermitted);
+ return this;
+ }
+}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCStreamEventBusBridge.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCStreamEventBusBridge.java
new file mode 100644
index 0000000..ceb6487
--- /dev/null
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCStreamEventBusBridge.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2015 Red Hat, Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+package io.vertx.ext.eventbus.bridge.tcp;
+
+import io.vertx.codegen.annotations.VertxGen;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.http.WebSocketBase;
+import io.vertx.core.net.NetSocket;
+import io.vertx.ext.bridge.BridgeOptions;
+import io.vertx.ext.eventbus.bridge.tcp.impl.HttpJsonRPCStreamEventBusBridgeImpl;
+import io.vertx.ext.eventbus.bridge.tcp.impl.JsonRPCStreamEventBusBridgeImpl;
+import io.vertx.ext.eventbus.bridge.tcp.impl.TCPJsonRPCStreamEventBusBridgeImpl;
+import io.vertx.ext.eventbus.bridge.tcp.impl.WebsocketJsonRPCStreamEventBusBridgeImpl;
+
+/**
+ * JSONRPC stream EventBus bridge for Vert.x
+ *
+ * @author Paulo Lopes
+ */
+@VertxGen
+public interface JsonRPCStreamEventBusBridge {
+
+ static Handler netSocketHandler(Vertx vertx) {
+ return netSocketHandler(vertx, null, null);
+ }
+
+ static Handler netSocketHandler(Vertx vertx, JsonRPCBridgeOptions options) {
+ return netSocketHandler(vertx, options, null);
+ }
+
+ static Handler netSocketHandler(Vertx vertx, JsonRPCBridgeOptions options, Handler> eventHandler) {
+ return new TCPJsonRPCStreamEventBusBridgeImpl(vertx, options, eventHandler);
+ }
+
+ static Handler webSocketHandler(Vertx vertx) {
+ return webSocketHandler(vertx, null, null);
+ }
+
+ static Handler webSocketHandler(Vertx vertx, JsonRPCBridgeOptions options) {
+ return webSocketHandler(vertx, options, null);
+ }
+ static Handler webSocketHandler(Vertx vertx, JsonRPCBridgeOptions options, Handler> eventHandler) {
+ return new WebsocketJsonRPCStreamEventBusBridgeImpl(vertx, options, eventHandler);
+ }
+
+ static Handler httpSocketHandler(Vertx vertx) {
+ return httpSocketHandler(vertx, null, null);
+ }
+
+ static Handler httpSocketHandler(Vertx vertx, JsonRPCBridgeOptions options) {
+ return httpSocketHandler(vertx, options, null);
+ }
+
+ static Handler httpSocketHandler(Vertx vertx, JsonRPCBridgeOptions options, Handler> eventHandler) {
+ return new HttpJsonRPCStreamEventBusBridgeImpl(vertx, options, eventHandler);
+ }
+}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java
index 5abbdda..cd10836 100644
--- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java
@@ -22,6 +22,7 @@
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetServerOptions;
+import io.vertx.core.net.NetSocket;
import io.vertx.ext.bridge.BridgeOptions;
import io.vertx.ext.eventbus.bridge.tcp.impl.TcpEventBusBridgeImpl;
@@ -44,7 +45,7 @@ static TcpEventBusBridge create(Vertx vertx, BridgeOptions options) {
static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions) {
return new TcpEventBusBridgeImpl(vertx, options, netServerOptions,null);
}
- static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions,Handler eventHandler) {
+ static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions, Handler> eventHandler) {
return new TcpEventBusBridgeImpl(vertx, options, netServerOptions,eventHandler);
}
/**
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/BridgeEventImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/BridgeEventImpl.java
index d18d11a..fafe1af 100644
--- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/BridgeEventImpl.java
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/BridgeEventImpl.java
@@ -20,7 +20,6 @@
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
-import io.vertx.core.net.NetSocket;
import io.vertx.ext.bridge.BridgeEventType;
import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent;
@@ -28,14 +27,14 @@
* @author Tim Fox
* @author grant@iowntheinter.net
*/
-class BridgeEventImpl implements BridgeEvent {
+class BridgeEventImpl implements BridgeEvent {
private final BridgeEventType type;
private final JsonObject rawMessage;
- private final NetSocket socket;
+ private final T socket;
private final Promise promise;
- public BridgeEventImpl(BridgeEventType type, JsonObject rawMessage, NetSocket socket) {
+ public BridgeEventImpl(BridgeEventType type, JsonObject rawMessage, T socket) {
this.type = type;
this.rawMessage = rawMessage;
this.socket = socket;
@@ -58,7 +57,7 @@ public JsonObject getRawMessage() {
}
@Override
- public BridgeEvent setRawMessage(JsonObject message) {
+ public BridgeEvent setRawMessage(JsonObject message) {
if (message != rawMessage) {
rawMessage.clear().mergeIn(message);
}
@@ -71,7 +70,7 @@ public void handle(AsyncResult asyncResult) {
}
@Override
- public NetSocket socket() {
+ public T socket() {
return socket;
}
@@ -114,5 +113,4 @@ public boolean tryFail(Throwable cause) {
public boolean tryFail(String failureMessage) {
return promise.tryFail(failureMessage);
}
-
}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/HttpJsonRPCStreamEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/HttpJsonRPCStreamEventBusBridgeImpl.java
new file mode 100644
index 0000000..fe55475
--- /dev/null
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/HttpJsonRPCStreamEventBusBridgeImpl.java
@@ -0,0 +1,125 @@
+package io.vertx.ext.eventbus.bridge.tcp.impl;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.eventbus.MessageConsumer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.http.HttpServerResponse;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.bridge.BridgeEventType;
+import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent;
+import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+public class HttpJsonRPCStreamEventBusBridgeImpl extends JsonRPCStreamEventBusBridgeImpl {
+
+ // http client cannot reply in the same request in which it originally received
+ // a response so the replies map should be persistent across http request
+ final Map> replies = new ConcurrentHashMap<>();
+
+ public HttpJsonRPCStreamEventBusBridgeImpl(Vertx vertx, JsonRPCBridgeOptions options, Handler> bridgeEventHandler) {
+ super(vertx, options, bridgeEventHandler);
+ }
+
+ @Override
+ public void handle(HttpServerRequest socket) {
+ checkCallHook(
+ // process the new socket according to the event handler
+ () -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CREATED, null, socket),
+ // on success
+ () -> {
+ final Map> registry = new ConcurrentHashMap<>();
+
+ socket.exceptionHandler(t -> {
+ log.error(t.getMessage(), t);
+ registry.values().forEach(MessageConsumer::unregister);
+ registry.clear();
+ }).handler(buffer -> {
+ // TODO: handle content type
+
+ // TODO: body may be an array (batching)
+ final JsonObject msg = new JsonObject(buffer);
+
+ if (this.isInvalid(msg)) {
+ return;
+ }
+
+ final String method = msg.getString("method");
+ final Object id = msg.getValue("id");
+ HttpServerResponse response = socket
+ .response()
+ .putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
+ .endHandler(handler -> {
+ registry.values().forEach(MessageConsumer::unregister);
+ // normal close, trigger the event
+ checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CLOSED, null, socket));
+ registry.clear();
+ });
+ Consumer writer;
+ if (method.equals("register")) {
+ response.setChunked(true);
+ writer = payload -> response.write(payload.encode());
+ } else {
+ writer = payload -> response.end(payload.encode());
+ }
+ dispatch(writer, method, id, msg, registry, replies);
+ });
+ },
+ // on failure
+ () -> socket.response().setStatusCode(500).setStatusMessage("Internal Server Error").end());
+ }
+
+ // TODO: Discuss. Currently we are only adding such methods because SSE doesn't have a body, maybe we could
+ // instead mandate some query params in the request to signal SSE. but bodyHandler is not invoked
+ // in that case so how to handle the request. endHandler or check query params first before applying
+ // bodyHandler ?
+ public void handleSSE(HttpServerRequest socket, Object id, JsonObject msg) {
+ checkCallHook(
+ // process the new socket according to the event handler
+ () -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CREATED, null, socket),
+ () -> {
+ final Map> registry = new ConcurrentHashMap<>();
+
+ socket.exceptionHandler(t -> {
+ log.error(t.getMessage(), t);
+ registry.values().forEach(MessageConsumer::unregister);
+ registry.clear();
+ });
+
+ HttpServerResponse response = socket.response()
+ .setChunked(true)
+ .putHeader(HttpHeaders.CONTENT_TYPE, "text/event-stream")
+ .endHandler(handler -> {
+ checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CLOSED, null, socket));
+ registry.values().forEach(MessageConsumer::unregister);
+ registry.clear();
+ })
+ .exceptionHandler(err -> {
+ checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_ERROR, null, socket));
+ registry.values().forEach(MessageConsumer::unregister);
+ registry.clear();
+ });
+
+ Consumer writer = payload -> {
+ JsonObject result = payload.getJsonObject("result");
+ if (result != null) {
+ String address = result.getString("address");
+ if (address != null) {
+ response.write("event: " + address + "\n");
+ response.write("data: " + payload.encode() + "\n\n");
+ }
+ }
+ };
+ register(writer, id, msg, registry, replies);
+ },
+ () -> socket.response().setStatusCode(500).setStatusMessage("Internal Server Error").end()
+ );
+ }
+
+
+}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/JsonRPCStreamEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/JsonRPCStreamEventBusBridgeImpl.java
new file mode 100644
index 0000000..2660672
--- /dev/null
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/JsonRPCStreamEventBusBridgeImpl.java
@@ -0,0 +1,387 @@
+/*
+ * Copyright 2015 Red Hat, Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+package io.vertx.ext.eventbus.bridge.tcp.impl;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.eventbus.*;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.bridge.BridgeEventType;
+import io.vertx.ext.bridge.PermittedOptions;
+import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent;
+import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
+import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper;
+import io.vertx.json.schema.Draft;
+import io.vertx.json.schema.JsonSchema;
+import io.vertx.json.schema.JsonSchemaOptions;
+import io.vertx.json.schema.OutputUnit;
+import io.vertx.json.schema.Validator;
+
+import java.util.*;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Abstract TCP EventBus bridge. Handles all common socket operations but has no knowledge on the payload.
+ *
+ * @author Paulo Lopes
+ */
+public abstract class JsonRPCStreamEventBusBridgeImpl implements Handler {
+
+ protected static final Logger log = LoggerFactory.getLogger(JsonRPCStreamEventBusBridgeImpl.class);
+ protected static final JsonObject EMPTY = new JsonObject(Collections.emptyMap());
+
+ protected final Vertx vertx;
+
+ protected final EventBus eb;
+
+ protected final Map compiledREs = new HashMap<>();
+ protected final JsonRPCBridgeOptions options;
+ protected final Handler> bridgeEventHandler;
+
+ private final Validator requestValidator;
+
+ public JsonRPCStreamEventBusBridgeImpl(Vertx vertx, JsonRPCBridgeOptions options, Handler> eventHandler) {
+ this.vertx = vertx;
+ this.eb = vertx.eventBus();
+ this.options = options != null ? options : new JsonRPCBridgeOptions();
+ this.bridgeEventHandler = eventHandler;
+ this.requestValidator = getRequestValidator();
+ }
+
+ private Validator getRequestValidator() {
+ String path = "io/vertx/ext/eventbus/bridge/tcp/impl/protocol/jsonrpc.schema.json";
+ Buffer buffer = vertx.fileSystem().readFileBlocking(path);
+ JsonObject jsonObject = buffer.toJsonObject();
+ return Validator.create(JsonSchema.of(jsonObject),
+ new JsonSchemaOptions().setDraft(Draft.DRAFT202012).setBaseUri("https://vertx.io"));
+ }
+
+ protected boolean isInvalid(JsonObject object) {
+ OutputUnit outputUnit = requestValidator.validate(object);
+ if (!outputUnit.getValid()) {
+ log.error("Invalid message. Error: " + outputUnit.getErrors() + " . Message: " + object);
+ return true;
+ }
+ return false;
+ }
+
+ protected void dispatch(Consumer socket, String method, Object id, JsonObject msg, Map> registry, Map> replies) {
+ switch (method) {
+ case "send":
+ checkCallHook(
+ () -> new BridgeEventImpl<>(BridgeEventType.SEND, msg, null),
+ () -> send(socket, id, msg, registry, replies),
+ () -> JsonRPCHelper.error(id, -32040, "access_denied", socket)
+ );
+ break;
+ case "publish":
+ checkCallHook(
+ () -> new BridgeEventImpl<>(BridgeEventType.SEND, msg, null),
+ () -> publish(socket, id, msg, registry, replies),
+ () -> JsonRPCHelper.error(id, -32040, "access_denied", socket)
+ );
+ break;
+ case "register":
+ checkCallHook(
+ () -> new BridgeEventImpl<>(BridgeEventType.REGISTER, msg, null),
+ () -> register(socket, id, msg, registry, replies),
+ () -> JsonRPCHelper.error(id, -32040, "access_denied", socket)
+ );
+ break;
+ case "unregister":
+ checkCallHook(
+ () -> new BridgeEventImpl<>(BridgeEventType.UNREGISTER, msg, null),
+ () -> unregister(socket, id, msg, registry, replies),
+ () -> JsonRPCHelper.error(id, -32040, "access_denied", socket)
+ );
+ break;
+ case "ping":
+ JsonRPCHelper.response(id, "pong", socket);
+ break;
+ default:
+ JsonRPCHelper.error(id, -32601, "unknown_method", socket);
+ break;
+ }
+ }
+
+ protected void unregister(Consumer socket, Object id, JsonObject msg, Map> registry, Map> replies) {
+ final JsonObject params = msg.getJsonObject("params", EMPTY);
+ final String address = params.getString("address");
+
+ if (address == null) {
+ JsonRPCHelper.error(id, -32602, "invalid_parameters", socket);
+ return;
+ }
+
+ if (checkMatches(false, address)) {
+ MessageConsumer> consumer = registry.remove(address);
+ if (consumer != null) {
+ consumer.unregister();
+ if (id != null) {
+ // ack
+ JsonRPCHelper.response(id, EMPTY, socket);
+ }
+ } else {
+ JsonRPCHelper.error(id, -32044, "unknown_address", socket);
+ }
+ } else {
+ JsonRPCHelper.error(id, -32040, "access_denied", socket);
+ }
+ }
+
+ protected void register(Consumer socket, Object id, JsonObject msg, Map> registry, Map> replies) {
+ final JsonObject params = msg.getJsonObject("params", EMPTY);
+ final String address = params.getString("address");
+
+ if (address == null) {
+ JsonRPCHelper.error(id, -32602, "invalid_parameters", socket);
+ return;
+ }
+
+ if (checkMatches(false, address)) {
+ registry.put(address, eb.consumer(address, res1 -> {
+ // save a reference to the message so tcp bridged messages can be replied properly
+ if (res1.replyAddress() != null) {
+ replies.put(res1.replyAddress(), res1);
+ }
+
+ final JsonObject responseHeaders = new JsonObject();
+
+ // clone the headers from / to
+ for (Map.Entry entry : res1.headers()) {
+ responseHeaders.put(entry.getKey(), entry.getValue());
+ }
+
+ JsonRPCHelper.response(
+ id,
+ new JsonObject()
+ .put("address", res1.address())
+ .put("replyAddress", res1.replyAddress())
+ .put("headers", responseHeaders)
+ .put("body", res1.body()),
+ socket);
+ }));
+ checkCallHook(
+ () -> new BridgeEventImpl<>(BridgeEventType.REGISTERED, msg, null),
+ () -> {
+ if (id != null) {
+ // ack
+ JsonRPCHelper.response(id, EMPTY, socket);
+ }
+ });
+ } else {
+ JsonRPCHelper.error(id, -32040, "access_denied", socket);
+ }
+ }
+
+ protected void publish(Consumer socket, Object id, JsonObject msg, Map> registry, Map> replies) {
+ final JsonObject params = msg.getJsonObject("params", EMPTY);
+ final String address = params.getString("address");
+
+ if (address == null) {
+ JsonRPCHelper.error(id, -32602, "invalid_parameters", socket);
+ return;
+ }
+
+ if (checkMatches(true, address)) {
+ final JsonObject body = params.getJsonObject("body");
+ final DeliveryOptions deliveryOptions = parseMsgHeaders(new DeliveryOptions(), params.getJsonObject("headers"));
+
+ eb.publish(address, body, deliveryOptions);
+ if (id != null) {
+ // ack
+ JsonRPCHelper.response(id, EMPTY, socket);
+ }
+ } else {
+ JsonRPCHelper.error(id, -32040, "access_denied", socket);
+ }
+ }
+
+ protected void send(Consumer socket, Object id, JsonObject msg, Map> registry, Map> replies) {
+ final JsonObject params = msg.getJsonObject("params", EMPTY);
+ final String address = params.getString("address");
+
+ if (address == null) {
+ JsonRPCHelper.error(id, -32602, "invalid_parameters", socket);
+ return;
+ }
+
+ if (checkMatches(true, address, replies)) {
+ final Object body = params.getValue("body");
+ final DeliveryOptions deliveryOptions = parseMsgHeaders(new DeliveryOptions(), params.getJsonObject("headers"));
+
+ if (id != null) {
+ // id is not null, it is a request from TCP endpoint that will wait for a response
+ eb.request(address, body, deliveryOptions, request -> {
+ if (request.failed()) {
+ JsonRPCHelper.error(id, (ReplyException) request.cause(), socket);
+ } else {
+ final Message response = request.result();
+ final JsonObject responseHeaders = new JsonObject();
+
+ // clone the headers from / to
+ for (Map.Entry entry : response.headers()) {
+ responseHeaders.put(entry.getKey(), entry.getValue());
+ }
+
+ if (response.replyAddress() != null) {
+ replies.put(response.replyAddress(), response);
+ }
+
+ JsonRPCHelper.response(
+ id,
+ new JsonObject()
+ .put("headers", responseHeaders)
+ .put("id", response.replyAddress())
+ .put("body", response.body()),
+ socket);
+ }
+ });
+ } else {
+ // no reply address it might be a response, a failure or a request that does not need a response
+ if (replies.containsKey(address)) {
+ // address is registered, it is not a request
+ final JsonObject error = params.getJsonObject("error");
+ if (error == null) {
+ // No error block, it is a response
+ replies.get(address).reply(body, deliveryOptions);
+ } else {
+ // error block, fail the original response
+ replies.get(address).fail(error.getInteger("failureCode"), error.getString("message"));
+ }
+ } else {
+ // it is a request that does not expect a response
+ eb.send(address, body, deliveryOptions);
+ }
+ }
+ // replies are a one time off operation
+ replies.remove(address);
+ } else {
+ JsonRPCHelper.error(id, -32040, "access_denied", socket);
+ }
+ }
+
+ protected void checkCallHook(Supplier> eventSupplier) {
+ checkCallHook(eventSupplier, null, null);
+ }
+
+ protected void checkCallHook(Supplier> eventSupplier, Runnable okAction) {
+ checkCallHook(eventSupplier, okAction, null);
+ }
+
+ protected void checkCallHook(Supplier> eventSupplier, Runnable okAction, Runnable rejectAction) {
+ if (bridgeEventHandler == null) {
+ if (okAction != null) {
+ okAction.run();
+ }
+ } else {
+ BridgeEvent event = eventSupplier.get();
+ bridgeEventHandler.handle(event);
+ event.future().onComplete(res -> {
+ if (res.succeeded()) {
+ if (res.result()) {
+ if (okAction != null) {
+ okAction.run();
+ }
+ } else {
+ if (rejectAction != null) {
+ rejectAction.run();
+ } else {
+ log.debug("Bridge handler prevented: " + event.toString());
+ }
+ }
+ } else {
+ log.error("Failure in bridge event handler", res.cause());
+ }
+ });
+ }
+ }
+
+ protected boolean checkMatches(boolean inbound, String address) {
+ return checkMatches(inbound, address, null);
+ }
+
+ protected boolean checkMatches(boolean inbound, String address, Map> replies) {
+ // special case, when dealing with replies the addresses are not in the inbound/outbound list but on
+ // the replies registry
+ if (replies != null && inbound && replies.containsKey(address)) {
+ return true;
+ }
+
+ List matches = inbound ? options.getInboundPermitteds() : options.getOutboundPermitteds();
+
+ for (PermittedOptions matchHolder : matches) {
+ String matchAddress = matchHolder.getAddress();
+ String matchRegex;
+ if (matchAddress == null) {
+ matchRegex = matchHolder.getAddressRegex();
+ } else {
+ matchRegex = null;
+ }
+
+ boolean addressOK;
+ if (matchAddress == null) {
+ addressOK = matchRegex == null || regexMatches(matchRegex, address);
+ } else {
+ addressOK = matchAddress.equals(address);
+ }
+
+ if (addressOK) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ protected boolean regexMatches(String matchRegex, String address) {
+ Pattern pattern = compiledREs.get(matchRegex);
+ if (pattern == null) {
+ pattern = Pattern.compile(matchRegex);
+ compiledREs.put(matchRegex, pattern);
+ }
+ Matcher m = pattern.matcher(address);
+ return m.matches();
+ }
+
+ protected DeliveryOptions parseMsgHeaders(DeliveryOptions options, JsonObject headers) {
+ if (headers == null)
+ return options;
+
+ Iterator fnameIter = headers.fieldNames().iterator();
+ String fname;
+ while (fnameIter.hasNext()) {
+ fname = fnameIter.next();
+ if ("timeout".equals(fname)) {
+ options.setSendTimeout(headers.getLong(fname));
+ } else if ("localOnly".equals(fname)) {
+ options.setLocalOnly(headers.getBoolean(fname));
+ } else if ("codecName".equals(fname)) {
+ options.setCodecName(headers.getString(fname));
+ } else {
+ options.addHeader(fname, headers.getString(fname));
+ }
+ }
+
+ return options;
+ }
+}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/ParserHandler.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/ParserHandler.java
new file mode 100644
index 0000000..4674c9c
--- /dev/null
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/ParserHandler.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+package io.vertx.ext.eventbus.bridge.tcp.impl;
+
+import io.vertx.core.buffer.Buffer;
+
+@FunctionalInterface
+public interface ParserHandler {
+ void handle(String contentType, Buffer body);
+}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/ReadableBuffer.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/ReadableBuffer.java
new file mode 100644
index 0000000..bcab1a3
--- /dev/null
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/ReadableBuffer.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+package io.vertx.ext.eventbus.bridge.tcp.impl;
+
+import io.vertx.core.buffer.Buffer;
+
+final class ReadableBuffer {
+
+ private static final int MARK_WATERMARK = 4 * 1024;
+
+ private Buffer buffer;
+ private int offset;
+ private int mark;
+
+ void append(Buffer chunk) {
+ // either the buffer is null or all read
+ if (buffer == null || Math.min(mark, offset) == buffer.length()) {
+ buffer = chunk;
+ offset = 0;
+ return;
+ }
+
+ // slice the buffer discarding the read bytes
+ if (
+ // the offset (read operations) must be further than the last checkpoint
+ offset >= mark &&
+ // there must be already read more than water mark
+ mark > MARK_WATERMARK &&
+ // and there are more bytes to read already
+ buffer.length() > mark) {
+
+ // clean up when there's too much data
+ buffer = buffer.getBuffer(mark, buffer.length());
+ offset -= mark;
+ mark = 0;
+ }
+
+ buffer.appendBuffer(chunk);
+ }
+
+ int findSTX() {
+ for (int i = offset; i < buffer.length(); i++) {
+ byte b = buffer.getByte(i);
+ switch (b) {
+ case '\r':
+ case '\n':
+ // skip new lines
+ continue;
+ case '{':
+ case '[':
+ return i;
+ default:
+ throw new IllegalStateException("Unexpected value in buffer: (int)" + ((int) b));
+ }
+ }
+
+ return -1;
+ }
+
+ int findETX(int offset) {
+ // brace start / end
+ final byte bs, be;
+ // brace count
+ int bc = 0;
+
+ switch (buffer.getByte(offset)) {
+ case '{':
+ bs = '{';
+ be = '}';
+ break;
+ case '[':
+ bs = '[';
+ be = ']';
+ break;
+ default:
+ throw new IllegalStateException("Message 1st byte isn't valid: " + buffer.getByte(offset));
+ }
+
+ for (int i = offset; i < buffer.length(); i++) {
+ byte b = buffer.getByte(i);
+ if (b == bs) {
+ bc++;
+ } else
+ if (b == be) {
+ bc--;
+ } else {
+ continue;
+ }
+ // validation
+ if (bc < 0) {
+ // unbalanced braces
+ throw new IllegalStateException("Message format is not valid: " + buffer.getString(offset, i) + "...");
+ }
+ if (bc == 0) {
+ // complete
+ return i + 1;
+ }
+ }
+
+ return -1;
+ }
+
+ Buffer readBytes(int offset, int count) {
+ Buffer bytes = null;
+ if (buffer.length() - offset >= count) {
+ bytes = buffer.getBuffer(offset, offset + count);
+ this.offset = offset + count;
+ }
+ return bytes;
+ }
+
+ int readableBytes() {
+ return buffer.length() - offset;
+ }
+
+ void mark() {
+ mark = offset;
+ }
+
+ void reset() {
+ offset = mark;
+ }
+
+
+ @Override
+ public String toString() {
+ return buffer != null ? buffer.toString() : "null";
+ }
+}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/StreamParser.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/StreamParser.java
new file mode 100644
index 0000000..c8f0a85
--- /dev/null
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/StreamParser.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+package io.vertx.ext.eventbus.bridge.tcp.impl;
+
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+
+public final class StreamParser implements Handler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(StreamParser.class);
+
+ // the callback when a full response message has been decoded
+ private Handler handler;
+ private Handler exceptionHandler;
+
+ // a composite buffer to allow buffer concatenation as if it was
+ // a long stream
+ private final ReadableBuffer buffer = new ReadableBuffer();
+
+ public StreamParser handler(Handler handler) {
+ this.handler = handler;
+ return this;
+ }
+
+ public StreamParser exceptionHandler(Handler handler) {
+ this.exceptionHandler = handler;
+ return this;
+ }
+
+ @Override
+ public void handle(Buffer chunk) {
+ if (chunk.length() > 0) {
+ // add the chunk to the buffer
+ buffer.append(chunk);
+
+ // the minimum messages are "{}" or "[]"
+ while (buffer.readableBytes() >= 2) {
+ // setup a rollback point
+ buffer.mark();
+
+ final Buffer payload;
+
+ try {
+ // locate the message boundaries
+ final int start = buffer.findSTX();
+
+ // no start found yet
+ if (start == -1) {
+ buffer.reset();
+ break;
+ }
+
+ final int end = buffer.findETX(start);
+
+ // no end found yet
+ if (end == -1) {
+ buffer.reset();
+ break;
+ }
+
+ payload = buffer.readBytes(start, end - start);
+ } catch (IllegalStateException ise) {
+ exceptionHandler.handle(ise);
+ break;
+ }
+
+ // payload is found, deliver it to the handler
+ try {
+ handler.handle(payload);
+ } catch (RuntimeException e) {
+ // these are user exceptions, not protocol exceptions
+ // we can continue the parsing job
+ LOGGER.error("Failed to handle payload", e);
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TCPJsonRPCStreamEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TCPJsonRPCStreamEventBusBridgeImpl.java
new file mode 100644
index 0000000..abffbd6
--- /dev/null
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TCPJsonRPCStreamEventBusBridgeImpl.java
@@ -0,0 +1,78 @@
+package io.vertx.ext.eventbus.bridge.tcp.impl;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.eventbus.MessageConsumer;
+import io.vertx.core.json.JsonObject;
+import io.vertx.core.net.NetSocket;
+import io.vertx.ext.bridge.BridgeEventType;
+import io.vertx.ext.bridge.BridgeOptions;
+import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent;
+import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+public class TCPJsonRPCStreamEventBusBridgeImpl extends JsonRPCStreamEventBusBridgeImpl {
+
+ public TCPJsonRPCStreamEventBusBridgeImpl(Vertx vertx, JsonRPCBridgeOptions options, Handler> bridgeEventHandler) {
+ super(vertx, options, bridgeEventHandler);
+ }
+
+ @Override
+ public void handle(NetSocket socket) {
+ checkCallHook(
+ // process the new socket according to the event handler
+ () -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CREATED, null, socket),
+ // on success
+ () -> {
+ final Map> registry = new ConcurrentHashMap<>();
+ final Map> replies = new ConcurrentHashMap<>();
+
+ socket
+ .exceptionHandler(t -> {
+ log.error(t.getMessage(), t);
+ registry.values().forEach(MessageConsumer::unregister);
+ registry.clear();
+ })
+ .endHandler(v -> {
+ registry.values().forEach(MessageConsumer::unregister);
+ // normal close, trigger the event
+ checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CLOSED, null, socket));
+ registry.clear();
+ })
+ .handler(
+ // create a protocol parser
+ new StreamParser()
+ .exceptionHandler(t -> {
+ log.error(t.getMessage(), t);
+ })
+ .handler(buffer -> {
+ // TODO: handle content type
+
+ // TODO: body may be an array (batching)
+ final JsonObject msg = new JsonObject(buffer);
+ if (this.isInvalid(msg)) {
+ return;
+ }
+ final String method = msg.getString("method");
+ final Object id = msg.getValue("id");
+
+ Consumer writer = payload -> socket.write(payload.toBuffer().appendString("\r\n"));
+
+ dispatch(
+ writer,
+ method,
+ id,
+ msg,
+ registry,
+ replies);
+ }));
+ },
+ // on failure
+ socket::close
+ );
+ }
+}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java
index ff2b111..7a8ab7e 100644
--- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java
@@ -59,10 +59,10 @@ public class TcpEventBusBridgeImpl implements TcpEventBusBridge {
private final Map compiledREs = new HashMap<>();
private final BridgeOptions options;
- private final Handler bridgeEventHandler;
+ private final Handler> bridgeEventHandler;
- public TcpEventBusBridgeImpl(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions, Handler eventHandler) {
+ public TcpEventBusBridgeImpl(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions, Handler> eventHandler) {
this.eb = vertx.eventBus();
this.options = options != null ? options : new BridgeOptions();
this.bridgeEventHandler = eventHandler;
@@ -205,7 +205,7 @@ private void doSendOrPub(NetSocket socket, String address, JsonObject msg, Map new BridgeEventImpl(BridgeEventType.REGISTERED, msg, socket), null, null);
+ checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.REGISTERED, msg, socket), null, null);
} else {
sendErrFrame("access_denied", socket);
}
@@ -252,7 +252,7 @@ private void handler(NetSocket socket) {
final String type = msg.getString("type", "message");
final String address = msg.getString("address");
BridgeEventType eventType = parseType(type);
- checkCallHook(() -> new BridgeEventImpl(eventType, msg, socket),
+ checkCallHook(() -> new BridgeEventImpl<>(eventType, msg, socket),
() -> {
if (eventType != BridgeEventType.SOCKET_PING && address == null) {
sendErrFrame("missing_address", socket);
@@ -289,13 +289,13 @@ public Future close() {
return server.close();
}
- private void checkCallHook(Supplier eventSupplier, Runnable okAction, Runnable rejectAction) {
+ private void checkCallHook(Supplier> eventSupplier, Runnable okAction, Runnable rejectAction) {
if (bridgeEventHandler == null) {
if (okAction != null) {
okAction.run();
}
} else {
- BridgeEventImpl event = eventSupplier.get();
+ BridgeEventImpl event = eventSupplier.get();
bridgeEventHandler.handle(event);
event.future().onComplete(res -> {
if (res.succeeded()) {
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/WebsocketJsonRPCStreamEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/WebsocketJsonRPCStreamEventBusBridgeImpl.java
new file mode 100644
index 0000000..c178db0
--- /dev/null
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/WebsocketJsonRPCStreamEventBusBridgeImpl.java
@@ -0,0 +1,101 @@
+package io.vertx.ext.eventbus.bridge.tcp.impl;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.eventbus.MessageConsumer;
+import io.vertx.core.http.WebSocketBase;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.bridge.BridgeEventType;
+import io.vertx.ext.bridge.BridgeOptions;
+import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent;
+import io.vertx.ext.eventbus.bridge.tcp.JsonRPCBridgeOptions;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+public class WebsocketJsonRPCStreamEventBusBridgeImpl extends JsonRPCStreamEventBusBridgeImpl {
+
+ public WebsocketJsonRPCStreamEventBusBridgeImpl(Vertx vertx, JsonRPCBridgeOptions options, Handler> bridgeEventHandler) {
+ super(vertx, options, bridgeEventHandler);
+ }
+
+ @Override
+ public void handle(WebSocketBase socket) {
+ checkCallHook(
+ // process the new socket according to the event handler
+ () -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CREATED, null, null),
+ // on success
+ () -> {
+ final Map> registry = new ConcurrentHashMap<>();
+ final Map> replies = new ConcurrentHashMap<>();
+
+ Consumer consumer;
+ if (options.getWebsocketsTextAsFrame()) {
+ consumer = payload -> socket.writeTextMessage(payload.encode());
+ } else {
+ consumer = payload -> socket.writeBinaryMessage(payload.toBuffer());
+ }
+
+ socket
+ .exceptionHandler(t -> {
+ log.error(t.getMessage(), t);
+ registry.values().forEach(MessageConsumer::unregister);
+ registry.clear();
+ })
+ .endHandler(v -> {
+ registry.values().forEach(MessageConsumer::unregister);
+ // normal close, trigger the event
+ checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CLOSED, null, null));
+ registry.clear();
+ })
+ .frameHandler(frame -> {
+ // TODO: this could be an [], in this case, after parsing, we should loop and call for each element the
+ // code bellow.
+
+ // One idea from vs-jsonrpcstream was the use of content-types, so define how the message was formated
+ // by default json (like in the spec) but microsoft was suggesting messagepack as alternative. I'm not
+ // sure if we should implement this. The TCP parser was accounting for it, but is it a good idea? maybe not?
+
+ // not handling CLOSE frames here, endHandler will be invoked on the socket later
+ // ping frames are automatically handled by websockets so safe to ignore here
+ if (frame.isClose() || frame.isPing()) {
+ return;
+ }
+
+ final JsonObject msg = new JsonObject(frame.binaryData());
+ if (this.isInvalid(msg)) {
+ return;
+ }
+
+ final String method = msg.getString("method");
+ final Object id = msg.getValue("id");
+
+ // TODO: we should wrap the socket in order to override the "write" method to write a text frame
+ // TODO: the current WriteStream assumes binary frames which are harder to handle on the browser
+ // TODO: maybe we could make this configurable (binary/text)
+
+ // if we create a wraper, say an interface:
+ // interface SocketWriter { write(Buffer buff) }
+ // then we can create specific implementation wrappers for all kinds of sockets, netSocket, webSocket (binary or text)
+
+ // given that the wraper is at the socket level (it's not that heavy in terms of garbage collection, 1 extra object per connection.
+ // And a connection is long lasting, not like HTTP
+
+ dispatch(
+ consumer,
+ method,
+ id,
+ msg,
+ registry,
+ replies);
+ });
+ },
+ // on failure
+ socket::close
+ );
+ }
+
+}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java
index b5e1f3e..3af18c3 100644
--- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java
@@ -23,6 +23,12 @@
import io.vertx.core.json.JsonObject;
/**
+ * TODO: refactor this whole thing to be simpler. Avoid the header's parsing, that is probably a bad idea it was very VisualStudio specific
+ * once we do that, don't rely on line endings as end of message. Instead we need to locate the end of a message.
+ * To locate the end of the message, we need to count braces. If a message starts with "{" we increase the counter,
+ * every time we see "}" we decrease. If we reach 0 it's a full message. If we ever go negative we're on a broken state.
+ * The same for "[" as jsonrpc batches are just an array of messages
+ *
* Simple LV parser
*
* @author Paulo Lopes
@@ -64,6 +70,7 @@ public void handle(Buffer buffer) {
if (remainingBytes - 4 >= length) {
// we have a complete message
try {
+ // TODO: this is wrong, we can have both JsonObject or JsonArray
client.handle(Future.succeededFuture(new JsonObject(_buffer.getString(_offset, _offset + length))));
} catch (DecodeException e) {
// bad json
@@ -111,4 +118,4 @@ private void append(Buffer newBuffer) {
private int bytesRemaining() {
return (_buffer.length() - _offset) < 0 ? 0 : (_buffer.length() - _offset);
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/JsonRPCHelper.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/JsonRPCHelper.java
new file mode 100644
index 0000000..fd62041
--- /dev/null
+++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/JsonRPCHelper.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2015 Red Hat, Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+package io.vertx.ext.eventbus.bridge.tcp.impl.protocol;
+
+import io.vertx.core.MultiMap;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.eventbus.ReplyException;
+import io.vertx.core.json.Json;
+import io.vertx.core.json.JsonObject;
+
+import java.util.function.Consumer;
+
+/**
+ * Helper class to format and send frames over a socket
+ *
+ * @author Paulo Lopes
+ */
+public class JsonRPCHelper {
+
+ private JsonRPCHelper() {
+ }
+
+ // TODO: Should we refactor this helpers to return the buffer with the encoded message and let the caller perform
+ // the write? This would allow the caller to differentiate from a binary write from a text write?
+ // The same applies to all methods on this helper class
+ public static void request(String method, Object id, JsonObject params, MultiMap headers, Consumer handler) {
+
+ final JsonObject payload = new JsonObject().put("jsonrpc", "2.0");
+
+ if (method == null) {
+ throw new IllegalStateException("method cannot be null");
+ }
+
+ payload.put("method", method);
+
+ if (id != null) {
+ payload.put("id", id);
+ }
+
+ if (params != null) {
+ payload.put("params", params.copy());
+ }
+
+ // write
+ if (headers != null) {
+ headers.forEach(entry -> {
+ handler.accept(
+ Buffer.buffer(entry.getKey()).appendString(": ").appendString(entry.getValue()).appendString("\r\n")
+ );
+ });
+ // end of headers
+ handler.accept(Buffer.buffer("\r\n"));
+ }
+
+ handler.accept(payload.toBuffer().appendString("\r\n"));
+ }
+
+ public static void request(String method, Object id, JsonObject params, Consumer handler) {
+ request(method, id, params, null, handler);
+ }
+
+ public static void request(String method, Object id, Consumer handler) {
+ request(method, id, null, null, handler);
+ }
+
+ public static void request(String method, Consumer handler) {
+ request(method, null, null, null, handler);
+ }
+
+ public static void request(String method, JsonObject params, Consumer handler) {
+ request(method, null, params, null, handler);
+ }
+
+ public static void response(Object id, Object result, Consumer handler) {
+ final JsonObject payload = new JsonObject()
+ .put("jsonrpc", "2.0")
+ .put("id", id)
+ .put("result", result);
+
+ handler.accept(payload);
+ }
+
+ public static void error(Object id, Number code, String message, Consumer handler) {
+ final JsonObject payload = new JsonObject()
+ .put("jsonrpc", "2.0")
+ .put("id", id);
+
+ final JsonObject error = new JsonObject();
+ payload.put("error", error);
+
+ if (code != null) {
+ error.put("code", code);
+ }
+
+ if (message != null) {
+ error.put("message", message);
+ }
+
+ handler.accept(payload);
+ }
+
+ public static void error(Object id, ReplyException failure, Consumer handler) {
+ error(id, failure.failureCode(), failure.getMessage(), handler);
+ }
+
+ public static void error(Object id, String message, Consumer handler) {
+ error(id, -32000, message, handler);
+ }
+}
diff --git a/src/main/resources/http.html b/src/main/resources/http.html
new file mode 100644
index 0000000..ad11d59
--- /dev/null
+++ b/src/main/resources/http.html
@@ -0,0 +1,21 @@
+
+
+
+ HTTP Bridge Example
+
+
+
+
+
diff --git a/src/main/resources/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/jsonrpc.schema.json b/src/main/resources/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/jsonrpc.schema.json
new file mode 100644
index 0000000..c5ab7ee
--- /dev/null
+++ b/src/main/resources/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/jsonrpc.schema.json
@@ -0,0 +1,41 @@
+{
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "$id": "https://vertx.io/jsonrpc.schema.json",
+ "title": "Vert.x Event Bus Bridge JSON-RPC 2.0 Specification",
+ "description": "JSON-RPC schema to validate messages sent to a Vert.x Event Bus Bridge",
+ "anyOf": [
+ { "$ref": "#/definitions/request" },
+ {
+ "type": "array",
+ "items": { "$ref": "#/definitions/request" }
+ }
+ ],
+ "definitions": {
+ "request": {
+ "type": "object",
+ "properties": {
+ "jsonrpc": {
+ "description": "A String specifying the version of the JSON-RPC protocol. MUST be exactly \"2.0\".",
+ "const": "2.0"
+ },
+ "method": {
+ "description": "A String containing the name of the method to be invoked. Method names that begin with the word rpc followed by a period character (U+002E or ASCII 46) are reserved for rpc-internal methods and extensions and MUST NOT be used for anything else.",
+ "type": "string"
+ },
+ "params": {
+ "description": "A Structured value that holds the parameter values to be used during the invocation of the method. This member MAY be omitted.",
+ "type": ["object", "array"]
+ },
+ "id": {
+ "description": "An identifier established by the Client that MUST contain a String, Number, or NULL value if included. If it is not included it is assumed to be a notification. The value SHOULD normally not be Null and Numbers SHOULD NOT contain fractional parts.",
+ "type": ["string", "integer", "null"]
+ }
+ },
+ "required": [
+ "jsonrpc",
+ "method"
+ ],
+ "additionalProperties": false
+ }
+ }
+}
diff --git a/src/main/resources/sse.html b/src/main/resources/sse.html
new file mode 100644
index 0000000..2806f0e
--- /dev/null
+++ b/src/main/resources/sse.html
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
diff --git a/src/main/resources/ws.html b/src/main/resources/ws.html
new file mode 100644
index 0000000..80591a1
--- /dev/null
+++ b/src/main/resources/ws.html
@@ -0,0 +1,30 @@
+
+
+
+ Websockets Bridge Example
+
+
+
+
+
diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/HttpJsonRPCStreamEventBusBridgeImplTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/HttpJsonRPCStreamEventBusBridgeImplTest.java
new file mode 100644
index 0000000..e952094
--- /dev/null
+++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/HttpJsonRPCStreamEventBusBridgeImplTest.java
@@ -0,0 +1,404 @@
+/*
+ * Copyright 2015 Red Hat, Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+package io.vertx.ext.eventbus.bridge.tcp;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.Json;
+import io.vertx.core.json.JsonObject;
+import io.vertx.core.net.NetClient;
+import io.vertx.core.net.NetSocket;
+import io.vertx.ext.bridge.BridgeOptions;
+import io.vertx.ext.bridge.PermittedOptions;
+import io.vertx.ext.eventbus.bridge.tcp.impl.StreamParser;
+import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.RunTestOnContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import io.vertx.ext.web.client.WebClient;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper.*;
+
+@RunWith(VertxUnitRunner.class)
+public class HttpJsonRPCStreamEventBusBridgeImplTest {
+
+ @Rule
+ public RunTestOnContext rule = new RunTestOnContext();
+
+ private final Handler> eventHandler = event -> event.complete(true);
+
+ @Before
+ public void before(TestContext should) {
+ final Async test = should.async();
+ final Vertx vertx = rule.vertx();
+
+ vertx.eventBus().consumer("hello",
+ (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
+
+ vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body()));
+
+ vertx.setPeriodic(1000, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));
+
+ vertx.createHttpServer().requestHandler(JsonRPCStreamEventBusBridge.httpSocketHandler(vertx,
+ new JsonRPCBridgeOptions().addInboundPermitted(new PermittedOptions().setAddress("hello")).addInboundPermitted(
+ new PermittedOptions().setAddress("echo")).addInboundPermitted(
+ new PermittedOptions().setAddress("test")).addOutboundPermitted(
+ new PermittedOptions().setAddress("echo")).addOutboundPermitted(
+ new PermittedOptions().setAddress("test")).addOutboundPermitted(new PermittedOptions().setAddress("ping")),
+ eventHandler)).listen(7000, res -> {
+ should.assertTrue(res.succeeded());
+ test.complete();
+ });
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendVoidMessage(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ final WebClient client = WebClient.create(vertx);
+ final Async test = should.async();
+
+ vertx.eventBus().consumer("test", (Message msg) -> {
+ client.close();
+ test.complete();
+ });
+
+ request("send", new JsonObject().put("address", "test").put("body", new JsonObject().put("value", "vert.x")),
+ buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer));
+
+ }
+
+ @Test(timeout = 10_000L)
+ public void testNoHandlers(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ final WebClient client = WebClient.create(vertx);
+ final Async test = should.async();
+
+ request("send", "#backtrack",
+ new JsonObject().put("address", "test").put("body", new JsonObject().put("value", "vert.x")),
+ buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> {
+ JsonObject frame = handler.bodyAsJsonObject();
+
+ should.assertTrue(frame.containsKey("error"));
+ should.assertFalse(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ client.close();
+ test.complete();
+ }).onFailure(should::fail));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testErrorReply(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ final WebClient client = WebClient.create(vertx);
+ final Async test = should.async();
+
+ vertx.eventBus().consumer("test", (Message msg) -> {
+ msg.fail(0, "oops!");
+ });
+
+ request("send", "#backtrack",
+ new JsonObject().put("address", "test").put("body", new JsonObject().put("value", "vert.x")),
+ buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> {
+ JsonObject frame = handler.bodyAsJsonObject();
+ should.assertTrue(frame.containsKey("error"));
+ should.assertFalse(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ client.close();
+ test.complete();
+ }).onFailure(should::fail));
+
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendMessageWithReplyBacktrack(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ WebClient client = WebClient.create(vertx);
+ final Async test = should.async();
+
+ request("send", "#backtrack",
+ new JsonObject().put("address", "hello").put("body", new JsonObject().put("value", "vert.x")),
+ buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> {
+ JsonObject frame = handler.bodyAsJsonObject();
+
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ JsonObject result = frame.getJsonObject("result");
+
+ should.assertEquals("Hello vert.x", result.getJsonObject("body").getString("value"));
+ client.close();
+ test.complete();
+ }).onFailure(should::fail));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendMessageWithReplyBacktrackTimeout(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ WebClient client = WebClient.create(vertx);
+ final Async test = should.async();
+
+ // This does not reply and will provoke a timeout
+ vertx.eventBus().consumer("test", (Message msg) -> { /* Nothing! */ });
+
+ JsonObject headers = new JsonObject().put("timeout", 100L);
+
+ request("send", "#backtrack", new JsonObject().put("address", "test").put("headers", headers).put("body",
+ new JsonObject().put("value", "vert.x")),
+ buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> {
+ JsonObject frame = handler.bodyAsJsonObject();
+
+ should.assertTrue(frame.containsKey("error"));
+ should.assertFalse(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ JsonObject error = frame.getJsonObject("error");
+
+ should.assertEquals(
+ "Timed out after waiting 100(ms) for a reply. address: __vertx.reply.1, repliedAddress: test",
+ error.getString("message"));
+ should.assertEquals(-1, error.getInteger("code"));
+
+ client.close();
+ test.complete();
+ }).onFailure(should::fail));
+
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendMessageWithDuplicateReplyID(TestContext should) {
+ // replies must always return to the same origin
+
+ final Vertx vertx = rule.vertx();
+ WebClient client = WebClient.create(vertx);
+ final Async test = should.async();
+
+ vertx.eventBus().consumer("third-party-receiver", msg -> should.fail());
+
+ request("send", "third-party-receiver",
+ new JsonObject().put("address", "hello").put("body", new JsonObject().put("value", "vert.x")),
+ buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> {
+ JsonObject frame = handler.bodyAsJsonObject();
+ client.close();
+ test.complete();
+ }).onFailure(should::fail));
+ }
+
+ @Test(timeout = 10_000L)
+ @Ignore
+ public void testRegister(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ WebClient client = WebClient.create(vertx);
+ final Async test = should.async();
+
+ final AtomicInteger messageCount = new AtomicInteger(0);
+
+ request("register", "#backtrack", new JsonObject().put("address", "echo"),
+ buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> {
+ JsonObject frame = handler.bodyAsJsonObject();
+ // 2 messages will arrive
+ // 1) ACK for register message
+ // 2) MESSAGE for echo
+ if (messageCount.get() == 0) {
+ // ACK for register message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time ACK for publish is expected
+ should.assertTrue(messageCount.compareAndSet(0, 1));
+ }
+ else {
+ // reply for echo message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ JsonObject result = frame.getJsonObject("result");
+
+ should.assertEquals("Vert.x", result.getJsonObject("body").getString("value"));
+ client.close();
+ test.complete();
+ }
+ }).onFailure(should::fail));
+
+ // now try to publish a message so it gets delivered both to the consumer registred on the startup and to this
+ // remote consumer
+
+ request("publish", "#backtrack",
+ new JsonObject().put("address", "echo").put("body", new JsonObject().put("value", "Vert.x")),
+ buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> {
+ JsonObject frame = handler.bodyAsJsonObject();
+ // ACK for publish message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ }).onFailure(should::fail));
+
+ }
+
+ @Test(timeout = 10_000L)
+ @Ignore
+ public void testUnRegister(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ WebClient client = WebClient.create(vertx);
+ final Async test = should.async();
+
+ final String address = "test";
+ // 4 replies will arrive:
+ // 1). ACK for register
+ // 2). ACK for publish
+ // 3). message published to test
+ // 4). err of NO_HANDLERS because of consumer for 'test' is unregistered.
+ final AtomicInteger messageCount = new AtomicInteger(0);
+ final AtomicInteger messageCount2 = new AtomicInteger(0);
+ final StreamParser parser = new StreamParser().exceptionHandler(should::fail).handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ if (messageCount.get() == 0) {
+ // ACK for register message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time ACK for publish is expected
+ should.assertTrue(messageCount.compareAndSet(0, 1));
+ }
+ else if (messageCount.get() == 1) {
+ // got message, then unregister the handler
+ should.assertFalse(frame.containsKey("error"));
+ JsonObject result = frame.getJsonObject("result");
+ should.assertEquals("Vert.x", result.getJsonObject("body").getString("value"));
+
+ request("unregister", "#backtrack", new JsonObject().put("address", address),
+ buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> {
+ JsonObject frame2 = handler.bodyAsJsonObject();
+ if (messageCount2.get() == 0) {
+ // ACK for publish message
+ should.assertFalse(frame2.containsKey("error"));
+ should.assertTrue(frame2.containsKey("result"));
+ should.assertEquals("#backtrack", frame2.getValue("id"));
+ // increment message count so that next time reply for echo message is expected
+ should.assertTrue(messageCount2.compareAndSet(0, 1));
+ }
+ else {
+ // ACK for unregister message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time error reply for send message is expected
+ should.assertTrue(messageCount.compareAndSet(3, 4));
+
+ request("send", "#backtrack", new JsonObject().put("address", address).put("body",
+ new JsonObject().put("value", "This will fail anyway!")), buffer1 -> {
+ });
+ }
+ }).onFailure(should::fail));
+ }
+ else {
+ // TODO: Check error handling of bridge for consistency
+ // consumer on 'test' has been unregistered, send message will fail.
+ should.assertTrue(frame.containsKey("error"));
+ JsonObject error = frame.getJsonObject("error");
+ should.assertEquals(-1, error.getInteger("code"));
+ should.assertEquals("No handlers for address test", error.getString("message"));
+
+ client.close();
+ test.complete();
+ }
+ });
+
+ request("register", "#backtrack", new JsonObject().put("address", address),
+ buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> {
+ JsonObject frame = handler.bodyAsJsonObject();
+ // ACK for publish message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time reply for echo message is expected
+ should.assertTrue(messageCount.compareAndSet(1, 2));
+ }).onFailure(should::fail));
+
+ request("publish", "#backtrack",
+ new JsonObject().put("address", address).put("body", new JsonObject().put("value", "Vert.x")), buffer -> {
+ });
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendPing(TestContext should) {
+ final Vertx vertx = rule.vertx();
+ WebClient client = WebClient.create(vertx);
+ final Async test = should.async();
+
+ request("ping", "#backtrack",
+ buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> {
+ JsonObject frame = handler.bodyAsJsonObject();
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ should.assertEquals("pong", frame.getString("result"));
+ client.close();
+ test.complete();
+ }).onFailure(should::fail));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testNoAddress(TestContext should) {
+ final Vertx vertx = rule.vertx();
+
+ WebClient client = WebClient.create(vertx);
+ final Async test = should.async();
+ final AtomicBoolean errorOnce = new AtomicBoolean(false);
+
+ request("send", "#backtrack",
+ buffer -> client.post(7000, "localhost", "/").sendBuffer(buffer).onSuccess(handler -> {
+ JsonObject frame = handler.bodyAsJsonObject();
+ if (!errorOnce.compareAndSet(false, true)) {
+ should.fail("Client gets error message twice!");
+ }
+ else {
+ should.assertTrue(frame.containsKey("error"));
+ should.assertEquals("invalid_parameters", frame.getJsonObject("error").getString("message"));
+ vertx.setTimer(200, l -> {
+ client.close();
+ test.complete();
+ });
+ }
+ }).onFailure(should::fail));
+ }
+
+}
diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/InteropWebSocketServer.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/InteropWebSocketServer.java
new file mode 100644
index 0000000..3b8b519
--- /dev/null
+++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/InteropWebSocketServer.java
@@ -0,0 +1,64 @@
+package io.vertx.ext.eventbus.bridge.tcp;
+
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.bridge.PermittedOptions;
+import io.vertx.ext.eventbus.bridge.tcp.impl.HttpJsonRPCStreamEventBusBridgeImpl;
+
+public class InteropWebSocketServer extends AbstractVerticle {
+
+ // To test just run this application from the IDE and then open the browser on http://localhost:8080
+ // later we can also automate this with a vert.x web client, I'll show you next week how to bootstrap it.
+ public static void main(String[] args) {
+ Vertx.vertx().deployVerticle(new InteropWebSocketServer());
+ }
+
+ @Override
+ public void start(Promise start) {
+ // just to have some messages flowing around
+ vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
+ vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body()));
+ vertx.setPeriodic(1000L, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));
+
+ HttpJsonRPCStreamEventBusBridgeImpl bridge = (HttpJsonRPCStreamEventBusBridgeImpl) JsonRPCStreamEventBusBridge.httpSocketHandler(
+ vertx,
+ new JsonRPCBridgeOptions()
+ .addInboundPermitted(new PermittedOptions().setAddress("hello"))
+ .addInboundPermitted(new PermittedOptions().setAddress("echo"))
+ .addInboundPermitted(new PermittedOptions().setAddress("test"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("echo"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("test"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("ping")),
+ null
+ );
+
+ vertx
+ .createHttpServer()
+ .requestHandler(req -> {
+ // this is where any http request will land
+ // serve the base HTML application
+ if ("/".equals(req.path())) {
+ req.response()
+ .putHeader(HttpHeaders.CONTENT_TYPE, "text/html")
+ .sendFile("ws.html");
+ } else if ("/jsonrpc".equals(req.path())){
+ bridge.handle(req);
+ } else if ("/jsonrpc-sse".equals(req.path())) {
+ JsonObject params = new JsonObject().put("params", new JsonObject().put("address", "ping"));
+ bridge.handleSSE(req, (int) (Math.random() * 100_000), params);
+ } else {
+ req.response().setStatusCode(404).end("Not Found");
+ }
+ })
+ .listen(8080)
+ .onFailure(start::fail)
+ .onSuccess(server -> {
+ System.out.println("Server listening at http://localhost:8080");
+ start.complete();
+ });
+ }
+}
diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/StreamParserTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/StreamParserTest.java
new file mode 100644
index 0000000..8db0ff0
--- /dev/null
+++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/StreamParserTest.java
@@ -0,0 +1,61 @@
+package io.vertx.ext.eventbus.bridge.tcp;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.eventbus.bridge.tcp.impl.StreamParser;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.RunTestOnContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(VertxUnitRunner.class)
+public class StreamParserTest {
+
+ @Rule
+ public RunTestOnContext rule = new RunTestOnContext();
+
+ @Test(timeout = 30_000)
+ public void testParseSimple(TestContext should) {
+ final Async test = should.async();
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ // extra line feed and carriage return are ignored
+ should.assertEquals("{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"hi\"}", body.toString());
+ test.complete();
+ });
+
+ parser.handle(Buffer.buffer(
+ "\r\n" +
+ "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"hi\"}"));
+ }
+
+ @Test(timeout = 30_000)
+ public void testParseSimpleWithPreambleFail(TestContext should) {
+ final Async test = should.async();
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(err -> test.complete())
+ .handler(body -> should.fail("There is something else than JSON in the preamble of the buffer"));
+
+ parser.handle(Buffer.buffer(
+ "Content-Length: 38\r\n" +
+ "Content-Type: application/vscode-jsonrpc;charset=utf-8\r\n" +
+ "\r\n" +
+ "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"hi\"}"));
+ }
+
+ @Test(timeout = 30_000)
+ public void testParseSimpleHeaderless(TestContext should) {
+ final Async test = should.async();
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ System.out.println(body.toString());
+ test.complete();
+ });
+
+ parser.handle(Buffer.buffer("{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"hi\"}\r\n"));
+ }
+}
diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TCPJsonRPCStreamEventBusBridgeImplTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TCPJsonRPCStreamEventBusBridgeImplTest.java
new file mode 100644
index 0000000..403dd9e
--- /dev/null
+++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TCPJsonRPCStreamEventBusBridgeImplTest.java
@@ -0,0 +1,677 @@
+/*
+ * Copyright 2015 Red Hat, Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+package io.vertx.ext.eventbus.bridge.tcp;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.json.JsonObject;
+import io.vertx.core.net.NetClient;
+import io.vertx.core.net.NetSocket;
+import io.vertx.ext.bridge.PermittedOptions;
+import io.vertx.ext.eventbus.bridge.tcp.impl.StreamParser;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.RunTestOnContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper.*;
+
+@RunWith(VertxUnitRunner.class)
+public class TCPJsonRPCStreamEventBusBridgeImplTest {
+
+ @Rule
+ public RunTestOnContext rule = new RunTestOnContext();
+
+ private final Handler> eventHandler = event -> event.complete(true);
+
+ @Before
+ public void before(TestContext should) {
+ final Async test = should.async();
+ final Vertx vertx = rule.vertx();
+
+ vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
+
+ vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body()));
+
+ vertx.setPeriodic(1000, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));
+
+ vertx.createNetServer()
+ .connectHandler(JsonRPCStreamEventBusBridge.netSocketHandler(
+ vertx,
+ new JsonRPCBridgeOptions()
+ .addInboundPermitted(new PermittedOptions().setAddress("hello"))
+ .addInboundPermitted(new PermittedOptions().setAddress("echo"))
+ .addInboundPermitted(new PermittedOptions().setAddress("test"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("echo"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("test"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("ping")),
+ eventHandler))
+ .listen(7000, res -> {
+ should.assertTrue(res.succeeded());
+ test.complete();
+ });
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendVoidMessage(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+
+ vertx.eventBus().consumer("test", (Message msg) -> {
+ client.close();
+ test.complete();
+ });
+
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+ request("send", new JsonObject().put("address", "test").put("body", new JsonObject().put("value", "vert.x")), socket::write);
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testNoHandlers(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+
+ final StreamParser parser = new StreamParser()
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ should.assertTrue(frame.containsKey("error"));
+ should.assertFalse(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ client.close();
+ test.complete();
+ }).exceptionHandler(should::fail);
+
+ socket.handler(parser);
+
+ request(
+ "send",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "test")
+ .put("body", new JsonObject().put("value", "vert.x")),
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testErrorReply(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+
+ vertx.eventBus().consumer("test", (Message msg) -> {
+ msg.fail(0, "oops!");
+ });
+
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ should.assertTrue(frame.containsKey("error"));
+ should.assertFalse(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ client.close();
+ test.complete();
+ });
+
+ socket.handler(parser);
+
+ request(
+ "send",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "test")
+ .put("body", new JsonObject().put("value", "vert.x")),
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendsFromOtherSideOfBridge(TestContext should) {
+ final Vertx vertx = rule.vertx();
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+
+ final AtomicBoolean ack = new AtomicBoolean(false);
+
+ // 2 replies will arrive:
+ // 1). acknowledge register
+ // 2). greeting
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ if (!ack.getAndSet(true)) {
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ } else {
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ JsonObject result = frame.getJsonObject("result");
+
+ should.assertEquals("hi", result.getJsonObject("body").getString("value"));
+ client.close();
+ test.complete();
+ }
+ });
+
+ socket.handler(parser);
+
+ request(
+ "register",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "ping"),
+ socket::write
+ );
+ }));
+
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendMessageWithReplyBacktrack(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ JsonObject result = frame.getJsonObject("result");
+
+ should.assertEquals("Hello vert.x", result.getJsonObject("body").getString("value"));
+ client.close();
+ test.complete();
+ });
+
+ socket.handler(parser);
+
+ request(
+ "send",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "hello")
+ .put("body", new JsonObject().put("value", "vert.x")),
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendMessageWithReplyBacktrackTimeout(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+
+ // This does not reply and will provoke a timeout
+ vertx.eventBus().consumer("test", (Message msg) -> { /* Nothing! */ });
+
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ should.assertTrue(frame.containsKey("error"));
+ should.assertFalse(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ JsonObject error = frame.getJsonObject("error");
+
+ should.assertEquals("Timed out after waiting 100(ms) for a reply. address: __vertx.reply.1, repliedAddress: test", error.getString("message"));
+ should.assertEquals(-1, error.getInteger("code"));
+
+ client.close();
+ test.complete();
+ });
+
+ socket.handler(parser);
+
+ JsonObject headers = new JsonObject().put("timeout", 100L);
+
+ request(
+ "send",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "test")
+ .put("headers", headers)
+ .put("body", new JsonObject().put("value", "vert.x")),
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendMessageWithDuplicateReplyID(TestContext should) {
+ // replies must always return to the same origin
+
+ final Vertx vertx = rule.vertx();
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+
+ vertx.eventBus().consumer("third-party-receiver", msg -> should.fail());
+
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+ client.close();
+ test.complete();
+ });
+
+ socket.handler(parser);
+
+
+ request(
+ "send",
+ "third-party-receiver",
+ new JsonObject()
+ .put("address", "hello")
+ .put("body", new JsonObject().put("value", "vert.x")),
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testRegister(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+ final AtomicInteger messageCount = new AtomicInteger(0);
+
+ // 3 messages will arrive
+ // 1) ACK for register message
+ // 2) ACK for publish message
+ // 3) MESSAGE for echo
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ if (messageCount.get() == 0) {
+ // ACK for register message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time ACK for publish is expected
+ should.assertTrue(messageCount.compareAndSet(0, 1));
+ }
+ else if (messageCount.get() == 1) {
+ // ACK for publish message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time reply for echo message is expected
+ should.assertTrue(messageCount.compareAndSet(1, 2));
+ } else {
+ // reply for echo message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ JsonObject result = frame.getJsonObject("result");
+
+ should.assertEquals("Vert.x", result.getJsonObject("body").getString("value"));
+ client.close();
+ test.complete();
+ }
+ });
+
+ socket.handler(parser);
+
+ request(
+ "register",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "echo"),
+ socket::write
+ );
+
+ // now try to publish a message so it gets delivered both to the consumer registred on the startup and to this
+ // remote consumer
+
+ request(
+ "publish",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "echo")
+ .put("body", new JsonObject().put("value", "Vert.x")),
+ socket::write
+ );
+ }));
+
+ }
+
+ @Test(timeout = 10_000L)
+ public void testUnRegister(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+
+ final String address = "test";
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+
+ // 4 replies will arrive:
+ // 1). ACK for register
+ // 2). ACK for publish
+ // 3). message published to test
+ // 4). err of NO_HANDLERS because of consumer for 'test' is unregistered.
+ final AtomicInteger messageCount = new AtomicInteger(0);
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ if (messageCount.get() == 0) {
+ // ACK for register message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time ACK for publish is expected
+ should.assertTrue(messageCount.compareAndSet(0, 1));
+ }
+ else if (messageCount.get() == 1) {
+ // ACK for publish message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time reply for echo message is expected
+ should.assertTrue(messageCount.compareAndSet(1, 2));
+ } else if (messageCount.get() == 2) {
+ // got message, then unregister the handler
+ should.assertFalse(frame.containsKey("error"));
+ JsonObject result = frame.getJsonObject("result");
+ should.assertEquals("Vert.x", result.getJsonObject("body").getString("value"));
+
+ // increment message count so that next time ACK for unregister is expected
+ should.assertTrue(messageCount.compareAndSet(2, 3));
+
+ request("unregister", "#backtrack", new JsonObject().put("address", address), socket::write);
+ } else if (messageCount.get() == 3) {
+ // ACK for unregister message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time error reply for send message is expected
+ should.assertTrue(messageCount.compareAndSet(3, 4));
+
+ request(
+ "send",
+ "#backtrack",
+ new JsonObject()
+ .put("address", address)
+ .put("body", new JsonObject().put("value", "This will fail anyway!")),
+ socket::write
+ );
+ } else {
+ // TODO: Check error handling of bridge for consistency
+ // consumer on 'test' has been unregistered, send message will fail.
+ should.assertTrue(frame.containsKey("error"));
+ JsonObject error = frame.getJsonObject("error");
+ should.assertEquals(-1, error.getInteger("code"));
+ should.assertEquals("No handlers for address test", error.getString("message"));
+
+ client.close();
+ test.complete();
+ }
+ });
+
+ socket.handler(parser);
+
+ request(
+ "register",
+ "#backtrack",
+ new JsonObject()
+ .put("address", address),
+ socket::write
+ );
+
+ request(
+ "publish",
+ "#backtrack",
+ new JsonObject()
+ .put("address", address)
+ .put("body", new JsonObject().put("value", "Vert.x")),
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testReplyFromClient(TestContext should) {
+ // Send a request from java and get a response from the client
+ final Vertx vertx = rule.vertx();
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+ final String address = "test";
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+
+ final AtomicBoolean ack = new AtomicBoolean(false);
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ if (!ack.getAndSet(true)) {
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ } else {
+ JsonObject result = frame.getJsonObject("result");
+ should.assertEquals("Vert.x", result.getJsonObject("body").getString("value"));
+
+ request(
+ "send",
+ "#backtrack",
+ new JsonObject()
+ .put("address", result.getString("replyAddress"))
+ .put("body", new JsonObject().put("value", "You got it")),
+ socket::write
+ );
+ }
+ });
+
+ socket.handler(parser);
+
+ request(
+ "register",
+ "#backtrack",
+ new JsonObject()
+ .put("address", address),
+ socket::write
+ );
+
+ // There is now way to know that the register actually happened, wait a bit before sending.
+ vertx.setTimer(500L, timerId -> {
+ vertx.eventBus().request(address, new JsonObject().put("value", "Vert.x"), respMessage -> {
+ should.assertTrue(respMessage.succeeded());
+ should.assertEquals("You got it", respMessage.result().body().getString("value"));
+ client.close();
+ test.complete();
+ });
+ });
+
+ }));
+
+ }
+
+ @Test(timeout = 10_000L)
+ public void testFailFromClient(TestContext should) {
+ // Send a request from java and get a response from the client
+ final Vertx vertx = rule.vertx();
+
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+ final String address = "test";
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+
+ final AtomicBoolean ack = new AtomicBoolean(false);
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+ if (!ack.getAndSet(true)) {
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ } else {
+ JsonObject result = frame.getJsonObject("result");
+ should.assertEquals("Vert.x", result.getJsonObject("body").getString("value"));
+
+ request(
+ "send",
+ null,
+ new JsonObject()
+ .put("address", result.getString("replyAddress"))
+ .put("error", new JsonObject().put("failureCode", 1234).put("message", "ooops!")),
+ socket::write
+ );
+ }
+ });
+
+ socket.handler(parser);
+
+ request(
+ "register",
+ "#backtrack",
+ new JsonObject()
+ .put("address", address),
+ socket::write
+ );
+
+ // There is now way to know that the register actually happened, wait a bit before sending.
+ vertx.setTimer(500L, timerId -> {
+ vertx.eventBus().request(address, new JsonObject().put("value", "Vert.x"), respMessage -> {
+ should.assertTrue(respMessage.failed());
+ should.assertEquals("ooops!", respMessage.cause().getMessage());
+ client.close();
+ test.complete();
+ });
+ });
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendPing(TestContext should) {
+ final Vertx vertx = rule.vertx();
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+ // MESSAGE for ping
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ should.assertEquals("pong", frame.getString("result"));
+ client.close();
+ test.complete();
+ });
+
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+ socket.handler(parser);
+ request(
+ "ping",
+ "#backtrack",
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testNoAddress(TestContext should) {
+ final Vertx vertx = rule.vertx();
+
+ NetClient client = vertx.createNetClient();
+ final Async test = should.async();
+ final AtomicBoolean errorOnce = new AtomicBoolean(false);
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+ if (!errorOnce.compareAndSet(false, true)) {
+ should.fail("Client gets error message twice!");
+ } else {
+ should.assertTrue(frame.containsKey("error"));
+ should.assertEquals("invalid_parameters", frame.getJsonObject("error").getString("message"));
+ vertx.setTimer(200, l -> {
+ client.close();
+ test.complete();
+ });
+ }
+ });
+ client.connect(7000, "localhost", should.asyncAssertSuccess(socket -> {
+ socket.handler(parser);
+ request(
+ "send",
+ "#backtrack",
+ socket::write
+ );
+ }));
+ }
+
+}
diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeEventTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeEventTest.java
index 175c3e8..7415c8e 100644
--- a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeEventTest.java
+++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeEventTest.java
@@ -67,11 +67,11 @@ public void before(TestContext context) {
.setSsl(true)
.setTrustStoreOptions(sslKeyPairCerts.getServerTrustStore())
.setKeyStoreOptions(sslKeyPairCerts.getServerKeyStore()),
- be -> {
+ (BridgeEvent be) -> {
logger.info("Handled a bridge event " + be.getRawMessage());
if (be.socket().isSsl()) {
try {
- for (Certificate c : be.socket().peerCertificates()) {
+ for (Certificate c : be.socket().peerCertificates()) {
logger.info(((X509Certificate)c).getSubjectDN().toString());
}
} catch (SSLPeerUnverifiedException e) {
diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/ValidatorTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/ValidatorTest.java
new file mode 100644
index 0000000..de7a0ad
--- /dev/null
+++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/ValidatorTest.java
@@ -0,0 +1,60 @@
+package io.vertx.ext.eventbus.bridge.tcp;
+
+
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.unit.junit.RunTestOnContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import io.vertx.json.schema.*;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(VertxUnitRunner.class)
+ public class ValidatorTest {
+
+ @Rule
+ public RunTestOnContext rule = new RunTestOnContext();
+
+ @Test
+ public void testValidateSingle() {
+ String path = "io/vertx/ext/eventbus/bridge/tcp/impl/protocol/jsonrpc.schema.json";
+
+ Validator validator = Validator.create(
+ JsonSchema.of(new JsonObject(rule.vertx().fileSystem().readFileBlocking(path))),
+ new JsonSchemaOptions()
+ .setDraft(Draft.DRAFT202012)
+ .setBaseUri("https://vertx.io")
+ );
+
+ JsonObject rpc = new JsonObject("{\"jsonrpc\": \"2.0\", \"method\": \"subtract\", \"params\": [42, 23], \"id\": 1}");
+
+ assertTrue(validator.validate(rpc).getValid());
+ }
+
+ @Test
+ public void testValidateBatch() {
+ String path = "io/vertx/ext/eventbus/bridge/tcp/impl/protocol/jsonrpc.schema.json";
+
+ Validator validator = Validator.create(
+ JsonSchema.of(new JsonObject(rule.vertx().fileSystem().readFileBlocking(path))),
+ new JsonSchemaOptions()
+ .setOutputFormat(OutputFormat.Basic)
+ .setDraft(Draft.DRAFT202012)
+ .setBaseUri("https://vertx.io")
+ );
+
+ JsonArray rpc = new JsonArray("[\n" +
+ " {\"jsonrpc\": \"2.0\", \"method\": \"sum\", \"params\": [1,2,4], \"id\": \"1\"},\n" +
+ " {\"jsonrpc\": \"2.0\", \"method\": \"notify_hello\", \"params\": [7]},\n" +
+ " {\"jsonrpc\": \"2.0\", \"method\": \"subtract\", \"params\": [42,23], \"id\": \"2\"},\n" +
+ " {\"jsonrpc\": \"2.0\", \"method\": \"foo.get\", \"params\": {\"name\": \"myself\"}, \"id\": \"5\"},\n" +
+ " {\"jsonrpc\": \"2.0\", \"method\": \"get_data\", \"id\": \"9\"} \n" +
+ " ]");
+
+ assertTrue(validator.validate(rpc).getValid());
+ }
+
+ }
diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/WebsocketJsonRPCStreamEventBusBridgeImplTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/WebsocketJsonRPCStreamEventBusBridgeImplTest.java
new file mode 100644
index 0000000..abe0276
--- /dev/null
+++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/WebsocketJsonRPCStreamEventBusBridgeImplTest.java
@@ -0,0 +1,682 @@
+/*
+ * Copyright 2015 Red Hat, Inc.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Apache License v2.0 which accompanies this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * The Apache License v2.0 is available at
+ * http://www.opensource.org/licenses/apache2.0.php
+ *
+ * You may elect to redistribute this code under either of these licenses.
+ */
+package io.vertx.ext.eventbus.bridge.tcp;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.WebSocketBase;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.bridge.PermittedOptions;
+import io.vertx.ext.eventbus.bridge.tcp.impl.StreamParser;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.RunTestOnContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper.*;
+
+@RunWith(VertxUnitRunner.class)
+public class WebsocketJsonRPCStreamEventBusBridgeImplTest {
+
+
+ @Rule
+ public RunTestOnContext rule = new RunTestOnContext();
+
+ private final Handler> eventHandler = event -> event.complete(true);
+
+ @Before
+ public void before(TestContext should) {
+ final Async test = should.async();
+ final Vertx vertx = rule.vertx();
+
+ vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value"))));
+
+ vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body()));
+
+ vertx.setPeriodic(1000, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi")));
+
+ final Handler bridge = JsonRPCStreamEventBusBridge.webSocketHandler(
+ vertx,
+ new JsonRPCBridgeOptions()
+ .addInboundPermitted(new PermittedOptions().setAddress("hello"))
+ .addInboundPermitted(new PermittedOptions().setAddress("echo"))
+ .addInboundPermitted(new PermittedOptions().setAddress("test"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("echo"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("test"))
+ .addOutboundPermitted(new PermittedOptions().setAddress("ping")),
+ eventHandler
+ );
+
+ vertx
+ .createHttpServer()
+ .webSocketHandler(bridge::handle)
+ .listen(7000, res -> {
+ should.assertTrue(res.succeeded());
+ test.complete();
+ });
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendVoidMessage(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+
+ vertx.eventBus().consumer("test", (Message msg) -> {
+ client.close();
+ test.complete();
+ });
+
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket ->
+ request("send", new JsonObject().put("address", "test").put("body", new JsonObject().put("value", "vert.x")), socket::write)
+ ));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testNoHandlers(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> {
+
+ final StreamParser parser = new StreamParser()
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ should.assertTrue(frame.containsKey("error"));
+ should.assertFalse(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ client.close();
+ test.complete();
+ }).exceptionHandler(should::fail);
+
+ socket.handler(parser);
+
+ request(
+ "send",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "test")
+ .put("body", new JsonObject().put("value", "vert.x")),
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testErrorReply(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+
+ vertx.eventBus().consumer("test", (Message msg) -> {
+ msg.fail(0, "oops!");
+ });
+
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> {
+
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ should.assertTrue(frame.containsKey("error"));
+ should.assertFalse(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ client.close();
+ test.complete();
+ });
+
+ socket.handler(parser);
+
+ request(
+ "send",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "test")
+ .put("body", new JsonObject().put("value", "vert.x")),
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendsFromOtherSideOfBridge(TestContext should) {
+ final Vertx vertx = rule.vertx();
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> {
+
+ final AtomicBoolean ack = new AtomicBoolean(false);
+
+ // 2 replies will arrive:
+ // 1). acknowledge register
+ // 2). greeting
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ if (!ack.getAndSet(true)) {
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ } else {
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ JsonObject result = frame.getJsonObject("result");
+
+ should.assertEquals("hi", result.getJsonObject("body").getString("value"));
+ client.close();
+ test.complete();
+ }
+ });
+
+ socket.handler(parser);
+
+ request(
+ "register",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "ping"),
+ socket::write
+ );
+ }));
+
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendMessageWithReplyBacktrack(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> {
+
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ JsonObject result = frame.getJsonObject("result");
+
+ should.assertEquals("Hello vert.x", result.getJsonObject("body").getString("value"));
+ client.close();
+ test.complete();
+ });
+
+ socket.handler(parser);
+
+ request(
+ "send",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "hello")
+ .put("body", new JsonObject().put("value", "vert.x")),
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendMessageWithReplyBacktrackTimeout(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+
+ // This does not reply and will provoke a timeout
+ vertx.eventBus().consumer("test", (Message msg) -> { /* Nothing! */ });
+
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> {
+
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ should.assertTrue(frame.containsKey("error"));
+ should.assertFalse(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ JsonObject error = frame.getJsonObject("error");
+
+ should.assertEquals("Timed out after waiting 100(ms) for a reply. address: __vertx.reply.1, repliedAddress: test", error.getString("message"));
+ should.assertEquals(-1, error.getInteger("code"));
+
+ client.close();
+ test.complete();
+ });
+
+ socket.handler(parser);
+
+ JsonObject headers = new JsonObject().put("timeout", 100L);
+
+ request(
+ "send",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "test")
+ .put("headers", headers)
+ .put("body", new JsonObject().put("value", "vert.x")),
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendMessageWithDuplicateReplyID(TestContext should) {
+ // replies must always return to the same origin
+
+ final Vertx vertx = rule.vertx();
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> {
+
+ vertx.eventBus().consumer("third-party-receiver", msg -> should.fail());
+
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+ client.close();
+ test.complete();
+ });
+
+ socket.handler(parser);
+
+
+ request(
+ "send",
+ "third-party-receiver",
+ new JsonObject()
+ .put("address", "hello")
+ .put("body", new JsonObject().put("value", "vert.x")),
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testRegister(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> {
+ final AtomicInteger messageCount = new AtomicInteger(0);
+
+ // 3 messages will arrive
+ // 1) ACK for register message
+ // 2) ACK for publish message
+ // 3) MESSAGE for echo
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ if (messageCount.get() == 0) {
+ // ACK for register message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time ACK for publish is expected
+ should.assertTrue(messageCount.compareAndSet(0, 1));
+ }
+ else if (messageCount.get() == 1) {
+ // ACK for publish message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time reply for echo message is expected
+ should.assertTrue(messageCount.compareAndSet(1, 2));
+ } else {
+ // reply for echo message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ JsonObject result = frame.getJsonObject("result");
+
+ should.assertEquals("Vert.x", result.getJsonObject("body").getString("value"));
+ client.close();
+ test.complete();
+ }
+ });
+
+ socket.handler(parser);
+
+ request(
+ "register",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "echo"),
+ socket::write
+ );
+
+ // now try to publish a message so it gets delivered both to the consumer registred on the startup and to this
+ // remote consumer
+
+ request(
+ "publish",
+ "#backtrack",
+ new JsonObject()
+ .put("address", "echo")
+ .put("body", new JsonObject().put("value", "Vert.x")),
+ socket::write
+ );
+ }));
+
+ }
+
+ @Test(timeout = 10_000L)
+ public void testUnRegister(TestContext should) {
+ // Send a request and get a response
+ final Vertx vertx = rule.vertx();
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+
+ final String address = "test";
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> {
+
+ // 4 replies will arrive:
+ // 1). ACK for register
+ // 2). ACK for publish
+ // 3). message published to test
+ // 4). err of NO_HANDLERS because of consumer for 'test' is unregistered.
+ final AtomicInteger messageCount = new AtomicInteger(0);
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ if (messageCount.get() == 0) {
+ // ACK for register message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time ACK for publish is expected
+ should.assertTrue(messageCount.compareAndSet(0, 1));
+ }
+ else if (messageCount.get() == 1) {
+ // ACK for publish message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time reply for echo message is expected
+ should.assertTrue(messageCount.compareAndSet(1, 2));
+ } else if (messageCount.get() == 2) {
+ // got message, then unregister the handler
+ should.assertFalse(frame.containsKey("error"));
+ JsonObject result = frame.getJsonObject("result");
+ should.assertEquals("Vert.x", result.getJsonObject("body").getString("value"));
+
+ // increment message count so that next time ACK for unregister is expected
+ should.assertTrue(messageCount.compareAndSet(2, 3));
+
+ request("unregister", "#backtrack", new JsonObject().put("address", address), socket::write);
+ } else if (messageCount.get() == 3) {
+ // ACK for unregister message
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ // increment message count so that next time error reply for send message is expected
+ should.assertTrue(messageCount.compareAndSet(3, 4));
+
+ request(
+ "send",
+ "#backtrack",
+ new JsonObject()
+ .put("address", address)
+ .put("body", new JsonObject().put("value", "This will fail anyway!")),
+ socket::write
+ );
+ } else {
+ // TODO: Check error handling of bridge for consistency
+ // consumer on 'test' has been unregistered, send message will fail.
+ should.assertTrue(frame.containsKey("error"));
+ JsonObject error = frame.getJsonObject("error");
+ should.assertEquals(-1, error.getInteger("code"));
+ should.assertEquals("No handlers for address test", error.getString("message"));
+
+ client.close();
+ test.complete();
+ }
+ });
+
+ socket.handler(parser);
+
+ request(
+ "register",
+ "#backtrack",
+ new JsonObject()
+ .put("address", address),
+ socket::write
+ );
+
+ request(
+ "publish",
+ "#backtrack",
+ new JsonObject()
+ .put("address", address)
+ .put("body", new JsonObject().put("value", "Vert.x")),
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testReplyFromClient(TestContext should) {
+ // Send a request from java and get a response from the client
+ final Vertx vertx = rule.vertx();
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+ final String address = "test";
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> {
+
+ final AtomicBoolean ack = new AtomicBoolean(false);
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ if (!ack.getAndSet(true)) {
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ } else {
+ JsonObject result = frame.getJsonObject("result");
+ should.assertEquals("Vert.x", result.getJsonObject("body").getString("value"));
+
+ request(
+ "send",
+ "#backtrack",
+ new JsonObject()
+ .put("address", result.getString("replyAddress"))
+ .put("body", new JsonObject().put("value", "You got it")),
+ socket::write
+ );
+ }
+ });
+
+ socket.handler(parser);
+
+ request(
+ "register",
+ "#backtrack",
+ new JsonObject()
+ .put("address", address),
+ socket::write
+ );
+
+ // There is now way to know that the register actually happened, wait a bit before sending.
+ vertx.setTimer(500L, timerId -> {
+ vertx.eventBus().request(address, new JsonObject().put("value", "Vert.x"), respMessage -> {
+ should.assertTrue(respMessage.succeeded());
+ should.assertEquals("You got it", respMessage.result().body().getString("value"));
+ client.close();
+ test.complete();
+ });
+ });
+
+ }));
+
+ }
+
+ @Test(timeout = 10_000L)
+ public void testFailFromClient(TestContext should) {
+ // Send a request from java and get a response from the client
+ final Vertx vertx = rule.vertx();
+
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+ final String address = "test";
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> {
+
+ final AtomicBoolean ack = new AtomicBoolean(false);
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+ if (!ack.getAndSet(true)) {
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+ } else {
+ JsonObject result = frame.getJsonObject("result");
+ should.assertEquals("Vert.x", result.getJsonObject("body").getString("value"));
+
+ request(
+ "send",
+ null,
+ new JsonObject()
+ .put("address", result.getString("replyAddress"))
+ .put("error", new JsonObject().put("failureCode", 1234).put("message", "ooops!")),
+ socket::write
+ );
+ }
+ });
+
+ socket.handler(parser);
+
+ request(
+ "register",
+ "#backtrack",
+ new JsonObject()
+ .put("address", address),
+ socket::write
+ );
+
+ // There is now way to know that the register actually happened, wait a bit before sending.
+ vertx.setTimer(500L, timerId -> {
+ vertx.eventBus().request(address, new JsonObject().put("value", "Vert.x"), respMessage -> {
+ should.assertTrue(respMessage.failed());
+ should.assertEquals("ooops!", respMessage.cause().getMessage());
+ client.close();
+ test.complete();
+ });
+ });
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSendPing(TestContext should) {
+ final Vertx vertx = rule.vertx();
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+ // MESSAGE for ping
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+
+ should.assertFalse(frame.containsKey("error"));
+ should.assertTrue(frame.containsKey("result"));
+ should.assertEquals("#backtrack", frame.getValue("id"));
+
+ should.assertEquals("pong", frame.getString("result"));
+ client.close();
+ test.complete();
+ });
+
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> {
+ socket.handler(parser);
+ request(
+ "ping",
+ "#backtrack",
+ socket::write
+ );
+ }));
+ }
+
+ @Test(timeout = 10_000L)
+ public void testNoAddress(TestContext should) {
+ final Vertx vertx = rule.vertx();
+
+ HttpClient client = vertx.createHttpClient();
+ final Async test = should.async();
+ final AtomicBoolean errorOnce = new AtomicBoolean(false);
+ final StreamParser parser = new StreamParser()
+ .exceptionHandler(should::fail)
+ .handler(body -> {
+ JsonObject frame = new JsonObject(body);
+ if (!errorOnce.compareAndSet(false, true)) {
+ should.fail("Client gets error message twice!");
+ } else {
+ should.assertTrue(frame.containsKey("error"));
+ should.assertEquals("invalid_parameters", frame.getJsonObject("error").getString("message"));
+ vertx.setTimer(200, l -> {
+ client.close();
+ test.complete();
+ });
+ }
+ });
+ client.webSocket(7000, "localhost", "/", should.asyncAssertSuccess(socket -> {
+ socket.handler(parser);
+ request(
+ "send",
+ "#backtrack",
+ socket::write
+ );
+ }));
+ }
+
+}