From 794bc1defd4919ad2db0ff1b1c5380ff57b26e38 Mon Sep 17 00:00:00 2001 From: Alberto Perdomo Date: Mon, 25 Aug 2025 08:30:54 +0100 Subject: [PATCH] implement incremental load strategy with cap --- src/guidellm/__main__.py | 22 ++++++ src/guidellm/benchmark/__init__.py | 2 + src/guidellm/benchmark/benchmark.py | 4 + src/guidellm/benchmark/entrypoints.py | 3 + src/guidellm/benchmark/profile.py | 93 ++++++++++++++++++++++- src/guidellm/benchmark/scenario.py | 3 + src/guidellm/scheduler/__init__.py | 2 + src/guidellm/scheduler/strategy.py | 103 +++++++++++++++++++++++++- 8 files changed, 230 insertions(+), 2 deletions(-) diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py index 7cba6a7c..b3679914 100644 --- a/src/guidellm/__main__.py +++ b/src/guidellm/__main__.py @@ -161,6 +161,7 @@ def benchmark(): "For rate-type=concurrent, this is the number of concurrent requests. " "For rate-type=async,constant,poisson, this is the rate requests per second. " "For rate-type=synchronous,throughput, this must not be set." + "For rate-type=incremental, this must not be set (use --start-rate and --increment-factor instead). " ), ) @click.option( @@ -247,6 +248,21 @@ def benchmark(): type=int, help="The random seed to use for benchmarking to ensure reproducibility.", ) +@click.option( + "--start-rate", + type=float, + help="The initial rate for incremental rate type in requests per second.", +) +@click.option( + "--increment-factor", + type=float, + help="The factor by which to increase the rate over time for incremental rate type.", +) +@click.option( + "--rate-limit", + type=int, + help="The rate after which the load remains constant for incremental rate type.", +) def run( scenario, target, @@ -260,6 +276,9 @@ def run( data_sampler, rate_type, rate, + start_rate, + increment_factor, + rate_limit, max_seconds, max_requests, warmup_percent, @@ -287,6 +306,9 @@ def run( data_sampler=data_sampler, rate_type=rate_type, rate=rate, + start_rate=start_rate, + increment_factor=increment_factor, + rate_limit=rate_limit, max_seconds=max_seconds, max_requests=max_requests, warmup_percent=warmup_percent, diff --git a/src/guidellm/benchmark/__init__.py b/src/guidellm/benchmark/__init__.py index a4676c7e..d8529017 100644 --- a/src/guidellm/benchmark/__init__.py +++ b/src/guidellm/benchmark/__init__.py @@ -17,6 +17,7 @@ from .profile import ( AsyncProfile, ConcurrentProfile, + IncrementalProfile, Profile, ProfileType, SweepProfile, @@ -55,6 +56,7 @@ "GenerativeTextBenchmarkerTaskProgressState", "GenerativeTextErrorStats", "GenerativeTextResponseStats", + "IncrementalProfile", "Profile", "ProfileType", "StatusBreakdown", diff --git a/src/guidellm/benchmark/benchmark.py b/src/guidellm/benchmark/benchmark.py index 02eea02b..980ff1e8 100644 --- a/src/guidellm/benchmark/benchmark.py +++ b/src/guidellm/benchmark/benchmark.py @@ -7,6 +7,7 @@ from guidellm.benchmark.profile import ( AsyncProfile, ConcurrentProfile, + IncrementalProfile, Profile, SweepProfile, SynchronousProfile, @@ -23,6 +24,7 @@ ) from guidellm.scheduler import ( AsyncConstantStrategy, + AsyncIncrementalStrategy, AsyncPoissonStrategy, ConcurrentStrategy, GenerativeRequestsWorkerDescription, @@ -59,6 +61,7 @@ class BenchmarkArgs(StandardBaseModel): ConcurrentProfile, ThroughputProfile, SynchronousProfile, + IncrementalProfile, Profile, ] = Field( description=( @@ -79,6 +82,7 @@ class BenchmarkArgs(StandardBaseModel): SynchronousStrategy, AsyncPoissonStrategy, AsyncConstantStrategy, + AsyncIncrementalStrategy, SchedulingStrategy, ] = Field( description="The scheduling strategy used to run this benchmark. ", diff --git a/src/guidellm/benchmark/entrypoints.py b/src/guidellm/benchmark/entrypoints.py index 2ef85c3e..5bf80386 100644 --- a/src/guidellm/benchmark/entrypoints.py +++ b/src/guidellm/benchmark/entrypoints.py @@ -51,6 +51,9 @@ async def benchmark_generative_text( data_sampler: Optional[Literal["random"]], rate_type: Union[StrategyType, ProfileType], rate: Optional[Union[float, list[float]]], + start_rate: Optional[float], + increment_factor: Optional[float], + rate_limit: Optional[int], max_seconds: Optional[float], max_requests: Optional[int], warmup_percent: Optional[float], diff --git a/src/guidellm/benchmark/profile.py b/src/guidellm/benchmark/profile.py index 642cb7a8..63027329 100644 --- a/src/guidellm/benchmark/profile.py +++ b/src/guidellm/benchmark/profile.py @@ -8,6 +8,7 @@ from guidellm.objects import StandardBaseModel from guidellm.scheduler import ( AsyncConstantStrategy, + AsyncIncrementalStrategy, AsyncPoissonStrategy, ConcurrentStrategy, SchedulingStrategy, @@ -19,6 +20,7 @@ __all__ = [ "AsyncProfile", "ConcurrentProfile", + "IncrementalProfile", "Profile", "ProfileType", "SweepProfile", @@ -27,7 +29,9 @@ "create_profile", ] -ProfileType = Literal["synchronous", "concurrent", "throughput", "async", "sweep"] +ProfileType = Literal[ + "synchronous", "concurrent", "throughput", "async", "sweep", "incremental" +] class Profile(StandardBaseModel): @@ -363,9 +367,82 @@ def from_standard_args( # type: ignore[override] return SweepProfile(sweep_size=int(rate), random_seed=random_seed, **kwargs) +class IncrementalProfile(ThroughputProfile): + type_: Literal["incremental"] = "incremental" + start_rate: float = Field( + description="The initial rate at which to schedule requests in requests per second.", + ) + increment_factor: float = Field( + description="The factor by which to increase the rate over time.", + ) + rate_limit: int = Field( + description="The factor after which the load remains constant for incremental rate type.", + ) + initial_burst: bool = Field( + default=True, + description=( + "True to send an initial burst of requests (math.floor(self.start_rate)) " + "to reach target rate. False to not send an initial burst." + ), + ) + + @property + def strategy_types(self) -> list[StrategyType]: + return [self.type_] + + def next_strategy(self) -> Optional[SchedulingStrategy]: + if self.completed_strategies >= 1: + return None + + return AsyncIncrementalStrategy( + start_rate=self.start_rate, + increment_factor=self.increment_factor, + rate_limit=self.rate_limit, + initial_burst=self.initial_burst, + max_concurrency=self.max_concurrency, + ) + + @staticmethod + def from_standard_args( + rate_type: Union[StrategyType, ProfileType], + rate: Optional[Union[float, Sequence[float]]], + start_rate: float, + increment_factor: float, + rate_limit: int, + **kwargs, + ) -> "IncrementalProfile": + if rate_type != "incremental": + raise ValueError("Rate type must be 'incremental' for incremental profile.") + + if rate is not None: + raise ValueError( + "rate does not apply to incremental profile, it must be set to None or not set at all. " + "Use start_rate and increment_factor instead." + ) + + if start_rate <= 0: + raise ValueError("start_rate must be a positive number.") + + if increment_factor <= 0: + raise ValueError("increment_factor must be a positive number.") + + if rate_limit <= 0: + raise ValueError("rate_limit must be a positive integer.") + + return IncrementalProfile( + start_rate=start_rate, + increment_factor=increment_factor, + rate_limit=rate_limit, + **kwargs, + ) + + def create_profile( rate_type: Union[StrategyType, ProfileType], rate: Optional[Union[float, Sequence[float]]], + start_rate: Optional[float] = None, + increment_factor: Optional[float] = None, + rate_limit: Optional[int] = None, random_seed: int = 42, **kwargs, ) -> "Profile": @@ -383,6 +460,20 @@ def create_profile( **kwargs, ) + if rate_type == "incremental": + if start_rate is None or increment_factor is None: + raise ValueError( + "start_rate and increment_factor are required for incremental profile" + ) + return IncrementalProfile.from_standard_args( + rate_type=rate_type, + rate=rate, + start_rate=start_rate, + increment_factor=increment_factor, + rate_limit=rate_limit, + **kwargs, + ) + if rate_type == "throughput": return ThroughputProfile.from_standard_args( rate_type=rate_type, diff --git a/src/guidellm/benchmark/scenario.py b/src/guidellm/benchmark/scenario.py index af43e426..fbf20fdf 100644 --- a/src/guidellm/benchmark/scenario.py +++ b/src/guidellm/benchmark/scenario.py @@ -96,6 +96,9 @@ class Config: rate: Annotated[ Optional[list[PositiveFloat]], BeforeValidator(parse_float_list) ] = None + start_rate: Optional[PositiveFloat] = None + increment_factor: Optional[PositiveFloat] = None + rate_limit: Optional[PositiveInt] = None max_seconds: Optional[PositiveFloat] = None max_requests: Optional[PositiveInt] = None warmup_percent: Annotated[Optional[float], Field(gt=0, le=1)] = None diff --git a/src/guidellm/scheduler/__init__.py b/src/guidellm/scheduler/__init__.py index d3aa0aab..3493dd61 100644 --- a/src/guidellm/scheduler/__init__.py +++ b/src/guidellm/scheduler/__init__.py @@ -7,6 +7,7 @@ from .scheduler import Scheduler from .strategy import ( AsyncConstantStrategy, + AsyncIncrementalStrategy, AsyncPoissonStrategy, ConcurrentStrategy, SchedulingStrategy, @@ -26,6 +27,7 @@ __all__ = [ "AsyncConstantStrategy", + "AsyncIncrementalStrategy", "AsyncPoissonStrategy", "ConcurrentStrategy", "GenerativeRequestsWorker", diff --git a/src/guidellm/scheduler/strategy.py b/src/guidellm/scheduler/strategy.py index 74d19266..bac5292b 100644 --- a/src/guidellm/scheduler/strategy.py +++ b/src/guidellm/scheduler/strategy.py @@ -15,6 +15,7 @@ __all__ = [ "AsyncConstantStrategy", + "AsyncIncrementalStrategy", "AsyncPoissonStrategy", "ConcurrentStrategy", "SchedulingStrategy", @@ -25,7 +26,9 @@ ] -StrategyType = Literal["synchronous", "concurrent", "throughput", "constant", "poisson"] +StrategyType = Literal[ + "synchronous", "concurrent", "throughput", "constant", "poisson", "incremental" +] class SchedulingStrategy(StandardBaseModel): @@ -481,6 +484,98 @@ def request_times(self) -> Generator[float, None, None]: yield init_time +class AsyncIncrementalStrategy(ThroughputStrategy): + """ + A class representing an asynchronous incremental scheduling strategy. + This strategy schedules requests asynchronously starting at a base rate + and incrementally increasing the rate by a factor over time. + If initial_burst is set, it will send an initial burst of math.floor(start_rate) + requests to reach the target rate. + It inherits from the `ThroughputStrategy` base class and + implements the `request_times` method to provide the specific + behavior for asynchronous incremental scheduling. + :param type_: The incremental StrategyType to schedule requests asynchronously. + :param start_rate: The initial rate at which to schedule requests in + requests per second. This must be a positive float. + :param increment_factor: The factor by which to increase the rate over time. + This must be a positive float greater than 0. + :param rate_limit: The factor that limits the max rate. + This must be a positive integer greater than 0. + :param initial_burst: True to send an initial burst of requests + (math.floor(self.start_rate)) to reach target rate. + False to not send an initial burst. + """ + + type_: Literal["incremental"] = "incremental" + start_rate: float = Field( + description=( + "The initial rate at which to schedule requests asynchronously in " + "requests per second. This must be a positive float." + ), + gt=0, + ) + increment_factor: float = Field( + description=( + "The factor by which to increase the rate over time. " + "This must be a positive float greater than 0." + ), + gt=0, + ) + rate_limit: int = Field( + description=( + "The factor that limits the max rate." + "This must be a positive integer greater than 0." + ), + gt=0, + ) + initial_burst: bool = Field( + default=True, + description=( + "True to send an initial burst of requests (math.floor(self.start_rate)) " + "to reach target rate. False to not send an initial burst." + ), + ) + + def request_times(self) -> Generator[float, None, None]: + """ + A generator that yields timestamps for when requests should be sent. + This method schedules requests asynchronously starting at a base rate + and incrementally increasing the rate by a factor over time. + If initial_burst is set, it will send an initial burst of requests + to reach the target rate. + :return: A generator that yields timestamps for request scheduling. + """ + start_time = time.time() + + # handle bursts first to get to the desired rate + if self.initial_burst: + # send an initial burst equal to the start rate + # to reach the target rate + burst_count = math.floor(self.start_rate) + for _ in range(burst_count): + yield start_time + + current_time = start_time + counter = 0 + + # continue with incremental rate + while True: + yield current_time + counter += 1 + + # decide which rate should be next + elapsed_time = current_time - start_time + next_rate = self.start_rate + (self.increment_factor * elapsed_time) + + # cap at rate limit if specified + if self.rate_limit and next_rate >= self.rate_limit: + increment = 1.0 / self.rate_limit + else: + increment = 1.0 / next_rate + + current_time += increment + + def strategy_display_str(strategy: Union[StrategyType, SchedulingStrategy]) -> str: strategy_type = strategy if isinstance(strategy, str) else strategy.type_ strategy_instance = strategy if isinstance(strategy, SchedulingStrategy) else None @@ -489,6 +584,12 @@ def strategy_display_str(strategy: Union[StrategyType, SchedulingStrategy]) -> s rate = f"@{strategy_instance.streams}" if strategy_instance else "@##" # type: ignore[attr-defined] elif strategy_type in ("constant", "poisson"): rate = f"@{strategy_instance.rate:.2f}" if strategy_instance else "@#.##" # type: ignore[attr-defined] + elif strategy_type == "incremental": + rate = ( + f"@{strategy_instance.start_rate:.2f}+{strategy_instance.increment_factor:.2f}" + if strategy_instance + else "@#.##+#.##" + ) else: rate = ""