From 439816fea9fb67af23e6ad9742ac877bd91a5164 Mon Sep 17 00:00:00 2001
From: AlbertWhitlock
Date: Mon, 6 Mar 2023 16:33:13 -0500
Subject: [PATCH 1/9] Initial commit for #2872. Make exporttable command volume aware.

---
 .../tableExport/WriteExportFiles.java         | 40 ++++++++++++++++---
 1 file changed, 35 insertions(+), 5 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java
index a245a95fb8b..f606f1a22fa 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java
@@ -28,6 +28,8 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
@@ -154,6 +156,9 @@ public void undo(long tid, Manager env) {
   public static void exportTable(VolumeManager fs, ServerContext context, String tableName,
       TableId tableID, String exportDir) throws Exception {
 
+    int count = 0;
+    Set<String> volumeSet = new TreeSet<>();
+
     fs.mkdirs(new Path(exportDir));
     Path exportMetaFilePath = fs.getFileSystemByPath(new Path(exportDir))
         .makeQualified(new Path(exportDir, Constants.EXPORT_FILE));
@@ -186,8 +191,30 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t
       dataOut.close();
       dataOut = null;
 
-      createDistcpFile(fs, exportDir, exportMetaFilePath, uniqueFiles);
+      // make a set of unique volumes from the map
+      for (String fileString : uniqueFiles.values()) {
+        String[] fileSegmentArray = fileString.split("/");
+        for (String fileSegment : fileSegmentArray) {
+          ++count;
+          if (count == 3) {
+            volumeSet.add(fileSegment);
+            break;
+          }
+        }
+        count = 0;
+      }
+      // for each unique volume: get every matching entry in the map and send to createDistcpFile
+      // method
+      for (String volumeString : volumeSet) {
+        Set<String> sortedVolumeSet = new TreeSet<>();
+        for (String rFileString : uniqueFiles.values()) {
+          if (rFileString.contains(volumeString)) {
+            sortedVolumeSet.add(rFileString);
+          }
+        }
+        createDistcpFile(fs, exportDir, exportMetaFilePath, sortedVolumeSet, volumeString);
+      }
     } finally {
       if (dataOut != null) {
         dataOut.close();
       }
@@ -196,12 +223,15 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t
   }
 
   private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath,
-      Map<String,String> uniqueFiles) throws IOException {
-    BufferedWriter distcpOut = new BufferedWriter(
-        new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt")), UTF_8));
+      Set<String> uniqueFiles, String volumeName) throws IOException {
+    if (volumeName.contains(":"))
+      volumeName = volumeName.substring(0, volumeName.indexOf(":"));
+
+    BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(
+        fs.create(new Path(exportDir, "distcp-" + volumeName + ".txt")), UTF_8));
 
     try {
-      for (String file : uniqueFiles.values()) {
+      for (String file : uniqueFiles) {
         distcpOut.append(file);
         distcpOut.newLine();
       }

From 477fd4ed13da1aa34c0d569efa2ead4e763f5a7e Mon Sep 17 00:00:00 2001
From: AlbertWhitlock
Date: Mon, 6 Mar 2023 17:35:24 -0500
Subject: [PATCH 2/9] Fixed syntax error. Added {} to if.
--- .../manager/tableOps/tableExport/WriteExportFiles.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java index f606f1a22fa..1b02f4030a5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java @@ -224,8 +224,9 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath, Set uniqueFiles, String volumeName) throws IOException { - if (volumeName.contains(":")) + if (volumeName.contains(":")) { volumeName = volumeName.substring(0, volumeName.indexOf(":")); + } BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter( fs.create(new Path(exportDir, "distcp-" + volumeName + ".txt")), UTF_8)); From 92e5ffc2af90bd390957b8169f637f1a712ea619 Mon Sep 17 00:00:00 2001 From: AlbertWhitlock Date: Mon, 13 Mar 2023 17:43:36 -0400 Subject: [PATCH 3/9] Improved way to collect volume name and changed naming of new distcp.txt file --- .../tableOps/tableExport/WriteExportFiles.java | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java index 1b02f4030a5..a096497f4d3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java @@ -156,7 +156,7 @@ public void undo(long tid, Manager env) { public static void exportTable(VolumeManager fs, ServerContext context, String tableName, TableId tableID, String exportDir) throws Exception { - int count = 0; + // int count = 0; Set volumeSet = new TreeSet<>(); fs.mkdirs(new Path(exportDir)); @@ -194,14 +194,7 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t // make a set of unique volumes from the map for (String fileString : uniqueFiles.values()) { String[] fileSegmentArray = fileString.split("/"); - for (String fileSegment : fileSegmentArray) { - ++count; - if (count == 3) { - volumeSet.add(fileSegment); - break; - } - } - count = 0; + volumeSet.add(fileSegmentArray[2]); } // for each unique volume: get every matching entry in the map and send to createDistcpFile @@ -225,7 +218,7 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath, Set uniqueFiles, String volumeName) throws IOException { if (volumeName.contains(":")) { - volumeName = volumeName.substring(0, volumeName.indexOf(":")); + volumeName = volumeName.replace(":", "-"); } BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter( From 5df4df5284a37568e7623930c93bb658c9062eff Mon Sep 17 00:00:00 2001 From: AlbertWhitlock Date: Tue, 11 Apr 2023 09:57:50 -0400 Subject: [PATCH 4/9] Cleaned up class and started test case --- .../tableExport/WriteExportFiles.java | 1 - .../accumulo/test/shell/ShellServerIT.java | 92 ++++++++++++++++++- 2 files changed, 89 
insertions(+), 4 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java index a096497f4d3..4429c67c29c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java @@ -156,7 +156,6 @@ public void undo(long tid, Manager env) { public static void exportTable(VolumeManager fs, ServerContext context, String tableName, TableId tableID, String exportDir) throws Exception { - // int count = 0; Set volumeSet = new TreeSet<>(); fs.mkdirs(new Path(exportDir)); diff --git a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java index 5639ac582c4..7041cbf71c2 100644 --- a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java @@ -167,6 +167,9 @@ public void exporttableImporttable() throws Exception { final String table = getUniqueNames(1)[0]; final String table2 = table + "2"; + String[] pathTokens = rootPath.split("/"); + String volumeName = pathTokens[2]; + // exporttable / importtable ts.exec("createtable " + table + " -evc", true); make10(); @@ -195,8 +198,8 @@ public void exporttableImporttable() throws Exception { fs.mkdirs(importDir); // Implement a poor-man's DistCp - try (BufferedReader reader = - new BufferedReader(new FileReader(new File(exportDir, "distcp.txt"), UTF_8))) { + try (BufferedReader reader = new BufferedReader( + new FileReader(new File(exportDir, "distcp-" + volumeName + ".txt"), UTF_8))) { for (String line; (line = reader.readLine()) != null;) { Path exportedFile = new Path(line); // There isn't a cp on FileSystem?? @@ -208,7 +211,7 @@ public void exporttableImporttable() throws Exception { } } } else { - String[] distCpArgs = {"-f", exportUri + "/distcp.txt", import_}; + String[] distCpArgs = {"-f", exportUri + "/distcp-" + volumeName + ".txt", import_}; assertEquals(0, cp.run(distCpArgs), "Failed to run distcp: " + Arrays.toString(distCpArgs)); } ts.exec("importtable " + table2 + " " + import_, true); @@ -221,6 +224,66 @@ public void exporttableImporttable() throws Exception { ts.exec("deletetable -f " + table2, true); } + @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test") + @Test + public void exporttableWithMultipleVolumes() throws Exception { + + try (AccumuloClient client = + getCluster().createAccumuloClient(getPrincipal(), new PasswordToken(getRootPassword()))) { + client.securityOperations().grantNamespacePermission(getPrincipal(), "", + NamespacePermission.ALTER_NAMESPACE); + } + + String[] pathTokens = rootPath.split("/"); + String volumeName = pathTokens[2]; + + // exporttable + ts.exec("createtable multVolTable -evc", true); + makeTableWithMultipleVolumes(); + // read table and get volumes!! 
+ ts.exec("offline multVolTable", true); + File exportDir = new File(rootPath, "ShellServerIT.export"); + String exportUri = "file://" + exportDir; + String localTmp = "file://" + new File(rootPath, "ShellServerIT.tmp"); + ts.exec("exporttable -t multVolTable" + " " + exportUri, true); + DistCp cp = new DistCp(new Configuration(false), null); + String import_ = "file://" + new File(rootPath, "ShellServerIT.import"); + ClientInfo info = ClientInfo.from(getCluster().getClientProperties()); + if (info.saslEnabled()) { + // DistCp bugs out trying to get a fs delegation token to perform the cp. Just copy it + // ourselves by hand. + FileSystem fs = getCluster().getFileSystem(); + FileSystem localFs = FileSystem.getLocal(new Configuration(false)); + + // Path on local fs to cp into + Path localTmpPath = new Path(localTmp); + localFs.mkdirs(localTmpPath); + + // Path in remote fs to importtable from + Path importDir = new Path(import_); + fs.mkdirs(importDir); + + // Implement a poor-man's DistCp + try (BufferedReader reader = new BufferedReader( + new FileReader(new File(exportDir, "distcp-" + volumeName + ".txt"), UTF_8))) { + for (String line; (line = reader.readLine()) != null;) { + Path exportedFile = new Path(line); + // There isn't a cp on FileSystem?? + log.info("Copying {} to {}", line, localTmpPath); + fs.copyToLocalFile(exportedFile, localTmpPath); + Path tmpFile = new Path(localTmpPath, exportedFile.getName()); + log.info("Moving {} to the import directory {}", tmpFile, importDir); + fs.moveFromLocalFile(tmpFile, importDir); + } + } + } else { + String[] distCpArgs = {"-f", exportUri + "/distcp-" + volumeName + ".txt", import_}; + assertEquals(0, cp.run(distCpArgs), "Failed to run distcp: " + Arrays.toString(distCpArgs)); + } + ts.exec("online multVolTable", true); + // ts.exec("deletetable -f multVolTable", true); + } + @Test public void setscaniterDeletescaniter() throws Exception { final String table = getUniqueNames(1)[0]; @@ -1968,6 +2031,29 @@ private void make10() throws IOException { } } + private void makeTableWithMultipleVolumes() throws IOException { + for (int i = 1; i <= 4; i++) { + ts.exec(String.format( + "insert row%d cf I00000%dp.rf hdfs://warehouse-a.com:6093/accumulo/tables/3/default_tablet/I00000%dp.rf", + i, i, i)); + } + for (int j = 5; j <= 8; j++) { + ts.exec(String.format( + "insert row%d cf J00000%dp.rf hdfs://n1.example.com:6093/accumulo/tables/3/default_tablet/J00000%dp.rf", + j, j, j)); + } + for (int k = 9; k <= 12; k++) { + ts.exec(String.format( + "insert row%d cf K00000%dp.rf hdfs://warehouse-b.com:6093/accumulo/tables/3/default_tablet/K00000%dp.rf", + k, k, k)); + } + for (int l = 13; l <= 16; l++) { + ts.exec(String.format( + "insert row%d cf L00000%dp.rf hdfs://n1.example.com:6090/accumulo/tables/3/default_tablet/L00000%dp.rf", + l, l, l)); + } + } + private List getFiles(String tableId) throws IOException { ts.output.clear(); From aa66552d9c28594f017dee1693d3b5420dd65e7d Mon Sep 17 00:00:00 2001 From: AlbertWhitlock Date: Tue, 23 May 2023 18:33:36 -0400 Subject: [PATCH 5/9] Added IT class and improved WriteExportFiles.java --- .../tableExport/WriteExportFiles.java | 16 +- ...ExportTableCommandWitlMultipleVolumes.java | 156 ++++++++++++++++++ 2 files changed, 167 insertions(+), 5 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/ExportTableCommandWitlMultipleVolumes.java diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java index 4429c67c29c..3ce19d32821 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java @@ -192,16 +192,17 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t // make a set of unique volumes from the map for (String fileString : uniqueFiles.values()) { - String[] fileSegmentArray = fileString.split("/"); - volumeSet.add(fileSegmentArray[2]); + String uniqueVolume = getVolumeFromString(fileString); + volumeSet.add(uniqueVolume); } - // for each unique volume: get every matching entry in the map and send to createDistcpFile - // method + // for each unique volume: get every matching entry in the map and send them to + // createDistcpFile method for (String volumeString : volumeSet) { Set sortedVolumeSet = new TreeSet<>(); for (String rFileString : uniqueFiles.values()) { - if (rFileString.contains(volumeString)) { + String currentVolume = getVolumeFromString(rFileString); + if (currentVolume.equals(volumeString)) { sortedVolumeSet.add(rFileString); } } @@ -214,6 +215,11 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t } } + private static String getVolumeFromString(String searchString) { + String[] segmentArray = searchString.split("/"); + return segmentArray[2]; + } + private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath, Set uniqueFiles, String volumeName) throws IOException { if (volumeName.contains(":")) { diff --git a/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWitlMultipleVolumes.java b/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWitlMultipleVolumes.java new file mode 100644 index 00000000000..9feae47106c --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWitlMultipleVolumes.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.time.Duration; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExportTableCommandWitlMultipleVolumes extends AccumuloClusterHarness { + + private static final Logger log = LoggerFactory.getLogger(ImportExportIT.class); + + Path v1, v2; + + public static String[] row_numbers = "1,2,3,4,5,6,7,8,9,10".split(","); + + String baseDirStr = ""; + String baseDir2Str = ""; + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(1); + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + File baseDir = cfg.getDir(); + baseDirStr = baseDir.toString(); + + // get first volume name + String[] baseDirArray = baseDirStr.split("/"); + String originalVolume = baseDirArray[2]; + + // get second volume name + String[] baseDir2Array = baseDirArray; + baseDir2Array[2] = baseDir2Array[2] + "2"; + String secondVolume = baseDir2Array[2]; + + // make second volume base directory + for (String element : baseDir2Array) { + baseDir2Str = baseDir2Str + "/" + element; + } + File baseDir2 = new File(baseDir2Str); + + File v1f = new File(baseDir, "volumes/v1"); + File v2f = new File(baseDir2, "volumes/v2"); + + v1 = new Path("file://" + v1f.getAbsolutePath()); + v2 = new Path("file://" + v2f.getAbsolutePath()); + + // Run MAC on two locations in the local file system + cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2); + + // use raw local file system so walogs sync and flush will work + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Test + public void testExportCommand() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + final String tableName = getUniqueNames(1)[0]; + client.tableOperations().create(tableName); + + // add splits to table + SortedSet partitions = new TreeSet<>(); + for (String s : row_numbers) { + partitions.add(new Text(s)); + } + client.tableOperations().addSplits(tableName, partitions); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 1; i <= 5000000; i++) { + Mutation m = new Mutation(Integer.toString(i)); + if (i % 2 != 0) { + m.put(Integer.toString(i), "", String + .format("file://localhost:8020/accumulo/tables/1/default_tablet/I00000%dp.rf", i)); + } else { + 
m.put(Integer.toString(i), "", String + .format("file://localhost:8020/accumulo/tables/2/default_tablet/I00000%dp.rf", i)); + } + bw.addMutation(m); + } + } + + client.tableOperations().compact(tableName, null, null, true, true); + client.tableOperations().flush(tableName, null, null, true); + + Path outputDir = new Path(cluster.getTemporaryPath(), getClass().getName()); + Path exportDir = new Path(outputDir, "export"); + client.tableOperations().offline(tableName, true); + client.tableOperations().exportTable(tableName, exportDir.toString()); + + try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + scanner.setRange(new Range("1", "1<")); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + + for (Map.Entry entry : scanner) { + log.info("Key is: " + entry.getKey()); + log.info("Value is: " + entry.getValue()); + boolean inV1 = entry.getKey().getColumnQualifier().toString().contains(v1.toString()); + boolean inV2 = entry.getKey().getColumnQualifier().toString().contains(v2.toString()); + assertTrue(inV1 || inV2); + } + } + + FileSystem fs = cluster.getFileSystem(); + fs.deleteOnExit(v1); + fs.deleteOnExit(v2); + } + } +} From 71f9e528b8eda67a8ac8fb464ff7d06449f2ba94 Mon Sep 17 00:00:00 2001 From: AlbertWhitlock Date: Mon, 5 Jun 2023 13:17:59 -0400 Subject: [PATCH 6/9] Restored original ShellServerIT. Renamed IT class. Modified checks in WriteExportFiles --- .../tableExport/WriteExportFiles.java | 56 ++-- ...portTableCommandWithMultipleVolumesIT.java | 156 +++++++++ .../accumulo/test/shell/ShellServerIT.java | 311 ++++++------------ 3 files changed, 285 insertions(+), 238 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/ExportTableCommandWithMultipleVolumesIT.java diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java index 3ce19d32821..1b5a3b5b7b8 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java @@ -25,11 +25,12 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.TreeSet; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -60,6 +61,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.Utils; @@ -68,6 +70,7 @@ import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; class WriteExportFiles extends ManagerRepo { @@ -156,8 +159,6 @@ public void undo(long tid, Manager env) { public static void exportTable(VolumeManager fs, ServerContext context, String tableName, TableId 
tableID, String exportDir) throws Exception {
 
-    Set<String> volumeSet = new TreeSet<>();
-
     fs.mkdirs(new Path(exportDir));
     Path exportMetaFilePath = fs.getFileSystemByPath(new Path(exportDir))
         .makeQualified(new Path(exportDir, Constants.EXPORT_FILE));
@@ -190,23 +191,35 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t
       dataOut.close();
       dataOut = null;
 
-      // make a set of unique volumes from the map
-      for (String fileString : uniqueFiles.values()) {
-        String uniqueVolume = getVolumeFromString(fileString);
-        volumeSet.add(uniqueVolume);
-      }
-
-      // for each unique volume: get every matching entry in the map and send them to
-      // createDistcpFile method
-      for (String volumeString : volumeSet) {
-        Set<String> sortedVolumeSet = new TreeSet<>();
-        for (String rFileString : uniqueFiles.values()) {
-          String currentVolume = getVolumeFromString(rFileString);
-          if (currentVolume.equals(volumeString)) {
-            sortedVolumeSet.add(rFileString);
+      // make map containing a volume and corresponding files
+      final Map<Volume,Set<String>> volumeFileMap = new HashMap<>();
+      final Collection<Volume> configuredVolumes = fs.getVolumes();
+      configuredVolumes.forEach(vol -> {
+        final FileSystem dfs = vol.getFileSystem();
+        uniqueFiles.values().forEach(file -> {
+          Path p = null;
+          try {
+            p = dfs.resolvePath(new Path(file));
+          } catch (IOException e) {
+            throw new RuntimeException(e);
           }
-        }
-        createDistcpFile(fs, exportDir, exportMetaFilePath, sortedVolumeSet, volumeString);
+          if (vol.containsPath(p)) {
+            if (volumeFileMap.get(vol) == null) {
+              volumeFileMap.put(vol, new HashSet());
+            }
+            volumeFileMap.get(vol).add(file);
+          }
+        });
+      });
+
+      // for each entry in volumeFileMap, get 'name' of volume to name distcp.txt file
+      // and call createDistcpFile
+      for (Map.Entry<Volume,Set<String>> entry : volumeFileMap.entrySet()) {
+        String keyValueString = entry.getKey().toString();
+        String[] keyValueArray = keyValueString.split("/");
+        String volumeName = keyValueArray[2];
+        createDistcpFile(fs, exportDir, exportMetaFilePath, volumeFileMap.get(entry.getKey()),
+            volumeName);
       }
     } finally {
       if (dataOut != null) {
@@ -215,11 +228,6 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t
     }
   }
 
-  private static String getVolumeFromString(String searchString) {
-    String[] segmentArray = searchString.split("/");
-    return segmentArray[2];
-  }
-
   private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath,
       Set<String> uniqueFiles, String volumeName) throws IOException {
     if (volumeName.contains(":")) {
diff --git a/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWithMultipleVolumesIT.java b/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWithMultipleVolumesIT.java
new file mode 100644
index 00000000000..65338f2ccb8
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWithMultipleVolumesIT.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.time.Duration; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExportTableCommandWithMultipleVolumesIT extends AccumuloClusterHarness { + private static final Logger log = + LoggerFactory.getLogger(ExportTableCommandWithMultipleVolumesIT.class); + + Path v1, v2; + + public static String[] row_numbers = "1,2,3,4,5,6,7,8,9,10".split(","); + + String baseDirStr = ""; + String baseDir2Str = ""; + String originalVolume = ""; + String secondVolume = ""; + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(1); + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + File baseDir = cfg.getDir(); + + // get first volume name + baseDirStr = baseDir.toString(); + String[] baseDirArray = baseDirStr.split("/"); + originalVolume = baseDirArray[2]; + + // get second volume name + String[] baseDir2Array = baseDirArray; + baseDir2Array[2] = baseDir2Array[2] + "2"; + secondVolume = baseDir2Array[2]; + + // make second volume base directory + for (String element : baseDir2Array) { + baseDir2Str = baseDir2Str + "/" + element; + } + File baseDir2 = new File(baseDir2Str); + + File v1f = new File(baseDir, "volumes/v1"); + File v2f = new File(baseDir2, "volumes/v2"); + + v1 = new Path("file://" + v1f.getAbsolutePath()); + v2 = new Path("file://" + v2f.getAbsolutePath()); + + // Run MAC on two locations in the local file system + cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2); + + // use raw local file system so walogs sync and flush will work + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Test + public void testExportCommand() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + FileSystem fs = cluster.getFileSystem(); + + final String tableName = getUniqueNames(1)[0]; + 
client.tableOperations().create(tableName); + + // add splits to table + SortedSet partitions = new TreeSet<>(); + for (String s : row_numbers) { + partitions.add(new Text(s)); + } + client.tableOperations().addSplits(tableName, partitions); + + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (int i = 1; i <= 50000; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(Integer.toString(i), "", String.format("Entry number %d.", i)); + bw.addMutation(m); + } + } + + client.tableOperations().compact(tableName, null, null, true, true); + client.tableOperations().flush(tableName, null, null, true); + + Path outputDir = new Path(cluster.getTemporaryPath(), getClass().getName()); + Path exportDir = new Path(outputDir, "export"); + client.tableOperations().offline(tableName, true); + client.tableOperations().exportTable(tableName, exportDir.toString()); + + // Make sure the distcp.txt files that exporttable creates exists + Path distcpOne = new Path(exportDir, "distcp-" + originalVolume + ".txt"); + Path distcpTwo = new Path(exportDir, "distcp-" + secondVolume + ".txt"); + assertTrue(fs.exists(distcpOne), "Distcp file doesn't exist for original volume"); + assertTrue(fs.exists(distcpTwo), "Distcp file doesn't exist for second volume"); + + try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + scanner.setRange(new Range("1", "1<")); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + + for (Map.Entry entry : scanner) { + boolean inV1 = entry.getKey().getColumnQualifier().toString().contains(v1.toString()); + boolean inV2 = entry.getKey().getColumnQualifier().toString().contains(v2.toString()); + assertTrue(inV1 || inV2); + } + } + + fs.deleteOnExit(v1); + fs.deleteOnExit(v2); + fs.deleteOnExit(outputDir); + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java index 7041cbf71c2..ae350cd475e 100644 --- a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java @@ -18,8 +18,8 @@ */ package org.apache.accumulo.test.shell; -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY; import static org.apache.accumulo.harness.AccumuloITBase.SUNNY_DAY; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -44,7 +44,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.accumulo.core.Constants; @@ -52,7 +51,6 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.sample.RowColumnSampler; import org.apache.accumulo.core.client.sample.RowSampler; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; @@ -79,6 +77,7 @@ import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.accumulo.test.util.Wait; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -162,144 +161,93 @@ public void exporttableImporttable() throws Exception { getCluster().createAccumuloClient(getPrincipal(), new PasswordToken(getRootPassword()))) { client.securityOperations().grantNamespacePermission(getPrincipal(), "", NamespacePermission.ALTER_NAMESPACE); - } - - final String table = getUniqueNames(1)[0]; - final String table2 = table + "2"; - - String[] pathTokens = rootPath.split("/"); - String volumeName = pathTokens[2]; - // exporttable / importtable - ts.exec("createtable " + table + " -evc", true); - make10(); - ts.exec("addsplits row5", true); - ts.exec("config -t " + table + " -s table.split.threshold=345M", true); - ts.exec("offline " + table, true); - File exportDir = new File(rootPath, "ShellServerIT.export"); - String exportUri = "file://" + exportDir; - String localTmp = "file://" + new File(rootPath, "ShellServerIT.tmp"); - ts.exec("exporttable -t " + table + " " + exportUri, true); - DistCp cp = new DistCp(new Configuration(false), null); - String import_ = "file://" + new File(rootPath, "ShellServerIT.import"); - ClientInfo info = ClientInfo.from(getCluster().getClientProperties()); - if (info.saslEnabled()) { - // DistCp bugs out trying to get a fs delegation token to perform the cp. Just copy it - // ourselves by hand. - FileSystem fs = getCluster().getFileSystem(); - FileSystem localFs = FileSystem.getLocal(new Configuration(false)); - - // Path on local fs to cp into - Path localTmpPath = new Path(localTmp); - localFs.mkdirs(localTmpPath); - - // Path in remote fs to importtable from - Path importDir = new Path(import_); - fs.mkdirs(importDir); - - // Implement a poor-man's DistCp - try (BufferedReader reader = new BufferedReader( - new FileReader(new File(exportDir, "distcp-" + volumeName + ".txt"), UTF_8))) { - for (String line; (line = reader.readLine()) != null;) { - Path exportedFile = new Path(line); - // There isn't a cp on FileSystem?? - log.info("Copying {} to {}", line, localTmpPath); - fs.copyToLocalFile(exportedFile, localTmpPath); - Path tmpFile = new Path(localTmpPath, exportedFile.getName()); - log.info("Moving {} to the import directory {}", tmpFile, importDir); - fs.moveFromLocalFile(tmpFile, importDir); + final String tableBase = getUniqueNames(1)[0]; + final String table = tableBase + "_export_src"; + final String table2 = tableBase + "_import_tgt"; + + // exporttable / importtable + ts.exec("createtable " + table + " -evc", true); + make10(); + ts.exec("addsplits row5", true); + ts.exec("config -t " + table + " -s table.split.threshold=345M", true); + ts.exec("offline " + table, true); + File exportDir = new File(rootPath, "ShellServerIT.export"); + String exportUri = "file://" + exportDir; + String localTmp = "file://" + new File(rootPath, "ShellServerIT.tmp"); + ts.exec("exporttable -t " + table + " " + exportUri, true); + DistCp cp = new DistCp(new Configuration(false), null); + String import_ = "file://" + new File(rootPath, "ShellServerIT.import"); + ClientInfo info = ClientInfo.from(getCluster().getClientProperties()); + if (info.saslEnabled()) { + // DistCp bugs out trying to get a fs delegation token to perform the cp. Just copy it + // ourselves by hand. 
+ FileSystem fs = getCluster().getFileSystem(); + FileSystem localFs = FileSystem.getLocal(new Configuration(false)); + + // Path on local fs to cp into + Path localTmpPath = new Path(localTmp); + localFs.mkdirs(localTmpPath); + + // Path in remote fs to importtable from + Path importDir = new Path(import_); + fs.mkdirs(importDir); + + // Implement a poor-man's DistCp + try (BufferedReader reader = + new BufferedReader(new FileReader(new File(exportDir, "distcp.txt"), UTF_8))) { + for (String line; (line = reader.readLine()) != null;) { + Path exportedFile = new Path(line); + // There isn't a cp on FileSystem?? + log.info("Copying {} to {}", line, localTmpPath); + fs.copyToLocalFile(exportedFile, localTmpPath); + Path tmpFile = new Path(localTmpPath, exportedFile.getName()); + log.info("Moving {} to the import directory {}", tmpFile, importDir); + fs.moveFromLocalFile(tmpFile, importDir); + } } + } else { + String[] distCpArgs = {"-f", exportUri + "/distcp.txt", import_}; + assertEquals(0, cp.run(distCpArgs), "Failed to run distcp: " + Arrays.toString(distCpArgs)); } - } else { - String[] distCpArgs = {"-f", exportUri + "/distcp-" + volumeName + ".txt", import_}; - assertEquals(0, cp.run(distCpArgs), "Failed to run distcp: " + Arrays.toString(distCpArgs)); + Thread.sleep(20); + ts.exec("importtable " + table2 + " " + import_, true); + ts.exec("config -t " + table2 + " -np", true, "345M", true); + ts.exec("getsplits -t " + table2, true, "row5", true); + ts.exec("constraint --list -t " + table2, true, "VisibilityConstraint=2", true); + ts.exec("online " + table, true); + ts.exec("deletetable -f " + table, true); + ts.exec("deletetable -f " + table2, true); } - ts.exec("importtable " + table2 + " " + import_, true); - Thread.sleep(100); - ts.exec("config -t " + table2 + " -np", true, "345M", true); - ts.exec("getsplits -t " + table2, true, "row5", true); - ts.exec("constraint --list -t " + table2, true, "VisibilityConstraint=2", true); - ts.exec("online " + table, true); - ts.exec("deletetable -f " + table, true); - ts.exec("deletetable -f " + table2, true); } - @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test") @Test - public void exporttableWithMultipleVolumes() throws Exception { - + public void propStressTest() throws Exception { try (AccumuloClient client = getCluster().createAccumuloClient(getPrincipal(), new PasswordToken(getRootPassword()))) { client.securityOperations().grantNamespacePermission(getPrincipal(), "", NamespacePermission.ALTER_NAMESPACE); - } - String[] pathTokens = rootPath.split("/"); - String volumeName = pathTokens[2]; - - // exporttable - ts.exec("createtable multVolTable -evc", true); - makeTableWithMultipleVolumes(); - // read table and get volumes!! - ts.exec("offline multVolTable", true); - File exportDir = new File(rootPath, "ShellServerIT.export"); - String exportUri = "file://" + exportDir; - String localTmp = "file://" + new File(rootPath, "ShellServerIT.tmp"); - ts.exec("exporttable -t multVolTable" + " " + exportUri, true); - DistCp cp = new DistCp(new Configuration(false), null); - String import_ = "file://" + new File(rootPath, "ShellServerIT.import"); - ClientInfo info = ClientInfo.from(getCluster().getClientProperties()); - if (info.saslEnabled()) { - // DistCp bugs out trying to get a fs delegation token to perform the cp. Just copy it - // ourselves by hand. 
- FileSystem fs = getCluster().getFileSystem(); - FileSystem localFs = FileSystem.getLocal(new Configuration(false)); - - // Path on local fs to cp into - Path localTmpPath = new Path(localTmp); - localFs.mkdirs(localTmpPath); - - // Path in remote fs to importtable from - Path importDir = new Path(import_); - fs.mkdirs(importDir); - - // Implement a poor-man's DistCp - try (BufferedReader reader = new BufferedReader( - new FileReader(new File(exportDir, "distcp-" + volumeName + ".txt"), UTF_8))) { - for (String line; (line = reader.readLine()) != null;) { - Path exportedFile = new Path(line); - // There isn't a cp on FileSystem?? - log.info("Copying {} to {}", line, localTmpPath); - fs.copyToLocalFile(exportedFile, localTmpPath); - Path tmpFile = new Path(localTmpPath, exportedFile.getName()); - log.info("Moving {} to the import directory {}", tmpFile, importDir); - fs.moveFromLocalFile(tmpFile, importDir); - } - } - } else { - String[] distCpArgs = {"-f", exportUri + "/distcp-" + volumeName + ".txt", import_}; - assertEquals(0, cp.run(distCpArgs), "Failed to run distcp: " + Arrays.toString(distCpArgs)); - } - ts.exec("online multVolTable", true); - // ts.exec("deletetable -f multVolTable", true); - } + final String table = getUniqueNames(1)[0]; - @Test - public void setscaniterDeletescaniter() throws Exception { - final String table = getUniqueNames(1)[0]; + ts.exec("createtable " + table + " -evc", true); + make10(); + ts.exec("addsplits row5", true); - // setscaniter, deletescaniter - ts.exec("createtable " + table); - ts.exec("insert a cf cq 1"); - ts.exec("insert a cf cq 1"); - ts.exec("insert a cf cq 1"); - ts.input.set("true\n\n\n\nSTRING"); - ts.exec("setscaniter -class " + SUMMING_COMBINER_ITERATOR + " -p 10 -n name", true); - ts.exec("scan", true, "3", true); - ts.exec("deletescaniter -n name", true); - ts.exec("scan", true, "1", true); - ts.exec("deletetable -f " + table); + ts.exec("config -t " + table + " -s table.split.threshold=345M", true); + for (int i = 0; i < 50; i++) { + String expected = (100 + i) + "M"; + ts.exec("config -t " + table + " -s table.split.threshold=" + expected, true); + ts.exec("config -t " + table + " -np -f table.split.threshold", true, expected, true); + ts.exec("config -t " + table + " -s table.scan.max.memory=" + expected, true); + ts.exec("config -t " + table + " -np -f table.scan.max.memory", true, expected, true); + + String bExpected = ((i % 2) == 0) ? 
"true" : "false"; + ts.exec("config -t " + table + " -s table.bloom.enabled=" + bExpected, true); + ts.exec("config -t " + table + " -np -f table.bloom.enabled", true, bExpected, true); + } + } } @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test") @@ -350,22 +298,6 @@ public void du() throws Exception { ts.exec("deletetable -f " + table); } - /* - * This test should be deleted when the debug command is removed - */ - @Deprecated(since = "2.0.0") - @Test - public void debug() throws Exception { - String expectMsg = "The debug command is deprecated"; - ts.exec("debug", false, expectMsg); - ts.exec("debug on", false, expectMsg); - ts.exec("debug", false, expectMsg); - ts.exec("debug off", false, expectMsg); - ts.exec("debug", false, expectMsg); - ts.exec("debug debug", false, expectMsg); - ts.exec("debug debug debug", false, expectMsg); - } - @Test public void user() throws Exception { final String table = getUniqueNames(1)[0]; @@ -484,8 +416,7 @@ public void setIterOptionPrompt() throws Exception { String expectedKey = "table.iterator.scan.cfcounter"; String expectedValue = "30," + COLUMN_FAMILY_COUNTER_ITERATOR; - TableOperations tops = client.tableOperations(); - checkTableForProperty(tops, tableName0, expectedKey, expectedValue); + checkTableForProperty(client, tableName0, expectedKey, expectedValue); ts.exec("deletetable " + tableName0, true); @@ -499,7 +430,7 @@ public void setIterOptionPrompt() throws Exception { ts.exec("setiter -scan -class " + COLUMN_FAMILY_COUNTER_ITERATOR + " -p 30", true); expectedKey = "table.iterator.scan.customcfcounter"; expectedValue = "30," + COLUMN_FAMILY_COUNTER_ITERATOR; - checkTableForProperty(tops, tableName1, expectedKey, expectedValue); + checkTableForProperty(client, tableName1, expectedKey, expectedValue); ts.exec("deletetable " + tableName1, true); @@ -511,15 +442,16 @@ public void setIterOptionPrompt() throws Exception { // Name on the CLI should override OptionDescriber (or user input name, in this case) ts.exec("setiter -scan -class " + COLUMN_FAMILY_COUNTER_ITERATOR + " -p 30", true); + expectedKey = "table.iterator.scan.customcfcounter"; expectedValue = "30," + COLUMN_FAMILY_COUNTER_ITERATOR; - checkTableForProperty(tops, tableName2, expectedKey, expectedValue); + checkTableForProperty(client, tableName2, expectedKey, expectedValue); expectedKey = "table.iterator.scan.customcfcounter.opt.name1"; expectedValue = "value1"; - checkTableForProperty(tops, tableName2, expectedKey, expectedValue); + checkTableForProperty(client, tableName2, expectedKey, expectedValue); expectedKey = "table.iterator.scan.customcfcounter.opt.name2"; expectedValue = "value2"; - checkTableForProperty(tops, tableName2, expectedKey, expectedValue); + checkTableForProperty(client, tableName2, expectedKey, expectedValue); ts.exec("deletetable " + tableName2, true); @@ -534,33 +466,24 @@ public void setIterOptionPrompt() throws Exception { true); expectedKey = "table.iterator.scan.cfcounter"; expectedValue = "30," + COLUMN_FAMILY_COUNTER_ITERATOR; - checkTableForProperty(tops, tableName3, expectedKey, expectedValue); + checkTableForProperty(client, tableName3, expectedKey, expectedValue); expectedKey = "table.iterator.scan.cfcounter.opt.name1"; expectedValue = "value1.1,value1.2,value1.3"; - checkTableForProperty(tops, tableName3, expectedKey, expectedValue); + checkTableForProperty(client, tableName3, expectedKey, expectedValue); expectedKey = "table.iterator.scan.cfcounter.opt.name2"; expectedValue = "value2"; - 
checkTableForProperty(tops, tableName3, expectedKey, expectedValue); + checkTableForProperty(client, tableName3, expectedKey, expectedValue); ts.exec("deletetable " + tableName3, true); - } } - protected void checkTableForProperty(TableOperations tops, String tableName, String expectedKey, - String expectedValue) throws Exception { - for (int i = 0; i < 5; i++) { - for (Entry entry : tops.getProperties(tableName)) { - if (expectedKey.equals(entry.getKey())) { - assertEquals(expectedValue, entry.getValue()); - return; - } - } - Thread.sleep(500); - } - - fail("Failed to find expected property on " + tableName + ": " + expectedKey + "=" - + expectedValue); + protected void checkTableForProperty(final AccumuloClient client, final String tableName, + final String expectedKey, final String expectedValue) throws Exception { + assertTrue( + Wait.waitFor(() -> client.tableOperations().getConfiguration(tableName).get(expectedKey) + .equals(expectedValue), 5000, 500), + "Failed to find expected value for key: " + expectedKey); } @Test @@ -612,7 +535,7 @@ public void addauths() throws Exception { ts.exec("getauths", true, "bar", true); passed = true; } catch (AssertionError | Exception e) { - sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + Thread.sleep(500); } } assertTrue(passed, "Could not successfully see updated authoriations"); @@ -1081,7 +1004,7 @@ public void constraint() throws Exception { ts.exec("constraint -l -t " + table, true, "VisibilityConstraint=2", true); ts.exec("constraint -t " + table + " -d 2", true, "Removed constraint 2 from table " + table); // wait for zookeeper updates to propagate - sleepUninterruptibly(1, TimeUnit.SECONDS); + Thread.sleep(SECONDS.toMillis(1)); ts.exec("constraint -l -t " + table, true, "VisibilityConstraint=2", false); ts.exec("deletetable -f " + table); } @@ -1433,23 +1356,6 @@ public void info() throws Exception { ts.exec("info", true, Constants.VERSION, true); } - @Test - public void interpreter() throws Exception { - final String table = getUniqueNames(1)[0]; - - ts.exec("createtable " + table, true); - ts.exec("interpreter -l", true, "HexScan", false); - ts.exec("insert \\x02 cf cq value", true); - ts.exec("scan -b 02", true, "value", false); - ts.exec("interpreter -i org.apache.accumulo.core.util.interpret.HexScanInterpreter", true); - // Need to allow time for this to propagate through zoocache/zookeeper - sleepUninterruptibly(3, TimeUnit.SECONDS); - - ts.exec("interpreter -l", true, "HexScan", true); - ts.exec("scan -b 02", true, "value", true); - ts.exec("deletetable -f " + table, true); - } - @Test public void listcompactions() throws Exception { final String table = getUniqueNames(1)[0]; @@ -1516,7 +1422,7 @@ public void ping() throws Exception { if (ts.output.get().split("\n").length == 3) { break; } - sleepUninterruptibly(1, TimeUnit.SECONDS); + Thread.sleep(SECONDS.toMillis(1)); } assertEquals(2, ts.output.get().split("\n").length); @@ -1597,7 +1503,7 @@ public void listscans() throws Exception { log.info("Ignoring scan because of wrong table: {}", currentScan); } } - sleepUninterruptibly(300, TimeUnit.MILLISECONDS); + Thread.sleep(300); } thread.join(); @@ -1636,7 +1542,7 @@ public void testPerTableClasspath_2_1_Jar() throws Exception { } public void verifyPerTableClasspath(final String table, final File fooConstraintJar) - throws IOException { + throws IOException, InterruptedException { File fooFilterJar = initJar("/org/apache/accumulo/test/FooFilter.jar", "FooFilter", rootPath); @@ -1647,7 +1553,7 @@ public void 
verifyPerTableClasspath(final String table, final File fooConstraint "config -t " + table + " -s " + Property.TABLE_CLASSLOADER_CONTEXT.getKey() + "=" + context, true); - sleepUninterruptibly(250, TimeUnit.MILLISECONDS); + Thread.sleep(250); // We can't use the setiter command as Filter implements OptionDescriber which // forces us to enter more input that I don't know how to input @@ -1655,11 +1561,11 @@ public void verifyPerTableClasspath(final String table, final File fooConstraint ts.exec("config -t " + table + " -s " + Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.foo=10,org.apache.accumulo.test.FooFilter"); - sleepUninterruptibly(250, TimeUnit.MILLISECONDS); + Thread.sleep(250); ts.exec("insert foo f q v", true); - sleepUninterruptibly(250, TimeUnit.MILLISECONDS); + Thread.sleep(250); ts.exec("scan -np", true, "foo", false); @@ -2031,29 +1937,6 @@ private void make10() throws IOException { } } - private void makeTableWithMultipleVolumes() throws IOException { - for (int i = 1; i <= 4; i++) { - ts.exec(String.format( - "insert row%d cf I00000%dp.rf hdfs://warehouse-a.com:6093/accumulo/tables/3/default_tablet/I00000%dp.rf", - i, i, i)); - } - for (int j = 5; j <= 8; j++) { - ts.exec(String.format( - "insert row%d cf J00000%dp.rf hdfs://n1.example.com:6093/accumulo/tables/3/default_tablet/J00000%dp.rf", - j, j, j)); - } - for (int k = 9; k <= 12; k++) { - ts.exec(String.format( - "insert row%d cf K00000%dp.rf hdfs://warehouse-b.com:6093/accumulo/tables/3/default_tablet/K00000%dp.rf", - k, k, k)); - } - for (int l = 13; l <= 16; l++) { - ts.exec(String.format( - "insert row%d cf L00000%dp.rf hdfs://n1.example.com:6090/accumulo/tables/3/default_tablet/L00000%dp.rf", - l, l, l)); - } - } - private List getFiles(String tableId) throws IOException { ts.output.clear(); From 6e19c8a2dfb9b7f45f9d0558a29277c89aae67b1 Mon Sep 17 00:00:00 2001 From: AlbertWhitlock Date: Tue, 13 Jun 2023 12:07:30 -0400 Subject: [PATCH 7/9] removed ExportTableCommandWitlMultipleVolumes.java --- ...ExportTableCommandWitlMultipleVolumes.java | 156 ------------------ 1 file changed, 156 deletions(-) delete mode 100644 test/src/main/java/org/apache/accumulo/test/ExportTableCommandWitlMultipleVolumes.java diff --git a/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWitlMultipleVolumes.java b/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWitlMultipleVolumes.java deleted file mode 100644 index 9feae47106c..00000000000 --- a/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWitlMultipleVolumes.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.accumulo.test; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.File; -import java.time.Duration; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ExportTableCommandWitlMultipleVolumes extends AccumuloClusterHarness { - - private static final Logger log = LoggerFactory.getLogger(ImportExportIT.class); - - Path v1, v2; - - public static String[] row_numbers = "1,2,3,4,5,6,7,8,9,10".split(","); - - String baseDirStr = ""; - String baseDir2Str = ""; - - @Override - protected Duration defaultTimeout() { - return Duration.ofMinutes(1); - } - - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - File baseDir = cfg.getDir(); - baseDirStr = baseDir.toString(); - - // get first volume name - String[] baseDirArray = baseDirStr.split("/"); - String originalVolume = baseDirArray[2]; - - // get second volume name - String[] baseDir2Array = baseDirArray; - baseDir2Array[2] = baseDir2Array[2] + "2"; - String secondVolume = baseDir2Array[2]; - - // make second volume base directory - for (String element : baseDir2Array) { - baseDir2Str = baseDir2Str + "/" + element; - } - File baseDir2 = new File(baseDir2Str); - - File v1f = new File(baseDir, "volumes/v1"); - File v2f = new File(baseDir2, "volumes/v2"); - - v1 = new Path("file://" + v1f.getAbsolutePath()); - v2 = new Path("file://" + v2f.getAbsolutePath()); - - // Run MAC on two locations in the local file system - cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2); - - // use raw local file system so walogs sync and flush will work - hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - } - - @Test - public void testExportCommand() throws Exception { - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - - final String tableName = getUniqueNames(1)[0]; - client.tableOperations().create(tableName); - - // add splits to table - SortedSet partitions = new TreeSet<>(); - for (String s : row_numbers) { - partitions.add(new Text(s)); - } - client.tableOperations().addSplits(tableName, partitions); - - try (BatchWriter bw = client.createBatchWriter(tableName)) { - for (int i = 1; i <= 5000000; i++) { - Mutation m = new Mutation(Integer.toString(i)); - if (i % 2 != 0) { - m.put(Integer.toString(i), "", String - .format("file://localhost:8020/accumulo/tables/1/default_tablet/I00000%dp.rf", i)); - } else { - 
m.put(Integer.toString(i), "", String - .format("file://localhost:8020/accumulo/tables/2/default_tablet/I00000%dp.rf", i)); - } - bw.addMutation(m); - } - } - - client.tableOperations().compact(tableName, null, null, true, true); - client.tableOperations().flush(tableName, null, null, true); - - Path outputDir = new Path(cluster.getTemporaryPath(), getClass().getName()); - Path exportDir = new Path(outputDir, "export"); - client.tableOperations().offline(tableName, true); - client.tableOperations().exportTable(tableName, exportDir.toString()); - - try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { - scanner.setRange(new Range("1", "1<")); - scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); - - for (Map.Entry entry : scanner) { - log.info("Key is: " + entry.getKey()); - log.info("Value is: " + entry.getValue()); - boolean inV1 = entry.getKey().getColumnQualifier().toString().contains(v1.toString()); - boolean inV2 = entry.getKey().getColumnQualifier().toString().contains(v2.toString()); - assertTrue(inV1 || inV2); - } - } - - FileSystem fs = cluster.getFileSystem(); - fs.deleteOnExit(v1); - fs.deleteOnExit(v2); - } - } -} From fced61f2479dea155610386a9ec6ae11a8a0cfb4 Mon Sep 17 00:00:00 2001 From: AlbertWhitlock Date: Mon, 26 Jun 2023 15:12:48 -0400 Subject: [PATCH 8/9] Made suggested change to exportTable method --- .../manager/tableOps/tableExport/WriteExportFiles.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java index 1b5a3b5b7b8..21ee23b8e4a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java @@ -204,10 +204,7 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t throw new RuntimeException(e); } if (vol.containsPath(p)) { - if (volumeFileMap.get(vol) == null) { - volumeFileMap.put(vol, new HashSet()); - } - volumeFileMap.get(vol).add(file); + volumeFileMap.computeIfAbsent(vol, k -> new HashSet<>()).add(file); } }); }); From fe5fd308b15339cae7c5a0b5c5b02a0e028a696b Mon Sep 17 00:00:00 2001 From: AlbertWhitlock Date: Mon, 18 Sep 2023 14:35:19 -0400 Subject: [PATCH 9/9] Changes to old test and added a new test file --- ...portTableCommandWithMultipleVolumesIT.java | 46 +++--- .../test/ExportTableMakeMultipleDistcp.java | 148 ++++++++++++++++++ 2 files changed, 168 insertions(+), 26 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/ExportTableMakeMultipleDistcp.java diff --git a/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWithMultipleVolumesIT.java b/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWithMultipleVolumesIT.java index 65338f2ccb8..5ca19869b48 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWithMultipleVolumesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWithMultipleVolumesIT.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; +import java.util.UUID; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -43,7 +44,6 @@ import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -53,7 +53,7 @@ public class ExportTableCommandWithMultipleVolumesIT extends AccumuloClusterHarn private static final Logger log = LoggerFactory.getLogger(ExportTableCommandWithMultipleVolumesIT.class); - Path v1, v2; + Path v1, v2, v3; public static String[] row_numbers = "1,2,3,4,5,6,7,8,9,10".split(","); @@ -77,27 +77,21 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoo originalVolume = baseDirArray[2]; // get second volume name - String[] baseDir2Array = baseDirArray; - baseDir2Array[2] = baseDir2Array[2] + "2"; + File baseDir2 = new File("/tmp/testUser"); + baseDir2Str = baseDir2.toString(); + String[] baseDir2Array = baseDir2Str.split("/"); secondVolume = baseDir2Array[2]; - // make second volume base directory - for (String element : baseDir2Array) { - baseDir2Str = baseDir2Str + "/" + element; - } - File baseDir2 = new File(baseDir2Str); - - File v1f = new File(baseDir, "volumes/v1"); - File v2f = new File(baseDir2, "volumes/v2"); + UUID uuid1 = UUID.randomUUID(); + UUID uuid2 = UUID.randomUUID(); + File v1f = new File(baseDir, "volumes/" + uuid1.toString()); + File v2f = new File(baseDir2, "volumes/" + uuid2.toString()); v1 = new Path("file://" + v1f.getAbsolutePath()); v2 = new Path("file://" + v2f.getAbsolutePath()); // Run MAC on two locations in the local file system cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2); - - // use raw local file system so walogs sync and flush will work - hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } @Test @@ -126,17 +120,6 @@ public void testExportCommand() throws Exception { client.tableOperations().compact(tableName, null, null, true, true); client.tableOperations().flush(tableName, null, null, true); - Path outputDir = new Path(cluster.getTemporaryPath(), getClass().getName()); - Path exportDir = new Path(outputDir, "export"); - client.tableOperations().offline(tableName, true); - client.tableOperations().exportTable(tableName, exportDir.toString()); - - // Make sure the distcp.txt files that exporttable creates exists - Path distcpOne = new Path(exportDir, "distcp-" + originalVolume + ".txt"); - Path distcpTwo = new Path(exportDir, "distcp-" + secondVolume + ".txt"); - assertTrue(fs.exists(distcpOne), "Distcp file doesn't exist for original volume"); - assertTrue(fs.exists(distcpTwo), "Distcp file doesn't exist for second volume"); - try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { scanner.setRange(new Range("1", "1<")); scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); @@ -148,6 +131,17 @@ public void testExportCommand() throws Exception { } } + Path outputDir = new Path(cluster.getTemporaryPath(), "testDir"); + Path exportDir = new Path(outputDir, "export"); + client.tableOperations().offline(tableName, true); + client.tableOperations().exportTable(tableName, exportDir.toString()); + + // Make sure the distcp.txt files that exporttable creates exist + Path distcpOne = new Path(exportDir, "distcp-" + originalVolume + ".txt"); + Path distcpTwo = new Path(exportDir, "distcp-" + secondVolume + ".txt"); + assertTrue(fs.exists(distcpOne), "Distcp file doesn't exist for original volume"); + assertTrue(fs.exists(distcpTwo), "Distcp file doesn't exist for second volume"); + fs.deleteOnExit(v1); 
     fs.deleteOnExit(v2);
     fs.deleteOnExit(outputDir);
