2020 create_trace_args ,
2121 add_to_trace ,
2222 parse_non_streaming_output_data ,
23+ parse_structured_output_data ,
2324)
2425
2526logger = logging .getLogger (__name__ )
@@ -60,6 +61,8 @@ def trace_async_openai(
6061 )
6162
6263 is_azure_openai = isinstance (client , openai .AsyncAzureOpenAI )
64+
65+ # Patch create method
6366 create_func = client .chat .completions .create
6467
6568 @wraps (create_func )
@@ -84,6 +87,34 @@ async def traced_create_func(*args, **kwargs):
8487 )
8588
8689 client .chat .completions .create = traced_create_func
90+
    # Patch the structured-outputs `parse` method as well, when the
    # installed openai SDK provides it (older SDK versions have no
    # `chat.completions.parse`).
    if hasattr(client.chat.completions, 'parse'):
        parse_func = client.chat.completions.parse

        @wraps(parse_func)
        async def traced_parse_func(*args, **kwargs):
            # Pop the user-supplied inference id so it is NOT forwarded to
            # the OpenAI API; `stream` is only peeked at, not removed.
            inference_id = kwargs.pop("inference_id", None)
            stream = kwargs.get("stream", False)

            if stream:
                # NOTE(review): returned without `await` — the streaming
                # handler appears to be an async generator, so awaiting this
                # coroutine hands the caller the generator itself. Confirm
                # this matches how the traced create path behaves.
                return handle_async_streaming_parse(
                    *args,
                    **kwargs,
                    parse_func=parse_func,
                    inference_id=inference_id,
                    is_azure_openai=is_azure_openai,
                )
            return await handle_async_non_streaming_parse(
                *args,
                **kwargs,
                parse_func=parse_func,
                inference_id=inference_id,
                is_azure_openai=is_azure_openai,
            )

        # Swap the patched coroutine in place of the SDK method.
        client.chat.completions.parse = traced_parse_func
