Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
import com.google.cloud.firestore.pipeline.expressions.Field;
import com.google.cloud.firestore.pipeline.expressions.FunctionExpression;
import com.google.cloud.firestore.pipeline.expressions.Ordering;
import com.google.cloud.firestore.pipeline.expressions.PipelineValueExpression;
import com.google.cloud.firestore.pipeline.expressions.Selectable;
import com.google.cloud.firestore.pipeline.stages.AddFields;
import com.google.cloud.firestore.pipeline.stages.Aggregate;
import com.google.cloud.firestore.pipeline.stages.AggregateOptions;
import com.google.cloud.firestore.pipeline.stages.Define;
import com.google.cloud.firestore.pipeline.stages.Distinct;
import com.google.cloud.firestore.pipeline.stages.FindNearest;
import com.google.cloud.firestore.pipeline.stages.FindNearestOptions;
Expand All @@ -55,6 +57,7 @@
import com.google.cloud.firestore.pipeline.stages.Sort;
import com.google.cloud.firestore.pipeline.stages.Stage;
import com.google.cloud.firestore.pipeline.stages.StageUtils;
import com.google.cloud.firestore.pipeline.stages.Subcollection;
import com.google.cloud.firestore.pipeline.stages.Union;
import com.google.cloud.firestore.pipeline.stages.Unnest;
import com.google.cloud.firestore.pipeline.stages.UnnestOptions;
Expand Down Expand Up @@ -261,6 +264,243 @@ public Pipeline addFields(Selectable field, Selectable... additionalFields) {
.toArray(new Selectable[0]))));
}

/**
 * Starts a new pipeline whose source is a subcollection of the current document.
 *
 * <p>The returned pipeline is relative: it is meant to be embedded in a parent pipeline as a
 * subquery, where it is evaluated against each document flowing through the parent.
 *
 * <p><b>Note:</b> A pipeline created with {@code subcollection} cannot be executed directly using
 * {@link #execute()}. It must be used within a parent pipeline.
 *
 * <p>Example:
 *
 * <pre>{@code
 * firestore.pipeline().collection("books")
 *     .addFields(
 *         Pipeline.subcollection("reviews")
 *             .aggregate(AggregateFunction.average("rating").as("avg_rating"))
 *             .toScalarExpression().as("average_rating"));
 * }</pre>
 *
 * @param path The path of the subcollection.
 * @return A new {@code Pipeline} instance scoped to the subcollection.
 */
@BetaApi
public static Pipeline subcollection(String path) {
  Subcollection subcollectionStage = new Subcollection(path);
  // The first constructor argument is deliberately null: a relative pipeline is not bound to a
  // client of its own and only becomes meaningful once nested inside a parent pipeline.
  return new Pipeline(null, subcollectionStage);
}

/**
 * Binds one or more named variables in the scope of this pipeline.
 *
 * <p>{@code define} attaches a value to a variable name so that it can be reused inside the
 * pipeline body; the value is read back with the {@link Expression#variable(String)} function.
 *
 * <p>Use this stage to declare reusable values or intermediate calculations that are referenced
 * several times by later stages, which keeps the pipeline easier to read and maintain.
 *
 * <p>Every variable is supplied as an {@link AliasedExpression}: the expression is the value and
 * the alias is the variable name.
 *
 * <p>Example:
 *
 * <pre>{@code
 * firestore.pipeline().collection("products")
 *     .define(
 *         multiply(field("price"), 0.9).as("discountedPrice"),
 *         add(field("stock"), 10).as("newStock"))
 *     .where(lessThan(variable("discountedPrice"), 100))
 *     .select(field("name"), variable("newStock"));
 * }</pre>
 *
 * @param expression The expression to define using {@link AliasedExpression}.
 * @param additionalExpressions Additional expressions to define using {@link AliasedExpression}.
 * @return A new Pipeline object with this stage appended to the stage list.
 */
@BetaApi
public Pipeline define(AliasedExpression expression, AliasedExpression... additionalExpressions) {
  // Merge the mandatory first variable with the optional remainder, keeping declaration order.
  AliasedExpression[] variables =
      ImmutableList.<AliasedExpression>builder()
          .add(expression)
          .add(additionalExpressions)
          .build()
          .toArray(new AliasedExpression[0]);
  return append(new Define(PipelineUtils.selectablesToMap(variables)));
}

/**
 * Turns this pipeline into an expression that evaluates to an array of its results.
 *
 * <p><b>Result Unwrapping:</b> To simplify access, a subquery that produces a single field has
 * that value unwrapped to the top level and the inner alias is ignored. A subquery that returns
 * multiple fields keeps them together as a map.
 *
 * <p><b>Example 1: Single field unwrapping</b>
 *
 * <pre>{@code
 * // Get a list of all reviewer names for each book
 * db.pipeline().collection("books")
 *     .define(field("id").as("book_id"))
 *     .addFields(
 *         db.pipeline().collection("reviews")
 *             .where(field("book_id").equal(variable("book_id")))
 *             .select(field("reviewer").as("name"))
 *             .toArrayExpression()
 *             .as("reviewers"))
 * }</pre>
 *
 * <p><i>The result set is unwrapped from {@code "reviewers": [{ "name": "Alice" }, { "name":
 * "Bob" }]} to {@code "reviewers": ["Alice", "Bob"]}.</i>
 *
 * <pre>{@code
 * // Output Document:
 * [
 *   {
 *     "id": "1",
 *     "title": "1984",
 *     "reviewers": ["Alice", "Bob"]
 *   }
 * ]
 * }</pre>
 *
 * <p><b>Example 2: Multiple fields (Map)</b>
 *
 * <pre>{@code
 * // Get a list of reviews (reviewer and rating) for each book
 * db.pipeline().collection("books")
 *     .define(field("id").as("book_id"))
 *     .addFields(
 *         db.pipeline().collection("reviews")
 *             .where(field("book_id").equal(variable("book_id")))
 *             .select(field("reviewer"), field("rating"))
 *             .toArrayExpression()
 *             .as("reviews"))
 * }</pre>
 *
 * <p><i>When the subquery produces multiple fields, they are kept as objects in the array:</i>
 *
 * <pre>{@code
 * // Output Document:
 * [
 *   {
 *     "id": "1",
 *     "title": "1984",
 *     "reviews": [
 *       { "reviewer": "Alice", "rating": 5 },
 *       { "reviewer": "Bob", "rating": 4 }
 *     ]
 *   }
 * ]
 * }</pre>
 *
 * @return A new {@link Expression} representing the pipeline as an array.
 */
