diff --git a/conf/log4j2.properties b/conf/log4j2.properties
index fcd0f451..94c52b38 100644
--- a/conf/log4j2.properties
+++ b/conf/log4j2.properties
@@ -27,7 +27,7 @@ appender.console.type = Console
 appender.console.name = STDERR
 appender.console.target = SYSTEM_ERR
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{ISO8601} [%c{3}] %-5p: %m%n
+appender.console.layout.pattern = %d{ISO8601} %T [%c{3}] %-5p: %m%n
 
 loggers = accumulotesting, accumulo, hadooptest, hadoopmapred, hadooputil, zookeepertest, curatortest
 logger.accumulotesting.name = org.apache.accumulo.testing
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkMinusOne.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkMinusOne.java
index b217f164..a1de6669 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkMinusOne.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkMinusOne.java
@@ -30,8 +30,7 @@ public class BulkMinusOne extends BulkImportTest {
 
   @Override
   protected void runLater(State state, RandWalkEnv env) throws Exception {
-    log.info("Decrementing");
-    BulkPlusOne.bulkLoadLots(log, state, env, negOne);
+    var bulkRange = BulkPlusOne.rangeExchange.nextDecrementRange(env);
+    BulkPlusOne.bulkLoadLots(log, state, env, bulkRange, negOne);
   }
-
 }
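The decrement side no longer logs and loads over the whole table; it asks the shared rangeExchange (added to BulkPlusOne below) which zone range it owes a minus one for, so every +1 import is eventually cancelled by a -1 import over the exact same range. A minimal standalone sketch of that pairing invariant, with illustrative names that are not part of the patch:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Random;

    // Sketch: a range handed out for increment is queued as a decrement debt and
    // vice versa, so once both queues drain every zone sum cancels to zero.
    public class RangeExchangeDemo {
      record Range(int startZone, int endZone) {}

      static final Deque<Range> forIncrement = new ArrayDeque<>();
      static final Deque<Range> forDecrement = new ArrayDeque<>();
      static final Random rand = new Random(42);

      static synchronized Range nextIncrementRange() {
        Range r = forIncrement.poll();
        if (r == null) {
          r = randomRange();
          forDecrement.push(r); // someone must later decrement this exact range
        }
        return r;
      }

      static synchronized Range nextDecrementRange() {
        Range r = forDecrement.poll();
        if (r == null) {
          r = randomRange();
          forIncrement.push(r); // someone must later increment this exact range
        }
        return r;
      }

      static Range randomRange() {
        int a = rand.nextInt(50), b = rand.nextInt(50);
        return a == b ? new Range(a, a + 1) : new Range(Math.min(a, b), Math.max(a, b));
      }

      public static void main(String[] args) {
        int[] zoneSums = new int[50];
        for (int i = 0; i < 1000; i++) { // equal numbers of +1 and -1 requests
          Range up = nextIncrementRange();
          for (int z = up.startZone(); z < up.endZone(); z++) zoneSums[z]++;
          Range down = nextDecrementRange();
          for (int z = down.startZone(); z < down.endZone(); z++) zoneSums[z]--;
        }
        boolean allZero = java.util.Arrays.stream(zoneSums).allMatch(s -> s == 0);
        System.out.println(allZero && forIncrement.isEmpty() && forDecrement.isEmpty()); // true
      }
    }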
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
index 7287da22..de716b48 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java
@@ -18,8 +18,12 @@
  */
 package org.apache.accumulo.testing.randomwalk.bulk;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
 import java.util.List;
+import java.util.Random;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
@@ -39,9 +43,13 @@ import org.apache.hadoop.io.Text;
 
 import org.slf4j.Logger;
 
+import com.google.common.base.Preconditions;
+
 public class BulkPlusOne extends BulkImportTest {
 
   public static final int LOTS = 100000;
+  public static final int ZONES = 50;
+  public static final int ZONE_SIZE = LOTS / ZONES;
   public static final int COLS = 10;
   public static final int HEX_SIZE = (int) Math.ceil(Math.log(LOTS) / Math.log(16));
   public static final String FMT = "r%0" + HEX_SIZE + "x";
@@ -51,61 +59,183 @@ public class BulkPlusOne extends BulkImportTest {
       .map(t -> new Column(CHECK_COLUMN_FAMILY, t)).collect(Collectors.toList());
 
   public static final Text MARKER_CF = new Text("marker");
-  static final AtomicLong counter = new AtomicLong();
+
+  /**
+   * Inclusive start, exclusive end zone range.
+   */
+  record BulkRange(int startZone, int endZone) {
+    public BulkRange {
+      Preconditions.checkArgument(startZone >= 0 && startZone < endZone && endZone <= ZONES,
+          "startZone:%s endZone:%s", startZone, endZone);
+    }
+
+    static BulkRange randomRange(Random random) {
+      int start = random.nextInt(ZONES);
+      int end = random.nextInt(ZONES);
+      if (end < start) {
+        int tmp = end;
+        end = start;
+        start = tmp;
+      }
+      if (end == start) {
+        end++;
+      }
+      return new BulkRange(start, end);
+    }
+  }
+
+  /**
+   * Every increment range must also be decremented and vice versa. This class ensures that happens
+   * as threads doing bulk imports request ranges.
+   */
+  static class RangeExchange {
+    private Deque<BulkRange> incrementRanges = new ArrayDeque<>();
+    private Deque<BulkRange> decrementRanges = new ArrayDeque<>();
+
+    synchronized BulkRange nextIncrementRange(RandWalkEnv env) {
+      var next = incrementRanges.poll();
+      if (next == null) {
+        next = BulkRange.randomRange(env.getRandom());
+        decrementRanges.push(next);
+      }
+      return next;
+    }
+
+    synchronized BulkRange nextDecrementRange(RandWalkEnv env) {
+      var next = decrementRanges.poll();
+      if (next == null) {
+        next = BulkRange.randomRange(env.getRandom());
+        incrementRanges.push(next);
+      }
+      return next;
+    }
+
+    synchronized boolean isEmpty() {
+      return incrementRanges.isEmpty() && decrementRanges.isEmpty();
+    }
+
+    synchronized void clear() {
+      incrementRanges.clear();
+      decrementRanges.clear();
+    }
+  }
+
+  static final RangeExchange rangeExchange = new RangeExchange();
+  static final AtomicLong[] perZoneCounters = new AtomicLong[ZONES];
+  static {
+    for (int i = 0; i < perZoneCounters.length; i++) {
+      perZoneCounters[i] = new AtomicLong(0);
+    }
+  }
 
   private static final Value ONE = new Value("1".getBytes());
 
-  static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) throws Exception {
-    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
-    String markerLog = "marker:" + markerColumnQualifier;
+  /**
+   * Load a plus one or minus one into a random range of the table. Overall this test should load a
+   * minus one into the same range for every plus one loaded (or vice versa), and at the end of the
+   * test the sum should be zero. To aid with debugging data loss, this test loads markers along
+   * with the plus ones and minus ones. These markers help pinpoint which bulk load operation went
+   * missing. The table's row range is divided into zones, and each zone has a one-up counter that
+   * is used to generate markers. At the end of the test each zone in the table should have a
+   * contiguous set of markers. If a marker is missing, then the logging from this method should be
+   * consulted to determine the corresponding bulk import operation. Once the bulk import operation
+   * is found it can be followed in the Accumulo server logs. The test analyzes the markers to find
+   * missing ones and prints holes. For example, if the test prints that it saw markers 97 and 99
+   * in zone 3, then marker 98 is missing and the bulk import operation related to 98 needs to be
+   * found in the test logs. When looking for missing markers, look for the correct zone in the
+   * test logs.
+   *
+   * All the logging in this method includes the bulk import directory uuid. If there is a problem,
+   * this directory uuid can be used to find the corresponding fate uuid in the Accumulo server
+   * logs. There should be a log message in the server logs that includes both the bulk directory
+   * and the fate uuid.
+   */
+  static void bulkLoadLots(Logger log, State state, RandWalkEnv env, BulkRange bulkRange,
+      Value value) throws Exception {
+
+    long[] markers = new long[ZONES];
+    Arrays.fill(markers, -1);
+
+    // Allocate a marker for each zone and build a log message that links the marker for each zone
+    // to this bulk import directory name uuid.
+    StringBuilder markersBuilder = new StringBuilder("[");
+    String sep = "";
+    for (int z = bulkRange.startZone(); z < bulkRange.endZone(); z++) {
+      long zoneMarker = perZoneCounters[z].incrementAndGet();
+      markers[z] = zoneMarker;
+      markersBuilder.append(sep).append(z).append(":").append(String.format("%07d", zoneMarker));
+      sep = ",";
+    }
+    markersBuilder.append("]");
+    String markersLog = markersBuilder.toString();
+
+    final UUID uuid = UUID.randomUUID();
     final FileSystem fs = (FileSystem) state.get("fs");
-    final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + UUID.randomUUID());
-    log.debug("{} bulk loading from {}", markerLog, dir);
+    final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + uuid);
+    log.debug("{} bulk loading {} over {} from {}", uuid, value, bulkRange, dir);
+    log.debug("{} zone markers:{}", uuid, markersLog);
     final int parts = env.getRandom().nextInt(10) + 1;
 
+    // Must mutate all rows in the zone, so expand the start and end to encompass all the rows in
+    // the zone.
+    final int start = bulkRange.startZone() * ZONE_SIZE;
+    final int end = bulkRange.endZone() * ZONE_SIZE;
+
     // The set created below should always contain 0. So its very important that zero is first in
     // concat below.
-    TreeSet<Integer> startRows =
-        Stream.concat(Stream.of(0), Stream.generate(() -> env.getRandom().nextInt(LOTS)))
-            .distinct().limit(parts).collect(Collectors.toCollection(TreeSet::new));
+    TreeSet<Integer> startRows = Stream
+        .concat(Stream.of(start),
+            Stream.generate(() -> env.getRandom().nextInt(end - start) + start))
+        .distinct().limit(parts).collect(Collectors.toCollection(TreeSet::new));
 
     List<String> printRows =
         startRows.stream().map(row -> String.format(FMT, row)).collect(Collectors.toList());
 
-    log.debug("{} preparing bulk files with start rows {} last row {} marker ", markerLog,
-        printRows, String.format(FMT, LOTS - 1));
+    log.debug("{} preparing bulk files with start rows {} last row {}", uuid, printRows,
+        String.format(FMT, end));
 
+    startRows.add(end);
     List<Integer> rows = new ArrayList<>(startRows);
-    rows.add(LOTS);
+
+    long currentZone = -1;
+    Text markerColumnQualifier = null;
 
     for (int i = 0; i < parts; i++) {
       String fileName = dir + "/" + String.format("part_%d.rf", i);
-      log.debug("{} creating {}", markerLog, fileName);
+      log.trace("creating {}", fileName);
 
       try (RFileWriter writer = RFile.newWriter().to(fileName).withFileSystem(fs).build()) {
         writer.startDefaultLocalityGroup();
-        int start = rows.get(i);
-        int end = rows.get(i + 1);
-        for (int j = start; j < end; j++) {
+        int partStart = rows.get(i);
+        int partEnd = rows.get(i + 1);
+        int eCount = 0;
+        for (int j = partStart; j < partEnd; j++) {
+          int zone = j / ZONE_SIZE;
+          if (currentZone != zone) {
+            Preconditions.checkState(markers[zone] > 0, "%s %s %s", zone, j, bulkRange);
+            markerColumnQualifier = new Text(String.format("%07d", markers[zone]));
+            currentZone = zone;
+          }
+
           Text row = new Text(String.format(FMT, j));
           for (Column col : COLNAMES) {
             writer.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), value);
+            eCount++;
           }
-          writer.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE);
+          writer.append(new Key(row, MARKER_CF, markerColumnQualifier), ONE);
+          eCount++;
         }
+        log.debug("{} created {} with {} entries", uuid, fileName, eCount);
       }
     }
 
     env.getAccumuloClient().tableOperations().importDirectory(dir.toString())
         .to(Setup.getTableName()).tableTime(true).load();
     fs.delete(dir, true);
-    log.debug("{} Finished bulk import", markerLog);
+    log.debug("{} Finished bulk import", uuid);
   }
 
   @Override
   protected void runLater(State state, RandWalkEnv env) throws Exception {
-    log.info("Incrementing");
-    bulkLoadLots(log, state, env, ONE);
+    var bulkRange = rangeExchange.nextIncrementRange(env);
+    bulkLoadLots(log, state, env, bulkRange, ONE);
   }
-
 }
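It may help to spell out the row/zone arithmetic the markers rely on: ZONE_SIZE = LOTS / ZONES = 100000 / 50 = 2000 rows per zone, HEX_SIZE = ceil(log16(100000)) = 5, so rows render as r%05x, and Verify (below) inverts the mapping by stripping the leading r, parsing hex, and dividing by ZONE_SIZE. A small round-trip sketch reusing the patch's constants (the demo class and its main are mine):

    public class ZoneMathDemo {
      static final int LOTS = 100000;
      static final int ZONES = 50;
      static final int ZONE_SIZE = LOTS / ZONES; // 2000 rows per zone
      static final int HEX_SIZE = (int) Math.ceil(Math.log(LOTS) / Math.log(16)); // 5
      static final String FMT = "r%0" + HEX_SIZE + "x"; // "r%05x"

      public static void main(String[] args) {
        // BulkRange(3, 5) expands to rows [6000, 10000): all of zones 3 and 4.
        int start = 3 * ZONE_SIZE; // 6000
        int end = 5 * ZONE_SIZE;   // 10000 (exclusive)

        String firstRow = String.format(FMT, start);  // r01770
        String lastRow = String.format(FMT, end - 1); // r0270f

        // Verify's inverse: strip the 'r', parse hex, divide by the zone size.
        long parsed = Long.parseLong(firstRow.substring(1), 16); // 6000
        System.out.println(firstRow + ".." + lastRow + " starts in zone " + parsed / ZONE_SIZE); // 3
      }
    }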
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
index 3fd2d8de..6b193e58 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java
@@ -38,7 +38,7 @@
 public class Setup extends Test {
 
-  private static final int MAX_POOL_SIZE = 8;
+  private static final int MAX_POOL_SIZE = 16;
 
   static String tableName = null;
 
   @Override
@@ -54,7 +54,8 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti
     IteratorSetting is = new IteratorSetting(10, SummingCombiner.class);
     SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING);
     SummingCombiner.setCombineAllColumns(is, true);
-    var tableProps = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(), "1000");
+    var tableProps = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(), "1000",
+        Property.TABLE_BULK_MAX_TABLETS.getKey(), "1000");
     tableOps.create(getTableName(),
         new NewTableConfiguration().attachIterator(is).setProperties(tableProps));
 
@@ -65,7 +66,10 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti
     state.setRandom(env.getRandom());
     state.set("fs", FileSystem.get(env.getHadoopConfiguration()));
     state.set(BulkTest.BACKGROUND_FAILURE_KEY, Boolean.FALSE);
-    BulkPlusOne.counter.set(0L);
+    BulkPlusOne.rangeExchange.clear();
+    for (int i = 0; i < BulkPlusOne.perZoneCounters.length; i++) {
+      BulkPlusOne.perZoneCounters[i].set(0);
+    }
     ThreadPoolExecutor e = ThreadPools.getServerThreadPools().getPoolBuilder("bulkImportPool")
         .numCoreThreads(MAX_POOL_SIZE).build();
     state.set("pool", e);
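The Setup changes above are sized against the zone scheme: with ZONES = 50 and ZONE_SIZE = LOTS / ZONES = 100000 / 50 = 2000, a single import can still span every zone, writing up to 100,000 rows across as many as 10 files, and MAX_POOL_SIZE = 16 now runs twice as many of those imports concurrently. Presumably that is why Property.TABLE_BULK_MAX_TABLETS (the cap on how many tablets one bulk import file may span) is set to 1000 alongside the existing TABLE_BULK_MAX_TABLET_FILES: as the random walk splits the table, a file covering a wide zone range can cross many tablets. That reading of the property is my gloss, not text from the patch.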
diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
index 6db6ab5e..9c465f16 100644
--- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
+++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java
@@ -67,6 +67,11 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti
       errorFound = true;
     }
 
+    if (!BulkPlusOne.rangeExchange.isEmpty()) {
+      log.error("BulkPlusOne.rangeExchange is not empty");
+      errorFound = true;
+    }
+
     String user = env.getAccumuloClient().whoami();
     Authorizations auths = env.getAccumuloClient().securityOperations().getUserAuthorizations(user);
     RowIterator rowIter;
@@ -112,9 +117,13 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti
         prev = curr;
       }
 
-      if (BulkPlusOne.counter.get() != prev) {
-        log.error("Row {} does not have all markers. Current marker: {}, Previous marker:{}",
-            rowText, BulkPlusOne.counter.get(), prev);
+      long parsedRow = Long.parseLong(rowText.toString().substring(1), 16);
+      int zone = (int) (parsedRow / BulkPlusOne.ZONE_SIZE);
+
+      if (BulkPlusOne.perZoneCounters[zone].get() != prev) {
+        log.error(
+            "Row {} does not have all markers. Current marker: {}, Previous marker:{} zone:{}",
+            rowText, BulkPlusOne.perZoneCounters[zone].get(), prev, zone);
         errorFound = true;
       }
     }
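Verify's per-zone check above only compares the highest marker seen in a zone against that zone's counter; the hole analysis described in the bulkLoadLots javadoc amounts to scanning one zone's sorted marker qualifiers for gaps. A toy version of that gap scan over an in-memory set (illustrative only; the real test reads the marker columns back out of the table):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeSet;

    public class MarkerHoleDemo {
      // Markers in a zone are a one-up counter, so any gap in the sorted set
      // names a bulk import whose data went missing.
      static List<Long> findHoles(TreeSet<Long> seen) {
        List<Long> holes = new ArrayList<>();
        long expected = 1;
        for (long m : seen) {
          for (; expected < m; expected++) {
            holes.add(expected);
          }
          expected = m + 1;
        }
        return holes;
      }

      public static void main(String[] args) {
        // Saw markers 1, 2, 4, and 5 in some zone: marker 3 is the hole, so the
        // bulk import that wrote marker 3 for that zone must be found in the logs.
        System.out.println(findHoles(new TreeSet<>(List.of(1L, 2L, 4L, 5L)))); // [3]
      }
    }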