diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java
index a245a95fb8b..21ee23b8e4a 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableExport/WriteExportFiles.java
@@ -25,9 +25,12 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
@@ -58,6 +61,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.manager.tableOps.Utils;
@@ -66,6 +70,7 @@ import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 class WriteExportFiles extends ManagerRepo {
@@ -186,8 +191,33 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t
       dataOut.close();
       dataOut = null;
 
-      createDistcpFile(fs, exportDir, exportMetaFilePath, uniqueFiles);
-
+      // map each configured volume to the export files that reside on it
+      final Map<Volume,Set<String>> volumeFileMap = new HashMap<>();
+      final Collection<Volume> configuredVolumes = fs.getVolumes();
+      configuredVolumes.forEach(vol -> {
+        final FileSystem dfs = vol.getFileSystem();
+        uniqueFiles.values().forEach(file -> {
+          Path p = null;
+          try {
+            p = dfs.resolvePath(new Path(file));
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+          if (vol.containsPath(p)) {
+            volumeFileMap.computeIfAbsent(vol, k -> new HashSet<>()).add(file);
+          }
+        });
+      });
+
+      // for each entry in volumeFileMap, derive the volume's name to use in the distcp file name,
+      // then write that volume's file list via createDistcpFile
+      for (Map.Entry<Volume,Set<String>> entry : volumeFileMap.entrySet()) {
+        String keyValueString = entry.getKey().toString();
+        String[] keyValueArray = keyValueString.split("/");
+        String volumeName = keyValueArray[2];
+        createDistcpFile(fs, exportDir, exportMetaFilePath, volumeFileMap.get(entry.getKey()),
+            volumeName);
+      }
     } finally {
       if (dataOut != null) {
         dataOut.close();
@@ -196,12 +226,16 @@ public static void exportTable(VolumeManager fs, ServerContext context, String t
   }
 
   private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath,
-      Map<String,String> uniqueFiles) throws IOException {
-    BufferedWriter distcpOut = new BufferedWriter(
-        new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt")), UTF_8));
+      Set<String> uniqueFiles, String volumeName) throws IOException {
+    if (volumeName.contains(":")) {
+      volumeName = volumeName.replace(":", "-");
+    }
+
+    BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(
+        fs.create(new Path(exportDir, "distcp-" + volumeName + ".txt")), UTF_8));
 
     try {
-      for (String file : uniqueFiles.values()) {
+      for (String file : uniqueFiles) {
         distcpOut.append(file);
         distcpOut.newLine();
       }
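Reviewer note: the core of this change is that exportTable() now buckets every referenced file under the volume that owns it and then writes one distcp list per volume, with ':' replaced by '-' so the volume name is usable in a file name. The sketch below restates that grouping in a self-contained form keyed on the URI authority; it is illustrative only and is not the patch's code, which instead matches each file against the configured volumes using FileSystem.resolvePath() and Volume.containsPath() as shown above.

import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Standalone sketch of the per-volume grouping and distcp file naming.
public class VolumeGroupingSketch {

  // Group absolute file URIs by their authority (host:port), e.g. "n1.example.com:6093".
  static Map<String,Set<String>> groupByAuthority(List<String> files) {
    Map<String,Set<String>> byVolume = new HashMap<>();
    for (String file : files) {
      String authority = URI.create(file).getAuthority();
      byVolume.computeIfAbsent(authority, k -> new TreeSet<>()).add(file);
    }
    return byVolume;
  }

  // Mirror the naming rule in createDistcpFile: ':' is not usable in the file name.
  static String distcpFileName(String volumeName) {
    return "distcp-" + volumeName.replace(":", "-") + ".txt";
  }

  public static void main(String[] args) {
    List<String> files = List.of(
        "hdfs://n1.example.com:6093/accumulo/tables/3/default_tablet/F00000ja.rf",
        "hdfs://n2.example.com:6080/accumulo/tables/3/default_tablet/F00000jd.rf");
    groupByAuthority(files)
        .forEach((volume, set) -> System.out.println(distcpFileName(volume) + " -> " + set));
  }
}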
diff --git a/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWithMultipleVolumesIT.java b/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWithMultipleVolumesIT.java
new file mode 100644
index 00000000000..5ca19869b48
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ExportTableCommandWithMultipleVolumesIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExportTableCommandWithMultipleVolumesIT extends AccumuloClusterHarness {
+  private static final Logger log =
+      LoggerFactory.getLogger(ExportTableCommandWithMultipleVolumesIT.class);
+
+  Path v1, v2, v3;
+
+  public static String[] row_numbers = "1,2,3,4,5,6,7,8,9,10".split(",");
+
+  String baseDirStr = "";
+  String baseDir2Str = "";
+  String originalVolume = "";
+  String secondVolume = "";
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(1);
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    File baseDir = cfg.getDir();
+
+    // get the first volume name from the MAC base directory
+    baseDirStr = baseDir.toString();
+    String[] baseDirArray = baseDirStr.split("/");
+    originalVolume = baseDirArray[2];
+
+    // get the second volume name
+    File baseDir2 = new File("/tmp/testUser");
+    baseDir2Str = baseDir2.toString();
+    String[] baseDir2Array = baseDir2Str.split("/");
+    secondVolume = baseDir2Array[2];
+
+    UUID uuid1 = UUID.randomUUID();
+    UUID uuid2 = UUID.randomUUID();
+    File v1f = new File(baseDir, "volumes/" + uuid1.toString());
+    File v2f = new File(baseDir2, "volumes/" + uuid2.toString());
+
+    v1 = new Path("file://" + v1f.getAbsolutePath());
+    v2 = new Path("file://" + v2f.getAbsolutePath());
+
+    // Run MAC on two locations in the local file system
+    cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2);
+  }
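Reviewer note: the assertions at the end of this test assume that the third '/'-separated segment of a path (index 2 after split) identifies its volume, mirroring the split("/")[2] naming logic added to WriteExportFiles. A minimal sketch of that assumption follows; the concrete paths are made-up examples, not values produced by this test.

// Illustrative only: what split("/")[2] yields for the two kinds of volume roots used here.
public class VolumeNameSplitSketch {

  static String volumeName(String path) {
    return path.split("/")[2];
  }

  public static void main(String[] args) {
    // "/tmp/testUser" splits into ["", "tmp", "testUser"], so index 2 is "testUser"
    System.out.println(volumeName("/tmp/testUser"));
    // a hypothetical MAC base dir yields its second path component the same way
    System.out.println(volumeName("/home/user/accumulo/test/target/mini-tests"));
  }
}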
+
+  @Test
+  public void testExportCommand() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      FileSystem fs = cluster.getFileSystem();
+
+      final String tableName = getUniqueNames(1)[0];
+      client.tableOperations().create(tableName);
+
+      // add splits to the table
+      SortedSet<Text> partitions = new TreeSet<>();
+      for (String s : row_numbers) {
+        partitions.add(new Text(s));
+      }
+      client.tableOperations().addSplits(tableName, partitions);
+
+      try (BatchWriter bw = client.createBatchWriter(tableName)) {
+        for (int i = 1; i <= 50000; i++) {
+          Mutation m = new Mutation(Integer.toString(i));
+          m.put(Integer.toString(i), "", String.format("Entry number %d.", i));
+          bw.addMutation(m);
+        }
+      }
+
+      client.tableOperations().compact(tableName, null, null, true, true);
+      client.tableOperations().flush(tableName, null, null, true);
+
+      try (Scanner scanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+        scanner.setRange(new Range("1", "1<"));
+        scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+        for (Map.Entry<Key,Value> entry : scanner) {
+          boolean inV1 = entry.getKey().getColumnQualifier().toString().contains(v1.toString());
+          boolean inV2 = entry.getKey().getColumnQualifier().toString().contains(v2.toString());
+          assertTrue(inV1 || inV2);
+        }
+      }
+
+      Path outputDir = new Path(cluster.getTemporaryPath(), "testDir");
+      Path exportDir = new Path(outputDir, "export");
+      client.tableOperations().offline(tableName, true);
+      client.tableOperations().exportTable(tableName, exportDir.toString());
+
+      // Make sure the per-volume distcp files that exportTable creates exist
+      Path distcpOne = new Path(exportDir, "distcp-" + originalVolume + ".txt");
+      Path distcpTwo = new Path(exportDir, "distcp-" + secondVolume + ".txt");
+      assertTrue(fs.exists(distcpOne), "Distcp file doesn't exist for original volume");
+      assertTrue(fs.exists(distcpTwo), "Distcp file doesn't exist for second volume");
+
+      fs.deleteOnExit(v1);
+      fs.deleteOnExit(v2);
+      fs.deleteOnExit(outputDir);
+    }
+  }
+}
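Reviewer note: the IT above only verifies that the per-volume lists exist. A follow-up assertion could read each distcp-<volume>.txt back and confirm that every path it lists actually names that volume. The helper below is a sketch of such a check and is not part of the patch; it assumes the caller already knows the volume name that was embedded in the file name.

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: every entry in a distcp-<volume>.txt should point at that volume.
public final class DistcpListChecks {

  private DistcpListChecks() {}

  public static void assertAllEntriesOnVolume(FileSystem fs, Path distcpFile, String volumeName)
      throws IOException {
    try (BufferedReader in =
        new BufferedReader(new InputStreamReader(fs.open(distcpFile), UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        assertTrue(line.contains(volumeName),
            "Entry " + line + " does not belong to volume " + volumeName);
      }
    }
  }
}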
diff --git a/test/src/main/java/org/apache/accumulo/test/ExportTableMakeMultipleDistcp.java b/test/src/main/java/org/apache/accumulo/test/ExportTableMakeMultipleDistcp.java
new file mode 100644
index 00000000000..3e949b2fda1
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ExportTableMakeMultipleDistcp.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExportTableMakeMultipleDistcp {
+  private static final Logger log = LoggerFactory.getLogger(ExportTableMakeMultipleDistcp.class);
+
+  @Test
+  public void testDistcpFiles() {
+    Set<String> volumeSet = new TreeSet<>();
+
+    Map<String,String> uniqueFiles = new HashMap<>();
+    uniqueFiles.put("F00000ja.rf",
+        "hdfs://warehouse-b.com:6093/accumulo/tables/3/default_tablet/F00000ja.rf");
+    uniqueFiles.put("F00000jb.rf",
+        "hdfs://n1.example.com:6093/accumulo/tables/3/default_tablet/F00000jb.rf");
+    uniqueFiles.put("C00000gf.rf",
+        "hdfs://warehouse-a.com:6093/accumulo/tables/3/default_tablet/C00000gf.rf");
+    uniqueFiles.put("F00000jc.rf",
+        "hdfs://n3.example.com:6093/accumulo/tables/3/default_tablet/F00000jc.rf");
+    uniqueFiles.put("F00000jd.rf",
+        "hdfs://n2.example.com:6080/accumulo/tables/3/default_tablet/F00000jd.rf");
+    uniqueFiles.put("F00000je.rf",
+        "hdfs://warehouse-a.com:6093/accumulo/tables/3/default_tablet/F00000je.rf");
+    uniqueFiles.put("F00000jz.rf",
+        "hdfs://n1.example.com:6090/accumulo/tables/1/default_tablet/F00000jz.rf");
+    uniqueFiles.put("F00000jf.rf",
+        "hdfs://n2.example.com:6093/accumulo/tables/2/default_tablet/F00000jf.rf");
+    uniqueFiles.put("A000002m.rf",
+        "hdfs://n1.example.com:6093/accumulo/tables/1/default_tablet/A000002m.rf");
+    uniqueFiles.put("Z00000ez.rf",
+        "hdfs://n4.example.com:6093/accumulo/tables/3/default_tablet/Z00000ez.rf");
+    uniqueFiles.put("A000002n.rf",
+        "hdfs://warehouse-b.com:6093/accumulo/tables/1/default_tablet/A000002n.rf");
+    uniqueFiles.put("A000002p.rf",
+        "hdfs://n1.example.com:6093/accumulo/tables/3/default_tablet/A000002p.rf");
+    uniqueFiles.put("A000002q.rf",
+        "hdfs://n3.example.com:6093/accumulo/tables/3/default_tablet/A000002q.rf");
+    uniqueFiles.put("C00000ef.rf",
+        "hdfs://warehouse-a.com:6093/accumulo/tables/1/default_tablet/C00000ef.rf");
+    uniqueFiles.put("F00000cz.rf",
+        "hdfs://n1.example.com:6090/accumulo/tables/2/default_tablet/F00000cz.rf");
+    uniqueFiles.put("C00000eg.rf",
+        "hdfs://n1.example.com:6093/accumulo/tables/2/default_tablet/C00000eg.rf");
+    uniqueFiles.put("C00000eh.rf",
+        "hdfs://n3.example.com:6093/accumulo/tables/3/default_tablet/C00000eh.rf");
+    uniqueFiles.put("D00000ef.rf",
+        "hdfs://warehouse-a.com:6090/accumulo/tables/1/default_tablet/D00000ef.rf");
+    uniqueFiles.put("C00000ek.rf",
+        "hdfs://n2.example.com:6080/accumulo/tables/1/default_tablet/C00000ek.rf");
+    uniqueFiles.put("C00000em.rf",
+        "hdfs://n2.example.com:6093/accumulo/tables/2/default_tablet/C00000em.rf");
+    uniqueFiles.put("S00000eo.rf",
+        "hdfs://n4.example.com:6093/accumulo/tables/3/default_tablet/S00000eo.rf");
+    uniqueFiles.put("Z00002fu.rf",
+        "hdfs://n8.example.com:9999/accumulo/tables/1/default_tablet/Z00002fu.rf");
+
+    // collect each unique volume (the host:port segment) from the original map into a set
+    for (String fileString : uniqueFiles.values()) {
+      String[] fileSegmentArray = fileString.split("/");
+      volumeSet.add(fileSegmentArray[2]);
+    }
+
+    // for each volume, gather every file from the original map that lives on that volume into a
+    // sorted set and pass that set to createDistcpFile
+    for (String volumeString : volumeSet) {
+      Set<String> sortedVolumeSet = new TreeSet<>();
+      for (String rFileString : uniqueFiles.values()) {
+        if (rFileString.contains(volumeString)) {
+          sortedVolumeSet.add(rFileString);
+        }
+      }
+      createDistcpFile(volumeString, sortedVolumeSet);
+    }
+  }
+
+  public static void createDistcpFile(String volumeName, Set<String> passedSet) {
+    if (volumeName.contains(":")) {
+      volumeName = volumeName.replace(":", "-");
+    }
+
+    try {
+      String outputPath = System.getProperty("user.dir")
+          + "/target/mini-tests/ExportTableMakeMultipleDistcp_createDistcpFile";
+      Path outputFilePath = new Path(outputPath);
+      FileSystem fs = FileSystem.getLocal(new Configuration());
+
+      BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(
+          fs.create(new Path(outputPath, "distcp-" + volumeName + ".txt")), UTF_8));
+
+      try {
+        for (String file : passedSet) {
+          distcpOut.append(file);
+          distcpOut.newLine();
+        }
+
+        distcpOut.close();
+        distcpOut = null;
+
+      } finally {
+        if (distcpOut != null) {
+          distcpOut.close();
+        }
+      }
+
+      Path distcpPath = new Path(outputFilePath, "distcp-" + volumeName + ".txt");
+      assertTrue(fs.exists(distcpPath), "Distcp file doesn't exist");
+
+      // clean up created files
+      fs.deleteOnExit(distcpPath);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create distcp file for volume " + volumeName, e);
+    }
+  }
+}
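Reviewer note: each list this change writes is intended to drive a separate copy, for example hadoop distcp -f <exportDir>/distcp-<volume>.txt <destination>, one invocation per source volume. The sketch below shows how a consumer might discover the per-volume lists in an export directory; the glob simply mirrors the distcp-<volume>.txt naming introduced above and is not an API the patch adds.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical consumer: list every per-volume distcp file written into an export directory.
public class FindDistcpLists {

  public static void main(String[] args) throws Exception {
    Path exportDir = new Path(args[0]);
    FileSystem fs = exportDir.getFileSystem(new Configuration());
    FileStatus[] lists = fs.globStatus(new Path(exportDir, "distcp-*.txt"));
    if (lists == null || lists.length == 0) {
      System.out.println("No distcp lists found under " + exportDir);
      return;
    }
    for (FileStatus stat : lists) {
      // each file is a candidate for its own "hadoop distcp -f <list> <destination>" run
      System.out.println(stat.getPath());
    }
  }
}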