Hello, I've built an agent API with FastAPI, and I would like the API to return results as a stream. What's the best way to implement this? My current code is provided below:

```python
import json
from dataclasses import dataclass

from fastapi import FastAPI, HTTPException, Header, Depends, Request
from fastapi.responses import StreamingResponse

from autogen_core import (
    AgentId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    message_handler,
    type_subscription,
)
from autogen_core.models import ChatCompletionClient, SystemMessage, UserMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient

app = FastAPI()


@dataclass
class MyMessageType:
    content: str


async def verify_token(authorization: str = Header(..., alias="Authorization")):
    """Reject requests that do not carry the expected bearer token."""
    if authorization != "Bearer xxx":
        raise HTTPException(status_code=401, detail="Invalid token")
    return authorization


async def AgentGet_keywords(msg: str):
    # Helper that sends one message to the agent and returns the full reply
    # (not used by the endpoint below).
    message = MyMessageType(content=msg)
    response = await runtime.send_message(message, AgentId("simple_agent", "default"))
    return response.content


async def main(msg: str):
    # Async generator consumed by StreamingResponse: one JSON line for the
    # start marker, then one line with the agent's complete reply.
    yield json.dumps({'content': 'start'}) + '\n'
    message = MyMessageType(content=msg)
    response = await runtime.send_message(message, AgentId("simple_agent", "default"))
    yield json.dumps({'content': response.content}) + '\n'


@type_subscription(topic_type="simple_agent")
class KwsAgent(RoutedAgent):
    def __init__(self, name: str, model_client: ChatCompletionClient) -> None:
        super().__init__(name)
        self._system_messages = [SystemMessage(content="")]
        self._model_client = model_client

    @message_handler
    async def handle_user_message(self, message: MyMessageType, ctx: MessageContext) -> MyMessageType:
        # Prepare input to the chat completion model.
        user_message = UserMessage(content=message.content, source="user")
        response = await self._model_client.create(
            self._system_messages + [user_message], cancellation_token=ctx.cancellation_token
        )
        # Return the model's response.
        assert isinstance(response.content, str)
        return MyMessageType(content=response.content)


runtime = SingleThreadedAgentRuntime()


@app.post("/stream")
async def stream_data(request: Request, token: str = Depends(verify_token)):
    """Stream the agent's response as newline-delimited JSON."""
    json_data = await request.json()
    model_id = json_data.get('model', '')
    all_msg = json_data.get('messages', '')
    if isinstance(all_msg, list):
        latest_msg = all_msg[-1]['content']
        return StreamingResponse(main(latest_msg), media_type="text/plain")
    else:
        raise HTTPException(status_code=400, detail='messages must be a list')


@app.on_event("startup")
async def startup_event():
    await KwsAgent.register(
        runtime,
        "simple_agent",
        lambda: KwsAgent(
            'kwsagt',
            OpenAIChatCompletionClient(
                model="gemini-2.0-flash",
                api_key="test",
            ),
        ),
    )
    runtime.start()


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8501)
```

I've tried changing `self._model_client.create` to `self._model_client.create_stream` in `handle_user_message`, but I'm unsure how to retrieve the streaming results from the runtime in the `main` function.
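For reference, the change I tried inside `handle_user_message` looks roughly like this (a sketch only; `create_stream` yields `str` chunks followed by a final `CreateResult`):

```python
async for chunk in self._model_client.create_stream(
    self._system_messages + [user_message],
    cancellation_token=ctx.cancellation_token,
):
    if isinstance(chunk, str):
        ...  # each partial text chunk arrives here, but how do I get it
             # back out to the caller of runtime.send_message()?
```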
---
I am looking for the same feature.
---
Streaming can be done in many ways. In your scenario, the easiest way is to pass an `asyncio.Queue` to the agent: the agent pushes each chunk from `create_stream` onto the queue, while the endpoint's generator reads from the queue and yields each chunk to the client. Pass my response above to ChatGPT or Claude, and you should get closer to what you want.
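To make the suggestion concrete, here is a minimal, untested sketch of that wiring. It reuses `runtime`, `MyMessageType`, and `AgentId` from the question's code; the queue name, the `None` sentinel, and `stream_generator` are illustrative, not part of autogen-core:

```python
import asyncio
import json

# Global queue shared between the agent and the HTTP layer (sketch only;
# a single global queue can serve only one request at a time).
stream_queue: asyncio.Queue = asyncio.Queue()

# Inside KwsAgent.handle_user_message, switch create() to create_stream()
# and push each text chunk onto the queue as it arrives; the handler can
# still accumulate the chunks and return the full reply as before:
#
#     async for chunk in self._model_client.create_stream(
#         self._system_messages + [user_message],
#         cancellation_token=ctx.cancellation_token,
#     ):
#         if isinstance(chunk, str):
#             await stream_queue.put(chunk)
#     await stream_queue.put(None)  # sentinel: no more chunks


async def stream_generator(msg: str):
    """Feed StreamingResponse: start the agent call, then drain the queue."""
    task = asyncio.create_task(
        runtime.send_message(MyMessageType(content=msg), AgentId("simple_agent", "default"))
    )
    while True:
        chunk = await stream_queue.get()
        if chunk is None:
            break
        yield json.dumps({"content": chunk}) + "\n"
    await task  # propagate any exception raised inside the agent
```

In `stream_data`, return `StreamingResponse(stream_generator(latest_msg), media_type="application/x-ndjson")` instead of `main(latest_msg)`. For concurrent requests, create one queue per request and pass it along (for example, inside the message) rather than sharing a global.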
Thank you for your reply. Based on your suggestion, I created a global asynchronous queue using `asyncio.Queue` to fetch real-time messages from the agent, and it's working. I have also created an example and opened a pull request to add it to the examples section: (Add an example using autogen-core and FastAPI to create streaming responses by ToryPan · Pull Request #6335 · microsoft/autogen). I hope this example will be helpful to others.

Best regards,
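For anyone who wants to try such an endpoint, a hypothetical client that consumes the newline-delimited JSON stream with `httpx`; the URL, token, and payload simply mirror the placeholders in the question's code:

```python
import asyncio
import json

import httpx


async def consume() -> None:
    payload = {
        "model": "gemini-2.0-flash",
        "messages": [{"role": "user", "content": "Hello"}],
    }
    headers = {"Authorization": "Bearer xxx"}
    async with httpx.AsyncClient(timeout=None) as client:
        # stream() keeps the connection open so we can read line by line.
        async with client.stream(
            "POST", "http://localhost:8501/stream", json=payload, headers=headers
        ) as response:
            async for line in response.aiter_lines():
                if line:  # skip blank keep-alive lines
                    print(json.loads(line)["content"])


asyncio.run(consume())
```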