Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,33 +96,34 @@ public void execute() throws Exception {
for (PhaseExec phase : workload.getPhases()) {
LOGGER.info("Running " + phase.getId() + " phase...");
final Instant phaseStartTime = Instant.now();

EventInfo eventInfo;
try {
final List<SessionExecutor> threads = new ArrayList<>();
for (SessionExec session : phase.getSessions()) {
threads.add(
new SessionExecutor(
connectionManagers.get(session.getTargetEndpoint()),
this.telemetryRegistry,
session,
runtimeParameterValues,
phaseIdToEndTime,
this.experimentStartTime));
if (phase.getMinimalDurationSeconds() != null
&& phase.getMinimalDurationSeconds() > 0) {
// Time-based execution with minimum duration.
long phaseEndTime =
phaseStartTime.toEpochMilli() + phase.getMinimalDurationSeconds() * 1000L;
while (System.currentTimeMillis() < phaseEndTime) {
executePhase(
phase, runtimeParameterValues, phaseIdToEndTime, executor, phaseStartTime);
}
} else {
// Single execution.
executePhase(
phase, runtimeParameterValues, phaseIdToEndTime, executor, phaseStartTime);
}
checkResults(executor.invokeAll(threads));
eventInfo = writePhaseEvent(phaseStartTime, phase.getId(), Status.SUCCESS);

LOGGER.info(
"Phase {} finished in {} seconds.",
phase.getId(),
ChronoUnit.SECONDS.between(phaseStartTime, eventInfo.getEndTime()));
phaseIdToEndTime.put(phase.getId(), eventInfo.getEndTime());
} catch (Exception e) {
LOGGER.error("Exception executing phase: " + phase.getId());
writePhaseEvent(phaseStartTime, phase.getId(), Status.FAILURE);
LOGGER.error("Exception executing phase: " + phase.getId(), e);
throw e;
} finally {
telemetryRegistry.flush();
}
LOGGER.info(
"Phase {} finished in {} seconds.",
phase.getId(),
ChronoUnit.SECONDS.between(phaseStartTime, eventInfo.getEndTime()));
phaseIdToEndTime.put(phase.getId(), eventInfo.getEndTime());
}

// Log end-to-end execution of experiment.
Expand Down Expand Up @@ -151,6 +152,35 @@ public void execute() throws Exception {
LOGGER.info("Finished experiment: {}", config.getId());
}

private void executePhase(
PhaseExec phase,
Map<String, Object> runtimeParameterValues,
Map<String, Instant> phaseIdToEndTime,
ExecutorService executor,
Instant phaseStartTime)
throws Exception {
try {
final List<SessionExecutor> threads = new ArrayList<>();
for (SessionExec session : phase.getSessions()) {
threads.add(
new SessionExecutor(
connectionManagers.get(session.getTargetEndpoint()),
this.telemetryRegistry,
session,
runtimeParameterValues,
phaseIdToEndTime,
this.experimentStartTime));
}
checkResults(executor.invokeAll(threads));
} catch (Exception e) {
LOGGER.error("Exception executing phase: " + phase.getId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is logged is the calling method as well, it seems this line can be removed.

writePhaseEvent(phaseStartTime, phase.getId(), Status.FAILURE);
throw e;
} finally {
telemetryRegistry.flush();
}
}

private void checkResults(List<Future<Boolean>> results) {
for (Future<Boolean> result : results) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.microsoft.lst_bench.exec;

import java.util.List;
import javax.annotation.Nullable;
import org.immutables.value.Value;

/** Represents a phase in a workload. */
Expand All @@ -25,5 +26,7 @@ public interface PhaseExec {

String getId();

@Nullable Integer getMinimalDurationSeconds();

List<SessionExec> getSessions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private static PhaseExec createPhaseExec(
taskTemplateIdToParameterValuesCounter);
sessionExecList.add(sessionExec);
}
return ImmutablePhaseExec.of(phase.getId(), sessionExecList);
return ImmutablePhaseExec.of(phase.getId(), phase.getMinimalDurationSeconds(), sessionExecList);
}

private static SessionExec createSessionExec(
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/com/microsoft/lst_bench/input/Phase.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
public interface Phase {
String getId();

@JsonProperty("minimal_duration_seconds")
@Nullable Integer getMinimalDurationSeconds();

@JsonProperty("template_id")
@Nullable String getTemplateId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
package com.microsoft.lst_bench.input;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.List;
import javax.annotation.Nullable;
import org.immutables.value.Value;

/**
Expand All @@ -33,5 +35,8 @@
public interface PhaseTemplate {
String getId();

@JsonProperty("minimal_duration_seconds")
@Nullable Integer getMinimalDurationSeconds();

List<Session> getSessions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,58 @@ void testExperimentRetry() throws Exception {
benchmark.run();
}

/**
* This test checks whether the minimal duration of a phase is correctly set and respected during
* execution. It uses a mock connection manager that does not execute any SQL. The test verifies
* that the phase runs for at least the specified minimal duration.
*/
@Test
void testExperimentMinimalDuration() throws Exception {
Connection mockConnection = Mockito.mock(Connection.class);
ConnectionManager mockConnectionManager = Mockito.mock(ConnectionManager.class);
Mockito.when(mockConnectionManager.createConnection()).thenReturn(mockConnection);

// Current workload relies on 2 connection managers
var connectionManagers = new ArrayList<ConnectionManager>();
connectionManagers.add(mockConnectionManager);
connectionManagers.add(mockConnectionManager);

ExperimentConfig experimentConfig =
ImmutableExperimentConfig.builder().id("telemetryTest").version(1).repetitions(1).build();

URL libFile = getClass().getClassLoader().getResource("./config/samples/library_0.yaml");
Assertions.assertNotNull(libFile);
Library library = FileParser.loadLibrary(libFile.getFile());

URL workloadFile =
getClass().getClassLoader().getResource("./config/spark/w_all_tpcds-delta-time.yaml");
Assertions.assertNotNull(workloadFile);
Workload workload = FileParser.loadWorkload(workloadFile.getFile());

var config = BenchmarkObjectFactory.benchmarkConfig(experimentConfig, library, workload);

SQLTelemetryRegistry telemetryRegistry = getTelemetryRegistry();

LSTBenchmarkExecutor benchmark =
new LSTBenchmarkExecutor(connectionManagers, config, telemetryRegistry);
benchmark.run();

try (var validationConnection =
DriverManager.getConnection("jdbc:duckdb:./" + telemetryDbFileName)) {
ResultSet resultset =
validationConnection
.createStatement()
.executeQuery(
"SELECT CAST(EXTRACT(EPOCH FROM CAST(event_end_time AS TIMESTAMP) - CAST(event_start_time AS TIMESTAMP)) AS INTEGER) AS seconds_diff FROM experiment_telemetry WHERE event_type = 'EXEC_PHASE' AND event_id = 'single_user'");
int seconds = 0;
while (resultset.next()) {
seconds = resultset.getInt("seconds_diff");
}
Assertions.assertTrue(
seconds >= 5, "Phase did not run for at least 5 seconds, actual: " + seconds);
}
}

private SQLTelemetryRegistry getTelemetryRegistry() throws ClientException, IOException {
URL telemetryConfigFile =
getClass().getClassLoader().getResource("./config/spark/telemetry_config.yaml");
Expand Down
50 changes: 50 additions & 0 deletions core/src/test/resources/config/spark/w_all_tpcds-delta-time.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Description: Workload for test: All task types, TPC-DS, Delta
---
version: 1
id: w_all_tpcds
phases:
- id: setup
sessions:
- tasks:
- template_id: setup
- id: setup_data_maintenance
sessions:
- tasks:
- template_id: setup_data_maintenance
- id: init
sessions:
- tasks:
- template_id: init
- id: build
sessions:
- tasks:
- template_id: build
- id: single_user
minimal_duration_seconds: 5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This property should be added to the phase_template item in template.json for schema validation. I'm actually surprised validation doesn't fail in your test, since it goes through the FileParser.loadWorkload method, which should run validation. That likely means something isn't working correctly in the validation :(

sessions:
- tasks:
- template_id: single_user
- id: data_maintenance
sessions:
- tasks:
- template_id: data_maintenance_delta
- id: data_maintenance_dependent
sessions:
- tasks:
- template_id: data_maintenance_dependent
task_executor_arguments:
dependent_task_batch_size: 100
# TODO: Remove this once #182 is fixed
skip_erroneous_query_strings: "[DELTA_FAILED_RECOGNIZE_PREDICATE]"
- id: optimize
sessions:
- tasks:
- template_id: optimize_delta
- id: optimize_split
sessions:
- tasks:
- template_id: optimize_split_delta
task_executor_arguments:
dependent_task_batch_size: 100
# TODO: Remove this once #182 is fixed
skip_erroneous_query_strings: "[DELTA_FAILED_RECOGNIZE_PREDICATE]"
Loading