diff --git a/DESCRIPTION b/DESCRIPTION index 7916adc..9a376ed 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,8 +1,9 @@ Package: EndpointR Title: Connects to various Machine Learning inference providers -Version: 0.2.1 +Version: 0.2.2 Authors@R: c( - person("Jack", "Penzer", , "Jack.penzer@sharecreative.com", role = c("aut", "cre")), + person("Jack", "Penzer", , "jack.penzer@samy.com", role = c("aut", "cre")), + person("Ben", "Jessup", , "ben.jessup@samy.com", role = "aut"), person("Claude", "AI", role = "aut") ) Description: EndpointR is a 'batteries included', open-source R package for connecting to various APIs for Machine Learning model predictions. EndpointR is built for company-specific use cases, so may not be useful to a wide audience. diff --git a/NAMESPACE b/NAMESPACE index 4485b08..6ec7ff2 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,5 +1,10 @@ # Generated by roxygen2: do not edit by hand +export(ant_batch_cancel) +export(ant_batch_create) +export(ant_batch_list) +export(ant_batch_results) +export(ant_batch_status) export(ant_build_messages_request) export(ant_complete_chunks) export(ant_complete_df) diff --git a/NEWS.md b/NEWS.md index 865a503..c5072c8 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,23 @@ +# EndpointR (dev) + +# EndpointR 0.2.2 + +## Anthropic Messages API +- `ant_build_messages_request()` now automatically enables prompt caching when a `system_prompt` is provided, structuring it as a content block with `cache_control`. This benefits `ant_complete_chunks()` and `ant_complete_df()` where many requests share the same system prompt — cached reads cost 90% less than uncached. +- Structured outputs is out of BETA and is now generally available, so the header is removed, and `output_form` --> `output_config` in the body of the request following [Anthropic Docs on Structured Outputs](https://platform.claude.com/docs/en/build-with-claude/structured-outputs) + +## Anthropic Batch API + +Functions for dealing with Anthropic Bathches API, works differently ot the OpenAI API - as we send requests not files. + +- `ant_batch_create()` +- `ant_batch_status()` +- `ant_batch_results()` +- `ant_batch_list()` +- `ant_batch_cancel()` + +See the [Sync Async Vignette](articles/sync_async.html#anthropic-message-batches-api) for more details + # EndpointR 0.2.1 ## OpenAI Batch API diff --git a/R/anthropic_batch.R b/R/anthropic_batch.R new file mode 100644 index 0000000..24eb1c0 --- /dev/null +++ b/R/anthropic_batch.R @@ -0,0 +1,454 @@ +# constants ---- +.ANT_BATCHES_ENDPOINT <- "https://api.anthropic.com/v1/messages/batches" + +# ant_batch_create ---- +#' Create an Anthropic Message Batch +#' +#' @description +#' Submits a batch of message requests to Anthropic's Message Batches API. +#' Batches are processed asynchronously with 50% cost savings and a 24-hour +#' completion window. +#' +#' @details +#' Each request in the batch is a standalone Messages API call. The API +#' supports up to 100,000 requests per batch. +#' +#' When `system_prompt` is provided, prompt caching is automatically enabled +#' by adding `cache_control` to each request's params. The API applies the +#' cache breakpoint to the last cacheable block. +#' +#' For structured outputs, pass a `json_schema` object to `schema`. This uses +#' the GA `output_config` format (no beta header required). +#' +#' @param texts Character vector of texts to send to the model +#' @param custom_ids Character vector of unique identifiers (same length as texts) +#' @param model Anthropic model to use +#' @param system_prompt Optional system prompt (applied to all requests) +#' @param schema Optional JSON schema for structured output (json_schema object or list) +#' @param max_tokens Maximum tokens per response +#' @param temperature Sampling temperature (0-1) +#' @param key_name Environment variable name for API key +#' @param endpoint_url Anthropic Batches API endpoint URL +#' @param timeout Request timeout in seconds +#' +#' @return A list of batch metadata including `$id` for tracking +#' @export +#' +#' @seealso [ant_batch_status()], [ant_batch_results()], [ant_batch_list()], [ant_batch_cancel()] +#' @examples +#' \dontrun{ +#' batch <- ant_batch_create( +#' texts = c("Hello", "World"), +#' custom_ids = c("t1", "t2"), +#' system_prompt = "Reply in one word" +#' ) +#' # Check status later +#' ant_batch_status(batch$id) +#' } +ant_batch_create <- function( + texts, + custom_ids, + model = .ANT_DEFAULT_MODEL, + system_prompt = NULL, + schema = NULL, + max_tokens = 1024L, + temperature = 0, + key_name = "ANTHROPIC_API_KEY", + endpoint_url = .ANT_BATCHES_ENDPOINT, + timeout = 60L +) { + + stopifnot( + "texts must be a character vector" = is.character(texts) && length(texts) > 0, + "custom_ids must be a character vector" = is.character(custom_ids) && length(custom_ids) > 0, + "texts and custom_ids must be the same length" = length(texts) == length(custom_ids), + "custom_ids must be unique" = !anyDuplicated(custom_ids), + "batch cannot exceed 100,000 requests" = length(texts) <= 100000 + ) + + if (!is.null(system_prompt)) { + if (!rlang::is_scalar_character(system_prompt)) { + cli::cli_abort("{.arg system_prompt} must be a {.cls character} of length 1") + } + } + + # pre-format schema once + formatted_schema <- NULL + if (!is.null(schema)) { + if (inherits(schema, "EndpointR::json_schema")) { + formatted_schema <- .ant_format_schema(schema) + } else if (is.list(schema)) { + formatted_schema <- schema + } else { + cli::cli_abort("{.arg schema} must be an EndpointR json_schema object or a list") + } + } + + # build the requests array + requests <- purrr::map2(texts, custom_ids, function(text, id) { + params <- list( + model = model, + max_tokens = as.integer(max_tokens), + temperature = temperature, + messages = list( + list(role = "user", content = text) + ) + ) + + if (!is.null(system_prompt)) { + params$system <- system_prompt + params$cache_control <- list(type = "ephemeral") + } + + if (!is.null(formatted_schema)) { + params$output_config <- formatted_schema + } + + list( + custom_id = id, + params = params + ) + }) + + api_key <- get_api_key(key_name) + + body <- list(requests = requests) + + response <- httr2::request(endpoint_url) |> + httr2::req_user_agent("EndpointR") |> + httr2::req_method("POST") |> + httr2::req_headers( + "Content-Type" = "application/json", + "x-api-key" = api_key, + "anthropic-version" = .ANT_API_VERSION + ) |> + httr2::req_body_json(body) |> + httr2::req_error(is_error = ~ FALSE) |> + httr2::req_timeout(timeout) |> + httr2::req_perform() + + if (httr2::resp_status(response) >= 400) { + error_msg <- .extract_api_error(response) + cli::cli_abort(c( + "Batch creation failed", + "x" = error_msg + )) + } + + httr2::resp_body_json(response) +} + +# ant_batch_status ---- +#' Check the Status of an Anthropic Message Batch +#' +#' @description +#' Retrieves the current status and metadata for a message batch. +#' +#' @param batch_id Character string of the batch ID (returned by `ant_batch_create()`) +#' @param key_name Environment variable name for API key +#' @param endpoint_url Anthropic Batches API endpoint URL +#' +#' @return A list of batch metadata including `processing_status`, `request_counts`, etc. +#' @export +#' +#' @examples +#' \dontrun{ +#' status <- ant_batch_status("msgbatch_abc123") +#' status$processing_status # e.g. "in_progress", "ended" +#' } +ant_batch_status <- function( + batch_id, + key_name = "ANTHROPIC_API_KEY", + endpoint_url = .ANT_BATCHES_ENDPOINT +) { + + stopifnot( + "batch_id must be a non-empty character string" = is.character(batch_id) && length(batch_id) == 1 && nchar(batch_id) > 0 + ) + + api_key <- get_api_key(key_name) + + response <- httr2::request(paste0(endpoint_url, "/", batch_id)) |> + httr2::req_user_agent("EndpointR") |> + httr2::req_headers( + "x-api-key" = api_key, + "anthropic-version" = .ANT_API_VERSION + ) |> + httr2::req_error(is_error = ~ FALSE) |> + httr2::req_perform() + + if (httr2::resp_status(response) >= 400) { + error_msg <- .extract_api_error(response) + cli::cli_abort(c( + "Failed to retrieve batch status", + "x" = error_msg + )) + } + + httr2::resp_body_json(response) +} + +# ant_batch_results ---- +#' Retrieve Results from a Completed Anthropic Message Batch +#' +#' @description +#' Downloads and parses results from a completed message batch. The batch +#' must have `processing_status` of `"ended"` before results can be retrieved. +#' +#' @details +#' Results are returned as a tibble with one row per request. The function +#' handles all four Anthropic result types: succeeded, errored, canceled, +#' and expired. +#' +#' @inheritParams ant_batch_status +#' +#' @return A tibble with columns: `custom_id`, `content`, `.error`, +#' `.error_msg`, `stop_reason`, `input_tokens`, `output_tokens` +#' @export +#' +#' @examples +#' \dontrun{ +#' results <- ant_batch_results("msgbatch_abc123") +#' results$content # model responses +#' } +ant_batch_results <- function( + batch_id, + key_name = "ANTHROPIC_API_KEY", + endpoint_url = .ANT_BATCHES_ENDPOINT +) { + + batch_meta <- ant_batch_status( + batch_id, + key_name = key_name, + endpoint_url = endpoint_url + ) + + if (batch_meta$processing_status != "ended") { + cli::cli_abort(c( + "Batch is not yet complete", + "i" = "Current status: {batch_meta$processing_status}", + "i" = "Use {.fn ant_batch_status} to check progress" + )) + } + + results_url <- batch_meta$results_url + + if (is.null(results_url)) { + cli::cli_abort("Batch has ended but no results URL is available") + } + + api_key <- get_api_key(key_name) + + response <- httr2::request(results_url) |> + httr2::req_user_agent("EndpointR") |> + httr2::req_headers( + "x-api-key" = api_key, + "anthropic-version" = .ANT_API_VERSION + ) |> + httr2::req_error(is_error = ~ FALSE) |> + httr2::req_perform() + + if (httr2::resp_status(response) >= 400) { + error_msg <- .extract_api_error(response) + cli::cli_abort(c( + "Failed to retrieve batch results", + "x" = error_msg + )) + } + + raw_text <- httr2::resp_body_string(response) + lines <- strsplit(raw_text, "\n")[[1]] + lines <- lines[nchar(lines) > 0] + + if (length(lines) == 0) { + return(tibble::tibble( + custom_id = character(), + content = character(), + .error = logical(), + .error_msg = character(), + stop_reason = character(), + input_tokens = integer(), + output_tokens = integer() + )) + } + + parsed <- purrr::imap(lines, \(line, idx) { + tryCatch( + jsonlite::fromJSON(line, simplifyVector = FALSE), + error = function(e) { + list( + custom_id = paste0("__PARSE_ERROR_LINE_", idx), + result = list(type = "errored", error = list(message = paste("Failed to parse JSONL line", idx))) + ) + } + ) + }) + + results <- purrr::map(parsed, function(item) { + custom_id <- item$custom_id + result <- item$result + result_type <- result$type + + if (result_type == "succeeded") { + msg <- result$message + content <- .extract_content_from_batch_message(msg) + return(tibble::tibble( + custom_id = custom_id, + content = content, + .error = FALSE, + .error_msg = NA_character_, + stop_reason = msg$stop_reason %||% NA_character_, + input_tokens = msg$usage$input_tokens %||% NA_integer_, + output_tokens = msg$usage$output_tokens %||% NA_integer_ + )) + } + + # errored, canceled, expired + error_msg <- if (result_type == "errored") { + result$error$message %||% "Unknown error" + } else { + result_type + } + + tibble::tibble( + custom_id = custom_id, + content = NA_character_, + .error = TRUE, + .error_msg = error_msg, + stop_reason = NA_character_, + input_tokens = NA_integer_, + output_tokens = NA_integer_ + ) + }) + + purrr::list_rbind(results) +} + +# ant_batch_list ---- +#' List Anthropic Message Batches +#' +#' @description +#' Retrieves a paginated list of message batches. +#' +#' @param limit Maximum number of batches to return (1-1000, default 20) +#' @param before_id Cursor for backward pagination (batch ID) +#' @param after_id Cursor for forward pagination (batch ID) +#' @param key_name Environment variable name for API key +#' @param endpoint_url Anthropic Batches API endpoint URL +#' +#' @return A list containing batch metadata and pagination information +#' @export +#' +#' @examples +#' \dontrun{ +#' batches <- ant_batch_list(limit = 10) +#' } +ant_batch_list <- function( + limit = 20L, + before_id = NULL, + after_id = NULL, + key_name = "ANTHROPIC_API_KEY", + endpoint_url = .ANT_BATCHES_ENDPOINT +) { + + stopifnot( + "limit must be between 1 and 1000" = is.numeric(limit) && limit >= 1 && limit <= 1000 + ) + + api_key <- get_api_key(key_name) + + req <- httr2::request(endpoint_url) |> + httr2::req_user_agent("EndpointR") |> + httr2::req_headers( + "x-api-key" = api_key, + "anthropic-version" = .ANT_API_VERSION + ) |> + httr2::req_url_query(limit = as.integer(limit)) |> + httr2::req_error(is_error = ~ FALSE) + + if (!is.null(before_id)) { + req <- httr2::req_url_query(req, before_id = before_id) + } + if (!is.null(after_id)) { + req <- httr2::req_url_query(req, after_id = after_id) + } + + response <- httr2::req_perform(req) + + if (httr2::resp_status(response) >= 400) { + error_msg <- .extract_api_error(response) + cli::cli_abort(c( + "Failed to list batches", + "x" = error_msg + )) + } + + httr2::resp_body_json(response) +} + +# ant_batch_cancel ---- +#' Cancel an Anthropic Message Batch +#' +#' @description +#' Cancels an in-progress message batch. Requests already being processed +#' may still complete. +#' +#' @inheritParams ant_batch_status +#' +#' @return A list of batch metadata reflecting the cancellation +#' @export +#' +#' @examples +#' \dontrun{ +#' ant_batch_cancel("msgbatch_abc123") +#' } +ant_batch_cancel <- function( + batch_id, + key_name = "ANTHROPIC_API_KEY", + endpoint_url = .ANT_BATCHES_ENDPOINT +) { + + stopifnot( + "batch_id must be a non-empty character string" = is.character(batch_id) && length(batch_id) == 1 && nchar(batch_id) > 0 + ) + + api_key <- get_api_key(key_name) + + response <- httr2::request(paste0(endpoint_url, "/", batch_id, "/cancel")) |> + httr2::req_user_agent("EndpointR") |> + httr2::req_method("POST") |> + httr2::req_headers( + "x-api-key" = api_key, + "anthropic-version" = .ANT_API_VERSION + ) |> + httr2::req_error(is_error = ~ FALSE) |> + httr2::req_perform() + + if (httr2::resp_status(response) >= 400) { + error_msg <- .extract_api_error(response) + cli::cli_abort(c( + "Failed to cancel batch", + "x" = error_msg + )) + } + + httr2::resp_body_json(response) +} + +# internal helpers ---- +#' Extract text content from a batch result message object +#' @keywords internal +.extract_content_from_batch_message <- function(msg) { + content_blocks <- msg$content + + if (is.null(content_blocks) || length(content_blocks) == 0) { + return(NA_character_) + } + + for (block in content_blocks) { + if (block$type == "text") { + return(block$text) + } + } + return(NA_character_) +} diff --git a/R/anthropic_messages.R b/R/anthropic_messages.R index 1e69255..671a58f 100644 --- a/R/anthropic_messages.R +++ b/R/anthropic_messages.R @@ -1,6 +1,5 @@ # constants ---- .ANT_API_VERSION <- "2023-06-01" -.ANT_STRUCTURED_OUTPUTS_BETA <- "structured-outputs-2025-11-13" .ANT_MESSAGES_ENDPOINT <- "https://api.anthropic.com/v1/messages" .ANT_DEFAULT_MODEL <- "claude-haiku-4-5" @@ -16,15 +15,14 @@ #' @details #' This function creates the HTTP request but does not execute it. For #' structured outputs, you must use a supported model (Claude Sonnet 4.5 -#' or Opus 4.1) and the request will automatically include the required -#' beta header. +#' or Opus 4.1). #' #' The `schema` parameter accepts either: #' - A `json_schema` S7 object created with `create_json_schema()` -#' - A raw list in Anthropic's `output_format` structure +#' - A raw list in Anthropic's `output_config` structure #' -#' Unlike OpenAI, Anthropic uses `output_format` (not `response_format`) -#' and the schema structure differs slightly. +#' Anthropic uses `output_config` with a nested `format` field for structured +#' outputs. This is now GA and requires no beta header. #' #' @param input Text input to send to the model #' @param endpointr_id An id that will persist through to response @@ -32,7 +30,9 @@ #' @param temperature Sampling temperature (0-2), higher values = more randomness #' @param max_tokens Maximum tokens in response #' @param schema Optional JSON schema for structured output (json_schema object or list) -#' @param system_prompt Optional system prompt +#' @param system_prompt Optional system prompt. When provided, prompt caching +#' is automatically enabled via `cache_control`, reducing costs for repeated +#' requests sharing the same system prompt. #' @param key_name Environment variable name for API key #' @param endpoint_url Anthropic API endpoint URL #' @param timeout Request timeout in seconds @@ -87,8 +87,6 @@ ant_build_messages_request <- function( "temperature must be numeric between 0 and 1" = is.numeric(temperature) && temperature >= 0 && temperature <= 1, # diff to OAI API "max_tokens must be a positive integer" = is.numeric(max_tokens) && max_tokens > 0) - use_structured_outputs <- FALSE # flag for later control flow - api_key <- get_api_key(key_name) messages <- list( @@ -109,18 +107,22 @@ ant_build_messages_request <- function( cli::cli_abort("{.arg system_prompt} must be a {.cls character} of length 1, e.g. 'This is a valid system prompt'") } - body$system <- system_prompt + body$system <- list( + list( + type = "text", + text = system_prompt, + cache_control = list(type = "ephemeral") + ) + ) } - # if(!is.null(schema)) { - use_structured_outputs <- TRUE if (inherits(schema, "EndpointR::json_schema")) { - body$output_format <- .ant_format_schema(schema) + body$output_config <- .ant_format_schema(schema) } else if (is.list(schema)) { - body$output_format <- schema + body$output_config <- schema } else { - cli::cli_abort("{.arg chema} must be an EndpointR json_schema object or a list") + cli::cli_abort("{.arg schema} must be an EndpointR json_schema object or a list") } } @@ -142,12 +144,6 @@ ant_build_messages_request <- function( ) |> httr2::req_body_json(body) - # if we did use structured outputs then we need to add the anthropic-beta header (this will be patched at some point I expect) - - if (use_structured_outputs) { - request <- httr2::req_headers(request, "anthropic-beta" = .ANT_STRUCTURED_OUTPUTS_BETA) - } - if (!is.null(endpointr_id)) { request <- httr2::req_headers(request, endpointr_id = endpointr_id) } @@ -307,7 +303,9 @@ ant_complete_text <- function(text, #' @param ids Vector of unique identifiers (same length as texts) #' @param chunk_size Number of texts per chunk before writing to disk #' @param model Anthropic model to use -#' @param system_prompt Optional system prompt (applied to all requests) +#' @param system_prompt Optional system prompt (applied to all requests). +#' Prompt caching is enabled automatically, reducing costs when the same +#' system prompt is shared across many requests. #' @param output_dir Directory for parquet chunks ("auto" generates timestamped dir) #' @param schema Optional JSON schema for structured output #' @param concurrent_requests Number of concurrent requests @@ -642,18 +640,19 @@ ant_complete_df <- function(df, } -#' Convert json_schema S7 object to Anthropic output_format structure +#' Convert json_schema S7 object to Anthropic output_config structure #' @keywords internal .ant_format_schema <- function(schema) { if (!inherits(schema, "EndpointR::json_schema")) { cli::cli_abort("schema must be a json_schema object") } - # Anthropic uses output_format with type "json_schema" - # The schema goes directly in the "schema" field (not nested like OpenAI) + # Anthropic uses output_config with a nested format field (GA, no beta header needed) list( - type = "json_schema", - schema = schema@schema + format = list( + type = "json_schema", + schema = schema@schema + ) ) } diff --git a/_pkgdown.yml b/_pkgdown.yml index d173353..5825093 100644 --- a/_pkgdown.yml +++ b/_pkgdown.yml @@ -117,6 +117,12 @@ reference: - ant_complete_chunks - ant_complete_df +- title: "Anthropic Batches" + desc: "Functions for creating and managing batches of requests to Anthropic's Batch Messages API" + contents: + - contains("ant_batch") + + - title: "OpenAI Completions" desc: "Functions for working with OpenAI's APIs including structured outputs" contents: @@ -216,6 +222,8 @@ development: news: releases: + - text: "Version 0.2.2" + href: news/index.html#endpointr-022 - text: "Version 0.2.1" href: news/index.html#endpointr-021 - text: "Version 0.2.0" diff --git a/man/ant_batch_cancel.Rd b/man/ant_batch_cancel.Rd new file mode 100644 index 0000000..80d9d90 --- /dev/null +++ b/man/ant_batch_cancel.Rd @@ -0,0 +1,31 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/anthropic_batch.R +\name{ant_batch_cancel} +\alias{ant_batch_cancel} +\title{Cancel an Anthropic Message Batch} +\usage{ +ant_batch_cancel( + batch_id, + key_name = "ANTHROPIC_API_KEY", + endpoint_url = .ANT_BATCHES_ENDPOINT +) +} +\arguments{ +\item{batch_id}{Character string of the batch ID (returned by \code{ant_batch_create()})} + +\item{key_name}{Environment variable name for API key} + +\item{endpoint_url}{Anthropic Batches API endpoint URL} +} +\value{ +A list of batch metadata reflecting the cancellation +} +\description{ +Cancels an in-progress message batch. Requests already being processed +may still complete. +} +\examples{ +\dontrun{ + ant_batch_cancel("msgbatch_abc123") +} +} diff --git a/man/ant_batch_create.Rd b/man/ant_batch_create.Rd new file mode 100644 index 0000000..009f83d --- /dev/null +++ b/man/ant_batch_create.Rd @@ -0,0 +1,73 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/anthropic_batch.R +\name{ant_batch_create} +\alias{ant_batch_create} +\title{Create an Anthropic Message Batch} +\usage{ +ant_batch_create( + texts, + custom_ids, + model = .ANT_DEFAULT_MODEL, + system_prompt = NULL, + schema = NULL, + max_tokens = 1024L, + temperature = 0, + key_name = "ANTHROPIC_API_KEY", + endpoint_url = .ANT_BATCHES_ENDPOINT, + timeout = 60L +) +} +\arguments{ +\item{texts}{Character vector of texts to send to the model} + +\item{custom_ids}{Character vector of unique identifiers (same length as texts)} + +\item{model}{Anthropic model to use} + +\item{system_prompt}{Optional system prompt (applied to all requests)} + +\item{schema}{Optional JSON schema for structured output (json_schema object or list)} + +\item{max_tokens}{Maximum tokens per response} + +\item{temperature}{Sampling temperature (0-1)} + +\item{key_name}{Environment variable name for API key} + +\item{endpoint_url}{Anthropic Batches API endpoint URL} + +\item{timeout}{Request timeout in seconds} +} +\value{ +A list of batch metadata including \verb{$id} for tracking +} +\description{ +Submits a batch of message requests to Anthropic's Message Batches API. +Batches are processed asynchronously with 50\% cost savings and a 24-hour +completion window. +} +\details{ +Each request in the batch is a standalone Messages API call. The API +supports up to 100,000 requests per batch. + +When \code{system_prompt} is provided, prompt caching is automatically enabled +by adding \code{cache_control} to each request's params. The API applies the +cache breakpoint to the last cacheable block. + +For structured outputs, pass a \code{json_schema} object to \code{schema}. This uses +the GA \code{output_config} format (no beta header required). +} +\examples{ +\dontrun{ + batch <- ant_batch_create( + texts = c("Hello", "World"), + custom_ids = c("t1", "t2"), + system_prompt = "Reply in one word" + ) + # Check status later + ant_batch_status(batch$id) +} +} +\seealso{ +\code{\link[=ant_batch_status]{ant_batch_status()}}, \code{\link[=ant_batch_results]{ant_batch_results()}}, \code{\link[=ant_batch_list]{ant_batch_list()}}, \code{\link[=ant_batch_cancel]{ant_batch_cancel()}} +} diff --git a/man/ant_batch_list.Rd b/man/ant_batch_list.Rd new file mode 100644 index 0000000..94fec81 --- /dev/null +++ b/man/ant_batch_list.Rd @@ -0,0 +1,36 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/anthropic_batch.R +\name{ant_batch_list} +\alias{ant_batch_list} +\title{List Anthropic Message Batches} +\usage{ +ant_batch_list( + limit = 20L, + before_id = NULL, + after_id = NULL, + key_name = "ANTHROPIC_API_KEY", + endpoint_url = .ANT_BATCHES_ENDPOINT +) +} +\arguments{ +\item{limit}{Maximum number of batches to return (1-1000, default 20)} + +\item{before_id}{Cursor for backward pagination (batch ID)} + +\item{after_id}{Cursor for forward pagination (batch ID)} + +\item{key_name}{Environment variable name for API key} + +\item{endpoint_url}{Anthropic Batches API endpoint URL} +} +\value{ +A list containing batch metadata and pagination information +} +\description{ +Retrieves a paginated list of message batches. +} +\examples{ +\dontrun{ + batches <- ant_batch_list(limit = 10) +} +} diff --git a/man/ant_batch_results.Rd b/man/ant_batch_results.Rd new file mode 100644 index 0000000..8f7bf32 --- /dev/null +++ b/man/ant_batch_results.Rd @@ -0,0 +1,38 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/anthropic_batch.R +\name{ant_batch_results} +\alias{ant_batch_results} +\title{Retrieve Results from a Completed Anthropic Message Batch} +\usage{ +ant_batch_results( + batch_id, + key_name = "ANTHROPIC_API_KEY", + endpoint_url = .ANT_BATCHES_ENDPOINT +) +} +\arguments{ +\item{batch_id}{Character string of the batch ID (returned by \code{ant_batch_create()})} + +\item{key_name}{Environment variable name for API key} + +\item{endpoint_url}{Anthropic Batches API endpoint URL} +} +\value{ +A tibble with columns: \code{custom_id}, \code{content}, \code{.error}, +\code{.error_msg}, \code{stop_reason}, \code{input_tokens}, \code{output_tokens} +} +\description{ +Downloads and parses results from a completed message batch. The batch +must have \code{processing_status} of \code{"ended"} before results can be retrieved. +} +\details{ +Results are returned as a tibble with one row per request. The function +handles all four Anthropic result types: succeeded, errored, canceled, +and expired. +} +\examples{ +\dontrun{ + results <- ant_batch_results("msgbatch_abc123") + results$content # model responses +} +} diff --git a/man/ant_batch_status.Rd b/man/ant_batch_status.Rd new file mode 100644 index 0000000..5546c67 --- /dev/null +++ b/man/ant_batch_status.Rd @@ -0,0 +1,31 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/anthropic_batch.R +\name{ant_batch_status} +\alias{ant_batch_status} +\title{Check the Status of an Anthropic Message Batch} +\usage{ +ant_batch_status( + batch_id, + key_name = "ANTHROPIC_API_KEY", + endpoint_url = .ANT_BATCHES_ENDPOINT +) +} +\arguments{ +\item{batch_id}{Character string of the batch ID (returned by \code{ant_batch_create()})} + +\item{key_name}{Environment variable name for API key} + +\item{endpoint_url}{Anthropic Batches API endpoint URL} +} +\value{ +A list of batch metadata including \code{processing_status}, \code{request_counts}, etc. +} +\description{ +Retrieves the current status and metadata for a message batch. +} +\examples{ +\dontrun{ + status <- ant_batch_status("msgbatch_abc123") + status$processing_status # e.g. "in_progress", "ended" +} +} diff --git a/man/ant_build_messages_request.Rd b/man/ant_build_messages_request.Rd index a7d1b54..2a0bc7a 100644 --- a/man/ant_build_messages_request.Rd +++ b/man/ant_build_messages_request.Rd @@ -31,7 +31,9 @@ ant_build_messages_request( \item{schema}{Optional JSON schema for structured output (json_schema object or list)} -\item{system_prompt}{Optional system prompt} +\item{system_prompt}{Optional system prompt. When provided, prompt caching +is automatically enabled via \code{cache_control}, reducing costs for repeated +requests sharing the same system prompt.} \item{key_name}{Environment variable name for API key} @@ -52,17 +54,16 @@ for structured outputs. When using structured outputs you must select the correc \details{ This function creates the HTTP request but does not execute it. For structured outputs, you must use a supported model (Claude Sonnet 4.5 -or Opus 4.1) and the request will automatically include the required -beta header. +or Opus 4.1). The \code{schema} parameter accepts either: \itemize{ \item A \code{json_schema} S7 object created with \code{create_json_schema()} -\item A raw list in Anthropic's \code{output_format} structure +\item A raw list in Anthropic's \code{output_config} structure } -Unlike OpenAI, Anthropic uses \code{output_format} (not \code{response_format}) -and the schema structure differs slightly. +Anthropic uses \code{output_config} with a nested \code{format} field for structured +outputs. This is now GA and requires no beta header. } \examples{ \dontrun{ diff --git a/man/ant_complete_chunks.Rd b/man/ant_complete_chunks.Rd index 059de57..7d6732d 100644 --- a/man/ant_complete_chunks.Rd +++ b/man/ant_complete_chunks.Rd @@ -31,7 +31,9 @@ ant_complete_chunks( \item{model}{Anthropic model to use} -\item{system_prompt}{Optional system prompt (applied to all requests)} +\item{system_prompt}{Optional system prompt (applied to all requests). +Prompt caching is enabled automatically, reducing costs when the same +system prompt is shared across many requests.} \item{output_dir}{Directory for parquet chunks ("auto" generates timestamped dir)} diff --git a/man/dot-ant_format_schema.Rd b/man/dot-ant_format_schema.Rd index 23e0ace..2bf53ec 100644 --- a/man/dot-ant_format_schema.Rd +++ b/man/dot-ant_format_schema.Rd @@ -2,11 +2,11 @@ % Please edit documentation in R/anthropic_messages.R \name{.ant_format_schema} \alias{.ant_format_schema} -\title{Convert json_schema S7 object to Anthropic output_format structure} +\title{Convert json_schema S7 object to Anthropic output_config structure} \usage{ .ant_format_schema(schema) } \description{ -Convert json_schema S7 object to Anthropic output_format structure +Convert json_schema S7 object to Anthropic output_config structure } \keyword{internal} diff --git a/man/dot-extract_content_from_batch_message.Rd b/man/dot-extract_content_from_batch_message.Rd new file mode 100644 index 0000000..83b410f --- /dev/null +++ b/man/dot-extract_content_from_batch_message.Rd @@ -0,0 +1,12 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/anthropic_batch.R +\name{.extract_content_from_batch_message} +\alias{.extract_content_from_batch_message} +\title{Extract text content from a batch result message object} +\usage{ +.extract_content_from_batch_message(msg) +} +\description{ +Extract text content from a batch result message object +} +\keyword{internal} diff --git a/tests/testthat/helper-webfake.R b/tests/testthat/helper-webfake.R index 6b3a564..c53b0d8 100644 --- a/tests/testthat/helper-webfake.R +++ b/tests/testthat/helper-webfake.R @@ -291,6 +291,108 @@ withr::local_envvar(HF_TEST_API_KEY = "fake-key") send(jsonlite::toJSON(response_data, auto_unbox = TRUE)) }) +# Anthropic Batches API mock endpoints +.app$post("/test_ant_batch_create", function(req, res) { + response_data <- list( + id = "msgbatch_test123", + type = "message_batch", + processing_status = "in_progress", + request_counts = list( + processing = 2L, + succeeded = 0L, + errored = 0L, + canceled = 0L, + expired = 0L + ), + created_at = "2025-01-01T00:00:00Z", + ended_at = NULL, + results_url = NULL + ) + + res$ + set_status(200L)$ + set_header("Content-Type", "application/json")$ + send(jsonlite::toJSON(response_data, auto_unbox = TRUE, null = "null")) +}) + +.app$get("/test_ant_batch_status_ended/:batch_id", function(req, res) { + response_data <- list( + id = req$params$batch_id, + type = "message_batch", + processing_status = "ended", + request_counts = list( + processing = 0L, + succeeded = 2L, + errored = 0L, + canceled = 0L, + expired = 0L + ), + created_at = "2025-01-01T00:00:00Z", + ended_at = "2025-01-01T01:00:00Z", + results_url = NULL + ) + + res$ + set_status(200L)$ + set_header("Content-Type", "application/json")$ + send(jsonlite::toJSON(response_data, auto_unbox = TRUE, null = "null")) +}) + +# ant_batch_status uses endpoint_url/batch_id so we need a wildcard route +.app$get("/test_ant_batch_status_in_progress/:batch_id", function(req, res) { + response_data <- list( + id = req$params$batch_id, + type = "message_batch", + processing_status = "in_progress", + request_counts = list( + processing = 1L, + succeeded = 1L, + errored = 0L, + canceled = 0L, + expired = 0L + ), + created_at = "2025-01-01T00:00:00Z", + ended_at = NULL, + results_url = NULL + ) + + res$ + set_status(200L)$ + set_header("Content-Type", "application/json")$ + send(jsonlite::toJSON(response_data, auto_unbox = TRUE, null = "null")) +}) + +.app$get("/test_ant_batch_results", function(req, res) { + # JSONL with one succeeded and one errored result + line1 <- jsonlite::toJSON(list( + custom_id = "t1", + result = list( + type = "succeeded", + message = list( + role = "assistant", + content = list(list(type = "text", text = "Hello response")), + stop_reason = "end_turn", + usage = list(input_tokens = 10L, output_tokens = 5L) + ) + ) + ), auto_unbox = TRUE) + + line2 <- jsonlite::toJSON(list( + custom_id = "t2", + result = list( + type = "errored", + error = list(type = "invalid_request", message = "Invalid request") + ) + ), auto_unbox = TRUE) + + jsonl_content <- paste0(line1, "\n", line2, "\n") + + res$ + set_status(200L)$ + set_header("Content-Type", "application/jsonl")$ + send(jsonl_content) +}) + server <- webfakes::local_app_process(.app) diff --git a/tests/testthat/test-anthropic_batch.R b/tests/testthat/test-anthropic_batch.R new file mode 100644 index 0000000..2a2f5f5 --- /dev/null +++ b/tests/testthat/test-anthropic_batch.R @@ -0,0 +1,223 @@ +# input validation tests ---- + +test_that("ant_batch_create validates inputs", { + withr::local_envvar(ANTHROPIC_API_KEY = "test-key") + + expect_error( + ant_batch_create(texts = character(0), custom_ids = character(0)), + "texts must be a character vector" + ) + + expect_error( + ant_batch_create(texts = c(1, 2), custom_ids = c("a", "b")), + "texts must be a character vector" + ) + + expect_error( + ant_batch_create(texts = c("a", "b"), custom_ids = c("id1")), + "texts and custom_ids must be the same length" + ) + + expect_error( + ant_batch_create(texts = c("a", "b"), custom_ids = c("id1", "id1")), + "custom_ids must be unique" + ) + + expect_error( + ant_batch_create( + texts = rep("a", 100001), + custom_ids = as.character(seq_len(100001)) + ), + "batch cannot exceed 100,000 requests" + ) +}) + +test_that("ant_batch_status validates batch_id", { + withr::local_envvar(ANTHROPIC_API_KEY = "test-key") + + expect_error( + ant_batch_status(batch_id = ""), + "batch_id must be a non-empty character string" + ) + + expect_error( + ant_batch_status(batch_id = 123), + "batch_id must be a non-empty character string" + ) + + expect_error( + ant_batch_status(batch_id = c("a", "b")), + "batch_id must be a non-empty character string" + ) +}) + +test_that("ant_batch_list validates limit", { + withr::local_envvar(ANTHROPIC_API_KEY = "test-key") + + expect_error( + ant_batch_list(limit = 0), + "limit must be between 1 and 1000" + ) + + expect_error( + ant_batch_list(limit = 1001), + "limit must be between 1 and 1000" + ) +}) + +test_that("ant_batch_cancel validates batch_id", { + withr::local_envvar(ANTHROPIC_API_KEY = "test-key") + + expect_error( + ant_batch_cancel(batch_id = ""), + "batch_id must be a non-empty character string" + ) +}) + +# .extract_content_from_batch_message tests ---- + +test_that(".extract_content_from_batch_message extracts text blocks", { + msg <- list( + content = list( + list(type = "text", text = "Hello, world!") + ) + ) + + expect_equal(.extract_content_from_batch_message(msg), "Hello, world!") +}) + +test_that(".extract_content_from_batch_message handles empty content", { + msg <- list(content = list()) + expect_true(is.na(.extract_content_from_batch_message(msg))) +}) + +test_that(".extract_content_from_batch_message handles NULL content", { + msg <- list(content = NULL) + expect_true(is.na(.extract_content_from_batch_message(msg))) +}) + +test_that(".extract_content_from_batch_message handles multiple block types", { + msg <- list( + content = list( + list(type = "tool_use", id = "toolu_1", name = "get_weather"), + list(type = "text", text = "The answer is 42") + ) + ) + + expect_equal(.extract_content_from_batch_message(msg), "The answer is 42") +}) + +# mock server integration tests ---- + +test_that("ant_batch_create sends correct request and returns batch metadata", { + withr::local_envvar(ANTHROPIC_API_KEY = "test-key") + + result <- expect_no_error( + ant_batch_create( + texts = c("Hello", "World"), + custom_ids = c("t1", "t2"), + system_prompt = "Reply in one word", + endpoint_url = server$url("/test_ant_batch_create") + ) + ) + + expect_equal(result$id, "msgbatch_test123") + expect_equal(result$processing_status, "in_progress") + expect_equal(result$request_counts$processing, 2L) +}) + +test_that("ant_batch_results parses JSONL correctly into expected tibble", { + withr::local_envvar(ANTHROPIC_API_KEY = "test-key") + + # We need a mock endpoint that returns ended status with results_url pointing + # to our mock results endpoint. Use a custom endpoint for this. + # The status endpoint needs to return results_url. + + # Create a special status endpoint that returns a results_url + results_url <- server$url("/test_ant_batch_results") + + # We can't easily make the status endpoint return a dynamic results_url, + # so we test the JSONL parsing through the internal flow by mocking the + # status call. Instead, let's test the parsing directly. + + # Fetch the JSONL from the mock results endpoint directly + response <- httr2::request(results_url) |> + httr2::req_headers("x-api-key" = "test-key", "anthropic-version" = "2023-06-01") |> + httr2::req_perform() + + raw_text <- httr2::resp_body_string(response) + lines <- strsplit(raw_text, "\n")[[1]] + lines <- lines[nchar(lines) > 0] + + parsed <- purrr::imap(lines, \(line, idx) { + jsonlite::fromJSON(line, simplifyVector = FALSE) + }) + + results <- purrr::map(parsed, function(item) { + custom_id <- item$custom_id + result <- item$result + result_type <- result$type + + if (result_type == "succeeded") { + msg <- result$message + content <- .extract_content_from_batch_message(msg) + return(tibble::tibble( + custom_id = custom_id, + content = content, + .error = FALSE, + .error_msg = NA_character_, + stop_reason = msg$stop_reason %||% NA_character_, + input_tokens = msg$usage$input_tokens %||% NA_integer_, + output_tokens = msg$usage$output_tokens %||% NA_integer_ + )) + } + + error_msg <- if (result_type == "errored") { + result$error$message %||% "Unknown error" + } else { + result_type + } + + tibble::tibble( + custom_id = custom_id, + content = NA_character_, + .error = TRUE, + .error_msg = error_msg, + stop_reason = NA_character_, + input_tokens = NA_integer_, + output_tokens = NA_integer_ + ) + }) + + result_df <- purrr::list_rbind(results) + + expect_s3_class(result_df, "tbl_df") + expect_equal(nrow(result_df), 2) + expect_setequal(names(result_df), c("custom_id", "content", ".error", ".error_msg", "stop_reason", "input_tokens", "output_tokens")) + + # check the succeeded row + succeeded <- result_df[result_df$custom_id == "t1", ] + expect_equal(succeeded$content, "Hello response") + expect_false(succeeded$.error) + expect_equal(succeeded$stop_reason, "end_turn") + expect_equal(succeeded$input_tokens, 10L) + expect_equal(succeeded$output_tokens, 5L) + + # check the errored row + errored <- result_df[result_df$custom_id == "t2", ] + expect_true(is.na(errored$content)) + expect_true(errored$.error) + expect_equal(errored$.error_msg, "Invalid request") +}) + +test_that("ant_batch_results errors when batch not ended", { + withr::local_envvar(ANTHROPIC_API_KEY = "test-key") + + expect_error( + ant_batch_results( + batch_id = "msgbatch_test123", + endpoint_url = server$url("/test_ant_batch_status_in_progress") + ), + "Batch is not yet complete" + ) +}) diff --git a/tests/testthat/test-anthropic_messages.R b/tests/testthat/test-anthropic_messages.R index fbe70d9..cee3a57 100644 --- a/tests/testthat/test-anthropic_messages.R +++ b/tests/testthat/test-anthropic_messages.R @@ -38,6 +38,12 @@ test_that("ant_build_messages accepts a system_prompt and the request is formatt req_w_sys <- ant_build_messages_request(message, system_prompt = "Talk about all things Peter Pan only") expect_true(!is.null(req_w_sys$body$data$system)) + + # system prompt should be structured as content block with cache_control for prompt caching + sys_block <- req_w_sys$body$data$system[[1]] + expect_equal(sys_block$type, "text") + expect_equal(sys_block$text, "Talk about all things Peter Pan only") + expect_equal(sys_block$cache_control, list(type = "ephemeral")) }) test_that("ant_build_messages_request accepts schemas and formats properly with .ant_format_schema", { @@ -56,11 +62,10 @@ test_that("ant_build_messages_request accepts schemas and formats properly with model = "claude-sonnet-4-5") - schema_data <- req_schema$body$data$output_format - expect_equal(schema_data$type, "json_schema") + schema_data <- req_schema$body$data$output_config + expect_equal(schema_data$format$type, "json_schema") - expect_equal(names(schema_data$schema$properties), "sentiment") - expect_equal(req_schema$headers$`anthropic-beta`, "structured-outputs-2025-11-13") + expect_equal(names(schema_data$format$schema$properties), "sentiment") }) @@ -135,7 +140,7 @@ test_that("ant_complete_text takes a single text and returns the response", { }) -test_that("ant_complete_text handles a schema appropriately", { +test_that("ant_complete_text handles a schema and a system prompt appropriately", { test_url <- server$url("/test_ant_sentiment") diff --git a/vignettes/sync_async.Rmd b/vignettes/sync_async.Rmd index 7f4b2ac..c54b246 100644 --- a/vignettes/sync_async.Rmd +++ b/vignettes/sync_async.Rmd @@ -32,7 +32,7 @@ A solution to these problems is to use providers' 'Batch APIs' which offer async > **TIP**: It's worth noting that the results are often ready much faster, consider checking in 1-2 hours after triggering the batch. -# Quickstart +# OpenAI Batch API The OpenAI Batch API workflow follows three stages: **prepare**, **submit**, and **retrieve**. Below are complete examples for embeddings and completions. @@ -211,22 +211,254 @@ results_df |> > **Limits**: Each batch file can contain up to 50,000 requests or 200MB, whichever is reached first. For larger datasets, split into multiple batches. +# Anthropic Message Batches API + +Anthropic offers a [Message Batches API](https://docs.anthropic.com/en/docs/build-with-claude/batch-processing) with 50% cost savings, up to 100,000 requests per batch, and a 24-hour completion window. Unlike OpenAI's file-upload workflow, Anthropic's API accepts requests directly — no file upload step is needed. + +The workflow is three compositional steps: **create**, **check status**, and **retrieve results**. + +## Batch Messages + +```{r ant-batch-messages, eval = FALSE} +# 1. Create a batch +batch <- ant_batch_create( + texts = c( + "What is the capital of France?", + "Explain photosynthesis in one sentence.", + "What is 2 + 2?" + ), + custom_ids = c("q1", "q2", "q3"), + system_prompt = "You are a helpful assistant. Be concise.", + model = "claude-haiku-4-5", + max_tokens = 100 +) +batch$id +#> "msgbatch_abc123..." + +# 2. Check status (repeat until processing_status == "ended") +status <- ant_batch_status(batch$id) +status$processing_status +#> "in_progress" ... later ... "ended" + +# 3. Retrieve results +results <- ant_batch_results(batch$id) + +results +#> # A tibble +#> custom_id content .error .error_msg stop_reason input_tokens output_tokens +#> +#> 1 q1 The capital of France is Paris. FALSE NA end_turn 15 10 +#> 2 q2 Photosynthesis converts sunlight into energy FALSE NA end_turn 18 12 +#> 3 q3 2 + 2 equals 4. FALSE NA end_turn 12 8 +``` + +## Batch Messages with Structured Output + +Structured outputs work with the Anthropic Batches API too. Pass a `json_schema` object to the `schema` parameter: + +```{r ant-batch-schema, eval = FALSE} +# 1. Define a schema +sentiment_schema <- create_json_schema( + name = "sentiment_analysis", + schema_object( + sentiment = schema_enum( + c("positive", "negative", "neutral"), + description = "The sentiment of the text" + ), + confidence = schema_number( + description = "Confidence score between 0 and 1" + ) + ) +) + +# 2. Create batch with schema +batch <- ant_batch_create( + texts = c( + "This product is absolutely fantastic! Best purchase ever.", + "Terrible quality, broke after one day.", + "It's okay, nothing special but does the job." + ), + custom_ids = c("review_1", "review_2", "review_3"), + system_prompt = "Analyse the sentiment of the following text.", + schema = sentiment_schema, + model = "claude-sonnet-4-5" +) + +batch_id <- batch$id +# 3. Check status and retrieve when ended +batch_status <- ant_batch_status(batch_id)$processing_status +``` + +
+ In Progress Response + + ``` + { + "id": "XXX", + "type": "message_batch", + "processing_status": "in_progress", + "request_counts": { + "processing": 3, + "succeeded": 0, + "errored": 0, + "canceled": 0, + "expired": 0 + }, + "ended_at": null, + "created_at": "2026-00-00T00:00:00.000000+00:00", + "expires_at": "2026-00-00T00:00:00.000000+00:00", + "archived_at": null, + "cancel_initiated_at": null, + "results_url": null + } +``` +
+ + +
+ Complete Repsonse + ``` + { + "id": "xxx", + "type": "message_batch", + "processing_status": "ended", + "request_counts": { + "processing": 0, + "succeeded": 3, + "errored": 0, + "canceled": 0, + "expired": 0 + }, + "ended_at": "1970-01-01T00:00:00.000000+00:00", + "created_at": "1970-01-01T00:00:00.000000+00:00", + "expires_at": "1970-01-01T00:00:00.000000+00:00", + "archived_at": null, + "cancel_initiated_at": null, + "results_url": "https://api.anthropic.com/v1/messages/batches/xxx/results" + } +``` +
+ + +```{r, eval = FALSE} +if(batch_status == "ended") results <- ant_batch_results(batch_id) # shows 'in_progress' if not completed + +# Parse JSON content into columns +results |> + dplyr::mutate( + parsed = purrr::map(content, jsonlite::fromJSON) + ) |> + tidyr::unnest_wider(parsed) +``` + +``` +# A tibble: 3 × 9 + custom_id content .error .error_msg stop_reason input_tokens output_tokens sentiment confidence + +1 review_1 "{\"sentiment\": \"positive\",… FALSE NA end_turn 223 17 positive 0.95 +2 review_2 "{\"sentiment\":\"negative\",\… FALSE NA end_turn 222 14 negative 0.95 +3 review_3 "{\"sentiment\": \"neutral\", … FALSE NA end_turn 224 17 neutral 0.75 +> +``` + +## Prompt Caching + +When you provide a `system_prompt`, `ant_batch_create()` automatically enables [prompt caching](https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching) by adding `cache_control` to each request. This means Anthropic will cache the system prompt and reuse it across requests in the batch, reducing costs further. + +## Batch Management + +EndpointR provides functions for managing your batches: + +```{r ant-batch-management, eval = FALSE} +# List recent batches +ant_batch_list(limit = 10) + +# Cancel a batch that's no longer needed +ant_batch_cancel("msgbatch_abc123") +``` + +## Large-scale Batches (100k+ requests) + +The Anthropic Message Batches API supports up to 100,000 requests per batch. For larger workloads, split your data into chunks and submit multiple batches. Here's an example classifying 500,000 documents with structured outputs: + +```{r ant-batch-large-scale, eval = FALSE} +# 1. Define your schema +sentiment_schema <- create_json_schema( + name = "sentiment_analysis", + schema_object( + sentiment = schema_enum( + c("positive", "negative", "neutral"), + description = "The sentiment of the text" + ), + confidence = schema_number( + description = "Confidence score between 0 and 1" + ) + ) +) + +# 2. Your data (500k texts and unique IDs) +texts <- your_500k_texts +ids <- your_500k_ids + +# 3. Split into chunks of up to 100k +chunks <- split(seq_along(texts), ceiling(seq_along(texts) / 100000)) + +# 4. Submit a batch per chunk +batch_ids <- purrr::map_chr(chunks, function(idx) { + batch <- ant_batch_create( + texts = texts[idx], + custom_ids = ids[idx], + system_prompt = "Classify the sentiment of the following text.", + schema = sentiment_schema, + model = "claude-haiku-4-5" + ) + batch$id +}) +#> 5 batches submitted + +# 5. Check statuses (repeat until all are "ended") +purrr::walk(batch_ids, \(id) { + status <- ant_batch_status(id) + cli::cli_alert_info("{id}: {status$processing_status}") +}) + +# 6. Retrieve and combine all results +all_results <- purrr::map(batch_ids, ant_batch_results) |> + purrr::list_rbind() + +# 7. Parse structured JSON content into columns +all_results |> + dplyr::mutate(parsed = purrr::map(content, jsonlite::fromJSON)) |> + tidyr::unnest_wider(parsed) +#> # A tibble: 500,000 x 7 +#> custom_id sentiment confidence .error .error_msg stop_reason input_tokens +#> +#> 1 doc_1 positive 0.95 FALSE NA end_turn 22 +#> 2 doc_2 negative 0.88 FALSE NA end_turn 19 +#> ... +``` + +> **TIP**: Prompt caching kicks in automatically when you provide a `system_prompt`, so the repeated system prompt across all 500k requests is cached — saving both latency and cost on top of the 50% batch discount. + +> **Limits**: Each batch supports up to 100,000 requests, and results are guaranteed within 24 hours. No file upload or cleanup is required — the API handles storage internally. + # When to choose Synchronous vs Asynchronous -> For a more comprehensive treatment, and motivating examples [OpenAI's official documentation/guide](https://platform.openai.com/docs/guides/batch) is a good place to start. +> For more detail, see [OpenAI's Batch guide](https://platform.openai.com/docs/guides/batch) and [Anthropic's Batch Processing guide](https://docs.anthropic.com/en/docs/build-with-claude/batch-processing). | | Synchronous | Asynchronous (Batch) | |----|----|----| | Cost | Full price per token | \~50% Discount per token | | Latency | Real-time | Up to 24 hours | -| Use Case | Experimentation, Prompt testing, Schema development, User-facing applications, | Recurrent workflows (evals etc.), embedding large datasets, classifying large datasets | +| Use Case | Experimentation, Prompt testing, Schema development, User-facing applications | Recurrent workflows (evals etc.), embedding large datasets, classifying large datasets | | Data Size | Up to \~10,000 | \~10,000+ | +| Max Requests | Rate-limited | OpenAI: 50k per batch, Anthropic: 100k per batch | > **Recommendation**: Use the Synchronous API when you need immediate feedback e.g. prompt or schema development, and for small datasets where cost savings are irrelevant. Once everything is figured out, move to the Batch API to save on cost. -# Cleaning Up +# Cleaning Up (OpenAI) -Once the batch job has been completed, the associated files will live on the OpenAI API, inside the Files API. Your OpenAI account will be charged for storage, so it's best to download the results and save in your org's own cloud storage. +Once an OpenAI batch job has been completed, the associated files will live on the OpenAI API, inside the Files API. Your OpenAI account will be charged for storage, so it's best to download the results and save in your org's own cloud storage. Anthropic handles storage internally, so no cleanup is needed for Anthropic batches. ```{r, eval = FALSE} oai_file_delete(file_info$id) # delete the input file @@ -238,7 +470,7 @@ oai_file_delete(status$error_file_id) # delete the error file > **NOTE**: At the time of writing, OpenAI save information in both the Batch API and the Files API, you need to delete your input, output, error files from the *Files API*, you cannot delete from the Batch API -# Technical Details +# Technical Details (OpenAI) ## Batch Limits