diff --git a/aw-sync/README.md b/aw-sync/README.md index 76af7e44..1471697c 100644 --- a/aw-sync/README.md +++ b/aw-sync/README.md @@ -15,10 +15,26 @@ Was originally prototyped as a PR to aw-server: https://github.com/ActivityWatch This will start a daemon which pulls and pushes events with the sync directory (`~/ActivityWatchSync` by default) every 5 minutes: ```sh +# Basic sync daemon (syncs all buckets every 5 minutes) aw-sync + +# Same as above +aw-sync daemon + +# Sync daemon with specific buckets only +aw-sync daemon --buckets "aw-watcher-window,aw-watcher-afk" --start-date "2024-01-01" + +# Sync all buckets once and exit +aw-sync sync --start-date "2024-01-01" ``` -For more options, see `aw-sync --help`. +For more options, see `aw-sync --help`. Some notable options: +- `--buckets`: Specify which buckets to sync (comma-separated). By default, all buckets are synced. + - Use `--buckets "bucket1,bucket2"` to sync specific buckets + - Not specifying this option syncs all buckets by default +- `--start-date`: Only sync events after this date (YYYY-MM-DD) +- `--sync-db`: Specify a specific database file in the sync directory +- `--mode`: Choose sync mode: "push", "pull", or "both" (default: "both") ### Setting up sync diff --git a/aw-sync/src/main.rs b/aw-sync/src/main.rs index 4dd8507f..3f6da95d 100644 --- a/aw-sync/src/main.rs +++ b/aw-sync/src/main.rs @@ -2,10 +2,10 @@ // - [x] Setup local sync bucket // - [x] Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore) // - [x] Import buckets and sync events from remotes -// - [ ] Add CLI arguments +// - [x] Add CLI arguments // - [x] For which local server to use // - [x] For which sync dir to use -// - [ ] Date to start syncing from +// - [x] Date to start syncing from #[macro_use] extern crate log; @@ -60,35 +60,45 @@ struct Opts { enum Commands { /// Daemon subcommand /// Starts aw-sync as a daemon, which will sync every 5 minutes. - Daemon {}, + Daemon { + /// Date to start syncing from. + /// If not specified, start from beginning. + /// Format: YYYY-MM-DD + #[clap(long, value_parser=parse_start_date)] + start_date: Option>, + + /// Specify buckets to sync using a comma-separated list. + /// By default, all buckets are synced. + #[clap(long)] + buckets: Option, + + /// Full path to sync db file + /// Useful for syncing buckets from a specific db file in the sync directory. + /// Must be a valid absolute path to a file in the sync directory. + #[clap(long)] + sync_db: Option, + }, - /// Sync subcommand (basic) + /// Sync subcommand /// - /// Pulls remote buckets then pushes local buckets. + /// Syncs data between local aw-server and sync directory. + /// First pulls remote buckets from the sync directory to the local aw-server. + /// Then pushes local buckets from the aw-server to the local sync directory. Sync { /// Host(s) to pull from, comma separated. Will pull from all hosts if not specified. #[clap(long, value_parser=parse_list)] host: Option>, - }, - /// Sync subcommand (advanced) - /// - /// Pulls remote buckets then pushes local buckets. - /// First pulls remote buckets in the sync directory to the local aw-server. - /// Then pushes local buckets from the aw-server to the local sync directory. - #[clap(arg_required_else_help = true)] - SyncAdvanced { /// Date to start syncing from. /// If not specified, start from beginning. - /// NOTE: might be unstable, as count cannot be used to verify integrity of sync. /// Format: YYYY-MM-DD #[clap(long, value_parser=parse_start_date)] start_date: Option>, /// Specify buckets to sync using a comma-separated list. - /// If not specified, all buckets will be synced. - #[clap(long, value_parser=parse_list)] - buckets: Option>, + /// By default, all buckets are synced. + #[clap(long)] + buckets: Option, /// Mode to sync in. Can be "push", "pull", or "both". /// Defaults to "both". @@ -111,6 +121,13 @@ fn parse_start_date(arg: &str) -> Result, chrono::ParseError> { } fn parse_list(arg: &str) -> Result, clap::Error> { + // If the argument is empty or just whitespace, return an empty Vec + // This handles the case when --buckets is used without a value + if arg.trim().is_empty() { + return Ok(vec![]); + } + + // Otherwise, split by comma as usual Ok(arg.split(',').map(|s| s.to_string()).collect()) } @@ -139,60 +156,94 @@ fn main() -> Result<(), Box> { let client = AwClient::new(&opts.host, port, "aw-sync")?; - // if opts.command is None, then we're using the default subcommand (Sync) - match opts.command.unwrap_or(Commands::Daemon {}) { + // if opts.command is None, then we're using the default subcommand (Daemon) + match opts.command.unwrap_or(Commands::Daemon { + start_date: None, + buckets: None, + sync_db: None, + }) { // Start daemon - Commands::Daemon {} => { + Commands::Daemon { + start_date, + buckets, + sync_db, + } => { info!("Starting daemon..."); - daemon(&client)?; - } - // Perform basic sync - Commands::Sync { host } => { - // Pull - match host { - Some(hosts) => { - for host in hosts.iter() { - info!("Pulling from host: {}", host); - sync_wrapper::pull(host, &client)?; - } - } - None => { - info!("Pulling from all hosts"); - sync_wrapper::pull_all(&client)?; - } - } - // Push - info!("Pushing local data"); - sync_wrapper::push(&client)? + // Use an empty vector to sync all buckets for these cases: + // 1. When --buckets '*' is supplied + // 2. When no bucket argument is provided (default) + let effective_buckets = if buckets.as_deref() == Some("*") || buckets.is_none() { + Some(vec![]) + } else if let Some(buckets_str) = buckets { + Some(buckets_str.split(',').map(|s| s.to_string()).collect()) + } else { + None + }; + + daemon(&client, start_date, effective_buckets, sync_db)?; } - // Perform two-way sync - Commands::SyncAdvanced { + // Perform sync + Commands::Sync { + host, start_date, buckets, mode, sync_db, } => { - let sync_dir = dirs::get_sync_dir()?; - if let Some(db_path) = &sync_db { - info!("Using sync db: {}", &db_path.display()); + // Use an empty vector to sync all buckets for these cases: + // 1. When --buckets '*' is supplied + // 2. When no bucket argument is provided (default) + let effective_buckets = if buckets.as_deref() == Some("*") || buckets.is_none() { + Some(vec![]) + } else if let Some(buckets_str) = buckets { + Some(buckets_str.split(',').map(|s| s.to_string()).collect()) + } else { + None + }; - if !db_path.is_absolute() { - Err("Sync db path must be absolute")? - } - if !db_path.starts_with(&sync_dir) { - Err("Sync db path must be in sync directory")? + // If advanced options are provided, use advanced sync mode + if start_date.is_some() || effective_buckets.is_some() || sync_db.is_some() { + let sync_dir = dirs::get_sync_dir()?; + if let Some(db_path) = &sync_db { + info!("Using sync db: {}", &db_path.display()); + + if !db_path.is_absolute() { + Err("Sync db path must be absolute")? + } + if !db_path.starts_with(&sync_dir) { + Err("Sync db path must be in sync directory")? + } } - } - let sync_spec = sync::SyncSpec { - path: sync_dir, - path_db: sync_db, - buckets, - start: start_date, - }; + let sync_spec = sync::SyncSpec { + path: sync_dir, + path_db: sync_db, + buckets: effective_buckets, + start: start_date, + }; + + sync::sync_run(&client, &sync_spec, mode)? + } else { + // Simple host-based sync mode (backwards compatibility) + // Pull + match host { + Some(hosts) => { + for host in hosts.iter() { + info!("Pulling from host: {}", host); + sync_wrapper::pull(host, &client)?; + } + } + None => { + info!("Pulling from all hosts"); + sync_wrapper::pull_all(&client)?; + } + } - sync::sync_run(&client, &sync_spec, mode)? + // Push + info!("Pushing local data"); + sync_wrapper::push(&client)? + } } // List all buckets @@ -207,23 +258,45 @@ fn main() -> Result<(), Box> { Ok(()) } -fn daemon(client: &AwClient) -> Result<(), Box> { +fn daemon( + client: &AwClient, + start_date: Option>, + buckets: Option>, + sync_db: Option, +) -> Result<(), Box> { let (tx, rx) = channel(); ctrlc::set_handler(move || { let _ = tx.send(()); })?; + let sync_dir = dirs::get_sync_dir()?; + if let Some(db_path) = &sync_db { + info!("Using sync db: {}", &db_path.display()); + + if !db_path.is_absolute() { + Err("Sync db path must be absolute")? + } + if !db_path.starts_with(&sync_dir) { + Err("Sync db path must be in sync directory")? + } + } + + let sync_spec = sync::SyncSpec { + path: sync_dir, + buckets, + path_db: sync_db, + start: start_date, + }; + loop { - if let Err(e) = daemon_sync_cycle(client) { + if let Err(e) = sync::sync_run(client, &sync_spec, sync::SyncMode::Both) { error!("Error during sync cycle: {}", e); - // Re-throw the error return Err(e); } info!("Sync pass done, sleeping for 5 minutes"); - // Wait for either the sleep duration or a termination signal match rx.recv_timeout(Duration::from_secs(300)) { Ok(_) | Err(RecvTimeoutError::Disconnected) => { info!("Termination signal received, shutting down."); @@ -237,13 +310,3 @@ fn daemon(client: &AwClient) -> Result<(), Box> { Ok(()) } - -fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box> { - info!("Pulling from all hosts"); - sync_wrapper::pull_all(client)?; - - info!("Pushing local data"); - sync_wrapper::push(client)?; - - Ok(()) -} diff --git a/aw-sync/src/sync.rs b/aw-sync/src/sync.rs index 405118a9..ee7bba17 100644 --- a/aw-sync/src/sync.rs +++ b/aw-sync/src/sync.rs @@ -247,12 +247,18 @@ pub fn sync_datastores( .get_buckets() .unwrap() .iter_mut() - // If buckets vec isn't empty, filter out buckets not in the buckets vec + // Only filter buckets if specific bucket IDs are provided .filter(|tup| { let bucket = &tup.1; if let Some(buckets) = &sync_spec.buckets { - buckets.iter().any(|b_id| b_id == &bucket.id) + // If "*" is in the buckets list or no buckets specified, sync all buckets + if buckets.iter().any(|b_id| b_id == "*") || buckets.is_empty() { + true + } else { + buckets.iter().any(|b_id| b_id == &bucket.id) + } } else { + // By default, sync all buckets true } }) diff --git a/aw-sync/src/sync_wrapper.rs b/aw-sync/src/sync_wrapper.rs index 13007779..2f6d3328 100644 --- a/aw-sync/src/sync_wrapper.rs +++ b/aw-sync/src/sync_wrapper.rs @@ -48,10 +48,7 @@ pub fn pull(host: &str, client: &AwClient) -> Result<(), Box> { let sync_spec = SyncSpec { path: sync_dir.clone(), path_db: Some(db.path().clone()), - buckets: Some(vec![ - format!("aw-watcher-window_{}", host), - format!("aw-watcher-afk_{}", host), - ]), + buckets: None, // Sync all buckets by default start: None, }; sync_run(client, &sync_spec, SyncMode::Pull)?; @@ -67,10 +64,7 @@ pub fn push(client: &AwClient) -> Result<(), Box> { let sync_spec = SyncSpec { path: sync_dir, path_db: None, - buckets: Some(vec![ - format!("aw-watcher-window_{}", client.hostname), - format!("aw-watcher-afk_{}", client.hostname), - ]), + buckets: None, // Sync all buckets by default start: None, }; sync_run(client, &sync_spec, SyncMode::Push)?;