3
3
import multiprocessing
4
4
import os
5
5
import subprocess
6
- import sys
7
6
import tempfile
8
7
import textwrap
9
8
import uuid
10
9
from multiprocessing import queues
10
+ from multiprocessing .managers import BaseManager
11
11
from typing import AnyStr
12
12
13
13
import pexpect .fdpexpect
16
16
17
17
from .utils import Meta , remove_asci_color_code , to_bytes , to_str , utcnow_str
18
18
19
- if sys .platform == 'darwin' :
20
- _ctx = multiprocessing .get_context ('fork' )
21
- else :
22
- _ctx = multiprocessing .get_context ()
19
+ _ctx = multiprocessing .get_context ('spawn' )
20
+
21
+
22
+ class MessageQueueManager (BaseManager ):
23
+ pass
23
24
24
25
25
26
class MessageQueue (queues .Queue ):
@@ -40,7 +41,7 @@ def put(self, obj, **kwargs):
40
41
_b = to_bytes (obj )
41
42
try :
42
43
super ().put (_b , ** kwargs )
43
- except : # noqa # queue might be closed
44
+ except Exception : # queue might be closed
44
45
pass
45
46
46
47
def write (self , s : AnyStr ):
@@ -53,6 +54,9 @@ def isatty(self):
53
54
return True
54
55
55
56
57
+ MessageQueueManager .register ('MessageQueue' , MessageQueue )
58
+
59
+
56
60
class PexpectProcess (pexpect .fdpexpect .fdspawn ):
57
61
"""
58
62
Use a temp file to gather multiple inputs into one output, and do `pexpect.expect()` from one place.
@@ -146,16 +150,16 @@ def live_print_call(*args, msg_queue: MessageQueue | None = None, expect_returnc
146
150
147
151
class _PopenRedirectProcess (_ctx .Process ):
148
152
def __init__ (self , msg_queue : MessageQueue , logfile : str ):
149
- self ._q = msg_queue
150
-
151
- self .logfile = logfile
152
-
153
- super ().__init__ (target = self ._forward_io , daemon = True ) # killed by the main process
153
+ super ().__init__ (target = self ._forward_io , args = (msg_queue , logfile ), daemon = True )
154
154
155
- def _forward_io (self ) -> None :
156
- with open (self .logfile ) as fr :
155
+ @staticmethod
156
+ def _forward_io (msg_queue , logfile ) -> None :
157
+ with open (logfile ) as fr :
157
158
while True :
158
- self ._q .put (fr .read ())
159
+ try :
160
+ msg_queue .put (fr .read ()) # msg_queue may be closed
161
+ except Exception :
162
+ break
159
163
160
164
161
165
class DuplicateStdoutPopen (subprocess .Popen ):
0 commit comments