From 70fc214d599eb7003259bdb9c5f9c3878924ccb3 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Thu, 2 Jan 2025 16:30:37 -0700 Subject: [PATCH 01/13] :construction: WIP validate dynamic lora adapters Signed-off-by: Joe Runde --- vllm/engine/multiprocessing/__init__.py | 12 +++++++++-- vllm/engine/multiprocessing/client.py | 26 +++++++++++++++++++++-- vllm/engine/multiprocessing/engine.py | 19 +++++++++++++++-- vllm/engine/protocol.py | 5 +++++ vllm/entrypoints/openai/api_server.py | 2 +- vllm/entrypoints/openai/serving_models.py | 20 +++++++++++++---- 6 files changed, 73 insertions(+), 11 deletions(-) diff --git a/vllm/engine/multiprocessing/__init__.py b/vllm/engine/multiprocessing/__init__.py index 420f540d0b5f..55795c57d3d4 100644 --- a/vllm/engine/multiprocessing/__init__.py +++ b/vllm/engine/multiprocessing/__init__.py @@ -1,4 +1,5 @@ -from dataclasses import dataclass +import uuid +from dataclasses import dataclass, field from enum import Enum from typing import List, Mapping, Optional, Union, overload @@ -120,8 +121,15 @@ class RPCUProfileRequest(Enum): STOP_PROFILE = 2 +@dataclass +class RPCLoadAdapterRequest: + lora_request: LoRARequest + # Set the default value of request_id to a new UUID + request_id: str = field(default_factory=lambda: str(uuid.uuid4())) + + RPC_REQUEST_T = Union[RPCProcessRequest, RPCAbortRequest, RPCStartupRequest, - RPCUProfileRequest] + RPCUProfileRequest, RPCLoadAdapterRequest] REQUEST_OUTPUTS_T = Union[List[RequestOutput], RPCError] diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py index 0a046c71e86e..85f9f8496af6 100644 --- a/vllm/engine/multiprocessing/client.py +++ b/vllm/engine/multiprocessing/client.py @@ -25,8 +25,9 @@ IPC_HEALTH_EXT, IPC_INPUT_EXT, IPC_OUTPUT_EXT, RPC_REQUEST_T, VLLM_RPC_SUCCESS_STR, RPCAbortRequest, - RPCError, RPCProcessRequest, - RPCStartupRequest, RPCStartupResponse, + RPCError, RPCLoadAdapterRequest, + RPCProcessRequest, RPCStartupRequest, + RPCStartupResponse, RPCUProfileRequest) from vllm.engine.protocol import EngineClient # yapf: enable @@ -659,3 +660,24 @@ async def stop_profile(self) -> None: await self._send_one_way_rpc_request( request=RPCUProfileRequest.STOP_PROFILE, socket=self.input_socket) + + async def add_lora(self, lora_request: LoRARequest) -> None: + """Load a new LoRA adapter into the engine for future requests.""" + # Uses the same I/O as generate requests + request = RPCLoadAdapterRequest(lora_request) + + # Create output queue for this requests. 
+ queue: asyncio.Queue[Union[None, BaseException]] = asyncio.Queue() + self.output_queues[request.request_id] = queue + + # Send the request + request_bytes = pickle.dumps(request) + await self.input_socket.send_multipart((request_bytes, ), copy=False) + + # Wait for the response + request_output = await queue.get() + self.output_queues.pop(request.request_id) + + # Raise on error, otherwise happily return None + if isinstance(request_output, BaseException): + raise request_output diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py index 49a90b321dac..857248d120fe 100644 --- a/vllm/engine/multiprocessing/engine.py +++ b/vllm/engine/multiprocessing/engine.py @@ -14,8 +14,9 @@ IPC_HEALTH_EXT, IPC_INPUT_EXT, IPC_OUTPUT_EXT, REQUEST_OUTPUTS_T, VLLM_RPC_SUCCESS_STR, RPCAbortRequest, - RPCError, RPCProcessRequest, - RPCStartupRequest, RPCStartupResponse, + RPCError, RPCLoadAdapterRequest, + RPCProcessRequest, RPCStartupRequest, + RPCStartupResponse, RPCUProfileRequest) # yapf: enable from vllm.executor.gpu_executor import GPUExecutor @@ -234,6 +235,8 @@ def handle_new_input(self): self.start_profile() else: self.stop_profile() + elif isinstance(request, RPCLoadAdapterRequest): + self._handle_load_adapter_request(request) else: raise ValueError("Unknown RPCRequest Type: " f"{type(request)}") @@ -284,6 +287,18 @@ def _handle_abort_request(self, request: RPCAbortRequest): if self.log_requests: logger.info("Aborted request %s.", request.request_id) + def _handle_load_adapter_request(self, request: RPCLoadAdapterRequest): + try: + self.engine.add_lora(request.lora_request) + except BaseException as e: + # Send back an error if the adater fails to load + rpc_err = RPCError(request_id=request.request_id, + is_engine_errored=False, + exception=e) + self._send_outputs(rpc_err) + # Otherwise, echo back the request if successful + self._send_outputs([request]) + def _health_check(self): # Send unhealthy if engine has already errored if self._errored_with is not None: diff --git a/vllm/engine/protocol.py b/vllm/engine/protocol.py index a066836b9270..f05ff62c4766 100644 --- a/vllm/engine/protocol.py +++ b/vllm/engine/protocol.py @@ -270,3 +270,8 @@ async def start_profile(self) -> None: async def stop_profile(self) -> None: """Start profiling the engine""" ... + + @abstractmethod + async def add_lora(self, lora_request: LoRARequest) -> None: + """Load a new LoRA adapter into the engine for future requests.""" + ... 
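With `EngineClient.add_lora` in place, a dynamically registered adapter is validated by the engine before it is added to the served model list, so a bad path or an incompatible adapter is reported on the load call itself instead of on the first inference request. A minimal sketch of how this surfaces to an API client, assuming a server reachable at `localhost:8000` that was started with runtime LoRA updating enabled (the address and the `VLLM_ALLOW_RUNTIME_LORA_UPDATING=True` setting are assumptions, not part of these patches):

    import requests

    # Hypothetical server address; the dynamic-adapter endpoints must be enabled on it.
    BASE_URL = "http://localhost:8000"

    # Ask the server to load (and validate) a new adapter. With these patches the
    # engine actually loads the adapter here, so failures come back immediately.
    resp = requests.post(
        f"{BASE_URL}/v1/load_lora_adapter",
        json={"lora_name": "zephyr-lora-3", "lora_path": "/path/to/adapter"},
    )

    if resp.status_code == 200:
        # The handler returns a plain success string on HTTP 200.
        print(resp.text)
    else:
        # 404: no adapter found at lora_path; 400: adapter files could not be loaded
        # (e.g. malformed config or a rank above --max-lora-rank).
        print(resp.status_code, resp.json())
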
diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 74fe378fdae4..2b563a320a3b 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -631,12 +631,12 @@ def init_app_state( logger.info("Using supplied chat template:\n%s", resolved_chat_template) state.openai_serving_models = OpenAIServingModels( + engine_client=engine_client, model_config=model_config, base_model_paths=base_model_paths, lora_modules=args.lora_modules, prompt_adapters=args.prompt_adapters, ) - # TODO: The chat template is now broken for lora adapters :( state.openai_serving_chat = OpenAIServingChat( engine_client, model_config, diff --git a/vllm/entrypoints/openai/serving_models.py b/vllm/entrypoints/openai/serving_models.py index 26966896bc27..3a374e2dd26c 100644 --- a/vllm/entrypoints/openai/serving_models.py +++ b/vllm/entrypoints/openai/serving_models.py @@ -5,6 +5,7 @@ from typing import List, Optional, Union from vllm.config import ModelConfig +from vllm.engine.protocol import EngineClient from vllm.entrypoints.openai.protocol import (ErrorResponse, LoadLoraAdapterRequest, ModelCard, ModelList, @@ -45,6 +46,7 @@ class OpenAIServingModels: def __init__( self, + engine_client: EngineClient, model_config: ModelConfig, base_model_paths: List[BaseModelPath], *, @@ -55,6 +57,7 @@ def __init__( self.base_model_paths = base_model_paths self.max_model_len = model_config.max_model_len + self.engine_client = engine_client self.lora_id_counter = AtomicCounter(0) self.lora_requests = [] @@ -136,10 +139,19 @@ async def load_lora_adapter( lora_name, lora_path = request.lora_name, request.lora_path unique_id = self.lora_id_counter.inc(1) - self.lora_requests.append( - LoRARequest(lora_name=lora_name, - lora_int_id=unique_id, - lora_path=lora_path)) + lora_request = LoRARequest(lora_name=lora_name, + lora_int_id=unique_id, + lora_path=lora_path) + + # Validate that the adapter can be loaded into the engine + try: + await self.engine_client.add_lora(lora_request) + except BaseException as e: + return create_error_response(message=str(e), + err_type="InvalidUserInput", + status_code=HTTPStatus.BAD_REQUEST) + + self.lora_requests.append(lora_request) return f"Success: LoRA adapter '{lora_name}' added successfully." 
async def unload_lora_adapter( From a8745c08746c86e3ae64978f3fa4430de3125430 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Fri, 3 Jan 2025 14:53:18 -0700 Subject: [PATCH 02/13] :recycle: Clean up mp engine integration Signed-off-by: Joe Runde --- ..._lora_lineage.py => test_lora_adapters.py} | 86 +++++++++++++++---- tests/entrypoints/openai/test_serving_chat.py | 8 +- .../entrypoints/openai/test_serving_models.py | 9 +- vllm/engine/multiprocessing/__init__.py | 8 +- vllm/engine/multiprocessing/client.py | 21 +++-- vllm/engine/multiprocessing/engine.py | 14 ++- vllm/entrypoints/openai/run_batch.py | 1 + vllm/entrypoints/openai/serving_models.py | 21 ++++- vllm/lora/worker_manager.py | 8 ++ 9 files changed, 140 insertions(+), 36 deletions(-) rename tests/entrypoints/openai/{test_lora_lineage.py => test_lora_adapters.py} (50%) diff --git a/tests/entrypoints/openai/test_lora_lineage.py b/tests/entrypoints/openai/test_lora_adapters.py similarity index 50% rename from tests/entrypoints/openai/test_lora_lineage.py rename to tests/entrypoints/openai/test_lora_adapters.py index ce4f85c13fff..3cbeb180e526 100644 --- a/tests/entrypoints/openai/test_lora_lineage.py +++ b/tests/entrypoints/openai/test_lora_adapters.py @@ -1,4 +1,5 @@ import json +import shutil import openai # use the official client for correctness check import pytest @@ -63,16 +64,16 @@ def server_with_lora_modules_json(zephyr_lora_files): @pytest_asyncio.fixture -async def client_for_lora_lineage(server_with_lora_modules_json): +async def client(server_with_lora_modules_json): async with server_with_lora_modules_json.get_async_client( ) as async_client: yield async_client @pytest.mark.asyncio -async def test_static_lora_lineage(client_for_lora_lineage: openai.AsyncOpenAI, +async def test_static_lora_lineage(client: openai.AsyncOpenAI, zephyr_lora_files): - models = await client_for_lora_lineage.models.list() + models = await client.models.list() models = models.data served_model = models[0] lora_models = models[1:] @@ -87,23 +88,78 @@ async def test_static_lora_lineage(client_for_lora_lineage: openai.AsyncOpenAI, @pytest.mark.asyncio -async def test_dynamic_lora_lineage( - client_for_lora_lineage: openai.AsyncOpenAI, zephyr_lora_files): - - response = await client_for_lora_lineage.post("load_lora_adapter", - cast_to=str, - body={ - "lora_name": - "zephyr-lora-3", - "lora_path": - zephyr_lora_files - }) +async def test_dynamic_lora_lineage(client: openai.AsyncOpenAI, + zephyr_lora_files): + + response = await client.post("load_lora_adapter", + cast_to=str, + body={ + "lora_name": "zephyr-lora-3", + "lora_path": zephyr_lora_files + }) # Ensure adapter loads before querying /models assert "success" in response - models = await client_for_lora_lineage.models.list() + models = await client.models.list() models = models.data dynamic_lora_model = models[-1] assert dynamic_lora_model.root == zephyr_lora_files assert dynamic_lora_model.parent == MODEL_NAME assert dynamic_lora_model.id == "zephyr-lora-3" + + +@pytest.mark.asyncio +async def test_dynamic_lora_not_found(client: openai.AsyncOpenAI): + with pytest.raises(openai.NotFoundError): + await client.post("load_lora_adapter", + cast_to=str, + body={ + "lora_name": "notfound", + "lora_path": "/not/an/adapter" + }) + + +@pytest.mark.asyncio +async def test_dynamic_lora_invalid_files(client: openai.AsyncOpenAI, + tmp_path): + invalid_files = tmp_path / "invalid_files" + invalid_files.mkdir() + (invalid_files / "adapter_config.json").write_text("this is not json") + + with 
pytest.raises(openai.BadRequestError): + await client.post("load_lora_adapter", + cast_to=str, + body={ + "lora_name": "invalid-json", + "lora_path": str(invalid_files) + }) + + +@pytest.mark.asyncio +async def test_dynamic_lora_invalid_lora_rank(client: openai.AsyncOpenAI, + tmp_path, zephyr_lora_files): + invalid_rank = tmp_path / "invalid_rank" + + # Copy adapter from zephyr_lora_files to invalid_rank + shutil.copytree(zephyr_lora_files, invalid_rank) + + with open(invalid_rank / "adapter_config.json") as f: + adapter_config = json.load(f) + + print(adapter_config) + + # assert False + + # Change rank to invalid value + adapter_config["r"] = 1024 + with open(invalid_rank / "adapter_config.json", "w") as f: + json.dump(adapter_config, f) + + with pytest.raises(openai.BadRequestError, + match="is greater than max_lora_rank"): + await client.post("load_lora_adapter", + cast_to=str, + body={ + "lora_name": "invalid-json", + "lora_path": str(invalid_rank) + }) diff --git a/tests/entrypoints/openai/test_serving_chat.py b/tests/entrypoints/openai/test_serving_chat.py index 97248f115097..46f516625d55 100644 --- a/tests/entrypoints/openai/test_serving_chat.py +++ b/tests/entrypoints/openai/test_serving_chat.py @@ -51,7 +51,7 @@ async def _async_serving_chat_init(): engine = MockEngine() model_config = await engine.get_model_config() - models = OpenAIServingModels(model_config, BASE_MODEL_PATHS) + models = OpenAIServingModels(engine, model_config, BASE_MODEL_PATHS) serving_completion = OpenAIServingChat(engine, model_config, models, @@ -72,7 +72,8 @@ def test_serving_chat_should_set_correct_max_tokens(): mock_engine.get_tokenizer.return_value = get_tokenizer(MODEL_NAME) mock_engine.errored = False - models = OpenAIServingModels(base_model_paths=BASE_MODEL_PATHS, + models = OpenAIServingModels(engine_client=mock_engine, + base_model_paths=BASE_MODEL_PATHS, model_config=MockModelConfig()) serving_chat = OpenAIServingChat(mock_engine, MockModelConfig(), @@ -115,7 +116,8 @@ def test_serving_chat_could_load_correct_generation_config(): mock_engine.errored = False # Initialize the serving chat - models = OpenAIServingModels(base_model_paths=BASE_MODEL_PATHS, + models = OpenAIServingModels(engine_client=mock_engine, + base_model_paths=BASE_MODEL_PATHS, model_config=mock_model_config) serving_chat = OpenAIServingChat(mock_engine, mock_model_config, diff --git a/tests/entrypoints/openai/test_serving_models.py b/tests/entrypoints/openai/test_serving_models.py index 96897dc730da..4ac03e04bc7e 100644 --- a/tests/entrypoints/openai/test_serving_models.py +++ b/tests/entrypoints/openai/test_serving_models.py @@ -4,6 +4,7 @@ import pytest from vllm.config import ModelConfig +from vllm.engine.protocol import EngineClient from vllm.entrypoints.openai.protocol import (ErrorResponse, LoadLoraAdapterRequest, UnloadLoraAdapterRequest) @@ -21,10 +22,12 @@ async def _async_serving_models_init() -> OpenAIServingModels: mock_model_config = MagicMock(spec=ModelConfig) + mock_engine_client = MagicMock(spec=EngineClient) # Set the max_model_len attribute to avoid missing attribute mock_model_config.max_model_len = 2048 - serving_models = OpenAIServingModels(base_model_paths=BASE_MODEL_PATHS, + serving_models = OpenAIServingModels(engine_client=mock_engine_client, + base_model_paths=BASE_MODEL_PATHS, model_config=mock_model_config, lora_modules=None, prompt_adapters=None) @@ -113,5 +116,5 @@ async def test_unload_lora_adapter_not_found(): request = UnloadLoraAdapterRequest(lora_name="nonexistent_adapter") response = await 
serving_models.unload_lora_adapter(request) assert isinstance(response, ErrorResponse) - assert response.type == "InvalidUserInput" - assert response.code == HTTPStatus.BAD_REQUEST + assert response.type == "NotFoundError" + assert response.code == HTTPStatus.NOT_FOUND diff --git a/vllm/engine/multiprocessing/__init__.py b/vllm/engine/multiprocessing/__init__.py index 55795c57d3d4..7132f9840001 100644 --- a/vllm/engine/multiprocessing/__init__.py +++ b/vllm/engine/multiprocessing/__init__.py @@ -128,10 +128,16 @@ class RPCLoadAdapterRequest: request_id: str = field(default_factory=lambda: str(uuid.uuid4())) +@dataclass +class RPCAdapterLoadedResponse: + request_id: str + + RPC_REQUEST_T = Union[RPCProcessRequest, RPCAbortRequest, RPCStartupRequest, RPCUProfileRequest, RPCLoadAdapterRequest] -REQUEST_OUTPUTS_T = Union[List[RequestOutput], RPCError] +REQUEST_OUTPUTS_T = Union[List[RequestOutput], RPCAdapterLoadedResponse, + RPCError] def ENGINE_DEAD_ERROR( diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py index 85f9f8496af6..735a17563482 100644 --- a/vllm/engine/multiprocessing/client.py +++ b/vllm/engine/multiprocessing/client.py @@ -25,7 +25,8 @@ IPC_HEALTH_EXT, IPC_INPUT_EXT, IPC_OUTPUT_EXT, RPC_REQUEST_T, VLLM_RPC_SUCCESS_STR, RPCAbortRequest, - RPCError, RPCLoadAdapterRequest, + RPCAdapterLoadedResponse, RPCError, + RPCLoadAdapterRequest, RPCProcessRequest, RPCStartupRequest, RPCStartupResponse, RPCUProfileRequest) @@ -242,16 +243,22 @@ async def run_output_handler_loop(self): if queue is not None: queue.put_nowait(exception) else: - # Put each output into the appropriate steam. - for request_output in request_outputs: - queue = self.output_queues.get( - request_output.request_id) - if queue is not None: - queue.put_nowait(request_output) + # Put each output into the appropriate queue. 
+ if isinstance(request_outputs, RPCAdapterLoadedResponse): + self._add_output(request_outputs) + else: + for request_output in request_outputs: + self._add_output(request_output) except asyncio.CancelledError: logger.debug("Shutting down MQLLMEngineClient output handler.") + def _add_output(self, request_output: Union[RequestOutput, + RPCAdapterLoadedResponse]): + queue = self.output_queues.get(request_output.request_id) + if queue is not None: + queue.put_nowait(request_output) + async def setup(self): """Setup the client before it starts sending server requests.""" diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py index 857248d120fe..36f4df4b0273 100644 --- a/vllm/engine/multiprocessing/engine.py +++ b/vllm/engine/multiprocessing/engine.py @@ -14,7 +14,8 @@ IPC_HEALTH_EXT, IPC_INPUT_EXT, IPC_OUTPUT_EXT, REQUEST_OUTPUTS_T, VLLM_RPC_SUCCESS_STR, RPCAbortRequest, - RPCError, RPCLoadAdapterRequest, + RPCAdapterLoadedResponse, RPCError, + RPCLoadAdapterRequest, RPCProcessRequest, RPCStartupRequest, RPCStartupResponse, RPCUProfileRequest) @@ -296,8 +297,9 @@ def _handle_load_adapter_request(self, request: RPCLoadAdapterRequest): is_engine_errored=False, exception=e) self._send_outputs(rpc_err) - # Otherwise, echo back the request if successful - self._send_outputs([request]) + # Otherwise, send back the successful load message + self._send_outputs( + RPCAdapterLoadedResponse(request_id=request.request_id)) def _health_check(self): # Send unhealthy if engine has already errored @@ -311,7 +313,11 @@ def _health_check(self): self._send_unhealthy(e) def _send_outputs(self, outputs: REQUEST_OUTPUTS_T): - """Send List of RequestOutput to RPCClient.""" + """Send outputs back to the engine client. These can be: + - Exceptions + - A list of generation outputs + - A response from loading a lora adapter + """ if outputs: try: from ray.exceptions import RayTaskError diff --git a/vllm/entrypoints/openai/run_batch.py b/vllm/entrypoints/openai/run_batch.py index 822c0f5f7c21..f8f136f9d502 100644 --- a/vllm/entrypoints/openai/run_batch.py +++ b/vllm/entrypoints/openai/run_batch.py @@ -215,6 +215,7 @@ async def main(args): # Create the openai serving objects. openai_serving_models = OpenAIServingModels( + engine_client=engine, model_config=model_config, base_model_paths=base_model_paths, lora_modules=None, diff --git a/vllm/entrypoints/openai/serving_models.py b/vllm/entrypoints/openai/serving_models.py index 3a374e2dd26c..c98968de19bf 100644 --- a/vllm/entrypoints/openai/serving_models.py +++ b/vllm/entrypoints/openai/serving_models.py @@ -144,11 +144,26 @@ async def load_lora_adapter( lora_path=lora_path) # Validate that the adapter can be loaded into the engine + # This will also pre-load it for incoming requests try: await self.engine_client.add_lora(lora_request) + except ValueError as e: + # Adapter not found or lora configuration errors + if "No adapter found" in str(e): + return create_error_response(message=str(e), + err_type="NotFoundError", + status_code=HTTPStatus.NOT_FOUND) + else: + return create_error_response( + message=str(e), + err_type="BadRequestError", + status_code=HTTPStatus.BAD_REQUEST) except BaseException as e: + # Some other unexpected problem loading the adapter, e.g. malformed + # input files. 
+ # More detailed error messages for the user would be nicer here return create_error_response(message=str(e), - err_type="InvalidUserInput", + err_type="BadRequestError", status_code=HTTPStatus.BAD_REQUEST) self.lora_requests.append(lora_request) @@ -207,8 +222,8 @@ async def _check_unload_lora_adapter_request( return create_error_response( message= f"The lora adapter '{request.lora_name}' cannot be found.", - err_type="InvalidUserInput", - status_code=HTTPStatus.BAD_REQUEST) + err_type="NotFoundError", + status_code=HTTPStatus.NOT_FOUND) return None diff --git a/vllm/lora/worker_manager.py b/vllm/lora/worker_manager.py index 10976fac2302..9567aa0e0446 100644 --- a/vllm/lora/worker_manager.py +++ b/vllm/lora/worker_manager.py @@ -115,6 +115,14 @@ def _load_adapter(self, lora_request: LoRARequest) -> LoRAModel: embedding_padding_modules=self.embedding_padding_modules, weights_mapper=hf_to_vllm_mapper) + except FileNotFoundError as e: + # FileNotFoundError should be raised if both + # - No adapter found to download from huggingface (or in + # offline mode) + # - No local adapter files found at `lora_request.lora_path` + raise ValueError( + f"Loading lora {lora_request.lora_name} failed: No adapter " + f"found for {lora_path}") from e except Exception as e: raise RuntimeError(f"Loading lora {lora_path} failed") from e if lora.rank > self.lora_config.max_lora_rank: From f6c940d0ca9382129a2b712051bc6386db74c31e Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Fri, 3 Jan 2025 14:59:55 -0700 Subject: [PATCH 03/13] :bug: Implement add_lora in old AsyncLLMEngine Signed-off-by: Joe Runde --- vllm/engine/async_llm_engine.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index 66a5089074ff..da23ed19ef7b 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -1257,6 +1257,10 @@ async def stop_profile(self) -> None: else: self.engine.model_executor._run_workers("stop_profile") + async def add_lora(self, lora_request: LoRARequest) -> None: + """Load a new LoRA adapter into the engine for future requests.""" + self.engine.add_lora(lora_request) + # TODO(v1): Remove this class proxy when V1 goes default. 
if envs.VLLM_USE_V1: From f2078457e16d3992a1af249fa06cf243ccb136f3 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Mon, 6 Jan 2025 15:30:13 -0700 Subject: [PATCH 04/13] :bug: add add_lora in v1.AsyncLLM Signed-off-by: Joe Runde --- vllm/v1/engine/async_llm.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 3f097ca7f439..e6d4b3cbcc88 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -372,3 +372,7 @@ def errored(self) -> bool: @property def dead_error(self) -> BaseException: return Exception() # TODO: implement + + async def add_lora(self, lora_request: LoRARequest) -> None: + """Load a new LoRA adapter into the engine for future requests.""" + raise NotImplementedError("LoRA not yet supported in V1") From c0354c83e1e7acc8e37ca9bd01d4802a76d1e752 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Mon, 6 Jan 2025 15:52:49 -0700 Subject: [PATCH 05/13] :loud_sound: add logs on adapter load/unload Signed-off-by: Joe Runde --- vllm/entrypoints/openai/serving_models.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/vllm/entrypoints/openai/serving_models.py b/vllm/entrypoints/openai/serving_models.py index c98968de19bf..d65ada107207 100644 --- a/vllm/entrypoints/openai/serving_models.py +++ b/vllm/entrypoints/openai/serving_models.py @@ -11,10 +11,13 @@ ModelCard, ModelList, ModelPermission, UnloadLoraAdapterRequest) +from vllm.logger import init_logger from vllm.lora.request import LoRARequest from vllm.prompt_adapter.request import PromptAdapterRequest from vllm.utils import AtomicCounter +logger = init_logger(__name__) + @dataclass class BaseModelPath: @@ -167,6 +170,8 @@ async def load_lora_adapter( status_code=HTTPStatus.BAD_REQUEST) self.lora_requests.append(lora_request) + logger.info("Loaded new LoRA adapter: name '%s', path '%s'", lora_name, + lora_path) return f"Success: LoRA adapter '{lora_name}' added successfully." async def unload_lora_adapter( @@ -182,6 +187,7 @@ async def unload_lora_adapter( lora_request for lora_request in self.lora_requests if lora_request.lora_name != lora_name ] + logger.info("Removed LoRA adapter: name '%s'", lora_name) return f"Success: LoRA adapter '{lora_name}' removed successfully." async def _check_load_lora_adapter_request( From f33158ed7aea6e90ece57b35c7b9d3f2ec637b44 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Mon, 6 Jan 2025 15:54:48 -0700 Subject: [PATCH 06/13] :recycle: simplify output checks in mp client Signed-off-by: Joe Runde --- vllm/engine/multiprocessing/client.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py index 735a17563482..a9ab89953518 100644 --- a/vllm/engine/multiprocessing/client.py +++ b/vllm/engine/multiprocessing/client.py @@ -242,13 +242,12 @@ async def run_output_handler_loop(self): queue = self.output_queues.get(request_id) if queue is not None: queue.put_nowait(exception) + # Put each output into the appropriate queue. + elif isinstance(request_outputs, RPCAdapterLoadedResponse): + self._add_output(request_outputs) else: - # Put each output into the appropriate queue. 
- if isinstance(request_outputs, RPCAdapterLoadedResponse): - self._add_output(request_outputs) - else: - for request_output in request_outputs: - self._add_output(request_output) + for request_output in request_outputs: + self._add_output(request_output) except asyncio.CancelledError: logger.debug("Shutting down MQLLMEngineClient output handler.") From bae8f8f4a4beaee643e28aaff993ed779b1deb87 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Wed, 8 Jan 2025 13:03:12 -0700 Subject: [PATCH 07/13] :recycle: load new adapter before evicting LRU Signed-off-by: Joe Runde --- vllm/lora/worker_manager.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/vllm/lora/worker_manager.py b/vllm/lora/worker_manager.py index 9567aa0e0446..177cc9ab0ac6 100644 --- a/vllm/lora/worker_manager.py +++ b/vllm/lora/worker_manager.py @@ -217,13 +217,17 @@ def _apply_adapters(self, lora_requests: Set[LoRARequest]) -> None: def add_adapter(self, lora_request: LoRARequest) -> bool: if lora_request.lora_int_id not in self.list_adapters(): - # Remove before we load the new lora to save memory + # Load the new adapter first to ensure it is actually valid, before + # evicting any existing adapters. + # This may cause the # of cached lora adapters to very temporarily + # exceed `--max-cpu-loras`. + lora = self._load_adapter(lora_request) + loaded = self._adapter_manager.add_adapter(lora) + # If adding this adapter took us over capacity, evict the oldest one if len(self._adapter_manager) + 1 > self._adapter_manager.capacity: assert isinstance(self._adapter_manager, LRUCacheLoRAModelManager) self._adapter_manager.remove_oldest_adapter() - lora = self._load_adapter(lora_request) - loaded = self._adapter_manager.add_adapter(lora) else: # If the lora is already loaded, just touch it to # update its position in the caches From f0e238e64110c1e6ba331eb90edf5e3821af48c9 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Wed, 8 Jan 2025 13:44:56 -0700 Subject: [PATCH 08/13] :bug: fix lora id counter Signed-off-by: Joe Runde --- vllm/entrypoints/openai/serving_models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/entrypoints/openai/serving_models.py b/vllm/entrypoints/openai/serving_models.py index d65ada107207..bfa95b1accdc 100644 --- a/vllm/entrypoints/openai/serving_models.py +++ b/vllm/entrypoints/openai/serving_models.py @@ -62,7 +62,6 @@ def __init__( self.max_model_len = model_config.max_model_len self.engine_client = engine_client - self.lora_id_counter = AtomicCounter(0) self.lora_requests = [] if lora_modules is not None: self.lora_requests = [ @@ -75,6 +74,7 @@ def __init__( self.base_model_paths[0].name) for i, lora in enumerate(lora_modules, start=1) ] + self.lora_id_counter = AtomicCounter(len(self.lora_requests)) self.prompt_adapter_requests = [] if prompt_adapters is not None: From 7d4f033313d14ab593face46447c871379d57b97 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Wed, 8 Jan 2025 14:18:44 -0700 Subject: [PATCH 09/13] :test_tube: add lora robustness test Signed-off-by: Joe Runde --- .../entrypoints/openai/test_lora_adapters.py | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/tests/entrypoints/openai/test_lora_adapters.py b/tests/entrypoints/openai/test_lora_adapters.py index 3cbeb180e526..72bc9708eb85 100644 --- a/tests/entrypoints/openai/test_lora_adapters.py +++ b/tests/entrypoints/openai/test_lora_adapters.py @@ -1,5 +1,7 @@ +import asyncio import json import shutil +from contextlib import suppress import openai # use the official client for 
correctness check import pytest @@ -163,3 +165,72 @@ async def test_dynamic_lora_invalid_lora_rank(client: openai.AsyncOpenAI, "lora_name": "invalid-json", "lora_path": str(invalid_rank) }) + + +@pytest.mark.asyncio +async def test_loading_invalid_adapters_does_not_break_others( + client: openai.AsyncOpenAI, tmp_path, zephyr_lora_files): + + invalid_files = tmp_path / "invalid_files" + invalid_files.mkdir() + (invalid_files / "adapter_config.json").write_text("this is not json") + + stop_good_requests_event = asyncio.Event() + + async def run_good_requests(client): + # Run chat completions requests until event set + + results = [] + + while not stop_good_requests_event.is_set(): + try: + batch = await client.completions.create( + model="zephyr-lora", + prompt=["Hello there", "Foo bar bazz buzz"], + max_tokens=5, + ) + results.append(batch) + except Exception as e: + results.append(e) + + return results + + # Create task to run good requests + good_task = asyncio.create_task(run_good_requests(client)) + + # Run a bunch of bad adapter loads + for _ in range(25): + with suppress(openai.NotFoundError): + await client.post("load_lora_adapter", + cast_to=str, + body={ + "lora_name": "notfound", + "lora_path": "/not/an/adapter" + }) + for _ in range(25): + with suppress(openai.BadRequestError): + await client.post("load_lora_adapter", + cast_to=str, + body={ + "lora_name": "invalid", + "lora_path": str(invalid_files) + }) + + # Ensure all the running requests with lora adapters succeeded + stop_good_requests_event.set() + results = await good_task + for r in results: + assert not isinstance(r, Exception), f"Got exception {r}" + + # Ensure we can load another adapter and run it + await client.post("load_lora_adapter", + cast_to=str, + body={ + "lora_name": "valid", + "lora_path": zephyr_lora_files + }) + await client.completions.create( + model="valid", + prompt=["Hello there", "Foo bar bazz buzz"], + max_tokens=5, + ) From 9335d4dd2995ab6a42defdf36bb90b4c6cffa09d Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Wed, 8 Jan 2025 14:24:57 -0700 Subject: [PATCH 10/13] :bug: fixup LRU eviction Signed-off-by: Joe Runde --- vllm/lora/worker_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/lora/worker_manager.py b/vllm/lora/worker_manager.py index 177cc9ab0ac6..833c7c058fad 100644 --- a/vllm/lora/worker_manager.py +++ b/vllm/lora/worker_manager.py @@ -224,7 +224,7 @@ def add_adapter(self, lora_request: LoRARequest) -> bool: lora = self._load_adapter(lora_request) loaded = self._adapter_manager.add_adapter(lora) # If adding this adapter took us over capacity, evict the oldest one - if len(self._adapter_manager) + 1 > self._adapter_manager.capacity: + if len(self._adapter_manager) > self._adapter_manager.capacity: assert isinstance(self._adapter_manager, LRUCacheLoRAModelManager) self._adapter_manager.remove_oldest_adapter() From e3072104fad7c9c5384a6fb2de983d5974751cd0 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Wed, 8 Jan 2025 15:50:38 -0700 Subject: [PATCH 11/13] :test_tube: stress test dynamic loras Signed-off-by: Joe Runde --- .../entrypoints/openai/test_lora_adapters.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/entrypoints/openai/test_lora_adapters.py b/tests/entrypoints/openai/test_lora_adapters.py index 72bc9708eb85..46a064f6d9e6 100644 --- a/tests/entrypoints/openai/test_lora_adapters.py +++ b/tests/entrypoints/openai/test_lora_adapters.py @@ -167,6 +167,39 @@ async def test_dynamic_lora_invalid_lora_rank(client: 
openai.AsyncOpenAI, }) +@pytest.mark.asyncio +async def test_multiple_lora_adapters(client: openai.AsyncOpenAI, tmp_path, + zephyr_lora_files): + """Validate that many loras can be dynamically registered and inferenced + with concurrently""" + + # This test file configures the server with --max-cpu-loras=2 and this test + # will concurrently load 10 adapters, so it should flex the LRU cache + async def load_and_run_adapter(adapter_name: str): + await client.post("load_lora_adapter", + cast_to=str, + body={ + "lora_name": adapter_name, + "lora_path": str(zephyr_lora_files) + }) + for _ in range(3): + await client.completions.create( + model=adapter_name, + prompt=["Hello there", "Foo bar bazz buzz"], + max_tokens=5, + ) + + lora_tasks = [] + for i in range(10): + lora_tasks.append( + asyncio.create_task(load_and_run_adapter(f"adapter_{i}"))) + + results, _ = await asyncio.wait(lora_tasks) + + for r in results: + assert not isinstance(r, Exception), f"Got exception {r}" + + @pytest.mark.asyncio async def test_loading_invalid_adapters_does_not_break_others( client: openai.AsyncOpenAI, tmp_path, zephyr_lora_files): From cf95295d07b5ad0e4a5fa7698275da259c6c0524 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Wed, 8 Jan 2025 16:38:01 -0700 Subject: [PATCH 12/13] :bug: crash on invalid static lora adapters Signed-off-by: Joe Runde --- .../entrypoints/openai/test_serving_models.py | 1 + tests/entrypoints/openai/test_shutdown.py | 27 +++++--------- vllm/entrypoints/openai/api_server.py | 5 +-- vllm/entrypoints/openai/serving_models.py | 35 +++++++++++-------- 4 files changed, 33 insertions(+), 35 deletions(-) diff --git a/tests/entrypoints/openai/test_serving_models.py b/tests/entrypoints/openai/test_serving_models.py index 4ac03e04bc7e..657ea20213ec 100644 --- a/tests/entrypoints/openai/test_serving_models.py +++ b/tests/entrypoints/openai/test_serving_models.py @@ -31,6 +31,7 @@ async def _async_serving_models_init() -> OpenAIServingModels: model_config=mock_model_config, lora_modules=None, prompt_adapters=None) + await serving_models.init_static_loras() return serving_models diff --git a/tests/entrypoints/openai/test_shutdown.py b/tests/entrypoints/openai/test_shutdown.py index 6fcc92022855..090523a836e1 100644 --- a/tests/entrypoints/openai/test_shutdown.py +++ b/tests/entrypoints/openai/test_shutdown.py @@ -1,6 +1,3 @@ -import json -import os - import openai import pytest @@ -10,16 +7,7 @@ @pytest.mark.asyncio -async def test_shutdown_on_engine_failure(tmp_path): - # Use a bad adapter to crash the engine - # (This test will fail when that bug is fixed) - adapter_path = tmp_path / "bad_adapter" - os.mkdir(adapter_path) - with open(adapter_path / "adapter_model_config.json", "w") as f: - json.dump({"not": "real"}, f) - with open(adapter_path / "adapter_model.safetensors", "wb") as f: - f.write(b"this is fake") - +async def test_shutdown_on_engine_failure(): # dtype, max-len etc set so that this can run in CI args = [ "--dtype", @@ -29,9 +17,6 @@ async def test_shutdown_on_engine_failure(tmp_path): "--enforce-eager", "--max-num-seqs", "128", - "--enable-lora", - "--lora-modules", - f"bad-adapter={tmp_path / 'bad_adapter'}", ] with RemoteOpenAIServer(MODEL_NAME, args) as remote_server: @@ -39,9 +24,13 @@ async def test_shutdown_on_engine_failure(tmp_path): with pytest.raises( (openai.APIConnectionError, openai.InternalServerError)): - # This crashes the engine - await client.completions.create(model="bad-adapter", - prompt="Hello, my name is") + # Asking for lots of prompt logprobs will currently 
crash the + # engine. This may change in the future when that bug is fixed + prompt = "Hello " * 4000 + await client.completions.create( + model=MODEL_NAME, + prompt=prompt, + extra_body={"prompt_logprobs": 10}) # Now the server should shut down return_code = remote_server.proc.wait(timeout=8) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 2b563a320a3b..5f00f29fbf66 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -603,7 +603,7 @@ async def add_request_id(request: Request, call_next): return app -def init_app_state( +async def init_app_state( engine_client: EngineClient, model_config: ModelConfig, state: State, @@ -637,6 +637,7 @@ def init_app_state( lora_modules=args.lora_modules, prompt_adapters=args.prompt_adapters, ) + await state.openai_serving_models.init_static_loras() state.openai_serving_chat = OpenAIServingChat( engine_client, model_config, @@ -734,7 +735,7 @@ def signal_handler(*_) -> None: app = build_app(args) model_config = await engine_client.get_model_config() - init_app_state(engine_client, model_config, app.state, args) + await init_app_state(engine_client, model_config, app.state, args) shutdown_task = await serve_http( app, diff --git a/vllm/entrypoints/openai/serving_models.py b/vllm/entrypoints/openai/serving_models.py index bfa95b1accdc..a222eafadcb6 100644 --- a/vllm/entrypoints/openai/serving_models.py +++ b/vllm/entrypoints/openai/serving_models.py @@ -62,19 +62,9 @@ def __init__( self.max_model_len = model_config.max_model_len self.engine_client = engine_client - self.lora_requests = [] - if lora_modules is not None: - self.lora_requests = [ - LoRARequest(lora_name=lora.name, - lora_int_id=i, - lora_path=lora.path, - base_model_name=lora.base_model_name - if lora.base_model_name - and self.is_base_model(lora.base_model_name) else - self.base_model_paths[0].name) - for i, lora in enumerate(lora_modules, start=1) - ] - self.lora_id_counter = AtomicCounter(len(self.lora_requests)) + self.static_lora_modules = lora_modules + self.lora_requests: List[LoRARequest] = [] + self.lora_id_counter = AtomicCounter(0) self.prompt_adapter_requests = [] if prompt_adapters is not None: @@ -90,6 +80,19 @@ def __init__( prompt_adapter_local_path=prompt_adapter.local_path, prompt_adapter_num_virtual_tokens=num_virtual_tokens)) + async def init_static_loras(self): + """Loads all static LoRA modules. 
+ Raises if any fail to load""" + if self.static_lora_modules is None: + return + for lora in self.static_lora_modules: + load_request = LoadLoraAdapterRequest(lora_path=lora.path, + lora_name=lora.name) + load_result = await self.load_lora_adapter( + request=load_request, base_model_name=lora.base_model_name) + if isinstance(load_result, ErrorResponse): + raise ValueError(load_result.message) + def is_base_model(self, model_name): return any(model.name == model_name for model in self.base_model_paths) @@ -135,7 +138,9 @@ async def show_available_models(self) -> ModelList: async def load_lora_adapter( self, - request: LoadLoraAdapterRequest) -> Union[ErrorResponse, str]: + request: LoadLoraAdapterRequest, + base_model_name: Optional[str] = None + ) -> Union[ErrorResponse, str]: error_check_ret = await self._check_load_lora_adapter_request(request) if error_check_ret is not None: return error_check_ret @@ -145,6 +150,8 @@ async def load_lora_adapter( lora_request = LoRARequest(lora_name=lora_name, lora_int_id=unique_id, lora_path=lora_path) + if base_model_name is not None and self.is_base_model(base_model_name): + lora_request.base_model_name = base_model_name # Validate that the adapter can be loaded into the engine # This will also pre-load it for incoming requests From a9bdcdf2be792f585c1bfdda5e64084578959c97 Mon Sep 17 00:00:00 2001 From: Joe Runde Date: Thu, 9 Jan 2025 12:54:03 -0700 Subject: [PATCH 13/13] :recycle: re-order lora lru cache logic Signed-off-by: Joe Runde --- vllm/lora/worker_manager.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/vllm/lora/worker_manager.py b/vllm/lora/worker_manager.py index 833c7c058fad..eec462743fe9 100644 --- a/vllm/lora/worker_manager.py +++ b/vllm/lora/worker_manager.py @@ -219,15 +219,18 @@ def add_adapter(self, lora_request: LoRARequest) -> bool: if lora_request.lora_int_id not in self.list_adapters(): # Load the new adapter first to ensure it is actually valid, before # evicting any existing adapters. - # This may cause the # of cached lora adapters to very temporarily + # This may cause the # of loaded lora adapters to very temporarily # exceed `--max-cpu-loras`. lora = self._load_adapter(lora_request) - loaded = self._adapter_manager.add_adapter(lora) - # If adding this adapter took us over capacity, evict the oldest one - if len(self._adapter_manager) > self._adapter_manager.capacity: + + # Loading succeeded, now check if we will exceed cache capacity and + # evict if the oldest adapter if so + if len(self._adapter_manager) + 1 > self._adapter_manager.capacity: assert isinstance(self._adapter_manager, LRUCacheLoRAModelManager) self._adapter_manager.remove_oldest_adapter() + # Then add the new adapter to the cache + loaded = self._adapter_manager.add_adapter(lora) else: # If the lora is already loaded, just touch it to # update its position in the caches