Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 1 addition & 37 deletions modules/devguide/examples/java/Analytics.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,17 @@

// tag::imports[]
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.java.Scope;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.Scope;
import com.couchbase.client.java.analytics.AnalyticsResult;
import com.couchbase.client.java.analytics.AnalyticsScanConsistency;
import com.couchbase.client.java.analytics.ReactiveAnalyticsResult;
import com.couchbase.client.java.json.JsonArray;
import com.couchbase.client.java.json.JsonObject;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static com.couchbase.client.java.analytics.AnalyticsOptions.analyticsOptions;
// end::imports[]
Expand Down Expand Up @@ -199,41 +196,8 @@ public static void asyncExamples(String... args) {
.subscribe(row -> System.out.println("Found row: " + row));
// end::simplereactive[]
}

System.out.println("backpressure");
// tag::backpressure[]
Mono<ReactiveAnalyticsResult> result = cluster
.reactive()
.analyticsQuery("select * from `huge-dataset`");

result
.flatMapMany(ReactiveAnalyticsResult::rowsAsObject)
.subscribe(new BaseSubscriber<JsonObject>() {
// Number of outstanding requests
final AtomicInteger outstanding = new AtomicInteger(0);

@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10); // initially request to rows
outstanding.set(10);
}

@Override
protected void hookOnNext(JsonObject value) {
process(value);
if (outstanding.decrementAndGet() == 0) {
request(10);
outstanding.set(10);
}
}
});
// end::backpressure[]
}

}

static void process(JsonObject value) {

}

}
46 changes: 3 additions & 43 deletions modules/devguide/examples/java/Queries.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,19 @@
// NB: This example requires the `travel-sample` bucket to be installed.

// tag::imports[]
import static com.couchbase.client.java.query.QueryOptions.queryOptions;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.Scope;
import com.couchbase.client.java.json.JsonArray;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.MutationResult;
import com.couchbase.client.java.kv.MutationState;
import com.couchbase.client.java.query.QueryOptions;
import com.couchbase.client.java.query.QueryResult;
import com.couchbase.client.java.query.QueryScanConsistency;
import com.couchbase.client.java.query.ReactiveQueryResult;
import reactor.core.publisher.Mono;

import org.reactivestreams.Subscription;
import java.util.UUID;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import static com.couchbase.client.java.query.QueryOptions.queryOptions;
// end::imports[]

public class Queries {
Expand Down Expand Up @@ -138,32 +128,6 @@ public static void main(String[] args) throws Exception {
// end::simplereactive[]
}

{
System.out.println("\nExample: [backpressure]");
// tag::backpressure[]
Mono<ReactiveQueryResult> result = cluster.reactive().query("select * from `travel-sample`.inventory.route");

result.flatMapMany(ReactiveQueryResult::rowsAsObject).subscribe(new BaseSubscriber<JsonObject>() {
// Number of outstanding requests
final AtomicInteger oustanding = new AtomicInteger(0);

@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10); // initially request to rows
oustanding.set(10);
}

@Override
protected void hookOnNext(JsonObject value) {
process(value);
if (oustanding.decrementAndGet() == 0) {
request(10);
}
}
});
// end::backpressure[]
}

{
System.out.println("\nExample: [scope-level-query]");
// tag::scope-level-query[]
Expand All @@ -178,8 +142,4 @@ protected void hookOnNext(JsonObject value) {

}

static void process(JsonObject value) {

}

}
43 changes: 6 additions & 37 deletions modules/devguide/examples/java/Search.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@
* limitations under the License.
*/

import static com.couchbase.client.java.search.SearchOptions.searchOptions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
Expand All @@ -37,10 +32,14 @@
import com.couchbase.client.java.search.sort.SearchSort;
import com.couchbase.client.java.search.vector.VectorQuery;
import com.couchbase.client.java.search.vector.VectorSearch;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.couchbase.client.java.search.SearchOptions.searchOptions;

// This example assumes an index called `travel-sample-index` exists.
// Please refer to file `modules/test/scripts/init-couchbase/init-buckets.sh` (line 66)
// for the relevant curl command to create this.
Expand Down Expand Up @@ -149,32 +148,6 @@ public static void main(String... args) {
// end::simplereactive[]
}

