diff --git a/examples/01_basic_agent.py b/examples/01_basic_agent.py
index 93794cc..8d9f266 100644
--- a/examples/01_basic_agent.py
+++ b/examples/01_basic_agent.py
@@ -5,9 +5,11 @@
 1. Basic agent creation with input/output models
 2. Field descriptions and examples
 3. Cost and latency tracking
+4. How to fetch and analyze completions after a run
 """
 
 import asyncio
+from typing import Any
 
 from pydantic import BaseModel, Field
@@ -17,6 +19,7 @@ class CityInput(BaseModel):
     """Input model for the city-to-capital agent."""
+
     city: str = Field(
         description="The name of the city for which to find the country's capital",
         examples=["Paris", "New York", "Tokyo"],
@@ -25,6 +28,7 @@ class CapitalOutput(BaseModel):
     """Output model containing information about the capital city."""
+
     country: str = Field(
         description="The country where the input city is located",
         examples=["France", "United States", "Japan"],
@@ -57,6 +61,21 @@ async def get_capital_info(city_input: CityInput) -> Run[CapitalOutput]:
     ...
 
 
+async def display_completions(run: Run[Any]) -> None:
+    """Helper function to display completions for a run."""
+    try:
+        completions = await run.fetch_completions()
+
+        for completion in completions:
+            print("\n--- Completion Details ---")
+
+            # Use model_dump_json for clean serialization
+            completion_json = completion.model_dump_json(indent=2)
+            print(completion_json)
+    except (ValueError, workflowai.WorkflowAIError) as e:
+        print(f"Error: {e}")
+
+
 async def main():
     # Example 1: Basic usage with Paris
     print("\nExample 1: Basic usage with Paris")
@@ -70,6 +89,9 @@ async def main():
     run = await get_capital_info.run(CityInput(city="Tokyo"))
     print(run)
 
+    # Fetch and display completions for the Tokyo example
+    await display_completions(run)
+
 
 if __name__ == "__main__":
     asyncio.run(main())
diff --git a/examples/02_agent_with_tools.py b/examples/02_agent_with_tools.py
index 85289e5..9bfb008 100644
--- a/examples/02_agent_with_tools.py
+++ b/examples/02_agent_with_tools.py
@@ -62,7 +62,7 @@ class HistoricalEventOutput(BaseModel):
 
 @workflowai.agent(
     id="historical-event-analyzer",
-    model=Model.GEMINI_1_5_FLASH_LATEST,
+    model=Model.GEMINI_2_0_FLASH_LATEST,
     tools=[get_current_date, calculate_days_between],
 )
 async def analyze_historical_event(event_input: HistoricalEventInput) -> HistoricalEventOutput:
diff --git a/examples/03_caching.py b/examples/03_caching.py
index e55d85c..5e3bd7a 100644
--- a/examples/03_caching.py
+++ b/examples/03_caching.py
@@ -17,7 +17,7 @@
 from pydantic import BaseModel, Field
 
 import workflowai
-from workflowai import Model, Run
+from workflowai import Model
 
 # Import CacheUsage type
 CacheUsage = Literal["auto", "always", "never"]
@@ -54,7 +54,7 @@ class SOAPNote(BaseModel):
     id="soap-extractor",
     model=Model.LLAMA_3_3_70B,
 )
-async def extract_soap_notes(soap_input: SOAPInput) -> Run[SOAPNote]:
+async def extract_soap_notes(soap_input: SOAPInput) -> SOAPNote:
     """
     Extract SOAP notes from a medical consultation transcript.
@@ -91,7 +91,7 @@ async def demonstrate_caching(transcript: str):
     for cache_option in cache_options:
         start_time = time.time()
 
-        run = await extract_soap_notes(
+        run = await extract_soap_notes.run(
            SOAPInput(transcript=transcript),
            use_cache=cache_option,
         )
diff --git a/examples/04_audio_classifier_agent.py b/examples/04_audio_classifier_agent.py
index 8bf5a19..fb36246 100644
--- a/examples/04_audio_classifier_agent.py
+++ b/examples/04_audio_classifier_agent.py
@@ -13,13 +13,13 @@
 from pydantic import BaseModel, Field  # pyright: ignore [reportUnknownVariableType]
 
 import workflowai
-from workflowai import Model, Run
-from workflowai.fields import File
+from workflowai import Model
+from workflowai.fields import Audio
 
 
 class AudioInput(BaseModel):
     """Input containing the audio file to analyze."""
 
-    audio: File = Field(
+    audio: Audio = Field(
         description="The audio recording to analyze for spam/robocall detection",
     )
@@ -67,7 +67,7 @@ class AudioClassification(BaseModel):
     id="audio-spam-detector",
     model=Model.GEMINI_1_5_FLASH_LATEST,
 )
-async def classify_audio(audio_input: AudioInput) -> Run[AudioClassification]:
+async def classify_audio(audio_input: AudioInput) -> AudioClassification:
     """
     Analyze the audio recording to determine if it's a spam/robocall.
@@ -108,18 +108,18 @@ async def main():
     with open(audio_path, "rb") as f:  # noqa: ASYNC230
         audio_data = f.read()
 
-    audio = File(
+    audio = Audio(
         content_type="audio/mp3",
         data=base64.b64encode(audio_data).decode(),
     )
 
     # Example 2: Using a URL instead of base64 (commented out)
-    # audio = File(
+    # audio = Audio(
     #     url="https://example.com/audio/call.mp3"
     # )
 
     # Classify the audio
-    run = await classify_audio(AudioInput(audio=audio))
+    run = await classify_audio.run(AudioInput(audio=audio))
 
     # Print results including cost and latency information
     print(run)
diff --git a/examples/05_browser_text_uptime_agent.py b/examples/05_browser_text_uptime_agent.py
index 4e47624..a5d0243 100644
--- a/examples/05_browser_text_uptime_agent.py
+++ b/examples/05_browser_text_uptime_agent.py
@@ -11,7 +11,7 @@
 from pydantic import BaseModel, Field  # pyright: ignore [reportUnknownVariableType]
 
 import workflowai
-from workflowai import Model, Run
+from workflowai import Model
 
 
 class UptimeInput(BaseModel):
@@ -40,7 +40,7 @@ class UptimeOutput(BaseModel):
     id="uptime-checker",
     model=Model.GPT_4O_MINI_LATEST,
 )
-async def check_uptime(uptime_input: UptimeInput, use_cache: str = "never") -> Run[UptimeOutput]:
+async def check_uptime(uptime_input: UptimeInput, use_cache: str = "never") -> UptimeOutput:
     """
     Fetch and analyze uptime data from an API status page.
     Use @browser-text to get the page content.
@@ -68,7 +68,7 @@ async def main():
     print("-" * 50)
 
     # Get uptime data with caching disabled
-    run = await check_uptime(uptime_input, use_cache="never")
+    run = await check_uptime.run(uptime_input, use_cache="never")
 
     # Print the run
     print(run)
diff --git a/examples/06_streaming_summary.py b/examples/06_streaming_summary.py
index 557cb3b..23617bc 100644
--- a/examples/06_streaming_summary.py
+++ b/examples/06_streaming_summary.py
@@ -6,12 +6,11 @@
 """
 
 import asyncio
-from collections.abc import AsyncIterator
 
 from pydantic import BaseModel, Field  # pyright: ignore [reportUnknownVariableType]
 
 import workflowai
-from workflowai import Model, Run
+from workflowai import Model
 
 
 class TranslationInput(BaseModel):
@@ -30,7 +29,7 @@ class TranslationOutput(BaseModel):
 
 
 @workflowai.agent(id="french-translator", model=Model.CLAUDE_3_5_SONNET_LATEST)
-def translate_to_english(_: TranslationInput) -> AsyncIterator[Run[TranslationOutput]]:
+async def translate_to_english(_: TranslationInput) -> TranslationOutput:
     """
     Translate French text into natural, fluent English.
@@ -74,7 +73,7 @@ async def main():
     # This ensures we can see the streaming effect in the example
     # Otherwise, subsequent runs would return the cached result instantly,
     # making it hard to observe the incremental streaming behavior
-    async for chunk in translate_to_english(TranslationInput(text=french_text), use_cache="never"):
+    async for chunk in translate_to_english.stream(TranslationInput(text=french_text), use_cache="never"):
         print(f"--- Translation Progress (Chunk {chunk_num}) ---")
         print(chunk.output.translation)
         print("-" * 50)
diff --git a/examples/08_pdf_agent.py b/examples/08_pdf_agent.py
index f2e849d..4cce803 100644
--- a/examples/08_pdf_agent.py
+++ b/examples/08_pdf_agent.py
@@ -5,13 +5,13 @@
 from pydantic import BaseModel, Field  # pyright: ignore [reportUnknownVariableType]
 
 import workflowai
-from workflowai import Run, WorkflowAIError
+from workflowai import WorkflowAIError
 from workflowai.core.domain.model import Model
-from workflowai.fields import File
+from workflowai.fields import PDF
 
 
 class PDFQuestionInput(BaseModel):
-    pdf: File = Field(description="The PDF document to analyze")
+    pdf: PDF = Field(description="The PDF document to analyze")
     question: str = Field(description="The question to answer about the PDF content")
@@ -21,7 +21,7 @@ class PDFAnswerOutput(BaseModel):
 
 
 @workflowai.agent(id="pdf-answer", model=Model.CLAUDE_3_5_SONNET_LATEST)
-async def answer_pdf_question(_: PDFQuestionInput) -> Run[PDFAnswerOutput]:
+async def answer_pdf_question(_: PDFQuestionInput) -> PDFAnswerOutput:
     """
     Analyze the provided PDF document and answer the given question.
     Provide a clear and concise answer based on the content found in the PDF.
@@ -49,13 +49,13 @@ async def run_pdf_answer():
     content = base64.b64encode(pdf_file.read()).decode("utf-8")
 
-    pdf = File(content_type="application/pdf", data=content)
+    pdf = PDF(content_type="application/pdf", data=content)
     # Could also pass the content via url
-    # pdf = File(url="https://example.com/sample.pdf")
+    # pdf = PDF(url="https://example.com/sample.pdf")
 
     question = "How many stocks were sold? What is the total amount in USD?"
 
     try:
-        agent_run = await answer_pdf_question(
+        run = await answer_pdf_question.run(
             PDFQuestionInput(pdf=pdf, question=question),
             use_cache="auto",
         )
@@ -63,9 +63,7 @@ async def run_pdf_answer():
         print(f"Failed to run task. Code: {e.error.code}. Message: {e.error.message}")
         return
 
-    print("\n--------\nAgent output:\n", agent_run.output, "\n--------\n")
-    print(f"Cost: ${agent_run.cost_usd:.10f}")
-    print(f"Latency: {agent_run.duration_seconds:.2f}s")
+    print(run)
 
 
 if __name__ == "__main__":
diff --git a/examples/09_reply.py b/examples/09_reply.py
index f5bdfed..9fccca9 100644
--- a/examples/09_reply.py
+++ b/examples/09_reply.py
@@ -20,7 +20,7 @@
 from pydantic import BaseModel, Field  # pyright: ignore [reportUnknownVariableType]
 
 import workflowai
-from workflowai import Model, Run
+from workflowai import Model
 
 
 class NameExtractionInput(BaseModel):
@@ -43,7 +43,7 @@ class NameExtractionOutput(BaseModel):
 
 
 @workflowai.agent(id="name-extractor", model=Model.GPT_4O_MINI_LATEST)
-async def extract_name(_: NameExtractionInput) -> Run[NameExtractionOutput]:
+async def extract_name(_: NameExtractionInput) -> NameExtractionOutput:
     """
     Extract a person's first and last name from a sentence.
     Be precise and consider cultural variations in name formats.
@@ -64,7 +64,7 @@ async def main():
     print(f"\nProcessing: {sentence}")
 
     # Initial extraction
-    run = await extract_name(NameExtractionInput(sentence=sentence))
+    run = await extract_name.run(NameExtractionInput(sentence=sentence))
 
     print(f"Extracted: {run.output.first_name} {run.output.last_name}")
diff --git a/examples/10_calendar_event_extraction.py b/examples/10_calendar_event_extraction.py
index be1c3fe..4675802 100644
--- a/examples/10_calendar_event_extraction.py
+++ b/examples/10_calendar_event_extraction.py
@@ -17,7 +17,7 @@
 from pydantic import BaseModel, Field
 
 import workflowai
-from workflowai import Model, Run
+from workflowai import Model
 from workflowai.fields import File
@@ -79,7 +79,7 @@ class CalendarEventOutput(BaseModel):
     id="calendar-event-extractor",
     model=Model.GPT_4O_MINI_LATEST,
 )
-async def extract_calendar_event_from_email(email_input: EmailInput) -> Run[CalendarEventOutput]:
+async def extract_calendar_event_from_email(email_input: EmailInput) -> CalendarEventOutput:
     """
     Extract calendar event details from email content.
@@ -112,7 +112,7 @@ async def extract_calendar_event_from_email(email_input: EmailInput) -> Run[Cale
     id="calendar-event-extractor",
     model=Model.GPT_4O_MINI_LATEST,
 )
-async def extract_calendar_event_from_image(image_input: ImageInput) -> Run[CalendarEventOutput]:
+async def extract_calendar_event_from_image(image_input: ImageInput) -> CalendarEventOutput:
     """
     Extract calendar event details from an event poster or flyer image.
@@ -160,7 +160,7 @@ async def main():
         """,
     )
 
-    run = await extract_calendar_event_from_email(email1)
+    run = await extract_calendar_event_from_email.run(email1)
     print(run)
 
     # Example 2: Virtual meeting with more details
@@ -238,7 +238,7 @@ async def main():
     )
 
     try:
-        run = await extract_calendar_event_from_email(email4)
+        run = await extract_calendar_event_from_email.run(email4)
         print(run)
     except workflowai.WorkflowAIError as e:
         print(f"As expected, no calendar event found: {e!s}")
diff --git a/examples/12_contextual_retrieval.py b/examples/12_contextual_retrieval.py
index aacd7f0..796a411 100644
--- a/examples/12_contextual_retrieval.py
+++ b/examples/12_contextual_retrieval.py
@@ -9,7 +9,7 @@
 from pydantic import BaseModel, Field
 
 import workflowai
-from workflowai import Model, Run
+from workflowai import Model
 
 
 class ContextGeneratorInput(BaseModel):
@@ -37,7 +37,7 @@ class ContextGeneratorOutput(BaseModel):
     id="context-generator",
     model=Model.CLAUDE_3_5_SONNET_LATEST,
 )
-async def generate_chunk_context(context_input: ContextGeneratorInput) -> Run[ContextGeneratorOutput]:
+async def generate_chunk_context(context_input: ContextGeneratorInput) -> ContextGeneratorOutput:
     """
     Here is the chunk we want to situate within the whole document.
     Please give a short succinct context to situate this chunk within the overall document
@@ -80,9 +80,9 @@ async def main():
         chunk_content=chunk_content,
     )
 
-    run = await generate_chunk_context(context_input)
+    run = await generate_chunk_context.run(context_input)
     print("\nGenerated Context:")
-    print(run.output.context)
+    print(run)
 
 
 if __name__ == "__main__":
diff --git a/examples/14_templated_instructions.py b/examples/14_templated_instructions.py
index 650f2b4..5b8bc31 100644
--- a/examples/14_templated_instructions.py
+++ b/examples/14_templated_instructions.py
@@ -23,7 +23,7 @@
 from pydantic import BaseModel, Field
 
 import workflowai
-from workflowai import Model, Run
+from workflowai import Model
 
 
 class CodeReviewInput(BaseModel):
@@ -77,7 +77,7 @@ class CodeReviewOutput(BaseModel):
     id="templated-code-reviewer",
     model=Model.CLAUDE_3_5_SONNET_LATEST,
 )
-async def review_code(review_input: CodeReviewInput) -> Run[CodeReviewOutput]:
+async def review_code(review_input: CodeReviewInput) -> CodeReviewOutput:
     """
     Review code based on specified parameters and guidelines.
@@ -142,7 +142,7 @@ def calculate_sum(numbers):
         return result
     """
 
-    run = await review_code(
+    run = await review_code.run(
         CodeReviewInput(
             language="python",
             code=python_code,
diff --git a/examples/15_text_to_sql.py b/examples/15_text_to_sql.py
index 908b61e..8257f61 100644
--- a/examples/15_text_to_sql.py
+++ b/examples/15_text_to_sql.py
@@ -17,7 +17,7 @@
 from pydantic import BaseModel, Field
 
 import workflowai
-from workflowai import Model, Run
+from workflowai import Model
 
 
 class SQLGenerationInput(BaseModel):
@@ -49,7 +49,7 @@ class SQLGenerationOutput(BaseModel):
     id="text-to-sql",
     model=Model.CLAUDE_3_5_SONNET_LATEST,
 )
-async def generate_sql(review_input: SQLGenerationInput) -> Run[SQLGenerationOutput]:
+async def generate_sql(review_input: SQLGenerationInput) -> SQLGenerationOutput:
     """
     Convert natural language questions to SQL queries based on the provided schema.
@@ -121,7 +121,7 @@ async def main():
     # Example 1: Simple SELECT with conditions
     print("\nExample 1: Find expensive products")
     print("-" * 50)
-    run = await generate_sql(
+    run = await generate_sql.run(
         SQLGenerationInput(
             db_schema=schema,
             question="Show me all products that cost more than $100, ordered by price descending",
@@ -132,7 +132,7 @@ async def main():
     # Example 2: JOIN with aggregation
     print("\nExample 2: Customer order summary")
     print("-" * 50)
-    run = await generate_sql(
+    run = await generate_sql.run(
         SQLGenerationInput(
             db_schema=schema,
             question=(
@@ -146,7 +146,7 @@ async def main():
     # Example 3: Complex query
     print("\nExample 3: Product category analysis")
     print("-" * 50)
-    run = await generate_sql(
+    run = await generate_sql.run(
         SQLGenerationInput(
             db_schema=schema,
             question=(
diff --git a/examples/16_multi_model_consensus.py b/examples/16_multi_model_consensus.py
index da0fbbf..ba66f1c 100644
--- a/examples/16_multi_model_consensus.py
+++ b/examples/16_multi_model_consensus.py
@@ -15,7 +15,7 @@
 from pydantic import BaseModel, Field
 
 import workflowai
-from workflowai import Model, Run
+from workflowai import Model
 
 
 class MultiModelInput(BaseModel):
@@ -56,7 +56,7 @@ class CombinedOutput(BaseModel):
 
 @workflowai.agent(
     id="question-answerer",
 )
-async def get_model_response(query: MultiModelInput) -> Run[ModelResponse]:
+async def get_model_response(query: MultiModelInput) -> ModelResponse:
     """Get response from the specified model."""
     ...
@@ -65,7 +65,7 @@ async def get_model_response(query: MultiModelInput) -> Run[ModelResponse]:
     id="response-combiner",
     model=Model.O3_MINI_2025_01_31_MEDIUM_REASONING_EFFORT,
 )
-async def combine_responses(responses_input: CombinerInput) -> Run[CombinedOutput]:
+async def combine_responses(responses_input: CombinerInput) -> CombinedOutput:
     """
     Analyze and combine responses from multiple models into a single coherent answer.
@@ -100,7 +100,7 @@ async def main():
     responses = []
     for model, model_name in models:
-        run = await get_model_response(
+        run = await get_model_response.run(
             MultiModelInput(
                 question=question,
                 model_name=model_name,
@@ -110,7 +110,7 @@ async def main():
         responses.append(run.output)
 
     # Combine responses
-    combined = await combine_responses(CombinerInput(responses=responses))
+    combined = await combine_responses.run(CombinerInput(responses=responses))
 
     print(combined)
diff --git a/examples/17_multi_model_consensus_with_tools.py b/examples/17_multi_model_consensus_with_tools.py
index 761e284..82806f4 100644
--- a/examples/17_multi_model_consensus_with_tools.py
+++ b/examples/17_multi_model_consensus_with_tools.py
@@ -23,7 +23,7 @@
 from pydantic import BaseModel, Field
 
 import workflowai
-from workflowai import Model, Run
+from workflowai import Model
 
 
 class AskModelInput(BaseModel):
@@ -44,13 +44,14 @@ class AskModelOutput(BaseModel):
 # through the get_model_response agent. This creates a hierarchy where the
 # response-combiner orchestrates multiple model queries by delegating to get_model_response.
 async def ask_model(query_input: AskModelInput) -> AskModelOutput:
-    """Ask a specific model a question and get its response."""
-    run = await get_model_response(
+    """Ask a specific model a question and return its response."""
+    run = await get_model_response.run(
         MultiModelInput(
             question=query_input.question,
         ),
         model=query_input.model,
     )
+    # get_model_response.run() returns a Run[ModelResponse], so we need to access the output
     return AskModelOutput(response=run.output.response)
@@ -91,7 +92,7 @@ class CombinedOutput(BaseModel):
 
 @workflowai.agent(
     id="question-answerer",
 )
-async def get_model_response(query: MultiModelInput) -> Run[ModelResponse]:
+async def get_model_response(query: MultiModelInput) -> ModelResponse:
     """
     Make sure to:
     1. Provide a clear and detailed response
@@ -107,7 +108,7 @@ async def get_model_response(query: MultiModelInput) -> Run[ModelResponse]:
     model=Model.GPT_4O_MINI_LATEST,
     tools=[ask_model],
 )
-async def combine_responses(responses_input: CombinerInput) -> Run[CombinedOutput]:
+async def combine_responses(responses_input: CombinerInput) -> CombinedOutput:
     """
     Analyze and combine responses from multiple models into a single coherent answer.
     You should ask at least 3 different models to get a diverse set of perspectives.
@@ -137,7 +138,7 @@ async def main():
     question = "What is dark matter and why is it important for our understanding of the universe?"
 
     # Let the response-combiner handle asking the models
-    combined = await combine_responses(
+    combined = await combine_responses.run(
         CombinerInput(
             original_question=question,
         ),
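
Taken together, the hunks above move every example onto one calling convention: agents are annotated to return their output model directly, callers go through agent.run() (which returns a Run object exposing .output plus cost and latency) or agent.stream() for incremental chunks, and file inputs use the typed Audio/PDF fields instead of the generic File. The snippet below is a minimal, self-contained sketch of that convention for quick reference. It is not part of the patch: the "greeter" agent and its Greeting* models are hypothetical, and it assumes the same workflowai package, Model enum, and pydantic imports used in the examples, plus a configured WorkflowAI API key in the environment.

import asyncio

from pydantic import BaseModel, Field

import workflowai
from workflowai import Model


class GreetingInput(BaseModel):  # hypothetical input model, for illustration only
    name: str = Field(description="Name of the person to greet")


class GreetingOutput(BaseModel):  # hypothetical output model, for illustration only
    greeting: str = Field(description="A short greeting for the given name")


@workflowai.agent(id="greeter", model=Model.GPT_4O_MINI_LATEST)
async def greet(greeting_input: GreetingInput) -> GreetingOutput:
    """Generate a short, friendly greeting for the given name."""
    ...


async def main():
    # .run() returns a Run object; the validated output lives on run.output,
    # alongside metadata such as cost and latency (shown by print(run)).
    run = await greet.run(GreetingInput(name="Ada"), use_cache="never")
    print(run.output.greeting)
    print(run)

    # .stream() yields partial chunks whose .output fills in incrementally.
    async for chunk in greet.stream(GreetingInput(name="Ada")):
        print(chunk.output)


if __name__ == "__main__":
    asyncio.run(main())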