From 120ebb6b7e7dc977209cd3dddd01e915725fd3e7 Mon Sep 17 00:00:00 2001 From: Anja Gruenheid Date: Wed, 6 Aug 2025 13:12:32 +0200 Subject: [PATCH 1/2] Add an optional minimal duration for a phase. --- .../common/LSTBenchmarkExecutor.java | 72 +++++++++++++------ .../microsoft/lst_bench/exec/PhaseExec.java | 3 + .../input/BenchmarkObjectFactory.java | 2 +- .../com/microsoft/lst_bench/input/Phase.java | 3 + .../lst_bench/input/PhaseTemplate.java | 5 ++ .../common/LSTBenchmarkExecutorTest.java | 52 ++++++++++++++ .../config/spark/w_all_tpcds-delta-time.yaml | 50 +++++++++++++ 7 files changed, 165 insertions(+), 22 deletions(-) create mode 100644 core/src/test/resources/config/spark/w_all_tpcds-delta-time.yaml diff --git a/core/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java b/core/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java index ef9d1066..2d80e1ca 100644 --- a/core/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java +++ b/core/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java @@ -64,7 +64,7 @@ public LSTBenchmarkExecutor( this.telemetryRegistry = telemetryRegistry; } - /** This method runs the experiment. */ + /** This method runs the experiment. */ public void execute() throws Exception { this.experimentStartTime = DateTimeFormatter.U_FORMATTER.format(Instant.now()); LOGGER.info("Running experiment: {}, start-time: {}", config.getId(), experimentStartTime); @@ -96,33 +96,34 @@ public void execute() throws Exception { for (PhaseExec phase : workload.getPhases()) { LOGGER.info("Running " + phase.getId() + " phase..."); final Instant phaseStartTime = Instant.now(); + EventInfo eventInfo; try { - final List threads = new ArrayList<>(); - for (SessionExec session : phase.getSessions()) { - threads.add( - new SessionExecutor( - connectionManagers.get(session.getTargetEndpoint()), - this.telemetryRegistry, - session, - runtimeParameterValues, - phaseIdToEndTime, - this.experimentStartTime)); + if (phase.getMinimalDurationSeconds() != null + && phase.getMinimalDurationSeconds() > 0) { + // Time-based execution with minimum duration. + long phaseEndTime = + phaseStartTime.toEpochMilli() + phase.getMinimalDurationSeconds() * 1000L; + while (System.currentTimeMillis() < phaseEndTime) { + executePhase( + phase, runtimeParameterValues, phaseIdToEndTime, executor, phaseStartTime); + } + } else { + // Single execution. + executePhase( + phase, runtimeParameterValues, phaseIdToEndTime, executor, phaseStartTime); } - checkResults(executor.invokeAll(threads)); eventInfo = writePhaseEvent(phaseStartTime, phase.getId(), Status.SUCCESS); + + LOGGER.info( + "Phase {} finished in {} seconds.", + phase.getId(), + ChronoUnit.SECONDS.between(phaseStartTime, eventInfo.getEndTime())); + phaseIdToEndTime.put(phase.getId(), eventInfo.getEndTime()); } catch (Exception e) { - LOGGER.error("Exception executing phase: " + phase.getId()); - writePhaseEvent(phaseStartTime, phase.getId(), Status.FAILURE); + LOGGER.error("Exception executing phase: " + phase.getId(), e); throw e; - } finally { - telemetryRegistry.flush(); } - LOGGER.info( - "Phase {} finished in {} seconds.", - phase.getId(), - ChronoUnit.SECONDS.between(phaseStartTime, eventInfo.getEndTime())); - phaseIdToEndTime.put(phase.getId(), eventInfo.getEndTime()); } // Log end-to-end execution of experiment. @@ -151,6 +152,35 @@ public void execute() throws Exception { LOGGER.info("Finished experiment: {}", config.getId()); } + private void executePhase( + PhaseExec phase, + Map runtimeParameterValues, + Map phaseIdToEndTime, + ExecutorService executor, + Instant phaseStartTime) + throws Exception { + try { + final List threads = new ArrayList<>(); + for (SessionExec session : phase.getSessions()) { + threads.add( + new SessionExecutor( + connectionManagers.get(session.getTargetEndpoint()), + this.telemetryRegistry, + session, + runtimeParameterValues, + phaseIdToEndTime, + this.experimentStartTime)); + } + checkResults(executor.invokeAll(threads)); + } catch (Exception e) { + LOGGER.error("Exception executing phase: " + phase.getId()); + writePhaseEvent(phaseStartTime, phase.getId(), Status.FAILURE); + throw e; + } finally { + telemetryRegistry.flush(); + } + } + private void checkResults(List> results) { for (Future result : results) { try { diff --git a/core/src/main/java/com/microsoft/lst_bench/exec/PhaseExec.java b/core/src/main/java/com/microsoft/lst_bench/exec/PhaseExec.java index e77eaead..a46cfb50 100644 --- a/core/src/main/java/com/microsoft/lst_bench/exec/PhaseExec.java +++ b/core/src/main/java/com/microsoft/lst_bench/exec/PhaseExec.java @@ -16,6 +16,7 @@ package com.microsoft.lst_bench.exec; import java.util.List; +import javax.annotation.Nullable; import org.immutables.value.Value; /** Represents a phase in a workload. */ @@ -24,6 +25,8 @@ public interface PhaseExec { String getId(); + + @Nullable Integer getMinimalDurationSeconds(); List getSessions(); } diff --git a/core/src/main/java/com/microsoft/lst_bench/input/BenchmarkObjectFactory.java b/core/src/main/java/com/microsoft/lst_bench/input/BenchmarkObjectFactory.java index 7191eb5c..3fd025e7 100644 --- a/core/src/main/java/com/microsoft/lst_bench/input/BenchmarkObjectFactory.java +++ b/core/src/main/java/com/microsoft/lst_bench/input/BenchmarkObjectFactory.java @@ -169,7 +169,7 @@ private static PhaseExec createPhaseExec( taskTemplateIdToParameterValuesCounter); sessionExecList.add(sessionExec); } - return ImmutablePhaseExec.of(phase.getId(), sessionExecList); + return ImmutablePhaseExec.of(phase.getId(), phase.getMinimalDurationSeconds(), sessionExecList); } private static SessionExec createSessionExec( diff --git a/core/src/main/java/com/microsoft/lst_bench/input/Phase.java b/core/src/main/java/com/microsoft/lst_bench/input/Phase.java index b48c71a4..7594cc31 100644 --- a/core/src/main/java/com/microsoft/lst_bench/input/Phase.java +++ b/core/src/main/java/com/microsoft/lst_bench/input/Phase.java @@ -31,6 +31,9 @@ @JsonInclude(JsonInclude.Include.NON_NULL) public interface Phase { String getId(); + + @JsonProperty("minimal_duration_seconds") + @Nullable Integer getMinimalDurationSeconds(); @JsonProperty("template_id") @Nullable String getTemplateId(); diff --git a/core/src/main/java/com/microsoft/lst_bench/input/PhaseTemplate.java b/core/src/main/java/com/microsoft/lst_bench/input/PhaseTemplate.java index 34940ae6..d6c32cfc 100644 --- a/core/src/main/java/com/microsoft/lst_bench/input/PhaseTemplate.java +++ b/core/src/main/java/com/microsoft/lst_bench/input/PhaseTemplate.java @@ -16,9 +16,11 @@ package com.microsoft.lst_bench.input; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.util.List; +import javax.annotation.Nullable; import org.immutables.value.Value; /** @@ -32,6 +34,9 @@ @JsonInclude(JsonInclude.Include.NON_NULL) public interface PhaseTemplate { String getId(); + + @JsonProperty("minimal_duration_seconds") + @Nullable Integer getMinimalDurationSeconds(); List getSessions(); } diff --git a/core/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java b/core/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java index 310143ee..90d10672 100644 --- a/core/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java +++ b/core/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java @@ -179,6 +179,58 @@ void testExperimentRetry() throws Exception { benchmark.run(); } + /** + * This test checks whether the minimal duration of a phase is correctly set and respected during + * execution. It uses a mock connection manager that does not execute any SQL. The test verifies + * that the phase runs for at least the specified minimal duration. + */ + @Test + void testExperimentMinimalDuration() throws Exception { + Connection mockConnection = Mockito.mock(Connection.class); + ConnectionManager mockConnectionManager = Mockito.mock(ConnectionManager.class); + Mockito.when(mockConnectionManager.createConnection()).thenReturn(mockConnection); + + // Current workload relies on 2 connection managers + var connectionManagers = new ArrayList(); + connectionManagers.add(mockConnectionManager); + connectionManagers.add(mockConnectionManager); + + ExperimentConfig experimentConfig = + ImmutableExperimentConfig.builder().id("telemetryTest").version(1).repetitions(1).build(); + + URL libFile = getClass().getClassLoader().getResource("./config/samples/library_0.yaml"); + Assertions.assertNotNull(libFile); + Library library = FileParser.loadLibrary(libFile.getFile()); + + URL workloadFile = + getClass().getClassLoader().getResource("./config/spark/w_all_tpcds-delta-time.yaml"); + Assertions.assertNotNull(workloadFile); + Workload workload = FileParser.loadWorkload(workloadFile.getFile()); + + var config = BenchmarkObjectFactory.benchmarkConfig(experimentConfig, library, workload); + + SQLTelemetryRegistry telemetryRegistry = getTelemetryRegistry(); + + LSTBenchmarkExecutor benchmark = + new LSTBenchmarkExecutor(connectionManagers, config, telemetryRegistry); + benchmark.run(); + + try (var validationConnection = + DriverManager.getConnection("jdbc:duckdb:./" + telemetryDbFileName)) { + ResultSet resultset = + validationConnection + .createStatement() + .executeQuery( + "SELECT CAST(EXTRACT(EPOCH FROM CAST(event_end_time AS TIMESTAMP) - CAST(event_start_time AS TIMESTAMP)) AS INTEGER) AS seconds_diff FROM experiment_telemetry WHERE event_type = 'EXEC_PHASE' AND event_id = 'single_user'"); + int seconds = 0; + while (resultset.next()) { + seconds = resultset.getInt("seconds_diff"); + } + Assertions.assertTrue( + seconds >= 5, "Phase did not run for at least 5 seconds, actual: " + seconds); + } + } + private SQLTelemetryRegistry getTelemetryRegistry() throws ClientException, IOException { URL telemetryConfigFile = getClass().getClassLoader().getResource("./config/spark/telemetry_config.yaml"); diff --git a/core/src/test/resources/config/spark/w_all_tpcds-delta-time.yaml b/core/src/test/resources/config/spark/w_all_tpcds-delta-time.yaml new file mode 100644 index 00000000..47fc076f --- /dev/null +++ b/core/src/test/resources/config/spark/w_all_tpcds-delta-time.yaml @@ -0,0 +1,50 @@ +# Description: Workload for test: All task types, TPC-DS, Delta +--- +version: 1 +id: w_all_tpcds +phases: +- id: setup + sessions: + - tasks: + - template_id: setup +- id: setup_data_maintenance + sessions: + - tasks: + - template_id: setup_data_maintenance +- id: init + sessions: + - tasks: + - template_id: init +- id: build + sessions: + - tasks: + - template_id: build +- id: single_user + minimal_duration_seconds: 5 + sessions: + - tasks: + - template_id: single_user +- id: data_maintenance + sessions: + - tasks: + - template_id: data_maintenance_delta +- id: data_maintenance_dependent + sessions: + - tasks: + - template_id: data_maintenance_dependent + task_executor_arguments: + dependent_task_batch_size: 100 + # TODO: Remove this once #182 is fixed + skip_erroneous_query_strings: "[DELTA_FAILED_RECOGNIZE_PREDICATE]" +- id: optimize + sessions: + - tasks: + - template_id: optimize_delta +- id: optimize_split + sessions: + - tasks: + - template_id: optimize_split_delta + task_executor_arguments: + dependent_task_batch_size: 100 + # TODO: Remove this once #182 is fixed + skip_erroneous_query_strings: "[DELTA_FAILED_RECOGNIZE_PREDICATE]" From 0a993434604c971891eb35243227c9805e18337c Mon Sep 17 00:00:00 2001 From: anjagruenheid <87397397+anjagruenheid@users.noreply.github.com> Date: Wed, 6 Aug 2025 11:16:42 +0000 Subject: [PATCH 2/2] Spotless --- .../com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java | 2 +- core/src/main/java/com/microsoft/lst_bench/exec/PhaseExec.java | 2 +- core/src/main/java/com/microsoft/lst_bench/input/Phase.java | 2 +- .../main/java/com/microsoft/lst_bench/input/PhaseTemplate.java | 2 +- .../microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java b/core/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java index 2d80e1ca..2f41a774 100644 --- a/core/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java +++ b/core/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java @@ -64,7 +64,7 @@ public LSTBenchmarkExecutor( this.telemetryRegistry = telemetryRegistry; } - /** This method runs the experiment. */ + /** This method runs the experiment. */ public void execute() throws Exception { this.experimentStartTime = DateTimeFormatter.U_FORMATTER.format(Instant.now()); LOGGER.info("Running experiment: {}, start-time: {}", config.getId(), experimentStartTime); diff --git a/core/src/main/java/com/microsoft/lst_bench/exec/PhaseExec.java b/core/src/main/java/com/microsoft/lst_bench/exec/PhaseExec.java index a46cfb50..d83368b1 100644 --- a/core/src/main/java/com/microsoft/lst_bench/exec/PhaseExec.java +++ b/core/src/main/java/com/microsoft/lst_bench/exec/PhaseExec.java @@ -25,7 +25,7 @@ public interface PhaseExec { String getId(); - + @Nullable Integer getMinimalDurationSeconds(); List getSessions(); diff --git a/core/src/main/java/com/microsoft/lst_bench/input/Phase.java b/core/src/main/java/com/microsoft/lst_bench/input/Phase.java index 7594cc31..5938c1fc 100644 --- a/core/src/main/java/com/microsoft/lst_bench/input/Phase.java +++ b/core/src/main/java/com/microsoft/lst_bench/input/Phase.java @@ -31,7 +31,7 @@ @JsonInclude(JsonInclude.Include.NON_NULL) public interface Phase { String getId(); - + @JsonProperty("minimal_duration_seconds") @Nullable Integer getMinimalDurationSeconds(); diff --git a/core/src/main/java/com/microsoft/lst_bench/input/PhaseTemplate.java b/core/src/main/java/com/microsoft/lst_bench/input/PhaseTemplate.java index d6c32cfc..f54211d2 100644 --- a/core/src/main/java/com/microsoft/lst_bench/input/PhaseTemplate.java +++ b/core/src/main/java/com/microsoft/lst_bench/input/PhaseTemplate.java @@ -34,7 +34,7 @@ @JsonInclude(JsonInclude.Include.NON_NULL) public interface PhaseTemplate { String getId(); - + @JsonProperty("minimal_duration_seconds") @Nullable Integer getMinimalDurationSeconds(); diff --git a/core/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java b/core/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java index 90d10672..64819362 100644 --- a/core/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java +++ b/core/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java @@ -179,7 +179,7 @@ void testExperimentRetry() throws Exception { benchmark.run(); } - /** + /** * This test checks whether the minimal duration of a phase is correctly set and respected during * execution. It uses a mock connection manager that does not execute any SQL. The test verifies * that the phase runs for at least the specified minimal duration.