Skip to content

Commit ee8f9ae

Browse files
committed
optimize blockLock; MessageQueue does not send message if channel is disconnected
1 parent cdad6e1 commit ee8f9ae

File tree

3 files changed

+22
-12
lines changed

3 files changed

+22
-12
lines changed

framework/src/main/java/org/tron/common/overlay/server/ChannelManager.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
@Component
3535
public class ChannelManager {
3636

37-
private final Map<ByteArrayWrapper, Channel> activePeers = new ConcurrentHashMap<>();
37+
private final Map<ByteArrayWrapper, Channel> activeChannels = new ConcurrentHashMap<>();
3838
@Autowired
3939
private PeerServer peerServer;
4040
@Autowired
@@ -121,7 +121,7 @@ public void processDisconnect(Channel channel, ReasonCode reason) {
121121

122122
public void notifyDisconnect(Channel channel) {
123123
syncPool.onDisconnect(channel);
124-
activePeers.values().remove(channel);
124+
activeChannels.values().remove(channel);
125125
if (channel != null) {
126126
if (channel.getNodeStatistics() != null) {
127127
channel.getNodeStatistics().notifyDisconnect();
@@ -146,7 +146,7 @@ public synchronized boolean processPeer(Channel peer) {
146146
return false;
147147
}
148148

149-
if (!peer.isActive() && activePeers.size() >= maxActivePeers) {
149+
if (!peer.isActive() && activeChannels.size() >= maxActivePeers) {
150150
peer.disconnect(TOO_MANY_PEERS);
151151
return false;
152152
}
@@ -157,7 +157,7 @@ public synchronized boolean processPeer(Channel peer) {
157157
}
158158
}
159159

160-
Channel channel = activePeers.get(peer.getNodeIdWrapper());
160+
Channel channel = activeChannels.get(peer.getNodeIdWrapper());
161161
if (channel != null) {
162162
if (channel.getStartTime() > peer.getStartTime()) {
163163
logger.info("Disconnect connection established later, {}", channel.getNode());
@@ -167,14 +167,14 @@ public synchronized boolean processPeer(Channel peer) {
167167
return false;
168168
}
169169
}
170-
activePeers.put(peer.getNodeIdWrapper(), peer);
171-
logger.info("Add active peer {}, total active peers: {}", peer, activePeers.size());
170+
activeChannels.put(peer.getNodeIdWrapper(), peer);
171+
logger.info("Add active peer {}, total active peers: {}", peer, activeChannels.size());
172172
return true;
173173
}
174174

175175
public int getConnectionNum(InetAddress inetAddress) {
176176
int cnt = 0;
177-
for (Channel channel : activePeers.values()) {
177+
for (Channel channel : activeChannels.values()) {
178178
if (channel.getInetAddress().equals(inetAddress)) {
179179
cnt++;
180180
}
@@ -183,7 +183,7 @@ public int getConnectionNum(InetAddress inetAddress) {
183183
}
184184

185185
public Collection<Channel> getActivePeers() {
186-
return activePeers.values();
186+
return activeChannels.values();
187187
}
188188

189189
public Cache<InetAddress, ReasonCode> getRecentlyDisconnected() {

framework/src/main/java/org/tron/common/overlay/server/MessageQueue.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ public void activate(ChannelHandlerContext ctx) {
6969
continue;
7070
}
7171
Message msg = msgQueue.take();
72+
if (channel.isDisconnect()) {
73+
logger.warn("Failed to send to {} as channel has closed, {}",
74+
ctx.channel().remoteAddress(), msg);
75+
return;
76+
}
7277
ctx.writeAndFlush(msg.getSendData()).addListener((ChannelFutureListener) future -> {
7378
if (!future.isSuccess() && !channel.isDisconnect()) {
7479
logger.warn("Failed to send to {}, {}", ctx.channel().remoteAddress(), msg);
@@ -92,6 +97,11 @@ public void setChannel(Channel channel) {
9297
}
9398

9499
public void fastSend(Message msg) {
100+
if (channel.isDisconnect()) {
101+
logger.warn("Fast send to {} failed as channel has closed, {} ",
102+
ctx.channel().remoteAddress(), msg);
103+
return;
104+
}
95105
logger.info("Fast send to {}, {} ", ctx.channel().remoteAddress(), msg);
96106
ctx.writeAndFlush(msg.getSendData()).addListener((ChannelFutureListener) future -> {
97107
if (!future.isSuccess() && !channel.isDisconnect()) {

framework/src/main/java/org/tron/core/net/service/SyncService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,8 @@ private synchronized void handleSyncBlock() {
234234

235235
isProcessed[0] = false;
236236

237-
synchronized (tronNetDelegate.getBlockLock()) {
238-
blockWaitToProcess.forEach((msg, peerConnection) -> {
237+
blockWaitToProcess.forEach((msg, peerConnection) -> {
238+
synchronized (tronNetDelegate.getBlockLock()) {
239239
if (peerConnection.isDisconnect()) {
240240
blockWaitToProcess.remove(msg);
241241
invalid(msg.getBlockId());
@@ -254,8 +254,8 @@ private synchronized void handleSyncBlock() {
254254
isProcessed[0] = true;
255255
processSyncBlock(msg.getBlockCapsule());
256256
}
257-
});
258-
}
257+
}
258+
});
259259
}
260260
}
261261

0 commit comments

Comments
 (0)