Skip to content

Commit 59785cd

Browse files
committed
Set all tablets availability to ONDEMAND on import
1 parent da7b3b0 commit 59785cd

File tree

3 files changed

+169
-23
lines changed

3 files changed

+169
-23
lines changed

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java

+14-19
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.accumulo.manager.tableOps.ManagerRepo;
5757
import org.apache.accumulo.server.fs.VolumeManager;
5858
import org.apache.accumulo.server.util.MetadataTableUtil;
59+
import org.apache.hadoop.fs.FSDataInputStream;
5960
import org.apache.hadoop.fs.Path;
6061
import org.apache.hadoop.io.Text;
6162
import org.slf4j.Logger;
@@ -103,7 +104,8 @@ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
103104
try (
104105
BatchWriter mbw =
105106
manager.getContext().createBatchWriter(AccumuloTable.METADATA.tableName());
106-
ZipInputStream zis = new ZipInputStream(fs.open(path))) {
107+
FSDataInputStream fsDataInputStream = fs.open(path);
108+
ZipInputStream zis = new ZipInputStream(fsDataInputStream)) {
107109

108110
Map<String,String> fileNameMappings = new HashMap<>();
109111
for (ImportedTableInfo.DirectoryMapping dm : tableInfo.directories) {
@@ -115,8 +117,11 @@ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
115117

116118
ZipEntry zipEntry;
117119
while ((zipEntry = zis.getNextEntry()) != null) {
118-
if (zipEntry.getName().equals(Constants.EXPORT_METADATA_FILE)) {
119-
DataInputStream in = new DataInputStream(new BufferedInputStream(zis));
120+
if (!zipEntry.getName().equals(Constants.EXPORT_METADATA_FILE)) {
121+
continue;
122+
}
123+
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(zis);
124+
DataInputStream in = new DataInputStream(bufferedInputStream)) {
120125

121126
Key key = new Key();
122127
Value val = new Value();
@@ -125,8 +130,6 @@ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
125130
Text currentRow = null;
126131
int dirCount = 0;
127132

128-
boolean sawTabletAvailability = false;
129-
130133
while (true) {
131134
key.readFields(in);
132135
val.readFields(in);
@@ -166,11 +169,9 @@ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
166169
if (m == null || !currentRow.equals(metadataRow)) {
167170

168171
if (m != null) {
169-
if (!sawTabletAvailability) {
170-
// add a default tablet availability
171-
TabletColumnFamily.AVAILABILITY_COLUMN.put(m,
172-
TabletAvailabilityUtil.toValue(TabletAvailability.ONDEMAND));
173-
}
172+
// add a default tablet availability
173+
TabletColumnFamily.AVAILABILITY_COLUMN.put(m,
174+
TabletAvailabilityUtil.toValue(TabletAvailability.ONDEMAND));
174175
mbw.addMutation(m);
175176
}
176177

@@ -183,21 +184,15 @@ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
183184
m = new Mutation(metadataRow);
184185
ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(tabletDir));
185186
currentRow = metadataRow;
186-
sawTabletAvailability = false;
187187
}
188188

189-
if (TabletColumnFamily.AVAILABILITY_COLUMN.hasColumns(key)) {
190-
sawTabletAvailability = true;
191-
}
192189
m.put(key.getColumnFamily(), cq, val);
193190

194191
if (endRow == null && TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
195192

196-
if (!sawTabletAvailability) {
197-
// add a default tablet availability
198-
TabletColumnFamily.AVAILABILITY_COLUMN.put(m,
199-
TabletAvailabilityUtil.toValue(TabletAvailability.ONDEMAND));
200-
}
193+
// add a default tablet availability
194+
TabletColumnFamily.AVAILABILITY_COLUMN.put(m,
195+
TabletAvailabilityUtil.toValue(TabletAvailability.ONDEMAND));
201196

202197
mbw.addMutation(m);
203198
break; // it is the last column in the last row

test/src/main/java/org/apache/accumulo/test/ImportExportIT.java

+146
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import static org.apache.accumulo.core.Constants.IMPORT_MAPPINGS_FILE;
2222
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
23+
import static org.apache.accumulo.test.TableOperationsIT.setExpectedTabletAvailability;
24+
import static org.apache.accumulo.test.TableOperationsIT.verifyTabletAvailabilites;
2325
import static org.junit.jupiter.api.Assertions.assertEquals;
2426
import static org.junit.jupiter.api.Assertions.assertFalse;
2527
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -33,20 +35,24 @@
3335
import java.io.InputStreamReader;
3436
import java.nio.file.Paths;
3537
import java.time.Duration;
38+
import java.util.ArrayList;
3639
import java.util.Arrays;
3740
import java.util.Iterator;
3841
import java.util.List;
3942
import java.util.Map.Entry;
4043
import java.util.Set;
44+
import java.util.SortedSet;
4145

4246
import org.apache.accumulo.cluster.AccumuloCluster;
4347
import org.apache.accumulo.core.Constants;
4448
import org.apache.accumulo.core.client.Accumulo;
4549
import org.apache.accumulo.core.client.AccumuloClient;
4650
import org.apache.accumulo.core.client.BatchWriter;
4751
import org.apache.accumulo.core.client.Scanner;
52+
import org.apache.accumulo.core.client.admin.AvailabilityForTablet;
4853
import org.apache.accumulo.core.client.admin.CompactionConfig;
4954
import org.apache.accumulo.core.client.admin.ImportConfiguration;
55+
import org.apache.accumulo.core.client.admin.TabletAvailability;
5056
import org.apache.accumulo.core.data.Key;
5157
import org.apache.accumulo.core.data.Mutation;
5258
import org.apache.accumulo.core.data.Range;
@@ -75,6 +81,8 @@
7581
import org.slf4j.Logger;
7682
import org.slf4j.LoggerFactory;
7783

84+
import com.google.common.collect.Sets;
85+
7886
/**
7987
* ImportTable didn't correctly place absolute paths in metadata. This resulted in the imported
8088
* table only being usable when the actual HDFS directory for Accumulo was the same as
@@ -367,6 +375,144 @@ public void testExportImportOffline(boolean fenced) throws Exception {
367375
}
368376
}
369377

378+
/**
379+
* Ensure all tablets in an imported table are ONDEMAND.
380+
*
381+
* Create a table with multiple tablets, each with a different tablet availability. Export the
382+
* table. Import the table and make sure that all tablets on the imported table have the ONDEMAND
383+
* tablet availability.
384+
*
385+
* This test case stitches together code from TableOperationsIT to create the table, set up the
386+
* tablets and verify. The code to export then import the table is from
387+
* ImportExportIT.testExportImportOffline()
388+
*/
389+
@Test
390+
public void testImportedTableIsOnDemand() throws Exception {
391+
392+
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
393+
String[] tableNames = getUniqueNames(2);
394+
String srcTable = tableNames[0], destTable = tableNames[1];
395+
396+
client.tableOperations().create(srcTable);
397+
String srcTableId = client.tableOperations().tableIdMap().get(srcTable);
398+
399+
// add split 'h' and 'q'. Leave first as ONDEMAND, set second to UNHOSTED, and third to HOSTED
400+
SortedSet<Text> splits = Sets.newTreeSet(Arrays.asList(new Text("h"), new Text("q")));
401+
client.tableOperations().addSplits(srcTable, splits);
402+
Range range = new Range(new Text("h"), false, new Text("q"), true);
403+
client.tableOperations().setTabletAvailability(srcTable, range, TabletAvailability.UNHOSTED);
404+
range = new Range(new Text("q"), false, null, true);
405+
client.tableOperations().setTabletAvailability(srcTable, range, TabletAvailability.HOSTED);
406+
407+
// verify
408+
List<AvailabilityForTablet> expectedTabletAvailability = new ArrayList<>();
409+
setExpectedTabletAvailability(expectedTabletAvailability, srcTableId, "h", null,
410+
TabletAvailability.ONDEMAND);
411+
setExpectedTabletAvailability(expectedTabletAvailability, srcTableId, "q", "h",
412+
TabletAvailability.UNHOSTED);
413+
setExpectedTabletAvailability(expectedTabletAvailability, srcTableId, null, "q",
414+
TabletAvailability.HOSTED);
415+
verifyTabletAvailabilites(client, srcTable, new Range(), expectedTabletAvailability);
416+
417+
// Add a split within each of the existing tablets. Adding 'd', 'm', and 'v'
418+
splits = Sets.newTreeSet(Arrays.asList(new Text("d"), new Text("m"), new Text("v")));
419+
client.tableOperations().addSplits(srcTable, splits);
420+
421+
// verify results
422+
expectedTabletAvailability.clear();
423+
setExpectedTabletAvailability(expectedTabletAvailability, srcTableId, "d", null,
424+
TabletAvailability.ONDEMAND);
425+
setExpectedTabletAvailability(expectedTabletAvailability, srcTableId, "h", "d",
426+
TabletAvailability.ONDEMAND);
427+
setExpectedTabletAvailability(expectedTabletAvailability, srcTableId, "m", "h",
428+
TabletAvailability.UNHOSTED);
429+
setExpectedTabletAvailability(expectedTabletAvailability, srcTableId, "q", "m",
430+
TabletAvailability.UNHOSTED);
431+
setExpectedTabletAvailability(expectedTabletAvailability, srcTableId, "v", "q",
432+
TabletAvailability.HOSTED);
433+
setExpectedTabletAvailability(expectedTabletAvailability, srcTableId, null, "v",
434+
TabletAvailability.HOSTED);
435+
verifyTabletAvailabilites(client, srcTable, new Range(), expectedTabletAvailability);
436+
437+
// Make a directory we can use to throw the export and import directories
438+
// Must exist on the filesystem the cluster is running.
439+
FileSystem fs = cluster.getFileSystem();
440+
log.info("Using FileSystem: " + fs);
441+
Path baseDir = new Path(cluster.getTemporaryPath(), getClass().getName());
442+
fs.deleteOnExit(baseDir);
443+
if (fs.exists(baseDir)) {
444+
log.info("{} exists on filesystem, deleting", baseDir);
445+
assertTrue(fs.delete(baseDir, true), "Failed to deleted " + baseDir);
446+
}
447+
log.info("Creating {}", baseDir);
448+
assertTrue(fs.mkdirs(baseDir), "Failed to create " + baseDir);
449+
Path exportDir = new Path(baseDir, "export");
450+
fs.deleteOnExit(exportDir);
451+
Path importDirA = new Path(baseDir, "import-a");
452+
Path importDirB = new Path(baseDir, "import-b");
453+
fs.deleteOnExit(importDirA);
454+
fs.deleteOnExit(importDirB);
455+
for (Path p : new Path[] {exportDir, importDirA, importDirB}) {
456+
assertTrue(fs.mkdirs(p), "Failed to create " + p);
457+
}
458+
459+
Set<String> importDirs = Set.of(importDirA.toString(), importDirB.toString());
460+
461+
Path[] importDirAry = new Path[] {importDirA, importDirB};
462+
463+
log.info("Exporting table to {}", exportDir);
464+
log.info("Importing table from {}", importDirs);
465+
466+
// test fast fail offline check
467+
assertThrows(IllegalStateException.class,
468+
() -> client.tableOperations().exportTable(srcTable, exportDir.toString()));
469+
470+
// Offline the table
471+
client.tableOperations().offline(srcTable, true);
472+
// Then export it
473+
client.tableOperations().exportTable(srcTable, exportDir.toString());
474+
475+
// Make sure the distcp.txt file that exporttable creates is available
476+
Path distcp = new Path(exportDir, "distcp.txt");
477+
fs.deleteOnExit(distcp);
478+
assertTrue(fs.exists(distcp), "Distcp file doesn't exist");
479+
FSDataInputStream is = fs.open(distcp);
480+
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
481+
482+
// Copy each file that was exported to one of the imports directory
483+
String line;
484+
485+
while ((line = reader.readLine()) != null) {
486+
Path p = new Path(line.substring(5));
487+
assertTrue(fs.exists(p), "File doesn't exist: " + p);
488+
Path importDir = importDirAry[RANDOM.get().nextInt(importDirAry.length)];
489+
Path dest = new Path(importDir, p.getName());
490+
assertFalse(fs.exists(dest), "Did not expect " + dest + " to exist");
491+
FileUtil.copy(fs, p, fs, dest, false, fs.getConf());
492+
}
493+
494+
reader.close();
495+
496+
log.info("Import dir A: {}", Arrays.toString(fs.listStatus(importDirA)));
497+
log.info("Import dir B: {}", Arrays.toString(fs.listStatus(importDirB)));
498+
499+
// Import the exported data into a new table
500+
client.tableOperations().importTable(destTable, importDirs, ImportConfiguration.empty());
501+
502+
// Get the table ID for the table that the importtable command created
503+
final String destTableId = client.tableOperations().tableIdMap().get(destTable);
504+
assertNotNull(destTableId);
505+
506+
// Get all `file` colfams from the metadata table for the new table
507+
log.info("Imported into table with ID: {}", destTableId);
508+
509+
client.tableOperations().getTabletInformation(destTable, new Range())
510+
.forEach(tabletInformation -> assertEquals(TabletAvailability.ONDEMAND,
511+
tabletInformation.getTabletAvailability(),
512+
"Expected all tablets in imported table to be ONDEMAND"));
513+
}
514+
}
515+
370516
private boolean verifyMappingsFile(String destTableId) throws IOException {
371517
AccumuloCluster cluster = getCluster();
372518
assertTrue(cluster instanceof MiniAccumuloClusterImpl);

test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@
8383

8484
public class TableOperationsIT extends AccumuloClusterHarness {
8585

86-
private AccumuloClient accumuloClient;
86+
private static AccumuloClient accumuloClient;
8787
private static final int MAX_TABLE_NAME_LEN = 1024;
8888

8989
@Override
@@ -727,9 +727,14 @@ private void verifyTablesWithSplits(String tableName, Map<String,String> idMap,
727727
expectedTabletAvailability);
728728
}
729729

730-
private void verifyTabletAvailabilites(String tableName, Range range,
730+
public static void verifyTabletAvailabilites(String tableName, Range range,
731731
List<AvailabilityForTablet> expectedAvailability) throws TableNotFoundException {
732-
List<TabletInformation> tabletInfo = accumuloClient.tableOperations()
732+
verifyTabletAvailabilites(accumuloClient, tableName, range, expectedAvailability);
733+
}
734+
735+
public static void verifyTabletAvailabilites(AccumuloClient client, String tableName, Range range,
736+
List<AvailabilityForTablet> expectedAvailability) throws TableNotFoundException {
737+
List<TabletInformation> tabletInfo = client.tableOperations()
733738
.getTabletInformation(tableName, range).collect(Collectors.toList());
734739
assertEquals(expectedAvailability.size(), tabletInfo.size());
735740
for (var i = 0; i < expectedAvailability.size(); i++) {
@@ -739,7 +744,7 @@ private void verifyTabletAvailabilites(String tableName, Range range,
739744
}
740745
}
741746

742-
private void setExpectedTabletAvailability(List<AvailabilityForTablet> expected, String id,
747+
public static void setExpectedTabletAvailability(List<AvailabilityForTablet> expected, String id,
743748
String endRow, String prevEndRow, TabletAvailability availability) {
744749
KeyExtent ke = new KeyExtent(TableId.of(id), endRow == null ? null : new Text(endRow),
745750
prevEndRow == null ? null : new Text(prevEndRow));

0 commit comments

Comments
 (0)