@@ -59,8 +59,8 @@ def __init__(self, options):
5959 self .options = options
6060 self .process_groups = {}
6161 self .ticks = {}
62- self .process_queue = deque ()
63- self .waiting_to_be_spawned = set ()
62+ self .process_spawn_queue = deque ()
63+ self .process_spawn_set = set ()
6464
6565 def main (self ):
6666 if not self .options .first :
@@ -270,33 +270,7 @@ def runforever(self):
270270 for group in pgroups :
271271 group .transition (self )
272272
273- # check for processes that are sheduled but not started
274- if self .process_queue :
275- # spawn if all dependencies are running or process has no dependencies
276- if self .process_queue [- 1 ].config .depends_on is not None :
277- if all ([dependee .get_state () == ProcessStates .RUNNING for dependee in self .process_queue [- 1 ].config .depends_on .values ()]):
278- ready_process = self .process_queue .pop ()
279- ready_process .spawn (self )
280- try :
281- self .waiting_to_be_spawned .remove (ready_process .config .name )
282- except KeyError :
283- pass
284- # add any dependee which is not RUNNING or STARTING to queue
285- else :
286- for dependee in self .process_queue [- 1 ].config .depends_on .values ():
287- if dependee .state is not (ProcessStates .STARTING or ProcessStates .RUNNING ):
288- # Should we check if the process is already in the queue?
289- if dependee .config .name not in self .waiting_to_be_spawned :
290- self .process_queue .append (dependee )
291- self .waiting_to_be_spawned .add (dependee .config .name )
292-
293- else :
294- ready_process = self .process_queue .pop ()
295- ready_process .spawn (self )
296- try :
297- self .waiting_to_be_spawned .remove (ready_process .config .name )
298- except KeyError :
299- pass
273+ self .spawn_dependee_queue ()
300274
301275 self .reap ()
302276 self .handle_signal ()
@@ -373,6 +347,76 @@ def handle_signal(self):
373347 def get_state (self ):
374348 return self .options .mood
375349
350+ def spawn_dependee_queue (self ):
351+ """
352+ check for processes that are sheduled but not started
353+ if the queue is well ordered just pop and spawn all that are ready,
354+ Currently ordering is not considered when adding to queue -
355+ therefore just iterate over queue and spawn all ready processes.
356+ """
357+ if self .process_spawn_queue :
358+ for i in range (len (self .process_spawn_queue )- 1 , - 1 , - 1 ):
359+ if self .process_spawn_queue [i ].config .depends_on is not None :
360+ if any ([dependee .state is ProcessStates .FATAL for dependee in
361+ self .process_spawn_queue [i ].config .depends_on .values ()]):
362+ for process in self .process_spawn_queue :
363+ process .record_spawnerr (
364+ 'Dependee process did not start - set FATAL state for {}'
365+ .format (process .config .name ))
366+ process .change_state (ProcessStates .FATAL )
367+ self .process_spawn_set = set ()
368+ self .process_spawn_queue .clear ()
369+ break
370+ if all ([dependee .state is ProcessStates .RUNNING for dependee in
371+ self .process_spawn_queue [i ].config .depends_on .values ()]):
372+ self .spawn_me = self .process_spawn_queue [i ]
373+ del self .process_spawn_queue [i ]
374+ self .spawn_me .spawn (self )
375+ self .last_spawned = time .time ()
376+ self .last_print = time .time ()
377+ else :
378+ self .spawn_me = self .process_spawn_queue [i ]
379+ del self .process_spawn_queue [i ]
380+ self .spawn_me .spawn (self )
381+ self .last_spawned = time .time ()
382+ self .last_print = time .time ()
383+
384+ try :
385+ if (time .time () - self .last_spawned ) >= self .spawn_me .config .spawn_timeout :
386+ msg = (
387+ "timeout: dependee process {} in {} state for over {} seconds, {} are not spawned"
388+ .format (self .spawn_me .config .name ,
389+ getProcessStateDescription (self .spawn_me .state ),
390+ self .spawn_me .config .spawn_timeout ,
391+ [process .config .name for process in self .process_spawn_queue ]))
392+ self .process_spawn_queue [i ].config .options .logger .warn (msg )
393+ self .spawn_me .record_spawnerr (
394+ 'Process {} stayed for over {} seconds in STARTING' .format (
395+ self .spawn_me .config .name , self .spawn_me .config .spawn_timeout ))
396+ self .spawn_me .change_state (ProcessStates .FATAL )
397+ for process in self .process_spawn_queue :
398+ process .record_spawnerr (
399+ 'Dependee process {} did not start - set FATAL state for {}' .format (
400+ self .spawn_me .config .name , process .config .name ))
401+ process .change_state (ProcessStates .FATAL )
402+ self .process_spawn_set = set ()
403+ self .process_spawn_queue .clear ()
404+ self .last_spawned = time .time ()
405+ elif (time .time () - self .last_print ) >= 5 :
406+ msg = ("waiting for dependee process {} in {} state to be RUNNING"
407+ .format (self .spawn_me .config .name ,
408+ getProcessStateDescription (self .spawn_me .state )))
409+ self .process_spawn_queue [0 ].config .options .logger .info (msg )
410+ self .last_print = time .time ()
411+ except UnboundLocalError :
412+ self .last_spawned = time .time ()
413+ self .last_print = time .time ()
414+
415+
416+ # empty set - all processes from queue are running
417+ else :
418+ self .process_spawn_set = set ()
419+
376420def timeslice (period , when ):
377421 return int (when - (when % period ))
378422
0 commit comments