Skip to content

Commit a8145ab

Browse files
authored
Merge pull request #38 from WorkflowAI/documentation-streaming
add streaming examples
2 parents 78dcbc6 + b57fb67 commit a8145ab

File tree

2 files changed

+137
-10
lines changed

2 files changed

+137
-10
lines changed

README.md

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -147,14 +147,15 @@ transcript = '''
147147
[00:04:30] Customer: But I think the pricing for additional users is a bit steep compared to other solutions we looked at.
148148
'''
149149

150-
# Analyze the feedback
151-
result = await analyze_call_feedback(
152-
CallFeedbackInput(
153-
transcript=transcript,
154-
call_date=date(2024, 1, 15)
155-
)
150+
# Create input
151+
feedback_input = CallFeedbackInput(
152+
transcript=transcript,
153+
call_date=date(2024, 1, 15)
156154
)
157155

156+
# Analyze the feedback
157+
result = await analyze_call_feedback(feedback_input)
158+
158159
# Print the analysis
159160
print("\nPositive Points:")
160161
for point in result.positive_points:
@@ -212,7 +213,7 @@ more flexible than changing the function parameters when running in production.
212213

213214
```python
214215
@workflowai.agent(deployment="production") # or simply @workflowai.agent()
215-
async def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
216+
def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
216217
...
217218
```
218219

@@ -230,7 +231,7 @@ async def analyze_call_feedback(input: CallFeedbackInput) -> Run[CallFeedbackOut
230231
...
231232

232233

233-
run = await say_hello(Input(name="John"))
234+
run = await analyze_call_feedback(feedback_input)
234235
print(run.output) # the output, as before
235236
print(run.model) # the model used for the run
236237
print(run.cost_usd) # the cost of the run in USD
@@ -241,16 +242,55 @@ print(run.duration_seconds) # the duration of the inference in seconds
241242

242243
You can configure the agent function to stream by changing the type annotation to an AsyncIterator.
243244

245+
#### Streaming the output only
246+
247+
Use `AsyncIterator[Output]` to get the **output** as it is generated.
248+
244249
```python
250+
from collections.abc import AsyncIterator
251+
245252
# Stream the output, the output is filled as it is generated
246253
@workflowai.agent()
247-
async def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[CallFeedbackOutput]:
254+
def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[CallFeedbackOutput]:
248255
...
249256

257+
async for chunk in analyze_call_feedback(feedback_input):
258+
# Just get the output as it's generated
259+
print(chunk.output)
260+
```
261+
262+
> Note: there is no need to mark the agent as async here! It is already asynchronous since it returns an AsyncIterator.
263+
> Type checkers sometimes get confused, since they consider that an async function returning an AsyncIterator is
264+
> async twice.
265+
> For example, a function with the signature `async def foo() -> AsyncIterator[int]` may be called
266+
> `async for c in await foo():...` in certain cases...
267+
268+
269+
#### Streaming the run object
270+
271+
Use `AsyncIterator[Run[Output]]` to get the **run** object as it is generated, which allows you, for the **last chunk**, to access the cost and duration of the run.
272+
273+
```python
274+
import workflowai
275+
from workflowai import Run
276+
from collections.abc import AsyncIterator
277+
250278
# Stream the run object, the output is filled as it is generated
251279
@workflowai.agent()
252-
async def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
280+
def analyze_call_feedback(input: CallFeedbackInput) -> AsyncIterator[Run[CallFeedbackOutput]]:
253281
...
282+
283+
last_chunk = None
284+
285+
async for chunk in analyze_call_feedback(feedback_input):
286+
# Show output as it's generated
287+
print(chunk.output)
288+
last_chunk = chunk
289+
290+
if last_chunk:
291+
# Cost and duration are only available on the last chunk
292+
print(f"\nCost: ${last_chunk.cost_usd}")
293+
print(f"Latency: {last_chunk.duration_seconds:.2f}s")
254294
```
255295

256296
### Images

examples/streaming_summary.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import asyncio
2+
from collections.abc import AsyncIterator
3+
4+
from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType]
5+
6+
import workflowai
7+
from workflowai import Model, Run
8+
9+
10+
class TranslationInput(BaseModel):
    """Input for text translation."""

    # Source text to translate; the agent's instructions assume it is French.
    text: str = Field(description="The French text to translate.")
15+
16+
class TranslationOutput(BaseModel):
    """Output containing the translated text."""

    # NOTE(review): default="" presumably lets partially-streamed chunks
    # validate before any content has been generated — confirm against the
    # workflowai streaming behavior.
    translation: str = Field(
        default="",
        description="The text translated into English.",
    )
23+
24+
25+
# Agent stub: the body is intentionally `...` — workflowai supplies the
# implementation. The docstring below is runtime data (presumably sent to the
# model as the agent's instructions — confirm with the workflowai docs), so it
# must not be edited casually.
# Returning AsyncIterator[Run[...]] makes this function asynchronous without
# the `async` keyword, and streams Run objects whose final chunk carries cost
# and duration (per the README section this commit adds).
@workflowai.agent(id="french-translator", model=Model.CLAUDE_3_5_SONNET_LATEST)
def translate_to_english(_: TranslationInput) -> AsyncIterator[Run[TranslationOutput]]:
    """
    Translate French text into natural, fluent English.

    Guidelines:
    - Maintain the original tone and style
    - Ensure accurate translation of idioms and expressions
    - Preserve formatting and paragraph structure
    - Focus on clarity and readability in English
    """
    ...
37+
38+
39+
async def main():
    """Stream a French→English translation and report cost/latency at the end."""
    # Sample diary entry (French) used to demonstrate streaming translation.
    diary_entry = """
    Cher journal,

    Aujourd'hui, j'ai fait une magnifique randonnée dans les Alpes.
    Le temps était parfait, avec un ciel bleu éclatant et une légère brise.
    Les montagnes étaient encore couvertes de neige, créant un paysage à couper le souffle.
    Les sommets majestueux se dressaient devant moi comme des géants silencieux.

    En chemin, j'ai rencontré des randonneurs sympathiques qui m'ont partagé leur pique-nique.
    Nous avons échangé des histoires et des conseils sur les meilleurs sentiers de la région.

    Cette expérience restera gravée dans ma mémoire. La nature a vraiment le pouvoir de nous
    ressourcer et de nous faire oublier le stress du quotidien.

    À bientôt,
    Pierre
    """

    print("Starting translation...\n")

    # Remember the most recent chunk: cost/duration totals are only present
    # on the final one, which we inspect after the loop.
    last_chunk = None
    chunk_num = 1

    # use_cache="never" disables response caching. A cached run would return
    # instantly, making the incremental streaming impossible to observe in
    # this example.
    async for chunk in translate_to_english(TranslationInput(text=diary_entry), use_cache="never"):
        print(f"--- Translation Progress (Chunk {chunk_num}) ---")
        print(chunk.output.translation)
        print("-" * 50)
        chunk_num += 1
        last_chunk = chunk

    if last_chunk:
        # Totals (cost, latency) are populated only on the last streamed chunk.
        print(f"\nCost: ${last_chunk.cost_usd}")
        print(f"Latency: {last_chunk.duration_seconds:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())

0 commit comments

Comments
 (0)