From f7ff29588b3027baa0955cd26414fe8d19fae123 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 16 Oct 2023 21:44:14 +0000 Subject: [PATCH] Consolidated destructive tablet fate code into a new class --- .../DestructiveTabletManagerRepo.java | 195 ++++++++++++++++++ .../tableOps/split/DeleteOperationIds.java | 45 +--- .../manager/tableOps/split/PreSplit.java | 88 +------- .../manager/tableOps/split/UpdateTablets.java | 49 +++-- 4 files changed, 235 insertions(+), 142 deletions(-) create mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/tableOps/DestructiveTabletManagerRepo.java diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/DestructiveTabletManagerRepo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/DestructiveTabletManagerRepo.java new file mode 100644 index 00000000000..1c89ea9c84a --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/DestructiveTabletManagerRepo.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.tableOps; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; + +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.split.PreSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class DestructiveTabletManagerRepo extends ManagerRepo { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PreSplit.class); + + private final TabletOperationType opType; + private final KeyExtent extent; + + public DestructiveTabletManagerRepo(TabletOperationType opType, KeyExtent extent) { + Objects.requireNonNull(opType); + Objects.requireNonNull(extent); + this.opType = opType; + this.extent = extent; + } + + private TabletMetadata readTabletMetadata(Manager manager) { + return manager.getContext().getAmple().readTablet(extent, PREV_ROW, LOCATION, OPID); + } + + /** + * Destructive tablet operations (e.g. MERGE, SPLIT, DELETE) require that the corresponding tablet + * is unhosted. This is done by placing the TabltOperationId into the opid column in the tablet + * metadata. This method returns true when the opid in the tablet metadata is correct and the + * location is empty (which means the tablet is unhosted. This method should be called from the + * {@link #isReady(long, Manager)} method of the initial step of destructive tablet FATE + * operations + * + * @param tid transaction id + * @param manager manager + * @return true if can start the operation, false otherwise + */ + public boolean canStart(long tid, Manager manager) { + + final TabletMetadata tm = readTabletMetadata(manager); + LOG.trace("Attempting tablet {} {} {} {}", opType, FateTxId.formatTid(tid), extent, + tm == null ? null : tm.getLocation()); + final TabletOperationId opid = TabletOperationId.from(opType, tid); + + if (tm == null || (tm.getOperationId() != null && !opid.equals(tm.getOperationId()))) { + // tablet no longer exists or is reserved by another operation + return true; + } else if (opid.equals(tm.getOperationId())) { + if (tm.getLocation() == null) { + // the operation id is set and there is no location, so can proceed + return true; + } else { + // the operation id was set, but a location is also set wait for it be unset + return false; + } + } else { + try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { + + tabletsMutator.mutateTablet(extent).requireAbsentOperation() + .requireSame(tm, LOCATION, PREV_ROW).putOperation(opid) + .submit(tmeta -> opid.equals(tmeta.getOperationId())); + + Map results = tabletsMutator.process(); + if (results.get(extent).getStatus() == Status.ACCEPTED) { + LOG.trace("Successfully set operation id for {} {}", opType, FateTxId.formatTid(tid)); + if (tm.getLocation() == null) { + // the operation id was set and there is no location, so can move on + return true; + } else { + // now that the operation id set, generate an event to unload the tablet + manager.getEventCoordinator().event(extent, "Set operation id %s on tablet for {}", + FateTxId.formatTid(tid), opType); + // the operation id was set, but a location is also set wait for it be unset + return false; + } + } else { + LOG.trace("Failed to set operation id for {} {}", opType, FateTxId.formatTid(tid)); + // something changed with the tablet, so setting the operation id failed. Try again later + return false; + } + } + } + } + + /** + * This method ensures that the opid in the tablet metadata matches this operation id and type, + * and ensures that the location is empty. This method should be called from + * {@link #isReady(long, Manager)} and {@link #call(long, Manager)} methods, except for the + * {@link #isReady(long, Manager)} method of the initial FATE step. + * + * @param tid transaction id + * @param manager manager + * @return true if can continue, false otherwise + */ + public boolean canContinue(long tid, Manager manager) { + final TabletMetadata tm = readTabletMetadata(manager); + final TabletOperationId opid = TabletOperationId.from(opType, tid); + + if (tm == null || !opid.equals(tm.getOperationId())) { + // the tablet no longer exists or we could not set the operation id, maybe another operation + // was running, lets not proceed with the split. + var optMeta = Optional.ofNullable(tm); + LOG.trace("{} Not proceeding with {}. extent:{} location:{} opid:{}", opType, + FateTxId.formatTid(tid), extent, optMeta.map(TabletMetadata::getLocation).orElse(null), + optMeta.map(TabletMetadata::getOperationId).orElse(null)); + return false; + } + + // Its expected that the tablet has no location at this point and if it does its an indication + // of a bug. + if (tm.getLocation() == null) { + LOG.trace("Tablet unexpectedly had location set %s %s %s", FateTxId.formatTid(tid), + tm.getLocation(), tm.getExtent()); + return false; + } + + return true; + + } + + /** + * Removes the operationIds from the tablet metadata. This should be called from the last step in + * the destructive tablet FATE operation + * + * @param tid transaction id + * @param manager manager + * @param extents extents from which operationId should be removed + * @throws IllegalStateException on error + */ + public void removeOperationIds(long tid, Manager manager, Set extents) { + + final TabletOperationId opid = TabletOperationId.from(opType, tid); + + try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { + + // As long as the operation is not our operation id, then this step can be considered + // successful in the case of rejection. If this repo is running for a second time and has + // already deleted the operation id, then it could be absent or set by another fate operation. + Ample.RejectionHandler rejectionHandler = + tabletMetadata -> !opid.equals(tabletMetadata.getOperationId()); + + extents.forEach(extent -> { + tabletsMutator.mutateTablet(extent).requireOperation(opid).requireAbsentLocation() + .deleteOperation().submit(rejectionHandler); + }); + + var results = tabletsMutator.process(); + + boolean allAccepted = + results.values().stream().allMatch(result -> result.getStatus() == Status.ACCEPTED); + + if (!allAccepted) { + throw new IllegalStateException("Failed to delete operation ids " + extent + " " + results + .values().stream().map(Ample.ConditionalResult::getStatus).collect(Collectors.toSet())); + } + } + } + +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java index a2c7169ea5a..9cdeb727c7f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java @@ -18,61 +18,32 @@ */ package org.apache.accumulo.manager.tableOps.split; -import java.util.stream.Collectors; - import org.apache.accumulo.core.clientImpl.TableOperationsImpl; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.logging.TabletLogger; -import org.apache.accumulo.core.metadata.schema.Ample; -import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; -import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.DestructiveTabletManagerRepo; -public class DeleteOperationIds extends ManagerRepo { +public class DeleteOperationIds extends DestructiveTabletManagerRepo { private static final long serialVersionUID = 1L; private final SplitInfo splitInfo; public DeleteOperationIds(SplitInfo splitInfo) { + super(TabletOperationType.SPLITTING, splitInfo.getOriginal()); this.splitInfo = splitInfo; } @Override public Repo call(long tid, Manager manager) throws Exception { - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid); - - try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { - - // As long as the operation is not our operation id, then this step can be considered - // successful in the case of rejection. If this repo is running for a second time and has - // already deleted the operation id, then it could be absent or set by another fate operation. - Ample.RejectionHandler rejectionHandler = - tabletMetadata -> !opid.equals(tabletMetadata.getOperationId()); - - splitInfo.getTablets().forEach(extent -> { - tabletsMutator.mutateTablet(extent).requireOperation(opid).requireAbsentLocation() - .deleteOperation().submit(rejectionHandler); - }); - - var results = tabletsMutator.process(); - - boolean allAccepted = - results.values().stream().allMatch(result -> result.getStatus() == Status.ACCEPTED); - - if (!allAccepted) { - throw new IllegalStateException( - "Failed to delete operation ids " + splitInfo.getOriginal() + " " + results.values() - .stream().map(Ample.ConditionalResult::getStatus).collect(Collectors.toSet())); - } + removeOperationIds(tid, manager, splitInfo.getTablets()); - // Get the tablets hosted ASAP if necessary. - manager.getEventCoordinator().event(splitInfo.getOriginal(), "Added %d splits to %s", - splitInfo.getSplits().size(), splitInfo.getOriginal()); + // Get the tablets hosted ASAP if necessary. + manager.getEventCoordinator().event(splitInfo.getOriginal(), "Added %d splits to %s", + splitInfo.getSplits().size(), splitInfo.getOriginal()); - TabletLogger.split(splitInfo.getOriginal(), splitInfo.getSplits()); - } + TabletLogger.split(splitInfo.getOriginal(), splitInfo.getSplits()); return null; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java index 5e6505f58db..e2da8481d50 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java @@ -18,27 +18,17 @@ */ package org.apache.accumulo.manager.tableOps.split; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; - import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.SortedSet; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; -import org.apache.accumulo.core.metadata.schema.Ample; -import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.DestructiveTabletManagerRepo; import org.apache.accumulo.server.tablets.TabletNameGenerator; import org.apache.hadoop.io.Text; import org.slf4j.Logger; @@ -46,14 +36,14 @@ import com.google.common.base.Preconditions; -public class PreSplit extends ManagerRepo { +public class PreSplit extends DestructiveTabletManagerRepo { private static final long serialVersionUID = 1L; private static final Logger log = LoggerFactory.getLogger(PreSplit.class); private final SplitInfo splitInfo; public PreSplit(KeyExtent expectedExtent, SortedSet splits) { - Objects.requireNonNull(expectedExtent); + super(TabletOperationType.SPLITTING, expectedExtent); Objects.requireNonNull(splits); Preconditions.checkArgument(!splits.isEmpty()); Preconditions.checkArgument(!expectedExtent.isRootTablet()); @@ -66,8 +56,6 @@ public long isReady(long tid, Manager manager) throws Exception { // ELASTICITY_TODO intentionally not getting the table lock because not sure if its needed, // revist later when more operations are moved out of tablet server - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid); - // ELASTICITY_TODO write IT that spins up 100 threads that all try to add a diff split to // the same tablet. @@ -76,79 +64,21 @@ public long isReady(long tid, Manager manager) throws Exception { // that have not completed their first step. Once splits starts running, would like it to move // through as quickly as possible. - var tabletMetadata = manager.getContext().getAmple().readTablet(splitInfo.getOriginal(), - PREV_ROW, LOCATION, OPID); - - log.trace("Attempting tablet split {} {} {}", FateTxId.formatTid(tid), splitInfo.getOriginal(), - tabletMetadata == null ? null : tabletMetadata.getLocation()); - - if (tabletMetadata == null || (tabletMetadata.getOperationId() != null - && !opid.equals(tabletMetadata.getOperationId()))) { - // tablet no longer exists or is reserved by another operation + if (canStart(tid, manager)) { return 0; - } else if (opid.equals(tabletMetadata.getOperationId())) { - if (tabletMetadata.getLocation() == null) { - // the operation id is set and there is no location, so can proceed to split - return 0; - } else { - // the operation id was set, but a location is also set wait for it be unset - return 1000; - } - } else { - try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { - - tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireAbsentOperation() - .requireSame(tabletMetadata, LOCATION, PREV_ROW).putOperation(opid) - .submit(tmeta -> opid.equals(tmeta.getOperationId())); - - Map results = tabletsMutator.process(); - if (results.get(splitInfo.getOriginal()).getStatus() == Status.ACCEPTED) { - log.trace("Successfully set operation id for split {}", FateTxId.formatTid(tid)); - if (tabletMetadata.getLocation() == null) { - // the operation id was set and there is no location, so can move on - return 0; - } else { - // now that the operation id set, generate an event to unload the tablet - manager.getEventCoordinator().event(splitInfo.getOriginal(), - "Set operation id %s on tablet for split", FateTxId.formatTid(tid)); - // the operation id was set, but a location is also set wait for it be unset - return 1000; - } - } else { - log.trace("Failed to set operation id for split {}", FateTxId.formatTid(tid)); - // something changed with the tablet, so setting the operation id failed. Try again later - return 1000; - } - } } + return 1000; } @Override public Repo call(long tid, Manager manager) throws Exception { - manager.getSplitter().removeSplitStarting(splitInfo.getOriginal()); - - TabletMetadata tabletMetadata = manager.getContext().getAmple() - .readTablet(splitInfo.getOriginal(), PREV_ROW, LOCATION, OPID); - - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid); - - if (tabletMetadata == null || !opid.equals(tabletMetadata.getOperationId())) { - // the tablet no longer exists or we could not set the operation id, maybe another operation - // was running, lets not proceed with the split. - var optMeta = Optional.ofNullable(tabletMetadata); - log.trace("{} Not proceeding with split. extent:{} location:{} opid:{}", - FateTxId.formatTid(tid), splitInfo.getOriginal(), - optMeta.map(TabletMetadata::getLocation).orElse(null), - optMeta.map(TabletMetadata::getOperationId).orElse(null)); - return null; + if (!canContinue(tid, manager)) { + throw new IllegalStateException( + "Tablet is in an unexpected condition: " + splitInfo.getOriginal()); } - // Its expected that the tablet has no location at this point and if it does its an indication - // of a bug. - Preconditions.checkState(tabletMetadata.getLocation() == null, - "Tablet unexpectedly had location set %s %s %s", FateTxId.formatTid(tid), - tabletMetadata.getLocation(), tabletMetadata.getExtent()); + manager.getSplitter().removeSplitStarting(splitInfo.getOriginal()); // Create the dir name here for the next step. If the next step fails it will always have the // same dir name each time it runs again making it idempotent. diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index 28131f795e3..48ae988484d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -37,7 +37,7 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.DestructiveTabletManagerRepo; import org.apache.accumulo.server.util.FileUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,50 +45,47 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; -public class UpdateTablets extends ManagerRepo { +public class UpdateTablets extends DestructiveTabletManagerRepo { private static final Logger log = LoggerFactory.getLogger(UpdateTablets.class); private static final long serialVersionUID = 1L; private final SplitInfo splitInfo; private final List dirNames; public UpdateTablets(SplitInfo splitInfo, List dirNames) { + super(TabletOperationType.SPLITTING, splitInfo.getOriginal()); this.splitInfo = splitInfo; this.dirNames = dirNames; } @Override public Repo call(long tid, Manager manager) throws Exception { + TabletMetadata tabletMetadata = manager.getContext().getAmple().readTablet(splitInfo.getOriginal()); var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid); - if (tabletMetadata == null) { - // check to see if this operation has already succeeded. - TabletMetadata newTabletMetadata = - manager.getContext().getAmple().readTablet(splitInfo.getTablets().last()); - - if (newTabletMetadata != null && opid.equals(newTabletMetadata.getOperationId())) { - // have already created the new tablet and failed before we could return the next step, so - // lets go ahead and return the next step. - log.trace( - "{} creating new tablet was rejected because it existed, operation probably failed before.", - FateTxId.formatTid(tid)); - return new DeleteOperationIds(splitInfo); - } else { - throw new IllegalStateException("Tablet is in an unexpected condition " - + splitInfo.getOriginal() + " " + (newTabletMetadata == null) + " " - + (newTabletMetadata == null ? null : newTabletMetadata.getOperationId())); - } - } + if (!canContinue(tid, manager)) { - Preconditions.checkState(tabletMetadata.getOperationId().equals(opid), - "Tablet %s does not have expected operation id %s it has %s", splitInfo.getOriginal(), opid, - tabletMetadata.getOperationId()); + if (tabletMetadata == null) { + // check to see if this operation has already succeeded. + TabletMetadata newTabletMetadata = + manager.getContext().getAmple().readTablet(splitInfo.getTablets().last()); - Preconditions.checkState(tabletMetadata.getLocation() == null, - "Tablet %s unexpectedly has a location %s", splitInfo.getOriginal(), - tabletMetadata.getLocation()); + if (newTabletMetadata != null && opid.equals(newTabletMetadata.getOperationId())) { + // have already created the new tablet and failed before we could return the next step, so + // lets go ahead and return the next step. + log.trace( + "{} creating new tablet was rejected because it existed, operation probably failed before.", + FateTxId.formatTid(tid)); + return new DeleteOperationIds(splitInfo); + } else { + throw new IllegalStateException("Tablet is in an unexpected condition " + + splitInfo.getOriginal() + " " + (newTabletMetadata == null) + " " + + (newTabletMetadata == null ? null : newTabletMetadata.getOperationId())); + } + } + } var newTablets = splitInfo.getTablets();