Skip to content

Commit d61b4d1

Browse files
committed
add(EventSourceClient) : add a callback when the connection is closed
- add the method in the interface - implement doing nothing by default
1 parent e6c53e9 commit d61b4d1

File tree

3 files changed

+19
-0
lines changed

3 files changed

+19
-0
lines changed

src/main/java/io/streamdata/demo/Main.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public static void main(String... args) throws URISyntaxException, InterruptedEx
3030
.onSnapshot(data -> logger.info("INITIAL SNAPSHOT {}", data))
3131
.onPatch(patch -> logger.info("PATCH {} SNAPSHOT UPDATED {}", patch, eventSource.getCurrentSnapshot()))
3232
.onOpen(() -> logger.info("And we are... live!"))
33+
.onClose(() -> logger.info("Bye now!"))
3334
.open();
3435

3536
Thread.sleep(10000);

src/main/java/io/streamdata/sdk/EventSourceClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ public interface EventSourceClient {
3939
*/
4040
EventSourceClient onOpen(Runnable onOpen);
4141

42+
/**
43+
* Sets a optionnal callback to be called after the event source has been closed
44+
*
45+
* @param onClose the callback
46+
* @return this client instance for nice fluent api call
47+
*/
48+
EventSourceClient onClose(Runnable onClose);
49+
4250
/**
4351
* Sets a callback to be called after streamdata sends after the first time the API is polled.
4452
* <b>This callback must be set before calling {@link #open()}</b>

src/main/java/io/streamdata/sdk/impl/EventSourceClientImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class EventSourceClientImpl implements EventSourceClient {
3636

3737
// The polling URL
3838
private Runnable onOpenCallback;
39+
private Runnable onCloseCallback;
3940
private Consumer<JsonNode> onDataCallback;
4041
private Consumer<JsonNode> onPatchCallback;
4142
private Consumer<String> onErrorCallback = err -> LOGGER.error("A streamdata error has been sent from SSE : {}", err);
@@ -103,6 +104,12 @@ public EventSourceClient onOpen(Runnable callback) {
103104
return this;
104105
}
105106

107+
@Override
108+
public EventSourceClient onClose(Runnable callback) {
109+
this.onCloseCallback = callback;
110+
return this;
111+
}
112+
106113
@Override
107114
public EventSourceClient onSnapshot(Consumer<JsonNode> snaphot) {
108115
this.onDataCallback = snaphot;
@@ -131,6 +138,9 @@ public EventSourceClient onException(Consumer<Throwable> callback) {
131138
public void close() {
132139
if (this.eventSource != null) {
133140
this.eventSource.close();
141+
if (this.onCloseCallback != null) {
142+
this.onCloseCallback.run();
143+
}
134144
this.eventSource = null;
135145
}
136146
}

0 commit comments

Comments
 (0)