
add streaming examples #38


Merged
merged 3 commits into from
Feb 4, 2025
60 changes: 50 additions & 10 deletions README.md
@@ -147,14 +147,15 @@
transcript = '''
[00:04:30] Customer: But I think the pricing for additional users is a bit steep compared to other solutions we looked at.
'''

-# Analyze the feedback
-result = await analyze_call_feedback(
-    CallFeedbackInput(
-        transcript=transcript,
-        call_date=date(2024, 1, 15)
-    )
-)
+# Create input
+feedback_input = CallFeedbackInput(
+    transcript=transcript,
+    call_date=date(2024, 1, 15)
+)
+
+# Analyze the feedback
+result = await analyze_call_feedback(feedback_input)

# Print the analysis
print("\nPositive Points:")
for point in result.positive_points:
@@ -212,7 +213,7 @@
more flexible than changing the function parameters when running in production.

```python
@workflowai.agent(deployment="production") # or simply @workflowai.agent()
-async def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
+def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
...
```

@@ -230,7 +231,7 @@
async def analyze_call_feedback(input: CallFeedbackInput) -> Run[CallFeedbackOutput]:
...


-run = await say_hello(Input(name="John"))
+run = await analyze_call_feedback(feedback_input)
print(run.output) # the output, as before
print(run.model) # the model used for the run
print(run.cost_usd) # the cost of the run in USD
@@ -241,16 +242,55 @@
print(run.duration_seconds) # the duration of the inference in seconds

You can configure the agent function to stream by changing the type annotation to an AsyncIterator.

#### Streaming the output only

Use `AsyncIterator[Output]` to get the **output** as it is generated.

```python
from collections.abc import AsyncIterator

# Stream the output, the output is filled as it is generated
@workflowai.agent()
-async def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[CallFeedbackOutput]:
+def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[CallFeedbackOutput]:
...

async for chunk in analyze_call_feedback(feedback_input):
# Just get the output as it's generated
print(chunk.output)
```

> Note: there is no need to mark the agent as `async` here! It is already asynchronous since it returns an AsyncIterator.
> Type checkers sometimes get confused, because they consider an async function that returns an AsyncIterator to be
> "async twice". For example, a function with the signature `async def foo() -> AsyncIterator[int]` may have to be called as
> `async for c in await foo(): ...` in certain cases.
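The point in the note can be illustrated without any agent framework: a plain (non-`async`) function can return an `AsyncIterator`, and callers consume it with `async for` directly, with no intermediate `await`. A minimal, framework-free sketch (the `count_up` name is purely illustrative):

```python
import asyncio
from collections.abc import AsyncIterator


# A plain `def` that returns an AsyncIterator: callers write
# `async for i in count_up(3)` directly, with no `await`.
def count_up(n: int) -> AsyncIterator[int]:
    async def gen() -> AsyncIterator[int]:
        for i in range(n):
            yield i

    return gen()


async def main() -> None:
    values = [i async for i in count_up(3)]
    print(values)  # [0, 1, 2]


asyncio.run(main())
```

Had `count_up` been declared `async def`, calling it would return a coroutine wrapping the iterator, forcing the awkward `async for i in await count_up(3)` form the note warns about.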


#### Streaming the run object

Use `AsyncIterator[Run[Output]]` to get the **run** object as it is generated; on the **last chunk**, the run object also gives you access to the cost and duration of the run.

```python
import workflowai
from workflowai import Run
from collections.abc import AsyncIterator

# Stream the run object, the output is filled as it is generated
@workflowai.agent()
-async def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
+def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
...

last_chunk = None

async for chunk in analyze_call_feedback(feedback_input):
# Show output as it's generated
print(chunk.output)
last_chunk = chunk

if last_chunk:
# Cost and duration are only available on the last chunk
print(f"\nCost: ${last_chunk.cost_usd}")
print(f"Latency: {last_chunk.duration_seconds:.2f}s")
```
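Since each streamed chunk carries the output filled in so far (as the comment in the snippet above suggests) rather than only the new tokens, printing `chunk.output` on every iteration repeats earlier text. A small framework-free helper can print only the newly added suffix; the `print_delta` name here is just for illustration:

```python
def print_delta(previous: str, current: str) -> str:
    """Print only the text added since the previous chunk."""
    if current.startswith(previous):
        # Normal case: the new chunk extends the previous one.
        print(current[len(previous):], end="", flush=True)
    else:
        # Fallback: the chunk was rewritten, so print it in full.
        print(current, end="", flush=True)
    return current


# Simulated accumulated outputs, as streamed chunks might arrive.
previous = ""
for accumulated in ["Hel", "Hello, wor", "Hello, world!"]:
    previous = print_delta(previous, accumulated)
print()  # final newline
```

Inside the `async for` loop above, you would call `previous = print_delta(previous, chunk.output.some_field)` on whichever string field of the output you want to display incrementally.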

### Images
87 changes: 87 additions & 0 deletions examples/streaming_summary.py
@@ -0,0 +1,87 @@
import asyncio
from collections.abc import AsyncIterator

from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType]

import workflowai
from workflowai import Model, Run


class TranslationInput(BaseModel):
"""Input for text translation."""

text: str = Field(description="The French text to translate.")


class TranslationOutput(BaseModel):
"""Output containing the translated text."""

translation: str = Field(
default="",
description="The text translated into English.",
)


@workflowai.agent(id="french-translator", model=Model.CLAUDE_3_5_SONNET_LATEST)
def translate_to_english(_: TranslationInput) -> AsyncIterator[Run[TranslationOutput]]:
"""
Translate French text into natural, fluent English.

Guidelines:
- Maintain the original tone and style
- Ensure accurate translation of idioms and expressions
- Preserve formatting and paragraph structure
- Focus on clarity and readability in English
"""
...


async def main():
# Example French text
french_text = """
Cher journal,

Aujourd'hui, j'ai fait une magnifique randonnée dans les Alpes.
Le temps était parfait, avec un ciel bleu éclatant et une légère brise.
Les montagnes étaient encore couvertes de neige, créant un paysage à couper le souffle.
Les sommets majestueux se dressaient devant moi comme des géants silencieux.

En chemin, j'ai rencontré des randonneurs sympathiques qui m'ont partagé leur pique-nique.
Nous avons échangé des histoires et des conseils sur les meilleurs sentiers de la région.

Cette expérience restera gravée dans ma mémoire. La nature a vraiment le pouvoir de nous
ressourcer et de nous faire oublier le stress du quotidien.

À bientôt,
Pierre
"""

print("Starting translation...\n")

# Keep track of the last chunk to get cost info
last_chunk = None
chunk_num = 1

# Stream the translation with run information
# We set use_cache='never' to prevent caching the response
# This ensures we can see the streaming effect in the example
# Otherwise, subsequent runs would return the cached result instantly,
# making it hard to observe the incremental streaming behavior
async for chunk in translate_to_english(TranslationInput(text=french_text), use_cache="never"):
print(f"--- Translation Progress (Chunk {chunk_num}) ---")
print(chunk.output.translation)
print("-" * 50)
chunk_num += 1
last_chunk = chunk

if last_chunk:
# Cost and duration are only available on the last streaming chunk
# since they represent the final totals for the complete run.
# We store the last chunk in last_chunk to access these values
# after streaming completes.
print(f"\nCost: ${last_chunk.cost_usd}")
print(f"Latency: {last_chunk.duration_seconds:.2f}s")


if __name__ == "__main__":
asyncio.run(main())