diff --git a/README.md b/README.md
index cd90be3..026bf58 100644
--- a/README.md
+++ b/README.md
@@ -147,14 +147,15 @@ transcript = '''
 [00:04:30] Customer: But I think the pricing for additional users is a bit steep compared to other solutions we looked at.
 '''
 
-# Analyze the feedback
-result = await analyze_call_feedback(
-    CallFeedbackInput(
-        transcript=transcript,
-        call_date=date(2024, 1, 15)
-    )
+# Create input
+feedback_input = CallFeedbackInput(
+    transcript=transcript,
+    call_date=date(2024, 1, 15)
 )
 
+# Analyze the feedback
+result = await analyze_call_feedback(feedback_input)
+
 # Print the analysis
 print("\nPositive Points:")
 for point in result.positive_points:
@@ -212,7 +213,7 @@ more flexible than changing the function parameters when running in production.
 
 ```python
 @workflowai.agent(deployment="production") # or simply @workflowai.agent()
-async def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
+def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
     ...
 ```
 
@@ -230,7 +231,7 @@ async def analyze_call_feedback(input: CallFeedbackInput) -> Run[CallFeedbackOut
     ...
 
 
-run = await say_hello(Input(name="John"))
+run = await analyze_call_feedback(feedback_input)
 print(run.output) # the output, as before
 print(run.model) # the model used for the run
 print(run.cost_usd) # the cost of the run in USD
@@ -241,16 +242,58 @@ print(run.duration_seconds) # the duration of the inference in seconds
 
 You can configure the agent function to stream by changing the type annotation to an AsyncIterator.
 
+#### Streaming the output only
+
+Use `AsyncIterator[Output]` to get the **output** as it is generated.
+
 ```python
+from collections.abc import AsyncIterator
+
 # Stream the output, the output is filled as it is generated
 @workflowai.agent()
-async def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[CallFeedbackOutput]:
+def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[CallFeedbackOutput]:
     ...
 
+async for chunk in analyze_call_feedback(feedback_input):
+    # Just get the output as it's generated
+    print(chunk.output)
+```
+
+> Note: there is no need to mark the agent as `async` here! It is already asynchronous since it returns an
+> `AsyncIterator`. Type checkers sometimes get confused, treating an async function that returns an
+> `AsyncIterator` as asynchronous twice over.
+> For example, a function with the signature `async def foo() -> AsyncIterator[int]` may have to be called as
+> `async for c in await foo(): ...` in certain cases.
+
+
+#### Streaming the run object
+
+Use `AsyncIterator[Run[Output]]` to get the **run** object as it is generated, which lets you access the cost and duration of the run from the **last chunk**.
+
+```python
+import workflowai
+from workflowai import Run
+from collections.abc import AsyncIterator
+
 # Stream the run object, the output is filled as it is generated
 @workflowai.agent()
-async def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
+def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
     ...
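+
+# Each chunk is a Run object whose output is filled in as it is generated;
+# only the final chunk carries the run's cost and duration.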
+
+last_chunk = None
+
+async for chunk in analyze_call_feedback(feedback_input):
+    # Show output as it's generated
+    print(chunk.output)
+    last_chunk = chunk
+
+if last_chunk:
+    # Cost and duration are only available on the last chunk
+    print(f"\nCost: ${last_chunk.cost_usd}")
+    print(f"Latency: {last_chunk.duration_seconds:.2f}s")
 ```
 
 ### Images
diff --git a/examples/streaming_summary.py b/examples/streaming_summary.py
new file mode 100644
index 0000000..cd6b4f1
--- /dev/null
+++ b/examples/streaming_summary.py
@@ -0,0 +1,87 @@
+import asyncio
+from collections.abc import AsyncIterator
+
+from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType]
+
+import workflowai
+from workflowai import Model, Run
+
+
+class TranslationInput(BaseModel):
+    """Input for text translation."""
+
+    text: str = Field(description="The French text to translate.")
+
+
+class TranslationOutput(BaseModel):
+    """Output containing the translated text."""
+
+    translation: str = Field(
+        default="",
+        description="The text translated into English.",
+    )
+
+
+@workflowai.agent(id="french-translator", model=Model.CLAUDE_3_5_SONNET_LATEST)
+def translate_to_english(_: TranslationInput) -> AsyncIterator[Run[TranslationOutput]]:
+    """
+    Translate French text into natural, fluent English.
+
+    Guidelines:
+    - Maintain the original tone and style
+    - Ensure accurate translation of idioms and expressions
+    - Preserve formatting and paragraph structure
+    - Focus on clarity and readability in English
+    """
+    ...
+
+
+async def main():
+    # Example French text
+    french_text = """
+    Cher journal,
+
+    Aujourd'hui, j'ai fait une magnifique randonnée dans les Alpes.
+    Le temps était parfait, avec un ciel bleu éclatant et une légère brise.
+    Les montagnes étaient encore couvertes de neige, créant un paysage à couper le souffle.
+    Les sommets majestueux se dressaient devant moi comme des géants silencieux.
+
+    En chemin, j'ai rencontré des randonneurs sympathiques qui m'ont partagé leur pique-nique.
+    Nous avons échangé des histoires et des conseils sur les meilleurs sentiers de la région.
+
+    Cette expérience restera gravée dans ma mémoire. La nature a vraiment le pouvoir de nous
+    ressourcer et de nous faire oublier le stress du quotidien.
+
+    À bientôt,
+    Pierre
+    """
+
+    print("Starting translation...\n")
+
+    # Keep track of the last chunk to get cost info
+    last_chunk = None
+    chunk_num = 1
+
+    # Stream the translation with run information.
+    # We set use_cache="never" to prevent caching the response: otherwise,
+    # subsequent runs would return the cached result instantly, making it
+    # hard to observe the incremental streaming behavior this example is
+    # meant to demonstrate.
+    async for chunk in translate_to_english(TranslationInput(text=french_text), use_cache="never"):
+        print(f"--- Translation Progress (Chunk {chunk_num}) ---")
+        print(chunk.output.translation)
+        print("-" * 50)
+        chunk_num += 1
+        last_chunk = chunk
+
+    if last_chunk:
+        # Cost and duration are only available on the final streaming chunk,
+        # since they represent totals for the complete run. We kept a
+        # reference to it in `last_chunk` so we can read these values
+        # after streaming completes.
+        print(f"\nCost: ${last_chunk.cost_usd}")
+        print(f"Latency: {last_chunk.duration_seconds:.2f}s")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())