Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetSocket;
import io.vertx.ext.bridge.BridgeEventType;
Expand All @@ -12,6 +15,7 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class TCPJsonRPCStreamEventBusBridgeImpl extends JsonRPCStreamEventBusBridgeImpl<NetSocket> {

Expand Down Expand Up @@ -48,27 +52,47 @@ public void handle(NetSocket socket) {
log.error(t.getMessage(), t);
})
.handler(buffer -> {
// TODO: handle content type

// TODO: body may be an array (batching)
final JsonObject msg = new JsonObject(buffer);
final Object msg = Json.decodeValue(buffer);
if (!this.validate(msg)) {
// TODO: should we log? or send an error?
return;
}
final String method = msg.getString("method");
final Object id = msg.getValue("id");

dispatch(
socket::write,
method,
id,
msg,
registry,
replies);
// msg can now be either JsonObject or JsonArray or something else (e.g. String)
// we should only care about JsonObjects or JsonArrays
if (msg instanceof JsonObject) {
final JsonObject json = (JsonObject) msg;
dispatch(json, socket::write, registry, replies);
return;
}
if (msg instanceof JsonArray) {
final JsonArray json = (JsonArray) msg;
for (Object o : json) {
if (o instanceof JsonObject) {
// TODO: we need to try catch on ClassCastException to handle the case of bad batch like in the spec
// and return an error
final JsonObject jsonObject = (JsonObject) o;
dispatch(jsonObject, socket::write, registry, replies);
}
}
return;
}
// TODO: this is a fully broken request it's not a JsonObject or JsonArray, we need to return an error
}));
},
// on failure
socket::close
);
}

private void dispatch(JsonObject msg, Consumer<Buffer> socket, Map<String, MessageConsumer<?>> registry, Map<String, Message<JsonObject>> replies) {
final String method = msg.getString("method");
final Object id = msg.getValue("id");
dispatch(
socket,
method,
id,
msg,
registry,
replies);
}
}