117+
87118 return client
88119
89120
@@ -259,6 +290,189 @@ async def handle_async_non_streaming_create(
259290 except Exception as e :
260291 logger .error (
261292 "Failed to trace the create chat completion request with Openlayer. %s" , e
293+ )
294+
295+ return response
296+
297+
async def handle_async_streaming_parse(
    parse_func: callable,
    *args,
    is_azure_openai: bool = False,
    inference_id: Optional[str] = None,
    **kwargs,
) -> AsyncIterator[Any]:
    """Handles the parse method when streaming is enabled.

    Awaits ``parse_func`` to obtain the chunk stream, then re-yields every
    chunk to the caller while accumulating text / function-call output and
    timing metadata. Tracing happens best-effort in the ``finally`` block so
    instrumentation failures never break the user's stream.

    Parameters
    ----------
    parse_func : callable
        The parse method to handle.
    is_azure_openai : bool, optional
        Whether the client is an Azure OpenAI client, by default False
    inference_id : Optional[str], optional
        A user-generated inference id, by default None

    Returns
    -------
    AsyncIterator[Any]
        A generator that yields the chunks of the completion.
    """
    chunks = await parse_func(*args, **kwargs)

    # Create and return a new async generator that processes chunks
    collected_output_data = []
    collected_function_call = {
        "name": "",
        "arguments": "",
    }
    raw_outputs = []
    start_time = time.time()
    end_time = None
    first_token_time = None
    num_of_completion_tokens = None
    latency = None
    try:
        i = 0
        async for chunk in chunks:
            # Keep the full raw payload of every chunk for the trace.
            raw_outputs.append(chunk.model_dump())
            if i == 0:
                first_token_time = time.time()
            if i > 0:
                # NOTE(review): stays None for a single-chunk stream — a
                # one-chunk response reports no completion-token count.
                # Presumably mirrors the create handler; confirm.
                num_of_completion_tokens = i + 1
            i += 1

            delta = chunk.choices[0].delta

            # A delta carries exactly one of: plain content, a (legacy)
            # function_call fragment, or a tool_calls fragment.
            if delta.content:
                collected_output_data.append(delta.content)
            elif delta.function_call:
                if delta.function_call.name:
                    collected_function_call["name"] += delta.function_call.name
                if delta.function_call.arguments:
                    collected_function_call[
                        "arguments"
                    ] += delta.function_call.arguments
            elif delta.tool_calls:
                # NOTE(review): only the first tool call is accumulated —
                # parallel tool calls beyond index 0 are dropped; verify
                # against the create handler's behavior.
                if delta.tool_calls[0].function.name:
                    collected_function_call["name"] += delta.tool_calls[0].function.name
                if delta.tool_calls[0].function.arguments:
                    collected_function_call["arguments"] += delta.tool_calls[
                        0
                    ].function.arguments

            # Forward the chunk to the consumer unchanged.
            yield chunk

        end_time = time.time()
        latency = (end_time - start_time) * 1000
    # pylint: disable=broad-except
    except Exception as e:
        logger.error("Failed yield chunk. %s", e)
    finally:
        # Try to add step to the trace — runs even if the consumer abandons
        # the generator early.
        try:
            collected_output_data = [
                message for message in collected_output_data if message is not None
            ]
            if collected_output_data:
                output_data = "".join(collected_output_data)
            else:
                # No text content: treat the accumulated function/tool call
                # as the output (arguments decoded from the JSON fragments;
                # raises inside this try if the stream ended mid-fragment).
                collected_function_call["arguments"] = json.loads(
                    collected_function_call["arguments"]
                )
                output_data = collected_function_call

            trace_args = create_trace_args(
                end_time=end_time,
                inputs={"prompt": kwargs["messages"]},
                output=output_data,
                latency=latency,
                tokens=num_of_completion_tokens,
                # Prompt tokens are not reported on streamed chunks.
                prompt_tokens=0,
                completion_tokens=num_of_completion_tokens,
                model=kwargs.get("model"),
                model_parameters=get_model_parameters(kwargs),
                raw_output=raw_outputs,
                id=inference_id,
                metadata={
                    "timeToFirstToken": (
                        (first_token_time - start_time) * 1000
                        if first_token_time
                        else None
                    ),
                    "method": "parse",
                    "response_format": kwargs.get("response_format"),
                },
            )
            add_to_trace(
                **trace_args,
                is_azure_openai=is_azure_openai,
            )

        # pylint: disable=broad-except
        except Exception as e:
            logger.error(
                "Failed to trace the parse chat completion request with Openlayer. %s",
                e,
            )
418+
419+
async def handle_async_non_streaming_parse(
    parse_func: callable,
    *args,
    is_azure_openai: bool = False,
    inference_id: Optional[str] = None,
    **kwargs,
) -> Any:
    """Handles the parse method when streaming is disabled.

    Awaits the wrapped ``parse_func``, records latency around the call, and
    then attempts to log the structured completion to Openlayer. Tracing is
    strictly best-effort: any failure is logged and the original response is
    still returned to the caller.

    Parameters
    ----------
    parse_func : callable
        The parse method to handle.
    is_azure_openai : bool, optional
        Whether the client is an Azure OpenAI client, by default False
    inference_id : Optional[str], optional
        A user-generated inference id, by default None

    Returns
    -------
    Any
        The parsed completion response.
    """
    started_at = time.time()
    response = await parse_func(*args, **kwargs)
    finished_at = time.time()

    # Best-effort tracing — never let instrumentation break the user's call.
    try:
        usage = response.usage
        trace_args = create_trace_args(
            end_time=finished_at,
            inputs={"prompt": kwargs["messages"]},
            output=parse_structured_output_data(response),
            latency=(finished_at - started_at) * 1000,
            tokens=usage.total_tokens,
            prompt_tokens=usage.prompt_tokens,
            completion_tokens=usage.completion_tokens,
            model=response.model,
            model_parameters=get_model_parameters(kwargs),
            raw_output=response.model_dump(),
            id=inference_id,
            metadata={
                "method": "parse",
                "response_format": kwargs.get("response_format"),
            },
        )
        add_to_trace(is_azure_openai=is_azure_openai, **trace_args)
    # pylint: disable=broad-except
    except Exception as e:
        logger.error(
            "Failed to trace the parse chat completion request with Openlayer. %s", e
        )

    return response
0 commit comments