Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,4 @@ public int compareTo(MetadataTime mtime) {
"Cannot compare different time types: " + this + " and " + mtime);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow;

Expand Down Expand Up @@ -222,6 +223,13 @@ private void requireSameSingle(TabletMetadata tabletMetadata, ColumnType type) {
mutation.addCondition(c);
}
break;
case TIME: {
Condition c =
new Condition(TIME_COLUMN.getColumnFamily(), TIME_COLUMN.getColumnQualifier());
c = c.setValue(tabletMetadata.getTime().encode());
mutation.addCondition(c);
}
break;
default:
throw new UnsupportedOperationException("Column type " + type + " is not supported.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;

import java.util.ArrayList;
import java.util.Comparator;
Expand Down Expand Up @@ -49,13 +50,12 @@
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
* Make asynchronous load calls to each overlapping Tablet. This RepO does its work on the isReady
* and will return a linear sleep value based on the largest number of Tablets on a TabletServer.
Expand Down Expand Up @@ -101,8 +101,6 @@ private static class Loader {
Ample.ConditionalTabletsMutator conditionalMutator;

void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
// ELASTICITY_TODO handle setting time... handle case where tablets are hosted and unhosted
Preconditions.checkArgument(!setTime);
this.bulkDir = bulkDir;
this.manager = manager;
this.tid = tid;
Expand All @@ -115,9 +113,24 @@ void load(List<TabletMetadata> tablets, Files files) {
for (TabletMetadata tablet : tablets) {
Map<ReferencedTabletFile,DataFileValue> filesToLoad = new HashMap<>();

if (setTime && tablet.getLocation() != null) {
throw new IllegalStateException("Setting time on hosted tablet is not implemented");
}

var tabletTime = TabletTime.getInstance(tablet.getTime());

for (final Bulk.FileInfo fileInfo : files) {
filesToLoad.put(new ReferencedTabletFile(new Path(bulkDir, fileInfo.getFileName())),
new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries()));

DataFileValue dfv;

if (setTime) {
dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(),
tabletTime.getAndUpdateTime());
} else {
dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries());
}

filesToLoad.put(new ReferencedTabletFile(new Path(bulkDir, fileInfo.getFileName())), dfv);
}

// remove any files that were already loaded
Expand All @@ -127,12 +140,24 @@ void load(List<TabletMetadata> tablets, Files files) {

if (!filesToLoad.isEmpty()) {
// ELASTICITY_TODO lets automatically call require prev end row
var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent())
.requireAbsentOperation().requireSame(tablet, PREV_ROW, LOADED);
var tabletMutator =
conditionalMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();

if (setTime) {
tabletMutator.requireSame(tablet, PREV_ROW, LOADED, TIME, LOCATION);
} else {
tabletMutator.requireSame(tablet, PREV_ROW, LOADED);
}

filesToLoad.forEach((f, v) -> {
tabletMutator.putBulkFile(f, tid);
tabletMutator.putFile(f, v);

if (setTime) {
// ELASTICITY_TODO this is not the correct thing to do when the tablet is hosted and
// could be harmful
tabletMutator.putTime(tabletTime.getMetadataTime());
}
});

tabletMutator.submit(tm -> false);
Expand Down Expand Up @@ -179,7 +204,7 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa

Iterator<TabletMetadata> tabletIter =
TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow, null)
.checkConsistency().fetch(PREV_ROW, LOCATION, LOADED).build().iterator();
.checkConsistency().fetch(PREV_ROW, LOCATION, LOADED, TIME).build().iterator();

Loader loader = new Loader();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand All @@ -49,6 +50,7 @@
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
Expand All @@ -61,6 +63,7 @@
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
import org.apache.accumulo.core.metadata.schema.SelectedFiles;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
Expand Down Expand Up @@ -719,4 +722,35 @@ public void testRootTabletUpdate() throws Exception {
assertEquals(Status.ACCEPTED, results.get(RootTable.EXTENT).getStatus());
assertEquals(7L, context.getAmple().readTablet(RootTable.EXTENT).getCompactId().getAsLong());
}

@Test
public void testTime() {
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
var context = cluster.getServerContext();

for (var time : List.of(new MetadataTime(100, TimeType.LOGICAL),
new MetadataTime(100, TimeType.MILLIS), new MetadataTime(0, TimeType.LOGICAL))) {
var ctmi = new ConditionalTabletsMutatorImpl(context);
var tabletMeta1 = TabletMetadata.builder(e1).putTime(time).build();
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME)
.putTime(new MetadataTime(101, TimeType.LOGICAL)).submit(tabletMetadata -> false);
var results = ctmi.process();
assertEquals(Status.REJECTED, results.get(e1).getStatus());
assertEquals(new MetadataTime(0, TimeType.MILLIS),
context.getAmple().readTablet(e1).getTime());
}

for (int i = 0; i < 10; i++) {
var ctmi = new ConditionalTabletsMutatorImpl(context);
var tabletMeta1 =
TabletMetadata.builder(e1).putTime(new MetadataTime(i, TimeType.MILLIS)).build();
ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, TIME)
.putTime(new MetadataTime(i + 1, TimeType.MILLIS)).submit(tabletMetadata -> false);
var results = ctmi.process();
assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
assertEquals(new MetadataTime(i + 1, TimeType.MILLIS),
context.getAmple().readTablet(e1).getTime());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,20 @@
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.LoadPlan.RangeType;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.security.Authorizations;
Expand All @@ -82,7 +85,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand Down Expand Up @@ -183,8 +185,6 @@ public void testSingleTabletSingleFile() throws Exception {
}

@Test
@Disabled("Need to implement set time functionality")
// ELASTICITY_TODO
public void testSetTime() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
tableName = "testSetTime_table1";
Expand All @@ -193,6 +193,11 @@ public void testSetTime() throws Exception {
newTableConf.setTimeType(TimeType.LOGICAL);
client.tableOperations().create(tableName, newTableConf);
testSingleTabletSingleFile(client, false, true);

var ctx = (ClientContext) client;
var tabletTime = ctx.getAmple()
.readTablet(new KeyExtent(ctx.getTableId(tableName), new Text("0333"), null)).getTime();
assertEquals(new MetadataTime(1, TimeType.LOGICAL), tabletTime);
}
}

Expand Down