Skip to content

Commit 6e74ce8

Browse files
committed
Implement writing statistics data to a writer
1 parent d09e61b commit 6e74ce8

File tree

2 files changed

+121
-40
lines changed

2 files changed

+121
-40
lines changed

src/stats/mod.rs

Lines changed: 116 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
#![allow(dead_code)]
55
#![allow(unused_variables)]
66

7+
use core::fmt::Debug;
78
use std::{
9+
io::Write,
810
net::IpAddr,
911
sync::{
10-
LazyLock,
12+
Arc, LazyLock, Mutex,
1113
atomic::{AtomicI32, AtomicPtr, AtomicU64, Ordering},
1214
},
1315
};
@@ -17,7 +19,7 @@ use chrono::{DateTime, Duration, FixedOffset, Utc};
1719
use papaya::{Guard, HashMap};
1820
use sarlacc::Intern;
1921
use seize::Collector;
20-
use tracing::{info, instrument};
22+
use tracing::{error, info, instrument};
2123

2224
/// The TTL for IP tracking entries, after which they are considered stale and removed.
2325
const IP_TRACKING_TTL: chrono::TimeDelta = Duration::days(1);
@@ -40,34 +42,44 @@ struct IpInfo {
4042
type Counts = HashMap<(Intern<Authority>, Intern<Authority>, Intern<Authority>), AtomicU64>;
4143

4244
/// The counts for all of the possible webring redirects
43-
#[derive(Debug)]
44-
struct AggregatedStats {
45+
struct AggregatedStats<W: Write + Send + 'static> {
4546
/// The collector for the atomic data that we're handling
46-
collector: Collector,
47+
collector: Arc<Collector>,
4748
/// The last date that a redirect was tracked (hopefully today)
4849
today: AtomicI32,
4950
/// (From, To, Started From) → Count
5051
/// Invariant: This MUST ALWAYS be a valid pointer
5152
counter: AtomicPtr<Counts>,
53+
/// The writer for the statistics output file
54+
output: Arc<Mutex<W>>,
5255
}
5356

54-
impl AggregatedStats {
57+
impl<W: Write + Send + 'static> Debug for AggregatedStats<W> {
58+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59+
f.debug_struct("AggregatedStats")
60+
.field("today", &self.today.load(Ordering::Relaxed))
61+
.finish_non_exhaustive()
62+
}
63+
}
64+
65+
/// Convert the current time into the days since the epoch
66+
fn mk_num(time: DateTime<Utc>) -> i32 {
67+
time.date_naive().to_epoch_days()
68+
}
69+
70+
impl<W: Write + Send + 'static> AggregatedStats<W> {
5571
/// Create a new `AggregatedStats`
56-
fn new(now: DateTime<Utc>) -> Self {
72+
fn new(now: DateTime<Utc>, writer: W) -> Self {
5773
let counter: Counts = HashMap::default();
5874

5975
AggregatedStats {
60-
today: AtomicI32::new(Self::mk_num(now)),
76+
today: AtomicI32::new(mk_num(now)),
6177
counter: AtomicPtr::new(Box::into_raw(Box::new(counter))),
62-
collector: Collector::new(),
78+
collector: Arc::new(Collector::new()),
79+
output: Arc::new(Mutex::new(writer)),
6380
}
6481
}
6582

66-
/// Convert the current time into the days since the epoch
67-
fn mk_num(time: DateTime<Utc>) -> i32 {
68-
time.date_naive().to_epoch_days()
69-
}
70-
7183
/// Retrieve the current counter from a guard
7284
fn counter<'a>(&'a self, guard: &'a impl Guard) -> &'a Counts {
7385
// SAFETY: The counter is guaranteed to be a valid pointer and we are using Acquire ordering to synchronize-with its initialization
@@ -76,40 +88,73 @@ impl AggregatedStats {
7688

7789
/// Retrieve the current counter from a guard while updating it if the current time is a new calendar date
7890
fn maybe_update_counter<'a>(&'a self, now: DateTime<Utc>, guard: &'a impl Guard) -> &'a Counts {
79-
let now = AggregatedStats::mk_num(now);
91+
let now = mk_num(now);
8092

8193
let prev_day = self.today.swap(now, Ordering::Relaxed);
8294

8395
if prev_day != now {
8496
let new_counter: *mut Counts = Box::into_raw(Box::new(HashMap::new()));
8597

86-
// Release to synchronize-with `counter`. We don't need Acquire because we won't read the previous pointer.
87-
let prev = guard.swap(&self.counter, new_counter, Ordering::Release);
88-
// SAFETY: The pointer can no longer be accessed now that it has been swapped into `prev`, and `Box::from_raw` is the correct way to drop the pointer.
89-
unsafe {
90-
self.collector
91-
.retire(prev, |ptr, _| drop(Box::from_raw(ptr)));
92-
}
98+
// We need this guard to go into our task so it needs to be owned
99+
let guard_owned = self.collector.enter();
100+
101+
// Release to synchronize-with `counter` and Acquire to ensure that we can see the initialization of the previous one so that we can properly access and write it.
102+
let prev_ptr = guard_owned.swap(&self.counter, new_counter, Ordering::AcqRel);
103+
104+
let output = Arc::clone(&self.output);
105+
106+
// Allow it to be moved to our task
107+
let prev_ptr = prev_ptr as usize;
108+
109+
let this_collector = Arc::clone(&self.collector);
110+
111+
tokio::task::spawn_blocking(move || {
112+
let mut output = output.lock().unwrap();
113+
114+
let prev_ptr = prev_ptr as *mut Counts;
115+
// SAFETY: Since this pointer hasn't been retired yet, we have access to it until we do retire it.
116+
let prev = unsafe { &*prev_ptr }.pin();
117+
118+
for ((from, to, started_from), count) in &prev {
119+
let count = count.load(Ordering::Relaxed);
120+
if let Err(e) = output.write_fmt(format_args!(
121+
"{prev_day},{from},{to},{started_from},{count}\n"
122+
)) {
123+
error!("Error writing statistics: {e}");
124+
}
125+
}
126+
127+
// SAFETY: The pointer can no longer be accessed from a new location since we previously overwrote the atomic pointer, and `Box::from_raw` is the correct way to drop the pointer. This task is also finished with its access to it.
128+
unsafe { this_collector.retire(prev_ptr, |ptr, _| drop(Box::from_raw(ptr))) }
129+
});
93130
}
94131

95132
self.counter(guard)
96133
}
97134
}
98135

99136
/// Statistics tracking for the webring
100-
#[derive(Debug)]
101-
pub struct Stats {
137+
pub struct Stats<W: Write + Send + 'static> {
102138
/// Aggregated statistics
103-
aggregated: AggregatedStats,
139+
aggregated: AggregatedStats<W>,
104140
/// Map of IP information keyed by IP address
105141
ip_tracking: HashMap<IpAddr, IpInfo>,
106142
}
107143

108-
impl Stats {
144+
impl<W: Write + Send + 'static> Debug for Stats<W> {
145+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146+
f.debug_struct("Stats")
147+
.field("aggregated", &self.aggregated)
148+
.field("ip_tracking", &self.ip_tracking)
149+
.finish()
150+
}
151+
}
152+
153+
impl<W: Write + Send> Stats<W> {
109154
/// Creates a new instance of `Stats`.
110-
pub fn new(now: DateTime<Utc>) -> Stats {
155+
pub fn new(now: DateTime<Utc>, writer: W) -> Stats<W> {
111156
Stats {
112-
aggregated: AggregatedStats::new(now),
157+
aggregated: AggregatedStats::new(now, writer),
113158
ip_tracking: HashMap::new(),
114159
}
115160
}
@@ -189,11 +234,13 @@ impl Stats {
189234

190235
#[cfg(test)]
191236
mod tests {
192-
use std::net::IpAddr;
237+
use std::{collections::HashSet, net::IpAddr, str::from_utf8};
193238

194239
use axum::http::uri::Authority;
195240
use chrono::{DateTime, Duration, NaiveDate, Utc};
241+
use indoc::indoc;
196242
use sarlacc::Intern;
243+
use tokio::sync::mpsc::{self, UnboundedReceiver};
197244

198245
use crate::stats::IP_TRACKING_TTL;
199246

@@ -215,9 +262,37 @@ mod tests {
215262
t(timestamp).with_timezone(&TIMEZONE).date_naive()
216263
}
217264

265+
struct TestWriter(mpsc::UnboundedSender<Vec<u8>>);
266+
267+
impl std::io::Write for TestWriter {
268+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
269+
self.0.send(buf.to_owned()).unwrap();
270+
Ok(buf.len())
271+
}
272+
273+
fn flush(&mut self) -> std::io::Result<()> {
274+
Ok(())
275+
}
276+
}
277+
278+
async fn assert_same_data(rx: &mut UnboundedReceiver<Vec<u8>>, expected: &str) {
279+
let mut data = Vec::new();
280+
while data.len() != expected.len() {
281+
data.extend(rx.recv().await.unwrap());
282+
}
283+
assert_eq!(
284+
from_utf8(&data)
285+
.unwrap()
286+
.split('\n')
287+
.collect::<HashSet<_>>(),
288+
expected.split('\n').collect::<HashSet<_>>()
289+
);
290+
}
291+
218292
#[tokio::test]
219293
async fn test_stat_tracking() {
220-
let stats = Stats::new(t(0));
294+
let (tx, mut rx) = mpsc::unbounded_channel();
295+
let stats = Stats::new(t(0), TestWriter(tx));
221296

222297
stats.redirected_impl(a("0.0.0.0"), i("a.com"), i("b.com"), t(0));
223298
stats.redirected_impl(a("0.0.0.0"), i("b.com"), i("c.com"), t(1));
@@ -266,7 +341,18 @@ mod tests {
266341

267342
let day = Duration::days(1);
268343

344+
assert!(rx.is_empty());
269345
stats.redirected_impl(a("0.0.0.0"), i("b.com"), i("c.com"), t(1) + day);
346+
assert_same_data(
347+
&mut rx,
348+
indoc! {"
349+
0,a.com,b.com,a.com,2
350+
0,b.com,c.com,a.com,1
351+
0,b.com,homepage.com,a.com,1
352+
0,homepage.com,c.com,a.com,1
353+
"},
354+
)
355+
.await;
270356
stats.redirected_impl(a("0.0.0.0"), i("c.com"), i("a.com"), t(2) + day);
271357

272358
assert_eq!(stats.aggregated.counter(&guard).len(), 2);

src/webring.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Ring behavior and data structures
22
33
use std::{
4+
io::{Empty, empty},
45
net::IpAddr,
56
path::{Path, PathBuf},
67
sync::{
@@ -184,7 +185,7 @@ pub struct Webring {
184185
/// Discord notifier for notifying members of issues with their sites
185186
notifier: Option<Arc<DiscordNotifier>>,
186187
/// Statistics collected about the ring
187-
stats: Arc<Stats>,
188+
stats: Arc<Stats<Empty>>,
188189
/// Current configuration of the webring, used for detecting changes when reloading
189190
config: Arc<AsyncRwLock<Option<Config>>>,
190191
}
@@ -199,7 +200,7 @@ impl Webring {
199200
members: RwLock::new(member_map_from_config_table(&config.members)),
200201
static_dir_path: config.webring.static_dir.clone(),
201202
homepage: AsyncRwLock::new(None),
202-
stats: Arc::new(Stats::new(Utc::now())),
203+
stats: Arc::new(Stats::new(Utc::now(), empty())),
203204
file_watcher: OnceLock::default(),
204205
base_address: config.webring.base_url(),
205206
notifier: config
@@ -644,7 +645,7 @@ mod tests {
644645
use std::{
645646
collections::HashSet,
646647
fs::{File, OpenOptions},
647-
io::Write as _,
648+
io::{Write as _, empty},
648649
path::PathBuf,
649650
sync::{
650651
Arc, OnceLock, RwLock,
@@ -687,7 +688,7 @@ mod tests {
687688
base_address: Intern::default(),
688689
base_authority: Intern::new("ring.purduehackers.com".parse().unwrap()),
689690
notifier: None,
690-
stats: Arc::new(Stats::new(Utc::now())),
691+
stats: Arc::new(Stats::new(Utc::now(), empty())),
691692
config: Arc::new(AsyncRwLock::new(None)),
692693
}
693694
}
@@ -818,8 +819,6 @@ mod tests {
818819
assert_eq!(*inner, expected);
819820
}
820821

821-
let today = Utc::now().with_timezone(&TIMEZONE).date_naive();
822-
823822
webring.assert_next(
824823
"https://hrovnyak.gitlab.io/bruh/bruh/bruh?bruh=bruh",
825824
Ok("kasad.com"),
@@ -1009,8 +1008,6 @@ mod tests {
10091008
.unwrap();
10101009
let webring = Webring::new(&config);
10111010

1012-
let today = Utc::now().with_timezone(&TIMEZONE).date_naive();
1013-
10141011
let uri = webring
10151012
.random_page(None, "0.0.0.0".parse().unwrap())
10161013
.unwrap();
@@ -1034,8 +1031,6 @@ mod tests {
10341031
"# }).unwrap();
10351032
let webring = Webring::new(&config);
10361033

1037-
let today = Utc::now().with_timezone(&TIMEZONE).date_naive();
1038-
10391034
let uri = webring
10401035
.random_page(
10411036
Some(&"clementine.viridian.page".parse().unwrap()),

0 commit comments

Comments
 (0)