Skip to content

Commit 96ec796

Browse files
authored
[EventCore] Use no_wait when publishing to queues (#1076)
* Use `no_wait` when publishing to the event core queues * Update readme flag version
1 parent 3e5d0e9 commit 96ec796

File tree

9 files changed

+71
-25
lines changed

9 files changed

+71
-25
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2298,7 +2298,7 @@ usage: -m [-h] [--tunnel-hostname TUNNEL_HOSTNAME] [--tunnel-port TUNNEL_PORT]
22982298
[--filtered-client-ips FILTERED_CLIENT_IPS]
22992299
[--filtered-url-regex-config FILTERED_URL_REGEX_CONFIG]
23002300

2301-
proxy.py v2.4.0rc9.dev12+g558a430.d20220126
2301+
proxy.py v2.4.0rc10.dev13+g96428ae.d20220126
23022302

23032303
options:
23042304
-h, --help show this help message and exit

proxy/core/event/dispatcher.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ class EventDispatcher:
3737
When --enable-events is used, dispatcher module is automatically
3838
started. Most importantly, dispatcher module ensures that queue is
3939
not flooded and doesn't utilize too much memory in case there are no
40-
event subscriber is enabled.
40+
event subscribers for published messages.
41+
42+
EventDispatcher ensures that subscribers will receive the messages
43+
in the order they are published.
4144
"""
4245

4346
def __init__(

proxy/core/event/queue.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def publish(
5151
event_payload: Dict[str, Any],
5252
publisher_id: Optional[str] = None,
5353
) -> None:
54-
self.queue.put({
54+
self.queue.put_nowait({
5555
'process_id': os.getpid(),
5656
'thread_id': threading.get_ident(),
5757
'event_timestamp': time.time(),
@@ -71,7 +71,7 @@ def subscribe(
7171
sub_id is a subscription identifier which must be globally
7272
unique. channel MUST be a multiprocessing connection.
7373
"""
74-
self.queue.put({
74+
self.queue.put_nowait({
7575
'event_name': eventNames.SUBSCRIBE,
7676
'event_payload': {'sub_id': sub_id, 'conn': channel},
7777
})
@@ -81,7 +81,7 @@ def unsubscribe(
8181
sub_id: str,
8282
) -> None:
8383
"""Unsubscribe by subscriber id."""
84-
self.queue.put({
84+
self.queue.put_nowait({
8585
'event_name': eventNames.UNSUBSCRIBE,
8686
'event_payload': {'sub_id': sub_id},
8787
})

tutorial/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# PROXY.PY TUTORIAL
2+
3+
This directory contains Jupyter Notebook based tutorial.
4+
5+
- [Welcome](welcome.ipynb)
6+
- [Http Parser](http_parser.ipynb)
7+
- [Responses](responses.ipynb)
8+
- [Requests](requests.ipynb)
9+
- [Eventing](eventing.ipynb)

tutorial/eventing.ipynb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"metadata": {},
6+
"source": [
7+
"# Eventing\n",
8+
"\n",
9+
"Under [working with proxy.py](welcome.ipynb) we saw that plugin instances are not global. So, how to manage shared data? You have 2 options:\n",
10+
"\n",
11+
"1. Use in-built Python provided concepts of shared memory in a `multiprocessing` environment.\n",
12+
"2. Use `proxy.py` provided eventing core\n",
13+
"\n",
14+
"`proxy.py` provides an in-built mechanism to publish and subscribe to messages between processes within a running instance. With a pubsub facility in hand, we can develop variety of models e.g. multi-producer-single-subscriber model. Imagine, publishing one or multiple messages from your plugin instances (per-request) and then processing them in near real-time within a single subscriber thread."
15+
]
16+
}
17+
],
18+
"metadata": {
19+
"language_info": {
20+
"name": "python"
21+
},
22+
"orig_nbformat": 4
23+
},
24+
"nbformat": 4,
25+
"nbformat_minor": 2
26+
}

tutorial/http_parser.ipynb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
},
1414
{
1515
"cell_type": "code",
16-
"execution_count": 1,
16+
"execution_count": 2,
1717
"metadata": {},
1818
"outputs": [
1919
{
@@ -30,7 +30,7 @@
3030
"from proxy.common.constants import HTTP_1_1\n",
3131
"\n",
3232
"get_request = HttpParser(httpParserTypes.REQUEST_PARSER)\n",
33-
"get_request.parse(b'GET / HTTP/1.1\\r\\nHost: jaxl.com\\r\\n\\r\\n')\n",
33+
"get_request.parse(memoryview(b'GET / HTTP/1.1\\r\\nHost: jaxl.com\\r\\n\\r\\n'))\n",
3434
"\n",
3535
"print(get_request.build())\n",
3636
"\n",
@@ -55,7 +55,7 @@
5555
},
5656
{
5757
"cell_type": "code",
58-
"execution_count": 2,
58+
"execution_count": 3,
5959
"metadata": {},
6060
"outputs": [
6161
{
@@ -69,7 +69,7 @@
6969
],
7070
"source": [
7171
"proxy_request = HttpParser(httpParserTypes.REQUEST_PARSER)\n",
72-
"proxy_request.parse(b'GET http://jaxl.com/ HTTP/1.1\\r\\nHost: jaxl.com\\r\\n\\r\\n')\n",
72+
"proxy_request.parse(memoryview(b'GET http://jaxl.com/ HTTP/1.1\\r\\nHost: jaxl.com\\r\\n\\r\\n'))\n",
7373
"\n",
7474
"print(proxy_request.build())\n",
7575
"print(proxy_request.build(for_proxy=True))\n",
@@ -111,7 +111,7 @@
111111
],
112112
"source": [
113113
"connect_request = HttpParser(httpParserTypes.REQUEST_PARSER)\n",
114-
"connect_request.parse(b'CONNECT jaxl.com:443 HTTP/1.1\\r\\nHost: jaxl.com:443\\r\\n\\r\\n')\n",
114+
"connect_request.parse(memoryview(b'CONNECT jaxl.com:443 HTTP/1.1\\r\\nHost: jaxl.com:443\\r\\n\\r\\n'))\n",
115115
"\n",
116116
"print(connect_request.build())\n",
117117
"print(connect_request.build(for_proxy=True))\n",

tutorial/requests.ipynb

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
"name": "stdout",
2323
"output_type": "stream",
2424
"text": [
25-
"b'GET / HTTP/1.1\\r\\nHost: jaxl.com\\r\\nUser-Agent: proxy.py v2.4.0rc9.dev8+gea0253d.d20220126\\r\\n\\r\\n'\n"
25+
"b'GET / HTTP/1.1\\r\\nHost: jaxl.com\\r\\n\\r\\n'\n"
2626
]
2727
}
2828
],
@@ -55,7 +55,7 @@
5555
{
5656
"data": {
5757
"text/plain": [
58-
"b'GET / HTTP/1.1\\r\\nHost: jaxl.com\\r\\nUser-Agent: proxy.py v2.4.0rc9.dev8+gea0253d.d20220126\\r\\n\\r\\n'"
58+
"b'GET / HTTP/1.1\\r\\nHost: jaxl.com\\r\\nUser-Agent: proxy.py v2.4.0rc10.dev13+g96428ae.d20220126\\r\\n\\r\\n'"
5959
]
6060
},
6161
"execution_count": 2,
@@ -154,7 +154,7 @@
154154
{
155155
"data": {
156156
"text/plain": [
157-
"b'GET / HTTP/1.1\\r\\nHost: jaxl.com\\r\\nUser-Agent: proxy.py v2.4.0rc9.dev8+gea0253d.d20220126\\r\\nConnection: close\\r\\n\\r\\n'"
157+
"b'GET / HTTP/1.1\\r\\nHost: jaxl.com\\r\\nUser-Agent: proxy.py v2.4.0rc10.dev13+g96428ae.d20220126\\r\\nConnection: close\\r\\n\\r\\n'"
158158
]
159159
},
160160
"execution_count": 5,
@@ -186,7 +186,7 @@
186186
{
187187
"data": {
188188
"text/plain": [
189-
"b'POST / HTTP/1.1\\r\\nHost: jaxl.com\\r\\nContent-Type: application/x-www-form-urlencoded\\r\\nContent-Length: 21\\r\\nUser-Agent: proxy.py v2.4.0rc9.dev8+gea0253d.d20220126\\r\\nConnection: close\\r\\n\\r\\nkey=value&hello=world'"
189+
"b'POST / HTTP/1.1\\r\\nHost: jaxl.com\\r\\nContent-Type: application/x-www-form-urlencoded\\r\\nContent-Length: 21\\r\\nUser-Agent: proxy.py v2.4.0rc10.dev13+g96428ae.d20220126\\r\\nConnection: close\\r\\n\\r\\nkey=value&hello=world'"
190190
]
191191
},
192192
"execution_count": 6,
@@ -214,16 +214,16 @@
214214
},
215215
{
216216
"cell_type": "code",
217-
"execution_count": 9,
217+
"execution_count": 7,
218218
"metadata": {},
219219
"outputs": [
220220
{
221221
"data": {
222222
"text/plain": [
223-
"b'POST / HTTP/1.1\\r\\nHost: jaxl.com\\r\\nTransfer-Encoding: chunked\\r\\nContent-Type: application/x-www-form-urlencoded\\r\\nUser-Agent: proxy.py v2.4.0rc9.dev8+gea0253d.d20220126\\r\\nConnection: close\\r\\n\\r\\n5\\r\\nkey=v\\r\\n5\\r\\nalue&\\r\\n5\\r\\nhello\\r\\n5\\r\\n=worl\\r\\n1\\r\\nd\\r\\n0\\r\\n\\r\\n'"
223+
"b'POST / HTTP/1.1\\r\\nHost: jaxl.com\\r\\nTransfer-Encoding: chunked\\r\\nContent-Type: application/x-www-form-urlencoded\\r\\nUser-Agent: proxy.py v2.4.0rc10.dev13+g96428ae.d20220126\\r\\nConnection: close\\r\\n\\r\\n5\\r\\nkey=v\\r\\n5\\r\\nalue&\\r\\n5\\r\\nhello\\r\\n5\\r\\n=worl\\r\\n1\\r\\nd\\r\\n0\\r\\n\\r\\n'"
224224
]
225225
},
226-
"execution_count": 9,
226+
"execution_count": 7,
227227
"metadata": {},
228228
"output_type": "execute_result"
229229
}

tutorial/responses.ipynb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
},
1616
{
1717
"cell_type": "code",
18-
"execution_count": 2,
18+
"execution_count": 1,
1919
"metadata": {},
2020
"outputs": [
2121
{
@@ -49,7 +49,7 @@
4949
},
5050
{
5151
"cell_type": "code",
52-
"execution_count": 7,
52+
"execution_count": 2,
5353
"metadata": {},
5454
"outputs": [
5555
{
@@ -77,7 +77,7 @@
7777
},
7878
{
7979
"cell_type": "code",
80-
"execution_count": 5,
80+
"execution_count": 3,
8181
"metadata": {},
8282
"outputs": [
8383
{
@@ -107,7 +107,7 @@
107107
},
108108
{
109109
"cell_type": "code",
110-
"execution_count": 6,
110+
"execution_count": 4,
111111
"metadata": {},
112112
"outputs": [
113113
{
@@ -142,14 +142,14 @@
142142
},
143143
{
144144
"cell_type": "code",
145-
"execution_count": 16,
145+
"execution_count": 5,
146146
"metadata": {},
147147
"outputs": [
148148
{
149149
"name": "stdout",
150150
"output_type": "stream",
151151
"text": [
152-
"b'HTTP/1.1 200 OK\\r\\nX-Custom-Header: my value\\r\\nContent-Encoding: gzip\\r\\nContent-Length: 23\\r\\n\\r\\n\\x1f\\x8b\\x08\\x00F\\x0e\\xf1a\\x02\\xff\\xf3\\xf0\\xc0\\x02\\x00h\\x81?s\\x15\\x00\\x00\\x00'\n"
152+
"b\"HTTP/1.1 200 OK\\r\\nX-Custom-Header: my value\\r\\nContent-Encoding: gzip\\r\\nContent-Length: 23\\r\\n\\r\\n\\x1f\\x8b\\x08\\x00\\x80'\\xf4a\\x02\\xff\\xf3\\xf0\\xc0\\x02\\x00h\\x81?s\\x15\\x00\\x00\\x00\"\n"
153153
]
154154
}
155155
],

tutorial/welcome.ipynb

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,17 @@
1414
"\n",
1515
"Today, `proxy.py` has matured into a full blown networking library with focus on being lightweight, ability to deliver maximum performance while being extendible. Unlike other Python servers, `proxy.py` doesn't need a `WSGI` or `UWSI` frontend, which then usually has to be placed behind a reverse proxy e.g. `Nginx` or `Apache`. Of-course, `proxy.py` can be placed directly behind a load-balancer _(optionally capable of speaking HA proxy protocol)_.\n",
1616
"\n",
17-
"## Asyncio\n",
17+
"## Working with proxy.py\n",
1818
"\n",
19-
"TBD\n",
19+
"To work with `proxy.py`, you must follow these critical concepts:\n",
20+
"\n",
21+
"1. Avoid using synchronous IO operations within your code\n",
22+
"\n",
23+
" `proxy.py` is asynchronous in nature and by making a synchronous call in your plugin code, you may block the entire core event loop. For asynchronous operations, you must tie into the `proxy.py` event loop using the provided plugin APIs.\n",
24+
"\n",
25+
"2. Plugin instances are NOT global\n",
26+
"\n",
27+
" Plugin instances are created for every request. Hence, your plugin code must be written to handle execution of a single request. `proxy.py` will internally take care of concurrency for you.\n",
2028
"\n",
2129
"## The Concept Of Work\n",
2230
"\n",

0 commit comments

Comments
 (0)