diff --git a/test/src/main/java/org/apache/accumulo/test/ExportTableMakeMultipleDistcp.java b/test/src/main/java/org/apache/accumulo/test/ExportTableMakeMultipleDistcp.java
new file mode 100644
index 00000000000..3e949b2fda1
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ExportTableMakeMultipleDistcp.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExportTableMakeMultipleDistcp {
+  private static final Logger log = LoggerFactory.getLogger(ExportTableMakeMultipleDistcp.class);
+
+  @Test
+  public void testDistcpFiles() {
+    Set<String> volumeSet = new TreeSet<>();
+
+    Map<String,String> uniqueFiles = new HashMap<>();
+    uniqueFiles.put("F00000ja.rf",
+        "hdfs://warehouse-b.com:6093/accumulo/tables/3/default_tablet/F00000ja.rf");
+    uniqueFiles.put("F00000jb.rf",
+        "hdfs://n1.example.com:6093/accumulo/tables/3/default_tablet/F00000jb.rf");
+    uniqueFiles.put("C00000gf.rf",
+        "hdfs://warehouse-a.com:6093/accumulo/tables/3/default_tablet/C00000gf.rf");
+    uniqueFiles.put("F00000jc.rf",
+        "hdfs://n3.example.com:6093/accumulo/tables/3/default_tablet/F00000jc.rf");
+    uniqueFiles.put("F00000jd.rf",
+        "hdfs://n2.example.com:6080/accumulo/tables/3/default_tablet/F00000jd.rf");
+    uniqueFiles.put("F00000je.rf",
+        "hdfs://warehouse-a.com:6093/accumulo/tables/3/default_tablet/F00000je.rf");
+    uniqueFiles.put("F00000jz.rf",
+        "hdfs://n1.example.com:6090/accumulo/tables/1/default_tablet/F00000jz.rf");
+    uniqueFiles.put("F00000jf.rf",
+        "hdfs://n2.example.com:6093/accumulo/tables/2/default_tablet/F00000jf.rf");
+    uniqueFiles.put("A000002m.rf",
+        "hdfs://n1.example.com:6093/accumulo/tables/1/default_tablet/A000002m.rf");
+    uniqueFiles.put("Z00000ez.rf",
+        "hdfs://n4.example.com:6093/accumulo/tables/3/default_tablet/Z00000ez.rf");
+    uniqueFiles.put("A000002n.rf",
+        "hdfs://warehouse-b.com:6093/accumulo/tables/1/default_tablet/A000002n.rf");
+    uniqueFiles.put("A000002p.rf",
+        "hdfs://n1.example.com:6093/accumulo/tables/3/default_tablet/A000002p.rf");
+    uniqueFiles.put("A000002q.rf",
+        "hdfs://n3.example.com:6093/accumulo/tables/3/default_tablet/A000002q.rf");
+    uniqueFiles.put("C00000ef.rf",
"hdfs://warehouse-a.com:6093/accumulo/tables/1/default_tablet/C00000ef.rf"); + uniqueFiles.put("F00000cz.rf", + "hdfs://n1.example.com:6090/accumulo/tables/2/default_tablet/F00000cz.rf"); + uniqueFiles.put("C00000eg.rf", + "hdfs://n1.example.com:6093/accumulo/tables/2/default_tablet/C00000eg.rf"); + uniqueFiles.put("C00000eh.rf", + "hdfs://n3.example.com:6093/accumulo/tables/3/default_tablet/C00000eh.rf"); + uniqueFiles.put("D00000ef.rf", + "hdfs://warehouse-a.com:6090/accumulo/tables/1/default_tablet/D00000ef.rf"); + uniqueFiles.put("C00000ek.rf", + "hdfs://n2.example.com:6080/accumulo/tables/1/default_tablet/C00000ek.rf"); + uniqueFiles.put("C00000em.rf", + "hdfs://n2.example.com:6093/accumulo/tables/2/default_tablet/C00000em.rf"); + uniqueFiles.put("S00000eo.rf", + "hdfs://n4.example.com:6093/accumulo/tables/3/default_tablet/S00000eo.rf"); + uniqueFiles.put("Z00002fu.rf", + "hdfs://n8.example.com:9999/accumulo/tables/1/default_tablet/Z00002fu.rf"); + + // this will get each unique volume from original map and store it in a set + for (String fileString : uniqueFiles.values()) { + String[] fileSegmentArray = fileString.split("/"); + volumeSet.add(fileSegmentArray[2]); + } + + // this will traverse original map, get all entries with same volume, add them to a new map, and + // send new map to createDistcpFile method + for (String volumeString : volumeSet) { + Set sortedVolumeSet = new TreeSet<>(); + for (String rFileString : uniqueFiles.values()) { + if (rFileString.contains(volumeString)) { + sortedVolumeSet.add(rFileString); + } + } + createDistcpFile(volumeString, sortedVolumeSet); + } + } + + public static void createDistcpFile(String volumeName, Set passedSet) { + if (volumeName.contains(":")) { + volumeName = volumeName.replace(":", "-"); + } + + try { + String outputPath = System.getProperty("user.dir") + + "/target/mini-tests/ExportTableMakeMultipleDistcp_createDistcpFile"; + Path outputFilePath = new Path(outputPath); + FileSystem fs = FileSystem.getLocal(new Configuration()); + + BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter( + fs.create(new Path(outputPath, "distcp-" + volumeName + ".txt")), UTF_8)); + + try { + for (String file : passedSet) { + distcpOut.append(file); + distcpOut.newLine(); + } + + distcpOut.close(); + distcpOut = null; + + } finally { + if (distcpOut != null) { + distcpOut.close(); + } + } + + Path distcpPath = new Path(outputFilePath, "distcp-" + volumeName + ".txt"); + assertTrue(fs.exists(distcpPath), "Distcp file doesn't exist"); + + // cleanup created files + fs.deleteOnExit(distcpPath); + } catch (Exception e) { + System.out.println("Exception during configure"); + } + } +}