Commit 6bcd915

Merge pull request #466 from Lothiraldan/opik-filter-example
Add filter example of message monitoring using Opik
2 parents ef900c4 + b7debc3

File tree

3 files changed, +276 -0 lines changed

README.md

+1
@@ -23,6 +23,7 @@ Welcome to **Pipelines**, an [Open WebUI](https://github.com/open-webui) initiative
 - [**Function Calling Pipeline**](/examples/filters/function_calling_filter_pipeline.py): Easily handle function calls and enhance your applications with custom logic.
 - [**Custom RAG Pipeline**](/examples/pipelines/rag/llamaindex_pipeline.py): Implement sophisticated Retrieval-Augmented Generation pipelines tailored to your needs.
 - [**Message Monitoring Using Langfuse**](/examples/filters/langfuse_filter_pipeline.py): Monitor and analyze message interactions in real-time using Langfuse.
+- [**Message Monitoring Using Opik**](/examples/filters/opik_filter_pipeline.py): Monitor and analyze message interactions using Opik, an open-source platform for debugging and evaluating LLM applications and RAG systems.
 - [**Rate Limit Filter**](/examples/filters/rate_limit_filter_pipeline.py): Control the flow of requests to prevent exceeding rate limits.
 - [**Real-Time Translation Filter with LibreTranslate**](/examples/filters/libretranslate_filter_pipeline.py): Seamlessly integrate real-time translations into your LLM interactions.
 - [**Toxic Message Filter**](/examples/filters/detoxify_filter_pipeline.py): Implement filters to detect and handle toxic messages effectively.
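
The Opik entry added above is configured entirely through environment variables read by the new pipeline file in this commit. Below is a minimal sketch of that configuration, using the variable names and defaults from the file; the API key value is a placeholder, not a real credential.

import os

# Set these before starting the Pipelines server so the Opik filter can authenticate.
os.environ["OPIK_API_KEY"] = "your-api-key"  # placeholder; required for Opik cloud
os.environ["OPIK_WORKSPACE"] = "default"
os.environ["OPIK_PROJECT_NAME"] = "default"
os.environ["OPIK_URL_OVERRIDE"] = "https://www.comet.com/opik/api"  # default Opik cloud endpoint
os.environ["DEBUG_MODE"] = "false"  # "true" enables the filter's [DEBUG] logging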
examples/filters/opik_filter_pipeline.py

+274
@@ -0,0 +1,274 @@
"""
title: Opik Filter Pipeline
author: open-webui
date: 2025-03-12
version: 1.0
license: MIT
description: A filter pipeline that uses Opik for LLM observability.
requirements: opik
"""

from typing import List, Optional
import os
import uuid
import json

from pydantic import BaseModel
from opik import Opik


def get_last_assistant_message_obj(messages: List[dict]) -> dict:
    """Return the most recent assistant message, or an empty dict if there is none."""
    for message in reversed(messages):
        if message["role"] == "assistant":
            return message
    return {}


class Pipeline:
    class Valves(BaseModel):
        pipelines: List[str] = []
        priority: int = 0
        api_key: Optional[str] = None
        workspace: str
        project_name: str
        host: str
        debug: bool = False

    def __init__(self):
        self.type = "filter"
        self.name = "Opik Filter"

        self.valves = self.Valves(
            **{
                "pipelines": ["*"],
                "api_key": os.getenv("OPIK_API_KEY", "set_me_for_opik_cloud"),
                "workspace": os.getenv("OPIK_WORKSPACE", "default"),
                "project_name": os.getenv("OPIK_PROJECT_NAME", "default"),
                "host": os.getenv(
                    "OPIK_URL_OVERRIDE", "https://www.comet.com/opik/api"
                ),
                "debug": os.getenv("DEBUG_MODE", "false").lower() == "true",
            }
        )

        self.opik = None
        # Keep track of the trace and the last-created span for each chat_id
        self.chat_traces = {}
        self.chat_spans = {}

        self.suppressed_logs = set()

    def log(self, message: str, suppress_repeats: bool = False):
        """Logs messages to the terminal if debugging is enabled."""
        if self.valves.debug:
            if suppress_repeats:
                if message in self.suppressed_logs:
                    return
                self.suppressed_logs.add(message)
            print(f"[DEBUG] {message}")

    async def on_startup(self):
        self.log(f"on_startup triggered for {__name__}")
        self.set_opik()

    async def on_shutdown(self):
        self.log(f"on_shutdown triggered for {__name__}")
        if self.opik:
            self.opik.end()

    async def on_valves_updated(self):
        self.log("Valves updated, resetting Opik client.")
        if self.opik:
            self.opik.end()
        self.set_opik()

    def set_opik(self):
        try:
            self.opik = Opik(
                project_name=self.valves.project_name,
                workspace=self.valves.workspace,
                host=self.valves.host,
                api_key=self.valves.api_key,
            )
            self.opik.auth_check()
            self.log("Opik client initialized successfully.")
        except Exception as e:
            print(
                f"Opik error: {e}. Please re-enter your Opik credentials in the pipeline settings."
            )

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """
        Inlet handles the incoming request (usually a user message).
        A new trace and a new span are created for each user message;
        both are stored per chat_id and finalized in the outlet.
        """
        if self.valves.debug:
            print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")

        self.log(f"Inlet function called with body: {body} and user: {user}")

        metadata = body.get("metadata", {})
        task = metadata.get("task", "")

        # Skip logging internal tasks for now
        if task:
            self.log(f"Skipping {task} task.")
            return body

        if "chat_id" not in metadata:
            chat_id = str(uuid.uuid4())  # Regular chat messages
            self.log(f"Assigned normal chat_id: {chat_id}")

            metadata["chat_id"] = chat_id
            body["metadata"] = metadata
        else:
            chat_id = metadata["chat_id"]

        required_keys = ["model", "messages"]
        missing_keys = [key for key in required_keys if key not in body]
        if missing_keys:
            error_message = (
                f"Error: Missing keys in the request body: {', '.join(missing_keys)}"
            )
            self.log(error_message)
            raise ValueError(error_message)

        user_email = user.get("email") if user else None

        assert chat_id not in self.chat_traces, (
            f"A trace should not already exist for chat_id {chat_id}"
        )

        # Create a new trace and span
        self.log(f"Creating new chat trace for chat_id: {chat_id}")

        # Body copies for the trace and the span
        trace_body = body.copy()
        span_body = body.copy()

        # Extract metadata from body
        metadata = trace_body.pop("metadata", {})
        metadata.update({"chat_id": chat_id, "user_id": user_email})

        # We don't need the model at the trace level
        trace_body.pop("model", None)

        trace_payload = {
            "name": f"{__name__}",
            "input": trace_body,
            "metadata": metadata,
            "thread_id": chat_id,
        }

        if self.valves.debug:
            print(f"[DEBUG] Opik trace request: {json.dumps(trace_payload, indent=2)}")

        trace = self.opik.trace(**trace_payload)

        span_metadata = metadata.copy()
        span_metadata.update({"interface": "open-webui"})

        # The model is logged as a dedicated span field, so drop it from the span input
        span_body.pop("model", None)
        # We don't need the metadata in the input for the span
        span_body.pop("metadata", None)

        # Extract the model and provider from metadata
        model = span_metadata.get("model", {}).get("id", None)
        provider = span_metadata.get("model", {}).get("owned_by", None)

        span_payload = {
            "name": chat_id,
            "model": model,
            "provider": provider,
            "input": span_body,
            "metadata": span_metadata,
            "type": "llm",
        }

        if self.valves.debug:
            print(f"[DEBUG] Opik span request: {json.dumps(span_payload, indent=2)}")

        span = trace.span(**span_payload)

        self.chat_traces[chat_id] = trace
        self.chat_spans[chat_id] = span
        self.log(f"Trace and span objects successfully created for chat_id: {chat_id}")

        return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """
        Outlet handles the response body (usually the assistant message).
        It will finalize/end the span created for the user request.
        """
        self.log(f"Outlet function called with body: {body}")

        chat_id = body.get("chat_id")

        # If no trace or span exists for this chat_id, the chat can't be logged
        if chat_id not in self.chat_traces or chat_id not in self.chat_spans:
            self.log(
                f"[WARNING] No matching chat trace found for chat_id: {chat_id}, chat won't be logged."
            )
            return body

        trace = self.chat_traces[chat_id]
        span = self.chat_spans[chat_id]

        # Body copies for the trace and the span
        trace_body = body.copy()
        span_body = body.copy()

        # Get the last assistant message from the conversation
        assistant_message_obj = get_last_assistant_message_obj(body["messages"])

        # Extract usage if available (Ollama-style keys first, then OpenAI-style keys)
        usage = None
        self.log(f"Assistant message obj: {assistant_message_obj}")
        if assistant_message_obj:
            message_usage = assistant_message_obj.get("usage", {})
            if isinstance(message_usage, dict):
                input_tokens = message_usage.get(
                    "prompt_eval_count"
                ) or message_usage.get("prompt_tokens")
                output_tokens = message_usage.get("eval_count") or message_usage.get(
                    "completion_tokens"
                )
                if input_tokens is not None and output_tokens is not None:
                    usage = {
                        "prompt_tokens": input_tokens,
                        "completion_tokens": output_tokens,
                        "total_tokens": input_tokens + output_tokens,
                    }
                    self.log(f"Usage data extracted: {usage}")

        # chat_id is already logged as the trace thread_id
        span_body.pop("chat_id", None)

        # End the span with the final assistant message and updated conversation
        span_payload = {
            "output": span_body,  # include the entire conversation
            "usage": usage,
        }

        if self.valves.debug:
            print(
                f"[DEBUG] Opik span end request: {json.dumps(span_payload, indent=2)}"
            )

        span.end(**span_payload)
        self.log(f"span ended for chat_id: {chat_id}")

        # Update the trace with the final assistant output
        trace.end(output=trace_body)

        # Force the creation of a new trace and span for the next message, even within the same thread
        del self.chat_traces[chat_id]
        del self.chat_spans[chat_id]

        return body
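
For reference, here is a minimal, hypothetical driver for the filter above: it pushes one user message through inlet() and a matching assistant reply through outlet(). The body shapes, model id, and email address are illustrative assumptions, and a reachable Opik deployment with valid credentials is assumed so that set_opik() succeeds.

import asyncio

async def main():
    # Assumes the OPIK_* environment variables are set and valid
    pipeline = Pipeline()
    await pipeline.on_startup()

    # Hypothetical request body shaped like an Open WebUI chat request
    request = {
        "model": "llama3",  # illustrative model id
        "messages": [{"role": "user", "content": "Hello!"}],
        "metadata": {"model": {"id": "llama3", "owned_by": "ollama"}},
    }
    request = await pipeline.inlet(request, user={"email": "user@example.com"})
    chat_id = request["metadata"]["chat_id"]

    # Hypothetical response body carrying the assistant reply and token usage
    response = {
        "chat_id": chat_id,
        "messages": request["messages"]
        + [
            {
                "role": "assistant",
                "content": "Hi there!",
                "usage": {"prompt_tokens": 5, "completion_tokens": 3},
            }
        ],
    }
    await pipeline.outlet(response)
    await pipeline.on_shutdown()

asyncio.run(main())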

requirements.txt

+1
@@ -32,6 +32,7 @@ psycopg2-binary
 # Observability
 langfuse
 ddtrace
+opik

 # ML libraries
 torch
