15
15
16
16
__all__ = [
17
17
"AsyncConstantStrategy" ,
18
+ "AsyncIncrementalStrategy" ,
18
19
"AsyncPoissonStrategy" ,
19
20
"ConcurrentStrategy" ,
20
21
"SchedulingStrategy" ,
25
26
]
26
27
27
28
28
- StrategyType = Literal ["synchronous" , "concurrent" , "throughput" , "constant" , "poisson" ]
29
+ StrategyType = Literal [
30
+ "synchronous" , "concurrent" , "throughput" , "constant" , "poisson" , "incremental"
31
+ ]
29
32
30
33
31
34
class SchedulingStrategy (StandardBaseModel ):
@@ -481,6 +484,98 @@ def request_times(self) -> Generator[float, None, None]:
481
484
yield init_time
482
485
483
486
487
+ class AsyncIncrementalStrategy (ThroughputStrategy ):
488
+ """
489
+ A class representing an asynchronous incremental scheduling strategy.
490
+ This strategy schedules requests asynchronously starting at a base rate
491
+ and incrementally increasing the rate by a factor over time.
492
+ If initial_burst is set, it will send an initial burst of math.floor(start_rate)
493
+ requests to reach the target rate.
494
+ It inherits from the `ThroughputStrategy` base class and
495
+ implements the `request_times` method to provide the specific
496
+ behavior for asynchronous incremental scheduling.
497
+ :param type_: The incremental StrategyType to schedule requests asynchronously.
498
+ :param start_rate: The initial rate at which to schedule requests in
499
+ requests per second. This must be a positive float.
500
+ :param increment_factor: The factor by which to increase the rate over time.
501
+ This must be a positive float greater than 0.
502
+ :param rate_limit: The factor that limits the max rate.
503
+ This must be a positive integer greater than 0.
504
+ :param initial_burst: True to send an initial burst of requests
505
+ (math.floor(self.start_rate)) to reach target rate.
506
+ False to not send an initial burst.
507
+ """
508
+
509
+ type_ : Literal ["incremental" ] = "incremental"
510
+ start_rate : float = Field (
511
+ description = (
512
+ "The initial rate at which to schedule requests asynchronously in "
513
+ "requests per second. This must be a positive float."
514
+ ),
515
+ gt = 0 ,
516
+ )
517
+ increment_factor : float = Field (
518
+ description = (
519
+ "The factor by which to increase the rate over time. "
520
+ "This must be a positive float greater than 0."
521
+ ),
522
+ gt = 0 ,
523
+ )
524
+ rate_limit : int = Field (
525
+ description = (
526
+ "The factor that limits the max rate."
527
+ "This must be a positive integer greater than 0."
528
+ ),
529
+ gt = 0 ,
530
+ )
531
+ initial_burst : bool = Field (
532
+ default = True ,
533
+ description = (
534
+ "True to send an initial burst of requests (math.floor(self.start_rate)) "
535
+ "to reach target rate. False to not send an initial burst."
536
+ ),
537
+ )
538
+
539
+ def request_times (self ) -> Generator [float , None , None ]:
540
+ """
541
+ A generator that yields timestamps for when requests should be sent.
542
+ This method schedules requests asynchronously starting at a base rate
543
+ and incrementally increasing the rate by a factor over time.
544
+ If initial_burst is set, it will send an initial burst of requests
545
+ to reach the target rate.
546
+ :return: A generator that yields timestamps for request scheduling.
547
+ """
548
+ start_time = time .time ()
549
+
550
+ # handle bursts first to get to the desired rate
551
+ if self .initial_burst :
552
+ # send an initial burst equal to the start rate
553
+ # to reach the target rate
554
+ burst_count = math .floor (self .start_rate )
555
+ for _ in range (burst_count ):
556
+ yield start_time
557
+
558
+ current_time = start_time
559
+ counter = 0
560
+
561
+ # continue with incremental rate
562
+ while True :
563
+ yield current_time
564
+ counter += 1
565
+
566
+ # decide which rate should be next
567
+ elapsed_time = current_time - start_time
568
+ next_rate = self .start_rate + (self .increment_factor * elapsed_time )
569
+
570
+ # cap at rate limit if specified
571
+ if self .rate_limit and next_rate >= self .rate_limit :
572
+ increment = 1.0 / self .rate_limit
573
+ else :
574
+ increment = 1.0 / next_rate
575
+
576
+ current_time += increment
577
+
578
+
484
579
def strategy_display_str (strategy : Union [StrategyType , SchedulingStrategy ]) -> str :
485
580
strategy_type = strategy if isinstance (strategy , str ) else strategy .type_
486
581
strategy_instance = strategy if isinstance (strategy , SchedulingStrategy ) else None
@@ -489,6 +584,12 @@ def strategy_display_str(strategy: Union[StrategyType, SchedulingStrategy]) -> s
489
584
rate = f"@{ strategy_instance .streams } " if strategy_instance else "@##" # type: ignore[attr-defined]
490
585
elif strategy_type in ("constant" , "poisson" ):
491
586
rate = f"@{ strategy_instance .rate :.2f} " if strategy_instance else "@#.##" # type: ignore[attr-defined]
587
+ elif strategy_type == "incremental" :
588
+ rate = (
589
+ f"@{ strategy_instance .start_rate :.2f} +{ strategy_instance .increment_factor :.2f} "
590
+ if strategy_instance
591
+ else "@#.##+#.##"
592
+ )
492
593
else :
493
594
rate = ""
494
595
0 commit comments