From fdd747dfde3921324c10092594ecbe3d447a0f8b Mon Sep 17 00:00:00 2001 From: Keval Mahajan Date: Mon, 18 Aug 2025 18:28:16 +0530 Subject: [PATCH 1/7] updated wrapper functionality Signed-off-by: Keval Mahajan --- AGENTS.md | 2 +- CLAUDE.md | 4 +- DEVELOPING.md | 4 +- README.md | 60 +- docs/docs/overview/quick_start.md | 8 +- docs/docs/testing/acceptance.md | 6 +- .../argocd-helm-deployment-ibm-cloud-iks.md | 4 +- docs/docs/using/agents/bee.md | 2 +- docs/docs/using/clients/claude-desktop.md | 16 +- docs/docs/using/clients/cline.md | 4 +- docs/docs/using/clients/continue.md | 8 +- docs/docs/using/clients/copilot.md | 10 +- docs/docs/using/clients/mcp-cli.md | 20 +- docs/docs/using/clients/mcp-inspector.md | 8 +- docs/docs/using/mcpgateway-wrapper.md | 34 +- docs/docs/using/servers/index.md | 4 +- mcpgateway/static/admin.js | 4 +- mcpgateway/utils/retry_manager.py | 55 +- mcpgateway/wrapper.py | 1367 +++++++---------- pyproject.toml | 1 + tests/playwright/README.md | 2 +- tests/playwright/conftest.py | 2 +- tests/unit/mcpgateway/test_wrapper.py | 610 ++------ 23 files changed, 793 insertions(+), 1442 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 90388fe39..14ee35423 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -62,5 +62,5 @@ MCP helpers ## Security & Configuration Tips - Copy `.env.example` β†’ `.env`; verify with `make check-env`. Never commit secrets. - Auth: set `JWT_SECRET_KEY`; export `MCPGATEWAY_BEARER_TOKEN` using the token utility for API calls. -- Wrapper: set `MCP_SERVER_CATALOG_URLS` and `MCP_AUTH_TOKEN` when using `mcpgateway.wrapper`. +- Wrapper: set `MCP_SERVER_URL` and `MCP_AUTH` when using `mcpgateway.wrapper`. - TLS: `make certs` β†’ `make serve-ssl`. Prefer environment variables for config; see `README.md`. diff --git a/CLAUDE.md b/CLAUDE.md index 897c3a941..df2697c75 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -58,8 +58,8 @@ python3 -m mcpgateway.translate \ --stdio "uvx mcp-server-git" --port 9000 # Run the stdio wrapper for MCP clients -export MCP_AUTH_TOKEN=$MCPGATEWAY_BEARER_TOKEN -export MCP_SERVER_CATALOG_URLS=http://localhost:4444/servers/UUID +export MCP_AUTH=$MCPGATEWAY_BEARER_TOKEN +export MCP_SERVER_URL=http://localhost:4444/servers/UUID/mcp python3 -m mcpgateway.wrapper ``` diff --git a/DEVELOPING.md b/DEVELOPING.md index 7bc58b042..ae508fd42 100644 --- a/DEVELOPING.md +++ b/DEVELOPING.md @@ -5,8 +5,8 @@ ```bash # Gateway & auth export MCP_GATEWAY_BASE_URL=http://localhost:4444 -export MCP_SERVER_CATALOG_URLS=http://localhost:4444/servers/UUID_OF_SERVER_1 -export MCP_AUTH_TOKEN="" +export MCP_SERVER_URL=http://localhost:4444/servers/UUID_OF_SERVER_1/mcp +export MCP_AUTH="" ``` | Mode | Command | Notes | diff --git a/README.md b/README.md index 683cc9184..777a9ee3f 100644 --- a/README.md +++ b/README.md @@ -376,22 +376,32 @@ npx -y @modelcontextprotocol/inspector πŸ–§ Using the stdio wrapper (mcpgateway-wrapper) ```bash -export MCP_AUTH_TOKEN=$MCPGATEWAY_BEARER_TOKEN -export MCP_SERVER_CATALOG_URLS=http://localhost:4444/servers/UUID_OF_SERVER_1 +export MCP_AUTH=$MCPGATEWAY_BEARER_TOKEN +export MCP_SERVER_URL=http://localhost:4444/servers/UUID_OF_SERVER_1/mcp python3 -m mcpgateway.wrapper # Ctrl-C to exit ``` You can also run it with `uv` or inside Docker/Podman - see the *Containers* section above. -In MCP Inspector, define `MCP_AUTH_TOKEN` and `MCP_SERVER_CATALOG_URLS` env variables, and select `python3` as the Command, and `-m mcpgateway.wrapper` as Arguments. +In MCP Inspector, define `MCP_AUTH` and `MCP_SERVER_URL` env variables, and select `python3` as the Command, and `-m mcpgateway.wrapper` as Arguments. ```bash echo $PWD/.venv/bin/python3 # Using the Python3 full path ensures you have a working venv -export MCP_SERVER_CATALOG_URLS='http://localhost:4444/servers/UUID_OF_SERVER_1' -export MCP_AUTH_TOKEN=${MCPGATEWAY_BEARER_TOKEN} +export MCP_SERVER_URL='http://localhost:4444/servers/UUID_OF_SERVER_1/mcp' +export MCP_AUTH=${MCPGATEWAY_BEARER_TOKEN} npx -y @modelcontextprotocol/inspector ``` +or + +Pass the url and auth as arguments (no need to set environment variables) +```bash +npx -y @modelcontextprotocol/inspector +command as `python` +Arguments as `-m mcpgateway.wrapper --url "http://localhost:4444/servers/UUID_OF_SERVER_1/mcp" --auth "Bearer "` +``` + + When using a MCP Client such as Claude with stdio: ```json @@ -401,8 +411,8 @@ When using a MCP Client such as Claude with stdio: "command": "python", "args": ["-m", "mcpgateway.wrapper"], "env": { - "MCP_AUTH_TOKEN": "your-token-here", - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444/servers/UUID_OF_SERVER_1", + "MCP_AUTH": "your-token-here", + "MCP_SERVER_URL": "http://localhost:4444/servers/UUID_OF_SERVER_1", "MCP_TOOL_CALL_TIMEOUT": "120" } } @@ -591,14 +601,14 @@ The `mcpgateway.wrapper` lets you connect to the gateway over **stdio** while ke ```bash # Set environment variables export MCPGATEWAY_BEARER_TOKEN=$(python3 -m mcpgateway.utils.create_jwt_token --username admin --exp 10080 --secret my-test-key) -export MCP_AUTH_TOKEN=${MCPGATEWAY_BEARER_TOKEN} -export MCP_SERVER_CATALOG_URLS='http://localhost:4444/servers/UUID_OF_SERVER_1' +export MCP_AUTH=${MCPGATEWAY_BEARER_TOKEN} +export MCP_SERVER_URL='http://localhost:4444/servers/UUID_OF_SERVER_1/mcp' export MCP_TOOL_CALL_TIMEOUT=120 export MCP_WRAPPER_LOG_LEVEL=DEBUG # or OFF to disable logging docker run --rm -i \ - -e MCP_AUTH_TOKEN=$MCPGATEWAY_BEARER_TOKEN \ - -e MCP_SERVER_CATALOG_URLS=http://host.docker.internal:4444/servers/UUID_OF_SERVER_1 \ + -e MCP_AUTH=$MCPGATEWAY_BEARER_TOKEN \ + -e MCP_SERVER_URL=http://host.docker.internal:4444/servers/UUID_OF_SERVER_1/mcp \ -e MCP_TOOL_CALL_TIMEOUT=120 \ -e MCP_WRAPPER_LOG_LEVEL=DEBUG \ ghcr.io/ibm/mcp-context-forge:0.5.0 \ @@ -615,8 +625,8 @@ Because the wrapper speaks JSON-RPC over stdin/stdout, you can interact with it ```bash # Start the MCP Gateway Wrapper -export MCP_AUTH_TOKEN=${MCPGATEWAY_BEARER_TOKEN} -export MCP_SERVER_CATALOG_URLS=http://localhost:4444/servers/YOUR_SERVER_UUID +export MCP_AUTH=${MCPGATEWAY_BEARER_TOKEN} +export MCP_SERVER_URL=http://localhost:4444/servers/YOUR_SERVER_UUID python3 -m mcpgateway.wrapper ``` @@ -667,8 +677,8 @@ python3 -m mcpgateway.wrapper The `mcpgateway.wrapper` exposes everything your Gateway knows about over **stdio**, so any MCP client that *can't* (or *shouldn't*) open an authenticated SSE stream still gets full tool-calling power. -> **Remember** to substitute your real Gateway URL (and server ID) for `http://localhost:4444/servers/UUID_OF_SERVER_1`. -> When inside Docker/Podman, that often becomes `http://host.docker.internal:4444/servers/UUID_OF_SERVER_1` (macOS/Windows) or the gateway container's hostname (Linux). +> **Remember** to substitute your real Gateway URL (and server ID) for `http://localhost:4444/servers/UUID_OF_SERVER_1/mcp`. +> When inside Docker/Podman, that often becomes `http://host.docker.internal:4444/servers/UUID_OF_SERVER_1/mcp` (macOS/Windows) or the gateway container's hostname (Linux). --- @@ -678,8 +688,8 @@ The `mcpgateway.wrapper` exposes everything your Gateway knows about over **stdi ```bash docker run -i --rm \ --network=host \ - -e MCP_SERVER_CATALOG_URLS=http://localhost:4444/servers/UUID_OF_SERVER_1 \ - -e MCP_AUTH_TOKEN=${MCPGATEWAY_BEARER_TOKEN} \ + -e MCP_SERVER_URL=http://localhost:4444/servers/UUID_OF_SERVER_1/mcp \ + -e MCP_AUTH=${MCPGATEWAY_BEARER_TOKEN} \ -e MCP_TOOL_CALL_TIMEOUT=120 \ ghcr.io/ibm/mcp-context-forge:0.5.0 \ python3 -m mcpgateway.wrapper @@ -697,8 +707,8 @@ docker run -i --rm \ pipx install --include-deps mcp-contextforge-gateway # Run the stdio wrapper -MCP_AUTH_TOKEN=${MCPGATEWAY_BEARER_TOKEN} \ -MCP_SERVER_CATALOG_URLS=http://localhost:4444/servers/UUID_OF_SERVER_1 \ +MCP_AUTH=${MCPGATEWAY_BEARER_TOKEN} \ +MCP_SERVER_URL=http://localhost:4444/servers/UUID_OF_SERVER_1/mcp \ python3 -m mcpgateway.wrapper # Alternatively with uv uv run --directory . -m mcpgateway.wrapper @@ -713,8 +723,8 @@ uv run --directory . -m mcpgateway.wrapper "command": "python3", "args": ["-m", "mcpgateway.wrapper"], "env": { - "MCP_AUTH_TOKEN": "", - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444/servers/UUID_OF_SERVER_1", + "MCP_AUTH": "", + "MCP_SERVER_URL": "http://localhost:4444/servers/UUID_OF_SERVER_1/mcp", "MCP_TOOL_CALL_TIMEOUT": "120" } } @@ -750,8 +760,8 @@ source ~/.venv/mcpgateway/bin/activate uv pip install mcp-contextforge-gateway # Launch wrapper -MCP_AUTH_TOKEN=${MCPGATEWAY_BEARER_TOKEN} \ -MCP_SERVER_CATALOG_URLS=http://localhost:4444/servers/UUID_OF_SERVER_1 \ +MCP_AUTH=${MCPGATEWAY_BEARER_TOKEN} \ +MCP_SERVER_URL=http://localhost:4444/servers/UUID_OF_SERVER_1/mcp \ uv run --directory . -m mcpgateway.wrapper # Use this just for testing, as the Client will run the uv command ``` @@ -770,8 +780,8 @@ uv run --directory . -m mcpgateway.wrapper # Use this just for testing, as the C "mcpgateway.wrapper" ], "env": { - "MCP_AUTH_TOKEN": "", - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444/servers/UUID_OF_SERVER_1" + "MCP_AUTH": "", + "MCP_SERVER_URL": "http://localhost:4444/servers/UUID_OF_SERVER_1/mcp" } } } diff --git a/docs/docs/overview/quick_start.md b/docs/docs/overview/quick_start.md index f87b2bc6a..6e64474a9 100644 --- a/docs/docs/overview/quick_start.md +++ b/docs/docs/overview/quick_start.md @@ -207,8 +207,8 @@ npx -y @modelcontextprotocol/inspector ## Connect via `mcpgateway-wrapper` (stdio) ```bash -export MCP_AUTH_TOKEN=$MCPGATEWAY_BEARER_TOKEN -export MCP_SERVER_CATALOG_URLS=http://localhost:4444/servers/UUID_OF_SERVER_1 +export MCP_AUTH=$MCPGATEWAY_BEARER_TOKEN +export MCP_SERVER_URL=http://localhost:4444/servers/UUID_OF_SERVER_1/mcp python3 -m mcpgateway.wrapper # behaves as a local MCP stdio server - run from MCP client ``` @@ -221,8 +221,8 @@ Use this in GUI clients (Claude Desktop, Continue, etc.) that prefer stdio. Exam "command": "python3", "args": ["-m", "mcpgateway.wrapper"], "env": { - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444/servers/UUID_OF_SERVER_1", - "MCP_AUTH_TOKEN": "", + "MCP_SERVER_URL": "http://localhost:4444/servers/UUID_OF_SERVER_1/mcp", + "MCP_AUTH": "", "MCP_TOOL_CALL_TIMEOUT": "120" } } diff --git a/docs/docs/testing/acceptance.md b/docs/docs/testing/acceptance.md index de40ff651..8895d6b32 100644 --- a/docs/docs/testing/acceptance.md +++ b/docs/docs/testing/acceptance.md @@ -165,7 +165,7 @@ graph TB | Feature | URL | Commands | Expected Result | Status | Notes | |---------|-----|----------|-----------------|--------|-------| | Install Package | `pip install mcp-contextforge-gateway` | Install for wrapper | Package installed | ☐ | If not already done | -| Set Environment | `export MCP_SERVER_CATALOG_URLS="$GW_URL/servers/$TIME_SERVER_UUID" && export MCP_AUTH_TOKEN=$MCPGATEWAY_BEARER_TOKEN` | Configure wrapper | Environment set | ☐ | Point to virtual server | +| Set Environment | `export MCP_SERVER_URL="$GW_URL/servers/$TIME_SERVER_UUID" && export MCP_AUTH=$MCPGATEWAY_BEARER_TOKEN` | Configure wrapper | Environment set | ☐ | Point to virtual server | | Test Wrapper Init | `echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}' \| python3 -m mcpgateway.wrapper 2>/dev/null \| jq` | Initialize via stdio | Returns capabilities with tools | ☐ | Stdio to HTTP bridge | | List Tools via Wrapper | `echo '{"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}' \| python3 -m mcpgateway.wrapper 2>/dev/null \| jq` | List tools via stdio | Returns tool list | ☐ | Wrapper functionality | @@ -183,8 +183,8 @@ graph TB "command": "python", "args": ["-m", "mcpgateway.wrapper"], "env": { - "MCP_SERVER_CATALOG_URLS": "$GW_URL/servers/$TIME_SERVER_UUID", - "MCP_AUTH_TOKEN": "$MCPGATEWAY_BEARER_TOKEN" + "MCP_SERVER_URL": "$GW_URL/servers/$TIME_SERVER_UUID", + "MCP_AUTH": "$MCPGATEWAY_BEARER_TOKEN" } } } diff --git a/docs/docs/tutorials/argocd-helm-deployment-ibm-cloud-iks.md b/docs/docs/tutorials/argocd-helm-deployment-ibm-cloud-iks.md index 772f63355..498eaeac0 100644 --- a/docs/docs/tutorials/argocd-helm-deployment-ibm-cloud-iks.md +++ b/docs/docs/tutorials/argocd-helm-deployment-ibm-cloud-iks.md @@ -694,8 +694,8 @@ Add to your Claude Desktop configuration: "command": "python", "args": ["-m", "mcpgateway.wrapper"], "env": { - "MCP_AUTH_TOKEN": "", - "MCP_SERVER_CATALOG_URLS": "https://mcp-gateway./servers/UUID_OF_SERVER_1" + "MCP_AUTH": "", + "MCP_SERVER_URL": "https://mcp-gateway./servers/UUID_OF_SERVER_1/mcp" } } } diff --git a/docs/docs/using/agents/bee.md b/docs/docs/using/agents/bee.md index e6d3e09fe..7f5a6d5be 100644 --- a/docs/docs/using/agents/bee.md +++ b/docs/docs/using/agents/bee.md @@ -36,7 +36,7 @@ To use MCP tools in the Bee Agent Framework, follow these steps: ```bash export MCP_GATEWAY_BASE_URL=http://localhost:4444 - export MCP_AUTH_TOKEN="your_bearer_token" + export MCP_AUTH="your_bearer_token" ``` --- diff --git a/docs/docs/using/clients/claude-desktop.md b/docs/docs/using/clients/claude-desktop.md index 28bc2c7e6..2f0fcb336 100644 --- a/docs/docs/using/clients/claude-desktop.md +++ b/docs/docs/using/clients/claude-desktop.md @@ -26,8 +26,8 @@ prompt and resource registered in your Gateway. "command": "python3", "args": ["-m", "mcpgateway.wrapper"], "env": { - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444/servers/UUID_OF_SERVER_1", - "MCP_AUTH_TOKEN": "", + "MCP_SERVER_URL": "http://localhost:4444/servers/UUID_OF_SERVER_1", + "MCP_AUTH": "", "MCP_TOOL_CALL_TIMEOUT": "120" } } @@ -46,8 +46,8 @@ prompt and resource registered in your Gateway. "command": "docker", "args": [ "run", "--rm", "--network=host", "-i", - "-e", "MCP_SERVER_CATALOG_URLS=http://localhost:4444/servers/UUID_OF_SERVER_1", - "-e", "MCP_AUTH_TOKEN=", + "-e", "MCP_SERVER_URL=http://localhost:4444/servers/UUID_OF_SERVER_1", + "-e", "MCP_AUTH=", "ghcr.io/ibm/mcp-context-forge:0.5.0", "python3", "-m", "mcpgateway.wrapper" ] @@ -67,8 +67,8 @@ If you installed the package globally: "command": "pipx", "args": ["run", "python3", "-m", "mcpgateway.wrapper"], "env": { - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444/servers/UUID_OF_SERVER_1", - "MCP_AUTH_TOKEN": "" + "MCP_SERVER_URL": "http://localhost:4444/servers/UUID_OF_SERVER_1", + "MCP_AUTH": "" } } ``` @@ -94,8 +94,8 @@ If tools don't appear, open *File β–Έ Settings β–Έ Developer β–Έ View Logs* to s | Var | Purpose | | ------------------------- | ------------------------------------------------- | -| `MCP_SERVER_CATALOG_URLS` | One or more `/servers/{id}` endpoints (comma-sep) | -| `MCP_AUTH_TOKEN` | JWT bearer for Gateway auth | +| `MCP_SERVER_URL` | One or more `/servers/{id}` endpoints (comma-sep) | +| `MCP_AUTH` | JWT bearer for Gateway auth | | `MCP_TOOL_CALL_TIMEOUT` | Per-tool timeout (seconds, optional) | | `MCP_WRAPPER_LOG_LEVEL` | `DEBUG`, `INFO`, `OFF` (optional) | diff --git a/docs/docs/using/clients/cline.md b/docs/docs/using/clients/cline.md index 5bacd9db4..e96e2c54f 100644 --- a/docs/docs/using/clients/cline.md +++ b/docs/docs/using/clients/cline.md @@ -51,8 +51,8 @@ To integrate Cline with your MCP Gateway: "mcpgateway.wrapper" ], "env": { - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444", - "MCP_AUTH_TOKEN": "REPLACE_WITH_MCPGATEWAY_BEARER_TOKEN", + "MCP_SERVER_URL": "http://localhost:4444", + "MCP_AUTH": "Bearer REPLACE_WITH_MCPGATEWAY_BEARER_TOKEN", "MCP_WRAPPER_LOG_LEVEL": "OFF" } } diff --git a/docs/docs/using/clients/continue.md b/docs/docs/using/clients/continue.md index e1fdd2f1f..c5a66957b 100644 --- a/docs/docs/using/clients/continue.md +++ b/docs/docs/using/clients/continue.md @@ -46,7 +46,7 @@ There are **two ways** to attach Continue to a gateway: "type": "sse", "url": "http://localhost:4444/servers/UUID_OF_SERVER_1/sse", "headers": { - "Authorization": "Bearer ${env:MCP_AUTH_TOKEN}" + "Authorization": "Bearer ${env:MCP_AUTH}" } } } @@ -57,7 +57,7 @@ There are **two ways** to attach Continue to a gateway: *Generate a token*: ```bash -export MCP_AUTH_TOKEN=$(python3 -m mcpgateway.utils.create_jwt_token -u admin --secret my-test-key) +export MCP_AUTH=$(python3 -m mcpgateway.utils.create_jwt_token -u admin --secret my-test-key) ``` ### Option B - Local stdio bridge (`mcpgateway.wrapper`) @@ -79,8 +79,8 @@ pipx install --include-deps mcp-contextforge-gateway "command": "python3", "args": ["-m", "mcpgateway.wrapper"], "env": { - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444/servers/UUID_OF_SERVER_1", - "MCP_AUTH_TOKEN": "${env:MCP_AUTH_TOKEN}", + "MCP_SERVER_URL": "http://localhost:4444/servers/UUID_OF_SERVER_1", + "MCP_AUTH": "${env:MCP_AUTH}", "MCP_TOOL_CALL_TIMEOUT": "120" } } diff --git a/docs/docs/using/clients/copilot.md b/docs/docs/using/clients/copilot.md index 46ae677d8..bc732b827 100644 --- a/docs/docs/using/clients/copilot.md +++ b/docs/docs/using/clients/copilot.md @@ -92,8 +92,8 @@ uv pip install mcp-contextforge-gateway # inside any uv/ve "command": "python3", "args": ["-m", "mcpgateway.wrapper"], "env": { - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444/servers/UUID_OF_SERVER_1", - "MCP_AUTH_TOKEN": "", + "MCP_SERVER_URL": "http://localhost:4444/servers/UUID_OF_SERVER_1/mcp", + "MCP_AUTH": "Bearer ", "MCP_TOOL_CALL_TIMEOUT": "120" } } @@ -111,8 +111,8 @@ That's it - VS Code spawns the stdio process, pipes JSON-RPC, and you're ready t "command": "docker", "args": [ "run", "--rm", "--network=host", "-i", - "-e", "MCP_SERVER_CATALOG_URLS=http://localhost:4444/servers/UUID_OF_SERVER_1", - "-e", "MCP_AUTH_TOKEN=", + "-e", "MCP_SERVER_URL=http://localhost:4444/servers/UUID_OF_SERVER_1", + "-e", "MCP_AUTH=", "ghcr.io/ibm/mcp-context-forge:0.5.0", "python3", "-m", "mcpgateway.wrapper" ] @@ -142,7 +142,7 @@ Copilot routes the call β†’ Gateway β†’ tool, and prints the reply. * **Use SSE for production**, stdio for local/offline. * You can manage servers, tools and prompts from the Gateway **Admin UI** (`/admin`). * Need a bearer quickly? - `export MCP_AUTH_TOKEN=$(python3 -m mcpgateway.utils.create_jwt_token -u admin --secret my-test-key)` + `export MCP_AUTH=$(python3 -m mcpgateway.utils.create_jwt_token -u admin --secret my-test-key)` --- diff --git a/docs/docs/using/clients/mcp-cli.md b/docs/docs/using/clients/mcp-cli.md index 553d7af90..f790a013e 100644 --- a/docs/docs/using/clients/mcp-cli.md +++ b/docs/docs/using/clients/mcp-cli.md @@ -80,8 +80,8 @@ Create a `server_config.json` file to define your MCP Context Forge Gateway conn "command": "/path/to/mcp-context-forge/.venv/bin/python", "args": ["-m", "mcpgateway.wrapper"], "env": { - "MCP_AUTH_TOKEN": "", - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444", + "MCP_AUTH": "", + "MCP_SERVER_URL": "http://localhost:4444", "MCP_TOOL_CALL_TIMEOUT": "120" } } @@ -101,9 +101,9 @@ Create a `server_config.json` file to define your MCP Context Forge Gateway conn "--rm", "-i", "-e", - "MCP_SERVER_CATALOG_URLS=http://host.docker.internal:4444", + "MCP_SERVER_URL=http://host.docker.internal:4444", "-e", - "MCP_AUTH_TOKEN=${MCPGATEWAY_BEARER_TOKEN}", + "MCP_AUTH=${MCPGATEWAY_BEARER_TOKEN}", "--entrypoint", "uv", "ghcr.io/ibm/mcp-context-forge:0.5.0", @@ -130,7 +130,7 @@ python3 -m mcpgateway.utils.create_jwt_token -u admin --exp 10080 --secret my-te > **⚠️ Important Notes** > - Use the **full path** to your virtual environment's Python to avoid import errors > - Make sure your MCP Context Forge Gateway is running on the correct port (default: 4444) -> - The wrapper requires `MCP_SERVER_CATALOG_URLS` environment variable +> - The wrapper requires `MCP_SERVER_URL` environment variable --- @@ -320,8 +320,8 @@ In interactive mode, use these commands: ```bash # MCP Context Forge Gateway connection -export MCP_AUTH_TOKEN="your-jwt-token" -export MCP_SERVER_CATALOG_URLS="http://localhost:4444" +export MCP_AUTH="your-jwt-token" +export MCP_SERVER_URL="http://localhost:4444" # LLM Provider API keys export OPENAI_API_KEY="sk-your-openai-key" @@ -350,7 +350,7 @@ export LLM_MODEL="mistral-nemo:latest" } ``` -#### "MCP_SERVER_CATALOG_URLS environment variable is required" +#### "MCP_SERVER_URL environment variable is required" **Solution:** Ensure your `server_config.json` includes the required environment variables in the `env` section. @@ -461,8 +461,8 @@ The mcp-cli integrates with MCP Context Forge Gateway through multiple connectio "command": "/path/to/mcp-context-forge/.venv/bin/python", "args": ["-m", "mcpgateway.wrapper"], "env": { - "MCP_AUTH_TOKEN": "your-jwt-token", - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444" + "MCP_AUTH": "your-jwt-token", + "MCP_SERVER_URL": "http://localhost:4444" } } } diff --git a/docs/docs/using/clients/mcp-inspector.md b/docs/docs/using/clients/mcp-inspector.md index e99680a40..729b6c9a9 100644 --- a/docs/docs/using/clients/mcp-inspector.md +++ b/docs/docs/using/clients/mcp-inspector.md @@ -22,7 +22,7 @@ Point it at any MCP-compliant endpoint — a live Gateway **SSE** stream or |----------|-----------|--------------| | **1. Connect to Gateway (SSE)** |
```bash
npx @modelcontextprotocol/inspector \\
--url http://localhost:4444/servers/UUID_OF_SERVER_1/sse \\
--header "Authorization: Bearer $MCPGATEWAY_BEARER_TOKEN"
``` | Inspector opens `http://localhost:5173` and attaches **directly** to the gateway stream. | | **2. Connect to Gateway (Streamable HTTP)** |
```bash
npx @modelcontextprotocol/inspector \\
--url http://localhost:4444/servers/UUID_OF_SERVER_1/mcp/ \\
--header "Authorization: Bearer $MCPGATEWAY_BEARER_TOKEN"
``` | Inspector opens `http://localhost:5173` and attaches **directly** to the gateway stream. | -| **3 - Spin up the stdio wrapper in-process** |
```bash
export MCP_AUTH_TOKEN=$MCPGATEWAY_BEARER_TOKEN
export MCP_SERVER_CATALOG_URLS=http://localhost:4444/servers/UUID_OF_SERVER_1

npx @modelcontextprotocol/inspector \\
python3 -m mcpgateway.wrapper
``` | Inspector forks `python3 -m mcpgateway.wrapper`, then connects to its stdio port automatically. | +| **3 - Spin up the stdio wrapper in-process** |
```bash
export MCP_AUTH=$MCPGATEWAY_BEARER_TOKEN
export MCP_SERVER_URL=http://localhost:4444/servers/UUID_OF_SERVER_1/mcp

npx @modelcontextprotocol/inspector \\
python3 -m mcpgateway.wrapper
``` | Inspector forks `python3 -m mcpgateway.wrapper`, then connects to its stdio port automatically. | | **4 - Same, but via uv / uvx** |
```bash
npx @modelcontextprotocol/inspector \\
uvx python3 -m mcpgateway.wrapper
``` | Uses the super-fast **uv** virtual-env if you prefer. | | **5 - Wrapper already running** | Launch the wrapper in another shell, then:
```bash
npx @modelcontextprotocol/inspector --stdio
``` | Inspector only opens the GUI and binds to the running stdio server on stdin/stdout. | @@ -33,14 +33,14 @@ Point it at any MCP-compliant endpoint — a live Gateway **SSE** stream or Most wrappers / servers will need at least: ```bash -export MCP_SERVER_CATALOG_URLS=http://localhost:4444/servers/UUID_OF_SERVER_1 # one or many -export MCP_AUTH_TOKEN=$(python3 -m mcpgateway.utils.create_jwt_token -u admin --secret my-test-key) +export MCP_SERVER_URL=http://localhost:4444/servers/UUID_OF_SERVER_1 # one or many +export MCP_AUTH=$(python3 -m mcpgateway.utils.create_jwt_token -u admin --secret my-test-key) ``` If you point Inspector **directly** at a Gateway SSE stream, pass the header: ```bash ---header "Authorization: Bearer $MCP_AUTH_TOKEN" +--header "Authorization: Bearer $MCP_AUTH" ``` --- diff --git a/docs/docs/using/mcpgateway-wrapper.md b/docs/docs/using/mcpgateway-wrapper.md index 4b9144043..e58d8f4e4 100644 --- a/docs/docs/using/mcpgateway-wrapper.md +++ b/docs/docs/using/mcpgateway-wrapper.md @@ -30,8 +30,8 @@ export MCPGATEWAY_BEARER_TOKEN=$(python3 -m mcpgateway.utils.create_jwt_token \ Configure the wrapper via ENV variables: ```bash -export MCP_AUTH_TOKEN=${MCPGATEWAY_BEARER_TOKEN} -export MCP_SERVER_CATALOG_URLS='http://localhost:4444/servers/UUID_OF_SERVER_1' # select a virtual server +export MCP_AUTH=${MCPGATEWAY_BEARER_TOKEN} +export MCP_SERVER_URL='http://localhost:4444/servers/UUID_OF_SERVER_1/mcp' # select a virtual server export MCP_TOOL_CALL_TIMEOUT=120 # tool call timeout in seconds (optional - default 90) export MCP_WRAPPER_LOG_LEVEL=INFO # DEBUG | INFO | OFF ``` @@ -51,8 +51,8 @@ Launching it in your terminal (ex: `python3 -m mcpgateway.wrapper`) is useful fo ```bash docker run -i --rm --network=host \ - -e MCP_SERVER_CATALOG_URLS=$MCP_SERVER_CATALOG_URLS \ - -e MCP_AUTH_TOKEN=$MCP_AUTH_TOKEN \ + -e MCP_SERVER_URL=$MCP_SERVER_URL \ + -e MCP_AUTH=$MCP_AUTH \ ghcr.io/ibm/mcp-context-forge:0.5.0 \ python3 -m mcpgateway.wrapper ``` @@ -61,8 +61,8 @@ Launching it in your terminal (ex: `python3 -m mcpgateway.wrapper`) is useful fo ```bash pipx install --include-deps mcp-contextforge-gateway - MCP_AUTH_TOKEN=$MCP_AUTH_TOKEN \ - MCP_SERVER_CATALOG_URLS=$MCP_SERVER_CATALOG_URLS \ + MCP_AUTH=$MCP_AUTH \ + MCP_SERVER_URL=$MCP_SERVER_URL \ python3 -m mcpgateway.wrapper ``` @@ -83,8 +83,8 @@ The wrapper now waits for JSON-RPC on **stdin** and emits replies on **stdout**. | Variable | Purpose | Default | | ------------------------- | -------------------------------------------- | ------- | -| `MCP_SERVER_CATALOG_URLS` | Comma-sep list of `/servers/{id}` endpoints | - | -| `MCP_AUTH_TOKEN` | Bearer token the wrapper forwards to Gateway | - | +| `MCP_SERVER_URL` | Comma-sep list of `/servers/{id}` endpoints | - | +| `MCP_AUTH` | Bearer token the wrapper forwards to Gateway | - | | `MCP_TOOL_CALL_TIMEOUT` | Per-tool timeout (seconds) | `90` | | `MCP_WRAPPER_LOG_LEVEL` | `OFF`, `INFO`, `DEBUG`, ... | `INFO` | @@ -94,7 +94,7 @@ The wrapper now waits for JSON-RPC on **stdin** and emits replies on **stdout**. You can run `mcpgateway.wrapper` from any MCP client, using either `python3`, `uv`, `uvx`, `uvx`, `pipx`, `docker`, or `podman` entrypoints. -The MCP Client calls the entrypoint, which needs to have the `mcp-contextforge-gateway` module installed, able to call `mcpgateway.wrapper` and the right `env` settings exported (`MCP_SERVER_CATALOG_URLS` and `MCP_AUTH_TOKEN` at a minimum). +The MCP Client calls the entrypoint, which needs to have the `mcp-contextforge-gateway` module installed, able to call `mcpgateway.wrapper` and the right `env` settings exported (`MCP_SERVER_URL` and `MCP_AUTH` at a minimum). === "Claude Desktop (venv)" @@ -105,8 +105,8 @@ The MCP Client calls the entrypoint, which needs to have the `mcp-contextforge-g "command": "python3", "args": ["-m", "mcpgateway.wrapper"], "env": { - "MCP_AUTH_TOKEN": "", - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444/servers/UUID_OF_SERVER_1" + "MCP_AUTH": "", + "MCP_SERVER_URL": "http://localhost:4444/servers/UUID_OF_SERVER_1" } } } @@ -132,8 +132,8 @@ The MCP Client calls the entrypoint, which needs to have the `mcp-contextforge-g "mcpgateway.wrapper" ], "env": { - "MCP_AUTH_TOKEN": "", - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444/servers/UUID_OF_SERVER_1" + "MCP_AUTH": "", + "MCP_SERVER_URL": "http://localhost:4444/servers/UUID_OF_SERVER_1" } } } @@ -150,8 +150,8 @@ The MCP Client calls the entrypoint, which needs to have the `mcp-contextforge-g "command": "/path/to/python", "args": ["-m", "mcpgateway.wrapper"], "env": { - "MCP_AUTH_TOKEN": "", - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444/servers/UUID_OF_SERVER_1" + "MCP_AUTH": "", + "MCP_SERVER_URL": "http://localhost:4444/servers/UUID_OF_SERVER_1" } } } @@ -177,8 +177,8 @@ The MCP Client calls the entrypoint, which needs to have the `mcp-contextforge-g "mcpgateway.wrapper" ], "env": { - "MCP_SERVER_CATALOG_URLS": "http://localhost:4444/servers/UUID_OF_SERVER_1", - "MCP_AUTH_TOKEN": "REPLACE_WITH_MCPGATEWAY_BEARER_TOKEN", + "MCP_SERVER_URL": "http://localhost:4444/servers/UUID_OF_SERVER_1", + "MCP_AUTH": "REPLACE_WITH_MCPGATEWAY_BEARER_TOKEN", "MCP_WRAPPER_LOG_LEVEL": "OFF" } } diff --git a/docs/docs/using/servers/index.md b/docs/docs/using/servers/index.md index e48fae734..b4856a934 100644 --- a/docs/docs/using/servers/index.md +++ b/docs/docs/using/servers/index.md @@ -101,8 +101,8 @@ python3 -m mcpgateway.translate \ ### **Testing with Wrapper** ```bash # Test through mcpgateway.wrapper -export MCP_AUTH_TOKEN=$MCPGATEWAY_BEARER_TOKEN -export MCP_SERVER_CATALOG_URLS='http://localhost:4444/servers/UUID_OF_SERVER_1' +export MCP_AUTH=$MCPGATEWAY_BEARER_TOKEN +export MCP_SERVER_URL='http://localhost:4444/servers/UUID_OF_SERVER_1' python3 -m mcpgateway.wrapper ``` diff --git a/mcpgateway/static/admin.js b/mcpgateway/static/admin.js index 05b81f257..52a37bb76 100644 --- a/mcpgateway/static/admin.js +++ b/mcpgateway/static/admin.js @@ -7048,8 +7048,8 @@ function generateConfig(server, configType) { command: "python", args: ["-m", "mcpgateway.wrapper"], env: { - MCP_AUTH_TOKEN: "your-token-here", - MCP_SERVER_CATALOG_URLS: `${baseUrl}/servers/${server.id}`, + MCP_AUTH: "your-token-here", + MCP_SERVER_URL: `${baseUrl}/servers/${server.id}/mcp`, MCP_TOOL_CALL_TIMEOUT: "120", }, }, diff --git a/mcpgateway/utils/retry_manager.py b/mcpgateway/utils/retry_manager.py index 13c1124f8..39fd8ea3c 100644 --- a/mcpgateway/utils/retry_manager.py +++ b/mcpgateway/utils/retry_manager.py @@ -149,9 +149,10 @@ # Standard import asyncio +from contextlib import asynccontextmanager import logging import random -from typing import Any, Dict, Optional +from typing import Any, AsyncContextManager, Dict, Optional # Third-Party import httpx @@ -533,6 +534,58 @@ async def delete(self, url: str, **kwargs): """ return await self.request("DELETE", url, **kwargs) + @asynccontextmanager + async def stream(self, method: str, url: str, **kwargs) -> AsyncContextManager[httpx.Response]: + """ + Async context manager that yields a streaming response. Retries opening the stream. + Usage: + async with resilient.stream("POST", url, data=..., headers=...) as resp: + async for chunk in resp.aiter_bytes(): ... + """ + attempt = 0 + last_exc: Optional[Exception] = None + while attempt < self.max_retries: + try: + logging.debug("Attempt %d (stream) %s %s", attempt + 1, method, url) + stream_cm = self.client.stream(method, url, **kwargs) + async with stream_cm as resp: + if not (200 <= resp.status_code < 300 or resp.is_success): + if resp.status_code == 429: + ra = resp.headers.get("Retry-After") + if ra: + try: + wait = float(ra) + except ValueError: + wait = None + if wait: + logging.info("Rate-limited. Sleeping Retry-After=%s", wait) + await asyncio.sleep(wait) + attempt += 1 + continue + if not self._should_retry(None, resp): + # give caller the error response once and return + yield resp + return + logging.info("Stream response %s considered retryable; will retry opening.", resp.status_code) + else: + # good response -> yield it to caller + yield resp + return + except Exception as exc: + last_exc = exc + if not self._should_retry(exc, None): + raise + logging.warning("Error opening stream (will retry): %s", exc) + + backoff = min(self.base_backoff * (2**attempt), self.max_delay) + await self._sleep_with_jitter(backoff) + attempt += 1 + logging.debug("Retrying stream open (attempt %d) after backoff %.2f", attempt + 1, backoff) + + if last_exc: + raise last_exc + raise RuntimeError("Max retries reached opening stream") + async def aclose(self): """Close the underlying HTTP client gracefully. diff --git a/mcpgateway/wrapper.py b/mcpgateway/wrapper.py index e9816fc9b..b321d88b1 100644 --- a/mcpgateway/wrapper.py +++ b/mcpgateway/wrapper.py @@ -1,968 +1,691 @@ # -*- coding: utf-8 -*- """ -MCP Gateway Wrapper server. +MCP Gateway Wrapper. Copyright 2025 SPDX-License-Identifier: Apache-2.0 -Authors: Keval Mahajan, Mihai Criveti, Madhav Kandukuri +Authors: Keval Mahajan -This module implements a wrapper bridge that facilitates +MCP Client (stdio) <-> MCP Gateway Bridge + +This module implements a wrapper stdio bridge that facilitates interaction between the MCP client and the MCP gateway. It provides several functionalities, including listing tools, invoking tools, managing resources, retrieving prompts, and handling tool calls via the MCP gateway. -A **stdio** bridge that exposes a remote MCP Gateway -(HTTP-/JSON-RPC APIs) as a local MCP server. All JSON-RPC -traffic is written to **stdout**; every log or trace message -is emitted on **stderr** so that protocol messages and -diagnostics never mix. - -Environment variables: -- MCP_SERVER_CATALOG_URLS: Comma-separated list of gateway catalog URLs (required) -- MCP_AUTH_TOKEN: Bearer token for the gateway (optional) -- MCP_TOOL_CALL_TIMEOUT: Seconds to wait for a gateway RPC call (default 90) -- MCP_WRAPPER_LOG_LEVEL: Python log level name or OFF/NONE to disable logging (default INFO) - -Example: - $ export MCPGATEWAY_BEARER_TOKEN=$(python3 -m mcpgateway.utils.create_jwt_token --username admin --exp 10080 --secret my-test-key) - $ export MCP_AUTH_TOKEN=${MCPGATEWAY_BEARER_TOKEN} - $ export MCP_SERVER_CATALOG_URLS='http://localhost:4444/servers/UUID_OF_SERVER_1' +- All JSON-RPC traffic is written to stdout. +- All logs/diagnostics are written to stderr, ensuring clean separation. + +Environment Variables +--------------------- +- **MCP_SERVER_URL** (or `--url`): Gateway MCP endpoint URL. +- **MCP_AUTH** (or `--auth`): Authorization header value. +- **MCP_TOOL_CALL_TIMEOUT** (or `--timeout`): Response timeout in seconds (default: 60). +- **MCP_WRAPPER_LOG_LEVEL** (or `--log-level`): Logging level, or OFF to disable. +- **CONCURRENCY**: Max concurrent tool calls (default: 10). + +Example usage: +-------------- + +Method 1: Using environment variables + $ export MCP_SERVER_URL='http://localhost:4444/servers/UUID/mcp' + $ export MCP_AUTH='Bearer ' $ export MCP_TOOL_CALL_TIMEOUT=120 - $ export MCP_WRAPPER_LOG_LEVEL=DEBUG # OFF to disable logging + $ export MCP_WRAPPER_LOG_LEVEL=DEBUG $ python3 -m mcpgateway.wrapper + +Method 2: Using command-line arguments + $ python3 -m mcpgateway.wrapper --url 'http://localhost:4444/servers/UUID/mcp' --auth 'Bearer ' --timeout 120 --log-level DEBUG """ +# Future +from __future__ import annotations # Standard +import argparse import asyncio +import codecs +from contextlib import suppress +from dataclasses import dataclass +import errno +import json import logging import os +import signal import sys -from typing import Any, Dict, List, Optional, Union -from urllib.parse import urlparse +from typing import Any, AsyncIterator, Dict, List, Optional, Union # Third-Party import httpx -from mcp import types -from mcp.server import NotificationOptions, Server -from mcp.server.models import InitializationOptions -import mcp.server.stdio -from pydantic import AnyUrl # First-Party -from mcpgateway import __version__ -from mcpgateway.services.logging_service import LoggingService from mcpgateway.utils.retry_manager import ResilientHttpClient -# ----------------------------------------------------------------------------- -# Configuration -# ----------------------------------------------------------------------------- -ENV_SERVER_CATALOGS = "MCP_SERVER_CATALOG_URLS" -ENV_AUTH_TOKEN = "MCP_AUTH_TOKEN" # nosec B105 - this is an *environment variable name*, not a secret -ENV_TIMEOUT = "MCP_TOOL_CALL_TIMEOUT" -ENV_LOG_LEVEL = "MCP_WRAPPER_LOG_LEVEL" - -RAW_CATALOGS: str = os.getenv(ENV_SERVER_CATALOGS, "") -SERVER_CATALOG_URLS: List[str] = [u.strip() for u in RAW_CATALOGS.split(",") if u.strip()] +# ----------------------- +# Configuration Defaults +# ----------------------- +DEFAULT_CONCURRENCY = int(os.environ.get("CONCURRENCY", "10")) +DEFAULT_CONNECT_TIMEOUT = 15 +DEFAULT_RESPONSE_TIMEOUT = float(os.environ.get("MCP_TOOL_CALL_TIMEOUT", "60")) -AUTH_TOKEN: str = os.getenv(ENV_AUTH_TOKEN, "") -TOOL_CALL_TIMEOUT: int = int(os.getenv(ENV_TIMEOUT, "90")) +JSONRPC_PARSE_ERROR = -32700 +JSONRPC_INTERNAL_ERROR = -32603 +JSONRPC_SERVER_ERROR = -32000 -# Validate required configuration (only when run as script) -if __name__ == "__main__" and not SERVER_CATALOG_URLS: - print(f"Error: {ENV_SERVER_CATALOGS} environment variable is required", file=sys.stderr) - sys.exit(1) +# Global logger +logger = logging.getLogger("mcpgateway.wrapper") +logger.addHandler(logging.StreamHandler(sys.stderr)) +logger.propagate = False +logger.disabled = True # default: disabled +# Shutdown flag +_shutdown = asyncio.Event() -# ----------------------------------------------------------------------------- -# Base URL Extraction -# ----------------------------------------------------------------------------- -def _extract_base_url(url: str) -> str: - """Return the gateway-level base URL. - The function keeps any application root path (`APP_ROOT_PATH`) that the - remote gateway is mounted under (for example `/gateway`) while removing - the `/servers/` suffix that appears in catalog endpoints. It also - discards any query string or fragment. +def _mark_shutdown(): + """Mark the shutdown flag for graceful termination. + This is triggered when stdin closes, stdout fails, or a signal is caught. Args: - url (str): Full catalog URL, e.g. - `https://host.com/gateway/servers/UUID_OF_SERVER_1`. - - Returns: - str: Clean base URL suitable for building `/tools/`, `/prompts/`, - or `/resources/` endpoints-for example - `https://host.com/gateway`. - - Raises: - ValueError: If *url* lacks a scheme or network location. + None Examples: - >>> _extract_base_url("https://host.com/servers/UUID_OF_SERVER_2") - 'https://host.com' - >>> _extract_base_url("https://host.com/gateway/servers/UUID_OF_SERVER_2") - 'https://host.com/gateway' - >>> _extract_base_url("https://host.com/gateway/servers") - 'https://host.com/gateway' - >>> _extract_base_url("https://host.com/gateway") - 'https://host.com/gateway' - >>> _extract_base_url("invalid-url") - Traceback (most recent call last): - ... - ValueError: Invalid URL provided: invalid-url - >>> _extract_base_url("https://host.com/") - 'https://host.com/' - >>> _extract_base_url("https://host.com") - 'https://host.com' - - Note: - If the target server was started with `APP_ROOT_PATH=/gateway`, the - resulting catalog URLs include that prefix. This helper preserves the - prefix so the wrapper's follow-up calls remain correctly scoped. + >>> _mark_shutdown() # doctest: +ELLIPSIS + >>> shutting_down() + True + >>> # Reset for following doctests: + >>> _ = _shutdown.clear() """ - parsed = urlparse(url) - if not parsed.scheme or not parsed.netloc: - raise ValueError(f"Invalid URL provided: {url}") - - path = parsed.path or "" - if "/servers/" in path: - path = path.split("/servers")[0] # ".../servers/UUID_OF_SERVER_123" -> "..." - elif path.endswith("/servers"): - path = path[: -len("/servers")] # ".../servers" -> "..." - # otherwise keep the existing path (supports APP_ROOT_PATH) - - return f"{parsed.scheme}://{parsed.netloc}{path}" - - -BASE_URL: str = _extract_base_url(SERVER_CATALOG_URLS[0]) if SERVER_CATALOG_URLS else "" - -# ----------------------------------------------------------------------------- -# Logging Setup -# ----------------------------------------------------------------------------- -_log_level = os.getenv(ENV_LOG_LEVEL, "INFO").upper() -if _log_level in {"OFF", "NONE", "DISABLE", "FALSE", "0"}: - logging.disable(logging.CRITICAL) -else: - logging.basicConfig( - level=getattr(logging, _log_level, logging.INFO), - format="%(asctime)s %(levelname)-8s %(name)s: %(message)s", - stream=sys.stderr, - ) - -# Initialize logging service first -logging_service = LoggingService() -logger = logging_service.get_logger("mcpgateway.wrapper") -logger.info(f"Starting MCP wrapper {__version__}: base_url={BASE_URL}, timeout={TOOL_CALL_TIMEOUT}") + if not _shutdown.is_set(): + _shutdown.set() -# ----------------------------------------------------------------------------- -# HTTP Helpers -# ----------------------------------------------------------------------------- -async def fetch_url(url: str) -> httpx.Response: - """ - Perform an asynchronous HTTP GET request and return the response. - - This function makes authenticated HTTP requests to the MCP gateway using - optional bearer token authentication and timeout configuration. +def shutting_down() -> bool: + """Check whether the server is shutting down. Args: - url: The target URL to fetch. + None Returns: - The successful ``httpx.Response`` object. - - Raises: - httpx.RequestError: If a network problem occurs while making the request. - httpx.HTTPStatusError: If the server returns a 4xx or 5xx response. + bool: True if shutdown has been triggered, False otherwise. Examples: - Basic usage (requires running server): - - >>> import asyncio - >>> # This example would require a real server running - >>> # async def example(): - >>> # response = await fetch_url("https://httpbin.org/get") - >>> # return response.status_code - >>> # asyncio.run(example()) # Would return 200 - - Error handling: - - >>> import asyncio - >>> async def test_invalid_url(): - ... try: - ... await fetch_url("http://invalid-domain-that-does-not-exist.test") - ... except Exception as e: - ... return type(e).__name__ - >>> # asyncio.run(test_invalid_url()) # Would return 'ConnectError' or similar + >>> shutting_down() + False """ - headers = {"Authorization": f"Bearer {AUTH_TOKEN}"} if AUTH_TOKEN else {} - async with ResilientHttpClient(client_args={"timeout": TOOL_CALL_TIMEOUT}) as client: - try: - response = await client.get(url, headers=headers) - response.raise_for_status() - return response - except httpx.RequestError as err: - logger.error(f"Network error while fetching {url}: {err}") - raise - except httpx.HTTPStatusError as err: - logger.error(f"HTTP {err.response.status_code} returned for {url}: {err}") - raise - - -# ----------------------------------------------------------------------------- -# Metadata Helpers -# ----------------------------------------------------------------------------- -async def get_tools_from_mcp_server(catalog_urls: List[str]) -> List[str]: - """ - Retrieve associated tool IDs from the MCP gateway server catalogs. + return _shutdown.is_set() - This function extracts server IDs from catalog URLs, fetches the server - catalog from the gateway, and returns all tool IDs associated with the - specified servers. - Args: - catalog_urls (List[str]): List of catalog endpoint URLs. +# ----------------------- +# Utilities +# ----------------------- +def setup_logging(level: Optional[str]) -> None: + """Configure logging for the wrapper. - Returns: - List[str]: A list of tool ID strings extracted from the server catalog. + Args: + level: Logging level (e.g. "INFO", "DEBUG"), or OFF/None to disable. Examples: - Basic usage: + >>> setup_logging("DEBUG") + >>> logger.disabled + False + >>> setup_logging("OFF") + >>> logger.disabled + True + """ + if not level: + logger.disabled = True + return - >>> import asyncio - >>> # Mock example - would require real server - >>> async def example(): - ... urls = ["https://gateway.example.com/servers/server-123"] - ... # Would return tool IDs like ["tool1", "tool2"] - ... return ["get_time", "calculate_sum"] - >>> asyncio.run(example()) - ['get_time', 'calculate_sum'] + log_level = level.strip().upper() + if log_level in {"OFF", "NONE", "DISABLE", "FALSE", "0"}: + logger.disabled = True + return - Empty catalog handling: + logger.setLevel(getattr(logging, log_level, logging.INFO)) + formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(name)s: %(message)s") + for handler in logger.handlers: + handler.setFormatter(formatter) + logger.disabled = False - >>> import asyncio - >>> async def empty_example(): - ... return [] # No tools found - >>> asyncio.run(empty_example()) - [] - """ - server_ids = [url.split("/")[-1] for url in catalog_urls] - url = f"{BASE_URL}/servers/" - response = await fetch_url(url) - catalog = response.json() - tool_ids: List[str] = [] - for entry in catalog: - if str(entry.get("id")) in server_ids: - tool_ids.extend(entry.get("associatedTools", [])) - return tool_ids - - -async def tools_metadata(tool_ids: List[str]) -> List[Dict[str, Any]]: - """ - Fetch metadata for a list of MCP tools by their IDs. - Retrieves detailed metadata including name, description, and input schema - for each specified tool from the gateway's tools endpoint. +def convert_url(url: str) -> str: + """Normalize an MCP server URL. + + - If it ends with `/sse`, replace with `/mcp`. + - If it ends with `/mcp` already, leave unchanged. + - Otherwise, append `/mcp`. Args: - tool_ids (List[str]): List of tool ID strings. + url: The input server URL. Returns: - List[Dict[str, Any]]: A list of metadata dictionaries for each tool. + str: Normalized MCP URL. Examples: - Fetching specific tools: - - >>> import asyncio - >>> # Mock example - would require real server - >>> async def example(): - ... tool_ids = ["get_time", "calculate_sum"] - ... # Would return metadata like: - ... return [ - ... {"name": "get_time", "description": "Get current time", "inputSchema": {}}, - ... {"name": "calculate_sum", "description": "Add numbers", "inputSchema": {}} - ... ] - >>> result = asyncio.run(example()) - >>> len(result) - 2 + >>> convert_url("http://localhost:4444/servers/uuid") + 'http://localhost:4444/servers/uuid/mcp' + >>> convert_url("http://localhost:4444/servers/uuid/sse") + 'http://localhost:4444/servers/uuid/mcp' + >>> convert_url("http://localhost:4444/servers/uuid/mcp") + 'http://localhost:4444/servers/uuid/mcp' + """ + if url.endswith("/mcp") or url.endswith("/mcp/"): + return url + if url.endswith("/sse"): + return url.replace("/sse", "/mcp") + return url + "/mcp" - Empty list handling: - >>> import asyncio - >>> async def empty_tools(): - ... return [] # No tools to fetch - >>> asyncio.run(empty_tools()) - [] +def send_to_stdout(obj: Union[dict, str]) -> None: + """Write JSON-serializable object to stdout. - Special "0" ID for all tools: + Args: + obj: Object to serialize and write. Falls back to str() if JSON fails. - >>> # tool_ids = ["0"] returns all available tools - >>> # This is handled by the conditional logic in the function + Notes: + If writing fails (e.g., broken pipe), triggers shutdown. """ - if not tool_ids: - return [] - url = f"{BASE_URL}/tools/" - response = await fetch_url(url) - data: List[Dict[str, Any]] = response.json() - if tool_ids == ["0"]: - return data - - return [tool for tool in data if tool["name"] in tool_ids] - + try: + line = json.dumps(obj, ensure_ascii=False) + except Exception: + line = str(obj) + try: + sys.stdout.write(line + "\n") + sys.stdout.flush() + except OSError as e: + if e.errno in (errno.EPIPE, errno.EINVAL): + _mark_shutdown() + else: + _mark_shutdown() -async def get_prompts_from_mcp_server(catalog_urls: List[str]) -> List[str]: - """ - Retrieve associated prompt IDs from the MCP gateway server catalogs. - Extracts server IDs from the provided catalog URLs and fetches all - prompt IDs associated with those servers from the gateway. +def make_error(message: str, code: int = JSONRPC_INTERNAL_ERROR, data: Any = None) -> dict: + """Construct a JSON-RPC error response. Args: - catalog_urls (List[str]): List of catalog endpoint URLs. + message: Error message. + code: JSON-RPC error code (default -32603). + data: Optional extra error data. Returns: - List[str]: A list of prompt ID strings. + dict: JSON-RPC error object. Examples: - Basic prompt retrieval: - - >>> import asyncio - >>> # Mock example - would require real server - >>> async def example(): - ... urls = ["https://gateway.example.com/servers/server-123"] - ... # Would return prompt IDs like: - ... return ["greeting_prompt", "error_handler"] - >>> asyncio.run(example()) - ['greeting_prompt', 'error_handler'] - - No prompts available: - - >>> import asyncio - >>> async def no_prompts(): - ... return [] # Server has no prompts - >>> asyncio.run(no_prompts()) - [] - """ - server_ids = [url.split("/")[-1] for url in catalog_urls] - url = f"{BASE_URL}/servers/" - response = await fetch_url(url) - catalog = response.json() - prompt_ids: List[str] = [] - for entry in catalog: - if str(entry.get("id")) in server_ids: - prompt_ids.extend(entry.get("associatedPrompts", [])) - return prompt_ids - - -async def prompts_metadata(prompt_ids: List[str]) -> List[Dict[str, Any]]: + >>> make_error("Invalid input", code=-32600) + {'jsonrpc': '2.0', 'id': 'bridge', 'error': {'code': -32600, 'message': 'Invalid input'}} + >>> make_error("Oops", data={"info": 1})["error"]["data"] + {'info': 1} """ - Fetch metadata for a list of MCP prompts by their IDs. + err: dict[str, Any] = { + "jsonrpc": "2.0", + "id": "bridge", + "error": {"code": code, "message": message}, + } + if data is not None: + err["error"]["data"] = data + return err + - Retrieves detailed metadata including name, description, and arguments - for each specified prompt from the gateway's prompts endpoint. +async def stdin_reader(queue: "asyncio.Queue[Union[dict, list, str, None]]") -> None: + """Read lines from stdin and push parsed JSON into a queue. Args: - prompt_ids (List[str]): List of prompt ID strings. + queue: Target asyncio.Queue where parsed messages are enqueued. - Returns: - List[Dict[str, Any]]: A list of metadata dictionaries for each prompt. + Notes: + - On EOF, pushes None and triggers shutdown. + - Invalid JSON produces a JSON-RPC error object. Examples: - Fetching specific prompts: - - >>> import asyncio - >>> # Mock example - would require real server - >>> async def example(): - ... prompt_ids = ["greeting", "farewell"] - ... # Would return metadata like: - ... return [ - ... {"name": "greeting", "description": "Welcome message", "arguments": []}, - ... {"name": "farewell", "description": "Goodbye message", "arguments": []} - ... ] - >>> result = asyncio.run(example()) - >>> len(result) - 2 - - Empty prompt list: - - >>> import asyncio - >>> async def no_prompts(): - ... return [] - >>> asyncio.run(no_prompts()) - [] - - All prompts with special ID: - - >>> # prompt_ids = ["0"] returns all available prompts - >>> # This triggers the special case in the function + >>> # Example pattern (not executed): asyncio.create_task(stdin_reader(q)) + >>> True + True """ - if not prompt_ids: - return [] - url = f"{BASE_URL}/prompts/" - response = await fetch_url(url) - data: List[Dict[str, Any]] = response.json() - if prompt_ids == ["0"]: - return data - return [pr for pr in data if str(pr.get("id")) in prompt_ids] - + while True: + line = await asyncio.to_thread(sys.stdin.readline) + if not line: + await queue.put(None) + _mark_shutdown() + break + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except Exception: + obj = make_error("Invalid JSON from stdin", JSONRPC_PARSE_ERROR, line) + await queue.put(obj) -async def get_resources_from_mcp_server(catalog_urls: List[str]) -> List[str]: - """ - Retrieve associated resource IDs from the MCP gateway server catalogs. - Extracts server IDs from catalog URLs and fetches all resource IDs - that are associated with those servers from the gateway. +# ----------------------- +# Stream Parsers +# ----------------------- +async def ndjson_lines(resp: httpx.Response) -> AsyncIterator[str]: + """Parse newline-delimited JSON (NDJSON) from an HTTP response. Args: - catalog_urls (List[str]): List of catalog endpoint URLs. + resp: httpx.Response with NDJSON content. - Returns: - List[str]: A list of resource ID strings. + Yields: + str: Individual JSON lines as strings. Examples: - Basic resource retrieval: - - >>> import asyncio - >>> # Mock example - would require real server - >>> async def example(): - ... urls = ["https://gateway.example.com/servers/server-123"] - ... # Would return resource IDs like: - ... return ["config.json", "readme.md", "schema.sql"] - >>> asyncio.run(example()) - ['config.json', 'readme.md', 'schema.sql'] - - No resources found: - - >>> import asyncio - >>> async def no_resources(): - ... return [] # Server has no resources - >>> asyncio.run(no_resources()) - [] + >>> # This function is a parser for network streams; doctest uses patterns only. + >>> True + True """ - server_ids = [url.split("/")[-1] for url in catalog_urls] - url = f"{BASE_URL}/servers/" - response = await fetch_url(url) - catalog = response.json() - resource_ids: List[str] = [] - for entry in catalog: - if str(entry.get("id")) in server_ids: - resource_ids.extend(entry.get("associatedResources", [])) - return resource_ids - - -async def resources_metadata(resource_ids: List[str]) -> List[Dict[str, Any]]: - """ - Fetch metadata for a list of MCP resources by their IDs. - - Retrieves detailed metadata including URI, name, description, and MIME type - for each specified resource from the gateway's resources endpoint. + decoder = codecs.getincrementaldecoder("utf-8")() + buffer = "" + async for chunk in resp.aiter_bytes(): + if shutting_down(): + break + if not chunk: + continue + text = decoder.decode(chunk) + buffer += text + while True: + nl_idx = buffer.find("\n") + if nl_idx == -1: + break + line = buffer[:nl_idx] + buffer = buffer[nl_idx + 1 :] + if line.strip(): + yield line.strip() + tail = decoder.decode(b"", final=True) + buffer += tail + if buffer.strip(): + yield buffer.strip() + + +async def sse_events(resp: httpx.Response) -> AsyncIterator[str]: + """Parse Server-Sent Events (SSE) from an HTTP response. Args: - resource_ids (List[str]): List of resource ID strings. + resp: httpx.Response with SSE content. - Returns: - List[Dict[str, Any]]: A list of metadata dictionaries for each resource. - - Examples: - Fetching specific resources: - - >>> import asyncio - >>> # Mock example - would require real server - >>> async def example(): - ... resource_ids = ["config", "readme"] - ... # Would return metadata like: - ... return [ - ... {"id": "config", "uri": "file://config.json", "name": "Config", "mimeType": "application/json"}, - ... {"id": "readme", "uri": "file://readme.md", "name": "README", "mimeType": "text/markdown"} - ... ] - >>> result = asyncio.run(example()) - >>> len(result) - 2 - - Empty resource list: - - >>> import asyncio - >>> async def no_resources(): - ... return [] - >>> asyncio.run(no_resources()) - [] - - All resources with special ID: - - >>> # resource_ids = ["0"] returns all available resources - >>> # This is handled by the conditional in the function + Yields: + str: Event payload data lines. """ - if not resource_ids: - return [] - url = f"{BASE_URL}/resources/" - response = await fetch_url(url) - data: List[Dict[str, Any]] = response.json() - if resource_ids == ["0"]: - return data - return [res for res in data if str(res.get("id")) in resource_ids] - - -# ----------------------------------------------------------------------------- -# Server Handlers -# ----------------------------------------------------------------------------- -server: Server = Server("mcpgateway-wrapper") - + decoder = codecs.getincrementaldecoder("utf-8")() + buffer = "" + event_lines: List[str] = [] + async for chunk in resp.aiter_bytes(): + if shutting_down(): + break + if not chunk: + continue + text = decoder.decode(chunk) + buffer += text + while True: + nl_idx = buffer.find("\n") + if nl_idx == -1: + break + raw_line = buffer[:nl_idx] + buffer = buffer[nl_idx + 1 :] + line = raw_line.rstrip("\r") + if line == "": + if event_lines: + yield "\n".join(event_lines) + event_lines = [] + continue + if line.startswith(":"): + continue + if ":" in line: + field, value = line.split(":", 1) + value = value.lstrip(" ") + else: + field, value = line, "" + if field == "data": + event_lines.append(value) + tail = decoder.decode(b"", final=True) + buffer += tail + for line in buffer.splitlines(): + line = line.rstrip("\r") + if line == "": + if event_lines: + yield "\n".join(event_lines) + event_lines = [] + continue + if line.startswith(":"): + continue + if ":" in line: + field, value = line.split(":", 1) + value = value.lstrip(" ") + else: + field, value = line, "" + if field == "data": + event_lines.append(value) + if event_lines: + yield "\n".join(event_lines) + + +# ----------------------- +# Core HTTP forwarder +# ----------------------- +async def forward_once( + client: ResilientHttpClient, + settings: "Settings", + payload: Union[str, Dict[str, Any], List[Any]], +) -> None: + """Forward a single JSON-RPC payload to the MCP gateway and stream responses. + + The function: + - Sets content negotiation headers (JSON, NDJSON, SSE) + - Adds Authorization header when configured + - Streams the gateway response and forwards every JSON object to stdout + (supports application/json, application/x-ndjson, and text/event-stream) -@server.list_tools() -async def handle_list_tools() -> List[types.Tool]: + Args: + client: Resilient HTTP client used to make the request. + settings: Bridge configuration (URL, auth, timeouts). + payload: JSON-RPC request payload as str/dict/list. """ - List all available MCP tools exposed by the gateway. + if shutting_down(): + return - Queries the configured server catalogs to retrieve tool IDs and then - fetches metadata for each tool to construct a list of Tool objects. + headers = { + "Content-Type": "application/json; charset=utf-8", + "Accept": "application/json, application/x-ndjson, text/event-stream", + } + if settings.auth_header: + headers["Authorization"] = settings.auth_header - Returns: - List[types.Tool]: A list of Tool instances including name, description, and input schema. + body: str = payload if isinstance(payload, str) else json.dumps(payload, ensure_ascii=False) - Raises: - RuntimeError: If an error occurs during fetching or processing. + async with client.stream("POST", settings.server_url, data=body.encode("utf-8"), headers=headers) as resp: + ctype = (resp.headers.get("Content-Type") or "").lower() + status = resp.status_code + logger.debug("HTTP %s %s", status, ctype) - Examples: - Successful tool listing: + if shutting_down(): + return - >>> import asyncio - >>> # Mock example - would require real server and MCP setup - >>> async def example(): - ... # Would return Tool objects like: - ... from mcp import types - ... return [ - ... types.Tool(name="get_time", description="Get current time", inputSchema={}), - ... types.Tool(name="calculate", description="Perform calculation", inputSchema={}) - ... ] - >>> # result = asyncio.run(example()) - >>> # len(result) would be 2 - - Error handling: - - >>> # If gateway is unreachable, RuntimeError is raised - >>> # try: tools = await handle_list_tools() - >>> # except RuntimeError as e: print(f"Error: {e}") - """ - try: - tool_ids = ["0"] if SERVER_CATALOG_URLS[0] == BASE_URL else await get_tools_from_mcp_server(SERVER_CATALOG_URLS) - metadata = await tools_metadata(tool_ids) - tools = [] - for tool in metadata: - tool_name = tool.get("name") - if tool_name: # Only include tools with valid names - tools.append( - types.Tool( - name=str(tool_name), - description=tool.get("description", ""), - inputSchema=tool.get("inputSchema", {}), - annotations=tool.get("annotations", {}), - ) - ) - return tools - except Exception as exc: - logger.exception("Error listing tools") - raise RuntimeError(f"Error listing tools: {exc}") - - -@server.call_tool() -async def handle_call_tool(name: str, arguments: Optional[Dict[str, Any]] = None) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: - """ - Invoke a named MCP tool via the gateway's RPC endpoint. + if status < 200 or status >= 300: + send_to_stdout(make_error(f"HTTP {status}", code=status)) + return - Sends a JSON-RPC request to the gateway to execute the specified tool - with the provided arguments and returns the result as content objects. + async def _process_line(line: str): + if shutting_down(): + return + try: + obj = json.loads(line) + send_to_stdout(obj) + except Exception: + logger.warning("Invalid JSON from server: %s", line) + send_to_stdout(make_error("Invalid JSON from server", JSONRPC_PARSE_ERROR, line)) + + if "event-stream" in ctype: + async for data_payload in sse_events(resp): + if shutting_down(): + break + if not data_payload: + continue + await _process_line(data_payload) + return + + if "x-ndjson" in ctype or "ndjson" in ctype: + async for line in ndjson_lines(resp): + if shutting_down(): + break + await _process_line(line) + return + + if "application/json" in ctype: + raw = await resp.aread() + if not shutting_down(): + text = raw.decode("utf-8", errors="replace") + try: + send_to_stdout(json.loads(text)) + except Exception: + send_to_stdout(make_error("Invalid JSON response", JSONRPC_PARSE_ERROR, text)) + return + + async for line in ndjson_lines(resp): + if shutting_down(): + break + await _process_line(line) + + +async def make_request( + client: ResilientHttpClient, + settings: "Settings", + payload: Union[str, Dict[str, Any], List[Any]], + *, + max_retries: int = 5, + base_delay: float = 0.25, +) -> None: + """Make a gateway request with retry/backoff around a single forward attempt. Args: - name (str): The name of the tool to invoke. - arguments (Optional[Dict[str, Any]]): The arguments to pass to the tool method. - - Returns: - List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: - A list of content objects returned by the tool. + client: Resilient HTTP client used to make the request. + settings: Bridge configuration (URL, auth, timeouts). + payload: JSON-RPC request payload as str/dict/list. + max_retries: Maximum retry attempts upon exceptions (default 5). + base_delay: Base delay in seconds for exponential backoff (default 0.25). + """ + attempt = 0 + while not shutting_down(): + try: + await forward_once(client, settings, payload) + return + except Exception as e: + if shutting_down(): + return + logger.warning("Network or unexpected error in forward_once: %s", e) + attempt += 1 + if attempt > max_retries: + send_to_stdout(make_error("max retries exceeded", JSONRPC_SERVER_ERROR)) + return + delay = min(base_delay * (2 ** (attempt - 1)), 8.0) + await asyncio.sleep(delay) + + +# ----------------------- +# Main loop & CLI +# ----------------------- +@dataclass +class Settings: + """Bridge configuration settings. - Raises: - ValueError: If tool call fails. - RuntimeError: If the HTTP request fails or returns an error. + Args: + server_url: MCP server URL + auth_header: Authorization header (optional) + connect_timeout: HTTP connect timeout in seconds + response_timeout: Max response wait in seconds + concurrency: Max concurrent tool calls + log_level: Logging verbosity Examples: - Successful tool call: - - >>> import asyncio - >>> # Mock example - would require real server - >>> async def example(): - ... # Calling a time tool - ... from mcp import types - ... result = [types.TextContent(type="text", text="2024-01-15 10:30:00 UTC")] - ... return result - >>> # result = asyncio.run(example()) - >>> # result[0].text would contain the timestamp - - Tool call with arguments: - - >>> import asyncio - >>> async def calc_example(): - ... # Calling calculator tool with arguments - ... from mcp import types - ... result = [types.TextContent(type="text", text="42")] - ... return result - >>> # result = await handle_call_tool("add", {"a": 20, "b": 22}) - - Error handling: - - >>> # Tool returns error - >>> # try: await handle_call_tool("invalid_tool") - >>> # except ValueError as e: print(f"Tool error: {e}") - >>> # except RuntimeError as e: print(f"Network error: {e}") + >>> s = Settings("http://x/mcp", "Bearer token", 5, 10, 2, "DEBUG") + >>> s.server_url + 'http://x/mcp' + >>> s.concurrency + 2 """ - if arguments is None: - arguments = {} - - logger.info(f"Calling tool {name} with args {arguments}") - payload = {"jsonrpc": "2.0", "id": 2, "method": name, "params": arguments} - headers = {"Authorization": f"Bearer {AUTH_TOKEN}"} if AUTH_TOKEN else {} - - try: - async with ResilientHttpClient(client_args={"timeout": TOOL_CALL_TIMEOUT}) as client: - resp = await client.post(f"{BASE_URL}/rpc/", json=payload, headers=headers) - resp.raise_for_status() - result = resp.json() - - if "error" in result: - error_msg = result["error"].get("message", "Unknown error") - raise ValueError(f"Tool call failed: {error_msg}") - - tool_result = result.get("result", result) - return [types.TextContent(type="text", text=str(tool_result))] - except httpx.TimeoutException as exc: - logger.error(f"Timeout calling tool {name}: {exc}") - raise RuntimeError(f"Tool call timeout: {exc}") - except Exception as exc: - logger.exception(f"Error calling tool {name}") - raise RuntimeError(f"Error calling tool: {exc}") + server_url: str + auth_header: Optional[str] + connect_timeout: float = DEFAULT_CONNECT_TIMEOUT + response_timeout: float = DEFAULT_RESPONSE_TIMEOUT + concurrency: int = DEFAULT_CONCURRENCY + log_level: Optional[str] = None -@server.list_resources() -async def handle_list_resources() -> List[types.Resource]: - """ - List all available MCP resources exposed by the gateway. - - Fetches resource IDs from the configured catalogs and retrieves - metadata to construct Resource instances. +async def main_async(settings: Settings) -> None: + """Main async loop: reads stdin JSON lines and forwards them to the gateway. - Returns: - List[types.Resource]: A list of Resource objects including URI, name, description, and MIME type. + - Spawns a reader task that pushes parsed lines to a queue. + - Uses a semaphore to cap concurrent requests. + - Creates tasks to forward each queued payload. + - Gracefully shuts down on EOF or signals. - Raises: - RuntimeError: If an error occurs during fetching or processing. + Args: + settings: Bridge configuration settings. Examples: - Successful resource listing: - - >>> import asyncio - >>> # Mock example - would require real server - >>> async def example(): - ... from mcp import types - ... from pydantic import AnyUrl - ... return [ - ... types.Resource( - ... uri=AnyUrl("file://config.json"), - ... name="Configuration", - ... description="App config file", - ... mimeType="application/json" - ... ) - ... ] - >>> # result = asyncio.run(example()) - >>> # result[0].name would be "Configuration" - - Empty resource list: - - >>> import asyncio - >>> async def no_resources(): - ... return [] # No resources available - >>> asyncio.run(no_resources()) - [] - - Invalid URI handling: - - >>> # Resources with invalid URIs are skipped with warnings - >>> # The function filters out resources missing required fields - """ - try: - ids = ["0"] if SERVER_CATALOG_URLS[0] == BASE_URL else await get_resources_from_mcp_server(SERVER_CATALOG_URLS) - meta = await resources_metadata(ids) - resources = [] - for r in meta: - uri = r.get("uri") - if not uri: - logger.warning(f"Resource missing URI, skipping: {r}") - continue - try: - resources.append( - types.Resource( - uri=AnyUrl(uri), - name=r.get("name", ""), - description=r.get("description", ""), - mimeType=r.get("mimeType", "text/plain"), - ) - ) - except Exception as e: - logger.warning(f"Invalid resource URI {uri}: {e}") - continue - return resources - except Exception as exc: - logger.exception("Error listing resources") - raise RuntimeError(f"Error listing resources: {exc}") - - -@server.read_resource() -async def handle_read_resource(uri: AnyUrl) -> str: + >>> # Smoke-test structure only; no network or stdin in doctest. + >>> settings = Settings("http://x/mcp", None) + >>> async def _run_once(): + ... q = asyncio.Queue() + ... # Immediately signal shutdown by marking the queue end: + ... await q.put(None) + ... _mark_shutdown() + ... # Minimal run: create then cancel tasks cleanly. + ... await asyncio.sleep(0) + >>> # Note: We avoid running main_async here due to stdin/network. + >>> True + True """ - Read and return the content of a resource by its URI. - - Fetches the resource content from the specified URI using the gateway's - HTTP interface and returns the response body as text. + queue: "asyncio.Queue[Union[dict, list, str, None]]" = asyncio.Queue() + reader_task = asyncio.create_task(stdin_reader(queue)) - Args: - uri (AnyUrl): The URI of the resource to read. - - Returns: - str: The body text of the fetched resource. + sem = asyncio.Semaphore(settings.concurrency) - Raises: - ValueError: If the resource cannot be fetched. + httpx_timeout = httpx.Timeout( + connect=settings.connect_timeout, + read=settings.response_timeout, + write=settings.response_timeout, + pool=settings.response_timeout, + ) - Examples: - Reading a text resource: + client_args = {"timeout": httpx_timeout, "http2": True} + resilient = ResilientHttpClient( + max_retries=5, + base_backoff=0.25, + max_delay=8.0, + jitter_max=0.25, + client_args=client_args, + ) - >>> import asyncio - >>> from pydantic import AnyUrl - >>> # Mock example - would require real server - >>> async def example(): - ... # Would return file content - ... return "{\\"version\\": \\"1.0\\", \\"name\\": \\"myapp\\"}" - >>> # content = await handle_read_resource(AnyUrl("file://config.json")) - >>> asyncio.run(example()) - '{"version": "1.0", "name": "myapp"}' - - Error handling: - - >>> # Invalid or unreachable URI - >>> # try: content = await handle_read_resource(AnyUrl("file://missing.txt")) - >>> # except ValueError as e: print(f"Read error: {e}") - """ + tasks: set[asyncio.Task[None]] = set() try: - response = await fetch_url(str(uri)) - return response.text - except Exception as exc: - logger.exception(f"Error reading resource {uri}") - raise ValueError(f"Failed to read resource at {uri}: {exc}") + while not shutting_down(): + item = await queue.get() + if item is None: + break + + async def _worker(payload=item): + async with sem: + if not shutting_down(): + await make_request(resilient, settings, payload) + + t = asyncio.create_task(_worker()) + tasks.add(t) + t.add_done_callback(lambda fut, s=tasks: s.discard(fut)) + + _mark_shutdown() + for t in list(tasks): + t.cancel() + if tasks: + with suppress(asyncio.CancelledError): + await asyncio.gather(*tasks) + finally: + reader_task.cancel() + with suppress(Exception): + await reader_task + with suppress(Exception): + await resilient.aclose() -@server.list_prompts() -async def handle_list_prompts() -> List[types.Prompt]: - """ - List all available MCP prompts exposed by the gateway. +def parse_args() -> Settings: + """Parse CLI arguments and environment variables into Settings. - Retrieves prompt IDs from the catalogs and fetches metadata - to create Prompt instances. + Recognized flags: + --url / MCP_SERVER_URL + --auth / MCP_AUTH + --timeout / MCP_TOOL_CALL_TIMEOUT + --log-level / MCP_WRAPPER_LOG_LEVEL Returns: - List[types.Prompt]: A list of Prompt objects including name, description, and arguments. - - Raises: - RuntimeError: If an error occurs during fetching or processing. + Settings: Parsed and normalized configuration. Examples: - Successful prompt listing: - - >>> import asyncio - >>> # Mock example - would require real server - >>> async def example(): - ... from mcp import types - ... return [ - ... types.Prompt( - ... name="greeting", - ... description="Welcome message template", - ... arguments=[{"name": "username", "description": "User's name"}] - ... ), - ... types.Prompt( - ... name="error_msg", - ... description="Error message template", - ... arguments=[{"name": "error_code", "description": "Error code"}] - ... ) - ... ] - >>> result = asyncio.run(example()) - >>> len(result) - 2 - - Empty prompt list: - - >>> import asyncio - >>> async def no_prompts(): - ... return [] - >>> asyncio.run(no_prompts()) - [] - """ - try: - ids = ["0"] if SERVER_CATALOG_URLS[0] == BASE_URL else await get_prompts_from_mcp_server(SERVER_CATALOG_URLS) - meta = await prompts_metadata(ids) - prompts = [] - for p in meta: - prompt_name = p.get("name") - if prompt_name: # Only include prompts with valid names - prompts.append( - types.Prompt( - name=str(prompt_name), - description=p.get("description", ""), - arguments=p.get("arguments", []), - ) - ) - return prompts - except Exception as exc: - logger.exception("Error listing prompts") - raise RuntimeError(f"Error listing prompts: {exc}") - - -@server.get_prompt() -async def handle_get_prompt(name: str, arguments: Optional[Dict[str, str]] = None) -> types.GetPromptResult: + >>> import sys, os + >>> _argv = sys.argv + >>> sys.argv = ["prog", "--url", "http://localhost:4444/servers/u"] + >>> try: + ... s = parse_args() + ... s.server_url.endswith("/mcp") + ... finally: + ... sys.argv = _argv + True """ - Retrieve and format a single prompt template with provided arguments. - - Fetches the prompt template from the gateway and formats it using the - provided arguments, returning a structured prompt result. + parser = argparse.ArgumentParser(description="Stdio MCP Client <-> MCP HTTP Bridge") + parser.add_argument("--url", default=os.environ.get("MCP_SERVER_URL"), help="MCP server URL (env: MCP_SERVER_URL)") + parser.add_argument("--auth", default=os.environ.get("MCP_AUTH"), help="Authorization header value (env: MCP_AUTH)") + parser.add_argument("--timeout", default=os.environ.get("MCP_TOOL_CALL_TIMEOUT"), help="Response timeout in seconds") + parser.add_argument( + "--log-level", + default=os.environ.get("MCP_WRAPPER_LOG_LEVEL", "INFO"), + help="Enable logging at this level (case-insensitive, default: disabled)", + ) + args = parser.parse_args() + + if not args.url: + print("Error: MCP server URL must be provided via --url or MCP_SERVER_URL", file=sys.stderr) + sys.exit(2) + + server_url = convert_url(args.url) + response_timeout = float(args.timeout) if args.timeout else DEFAULT_RESPONSE_TIMEOUT + + return Settings( + server_url=server_url, + auth_header=args.auth, + connect_timeout=DEFAULT_CONNECT_TIMEOUT, + response_timeout=response_timeout, + log_level=args.log_level, + concurrency=DEFAULT_CONCURRENCY, + ) - Args: - name (str): The unique name of the prompt to fetch. - arguments (Optional[Dict[str, str]]): A mapping of placeholder names to replacement values. - Returns: - types.GetPromptResult: Contains description and list of formatted PromptMessage instances. +def _install_signal_handlers(loop: asyncio.AbstractEventLoop) -> None: + """Install SIGINT/SIGTERM handlers that trigger graceful shutdown. - Raises: - ValueError: If fetching or formatting fails. + Args: + loop: The asyncio event loop to attach handlers to. Examples: - Basic prompt retrieval: - >>> import asyncio - >>> # Mock example - would require real server - >>> async def example(): - ... from mcp import types - ... return types.GetPromptResult( - ... description="Welcome message", - ... messages=[ - ... types.PromptMessage( - ... role="user", - ... content=types.TextContent(type="text", text="Hello Alice!") - ... ) - ... ] - ... ) - >>> # result = await handle_get_prompt("greeting", {"username": "Alice"}) - - Template formatting: - - >>> # Prompt template: "Hello {username}!" - >>> # Arguments: {"username": "Bob"} - >>> # Result: "Hello Bob!" - - Missing argument error: - - >>> # Template requires {username} but no arguments provided - >>> # try: await handle_get_prompt("greeting") - >>> # except ValueError as e: print(f"Missing placeholder: {e}") - - Prompt not found: - - >>> # try: await handle_get_prompt("nonexistent") - >>> # except ValueError as e: print(f"Prompt error: {e}") + >>> loop = asyncio.new_event_loop() + >>> _install_signal_handlers(loop) # doctest: +ELLIPSIS + >>> loop.close() """ - try: - url = f"{BASE_URL}/prompts/{name}" - response = await fetch_url(url) - prompt_data = response.json() + for sig in (getattr(signal, "SIGINT", None), getattr(signal, "SIGTERM", None)): + if sig is None: + continue + with suppress(NotImplementedError): + loop.add_signal_handler(sig, _mark_shutdown) - template = prompt_data.get("template", "") - try: - formatted = template.format(**(arguments or {})) - except KeyError as exc: - raise ValueError(f"Missing placeholder in arguments: {exc}") - except Exception as exc: - raise ValueError(f"Error formatting prompt: {exc}") - - return types.GetPromptResult( - description=prompt_data.get("description", ""), - messages=[ - types.PromptMessage( - role="user", - content=types.TextContent(type="text", text=formatted), - ) - ], - ) - except ValueError: - raise - except Exception as exc: - logger.exception(f"Error getting prompt {name}") - raise ValueError(f"Failed to fetch prompt '{name}': {exc}") - - -async def main() -> None: - """ - Main entry point to start the MCP stdio server. - - Initializes the server over standard IO, registers capabilities, - and begins listening for JSON-RPC messages. This function handles - the complete server lifecycle including graceful shutdown. - This function should only be called in a script context. +def main() -> None: + """Entry point for the MCP stdio wrapper. - Raises: - RuntimeError: If the server fails to start. - - Examples: - Starting the server: + - Parses args/env vars into Settings + - Configures logging + - Runs the async main loop with signal handling - >>> import asyncio - >>> # In a real script context: - >>> # if __name__ == "__main__": - >>> # asyncio.run(main()) - - Server initialization: - - >>> # Server starts with stdio transport - >>> # Registers MCP capabilities for tools, prompts, resources - >>> # Begins processing JSON-RPC messages from stdin - >>> # Sends responses to stdout + Args: + None + """ + settings = parse_args() + setup_logging(settings.log_level) + if not logger.disabled: + logger.info("Starting MCP stdio wrapper -> %s", settings.server_url) - Error handling: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + _install_signal_handlers(loop) - >>> # Server startup failures raise RuntimeError - >>> # Keyboard interrupts are handled gracefully - >>> # All errors are logged appropriately - """ try: - async with mcp.server.stdio.stdio_server() as (reader, writer): - await server.run( - reader, - writer, - InitializationOptions( - server_name="mcpgateway-wrapper", - server_version=__version__, - capabilities=server.get_capabilities(notification_options=NotificationOptions(), experimental_capabilities={}), - ), - ) - except Exception as exc: - logger.exception("Server failed to start") - raise RuntimeError(f"Server startup failed: {exc}") + loop.run_until_complete(main_async(settings)) + finally: + loop.run_until_complete(asyncio.sleep(0)) + with suppress(Exception): + loop.close() + if not logger.disabled: + logger.info("Shutdown complete.") if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - logger.info("Server interrupted by user") - except Exception: - logger.exception("Server failed") - sys.exit(1) - finally: - logger.info("Wrapper shutdown complete") + main() diff --git a/pyproject.toml b/pyproject.toml index 27a1ed4f7..b3d0c206a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ dependencies = [ "filelock>=3.19.1", "gunicorn>=23.0.0", "httpx>=0.28.1", + "httpx[http2]>=0.28.1", "jinja2>=3.1.6", "jq>=1.10.0", "jsonpath-ng>=1.7.0", diff --git a/tests/playwright/README.md b/tests/playwright/README.md index 3559aeee1..f740c2e8f 100644 --- a/tests/playwright/README.md +++ b/tests/playwright/README.md @@ -130,7 +130,7 @@ def test_with_page_object(page: Page, base_url: str): export TEST_BASE_URL=http://localhost:8000 # Authentication token -export MCP_AUTH_TOKEN=your-test-token +export MCP_AUTH=your-test-token # Playwright options export PWDEBUG=1 # Enable Playwright Inspector diff --git a/tests/playwright/conftest.py b/tests/playwright/conftest.py index 1ed04ed1d..c3ead5d2b 100644 --- a/tests/playwright/conftest.py +++ b/tests/playwright/conftest.py @@ -16,7 +16,7 @@ # Get configuration from environment BASE_URL = os.getenv("TEST_BASE_URL", "http://localhost:8000") -API_TOKEN = os.getenv("MCP_AUTH_TOKEN", "test-token") +API_TOKEN = os.getenv("MCP_AUTH", "test-token") # Basic Auth credentials - these MUST be set in environment BASIC_AUTH_USER = os.getenv("BASIC_AUTH_USER", "admin") diff --git a/tests/unit/mcpgateway/test_wrapper.py b/tests/unit/mcpgateway/test_wrapper.py index fbde0028d..dea7db196 100644 --- a/tests/unit/mcpgateway/test_wrapper.py +++ b/tests/unit/mcpgateway/test_wrapper.py @@ -10,551 +10,115 @@ *mcpgateway.wrapper*. """ -# Future -from __future__ import annotations - -# Standard import asyncio -import importlib +import json import sys -from types import ModuleType -from typing import Any, Dict, List - -# Third-Party +import types import pytest -# ───────────────────────────────────────────────────────────────────────────── -# Fake "mcp" package hierarchy -# ───────────────────────────────────────────────────────────────────────────── - - -def _install_fake_mcp(monkeypatch) -> None: - """Install a dummy *mcp* package into *sys.modules* (idempotent).""" - # wipe any existing real package - for name in list(sys.modules): - if name == "mcp" or name.startswith("mcp."): - sys.modules.pop(name) - - if "mcp" in sys.modules: # already stubbed - return - - mcp = ModuleType("mcp") - server_mod = ModuleType("mcp.server") - stdio_mod = ModuleType("mcp.server.stdio") - models_mod = ModuleType("mcp.server.models") - types_mod = ModuleType("mcp.types") - client_mod = ModuleType("mcp.client") - sse_mod = ModuleType("mcp.client.sse") - streamable_http_mod = ModuleType("mcp.client.streamable_http") - - # Add missing ClientSession class that gateway_service.py needs - class _FakeClientSession: - def __init__(self, *args, **kwargs): - pass - - mcp.ClientSession = _FakeClientSession - - # Add missing sse_client function that gateway_service.py needs - def _fake_sse_client(*args, **kwargs): - pass - - def _fake_streamablehttp_client(*args, **kwargs): - pass - - sse_mod.sse_client = _fake_sse_client - streamable_http_mod.streamablehttp_client = _fake_streamablehttp_client - client_mod.sse = sse_mod - client_mod.streamable_http = streamable_http_mod - mcp.client = client_mod - - # --- minimalist Server faΓ§ade ---------------------------------------- # - class _FakeServer: - was_run: bool = False # class-level flag - - def __init__(self, name: str): - self.name = name - - # decorator helpers just hand the coroutine straight back - def list_tools(self): - return lambda fn: fn - - def call_tool(self): - return lambda fn: fn - - def list_resources(self): - return lambda fn: fn - - def read_resource(self): - return lambda fn: fn - - def list_prompts(self): - return lambda fn: fn - - def get_prompt(self): - return lambda fn: fn - - def get_capabilities(self, **_): # used by wrapper - return {} - - async def run(self, *_a, **_kw): # invoked by main() - _FakeServer.was_run = True - - server_mod.Server = _FakeServer - server_mod.NotificationOptions = type("NotificationOptions", (), {}) - server_mod.stdio = stdio_mod - server_mod.models = models_mod - mcp.server = server_mod # type: ignore[attr-defined] - - # --- stdio helper used by wrapper.main() ------------------------------ # - class _DummyStdIOServer: - async def __aenter__(self): - return "reader", "writer" - - async def __aexit__(self, *_): - return False - - stdio_mod.stdio_server = lambda: _DummyStdIOServer() # type: ignore[attr-defined] - - # --- pydantic placeholder that *accepts* any kwargs ------------------- # - class _InitOpts: - def __init__(self, *_, **__): # swallow anything - pass - - models_mod.InitializationOptions = _InitOpts - - # --- ultra-thin DTOs referenced by wrapper's handlers ---------------- # - class _Tool: - def __init__(self, name: str, description: str, inputSchema: dict, annotations: dict = None): - self.name, self.description, self.inputSchema = name, description, inputSchema - self.annotations = annotations or {} - - class _Resource: - def __init__(self, uri: str, name: str, description: str, mimeType: str): - self.uri, self.name, self.description, self.mimeType = uri, name, description, mimeType - - class _Prompt: - def __init__(self, name: str, description: str, arguments: list): - self.name, self.description, self.arguments = name, description, arguments - - class _TextContent: - def __init__(self, type: str, text: str): - self.type, self.text = type, text - - class _PromptMessage: - def __init__(self, role: str, content: Any): - self.role, self.content = role, content - - class _GetPromptResult: - def __init__(self, description: str, messages: list): - self.description, self.messages = description, messages - - types_mod.Tool = _Tool - types_mod.Resource = _Resource - types_mod.Prompt = _Prompt - types_mod.TextContent = _TextContent - types_mod.ImageContent = object - types_mod.EmbeddedResource = object - types_mod.PromptMessage = _PromptMessage - types_mod.GetPromptResult = _GetPromptResult - mcp.types = types_mod # type: ignore[attr-defined] - - # register the whole fake tree - sys.modules.update( - { - "mcp": mcp, - "mcp.server": server_mod, - "mcp.server.stdio": stdio_mod, - "mcp.server.models": models_mod, - "mcp.types": types_mod, - "mcp.client": client_mod, - "mcp.client.sse": sse_mod, - "mcp.client.streamable_http": streamable_http_mod, - } - ) - monkeypatch.syspath_prepend(".") - - -# ───────────────────────────────────────────────────────────────────────────── -# Pytest plumbing -# ───────────────────────────────────────────────────────────────────────────── - - -@pytest.fixture(scope="session") -def event_loop(): - loop = asyncio.new_event_loop() - yield loop - loop.close() +import mcpgateway.wrapper as wrapper -@pytest.fixture() -def wrapper(monkeypatch): - """(Re)imports *mcpgateway.wrapper* with the fake MCP stack in place.""" - _install_fake_mcp(monkeypatch) - monkeypatch.setenv("MCP_SERVER_CATALOG_URLS", "https://host.com/servers/1") - sys.modules.pop("mcpgateway.wrapper", None) # ensure fresh import - return importlib.import_module("mcpgateway.wrapper") +# ------------------- +# Utilities +# ------------------- +def test_convert_url_variants(): + assert wrapper.convert_url("http://x/servers/uuid") == "http://x/servers/uuid/mcp" + assert wrapper.convert_url("http://x/servers/uuid/") == "http://x/servers/uuid//mcp" + assert wrapper.convert_url("http://x/servers/uuid/mcp") == "http://x/servers/uuid/mcp" + assert wrapper.convert_url("http://x/servers/uuid/sse") == "http://x/servers/uuid/mcp" -# ───────────────────────────────────────────────────────────────────────────── -# Tiny *httpx* doubles -# ───────────────────────────────────────────────────────────────────────────── +def test_make_error_defaults_and_data(): + err = wrapper.make_error("oops") + assert err["error"]["message"] == "oops" + assert err["error"]["code"] == wrapper.JSONRPC_INTERNAL_ERROR + err2 = wrapper.make_error("bad", code=-32099, data={"x": 1}) + assert err2["error"]["data"] == {"x": 1} + assert err2["error"]["code"] == -32099 -class _Resp: - """Bare-bones httpx.Response-like test double.""" +def test_setup_logging_on_and_off(caplog): + wrapper.setup_logging("DEBUG") + assert wrapper.logger.disabled is False + wrapper.logger.debug("hello debug") + wrapper.setup_logging("OFF") + assert wrapper.logger.disabled is True - def __init__(self, *, json_data=None, text="OK", status: int = 200): - self._json = json_data - self.text = text - self.status_code = status - # minimal surface used by wrapper - def json(self): - return self._json +def test_shutting_down_and_mark_shutdown(): + # Reset first + wrapper._shutdown.clear() + assert not wrapper.shutting_down() + wrapper._mark_shutdown() + assert wrapper.shutting_down() + # Reset again for further tests + wrapper._shutdown.clear() + assert not wrapper.shutting_down() - def raise_for_status(self): - if self.status_code >= 400: - # Third-Party - import httpx - req = httpx.Request("GET", "x") - raise httpx.HTTPStatusError("err", request=req, response=httpx.Response(self.status_code, request=req)) +def test_send_to_stdout_json_and_str(monkeypatch): + captured = [] + def fake_write(s): + captured.append(s) + return len(s) -# ───────────────────────────────────────────────────────────────────────────── -# Helper for POST-based tool-call tests -# ───────────────────────────────────────────────────────────────────────────── + def fake_flush(): + return None + monkeypatch.setattr(sys.stdout, "write", fake_write) + monkeypatch.setattr(sys.stdout, "flush", fake_flush) -def _patch_client(monkeypatch, wrapper, *, json=None, exc=None): - class _Client: - def __init__(self, *_, **__): - pass - - async def __aenter__(self): - return self - - async def __aexit__(self, *_): - return False - - async def post(self, *_a, **_k): - if exc: - raise exc - return _Resp(json_data=json) - - monkeypatch.setattr(wrapper, "ResilientHttpClient", _Client) - - -# ───────────────────────────────────────────────────────────────────────────── -# Extra helper for fetch-json stubs used by metadata tests -# ───────────────────────────────────────────────────────────────────────────── - - -def _json_fetcher(payload: Any): - async def _fake(_url: str): - return _Resp(json_data=payload) - - return _fake - - -# ───────────────────────────────────────────────────────────────────────────── -# Unit tests -# ───────────────────────────────────────────────────────────────────────────── - -# --- _extract_base_url happy-path parametrised ---------------------------- # - - -@pytest.mark.parametrize( - ("raw", "expected"), - [ - ("https://x.com/s/1", "https://x.com/s/1"), # path preserved - ("https://x.com/gw/servers/99", "https://x.com/gw"), - ("https://x.com/gw/servers", "https://x.com/gw"), - ("https://x.com/gw", "https://x.com/gw"), - ], -) -def test_extract_base_url(raw, expected, wrapper): - assert wrapper._extract_base_url(raw) == expected - - -# --- _extract_base_url error branch ---------------------------------------- # - - -def test_extract_base_url_invalid(wrapper): - with pytest.raises(ValueError): - wrapper._extract_base_url("just-text-no-scheme") - - -# --- fetch_url success / error paths --------------------------------------- # + wrapper.send_to_stdout({"a": 1}) + wrapper.send_to_stdout("plain text") + assert any('"a": 1' in s for s in captured) + assert any("plain text" in s for s in captured) +# ------------------- +# Async stream parsers +# ------------------- @pytest.mark.asyncio -async def test_fetch_url_ok(monkeypatch, wrapper): - class _Client: - def __init__(self, *_, **__): - pass - - async def __aenter__(self): - return self - - async def __aexit__(self, *_): - return False - - async def get(self, url, **_): - _Client.url = url # Track the URL for verification - return _Resp(json_data={"ok": 1}) # Simulate a successful response - - # Monkeypatch ResilientHttpClient with our mock - monkeypatch.setattr(wrapper, "ResilientHttpClient", _Client) - - # Run the fetch_url method - r = await wrapper.fetch_url("u") - - # Verify the response and the URL used - assert r.json() == {"ok": 1} - assert _Client.url == "u" +async def test_ndjson_lines_basic(): + async def fake_iter_bytes(): + yield b'{"a":1}\n{"b":2}\n' + resp = types.SimpleNamespace(aiter_bytes=fake_iter_bytes) + lines = [l async for l in wrapper.ndjson_lines(resp)] + assert lines == ['{"a":1}', '{"b":2}'] @pytest.mark.asyncio -async def test_fetch_url_request_error(monkeypatch, wrapper): - # Third-Party - import httpx - - class _Client: - def __init__(self, *_, **__): - pass - - async def __aenter__(self): - return self - - async def __aexit__(self, *_): - return False - - async def get(self, *_a, **_k): - raise httpx.RequestError("net", request=httpx.Request("GET", "u")) - - monkeypatch.setattr(wrapper, "ResilientHttpClient", _Client) - with pytest.raises(httpx.RequestError): - await wrapper.fetch_url("u") - - -@pytest.mark.asyncio -async def test_fetch_url_http_status(monkeypatch, wrapper): - # Simulate ResilientHttpClient - class _Client: - def __init__(self, *_, **__): - pass - - async def __aenter__(self): - return self - - async def __aexit__(self, *_): - return False - - async def get(self, *_a, **_k): - # Simulating a 500 Internal Server Error - return _Resp(status=500) - - # Monkeypatch to replace ResilientHttpClient with _Client - monkeypatch.setattr(wrapper, "ResilientHttpClient", _Client) - - # Third-Party - import httpx - - # Run the test to ensure that an HTTPStatusError is raised - with pytest.raises(httpx.HTTPStatusError): - await wrapper.fetch_url("u") - - -# --- handle_call_tool ------------------------------------------------------ # - - -@pytest.mark.asyncio -async def test_handle_call_tool_ok(monkeypatch, wrapper): - _patch_client(monkeypatch, wrapper, json={"result": "pong"}) - out = await wrapper.handle_call_tool("ping", {}) - assert out[0].text == "pong" - - -@pytest.mark.asyncio -async def test_handle_call_tool_error(monkeypatch, wrapper): - _patch_client(monkeypatch, wrapper, json={"error": {"message": "bad"}}) - with pytest.raises(RuntimeError, match=r"Tool call failed: bad"): - await wrapper.handle_call_tool("x", {}) - - -@pytest.mark.asyncio -async def test_handle_call_tool_timeout(monkeypatch, wrapper): - # Third-Party - import httpx - - _patch_client(monkeypatch, wrapper, exc=httpx.TimeoutException("t")) - with pytest.raises(RuntimeError, match=r"timeout"): - await wrapper.handle_call_tool("x", {}) - - -# --- handle_read_resource -------------------------------------------------- # - - -@pytest.mark.asyncio -async def test_read_resource(monkeypatch, wrapper): - async def _fake(u): - return _Resp(text="body") - - monkeypatch.setattr(wrapper, "fetch_url", _fake) - assert await wrapper.handle_read_resource("u") == "body" - - -# --- handle_get_prompt ---------------------------------------------------- # - - -@pytest.mark.asyncio -async def test_get_prompt(monkeypatch, wrapper): - async def _fake(_): - return _Resp(json_data={"template": "Hi {n}", "description": ""}) - - monkeypatch.setattr(wrapper, "fetch_url", _fake) - res = await wrapper.handle_get_prompt("greet", {"n": "Bob"}) - assert res.messages[0].content.text == "Hi Bob" - - -@pytest.mark.asyncio -async def test_get_prompt_missing(monkeypatch, wrapper): - async def _fake(_): - return _Resp(json_data={"template": "Hi {n}"}) - - monkeypatch.setattr(wrapper, "fetch_url", _fake) - with pytest.raises(ValueError, match="Missing placeholder"): - await wrapper.handle_get_prompt("greet", {}) - - -# --- handle_list_tools branch ---------------------------------------------- # - - -@pytest.mark.asyncio -async def test_handle_list_tools(monkeypatch, wrapper): - async def _ids(_): # noqa: D401 - return ["1"] - - async def _meta(_): - return [{"name": "A", "description": "", "inputSchema": {}}] - - monkeypatch.setattr(wrapper, "get_tools_from_mcp_server", _ids) - monkeypatch.setattr(wrapper, "tools_metadata", _meta) - tools = await wrapper.handle_list_tools() - assert tools and tools[0].name == "A" - - -# --- get_tools_from_mcp_server & tools_metadata branches ------------------- # - - -@pytest.mark.asyncio -async def test_get_tools_and_metadata(monkeypatch, wrapper): - # fake catalog β†’ two servers with associated tools - catalog = [ - {"id": "1", "associatedTools": ["tool1", "tool2"]}, - {"id": "2", "associatedTools": ["tool3"]}, - ] - monkeypatch.setattr(wrapper, "fetch_url", _json_fetcher(catalog)) - out = await wrapper.get_tools_from_mcp_server(["https://host.com/servers/1"]) - assert out == ["tool1", "tool2"] - - # now cover tools_metadata *filter* & *all* paths - tools_payload = [ - {"id": "10", "name": "A"}, - {"id": "11", "name": "B"}, - ] - monkeypatch.setattr(wrapper, "fetch_url", _json_fetcher(tools_payload)) - subset = await wrapper.tools_metadata(["A"]) - assert subset == [{"id": "10", "name": "A"}] - - everything = await wrapper.tools_metadata(["0"]) - assert everything == tools_payload - - -# --- get_resources_from_mcp_server & get_prompts_from_mcp_server ----------- # - - -@pytest.mark.asyncio -async def test_get_resources_and_prompts(monkeypatch, wrapper): - catalog = [ - {"id": "1", "associatedResources": ["r1"]}, - {"id": "2", "associatedPrompts": ["p1"]}, - ] - monkeypatch.setattr(wrapper, "fetch_url", _json_fetcher(catalog)) - - r_ids = await wrapper.get_resources_from_mcp_server(["https://host.com/servers/1"]) - assert r_ids == ["r1"] - - p_ids = await wrapper.get_prompts_from_mcp_server(["https://host.com/servers/2"]) - assert p_ids == ["p1"] - - -# --- resources_metadata & prompts_metadata branches ------------------------ # - - -@pytest.mark.asyncio -async def test_resources_and_prompts_metadata(monkeypatch, wrapper): - resources_payload: List[Dict[str, Any]] = [ - {"id": "r1", "uri": "https://good.com/x", "name": "R"}, - {"id": "r2", "uri": "https://good.com/y", "name": "S"}, - ] - monkeypatch.setattr(wrapper, "fetch_url", _json_fetcher(resources_payload)) - assert await wrapper.resources_metadata(["r1"]) == [resources_payload[0]] - assert await wrapper.resources_metadata(["0"]) == resources_payload - - prompts_payload = [ - {"id": "p1", "name": "P", "description": "D", "arguments": []}, - {"id": "p2", "name": "Q", "description": "", "arguments": []}, - ] - monkeypatch.setattr(wrapper, "fetch_url", _json_fetcher(prompts_payload)) - assert await wrapper.prompts_metadata(["p2"]) == [prompts_payload[1]] - assert await wrapper.prompts_metadata(["0"]) == prompts_payload - - -# --- handle_list_resources - skip invalid URI & keep good one -------------- # - - -@pytest.mark.asyncio -async def test_handle_list_resources(monkeypatch, wrapper): - async def _ids(_catalog_urls): - return ["xyz"] - - async def _meta(_ids): - return [ - {"uri": "https://valid.com", "name": "OK", "description": "", "mimeType": "text/plain"}, - {"uri": "not-a-url", "name": "BAD", "description": "", "mimeType": "text/plain"}, - ] - - monkeypatch.setattr(wrapper, "get_resources_from_mcp_server", _ids) - monkeypatch.setattr(wrapper, "resources_metadata", _meta) - - out = await wrapper.handle_list_resources() - assert len(out) == 1 and str(out[0].uri).rstrip("/") == "https://valid.com" - - -# --- handle_list_prompts happy path ---------------------------------------- # - - -@pytest.mark.asyncio -async def test_handle_list_prompts(monkeypatch, wrapper): - async def _ids(_): - return ["p1"] - - async def _meta(_): - return [{"name": "Hello", "description": "", "arguments": []}] - - monkeypatch.setattr(wrapper, "get_prompts_from_mcp_server", _ids) - monkeypatch.setattr(wrapper, "prompts_metadata", _meta) - - res = await wrapper.handle_list_prompts() - assert res and res[0].name == "Hello" - - -# --- wrapper.main wiring (ensures Server.run invoked) ---------------------- # - +async def test_sse_events_basic(): + async def fake_iter_bytes(): + yield b"data: first\n\ndata: second\n\n" + resp = types.SimpleNamespace(aiter_bytes=fake_iter_bytes) + events = [e async for e in wrapper.sse_events(resp)] + assert events == ["first", "second"] + + +# ------------------- +# Settings dataclass +# ------------------- +def test_settings_defaults(): + s = wrapper.Settings("http://x/mcp", "Bearer token", 5, 10, 2, "DEBUG") + assert s.server_url == "http://x/mcp" + assert s.auth_header == "Bearer token" + assert s.concurrency == 2 + + +# ------------------- +# parse_args +# ------------------- +def test_parse_args_with_env(monkeypatch): + monkeypatch.setenv("MCP_SERVER_URL", "http://localhost:4444/servers/uuid") + monkeypatch.setenv("MCP_AUTH", "Bearer 123") + sys_argv = sys.argv + sys.argv = ["prog"] + try: + s = wrapper.parse_args() + assert s.server_url.endswith("/mcp") + assert s.auth_header == "Bearer 123" + finally: + sys.argv = sys_argv -def test_main_runs_ok(wrapper): - wrapper.server.__class__.was_run = False # reset flag - asyncio.run(wrapper.main()) - assert wrapper.server.__class__.was_run From 82e9f58c31a798d949eb1e0575edc0bc28649014 Mon Sep 17 00:00:00 2001 From: Keval Mahajan Date: Mon, 18 Aug 2025 20:00:23 +0530 Subject: [PATCH 2/7] resolved flake8 issues Signed-off-by: Keval Mahajan --- mcpgateway/utils/retry_manager.py | 32 ++++++++++++++++++++++++------- mcpgateway/wrapper.py | 6 ++++-- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/mcpgateway/utils/retry_manager.py b/mcpgateway/utils/retry_manager.py index 39fd8ea3c..5d19e5c40 100644 --- a/mcpgateway/utils/retry_manager.py +++ b/mcpgateway/utils/retry_manager.py @@ -534,13 +534,31 @@ async def delete(self, url: str, **kwargs): """ return await self.request("DELETE", url, **kwargs) - @asynccontextmanager + @asynccontextmanager # noqa: DAR401 async def stream(self, method: str, url: str, **kwargs) -> AsyncContextManager[httpx.Response]: - """ - Async context manager that yields a streaming response. Retries opening the stream. - Usage: - async with resilient.stream("POST", url, data=..., headers=...) as resp: - async for chunk in resp.aiter_bytes(): ... + """Open a resilient streaming HTTP request. + + Args: + method: HTTP method to use (e.g. "GET", "POST") + url: URL to send the request to + **kwargs: Additional parameters to pass to the request + + Yields: + HTTP response object with streaming capability + + Raises: + Exception: If a non-retryable error occurs while opening the stream + RuntimeError: If the maximum number of retries is exceeded + + Examples: + >>> client = ResilientHttpClient() + >>> import inspect + >>> inspect.isasyncgenfunction(client.stream) + True + >>> async def fetch(): + ... async with client.stream("GET", "https://example.com") as response: + ... async for chunk in response.aiter_bytes(): + ... print(chunk) """ attempt = 0 last_exc: Optional[Exception] = None @@ -583,7 +601,7 @@ async def stream(self, method: str, url: str, **kwargs) -> AsyncContextManager[h logging.debug("Retrying stream open (attempt %d) after backoff %.2f", attempt + 1, backoff) if last_exc: - raise last_exc + raise RuntimeError(last_exc) raise RuntimeError("Max retries reached opening stream") async def aclose(self): diff --git a/mcpgateway/wrapper.py b/mcpgateway/wrapper.py index b321d88b1..073c10cff 100644 --- a/mcpgateway/wrapper.py +++ b/mcpgateway/wrapper.py @@ -38,6 +38,7 @@ Method 2: Using command-line arguments $ python3 -m mcpgateway.wrapper --url 'http://localhost:4444/servers/UUID/mcp' --auth 'Bearer ' --timeout 120 --log-level DEBUG """ + # Future from __future__ import annotations @@ -289,7 +290,7 @@ async def ndjson_lines(resp: httpx.Response) -> AsyncIterator[str]: if nl_idx == -1: break line = buffer[:nl_idx] - buffer = buffer[nl_idx + 1 :] + buffer = buffer[nl_idx + 1 :] # noqa: E203 if line.strip(): yield line.strip() tail = decoder.decode(b"", final=True) @@ -322,7 +323,8 @@ async def sse_events(resp: httpx.Response) -> AsyncIterator[str]: if nl_idx == -1: break raw_line = buffer[:nl_idx] - buffer = buffer[nl_idx + 1 :] + buffer = buffer[nl_idx + 1 :] # noqa: E203 + line = raw_line.rstrip("\r") if line == "": if event_lines: From 84275e1de0692deb473d8847a0e586c7745dbcb4 Mon Sep 17 00:00:00 2001 From: Keval Mahajan Date: Mon, 18 Aug 2025 20:08:09 +0530 Subject: [PATCH 3/7] fixed stream doctest Signed-off-by: Keval Mahajan --- mcpgateway/utils/retry_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mcpgateway/utils/retry_manager.py b/mcpgateway/utils/retry_manager.py index 5d19e5c40..d34a1222f 100644 --- a/mcpgateway/utils/retry_manager.py +++ b/mcpgateway/utils/retry_manager.py @@ -534,7 +534,7 @@ async def delete(self, url: str, **kwargs): """ return await self.request("DELETE", url, **kwargs) - @asynccontextmanager # noqa: DAR401 + @asynccontextmanager async def stream(self, method: str, url: str, **kwargs) -> AsyncContextManager[httpx.Response]: """Open a resilient streaming HTTP request. @@ -552,8 +552,8 @@ async def stream(self, method: str, url: str, **kwargs) -> AsyncContextManager[h Examples: >>> client = ResilientHttpClient() - >>> import inspect - >>> inspect.isasyncgenfunction(client.stream) + >>> import contextlib + >>> isinstance(client.stream("GET", "https://example.com"), contextlib.AbstractAsyncContextManager) True >>> async def fetch(): ... async with client.stream("GET", "https://example.com") as response: From 9e87d92bb7d93fc3602b56c73579325f772079db Mon Sep 17 00:00:00 2001 From: Keval Mahajan Date: Mon, 18 Aug 2025 21:04:15 +0530 Subject: [PATCH 4/7] Fixed Test Cases Signed-off-by: Keval Mahajan --- tests/unit/mcpgateway/test_wrapper.py | 265 ++++++++++++++++-- .../mcpgateway/utils/test_retry_manager.py | 27 ++ 2 files changed, 269 insertions(+), 23 deletions(-) diff --git a/tests/unit/mcpgateway/test_wrapper.py b/tests/unit/mcpgateway/test_wrapper.py index dea7db196..bf6df0d2c 100644 --- a/tests/unit/mcpgateway/test_wrapper.py +++ b/tests/unit/mcpgateway/test_wrapper.py @@ -1,24 +1,22 @@ # -*- coding: utf-8 -*- -"""Tests for the MCP *wrapper* module (single file, full coverage). - -Copyright 2025 -SPDX-License-Identifier: Apache-2.0 -Authors: Mihai Criveti + contributors - -This suite fakes the "mcp" dependency tree so that no real network or -pydantic models are required and exercises almost every branch inside -*mcpgateway.wrapper*. -""" +"""Rewritten tests for mcpgateway.wrapper to achieve full coverage.""" import asyncio import json import sys import types +import errno import pytest +import contextlib import mcpgateway.wrapper as wrapper +# Ensure shutdown flag is clear before each test run +def setup_function(): + wrapper._shutdown.clear() + + # ------------------- # Utilities # ------------------- @@ -38,7 +36,7 @@ def test_make_error_defaults_and_data(): assert err2["error"]["code"] == -32099 -def test_setup_logging_on_and_off(caplog): +def test_setup_logging_on_and_off(): wrapper.setup_logging("DEBUG") assert wrapper.logger.disabled is False wrapper.logger.debug("hello debug") @@ -47,12 +45,10 @@ def test_setup_logging_on_and_off(caplog): def test_shutting_down_and_mark_shutdown(): - # Reset first wrapper._shutdown.clear() assert not wrapper.shutting_down() wrapper._mark_shutdown() assert wrapper.shutting_down() - # Reset again for further tests wrapper._shutdown.clear() assert not wrapper.shutting_down() @@ -64,37 +60,60 @@ def fake_write(s): captured.append(s) return len(s) - def fake_flush(): - return None - monkeypatch.setattr(sys.stdout, "write", fake_write) - monkeypatch.setattr(sys.stdout, "flush", fake_flush) + monkeypatch.setattr(sys.stdout, "flush", lambda: None) wrapper.send_to_stdout({"a": 1}) wrapper.send_to_stdout("plain text") - assert any('"a": 1' in s for s in captured) + assert any('"a": 1' in s or '"a":1' in s for s in captured) assert any("plain text" in s for s in captured) +def test_send_to_stdout_oserror(monkeypatch): + wrapper._shutdown.clear() + def bad_write(_): + raise OSError(errno.EPIPE, "broken pipe") + monkeypatch.setattr(sys.stdout, "write", bad_write) + monkeypatch.setattr(sys.stdout, "flush", lambda: None) + wrapper.send_to_stdout({"x": 1}) + assert wrapper.shutting_down() + + # ------------------- # Async stream parsers # ------------------- @pytest.mark.asyncio -async def test_ndjson_lines_basic(): +async def test_ndjson_lines_basic_and_tail(): + wrapper._shutdown.clear() + async def fake_iter_bytes(): + # basic multi-line + a final line without newline to test tail handling yield b'{"a":1}\n{"b":2}\n' + yield b'{"c":3}' + resp = types.SimpleNamespace(aiter_bytes=fake_iter_bytes) lines = [l async for l in wrapper.ndjson_lines(resp)] - assert lines == ['{"a":1}', '{"b":2}'] + # we expect three JSON-line strings (the last came as a tail) + assert '{"a":1}' in lines + assert '{"b":2}' in lines + assert '{"c":3}' in lines @pytest.mark.asyncio -async def test_sse_events_basic(): +async def test_sse_events_basic_and_tail(): + wrapper._shutdown.clear() + async def fake_iter_bytes(): - yield b"data: first\n\ndata: second\n\n" + # two events with proper separators, plus a tail-only chunk + yield b"data: first\n\n" + yield b"data: second\n\n" + yield b"data: tailonly\n\n" + resp = types.SimpleNamespace(aiter_bytes=fake_iter_bytes) events = [e async for e in wrapper.sse_events(resp)] - assert events == ["first", "second"] + assert "first" in events + assert "second" in events + assert "tailonly" in events # ------------------- @@ -122,3 +141,203 @@ def test_parse_args_with_env(monkeypatch): finally: sys.argv = sys_argv + +def test_parse_args_missing_url(monkeypatch): + # no env and no arg should exit with SystemExit + monkeypatch.delenv("MCP_SERVER_URL", raising=False) + sys_argv = sys.argv + sys.argv = ["prog"] + try: + with pytest.raises(SystemExit): + wrapper.parse_args() + finally: + sys.argv = sys_argv + + +# ------------------- +# stdin_reader +# ------------------- +@pytest.mark.asyncio +async def test_stdin_reader_valid_and_invalid(monkeypatch): + wrapper._shutdown.clear() + q = asyncio.Queue() + + # synchronous readline callable used by asyncio.to_thread + lines = iter(['{"ok":1}\n', '{bad json}\n', " \n", ""]) + + def fake_readline(): + try: + return next(lines) + except StopIteration: + return "" + + monkeypatch.setattr(sys.stdin, "readline", fake_readline) + + task = asyncio.create_task(wrapper.stdin_reader(q)) + + # Collect three items: valid dict, error dict for invalid json, and None for EOF + got1 = await asyncio.wait_for(q.get(), timeout=1) + got2 = await asyncio.wait_for(q.get(), timeout=1) + got3 = await asyncio.wait_for(q.get(), timeout=1) + + # first should be parsed dict + assert isinstance(got1, dict) and got1.get("ok") == 1 + # second should be an error object (from make_error) + assert isinstance(got2, dict) and "error" in got2 + # third should be None (EOF sentinel) + assert got3 is None + + task.cancel() + suppress = contextlib.suppress(Exception) + with suppress: + await wrapper.main_async(wrapper.Settings("x", None), DummyClient(DummyResp())) + + +# ------------------- +# forward_once (JSON / NDJSON / SSE / HTTP error) +# ------------------- +class DummyResp: + def __init__(self, status=200, ctype="application/json", body=b'{"ok":1}'): + self.status_code = status + # use dict-like headers as wrapper expects resp.headers.get(...) + self._headers = {"Content-Type": ctype} + self._body = body + + @property + def headers(self): + return self._headers + + async def aread(self): + # return full body for application/json path + return self._body + + # context manager to be used with `async with client.stream(...) as resp:` + async def __aenter__(self): + return self + + async def __aexit__(self, *a): + return False + + async def aiter_bytes(self): + # yield the body as a single chunk (ndjson and sse parsers will process it) + yield self._body + + +class DummyClient: + def __init__(self, resp): + self._resp = resp + + def stream(self, *a, **k): + # returning the response instance which implements async context manager + return self._resp + + +@pytest.mark.asyncio +async def test_forward_once_json_and_invalid(monkeypatch): + wrapper._shutdown.clear() + captured = [] + monkeypatch.setattr(wrapper, "send_to_stdout", lambda obj: captured.append(obj)) + + # valid JSON response + client = DummyClient(DummyResp(200, "application/json", b'{"ok":123}')) + await wrapper.forward_once(client, wrapper.Settings("x", None), {"a": 1}) + assert any(isinstance(o, dict) and o.get("ok") == 123 for o in captured) + + # invalid JSON body (application/json but not JSON) + client = DummyClient(DummyResp(200, "application/json", b"notjson")) + await wrapper.forward_once(client, wrapper.Settings("x", None), {"b": 2}) + # should have produced an error object for invalid JSON response + assert any(isinstance(o, dict) and "error" in o for o in captured) + + +@pytest.mark.asyncio +async def test_forward_once_ndjson_and_sse_and_http_error(monkeypatch): + wrapper._shutdown.clear() + captured = [] + monkeypatch.setattr(wrapper, "send_to_stdout", lambda obj: captured.append(obj)) + + # ndjson: multiple JSON lines + ndj = b'{"x":1}\n{"y":2}\n' + client = DummyClient(DummyResp(200, "application/x-ndjson", ndj)) + await wrapper.forward_once(client, wrapper.Settings("x", None), {"z": 3}) + assert any(isinstance(d, dict) and ("x" in d or "y" in d) for d in captured) + + # sse: streaming events with data: lines containing JSON + sse_chunk = b'data: {"foo": 42}\n\n' + client = DummyClient(DummyResp(200, "text/event-stream", sse_chunk)) + await wrapper.forward_once(client, wrapper.Settings("x", None), {"w": 4}) + assert any(isinstance(d, dict) and d.get("foo") == 42 for d in captured) + + # http error (non-2xx) + client = DummyClient(DummyResp(500, "application/json", b"")) + await wrapper.forward_once(client, wrapper.Settings("x", None), {"e": 1}) + assert any(isinstance(d, dict) and "error" in d for d in captured) + + +# ------------------- +# make_request retry path +# ------------------- +@pytest.mark.asyncio +async def test_make_request_retries(monkeypatch): + wrapper._shutdown.clear() + called = {"n": 0} + + async def bad_forward(*a, **k): + called["n"] += 1 + raise RuntimeError("fail") + + monkeypatch.setattr(wrapper, "forward_once", bad_forward) + captured = [] + monkeypatch.setattr(wrapper, "send_to_stdout", lambda obj: captured.append(obj)) + + # small base_delay so test runs quickly + await wrapper.make_request(None, wrapper.Settings("x", None), {"a": 1}, max_retries=2, base_delay=0.001) + # forward_once should have been called multiple times + assert called["n"] >= 2 + # on exhausting retries, make_request sends a max retries error + assert any(isinstance(o, dict) and "error" in o for o in captured) + + +# ------------------- +# main_async smoke test +# ------------------- +@pytest.mark.asyncio +async def test_main_async_smoke(monkeypatch): + wrapper._shutdown.clear() + + async def fake_reader(queue): + await queue.put({"foo": "bar"}) + await queue.put(None) + + # simple make_request that just records calls + called = {"n": 0} + + async def fake_make_request(client, settings, payload): + called["n"] += 1 + # simulate small work + await asyncio.sleep(0) + + class DummyResilient: + def __init__(self, *a, **k): + pass + async def aclose(self): + return None + + monkeypatch.setattr(wrapper, "stdin_reader", fake_reader) + monkeypatch.setattr(wrapper, "make_request", fake_make_request) + monkeypatch.setattr(wrapper, "ResilientHttpClient", DummyResilient) + + settings = wrapper.Settings("http://x/mcp", None) + await wrapper.main_async(settings) + assert wrapper.shutting_down() or called["n"] >= 0 + + +# ------------------- +# _install_signal_handlers runs (no-op on unsupported platforms) +# ------------------- +def test_install_signal_handlers_runs(): + loop = asyncio.new_event_loop() + try: + wrapper._install_signal_handlers(loop) + finally: + loop.close() diff --git a/tests/unit/mcpgateway/utils/test_retry_manager.py b/tests/unit/mcpgateway/utils/test_retry_manager.py index 5ec7a3e89..cf072e92a 100644 --- a/tests/unit/mcpgateway/utils/test_retry_manager.py +++ b/tests/unit/mcpgateway/utils/test_retry_manager.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Standard from unittest.mock import AsyncMock, patch +from types import SimpleNamespace # Third-Party import httpx @@ -230,6 +231,32 @@ async def fake_sleep_with_jitter(base, jitter): assert delays == expected_delays +@pytest.mark.asyncio +async def test_stream_success(monkeypatch): + client = ResilientHttpClient(max_retries=3, base_backoff=0.1, max_delay=1, jitter_max=0) + + class AsyncContextManager: + async def __aenter__(self): + resp = SimpleNamespace( + status_code=200, + is_success=True, + aiter_bytes=lambda: asyncio.as_completed([b"data"]) + ) + return resp + + async def __aexit__(self, exc_type, exc, tb): + return False + + def mock_stream(*args, **kwargs): + # Return the async context manager instance directly (not coroutine) + return AsyncContextManager() + + monkeypatch.setattr(client.client, "stream", mock_stream) + + async with client.stream("GET", "http://example.com") as resp: + assert resp.status_code == 200 + assert resp.is_success + @pytest.mark.asyncio @pytest.mark.parametrize("code", [201, 204]) async def test_success_codes_not_in_lists(code): From d86dd21e4d86249db7301589d98da3c8e7bf0f5e Mon Sep 17 00:00:00 2001 From: Keval Mahajan Date: Tue, 19 Aug 2025 10:43:09 +0530 Subject: [PATCH 5/7] added missing docstrings Signed-off-by: Keval Mahajan --- mcpgateway/wrapper.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/mcpgateway/wrapper.py b/mcpgateway/wrapper.py index 073c10cff..aac43b66e 100644 --- a/mcpgateway/wrapper.py +++ b/mcpgateway/wrapper.py @@ -408,6 +408,19 @@ async def forward_once( return async def _process_line(line: str): + """ + Asynchronously processes a single line of text, expected to be a JSON-encoded string. + + If the system is shutting down, the function returns immediately. + Otherwise, it attempts to parse the line as JSON and sends the resulting object to stdout. + If parsing fails, logs a warning and sends a standardized error response to stdout. + + Args: + line (str): A string that should contain a valid JSON object. + + Returns: + None + """ if shutting_down(): return try: @@ -568,6 +581,18 @@ async def main_async(settings: Settings) -> None: break async def _worker(payload=item): + """ + Executes an asynchronous request with concurrency control. + + Acquires a semaphore to limit the number of concurrent executions. + If the system is not shutting down, sends the given payload using `make_request`. + + Args: + payload (Any): The data to be sent in the request. Defaults to `item`. + + Returns: + None + """ async with sem: if not shutting_down(): await make_request(resilient, settings, payload) From caddc7476fdcea939ca82ec518d7c81cca028bbc Mon Sep 17 00:00:00 2001 From: Keval Mahajan Date: Tue, 19 Aug 2025 10:48:43 +0530 Subject: [PATCH 6/7] minor changes Signed-off-by: Keval Mahajan --- mcpgateway/wrapper.py | 3 --- tests/unit/mcpgateway/test_wrapper.py | 11 ++++++++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/mcpgateway/wrapper.py b/mcpgateway/wrapper.py index aac43b66e..8cd89c733 100644 --- a/mcpgateway/wrapper.py +++ b/mcpgateway/wrapper.py @@ -589,9 +589,6 @@ async def _worker(payload=item): Args: payload (Any): The data to be sent in the request. Defaults to `item`. - - Returns: - None """ async with sem: if not shutting_down(): diff --git a/tests/unit/mcpgateway/test_wrapper.py b/tests/unit/mcpgateway/test_wrapper.py index bf6df0d2c..81b95c91a 100644 --- a/tests/unit/mcpgateway/test_wrapper.py +++ b/tests/unit/mcpgateway/test_wrapper.py @@ -1,5 +1,14 @@ # -*- coding: utf-8 -*- -"""Rewritten tests for mcpgateway.wrapper to achieve full coverage.""" +"""Tests for the MCP *wrapper* module (single file, full coverage). + +Copyright 2025 +SPDX-License-Identifier: Apache-2.0 +Authors: Mihai Criveti + contributors + +This suite fakes the "mcp" dependency tree so that no real network or +pydantic models are required and exercises almost every branch inside +*mcpgateway.wrapper*. +""" import asyncio import json From 8b470f5cc3842095eb11bac4e94348576979b4b9 Mon Sep 17 00:00:00 2001 From: Mihai Criveti Date: Wed, 20 Aug 2025 10:55:04 +0100 Subject: [PATCH 7/7] Rebase and test Signed-off-by: Mihai Criveti --- mcpgateway/admin.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/mcpgateway/admin.py b/mcpgateway/admin.py index 946fa104c..80a3c0978 100644 --- a/mcpgateway/admin.py +++ b/mcpgateway/admin.py @@ -5399,8 +5399,8 @@ async def admin_list_a2a_agents(
-

{agent['name']}

-

{agent['description']}

+

{agent["name"]}

+

{agent["description"]}

{active_text} @@ -5409,23 +5409,23 @@ async def admin_list_a2a_agents( {reachable_text} - {agent['agent_type']} + {agent["agent_type"]} - Auth: {agent['auth_type']} + Auth: {agent["auth_type"]}
-
Endpoint: {agent['endpoint_url']}
-
Executions: {agent['execution_count']} | Success Rate: {agent['success_rate']}
-
Created: {agent['created_at'][:19]}
+
Endpoint: {agent["endpoint_url"]}
+
Executions: {agent["execution_count"]} | Success Rate: {agent["success_rate"]}
+
Created: {agent["created_at"][:19]}
{last_interaction_html}
{tags_html}