diff --git a/README.md b/README.md
index a9666c3c..ea570352 100644
--- a/README.md
+++ b/README.md
@@ -120,9 +120,10 @@ GuideLLM provides various CLI and environment options to customize evaluations,
 
 Some typical configurations for the CLI include:
 
-- `--rate-type`: The rate to use for benchmarking. Options include `sweep`, `synchronous`, `throughput`, `constant`, and `poisson`.
+- `--rate-type`: The rate to use for benchmarking. Options include `sweep`, `synchronous`, `concurrent`, `throughput`, `constant`, and `poisson`.
   - `--rate-type sweep`: (default) Sweep runs through the full range of the server's performance, starting with a `synchronous` rate, then `throughput`, and finally, 10 `constant` rates between the min and max request rate found.
   - `--rate-type synchronous`: Synchronous runs requests synchronously, one after the other.
+  - `--rate-type concurrent`: Concurrent runs requests concurrently, simulating multiple clients sending requests in parallel. Specify the number of concurrent requests with the `--rate` argument. For example, `--rate 10`.
   - `--rate-type throughput`: Throughput runs requests in a throughput manner, sending requests as fast as possible.
   - `--rate-type constant`: Constant runs requests at a constant rate. Specify the request rate per second with the `--rate` argument. For example, `--rate 10` or multiple rates with `--rate 10 --rate 20 --rate 30`.
   - `--rate-type poisson`: Poisson draws from a Poisson distribution with the mean at the specified rate, adding some real-world variance to the runs. Specify the request rate per second with the `--rate` argument. For example, `--rate 10` or multiple rates with `--rate 10 --rate 20 --rate 30`.
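Note that `--rate` is overloaded across rate types: for `constant` and `poisson` it is requests per second, while for the new `concurrent` type it is a number of parallel clients. A minimal sketch of that interpretation, mirroring the `generate_benchmark_report` change further down (the helper and constant names here are illustrative, not part of the CLI):

```python
from typing import Optional, Tuple

# Rate types whose `--rate` value is forwarded to the scheduler.
RATE_BEARING = ("constant", "poisson", "concurrent")


def scheduler_rate(rate_type: str, rate: Optional[float]) -> Tuple[str, Optional[float]]:
    """Map CLI options to the (mode, rate) pair handed to the executor."""
    # For `concurrent`, `rate` means "number of parallel clients";
    # for `constant`/`poisson` it means "requests per second".
    return rate_type, (rate if rate_type in RATE_BEARING else None)


assert scheduler_rate("concurrent", 8) == ("concurrent", 8)
assert scheduler_rate("throughput", 8) == ("throughput", None)
```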
diff --git a/pyproject.toml b/pyproject.toml
index 6ab2c6e9..a9969879 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -127,7 +127,8 @@ ignore = [
     "TCH002",
     "PLW1514", # allow Path.open without encoding
     "RET505", # allow `else` blocks
-    "RET506" # allow `else` blocks
+    "RET506", # allow `else` blocks
+    "C901" # allow more complex functions (long if/else chains)
 ]
 
 select = [
diff --git a/src/guidellm/executor/profile_generator.py b/src/guidellm/executor/profile_generator.py
index 757646cf..16a4fd63 100644
--- a/src/guidellm/executor/profile_generator.py
+++ b/src/guidellm/executor/profile_generator.py
@@ -17,7 +17,12 @@
 ]
 
 ProfileGenerationMode = Literal[
-    "sweep", "synchronous", "throughput", "constant", "poisson"
+    "sweep",
+    "synchronous",
+    "throughput",
+    "constant",
+    "poisson",
+    "concurrent",
 ]
 
 
@@ -61,7 +66,7 @@ def __init__(
             logger.error(err)
             raise err
 
-        self._mode = mode
+        self._mode: ProfileGenerationMode = mode
 
         if self._mode in ("sweep", "throughput", "synchronous"):
             if rate is not None:
@@ -135,7 +140,7 @@ def generated_count(self) -> int:
         return self._generated_count
 
     @property
-    def profile_generation_modes(self) -> Sequence[ProfileGenerationMode]:
+    def profile_generation_modes(self) -> List[ProfileGenerationMode]:
         """
         Return the list of profile modes to be run in the report.
@@ -147,7 +152,8 @@ def profile_generation_modes(self) -> Sequence[ProfileGenerationMode]:
             settings.num_sweep_profiles
         )
 
-        if self._mode in ["throughput", "synchronous"]:
+        # "concurrent" also yields a single fixed profile, like the other two modes
+        if self._mode in ["throughput", "synchronous", "concurrent"]:
             return [self._mode]
 
         if self._rates is None:
@@ -173,13 +179,13 @@ def next(self, current_report: TextGenerationBenchmarkReport) -> Optional[Profile]:
             current_report,
         )
 
-        if self.mode in ["constant", "poisson"]:
+        if self.mode in ("constant", "poisson", "concurrent"):
             if not self.rates:
                 err = ValueError(f"Rates are required for {self.mode} mode")
                 logger.error(err)
                 raise err
 
-            profile = self.create_fixed_rate_profile(
+            profile: Optional[Profile] = self.create_fixed_rate_profile(
                 self.generated_count,
                 self.mode,
                 self.rates,
@@ -229,9 +235,11 @@ def create_fixed_rate_profile(
         :return: The generated profile or None if index is out of range.
         :rtype: Optional[Profile]
         """
+
         modes_map: Dict[str, LoadGenerationMode] = {
             "constant": "constant",
             "poisson": "poisson",
+            "concurrent": "consistent",  # scheduler-level name for the concurrent mode
         }
 
         if mode not in modes_map:
diff --git a/src/guidellm/main.py b/src/guidellm/main.py
index 4016ecec..ae8231b4 100644
--- a/src/guidellm/main.py
+++ b/src/guidellm/main.py
@@ -264,7 +264,7 @@ def generate_benchmark_report(
         backend=backend_inst,
         request_generator=request_generator,
         mode=rate_type,
-        rate=rate if rate_type in ("constant", "poisson") else None,
+        rate=rate if rate_type in ("constant", "poisson", "concurrent") else None,
         max_number=(
             len(request_generator) if max_requests == "dataset" else max_requests
         ),
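One naming wrinkle worth flagging for reviewers: the user-facing profile mode is called `concurrent`, but the scheduler-facing load-generation mode it maps to is called `consistent`. A standalone sketch of the lookup performed in `create_fixed_rate_profile` (plain strings stand in for the `Literal` types; `to_load_mode` is an illustrative name):

```python
from typing import Dict

# "concurrent" (executor vocabulary) maps to "consistent" (scheduler vocabulary).
modes_map: Dict[str, str] = {
    "constant": "constant",
    "poisson": "poisson",
    "concurrent": "consistent",
}


def to_load_mode(profile_mode: str) -> str:
    if profile_mode not in modes_map:
        raise ValueError(f"Invalid mode: {profile_mode}")
    return modes_map[profile_mode]


assert to_load_mode("concurrent") == "consistent"
```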
"synchronous" mode.""" + for index, (request, submit_at) in enumerate( zip(self.generator, self.load_generator.times()) ): @@ -298,42 +294,81 @@ async def _run_sync( async def _run_async( self, benchmark: TextGenerationBenchmark, end_time: float, max_number: float ) -> AsyncGenerator[Union[TextGenerationResult, TextGenerationError], None]: + """ + Notes: + if the Load Generation Mode is set to 'consistent' - timestamps should + not be generated in order to make as many requests as possible to + simulate concurrent clients interaction. + """ + tasks = [] completed = 0 - for index, (request, submit_at) in enumerate( - zip(self.generator, self.load_generator.times()) - ): - while (index + 1 - completed) >= settings.max_concurrency: - await asyncio.sleep(0.1) + def _completed(_task: asyncio.Task) -> None: + nonlocal completed + completed += 1 + _res = _task.result() - if index >= max_number or time.time() >= end_time or submit_at >= end_time: - break + if _res: + benchmark.request_completed(_res) + logger.debug("Request completed: {}", _res) - logger.debug( - "Running asynchronous request={} at submit_at={}", - request, - submit_at, - ) - - def _completed(_task: asyncio.Task) -> None: - nonlocal completed - completed += 1 - _res = _task.result() - - if _res: - benchmark.request_completed(_res) - logger.debug("Request completed: {}", _res) + if self.mode == "consistent": + if self.rate is None: + raise ValueError( + "The `rate` must be specified in order to provide " + "the concurrent execution" + ) + for index, request in enumerate(self.generator): + while (index + 1 - completed) >= settings.max_concurrency: + await asyncio.sleep(0.1) + + if index >= max_number or time.time() >= end_time: + break + + logger.debug(f"Running concurrently request={request}") + + benchmark.request_started() + + # Create multiple concurrent tasks + tasks: list[asyncio.Task] = [] + for _ in range(int(self.rate)): + task: asyncio.Task = asyncio.create_task( + self._submit_task_coroutine( # submit the call with 'Backend' + request=request, submit_at=0.0, end_time=end_time + ) + ) + task.add_done_callback(_completed) + tasks.append(task) + else: + for index, (request, submit_at) in enumerate( + zip(self.generator, self.load_generator.times()) + ): + while (index + 1 - completed) >= settings.max_concurrency: + await asyncio.sleep(0.1) + + if ( + index >= max_number + or time.time() >= end_time + or submit_at >= end_time + ): + break + + logger.debug( + "Running asynchronous request={} at submit_at={}", + request, + submit_at, + ) - benchmark.request_started() - task = asyncio.create_task( - self._submit_task_coroutine(request, submit_at, end_time) - ) - task.add_done_callback(_completed) - tasks.append(task) + benchmark.request_started() + task = asyncio.create_task( + self._submit_task_coroutine(request, submit_at, end_time) + ) + task.add_done_callback(_completed) + tasks.append(task) - # release control to the event loop for other tasks - await asyncio.sleep(0.001) + # release control to the event loop for other tasks + await asyncio.sleep(0.001) for compl_task in asyncio.as_completed(tasks): task_res = await compl_task diff --git a/src/guidellm/scheduler/load_generator.py b/src/guidellm/scheduler/load_generator.py index f629752a..0a52da23 100644 --- a/src/guidellm/scheduler/load_generator.py +++ b/src/guidellm/scheduler/load_generator.py @@ -6,7 +6,9 @@ __all__ = ["LoadGenerationMode", "LoadGenerator"] -LoadGenerationMode = Literal["synchronous", "constant", "poisson", "throughput"] +LoadGenerationMode = Literal[ + 
"synchronous", "constant", "poisson", "throughput", "consistent" +] class LoadGenerator: @@ -18,7 +20,7 @@ class LoadGenerator: timestamps based on the rate provided during initialization. :param mode: The mode of load generation. Valid options are "constant", - "poisson", "throughput", and "synchronous". + "poisson", "throughput", and "synchronous", "consistent" :type mode: LoadGenerationMode :param rate: The rate at which to generate timestamps. This value is interpreted differently depending on the mode. @@ -52,8 +54,8 @@ def __init__(self, mode: LoadGenerationMode, rate: Optional[float] = None): logger.error(error) raise error - self._mode = mode - self._rate = rate + self._mode: LoadGenerationMode = mode + self._rate: Optional[float] = rate logger.debug( "Initialized LoadGenerator with mode: {mode}, rate: {rate}", mode=mode, diff --git a/tests/unit/scheduler/test_load_generator.py b/tests/unit/scheduler/test_load_generator.py index 6b84ee01..198df3bd 100644 --- a/tests/unit/scheduler/test_load_generator.py +++ b/tests/unit/scheduler/test_load_generator.py @@ -14,6 +14,7 @@ def test_load_generator_mode(): "constant", "poisson", "throughput", + "consistent", } @@ -21,10 +22,11 @@ def test_load_generator_mode(): @pytest.mark.parametrize( ("mode", "rate"), [ + ("synchronous", None), ("constant", 10), ("poisson", 5), ("throughput", None), - ("synchronous", None), + ("consistent", 2), ], ) def test_load_generator_instantiation(mode, rate):