Skip to content

Commit 845b45e

Browse files
committed
refactor(*) : change base package + add facade interface to create clients
- change jdk to sdk - create StreamdataClient a facade interface to create clients - rename StreamApiClient in RxJavaEventSourceClient - rename RxJavaEventSourceClient.toObservable() in toFlowable() - rename EventSourceClient.getCurrentData() en getCurrentSnaphot() - make Event an internal class of RxJavaEventSourceClient
1 parent f974207 commit 845b45e

File tree

9 files changed

+238
-222
lines changed

9 files changed

+238
-222
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,5 @@ hs_err_pid*
1616
target/
1717

1818
*.iml
19+
20+
logs/

src/main/java/io/streamdata/demo/Main.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package io.streamdata.demo;
22

33
import io.reactivex.disposables.Disposable;
4-
import io.streamdata.jdk.EventSourceClient;
5-
import io.streamdata.jdk.StreamApiClient;
4+
import io.streamdata.sdk.EventSourceClient;
5+
import io.streamdata.sdk.RxJavaEventSourceClient;
6+
import io.streamdata.sdk.StreamdataClient;
67
import org.slf4j.Logger;
78
import org.slf4j.LoggerFactory;
89

@@ -22,12 +23,12 @@ public static void main(String... args) throws URISyntaxException, InterruptedEx
2223
*/
2324
{
2425

25-
EventSourceClient eventSource = EventSourceClient.createEventSource(apiURL, appKey);
26+
EventSourceClient eventSource = StreamdataClient.createClient(apiURL, appKey);
2627
eventSource
2728
.addHeader("X-MYAPI-HEADER", "Polled-By-SD.io")
2829
.addHeader("X-MYAPI-HEADER2", "SomeStuffs")
2930
.onSnapshot(data -> logger.info("INITIAL SNAPSHOT {}", data))
30-
.onPatch(patch -> logger.info("PATCH {} SNAPSHOT UPDATED {}", patch, eventSource.getCurrentData()))
31+
.onPatch(patch -> logger.info("PATCH {} SNAPSHOT UPDATED {}", patch, eventSource.getCurrentSnapshot()))
3132
.onOpen(() -> logger.info("And we are... live!"))
3233
.open();
3334

@@ -41,7 +42,7 @@ public static void main(String... args) throws URISyntaxException, InterruptedEx
4142
* */
4243
{
4344

44-
EventSourceClient eventSource = EventSourceClient.createEventSource(apiURL, appKey);
45+
EventSourceClient eventSource = StreamdataClient.createClient(apiURL, appKey);
4546
eventSource.incrementalCache(false)
4647
.onSnapshot(data -> logger.info("INITIAL SNAPSHOT {}", data))
4748
.onPatch(System.out::println) // useless
@@ -54,13 +55,14 @@ public static void main(String... args) throws URISyntaxException, InterruptedEx
5455
}
5556

5657
/*
57-
* Using StreamApiClient
58+
* Using RxJavaEventSourceClient
5859
*/
5960
{
60-
StreamApiClient streamApiClient = StreamApiClient.createEventStream(apiURL, appKey);
61+
RxJavaEventSourceClient rxJavaEventSourceClient = StreamdataClient.createRxJavaClient(apiURL, appKey);
6162
Disposable disposable =
62-
streamApiClient.addHeader("X-MYAPI-HEADER", "Polled By SD.io")
63-
.toObservable(null) // TODO create new method
63+
rxJavaEventSourceClient.addHeader("X-MYAPI-HEADER", "Polled By SD.io")
64+
.incrementalCache(true) // same behavior as default
65+
.toFlowable()
6466
.subscribe(event -> {
6567
if (event.isSnapshot()) {
6668
logger.info("RX INITIAL SNAPSHOT {}", event.getSnapshot());

src/main/java/io/streamdata/jdk/Event.java

Lines changed: 0 additions & 112 deletions
This file was deleted.

src/main/java/io/streamdata/jdk/StreamApiClient.java

Lines changed: 0 additions & 52 deletions
This file was deleted.

src/main/java/io/streamdata/jdk/EventSourceClient.java renamed to src/main/java/io/streamdata/sdk/EventSourceClient.java

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
package io.streamdata.jdk;
1+
package io.streamdata.sdk;
22

33
import com.fasterxml.jackson.databind.JsonNode;
4-
import io.streamdata.jdk.impl.EventSourceClientImpl;
54

6-
import java.net.URISyntaxException;
75
import java.util.concurrent.Future;
86
import java.util.function.Consumer;
97

@@ -14,17 +12,6 @@ public interface EventSourceClient {
1412

1513
String SD_PROXY_URL = "https://streamdata.motwin.net/";
1614

17-
/**
18-
* Create an event source for a apiUrl
19-
*
20-
* @param apiUrl the url to be polled
21-
* @param appKey the app key that will be passed to the proxy
22-
* @return a client to be
23-
* @throws URISyntaxException if the URL to poll is not a valid URL
24-
*/
25-
static EventSourceClient createEventSource(String apiUrl, String appKey) throws URISyntaxException {
26-
return new EventSourceClientImpl(apiUrl, appKey);
27-
}
2815

2916

3017
/**
@@ -39,7 +26,7 @@ static EventSourceClient createEventSource(String apiUrl, String appKey) throws
3926

4027
/**
4128
* <p>Allow to enable or disable incremental cache. <b>By default incremental cache is enabled</b> thus the following happens : a Snapshot is sent back to the user followed by successive patches of this snapshot.</p>
42-
* <p>If set to false a snapshot will be sent every time, no patch is sent. This means that {@link Event#getPatch()} will return null <b>Use this only for low frequency polling</b></p>
29+
* <p>If set to false a snapshot will be sent every time, no patch is sent. This means that {@link RxJavaEventSourceClient.Event#getPatch()} will return null <b>Use this only for low frequency polling</b></p>
4330
* <p>Behind the scene it adds the header <code>text/event-stream</code> for patches or <code>application/json</code> for non-incremental cache</p>
4431
*
4532
* @param enableIncrementalCache a boolean to allow incremental cache (default : true)
@@ -64,7 +51,7 @@ static EventSourceClient createEventSource(String apiUrl, String appKey) throws
6451
EventSourceClient onSnapshot(Consumer<JsonNode> snaphot);
6552

6653
/**
67-
* Sets a callback to be called every time streamdata pushes a patch. The patch is applied behind the scenes and can be accessed in a thread safe fashion using {@link #getCurrentData()}
54+
* Sets a callback to be called every time streamdata pushes a patch. The patch is applied behind the scenes and can be accessed in a thread safe fashion using {@link #getCurrentSnapshot()}
6855
* * <b>This callback must be set before calling {@link #open()}</b>
6956
*
7057
* @param onOpen the callback
@@ -94,11 +81,11 @@ static EventSourceClient createEventSource(String apiUrl, String appKey) throws
9481
EventSourceClient onException(Consumer<Throwable> callback);
9582

9683
/**
97-
* Get the data (initial or after a patch is received and applied)
84+
* Get the snapshot (initial or after a patch is received and applied)
9885
*
99-
* @return the most fresh data available
86+
* @return the most fresh snapshot available
10087
*/
101-
JsonNode getCurrentData();
88+
JsonNode getCurrentSnapshot();
10289

10390

10491
/**

0 commit comments

Comments
 (0)