Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,17 @@ private static class ScanServerShutdownITConfiguration
public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
org.apache.hadoop.conf.Configuration coreSite) {

cfg.getClusterServerConfiguration().setNumDefaultScanServers(0);
cfg.getClusterServerConfiguration().setNumDefaultScanServers(1);

// Timeout scan sessions after being idle for 3 seconds
cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");

// Configure the scan server to only have 1 scan executor thread. This means
// that the scan server will run scans serially, not concurrently.
cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1");

// Set our custom implementation that shuts down after 3 batch scans
cfg.setServerClass(ServerType.SCAN_SERVER, SelfStoppingScanServer.class);
}
}

Expand All @@ -87,14 +90,6 @@ public void testRefRemovalOnShutdown() throws Exception {
ZooReaderWriter zrw = ctx.getZooReaderWriter();
String scanServerRoot = zooRoot + Constants.ZSSERVERS;

Wait.waitFor(() -> zrw.getChildren(scanServerRoot).size() == 0);

// Stop normal ScanServers so that we can start our custom implementation
// that shuts down after 3 batch scans
getCluster().getConfig().getClusterServerConfiguration().setNumDefaultScanServers(1);
getCluster().getClusterControl().start(ServerType.SCAN_SERVER, null, 1,
SelfStoppingScanServer.class);

// Wait for the ScanServer to register in ZK
Wait.waitFor(() -> zrw.getChildren(scanServerRoot).size() == 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.util.FindCompactionTmpFiles;
Expand All @@ -75,6 +76,7 @@ static class ExternalCompaction2Config implements MiniClusterConfigurationCallba
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
cfg.setServerClass(ServerType.COMPACTOR, ExternalDoNothingCompactor.class);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@
*/
package org.apache.accumulo.test.compaction;

import org.apache.accumulo.minicluster.ServerType;
import org.junit.jupiter.api.BeforeAll;

public class ExternalCompaction_2_IT extends ExternalCompaction2BaseIT {

@BeforeAll
public static void beforeTests() throws Exception {
startMiniClusterWithConfig(new ExternalCompaction2Config());
getCluster().getClusterControl().stop(ServerType.COMPACTOR);
getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
ExternalDoNothingCompactor.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.ample.FlakyAmpleManager;
import org.apache.accumulo.test.ample.FlakyAmpleServerContext;
import org.apache.accumulo.test.ample.FlakyAmpleTserver;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

Expand All @@ -37,16 +39,18 @@
*/
public class FlakyExternalCompaction2IT extends ExternalCompaction2BaseIT {

@BeforeAll
public static void setup() throws Exception {
SharedMiniClusterBase.startMiniClusterWithConfig((cfg, coreSite) -> {
ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
static class FlakyExternalCompaction2Config extends ExternalCompaction2Config {
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
super.configureMiniCluster(cfg, coreSite);
cfg.setServerClass(ServerType.MANAGER, FlakyAmpleManager.class);
cfg.setServerClass(ServerType.TABLET_SERVER, FlakyAmpleTserver.class);
});
getCluster().getClusterControl().stop(ServerType.COMPACTOR);
getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
ExternalDoNothingCompactor.class);
}
}

@BeforeAll
public static void setup() throws Exception {
startMiniClusterWithConfig(new FlakyExternalCompaction2Config());
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -83,6 +82,9 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS
Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort()));
cfg.setSystemProperties(sysProps);

// Set a compactor that will consume and free memory when we need it to
cfg.setServerClass(ServerType.COMPACTOR, MemoryConsumingCompactor.class);
}
}

Expand Down Expand Up @@ -137,25 +139,6 @@ public void testMajCPauses() throws Exception {

ClientContext ctx = (ClientContext) client;

// Kill the normal compactors and wait until their addresses in ZK are cleared
getCluster().getConfig().getClusterServerConfiguration().getCompactorConfiguration().keySet()
.forEach(resourceGroup -> {
List<Process> procs = getCluster().getClusterControl().getCompactors(resourceGroup);
for (int i = 0; i < procs.size(); i++) {
LOG.info("Stopping compactor process: {}", procs.get(i).pid());
try {
procs.get(i).destroyForcibly().waitFor();
} catch (InterruptedException e) {
fail("Interrupted trying to stop compactor process");
}
}
getCluster().getClusterControl().getCompactors(resourceGroup).clear();
});
Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx).size() == 0, 60_000);

// Start the Compactors that will consume and free memory when we need it to
getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
MemoryConsumingCompactor.class);
Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx).size() == 1, 60_000);
Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx)
.get(Constants.DEFAULT_RESOURCE_GROUP_NAME).size() == 1, 60_000);
Expand Down