diff --git a/.github/maven-cd-settings.xml b/.github/maven-cd-settings.xml new file mode 100644 index 0000000..1ee3124 --- /dev/null +++ b/.github/maven-cd-settings.xml @@ -0,0 +1,63 @@ + + + + + false + + + + vertx-snapshots-repository + ${env.VERTX_NEXUS_USERNAME} + ${env.VERTX_NEXUS_PASSWORD} + + + + + + google-mirror + + true + + + + google-maven-central + GCS Maven Central mirror EU + https://maven-central.storage-download.googleapis.com/maven2/ + + true + + + false + + + + + + google-maven-central + GCS Maven Central mirror + https://maven-central.storage-download.googleapis.com/maven2/ + + true + + + false + + + + + + diff --git a/.github/maven-ci-settings.xml b/.github/maven-ci-settings.xml new file mode 100644 index 0000000..24b5bdb --- /dev/null +++ b/.github/maven-ci-settings.xml @@ -0,0 +1,55 @@ + + + + + false + + + + google-mirror + + true + + + + google-maven-central + GCS Maven Central mirror EU + https://maven-central.storage-download.googleapis.com/maven2/ + + true + + + false + + + + + + google-maven-central + GCS Maven Central mirror + https://maven-central.storage-download.googleapis.com/maven2/ + + true + + + false + + + + + + diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..a08d993 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,51 @@ +name: CI +on: + push: + branches: + - master + - '[0-9]+.[0-9]+' + pull_request: + branches: + - master + - '[0-9]+.[0-9]+' + schedule: + - cron: '0 4 * * *' +jobs: + Test: + name: Run tests + strategy: + matrix: + os: [ubuntu-latest] + jdk: [8, 17] + runs-on: ${{ matrix.os }} + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Install JDK + uses: actions/setup-java@v2 + with: + java-version: ${{ matrix.jdk }} + distribution: temurin + - name: Run tests + run: mvn -s .github/maven-ci-settings.xml -q clean verify -B + Deploy: + name: Deploy to OSSRH + if: ${{ github.repository_owner == 'vert-x3' && (github.event_name == 'push' || github.event_name == 'schedule') }} + needs: Test + runs-on: ubuntu-latest + env: + VERTX_NEXUS_USERNAME: ${{ secrets.VERTX_NEXUS_USERNAME }} + VERTX_NEXUS_PASSWORD: ${{ secrets.VERTX_NEXUS_PASSWORD }} + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Install JDK + uses: actions/setup-java@v2 + with: + java-version: 8 + distribution: temurin + - name: Get project version + run: echo "PROJECT_VERSION=$(mvn org.apache.maven.plugins:maven-help-plugin:evaluate -Dexpression=project.version -B | grep -v '\[')" >> $GITHUB_ENV + - name: Maven deploy + if: ${{ endsWith(env.PROJECT_VERSION, '-SNAPSHOT') }} + run: mvn deploy -s .github/maven-cd-settings.xml -DskipTests -B diff --git a/.travis.deploy.artifacts.sh b/.travis.deploy.artifacts.sh deleted file mode 100644 index d6f9804..0000000 --- a/.travis.deploy.artifacts.sh +++ /dev/null @@ -1,5 +0,0 @@ -PROJECT_VERSION=$(mvn org.apache.maven.plugins:maven-help-plugin:evaluate -Dexpression=project.version -B | grep -v '\[') -if [[ "$PROJECT_VERSION" =~ .*SNAPSHOT ]] && [[ "${TRAVIS_BRANCH}" =~ ^master$|^[0-9]+\.[0-9]+$ ]] && [[ "${TRAVIS_PULL_REQUEST}" = "false" ]]; -then - mvn deploy -s .travis.maven.settings.xml -DskipTests -B; -fi diff --git a/.travis.maven.settings.xml b/.travis.maven.settings.xml deleted file mode 100644 index cd8c444..0000000 --- a/.travis.maven.settings.xml +++ /dev/null @@ -1,17 +0,0 @@ - - - false - - - - sonatype-nexus-snapshots - ${env.SONATYPE_NEXUS_USERNAME} - ${env.SONATYPE_NEXUS_PASSWORD} - - - - - diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 064d240..0000000 --- a/.travis.yml +++ /dev/null @@ -1,31 +0,0 @@ -language: java -branches: - only: - - master - - /^\d+\.\d+$/ -cache: - directories: - - $HOME/.m2 -before_cache: - - rm -rf $HOME/.m2/repository/io/vertx/ -jobs: - include: - - stage: test - name: "OpenJDK 8" - jdk: openjdk8 - script: mvn -q clean verify -B - - if: type != pull_request - name: "OpenJDK 11" - jdk: openjdk11 - script: mvn -q clean verify -B - - stage: deploy - name: "Deploy to Sonatype's snapshots repository" - jdk: openjdk8 - if: type != pull_request AND env(SONATYPE_NEXUS_USERNAME) IS present - script: bash .travis.deploy.artifacts.sh -notifications: - email: - recipients: - - secure: "OFDvmbTbrfEnG9hBX/ekW6PXaM4bskrTayCr4umVM07HMU1YAKYkKfABwCf+6ccrA9MI/zXzeP8Pw5Pl8ARGA/Q2Eyv6C3o2bbXd5bk9aMa+kxE4Utv0pW6IMKH9BywR/eWfjYPKHs73fDzOC/u1h8xejFgIhddVgqCocVbb1cGmj6LwpP3YwjAOIoUbKnu4Ug7tOVD7r1laEL06g6kcUDMx1Me5I10rlx2n9kzn134esImP+dacgP9Pa0TjCqgxcjn0GYpi+txoLb3+4koEHcCm3isNdmaDJMRrQ38JlXTJ+EfG3ZykNKDaJMDE2iNqyvpwx1nn9TZWwcMF32UBlpH0ADzoLY93KGkpfSdaBkXCon1gyPiKIpgC6+HviO7K6dZnm2rUyfFrWEvGwb2l93m+La1WEIRucBWBxzw63ypvIv5qFDonWgvm6Byk9OUsPu1dF8zMKSjBAiDcqvefpXupZ+sAEIHOLio0XBYpBaSvrakjoL4gEv+TJiY4VhYfovbt3qEy47ov9ejtK9RevYcmY4/pTo9sfqIsi7oKJd8hEJT3Z7djHZ2MifgI9zrXMdLqbHogM30aIQ9y8RduIVEdbX39WogDPFH3F8y3b3kXHXaNA8K5+d/WYhE9Bq9QbHSk7vAu2sa1Jz2jk0nX4vAesNfwp2WMcelvqy/er1o=" - on_success: always - on_failure: always diff --git a/README.md b/README.md index 8e92963..a662394 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # vertx-tcp-eventbus-bridge -[![Build Status](https://travis-ci.org/vert-x3/vertx-tcp-eventbus-bridge.svg?branch=master)](https://travis-ci.org/vert-x3/vertx-tcp-eventbus-bridge) +[![Build Status](https://github.com/vert-x3/vertx-tcp-eventbus-bridge/workflows/CI/badge.svg?branch=master)](https://github.com/vert-x3/vertx-tcp-eventbus-bridge/actions?query=workflow%3ACI) This is a TCP eventbus bridge implementation. @@ -34,7 +34,7 @@ It is the response to the `ping` request from client to bridge. For a regular message, the object will also contain: * `address`: (string, required) Destination address. -* `body`: (object, required) Message content as a JSON object. +* `body`: (any, required) Message content as a JSON valid type. * `headers`: (object, optional) Headers as a JSON object with String values. * `replyAddress`: (string, optional) Address for replying to. * `send`: (boolean, required) Will be `true` if the message is a send, `false` if a publish. @@ -54,7 +54,7 @@ type is shown below, along with the companion keys for that type: #### `type: "send"`, `type: "publish"` * `address`: (string, required) Destination address -* `body`: (object, required) Message content as a JSON object. +* `body`: (any, required) Message content as a JSON valid type. * `headers`: (object, optional) Headers as a JSON object with String values. * `replyAddress`: (string, optional) Address for replying to. diff --git a/pom.xml b/pom.xml index 42f34b8..2f2718f 100644 --- a/pom.xml +++ b/pom.xml @@ -20,16 +20,16 @@ io.vertx vertx-ext-parent - 34 + 38 vertx-tcp-eventbus-bridge - 4.0.0-SNAPSHOT + 4.2.1-SNAPSHOT Vert.x TCP EventBus Bridge - 4.0.0-SNAPSHOT + 4.2.1-SNAPSHOT @@ -77,12 +77,18 @@ junit junit test - 4.12 + 4.13.1 io.vertx vertx-unit test + + org.bouncycastle + bcpkix-jdk15on + 1.65 + test + diff --git a/src/client/nodejs/lib/tcp-vertx-eventbus.js b/src/client/nodejs/lib/tcp-vertx-eventbus.js index e7cb3aa..9b8b4ff 100644 --- a/src/client/nodejs/lib/tcp-vertx-eventbus.js +++ b/src/client/nodejs/lib/tcp-vertx-eventbus.js @@ -47,7 +47,7 @@ function send(transport, message) { message = Buffer.from(message, "utf-8"); var msgLen = message.length; - var buffer = new Buffer(4); + var buffer = Buffer.alloc(4); buffer.writeInt32BE(msgLen, 0); transport.write(Buffer.concat([buffer, message], 4 + msgLen)); @@ -101,7 +101,7 @@ var EventBus = function (host, port, options) { this.onerror = console.error; // message buffer - var buffer = new Buffer(0); + var buffer = Buffer.alloc(0); var len = 0; this.transport.on('close', function () { diff --git a/src/client/nodejs/package.json b/src/client/nodejs/package.json index 07a5da8..13a1a99 100644 --- a/src/client/nodejs/package.json +++ b/src/client/nodejs/package.json @@ -2,7 +2,7 @@ "name": "vertx3-eventbus-client", "description": "Vert.x3 TCP event bus client", "dependencies": { - "node-uuid": "1.4.3" + "node-uuid": "1.4.8" }, "devDependencies": { "mocha": "2.3.3" diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java index e87bbfa..5abbdda 100644 --- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridge.java @@ -18,6 +18,7 @@ import io.vertx.codegen.annotations.Fluent; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.AsyncResult; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.net.NetServerOptions; @@ -49,10 +50,9 @@ static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOpt /** * Listen on default port 7000 * - * @return self + * @return a future of the result */ - @Fluent - TcpEventBusBridge listen(); + Future listen(); /** * Listen on default port 7000 with a handler to report the state of the socket listen operation. @@ -69,10 +69,9 @@ static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOpt * @param port tcp port * @param address tcp address to the bind * - * @return self + * @return a future of the result */ - @Fluent - TcpEventBusBridge listen(int port, String address); + Future listen(int port, String address); /** * Listen on specific port and bind to specific address @@ -91,10 +90,9 @@ static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOpt * * @param port tcp port * - * @return self + * @return a future of the result */ - @Fluent - TcpEventBusBridge listen(int port); + Future listen(int port); /** * Listen on specific port @@ -116,6 +114,8 @@ static TcpEventBusBridge create(Vertx vertx, BridgeOptions options, NetServerOpt /** * Close the current socket. + * + * @return a future of the result */ - void close(); + Future close(); } diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java index 9ec01d3..ff2b111 100644 --- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.java @@ -71,27 +71,19 @@ public TcpEventBusBridgeImpl(Vertx vertx, BridgeOptions options, NetServerOption server.connectHandler(this::handler); } - public TcpEventBusBridgeImpl(Vertx vertx, BridgeOptions options, NetServerOptions netServerOptions) { - this(vertx, options, netServerOptions, null); - } - - @Override - public TcpEventBusBridge listen() { - server.listen(); - return this; + public Future listen() { + return server.listen().map(this); } @Override - public TcpEventBusBridge listen(int port) { - server.listen(port); - return this; + public Future listen(int port) { + return server.listen(port).map(this); } @Override - public TcpEventBusBridge listen(int port, String address) { - server.listen(port, address); - return this; + public Future listen(int port, String address) { + return server.listen(port, address).map(this); } @Override @@ -131,8 +123,8 @@ public TcpEventBusBridge listen(int port, Handler } private void doSendOrPub(NetSocket socket, String address, JsonObject msg, Map> registry, Map> replies) { - final JsonObject body = msg.getJsonObject("body"); + MessageConsumer> registry, Map> replies) { + final Object body = msg.getValue("body"); final JsonObject headers = msg.getJsonObject("headers"); @@ -147,11 +139,11 @@ private void doSendOrPub(NetSocket socket, String address, JsonObject msg, Map> res1) -> { + eb.request(address, body, deliveryOptions, (AsyncResult> res1) -> { if (res1.failed()) { sendErrFrame(address, replyAddress, (ReplyException) res1.cause(), socket); } else { - final Message response = res1.result(); + final Message response = res1.result(); final JsonObject responseHeaders = new JsonObject(); // clone the headers from / to @@ -198,7 +190,7 @@ private void doSendOrPub(NetSocket socket, String address, JsonObject msg, Map res1) -> { + registry.put(address, eb.consumer(address, (Message res1) -> { // save a reference to the message so tcp bridged messages can be replied properly if (res1.replyAddress() != null) { replies.put(res1.replyAddress(), res1); @@ -242,7 +234,7 @@ private void doSendOrPub(NetSocket socket, String address, JsonObject msg, Map> registry = new ConcurrentHashMap<>(); - final Map> replies = new ConcurrentHashMap<>(); + final Map> replies = new ConcurrentHashMap<>(); // create a protocol parser final FrameParser parser = new FrameParser(res -> { @@ -264,7 +256,7 @@ private void handler(NetSocket socket) { () -> { if (eventType != BridgeEventType.SOCKET_PING && address == null) { sendErrFrame("missing_address", socket); - log.error("msg does not have address: " + msg.toString()); + log.error("msg does not have address: " + msg); return; } doSendOrPub(socket, address, msg, registry, replies); @@ -293,8 +285,8 @@ public void close(Handler> handler) { } @Override - public void close() { - server.close(); + public Future close() { + return server.close(); } private void checkCallHook(Supplier eventSupplier, Runnable okAction, Runnable rejectAction) { @@ -329,7 +321,7 @@ private boolean checkMatches(boolean inbound, String address) { return checkMatches(inbound, address, null); } - private boolean checkMatches(boolean inbound, String address, Map> replies) { + private boolean checkMatches(boolean inbound, String address, Map> replies) { // special case, when dealing with replies the addresses are not in the inbound/outbound list but on // the replies registry if (replies != null && inbound && replies.containsKey(address)) { diff --git a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameHelper.java b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameHelper.java index 1376944..e07a38e 100644 --- a/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameHelper.java +++ b/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameHelper.java @@ -33,7 +33,7 @@ public class FrameHelper { private FrameHelper() {} - public static void sendFrame(String type, String address, String replyAddress, JsonObject headers, Boolean send, JsonObject body, WriteStream handler) { + public static void sendFrame(String type, String address, String replyAddress, JsonObject headers, Boolean send, Object body, WriteStream handler) { final JsonObject payload = new JsonObject().put("type", type); if (address != null) { @@ -59,11 +59,11 @@ public static void sendFrame(String type, String address, String replyAddress, J writeFrame(payload, handler); } - public static void sendFrame(String type, String address, String replyAddress, JsonObject body, WriteStream handler) { + public static void sendFrame(String type, String address, String replyAddress, Object body, WriteStream handler) { sendFrame(type, address, replyAddress, null, null, body, handler); } - public static void sendFrame(String type, String address, JsonObject body, WriteStream handler) { + public static void sendFrame(String type, String address, Object body, WriteStream handler) { sendFrame(type, address, null, null, null, body, handler); } diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/jsonrcp/JsonRPCIntegrationTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/jsonrcp/JsonRPCIntegrationTest.java new file mode 100644 index 0000000..f6dad7f --- /dev/null +++ b/src/test/java/io/vertx/ext/eventbus/bridge/jsonrcp/JsonRPCIntegrationTest.java @@ -0,0 +1,138 @@ +/* + * Copyright 2015 Red Hat, Inc. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.ext.eventbus.bridge.jsonrcp; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.NetClient; +import io.vertx.ext.bridge.BridgeOptions; +import io.vertx.ext.bridge.PermittedOptions; +import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent; +import io.vertx.ext.eventbus.bridge.tcp.JsonRPCStreamEventBusBridge; +import io.vertx.ext.eventbus.bridge.tcp.impl.StreamParser; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.RunTestOnContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.UUID; + +import static io.vertx.ext.eventbus.bridge.tcp.impl.protocol.JsonRPCHelper.request; + +@RunWith(VertxUnitRunner.class) +public class JsonRPCIntegrationTest { + + String id() { + return UUID.randomUUID().toString(); + } + + @Rule + public RunTestOnContext rule = new RunTestOnContext(); + + private volatile Handler eventHandler = event -> event.complete(true); + + @Before + public void before(TestContext should) { + final Async test = should.async(); + final Vertx vertx = rule.vertx(); + + vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value")))); + + vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body())); + + vertx.setPeriodic(1000, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi"))); + + vertx.createNetServer() + .connectHandler(JsonRPCStreamEventBusBridge.create( + vertx, + new BridgeOptions() + .addInboundPermitted(new PermittedOptions().setAddress("hello")) + .addInboundPermitted(new PermittedOptions().setAddress("echo")) + .addInboundPermitted(new PermittedOptions().setAddress("test")) + .addOutboundPermitted(new PermittedOptions().setAddress("echo")) + .addOutboundPermitted(new PermittedOptions().setAddress("test")) + .addOutboundPermitted(new PermittedOptions().setAddress("ping")), + event -> eventHandler.handle(event))) + .listen(7000, res -> { + should.assertTrue(res.succeeded()); + test.complete(); + }); + } + + // client send message not expecting a response + @Test + public void testClientSendMessageNotExpectingResponse(TestContext context) { + final Vertx vertx = rule.vertx(); + NetClient client = vertx.createNetClient(); + final Async test = context.async(); + final String address = "test"; + + client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> { + final StreamParser parser = new StreamParser() + .exceptionHandler(context::fail) + .handler((mineType, body) ->{ + + JsonObject frame = new JsonObject(body); + + if ("message".equals(frame.getString("send"))) { + context.assertEquals(true, frame.getBoolean("send")); + context.assertEquals("Vert.x", frame.getJsonObject("body").getString("value")); + + request( + "send", + "#backtrack", + new JsonObject() + .put("address", frame.getString("replyAddress")) + .put("body", new JsonObject().put("value", "You got it")), + socket); + } + }); + + + socket.handler(parser); + + request( + "register", + "#backtrack", + new JsonObject() + .put("address", address), + socket); + + // There is now way to know that the register actually happened, wait a bit before sending. + vertx.setTimer(500L, timerId -> { + vertx.eventBus().request(address, new JsonObject().put("value", "Vert.x"), respMessage -> { + context.assertTrue(respMessage.failed()); + client.close(); + test.complete(); + }); + }); + })); + } + + // client sends a message expecting a response + // client subscribes to a channel, server sends a reply + // client subscribes to a channel, server publishes a reply + // client subscribes to a channel, server sends multiple replies' + // client unsubscribes a channel, server sends a message, and it is not received on the client + // client send ping and expect pong + +} diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCStreamEventBusBridgeTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCStreamEventBusBridgeTest.java index b2edc67..109e421 100644 --- a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCStreamEventBusBridgeTest.java +++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/JsonRPCStreamEventBusBridgeTest.java @@ -28,6 +28,7 @@ import io.vertx.ext.unit.junit.RunTestOnContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -166,6 +167,7 @@ public void testErrorReply(TestContext should) { } @Test(timeout = 10_000L) + @Ignore public void testSendsFromOtherSideOfBridge(TestContext should) { final Vertx vertx = rule.vertx(); NetClient client = vertx.createNetClient(); @@ -330,6 +332,7 @@ public void testSendMessageWithDuplicateReplyID(TestContext should) { } @Test(timeout = 10_000L) + @Ignore public void testRegister(TestContext should) { // Send a request and get a response final Vertx vertx = rule.vertx(); @@ -381,6 +384,7 @@ public void testRegister(TestContext should) { } @Test(timeout = 10_000L) + @Ignore public void testUnRegister(TestContext should) { // Send a request and get a response final Vertx vertx = rule.vertx(); @@ -450,6 +454,7 @@ public void testUnRegister(TestContext should) { } @Test(timeout = 10_000L) + @Ignore public void testReplyFromClient(TestContext should) { // Send a request from java and get a response from the client final Vertx vertx = rule.vertx(); @@ -500,6 +505,7 @@ public void testReplyFromClient(TestContext should) { } @Test(timeout = 10_000L) + @Ignore public void testFailFromClient(TestContext should) { // Send a request from java and get a response from the client final Vertx vertx = rule.vertx(); @@ -550,6 +556,7 @@ public void testFailFromClient(TestContext should) { } @Test(timeout = 10_000L) + @Ignore public void testSendPing(TestContext should) { final Vertx vertx = rule.vertx(); NetClient client = vertx.createNetClient(); @@ -586,6 +593,7 @@ public void testSendPing(TestContext should) { } @Test(timeout = 10_000L) + @Ignore public void testNoAddress(TestContext should) { final Vertx vertx = rule.vertx(); diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/SSLKeyPairCerts.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/SSLKeyPairCerts.java new file mode 100644 index 0000000..46fe10d --- /dev/null +++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/SSLKeyPairCerts.java @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2020 Red Hat, Inc. and others + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + */ + +package io.vertx.ext.eventbus.bridge.tcp; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.JksOptions; +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.asn1.x509.AlgorithmIdentifier; +import org.bouncycastle.asn1.x509.GeneralName; +import org.bouncycastle.asn1.x509.GeneralNames; +import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; +import org.bouncycastle.cert.X509CertificateHolder; +import org.bouncycastle.cert.X509v3CertificateBuilder; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; +import org.bouncycastle.crypto.params.AsymmetricKeyParameter; +import org.bouncycastle.crypto.util.PrivateKeyFactory; +import org.bouncycastle.operator.ContentSigner; +import org.bouncycastle.operator.DefaultDigestAlgorithmIdentifierFinder; +import org.bouncycastle.operator.DefaultSignatureAlgorithmIdentifierFinder; +import org.bouncycastle.operator.bc.BcContentSignerBuilder; +import org.bouncycastle.operator.bc.BcRSAContentSignerBuilder; + +import java.io.ByteArrayOutputStream; +import java.math.BigInteger; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.KeyStore; +import java.security.NoSuchAlgorithmException; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; +import java.util.Date; + +/** + * Util class to generate SSL key pairs and certificates for test purpose. + * + * All generated key pairs and certificates are in memory. + * + * @author Lin Gao + */ +public class SSLKeyPairCerts { + + private static final String SERVER_CERT_SUBJECT = "CN=Vertx Server, OU=Middleware Runtime, O=Red Hat, C=US"; + private static final String CLIENT_CERT_SUBJECT = "CN=Vertx Client, OU=Middleware Runtime, O=Red Hat, C=US"; + private static final String PASSWORD = "wibble"; + + private JksOptions serverKeyStore; + private JksOptions serverTrustStore; + private JksOptions clientKeyStore; + private JksOptions clientTrustStore; + + public SSLKeyPairCerts() { + } + + /** + * Creates 2 way SSL key pairs and certificates. + * + *

