From 4e20574160c07cacf33e3a768e22c6e156721953 Mon Sep 17 00:00:00 2001
From: Pooya Davoodi
Date: Mon, 3 Feb 2025 02:58:25 +0000
Subject: [PATCH] [Frontend] Optionally remove memory buffer used for
 uploading to URLs

We also retry a few times in case an exception (e.g. a timeout) happens
during the upload.

Signed-off-by: Pooya Davoodi
---
 vllm/entrypoints/openai/run_batch.py | 125 ++++++++++++++++++++++-----
 1 file changed, 110 insertions(+), 15 deletions(-)

diff --git a/vllm/entrypoints/openai/run_batch.py b/vllm/entrypoints/openai/run_batch.py
index 675d3cdcf971..81e7028ad774 100644
--- a/vllm/entrypoints/openai/run_batch.py
+++ b/vllm/entrypoints/openai/run_batch.py
@@ -1,6 +1,7 @@
 # SPDX-License-Identifier: Apache-2.0
 
 import asyncio
+import tempfile
 from http import HTTPStatus
 from io import StringIO
 from typing import Awaitable, Callable, List, Optional
@@ -51,6 +52,13 @@
         help="The path or url to a single output file. Currently supports "
         "local file paths, or web (http or https) urls. If a URL is specified,"
         " the file should be available via HTTP PUT.")
+    parser.add_argument(
+        "--output-tmp-dir",
+        type=str,
+        default=None,
+        help="The directory to store the output file before uploading it "
+        "to the output URL.",
+    )
     parser.add_argument("--response-role",
                         type=nullable_str,
                         default="assistant",
@@ -134,17 +142,109 @@ async def read_file(path_or_url: str) -> str:
         return f.read()
 
 
-async def write_file(path_or_url: str, data: str) -> None:
+async def write_local_file(output_path: str,
+                           batch_outputs: List[BatchRequestOutput]) -> None:
+    """
+    Write the responses to a local file.
+    output_path: The path to write the responses to.
+    batch_outputs: The list of batch outputs to write.
+    """
+    # We should make this async, but as long as run_batch runs as a
+    # standalone program, blocking the event loop won't affect performance.
+    with open(output_path, "w", encoding="utf-8") as f:
+        for o in batch_outputs:
+            print(o.model_dump_json(), file=f)
+
+
+async def upload_data(output_url: str, data_or_file: str,
+                      from_file: bool) -> None:
+    """
+    Upload data or a local file to a URL.
+    output_url: The URL to upload the data or file to.
+    data_or_file: Either the data to upload or the path to the file to upload.
+    from_file: If True, data_or_file is the path to the file to upload.
+    """
+    # Timeout is a common issue when uploading large files.
+    # We retry max_retries times before giving up.
+    max_retries = 5
+    # Number of seconds to wait before retrying.
+    delay = 5
+
+    for attempt in range(1, max_retries + 1):
+        try:
+            # We increase the timeout to 1000 seconds to allow
+            # for large files (default is 300).
+            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(
+                    total=1000)) as session:
+                if from_file:
+                    with open(data_or_file, "rb") as file:
+                        async with session.put(output_url,
+                                               data=file) as response:
+                            if response.status != 200:
+                                raise Exception(f"Failed to upload file.\n"
+                                                f"Status: {response.status}\n"
+                                                f"Response: {await response.text()}")
+                else:
+                    async with session.put(output_url,
+                                           data=data_or_file) as response:
+                        if response.status != 200:
+                            raise Exception(f"Failed to upload data.\n"
+                                            f"Status: {response.status}\n"
+                                            f"Response: {await response.text()}")
+            # The upload succeeded; exit the retry loop.
+            break
+        except Exception as e:
+            if attempt < max_retries:
+                logger.error(
+                    "Failed to upload data (attempt %d). "
+                    "Error message: %s.\nRetrying in %d seconds...",
+                    attempt, e, delay)
+                await asyncio.sleep(delay)
+            else:
+                raise Exception(f"Failed to upload data (attempt {attempt}). "
+                                f"Error message: {str(e)}.") from e
+
+
+async def write_file(path_or_url: str, batch_outputs: List[BatchRequestOutput],
+                     output_tmp_dir: Optional[str]) -> None:
+    """
+    Write batch_outputs to a file or upload them to a URL.
+    path_or_url: The path or URL to write batch_outputs to.
+    batch_outputs: The list of batch outputs to write.
+    output_tmp_dir: The directory to store the output file in before
+        uploading it to the output URL.
+    """
     if path_or_url.startswith("http://") or path_or_url.startswith("https://"):
-        async with aiohttp.ClientSession() as session, \
-                session.put(path_or_url, data=data.encode("utf-8")):
-            pass
+        if output_tmp_dir is None:
+            logger.info("Writing outputs to memory buffer")
+            output_buffer = StringIO()
+            for o in batch_outputs:
+                print(o.model_dump_json(), file=output_buffer)
+            output_buffer.seek(0)
+            logger.info("Uploading outputs to %s", path_or_url)
+            await upload_data(
+                path_or_url,
+                output_buffer.read().strip().encode("utf-8"),
+                from_file=False,
+            )
+        else:
+            # Write responses to a temporary file and then upload it to
+            # the URL.
+            with tempfile.NamedTemporaryFile(
+                    mode="w",
+                    encoding="utf-8",
+                    dir=output_tmp_dir,
+                    prefix="tmp_batch_output_",
+                    suffix=".jsonl",
+            ) as f:
+                logger.info("Writing outputs to temporary local file %s",
+                            f.name)
+                await write_local_file(f.name, batch_outputs)
+                logger.info("Uploading outputs to %s", path_or_url)
+                await upload_data(path_or_url, f.name, from_file=True)
     else:
-        # We should make this async, but as long as this is always run as a
-        # standalone program, blocking the event loop won't effect performance
-        # in this particular case.
-        with open(path_or_url, "w", encoding="utf-8") as f:
-            f.write(data)
+        logger.info("Writing outputs to local file %s", path_or_url)
+        await write_local_file(path_or_url, batch_outputs)
 
 
 def make_error_request_output(request: BatchRequestInput,
@@ -317,12 +417,7 @@ async def main(args):
     with tracker.pbar():
         responses = await asyncio.gather(*response_futures)
 
-    output_buffer = StringIO()
-    for response in responses:
-        print(response.model_dump_json(), file=output_buffer)
-
-    output_buffer.seek(0)
-    await write_file(args.output_file, output_buffer.read().strip())
+    await write_file(args.output_file, responses, args.output_tmp_dir)
 
 
 if __name__ == "__main__":