Skip to content

Commit 2ee23a6

Browse files
mostroverkhovrobertroeser
authored andcommitted
Introduce FrameSender which releases queued frames on cancel/termination (#36)
* Introduce FrameSender which releases queued frames on cancel/termination basic interactions stress tests cleanup readme * remove commented code * shorten method name & update readme
1 parent 3ec8519 commit 2ee23a6

File tree

14 files changed

+729
-50
lines changed

14 files changed

+729
-50
lines changed

README.md

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ It enables following symmetric interaction models:
1111
* request/response (stream of 1)
1212
* request/stream (finite/infinite stream of many)
1313
* channel (bi-directional streams)
14-
15-
Also, metadata can be associated with stream or RSocket itself
14+
* per-stream and per-RSocket metadata
1615

1716
## Build and Binaries
1817

@@ -29,7 +28,7 @@ Also, metadata can be associated with stream or RSocket itself
2928
}
3029
```
3130
### Transports
32-
`Netty` based Websockets and TCP transport (`Client` and `Server`)
31+
`Netty` based Websockets and TCP transport (`Client` and `Server`)
3332
`OkHttp` based Websockets transport (`Client` only)
3433
```groovy
3534
dependencies {
@@ -44,22 +43,26 @@ Messages for all interactions are represented as `Payload` of binary (`NIO Byte
4443

4544
UTF-8 `text` payloads can be constructed as follows
4645
```kotlin
47-
val request = PayloadImpl.textPayload("data", "metadata")
46+
val request = DefaultPayload.text("data", "metadata")
4847
```
4948
Stream Metadata is optional
5049
```kotlin
51-
val request = PayloadImpl.textPayload("data")
50+
val request = DefaultPayload.text("data")
5251
```
5352
#### Interactions
54-
Fire and Forget
53+
* Fire and Forget
5554
`RSocket.fireAndForget(payload: Payload): Completable`
56-
Request-Response
55+
56+
* Request-Response
5757
`RSocket.requestResponse(payload: Payload): Single<Payload>`
58-
Request-Stream
58+
59+
* Request-Stream
5960
`RSocket.requestStream(payload: Payload): Flowable<Payload>`
60-
Request-Channel
61+
62+
* Request-Channel
6163
`RSocket.requestChannel(payload: Publisher<Payload>): Flowable<Payload>`
62-
Metadata-Push
64+
65+
* Metadata-Push
6366
`fun metadataPush(payload: Payload): Completable`
6467

6568
#### Client
@@ -75,13 +78,13 @@ val request = PayloadImpl.textPayload("data")
7578
private fun handler(requester:RSocket): RSocket {
7679
return object : AbstractRSocket() {
7780
override fun requestStream(payload: Payload): Flowable<Payload> {
78-
return Flowable.just(PayloadImpl.textPayload("client handler response"))
81+
return Flowable.just(DefaultPayload.text("client handler response"))
7982
}
8083
}
8184
}
8285
```
8386
#### Server
84-
Accepts `Connections` from `Clients`
87+
Server is acceptor of `Connections` from `Clients`
8588
```kotlin
8689
val closeable: Single<Closeable> = RSocketFactory
8790
.receive()
@@ -93,7 +96,7 @@ val closeable: Single<Closeable> = RSocketFactory
9396
private fun handler(setup: Setup, rSocket: RSocket): Single<RSocket> {
9497
return Single.just(object : AbstractRSocket() {
9598
override fun requestStream(payload: Payload): Flowable<Payload> {
96-
return Flowable.just(PayloadImpl.textPayload("server handler response"))
99+
return Flowable.just(DefaultPayload.text("server handler response"))
97100
}
98101
})
99102
}

build.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ subprojects {
3030
group = 'io.rsocket.kotlin'
3131
version = '0.9-SNAPSHOT'
3232

33+
test {
34+
testLogging {
35+
events "failed"
36+
exceptionFormat "full"
37+
}
38+
}
39+
3340
task sourcesJar(type: Jar, dependsOn: classes) {
3441
classifier = 'sources'
3542
from sourceSets.main.allSource

rsocket-core/src/main/kotlin/io/rsocket/kotlin/DefaultPayload.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class DefaultPayload @JvmOverloads constructor(data: ByteBuffer,
9595
* @param data the data of the payload.
9696
* @return a payload.
9797
*/
98-
fun textPayload(data: String): Payload = DefaultPayload(data)
98+
fun text(data: String): Payload = DefaultPayload(data)
9999

100100
/**
101101
* Static factory method for a text payload. Mainly looks better than
@@ -105,7 +105,7 @@ class DefaultPayload @JvmOverloads constructor(data: ByteBuffer,
105105
* @param metadata the metadata for the payload.
106106
* @return a payload.
107107
*/
108-
fun textPayload(data: String, metadata: String?): Payload =
108+
fun text(data: String, metadata: String?): Payload =
109109
DefaultPayload(data, metadata)
110110
}
111111
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.rsocket.kotlin.internal
2+
3+
import io.reactivex.Flowable
4+
import io.rsocket.kotlin.Frame
5+
6+
internal class FrameSender {
7+
private val sender = FrameUnicastProcessor
8+
.create()
9+
.toSerialized()
10+
11+
fun sent(): Flowable<Frame> = sender
12+
13+
fun send(frame: Frame) {
14+
sender.onNext(frame)
15+
}
16+
}

0 commit comments

Comments
 (0)