Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/guidellm/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ def benchmark():
"For rate-type=concurrent, this is the number of concurrent requests. "
"For rate-type=async,constant,poisson, this is the rate requests per second. "
"For rate-type=synchronous,throughput, this must not be set."
"For rate-type=incremental, this must not be set (use --start-rate and --increment-factor instead). "
),
)
@click.option(
Expand Down Expand Up @@ -247,6 +248,21 @@ def benchmark():
type=int,
help="The random seed to use for benchmarking to ensure reproducibility.",
)
@click.option(
"--start-rate",
type=float,
help="The initial rate for incremental rate type in requests per second.",
)
@click.option(
"--increment-factor",
type=float,
help="The factor by which to increase the rate over time for incremental rate type.",
)
@click.option(
"--rate-limit",
type=int,
help="The rate after which the load remains constant for incremental rate type.",
)
def run(
scenario,
target,
Expand All @@ -260,6 +276,9 @@ def run(
data_sampler,
rate_type,
rate,
start_rate,
increment_factor,
rate_limit,
max_seconds,
max_requests,
warmup_percent,
Expand Down Expand Up @@ -287,6 +306,9 @@ def run(
data_sampler=data_sampler,
rate_type=rate_type,
rate=rate,
start_rate=start_rate,
increment_factor=increment_factor,
rate_limit=rate_limit,
max_seconds=max_seconds,
max_requests=max_requests,
warmup_percent=warmup_percent,
Expand Down
2 changes: 2 additions & 0 deletions src/guidellm/benchmark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .profile import (
AsyncProfile,
ConcurrentProfile,
IncrementalProfile,
Profile,
ProfileType,
SweepProfile,
Expand Down Expand Up @@ -55,6 +56,7 @@
"GenerativeTextBenchmarkerTaskProgressState",
"GenerativeTextErrorStats",
"GenerativeTextResponseStats",
"IncrementalProfile",
"Profile",
"ProfileType",
"StatusBreakdown",
Expand Down
4 changes: 4 additions & 0 deletions src/guidellm/benchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from guidellm.benchmark.profile import (
AsyncProfile,
ConcurrentProfile,
IncrementalProfile,
Profile,
SweepProfile,
SynchronousProfile,
Expand All @@ -23,6 +24,7 @@
)
from guidellm.scheduler import (
AsyncConstantStrategy,
AsyncIncrementalStrategy,
AsyncPoissonStrategy,
ConcurrentStrategy,
GenerativeRequestsWorkerDescription,
Expand Down Expand Up @@ -59,6 +61,7 @@ class BenchmarkArgs(StandardBaseModel):
ConcurrentProfile,
ThroughputProfile,
SynchronousProfile,
IncrementalProfile,
Profile,
] = Field(
description=(
Expand All @@ -79,6 +82,7 @@ class BenchmarkArgs(StandardBaseModel):
SynchronousStrategy,
AsyncPoissonStrategy,
AsyncConstantStrategy,
AsyncIncrementalStrategy,
SchedulingStrategy,
] = Field(
description="The scheduling strategy used to run this benchmark. ",
Expand Down
3 changes: 3 additions & 0 deletions src/guidellm/benchmark/entrypoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ async def benchmark_generative_text(
data_sampler: Optional[Literal["random"]],
rate_type: Union[StrategyType, ProfileType],
rate: Optional[Union[float, list[float]]],
start_rate: Optional[float],
increment_factor: Optional[float],
rate_limit: Optional[int],
max_seconds: Optional[float],
max_requests: Optional[int],
warmup_percent: Optional[float],
Expand Down
93 changes: 92 additions & 1 deletion src/guidellm/benchmark/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from guidellm.objects import StandardBaseModel
from guidellm.scheduler import (
AsyncConstantStrategy,
AsyncIncrementalStrategy,
AsyncPoissonStrategy,
ConcurrentStrategy,
SchedulingStrategy,
Expand All @@ -19,6 +20,7 @@
__all__ = [
"AsyncProfile",
"ConcurrentProfile",
"IncrementalProfile",
"Profile",
"ProfileType",
"SweepProfile",
Expand All @@ -27,7 +29,9 @@
"create_profile",
]

ProfileType = Literal["synchronous", "concurrent", "throughput", "async", "sweep"]
ProfileType = Literal[
"synchronous", "concurrent", "throughput", "async", "sweep", "incremental"
]


class Profile(StandardBaseModel):
Expand Down Expand Up @@ -363,9 +367,82 @@ def from_standard_args( # type: ignore[override]
return SweepProfile(sweep_size=int(rate), random_seed=random_seed, **kwargs)


class IncrementalProfile(ThroughputProfile):
type_: Literal["incremental"] = "incremental"
start_rate: float = Field(
description="The initial rate at which to schedule requests in requests per second.",
)
increment_factor: float = Field(
description="The factor by which to increase the rate over time.",
)
rate_limit: int = Field(
description="The factor after which the load remains constant for incremental rate type.",
)
initial_burst: bool = Field(
default=True,
description=(
"True to send an initial burst of requests (math.floor(self.start_rate)) "
"to reach target rate. False to not send an initial burst."
),
)

@property
def strategy_types(self) -> list[StrategyType]:
return [self.type_]

def next_strategy(self) -> Optional[SchedulingStrategy]:
if self.completed_strategies >= 1:
return None

return AsyncIncrementalStrategy(
start_rate=self.start_rate,
increment_factor=self.increment_factor,
rate_limit=self.rate_limit,
initial_burst=self.initial_burst,
max_concurrency=self.max_concurrency,
)

@staticmethod
def from_standard_args(
rate_type: Union[StrategyType, ProfileType],
rate: Optional[Union[float, Sequence[float]]],
start_rate: float,
increment_factor: float,
rate_limit: int,
**kwargs,
) -> "IncrementalProfile":
if rate_type != "incremental":
raise ValueError("Rate type must be 'incremental' for incremental profile.")

if rate is not None:
raise ValueError(
"rate does not apply to incremental profile, it must be set to None or not set at all. "
"Use start_rate and increment_factor instead."
)

if start_rate <= 0:
raise ValueError("start_rate must be a positive number.")

if increment_factor <= 0:
raise ValueError("increment_factor must be a positive number.")

if rate_limit <= 0:
raise ValueError("rate_limit must be a positive integer.")

return IncrementalProfile(
start_rate=start_rate,
increment_factor=increment_factor,
rate_limit=rate_limit,
**kwargs,
)


def create_profile(
rate_type: Union[StrategyType, ProfileType],
rate: Optional[Union[float, Sequence[float]]],
start_rate: Optional[float] = None,
increment_factor: Optional[float] = None,
rate_limit: Optional[int] = None,
random_seed: int = 42,
**kwargs,
) -> "Profile":
Expand All @@ -383,6 +460,20 @@ def create_profile(
**kwargs,
)

if rate_type == "incremental":
if start_rate is None or increment_factor is None:
raise ValueError(
"start_rate and increment_factor are required for incremental profile"
)
return IncrementalProfile.from_standard_args(
rate_type=rate_type,
rate=rate,
start_rate=start_rate,
increment_factor=increment_factor,
rate_limit=rate_limit,
**kwargs,
)

if rate_type == "throughput":
return ThroughputProfile.from_standard_args(
rate_type=rate_type,
Expand Down
3 changes: 3 additions & 0 deletions src/guidellm/benchmark/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ class Config:
rate: Annotated[
Optional[list[PositiveFloat]], BeforeValidator(parse_float_list)
] = None
start_rate: Optional[PositiveFloat] = None
increment_factor: Optional[PositiveFloat] = None
rate_limit: Optional[PositiveInt] = None
max_seconds: Optional[PositiveFloat] = None
max_requests: Optional[PositiveInt] = None
warmup_percent: Annotated[Optional[float], Field(gt=0, le=1)] = None
Expand Down
2 changes: 2 additions & 0 deletions src/guidellm/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .scheduler import Scheduler
from .strategy import (
AsyncConstantStrategy,
AsyncIncrementalStrategy,
AsyncPoissonStrategy,
ConcurrentStrategy,
SchedulingStrategy,
Expand All @@ -26,6 +27,7 @@

__all__ = [
"AsyncConstantStrategy",
"AsyncIncrementalStrategy",
"AsyncPoissonStrategy",
"ConcurrentStrategy",
"GenerativeRequestsWorker",
Expand Down
103 changes: 102 additions & 1 deletion src/guidellm/scheduler/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

__all__ = [
"AsyncConstantStrategy",
"AsyncIncrementalStrategy",
"AsyncPoissonStrategy",
"ConcurrentStrategy",
"SchedulingStrategy",
Expand All @@ -25,7 +26,9 @@
]


StrategyType = Literal["synchronous", "concurrent", "throughput", "constant", "poisson"]
StrategyType = Literal[
"synchronous", "concurrent", "throughput", "constant", "poisson", "incremental"
]


class SchedulingStrategy(StandardBaseModel):
Expand Down Expand Up @@ -481,6 +484,98 @@ def request_times(self) -> Generator[float, None, None]:
yield init_time


class AsyncIncrementalStrategy(ThroughputStrategy):
"""
A class representing an asynchronous incremental scheduling strategy.
This strategy schedules requests asynchronously starting at a base rate
and incrementally increasing the rate by a factor over time.
If initial_burst is set, it will send an initial burst of math.floor(start_rate)
requests to reach the target rate.
It inherits from the `ThroughputStrategy` base class and
implements the `request_times` method to provide the specific
behavior for asynchronous incremental scheduling.
:param type_: The incremental StrategyType to schedule requests asynchronously.
:param start_rate: The initial rate at which to schedule requests in
requests per second. This must be a positive float.
:param increment_factor: The factor by which to increase the rate over time.
This must be a positive float greater than 0.
:param rate_limit: The factor that limits the max rate.
This must be a positive integer greater than 0.
:param initial_burst: True to send an initial burst of requests
(math.floor(self.start_rate)) to reach target rate.
False to not send an initial burst.
"""

type_: Literal["incremental"] = "incremental"
start_rate: float = Field(
description=(
"The initial rate at which to schedule requests asynchronously in "
"requests per second. This must be a positive float."
),
gt=0,
)
increment_factor: float = Field(
description=(
"The factor by which to increase the rate over time. "
"This must be a positive float greater than 0."
),
gt=0,
)
rate_limit: int = Field(
description=(
"The factor that limits the max rate."
"This must be a positive integer greater than 0."
),
gt=0,
)
initial_burst: bool = Field(
default=True,
description=(
"True to send an initial burst of requests (math.floor(self.start_rate)) "
"to reach target rate. False to not send an initial burst."
),
)

def request_times(self) -> Generator[float, None, None]:
"""
A generator that yields timestamps for when requests should be sent.
This method schedules requests asynchronously starting at a base rate
and incrementally increasing the rate by a factor over time.
If initial_burst is set, it will send an initial burst of requests
to reach the target rate.
:return: A generator that yields timestamps for request scheduling.
"""
start_time = time.time()

# handle bursts first to get to the desired rate
if self.initial_burst:
# send an initial burst equal to the start rate
# to reach the target rate
burst_count = math.floor(self.start_rate)
for _ in range(burst_count):
yield start_time

current_time = start_time
counter = 0

# continue with incremental rate
while True:
yield current_time
counter += 1

# decide which rate should be next
elapsed_time = current_time - start_time
next_rate = self.start_rate + (self.increment_factor * elapsed_time)

# cap at rate limit if specified
if self.rate_limit and next_rate >= self.rate_limit:
increment = 1.0 / self.rate_limit
else:
increment = 1.0 / next_rate

current_time += increment


def strategy_display_str(strategy: Union[StrategyType, SchedulingStrategy]) -> str:
strategy_type = strategy if isinstance(strategy, str) else strategy.type_
strategy_instance = strategy if isinstance(strategy, SchedulingStrategy) else None
Expand All @@ -489,6 +584,12 @@ def strategy_display_str(strategy: Union[StrategyType, SchedulingStrategy]) -> s
rate = f"@{strategy_instance.streams}" if strategy_instance else "@##" # type: ignore[attr-defined]
elif strategy_type in ("constant", "poisson"):
rate = f"@{strategy_instance.rate:.2f}" if strategy_instance else "@#.##" # type: ignore[attr-defined]
elif strategy_type == "incremental":
rate = (
f"@{strategy_instance.start_rate:.2f}+{strategy_instance.increment_factor:.2f}"
if strategy_instance
else "@#.##+#.##"
)
else:
rate = ""

Expand Down