1+ use crossterm:: {
2+ event:: { self , Event , KeyCode , KeyModifiers } ,
3+ terminal:: { self } ,
4+ } ;
15use indicatif:: { ProgressBar , ProgressState , ProgressStyle } ;
26use std:: {
37 collections:: HashMap ,
48 fmt:: Write ,
59 io:: SeekFrom ,
610 sync:: { atomic:: AtomicU64 , Arc } ,
11+ time:: Duration ,
712} ;
813use tokio:: {
914 fs:: { create_dir_all, File } ,
1015 io:: { AsyncSeekExt , AsyncWriteExt } ,
16+ signal,
1117} ;
18+ use tokio_util:: sync:: CancellationToken ;
1219use tracing:: trace;
1320
14- use bit_rev:: { session:: Session , utils} ;
21+ use bit_rev:: {
22+ session:: { DownloadState , PieceResult , Session } ,
23+ utils,
24+ } ;
25+
26+ fn graceful_shutdown ( ) {
27+ let _ = terminal:: disable_raw_mode ( ) ;
28+ println ! ( "\n \n Shutting down gracefully..." ) ;
29+ std:: process:: exit ( 0 ) ;
30+ }
1531
1632#[ tokio:: main]
1733async fn main ( ) {
@@ -25,12 +41,14 @@ async fn main() {
2541 let output = std:: env:: args ( ) . nth ( 2 ) ;
2642
2743 if let Err ( err) = download_file ( & filename, output) . await {
44+ let _ = terminal:: disable_raw_mode ( ) ;
2845 eprintln ! ( "Error: {:?}" , err) ;
2946 }
3047}
3148
3249pub async fn download_file ( filename : & str , out_file : Option < String > ) -> anyhow:: Result < ( ) > {
33- let session = Session :: new ( ) ;
50+ let session = Arc :: new ( Session :: new ( ) ) ;
51+ let shutdown_token = CancellationToken :: new ( ) ;
3452
3553 let add_torrent_result = session. add_torrent ( filename. into ( ) ) . await ?;
3654 let torrent = add_torrent_result. torrent . clone ( ) ;
@@ -85,56 +103,198 @@ pub async fn download_file(filename: &str, out_file: Option<String>) -> anyhow::
85103
86104 let total_downloaded = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
87105 let total_downloaded_clone = total_downloaded. clone ( ) ;
106+ let session_clone = session. clone ( ) ;
88107
108+ // Spawn progress update task
89109 tokio:: spawn ( async move {
90110 loop {
91111 let new = total_downloaded_clone. load ( std:: sync:: atomic:: Ordering :: Relaxed ) ;
92112 pb. set_position ( new) ;
93- pb. set_message ( "Downloading" ) ;
113+ let status = match session_clone. get_download_state ( ) {
114+ DownloadState :: Init => "Initializing" ,
115+ DownloadState :: Downloading => "Downloading" ,
116+ DownloadState :: Paused => "Paused" ,
117+ } ;
118+ pb. set_message ( status) ;
94119 tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 100 ) ) . await ;
95120 }
96121 } ) ;
97122
98- let mut hashset = std:: collections:: HashSet :: new ( ) ;
99- while hashset. len ( ) < torrent. piece_hashes . len ( ) {
100- let pr = add_torrent_result. pr_rx . recv_async ( ) . await ?;
123+ // Enable raw mode for single keypress detection
124+ terminal:: enable_raw_mode ( ) . expect ( "Failed to enable raw mode" ) ;
101125
102- hashset. insert ( pr. index ) ;
126+ // Set up Ctrl+C signal handler
127+ let _shutdown_token_signal = shutdown_token. clone ( ) ;
128+ tokio:: spawn ( async move {
129+ let mut sigint = signal:: unix:: signal ( signal:: unix:: SignalKind :: interrupt ( ) )
130+ . expect ( "Failed to install SIGINT handler" ) ;
103131
104- // Map piece to files and write data accordingly
105- let file_mappings = utils :: map_piece_to_files ( & torrent , pr . index as usize ) ;
106- let mut piece_offset = 0 ;
132+ sigint . recv ( ) . await ;
133+ graceful_shutdown ( ) ;
134+ } ) ;
107135
108- for mapping in file_mappings {
109- let file = file_handles. get_mut ( & mapping. file_index ) . ok_or_else ( || {
110- anyhow:: anyhow!( "File handle not found for index {}" , mapping. file_index)
111- } ) ?;
136+ // Spawn keyboard input handler
137+ let session_input = session. clone ( ) ;
138+ let shutdown_token_input = shutdown_token. clone ( ) ;
139+ tokio:: spawn ( async move {
140+ loop {
141+ // Check for cancellation
142+ if shutdown_token_input. is_cancelled ( ) {
143+ break ;
144+ }
145+
146+ if event:: poll ( Duration :: from_millis ( 100 ) ) . unwrap_or ( false ) {
147+ if let Ok ( Event :: Key ( key_event) ) = event:: read ( ) {
148+ match key_event. code {
149+ KeyCode :: Char ( 'p' ) | KeyCode :: Char ( 'P' ) => {
150+ match session_input. get_download_state ( ) {
151+ DownloadState :: Paused => {
152+ session_input. resume ( ) ;
153+ }
154+ DownloadState :: Downloading => {
155+ session_input. pause ( ) ;
156+ }
157+ DownloadState :: Init => {
158+ println ! ( "\r \n Cannot pause during initialization" ) ;
159+ }
160+ }
161+ }
162+ KeyCode :: Char ( 'q' ) | KeyCode :: Char ( 'Q' ) => {
163+ graceful_shutdown ( ) ;
164+ }
165+ KeyCode :: Char ( 'c' )
166+ if key_event. modifiers . contains ( KeyModifiers :: CONTROL ) =>
167+ {
168+ graceful_shutdown ( ) ;
169+ }
170+ _ => { }
171+ }
172+ }
173+ }
174+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
175+ }
176+ } ) ;
112177
113- // Seek to correct position in file
114- file. seek ( SeekFrom :: Start ( mapping. file_offset as u64 ) )
178+ let mut hashset = std:: collections:: HashSet :: new ( ) ;
179+ let mut pending_pieces: Vec < _ > = Vec :: new ( ) ; // Queue for pieces received while paused
180+
181+ while hashset. len ( ) < torrent. piece_hashes . len ( ) {
182+ // Check for shutdown signal
183+ if shutdown_token. is_cancelled ( ) {
184+ break ;
185+ }
186+ // Process any pending pieces first if we're now downloading
187+ if session. get_download_state ( ) == DownloadState :: Downloading && !pending_pieces. is_empty ( )
188+ {
189+ let pieces_to_process = std:: mem:: take ( & mut pending_pieces) ;
190+ for pr in pieces_to_process {
191+ process_piece (
192+ & pr,
193+ & torrent,
194+ & mut file_handles,
195+ & mut hashset,
196+ & total_downloaded,
197+ )
115198 . await ?;
199+ }
200+ }
116201
117- // Write the portion of the piece that belongs to this file
118- let piece_data = & pr. buf [ piece_offset..piece_offset + mapping. length ] ;
119- file. write_all ( piece_data) . await ?;
202+ // Use a timeout to periodically check if we should process pending pieces
203+ let pr_result = tokio:: time:: timeout (
204+ Duration :: from_millis ( 100 ) ,
205+ add_torrent_result. pr_rx . recv_async ( ) ,
206+ )
207+ . await ;
120208
121- piece_offset += mapping. length ;
209+ match pr_result {
210+ Ok ( Ok ( pr) ) => {
211+ // If paused, queue the piece but don't process it yet
212+ if session. get_download_state ( ) != DownloadState :: Downloading {
213+ pending_pieces. push ( pr) ;
214+ continue ;
215+ }
122216
123- trace ! (
124- "Wrote {} bytes to file {} at offset {}" ,
125- mapping. length,
126- mapping. file_index,
127- mapping. file_offset
128- ) ;
217+ // Process piece immediately if downloading
218+ process_piece (
219+ & pr,
220+ & torrent,
221+ & mut file_handles,
222+ & mut hashset,
223+ & total_downloaded,
224+ )
225+ . await ?;
226+ }
227+ Ok ( Err ( _) ) => {
228+ // Channel closed
229+ break ;
230+ }
231+ Err ( _) => {
232+ // Timeout - continue loop to check pending pieces
233+ continue ;
234+ }
129235 }
236+ }
130237
131- total_downloaded. fetch_add ( pr. length as u64 , std:: sync:: atomic:: Ordering :: Relaxed ) ;
238+ // Process any remaining pending pieces at the end
239+ for pr in pending_pieces {
240+ process_piece (
241+ & pr,
242+ & torrent,
243+ & mut file_handles,
244+ & mut hashset,
245+ & total_downloaded,
246+ )
247+ . await ?;
132248 }
133249
134250 // Sync all files
135251 for ( _, file) in file_handles {
136252 file. sync_all ( ) . await ?;
137253 }
138254
255+ // Restore terminal on completion
256+ let _ = terminal:: disable_raw_mode ( ) ;
257+ println ! ( "\n Download completed!" ) ;
258+
139259 Ok ( ( ) )
140260}
261+
262+ async fn process_piece (
263+ pr : & PieceResult ,
264+ torrent : & bit_rev:: torrent:: Torrent ,
265+ file_handles : & mut HashMap < usize , File > ,
266+ hashset : & mut std:: collections:: HashSet < u32 > ,
267+ total_downloaded : & Arc < AtomicU64 > ,
268+ ) -> anyhow:: Result < bool > {
269+ hashset. insert ( pr. index ) ;
270+
271+ // Map piece to files and write data accordingly
272+ let file_mappings = utils:: map_piece_to_files ( torrent, pr. index as usize ) ;
273+ let mut piece_offset = 0 ;
274+
275+ for mapping in file_mappings {
276+ let file = file_handles. get_mut ( & mapping. file_index ) . ok_or_else ( || {
277+ anyhow:: anyhow!( "File handle not found for index {}" , mapping. file_index)
278+ } ) ?;
279+
280+ // Seek to correct position in file
281+ file. seek ( SeekFrom :: Start ( mapping. file_offset as u64 ) )
282+ . await ?;
283+
284+ // Write the portion of the piece that belongs to this file
285+ let piece_data = & pr. buf [ piece_offset..piece_offset + mapping. length ] ;
286+ file. write_all ( piece_data) . await ?;
287+
288+ piece_offset += mapping. length ;
289+
290+ trace ! (
291+ "Wrote {} bytes to file {} at offset {}" ,
292+ mapping. length,
293+ mapping. file_index,
294+ mapping. file_offset
295+ ) ;
296+ }
297+
298+ total_downloaded. fetch_add ( pr. length as u64 , std:: sync:: atomic:: Ordering :: Relaxed ) ;
299+ Ok ( true )
300+ }
0 commit comments