Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions SECURITY_VALIDATION.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# Gateway-Level Input Validation & Output Sanitization

This document describes the experimental security validation and sanitization features in MCP Gateway.

## Overview

The MCP Gateway includes an experimental validation layer that provides:

- **Input Validation**: Validates all inbound parameters (tool args, resource URIs, prompt vars)
- **Output Sanitization**: Sanitizes all outbound payloads before delivery
- **Path Traversal Defense**: Normalizes and confines resource paths to declared roots
- **Shell Injection Prevention**: Escapes or rejects dangerous shell metacharacters
- **SQL Injection Protection**: Validates parameters for SQL injection patterns

## Configuration

Enable experimental validation by setting:

```bash
EXPERIMENTAL_VALIDATE_IO=true
VALIDATION_STRICT=true # Reject on violations (default: true)
SANITIZE_OUTPUT=true # Sanitize output (default: true)
ALLOWED_ROOTS="/srv/data,/tmp" # Allowed root paths for resources
MAX_PATH_DEPTH=10 # Maximum path depth (default: 10)
MAX_PARAM_LENGTH=10000 # Maximum parameter length (default: 10000)
```

## Validation Rules

### Path Traversal Defense

Resource paths are validated against:
- Path traversal patterns (`../`, `..\\`)
- Allowed root directories
- Maximum path depth

Example:
```python
# BLOCKED: Path traversal
"/srv/data/../../etc/passwd"

# ALLOWED: Within allowed root
"/srv/data/file.txt"
```

### Dangerous Parameter Validation

Parameters are checked for:
- Shell metacharacters: `;`, `&`, `|`, `` ` ``, `$`, `()`, `{}`, `[]`, `<>`
- SQL injection patterns: quotes, comments, SQL keywords
- Control characters: ASCII 0x00-0x1F, 0x7F-0x9F

### Output Sanitization

All text output is sanitized to remove:
- Control characters (except newlines and tabs)
- Escape sequences that could affect terminals
- Invalid UTF-8 sequences

## Security Patterns

### Tool Parameter Validation

```python
from mcpgateway.validators import SecurityValidator

# Validate shell parameters
safe_filename = SecurityValidator.validate_shell_parameter("file.txt")

# Validate SQL parameters
safe_query = SecurityValidator.validate_sql_parameter("user input")

# Validate parameter length
SecurityValidator.validate_parameter_length(value, max_length=1000)
```

### Resource Path Validation

```python
# Validate and normalize paths
safe_path = SecurityValidator.validate_path(
"/srv/data/file.txt",
allowed_roots=["/srv/data"]
)
```

### Output Sanitization

```python
from mcpgateway.validators import OutputSanitizer

# Sanitize text output
clean_text = OutputSanitizer.sanitize_text("Hello\x1b[31mWorld")
# Result: "HelloWorld"

# Sanitize JSON responses
clean_data = OutputSanitizer.sanitize_json_response({
"message": "Hello\x00World",
"items": ["test\x1f", "clean"]
})
```

## Validation Modes

### Strict Mode (Default)
- Rejects requests with dangerous patterns
- Returns HTTP 422 validation errors
- Logs all violations

### Non-Strict Mode
- Attempts to sanitize dangerous input
- Logs warnings for violations
- Continues processing when possible

## Error Responses

Validation failures return structured errors:

```json
{
"detail": "Parameter filename contains dangerous characters",
"type": "validation_error",
"code": "dangerous_input"
}
```

## Performance Impact

The validation middleware adds minimal overhead:
- ~1-2ms per request for parameter validation
- ~0.5ms per response for output sanitization
- Regex compilation is cached for performance

## Testing

Run validation tests:

```bash
pytest tests/security/test_validation.py -v
```

## Limitations

Current limitations of the experimental validation:
- Binary content validation is basic
- Some legitimate use cases may be blocked
- Performance impact on large payloads
- Limited to common attack patterns

## Future Enhancements

Planned improvements:
- Machine learning-based anomaly detection
- Configurable validation rules per tool
- Integration with external security scanners
- Support for custom validation plugins

## Security Considerations

This validation layer provides defense-in-depth but should not be the only security measure:

- Always use proper authentication and authorization
- Implement rate limiting and request throttling
- Monitor and log all security events
- Keep the gateway and dependencies updated
- Use network-level security controls

## Reporting Issues

If you find security issues or false positives, please report them following our Security Policy.
196 changes: 196 additions & 0 deletions mcpgateway/common/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@
# Standard
import html
import logging
from pathlib import Path
import re
import shlex
from typing import Any, List, Optional
from urllib.parse import urlparse
import uuid

# First-Party
from mcpgateway.common.config import settings
from mcpgateway.config import settings as config_settings

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -1188,3 +1192,195 @@ def validate_mime_type(cls, value: str) -> str:
raise ValueError(f"MIME type '{value}' is not in the allowed list")

return value

@classmethod
def validate_shell_parameter(cls, value: str) -> str:
"""Validate and escape shell parameters to prevent command injection.

Args:
value (str): Shell parameter to validate

Returns:
str: Validated/escaped parameter

Raises:
ValueError: If parameter contains dangerous characters in strict mode

Examples:
>>> SecurityValidator.validate_shell_parameter('safe_param')
'safe_param'
>>> SecurityValidator.validate_shell_parameter('param with spaces')
'param with spaces'
"""
if not isinstance(value, str):
raise ValueError("Parameter must be string")

# Check for dangerous patterns
dangerous_chars = re.compile(r"[;&|`$(){}\[\]<>]")
if dangerous_chars.search(value):
# Check if validation is strict
strict_mode = getattr(settings, "validation_strict", True)
if strict_mode:
raise ValueError("Parameter contains shell metacharacters")
# In non-strict mode, escape using shlex
return shlex.quote(value)

