diff --git a/Cargo.lock b/Cargo.lock
index c3c94db..e85284a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -17,28 +17,6 @@ dependencies = [
  "libc",
 ]
 
-[[package]]
-name = "async-stream"
-version = "0.3.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
-dependencies = [
- "async-stream-impl",
- "futures-core",
- "pin-project-lite",
-]
-
-[[package]]
-name = "async-stream-impl"
-version = "0.3.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn",
-]
-
 [[package]]
 name = "async-trait"
 version = "0.1.89"
@@ -374,9 +352,7 @@ dependencies = [
 name = "data-exporter"
 version = "0.1.0"
 dependencies = [
- "async-stream",
 "axum",
- "bytes",
 "chrono",
 "dotenvy",
 "futures",
diff --git a/Cargo.toml b/Cargo.toml
index 6317fd5..ae30c24 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,18 +1,16 @@
 [package]
 name = "data-exporter"
 version = "0.1.0"
-edition = "2024"
+edition = "2021"
 
 [dependencies]
-async-stream = "0.3"
 axum = "0.8.7"
-bytes = "1.9"
 chrono = { version = "0.4.42", features = ["serde"] }
+dotenvy = "0.15"
 futures = "0.3"
+rust-s3 = "0.37.1"
 serde = "1.0.228"
 serde_json = "1.0"
 sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "uuid", "chrono"] }
-uuid = { version = "1.11.0", features = ["serde"] }
 tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread"] }
-rust-s3 = "0.37.1"
-dotenvy = "0.15"
+uuid = { version = "1.11.0", features = ["serde"] }
diff --git a/purge.ts b/purge.ts
new file mode 100644
index 0000000..8e0d590
--- /dev/null
+++ b/purge.ts
@@ -0,0 +1,53 @@
+import { S3Client } from "bun";
+
+const client = new S3Client({
+  accessKeyId: process.env.S3_ACCESS_KEY_ID,
+  secretAccessKey: process.env.S3_SECRET_ACCESS_KEY,
+  bucket: process.env.S3_BUCKET,
+  region: process.env.S3_REGION,
+  endpoint: process.env.S3_ENDPOINT,
+});
+
+async function purgeBucket() {
+  console.log(`Starting purge for bucket: ${process.env.S3_BUCKET}...`);
+  let totalDeleted = 0;
+  let isTruncated = true;
+  let startAfter: string | undefined = undefined;
+
+  try {
+    while (isTruncated) {
+      // List objects in the bucket
+      const response = await client.list({
+        maxKeys: 1000,
+        startAfter,
+      });
+
+      const objects = response.contents || [];
+
+      if (objects.length === 0) {
+        break;
+      }
+
+      // Perform deletions in parallel for the current batch
+      await Promise.all(
+        objects.map((obj) => {
+          console.log(`Deleting: ${obj.key}`);
+          return client.delete(obj.key);
+        })
+      );
+
+      totalDeleted += objects.length;
+      isTruncated = response.isTruncated;
+      if (isTruncated && objects.length > 0) {
+        startAfter = objects[objects.length - 1].key;
+      }
+    }
+
+    console.log(`\nSuccessfully purged ${totalDeleted} objects.`);
+  } catch (error) {
+    console.error("Error purging bucket:", error);
+    process.exit(1);
+  }
+}
+
+purgeBucket();
\ No newline at end of file
diff --git a/src/handler.rs b/src/handler.rs
index b25afef..9b365a5 100644
--- a/src/handler.rs
+++ b/src/handler.rs
@@ -1,14 +1,11 @@
 use crate::models::{AppState, DataEntry, ExportRequest, Project};
-use crate::s3_helpers;
-use bytes::Bytes;
-
 use axum::{
     extract::{Path, State},
-    http::{StatusCode, header::LOCATION},
+    http::{header::LOCATION, StatusCode},
     response::{IntoResponse, Response},
 };
 use futures::stream::StreamExt;
