Skip to content
22 changes: 22 additions & 0 deletions examples/01_basic_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
1. Basic agent creation with input/output models
2. Field descriptions and examples
3. Cost and latency tracking
4. How to fetch and analyze completions after a run
"""

import asyncio
from typing import Any

from pydantic import BaseModel, Field

Expand All @@ -17,6 +19,7 @@

class CityInput(BaseModel):
"""Input model for the city-to-capital agent."""

city: str = Field(
description="The name of the city for which to find the country's capital",
examples=["Paris", "New York", "Tokyo"],
Expand All @@ -25,6 +28,7 @@ class CityInput(BaseModel):

class CapitalOutput(BaseModel):
"""Output model containing information about the capital city."""

country: str = Field(
description="The country where the input city is located",
examples=["France", "United States", "Japan"],
Expand Down Expand Up @@ -57,6 +61,21 @@ async def get_capital_info(city_input: CityInput) -> Run[CapitalOutput]:
...


async def display_completions(run: Run[Any]) -> None:
"""Helper function to display completions for a run."""
try:
completions = await run.fetch_completions()

for completion in completions:
print("\n--- Completion Details ---")

# Use model_dump_json for clean serialization
completion_json = completion.model_dump_json(indent=2)
print(completion_json)
except (ValueError, workflowai.WorkflowAIError) as e:
print(f"Error: {e}")


async def main():
# Example 1: Basic usage with Paris
print("\nExample 1: Basic usage with Paris")
Expand All @@ -70,6 +89,9 @@ async def main():
run = await get_capital_info.run(CityInput(city="Tokyo"))
print(run)

# Fetch and display completions for the Tokyo example
await display_completions(run)


if __name__ == "__main__":
asyncio.run(main())
2 changes: 1 addition & 1 deletion examples/02_agent_with_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class HistoricalEventOutput(BaseModel):

@workflowai.agent(
id="historical-event-analyzer",
model=Model.GEMINI_1_5_FLASH_LATEST,
model=Model.GEMINI_2_0_FLASH_LATEST,
tools=[get_current_date, calculate_days_between],
)
async def analyze_historical_event(event_input: HistoricalEventInput) -> HistoricalEventOutput:
Expand Down
6 changes: 3 additions & 3 deletions examples/03_caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from pydantic import BaseModel, Field

import workflowai
from workflowai import Model, Run
from workflowai import Model

# Import CacheUsage type
CacheUsage = Literal["auto", "always", "never"]
Expand Down Expand Up @@ -54,7 +54,7 @@ class SOAPNote(BaseModel):
id="soap-extractor",
model=Model.LLAMA_3_3_70B,
)
async def extract_soap_notes(soap_input: SOAPInput) -> Run[SOAPNote]:
async def extract_soap_notes(soap_input: SOAPInput) -> SOAPNote:
"""
Extract SOAP notes from a medical consultation transcript.

Expand Down Expand Up @@ -91,7 +91,7 @@ async def demonstrate_caching(transcript: str):
for cache_option in cache_options:
start_time = time.time()

run = await extract_soap_notes(
run = await extract_soap_notes.run(
SOAPInput(transcript=transcript),
use_cache=cache_option,
)
Expand Down
14 changes: 7 additions & 7 deletions examples/04_audio_classifier_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType]

import workflowai
from workflowai import Model, Run
from workflowai.fields import File
from workflowai import Model
from workflowai.fields import Audio


class AudioInput(BaseModel):
"""Input containing the audio file to analyze."""
audio: File = Field(
audio: Audio = Field(
description="The audio recording to analyze for spam/robocall detection",
)

Expand Down Expand Up @@ -67,7 +67,7 @@ class AudioClassification(BaseModel):
id="audio-spam-detector",
model=Model.GEMINI_1_5_FLASH_LATEST,
)
async def classify_audio(audio_input: AudioInput) -> Run[AudioClassification]:
async def classify_audio(audio_input: AudioInput) -> AudioClassification:
"""
Analyze the audio recording to determine if it's a spam/robocall.

Expand Down Expand Up @@ -108,18 +108,18 @@ async def main():
with open(audio_path, "rb") as f: # noqa: ASYNC230
audio_data = f.read()

audio = File(
audio = Audio(
content_type="audio/mp3",
data=base64.b64encode(audio_data).decode(),
)

# Example 2: Using a URL instead of base64 (commented out)
# audio = File(
# audio = Audio(
# url="https://example.com/audio/call.mp3"
# )

# Classify the audio
run = await classify_audio(AudioInput(audio=audio))
run = await classify_audio.run(AudioInput(audio=audio))

# Print results including cost and latency information
print(run)
Expand Down
6 changes: 3 additions & 3 deletions examples/05_browser_text_uptime_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType]

import workflowai
from workflowai import Model, Run
from workflowai import Model


class UptimeInput(BaseModel):
Expand Down Expand Up @@ -40,7 +40,7 @@ class UptimeOutput(BaseModel):
id="uptime-checker",
model=Model.GPT_4O_MINI_LATEST,
)
async def check_uptime(uptime_input: UptimeInput, use_cache: str = "never") -> Run[UptimeOutput]:
async def check_uptime(uptime_input: UptimeInput, use_cache: str = "never") -> UptimeOutput:
"""
Fetch and analyze uptime data from an API status page.
Use @browser-text to get the page content.
Expand Down Expand Up @@ -68,7 +68,7 @@ async def main():
print("-" * 50)

