Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[workspace]
resolver = "2"
members = ["api", "cli", "client", "common", "server"]
members = ["api", "cli", "client", "common", "server", "util"]
11 changes: 9 additions & 2 deletions api/src/get.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use clusterizer_common::records::{
Assignment, AssignmentFilter, Platform, PlatformFilter, Project, ProjectFilter, ProjectVersion,
ProjectVersionFilter, Result, ResultFilter, Task, TaskFilter, User, UserFilter,
Assignment, AssignmentFilter, File, FileFilter, Platform, PlatformFilter, Project,
ProjectFilter, ProjectVersion, ProjectVersionFilter, Result, ResultFilter, Task, TaskFilter,
User, UserFilter,
};

pub trait Get {
Expand Down Expand Up @@ -33,6 +34,12 @@ impl Get for ProjectVersion {
const PATH: &str = "project_versions";
}

impl Get for File {
type Filter = FileFilter;

const PATH: &str = "files";
}

impl Get for Task {
type Filter = TaskFilter;

Expand Down
1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ clap = { version = "4.6.0", features = ["derive", "string"] }
clusterizer-api = { version = "0.1.0", path = "../api" }
clusterizer-client = { version = "0.1.0", path = "../client" }
clusterizer-common = { version = "0.1.0", path = "../common" }
clusterizer-util = { version = "0.1.0", path = "../util" }
dirs = "6.0.0"
reqwest = { version = "0.13.2" }
tempfile = "3.27.0"
Expand Down
8 changes: 4 additions & 4 deletions cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ pub struct RunArgs {
}

impl RunArgs {
pub fn project_versions_dir(&self) -> PathBuf {
self.cache_dir.join("project_versions")
pub fn binaries_dir(&self) -> PathBuf {
self.cache_dir.join("bin")
}

pub fn platform_testers_dir(&self) -> PathBuf {
self.cache_dir.join("platform_testers")
pub fn temp_dir(&self) -> PathBuf {
self.cache_dir.join("tmp")
}
}

Expand Down
126 changes: 64 additions & 62 deletions cli/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
fs,
io::{Cursor, ErrorKind},
iter::{self, Empty},
path::Path,
path::PathBuf,
process::{Output, Stdio},
sync::Arc,
time::Duration,
Expand All @@ -16,12 +16,13 @@ use clusterizer_client::result::ClientResult;
use clusterizer_common::{
errors::SubmitResultError,
records::{
Platform, PlatformFilter, Project, ProjectFilter, ProjectVersion, ProjectVersionFilter,
Task,
File, FileFilter, Platform, PlatformFilter, Project, ProjectFilter, ProjectVersion,
ProjectVersionFilter, Task,
},
requests::{FetchTasksRequest, SubmitResultRequest},
types::Id,
};
use clusterizer_util::Hex;
use tokio::{io::AsyncWriteExt, process::Command, task::JoinSet, time};
use tracing::{debug, info, warn};
use zip::ZipArchive;
Expand All @@ -38,6 +39,7 @@ struct TaskInfo {
task: Task,
project: Project,
project_version: ProjectVersion,
file: File,
}

enum Return {
Expand Down Expand Up @@ -92,43 +94,55 @@ impl ClusterizerClient {

async fn fetch_tasks(self: Arc<Self>) -> ClientResult<Return> {
let tasks = loop {
let mut projects: HashMap<_, _> = self
let project_versions_by_project_id: HashMap<_, _> = self
.client
.get_all::<Project>(&ProjectFilter::default())
.get_all::<ProjectVersion>(&ProjectVersionFilter::default().disabled(false))
.await?
.into_iter()
.filter(|project_version| self.platform_ids.contains(&project_version.platform_id))
.map(|project_version| (project_version.project_id, project_version))
.collect();

let projects_by_project_id: HashMap<_, _> = self
.client
.get_all::<Project>(&ProjectFilter::default().disabled(false))
.await?
.into_iter()
.filter(|project| project_versions_by_project_id.contains_key(&project.id))
.map(|project| (project.id, project))
.collect();

let projects: HashMap<_, _> = self
let files_by_file_id: HashMap<_, _> = self
.client
.get_all::<ProjectVersion>(&ProjectVersionFilter::default().disabled(false))
.get_all::<File>(&FileFilter::default())
.await?
.into_iter()
.filter(|project_version| self.platform_ids.contains(&project_version.platform_id))
.filter_map(|project_version| {
projects
.remove(&project_version.project_id)
.map(|project| (project.id, (project, project_version)))
})
.map(|file| (file.id, file))
.collect();

let get_task_info = |task: &Task| {
let project = projects_by_project_id.get(&task.project_id)?;
let project_version = project_versions_by_project_id.get(&task.project_id)?;
let file = files_by_file_id.get(&project_version.file_id)?;

Some(TaskInfo {
task: task.clone(),
file: file.clone(),
project: project.clone(),
project_version: project_version.clone(),
})
};

let tasks: Vec<_> = self
.client
.fetch_tasks(&FetchTasksRequest {
project_ids: projects.keys().copied().collect(),
project_ids: projects_by_project_id.keys().copied().collect(),
limit: self.args.threads,
})
.await?
.into_iter()
.filter_map(|task| {
let info = projects
.get(&task.project_id)
.map(|(project, project_version)| TaskInfo {
task,
project: project.clone(),
project_version: project_version.clone(),
});
let info = get_task_info(&task);

if info.is_none() {
warn!("Unwanted task received from server.");
Expand All @@ -146,21 +160,8 @@ impl ClusterizerClient {
time::sleep(Duration::from_millis(15000)).await;
};

for TaskInfo {
project_version, ..
} in &tasks
{
let project_version_dir = self
.args
.project_versions_dir()
.join(project_version.id.to_string());

download_archive(
&project_version.archive_url,
&project_version_dir,
&self.args.cache_dir,
)
.await?;
for TaskInfo { file, .. } in &tasks {
download_archive(file, &self.args).await?;
}

Ok(Return::FetchTasks(tasks))
Expand All @@ -170,26 +171,25 @@ impl ClusterizerClient {
self: Arc<Self>,
TaskInfo {
task,
project,
project_version,
project,
file,
}: TaskInfo,
) -> ClientResult<Return> {
let slot_dir = tempfile::tempdir()?;

info!("Task id: {}, stdin: {}", task.id, task.stdin);
info!("Project id: {}, name: {}", project.id, project.name);
debug!(
"Project version id: {}, archive url: {}",
project_version.id, project_version.archive_url
info!(
"Project id: {}, Project name: {}",
task.project_id, project.name
);
debug!("Platform id: {}", project_version.platform_id);
debug!("Slot dir: {}", slot_dir.path().display());

let project_version_dir = self
let program = self
.args
.project_versions_dir()
.join(project_version.id.to_string());

let program = project_version_dir
.binaries_dir()
.join(format!("{}", Hex(&file.hash)))
.join(format!("main{}", env::consts::EXE_SUFFIX))
.canonicalize()?;

Expand Down Expand Up @@ -235,8 +235,8 @@ impl ClusterizerClient {
}

pub async fn run(client: ApiClient, args: RunArgs) -> ClientResult<()> {
fs::create_dir_all(args.project_versions_dir())?;
fs::create_dir_all(args.platform_testers_dir())?;
fs::create_dir_all(args.binaries_dir())?;
fs::create_dir_all(args.temp_dir())?;

let mut platform_ids = Vec::new();
let mut platform_names = Vec::new();
Expand All @@ -245,19 +245,14 @@ pub async fn run(client: ApiClient, args: RunArgs) -> ClientResult<()> {
.get_all::<Platform>(&PlatformFilter::default())
.await?
{
let file = client.get_one(platform.file_id).await?;

debug!(
"Platform id: {}, tester archive url: {}",
platform.id, platform.tester_archive_url
platform.id, file.url
);

let platform_tester_dir = args.platform_testers_dir().join(platform.id.to_string());

download_archive(
&platform.tester_archive_url,
&platform_tester_dir,
&args.cache_dir,
)
.await?;
let platform_tester_dir = download_archive(&file, &args).await?;

let slot_dir = tempfile::tempdir()?;

Expand Down Expand Up @@ -299,18 +294,25 @@ pub async fn run(client: ApiClient, args: RunArgs) -> ClientResult<()> {
.await
}

async fn download_archive(url: &str, dir: &Path, cache_dir: &Path) -> ClientResult<()> {
async fn download_archive(file: &File, args: &RunArgs) -> ClientResult<PathBuf> {
let dir = args.binaries_dir().join(format!("{}", Hex(&file.hash)));

if dir.exists() {
debug!("Archive {} was cached.", dir.display());
} else {
debug!("Archive {} is not cached.", dir.display());

let bytes = reqwest::get(url).await?.error_for_status()?.bytes().await?;
let extract_dir = tempfile::tempdir_in(cache_dir)?;
let bytes = reqwest::get(&file.url)
.await?
.error_for_status()?
.bytes()
.await?;

let extract_dir = tempfile::tempdir_in(args.temp_dir())?;

ZipArchive::new(Cursor::new(bytes))?.extract(&extract_dir)?;
fs::rename(&extract_dir, dir)?;
fs::rename(&extract_dir, &dir)?;
}

Ok(())
Ok(dir)
}
16 changes: 16 additions & 0 deletions common/src/records/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use crate::types::Id;

#[derive(Clone, Hash, Debug, Serialize, Deserialize)]
pub struct File {
pub id: Id<File>,
pub created_at: DateTime<Utc>,
pub url: String,
pub hash: Vec<u8>,
}

#[non_exhaustive]
#[derive(Clone, Hash, Debug, Default, Serialize, Deserialize)]
pub struct FileFilter {}
2 changes: 2 additions & 0 deletions common/src/records/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod assignment;
pub mod file;
pub mod platform;
pub mod project;
pub mod project_version;
Expand All @@ -7,6 +8,7 @@ pub mod task;
pub mod user;

pub use assignment::{Assignment, AssignmentFilter};
pub use file::{File, FileFilter};
pub use platform::{Platform, PlatformFilter};
pub use project::{Project, ProjectFilter};
pub use project_version::{ProjectVersion, ProjectVersionFilter};
Expand Down
15 changes: 12 additions & 3 deletions common/src/records/platform.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use crate::types::Id;
use crate::{records::File, types::Id};

#[derive(Clone, Hash, Debug, Serialize, Deserialize)]
pub struct Platform {
pub id: Id<Platform>,
pub created_at: DateTime<Utc>,
pub name: String,
pub tester_archive_url: String,
pub file_id: Id<File>,
}

#[non_exhaustive]
#[derive(Clone, Hash, Debug, Default, Serialize, Deserialize)]
pub struct PlatformFilter {}
pub struct PlatformFilter {
pub file_id: Option<Id<File>>,
}

impl PlatformFilter {
pub fn file_id(mut self, file_id: Id<File>) -> Self {
self.file_id = Some(file_id);
self
}
}
9 changes: 7 additions & 2 deletions common/src/records/project_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};

use crate::types::Id;

use super::{Platform, Project};
use super::{File, Platform, Project};

#[derive(Clone, Hash, Debug, Serialize, Deserialize)]
pub struct ProjectVersion {
Expand All @@ -12,7 +12,7 @@ pub struct ProjectVersion {
pub disabled_at: Option<DateTime<Utc>>,
pub project_id: Id<Project>,
pub platform_id: Id<Platform>,
pub archive_url: String,
pub file_id: Id<File>,
}

#[non_exhaustive]
Expand All @@ -21,6 +21,7 @@ pub struct ProjectVersionFilter {
pub disabled: Option<bool>,
pub project_id: Option<Id<Project>>,
pub platform_id: Option<Id<Platform>>,
pub file_id: Option<Id<File>>,
}

impl ProjectVersionFilter {
Expand All @@ -38,4 +39,8 @@ impl ProjectVersionFilter {
self.platform_id = Some(platform_id);
self
}
pub fn file_id(mut self, file_id: Id<File>) -> Self {
self.file_id = Some(file_id);
self
}
}
Loading
Loading