Skip to content

Workflows #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ env:

jobs:
examples:
# TODO: we should run only in prod
environment: staging
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,10 @@ validation that is too strict can lead to invalid generations. In case of an inv
def my_agent(_: Input) -> :...
```

## Workflows

For advanced workflow patterns and examples, please refer to the [Workflows README](examples/workflows/README.md) for more details.

## Contributing

See the [CONTRIBUTING.md](./CONTRIBUTING.md) file for more details.
65 changes: 65 additions & 0 deletions examples/workflows/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Workflows Patterns

This README describes the five main patterns used in our workflows built using the WorkflowAI SDK. These patterns provide a structured method for composing complex AI tasks from simpler components, and allow you to balance flexibility and control in your AI applications.

## 1. Sequential Processing (Chains)
In this pattern, tasks are executed in a fixed sequence, where the output of one step becomes the input for the next. This is ideal for linear processes such as content generation, data transformation, or any task that benefits from a clear, step-by-step flow.

**Example:**
- Generate an initial result (e.g., marketing copy).
- Evaluate and refine that result through subsequent steps.

For an implementation example, see [chain.py](chain.py).

## 2. Routing
The routing pattern directs work based on intermediate results. An initial decision or classification determines which specialized agent should handle the next part of the workflow. This allows the system to adapt its behavior based on context (for instance, routing customer queries to different support teams).

**Example:**
- Classify a customer query (e.g., as general, refund, or technical).
- Route the query to a specialized agent that handles that particular type.

For an implementation example, see [routing.py](routing.py).

## 3. Parallel Processing
Parallel processing splits work into independent subtasks that run concurrently. This pattern is used when different aspects of an input can be handled independently, leading to faster overall processing times.

**Example:**
- Perform security, performance, and maintainability reviews on code simultaneously.
- Collect and aggregate the results after all tasks complete.

For an implementation example, see [parallel_processing.py](parallel_processing.py).

## 4. Orchestrator-Worker
In the orchestrator-worker pattern, one agent (the orchestrator) plans and coordinates the work, while multiple worker agents execute the individual parts of the plan in parallel. This pattern is particularly useful when a task can be decomposed into distinct planning and execution phases.

**Example:**
- An orchestrator analyzes a feature request and generates an implementation plan with details on file changes.
- Worker agents then execute the planned changes concurrently.

For an implementation example, see [orchestrator_worker.py](orchestrator_worker.py).

## 5. Evaluator-Optimizer
The evaluator-optimizer pattern employs an iterative feedback loop. An initial output is evaluated for quality, and based on the feedback, improvements are made. This cycle is repeated until the output reaches the desired quality, or a maximum number of iterations is met.

**Example:**
- Translate text using a fast initial model.
- Evaluate the translation's quality, tone, nuance, and cultural accuracy.
- If needed, refine the translation based on detailed feedback and repeat the process.

For an implementation example, see [evaluator_optimizer.py](evaluator_optimizer.py).

## 6. Chain of Agents (Long Context Processing)
The Chain of Agents pattern is designed for processing long documents or complex tasks that exceed the context window of a single model. In this pattern, multiple worker agents process different chunks of the input sequentially, passing their findings through the chain, while a manager agent synthesizes the final output.

**Example:**
- Split a long document into manageable chunks
- Worker agents process each chunk sequentially, building upon previous findings
- A manager agent synthesizes all findings into a final, coherent response

For an implementation example, see [chain_of_agents.py](chain_of_agents.py).

---

These patterns were inspired by the workflow patterns described in the [Vercel AI SDK Documentation](https://sdk.vercel.ai/docs/foundations/agents#patterns-with-examples) and research from organizations like [Google Research](https://research.google/blog/chain-of-agents-large-language-models-collaborating-on-long-context-tasks/).

This README should serve as a high-level guide to understanding and using the different patterns available in our workflows.
149 changes: 149 additions & 0 deletions examples/workflows/chain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import asyncio
from typing import TypedDict

from pydantic import BaseModel, Field # pyright: ignore [reportUnknownVariableType]

import workflowai
from workflowai import Model


class MarketingCopyInput(BaseModel):
"""The product or concept for which to generate marketing copy."""

idea: str = Field(description="A short description or name of the product.")


class MarketingCopyOutput(BaseModel):
"""Contains the AI generated marketing copy text for the provided product or concept."""

marketing_text: str = Field(description="Persuasive marketing copy text.")


@workflowai.agent(id="marketing-copy-generator", model=Model.GPT_4O_MINI_LATEST)
async def generate_marketing_copy_agent(_: MarketingCopyInput) -> MarketingCopyOutput:
"""
Write persuasive marketing copy for the provided idea.
Focus on benefits and emotional appeal.
"""
...


class EvaluateCopyInput(BaseModel):
"""Input type for evaluating the quality of marketing copy."""

marketing_text: str = Field(description="The marketing copy text to evaluate.")


class EvaluateCopyOutput(BaseModel):
"""Evaluation results for the marketing copy."""

has_call_to_action: bool = Field(description="Whether a call to action is present.")
emotional_appeal: int = Field(description="Emotional appeal rating (1-10).")
clarity: int = Field(description="Clarity rating (1-10).")


# We use a smarter model (O1) to review the copy since evaluation requires more nuanced understanding
@workflowai.agent(id="marketing-copy-evaluator", model=Model.O1_MINI_LATEST)
async def evaluate_marketing_copy_agent(_: EvaluateCopyInput) -> EvaluateCopyOutput:
"""
Evaluate the marketing copy for:
1) Presence of a call to action (true/false)
2) Emotional appeal (1-10)
3) Clarity (1-10)
Return the results as a structured output.
"""
...


class RewriteCopyInput(BaseModel):
"""Input for rewriting the marketing copy with targeted improvements."""

original_copy: str = Field(default="", description="Original marketing copy.")
add_call_to_action: bool = Field(default=False, description="Whether we need a clear call to action.")
strengthen_emotional_appeal: bool = Field(default=False, description="Whether emotional appeal needs a boost.")
improve_clarity: bool = Field(default=False, description="Whether clarity needs improvement.")


class RewriteCopyOutput(BaseModel):
"""Contains the improved marketing copy."""

marketing_text: str = Field(description="The improved marketing copy text.")


# Claude 3.5 Sonnet is a more powerful model for copywriting
@workflowai.agent(model=Model.CLAUDE_3_5_SONNET_LATEST)
async def rewrite_marketing_copy_agent(_: RewriteCopyInput) -> RewriteCopyOutput:
"""
Rewrite the marketing copy with the specified improvements:
- A clear CTA if requested
- Stronger emotional appeal if requested
- Improved clarity if requested
"""
...


class MarketingResult(TypedDict):
original_copy: str
final_copy: str
was_improved: bool
quality_metrics: EvaluateCopyOutput


async def generate_marketing_copy(idea: str) -> MarketingResult:
"""
Demonstrates a chain flow:
1) Generate an initial marketing copy.
2) Evaluate its quality.
3) If the quality is lacking, request a rewrite with clearer CTA, stronger emotional appeal, and/or clarity.
4) Return the final copy and metrics.
"""
# Step 1: Generate initial copy
generation = await generate_marketing_copy_agent(MarketingCopyInput(idea=idea))
original_copy = generation.marketing_text
final_copy = original_copy

# Step 2: Evaluate the copy
evaluation = await evaluate_marketing_copy_agent(EvaluateCopyInput(marketing_text=original_copy))

# Step 3: Check evaluation results. If below thresholds, rewrite
needs_improvement = not evaluation.has_call_to_action or evaluation.emotional_appeal < 7 or evaluation.clarity < 7

if needs_improvement:
rewrite = await rewrite_marketing_copy_agent(
RewriteCopyInput(
original_copy=original_copy,
add_call_to_action=not evaluation.has_call_to_action,
strengthen_emotional_appeal=evaluation.emotional_appeal < 7,
improve_clarity=evaluation.clarity < 7,
),
)
final_copy = rewrite.marketing_text

return {
"original_copy": original_copy,
"final_copy": final_copy,
"was_improved": needs_improvement,
"quality_metrics": evaluation,
}


if __name__ == "__main__":
idea = "A open-source platform for AI agents"
result = asyncio.run(generate_marketing_copy(idea))

print("\n=== Input Idea ===")
print(idea)

print("\n=== Marketing Copy ===")
print(result["original_copy"])

print("\n=== Quality Assessment ===")
metrics = result["quality_metrics"]
print(f"✓ Call to Action: {'Present' if metrics.has_call_to_action else 'Missing'}")
print(f"✓ Emotional Appeal: {metrics.emotional_appeal}/10")
print(f"✓ Clarity: {metrics.clarity}/10")

if result["was_improved"]:
print("\n=== Improved Marketing Copy ===")
print(result["final_copy"])
print()
Loading