# Get uptime data with caching disabled
run = await check_uptime(uptime_input, use_cache="never")
run = await check_uptime.run(uptime_input, use_cache="never")

# Print the run
print(run)
Expand Down
7 changes: 3 additions & 4 deletions examples/06_streaming_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
"""

import asyncio
from collections.abc import AsyncIterator

from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType]

import workflowai
from workflowai import Model, Run
from workflowai import Model


class TranslationInput(BaseModel):
Expand All @@ -30,7 +29,7 @@ class TranslationOutput(BaseModel):


@workflowai.agent(id="french-translator", model=Model.CLAUDE_3_5_SONNET_LATEST)
def translate_to_english(_: TranslationInput) -> AsyncIterator[Run[TranslationOutput]]:
async def translate_to_english(_: TranslationInput) -> TranslationOutput:
"""
Translate French text into natural, fluent English.

Expand Down Expand Up @@ -74,7 +73,7 @@ async def main():
# This ensures we can see the streaming effect in the example
# Otherwise, subsequent runs would return the cached result instantly,
# making it hard to observe the incremental streaming behavior
async for chunk in translate_to_english(TranslationInput(text=french_text), use_cache="never"):
async for chunk in translate_to_english.stream(TranslationInput(text=french_text), use_cache="never"):
print(f"--- Translation Progress (Chunk {chunk_num}) ---")
print(chunk.output.translation)
print("-" * 50)
Expand Down
18 changes: 8 additions & 10 deletions examples/08_pdf_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType]

import workflowai
from workflowai import Run, WorkflowAIError
from workflowai import WorkflowAIError
from workflowai.core.domain.model import Model
from workflowai.fields import File
from workflowai.fields import PDF


class PDFQuestionInput(BaseModel):
pdf: File = Field(description="The PDF document to analyze")
pdf: PDF = Field(description="The PDF document to analyze")
question: str = Field(description="The question to answer about the PDF content")


Expand All @@ -21,7 +21,7 @@ class PDFAnswerOutput(BaseModel):


@workflowai.agent(id="pdf-answer", model=Model.CLAUDE_3_5_SONNET_LATEST)
async def answer_pdf_question(_: PDFQuestionInput) -> Run[PDFAnswerOutput]:
async def answer_pdf_question(_: PDFQuestionInput) -> PDFAnswerOutput:
"""
Analyze the provided PDF document and answer the given question.
Provide a clear and concise answer based on the content found in the PDF.
Expand Down Expand Up @@ -49,23 +49,21 @@ async def run_pdf_answer():

content = base64.b64encode(pdf_file.read()).decode("utf-8")

pdf = File(content_type="application/pdf", data=content)
pdf = PDF(content_type="application/pdf", data=content)
# Could also pass the content via url
# pdf = File(url="https://example.com/sample.pdf")
# pdf = PDF(url="https://example.com/sample.pdf")
question = "How many stocks were sold? What is the total amount in USD?"

try:
agent_run = await answer_pdf_question(
run = await answer_pdf_question.run(
PDFQuestionInput(pdf=pdf, question=question),
use_cache="auto",
)
except WorkflowAIError as e:
print(f"Failed to run task. Code: {e.error.code}. Message: {e.error.message}")
return

print("\n--------\nAgent output:\n", agent_run.output, "\n--------\n")
print(f"Cost: ${agent_run.cost_usd:.10f}")
print(f"Latency: {agent_run.duration_seconds:.2f}s")
print(run)


if __name__ == "__main__":
Expand Down
6 changes: 3 additions & 3 deletions examples/09_reply.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType]

import workflowai
from workflowai import Model, Run
from workflowai import Model


class NameExtractionInput(BaseModel):
Expand All @@ -43,7 +43,7 @@ class NameExtractionOutput(BaseModel):


@workflowai.agent(id="name-extractor", model=Model.GPT_4O_MINI_LATEST)
async def extract_name(_: NameExtractionInput) -> Run[NameExtractionOutput]:
async def extract_name(_: NameExtractionInput) -> NameExtractionOutput:
"""
Extract a person's first and last name from a sentence.
Be precise and consider cultural variations in name formats.
Expand All @@ -64,7 +64,7 @@ async def main():
print(f"\nProcessing: {sentence}")

