Skip to content

Commit a3460d4

Browse files
authored
feat: add udp support (#3)
1 parent 51ae89b commit a3460d4

File tree

2 files changed

+396
-76
lines changed

2 files changed

+396
-76
lines changed

crates/bit_rev/src/protocol_udp.rs

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,256 @@
1+
use anyhow::{anyhow, Result};
2+
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
3+
use rand::Rng;
4+
use std::io::{Cursor, Read, Write};
5+
use std::net::{SocketAddr, ToSocketAddrs};
6+
use std::time::Duration;
7+
use tokio::net::UdpSocket;
8+
use tokio::time::{timeout, Instant};
9+
use tracing::{debug, info};
110

11+
use crate::file::TorrentMeta;
12+
13+
const PROTOCOL_ID: u64 = 0x41727101980;
14+
const ACTION_CONNECT: u32 = 0;
15+
const ACTION_ANNOUNCE: u32 = 1;
16+
const ACTION_SCRAPE: u32 = 2;
17+
const ACTION_ERROR: u32 = 3;
18+
19+
#[derive(Debug, Clone)]
20+
pub struct UdpTracker {
21+
pub url: String,
22+
pub connection_id: Option<u64>,
23+
pub last_connect: Option<Instant>,
24+
}
25+
26+
#[derive(Debug, Clone)]
27+
pub struct UdpPeer {
28+
pub ip: [u8; 4],
29+
pub port: u16,
30+
}
31+
32+
#[derive(Debug, Clone)]
33+
pub struct UdpAnnounceResponse {
34+
pub action: u32,
35+
pub transaction_id: u32,
36+
pub interval: u32,
37+
pub leechers: u32,
38+
pub seeders: u32,
39+
pub peers: Vec<UdpPeer>,
40+
}
41+
42+
impl UdpTracker {
43+
pub fn new(url: String) -> Self {
44+
Self {
45+
url,
46+
connection_id: None,
47+
last_connect: None,
48+
}
49+
}
50+
51+
pub async fn announce(
52+
&mut self,
53+
torrent_meta: &TorrentMeta,
54+
peer_id: &[u8; 20],
55+
port: u16,
56+
uploaded: u64,
57+
downloaded: u64,
58+
left: u64,
59+
event: u32,
60+
) -> Result<UdpAnnounceResponse> {
61+
// Check if we need to connect/reconnect
62+
if self.connection_id.is_none()
63+
|| self.last_connect.map_or(true, |t| t.elapsed() > Duration::from_secs(60))
64+
{
65+
self.connect().await?;
66+
}
67+
68+
let connection_id = self.connection_id.ok_or_else(|| anyhow!("No connection ID"))?;
69+
70+
let socket = UdpSocket::bind("0.0.0.0:0").await?;
71+
let addr = self.parse_udp_url()?;
72+
// info!("Using UDP tracker at {}", addr);
73+
74+
let transaction_id: u32 = rand::thread_rng().gen();
75+
76+
// Build announce request
77+
let mut request = Vec::new();
78+
request.write_u64::<BigEndian>(connection_id)?;
79+
request.write_u32::<BigEndian>(ACTION_ANNOUNCE)?;
80+
request.write_u32::<BigEndian>(transaction_id)?;
81+
request.write_all(&torrent_meta.info_hash)?;
82+
request.write_all(peer_id)?;
83+
request.write_u64::<BigEndian>(downloaded)?;
84+
request.write_u64::<BigEndian>(left)?;
85+
request.write_u64::<BigEndian>(uploaded)?;
86+
request.write_u32::<BigEndian>(event)?; // 0: none, 1: completed, 2: started, 3: stopped
87+
request.write_u32::<BigEndian>(0)?; // IP address (0 = default)
88+
request.write_u32::<BigEndian>(rand::thread_rng().gen())?; // key
89+
request.write_i32::<BigEndian>(-1)?; // num_want (-1 = default)
90+
request.write_u16::<BigEndian>(port)?;
91+
92+
debug!("Sending UDP announce request to {}", addr);
93+
socket.send_to(&request, addr).await?;
94+
95+
// Receive response with timeout
96+
let mut buf = [0u8; 1024];
97+
let (len, _) = timeout(Duration::from_secs(15), socket.recv_from(&mut buf)).await??;
98+
99+
self.parse_announce_response(&buf[..len], transaction_id)
100+
}
101+
102+
async fn connect(&mut self) -> Result<()> {
103+
let socket = UdpSocket::bind("0.0.0.0:0").await?;
104+
let addr = self.parse_udp_url()?;
105+
106+
let transaction_id: u32 = rand::thread_rng().gen();
107+
108+
// Build connect request
109+
let mut request = Vec::new();
110+
request.write_u64::<BigEndian>(PROTOCOL_ID)?;
111+
request.write_u32::<BigEndian>(ACTION_CONNECT)?;
112+
request.write_u32::<BigEndian>(transaction_id)?;
113+
114+
debug!("Sending UDP connect request to {}", addr);
115+
socket.send_to(&request, addr).await?;
116+
117+
// Receive response with timeout
118+
let mut buf = [0u8; 16];
119+
let (len, _) = timeout(Duration::from_secs(15), socket.recv_from(&mut buf)).await??;
120+
121+
if len < 16 {
122+
return Err(anyhow!("Connect response too short: {} bytes", len));
123+
}
124+
125+
let mut cursor = Cursor::new(&buf[..len]);
126+
let action = cursor.read_u32::<BigEndian>()?;
127+
let response_transaction_id = cursor.read_u32::<BigEndian>()?;
128+
129+
if action == ACTION_ERROR {
130+
let error_msg = String::from_utf8_lossy(&buf[8..len]);
131+
return Err(anyhow!("Tracker error: {}", error_msg));
132+
}
133+
134+
if action != ACTION_CONNECT {
135+
return Err(anyhow!("Invalid action in connect response: {}", action));
136+
}
137+
138+
if response_transaction_id != transaction_id {
139+
return Err(anyhow!("Transaction ID mismatch in connect response"));
140+
}
141+
142+
self.connection_id = Some(cursor.read_u64::<BigEndian>()?);
143+
self.last_connect = Some(Instant::now());
144+
145+
debug!("UDP tracker connected with connection_id: {:?}", self.connection_id);
146+
Ok(())
147+
}
148+
149+
fn parse_udp_url(&self) -> Result<SocketAddr> {
150+
if !self.url.starts_with("udp://") {
151+
return Err(anyhow!("Invalid UDP tracker URL: {}", self.url));
152+
}
153+
154+
let url_without_scheme = &self.url[6..]; // Remove "udp://"
155+
156+
// Split at the first '/' to separate hostname:port from path
157+
let host_port = url_without_scheme.split('/').next()
158+
.ok_or_else(|| anyhow!("Invalid UDP tracker URL format: {}", self.url))?;
159+
160+
let addr = host_port.to_socket_addrs()?.next()
161+
.ok_or_else(|| anyhow!("Could not resolve UDP tracker address: {}", host_port))?;
162+
163+
Ok(addr)
164+
}
165+
166+
fn parse_announce_response(&self, data: &[u8], expected_transaction_id: u32) -> Result<UdpAnnounceResponse> {
167+
if data.len() < 20 {
168+
return Err(anyhow!("Announce response too short: {} bytes", data.len()));
169+
}
170+
171+
let mut cursor = Cursor::new(data);
172+
let action = cursor.read_u32::<BigEndian>()?;
173+
let transaction_id = cursor.read_u32::<BigEndian>()?;
174+
175+
if action == ACTION_ERROR {
176+
let error_msg = String::from_utf8_lossy(&data[8..]);
177+
return Err(anyhow!("Tracker error: {}", error_msg));
178+
}
179+
180+
if action != ACTION_ANNOUNCE {
181+
return Err(anyhow!("Invalid action in announce response: {}", action));
182+
}
183+
184+
if transaction_id != expected_transaction_id {
185+
return Err(anyhow!("Transaction ID mismatch in announce response"));
186+
}
187+
188+
let interval = cursor.read_u32::<BigEndian>()?;
189+
let leechers = cursor.read_u32::<BigEndian>()?;
190+
let seeders = cursor.read_u32::<BigEndian>()?;
191+
192+
let mut peers = Vec::new();
193+
let remaining_bytes = data.len() - 20;
194+
let peer_count = remaining_bytes / 6; // Each peer is 6 bytes (4 IP + 2 port)
195+
196+
for _ in 0..peer_count {
197+
let mut ip = [0u8; 4];
198+
cursor.read_exact(&mut ip)?;
199+
let port = cursor.read_u16::<BigEndian>()?;
200+
201+
peers.push(UdpPeer { ip, port });
202+
}
203+
204+
debug!("UDP announce response: {} seeders, {} leechers, {} peers", seeders, leechers, peers.len());
205+
206+
Ok(UdpAnnounceResponse {
207+
action,
208+
transaction_id,
209+
interval,
210+
leechers,
211+
seeders,
212+
peers,
213+
})
214+
}
215+
}
216+
217+
impl UdpPeer {
218+
pub fn to_socket_addr(&self) -> SocketAddr {
219+
SocketAddr::from((self.ip, self.port))
220+
}
221+
}
222+
223+
pub async fn request_udp_peers(
224+
tracker_url: &str,
225+
torrent_meta: &TorrentMeta,
226+
peer_id: &[u8; 20],
227+
port: u16,
228+
) -> Result<UdpAnnounceResponse> {
229+
let mut tracker = UdpTracker::new(tracker_url.to_string());
230+
231+
let uploaded = 0;
232+
let downloaded = 0;
233+
let left = torrent_meta.torrent_file.info.length.unwrap_or(0) as u64;
234+
let event = 2; // started event
235+
236+
tracker.announce(torrent_meta, peer_id, port, uploaded, downloaded, left, event).await
237+
}
238+
239+
#[cfg(test)]
240+
mod tests {
241+
use super::*;
242+
243+
#[test]
244+
fn test_parse_udp_url() {
245+
let tracker = UdpTracker::new("udp://tracker.example.com:8080/announce".to_string());
246+
let result = tracker.parse_udp_url();
247+
assert!(result.is_ok());
248+
}
249+
250+
#[test]
251+
fn test_invalid_udp_url() {
252+
let tracker = UdpTracker::new("http://tracker.example.com:8080/announce".to_string());
253+
let result = tracker.parse_udp_url();
254+
assert!(result.is_err());
255+
}
256+
}

0 commit comments

Comments
 (0)