Skip to content

Commit 99c98de

Browse files
author
Michael Hammann
committed
fix: added startDependeeProcesses function which spawns dependee processes before. This currently blocks the supervisor runforever loop but does the job
1 parent 549c23a commit 99c98de

File tree

1 file changed

+83
-24
lines changed

1 file changed

+83
-24
lines changed

supervisor/rpcinterface.py

Lines changed: 83 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
from supervisor.events import notify
2626
from supervisor.events import RemoteCommunicationEvent
2727

28+
from supervisor.medusa import asyncore_25 as asyncore
29+
2830
from supervisor.http import NOT_DONE_YET
2931
from supervisor.xmlrpc import (
3032
capped_int,
@@ -283,28 +285,9 @@ def startProcess(self, name, wait=True):
283285
"""
284286
## check if the process is dependent upon any other process and if so make sure that one is in the RUNNING state
285287
group, process = self._getGroupAndProcess(name)
286-
#if process.config.depends_on is not None:
287-
# # keep track of RUNNING childs
288-
# running_childs = set()
289-
# # wait/loop until all childs are running
290-
# while set(process.config.depends_on.keys()) != running_childs:
291-
# for child in process.config.depends_on.values():
292-
# if child.get_state() != ProcessStates.RUNNING:
293-
# # potentially remove child, if it is in running list
294-
# if child.config.name in running_childs:
295-
# running_childs.remove(child.config.name)
296-
# # check if it needs to be started
297-
# if child.state is not (ProcessStates.STARTING or ProcessStates.RUNNING):
298-
# self.startProcess(child.group.config.name + ':' + child.config.name)
299-
# else:
300-
# child.transition()
301-
# msg = ("waiting on dependee process {} to reach running state - currently in {}"
302-
# .format(child.config.name, getProcessStateDescription(child.state)))
303-
# self.supervisord.options.logger.warn(msg)
304-
# else:
305-
# # child is running - add to set
306-
# running_childs.add(child.config.name)
307-
# time.sleep(0.5)
288+
if process.config.depends_on is not None:
289+
self.startDependeeProcesses(process)
290+
308291

309292
self._update('startProcess')
310293
group, process = self._getGroupAndProcess(name)
@@ -330,8 +313,6 @@ def startProcess(self, name, wait=True):
330313

331314
process.spawn(self.supervisord)
332315

333-
#TODO make sure this process is properly spawned in the end and correctly handled now.
334-
335316
# We call reap() in order to more quickly obtain the side effects of
336317
# process.finish(), which reap() eventually ends up calling. This
337318
# might be the case if the spawn() was successful but then the process
@@ -378,6 +359,84 @@ def onwait():
378359

379360
return True
380361

362+
def startDependeeProcesses(self, process, name):
363+
""" Start a Process but make sure that dependee processes are started
364+
before and in RUNNING state. This is a blocking call and a lot of code
365+
is copied from supervisor runforever loop. This needs to be improved
366+
but is currently required to use depends_on in combination with
367+
runningregex and rpcinterfaces such as supervisorctl.
368+
"""
369+
# keep track of RUNNING childs
370+
running_childs = set()
371+
# wait/loop until all childs are running
372+
while set(process.config.depends_on.keys()) != running_childs:
373+
for dependee in process.config.depends_on.values():
374+
if dependee.get_state() != ProcessStates.RUNNING:
375+
# potentially remove child, if it is in running list
376+
if dependee.config.name in running_childs:
377+
running_childs.remove(dependee.config.name)
378+
# check if it needs to be started
379+
if dependee.state is not (ProcessStates.STARTING or ProcessStates.RUNNING):
380+
self.startProcess(dependee.group.config.name + ':' + dependee.config.name)
381+
else:
382+
timeout = 1
383+
socket_map = self.supervisord.options.get_socket_map()
384+
combined_map = {}
385+
combined_map.update(socket_map)
386+
combined_map.update(self.supervisord.get_process_map())
387+
388+
for fd, dispatcher in combined_map.items():
389+
if dispatcher.readable():
390+
self.supervisord.options.poller.register_readable(fd)
391+
if dispatcher.writable():
392+
self.supervisord.options.poller.register_writable(fd)
393+
394+
r, w = self.supervisord.options.poller.poll(timeout)
395+
396+
for fd in r:
397+
if fd in combined_map:
398+
try:
399+
dispatcher = combined_map[fd]
400+
self.supervisord.options.logger.blather(
401+
'read event caused by %(dispatcher)r',
402+
dispatcher=dispatcher)
403+
dispatcher.handle_read_event()
404+
if not dispatcher.readable():
405+
self.supervisord.options.poller.unregister_readable(fd)
406+
except asyncore.ExitNow:
407+
raise
408+
except:
409+
combined_map[fd].handle_error()
410+
411+
for fd in w:
412+
if fd in combined_map:
413+
try:
414+
dispatcher = combined_map[fd]
415+
self.supervisord.options.logger.blather(
416+
'write event caused by %(dispatcher)r',
417+
dispatcher=dispatcher)
418+
dispatcher.handle_write_event()
419+
if not dispatcher.writable():
420+
self.supervisord.options.poller.unregister_writable(fd)
421+
except asyncore.ExitNow:
422+
raise
423+
except:
424+
combined_map[fd].handle_error()
425+
426+
self.supervisord.reap()
427+
if dependee.spawnerr:
428+
dependee.state = ProcessStates.FATAL
429+
raise RPCError(Faults.SPAWN_ERROR, name)
430+
dependee.transition()
431+
432+
msg = ("waiting on dependee process {} to reach running state - currently in {}"
433+
.format(dependee.config.name, getProcessStateDescription(dependee.state)))
434+
self.supervisord.options.logger.info(msg)
435+
else:
436+
# child is running - add to set
437+
running_childs.add(dependee.config.name)
438+
time.sleep(0.5)
439+
381440
def startProcessGroup(self, name, wait=True):
382441
""" Start all processes in the group named 'name'
383442

0 commit comments

Comments
 (0)