@BetaApi
public Expression toArrayExpression() {
  // Wrap this pipeline so it can be passed as the sole argument of the "array" function.
  PipelineValueExpression subquery = new PipelineValueExpression(this);
  return new FunctionExpression("array", ImmutableList.of(subquery));
}

/**
 * Turns this pipeline into an expression that evaluates to a single scalar result. Intended for
 * 1:1 lookups or aggregations, where the subquery is expected to return a single value or
 * object.
 *
 * <p><b>Runtime Validation:</b> The runtime checks that the result set contains exactly one
 * item. A result with more than one item raises a runtime error, and a pipeline with zero
 * results evaluates to {@code null}.
 *
 * <p><b>Result Unwrapping:</b> To simplify access, a subquery that produces a single field has
 * that value unwrapped to the top level and the inner alias is ignored. A subquery that returns
 * multiple fields keeps them together as a map.
 *
 * <p><b>Example 1: Single field unwrapping</b>
 *
 * <pre>{@code
 * // Calculate average rating for each restaurant using a subquery
 * db.pipeline().collection("restaurants")
 *     .define(field("id").as("rid"))
 *     .addFields(
 *         db.pipeline().collection("reviews")
 *             .where(field("restaurant_id").equal(variable("rid")))
 *             // Inner aggregation returns a single document
 *             .aggregate(AggregateFunction.average("rating").as("value"))
 *             // Convert Pipeline -> Scalar Expression (validates result is 1 item)
 *             .toScalarExpression()
 *             .as("average_rating"))
 * }</pre>
 *
 * <p><i>The result set is unwrapped twice: from {@code "average_rating": [{ "value": 4.5 }]} to
 * {@code "average_rating": { "value": 4.5 }}, and finally to {@code "average_rating": 4.5}.</i>
 *
 * <pre>{@code
 * // Output Document:
 * [
 *   {
 *     "id": "123",
 *     "name": "The Burger Joint",
 *     "cuisine": "American",
 *     "average_rating": 4.5
 *   },
 *   {
 *     "id": "456",
 *     "name": "Sushi World",
 *     "cuisine": "Japanese",
 *     "average_rating": 4.8
 *   }
 * ]
 * }</pre>
 *
 * <p><b>Example 2: Multiple fields (Map)</b>
 *
 * <pre>{@code
 * // For each restaurant, calculate review statistics (average rating AND total
 * // count)
 * db.pipeline().collection("restaurants")
 *     .define(field("id").as("rid"))
 *     .addFields(
 *         db.pipeline().collection("reviews")
 *             .where(field("restaurant_id").equal(variable("rid")))
 *             .aggregate(
 *                 AggregateFunction.average("rating").as("avg_score"),
 *                 AggregateFunction.countAll().as("review_count"))
 *             .toScalarExpression()
 *             .as("stats"))
 * }</pre>
 *
 * <p><i>When the subquery produces multiple fields, they are wrapped in a map:</i>
 *
 * <pre>{@code
 * // Output Document:
 * [
 *   {
 *     "id": "123",
 *     "name": "The Burger Joint",
 *     "cuisine": "American",
 *     "stats": {
 *       "avg_score": 4.0,
 *       "review_count": 3
 *     }
 *   },
 *   {
 *     "id": "456",
 *     "name": "Sushi World",
 *     "cuisine": "Japanese",
 *     "stats": {
 *       "avg_score": 4.8,
 *       "review_count": 120
 *     }
 *   }
 * ]
 * }</pre>
 *
 * @return A new {@link Expression} representing the pipeline as a scalar.
 */
@BetaApi
public Expression toScalarExpression() {
  // Wrap this pipeline so it can be passed as the sole argument of the "scalar" function.
  PipelineValueExpression subquery = new PipelineValueExpression(this);
  return new FunctionExpression("scalar", ImmutableList.of(subquery));
}

/**
* Remove fields from outputs of previous stages.
*
Expand Down Expand Up @@ -1113,6 +1353,9 @@ MetricsContext createMetricsContext(String methodName) {
*/
@BetaApi
public void execute(ApiStreamObserver<PipelineResult> observer) {
if (this.rpcContext == null) {
throw new IllegalStateException("Cannot execute a relative subcollection pipeline directly");
}
MetricsContext metricsContext =
createMetricsContext(TelemetryConstants.METHOD_NAME_EXECUTE_PIPELINE_EXECUTE);

Expand Down Expand Up @@ -1143,6 +1386,10 @@ ApiFuture<Snapshot> execute(
@Nonnull PipelineExecuteOptions options,
@Nullable final ByteString transactionId,
@Nullable com.google.protobuf.Timestamp readTime) {
if (this.rpcContext == null) {
throw new IllegalStateException("Cannot execute a relative subcollection pipeline directly");
}

TraceUtil.Span span =
rpcContext
.getFirestore()
Expand Down
Loading
Loading