Skip to content

Commit 2bebbac

Browse files
authored
[Work] kwargs independent work_klass creation and work core (#1051)
* `kwargs` independent work klass and core * Fix tests * Add a `create` method to base class * Lint fixes
1 parent b49fcbd commit 2bebbac

File tree

17 files changed

+181
-53
lines changed

17 files changed

+181
-53
lines changed

README.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2265,8 +2265,8 @@ usage: -m [-h] [--tunnel-hostname TUNNEL_HOSTNAME] [--tunnel-port TUNNEL_PORT]
22652265
[--tunnel-username TUNNEL_USERNAME]
22662266
[--tunnel-ssh-key TUNNEL_SSH_KEY]
22672267
[--tunnel-ssh-key-passphrase TUNNEL_SSH_KEY_PASSPHRASE]
2268-
[--tunnel-remote-port TUNNEL_REMOTE_PORT] [--enable-events]
2269-
[--threadless] [--threaded] [--num-workers NUM_WORKERS]
2268+
[--tunnel-remote-port TUNNEL_REMOTE_PORT] [--threadless]
2269+
[--threaded] [--num-workers NUM_WORKERS] [--enable-events]
22702270
[--local-executor LOCAL_EXECUTOR] [--backlog BACKLOG]
22712271
[--hostname HOSTNAME] [--port PORT] [--ports PORTS [PORTS ...]]
22722272
[--port-file PORT_FILE] [--unix-socket-path UNIX_SOCKET_PATH]
@@ -2294,7 +2294,7 @@ usage: -m [-h] [--tunnel-hostname TUNNEL_HOSTNAME] [--tunnel-port TUNNEL_PORT]
22942294
[--filtered-client-ips FILTERED_CLIENT_IPS]
22952295
[--filtered-url-regex-config FILTERED_URL_REGEX_CONFIG]
22962296

2297-
proxy.py v2.4.0rc8.dev7+g1871027
2297+
proxy.py v2.4.0rc8.dev17+g59a4335.d20220123
22982298

22992299
options:
23002300
-h, --help show this help message and exit
@@ -2313,9 +2313,6 @@ options:
23132313
--tunnel-remote-port TUNNEL_REMOTE_PORT
23142314
Default: 8899. Remote port which will be forwarded
23152315
locally for proxy.
2316-
--enable-events Default: False. Enables core to dispatch lifecycle
2317-
events. Plugins can be used to subscribe for core
2318-
events.
23192316
--threadless Default: True. Enabled by default on Python 3.8+ (mac,
23202317
linux). When disabled a new thread is spawned to
23212318
handle each client connection.
@@ -2324,6 +2321,9 @@ options:
23242321
handle each client connection.
23252322
--num-workers NUM_WORKERS
23262323
Defaults to number of CPU cores.
2324+
--enable-events Default: False. Enables core to dispatch lifecycle
2325+
events. Plugins can be used to subscribe for core
2326+
events.
23272327
--local-executor LOCAL_EXECUTOR
23282328
Default: 1. Enabled by default. Use 0 to disable. When
23292329
enabled acceptors will make use of local (same
@@ -2422,8 +2422,9 @@ options:
24222422
Default: proxy.http.proxy.auth.AuthPlugin. Auth plugin
24232423
to use instead of default basic auth plugin.
24242424
--cache-dir CACHE_DIR
2425-
Default: A temporary directory. Flag only applicable
2426-
when cache plugin is used with on-disk storage.
2425+
Default: /Users/abhinavsingh/.proxy/cache. Flag only
2426+
applicable when cache plugin is used with on-disk
2427+
storage.
24272428
--proxy-pool PROXY_POOL
24282429
List of upstream proxies to use in the pool
24292430
--enable-web-server Default: False. Whether to enable

examples/ssl_echo_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ class EchoSSLServerHandler(BaseTcpServerHandler[TcpClientConnection]):
2121
"""Wraps client socket during initialization."""
2222

2323
@staticmethod
24-
def create(**kwargs: Any) -> TcpClientConnection: # pragma: no cover
25-
return TcpClientConnection(**kwargs)
24+
def create(*args: Any) -> TcpClientConnection: # pragma: no cover
25+
return TcpClientConnection(*args)
2626

2727
def initialize(self) -> None:
2828
# Acceptors don't perform TLS handshake. Perform the same

examples/task.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
import time
12+
import argparse
13+
import threading
14+
import multiprocessing
15+
from typing import Any
16+
17+
from proxy.core.work import (
18+
Work, ThreadlessPool, BaseLocalExecutor, BaseRemoteExecutor,
19+
)
20+
from proxy.common.flag import FlagParser
21+
from proxy.common.backports import NonBlockingQueue
22+
23+
24+
class Task:
25+
"""This will be our work object."""
26+
27+
def __init__(self, payload: bytes) -> None:
28+
self.payload = payload
29+
print(payload)
30+
31+
32+
class TaskWork(Work[Task]):
33+
"""This will be our handler class, created for each received work."""
34+
35+
@staticmethod
36+
def create(*args: Any) -> Task:
37+
"""Work core doesn't know how to create work objects for us, so
38+
we must provide an implementation of create method here."""
39+
return Task(*args)
40+
41+
42+
class LocalTaskExecutor(BaseLocalExecutor):
43+
"""We'll define a local executor which is capable of receiving
44+
log lines over a non blocking queue."""
45+
46+
def work(self, *args: Any) -> None:
47+
task_id = int(time.time())
48+
uid = '%s-%s' % (self.iid, task_id)
49+
self.works[task_id] = self.create(uid, *args)
50+
51+
52+
class RemoteTaskExecutor(BaseRemoteExecutor):
53+
54+
def work(self, *args: Any) -> None:
55+
task_id = int(time.time())
56+
uid = '%s-%s' % (self.iid, task_id)
57+
self.works[task_id] = self.create(uid, *args)
58+
59+
60+
def start_local(flags: argparse.Namespace) -> None:
61+
work_queue = NonBlockingQueue()
62+
executor = LocalTaskExecutor(iid=1, work_queue=work_queue, flags=flags)
63+
64+
t = threading.Thread(target=executor.run)
65+
t.daemon = True
66+
t.start()
67+
68+
try:
69+
i = 0
70+
while True:
71+
work_queue.put(('%d' % i).encode('utf-8'))
72+
i += 1
73+
except KeyboardInterrupt:
74+
pass
75+
finally:
76+
executor.running.set()
77+
t.join()
78+
79+
80+
def start_remote(flags: argparse.Namespace) -> None:
81+
pipe = multiprocessing.Pipe()
82+
work_queue = pipe[0]
83+
executor = RemoteTaskExecutor(iid=1, work_queue=pipe[1], flags=flags)
84+
85+
p = multiprocessing.Process(target=executor.run)
86+
p.daemon = True
87+
p.start()
88+
89+
try:
90+
i = 0
91+
while True:
92+
work_queue.send(('%d' % i).encode('utf-8'))
93+
i += 1
94+
except KeyboardInterrupt:
95+
pass
96+
finally:
97+
executor.running.set()
98+
p.join()
99+
100+
101+
def start_remote_pool(flags: argparse.Namespace) -> None:
102+
with ThreadlessPool(flags=flags, executor_klass=RemoteTaskExecutor) as pool:
103+
try:
104+
i = 0
105+
while True:
106+
work_queue = pool.work_queues[i % flags.num_workers]
107+
work_queue.send(('%d' % i).encode('utf-8'))
108+
i += 1
109+
except KeyboardInterrupt:
110+
pass
111+
112+
113+
if __name__ == '__main__':
114+
flags = FlagParser.initialize(
115+
['--disable-http-proxy'],
116+
work_klass=TaskWork,
117+
)
118+
start_remote_pool(flags)
119+
# start_remote(flags)
120+
# start_local(flags)

examples/tcp_echo_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ class EchoServerHandler(BaseTcpServerHandler[TcpClientConnection]):
2020
"""Sets client socket to non-blocking during initialization."""
2121

2222
@staticmethod
23-
def create(**kwargs: Any) -> TcpClientConnection: # pragma: no cover
24-
return TcpClientConnection(**kwargs)
23+
def create(*args: Any) -> TcpClientConnection: # pragma: no cover
24+
return TcpClientConnection(*args)
2525

2626
def initialize(self) -> None:
2727
self.work.connection.setblocking(False)

proxy/core/base/tcp_tunnel.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ def handle_data(self, data: memoryview) -> Optional[bool]:
4747
pass # pragma: no cover
4848

4949
@staticmethod
50-
def create(**kwargs: Any) -> TcpClientConnection: # pragma: no cover
51-
return TcpClientConnection(**kwargs)
50+
def create(*args: Any) -> TcpClientConnection: # pragma: no cover
51+
return TcpClientConnection(*args)
5252

5353
def initialize(self) -> None:
5454
self.work.connection.setblocking(False)

proxy/core/connection/pool.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ def __init__(self) -> None:
7676
self.pools: Dict[HostPort, Set[TcpServerConnection]] = {}
7777

7878
@staticmethod
79-
def create(**kwargs: Any) -> TcpServerConnection: # pragma: no cover
80-
return TcpServerConnection(**kwargs)
79+
def create(*args: Any) -> TcpServerConnection: # pragma: no cover
80+
return TcpServerConnection(*args)
8181

8282
def acquire(self, addr: HostPort) -> Tuple[bool, TcpServerConnection]:
8383
"""Returns a reusable connection from the pool.
@@ -154,7 +154,7 @@ def add(self, addr: HostPort) -> TcpServerConnection:
154154
155155
NOTE: You must not use the returned connection, instead use `acquire`.
156156
"""
157-
new_conn = self.create(host=addr[0], port=addr[1])
157+
new_conn = self.create(addr[0], addr[1])
158158
new_conn.connect()
159159
self._add(new_conn)
160160
logger.debug(

proxy/core/work/fd/fd.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,26 +26,16 @@ class ThreadlessFdExecutor(Threadless[T]):
2626
"""A threadless executor which handles file descriptors
2727
and works with read/write events over a socket."""
2828

29-
def work(self, **kwargs: Any) -> None:
30-
fileno: int = kwargs['fileno']
31-
addr: Optional[HostPort] = kwargs.get('addr', None)
32-
conn: Optional[TcpOrTlsSocket] = \
33-
kwargs.get('conn', None)
29+
def work(self, *args: Any) -> None:
30+
fileno: int = args[0]
31+
addr: Optional[HostPort] = args[1]
32+
conn: Optional[TcpOrTlsSocket] = args[2]
3433
conn = conn or socket.fromfd(
3534
fileno, family=socket.AF_INET if self.flags.hostname.version == 4 else socket.AF_INET6,
3635
type=socket.SOCK_STREAM,
3736
)
3837
uid = '%s-%s-%s' % (self.iid, self._total, fileno)
39-
self.works[fileno] = self.flags.work_klass(
40-
self.flags.work_klass.create(
41-
conn=conn,
42-
addr=addr,
43-
),
44-
flags=self.flags,
45-
event_queue=self.event_queue,
46-
uid=uid,
47-
upstream_conn_pool=self._upstream_conn_pool,
48-
)
38+
self.works[fileno] = self.create(uid, conn, addr)
4939
self.works[fileno].publish_event(
5040
event_name=eventNames.WORK_STARTED,
5141
event_payload={'fileno': fileno, 'addr': addr},

proxy/core/work/fd/local.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,4 @@ def initialize(self, work: Any) -> None:
4747
# NOTE: Here we are assuming to receive a connection object
4848
# and not a fileno because we are a LocalExecutor.
4949
fileno = conn.fileno()
50-
self.work(fileno=fileno, addr=addr, conn=conn)
50+
self.work(fileno, addr, conn)

proxy/core/work/fd/remote.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def receive_from_work_queue(self) -> bool:
4343
if not self.flags.unix_socket_path:
4444
addr = self.work_queue.recv()
4545
fileno = recv_handle(self.work_queue)
46-
self.work(fileno=fileno, addr=addr)
46+
self.work(fileno, addr, None)
4747
return False
4848

4949
def work_queue_fileno(self) -> Optional[int]:

proxy/core/work/local.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
:copyright: (c) 2013-present by Abhinav Singh and contributors.
99
:license: BSD, see LICENSE for more details.
1010
"""
11+
import queue
1112
import asyncio
13+
import contextlib
1214
from typing import Any, Optional
1315

1416
from .threadless import Threadless
@@ -30,3 +32,11 @@ def loop(self) -> Optional[asyncio.AbstractEventLoop]:
3032

3133
def work_queue_fileno(self) -> Optional[int]:
3234
return None
35+
36+
def receive_from_work_queue(self) -> bool:
37+
with contextlib.suppress(queue.Empty):
38+
work = self.work_queue.get()
39+
if isinstance(work, bool) and work is False:
40+
return True
41+
self.work(work)
42+
return False

0 commit comments

Comments
 (0)