11use serde_bencode:: de;
22use std:: sync:: { atomic:: AtomicBool , Arc , Mutex } ;
3- use tokio:: { select, sync:: Semaphore } ;
3+ use tokio:: { select, sync:: Semaphore , time :: sleep } ;
44use tracing:: { debug, error} ;
55
66use crate :: {
@@ -11,7 +11,7 @@ use crate::{
1111 } ,
1212 peer_state:: PeerStates ,
1313 protocol_udp:: request_udp_peers,
14- session:: { PieceResult , PieceWork } ,
14+ session:: { DownloadState , PieceResult , PieceWork } ,
1515} ;
1616
1717#[ derive( Debug , Clone ) ]
@@ -23,6 +23,7 @@ pub struct TrackerPeers {
2323 pub piece_rx : flume:: Receiver < FullPiece > ,
2424 pub pr_rx : flume:: Receiver < PieceResult > ,
2525 pub have_broadcast : Arc < tokio:: sync:: broadcast:: Sender < u32 > > ,
26+ pub download_state : Arc < Mutex < DownloadState > > ,
2627}
2728
2829impl TrackerPeers {
@@ -33,6 +34,7 @@ impl TrackerPeers {
3334 peer_states : Arc < PeerStates > ,
3435 have_broadcast : Arc < tokio:: sync:: broadcast:: Sender < u32 > > ,
3536 pr_rx : flume:: Receiver < PieceResult > ,
37+ download_state : Arc < Mutex < DownloadState > > ,
3638 ) -> TrackerPeers {
3739 let ( sender, receiver) = flume:: unbounded ( ) ;
3840 TrackerPeers {
@@ -43,9 +45,31 @@ impl TrackerPeers {
4345 pr_rx,
4446 peer_states,
4547 have_broadcast,
48+ download_state,
4649 }
4750 }
4851
52+ pub fn set_download_state ( & self , state : DownloadState ) {
53+ let mut current_state = self . download_state . lock ( ) . unwrap ( ) ;
54+ * current_state = state;
55+ }
56+
57+ pub fn get_download_state ( & self ) -> DownloadState {
58+ * self . download_state . lock ( ) . unwrap ( )
59+ }
60+
61+ pub fn is_paused ( & self ) -> bool {
62+ self . get_download_state ( ) == DownloadState :: Paused
63+ }
64+
65+ pub fn is_downloading ( & self ) -> bool {
66+ self . get_download_state ( ) == DownloadState :: Downloading
67+ }
68+
69+ pub fn is_init ( & self ) -> bool {
70+ self . get_download_state ( ) == DownloadState :: Init
71+ }
72+
4973 pub async fn connect ( & self , pieces_of_work : Vec < PieceWork > ) {
5074 let info_hash = self . torrent_meta . info_hash ;
5175 let peer_id = self . peer_id ;
@@ -70,6 +94,7 @@ impl TrackerPeers {
7094 let peer_states = self . peer_states . clone ( ) ;
7195 let piece_tx = self . piece_tx . clone ( ) ;
7296 let have_broadcast = self . have_broadcast . clone ( ) ;
97+ let download_state = self . download_state . clone ( ) ;
7398 let torrent_downloaded_state = Arc :: new ( TorrentDownloadedState {
7499 semaphore : Semaphore :: new ( 1 ) ,
75100 pieces : pieces_of_work
@@ -84,13 +109,21 @@ impl TrackerPeers {
84109 } ) ;
85110 tokio:: spawn ( async move {
86111 loop {
112+ // Wait while not downloading
113+ while {
114+ let state = * download_state. lock ( ) . unwrap ( ) ;
115+ state != DownloadState :: Downloading
116+ } {
117+ sleep ( std:: time:: Duration :: from_millis ( 100 ) ) . await ;
118+ }
87119 // Handle TCP trackers
88120 for tracker in tcp_trackers. clone ( ) {
89121 let torrent_meta = torrent_meta. clone ( ) ;
90122 let peer_states = peer_states. clone ( ) ;
91123 let piece_tx = piece_tx. clone ( ) ;
92124 let have_broadcast = have_broadcast. clone ( ) ;
93125 let torrent_downloaded_state = torrent_downloaded_state. clone ( ) ;
126+ let download_state = download_state. clone ( ) ;
94127 tokio:: spawn ( async move {
95128 let url = file:: build_tracker_url ( & torrent_meta, & peer_id, 6881 , & tracker)
96129 . map_err ( |e| {
@@ -105,12 +138,15 @@ impl TrackerPeers {
105138 Ok ( new_peers) => {
106139 process_peers (
107140 new_peers,
108- info_hash,
109- peer_id,
110- peer_states. clone ( ) ,
111- piece_tx. clone ( ) ,
112- have_broadcast. clone ( ) ,
113- torrent_downloaded_state. clone ( ) ,
141+ PeerProcessorConfig {
142+ info_hash,
143+ peer_id,
144+ peer_states : peer_states. clone ( ) ,
145+ piece_tx : piece_tx. clone ( ) ,
146+ have_broadcast : have_broadcast. clone ( ) ,
147+ torrent_downloaded_state : torrent_downloaded_state. clone ( ) ,
148+ download_state : download_state. clone ( ) ,
149+ } ,
114150 )
115151 . await ;
116152
@@ -141,6 +177,7 @@ impl TrackerPeers {
141177 let piece_tx = piece_tx. clone ( ) ;
142178 let have_broadcast = have_broadcast. clone ( ) ;
143179 let torrent_downloaded_state = torrent_downloaded_state. clone ( ) ;
180+ let download_state = download_state. clone ( ) ;
144181 tokio:: spawn ( async move {
145182 match request_udp_peers ( & tracker, & torrent_meta, & peer_id, 6881 ) . await {
146183 Ok ( udp_response) => {
@@ -156,12 +193,15 @@ impl TrackerPeers {
156193
157194 process_peers (
158195 new_peers,
159- info_hash,
160- peer_id,
161- peer_states. clone ( ) ,
162- piece_tx. clone ( ) ,
163- have_broadcast. clone ( ) ,
164- torrent_downloaded_state. clone ( ) ,
196+ PeerProcessorConfig {
197+ info_hash,
198+ peer_id,
199+ peer_states : peer_states. clone ( ) ,
200+ piece_tx : piece_tx. clone ( ) ,
201+ have_broadcast : have_broadcast. clone ( ) ,
202+ torrent_downloaded_state : torrent_downloaded_state. clone ( ) ,
203+ download_state : download_state. clone ( ) ,
204+ } ,
165205 )
166206 . await ;
167207
@@ -185,24 +225,36 @@ impl TrackerPeers {
185225 }
186226}
187227
188- async fn process_peers (
189- new_peers : Vec < std:: net:: SocketAddr > ,
228+ struct PeerProcessorConfig {
190229 info_hash : [ u8 ; 20 ] ,
191230 peer_id : [ u8 ; 20 ] ,
192231 peer_states : Arc < PeerStates > ,
193232 piece_tx : flume:: Sender < FullPiece > ,
194233 have_broadcast : Arc < tokio:: sync:: broadcast:: Sender < u32 > > ,
195234 torrent_downloaded_state : Arc < TorrentDownloadedState > ,
196- ) {
235+ download_state : Arc < Mutex < DownloadState > > ,
236+ }
237+
238+ async fn process_peers ( new_peers : Vec < std:: net:: SocketAddr > , config : PeerProcessorConfig ) {
239+ let info_hash = config. info_hash ;
240+ let peer_id = config. peer_id ;
241+
197242 for peer in new_peers {
198- if peer_states. clone ( ) . states . contains_key ( & peer) {
243+ // Skip processing new peers if not downloading
244+ let current_state = * config. download_state . lock ( ) . unwrap ( ) ;
245+ if current_state != DownloadState :: Downloading {
246+ continue ;
247+ }
248+
249+ if config. peer_states . clone ( ) . states . contains_key ( & peer) {
199250 continue ;
200251 }
201252
202- let piece_tx = piece_tx. clone ( ) ;
203- let have_broadcast = have_broadcast. clone ( ) ;
204- let torrent_downloaded_state = torrent_downloaded_state. clone ( ) ;
205- let peer_states = peer_states. clone ( ) ;
253+ let piece_tx = config. piece_tx . clone ( ) ;
254+ let have_broadcast = config. have_broadcast . clone ( ) ;
255+ let torrent_downloaded_state = config. torrent_downloaded_state . clone ( ) ;
256+ let peer_states = config. peer_states . clone ( ) ;
257+ let download_state = config. download_state . clone ( ) ;
206258
207259 tokio:: spawn ( async move {
208260 let unchoke_notify = tokio:: sync:: Notify :: new ( ) ;
@@ -215,6 +267,7 @@ async fn process_peers(
215267 peer_writer_tx. clone ( ) ,
216268 peer_states. clone ( ) ,
217269 torrent_downloaded_state. clone ( ) ,
270+ download_state. clone ( ) ,
218271 ) ) ;
219272
220273 let peer_connection =
0 commit comments