@@ -23,13 +23,15 @@ use sqlx::{
2323 } ,
2424} ;
2525use tokio:: { fs, select, task:: JoinSet , time} ;
26+ use tokio_util:: sync:: { CancellationToken , DropGuard } ;
2627use tracing:: { info, instrument} ;
2728
2829use crate :: config:: Config ;
2930
3031pub struct InflightActivationStore {
3132 config : InflightActivationStoreConfig ,
3233 shards : Vec < Arc < InflightActivationShard > > ,
34+ _maintenance_shutdown : DropGuard ,
3335}
3436
3537impl InflightActivationStore {
@@ -73,31 +75,37 @@ impl InflightActivationStore {
7375 shards
7476 } ;
7577
78+ let maintenance_shutdown = CancellationToken :: new ( ) ;
79+
7680 for ( i, shard) in shards. iter ( ) . cloned ( ) . enumerate ( ) {
81+ let cancellation = maintenance_shutdown. clone ( ) ;
7782 tokio:: spawn ( async move {
78- let guard = elegant_departure:: get_shutdown_guard ( ) . shutdown_on_drop ( ) ;
79-
8083 let mut timer = time:: interval ( Duration :: from_millis ( config. vacuum_interval_ms ) ) ;
8184
8285 timer. set_missed_tick_behavior ( time:: MissedTickBehavior :: Skip ) ;
8386 loop {
8487 select ! {
8588 _ = timer. tick( ) => {
8689 shard. vacuum_db( ) . await . unwrap_or_else( |_| {
90+ drop( elegant_departure:: get_shutdown_guard( ) . shutdown_on_drop( ) ) ;
8791 panic!( "Failed to run maintenance vacuum on shard {:}" , i)
8892 } ) ;
8993 info!( "ran maintenance vacuum on shard {:}" , i) ;
9094 }
9195
92- _ = guard . wait ( ) => {
96+ _ = cancellation . cancelled ( ) => {
9397 break ;
9498 }
9599 }
96100 }
97101 } ) ;
98102 }
99103
100- Ok ( Self { config, shards } )
104+ Ok ( Self {
105+ config,
106+ shards,
107+ _maintenance_shutdown : maintenance_shutdown. drop_guard ( ) ,
108+ } )
101109 }
102110
103111 fn route ( & self , id : & str ) -> usize {
0 commit comments