Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,14 @@ private long getCachedFileLen() throws IOException {
}
}

private BCFile.Reader getBCFile(byte[] serializedMetadata) throws IOException {
private BCFile.Reader getBCFile(Supplier<byte[]> cachedMetadataSupplier) throws IOException {

BCFile.Reader reader = bcfr.get();
if (reader == null) {
RateLimitedInputStream fsIn =
new RateLimitedInputStream((InputStream & Seekable) inputSupplier.get(), readLimiter);
BCFile.Reader tmpReader = null;
byte[] serializedMetadata = cachedMetadataSupplier.get();
if (serializedMetadata == null) {
if (fileLenCache == null) {
tmpReader = new BCFile.Reader(fsIn, lengthSupplier.get(), conf, cryptoService);
Expand Down Expand Up @@ -221,15 +222,19 @@ private BCFile.Reader getBCFile(byte[] serializedMetadata) throws IOException {
}

private BCFile.Reader getBCFile() throws IOException {
BlockCache _iCache = cacheProvider.getIndexCache();
if (_iCache != null) {
CacheEntry mce = _iCache.getBlock(cacheId + ROOT_BLOCK_NAME, new BCFileLoader());
if (mce != null) {
return getBCFile(mce.getBuffer());

Supplier<byte[]> cachedMetadataSupplier = () -> {
BlockCache _iCache = cacheProvider.getIndexCache();
if (_iCache != null) {
CacheEntry mce = _iCache.getBlock(cacheId + ROOT_BLOCK_NAME, new BCFileLoader());
if (mce != null) {
return mce.getBuffer();
}
}
}
return null;
};

return getBCFile(null);
return getBCFile(cachedMetadataSupplier);
}

private class BCFileLoader implements Loader {
Expand All @@ -242,7 +247,7 @@ public Map<String,Loader> getDependencies() {
@Override
public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
try {
return getBCFile(null).serializeMetadata(maxSize);
return getBCFile(() -> null).serializeMetadata(maxSize);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down Expand Up @@ -348,7 +353,7 @@ public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
if (reader == null) {
if (loadingMetaBlock) {
byte[] serializedMetadata = dependencies.get(cacheId + ROOT_BLOCK_NAME);
reader = getBCFile(serializedMetadata);
reader = getBCFile(() -> serializedMetadata);
} else {
reader = getBCFile();
}
Expand Down Expand Up @@ -407,7 +412,7 @@ public CachedBlockRead getMetaBlock(String blockName) throws IOException {
}
}

BlockReader _currBlock = getBCFile(null).getMetaBlock(blockName);
BlockReader _currBlock = getBCFile(() -> null).getMetaBlock(blockName);
incrementCacheBypass(CacheType.INDEX);
return new CachedBlockRead(_currBlock);
}
Expand All @@ -424,7 +429,7 @@ public CachedBlockRead getMetaBlock(long offset, long compressedSize, long rawSi
}
}

BlockReader _currBlock = getBCFile(null).getDataBlock(offset, compressedSize, rawSize);
BlockReader _currBlock = getBCFile(() -> null).getDataBlock(offset, compressedSize, rawSize);
incrementCacheBypass(CacheType.INDEX);
return new CachedBlockRead(_currBlock);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,17 @@ public void stopCollector() throws Exception {

@Test
public void test() throws Exception {
var names = getUniqueNames(7);
var names = getUniqueNames(8);
runTest(names[0], 0, false, false, -1, -1, -1);
runTest(names[1], 10, false, false, -1, -1, -1);
runTest(names[2], 0, true, false, -1, -1, -1);
runTest(names[3], 0, false, false, -1, -1, 2);
runTest(names[4], 0, false, false, 32, 256, -1);
runTest(names[5], 0, true, true, 32, 256, -1);
runTest(names[6], 0, true, false, -1, -1, 2);
// when the tables tablets are spread across two tablet servers, then all the tables data will
// fit in cache
runTest(names[2], 10, true, true, -1, -1, -1);
runTest(names[3], 0, true, false, -1, -1, -1);
runTest(names[4], 0, false, false, -1, -1, 2);
runTest(names[5], 0, false, false, 32, 256, -1);
runTest(names[6], 0, true, true, 32, 256, -1);
runTest(names[7], 0, true, false, -1, -1, 2);
}

private void runTest(String tableName, int numSplits, boolean cacheData,
Expand Down Expand Up @@ -239,15 +242,14 @@ private void runTest(String tableName, int numSplits, boolean cacheData,
if (stats.getFileBytesRead() == 0) {
assertEquals(0L, stats.getDataCacheMisses(), stats::toString);
}
// When caching data, does not seem to hit the cache much
var cacheSum = stats.getIndexCacheHits() + stats.getIndexCacheMisses();
assertTrue(cacheSum == 0 || cacheSum == 1, stats::toString);
} else {
assertEquals(0, stats.getDataCacheHits(), stats::toString);
assertEquals(0, stats.getDataCacheMisses(), stats::toString);
assertTrue(stats.getDataCacheBypasses() > stats.getSeeks(), stats::toString);
assertClose(stats.getDataCacheBypasses(), stats.getIndexCacheHits(), .05);
}
// May see rfile metadata reads for each tablet
var cacheSum = stats.getIndexCacheHits() + stats.getIndexCacheMisses();
assertTrue(cacheSum <= (numSplits + 1) * 2L, stats::toString);
assertEquals(0, stats.getIndexCacheBypasses(), stats::toString);
}

Expand Down