# Initial extraction
run = await extract_name(NameExtractionInput(sentence=sentence))
run = await extract_name.run(NameExtractionInput(sentence=sentence))

print(f"Extracted: {run.output.first_name} {run.output.last_name}")

Expand Down
10 changes: 5 additions & 5 deletions examples/10_calendar_event_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from pydantic import BaseModel, Field

import workflowai
from workflowai import Model, Run
from workflowai import Model
from workflowai.fields import File


Expand Down Expand Up @@ -79,7 +79,7 @@ class CalendarEventOutput(BaseModel):
id="calendar-event-extractor",
model=Model.GPT_4O_MINI_LATEST,
)
async def extract_calendar_event_from_email(email_input: EmailInput) -> Run[CalendarEventOutput]:
async def extract_calendar_event_from_email(email_input: EmailInput) -> CalendarEventOutput:
"""
Extract calendar event details from email content.

Expand Down Expand Up @@ -112,7 +112,7 @@ async def extract_calendar_event_from_email(email_input: EmailInput) -> Run[Cale
id="calendar-event-extractor",
model=Model.GPT_4O_MINI_LATEST,
)
async def extract_calendar_event_from_image(image_input: ImageInput) -> Run[CalendarEventOutput]:
async def extract_calendar_event_from_image(image_input: ImageInput) -> CalendarEventOutput:
"""
Extract calendar event details from an event poster or flyer image.

Expand Down Expand Up @@ -160,7 +160,7 @@ async def main():
""",
)

run = await extract_calendar_event_from_email(email1)
run = await extract_calendar_event_from_email.run(email1)
print(run)

# Example 2: Virtual meeting with more details
Expand Down Expand Up @@ -238,7 +238,7 @@ async def main():
)

try:
run = await extract_calendar_event_from_email(email4)
run = await extract_calendar_event_from_email.run(email4)
print(run)
except workflowai.WorkflowAIError as e:
print(f"As expected, no calendar event found: {e!s}")
Expand Down
8 changes: 4 additions & 4 deletions examples/12_contextual_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pydantic import BaseModel, Field

import workflowai
from workflowai import Model, Run
from workflowai import Model


class ContextGeneratorInput(BaseModel):
Expand Down Expand Up @@ -37,7 +37,7 @@ class ContextGeneratorOutput(BaseModel):
id="context-generator",
model=Model.CLAUDE_3_5_SONNET_LATEST,
)
async def generate_chunk_context(context_input: ContextGeneratorInput) -> Run[ContextGeneratorOutput]:
async def generate_chunk_context(context_input: ContextGeneratorInput) -> ContextGeneratorOutput:
"""
Here is the chunk we want to situate within the whole document.
Please give a short succinct context to situate this chunk within the overall document
Expand Down Expand Up @@ -80,9 +80,9 @@ async def main():
chunk_content=chunk_content,
)

run = await generate_chunk_context(context_input)
run = await generate_chunk_context.run(context_input)
print("\nGenerated Context:")
print(run.output.context)
print(run)


if __name__ == "__main__":
Expand Down
6 changes: 3 additions & 3 deletions examples/14_templated_instructions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from pydantic import BaseModel, Field

import workflowai
from workflowai import Model, Run
from workflowai import Model


class CodeReviewInput(BaseModel):
Expand Down Expand Up @@ -77,7 +77,7 @@ class CodeReviewOutput(BaseModel):
id="templated-code-reviewer",
model=Model.CLAUDE_3_5_SONNET_LATEST,
)
async def review_code(review_input: CodeReviewInput) -> Run[CodeReviewOutput]:
async def review_code(review_input: CodeReviewInput) -> CodeReviewOutput:
"""
Review code based on specified parameters and guidelines.

Expand Down Expand Up @@ -142,7 +142,7 @@ def calculate_sum(numbers):
return result
"""

run = await review_code(
run = await review_code.run(
CodeReviewInput(
language="python",
code=python_code,
Expand Down
Loading