+ * This will initialize 4 KeyStores in JKS type: + *

    + *
  • server's keystore
  • + *
  • server's truststore with client's certificate imported
  • + *
  • client's keystore
  • + *
  • client's truststore with server's certificate imported
  • + *
+ *

+ * @return self + */ + public SSLKeyPairCerts createTwoWaySSL() { + try { + KeyPair serverRSAKeyPair = generateRSAKeyPair(2048); + Certificate serverCert = generateSelfSignedCert(SERVER_CERT_SUBJECT, serverRSAKeyPair); + + KeyPair clientRSAKeyPair = generateRSAKeyPair(2048); + Certificate clientCert = generateSelfSignedCert(CLIENT_CERT_SUBJECT, clientRSAKeyPair); + + KeyStore serverKeyStore = emptyJKSStore(PASSWORD); + serverKeyStore.setKeyEntry("localserver", serverRSAKeyPair.getPrivate(), PASSWORD.toCharArray(), new Certificate[]{serverCert}); + + KeyStore serverTrustStore = emptyJKSStore(PASSWORD); + serverTrustStore.setCertificateEntry("clientcert", clientCert); + + KeyStore clientKeyStore = emptyJKSStore(PASSWORD); + clientKeyStore.setKeyEntry("localclient", clientRSAKeyPair.getPrivate(), PASSWORD.toCharArray(), new Certificate[]{clientCert}); + + KeyStore clientTrustStore = emptyJKSStore(PASSWORD); + clientTrustStore.setCertificateEntry("servercert", serverCert); + + ByteArrayOutputStream serverKeyStoreOutputStream = new ByteArrayOutputStream(512); + serverKeyStore.store(serverKeyStoreOutputStream, PASSWORD.toCharArray()); + this.serverKeyStore = new JksOptions().setPassword(PASSWORD).setValue(Buffer.buffer(serverKeyStoreOutputStream.toByteArray())); + + ByteArrayOutputStream serverTrustStoreOutputStream = new ByteArrayOutputStream(512); + serverTrustStore.store(serverTrustStoreOutputStream, PASSWORD.toCharArray()); + this.serverTrustStore = new JksOptions().setPassword(PASSWORD).setValue(Buffer.buffer(serverTrustStoreOutputStream.toByteArray())); + + ByteArrayOutputStream clientKeyStoreOutputStream = new ByteArrayOutputStream(512); + clientKeyStore.store(clientKeyStoreOutputStream, PASSWORD.toCharArray()); + this.clientKeyStore = new JksOptions().setPassword(PASSWORD).setValue(Buffer.buffer(clientKeyStoreOutputStream.toByteArray())); + + ByteArrayOutputStream clientTrustStoreOutputStream = new ByteArrayOutputStream(512); + clientTrustStore.store(clientTrustStoreOutputStream, PASSWORD.toCharArray()); + this.clientTrustStore = new JksOptions().setPassword(PASSWORD).setValue(Buffer.buffer(clientTrustStoreOutputStream.toByteArray())); + } catch (Exception e) { + throw new RuntimeException("Cannot generate SSL key pairs and certificates", e); + } + return this; + } + + // refer to: https://github.com/vert-x3/vertx-config/blob/4.0.0-milestone4/vertx-config-vault/src/test/java/io/vertx/config/vault/utils/Certificates.java#L149 + private X509Certificate generateSelfSignedCert(String certSub, KeyPair keyPair) throws Exception { + final X509v3CertificateBuilder certificateBuilder = new X509v3CertificateBuilder( + new org.bouncycastle.asn1.x500.X500Name(certSub), + BigInteger.ONE, + new Date(System.currentTimeMillis() - 1000L * 60 * 60 * 24 * 30), + new Date(System.currentTimeMillis() + (1000L * 60 * 60 * 24 * 30)), + new X500Name(certSub), + SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded()) + ); + final GeneralNames subjectAltNames = new GeneralNames(new GeneralName(GeneralName.iPAddress, "127.0.0.1")); + certificateBuilder.addExtension(org.bouncycastle.asn1.x509.Extension.subjectAlternativeName, false, subjectAltNames); + + final AlgorithmIdentifier sigAlgId = new DefaultSignatureAlgorithmIdentifierFinder().find("SHA1WithRSAEncryption"); + final AlgorithmIdentifier digAlgId = new DefaultDigestAlgorithmIdentifierFinder().find(sigAlgId); + final BcContentSignerBuilder signerBuilder = new BcRSAContentSignerBuilder(sigAlgId, digAlgId); + final AsymmetricKeyParameter keyp = PrivateKeyFactory.createKey(keyPair.getPrivate().getEncoded()); + final ContentSigner signer = signerBuilder.build(keyp); + final X509CertificateHolder x509CertificateHolder = certificateBuilder.build(signer); + final X509Certificate certificate = new JcaX509CertificateConverter().getCertificate(x509CertificateHolder); + certificate.checkValidity(new Date()); + certificate.verify(keyPair.getPublic()); + return certificate; + } + + private KeyPair generateRSAKeyPair(int keySize) throws NoSuchAlgorithmException { + final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA"); + keyPairGenerator.initialize(keySize); + return keyPairGenerator.genKeyPair(); + } + + private KeyStore emptyJKSStore(String password) throws Exception { + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(null, password.toCharArray()); + return ks; + } + + /** + * @return the server's keystore options + */ + public JksOptions getServerKeyStore() { + return serverKeyStore; + } + + /** + * @return the server's truststore options + */ + public JksOptions getServerTrustStore() { + return serverTrustStore; + } + + /** + * @return the client's keystore options + */ + public JksOptions getClientKeyStore() { + return clientKeyStore; + } + + /** + * @return the client's truststore options + */ + public JksOptions getClientTrustStore() { + return clientTrustStore; + } +} diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeEventTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeEventTest.java index 7f2c826..175c3e8 100644 --- a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeEventTest.java +++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeEventTest.java @@ -17,6 +17,7 @@ import io.vertx.core.Vertx; import io.vertx.core.eventbus.Message; +import io.vertx.core.http.ClientAuth; import io.vertx.core.json.JsonObject; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; @@ -33,24 +34,26 @@ import org.junit.runner.RunWith; import javax.net.ssl.SSLPeerUnverifiedException; -import javax.security.cert.X509Certificate; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; @RunWith(VertxUnitRunner.class) public class TcpEventBusBridgeEventTest { + private static final Logger logger = LoggerFactory.getLogger(TcpEventBusBridgeEventTest.class); + private Vertx vertx; + private SSLKeyPairCerts sslKeyPairCerts; + @Before public void before(TestContext context) { vertx = Vertx.vertx(); final Async async = context.async(); - vertx.eventBus().consumer("hello", (Message msg) -> msg.reply(new JsonObject().put("value", "Hello " + msg.body().getString("value")))); - vertx.eventBus().consumer("echo", (Message msg) -> msg.reply(msg.body())); - vertx.setPeriodic(1000, __ -> vertx.eventBus().send("ping", new JsonObject().put("value", "hi"))); - + sslKeyPairCerts = new SSLKeyPairCerts().createTwoWaySSL(); TcpEventBusBridge bridge = TcpEventBusBridge.create( vertx, new BridgeOptions() @@ -60,28 +63,23 @@ public void before(TestContext context) { .addOutboundPermitted(new PermittedOptions().setAddress("echo")) .addOutboundPermitted(new PermittedOptions().setAddress("ping")), new NetServerOptions() - .setSsl(true).setTrustStoreOptions(new JksOptions().setPath("server.truststore").setPassword("wibble")) - .setKeyStoreOptions(new JksOptions().setPath("server.keystore").setPassword("wibble")), + .setClientAuth(ClientAuth.REQUEST) + .setSsl(true) + .setTrustStoreOptions(sslKeyPairCerts.getServerTrustStore()) + .setKeyStoreOptions(sslKeyPairCerts.getServerKeyStore()), be -> { - - Logger l = LoggerFactory.getLogger(this.getClass().getName()); - l.info("Handled a bridge event " + be.getRawMessage()); + logger.info("Handled a bridge event " + be.getRawMessage()); if (be.socket().isSsl()) { try { - for (X509Certificate c : be.socket().peerCertificateChain()) { - l.info(c.getSubjectDN().toString()); + for (Certificate c : be.socket().peerCertificates()) { + logger.info(((X509Certificate)c).getSubjectDN().toString()); } - } catch (SSLPeerUnverifiedException e) { - l.warn("Caught SSLPeerUnverifiedException when processing peerCertificateChain "); - //@fixme should have a test truststore/keystore that validates, the ones i made still throw this + throw new RuntimeException("Failed to get peer certificates chain", e); } } - be.complete(true); - }); - bridge.listen(7000, res -> { context.assertTrue(res.succeeded()); async.complete(); @@ -96,20 +94,37 @@ public void after(TestContext context) { @Test public void testSendVoidMessage(TestContext context) { // Send a request and get a response - NetClient client = vertx.createNetClient(new NetClientOptions().setSsl(true).setTrustAll(true) - .setKeyStoreOptions(new JksOptions().setPath("client.keystore").setPassword("wibble"))); + NetClient client = vertx.createNetClient(new NetClientOptions() + .setSsl(true) + .setTrustStoreOptions(sslKeyPairCerts.getClientTrustStore()) + .setKeyStoreOptions(sslKeyPairCerts.getClientKeyStore())); final Async async = context.async(); - vertx.eventBus().consumer("test", (Message msg) -> { client.close(); async.complete(); }); - client.connect(7000, "localhost", conn -> { context.assertFalse(conn.failed()); - NetSocket socket = conn.result(); + FrameHelper.sendFrame("send", "test", new JsonObject().put("value", "vert.x"), socket); + }); + } + @Test + public void testSendVoidMessageTrustAll(TestContext context) { + NetClient client = vertx.createNetClient(new NetClientOptions() + .setSsl(true) + .setTrustAll(true) + .setKeyStoreOptions(sslKeyPairCerts.getClientKeyStore()) + ); + final Async async = context.async(); + vertx.eventBus().consumer("test", (Message msg) -> { + client.close(); + async.complete(); + }); + client.connect(7000, "localhost", conn -> { + context.assertFalse(conn.failed()); + NetSocket socket = conn.result(); FrameHelper.sendFrame("send", "test", new JsonObject().put("value", "vert.x"), socket); }); } diff --git a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeTest.java b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeTest.java index 82ad559..7ef6404 100644 --- a/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeTest.java +++ b/src/test/java/io/vertx/ext/eventbus/bridge/tcp/TcpEventBusBridgeTest.java @@ -91,6 +91,24 @@ public void testSendVoidMessage(TestContext context) { })); } + @Test + public void testSendVoidStringMessage(TestContext context) { + // Send a request and get a response + NetClient client = vertx.createNetClient(); + final Async async = context.async(); + + vertx.eventBus().consumer("test", (Message msg) -> { + context.assertTrue(msg.body() instanceof String); + context.assertEquals("I'm not a JSON Object", msg.body()); + client.close(); + async.complete(); + }); + + client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> { + FrameHelper.sendFrame("send", "test", "I'm not a JSON Object", socket); + })); + } + @Test public void testNoHandlers(TestContext context) { // Send a request and get a response @@ -359,6 +377,42 @@ public void testReplyFromClient(TestContext context) { } + @Test + public void testReplyStringMessageFromClient(TestContext context) { + // Send a request from java and get a response from the client + NetClient client = vertx.createNetClient(); + final Async async = context.async(); + final String address = "test"; + client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> { + + final FrameParser parser = new FrameParser(parse -> { + context.assertTrue(parse.succeeded()); + JsonObject frame = parse.result(); + if ("message".equals(frame.getString("type"))) { + context.assertEquals(true, frame.getBoolean("send")); + context.assertEquals("Vert.x", frame.getJsonObject("body").getString("value")); + FrameHelper.sendFrame("send", frame.getString("replyAddress"), "You got it", socket); + } + }); + + socket.handler(parser); + + FrameHelper.sendFrame("register", address, null, socket); + + // There is now way to know that the register actually happened, wait a bit before sending. + vertx.setTimer( 500L, timerId -> { + vertx.eventBus().request(address, new JsonObject().put("value", "Vert.x"), respMessage -> { + context.assertTrue(respMessage.succeeded()); + context.assertEquals("You got it", respMessage.result().body()); + client.close(); + async.complete(); + }); + }); + + })); + + } + @Test public void testFailFromClient(TestContext context) { // Send a request from java and get a response from the client diff --git a/src/test/resources/.gitkeep b/src/test/resources/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/src/test/resources/client.keystore b/src/test/resources/client.keystore deleted file mode 100644 index 4eda90e..0000000 Binary files a/src/test/resources/client.keystore and /dev/null differ diff --git a/src/test/resources/client.truststore b/src/test/resources/client.truststore deleted file mode 100644 index a00ea99..0000000 Binary files a/src/test/resources/client.truststore and /dev/null differ diff --git a/src/test/resources/server.keystore b/src/test/resources/server.keystore deleted file mode 100644 index c0d8843..0000000 Binary files a/src/test/resources/server.keystore and /dev/null differ diff --git a/src/test/resources/server.truststore b/src/test/resources/server.truststore deleted file mode 100644 index 8d57af4..0000000 Binary files a/src/test/resources/server.truststore and /dev/null differ