16
16
17
17
__all__ = [
18
18
"AsyncConstantStrategy" ,
19
+ "AsyncIncrementalStrategy" ,
19
20
"AsyncPoissonStrategy" ,
20
21
"ConcurrentStrategy" ,
21
22
"SchedulingStrategy" ,
26
27
]
27
28
28
29
29
- StrategyType = Literal ["synchronous" , "concurrent" , "throughput" , "constant" , "poisson" ]
30
+ StrategyType = Literal [
31
+ "synchronous" , "concurrent" , "throughput" , "constant" , "poisson" , "incremental"
32
+ ]
30
33
31
34
32
35
class SchedulingStrategy (StandardBaseModel ):
@@ -479,6 +482,98 @@ def request_times(self) -> Generator[float, None, None]:
479
482
yield start_time
480
483
481
484
485
+ class AsyncIncrementalStrategy (ThroughputStrategy ):
486
+ """
487
+ A class representing an asynchronous incremental scheduling strategy.
488
+ This strategy schedules requests asynchronously starting at a base rate
489
+ and incrementally increasing the rate by a factor over time.
490
+ If initial_burst is set, it will send an initial burst of math.floor(start_rate)
491
+ requests to reach the target rate.
492
+ It inherits from the `ThroughputStrategy` base class and
493
+ implements the `request_times` method to provide the specific
494
+ behavior for asynchronous incremental scheduling.
495
+ :param type_: The incremental StrategyType to schedule requests asynchronously.
496
+ :param start_rate: The initial rate at which to schedule requests in
497
+ requests per second. This must be a positive float.
498
+ :param increment_factor: The factor by which to increase the rate over time.
499
+ This must be a positive float greater than 0.
500
+ :param rate_limit: The factor that limits the max rate.
501
+ This must be a positive integer greater than 0.
502
+ :param initial_burst: True to send an initial burst of requests
503
+ (math.floor(self.start_rate)) to reach target rate.
504
+ False to not send an initial burst.
505
+ """
506
+
507
+ type_ : Literal ["incremental" ] = "incremental"
508
+ start_rate : float = Field (
509
+ description = (
510
+ "The initial rate at which to schedule requests asynchronously in "
511
+ "requests per second. This must be a positive float."
512
+ ),
513
+ gt = 0 ,
514
+ )
515
+ increment_factor : float = Field (
516
+ description = (
517
+ "The factor by which to increase the rate over time. "
518
+ "This must be a positive float greater than 0."
519
+ ),
520
+ gt = 0 ,
521
+ )
522
+ rate_limit : int = Field (
523
+ description = (
524
+ "The factor that limits the max rate."
525
+ "This must be a positive integer greater than 0."
526
+ ),
527
+ gt = 0 ,
528
+ )
529
+ initial_burst : bool = Field (
530
+ default = True ,
531
+ description = (
532
+ "True to send an initial burst of requests (math.floor(self.start_rate)) "
533
+ "to reach target rate. False to not send an initial burst."
534
+ ),
535
+ )
536
+
537
+ def request_times (self ) -> Generator [float , None , None ]:
538
+ """
539
+ A generator that yields timestamps for when requests should be sent.
540
+ This method schedules requests asynchronously starting at a base rate
541
+ and incrementally increasing the rate by a factor over time.
542
+ If initial_burst is set, it will send an initial burst of requests
543
+ to reach the target rate.
544
+ :return: A generator that yields timestamps for request scheduling.
545
+ """
546
+ start_time = time .time ()
547
+
548
+ # handle bursts first to get to the desired rate
549
+ if self .initial_burst :
550
+ # send an initial burst equal to the start rate
551
+ # to reach the target rate
552
+ burst_count = math .floor (self .start_rate )
553
+ for _ in range (burst_count ):
554
+ yield start_time
555
+
556
+ current_time = start_time
557
+ counter = 0
558
+
559
+ # continue with incremental rate
560
+ while True :
561
+ yield current_time
562
+ counter += 1
563
+
564
+ # decide which rate should be next
565
+ elapsed_time = current_time - start_time
566
+ next_rate = self .start_rate + (self .increment_factor * elapsed_time )
567
+
568
+ # cap at rate limit if specified
569
+ if self .rate_limit and next_rate >= self .rate_limit :
570
+ increment = 1.0 / self .rate_limit
571
+ else :
572
+ increment = 1.0 / next_rate
573
+
574
+ current_time += increment
575
+
576
+
482
577
def strategy_display_str (strategy : Union [StrategyType , SchedulingStrategy ]) -> str :
483
578
strategy_type = strategy if isinstance (strategy , str ) else strategy .type_
484
579
strategy_instance = strategy if isinstance (strategy , SchedulingStrategy ) else None
@@ -487,6 +582,12 @@ def strategy_display_str(strategy: Union[StrategyType, SchedulingStrategy]) -> s
487
582
rate = f"@{ strategy_instance .streams } " if strategy_instance else "@##" # type: ignore[attr-defined]
488
583
elif strategy_type in ("constant" , "poisson" ):
489
584
rate = f"@{ strategy_instance .rate :.2f} " if strategy_instance else "@#.##" # type: ignore[attr-defined]
585
+ elif strategy_type == "incremental" :
586
+ rate = (
587
+ f"@{ strategy_instance .start_rate :.2f} +{ strategy_instance .increment_factor :.2f} "
588
+ if strategy_instance
589
+ else "@#.##+#.##"
590
+ )
490
591
else :
491
592
rate = ""
492
593
0 commit comments