From b56136a49360a05a22b335d6edc17b9de2eac58f Mon Sep 17 00:00:00 2001 From: wu-hui Date: Mon, 16 Jun 2025 08:56:11 +0800 Subject: [PATCH] [realppl 10] Add server timestamp support --- .../FirebaseFirestore/FIRPipelineBridge.h | 3 + .../SwiftAPI/Pipeline/PipelineResult.swift | 27 +- .../SwiftAPI/Pipeline/RealtimePipeline.swift | 61 ++--- .../Pipeline/RealtimePipelineSnapshot.swift | 9 +- .../Integration/RealtimePipelineTests.swift | 255 +++++++++++++++++- Firestore/core/src/api/realtime_pipeline.cc | 6 +- Firestore/core/src/api/realtime_pipeline.h | 14 +- Firestore/core/src/api/stages.h | 11 +- Firestore/core/src/core/expressions_eval.cc | 34 ++- Firestore/core/src/core/query_listener.cc | 3 + Firestore/core/src/core/query_listener.h | 6 +- .../core/test/unit/local/query_engine_test.cc | 134 +++++++++ .../core/test/unit/local/query_engine_test.h | 4 + .../test/unit/testutil/expression_test_util.h | 2 +- 14 files changed, 516 insertions(+), 53 deletions(-) diff --git a/Firestore/Source/Public/FirebaseFirestore/FIRPipelineBridge.h b/Firestore/Source/Public/FirebaseFirestore/FIRPipelineBridge.h index dffa81391d2..209c1666c93 100644 --- a/Firestore/Source/Public/FirebaseFirestore/FIRPipelineBridge.h +++ b/Firestore/Source/Public/FirebaseFirestore/FIRPipelineBridge.h @@ -228,6 +228,9 @@ NS_SWIFT_NAME(__PipelineResultBridge) - (nullable id)get:(id)field; +- (nullable id)get:(id)field + serverTimestampBehavior:(FIRServerTimestampBehavior)serverTimestampBehavior; + @end NS_SWIFT_SENDABLE diff --git a/Firestore/Swift/Source/SwiftAPI/Pipeline/PipelineResult.swift b/Firestore/Swift/Source/SwiftAPI/Pipeline/PipelineResult.swift index 67e55663268..f0299b6ee9a 100644 --- a/Firestore/Swift/Source/SwiftAPI/Pipeline/PipelineResult.swift +++ b/Firestore/Swift/Source/SwiftAPI/Pipeline/PipelineResult.swift @@ -22,9 +22,11 @@ import Foundation @available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *) public struct PipelineResult: @unchecked Sendable { let bridge: __PipelineResultBridge + private let serverTimestamp: ServerTimestampBehavior init(_ bridge: __PipelineResultBridge) { self.bridge = bridge + serverTimestamp = .none ref = self.bridge.reference id = self.bridge.documentID data = self.bridge.data().mapValues { Helper.convertObjCToSwift($0) } @@ -32,6 +34,16 @@ public struct PipelineResult: @unchecked Sendable { updateTime = self.bridge.update_time } + init(_ bridge: __PipelineResultBridge, _ behavior: ServerTimestampBehavior) { + self.bridge = bridge + serverTimestamp = behavior + ref = self.bridge.reference + id = self.bridge.documentID + data = self.bridge.data(with: serverTimestamp) + createTime = self.bridge.create_time + updateTime = self.bridge.update_time + } + /// The reference of the document, if the query returns the `__name__` field. public let ref: DocumentReference? @@ -51,20 +63,29 @@ public struct PipelineResult: @unchecked Sendable { /// - Parameter fieldPath: The field path (e.g., "foo" or "foo.bar"). /// - Returns: The data at the specified field location or `nil` if no such field exists. public func get(_ fieldName: String) -> Sendable? { - return Helper.convertObjCToSwift(bridge.get(fieldName)) + return Helper.convertObjCToSwift(bridge.get( + fieldName, + serverTimestampBehavior: serverTimestamp + )) } /// Retrieves the field specified by `fieldPath`. /// - Parameter fieldPath: The field path (e.g., "foo" or "foo.bar"). /// - Returns: The data at the specified field location or `nil` if no such field exists. public func get(_ fieldPath: FieldPath) -> Sendable? { - return Helper.convertObjCToSwift(bridge.get(fieldPath)) + return Helper.convertObjCToSwift(bridge.get( + fieldPath, + serverTimestampBehavior: serverTimestamp + )) } /// Retrieves the field specified by `fieldPath`. /// - Parameter fieldPath: The field path (e.g., "foo" or "foo.bar"). /// - Returns: The data at the specified field location or `nil` if no such field exists. public func get(_ field: Field) -> Sendable? { - return Helper.convertObjCToSwift(bridge.get(field.fieldName)) + return Helper.convertObjCToSwift(bridge.get( + field.fieldName, + serverTimestampBehavior: serverTimestamp + )) } } diff --git a/Firestore/Swift/Source/SwiftAPI/Pipeline/RealtimePipeline.swift b/Firestore/Swift/Source/SwiftAPI/Pipeline/RealtimePipeline.swift index b0335f363be..49969d49ff7 100644 --- a/Firestore/Swift/Source/SwiftAPI/Pipeline/RealtimePipeline.swift +++ b/Firestore/Swift/Source/SwiftAPI/Pipeline/RealtimePipeline.swift @@ -21,30 +21,6 @@ import Foundation @available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *) public struct PipelineListenOptions: Sendable, Equatable, Hashable { - /// Defines how to handle server-generated timestamps that are not yet known locally - /// during latency compensation. - public struct ServerTimestampBehavior: Sendable, Equatable, Hashable { - /// The raw string value for the behavior, used for implementation and hashability. - let rawValue: String - - /// Creates a new behavior with a private raw value. - private init(rawValue: String) { - self.rawValue = rawValue - } - - /// Fields dependent on server timestamps will be `nil` until the value is - /// confirmed by the server. - public static let none = ServerTimestampBehavior(rawValue: "none") - - /// Fields dependent on server timestamps will receive a local, client-generated - /// time estimate until the value is confirmed by the server. - public static let estimate = ServerTimestampBehavior(rawValue: "estimate") - - /// Fields dependent on server timestamps will hold the value from the last - /// server-confirmed write until the new value is confirmed. - public static let previous = ServerTimestampBehavior(rawValue: "previous") - } - // MARK: - Stored Properties /// The desired behavior for handling pending server timestamps. @@ -70,16 +46,31 @@ public struct PipelineListenOptions: Sendable, Equatable, Hashable { self.includeMetadataChanges = includeMetadataChanges self.source = source bridge = __PipelineListenOptionsBridge( - serverTimestampBehavior: (self.serverTimestamps ?? .none).rawValue, + serverTimestampBehavior: PipelineListenOptions + .toRawValue(servertimestamp: self.serverTimestamps ?? .none), includeMetadata: self.includeMetadataChanges ?? false, source: self.source ?? ListenSource.default ) } + + private static func toRawValue(servertimestamp: ServerTimestampBehavior) -> String { + switch servertimestamp { + case .none: + return "none" + case .estimate: + return "estimate" + case .previous: + return "previous" + @unknown default: + fatalError("Unknown server timestamp behavior") + } + } } @available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *) public struct RealtimePipeline: @unchecked Sendable { private var stages: [Stage] + let bridge: RealtimePipelineBridge let db: Firestore @@ -93,14 +84,18 @@ public struct RealtimePipeline: @unchecked Sendable { listener: @escaping (RealtimePipelineSnapshot?, Error?) -> Void) -> ListenerRegistration { return bridge.addSnapshotListener(options: options.bridge) { snapshotBridge, error in - listener( - RealtimePipelineSnapshot( - // TODO(pipeline): this needs to be fixed - snapshotBridge!, - pipeline: self - ), - error - ) + if snapshotBridge != nil { + listener( + RealtimePipelineSnapshot( + snapshotBridge!, + pipeline: self, + options: options + ), + error + ) + } else { + listener(nil, error) + } } } diff --git a/Firestore/Swift/Source/SwiftAPI/Pipeline/RealtimePipelineSnapshot.swift b/Firestore/Swift/Source/SwiftAPI/Pipeline/RealtimePipelineSnapshot.swift index 2c5748065de..8fe4cbbf4c0 100644 --- a/Firestore/Swift/Source/SwiftAPI/Pipeline/RealtimePipelineSnapshot.swift +++ b/Firestore/Swift/Source/SwiftAPI/Pipeline/RealtimePipelineSnapshot.swift @@ -31,12 +31,17 @@ public struct RealtimePipelineSnapshot: Sendable { public let metadata: SnapshotMetadata let bridge: __RealtimePipelineSnapshotBridge + private var options: PipelineListenOptions - init(_ bridge: __RealtimePipelineSnapshotBridge, pipeline: RealtimePipeline) { + init(_ bridge: __RealtimePipelineSnapshotBridge, + pipeline: RealtimePipeline, + options: PipelineListenOptions) { self.bridge = bridge self.pipeline = pipeline + self.options = options metadata = bridge.metadata - results_cache = self.bridge.results.map { PipelineResult($0) } + results_cache = self.bridge.results + .map { PipelineResult($0, options.serverTimestamps ?? .none) } changes = self.bridge.changes.map { PipelineResultChange($0) } } diff --git a/Firestore/Swift/Tests/Integration/RealtimePipelineTests.swift b/Firestore/Swift/Tests/Integration/RealtimePipelineTests.swift index 4e781e886f4..4d93c4da922 100644 --- a/Firestore/Swift/Tests/Integration/RealtimePipelineTests.swift +++ b/Firestore/Swift/Tests/Integration/RealtimePipelineTests.swift @@ -167,10 +167,9 @@ class RealtimePipelineIntegrationTests: FSTIntegrationTestCase { } func testBasicAsyncStream() async throws { - let collRef = collectionRef( - withDocuments: bookDocs - ) - let db = collRef.firestore + let db = self.db + let collRef = collectionRef() + writeAllDocuments(bookDocs, toCollection: collRef) let pipeline = db .realtimePipeline() @@ -181,7 +180,7 @@ class RealtimePipelineIntegrationTests: FSTIntegrationTestCase { var iterator = stream.makeAsyncIterator() let firstSnapshot = try await iterator.next() - XCTAssertEqual(firstSnapshot!.metadata.isFromCache, false) + XCTAssertEqual(firstSnapshot!.metadata.isFromCache, true) XCTAssertEqual(firstSnapshot!.results().count, 3) XCTAssertEqual(firstSnapshot!.results().first?.get("title") as? String, "Dune") XCTAssertEqual(firstSnapshot!.results()[1].get("title") as? String, "Pride and Prejudice") @@ -336,4 +335,250 @@ class RealtimePipelineIntegrationTests: FSTIntegrationTestCase { XCTAssertEqual(secondSnapshot!.results().count, 3) XCTAssertEqual(secondSnapshot!.changes.count, 0) } + + func testCanReadServerTimestampEstimateProperly() async throws { + let db = self.db + let collRef = collectionRef() + writeAllDocuments(bookDocs, toCollection: collRef) + + disableNetwork() + + // Using the non-async version + collRef.document("book1").updateData([ + "rating": FieldValue.serverTimestamp(), + ]) { _ in } + + let stream = db.realtimePipeline().collection(collRef.path) + .where(Field("title").eq("The Hitchhiker's Guide to the Galaxy")) + .snapshotStream(options: PipelineListenOptions(serverTimestamps: .estimate)) + + var iterator = stream.makeAsyncIterator() + + let firstSnapshot = try await iterator.next() + let result = firstSnapshot!.results()[0] + XCTAssertEqual(firstSnapshot!.metadata.isFromCache, true) + XCTAssertNotNil(result.get("rating") as? Timestamp) + XCTAssertEqual(result.get("rating") as? Timestamp, result.data["rating"] as? Timestamp) + + enableNetwork() + + let secondSnapshot = try await iterator.next() + XCTAssertEqual(secondSnapshot!.metadata.isFromCache, false) + XCTAssertNotEqual( + secondSnapshot!.results()[0].get("rating") as? Timestamp, + result.data["rating"] as? Timestamp + ) + } + + func testCanEvaluateServerTimestampEstimateProperly() async throws { + let db = self.db + let collRef = collectionRef() + writeAllDocuments(bookDocs, toCollection: collRef) + + disableNetwork() + + let now = Constant(Timestamp(date: Date())) + // Using the non-async version + collRef.document("book1").updateData([ + "rating": FieldValue.serverTimestamp(), + ]) { _ in } + + let stream = db.realtimePipeline().collection(collRef.path) + .where(Field("rating").timestampAdd(Constant("second"), Constant(1)).gt(now)) + .snapshotStream( + options: PipelineListenOptions(serverTimestamps: .estimate, includeMetadataChanges: true) + ) + + var iterator = stream.makeAsyncIterator() + + let firstSnapshot = try await iterator.next() + let result = firstSnapshot!.results()[0] + XCTAssertEqual(firstSnapshot!.metadata.isFromCache, true) + XCTAssertNotNil(result.get("rating") as? Timestamp) + XCTAssertEqual(result.get("rating") as? Timestamp, result.data["rating"] as? Timestamp) + + // TODO(pipeline): Enable this when watch supports timestampAdd + // enableNetwork() + // + // let secondSnapshot = try await iterator.next() + // XCTAssertEqual(secondSnapshot!.metadata.isFromCache, false) + // XCTAssertNotEqual( + // secondSnapshot!.results()[0].get("rating") as? Timestamp, + // result.data["rating"] as? Timestamp + // ) + } + + func testCanReadServerTimestampPreviousProperly() async throws { + let db = self.db + let collRef = collectionRef() + writeAllDocuments(bookDocs, toCollection: collRef) + + disableNetwork() + + // Using the non-async version + collRef.document("book1").updateData([ + "rating": FieldValue.serverTimestamp(), + ]) { _ in } + + let stream = db.realtimePipeline().collection(collRef.path) + .where(Field("title").eq("The Hitchhiker's Guide to the Galaxy")) + .snapshotStream(options: PipelineListenOptions(serverTimestamps: .previous)) + + var iterator = stream.makeAsyncIterator() + + let firstSnapshot = try await iterator.next() + let result = firstSnapshot!.results()[0] + XCTAssertEqual(firstSnapshot!.metadata.isFromCache, true) + XCTAssertNotNil(result.get("rating") as? Double) + XCTAssertEqual(result.get("rating") as! Double, 4.2) + XCTAssertEqual(result.get("rating") as! Double, result.data["rating"] as! Double) + + enableNetwork() + + let secondSnapshot = try await iterator.next() + XCTAssertEqual(secondSnapshot!.metadata.isFromCache, false) + XCTAssertNotNil(secondSnapshot!.results()[0].get("rating") as? Timestamp) + } + + func testCanEvaluateServerTimestampPreviousProperly() async throws { + let db = self.db + let collRef = collectionRef() + writeAllDocuments(bookDocs, toCollection: collRef) + + disableNetwork() + + // Using the non-async version + collRef.document("book1").updateData([ + "title": FieldValue.serverTimestamp(), + ]) { _ in } + + let stream = db.realtimePipeline().collection(collRef.path) + .where(Field("title").eq("The Hitchhiker's Guide to the Galaxy")) + .snapshotStream( + options: PipelineListenOptions(serverTimestamps: .previous) + ) + + var iterator = stream.makeAsyncIterator() + + let firstSnapshot = try await iterator.next() + let result = firstSnapshot!.results()[0] + XCTAssertEqual(firstSnapshot!.metadata.isFromCache, true) + XCTAssertEqual(result.get("title") as? String, "The Hitchhiker's Guide to the Galaxy") + + // TODO(pipeline): Enable this when watch supports timestampAdd + // enableNetwork() + } + + func testCanReadServerTimestampNoneProperly() async throws { + let db = self.db + let collRef = collectionRef() + writeAllDocuments(bookDocs, toCollection: collRef) + + disableNetwork() + + // Using the non-async version + collRef.document("book1").updateData([ + "rating": FieldValue.serverTimestamp(), + ]) { _ in } + + let stream = db.realtimePipeline().collection(collRef.path) + .where(Field("title").eq("The Hitchhiker's Guide to the Galaxy")) + // .none is the default behavior + .snapshotStream() + + var iterator = stream.makeAsyncIterator() + + let firstSnapshot = try await iterator.next() + let result = firstSnapshot!.results()[0] + XCTAssertEqual(firstSnapshot!.metadata.isFromCache, true) + XCTAssertNil(result.get("rating") as? Timestamp) + XCTAssertEqual(result.get("rating") as? Timestamp, result.data["rating"] as? Timestamp) + + enableNetwork() + + let secondSnapshot = try await iterator.next() + XCTAssertEqual(secondSnapshot!.metadata.isFromCache, false) + XCTAssertNotNil(secondSnapshot!.results()[0].get("rating") as? Timestamp) + } + + func testCanEvaluateServerTimestampNoneProperly() async throws { + let db = self.db + let collRef = collectionRef() + writeAllDocuments(bookDocs, toCollection: collRef) + + disableNetwork() + + // Using the non-async version + collRef.document("book1").updateData([ + "title": FieldValue.serverTimestamp(), + ]) { _ in } + + let stream = db.realtimePipeline().collection(collRef.path) + .where(Field("title").isNull()) + .snapshotStream( + ) + + var iterator = stream.makeAsyncIterator() + + let firstSnapshot = try await iterator.next() + let result = firstSnapshot!.results()[0] + XCTAssertEqual(firstSnapshot!.metadata.isFromCache, true) + XCTAssertNil(result.get("title") as? String) + + // TODO(pipeline): Enable this when watch supports timestampAdd + // enableNetwork() + } + + func testSamePipelineWithDifferetnOptions() async throws { + let db = self.db + let collRef = collectionRef() + writeAllDocuments(bookDocs, toCollection: collRef) + + disableNetwork() + + // Using the non-async version + collRef.document("book1").updateData([ + "title": FieldValue.serverTimestamp(), + ]) { _ in } + + let pipeline = db.realtimePipeline().collection(collRef.path) + .where(Field("title").isNotNull()) + .limit(1) + + let stream1 = pipeline + .snapshotStream( + options: PipelineListenOptions(serverTimestamps: .previous) + ) + + var iterator1 = stream1.makeAsyncIterator() + + let firstSnapshot1 = try await iterator1.next() + var result1 = firstSnapshot1!.results()[0] + XCTAssertEqual(firstSnapshot1!.metadata.isFromCache, true) + XCTAssertEqual(result1.get("title") as? String, "The Hitchhiker's Guide to the Galaxy") + + let stream2 = pipeline + .snapshotStream( + options: PipelineListenOptions(serverTimestamps: .estimate) + ) + + var iterator2 = stream2.makeAsyncIterator() + + let firstSnapshot2 = try await iterator2.next() + var result2 = firstSnapshot2!.results()[0] + XCTAssertEqual(firstSnapshot2!.metadata.isFromCache, true) + XCTAssertNotNil(result2.get("title") as? Timestamp) + + enableNetwork() + + let secondSnapshot1 = try await iterator1.next() + result1 = secondSnapshot1!.results()[0] + XCTAssertEqual(secondSnapshot1!.metadata.isFromCache, false) + XCTAssertNotNil(result1.get("title") as? Timestamp) + + let secondSnapshot2 = try await iterator2.next() + result2 = secondSnapshot2!.results()[0] + XCTAssertEqual(secondSnapshot2!.metadata.isFromCache, false) + XCTAssertNotNil(result2.get("title") as? Timestamp) + } } diff --git a/Firestore/core/src/api/realtime_pipeline.cc b/Firestore/core/src/api/realtime_pipeline.cc index 9a944d4575c..743c64aa2b1 100644 --- a/Firestore/core/src/api/realtime_pipeline.cc +++ b/Firestore/core/src/api/realtime_pipeline.cc @@ -37,7 +37,8 @@ RealtimePipeline::RealtimePipeline(const RealtimePipeline& other) : stages_(other.stages_), rewritten_stages_(other.rewritten_stages_), serializer_(std::make_unique( - other.serializer_->database_id())) { + other.serializer_->database_id())), + listen_options_(other.listen_options()) { } RealtimePipeline& RealtimePipeline::operator=(const RealtimePipeline& other) { @@ -46,6 +47,7 @@ RealtimePipeline& RealtimePipeline::operator=(const RealtimePipeline& other) { rewritten_stages_ = other.rewritten_stages_; serializer_ = std::make_unique(other.serializer_->database_id()); + listen_options_ = other.listen_options(); } return *this; } @@ -70,7 +72,7 @@ RealtimePipeline::rewritten_stages() const { } EvaluateContext RealtimePipeline::evaluate_context() const { - return EvaluateContext(serializer_.get()); + return EvaluateContext(serializer_.get(), listen_options_); } } // namespace api diff --git a/Firestore/core/src/api/realtime_pipeline.h b/Firestore/core/src/api/realtime_pipeline.h index afa036c0245..dab00a1c335 100644 --- a/Firestore/core/src/api/realtime_pipeline.h +++ b/Firestore/core/src/api/realtime_pipeline.h @@ -23,6 +23,7 @@ #include "Firestore/core/src/api/api_fwd.h" #include "Firestore/core/src/api/stages.h" #include "Firestore/core/src/core/core_fwd.h" +#include "Firestore/core/src/core/listen_options.h" namespace firebase { namespace firestore { @@ -47,14 +48,21 @@ class RealtimePipeline { EvaluateContext evaluate_context() const; - std::unique_ptr AddSnapshotListener( - core::ListenOptions options, - api::RealtimePipelineSnapshotListener&& listener); + RealtimePipeline WithListenOptions(const core::ListenOptions& options) const { + RealtimePipeline result(*this); + result.listen_options_ = options; + return result; + } + + const core::ListenOptions& listen_options() const { + return listen_options_; + } private: std::vector> stages_; std::vector> rewritten_stages_; std::unique_ptr serializer_; + core::ListenOptions listen_options_; }; } // namespace api diff --git a/Firestore/core/src/api/stages.h b/Firestore/core/src/api/stages.h index f4e7d1c4269..110fd5d6b91 100644 --- a/Firestore/core/src/api/stages.h +++ b/Firestore/core/src/api/stages.h @@ -29,6 +29,7 @@ #include "Firestore/core/src/api/api_fwd.h" #include "Firestore/core/src/api/expressions.h" #include "Firestore/core/src/api/ordering.h" +#include "Firestore/core/src/core/listen_options.h" #include "Firestore/core/src/model/model_fwd.h" #include "Firestore/core/src/model/resource_path.h" #include "Firestore/core/src/nanopb/message.h" @@ -53,16 +54,22 @@ class Stage { class EvaluateContext { public: - explicit EvaluateContext(remote::Serializer* serializer) - : serializer_(serializer) { + explicit EvaluateContext(remote::Serializer* serializer, + core::ListenOptions options) + : serializer_(serializer), listen_options_(std::move(options)) { } const remote::Serializer& serializer() const { return *serializer_; } + const core::ListenOptions& listen_options() const { + return listen_options_; + } + private: remote::Serializer* serializer_; + core::ListenOptions listen_options_; }; class EvaluableStage : public Stage { diff --git a/Firestore/core/src/core/expressions_eval.cc b/Firestore/core/src/core/expressions_eval.cc index 33cbc95d9da..4ae269cec62 100644 --- a/Firestore/core/src/core/expressions_eval.cc +++ b/Firestore/core/src/core/expressions_eval.cc @@ -31,12 +31,13 @@ #include "Firestore/core/src/api/expressions.h" #include "Firestore/core/src/api/stages.h" #include "Firestore/core/src/model/mutable_document.h" +#include "Firestore/core/src/model/server_timestamp_util.h" #include "Firestore/core/src/model/value_util.h" // For value helpers like IsArray, DeepClone #include "Firestore/core/src/nanopb/message.h" // Added for MakeMessage #include "Firestore/core/src/remote/serializer.h" #include "Firestore/core/src/util/hard_assert.h" +#include "Firestore/core/src/util/log.h" #include "absl/strings/ascii.h" // For AsciiStrToLower/ToUpper (if needed later) -#include "absl/strings/internal/utf8.h" #include "absl/strings/match.h" // For StartsWith, EndsWith, StrContains #include "absl/strings/str_cat.h" // For StrAppend #include "absl/strings/strip.h" // For StripAsciiWhitespace @@ -312,6 +313,32 @@ std::unique_ptr FunctionToEvaluable( HARD_FAIL("Unsupported function name: %s", function.name()); } +namespace { + +nanopb::Message GetServerTimestampValue( + const api::EvaluateContext& context, + const google_firestore_v1_Value& timestamp_sentinel) { + if (context.listen_options().server_timestamp_behavior() == + ListenOptions::ServerTimestampBehavior::kEstimate) { + google_firestore_v1_Value result; + result.which_value_type = google_firestore_v1_Value_timestamp_value_tag; + result.timestamp_value = model::GetLocalWriteTime(timestamp_sentinel); + return nanopb::MakeMessage(result); + } + + if (context.listen_options().server_timestamp_behavior() == + ListenOptions::ServerTimestampBehavior::kPrevious) { + auto result = model::GetPreviousValue(timestamp_sentinel); + if (result.has_value()) { + return model::DeepClone(result.value()); + } + } + + return nanopb::MakeMessage(model::NullValue()); +} + +} // namespace + EvaluateResult CoreField::Evaluate( const api::EvaluateContext& context, const model::PipelineInputOutput& input) const { @@ -340,6 +367,11 @@ EvaluateResult CoreField::Evaluate( // Return 'UNSET' if the field doesn't exist, otherwise the Value. const auto& result = input.field(field->field_path()); if (result.has_value()) { + if (model::IsServerTimestamp(result.value())) { + return EvaluateResult::NewValue( + GetServerTimestampValue(context, result.value())); + } + // DeepClone the field value to avoid modifying the original. return EvaluateResult::NewValue(model::DeepClone(result.value())); } else { diff --git a/Firestore/core/src/core/query_listener.cc b/Firestore/core/src/core/query_listener.cc index 2bedfc3fdd2..97245c82c2b 100644 --- a/Firestore/core/src/core/query_listener.cc +++ b/Firestore/core/src/core/query_listener.cc @@ -68,6 +68,9 @@ QueryListener::QueryListener(QueryOrPipeline query, : query_(std::move(query)), options_(std::move(options)), listener_(std::move(listener)) { + if (query_.IsPipeline()) { + query_ = QueryOrPipeline(query_.pipeline().WithListenOptions(options_)); + } } bool QueryListener::OnViewSnapshot(ViewSnapshot snapshot) { diff --git a/Firestore/core/src/core/query_listener.h b/Firestore/core/src/core/query_listener.h index 05d441d312e..47da4418f28 100644 --- a/Firestore/core/src/core/query_listener.h +++ b/Firestore/core/src/core/query_listener.h @@ -60,10 +60,14 @@ class QueryListener { virtual ~QueryListener() = default; - const QueryOrPipeline& query() const { + QueryOrPipeline& query() { return query_; } + ListenOptions listen_options() { + return options_; + } + bool listens_to_remote_store() const { return options_.source() != ListenSource::Cache; } diff --git a/Firestore/core/test/unit/local/query_engine_test.cc b/Firestore/core/test/unit/local/query_engine_test.cc index 49d20421103..01734b2c5ce 100644 --- a/Firestore/core/test/unit/local/query_engine_test.cc +++ b/Firestore/core/test/unit/local/query_engine_test.cc @@ -42,6 +42,8 @@ #include "Firestore/core/src/model/precondition.h" #include "Firestore/core/src/model/snapshot_version.h" #include "Firestore/core/src/remote/serializer.h" +#include "Firestore/core/test/unit/core/pipeline/utils.h" +#include "Firestore/core/test/unit/testutil/expression_test_util.h" #include "Firestore/core/test/unit/testutil/testutil.h" namespace firebase { @@ -107,6 +109,11 @@ const PatchMutation kDocAEmptyPatch = PatchMutation( const SnapshotVersion kLastLimboFreeSnapshot = Version(10); const SnapshotVersion kMissingLastLimboFreeSnapshot = SnapshotVersion::None(); +std::unique_ptr TestSerializer() { + return std::make_unique( + model::DatabaseId("test-project")); +} + } // namespace DocumentMap TestLocalDocumentsView::GetDocumentsMatchingQuery( @@ -217,6 +224,21 @@ DocumentSet QueryEngineTestBase::RunQuery( return view.ApplyChanges(view_doc_changes).snapshot()->documents(); } +DocumentSet QueryEngineTestBase::RunPipeline( + const api::RealtimePipeline& pipeline, + const SnapshotVersion& last_limbo_free_snapshot_version) { + DocumentKeySet remote_keys = target_cache_->GetMatchingKeys(kTestTargetId); + auto core_pipeline = core::QueryOrPipeline(pipeline); + const auto docs = query_engine_.GetDocumentsMatchingQuery( + core_pipeline, last_limbo_free_snapshot_version, remote_keys); + + // The View is always constructed based on the original query's intent, + // regardless of whether it was executed as a query or pipeline. + View view(core_pipeline, DocumentKeySet()); + ViewDocumentChanges view_doc_changes = view.ComputeDocumentChanges(docs, {}); + return view.ApplyChanges(view_doc_changes).snapshot()->documents(); +} + QueryEngineTest::QueryEngineTest() : QueryEngineTestBase(GetParam().persistence_factory()) { // Initialize should_use_pipeline_ from the parameter for the specific test @@ -1006,6 +1028,118 @@ TEST_P(QueryEngineTest, InAndNotInFiltersWithObjectValues) { }); } +TEST_P(QueryEngineTest, HandlesServerTimestampNone) { + persistence_->Run("HandlesServerTimestampNone", [&] { + mutation_queue_->Start(); + index_manager_->Start(); + + AddDocuments({kMatchingDocA, kMatchingDocB}); + AddMutation(testutil::PatchMutation( + "coll/a", Map(), + std::vector>{ + {"timestamp", model::ServerTimestampTransform()}})); + + auto pipeline = api::RealtimePipeline( + {std::make_shared("coll")}, TestSerializer()); + pipeline = pipeline.AddingStage(std::make_shared( + testutil::IsNullExpr({std::make_shared("timestamp")}))); + + DocumentSet result1 = ExpectFullCollectionScan( + [&] { return RunPipeline(pipeline, kMissingLastLimboFreeSnapshot); }); + EXPECT_EQ(result1.size(), 1); + // NOTE: we cannot directly compare the contents of the document because the + // resulting document has the server timestamp sentinel (a special map) as + // the field. + EXPECT_EQ(result1.GetFirstDocument().value().get().key(), + testutil::Key("coll/a")); + + pipeline = pipeline.WithListenOptions(core::ListenOptions( + false, false, false, api::ListenSource::Default, + core::ListenOptions::ServerTimestampBehavior::kNone)); + DocumentSet result2 = ExpectFullCollectionScan( + [&] { return RunPipeline(pipeline, kMissingLastLimboFreeSnapshot); }); + EXPECT_EQ(result2.size(), 1); + // NOTE: we cannot directly compare the contents of the document because the + // resulting document has the server timestamp sentinel (a special map) as + // the field. + EXPECT_EQ(result2.GetFirstDocument().value().get().key(), + testutil::Key("coll/a")); + }); +} + +TEST_P(QueryEngineTest, HandlesServerTimestampEstimate) { + persistence_->Run("HandlesServerTimestampEstimate", [&] { + mutation_queue_->Start(); + index_manager_->Start(); + + AddDocuments({kMatchingDocA /*, kMatchingDocB*/}); + AddMutation(testutil::PatchMutation( + "coll/a", Map(), + std::vector>{ + {"timestamp", model::ServerTimestampTransform()}})); + + auto pipeline = api::RealtimePipeline( + {std::make_shared("coll")}, TestSerializer()); + pipeline = pipeline.AddingStage(std::make_shared( + testutil::GtExpr({testutil::TimestampToUnixMillisExpr( + {std::make_shared("timestamp")}), + testutil::SharedConstant(testutil::Value(1000))}))); + + DocumentSet result1 = ExpectFullCollectionScan( + [&] { return RunPipeline(pipeline, kMissingLastLimboFreeSnapshot); }); + EXPECT_EQ(result1.size(), 0); + + auto pipeline2 = pipeline.WithListenOptions(core::ListenOptions( + false, false, false, api::ListenSource::Default, + core::ListenOptions::ServerTimestampBehavior::kEstimate)); + DocumentSet result2 = ExpectFullCollectionScan( + [&] { return RunPipeline(pipeline2, kMissingLastLimboFreeSnapshot); }); + EXPECT_EQ(result2.size(), 1); + // NOTE: we cannot directly compare the contents of the document because the + // resulting document has the server timestamp sentinel (a special map) as + // the field. + EXPECT_EQ(result2.GetFirstDocument().value().get().key(), + testutil::Key("coll/a")); + }); +} + +TEST_P(QueryEngineTest, HandlesServerTimestampPrevious) { + persistence_->Run("HandlesServerTimestampPrevious", [&] { + mutation_queue_->Start(); + index_manager_->Start(); + + AddDocuments({kMatchingDocA, kMatchingDocB}); + AddMutation(testutil::PatchMutation( + "coll/a", Map(), + std::vector>{ + {"matches", model::ServerTimestampTransform()}})); + + auto pipeline = api::RealtimePipeline( + {std::make_shared("coll")}, TestSerializer()); + pipeline = pipeline.AddingStage(std::make_shared( + testutil::EqExpr({std::make_shared("matches"), + testutil::SharedConstant(testutil::Value(true))}))); + + DocumentSet result1 = ExpectFullCollectionScan( + [&] { return RunPipeline(pipeline, kMissingLastLimboFreeSnapshot); }); + EXPECT_EQ(result1.size(), 1); + EXPECT_EQ(result1.GetFirstDocument().value().get().key(), + testutil::Key("coll/b")); + + auto pipeline2 = pipeline.WithListenOptions(core::ListenOptions( + false, false, false, api::ListenSource::Default, + core::ListenOptions::ServerTimestampBehavior::kPrevious)); + DocumentSet result2 = ExpectFullCollectionScan( + [&] { return RunPipeline(pipeline2, kMissingLastLimboFreeSnapshot); }); + EXPECT_EQ(result2.size(), 2); + // NOTE: we cannot directly compare the contents of the document because the + // resulting document has the server timestamp sentinel (a special map) as + // the field. + EXPECT_EQ(result2.GetFirstDocument().value().get().key(), + testutil::Key("coll/a")); + }); +} + } // namespace local } // namespace firestore } // namespace firebase diff --git a/Firestore/core/test/unit/local/query_engine_test.h b/Firestore/core/test/unit/local/query_engine_test.h index 77c552d0aed..8c42588c6e6 100644 --- a/Firestore/core/test/unit/local/query_engine_test.h +++ b/Firestore/core/test/unit/local/query_engine_test.h @@ -125,6 +125,10 @@ class QueryEngineTestBase : public testing::Test { api::RealtimePipeline ConvertQueryToPipeline(const core::Query& query); + model::DocumentSet RunPipeline( + const api::RealtimePipeline& pipeline, + const model::SnapshotVersion& last_limbo_free_snapshot_version); + std::unique_ptr persistence_; bool should_use_pipeline_ = false; // Flag to indicate if pipeline conversion should be attempted. diff --git a/Firestore/core/test/unit/testutil/expression_test_util.h b/Firestore/core/test/unit/testutil/expression_test_util.h index a2cb4cd604e..fab0296b44a 100644 --- a/Firestore/core/test/unit/testutil/expression_test_util.h +++ b/Firestore/core/test/unit/testutil/expression_test_util.h @@ -530,7 +530,7 @@ static remote::Serializer serializer(model::DatabaseId("test-project")); // Creates a default evaluation context. inline api::EvaluateContext NewContext() { - return EvaluateContext{&serializer}; + return EvaluateContext{&serializer, core::ListenOptions()}; } // Helper function to evaluate an expression and return the result.