Skip to content

Commit 3e5d0e9

Browse files
[EventDispatcher] Guard against broken pipe and eof (#1074)
* Guard against broken pipe and eof * refactor * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 05d761b commit 3e5d0e9

File tree

1 file changed

+38
-14
lines changed

1 file changed

+38
-14
lines changed

proxy/core/event/dispatcher.py

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,22 +52,26 @@ def __init__(
5252

5353
def handle_event(self, ev: Dict[str, Any]) -> None:
5454
if ev['event_name'] == eventNames.SUBSCRIBE:
55-
self.subscribers[ev['event_payload']['sub_id']] = \
56-
ev['event_payload']['conn']
55+
sub_id = ev['event_payload']['sub_id']
56+
self.subscribers[sub_id] = ev['event_payload']['conn']
5757
# send ack
58-
ev['event_payload']['conn'].send({
59-
'event_name': eventNames.SUBSCRIBED,
60-
})
58+
if not self._send(
59+
sub_id, {
60+
'event_name': eventNames.SUBSCRIBED,
61+
},
62+
):
63+
self._close_and_delete(sub_id)
6164
elif ev['event_name'] == eventNames.UNSUBSCRIBE:
62-
if ev['event_payload']['sub_id'] in self.subscribers:
65+
sub_id = ev['event_payload']['sub_id']
66+
if sub_id in self.subscribers:
6367
# send ack
6468
logger.debug('unsubscription request ack sent')
65-
self.subscribers[ev['event_payload']['sub_id']].send({
66-
'event_name': eventNames.UNSUBSCRIBED,
67-
})
68-
# close conn and delete subscriber
69-
self.subscribers[ev['event_payload']['sub_id']].close()
70-
del self.subscribers[ev['event_payload']['sub_id']]
69+
self._send(
70+
sub_id, {
71+
'event_name': eventNames.UNSUBSCRIBED,
72+
},
73+
)
74+
self._close_and_delete(sub_id)
7175
else:
7276
logger.info(
7377
'unsubscription request ack not sent, subscriber already gone',
@@ -87,11 +91,12 @@ def run(self) -> None:
8791
self.run_once()
8892
except queue.Empty:
8993
pass
90-
except (BrokenPipeError, EOFError, KeyboardInterrupt):
94+
except KeyboardInterrupt:
9195
pass
9296
except Exception as e:
9397
logger.exception('Dispatcher exception', exc_info=e)
9498
finally:
99+
logger.info('Dispatcher shutdown')
95100
# Send shutdown message to all active subscribers
96101
self._broadcast({
97102
'event_name': eventNames.DISPATCHER_SHUTDOWN,
@@ -106,7 +111,26 @@ def _broadcast(self, ev: Dict[str, Any]) -> None:
106111
logger.warning(
107112
'Subscriber#%s broken pipe', sub_id,
108113
)
109-
self.subscribers[sub_id].close()
114+
self._close(sub_id)
110115
broken_pipes.append(sub_id)
111116
for sub_id in broken_pipes:
112117
del self.subscribers[sub_id]
118+
119+
def _close_and_delete(self, sub_id: str) -> None:
120+
self._close(sub_id)
121+
del self.subscribers[sub_id]
122+
123+
def _close(self, sub_id: str) -> None:
124+
try:
125+
self.subscribers[sub_id].close()
126+
except Exception: # noqa: S110
127+
pass
128+
129+
def _send(self, sub_id: str, payload: Any) -> bool:
130+
done = False
131+
try:
132+
self.subscribers[sub_id].send(payload)
133+
done = True
134+
except (BrokenPipeError, EOFError):
135+
pass
136+
return done

0 commit comments

Comments
 (0)