diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TCPJsonRPCStreamEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TCPJsonRPCStreamEventBusBridgeImpl.java index 0a0963b..73e24fb 100644 --- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TCPJsonRPCStreamEventBusBridgeImpl.java +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TCPJsonRPCStreamEventBusBridgeImpl.java @@ -2,8 +2,11 @@ import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.MessageConsumer; +import io.vertx.core.json.Json; +import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.core.net.NetSocket; import io.vertx.ext.bridge.BridgeEventType; @@ -12,6 +15,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; public class TCPJsonRPCStreamEventBusBridgeImpl extends JsonRPCStreamEventBusBridgeImpl { @@ -48,27 +52,47 @@ public void handle(NetSocket socket) { log.error(t.getMessage(), t); }) .handler(buffer -> { - // TODO: handle content type - - // TODO: body may be an array (batching) - final JsonObject msg = new JsonObject(buffer); + final Object msg = Json.decodeValue(buffer); if (!this.validate(msg)) { + // TODO: should we log? or send an error? return; } - final String method = msg.getString("method"); - final Object id = msg.getValue("id"); - - dispatch( - socket::write, - method, - id, - msg, - registry, - replies); + // msg can now be either JsonObject or JsonArray or something else (e.g. String) + // we should only care about JsonObjects or JsonArrays + if (msg instanceof JsonObject) { + final JsonObject json = (JsonObject) msg; + dispatch(json, socket::write, registry, replies); + return; + } + if (msg instanceof JsonArray) { + final JsonArray json = (JsonArray) msg; + for (Object o : json) { + if (o instanceof JsonObject) { + // TODO: we need to try catch on ClassCastException to handle the case of bad batch like in the spec + // and return an error + final JsonObject jsonObject = (JsonObject) o; + dispatch(jsonObject, socket::write, registry, replies); + } + } + return; + } + // TODO: this is a fully broken request it's not a JsonObject or JsonArray, we need to return an error })); }, // on failure socket::close ); } + + private void dispatch(JsonObject msg, Consumer socket, Map> registry, Map> replies) { + final String method = msg.getString("method"); + final Object id = msg.getValue("id"); + dispatch( + socket, + method, + id, + msg, + registry, + replies); + } }