Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

feat(messages): polish pagination and filtering #1213

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions src/codegate/api/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@

import requests
import structlog
from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from fastapi.responses import StreamingResponse
from fastapi.routing import APIRoute
from pydantic import BaseModel, ValidationError

import codegate.muxing.models as mux_models
from codegate import __version__
from codegate.api import v1_models, v1_processing
from codegate.config import API_DEFAULT_PAGE_SIZE, API_MAX_PAGE_SIZE
from codegate.db.connection import AlreadyExistsError, DbReader
from codegate.db.models import AlertSeverity, WorkspaceWithModel
from codegate.db.models import AlertSeverity, AlertTriggerType, WorkspaceWithModel
from codegate.providers import crud as provendcrud
from codegate.workspaces import crud

Expand Down Expand Up @@ -429,7 +430,13 @@ async def get_workspace_alerts_summary(workspace_name: str) -> v1_models.AlertSu
tags=["Workspaces"],
generate_unique_id_function=uniq_name,
)
async def get_workspace_messages(workspace_name: str) -> List[v1_models.Conversation]:
async def get_workspace_messages(
workspace_name: str,
page: int = Query(1, ge=1),
page_size: int = Query(API_DEFAULT_PAGE_SIZE, ge=1, le=API_MAX_PAGE_SIZE),
filter_by_ids: Optional[List[str]] = Query(None),
filter_by_alert_trigger_types: Optional[List[AlertTriggerType]] = Query(None),
) -> v1_models.PaginatedMessagesResponse:
"""Get messages for a workspace."""
try:
ws = await wscrud.get_workspace_by_name(workspace_name)
Expand All @@ -439,16 +446,40 @@ async def get_workspace_messages(workspace_name: str) -> List[v1_models.Conversa
logger.exception("Error while getting workspace")
raise HTTPException(status_code=500, detail="Internal server error")

offset = (page - 1) * page_size
fetched_messages: List[v1_models.Conversation] = []

try:
prompts_with_output_alerts_usage = (
await dbreader.get_prompts_with_output_alerts_usage_by_workspace_id(
ws.id, AlertSeverity.CRITICAL.value
while len(fetched_messages) < page_size:
messages_batch = await dbreader.get_messages(
ws.id,
offset,
page_size,
filter_by_ids,
list([AlertSeverity.CRITICAL.value]), # TODO: Configurable severity
filter_by_alert_trigger_types,
)
if not messages_batch:
break
parsed_conversations, _ = await v1_processing.parse_messages_in_conversations(
messages_batch
)
fetched_messages.extend(parsed_conversations)

offset += len(messages_batch)

final_messages = fetched_messages[:page_size]

# Fetch total message count
total_count = await dbreader.get_total_messages_count_by_workspace_id(
ws.id, AlertSeverity.CRITICAL.value
)
conversations, _ = await v1_processing.parse_messages_in_conversations(
prompts_with_output_alerts_usage
return v1_models.PaginatedMessagesResponse(
data=final_messages,
limit=page_size,
offset=(page - 1) * page_size,
total=total_count,
)
return conversations
except Exception:
logger.exception("Error while getting messages")
raise HTTPException(status_code=500, detail="Internal server error")
Expand Down
7 changes: 7 additions & 0 deletions src/codegate/api/v1_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,10 @@ class ModelByProvider(pydantic.BaseModel):

def __str__(self):
return f"{self.provider_name} / {self.name}"


class PaginatedMessagesResponse(pydantic.BaseModel):
    """One page of workspace conversations plus the paging metadata
    needed by clients to request subsequent pages."""

    # Conversations contained in the requested page.
    data: List[Conversation]
    # Page size used for the query (maximum number of items in ``data``).
    limit: int
    # Number of items skipped before this page, i.e. (page - 1) * page_size.
    offset: int
    # Total number of matching rows across all pages.
    # NOTE(review): the producing endpoint fills this from a *message* count
    # while ``data`` holds parsed conversations — confirm the two units agree.
    total: int
13 changes: 6 additions & 7 deletions src/codegate/api/v1_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
TokenUsageByModel,
)
from codegate.db.connection import alert_queue
from codegate.db.models import Alert, GetPromptWithOutputsRow, TokenUsage
from codegate.db.models import Alert, GetMessagesRow, TokenUsage

logger = structlog.get_logger("codegate")

Expand Down Expand Up @@ -152,7 +152,7 @@ def _parse_single_output(single_output: dict) -> str:


async def _get_partial_question_answer(
row: GetPromptWithOutputsRow,
row: GetMessagesRow,
) -> Optional[PartialQuestionAnswer]:
"""
Parse a row from the get_prompt_with_outputs query and return a PartialConversation
Expand Down Expand Up @@ -423,7 +423,7 @@ async def match_conversations(


async def _process_prompt_output_to_partial_qa(
prompts_outputs: List[GetPromptWithOutputsRow],
prompts_outputs: List[GetMessagesRow],
) -> List[PartialQuestionAnswer]:
"""
Process the prompts and outputs to PartialQuestionAnswer objects.
Expand All @@ -435,7 +435,7 @@ async def _process_prompt_output_to_partial_qa(


async def parse_messages_in_conversations(
prompts_outputs: List[GetPromptWithOutputsRow],
prompts_outputs: List[GetMessagesRow],
) -> Tuple[List[Conversation], Dict[str, Conversation]]:
"""
Get all the messages from the database and return them as a list of conversations.
Expand Down Expand Up @@ -477,7 +477,7 @@ async def parse_row_alert_conversation(

async def parse_get_alert_conversation(
alerts: List[Alert],
prompts_outputs: List[GetPromptWithOutputsRow],
prompts_outputs: List[GetMessagesRow],
) -> List[AlertConversation]:
"""
Parse a list of rows from the get_alerts_with_prompt_and_output query and return a list of
Expand All @@ -496,7 +496,7 @@ async def parse_get_alert_conversation(


async def parse_workspace_token_usage(
prompts_outputs: List[GetPromptWithOutputsRow],
prompts_outputs: List[GetMessagesRow],
) -> TokenUsageAggregate:
"""
Parse the token usage from the workspace.
Expand All @@ -515,7 +515,6 @@ async def remove_duplicate_alerts(alerts: List[v1_models.Alert]) -> List[v1_mode
for alert in sorted(
alerts, key=lambda x: x.timestamp, reverse=True
): # Sort alerts by timestamp descending

# Handle trigger_string based on its type
trigger_string_content = ""
if isinstance(alert.trigger_string, dict):
Expand Down
3 changes: 3 additions & 0 deletions src/codegate/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
"llamacpp": "./codegate_volume/models", # Default LlamaCpp model path
}

# Pagination defaults shared by the HTTP API endpoints.
# Default number of items returned when the client omits ``page_size``.
API_DEFAULT_PAGE_SIZE = 50
# Upper bound accepted for a client-supplied ``page_size``
# (enforced via FastAPI ``Query(..., le=API_MAX_PAGE_SIZE)``).
API_MAX_PAGE_SIZE = 100


@dataclass
class Config:
Expand Down
Loading