1
1
import copy
2
2
import json
3
3
import uuid
4
- from typing import Union
4
+ from abc import ABC , abstractmethod
5
+ from typing import Callable , Dict , Union
5
6
6
7
import structlog
7
8
from fastapi .responses import JSONResponse , StreamingResponse
8
9
from litellm import ModelResponse
9
10
from litellm .types .utils import Delta , StreamingChoices
10
- from ollama import ChatResponse
11
+ from ollama import ChatResponse , GenerateResponse
11
12
12
13
from codegate .db import models as db_models
13
14
from codegate .muxing import rulematcher
@@ -30,12 +31,13 @@ class BodyAdapter:
30
31
31
32
def _get_provider_formatted_url(self, model_route: rulematcher.ModelRoute) -> str:
    """Get the provider formatted URL to use in base_url. Note this value comes from DB"""
    # Normalize away any trailing slash the DB-stored endpoint may carry.
    endpoint = model_route.endpoint.endpoint.rstrip("/")
    # OpenAI-dialect providers expose their API under the /v1 prefix.
    needs_v1_suffix = model_route.endpoint.provider_type in (
        db_models.ProviderType.openai,
        db_models.ProviderType.openrouter,
    )
    return f"{endpoint}/v1" if needs_v1_suffix else endpoint
39
41
40
42
def set_destination_info (self , model_route : rulematcher .ModelRoute , data : dict ) -> dict :
41
43
"""Set the destination provider info."""
@@ -45,15 +47,91 @@ def set_destination_info(self, model_route: rulematcher.ModelRoute, data: dict)
45
47
return new_data
46
48
47
49
48
- class StreamChunkFormatter :
50
class OutputFormatter(ABC):
    """Interface for adapting a destination provider's response to the OpenAI
    format that muxing clients expect."""

    @property
    @abstractmethod
    def provider_format_funcs(self) -> Dict[str, Callable]:
        """
        Return the provider specific format functions. All providers format functions should
        return the chunk in OpenAI format.
        """

    @abstractmethod
    def format(
        self, response: Union[StreamingResponse, JSONResponse], dest_prov: db_models.ProviderType
    ) -> Union[StreamingResponse, JSONResponse]:
        """Format the response to the client."""
68
+
69
class StreamChunkFormatter(OutputFormatter):
    """
    Format a single chunk from a stream to OpenAI format.
    We need to configure the client to expect the OpenAI format.
    In Continue this means setting "provider": "openai" in the config json file.
    """

    @property
    @abstractmethod
    def provider_format_funcs(self) -> Dict[str, Callable]:
        """
        Return the provider specific format functions. All providers format functions should
        return the chunk in OpenAI format.
        """
        pass

    def _format_openai(self, chunk: str) -> str:
        """
        The chunk is already in OpenAI format. To standarize remove the "data:" prefix.

        This function is used by both chat and FIM formatters
        """
        cleaned_chunk = chunk.split("data:")[1].strip()
        return cleaned_chunk

    def _format_as_openai_chunk(self, formatted_chunk: str) -> str:
        """Format the chunk as OpenAI chunk. This is the format how the clients expect the data."""
        return f"data:{formatted_chunk}\n\n"

    async def _format_streaming_response(
        self, response: StreamingResponse, dest_prov: db_models.ProviderType
    ):
        """Format the streaming response to OpenAI format."""
        # format() has already validated dest_prov, so direct indexing is safe here.
        format_func = self.provider_format_funcs[dest_prov]
        async for chunk in response.body_iterator:
            openai_chunk = format_func(chunk)
            # Sometimes for Anthropic we couldn't get content from the chunk. Skip it.
            if not openai_chunk:
                continue
            yield self._format_as_openai_chunk(openai_chunk)

    def format(
        self, response: StreamingResponse, dest_prov: db_models.ProviderType
    ) -> StreamingResponse:
        """Format the response to the client.

        Raises:
            MuxingAdapterError: if no format function is registered for dest_prov.
        """
        # Fail fast, before any bytes are streamed, when the provider is unsupported.
        # This mirrors the explicit error the pre-refactor formatter raised instead of
        # letting a None lookup blow up with a TypeError mid-stream.
        if dest_prov not in self.provider_format_funcs:
            raise MuxingAdapterError(f"Provider {dest_prov} not supported.")
        return StreamingResponse(
            self._format_streaming_response(response, dest_prov),
            status_code=response.status_code,
            headers=response.headers,
            background=response.background,
            media_type=response.media_type,
        )
