From 9f6d1ffe68e8df640b87a8c8808ecca963d17172 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 27 Mar 2023 21:21:45 +0000 Subject: [PATCH 1/5] Started working on multiple managers --- .../core/clientImpl/ClientContext.java | 59 +++- .../apache/accumulo/core/conf/Property.java | 15 + .../ClusterServerConfiguration.java | 14 +- .../MiniAccumuloClusterControl.java | 48 ++- .../MiniAccumuloClusterImpl.java | 10 +- .../state/LoggingTabletStateStore.java | 7 + .../manager/state/MetaDataStateStore.java | 10 + .../manager/state/RootTabletStateStore.java | 6 + .../manager/state/TabletStateStore.java | 9 + .../manager/state/ZooTabletStateStore.java | 1 + .../accumulo/manager/LiveManagerSet.java | 99 ++++++ .../org/apache/accumulo/manager/Manager.java | 319 ++++++++++++------ .../manager/ManagerClientServiceHandler.java | 2 +- .../accumulo/manager/MultipleManagerUtil.java | 100 ++++++ .../accumulo/manager/TabletGroupWatcher.java | 107 +++++- .../manager/tableOps/clone/CloneMetadata.java | 4 +- .../tableOps/create/PopulateMetadata.java | 2 +- .../tableImport/PopulateMetadataTable.java | 2 +- .../manager/tserverOps/ShutdownTServer.java | 2 +- .../manager/MultipleManagerUtilTest.java | 231 +++++++++++++ .../manager/tableOps/ShutdownTServerTest.java | 2 +- .../apache/accumulo/tserver/TabletServer.java | 6 +- .../test/functional/BackupManagerIT.java | 74 ---- .../test/functional/MultipleManagerIT.java | 88 +++++ .../functional/MultipleManagerLockIT.java | 141 ++++++++ 25 files changed, 1125 insertions(+), 233 deletions(-) create mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/LiveManagerSet.java create mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/MultipleManagerUtil.java create mode 100644 server/manager/src/test/java/org/apache/accumulo/manager/MultipleManagerUtilTest.java delete mode 100644 test/src/main/java/org/apache/accumulo/test/functional/BackupManagerIT.java create mode 100644 test/src/main/java/org/apache/accumulo/test/functional/MultipleManagerIT.java create mode 100644 test/src/main/java/org/apache/accumulo/test/functional/MultipleManagerLockIT.java diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 3ef940db961..712aa912f32 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -29,6 +29,7 @@ import java.lang.reflect.InvocationTargetException; import java.net.URL; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -506,12 +507,7 @@ public String getRootTabletLocation() { return loc.getHostPort(); } - /** - * Returns the location(s) of the accumulo manager and any redundant servers. - * - * @return a list of locations in "hostname:port" form - */ - public List getManagerLocations() { + public String getPrimaryManagerLocation() { ensureOpen(); var zLockManagerPath = ServiceLock.path(Constants.ZROOT + "/" + getInstanceID() + Constants.ZMANAGER_LOCK); @@ -519,7 +515,7 @@ public List getManagerLocations() { OpTimer timer = null; if (log.isTraceEnabled()) { - log.trace("tid={} Looking up manager location in zookeeper at {}.", + log.trace("tid={} Looking up primary manager location in zookeeper at {}.", Thread.currentThread().getId(), zLockManagerPath); timer = new OpTimer().start(); } @@ -527,20 +523,59 @@ public List getManagerLocations() { Optional sld = zooCache.getLockData(zLockManagerPath); String location = null; if (sld.isPresent()) { - location = sld.orElseThrow().getAddressString(ThriftService.MANAGER); + location = sld.orElseThrow().getAddressString(ThriftService.FATE); } if (timer != null) { timer.stop(); - log.trace("tid={} Found manager at {} in {}", Thread.currentThread().getId(), + log.trace("tid={} Found primary manager at {} in {}", Thread.currentThread().getId(), (location == null ? "null" : location), String.format("%.3f secs", timer.scale(SECONDS))); } - if (location == null) { - return Collections.emptyList(); + return location; + } + + /** + * Returns the location(s) of the accumulo manager and any redundant servers. + * + * @return a list of locations in "hostname:port" form + */ + public List getManagerLocations() { + ensureOpen(); + + List locations = new ArrayList<>(); + + var zLockManagerPath = + ServiceLock.path(Constants.ZROOT + "/" + getInstanceID() + Constants.ZMANAGERS); + + OpTimer timer = null; + + if (log.isTraceEnabled()) { + log.trace("tid={} Looking up manager locations in zookeeper at {}.", + Thread.currentThread().getId(), zLockManagerPath); + timer = new OpTimer().start(); + } + + for (String manager : zooCache + .getChildren(Constants.ZROOT + "/" + getInstanceID() + Constants.ZMANAGERS)) { + if (manager.contains(":")) { + var zLocPath = ServiceLock + .path(Constants.ZROOT + "/" + getInstanceID() + Constants.ZMANAGERS + "/" + manager); + Optional sld = zooCache.getLockData(zLocPath); + if (sld.isPresent()) { + locations.add(sld.get().getAddressString(ThriftService.MANAGER)); + } + } + } + + if (timer != null) { + timer.stop(); + log.trace("tid={} Found managers at {} in {}", Thread.currentThread().getId(), + (locations == null ? "null" : locations), + String.format("%.3f secs", timer.scale(SECONDS))); } - return Collections.singletonList(location); + return locations; } /** diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index a3f734164e3..824f680fe8a 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -317,6 +317,8 @@ public enum Property { "Properties in this category affect the behavior of the manager server.", "2.1.0"), MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT, "The port used for handling client connections on the manager.", "1.3.5"), + MANAGER_PORTSEARCH("manager.port.search", "false", PropertyType.BOOLEAN, + "If the manager.port.client is in use, search higher ports until one is available", "3.1.0"), MANAGER_TABLET_BALANCER("manager.tablet.balancer", "org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME, "The balancer class that accumulo will use to make tablet assignment and " @@ -371,6 +373,19 @@ public enum Property { "Allow tablets for the " + MetadataTable.NAME + " table to be suspended via table.suspend.duration.", "1.8.0"), + MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT("manager.startup.manager.avail.min.count", "0", + PropertyType.COUNT, + "Minimum number of managers that need to be registered before a manager will start. A value " + + "greater than 0 is useful when multiple managers are supposed to be running on startup. " + + "When set to 0 or less, no blocking occurs. Default is 0 (disabled)", + "3.1.0"), + MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT("manager.startup.manager.avail.max.wait", "0", + PropertyType.TIMEDURATION, + "Maximum time manager will wait for manager available threshold " + + "to be reached before continuing. When set to 0 or less, will block " + + "indefinitely. Default is 0 to block indefinitely. Only valid when manager available " + + "threshold is set greater than 1.", + "3.1.0"), MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT("manager.startup.tserver.avail.min.count", "0", PropertyType.COUNT, "Minimum number of tservers that need to be registered before manager will " diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java index 13fa4b93891..fe74b586861 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java @@ -27,6 +27,7 @@ public class ClusterServerConfiguration { + private int numManagers = 1; private final Map compactors; private final Map sservers; private final Map tservers; @@ -36,7 +37,7 @@ public class ClusterServerConfiguration { * in the default resource group */ public ClusterServerConfiguration() { - this(1, 1, 2); + this(1, 1, 1, 2); } /** @@ -46,7 +47,8 @@ public ClusterServerConfiguration() { * @param numSServers number of scan servers in the default resource group * @param numTServers number of tablet servers in the default resource group */ - public ClusterServerConfiguration(int numCompactors, int numSServers, int numTServers) { + public ClusterServerConfiguration(int numManagers, int numCompactors, int numSServers, int numTServers) { + this.numManagers = numManagers; compactors = new HashMap<>(); compactors.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numCompactors); sservers = new HashMap<>(); @@ -55,6 +57,14 @@ public ClusterServerConfiguration(int numCompactors, int numSServers, int numTSe tservers.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numTServers); } + public int getNumManagers() { + return this.numManagers; + } + + public void setNumManagers(int num) { + this.numManagers = num; + } + public void setNumDefaultCompactors(int numCompactors) { compactors.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numCompactors); } diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index 6d3b36026ef..8ce12c1ada2 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@ -57,7 +57,7 @@ public class MiniAccumuloClusterControl implements ClusterControl { protected MiniAccumuloClusterImpl cluster; Process zooKeeperProcess = null; - Process managerProcess = null; + final List managerProcesses = new ArrayList<>(); Process gcProcess = null; Process monitor = null; final Map> tabletServerProcesses = new HashMap<>(); @@ -189,8 +189,13 @@ public synchronized void start(ServerType server, Map configOverr } break; case MANAGER: - if (managerProcess == null) { - managerProcess = cluster._exec(classToUse, server, configOverrides).getProcess(); + synchronized (managerProcesses) { + int count = 0; + for (int i = managerProcesses.size(); + count < limit && i < cluster.getConfig().getClusterServerConfiguration().getNumManagers(); i++, ++count) { + managerProcesses + .add(cluster._exec(classToUse, server, configOverrides).getProcess()); + } } break; case ZOOKEEPER: @@ -258,15 +263,19 @@ public void stop(ServerType server) throws IOException { public synchronized void stop(ServerType server, String hostname) throws IOException { switch (server) { case MANAGER: - if (managerProcess != null) { + synchronized (managerProcesses) { try { - cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS); - } catch (ExecutionException | TimeoutException e) { - log.warn("Manager did not fully stop after 30 seconds", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + for (Process manager : managerProcesses) { + try { + cluster.stopProcessWithTimeout(manager, 30, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + log.warn("Manager did not fully stop after 30 seconds", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } finally { - managerProcess = null; + managerProcesses.clear(); } } break; @@ -392,14 +401,19 @@ public void killProcess(ServerType type, ProcessReference procRef) boolean found = false; switch (type) { case MANAGER: - if (procRef.getProcess().equals(managerProcess)) { - try { - cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS); - } catch (ExecutionException | TimeoutException e) { - log.warn("Manager did not fully stop after 30 seconds", e); + synchronized (managerProcesses) { + for (Process manager : managerProcesses) { + if (procRef.getProcess().equals(manager)) { + managerProcesses.remove(manager); + try { + cluster.stopProcessWithTimeout(manager, 30, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e) { + log.warn("Manager did not fully stop after 30 seconds", e); + } + found = true; + break; + } } - managerProcess = null; - found = true; } break; case TABLET_SERVER: diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 091b7e9c6a0..63953e585fa 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -703,8 +703,12 @@ private void verifyUp() throws InterruptedException, IOException { int numTries = 10; - requireNonNull(getClusterControl().managerProcess, "Error starting Manager - no process"); - waitForProcessStart(getClusterControl().managerProcess, "Manager"); + int mgrExpectedCount = 0; + for (Process tsp : getClusterControl().managerProcesses) { + mgrExpectedCount++; + requireNonNull(tsp, "Error starting Manager " + mgrExpectedCount + " - no process"); + waitForProcessStart(tsp, "Manager" + mgrExpectedCount); + } requireNonNull(getClusterControl().gcProcess, "Error starting GC - no process"); waitForProcessStart(getClusterControl().gcProcess, "GC"); @@ -875,7 +879,7 @@ List references(Process... procs) { public Map> getProcesses() { Map> result = new HashMap<>(); MiniAccumuloClusterControl control = getClusterControl(); - result.put(ServerType.MANAGER, references(control.managerProcess)); + result.put(ServerType.MANAGER, references(control.managerProcesses.toArray(new Process[0]))); result.put(ServerType.TABLET_SERVER, references(control.tabletServerProcesses.values().stream() .flatMap(List::stream).collect(Collectors.toList()).toArray(new Process[0]))); result.put(ServerType.COMPACTOR, references(control.compactorProcesses.values().stream() diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java index 8545ecd7c75..47c6f4d861f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java @@ -59,6 +59,13 @@ public ClosableIterator iterator(List ranges, TabletManagementParameters parameters) { return wrapped.iterator(ranges, parameters); } + + @Override + @Deprecated + public void overrideRanges(List ranges) { + wrapped.overrideRanges(ranges); + } + @Override public void setFutureLocations(Collection assignments) diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java index a182745da5d..21eefaeb13b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java @@ -19,6 +19,7 @@ package org.apache.accumulo.server.manager.state; import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -28,6 +29,7 @@ import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import com.google.common.base.Preconditions; @@ -38,6 +40,7 @@ class MetaDataStateStore extends AbstractTabletStateStore implements TabletState private final String targetTableName; private final Ample ample; private final DataLevel level; + private List ranges; protected MetaDataStateStore(DataLevel level, ClientContext context, String targetTableName) { super(context); @@ -45,6 +48,7 @@ protected MetaDataStateStore(DataLevel level, ClientContext context, String targ this.context = context; this.ample = context.getAmple(); this.targetTableName = targetTableName; + this.ranges = Collections.singletonList(TabletsSection.getRange()); } MetaDataStateStore(DataLevel level, ClientContext context) { @@ -56,6 +60,12 @@ public DataLevel getLevel() { return level; } + @Override + @Deprecated + public void overrideRanges(List ranges) { + this.ranges = ranges; + } + @Override public ClosableIterator iterator(List ranges, TabletManagementParameters parameters) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java index 97c9f7ec32d..ab1a054afaf 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java @@ -45,4 +45,10 @@ public ClosableIterator iterator(List ranges, public String name() { return "Metadata Tablets"; } + + @Override + public DataLevel getLevel() { + return DataLevel.METADATA; + } + } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java index ee81950fe71..b10505aab30 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java @@ -62,6 +62,15 @@ default ClosableIterator iterator(TabletManagementParameters p return iterator(List.of(MetadataSchema.TabletsSection.getRange()), parameters); } + /** + * Override the range of tablets that the TabletStateStore should retrieve. By default it + * retrieves all tablets. + */ + @Deprecated + default void overrideRanges(List ranges) { + throw new UnsupportedOperationException("Not implemented."); + } + /** * Store the assigned locations in the data store. */ diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java index 8b7a5abb0c9..0e6fc01bf08 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java @@ -179,4 +179,5 @@ public void unsuspend(Collection tablets) { public String name() { return "Root Table"; } + } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/LiveManagerSet.java b/server/manager/src/main/java/org/apache/accumulo/manager/LiveManagerSet.java new file mode 100644 index 00000000000..7d1c0797986 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/LiveManagerSet.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.server.ServerContext; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LiveManagerSet implements Watcher { + + public interface Listener { + void managersChanged(Collection addresses); + } + + private static final Logger log = LoggerFactory.getLogger(LiveManagerSet.class); + + private final Listener cback; + private final ServerContext context; + private ZooCache zooCache; + private final String path; + + private List managers = new ArrayList<>(); + + public LiveManagerSet(ServerContext context, Listener cback) { + this.cback = cback; + this.context = context; + path = this.context.getZooKeeperRoot() + Constants.ZMANAGERS; + } + + public synchronized ZooCache getZooCache() { + if (zooCache == null) { + zooCache = new ZooCache(context.getZooReader(), this); + } + return zooCache; + } + + public synchronized void startListeningForManagerServerChanges() { + scanServers(); + + ThreadPools.watchCriticalScheduledTask(this.context.getScheduledExecutor() + .scheduleWithFixedDelay(this::scanServers, 0, 5000, TimeUnit.MILLISECONDS)); + } + + public synchronized void scanServers() { + try { + + List current = getZooCache().getChildren(path).stream().filter(s -> s.contains(":")) + .collect(Collectors.toList()); + if (current.equals(managers)) { + return; + } + managers = new ArrayList<>(current); + this.cback.managersChanged(managers); + } catch (Exception ex) { + log.error("{}", ex.getMessage(), ex); + } + } + + @Override + public void process(WatchedEvent event) { + if (event.getPath() != null) { + if (event.getPath().endsWith(Constants.ZMANAGERS) + || event.getPath().contains(Constants.ZMANAGERS)) { + // something changed, scan the servers + scanServers(); + } else { + log.info("event received: {}", event); + } + } + } + +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 21da4e3bad2..76ac0b2eb44 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -50,9 +50,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; @@ -77,6 +79,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; @@ -88,6 +91,7 @@ import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.BulkImportState; +import org.apache.accumulo.core.manager.thrift.FateService; import org.apache.accumulo.core.manager.thrift.ManagerClientService; import org.apache.accumulo.core.manager.thrift.ManagerGoalState; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; @@ -158,6 +162,7 @@ import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Maps; +import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; @@ -182,10 +187,11 @@ public class Manager extends AbstractServer static final long WAIT_BETWEEN_ERRORS = ONE_SECOND; private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND; private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND; - private static final int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND; static final int MAX_TSERVER_WORK_CHUNK = 5000; private static final int MAX_BAD_STATUS_COUNT = 3; private static final double MAX_SHUTDOWNS_PER_SEC = 10D / 60D; + private static final AtomicBoolean PRIMARY_MANAGER = new AtomicBoolean(false); + private static final UUID MANAGER_SERVER_UUID = UUID.randomUUID(); private final Object balancedNotifier = new Object(); final LiveTServerSet tserverSet; @@ -205,8 +211,10 @@ public class Manager extends AbstractServer private ZooAuthenticationKeyDistributor keyDistributor; private AuthenticationTokenKeyManager authenticationTokenKeyManager; - ServiceLock managerLock = null; + private ServiceLock managerServerLock = null; + ServiceLock primaryManagerLock = null; private TServer clientService = null; + private HostAndPort clientServiceAddress = null; protected volatile TabletBalancer tabletBalancer; private final BalancerEnvironment balancerEnvironment; @@ -682,7 +690,7 @@ public void run() { for (TServerInstance server : currentServers) { try { serversToShutdown.add(server); - tserverSet.getConnection(server).fastHalt(managerLock); + tserverSet.getConnection(server).fastHalt(primaryManagerLock); } catch (TException e) { // its probably down, and we don't care } finally { @@ -766,7 +774,7 @@ private void checkForHeldServer(SortedMap ts try { TServerConnection connection = tserverSet.getConnection(instance); if (connection != null) { - connection.fastHalt(managerLock); + connection.fastHalt(primaryManagerLock); } } catch (TException e) { log.error("{}", e.getMessage(), e); @@ -876,7 +884,7 @@ private SortedMap gatherTableInformation( try { TServerConnection connection2 = tserverSet.getConnection(server); if (connection2 != null) { - connection2.halt(managerLock); + connection2.halt(primaryManagerLock); } } catch (TTransportException e1) { // ignore: it's probably down @@ -927,6 +935,82 @@ private SortedMap gatherTableInformation( return info; } + private void announceExistence(HostAndPort address) { + ZooReaderWriter zoo = getContext().getZooReaderWriter(); + + try { + var zLockPath = ServiceLock + .path(getContext().getZooKeeperRoot() + Constants.ZMANAGERS + "/" + address.toString()); + + try { + zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP); + } catch (KeeperException e) { + if (e.code() == KeeperException.Code.NOAUTH) { + log.error("Failed to write to ZooKeeper. Ensure that" + + " accumulo.properties, specifically instance.secret, is consistent."); + } + throw e; + } + + managerServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, MANAGER_SERVER_UUID); + + LockWatcher lw = new LockWatcher() { + + @Override + public void lostLock(final LockLossReason reason) { + Halt.halt("Manager server lock in zookeeper lost (reason = " + reason + "), exiting!", + -1); + getContext().getLowMemoryDetector().logGCInfo(getConfiguration()); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + Halt.halt(1, () -> log.error("Lost ability to monitor tablet server lock, exiting.", e)); + + } + }; + + ServiceDescriptors descriptors = new ServiceDescriptors(); + // Insert the service with a fake address so that clients can't connect right now + descriptors.addService( + new ServiceDescriptor(MANAGER_SERVER_UUID, ThriftService.MANAGER, address.toString(), Constants.DEFAULT_RESOURCE_GROUP_NAME)); + ServiceLockData sld = new ServiceLockData(descriptors); + for (int i = 0; i < 120 / 5; i++) { + zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); + if (managerServerLock.tryLock(lw, sld)) { + log.debug("Obtained manager server lock {}", managerServerLock.getLockPath()); + return; + } + log.info("Waiting for manager server lock"); + sleepUninterruptibly(5, TimeUnit.SECONDS); + } + String msg = "Too many retries, exiting."; + log.info(msg); + throw new RuntimeException(msg); + } catch (Exception e) { + log.info("Could not obtain manager server lock, exiting.", e); + throw new RuntimeException(e); + } + } + + private static void updateLockContent(HostAndPort address, ServiceLock primaryManagerLock) { + ServiceDescriptors descriptors = new ServiceDescriptors(); + descriptors.addService( + new ServiceDescriptor(MANAGER_SERVER_UUID, ThriftService.MANAGER, address.toString(), Constants.DEFAULT_RESOURCE_GROUP_NAME)); + if (PRIMARY_MANAGER.get()) { + descriptors.addService( + new ServiceDescriptor(MANAGER_SERVER_UUID, ThriftService.FATE, address.toString(), Constants.DEFAULT_RESOURCE_GROUP_NAME)); + } + ServiceLockData sld = new ServiceLockData(descriptors); + log.info("Setting manager lock data to {}", sld.toString()); + try { + primaryManagerLock.replaceLockData(sld); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Exception updating manager lock", e); + } + + } + @Override public void run() { final ServerContext context = getContext(); @@ -940,37 +1024,53 @@ public void run() { // // Start the Manager's Fate Service fateServiceHandler = new FateServiceHandler(this); - managerClientHandler = new ManagerClientServiceHandler(this); compactionCoordinator = new CompactionCoordinator(context, tserverSet, security, compactionJobQueues, nextEvent); // Start the Manager's Client service - // Ensure that calls before the manager gets the lock fail - ManagerClientService.Iface haProxy = + managerClientHandler = new ManagerClientServiceHandler(this); + // Ensure that calls before the manager is initialized fail + ManagerClientService.Iface mgrProxy = HighlyAvailableServiceWrapper.service(managerClientHandler, this); + FateService.Iface fateProxy = HighlyAvailableServiceWrapper.service(fateServiceHandler, this); ServerAddress sa; - var processor = ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, - compactionCoordinator, haProxy, getContext()); + var processor = ThriftProcessorTypes.getManagerTProcessor(fateProxy, compactionCoordinator, mgrProxy, getContext()); try { sa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_CLIENTPORT, processor, - "Manager", "Manager Client Service Handler", null, Property.MANAGER_MINTHREADS, - Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK, - Property.GENERAL_MAX_MESSAGE_SIZE); + "Manager", "Manager Client Service Handler", Property.MANAGER_PORTSEARCH, + Property.MANAGER_MINTHREADS, Property.MANAGER_MINTHREADS_TIMEOUT, + Property.MANAGER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); } catch (UnknownHostException e) { throw new IllegalStateException("Unable to start server on host " + getHostname(), e); } clientService = sa.server; - log.info("Started Manager client service at {}", sa.address); + clientServiceAddress = sa.address; + log.info("Started Manager client service at {}", clientServiceAddress); - // block until we can obtain the ZK lock for the manager - ServiceLockData sld = null; + // announceExistence by creating an entry at the Constants.ZMANAGERS path. + announceExistence(clientServiceAddress); + + // wait a configurable amount of time for multiple manager processes to start, then + // try to get the ZMANAGER_LOCK. This will determine which manager manages not + // only the FATE transactions, but the root and metadata tables. + try { + blockForManagers(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + // Now that the minimum expected Managers are up, try to become the primary Manager. + // The Primary Manager will be responsible for Fate operations and metadata table tablets try { - sld = getManagerLock(ServiceLock.path(zroot + Constants.ZMANAGER_LOCK)); + getPrimaryManagerLock(ServiceLock.path(zroot + Constants.ZMANAGER_LOCK), + clientServiceAddress); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Exception getting manager lock", e); } + setManagerState(ManagerState.HAVE_LOCK); + // If UpgradeStatus is not at complete by this moment, then things are currently // upgrading. if (upgradeCoordinator.getStatus() != UpgradeCoordinator.UpgradeStatus.COMPLETE) { @@ -980,7 +1080,7 @@ public void run() { ManagerMetrics mm = new ManagerMetrics(getConfiguration(), this); try { MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName, - sa.getAddress()); + clientServiceAddress); MetricsUtil.initializeProducers(this, mm); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException @@ -1035,7 +1135,7 @@ public void process(WatchedEvent event) { this.splitter = new Splitter(context); this.splitter.start(); - watchers.add(new TabletGroupWatcher(this, this.userTabletStore, null, mm) { + watchers.add(new TabletGroupWatcher(this, this.userTabletStore, null, mm, () -> true) { @Override boolean canSuspendTablets() { // Always allow user data tablets to enter suspended state. @@ -1043,7 +1143,7 @@ boolean canSuspendTablets() { } }); - watchers.add(new TabletGroupWatcher(this, this.metadataTabletStore, watchers.get(0), mm) { + watchers.add(new TabletGroupWatcher(this, this.metadataTabletStore, watchers.get(0), mm, () -> PRIMARY_MANAGER.get()) { @Override boolean canSuspendTablets() { // Allow metadata tablets to enter suspended state only if so configured. Generally @@ -1054,13 +1154,14 @@ boolean canSuspendTablets() { } }); - watchers.add(new TabletGroupWatcher(this, this.rootTabletStore, watchers.get(1), mm) { + watchers.add(new TabletGroupWatcher(this, this.rootTabletStore, watchers.get(1), mm, () -> PRIMARY_MANAGER.get()) { @Override boolean canSuspendTablets() { // Never allow root tablet to enter suspended state. return false; } }); + for (TabletGroupWatcher watcher : watchers) { watcher.start(); } @@ -1122,21 +1223,7 @@ boolean canSuspendTablets() { log.info("AuthenticationTokenSecretManager is initialized"); } - String address = sa.address.toString(); - UUID uuid = sld.getServerUUID(ThriftService.MANAGER); - ServiceDescriptors descriptors = new ServiceDescriptors(); - for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.COORDINATOR, - ThriftService.FATE}) { - descriptors.addService(new ServiceDescriptor(uuid, svc, address, this.getResourceGroup())); - } - - sld = new ServiceLockData(descriptors); - log.info("Setting manager lock data to {}", sld.toString()); - try { - managerLock.replaceLockData(sld); - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException("Exception updating manager lock", e); - } + updateLockContent(clientServiceAddress, this.primaryManagerLock); while (!clientService.isServing()) { sleepUninterruptibly(100, MILLISECONDS); @@ -1193,6 +1280,28 @@ boolean canSuspendTablets() { log.info("exiting"); } + private void blockForTservers() throws InterruptedException { + blockForServers(Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT, + Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT, () -> tserverSet.size(), "tserver"); + } + + private void blockForManagers() throws InterruptedException { + Supplier count = new Supplier<>() { + @Override + public Integer get() { + try { + return (int) getContext().getZooReader() + .getChildren(getContext().getZooKeeperRoot() + Constants.ZMANAGERS).stream() + .filter(s -> s.contains(":")).count(); + } catch (KeeperException | InterruptedException e) { + throw new RuntimeException("Error getting manager count", e); + } + } + }; + blockForServers(Property.MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT, + Property.MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT, count, "manager"); + } + /** * Allows property configuration to block manager start-up waiting for a minimum number of * tservers to register in zookeeper. It also accepts a maximum time to wait - if the time @@ -1208,19 +1317,19 @@ boolean canSuspendTablets() { * * @throws InterruptedException if interrupted while blocking, propagated for caller to handle. */ - private void blockForTservers() throws InterruptedException { - long waitStart = System.nanoTime(); + private void blockForServers(Property minServerCountProperty, Property minServerWaitProperty, + Supplier numServers, String serverType) throws InterruptedException { - long minTserverCount = - getConfiguration().getCount(Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT); + long waitStart = System.nanoTime(); - if (minTserverCount <= 0) { - log.info("tserver availability check disabled, continuing with-{} servers. To enable, set {}", - tserverSet.size(), Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT.getKey()); + long minServerCount = getConfiguration().getCount(minServerCountProperty); + if (minServerCount <= 0) { + log.info("{} availability check disabled, continuing with-{} servers. To enable, set {}", + serverType, numServers.get(), minServerCountProperty.getKey()); return; } - long userWait = MILLISECONDS.toSeconds( - getConfiguration().getTimeInMillis(Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT)); + long userWait = + MILLISECONDS.toSeconds(getConfiguration().getTimeInMillis(minServerWaitProperty)); // Setting retry values for defined wait timeouts long retries = 10; @@ -1230,8 +1339,8 @@ private void blockForTservers() throws InterruptedException { long waitIncrement = 0; if (userWait <= 0) { - log.info("tserver availability check set to block indefinitely, To change, set {} > 0.", - Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT.getKey()); + log.info("{} availability check set to block indefinitely, To change, set {} > 0.", + serverType, minServerWaitProperty.getKey()); userWait = Long.MAX_VALUE; // If indefinitely blocking, change retry values to support incremental backoff and logging. @@ -1241,65 +1350,80 @@ private void blockForTservers() throws InterruptedException { waitIncrement = 5; } - Retry tserverRetry = Retry.builder().maxRetries(retries).retryAfter(initialWait, SECONDS) + Retry serverRetry = Retry.builder().maxRetries(retries).retryAfter(initialWait, SECONDS) .incrementBy(waitIncrement, SECONDS).maxWait(maxWaitPeriod, SECONDS).backOffFactor(1) .logInterval(30, SECONDS).createRetry(); - log.info("Checking for tserver availability - need to reach {} servers. Have {}", - minTserverCount, tserverSet.size()); + log.info("Checking for {} availability - need to reach {} servers. Have {}", serverType, + minServerCount, numServers.get()); - boolean needTservers = tserverSet.size() < minTserverCount; + boolean needServers = numServers.get() < minServerCount; - while (needTservers && tserverRetry.canRetry()) { + while (needServers && serverRetry.canRetry()) { - tserverRetry.waitForNextAttempt(log, "block until minimum tservers reached"); + serverRetry.waitForNextAttempt(log, "block until minimum " + serverType + "s reached"); - needTservers = tserverSet.size() < minTserverCount; + needServers = numServers.get() < minServerCount; // suppress last message once threshold reached. - if (needTservers) { - tserverRetry.logRetry(log, String.format( - "Blocking for tserver availability - need to reach %s servers. Have %s Time spent blocking %s seconds.", - minTserverCount, tserverSet.size(), + if (needServers) { + serverRetry.logRetry(log, String.format( + "Blocking for %s availability - need to reach %s servers. Have %s Time spent blocking %s seconds.", + serverType, minServerCount, numServers.get(), NANOSECONDS.toSeconds(System.nanoTime() - waitStart))); } - tserverRetry.useRetry(); + serverRetry.useRetry(); } - if (tserverSet.size() < minTserverCount) { + if (numServers.get() < minServerCount) { log.warn( - "tserver availability check time expired - continuing. Requested {}, have {} tservers on line. " + "{} availability check time expired - continuing. Requested {}, have {} servers on line. " + " Time waiting {} sec", - tserverSet.size(), minTserverCount, NANOSECONDS.toSeconds(System.nanoTime() - waitStart)); + serverType, numServers.get(), minServerCount, + NANOSECONDS.toSeconds(System.nanoTime() - waitStart)); } else { log.info( - "tserver availability check completed. Requested {}, have {} tservers on line. " + "{} availability check completed. Requested {}, have {} servers on line. " + " Time waiting {} sec", - tserverSet.size(), minTserverCount, NANOSECONDS.toSeconds(System.nanoTime() - waitStart)); + serverType, numServers.get(), minServerCount, + NANOSECONDS.toSeconds(System.nanoTime() - waitStart)); } } private long remaining(long deadline) { return Math.max(1, deadline - System.currentTimeMillis()); } - + public ServiceLock getManagerLock() { - return managerLock; + return managerServerLock; } - private static class ManagerLockWatcher implements ServiceLock.AccumuloLockWatcher { + public ServiceLock getPrimaryManagerLock() { + return primaryManagerLock; + } + + private static class PrimaryManagerLockWatcher implements ServiceLock.AccumuloLockWatcher { boolean acquiredLock = false; boolean failedToAcquireLock = false; + private final HostAndPort address; + private final ServiceLock mgrLock; + + public PrimaryManagerLockWatcher(HostAndPort address, ServiceLock mgrLock) { + this.address = address; + this.mgrLock = mgrLock; + } @Override public void lostLock(LockLossReason reason) { - Halt.halt("Manager lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + PRIMARY_MANAGER.set(false); + Halt.halt(-1, () -> log.error("Lost tablet server lock (reason = {}), exiting.", reason)); } @Override public void unableToMonitorLockNode(final Exception e) { + PRIMARY_MANAGER.set(false); // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility Halt.halt(-1, () -> log.error("FATAL: No longer able to monitor manager lock node", e)); @@ -1307,19 +1431,22 @@ public void unableToMonitorLockNode(final Exception e) { @Override public synchronized void acquiredLock() { - log.debug("Acquired manager lock"); + log.info("Acquired primary manager lock"); if (acquiredLock || failedToAcquireLock) { Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); } acquiredLock = true; + PRIMARY_MANAGER.set(true); + updateLockContent(address, mgrLock); notifyAll(); } @Override public synchronized void failedToAcquireLock(Exception e) { - log.warn("Failed to get manager lock", e); + PRIMARY_MANAGER.set(false); + log.warn("Failed to get primary manager lock", e); if (e instanceof NoAuthException) { String msg = "Failed to acquire manager lock due to incorrect ZooKeeper authentication."; @@ -1336,53 +1463,20 @@ public synchronized void failedToAcquireLock(Exception e) { notifyAll(); } - public synchronized void waitForChange() { - while (!acquiredLock && !failedToAcquireLock) { - try { - wait(); - } catch (InterruptedException e) {} - } - } } - private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc) + private void getPrimaryManagerLock(final ServiceLockPath zManagerLoc, final HostAndPort address) throws KeeperException, InterruptedException { var zooKeeper = getContext().getZooReaderWriter().getZooKeeper(); - log.info("trying to get manager lock"); - - final String managerClientAddress = - getHostname() + ":" + getConfiguration().getPort(Property.MANAGER_CLIENTPORT)[0]; - - UUID zooLockUUID = UUID.randomUUID(); - - ServiceDescriptors descriptors = new ServiceDescriptors(); - descriptors.addService(new ServiceDescriptor(zooLockUUID, ThriftService.MANAGER, - managerClientAddress, this.getResourceGroup())); - - ServiceLockData sld = new ServiceLockData(descriptors); - while (true) { + log.info("trying to get primary manager lock"); - ManagerLockWatcher managerLockWatcher = new ManagerLockWatcher(); - managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID); - managerLock.lock(managerLockWatcher, sld); + ServiceLockData sld = + new ServiceLockData(MANAGER_SERVER_UUID, address.toString(), ThriftService.FATE, Constants.DEFAULT_RESOURCE_GROUP_NAME); - managerLockWatcher.waitForChange(); - - if (managerLockWatcher.acquiredLock) { - break; - } - - if (!managerLockWatcher.failedToAcquireLock) { - throw new IllegalStateException("manager lock in unknown state"); - } - - managerLock.tryToCancelAsyncLockOrUnlock(); - - sleepUninterruptibly(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS, MILLISECONDS); - } - - setManagerState(ManagerState.HAVE_LOCK); - return sld; + primaryManagerLock = new ServiceLock(zooKeeper, zManagerLoc, MANAGER_SERVER_UUID); + PrimaryManagerLockWatcher managerLockWatcher = + new PrimaryManagerLockWatcher(address, primaryManagerLock); + primaryManagerLock.lock(managerLockWatcher, sld); } @Override @@ -1680,4 +1774,7 @@ public void registerMetrics(MeterRegistry registry) { } + HostAndPort getManagerClientAddress() { + return clientServiceAddress; + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index 97b9a07682f..d24fe1a2634 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -169,7 +169,7 @@ public void waitForFlush(TInfo tinfo, TCredentials c, String tableIdStr, ByteBuf try { final TServerConnection server = manager.tserverSet.getConnection(instance); if (server != null) { - server.flush(manager.managerLock, tableId, ByteBufferUtil.toBytes(startRowBB), + server.flush(manager.primaryManagerLock, tableId, ByteBufferUtil.toBytes(startRowBB), ByteBufferUtil.toBytes(endRowBB)); } } catch (TException ex) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/MultipleManagerUtil.java b/server/manager/src/main/java/org/apache/accumulo/manager/MultipleManagerUtil.java new file mode 100644 index 00000000000..72f09fd91ec --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/MultipleManagerUtil.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.server.ServerContext; + +public class MultipleManagerUtil { + + /** + * Each Manager will be responsible for a range(s) of metadata tablets, but we don't want to split + * up a table's metadata tablets between managers as it will throw off the tablet balancing. If + * there are three managers, then we want to split up the metadata tablets roughly into thirds and + * have each manager responsible for one third, for example. + * + * @param context server context + * @param tables set of table ids + * @param numManagers number of managers + * @return list of num manager size, each element containing a set of tables for the manager to + * manage + */ + public static List> getTablesForManagers(ServerContext context, Set tables, + int numManagers) { + + if (numManagers == 0) { + throw new IllegalStateException("No managers, one or more expected"); + } + + if (numManagers == 1) { + return List.of(tables); + } + + SortedSet> tableTabletCounts = new TreeSet<>(new Comparator<>() { + @Override + public int compare(Pair table1, Pair table2) { + // sort descending by number of tablets + int result = table1.getSecond().compareTo(table2.getSecond()); + if (result == 0) { + return table1.getFirst().compareTo(table2.getFirst()); + } + return -1 * result; + } + }); + tables.forEach(tid -> { + long count = context.getAmple().readTablets().forTable(tid).build().stream().count(); + tableTabletCounts.add(new Pair<>(tid, count)); + }); + List>> buckets = new ArrayList<>(numManagers); + IntStream.range(0, numManagers).forEach(i -> buckets.add(new HashSet<>())); + + for (Pair tableTabletCount : tableTabletCounts) { + // Find the bucket with the lowest count and add this to it + int lowestBucket = -1; + int bucketIdx = 0; + Long priorBucketCount = Long.MAX_VALUE; + for (Set> bucket : buckets) { + long bucketTabletCount = bucket.stream().mapToLong(Pair::getSecond).sum(); + if (bucketTabletCount < priorBucketCount) { + lowestBucket = bucketIdx; + } + bucketIdx++; + priorBucketCount = bucketTabletCount; + } + buckets.get(lowestBucket).add(tableTabletCount); + } + + List> results = new ArrayList<>(); + buckets.stream() + .forEach(b -> results.add(b.stream().map(Pair::getFirst).collect(Collectors.toSet()))); + return results; + } + +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index c93d2eebc62..1c7b837a8f7 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -32,15 +33,19 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.accumulo.core.client.BatchWriter; @@ -65,6 +70,7 @@ import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -102,7 +108,8 @@ import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterators; -abstract class TabletGroupWatcher extends AccumuloDaemonThread { +abstract class TabletGroupWatcher extends AccumuloDaemonThread implements LiveManagerSet.Listener { + // Constants used to make sure assignment logging isn't excessive in quantity or size public static class BadLocationStateException extends Exception { private static final long serialVersionUID = 2L; @@ -131,8 +138,15 @@ public Text getEncodedEndRow() { private WalStateManager walStateManager; private volatile Set filteredServersToShutdown = Set.of(); + private final Supplier enabled; + private LiveManagerSet liveManagers; + /* current set of managers */ + private final SortedSet managers = new TreeSet<>(); + /* updates set by LiveManagerSet callback */ + private final AtomicReference> managerUpdates = new AtomicReference<>(); + TabletGroupWatcher(Manager manager, TabletStateStore store, TabletGroupWatcher dependentWatcher, - ManagerMetrics metrics) { + ManagerMetrics metrics, Supplier enabled) { super("Watching " + store.name()); this.manager = manager; this.store = store; @@ -141,6 +155,7 @@ public Text getEncodedEndRow() { this.walStateManager = new WalStateManager(manager.getContext()); this.eventHandler = new EventHandler(); manager.getEventCoordinator().addListener(store.getLevel(), eventHandler); + this.enabled = enabled; } /** Should this {@code TabletGroupWatcher} suspend tablets? */ @@ -221,12 +236,19 @@ class EventHandler implements EventCoordinator.Listener { private boolean needsFullScan = true; private final BlockingQueue rangesToProcess; + AtomicReference> tablesFromLastRun = new AtomicReference<>(null); class RangeProccessor implements Runnable { @Override public void run() { try { while (manager.stillManager()) { + + // spin if not enabled + while (!enabled.get()) { + Thread.onSpinWait(); + } + var range = rangesToProcess.poll(100, TimeUnit.MILLISECONDS); if (range == null) { // check to see if still the manager @@ -488,6 +510,19 @@ private TableMgmtStats manageTablets(Iterator iter, if (manager.getSplitter().addSplitStarting(tm.getExtent())) { LOG.debug("submitting tablet {} for split", tm.getExtent()); manager.getSplitter().executeSplit(new SplitTask(manager.getContext(), tm, manager)); + +// DLMARI2: Figure out where this goes now + // Override the set of table ranges that this manager will manage +// if (store.getLevel() == DataLevel.USER) { +// Optional> ranges = calculateRangesForMultipleManagers( +// tablesFromLastRun.getAndSet(manager.getContext().getTableIdToNameMap())); +// // If ranges is empty, then the tables and managers did not change from the +// // previous call, we will use the same ranges. +// if (ranges.isPresent()) { +// store.overrideRanges(ranges.get()); +// } +// } + } } else { LOG.debug("{} is not splittable.", tm.getExtent()); @@ -564,7 +599,7 @@ private TableMgmtStats manageTablets(Iterator iter, if (client != null) { LOG.debug("Requesting tserver {} unload tablet {}", location.getServerInstance(), tm.getExtent()); - client.unloadTablet(manager.managerLock, tm.getExtent(), goal.howUnload(), + client.unloadTablet(manager.getManagerLock(), tm.getExtent(), goal.howUnload(), manager.getSteadyTime()); tableMgmtStats.totalUnloaded++; unloaded++; @@ -698,6 +733,70 @@ private void unassignDeadTablet(TabletLists tLists, TabletMetadata tm) throws Wa } } + /** + * Calculates the set of table ranges that this manager will be reponsible for. This method will + * return an empty Optional when the set of ranges does not change. When the set of ranges + * changes, then that set will be returned. + */ + private Optional> + calculateRangesForMultipleManagers(Map tablesFromLastRun) { + if (liveManagers == null) { + this.liveManagers = new LiveManagerSet(this.manager.getContext(), this); + this.liveManagers.startListeningForManagerServerChanges(); + } + // If currentManagers is empty, wait for managers + while (managerUpdates.get().isEmpty()) { + Thread.onSpinWait(); + } + + boolean sameTables = (tablesFromLastRun == null) ? false + : tablesFromLastRun.equals(manager.getContext().getTableIdToNameMap()); + boolean sameManagers = managers.equals(managerUpdates.get()); + + if (sameTables && sameManagers) { + return Optional.empty(); + } + + if (!sameManagers) { + managers.clear(); + managers.addAll(managerUpdates.get()); + } + + if (managers.size() == 1) { + return Optional.of(Collections.singletonList(TabletsSection.getRange())); + } + + // Calculate range of metadata table that this manager + // will be responsible for. We have the current set of managers + // in the managers set and we have this managers address + // to know what number it is in the set. + String thisManagerAddress = manager.getManagerClientAddress().toString(); + int pos = 0; + for (String managerLocation : managers) { + if (thisManagerAddress.equals(managerLocation)) { + break; + } + pos++; + } + if (pos > managers.size()) { + // we didn't find our manager in the set of manager addresses + throw new RuntimeException("This manager location: " + thisManagerAddress + + " not found in set of managers: " + managers.toString()); + } + + List> managerTables = MultipleManagerUtil.getTablesForManagers( + manager.getContext(), manager.getContext().getTableIdToNameMap().keySet(), managers.size()); + Set myTables = managerTables.get(pos); + final List rangesForTables = new ArrayList<>(); + myTables.stream().forEach(tid -> rangesForTables.add(TabletsSection.getRange(tid))); + return Optional.of(rangesForTables); + } + + @Override + public void managersChanged(Collection addresses) { + managerUpdates.set(new TreeSet<>(addresses)); + } + private void hostUnassignedTablet(TabletLists tLists, KeyExtent tablet, UnassignedTablet unassignedTablet) { // maybe it's a finishing migration @@ -922,7 +1021,7 @@ private void flushChanges(TabletLists tLists) for (Assignment a : tLists.assignments) { TServerConnection client = manager.tserverSet.getConnection(a.server); if (client != null) { - client.assignTablet(manager.managerLock, a.tablet); + client.assignTablet(manager.primaryManagerLock, a.tablet); } else { Manager.log.warn("Could not connect to server {}", a.server); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneMetadata.java index bd3a836e27d..941de7525cd 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneMetadata.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneMetadata.java @@ -46,7 +46,7 @@ public Repo call(long tid, Manager environment) throws Exception { // need to clear out any metadata entries for tableId just in case this // died before and is executing again MetadataTableUtil.deleteTable(cloneInfo.tableId, false, environment.getContext(), - environment.getManagerLock()); + environment.getPrimaryManagerLock()); MetadataTableUtil.cloneTable(environment.getContext(), cloneInfo.srcTableId, cloneInfo.tableId); return new FinishCloneTable(cloneInfo); } @@ -54,7 +54,7 @@ public Repo call(long tid, Manager environment) throws Exception { @Override public void undo(long tid, Manager environment) throws Exception { MetadataTableUtil.deleteTable(cloneInfo.tableId, false, environment.getContext(), - environment.getManagerLock()); + environment.getPrimaryManagerLock()); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java index 1bc229b9f3d..d8f1fbb24e4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java @@ -102,7 +102,7 @@ private void writeSplitsToMetadataTable(ServerContext context, SortedSet s @Override public void undo(long tid, Manager environment) throws Exception { MetadataTableUtil.deleteTable(tableInfo.getTableId(), false, environment.getContext(), - environment.getManagerLock()); + environment.getPrimaryManagerLock()); } /** diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java index f0c000d93fb..95119c89c41 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java @@ -207,6 +207,6 @@ public Repo call(long tid, Manager manager) throws Exception { @Override public void undo(long tid, Manager environment) throws Exception { MetadataTableUtil.deleteTable(tableInfo.tableId, false, environment.getContext(), - environment.getManagerLock()); + environment.getPrimaryManagerLock()); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java b/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java index eb7ab83904f..b3be8607b19 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java @@ -68,7 +68,7 @@ public long isReady(long tid, Manager manager) { TabletServerStatus status = connection.getTableMap(false); if (status.tableMap != null && status.tableMap.isEmpty()) { log.info("tablet server hosts no tablets {}", server); - connection.halt(manager.getManagerLock()); + connection.halt(manager.getPrimaryManagerLock()); log.info("tablet server asked to halt {}", server); return 0; } else { diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/MultipleManagerUtilTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/MultipleManagerUtilTest.java new file mode 100644 index 00000000000..ccb6f3ff64d --- /dev/null +++ b/server/manager/src/test/java/org/apache/accumulo/manager/MultipleManagerUtilTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.IntStream; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata.TableOptions; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata.TableRangeOptions; +import org.apache.accumulo.server.ServerContext; +import org.junit.jupiter.api.Test; + +public class MultipleManagerUtilTest { + + @Test + public void testNoManager() { + assertThrows(IllegalStateException.class, () -> { + ServerContext ctx = createMock(ServerContext.class); + Set tables = Set.of(TableId.of("1"), TableId.of("2"), TableId.of("3")); + MultipleManagerUtil.getTablesForManagers(ctx, tables, 0); + }); + } + + @Test + public void testSingleManager() { + ServerContext ctx = createMock(ServerContext.class); + Set tables = Set.of(TableId.of("1"), TableId.of("2"), TableId.of("3")); + List> result = MultipleManagerUtil.getTablesForManagers(ctx, tables, 1); + assertEquals(1, result.size()); + assertEquals(tables, result.get(0)); + } + + @Test + public void testTwoManagers() { + TableId t1 = TableId.of("1"); + int t1TabletCount = 4; + + TableId t2 = TableId.of("2"); + int t2TabletCount = 34; + + TableId t3 = TableId.of("3"); + int t3TabletCount = 14; + + ServerContext ctx = createMock(ServerContext.class); + Ample ample = createMock(Ample.class); + TableOptions to = createMock(TableOptions.class); + TableRangeOptions tro1 = createMock(TableRangeOptions.class); + TableRangeOptions tro2 = createMock(TableRangeOptions.class); + TableRangeOptions tro3 = createMock(TableRangeOptions.class); + TabletsMetadata tm1 = createMock(TabletsMetadata.class); + TabletsMetadata tm2 = createMock(TabletsMetadata.class); + TabletsMetadata tm3 = createMock(TabletsMetadata.class); + + expect(ctx.getAmple()).andReturn(ample).times(3); + expect(ample.readTablets()).andReturn(to).anyTimes(); + expect(to.forTable(t1)).andReturn(tro1); + expect(to.forTable(t2)).andReturn(tro2); + expect(to.forTable(t3)).andReturn(tro3); + expect(tro1.build()).andReturn(tm1); + expect(tro2.build()).andReturn(tm2); + expect(tro3.build()).andReturn(tm3); + expect(tm1.stream()).andReturn(createTabletMetadataList(t1TabletCount).stream()); + expect(tm2.stream()).andReturn(createTabletMetadataList(t2TabletCount).stream()); + expect(tm3.stream()).andReturn(createTabletMetadataList(t3TabletCount).stream()); + + replay(ctx, ample, to, tro1, tro2, tro3, tm1, tm2, tm3); + + Set tables = Set.of(t1, t2, t3); + List> result = MultipleManagerUtil.getTablesForManagers(ctx, tables, 2); + verify(ctx, ample, to, tro1, tro2, tro3, tm1, tm2, tm3); + + assertEquals(2, result.size()); + Set firstManager = result.get(0); + assertEquals(1, firstManager.size()); + assertTrue(firstManager.contains(t2)); + + Set secondManager = result.get(1); + assertEquals(2, secondManager.size()); + assertTrue(secondManager.contains(t1)); + assertTrue(secondManager.contains(t3)); + + } + + @Test + public void testThreeManagers() { + TableId t1 = TableId.of("1"); + int t1TabletCount = 4; + + TableId t2 = TableId.of("2"); + int t2TabletCount = 34; + + TableId t3 = TableId.of("3"); + int t3TabletCount = 14; + + ServerContext ctx = createMock(ServerContext.class); + Ample ample = createMock(Ample.class); + TableOptions to = createMock(TableOptions.class); + TableRangeOptions tro1 = createMock(TableRangeOptions.class); + TableRangeOptions tro2 = createMock(TableRangeOptions.class); + TableRangeOptions tro3 = createMock(TableRangeOptions.class); + TabletsMetadata tm1 = createMock(TabletsMetadata.class); + TabletsMetadata tm2 = createMock(TabletsMetadata.class); + TabletsMetadata tm3 = createMock(TabletsMetadata.class); + + expect(ctx.getAmple()).andReturn(ample).times(3); + expect(ample.readTablets()).andReturn(to).anyTimes(); + expect(to.forTable(t1)).andReturn(tro1); + expect(to.forTable(t2)).andReturn(tro2); + expect(to.forTable(t3)).andReturn(tro3); + expect(tro1.build()).andReturn(tm1); + expect(tro2.build()).andReturn(tm2); + expect(tro3.build()).andReturn(tm3); + expect(tm1.stream()).andReturn(createTabletMetadataList(t1TabletCount).stream()); + expect(tm2.stream()).andReturn(createTabletMetadataList(t2TabletCount).stream()); + expect(tm3.stream()).andReturn(createTabletMetadataList(t3TabletCount).stream()); + + replay(ctx, ample, to, tro1, tro2, tro3, tm1, tm2, tm3); + + Set tables = Set.of(t1, t2, t3); + List> result = MultipleManagerUtil.getTablesForManagers(ctx, tables, 3); + verify(ctx, ample, to, tro1, tro2, tro3, tm1, tm2, tm3); + + assertEquals(3, result.size()); + Set firstManager = result.get(0); + assertEquals(1, firstManager.size()); + assertTrue(firstManager.contains(t2)); + + Set secondManager = result.get(1); + assertEquals(1, secondManager.size()); + assertTrue(secondManager.contains(t3)); + + Set thirdManager = result.get(2); + assertEquals(1, thirdManager.size()); + assertTrue(thirdManager.contains(t1)); + + } + + @Test + public void testFourManagers() { + TableId t1 = TableId.of("1"); + int t1TabletCount = 4; + + TableId t2 = TableId.of("2"); + int t2TabletCount = 34; + + TableId t3 = TableId.of("3"); + int t3TabletCount = 14; + + ServerContext ctx = createMock(ServerContext.class); + Ample ample = createMock(Ample.class); + TableOptions to = createMock(TableOptions.class); + TableRangeOptions tro1 = createMock(TableRangeOptions.class); + TableRangeOptions tro2 = createMock(TableRangeOptions.class); + TableRangeOptions tro3 = createMock(TableRangeOptions.class); + TabletsMetadata tm1 = createMock(TabletsMetadata.class); + TabletsMetadata tm2 = createMock(TabletsMetadata.class); + TabletsMetadata tm3 = createMock(TabletsMetadata.class); + + expect(ctx.getAmple()).andReturn(ample).times(3); + expect(ample.readTablets()).andReturn(to).anyTimes(); + expect(to.forTable(t1)).andReturn(tro1); + expect(to.forTable(t2)).andReturn(tro2); + expect(to.forTable(t3)).andReturn(tro3); + expect(tro1.build()).andReturn(tm1); + expect(tro2.build()).andReturn(tm2); + expect(tro3.build()).andReturn(tm3); + expect(tm1.stream()).andReturn(createTabletMetadataList(t1TabletCount).stream()); + expect(tm2.stream()).andReturn(createTabletMetadataList(t2TabletCount).stream()); + expect(tm3.stream()).andReturn(createTabletMetadataList(t3TabletCount).stream()); + + replay(ctx, ample, to, tro1, tro2, tro3, tm1, tm2, tm3); + + Set tables = Set.of(t1, t2, t3); + List> result = MultipleManagerUtil.getTablesForManagers(ctx, tables, 4); + verify(ctx, ample, to, tro1, tro2, tro3, tm1, tm2, tm3); + + assertEquals(4, result.size()); + Set firstManager = result.get(0); + assertEquals(1, firstManager.size()); + assertTrue(firstManager.contains(t2)); + + Set secondManager = result.get(1); + assertEquals(1, secondManager.size()); + assertTrue(secondManager.contains(t3)); + + Set thirdManager = result.get(2); + assertEquals(1, thirdManager.size()); + assertTrue(thirdManager.contains(t1)); + + Set fourthManager = result.get(3); + assertEquals(0, fourthManager.size()); + + } + + private static final List createTabletMetadataList(int numEntries) { + List results = new ArrayList<>(); + IntStream.range(0, numEntries).forEach(i -> results.add(new TabletMetadata())); + return results; + + } +} diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java index 9aff9a89faa..401345844a2 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java @@ -80,7 +80,7 @@ public void testSingleShutdown() throws Exception { EasyMock.expect(manager.onlineTabletServers()).andReturn(Collections.singleton(tserver)); EasyMock.expect(manager.getConnection(tserver)).andReturn(tserverCnxn); EasyMock.expect(tserverCnxn.getTableMap(false)).andReturn(status); - EasyMock.expect(manager.getManagerLock()).andReturn(null); + EasyMock.expect(manager.getPrimaryManagerLock()).andReturn(null); tserverCnxn.halt(null); EasyMock.expectLastCall().once(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 34a24c7fb0b..6ede808d265 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -420,11 +420,11 @@ private HostAndPort startServer(AccumuloConfiguration conf, String address, TPro private HostAndPort getManagerAddress() { try { - List locations = getContext().getManagerLocations(); - if (locations.isEmpty()) { + String location = getContext().getPrimaryManagerLocation(); + if (location == null || location.isEmpty()) { return null; } - return HostAndPort.fromString(locations.get(0)); + return HostAndPort.fromString(location); } catch (Exception e) { log.warn("Failed to obtain manager host " + e); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BackupManagerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BackupManagerIT.java deleted file mode 100644 index 43269021340..00000000000 --- a/test/src/main/java/org/apache/accumulo/test/functional/BackupManagerIT.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.test.functional; - -import java.time.Duration; -import java.util.List; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; -import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.manager.Manager; -import org.junit.jupiter.api.Test; - -public class BackupManagerIT extends ConfigurableMacBase { - - @Override - protected Duration defaultTimeout() { - return Duration.ofMinutes(2); - } - - @Test - public void test() throws Exception { - // wait for manager - UtilWaitThread.sleep(1000); - // create a backup - Process backup = exec(Manager.class); - try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { - ZooReaderWriter writer = getCluster().getServerContext().getZooReaderWriter(); - String root = "/accumulo/" + client.instanceOperations().getInstanceId(); - List children; - // wait for 2 lock entries - do { - UtilWaitThread.sleep(100); - var path = ServiceLock.path(root + Constants.ZMANAGER_LOCK); - children = ServiceLock.validateAndSort(path, writer.getChildren(path.toString())); - } while (children.size() != 2); - // wait for the backup manager to learn to be the backup - UtilWaitThread.sleep(1000); - // generate a false zookeeper event - String lockPath = root + Constants.ZMANAGER_LOCK + "/" + children.get(0); - byte[] data = writer.getData(lockPath); - writer.getZooKeeper().setData(lockPath, data, -1); - // let it propagate - UtilWaitThread.sleep(500); - // kill the manager by removing its lock - writer.recursiveDelete(lockPath, NodeMissingPolicy.FAIL); - // ensure the backup becomes the manager - client.tableOperations().create(getUniqueNames(1)[0]); - } finally { - backup.destroy(); - } - } - -} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MultipleManagerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MultipleManagerIT.java new file mode 100644 index 00000000000..1bca046f595 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/MultipleManagerIT.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.IntStream; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.tabletserver.thrift.TabletStats; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +public class MultipleManagerIT extends ConfigurableMacBase { + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(2); + } + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.MANAGER_PORTSEARCH, "true"); + cfg.setProperty(Property.MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT, "2"); + cfg.setProperty(Property.MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT, "30s"); + cfg.getClusterServerConfiguration().setNumManagers(2); + } + + @Test + public void testMultipleManagers() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + List managers = client.instanceOperations().getManagerLocations(); + assertEquals(2, managers.size()); + ClientContext cctx = (ClientContext) client; + String primaryManager = cctx.getPrimaryManagerLocation(); + assertNotNull(primaryManager); + assertTrue(managers.contains(primaryManager)); + + client.tableOperations().create("t1"); + SortedSet splits = new TreeSet(); + IntStream.range(97, 122).forEach((i) -> splits.add(new Text(String.valueOf((char) i)))); + client.tableOperations().addSplits("t1", splits); + ThriftClientTypes.MANAGER.executeVoid(cctx, c -> c.waitForBalance(TraceUtil.traceInfo())); + + client.tableOperations().create("t2"); + client.tableOperations().addSplits("t2", splits); + ThriftClientTypes.MANAGER.executeVoid(cctx, c -> c.waitForBalance(TraceUtil.traceInfo())); + } + } + + public static List getTabletStats(AccumuloClient c, String tableId) + throws AccumuloException, AccumuloSecurityException { + return ThriftClientTypes.TABLET_SERVER.execute((ClientContext) c, client -> client + .getTabletStats(TraceUtil.traceInfo(), ((ClientContext) c).rpcCreds(), tableId)); + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MultipleManagerLockIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MultipleManagerLockIT.java new file mode 100644 index 00000000000..b56f81db3ea --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/MultipleManagerLockIT.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.manager.LiveManagerSet; +import org.apache.accumulo.manager.Manager; +import org.junit.jupiter.api.Test; + +public class MultipleManagerLockIT extends ConfigurableMacBase implements LiveManagerSet.Listener { + + private final AtomicInteger changeCount = new AtomicInteger(0); + private Collection zooKeeperManagerAddrs = null; + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(2); + } + + @Override + public void managersChanged(Collection addresses) { + System.out.println("callback called: " + addresses); + changeCount.incrementAndGet(); + zooKeeperManagerAddrs = addresses; + } + + @Test + public void test() throws Exception { + + Process secondManager = null; + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + final ZooReaderWriter writer = getCluster().getServerContext().getZooReaderWriter(); + final String root = "/accumulo/" + client.instanceOperations().getInstanceId(); + final LiveManagerSet live = new LiveManagerSet(getServerContext(), this); + live.startListeningForManagerServerChanges(); + + // wait for manager + while (changeCount.get() == 0) { + Thread.sleep(100); + } + int changes = changeCount.get(); + assertEquals(1, changes); + + // create a second manager + secondManager = exec(Manager.class); + + List managers = ((ClientContext) client).getManagerLocations(); + while (managers.size() != 2) { + UtilWaitThread.sleep(100); + managers = ((ClientContext) client).getManagerLocations(); + } + System.out.println("managers: " + managers.toString()); + + String primary = ((ClientContext) client).getPrimaryManagerLocation(); + System.out.println("primary: " + primary); + assertTrue(managers.contains(primary)); + assertEquals(changes + 1, changeCount.get()); + assertEquals(managers, zooKeeperManagerAddrs); + changes = changeCount.get(); + + // Remove the primary manager lock manually. This will cause the + // primary manager to shutdown and the other manager should become + // the primary. + List childLocks = writer.getChildren(root + Constants.ZMANAGER_LOCK); + assertEquals(2, childLocks.size()); + childLocks = + ServiceLock.validateAndSort(ServiceLock.path(root + Constants.ZMANAGER_LOCK), childLocks); + System.out.println("primary lock children: " + childLocks.toString()); + String currentLock = childLocks.get(0); + System.out.println("current primary lock is: " + currentLock); + String nextLock = childLocks.get(1); + System.out.println("deleting current manager lock, next primary lock should be: " + nextLock); + writer.recursiveDelete(root + Constants.ZMANAGERS + "/" + primary, NodeMissingPolicy.FAIL); + + // wait for primary to change + String nextPrimary = ((ClientContext) client).getPrimaryManagerLocation(); + while (nextPrimary == null || primary.equals(nextPrimary)) { + Thread.sleep(100); + nextPrimary = ((ClientContext) client).getPrimaryManagerLocation(); + } + System.out.println("next primary manager is: " + nextPrimary); + + managers = ((ClientContext) client).getManagerLocations(); + while (managers.size() != 1) { + Thread.sleep(100); + managers = ((ClientContext) client).getManagerLocations(); + } + System.out.println("managers: " + managers.toString()); + + assertEquals(1, managers.size()); + assertTrue(managers.contains(nextPrimary)); + + childLocks = writer.getChildren(root + Constants.ZMANAGER_LOCK); + System.out.println("primary lock children: " + childLocks.toString()); + assertEquals(1, childLocks.size()); + + while (changeCount.get() != (changes + 1)) { + Thread.sleep(500); + } + assertEquals(managers, zooKeeperManagerAddrs); + changes = changeCount.get(); + + } finally { + if (secondManager != null) { + secondManager.destroy(); + } + } + } + +} From 8685620f6d7e80200b7aa0f2e92d3e592dbb431c Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Wed, 22 Nov 2023 16:56:57 +0000 Subject: [PATCH 2/5] wip --- .../core/clientImpl/ClientContext.java | 2 +- .../apache/accumulo/core/conf/Property.java | 4 +- .../ClusterServerConfiguration.java | 7 +-- .../MiniAccumuloClusterControl.java | 7 +-- .../state/LoggingTabletStateStore.java | 3 +- .../state/TabletManagementParameters.java | 14 ++++- .../manager/state/TabletStateStore.java | 6 ++- .../org/apache/accumulo/manager/Manager.java | 31 ++++++----- .../accumulo/manager/TabletGroupWatcher.java | 51 ++++++++----------- .../tableOps/create/PopulateMetadata.java | 2 +- .../TabletManagementIteratorIT.java | 3 +- 11 files changed, 68 insertions(+), 62 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 712aa912f32..2243d536bde 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -563,7 +563,7 @@ public List getManagerLocations() { .path(Constants.ZROOT + "/" + getInstanceID() + Constants.ZMANAGERS + "/" + manager); Optional sld = zooCache.getLockData(zLocPath); if (sld.isPresent()) { - locations.add(sld.get().getAddressString(ThriftService.MANAGER)); + locations.add(sld.orElseThrow().getAddressString(ThriftService.MANAGER)); } } } diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 824f680fe8a..d03379e0fe9 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -318,7 +318,7 @@ public enum Property { MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT, "The port used for handling client connections on the manager.", "1.3.5"), MANAGER_PORTSEARCH("manager.port.search", "false", PropertyType.BOOLEAN, - "If the manager.port.client is in use, search higher ports until one is available", "3.1.0"), + "If the manager.port.client is in use, search higher ports until one is available.", "3.1.0"), MANAGER_TABLET_BALANCER("manager.tablet.balancer", "org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME, "The balancer class that accumulo will use to make tablet assignment and " @@ -377,7 +377,7 @@ public enum Property { PropertyType.COUNT, "Minimum number of managers that need to be registered before a manager will start. A value " + "greater than 0 is useful when multiple managers are supposed to be running on startup. " - + "When set to 0 or less, no blocking occurs. Default is 0 (disabled)", + + "When set to 0 or less, no blocking occurs. Default is 0 (disabled).", "3.1.0"), MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT("manager.startup.manager.avail.max.wait", "0", PropertyType.TIMEDURATION, diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java index fe74b586861..370304149e2 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java @@ -47,7 +47,8 @@ public ClusterServerConfiguration() { * @param numSServers number of scan servers in the default resource group * @param numTServers number of tablet servers in the default resource group */ - public ClusterServerConfiguration(int numManagers, int numCompactors, int numSServers, int numTServers) { + public ClusterServerConfiguration(int numManagers, int numCompactors, int numSServers, + int numTServers) { this.numManagers = numManagers; compactors = new HashMap<>(); compactors.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numCompactors); @@ -60,11 +61,11 @@ public ClusterServerConfiguration(int numManagers, int numCompactors, int numSSe public int getNumManagers() { return this.numManagers; } - + public void setNumManagers(int num) { this.numManagers = num; } - + public void setNumDefaultCompactors(int numCompactors) { compactors.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numCompactors); } diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index 8ce12c1ada2..4fab32d5054 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@ -192,9 +192,10 @@ public synchronized void start(ServerType server, Map configOverr synchronized (managerProcesses) { int count = 0; for (int i = managerProcesses.size(); - count < limit && i < cluster.getConfig().getClusterServerConfiguration().getNumManagers(); i++, ++count) { - managerProcesses - .add(cluster._exec(classToUse, server, configOverrides).getProcess()); + count < limit + && i < cluster.getConfig().getClusterServerConfiguration().getNumManagers(); + i++, ++count) { + managerProcesses.add(cluster._exec(classToUse, server, configOverrides).getProcess()); } } break; diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java index 47c6f4d861f..f1642e7eb47 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java @@ -59,14 +59,13 @@ public ClosableIterator iterator(List ranges, TabletManagementParameters parameters) { return wrapped.iterator(ranges, parameters); } - + @Override @Deprecated public void overrideRanges(List ranges) { wrapped.overrideRanges(ranges); } - @Override public void setFutureLocations(Collection assignments) throws DistributedStoreException { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java index 4d183cd6c6f..82898e6cbeb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java @@ -33,10 +33,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Supplier; import org.apache.accumulo.core.data.AbstractId; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.thrift.ManagerState; @@ -72,13 +74,15 @@ public class TabletManagementParameters { private final Set onlineTservers; private final boolean canSuspendTablets; private final List> volumeReplacements; + private final List rangeOverrides; public TabletManagementParameters(ManagerState managerState, Map parentUpgradeMap, Set onlineTables, LiveTServerSet.LiveTServersSnapshot liveTServersSnapshot, Set serversToShutdown, Map migrations, Ample.DataLevel level, Map> compactionHints, - boolean canSuspendTablets, List> volumeReplacements) { + boolean canSuspendTablets, List> volumeReplacements, + Optional> rangeOverrides) { this.managerState = managerState; this.parentUpgradeMap = Map.copyOf(parentUpgradeMap); // TODO could filter by level @@ -100,6 +104,7 @@ public TabletManagementParameters(ManagerState managerState, }); this.canSuspendTablets = canSuspendTablets; this.volumeReplacements = volumeReplacements; + this.rangeOverrides = rangeOverrides.orElse(null); } private TabletManagementParameters(JsonData jdata) { @@ -126,6 +131,7 @@ private TabletManagementParameters(JsonData jdata) { }); this.canSuspendTablets = jdata.canSuspendTablets; this.volumeReplacements = jdata.volumeReplacements; + this.rangeOverrides = jdata.rangeOverrides; } public ManagerState getManagerState() { @@ -180,6 +186,10 @@ public List> getVolumeReplacements() { return volumeReplacements; } + public List getRangeOverrides() { + return rangeOverrides; + } + private static Map> makeImmutable(Map> compactionHints) { var copy = new HashMap>(); @@ -203,6 +213,7 @@ private static class JsonData { final boolean canSuspendTablets; final List> volumeReplacements; + final List rangeOverrides; private static String toString(KeyExtent extent) { DataOutputBuffer buffer = new DataOutputBuffer(); @@ -245,6 +256,7 @@ private static KeyExtent strToExtent(String kes) { compactionHints = params.compactionHints; canSuspendTablets = params.canSuspendTablets; volumeReplacements = params.volumeReplacements; + rangeOverrides = params.rangeOverrides; } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java index b10505aab30..b90e40a85b2 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java @@ -59,7 +59,11 @@ ClosableIterator iterator(List ranges, * Scan the information about all tablets covered by this store.. */ default ClosableIterator iterator(TabletManagementParameters parameters) { - return iterator(List.of(MetadataSchema.TabletsSection.getRange()), parameters); + List ranges = List.of(MetadataSchema.TabletsSection.getRange()); + if (parameters.getRangeOverrides() != null) { + ranges = parameters.getRangeOverrides(); + } + return iterator(ranges, parameters); } /** diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 76ac0b2eb44..b280049e32b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -972,8 +972,8 @@ public void unableToMonitorLockNode(final Exception e) { ServiceDescriptors descriptors = new ServiceDescriptors(); // Insert the service with a fake address so that clients can't connect right now - descriptors.addService( - new ServiceDescriptor(MANAGER_SERVER_UUID, ThriftService.MANAGER, address.toString(), Constants.DEFAULT_RESOURCE_GROUP_NAME)); + descriptors.addService(new ServiceDescriptor(MANAGER_SERVER_UUID, ThriftService.MANAGER, + address.toString(), Constants.DEFAULT_RESOURCE_GROUP_NAME)); ServiceLockData sld = new ServiceLockData(descriptors); for (int i = 0; i < 120 / 5; i++) { zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP); @@ -995,11 +995,13 @@ public void unableToMonitorLockNode(final Exception e) { private static void updateLockContent(HostAndPort address, ServiceLock primaryManagerLock) { ServiceDescriptors descriptors = new ServiceDescriptors(); - descriptors.addService( - new ServiceDescriptor(MANAGER_SERVER_UUID, ThriftService.MANAGER, address.toString(), Constants.DEFAULT_RESOURCE_GROUP_NAME)); + descriptors.addService(new ServiceDescriptor(MANAGER_SERVER_UUID, ThriftService.MANAGER, + address.toString(), Constants.DEFAULT_RESOURCE_GROUP_NAME)); if (PRIMARY_MANAGER.get()) { - descriptors.addService( - new ServiceDescriptor(MANAGER_SERVER_UUID, ThriftService.FATE, address.toString(), Constants.DEFAULT_RESOURCE_GROUP_NAME)); + descriptors.addService(new ServiceDescriptor(MANAGER_SERVER_UUID, ThriftService.COORDINATOR, + address.toString(), Constants.DEFAULT_RESOURCE_GROUP_NAME)); + descriptors.addService(new ServiceDescriptor(MANAGER_SERVER_UUID, ThriftService.FATE, + address.toString(), Constants.DEFAULT_RESOURCE_GROUP_NAME)); } ServiceLockData sld = new ServiceLockData(descriptors); log.info("Setting manager lock data to {}", sld.toString()); @@ -1034,7 +1036,8 @@ public void run() { FateService.Iface fateProxy = HighlyAvailableServiceWrapper.service(fateServiceHandler, this); ServerAddress sa; - var processor = ThriftProcessorTypes.getManagerTProcessor(fateProxy, compactionCoordinator, mgrProxy, getContext()); + var processor = ThriftProcessorTypes.getManagerTProcessor(fateProxy, compactionCoordinator, + mgrProxy, getContext()); try { sa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_CLIENTPORT, processor, @@ -1135,7 +1138,7 @@ public void process(WatchedEvent event) { this.splitter = new Splitter(context); this.splitter.start(); - watchers.add(new TabletGroupWatcher(this, this.userTabletStore, null, mm, () -> true) { + watchers.add(new TabletGroupWatcher(this, this.userTabletStore, null, mm) { @Override boolean canSuspendTablets() { // Always allow user data tablets to enter suspended state. @@ -1143,7 +1146,7 @@ boolean canSuspendTablets() { } }); - watchers.add(new TabletGroupWatcher(this, this.metadataTabletStore, watchers.get(0), mm, () -> PRIMARY_MANAGER.get()) { + watchers.add(new TabletGroupWatcher(this, this.metadataTabletStore, watchers.get(0), mm) { @Override boolean canSuspendTablets() { // Allow metadata tablets to enter suspended state only if so configured. Generally @@ -1154,7 +1157,7 @@ boolean canSuspendTablets() { } }); - watchers.add(new TabletGroupWatcher(this, this.rootTabletStore, watchers.get(1), mm, () -> PRIMARY_MANAGER.get()) { + watchers.add(new TabletGroupWatcher(this, this.rootTabletStore, watchers.get(1), mm) { @Override boolean canSuspendTablets() { // Never allow root tablet to enter suspended state. @@ -1394,10 +1397,6 @@ private void blockForServers(Property minServerCountProperty, Property minServer private long remaining(long deadline) { return Math.max(1, deadline - System.currentTimeMillis()); } - - public ServiceLock getManagerLock() { - return managerServerLock; - } public ServiceLock getPrimaryManagerLock() { return primaryManagerLock; @@ -1470,8 +1469,8 @@ private void getPrimaryManagerLock(final ServiceLockPath zManagerLoc, final Host var zooKeeper = getContext().getZooReaderWriter().getZooKeeper(); log.info("trying to get primary manager lock"); - ServiceLockData sld = - new ServiceLockData(MANAGER_SERVER_UUID, address.toString(), ThriftService.FATE, Constants.DEFAULT_RESOURCE_GROUP_NAME); + ServiceLockData sld = new ServiceLockData(MANAGER_SERVER_UUID, address.toString(), + ThriftService.FATE, Constants.DEFAULT_RESOURCE_GROUP_NAME); primaryManagerLock = new ServiceLock(zooKeeper, zManagerLoc, MANAGER_SERVER_UUID); PrimaryManagerLockWatcher managerLockWatcher = diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 1c7b837a8f7..2b73dfeb127 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -45,7 +45,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.accumulo.core.client.BatchWriter; @@ -138,7 +137,6 @@ public Text getEncodedEndRow() { private WalStateManager walStateManager; private volatile Set filteredServersToShutdown = Set.of(); - private final Supplier enabled; private LiveManagerSet liveManagers; /* current set of managers */ private final SortedSet managers = new TreeSet<>(); @@ -146,7 +144,7 @@ public Text getEncodedEndRow() { private final AtomicReference> managerUpdates = new AtomicReference<>(); TabletGroupWatcher(Manager manager, TabletStateStore store, TabletGroupWatcher dependentWatcher, - ManagerMetrics metrics, Supplier enabled) { + ManagerMetrics metrics) { super("Watching " + store.name()); this.manager = manager; this.store = store; @@ -155,7 +153,6 @@ public Text getEncodedEndRow() { this.walStateManager = new WalStateManager(manager.getContext()); this.eventHandler = new EventHandler(); manager.getEventCoordinator().addListener(store.getLevel(), eventHandler); - this.enabled = enabled; } /** Should this {@code TabletGroupWatcher} suspend tablets? */ @@ -236,7 +233,6 @@ class EventHandler implements EventCoordinator.Listener { private boolean needsFullScan = true; private final BlockingQueue rangesToProcess; - AtomicReference> tablesFromLastRun = new AtomicReference<>(null); class RangeProccessor implements Runnable { @Override @@ -244,11 +240,6 @@ public void run() { try { while (manager.stillManager()) { - // spin if not enabled - while (!enabled.get()) { - Thread.onSpinWait(); - } - var range = rangesToProcess.poll(100, TimeUnit.MILLISECONDS); if (range == null) { // check to see if still the manager @@ -266,7 +257,8 @@ public void run() { continue; } - TabletManagementParameters tabletMgmtParams = createTabletManagementParameters(false); + TabletManagementParameters tabletMgmtParams = + createTabletManagementParameters(false, Optional.of(ranges)); var currentTservers = getCurrentTservers(tabletMgmtParams.getOnlineTsevers()); if (currentTservers.isEmpty()) { @@ -340,8 +332,8 @@ synchronized void waitForFullScan(long millis) { } } - private TabletManagementParameters - createTabletManagementParameters(boolean lookForTabletsNeedingVolReplacement) { + private TabletManagementParameters createTabletManagementParameters( + boolean lookForTabletsNeedingVolReplacement, Optional> ranges) { HashMap parentLevelUpgrade = new HashMap<>(); UpgradeCoordinator.UpgradeStatus upgradeStatus = manager.getUpgradeStatus(); @@ -365,7 +357,8 @@ synchronized void waitForFullScan(long millis) { manager.onlineTables(), tServersSnapshot, shutdownServers, manager.migrationsSnapshot(), store.getLevel(), manager.getCompactionHints(), canSuspendTablets(), lookForTabletsNeedingVolReplacement ? manager.getContext().getVolumeReplacements() - : List.of()); + : List.of(), + ranges); } private Set getFilteredServersToShutdown() { @@ -510,19 +503,6 @@ private TableMgmtStats manageTablets(Iterator iter, if (manager.getSplitter().addSplitStarting(tm.getExtent())) { LOG.debug("submitting tablet {} for split", tm.getExtent()); manager.getSplitter().executeSplit(new SplitTask(manager.getContext(), tm, manager)); - -// DLMARI2: Figure out where this goes now - // Override the set of table ranges that this manager will manage -// if (store.getLevel() == DataLevel.USER) { -// Optional> ranges = calculateRangesForMultipleManagers( -// tablesFromLastRun.getAndSet(manager.getContext().getTableIdToNameMap())); -// // If ranges is empty, then the tables and managers did not change from the -// // previous call, we will use the same ranges. -// if (ranges.isPresent()) { -// store.overrideRanges(ranges.get()); -// } -// } - } } else { LOG.debug("{} is not splittable.", tm.getExtent()); @@ -599,8 +579,8 @@ private TableMgmtStats manageTablets(Iterator iter, if (client != null) { LOG.debug("Requesting tserver {} unload tablet {}", location.getServerInstance(), tm.getExtent()); - client.unloadTablet(manager.getManagerLock(), tm.getExtent(), goal.howUnload(), - manager.getSteadyTime()); + client.unloadTablet(manager.getPrimaryManagerLock(), tm.getExtent(), + goal.howUnload(), manager.getSteadyTime()); tableMgmtStats.totalUnloaded++; unloaded++; } else { @@ -638,6 +618,7 @@ private TableMgmtStats manageTablets(Iterator iter, public void run() { int[] oldCounts = new int[TabletState.values().length]; boolean lookForTabletsNeedingVolReplacement = true; + AtomicReference> tablesFromLastRun = new AtomicReference<>(null); while (manager.stillManager()) { // slow things down a little, otherwise we spam the logs when there are many wake-up events @@ -648,8 +629,15 @@ public void run() { final long waitTimeBetweenScans = manager.getConfiguration() .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); + // Override the set of table ranges that this manager will manage + Optional> ranges = Optional.empty(); + if (store.getLevel() == Ample.DataLevel.USER) { + ranges = calculateRangesForMultipleManagers( + tablesFromLastRun.getAndSet(manager.getContext().getTableIdToNameMap())); + } + TabletManagementParameters tableMgmtParams = - createTabletManagementParameters(lookForTabletsNeedingVolReplacement); + createTabletManagementParameters(lookForTabletsNeedingVolReplacement, ranges); var currentTServers = getCurrentTservers(tableMgmtParams.getOnlineTsevers()); ClosableIterator iter = null; @@ -1043,7 +1031,8 @@ private void replaceVolumes(List volumeReplacemen vr.filesToRemove.forEach(tabletMutator::deleteFile); vr.filesToAdd.forEach(tabletMutator::putFile); - tabletMutator.putZooLock(manager.getContext().getZooKeeperRoot(), manager.getManagerLock()); + tabletMutator.putZooLock(manager.getContext().getZooKeeperRoot(), + manager.getPrimaryManagerLock()); tabletMutator.submit( tm -> tm.getLogs().containsAll(vr.logsToAdd) && tm.getFiles().containsAll(vr.filesToAdd diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java index d8f1fbb24e4..d77f60ab501 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java @@ -69,7 +69,7 @@ public Repo call(long tid, Manager env) throws Exception { splitDirMap = Map.of(); } - writeSplitsToMetadataTable(env.getContext(), splits, splitDirMap, env.getManagerLock()); + writeSplitsToMetadataTable(env.getContext(), splits, splitDirMap, env.getPrimaryManagerLock()); return new FinishCreateTable(tableInfo); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index 5839a95b04f..4b2764597d9 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -484,6 +485,6 @@ private static TabletManagementParameters createParameters(AccumuloClient client onlineTables, new LiveTServerSet.LiveTServersSnapshot(tservers, Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers)), - Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, replacements); + Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, replacements, Optional.empty()); } } From a03cde78bc207c5bc5eb7e35ce3b813da5aa1670 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Wed, 22 Nov 2023 19:25:50 +0000 Subject: [PATCH 3/5] got tests working --- .../apache/accumulo/core/fate/zookeeper/ZooUtil.java | 6 +++++- .../accumulo/server/manager/LiveTServerSet.java | 2 +- .../java/org/apache/accumulo/manager/Manager.java | 12 ++++++------ .../manager/ManagerClientServiceHandler.java | 2 +- .../apache/accumulo/manager/TabletGroupWatcher.java | 9 ++++----- .../manager/tableOps/clone/CloneMetadata.java | 4 ++-- .../manager/tableOps/create/PopulateMetadata.java | 4 ++-- .../tableOps/tableImport/PopulateMetadataTable.java | 2 +- .../accumulo/manager/tserverOps/ShutdownTServer.java | 2 +- .../manager/tableOps/ShutdownTServerTest.java | 2 +- .../apache/accumulo/tserver/TabletClientHandler.java | 2 +- 11 files changed, 25 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java index 47d906fedb4..50badbf92ac 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java @@ -72,7 +72,11 @@ public LockID(String root, String serializedLID) { if (lastSlash == 0) { path = root; } else { - path = root + "/" + sa[0].substring(0, lastSlash); + path = root; + if (!sa[0].startsWith("/")) { + path += "/"; + } + path += sa[0].substring(0, lastSlash); } node = sa[0].substring(lastSlash + 1); eid = new BigInteger(sa[1], 16).longValue(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index eada6aa11ac..f982fc8dfbf 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -87,7 +87,7 @@ public HostAndPort getAddress() { } private String lockString(ServiceLock mlock) { - return mlock.getLockID().serialize(context.getZooKeeperRoot() + Constants.ZMANAGER_LOCK); + return mlock.getLockID().serialize(context.getZooKeeperRoot() + Constants.ZMANAGERS); } private void loadTablet(TabletManagementClientService.Client client, ServiceLock lock, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index b280049e32b..dd27b34fd5a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -212,7 +212,7 @@ public class Manager extends AbstractServer private AuthenticationTokenKeyManager authenticationTokenKeyManager; private ServiceLock managerServerLock = null; - ServiceLock primaryManagerLock = null; + private ServiceLock primaryManagerLock = null; private TServer clientService = null; private HostAndPort clientServiceAddress = null; protected volatile TabletBalancer tabletBalancer; @@ -690,7 +690,7 @@ public void run() { for (TServerInstance server : currentServers) { try { serversToShutdown.add(server); - tserverSet.getConnection(server).fastHalt(primaryManagerLock); + tserverSet.getConnection(server).fastHalt(managerServerLock); } catch (TException e) { // its probably down, and we don't care } finally { @@ -774,7 +774,7 @@ private void checkForHeldServer(SortedMap ts try { TServerConnection connection = tserverSet.getConnection(instance); if (connection != null) { - connection.fastHalt(primaryManagerLock); + connection.fastHalt(managerServerLock); } } catch (TException e) { log.error("{}", e.getMessage(), e); @@ -884,7 +884,7 @@ private SortedMap gatherTableInformation( try { TServerConnection connection2 = tserverSet.getConnection(server); if (connection2 != null) { - connection2.halt(primaryManagerLock); + connection2.halt(managerServerLock); } } catch (TTransportException e1) { // ignore: it's probably down @@ -1398,8 +1398,8 @@ private long remaining(long deadline) { return Math.max(1, deadline - System.currentTimeMillis()); } - public ServiceLock getPrimaryManagerLock() { - return primaryManagerLock; + public ServiceLock getManagerLock() { + return managerServerLock; } private static class PrimaryManagerLockWatcher implements ServiceLock.AccumuloLockWatcher { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index d24fe1a2634..351b2c6bc4e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -169,7 +169,7 @@ public void waitForFlush(TInfo tinfo, TCredentials c, String tableIdStr, ByteBuf try { final TServerConnection server = manager.tserverSet.getConnection(instance); if (server != null) { - server.flush(manager.primaryManagerLock, tableId, ByteBufferUtil.toBytes(startRowBB), + server.flush(manager.getManagerLock(), tableId, ByteBufferUtil.toBytes(startRowBB), ByteBufferUtil.toBytes(endRowBB)); } } catch (TException ex) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 2b73dfeb127..e67aa616516 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -579,8 +579,8 @@ private TableMgmtStats manageTablets(Iterator iter, if (client != null) { LOG.debug("Requesting tserver {} unload tablet {}", location.getServerInstance(), tm.getExtent()); - client.unloadTablet(manager.getPrimaryManagerLock(), tm.getExtent(), - goal.howUnload(), manager.getSteadyTime()); + client.unloadTablet(manager.getManagerLock(), tm.getExtent(), goal.howUnload(), + manager.getSteadyTime()); tableMgmtStats.totalUnloaded++; unloaded++; } else { @@ -1009,7 +1009,7 @@ private void flushChanges(TabletLists tLists) for (Assignment a : tLists.assignments) { TServerConnection client = manager.tserverSet.getConnection(a.server); if (client != null) { - client.assignTablet(manager.primaryManagerLock, a.tablet); + client.assignTablet(manager.getManagerLock(), a.tablet); } else { Manager.log.warn("Could not connect to server {}", a.server); } @@ -1031,8 +1031,7 @@ private void replaceVolumes(List volumeReplacemen vr.filesToRemove.forEach(tabletMutator::deleteFile); vr.filesToAdd.forEach(tabletMutator::putFile); - tabletMutator.putZooLock(manager.getContext().getZooKeeperRoot(), - manager.getPrimaryManagerLock()); + tabletMutator.putZooLock(manager.getContext().getZooKeeperRoot(), manager.getManagerLock()); tabletMutator.submit( tm -> tm.getLogs().containsAll(vr.logsToAdd) && tm.getFiles().containsAll(vr.filesToAdd diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneMetadata.java index 941de7525cd..bd3a836e27d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneMetadata.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneMetadata.java @@ -46,7 +46,7 @@ public Repo call(long tid, Manager environment) throws Exception { // need to clear out any metadata entries for tableId just in case this // died before and is executing again MetadataTableUtil.deleteTable(cloneInfo.tableId, false, environment.getContext(), - environment.getPrimaryManagerLock()); + environment.getManagerLock()); MetadataTableUtil.cloneTable(environment.getContext(), cloneInfo.srcTableId, cloneInfo.tableId); return new FinishCloneTable(cloneInfo); } @@ -54,7 +54,7 @@ public Repo call(long tid, Manager environment) throws Exception { @Override public void undo(long tid, Manager environment) throws Exception { MetadataTableUtil.deleteTable(cloneInfo.tableId, false, environment.getContext(), - environment.getPrimaryManagerLock()); + environment.getManagerLock()); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java index d77f60ab501..1bc229b9f3d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java @@ -69,7 +69,7 @@ public Repo call(long tid, Manager env) throws Exception { splitDirMap = Map.of(); } - writeSplitsToMetadataTable(env.getContext(), splits, splitDirMap, env.getPrimaryManagerLock()); + writeSplitsToMetadataTable(env.getContext(), splits, splitDirMap, env.getManagerLock()); return new FinishCreateTable(tableInfo); } @@ -102,7 +102,7 @@ private void writeSplitsToMetadataTable(ServerContext context, SortedSet s @Override public void undo(long tid, Manager environment) throws Exception { MetadataTableUtil.deleteTable(tableInfo.getTableId(), false, environment.getContext(), - environment.getPrimaryManagerLock()); + environment.getManagerLock()); } /** diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java index 95119c89c41..f0c000d93fb 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/PopulateMetadataTable.java @@ -207,6 +207,6 @@ public Repo call(long tid, Manager manager) throws Exception { @Override public void undo(long tid, Manager environment) throws Exception { MetadataTableUtil.deleteTable(tableInfo.tableId, false, environment.getContext(), - environment.getPrimaryManagerLock()); + environment.getManagerLock()); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java b/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java index b3be8607b19..eb7ab83904f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tserverOps/ShutdownTServer.java @@ -68,7 +68,7 @@ public long isReady(long tid, Manager manager) { TabletServerStatus status = connection.getTableMap(false); if (status.tableMap != null && status.tableMap.isEmpty()) { log.info("tablet server hosts no tablets {}", server); - connection.halt(manager.getPrimaryManagerLock()); + connection.halt(manager.getManagerLock()); log.info("tablet server asked to halt {}", server); return 0; } else { diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java index 401345844a2..9aff9a89faa 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/ShutdownTServerTest.java @@ -80,7 +80,7 @@ public void testSingleShutdown() throws Exception { EasyMock.expect(manager.onlineTabletServers()).andReturn(Collections.singleton(tserver)); EasyMock.expect(manager.getConnection(tserver)).andReturn(tserverCnxn); EasyMock.expect(tserverCnxn.getTableMap(false)).andReturn(status); - EasyMock.expect(manager.getPrimaryManagerLock()).andReturn(null); + EasyMock.expect(manager.getManagerLock()).andReturn(null); tserverCnxn.halt(null); EasyMock.expectLastCall().once(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 7f34d65b4f6..565bf60739c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -866,7 +866,7 @@ static void checkPermission(SecurityOperation security, ServerContext context, if (lock != null) { ZooUtil.LockID lid = - new ZooUtil.LockID(context.getZooKeeperRoot() + Constants.ZMANAGER_LOCK, lock); + new ZooUtil.LockID(context.getZooKeeperRoot() + Constants.ZMANAGERS, lock); try { if (!ServiceLock.isLockHeld(server.getManagerLockCache(), lid)) { From a48eb99b6bcf2aa94a4c8d3ab894f24b56158075 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Wed, 22 Nov 2023 19:49:24 +0000 Subject: [PATCH 4/5] Address PR comments --- .../java/org/apache/accumulo/core/conf/Property.java | 6 +++--- .../server/manager/state/LoggingTabletStateStore.java | 6 ------ .../server/manager/state/MetaDataStateStore.java | 10 ---------- .../server/manager/state/TabletStateStore.java | 9 --------- .../server/manager/state/ZooTabletStateStore.java | 1 - 5 files changed, 3 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index d03379e0fe9..130b6dd7601 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -318,7 +318,7 @@ public enum Property { MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT, "The port used for handling client connections on the manager.", "1.3.5"), MANAGER_PORTSEARCH("manager.port.search", "false", PropertyType.BOOLEAN, - "If the manager.port.client is in use, search higher ports until one is available.", "3.1.0"), + "If the manager.port.client is in use, search higher ports until one is available.", "4.0.0"), MANAGER_TABLET_BALANCER("manager.tablet.balancer", "org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME, "The balancer class that accumulo will use to make tablet assignment and " @@ -378,14 +378,14 @@ public enum Property { "Minimum number of managers that need to be registered before a manager will start. A value " + "greater than 0 is useful when multiple managers are supposed to be running on startup. " + "When set to 0 or less, no blocking occurs. Default is 0 (disabled).", - "3.1.0"), + "4.0.0"), MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT("manager.startup.manager.avail.max.wait", "0", PropertyType.TIMEDURATION, "Maximum time manager will wait for manager available threshold " + "to be reached before continuing. When set to 0 or less, will block " + "indefinitely. Default is 0 to block indefinitely. Only valid when manager available " + "threshold is set greater than 1.", - "3.1.0"), + "4.0.0"), MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT("manager.startup.tserver.avail.min.count", "0", PropertyType.COUNT, "Minimum number of tservers that need to be registered before manager will " diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java index f1642e7eb47..8545ecd7c75 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java @@ -60,12 +60,6 @@ public ClosableIterator iterator(List ranges, return wrapped.iterator(ranges, parameters); } - @Override - @Deprecated - public void overrideRanges(List ranges) { - wrapped.overrideRanges(ranges); - } - @Override public void setFutureLocations(Collection assignments) throws DistributedStoreException { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java index 21eefaeb13b..a182745da5d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java @@ -19,7 +19,6 @@ package org.apache.accumulo.server.manager.state; import java.util.Collection; -import java.util.Collections; import java.util.List; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -29,7 +28,6 @@ import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import com.google.common.base.Preconditions; @@ -40,7 +38,6 @@ class MetaDataStateStore extends AbstractTabletStateStore implements TabletState private final String targetTableName; private final Ample ample; private final DataLevel level; - private List ranges; protected MetaDataStateStore(DataLevel level, ClientContext context, String targetTableName) { super(context); @@ -48,7 +45,6 @@ protected MetaDataStateStore(DataLevel level, ClientContext context, String targ this.context = context; this.ample = context.getAmple(); this.targetTableName = targetTableName; - this.ranges = Collections.singletonList(TabletsSection.getRange()); } MetaDataStateStore(DataLevel level, ClientContext context) { @@ -60,12 +56,6 @@ public DataLevel getLevel() { return level; } - @Override - @Deprecated - public void overrideRanges(List ranges) { - this.ranges = ranges; - } - @Override public ClosableIterator iterator(List ranges, TabletManagementParameters parameters) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java index b90e40a85b2..272d7b33b9a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java @@ -66,15 +66,6 @@ default ClosableIterator iterator(TabletManagementParameters p return iterator(ranges, parameters); } - /** - * Override the range of tablets that the TabletStateStore should retrieve. By default it - * retrieves all tablets. - */ - @Deprecated - default void overrideRanges(List ranges) { - throw new UnsupportedOperationException("Not implemented."); - } - /** * Store the assigned locations in the data store. */ diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java index 0e6fc01bf08..8b7a5abb0c9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java @@ -179,5 +179,4 @@ public void unsuspend(Collection tablets) { public String name() { return "Root Table"; } - } From 0727d90f03fc302b4dcc5f11d1573302832290d2 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 27 Nov 2023 22:56:13 +0000 Subject: [PATCH 5/5] Disable fate threads if not primary manager, client connection changes --- .../org/apache/accumulo/core/fate/Fate.java | 10 ++++++++- .../core/rpc/clients/ManagerClient.java | 22 ++++++++++++------- .../org/apache/accumulo/manager/Manager.java | 11 ++++++---- .../accumulo/manager/TabletGroupWatcher.java | 14 +++++++++++- .../accumulo/test/fate/zookeeper/FateIT.java | 12 ++++++---- 5 files changed, 51 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 1f0ae0d7583..804ebecf515 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -37,6 +37,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.function.Supplier; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -62,6 +63,7 @@ public class Fate { private final T environment; private final ScheduledThreadPoolExecutor fatePoolWatcher; private final ExecutorService executor; + private final Supplier enabled; private static final EnumSet FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); @@ -76,6 +78,11 @@ private class TransactionRunner implements Runnable { @Override public void run() { while (keepRunning.get()) { + if (!enabled.get()) { + log.debug("[{}] Not enabled, must not be primary manager"); + UtilWaitThread.sleep(60_000); + continue; + } long deferTime = 0; Long tid = null; try { @@ -224,7 +231,7 @@ private void undo(long tid, Repo op) { * @param toLogStrFunc A function that converts Repo to Strings that are suitable for logging */ public Fate(T environment, TStore store, Function,String> toLogStrFunc, - AccumuloConfiguration conf) { + AccumuloConfiguration conf, Supplier enabled) { this.store = FateLogger.wrap(store, toLogStrFunc); this.environment = environment; final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf, @@ -255,6 +262,7 @@ public Fate(T environment, TStore store, Function,String> toLogStrFun } }, 3, SECONDS)); this.executor = pool; + this.enabled = enabled; } // get a transaction id back to the requester before doing any work diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java index d0076e69f1f..6c8d455ec58 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java @@ -36,16 +36,22 @@ public interface ManagerClient { default C getManagerConnection(Logger log, ThriftClientTypes type, ClientContext context) { checkArgument(context != null, "context is null"); - List locations = context.getManagerLocations(); + HostAndPort manager; + if (type == ThriftClientTypes.COORDINATOR || type == ThriftClientTypes.FATE) { + String primaryManager = context.getPrimaryManagerLocation(); + manager = HostAndPort.fromString(primaryManager); + } else { + List locations = context.getManagerLocations(); - if (locations.isEmpty()) { - log.debug("No managers..."); - return null; - } + if (locations.isEmpty()) { + log.debug("No managers..."); + return null; + } - HostAndPort manager = HostAndPort.fromString(locations.get(0)); - if (manager.getPort() == 0) { - return null; + manager = HostAndPort.fromString(locations.get(0)); + if (manager.getPort() == 0) { + return null; + } } try { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index dd27b34fd5a..e5e1f1c2bba 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1138,7 +1138,7 @@ public void process(WatchedEvent event) { this.splitter = new Splitter(context); this.splitter.start(); - watchers.add(new TabletGroupWatcher(this, this.userTabletStore, null, mm) { + watchers.add(new TabletGroupWatcher(this, this.userTabletStore, null, mm, () -> true) { @Override boolean canSuspendTablets() { // Always allow user data tablets to enter suspended state. @@ -1146,7 +1146,8 @@ boolean canSuspendTablets() { } }); - watchers.add(new TabletGroupWatcher(this, this.metadataTabletStore, watchers.get(0), mm) { + watchers.add(new TabletGroupWatcher(this, this.metadataTabletStore, watchers.get(0), mm, + () -> PRIMARY_MANAGER.get()) { @Override boolean canSuspendTablets() { // Allow metadata tablets to enter suspended state only if so configured. Generally @@ -1157,7 +1158,8 @@ boolean canSuspendTablets() { } }); - watchers.add(new TabletGroupWatcher(this, this.rootTabletStore, watchers.get(1), mm) { + watchers.add(new TabletGroupWatcher(this, this.rootTabletStore, watchers.get(1), mm, + () -> PRIMARY_MANAGER.get()) { @Override boolean canSuspendTablets() { // Never allow root tablet to enter suspended state. @@ -1187,7 +1189,8 @@ boolean canSuspendTablets() { context.getZooReaderWriter()), HOURS.toMillis(8), System::currentTimeMillis); - Fate f = new Fate<>(this, store, TraceRepo::toLogString, getConfiguration()); + Fate f = new Fate<>(this, store, TraceRepo::toLogString, getConfiguration(), + () -> PRIMARY_MANAGER.get()); fateRef.set(f); fateReadyLatch.countDown(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index e67aa616516..f813cdbbd35 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.accumulo.core.client.BatchWriter; @@ -76,6 +77,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.TextUtil; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; import org.apache.accumulo.manager.metrics.ManagerMetrics; @@ -143,8 +145,10 @@ public Text getEncodedEndRow() { /* updates set by LiveManagerSet callback */ private final AtomicReference> managerUpdates = new AtomicReference<>(); + private final Supplier enabledSupplier; + TabletGroupWatcher(Manager manager, TabletStateStore store, TabletGroupWatcher dependentWatcher, - ManagerMetrics metrics) { + ManagerMetrics metrics, Supplier enabledSupplier) { super("Watching " + store.name()); this.manager = manager; this.store = store; @@ -152,6 +156,7 @@ public Text getEncodedEndRow() { this.metrics = metrics; this.walStateManager = new WalStateManager(manager.getContext()); this.eventHandler = new EventHandler(); + this.enabledSupplier = enabledSupplier; manager.getEventCoordinator().addListener(store.getLevel(), eventHandler); } @@ -621,6 +626,13 @@ public void run() { AtomicReference> tablesFromLastRun = new AtomicReference<>(null); while (manager.stillManager()) { + + if (!enabledSupplier.get()) { + Manager.log.debug("[{}] Not enabled, must not be primary manager"); + UtilWaitThread.sleep(60_000); + continue; + } + // slow things down a little, otherwise we spam the logs when there are many wake-up events sleepUninterruptibly(100, TimeUnit.MILLISECONDS); // ELASTICITY_TODO above sleep in the case when not doing a full scan to make manager more diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java index 5834179fc09..d927790ae22 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java @@ -161,7 +161,8 @@ public void testTransactionStatus() throws Exception { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate fate = new Fate(manager, store, TraceRepo::toLogString, config); + Fate fate = + new Fate(manager, store, TraceRepo::toLogString, config, () -> true); try { // Wait for the transaction runner to be scheduled. @@ -221,7 +222,8 @@ public void testCancelWhileNew() throws Exception { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate fate = new Fate(manager, store, TraceRepo::toLogString, config); + Fate fate = + new Fate(manager, store, TraceRepo::toLogString, config, () -> true); try { // Wait for the transaction runner to be scheduled. @@ -260,7 +262,8 @@ public void testCancelWhileSubmittedAndRunning() throws Exception { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate fate = new Fate(manager, store, TraceRepo::toLogString, config); + Fate fate = + new Fate(manager, store, TraceRepo::toLogString, config, () -> true); try { // Wait for the transaction runner to be scheduled. @@ -301,7 +304,8 @@ public void testCancelWhileInCall() throws Exception { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate fate = new Fate(manager, store, TraceRepo::toLogString, config); + Fate fate = + new Fate(manager, store, TraceRepo::toLogString, config, () -> true); try { // Wait for the transaction runner to be scheduled.