Skip to content

Commit 1c75ffc

Browse files
committed
Use Consumer<JsonObject> instead of Consumer<Buffer> to write
1 parent c82d513 commit 1c75ffc

File tree

5 files changed

+25
-21
lines changed

5 files changed

+25
-21
lines changed

src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/HttpJsonRPCStreamEventBusBridgeImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.vertx.core.http.HttpHeaders;
99
import io.vertx.core.http.HttpServerRequest;
1010
import io.vertx.core.http.HttpServerResponse;
11+
import io.vertx.core.json.Json;
1112
import io.vertx.core.json.JsonObject;
1213
import io.vertx.ext.bridge.BridgeEventType;
1314
import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent;
@@ -62,16 +63,15 @@ public void handle(HttpServerRequest socket) {
6263
checkCallHook(() -> new BridgeEventImpl<>(BridgeEventType.SOCKET_CLOSED, null, socket));
6364
registry.clear();
6465
});
65-
Consumer<Buffer> writer;
66+
Consumer<JsonObject> writer;
6667
if (method.equals("register")) {
6768
response.setChunked(true);
68-
writer = body -> {
69-
JsonObject object = body.toJsonObject();
70-
response.write("event: " + object.getJsonObject("result").getString("address") + "\n");
71-
response.write("data: " + object.encode() + "\n\n");
69+
writer = payload -> {
70+
response.write("event: " + payload.getJsonObject("result").getString("address") + "\n");
71+
response.write("data: " + payload.encode() + "\n\n");
7272
};
7373
} else {
74-
writer = response::end;
74+
writer = payload -> response.end(payload.encode());
7575
}
7676
dispatch(writer, method, id, msg, registry, replies);
7777
});

src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/JsonRPCStreamEventBusBridgeImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ protected boolean isInvalid(JsonObject object) {
9696
return false;
9797
}
9898

