From e75e8e910c76b5200fe160f9333c711861a4f331 Mon Sep 17 00:00:00 2001 From: SoulPancake Date: Wed, 26 Nov 2025 17:58:01 +0530 Subject: [PATCH 1/3] feat(java-sdk): use common streaming utils --- config/clients/java/config.overrides.json | 12 ++ .../native/BaseStreamingApi.mustache | 174 ++++++++++++++++++ .../libraries/native/StreamResult.mustache | 111 +++++++++++ .../native/StreamedListObjectsApi.mustache | 147 +++++++++++++++ 4 files changed, 444 insertions(+) create mode 100644 config/clients/java/template/libraries/native/BaseStreamingApi.mustache create mode 100644 config/clients/java/template/libraries/native/StreamResult.mustache create mode 100644 config/clients/java/template/libraries/native/StreamedListObjectsApi.mustache diff --git a/config/clients/java/config.overrides.json b/config/clients/java/config.overrides.json index cc987cde..edcd0458 100644 --- a/config/clients/java/config.overrides.json +++ b/config/clients/java/config.overrides.json @@ -37,6 +37,18 @@ "src/main/constants/FgaConstants.mustache": { "destinationFilename": "src/main/java/dev/openfga/sdk/constants/FgaConstants.java", "templateType": "SupportingFiles" + }, + "libraries/native/StreamResult.mustache": { + "destinationFilename": "src/main/java/dev/openfga/sdk/api/model/StreamResult.java", + "templateType": "SupportingFiles" + }, + "libraries/native/BaseStreamingApi.mustache": { + "destinationFilename": "src/main/java/dev/openfga/sdk/api/BaseStreamingApi.java", + "templateType": "SupportingFiles" + }, + "libraries/native/StreamedListObjectsApi.mustache": { + "destinationFilename": "src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java", + "templateType": "SupportingFiles" } } } diff --git a/config/clients/java/template/libraries/native/BaseStreamingApi.mustache b/config/clients/java/template/libraries/native/BaseStreamingApi.mustache new file mode 100644 index 00000000..87e16ad1 --- /dev/null +++ b/config/clients/java/template/libraries/native/BaseStreamingApi.mustache @@ -0,0 +1,174 @@ +{{>licenseInfo}} +package {{apiPackage}}; + +import static {{basePackage}}.util.StringUtil.isNullOrWhitespace; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import {{clientPackage}}.ApiClient; +import {{configPackage}}.Configuration; +import {{errorsPackage}}.ApiException; +import {{errorsPackage}}.FgaInvalidParameterException; +import {{modelPackage}}.Status; +import {{modelPackage}}.StreamResult; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.stream.Stream; + +/** + * Base class for handling streaming API responses. + * This class provides generic streaming functionality that can be reused across + * different streaming endpoints by handling the common streaming parsing and error handling logic. + * + * @param The type of response objects in the stream + */ +public abstract class BaseStreamingApi { + protected final Configuration configuration; + protected final ApiClient apiClient; + protected final ObjectMapper objectMapper; + protected final TypeReference> streamResultTypeRef; + + /** + * Constructor for BaseStreamingApi + * + * @param configuration The API configuration + * @param apiClient The API client for making HTTP requests + * @param streamResultTypeRef TypeReference for deserializing StreamResult + */ + protected BaseStreamingApi( + Configuration configuration, + ApiClient apiClient, + TypeReference> streamResultTypeRef) { + this.configuration = configuration; + this.apiClient = apiClient; + this.objectMapper = apiClient.getObjectMapper(); + this.streamResultTypeRef = streamResultTypeRef; + } + + /** + * Process a streaming response asynchronously. + * Each line in the response is parsed and delivered to the consumer callback. + * + * @param request The HTTP request to execute + * @param consumer Callback to handle each response object (invoked asynchronously) + * @param errorConsumer Optional callback to handle errors during streaming + * @return CompletableFuture that completes when streaming finishes + */ + protected CompletableFuture processStreamingResponse( + HttpRequest request, Consumer consumer, Consumer errorConsumer) { + + // Use async HTTP client with streaming body handler + // ofLines() provides line-by-line streaming + return apiClient + .getHttpClient() + .sendAsync(request, HttpResponse.BodyHandlers.ofLines()) + .thenCompose(response -> { + // Check response status + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + ApiException apiException = + new ApiException(statusCode, "API error: " + statusCode, response.headers(), null); + return CompletableFuture.failedFuture(apiException); + } + + // Process the stream - this runs on HttpClient's executor thread + try (Stream lines = response.body()) { + lines.forEach(line -> { + if (!isNullOrWhitespace(line)) { + processLine(line, consumer, errorConsumer); + } + }); + return CompletableFuture.completedFuture((Void) null); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + }) + .handle((result, throwable) -> { + if (throwable != null) { + // Unwrap CompletionException to get the original exception + Throwable actualException = throwable; + if (throwable instanceof java.util.concurrent.CompletionException + && throwable.getCause() != null) { + actualException = throwable.getCause(); + } + + if (errorConsumer != null) { + errorConsumer.accept(actualException); + } + // Re-throw to keep the CompletableFuture in failed state + if (actualException instanceof RuntimeException) { + throw (RuntimeException) actualException; + } + throw new RuntimeException(actualException); + } + return result; + }); + } + + /** + * Process a single line from the stream + * + * @param line The JSON line to process + * @param consumer Callback to handle the parsed result + * @param errorConsumer Optional callback to handle errors + */ + private void processLine(String line, Consumer consumer, Consumer errorConsumer) { + try { + // Parse the JSON line to extract the object + StreamResult streamResult = objectMapper.readValue(line, streamResultTypeRef); + + if (streamResult.getError() != null) { + // Handle error in stream + if (errorConsumer != null) { + Status error = streamResult.getError(); + String errorMessage = error.getMessage() != null + ? "Stream error: " + error.getMessage() + : "Stream error: " + (error.getCode() != null ? "code " + error.getCode() : "unknown"); + errorConsumer.accept(new ApiException(errorMessage)); + } + } else if (streamResult.getResult() != null) { + // Deliver the response object to the consumer + T result = streamResult.getResult(); + if (result != null) { + consumer.accept(result); + } + } + } catch (Exception e) { + if (errorConsumer != null) { + errorConsumer.accept(e); + } + } + } + + /** + * Build an HTTP request for the streaming endpoint + * + * @param method HTTP method (e.g., "POST") + * @param path The API path + * @param body The request body + * @param configuration The configuration to use + * @return HttpRequest ready to execute + * @throws ApiException if request building fails + * @throws FgaInvalidParameterException if parameters are invalid + */ + protected HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration) + throws ApiException, FgaInvalidParameterException { + try { + byte[] bodyBytes = objectMapper.writeValueAsBytes(body); + HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration); + + // Apply request interceptors if any + var interceptor = apiClient.getRequestInterceptor(); + if (interceptor != null) { + interceptor.accept(requestBuilder); + } + + return requestBuilder.build(); + } catch (Exception e) { + throw new ApiException(e); + } + } +} + diff --git a/config/clients/java/template/libraries/native/StreamResult.mustache b/config/clients/java/template/libraries/native/StreamResult.mustache new file mode 100644 index 00000000..f410d305 --- /dev/null +++ b/config/clients/java/template/libraries/native/StreamResult.mustache @@ -0,0 +1,111 @@ +{{>licenseInfo}} +package {{modelPackage}}; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.util.Objects; + +/** + * Generic wrapper for streaming results that can contain either a result or an error. + * This class is used to deserialize streaming responses where each line may contain + * either a successful result or an error status. + * + * @param The type of the result object + */ +@JsonPropertyOrder({ + StreamResult.JSON_PROPERTY_RESULT, + StreamResult.JSON_PROPERTY_ERROR +}) +public class StreamResult { + public static final String JSON_PROPERTY_RESULT = "result"; + private T result; + + public static final String JSON_PROPERTY_ERROR = "error"; + private Status error; + + public StreamResult() {} + + public StreamResult result(T result) { + this.result = result; + return this; + } + + /** + * Get result + * @return result + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public T getResult() { + return result; + } + + @JsonProperty(JSON_PROPERTY_RESULT) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setResult(T result) { + this.result = result; + } + + public StreamResult error(Status error) { + this.error = error; + return this; + } + + /** + * Get error + * @return error + **/ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public Status getError() { + return error; + } + + @JsonProperty(JSON_PROPERTY_ERROR) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setError(Status error) { + this.error = error; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StreamResult streamResult = (StreamResult) o; + return Objects.equals(this.result, streamResult.result) + && Objects.equals(this.error, streamResult.error); + } + + @Override + public int hashCode() { + return Objects.hash(result, error); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class StreamResult {\n"); + sb.append(" result: ").append(toIndentedString(result)).append("\n"); + sb.append(" error: ").append(toIndentedString(error)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } +} diff --git a/config/clients/java/template/libraries/native/StreamedListObjectsApi.mustache b/config/clients/java/template/libraries/native/StreamedListObjectsApi.mustache new file mode 100644 index 00000000..62c9a3f4 --- /dev/null +++ b/config/clients/java/template/libraries/native/StreamedListObjectsApi.mustache @@ -0,0 +1,147 @@ +{{>licenseInfo}} +package {{apiPackage}}; + +import static {{basePackage}}.util.Validation.assertParamExists; + +import com.fasterxml.jackson.core.type.TypeReference; +import {{clientPackage}}.ApiClient; +import {{configPackage}}.Configuration; +import {{configPackage}}.ConfigurationOverride; +import {{modelPackage}}.ListObjectsRequest; +import {{modelPackage}}.Status; +import {{modelPackage}}.StreamResult; +import {{modelPackage}}.StreamedListObjectsResponse; +import {{errorsPackage}}.ApiException; +import {{errorsPackage}}.FgaInvalidParameterException; +import {{basePackage}}.util.StringUtil; +import java.net.http.HttpRequest; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * API layer for handling streaming responses from the streamedListObjects endpoint. + * This class extends BaseStreamingApi to provide true asynchronous streaming with consumer callbacks + * using CompletableFuture and Java 11's HttpClient async streaming capabilities. + */ +public class StreamedListObjectsApi extends BaseStreamingApi { + + public StreamedListObjectsApi(Configuration configuration, ApiClient apiClient) { + super(configuration, apiClient, new TypeReference>() {}); + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Each streamed response is delivered to the consumer callback asynchronously as it arrives. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param storeId The store ID + * @param body The list objects request + * @param consumer Callback to handle each StreamedListObjectsResponse (invoked asynchronously) + * @return CompletableFuture that completes when streaming finishes + * @throws ApiException if the API call fails immediately + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture streamedListObjects( + String storeId, ListObjectsRequest body, Consumer consumer) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, consumer, null, this.configuration); + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Each streamed response is delivered to the consumer callback asynchronously as it arrives. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param storeId The store ID + * @param body The list objects request + * @param consumer Callback to handle each StreamedListObjectsResponse (invoked asynchronously) + * @param configurationOverride Configuration overrides (e.g., additional headers) + * @return CompletableFuture that completes when streaming finishes + * @throws ApiException if the API call fails immediately + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture streamedListObjects( + String storeId, + ListObjectsRequest body, + Consumer consumer, + ConfigurationOverride configurationOverride) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, consumer, null, this.configuration.override(configurationOverride)); + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Each streamed response is delivered to the consumer callback asynchronously as it arrives. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param storeId The store ID + * @param body The list objects request + * @param consumer Callback to handle each StreamedListObjectsResponse (invoked asynchronously) + * @param errorConsumer Optional callback to handle errors during streaming + * @return CompletableFuture that completes when streaming finishes or exceptionally on error + * @throws ApiException if the API call fails immediately + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture streamedListObjects( + String storeId, + ListObjectsRequest body, + Consumer consumer, + Consumer errorConsumer) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects(storeId, body, consumer, errorConsumer, this.configuration); + } + + /** + * Stream all objects of the given type that the user has a relation with. + * Each streamed response is delivered to the consumer callback asynchronously as it arrives. + * Returns a CompletableFuture that completes when streaming is finished. + * + * @param storeId The store ID + * @param body The list objects request + * @param consumer Callback to handle each StreamedListObjectsResponse (invoked asynchronously) + * @param errorConsumer Optional callback to handle errors during streaming + * @param configurationOverride Configuration overrides (e.g., additional headers) + * @return CompletableFuture that completes when streaming finishes or exceptionally on error + * @throws ApiException if the API call fails immediately + * @throws FgaInvalidParameterException if required parameters are missing + */ + public CompletableFuture streamedListObjects( + String storeId, + ListObjectsRequest body, + Consumer consumer, + Consumer errorConsumer, + ConfigurationOverride configurationOverride) + throws ApiException, FgaInvalidParameterException { + return streamedListObjects( + storeId, body, consumer, errorConsumer, this.configuration.override(configurationOverride)); + } + + /** + * Internal implementation that accepts a final Configuration to use for the request. + */ + private CompletableFuture streamedListObjects( + String storeId, + ListObjectsRequest body, + Consumer consumer, + Consumer errorConsumer, + Configuration configuration) + throws ApiException, FgaInvalidParameterException { + + assertParamExists(storeId, "storeId", "streamedListObjects"); + assertParamExists(body, "body", "streamedListObjects"); + + String path = "/stores/{store_id}/streamed-list-objects" + .replace("{store_id}", StringUtil.urlEncode(storeId.toString())); + + try { + HttpRequest request = buildHttpRequest("POST", path, body, configuration); + return processStreamingResponse(request, consumer, errorConsumer); + } catch (Exception e) { + if (errorConsumer != null) { + errorConsumer.accept(e); + } + return CompletableFuture.failedFuture(e); + } + } +} + From 67cfbcc953e575e8ea052ecb3e1a0b66e69bf817 Mon Sep 17 00:00:00 2001 From: SoulPancake Date: Wed, 26 Nov 2025 18:23:54 +0530 Subject: [PATCH 2/3] feat: use latest pkg ver --- config/clients/java/config.overrides.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/clients/java/config.overrides.json b/config/clients/java/config.overrides.json index edcd0458..e9c54c2c 100644 --- a/config/clients/java/config.overrides.json +++ b/config/clients/java/config.overrides.json @@ -3,7 +3,7 @@ "gitRepoId": "java-sdk", "artifactId": "openfga-sdk", "groupId": "dev.openfga", - "packageVersion": "0.9.2", + "packageVersion": "0.9.3", "basePackage": "dev.openfga.sdk", "apiPackage": "dev.openfga.sdk.api", "authPackage": "dev.openfga.sdk.api.auth", From 8dd46b70d0001a926cce645722ab71044bdfd00a Mon Sep 17 00:00:00 2001 From: SoulPancake Date: Wed, 26 Nov 2025 19:01:10 +0530 Subject: [PATCH 3/3] feat: address copilot comments --- .../template/libraries/native/StreamedListObjectsApi.mustache | 1 - 1 file changed, 1 deletion(-) diff --git a/config/clients/java/template/libraries/native/StreamedListObjectsApi.mustache b/config/clients/java/template/libraries/native/StreamedListObjectsApi.mustache index 62c9a3f4..f7f7621f 100644 --- a/config/clients/java/template/libraries/native/StreamedListObjectsApi.mustache +++ b/config/clients/java/template/libraries/native/StreamedListObjectsApi.mustache @@ -8,7 +8,6 @@ import {{clientPackage}}.ApiClient; import {{configPackage}}.Configuration; import {{configPackage}}.ConfigurationOverride; import {{modelPackage}}.ListObjectsRequest; -import {{modelPackage}}.Status; import {{modelPackage}}.StreamResult; import {{modelPackage}}.StreamedListObjectsResponse; import {{errorsPackage}}.ApiException;