-use sqlx::Row;
+use tokio::io::AsyncWriteExt;
 
 pub async fn export(
     State(state): State<AppState>,
@@ -42,82 +39,144 @@ pub async fn export(
 
     let s3_key = format!("exports/{}.json", token);
 
-    let file_exists = s3_helpers::check_file_exists(&state.s3_bucket, &s3_key)
+    let file_exists = state.s3_bucket.object_exists(&s3_key).await.map_err(|e| {
+        println!("Error checking S3 file existence: {:?}", e);
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?;
+
+    if !file_exists {
+        stream_export_to_s3(&state, export_request.project_id, &s3_key).await?;
+    }
+
+    let presigned_url = state
+        .s3_bucket
+        .presign_get(&s3_key, 300, None)
         .await
         .map_err(|e| {
-            println!("Error checking S3 file existence: {:?}", e);
+            println!("Error generating presigned URL: {:?}", e);
             StatusCode::INTERNAL_SERVER_ERROR
         })?;
 
-    if !file_exists {
-        let export_data = generate_export_data(&state, export_request.project_id).await?;
+    Ok((StatusCode::FOUND, [(LOCATION, presigned_url.as_str())]).into_response())
+}
 
-        s3_helpers::upload_file(&state.s3_bucket, &s3_key, export_data)
-            .await
-            .map_err(|e| {
-                println!("Error uploading to S3: {:?}", e);
-                StatusCode::INTERNAL_SERVER_ERROR
-            })?;
+fn indent_json(json: &[u8], spaces: usize) -> Vec<u8> {
+    let mut result = Vec::with_capacity(json.len() + (json.len() >> 3));
+
+    for &byte in json {
+        result.push(byte);
+        if byte == b'\n' {
+            result.extend(std::iter::repeat_n(b' ', spaces));
+        }
+    }
+
+    while result.last() == Some(&b' ') {
+        result.pop();
     }
 
-    let presigned_url = s3_helpers::generate_presigned_url(&state.s3_bucket, &s3_key, 300)
+    result
+}
+
+async fn stream_export_to_s3(
+    state: &AppState,
+    project_id: sqlx::types::Uuid,
+    s3_key: &str,
+) -> Result<(), StatusCode> {
+    let (writer, mut reader) = tokio::io::duplex(128 * 1024);
+
+    let pool = state.pool.clone();
+    let s3_bucket = state.s3_bucket.clone();
+    let s3_key_owned = s3_key.to_string();
+
+    let writer_handle = tokio::spawn(async move {
+        let project = sqlx::query_as::<_, Project>(
+            "SELECT id, name, token, slug, private, template_id, created_at, owner_id FROM project WHERE id = $1"
+        )
+        .bind(project_id)
+        .fetch_optional(&pool)
+        .await
+        .map_err(std::io::Error::other)?
+        .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "Project not found"))?;
+
+        write_export_json(writer, project, &pool, project_id).await
+    });
+
+    let upload_handle = tokio::spawn(async move {
+        s3_bucket
+            .put_object_stream(&mut reader, &s3_key_owned)
+            .await
+            .map_err(std::io::Error::other)
+    });
+
+    writer_handle
         .await
         .map_err(|e| {
-            println!("Error generating presigned URL: {:?}", e);
+            println!("Writer task failed: {:?}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .map_err(|e| {
+            println!("Error writing JSON: {:?}", e);
             StatusCode::INTERNAL_SERVER_ERROR
         })?;
 
-    Ok((StatusCode::FOUND, [(LOCATION, presigned_url.as_str())]).into_response())
+    upload_handle
+        .await
+        .map_err(|e| {
+            println!("Upload task failed: {:?}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
+        .map_err(|e| {
+            println!("Error uploading to S3: {:?}", e);
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?;
+
+    Ok(())
 }
 
-async fn generate_export_data(
-    state: &AppState,
+async fn write_export_json<W>(
+    mut writer: W,
+    project: Project,
+    pool: &sqlx::PgPool,
     project_id: sqlx::types::Uuid,
-) -> Result<Bytes, StatusCode> {
-    let project = sqlx::query_as::<_, Project>(
-        "SELECT id, name, token, slug, private, template_id, created_at, owner_id FROM project WHERE id = $1"
-    )
-    .bind(project_id)
-    .fetch_optional(&state.pool)
-    .await
-    .map_err(|e| {
-        println!("Error while fetching project: {:?}", e);
-        StatusCode::INTERNAL_SERVER_ERROR
-    })?
-    .ok_or(StatusCode::NOT_FOUND)?;
+) -> Result<(), std::io::Error>
+where
+    W: tokio::io::AsyncWrite + Unpin,
+{
+    let mut buffer = Vec::with_capacity(131072);
+
+    writer.write_all(b"{\n  \"project\": ").await?;
+
+    serde_json::to_writer_pretty(&mut buffer, &project).map_err(std::io::Error::other)?;
 
-    let mut data_entries = Vec::new();
-    let mut row_stream = sqlx::query(
+    let indented = indent_json(&buffer, 2);
+    writer.write_all(&indented).await?;
+
+    writer.write_all(b",\n  \"data_entries\": [\n").await?;
+
+    let mut entries_stream = sqlx::query_as::<_, DataEntry>(
         "SELECT data, created_at FROM data_entries WHERE project_id = $1 ORDER BY created_at DESC",
     )
     .bind(project_id)
-    .fetch(&state.pool);
-
-    while let Some(row) = row_stream.next().await {
-        match row {
-            Ok(row) => {
-                let data_entry = DataEntry {
-                    data: row.try_get("data").ok(),
-                    created_at: row.get("created_at"),
-                };
-                data_entries.push(data_entry);
-            }
-            Err(e) => {
-                println!("Error while streaming data entry: {:?}", e);
-                return Err(StatusCode::INTERNAL_SERVER_ERROR);
-            }
+    .fetch(pool);
+
+    let mut first = true;
+    while let Some(entry_result) = entries_stream.next().await {
+        let entry = entry_result.map_err(std::io::Error::other)?;
+
+        if !first {
+            writer.write_all(b",\n").await?;
         }
-    }
+        first = false;
 
-    let export_data = serde_json::json!({
-        "project": project,
-        "data_entries": data_entries
-    });
+        writer.write_all(b"    ").await?;
 
-    let json_string = serde_json::to_string_pretty(&export_data).map_err(|e| {
-        println!("Error while serializing export data: {:?}", e);
-        StatusCode::INTERNAL_SERVER_ERROR
-    })?;
+        buffer.clear();
+        serde_json::to_writer_pretty(&mut buffer, &entry).map_err(std::io::Error::other)?;
+
+        let indented = indent_json(&buffer, 4);
+        writer.write_all(&indented).await?;
+    }
 
-    Ok(Bytes::from(json_string))
+    writer.write_all(b"\n  ]\n}\n").await?;
+    writer.shutdown().await
 }
diff --git a/src/main.rs b/src/main.rs
index d1c2f3a..41f17bf 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,21 +1,22 @@
-use axum::{Extension, Router, http::StatusCode, routing::get};
-use s3::{Bucket, Region, creds::Credentials};
+use axum::{http::StatusCode, routing::get, Extension, Router};
+use s3::{creds::Credentials, Bucket, Region};
 use sqlx::postgres::PgPoolOptions;
 
 mod handler;
 mod models;
 mod rate_limit;
-mod s3_helpers;
 
 #[tokio::main]
 async fn main() {
-    #[cfg(debug_assertions)]
-    dotenvy::dotenv().ok();
+    let _ = dotenvy::dotenv();
     let database_url = std::env::var("DATABASE_URL")
         .expect("DATABASE_URL must be set in .env file or environment variables");
 
     let pool = PgPoolOptions::new()
-        .max_connections(10)
+        .max_connections(5)
+        .acquire_timeout(std::time::Duration::from_secs(8))
+        .idle_timeout(std::time::Duration::from_secs(60))
+        .max_lifetime(std::time::Duration::from_secs(600))
         .connect(&database_url)
         .await
         .expect("Failed to connect to database");
@@ -45,8 +46,9 @@ async fn main() {
             .expect("Invalid S3_REGION")
     };
 
-    let s3_bucket =
-        *Bucket::new(&bucket_name, region, credentials).expect("Failed to create S3 bucket");
+    let s3_bucket = *Bucket::new(&bucket_name, region, credentials)
+        .expect("Failed to create S3 bucket")
+        .with_path_style();
 
     let state = models::AppState { pool, s3_bucket };
 
diff --git a/src/models.rs b/src/models.rs
index 21e8f6c..856bd06 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -1,7 +1,7 @@
 use chrono::NaiveDateTime;
 use s3::Bucket;
 use serde::Serialize;
-use sqlx::{PgPool, types::Uuid};
+use sqlx::{types::Uuid, PgPool};
 
 #[derive(Clone)]
 pub struct AppState {
diff --git a/src/s3_helpers.rs b/src/s3_helpers.rs
deleted file mode 100644
index b6cc4f5..0000000
--- a/src/s3_helpers.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-use bytes::Bytes;
-use s3::Bucket;
-use s3::error::S3Error;
-
-pub async fn check_file_exists(bucket: &Bucket, key: &str) -> Result<bool, S3Error> {
-    bucket.object_exists(key).await
-}
-
-pub async fn upload_file(bucket: &Bucket, key: &str, data: Bytes) -> Result<(), S3Error> {
-    bucket.put_object(key, &data).await?;
-    Ok(())
-}
-
-pub async fn generate_presigned_url(
-    bucket: &Bucket,
-    key: &str,
-    expiry_seconds: u32,
-) -> Result<String, S3Error> {
-    bucket.presign_get(key, expiry_seconds, None).await
-}