From f12f012f23aa030f92dc272b04f695c8b64a89bb Mon Sep 17 00:00:00 2001 From: Pierre Date: Thu, 30 Jan 2025 18:41:23 +0100 Subject: [PATCH 1/7] first simple workflow: chain --- examples/workflows/chain.py | 147 ++++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 examples/workflows/chain.py diff --git a/examples/workflows/chain.py b/examples/workflows/chain.py new file mode 100644 index 0000000..e229e0c --- /dev/null +++ b/examples/workflows/chain.py @@ -0,0 +1,147 @@ +import workflowai +from workflowai import Model +from pydantic import BaseModel, Field +import asyncio +from typing import TypedDict + +class MarketingCopyInput(BaseModel): + """The product or concept for which to generate marketing copy.""" + idea: str = Field(description="A short description or name of the product.") + + +class MarketingCopyOutput(BaseModel): + """Contains the AI generated marketing copy text for the provided product or concept.""" + marketing_text: str = Field(description="Persuasive marketing copy text.") + + +@workflowai.agent(id='marketing-copy-generator', model=Model.GPT_4O_MINI_LATEST) +async def generate_marketing_copy_agent( + input: MarketingCopyInput +) -> MarketingCopyOutput: + """ + Write persuasive marketing copy for the provided idea. + Focus on benefits and emotional appeal. + """ + ... + + +class EvaluateCopyInput(BaseModel): + """Input type for evaluating the quality of marketing copy.""" + marketing_text: str = Field(description="The marketing copy text to evaluate.") + + +class EvaluateCopyOutput(BaseModel): + """Evaluation results for the marketing copy.""" + hasCallToAction: bool = Field(description="Whether a call to action is present.") + emotionalAppeal: int = Field(description="Emotional appeal rating (1-10).") + clarity: int = Field(description="Clarity rating (1-10).") + +# We use a smarter model (O1) to review the copy since evaluation requires more nuanced understanding +@workflowai.agent(id='marketing-copy-evaluator', model=Model.O1_MINI_LATEST) +async def evaluate_marketing_copy_agent( + input: EvaluateCopyInput +) -> EvaluateCopyOutput: + """ + Evaluate the marketing copy for: + 1) Presence of a call to action (true/false) + 2) Emotional appeal (1-10) + 3) Clarity (1-10) + Return the results as a structured output. + """ + ... + + +class RewriteCopyInput(BaseModel): + """Input for rewriting the marketing copy with targeted improvements.""" + original_copy: str = Field(default="", description="Original marketing copy.") + add_call_to_action: bool = Field(default=False, description="Whether we need a clear call to action.") + strengthen_emotional_appeal: bool = Field(default=False, description="Whether emotional appeal needs a boost.") + improve_clarity: bool = Field(default=False, description="Whether clarity needs improvement.") + + +class RewriteCopyOutput(BaseModel): + """Contains the improved marketing copy.""" + marketing_text: str = Field(description="The improved marketing copy text.") + +# Claude 3.5 Sonnet is a more powerful model for copywriting +@workflowai.agent(model=Model.CLAUDE_3_5_SONNET_LATEST) +async def rewrite_marketing_copy_agent( + input: RewriteCopyInput +) -> RewriteCopyOutput: + """ + Rewrite the marketing copy with the specified improvements: + - A clear CTA if requested + - Stronger emotional appeal if requested + - Improved clarity if requested + """ + ... 
+ + +class MarketingResult(TypedDict): + original_copy: str + final_copy: str + was_improved: bool + quality_metrics: EvaluateCopyOutput + +async def generate_marketing_copy(idea: str) -> MarketingResult: + """ + Demonstrates a chain flow: + 1) Generate an initial marketing copy. + 2) Evaluate its quality. + 3) If the quality is lacking, request a rewrite with clearer CTA, stronger emotional appeal, and/or clarity. + 4) Return the final copy and metrics. + """ + # Step 1: Generate initial copy + generation = await generate_marketing_copy_agent(MarketingCopyInput(idea=idea)) + original_copy = generation.marketing_text + final_copy = original_copy + + # Step 2: Evaluate the copy + evaluation = await evaluate_marketing_copy_agent(EvaluateCopyInput(marketing_text=original_copy)) + + # Step 3: Check evaluation results. If below thresholds, rewrite + needs_improvement = ( + not evaluation.hasCallToAction + or evaluation.emotionalAppeal < 7 + or evaluation.clarity < 7 + ) + + if needs_improvement: + rewrite = await rewrite_marketing_copy_agent( + RewriteCopyInput( + original_copy=original_copy, + add_call_to_action=not evaluation.hasCallToAction, + strengthen_emotional_appeal=evaluation.emotionalAppeal < 7, + improve_clarity=evaluation.clarity < 7, + ) + ) + final_copy = rewrite.marketing_text + + return { + "original_copy": original_copy, + "final_copy": final_copy, + "was_improved": needs_improvement, + "quality_metrics": evaluation + } + + +if __name__ == "__main__": + idea = "An open-source platform for AI agents" + result = asyncio.run(generate_marketing_copy(idea)) + + print("\n=== Input Idea ===") + print(idea) + + print("\n=== Marketing Copy ===") + print(result["original_copy"]) + + print("\n=== Quality Assessment ===") + metrics = result["quality_metrics"] + print(f"✓ Call to Action: {'Present' if metrics.hasCallToAction else 'Missing'}") + print(f"✓ Emotional Appeal: {metrics.emotionalAppeal}/10") + print(f"✓ Clarity: {metrics.clarity}/10") + + if result["was_improved"]: + print("\n=== Improved Marketing Copy ===") + print(result["final_copy"]) + print() From 784d3a866ea86829d1abff222d3599a06789e1af Mon Sep 17 00:00:00 2001 From: Pierre Date: Thu, 30 Jan 2025 18:41:26 +0100 Subject: [PATCH 2/7] Create routing.py --- examples/workflows/routing.py | 132 ++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 examples/workflows/routing.py diff --git a/examples/workflows/routing.py b/examples/workflows/routing.py new file mode 100644 index 0000000..5687b69 --- /dev/null +++ b/examples/workflows/routing.py @@ -0,0 +1,132 @@ +import workflowai +from workflowai import Model +from pydantic import BaseModel, Field +import asyncio +from enum import Enum +from typing import TypedDict + + +class QueryType(str, Enum): + GENERAL = "general" + REFUND = "refund" + TECHNICAL = "technical" + + +class ComplexityLevel(str, Enum): + SIMPLE = "simple" + COMPLEX = "complex" + + +class ClassificationInput(BaseModel): + """Input for query classification.""" + query: str = Field(description="The customer query to classify.") + + +class ClassificationOutput(BaseModel): + """Output containing query classification details.""" + reasoning: str = Field(description="Brief reasoning for the classification.") + type: QueryType = Field(description="Type of the query (general, refund, or technical).") + complexity: ComplexityLevel = Field(description="Complexity level of the query.") + + +# Uses GPT-4O for its strong analytical and classification capabilities
+@workflowai.agent(id='query-classifier', model=Model.GPT_4O_LATEST) +async def classify_query_agent( + input: ClassificationInput +) -> ClassificationOutput: + """ + Classify the customer query by: + 1. Query type (general, refund, or technical) + 2. Complexity (simple or complex) + 3. Provide brief reasoning for classification + """ + ... + + +class ResponseInput(BaseModel): + """Input for generating customer response.""" + query: str = Field(description="The customer query to respond to.") + query_type: QueryType = Field(description="Type of the query for specialized handling.") + + +class ResponseOutput(BaseModel): + """Output containing the response to the customer.""" + response: str = Field(description="The generated response to the customer query.") + + +# Uses Claude 3.5 Sonnet for its strong conversational abilities and empathy +@workflowai.agent(model=Model.CLAUDE_3_5_SONNET_LATEST) +async def handle_general_query(input: ResponseInput) -> ResponseOutput: + """Expert customer service agent handling general inquiries.""" + ... + + +# Uses GPT-4O Mini for efficient policy-based responses +@workflowai.agent(model=Model.GPT_4O_MINI_LATEST) +async def handle_refund_query(input: ResponseInput) -> ResponseOutput: + """Customer service agent specializing in refund requests.""" + ... + + +# Uses O1 Mini for its technical expertise and problem-solving capabilities +@workflowai.agent(model=Model.O1_MINI_LATEST) +async def handle_technical_query(input: ResponseInput) -> ResponseOutput: + """Technical support specialist providing troubleshooting assistance.""" + ... + + +class QueryResult(TypedDict): + response: str + classification: ClassificationOutput + + +async def handle_customer_query(query: str) -> QueryResult: + """ + Handle a customer query through a workflow: + 1. Classify the query type and complexity + 2. Route to appropriate specialized agent + 3. Return response and classification details + """ + # Step 1: Classify the query + classification = await classify_query_agent(ClassificationInput(query=query)) + + # Step 2: Route to appropriate handler based on type and complexity + handlers = { + (QueryType.GENERAL, ComplexityLevel.SIMPLE): handle_general_query, + (QueryType.GENERAL, ComplexityLevel.COMPLEX): handle_general_query, + (QueryType.REFUND, ComplexityLevel.SIMPLE): handle_refund_query, + (QueryType.REFUND, ComplexityLevel.COMPLEX): handle_refund_query, + (QueryType.TECHNICAL, ComplexityLevel.SIMPLE): handle_technical_query, + (QueryType.TECHNICAL, ComplexityLevel.COMPLEX): handle_technical_query, + } + + # Get appropriate handler + handler = handlers[(classification.type, classification.complexity)] + + # Generate response + result = await handler(ResponseInput( + query=query, + query_type=classification.type + )) + + return { + "response": result.response, + "classification": classification + } + + +if __name__ == "__main__": + query = "I'm having trouble logging into my account. It keeps saying invalid password even though I'm sure it's correct." 
+ result = asyncio.run(handle_customer_query(query)) + + print("\n=== Customer Query ===") + print(query) + + print("\n=== Classification ===") + print(f"Type: {result['classification'].type}") + print(f"Complexity: {result['classification'].complexity}") + print(f"Reasoning: {result['classification'].reasoning}") + + print("\n=== Response ===") + print(result["response"]) + print() From 30427528022a92400b8342e6741c568581e3f817 Mon Sep 17 00:00:00 2001 From: Pierre Date: Mon, 3 Feb 2025 17:03:19 +0100 Subject: [PATCH 3/7] add README, + workflows --- examples/workflows/README.md | 55 ++++++ examples/workflows/evaluator_optimizer.py | 168 ++++++++++++++++++ examples/workflows/orchestrator_worker.py | 159 +++++++++++++++++ examples/workflows/parallel_processing.py | 198 ++++++++++++++++++++++ 4 files changed, 580 insertions(+) create mode 100644 examples/workflows/README.md create mode 100644 examples/workflows/evaluator_optimizer.py create mode 100644 examples/workflows/orchestrator_worker.py create mode 100644 examples/workflows/parallel_processing.py diff --git a/examples/workflows/README.md b/examples/workflows/README.md new file mode 100644 index 0000000..72e6db0 --- /dev/null +++ b/examples/workflows/README.md @@ -0,0 +1,55 @@ +# Workflow Patterns + +This README describes the main patterns used in our workflows built with the WorkflowAI SDK. These patterns provide a structured method for composing complex AI tasks from simpler components, and allow you to balance flexibility and control in your AI applications. + +## 1. Sequential Processing (Chains) +In this pattern, tasks are executed in a fixed sequence, where the output of one step becomes the input for the next. This is ideal for linear processes such as content generation, data transformation, or any task that benefits from a clear, step-by-step flow. + +**Example:** +- Generate an initial result (e.g., marketing copy). +- Evaluate and refine that result through subsequent steps. + +For an implementation example, see [chain.py](chain.py). + +## 2. Routing +The routing pattern directs work based on intermediate results. An initial decision or classification determines which specialized agent should handle the next part of the workflow. This allows the system to adapt its behavior based on context (for instance, routing customer queries to different support teams). + +**Example:** +- Classify a customer query (e.g., as general, refund, or technical). +- Route the query to a specialized agent that handles that particular type. + +For an implementation example, see [routing.py](routing.py). + +## 3. Parallel Processing +Parallel processing splits work into independent subtasks that run concurrently. This pattern is used when different aspects of an input can be handled independently, leading to faster overall processing times. + +**Example:** +- Perform security, performance, and maintainability reviews on code simultaneously. +- Collect and aggregate the results after all tasks complete. + +For an implementation example, see [parallel_processing.py](parallel_processing.py). + +## 4. Orchestrator-Worker +In the orchestrator-worker pattern, one agent (the orchestrator) plans and coordinates the work, while multiple worker agents execute the individual parts of the plan in parallel. This pattern is particularly useful when a task can be decomposed into distinct planning and execution phases. + +**Example:** +- An orchestrator analyzes a feature request and generates an implementation plan with details on file changes.
+- Worker agents then execute the planned changes concurrently. + +For an implementation example, see [orchestrator_worker.py](orchestrator_worker.py). + +## 5. Evaluator-Optimizer +The evaluator-optimizer pattern employs an iterative feedback loop. An initial output is evaluated for quality, and based on the feedback, improvements are made. This cycle is repeated until the output reaches the desired quality, or a maximum number of iterations is met. + +**Example:** +- Translate text using a fast initial model. +- Evaluate the translation's quality, tone, nuance, and cultural accuracy. +- If needed, refine the translation based on detailed feedback and repeat the process. + +For an implementation example, see [evaluator_optimizer.py](evaluator_optimizer.py). + +--- + +These patterns were inspired by the workflow patterns described in the [Vercel AI SDK Documentation](https://sdk.vercel.ai/docs/foundations/agents#patterns-with-examples). + +This README should serve as a high-level guide to understanding and using the different patterns available in our workflows. diff --git a/examples/workflows/evaluator_optimizer.py b/examples/workflows/evaluator_optimizer.py new file mode 100644 index 0000000..efa5e5c --- /dev/null +++ b/examples/workflows/evaluator_optimizer.py @@ -0,0 +1,168 @@ +import workflowai +from workflowai import Model +from pydantic import BaseModel, Field +import asyncio +from typing import List, TypedDict + + +class TranslationInput(BaseModel): + """Input for translation.""" + text: str = Field(description="The text to translate.") + target_language: str = Field(description="The target language for translation.") + + +class TranslationOutput(BaseModel): + """Output containing the translated text.""" + translation: str = Field(description="The translated text.") + + +# Uses GPT-4O Mini for fast initial translation +@workflowai.agent(id='initial-translator', model=Model.GPT_4O_MINI_LATEST) +async def initial_translation_agent( + input: TranslationInput +) -> TranslationOutput: + """ + Expert literary translator. + Translate text while preserving tone and cultural nuances. + """ + ... + + +class TranslationEvaluationInput(BaseModel): + """Input for evaluating translation quality.""" + original_text: str = Field(description="The original text.") + translation: str = Field(description="The translation to evaluate.") + target_language: str = Field(description="The target language of the translation.") + + +class TranslationEvaluationOutput(BaseModel): + """Output containing the translation evaluation.""" + quality_score: int = Field(description="Overall quality score (1-10).") + preserves_tone: bool = Field(description="Whether the translation preserves the original tone.") + preserves_nuance: bool = Field(description="Whether the translation preserves subtle nuances.") + culturally_accurate: bool = Field(description="Whether the translation is culturally appropriate.") + specific_issues: List[str] = Field(description="List of specific issues identified.") + improvement_suggestions: List[str] = Field(description="List of suggested improvements.") + + +# Uses O1 for its strong analytical and evaluation capabilities +@workflowai.agent(id='translation-evaluator', model=Model.O1_2024_12_17_HIGH_REASONING_EFFORT) +async def evaluate_translation_agent( + input: TranslationEvaluationInput +) -> TranslationEvaluationOutput: + """ + Expert in evaluating literary translations. + Evaluate translations for quality, tone preservation, nuance, and cultural accuracy. + """ + ... 
+ + +class TranslationImprovementInput(BaseModel): + """Input for improving translation based on feedback.""" + original_text: str = Field(description="The original text.") + current_translation: str = Field(description="The current translation.") + target_language: str = Field(description="The target language.") + specific_issues: List[str] = Field(description="Issues to address.") + improvement_suggestions: List[str] = Field(description="Suggestions for improvement.") + + +class TranslationImprovementOutput(BaseModel): + """Output containing the improved translation.""" + translation: str = Field(description="The improved translation.") + + +# Uses GPT-4O for high-quality translation refinement +@workflowai.agent(id='translation-improver', model=Model.GPT_4O_LATEST) +async def improve_translation_agent( + input: TranslationImprovementInput +) -> TranslationImprovementOutput: + """ + Expert literary translator. + Improve translations based on specific feedback while maintaining overall quality. + """ + ... + + +class TranslationResult(TypedDict): + final_translation: str + iterations_required: int + + +async def translate_with_feedback(text: str, target_language: str) -> TranslationResult: + """ + Translate text with iterative feedback and improvement: + 1. Generate initial translation with a faster model + 2. Evaluate translation quality + 3. If quality threshold not met, improve based on feedback + 4. Repeat evaluation-improvement cycle up to MAX_ITERATIONS + """ + MAX_ITERATIONS = 3 + iterations = 0 + + # Initial translation using faster model + current_translation = await initial_translation_agent( + TranslationInput( + text=text, + target_language=target_language + ) + ) + + while iterations < MAX_ITERATIONS: + # Evaluate current translation + evaluation = await evaluate_translation_agent( + TranslationEvaluationInput( + original_text=text, + translation=current_translation.translation, + target_language=target_language + ) + ) + + # Check if quality meets threshold + if ( + evaluation.quality_score >= 8 and + evaluation.preserves_tone and + evaluation.preserves_nuance and + evaluation.culturally_accurate + ): + break + + # Generate improved translation based on feedback + improved = await improve_translation_agent( + TranslationImprovementInput( + original_text=text, + current_translation=current_translation.translation, + target_language=target_language, + specific_issues=evaluation.specific_issues, + improvement_suggestions=evaluation.improvement_suggestions + ) + ) + + current_translation.translation = improved.translation + iterations += 1 + + return { + "final_translation": current_translation.translation, + "iterations_required": iterations + } + + +if __name__ == "__main__": + # Example text to translate + text = """ + The old bookstore, nestled in the heart of the city, + was a sanctuary for bibliophiles. Its wooden shelves, + worn smooth by decades of searching hands, held stories + in dozens of languages. The owner, a silver-haired woman + with kind eyes, knew each book's location by heart. 
+ """ + target_language = "French" + + result = asyncio.run(translate_with_feedback(text, target_language)) + + print("\n=== Translation Result ===") + print(f"\nOriginal Text:") + print(text) + print(f"\nFinal Translation:") + print(result["final_translation"]) + print(f"\nIterations Required: {result['iterations_required']}") + print() \ No newline at end of file diff --git a/examples/workflows/orchestrator_worker.py b/examples/workflows/orchestrator_worker.py new file mode 100644 index 0000000..eb84383 --- /dev/null +++ b/examples/workflows/orchestrator_worker.py @@ -0,0 +1,159 @@ +import workflowai +from workflowai import Model +from pydantic import BaseModel, Field +import asyncio +from enum import Enum +from typing import List, TypedDict + + +class ChangeType(str, Enum): + CREATE = "create" + MODIFY = "modify" + DELETE = "delete" + + +class ComplexityLevel(str, Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + + +class FileChange(BaseModel): + """Describes a planned change to a file.""" + purpose: str = Field(description="Purpose of the change in the context of the feature.") + file_path: str = Field(description="Path to the file to be changed.") + change_type: ChangeType = Field(description="Type of change to be made.") + + +class ImplementationPlanInput(BaseModel): + """Input for generating an implementation plan.""" + feature_request: str = Field(description="The feature request to implement.") + + +class ImplementationPlanOutput(BaseModel): + """Output containing the implementation plan.""" + files: List[FileChange] = Field(description="List of files to be changed.") + estimated_complexity: ComplexityLevel = Field(description="Estimated complexity of the implementation.") + + +# Uses O1 for its strong architectural planning capabilities +@workflowai.agent(id='implementation-planner', model=Model.O1_2024_12_17_HIGH_REASONING_EFFORT) +async def plan_implementation_agent( + input: ImplementationPlanInput +) -> ImplementationPlanOutput: + """ + Senior software architect planning feature implementations. + Analyze feature requests and create detailed implementation plans. + """ + ... + + +class FileImplementationInput(BaseModel): + """Input for implementing file changes.""" + file_path: str = Field(description="Path to the file being changed.") + purpose: str = Field(description="Purpose of the change.") + feature_request: str = Field(description="Overall feature context.") + change_type: ChangeType = Field(description="Type of change being made.") + + +class FileImplementationOutput(BaseModel): + """Output containing the implemented changes.""" + explanation: str = Field(description="Explanation of the implemented changes.") + code: str = Field(description="The implemented code changes.") + + +# Uses GPT-4O for its strong code generation and modification capabilities +@workflowai.agent(id='file-implementer', model=Model.GPT_4O_LATEST) +async def implement_file_changes_agent( + input: FileImplementationInput +) -> FileImplementationOutput: + """ + Expert at implementing code changes based on the change type: + - CREATE: Implement new files following best practices and project patterns + - MODIFY: Modify existing code while maintaining consistency + - DELETE: Safely remove code while ensuring no breaking changes + """ + ... 
+ + +class ImplementationChange(TypedDict): + file: FileChange + implementation: FileImplementationOutput + + +class FeatureImplementationResult(TypedDict): + plan: ImplementationPlanOutput + changes: List[ImplementationChange] + + +async def implement_feature(feature_request: str) -> FeatureImplementationResult: + """ + Implement a feature using an orchestrator-worker pattern: + 1. Orchestrator (planner) analyzes the request and creates an implementation plan + 2. Workers execute the planned changes in parallel + 3. Return both the plan and implemented changes + """ + # Orchestrator: Plan the implementation + implementation_plan = await plan_implementation_agent( + ImplementationPlanInput(feature_request=feature_request) + ) + + # Workers: Execute the planned changes in parallel + file_changes = await asyncio.gather(*[ + implement_file_changes_agent( + FileImplementationInput( + file_path=file.file_path, + purpose=file.purpose, + feature_request=feature_request, + change_type=file.change_type + ) + ) for file in implementation_plan.files + ]) + + # Combine results + changes: List[ImplementationChange] = [ + { + "file": implementation_plan.files[i], + "implementation": change + } + for i, change in enumerate(file_changes) + ] + + return { + "plan": implementation_plan, + "changes": changes + } + + +if __name__ == "__main__": + # Example feature request + feature_request = """ + Add a new user authentication endpoint that: + 1. Accepts username/password + 2. Validates credentials + 3. Returns a JWT token + 4. Includes rate limiting + """ + + result = asyncio.run(implement_feature(feature_request)) + + print("\n=== Implementation Plan ===") + print(f"Estimated Complexity: {result['plan'].estimated_complexity}") + print("\nPlanned Changes:") + for file in result['plan'].files: + print(f"\n- {file.change_type.upper()}: {file.file_path}") + print(f" Purpose: {file.purpose}") + + print("\n=== Implemented Changes ===") + for change in result['changes']: + file = change['file'] + impl = change['implementation'] + + print(f"\n=== {file.change_type.upper()}: {file.file_path} ===") + print("\nPurpose:") + print(file.purpose) + print("\nExplanation:") + print(impl.explanation) + print("\nCode:") + print(impl.code) + print() \ No newline at end of file diff --git a/examples/workflows/parallel_processing.py b/examples/workflows/parallel_processing.py new file mode 100644 index 0000000..2bbdb46 --- /dev/null +++ b/examples/workflows/parallel_processing.py @@ -0,0 +1,198 @@ +import workflowai +from workflowai import Model +from pydantic import BaseModel, Field +import asyncio +from enum import Enum +from typing import TypedDict, List + + +class RiskLevel(str, Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + + +class SecurityReviewInput(BaseModel): + """Input for security code review.""" + code: str = Field(description="The code to review for security issues.") + + +class SecurityReviewOutput(BaseModel): + """Output from security code review.""" + vulnerabilities: List[str] = Field(description="List of identified security vulnerabilities.") + risk_level: RiskLevel = Field(description="Overall security risk level.") + suggestions: List[str] = Field(description="Security improvement suggestions.") + +# Uses Claude 3.5 Sonnet for its strong security analysis capabilities +@workflowai.agent(id='security-reviewer', model=Model.CLAUDE_3_5_SONNET_LATEST) +async def security_review_agent( + input: SecurityReviewInput +) -> SecurityReviewOutput: + """ + Expert in code security. 
+ Focus on identifying security vulnerabilities, injection risks, and authentication issues. + """ + ... + + +class PerformanceReviewInput(BaseModel): + """Input for performance code review.""" + code: str = Field(description="The code to review for performance issues.") + + +class PerformanceReviewOutput(BaseModel): + """Output from performance code review.""" + issues: List[str] = Field(description="List of identified performance issues.") + impact: RiskLevel = Field(description="Impact level of performance issues.") + optimizations: List[str] = Field(description="Performance optimization suggestions.") + + +# Uses O1 Mini for its expertise in performance optimization +@workflowai.agent(id='performance-reviewer', model=Model.O1_MINI_LATEST) +async def performance_review_agent( + input: PerformanceReviewInput +) -> PerformanceReviewOutput: + """ + Expert in code performance. + Focus on identifying performance bottlenecks, memory leaks, and optimization opportunities. + """ + ... + + +class MaintainabilityReviewInput(BaseModel): + """Input for maintainability code review.""" + code: str = Field(description="The code to review for maintainability issues.") + + +class MaintainabilityReviewOutput(BaseModel): + """Output from maintainability code review.""" + concerns: List[str] = Field(description="List of maintainability concerns.") + quality_score: int = Field(description="Code quality score (1-10).", ge=1, le=10) + recommendations: List[str] = Field(description="Maintainability improvement recommendations.") + + +# Uses Claude 3.5 Sonnet for its strong code quality and readability analysis +@workflowai.agent(id='maintainability-reviewer', model=Model.CLAUDE_3_5_SONNET_LATEST) +async def maintainability_review_agent( + input: MaintainabilityReviewInput +) -> MaintainabilityReviewOutput: + """ + Expert in code quality. + Focus on code structure, readability, and adherence to best practices. + """ + ... + + +class ReviewSummaryInput(BaseModel): + """Input for review summary generation.""" + security_review: SecurityReviewOutput = Field(description="Security review results.") + performance_review: PerformanceReviewOutput = Field(description="Performance review results.") + maintainability_review: MaintainabilityReviewOutput = Field(description="Maintainability review results.") + + +class ReviewSummaryOutput(BaseModel): + """Output containing the summarized review.""" + summary: str = Field(description="Concise summary of all reviews with key actions.") + + +# Uses O1 for its strong synthesis and summarization abilities +@workflowai.agent(id='review-summarizer', model=Model.O1_2024_12_17_HIGH_REASONING_EFFORT) +async def summarize_reviews_agent( + input: ReviewSummaryInput +) -> ReviewSummaryOutput: + """ + Technical lead summarizing multiple code reviews. + Synthesize review results into a concise summary with key actions. + """ + ... + + +class CodeReviewResult(TypedDict): + security_review: SecurityReviewOutput + performance_review: PerformanceReviewOutput + maintainability_review: MaintainabilityReviewOutput + summary: str + + +async def parallel_code_review(code: str) -> CodeReviewResult: + """ + Perform parallel code reviews using specialized agents: + 1. Security review for vulnerabilities and risks + 2. Performance review for optimization opportunities + 3. Maintainability review for code quality + 4. 
Synthesize results into an actionable summary + """ + # Run parallel reviews + security_review, performance_review, maintainability_review = await asyncio.gather( + security_review_agent(SecurityReviewInput(code=code)), + performance_review_agent(PerformanceReviewInput(code=code)), + maintainability_review_agent(MaintainabilityReviewInput(code=code)) + ) + + # Aggregate and summarize results + summary = await summarize_reviews_agent( + ReviewSummaryInput( + security_review=security_review, + performance_review=performance_review, + maintainability_review=maintainability_review + ) + ) + + return { + "security_review": security_review, + "performance_review": performance_review, + "maintainability_review": maintainability_review, + "summary": summary.summary + } + + +if __name__ == "__main__": + # Example code to review + code_to_review = """ + def process_user_input(user_input): + # Execute the input as a command + result = eval(user_input) + return result + + def cache_data(data): + # Store everything in memory + global_cache.append(data) + + def get_user_data(user_id): + # Query without parameterization + cursor.execute(f"SELECT * FROM users WHERE id = {user_id}") + return cursor.fetchall() + """ + + result = asyncio.run(parallel_code_review(code_to_review)) + + print("\n=== Security Review ===") + print(f"Risk Level: {result['security_review'].risk_level}") + print("\nVulnerabilities:") + for v in result['security_review'].vulnerabilities: + print(f"- {v}") + print("\nSuggestions:") + for s in result['security_review'].suggestions: + print(f"- {s}") + + print("\n=== Performance Review ===") + print(f"Impact Level: {result['performance_review'].impact}") + print("\nIssues:") + for i in result['performance_review'].issues: + print(f"- {i}") + print("\nOptimizations:") + for o in result['performance_review'].optimizations: + print(f"- {o}") + + print("\n=== Maintainability Review ===") + print(f"Quality Score: {result['maintainability_review'].quality_score}/10") + print("\nConcerns:") + for c in result['maintainability_review'].concerns: + print(f"- {c}") + print("\nRecommendations:") + for r in result['maintainability_review'].recommendations: + print(f"- {r}") + + print("\n=== Summary ===") + print(result["summary"]) + print() \ No newline at end of file From c83a8e26ed0c3cbcfeb1179d29f6b3d7f9b53658 Mon Sep 17 00:00:00 2001 From: Pierre Date: Mon, 3 Feb 2025 17:05:30 +0100 Subject: [PATCH 4/7] Update README.md --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 401ed2c..23ece36 100644 --- a/README.md +++ b/README.md @@ -427,3 +427,7 @@ async for run in say_hello(Input(name="John")): print(run.output.greeting1) # will be empty if the model has not generated it yet ``` + +## Workflows + +For advanced workflow patterns and examples, please refer to the [Workflows README](examples/workflows/README.md) for more details. 
From 1ee64dba6ff7f2dab7dae8335b45255be0c9bc24 Mon Sep 17 00:00:00 2001 From: Pierre Date: Tue, 4 Feb 2025 18:08:51 +0100 Subject: [PATCH 5/7] add chain of agents inspired by Google https://research.google/blog/chain-of-agents-large-language-models-collaborating-on-long-context-tasks/ --- examples/workflows/README.md | 12 +- examples/workflows/chain_of_agents.py | 200 ++++++++++++++++++++++++++ 2 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 examples/workflows/chain_of_agents.py diff --git a/examples/workflows/README.md b/examples/workflows/README.md index 72e6db0..478390b 100644 --- a/examples/workflows/README.md +++ b/examples/workflows/README.md @@ -48,8 +48,18 @@ The evaluator-optimizer pattern employs an iterative feedback loop. An initial o For an implementation example, see [evaluator_optimizer.py](evaluator_optimizer.py). +## 6. Chain of Agents (Long Context Processing) +The Chain of Agents pattern is designed for processing long documents or complex tasks that exceed the context window of a single model. In this pattern, multiple worker agents process different chunks of the input sequentially, passing their findings through the chain, while a manager agent synthesizes the final output. + +**Example:** +- Split a long document into manageable chunks +- Worker agents process each chunk sequentially, building upon previous findings +- A manager agent synthesizes all findings into a final, coherent response + +For an implementation example, see [chain_of_agents.py](chain_of_agents.py). + --- -These patterns were inspired by the workflow patterns described in the [Vercel AI SDK Documentation](https://sdk.vercel.ai/docs/foundations/agents#patterns-with-examples). +These patterns were inspired by the workflow patterns described in the [Vercel AI SDK Documentation](https://sdk.vercel.ai/docs/foundations/agents#patterns-with-examples) and research from organizations like [Google Research](https://research.google/blog/chain-of-agents-large-language-models-collaborating-on-long-context-tasks/). This README should serve as a high-level guide to understanding and using the different patterns available in our workflows. diff --git a/examples/workflows/chain_of_agents.py b/examples/workflows/chain_of_agents.py new file mode 100644 index 0000000..21978c0 --- /dev/null +++ b/examples/workflows/chain_of_agents.py @@ -0,0 +1,200 @@ +""" +This implementation is inspired by Google Research's Chain of Agents (CoA) framework: +https://research.google/blog/chain-of-agents-large-language-models-collaborating-on-long-context-tasks/ + +CoA is a training-free, task-agnostic framework for handling long-context tasks through LLM collaboration. +Instead of trying to fit everything into a single context window, it: +1. Breaks input into manageable chunks +2. Uses worker agents to process chunks sequentially, passing information forward +3. 
Uses a manager agent to synthesize the final answer + +Key benefits of this approach: +- More efficient than full-context processing (O(nk) vs O(n²) complexity) +- Better performance than RAG for complex reasoning tasks +- Scales well with input length (performance improves with longer inputs) +- No training required, works with any LLM +""" + +import workflowai +from workflowai import Model +from pydantic import BaseModel, Field +import asyncio +from typing import List + + +class DocumentChunk(BaseModel): + """A chunk of text from a long document.""" + content: str = Field(description="The content of this document chunk.") + + +class WorkerInput(BaseModel): + """Input for a worker agent processing a document chunk.""" + chunk: DocumentChunk = Field(description="The current chunk to process.") + query: str = Field(description="The query or task to be answered.") + previous_findings: str = Field( + default="", + description="Accumulated findings from previous workers." + ) + + +class WorkerOutput(BaseModel): + """Output from a worker agent containing findings and evidence.""" + findings: str = Field( + description="Key findings and information extracted from this chunk." + ) + evidence: List[str] = Field( + default_factory=list, + description="Supporting evidence or quotes from the chunk." + ) + message_for_next: str = Field( + description="Message to pass to the next worker with relevant context." + ) + + +class ManagerInput(BaseModel): + """Input for the manager agent to generate final response.""" + query: str = Field(description="The original query to answer.") + accumulated_findings: str = Field( + description="All findings accumulated through the worker chain." + ) + + +class ManagerOutput(BaseModel): + """Final output from the manager agent.""" + answer: str = Field(description="The final answer to the query.") + reasoning: str = Field(description="Explanation of how the answer was derived.") + supporting_evidence: List[str] = Field( + default_factory=list, + description="Key evidence supporting the answer." + ) + + +@workflowai.agent(id='document-worker', model=Model.GPT_4O_MINI_LATEST) +async def worker_agent(input: WorkerInput) -> WorkerOutput: + """ + Process your assigned document chunk to: + 1. Extract relevant information related to the query + 2. Build upon findings from previous workers + 3. Pass important context to the next worker + + Be concise but thorough in your analysis. + Focus on information that could be relevant to the query. + """ + ... + + +@workflowai.agent(id='document-manager', model=Model.CLAUDE_3_5_SONNET_LATEST) +async def manager_agent(input: ManagerInput) -> ManagerOutput: + """ + Synthesize all findings from the worker agents to: + 1. Generate a comprehensive answer to the query + 2. Provide clear reasoning for your answer + 3. Include supporting evidence from the document + + Ensure your answer is well-supported by the accumulated findings. + """ + ... + + +async def process_long_document( + document: str, + query: str, + chunk_size: int = 2000 +) -> ManagerOutput: + """ + Process a long document using Chain of Agents pattern: + 1. Split document into chunks + 2. Have worker agents process each chunk sequentially + 3. Pass findings through the chain + 4. 
Have manager agent synthesize final answer + """ + # Split document into chunks + chunks = [ + document[i:i + chunk_size] + for i in range(0, len(document), chunk_size) + ] + + # Convert to DocumentChunk objects + doc_chunks = [DocumentChunk(content=chunk) for chunk in chunks] + + # Initialize accumulator for findings + accumulated_findings = "" + + # Process chunks sequentially through worker chain + for chunk in doc_chunks: + worker_input = WorkerInput( + chunk=chunk, + query=query, + previous_findings=accumulated_findings + ) + + # Get worker's output + worker_output = await worker_agent(worker_input) + + # Accumulate findings + if accumulated_findings: + accumulated_findings += "\n\n" + accumulated_findings += f"Findings:\n{worker_output.findings}" + + if worker_output.evidence: + accumulated_findings += "\nEvidence:\n- " + accumulated_findings += "\n- ".join(worker_output.evidence) + + # Have manager synthesize final answer + manager_input = ManagerInput( + query=query, + accumulated_findings=accumulated_findings + ) + + return await manager_agent(manager_input) + + +async def main(): + # Example long document + document = """ + The Industrial Revolution was a period of major industrialization and innovation during the late 18th and early 19th centuries. It began in Britain and later spread to other parts of the world. This era marked a major turning point in human history, fundamentally changing economic and social organization. Steam power, mechanization, and new manufacturing processes revolutionized production methods. The introduction of new technologies and manufacturing techniques led to unprecedented increases in productivity and efficiency. + + The development of new manufacturing processes led to significant changes in how goods were produced. The factory system replaced cottage industries, bringing workers into centralized locations. New iron production processes using coke instead of charcoal dramatically increased output. Textile manufacturing was transformed by inventions like the spinning jenny and power loom. These changes enabled mass production of goods at unprecedented scales. The mechanization of agriculture through innovations like the seed drill and threshing machine reduced the labor needed for farming while increasing food production. + + Social and economic impacts were profound, affecting both urban and rural life. Cities grew rapidly as people moved from rural areas to work in factories. Working conditions were often harsh, with long hours, dangerous conditions, and child labor being common. A new middle class emerged, while traditional skilled craftsmen often struggled. Living conditions in industrial cities were frequently overcrowded and unsanitary, leading to disease outbreaks. The rise of labor movements and trade unions began in response to poor working conditions and low wages. Education systems were developed to provide workers with basic literacy and numeracy skills needed in the new industrial economy. + + Environmental consequences became apparent as industrialization progressed. Coal burning led to severe air pollution in industrial cities, creating smog and respiratory problems. Rivers became contaminated with industrial waste and raw sewage. Deforestation increased as wood was needed for construction and fuel. The landscape was transformed by mines, factories, and expanding urban areas. These changes marked the beginning of large-scale human impact on the environment, leading to problems that would only be recognized and addressed much later. 
+ + Transportation and communication systems underwent revolutionary changes during this period. The development of railways, steam-powered ships, and improved road networks facilitated the movement of goods and people. The telegraph enabled rapid long-distance communication for the first time. These advances in transportation and communication helped create larger markets and more integrated economies. + + The Industrial Revolution also had significant cultural and intellectual impacts. Scientific and technological progress led to increased faith in human capability and reason. The Enlightenment ideals of progress and rationality found practical expression in industrial innovations. New forms of urban culture emerged, while traditional rural ways of life declined. The period saw the rise of new philosophical and political movements, including socialism, in response to industrial conditions. + + Public health and medicine saw both challenges and advances during this period. The concentration of population in cities led to serious health problems, including cholera epidemics and tuberculosis outbreaks. However, these challenges eventually spurred improvements in urban sanitation and medical knowledge. The development of public health systems and modern medical practices can be traced to this era. + + The Industrial Revolution's impact on agriculture extended beyond mechanization. New farming techniques and crop rotation methods increased agricultural productivity. The application of scientific principles to farming led to improved livestock breeding and crop yields. These agricultural advances were necessary to feed growing urban populations and free up labor for industrial work. + + The financial and business systems also evolved significantly. New forms of business organization, such as joint-stock companies, emerged to handle larger-scale industrial operations. Banking and credit systems expanded to provide capital for industrial growth. Insurance companies developed to manage the risks of industrial enterprises. These financial innovations helped fuel continued industrial expansion. + + The role of women in society began to change during this period. While many women worked in factories under difficult conditions, the Industrial Revolution also created new opportunities for women's employment in sectors like textile manufacturing. Middle-class women often found themselves managing increasingly complex households. The seeds of women's rights movements were planted during this era. + + The global impact of the Industrial Revolution was profound and long-lasting. As industrialization spread from Britain to other countries, it created new patterns of international trade and competition. Colonial empires expanded as industrial nations sought raw materials and markets. The technological and economic gaps between industrialized and non-industrialized regions grew, shaping global relationships that persist to the present day. + + The Industrial Revolution's legacy continues to influence modern society. Many current environmental challenges, including climate change, can be traced to the industrial practices established during this period. Modern labor laws, education systems, and urban planning still reflect responses to Industrial Revolution conditions. Understanding this transformative period helps explain many aspects of contemporary life and the ongoing challenges we face. + """ + + query = "What were the major environmental impacts of the Industrial Revolution?" 
+ + result = await process_long_document(document, query) + + print("\n=== Query ===") + print(query) + + print("\n=== Answer ===") + print(result.answer) + + print("\n=== Reasoning ===") + print(result.reasoning) + + if result.supporting_evidence: + print("\n=== Supporting Evidence ===") + for evidence in result.supporting_evidence: + print(f"- {evidence}") + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file From b113c6528669b3130623b0a4b89175d9eacee9bb Mon Sep 17 00:00:00 2001 From: Guillaume Aquilina Date: Tue, 4 Feb 2025 13:36:42 -0500 Subject: [PATCH 6/7] fix: lint errors --- examples/workflows/chain.py | 68 ++++++++--------- examples/workflows/chain_of_agents.py | 80 ++++++++++---------- examples/workflows/evaluator_optimizer.py | 90 ++++++++++++----------- examples/workflows/orchestrator_worker.py | 68 +++++++++-------- examples/workflows/parallel_processing.py | 77 +++++++++---------- examples/workflows/routing.py | 43 ++++++----- 6 files changed, 226 insertions(+), 200 deletions(-) diff --git a/examples/workflows/chain.py b/examples/workflows/chain.py index e229e0c..a77df96 100644 --- a/examples/workflows/chain.py +++ b/examples/workflows/chain.py @@ -1,25 +1,28 @@ -import workflowai -from workflowai import Model -from pydantic import BaseModel, Field import asyncio from typing import TypedDict +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] + +import workflowai +from workflowai import Model + + class MarketingCopyInput(BaseModel): """The product or concept for which to generate marketing copy.""" + idea: str = Field(description="A short description or name of the product.") class MarketingCopyOutput(BaseModel): """Contains the AI generated marketing copy text for the provided product or concept.""" + marketing_text: str = Field(description="Persuasive marketing copy text.") -@workflowai.agent(id='marketing-copy-generator', model=Model.GPT_4O_MINI_LATEST) -async def generate_marketing_copy_agent( - input: MarketingCopyInput -) -> MarketingCopyOutput: +@workflowai.agent(id="marketing-copy-generator", model=Model.GPT_4O_MINI_LATEST) +async def generate_marketing_copy_agent(_: MarketingCopyInput) -> MarketingCopyOutput: """ - Write persuasive marketing copy for the provided idea. + Write persuasive marketing copy for the provided idea. Focus on benefits and emotional appeal. """ ... 
@@ -27,20 +30,21 @@ async def generate_marketing_copy_agent( class EvaluateCopyInput(BaseModel): """Input type for evaluating the quality of marketing copy.""" + marketing_text: str = Field(description="The marketing copy text to evaluate.") class EvaluateCopyOutput(BaseModel): """Evaluation results for the marketing copy.""" - hasCallToAction: bool = Field(description="Whether a call to action is present.") - emotionalAppeal: int = Field(description="Emotional appeal rating (1-10).") + + has_call_to_action: bool = Field(description="Whether a call to action is present.") + emotional_appeal: int = Field(description="Emotional appeal rating (1-10).") clarity: int = Field(description="Clarity rating (1-10).") + # We use a smarter model (O1) to review the copy since evaluation requires more nuanced understanding -@workflowai.agent(id='marketing-copy-evaluator', model=Model.O1_MINI_LATEST) -async def evaluate_marketing_copy_agent( - input: EvaluateCopyInput -) -> EvaluateCopyOutput: +@workflowai.agent(id="marketing-copy-evaluator", model=Model.O1_MINI_LATEST) +async def evaluate_marketing_copy_agent(_: EvaluateCopyInput) -> EvaluateCopyOutput: """ Evaluate the marketing copy for: 1) Presence of a call to action (true/false) @@ -53,6 +57,7 @@ async def evaluate_marketing_copy_agent( class RewriteCopyInput(BaseModel): """Input for rewriting the marketing copy with targeted improvements.""" + original_copy: str = Field(default="", description="Original marketing copy.") add_call_to_action: bool = Field(default=False, description="Whether we need a clear call to action.") strengthen_emotional_appeal: bool = Field(default=False, description="Whether emotional appeal needs a boost.") @@ -61,13 +66,13 @@ class RewriteCopyInput(BaseModel): class RewriteCopyOutput(BaseModel): """Contains the improved marketing copy.""" + marketing_text: str = Field(description="The improved marketing copy text.") + # Claude 3.5 Sonnet is a more powerful model for copywriting @workflowai.agent(model=Model.CLAUDE_3_5_SONNET_LATEST) -async def rewrite_marketing_copy_agent( - input: RewriteCopyInput -) -> RewriteCopyOutput: +async def rewrite_marketing_copy_agent(_: RewriteCopyInput) -> RewriteCopyOutput: """ Rewrite the marketing copy with the specified improvements: - A clear CTA if requested @@ -83,6 +88,7 @@ class MarketingResult(TypedDict): was_improved: bool quality_metrics: EvaluateCopyOutput + async def generate_marketing_copy(idea: str) -> MarketingResult: """ Demonstrates a chain flow: @@ -100,20 +106,16 @@ async def generate_marketing_copy(idea: str) -> MarketingResult: evaluation = await evaluate_marketing_copy_agent(EvaluateCopyInput(marketing_text=original_copy)) # Step 3: Check evaluation results. 
If below thresholds, rewrite - needs_improvement = ( - not evaluation.hasCallToAction - or evaluation.emotionalAppeal < 7 - or evaluation.clarity < 7 - ) - + needs_improvement = not evaluation.has_call_to_action or evaluation.emotional_appeal < 7 or evaluation.clarity < 7 + if needs_improvement: rewrite = await rewrite_marketing_copy_agent( RewriteCopyInput( original_copy=original_copy, - add_call_to_action=not evaluation.hasCallToAction, - strengthen_emotional_appeal=evaluation.emotionalAppeal < 7, + add_call_to_action=not evaluation.has_call_to_action, + strengthen_emotional_appeal=evaluation.emotional_appeal < 7, improve_clarity=evaluation.clarity < 7, - ) + ), ) final_copy = rewrite.marketing_text @@ -121,26 +123,26 @@ async def generate_marketing_copy(idea: str) -> MarketingResult: "original_copy": original_copy, "final_copy": final_copy, "was_improved": needs_improvement, - "quality_metrics": evaluation + "quality_metrics": evaluation, } if __name__ == "__main__": idea = "An open-source platform for AI agents" result = asyncio.run(generate_marketing_copy(idea)) - + print("\n=== Input Idea ===") print(idea) - + print("\n=== Marketing Copy ===") print(result["original_copy"]) - + print("\n=== Quality Assessment ===") metrics = result["quality_metrics"] - print(f"✓ Call to Action: {'Present' if metrics.hasCallToAction else 'Missing'}") - print(f"✓ Emotional Appeal: {metrics.emotionalAppeal}/10") + print(f"✓ Call to Action: {'Present' if metrics.has_call_to_action else 'Missing'}") + print(f"✓ Emotional Appeal: {metrics.emotional_appeal}/10") print(f"✓ Clarity: {metrics.clarity}/10") - + if result["was_improved"]: print("\n=== Improved Marketing Copy ===") print(result["final_copy"]) diff --git a/examples/workflows/chain_of_agents.py b/examples/workflows/chain_of_agents.py index 21978c0..6bdd169 100644 --- a/examples/workflows/chain_of_agents.py +++ b/examples/workflows/chain_of_agents.py @@ -15,82 +15,89 @@ - No training required, works with any LLM """ -import workflowai -from workflowai import Model -from pydantic import BaseModel, Field import asyncio from typing import List +from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType] + +import workflowai +from workflowai import Model + class DocumentChunk(BaseModel): """A chunk of text from a long document.""" + content: str = Field(description="The content of this document chunk.") class WorkerInput(BaseModel): """Input for a worker agent processing a document chunk.""" + chunk: DocumentChunk = Field(description="The current chunk to process.") query: str = Field(description="The query or task to be answered.") previous_findings: str = Field( default="", - description="Accumulated findings from previous workers." + description="Accumulated findings from previous workers.", ) class WorkerOutput(BaseModel): """Output from a worker agent containing findings and evidence.""" + findings: str = Field( - description="Key findings and information extracted from this chunk." + description="Key findings and information extracted from this chunk.", ) evidence: List[str] = Field( default_factory=list, - description="Supporting evidence or quotes from the chunk." + description="Supporting evidence or quotes from the chunk.", ) message_for_next: str = Field( - description="Message to pass to the next worker with relevant context."
+ description="Message to pass to the next worker with relevant context.", ) class ManagerInput(BaseModel): """Input for the manager agent to generate final response.""" + query: str = Field(description="The original query to answer.") accumulated_findings: str = Field( - description="All findings accumulated through the worker chain." + description="All findings accumulated through the worker chain.", ) class ManagerOutput(BaseModel): """Final output from the manager agent.""" + answer: str = Field(description="The final answer to the query.") reasoning: str = Field(description="Explanation of how the answer was derived.") supporting_evidence: List[str] = Field( default_factory=list, - description="Key evidence supporting the answer." + description="Key evidence supporting the answer.", ) -@workflowai.agent(id='document-worker', model=Model.GPT_4O_MINI_LATEST) -async def worker_agent(input: WorkerInput) -> WorkerOutput: +@workflowai.agent(id="document-worker", model=Model.GPT_4O_MINI_LATEST) +async def worker_agent(_: WorkerInput) -> WorkerOutput: """ Process your assigned document chunk to: 1. Extract relevant information related to the query 2. Build upon findings from previous workers 3. Pass important context to the next worker - + Be concise but thorough in your analysis. Focus on information that could be relevant to the query. """ ... -@workflowai.agent(id='document-manager', model=Model.CLAUDE_3_5_SONNET_LATEST) -async def manager_agent(input: ManagerInput) -> ManagerOutput: +@workflowai.agent(id="document-manager", model=Model.CLAUDE_3_5_SONNET_LATEST) +async def manager_agent(_: ManagerInput) -> ManagerOutput: """ Synthesize all findings from the worker agents to: 1. Generate a comprehensive answer to the query 2. Provide clear reasoning for your answer 3. Include supporting evidence from the document - + Ensure your answer is well-supported by the accumulated findings. """ ... @@ -99,7 +106,7 @@ async def manager_agent(input: ManagerInput) -> ManagerOutput: async def process_long_document( document: str, query: str, - chunk_size: int = 2000 + chunk_size: int = 2000, ) -> ManagerOutput: """ Process a long document using Chain of Agents pattern: @@ -109,43 +116,40 @@ async def process_long_document( 4. 
Have manager agent synthesize final answer """ # Split document into chunks - chunks = [ - document[i:i + chunk_size] - for i in range(0, len(document), chunk_size) - ] - + chunks = [document[i : i + chunk_size] for i in range(0, len(document), chunk_size)] + # Convert to DocumentChunk objects doc_chunks = [DocumentChunk(content=chunk) for chunk in chunks] - + # Initialize accumulator for findings accumulated_findings = "" - + # Process chunks sequentially through worker chain for chunk in doc_chunks: worker_input = WorkerInput( chunk=chunk, query=query, - previous_findings=accumulated_findings + previous_findings=accumulated_findings, ) - + # Get worker's output worker_output = await worker_agent(worker_input) - + # Accumulate findings if accumulated_findings: accumulated_findings += "\n\n" accumulated_findings += f"Findings:\n{worker_output.findings}" - + if worker_output.evidence: accumulated_findings += "\nEvidence:\n- " accumulated_findings += "\n- ".join(worker_output.evidence) - + # Have manager synthesize final answer manager_input = ManagerInput( query=query, - accumulated_findings=accumulated_findings + accumulated_findings=accumulated_findings, ) - + return await manager_agent(manager_input) @@ -175,21 +179,21 @@ async def main(): The global impact of the Industrial Revolution was profound and long-lasting. As industrialization spread from Britain to other countries, it created new patterns of international trade and competition. Colonial empires expanded as industrial nations sought raw materials and markets. The technological and economic gaps between industrialized and non-industrialized regions grew, shaping global relationships that persist to the present day. The Industrial Revolution's legacy continues to influence modern society. Many current environmental challenges, including climate change, can be traced to the industrial practices established during this period. Modern labor laws, education systems, and urban planning still reflect responses to Industrial Revolution conditions. Understanding this transformative period helps explain many aspects of contemporary life and the ongoing challenges we face. - """ - + """ # noqa: E501 + query = "What were the major environmental impacts of the Industrial Revolution?" 
-
+
     result = await process_long_document(document, query)
-
+
     print("\n=== Query ===")
     print(query)
-
+
     print("\n=== Answer ===")
     print(result.answer)
-
+
     print("\n=== Reasoning ===")
     print(result.reasoning)
-
+
     if result.supporting_evidence:
         print("\n=== Supporting Evidence ===")
         for evidence in result.supporting_evidence:
@@ -197,4 +201,4 @@ async def main():

 if __name__ == "__main__":
-    asyncio.run(main())
\ No newline at end of file
+    asyncio.run(main())
diff --git a/examples/workflows/evaluator_optimizer.py b/examples/workflows/evaluator_optimizer.py
index efa5e5c..2867092 100644
--- a/examples/workflows/evaluator_optimizer.py
+++ b/examples/workflows/evaluator_optimizer.py
@@ -1,26 +1,28 @@
-import workflowai
-from workflowai import Model
-from pydantic import BaseModel, Field
 import asyncio
 from typing import List, TypedDict

+from pydantic import BaseModel, Field  # pyright: ignore [reportUnknownVariableType]
+
+import workflowai
+from workflowai import Model
+

 class TranslationInput(BaseModel):
     """Input for translation."""
+
     text: str = Field(description="The text to translate.")
     target_language: str = Field(description="The target language for translation.")

 class TranslationOutput(BaseModel):
     """Output containing the translated text."""
+
     translation: str = Field(description="The translated text.")

 # Uses GPT-4O Mini for fast initial translation
-@workflowai.agent(id='initial-translator', model=Model.GPT_4O_MINI_LATEST)
-async def initial_translation_agent(
-    input: TranslationInput
-) -> TranslationOutput:
+@workflowai.agent(id="initial-translator", model=Model.GPT_4O_MINI_LATEST)
+async def initial_translation_agent(_: TranslationInput) -> TranslationOutput:
     """
     Expert literary translator.
     Translate text while preserving tone and cultural nuances.
@@ -30,6 +32,7 @@ async def initial_translation_agent(

 class TranslationEvaluationInput(BaseModel):
     """Input for evaluating translation quality."""
+
     original_text: str = Field(description="The original text.")
     translation: str = Field(description="The translation to evaluate.")
     target_language: str = Field(description="The target language of the translation.")
@@ -37,6 +40,7 @@ class TranslationEvaluationInput(BaseModel):

 class TranslationEvaluationOutput(BaseModel):
     """Output containing the translation evaluation."""
+
     quality_score: int = Field(description="Overall quality score (1-10).")
     preserves_tone: bool = Field(description="Whether the translation preserves the original tone.")
     preserves_nuance: bool = Field(description="Whether the translation preserves subtle nuances.")
@@ -46,10 +50,8 @@ class TranslationEvaluationOutput(BaseModel):

 # Uses O1 for its strong analytical and evaluation capabilities
-@workflowai.agent(id='translation-evaluator', model=Model.O1_2024_12_17_HIGH_REASONING_EFFORT)
-async def evaluate_translation_agent(
-    input: TranslationEvaluationInput
-) -> TranslationEvaluationOutput:
+@workflowai.agent(id="translation-evaluator", model=Model.O1_2024_12_17_HIGH_REASONING_EFFORT)
+async def evaluate_translation_agent(_: TranslationEvaluationInput) -> TranslationEvaluationOutput:
     """
     Expert in evaluating literary translations.
     Evaluate translations for quality, tone preservation, nuance, and cultural accuracy.
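A minimal sketch of the evaluate-then-improve loop this file builds with workflowai agents, with the agent calls replaced by stub coroutines so it runs on its own; `draft`, `score`, and `improve` are illustrative names for this sketch only, not part of the patch or the library:

import asyncio


async def draft(text: str) -> str:
    # Stand-in for the fast initial-translation agent.
    return text


async def score(candidate: str) -> int:
    # Stand-in for the evaluator agent; returns a 1-10 quality score.
    return 8 if candidate.endswith(".") else 5


async def improve(candidate: str) -> str:
    # Stand-in for the improvement agent applying evaluator feedback.
    return candidate.rstrip() + "."


async def optimize(text: str, threshold: int = 8, max_iterations: int = 3) -> str:
    # Draft once, then evaluate and improve until the threshold or the cap is reached.
    candidate = await draft(text)
    for _ in range(max_iterations):
        if await score(candidate) >= threshold:
            break
        candidate = await improve(candidate)
    return candidate


print(asyncio.run(optimize("Bonjour tout le monde")))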
@@ -59,6 +61,7 @@ async def evaluate_translation_agent(

 class TranslationImprovementInput(BaseModel):
     """Input for improving translation based on feedback."""
+
     original_text: str = Field(description="The original text.")
     current_translation: str = Field(description="The current translation.")
     target_language: str = Field(description="The target language.")
@@ -68,14 +71,13 @@ class TranslationImprovementInput(BaseModel):

 class TranslationImprovementOutput(BaseModel):
     """Output containing the improved translation."""
+
     translation: str = Field(description="The improved translation.")

 # Uses GPT-4O for high-quality translation refinement
-@workflowai.agent(id='translation-improver', model=Model.GPT_4O_LATEST)
-async def improve_translation_agent(
-    input: TranslationImprovementInput
-) -> TranslationImprovementOutput:
+@workflowai.agent(id="translation-improver", model=Model.GPT_4O_LATEST)
+async def improve_translation_agent(_: TranslationImprovementInput) -> TranslationImprovementOutput:
     """
     Expert literary translator.
     Improve translations based on specific feedback while maintaining overall quality.
@@ -94,38 +96,38 @@ async def translate_with_feedback(text: str, target_language: str) -> Translatio
     1. Generate initial translation with a faster model
     2. Evaluate translation quality
     3. If quality threshold not met, improve based on feedback
-    4. Repeat evaluation-improvement cycle up to MAX_ITERATIONS
+    4. Repeat evaluation-improvement cycle up to max_iterations
     """
-    MAX_ITERATIONS = 3
+    max_iterations = 3
     iterations = 0
-
+
     # Initial translation using faster model
     current_translation = await initial_translation_agent(
         TranslationInput(
             text=text,
-            target_language=target_language
-        )
+            target_language=target_language,
+        ),
     )
-
-    while iterations < MAX_ITERATIONS:
+
+    while iterations < max_iterations:
         # Evaluate current translation
         evaluation = await evaluate_translation_agent(
             TranslationEvaluationInput(
                 original_text=text,
                 translation=current_translation.translation,
-                target_language=target_language
-            )
+                target_language=target_language,
+            ),
         )
-
+
         # Check if quality meets threshold
         if (
-            evaluation.quality_score >= 8 and
-            evaluation.preserves_tone and
-            evaluation.preserves_nuance and
-            evaluation.culturally_accurate
+            evaluation.quality_score >= 8
+            and evaluation.preserves_tone
+            and evaluation.preserves_nuance
+            and evaluation.culturally_accurate
         ):
             break
-
+
         # Generate improved translation based on feedback
         improved = await improve_translation_agent(
             TranslationImprovementInput(
@@ -133,36 +135,36 @@ async def translate_with_feedback(text: str, target_language: str) -> Translatio
                 current_translation=current_translation.translation,
                 target_language=target_language,
                 specific_issues=evaluation.specific_issues,
-                improvement_suggestions=evaluation.improvement_suggestions
-            )
+                improvement_suggestions=evaluation.improvement_suggestions,
+            ),
         )
-
+
         current_translation.translation = improved.translation
         iterations += 1
-
+
     return {
         "final_translation": current_translation.translation,
-        "iterations_required": iterations
+        "iterations_required": iterations,
     }

 if __name__ == "__main__":
     # Example text to translate
     text = """
-    The old bookstore, nestled in the heart of the city,
-    was a sanctuary for bibliophiles. Its wooden shelves,
-    worn smooth by decades of searching hands, held stories
-    in dozens of languages. The owner, a silver-haired woman
+    The old bookstore, nestled in the heart of the city,
+    was a sanctuary for bibliophiles. Its wooden shelves,
+    worn smooth by decades of searching hands, held stories
+    in dozens of languages. The owner, a silver-haired woman
     with kind eyes, knew each book's location by heart.
     """
     target_language = "French"
-
+
     result = asyncio.run(translate_with_feedback(text, target_language))
-
+
     print("\n=== Translation Result ===")
-    print(f"\nOriginal Text:")
+    print("\nOriginal Text:")
     print(text)
-    print(f"\nFinal Translation:")
+    print("\nFinal Translation:")
     print(result["final_translation"])
     print(f"\nIterations Required: {result['iterations_required']}")
-    print()
\ No newline at end of file
+    print()
diff --git a/examples/workflows/orchestrator_worker.py b/examples/workflows/orchestrator_worker.py
index eb84383..abe3fd5 100644
--- a/examples/workflows/orchestrator_worker.py
+++ b/examples/workflows/orchestrator_worker.py
@@ -1,10 +1,12 @@
-import workflowai
-from workflowai import Model
-from pydantic import BaseModel, Field
 import asyncio
 from enum import Enum
 from typing import List, TypedDict

+from pydantic import BaseModel, Field  # pyright: ignore [reportUnknownVariableType]
+
+import workflowai
+from workflowai import Model
+

 class ChangeType(str, Enum):
     CREATE = "create"
@@ -20,6 +22,7 @@ class ComplexityLevel(str, Enum):

 class FileChange(BaseModel):
     """Describes a planned change to a file."""
+
     purpose: str = Field(description="Purpose of the change in the context of the feature.")
     file_path: str = Field(description="Path to the file to be changed.")
     change_type: ChangeType = Field(description="Type of change to be made.")
@@ -27,20 +30,20 @@ class FileChange(BaseModel):

 class ImplementationPlanInput(BaseModel):
     """Input for generating an implementation plan."""
+
     feature_request: str = Field(description="The feature request to implement.")

 class ImplementationPlanOutput(BaseModel):
     """Output containing the implementation plan."""
+
     files: List[FileChange] = Field(description="List of files to be changed.")
     estimated_complexity: ComplexityLevel = Field(description="Estimated complexity of the implementation.")

 # Uses O1 for its strong architectural planning capabilities
-@workflowai.agent(id='implementation-planner', model=Model.O1_2024_12_17_HIGH_REASONING_EFFORT)
-async def plan_implementation_agent(
-    input: ImplementationPlanInput
-) -> ImplementationPlanOutput:
+@workflowai.agent(id="implementation-planner", model=Model.O1_2024_12_17_HIGH_REASONING_EFFORT)
+async def plan_implementation_agent(_: ImplementationPlanInput) -> ImplementationPlanOutput:
     """
     Senior software architect planning feature implementations.
     Analyze feature requests and create detailed implementation plans.
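A compact sketch of the orchestrator-workers shape used in this file: one planning step produces a list of work items, then each item is dispatched concurrently with asyncio.gather. The `plan` and `implement` stubs below are illustrative stand-ins for the planner and implementer agents, not part of the patch:

import asyncio
from typing import List


async def plan(feature_request: str) -> List[str]:
    # Stand-in for the planning agent: decide which files to touch.
    return ["api.py", "models.py", "tests/test_api.py"]


async def implement(file_path: str, feature_request: str) -> str:
    # Stand-in for the per-file implementation agent.
    return f"# changes to {file_path} for: {feature_request}"


async def orchestrate(feature_request: str) -> List[str]:
    files = await plan(feature_request)
    # Fan the planned items out to workers; gather preserves the input order.
    return await asyncio.gather(*(implement(f, feature_request) for f in files))


print(asyncio.run(orchestrate("add rate limiting")))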
@@ -50,6 +53,7 @@ async def plan_implementation_agent(

 class FileImplementationInput(BaseModel):
     """Input for implementing file changes."""
+
     file_path: str = Field(description="Path to the file being changed.")
     purpose: str = Field(description="Purpose of the change.")
     feature_request: str = Field(description="Overall feature context.")
@@ -58,15 +62,14 @@ class FileImplementationInput(BaseModel):

 class FileImplementationOutput(BaseModel):
     """Output containing the implemented changes."""
+
     explanation: str = Field(description="Explanation of the implemented changes.")
     code: str = Field(description="The implemented code changes.")

 # Uses GPT-4O for its strong code generation and modification capabilities
-@workflowai.agent(id='file-implementer', model=Model.GPT_4O_LATEST)
-async def implement_file_changes_agent(
-    input: FileImplementationInput
-) -> FileImplementationOutput:
+@workflowai.agent(id="file-implementer", model=Model.GPT_4O_LATEST)
+async def implement_file_changes_agent(_: FileImplementationInput) -> FileImplementationOutput:
     """
     Expert at implementing code changes based on the change type:
     - CREATE: Implement new files following best practices and project patterns
@@ -95,33 +98,36 @@ async def implement_feature(feature_request: str) -> FeatureImplementationResult
     """
     # Orchestrator: Plan the implementation
     implementation_plan = await plan_implementation_agent(
-        ImplementationPlanInput(feature_request=feature_request)
+        ImplementationPlanInput(feature_request=feature_request),
     )

     # Workers: Execute the planned changes in parallel
-    file_changes = await asyncio.gather(*[
-        implement_file_changes_agent(
-            FileImplementationInput(
-                file_path=file.file_path,
-                purpose=file.purpose,
-                feature_request=feature_request,
-                change_type=file.change_type
+    file_changes = await asyncio.gather(
+        *[
+            implement_file_changes_agent(
+                FileImplementationInput(
+                    file_path=file.file_path,
+                    purpose=file.purpose,
+                    feature_request=feature_request,
+                    change_type=file.change_type,
+                ),
             )
-        ) for file in implementation_plan.files
-    ])
+            for file in implementation_plan.files
+        ],
+    )

     # Combine results
     changes: List[ImplementationChange] = [
         {
             "file": implementation_plan.files[i],
-            "implementation": change
+            "implementation": change,
         }
         for i, change in enumerate(file_changes)
     ]

     return {
         "plan": implementation_plan,
-        "changes": changes
+        "changes": changes,
     }
@@ -136,19 +142,19 @@ async def implement_feature(feature_request: str) -> FeatureImplementationResult
     """
     result = asyncio.run(implement_feature(feature_request))
-
+
     print("\n=== Implementation Plan ===")
     print(f"Estimated Complexity: {result['plan'].estimated_complexity}")
     print("\nPlanned Changes:")
-    for file in result['plan'].files:
+    for file in result["plan"].files:
         print(f"\n- {file.change_type.upper()}: {file.file_path}")
         print(f"  Purpose: {file.purpose}")
-
+
     print("\n=== Implemented Changes ===")
-    for change in result['changes']:
-        file = change['file']
-        impl = change['implementation']
-
+    for change in result["changes"]:
+        file = change["file"]
+        impl = change["implementation"]
+
         print(f"\n=== {file.change_type.upper()}: {file.file_path} ===")
         print("\nPurpose:")
         print(file.purpose)
@@ -156,4 +162,4 @@ async def implement_feature(feature_request: str) -> FeatureImplementationResult
         print(impl.explanation)
         print("\nCode:")
         print(impl.code)
-        print()
\ No newline at end of file
+        print()
diff --git a/examples/workflows/parallel_processing.py b/examples/workflows/parallel_processing.py
index 2bbdb46..5b2f27c 100644
--- a/examples/workflows/parallel_processing.py
+++ b/examples/workflows/parallel_processing.py
@@ -1,9 +1,11 @@
-import workflowai
-from workflowai import Model
-from pydantic import BaseModel, Field
 import asyncio
 from enum import Enum
-from typing import TypedDict, List
+from typing import List, TypedDict
+
+from pydantic import BaseModel, Field  # pyright: ignore [reportUnknownVariableType]
+
+import workflowai
+from workflowai import Model

 class RiskLevel(str, Enum):
@@ -14,20 +16,21 @@ class RiskLevel(str, Enum):

 class SecurityReviewInput(BaseModel):
     """Input for security code review."""
+
     code: str = Field(description="The code to review for security issues.")

 class SecurityReviewOutput(BaseModel):
     """Output from security code review."""
+
     vulnerabilities: List[str] = Field(description="List of identified security vulnerabilities.")
     risk_level: RiskLevel = Field(description="Overall security risk level.")
     suggestions: List[str] = Field(description="Security improvement suggestions.")
+
 # Uses Claude 3.5 Sonnet for its strong security analysis capabilities
-@workflowai.agent(id='security-reviewer', model=Model.CLAUDE_3_5_SONNET_LATEST)
-async def security_review_agent(
-    input: SecurityReviewInput
-) -> SecurityReviewOutput:
+@workflowai.agent(id="security-reviewer", model=Model.CLAUDE_3_5_SONNET_LATEST)
+async def security_review_agent(_: SecurityReviewInput) -> SecurityReviewOutput:
     """
     Expert in code security.
     Focus on identifying security vulnerabilities, injection risks, and authentication issues.
@@ -37,21 +40,21 @@ async def security_review_agent(

 class PerformanceReviewInput(BaseModel):
     """Input for performance code review."""
+
     code: str = Field(description="The code to review for performance issues.")

 class PerformanceReviewOutput(BaseModel):
     """Output from performance code review."""
+
     issues: List[str] = Field(description="List of identified performance issues.")
     impact: RiskLevel = Field(description="Impact level of performance issues.")
     optimizations: List[str] = Field(description="Performance optimization suggestions.")

 # Uses O1 Mini for its expertise in performance optimization
-@workflowai.agent(id='performance-reviewer', model=Model.O1_MINI_LATEST)
-async def performance_review_agent(
-    input: PerformanceReviewInput
-) -> PerformanceReviewOutput:
+@workflowai.agent(id="performance-reviewer", model=Model.O1_MINI_LATEST)
+async def performance_review_agent(_: PerformanceReviewInput) -> PerformanceReviewOutput:
     """
     Expert in code performance.
     Focus on identifying performance bottlenecks, memory leaks, and optimization opportunities.
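The same fan-out idea appears in this file with a fixed set of specialized reviewers instead of a planned list: each reviewer sees the same code and runs concurrently. A stubbed, self-contained sketch (the reviewer functions are illustrative stand-ins for the agents, not library calls):

import asyncio
from typing import Dict


async def security_review(code: str) -> str:
    # Stand-in for the security reviewer agent.
    return "eval() on user input" if "eval(" in code else "no obvious injection risks"


async def performance_review(code: str) -> str:
    # Stand-in for the performance reviewer agent.
    return "unbounded cache growth" if ".append(" in code else "no obvious bottlenecks"


async def maintainability_review(code: str) -> str:
    # Stand-in for the maintainability reviewer agent.
    return "missing docstrings"


async def review(code: str) -> Dict[str, str]:
    # The three reviews are independent, so they can run at the same time.
    security, performance, maintainability = await asyncio.gather(
        security_review(code),
        performance_review(code),
        maintainability_review(code),
    )
    return {"security": security, "performance": performance, "maintainability": maintainability}


print(asyncio.run(review("result = eval(user_input)")))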
@@ -61,21 +64,21 @@ async def performance_review_agent(

 class MaintainabilityReviewInput(BaseModel):
     """Input for maintainability code review."""
+
     code: str = Field(description="The code to review for maintainability issues.")

 class MaintainabilityReviewOutput(BaseModel):
     """Output from maintainability code review."""
+
     concerns: List[str] = Field(description="List of maintainability concerns.")
     quality_score: int = Field(description="Code quality score (1-10).", ge=1, le=10)
     recommendations: List[str] = Field(description="Maintainability improvement recommendations.")

 # Uses Claude 3.5 Sonnet for its strong code quality and readability analysis
-@workflowai.agent(id='maintainability-reviewer', model=Model.CLAUDE_3_5_SONNET_LATEST)
-async def maintainability_review_agent(
-    input: MaintainabilityReviewInput
-) -> MaintainabilityReviewOutput:
+@workflowai.agent(id="maintainability-reviewer", model=Model.CLAUDE_3_5_SONNET_LATEST)
+async def maintainability_review_agent(_: MaintainabilityReviewInput) -> MaintainabilityReviewOutput:
     """
     Expert in code quality.
     Focus on code structure, readability, and adherence to best practices.
@@ -85,6 +88,7 @@ async def maintainability_review_agent(

 class ReviewSummaryInput(BaseModel):
     """Input for review summary generation."""
+
     security_review: SecurityReviewOutput = Field(description="Security review results.")
     performance_review: PerformanceReviewOutput = Field(description="Performance review results.")
     maintainability_review: MaintainabilityReviewOutput = Field(description="Maintainability review results.")
@@ -92,14 +96,13 @@ class ReviewSummaryInput(BaseModel):

 class ReviewSummaryOutput(BaseModel):
     """Output containing the summarized review."""
+
     summary: str = Field(description="Concise summary of all reviews with key actions.")

 # Uses O1 for its strong synthesis and summarization abilities
-@workflowai.agent(id='review-summarizer', model=Model.O1_2024_12_17_HIGH_REASONING_EFFORT)
-async def summarize_reviews_agent(
-    input: ReviewSummaryInput
-) -> ReviewSummaryOutput:
+@workflowai.agent(id="review-summarizer", model=Model.O1_2024_12_17_HIGH_REASONING_EFFORT)
+async def summarize_reviews_agent(_: ReviewSummaryInput) -> ReviewSummaryOutput:
     """
     Technical lead summarizing multiple code reviews.
     Synthesize review results into a concise summary with key actions.
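One practical caveat when extending this pattern beyond three reviewers: firing many agent calls at once can hit provider rate limits, so it may be worth capping concurrency. A possible sketch using a semaphore (the limit of 5 and the helper names are arbitrary assumptions, not part of the patch):

import asyncio
from typing import List


async def review_one(item: str) -> str:
    # Stand-in for any per-item agent call.
    await asyncio.sleep(0.01)
    return f"reviewed {item}"


async def review_many(items: List[str], max_concurrency: int = 5) -> List[str]:
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(item: str) -> str:
        # At most `max_concurrency` reviews run at any given time.
        async with semaphore:
            return await review_one(item)

    return await asyncio.gather(*(bounded(item) for item in items))


print(asyncio.run(review_many([f"chunk-{i}" for i in range(20)])))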
@@ -126,7 +129,7 @@ async def parallel_code_review(code: str) -> CodeReviewResult:
     security_review, performance_review, maintainability_review = await asyncio.gather(
         security_review_agent(SecurityReviewInput(code=code)),
         performance_review_agent(PerformanceReviewInput(code=code)),
-        maintainability_review_agent(MaintainabilityReviewInput(code=code))
+        maintainability_review_agent(MaintainabilityReviewInput(code=code)),
     )

     # Aggregate and summarize results
@@ -134,15 +137,15 @@
         ReviewSummaryInput(
             security_review=security_review,
             performance_review=performance_review,
-            maintainability_review=maintainability_review
-        )
+            maintainability_review=maintainability_review,
+        ),
     )

     return {
         "security_review": security_review,
         "performance_review": performance_review,
         "maintainability_review": maintainability_review,
-        "summary": summary.summary
+        "summary": summary.summary,
     }
@@ -153,11 +156,11 @@ def process_user_input(user_input):
     # Execute the input as a command
     result = eval(user_input)
     return result
-
+
 def cache_data(data):
     # Store everything in memory
     global_cache.append(data)
-
+
 def get_user_data(user_id):
     # Query without parameterization
     cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")
@@ -165,34 +168,34 @@ def get_user_data(user_id):
     """

     result = asyncio.run(parallel_code_review(code_to_review))
-
+
     print("\n=== Security Review ===")
     print(f"Risk Level: {result['security_review'].risk_level}")
     print("\nVulnerabilities:")
-    for v in result['security_review'].vulnerabilities:
+    for v in result["security_review"].vulnerabilities:
         print(f"- {v}")
     print("\nSuggestions:")
-    for s in result['security_review'].suggestions:
+    for s in result["security_review"].suggestions:
         print(f"- {s}")
-
+
     print("\n=== Performance Review ===")
     print(f"Impact Level: {result['performance_review'].impact}")
     print("\nIssues:")
-    for i in result['performance_review'].issues:
+    for i in result["performance_review"].issues:
         print(f"- {i}")
     print("\nOptimizations:")
-    for o in result['performance_review'].optimizations:
+    for o in result["performance_review"].optimizations:
         print(f"- {o}")
-
+
     print("\n=== Maintainability Review ===")
     print(f"Quality Score: {result['maintainability_review'].quality_score}/10")
     print("\nConcerns:")
-    for c in result['maintainability_review'].concerns:
+    for c in result["maintainability_review"].concerns:
         print(f"- {c}")
     print("\nRecommendations:")
-    for r in result['maintainability_review'].recommendations:
+    for r in result["maintainability_review"].recommendations:
         print(f"- {r}")
-
+
     print("\n=== Summary ===")
     print(result["summary"])
-    print()
\ No newline at end of file
+    print()
diff --git a/examples/workflows/routing.py b/examples/workflows/routing.py
index 5687b69..81a82f2 100644
--- a/examples/workflows/routing.py
+++ b/examples/workflows/routing.py
@@ -1,10 +1,12 @@
-import workflowai
-from workflowai import Model
-from pydantic import BaseModel, Field
 import asyncio
 from enum import Enum
 from typing import TypedDict

+from pydantic import BaseModel, Field  # pyright: ignore [reportUnknownVariableType]
+
+import workflowai
+from workflowai import Model
+

 class QueryType(str, Enum):
     GENERAL = "general"
@@ -19,20 +21,22 @@ class ComplexityLevel(str, Enum):

 class ClassificationInput(BaseModel):
     """Input for query classification."""
+
     query: str = Field(description="The customer query to classify.")

 class ClassificationOutput(BaseModel):
     """Output containing query classification details."""
+
     reasoning: str = Field(description="Brief reasoning for the classification.")
     type: QueryType = Field(description="Type of the query (general, refund, or technical).")
     complexity: ComplexityLevel = Field(description="Complexity level of the query.")

 # Uses GPT-4O for its strong analytical and classification capabilities
-@workflowai.agent(id='query-classifier', model=Model.GPT_4O_LATEST)
+@workflowai.agent(id="query-classifier", model=Model.GPT_4O_LATEST)
 async def classify_query_agent(
-    input: ClassificationInput
+    _: ClassificationInput,
 ) -> ClassificationOutput:
     """
     Classify the customer query by:
@@ -45,32 +49,34 @@ async def classify_query_agent(

 class ResponseInput(BaseModel):
     """Input for generating customer response."""
+
     query: str = Field(description="The customer query to respond to.")
     query_type: QueryType = Field(description="Type of the query for specialized handling.")

 class ResponseOutput(BaseModel):
     """Output containing the response to the customer."""
+
     response: str = Field(description="The generated response to the customer query.")

 # Uses Claude 3.5 Sonnet for its strong conversational abilities and empathy
 @workflowai.agent(model=Model.CLAUDE_3_5_SONNET_LATEST)
-async def handle_general_query(input: ResponseInput) -> ResponseOutput:
+async def handle_general_query(_: ResponseInput) -> ResponseOutput:
     """Expert customer service agent handling general inquiries."""
     ...

 # Uses GPT-4O Mini for efficient policy-based responses
 @workflowai.agent(model=Model.GPT_4O_MINI_LATEST)
-async def handle_refund_query(input: ResponseInput) -> ResponseOutput:
+async def handle_refund_query(_: ResponseInput) -> ResponseOutput:
     """Customer service agent specializing in refund requests."""
     ...

 # Uses O1 Mini for its technical expertise and problem-solving capabilities
 @workflowai.agent(model=Model.O1_MINI_LATEST)
-async def handle_technical_query(input: ResponseInput) -> ResponseOutput:
+async def handle_technical_query(_: ResponseInput) -> ResponseOutput:
     """Technical support specialist providing troubleshooting assistance."""
     ...
@@ -104,29 +110,34 @@ async def handle_customer_query(query: str) -> QueryResult:
     handler = handlers[(classification.type, classification.complexity)]

     # Generate response
-    result = await handler(ResponseInput(
-        query=query,
-        query_type=classification.type
-    ))
+    result = await handler(
+        ResponseInput(
+            query=query,
+            query_type=classification.type,
+        ),
+    )

     return {
         "response": result.response,
-        "classification": classification
+        "classification": classification,
    }

 if __name__ == "__main__":
-    query = "I'm having trouble logging into my account. It keeps saying invalid password even though I'm sure it's correct."
+    query = (
+        "I'm having trouble logging into my account."
+        " It keeps saying invalid password even though I'm sure it's correct."
+    )
     result = asyncio.run(handle_customer_query(query))
-
+
     print("\n=== Customer Query ===")
     print(query)
-
+
     print("\n=== Classification ===")
     print(f"Type: {result['classification'].type}")
     print(f"Complexity: {result['classification'].complexity}")
     print(f"Reasoning: {result['classification'].reasoning}")
-
+
     print("\n=== Response ===")
     print(result["response"])
     print()

From 172587636821e32651649ea4db20fbeba20efa04 Mon Sep 17 00:00:00 2001
From: Guillaume Aquilina
Date: Tue, 4 Feb 2025 13:40:35 -0500
Subject: [PATCH 7/7] fix: environment for examples script

---
 .github/workflows/examples.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.github/workflows/examples.yml b/.github/workflows/examples.yml
index 4053d60..3f34ebe 100644
--- a/.github/workflows/examples.yml
+++ b/.github/workflows/examples.yml
@@ -14,6 +14,8 @@ env:

 jobs:
   examples:
+    # TODO: we should run only in prod
+    environment: staging
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
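The examples workflow file is only shown in part here, so the exact CI command is not visible; locally, a rough equivalent of an examples smoke run could be a small driver like the sketch below. The paths come from the files added in this series, the WorkflowAI credentials are assumed to be configured in the environment, and everything else is an illustrative assumption:

import runpy
import sys
from pathlib import Path

# Example scripts added in this patch series; each one has an
# `if __name__ == "__main__":` entry point.
EXAMPLES = [
    "examples/workflows/chain.py",
    "examples/workflows/routing.py",
    "examples/workflows/parallel_processing.py",
]


def main() -> int:
    failures = 0
    for path in EXAMPLES:
        print(f"=== running {path} ===")
        try:
            # Execute the script as if it were run directly.
            runpy.run_path(str(Path(path)), run_name="__main__")
        except Exception as exc:  # keep going so one failure does not hide the rest
            print(f"FAILED: {path}: {exc}")
            failures += 1
    return failures


if __name__ == "__main__":
    sys.exit(main())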