@@ -183,6 +183,74 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
183183				},
184184			},
185185		},
186+ 		{
187+ 			name : "invalid json; return body" ,
188+ 			requests : []* extProcPb.ProcessingRequest {
189+ 				{
190+ 					Request : & extProcPb.ProcessingRequest_RequestHeaders {
191+ 						RequestHeaders : & extProcPb.HttpHeaders {
192+ 							Headers : & configPb.HeaderMap {
193+ 								Headers : []* configPb.HeaderValue {
194+ 									{
195+ 										Key :   "hi" ,
196+ 										Value : "mom" ,
197+ 									},
198+ 								},
199+ 							},
200+ 						},
201+ 					},
202+ 				},
203+ 				{
204+ 					Request : & extProcPb.ProcessingRequest_RequestBody {
205+ 						RequestBody : & extProcPb.HttpBody {Body : []byte ("no healthy upstream" ), EndOfStream : true },
206+ 					},
207+ 				},
208+ 			},
209+ 			// The request body is not valid JSON, so unmarshaling fails and an immediate
210+ 			// BadRequest response is returned before any pod is selected.
211+ 			pods : map [* backend.Pod ]* backendmetrics.MetricsState {
212+ 				fakePod (0 ): {
213+ 					WaitingQueueSize :    0 ,
214+ 					KVCacheUsagePercent : 0.2 ,
215+ 					ActiveModels : map [string ]int {
216+ 						"foo" : 1 ,
217+ 						"bar" : 1 ,
218+ 					},
219+ 					WaitingModels : map [string ]int {},
220+ 				},
221+ 				fakePod (1 ): {
222+ 					WaitingQueueSize :    0 ,
223+ 					KVCacheUsagePercent : 0.1 ,
224+ 					ActiveModels : map [string ]int {
225+ 						"foo" :            1 ,
226+ 						"sql-lora-1fdg2" : 1 ,
227+ 					},
228+ 					WaitingModels : map [string ]int {},
229+ 				},
230+ 				fakePod (2 ): {
231+ 					WaitingQueueSize :    10 ,
232+ 					KVCacheUsagePercent : 0.2 ,
233+ 					ActiveModels : map [string ]int {
234+ 						"foo" : 1 ,
235+ 						"bar" : 1 ,
236+ 					},
237+ 					WaitingModels : map [string ]int {},
238+ 				},
239+ 			},
240+ 			wantErr : false ,
241+ 			wantResponses : []* extProcPb.ProcessingResponse {
242+ 				{
243+ 					Response : & extProcPb.ProcessingResponse_ImmediateResponse {
244+ 						ImmediateResponse : & extProcPb.ImmediateResponse {
245+ 							Status : & envoyTypePb.HttpStatus {
246+ 								Code : envoyTypePb .StatusCode_BadRequest ,
247+ 							},
248+ 							Body : []byte ("inference gateway: BadRequest - Error unmarshaling request body: no healthy upstream" ),
249+ 						},
250+ 					},
251+ 				},
252+ 			},
253+ 		},
186254		{
187255			name :     "select active lora, low queue" ,
188256			requests : integrationutils .GenerateStreamedRequestSet (logger , "test2" , "sql-lora" ),
@@ -407,6 +475,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
407475							Status : & envoyTypePb.HttpStatus {
408476								Code : envoyTypePb .StatusCode_TooManyRequests ,
409477							},
478+ 							Body : []byte ("inference gateway: InferencePoolResourceExhausted - failed to find target pod: inference gateway: Internal - no pods available for the given request" ),
410479						},
411480					},
412481				},
@@ -842,6 +911,106 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
842911				},
843912			},
844913		},
914+ 		{
915+ 			name : "Response is invalid json; return body" ,
916+ 			requests : []* extProcPb.ProcessingRequest {
917+ 				{
918+ 					Request : & extProcPb.ProcessingRequest_ResponseHeaders {
919+ 						ResponseHeaders : & extProcPb.HttpHeaders {
920+ 							Headers : & configPb.HeaderMap {
921+ 								Headers : []* configPb.HeaderValue {
922+ 									{
923+ 										Key :   "content-type" ,
924+ 										Value : "application/json" ,
925+ 									},
926+ 								},
927+ 							},
928+ 						},
929+ 					},
930+ 				},
931+ 				{
932+ 					Request : & extProcPb.ProcessingRequest_ResponseBody {
933+ 						ResponseBody : & extProcPb.HttpBody {Body : []byte ("no healthy upstream" ), EndOfStream : true },
934+ 					},
935+ 				},
936+ 			},
937+ 
938+ 			// The response body is not valid JSON; it is streamed back to the client
939+ 			// unchanged rather than parsed, so no pod selection occurs in this case.
940+ 			pods : map [* backend.Pod ]* backendmetrics.MetricsState {
941+ 				fakePod (0 ): {
942+ 					WaitingQueueSize :    4 ,
943+ 					KVCacheUsagePercent : 0.2 ,
944+ 					ActiveModels : map [string ]int {
945+ 						"foo" :            1 ,
946+ 						"bar" :            1 ,
947+ 						"sql-lora-1fdg3" : 1 ,
948+ 					},
949+ 					WaitingModels : map [string ]int {},
950+ 				},
951+ 				fakePod (1 ): {
952+ 					WaitingQueueSize :    0 ,
953+ 					KVCacheUsagePercent : 0.85 ,
954+ 					ActiveModels : map [string ]int {
955+ 						"foo" :            1 ,
956+ 						"sql-lora-1fdg3" : 1 ,
957+ 					},
958+ 					WaitingModels : map [string ]int {},
959+ 				},
960+ 				fakePod (2 ): {
961+ 					WaitingQueueSize :    10 ,
962+ 					KVCacheUsagePercent : 0.9 ,
963+ 					ActiveModels : map [string ]int {
964+ 						"foo" :            1 ,
965+ 						"sql-lora-1fdg3" : 1 ,
966+ 					},
967+ 					WaitingModels : map [string ]int {},
968+ 				},
969+ 			},
970+ 			wantErr : false ,
971+ 			wantResponses : []* extProcPb.ProcessingResponse {
972+ 				{
973+ 					Response : & extProcPb.ProcessingResponse_ResponseHeaders {
974+ 						ResponseHeaders : & extProcPb.HeadersResponse {
975+ 							Response : & extProcPb.CommonResponse {
976+ 								HeaderMutation : & extProcPb.HeaderMutation {
977+ 									SetHeaders : []* configPb.HeaderValueOption {
978+ 										{
979+ 											Header : & configPb.HeaderValue {
980+ 												Key :      "x-went-into-resp-headers" ,
981+ 												RawValue : []byte ("true" ),
982+ 											},
983+ 										},
984+ 										{
985+ 											Header : & configPb.HeaderValue {
986+ 												Key :      "content-type" ,
987+ 												RawValue : []uint8 ("application/json" ),
988+ 											},
989+ 										},
990+ 									},
991+ 								},
992+ 							},
993+ 						},
994+ 					},
995+ 				},
996+ 				{
997+ 					Response : & extProcPb.ProcessingResponse_ResponseBody {
998+ 						ResponseBody : & extProcPb.BodyResponse {
999+ 							Response : & extProcPb.CommonResponse {
1000+ 								BodyMutation : & extProcPb.BodyMutation {
1001+ 									Mutation : & extProcPb.BodyMutation_StreamedResponse {
1002+ 										StreamedResponse : & extProcPb.StreamedBodyResponse {
1003+ 											Body :        []byte ("no healthy upstream" ),
1004+ 											EndOfStream : true ,
1005+ 										},
1006+ 									},
1007+ 								},
1008+ 							},
1009+ 						},
1010+ 					},
1011+ 				},
1012+ 			},
1013+ 		},
8451014		{
8461015			name : "responsebody sent over a single request, but empty body with EndOfStream in the second request(this is how envoy operates); content-type is json, buffer" ,
8471016			requests : []* extProcPb.ProcessingRequest {
@@ -1261,7 +1430,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
12611430
12621431	for  _ , test  :=  range  tests  {
12631432		t .Run (test .name , func (t  * testing.T ) {
1264- 			client , cleanup  :=  setUpHermeticServer (t , test .pods ,  true )
1433+ 			client , cleanup  :=  setUpHermeticServer (t , test .pods )
12651434			t .Cleanup (cleanup )
12661435			responses , err  :=  integrationutils .StreamedRequest (t , client , test .requests , len (test .wantResponses ))
12671436
@@ -1290,14 +1459,13 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
12901459	}
12911460}
12921461
1293- func  setUpHermeticServer (t  * testing.T , podAndMetrics  map [* backend.Pod ]* backendmetrics.MetricsState ,  streamed   bool ) (client  extProcPb.ExternalProcessor_ProcessClient , cleanup  func ()) {
1462+ func  setUpHermeticServer (t  * testing.T , podAndMetrics  map [* backend.Pod ]* backendmetrics.MetricsState ) (client  extProcPb.ExternalProcessor_ProcessClient , cleanup  func ()) {
12941463	// Reconfigure the TestPodMetricsClient. 
12951464	res  :=  map [types.NamespacedName ]* backendmetrics.MetricsState {}
12961465	for  pod , metrics  :=  range  podAndMetrics  {
12971466		res [pod .NamespacedName ] =  metrics 
12981467	}
12991468	serverRunner .TestPodMetricsClient .SetRes (res )
1300- 	serverRunner .UseStreaming  =  streamed 
13011469
13021470	serverCtx , stopServer  :=  context .WithCancel (context .Background ())
13031471
0 commit comments