121
+
122
+
123
+ class ChatStreamChunkFormatter (StreamChunkFormatter ):
124
+ """
125
+ Format a single chunk from a stream to OpenAI format given that the request was a chat.
126
+ """
127
+
128
+ @property
129
+ def provider_format_funcs (self ) -> Dict [str , Callable ]:
130
+ """
131
+ Return the provider specific format functions. All providers format functions should
132
+ return the chunk in OpenAI format.
133
+ """
134
+ return {
57
135
db_models .ProviderType .ollama : self ._format_ollama ,
58
136
db_models .ProviderType .openai : self ._format_openai ,
59
137
db_models .ProviderType .anthropic : self ._format_antropic ,
@@ -68,7 +146,7 @@ def _format_ollama(self, chunk: str) -> str:
68
146
try :
69
147
chunk_dict = json .loads (chunk )
70
148
ollama_chunk = ChatResponse (** chunk_dict )
71
- open_ai_chunk = OLlamaToModel .normalize_chunk (ollama_chunk )
149
+ open_ai_chunk = OLlamaToModel .normalize_chat_chunk (ollama_chunk )
72
150
return open_ai_chunk .model_dump_json (exclude_none = True , exclude_unset = True )
73
151
except Exception :
74
152
return chunk
@@ -119,46 +197,54 @@ def _format_antropic(self, chunk: str) -> str:
119
197
except Exception :
120
198
return cleaned_chunk .strip ()
121
199
122
- def format (self , chunk : str , dest_prov : db_models .ProviderType ) -> ModelResponse :
123
- """Format the chunk to OpenAI format."""
124
- # Get the format function
125
- format_func = self .provider_to_func .get (dest_prov )
126
- if format_func is None :
127
- raise MuxingAdapterError (f"Provider { dest_prov } not supported." )
128
- return format_func (chunk )
129
200
201
class FimStreamChunkFormatter(StreamChunkFormatter):
    """Format stream chunks to OpenAI format for fill-in-the-middle (FIM) requests."""

    @property
    def provider_format_funcs(self) -> Dict[str, Callable]:
        """
        Return the provider specific format functions. All providers format functions should
        return the chunk in OpenAI format.
        """
        # llamacpp and OpenRouter already emit OpenAI-dialect chunks, so they share
        # the plain OpenAI handler with the native OpenAI provider.
        openai_like = self._format_openai
        return {
            db_models.ProviderType.ollama: self._format_ollama,
            db_models.ProviderType.openai: openai_like,
            db_models.ProviderType.llamacpp: openai_like,
            db_models.ProviderType.openrouter: openai_like,
        }

    def _format_ollama(self, chunk: str) -> str:
        """Format the Ollama chunk to OpenAI format."""
        try:
            generate_response = GenerateResponse(**json.loads(chunk))
            normalized = OLlamaToModel.normalize_fim_chunk(generate_response)
            return normalized.model_dump_json(exclude_none=True, exclude_unset=True)
        except Exception:
            # On any parse/normalization failure, pass the raw chunk through untouched.
            return chunk
135
228
136
- def _format_as_openai_chunk (self , formatted_chunk : str ) -> str :
137
- """Format the chunk as OpenAI chunk. This is the format how the clients expect the data."""
138
- return f"data:{ formatted_chunk } \n \n "
139
229
140
- async def _format_streaming_response (
141
- self , response : StreamingResponse , dest_prov : db_models .ProviderType
142
- ):
143
- """Format the streaming response to OpenAI format."""
144
- async for chunk in response .body_iterator :
145
- openai_chunk = self .stream_formatter .format (chunk , dest_prov )
146
- # Sometimes for Anthropic we couldn't get content from the chunk. Skip it.
147
- if not openai_chunk :
148
- continue
149
- yield self ._format_as_openai_chunk (openai_chunk )
230
class ResponseAdapter:
    """Pick the right output formatter for a response and apply it."""

    def _get_formatter(
        self, response: Union[StreamingResponse, JSONResponse], is_fim_request: bool
    ) -> OutputFormatter:
        """Get the formatter based on the request type."""
        if not isinstance(response, StreamingResponse):
            raise MuxingAdapterError("Only streaming responses are supported.")
        return FimStreamChunkFormatter() if is_fim_request else ChatStreamChunkFormatter()

    def format_response_to_client(
        self,
        response: Union[StreamingResponse, JSONResponse],
        dest_prov: db_models.ProviderType,
        is_fim_request: bool,
    ) -> Union[StreamingResponse, JSONResponse]:
        """Format the response to the client."""
        formatter = self._get_formatter(response, is_fim_request)
        return formatter.format(response, dest_prov)
0 commit comments