return value

@classmethod
def validate_path(cls, path: str, allowed_roots: Optional[List[str]] = None) -> str:
"""Validate and normalize file paths to prevent directory traversal.

Args:
path (str): File path to validate
allowed_roots (Optional[List[str]]): List of allowed root directories

Returns:
str: Validated and normalized path

Raises:
ValueError: If path contains traversal attempts or is outside allowed roots

Examples:
>>> SecurityValidator.validate_path('/safe/path')
'/safe/path'
>>> SecurityValidator.validate_path('http://example.com/file')
'http://example.com/file'
"""
if not isinstance(path, str):
raise ValueError("Path must be string")

# Skip validation for URI schemes (http://, plugin://, etc.)
if re.match(r"^[a-zA-Z][a-zA-Z0-9+\-.]*://", path):
return path

try:
p = Path(path)
# Check for path traversal
if ".." in p.parts:
raise ValueError("Path traversal detected")

resolved_path = p.resolve()

# Check against allowed roots
if allowed_roots:
allowed = any(str(resolved_path).startswith(str(Path(root).resolve())) for root in allowed_roots)
if not allowed:
raise ValueError("Path outside allowed roots")

return str(resolved_path)
except (OSError, ValueError) as e:
raise ValueError(f"Invalid path: {e}")

@classmethod
def validate_sql_parameter(cls, value: str) -> str:
"""Validate SQL parameters to prevent SQL injection attacks.

Args:
value (str): SQL parameter to validate

Returns:
str: Validated/escaped parameter

Raises:
ValueError: If parameter contains SQL injection patterns in strict mode

Examples:
>>> SecurityValidator.validate_sql_parameter('safe_value')
'safe_value'
>>> SecurityValidator.validate_sql_parameter('123')
'123'
"""
if not isinstance(value, str):
return value

# Check for SQL injection patterns
sql_patterns = [
r"[';\"\\]", # Quote characters
r"--", # SQL comments
r"/\\*.*?\\*/", # Block comments
r"\\b(union|select|insert|update|delete|drop|exec|execute)\\b", # SQL keywords
]

for pattern in sql_patterns:
if re.search(pattern, value, re.IGNORECASE):
if getattr(config_settings, "validation_strict", True):
raise ValueError("Parameter contains SQL injection patterns")
# Basic escaping
value = value.replace("'", "''").replace('"', '""')

return value

@classmethod
def validate_parameter_length(cls, value: str, max_length: int = None) -> str:
"""Validate parameter length against configured limits.

Args:
value (str): Parameter to validate
max_length (int): Maximum allowed length

Returns:
str: Parameter if within length limits

Raises:
ValueError: If parameter exceeds maximum length

Examples:
>>> SecurityValidator.validate_parameter_length('short', 10)
'short'
"""
max_len = max_length or getattr(config_settings, "max_param_length", 10000)
if len(value) > max_len:
raise ValueError(f"Parameter exceeds maximum length of {max_len}")
return value

@classmethod
def sanitize_text(cls, text: str) -> str:
"""Remove control characters and ANSI escape sequences from text.

Args:
text (str): Text to sanitize

Returns:
str: Sanitized text with control characters removed

Examples:
>>> SecurityValidator.sanitize_text('Hello World')
'Hello World'
>>> SecurityValidator.sanitize_text('Text\x1b[31mwith\x1b[0mcolors')
'Textwithcolors'
"""
if not isinstance(text, str):
return text

# Remove ANSI escape sequences
text = re.sub(r"\x1B\[[0-9;]*[A-Za-z]", "", text)
# Remove control characters except newlines and tabs
sanitized = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]", "", text)
return sanitized

@classmethod
def sanitize_json_response(cls, data: Any) -> Any:
"""Recursively sanitize JSON response data by removing control characters.

Args:
data (Any): JSON data structure to sanitize

Returns:
Any: Sanitized data structure with same type as input

Examples:
>>> SecurityValidator.sanitize_json_response('clean text')
'clean text'
>>> SecurityValidator.sanitize_json_response({'key': 'value'})
{'key': 'value'}
>>> SecurityValidator.sanitize_json_response(['item1', 'item2'])
['item1', 'item2']
"""
if isinstance(data, str):
return cls.sanitize_text(data)
if isinstance(data, dict):
return {k: cls.sanitize_json_response(v) for k, v in data.items()}
if isinstance(data, list):
return [cls.sanitize_json_response(item) for item in data]
return data
Loading
Loading