Skip to content

Commit df7f1c8

Browse files
Add Listener, Web server close on header, use Pipe instead of Manager in eventing core (#720)
* Abstract out a Listener class * unused * Use connection instead of manager queue * For web close connection of client requested via headers * Remove eventing WIP module * Sub and Unsub ack * Fix tests * mypy and flake8 * comma * Move callback within EventSubscriber constructor * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Skip test_unix_path_listener on Windows * Spelling fix Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent df5a668 commit df7f1c8

29 files changed

+629
-402
lines changed

README.md

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -128,23 +128,35 @@
128128
- Made to handle `tens-of-thousands` connections / sec
129129

130130
```console
131-
# On Macbook Pro 2015 / 2.8 GHz Intel Core i7
132-
❯ hey -n 10000 -c 100 http://localhost:8899/
131+
# On Macbook Pro 2019 / 2.4 GHz 8-Core Intel Core i9 / 32 GB RAM
132+
❯ hey -n 10000 -c 100 http://localhost:8899/http-route-example
133133

134134
Summary:
135-
Total: 0.6157 secs
136-
Slowest: 0.1049 secs
137-
Fastest: 0.0007 secs
138-
Average: 0.0055 secs
139-
Requests/sec: 16240.5444
135+
Total: 0.3248 secs
136+
Slowest: 0.1007 secs
137+
Fastest: 0.0002 secs
138+
Average: 0.0028 secs
139+
Requests/sec: 30784.7958
140140

141-
Total data: 800000 bytes
142-
Size/request: 80 bytes
141+
Total data: 190000 bytes
142+
Size/request: 19 bytes
143143

144144
Response time histogram:
145-
0.001 [1] |
146-
0.011 [9565] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
147-
0.022 [332] |■
145+
0.000 [1] |
146+
0.010 [9533] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
147+
0.020 [384] |■■
148+
149+
Latency distribution:
150+
10% in 0.0004 secs
151+
25% in 0.0007 secs
152+
50% in 0.0013 secs
153+
75% in 0.0029 secs
154+
90% in 0.0057 secs
155+
95% in 0.0097 secs
156+
99% in 0.0185 secs
157+
158+
Status code distribution:
159+
[200] 100000 responses
148160
```
149161

150162
- Lightweight

examples/pubsub_eventing.py

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,38 @@
1212
import multiprocessing
1313
import logging
1414

15-
from typing import Dict, Any
15+
from typing import Dict, Any, Optional
1616

17+
from proxy.common.constants import DEFAULT_LOG_FORMAT
1718
from proxy.core.event import EventManager, EventQueue, EventSubscriber, eventNames
1819

19-
# Enable debug logging to view core event logs
20-
logging.basicConfig(level=logging.DEBUG)
20+
logging.basicConfig(level=logging.DEBUG, format=DEFAULT_LOG_FORMAT)
21+
22+
logger = logging.getLogger(__name__)
23+
2124

22-
main_publisher_request_id = '1234'
23-
process_publisher_request_id = '12345'
2425
num_events_received = [0, 0]
2526

26-
logger = logging.getLogger(__name__)
27+
28+
# Execute within a separate thread context
29+
def on_event(payload: Dict[str, Any]) -> None:
30+
'''Subscriber callback.'''
31+
global num_events_received
32+
if payload['request_id'] == '1234':
33+
num_events_received[0] += 1
34+
else:
35+
num_events_received[1] += 1
2736

2837

2938
def publisher_process(
3039
shutdown_event: multiprocessing.synchronize.Event,
3140
dispatcher_queue: EventQueue,
3241
) -> None:
33-
logger.info('publisher starting')
42+
logger.info('publisher started')
3443
try:
3544
while not shutdown_event.is_set():
3645
dispatcher_queue.publish(
37-
request_id=process_publisher_request_id,
46+
request_id='12345',
3847
event_name=eventNames.WORK_STARTED,
3948
event_payload={'time': time.time()},
4049
publisher_id='eventing_pubsub_process',
@@ -44,27 +53,19 @@ def publisher_process(
4453
logger.info('publisher shutdown')
4554

4655

47-
# Execute within a separate thread context
48-
def on_event(payload: Dict[str, Any]) -> None:
49-
'''Subscriber callback.'''
50-
global num_events_received
51-
if payload['request_id'] == main_publisher_request_id:
52-
num_events_received[0] += 1
53-
else:
54-
num_events_received[1] += 1
55-
56-
5756
if __name__ == '__main__':
5857
start_time = time.time()
58+
5959
# Start eventing core
60+
subscriber: Optional[EventSubscriber] = None
6061
with EventManager() as event_manager:
6162
assert event_manager.queue
6263

6364
# Create a subscriber.
6465
# Internally, subscribe will start a separate thread
6566
# to receive incoming published messages.
66-
subscriber = EventSubscriber(event_manager.queue)
67-
subscriber.subscribe(on_event)
67+
subscriber = EventSubscriber(event_manager.queue, callback=on_event)
68+
subscriber.setup()
6869

6970
# Start a publisher process to demonstrate safe exchange
7071
# of messages between processes.
@@ -81,22 +82,24 @@ def on_event(payload: Dict[str, Any]) -> None:
8182
try:
8283
while True:
8384
event_manager.queue.publish(
84-
request_id=main_publisher_request_id,
85+
request_id='1234',
8586
event_name=eventNames.WORK_STARTED,
8687
event_payload={'time': time.time()},
8788
publisher_id='eventing_pubsub_main',
8889
)
8990
except KeyboardInterrupt:
9091
logger.info('bye!!!')
9192
finally:
92-
# Stop publisher
93+
# Stop publisher process
9394
publisher_shutdown_event.set()
9495
publisher.join()
9596
# Stop subscriber thread
9697
subscriber.unsubscribe()
97-
logger.info(
98-
'Received {0} events from main thread, {1} events from another process, in {2} seconds'.format(
99-
num_events_received[0], num_events_received[1], time.time(
100-
) - start_time,
101-
),
102-
)
98+
logger.info(
99+
'Received {0} events from main thread, {1} events from another process, in {2} seconds'.format(
100+
num_events_received[0], num_events_received[1], time.time(
101+
) - start_time,
102+
),
103+
)
104+
if subscriber:
105+
subscriber.shutdown(do_unsubscribe=False)

proxy/common/flag.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,9 @@ def initialize(
233233
IpAddress,
234234
opts.get('hostname', ipaddress.ip_address(args.hostname)),
235235
)
236+
args.unix_socket_path = opts.get(
237+
'unix_socket_path', args.unix_socket_path,
238+
)
236239
# AF_UNIX is not available on Windows
237240
# See https://bugs.python.org/issue33408
238241
if os.name != 'nt':

proxy/core/acceptor/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
from .work import Work
1414
from .threadless import Threadless
1515
from .executors import ThreadlessPool
16+
from .listener import Listener
1617

1718
__all__ = [
1819
'Acceptor',
1920
'AcceptorPool',
2021
'Work',
2122
'Threadless',
2223
'ThreadlessPool',
24+
'Listener',
2325
]

proxy/core/acceptor/acceptor.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class Acceptor(multiprocessing.Process):
3434
"""Work acceptor process.
3535
3636
On start-up, `Acceptor` accepts a file descriptor which will be used to
37-
accept new work. File descriptor is accepted over a `work_queue` which is
37+
accept new work. File descriptor is accepted over a `fd_queue` which is
3838
closed immediately after receiving the descriptor.
3939
4040
`Acceptor` goes on to listen for new work over the received server socket.
@@ -55,7 +55,7 @@ class Acceptor(multiprocessing.Process):
5555
def __init__(
5656
self,
5757
idd: int,
58-
work_queue: connection.Connection,
58+
fd_queue: connection.Connection,
5959
flags: argparse.Namespace,
6060
lock: multiprocessing.synchronize.Lock,
6161
executor_queues: List[connection.Connection],
@@ -72,7 +72,7 @@ def __init__(
7272
# to avoid concurrent accept over server socket
7373
self.lock = lock
7474
# Queue over which server socket fd is received on start-up
75-
self.work_queue: connection.Connection = work_queue
75+
self.fd_queue: connection.Connection = fd_queue
7676
# Available executors
7777
self.executor_queues = executor_queues
7878
self.executor_pids = executor_pids
@@ -101,8 +101,11 @@ def run(self) -> None:
101101
self.flags.log_format,
102102
)
103103
self.selector = selectors.DefaultSelector()
104-
fileno = recv_handle(self.work_queue)
105-
self.work_queue.close()
104+
# TODO: Use selector on fd_queue so that we can
105+
# dynamically accept from new fds.
106+
fileno = recv_handle(self.fd_queue)
107+
self.fd_queue.close()
108+
# TODO: Convert to socks i.e. list of fds
106109
self.sock = socket.fromfd(
107110
fileno,
108111
family=self.flags.family,

proxy/core/acceptor/listener.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
import os
12+
import socket
13+
import logging
14+
import argparse
15+
16+
from typing import Optional, Any
17+
18+
from ...common.flag import flags
19+
from ...common.constants import DEFAULT_BACKLOG, DEFAULT_IPV6_HOSTNAME, DEFAULT_PORT
20+
21+
22+
flags.add_argument(
23+
'--backlog',
24+
type=int,
25+
default=DEFAULT_BACKLOG,
26+
help='Default: 100. Maximum number of pending connections to proxy server',
27+
)
28+
29+
flags.add_argument(
30+
'--hostname',
31+
type=str,
32+
default=str(DEFAULT_IPV6_HOSTNAME),
33+
help='Default: ::1. Server IP address.',
34+
)
35+
36+
flags.add_argument(
37+
'--port', type=int, default=DEFAULT_PORT,
38+
help='Default: 8899. Server port.',
39+
)
40+
41+
flags.add_argument(
42+
'--unix-socket-path',
43+
type=str,
44+
default=None,
45+
help='Default: None. Unix socket path to use. ' +
46+
'When provided --host and --port flags are ignored',
47+
)
48+
49+
logger = logging.getLogger(__name__)
50+
51+
52+
class Listener:
53+
54+
def __init__(self, flags: argparse.Namespace) -> None:
55+
self.flags = flags
56+
# Set after binding to a port.
57+
# Stored here separately because ephemeral ports can be used.
58+
self._port: Optional[int] = None
59+
self._socket: Optional[socket.socket] = None
60+
61+
def __enter__(self) -> 'Listener':
62+
self.setup()
63+
return self
64+
65+
def __exit__(self, *args: Any) -> None:
66+
self.shutdown()
67+
68+
def fileno(self) -> Optional[int]:
69+
if not self._socket:
70+
return None
71+
return self._socket.fileno()
72+
73+
def setup(self) -> None:
74+
if self.flags.unix_socket_path:
75+
self._listen_unix_socket()
76+
else:
77+
self._listen_server_port()
78+
if self.flags.unix_socket_path:
79+
logger.info(
80+
'Listening on %s' %
81+
self.flags.unix_socket_path,
82+
)
83+
else:
84+
logger.info(
85+
'Listening on %s:%s' %
86+
(self.flags.hostname, self._port),
87+
)
88+
89+
def shutdown(self) -> None:
90+
assert self._socket
91+
self._socket.close()
92+
if self.flags.unix_socket_path:
93+
os.remove(self.flags.unix_socket_path)
94+
95+
def _listen_unix_socket(self) -> None:
96+
self._socket = socket.socket(self.flags.family, socket.SOCK_STREAM)
97+
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
98+
self._socket.bind(self.flags.unix_socket_path)
99+
self._socket.listen(self.flags.backlog)
100+
self._socket.setblocking(False)
101+
102+
def _listen_server_port(self) -> None:
103+
self._socket = socket.socket(self.flags.family, socket.SOCK_STREAM)
104+
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
105+
self._socket.bind((str(self.flags.hostname), self.flags.port))
106+
self._socket.listen(self.flags.backlog)
107+
self._socket.setblocking(False)
108+
self._port = self._socket.getsockname()[1]

0 commit comments

Comments
 (0)