Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion proxy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .proxy import entry_point
from .proxy import main, start
from .proxy import Proxy
from .testing.test_case import TestCase
from .testing import TestCase, ReplayTestCase

__all__ = [
# PyPi package entry_point. See
Expand All @@ -23,5 +23,6 @@
# Unit testing with proxy.py. See
# https://github.com/abhinavsingh/proxy.py#unit-testing-with-proxypy
'TestCase',
'ReplayTestCase',
'Proxy',
]
6 changes: 6 additions & 0 deletions proxy/common/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,9 @@ def __init__(
self.ca_cert_dir = os.path.join(
self.proxy_py_data_dir, 'certificates')
os.makedirs(self.ca_cert_dir, exist_ok=True)

def tls_interception_enabled(self) -> bool:
    """Report whether all four CA settings are present.

    Checks, in order, ``ca_key_file``, ``ca_cert_dir``,
    ``ca_signing_key_file`` and ``ca_cert_file``. Returns False as soon
    as one of them is ``None``; returns True only when none is ``None``.
    """
    if self.ca_key_file is None:
        return False
    if self.ca_cert_dir is None:
        return False
    if self.ca_signing_key_file is None:
        return False
    return self.ca_cert_file is not None
4 changes: 2 additions & 2 deletions proxy/dashboard/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(
@abstractmethod
def methods(self) -> List[str]:
"""Return list of methods that this plugin will handle."""
pass
pass # pragma: no cover

def connected(self) -> None:
"""Invoked when client websocket handshake finishes."""
Expand All @@ -43,7 +43,7 @@ def connected(self) -> None:
@abstractmethod
def handle_message(self, message: Dict[str, Any]) -> None:
"""Handle messages for registered methods."""
pass
pass # pragma: no cover

def disconnected(self) -> None:
"""Invoked when client websocket connection gets closed."""
Expand Down
41 changes: 30 additions & 11 deletions proxy/http/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import hashlib
from urllib import parse as urlparse
from typing import TypeVar, NamedTuple, Optional, Dict, Type, Tuple, List

Expand Down Expand Up @@ -45,11 +46,12 @@ def __init__(self, parser_type: int) -> None:
self.type: int = parser_type
self.state: int = httpParserStates.INITIALIZED

# Total size of raw bytes passed for parsing
self.total_size: int = 0

# Buffer to hold unprocessed bytes
self.buffer: bytes = b''
# These properties clean up developer APIs. Python's urlparse.urlsplit behaves
# differently for proxy requests and web requests; the web request case is
# the one which is broken.
self.host: Optional[bytes] = None
self.port: Optional[int] = None
self.path: Optional[bytes] = None

self.headers: Dict[bytes, Tuple[bytes, bytes]] = dict()
self.body: Optional[bytes] = None
Expand All @@ -62,12 +64,18 @@ def __init__(self, parser_type: int) -> None:

self.chunk_parser: Optional[ChunkParser] = None

# This cleans up developer APIs as Python urlparse.urlsplit behaves differently
# for incoming proxy request and incoming web request. Web request is the one
# which is broken.
self.host: Optional[bytes] = None
self.port: Optional[int] = None
self.path: Optional[bytes] = None
# Total size of raw bytes passed for parsing
self.total_size: int = 0

# Buffer to hold unprocessed bytes
self.buffer: bytes = b''

# Hash which is updated as request gets parsed
#
# TODO(abhinavsingh): This currently is a requirement
# only when cache plugin is in use, otherwise, this
# will only increase CPU & RAM usage.
self.hash: hashlib._Hash = hashlib.sha512()

@classmethod
def request(cls: Type[T], raw: bytes) -> T:
Expand All @@ -81,6 +89,15 @@ def response(cls: Type[T], raw: bytes) -> T:
parser.parse(raw)
return parser

def fingerprint(self) -> str:
    """Return the hex digest of ``self.hash`` as this request's fingerprint.

    Call this once the request has been fully processed. If called
    earlier, the digest only reflects the partial request handled so far.
    """
    digest = self.hash.hexdigest()
    return digest

def header(self, key: bytes) -> bytes:
if key.lower() not in self.headers:
raise KeyError('%s not found in headers', text_(key))
Expand Down Expand Up @@ -142,6 +159,8 @@ def parse(self, raw: bytes) -> None:
"""Parses Http request out of raw bytes.

Check HttpParser state after parse has successfully returned."""
self.hash.update(raw)

self.total_size += len(raw)
raw = self.buffer + raw
self.buffer = b''
Expand Down
14 changes: 5 additions & 9 deletions proxy/http/proxy/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,6 @@ def __init__(
self.event_queue)
self.plugins[instance.name()] = instance

def tls_interception_enabled(self) -> bool:
    """Report whether all four CA settings on ``self.flags`` are present.

    Attributes are inspected lazily and in a fixed order; the first one
    found to be ``None`` makes the result False.
    """
    required = (
        'ca_key_file',
        'ca_cert_dir',
        'ca_signing_key_file',
        'ca_cert_file',
    )
    for name in required:
        if getattr(self.flags, name) is None:
            return False
    return True

def get_descriptors(
self) -> Tuple[List[socket.socket], List[socket.socket]]:
if not self.request.has_upstream_server():
Expand Down Expand Up @@ -216,7 +210,7 @@ def read_from_descriptors(self, r: Readables) -> bool:
# See https://github.com/abhinavsingh/proxy.py/issues/127 for why
# currently response parsing is disabled when TLS interception is enabled.
#
# or self.tls_interception_enabled():
# or self.flags.tls_interception_enabled():
if self.response.state == httpParserStates.COMPLETE:
self.handle_pipeline_response(raw)
else:
Expand Down Expand Up @@ -279,12 +273,14 @@ def on_client_data(self, raw: memoryview) -> Optional[memoryview]:
if self.server and not self.server.closed:
if self.request.state == httpParserStates.COMPLETE and (
self.request.method != httpMethods.CONNECT or
self.tls_interception_enabled()):
self.flags.tls_interception_enabled()):
if self.pipeline_request is not None and \
self.pipeline_request.is_connection_upgrade():
# Previous pipelined request was a WebSocket
# upgrade request. Incoming client data now
# must be treated as WebSocket protocol packets.
#
# TODO(abhinavsingh): Parse websocket frames here.
self.server.queue(raw)
return None

Expand Down Expand Up @@ -349,7 +345,7 @@ def on_request_complete(self) -> Union[socket.socket, bool]:
self.client.queue(
HttpProxyPlugin.PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT)
# If interception is enabled
if self.tls_interception_enabled():
if self.flags.tls_interception_enabled():
# Perform SSL/TLS handshake with upstream
self.wrap_server()
# Generate certificate and perform handshake with client
Expand Down
135 changes: 55 additions & 80 deletions proxy/plugin/cache/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,131 +9,106 @@
:license: BSD, see LICENSE for more details.
"""
import logging
import multiprocessing
from proxy.common.constants import CRLF
from typing import Optional, Any

from .store.base import CacheStore
from ...http.parser import HttpParser, httpParserTypes
from ...http.parser import HttpParser
from ...http.proxy import HttpProxyBasePlugin
from ...http.codes import httpStatusCodes
from ...common.constants import PROXY_AGENT_HEADER_VALUE
from ...common.utils import text_
from ...common.utils import build_http_response

logger = logging.getLogger(__name__)


class BaseCacheResponsesPlugin(HttpProxyBasePlugin):
"""Base cache plugin.

It requires a storage backend to work with. Storage class
must implement CacheStore interface.

Different storage backends can be used per request if required.
Cache plugin requires a storage backend to work with.
Storage class must implement the CacheStore interface.
"""

class EnabledDescriptor:
def __init__(self, enabled: bool = False) -> None:
self.enabled = multiprocessing.Event()
if enabled:
self.enabled.set()

def __get__(self, obj: Optional[object], owner: type) -> Any:
if obj is None:
return self.enabled
return None

# Dynamically enable / disable cache
enabled = EnabledDescriptor(True)

def __init__(
self,
*args: Any,
**kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.store: Optional[CacheStore] = None
self.__enabled: Optional[bool] = None

# Multiple requests can flow over a single connection.
# Example, CONNECT followed by one or many HTTP requests.
#
# Scheme is stored as attribute as request
# object is not available across all lifecycle
# callbacks.
self.scheme: bytes = b'http'

def set_store(self, store: CacheStore) -> None:
self.store = store

def before_upstream_connection(
self, request: HttpParser) -> Optional[HttpParser]:
self.__enabled = self.__class__.enabled.is_set()
if not self.__enabled:
return request
"""Avoid connection with upstream server if cached response exists.

assert self.store
logger.info("Upstream connexion %s:%d %s" %
(text_(request.host), request.port if request.port else 0, text_(request.path)))
Disabled for https request when running without TLS interception.
"""
assert request.url is not None

# Store request scheme
self.scheme = request.url.scheme

if request.port == 443:
# Cache plugin is enabled for HTTPS request only when
# TLS interception is also enabled.
if self.scheme == b'https' and not self.flags.tls_interception_enabled():
return request

try:
if self.store.is_cached(request):
return None
except Exception as e:
logger.info('Caching disabled due to exception message: %s', str(e))
# Cache plugin is a no-op for CONNECT requests
# i.e. we don't cache CONNECT responses. However,
# we do skip upstream connection
#
# See https://github.com/abhinavsingh/proxy.py/issues/443
# if request.method == b'CONNECT':
# return None

assert self.store
if self.store.is_cached(request):
return None
return request

def handle_client_request(
self, request: HttpParser) -> Optional[HttpParser]:
assert self.__enabled is not None
if not self.__enabled:
return request

assert self.store
logger.info("Client request %s:%d %s" %
(text_(request.host), request.port if request.port else 0, text_(request.path)))
"""If cached response exists, return response from cache."""
assert request.url is not None

if request.port == 443:
# Cache plugin is enabled for HTTPS request only when
# TLS interception is also enabled.
if request.url.scheme == b'https' and not self.flags.tls_interception_enabled():
return request

try:
msg = self.store.cache_request(request)
if (msg.type == httpParserTypes.REQUEST_PARSER):
return msg
elif (msg.type == httpParserTypes.RESPONSE_PARSER):
self.client.queue(memoryview(build_http_response(
int(msg.code) if msg.code is not None else 0,
reason=msg.reason,
headers={k: v for k, v in msg.headers.values()},
body=msg.body
)))
return None
else:
raise ValueError('Bad HTTPParser type: %s' % msg.type)
except Exception as e:
logger.info('Caching disabled due to exception message: %s', str(e))

try:
if self.store.is_cached(request):
self.client.queue(memoryview(build_http_response(
httpStatusCodes.INTERNAL_SERVER_ERROR,
reason=b'Internal server error',
headers={
b'Server': PROXY_AGENT_HEADER_VALUE,
b'Connection': b'close',
}
)))
except Exception as e:
logger.info('Caching disabled due to exception message: %s', str(e))

assert self.store
if self.store.is_cached(request):
logger.info("Serving out of cache")
try:
self.store.open(request)
response = self.store.read_response(request)
self.client.queue(memoryview(response.build_response()))
finally:
self.store.close()
return None
# Request not cached, open store for writes
self.store.open(request)
return request

def handle_upstream_chunk(self, chunk: memoryview) -> memoryview:
assert self.__enabled is not None
if not self.__enabled:
if self.scheme == b'https' and not self.flags.tls_interception_enabled():
return chunk

assert self.store
return self.store.cache_response_chunk(chunk)
chunk = self.store.cache_response_chunk(chunk)
if chunk.tobytes().endswith(CRLF * 2):
self.store.close()
return chunk

def on_upstream_connection_close(self) -> None:
assert self.__enabled is not None
if not self.__enabled:
if self.scheme == b'https' and not self.flags.tls_interception_enabled():
return

assert self.store
Expand Down
15 changes: 12 additions & 3 deletions proxy/plugin/cache/cache_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,27 @@
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import tempfile
import os
from typing import Any

from .store.disk import OnDiskCacheStore
from .base import BaseCacheResponsesPlugin


class CacheResponsesPlugin(BaseCacheResponsesPlugin):
"""Caches response using OnDiskCacheStore."""
"""Pluggable response caching plugin.

Defaults to OnDiskCacheStore.

Different storage backends may be used per request if required.
"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.disk_store = OnDiskCacheStore(
uid=self.uid, cache_dir=tempfile.gettempdir())
uid=self.uid, cache_dir=self.cache_directory())
self.set_store(self.disk_store)

def cache_directory(self) -> str:
    """Return the path of the ``cache`` directory under ``flags.proxy_py_data_dir``.

    TODO(abhinavsingh): Turn this into a flag.
    """
    data_dir = self.flags.proxy_py_data_dir
    return os.path.join(data_dir, 'cache')
Loading