99-
protected void dispatch(Consumer<Buffer> socket, String method, Object id, JsonObject msg, Map<String, MessageConsumer<?>> registry, Map<String, Message<JsonObject>> replies) {
99+
protected void dispatch(Consumer<JsonObject> socket, String method, Object id, JsonObject msg, Map<String, MessageConsumer<?>> registry, Map<String, Message<JsonObject>> replies) {
100100
switch (method) {
101101
case "send":
102102
checkCallHook(
@@ -135,7 +135,7 @@ protected void dispatch(Consumer<Buffer> socket, String method, Object id, JsonO
135135
}
136136
}
137137

138-
protected void unregister(Consumer<Buffer> socket, Object id, JsonObject msg, Map<String, MessageConsumer<?>> registry, Map<String, Message<JsonObject>> replies) {
138+
protected void unregister(Consumer<JsonObject> socket, Object id, JsonObject msg, Map<String, MessageConsumer<?>> registry, Map<String, Message<JsonObject>> replies) {
139139
final JsonObject params = msg.getJsonObject("params", EMPTY);
140140
final String address = params.getString("address");
141141

@@ -160,7 +160,7 @@ protected void unregister(Consumer<Buffer> socket, Object id, JsonObject msg, Ma
160160
}
161161
}
162162

163-
protected void register(Consumer<Buffer> socket, Object id, JsonObject msg, Map<String, MessageConsumer<?>> registry, Map<String, Message<JsonObject>> replies) {
163+
protected void register(Consumer<JsonObject> socket, Object id, JsonObject msg, Map<String, MessageConsumer<?>> registry, Map<String, Message<JsonObject>> replies) {
164164
final JsonObject params = msg.getJsonObject("params", EMPTY);
165165
final String address = params.getString("address");
166166

@@ -205,7 +205,7 @@ protected void register(Consumer<Buffer> socket, Object id, JsonObject msg, Map<
205205
}
206206
}
207207

208-
protected void publish(Consumer<Buffer> socket, Object id, JsonObject msg, Map<String, MessageConsumer<?>> registry, Map<String, Message<JsonObject>> replies) {
208+
protected void publish(Consumer<JsonObject> socket, Object id, JsonObject msg, Map<String, MessageConsumer<?>> registry, Map<String, Message<JsonObject>> replies) {
209209
final JsonObject params = msg.getJsonObject("params", EMPTY);
210210
final String address = params.getString("address");
211211

@@ -228,7 +228,7 @@ protected void publish(Consumer<Buffer> socket, Object id, JsonObject msg, Map<S
228228
}
229229
}
230230

231-
protected void send(Consumer<Buffer> socket, Object id, JsonObject msg, Map<String, MessageConsumer<?>> registry, Map<String, Message<JsonObject>> replies) {
231+
protected void send(Consumer<JsonObject> socket, Object id, JsonObject msg, Map<String, MessageConsumer<?>> registry, Map<String, Message<JsonObject>> replies) {
232232
final JsonObject params = msg.getJsonObject("params", EMPTY);
233233
final String address = params.getString("address");
234234

src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TCPJsonRPCStreamEventBusBridgeImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import java.util.Map;
1515
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.function.Consumer;
1617

1718
public class TCPJsonRPCStreamEventBusBridgeImpl extends JsonRPCStreamEventBusBridgeImpl<NetSocket> {
1819

@@ -59,8 +60,10 @@ public void handle(NetSocket socket) {
5960
final String method = msg.getString("method");
6061
final Object id = msg.getValue("id");
6162

63+
Consumer<JsonObject> writer = payload -> socket.write(payload.toBuffer().appendString("\r\n"));
64+
6265
dispatch(
63-
socket::write,
66+
writer,
6467
method,
6568
id,
6669
msg,

src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/WebsocketJsonRPCStreamEventBusBridgeImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ public void handle(WebSocketBase socket) {
3232
final Map<String, MessageConsumer<?>> registry = new ConcurrentHashMap<>();
3333
final Map<String, Message<JsonObject>> replies = new ConcurrentHashMap<>();
3434

35-
Consumer<Buffer> consumer;
35+
Consumer<JsonObject> consumer;
3636
if (options.getWebsocketsTextAsFrame()) {
37-
consumer = buffer -> socket.writeTextMessage(buffer.toString());
37+
consumer = payload -> socket.writeTextMessage(payload.encode());
3838
} else {
39-
consumer = socket::writeBinaryMessage;
39+
consumer = payload -> socket.writeBinaryMessage(payload.toBuffer());
4040
}
4141

4242
socket

src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/JsonRPCHelper.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.vertx.core.MultiMap;
1919
import io.vertx.core.buffer.Buffer;
2020
import io.vertx.core.eventbus.ReplyException;
21+
import io.vertx.core.json.Json;
2122
import io.vertx.core.json.JsonObject;
2223

2324
import java.util.function.Consumer;
@@ -83,16 +84,16 @@ public static void request(String method, JsonObject params, Consumer<Buffer> ha
8384
request(method, null, params, null, handler);
8485
}
8586

86-
public static void response(Object id, Object result, Consumer<Buffer> handler) {
87+
public static void response(Object id, Object result, Consumer<JsonObject> handler) {
8788
final JsonObject payload = new JsonObject()
8889
.put("jsonrpc", "2.0")
8990
.put("id", id)
9091
.put("result", result);
9192

92-
handler.accept(payload.toBuffer().appendString("\r\n"));
93+
handler.accept(payload);
9394
}
9495

95-
public static void error(Object id, Number code, String message, Consumer<Buffer> handler) {
96+
public static void error(Object id, Number code, String message, Consumer<JsonObject> handler) {
9697
final JsonObject payload = new JsonObject()
9798
.put("jsonrpc", "2.0")
9899
.put("id", id);
@@ -108,14 +109,14 @@ public static void error(Object id, Number code, String message, Consumer<Buffer
108109
error.put("message", message);
109110
}
110111

111-
handler.accept(payload.toBuffer().appendString("\r\n"));
112+
handler.accept(payload);
112113
}
113114

114-
public static void error(Object id, ReplyException failure, Consumer<Buffer> handler) {
115+
public static void error(Object id, ReplyException failure, Consumer<JsonObject> handler) {
115116
error(id, failure.failureCode(), failure.getMessage(), handler);
116117
}
117118

118-
public static void error(Object id, String message, Consumer<Buffer> handler) {
119+
public static void error(Object id, String message, Consumer<JsonObject> handler) {
119120
error(id, -32000, message, handler);
120121
}
121122
}

0 commit comments

Comments
 (0)