{
// tag::backpressure[]
Mono<ReactiveSearchResult> result = cluster.reactive().searchQuery("travel-sample-index",
SearchQuery.queryString("swanky"));

result.flatMapMany(ReactiveSearchResult::rows).subscribe(new BaseSubscriber<SearchRow>() {
// Number of outstanding requests
final AtomicInteger oustanding = new AtomicInteger(0);

@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10); // initially request to rows
oustanding.set(10);
}

@Override
protected void hookOnNext(SearchRow row) {
process(row);
if (oustanding.decrementAndGet() == 0) {
request(10);
}
}
});
// end::backpressure[]
}

// This will come from an external source, such as an embeddings API.
float[] vectorQuery = null;
float[] anotherVectorQuery = null;
Expand Down Expand Up @@ -218,8 +191,4 @@ protected void hookOnNext(SearchRow row) {

}

static void process(SearchRow value) {

}

}
13 changes: 1 addition & 12 deletions modules/howtos/pages/analytics-using-sdk.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,7 @@ A simple reactive query is similar to the blocking one:
include::devguide:example$java/Analytics.java[tag=simplereactive,indent=0]
----

This query will stream all rows as they become available from the server. If you want to manually control the data flow (which is important if you are streaming a lot of rows which could cause a potential out of memory situation) you can do this by using explicit `request()` calls.

[source,java]
----
include::devguide:example$java/Analytics.java[tag=backpressure,indent=0]
----

In this example we initially request a batch size of 10 rows (so streaming can begin).
Then as each row gets streamed it is written to a `process()` method which does whatever it needs to do to process.
Then a counter is decremented, and once all of the 10 outstanding rows are processed another batch is loaded.
Please note that with reactive code, if your `process()` method equivalent is blocking, you *must* move it onto another scheduler so that the I/O threads are not stalled.
We always recommend not blocking in the first place in reactive code.
This query will stream all rows as they become available from the server, automatically applying backpressure as necessary.

== Scoped Queries on Named Collections

Expand Down
14 changes: 1 addition & 13 deletions modules/howtos/pages/full-text-searching-with-sdk.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -303,16 +303,4 @@ A simple reactive query is similar to the blocking one:
include::devguide:example$java/Search.java[tag=simplereactive,indent=0]
----

This Search query will stream all rows as they become available form the server.
If you want to manually control the data flow (which is important if you are streaming a lot of rows which could cause a potential out of memory situation) you can do this by using explicit `request()` calls.

[source,java]
----
include::devguide:example$java/Search.java[tag=backpressure,indent=0]
----

In this example we initially request a batch size of 10 rows (so streaming can begin).
Then as each row gets streamed it is written to a `process()` method which does whatever it needs to do to process.
Then a counter is decremented, and once all of the 10 outstanding rows are processed another batch is loaded.
Please note that with reactive code, if your `process()` method equivalent is blocking, you *must* move it onto another scheduler so that the I/O threads are not stalled.
We always recommend not blocking in the first place in reactive code.
This Search query will stream all rows as they become available from the server, automatically applying backpressure as necessary.
14 changes: 1 addition & 13 deletions modules/howtos/pages/sqlpp-queries-with-sdk.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -238,19 +238,7 @@ A simple reactive query is similar to the blocking one:
include::devguide:example$java/Queries.java[tag=simplereactive,indent=0]
----

This query will stream all rows as they become available form the server.
If you want to manually control the data flow (which is important if you are streaming a lot of rows which could cause a potential out of memory situation) you can do this by using explicit `request()` calls.

[source,java]
----
include::devguide:example$java/Queries.java[tag=backpressure,indent=0]
----

In this example we initially request a batch size of 10 rows (so streaming can begin).
Then as each row gets streamed it is written to a `process()` method which does whatever it needs to do to process.
Then a counter is decremented and once all of the 10 outstanding rows are processed another batch is loaded.
Please note that if your `process()` method equivalent is blocking, like always with reactive code, you *must* move it onto another scheduler so that the I/O threads are not stalled.
As always we recommend not blocking in the first place in reactive code.
This query will stream all rows as they become available from the server, automatically applying backpressure as necessary.


== Querying at Scope Level
Expand Down
Loading