Skip to content

Commit 152d74b

Browse files
authored
feat(logcache): limit max block count (#2982) (#2983)
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
1 parent 55c409e commit 152d74b

File tree

1 file changed

+22
-34
lines changed

1 file changed

+22
-34
lines changed

s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ public class LogCache {
5757
private static final int DEFAULT_MAX_BLOCK_STREAM_COUNT = 10000;
5858
private static final Consumer<LogCacheBlock> DEFAULT_BLOCK_FREE_LISTENER = block -> {
5959
};
60+
private static final int MAX_BLOCKS_COUNT = 64;
6061
final List<LogCacheBlock> blocks = new ArrayList<>();
62+
final AtomicInteger blockCount = new AtomicInteger(1);
6163
private final long capacity;
6264
private final long cacheBlockMaxSize;
6365
private final int maxCacheBlockStreamCount;
@@ -214,6 +216,7 @@ public LogCacheBlock archiveCurrentBlock() {
214216
block.lastRecordOffset = lastRecordOffset;
215217
activeBlock = new LogCacheBlock(cacheBlockMaxSize, maxCacheBlockStreamCount);
216218
blocks.add(activeBlock);
219+
blockCount.set(blocks.size());
217220
return block;
218221
} finally {
219222
writeLock.unlock();
@@ -249,11 +252,12 @@ Optional<LogCacheBlock> archiveCurrentBlockIfContains0(long streamId) {
249252
public void markFree(LogCacheBlock block) {
250253
block.free = true;
251254
tryRealFree();
255+
tryMerge();
252256
}
253257

254258
private void tryRealFree() {
255259
long currSize = size.get();
256-
if (currSize <= capacity * 0.9) {
260+
if (currSize <= capacity * 0.9 && blockCount.get() <= MAX_BLOCKS_COUNT) {
257261
return;
258262
}
259263
List<LogCacheBlock> removed = new ArrayList<>();
@@ -264,21 +268,35 @@ private void tryRealFree() {
264268
currSize = size.get();
265269
Iterator<LogCacheBlock> iter = blocks.iterator();
266270
while (iter.hasNext()) {
267-
if (currSize - freeSize <= capacity * 0.9) {
271+
LogCacheBlock block = iter.next();
272+
if (blockCount.get() <= MAX_BLOCKS_COUNT && currSize - freeSize <= capacity * 0.9) {
268273
break;
269274
}
270-
LogCacheBlock block = iter.next();
271275
if (block.free) {
272276
iter.remove();
273277
freeSize += block.size();
274278
removed.add(block);
279+
blockCount.decrementAndGet();
275280
} else {
276281
break;
277282
}
278283
}
284+
} finally {
285+
writeLock.unlock();
286+
}
287+
size.addAndGet(-freeSize);
288+
removed.forEach(b -> {
289+
blockFreeListener.accept(b);
290+
b.free();
291+
});
292+
}
293+
294+
private void tryMerge() {
295+
writeLock.lock();
296+
try {
279297
// merge blocks to speed up the get.
280298
LogCacheBlock mergedBlock = null;
281-
iter = blocks.iterator();
299+
Iterator<LogCacheBlock> iter = blocks.iterator();
282300
while (iter.hasNext()) {
283301
LogCacheBlock block = iter.next();
284302
if (!block.free) {
@@ -296,36 +314,6 @@ private void tryRealFree() {
296314
} finally {
297315
writeLock.unlock();
298316
}
299-
size.addAndGet(-freeSize);
300-
removed.forEach(b -> {
301-
blockFreeListener.accept(b);
302-
b.free();
303-
});
304-
}
305-
306-
public int forceFree(int required) {
307-
AtomicInteger freedBytes = new AtomicInteger();
308-
List<LogCacheBlock> removed = new ArrayList<>();
309-
writeLock.lock();
310-
try {
311-
blocks.removeIf(block -> {
312-
if (!block.free || freedBytes.get() >= required) {
313-
return false;
314-
}
315-
long blockSize = block.size();
316-
size.addAndGet(-blockSize);
317-
freedBytes.addAndGet((int) blockSize);
318-
removed.add(block);
319-
return true;
320-
});
321-
} finally {
322-
writeLock.unlock();
323-
}
324-
removed.forEach(b -> {
325-
blockFreeListener.accept(b);
326-
b.free();
327-
});
328-
return freedBytes.get();
329317
}
330318

331319
public void setLastRecordOffset(RecordOffset lastRecordOffset) {

0 commit comments

Comments
 (0)