diff --git a/DESCRIPTION b/DESCRIPTION index 628d86a..f2fab13 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: EndpointR Title: Connects to various Machine Learning inference providers -Version: 0.1.2 +Version: 0.2 Authors@R: person("Jack", "Penzer", , "Jack.penzer@sharecreative.com", role = c("aut", "cre")) Description: EndpointR is a 'batteries included', open-source R package for connecting to various APIs for Machine Learning model predictions. EndpointR is built for company-specific use cases, so may not be useful to a wide audience. diff --git a/NAMESPACE b/NAMESPACE index 6346a46..bdc43fe 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,5 +1,6 @@ # Generated by roxygen2: do not edit by hand +export(ant_build_messages_request) export(create_json_schema) export(get_api_key) export(hf_build_request) @@ -25,6 +26,7 @@ export(oai_complete_chunks) export(oai_complete_df) export(oai_complete_text) export(oai_embed_batch) +export(oai_embed_chunks) export(oai_embed_df) export(oai_embed_text) export(perform_requests_with_strategy) diff --git a/NEWS.md b/NEWS.md index c48bc6f..74dad18 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,10 @@ +# EndpointR 0.2 + +- **Error and status propagation improvements**: results now carry `.error`, `.error_msg` (standardised across the package), and `.status` columns. The main change stops httr2 from swallowing errors before we can handle them +- **Parquet output**: `oai_complete_df()` and `oai_embed_df()` now write results to `.parquet` files +- **Chunked embeddings**: adds `oai_embed_chunks()` and rewrites all batch -> chunk logic +- **Anthropic support**: implements the Anthropic Messages API with structured outputs (via a beta header) + # EndpointR 0.1.2 - **File writing improvements**: `hf_embed_df()` and `hf_classify_df()` now write intermediate results as `.parquet` files to `output_dir` directories, similar to improvements in 0.1.1 for OpenAI functions @@ -9,6 +16,7 @@ - **Dependency update**: Package now depends on `arrow` for faster `.parquet` file writing and reading - **Metadata tracking**: Hugging Face functions that write to files (`hf_embed_df()`, `hf_classify_df()`, `hf_embed_chunks()`, `hf_classify_chunks()`) now write `metadata.json` to output directories containing: + - Endpoint URL and API key name used - Processing parameters (chunk_size, concurrent_requests, timeout, max_retries) - Inference parameters (truncate, max_length) @@ -18,6 +26,7 @@ - **max_length parameter**: Added `max_length` parameter to `hf_classify_df()` and `hf_classify_chunks()` for text truncation control.
Note: `hf_embed_df()` handles truncation automatically via endpoint configuration (set `AUTO_TRUNCATE` in endpoint settings) - **New utility functions**: + - `hf_get_model_max_length()` - Retrieve maximum token length for a Hugging Face model - `hf_get_endpoint_info()` - Retrieve detailed information about a Hugging Face Inference Endpoint @@ -36,4 +45,3 @@ Initial BETA release, ships with: - Support for text completion using OpenAI models via the Chat Completions API - Support for embeddings with the OpenAI Embeddings API - Structured outputs via JSON schemas and validators - diff --git a/R/anthropic_messages.R b/R/anthropic_messages.R new file mode 100644 index 0000000..e828609 --- /dev/null +++ b/R/anthropic_messages.R @@ -0,0 +1,175 @@ +# constants ---- +.ANT_API_VERSION <- "2023-06-01" +.ANT_STRUCTURED_OUTPUTS_BETA <- "structured-outputs-2025-11-13" +.ANT_MESSAGES_ENDPOINT <- "https://api.anthropic.com/v1/messages" +.ANT_DEFAULT_MODEL <- "claude-haiku-4-5" + +#' Build an Anthropic Messages API request +#' +#' @description +#' Constructs an httr2 request object for Anthropic's Messages API. +#' Handles message formatting, system prompts, and optional JSON schema +#' for structured outputs. When using structured outputs you must select a supported model. +#' +#' +#' @details +#' This function creates the HTTP request but does not execute it. For +#' structured outputs, you must use a supported model (Claude Sonnet 4.5 +#' or Opus 4.1) and the request will automatically include the required +#' beta header. +#' +#' The `schema` parameter accepts either: +#' - A `json_schema` S7 object created with `create_json_schema()` +#' - A raw list in Anthropic's `output_format` structure +#' +#' Unlike OpenAI, Anthropic uses `output_format` (not `response_format`) +#' and the schema structure differs slightly.
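To make the `output_format` vs `response_format` distinction concrete, here is a minimal sketch of the two body shapes. The Anthropic shape mirrors `.ant_format_schema()` below; the OpenAI shape is shown only for contrast and is an assumption based on its documented Structured Outputs format, not code from this package:

```r
# sketch: Anthropic puts the schema directly in the `schema` field
ant_output_format <- list(
  type = "json_schema",
  schema = list(type = "object")  # your raw JSON schema list goes here
)

# sketch (assumed, for contrast): OpenAI nests the schema one level deeper
oai_response_format <- list(
  type = "json_schema",
  json_schema = list(
    name = "my_schema",
    schema = list(type = "object"),
    strict = TRUE
  )
)
```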
+#' +#' @param input Text input to send to the model +#' @param endpointr_id An id that will persist through to response +#' @param model Anthropic model to use (default: "claude-haiku-4-5") +#' @param temperature Sampling temperature (0-1), higher values = more randomness; note the range differs from OpenAI's 0-2 +#' @param max_tokens Maximum tokens in response +#' @param schema Optional JSON schema for structured output (json_schema object or list) +#' @param system_prompt Optional system prompt +#' @param key_name Environment variable name for API key +#' @param endpoint_url Anthropic API endpoint URL +#' @param timeout Request timeout in seconds +#' @param max_retries Maximum number of retry attempts for failed requests #' +#' @return An httr2 request object +#' @export +#' +#' @seealso \url{https://platform.claude.com/docs/en/build-with-claude/structured-outputs} +#' @examples +#' \dontrun{ +#' # simple request +#' req <- ant_build_messages_request( +#' input = "What is the capital of France?", +#' max_tokens = 100 +#' ) +#' +#' # with structured output +#' schema <- create_json_schema( +#' name = "capital_response", +#' schema = schema_object( +#' country = schema_string(), +#' capital = schema_string(), +#' required = c("country", "capital") +#' ) +#' ) +#' req <- ant_build_messages_request( +#' input = "What is the capital of France?", +#' schema = schema, +#' max_tokens = 100, +#' model = "claude-sonnet-4-5" +#' ) +#' } ant_build_messages_request <- function( input, endpointr_id = NULL, model = .ANT_DEFAULT_MODEL, temperature = 0, max_tokens = 500L, schema = NULL, system_prompt = NULL, key_name = "ANTHROPIC_API_KEY", endpoint_url = .ANT_MESSAGES_ENDPOINT, timeout = 30L, max_retries = 5L ) { + # can't use `base_request()` from core.R because Anthropic use different auth (x-api-key) so we add as a header + + stopifnot( + "input must be a non-empty character string" = is.character(input) && length(input) == 1 && nchar(input) > 0, + "model must be a character string" = is.character(model) && length(model) == 1, + "temperature must be numeric between 0 and 1" = is.numeric(temperature) && temperature >= 0 && temperature <= 1, # diff to OAI API + "max_tokens must be a positive integer" = is.numeric(max_tokens) && max_tokens > 0) + + use_structured_outputs <- FALSE # flag for later control flow + + api_key <- get_api_key(key_name) + + messages <- list( + list(role = "user", content = input) + ) + + body <- list( + model = model, + messages = messages, + max_tokens = as.integer(max_tokens), + temperature = temperature + ) + + # Anthropic API takes system_prompt as its own parameter, different to OAI where we concatenate + + if(!is.null(system_prompt)){ + if (!rlang::is_scalar_character(system_prompt)){ + cli::cli_abort("{.arg system_prompt} must be a {.cls character} of length 1, e.g. 'This is a valid system prompt'") + } + + body$system <- system_prompt + } + + # structured outputs: attach the schema as Anthropic's output_format + if(!is.null(schema)) { + use_structured_outputs <- TRUE + if (inherits(schema, "EndpointR::json_schema")) { + body$output_format <- .ant_format_schema(schema) + } else if (is.list(schema)) { + cli::cli_alert_warning("Your {.arg schema} is a list, not an EndpointR json_schema") + body$output_format <- schema + } else { + cli::cli_abort("{.arg schema} must be an EndpointR json_schema object or a list") + } + } + + # build the request with headers, auth, timeout, retries, backoff (incl.
system prompt if applicable) + request <- httr2::request(endpoint_url) |> + httr2::req_user_agent("EndpointR") |> + httr2::req_method("POST") |> + httr2::req_headers( + "Content-Type" = "application/json", + "x-api-key" = api_key, + "anthropic-version" = .ANT_API_VERSION + ) |> + httr2::req_error(is_error = ~ FALSE) |> # don't let httr2 auto-throw errors; we handle them ourselves + httr2::req_timeout(timeout) |> + httr2::req_retry( + max_tries = max_retries, + backoff = ~ 2 ^ .x, + retry_on_failure = TRUE + ) |> + httr2::req_body_json(body) + + # if we did use structured outputs then we need to add the anthropic-beta header (this will be patched at some point I expect) + + if (use_structured_outputs) { + request <- httr2::req_headers(request, "anthropic-beta" = .ANT_STRUCTURED_OUTPUTS_BETA) + } + + if (!is.null(endpointr_id)) { + request <- httr2::req_headers(request, endpointr_id = endpointr_id) + } + + return(request) +} + + + +#' Convert json_schema S7 object to Anthropic output_format structure +#' @keywords internal +.ant_format_schema <- function(schema) { + if (!inherits(schema, "EndpointR::json_schema")) { + cli::cli_abort("schema must be a json_schema object") + } + + # Anthropic uses output_format with type "json_schema" + # The schema goes directly in the "schema" field (not nested like OpenAI) + list( + type = "json_schema", + schema = schema@schema + ) +} + diff --git a/R/core.R b/R/core.R index a1c649d..a216123 100644 --- a/R/core.R +++ b/R/core.R @@ -24,7 +24,8 @@ base_request <- function(endpoint_url, api_key){ httr2::req_user_agent("EndpointR") |> httr2::req_method("POST") |> httr2::req_headers("Content-Type" = "application/json") |> - httr2::req_auth_bearer_token(token = api_key) + httr2::req_auth_bearer_token(token = api_key) |> + httr2::req_error(is_error = ~ FALSE) # don't let httr2 auto-throw errors; we handle them ourselves for better error messages return(req) } @@ -34,22 +35,36 @@ base_request <- function(endpoint_url, api_key){ #' #' @description #' Wrapper around httr2::req_perform that handles errors gracefully. +#' The $result component holds the httr2 response (or NULL on failure) - check its status with httr2::resp_status(). #' #' @param request An httr2 request object #' -#' @return A list with components $result and $error +#' @return A list with components $result (httr2_response or NULL) and $error (NULL or condition) #' @export safely_perform_request <- function(request) { purrr::safely(httr2::req_perform)(request) } +#' Perform request and return response or error object +#' +#' @description +#' Performs a request and returns the response. Since req_error(is_error = ~ FALSE) +#' is set in base_request(), httr2 won't throw errors for HTTP status codes >= 400. +#' Instead, callers should check the response status with httr2::resp_status(). +#' +#' @param request An httr2 request object +#' +#' @return An httr2_response object (check status with resp_status()) or an error condition +#' @keywords internal perform_request_or_return_error <- function(request) { tryCatch({ response <- httr2::req_perform(request) - + # with req_error(is_error = ~ FALSE), we get responses even for HTTP errors + # callers should check status themselves return(response) }, error = function(e) { - cli::cli_alert_warning("Sequential request to {.url {request$url}} failed: {conditionMessage(e)}") + # this catches network errors, timeouts, etc.
(not HTTP status errors) + cli::cli_alert_warning("Request to {.url {request$url}} failed: {conditionMessage(e)}") return(e) }) } @@ -62,13 +77,17 @@ perform_request_or_return_error <- function(request) { #' Automatically chooses sequential processing when concurrent_requests = 1 #' or when there's only one request. #' -#' @details returns responses in the order that requests were sent, and returns errors in a predictable format. +#' @details Returns responses in the order that requests were sent. +#' Since requests use req_error(is_error = ~ FALSE), HTTP error responses (status >= 400) +#' are returned as httr2_response objects rather than being thrown as errors. +#' Callers should check response status with httr2::resp_status() or use +#' httr2::resps_successes() / httr2::resps_failures() to categorise responses. #' #' @param requests List of httr2_request objects to perform #' @param concurrent_requests Integer specifying maximum number of simultaneous requests (default: 1) #' @param progress Logical indicating whether to show progress bar (default: TRUE) #' -#' @return List of httr2_response objects or error objects for failed requests +#' @return List of httr2_response objects (check status with resp_status()) or error objects for network failures #' @export #' @examples #' \dontrun{ @@ -132,7 +151,7 @@ perform_requests_with_strategy <- function(requests, #' @return A tibble with processed results or error information, including: #' - original_index: Position in original request batch #' - .error: Logical indicating if an error occurred -#' - .error_message: Character description of any error +#' - .error_msg: Character description of any error #' - Additional columns from tidy_func output #' #' @export @@ -146,21 +165,32 @@ perform_requests_with_strategy <- function(requests, #' ) #' } process_response <- function(resp, indices, tidy_func) { - #higher-order function for processing (takes function as inputs) + # higher-order function for processing (takes function as inputs) if (inherits(resp, "httr2_response")) { + # check if response is an error (status >= 400) + status <- httr2::resp_status(resp) + if (status >= 400) { + error_msg <- .extract_api_error(resp) + cli::cli_warn("Request failed with status {status}: {error_msg}") + return(.create_error_tibble(indices, error_msg, status = status)) + } + tryCatch({ result <- tidy_func(resp) result$original_index <- indices result$.error <- FALSE - result$.error_message <- NA_character_ + result$.error_msg <- NA_character_ + result$.status <- NA_integer_ return(result) }, error = function(e) { cli::cli_warn("Error processing response: {conditionMessage(e)}") return(.create_error_tibble(indices, conditionMessage(e))) }) } else { - cli::cli_warn("Request failed: {conditionMessage(resp)}") - return(.create_error_tibble(indices, "Request failed")) + # handle non-response objects (e.g., errors from network failures) + error_msg <- .extract_api_error(resp, "Request failed") + cli::cli_warn("Request failed: {error_msg}") + return(.create_error_tibble(indices, error_msg)) } } @@ -171,27 +201,72 @@ process_response <- function(resp, indices, tidy_func) { #' Ensures uniform error reporting across different failure modes. #' #' @param indices Vector of indices indicating original request positions -#' @param error_message Character string or condition object describing the error +#' @param error_msg Character string or condition object describing the error +#' @param status HTTP status code (integer) or NA_integer_ for non-HTTP errors. 
+#' Defaults to NA_integer_. #' #' @return A tibble with columns: #' - original_index: Position in original request batch -#' - .error: Always TRUE for error tibbles -#' - .error_message: Character description of the error +#' - .error: TRUE for errors +#' - .error_msg: Character description of the error +#' - .status: HTTP status code (integer) or NA for non-HTTP errors #' #' @keywords internal -.create_error_tibble <- function(indices, error_message) { +.create_error_tibble <- function(indices, error_msg, status = NA_integer_) { # for consistent outputs with safely function(s) - if (!is.character(error_message)) { - if (inherits(error_message, "condition")) { - error_message <- conditionMessage(error_message) + if (!is.character(error_msg)) { + if (inherits(error_msg, "condition")) { + error_msg <- conditionMessage(error_msg) } else { - error_message <- as.character(error_message) + error_msg <- as.character(error_msg) } } return(tibble::tibble( original_index = indices, .error = TRUE, - .error_message = error_message + .error_msg = error_msg, + .status = status )) } + + +#' Extract error message from an API response +#' +#' @description +#' Extracts a meaningful error message from an httr2 response object. +#' Handles different API response formats (OpenAI, Anthropic, HuggingFace). +#' +#' @param response An httr2_response object, error object, or other response type +#' @param fallback_message Message to return if extraction fails +#' +#' @return Character string containing the error message, or NA_character_ if response is successful +#' @keywords internal +.extract_api_error <- function(response, fallback_message = "Unknown error") { + # handle non-response objects (e.g., errors from network failures) + if (!inherits(response, "httr2_response")) { + if (inherits(response, "error") || inherits(response, "condition")) { + return(conditionMessage(response)) + } + return(as.character(fallback_message)) + } + + status <- httr2::resp_status(response) + if (status < 400) return(NA_character_) + + # try to extract error from response body - different APIs use different formats + + tryCatch({ + body <- httr2::resp_body_json(response) + # huggingface format: {"error": "..."} - check first as it's a string not a list + if (!is.null(body$error) && is.character(body$error)) return(body$error) + # openai format: {"error": {"message": "...", "type": "..."}} + if (!is.null(body$error) && is.list(body$error) && !is.null(body$error$message)) return(body$error$message) + # anthropic format: {"message": "..."} + if (!is.null(body$message)) return(body$message) + # fallback to status code + paste("HTTP", status) + }, error = function(e) { + paste("HTTP", status) + }) +} diff --git a/R/data.R b/R/data.R index 14c731e..42b3869 100644 --- a/R/data.R +++ b/R/data.R @@ -28,7 +28,7 @@ #' \item{text}{Character; the original text that was embedded} #' \item{category}{Character; category classification of the text} #' \item{.error}{Logical; whether the embedding process failed} -#' \item{.error_message}{Character; error message if embedding failed (NA if successful)} +#' \item{.error_msg}{Character; error message if embedding failed (NA if successful)} #' \item{V1}{Numeric; embedding vector dimensions} #' \item{V2}{Numeric; embedding vector dimensions} #' \item{V3}{Numeric; embedding vector dimensions} @@ -815,7 +815,7 @@ #' \item{NEGATIVE}{Numeric; probability score for negative sentiment (0-1)} #' \item{POSITIVE}{Numeric; probability score for positive sentiment (0-1)} #' \item{.error}{Logical; whether the 
classification process failed} -#' \item{.error_message}{Character; error message if classification failed (NA if successful)} +#' \item{.error_msg}{Character; error message if classification failed (NA if successful)} #' } #' @source Generated using Hugging Face sentiment classification model via EndpointR functions "df_sentiment_classification_example" diff --git a/R/hf_classify.R b/R/hf_classify.R index 008898c..d9cd98c 100644 --- a/R/hf_classify.R +++ b/R/hf_classify.R @@ -323,7 +323,7 @@ hf_classify_batch <- function(texts, result <- tidy_func(response$result) result$original_index <- batch_data$batch_indices[[1]] result$.error <- FALSE - result$.error_message <- NA_character_ + result$.error_msg <- NA_character_ }, error = function(e) { cli::cli_warn("Error in single batch request: {conditionMessage(e)}") return(.create_error_tibble(batch_data$batch_indices, conditionMessage(e))) @@ -527,8 +527,18 @@ hf_classify_chunks <- function(texts, progress = TRUE ) - chunk_successes <- httr2::resps_successes(responses) - chunk_failures <- httr2::resps_failures(responses) + # separate actual responses from error objects (network failures, etc.) + is_response <- purrr::map_lgl(responses, inherits, "httr2_response") + response_objects <- responses[is_response] + error_objects <- responses[!is_response] + + # split responses by HTTP status code (not just by type) + is_success <- purrr::map_lgl(response_objects, ~httr2::resp_status(.x) < 400) + chunk_successes <- response_objects[is_success] + http_failures <- response_objects[!is_success] + + # combine HTTP failures with network/other errors + chunk_failures <- c(http_failures, error_objects) n_chunk_successes <- length(chunk_successes) n_chunk_failures <- length(chunk_failures) @@ -551,6 +561,7 @@ hf_classify_chunks <- function(texts, !!text_col_name := successes_texts, .error = FALSE, .error_msg = NA_character_, + .status = NA_integer_, .chunk = chunk_num ) |> dplyr::bind_cols(successes_content) @@ -561,14 +572,30 @@ hf_classify_chunks <- function(texts, failures_ids <- purrr::map(chunk_failures, \(x) purrr::pluck(x, "request", "headers", "endpointr_id")) |> unlist() failures_texts <- purrr::map_chr(chunk_failures, \(x) purrr::pluck(x, "request", "body", "data", "inputs")) |> unlist() - failures_msgs <- purrr::map_chr(chunk_failures, \(x) purrr::pluck(x, "message", .default = "Unknown error")) - + failures_msgs <- purrr::map_chr(chunk_failures, \(x) { + if (inherits(x, "httr2_response")) { + .extract_api_error(x) + } else { + # error object - try to get resp from it + resp <- purrr::pluck(x, "resp") + if (!is.null(resp)) .extract_api_error(resp) else .extract_api_error(x, "Unknown error") + } + }) + failures_status <- purrr::map_int(chunk_failures, \(x) { + if (inherits(x, "httr2_response")) { + httr2::resp_status(x) + } else { + resp <- purrr::pluck(x, "resp") + if (!is.null(resp)) httr2::resp_status(resp) else NA_integer_ + } + }) chunk_results$failures <- tibble::tibble( !!id_col_name := failures_ids, !!text_col_name := failures_texts, .error = TRUE, .error_msg = failures_msgs, + .status = failures_status, .chunk = chunk_num ) } diff --git a/R/hf_embed.R b/R/hf_embed.R index 8f249e3..e7001d3 100644 --- a/R/hf_embed.R +++ b/R/hf_embed.R @@ -230,7 +230,7 @@ hf_embed_batch <- function(texts, result$original_index <- NULL # drop index now we're returning - result <- dplyr::relocate(result, c(`.error`, `.error_message`), .before = dplyr::all_of(relocate_col)) + result <- dplyr::relocate(result, c(`.error`, `.error_msg`), .before = 
dplyr::all_of(relocate_col)) return(result) } @@ -359,8 +359,18 @@ hf_embed_chunks <- function(texts, progress = TRUE ) - successes <- httr2::resps_successes(responses) - failures <- httr2::resps_failures(responses) + # separate actual responses from error objects (network failures, etc.) + is_response <- purrr::map_lgl(responses, inherits, "httr2_response") + response_objects <- responses[is_response] + error_objects <- responses[!is_response] + + # split responses by HTTP status code (not just by type) + is_success <- purrr::map_lgl(response_objects, ~httr2::resp_status(.x) < 400) + successes <- response_objects[is_success] + http_failures <- response_objects[!is_success] + + # combine HTTP failures with network/other errors + failures <- c(http_failures, error_objects) n_successes <- length(successes) n_failures <- length(failures) @@ -379,6 +389,7 @@ hf_embed_chunks <- function(texts, !!id_col_name := successes_ids, .error = FALSE, .error_msg = NA_character_, + .status = NA_integer_, .chunk = chunk_num ) |> dplyr::bind_cols(successes_content) @@ -386,12 +397,29 @@ hf_embed_chunks <- function(texts, if (n_failures > 0) { failures_ids <- purrr::map(failures, \(x) purrr::pluck(x, "request", "headers", "endpointr_id")) |> unlist() - failures_msgs <- purrr::map_chr(failures, \(x) purrr::pluck(x, "message", .default = "Unknown error")) + failures_msgs <- purrr::map_chr(failures, \(x) { + if (inherits(x, "httr2_response")) { + .extract_api_error(x) + } else { + # error object - try to get resp from it + resp <- purrr::pluck(x, "resp") + if (!is.null(resp)) .extract_api_error(resp) else .extract_api_error(x, "Unknown error") + } + }) + failures_status <- purrr::map_int(failures, \(x) { + if (inherits(x, "httr2_response")) { + httr2::resp_status(x) + } else { + resp <- purrr::pluck(x, "resp") + if (!is.null(resp)) httr2::resp_status(resp) else NA_integer_ + } + }) chunk_results$failures <- tibble::tibble( !!id_col_name := failures_ids, .error = TRUE, .error_msg = failures_msgs, + .status = failures_status, .chunk = chunk_num ) } diff --git a/R/hf_inference.R b/R/hf_inference.R index 101a250..1685f10 100644 --- a/R/hf_inference.R +++ b/R/hf_inference.R @@ -237,12 +237,14 @@ #' Execute a single embedding request and process the response #' #' @description - #' Performs a prepared request and returns the response + #' Performs a prepared request and returns the response. + #' Since requests use req_error(is_error = ~ FALSE), HTTP error responses + #' (status >= 400) are returned rather than thrown as errors. #' #' @param request An httr2 request object created by hf_build_request - #' @param ... ellipsis is sent to `httr2::req_perform`, e.g. for `path` and `verbosity`arguments. + #' @param ... ellipsis is sent to `httr2::req_perform`, e.g. for `path` and `verbosity` arguments. #' - #' @return A httr2 response object + #' @return An httr2 response object. Check status with httr2::resp_status(). 
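Because `req_error(is_error = ~ FALSE)` now suppresses httr2's automatic status errors package-wide, the calling pattern for a single response looks roughly like this — a sketch, assuming a `request` built with `hf_build_request()` and the exported perform function (shown here as `hf_perform_request()`):

```r
# sketch: responses come back even for HTTP 4xx/5xx, so check status yourself
resp <- hf_perform_request(request)

status <- httr2::resp_status(resp)
if (status >= 400) {
  # .extract_api_error() is the internal helper added in core.R
  cli::cli_warn("Request failed with status {status}")
} else {
  body <- httr2::resp_body_json(resp)
}
```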
#' @export #' #' @examples diff --git a/R/openai_completions.R b/R/openai_completions.R index 957d1be..34da373 100644 --- a/R/openai_completions.R +++ b/R/openai_completions.R @@ -275,10 +275,7 @@ oai_complete_text <- function(text, # basic request sending with non-comprehensive error handling if (httr2::resp_status(response) != 200) { - error_msg <- tryCatch( - httr2::resp_body_json(response)$error$message, - error = function(e) paste("HTTP", httr2::resp_status(response)) - ) + error_msg <- .extract_api_error(response) cli::cli_abort(c( "API request failed", "x" = error_msg @@ -312,7 +309,7 @@ oai_complete_text <- function(text, #' Process text chunks through OpenAI's Chat Completions API with batch file output #' #' This function processes large volumes of text through OpenAI's Chat Completions API -#' in configurable chunks, writing results progressively to a CSV file. It handles +#' in configurable chunks, writing results progressively to parquet files. It handles #' concurrent requests, automatic retries, and structured outputs while #' managing memory efficiently for large-scale processing. #' @@ -322,23 +319,25 @@ oai_complete_text <- function(text, #' memory usage. #' #' The function preserves data integrity by matching results to source texts through -#' the `ids` parameter. Each chunk is processed independently with results appended -#' to the output file, allowing for resumable processing if interrupted. +#' the `ids` parameter. Each chunk is processed independently with results written as +#' parquet files to the output directory, allowing for resumable processing if interrupted. #' #' When using structured outputs with a `schema`, responses are validated against -#' the JSON schema but stored as raw JSON strings in the output file. This allows +#' the JSON schema but stored as raw JSON strings in the output files. This allows #' for flexible post-processing without memory constraints during the API calls. #' #' The chunking strategy balances API efficiency with memory management. Larger #' `chunk_size` values reduce overhead but increase memory usage. Adjust based on #' your system resources and text sizes. #' +#' Reduce the risk of data loss by keeping `chunk_size` modest (e.g. 5,000 or 10,000). Each chunk is written to a `.parquet` file in the `output_dir=` directory, which also contains a `metadata.json` file that records important information such as the model and endpoint URL used. Be sure to add output directories to .gitignore! +#' #' @param texts Character vector of texts to process #' @param ids Vector of unique identifiers corresponding to each text (same length as texts) #' @param chunk_size Number of texts to process in each batch (default: 5000) #' @param model OpenAI model to use (default: "gpt-4.1-nano") #' @param system_prompt Optional system prompt applied to all requests -#' @param output_file Path to .CSV file for results. "auto" generates the filename, location and is persistent across sessions. If NULL, generates timestamped filename. +#' @param output_dir Path to directory for the .parquet chunks. "auto" generates a timestamped directory name. If NULL, uses a temporary directory.
+#' @param schema Optional JSON schema for structured output (json_schema object or list) +#' @param concurrent_requests Integer; number of concurrent requests (default: 5) +#' @param temperature Sampling temperature (0-2), lower = more deterministic (default: 0) @@ -347,29 +346,46 @@ oai_complete_text <- function(text, #' @param timeout Request timeout in seconds (default: 30) #' @param key_name Name of environment variable containing the API key (default: OPENAI_API_KEY) #' @param endpoint_url OpenAI API endpoint URL +#' @param id_col_name Name for the ID column in output (default: "id"). When called from oai_complete_df(), this preserves the original column name. #' #' @return A tibble containing all results with columns: -#' - `id`: Original identifier from input +#' - ID column (name specified by `id_col_name`): Original identifier from input #' - `content`: API response content (text or JSON string if schema used) #' - `.error`: Logical indicating if request failed #' - `.error_msg`: Error message if failed, NA otherwise -#' - `.batch`: Batch number for tracking +#' - `.chunk`: Chunk number for tracking #' #' @export #' @examples #' \dontrun{ -#' # basic usage with automatic file naming: -#' -#' # large-scale processing with custom output file: - -#' #structured extraction with schema: -#' +#' # basic usage with automatic directory naming: +#' result <- oai_complete_chunks( +#' texts = my_texts, +#' ids = my_ids, +#' model = "gpt-4.1-nano" +#' ) +#' +#' # large-scale processing with custom output directory: +#' result <- oai_complete_chunks( +#' texts = my_texts, +#' ids = my_ids, +#' output_dir = "my_results", +#' chunk_size = 10000 +#' ) +#' +#' # structured extraction with schema: +#' result <- oai_complete_chunks( +#' texts = my_texts, +#' ids = my_ids, +#' schema = my_schema, +#' temperature = 0 +#' ) #' #' # post-process structured results: -#' xx <- xx |> +#' processed <- result |> #' dplyr::filter(!.error) |> -#' dplyr::mutate(parsed = map(content, ~jsonlite::fromJSON(.x))) |> -#' unnest_wider(parsed) +#' dplyr::mutate(parsed = purrr::map(content, ~jsonlite::fromJSON(.x))) |> +#' tidyr::unnest_wider(parsed) #' } # oai_complete_chunks docs ---- oai_complete_chunks <- function(texts, ids, chunk_size = 5000L, model = "gpt-4.1-nano", system_prompt = NULL, - output_file = "auto", + output_dir = "auto", schema = NULL, concurrent_requests = 5L, temperature = 0L, max_tokens = 500L, max_retries = 5L, timeout = 30L, key_name = "OPENAI_API_KEY", - endpoint_url = "https://api.openai.com/v1/chat/completions" + endpoint_url = "https://api.openai.com/v1/chat/completions", + id_col_name = "id" ) { # input validation ---- stopifnot( "texts must be a vector" = is.vector(texts), "ids must be a vector" = is.vector(ids), "texts and ids must be the same length" = length(texts) == length(ids), "chunk_size must be a positive integer" = is.numeric(chunk_size) && chunk_size > 0 ) - output_file <- .handle_output_filename(output_file) + output_dir <- .handle_output_directory(output_dir, base_dir_name = "oai_completions_batch") + + if (!dir.exists(output_dir)) { + dir.create(output_dir, recursive = TRUE) + } # make sure we json_dump the schema here if necessary, so that we don't json_dump for every individual document if(!is.null(schema) && inherits(schema, "EndpointR::json_schema")) { @@ -405,27 +426,52 @@ oai_complete_chunks <- function(texts, } batch_data <- batch_vector(seq_along(texts), chunk_size) - n_batches <- length(batch_data$batch_indices) + n_chunks <- length(batch_data$batch_indices)
+ + # write metadata to track important information for debugging and reproducibility + metadata <- list( + model = model, + endpoint_url = endpoint_url, + chunk_size = chunk_size, + n_texts = length(texts), + concurrent_requests = concurrent_requests, + timeout = timeout, + max_retries = max_retries, + temperature = temperature, + max_tokens = max_tokens, + output_dir = output_dir, + key_name = key_name, + n_chunks = n_chunks, + has_schema = !is.null(schema), + has_system_prompt = !is.null(system_prompt), + timestamp = Sys.time() + ) - cli::cli_alert_info("Processing {length(texts)} text{?s} in {n_batches} chunk{?s} of up to {chunk_size} each") + jsonlite::write_json(metadata, + file.path(output_dir, "metadata.json"), + auto_unbox = TRUE, + pretty = TRUE) + + cli::cli_alert_info("Processing {length(texts)} text{?s} in {n_chunks} chunk{?s} of up to {chunk_size} each") + cli::cli_alert_info("Intermediate results will be saved as parquet files in {output_dir}") total_successes <- 0 total_failures <- 0 - ## batch processing ---- - for (batch_num in seq_along(batch_data$batch_indices)) + ## chunk processing ---- + for (chunk_num in seq_along(batch_data$batch_indices)) { - batch_indices <- batch_data$batch_indices[[batch_num]] - batch_texts <- texts[batch_indices] - batch_ids <- ids[batch_indices] + chunk_indices <- batch_data$batch_indices[[chunk_num]] + chunk_texts <- texts[chunk_indices] + chunk_ids <- ids[chunk_indices] - cli::cli_progress_message("Processing batch {batch_num}/{n_batches} ({length(batch_indices)} text{?s})") + cli::cli_progress_message("Processing chunk {chunk_num}/{n_chunks} ({length(chunk_indices)} text{?s})") - ## build batch requests ---- + ## build chunk requests ---- requests <- oai_build_completions_request_list( - inputs = batch_texts, + inputs = chunk_texts, model = model, temperature = temperature, max_tokens = max_tokens, @@ -435,88 +481,109 @@ oai_complete_chunks <- function(texts, endpoint_url = endpoint_url, max_retries = max_retries, timeout = timeout, - endpointr_ids = batch_ids + endpointr_ids = chunk_ids ) - # make sure we have some valid requests, or skip to the next iter + # make sure we have some valid requests, or skip to the next iteration is_valid_request <- purrr::map_lgl(requests, ~inherits(.x, "httr2_request")) valid_requests <- requests[is_valid_request] if (length(valid_requests) == 0) { - cli::cli_alert_warning("No valid request{?s} in batch {batch_num}, skipping") + cli::cli_alert_warning("No valid request{?s} in chunk {chunk_num}, skipping") next } - # perform batch requests ---- - # get chunk_size individual responses and then handle them + # perform chunk requests ---- responses <- perform_requests_with_strategy( valid_requests, concurrent_requests = concurrent_requests, progress = TRUE ) - successes <- httr2::resps_successes(responses) - failures <- httr2::resps_failures(responses) + # separate actual responses from error objects (network failures, etc.) 
+ is_response <- purrr::map_lgl(responses, inherits, "httr2_response") + response_objects <- responses[is_response] + error_objects <- responses[!is_response] + + # split responses by HTTP status code (not just by type) + is_success <- purrr::map_lgl(response_objects, ~httr2::resp_status(.x) < 400) + successes <- response_objects[is_success] + http_failures <- response_objects[!is_success] + + # combine HTTP failures with network/other errors + failures <- c(http_failures, error_objects) n_successes <- length(successes) n_failures <- length(failures) total_successes <- total_successes + n_successes total_failures <- total_failures + n_failures - ## process batch responses ---- - # within batch results - batch_results <- list() + ## process chunk responses ---- + # within chunk results + chunk_results <- list() if (length(successes) > 0) { successes_ids <- purrr::map(successes, ~purrr::pluck(.x, "request", "headers", "endpointr_id")) |> unlist() successes_content <- purrr::map_chr(successes, .extract_successful_completion_content) - batch_results$successes <- tibble::tibble( - id = successes_ids, + chunk_results$successes <- tibble::tibble( + !!id_col_name := successes_ids, content = successes_content, .error = FALSE, .error_msg = NA_character_, - .batch = batch_num + .status = NA_integer_, + .chunk = chunk_num ) } if (length(failures) > 0) { failures_ids <- purrr::map(failures, ~purrr::pluck(.x, "request", "headers", "endpointr_id")) |> unlist() - failures_msgs <- purrr::map_chr(failures, ~purrr::pluck(.x, "message", .default = "Unknown error")) - - batch_results$failures <- tibble::tibble( - id = failures_ids, + failures_msgs <- purrr::map_chr(failures, ~{ + if (inherits(.x, "httr2_response")) { + .extract_api_error(.x) + } else { + # Error object - try to get resp from it + resp <- purrr::pluck(.x, "resp") + if (!is.null(resp)) .extract_api_error(resp) else .extract_api_error(.x, "Unknown error") + } + }) + failures_status <- purrr::map_int(failures, ~{ + if (inherits(.x, "httr2_response")) { + httr2::resp_status(.x) + } else { + resp <- purrr::pluck(.x, "resp") + if (!is.null(resp)) httr2::resp_status(resp) else NA_integer_ + } + }) + + chunk_results$failures <- tibble::tibble( + !!id_col_name := failures_ids, content = NA_character_, .error = TRUE, .error_msg = failures_msgs, - .batch = batch_num + .status = failures_status, + .chunk = chunk_num ) } - batch_df <- dplyr::bind_rows(batch_results) + chunk_df <- dplyr::bind_rows(chunk_results) - # if(!is.null(output_file)){ # skip writing if output_file = NULL - can't be NULL after .handle_output_filename - as if NULL we write to tmp file - if (nrow(batch_df) > 0) { - if (batch_num == 1) { - # if we're in the first batch write to csv with headers (col names) - readr::write_csv(batch_df, output_file, append = FALSE) - } else { - # all other batches, append and don't use col names - readr::write_csv(batch_df, output_file, append = TRUE, col_names = FALSE) - } + if (nrow(chunk_df) > 0) { + chunk_file <- glue::glue("{output_dir}/chunk_{stringr::str_pad(chunk_num, 3, pad = '0')}.parquet") + arrow::write_parquet(chunk_df, chunk_file) } - # } - cli::cli_alert_success("Batch {batch_num}: {n_successes} successful, {n_failures} failed") + cli::cli_alert_success("Chunk {chunk_num}: {n_successes} successful, {n_failures} failed") - rm(requests, responses, successes, failures, batch_results, batch_df) + rm(requests, responses, successes, failures, chunk_results, chunk_df) gc(verbose = FALSE) } - cli::cli_alert_success("Completed processing: 
{total_successes} successful, {total_failures} failed") + parquet_files <- list.files(output_dir, pattern = "\\.parquet$", full.names = TRUE) - # retrieve all results from the output file (results for all batches) - this may still be too inefficient, and should perhaps write to duckdb(?) - final_results <- readr::read_csv(output_file, show_col_types = FALSE) + cli::cli_alert_info("Processing complete: {total_successes} successful, {total_failures} failed.") final_results <- arrow::open_dataset(parquet_files, format = "parquet") |> + dplyr::collect() return(final_results) } @@ -535,8 +602,8 @@ oai_complete_chunks <- function(texts, #' results matched to the original data through the `id_var` parameter. #' #' The chunking approach enables processing of large data frames without memory -#' constraints. Results are written progressively to a CSV file (either specified -#' or auto-generated) and then read back as the return value. +#' constraints. Results are written progressively as parquet files (either to a specified +#' directory or auto-generated) and then read back as the return value. #' #' When using structured outputs with a `schema`, responses are validated against #' the JSON schema and stored as JSON strings. Post-processing may be needed to @@ -545,6 +612,8 @@ oai_complete_chunks <- function(texts, #' Failed requests are marked with `.error = TRUE` and include error messages, #' allowing for easy filtering and retry logic on failures. #' +#' Reduce the risk of data loss by keeping `chunk_size` modest (e.g. 5,000 or 10,000). Each chunk is written to a `.parquet` file in the `output_dir=` directory, which also contains a `metadata.json` file. Be sure to add output directories to .gitignore! +#' #' @param df Data frame containing text to process #' @param text_var Column name (unquoted) containing text inputs #' @param id_var Column name (unquoted) for unique row identifiers @@ -554,19 +623,58 @@ oai_complete_chunks <- function(texts, #' - `content`: API response content (text or JSON string if schema used) #' - `.error`: Logical indicating if request failed #' - `.error_msg`: Error message if failed, NA otherwise -#' - `.batch`: Batch number for tracking +#' - `.chunk`: Chunk number for tracking #' #' @export #' @examples #' \dontrun{ -#' +#' # Basic usage with a data frame +#' df <- tibble::tibble( +#' doc_id = 1:3, +#' text = c( +#' "I absolutely loved this product!", +#' "Terrible experience, would not recommend.", +#' "It was okay, nothing special." +#' ) +#' ) +#' +#' results <- oai_complete_df( +#' df = df, +#' text_var = text, +#' id_var = doc_id, +#' system_prompt = "Summarise the sentiment in one word."
+#' ) +#' +#' # Structured extraction with schema +#' sentiment_schema <- create_json_schema( +#' name = "sentiment_analysis", +#' schema = schema_object( +#' sentiment = schema_string("positive, negative, or neutral"), +#' confidence = schema_number("confidence score between 0 and 1"), +#' required = list("sentiment", "confidence") +#' ) +#' ) +#' +#' results <- oai_complete_df( +#' df = df, +#' text_var = text, +#' id_var = doc_id, +#' schema = sentiment_schema, +#' temperature = 0 +#' ) +#' +#' # Post-process structured results +#' results |> +#' dplyr::filter(!.error) |> +#' dplyr::mutate(parsed = purrr::map(content, safely_from_json)) |> +#' tidyr::unnest_wider(parsed) #' } #oai_complete_df docs---- oai_complete_df <- function(df, text_var, id_var, model = "gpt-4.1-nano", - output_file = "auto", + output_dir = "auto", system_prompt = NULL, schema = NULL, chunk_size = 1000, @@ -591,12 +699,14 @@ oai_complete_df <- function(df, "`chunk_size` must be a positive integer" = is.numeric(chunk_size) && chunk_size > 0 ) - - output_file <- .handle_output_filename(output_file, base_file_name = "oai_batch") + output_dir <- .handle_output_directory(output_dir, base_dir_name = "oai_completions_batch") text_vec <- dplyr::pull(df, !!text_sym) id_vec <- dplyr::pull(df, !!id_sym) + # preserve original column name + id_col_name <- rlang::as_name(id_sym) + results <- oai_complete_chunks( texts = text_vec, ids = id_vec, @@ -611,11 +721,10 @@ oai_complete_df <- function(df, max_tokens = max_tokens, key_name = key_name, endpoint_url = endpoint_url, - output_file = output_file + output_dir = output_dir, + id_col_name = id_col_name ) - results <- dplyr::rename(results, !!id_sym := id) - return(results) } @@ -652,10 +761,7 @@ oai_complete_df <- function(df, .error_msg = NA_character_ )) } else { - .error_msg <- tryCatch( - httr2::resp_body_json(response)$error$message, - error = function(e) paste("HTTP", status) - ) + .error_msg <- .extract_api_error(response) return(list( status = status, diff --git a/R/openai_embed.R b/R/openai_embed.R index c8eeef0..b88dcb9 100644 --- a/R/openai_embed.R +++ b/R/openai_embed.R @@ -312,7 +312,7 @@ oai_embed_text <- function(text, #' filled with NA values and marked with error information. #' #' The function returns a tibble with embedding columns (V1, V2, ..., Vn), -#' error tracking columns (.error, .error_message), and optionally the +#' error tracking columns (.error, .error_msg), and optionally the #' original texts. #' #' @param texts Vector or list of character strings to generate embeddings for @@ -331,7 +331,7 @@ oai_embed_text <- function(text, #' @return A tibble containing: #' - Embedding vectors as columns (V1, V2, ..., Vn) #' - .error: Logical column indicating if embedding failed -#' - .error_message: Character column with error details +#' - .error_msg: Character column with error details #' - text: Original texts (if include_texts = TRUE) #' @export #' @@ -466,7 +466,7 @@ oai_embed_batch <- function(texts, # 5. Adding Error Information to Data Frame ---- # add errors and messages to return df. FALSE and "" if no error. result$.error <- errors - result$.error_message <- error_msgs + result$.error_msg <- error_msgs n_failed <- sum(result$.error) n_succeeded <- n_texts - n_failed @@ -491,14 +491,264 @@ oai_embed_batch <- function(texts, } # 6. 
Relocating Cols and Returning ---- - result <- dplyr::relocate(result, c(.error, .error_message), .before = dplyr::all_of(relocate_col)) + result <- dplyr::relocate(result, c(.error, .error_msg), .before = dplyr::all_of(relocate_col)) return(result) } -oai_embed_df <- function(df, text_var, id_var, model = "text-embedding-3-small", dimensions = NULL, batch_size = 1, concurrent_requests = 1, max_retries = 5, timeout = 20, endpoint_url = "https://api.openai.com/v1/embeddings", key_name = "OPENAI_API_KEY" ) { +# oai_embed_chunks docs ---- +#' Embed text chunks through OpenAI's Embeddings API +#' +#' This function processes large volumes of text through OpenAI's Embeddings API +#' in configurable chunks, writing results progressively to parquet files. It handles +#' concurrent requests and automatic retries while managing memory efficiently for +#' large-scale processing. +#' +#' @details This function is designed for processing large text datasets that may not +#' fit comfortably in memory. It divides the input into chunks, processes each chunk +#' with concurrent API requests, and writes results immediately to disk to minimise +#' memory usage. +#' +#' The function preserves data integrity by matching results to source texts through +#' the `ids` parameter. Each chunk is processed independently with results written as +#' parquet files to the output directory. +#' +#' The chunking strategy balances API efficiency with memory management. Larger +#' `chunk_size` values reduce overhead but increase memory usage. Adjust based on +#' your system resources and text sizes. +#' +#' Reduce the risk of data loss by keeping `chunk_size` modest (e.g. 5,000 or 10,000). Each chunk is written to a `.parquet` file in the `output_dir=` directory, which also contains a `metadata.json` file that records important information such as the model and endpoint URL used. Be sure to add output directories to .gitignore! +#' +#' @param texts Character vector of texts to process +#' @param ids Vector of unique identifiers corresponding to each text (same length as texts) +#' @param model OpenAI embedding model to use (default: "text-embedding-3-small") +#' @param dimensions Number of embedding dimensions (default: 1536 for text-embedding-3-small) +#' @param output_dir Path to directory for the .parquet chunks. "auto" generates a timestamped directory name. If NULL, uses a temporary directory. +#' @param chunk_size Number of texts to process in each chunk before writing to disk (default: 5000) +#' @param concurrent_requests Number of concurrent requests (default: 5) +#' @param max_retries Maximum retry attempts per failed request (default: 5) +#' @param timeout Request timeout in seconds (default: 20) +#' @param endpoint_url OpenAI API endpoint URL (default: OpenAI's embedding endpoint) +#' @param key_name Name of environment variable containing the API key (default: "OPENAI_API_KEY") +#' @param id_col_name Name for the ID column in output (default: "id"). When called from oai_embed_df(), this preserves the original column name. +#' +#' @return A tibble with columns: +#' - ID column (name specified by `id_col_name`): Original identifier from input +#' - `.error`: Logical indicating if request failed +#' - `.error_msg`: Error message if failed, NA otherwise +#' - `.chunk`: Chunk number for tracking +#' - Embedding columns (V1, V2, etc.)
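Because the return value is a tibble keyed by the ID column rather than the original rows, joining embeddings back to a source data frame is a single join — a sketch, assuming a `df` with `id` and `text` columns whose ids survive the round trip as the same type:

```r
# sketch: run the chunked embedder, then join results back to the source rows
embeddings <- oai_embed_chunks(texts = df$text, ids = df$id)

df_embedded <- dplyr::left_join(df, embeddings, by = "id")

# rows that failed can be pulled out for a retry pass via .error
retry_ids <- embeddings |>
  dplyr::filter(.error) |>
  dplyr::pull(id)
```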
+#' @export +#' +#' @examples +#' \dontrun{ +#' # basic usage with automatic directory naming +#' result <- oai_embed_chunks( +#' texts = my_texts, +#' ids = my_ids, +#' model = "text-embedding-3-small" +#' ) +#' +#' # large-scale processing with custom settings +#' result <- oai_embed_chunks( +#' texts = my_texts, +#' ids = my_ids, +#' output_dir = "my_embeddings", +#' chunk_size = 10000, +#' dimensions = 512, +#' concurrent_requests = 10 +#' ) +#' } +# oai_embed_chunks docs ---- +oai_embed_chunks <- function(texts, + ids, + model = "text-embedding-3-small", + dimensions = 1536, + output_dir = "auto", + chunk_size = 5000L, + concurrent_requests = 5L, + max_retries = 5L, + timeout = 20L, + endpoint_url = "https://api.openai.com/v1/embeddings", + key_name = "OPENAI_API_KEY", + id_col_name = "id") { + + # input validation ---- + stopifnot( + "texts must be a vector" = is.vector(texts), + "ids must be a vector" = is.vector(ids), + "texts and ids must be the same length" = length(texts) == length(ids), + "chunk_size must be a positive integer" = is.numeric(chunk_size) && chunk_size > 0 + ) + + output_dir <- .handle_output_directory(output_dir, base_dir_name = "oai_embeddings_batch") + + if (!dir.exists(output_dir)) { + dir.create(output_dir, recursive = TRUE) + } + + chunk_data <- batch_vector(seq_along(texts), chunk_size) + n_chunks <- length(chunk_data$batch_indices) + + # write metadata to track important information for debugging and reproducibility + metadata <- list( + model = model, + endpoint_url = endpoint_url, + dimensions = dimensions, + chunk_size = chunk_size, + n_texts = length(texts), + concurrent_requests = concurrent_requests, + timeout = timeout, + max_retries = max_retries, + output_dir = output_dir, + key_name = key_name, + n_chunks = n_chunks, + timestamp = Sys.time() + ) + jsonlite::write_json(metadata, + file.path(output_dir, "metadata.json"), + auto_unbox = TRUE, + pretty = TRUE) + + cli::cli_alert_info("Processing {length(texts)} text{?s} in {n_chunks} chunk{?s} of up to {chunk_size} each") + cli::cli_alert_info("Intermediate results will be saved as parquet files in {output_dir}") + + total_successes <- 0 + total_failures <- 0 + + ## chunk processing ---- + for (chunk_num in seq_along(chunk_data$batch_indices)) { + + chunk_indices <- chunk_data$batch_indices[[chunk_num]] + chunk_texts <- texts[chunk_indices] + chunk_ids <- ids[chunk_indices] + + cli::cli_progress_message("Processing chunk {chunk_num}/{n_chunks} ({length(chunk_indices)} text{?s})") + + ## build chunk requests ---- + # use individual requests for each text rather than batching within request + requests <- purrr::map2( + .x = chunk_texts, + .y = chunk_ids, + .f = function(text, id) { + req <- oai_build_embedding_request( + input = text, + model = model, + dimensions = dimensions, + max_retries = max_retries, + timeout = timeout, + endpoint_url = endpoint_url, + key_name = key_name + ) + # attach id to request headers for tracking + httr2::req_headers(req, endpointr_id = id) + } + ) + + # make sure we have some valid requests, or skip to the next iteration + is_valid_request <- purrr::map_lgl(requests, ~inherits(.x, "httr2_request")) + valid_requests <- requests[is_valid_request] + + if (length(valid_requests) == 0) { + cli::cli_alert_warning("No valid request{?s} in chunk {chunk_num}, skipping") + next + } + + # perform chunk requests ---- + responses <- perform_requests_with_strategy( + valid_requests, + concurrent_requests = concurrent_requests, + progress = TRUE + ) + + # separate
actual responses from error objects (network failures, etc.) + is_response <- purrr::map_lgl(responses, inherits, "httr2_response") + response_objects <- responses[is_response] + error_objects <- responses[!is_response] + + # split responses by HTTP status code (not just by type) + is_success <- purrr::map_lgl(response_objects, ~httr2::resp_status(.x) < 400) + successes <- response_objects[is_success] + http_failures <- response_objects[!is_success] + + # combine HTTP failures with network/other errors + failures <- c(http_failures, error_objects) + + n_successes <- length(successes) + n_failures <- length(failures) + total_successes <- total_successes + n_successes + total_failures <- total_failures + n_failures + + ## process chunk responses ---- + # within chunk results + chunk_results <- list() + + if (n_successes > 0) { + successes_ids <- purrr::map(successes, ~purrr::pluck(.x, "request", "headers", "endpointr_id")) |> unlist() + successes_content <- purrr::map(successes, tidy_oai_embedding) |> + purrr::list_rbind() + + chunk_results$successes <- tibble::tibble( + !!id_col_name := successes_ids, + .error = FALSE, + .error_msg = NA_character_, + .status = NA_integer_, + .chunk = chunk_num + ) |> + dplyr::bind_cols(successes_content) + } + + if (n_failures > 0) { + failures_ids <- purrr::map(failures, ~purrr::pluck(.x, "request", "headers", "endpointr_id")) |> unlist() + failures_msgs <- purrr::map_chr(failures, ~{ + if (inherits(.x, "httr2_response")) { + .extract_api_error(.x) + } else { + # error object - try to get resp from it + resp <- purrr::pluck(.x, "resp") + if (!is.null(resp)) .extract_api_error(resp) else .extract_api_error(.x, "Unknown error") + } + }) + failures_status <- purrr::map_int(failures, ~{ + if (inherits(.x, "httr2_response")) { + httr2::resp_status(.x) + } else { + resp <- purrr::pluck(.x, "resp") + if (!is.null(resp)) httr2::resp_status(resp) else NA_integer_ + } + }) + + chunk_results$failures <- tibble::tibble( + !!id_col_name := failures_ids, + .error = TRUE, + .error_msg = failures_msgs, + .status = failures_status, + .chunk = chunk_num + ) + } + + chunk_df <- dplyr::bind_rows(chunk_results) + + if (nrow(chunk_df) > 0) { + chunk_file <- glue::glue("{output_dir}/chunk_{stringr::str_pad(chunk_num, 3, pad = '0')}.parquet") + arrow::write_parquet(chunk_df, chunk_file) + } + + cli::cli_alert_success("Chunk {chunk_num}: {n_successes} successful, {n_failures} failed") + + rm(requests, responses, successes, failures, chunk_results, chunk_df) + gc(verbose = FALSE) + } + + parquet_files <- list.files(output_dir, pattern = "\\.parquet$", full.names = TRUE) + + cli::cli_alert_info("Processing complete: {total_successes} successful, {total_failures} failed.") + final_results <- arrow::open_dataset(parquet_files, format = "parquet") |> + dplyr::collect() + + return(final_results) } @@ -508,37 +758,43 @@ oai_embed_df <- function(df, text_var, id_var, model = "text-embedding-3-small", #' @description #' High-level function to generate embeddings for texts in a data frame using #' OpenAI's embedding API. This function handles the entire process from request -#' creation to response processing, with options for batching & concurrent requests. +#' creation to response processing, with options for chunking & concurrent requests. #' #' @details #' This function extracts texts from a specified column, generates embeddings using -#' `oai_embed_batch()`, and joins the results back to the original data frame using -#' a specified ID column.
+#' `oai_embed_chunks()`, and returns the results matched to the original IDs. #' -#' The function preserves the original data frame structure and adds new columns -#' for embedding dimensions (V1, V2, ..., Vn). If the number of rows doesn't match -#' after processing (due to errors), it returns the results with a warning. +#' The chunking approach enables processing of large data frames without memory +#' constraints. Results are written progressively as parquet files (either to a specified +#' directory or auto-generated) and then read back as the return value. #' #' OpenAI's embedding API allows you to specify the number of dimensions for the -#' output embeddings, which can be useful for reducing memory usage, storage cost,s or matching +#' output embeddings, which can be useful for reducing memory usage, storage costs, or matching #' specific downstream requirements. The default is model-specific (1536 for #' text-embedding-3-small). \href{https://openai.com/index/new-embedding-models-and-api-updates/}{OpenAI Embedding Updates} #' +#' Reduce the risk of data loss by keeping `chunk_size` modest (e.g. 5,000 or 10,000). Each chunk is written to a `.parquet` file in the `output_dir=` directory, which also contains a `metadata.json` file. Be sure to add output directories to .gitignore! +#' #' @param df Data frame containing texts to embed #' @param text_var Column name (unquoted) containing texts to embed #' @param id_var Column name (unquoted) for unique row identifiers #' @param model OpenAI embedding model to use (default: "text-embedding-3-small") -#' @param dimensions Number of embedding dimensions (NULL uses model default) +#' @param dimensions Number of embedding dimensions (default: 1536) #' @param key_name Name of environment variable containing the API key -#' @param batch_size Number of texts to process in one batch (default: 10) +#' @param output_dir Path to directory for the .parquet chunks. "auto" generates a timestamped directory name. If NULL, uses a temporary directory. +#' @param chunk_size Number of texts to process in each chunk before writing to disk (default: 5000) #' @param concurrent_requests Number of concurrent requests (default: 1) #' @param max_retries Maximum retry attempts per request (default: 5) #' @param timeout Request timeout in seconds (default: 20) #' @param endpoint_url OpenAI API endpoint URL #' @param progress Whether to display a progress bar (default: TRUE) #' -#' @return Original data frame with additional columns for embeddings (V1, V2, etc.), -#' plus .error and .error_message columns indicating any failures +#' @return A tibble with columns: +#' - ID column (preserves original column name): Original identifier from input +#' - `.error`: Logical indicating if request failed +#' - `.error_msg`: Error message if failed, NA otherwise +#' - `.chunk`: Chunk number for tracking +#' - Embedding columns (V1, V2, etc.)
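A consequence of the progressive parquet writes worth noting: if the session dies mid-run, completed chunks survive on disk and can be reopened without repeating any API calls — a sketch, where `"my_output_dir"` stands in for whichever `output_dir` was used:

```r
# sketch: recover completed chunks from an interrupted run
files <- list.files("my_output_dir", pattern = "\\.parquet$", full.names = TRUE)

recovered <- arrow::open_dataset(files, format = "parquet") |>
  dplyr::collect()

# .chunk records which write each row came from
dplyr::count(recovered, .chunk)
```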
#'
#' @export
#'
@@ -549,7 +805,7 @@ oai_embed_df <- function(df, text_var, id_var, model = "text-embedding-3-small",
#'   text = c("First example", "Second example", "Third example")
#' )
#'
-#' # Generate embeddings with default dimensions
+#' # Generate embeddings with default settings
#' embeddings_df <- oai_embed_df(
#'   df = df,
#'   text_var = text,
@@ -561,8 +817,8 @@ oai_embed_df <- function(df, text_var, id_var, model = "text-embedding-3-small",
#'   df = df,
#'   text_var = text,
#'   id_var = id,
-#'   dimensions = 360, # smaller embeddings
-#'   batch_size = 5
+#'   dimensions = 512, # smaller embeddings
+#'   chunk_size = 10000
#' )
#'
#' # Use with concurrent requests for faster processing
@@ -571,7 +827,7 @@ oai_embed_df <- function(df,
#'   text_var = text,
#'   id_var = id,
#'   model = "text-embedding-3-large",
-#'   concurrent_requests = 3
+#'   concurrent_requests = 5
#' )
#' }
oai_embed_df <- function(df,
@@ -580,10 +836,11 @@ oai_embed_df <- function(df,
                         model = "text-embedding-3-small",
                         dimensions = 1536,
                         key_name = "OPENAI_API_KEY",
-                         batch_size = 10,
-                         concurrent_requests = 1,
-                         max_retries = 5,
-                         timeout = 20,
+                         output_dir = "auto",
+                         chunk_size = 5000L,
+                         concurrent_requests = 1L,
+                         max_retries = 5L,
+                         timeout = 20L,
                         endpoint_url = "https://api.openai.com/v1/embeddings",
                         progress = TRUE) {

@@ -592,60 +849,37 @@ oai_embed_df <- function(df,

  stopifnot(
    "df must be a data frame" = is.data.frame(df),
+    "df must not be empty" = nrow(df) > 0,
+    "text_var must exist in df" = rlang::as_name(text_sym) %in% names(df),
+    "id_var must exist in df" = rlang::as_name(id_sym) %in% names(df),
    "endpoint_url must be provided" = !is.null(endpoint_url) && nchar(endpoint_url) > 0,
-    "concurrent_requests must be a number greater than 0" = is.numeric(concurrent_requests) && concurrent_requests > 0,
-    "batch_size must be a number greater than 0" = is.numeric(batch_size) && batch_size > 0
+    "concurrent_requests must be a number greater than 0" = is.numeric(concurrent_requests) && concurrent_requests > 0
  )

-  if (!rlang::as_string(text_sym) %in% names(df)) {
-    cli::cli_abort("Column {.code {rlang::as_string(text_sym)}} not found in data frame")
-  }
-
-  if (!rlang::as_string(id_sym) %in% names(df)) {
-    cli::cli_abort("Column {.code {rlang::as_string(id_sym)}} not found in data frame")
-  }
-
-  original_num_rows <- nrow(df)
+  output_dir <- .handle_output_directory(output_dir, base_dir_name = "oai_embeddings_batch")

-  # pull texts & ids into vectors for batch function
  texts <- dplyr::pull(df, !!text_sym)
  indices <- dplyr::pull(df, !!id_sym)

-  batch_size <- if(is.null(batch_size) || batch_size <= 1) 1 else batch_size
+  # preserve original column name
+  id_col_name <- rlang::as_name(id_sym)

-  embeddings_tbl <- oai_embed_batch(
+  chunk_size <- if(is.null(chunk_size) || chunk_size <= 1) 1 else chunk_size
+
+  results <- oai_embed_chunks(
    texts = texts,
+    ids = indices,
    model = model,
    dimensions = dimensions,
-    batch_size = batch_size,
+    output_dir = output_dir,
+    chunk_size = chunk_size,
    concurrent_requests = concurrent_requests,
    max_retries = max_retries,
    timeout = timeout,
    endpoint_url = endpoint_url,
    key_name = key_name,
-    include_texts = FALSE,
-    relocate_col = 1
+    id_col_name = id_col_name
  )

-  df_with_row_id <- df |> dplyr::mutate(.row_id = dplyr::row_number())
-
-  embeddings_tbl <- embeddings_tbl |>
-    dplyr::mutate(.row_id = dplyr::row_number())
-
-  result_df <- df_with_row_id |>
-    dplyr::left_join(embeddings_tbl, by = ".row_id") |>
-    dplyr::select(-.row_id)
-
-  # sanity check and alert user if there's a mismatch
-  final_num_rows <- nrow(result_df)
-
-  if(final_num_rows != original_num_rows){
-    cli::cli_warn("Rows in original data frame and returned data frame do not match:")
-    cli::cli_bullets(text = c(
-      "Rows in original data frame: {original_num_rows}",
-      "Rows in returned data frame: {final_num_rows}"
-    ))
-  }
-
-  return(result_df)
+  return(results)
 }
diff --git a/R/zzz.R b/R/zzz.R
index bb8cd44..a925ff3 100644
--- a/R/zzz.R
+++ b/R/zzz.R
@@ -1,5 +1,5 @@
 utils::globalVariables(c(".embeddings", ".request", ".response", ".row_num", ".data", ".error",
-                         ".error_message", "original_index", "text", ":=", ".row_id", "id", "label", "score", "verbose"))
+                         ".error_msg", ".status", "original_index", "text", ":=", ".row_id", "id", "label", "score", "verbose"))

 .onLoad <- function(...) {
   S7::methods_register()
diff --git a/README.Rmd b/README.Rmd
index 22071ee..fe61b4a 100644
--- a/README.Rmd
+++ b/README.Rmd
@@ -196,7 +196,7 @@ oai_complete_df(
   id_var = review_id,
   system_prompt = "Classify the following review:",
   key_name = "OPENAI_API_KEY",
-  output_file = "completions_output.parquet", # writes results to this file
+  output_dir = "completions_output", # writes .parquet chunks to this directory
   chunk_size = 1000, # process 1000 rows per chunk
   concurrent_requests = 5, # send 5 rows of data simultaneously
   max_retries = 5,
@@ -214,7 +214,7 @@ oai_complete_df(
   system_prompt = "Classify the following review:",
   schema = sentiment_schema,
   key_name = "OPENAI_API_KEY",
-  output_file = "completions_output.parquet",
+  output_dir = "completions_output",
   chunk_size = 1000,
   concurrent_requests = 5
 )
@@ -267,7 +267,7 @@ metadata$endpoint_url

 **Note:** Add output directories to `.gitignore` to avoid committing API responses and metadata.

-Read the [LLM Providers Vignette](articles/llm_providers.html), and the [Structured Outputs Vignette](vignettes/structured_outputs_json_schema.Rmd) for more information on common workflows with the OpenAI Chat Completions API [^1]
+Read the [LLM Providers Vignette](articles/llm_providers.html), and the [Structured Outputs Vignette](vignettes/structured_outputs_json_schema.html) for more information on common workflows with the OpenAI Chat Completions API [^1]

 [^1]: Content pending implementation for Anthropic Messages API, Gemini API, and OpenAI Responses API
diff --git a/README.md b/README.md
index e54bd32..0f9e141 100644
--- a/README.md
+++ b/README.md
@@ -197,7 +197,7 @@ oai_complete_df(
   id_var = review_id,
   system_prompt = "Classify the following review:",
   key_name = "OPENAI_API_KEY",
-  output_file = "completions_output.parquet", # writes results to this file
+  output_dir = "completions_output", # writes .parquet chunks to this directory
   chunk_size = 1000, # process 1000 rows per chunk
   concurrent_requests = 5, # send 5 rows of data simultaneously
   max_retries = 5,
@@ -215,7 +215,7 @@ oai_complete_df(
   system_prompt = "Classify the following review:",
   schema = sentiment_schema,
   key_name = "OPENAI_API_KEY",
-  output_file = "completions_output.parquet",
+  output_dir = "completions_output",
   chunk_size = 1000,
   concurrent_requests = 5
 )
@@ -275,7 +275,7 @@ responses and metadata.
Read the [LLM Providers Vignette](articles/llm_providers.html), and the [Structured Outputs -Vignette](vignettes/structured_outputs_json_schema.Rmd) for more +Vignette](vignettes/structured_outputs_json_schema.html) for more information on common workflows with the OpenAI Chat Completions API [^1] diff --git a/_pkgdown.yml b/_pkgdown.yml index aa2657d..75b859a 100644 --- a/_pkgdown.yml +++ b/_pkgdown.yml @@ -88,6 +88,12 @@ reference: - hf_get_endpoint_info - hf_get_model_max_length +- title: "Anthropic Messages" + desc: "functions for working with Anthropic's Messages API" + contents: + - ant_build_messages_request + + - title: "OpenAI Completions" desc: "Functions for working with OpenAI's APIs including structured outputs" contents: @@ -103,6 +109,7 @@ reference: - oai_build_embedding_request - oai_embed_text - oai_embed_batch + - oai_embed_chunks - oai_embed_df - tidy_oai_embedding - title: "JSON Schema for Structured Outputs" @@ -159,8 +166,12 @@ development: news: releases: + - text: "Version 0.1.2" + href: news/index.html#endpointr-012 + - text: "Version 0.1.1" + href: news/index.html#endpointr-011 - text: "Version 0.1.0" - href: news/index.html + href: news/index.html#endpointr-010 footer: structure: diff --git a/dev_docs/ant_messages.qmd b/dev_docs/ant_messages.qmd new file mode 100644 index 0000000..43be43c --- /dev/null +++ b/dev_docs/ant_messages.qmd @@ -0,0 +1,148 @@ +--- +title: "ant_messages" +format: html +--- + +# Structured Outputs + +HTTP 400 errors + +```{r} +library(httr2) +library(jsonlite) + +req <- ant_build_messages_request(input = "This is terrible, fin.") + +resp <- req_perform(req, verbosity = 1) +resp |> resp_body_string() |> prettify() + + +schema <- create_json_schema( + name = "capital_response", + schema = schema_object( + country = schema_string(), + capital = schema_string(), + required = c("country", "capital") + ) +) + +structured_req <- ant_build_messages_request(input = "Well I would walk five hundred miles, to visit the noble city of Prague.", schema = schema, model = "claude-sonnet-4-5") + +structured_req |> req_dry_run() |> toJSON() |> jsonlite::flatten() + +structured_resp <- req_perform(structured_req) + +ant_schema <- .ant_format_schema(schema) +ant_structured_req <- ant_build_messages_request(input = "Well I would walk five hundred miles, to visit the noble city of Prague.", schema = ant_schema) + +setdiff(ant_structured_req |> req_dry_run(), structured_req |> req_dry_run()) + +ant_structured_resp <- req_perform(ant_structured_req) + +struc_dry <- req_dry_run(structured_req) +ant_dry <- req_dry_run(ant_structured_req) + +setdiff(struc_dry, ant_dry) +identical(struc_dry, ant_dry) + +``` + +``` +"output_format": { + "type": "json_schema", + "schema": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "email": {"type": "string"}, + "plan_interest": {"type": "string"}, + "demo_requested": {"type": "boolean"} + }, + "required": ["name", "email", "plan_interest", "demo_requested"], + "additionalProperties": false + } +} +``` + +```{r} +list_schema <- list( + type = "json_schema", + schema = list( + type = "object", + properties = list( + country = list(type = "string"), + capital = list(type = "string") + ), + required = c("country", "capital"), + additionalProperties = FALSE + ) +) + +``` + +```{r} +list_schema_req <- ant_build_messages_request("Prague is my favourite city, I went there when I visited The Czech Republic", schema = list_schema, model = "claude-sonnet-4-5") +list_schema_req |> req_dry_run() +list_schema_resp <- 
req_perform(list_schema_req, verbosity = 1) +``` + +If we want to surface the actual errors we'll need to re-write a bunch of the package. Thought about this earlier but wasn't sure. The structured outputs error with Haiku makes me think it's probs worth. + +```{r} +req_schema_haiku <- ant_build_messages_request("Prague is my favourite city, I went there when I visited The Czech Republic", schema = list_schema) + +req_schema_haiku <- req_schema_haiku |> + req_error(is_error = ~FALSE) + +resp <- req_schema_haiku |> + req_perform(verbosity = 1) + +resp_body_json(resp)[["error"]][["message"]] +resp |> resp_status() +``` + +## Nested Schemas + +```{r} +library(httr2) +library(purrr) +library(jsonlite) +library(tidyr) +absa_entities_schema <- create_json_schema( + name = "entities", + strict = TRUE, + description = "List of entity-sentiment", + schema = schema_object( + entities = schema_array( + schema_object( + entity = schema_string( + description = "Name of the named entity"), + sentiment = schema_string( + enum = c("positive", "negative", "neutral"), + "sentiment associated with the entity") + ) + ) + ) +) + +req <- ant_build_messages_request( + "Apple have been wonderful, Microsoft... not so much. And by not so much I mean pathetic.", + schema = absa_entities_schema, + model = "claude-sonnet-4-5" +) + +resp <- httr2::req_perform(req) + +resp |> resp_body_json() |> + purrr::pluck("content", 1, "text") |> + fromJSON() |> + pluck('entities') + + +``` + +``` + entity sentiment +1 Apple positive +2 Microsoft negative +``` diff --git a/dev_docs/integrations.qmd b/dev_docs/integrations.qmd new file mode 100644 index 0000000..d4bc839 --- /dev/null +++ b/dev_docs/integrations.qmd @@ -0,0 +1,172 @@ +--- +title: "integrations" +format: html +--- + +```{r, setup} +library(tidyverse) +library(httr2) +library(EndpointR) + +n_tests <- 5 +test_df <- tibble( + id = 1:n_tests, + text = paste("This is test sentence number", 1:n_tests) + ) +``` + +Space for integration tests (useful for not relying on unit tests when interacting with real APIs) + +# oai embed + +```{r, invalid_model_oai_embed} +oai_embed_invalid_model <- oai_embed_df( + test_df, + text_var = "text", + id_var = "id", + model = "text-embedding-FAKE-model", + output_dir = NULL, + concurrent_requests = n_tests + ) + +oai_embed_invalid_model |> dplyr::select(id, .error, .error_msg, .status) +# expect: .error = TRUE, .status = 404, .error_msg contains model info +``` + +invalid API key returns 401 authentication error + +```{r, oai_embed_401_auth} +oai_embed_bad_auth <- oai_embed_df( + test_df, + text_var = "text", + id_var = "id", + model = "text-embedding-3-small", + key_name = "FAKE_API_KEY", + output_dir = NULL, + concurrent_requests = n_tests +) + +oai_embed_bad_auth |> dplyr::select(id, .error, .error_msg, .status) +# expect: .error = TRUE, .status = 401, .error_msg mentions authentication +``` + +now succeed: + +```{r oai_embed_success} +oai_embed_success <- oai_embed_df( + test_df, + text_var = text, + id_var = id, + model = "text-embedding-3-small", + output_dir = NULL, + concurrent_requests = n_tests) + +oai_embed_success +``` + +# oai completions + +invalid API key for completions + +```{r} +oai_complete_bad_auth <- oai_complete_df( + test_df, + text_var = "text", + id_var = "id", + model = "gpt-4o-mini", + system_prompt = "Summarize in one word.", + key_name = "FAKE_API_KEY", + output_dir = NULL, + concurrent_requests = n_tests +) + +oai_complete_bad_auth |> dplyr::select(id, .error, .error_msg, .status) + +# expect: .error = TRUE, 
.status = 401 +``` + +```{r} +oai_complete_good_auth <- oai_complete_df( + test_df, + text_var = "text", + id_var = "id", + model = "gpt-4o-mini", + system_prompt = "Summarize in one word.", + key_name = "OPENAI_API_KEY", + output_dir = NULL, + concurrent_requests = n_tests +) + +oai_complete_good_auth +``` + +# hf embed + +non-existent HuggingFace model + +```{r} + hf_embed_invalid_model <- hf_embed_df( + test_df, + text_var = "text", + id_var = "id", + endpoint_url = "https://api-inference.huggingface.co/pipeline/feature-extraction/FAKE-model-404", + key_name = "HF_TEST_API_KEY", + output_dir = NULL, + concurrent_requests = n_tests + ) + +hf_embed_invalid_model |> dplyr::select(id, .error, .error_msg, .status) + +# expect: .error = TRUE, .status = 410, .error_msg mentions mhttps://api-inference.huggingface.co is no longer... +``` + +invalid HuggingFace token + +```{r, invalid_key_hf_embed} +hf_embed_bad_auth <- hf_embed_df( + test_df, + text_var = "text", + id_var = "id", + endpoint_url = "https://router.huggingface.co/hf-inference/models/sentence-transformers/all-MiniLM-L6-v2/pipeline/sentence-similarity", + key_name = "FAKE_API_KEY", + output_dir = NULL, + concurrent_requests = n_tests) + +hf_embed_bad_auth |> dplyr::select(id, .error, .error_msg, .status) + +# expect: .error = TRUE, .status = 401 +``` + +Non-existent classification model + +```{r, invalid_model_hf_classify} +hf_classify_invalid_model <- hf_classify_df( + test_df, + text_var = "text", + id_var = "id", + endpoint_url = "https://api-inference.huggingface.co/pipeline/feature-extraction/FAKE-model-404", + output_dir = NULL, + key_name = "HF_TEST_API_KEY", + concurrent_requests = n_tests + ) + +hf_classify_invalid_model |> dplyr::select(id, .error, .error_msg, .status) +# expect: .error = TRUE, .status = 410 +``` + +Using an embedding model for classification (wrong task type) + +```{r, bad_task_hf_classify} +hf_classify_wrong_task <- hf_classify_df( + test_df, + text_var = text, + id_var = id, + endpoint_url = "https://router.huggingface.co/hf-inference/models/sentence-transformers/all-MiniLM-L6-v2/pipeline/sentence-similarity", + key_name = "HF_TEST_API_KEY", + output_dir = NULL, + concurrent_requests = n_tests) + +hf_classify_wrong_task |> dplyr::select(id, .error, .error_msg, .status) + +# expect: .error = TRUE, error message should indicate task mismatch, .status = 400 +``` diff --git a/dev_docs/jimbo_oai_batch_api.qmd b/dev_docs/jimbo_oai_batch_api.qmd new file mode 100644 index 0000000..6dfe0c2 --- /dev/null +++ b/dev_docs/jimbo_oai_batch_api.qmd @@ -0,0 +1,630 @@ +--- +title: "batch_api" +format: html +--- + +```{python} +import json +from openai import OpenAI +import pandas as pd +from IPython.display import Image, display +``` + +```{r} +reticulate::repl_python() + +OPENAI_API_KEY <- Sys.getenv("OPENAI_API_KEY") +``` + +```{python} +py_OPENAI_API_KEY = r.OPENAI_API_KEY + +client = OpenAI() +``` + +Loading data + +```{python} +dataset_path = "data/batch_trial.csv" + +df = pd.read_csv(dataset_path) +df.head() +``` + +Processing step + +```{python} +categorize_system_prompt = ''' +You are an internal tool that classifies online posts. + +Classify as TRUE only if the post explicitly discusses exercise as a deliberate activity AND explicitly states the REASON behind their exercise behaviour: +- A specific motivation/driver for exercising: WHY do they exercise? 
(e.g., losing weight, health problems, enjoying beautiful environments, dog needs walks, saving money on transport, mental health benefits, social connection) +- A specific barrier preventing exercise: WHAT stops them? (e.g., lack of facilities, weather, injury, cost, time constraints, caregiving responsibilities, safety concerns) + +Exercise includes: organised activities (gym, running, yoga, sports) or incidental physical activity performed regularly (walking for transport, active caregiving, dog walking). + +Classify as FALSE if: +- Exercise verbs describe ordinary movement, not deliberate exercise +- Exercise is tangential or metaphorical +- The post is about someone else's experience, not the author's own +- Exercise is mentioned but NO specific reason is given +- The post shows enthusiasm for exercise but doesn't explain the underlying driver (e.g., "I love working out!", "Can't wait for gym time") +- The post discusses exercise planning, routines, or methods without explaining WHY they exercise or WHAT prevents them +- Any connection to exercise requires inference or multiple logical steps + +The motivation or barrier must be directly stated in relation to exercise - do not infer connections. + +Answer TRUE or FALSE, then provide a brief rationale. + +Examples: + +"I work in Seattle this week right next to this cool ass hotel. I love older architecture. I could walk around a city looking at cool buildings admiring them all day✨" - TRUE - Motivation to walk around an aesthetically pleasing environment + +"Fine, but I still want to learn to walk before I try to run, okay?" - FALSE - Metaphorical use of exercise verbs, not about actual exercise + +"Dentist will prescribe them but not walk in centre. I could see the emergency dentist but tbh I can't drive on these levels of codeine." - FALSE - "Walk in centre" is a facility type, not about exercise + +"After my spinal fusion I had to beg and cry for anything. I was a few days in with little to no sleep, I told them if I don't get something to help so I could sleep, I'd be walking upstairs to psych." 
- FALSE - "Walking upstairs" describes ordinary movement in a hospital, not exercise; no motivation or barrier to exercise is discussed + +"I can't afford a gym membership and it's too dark after work to run outside safely" - TRUE - Explicit barriers to exercise (cost and safety concerns) + +"Just walking to pick the kids up from school is my daily workout now" - TRUE - Walking as deliberate exercise with implicit motivation (combining caregiving with activity) + +"I want your workout split and I want to follow it like the Bible" - FALSE - Person engaged in exercise, but no information as to why they exercise or what prevents them + +"I ran for 4 hours today and my legs are killing me but I'm going to keep walking" - FALSE - Describes doing exercise with dedication but doesn't explain WHY they exercise or what motivates them to continue +''' + +exercise_schema = { + "type": "json_schema", + "json_schema": { + "name": "simple_schema", + "schema": { + "type": "object", + "properties": { + "exercise": { + "type": "boolean", + "description": "contains reference to motivation towards exercise or barriers faced stopping exercise" + }, + "reason": { + "type": "string", + "description": "brief rationale behind classification" + } + }, + "additionalProperties": False, + "required": [ + "exercise", + "reason" + ] + }, + "strict": True + } +} + +def get_categories(description): + response = client.chat.completions.create( + model="gpt-4.1-mini", + temperature=0.0, + # This is to enable JSON mode, making sure responses are valid json objects + response_format= exercise_schema, + messages=[ + { + "role": "system", + "content": categorize_system_prompt + }, + { + "role": "user", + "content": description + } + ], + ) + + return response.choices[0].message.content + +## test on a few examples +for _, row in df[:5].iterrows(): + description = row['text'] + result = get_categories(description) + print(f"TEXT: {description}\n\nRESULT: {result}") + print("\n\n----------------------------\n\n") +``` + +Create batch task: + +```{python} +# Creating an array of json tasks + +tasks = [] + +for index, row in df.iterrows(): + + description = row['text'] + + task = { + "custom_id": str(row['radarly_id']), + "method": "POST", + "url": "/v1/chat/completions", + "body": { + # This is what you would have in your Chat Completions API call + "model": "gpt-4.1-mini", + "temperature": 0.0, + "response_format": exercise_schema, + "messages": [ + { + "role": "system", + "content": categorize_system_prompt + }, + { + "role": "user", + "content": description + } + ], + } + } + + tasks.append(task) + +tasks +``` + +Note: the request ID should be unique per batch. This is what you can use to match results to the initial input files, as requests will not be returned in the same order. + +```{python} +# Creating the file + +file_name = "data/batch_tasks_exercise.jsonl" + +with open(file_name, 'w') as file: + for obj in tasks: + file.write(json.dumps(obj) + '\n') +``` + +Uploading file + +```{python} +batch_file = client.files.create( + file=open(file_name, "rb"), + purpose="batch" +) + +print(batch_file) + +``` + +Create batch job + +```{python} +batch_job = client.batches.create( + input_file_id=batch_file.id, + endpoint="/v1/chat/completions", + completion_window="24h" +) +``` + +Checking batch status Note: this can take up to 24h, but it will usually be completed faster. + +You can continue checking until the status is 'completed'. 
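+
+Rather than re-running the retrieve cell by hand, a small loop can do the waiting. A minimal polling sketch (the 60 second interval and the set of terminal statuses here are illustrative choices):
+
+```{python}
+import time
+
+# batch states that will not change any further
+terminal_statuses = {"completed", "failed", "expired", "cancelled"}
+
+while True:
+    batch_job = client.batches.retrieve(batch_job.id)
+    print(batch_job.status)
+    if batch_job.status in terminal_statuses:
+        break
+    time.sleep(60)  # poll once a minute
+```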
+ +```{python} +batch_job = client.batches.retrieve(batch_job.id) +print(batch_job) +``` + +Retrieving results + +```{python} +result_file_id = batch_job.output_file_id +result = client.files.content(result_file_id).content +``` + +```{python} +result_file_name = "data/batch_job_results.jsonl" + +with open(result_file_name, 'wb') as file: + file.write(result) +``` + +```{python} +# Loading data from saved file +results = [] +with open(result_file_name, 'r') as file: + for line in file: + # Parsing the JSON string into a dict and appending to the list of results + json_object = json.loads(line.strip()) + results.append(json_object) +``` + +```{python} +# Reading only the first results +for res in results[:5]: + task_id = res['custom_id'] + result = res['response']['body']['choices'][0]['message']['content'] + row = df[df['radarly_id'] == task_id].iloc[0] + description = row['text'] + radarly_id = row['radarly_id'] + print(f"RADARLY ID: {radarly_id}\nTEXT: {description}\n\nRESULT: {result}") + print("\n\n----------------------------\n\n") +``` + +Okay now I am going to try this on a sample of 10k different posts: + +```{python} +dataset_path = "data/batch_trial_v3.csv" +dataset_path = "data/batch_trial_v4.csv" +dataset_path = "data/lr_labelled_batch.csv" +dataset_path = "data/lr_labelled_batch_2.csv" + +df = pd.read_csv(dataset_path) +df.head() +len(df) +``` + +```{python} +categorize_system_prompt = ''' +You are an internal tool that classifies online posts relating to exercise. + +For each post, provide THREE classifications: + +1. Exercise: Does the post mention exercise in any form? (TRUE/FALSE) + - Include: organised activities (gym, running, yoga, sports, workout) or incidental physical activity performed regularly (walking for transport, active caregiving, dog walking) + - Exclude: ordinary movement, metaphorical uses, tangential mentions + +2. Motivation: Does the post explicitly state WHY they exercise or want to exercise? (TRUE/FALSE) + - Must explicitly state a reason: "because...", "so that...", "for my...", "to..." + - Examples: losing weight, health problems, enjoying beautiful environments, dog needs walks, saving money, mental health benefits, social connection + - FALSE if: shows enthusiasm but no specific reason, or requires inference + +3. Barrier: Does the post explicitly state WHAT prevents or hinders them from exercising? (TRUE/FALSE) + - Must explicitly state an obstacle or challenge + - Examples: lack of facilities, weather, injury, cost, time constraints, caregiving responsibilities, safety concerns + - FALSE if requires inference + + +IMPORTANT: +- The post must be about the author's OWN experience (not someone else's) +- Motivations and barriers must be DIRECTLY STATED, not inferred +- "Implies motivation" or "suggests barrier" is NOT sufficient +- Simply doing exercise != stating motivation + +Output format: +Exercise: TRUE/FALSE +Motivation: TRUE/FALSE +Barrier: TRUE/FALSE + +Examples: + +"I work in Seattle this week right next to this cool ass hotel. I love older architecture. I could walk around a city looking at cool buildings admiring them all day✨" +Exercise: TRUE +Motivation: TRUE +Barrier: FALSE + +"Fine, but I still want to learn to walk before I try to run, okay?" +Exercise: FALSE +Motivation: FALSE +Barrier: FALSE + +"Dentist will prescribe them but not walk in centre. I could see the emergency dentist but tbh I can't drive on these levels of codeine." 
+Exercise: FALSE +Motivation: FALSE +Barrier: FALSE + +"After my spinal fusion I had to beg and cry for anything. I was a few days in with little to no sleep, I told them if I don't get something to help so I could sleep, I'd be walking upstairs to psych." +Exercise: FALSE +Motivation: FALSE +Barrier: FALSE + +"I can't afford a gym membership and it's too dark after work to run outside safely" +Exercise: TRUE +Motivation: FALSE +Barrier: TRUE + +"Just walking to pick the kids up from school is my daily workout now" +Exercise: TRUE +Motivation: TRUE +Barrier: FALSE + +"I want your workout split and I want to follow it like the Bible" +Exercise: TRUE +Motivation: FALSE +Barrier: FALSE + +"I ran for 4 hours today and my legs are killing me but I'm going to keep walking" +Exercise: TRUE +Motivation: FALSE +Barrier: FALSE +''' + +exercise_schema = { + "type": "json_schema", + "json_schema": { + "name": "simple_schema", + "schema": { + "type": "object", + "properties": { + "exercise": { + "type": "boolean", + "description": "contains reference to exercise" + }, + "motivation": { + "type": "boolean", + "description": "contains reference to motivation towards exercise " + }, + "barrier": { + "type": "boolean", + "description": "contains reference to barriers faced stopping exercise" + } + }, + "additionalProperties": False, + "required": [ + "exercise", + "motivation", + "barrier" + ] + }, + "strict": True + } +} +``` + +```{python} +def get_categories(description): + response = client.chat.completions.create( + model="gpt-4.1-mini", + temperature=0.0, + # This is to enable JSON mode, making sure responses are valid json objects + response_format= exercise_schema, + messages=[ + { + "role": "system", + "content": categorize_system_prompt + }, + { + "role": "user", + "content": description + } + ], + ) + + return response.choices[0].message.content + +## test on a few examples +for _, row in df[:10].iterrows(): + description = row['text'] + result = get_categories(description) + print(f"TEXT: {description}\n\nRESULT: {result}") + print("\n\n----------------------------\n\n") +``` + +Create batch task: + +```{python} +# Creating an array of json tasks + +tasks = [] + +for index, row in df.iterrows(): + + description = row['text'] + + task = { + "custom_id": str(row['radarly_id']), + "method": "POST", + "url": "/v1/chat/completions", + "body": { + # This is what you would have in your Chat Completions API call + "model": "gpt-4.1-mini", + "temperature": 0.0, + "response_format": exercise_schema, + "messages": [ + { + "role": "system", + "content": categorize_system_prompt + }, + { + "role": "user", + "content": description + } + ], + } + } + + tasks.append(task) + +# tasks +``` + +Note: the request ID should be unique per batch. This is what you can use to match results to the initial input files, as requests will not be returned in the same order. 
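+
+Once the output file has been parsed back into `results` (see the loading cells further down), the `custom_id` turns re-attaching the classifications into a single merge. A sketch, assuming each `content` string is valid JSON matching the schema above:
+
+```{python}
+# build a custom_id -> parsed-content frame, then join it back onto df
+parsed = pd.DataFrame([
+    {
+        "radarly_id": res["custom_id"],
+        **json.loads(res["response"]["body"]["choices"][0]["message"]["content"]),
+    }
+    for res in results
+])
+
+# custom_id was written as a string, so align key types before merging
+df["radarly_id"] = df["radarly_id"].astype(str)
+labelled = df.merge(parsed, on="radarly_id", how="left")
+labelled.head()
+```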
+ +```{python} +# Creating the file + +file_name = "data/batch_tasks_exercise_v2.jsonl" #10k +file_name = "data/batch_tasks_exercise_v3.jsonl" #35k +file_name_2 = "data/batch_tasks_exercise_v4.jsonl" #35k + +file_name_3 = "data/new_data_batch_tasks_exercise_v1.jsonl" #50k +file_name_4 = "data/new_data_batch_tasks_exercise_v2.jsonl" #50k + +with open(file_name, 'w') as file: + for obj in tasks: + file.write(json.dumps(obj) + '\n') + +with open(file_name_2, 'w') as file: + for obj in tasks: + file.write(json.dumps(obj) + '\n') + + +with open(file_name_3, 'w') as file: + for obj in tasks: + file.write(json.dumps(obj) + '\n') + + +with open(file_name_4, 'w') as file: + for obj in tasks: + file.write(json.dumps(obj) + '\n') +``` + +Uploading file + +```{python} +batch_file = client.files.create( + file=open(file_name, "rb"), + purpose="batch" +) + +print(batch_file) + +batch_file_2 = client.files.create( + file=open(file_name_2, "rb"), + purpose="batch" +) + +print(batch_file_2) + +batch_file_3 = client.files.create( + file=open(file_name_3, "rb"), + purpose="batch" +) + +print(batch_file_3) + +batch_file_4 = client.files.create( + file=open(file_name_4, "rb"), + purpose="batch" +) + +print(batch_file_4) + +``` + +Create batch job + +```{python} +batch_job = client.batches.create( + input_file_id=batch_file.id, + endpoint="/v1/chat/completions", + completion_window="24h" +) + +batch_job_2 = client.batches.create( + input_file_id=batch_file_2.id, + endpoint="/v1/chat/completions", + completion_window="24h" +) + +batch_job_3 = client.batches.create( + input_file_id=batch_file_3.id, + endpoint="/v1/chat/completions", + completion_window="24h" +) + +batch_job_4 = client.batches.create( + input_file_id=batch_file_4.id, + endpoint="/v1/chat/completions", + completion_window="24h" +) +``` + +Checking batch status Note: this can take up to 24h, but it will usually be completed faster. + +You can continue checking until the status is 'completed'. 
+ +```{python} +batch_job = client.batches.retrieve(batch_job.id) +print(batch_job) +batch_job.status +batch_job.request_counts + +batch_job_2 = client.batches.retrieve(batch_job_2.id) +print(batch_job_2) +batch_job_2.status +batch_job_2.request_counts + +batch_job_3 = client.batches.retrieve(batch_job_3.id) +print(batch_job_3) +batch_job_3.status +batch_job_3.request_counts + +batch_job_4 = client.batches.retrieve(batch_job_4.id) +print(batch_job_4) +batch_job_4.status +batch_job_4.request_counts +``` + +Retrieving results + +```{python} +result_file_id = batch_job.output_file_id +result = client.files.content(result_file_id).content + +result_file_id_2 = batch_job_2.output_file_id +result_2 = client.files.content(result_file_id_2).content + +result_file_id_3 = batch_job_3.output_file_id +result_3 = client.files.content(result_file_id_3).content + +result_file_id_4 = batch_job_4.output_file_id +result_4 = client.files.content(result_file_id_4).content +``` + +```{python} +result_file_name_2 = "data/batch_job_results_v3.jsonl" + +with open(result_file_name_2, 'wb') as file: + file.write(result) + +result_file_name_3 = "data/batch_job_results_v4.jsonl" + +with open(result_file_name_3, 'wb') as file: + file.write(result_2) + + +result_file_name_4 = "data/batch_job_results_v6.jsonl" + +with open(result_file_name_4, 'wb') as file: + file.write(result_3) + +result_file_name_5 = "data/batch_job_results_v5.jsonl" + +with open(result_file_name_5, 'wb') as file: + file.write(result_4) +``` + +```{python} +# Loading data from saved file +results = [] +with open(result_file_name_2, 'r') as file: + for line in file: + # Parsing the JSON string into a dict and appending to the list of results + json_object = json.loads(line.strip()) + results.append(json_object) + +results_2 = [] +with open(result_file_name_3, 'r') as file: + for line in file: + # Parsing the JSON string into a dict and appending to the list of results + json_object = json.loads(line.strip()) + results_2.append(json_object) +``` + +```{python} +# Reading only the first results +for res in results[:5]: + task_id = res['custom_id'] + result = res['response']['body']['choices'][0]['message']['content'] + row = df[df['radarly_id'] == task_id].iloc[0] + description = row['text'] + radarly_id = row['radarly_id'] + print(f"RADARLY ID: {radarly_id}\nTEXT: {description}\n\nRESULT: {result}") + print("\n\n----------------------------\n\n") + +# Reading only the first results +for res in results_2[:5]: + task_id = res['custom_id'] + result = res['response']['body']['choices'][0]['message']['content'] + row = df[df['radarly_id'] == task_id].iloc[0] + description = row['text'] + radarly_id = row['radarly_id'] + print(f"RADARLY ID: {radarly_id}\nTEXT: {description}\n\nRESULT: {result}") + print("\n\n----------------------------\n\n") +``` diff --git a/dev_docs/refactor_error_messages.qmd b/dev_docs/refactor_error_messages.qmd new file mode 100644 index 0000000..fea52da --- /dev/null +++ b/dev_docs/refactor_error_messages.qmd @@ -0,0 +1,202 @@ +--- +title: "Error Handling Refactor Plan" +format: html +--- + +# EndpointR Error Handling Refactor Plan + +## Problem Statement + +1. **httr2 catches errors prematurely** - When API requests fail, httr2 throws errors before we can inspect the response body, making debugging difficult. Users see generic error messages instead of the detailed API error messages. + +2. 
**Inconsistent error column naming** - The package uses both `.error_msg` (in `openai_completions.R`) and `.error_message` (everywhere else), causing confusion and potential bugs.
+
+## Solution Overview
+
+### Part A: Prevent httr2 from auto-catching errors
+
+Add `httr2::req_error(is_error = ~ FALSE)` to request building functions so httr2 doesn't throw errors automatically. Then check response status manually and extract meaningful error messages from the API response body.
+
+### Part B: Standardise on `.error_msg`
+
+Rename all `.error_message` occurrences to `.error_msg` for consistency (shorter, cleaner).
+
+---
+
+## Detailed Implementation Plan
+
+### Phase 1: Create a centralised error handling utility
+
+**File: `R/core.R`**
+
+Add a new helper function to extract error messages from failed responses:
+
+```r
+.extract_api_error <- function(response, fallback_message = "Unknown error") {
+  # Handle different response types
+  if (!inherits(response, "httr2_response")) {
+    if (inherits(response, "error") || inherits(response, "condition")) {
+      return(conditionMessage(response))
+    }
+    return(as.character(fallback_message))
+  }
+
+  status <- httr2::resp_status(response)
+  if (status < 400) return(NA_character_)
+
+  # Try to extract error from response body
+  tryCatch({
+    body <- httr2::resp_body_json(response)
+    # OpenAI and Anthropic format: {"error": {"message": "...", ...}}
+    if (!is.null(body$error$message)) return(body$error$message)
+    # HuggingFace format: {"error": "..."}
+    if (!is.null(body$error)) return(body$error)
+    # Generic message with status
+    paste("HTTP", status)
+  }, error = function(e) {
+    paste("HTTP", status)
+  })
+}
+```
+
+### Phase 2: Modify request building functions
+
+Add `httr2::req_error(is_error = ~ FALSE)` to each request builder:
+
+| File | Function | Change |
+|------|----------|--------|
+| `R/core.R` | `base_request()` | Add `req_error(is_error = ~ FALSE)` to the pipe chain |
+| `R/hf_inference.R` | `hf_build_request()` | Already uses `base_request()`, will inherit |
+| `R/hf_inference.R` | `hf_build_request_batch()` | Already uses `base_request()`, will inherit |
+| `R/openai_completions.R` | `oai_build_completions_request()` | Already uses `base_request()`, will inherit |
+| `R/openai_embed.R` | `oai_build_embedding_request()` | Already uses `base_request()`, will inherit |
+| `R/openai_embed.R` | `oai_build_embedding_request_batch()` | Already uses `base_request()`, will inherit |
+
+**Key insight:** Adding `req_error()` to `base_request()` will cascade to all derived request functions.
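+
+For concreteness, the change to `base_request()` is a single extra line at the end of its pipe. A sketch only, with stand-in argument names rather than the function's real signature:
+
+```r
+base_request <- function(endpoint_url, api_key, timeout = 30, max_retries = 5) {
+  httr2::request(endpoint_url) |>
+    httr2::req_auth_bearer_token(api_key) |>
+    httr2::req_timeout(timeout) |>
+    httr2::req_retry(max_tries = max_retries) |>
+    # hand 4xx/5xx responses back to the caller instead of throwing,
+    # so .extract_api_error() can still read the body
+    httr2::req_error(is_error = ~ FALSE)
+}
+```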
+ +### Phase 3: Update request performing functions + +Update functions that perform requests to check status manually: + +#### 3.1 `R/core.R` - `safely_perform_request()` +- Currently wraps with `purrr::safely()` +- Update to check response status and extract error messages + +#### 3.2 `R/core.R` - `perform_request_or_return_error()` +- Remove tryCatch (no longer needed since errors won't be thrown) +- Add status check and error extraction + +#### 3.3 `R/core.R` - `perform_requests_with_strategy()` +- For parallel requests, httr2 returns all responses +- Update to properly categorise successes/failures based on status code + +#### 3.4 `R/hf_inference.R` - `hf_perform_request()` +- Add status check after `req_perform()` +- Return error info if status >= 400 + +### Phase 4: Update high-level functions + +Update functions that call perform functions to handle the new error format: + +| File | Functions | +|------|-----------| +| `R/openai_completions.R` | `oai_complete_text()`, `oai_complete_chunks()`, `oai_complete_df()` | +| `R/openai_embed.R` | `oai_embed_text()`, `oai_embed_batch()`, `oai_embed_df()` | +| `R/hf_embed.R` | `hf_embed_text()`, `hf_embed_batch()`, `hf_embed_df()` | +| `R/hf_classify.R` | `hf_classify_text()`, `hf_classify_batch()`, `hf_classify_df()` | + +### Phase 5: Standardize error column naming + +Rename `.error_message` to `.error_msg` across the package: + +| File | Change | +|------|--------| +| `R/core.R` | Lines 135, 155, 179, 195 - `.error_message` -> `.error_msg` | +| `R/hf_embed.R` | Line 233 - `.error_message` -> `.error_msg` | +| `R/hf_classify.R` | Line 324 - `.error_message` -> `.error_msg` | +| `R/openai_embed.R` | Lines 315, 334, 469, 494, 541 - `.error_message` -> `.error_msg` | +| `R/zzz.R` | Line 2 - `.error_message` -> `.error_msg` in globalVariables | + +### Phase 6: Update tests and documentation + +| File | Changes Needed | +|-----------|----------------| +| `tests/testthat/test-openai_completions.R` | Already uses `.error_msg` - no changes | +| `tests/testthat/test-core.R` | Line 134: `.error_message` -> `.error_msg` | +| `tests/testthat/test-hf_embed.R` | Lines 63, 107, 137: `.error_message` -> `.error_msg` | +| `tests/testthat/test-hf_classify.R` | Line 146: `.error_message` -> `.error_msg` | +| `vignettes/*.Rmd` | Update examples to use `.error_msg` | +| `man/*.Rd` | Will regenerate with `devtools::document()` | + +--- + +## Files to Modify (Summary) + +1. **R/core.R** - Add `.extract_api_error()`, modify `base_request()`, update perform functions, rename `.error_message` -> `.error_msg` +2. **R/openai_completions.R** - Update error handling (already uses `.error_msg`) +3. **R/openai_embed.R** - Rename `.error_message` -> `.error_msg`, update error handling +4. **R/hf_inference.R** - Update `hf_perform_request()` error handling +5. **R/hf_embed.R** - Rename `.error_message` -> `.error_msg`, update error handling +6. **R/hf_classify.R** - Rename `.error_message` -> `.error_msg`, update error handling +7. **R/zzz.R** - Change globalVariables from `.error_message` to `.error_msg` +8. **tests/testthat/test-core.R** - Update expected column name +9. **tests/testthat/test-hf_embed.R** - Update expected column name +10. **tests/testthat/test-hf_classify.R** - Update expected column name +11. **vignettes/*.Rmd** - Update documentation examples +12. 
**man/*.Rd** - Will regenerate with roxygen2 + +--- + +## Risk Assessment + +| Risk | Mitigation | +|------|------------| +| Breaking existing workflows | Run full test suite after each phase | +| Response body structure varies by API | Handle multiple formats in `.extract_api_error()` | +| Edge cases with empty responses | Default fallback messages | +| Documentation sync | Regenerate docs with `devtools::document()` | + +--- + +## Implementation Order + +1. Add `.extract_api_error()` helper (Phase 1) +2. Add `req_error()` to `base_request()` (Phase 2) +3. Update `perform_requests_with_strategy()` (Phase 3) +4. Update individual perform functions (Phase 3) +5. Rename `.error_message` to `.error_msg` across package (Phase 5) +6. Update tests (Phase 6) +7. Run tests and fix any issues +8. Regenerate documentation with `devtools::document()` + +--- + +## Implementation Notes (Completed) + +### Bug fix in `.extract_api_error()` + +During testing, discovered that the HuggingFace error format check must come **before** the OpenAI format check. When `body$error` is a string (HuggingFace format), attempting `body$error$message` throws "$ operator is invalid for atomic vectors". + +Fixed order: +```r +# huggingface format: {"error": "..."} - check first as it's a string not a list +if (!is.null(body$error) && is.character(body$error)) return(body$error) +# openai format: {"error": {"message": "...", "type": "..."}} +if (!is.null(body$error) && is.list(body$error) && !is.null(body$error$message)) return(body$error$message) +``` + +### Tests added (40 new tests in test-core.R) + +- `base_request` includes `req_error` policy +- `.extract_api_error()` extracts OpenAI-format error messages (400, 401, 429) +- `.extract_api_error()` extracts HuggingFace-format error messages (500, 503) +- `.extract_api_error()` extracts Anthropic-format error messages (529) +- `.extract_api_error()` falls back to HTTP status when body parsing fails +- `.extract_api_error()` returns NA for successful responses +- `.extract_api_error()` handles non-response objects gracefully +- `.extract_api_error()` handles all common HTTP error status codes (400, 401, 403, 404, 429, 500, 502, 503, 529) +- `process_response()` handles HTTP error responses correctly +- `process_response()` handles mixed success and error batches +- `.create_error_tibble()` produces correct structure with `.error_msg` column diff --git a/man/ant_build_messages_request.Rd b/man/ant_build_messages_request.Rd new file mode 100644 index 0000000..c9d1686 --- /dev/null +++ b/man/ant_build_messages_request.Rd @@ -0,0 +1,94 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/anthropic_messages.R +\name{ant_build_messages_request} +\alias{ant_build_messages_request} +\title{Build an Anthropic Messages API request} +\usage{ +ant_build_messages_request( + input, + endpointr_id = NULL, + model = .ANT_DEFAULT_MODEL, + temperature = 0, + max_tokens = 500L, + schema = NULL, + system_prompt = NULL, + key_name = "ANTHROPIC_API_KEY", + endpoint_url = .ANT_MESSAGES_ENDPOINT, + timeout = 30L, + max_retries = 5L +) +} +\arguments{ +\item{input}{Text input to send to the model} + +\item{endpointr_id}{An id that will persist through to response} + +\item{model}{Anthropic model to use (default: "claude-haiku-4.5")} + +\item{temperature}{Sampling temperature (0-2), higher values = more randomness} + +\item{max_tokens}{Maximum tokens in response} + +\item{schema}{Optional JSON schema for structured output (json_schema object or list)} + 
+\item{system_prompt}{Optional system prompt}
+
+\item{key_name}{Environment variable name for API key}
+
+\item{endpoint_url}{Anthropic API endpoint URL}
+
+\item{timeout}{Request timeout in seconds}
+
+\item{max_retries}{Maximum number of retry attempts for failed requests}
+}
+\value{
+An httr2 request object
+}
+\description{
+Constructs an httr2 request object for Anthropic's Messages API.
+Handles message formatting, system prompts, and optional JSON schema
+for structured outputs. When using structured outputs you must select the correct model.
+}
+\details{
+This function creates the HTTP request but does not execute it. For
+structured outputs, you must use a supported model (Claude Sonnet 4.5
+or Opus 4.1) and the request will automatically include the required
+beta header.
+
+The \code{schema} parameter accepts either:
+\itemize{
+\item A \code{json_schema} S7 object created with \code{create_json_schema()}
+\item A raw list in Anthropic's \code{output_format} structure
+}
+
+Unlike OpenAI, Anthropic uses \code{output_format} (not \code{response_format})
+and the schema structure differs slightly.
+}
+\examples{
+\dontrun{
+  # simple request
+  req <- ant_build_messages_request(
+    input = "What is the capital of France?",
+    max_tokens = 100
+  )
+
+  # with structured output
+  schema <- create_json_schema(
+    name = "capital_response",
+    schema = schema_object(
+      country = schema_string(),
+      capital = schema_string(),
+      required = c("country", "capital")
+    )
+  )
+  req <- ant_build_messages_request(
+    input = "What is the capital of France?",
+    schema = schema,
+    max_tokens = 100,
+    model = "claude-sonnet-4-5"
+  )
+}
+}
+\seealso{
+\url{https://platform.claude.com/docs/en/build-with-claude/structured-outputs}
+}
diff --git a/man/df_embeddings_hf.Rd b/man/df_embeddings_hf.Rd
index ae7753b..e0bd787 100644
--- a/man/df_embeddings_hf.Rd
+++ b/man/df_embeddings_hf.Rd
@@ -11,7 +11,7 @@ A data frame with 3 rows and 773 variables:
 \item{text}{Character; the original text that was embedded}
 \item{category}{Character; category classification of the text}
 \item{.error}{Logical; whether the embedding process failed}
-\item{.error_message}{Character; error message if embedding failed (NA if successful)}
+\item{.error_msg}{Character; error message if embedding failed (NA if successful)}
 \item{V1}{Numeric; embedding vector dimensions}
 \item{V2}{Numeric; embedding vector dimensions}
 \item{V3}{Numeric; embedding vector dimensions}
diff --git a/man/df_sentiment_classification_example.Rd b/man/df_sentiment_classification_example.Rd
index 662137d..c4efb77 100644
--- a/man/df_sentiment_classification_example.Rd
+++ b/man/df_sentiment_classification_example.Rd
@@ -13,7 +13,7 @@ A data frame with 3 rows and 7 variables:
 \item{NEGATIVE}{Numeric; probability score for negative sentiment (0-1)}
 \item{POSITIVE}{Numeric; probability score for positive sentiment (0-1)}
 \item{.error}{Logical; whether the classification process failed}
-\item{.error_message}{Character; error message if classification failed (NA if successful)}
+\item{.error_msg}{Character; error message if classification failed (NA if successful)}
}
}
\source{
diff --git a/man/dot-ant_format_schema.Rd b/man/dot-ant_format_schema.Rd
new file mode 100644
index 0000000..23e0ace
--- /dev/null
+++ b/man/dot-ant_format_schema.Rd
@@ -0,0 +1,12 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/anthropic_messages.R
+\name{.ant_format_schema}
+\alias{.ant_format_schema}
+\title{Convert json_schema S7 object to Anthropic 
output_format structure} +\usage{ +.ant_format_schema(schema) +} +\description{ +Convert json_schema S7 object to Anthropic output_format structure +} +\keyword{internal} diff --git a/man/dot-create_error_tibble.Rd b/man/dot-create_error_tibble.Rd index e01adba..1b55fcb 100644 --- a/man/dot-create_error_tibble.Rd +++ b/man/dot-create_error_tibble.Rd @@ -4,19 +4,23 @@ \alias{.create_error_tibble} \title{Create standardised error tibble for failed requests} \usage{ -.create_error_tibble(indices, error_message) +.create_error_tibble(indices, error_msg, status = NA_integer_) } \arguments{ \item{indices}{Vector of indices indicating original request positions} -\item{error_message}{Character string or condition object describing the error} +\item{error_msg}{Character string or condition object describing the error} + +\item{status}{HTTP status code (integer) or NA_integer_ for non-HTTP errors. +Defaults to NA_integer_.} } \value{ A tibble with columns: \itemize{ \item original_index: Position in original request batch -\item .error: Always TRUE for error tibbles -\item .error_message: Character description of the error +\item .error: TRUE for errors +\item .error_msg: Character description of the error +\item .status: HTTP status code (integer) or NA for non-HTTP errors } } \description{ diff --git a/man/dot-extract_api_error.Rd b/man/dot-extract_api_error.Rd new file mode 100644 index 0000000..5a5e1ab --- /dev/null +++ b/man/dot-extract_api_error.Rd @@ -0,0 +1,21 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/core.R +\name{.extract_api_error} +\alias{.extract_api_error} +\title{Extract error message from an API response} +\usage{ +.extract_api_error(response, fallback_message = "Unknown error") +} +\arguments{ +\item{response}{An httr2_response object, error object, or other response type} + +\item{fallback_message}{Message to return if extraction fails} +} +\value{ +Character string containing the error message, or NA_character_ if response is successful +} +\description{ +Extracts a meaningful error message from an httr2 response object. +Handles different API response formats (OpenAI, Anthropic, HuggingFace). +} +\keyword{internal} diff --git a/man/hf_perform_request.Rd b/man/hf_perform_request.Rd index 75648b2..8810a4d 100644 --- a/man/hf_perform_request.Rd +++ b/man/hf_perform_request.Rd @@ -9,13 +9,15 @@ hf_perform_request(request, ...) \arguments{ \item{request}{An httr2 request object created by hf_build_request} -\item{...}{ellipsis is sent to \code{httr2::req_perform}, e.g. for \code{path} and \code{verbosity}arguments.} +\item{...}{ellipsis is sent to \code{httr2::req_perform}, e.g. for \code{path} and \code{verbosity} arguments.} } \value{ -A httr2 response object +An httr2 response object. Check status with httr2::resp_status(). } \description{ -Performs a prepared request and returns the response +Performs a prepared request and returns the response. +Since requests use req_error(is_error = ~ FALSE), HTTP error responses +(status >= 400) are returned rather than thrown as errors. 
} \examples{ \dontrun{ diff --git a/man/oai_complete_chunks.Rd b/man/oai_complete_chunks.Rd index d433341..60b1ccb 100644 --- a/man/oai_complete_chunks.Rd +++ b/man/oai_complete_chunks.Rd @@ -10,7 +10,7 @@ oai_complete_chunks( chunk_size = 5000L, model = "gpt-4.1-nano", system_prompt = NULL, - output_file = "auto", + output_dir = "auto", schema = NULL, concurrent_requests = 5L, temperature = 0L, @@ -18,7 +18,8 @@ oai_complete_chunks( max_retries = 5L, timeout = 30L, key_name = "OPENAI_API_KEY", - endpoint_url = "https://api.openai.com/v1/chat/completions" + endpoint_url = "https://api.openai.com/v1/chat/completions", + id_col_name = "id" ) } \arguments{ @@ -32,7 +33,7 @@ oai_complete_chunks( \item{system_prompt}{Optional system prompt applied to all requests} -\item{output_file}{Path to .CSV file for results. "auto" generates the filename, location and is persistent across sessions. If NULL, generates timestamped filename.} +\item{output_dir}{Path to directory for the .parquet chunks. "auto" generates a timestamped directory name. If NULL, uses a temporary directory.} \item{schema}{Optional JSON schema for structured output (json_schema object or list)} @@ -49,20 +50,22 @@ oai_complete_chunks( \item{key_name}{Name of environment variable containing the API key (default: OPENAI_API_KEY)} \item{endpoint_url}{OpenAI API endpoint URL} + +\item{id_col_name}{Name for the ID column in output (default: "id"). When called from oai_complete_df(), this preserves the original column name.} } \value{ A tibble containing all results with columns: \itemize{ -\item \code{id}: Original identifier from input +\item ID column (name specified by \code{id_col_name}): Original identifier from input \item \code{content}: API response content (text or JSON string if schema used) \item \code{.error}: Logical indicating if request failed \item \code{.error_msg}: Error message if failed, NA otherwise -\item \code{.batch}: Batch number for tracking +\item \code{.chunk}: Chunk number for tracking } } \description{ This function processes large volumes of text through OpenAI's Chat Completions API -in configurable chunks, writing results progressively to a CSV file. It handles +in configurable chunks, writing results progressively to parquet files. It handles concurrent requests, automatic retries, and structured outputs while managing memory efficiently for large-scale processing. } @@ -73,29 +76,48 @@ with concurrent API requests, and writes results immediately to disk to minimise memory usage. The function preserves data integrity by matching results to source texts through -the \code{ids} parameter. Each chunk is processed independently with results appended -to the output file, allowing for resumable processing if interrupted. +the \code{ids} parameter. Each chunk is processed independently with results written as +parquet files to the output directory, allowing for resumable processing if interrupted. When using structured outputs with a \code{schema}, responses are validated against -the JSON schema but stored as raw JSON strings in the output file. This allows +the JSON schema but stored as raw JSON strings in the output files. This allows for flexible post-processing without memory constraints during the API calls. The chunking strategy balances API efficiency with memory management. Larger \code{chunk_size} values reduce overhead but increase memory usage. Adjust based on your system resources and text sizes. + +Avoid risk of data loss by setting a low-ish chunk_size (e.g. 5,000, 10,000). 
Each chunk is written to a \code{.parquet} file in the \verb{output_dir=} directory, which also contains a \code{metadata.json} file which tracks important information such as the model and endpoint URL used. Be sure to add output directories to .gitignore! } \examples{ \dontrun{ -# basic usage with automatic file naming: +# basic usage with automatic directory naming: +result <- oai_complete_chunks( + texts = my_texts, + ids = my_ids, + model = "gpt-4.1-nano" +) -# large-scale processing with custom output file: -#structured extraction with schema: +# large-scale processing with custom output directory: +result <- oai_complete_chunks( + texts = my_texts, + ids = my_ids, + output_dir = "my_results", + chunk_size = 10000 +) +# structured extraction with schema: +result <- oai_complete_chunks( + texts = my_texts, + ids = my_ids, + schema = my_schema, + temperature = 0 +) # post-process structured results: -xx <- xx |> +processed <- result |> dplyr::filter(!.error) |> - dplyr::mutate(parsed = map(content, ~jsonlite::fromJSON(.x))) |> - unnest_wider(parsed) + dplyr::mutate(parsed = purrr::map(content, ~jsonlite::fromJSON(.x))) |> + tidyr::unnest_wider(parsed) } } diff --git a/man/oai_complete_df.Rd b/man/oai_complete_df.Rd index 15554b2..d274017 100644 --- a/man/oai_complete_df.Rd +++ b/man/oai_complete_df.Rd @@ -9,7 +9,7 @@ oai_complete_df( text_var, id_var, model = "gpt-4.1-nano", - output_file = "auto", + output_dir = "auto", system_prompt = NULL, schema = NULL, chunk_size = 1000, @@ -31,7 +31,7 @@ oai_complete_df( \item{model}{OpenAI model to use (default: "gpt-4.1-nano")} -\item{output_file}{Path to .CSV file for results. "auto" generates the filename, location and is persistent across sessions. If NULL, generates timestamped filename.} +\item{output_dir}{Path to directory for the .parquet chunks. "auto" generates a timestamped directory name. If NULL, uses a temporary directory.} \item{system_prompt}{Optional system prompt applied to all requests} @@ -59,7 +59,7 @@ A tibble with the original id column and additional columns: \item \code{content}: API response content (text or JSON string if schema used) \item \code{.error}: Logical indicating if request failed \item \code{.error_msg}: Error message if failed, NA otherwise -\item \code{.batch}: Batch number for tracking +\item \code{.chunk}: Chunk number for tracking } } \description{ @@ -75,8 +75,8 @@ processes texts in configurable chunks with concurrent API requests, and returns results matched to the original data through the \code{id_var} parameter. The chunking approach enables processing of large data frames without memory -constraints. Results are written progressively to a CSV file (either specified -or auto-generated) and then read back as the return value. +constraints. Results are written progressively as parquet files (either to a specified +directory or auto-generated) and then read back as the return value. When using structured outputs with a \code{schema}, responses are validated against the JSON schema and stored as JSON strings. Post-processing may be needed to @@ -84,9 +84,50 @@ unnest these into separate columns. Failed requests are marked with \code{.error = TRUE} and include error messages, allowing for easy filtering and retry logic on failures. + +Avoid risk of data loss by setting a low-ish chunk_size (e.g. 5,000, 10,000). Each chunk is written to a \code{.parquet} file in the \verb{output_dir=} directory, which also contains a \code{metadata.json} file. Be sure to add output directories to .gitignore! 
} \examples{ \dontrun{ +# Basic usage with a data frame +df <- tibble::tibble( + doc_id = 1:3, + text = c( + "I absolutely loved this product!", + "Terrible experience, would not recommend.", + "It was okay, nothing special." + ) +) + +results <- oai_complete_df( + df = df, + text_var = text, + id_var = doc_id, + system_prompt = "Summarise the sentiment in one word." +) + +# Structured extraction with schema +sentiment_schema <- create_json_schema( + name = "sentiment_analysis", + schema = schema_object( + sentiment = schema_string("positive, negative, or neutral"), + confidence = schema_number("confidence score between 0 and 1"), + required = list("sentiment", "confidence") + ) +) + +results <- oai_complete_df( + df = df, + text_var = text, + id_var = doc_id, + schema = sentiment_schema, + temperature = 0 +) +# Post-process structured results +results |> + dplyr::filter(!.error) |> + dplyr::mutate(parsed = purrr::map(content, safely_from_json)) |> + tidyr::unnest_wider(parsed) } } diff --git a/man/oai_embed_batch.Rd b/man/oai_embed_batch.Rd index 5331d95..8a90b03 100644 --- a/man/oai_embed_batch.Rd +++ b/man/oai_embed_batch.Rd @@ -49,7 +49,7 @@ A tibble containing: \itemize{ \item Embedding vectors as columns (V1, V2, ..., Vn) \item .error: Logical column indicating if embedding failed -\item .error_message: Character column with error details +\item .error_msg: Character column with error details \item text: Original texts (if include_texts = TRUE) } } @@ -73,7 +73,7 @@ as failed, not all documents across all batches. Failed embeddings will be filled with NA values and marked with error information. The function returns a tibble with embedding columns (V1, V2, ..., Vn), -error tracking columns (.error, .error_message), and optionally the +error tracking columns (.error, .error_msg), and optionally the original texts. } \examples{ diff --git a/man/oai_embed_chunks.Rd b/man/oai_embed_chunks.Rd new file mode 100644 index 0000000..ff15b36 --- /dev/null +++ b/man/oai_embed_chunks.Rd @@ -0,0 +1,98 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/openai_embed.R +\name{oai_embed_chunks} +\alias{oai_embed_chunks} +\title{Embed text chunks through OpenAI's Embeddings API} +\usage{ +oai_embed_chunks( + texts, + ids, + model = "text-embedding-3-small", + dimensions = 1536, + output_dir = "auto", + chunk_size = 5000L, + concurrent_requests = 5L, + max_retries = 5L, + timeout = 20L, + endpoint_url = "https://api.openai.com/v1/embeddings", + key_name = "OPENAI_API_KEY", + id_col_name = "id" +) +} +\arguments{ +\item{texts}{Character vector of texts to process} + +\item{ids}{Vector of unique identifiers corresponding to each text (same length as texts)} + +\item{model}{OpenAI embedding model to use (default: "text-embedding-3-small")} + +\item{dimensions}{Number of embedding dimensions (default: 1536 for text-embedding-3-small)} + +\item{output_dir}{Path to directory for the .parquet chunks. "auto" generates a timestamped directory name. 
If NULL, uses a temporary directory.}
+
+\item{chunk_size}{Number of texts to process in each chunk before writing to disk (default: 5000)}
+
+\item{concurrent_requests}{Number of concurrent requests (default: 5)}
+
+\item{max_retries}{Maximum retry attempts per failed request (default: 5)}
+
+\item{timeout}{Request timeout in seconds (default: 20)}
+
+\item{endpoint_url}{OpenAI API endpoint URL (default: OpenAI's embedding endpoint)}
+
+\item{key_name}{Name of environment variable containing the API key (default: "OPENAI_API_KEY")}
+
+\item{id_col_name}{Name for the ID column in output (default: "id"). When called from oai_embed_df(), this preserves the original column name.}
+}
+\value{
+A tibble with columns:
+\itemize{
+\item ID column (name specified by \code{id_col_name}): Original identifier from input
+\item \code{.error}: Logical indicating if request failed
+\item \code{.error_msg}: Error message if failed, NA otherwise
+\item \code{.chunk}: Chunk number for tracking
+\item Embedding columns (V1, V2, etc.)
+}
+}
+\description{
+This function processes large volumes of text through OpenAI's Embeddings API
+in configurable chunks, writing results progressively to parquet files. It handles
+concurrent requests and automatic retries while managing memory efficiently for
+large-scale processing.
+}
+\details{
+This function is designed for processing large text datasets that may not
+fit comfortably in memory. It divides the input into chunks, processes each chunk
+with concurrent API requests, and writes results immediately to disk to minimise
+memory usage.
+
+The function preserves data integrity by matching results to source texts through
+the \code{ids} parameter. Each chunk is processed independently with results written as
+parquet files to the output directory.
+
+The chunking strategy balances API efficiency with memory management. Larger
+\code{chunk_size} values reduce overhead but increase memory usage. Adjust based on
+your system resources and text sizes.
+
+Reduce the risk of data loss by keeping \code{chunk_size} moderate (e.g. 5,000 or 10,000). Each chunk is written to a \code{.parquet} file in the \verb{output_dir=} directory, alongside a \code{metadata.json} file that tracks important information such as the model and endpoint URL used. Be sure to add output directories to .gitignore!
+} +\examples{ +\dontrun{ + # basic usage with automatic directory naming + result <- oai_embed_chunks( + texts = my_texts, + ids = my_ids, + model = "text-embedding-3-small" + ) + + # large-scale processing with custom settings + result <- oai_embed_chunks( + texts = my_texts, + ids = my_ids, + output_dir = "my_embeddings", + chunk_size = 10000, + dimensions = 512, + concurrent_requests = 10 + ) +} +} diff --git a/man/oai_embed_df.Rd b/man/oai_embed_df.Rd index 08cf47d..ae62236 100644 --- a/man/oai_embed_df.Rd +++ b/man/oai_embed_df.Rd @@ -11,10 +11,11 @@ oai_embed_df( model = "text-embedding-3-small", dimensions = 1536, key_name = "OPENAI_API_KEY", - batch_size = 10, - concurrent_requests = 1, - max_retries = 5, - timeout = 20, + output_dir = "auto", + chunk_size = 5000L, + concurrent_requests = 1L, + max_retries = 5L, + timeout = 20L, endpoint_url = "https://api.openai.com/v1/embeddings", progress = TRUE ) @@ -28,11 +29,13 @@ oai_embed_df( \item{model}{OpenAI embedding model to use (default: "text-embedding-3-small")} -\item{dimensions}{Number of embedding dimensions (NULL uses model default)} +\item{dimensions}{Number of embedding dimensions (default: 1536)} \item{key_name}{Name of environment variable containing the API key} -\item{batch_size}{Number of texts to process in one batch (default: 10)} +\item{output_dir}{Path to directory for the .parquet chunks. "auto" generates a timestamped directory name. If NULL, uses a temporary directory.} + +\item{chunk_size}{Number of texts to process in each chunk before writing to disk (default: 5000)} \item{concurrent_requests}{Number of concurrent requests (default: 1)} @@ -45,27 +48,34 @@ oai_embed_df( \item{progress}{Whether to display a progress bar (default: TRUE)} } \value{ -Original data frame with additional columns for embeddings (V1, V2, etc.), -plus .error and .error_message columns indicating any failures +A tibble with columns: +\itemize{ +\item ID column (preserves original column name): Original identifier from input +\item \code{.error}: Logical indicating if request failed +\item \code{.error_msg}: Error message if failed, NA otherwise +\item \code{.chunk}: Chunk number for tracking +\item Embedding columns (V1, V2, etc.) +} } \description{ High-level function to generate embeddings for texts in a data frame using OpenAI's embedding API. This function handles the entire process from request -creation to response processing, with options for batching & concurrent requests. +creation to response processing, with options for chunking & concurrent requests. } \details{ This function extracts texts from a specified column, generates embeddings using -\code{oai_embed_batch()}, and joins the results back to the original data frame using -a specified ID column. +\code{oai_embed_chunks()}, and returns the results matched to the original IDs. -The function preserves the original data frame structure and adds new columns -for embedding dimensions (V1, V2, ..., Vn). If the number of rows doesn't match -after processing (due to errors), it returns the results with a warning. +The chunking approach enables processing of large data frames without memory +constraints. Results are written progressively as parquet files (either to a specified +directory or auto-generated) and then read back as the return value. 
OpenAI's embedding API allows you to specify the number of dimensions for the
-output embeddings, which can be useful for reducing memory usage, storage cost,s or matching
+output embeddings, which can be useful for reducing memory usage, storage costs, or matching
 specific downstream requirements. The default is model-specific (1536 for
 text-embedding-3-small). \href{https://openai.com/index/new-embedding-models-and-api-updates/}{OpenAI Embedding Updates}
+
+Reduce the risk of data loss by keeping \code{chunk_size} moderate (e.g. 5,000 or 10,000). Each chunk is written to a \code{.parquet} file in the \verb{output_dir=} directory, alongside a \code{metadata.json} file. Be sure to add output directories to .gitignore!
 }
 \examples{
 \dontrun{
@@ -74,7 +84,7 @@ text-embedding-3-small). \href{https://openai.com/index/new-embedding-models-and
     text = c("First example", "Second example", "Third example")
   )
 
-  # Generate embeddings with default dimensions
+  # Generate embeddings with default settings
   embeddings_df <- oai_embed_df(
     df = df,
     text_var = text,
@@ -86,8 +96,8 @@ text-embedding-3-small). \href{https://openai.com/index/new-embedding-models-and
     df = df,
     text_var = text,
     id_var = id,
-    dimensions = 360, # smaller embeddings
-    batch_size = 5
+    dimensions = 512, # smaller embeddings
+    chunk_size = 10000
   )
 
   # Use with concurrent requests for faster processing
@@ -96,7 +106,7 @@ text-embedding-3-small). \href{https://openai.com/index/new-embedding-models-and
     text_var = text,
     id_var = id,
     model = "text-embedding-3-large",
-    concurrent_requests = 3
+    concurrent_requests = 5
   )
 }
}
diff --git a/man/perform_request_or_return_error.Rd b/man/perform_request_or_return_error.Rd
new file mode 100644
index 0000000..02d7cfe
--- /dev/null
+++ b/man/perform_request_or_return_error.Rd
@@ -0,0 +1,20 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/core.R
+\name{perform_request_or_return_error}
+\alias{perform_request_or_return_error}
+\title{Perform request and return response or error object}
+\usage{
+perform_request_or_return_error(request)
+}
+\arguments{
+\item{request}{An httr2 request object}
+}
+\value{
+An httr2_response object (check status with resp_status()) or an error condition
+}
+\description{
+Performs a request and returns the response. Since req_error(is_error = ~ FALSE)
+is set in base_request(), httr2 won't throw errors for HTTP status codes >= 400.
+Instead, callers should check the response status with httr2::resp_status().
+}
+\keyword{internal}
diff --git a/man/perform_requests_with_strategy.Rd b/man/perform_requests_with_strategy.Rd
index a2fda0a..80ce1e7 100644
--- a/man/perform_requests_with_strategy.Rd
+++ b/man/perform_requests_with_strategy.Rd
@@ -18,7 +18,7 @@ perform_requests_with_strategy(
 
 \item{progress}{Logical indicating whether to show progress bar (default: TRUE)}
 }
 \value{
-List of httr2_response objects or error objects for failed requests
+List of httr2_response objects (check status with resp_status()) or error objects for network failures
 }
 \description{
 Executes a list of HTTP requests either sequentially or in parallel.
@@ -26,7 +26,11 @@ Automatically chooses sequential processing when concurrent_requests = 1
 or when there's only one request.
 }
 \details{
-returns responses in the order that requests were sent, and returns errors in a predictable format.
+Returns responses in the order that requests were sent.
+Since requests use req_error(is_error = ~ FALSE), HTTP error responses (status >= 400)
+are returned as httr2_response objects rather than being thrown as errors.
+Callers should check response status with httr2::resp_status() or use
+httr2::resps_successes() / httr2::resps_failures() to categorise responses.
 }
 \examples{
 \dontrun{
diff --git a/man/process_response.Rd b/man/process_response.Rd
index ec9f819..4082549 100644
--- a/man/process_response.Rd
+++ b/man/process_response.Rd
@@ -18,7 +18,7 @@ A tibble with processed results or error information, including:
 \itemize{
 \item original_index: Position in original request batch
 \item .error: Logical indicating if an error occurred
-\item .error_message: Character description of any error
+\item .error_msg: Character description of any error
 \item Additional columns from tidy_func output
 }
 }
diff --git a/man/safely_perform_request.Rd b/man/safely_perform_request.Rd
index 9800ea8..d72b436 100644
--- a/man/safely_perform_request.Rd
+++ b/man/safely_perform_request.Rd
@@ -10,8 +10,9 @@ safely_perform_request(request)
 \item{request}{An httr2 request object}
 }
 \value{
-A list with components $result and $error
+A list with components $result (httr2_response or NULL) and $error (NULL or condition)
 }
 \description{
 Wrapper around httr2::req_perform that handles errors gracefully.
+Returns the response object directly - check status with httr2::resp_status().
 }
diff --git a/tests/testthat/test-anthropic_messages.R b/tests/testthat/test-anthropic_messages.R
new file mode 100644
index 0000000..bc7fddc
--- /dev/null
+++ b/tests/testthat/test-anthropic_messages.R
@@ -0,0 +1,80 @@
+test_that("ant_build_messages_request validates inputs and generates valid requests", {
+
+  expect_error(
+    ant_build_messages_request(input = c("Vector", "input")),
+    "input must be a non-empty character string"
+  )
+
+  expect_error(
+    ant_build_messages_request(input = "User stuff", system_prompt = c("Vector", "Prompt")), "must be a "
+  )
+
+  req <- expect_no_error(
+    ant_build_messages_request(input = "Test Input Alone")
+  )
+
+  expect_equal(req$headers$`Content-Type`, "application/json")
+  expect_equal(req$headers$`anthropic-version`, "2023-06-01")
+  expect_equal(req$body$data$messages[[1]][["content"]], "Test Input Alone")
+
+  expect_equal(req$url, "https://api.anthropic.com/v1/messages")
+  expect_equal(req$method, "POST")
+  expect_equal(req$policies$retry_max_tries, 5)
+  expect_equal(req$options$timeout_ms, 30000)
+
+  expect_error(
+    ant_build_messages_request("hello", temperature = 2),
+    "temperature must be numeric between 0 and 1"
+  )
+})
+
+test_that("ant_build_messages_request accepts a system_prompt and the request is formatted appropriately", {
+
+  message <- "The 4th king of neverland was not Captain Hook"
+  req <- ant_build_messages_request(message)
+
+  expect_null(req$body$data$system)
+
+  req_w_sys <- ant_build_messages_request(message, system_prompt = "Talk about all things Peter Pan only")
+
+  expect_true(!is.null(req_w_sys$body$data$system))
+})
+
+test_that("ant_build_messages_request accepts schemas and formats properly with .ant_format_schema", {
+  sentiment_schema <- create_json_schema(
+    name = "sent_schema",
+    schema = schema_object(
+      sentiment = schema_enum(values = c("positive", "negative", "neutral")),
+      required = list("sentiment"),
+      additional_properties = FALSE
+    )
+  )
+
+  req_schema <- ant_build_messages_request(
+    "the UX of tensorflow was vastly inferior to Pytorch, hence the latter's dominance",
+    schema = sentiment_schema,
+    model =
"claude-sonnet-4-5") + + + schema_data <- req_schema$body$data$output_format + expect_equal(schema_data$type, "json_schema") + + expect_equal(names(schema_data$schema$properties), "sentiment") + expect_equal(req_schema$headers$`anthropic-beta`, "structured-outputs-2025-11-13") + + +}) + +test_that("ant_build_messages_request accepts endpointr_id and adds to headers", { + req <- ant_build_messages_request( + "Hello this a test", + endpointr_id = "id_101" + ) + + expect_equal(req$headers$endpointr_id, "id_101") +}) + + + + + diff --git a/tests/testthat/test-core.R b/tests/testthat/test-core.R index b49ea6c..d3f7a85 100644 --- a/tests/testthat/test-core.R +++ b/tests/testthat/test-core.R @@ -131,7 +131,7 @@ test_that("process_response handles batches of inputs when passed the correct ti single_batch <- expect_no_error(process_response(resp = mock_batch_response, indices = 1:3, tidy_func = tidy_batch_classification_response)) - expect_setequal(names(single_batch), c("positive", "negative", "neutral", "original_index", ".error", ".error_message")) + expect_setequal(names(single_batch), c("positive", "negative", "neutral", "original_index", ".error", ".error_msg", ".status")) expect_equal(nrow(single_batch), 3) # multi-batches @@ -160,7 +160,7 @@ test_that("process_response handles batches of inputs when passed the correct ti ) expect_equal(nrow(processed_batch_results), 9) - expect_equal(ncol(processed_batch_results), 6) + expect_equal(ncol(processed_batch_results), 7) }) @@ -170,3 +170,240 @@ test_that(".create_erorr_tibble deals with indices and messages and outputs a ti expect_true(nrow(error_tib) ==2) }) + + +# error handling tests ---- + +test_that("base_request includes req_error to prevent auto-throwing on HTTP errors", { + # verify that req_error policy is set on base requests + req <- base_request("https://api.example.com", "fake_key") + + # the request should have an error policy (error_is_error) that doesn't auto-throw + expect_true("error_is_error" %in% names(req$policies)) +}) + +test_that(".extract_api_error extracts OpenAI-format error messages", { + # openai format: {"error": {"message": "...", "type": "..."}} + + mock_400 <- httr2::response_json( + status_code = 400L, + body = list(error = list( + message = "Invalid request: missing required parameter 'model'", + type = "invalid_request_error" + )) + ) + + error_msg <- .extract_api_error(mock_400) + expect_equal(error_msg, "Invalid request: missing required parameter 'model'") + + mock_401 <- httr2::response_json( + status_code = 401L, + body = list(error = list( + message = "Incorrect API key provided", + type = "authentication_error" + )) + ) + + error_msg <- .extract_api_error(mock_401) + expect_equal(error_msg, "Incorrect API key provided") + + mock_429 <- httr2::response_json( + status_code = 429L, + body = list(error = list( + message = "Rate limit exceeded. Please retry after 60 seconds.", + type = "rate_limit_error" + )) + ) + + error_msg <- .extract_api_error(mock_429) + expect_equal(error_msg, "Rate limit exceeded. 
Please retry after 60 seconds.") +}) + +test_that(".extract_api_error extracts HuggingFace-format error messages", { + # huggingface format: {"error": "..."} + + mock_503 <- httr2::response_json( + status_code = 503L, + body = list(error = "Model is currently loading, please retry in 30 seconds") + ) + + error_msg <- .extract_api_error(mock_503) + expect_equal(error_msg, "Model is currently loading, please retry in 30 seconds") + + mock_500 <- httr2::response_json( + status_code = 500L, + body = list(error = "Internal server error") + ) + + error_msg <- .extract_api_error(mock_500) + expect_equal(error_msg, "Internal server error") +}) + +test_that(".extract_api_error extracts Anthropic-format error messages", { + # anthropic format: {"message": "..."} + + mock_529 <- httr2::response_json( + status_code = 529L, + body = list(message = "Anthropic API is temporarily overloaded") + ) + + error_msg <- .extract_api_error(mock_529) + expect_equal(error_msg, "Anthropic API is temporarily overloaded") +}) + +test_that(".extract_api_error falls back to HTTP status when body parsing fails", { + # response with non-json body or unexpected structure + + mock_502 <- httr2::response_json( + status_code = 502L, + body = list(unexpected_field = "something went wrong") + ) + + error_msg <- .extract_api_error(mock_502) + expect_equal(error_msg, "HTTP 502") + + mock_403 <- httr2::response_json( + status_code = 403L, + body = list() # empty body + ) + + error_msg <- .extract_api_error(mock_403) + expect_equal(error_msg, "HTTP 403") +}) + +test_that(".extract_api_error returns NA for successful responses", { + mock_200 <- httr2::response_json( + status_code = 200L, + body = list(data = "success") + ) + + error_msg <- .extract_api_error(mock_200) + expect_true(is.na(error_msg)) +}) + +test_that(".extract_api_error handles non-response objects gracefully", { + # error condition object + err <- simpleError("Connection timed out") + error_msg <- .extract_api_error(err) + expect_equal(error_msg, "Connection timed out") + + # generic object with fallback + error_msg <- .extract_api_error("not a response", fallback_message = "Unknown failure") + expect_equal(error_msg, "Unknown failure") + + # NULL input + error_msg <- .extract_api_error(NULL, fallback_message = "Request failed") + expect_equal(error_msg, "Request failed") +}) + +test_that(".extract_api_error handles all common HTTP error status codes", { + # test a range of common error codes + status_codes <- c(400L, 401L, 403L, 404L, 429L, 500L, 502L, 503L, 529L) + + for (code in status_codes) { + mock_resp <- httr2::response_json( + status_code = code, + body = list(error = list(message = paste("Error", code))) + ) + + error_msg <- .extract_api_error(mock_resp) + expect_equal(error_msg, paste("Error", code), info = paste("Failed for status code", code)) + } +}) + +test_that("process_response handles HTTP error responses correctly", { + # mock a 429 rate limit error + mock_429 <- httr2::response_json( + status_code = 429L, + body = list(error = list( + message = "Rate limit exceeded", + type = "rate_limit_error" + )) + ) + + # check warning is produced + expect_warning( + process_response(mock_429, indices = 1:3, tidy_func = tidy_classification_response), + "Request failed with status 429" + ) + + # capture result for assertions + result <- suppressWarnings( + process_response(mock_429, indices = 1:3, tidy_func = tidy_classification_response) + ) + + expect_true(all(result$.error)) + expect_equal(result$.error_msg[1], "Rate limit exceeded") + 
expect_equal(nrow(result), 3) + expect_true("original_index" %in% names(result)) + + # mock a 500 server error + mock_500 <- httr2::response_json( + status_code = 500L, + body = list(error = list(message = "Internal server error")) + ) + + # check warning is produced + expect_warning( + process_response(mock_500, indices = c(5, 6), tidy_func = tidy_classification_response), + "Request failed with status 500" + ) + + # capture result for assertions + result <- suppressWarnings( + process_response(mock_500, indices = c(5, 6), tidy_func = tidy_classification_response) + ) + + expect_true(all(result$.error)) + expect_equal(result$.error_msg[1], "Internal server error") + expect_equal(nrow(result), 2) +}) + +test_that("process_response handles mixed success and error batches", { + # this tests that we can process a mix of successful and failed responses + + # successful response + success_body <- list( + list( + list(label = "positive", score = 0.9), + list(label = "negative", score = 0.1) + ) + ) + + mock_success <- httr2::response_json( + status_code = 200L, + body = success_body + ) + + # error response + mock_error <- httr2::response_json( + status_code = 429L, + body = list(error = list(message = "Rate limited")) + ) + + responses <- list(mock_success, mock_error) + indices_list <- list(1, 2) + + # process both + results <- purrr::map2( + responses, + indices_list, + ~ suppressWarnings(process_response(.x, .y, tidy_func = tidy_classification_response)) + ) |> + purrr::list_rbind() + + expect_equal(nrow(results), 2) + expect_equal(sum(results$.error), 1) # one error + expect_equal(sum(!results$.error), 1) # one success +}) + +test_that(".create_error_tibble produces correct structure with .error_msg column", { + error_tib <- .create_error_tibble(c(1, 2, 3), "Something went wrong") + + expect_s3_class(error_tib, "tbl_df") + expect_equal(nrow(error_tib), 3) + expect_true(all(error_tib$.error)) + expect_true(all(error_tib$.error_msg == "Something went wrong")) + expect_equal(error_tib$original_index, c(1, 2, 3)) + expect_setequal(names(error_tib), c("original_index", ".error", ".error_msg", ".status")) +}) diff --git a/tests/testthat/test-hf_classify.R b/tests/testthat/test-hf_classify.R index d78ec94..0b66729 100644 --- a/tests/testthat/test-hf_classify.R +++ b/tests/testthat/test-hf_classify.R @@ -143,7 +143,7 @@ test_that("hf_classify_batch processes a batch of texts and returns a tidied cla expect_equal(nrow(res), 4) - expect_setequal(names(res), c("positive", "negative", "neutral", ".error", ".error_message")) + expect_setequal(names(res), c("positive", "negative", "neutral", ".error", ".error_msg")) }) @@ -152,7 +152,7 @@ test_that("hf_classify_chunks processes chunks correctly", { texts <- paste0("text", 1:6) ids <- paste0("id", 1:length(texts)) temp_dir <- withr::local_tempdir() - expected_cols <- c("id", "text", ".error", ".error_msg", ".chunk", "positive", "negative", "neutral") + expected_cols <- c("id", "text", ".error", ".error_msg", ".status", ".chunk", "positive", "negative", "neutral") # Test with chunk_size = 2 chunk_2 <- expect_no_error(hf_classify_chunks( diff --git a/tests/testthat/test-hf_embed.R b/tests/testthat/test-hf_embed.R index 3953ab8..e8e6399 100644 --- a/tests/testthat/test-hf_embed.R +++ b/tests/testthat/test-hf_embed.R @@ -60,7 +60,7 @@ test_that("hf_embed_batch works correctly with tidy_func parameter added", { expect_s3_class(result, "tbl_df") expect_equal(nrow(result), 2) - expect_true(all(c("V1", "V2", "V3", ".error", ".error_message") %in% names(result))) 
+ expect_true(all(c("V1", "V2", "V3", ".error", ".error_msg") %in% names(result))) }) test_that("hf_embed_batch allows custom tidy_func", { @@ -88,7 +88,7 @@ test_that("hf_embed_chunks replaces hf_embed_batch", { texts <- paste0("text", 1:6) ids <- paste0('id', 1:length(texts)) temp_dir <- withr::local_tempdir() - expected_cols <- c("id", ".error", ".error_msg", ".chunk", "V1", "V2", "V3") + expected_cols <- c("id", ".error", ".error_msg", ".status", ".chunk", "V1", "V2", "V3") chunk_2 <- expect_no_error(hf_embed_chunks( diff --git a/tests/testthat/test-openai_completions.R b/tests/testthat/test-openai_completions.R index 3e8ed92..97ad2d5 100644 --- a/tests/testthat/test-openai_completions.R +++ b/tests/testthat/test-openai_completions.R @@ -202,13 +202,13 @@ test_that("oai_complete_df takes single row, multi-row data frames as inputs", { endpoint_url = endpoint_url, concurrent_requests = 1, max_retries = 1, - output_file = NULL) + output_dir = NULL) ) ) expect_setequal(names(successful_response), - c("id", "content", ".error_msg", ".error", ".batch")) + c("id", "content", ".error", ".error_msg", ".status", ".chunk")) expect_setequal(unique(successful_response$content), "positive") withr::with_envvar( @@ -220,7 +220,7 @@ test_that("oai_complete_df takes single row, multi-row data frames as inputs", { endpoint_url = endpoint_url, concurrent_requests = 1, max_retries = 1, - output_file = NULL), + output_dir = NULL), regexp = "Performing 5 requests sequentially" ) @@ -236,7 +236,7 @@ test_that("oai_complete_df takes single row, multi-row data frames as inputs", { endpoint_url = endpoint_url, concurrent_requests = 5, max_retries = 1, - output_file = NULL), + output_dir = NULL), regexp = "with 5 concurrent requests" ) @@ -287,7 +287,7 @@ test_that("oai_complete_df takes a schema as input", { concurrent_requests = 1, max_retries = 1, schema = sentiment_schema, - output_file = NULL + output_dir = NULL ) ) @@ -357,7 +357,7 @@ test_that("oai_complete_df handles mixed validation success/failure", { concurrent_requests = 1, max_retries = 1, schema = sentiment_schema, - output_file = NULL + output_dir = NULL )} ) diff --git a/todos.qmd b/todos.qmd index 9fed3d6..fe7293d 100644 --- a/todos.qmd +++ b/todos.qmd @@ -1,6 +1,42 @@ # EndpointR Hugging Face Embeddings Implementation Checklist -# 0.1.2 +# Versions + +## 0.2 + +- [ ] Support for Anthropic API + - [ ] Batches + - [ ] Messages (Completions) + - [ ] Structured Outputs +- [ ] Support for Gemini API + - [ ] Embeddings + - [ ] Completions + - [ ] Structured Outputs +- [ ] LLM Providers Vignette Updated +- [ ] Structured Outputs Vignette Updated +- [ ] Better error propagation throughout package (refactor, large) + +Error reporting is somewhat annoying by default with httr2::req_perform() if we don't: + +``` +response <- req |> + httr2::req_error(is_error = ~ FALSE) |> + httr2::req_perform() + +if (httr2::resp_status(response) >= 400) { + error_body <- tryCatch( + httr2::resp_body_json(response), + error = function(e) list(error = list(message = paste("HTTP", httr2::resp_status(response)))) + ) + + cli::cli_abort(c( + "API request failed ({error_body$error$type %||% 'unknown'})", + "x" = error_body$error$message %||% paste("HTTP", httr2::resp_status(response)) + )) +} +``` + +## 0.1.2 - [x] tests passing following hf\_\* changes - [x] max_length for hf functions - not applicable in hf_embed\_\* functions as they use HF's TEI which doesn't allow max_length @@ -317,3 +353,58 @@ Schema -\> Type relationship? e.g. 
schema_number has its own method which coerce
 - [x] Vignettes with real-world examples
 - [x] Vignette re-working/refining based on JH feedback
 - [ ] Optional Fields and Schemas
+
+# Anthropic API
+
+## Anthropic API - Messages
+
+Anthropic's version of the Completions API [docs](https://platform.claude.com/docs/en/api/overview)
+
+- [ ] build request with headers and auth. They use X-Api-Key for auth
+- [ ] `ant_complete_text()`
+- [ ] `ant_complete_chunks()`
+- [ ] `ant_complete_df()`
+- [ ] Structured outputs [docs](https://platform.claude.com/docs/en/build-with-claude/structured-outputs) uses a JSON schema similar to OAI's: "To use the feature, set the beta header structured-outputs-2025-11-13"
+
+Default request looks like:
+
+```
+curl https://api.anthropic.com/v1/messages \
+  -H 'Content-Type: application/json' \
+  -H "X-Api-Key: $ANTHROPIC_API_KEY" \
+  -d '{
+    "max_tokens": 1024,
+    "messages": [
+      {
+        "content": "Hello, world",
+        "role": "user"
+      }
+    ],
+    "model": "claude-sonnet-4-5-20250929"
+  }'
+```
+
+Basic 200 response body:
+
+```
+{
+  "id": "msg_01XFDUDYJgAACzvnptvVoYEL",
+  "type": "message",
+  "role": "assistant",
+  "content": [
+    {
+      "type": "text",
+      "text": "Hello!"
+    }
+  ],
+  "model": "claude-sonnet-4-5",
+  "stop_reason": "end_turn",
+  "stop_sequence": null,
+  "usage": {
+    "input_tokens": 12,
+    "output_tokens": 6
+  }
+}
+```
+
+## Anthropic API - Batches
diff --git a/vignettes/embeddings_providers.Rmd b/vignettes/embeddings_providers.Rmd
index 1a025bc..1287b91 100644
--- a/vignettes/embeddings_providers.Rmd
+++ b/vignettes/embeddings_providers.Rmd
@@ -126,7 +126,7 @@ batch_embeddings <- hf_embed_batch(
 glimpse(batch_embeddings[1,1:10 ]) # truncated for ease
 ```
 
-The result includes: - `text`: your original text - `.error` and `.error_message`: error tracking - `V1` to `V768`: the embedding dimensions
+The result includes: - `text`: your original text - `.error` and `.error_msg`: error tracking - `V1` to `V768`: the embedding dimensions
 
 ## Data Frame Integration
 
@@ -144,7 +144,7 @@ embedded_df <- hf_embed_df(
 )
 
 # Original data + embeddings
-names(embedded_df)[1:10] # shows: id, text, category, .error, .error_message, V1, V2...
+names(embedded_df)[1:10] # shows: id, text, category, .error, .error_msg, V1, V2...
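 # illustrative sketch: keep only rows that embedded successfully (.error flags failures)
 embedded_ok <- dplyr::filter(embedded_df, !.error)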
 embedded_df
 ```
 
@@ -298,7 +298,7 @@ results <- oai_embed_batch(texts = texts_to_embed)
 
 if (any(results$.error)) {
   failed <- results |>
     filter(.error) |>
-    select(text, .error_message)
+    select(text, .error_msg)
 
   print(failed)
 
diff --git a/vignettes/hugging_face_inference.Rmd b/vignettes/hugging_face_inference.Rmd
index da2470e..f985801 100644
--- a/vignettes/hugging_face_inference.Rmd
+++ b/vignettes/hugging_face_inference.Rmd
@@ -247,7 +247,7 @@ The result includes:
 
 - `text`: your original text
 - `.error`: TRUE if something went wrong
-- `.error_message`: what went wrong (if anything)
+- `.error_msg`: what went wrong (if anything)
 - `V1` to `V384`: the embedding values
 
 ## Processing Data Frames with Chunk Writing
diff --git a/vignettes/llm_providers.Rmd b/vignettes/llm_providers.Rmd
index 381ed67..ff53a9c 100644
--- a/vignettes/llm_providers.Rmd
+++ b/vignettes/llm_providers.Rmd
@@ -85,7 +85,7 @@ oai_complete_df(
   review_df,
   text_var = text,
   id_var = id,
-  output_file = NULL, # leave this to 'auto' to have your results written to a file in your current working directory
+  output_dir = NULL, # set to 'auto' to have your results written to a directory in your current working directory
   system_prompt = sentiment_system_prompt,
   concurrent_requests = 2,
   chunk_size = 5
@@ -99,7 +99,7 @@ oai_complete_df(
 ✔ Batch 1: 5 successful, 0 failed
 ✔ Completed processing: 5 successful, 0 failed
 # A tibble: 5 × 5
-  id content                                         .error .error_msg .batch
+  id content                                         .error .error_msg .chunk
 
 1  1 "The sentiment of the text is highly positive." FALSE  NA              1
 
@@ -131,9 +131,9 @@ structured_df <- oai_complete_df(
   text_var = text,
   id_var = id,
   schema = sentiment_schema,
-  output_file = NULL,
+  output_dir = NULL,
   system_prompt = sentiment_system_prompt,
-  concurrent_requests = 2, 
+  concurrent_requests = 2,
   chunk_size = 5
 )
 
@@ -146,7 +146,7 @@ structured_df <- oai_complete_df(
 ✔ Batch 1: 5 successful, 0 failed
 ✔ Completed processing: 5 successful, 0 failed
 # A tibble: 5 × 5
-  id content                        .error .error_msg .batch
+  id content                        .error .error_msg .chunk
 
 1  1 "{\"sentiment\":\"positive\"}" FALSE  NA              1
 
@@ -169,7 +169,7 @@ structured_df |>
 
 ```{=html}
 # A tibble: 5 × 5
-     id sentiment .error .error_msg .batch
+     id sentiment .error .error_msg .chunk
   <int> <chr>     <lgl>  <chr>      <int>
 1     1 positive  FALSE  NA              1
 2     2 negative  FALSE  NA              1
@@ -394,14 +394,14 @@ df_classical_texts <- tibble(
 )
 ```
 
-We have a function `oai_complete_df()` which takes a data frame, an id variable, and a text variable as mandatory inputs, and returns a data frame with columns: `id_var`, `text+var`, `.error_msg`, `.error`, `.batch`.
+We have a function `oai_complete_df()` which takes a data frame, an id variable, and a text variable as mandatory inputs, and returns a data frame with columns: `id_var`, `content`, `.error`, `.error_msg`, `.status`, `.chunk`.
 
 ```{r}
 oai_complete_df(df_classical_texts, 
                 text_var = text, 
                 id_var = id,
                 concurrent_requests = 5,
-                output_file = NULL # set this to write to a temporary file, useful for documentation and testing.
+                output_dir = NULL # NULL writes to a temporary directory, useful for documentation and testing.
                 )
 ```
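
The chunk files written to `output_dir` can be reassembled without repeating any API calls. A minimal sketch, assuming the default layout of one `.parquet` file per chunk plus a `metadata.json` (the directory name here is hypothetical):

```
# hypothetical directory from an earlier oai_complete_df() / oai_complete_chunks() run
chunk_files <- list.files("oai_results", pattern = "\\.parquet$", full.names = TRUE)

# bind the chunks back into a single tibble
all_results <- chunk_files |>
  purrr::map(arrow::read_parquet) |>
  purrr::list_rbind()

# rows flagged with .error = TRUE are the candidates for a retry pass
retry_df <- dplyr::filter(all_results, .error)
```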