diff --git a/.Rbuildignore b/.Rbuildignore index 4ceb83f..d902ab7 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -2,7 +2,7 @@ ^EndpointR\.Rproj$ ^\.Rproj\.user$ todos.md -dev_docs/ +^dev_docs/ README\.Rmd CONTRIBUTORS\.md todos\.qmd @@ -11,3 +11,4 @@ todos\.qmd ^docs$ ^pkgdown$ ^\.github$ +^test_dir/ diff --git a/.gitignore b/.gitignore index 433a50a..c95a029 100644 --- a/.gitignore +++ b/.gitignore @@ -36,12 +36,11 @@ rsconnect/ .Rproj.user inst/doc EndpointR.Rproj - *.html *_dev_files* - dev_docs/project_test_run.qmd docs - # testing /dev_docs/ artifacts *.csv +test_dir +metadata_test_dir diff --git a/DESCRIPTION b/DESCRIPTION index 419c468..628d86a 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: EndpointR Title: Connects to various Machine Learning inference providers -Version: 0.1.1 +Version: 0.1.2 Authors@R: person("Jack", "Penzer", , "Jack.penzer@sharecreative.com", role = c("aut", "cre")) Description: EndpointR is a 'batteries included', open-source R package for connecting to various APIs for Machine Learning model predictions. EndpointR is built for company-specific use cases, so may not be useful to a wide audience. 
@@ -32,7 +32,8 @@ Imports: tibble, S7, jsonvalidate, - readr + readr, + arrow VignetteBuilder: knitr Depends: R (>= 3.5) diff --git a/NAMESPACE b/NAMESPACE index d761a6e..6346a46 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -6,12 +6,15 @@ export(hf_build_request) export(hf_build_request_batch) export(hf_build_request_df) export(hf_classify_batch) +export(hf_classify_chunks) export(hf_classify_df) export(hf_classify_text) export(hf_embed_batch) export(hf_embed_chunks) export(hf_embed_df) export(hf_embed_text) +export(hf_get_endpoint_info) +export(hf_get_model_max_length) export(hf_perform_request) export(json_dump) export(json_schema) diff --git a/NEWS.md b/NEWS.md index 882260d..c48bc6f 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,27 @@ -# Endpointr 0.1.2 +# EndpointR 0.1.2 -- [ ] `hf_embed_df()`, `hf_classify_df()` improved to write to files similarly to the upgrades applied in 0.1qq.1 +- **File writing improvements**: `hf_embed_df()` and `hf_classify_df()` now write intermediate results as `.parquet` files to `output_dir` directories, similar to improvements in 0.1.1 for OpenAI functions + +- **Parameter changes**: Moved from `batch_size` to `chunk_size` argument across `hf_embed_df()`, `hf_classify_df()`, and `oai_complete_df()` for consistency + +- **New chunking functions**: Introduced `hf_embed_chunks()` and `hf_classify_chunks()` for more efficient batch processing with better error handling + +- **Dependency update**: Package now depends on `arrow` for faster `.parquet` file writing and reading + +- **Metadata tracking**: Hugging Face functions that write to files (`hf_embed_df()`, `hf_classify_df()`, `hf_embed_chunks()`, `hf_classify_chunks()`) now write `metadata.json` to output directories containing: + - Endpoint URL and API key name used + - Processing parameters (chunk_size, concurrent_requests, timeout, max_retries) + - Inference parameters (truncate, max_length) + - Timestamp and row counts + - Useful for debugging, reproducibility, and tracking which 
models/endpoints were used + +- **max_length parameter**: Added `max_length` parameter to `hf_classify_df()` and `hf_classify_chunks()` for text truncation control. Note: `hf_embed_df()` handles truncation automatically via endpoint configuration (set `AUTO_TRUNCATE` in endpoint settings) + +- **New utility functions**: + - `hf_get_model_max_length()` - Retrieve maximum token length for a Hugging Face model + - `hf_get_endpoint_info()` - Retrieve detailed information about a Hugging Face Inference Endpoint + +- **Improved reporting**: Chunked/batch processing functions now report total successes and failures at completion # EndpointR 0.1.1 @@ -15,3 +36,4 @@ Initial BETA release, ships with: - Support for text completion using OpenAI models via the Chat Completions API - Support for embeddings with the OpenAI Embeddings API - Structured outputs via JSON schemas and validators + diff --git a/R/hf_classify.R b/R/hf_classify.R index e0e6304..008898c 100644 --- a/R/hf_classify.R +++ b/R/hf_classify.R @@ -91,7 +91,7 @@ tidy_batch_classification_response <- function(response) { return(results) } -# hf_classify_docs ---- +# hf_classify_text_docs ---- #' Classify text using a Hugging Face Inference API endpoint #' #' @description @@ -153,7 +153,7 @@ tidy_batch_classification_response <- function(response) { #' tidy = FALSE #' ) #' } -# hf_classify_docs ---- +# hf_classify_text docs ---- hf_classify_text <- function(text, endpoint_url, key_name, @@ -206,6 +206,7 @@ hf_classify_text <- function(text, } +# hf_classify_batch docs ---- #' Classify multiple texts using Hugging Face Inference Endpoints #' #' @description @@ -261,6 +262,7 @@ hf_classify_text <- function(text, #' batch_size = 3 #' ) #' } +# hf_classify_batch docs ---- hf_classify_batch <- function(texts, endpoint_url, key_name, @@ -359,7 +361,239 @@ hf_classify_batch <- function(texts, return(result) } +# hf_classify_chunks docs ---- +#' Efficiently classify vectors of text in chunks +#' +#' @description +#' 
Classifies large batches of text using a Hugging Face classification endpoint. +#' Processes texts in chunks with concurrent requests, writes intermediate results +#' to disk as Parquet files, and returns a combined data frame of all classifications. +#' +#' +#' @details +#' The function creates a metadata JSON file in `output_dir` containing processing +#' parameters and timestamps. Each chunk is saved as a separate Parquet file before +#' being combined into the final result. Use `output_dir = "auto"` to generate a +#' timestamped directory automatically. +#' +#' For single text classification, use `hf_classify_text()` instead. +#' +#' @param texts Character vector of texts to classify +#' @param ids Vector of unique identifiers corresponding to each text (same length as texts) +#' @param endpoint_url Hugging Face Classification Endpoint +#' @param max_length The maximum number of tokens in the text variable. Beyond this cut-off everything is truncated. +#' @param tidy_func Function to process API responses, defaults to +#' `tidy_classification_response` +#' @param output_dir Path to directory for the .parquet chunks +#' @param chunk_size Number of texts to process in each chunk before writing to disk (default: 5000) +#' @param concurrent_requests Integer; number of concurrent requests (default: 5) +#' @param max_retries Integer; maximum retry attempts (default: 5) +#' @param timeout Numeric; request timeout in seconds (default: 30) +#' @param key_name Name of environment variable containing the API key +#' @param id_col_name Name for the ID column in output (default: "id"). When called from hf_classify_df(), this preserves the original column name. +#' @param text_col_name Name for the text column in output (default: "text"). When called from hf_classify_df(), this preserves the original column name. 
+#' +#' @returns A data frame of classified documents with successes and failures +#' @export +#' +#' @examples +#' \dontrun{ +#' # basic usage with vectors +#' texts <- c("I love this", "I hate this", "This is ok") +#' ids <- c("review_1", "review_2", "review_3") +#' +#' results <- hf_classify_chunks( +#' texts = texts, +#' ids = ids, +#' endpoint_url = "https://your-endpoint.huggingface.cloud", +#' key_name = "HF_API_KEY" +#' ) +#' } +# hf_classify_chunks docs ---- +hf_classify_chunks <- function(texts, + ids, + endpoint_url, + max_length = 512L, + tidy_func = tidy_classification_response, + output_dir = "auto", + chunk_size = 5000L, + concurrent_requests = 5L, + max_retries = 5L, + timeout = 30L, + key_name = "HF_API_KEY", + id_col_name = "id", + text_col_name = "text" +) { + + # input validation ---- + if (length(texts) == 0) { + cli::cli_abort("Input 'texts' is empty, supply a non-empty character vector.") + } + + if (length(texts) == 1) { + cli::cli_abort("Function expects a batch of inputs, use `hf_classify_text` for single texts.") + } + + + stopifnot( + "Texts must be a list or vector" = is.vector(texts), + "ids must be a vector" = is.vector(ids), + "texts and ids must be the same length" = length(texts) == length(ids), + "chunk_size must be a positive integer" = is.numeric(chunk_size) && chunk_size > 0 && chunk_size == as.integer(chunk_size), + "concurrent_requests must be a positive integer" = is.numeric(concurrent_requests) && concurrent_requests > 0 && concurrent_requests == as.integer(concurrent_requests), + "max_retries must be a positive integer" = is.numeric(max_retries) && max_retries >= 0 && max_retries == as.integer(max_retries), + "timeout must be a positive integer" = is.numeric(timeout) && timeout > 0, + "endpoint_url must be a non-empty string" = is.character(endpoint_url) && nchar(endpoint_url) > 0, + "key_name must be a non-empty string" = is.character(key_name) && nchar(key_name) > 0 + ) + + # Chunking set up and metadata ---- + output_dir <- 
.handle_output_directory(output_dir, base_dir_name = "hf_classify_chunk") + + if (!dir.exists(output_dir)) { + dir.create(output_dir, recursive = TRUE) + } + + chunk_data <- batch_vector(seq_along(texts), chunk_size) + n_chunks <- length(chunk_data$batch_indices) + + inference_parameters = list(return_all_scores = TRUE, + truncation = TRUE, + max_length = max_length) + + metadata <- list( + output_dir = output_dir, + endpoint_url = endpoint_url, + inference_parameters = inference_parameters, + chunk_size = chunk_size, + n_chunks = n_chunks, + n_texts = length(texts), + concurrent_requests = concurrent_requests, + timeout = timeout, + max_retries = max_retries, + key_name = key_name, + timestamp = Sys.time() + ) + + jsonlite::write_json(metadata, + file.path(output_dir, "metadata.json"), + auto_unbox = TRUE, + pretty = TRUE) + + cli::cli_alert_info("Processing {length(texts)} text{?s} in {n_chunks} chunk{?s} of up to {chunk_size} rows per chunk") + cli::cli_alert_info("Intermediate results and metadata will be saved as .parquet files and .json in {output_dir}") + + + # process chunks ---- + + # track global successes for failures for end-of-pipeline reporting + total_successes <- 0 + total_failures <- 0 + + for (chunk_num in seq_along(chunk_data$batch_indices)) { + chunk_indices <- chunk_data$batch_indices[[chunk_num]] + chunk_texts <- texts[chunk_indices] + chunk_ids <- ids[chunk_indices] + + cli::cli_progress_message("Classifying chunk {chunk_num}/{n_chunks} ({length(chunk_indices)} text{?s})") + + requests <- purrr::map2( + .x = chunk_texts, + .y = chunk_ids, + .f = \(x, y) hf_build_request( + input = x, + endpoint_url = endpoint_url, + endpointr_id = y, + key_name = key_name, + parameters = inference_parameters, + max_retries = max_retries, + timeout = timeout, + validate = FALSE + ) + ) + is_valid_request <- purrr::map_lgl(requests, \(x) inherits(x, "httr2_request")) + + valid_requests <- requests[is_valid_request] + + if (length(valid_requests) == 0) { + 
cli::cli_alert_warning("No valid request{?s} in chunk {chunk_num}, skipping") + next + } + + responses <- perform_requests_with_strategy( + valid_requests, + concurrent_requests = concurrent_requests, + progress = TRUE + ) + + chunk_successes <- httr2::resps_successes(responses) + chunk_failures <- httr2::resps_failures(responses) + + n_chunk_successes <- length(chunk_successes) + n_chunk_failures <- length(chunk_failures) + + total_successes <- total_successes + n_chunk_successes + total_failures <- total_failures + n_chunk_failures + + chunk_results <- list() + + if (n_chunk_successes > 0) { + + successes_ids <- purrr::map(chunk_successes, \(x) purrr::pluck(x, "request", "headers", "endpointr_id")) |> + unlist() + successes_texts <- purrr::map(chunk_successes, \(x) purrr::pluck(x, "request", "body", "data", "inputs")) |> unlist() + successes_content <- purrr::map(chunk_successes, tidy_func) |> + purrr::list_rbind() + + chunk_results$successes <- tibble::tibble( + !!id_col_name := successes_ids, + !!text_col_name := successes_texts, + .error = FALSE, + .error_msg = NA_character_, + .chunk = chunk_num + ) |> + dplyr::bind_cols(successes_content) + + } + + if (n_chunk_failures > 0) { + + failures_ids <- purrr::map(chunk_failures, \(x) purrr::pluck(x, "request", "headers", "endpointr_id")) |> unlist() + failures_texts <- purrr::map_chr(chunk_failures, \(x) purrr::pluck(x, "request", "body", "data", "inputs")) |> unlist() + failures_msgs <- purrr::map_chr(chunk_failures, \(x) purrr::pluck(x, "message", .default = "Unknown error")) + + + chunk_results$failures <- tibble::tibble( + !!id_col_name := failures_ids, + !!text_col_name := failures_texts, + .error = TRUE, + .error_msg = failures_msgs, + .chunk = chunk_num + ) + } + + chunk_df <- dplyr::bind_rows(chunk_results) + + if (nrow(chunk_df) > 0) { + chunk_file <- glue::glue("{output_dir}/chunk_{stringr::str_pad(chunk_num, 3, pad = '0')}.parquet") + arrow::write_parquet(chunk_df, chunk_file) + } + + 
cli::cli_alert_success("Chunk {chunk_num}: {n_chunk_successes} successful, {n_chunk_failures} failed") + } + + parquet_files <- list.files(output_dir, pattern = "\\.parquet$", full.names = TRUE) + + # report and return ---- + cli::cli_alert_info("Processing completed, there were {total_successes} successes\n and {total_failures} failures.") + final_results <- arrow::open_dataset(parquet_files, format = "parquet") |> + dplyr::collect() + + return(final_results) +} + +# hf_classify_df docs ---- #' Classify a data frame of texts using Hugging Face Inference Endpoints #' #' @description @@ -367,9 +601,9 @@ hf_classify_batch <- function(texts, #' endpoint and joins the results back to the original data frame. #' #' @details -#' This function extracts texts from a specified column, classifies them using -#' `hf_classify_batch()`, and joins the classification results back to the -#' original data frame using a specified ID column. +#' This function extracts texts and IDs from the specified columns, classifies them +#' in chunks via `hf_classify_chunks()`, writing each chunk to `output_dir` as a +#' `.parquet` file, and then returns the combined classification results. #' #' The function preserves the original data frame structure and adds new #' columns for classification scores. If the number of rows doesn't match @@ -383,16 +617,14 @@ hf_classify_batch <- function(texts, #' @param id_var Column name to use as identifier for joining (unquoted) #' @param endpoint_url URL of the Hugging Face Inference API endpoint #' @param key_name Name of environment variable containing the API key -#' @param ... Additional arguments passed to request functions +#' @param max_length The maximum number of tokens in the text variable. Beyond this cut-off everything is truncated. 
+#' @param output_dir Path to directory for the .parquet chunks #' @param tidy_func Function to process API responses, defaults to #' `tidy_batch_classification_response` -#' @param parameters List of parameters for the API endpoint, defaults to -#' `list(return_all_scores = TRUE)` -#' @param batch_size Integer; number of texts per batch (default: 4) +#' @param chunk_size Number of texts to process in each chunk before writing to disk (default: 5000) #' @param concurrent_requests Integer; number of concurrent requests (default: 1) #' @param max_retries Integer; maximum retry attempts (default: 5) #' @param timeout Numeric; request timeout in seconds (default: 30) -#' @param progress Logical; whether to show progress bar (default: TRUE) #' #' @return Original data frame with additional columns for classification scores, #' or classification results table if row counts don't match @@ -414,19 +646,19 @@ hf_classify_batch <- function(texts, #' key_name = "API_KEY" #' ) #' } +# hf_classify_df docs ---- hf_classify_df <- function(df, text_var, id_var, endpoint_url, key_name, - ..., - tidy_func = tidy_batch_classification_response, - parameters = list(return_all_scores = TRUE), - batch_size = 4, + max_length = 512L, + output_dir = "auto", + tidy_func = tidy_classification_response, + chunk_size = 5000, concurrent_requests = 1, max_retries = 5, - timeout = 30, - progress = TRUE) { + timeout = 60) { # mirrors the hf_embed_df function @@ -437,47 +669,38 @@ hf_classify_df <- function(df, "df must be a data frame" = is.data.frame(df), "endpoint_url must be provided" = !is.null(endpoint_url) && nchar(endpoint_url) > 0, "concurrent_requests must be a number greater than 0" = is.numeric(concurrent_requests) && concurrent_requests > 0, - "batch_size must be a number greater than 0" = is.numeric(batch_size) && batch_size > 0 + "chunk_size must be a number greater than 0" = is.numeric(chunk_size) && chunk_size > 0 ) - original_num_rows <- nrow(df) # for final sanity check + 
output_dir <- .handle_output_directory(output_dir, base_dir_name = "hf_classification_chunks") # pull texts & ids into vectors for batch function text_vec <- dplyr::pull(df, !!text_sym) indices_vec <- dplyr::pull(df, !!id_sym) - batch_size <- if(is.null(batch_size) || batch_size <=1) 1 else batch_size - - classification_tbl <- hf_classify_batch(texts = text_vec, - endpoint_url = endpoint_url, - key_name = key_name, - tidy_func = tidy_func, - parameters = parameters, - batch_size = batch_size, - max_retries = max_retries, - timeout = timeout, - progress = TRUE, - concurrent_requests = concurrent_requests) - - - final_num_rows <- nrow(classification_tbl) - - if(final_num_rows == original_num_rows) { - classification_tbl <- classification_tbl |> dplyr::mutate(!!id_sym := indices_vec) - - df <- dplyr::left_join(df, classification_tbl) - - return(df) - } else { - cli::cli_warn("Rows in original data frame and returned data frame do not match:") - cli::cli_bullets(text = c( - "Rows in original data frame: {original_num_rows}", - "Rows in returned data frame: {final_num_rows}" - )) - cli::cli_alert_info("Returning table with all available response data") - return(classification_tbl) - } + # preserve original column names + id_col_name <- rlang::as_name(id_sym) + text_col_name <- rlang::as_name(text_sym) + + chunk_size <- if(is.null(chunk_size) || chunk_size <=1) 1 else chunk_size + + results <- hf_classify_chunks( + texts = text_vec, + ids = indices_vec, + endpoint_url = endpoint_url, + max_length = max_length, + tidy_func = tidy_func, + chunk_size = chunk_size, + concurrent_requests = concurrent_requests, + max_retries = max_retries, + timeout = timeout, + key_name = key_name, + output_dir = output_dir, + id_col_name = id_col_name, + text_col_name = text_col_name + ) + return(results) } diff --git a/R/hf_embed.R b/R/hf_embed.R index a0c4374..8f249e3 100644 --- a/R/hf_embed.R +++ b/R/hf_embed.R @@ -235,6 +235,7 @@ hf_embed_batch <- function(texts, } +# hf_embed_chunks 
docs ---- #' Embed text chunks through Hugging Face Inference Embedding Endpoints #' #' This function is capable of processing large volumes of text through Hugging Face's Inference Embedding Endpoints. Results are written in chunks to a file, to avoid out of memory issues. @@ -246,30 +247,33 @@ hf_embed_batch <- function(texts, #' @param texts Character vector of texts to process #' @param ids Vector of unique identifiers corresponding to each text (same length as texts) #' @param endpoint_url Hugging Face Embedding Endpoint -#' @param output_file Path to .CSV file for results. "auto" generates the filename, location and is persistent across sessions. If NULL, generates timestamped filename. +#' @param output_dir Path to directory for the .parquet chunks #' @param chunk_size Number of texts to process in each chunk before writing to disk (default: 5000) #' @param concurrent_requests Number of concurrent requests (default: 5) #' @param max_retries Maximum retry attempts per failed request (default: 5) #' @param timeout Request timeout in seconds (default: 10) #' @param key_name Name of environment variable containing the API key (default: "HF_API_KEY") +#' @param id_col_name Name for the ID column in output (default: "id"). When called from hf_embed_df(), this preserves the original column name. #' #' @return A tibble with columns: -#' - `id`: Original identifier from input +#' - ID column (name specified by `id_col_name`): Original identifier from input #' - `.error`: Logical indicating if request failed #' - `.error_msg`: Error message if failed, NA otherwise #' - `.chunk`: Chunk number for tracking #' - Embedding columns (V1, V2, etc.) 
#' @export #' +# hf_embed_chunks docs ---- hf_embed_chunks <- function(texts, ids, endpoint_url, - output_file = "auto", + output_dir = "auto", chunk_size = 5000L, concurrent_requests = 5L, max_retries = 5L, timeout = 10L, - key_name = "HF_API_KEY") { + key_name = "HF_API_KEY", + id_col_name = "id") { # input validation ---- stopifnot( @@ -279,20 +283,47 @@ hf_embed_chunks <- function(texts, "chunk_size must be a positive integer greater than 1" = is.numeric(chunk_size) && chunk_size > 0 ) - output_file = .handle_output_filename(output_file, base_file_name = "hf_embeddings_batch") + # output_file = .handle_output_filename(output_file, base_file_name = "hf_embeddings_batch") + + output_dir <- .handle_output_directory(output_dir, base_dir_name = "hf_embeddings_batch") + + if (!dir.exists(output_dir)) { + dir.create(output_dir, recursive = TRUE) + } chunk_data <- batch_vector(seq_along(texts), chunk_size) n_chunks <- length(chunk_data$batch_indices) + inference_parameters = list(truncate = TRUE) # text embeddings inference - TEI only takes truncate, not truncation and max_length like other inference endpoints! 
+ + # write/store imoortant metadata in the output dir + metadata <- list( + endpoint_url = endpoint_url, + chunk_size = chunk_size, + n_texts = length(texts), + concurrent_requests = concurrent_requests, + timeout = timeout, + output_dir = output_dir, + key_name = key_name, + n_chunks = n_chunks, + timestamp = Sys.time(), + inference_parameters = inference_parameters + ) + + jsonlite::write_json(metadata, + file.path(output_dir, "metadata.json"), + auto_unbox = TRUE, + pretty = TRUE) + cli::cli_alert_info("Processing {length(texts)} text{?s} in {n_chunks} chunk{?s} of up to {chunk_size} each") - cli::cli_alert_info("Intermediate results will be saved to a .csv at {output_file}.") + cli::cli_alert_info("Intermediate results will be saved as parquet files in {output_dir}") total_success <- 0 total_failures <- 0 ## Chunk Processing ---- - for (chunk_num in seq_along(chunk_data$batch_indices)) - { + for (chunk_num in seq_along(chunk_data$batch_indices)) { + chunk_indices <- chunk_data$batch_indices[[chunk_num]] chunk_texts <- texts[chunk_indices] chunk_ids <- ids[chunk_indices] @@ -307,7 +338,7 @@ hf_embed_chunks <- function(texts, endpoint_url = endpoint_url, endpointr_id = y, key_name = key_name, - parameters = list(), + parameters = inference_parameters, max_retries = max_retries, timeout = timeout, validate = FALSE @@ -339,13 +370,13 @@ hf_embed_chunks <- function(texts, # within chunk results ---- chunk_results <- list() - if (length(successes) > 0) { + if(n_successes > 0) { successes_ids <- purrr::map(successes, \(x) purrr::pluck(x, "request", "headers", "endpointr_id")) |> unlist() successes_content <- purrr::map(successes, tidy_embedding_response) |> purrr::list_rbind() chunk_results$successes <- tibble::tibble( - id = successes_ids, + !!id_col_name := successes_ids, .error = FALSE, .error_msg = NA_character_, .chunk = chunk_num @@ -353,12 +384,12 @@ hf_embed_chunks <- function(texts, dplyr::bind_cols(successes_content) } - if (length(failures) > 0) { + if 
(n_failures > 0) { failures_ids <- purrr::map(failures, \(x) purrr::pluck(x, "request", "headers", "endpointr_id")) |> unlist() failures_msgs <- purrr::map_chr(failures, \(x) purrr::pluck(x, "message", .default = "Unknown error")) chunk_results$failures <- tibble::tibble( - id = failures_ids, + !!id_col_name := failures_ids, .error = TRUE, .error_msg = failures_msgs, .chunk = chunk_num @@ -367,20 +398,19 @@ hf_embed_chunks <- function(texts, chunk_df <- dplyr::bind_rows(chunk_results) - if (nrow(chunk_df) > 0) { - if (chunk_num == 1) { - # if we're in the first chunk write to csv with headers (col names) - readr::write_csv(chunk_df, output_file, append = FALSE) - } else { - # all other chunks, append and don't use col names - readr::write_csv(chunk_df, output_file, append = TRUE, col_names = FALSE) - } + if (nrow(chunk_df) > 0) { + chunk_file <- glue::glue("{output_dir}/chunk_{stringr::str_pad(chunk_num, 3, pad = '0')}.parquet") + arrow::write_parquet(chunk_df, chunk_file) } cli::cli_alert_success("Chunk {chunk_num}: {n_successes} successful, {n_failures} failed") } - final_results <- readr::read_csv(output_file, show_col_types = FALSE) + parquet_files <- list.files(output_dir, pattern = "\\.parquet$", full.names = TRUE) + + cli::cli_alert_info("Processing completed, there were {total_success} successes\n and {total_failures} failures.") + final_results <- arrow::open_dataset(parquet_files, format = "parquet") |> + dplyr::collect() return(final_results) } @@ -395,13 +425,15 @@ hf_embed_chunks <- function(texts, #' response processing, with options for batching & parallel execution. #' Setting the number of retries #' +#' Avoid risk of data loss by setting a low-ish chunk_size (e.g. 5,000, 10,000). Each chunk is written to a `.parquet` file in the `output_dir=` directory, which also contains a `metadata.json` file which tracks important information such as the endpoint URL used. Be sure to check any output directories into .gitignore! 
+#' #' @param df A data frame containing texts to embed #' @param text_var Name of the column containing text to embed #' @param id_var Name of the column to use as ID #' @param endpoint_url The URL of the Hugging Face Inference API endpoint #' @param key_name Name of the environment variable containing the API key -#' @param output_file Path to .CSV file for results. "auto" generates the filename, location and is persistent across sessions. If NULL, generates timestamped filename. -#' @param chunk_size Number of texts to process in one batch (NULL for no batching) +#' @param output_dir Path to directory for the .parquet chunks +#' @param chunk_size The size of each chunk that will be processed and then written to a file. #' @param concurrent_requests Number of requests to send at once. Some APIs do not allow for multiple requests. #' @param max_retries Maximum number of retry attempts for failed requests. #' @param timeout Request timeout in seconds @@ -418,34 +450,22 @@ hf_embed_chunks <- function(texts, #' text = c("First example", "Second example", "Third example") #' ) #' -#' # Use parallel processing without batching -#' embeddings_df <- hf_embed_df( -#' df = df, -#' text_var = text, -#' endpoint_url = "https://my-endpoint.huggingface.cloud", -#' id_var = id, -#' parallel = TRUE, -#' batch_size = NULL -#' ) -#' #' # Use batching without parallel processing #' embeddings_df <- hf_embed_df( #' df = df, #' text_var = text, #' endpoint_url = "https://my-endpoint.huggingface.cloud", -#' id_var = id, -#' parallel = FALSE, -#' batch_size = 10 +#' id_var = id #' ) #' -#' # Use both batching and parallel processing +#' # Use both chunking and parallel processing #' embeddings_df <- hf_embed_df( #' df = df, #' text_var = text, #' endpoint_url = "https://my-endpoint.huggingface.cloud", #' id_var = id, -#' parallel = TRUE, -#' batch_size = 10 +#' chunk_size = 10000, +#' concurrent_requests = 50 #' ) #' } # hf_embed_df docs ---- @@ -454,8 +474,8 @@ hf_embed_df <- 
function(df, id_var, endpoint_url, key_name, - output_file = "auto", - chunk_size = 8L, + output_dir = "auto", + chunk_size = 5000L, concurrent_requests = 1L, max_retries = 5L, timeout = 15L, @@ -473,13 +493,17 @@ hf_embed_df <- function(df, "concurrent_requests must be an integer" = is.numeric(concurrent_requests) && concurrent_requests > 0 ) - output_file <- .handle_output_filename(output_file, - base_file_name = "hf_embeddings_batch") - # refactoring to always use hf_embed_batch - if batch_size if one then it gets handled anyway, avoids branching and additional complexity. + output_dir <- .handle_output_directory(output_dir, + base_dir_name = "hf_embeddings_batch") + + texts <- dplyr::pull(df, !!text_sym) indices <- dplyr::pull(df, !!id_sym) + # preserve original column name + id_col_name <- rlang::as_name(id_sym) + chunk_size <- if(is.null(chunk_size) || chunk_size <= 1) 1 else chunk_size results <- hf_embed_chunks( @@ -490,10 +514,13 @@ hf_embed_df <- function(df, chunk_size = chunk_size, concurrent_requests = concurrent_requests, max_retries = max_retries, - timeout = timeout + timeout = timeout, + output_dir = output_dir, + id_col_name = id_col_name ) return(results) } + diff --git a/R/utils.R b/R/utils.R index 8d9e75e..bb4ef98 100644 --- a/R/utils.R +++ b/R/utils.R @@ -260,9 +260,20 @@ extract_field <- function(api_response, field_name) { return(x) } +# modifying the .handle_output_filename to work with .parquet #' @keywords internal -.append_tibble_class <- function(x) { - attr(x, "class") <- c("tbl_df", "tbl", "data.frame") +.handle_output_directory <- function(x, base_dir_name = "batch_processing_") { + if (is.null(x)) { + return(tempfile(pattern = base_dir_name)) + } + + if(identical(x, "auto")) { + timestamp <- format(Sys.time(), "%d%m%Y_%H%M%S") + output_dir <- glue::glue("{base_dir_name}_{timestamp}") + return(output_dir) + } + + # Accept directory path directly return(x) } @@ -272,3 +283,59 @@ parse_oai_date <- function(date_string) { date <- 
as.Date(parsed_date) return(date) } + + + +#' Check the max number of tokens allowed for your inputs +#' +#' This function requires the model to have 'tokenizer_config.json' file with a +#' `model_max_length` key, otherwise it will error. +#' +#' @param model_name name of the model e.g. 'sentence-transformers/mpnet-base-v2' +#' @param api_key Your Hugging Face auth token +#' +#' @returns Integer value of the model_max_length from tokenizer config +#' @export +#' +hf_get_model_max_length <- function(model_name, api_key = "HF_API_KEY") { + config_url <- glue::glue("https://huggingface.co/{model_name}/resolve/main/tokenizer_config.json") + + use_api_key <- get_api_key(api_key) + + req <- httr2::request(config_url) + + if (!is.null(use_api_key)) { + req <- req |> + httr2::req_headers(Authorization = paste("Bearer", use_api_key)) + } + + response <- req |> httr2::req_perform() + + tokenizer_config <- response |> + httr2::resp_body_string() |> + jsonlite::fromJSON() + + return(tokenizer_config$model_max_length) +} + + +#' Retrieve information about an endpoint +#' +#' @param endpoint_url Hugging Face Embedding Endpoint +#' @param key_name Name of environment variable containing the API key (default: "HF_API_KEY") +#' +#' @returns JSON of endpoint information +#' @export +#' +hf_get_endpoint_info <- function(endpoint_url, key_name = "HF_API_KEY") { + + info_endpoint_url <- glue::glue("{endpoint_url}/info") + api_key = get_api_key(key_name) + + info <-httr2::request(info_endpoint_url) |> + httr2::req_headers(Authorization = paste("Bearer", api_key)) |> + httr2::req_perform() |> + httr2::resp_body_json() + + return(info) +} diff --git a/README.Rmd b/README.Rmd index a9ff7b5..22071ee 100644 --- a/README.Rmd +++ b/README.Rmd @@ -93,8 +93,11 @@ hf_embed_df( id_var = review_id, endpoint_url = endpoint_url, key_name = "HF_API_KEY", + output_dir = "embeddings_output", # writes .parquet chunks to this directory + chunk_size = 5000, # process 5000 rows per chunk concurrent_requests 
= 2, - batch_size = 3 + max_retries = 5, + timeout = 15 ) ``` @@ -134,8 +137,12 @@ hf_classify_df( id_var = review_id, endpoint_url = sentiment_endpoint, key_name = "HF_API_KEY", - batch_size = 8, - concurrent_requests = 3 + max_length = 512, # truncate texts longer than 512 tokens + output_dir = "classification_output", # writes .parquet chunks to this directory + chunk_size = 2500, # process 2500 rows per chunk + concurrent_requests = 3, + max_retries = 5, + timeout = 60 ) |> dplyr::rename(!!!labelid_2class()) ``` @@ -189,7 +196,11 @@ oai_complete_df( id_var = review_id, system_prompt = "Classify the following review:", key_name = "OPENAI_API_KEY", - concurrent_requests = 5 # send 5 rows of data simultaneously + output_file = "completions_output.parquet", # writes results to this file + chunk_size = 1000, # process 1000 rows per chunk + concurrent_requests = 5, # send 5 rows of data simultaneously + max_retries = 5, + timeout = 30 ) ``` @@ -203,10 +214,59 @@ oai_complete_df( system_prompt = "Classify the following review:", schema = sentiment_schema, key_name = "OPENAI_API_KEY", - concurrent_requests = 5 # send 5 rows of data simultaneously + output_file = "completions_output.parquet", + chunk_size = 1000, + concurrent_requests = 5 ) ``` +# Working with Output Files + +## Reading Results from Disk + +Hugging Face functions (`hf_embed_df()`, `hf_classify_df()`) write intermediate results as `.parquet` files in the specified `output_dir`. 
To read all results back: + +```{r, eval = FALSE} +# List all parquet files (excludes metadata.json automatically) +parquet_files <- list.files("embeddings_output", + pattern = "\\.parquet$", + full.names = TRUE) + +# Read all chunks into a single data frame +results <- arrow::open_dataset(parquet_files, format = "parquet") |> + dplyr::collect() +``` + +## Understanding metadata.json + +Each Hugging Face output directory contains a `metadata.json` file that records: + +- `endpoint_url`: The API endpoint used +- `chunk_size`: Number of rows processed per chunk +- `n_texts`: Total number of texts processed +- `concurrent_requests`: Parallel request setting +- `timeout`: Request timeout in seconds +- `max_retries`: Maximum retry attempts +- `inference_parameters`: Model-specific parameters (e.g., truncate, max_length) +- `timestamp`: When the job was run +- `key_name`: Which API key was used + +This metadata is useful for: + +- Debugging failed runs +- Reproducing results with the same settings +- Tracking which endpoint/model was used +- Understanding performance characteristics + +```{r, eval = FALSE} +metadata <- jsonlite::read_json("embeddings_output/metadata.json") + +# check which endpoint was used +metadata$endpoint_url +``` + +**Note:** Add output directories to `.gitignore` to avoid committing API responses and metadata. 
+ Read the [LLM Providers Vignette](articles/llm_providers.html), and the [Structured Outputs Vignette](vignettes/structured_outputs_json_schema.Rmd) for more information on common workflows with the OpenAI Chat Completions API [^1] [^1]: Content pending implementation for Anthroic Messages API, Gemini API, and OpenAI Responses API diff --git a/README.md b/README.md index da1601a..e54bd32 100644 --- a/README.md +++ b/README.md @@ -89,8 +89,11 @@ hf_embed_df( id_var = review_id, endpoint_url = endpoint_url, key_name = "HF_API_KEY", + output_dir = "embeddings_output", # writes .parquet chunks to this directory + chunk_size = 5000, # process 5000 rows per chunk concurrent_requests = 2, - batch_size = 3 + max_retries = 5, + timeout = 15 ) ``` @@ -132,8 +135,12 @@ hf_classify_df( id_var = review_id, endpoint_url = sentiment_endpoint, key_name = "HF_API_KEY", - batch_size = 8, - concurrent_requests = 3 + max_length = 512, # truncate texts longer than 512 tokens + output_dir = "classification_output", # writes .parquet chunks to this directory + chunk_size = 2500, # process 2500 rows per chunk + concurrent_requests = 3, + max_retries = 5, + timeout = 60 ) |> dplyr::rename(!!!labelid_2class()) ``` @@ -190,7 +197,11 @@ oai_complete_df( id_var = review_id, system_prompt = "Classify the following review:", key_name = "OPENAI_API_KEY", - concurrent_requests = 5 # send 5 rows of data simultaneously + output_file = "completions_output.parquet", # writes results to this file + chunk_size = 1000, # process 1000 rows per chunk + concurrent_requests = 5, # send 5 rows of data simultaneously + max_retries = 5, + timeout = 30 ) ``` @@ -204,10 +215,64 @@ oai_complete_df( system_prompt = "Classify the following review:", schema = sentiment_schema, key_name = "OPENAI_API_KEY", - concurrent_requests = 5 # send 5 rows of data simultaneously + output_file = "completions_output.parquet", + chunk_size = 1000, + concurrent_requests = 5 ) ``` +# Working with Output Files + +## Reading Results 
from Disk + +Hugging Face functions (`hf_embed_df()`, `hf_classify_df()`) write +intermediate results as `.parquet` files in the specified `output_dir`. +To read all results back: + +``` r +# List all parquet files (excludes metadata.json automatically) +parquet_files <- list.files("embeddings_output", + pattern = "\\.parquet$", + full.names = TRUE) + +# Read all chunks into a single data frame +results <- arrow::open_dataset(parquet_files, format = "parquet") |> + dplyr::collect() +``` + +## Understanding metadata.json + +Each Hugging Face output directory contains a `metadata.json` file that +records: + +- `endpoint_url`: The API endpoint used +- `chunk_size`: Number of rows processed per chunk +- `n_texts`: Total number of texts processed +- `concurrent_requests`: Parallel request setting +- `timeout`: Request timeout in seconds +- `max_retries`: Maximum retry attempts +- `inference_parameters`: Model-specific parameters (e.g., truncate, + max_length) +- `timestamp`: When the job was run +- `key_name`: Which API key was used + +This metadata is useful for: + +- Debugging failed runs +- Reproducing results with the same settings +- Tracking which endpoint/model was used +- Understanding performance characteristics + +``` r +metadata <- jsonlite::read_json("embeddings_output/metadata.json") + +# check which endpoint was used +metadata$endpoint_url +``` + +**Note:** Add output directories to `.gitignore` to avoid committing API +responses and metadata. 
+ Read the [LLM Providers Vignette](articles/llm_providers.html), and the [Structured Outputs Vignette](vignettes/structured_outputs_json_schema.Rmd) for more diff --git a/_pkgdown.yml b/_pkgdown.yml index c1c4629..aa2657d 100644 --- a/_pkgdown.yml +++ b/_pkgdown.yml @@ -70,6 +70,7 @@ reference: contents: - hf_classify_text - hf_classify_batch + - hf_classify_chunks - hf_classify_df - tidy_classification_response @@ -81,6 +82,12 @@ reference: - hf_build_request_df - hf_perform_request +- title: "Hugging Face Endpoint Utilities" + desc: "Small functions for checking things with Hugging FACE APIs" + contents: + - hf_get_endpoint_info + - hf_get_model_max_length + - title: "OpenAI Completions" desc: "Functions for working with OpenAI's APIs including structured outputs" contents: @@ -138,7 +145,6 @@ reference: - single_embedding_hf - df_embeddings_hf - authors: Jack Penzer: href: https://github.com/jpcompartir diff --git a/dev_docs/hf_classify_dev.qmd b/dev_docs/hf_classify_dev.qmd index 9dc61b5..e034d6a 100644 --- a/dev_docs/hf_classify_dev.qmd +++ b/dev_docs/hf_classify_dev.qmd @@ -485,3 +485,140 @@ hf_classify_df( concurrent_requests = 4 ) ``` + +# Writing to Files + +November, 2025 + +```{r} +hf_cl +``` + +# Truncation, padding, max_length + +We were getting some failures which seemed hard to pin down - was it Arabic, tokenisation issues, empty strings? No, it was token length. The test endpoint we're using has a max_model_length of 128. If we tried to feed in a string that tokenised to \> 128 tokens, then we got errors. 
+ +```{r} +library(tidyverse) +library(EndpointR) +library(arrow) + +x <- read_parquet("~/dev/projects/diageo/mltm/data/main/mltm_l3_trend_df.parquet") + +endpointr_test_url <- httr2::secret_decrypt("osXYmCFrfPR4NsIIOBOIwWzp8XdA77BVP5QKVwJb0jzHPUwgQ8NmZcHXrhvoYrR8a2QNXZjmwIHpW2X55Ivaa_y11RV5TgXOiD7aK3O8hbDoeQ", "ENDPOINTR_KEY") + +test_sent_url <- httr2::secret_decrypt("STCYZqL6e2yxfGaz5AWA80pnINnfX-47vsqEcxpm23IkxE1R8238Gtnp7oeRUQ6GxkF6jWUYYmiSjIh-Abo1cWOe23qUsZ7uSy5UE9BwvJ9oWg", "ENDPOINTR_KEY") +``` + +So we grab a bunch of strings that are guaranteed to fail if we don't do anything special: + +```{r} +test_long_strings <- x |> + filter(language == "Spanish (Español)", + message != "", + !is.na(message)) |> + mutate(string_length = str_length(message)) |> + filter(string_length > 1000) |> + slice(10:15) + # mutate(message = str_sub(message, 1, 100)) |> +``` + +And then we make them pass, checking that the max_length argument is working as intended. + +```{r} +should_pass <- hf_classify_chunks( + test_long_strings$message, + test_long_strings$universal_message_id, + tidy_func = tidy_classification_response, + endpoint_url = test_sent_url, + output_dir = "test_dir/test_classify/test_passes", + chunk_size = 5, + concurrent_requests = 5, + timeout = 60, + max_retries = 10, + max_length = 128L, + id_col_name = "universal_message_id", + text_col_name = ".text_col" +) +``` + +``` +ℹ Processing 6 texts in 2 chunks of up to 5 rows per chunk +ℹ Intermediate results and metadata will be saved as .parquet files and .json in test_dir/test_classify/test_passes +ℹ Performing 5 requests in parallel (with 5 concurrent requests)... +✔ Chunk 1: 5 successful, 0 failed +ℹ Performing 1 request sequentially... +✔ Chunk 2: 1 successful, 0 failed +ℹ Processing completed, there were 6 successes +and 0 failures. +``` + +And we make them fail, to confirm the argument is working as intended! 
+ +```{r} +should_fail <- hf_classify_chunks( + test_long_strings$message, + test_long_strings$universal_message_id, + tidy_func = tidy_classification_response, + endpoint_url = test_sent_url, + output_dir = "test_dir/test_classify/test_failures", + chunk_size = 5, + concurrent_requests = 5, + timeout = 60, + max_retries = 10, + max_length = 512L +) + +``` + +``` +ℹ Processing 6 texts in 2 chunks of up to 5 rows per chunk +ℹ Intermediate results and metadata will be saved as .parquet files and .json in test_dir/test_classify/test_failures +ℹ Performing 5 requests in parallel (with 5 concurrent requests)... +✔ Chunk 1: 0 successful, 5 failed +ℹ Performing 1 request sequentially... +! Sequential request to failed: HTTP 400 Bad Request. +✔ Chunk 2: 0 successful, 1 failed +ℹ Processing completed, there were 0 successes +and 6 failures. +``` + +Test the function over 10k data points + +```{r} + ten_k_results <- x |> + filter(message != "", + !is.na(message)) |> + slice(1:10000) |> + hf_classify_df( + text_var = message, + id_var = universal_message_id, + endpoint_url = test_sent_url, + key_name = "HF_API_KEY", + max_length = 128, + output_dir = "test_dir/test_classify/ten_thousand_rows", + tidy_func = tidy_classification_response, + chunk_size = 2500, + concurrent_requests = 15, + max_retries = 10L, + timeout = 60 + ) +``` + +```{r} +x |> + slice(1:10) |> + hf_classify_df( + message, + universal_message_id, + endpoint_url = test_sent_url, + key_name = "HF_API_KEY", + max_length = 128, + output_dir = "test_dir/test_classify/ten_thousand_rows", + tidy_func = tidy_classification_response, + chunk_size = 2500, + concurrent_requests = 15, + max_retries = 10L, + timeout = 60 + ) +``` diff --git a/dev_docs/hf_embed_dev.qmd b/dev_docs/hf_embed_dev.qmd new file mode 100644 index 0000000..11b1529 --- /dev/null +++ b/dev_docs/hf_embed_dev.qmd @@ -0,0 +1,122 @@ +--- +title: "hf_embed_dev" +format: html +--- + +```{r} +library(EndpointR) +library(arrow) +library(httr2) 
+library(purrr) +library(dplyr) +library(tidyr) +library(stringr) +``` + +```{r} +test_embed_url <- secret_decrypt( + "KWqk_H6v58UIaJm6WPAekfrAD8LVAZGUmfZNysv4Ze48NxHJCYyOPWdqMn8_z-xU_MFCccR8Qm0_qUzqcSMB6QlphuHPtkHmYievFG1L4J6k0g", + "ENDPOINTR_KEY" +) + +test_data <- read_parquet("~/dev/projects/diageo/mltm/data/main/mltm_l3_trend_df.parquet") +``` + +```{r} +test_data <- test_data |> + slice(1:10000) |> + mutate(string_length = str_length(message)) + +test_long_strings <- test_data |> + filter(string_length > 2500) + +test_short_strings <- test_data |> + filter(string_length < 500) |> + slice(1:50) +``` + +```{r} +should_fail <- test_long_strings |> + hf_embed_df( + message, + universal_message_id, + test_embed_url, + "HF_API_KEY", + # max_length = 8192L, + output_dir = "test_dir/test_embed/test_failures_long", + concurrent_requests = 5, + max_retries = 10L, + timeout = 60 + ) +``` + +```{r} +should_pass <- test_short_strings |> + hf_embed_df( + message, + universal_message_id, + test_embed_url, + "HF_API_KEY", + # max_length = 8192L, + output_dir = "test_dir/test_embed/test_passes_short", + concurrent_requests = 5, + max_retries = 10L, + timeout = 60 + ) +``` + +The endpoint reports 512 as model_max_length but actually the inference endpoint asks for 256 + +```{r} +should_pass_string_truncation <- test_long_strings |> + mutate(message = str_trunc(message, 250, ellipsis = "")) |> + hf_embed_df( + message, + universal_message_id, + test_embed_url, + "HF_API_KEY", + # max_length = 50, + output_dir = "test_dir/test_embed/test_pass_truncation", + concurrent_requests = 5, + max_retries = 10L, + timeout = 60 + ) +``` + +TEST Tokenisation is working + +```{r} +should_pass_tokenisation <- test_long_strings |> + hf_embed_df( + message, + universal_message_id, + test_embed_url, + "HF_API_KEY", + output_dir = "test_dir/test_embed/test_pass_truncation", + concurrent_requests = 5, + max_retries = 10L, + timeout = 60 + ) +``` + +```{r} +test_ten_k <- test_data |> + 
hf_embed_df( + message, + universal_message_id, + test_embed_url, + "HF_API_KEY", + output_dir = "test_dir/test_embed/test_ten_k", + concurrent_requests = 50, + max_retries = 10L, + timeout = 60 + ) +``` + +With AUTO_TRUNCATE: true, they do work (in the Endpoint configs) + +Getting model info - requires the API be up and running. + +```{r} +hf_get_endpoint_info(test_embed_url) +``` diff --git a/man/hf_classify_chunks.Rd b/man/hf_classify_chunks.Rd new file mode 100644 index 0000000..9fd875b --- /dev/null +++ b/man/hf_classify_chunks.Rd @@ -0,0 +1,80 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/hf_classify.R +\name{hf_classify_chunks} +\alias{hf_classify_chunks} +\title{Efficiently classify vectors of text in chunks} +\usage{ +hf_classify_chunks( + texts, + ids, + endpoint_url, + max_length = 512L, + tidy_func = tidy_classification_response, + output_dir = "auto", + chunk_size = 5000L, + concurrent_requests = 5L, + max_retries = 5L, + timeout = 30L, + key_name = "HF_API_KEY", + id_col_name = "id", + text_col_name = "text" +) +} +\arguments{ +\item{texts}{Character vector of texts to classify} + +\item{ids}{Vector of unique identifiers corresponding to each text (same length as texts)} + +\item{endpoint_url}{Hugging Face Classification Endpoint} + +\item{max_length}{The maximum number of tokens in the text variable. 
Beyond this cut-off everything is truncated.} + +\item{tidy_func}{Function to process API responses, defaults to +\code{tidy_classification_response}} + +\item{output_dir}{Path to directory for the .parquet chunks} + +\item{chunk_size}{Number of texts to process in each chunk before writing to disk (default: 5000)} + +\item{concurrent_requests}{Integer; number of concurrent requests (default: 5)} + +\item{max_retries}{Integer; maximum retry attempts (default: 5)} + +\item{timeout}{Numeric; request timeout in seconds (default: 30)} + +\item{key_name}{Name of environment variable containing the API key} + +\item{id_col_name}{Name for the ID column in output (default: "id"). When called from hf_classify_df(), this preserves the original column name.} + +\item{text_col_name}{Name for the text column in output (default: "text"). When called from hf_classify_df(), this preserves the original column name.} +} +\value{ +A data frame of classified documents with successes and failures +} +\description{ +Classifies large batches of text using a Hugging Face classification endpoint. +Processes texts in chunks with concurrent requests, writes intermediate results +to disk as Parquet files, and returns a combined data frame of all classifications. +} +\details{ +The function creates a metadata JSON file in \code{output_dir} containing processing +parameters and timestamps. Each chunk is saved as a separate Parquet file before +being combined into the final result. Use \code{output_dir = "auto"} to generate a +timestamped directory automatically. + +For single text classification, use \code{hf_classify_text()} instead. 
+} +\examples{ +\dontrun{ +# basic usage with vectors +texts <- c("I love this", "I hate this", "This is ok") +ids <- c("review_1", "review_2", "review_3") + +results <- hf_classify_chunks( + texts = texts, + ids = ids, + endpoint_url = "https://your-endpoint.huggingface.cloud", + key_name = "HF_API_KEY" +) +} +} diff --git a/man/hf_classify_df.Rd b/man/hf_classify_df.Rd index 77f9cb6..e8eaca8 100644 --- a/man/hf_classify_df.Rd +++ b/man/hf_classify_df.Rd @@ -10,14 +10,13 @@ hf_classify_df( id_var, endpoint_url, key_name, - ..., - tidy_func = tidy_batch_classification_response, - parameters = list(return_all_scores = TRUE), - batch_size = 4, + max_length = 512L, + output_dir = "auto", + tidy_func = tidy_classification_response, + chunk_size = 5000, concurrent_requests = 1, max_retries = 5, - timeout = 30, - progress = TRUE + timeout = 60 ) } \arguments{ @@ -31,23 +30,20 @@ hf_classify_df( \item{key_name}{Name of environment variable containing the API key} -\item{...}{Additional arguments passed to request functions} +\item{max_length}{The maximum number of tokens in the text variable. 
Beyond this cut-off everything is truncated.} + +\item{output_dir}{Path to directory for the .parquet chunks} \item{tidy_func}{Function to process API responses, defaults to \code{tidy_batch_classification_response}} -\item{parameters}{List of parameters for the API endpoint, defaults to -\code{list(return_all_scores = TRUE)}} - -\item{batch_size}{Integer; number of texts per batch (default: 4)} +\item{chunk_size}{Number of texts to process in each chunk before writing to disk (default: 5000)} \item{concurrent_requests}{Integer; number of concurrent requests (default: 1)} \item{max_retries}{Integer; maximum retry attempts (default: 5)} \item{timeout}{Numeric; request timeout in seconds (default: 30)} - -\item{progress}{Logical; whether to show progress bar (default: TRUE)} } \value{ Original data frame with additional columns for classification scores, @@ -58,9 +54,9 @@ Classifies texts in a data frame column using a Hugging Face classification endpoint and joins the results back to the original data frame. } \details{ -This function extracts texts from a specified column, classifies them using -\code{hf_classify_batch()}, and joins the classification results back to the -original data frame using a specified ID column. +This function extracts texts and IDs from the specified columns and classifies them +in chunks using \code{hf_classify_chunks()}, writing intermediate results to disk. +It then returns all of the chunks combined. The function preserves the original data frame structure and adds new columns for classification scores.
If the number of rows doesn't match diff --git a/man/hf_embed_chunks.Rd b/man/hf_embed_chunks.Rd index ed307b9..9920509 100644 --- a/man/hf_embed_chunks.Rd +++ b/man/hf_embed_chunks.Rd @@ -8,12 +8,13 @@ hf_embed_chunks( texts, ids, endpoint_url, - output_file = "auto", + output_dir = "auto", chunk_size = 5000L, concurrent_requests = 5L, max_retries = 5L, timeout = 10L, - key_name = "HF_API_KEY" + key_name = "HF_API_KEY", + id_col_name = "id" ) } \arguments{ @@ -23,7 +24,7 @@ hf_embed_chunks( \item{endpoint_url}{Hugging Face Embedding Endpoint} -\item{output_file}{Path to .CSV file for results. "auto" generates the filename, location and is persistent across sessions. If NULL, generates timestamped filename.} +\item{output_dir}{Path to directory for the .parquet chunks} \item{chunk_size}{Number of texts to process in each chunk before writing to disk (default: 5000)} @@ -34,11 +35,13 @@ hf_embed_chunks( \item{timeout}{Request timeout in seconds (default: 10)} \item{key_name}{Name of environment variable containing the API key (default: "HF_API_KEY")} + +\item{id_col_name}{Name for the ID column in output (default: "id"). 
When called from hf_embed_df(), this preserves the original column name.} } \value{ A tibble with columns: \itemize{ -\item \code{id}: Original identifier from input +\item ID column (name specified by \code{id_col_name}): Original identifier from input \item \code{.error}: Logical indicating if request failed \item \code{.error_msg}: Error message if failed, NA otherwise \item \code{.chunk}: Chunk number for tracking diff --git a/man/hf_embed_df.Rd b/man/hf_embed_df.Rd index 8b4a227..ebd69e4 100644 --- a/man/hf_embed_df.Rd +++ b/man/hf_embed_df.Rd @@ -10,8 +10,8 @@ hf_embed_df( id_var, endpoint_url, key_name, - output_file = "auto", - chunk_size = 8L, + output_dir = "auto", + chunk_size = 5000L, concurrent_requests = 1L, max_retries = 5L, timeout = 15L, @@ -29,9 +29,9 @@ hf_embed_df( \item{key_name}{Name of the environment variable containing the API key} -\item{output_file}{Path to .CSV file for results. "auto" generates the filename, location and is persistent across sessions. If NULL, generates timestamped filename.} +\item{output_dir}{Path to directory for the .parquet chunks} -\item{chunk_size}{Number of texts to process in one batch (NULL for no batching)} +\item{chunk_size}{The size of each chunk that will be processed and then written to a file.} \item{concurrent_requests}{Number of requests to send at once. Some APIs do not allow for multiple requests.} @@ -49,6 +49,8 @@ High-level function to generate embeddings for texts in a data frame. This function handles the entire process from request creation to response processing, with options for batching & parallel execution. Setting the number of retries + +Avoid risk of data loss by setting a low-ish chunk_size (e.g. 5,000, 10,000). Each chunk is written to a \code{.parquet} file in the \verb{output_dir=} directory, which also contains a \code{metadata.json} file which tracks important information such as the endpoint URL used. Be sure to check any output directories into .gitignore! 
} \examples{ \dontrun{ @@ -58,34 +60,22 @@ Setting the number of retries text = c("First example", "Second example", "Third example") ) - # Use parallel processing without batching - embeddings_df <- hf_embed_df( - df = df, - text_var = text, - endpoint_url = "https://my-endpoint.huggingface.cloud", - id_var = id, - parallel = TRUE, - batch_size = NULL - ) - # Use batching without parallel processing embeddings_df <- hf_embed_df( df = df, text_var = text, endpoint_url = "https://my-endpoint.huggingface.cloud", - id_var = id, - parallel = FALSE, - batch_size = 10 + id_var = id ) - # Use both batching and parallel processing + # Use both chunking and parallel processing embeddings_df <- hf_embed_df( df = df, text_var = text, endpoint_url = "https://my-endpoint.huggingface.cloud", id_var = id, - parallel = TRUE, - batch_size = 10 + chunk_size = 10000, + concurrent_requests = 50 ) } } diff --git a/man/hf_get_endpoint_info.Rd b/man/hf_get_endpoint_info.Rd new file mode 100644 index 0000000..a905169 --- /dev/null +++ b/man/hf_get_endpoint_info.Rd @@ -0,0 +1,19 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/utils.R +\name{hf_get_endpoint_info} +\alias{hf_get_endpoint_info} +\title{Retrieve information about an endpoint} +\usage{ +hf_get_endpoint_info(endpoint_url, key_name = "HF_API_KEY") +} +\arguments{ +\item{endpoint_url}{Hugging Face Embedding Endpoint} + +\item{key_name}{Name of environment variable containing the API key (default: "HF_API_KEY")} +} +\value{ +JSON of endpoint information +} +\description{ +Retrieve information about an endpoint +} diff --git a/man/hf_get_model_max_length.Rd b/man/hf_get_model_max_length.Rd new file mode 100644 index 0000000..a7727ca --- /dev/null +++ b/man/hf_get_model_max_length.Rd @@ -0,0 +1,20 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/utils.R +\name{hf_get_model_max_length} +\alias{hf_get_model_max_length} +\title{Check the max number of tokens allowed 
for your inputs} +\usage{ +hf_get_model_max_length(model_name, api_key = "HF_API_KEY") +} +\arguments{ +\item{model_name}{name of the model e.g. 'sentence-transformers/mpnet-base-v2'} + +\item{api_key}{Your Hugging Face auth token} +} +\value{ +Integer value of the model_max_length from tokenizer config +} +\description{ +This function requires the model to have 'tokenizer_config.json' file with a +\code{model_max_length} key, otherwise it will error. +} diff --git a/tests/testthat/test-hf_classify.R b/tests/testthat/test-hf_classify.R index e0d89dc..d78ec94 100644 --- a/tests/testthat/test-hf_classify.R +++ b/tests/testthat/test-hf_classify.R @@ -148,6 +148,102 @@ test_that("hf_classify_batch processes a batch of texts and returns a tidied cla }) +test_that("hf_classify_chunks processes chunks correctly", { + texts <- paste0("text", 1:6) + ids <- paste0("id", 1:length(texts)) + temp_dir <- withr::local_tempdir() + expected_cols <- c("id", "text", ".error", ".error_msg", ".chunk", "positive", "negative", "neutral") + + # Test with chunk_size = 2 + chunk_2 <- expect_no_error(hf_classify_chunks( + texts = texts, + ids = ids, + endpoint_url = server$url("/test_single_sentiment"), + key_name = "HF_TEST_API_KEY", + chunk_size = 2, + concurrent_requests = 1, + output_dir = temp_dir + )) |> suppressMessages() + + expect_setequal(unique(chunk_2$`.chunk`), c(1, 2, 3)) + expect_setequal(names(chunk_2), expected_cols) + expect_equal(nrow(chunk_2), 6) + + # Test with chunk_size = 1 + chunk_1 <- expect_no_error(hf_classify_chunks( + texts = texts, + ids = ids, + endpoint_url = server$url("/test_single_sentiment"), + key_name = "HF_TEST_API_KEY", + chunk_size = 1, + concurrent_requests = 1, + output_dir = temp_dir + )) |> suppressMessages() + + expect_setequal(unique(chunk_1$`.chunk`), 1:6) + expect_equal(nrow(chunk_1), 6) +}) + +test_that("hf_classify_df works correctly with chunk processing", { + test_df <- data.frame( + id = paste0("id", 1:2), + text = c("text1", "text2"), + 
stringsAsFactors = FALSE + ) + output_dir <- withr::local_tempdir() + + result <- expect_no_error( + hf_classify_df( + df = test_df, + text_var = text, + id_var = id, + endpoint_url = server$url("/test_single_sentiment"), + key_name = "HF_TEST_API_KEY", + chunk_size = 1, + output_dir = output_dir + ) + ) |> + suppressMessages() + + expect_s3_class(result, "data.frame") + expect_equal(nrow(result), 2) + expect_true(all(c("id", "positive", "negative", "neutral", ".error", ".error_msg", ".chunk") %in% names(result))) + expect_equal(result$id, c("id1", "id2")) + expect_equal(result$positive, c(0.9, 0.9), tolerance = 1e-7) + expect_equal(result$negative, c(0.05, 0.05), tolerance = 1e-7) + expect_equal(result$neutral, c(0.05, 0.05), tolerance = 1e-7) + expect_equal(result$.error, c(FALSE, FALSE)) +}) + +test_that("hf_classify_df works with different chunk sizes", { + test_df <- data.frame( + id = paste0("id", 1:4), + text = paste0("text", 1:4), + stringsAsFactors = FALSE + ) + temp_dir <- withr::local_tempdir() + + result <- expect_no_error( + hf_classify_df( + df = test_df, + text_var = text, + id_var = id, + endpoint_url = server$url("/test_single_sentiment"), + key_name = "HF_TEST_API_KEY", + chunk_size = 2, + concurrent_requests = 1, + output_dir = temp_dir + ) + ) |> + suppressMessages() + + expect_s3_class(result, "data.frame") + expect_equal(nrow(result), 4) + expect_true(all(c("id", ".chunk", ".error", ".error_msg") %in% names(result))) + expect_equal(result$.error, c(FALSE, FALSE, FALSE, FALSE)) + expect_setequal(unique(result$.chunk), c(1, 2)) +}) + test_that("hf_classify_df's input validation is working", { # safety net for changes @@ -183,26 +279,14 @@ test_that("hf_classify_df's input validation is working", { ) expect_error( - hf_classify_df(df = test_df, text_var = text_content, id_var = doc_id, endpoint_url = "url", key_name = "key", batch_size = "text"), - "batch_size must be a number greater than 0" + hf_classify_df(df = test_df, text_var = 
text_content, id_var = doc_id, endpoint_url = "url", key_name = "key", chunk_size = "text"), + "chunk_size must be a number greater than 0" ) expect_error( - hf_classify_df(df = test_df, text_var = text_content, id_var = doc_id, endpoint_url = "url", key_name = "key", batch_size = NULL), - "batch_size must be a number greater than 0" + hf_classify_df(df = test_df, text_var = text_content, id_var = doc_id, endpoint_url = "url", key_name = "key", chunk_size = NULL), + "chunk_size must be a number greater than 0" ) }) - -# test_that("hf_classify_df processes a data frame of texts and returns a data frame", { -# -# -# test_df <- data.frame( -# id = c(1, 2), -# text = c("positive text", "negative text"), -# stringsAsFactors = FALSE -# ) -# -# -# }) diff --git a/tests/testthat/test-hf_embed.R b/tests/testthat/test-hf_embed.R index 354f935..3953ab8 100644 --- a/tests/testthat/test-hf_embed.R +++ b/tests/testthat/test-hf_embed.R @@ -87,7 +87,7 @@ test_that("hf_embed_batch allows custom tidy_func", { test_that("hf_embed_chunks replaces hf_embed_batch", { texts <- paste0("text", 1:6) ids <- paste0('id', 1:length(texts)) - temp_file <- tempfile(fileext = ".csv") + temp_dir <- withr::local_tempdir() expected_cols <- c("id", ".error", ".error_msg", ".chunk", "V1", "V2", "V3") @@ -98,7 +98,7 @@ test_that("hf_embed_chunks replaces hf_embed_batch", { key_name = "HF_TEST_API_KEY", chunk_size = 2, concurrent_requests =1, - output_file = temp_file + output_dir = temp_dir )) |> suppressMessages() expect_setequal(unique(chunk_2$`.chunk`), c(1, 2, 3)) @@ -111,7 +111,7 @@ test_that("hf_embed_chunks replaces hf_embed_batch", { key_name = "HF_TEST_API_KEY", chunk_size = 1, concurrent_requests =1, - output_file = temp_file + output_dir = temp_dir )) |> suppressMessages() expect_setequal(unique(chunk_1$`.chunk`), 1:6) @@ -120,11 +120,11 @@ test_that("hf_embed_chunks replaces hf_embed_batch", { test_that("hf_embed_df works correctly with real endpoint", { test_df <- data.frame( - id = c(1, 
2), + id = paste0("id", 1:2), text = c("text1", "text2"), stringsAsFactors = FALSE ) - temp_file <- tempfile(fileext = ".csv") + output_dir <- withr::local_tempdir() result <- expect_no_error( hf_embed_df( @@ -133,8 +133,8 @@ test_that("hf_embed_df works correctly with real endpoint", { id_var = id, endpoint_url = server$url("/test_embedding"), key_name = "HF_TEST_API_KEY", - chunk_size = 2, - output_file = temp_file + chunk_size = 1, + output_dir = output_dir ) ) |> suppressMessages() @@ -142,7 +142,7 @@ test_that("hf_embed_df works correctly with real endpoint", { expect_s3_class(result, "data.frame") expect_equal(nrow(result), 2) expect_true(all(c("id", "V1", "V2", "V3", ".error", ".error_msg", ".chunk") %in% names(result))) - expect_equal(result$id, c(1, 2)) + expect_equal(result$id, c("id1", "id2")) expect_equal(result$V1, c(0.1, 0.1), tolerance = 1e-7) expect_equal(result$V2, c(0.2, 0.2), tolerance = 1e-7) expect_equal(result$V3, c(0.3, 0.3), tolerance = 1e-7) @@ -151,11 +151,11 @@ test_that("hf_embed_df works correctly with real endpoint", { test_that("hf_embed_df works with different batch sizes", { test_df <- data.frame( - id = c(1, 2), + id = c(paste0("id", 1:2)), text = c("text1", "text2"), stringsAsFactors = FALSE ) - temp_file <- tempfile(fileext = ".csv") + temp_dir <- withr::local_tempdir() result <- expect_no_error( hf_embed_df( @@ -166,7 +166,7 @@ test_that("hf_embed_df works with different batch sizes", { key_name = "HF_TEST_API_KEY", chunk_size = 1, concurrent_requests = 1, - output_file = temp_file + output_dir = temp_dir ) ) |> suppressMessages() diff --git a/todos.qmd b/todos.qmd index 8a46920..9fed3d6 100644 --- a/todos.qmd +++ b/todos.qmd @@ -2,9 +2,51 @@ # 0.1.2 +- [x] tests passing following hf\_\* changes +- [x] max_length for hf functions - not applicable in hf_embed\_\* functions as they use HF's TEI which doesn't allow max_length +- [ ] documented +- [x] Update README + +similarly to the upgrades applied in 0.1.1: + +``` +- [ ] 
`hf_embed_df()` + - [x] write to files + - [x] parquet + - [x] fix output dir + - [x] chunk_size + - [x] update tests + - [ ] update docs +- [x] `hf_classify_df()` + - [x] write to files + - [x] parquet + - [x] fix output dir + - [x] chunk_size + - [x] update tests + - [ ] update docs +- [ ] `oai_complete_df` + - [x] write to files + - [ ] parquet + - [x] chunk_size +- [ ] `oai_complete_chunks` + - [ ] write to files + - [ ] parquet + - [ ] chunk_size +- [ ] `oai_embed_df` + - [ ] write to files + - [ ] parquet + - [ ] chunk_size +``` + - Update the `hf_embed_df()` and `hf_classify_df()` functions to output files for intermediate results + + - use parquet not csv + - adds arrow to deps + - Refactor to include an ID in the request building for hf\_ functions + - It's tempting to abstract the processing chunks + file writes code, but I think two separate batch loops is cleaner because whilst the hf_embed_df and oai_complete_df functions are sometimes structurally similar: + - oai_complete_df takes a schema, returns a complex result in cases or a single column - hf_embed_df returns a tibble with many columns - primary error handling needs are different - HF for the endpoint start up, oai for rate limits diff --git a/vignettes/hugging_face_inference.Rmd b/vignettes/hugging_face_inference.Rmd index 7b6aa08..da2470e 100644 --- a/vignettes/hugging_face_inference.Rmd +++ b/vignettes/hugging_face_inference.Rmd @@ -1,6 +1,6 @@ --- title: "Using Hugging Face Inference Endpoints" -output: +output: html_document: toc: true toc_float: true @@ -37,12 +37,13 @@ library(EndpointR) library(dplyr) library(httr2) library(tibble) +library(arrow) my_data <- tibble( id = 1:3, text = c( "Machine learning is fascinating", - "I love working with embeddings", + "I love working with embeddings", "Natural language processing is powerful" ), category = c("ML", "embeddings", "NLP") @@ -52,7 +53,7 @@ my_data <- tibble( Follow Hugging Face's [docs](https://huggingface.co/docs/hub/security-tokens) 
to generate a Hugging Face token, and then register it with EndpointR: ```{r, keys_and_urls} -set_api_key("HF_TEST_API_KEY") +set_api_key("HF_TEST_API_KEY") ``` # Choosing Your Service @@ -68,6 +69,140 @@ For this vignette, we'll use the Inference API. To switch to dedicated endpoints Go to [Hugging Face's models hub](https://huggingface.co/models) and fetch the Inference API's URL for the model you want to embed your data with. Not all models are available via the Hugging Face Inference API, if you need to use a model that is not available you may need to deploy a [Dedicated Inference Endpoint](https://huggingface.co/inference-endpoints/dedicated). +# Understanding the Function Hierarchy + +EndpointR provides four levels of functions for working with Hugging Face endpoints. + +> **KEY FEATURE**: The `*_df()` and `*_chunks()` functions preserve your original column names. If you pass a data frame with columns named `review_id` and `review_text`, those exact names will appear in the output and in the saved `.parquet` files. This makes it easy to join results back to your original data. + +## Single Text Functions + +- `hf_embed_text()` - Embed a single text +- `hf_classify_text()` - Classify a single text + +Use these for one-off requests or testing. + +## Batch Functions + +- `hf_embed_batch()` - Embed multiple texts in memory +- `hf_classify_batch()` - Classify multiple texts in memory + +Use these for small to medium datasets (<5000 texts) that fit in memory. Results are returned as a single data frame. + +## Chunk Functions (NEW in v0.1.2) + +- `hf_embed_chunks()` - Process large volumes with incremental file writing +- `hf_classify_chunks()` - Process large volumes with incremental file writing + +Use these for large datasets (>5000 texts). Results are written incrementally as `.parquet` files to avoid memory issues and provide safety against crashes. 
+ +## Data Frame Functions + +- `hf_embed_df()` - Convenience wrapper that calls `hf_embed_chunks()` +- `hf_classify_df()` - Convenience wrapper that calls `hf_classify_chunks()` + +**Most users will use these.** They handle extraction from data frames and call the chunk functions internally. + +## Choosing the Right Function + +Use this decision tree: + +```{r} +# Single text? Use _text functions +if (n_texts == 1) { + result <- hf_embed_text(text, endpoint_url, key_name) + # or + result <- hf_classify_text(text, endpoint_url, key_name) +} + +# Small batch (<5000 texts) and want results in memory only? +if (n_texts < 5000 && !need_file_output) { + results <- hf_embed_batch(texts, endpoint_url, key_name, batch_size = 10) + # or + results <- hf_classify_batch(texts, endpoint_url, key_name, batch_size = 8) +} + +# Large dataset or want file output for safety? +# Use _df functions (they call _chunks internally) +if (n_texts >= 5000 || need_safety) { + results <- hf_embed_df(df, text, id, endpoint_url, key_name, + chunk_size = 5000, output_dir = "my_results") + # or + results <- hf_classify_df(df, text, id, endpoint_url, key_name, + chunk_size = 2500, output_dir = "my_results", + max_length = 512) +} +``` + +> **Recommendation**: For most production use cases, use `_df` functions even for smaller datasets. The safety of incremental file writing is worth it. + +# Key Differences: Embeddings vs Classification + +Understanding the differences between embedding and classification functions is crucial for effective use. 
+ +## Text Truncation Handling + +**Embeddings** (`hf_embed_*`): + +- **NO** `max_length` parameter in the R functions +- Truncation is handled **at the endpoint level** +- For Dedicated Endpoints: Set `AUTO_TRUNCATE=true` in your endpoint's environment variables +- For Inference API: Truncation is typically handled automatically by the model +- Uses TEI (Text Embeddings Inference) which only accepts `truncate`, not `truncation` or `max_length` + +**Classification** (`hf_classify_*`): + +- **HAS** `max_length` parameter (default: `512L`) +- Truncation is controlled **in your R code** +- Texts longer than `max_length` tokens are truncated before classification +- Uses standard inference parameters: `truncation=TRUE` and `max_length` + +```{r} +# Embeddings - NO max_length parameter +hf_embed_df( + df = my_data, + text_var = text, + id_var = id, + endpoint_url = embed_url, + key_name = "HF_API_KEY" + # max_length not available - set AUTO_TRUNCATE in endpoint settings +) + +# Classification - max_length IS available +hf_classify_df( + df = my_data, + text_var = text, + id_var = id, + endpoint_url = classify_url, + key_name = "HF_API_KEY", + max_length = 512 # Control truncation here +) +``` + +## Inference Parameters Sent to API + +The functions send different parameters to the Hugging Face API: + +**Embeddings**: + +```json +{ + "truncate": true +} +``` + +**Classification**: + +```json +{ + "return_all_scores": true, + "truncation": true, + "max_length": 512 +} +``` + +These differences are handled automatically - you don't need to worry about them unless you're debugging API issues. Check `metadata.json` (see below) to see what parameters were used. + # Embeddings ## Single Text @@ -115,35 +250,137 @@ The result includes: - `.error_message`: what went wrong (if anything) - `V1` to `V384`: the embedding values -## Data Frame +## Processing Data Frames with Chunk Writing + +Most commonly, you'll want to embed a column in a data frame. 
The `hf_embed_df()` function processes data in chunks and writes intermediate results to disk. + +### Understanding output_dir -Most commonly, you'll want to embed a column in a data frame: +Both `hf_embed_df()` and `hf_classify_df()` write intermediate results to disk as `.parquet` files. This provides: + +1. **Safety**: If your job crashes, you don't lose all progress +2. **Memory efficiency**: Large datasets don't overwhelm your RAM +3. **Reproducibility**: Metadata tracks exactly what parameters you used ```{r} +# Basic usage - auto-generates output directory embedding_result <- hf_embed_df( df = my_data, text_var = text, # column with your text id_var = id, # column with unique ids endpoint_url = embed_url, - key_name = "HF_API_KEY" + key_name = "HF_API_KEY", + output_dir = "auto", # Creates "hf_embeddings_batch_TIMESTAMP" + chunk_size = 5000, # Writes every 5000 rows + concurrent_requests = 2 ) + +# Custom output directory +embedding_result <- hf_embed_df( + df = my_data, + text_var = text, + id_var = id, + endpoint_url = embed_url, + key_name = "HF_API_KEY", + output_dir = "my_embeddings_v1", # Your custom directory name + chunk_size = 5000 +) +``` + +### Output Directory Structure + +After running `hf_embed_df()` or `hf_classify_df()`, you'll have: + +``` +my_embeddings_v1/ +├── chunk_001.parquet +├── chunk_002.parquet +├── chunk_003.parquet +└── metadata.json +``` + +**IMPORTANT**: Add your output directories to `.gitignore`! These files contain API responses and can be large. 
+ +```r +# .gitignore +hf_embeddings_batch_*/ +hf_classification_chunks_*/ +my_embeddings_v1/ ``` -Check for errors: +### Reading Results from Disk + +If your R session crashes or you want to reload results later: ```{r} -embedding_result |> count(.error) +# List all parquet files (excludes metadata.json automatically) +parquet_files <- list.files("my_embeddings_v1", + pattern = "\\.parquet$", + full.names = TRUE) + +# Read all chunks into a single data frame +results <- arrow::open_dataset(parquet_files, format = "parquet") |> + dplyr::collect() + +# Check for any errors +results |> count(.error) + +# Extract only successful embeddings +successful <- results |> filter(.error == FALSE) ``` -Extract just the embeddings: +### Understanding metadata.json + +The metadata file records everything about your processing job: ```{r} -embeddings_only <- embedding_result |> select(V1:V384) +metadata <- jsonlite::read_json("my_embeddings_v1/metadata.json") + +# Check which endpoint was used +metadata$endpoint_url + +# See processing parameters +metadata$chunk_size +metadata$concurrent_requests +metadata$timeout + +# See inference parameters (differs between embed and classify!) 
+metadata$inference_parameters +# For embeddings: {truncate: true} +# For classification: {return_all_scores: true, truncation: true, max_length: 512} + +# Check when the job ran +metadata$timestamp +``` + +This metadata is invaluable for: + +- Debugging why a job failed +- Reproducing results with identical settings +- Tracking which model/endpoint version was used +- Understanding performance characteristics + +### Check for Errors + +Always verify your results: + +```{r} +embedding_result |> count(.error) + +# View any failures (column names match your original data frame) +failures <- embedding_result |> + filter(.error == TRUE) |> + select(id, .error_message) + +# Extract just the embeddings for successful rows +embeddings_only <- embedding_result |> + filter(.error == FALSE) |> + select(starts_with("V")) ``` # Classification -Classification works the same way as embeddings, just with a different URL and output format. If neceessary, you can also provide a custom function for tidying the output. +Classification works similarly to embeddings, but with a different URL, output format, and the additional `max_length` parameter for controlling text truncation. ## Single Text @@ -157,7 +394,7 @@ sentiment <- hf_classify_text( ) ``` -## Data Frame +## Processing Data Frames ```{r} classification_result <- hf_classify_df( @@ -165,24 +402,107 @@ classification_result <- hf_classify_df( text_var = text, id_var = id, endpoint_url = classify_url, - key_name = "HF_API_KEY" + key_name = "HF_API_KEY", + max_length = 512, # Truncate texts longer than 512 tokens + output_dir = "my_classification_v1", + chunk_size = 2500, # Smaller chunks for classification + concurrent_requests = 1, + timeout = 60 # Longer timeout for classification ) ``` The result includes: -- Your original `id` column +- Your original ID and text columns (with their original names preserved) - Classification labels (e.g., POSITIVE, NEGATIVE) - Confidence scores -- Error tracking columns. 
+- Error tracking columns (`.error`, `.error_message`) +- Chunk tracking (`.chunk`) + +> **NOTE**: Classification labels are model and task specific. Check the model card on Hugging Face for label mappings. + +> **IMPORTANT**: The function preserves your original column names. If your data frame has `review_id` and `review_text`, those names will appear in the output, not generic `id` and `text`. + +## Renaming Classification Labels + +Many classification models use generic labels like `LABEL_0`, `LABEL_1`. You can rename these: + +```{r} +# Create a mapping function +labelid_2class <- function() { + return(list( + negative = "LABEL_0", + neutral = "LABEL_1", + positive = "LABEL_2" + )) +} + +# Apply the mapping +classification_result <- hf_classify_df( + df = my_data, + text_var = text, + id_var = id, + endpoint_url = classify_url, + key_name = "HF_API_KEY", + max_length = 512 +) |> + dplyr::rename(!!!labelid_2class()) +``` + +# Utility Functions + +EndpointR provides utility functions to help you work with Hugging Face endpoints. + +## Get Model Token Limits + +Find out the maximum token length for a model: + +```{r} +# Get the model's max token length from Hugging Face +max_tokens <- hf_get_model_max_length( + model_name = "cardiffnlp/twitter-roberta-base-sentiment", + api_key = "HF_API_KEY" +) -> **NOTE**: Classification labels are model and task specific. +# Use this to set max_length for classification +hf_classify_df( + df = my_data, + text_var = text, + id_var = id, + endpoint_url = classify_url, + key_name = "HF_API_KEY", + max_length = max_tokens # Use the model's actual limit +) +``` + +This is especially useful when working with different models that have varying token limits (e.g., 512, 1024, 2048). 
+ +## Get Endpoint Information + +Retrieve detailed information about your Dedicated Inference Endpoint: + +```{r} +endpoint_info <- hf_get_endpoint_info( + endpoint_url = "https://your-endpoint.endpoints.huggingface.cloud", + key_name = "HF_API_KEY" +) + +# Check endpoint configuration +endpoint_info +``` + +This is useful for: + +- Checking endpoint status +- Verifying model configuration +- Understanding available features +- Debugging connection issues # Using Dedicated Endpoints To use dedicated endpoints instead of the Inference API: -1. Deploy your model to a dedicated endpoint (see Hugging Face docs) +1. Deploy your model to a dedicated endpoint (see [Hugging Face docs](https://huggingface.co/docs/inference-endpoints)) 2. Get your endpoint URL 3. Replace the URL in any function: @@ -198,39 +518,263 @@ result <- hf_embed_text( ) ``` -> **Note**: Dedicated endpoints take 20-30 seconds to start if they're idle. Set `max_retries = 5` to give them time to wake up. +> **Note**: Dedicated endpoints take 20-30 seconds to start if they're idle (cold start). Set `max_retries = 10` to give them time to wake up. + +## Setting AUTO_TRUNCATE for Embedding Endpoints + +For Dedicated Inference Endpoints running embedding models, you should enable automatic truncation: + +1. In your endpoint settings on Hugging Face +2. Add environment variable: `AUTO_TRUNCATE=true` +3. This handles long texts automatically at the endpoint level + +Without this, very long texts may cause "Payload too large" errors. 
+ +# Tips and Best Practices + +## Performance Tuning + +- **Start conservative**: Begin with `chunk_size = 2500` and `concurrent_requests = 1` +- **Scale gradually**: Monitor for errors as you increase concurrency +- **Embeddings are faster**: You can often use higher concurrency for embeddings than classification +- **Watch your rate limits**: + - Inference API: Shared limits, reduce concurrency if you hit errors + - Dedicated Endpoints: Limited by hardware, not API rate limits + +## Memory Management + +- Use `chunk_size` to control memory usage +- Smaller chunks = more frequent disk writes = less memory needed +- For very large datasets (>100k rows), use `chunk_size = 1000-2500` + +```{r} +# For very large datasets +hf_embed_df( + df = large_data, + text_var = text, + id_var = id, + endpoint_url = embed_url, + key_name = "HF_API_KEY", + chunk_size = 1000, # Smaller chunks for memory efficiency + concurrent_requests = 1 +) +``` + +## Truncation Strategy + +**For Embeddings**: + +1. Set `AUTO_TRUNCATE=true` in your Dedicated Endpoint's environment variables +2. For Inference API, truncation is handled automatically by most models +3. Consider preprocessing very long texts before embedding (e.g., take first N characters) -# Tips +**For Classification**: -- Start with small batch sizes (3-5) and increase gradually -- The Inference API has rate limits - dedicated endpoints have hardware constraints, increase hardware for higher limits -- For production use, choose dedicated endpoints -- Check the [Improving Performance](improving_performance.html) vignette for speed tips +1. Use `hf_get_model_max_length()` to check the model's token limit +2. Set `max_length` appropriately (default 512 works for most models) +3. 
For documents longer than `max_length`, consider:
+  - Chunking documents and classifying each chunk
+  - Summarization before classification
+  - Using models with longer context windows
+
+```{r}
+# Get model's actual max length
+model_limit <- hf_get_model_max_length(
+  model_name = "distilbert/distilbert-base-uncased-finetuned-sst-2-english",
+  api_key = "HF_API_KEY"
+)
+
+# Use 90% of the limit to be safe
+safe_limit <- as.integer(model_limit * 0.9)
+
+hf_classify_df(
+  df = my_data,
+  text_var = text,
+  id_var = id,
+  endpoint_url = classify_url,
+  key_name = "HF_API_KEY",
+  max_length = safe_limit
+)
+```
+
+## Error Recovery
+
+Always check for errors and consider retrying failures:
+
+```{r}
+# Check results for errors
+results |> count(.error)
+
+# Identify failed texts (column names match your input data frame)
+failed <- results |> filter(.error == TRUE)
+
+# Note: Column names below will match your original data frame
+# If you used review_id and review_text, use those names instead
+failed |> select(id, .error_message)
+
+# Retry failed texts with adjusted parameters
+# Access text column by its actual name from your data
+retry_results <- hf_embed_batch(
+  texts = failed$text, # Use your actual column name
+  endpoint_url = embed_url,
+  key_name = "HF_API_KEY",
+  batch_size = 1, # One at a time for failures
+  timeout = 30, # Longer timeout
+  max_retries = 10 # More retries
+)
+```
+
+## Production Recommendations
+
+1. **Always use output_dir**: Never rely solely on in-memory results for large jobs
+2. **Monitor metadata**: Check `metadata.json` to verify your settings
+3. **Add to .gitignore**: Keep API responses out of version control
+4. **Use Dedicated Endpoints**: For production workloads, avoid the free Inference API
+5. **Set appropriate timeouts**: Classification needs longer timeouts than embeddings
+6. **Test with small samples**: Before processing 1M rows, test with 100 rows
+7. 
**Monitor costs**: Track your Dedicated Endpoint usage on Hugging Face # Common Issues -**Rate limits**: Reduce batch size or add delays between requests +## "Payload too large" Errors + +**For Embeddings**: + +- Not fixable in R code - must configure endpoint +- **Dedicated Endpoints**: Set `AUTO_TRUNCATE=true` in endpoint environment variables +- **Inference API**: Preprocess and truncate texts before sending + +```{r} +# Preprocessing approach for Inference API +my_data <- my_data |> + mutate(text = substr(text, 1, 5000)) # Limit to ~5000 characters +``` + +**For Classification**: + +- Reduce the `max_length` parameter + +```{r} +hf_classify_df( + df = my_data, + text_var = text, + id_var = id, + endpoint_url = classify_url, + key_name = "HF_API_KEY", + max_length = 256 # Reduce from default 512 +) +``` + +## Timeouts -**Model not available**: Not all models work with the Inference API. Check the model page or use dedicated endpoints. +Classification takes longer than embeddings. Increase timeout if needed: + +```{r} +hf_classify_df( + df = my_data, + text_var = text, + id_var = id, + endpoint_url = classify_url, + key_name = "HF_API_KEY", + timeout = 120, # Increase from default 60 + max_retries = 10 +) +``` + +## Dedicated Endpoint Cold Starts + +Dedicated endpoints take 20-30 seconds to wake up from idle: + +```{r} +# Set higher max_retries to allow for cold start +hf_embed_df( + df = my_data, + text_var = text, + id_var = id, + endpoint_url = dedicated_url, + key_name = "HF_API_KEY", + max_retries = 10, # Give it time to wake up + timeout = 30 +) +``` -**Timeouts**: Increase `max_retries` or reduce batch size +The first chunk may fail or be slow, but subsequent chunks will be fast once the endpoint is warm. 
+ +## Out of Memory Errors + +Reduce `chunk_size`: + +```{r} +# Instead of default 5000 +hf_embed_df( + df = large_data, + text_var = text, + id_var = id, + endpoint_url = embed_url, + key_name = "HF_API_KEY", + chunk_size = 1000, # Smaller chunks + concurrent_requests = 1 +) +``` + +## Rate Limit Errors + +**For Inference API**: + +- Reduce `concurrent_requests` to 1 +- Increase delays between requests (handled automatically by retries) + +```{r} +hf_embed_df( + df = my_data, + text_var = text, + id_var = id, + endpoint_url = embed_url, + key_name = "HF_API_KEY", + concurrent_requests = 1, # Sequential processing + max_retries = 10 # More retries with backoff +) +``` + +**For Dedicated Endpoints**: + +- Not typically rate-limited +- If you see errors, your hardware may be overwhelmed +- Reduce `concurrent_requests` or upgrade your endpoint hardware + +## Model Not Available + +Not all models work with the Inference API. Check the model page on Hugging Face. If the model isn't available via Inference API, you'll need to: + +1. Deploy a Dedicated Inference Endpoint +2. Use a different model that is available via Inference API +3. Run the model locally (outside of EndpointR) # Improving Performance -EndpointR's functions come with knobs and dials and you can turn to improve throughput and performance. Visit the [Improving Performance](articles/improving_performance.html) vignette for more information. +For detailed performance optimization strategies, visit the [Improving Performance](improving_performance.html) vignette. 
+ +Quick tips: + +- Increase `concurrent_requests` gradually while monitoring errors +- Use larger `chunk_size` values for faster processing (if memory allows) +- For Dedicated Endpoints, upgrade hardware for better throughput +- Use batch functions (`hf_embed_batch()`, `hf_classify_batch()`) for small datasets to avoid file I/O overhead # Appendix ## Comparison of Inference API vs Dedicated Inference Endpoints -| Feature | Inference API | Dedicated Inference Endpoints | -|----|----|----| -| **Accessibility** | Public, shared service | Private, dedicated hardware | -| **Cost** | Free (with paid tiers) | Paid service - rent specific hardware | -| **Hardware** | Shared computing resources | Dedicated hardware allocation | -| **Wait Times** | Variable, unknowable in advance | Predictable, minimal queuing, \~30s for first request | -| **Production Ready** | Not recommended for production | Recommended for production use | -| **Use Case** | Casual usage, testing, prototyping | Production applications, consistent performance | -| **Scalability** | Limited by shared resources | Scales with dedicated allocation | -| **Availability** | Subject to shared infrastructure limits | Guaranteed availability during rental period | -| **Model Coverage** | Commonly-used models, models selected by Hugging Face | Virtually all models on the Hub are available | +| Feature | Inference API | Dedicated Inference Endpoints | +|------------------------|------------------------------------------------|------------------------------------------| +| **Accessibility** | Public, shared service | Private, dedicated hardware | +| **Cost** | Free (with paid tiers) | Paid service - rent specific hardware | +| **Hardware** | Shared computing resources | Dedicated hardware allocation | +| **Wait Times** | Variable, unknowable in advance | Predictable, \~30s for cold start | +| **Production Ready** | Not recommended for production | Recommended for production use | +| **Use Case** | Casual usage, 
testing, prototyping | Production applications | +| **Scalability** | Limited by shared resources | Scales with dedicated allocation | +| **Availability** | Subject to shared infrastructure limits | Guaranteed availability during rental | +| **Model Coverage** | Commonly-used models, models selected by HF | Virtually all models on the Hub | +| **Truncation Control** | Limited (model-dependent) | Full control via environment variables | +