@@ -80,6 +80,7 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
8080 shutdown : sd ,
8181 containers : make (map [string ]* runc.Container ),
8282 running : make (map [int ][]containerProcess ),
83+ pendingExecs : make (map [* runc.Container ]int ),
8384 exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
8485 }
8586 go s .processExits ()
@@ -113,8 +114,9 @@ type service struct {
113114
114115 containers map [string ]* runc.Container
115116
116- lifecycleMu sync.Mutex
117- running map [int ][]containerProcess // pid -> running process, guarded by lifecycleMu
117+ lifecycleMu sync.Mutex
118+ running map [int ][]containerProcess // pid -> running process, guarded by lifecycleMu
119+ pendingExecs map [* runc.Container ]int // container -> num pending execs, guarded by lifecycleMu
118120 // Subscriptions to exits for PIDs. Adding/deleting subscriptions and
119121 // dereferencing the subscription pointers must only be done while holding
120122 // lifecycleMu.
@@ -129,26 +131,23 @@ type containerProcess struct {
129131}
130132
131133// preStart prepares for starting a container process and handling its exit.
132- // The container being started should be passed in as c when starting the
133- // container init process for an already-created container. c should be nil when
134- // creating a container or when starting an exec.
134+ // The container being started should be passed in as c when starting the container
135+ // init process for an already-created container. c should be nil when creating a
136+ // container or when starting an exec.
135137//
136138// The returned handleStarted closure records that the process has started so
137139// that its exit can be handled efficiently. If the process has already exited,
138- // it handles the exit immediately. handleStarted should be called after the
139- // event announcing the start of the process has been published .
140- // Note that handleStarted needs to be aware of whether s.mu is already held
141- // when it is called. If s.mu has been held, we don't need to lock it when
142- // calling handleProcessExit .
140+ // it handles the exit immediately. In addition, if the process is an exec and
141+ // its container's init process has already exited, that exit is also processed .
142+ // handleStarted should be called after the event announcing the start of the
143+ // process has been published. Note that s.lifecycleMu must not be held when
144+ // calling handleStarted .
143145//
144146// The returned cleanup closure releases resources used to handle early exits.
145147// It must be called before the caller of preStart returns, otherwise severe
146148// memory leaks will occur.
147- func (s * service ) preStart (c * runc.Container ) (handleStarted func (* runc.Container , process.Process , bool ), cleanup func ()) {
149+ func (s * service ) preStart (c * runc.Container ) (handleStarted func (* runc.Container , process.Process ), cleanup func ()) {
148150 exits := make (map [int ][]runcC.Exit )
149-
150- s .lifecycleMu .Lock ()
151- defer s .lifecycleMu .Unlock ()
152151 s .exitSubscribers [& exits ] = struct {}{}
153152
154153 if c != nil {
@@ -168,30 +167,65 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
168167 }
169168 }
170169
171- handleStarted = func (c * runc.Container , p process.Process , muLocked bool ) {
170+ handleStarted = func (c * runc.Container , p process.Process ) {
172171 var pid int
173172 if p != nil {
174173 pid = p .Pid ()
175174 }
176175
176+ _ , init := p .(* process.Init )
177177 s .lifecycleMu .Lock ()
178+
179+ var initExits []runcC.Exit
180+ var initCps []containerProcess
181+ if ! init {
182+ s .pendingExecs [c ]--
183+
184+ initPid := c .Pid ()
185+ iExits , initExited := exits [initPid ]
186+ if initExited && s .pendingExecs [c ] == 0 {
187+ // c's init process has exited before handleStarted was called and
188+ // this is the last pending exec process start - we need to process
189+ // the exit for the init process after processing this exec, so:
190+ // - delete c from the s.pendingExecs map
191+ // - keep the exits for the init pid to process later (after we process
192+ // this exec's exits)
193+ // - get the necessary containerProcesses for the init process (that we
194+ // need to process the exits), and remove them from s.running (which we skipped
195+ // doing in processExits).
196+ delete (s .pendingExecs , c )
197+ initExits = iExits
198+ var skipped []containerProcess
199+ for _ , initPidCp := range s .running [initPid ] {
200+ if initPidCp .Container == c {
201+ initCps = append (initCps , initPidCp )
202+ } else {
203+ skipped = append (skipped , initPidCp )
204+ }
205+ }
206+ if len (skipped ) == 0 {
207+ delete (s .running , initPid )
208+ } else {
209+ s .running [initPid ] = skipped
210+ }
211+ }
212+ }
213+
178214 ees , exited := exits [pid ]
179215 delete (s .exitSubscribers , & exits )
180216 exits = nil
181- if pid == 0 { // no-op
182- s .lifecycleMu .Unlock ()
183- } else if exited {
217+ if pid == 0 || exited {
184218 s .lifecycleMu .Unlock ()
185219 for _ , ee := range ees {
186- if muLocked {
187- s .handleProcessExit (ee , c , p )
188- } else {
189- s .mu .Lock ()
190- s .handleProcessExit (ee , c , p )
191- s .mu .Unlock ()
220+ s .handleProcessExit (ee , c , p )
221+ }
222+ for _ , eee := range initExits {
223+ for _ , cp := range initCps {
224+ s .handleProcessExit (eee , cp .Container , cp .Process )
192225 }
193226 }
194227 } else {
228+ // Process start was successful, add to `s.running`.
195229 s .running [pid ] = append (s .running [pid ], containerProcess {
196230 Container : c ,
197231 Process : p ,
@@ -216,7 +250,9 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
216250 s .mu .Lock ()
217251 defer s .mu .Unlock ()
218252
253+ s .lifecycleMu .Lock ()
219254 handleStarted , cleanup := s .preStart (nil )
255+ s .lifecycleMu .Unlock ()
220256 defer cleanup ()
221257
222258 container , err := runc .NewContainer (ctx , s .platform , r )
@@ -244,7 +280,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
244280 // could happen would also cause the container.Pid() call above to
245281 // nil-deference panic.
246282 proc , _ := container .Process ("" )
247- handleStarted (container , proc , true )
283+ handleStarted (container , proc )
248284
249285 return & taskAPI.CreateTaskResponse {
250286 Pid : uint32 (container .Pid ()),
@@ -264,14 +300,19 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
264300 }
265301
266302 var cinit * runc.Container
303+ s .lifecycleMu .Lock ()
267304 if r .ExecID == "" {
268305 cinit = container
306+ } else {
307+ s .pendingExecs [container ]++
269308 }
270309 handleStarted , cleanup := s .preStart (cinit )
310+ s .lifecycleMu .Unlock ()
271311 defer cleanup ()
312+
272313 p , err := container .Start (ctx , r )
273314 if err != nil {
274- handleStarted (container , p , false )
315+ handleStarted (container , p )
275316 return nil , errdefs .ToGRPC (err )
276317 }
277318
@@ -311,7 +352,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
311352 Pid : uint32 (p .Pid ()),
312353 })
313354 }
314- handleStarted (container , p , false )
355+ handleStarted (container , p )
315356 return & taskAPI.StartResponse {
316357 Pid : uint32 (p .Pid ()),
317358 }, nil
@@ -635,14 +676,27 @@ func (s *service) processExits() {
635676 // Handle the exit for a created/started process. If there's more than
636677 // one, assume they've all exited. One of them will be the correct
637678 // process.
638- cps := s .running [e .Pid ]
639- delete (s .running , e .Pid )
679+ var cps , skipped []containerProcess
680+ for _ , cp := range s .running [e .Pid ] {
681+ if s .pendingExecs [cp .Container ] != 0 {
682+ // This exit relates to a container for which we have pending execs. In
683+ // order to ensure order between execs and the init process for a given
684+ // container, skip processing this exit here and let the `handleStarted`
685+ // closure for the pending exec publish it.
686+ skipped = append (skipped , cp )
687+ } else {
688+ cps = append (cps , cp )
689+ }
690+ }
691+ if len (skipped ) > 0 {
692+ s .running [e .Pid ] = skipped
693+ } else {
694+ delete (s .running , e .Pid )
695+ }
640696 s .lifecycleMu .Unlock ()
641697
642698 for _ , cp := range cps {
643- s .mu .Lock ()
644699 s .handleProcessExit (e , cp .Container , cp .Process )
645- s .mu .Unlock ()
646700 }
647701 }
648702}
0 commit comments