Skip to content

Commit de36c60

Browse files
[Examples] Fix broken examples (#1077)
* Fix broken examples * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix mypy * Pylint ignore Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 96ec796 commit de36c60

File tree

11 files changed

+88
-61
lines changed

11 files changed

+88
-61
lines changed

examples/pubsub_eventing.py

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -66,42 +66,37 @@ def publisher_process(
6666
# Create a subscriber.
6767
# Internally, subscribe will start a separate thread
6868
# to receive incoming published messages.
69-
subscriber = EventSubscriber(event_manager.queue, callback=on_event)
70-
subscriber.setup()
71-
72-
# Start a publisher process to demonstrate safe exchange
73-
# of messages between processes.
74-
publisher_shutdown_event = multiprocessing.Event()
75-
publisher = multiprocessing.Process(
76-
target=publisher_process, args=(
77-
publisher_shutdown_event, event_manager.queue, ),
78-
)
79-
publisher.start()
80-
81-
# Dispatch event from main process too
82-
# to demonstrate safe exchange of messages
83-
# between threads.
84-
try:
85-
while True:
86-
event_manager.queue.publish(
87-
request_id='1234',
88-
event_name=eventNames.WORK_STARTED,
89-
event_payload={'time': time.time()},
90-
publisher_id='eventing_pubsub_main',
91-
)
92-
except KeyboardInterrupt:
93-
logger.info('bye!!!')
94-
finally:
95-
# Stop publisher process
96-
publisher_shutdown_event.set()
97-
publisher.join()
98-
# Stop subscriber thread
99-
subscriber.unsubscribe()
100-
logger.info(
101-
'Received {0} events from main thread, {1} events from another process, in {2} seconds'.format(
102-
num_events_received[0], num_events_received[1], time.time(
103-
) - start_time,
104-
),
69+
with EventSubscriber(event_manager.queue, callback=on_event) as subscriber:
70+
# Start a publisher process to demonstrate safe exchange
71+
# of messages between processes.
72+
publisher_shutdown_event = multiprocessing.Event()
73+
publisher = multiprocessing.Process(
74+
target=publisher_process, args=(
75+
publisher_shutdown_event, event_manager.queue, ),
10576
)
106-
if subscriber:
107-
subscriber.shutdown(do_unsubscribe=False)
77+
publisher.start()
78+
79+
# Dispatch event from main process too
80+
# to demonstrate safe exchange of messages
81+
# between threads.
82+
try:
83+
while True:
84+
event_manager.queue.publish(
85+
request_id='1234',
86+
event_name=eventNames.WORK_STARTED,
87+
event_payload={'time': time.time()},
88+
publisher_id='eventing_pubsub_main',
89+
)
90+
except KeyboardInterrupt:
91+
logger.info('KBE!!!')
92+
finally:
93+
# Stop publisher process
94+
publisher_shutdown_event.set()
95+
publisher.join()
96+
logger.info(
97+
'Received {0} events from main thread, {1} events from another process, in {2} seconds'.format(
98+
num_events_received[0], num_events_received[1], time.time(
99+
) - start_time,
100+
),
101+
)
102+
logger.info('Done!!!')

examples/ssl_echo_client.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,29 @@
88
:copyright: (c) 2013-present by Abhinav Singh and contributors.
99
:license: BSD, see LICENSE for more details.
1010
"""
11+
import ssl
1112
import logging
1213

1314
from proxy.core.connection import TcpServerConnection
14-
from proxy.common.constants import DEFAULT_BUFFER_SIZE
15+
from proxy.common.constants import DEFAULT_LOG_FORMAT, DEFAULT_BUFFER_SIZE
1516

1617

18+
logging.basicConfig(level=logging.INFO, format=DEFAULT_LOG_FORMAT)
19+
1720
logger = logging.getLogger(__name__)
1821

1922
if __name__ == '__main__':
20-
client = TcpServerConnection('::', 12345)
23+
client = TcpServerConnection('127.0.0.1', 12345)
2124
client.connect()
22-
client.wrap('example.com', ca_file='ca-cert.pem')
23-
# wrap() will by default set connection to nonblocking
24-
# flip it back to blocking
25-
client.connection.setblocking(True)
25+
client.wrap(
26+
None, # 'localhost',
27+
ca_file='ca-cert.pem',
28+
# For self-signed certs you will have
29+
# to disable verification. Or you can
30+
# add your CA certificate in the CA bundle
31+
# and then enable verify.
32+
verify_mode=ssl.VerifyMode.CERT_NONE,
33+
)
2634
try:
2735
while True:
2836
client.send(b'hello')

examples/ssl_echo_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ def main() -> None:
5252
threadless=True,
5353
num_workers=1,
5454
port=12345,
55-
keyfile='https-key.pem',
56-
certfile='https-signed-cert.pem',
55+
key_file='https-key.pem',
56+
cert_file='https-signed-cert.pem',
5757
):
5858
try:
5959
while True:

examples/tcp_echo_client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@
1111
import logging
1212

1313
from proxy.common.utils import socket_connection
14-
from proxy.common.constants import DEFAULT_BUFFER_SIZE
14+
from proxy.common.constants import DEFAULT_LOG_FORMAT, DEFAULT_BUFFER_SIZE
1515

1616

17+
logging.basicConfig(level=logging.INFO, format=DEFAULT_LOG_FORMAT)
18+
1719
logger = logging.getLogger(__name__)
1820

1921
if __name__ == '__main__':
20-
with socket_connection(('::', 12345)) as client:
22+
with socket_connection(('127.0.0.1', 12345)) as client:
2123
while True:
2224
client.send(b'hello')
2325
data = client.recv(DEFAULT_BUFFER_SIZE)

examples/websocket_client.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@
1414
from proxy.http.websocket import (
1515
WebsocketFrame, WebsocketClient, websocketOpcodes,
1616
)
17+
from proxy.common.constants import DEFAULT_LOG_FORMAT
1718

1819

20+
logging.basicConfig(level=logging.INFO, format=DEFAULT_LOG_FORMAT)
21+
1922
# globals
2023
client: WebsocketClient
2124
last_dispatch_time: float
@@ -47,9 +50,9 @@ def on_message(frame: WebsocketFrame) -> None:
4750
if __name__ == '__main__':
4851
# Constructor establishes socket connection
4952
client = WebsocketClient(
50-
b'echo.websocket.org',
51-
80,
52-
b'/',
53+
b'localhost',
54+
8899,
55+
b'/ws-route-example',
5356
on_message=on_message,
5457
)
5558
# Perform handshake

proxy/common/utils.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,12 +213,15 @@ def find_http_line(raw: bytes) -> Tuple[Optional[bytes], bytes]:
213213

214214

215215
def wrap_socket(
216-
conn: socket.socket, keyfile: str,
217-
certfile: str,
216+
conn: socket.socket,
217+
keyfile: str,
218+
certfile: str,
219+
cafile: Optional[str] = None,
218220
) -> ssl.SSLSocket:
219221
"""Use this to upgrade server_side socket to TLS."""
220222
ctx = ssl.create_default_context(
221223
ssl.Purpose.CLIENT_AUTH,
224+
cafile=cafile,
222225
)
223226
ctx.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
224227
ctx.verify_mode = ssl.CERT_NONE

proxy/core/base/tcp_server.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,19 @@ async def handle_readables(self, readables: Readables) -> bool:
185185
if self.work.connection.fileno() in readables:
186186
try:
187187
data = self.work.recv(self.flags.client_recvbuf_size)
188+
except ConnectionResetError:
189+
logger.info(
190+
'Connection reset by client {0}'.format(
191+
self.work.address,
192+
),
193+
)
194+
return True
188195
except TimeoutError:
189-
logger.info('Client recv timeout error')
196+
logger.info(
197+
'Client recv timeout error {0}'.format(
198+
self.work.address,
199+
),
200+
)
190201
return True
191202
if data is None:
192203
logger.debug(

proxy/core/connection/server.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,19 @@ def connect(
4545

4646
def wrap(
4747
self,
48-
hostname: str,
48+
hostname: Optional[str] = None,
4949
ca_file: Optional[str] = None,
5050
as_non_blocking: bool = False,
51+
# Ref https://github.com/PyCQA/pylint/issues/3691
52+
verify_mode: ssl.VerifyMode = ssl.VerifyMode.CERT_REQUIRED, # pylint: disable=E1101
5153
) -> None:
5254
ctx = ssl.create_default_context(
53-
ssl.Purpose.SERVER_AUTH, cafile=ca_file,
55+
ssl.Purpose.SERVER_AUTH,
56+
cafile=ca_file,
5457
)
5558
ctx.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
56-
ctx.check_hostname = True
59+
ctx.check_hostname = hostname is not None
60+
ctx.verify_mode = verify_mode
5761
self.connection.setblocking(True)
5862
self._conn = ctx.wrap_socket(
5963
self.connection,

proxy/core/event/dispatcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,11 @@ def run(self) -> None:
9999
except Exception as e:
100100
logger.exception('Dispatcher exception', exc_info=e)
101101
finally:
102-
logger.info('Dispatcher shutdown')
103102
# Send shutdown message to all active subscribers
104103
self._broadcast({
105104
'event_name': eventNames.DISPATCHER_SHUTDOWN,
106105
})
106+
logger.info('Dispatcher shutdown')
107107

108108
def _broadcast(self, ev: Dict[str, Any]) -> None:
109109
broken_pipes: List[str] = []

proxy/core/event/queue.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def publish(
5151
event_payload: Dict[str, Any],
5252
publisher_id: Optional[str] = None,
5353
) -> None:
54-
self.queue.put_nowait({
54+
self.queue.put({
5555
'process_id': os.getpid(),
5656
'thread_id': threading.get_ident(),
5757
'event_timestamp': time.time(),
@@ -71,7 +71,7 @@ def subscribe(
7171
sub_id is a subscription identifier which must be globally
7272
unique. channel MUST be a multiprocessing connection.
7373
"""
74-
self.queue.put_nowait({
74+
self.queue.put({
7575
'event_name': eventNames.SUBSCRIBE,
7676
'event_payload': {'sub_id': sub_id, 'conn': channel},
7777
})
@@ -81,7 +81,7 @@ def unsubscribe(
8181
sub_id: str,
8282
) -> None:
8383
"""Unsubscribe by subscriber id."""
84-
self.queue.put_nowait({
84+
self.queue.put({
8585
'event_name': eventNames.UNSUBSCRIBE,
8686
'event_payload': {'sub_id': sub_id},
8787
})

0 commit comments

Comments
 (0)