diff --git a/modules/devguide/examples/java/Analytics.java b/modules/devguide/examples/java/Analytics.java index 870008bb..53eea80f 100644 --- a/modules/devguide/examples/java/Analytics.java +++ b/modules/devguide/examples/java/Analytics.java @@ -25,20 +25,17 @@ // tag::imports[] import com.couchbase.client.core.error.CouchbaseException; -import com.couchbase.client.java.Scope; import com.couchbase.client.java.Bucket; import com.couchbase.client.java.Cluster; +import com.couchbase.client.java.Scope; import com.couchbase.client.java.analytics.AnalyticsResult; import com.couchbase.client.java.analytics.AnalyticsScanConsistency; import com.couchbase.client.java.analytics.ReactiveAnalyticsResult; import com.couchbase.client.java.json.JsonArray; import com.couchbase.client.java.json.JsonObject; -import org.reactivestreams.Subscription; -import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Mono; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import static com.couchbase.client.java.analytics.AnalyticsOptions.analyticsOptions; // end::imports[] @@ -199,41 +196,8 @@ public static void asyncExamples(String... args) { .subscribe(row -> System.out.println("Found row: " + row)); // end::simplereactive[] } - - System.out.println("backpressure"); - // tag::backpressure[] - Mono result = cluster - .reactive() - .analyticsQuery("select * from `huge-dataset`"); - - result - .flatMapMany(ReactiveAnalyticsResult::rowsAsObject) - .subscribe(new BaseSubscriber() { - // Number of outstanding requests - final AtomicInteger outstanding = new AtomicInteger(0); - - @Override - protected void hookOnSubscribe(Subscription subscription) { - request(10); // initially request to rows - outstanding.set(10); - } - - @Override - protected void hookOnNext(JsonObject value) { - process(value); - if (outstanding.decrementAndGet() == 0) { - request(10); - outstanding.set(10); - } - } - }); - // end::backpressure[] } } - static void process(JsonObject value) { - - } - } diff --git a/modules/devguide/examples/java/Queries.java b/modules/devguide/examples/java/Queries.java index 4f458044..4af1a53a 100644 --- a/modules/devguide/examples/java/Queries.java +++ b/modules/devguide/examples/java/Queries.java @@ -17,29 +17,19 @@ // NB: This example requires the `travel-sample` bucket to be installed. // tag::imports[] -import static com.couchbase.client.java.query.QueryOptions.queryOptions; - -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; - -import com.couchbase.client.core.error.CouchbaseException; import com.couchbase.client.java.Bucket; import com.couchbase.client.java.Cluster; -import com.couchbase.client.java.Collection; import com.couchbase.client.java.Scope; import com.couchbase.client.java.json.JsonArray; import com.couchbase.client.java.json.JsonObject; -import com.couchbase.client.java.kv.MutationResult; -import com.couchbase.client.java.kv.MutationState; -import com.couchbase.client.java.query.QueryOptions; import com.couchbase.client.java.query.QueryResult; import com.couchbase.client.java.query.QueryScanConsistency; import com.couchbase.client.java.query.ReactiveQueryResult; +import reactor.core.publisher.Mono; -import org.reactivestreams.Subscription; +import java.util.UUID; -import reactor.core.publisher.BaseSubscriber; -import reactor.core.publisher.Mono; +import static com.couchbase.client.java.query.QueryOptions.queryOptions; // end::imports[] public class Queries { @@ -138,32 +128,6 @@ public static void main(String[] args) throws Exception { // end::simplereactive[] } - { - System.out.println("\nExample: [backpressure]"); - // tag::backpressure[] - Mono result = cluster.reactive().query("select * from `travel-sample`.inventory.route"); - - result.flatMapMany(ReactiveQueryResult::rowsAsObject).subscribe(new BaseSubscriber() { - // Number of outstanding requests - final AtomicInteger oustanding = new AtomicInteger(0); - - @Override - protected void hookOnSubscribe(Subscription subscription) { - request(10); // initially request to rows - oustanding.set(10); - } - - @Override - protected void hookOnNext(JsonObject value) { - process(value); - if (oustanding.decrementAndGet() == 0) { - request(10); - } - } - }); - // end::backpressure[] - } - { System.out.println("\nExample: [scope-level-query]"); // tag::scope-level-query[] @@ -178,8 +142,4 @@ protected void hookOnNext(JsonObject value) { } - static void process(JsonObject value) { - - } - } diff --git a/modules/devguide/examples/java/Search.java b/modules/devguide/examples/java/Search.java index 71012127..58e5a409 100644 --- a/modules/devguide/examples/java/Search.java +++ b/modules/devguide/examples/java/Search.java @@ -14,11 +14,6 @@ * limitations under the License. */ -import static com.couchbase.client.java.search.SearchOptions.searchOptions; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import com.couchbase.client.core.error.CouchbaseException; import com.couchbase.client.java.Bucket; import com.couchbase.client.java.Cluster; @@ -37,10 +32,14 @@ import com.couchbase.client.java.search.sort.SearchSort; import com.couchbase.client.java.search.vector.VectorQuery; import com.couchbase.client.java.search.vector.VectorSearch; -import org.reactivestreams.Subscription; -import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Mono; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.couchbase.client.java.search.SearchOptions.searchOptions; + // This example assumes an index called `travel-sample-index` exists. // Please refer to file `modules/test/scripts/init-couchbase/init-buckets.sh` (line 66) // for the relevant curl command to create this. @@ -149,32 +148,6 @@ public static void main(String... args) { // end::simplereactive[] } - { - // tag::backpressure[] - Mono result = cluster.reactive().searchQuery("travel-sample-index", - SearchQuery.queryString("swanky")); - - result.flatMapMany(ReactiveSearchResult::rows).subscribe(new BaseSubscriber() { - // Number of outstanding requests - final AtomicInteger oustanding = new AtomicInteger(0); - - @Override - protected void hookOnSubscribe(Subscription subscription) { - request(10); // initially request to rows - oustanding.set(10); - } - - @Override - protected void hookOnNext(SearchRow row) { - process(row); - if (oustanding.decrementAndGet() == 0) { - request(10); - } - } - }); - // end::backpressure[] - } - // This will come from an external source, such as an embeddings API. float[] vectorQuery = null; float[] anotherVectorQuery = null; @@ -218,8 +191,4 @@ protected void hookOnNext(SearchRow row) { } - static void process(SearchRow value) { - - } - } diff --git a/modules/howtos/pages/analytics-using-sdk.adoc b/modules/howtos/pages/analytics-using-sdk.adoc index c1810506..e8d65472 100644 --- a/modules/howtos/pages/analytics-using-sdk.adoc +++ b/modules/howtos/pages/analytics-using-sdk.adoc @@ -190,18 +190,7 @@ A simple reactive query is similar to the blocking one: include::devguide:example$java/Analytics.java[tag=simplereactive,indent=0] ---- -This query will stream all rows as they become available from the server. If you want to manually control the data flow (which is important if you are streaming a lot of rows which could cause a potential out of memory situation) you can do this by using explicit `request()` calls. - -[source,java] ----- -include::devguide:example$java/Analytics.java[tag=backpressure,indent=0] ----- - -In this example we initially request a batch size of 10 rows (so streaming can begin). -Then as each row gets streamed it is written to a `process()` method which does whatever it needs to do to process. -Then a counter is decremented, and once all of the 10 outstanding rows are processed another batch is loaded. -Please note that with reactive code, if your `process()` method equivalent is blocking, you *must* move it onto another scheduler so that the I/O threads are not stalled. -We always recommend not blocking in the first place in reactive code. +This query will stream all rows as they become available from the server, automatically applying backpressure as necessary. == Scoped Queries on Named Collections diff --git a/modules/howtos/pages/full-text-searching-with-sdk.adoc b/modules/howtos/pages/full-text-searching-with-sdk.adoc index 3d6a484b..93452b02 100644 --- a/modules/howtos/pages/full-text-searching-with-sdk.adoc +++ b/modules/howtos/pages/full-text-searching-with-sdk.adoc @@ -303,16 +303,4 @@ A simple reactive query is similar to the blocking one: include::devguide:example$java/Search.java[tag=simplereactive,indent=0] ---- -This Search query will stream all rows as they become available form the server. -If you want to manually control the data flow (which is important if you are streaming a lot of rows which could cause a potential out of memory situation) you can do this by using explicit `request()` calls. - -[source,java] ----- -include::devguide:example$java/Search.java[tag=backpressure,indent=0] ----- - -In this example we initially request a batch size of 10 rows (so streaming can begin). -Then as each row gets streamed it is written to a `process()` method which does whatever it needs to do to process. -Then a counter is decremented, and once all of the 10 outstanding rows are processed another batch is loaded. -Please note that with reactive code, if your `process()` method equivalent is blocking, you *must* move it onto another scheduler so that the I/O threads are not stalled. -We always recommend not blocking in the first place in reactive code. +This Search query will stream all rows as they become available from the server, automatically applying backpressure as necessary. diff --git a/modules/howtos/pages/sqlpp-queries-with-sdk.adoc b/modules/howtos/pages/sqlpp-queries-with-sdk.adoc index cda70de4..a2ec3e6e 100644 --- a/modules/howtos/pages/sqlpp-queries-with-sdk.adoc +++ b/modules/howtos/pages/sqlpp-queries-with-sdk.adoc @@ -238,19 +238,7 @@ A simple reactive query is similar to the blocking one: include::devguide:example$java/Queries.java[tag=simplereactive,indent=0] ---- -This query will stream all rows as they become available form the server. -If you want to manually control the data flow (which is important if you are streaming a lot of rows which could cause a potential out of memory situation) you can do this by using explicit `request()` calls. - -[source,java] ----- -include::devguide:example$java/Queries.java[tag=backpressure,indent=0] ----- - -In this example we initially request a batch size of 10 rows (so streaming can begin). -Then as each row gets streamed it is written to a `process()` method which does whatever it needs to do to process. -Then a counter is decremented and once all of the 10 outstanding rows are processed another batch is loaded. -Please note that if your `process()` method equivalent is blocking, like always with reactive code, you *must* move it onto another scheduler so that the I/O threads are not stalled. -As always we recommend not blocking in the first place in reactive code. +This query will stream all rows as they become available from the server, automatically applying backpressure as necessary. == Querying at Scope Level