From 5d0487213e6fd3b4080dadae02ca4f060b2b3133 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 9 Dec 2025 00:09:32 +0000 Subject: [PATCH 01/23] adds per scan tracing statistics Added per scan tracing statistics for the following: * The count of compressed bytes read from DFS (when all data is read from cache this will be zero). * The count of uncompressed bytes read from DFS or cache * The count of bytes returned (after iterators filter) * The count of key/values read before any filtering * The count of key/values returned after filtering * The count of seeks done * Statistics for cache hits, misses, and bypasses. A bypass is when an rfile block is read without using the cache. The statistics are included in a tracing span that wraps reading each batch of key values in a tablet or scan server. So if a scan reads 10 batches of key values, then 10 spans will be emitted for tracing data. Each span will include the statistics for that batch. --- .../file/blockfile/cache/BlockCacheUtil.java | 38 +++++ .../cache/InstrumentedBlockCache.java | 107 ++++++++++++++ .../blockfile/impl/CachableBlockFile.java | 54 +++++++- .../blockfile/impl/ScanCacheProvider.java | 6 +- .../accumulo/core/file/rfile/RFile.java | 10 +- .../core/file/rfile/bcfile/BCFile.java | 29 +++- .../iteratorsImpl/system/StatsIterator.java | 40 +++--- .../accumulo/core/summary/SummaryReader.java | 6 +- .../core/trace/ScanInstrumentation.java | 131 ++++++++++++++++++ .../core/util/CountingInputStream.java | 107 ++++++++++++++ .../accumulo/tserver/TabletHostingServer.java | 3 + .../apache/accumulo/tserver/TabletServer.java | 5 +- .../tserver/tablet/ScanDataSource.java | 27 +++- .../accumulo/tserver/tablet/Scanner.java | 2 +- .../accumulo/tserver/tablet/Tablet.java | 5 +- .../accumulo/tserver/tablet/TabletBase.java | 99 ++++++++++++- 16 files changed, 629 insertions(+), 40 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java create mode 100644 
core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java create mode 100644 core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java create mode 100644 core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java new file mode 100644 index 00000000000..53c2ce9e7a3 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.file.blockfile.cache; + +import org.apache.accumulo.core.logging.LoggingBlockCache; +import org.apache.accumulo.core.spi.cache.BlockCache; +import org.apache.accumulo.core.spi.cache.CacheType; + +public class BlockCacheUtil { + public static BlockCache instrument(CacheType type, BlockCache cache) { + if (cache == null) { + return null; + } + + if (cache instanceof InstrumentedBlockCache || cache instanceof LoggingBlockCache) { + // its already instrumented + return cache; + } + + return LoggingBlockCache.wrap(type, InstrumentedBlockCache.wrap(type, cache)); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java new file mode 100644 index 00000000000..a46e0b2353d --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.file.blockfile.cache; + +import java.util.Map; + +import org.apache.accumulo.core.spi.cache.BlockCache; +import org.apache.accumulo.core.spi.cache.CacheEntry; +import org.apache.accumulo.core.spi.cache.CacheType; +import org.apache.accumulo.core.trace.ScanInstrumentation; + +public class InstrumentedBlockCache implements BlockCache { + + private final BlockCache blockCache; + private final ScanInstrumentation scanInstrumentation; + private final CacheType cacheType; + + public InstrumentedBlockCache(CacheType cacheType, BlockCache blockCache, + ScanInstrumentation scanInstrumentation) { + this.blockCache = blockCache; + this.scanInstrumentation = scanInstrumentation; + this.cacheType = cacheType; + } + + @Override + public CacheEntry cacheBlock(String blockName, byte[] buf) { + return blockCache.cacheBlock(blockName, buf); + } + + @Override + public CacheEntry getBlock(String blockName) { + return blockCache.getBlock(blockName); + } + + private final class CountingLoader implements Loader { + + private final Loader loader; + int loadCount = 0; + + private CountingLoader(Loader loader) { + this.loader = loader; + } + + @Override + public Map getDependencies() { + return loader.getDependencies(); + } + + @Override + public byte[] load(int maxSize, Map dependencies) { + loadCount++; + return loader.load(maxSize, dependencies); + } + } + + @Override + public CacheEntry getBlock(String blockName, Loader loader) { + var cl = new CountingLoader(loader); + var ce = blockCache.getBlock(blockName, cl); + if (cl.loadCount == 0 && ce != null) { + scanInstrumentation.incrementCacheHit(cacheType); + } else { + scanInstrumentation.incrementCacheMiss(cacheType); + } + return ce; + } + + @Override + public long getMaxHeapSize() { + return blockCache.getMaxHeapSize(); + } + + @Override + public long getMaxSize() { + return blockCache.getMaxSize(); + } + + @Override + public Stats getStats() { + return blockCache.getStats(); + } + + public 
static BlockCache wrap(CacheType cacheType, BlockCache cache) { + var si = ScanInstrumentation.get(); + if (cache != null && si != null) { + return new InstrumentedBlockCache(cacheType, cache, si); + } else { + return cache; + } + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index 854c74154b5..b34cdf83929 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -38,7 +38,10 @@ import org.apache.accumulo.core.spi.cache.BlockCache; import org.apache.accumulo.core.spi.cache.BlockCache.Loader; import org.apache.accumulo.core.spi.cache.CacheEntry; +import org.apache.accumulo.core.spi.cache.CacheType; import org.apache.accumulo.core.spi.crypto.CryptoService; +import org.apache.accumulo.core.trace.ScanInstrumentation; +import org.apache.accumulo.core.util.CountingInputStream; import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -50,6 +53,8 @@ import com.google.common.cache.Cache; +import io.opentelemetry.api.GlobalOpenTelemetry; + /** * This is a wrapper class for BCFile that includes a cache for independent caches for datablocks * and metadatablocks @@ -405,11 +410,13 @@ public CachedBlockRead getMetaBlock(String blockName) throws IOException { } BlockReader _currBlock = getBCFile(null).getMetaBlock(blockName); + incrementCacheBypass(CacheType.INDEX); return new CachedBlockRead(_currBlock); } public CachedBlockRead getMetaBlock(long offset, long compressedSize, long rawSize) throws IOException { + GlobalOpenTelemetry.get(); BlockCache _iCache = cacheProvider.getIndexCache(); if (_iCache != null) { String _lookup = this.cacheId + "R" + offset; @@ -421,6 +428,7 @@ public CachedBlockRead 
getMetaBlock(long offset, long compressedSize, long rawSi } BlockReader _currBlock = getBCFile(null).getDataBlock(offset, compressedSize, rawSize); + incrementCacheBypass(CacheType.INDEX); return new CachedBlockRead(_currBlock); } @@ -443,6 +451,7 @@ public CachedBlockRead getDataBlock(int blockIndex) throws IOException { } BlockReader _currBlock = getBCFile().getDataBlock(blockIndex); + incrementCacheBypass(CacheType.DATA); return new CachedBlockRead(_currBlock); } @@ -459,9 +468,17 @@ public CachedBlockRead getDataBlock(long offset, long compressedSize, long rawSi } BlockReader _currBlock = getBCFile().getDataBlock(offset, compressedSize, rawSize); + incrementCacheBypass(CacheType.DATA); return new CachedBlockRead(_currBlock); } + private void incrementCacheBypass(CacheType cacheType) { + var si = ScanInstrumentation.get(); + if (si != null) { + si.incrementCacheBypass(cacheType); + } + } + @Override public synchronized void close() throws IOException { if (closed) { @@ -491,12 +508,22 @@ public void setCacheProvider(CacheProvider cacheProvider) { } public static class CachedBlockRead extends DataInputStream { + + private static InputStream wrapForTrace(InputStream inputStream) { + var scanInstrumentation = ScanInstrumentation.get(); + if (scanInstrumentation != null) { + return new CountingInputStream(inputStream); + } else { + return inputStream; + } + } + private final SeekableByteArrayInputStream seekableInput; private final CacheEntry cb; boolean indexable; public CachedBlockRead(InputStream in) { - super(in); + super(wrapForTrace(in)); cb = null; seekableInput = null; indexable = false; @@ -507,7 +534,7 @@ public CachedBlockRead(CacheEntry cb, byte[] buf) { } private CachedBlockRead(SeekableByteArrayInputStream seekableInput, CacheEntry cb) { - super(seekableInput); + super(wrapForTrace(seekableInput)); this.seekableInput = seekableInput; this.cb = cb; indexable = true; @@ -536,5 +563,28 @@ public BlockIndex getIndex(Supplier indexSupplier) { public void 
indexWeightChanged() { cb.indexWeightChanged(); } + + public void flushStats() { + if (in instanceof CountingInputStream) { + var cin = ((CountingInputStream) in); + var si = ScanInstrumentation.get(); + if (si != null) { + si.incrementUncompressedBytesRead(cin.getCount()); + } + cin.resetCount(); + var src = cin.getWrappedStream(); + if (src instanceof BlockReader) { + var br = (BlockReader) src; + br.flushStats(); + } + + } + } + + @Override + public void close() throws IOException { + flushStats(); + super.close(); + } } } diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java index 78b8600b378..155b8743264 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java @@ -20,7 +20,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.logging.LoggingBlockCache; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheUtil; import org.apache.accumulo.core.spi.cache.BlockCache; import org.apache.accumulo.core.spi.cache.CacheType; import org.apache.accumulo.core.spi.scan.ScanDispatch; @@ -33,8 +33,8 @@ public class ScanCacheProvider implements CacheProvider { public ScanCacheProvider(AccumuloConfiguration tableConfig, ScanDispatch dispatch, BlockCache indexCache, BlockCache dataCache) { - var loggingIndexCache = LoggingBlockCache.wrap(CacheType.INDEX, indexCache); - var loggingDataCache = LoggingBlockCache.wrap(CacheType.DATA, dataCache); + var loggingIndexCache = BlockCacheUtil.instrument(CacheType.INDEX, indexCache); + var loggingDataCache = BlockCacheUtil.instrument(CacheType.DATA, dataCache); switch (dispatch.getIndexCacheUsage()) { case ENABLED: diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index 68e2be016d1..9e2d4633c26 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@ -838,7 +838,6 @@ public void next() throws IOException { } private void _next() throws IOException { - if (!hasTop) { throw new IllegalStateException(); } @@ -1164,6 +1163,12 @@ public void setCacheProvider(CacheProvider cacheProvider) { public long estimateOverlappingEntries(KeyExtent extent) throws IOException { throw new UnsupportedOperationException(); } + + public void flushStats() { + if (currBlock != null) { + currBlock.flushStats(); + } + } } public static class Reader extends HeapIterator implements FileSKVIterator { @@ -1304,6 +1309,9 @@ private void closeLocalityGroupReaders(boolean ignoreIOExceptions) throws IOExce @Override public void closeDeepCopies() throws IOException { + for (LocalityGroupReader lgr : currentReaders) { + lgr.flushStats(); + } closeDeepCopies(false); } diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java index aca53e495c3..a46711b7202 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java @@ -47,6 +47,8 @@ import org.apache.accumulo.core.spi.crypto.FileEncrypter; import org.apache.accumulo.core.spi.crypto.NoFileDecrypter; import org.apache.accumulo.core.spi.crypto.NoFileEncrypter; +import org.apache.accumulo.core.trace.ScanInstrumentation; +import org.apache.accumulo.core.util.CountingInputStream; import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -468,6 +470,7 @@ private static final class RBlockState { 
private final CompressionAlgorithm compressAlgo; private Decompressor decompressor; private final BlockRegion region; + private final InputStream rawInputStream; private final InputStream in; private volatile boolean closed; @@ -481,9 +484,15 @@ public RBlockState( BoundedRangeFileInputStream boundedRangeFileInputStream = new BoundedRangeFileInputStream( fsin, this.region.getOffset(), this.region.getCompressedSize()); + var si = ScanInstrumentation.get(); + if (si != null) { + rawInputStream = new CountingInputStream(boundedRangeFileInputStream); + } else { + rawInputStream = boundedRangeFileInputStream; + } + try { - InputStream inputStreamToBeCompressed = - decrypter.decryptStream(boundedRangeFileInputStream); + InputStream inputStreamToBeCompressed = decrypter.decryptStream(rawInputStream); this.in = compressAlgo.createDecompressionStream(inputStreamToBeCompressed, decompressor, getFSInputBufferSize(conf)); } catch (IOException e) { @@ -506,11 +515,23 @@ public BlockRegion getBlockRegion() { return region; } + public void flushStats() { + if (rawInputStream instanceof CountingInputStream) { + var ci = (CountingInputStream) rawInputStream; + var si = ScanInstrumentation.get(); + if (si != null) { + si.incrementFileBytesRead(ci.getCount()); + } + ci.resetCount(); + } + } + public void finish() throws IOException { synchronized (in) { if (!closed) { try { in.close(); + flushStats(); } finally { closed = true; if (decompressor != null) { @@ -538,6 +559,10 @@ public static class BlockReader extends DataInputStream { rBlkState = rbs; } + public void flushStats() { + rBlkState.flushStats(); + } + /** * Finishing reading the block. Release all resources. 
*/ diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java index 47a08819731..990752023c4 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java @@ -34,15 +34,20 @@ public class StatsIterator extends ServerWrappingIterator { private int numRead = 0; - private final AtomicLong seekCounter; + private final AtomicLong scanSeekCounter; + private final LongAdder serverSeekCounter; private final AtomicLong scanCounter; + private final LongAdder tabletScanCounter; private final LongAdder serverScanCounter; - public StatsIterator(SortedKeyValueIterator source, AtomicLong seekCounter, - AtomicLong tabletScanCounter, LongAdder serverScanCounter) { + public StatsIterator(SortedKeyValueIterator source, AtomicLong scanSeekCounter, + LongAdder serverSeekCounter, AtomicLong scanCounter, LongAdder tabletScanCounter, + LongAdder serverScanCounter) { super(source); - this.seekCounter = seekCounter; - this.scanCounter = tabletScanCounter; + this.scanSeekCounter = scanSeekCounter; + this.serverSeekCounter = serverSeekCounter; + this.scanCounter = scanCounter; + this.tabletScanCounter = tabletScanCounter; this.serverScanCounter = serverScanCounter; } @@ -51,31 +56,32 @@ public void next() throws IOException { source.next(); numRead++; - if (numRead % 23 == 0) { - scanCounter.addAndGet(numRead); - serverScanCounter.add(numRead); - numRead = 0; + if (numRead % 1009 == 0) { + report(); } } @Override public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { - return new StatsIterator(source.deepCopy(env), seekCounter, scanCounter, serverScanCounter); + return new StatsIterator(source.deepCopy(env), scanSeekCounter, serverSeekCounter, scanCounter, + tabletScanCounter, serverScanCounter); } @Override public void seek(Range range, 
Collection columnFamilies, boolean inclusive) throws IOException { source.seek(range, columnFamilies, inclusive); - seekCounter.incrementAndGet(); - scanCounter.addAndGet(numRead); - serverScanCounter.add(numRead); - numRead = 0; + serverSeekCounter.increment(); + scanSeekCounter.incrementAndGet(); + report(); } public void report() { - scanCounter.addAndGet(numRead); - serverScanCounter.add(numRead); - numRead = 0; + if (numRead > 0) { + scanCounter.addAndGet(numRead); + tabletScanCounter.add(numRead); + serverScanCounter.add(numRead); + numRead = 0; + } } } diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java index cce1044e65d..83c06339747 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java @@ -32,12 +32,12 @@ import org.apache.accumulo.core.client.rfile.RFileSource; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.file.NoSuchMetaStoreException; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheUtil; import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder; import org.apache.accumulo.core.file.rfile.RFile.Reader; import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist; -import org.apache.accumulo.core.logging.LoggingBlockCache; import org.apache.accumulo.core.spi.cache.BlockCache; import org.apache.accumulo.core.spi.cache.CacheEntry; import org.apache.accumulo.core.spi.cache.CacheType; @@ -194,8 +194,8 @@ public static SummaryReader load(FileSystem fs, Configuration conf, SummarizerFa // the reason BCFile is used instead of RFile is to avoid reading in the RFile meta block when // only summary data is 
wanted. CompositeCache compositeCache = - new CompositeCache(LoggingBlockCache.wrap(CacheType.SUMMARY, summaryCache), - LoggingBlockCache.wrap(CacheType.INDEX, indexCache)); + new CompositeCache(BlockCacheUtil.instrument(CacheType.SUMMARY, summaryCache), + BlockCacheUtil.instrument(CacheType.INDEX, indexCache)); CachableBuilder cb = new CachableBuilder().fsPath(fs, file).conf(conf).fileLen(fileLenCache) .cacheProvider(new BasicCacheProvider(compositeCache, null)).cryptoService(cryptoService); bcReader = new CachableBlockFile.Reader(cb); diff --git a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java new file mode 100644 index 00000000000..d68963a5978 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.trace; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.spi.cache.CacheType; + +import io.opentelemetry.api.trace.Span; + +/** + * This class helps collect per scan information for the purposes of tracing. + */ +public class ScanInstrumentation { + private final AtomicLong fileBytesRead = new AtomicLong(); + private final AtomicLong uncompressedBytesRead = new AtomicLong(); + private final AtomicInteger[] cacheHits = new AtomicInteger[CacheType.values().length]; + private final AtomicInteger[] cacheMisses = new AtomicInteger[CacheType.values().length]; + private final AtomicInteger[] cacheBypasses = new AtomicInteger[CacheType.values().length]; + + private static final Map INSTRUMENTED_SCANS = + new ConcurrentHashMap<>(); + + private ScanInstrumentation() { + for (int i = 0; i < CacheType.values().length; i++) { + cacheHits[i] = new AtomicInteger(); + cacheMisses[i] = new AtomicInteger(); + cacheBypasses[i] = new AtomicInteger(); + } + } + + /** + * Increments the raw bytes read directly from DFS by a scan. + * + * @param amount the amount of bytes read + */ + public void incrementFileBytesRead(long amount) { + fileBytesRead.addAndGet(amount); + } + + // TODO should it be an option to cache compressed data? + /** + * Increments the uncompressed and decrypted bytes read by a scan. This will include all + * uncompressed data read by a scan regardless of if the underlying data came from cache or DFS. + * + * @param amount + */ + public void incrementUncompressedBytesRead(long amount) { + uncompressedBytesRead.addAndGet(amount); + } + + /** + * Increments the count of rfile blocks that were not already in the cache. 
+ */ + public void incrementCacheMiss(CacheType cacheType) { + cacheMisses[cacheType.ordinal()].incrementAndGet(); + } + + /** + * Increments the count of rfile blocks that were already in the cache. + */ + public void incrementCacheHit(CacheType cacheType) { + cacheHits[cacheType.ordinal()].incrementAndGet(); + } + + /** + * Increments the count of rfile blocks that were directly read from DFS bypassing the cache. + */ + public void incrementCacheBypass(CacheType cacheType) { + cacheBypasses[cacheType.ordinal()].incrementAndGet(); + } + + public long getFileBytesRead() { + return fileBytesRead.get(); + } + + public long getUncompressedBytesRead() { + return uncompressedBytesRead.get(); + } + + public int getCacheHits(CacheType cacheType) { + return cacheHits[cacheType.ordinal()].get(); + } + + public int getCacheMisses(CacheType cacheType) { + return cacheMisses[cacheType.ordinal()].get(); + } + + public int getCacheBypasses(CacheType cacheType) { + return cacheBypasses[cacheType.ordinal()].get(); + } + + public static void enable(Span span) { + if (span.isRecording()) { + INSTRUMENTED_SCANS.put(span.getSpanContext().getTraceId(), new ScanInstrumentation()); + } + } + + public static ScanInstrumentation get() { + var span = Span.current(); + if (span.isRecording()) { + return INSTRUMENTED_SCANS.get(span.getSpanContext().getTraceId()); + } + return null; + } + + public static void disable(Span span) { + if (span.isRecording()) { + INSTRUMENTED_SCANS.remove(span.getSpanContext().getTraceId()); + } + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java b/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java new file mode 100644 index 00000000000..6691a8008a6 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +// TODO this was copied from Guava and slightly modified, what needs to be done for the license +// TODO probably needs unit test +// TODO open upstream issue? found existing issue number #590 in guava about making the class non-final +package org.apache.accumulo.core.util; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; + +/** + * An {@link InputStream} that counts the number of bytes read. + * + * @author Chris Nokleberg + */ +public final class CountingInputStream extends FilterInputStream { + + private long count; + private long mark = -1; + + /** + * Wraps another input stream, counting the number of bytes read. + * + * @param in the input stream to be wrapped + */ + public CountingInputStream(InputStream in) { + super(Objects.requireNonNull(in)); + } + + /** Returns the number of bytes read. 
*/ + public long getCount() { + return count; + } + + /** Resets the count of bytes read to zero */ + public void resetCount() { + count = 0; + } + + /** Returns the input stream this is wrapping */ + public InputStream getWrappedStream() { + return in; + } + + @Override + public int read() throws IOException { + int result = in.read(); + if (result != -1) { + count++; + } + return result; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int result = in.read(b, off, len); + if (result != -1) { + count += result; + } + return result; + } + + @Override + public long skip(long n) throws IOException { + long result = in.skip(n); + count += result; + return result; + } + + @Override + public synchronized void mark(int readlimit) { + in.mark(readlimit); + mark = count; + // it's okay to mark even if mark isn't supported, as reset won't work + } + + @Override + public synchronized void reset() throws IOException { + if (!in.markSupported()) { + throw new IOException("Mark not supported"); + } + if (mark == -1) { + throw new IOException("Mark not set"); + } + + in.reset(); + count = mark; + } +} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java index 3715eac0c24..8226f20dd33 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java @@ -24,6 +24,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.spi.cache.BlockCacheManager; import org.apache.accumulo.core.spi.scan.ScanServerInfo; +import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.server.GarbageCollectionLogger; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; @@ -61,4 +62,6 @@ public interface 
TabletHostingServer { GarbageCollectionLogger getGcLogger(); BlockCacheManager.Configuration getBlockCacheConfiguration(AccumuloConfiguration acuConf); + + HostAndPort getAdvertiseAddress(); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 2573d6b4514..25ea5276729 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -60,6 +60,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; @@ -227,7 +228,7 @@ public TabletServerMinCMetrics getMinCMetrics() { private String lockID; private volatile long lockSessionId = -1; - public static final AtomicLong seekCount = new AtomicLong(0); + public static final LongAdder seekCount = new LongAdder(); private final AtomicLong totalMinorCompactions = new AtomicLong(0); @@ -1201,7 +1202,7 @@ public TabletServerStatus getStats(Map> scanCou result.osLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); result.name = getClientAddressString(); result.holdTime = resourceManager.holdTime(); - result.lookups = seekCount.get(); + result.lookups = seekCount.sum(); result.indexCacheHits = resourceManager.getIndexCache().getStats().hitCount(); result.indexCacheRequest = resourceManager.getIndexCache().getStats().requestCount(); result.dataCacheHits = resourceManager.getDataCache().getStats().hitCount(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 6bcfbc4b9e0..d9297f36077 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -57,6 +57,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; + class ScanDataSource implements DataSource { private static final Logger log = LoggerFactory.getLogger(ScanDataSource.class); @@ -76,6 +79,9 @@ class ScanDataSource implements DataSource { private final byte[] defaultLabels; private final long scanDataSourceId; + private final AtomicLong scanSeekCounter; + private final AtomicLong scanCounter; + ScanDataSource(TabletBase tablet, ScanParameters scanParams, boolean loadIters, AtomicBoolean interruptFlag) { this.tablet = tablet; @@ -85,6 +91,8 @@ class ScanDataSource implements DataSource { this.loadIters = loadIters; this.defaultLabels = tablet.getDefaultSecurityLabels(); this.scanDataSourceId = nextSourceId.incrementAndGet(); + this.scanSeekCounter = new AtomicLong(); + this.scanCounter = new AtomicLong(); log.trace("new scan data source, scanId {}, tablet: {}, params: {}, loadIterators: {}", this.scanDataSourceId, this.tablet, this.scanParams, this.loadIters); } @@ -111,6 +119,9 @@ public DataSource getNewDataSource() { } finally { expectedDeletionCount = tablet.getDataSourceDeletions(); iter = null; + if (statsIterator != null) { + statsIterator.report(); + } } } } @@ -200,8 +211,8 @@ private SortedKeyValueIterator createIterator() } SystemIteratorEnvironment iterEnv = (SystemIteratorEnvironment) builder.build(); - statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter(), - tablet.getScanMetrics().getScannedCounter()); + statsIterator = new StatsIterator(multiIter, scanSeekCounter, TabletServer.seekCount, + scanCounter, tablet.getScannedCounter(), tablet.getScanMetrics().getScannedCounter()); SortedKeyValueIterator visFilter = 
SystemIteratorUtil.setupSystemScanIterators(statsIterator, scanParams.getColumnSet(), @@ -294,6 +305,18 @@ public void close(boolean sawErrors) { } } + private static final AttributeKey ENTRIES_READ_KEY = + AttributeKey.longKey("accumulo.entries.read"); + private static final AttributeKey SEEKS_KEY = AttributeKey.longKey("accumulo.seeks"); + + public void setAttributes(Span span) { + if (statsIterator != null && span.isRecording()) { + statsIterator.report(); + span.setAttribute(ENTRIES_READ_KEY, scanCounter.get()); + span.setAttribute(SEEKS_KEY, scanSeekCounter.get()); + } + } + public void interrupt() { interruptFlag.set(true); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index 2b89a2005b1..30a2119407b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -117,7 +117,7 @@ public ScanBatch read() throws IOException, TabletClosedException { iter = new SourceSwitchingIterator(dataSource, false); } - results = tablet.nextBatch(iter, range, scanParams); + results = tablet.nextBatch(iter, range, scanParams, dataSource); if (results.getResults() == null) { range = null; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 90a53c734a0..8d17402ec96 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -446,6 +446,7 @@ private void removeOldTemporaryFiles( } try { + // TODO include extent if (volume.getFileSystem().delete(tmp.getPath(), false)) { log.debug("Removed old temp file {}", tmp.getPath()); } else { @@ -1793,7 +1794,7 @@ public long totalQueryResultsBytes() { } public long 
totalScannedCount() { - return this.scannedCount.get(); + return this.scannedCount.sum(); } public long totalLookupCount() { @@ -1806,7 +1807,7 @@ public void updateRates(long now) { queryByteRate.update(now, this.queryResultBytes.get()); ingestRate.update(now, ingestCount); ingestByteRate.update(now, ingestBytes); - scannedRate.update(now, this.scannedCount.get()); + scannedRate.update(now, this.scannedCount.sum()); } public long getSplitCreationTime() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index dcc54e9da71..b66f4badd84 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@ -48,6 +49,9 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.spi.cache.CacheType; +import org.apache.accumulo.core.trace.ScanInstrumentation; +import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.ShutdownUtil; @@ -58,10 +62,14 @@ import org.apache.accumulo.tserver.TabletHostingServer; import org.apache.accumulo.tserver.TabletServerResourceManager; import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; +import org.apache.accumulo.tserver.scan.NextBatchTask; import org.apache.accumulo.tserver.scan.ScanParameters; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; + /** * This class exists to share code for scanning a tablet between {@link Tablet} and * {@link SnapshotTablet} @@ -79,7 +87,7 @@ public abstract class TabletBase { protected AtomicLong lookupCount = new AtomicLong(0); protected AtomicLong queryResultCount = new AtomicLong(0); protected AtomicLong queryResultBytes = new AtomicLong(0); - protected final AtomicLong scannedCount = new AtomicLong(0); + protected final LongAdder scannedCount = new LongAdder(); protected final Set activeScans = new HashSet<>(); @@ -154,7 +162,7 @@ public Scanner createScanner(Range range, ScanParameters scanParams, return new Scanner(this, range, scanParams, interruptFlag); } - public AtomicLong getScannedCounter() { + public LongAdder getScannedCounter() { return this.scannedCount; } @@ -205,25 +213,30 @@ public Tablet.LookupResult lookup(List ranges, List results, tabletRange.clip(range); } - SourceSwitchingIterator.DataSource dataSource = - createDataSource(scanParams, true, interruptFlag); + ScanDataSource dataSource = createDataSource(scanParams, true, interruptFlag); Tablet.LookupResult result = null; boolean sawException = false; - try { + var span = TraceUtil.startSpan(TabletBase.class, "multiscan-batch"); + try (var scope = span.makeCurrent()) { + ScanInstrumentation.enable(span); SortedKeyValueIterator iter = new SourceSwitchingIterator(dataSource); this.lookupCount.incrementAndGet(); this.server.getScanMetrics().incrementLookupCount(1); result = lookup(iter, ranges, results, scanParams, maxResultSize); + recordScanTrace(span, results, scanParams, dataSource); return result; } catch (IOException | RuntimeException e) { sawException = true; + span.recordException(e); throw e; } finally { + ScanInstrumentation.disable(span); // code in finally block because always want // to return mapfiles, even when exception is thrown 
dataSource.close(sawException); + span.end(); synchronized (this) { queryResultCount.addAndGet(results.size()); @@ -236,6 +249,82 @@ public Tablet.LookupResult lookup(List ranges, List results, } } + private static final AttributeKey ENTRIES_RETURNED_KEY = + AttributeKey.longKey("accumulo.entries.returned"); + private static final AttributeKey BYTES_RETURNED_KEY = + AttributeKey.longKey("accumulo.bytes.returned"); + private static final AttributeKey BYTES_READ_KEY = + AttributeKey.longKey("accumulo.bytes.read"); + private static final AttributeKey BYTES_READ_FILE_KEY = + AttributeKey.longKey("accumulo.bytes.read.file"); + private static final AttributeKey EXECUTOR_KEY = + AttributeKey.stringKey("accumulo.executor"); + private static final AttributeKey TABLE_ID_KEY = + AttributeKey.stringKey("accumulo.table.id"); + private static final AttributeKey EXTENT_KEY = AttributeKey.stringKey("accumulo.extent"); + private static final AttributeKey INDEX_HITS_KEY = + AttributeKey.longKey("accumulo.cache.index.hits"); + private static final AttributeKey INDEX_MISSES_KEY = + AttributeKey.longKey("accumulo.cache.index.misses"); + private static final AttributeKey INDEX_BYPASSES_KEY = + AttributeKey.longKey("accumulo.cache.index.bypasses"); + private static final AttributeKey DATA_HITS_KEY = + AttributeKey.longKey("accumulo.cache.data.hits"); + private static final AttributeKey DATA_MISSES_KEY = + AttributeKey.longKey("accumulo.cache.data.misses"); + private static final AttributeKey DATA_BYPASSES_KEY = + AttributeKey.longKey("accumulo.cache.data.bypasses");; + private static final AttributeKey SERVER_KEY = AttributeKey.stringKey("accumulo.server"); + + private void recordScanTrace(Span span, List batch, ScanParameters scanParameters, + ScanDataSource dataSource) { + if (span.isRecording()) { + // TODO in testing could not get really large batches, even when increasing table and + // client settings + span.setAttribute(ENTRIES_RETURNED_KEY, batch.size()); + long bytesReturned = 
0; + for (var e : batch) { + bytesReturned += e.getKey().getLength() + e.getValue().get().length; + } + span.setAttribute(BYTES_RETURNED_KEY, bytesReturned); + span.setAttribute(EXECUTOR_KEY, scanParameters.getScanDispatch().getExecutorName()); + span.setAttribute(TABLE_ID_KEY, getExtent().tableId().canonical()); + span.setAttribute(EXTENT_KEY, getExtent().toString()); + var si = ScanInstrumentation.get(); + if (si != null) { + span.setAttribute(BYTES_READ_FILE_KEY, si.getFileBytesRead()); + span.setAttribute(BYTES_READ_KEY, si.getUncompressedBytesRead()); + span.setAttribute(INDEX_HITS_KEY, si.getCacheHits(CacheType.INDEX)); + span.setAttribute(INDEX_MISSES_KEY, si.getCacheMisses(CacheType.INDEX)); + span.setAttribute(INDEX_BYPASSES_KEY, si.getCacheBypasses(CacheType.INDEX)); + span.setAttribute(DATA_HITS_KEY, si.getCacheHits(CacheType.DATA)); + span.setAttribute(DATA_MISSES_KEY, si.getCacheMisses(CacheType.DATA)); + span.setAttribute(DATA_BYPASSES_KEY, si.getCacheBypasses(CacheType.DATA)); + } + span.setAttribute(SERVER_KEY, server.getAdvertiseAddress().toString()); + + dataSource.setAttributes(span); + } + } + + Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParameters scanParams, + ScanDataSource dataSource) throws IOException { + // TODO what is the fastest way to short circuit and do nothing is there is no trace? 
+ var span = TraceUtil.startSpan(NextBatchTask.class, "scan-batch"); + try (var scope = span.makeCurrent()) { + ScanInstrumentation.enable(span); + + var batch = nextBatch(iter, range, scanParams); + recordScanTrace(span, batch.getResults(), scanParams, dataSource); + return batch; + } catch (IOException | RuntimeException e) { + span.recordException(e); + throw e; + } finally { + ScanInstrumentation.disable(span); + } + } + Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParameters scanParams) throws IOException { From ba54725c63109a5bf2d97623f754b8fe3fb196b5 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 9 Dec 2025 01:00:19 +0000 Subject: [PATCH 02/23] fix javadoc --- .../org/apache/accumulo/core/trace/ScanInstrumentation.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java index d68963a5978..172d290cf58 100644 --- a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java +++ b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java @@ -61,8 +61,6 @@ public void incrementFileBytesRead(long amount) { /** * Increments the uncompressed and decrypted bytes read by a scan. This will include all * uncompressed data read by a scan regardless of if the underlying data came from cache or DFS. 
- * - * @param amount */ public void incrementUncompressedBytesRead(long amount) { uncompressedBytesRead.addAndGet(amount); From d68f51ef64c16b61bd0482e82518651d30114f32 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 9 Dec 2025 16:44:26 +0000 Subject: [PATCH 03/23] report data scan stats from deep copies --- .../iteratorsImpl/system/StatsIterator.java | 25 +++++++++++++++---- .../tserver/tablet/ScanDataSource.java | 6 ++--- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java index 990752023c4..a2f8da5975f 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java @@ -19,7 +19,10 @@ package org.apache.accumulo.core.iteratorsImpl.system; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; @@ -39,6 +42,7 @@ public class StatsIterator extends ServerWrappingIterator { private final AtomicLong scanCounter; private final LongAdder tabletScanCounter; private final LongAdder serverScanCounter; + private final List deepCopies = Collections.synchronizedList(new ArrayList<>()); public StatsIterator(SortedKeyValueIterator source, AtomicLong scanSeekCounter, LongAdder serverSeekCounter, AtomicLong scanCounter, LongAdder tabletScanCounter, @@ -57,14 +61,17 @@ public void next() throws IOException { numRead++; if (numRead % 1009 == 0) { - report(); + // only report on self, do not force deep copies to report + report(false); } } @Override public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { - return new StatsIterator(source.deepCopy(env), scanSeekCounter, serverSeekCounter, 
scanCounter, - tabletScanCounter, serverScanCounter); + var deepCopy = new StatsIterator(source.deepCopy(env), scanSeekCounter, serverSeekCounter, + scanCounter, tabletScanCounter, serverScanCounter); + deepCopies.add(deepCopy); + return deepCopy; } @Override @@ -73,15 +80,23 @@ public void seek(Range range, Collection columnFamilies, boolean i source.seek(range, columnFamilies, inclusive); serverSeekCounter.increment(); scanSeekCounter.incrementAndGet(); - report(); + // only report on self, do not force deep copies to report + report(false); } - public void report() { + public void report(boolean reportDeepCopies) { if (numRead > 0) { scanCounter.addAndGet(numRead); tabletScanCounter.add(numRead); serverScanCounter.add(numRead); numRead = 0; } + + if (reportDeepCopies) { + // recurse down the fat tree of deep copies forcing them to report + for (var deepCopy : deepCopies) { + deepCopy.report(true); + } + } } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index d9297f36077..27173d4b1e8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -120,7 +120,7 @@ public DataSource getNewDataSource() { expectedDeletionCount = tablet.getDataSourceDeletions(); iter = null; if (statsIterator != null) { - statsIterator.report(); + statsIterator.report(true); } } } @@ -299,7 +299,7 @@ public void close(boolean sawErrors) { } finally { fileManager = null; if (statsIterator != null) { - statsIterator.report(); + statsIterator.report(true); } } } @@ -311,7 +311,7 @@ public void close(boolean sawErrors) { public void setAttributes(Span span) { if (statsIterator != null && span.isRecording()) { - statsIterator.report(); + statsIterator.report(true); span.setAttribute(ENTRIES_READ_KEY, scanCounter.get()); 
span.setAttribute(SEEKS_KEY, scanSeekCounter.get()); } From a83bfbf06dd12f622ee9ecde2ada2467a8f19b4b Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 9 Dec 2025 18:48:04 +0000 Subject: [PATCH 04/23] refactor where intrumentation happens --- .../blockfile/impl/CachableBlockFile.java | 3 -- .../core/trace/ScanInstrumentation.java | 19 +++++++----- .../tserver/tablet/ScanDataSource.java | 21 +++++++++++--- .../accumulo/tserver/tablet/Scanner.java | 28 +++++++++++++++--- .../accumulo/tserver/tablet/TabletBase.java | 29 ++++--------------- 5 files changed, 58 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index b34cdf83929..a40072bb146 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -53,8 +53,6 @@ import com.google.common.cache.Cache; -import io.opentelemetry.api.GlobalOpenTelemetry; - /** * This is a wrapper class for BCFile that includes a cache for independent caches for datablocks * and metadatablocks @@ -416,7 +414,6 @@ public CachedBlockRead getMetaBlock(String blockName) throws IOException { public CachedBlockRead getMetaBlock(long offset, long compressedSize, long rawSize) throws IOException { - GlobalOpenTelemetry.get(); BlockCache _iCache = cacheProvider.getIndexCache(); if (_iCache != null) { String _lookup = this.cacheId + "R" + offset; diff --git a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java index 172d290cf58..1c50d8348b8 100644 --- a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java +++ b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java @@ -107,9 +107,18 @@ public int 
getCacheBypasses(CacheType cacheType) { return cacheBypasses[cacheType.ordinal()].get(); } - public static void enable(Span span) { + public static interface ScanScope extends AutoCloseable { + @Override + void close(); + } + + public static ScanScope enable(Span span) { if (span.isRecording()) { - INSTRUMENTED_SCANS.put(span.getSpanContext().getTraceId(), new ScanInstrumentation()); + var traceId = span.getSpanContext().getTraceId(); + INSTRUMENTED_SCANS.put(traceId, new ScanInstrumentation()); + return () -> INSTRUMENTED_SCANS.remove(traceId); + } else { + return () -> {}; } } @@ -120,10 +129,4 @@ public static ScanInstrumentation get() { } return null; } - - public static void disable(Span span) { - if (span.isRecording()) { - INSTRUMENTED_SCANS.remove(span.getSpanContext().getTraceId()); - } - } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 27173d4b1e8..2aa84811e87 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -57,6 +57,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Span; @@ -70,7 +72,7 @@ class ScanDataSource implements DataSource { private SortedKeyValueIterator iter; private long expectedDeletionCount; private List memIters = null; - private long fileReservationId; + private long fileReservationId = -1; private final AtomicBoolean interruptFlag; private StatsIterator statsIterator; @@ -185,6 +187,7 @@ private SortedKeyValueIterator createIterator() memIters = tablet.getMemIterators(samplerConfig); Pair> reservation = tablet.reserveFilesForScan(); fileReservationId = reservation.getFirst(); + 
Preconditions.checkState(fileReservationId >= 0); files = reservation.getSecond(); } @@ -268,9 +271,11 @@ private void returnIterators() { tablet.returnMemIterators(memIters); memIters = null; try { - log.trace("Returning file iterators for {}, scanId:{}, fid:{}", tablet.getExtent(), - scanDataSourceId, fileReservationId); - tablet.returnFilesForScan(fileReservationId); + if (fileReservationId >= 0) { + log.trace("Returning file iterators for {}, scanId:{}, fid:{}", tablet.getExtent(), + scanDataSourceId, fileReservationId); + tablet.returnFilesForScan(fileReservationId); + } } catch (Exception e) { log.warn("Error Returning file iterators for scan: {}, :{}", scanDataSourceId, e); // Continue bubbling the exception up for handling. @@ -281,8 +286,16 @@ private void returnIterators() { } } + private boolean closed = false; + @Override public void close(boolean sawErrors) { + if (closed) { + return; + } + + closed = true; + try { returnIterators(); } finally { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index 30a2119407b..ba1c7b28b3d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -30,7 +30,11 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator; +import org.apache.accumulo.core.trace.ScanInstrumentation; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.ShutdownUtil; +import org.apache.accumulo.tserver.scan.NextBatchTask; import org.apache.accumulo.tserver.scan.ScanParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +72,22 @@ public class 
Scanner { } public ScanBatch read() throws IOException, TabletClosedException { + // TODO what is the fastest way to short circuit and do nothing is there is no trace? + var span = TraceUtil.startSpan(NextBatchTask.class, "scan-batch"); + try (var scope = span.makeCurrent(); var scanScope = ScanInstrumentation.enable(span)) { + var batchAndSource = readInternal(); + // This needs to be called after the ScanDataSource was closed inorder to make sure all + // statistics related to files reads are seen. + tablet.recordScanTrace(span, batchAndSource.getFirst().getResults(), scanParams, + batchAndSource.getSecond()); + return batchAndSource.getFirst(); + } catch (IOException | RuntimeException e) { + span.recordException(e); + throw e; + } + } + + private Pair readInternal() throws IOException, TabletClosedException { ScanDataSource dataSource = null; @@ -117,17 +137,17 @@ public ScanBatch read() throws IOException, TabletClosedException { iter = new SourceSwitchingIterator(dataSource, false); } - results = tablet.nextBatch(iter, range, scanParams, dataSource); + results = tablet.nextBatch(iter, range, scanParams); if (results.getResults() == null) { range = null; - return new ScanBatch(new ArrayList<>(), false); + return new Pair<>(new ScanBatch(new ArrayList<>(), false), dataSource); } else if (results.getContinueKey() == null) { - return new ScanBatch(results.getResults(), false); + return new Pair<>(new ScanBatch(results.getResults(), false), dataSource); } else { range = new Range(results.getContinueKey(), !results.isSkipContinueKey(), range.getEndKey(), range.isEndKeyInclusive()); - return new ScanBatch(results.getResults(), true); + return new Pair<>(new ScanBatch(results.getResults(), true), dataSource); } } catch (IterationInterruptedException iie) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index b66f4badd84..0453c7006f6 100644 
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -62,7 +62,6 @@ import org.apache.accumulo.tserver.TabletHostingServer; import org.apache.accumulo.tserver.TabletServerResourceManager; import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; -import org.apache.accumulo.tserver.scan.NextBatchTask; import org.apache.accumulo.tserver.scan.ScanParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -219,12 +218,13 @@ public Tablet.LookupResult lookup(List ranges, List results, boolean sawException = false; var span = TraceUtil.startSpan(TabletBase.class, "multiscan-batch"); - try (var scope = span.makeCurrent()) { - ScanInstrumentation.enable(span); + try (var scope = span.makeCurrent(); var scanScope = ScanInstrumentation.enable(span)) { SortedKeyValueIterator iter = new SourceSwitchingIterator(dataSource); this.lookupCount.incrementAndGet(); this.server.getScanMetrics().incrementLookupCount(1); result = lookup(iter, ranges, results, scanParams, maxResultSize); + // must close data source before recording scan trace in order to flush all file read stats + dataSource.close(false); recordScanTrace(span, results, scanParams, dataSource); return result; } catch (IOException | RuntimeException e) { @@ -232,7 +232,6 @@ public Tablet.LookupResult lookup(List ranges, List results, span.recordException(e); throw e; } finally { - ScanInstrumentation.disable(span); // code in finally block because always want // to return mapfiles, even when exception is thrown dataSource.close(sawException); @@ -276,7 +275,7 @@ public Tablet.LookupResult lookup(List ranges, List results, AttributeKey.longKey("accumulo.cache.data.bypasses");; private static final AttributeKey SERVER_KEY = AttributeKey.stringKey("accumulo.server"); - private void recordScanTrace(Span span, List batch, ScanParameters scanParameters, + void recordScanTrace(Span 
span, List batch, ScanParameters scanParameters, ScanDataSource dataSource) { if (span.isRecording()) { // TODO in testing could not get really large batches, even when increasing table and @@ -292,6 +291,8 @@ private void recordScanTrace(Span span, List batch, ScanParameters scan span.setAttribute(EXTENT_KEY, getExtent().toString()); var si = ScanInstrumentation.get(); if (si != null) { + // TODO this happens before the scan data source is closed, so not all counts may have been + // registered span.setAttribute(BYTES_READ_FILE_KEY, si.getFileBytesRead()); span.setAttribute(BYTES_READ_KEY, si.getUncompressedBytesRead()); span.setAttribute(INDEX_HITS_KEY, si.getCacheHits(CacheType.INDEX)); @@ -307,24 +308,6 @@ private void recordScanTrace(Span span, List batch, ScanParameters scan } } - Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParameters scanParams, - ScanDataSource dataSource) throws IOException { - // TODO what is the fastest way to short circuit and do nothing is there is no trace? 
- var span = TraceUtil.startSpan(NextBatchTask.class, "scan-batch"); - try (var scope = span.makeCurrent()) { - ScanInstrumentation.enable(span); - - var batch = nextBatch(iter, range, scanParams); - recordScanTrace(span, batch.getResults(), scanParams, dataSource); - return batch; - } catch (IOException | RuntimeException e) { - span.recordException(e); - throw e; - } finally { - ScanInstrumentation.disable(span); - } - } - Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParameters scanParams) throws IOException { From 7884dd620219e20843341b8d8aa4595ede414680 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 9 Dec 2025 23:07:36 +0000 Subject: [PATCH 05/23] fix bug --- .../main/java/org/apache/accumulo/tserver/tablet/Scanner.java | 2 ++ .../org/apache/accumulo/test/tracing/ScanTraceClient.java | 4 ++++ .../java/org/apache/accumulo/test/tracing/ScanTracingIT.java | 4 ++++ 3 files changed, 10 insertions(+) create mode 100644 test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java create mode 100644 test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index ba1c7b28b3d..3af98d70687 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -84,6 +84,8 @@ public ScanBatch read() throws IOException, TabletClosedException { } catch (IOException | RuntimeException e) { span.recordException(e); throw e; + }finally { + span.end(); } } diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java new file mode 100644 index 00000000000..32a80b5f860 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java @@ 
-0,0 +1,4 @@ +package org.apache.accumulo.test.tracing; + +public class ScanTraceClient { +} diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java new file mode 100644 index 00000000000..542080e8755 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java @@ -0,0 +1,4 @@ +package org.apache.accumulo.test.tracing; + +public class TestScanTracing { +} From 3c168ffc23355da19cbe22de82fe717a622929cb Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 9 Dec 2025 23:11:31 +0000 Subject: [PATCH 06/23] remove accidental checkin --- .../org/apache/accumulo/test/tracing/ScanTraceClient.java | 4 ---- .../java/org/apache/accumulo/test/tracing/ScanTracingIT.java | 4 ---- 2 files changed, 8 deletions(-) delete mode 100644 test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java delete mode 100644 test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java deleted file mode 100644 index 32a80b5f860..00000000000 --- a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java +++ /dev/null @@ -1,4 +0,0 @@ -package org.apache.accumulo.test.tracing; - -public class ScanTraceClient { -} diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java deleted file mode 100644 index 542080e8755..00000000000 --- a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java +++ /dev/null @@ -1,4 +0,0 @@ -package org.apache.accumulo.test.tracing; - -public class TestScanTracing { -} From 5ca9a9dd83042cbe9a1844a14947d5fd10ed39b8 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 9 Dec 2025 23:19:25 +0000 Subject: [PATCH 07/23] format code --- 
.../main/java/org/apache/accumulo/tserver/tablet/Scanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index 3af98d70687..d8c9d911150 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -84,7 +84,7 @@ public ScanBatch read() throws IOException, TabletClosedException { } catch (IOException | RuntimeException e) { span.recordException(e); throw e; - }finally { + } finally { span.end(); } } From 0deafbe1dddf43ac275779544985680c6ae71e81 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 10 Dec 2025 01:11:36 +0000 Subject: [PATCH 08/23] fix bug --- .../core/trace/ScanInstrumentation.java | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java index 1c50d8348b8..46c570f6ae6 100644 --- a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java +++ b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java @@ -18,13 +18,13 @@ */ package org.apache.accumulo.core.trace; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.spi.cache.CacheType; +import com.google.common.base.Preconditions; + import io.opentelemetry.api.trace.Span; /** @@ -37,8 +37,7 @@ public class ScanInstrumentation { private final AtomicInteger[] cacheMisses = new AtomicInteger[CacheType.values().length]; private final AtomicInteger[] cacheBypasses = new AtomicInteger[CacheType.values().length]; - private static final Map INSTRUMENTED_SCANS = - 
new ConcurrentHashMap<>(); + private static final ThreadLocal INSTRUMENTED_SCANS = new ThreadLocal<>(); private ScanInstrumentation() { for (int i = 0; i < CacheType.values().length; i++) { @@ -107,26 +106,25 @@ public int getCacheBypasses(CacheType cacheType) { return cacheBypasses[cacheType.ordinal()].get(); } - public static interface ScanScope extends AutoCloseable { + public interface ScanScope extends AutoCloseable { @Override void close(); } public static ScanScope enable(Span span) { if (span.isRecording()) { - var traceId = span.getSpanContext().getTraceId(); - INSTRUMENTED_SCANS.put(traceId, new ScanInstrumentation()); - return () -> INSTRUMENTED_SCANS.remove(traceId); + INSTRUMENTED_SCANS.set(new ScanInstrumentation()); + var id = Thread.currentThread().getId(); + return () -> { + Preconditions.checkState(id == Thread.currentThread().getId()); + INSTRUMENTED_SCANS.remove(); + }; } else { return () -> {}; } } public static ScanInstrumentation get() { - var span = Span.current(); - if (span.isRecording()) { - return INSTRUMENTED_SCANS.get(span.getSpanContext().getTraceId()); - } - return null; + return INSTRUMENTED_SCANS.get(); } } From 2b79fd94b29a01207c853114053171fef229b6ca Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 18:22:25 +0000 Subject: [PATCH 09/23] adds integration test scan tracing --- .../accumulo/tserver/tablet/TabletBase.java | 2 - test/pom.xml | 12 + .../org/apache/accumulo/test/TestIngest.java | 12 +- .../test/tracing/ScanTraceClient.java | 152 ++++++++ .../accumulo/test/tracing/ScanTracingIT.java | 344 ++++++++++++++++++ .../accumulo/test/tracing/SpanData.java | 44 +++ .../accumulo/test/tracing/TraceCollector.java | 106 ++++++ 7 files changed, 669 insertions(+), 3 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java create mode 100644 test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java create mode 100644 
test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java create mode 100644 test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index 0453c7006f6..7817b4b1cee 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -291,8 +291,6 @@ void recordScanTrace(Span span, List batch, ScanParameters scanParamete span.setAttribute(EXTENT_KEY, getExtent().toString()); var si = ScanInstrumentation.get(); if (si != null) { - // TODO this happens before the scan data source is closed, so not all counts may have been - // registered span.setAttribute(BYTES_READ_FILE_KEY, si.getFileBytesRead()); span.setAttribute(BYTES_READ_KEY, si.getUncompressedBytesRead()); span.setAttribute(INDEX_HITS_KEY, si.getCacheHits(CacheType.INDEX)); diff --git a/test/pom.xml b/test/pom.xml index c58e3778697..cce8818e8f6 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -82,6 +82,18 @@ io.opentelemetry opentelemetry-context + + io.opentelemetry.javaagent + opentelemetry-javaagent + + 2.14.0 + + + io.opentelemetry.proto + opentelemetry-proto + + 1.3.2-alpha + org.apache.accumulo accumulo-compaction-coordinator diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java index f20cf31afe4..a6f85f9aaf0 100644 --- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java +++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java @@ -33,6 +33,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.IteratorSetting; import 
org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; @@ -246,6 +247,10 @@ public static void toPrintableChars(byte[] dest) { } } + public static IteratorSetting.Column generateColumn(IngestParams params, int column) { + return new IteratorSetting.Column(new Text(params.columnFamily), generateQualifier(column)); + } + public static void main(String[] args) throws Exception { Opts opts = new Opts(); @@ -299,7 +304,7 @@ public static void ingest(AccumuloClient accumuloClient, FileSystem fs, IngestPa Mutation m = new Mutation(row); for (int j = 0; j < params.cols; j++) { Text colf = new Text(params.columnFamily); - Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX)); + Text colq = generateQualifier(j); if (writer != null) { Key key = new Key(row, colf, colq, labBA); @@ -405,6 +410,11 @@ public static void ingest(AccumuloClient accumuloClient, FileSystem fs, IngestPa elapsed); } + private static Text generateQualifier(int j) { + Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX)); + return colq; + } + public static void ingest(AccumuloClient c, IngestParams params) throws MutationsRejectedException, IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException { diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java new file mode 100644 index 00000000000..14a4a0a16cd --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.tracing; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +import java.util.List; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.data.Range; + +import com.google.gson.FormattingStyle; +import com.google.gson.GsonBuilder; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; + +public class ScanTraceClient { + + public static class Options { + String clientPropsPath; + String table; + String startRow; + String endRow; + String family; + String qualifier; + + Options() {} + + Options(String table) { + this.table = table; + } + + void conigureScanner(Scanner scanner) { + if (startRow != null || endRow != null) { + scanner.setRange(new Range(startRow, true, endRow, false)); + } + setColumn(scanner); + } + + void conigureScanner(BatchScanner scanner) { + if (startRow != null || endRow != null) { + scanner.setRanges(List.of(new Range(startRow, true, endRow, false))); + } else { + scanner.setRanges(List.of(new Range())); + } + setColumn(scanner); + } + + void setColumn(ScannerBase scanner) { + 
System.out.println(scanner.getClass().getName() + " fam " + family); + if (family != null) { + scanner.fetchColumn(family, qualifier); + } + } + + } + + public static class Results { + // trace id for the batch scan + String traceId1; + // trace id for the normal scan + String traceId2; + // The number of entries returned by both scans + long scanCount; + // The number of bytes returned by both scans + long scanSize; + + @Override + public String toString() { + return "Results{" + "scanCount=" + scanCount + ", traceId1='" + traceId1 + '\'' + + ", traceId2='" + traceId2 + '\'' + ", scanSize=" + scanSize + '}'; + } + + } + + public static void main(String[] args) throws Exception { + + Options opts = new GsonBuilder().create().fromJson(args[0], Options.class); + + String clientPropsPath = opts.clientPropsPath; + String table = opts.table; + + Tracer tracer = GlobalOpenTelemetry.get().getTracer(ScanTraceClient.class.getName()); + try (var client = Accumulo.newClient().from(clientPropsPath).build()) { + long scanCount = 0; + long scanSize = 0; + long batchScancount = 0; + long batchScanSize = 0; + + Span span = tracer.spanBuilder("batch-scan").startSpan(); + try (var scanner = client.createBatchScanner(table); var scope = span.makeCurrent()) { + opts.conigureScanner(scanner); + for (var entry : scanner) { + batchScancount++; + batchScanSize += entry.getKey().getSize() + entry.getValue().getSize(); + } + } finally { + span.end(); + } + var traceId1 = span.getSpanContext().getTraceId(); + + // start a second trace + span = tracer.spanBuilder("seq-scan").startSpan(); + try (var scanner = client.createScanner(table); var scope = span.makeCurrent()) { + opts.conigureScanner(scanner); + scanner.setBatchSize(10_000); + for (var entry : scanner) { + scanCount++; + scanSize += entry.getKey().getSize() + entry.getValue().getSize(); + } + } finally { + span.end(); + } + var traceId2 = span.getSpanContext().getTraceId(); + + assertEquals(scanCount, batchScancount); + 
assertEquals(scanSize, batchScanSize); + assertNotEquals(traceId1, traceId2); + + Results results = new Results(); + results.traceId1 = traceId1; + results.traceId2 = traceId2; + results.scanCount = scanCount; + results.scanSize = scanSize; + + var gson = new GsonBuilder().setFormattingStyle(FormattingStyle.COMPACT).create(); + System.out.println("RESULT:" + gson.toJson(results)); + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java new file mode 100644 index 00000000000..62faaad3e59 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.tracing; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.IntStream; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.google.common.base.Preconditions; +import com.google.gson.Gson; + +class ScanTracingIT extends ConfigurableMacBase { + + private static int OTLP_PORT = 12345; + + private static List getJvmArgs() { + String javaAgent = null; + for (var cpi : System.getProperty("java.class.path").split(":")) { + if (cpi.contains("opentelemetry-javaagent")) { + javaAgent = cpi; + } + } + + Objects.requireNonNull(javaAgent); + + return List.of("-Dotel.traces.exporter=otlp", "-Dotel.exporter.otlp.protocol=http/protobuf", + "-Dotel.exporter.otlp.endpoint=http://localhost:" + OTLP_PORT, + "-Dotel.metrics.exporter=none", "-Dotel.logs.exporter=none", "-javaagent:" + javaAgent); + } + + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + getJvmArgs().forEach(cfg::addJvmOption); + // sized such that full table scans will not fit in the cache + cfg.setProperty(Property.TSERV_DATACACHE_SIZE.getKey(), "8M"); + } + + private TraceCollector collector; + + @BeforeEach + public void startCollector() throws Exception { + collector = new 
TraceCollector("localhost", OTLP_PORT); + } + + @AfterEach + public void stopCollector() throws Exception { + collector.stop(); + } + + @Test + public void test() throws Exception { + var names = getUniqueNames(7); + runTest(names[0], 0, false, false, -1, -1, -1); + runTest(names[1], 10, false, false, -1, -1, -1); + runTest(names[2], 0, true, false, -1, -1, -1); + runTest(names[3], 0, false, false, -1, -1, 2); + runTest(names[4], 0, false, false, 32, 256, -1); + runTest(names[5], 0, true, true, 32, 256, -1); + runTest(names[6], 0, true, false, -1, -1, 2); + } + + private void runTest(String tableName, int numSplits, boolean cacheData, + boolean secondScanFitsInCache, int startRow, int endRow, int column) throws Exception { + + var ingestParams = new TestIngest.IngestParams(getClientProperties(), tableName); + ingestParams.createTable = false; + ingestParams.rows = 1000; + ingestParams.cols = 10; + + try (var client = Accumulo.newClient().from(getClientProperties()).build()) { + var ntc = new NewTableConfiguration(); + if (numSplits > 0) { + var splits = TestIngest.getSplitPoints(0, 1000, numSplits); + ntc.withSplits(splits); + } + + if (cacheData) { + ntc.setProperties(Map.of(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true")); + } + + client.tableOperations().create(tableName, ntc); + + TestIngest.ingest(client, ingestParams); + client.tableOperations().flush(tableName, null, null, true); + } + + long expectedRows = ingestParams.rows; + + var options = new ScanTraceClient.Options(tableName); + if (startRow != -1 && endRow != -1) { + options.startRow = TestIngest.generateRow(startRow, 0).toString(); + options.endRow = TestIngest.generateRow(endRow, 0).toString(); + expectedRows = IntStream.range(startRow, endRow).count(); + } + + int expectedColumns = ingestParams.cols; + + if (column != -1) { + var col = TestIngest.generateColumn(ingestParams, column); + options.family = col.getColumnFamily().toString(); + options.qualifier = col.getColumnQualifier().toString(); 
+ expectedColumns = 1; + } + + var results = run(options); + System.out.println(results); + + var tableId = getServerContext().getTableId(tableName).canonical(); + + ScanTraceStats scanStats = new ScanTraceStats(false); + ScanTraceStats batchScanStats = new ScanTraceStats(true); + Set extents1 = new TreeSet<>(); + Set extents2 = new TreeSet<>(); + + while (scanStats.getEntriesReturned() < expectedRows * expectedColumns + || batchScanStats.getEntriesReturned() < expectedRows * expectedColumns) { + var span = collector.take(); + var stats = ScanTraceStats.create(span); + if (stats != null && span.stringAttributes.get("accumulo.table.id").equals(tableId) + && (results.traceId1.equals(span.traceId) || results.traceId2.equals(span.traceId))) { + assertEquals("default", span.stringAttributes.get("accumulo.executor")); + if (numSplits == 0) { + assertEquals(tableId + "<<", span.stringAttributes.get("accumulo.extent")); + } else { + var extent = span.stringAttributes.get("accumulo.extent"); + assertTrue(extent.startsWith(tableId + ";") || extent.startsWith(tableId + "<")); + } + assertEquals(1, stats.getSeeks()); + if (stats.isBatchScan()) { + assertEquals(results.traceId1, span.traceId); + extents1.add(span.stringAttributes.get("accumulo.extent")); + } else { + assertEquals(results.traceId2, span.traceId); + extents2.add(span.stringAttributes.get("accumulo.extent")); + } + } else { + continue; + } + + if (stats.isBatchScan()) { + batchScanStats.merge(stats); + } else { + scanStats.merge(stats); + } + } + + if (numSplits > 0) { + assertEquals(numSplits, extents1.size()); + assertEquals(numSplits, extents2.size()); + } + + System.out.println(scanStats); + System.out.println(batchScanStats); + + assertEquals(expectedRows * expectedColumns, results.scanCount, results::toString); + + var statsList = List.of(batchScanStats, scanStats); + for (int i = 0; i < statsList.size(); i++) { + var stats = statsList.get(i); + assertEquals(expectedRows * 10, stats.getEntriesRead(), 
stats::toString); + assertEquals(results.scanCount, stats.getEntriesReturned(), stats::toString); + // When filtering on columns will read more data than we return + double colMultiplier = 10.0 / expectedColumns; + assertClose((long) (results.scanSize * colMultiplier), stats.getBytesRead(), .05); + assertClose(results.scanSize, stats.getBytesReturned(), .05); + if (secondScanFitsInCache && i == 1) { + assertEquals(0, stats.getFileBytesRead(), stats::toString); + } else { + assertClose((long) (stats.getBytesRead() * .005), stats.getFileBytesRead(), .2); + } + if (cacheData) { + assertEquals(0, stats.getDataCacheBypasses(), stats::toString); + assertTrue(stats.getDataCacheHits() + stats.getDataCacheMisses() > 0, stats::toString); + if (stats.getFileBytesRead() == 0) { + assertEquals(0L, stats.getDataCacheMisses(), stats::toString); + } + // When caching data, does not seem to hit the cache much + var cacheSum = stats.getIndexCacheHits() + stats.getIndexCacheMisses(); + assertTrue(cacheSum == 0 || cacheSum == 1, stats::toString); + } else { + assertEquals(0, stats.getDataCacheHits(), stats::toString); + assertEquals(0, stats.getDataCacheMisses(), stats::toString); + assertTrue(stats.getDataCacheBypasses() > stats.getSeeks(), stats::toString); + // When not caching data, will go to the index cache each time a block location is looked + // up. TODO why is this happening? keeps getting the RootData metablock for every data + // block. + assertClose(stats.getDataCacheBypasses(), stats.getIndexCacheHits(), .05); + } + assertEquals(0, stats.getIndexCacheBypasses(), stats::toString); + } + + } + + public void assertClose(long expected, long value, double e) { + assertTrue(Math.abs(1 - (double) expected / (double) value) < e, + () -> expected + " " + value + " " + e); + }; + + /** + * Runs ScanTraceClient in an external process so it can be instrumented with the open telemetry + * java agent. Use json to get data to/from external process. 
+ */ + public ScanTraceClient.Results run(ScanTraceClient.Options opts) + throws IOException, InterruptedException { + opts.clientPropsPath = getCluster().getClientPropsPath(); + new Gson().toJson(opts); + var proc = getCluster().exec(ScanTraceClient.class, getJvmArgs(), new Gson().toJson(opts)); + assertEquals(0, proc.getProcess().waitFor()); + var out = proc.readStdOut(); + var result = Arrays.stream(out.split("\\n")).filter(line -> line.startsWith("RESULT:")) + .findFirst().orElse("RESULT:{}"); + result = result.substring("RESULT:".length()); + return new Gson().fromJson(result, ScanTraceClient.Results.class); + } + + /** + * Helper class that encapsulates data from a scan trace making it easier to access and + * centralizing the code for accessing data from a span. + */ + static class ScanTraceStats { + final Map scanStats; + final boolean isBatchScan; + + ScanTraceStats(SpanData spanData) { + this.scanStats = spanData.integerAttributes; + this.isBatchScan = spanData.name.contains("multiscan-batch"); + } + + ScanTraceStats(boolean isBatchScan) { + scanStats = new TreeMap<>(); + this.isBatchScan = isBatchScan; + } + + void merge(ScanTraceStats other) { + Preconditions.checkArgument(isBatchScan == other.isBatchScan); + other.scanStats.forEach((k, v) -> { + scanStats.merge(k, v, Long::sum); + }); + } + + /** + * @return a ScanTrace if span is from a scan batch, otherwise return null + */ + static ScanTraceStats create(SpanData data) { + if (data.name.contains("scan-batch")) { + return new ScanTraceStats(data); + } + return null; + } + + boolean isBatchScan() { + return isBatchScan; + } + + long getEntriesRead() { + return scanStats.getOrDefault("accumulo.entries.read", 0L); + } + + long getEntriesReturned() { + return scanStats.getOrDefault("accumulo.entries.returned", 0L); + + } + + long getFileBytesRead() { + return scanStats.getOrDefault("accumulo.bytes.read.file", 0L); + } + + long getBytesRead() { + return scanStats.getOrDefault("accumulo.bytes.read", 0L); + 
} + + long getBytesReturned() { + return scanStats.getOrDefault("accumulo.bytes.returned", 0L); + } + + long getDataCacheHits() { + return scanStats.getOrDefault("accumulo.cache.data.hits", 0L); + } + + long getDataCacheMisses() { + return scanStats.getOrDefault("accumulo.cache.data.misses", 0L); + } + + long getDataCacheBypasses() { + return scanStats.getOrDefault("accumulo.cache.data.bypasses", 0L); + } + + long getIndexCacheHits() { + return scanStats.getOrDefault("accumulo.cache.index.hits", 0L); + } + + long getIndexCacheMisses() { + return scanStats.getOrDefault("accumulo.cache.index.misses", 0L); + } + + long getIndexCacheBypasses() { + return scanStats.getOrDefault("accumulo.cache.index.bypasses", 0L); + } + + long getSeeks() { + return scanStats.getOrDefault("accumulo.seeks", 0L); + } + + @Override + public String toString() { + return "ScanTraceStats{" + "isBatchScan=" + isBatchScan + ", scanStats=" + scanStats + '}'; + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java b/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java new file mode 100644 index 00000000000..2074ac66caa --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.tracing; + +import java.util.Map; + +public class SpanData { + + public final String traceId; + public final String name; + public final Map stringAttributes; + public final Map integerAttributes; + + public SpanData(String traceId, String name, Map stringAttributes, + Map integerAttributes) { + this.traceId = traceId; + this.name = name; + this.stringAttributes = stringAttributes; + this.integerAttributes = integerAttributes; + } + + @Override + public String toString() { + return "SpanData{" + "traceId='" + traceId + '\'' + ", name='" + name + '\'' + + ", stringAttributes=" + stringAttributes + ", integerAttributes=" + integerAttributes + + '}'; + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java b/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java new file mode 100644 index 00000000000..7b92a1db49f --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.tracing; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +import org.apache.commons.codec.binary.Hex; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; + +/** + * Open telemetry tracing data sink for testing. Processes can send http/protobuf trace data to this + * sink over http, and it will add them to an in memory queue that tests can read from. + */ +public class TraceCollector { + private final Server server; + + private final LinkedBlockingQueue spanQueue = new LinkedBlockingQueue<>(); + + private class TraceHandler extends AbstractHandler { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) throws IOException { + + if (!target.equals("/v1/traces")) { + System.err.println("unexpected target : " + target); + response.setStatus(404); + response.getOutputStream().close(); + return; + } + + var body = request.getInputStream().readAllBytes(); + try { + var etsr = + io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest.parseFrom(body); + var spans = + etsr.getResourceSpansList().stream().flatMap(r -> r.getScopeSpansList().stream()) + .flatMap(r -> r.getSpansList().stream()).collect(Collectors.toList()); + + spans.forEach(s -> { + var traceId = Hex.encodeHexString(s.getTraceId().toByteArray(), true); + + Map stringAttrs = new HashMap<>(); + Map intAttrs = new HashMap<>(); + + s.getAttributesList().forEach(kv -> { + if (kv.getValue().hasIntValue()) { + intAttrs.put(kv.getKey(), kv.getValue().getIntValue()); + } else if (kv.getValue().hasStringValue()) { + stringAttrs.put(kv.getKey(), 
kv.getValue().getStringValue()); + } + }); + + spanQueue.add( + new SpanData(traceId, s.getName(), Map.copyOf(stringAttrs), Map.copyOf(intAttrs))); + }); + + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } + + response.setStatus(200); + response.getOutputStream().close(); + } + }; + + TraceCollector(String host, int port) throws Exception { + server = new Server(new InetSocketAddress(host, port)); + server.setHandler(new TraceHandler()); + server.start(); + } + + SpanData take() throws InterruptedException { + return spanQueue.take(); + } + + void stop() throws Exception { + server.stop(); + } +} From cfb3a5de5b5274266b19a9b431247aaa73e42a62 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 18:55:41 +0000 Subject: [PATCH 10/23] fix deps --- test/pom.xml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/pom.xml b/test/pom.xml index cce8818e8f6..b31ddb771cf 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -62,6 +62,10 @@ commons-cli commons-cli + + commons-codec + commons-codec + commons-io commons-io @@ -94,6 +98,10 @@ 1.3.2-alpha + + jakarta.servlet + jakarta.servlet-api + org.apache.accumulo accumulo-compaction-coordinator @@ -206,6 +214,10 @@ org.easymock easymock + + org.eclipse.jetty + jetty-server + org.jline jline From 30a252e9540d3784ccf1ff9ddc2e66948eb449b7 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 19:53:18 +0000 Subject: [PATCH 11/23] fix build --- pom.xml | 14 ++++++++++++++ test/pom.xml | 13 +++++-------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index e7ed0e963f8..6e07454736c 100644 --- a/pom.xml +++ b/pom.xml @@ -345,6 +345,18 @@ under the License. commons-validator 1.10.0 + + io.opentelemetry.javaagent + opentelemetry-javaagent + + 2.14.0 + + + io.opentelemetry.proto + opentelemetry-proto + + 1.3.2-alpha + junit @@ -1020,6 +1032,8 @@ under the License. 
org.junit.vintage:junit-vintage-engine:jar:* org.junit.jupiter:junit-jupiter-engine:jar:* org.lz4:lz4-java:jar:* + + io.opentelemetry.javaagent:opentelemetry-javaagent:jar:* diff --git a/test/pom.xml b/test/pom.xml index b31ddb771cf..4430f6d929e 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -86,17 +86,9 @@ io.opentelemetry opentelemetry-context - - io.opentelemetry.javaagent - opentelemetry-javaagent - - 2.14.0 - io.opentelemetry.proto opentelemetry-proto - - 1.3.2-alpha jakarta.servlet @@ -238,6 +230,11 @@ org.slf4j slf4j-api + + io.opentelemetry.javaagent + opentelemetry-javaagent + runtime + org.apache.hadoop hadoop-client-runtime From 49c6dce85275efdf54ff55022540f2a98ee1ecdd Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 20:04:53 +0000 Subject: [PATCH 12/23] remove unused code --- .../java/org/apache/accumulo/test/tracing/ScanTracingIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java index 62faaad3e59..52fdfeffaf7 100644 --- a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java @@ -238,7 +238,6 @@ public void assertClose(long expected, long value, double e) { public ScanTraceClient.Results run(ScanTraceClient.Options opts) throws IOException, InterruptedException { opts.clientPropsPath = getCluster().getClientPropsPath(); - new Gson().toJson(opts); var proc = getCluster().exec(ScanTraceClient.class, getJvmArgs(), new Gson().toJson(opts)); assertEquals(0, proc.getProcess().waitFor()); var out = proc.readStdOut(); From 2f95fc9731c2144c1dee04c0530dc472408ec7b4 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 20:13:23 +0000 Subject: [PATCH 13/23] fix checkstyle violations --- .../org/apache/accumulo/test/tracing/ScanTraceClient.java | 4 ++-- 
.../java/org/apache/accumulo/test/tracing/ScanTracingIT.java | 4 ++-- .../main/java/org/apache/accumulo/test/tracing/SpanData.java | 5 ++--- .../org/apache/accumulo/test/tracing/TraceCollector.java | 2 +- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java index 14a4a0a16cd..2196302fea2 100644 --- a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java @@ -89,8 +89,8 @@ public static class Results { @Override public String toString() { - return "Results{" + "scanCount=" + scanCount + ", traceId1='" + traceId1 + '\'' - + ", traceId2='" + traceId2 + '\'' + ", scanSize=" + scanSize + '}'; + return "Results{scanCount=" + scanCount + ", traceId1='" + traceId1 + "', traceId2='" + + traceId2 + "', scanSize=" + scanSize + '}'; } } diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java index 52fdfeffaf7..18508a279bd 100644 --- a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java @@ -229,7 +229,7 @@ private void runTest(String tableName, int numSplits, boolean cacheData, public void assertClose(long expected, long value, double e) { assertTrue(Math.abs(1 - (double) expected / (double) value) < e, () -> expected + " " + value + " " + e); - }; + } /** * Runs ScanTraceClient in an external process so it can be instrumented with the open telemetry @@ -337,7 +337,7 @@ long getSeeks() { @Override public String toString() { - return "ScanTraceStats{" + "isBatchScan=" + isBatchScan + ", scanStats=" + scanStats + '}'; + return "ScanTraceStats{isBatchScan=" + isBatchScan + ", scanStats=" + scanStats + '}'; } } } diff --git 
a/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java b/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java index 2074ac66caa..c84845275aa 100644 --- a/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java +++ b/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java @@ -37,8 +37,7 @@ public SpanData(String traceId, String name, Map stringAttributes @Override public String toString() { - return "SpanData{" + "traceId='" + traceId + '\'' + ", name='" + name + '\'' - + ", stringAttributes=" + stringAttributes + ", integerAttributes=" + integerAttributes - + '}'; + return "SpanData{traceId='" + traceId + "', name='" + name + "', stringAttributes=" + + stringAttributes + ", integerAttributes=" + integerAttributes + '}'; } } diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java b/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java index 7b92a1db49f..9a3b6c97010 100644 --- a/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java +++ b/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java @@ -88,7 +88,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques response.setStatus(200); response.getOutputStream().close(); } - }; + } TraceCollector(String host, int port) throws Exception { server = new Server(new InetSocketAddress(host, port)); From ae2abb7c78b18ae4ec36ff531c04cb3d72c5120b Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 20:32:01 +0000 Subject: [PATCH 14/23] use scan prefix --- .../tserver/tablet/ScanDataSource.java | 4 +-- .../accumulo/tserver/tablet/TabletBase.java | 22 +++++++-------- .../accumulo/test/tracing/ScanTracingIT.java | 27 ++++++++++--------- 3 files changed, 27 insertions(+), 26 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 2aa84811e87..c5def6269ee 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -319,8 +319,8 @@ public void close(boolean sawErrors) { } private static final AttributeKey ENTRIES_READ_KEY = - AttributeKey.longKey("accumulo.entries.read"); - private static final AttributeKey SEEKS_KEY = AttributeKey.longKey("accumulo.seeks"); + AttributeKey.longKey("accumulo.scan.entries.read"); + private static final AttributeKey SEEKS_KEY = AttributeKey.longKey("accumulo.scan.seeks"); public void setAttributes(Span span) { if (statsIterator != null && span.isRecording()) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index 7817b4b1cee..4704a4c828f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -249,30 +249,30 @@ public Tablet.LookupResult lookup(List ranges, List results, } private static final AttributeKey ENTRIES_RETURNED_KEY = - AttributeKey.longKey("accumulo.entries.returned"); + AttributeKey.longKey("accumulo.scan.entries.returned"); private static final AttributeKey BYTES_RETURNED_KEY = - AttributeKey.longKey("accumulo.bytes.returned"); + AttributeKey.longKey("accumulo.scan.bytes.returned"); private static final AttributeKey BYTES_READ_KEY = - AttributeKey.longKey("accumulo.bytes.read"); + AttributeKey.longKey("accumulo.scan.bytes.read"); private static final AttributeKey BYTES_READ_FILE_KEY = - AttributeKey.longKey("accumulo.bytes.read.file"); + AttributeKey.longKey("accumulo.scan.bytes.read.file"); private static final AttributeKey EXECUTOR_KEY = - 
AttributeKey.stringKey("accumulo.executor"); + AttributeKey.stringKey("accumulo.scan.executor"); private static final AttributeKey TABLE_ID_KEY = AttributeKey.stringKey("accumulo.table.id"); private static final AttributeKey EXTENT_KEY = AttributeKey.stringKey("accumulo.extent"); private static final AttributeKey INDEX_HITS_KEY = - AttributeKey.longKey("accumulo.cache.index.hits"); + AttributeKey.longKey("accumulo.scan.cache.index.hits"); private static final AttributeKey INDEX_MISSES_KEY = - AttributeKey.longKey("accumulo.cache.index.misses"); + AttributeKey.longKey("accumulo.scan.cache.index.misses"); private static final AttributeKey INDEX_BYPASSES_KEY = - AttributeKey.longKey("accumulo.cache.index.bypasses"); + AttributeKey.longKey("accumulo.scan.cache.index.bypasses"); private static final AttributeKey DATA_HITS_KEY = - AttributeKey.longKey("accumulo.cache.data.hits"); + AttributeKey.longKey("accumulo.scan.cache.data.hits"); private static final AttributeKey DATA_MISSES_KEY = - AttributeKey.longKey("accumulo.cache.data.misses"); + AttributeKey.longKey("accumulo.scan.cache.data.misses"); private static final AttributeKey DATA_BYPASSES_KEY = - AttributeKey.longKey("accumulo.cache.data.bypasses");; + AttributeKey.longKey("accumulo.scan.cache.data.bypasses");; private static final AttributeKey SERVER_KEY = AttributeKey.stringKey("accumulo.server"); void recordScanTrace(Span span, List batch, ScanParameters scanParameters, diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java index 18508a279bd..247a02ec30e 100644 --- a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java @@ -64,6 +64,7 @@ private static List getJvmArgs() { "-Dotel.metrics.exporter=none", "-Dotel.logs.exporter=none", "-javaagent:" + javaAgent); } + @Override protected void configure(MiniAccumuloConfigImpl 
cfg, Configuration hadoopCoreSite) { getJvmArgs().forEach(cfg::addJvmOption); // sized such that full table scans will not fit in the cache @@ -153,7 +154,7 @@ private void runTest(String tableName, int numSplits, boolean cacheData, var stats = ScanTraceStats.create(span); if (stats != null && span.stringAttributes.get("accumulo.table.id").equals(tableId) && (results.traceId1.equals(span.traceId) || results.traceId2.equals(span.traceId))) { - assertEquals("default", span.stringAttributes.get("accumulo.executor")); + assertEquals("default", span.stringAttributes.get("accumulo.scan.executor")); if (numSplits == 0) { assertEquals(tableId + "<<", span.stringAttributes.get("accumulo.extent")); } else { @@ -287,52 +288,52 @@ boolean isBatchScan() { } long getEntriesRead() { - return scanStats.getOrDefault("accumulo.entries.read", 0L); + return scanStats.getOrDefault("accumulo.scan.entries.read", 0L); } long getEntriesReturned() { - return scanStats.getOrDefault("accumulo.entries.returned", 0L); + return scanStats.getOrDefault("accumulo.scan.entries.returned", 0L); } long getFileBytesRead() { - return scanStats.getOrDefault("accumulo.bytes.read.file", 0L); + return scanStats.getOrDefault("accumulo.scan.bytes.read.file", 0L); } long getBytesRead() { - return scanStats.getOrDefault("accumulo.bytes.read", 0L); + return scanStats.getOrDefault("accumulo.scan.bytes.read", 0L); } long getBytesReturned() { - return scanStats.getOrDefault("accumulo.bytes.returned", 0L); + return scanStats.getOrDefault("accumulo.scan.bytes.returned", 0L); } long getDataCacheHits() { - return scanStats.getOrDefault("accumulo.cache.data.hits", 0L); + return scanStats.getOrDefault("accumulo.scan.cache.data.hits", 0L); } long getDataCacheMisses() { - return scanStats.getOrDefault("accumulo.cache.data.misses", 0L); + return scanStats.getOrDefault("accumulo.scan.cache.data.misses", 0L); } long getDataCacheBypasses() { - return scanStats.getOrDefault("accumulo.cache.data.bypasses", 0L); + return 
scanStats.getOrDefault("accumulo.scan.cache.data.bypasses", 0L); } long getIndexCacheHits() { - return scanStats.getOrDefault("accumulo.cache.index.hits", 0L); + return scanStats.getOrDefault("accumulo.scan.cache.index.hits", 0L); } long getIndexCacheMisses() { - return scanStats.getOrDefault("accumulo.cache.index.misses", 0L); + return scanStats.getOrDefault("accumulo.scan.cache.index.misses", 0L); } long getIndexCacheBypasses() { - return scanStats.getOrDefault("accumulo.cache.index.bypasses", 0L); + return scanStats.getOrDefault("accumulo.scan.cache.index.bypasses", 0L); } long getSeeks() { - return scanStats.getOrDefault("accumulo.seeks", 0L); + return scanStats.getOrDefault("accumulo.scan.seeks", 0L); } @Override From 693a2688b18484a362e2d364d1528341b97b615f Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 20:48:43 +0000 Subject: [PATCH 15/23] test different scan executors --- .../accumulo/test/tracing/ScanTracingIT.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java index 247a02ec30e..be3fc33f7d3 100644 --- a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -69,6 +70,7 @@ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSit getJvmArgs().forEach(cfg::addJvmOption); // sized such that full table scans will not fit in the cache cfg.setProperty(Property.TSERV_DATACACHE_SIZE.getKey(), "8M"); + cfg.setProperty(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "pool1.threads", "8"); } private TraceCollector collector; @@ -110,10 +112,17 @@ private void runTest(String tableName, 
int numSplits, boolean cacheData, ntc.withSplits(splits); } + var props = new HashMap(); + if (cacheData) { - ntc.setProperties(Map.of(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true")); + props.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true"); } + // use a different executor for batch scans + props.put("table.scan.dispatcher.opts.multi_executor", "pool1"); + + ntc.setProperties(props); + client.tableOperations().create(tableName, ntc); TestIngest.ingest(client, ingestParams); @@ -154,7 +163,11 @@ private void runTest(String tableName, int numSplits, boolean cacheData, var stats = ScanTraceStats.create(span); if (stats != null && span.stringAttributes.get("accumulo.table.id").equals(tableId) && (results.traceId1.equals(span.traceId) || results.traceId2.equals(span.traceId))) { - assertEquals("default", span.stringAttributes.get("accumulo.scan.executor")); + if (stats.isBatchScan()) { + assertEquals("pool1", span.stringAttributes.get("accumulo.scan.executor")); + } else { + assertEquals("default", span.stringAttributes.get("accumulo.scan.executor")); + } if (numSplits == 0) { assertEquals(tableId + "<<", span.stringAttributes.get("accumulo.extent")); } else { From 5acf15cc9fed4baf14391545c886c43428060476 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 21:50:05 +0000 Subject: [PATCH 16/23] do some TODOs --- core/pom.xml | 1 + .../core/util/CountingInputStream.java | 29 ++++++++----------- .../accumulo/tserver/tablet/Scanner.java | 1 - 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 9710b0afc0d..78945cc5dd2 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -207,6 +207,7 @@ ${accumulo.build.license.header} src/main/java/org/apache/accumulo/core/bloomfilter/*.java + src/main/java/org/apache/accumulo/core/util/CountingInputStream.java src/main/java/org/apache/accumulo/core/util/HostAndPort.java src/test/resources/*.jceks src/test/resources/org/apache/accumulo/core/file/rfile/*.rf diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java b/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java index 6691a8008a6..15d41b18d70 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java @@ -1,24 +1,16 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Copyright (C) 2007 The Guava Authors * - * https://www.apache.org/licenses/LICENSE-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. */ -// TODO this was copied from Guava and slightly modified, what needs to be done for the license -// TODO probably needs unit test -// TODO open upstream issue? 
found existing issue number #590 in guava about making the class non-final package org.apache.accumulo.core.util; import java.io.FilterInputStream; @@ -27,6 +19,9 @@ import java.util.Objects; /** + * This class was copied from Guava and modified. If this class was not final in Guava it could have + * been extended. Guava has issue 590 open about this. + * * An {@link InputStream} that counts the number of bytes read. * * @author Chris Nokleberg diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index d8c9d911150..9e2a7c7af03 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -72,7 +72,6 @@ public class Scanner { } public ScanBatch read() throws IOException, TabletClosedException { - // TODO what is the fastest way to short circuit and do nothing is there is no trace? 
var span = TraceUtil.startSpan(NextBatchTask.class, "scan-batch"); try (var scope = span.makeCurrent(); var scanScope = ScanInstrumentation.enable(span)) { var batchAndSource = readInternal(); From 20bf890e941cf5cb4b0a2536a59440d1fe0b74cf Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 22:09:45 +0000 Subject: [PATCH 17/23] remove todo that has issue --- .../src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 8d17402ec96..d7ff6a7c8dc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -446,7 +446,6 @@ private void removeOldTemporaryFiles( } try { - // TODO include extent if (volume.getFileSystem().delete(tmp.getPath(), false)) { log.debug("Removed old temp file {}", tmp.getPath()); } else { From 0a04cae98fc00ba5d180dfa9a4cccf8cf6ac013d Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 22:44:55 +0000 Subject: [PATCH 18/23] adds noop impl --- .../cache/InstrumentedBlockCache.java | 2 +- .../blockfile/impl/CachableBlockFile.java | 12 +- .../core/file/rfile/bcfile/BCFile.java | 8 +- .../core/trace/ScanInstrumentation.java | 149 ++++++++++-------- .../core/trace/ScanInstrumentationImpl.java | 97 ++++++++++++ .../accumulo/tserver/tablet/TabletBase.java | 18 +-- 6 files changed, 193 insertions(+), 93 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java index a46e0b2353d..402136c208e 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java @@ -98,7 +98,7 @@ public Stats getStats() { public static BlockCache wrap(CacheType cacheType, BlockCache cache) { var si = ScanInstrumentation.get(); - if (cache != null && si != null) { + if (cache != null && si.enabled()) { return new InstrumentedBlockCache(cacheType, cache, si); } else { return cache; diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index a40072bb146..f694740d964 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -470,10 +470,7 @@ public CachedBlockRead getDataBlock(long offset, long compressedSize, long rawSi } private void incrementCacheBypass(CacheType cacheType) { - var si = ScanInstrumentation.get(); - if (si != null) { - si.incrementCacheBypass(cacheType); - } + ScanInstrumentation.get().incrementCacheBypass(cacheType); } @Override @@ -508,7 +505,7 @@ public static class CachedBlockRead extends DataInputStream { private static InputStream wrapForTrace(InputStream inputStream) { var scanInstrumentation = ScanInstrumentation.get(); - if (scanInstrumentation != null) { + if (scanInstrumentation.enabled()) { return new CountingInputStream(inputStream); } else { return inputStream; @@ -564,10 +561,7 @@ public void indexWeightChanged() { public void flushStats() { if (in instanceof CountingInputStream) { var cin = ((CountingInputStream) in); - var si = ScanInstrumentation.get(); - if (si != null) { - si.incrementUncompressedBytesRead(cin.getCount()); - } + ScanInstrumentation.get().incrementUncompressedBytesRead(cin.getCount()); cin.resetCount(); var src = 
cin.getWrappedStream(); if (src instanceof BlockReader) { diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java index a46711b7202..31f32d64ae1 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java @@ -484,8 +484,7 @@ public RBlockState( BoundedRangeFileInputStream boundedRangeFileInputStream = new BoundedRangeFileInputStream( fsin, this.region.getOffset(), this.region.getCompressedSize()); - var si = ScanInstrumentation.get(); - if (si != null) { + if (ScanInstrumentation.get().enabled()) { rawInputStream = new CountingInputStream(boundedRangeFileInputStream); } else { rawInputStream = boundedRangeFileInputStream; @@ -518,10 +517,7 @@ public BlockRegion getBlockRegion() { public void flushStats() { if (rawInputStream instanceof CountingInputStream) { var ci = (CountingInputStream) rawInputStream; - var si = ScanInstrumentation.get(); - if (si != null) { - si.incrementFileBytesRead(ci.getCount()); - } + ScanInstrumentation.get().incrementFileBytesRead(ci.getCount()); ci.resetCount(); } } diff --git a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java index 46c570f6ae6..8ed962c53a8 100644 --- a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java +++ b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java @@ -18,9 +18,6 @@ */ package org.apache.accumulo.core.trace; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.accumulo.core.spi.cache.CacheType; import com.google.common.base.Preconditions; @@ -30,81 +27,55 @@ /** * This class helps collect per scan information for the purposes of tracing. 
*/ -public class ScanInstrumentation { - private final AtomicLong fileBytesRead = new AtomicLong(); - private final AtomicLong uncompressedBytesRead = new AtomicLong(); - private final AtomicInteger[] cacheHits = new AtomicInteger[CacheType.values().length]; - private final AtomicInteger[] cacheMisses = new AtomicInteger[CacheType.values().length]; - private final AtomicInteger[] cacheBypasses = new AtomicInteger[CacheType.values().length]; - +public abstract class ScanInstrumentation { private static final ThreadLocal INSTRUMENTED_SCANS = new ThreadLocal<>(); - private ScanInstrumentation() { - for (int i = 0; i < CacheType.values().length; i++) { - cacheHits[i] = new AtomicInteger(); - cacheMisses[i] = new AtomicInteger(); - cacheBypasses[i] = new AtomicInteger(); + private static final ScanInstrumentation NOOP_SI = new ScanInstrumentation() { + @Override + public boolean enabled() { + return false; } - } - /** - * Increments the raw bytes read directly from DFS by a scan. - * - * @param amount the amount of bytes read - */ - public void incrementFileBytesRead(long amount) { - fileBytesRead.addAndGet(amount); - } + @Override + public void incrementFileBytesRead(long amount) {} - // TODO should it be an option to cache compressed data? - /** - * Increments the uncompressed and decrypted bytes read by a scan. This will include all - * uncompressed data read by a scan regardless of if the underlying data came from cache or DFS. - */ - public void incrementUncompressedBytesRead(long amount) { - uncompressedBytesRead.addAndGet(amount); - } + @Override + public void incrementUncompressedBytesRead(long amount) {} - /** - * Increments the count of rfile blocks that were not already in the cache. - */ - public void incrementCacheMiss(CacheType cacheType) { - cacheMisses[cacheType.ordinal()].incrementAndGet(); - } + @Override + public void incrementCacheMiss(CacheType cacheType) {} - /** - * Increments the count of rfile blocks that were already in the cache. 
- */ - public void incrementCacheHit(CacheType cacheType) { - cacheHits[cacheType.ordinal()].incrementAndGet(); - } + @Override + public void incrementCacheHit(CacheType cacheType) {} - /** - * Increments the count of rfile blocks that were directly read from DFS bypassing the cache. - */ - public void incrementCacheBypass(CacheType cacheType) { - cacheBypasses[cacheType.ordinal()].incrementAndGet(); - } + @Override + public void incrementCacheBypass(CacheType cacheType) {} - public long getFileBytesRead() { - return fileBytesRead.get(); - } + @Override + public long getFileBytesRead() { + return 0; + } - public long getUncompressedBytesRead() { - return uncompressedBytesRead.get(); - } + @Override + public long getUncompressedBytesRead() { + return 0; + } - public int getCacheHits(CacheType cacheType) { - return cacheHits[cacheType.ordinal()].get(); - } + @Override + public int getCacheHits(CacheType cacheType) { + return 0; + } - public int getCacheMisses(CacheType cacheType) { - return cacheMisses[cacheType.ordinal()].get(); - } + @Override + public int getCacheMisses(CacheType cacheType) { + return 0; + } - public int getCacheBypasses(CacheType cacheType) { - return cacheBypasses[cacheType.ordinal()].get(); - } + @Override + public int getCacheBypasses(CacheType cacheType) { + return 0; + } + }; public interface ScanScope extends AutoCloseable { @Override @@ -113,7 +84,7 @@ public interface ScanScope extends AutoCloseable { public static ScanScope enable(Span span) { if (span.isRecording()) { - INSTRUMENTED_SCANS.set(new ScanInstrumentation()); + INSTRUMENTED_SCANS.set(new ScanInstrumentationImpl()); var id = Thread.currentThread().getId(); return () -> { Preconditions.checkState(id == Thread.currentThread().getId()); @@ -125,6 +96,50 @@ public static ScanScope enable(Span span) { } public static ScanInstrumentation get() { - return INSTRUMENTED_SCANS.get(); + var si = INSTRUMENTED_SCANS.get(); + if (si == null) { + return NOOP_SI; + } + return si; } + + public 
abstract boolean enabled(); + + /** + * Increments the raw bytes read directly from DFS by a scan. + * + * @param amount the amount of bytes read + */ + public abstract void incrementFileBytesRead(long amount); + + /** + * Increments the uncompressed and decrypted bytes read by a scan. This will include all + * uncompressed data read by a scan regardless of if the underlying data came from cache or DFS. + */ + public abstract void incrementUncompressedBytesRead(long amount); + + /** + * Increments the count of rfile blocks that were not already in the cache. + */ + public abstract void incrementCacheMiss(CacheType cacheType); + + /** + * Increments the count of rfile blocks that were already in the cache. + */ + public abstract void incrementCacheHit(CacheType cacheType); + + /** + * Increments the count of rfile blocks that were directly read from DFS bypassing the cache. + */ + public abstract void incrementCacheBypass(CacheType cacheType); + + public abstract long getFileBytesRead(); + + public abstract long getUncompressedBytesRead(); + + public abstract int getCacheHits(CacheType cacheType); + + public abstract int getCacheMisses(CacheType cacheType); + + public abstract int getCacheBypasses(CacheType cacheType); } diff --git a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java new file mode 100644 index 00000000000..5d6857a0274 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.trace; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.spi.cache.CacheType; + +class ScanInstrumentationImpl extends ScanInstrumentation { + private final AtomicLong fileBytesRead = new AtomicLong(); + private final AtomicLong uncompressedBytesRead = new AtomicLong(); + private final AtomicInteger[] cacheHits = new AtomicInteger[CacheType.values().length]; + private final AtomicInteger[] cacheMisses = new AtomicInteger[CacheType.values().length]; + private final AtomicInteger[] cacheBypasses = new AtomicInteger[CacheType.values().length]; + + ScanInstrumentationImpl() { + for (int i = 0; i < CacheType.values().length; i++) { + cacheHits[i] = new AtomicInteger(); + cacheMisses[i] = new AtomicInteger(); + cacheBypasses[i] = new AtomicInteger(); + } + } + + @Override + public boolean enabled() { + return true; + } + + @Override + public void incrementFileBytesRead(long amount) { + fileBytesRead.addAndGet(amount); + } + + // TODO should it be an option to cache compressed data? 
+ @Override + public void incrementUncompressedBytesRead(long amount) { + uncompressedBytesRead.addAndGet(amount); + } + + @Override + public void incrementCacheMiss(CacheType cacheType) { + cacheMisses[cacheType.ordinal()].incrementAndGet(); + } + + @Override + public void incrementCacheHit(CacheType cacheType) { + cacheHits[cacheType.ordinal()].incrementAndGet(); + } + + @Override + public void incrementCacheBypass(CacheType cacheType) { + cacheBypasses[cacheType.ordinal()].incrementAndGet(); + } + + @Override + public long getFileBytesRead() { + return fileBytesRead.get(); + } + + @Override + public long getUncompressedBytesRead() { + return uncompressedBytesRead.get(); + } + + @Override + public int getCacheHits(CacheType cacheType) { + return cacheHits[cacheType.ordinal()].get(); + } + + @Override + public int getCacheMisses(CacheType cacheType) { + return cacheMisses[cacheType.ordinal()].get(); + } + + @Override + public int getCacheBypasses(CacheType cacheType) { + return cacheBypasses[cacheType.ordinal()].get(); + } + +} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index 4704a4c828f..65b8eee97a6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -290,16 +290,14 @@ void recordScanTrace(Span span, List batch, ScanParameters scanParamete span.setAttribute(TABLE_ID_KEY, getExtent().tableId().canonical()); span.setAttribute(EXTENT_KEY, getExtent().toString()); var si = ScanInstrumentation.get(); - if (si != null) { - span.setAttribute(BYTES_READ_FILE_KEY, si.getFileBytesRead()); - span.setAttribute(BYTES_READ_KEY, si.getUncompressedBytesRead()); - span.setAttribute(INDEX_HITS_KEY, si.getCacheHits(CacheType.INDEX)); - span.setAttribute(INDEX_MISSES_KEY, si.getCacheMisses(CacheType.INDEX)); - 
span.setAttribute(INDEX_BYPASSES_KEY, si.getCacheBypasses(CacheType.INDEX)); - span.setAttribute(DATA_HITS_KEY, si.getCacheHits(CacheType.DATA)); - span.setAttribute(DATA_MISSES_KEY, si.getCacheMisses(CacheType.DATA)); - span.setAttribute(DATA_BYPASSES_KEY, si.getCacheBypasses(CacheType.DATA)); - } + span.setAttribute(BYTES_READ_FILE_KEY, si.getFileBytesRead()); + span.setAttribute(BYTES_READ_KEY, si.getUncompressedBytesRead()); + span.setAttribute(INDEX_HITS_KEY, si.getCacheHits(CacheType.INDEX)); + span.setAttribute(INDEX_MISSES_KEY, si.getCacheMisses(CacheType.INDEX)); + span.setAttribute(INDEX_BYPASSES_KEY, si.getCacheBypasses(CacheType.INDEX)); + span.setAttribute(DATA_HITS_KEY, si.getCacheHits(CacheType.DATA)); + span.setAttribute(DATA_MISSES_KEY, si.getCacheMisses(CacheType.DATA)); + span.setAttribute(DATA_BYPASSES_KEY, si.getCacheBypasses(CacheType.DATA)); span.setAttribute(SERVER_KEY, server.getAdvertiseAddress().toString()); dataSource.setAttributes(span); From 059ea2c6a25163b730072619cc8dae96ec6a8644 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 17:45:19 -0500 Subject: [PATCH 19/23] Update test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java Co-authored-by: Dave Marion --- .../java/org/apache/accumulo/test/tracing/ScanTracingIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java index be3fc33f7d3..723e784bd41 100644 --- a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java @@ -48,7 +48,7 @@ class ScanTracingIT extends ConfigurableMacBase { - private static int OTLP_PORT = 12345; + private static int OTLP_PORT = PortUtils.getRandomFreePort(); private static List getJvmArgs() { String javaAgent = null; From 48c4a87f0ab0259cfa0935d7d52c6af69b4bbbe4 Mon Sep 17 
00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 22:47:27 +0000 Subject: [PATCH 20/23] add import --- .../java/org/apache/accumulo/test/tracing/ScanTracingIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java index 723e784bd41..eccbd852e5f 100644 --- a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.util.PortUtils; import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.hadoop.conf.Configuration; From 7ae602ac3669590b0eb6a4f170d9c01368b3a4ab Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 23:28:42 +0000 Subject: [PATCH 21/23] share constants --- .../accumulo/core/trace/TraceAttributes.java | 53 +++++++++++++++++ .../tserver/tablet/ScanDataSource.java | 10 +--- .../accumulo/tserver/tablet/TabletBase.java | 58 +++++-------------- .../accumulo/test/tracing/ScanTracingIT.java | 45 +++++++------- 4 files changed, 96 insertions(+), 70 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java diff --git a/core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java b/core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java new file mode 100644 index 00000000000..e2c5b078b79 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.trace; + +import io.opentelemetry.api.common.AttributeKey; + +public class TraceAttributes { + public static final AttributeKey ENTRIES_RETURNED_KEY = + AttributeKey.longKey("accumulo.scan.entries.returned"); + public static final AttributeKey BYTES_RETURNED_KEY = + AttributeKey.longKey("accumulo.scan.bytes.returned"); + public static final AttributeKey BYTES_READ_KEY = + AttributeKey.longKey("accumulo.scan.bytes.read"); + public static final AttributeKey BYTES_READ_FILE_KEY = + AttributeKey.longKey("accumulo.scan.bytes.read.file"); + public static final AttributeKey EXECUTOR_KEY = + AttributeKey.stringKey("accumulo.scan.executor"); + public static final AttributeKey TABLE_ID_KEY = + AttributeKey.stringKey("accumulo.table.id"); + public static final AttributeKey EXTENT_KEY = AttributeKey.stringKey("accumulo.extent"); + public static final AttributeKey INDEX_HITS_KEY = + AttributeKey.longKey("accumulo.scan.cache.index.hits"); + public static final AttributeKey INDEX_MISSES_KEY = + AttributeKey.longKey("accumulo.scan.cache.index.misses"); + public static final AttributeKey INDEX_BYPASSES_KEY = + AttributeKey.longKey("accumulo.scan.cache.index.bypasses"); + public static final AttributeKey DATA_HITS_KEY = + 
AttributeKey.longKey("accumulo.scan.cache.data.hits"); + public static final AttributeKey DATA_MISSES_KEY = + AttributeKey.longKey("accumulo.scan.cache.data.misses"); + public static final AttributeKey DATA_BYPASSES_KEY = + AttributeKey.longKey("accumulo.scan.cache.data.bypasses"); + public static final AttributeKey SERVER_KEY = AttributeKey.stringKey("accumulo.server"); + public static final AttributeKey ENTRIES_READ_KEY = + AttributeKey.longKey("accumulo.scan.entries.read"); + public static final AttributeKey SEEKS_KEY = AttributeKey.longKey("accumulo.scan.seeks"); +} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index c5def6269ee..1e6c408ee5d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.accumulo.core.trace.TraceAttributes; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.conf.TableConfiguration.ParsedIteratorConfig; import org.apache.accumulo.server.fs.FileManager.ScanFileManager; @@ -59,7 +60,6 @@ import com.google.common.base.Preconditions; -import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Span; class ScanDataSource implements DataSource { @@ -318,15 +318,11 @@ public void close(boolean sawErrors) { } } - private static final AttributeKey ENTRIES_READ_KEY = - AttributeKey.longKey("accumulo.scan.entries.read"); - private static final AttributeKey SEEKS_KEY = AttributeKey.longKey("accumulo.scan.seeks"); - public void setAttributes(Span span) { if (statsIterator != null && span.isRecording()) { 
statsIterator.report(true); - span.setAttribute(ENTRIES_READ_KEY, scanCounter.get()); - span.setAttribute(SEEKS_KEY, scanSeekCounter.get()); + span.setAttribute(TraceAttributes.ENTRIES_READ_KEY, scanCounter.get()); + span.setAttribute(TraceAttributes.SEEKS_KEY, scanSeekCounter.get()); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index 65b8eee97a6..5ae0df785e4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -51,6 +51,7 @@ import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.spi.cache.CacheType; import org.apache.accumulo.core.trace.ScanInstrumentation; +import org.apache.accumulo.core.trace.TraceAttributes; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.Pair; @@ -66,7 +67,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Span; /** @@ -248,57 +248,31 @@ public Tablet.LookupResult lookup(List ranges, List results, } } - private static final AttributeKey ENTRIES_RETURNED_KEY = - AttributeKey.longKey("accumulo.scan.entries.returned"); - private static final AttributeKey BYTES_RETURNED_KEY = - AttributeKey.longKey("accumulo.scan.bytes.returned"); - private static final AttributeKey BYTES_READ_KEY = - AttributeKey.longKey("accumulo.scan.bytes.read"); - private static final AttributeKey BYTES_READ_FILE_KEY = - AttributeKey.longKey("accumulo.scan.bytes.read.file"); - private static final AttributeKey EXECUTOR_KEY = - AttributeKey.stringKey("accumulo.scan.executor"); - private static final AttributeKey TABLE_ID_KEY = - AttributeKey.stringKey("accumulo.table.id"); - private static final 
AttributeKey EXTENT_KEY = AttributeKey.stringKey("accumulo.extent"); - private static final AttributeKey INDEX_HITS_KEY = - AttributeKey.longKey("accumulo.scan.cache.index.hits"); - private static final AttributeKey INDEX_MISSES_KEY = - AttributeKey.longKey("accumulo.scan.cache.index.misses"); - private static final AttributeKey INDEX_BYPASSES_KEY = - AttributeKey.longKey("accumulo.scan.cache.index.bypasses"); - private static final AttributeKey DATA_HITS_KEY = - AttributeKey.longKey("accumulo.scan.cache.data.hits"); - private static final AttributeKey DATA_MISSES_KEY = - AttributeKey.longKey("accumulo.scan.cache.data.misses"); - private static final AttributeKey DATA_BYPASSES_KEY = - AttributeKey.longKey("accumulo.scan.cache.data.bypasses");; - private static final AttributeKey SERVER_KEY = AttributeKey.stringKey("accumulo.server"); - void recordScanTrace(Span span, List batch, ScanParameters scanParameters, ScanDataSource dataSource) { if (span.isRecording()) { // TODO in testing could not get really large batches, even when increasing table and // client settings - span.setAttribute(ENTRIES_RETURNED_KEY, batch.size()); + span.setAttribute(TraceAttributes.ENTRIES_RETURNED_KEY, batch.size()); long bytesReturned = 0; for (var e : batch) { bytesReturned += e.getKey().getLength() + e.getValue().get().length; } - span.setAttribute(BYTES_RETURNED_KEY, bytesReturned); - span.setAttribute(EXECUTOR_KEY, scanParameters.getScanDispatch().getExecutorName()); - span.setAttribute(TABLE_ID_KEY, getExtent().tableId().canonical()); - span.setAttribute(EXTENT_KEY, getExtent().toString()); + span.setAttribute(TraceAttributes.BYTES_RETURNED_KEY, bytesReturned); + span.setAttribute(TraceAttributes.EXECUTOR_KEY, + scanParameters.getScanDispatch().getExecutorName()); + span.setAttribute(TraceAttributes.TABLE_ID_KEY, getExtent().tableId().canonical()); + span.setAttribute(TraceAttributes.EXTENT_KEY, getExtent().toString()); var si = ScanInstrumentation.get(); - 
span.setAttribute(BYTES_READ_FILE_KEY, si.getFileBytesRead()); - span.setAttribute(BYTES_READ_KEY, si.getUncompressedBytesRead()); - span.setAttribute(INDEX_HITS_KEY, si.getCacheHits(CacheType.INDEX)); - span.setAttribute(INDEX_MISSES_KEY, si.getCacheMisses(CacheType.INDEX)); - span.setAttribute(INDEX_BYPASSES_KEY, si.getCacheBypasses(CacheType.INDEX)); - span.setAttribute(DATA_HITS_KEY, si.getCacheHits(CacheType.DATA)); - span.setAttribute(DATA_MISSES_KEY, si.getCacheMisses(CacheType.DATA)); - span.setAttribute(DATA_BYPASSES_KEY, si.getCacheBypasses(CacheType.DATA)); - span.setAttribute(SERVER_KEY, server.getAdvertiseAddress().toString()); + span.setAttribute(TraceAttributes.BYTES_READ_FILE_KEY, si.getFileBytesRead()); + span.setAttribute(TraceAttributes.BYTES_READ_KEY, si.getUncompressedBytesRead()); + span.setAttribute(TraceAttributes.INDEX_HITS_KEY, si.getCacheHits(CacheType.INDEX)); + span.setAttribute(TraceAttributes.INDEX_MISSES_KEY, si.getCacheMisses(CacheType.INDEX)); + span.setAttribute(TraceAttributes.INDEX_BYPASSES_KEY, si.getCacheBypasses(CacheType.INDEX)); + span.setAttribute(TraceAttributes.DATA_HITS_KEY, si.getCacheHits(CacheType.DATA)); + span.setAttribute(TraceAttributes.DATA_MISSES_KEY, si.getCacheMisses(CacheType.DATA)); + span.setAttribute(TraceAttributes.DATA_BYPASSES_KEY, si.getCacheBypasses(CacheType.DATA)); + span.setAttribute(TraceAttributes.SERVER_KEY, server.getAdvertiseAddress().toString()); dataSource.setAttributes(span); } diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java index eccbd852e5f..b0e32bcce29 100644 --- a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.test.tracing; +import static org.apache.accumulo.core.trace.TraceAttributes.EXECUTOR_KEY; +import static 
org.apache.accumulo.core.trace.TraceAttributes.EXTENT_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -35,6 +37,7 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.trace.TraceAttributes; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.util.PortUtils; import org.apache.accumulo.test.TestIngest; @@ -49,7 +52,7 @@ class ScanTracingIT extends ConfigurableMacBase { - private static int OTLP_PORT = PortUtils.getRandomFreePort(); + private static final int OTLP_PORT = PortUtils.getRandomFreePort(); private static List getJvmArgs() { String javaAgent = null; @@ -162,26 +165,27 @@ private void runTest(String tableName, int numSplits, boolean cacheData, || batchScanStats.getEntriesReturned() < expectedRows * expectedColumns) { var span = collector.take(); var stats = ScanTraceStats.create(span); - if (stats != null && span.stringAttributes.get("accumulo.table.id").equals(tableId) + if (stats != null + && span.stringAttributes.get(TraceAttributes.TABLE_ID_KEY.getKey()).equals(tableId) && (results.traceId1.equals(span.traceId) || results.traceId2.equals(span.traceId))) { if (stats.isBatchScan()) { - assertEquals("pool1", span.stringAttributes.get("accumulo.scan.executor")); + assertEquals("pool1", span.stringAttributes.get(EXECUTOR_KEY.getKey())); } else { - assertEquals("default", span.stringAttributes.get("accumulo.scan.executor")); + assertEquals("default", span.stringAttributes.get(EXECUTOR_KEY.getKey())); } if (numSplits == 0) { - assertEquals(tableId + "<<", span.stringAttributes.get("accumulo.extent")); + assertEquals(tableId + "<<", span.stringAttributes.get(EXTENT_KEY.getKey())); } else { - var extent = span.stringAttributes.get("accumulo.extent"); + var extent = 
span.stringAttributes.get(EXTENT_KEY.getKey()); assertTrue(extent.startsWith(tableId + ";") || extent.startsWith(tableId + "<")); } assertEquals(1, stats.getSeeks()); if (stats.isBatchScan()) { assertEquals(results.traceId1, span.traceId); - extents1.add(span.stringAttributes.get("accumulo.extent")); + extents1.add(span.stringAttributes.get(EXTENT_KEY.getKey())); } else { assertEquals(results.traceId2, span.traceId); - extents2.add(span.stringAttributes.get("accumulo.extent")); + extents2.add(span.stringAttributes.get(EXTENT_KEY.getKey())); } } else { continue; @@ -302,52 +306,51 @@ boolean isBatchScan() { } long getEntriesRead() { - return scanStats.getOrDefault("accumulo.scan.entries.read", 0L); + return scanStats.getOrDefault(TraceAttributes.ENTRIES_READ_KEY.getKey(), 0L); } long getEntriesReturned() { - return scanStats.getOrDefault("accumulo.scan.entries.returned", 0L); - + return scanStats.getOrDefault(TraceAttributes.ENTRIES_RETURNED_KEY.getKey(), 0L); } long getFileBytesRead() { - return scanStats.getOrDefault("accumulo.scan.bytes.read.file", 0L); + return scanStats.getOrDefault(TraceAttributes.BYTES_READ_FILE_KEY.getKey(), 0L); } long getBytesRead() { - return scanStats.getOrDefault("accumulo.scan.bytes.read", 0L); + return scanStats.getOrDefault(TraceAttributes.BYTES_READ_KEY.getKey(), 0L); } long getBytesReturned() { - return scanStats.getOrDefault("accumulo.scan.bytes.returned", 0L); + return scanStats.getOrDefault(TraceAttributes.BYTES_RETURNED_KEY.getKey(), 0L); } long getDataCacheHits() { - return scanStats.getOrDefault("accumulo.scan.cache.data.hits", 0L); + return scanStats.getOrDefault(TraceAttributes.DATA_HITS_KEY.getKey(), 0L); } long getDataCacheMisses() { - return scanStats.getOrDefault("accumulo.scan.cache.data.misses", 0L); + return scanStats.getOrDefault(TraceAttributes.DATA_MISSES_KEY.getKey(), 0L); } long getDataCacheBypasses() { - return scanStats.getOrDefault("accumulo.scan.cache.data.bypasses", 0L); + return 
scanStats.getOrDefault(TraceAttributes.DATA_BYPASSES_KEY.getKey(), 0L); } long getIndexCacheHits() { - return scanStats.getOrDefault("accumulo.scan.cache.index.hits", 0L); + return scanStats.getOrDefault(TraceAttributes.INDEX_HITS_KEY.getKey(), 0L); } long getIndexCacheMisses() { - return scanStats.getOrDefault("accumulo.scan.cache.index.misses", 0L); + return scanStats.getOrDefault(TraceAttributes.INDEX_MISSES_KEY.getKey(), 0L); } long getIndexCacheBypasses() { - return scanStats.getOrDefault("accumulo.scan.cache.index.bypasses", 0L); + return scanStats.getOrDefault(TraceAttributes.INDEX_BYPASSES_KEY.getKey(), 0L); } long getSeeks() { - return scanStats.getOrDefault("accumulo.scan.seeks", 0L); + return scanStats.getOrDefault(TraceAttributes.SEEKS_KEY.getKey(), 0L); } @Override From 026d8a312eadfde50b8a0fc9ebcbb4565a1ec3ec Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 15 Dec 2025 23:40:34 +0000 Subject: [PATCH 22/23] test server span attribute --- .../apache/accumulo/test/tracing/ScanTracingIT.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java index b0e32bcce29..1d4e70d3024 100644 --- a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java @@ -32,11 +32,14 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.trace.TraceAttributes; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import 
org.apache.accumulo.server.util.PortUtils; @@ -156,6 +159,10 @@ private void runTest(String tableName, int numSplits, boolean cacheData, var tableId = getServerContext().getTableId(tableName).canonical(); + var expectedServers = getServerContext().getAmple().readTablets().forTable(TableId.of(tableId)) + .fetch(TabletMetadata.ColumnType.LOCATION).build().stream() + .map(tm -> tm.getLocation().getHostAndPort().toString()).collect(Collectors.toSet()); + ScanTraceStats scanStats = new ScanTraceStats(false); ScanTraceStats batchScanStats = new ScanTraceStats(true); Set extents1 = new TreeSet<>(); @@ -168,6 +175,10 @@ private void runTest(String tableName, int numSplits, boolean cacheData, if (stats != null && span.stringAttributes.get(TraceAttributes.TABLE_ID_KEY.getKey()).equals(tableId) && (results.traceId1.equals(span.traceId) || results.traceId2.equals(span.traceId))) { + assertTrue( + expectedServers + .contains(span.stringAttributes.get(TraceAttributes.SERVER_KEY.getKey())), + () -> expectedServers + " " + span); if (stats.isBatchScan()) { assertEquals("pool1", span.stringAttributes.get(EXECUTOR_KEY.getKey())); } else { From 44e68a10c8dfba4bb6a84796ea08353424bfa69b Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 16 Dec 2025 19:39:16 +0000 Subject: [PATCH 23/23] removed TODOs after opening issues --- .../apache/accumulo/core/trace/ScanInstrumentationImpl.java | 1 - .../java/org/apache/accumulo/tserver/tablet/TabletBase.java | 2 -- .../java/org/apache/accumulo/test/tracing/ScanTracingIT.java | 3 --- 3 files changed, 6 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java index 5d6857a0274..67754ee738d 100644 --- a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java @@ -48,7 +48,6 @@ public void 
incrementFileBytesRead(long amount) { fileBytesRead.addAndGet(amount); } - // TODO should it be an option to cache compressed data? @Override public void incrementUncompressedBytesRead(long amount) { uncompressedBytesRead.addAndGet(amount); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index 5ae0df785e4..0ac8f6bd141 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -251,8 +251,6 @@ public Tablet.LookupResult lookup(List ranges, List results, void recordScanTrace(Span span, List batch, ScanParameters scanParameters, ScanDataSource dataSource) { if (span.isRecording()) { - // TODO in testing could not get really large batches, even when increasing table and - // client settings span.setAttribute(TraceAttributes.ENTRIES_RETURNED_KEY, batch.size()); long bytesReturned = 0; for (var e : batch) { diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java index 1d4e70d3024..8d0fe70bba0 100644 --- a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java @@ -246,9 +246,6 @@ private void runTest(String tableName, int numSplits, boolean cacheData, assertEquals(0, stats.getDataCacheHits(), stats::toString); assertEquals(0, stats.getDataCacheMisses(), stats::toString); assertTrue(stats.getDataCacheBypasses() > stats.getSeeks(), stats::toString); - // When not caching data, will go to the index cache each time a block location is looked - // up. TODO why is this happening? keeps getting the RootData metablock for every data - // block. 
assertClose(stats.getDataCacheBypasses(), stats.getIndexCacheHits(), .05); } assertEquals(0, stats.getIndexCacheBypasses(), stats::toString);