Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -506,41 +507,75 @@ public String getRootTabletLocation() {
return loc.getHostPort();
}

/**
* Returns the location(s) of the accumulo manager and any redundant servers.
*
* @return a list of locations in "hostname:port" form
*/
public List<String> getManagerLocations() {
public String getPrimaryManagerLocation() {
ensureOpen();
var zLockManagerPath =
ServiceLock.path(Constants.ZROOT + "/" + getInstanceID() + Constants.ZMANAGER_LOCK);

OpTimer timer = null;

if (log.isTraceEnabled()) {
log.trace("tid={} Looking up manager location in zookeeper at {}.",
log.trace("tid={} Looking up primary manager location in zookeeper at {}.",
Thread.currentThread().getId(), zLockManagerPath);
timer = new OpTimer().start();
}

Optional<ServiceLockData> sld = zooCache.getLockData(zLockManagerPath);
String location = null;
if (sld.isPresent()) {
location = sld.orElseThrow().getAddressString(ThriftService.MANAGER);
location = sld.orElseThrow().getAddressString(ThriftService.FATE);
}

if (timer != null) {
timer.stop();
log.trace("tid={} Found manager at {} in {}", Thread.currentThread().getId(),
log.trace("tid={} Found primary manager at {} in {}", Thread.currentThread().getId(),
(location == null ? "null" : location), String.format("%.3f secs", timer.scale(SECONDS)));
}

if (location == null) {
return Collections.emptyList();
return location;
}

/**
 * Returns the location(s) of the accumulo manager and any redundant servers.
 *
 * @return a list of locations in "hostname:port" form; empty when no manager lock data is found
 */
public List<String> getManagerLocations() {
  ensureOpen();

  // Build the parent path once; it is reused for logging, listing children and child lock paths.
  final String managersPath = Constants.ZROOT + "/" + getInstanceID() + Constants.ZMANAGERS;
  var zLockManagerPath = ServiceLock.path(managersPath);

  OpTimer timer = null;

  if (log.isTraceEnabled()) {
    log.trace("tid={} Looking up manager locations in zookeeper at {}.",
        Thread.currentThread().getId(), zLockManagerPath);
    timer = new OpTimer().start();
  }

  List<String> locations = new ArrayList<>();

  // NOTE(review): guarding against a null child list in case the parent node does not exist;
  // confirm against ZooCache.getChildren's contract.
  List<String> children = zooCache.getChildren(managersPath);
  if (children != null) {
    for (String manager : children) {
      // Children without a ':' are skipped; presumably only "host:port" names are managers.
      if (!manager.contains(":")) {
        continue;
      }
      var zLocPath = ServiceLock.path(managersPath + "/" + manager);
      Optional<ServiceLockData> sld = zooCache.getLockData(zLocPath);
      if (sld.isPresent()) {
        locations.add(sld.orElseThrow().getAddressString(ThriftService.MANAGER));
      }
    }
  }

  // timer is only non-null when trace logging was enabled above
  if (timer != null) {
    timer.stop();
    log.trace("tid={} Found managers at {} in {}", Thread.currentThread().getId(), locations,
        String.format("%.3f secs", timer.scale(SECONDS)));
  }

  return locations;
}

/**
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ public enum Property {
"Properties in this category affect the behavior of the manager server.", "2.1.0"),
MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT,
"The port used for handling client connections on the manager.", "1.3.5"),
MANAGER_PORTSEARCH("manager.port.search", "false", PropertyType.BOOLEAN,
"If the manager.port.client is in use, search higher ports until one is available.", "4.0.0"),
MANAGER_TABLET_BALANCER("manager.tablet.balancer",
"org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME,
"The balancer class that accumulo will use to make tablet assignment and "
Expand Down Expand Up @@ -388,6 +390,19 @@ public enum Property {
"Allow tablets for the " + MetadataTable.NAME
+ " table to be suspended via table.suspend.duration.",
"1.8.0"),
MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT("manager.startup.manager.avail.min.count", "0",
PropertyType.COUNT,
"Minimum number of managers that need to be registered before a manager will start. A value "
+ "greater than 0 is useful when multiple managers are supposed to be running on startup. "
+ "When set to 0 or less, no blocking occurs. Default is 0 (disabled).",
"4.0.0"),
// Companion to MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT, which enables blocking for any value
// greater than 0; the description below uses the same threshold.
MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT("manager.startup.manager.avail.max.wait", "0",
    PropertyType.TIMEDURATION,
    "Maximum time manager will wait for manager available threshold "
        + "to be reached before continuing. When set to 0 or less, will block "
        + "indefinitely. Default is 0 to block indefinitely. Only valid when manager available "
        + "threshold is set greater than 0.",
    "4.0.0"),
MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT("manager.startup.tserver.avail.min.count", "0",
PropertyType.COUNT,
"Minimum number of tservers that need to be registered before manager will "
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
Expand All @@ -62,6 +63,7 @@ public class Fate<T> {
private final T environment;
private final ScheduledThreadPoolExecutor fatePoolWatcher;
private final ExecutorService executor;
private final Supplier<Boolean> enabled;

private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);

Expand All @@ -76,6 +78,11 @@ private class TransactionRunner implements Runnable {
@Override
public void run() {
while (keepRunning.get()) {
// While the enabled supplier reports false, do no transaction work: sleep and re-check.
// Per the log message, this is expected when this process is not the primary manager.
if (!enabled.get()) {
  // The message takes no arguments, so it must not contain a "{}" placeholder.
  log.debug("Not enabled, must not be primary manager");
  UtilWaitThread.sleep(60_000);
  continue;
}
long deferTime = 0;
Long tid = null;
try {
Expand Down Expand Up @@ -224,7 +231,7 @@ private void undo(long tid, Repo<T> op) {
* @param toLogStrFunc A function that converts Repo to Strings that are suitable for logging
*/
public Fate(T environment, TStore<T> store, Function<Repo<T>,String> toLogStrFunc,
AccumuloConfiguration conf) {
AccumuloConfiguration conf, Supplier<Boolean> enabled) {
this.store = FateLogger.wrap(store, toLogStrFunc);
this.environment = environment;
final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
Expand Down Expand Up @@ -255,6 +262,7 @@ public Fate(T environment, TStore<T> store, Function<Repo<T>,String> toLogStrFun
}
}, 3, SECONDS));
this.executor = pool;
this.enabled = enabled;
}

// get a transaction id back to the requester before doing any work
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ public LockID(String root, String serializedLID) {
if (lastSlash == 0) {
path = root;
} else {
path = root + "/" + sa[0].substring(0, lastSlash);
path = root;
if (!sa[0].startsWith("/")) {
path += "/";
}
Comment on lines +76 to +78
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to add this because for some reason sa[0] starts with a / for the MANAGERS locks.

path += sa[0].substring(0, lastSlash);
}
node = sa[0].substring(lastSlash + 1);
eid = new BigInteger(sa[1], 16).longValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,22 @@ public interface ManagerClient<C extends TServiceClient> {
default C getManagerConnection(Logger log, ThriftClientTypes<C> type, ClientContext context) {
checkArgument(context != null, "context is null");

// COORDINATOR and FATE clients are pointed at the primary manager; every other client type
// uses the first entry of the manager location list.
final String location;
if (type == ThriftClientTypes.COORDINATOR || type == ThriftClientTypes.FATE) {
  // may be null when no primary manager location is available
  location = context.getPrimaryManagerLocation();
} else {
  List<String> locations = context.getManagerLocations();
  location = locations.isEmpty() ? null : locations.get(0);
}

// Check for a missing location before parsing, since HostAndPort.fromString rejects null.
if (location == null) {
  log.debug("No managers...");
  return null;
}

// Apply the port check to both branches so a port of 0 is never used for a connection.
HostAndPort manager = HostAndPort.fromString(location);
if (manager.getPort() == 0) {
  return null;
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

public class ClusterServerConfiguration {

private int numManagers = 1;
private final Map<String,Integer> compactors;
private final Map<String,Integer> sservers;
private final Map<String,Integer> tservers;
Expand All @@ -36,7 +37,7 @@ public class ClusterServerConfiguration {
* in the default resource group
*/
public ClusterServerConfiguration() {
this(1, 1, 2);
this(1, 1, 1, 2);
}

/**
Expand All @@ -46,7 +47,9 @@ public ClusterServerConfiguration() {
* @param numSServers number of scan servers in the default resource group
* @param numTServers number of tablet servers in the default resource group
*/
public ClusterServerConfiguration(int numCompactors, int numSServers, int numTServers) {
public ClusterServerConfiguration(int numManagers, int numCompactors, int numSServers,
int numTServers) {
this.numManagers = numManagers;
compactors = new HashMap<>();
compactors.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numCompactors);
sservers = new HashMap<>();
Expand All @@ -55,6 +58,14 @@ public ClusterServerConfiguration(int numCompactors, int numSServers, int numTSe
tservers.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numTServers);
}

/**
 * Returns the number of managers held by this configuration.
 *
 * @return the current manager count
 */
public int getNumManagers() {
  return numManagers;
}

/**
 * Replaces the number of managers held by this configuration.
 *
 * @param num the new manager count
 */
public void setNumManagers(int num) {
  numManagers = num;
}

/**
 * Sets the compactor count stored under the default resource group name, replacing any value
 * previously stored for that group.
 *
 * @param numCompactors number of compactors for the default resource group
 */
public void setNumDefaultCompactors(int numCompactors) {
compactors.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numCompactors);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class MiniAccumuloClusterControl implements ClusterControl {
protected MiniAccumuloClusterImpl cluster;

Process zooKeeperProcess = null;
Process managerProcess = null;
final List<Process> managerProcesses = new ArrayList<>();
Process gcProcess = null;
Process monitor = null;
final Map<String,List<Process>> tabletServerProcesses = new HashMap<>();
Expand Down Expand Up @@ -189,8 +189,14 @@ public synchronized void start(ServerType server, Map<String,String> configOverr
}
break;
case MANAGER:
if (managerProcess == null) {
managerProcess = cluster._exec(classToUse, server, configOverrides).getProcess();
synchronized (managerProcesses) {
int count = 0;
for (int i = managerProcesses.size();
count < limit
&& i < cluster.getConfig().getClusterServerConfiguration().getNumManagers();
i++, ++count) {
managerProcesses.add(cluster._exec(classToUse, server, configOverrides).getProcess());
}
}
break;
case ZOOKEEPER:
Expand Down Expand Up @@ -258,15 +264,19 @@ public void stop(ServerType server) throws IOException {
public synchronized void stop(ServerType server, String hostname) throws IOException {
switch (server) {
case MANAGER:
if (managerProcess != null) {
synchronized (managerProcesses) {
try {
cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
log.warn("Manager did not fully stop after 30 seconds", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
for (Process manager : managerProcesses) {
try {
cluster.stopProcessWithTimeout(manager, 30, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
log.warn("Manager did not fully stop after 30 seconds", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} finally {
managerProcess = null;
managerProcesses.clear();
}
}
break;
Expand Down Expand Up @@ -392,14 +402,19 @@ public void killProcess(ServerType type, ProcessReference procRef)
boolean found = false;
switch (type) {
case MANAGER:
if (procRef.getProcess().equals(managerProcess)) {
try {
cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
log.warn("Manager did not fully stop after 30 seconds", e);
synchronized (managerProcesses) {
for (Process manager : managerProcesses) {
if (procRef.getProcess().equals(manager)) {
managerProcesses.remove(manager);
try {
cluster.stopProcessWithTimeout(manager, 30, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
log.warn("Manager did not fully stop after 30 seconds", e);
}
found = true;
break;
}
}
managerProcess = null;
found = true;
}
break;
case TABLET_SERVER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,12 @@ private void verifyUp() throws InterruptedException, IOException {

int numTries = 10;

requireNonNull(getClusterControl().managerProcess, "Error starting Manager - no process");
waitForProcessStart(getClusterControl().managerProcess, "Manager");
int mgrExpectedCount = 0;
for (Process tsp : getClusterControl().managerProcesses) {
mgrExpectedCount++;
requireNonNull(tsp, "Error starting Manager " + mgrExpectedCount + " - no process");
waitForProcessStart(tsp, "Manager" + mgrExpectedCount);
}

requireNonNull(getClusterControl().gcProcess, "Error starting GC - no process");
waitForProcessStart(getClusterControl().gcProcess, "GC");
Expand Down Expand Up @@ -875,7 +879,7 @@ List<ProcessReference> references(Process... procs) {
public Map<ServerType,Collection<ProcessReference>> getProcesses() {
Map<ServerType,Collection<ProcessReference>> result = new HashMap<>();
MiniAccumuloClusterControl control = getClusterControl();
result.put(ServerType.MANAGER, references(control.managerProcess));
result.put(ServerType.MANAGER, references(control.managerProcesses.toArray(new Process[0])));
result.put(ServerType.TABLET_SERVER, references(control.tabletServerProcesses.values().stream()
.flatMap(List::stream).collect(Collectors.toList()).toArray(new Process[0])));
result.put(ServerType.COMPACTOR, references(control.compactorProcesses.values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public HostAndPort getAddress() {
}

private String lockString(ServiceLock mlock) {
return mlock.getLockID().serialize(context.getZooKeeperRoot() + Constants.ZMANAGER_LOCK);
return mlock.getLockID().serialize(context.getZooKeeperRoot() + Constants.ZMANAGERS);
}

private void loadTablet(TabletManagementClientService.Client client, ServiceLock lock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,10 @@ public ClosableIterator<TabletManagement> iterator(List<Range> ranges,
public String name() {
return "Metadata Tablets";
}

/**
 * Returns the data level associated with this implementation.
 *
 * @return always {@code DataLevel.METADATA}
 */
@Override
public DataLevel getLevel() {
return DataLevel.METADATA;
}

}
Loading