Skip to content

Commit c3b06f1

Browse files
authored
[Work] Refactor as FD work (#1049)
* Abstract out FD based work implementation * No need of local/remote abstractions * fix type * Add `BaseLocalExecutor` * Fix lint and tests
1 parent ac840ae commit c3b06f1

File tree

13 files changed

+112
-61
lines changed

13 files changed

+112
-61
lines changed

docs/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,5 +322,5 @@
322322
(_py_obj_role, 'proxy.core.work.threadless.T'),
323323
(_py_obj_role, 'proxy.core.work.work.T'),
324324
(_py_obj_role, 'proxy.core.base.tcp_server.T'),
325-
(_py_obj_role, 'proxy.core.work.fd.T'),
325+
(_py_obj_role, 'proxy.core.work.fd.fd.T'),
326326
]

proxy/core/acceptor/acceptor.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
from multiprocessing import connection
2424
from multiprocessing.reduction import recv_handle
2525

26-
from ..work import LocalExecutor, start_threaded_work, delegate_work_to_pool
26+
from ..work import start_threaded_work, delegate_work_to_pool
2727
from ..event import EventQueue
28+
from ..work.fd import LocalFdExecutor
2829
from ...common.flag import flags
2930
from ...common.types import HostPort
3031
from ...common.logger import Logger
@@ -99,7 +100,7 @@ def __init__(
99100
# Internals
100101
self._total: Optional[int] = None
101102
self._local_work_queue: Optional['NonBlockingQueue'] = None
102-
self._local: Optional[LocalExecutor] = None
103+
self._local: Optional[LocalFdExecutor] = None
103104
self._lthread: Optional[threading.Thread] = None
104105

105106
def accept(
@@ -195,7 +196,7 @@ def _recv_and_setup_socks(self) -> None:
195196
def _start_local(self) -> None:
196197
assert self.socks
197198
self._local_work_queue = NonBlockingQueue()
198-
self._local = LocalExecutor(
199+
self._local = LocalFdExecutor(
199200
iid=self.idd,
200201
work_queue=self._local_work_queue,
201202
flags=self.flags,

proxy/core/work/__init__.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@
1414
"""
1515
from .pool import ThreadlessPool
1616
from .work import Work
17-
from .local import LocalExecutor
18-
from .remote import RemoteExecutor
17+
from .local import BaseLocalExecutor
1918
from .delegate import delegate_work_to_pool
2019
from .threaded import start_threaded_work
2120
from .threadless import Threadless
@@ -24,9 +23,8 @@
2423
__all__ = [
2524
'Work',
2625
'Threadless',
27-
'RemoteExecutor',
28-
'LocalExecutor',
2926
'ThreadlessPool',
3027
'delegate_work_to_pool',
3128
'start_threaded_work',
29+
'BaseLocalExecutor',
3230
]

proxy/core/work/fd/__init__.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
from .fd import ThreadlessFdExecutor
12+
from .local import LocalFdExecutor
13+
from .remote import RemoteFdExecutor
14+
15+
16+
__all__ = [
17+
'ThreadlessFdExecutor',
18+
'LocalFdExecutor',
19+
'RemoteFdExecutor',
20+
]
Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
import logging
1313
from typing import Any, TypeVar, Optional
1414

15-
from ..event import eventNames
16-
from .threadless import Threadless
17-
from ...common.types import HostPort, TcpOrTlsSocket
15+
from ...event import eventNames
16+
from ..threadless import Threadless
17+
from ....common.types import HostPort, TcpOrTlsSocket
1818

1919

2020
T = TypeVar('T')
@@ -23,12 +23,10 @@
2323

2424

2525
class ThreadlessFdExecutor(Threadless[T]):
26+
"""A threadless executor which handles file descriptors
27+
and works with read/write events over a socket."""
2628

27-
def work(
28-
self,
29-
*args: Any,
30-
**kwargs: Any,
31-
) -> None:
29+
def work(self, **kwargs: Any) -> None:
3230
fileno: int = kwargs['fileno']
3331
addr: Optional[HostPort] = kwargs.get('addr', None)
3432
conn: Optional[TcpOrTlsSocket] = \

proxy/core/work/fd/local.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
import queue
12+
import asyncio
13+
import contextlib
14+
from typing import Any, Optional
15+
16+
from .fd import ThreadlessFdExecutor
17+
from ....common.backports import NonBlockingQueue
18+
19+
20+
class LocalFdExecutor(ThreadlessFdExecutor[NonBlockingQueue]):
21+
"""A threadless executor implementation which uses a queue to receive new work."""
22+
23+
def __init__(self, *args: Any, **kwargs: Any) -> None:
24+
super().__init__(*args, **kwargs)
25+
self._loop: Optional[asyncio.AbstractEventLoop] = None
26+
27+
@property
28+
def loop(self) -> Optional[asyncio.AbstractEventLoop]:
29+
if self._loop is None:
30+
self._loop = asyncio.get_event_loop_policy().new_event_loop()
31+
return self._loop
32+
33+
def work_queue_fileno(self) -> Optional[int]:
34+
return None
35+
36+
def receive_from_work_queue(self) -> bool:
37+
with contextlib.suppress(queue.Empty):
38+
work = self.work_queue.get()
39+
if isinstance(work, bool) and work is False:
40+
return True
41+
self.initialize(work)
42+
return False
43+
44+
def initialize(self, work: Any) -> None:
45+
assert isinstance(work, tuple)
46+
conn, addr = work
47+
# NOTE: Here we are assuming to receive a connection object
48+
# and not a fileno because we are a LocalExecutor.
49+
fileno = conn.fileno()
50+
self.work(fileno=fileno, addr=addr, conn=conn)
Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,14 @@
99
:license: BSD, see LICENSE for more details.
1010
"""
1111
import asyncio
12-
import logging
1312
from typing import Any, Optional
1413
from multiprocessing import connection
1514
from multiprocessing.reduction import recv_handle
1615

1716
from .fd import ThreadlessFdExecutor
1817

1918

20-
logger = logging.getLogger(__name__)
21-
22-
23-
class RemoteExecutor(ThreadlessFdExecutor[connection.Connection]):
19+
class RemoteFdExecutor(ThreadlessFdExecutor[connection.Connection]):
2420
"""A threadless executor implementation which receives work over a connection.
2521
2622
NOTE: RemoteExecutor uses ``recv_handle`` to accept file descriptors.
@@ -40,12 +36,6 @@ def loop(self) -> Optional[asyncio.AbstractEventLoop]:
4036
self._loop = asyncio.get_event_loop_policy().get_event_loop()
4137
return self._loop
4238

43-
def work_queue_fileno(self) -> Optional[int]:
44-
return self.work_queue.fileno()
45-
46-
def close_work_queue(self) -> None:
47-
self.work_queue.close()
48-
4939
def receive_from_work_queue(self) -> bool:
5040
# Acceptor will not send address for
5141
# unix socket domain environments.
@@ -55,3 +45,9 @@ def receive_from_work_queue(self) -> bool:
5545
fileno = recv_handle(self.work_queue)
5646
self.work(fileno=fileno, addr=addr)
5747
return False
48+
49+
def work_queue_fileno(self) -> Optional[int]:
50+
return self.work_queue.fileno()
51+
52+
def close_work_queue(self) -> None:
53+
self.work_queue.close()

proxy/core/work/local.py

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,14 @@
88
:copyright: (c) 2013-present by Abhinav Singh and contributors.
99
:license: BSD, see LICENSE for more details.
1010
"""
11-
import queue
1211
import asyncio
13-
import logging
14-
import contextlib
1512
from typing import Any, Optional
1613

17-
from .fd import ThreadlessFdExecutor
18-
from ...common.backports import ( # noqa: W0611, F401 pylint: disable=unused-import
19-
NonBlockingQueue,
20-
)
14+
from .threadless import Threadless
15+
from ...common.backports import NonBlockingQueue
2116

2217

23-
logger = logging.getLogger(__name__)
24-
25-
26-
class LocalExecutor(ThreadlessFdExecutor['NonBlockingQueue']):
18+
class BaseLocalExecutor(Threadless[NonBlockingQueue]):
2719
"""A threadless executor implementation which uses a queue to receive new work."""
2820

2921
def __init__(self, *args: Any, **kwargs: Any) -> None:
@@ -38,16 +30,3 @@ def loop(self) -> Optional[asyncio.AbstractEventLoop]:
3830

3931
def work_queue_fileno(self) -> Optional[int]:
4032
return None
41-
42-
def receive_from_work_queue(self) -> bool:
43-
with contextlib.suppress(queue.Empty):
44-
work = self.work_queue.get()
45-
if isinstance(work, bool) and work is False:
46-
return True
47-
assert isinstance(work, tuple)
48-
conn, addr = work
49-
# NOTE: Here we are assuming to receive a connection object
50-
# and not a fileno because we are a LocalExecutor.
51-
fileno = conn.fileno()
52-
self.work(fileno=fileno, addr=addr, conn=conn)
53-
return False

proxy/core/work/pool.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,18 @@
1111
import logging
1212
import argparse
1313
import multiprocessing
14-
from typing import TYPE_CHECKING, Any, List, Optional
14+
from typing import TYPE_CHECKING, Any, List, Type, TypeVar, Optional
1515
from multiprocessing import connection
1616

17-
from .remote import RemoteExecutor
1817
from ...common.flag import flags
1918
from ...common.constants import DEFAULT_THREADLESS, DEFAULT_NUM_WORKERS
2019

2120

2221
if TYPE_CHECKING: # pragma: no cover
2322
from ..event import EventQueue
23+
from .threadless import Threadless
24+
25+
T = TypeVar('T', bound='Threadless[Any]')
2426

2527
logger = logging.getLogger(__name__)
2628

@@ -70,6 +72,7 @@ class ThreadlessPool:
7072
def __init__(
7173
self,
7274
flags: argparse.Namespace,
75+
executor_klass: Type['T'],
7376
event_queue: Optional['EventQueue'] = None,
7477
) -> None:
7578
self.flags = flags
@@ -79,7 +82,9 @@ def __init__(
7982
self.work_pids: List[int] = []
8083
self.work_locks: List['multiprocessing.synchronize.Lock'] = []
8184
# List of threadless workers
82-
self._workers: List[RemoteExecutor] = []
85+
self._executor_klass = executor_klass
86+
# FIXME: Instead of Any type must be the executor klass
87+
self._workers: List[Any] = []
8388
self._processes: List[multiprocessing.Process] = []
8489

8590
def __enter__(self) -> 'ThreadlessPool':
@@ -115,8 +120,8 @@ def _start_worker(self, index: int) -> None:
115120
self.work_locks.append(multiprocessing.Lock())
116121
pipe = multiprocessing.Pipe()
117122
self.work_queues.append(pipe[0])
118-
w = RemoteExecutor(
119-
iid=index,
123+
w = self._executor_klass(
124+
iid=str(index),
120125
work_queue=pipe[1],
121126
flags=self.flags,
122127
event_queue=self.event_queue,

proxy/core/work/threadless.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def work_queue_fileno(self) -> Optional[int]:
120120
raise NotImplementedError()
121121

122122
@abstractmethod
123-
def work(self, *args: Any, **kwargs: Any) -> None:
123+
def work(self, **kwargs: Any) -> None:
124124
raise NotImplementedError()
125125

126126
def close_work_queue(self) -> None:

0 commit comments

Comments
 (0)