Skip to content

Commit f22c8ae

Browse files
authored
feat(net): optimize fetch inventory message processing logic (#5895)
* feat(net): optimize fetch inventory message broadcast processing logic * feat(net): solve checkstyle problem * add the missing code in the code conflict
1 parent af0e59f commit f22c8ae

File tree

2 files changed

+86
-44
lines changed

2 files changed

+86
-44
lines changed

framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java

Lines changed: 61 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
public class FetchInvDataMsgHandler implements TronMsgHandler {
3939

4040
private volatile Cache<Long, Boolean> epochCache = CacheBuilder.newBuilder().initialCapacity(100)
41-
.maximumSize(1000).expireAfterWrite(1, TimeUnit.HOURS).build();
41+
.maximumSize(1000).expireAfterWrite(1, TimeUnit.HOURS).build();
4242

4343
private static final int MAX_SIZE = 1_000_000;
4444
@Autowired
@@ -55,7 +55,9 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
5555

5656
FetchInvDataMessage fetchInvDataMsg = (FetchInvDataMessage) msg;
5757

58-
check(peer, fetchInvDataMsg);
58+
boolean isAdv = isAdvInv(peer, fetchInvDataMsg);
59+
60+
check(peer, fetchInvDataMsg, isAdv);
5961

6062
InventoryType type = fetchInvDataMsg.getInventoryType();
6163
List<Transaction> transactions = Lists.newArrayList();
@@ -64,6 +66,15 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
6466

6567
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
6668
Item item = new Item(hash, type);
69+
/* Cache the Inventory sent to the peer.
70+
Once a FetchInvData message is received from the peer, remove this Inventory from the cache.
71+
If the same FetchInvData request is received from the peer again and it is
72+
no longer in the cache, then reject the request.
73+
* */
74+
if (isAdv) {
75+
peer.getAdvInvSpread().invalidate(item);
76+
}
77+
6778
Message message = advService.getMessage(item);
6879
if (message == null) {
6980
try {
@@ -84,7 +95,7 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
8495
} else {
8596
transactions.add(((TransactionMessage) message).getTransactionCapsule().getInstance());
8697
size += ((TransactionMessage) message).getTransactionCapsule().getInstance()
87-
.getSerializedSize();
98+
.getSerializedSize();
8899
if (size > MAX_SIZE) {
89100
peer.sendMessage(new TransactionsMessage(transactions));
90101
transactions = Lists.newArrayList();
@@ -104,16 +115,16 @@ private void sendPbftCommitMessage(PeerConnection peer, BlockCapsule blockCapsul
104115
}
105116
long epoch = 0;
106117
PbftSignCapsule pbftSignCapsule = tronNetDelegate
107-
.getBlockPbftCommitData(blockCapsule.getNum());
118+
.getBlockPbftCommitData(blockCapsule.getNum());
108119
long maintenanceTimeInterval = consensusDelegate.getDynamicPropertiesStore()
109-
.getMaintenanceTimeInterval();
120+
.getMaintenanceTimeInterval();
110121
if (pbftSignCapsule != null) {
111122
Raw raw = Raw.parseFrom(pbftSignCapsule.getPbftCommitResult().getData());
112123
epoch = raw.getEpoch();
113124
peer.sendMessage(new PbftCommitMessage(pbftSignCapsule));
114125
} else {
115-
epoch =
116-
(blockCapsule.getTimeStamp() / maintenanceTimeInterval + 1) * maintenanceTimeInterval;
126+
epoch = (blockCapsule.getTimeStamp() / maintenanceTimeInterval + 1)
127+
* maintenanceTimeInterval;
117128
}
118129
if (epochCache.getIfPresent(epoch) == null) {
119130
PbftSignCapsule srl = tronNetDelegate.getSRLPbftCommitData(epoch);
@@ -127,7 +138,21 @@ private void sendPbftCommitMessage(PeerConnection peer, BlockCapsule blockCapsul
127138
}
128139
}
129140

130-
private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) throws P2pException {
141+
public boolean isAdvInv(PeerConnection peer, FetchInvDataMessage msg) {
142+
MessageTypes type = msg.getInvMessageType();
143+
if (type == MessageTypes.TRX) {
144+
return true;
145+
}
146+
for (Sha256Hash hash : msg.getHashList()) {
147+
if (peer.getAdvInvSpread().getIfPresent(new Item(hash, InventoryType.BLOCK)) == null) {
148+
return false;
149+
}
150+
}
151+
return true;
152+
}
153+
154+
private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg,
155+
boolean isAdv) throws P2pException {
131156
MessageTypes type = fetchInvDataMsg.getInvMessageType();
132157

133158
if (type == MessageTypes.TRX) {
@@ -144,46 +169,38 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr
144169
+ "maxCount: {}, fetchCount: {}, peer: {}",
145170
maxCount, fetchCount, peer.getInetAddress());
146171
}
147-
} else {
148-
boolean isAdv = true;
149-
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
150-
if (peer.getAdvInvSpread().getIfPresent(new Item(hash, InventoryType.BLOCK)) == null) {
151-
isAdv = false;
152-
break;
153-
}
172+
}
173+
174+
if (!isAdv) {
175+
if (!peer.isNeedSyncFromUs()) {
176+
throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync");
154177
}
155-
if (!isAdv) {
156-
if (!peer.isNeedSyncFromUs()) {
157-
throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync");
158-
}
159-
if (!peer.getP2pRateLimiter().tryAcquire(fetchInvDataMsg.getType().asByte())) {
160-
throw new P2pException(TypeEnum.RATE_LIMIT_EXCEEDED, fetchInvDataMsg.getType()
161-
+ " message exceeds the rate limit");
178+
if (!peer.getP2pRateLimiter().tryAcquire(fetchInvDataMsg.getType().asByte())) {
179+
throw new P2pException(TypeEnum.RATE_LIMIT_EXCEEDED, fetchInvDataMsg.getType()
180+
+ " message exceeds the rate limit");
181+
}
182+
if (fetchInvDataMsg.getHashList().size() > NetConstants.MAX_BLOCK_FETCH_PER_PEER) {
183+
throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too many blocks, size:"
184+
+ fetchInvDataMsg.getHashList().size());
185+
}
186+
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
187+
long blockNum = new BlockId(hash).getNum();
188+
long minBlockNum =
189+
peer.getLastSyncBlockId().getNum() - 2 * NetConstants.SYNC_FETCH_BATCH_NUM;
190+
if (blockNum < minBlockNum) {
191+
throw new P2pException(TypeEnum.BAD_MESSAGE,
192+
"minBlockNum: " + minBlockNum + ", blockNum: " + blockNum);
162193
}
163-
if (fetchInvDataMsg.getHashList().size() > NetConstants.MAX_BLOCK_FETCH_PER_PEER) {
164-
throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too many blocks, size:"
165-
+ fetchInvDataMsg.getHashList().size());
194+
if (blockNum > peer.getLastSyncBlockId().getNum()) {
195+
throw new P2pException(TypeEnum.BAD_MESSAGE,
196+
"maxBlockNum: " + peer.getLastSyncBlockId().getNum() + ", blockNum: " + blockNum);
166197
}
167-
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
168-
long blockNum = new BlockId(hash).getNum();
169-
long minBlockNum =
170-
peer.getLastSyncBlockId().getNum() - 2 * NetConstants.SYNC_FETCH_BATCH_NUM;
171-
if (blockNum < minBlockNum) {
172-
throw new P2pException(TypeEnum.BAD_MESSAGE,
173-
"minBlockNum: " + minBlockNum + ", blockNum: " + blockNum);
174-
}
175-
if (blockNum > peer.getLastSyncBlockId().getNum()) {
176-
throw new P2pException(TypeEnum.BAD_MESSAGE,
177-
"maxBlockNum: " + peer.getLastSyncBlockId().getNum() + ", blockNum: " + blockNum);
178-
}
179-
if (peer.getSyncBlockIdCache().getIfPresent(hash) != null) {
180-
throw new P2pException(TypeEnum.BAD_MESSAGE,
181-
new BlockId(hash).getString() + " is exist");
182-
}
183-
peer.getSyncBlockIdCache().put(hash, System.currentTimeMillis());
198+
if (peer.getSyncBlockIdCache().getIfPresent(hash) != null) {
199+
throw new P2pException(TypeEnum.BAD_MESSAGE,
200+
new BlockId(hash).getString() + " is exist");
184201
}
202+
peer.getSyncBlockIdCache().put(hash, System.currentTimeMillis());
185203
}
186204
}
187205
}
188-
189-
}
206+
}

framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,31 @@ public void testProcessMessage() throws Exception {
6767
Assert.assertNotNull(syncBlockIdCache.getIfPresent(blockId));
6868
}
6969

70+
@Test
71+
public void testIsAdvInv() {
72+
FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler();
73+
74+
List<Sha256Hash> list = new LinkedList<>();
75+
list.add(Sha256Hash.ZERO_HASH);
76+
FetchInvDataMessage msg =
77+
new FetchInvDataMessage(list, Protocol.Inventory.InventoryType.TRX);
78+
79+
boolean isAdv = fetchInvDataMsgHandler.isAdvInv(null, msg);
80+
Assert.assertTrue(isAdv);
81+
82+
PeerConnection peer = Mockito.mock(PeerConnection.class);
83+
Cache<Item, Long> advInvSpread = CacheBuilder.newBuilder().build();
84+
Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread);
85+
86+
msg = new FetchInvDataMessage(list, Protocol.Inventory.InventoryType.BLOCK);
87+
isAdv = fetchInvDataMsgHandler.isAdvInv(peer, msg);
88+
Assert.assertTrue(!isAdv);
89+
90+
advInvSpread.put(new Item(Sha256Hash.ZERO_HASH, Protocol.Inventory.InventoryType.BLOCK), 1L);
91+
isAdv = fetchInvDataMsgHandler.isAdvInv(peer, msg);
92+
Assert.assertTrue(isAdv);
93+
}
94+
7095
@Test
7196
public void testSyncFetchCheck() {
7297
BlockCapsule.BlockId blockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000L);

0 commit comments

Comments
 (0)