diff --git a/core/pom.xml b/core/pom.xml index 9710b0afc0d..78945cc5dd2 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -207,6 +207,7 @@ ${accumulo.build.license.header} src/main/java/org/apache/accumulo/core/bloomfilter/*.java + src/main/java/org/apache/accumulo/core/util/CountingInputStream.java src/main/java/org/apache/accumulo/core/util/HostAndPort.java src/test/resources/*.jceks src/test/resources/org/apache/accumulo/core/file/rfile/*.rf diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java new file mode 100644 index 00000000000..53c2ce9e7a3 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheUtil.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.file.blockfile.cache; + +import org.apache.accumulo.core.logging.LoggingBlockCache; +import org.apache.accumulo.core.spi.cache.BlockCache; +import org.apache.accumulo.core.spi.cache.CacheType; + +public class BlockCacheUtil { + public static BlockCache instrument(CacheType type, BlockCache cache) { + if (cache == null) { + return null; + } + + if (cache instanceof InstrumentedBlockCache || cache instanceof LoggingBlockCache) { + // its already instrumented + return cache; + } + + return LoggingBlockCache.wrap(type, InstrumentedBlockCache.wrap(type, cache)); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java new file mode 100644 index 00000000000..402136c208e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/InstrumentedBlockCache.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.file.blockfile.cache; + +import java.util.Map; + +import org.apache.accumulo.core.spi.cache.BlockCache; +import org.apache.accumulo.core.spi.cache.CacheEntry; +import org.apache.accumulo.core.spi.cache.CacheType; +import org.apache.accumulo.core.trace.ScanInstrumentation; + +public class InstrumentedBlockCache implements BlockCache { + + private final BlockCache blockCache; + private final ScanInstrumentation scanInstrumentation; + private final CacheType cacheType; + + public InstrumentedBlockCache(CacheType cacheType, BlockCache blockCache, + ScanInstrumentation scanInstrumentation) { + this.blockCache = blockCache; + this.scanInstrumentation = scanInstrumentation; + this.cacheType = cacheType; + } + + @Override + public CacheEntry cacheBlock(String blockName, byte[] buf) { + return blockCache.cacheBlock(blockName, buf); + } + + @Override + public CacheEntry getBlock(String blockName) { + return blockCache.getBlock(blockName); + } + + private final class CountingLoader implements Loader { + + private final Loader loader; + int loadCount = 0; + + private CountingLoader(Loader loader) { + this.loader = loader; + } + + @Override + public Map getDependencies() { + return loader.getDependencies(); + } + + @Override + public byte[] load(int maxSize, Map dependencies) { + loadCount++; + return loader.load(maxSize, dependencies); + } + } + + @Override + public CacheEntry getBlock(String blockName, Loader loader) { + var cl = new CountingLoader(loader); + var ce = blockCache.getBlock(blockName, cl); + if (cl.loadCount == 0 && ce != null) { + scanInstrumentation.incrementCacheHit(cacheType); + } else { + scanInstrumentation.incrementCacheMiss(cacheType); + } + return ce; + } + + @Override + public long getMaxHeapSize() { + return blockCache.getMaxHeapSize(); + } + + @Override + public long getMaxSize() { + return blockCache.getMaxSize(); + } + + @Override + public Stats getStats() { + return blockCache.getStats(); + } + + public 
static BlockCache wrap(CacheType cacheType, BlockCache cache) { + var si = ScanInstrumentation.get(); + if (cache != null && si.enabled()) { + return new InstrumentedBlockCache(cacheType, cache, si); + } else { + return cache; + } + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index 854c74154b5..f694740d964 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -38,7 +38,10 @@ import org.apache.accumulo.core.spi.cache.BlockCache; import org.apache.accumulo.core.spi.cache.BlockCache.Loader; import org.apache.accumulo.core.spi.cache.CacheEntry; +import org.apache.accumulo.core.spi.cache.CacheType; import org.apache.accumulo.core.spi.crypto.CryptoService; +import org.apache.accumulo.core.trace.ScanInstrumentation; +import org.apache.accumulo.core.util.CountingInputStream; import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -405,6 +408,7 @@ public CachedBlockRead getMetaBlock(String blockName) throws IOException { } BlockReader _currBlock = getBCFile(null).getMetaBlock(blockName); + incrementCacheBypass(CacheType.INDEX); return new CachedBlockRead(_currBlock); } @@ -421,6 +425,7 @@ public CachedBlockRead getMetaBlock(long offset, long compressedSize, long rawSi } BlockReader _currBlock = getBCFile(null).getDataBlock(offset, compressedSize, rawSize); + incrementCacheBypass(CacheType.INDEX); return new CachedBlockRead(_currBlock); } @@ -443,6 +448,7 @@ public CachedBlockRead getDataBlock(int blockIndex) throws IOException { } BlockReader _currBlock = getBCFile().getDataBlock(blockIndex); + incrementCacheBypass(CacheType.DATA); return new CachedBlockRead(_currBlock); } @@ -459,9 +465,14 
@@ public CachedBlockRead getDataBlock(long offset, long compressedSize, long rawSi } BlockReader _currBlock = getBCFile().getDataBlock(offset, compressedSize, rawSize); + incrementCacheBypass(CacheType.DATA); return new CachedBlockRead(_currBlock); } + private void incrementCacheBypass(CacheType cacheType) { + ScanInstrumentation.get().incrementCacheBypass(cacheType); + } + @Override public synchronized void close() throws IOException { if (closed) { @@ -491,12 +502,22 @@ public void setCacheProvider(CacheProvider cacheProvider) { } public static class CachedBlockRead extends DataInputStream { + + private static InputStream wrapForTrace(InputStream inputStream) { + var scanInstrumentation = ScanInstrumentation.get(); + if (scanInstrumentation.enabled()) { + return new CountingInputStream(inputStream); + } else { + return inputStream; + } + } + private final SeekableByteArrayInputStream seekableInput; private final CacheEntry cb; boolean indexable; public CachedBlockRead(InputStream in) { - super(in); + super(wrapForTrace(in)); cb = null; seekableInput = null; indexable = false; @@ -507,7 +528,7 @@ public CachedBlockRead(CacheEntry cb, byte[] buf) { } private CachedBlockRead(SeekableByteArrayInputStream seekableInput, CacheEntry cb) { - super(seekableInput); + super(wrapForTrace(seekableInput)); this.seekableInput = seekableInput; this.cb = cb; indexable = true; @@ -536,5 +557,25 @@ public BlockIndex getIndex(Supplier indexSupplier) { public void indexWeightChanged() { cb.indexWeightChanged(); } + + public void flushStats() { + if (in instanceof CountingInputStream) { + var cin = ((CountingInputStream) in); + ScanInstrumentation.get().incrementUncompressedBytesRead(cin.getCount()); + cin.resetCount(); + var src = cin.getWrappedStream(); + if (src instanceof BlockReader) { + var br = (BlockReader) src; + br.flushStats(); + } + + } + } + + @Override + public void close() throws IOException { + flushStats(); + super.close(); + } } } diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java index 78b8600b378..155b8743264 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/ScanCacheProvider.java @@ -20,7 +20,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.logging.LoggingBlockCache; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheUtil; import org.apache.accumulo.core.spi.cache.BlockCache; import org.apache.accumulo.core.spi.cache.CacheType; import org.apache.accumulo.core.spi.scan.ScanDispatch; @@ -33,8 +33,8 @@ public class ScanCacheProvider implements CacheProvider { public ScanCacheProvider(AccumuloConfiguration tableConfig, ScanDispatch dispatch, BlockCache indexCache, BlockCache dataCache) { - var loggingIndexCache = LoggingBlockCache.wrap(CacheType.INDEX, indexCache); - var loggingDataCache = LoggingBlockCache.wrap(CacheType.DATA, dataCache); + var loggingIndexCache = BlockCacheUtil.instrument(CacheType.INDEX, indexCache); + var loggingDataCache = BlockCacheUtil.instrument(CacheType.DATA, dataCache); switch (dispatch.getIndexCacheUsage()) { case ENABLED: diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index 68e2be016d1..9e2d4633c26 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@ -838,7 +838,6 @@ public void next() throws IOException { } private void _next() throws IOException { - if (!hasTop) { throw new IllegalStateException(); } @@ -1164,6 +1163,12 @@ public void setCacheProvider(CacheProvider cacheProvider) { public long 
estimateOverlappingEntries(KeyExtent extent) throws IOException { throw new UnsupportedOperationException(); } + + public void flushStats() { + if (currBlock != null) { + currBlock.flushStats(); + } + } } public static class Reader extends HeapIterator implements FileSKVIterator { @@ -1304,6 +1309,9 @@ private void closeLocalityGroupReaders(boolean ignoreIOExceptions) throws IOExce @Override public void closeDeepCopies() throws IOException { + for (LocalityGroupReader lgr : currentReaders) { + lgr.flushStats(); + } closeDeepCopies(false); } diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java index aca53e495c3..31f32d64ae1 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java @@ -47,6 +47,8 @@ import org.apache.accumulo.core.spi.crypto.FileEncrypter; import org.apache.accumulo.core.spi.crypto.NoFileDecrypter; import org.apache.accumulo.core.spi.crypto.NoFileEncrypter; +import org.apache.accumulo.core.trace.ScanInstrumentation; +import org.apache.accumulo.core.util.CountingInputStream; import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -468,6 +470,7 @@ private static final class RBlockState { private final CompressionAlgorithm compressAlgo; private Decompressor decompressor; private final BlockRegion region; + private final InputStream rawInputStream; private final InputStream in; private volatile boolean closed; @@ -481,9 +484,14 @@ public RBlockState( BoundedRangeFileInputStream boundedRangeFileInputStream = new BoundedRangeFileInputStream( fsin, this.region.getOffset(), this.region.getCompressedSize()); + if (ScanInstrumentation.get().enabled()) { + rawInputStream = new CountingInputStream(boundedRangeFileInputStream); + } else { + rawInputStream = 
boundedRangeFileInputStream; + } + try { - InputStream inputStreamToBeCompressed = - decrypter.decryptStream(boundedRangeFileInputStream); + InputStream inputStreamToBeCompressed = decrypter.decryptStream(rawInputStream); this.in = compressAlgo.createDecompressionStream(inputStreamToBeCompressed, decompressor, getFSInputBufferSize(conf)); } catch (IOException e) { @@ -506,11 +514,20 @@ public BlockRegion getBlockRegion() { return region; } + public void flushStats() { + if (rawInputStream instanceof CountingInputStream) { + var ci = (CountingInputStream) rawInputStream; + ScanInstrumentation.get().incrementFileBytesRead(ci.getCount()); + ci.resetCount(); + } + } + public void finish() throws IOException { synchronized (in) { if (!closed) { try { in.close(); + flushStats(); } finally { closed = true; if (decompressor != null) { @@ -538,6 +555,10 @@ public static class BlockReader extends DataInputStream { rBlkState = rbs; } + public void flushStats() { + rBlkState.flushStats(); + } + /** * Finishing reading the block. Release all resources. 
*/ diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java index 47a08819731..a2f8da5975f 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java @@ -19,7 +19,10 @@ package org.apache.accumulo.core.iteratorsImpl.system; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; @@ -34,15 +37,21 @@ public class StatsIterator extends ServerWrappingIterator { private int numRead = 0; - private final AtomicLong seekCounter; + private final AtomicLong scanSeekCounter; + private final LongAdder serverSeekCounter; private final AtomicLong scanCounter; + private final LongAdder tabletScanCounter; private final LongAdder serverScanCounter; + private final List deepCopies = Collections.synchronizedList(new ArrayList<>()); - public StatsIterator(SortedKeyValueIterator source, AtomicLong seekCounter, - AtomicLong tabletScanCounter, LongAdder serverScanCounter) { + public StatsIterator(SortedKeyValueIterator source, AtomicLong scanSeekCounter, + LongAdder serverSeekCounter, AtomicLong scanCounter, LongAdder tabletScanCounter, + LongAdder serverScanCounter) { super(source); - this.seekCounter = seekCounter; - this.scanCounter = tabletScanCounter; + this.scanSeekCounter = scanSeekCounter; + this.serverSeekCounter = serverSeekCounter; + this.scanCounter = scanCounter; + this.tabletScanCounter = tabletScanCounter; this.serverScanCounter = serverScanCounter; } @@ -51,31 +60,43 @@ public void next() throws IOException { source.next(); numRead++; - if (numRead % 23 == 0) { - scanCounter.addAndGet(numRead); - serverScanCounter.add(numRead); - numRead = 0; + if 
(numRead % 1009 == 0) { + // only report on self, do not force deep copies to report + report(false); } } @Override public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { - return new StatsIterator(source.deepCopy(env), seekCounter, scanCounter, serverScanCounter); + var deepCopy = new StatsIterator(source.deepCopy(env), scanSeekCounter, serverSeekCounter, + scanCounter, tabletScanCounter, serverScanCounter); + deepCopies.add(deepCopy); + return deepCopy; } @Override public void seek(Range range, Collection columnFamilies, boolean inclusive) throws IOException { source.seek(range, columnFamilies, inclusive); - seekCounter.incrementAndGet(); - scanCounter.addAndGet(numRead); - serverScanCounter.add(numRead); - numRead = 0; + serverSeekCounter.increment(); + scanSeekCounter.incrementAndGet(); + // only report on self, do not force deep copies to report + report(false); } - public void report() { - scanCounter.addAndGet(numRead); - serverScanCounter.add(numRead); - numRead = 0; + public void report(boolean reportDeepCopies) { + if (numRead > 0) { + scanCounter.addAndGet(numRead); + tabletScanCounter.add(numRead); + serverScanCounter.add(numRead); + numRead = 0; + } + + if (reportDeepCopies) { + // recurse down the fat tree of deep copies forcing them to report + for (var deepCopy : deepCopies) { + deepCopy.report(true); + } + } } } diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java index cce1044e65d..83c06339747 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java @@ -32,12 +32,12 @@ import org.apache.accumulo.core.client.rfile.RFileSource; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.file.NoSuchMetaStoreException; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheUtil; 
import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachableBuilder; import org.apache.accumulo.core.file.rfile.RFile.Reader; import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist; -import org.apache.accumulo.core.logging.LoggingBlockCache; import org.apache.accumulo.core.spi.cache.BlockCache; import org.apache.accumulo.core.spi.cache.CacheEntry; import org.apache.accumulo.core.spi.cache.CacheType; @@ -194,8 +194,8 @@ public static SummaryReader load(FileSystem fs, Configuration conf, SummarizerFa // the reason BCFile is used instead of RFile is to avoid reading in the RFile meta block when // only summary data is wanted. CompositeCache compositeCache = - new CompositeCache(LoggingBlockCache.wrap(CacheType.SUMMARY, summaryCache), - LoggingBlockCache.wrap(CacheType.INDEX, indexCache)); + new CompositeCache(BlockCacheUtil.instrument(CacheType.SUMMARY, summaryCache), + BlockCacheUtil.instrument(CacheType.INDEX, indexCache)); CachableBuilder cb = new CachableBuilder().fsPath(fs, file).conf(conf).fileLen(fileLenCache) .cacheProvider(new BasicCacheProvider(compositeCache, null)).cryptoService(cryptoService); bcReader = new CachableBlockFile.Reader(cb); diff --git a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java new file mode 100644 index 00000000000..8ed962c53a8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentation.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.trace; + +import org.apache.accumulo.core.spi.cache.CacheType; + +import com.google.common.base.Preconditions; + +import io.opentelemetry.api.trace.Span; + +/** + * This class helps collect per scan information for the purposes of tracing. + */ +public abstract class ScanInstrumentation { + private static final ThreadLocal INSTRUMENTED_SCANS = new ThreadLocal<>(); + + private static final ScanInstrumentation NOOP_SI = new ScanInstrumentation() { + @Override + public boolean enabled() { + return false; + } + + @Override + public void incrementFileBytesRead(long amount) {} + + @Override + public void incrementUncompressedBytesRead(long amount) {} + + @Override + public void incrementCacheMiss(CacheType cacheType) {} + + @Override + public void incrementCacheHit(CacheType cacheType) {} + + @Override + public void incrementCacheBypass(CacheType cacheType) {} + + @Override + public long getFileBytesRead() { + return 0; + } + + @Override + public long getUncompressedBytesRead() { + return 0; + } + + @Override + public int getCacheHits(CacheType cacheType) { + return 0; + } + + @Override + public int getCacheMisses(CacheType cacheType) { + return 0; + } + + @Override + public int getCacheBypasses(CacheType cacheType) { + return 0; + } + }; + + public interface ScanScope extends AutoCloseable { + @Override + void close(); + } + + public static ScanScope 
enable(Span span) { + if (span.isRecording()) { + INSTRUMENTED_SCANS.set(new ScanInstrumentationImpl()); + var id = Thread.currentThread().getId(); + return () -> { + Preconditions.checkState(id == Thread.currentThread().getId()); + INSTRUMENTED_SCANS.remove(); + }; + } else { + return () -> {}; + } + } + + public static ScanInstrumentation get() { + var si = INSTRUMENTED_SCANS.get(); + if (si == null) { + return NOOP_SI; + } + return si; + } + + public abstract boolean enabled(); + + /** + * Increments the raw bytes read directly from DFS by a scan. + * + * @param amount the amount of bytes read + */ + public abstract void incrementFileBytesRead(long amount); + + /** + * Increments the uncompressed and decrypted bytes read by a scan. This will include all + * uncompressed data read by a scan regardless of if the underlying data came from cache or DFS. + */ + public abstract void incrementUncompressedBytesRead(long amount); + + /** + * Increments the count of rfile blocks that were not already in the cache. + */ + public abstract void incrementCacheMiss(CacheType cacheType); + + /** + * Increments the count of rfile blocks that were already in the cache. + */ + public abstract void incrementCacheHit(CacheType cacheType); + + /** + * Increments the count of rfile blocks that were directly read from DFS bypassing the cache. 
+ */ + public abstract void incrementCacheBypass(CacheType cacheType); + + public abstract long getFileBytesRead(); + + public abstract long getUncompressedBytesRead(); + + public abstract int getCacheHits(CacheType cacheType); + + public abstract int getCacheMisses(CacheType cacheType); + + public abstract int getCacheBypasses(CacheType cacheType); +} diff --git a/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java new file mode 100644 index 00000000000..67754ee738d --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/trace/ScanInstrumentationImpl.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.trace; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.spi.cache.CacheType; + +class ScanInstrumentationImpl extends ScanInstrumentation { + private final AtomicLong fileBytesRead = new AtomicLong(); + private final AtomicLong uncompressedBytesRead = new AtomicLong(); + private final AtomicInteger[] cacheHits = new AtomicInteger[CacheType.values().length]; + private final AtomicInteger[] cacheMisses = new AtomicInteger[CacheType.values().length]; + private final AtomicInteger[] cacheBypasses = new AtomicInteger[CacheType.values().length]; + + ScanInstrumentationImpl() { + for (int i = 0; i < CacheType.values().length; i++) { + cacheHits[i] = new AtomicInteger(); + cacheMisses[i] = new AtomicInteger(); + cacheBypasses[i] = new AtomicInteger(); + } + } + + @Override + public boolean enabled() { + return true; + } + + @Override + public void incrementFileBytesRead(long amount) { + fileBytesRead.addAndGet(amount); + } + + @Override + public void incrementUncompressedBytesRead(long amount) { + uncompressedBytesRead.addAndGet(amount); + } + + @Override + public void incrementCacheMiss(CacheType cacheType) { + cacheMisses[cacheType.ordinal()].incrementAndGet(); + } + + @Override + public void incrementCacheHit(CacheType cacheType) { + cacheHits[cacheType.ordinal()].incrementAndGet(); + } + + @Override + public void incrementCacheBypass(CacheType cacheType) { + cacheBypasses[cacheType.ordinal()].incrementAndGet(); + } + + @Override + public long getFileBytesRead() { + return fileBytesRead.get(); + } + + @Override + public long getUncompressedBytesRead() { + return uncompressedBytesRead.get(); + } + + @Override + public int getCacheHits(CacheType cacheType) { + return cacheHits[cacheType.ordinal()].get(); + } + + @Override + public int getCacheMisses(CacheType cacheType) { + return cacheMisses[cacheType.ordinal()].get(); + } + + @Override + public 
int getCacheBypasses(CacheType cacheType) { + return cacheBypasses[cacheType.ordinal()].get(); + } + +} diff --git a/core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java b/core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java new file mode 100644 index 00000000000..e2c5b078b79 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/trace/TraceAttributes.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.trace; + +import io.opentelemetry.api.common.AttributeKey; + +public class TraceAttributes { + public static final AttributeKey ENTRIES_RETURNED_KEY = + AttributeKey.longKey("accumulo.scan.entries.returned"); + public static final AttributeKey BYTES_RETURNED_KEY = + AttributeKey.longKey("accumulo.scan.bytes.returned"); + public static final AttributeKey BYTES_READ_KEY = + AttributeKey.longKey("accumulo.scan.bytes.read"); + public static final AttributeKey BYTES_READ_FILE_KEY = + AttributeKey.longKey("accumulo.scan.bytes.read.file"); + public static final AttributeKey EXECUTOR_KEY = + AttributeKey.stringKey("accumulo.scan.executor"); + public static final AttributeKey TABLE_ID_KEY = + AttributeKey.stringKey("accumulo.table.id"); + public static final AttributeKey EXTENT_KEY = AttributeKey.stringKey("accumulo.extent"); + public static final AttributeKey INDEX_HITS_KEY = + AttributeKey.longKey("accumulo.scan.cache.index.hits"); + public static final AttributeKey INDEX_MISSES_KEY = + AttributeKey.longKey("accumulo.scan.cache.index.misses"); + public static final AttributeKey INDEX_BYPASSES_KEY = + AttributeKey.longKey("accumulo.scan.cache.index.bypasses"); + public static final AttributeKey DATA_HITS_KEY = + AttributeKey.longKey("accumulo.scan.cache.data.hits"); + public static final AttributeKey DATA_MISSES_KEY = + AttributeKey.longKey("accumulo.scan.cache.data.misses"); + public static final AttributeKey DATA_BYPASSES_KEY = + AttributeKey.longKey("accumulo.scan.cache.data.bypasses"); + public static final AttributeKey SERVER_KEY = AttributeKey.stringKey("accumulo.server"); + public static final AttributeKey ENTRIES_READ_KEY = + AttributeKey.longKey("accumulo.scan.entries.read"); + public static final AttributeKey SEEKS_KEY = AttributeKey.longKey("accumulo.scan.seeks"); +} diff --git a/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java 
b/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java new file mode 100644 index 00000000000..15d41b18d70 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.accumulo.core.util; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; + +/** + * This class was copied from Guava and modified. If this class was not final in Guava it could have + * been extended. Guava has issue 590 open about this. + * + * An {@link InputStream} that counts the number of bytes read. + * + * @author Chris Nokleberg + */ +public final class CountingInputStream extends FilterInputStream { + + private long count; + private long mark = -1; + + /** + * Wraps another input stream, counting the number of bytes read. + * + * @param in the input stream to be wrapped + */ + public CountingInputStream(InputStream in) { + super(Objects.requireNonNull(in)); + } + + /** Returns the number of bytes read. 
*/ + public long getCount() { + return count; + } + + /** Resets the count of bytes read to zero */ + public void resetCount() { + count = 0; + } + + /** Returns the input stream this is wrapping */ + public InputStream getWrappedStream() { + return in; + } + + @Override + public int read() throws IOException { + int result = in.read(); + if (result != -1) { + count++; + } + return result; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int result = in.read(b, off, len); + if (result != -1) { + count += result; + } + return result; + } + + @Override + public long skip(long n) throws IOException { + long result = in.skip(n); + count += result; + return result; + } + + @Override + public synchronized void mark(int readlimit) { + in.mark(readlimit); + mark = count; + // it's okay to mark even if mark isn't supported, as reset won't work + } + + @Override + public synchronized void reset() throws IOException { + if (!in.markSupported()) { + throw new IOException("Mark not supported"); + } + if (mark == -1) { + throw new IOException("Mark not set"); + } + + in.reset(); + count = mark; + } +} diff --git a/pom.xml b/pom.xml index e7ed0e963f8..6e07454736c 100644 --- a/pom.xml +++ b/pom.xml @@ -345,6 +345,18 @@ under the License. commons-validator 1.10.0 + + io.opentelemetry.javaagent + opentelemetry-javaagent + + 2.14.0 + + + io.opentelemetry.proto + opentelemetry-proto + + 1.3.2-alpha + junit @@ -1020,6 +1032,8 @@ under the License. 
org.junit.vintage:junit-vintage-engine:jar:* org.junit.jupiter:junit-jupiter-engine:jar:* org.lz4:lz4-java:jar:* + + io.opentelemetry.javaagent:opentelemetry-javaagent:jar:* diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java index 3715eac0c24..8226f20dd33 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java @@ -24,6 +24,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.spi.cache.BlockCacheManager; import org.apache.accumulo.core.spi.scan.ScanServerInfo; +import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.server.GarbageCollectionLogger; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; @@ -61,4 +62,6 @@ public interface TabletHostingServer { GarbageCollectionLogger getGcLogger(); BlockCacheManager.Configuration getBlockCacheConfiguration(AccumuloConfiguration acuConf); + + HostAndPort getAdvertiseAddress(); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 2573d6b4514..25ea5276729 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -60,6 +60,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; @@ -227,7 +228,7 @@ public TabletServerMinCMetrics getMinCMetrics() { private String lockID; private 
volatile long lockSessionId = -1; - public static final AtomicLong seekCount = new AtomicLong(0); + public static final LongAdder seekCount = new LongAdder(); private final AtomicLong totalMinorCompactions = new AtomicLong(0); @@ -1201,7 +1202,7 @@ public TabletServerStatus getStats(Map> scanCou result.osLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); result.name = getClientAddressString(); result.holdTime = resourceManager.holdTime(); - result.lookups = seekCount.get(); + result.lookups = seekCount.sum(); result.indexCacheHits = resourceManager.getIndexCache().getStats().hitCount(); result.indexCacheRequest = resourceManager.getIndexCache().getStats().requestCount(); result.dataCacheHits = resourceManager.getDataCache().getStats().hitCount(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 6bcfbc4b9e0..1e6c408ee5d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.accumulo.core.trace.TraceAttributes; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.conf.TableConfiguration.ParsedIteratorConfig; import org.apache.accumulo.server.fs.FileManager.ScanFileManager; @@ -57,6 +58,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + +import io.opentelemetry.api.trace.Span; + class ScanDataSource implements DataSource { private static final Logger log = LoggerFactory.getLogger(ScanDataSource.class); @@ -67,7 +72,7 @@ class ScanDataSource implements DataSource { private 
SortedKeyValueIterator iter; private long expectedDeletionCount; private List memIters = null; - private long fileReservationId; + private long fileReservationId = -1; private final AtomicBoolean interruptFlag; private StatsIterator statsIterator; @@ -76,6 +81,9 @@ class ScanDataSource implements DataSource { private final byte[] defaultLabels; private final long scanDataSourceId; + private final AtomicLong scanSeekCounter; + private final AtomicLong scanCounter; + ScanDataSource(TabletBase tablet, ScanParameters scanParams, boolean loadIters, AtomicBoolean interruptFlag) { this.tablet = tablet; @@ -85,6 +93,8 @@ class ScanDataSource implements DataSource { this.loadIters = loadIters; this.defaultLabels = tablet.getDefaultSecurityLabels(); this.scanDataSourceId = nextSourceId.incrementAndGet(); + this.scanSeekCounter = new AtomicLong(); + this.scanCounter = new AtomicLong(); log.trace("new scan data source, scanId {}, tablet: {}, params: {}, loadIterators: {}", this.scanDataSourceId, this.tablet, this.scanParams, this.loadIters); } @@ -111,6 +121,9 @@ public DataSource getNewDataSource() { } finally { expectedDeletionCount = tablet.getDataSourceDeletions(); iter = null; + if (statsIterator != null) { + statsIterator.report(true); + } } } } @@ -174,6 +187,7 @@ private SortedKeyValueIterator createIterator() memIters = tablet.getMemIterators(samplerConfig); Pair> reservation = tablet.reserveFilesForScan(); fileReservationId = reservation.getFirst(); + Preconditions.checkState(fileReservationId >= 0); files = reservation.getSecond(); } @@ -200,8 +214,8 @@ private SortedKeyValueIterator createIterator() } SystemIteratorEnvironment iterEnv = (SystemIteratorEnvironment) builder.build(); - statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter(), - tablet.getScanMetrics().getScannedCounter()); + statsIterator = new StatsIterator(multiIter, scanSeekCounter, TabletServer.seekCount, + scanCounter, tablet.getScannedCounter(), 
tablet.getScanMetrics().getScannedCounter()); SortedKeyValueIterator visFilter = SystemIteratorUtil.setupSystemScanIterators(statsIterator, scanParams.getColumnSet(), @@ -257,9 +271,11 @@ private void returnIterators() { tablet.returnMemIterators(memIters); memIters = null; try { - log.trace("Returning file iterators for {}, scanId:{}, fid:{}", tablet.getExtent(), - scanDataSourceId, fileReservationId); - tablet.returnFilesForScan(fileReservationId); + if (fileReservationId >= 0) { + log.trace("Returning file iterators for {}, scanId:{}, fid:{}", tablet.getExtent(), + scanDataSourceId, fileReservationId); + tablet.returnFilesForScan(fileReservationId); + } } catch (Exception e) { log.warn("Error Returning file iterators for scan: {}, :{}", scanDataSourceId, e); // Continue bubbling the exception up for handling. @@ -270,8 +286,16 @@ private void returnIterators() { } } + private boolean closed = false; + @Override public void close(boolean sawErrors) { + if (closed) { + return; + } + + closed = true; + try { returnIterators(); } finally { @@ -288,12 +312,20 @@ public void close(boolean sawErrors) { } finally { fileManager = null; if (statsIterator != null) { - statsIterator.report(); + statsIterator.report(true); } } } } + public void setAttributes(Span span) { + if (statsIterator != null && span.isRecording()) { + statsIterator.report(true); + span.setAttribute(TraceAttributes.ENTRIES_READ_KEY, scanCounter.get()); + span.setAttribute(TraceAttributes.SEEKS_KEY, scanSeekCounter.get()); + } + } + public void interrupt() { interruptFlag.set(true); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index 2b89a2005b1..9e2a7c7af03 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -30,7 +30,11 @@ import 
org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator; +import org.apache.accumulo.core.trace.ScanInstrumentation; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.ShutdownUtil; +import org.apache.accumulo.tserver.scan.NextBatchTask; import org.apache.accumulo.tserver.scan.ScanParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +72,23 @@ public class Scanner { } public ScanBatch read() throws IOException, TabletClosedException { + var span = TraceUtil.startSpan(NextBatchTask.class, "scan-batch"); + try (var scope = span.makeCurrent(); var scanScope = ScanInstrumentation.enable(span)) { + var batchAndSource = readInternal(); + // This needs to be called after the ScanDataSource was closed in order to make sure all + // statistics related to file reads are seen. 
+ tablet.recordScanTrace(span, batchAndSource.getFirst().getResults(), scanParams, + batchAndSource.getSecond()); + return batchAndSource.getFirst(); + } catch (IOException | RuntimeException e) { + span.recordException(e); + throw e; + } finally { + span.end(); + } + } + + private Pair readInternal() throws IOException, TabletClosedException { ScanDataSource dataSource = null; @@ -121,13 +142,13 @@ public ScanBatch read() throws IOException, TabletClosedException { if (results.getResults() == null) { range = null; - return new ScanBatch(new ArrayList<>(), false); + return new Pair<>(new ScanBatch(new ArrayList<>(), false), dataSource); } else if (results.getContinueKey() == null) { - return new ScanBatch(results.getResults(), false); + return new Pair<>(new ScanBatch(results.getResults(), false), dataSource); } else { range = new Range(results.getContinueKey(), !results.isSkipContinueKey(), range.getEndKey(), range.isEndKeyInclusive()); - return new ScanBatch(results.getResults(), true); + return new Pair<>(new ScanBatch(results.getResults(), true), dataSource); } } catch (IterationInterruptedException iie) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 90a53c734a0..d7ff6a7c8dc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1793,7 +1793,7 @@ public long totalQueryResultsBytes() { } public long totalScannedCount() { - return this.scannedCount.get(); + return this.scannedCount.sum(); } public long totalLookupCount() { @@ -1806,7 +1806,7 @@ public void updateRates(long now) { queryByteRate.update(now, this.queryResultBytes.get()); ingestRate.update(now, ingestCount); ingestByteRate.update(now, ingestBytes); - scannedRate.update(now, this.scannedCount.get()); + scannedRate.update(now, this.scannedCount.sum()); } 
public long getSplitCreationTime() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index dcc54e9da71..0ac8f6bd141 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@ -48,6 +49,10 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.spi.cache.CacheType; +import org.apache.accumulo.core.trace.ScanInstrumentation; +import org.apache.accumulo.core.trace.TraceAttributes; +import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.ShutdownUtil; @@ -62,6 +67,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.opentelemetry.api.trace.Span; + /** * This class exists to share code for scanning a tablet between {@link Tablet} and * {@link SnapshotTablet} @@ -79,7 +86,7 @@ public abstract class TabletBase { protected AtomicLong lookupCount = new AtomicLong(0); protected AtomicLong queryResultCount = new AtomicLong(0); protected AtomicLong queryResultBytes = new AtomicLong(0); - protected final AtomicLong scannedCount = new AtomicLong(0); + protected final LongAdder scannedCount = new LongAdder(); protected final Set activeScans = new HashSet<>(); @@ -154,7 +161,7 @@ public Scanner createScanner(Range 
range, ScanParameters scanParams, return new Scanner(this, range, scanParams, interruptFlag); } - public AtomicLong getScannedCounter() { + public LongAdder getScannedCounter() { return this.scannedCount; } @@ -205,25 +212,30 @@ public Tablet.LookupResult lookup(List ranges, List results, tabletRange.clip(range); } - SourceSwitchingIterator.DataSource dataSource = - createDataSource(scanParams, true, interruptFlag); + ScanDataSource dataSource = createDataSource(scanParams, true, interruptFlag); Tablet.LookupResult result = null; boolean sawException = false; - try { + var span = TraceUtil.startSpan(TabletBase.class, "multiscan-batch"); + try (var scope = span.makeCurrent(); var scanScope = ScanInstrumentation.enable(span)) { SortedKeyValueIterator iter = new SourceSwitchingIterator(dataSource); this.lookupCount.incrementAndGet(); this.server.getScanMetrics().incrementLookupCount(1); result = lookup(iter, ranges, results, scanParams, maxResultSize); + // must close data source before recording scan trace in order to flush all file read stats + dataSource.close(false); + recordScanTrace(span, results, scanParams, dataSource); return result; } catch (IOException | RuntimeException e) { sawException = true; + span.recordException(e); throw e; } finally { // code in finally block because always want // to return mapfiles, even when exception is thrown dataSource.close(sawException); + span.end(); synchronized (this) { queryResultCount.addAndGet(results.size()); @@ -236,6 +248,34 @@ public Tablet.LookupResult lookup(List ranges, List results, } } + void recordScanTrace(Span span, List batch, ScanParameters scanParameters, + ScanDataSource dataSource) { + if (span.isRecording()) { + span.setAttribute(TraceAttributes.ENTRIES_RETURNED_KEY, batch.size()); + long bytesReturned = 0; + for (var e : batch) { + bytesReturned += e.getKey().getLength() + e.getValue().get().length; + } + span.setAttribute(TraceAttributes.BYTES_RETURNED_KEY, bytesReturned); + 
span.setAttribute(TraceAttributes.EXECUTOR_KEY, + scanParameters.getScanDispatch().getExecutorName()); + span.setAttribute(TraceAttributes.TABLE_ID_KEY, getExtent().tableId().canonical()); + span.setAttribute(TraceAttributes.EXTENT_KEY, getExtent().toString()); + var si = ScanInstrumentation.get(); + span.setAttribute(TraceAttributes.BYTES_READ_FILE_KEY, si.getFileBytesRead()); + span.setAttribute(TraceAttributes.BYTES_READ_KEY, si.getUncompressedBytesRead()); + span.setAttribute(TraceAttributes.INDEX_HITS_KEY, si.getCacheHits(CacheType.INDEX)); + span.setAttribute(TraceAttributes.INDEX_MISSES_KEY, si.getCacheMisses(CacheType.INDEX)); + span.setAttribute(TraceAttributes.INDEX_BYPASSES_KEY, si.getCacheBypasses(CacheType.INDEX)); + span.setAttribute(TraceAttributes.DATA_HITS_KEY, si.getCacheHits(CacheType.DATA)); + span.setAttribute(TraceAttributes.DATA_MISSES_KEY, si.getCacheMisses(CacheType.DATA)); + span.setAttribute(TraceAttributes.DATA_BYPASSES_KEY, si.getCacheBypasses(CacheType.DATA)); + span.setAttribute(TraceAttributes.SERVER_KEY, server.getAdvertiseAddress().toString()); + + dataSource.setAttributes(span); + } + } + Batch nextBatch(SortedKeyValueIterator iter, Range range, ScanParameters scanParams) throws IOException { diff --git a/test/pom.xml b/test/pom.xml index c58e3778697..4430f6d929e 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -62,6 +62,10 @@ commons-cli commons-cli + + commons-codec + commons-codec + commons-io commons-io @@ -82,6 +86,14 @@ io.opentelemetry opentelemetry-context + + io.opentelemetry.proto + opentelemetry-proto + + + jakarta.servlet + jakarta.servlet-api + org.apache.accumulo accumulo-compaction-coordinator @@ -194,6 +206,10 @@ org.easymock easymock + + org.eclipse.jetty + jetty-server + org.jline jline @@ -214,6 +230,11 @@ org.slf4j slf4j-api + + io.opentelemetry.javaagent + opentelemetry-javaagent + runtime + org.apache.hadoop hadoop-client-runtime diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java 
b/test/src/main/java/org/apache/accumulo/test/TestIngest.java index f20cf31afe4..a6f85f9aaf0 100644 --- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java +++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java @@ -33,6 +33,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; @@ -246,6 +247,10 @@ public static void toPrintableChars(byte[] dest) { } } + public static IteratorSetting.Column generateColumn(IngestParams params, int column) { + return new IteratorSetting.Column(new Text(params.columnFamily), generateQualifier(column)); + } + public static void main(String[] args) throws Exception { Opts opts = new Opts(); @@ -299,7 +304,7 @@ public static void ingest(AccumuloClient accumuloClient, FileSystem fs, IngestPa Mutation m = new Mutation(row); for (int j = 0; j < params.cols; j++) { Text colf = new Text(params.columnFamily); - Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX)); + Text colq = generateQualifier(j); if (writer != null) { Key key = new Key(row, colf, colq, labBA); @@ -405,6 +410,11 @@ public static void ingest(AccumuloClient accumuloClient, FileSystem fs, IngestPa elapsed); } + private static Text generateQualifier(int j) { + Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX)); + return colq; + } + public static void ingest(AccumuloClient c, IngestParams params) throws MutationsRejectedException, IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException { diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java 
b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java new file mode 100644 index 00000000000..2196302fea2 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTraceClient.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.tracing; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +import java.util.List; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.data.Range; + +import com.google.gson.FormattingStyle; +import com.google.gson.GsonBuilder; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; + +public class ScanTraceClient { + + public static class Options { + String clientPropsPath; + String table; + String startRow; + String endRow; + String family; + String qualifier; + + Options() {} + + Options(String table) { + this.table = table; + } + + void conigureScanner(Scanner scanner) { + if (startRow != null || endRow != null) { + scanner.setRange(new Range(startRow, true, endRow, false)); + } + setColumn(scanner); + } + + void conigureScanner(BatchScanner scanner) { + if (startRow != null || endRow != null) { + scanner.setRanges(List.of(new Range(startRow, true, endRow, false))); + } else { + scanner.setRanges(List.of(new Range())); + } + setColumn(scanner); + } + + void setColumn(ScannerBase scanner) { + System.out.println(scanner.getClass().getName() + " fam " + family); + if (family != null) { + scanner.fetchColumn(family, qualifier); + } + } + + } + + public static class Results { + // trace id for the batch scan + String traceId1; + // trace id for the normal scan + String traceId2; + // The number of entries returned by both scans + long scanCount; + // The number of bytes returned by both scans + long scanSize; + + @Override + public String toString() { + return "Results{scanCount=" + scanCount + ", traceId1='" + traceId1 + "', traceId2='" + + traceId2 + "', scanSize=" + scanSize + '}'; 
+ } + + } + + public static void main(String[] args) throws Exception { + + Options opts = new GsonBuilder().create().fromJson(args[0], Options.class); + + String clientPropsPath = opts.clientPropsPath; + String table = opts.table; + + Tracer tracer = GlobalOpenTelemetry.get().getTracer(ScanTraceClient.class.getName()); + try (var client = Accumulo.newClient().from(clientPropsPath).build()) { + long scanCount = 0; + long scanSize = 0; + long batchScancount = 0; + long batchScanSize = 0; + + Span span = tracer.spanBuilder("batch-scan").startSpan(); + try (var scanner = client.createBatchScanner(table); var scope = span.makeCurrent()) { + opts.conigureScanner(scanner); + for (var entry : scanner) { + batchScancount++; + batchScanSize += entry.getKey().getSize() + entry.getValue().getSize(); + } + } finally { + span.end(); + } + var traceId1 = span.getSpanContext().getTraceId(); + + // start a second trace + span = tracer.spanBuilder("seq-scan").startSpan(); + try (var scanner = client.createScanner(table); var scope = span.makeCurrent()) { + opts.conigureScanner(scanner); + scanner.setBatchSize(10_000); + for (var entry : scanner) { + scanCount++; + scanSize += entry.getKey().getSize() + entry.getValue().getSize(); + } + } finally { + span.end(); + } + var traceId2 = span.getSpanContext().getTraceId(); + + assertEquals(scanCount, batchScancount); + assertEquals(scanSize, batchScanSize); + assertNotEquals(traceId1, traceId2); + + Results results = new Results(); + results.traceId1 = traceId1; + results.traceId2 = traceId2; + results.scanCount = scanCount; + results.scanSize = scanSize; + + var gson = new GsonBuilder().setFormattingStyle(FormattingStyle.COMPACT).create(); + System.out.println("RESULT:" + gson.toJson(results)); + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java new file mode 100644 index 00000000000..8d0fe70bba0 --- /dev/null +++ 
b/test/src/main/java/org/apache/accumulo/test/tracing/ScanTracingIT.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.tracing; + +import static org.apache.accumulo.core.trace.TraceAttributes.EXECUTOR_KEY; +import static org.apache.accumulo.core.trace.TraceAttributes.EXTENT_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.trace.TraceAttributes; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.util.PortUtils; 
+import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.google.common.base.Preconditions; +import com.google.gson.Gson; + +class ScanTracingIT extends ConfigurableMacBase { + + private static final int OTLP_PORT = PortUtils.getRandomFreePort(); + + private static List getJvmArgs() { + String javaAgent = null; + for (var cpi : System.getProperty("java.class.path").split(":")) { + if (cpi.contains("opentelemetry-javaagent")) { + javaAgent = cpi; + } + } + + Objects.requireNonNull(javaAgent); + + return List.of("-Dotel.traces.exporter=otlp", "-Dotel.exporter.otlp.protocol=http/protobuf", + "-Dotel.exporter.otlp.endpoint=http://localhost:" + OTLP_PORT, + "-Dotel.metrics.exporter=none", "-Dotel.logs.exporter=none", "-javaagent:" + javaAgent); + } + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + getJvmArgs().forEach(cfg::addJvmOption); + // sized such that full table scans will not fit in the cache + cfg.setProperty(Property.TSERV_DATACACHE_SIZE.getKey(), "8M"); + cfg.setProperty(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "pool1.threads", "8"); + } + + private TraceCollector collector; + + @BeforeEach + public void startCollector() throws Exception { + collector = new TraceCollector("localhost", OTLP_PORT); + } + + @AfterEach + public void stopCollector() throws Exception { + collector.stop(); + } + + @Test + public void test() throws Exception { + var names = getUniqueNames(7); + runTest(names[0], 0, false, false, -1, -1, -1); + runTest(names[1], 10, false, false, -1, -1, -1); + runTest(names[2], 0, true, false, -1, -1, -1); + runTest(names[3], 0, false, false, -1, -1, 2); + runTest(names[4], 0, false, false, 32, 256, -1); + runTest(names[5], 0, true, true, 32, 256, -1); + 
runTest(names[6], 0, true, false, -1, -1, 2); + } + + private void runTest(String tableName, int numSplits, boolean cacheData, + boolean secondScanFitsInCache, int startRow, int endRow, int column) throws Exception { + + var ingestParams = new TestIngest.IngestParams(getClientProperties(), tableName); + ingestParams.createTable = false; + ingestParams.rows = 1000; + ingestParams.cols = 10; + + try (var client = Accumulo.newClient().from(getClientProperties()).build()) { + var ntc = new NewTableConfiguration(); + if (numSplits > 0) { + var splits = TestIngest.getSplitPoints(0, 1000, numSplits); + ntc.withSplits(splits); + } + + var props = new HashMap(); + + if (cacheData) { + props.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true"); + } + + // use a different executor for batch scans + props.put("table.scan.dispatcher.opts.multi_executor", "pool1"); + + ntc.setProperties(props); + + client.tableOperations().create(tableName, ntc); + + TestIngest.ingest(client, ingestParams); + client.tableOperations().flush(tableName, null, null, true); + } + + long expectedRows = ingestParams.rows; + + var options = new ScanTraceClient.Options(tableName); + if (startRow != -1 && endRow != -1) { + options.startRow = TestIngest.generateRow(startRow, 0).toString(); + options.endRow = TestIngest.generateRow(endRow, 0).toString(); + expectedRows = IntStream.range(startRow, endRow).count(); + } + + int expectedColumns = ingestParams.cols; + + if (column != -1) { + var col = TestIngest.generateColumn(ingestParams, column); + options.family = col.getColumnFamily().toString(); + options.qualifier = col.getColumnQualifier().toString(); + expectedColumns = 1; + } + + var results = run(options); + System.out.println(results); + + var tableId = getServerContext().getTableId(tableName).canonical(); + + var expectedServers = getServerContext().getAmple().readTablets().forTable(TableId.of(tableId)) + .fetch(TabletMetadata.ColumnType.LOCATION).build().stream() + .map(tm -> 
tm.getLocation().getHostAndPort().toString()).collect(Collectors.toSet()); + + ScanTraceStats scanStats = new ScanTraceStats(false); + ScanTraceStats batchScanStats = new ScanTraceStats(true); + Set extents1 = new TreeSet<>(); + Set extents2 = new TreeSet<>(); + + while (scanStats.getEntriesReturned() < expectedRows * expectedColumns + || batchScanStats.getEntriesReturned() < expectedRows * expectedColumns) { + var span = collector.take(); + var stats = ScanTraceStats.create(span); + if (stats != null + && span.stringAttributes.get(TraceAttributes.TABLE_ID_KEY.getKey()).equals(tableId) + && (results.traceId1.equals(span.traceId) || results.traceId2.equals(span.traceId))) { + assertTrue( + expectedServers + .contains(span.stringAttributes.get(TraceAttributes.SERVER_KEY.getKey())), + () -> expectedServers + " " + span); + if (stats.isBatchScan()) { + assertEquals("pool1", span.stringAttributes.get(EXECUTOR_KEY.getKey())); + } else { + assertEquals("default", span.stringAttributes.get(EXECUTOR_KEY.getKey())); + } + if (numSplits == 0) { + assertEquals(tableId + "<<", span.stringAttributes.get(EXTENT_KEY.getKey())); + } else { + var extent = span.stringAttributes.get(EXTENT_KEY.getKey()); + assertTrue(extent.startsWith(tableId + ";") || extent.startsWith(tableId + "<")); + } + assertEquals(1, stats.getSeeks()); + if (stats.isBatchScan()) { + assertEquals(results.traceId1, span.traceId); + extents1.add(span.stringAttributes.get(EXTENT_KEY.getKey())); + } else { + assertEquals(results.traceId2, span.traceId); + extents2.add(span.stringAttributes.get(EXTENT_KEY.getKey())); + } + } else { + continue; + } + + if (stats.isBatchScan()) { + batchScanStats.merge(stats); + } else { + scanStats.merge(stats); + } + } + + if (numSplits > 0) { + assertEquals(numSplits, extents1.size()); + assertEquals(numSplits, extents2.size()); + } + + System.out.println(scanStats); + System.out.println(batchScanStats); + + assertEquals(expectedRows * expectedColumns, results.scanCount, 
results::toString); + + var statsList = List.of(batchScanStats, scanStats); + for (int i = 0; i < statsList.size(); i++) { + var stats = statsList.get(i); + assertEquals(expectedRows * 10, stats.getEntriesRead(), stats::toString); + assertEquals(results.scanCount, stats.getEntriesReturned(), stats::toString); + // When filtering on columns will read more data than we return + double colMultiplier = 10.0 / expectedColumns; + assertClose((long) (results.scanSize * colMultiplier), stats.getBytesRead(), .05); + assertClose(results.scanSize, stats.getBytesReturned(), .05); + if (secondScanFitsInCache && i == 1) { + assertEquals(0, stats.getFileBytesRead(), stats::toString); + } else { + assertClose((long) (stats.getBytesRead() * .005), stats.getFileBytesRead(), .2); + } + if (cacheData) { + assertEquals(0, stats.getDataCacheBypasses(), stats::toString); + assertTrue(stats.getDataCacheHits() + stats.getDataCacheMisses() > 0, stats::toString); + if (stats.getFileBytesRead() == 0) { + assertEquals(0L, stats.getDataCacheMisses(), stats::toString); + } + // When caching data, does not seem to hit the cache much + var cacheSum = stats.getIndexCacheHits() + stats.getIndexCacheMisses(); + assertTrue(cacheSum == 0 || cacheSum == 1, stats::toString); + } else { + assertEquals(0, stats.getDataCacheHits(), stats::toString); + assertEquals(0, stats.getDataCacheMisses(), stats::toString); + assertTrue(stats.getDataCacheBypasses() > stats.getSeeks(), stats::toString); + assertClose(stats.getDataCacheBypasses(), stats.getIndexCacheHits(), .05); + } + assertEquals(0, stats.getIndexCacheBypasses(), stats::toString); + } + + } + + public void assertClose(long expected, long value, double e) { + assertTrue(Math.abs(1 - (double) expected / (double) value) < e, + () -> expected + " " + value + " " + e); + } + + /** + * Runs ScanTraceClient in an external process so it can be instrumented with the open telemetry + * java agent. Use json to get data to/from external process. 
+ */ + public ScanTraceClient.Results run(ScanTraceClient.Options opts) + throws IOException, InterruptedException { + opts.clientPropsPath = getCluster().getClientPropsPath(); + var proc = getCluster().exec(ScanTraceClient.class, getJvmArgs(), new Gson().toJson(opts)); + assertEquals(0, proc.getProcess().waitFor()); + var out = proc.readStdOut(); + var result = Arrays.stream(out.split("\\n")).filter(line -> line.startsWith("RESULT:")) + .findFirst().orElse("RESULT:{}"); + result = result.substring("RESULT:".length()); + return new Gson().fromJson(result, ScanTraceClient.Results.class); + } + + /** + * Helper class that encapsulates data from a scan trace making it easier to access and + * centralizing the code for accessing data from a span. + */ + static class ScanTraceStats { + final Map scanStats; + final boolean isBatchScan; + + ScanTraceStats(SpanData spanData) { + this.scanStats = spanData.integerAttributes; + this.isBatchScan = spanData.name.contains("multiscan-batch"); + } + + ScanTraceStats(boolean isBatchScan) { + scanStats = new TreeMap<>(); + this.isBatchScan = isBatchScan; + } + + void merge(ScanTraceStats other) { + Preconditions.checkArgument(isBatchScan == other.isBatchScan); + other.scanStats.forEach((k, v) -> { + scanStats.merge(k, v, Long::sum); + }); + } + + /** + * @return a ScanTrace if span is from a scan batch, otherwise return null + */ + static ScanTraceStats create(SpanData data) { + if (data.name.contains("scan-batch")) { + return new ScanTraceStats(data); + } + return null; + } + + boolean isBatchScan() { + return isBatchScan; + } + + long getEntriesRead() { + return scanStats.getOrDefault(TraceAttributes.ENTRIES_READ_KEY.getKey(), 0L); + } + + long getEntriesReturned() { + return scanStats.getOrDefault(TraceAttributes.ENTRIES_RETURNED_KEY.getKey(), 0L); + } + + long getFileBytesRead() { + return scanStats.getOrDefault(TraceAttributes.BYTES_READ_FILE_KEY.getKey(), 0L); + } + + long getBytesRead() { + return 
scanStats.getOrDefault(TraceAttributes.BYTES_READ_KEY.getKey(), 0L); + } + + long getBytesReturned() { + return scanStats.getOrDefault(TraceAttributes.BYTES_RETURNED_KEY.getKey(), 0L); + } + + long getDataCacheHits() { + return scanStats.getOrDefault(TraceAttributes.DATA_HITS_KEY.getKey(), 0L); + } + + long getDataCacheMisses() { + return scanStats.getOrDefault(TraceAttributes.DATA_MISSES_KEY.getKey(), 0L); + } + + long getDataCacheBypasses() { + return scanStats.getOrDefault(TraceAttributes.DATA_BYPASSES_KEY.getKey(), 0L); + } + + long getIndexCacheHits() { + return scanStats.getOrDefault(TraceAttributes.INDEX_HITS_KEY.getKey(), 0L); + } + + long getIndexCacheMisses() { + return scanStats.getOrDefault(TraceAttributes.INDEX_MISSES_KEY.getKey(), 0L); + } + + long getIndexCacheBypasses() { + return scanStats.getOrDefault(TraceAttributes.INDEX_BYPASSES_KEY.getKey(), 0L); + } + + long getSeeks() { + return scanStats.getOrDefault(TraceAttributes.SEEKS_KEY.getKey(), 0L); + } + + @Override + public String toString() { + return "ScanTraceStats{isBatchScan=" + isBatchScan + ", scanStats=" + scanStats + '}'; + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java b/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java new file mode 100644 index 00000000000..c84845275aa --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/tracing/SpanData.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.tracing; + +import java.util.Map; + +public class SpanData { + + public final String traceId; + public final String name; + public final Map stringAttributes; + public final Map integerAttributes; + + public SpanData(String traceId, String name, Map stringAttributes, + Map integerAttributes) { + this.traceId = traceId; + this.name = name; + this.stringAttributes = stringAttributes; + this.integerAttributes = integerAttributes; + } + + @Override + public String toString() { + return "SpanData{traceId='" + traceId + "', name='" + name + "', stringAttributes=" + + stringAttributes + ", integerAttributes=" + integerAttributes + '}'; + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java b/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java new file mode 100644 index 00000000000..9a3b6c97010 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/tracing/TraceCollector.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.tracing; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +import org.apache.commons.codec.binary.Hex; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; + +/** + * Open telemetry tracing data sink for testing. Processes can send http/protobuf trace data to this + * sink over http, and it will add them to an in memory queue that tests can read from. 
+ */ +public class TraceCollector { + private final Server server; + + private final LinkedBlockingQueue spanQueue = new LinkedBlockingQueue<>(); + + private class TraceHandler extends AbstractHandler { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) throws IOException { + + if (!target.equals("/v1/traces")) { + System.err.println("unexpected target : " + target); + response.setStatus(404); + response.getOutputStream().close(); + return; + } + + var body = request.getInputStream().readAllBytes(); + try { + var etsr = + io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest.parseFrom(body); + var spans = + etsr.getResourceSpansList().stream().flatMap(r -> r.getScopeSpansList().stream()) + .flatMap(r -> r.getSpansList().stream()).collect(Collectors.toList()); + + spans.forEach(s -> { + var traceId = Hex.encodeHexString(s.getTraceId().toByteArray(), true); + + Map stringAttrs = new HashMap<>(); + Map intAttrs = new HashMap<>(); + + s.getAttributesList().forEach(kv -> { + if (kv.getValue().hasIntValue()) { + intAttrs.put(kv.getKey(), kv.getValue().getIntValue()); + } else if (kv.getValue().hasStringValue()) { + stringAttrs.put(kv.getKey(), kv.getValue().getStringValue()); + } + }); + + spanQueue.add( + new SpanData(traceId, s.getName(), Map.copyOf(stringAttrs), Map.copyOf(intAttrs))); + }); + + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } + + response.setStatus(200); + response.getOutputStream().close(); + } + } + + TraceCollector(String host, int port) throws Exception { + server = new Server(new InetSocketAddress(host, port)); + server.setHandler(new TraceHandler()); + server.start(); + } + + SpanData take() throws InterruptedException { + return spanQueue.take(); + } + + void stop() throws Exception { + server.stop(); + } +}