Auto register agent #29

Merged: 4 commits, Jan 23, 2025

2 changes: 1 addition & 1 deletion Makefile
@@ -26,7 +26,7 @@ lint:

.PHONY: test
test:
	pytest --ignore=tests/e2e

.PHONY: lock
lock:
155 changes: 125 additions & 30 deletions README.md
@@ -12,63 +12,158 @@ pip install workflowai

## Usage

Usage examples are available in the [examples](./examples/) directory and in the [end-to-end tests](./tests/e2e/) directory.

### Getting a WorkflowAI API key

Create an account on [workflowai.com](https://workflowai.com), generate an API key and set it as
an environment variable.

```
WORKFLOWAI_API_KEY=...
```

> You can also set the `WORKFLOWAI_API_URL` environment variable to point to your own WorkflowAI instance.

> The current UI does not allow generating an API key without creating a task, so take the opportunity to play
> around with the UI. Once the task is created, you can generate an API key from the Code section.

### Set up the workflowai client

If you have defined the API key using an environment variable, the shared workflowai client will be
correctly configured.

You can override the shared client by calling the `init` function.

```python
import workflowai

workflowai.init(
    url=...,  # defaults to WORKFLOWAI_API_URL env var or https://api.workflowai.com
    api_key=...,  # defaults to WORKFLOWAI_API_KEY env var
)
```

#### Using multiple clients

You might want to avoid using the shared client, for example if you are using multiple API keys or accounts. You can achieve this by creating client instances manually:

```python
from workflowai import WorkflowAI

client = WorkflowAI(
    url=...,
    api_key=...,
)

# Use the client to create and run agents
@client.agent()
def my_agent(task_input: Input) -> Output:
    ...
```
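
Agents created on a manually built client are called just like shared-client agents. Below is a minimal sketch of invoking one, assuming the `my_agent` definition above and `Input`/`Output` pydantic models shaped like the ones defined in the next section:

```python
import asyncio


async def main() -> None:
    # The decorator supplies the implementation; awaiting the function runs the agent
    output = await my_agent(Input(name="John"))
    print(output)


asyncio.run(main())
```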

### Build agents

An agent is, in essence, an async function with the added constraints that:

- it has a single argument that is a pydantic model
- it has a single return value that is a pydantic model
- it is decorated with the `@client.agent()` decorator

> [Pydantic](https://docs.pydantic.dev/latest/) is a very popular and powerful library for data validation and
> parsing. It allows us to extract the input and output schemas in a simple way.

Below is an agent that says hello:

```python
import workflowai
from pydantic import BaseModel


class Input(BaseModel):
    name: str


class Output(BaseModel):
    greeting: str


@workflowai.agent()
async def say_hello(input: Input) -> Output:
    """Say hello"""
    ...
```

When you call that function, the associated agent is created on workflowai.com if it does not exist yet, and a
run is created. By default:

- the docstring is used as the instructions for the agent
- the default model (`workflowai.DEFAULT_MODEL`) is used to run the agent
- the agent id is a slugified version of the function name (i.e. `say-hello` in this case)
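
For example, the first call below registers the agent on workflowai.com if needed and then runs it (a fragment meant for an async context, using the `Input`/`Output` models above):

```python
# inside an async function
output = await say_hello(Input(name="John"))
print(output.greeting)
```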

> **What is "..." ?**
>
> The `...` is the ellipsis value in Python. It is usually used as a placeholder. You could use `pass` here as well,
> or anything really: the implementation of the function is handled by the `@workflowai.agent()` decorator, so the
> function body is never executed.
> `...` is usually the right choice because it signals to type checkers that they should ignore the function body.

> Having the agent id determined at runtime can lead to unexpected changes, since renaming the function will
> change the agent id. A good practice is to set the agent id explicitly: `@workflowai.agent(id="say-hello")`.
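
For example, with an explicit id the function can later be renamed without creating a new agent (a sketch; `greet_user` is just an illustrative new name):

```python
@workflowai.agent(id="say-hello")
async def greet_user(input: Input) -> Output:
    """Say hello"""
    ...
```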

#### Using different models

WorkflowAI supports a long list of models; the source of truth for supported models is [workflowai.com](https://workflowai.com). The [Model](./workflowai/core/domain/model.py) type is a good indication of which models are supported at the time of an SDK release, although it may be missing some models, since new ones are added all the time.

You can set the model explicitly in the agent decorator:

```python
@workflowai.agent(model="gpt-4o")
def say_hello(input: Input) -> Output:
    ...
```

> Models do not become invalid on WorkflowAI. When a model is retired, it is dynamically replaced by a newer
> version of the same model at the same or a lower price, so calling the API with a retired model will always work.

### Version from code or deployments

Setting a docstring or a model in the agent decorator signals the client that the agent parameters are
fixed and configured via code.

Handling the agent parameters in code is useful to get started but can be limiting in the long run:

- it is somewhat hard to measure the impact of different parameters
- moving to new models or instructions requires a deployment
- iterating on the agent parameters can be very tedious

Deployments let you refer, from your code, to a version of an agent's parameters that is managed in the
workflowai.com UI. The following code uses the version of the agent deployed as "production", which is a lot
more flexible than changing the function parameters when running in production.

```python
@workflowai.agent(deployment="production")  # or simply @workflowai.agent()
def say_hello(input: Input) -> AsyncIterator[Run[Output]]:
    ...
```
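
Since that annotation streams `Run` objects, the deployed version is consumed with `async for`. A minimal sketch, assuming `AsyncIterator` comes from `typing` and that `Run` is importable from the SDK (the exact import is not shown in these snippets and is an assumption):

```python
# inside an async function
async for run in say_hello(Input(name="John")):
    print(run.task_output)  # partial output, filled in as generation progresses
```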

### Streaming and advanced usage

You can configure the agent function to stream or to return the full run object, simply by changing the type annotation:

```python
# Return the full run object, useful if you want to extract metadata like cost or duration
@workflowai.agent()
async def say_hello(input: Input) -> Run[Output]:
    ...

# Stream the output; the output is filled in as it is generated
@workflowai.agent()
def say_hello(input: Input) -> AsyncIterator[Output]:
    ...

# Stream the run object; the output is filled in as it is generated
@workflowai.agent()
def say_hello(input: Input) -> AsyncIterator[Run[Output]]:
    ...
```
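
A rough sketch of consuming each variant (only one `say_hello` definition can be active at a time; `task_output` is the same attribute used by the e2e tests below):

```python
# inside an async function

# Run[Output] variant: the full run object, output plus metadata
run = await say_hello(Input(name="John"))
print(run.task_output.greeting)

# AsyncIterator variants: partial outputs arrive as they are generated
async for chunk in say_hello(Input(name="John")):
    print(chunk)  # fields may be None until they are filled in
```
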
5 changes: 3 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "workflowai"
version = "0.5.5"
version = "0.6.0.dev0"
description = ""
authors = ["Guillaume Aquilina <[email protected]>"]
readme = "README.md"
@@ -52,7 +52,8 @@ ignore = [
"PYI051",
"FIX002",
"SLF001", #reportPrivateUsage
"PT017", # Do not force using pytest.raises
"PT017", # Do not force using pytest.raises
"PIE790", # ... are not unnecessary for empty functions with docstring
]

# Allow fix for all enabled rules (when `--fix`) is provided.
14 changes: 7 additions & 7 deletions tests/e2e/conftest.py
@@ -3,15 +3,15 @@
import pytest
from dotenv import load_dotenv

import workflowai

load_dotenv(override=True)


@pytest.fixture(scope="session")
def wai() -> Client:
return WorkflowAIClient(
endpoint=os.environ["WORKFLOWAI_TEST_API_URL"],
@pytest.fixture(scope="session", autouse=True)
def wai():
workflowai.init(
api_key=os.environ["WORKFLOWAI_TEST_API_KEY"],
url=os.environ["WORKFLOWAI_TEST_API_URL"],
)
return workflowai.shared_client
22 changes: 22 additions & 0 deletions tests/e2e/no_schema_test.py
@@ -0,0 +1,22 @@
from typing import Optional

from pydantic import BaseModel

import workflowai


class SummarizeTaskInput(BaseModel):
    text: Optional[str] = None


class SummarizeTaskOutput(BaseModel):
    summary_points: Optional[list[str]] = None


@workflowai.agent(id="summarize")
async def summarize(task_input: SummarizeTaskInput) -> SummarizeTaskOutput: ...


async def test_summarize():
    summarized = await summarize(SummarizeTaskInput(text="Hello, world!"))
    assert summarized.summary_points
48 changes: 32 additions & 16 deletions tests/e2e/run_test.py
@@ -1,10 +1,12 @@
from enum import Enum
from typing import AsyncIterator, Optional

import pytest
from pydantic import BaseModel

import workflowai
from workflowai.core.client.agent import Agent
from workflowai.core.client.client import WorkflowAI


class ExtractProductReviewSentimentTaskInput(BaseModel):
@@ -23,36 +25,50 @@ class ExtractProductReviewSentimentTaskOutput(BaseModel):
    sentiment: Optional[Sentiment] = None


@workflowai.agent(id="extract-product-review-sentiment", schema_id=1)
def extract_product_review_sentiment(
    task_input: ExtractProductReviewSentimentTaskInput,
) -> AsyncIterator[ExtractProductReviewSentimentTaskOutput]: ...


@pytest.fixture
def extract_product_review_sentiment_agent(
    wai: WorkflowAI,
) -> Agent[ExtractProductReviewSentimentTaskInput, ExtractProductReviewSentimentTaskOutput]:
    return Agent(
        agent_id="extract-product-review-sentiment",
        schema_id=1,
        input_cls=ExtractProductReviewSentimentTaskInput,
        output_cls=ExtractProductReviewSentimentTaskOutput,
        api=wai.api,
    )


async def test_run_task(
    extract_product_review_sentiment_agent: Agent[
        ExtractProductReviewSentimentTaskInput,
        ExtractProductReviewSentimentTaskOutput,
    ],
):
    task_input = ExtractProductReviewSentimentTaskInput(review_text="This product is amazing!")
    run = await extract_product_review_sentiment_agent.run(task_input=task_input, use_cache="never")
    assert run.task_output.sentiment == Sentiment.POSITIVE


async def test_stream_task(
    extract_product_review_sentiment_agent: Agent[
        ExtractProductReviewSentimentTaskInput,
        ExtractProductReviewSentimentTaskOutput,
    ],
):
    task_input = ExtractProductReviewSentimentTaskInput(
        review_text="This product is amazing!",
    )

    streamed = extract_product_review_sentiment_agent.stream(
        task_input=task_input,
        use_cache="never",
    )
    chunks = [chunk async for chunk in streamed]

    assert len(chunks) > 1