Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion config/clients/java/config.overrides.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"gitRepoId": "java-sdk",
"artifactId": "openfga-sdk",
"groupId": "dev.openfga",
"packageVersion": "0.9.2",
"packageVersion": "0.9.3",
"basePackage": "dev.openfga.sdk",
"apiPackage": "dev.openfga.sdk.api",
"authPackage": "dev.openfga.sdk.api.auth",
Expand Down Expand Up @@ -37,6 +37,18 @@
"src/main/constants/FgaConstants.mustache": {
"destinationFilename": "src/main/java/dev/openfga/sdk/constants/FgaConstants.java",
"templateType": "SupportingFiles"
},
"libraries/native/StreamResult.mustache": {
"destinationFilename": "src/main/java/dev/openfga/sdk/api/model/StreamResult.java",
"templateType": "SupportingFiles"
},
"libraries/native/BaseStreamingApi.mustache": {
"destinationFilename": "src/main/java/dev/openfga/sdk/api/BaseStreamingApi.java",
"templateType": "SupportingFiles"
},
"libraries/native/StreamedListObjectsApi.mustache": {
"destinationFilename": "src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java",
"templateType": "SupportingFiles"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
{{>licenseInfo}}
package {{apiPackage}};

import static {{basePackage}}.util.StringUtil.isNullOrWhitespace;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import {{clientPackage}}.ApiClient;
import {{configPackage}}.Configuration;
import {{errorsPackage}}.ApiException;
import {{errorsPackage}}.FgaInvalidParameterException;
import {{modelPackage}}.Status;
import {{modelPackage}}.StreamResult;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Stream;

/**
* Base class for handling streaming API responses.
* This class provides generic streaming functionality that can be reused across
* different streaming endpoints by handling the common streaming parsing and error handling logic.
*
* @param <T> The type of response objects in the stream
*/
public abstract class BaseStreamingApi<T> {
protected final Configuration configuration;
protected final ApiClient apiClient;
protected final ObjectMapper objectMapper;
protected final TypeReference<StreamResult<T>> streamResultTypeRef;

/**
* Constructor for BaseStreamingApi
*
* @param configuration The API configuration
* @param apiClient The API client for making HTTP requests
* @param streamResultTypeRef TypeReference for deserializing StreamResult<T>
*/
protected BaseStreamingApi(
Configuration configuration,
ApiClient apiClient,
TypeReference<StreamResult<T>> streamResultTypeRef) {
this.configuration = configuration;
this.apiClient = apiClient;
this.objectMapper = apiClient.getObjectMapper();
this.streamResultTypeRef = streamResultTypeRef;
}

/**
* Process a streaming response asynchronously.
* Each line in the response is parsed and delivered to the consumer callback.
*
* @param request The HTTP request to execute
* @param consumer Callback to handle each response object (invoked asynchronously)
* @param errorConsumer Optional callback to handle errors during streaming
* @return CompletableFuture<Void> that completes when streaming finishes
*/
protected CompletableFuture<Void> processStreamingResponse(
HttpRequest request, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {

// Use async HTTP client with streaming body handler
// ofLines() provides line-by-line streaming
return apiClient
.getHttpClient()
.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
.thenCompose(response -> {
// Check response status
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
ApiException apiException =
new ApiException(statusCode, "API error: " + statusCode, response.headers(), null);
return CompletableFuture.failedFuture(apiException);
}

// Process the stream - this runs on HttpClient's executor thread
try (Stream<String> lines = response.body()) {
lines.forEach(line -> {
if (!isNullOrWhitespace(line)) {
processLine(line, consumer, errorConsumer);
}
});
return CompletableFuture.completedFuture((Void) null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
})
.handle((result, throwable) -> {
if (throwable != null) {
// Unwrap CompletionException to get the original exception
Throwable actualException = throwable;
if (throwable instanceof java.util.concurrent.CompletionException
&& throwable.getCause() != null) {
actualException = throwable.getCause();
}

if (errorConsumer != null) {
errorConsumer.accept(actualException);
}
// Re-throw to keep the CompletableFuture in failed state
if (actualException instanceof RuntimeException) {
throw (RuntimeException) actualException;
}
throw new RuntimeException(actualException);
}
return result;
});
}

/**
* Process a single line from the stream
*
* @param line The JSON line to process
* @param consumer Callback to handle the parsed result
* @param errorConsumer Optional callback to handle errors
*/
private void processLine(String line, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {
try {
// Parse the JSON line to extract the object
StreamResult<T> streamResult = objectMapper.readValue(line, streamResultTypeRef);

if (streamResult.getError() != null) {
// Handle error in stream
if (errorConsumer != null) {
Status error = streamResult.getError();
String errorMessage = error.getMessage() != null
? "Stream error: " + error.getMessage()
: "Stream error: " + (error.getCode() != null ? "code " + error.getCode() : "unknown");
errorConsumer.accept(new ApiException(errorMessage));
}
} else if (streamResult.getResult() != null) {
// Deliver the response object to the consumer
T result = streamResult.getResult();
if (result != null) {
consumer.accept(result);
}
}
} catch (Exception e) {
if (errorConsumer != null) {
errorConsumer.accept(e);
}
}
}

/**
* Build an HTTP request for the streaming endpoint
*
* @param method HTTP method (e.g., "POST")
* @param path The API path
* @param body The request body
* @param configuration The configuration to use
* @return HttpRequest ready to execute
* @throws ApiException if request building fails
* @throws FgaInvalidParameterException if parameters are invalid
*/
protected HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration)
throws ApiException, FgaInvalidParameterException {
try {
byte[] bodyBytes = objectMapper.writeValueAsBytes(body);
HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration);

// Apply request interceptors if any
var interceptor = apiClient.getRequestInterceptor();
if (interceptor != null) {
interceptor.accept(requestBuilder);
}

return requestBuilder.build();
} catch (Exception e) {
throw new ApiException(e);
}
}
}

111 changes: 111 additions & 0 deletions config/clients/java/template/libraries/native/StreamResult.mustache
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
{{>licenseInfo}}
package {{modelPackage}};

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import java.util.Objects;

/**
* Generic wrapper for streaming results that can contain either a result or an error.
* This class is used to deserialize streaming responses where each line may contain
* either a successful result or an error status.
*
* @param <T> The type of the result object
*/
@JsonPropertyOrder({
StreamResult.JSON_PROPERTY_RESULT,
StreamResult.JSON_PROPERTY_ERROR
})
public class StreamResult<T> {
public static final String JSON_PROPERTY_RESULT = "result";
private T result;

public static final String JSON_PROPERTY_ERROR = "error";
private Status error;

public StreamResult() {}

public StreamResult<T> result(T result) {
this.result = result;
return this;
}

/**
* Get result
* @return result
**/
@javax.annotation.Nullable
@JsonProperty(JSON_PROPERTY_RESULT)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public T getResult() {
return result;
}

@JsonProperty(JSON_PROPERTY_RESULT)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public void setResult(T result) {
this.result = result;
}

public StreamResult<T> error(Status error) {
this.error = error;
return this;
}

/**
* Get error
* @return error
**/
@javax.annotation.Nullable
@JsonProperty(JSON_PROPERTY_ERROR)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public Status getError() {
return error;
}

@JsonProperty(JSON_PROPERTY_ERROR)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public void setError(Status error) {
this.error = error;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StreamResult<?> streamResult = (StreamResult<?>) o;
return Objects.equals(this.result, streamResult.result)
&& Objects.equals(this.error, streamResult.error);
}

@Override
public int hashCode() {
return Objects.hash(result, error);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class StreamResult {\n");
sb.append(" result: ").append(toIndentedString(result)).append("\n");
sb.append(" error: ").append(toIndentedString(error)).append("\n");
sb.append("}");
return sb.toString();
}

/**
* Convert the given object to string with each line indented by 4 spaces
* (except the first line).
*/
private String toIndentedString(Object o) {
if (o == null) {
return "null";
}
return o.toString().replace("\n", "\n ");
}
}
Loading
Loading