Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 8 additions & 136 deletions src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,97 +57,6 @@ async def dispatch(self, request: Request, call_next):
return await call_next(request)


def format_answer_box(answer_box: dict[str, Any]) -> str:
"""Format answer_box results for weather, finance, and other structured data."""
if answer_box.get("type") == "weather_result":
result = f"Temperature: {answer_box.get('temperature', 'N/A')}\n"
result += f"Unit: {answer_box.get('unit', 'N/A')}\n"
result += f"Precipitation: {answer_box.get('precipitation', 'N/A')}\n"
result += f"Humidity: {answer_box.get('humidity', 'N/A')}\n"
result += f"Wind: {answer_box.get('wind', 'N/A')}\n"
result += f"Location: {answer_box.get('location', 'N/A')}\n"
result += f"Date: {answer_box.get('date', 'N/A')}\n"
result += f"Weather: {answer_box.get('weather', 'N/A')}"

# Add forecast if available
if "forecast" in answer_box:
result += "\n\nDaily Forecast:\n"
for day in answer_box["forecast"]:
result += f"{day.get('day', 'N/A')}: {day.get('weather', 'N/A')} "
if "temperature" in day:
high = day["temperature"].get("high", "N/A")
low = day["temperature"].get("low", "N/A")
result += f"(High: {high}, Low: {low})"
result += "\n"

return result

elif answer_box.get("type") == "finance_results":
result = f"Title: {answer_box.get('title', 'N/A')}\n"
result += f"Exchange: {answer_box.get('exchange', 'N/A')}\n"
result += f"Stock: {answer_box.get('stock', 'N/A')}\n"
result += f"Currency: {answer_box.get('currency', 'N/A')}\n"
result += f"Price: {answer_box.get('price', 'N/A')}\n"
result += f"Previous Close: {answer_box.get('previous_close', 'N/A')}\n"

if "price_movement" in answer_box:
pm = answer_box["price_movement"]
result += f"Price Movement: {pm.get('price', 'N/A')} ({pm.get('percentage', 'N/A')}%) {pm.get('movement', 'N/A')}\n"

if "table" in answer_box:
result += "\nFinancial Metrics:\n"
for row in answer_box["table"]:
result += f"{row.get('name', 'N/A')}: {row.get('value', 'N/A')}\n"

return result
else:
# Generic answer box formatting
result = ""
for key, value in answer_box.items():
if key != "type":
result += f"{key.replace('_', ' ').title()}: {value}\n"
return result


def format_organic_results(organic_results: list[Any]) -> str:
"""Format organic search results."""
formatted_results = []
for result in organic_results:
title = result.get("title", "No title")
link = result.get("link", "No link")
snippet = result.get("snippet", "No snippet")
formatted_results.append(f"Title: {title}\nLink: {link}\nSnippet: {snippet}\n")
return "\n".join(formatted_results) if formatted_results else ""


def format_news_results(news_results: list[Any]) -> str:
"""Format news search results."""
formatted_results = []
for result in news_results:
title = result.get("title", "No title")
link = result.get("link", "No link")
snippet = result.get("snippet", "No snippet")
date = result.get("date", "No date")
source = result.get("source", "No source")
formatted_results.append(
f"Title: {title}\nSource: {source}\nDate: {date}\nLink: {link}\nSnippet: {snippet}\n"
)
return "\n".join(formatted_results) if formatted_results else ""


def format_images_results(images_results: list[Any]) -> str:
"""Format image search results."""
formatted_results = []
for result in images_results:
title = result.get("title", "No title")
link = result.get("link", "No link")
thumbnail = result.get("thumbnail", "No thumbnail")
formatted_results.append(
f"Title: {title}\nImage: {link}\nThumbnail: {thumbnail}\n"
)
return "\n".join(formatted_results) if formatted_results else ""


@mcp.tool()
async def search(params: dict[str, Any] = {}, raw: bool = False) -> str:
"""Universal search tool supporting all SerpApi engines and result types.
Expand Down Expand Up @@ -191,51 +100,14 @@ async def search(params: dict[str, Any] = {}, raw: bool = False) -> str:
# Return raw JSON if requested
if raw:
return json.dumps(data, indent=2, ensure_ascii=False)

# Process results in priority order
formatted_output = ""

# 1. Answer box (weather, finance, knowledge graph, etc.) - highest priority
if "answer_box" in data:
formatted_output += "=== Answer Box ===\n"
formatted_output += format_answer_box(data["answer_box"])
formatted_output += "\n\n"

# 2. News results
if "news_results" in data and data["news_results"]:
formatted_output += "=== News Results ===\n"
formatted_output += format_news_results(data["news_results"])
formatted_output += "\n\n"

# 3. Organic results
if "organic_results" in data and data["organic_results"]:
formatted_output += "=== Search Results ===\n"
formatted_output += format_organic_results(data["organic_results"])
formatted_output += "\n\n"

# 4. Image results
if "images_results" in data and data["images_results"]:
formatted_output += "=== Image Results ===\n"
formatted_output += format_images_results(data["images_results"])
formatted_output += "\n\n"

# 5. Shopping results
if "shopping_results" in data and data["shopping_results"]:
formatted_output += "=== Shopping Results ===\n"
shopping_results = []
for result in data["shopping_results"]:
title = result.get("title", "No title")
price = result.get("price", "No price")
link = result.get("link", "No link")
source = result.get("source", "No source")
shopping_results.append(
f"Title: {title}\nPrice: {price}\nSource: {source}\nLink: {link}\n"
)
formatted_output += "\n".join(shopping_results) + "\n\n"

# Return formatted output or fallback message
if formatted_output.strip():
return formatted_output.strip()

results = {}
for key in data:
if key not in ("search_metadata", "search_parameters") and "pagination" not in key:
results[key] = data[key]

if results:
return json.dumps(results, ensure_ascii=False)
else:
return "No results found for the given query. Try adjusting your search parameters or engine."

Expand Down
Loading