24 changes: 0 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default.

10 changes: 4 additions & 6 deletions Cargo.toml
@@ -1,18 +1,16 @@
[package]
name = "data-exporter"
version = "0.1.0"
edition = "2024"
edition = "2021"

[dependencies]
async-stream = "0.3"
axum = "0.8.7"
bytes = "1.9"
chrono = { version = "0.4.42", features = ["serde"] }
dotenvy = "0.15"
futures = "0.3"
rust-s3 = "0.37.1"
serde = "1.0.228"
serde_json = "1.0"
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "uuid", "chrono"] }
uuid = { version = "1.11.0", features = ["serde"] }
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread"] }
rust-s3 = "0.37.1"
dotenvy = "0.15"
uuid = { version = "1.11.0", features = ["serde"] }
53 changes: 53 additions & 0 deletions purge.ts
@@ -0,0 +1,53 @@
import { S3Client } from "bun";

const client = new S3Client({
accessKeyId: process.env.S3_ACCESS_KEY_ID,
secretAccessKey: process.env.S3_SECRET_ACCESS_KEY,
bucket: process.env.S3_BUCKET,
region: process.env.S3_REGION,
endpoint: process.env.S3_ENDPOINT,
});

async function purgeBucket() {
console.log(`Starting purge for bucket: ${process.env.S3_BUCKET}...`);
let totalDeleted = 0;
let isTruncated = true;
let startAfter: string | undefined = undefined;

try {
while (isTruncated) {
// List objects in the bucket
const response = await client.list({
maxKeys: 1000,
startAfter,
});

const objects = response.contents || [];

if (objects.length === 0) {
break;
}

// Perform deletions in parallel for the current batch
await Promise.all(
objects.map((obj) => {
console.log(`Deleting: ${obj.key}`);
return client.delete(obj.key);
})
);

totalDeleted += objects.length;
isTruncated = response.isTruncated;
if (isTruncated && objects.length > 0) {
startAfter = objects[objects.length - 1].key;
}
}

console.log(`\nSuccessfully purged ${totalDeleted} objects.`);
} catch (error) {
console.error("Error purging bucket:", error);
process.exit(1);
}
}

purgeBucket();
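For context, here is a rough Rust counterpart to purge.ts, sketched against the rust-s3 Bucket the exporter already depends on. The bucket setup mirrors the chained Bucket::new(...).with_path_style() call from main.rs, but the listing and delete calls (Bucket::list, delete_object) and the Credentials::new signature are assumptions about rust-s3 0.37's API rather than code from this PR, and the region handling is simplified to a custom endpoint only.

use s3::{creds::Credentials, Bucket, Region};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Credentials and region come from the same variables purge.ts reads.
    let access_key = std::env::var("S3_ACCESS_KEY_ID")?;
    let secret_key = std::env::var("S3_SECRET_ACCESS_KEY")?;
    let credentials = Credentials::new(Some(access_key.as_str()), Some(secret_key.as_str()), None, None, None)?;
    let region = Region::Custom {
        region: std::env::var("S3_REGION")?,
        endpoint: std::env::var("S3_ENDPOINT")?,
    };
    let bucket =
        *Bucket::new(&std::env::var("S3_BUCKET")?, region, credentials)?.with_path_style();

    // Assumed rust-s3 behaviour: `list` follows continuation tokens itself,
    // so one call returns every page; purge.ts has to loop manually instead.
    let mut total_deleted = 0usize;
    for page in bucket.list(String::new(), None).await? {
        for object in page.contents {
            println!("Deleting: {}", object.key);
            bucket.delete_object(&object.key).await?;
            total_deleted += 1;
        }
    }
    println!("Purged {} objects.", total_deleted);
    Ok(())
}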
177 changes: 118 additions & 59 deletions src/handler.rs
@@ -1,14 +1,11 @@
use crate::models::{AppState, DataEntry, ExportRequest, Project};
use crate::s3_helpers;
use bytes::Bytes;

use axum::{
extract::{Path, State},
http::{StatusCode, header::LOCATION},
http::{header::LOCATION, StatusCode},
response::{IntoResponse, Response},
};
use futures::stream::StreamExt;
use sqlx::Row;
use tokio::io::AsyncWriteExt;

pub async fn export(
State(state): State<AppState>,
@@ -42,82 +39,144 @@ pub async fn export(

let s3_key = format!("exports/{}.json", token);

let file_exists = s3_helpers::check_file_exists(&state.s3_bucket, &s3_key)
let file_exists = state.s3_bucket.object_exists(&s3_key).await.map_err(|e| {
println!("Error checking S3 file existence: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;

if !file_exists {
stream_export_to_s3(&state, export_request.project_id, &s3_key).await?;
}

let presigned_url = state
.s3_bucket
.presign_get(&s3_key, 300, None)
.await
.map_err(|e| {
println!("Error checking S3 file existence: {:?}", e);
println!("Error generating presigned URL: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;

if !file_exists {
let export_data = generate_export_data(&state, export_request.project_id).await?;
Ok((StatusCode::FOUND, [(LOCATION, presigned_url.as_str())]).into_response())
}

s3_helpers::upload_file(&state.s3_bucket, &s3_key, export_data)
.await
.map_err(|e| {
println!("Error uploading to S3: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
fn indent_json(json: &[u8], spaces: usize) -> Vec<u8> {
let mut result = Vec::with_capacity(json.len() + (json.len() >> 3));

for &byte in json {
result.push(byte);
if byte == b'\n' {
result.extend(std::iter::repeat_n(b' ', spaces));
}
}

while result.last() == Some(&b' ') {
result.pop();
}

let presigned_url = s3_helpers::generate_presigned_url(&state.s3_bucket, &s3_key, 300)
result
}

async fn stream_export_to_s3(
state: &AppState,
project_id: sqlx::types::Uuid,
s3_key: &str,
) -> Result<(), StatusCode> {
let (writer, mut reader) = tokio::io::duplex(128 * 1024);

let pool = state.pool.clone();
let s3_bucket = state.s3_bucket.clone();
let s3_key_owned = s3_key.to_string();

let writer_handle = tokio::spawn(async move {
let project = sqlx::query_as::<_, Project>(
"SELECT id, name, token, slug, private, template_id, created_at, owner_id FROM project WHERE id = $1"
)
.bind(project_id)
.fetch_optional(&pool)
.await
.map_err(std::io::Error::other)?
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "Project not found"))?;

write_export_json(writer, project, &pool, project_id).await
});

let upload_handle = tokio::spawn(async move {
s3_bucket
.put_object_stream(&mut reader, &s3_key_owned)
.await
.map_err(std::io::Error::other)
});

writer_handle
.await
.map_err(|e| {
println!("Error generating presigned URL: {:?}", e);
println!("Writer task failed: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.map_err(|e| {
println!("Error writing JSON: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;

Ok((StatusCode::FOUND, [(LOCATION, presigned_url.as_str())]).into_response())
upload_handle
.await
.map_err(|e| {
println!("Upload task failed: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.map_err(|e| {
println!("Error uploading to S3: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;

Ok(())
}

async fn generate_export_data(
state: &AppState,
async fn write_export_json<W>(
mut writer: W,
project: Project,
pool: &sqlx::PgPool,
project_id: sqlx::types::Uuid,
) -> Result<Bytes, StatusCode> {
let project = sqlx::query_as::<_, Project>(
"SELECT id, name, token, slug, private, template_id, created_at, owner_id FROM project WHERE id = $1"
)
.bind(project_id)
.fetch_optional(&state.pool)
.await
.map_err(|e| {
println!("Error while fetching project: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
) -> Result<(), std::io::Error>
where
W: tokio::io::AsyncWrite + Unpin,
{
let mut buffer = Vec::with_capacity(131072);

writer.write_all(b"{\n \"project\": ").await?;

serde_json::to_writer_pretty(&mut buffer, &project).map_err(std::io::Error::other)?;

let mut data_entries = Vec::new();
let mut row_stream = sqlx::query(
let indented = indent_json(&buffer, 2);
writer.write_all(&indented).await?;

writer.write_all(b",\n \"data_entries\": [\n").await?;

let mut entries_stream = sqlx::query_as::<_, DataEntry>(
"SELECT data, created_at FROM data_entries WHERE project_id = $1 ORDER BY created_at DESC",
)
.bind(project_id)
.fetch(&state.pool);

while let Some(row) = row_stream.next().await {
match row {
Ok(row) => {
let data_entry = DataEntry {
data: row.try_get("data").ok(),
created_at: row.get("created_at"),
};
data_entries.push(data_entry);
}
Err(e) => {
println!("Error while streaming data entry: {:?}", e);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
.fetch(pool);

let mut first = true;
while let Some(entry_result) = entries_stream.next().await {
let entry = entry_result.map_err(std::io::Error::other)?;

if !first {
writer.write_all(b",\n").await?;
}
}
first = false;

let export_data = serde_json::json!({
"project": project,
"data_entries": data_entries
});
writer.write_all(b" ").await?;

let json_string = serde_json::to_string_pretty(&export_data).map_err(|e| {
println!("Error while serializing export data: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
buffer.clear();
serde_json::to_writer_pretty(&mut buffer, &entry).map_err(std::io::Error::other)?;

let indented = indent_json(&buffer, 4);
writer.write_all(&indented).await?;
}

Ok(Bytes::from(json_string))
writer.write_all(b"\n ]\n}\n").await?;
writer.shutdown().await
}
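The heart of the rewritten handler is the handoff inside stream_export_to_s3: tokio::io::duplex gives an in-memory pipe, one spawned task streams the export JSON into the write half, and a second task hands the read half to put_object_stream, so the export is never buffered in full. Below is a minimal, self-contained sketch of that producer/consumer pattern, with the database query and S3 upload replaced by a formatted loop and a plain read so it runs on tokio alone (assuming the macros, rt-multi-thread, and io-util features); the row shape and buffer size are illustrative only.

use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Small in-memory pipe; the handler uses 128 KiB, a few bytes suffice here.
    let (mut writer, mut reader) = tokio::io::duplex(64);

    // Producer task: stands in for write_export_json emitting rows as JSON.
    let producer = tokio::spawn(async move {
        for i in 0..3 {
            writer
                .write_all(format!("{{\"row\": {}}}\n", i).as_bytes())
                .await?;
        }
        // Shutting down the write half is what signals EOF to the reader.
        writer.shutdown().await
    });

    // Consumer task: stands in for Bucket::put_object_stream draining the pipe.
    let consumer = tokio::spawn(async move {
        let mut body = String::new();
        reader.read_to_string(&mut body).await?;
        Ok::<_, std::io::Error>(body)
    });

    producer.await.expect("producer task panicked")?;
    let body = consumer.await.expect("consumer task panicked")?;
    print!("{}", body);
    Ok(())
}

Both halves are spawned before either result is awaited, which is what lets the writer block on a full pipe without deadlocking; that is also why the real handler can cap the buffer at 128 KiB regardless of export size.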
18 changes: 10 additions & 8 deletions src/main.rs
@@ -1,21 +1,22 @@
use axum::{Extension, Router, http::StatusCode, routing::get};
use s3::{Bucket, Region, creds::Credentials};
use axum::{http::StatusCode, routing::get, Extension, Router};
use s3::{creds::Credentials, Bucket, Region};
use sqlx::postgres::PgPoolOptions;
mod handler;
mod models;
mod rate_limit;
mod s3_helpers;

#[tokio::main]
async fn main() {
#[cfg(debug_assertions)]
dotenvy::dotenv().ok();
let _ = dotenvy::dotenv();

let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set in .env file or environment variables");

let pool = PgPoolOptions::new()
.max_connections(10)
.max_connections(5)
.acquire_timeout(std::time::Duration::from_secs(8))
.idle_timeout(std::time::Duration::from_secs(60))
.max_lifetime(std::time::Duration::from_secs(600))
.connect(&database_url)
.await
.expect("Failed to connect to database");
@@ -45,8 +46,9 @@ async fn main() {
.expect("Invalid S3_REGION")
};

let s3_bucket =
*Bucket::new(&bucket_name, region, credentials).expect("Failed to create S3 bucket");
let s3_bucket = *Bucket::new(&bucket_name, region, credentials)
.expect("Failed to create S3 bucket")
.with_path_style();

let state = models::AppState { pool, s3_bucket };

2 changes: 1 addition & 1 deletion src/models.rs
@@ -1,7 +1,7 @@
use chrono::NaiveDateTime;
use s3::Bucket;
use serde::Serialize;
use sqlx::{PgPool, types::Uuid};
use sqlx::{types::Uuid, PgPool};

#[derive(Clone)]
pub struct AppState {