Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
public interface EapClient {
/**
* 异步连接到服务器
* @param host 服务器地址
* @param port 服务器端口
* @return a Mono that completes when the connection is established or errors.
*/
Mono<Void> connect(String host, int port);
Mono<Void> connect();

/**
* 断开连接
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.github.aisde8.eap.connect.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class EapClientManager {

private static final Logger logger = LoggerFactory.getLogger(EapClientManager.class);

private final Map<String, EapClient> clients = new ConcurrentHashMap<>();

public void addClient(String host, int port, EapClient client) {
String key = generateKey(host, port);
clients.put(key, client);
logger.info("Client added: {}", key);
}

public void removeClient(String host, int port) {
String key = generateKey(host, port);
clients.remove(key);
logger.info("Client removed: {}", key);
}

public EapClient getClient(String host, int port) {
String key = generateKey(host, port);
return clients.get(key);
}

public Map<String, EapClient> getAllClients() {
return clients;
}

private String generateKey(String host, int port) {
return host + ":" + port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.github.aisde8.eap.connect.client.hsms;

import lombok.Builder;
import lombok.Getter;

@Getter
@Builder
public class ClientOption {

private String host;

private int port;

private int deviceId;

@Builder.Default
private TimeConfig timeConfig = new TimeConfig();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package com.github.aisde8.eap.connect.client.hsms;

import com.github.aisde8.eap.connect.client.EapClient;
import com.github.aisde8.eap.connect.client.EapClientManager;
import com.github.aside8.eap.protocol.Message;
import com.github.aside8.eap.protocol.hsms.HsmsMessage;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class HsmsClient implements EapClient {
private static final Logger logger = LoggerFactory.getLogger(HsmsClient.class);

private EventLoopGroup group;

private volatile Channel channel;

private final ClientOption clientOption;

private final AtomicInteger systemBytesGenerator = new AtomicInteger(0);

private final Map<Integer, MonoSink<HsmsMessage>> pendingReplies = new ConcurrentHashMap<>();

private final Sinks.Many<Message> messageSink = Sinks.many().multicast().onBackpressureBuffer();

private final EapClientManager eapClientManager;

public HsmsClient(ClientOption clientOption, EapClientManager eapClientManager) {
this.clientOption = clientOption;
this.eapClientManager = eapClientManager;
}

@Override
public Mono<Void> connect() {
group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientOption.getTimeConfig().getT4() * 1000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthField4FrameDecoder());
pipeline.addLast(new HsmsMessageDecoder());

pipeline.addLast(new HsmsMessageEncoder());
pipeline.addLast(new LengthField4FrameEncoder());
pipeline.addLast(new HsmsClientLogicHandler(pendingReplies, messageSink, systemBytesGenerator));
}
});

ChannelFuture future = bootstrap.connect(clientOption.getHost(), clientOption.getPort());
return Mono.create(sink -> future.addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
channel = f.channel();
eapClientManager.addClient(clientOption.getHost(), clientOption.getPort(), this);
sink.success();
} else {
f.channel().close();
sink.error(f.cause());
}
}));
}

@Override
public Mono<Void> disconnect() {
return Mono.create(sink -> {
if (group == null) {
sink.success();
return;
}

eapClientManager.removeClient(clientOption.getHost(), clientOption.getPort());
pendingReplies.forEach((id, replySink) -> replySink.error(new IllegalStateException("Client Disconnected")));
pendingReplies.clear();

group.shutdownGracefully().addListener(future -> {
if (future.isSuccess()) {
sink.success();
} else {
sink.error(future.cause());
}
});
});
}


@Override
public Flux<Message> receive() {
return messageSink.asFlux();
}

@Override
public Mono<Void> send(Message message) {
if (!isConnected()) {
return Mono.error(new IllegalStateException("Not connected"));
}
if (!(message instanceof HsmsMessage hsmsMessage)) {
return Mono.error(new IllegalArgumentException("Request must be an instance of HsmsMessage"));
}

return Mono.create(sink -> {
if (hsmsMessage.isRequest()) {
hsmsMessage.setSystemBytes(systemBytesGenerator.incrementAndGet());
}
channel.writeAndFlush(hsmsMessage).addListener(f -> {
if (f.isSuccess()) {
sink.success();
} else {
sink.error(f.cause());
}
});
});
}

@Override
public Mono<Message> sendRequest(Message request) {
if (!isConnected()) {
return Mono.error(new IllegalStateException("Not connected"));
}
if (!(request instanceof HsmsMessage hsmsRequest)) {
return Mono.error(new IllegalArgumentException("Request must be an instance of HsmsMessage"));
}

return Mono.<HsmsMessage>create(sink -> {
int systemBytes = systemBytesGenerator.incrementAndGet();
hsmsRequest.setSystemBytes(systemBytes);
sink.onDispose(() -> pendingReplies.remove(systemBytes));
pendingReplies.put(systemBytes, sink);
channel.writeAndFlush(hsmsRequest).addListener(future -> {
if (!future.isSuccess()) {
pendingReplies.remove(systemBytes);
sink.error(future.cause());
}
});
}).timeout(Duration.ofMillis(clientOption.getTimeConfig().getT4() * 1000L)).cast(Message.class);
}

@Override
public boolean isConnected() {
return channel != null && channel.isActive();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.github.aisde8.eap.connect.client.hsms;

import com.github.aside8.eap.protocol.Message;
import com.github.aside8.eap.protocol.hsms.HsmsMessage;
import com.github.aside8.eap.protocol.hsms.HsmsMessages;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HsmsClientLogicHandler extends SimpleChannelInboundHandler<HsmsMessage> {

private static final Logger logger = LoggerFactory.getLogger(HsmsClientLogicHandler.class);

private final Map<Integer, MonoSink<HsmsMessage>> pendingReplies;

private final Sinks.Many<Message> messageSink;

private final AtomicInteger systemBytesGenerator;

private ScheduledFuture<?> linkTestFuture;

public HsmsClientLogicHandler(Map<Integer, MonoSink<HsmsMessage>> pendingReplies,
Sinks.Many<Message> messageSink,
AtomicInteger systemBytesGenerator) {

this.pendingReplies = pendingReplies;
this.messageSink = messageSink;
this.systemBytesGenerator = systemBytesGenerator;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, HsmsMessage msg) throws Exception {
int systemBytes = msg.getSystemBytes();
MonoSink<HsmsMessage> sink = pendingReplies.remove(systemBytes);
if (sink != null) {
// 找到了对应的请求,完成 Mono
sink.success(msg);
} else {
// 没找到,这是一个服务器主动推送的非请求消息
messageSink.tryEmitNext(msg);
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("HSMS Channel Active: {} -> {}", ctx.channel().localAddress(), ctx.channel().remoteAddress());

// 发送 SELECT_REQ 消息
ctx.writeAndFlush(HsmsMessages.selectReq(systemBytesGenerator.incrementAndGet()));

// 启动定时器发送 LINK_TEST_REQ 消息
linkTestFuture = ctx.executor().scheduleAtFixedRate(() -> ctx.writeAndFlush(HsmsMessages.linkTestReq(systemBytesGenerator.incrementAndGet())), 3, 3, TimeUnit.SECONDS);
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("HSMS Channel Inactive: {} -> {}", ctx.channel().localAddress(), ctx.channel().remoteAddress());

// 取消定时器
if (linkTestFuture != null) {
linkTestFuture.cancel(false);
}

// 连接断开时,所有等待中的 Mono 都应失败
pendingReplies.forEach((id, sink) -> sink.error(new ChannelException("Channel disconnected unexpectedly.")));
pendingReplies.clear();
super.channelInactive(ctx);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("HSMS Client Handler caught exception: {}", cause.getMessage(), cause);

// 异常发生时,所有等待中的 Mono 都应失败
pendingReplies.forEach((id, sink) -> sink.error(cause));
pendingReplies.clear();
ctx.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.github.aisde8.eap.connect.client.hsms;

import com.github.aside8.eap.protocol.hsms.HsmsMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.util.List;

public class HsmsMessageDecoder extends MessageToMessageDecoder<ByteBuf> {

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> out) throws Exception {
HsmsMessage hsmsMessage = new HsmsMessage();
hsmsMessage.decode(byteBuf);
out.add(hsmsMessage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.github.aisde8.eap.connect.client.hsms;

import com.github.aside8.eap.protocol.hsms.HsmsMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.util.List;

public class HsmsMessageEncoder extends MessageToMessageEncoder<HsmsMessage> {

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, HsmsMessage hsmsMessage, List<Object> out) throws Exception {
out.add(hsmsMessage.encode(channelHandlerContext.alloc()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.github.aisde8.eap.connect.client.hsms;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class LengthField4FrameDecoder extends LengthFieldBasedFrameDecoder {

public LengthField4FrameDecoder() {
super(1024 * 1024, 0, 4, 0, 4);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.github.aisde8.eap.connect.client.hsms;

import io.netty.handler.codec.LengthFieldPrepender;

public class LengthField4FrameEncoder extends LengthFieldPrepender {

public LengthField4FrameEncoder() {
super(4, 0);
}
}
Loading