From 96ef8dc55fa31ba822b919ab623a7ffcd1b92357 Mon Sep 17 00:00:00 2001
From: hsonne
Date: Fri, 15 Sep 2023 10:21:16 +0200
Subject: [PATCH 1/2] Improve download_nextcloud_files()

- reindent
- simplify string creation
- use kwb.utils::createDirectory()
- return local paths to downloaded files with Nextcloud paths in attribute "paths"

---
 R/influxdb_ultimate.R  | 68 ++++++++++++++++++++++++------------------
 vignettes/ultimate.Rmd | 36 +++++++++++++++-------
 2 files changed, 64 insertions(+), 40 deletions(-)

diff --git a/R/influxdb_ultimate.R b/R/influxdb_ultimate.R
index 6cf9908..779ddcd 100644
--- a/R/influxdb_ultimate.R
+++ b/R/influxdb_ultimate.R
@@ -331,12 +331,12 @@ write_to_influxdb <- function(tsv_paths,
     )
 
   if(changed_only) {
-    fieldnames <- fieldnames %>%
-      dplyr::filter(diff != 0) %>%
-      dplyr::pull(.data$ParameterCode)
-
-    tmp_long <- tmp_long %>%
-      dplyr::filter(.data$ParameterCode %in% fieldnames)
+    fieldnames <- fieldnames %>%
+      dplyr::filter(diff != 0) %>%
+      dplyr::pull(.data$ParameterCode)
+
+    tmp_long <- tmp_long %>%
+      dplyr::filter(.data$ParameterCode %in% fieldnames)
   } else {
     fieldnames <- fieldnames %>% dplyr::pull(.data$ParameterCode)
   }
@@ -455,32 +455,42 @@
 #' file_pattern = "Project\\.xls$"
 #' )
 #' }
-download_nextcloud_files <- function(dir_cloud,
-                                     dir_local,
-                                     file_pattern = "Project\\.xls$")
+download_nextcloud_files <- function(
+  dir_cloud,
+  dir_local,
+  file_pattern = "Project\\.xls$"
+)
 {
-  if(!check_env_nextcloud()) {
-    env_vars <- paste0(sprintf("NEXTCLOUD_%s", c("URL", "USER", "PASSWORD")),
-                       collapse = ", ")
-    message(sprintf(paste0("Not all NEXTCLOUD environment variables are defined. ",
-                           "Please define all of them '%s' with usethis::edit_r_environ()"),
-                    env_vars))
+  if (!check_env_nextcloud()) {
+
+    env_vars <- paste0(
+      "NEXTCLOUD_",
+      c("URL", "USER", "PASSWORD"),
+      collapse = ", "
+    )
+
+    message(sprintf(env_vars, fmt = paste(
+      "Not all NEXTCLOUD environment variables are defined.",
+      "Please define all of them (%s) with usethis::edit_r_environ()"
+    )))
+
   } else {
-
-    if (!dir.exists(dir_local)) {
-      fs::dir_create(dir_local, recurse = TRUE)
+
+    cloud_files <- dir_cloud %>%
+      kwb.nextcloud::list_files(full_info = TRUE) %>%
+      dplyr::filter(stringr::str_detect(.data$file, pattern = file_pattern))
+
+    local_files <- kwb.nextcloud::download_files(
+      href = cloud_files$href,
+      target_dir = kwb.utils::createDirectory(dir_local)
+    )
+
+    # Return paths to local files with Nextcloud paths in attribute "paths"
+    structure(
+      local_files,
+      paths = file.path(dir_cloud, cloud_files$file)
+    )
   }
-
-  cloud_files <- kwb.nextcloud::list_files(dir_cloud,
-                                           full_info = TRUE) %>%
-    dplyr::filter(stringr::str_detect(.data$file,
-                                      pattern = file_pattern))
-
-
-  kwb.nextcloud::download_files(href = cloud_files$href,
-                                target_dir = dir_local)
-  cloud_files$file
-}
 }
diff --git a/vignettes/ultimate.Rmd b/vignettes/ultimate.Rmd
index 6b46506..f41d0eb 100644
--- a/vignettes/ultimate.Rmd
+++ b/vignettes/ultimate.Rmd
@@ -113,6 +113,17 @@ paths <- kwb.utils::resolve(
 ### 1. Write Pilot Plant Raw Data to InfluxDB Cloud
 ################################################################################
 
+# Helper functions
+get_base_names <- function(x) basename(kwb.utils::getAttribute(x, "paths"))
+
+to_raw_imported_data_frame <- function(dir_raw, dir_imported, base_names)
+{
+  data.frame(
+    raw = file.path(dir_raw, base_names),
+    imported = file.path(dir_imported, base_names)
+  )
+}
+
 ################################################################################
 ### 1.1 Pilot A
 ################################################################################
@@ -145,14 +156,15 @@ if (length(files_pilot_a) > 0L) {
   ### Move all files for Pilot A`s cloud "raw" data directory to the "imported"
   ### directory (in case that a file is already existing there: overwrite it)!
 
-  paths_pilot_a <- data.frame(
-    raw = file.path(paths$a_cloud_raw, files_pilot_a),
-    imported = file.path(paths$a_cloud_imported, files_pilot_a
-    )
+  kwb.pilot::move_nextcloud_files(
+    paths_df = to_raw_imported_data_frame(
+      dir_raw = paths$a_cloud_raw,
+      dir_imported = paths$a_cloud_imported,
+      base_names = get_base_names(files_pilot_a)
+    ),
+    overwrite = TRUE
   )
 
-  kwb.pilot::move_nextcloud_files(paths_pilot_a, overwrite = TRUE)
-
 } else {
 
   message(sprintf(
@@ -198,13 +210,15 @@ if (length(files_pilot_b) > 0L) {
   ### Move all files for Pilot B`s cloud "raw" data directory to the "imported"
   ### directory (in case that a file is already existing there: overwrite it)!
 
-  paths_pilot_b <- data.frame(
-    raw = file.path(paths$b_cloud_raw, files_pilot_b),
-    imported = file.path(paths$b_cloud_imported, files_pilot_b)
+  kwb.pilot::move_nextcloud_files(
+    paths_df = to_raw_imported_data_frame(
+      dir_raw = paths$b_cloud_raw,
+      dir_imported = paths$b_cloud_imported,
+      base_names = get_base_names(files_pilot_b)
+    ),
+    overwrite = TRUE
   )
 
-  kwb.pilot::move_nextcloud_files(paths_pilot_b, overwrite = TRUE)
-
 } else {
 
   message(sprintf(
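The behavioural change in PATCH 1/2 is the return value of download_nextcloud_files(): it now returns the local paths of the downloaded files and keeps the corresponding Nextcloud paths in the attribute "paths", which is exactly what the vignette's new get_base_names() helper reads. A minimal usage sketch; the cloud directory, local directory and file pattern below are placeholders, not values taken from the patches:

local_files <- download_nextcloud_files(
  dir_cloud = "projects/raw",   # placeholder Nextcloud directory
  dir_local = tempdir(),        # placeholder local target directory
  file_pattern = "\\.xls$"      # placeholder file pattern
)

# Local paths of the downloaded files
local_files

# Corresponding Nextcloud paths, stored in attribute "paths"; the vignette
# derives base names from these to move files from "raw" to "imported"
kwb.utils::getAttribute(local_files, "paths")
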
From c3433734428d3f5ec777f3953888efa8bb47bd08 Mon Sep 17 00:00:00 2001
From: hsonne
Date: Fri, 15 Sep 2023 10:43:26 +0200
Subject: [PATCH 2/2] Clean influxdb_ultimate.R

- add foldable commented section lines
- improve indentation
- use variable "n_files"

---
 R/influxdb_ultimate.R | 343 +++++++++++++++++++++++++-----------------
 1 file changed, 202 insertions(+), 141 deletions(-)

diff --git a/R/influxdb_ultimate.R b/R/influxdb_ultimate.R
index 779ddcd..9b8c5de 100644
--- a/R/influxdb_ultimate.R
+++ b/R/influxdb_ultimate.R
@@ -1,3 +1,5 @@
+# get_pivot_data ---------------------------------------------------------------
+
 #' InfluxDB: Get Pivot Data from ultimate_mean_ bucket
 #'
 #' @param agg_interval aggregation interval (default: 1h)
@@ -9,16 +11,22 @@
 #' @importFrom influxdbclient InfluxDBClient
 #' @importFrom data.table rbindlist
 #' @importFrom dplyr select mutate relocate
-get_pivot_data <- function(agg_interval = "1d",
-                           date_start = "2021-07-05",
-                           date_stop = Sys.Date()) {
+get_pivot_data <- function(
+  agg_interval = "1d",
+  date_start = "2021-07-05",
+  date_stop = Sys.Date()
+)
+{
   config <- get_env_influxdb_ultimate()
 
   #stopifnot(agg_interval %in% c("1d", "1h", "10m", "1m"))
 
   if (agg_interval %in% c("1d", "1h", "10m", "1m")) {
+
     bucket_source <- sprintf("ultimate_mean_%s", agg_interval)
+
   } else {
+
     message("use raw data")
     bucket_source <- "ultimate"
   }
@@ -48,6 +56,7 @@ get_pivot_data <- function(agg_interval = "1d",
     dplyr::select(-.data$`_time`)
 }
 
+# write_aggr_to_influxdb_loop --------------------------------------------------
 #' InfluxDB: write aggregated time series to Ultimate target bucket in loop
 #' @description wrapper for \code{\link{write_aggr_to_influxdb}}
@@ -68,38 +77,48 @@
 #' @export
 #' @importFrom lubridate ymd
 #' @importFrom kwb.utils catAndRun
-write_aggr_to_influxdb_loop <- function(agg_interval = "1h",
-                                        agg_function = "mean",
-                                        bucket_source = "ultimate",
-                                        bucket_target = sprintf("%s_%s_%s",
-                                                                bucket_source,
-                                                                agg_function,
-                                                                agg_interval),
-                                        bucket_org = "kwb",
-                                        date_start = "2021-07-05",
-                                        date_end = Sys.Date(),
-                                        hour_start = 0,
-                                        hour_end = 12,
-                                        max_days = 5) {
+write_aggr_to_influxdb_loop <- function(
+  agg_interval = "1h",
+  agg_function = "mean",
+  bucket_source = "ultimate",
+  bucket_target = sprintf(
+    "%s_%s_%s", bucket_source, agg_function, agg_interval
+  ),
+  bucket_org = "kwb",
+  date_start = "2021-07-05",
+  date_end = Sys.Date(),
+  hour_start = 0,
+  hour_end = 12,
+  max_days = 5
+)
+{
   if (max_days > 0) {
-    dates_start <- sprintf("%sT00:00:00Z",
-                           seq(
-                             lubridate::ymd(date_start),
-                             lubridate::ymd(date_end) - max_days,
-                             by = sprintf('%d days', max_days)
-                           ))
-    dates_end <- sprintf("%sT00:00:00Z",
-                         seq(
-                           lubridate::ymd(date_start) + max_days,
-                           lubridate::ymd(date_end),
-                           by = sprintf('%d days', max_days)
-                         ))
+
+    dates_start <- sprintf(
+      "%sT00:00:00Z",
+      seq(
+        lubridate::ymd(date_start),
+        lubridate::ymd(date_end) - max_days,
+        by = sprintf('%d days', max_days)
+      )
+    )
+    dates_end <- sprintf(
+      "%sT00:00:00Z",
+      seq(
+        lubridate::ymd(date_start) + max_days,
+        lubridate::ymd(date_end),
+        by = sprintf('%d days', max_days)
+      )
+    )
   } else {
-    dates_start <-
-      seq(lubridate::ymd(date_start), lubridate::ymd(date_end), 1)
+
+    dates_start <- seq(
+      lubridate::ymd(date_start),
+      lubridate::ymd(date_end),
+      1
+    )
 
     if (hour_end == 0) {
       dates_end <- dates_start + 1
@@ -107,47 +126,46 @@ write_aggr_to_influxdb_loop <- function(agg_interval = "1h",
       dates_end <- dates_start
     }
 
-    dates_start <-
-      sprintf("%sT%02d:00:00Z", dates_start, hour_start)
-
+    dates_start <- sprintf("%sT%02d:00:00Z", dates_start, hour_start)
     dates_end <- sprintf("%sT%02d:00:00Z", dates_end, hour_end)
   }
 
-  periods_df <- data.frame(start = dates_start,
-                           end = dates_end)
+  periods_df <- data.frame(
+    start = dates_start,
+    end = dates_end
+  )
 
   sapply(
     seq_len(nrow(periods_df)),
     FUN = function(idx) {
-      period <- periods_df[idx,]
-
-      msg_txt <-
-        sprintf(
-          "Aggregate raw data (func: '%s', intervall: %s, period: %s - %s) from raw bucket '%s' and write to '%s'",
-          agg_function,
-          agg_interval,
-          period$start,
-          period$end,
-          bucket_source,
-          bucket_target
+      period <- periods_df[idx, ]
+      msg_txt <- sprintf(
+        "Aggregate raw data (func: '%s', intervall: %s, period: %s - %s) from raw bucket '%s' and write to '%s'",
+        agg_function,
+        agg_interval,
+        period$start,
+        period$end,
+        bucket_source,
+        bucket_target
+      )
+      kwb.utils::catAndRun(
+        messageText = msg_txt,
+        expr = write_aggr_to_influxdb(
+          start = period$start,
+          end = period$end,
+          agg_interval = agg_interval,
+          agg_function = agg_function,
+          bucket_source = bucket_source,
+          bucket_target = bucket_target,
+          bucket_org = bucket_org
        )
-      kwb.utils::catAndRun(messageText = msg_txt,
-                           expr = {
-                             write_aggr_to_influxdb(
-                               start = period$start,
-                               end = period$end,
-                               agg_interval = agg_interval,
-                               agg_function = agg_function,
-                               bucket_source = bucket_source,
-                               bucket_target = bucket_target,
-                               bucket_org = bucket_org
-                             )
-                           })
+      )
     }
   )
 }
 
+# write_aggr_to_influxdb -------------------------------------------------------
 #' InfluxDB: write aggregated time series to Ultimate target bucket
 #'
@@ -160,22 +178,24 @@ write_aggr_to_influxdb_loop <- function(agg_interval = "1h",
 #' @param bucket_org bucket organisation (default: "kwb")
 #' @return writes aggregated time series to InfluxDB target bucket in loop
 #' @export
-write_aggr_to_influxdb <- function(start,
-                                   end,
-                                   agg_interval = "1h",
-                                   agg_function = "mean",
-                                   bucket_source = "ultimate",
-                                   bucket_target = sprintf("%s_%s_%s",
-                                                           bucket_source,
-                                                           agg_function,
-                                                           agg_interval),
-                                   bucket_org = "kwb") {
+write_aggr_to_influxdb <- function(
+  start,
+  end,
+  agg_interval = "1h",
+  agg_function = "mean",
+  bucket_source = "ultimate",
+  bucket_target = sprintf(
+    "%s_%s_%s", bucket_source, agg_function, agg_interval
+  ),
+  bucket_org = "kwb"
+)
+{
   config <- get_env_influxdb_ultimate()
 
   client <- influxdbclient::InfluxDBClient$new(
-    url = config[[1]],
-    token = config[[2]],
-    org = config[[3]],
+    url = config[[1L]],
+    token = config[[2L]],
+    org = config[[3L]],
     retryOptions = TRUE
   )
@@ -199,11 +219,10 @@ write_aggr_to_influxdb <- function(start,
     '")'
   )
 
-
   tables <- client$query(text = flux_qry)
-
 }
 
+# write_to_influxdb_loop --------------------------------------------------------
 #' InfluxDB: write to InfluxDB in Loop
 #'
@@ -224,34 +243,47 @@
 #' @return writes imported data to InfluxDB in Loop
 #' @export
-write_to_influxdb_loop <- function(tsv_paths,
-                                   paths,
-                                   changed_only = TRUE,
-                                   max_tsv_files = 5,
-                                   batch_size = 5000) {
-  splits_full <- floor(length(tsv_paths) / max_tsv_files)
-  splits_partial <- ceiling(length(tsv_paths) / max_tsv_files)
+write_to_influxdb_loop <- function(
+  tsv_paths,
+  paths,
+  changed_only = TRUE,
+  max_tsv_files = 5L,
+  batch_size = 5000L
+)
+{
+  n_files <- length(tsv_paths)
+
+  splits_full <- floor(n_files / max_tsv_files)
+  splits_partial <- ceiling(n_files / max_tsv_files)
 
-  idx_start <- 1 - max_tsv_files
-  idx_end <- 0
+  idx_start <- 1L - max_tsv_files
+  idx_end <- 0L
 
   for (split in seq_len(splits_full)) {
+
     idx_start <- idx_start + max_tsv_files
     idx_end <- idx_end + max_tsv_files
+
     cat(sprintf(
       "Split: %d (tsv_paths_idx: %d - %d)\n", split, idx_start, idx_end
     ))
-    write_to_influxdb(tsv_paths = tsv_paths[idx_start:idx_end],
-                      paths = paths,
-                      changed_only = changed_only,
-                      batch_size = batch_size)
+
+    write_to_influxdb(
+      tsv_paths = tsv_paths[idx_start:idx_end],
+      paths = paths,
+      changed_only = changed_only,
+      batch_size = batch_size
+    )
   }
-  if (splits_partial - splits_full == 1) {
+
+  if (splits_partial - splits_full == 1L) {
+
    idx_start <- idx_start + max_tsv_files
-    idx_end <- length(tsv_paths)
+    idx_end <- n_files
+
    cat(
      sprintf(
        "Split partial: %d (tsv_paths_idx: %d - %d)\n",
@@ -260,12 +292,16 @@ write_to_influxdb_loop <- function(tsv_paths,
         idx_end
       )
     )
-    write_to_influxdb(tsv_paths = tsv_paths[idx_start:idx_end],
-                      paths = paths,
-                      batch_size = batch_size)
+
+    write_to_influxdb(
+      tsv_paths = tsv_paths[idx_start:idx_end],
+      paths = paths,
+      batch_size = batch_size
+    )
   }
 }
 
+# write_to_influxdb ------------------------------------------------------------
 #' InfluxDB: write to InfluxDB
 #'
@@ -284,10 +320,13 @@ write_to_influxdb_loop <- function(tsv_paths,
 #' @importFrom tidyselect all_of
 #' @importFrom janitor clean_names
 #' @importFrom tidyr pivot_wider
-write_to_influxdb <- function(tsv_paths,
-                              paths,
-                              changed_only = TRUE,
-                              batch_size = 5000) {
+write_to_influxdb <- function(
+  tsv_paths,
+  paths,
+  changed_only = TRUE,
+  batch_size = 5000L
+)
+{
   config <- get_env_influxdb_ultimate()
 
   tmp_wide <- read_pentair_data(
@@ -309,8 +348,7 @@ write_to_influxdb <- function(tsv_paths,
 
-  field_cols <-
-    setdiff(names(tmp_wide), c("date_time", "site_code"))
+  field_cols <- setdiff(names(tmp_wide), c("date_time", "site_code"))
 
   tmp_long <- tmp_wide %>%
     tidyr::pivot_longer(
@@ -318,9 +356,10 @@ write_to_influxdb <- function(tsv_paths,
       names_to = "ParameterCode",
       values_to = "ParameterValue"
     ) %>%
-    dplyr::filter(!is.na(.data$ParameterValue),
-                  !is.infinite(.data$ParameterValue))
-
+    dplyr::filter(
+      !is.na(.data$ParameterValue),
+      !is.infinite(.data$ParameterValue)
+    )
 
   fieldnames <- tmp_long %>%
     dplyr::group_by(.data$ParameterCode) %>%
@@ -330,14 +369,17 @@ write_to_influxdb <- function(tsv_paths,
       diff = max - min
     )
 
-  if(changed_only) {
+  if (changed_only) {
+
     fieldnames <- fieldnames %>%
       dplyr::filter(diff != 0) %>%
       dplyr::pull(.data$ParameterCode)
 
     tmp_long <- tmp_long %>%
       dplyr::filter(.data$ParameterCode %in% fieldnames)
+
   } else {
+
     fieldnames <- fieldnames %>% dplyr::pull(.data$ParameterCode)
   }
@@ -345,13 +387,12 @@ write_to_influxdb <- function(tsv_paths,
 
   #remotes::install_github("influxdata/influxdb-client-r")
 
-  client <-
-    influxdbclient::InfluxDBClient$new(
-      url = config[[1]],
-      token = config[[2]],
-      org = config[[3]],
-      retryOptions = TRUE
-    )
+  client <- influxdbclient::InfluxDBClient$new(
+    url = config[[1L]],
+    token = config[[2L]],
+    org = config[[3L]],
+    retryOptions = TRUE
+  )
 
   # Ready status
   #ready <- client$ready()
@@ -361,44 +402,51 @@ write_to_influxdb <- function(tsv_paths,
   #client$health()
 
   system.time(expr = {
+
     sapply(fieldnames, function(field_col) {
+
       tmp_dat <- tmp_long %>%
         dplyr::filter(.data$ParameterCode == field_col) %>%
-        tidyr::pivot_wider(names_from = .data$ParameterCode,
-                           values_from = .data$ParameterValue) %>%
+        tidyr::pivot_wider(
+          names_from = .data$ParameterCode,
+          values_from = .data$ParameterValue
+        ) %>%
         as.data.frame()
 
       ids <- seq(ceiling(nrow(tmp_dat) / batch_size))
+
       requests <- tibble::tibble(
         id = ids,
         idx_start = 1 + (ids - 1) * batch_size,
         idx_end = ids * batch_size
       )
+
       requests$idx_end[nrow(requests)] <- nrow(tmp_dat)
+
       sapply(ids, function(id) {
-        tmp_dat_split <-
-          tmp_dat[requests$idx_start[id]:requests$idx_end[id], ]
-        msg_txt <-
-          sprintf(
-            "'%s' (%d/%d), write request %d/%d (%s - %s, data points: %d, temporal resolution (avg): %ds) to InfluxDB",
-            field_col,
-            which(fieldnames == field_col),
-            length(fieldnames),
-            id,
-            length(ids),
-            min(tmp_dat_split$date_time),
-            max(tmp_dat_split$date_time),
-            nrow(tmp_dat_split),
-            round(as.numeric(
-              difftime(
-                max(tmp_dat_split$date_time),
-                min(tmp_dat_split$date_time),
-                units = "secs"
-              )
-            ) / nrow(tmp_dat_split),
-            0)
-          )
+        tmp_dat_split <- tmp_dat[requests$idx_start[id]:requests$idx_end[id], ]
+
+        msg_txt <- sprintf(
+          "'%s' (%d/%d), write request %d/%d (%s - %s, data points: %d, temporal resolution (avg): %ds) to InfluxDB",
+          field_col,
+          which(fieldnames == field_col),
+          length(fieldnames),
+          id,
+          length(ids),
+          min(tmp_dat_split$date_time),
+          max(tmp_dat_split$date_time),
+          nrow(tmp_dat_split),
+          round(as.numeric(
+            difftime(
+              max(tmp_dat_split$date_time),
+              min(tmp_dat_split$date_time),
+              units = "secs"
+            )
+          ) / nrow(tmp_dat_split),
+          0)
+        )
+
         kwb.utils::catAndRun(messageText = msg_txt, expr = {
           client$write(
             tmp_dat_split,
@@ -411,12 +459,16 @@ write_to_influxdb <- function(tsv_paths,
             timeCol = "date_time"
           )
         })
-    })
-  })
-  })
+
+      }) # end of sapply(ids, ...)
+
+    }) # end of sapply(fieldnames, ...)
+
+  }) # end of system.time()
+
 }
-
+# download_nextcloud_files -----------------------------------------------------
 
 #' Helper Function: Download Nextcloud Files from a Directory
 #'
@@ -449,7 +501,7 @@ write_to_influxdb <- function(tsv_paths,
 #' dir_local = "C:/kwb/projects/")
 #'
 #' paths <- kwb.utils::resolve(paths_list)
-
+#'
 #' download_nextcloud_files(dir_cloud = paths$dir_cloud,
 #' dir_local = paths$dir_local,
 #' file_pattern = "Project\\.xls$"
@@ -479,7 +531,7 @@ download_nextcloud_files <- function(
     cloud_files <- dir_cloud %>%
       kwb.nextcloud::list_files(full_info = TRUE) %>%
       dplyr::filter(stringr::str_detect(.data$file, pattern = file_pattern))
-  
+
     local_files <- kwb.nextcloud::download_files(
       href = cloud_files$href,
      target_dir = kwb.utils::createDirectory(dir_local)
@@ -493,37 +545,46 @@ download_nextcloud_files <- function(
   }
 }
 
+# check_env_nextcloud ----------------------------------------------------------
+
 #' Helper Function: check if all environment variables for Nextcloud are defined
 #'
 #' @return TRUE if all defined, FALSE otherwise
 #' @export
 #'
-check_env_nextcloud <- function() {
+check_env_nextcloud <- function()
+{
   all(sapply(Sys.getenv(sprintf(
     "NEXTCLOUD_%s", c("URL", "USER", "PASSWORD")
   )), nchar) > 0)
 }
 
+# check_env_influxdb_ultimate --------------------------------------------------
+
 #' Helper Function: check if all environment variables for ULTIMATE InfluxDB are
 #' defined
 #'
 #' @return TRUE if all defined, FALSE otherwise
 #' @export
 #'
-check_env_influxdb_ultimate <- function() {
+check_env_influxdb_ultimate <- function()
+{
   all(sapply(Sys.getenv(sprintf(
     "ULTIMATE_INFLUXDB_%s", c("URL", "TOKEN", "ORG")
   )), nchar) > 0)
 }
 
+# get_env_influxdb_ultimate ----------------------------------------------------
+
 #' Helper Function: get influxdb config for Ultimate if defined
 #' defined
 #'
 #' @return list with influxdb config
 #' @export
 #'
-get_env_influxdb_ultimate <- function() {
+get_env_influxdb_ultimate <- function()
+{
   stopifnot(check_env_influxdb_ultimate())
 
   as.list(Sys.getenv(sprintf(