Skip to content

Commit ee21848

Browse files
mostroverkhovrobertroeser
authored andcommitted
Improve Lease support with sources for sent and received Leases (#37)
* Introduce FrameSender which releases queued frames on cancel/termination basic interactions stress tests cleanup readme * remove commented code * shorten method name & update readme * Improve Lease support with sources for sent and received Leases * complete sent and received Lease streams on connection close * Lease: add initialAllowedRequests property * Simplified RSocketFactory api, added javadocs
1 parent 2ee23a6 commit ee21848

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+749
-480
lines changed

rsocket-core/src/main/kotlin/io/rsocket/kotlin/Availability.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ package io.rsocket.kotlin
1616
interface Availability {
1717

1818
/**
19-
* @return a positive numbers representing the availability of the entity. Higher is better, 0.0
20-
* means not available
19+
* @return a positive numbers representing the availability of the entity.
20+
* Higher is better, 0.0 means not available
2121
*/
2222
fun availability(): Double
2323
}
Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,23 @@
11
package io.rsocket.kotlin
22

3-
class ClientOptions : Options<ClientOptions>() {
3+
class ClientOptions {
44

5-
override fun copy(): ClientOptions =
6-
ClientOptions().streamRequestLimit(streamRequestLimit())
5+
private var streamRequestLimit: Int = 128
6+
7+
fun streamRequestLimit(streamRequestLimit: Int): ClientOptions {
8+
assertRequestLimit(streamRequestLimit)
9+
this.streamRequestLimit = streamRequestLimit
10+
return this
11+
}
12+
13+
internal fun streamRequestLimit(): Int = streamRequestLimit
14+
15+
fun copy(): ClientOptions = ClientOptions()
16+
.streamRequestLimit(streamRequestLimit)
17+
18+
private fun assertRequestLimit(streamRequestLimit: Int) {
19+
if (streamRequestLimit <= 0) {
20+
throw IllegalArgumentException("stream request limit must be positive")
21+
}
22+
}
723
}

rsocket-core/src/main/kotlin/io/rsocket/kotlin/DefaultPayload.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class DefaultPayload @JvmOverloads constructor(data: ByteBuffer,
8181
return _metadata
8282
}
8383

84-
override fun hasMetadata(): Boolean = _metadata != null
84+
override val hasMetadata: Boolean = _metadata != null
8585

8686
companion object {
8787

rsocket-core/src/main/kotlin/io/rsocket/kotlin/Duration.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package io.rsocket.kotlin
22

33
import java.util.concurrent.TimeUnit
44

5+
/**
6+
* Represents duration with different time units
7+
*/
58
data class Duration(private val value: Long, val unit: TimeUnit) {
69

710
val millis = unit.toMillis(value)

rsocket-core/src/main/kotlin/io/rsocket/kotlin/Frame.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
236236
metadataMimeType: String,
237237
dataMimeType: String,
238238
payload: Payload): Frame {
239-
val metadata = if (payload.hasMetadata())
239+
val metadata = if (payload.hasMetadata)
240240
Unpooled.wrappedBuffer(payload.metadata)
241241
else
242242
Unpooled.EMPTY_BUFFER
@@ -424,7 +424,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
424424
if (initialRequestN < 1) {
425425
throw IllegalStateException("initial request n must be greater than 0")
426426
}
427-
val metadata = if (payload.hasMetadata()) Unpooled.wrappedBuffer(payload.metadata) else null
427+
val metadata = if (payload.hasMetadata) Unpooled.wrappedBuffer(payload.metadata) else null
428428
val data = Unpooled.wrappedBuffer(payload.data)
429429

430430
val frame = RECYCLER.get()
@@ -531,8 +531,8 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
531531
fun from(streamId: Int,
532532
type: FrameType,
533533
payload: Payload,
534-
flags: Int = if (payload.hasMetadata()) FLAGS_M else 0): Frame {
535-
val metadata = if (payload.hasMetadata()) Unpooled.wrappedBuffer(payload.metadata) else null
534+
flags: Int = if (payload.hasMetadata) FLAGS_M else 0): Frame {
535+
val metadata = if (payload.hasMetadata) Unpooled.wrappedBuffer(payload.metadata) else null
536536
val data = Unpooled.wrappedBuffer(payload.data)
537537
return from(streamId, type, metadata, data, flags)
538538
}

rsocket-core/src/main/kotlin/io/rsocket/kotlin/FrameType.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package io.rsocket.kotlin
1717

18-
/** Types of [Frame] that can be sent. */
18+
/** RSocket [Frame] types */
1919
enum class FrameType(val encodedType: Int, private val flags: Int = 0) {
2020
// blank type that is not defined
2121
UNDEFINED(0x00),
@@ -44,15 +44,13 @@ enum class FrameType(val encodedType: Int, private val flags: Int = 0) {
4444
Flags.IS_REQUEST_TYPE or
4545
Flags.HAS_INITIAL_REQUEST_N or
4646
Flags.IS_FRAGMENTABLE),
47-
// Requester mid-stream
4847
REQUEST_N(0x08),
4948
CANCEL(0x09, Flags.CAN_HAVE_METADATA),
50-
// Responder
5149
PAYLOAD(0x0A, Flags.CAN_HAVE_METADATA_AND_DATA),
5250
ERROR(0x0B, Flags.CAN_HAVE_METADATA_AND_DATA),
53-
// Requester & Responder
51+
// Requester & Responder metadata push
5452
METADATA_PUSH(0x0C, Flags.CAN_HAVE_METADATA),
55-
// Resumption frames, not yet implemented
53+
// Resumption frames
5654
RESUME(0x0D),
5755
RESUME_OK(0x0E),
5856
// synthetic types from Responder for use by the rest of the machinery

rsocket-core/src/main/kotlin/io/rsocket/kotlin/InterceptorOptions.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,23 @@ package io.rsocket.kotlin
33
import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor
44
import io.rsocket.kotlin.interceptors.RSocketInterceptor
55

6+
/**
7+
* Configures RSocket interceptors
8+
*/
69
interface InterceptorOptions {
710

11+
/**
12+
* @param interceptor adds [DuplexConnectionInterceptor]
13+
*/
814
fun connection(interceptor: DuplexConnectionInterceptor)
915

16+
/**
17+
* @param interceptor adds [RSocketInterceptor] for requester [RSocket]
18+
*/
1019
fun requester(interceptor: RSocketInterceptor)
1120

21+
/**
22+
* @param interceptor adds [RSocketInterceptor] for handler (responder) [RSocket]
23+
*/
1224
fun handler(interceptor: RSocketInterceptor)
1325
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,18 @@
11
package io.rsocket.kotlin
22

3+
/**
4+
* Configures keep-alive feature of RSocket
5+
*/
36
interface KeepAlive {
47

8+
/**
9+
* @return time between KEEPALIVE frames that the client will send
10+
*/
511
fun keepAliveInterval(): Duration
612

13+
/**
14+
* @return time that a client will allow a server to not respond to a
15+
* KEEPALIVE before it is assumed to be dead.
16+
*/
717
fun keepAliveMaxLifeTime(): Duration
818
}

rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveData.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,19 @@ package io.rsocket.kotlin
22

33
import java.nio.ByteBuffer
44

5+
/**
6+
* Provides means to handle data sent by client with KEEPALIVE frame
7+
*/
58
interface KeepAliveData {
69

10+
/**
11+
* @return supplier of Keep-alive data [ByteBuffer] sent by client
12+
*/
713
fun producer(): () -> ByteBuffer
814

15+
/**
16+
* @return consumer of Keep-alive data [ByteBuffer] returned by server
17+
*/
918
fun handler(): (ByteBuffer) -> Unit
1019
}
1120

rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,60 @@ package io.rsocket.kotlin
22

33
import io.rsocket.kotlin.internal.EmptyKeepAliveData
44

5+
/**
6+
* Configures Keep-alive feature of RSocket
7+
*/
58
class KeepAliveOptions : KeepAlive {
69
private var interval: Duration = Duration.ofMillis(100)
710
private var maxLifeTime: Duration = Duration.ofSeconds(1)
811
private var keepAliveData: KeepAliveData = EmptyKeepAliveData()
912

13+
/**
14+
* @param interval time between KEEPALIVE frames that the client will send
15+
* @return this [KeepAliveOptions]
16+
*/
1017
fun keepAliveInterval(interval: Duration): KeepAliveOptions {
1118
assertDuration(interval, "keepAliveInterval")
1219
this.interval = interval
1320
return this
1421
}
1522

23+
/**
24+
* @return time between KEEPALIVE frames that the client will send
25+
*/
1626
override fun keepAliveInterval() = interval
1727

28+
/**
29+
* @param maxLifetime time that a client will allow a server to not respond to a
30+
* KEEPALIVE before it is assumed to be dead.
31+
*/
1832
fun keepAliveMaxLifeTime(maxLifetime: Duration): KeepAliveOptions {
1933
assertDuration(maxLifetime, "keepAliveMaxLifeTime")
2034
this.maxLifeTime = maxLifetime
2135
return this
2236
}
2337

38+
/**
39+
* @return time that a client will allow a server to not respond to a
40+
* KEEPALIVE before it is assumed to be dead.
41+
*/
2442
override fun keepAliveMaxLifeTime() = maxLifeTime
2543

44+
/**
45+
* Provides means to handle data sent by client with KEEPALIVE frame
46+
*
47+
* @param keepAliveData utility for handling data sent by client with KEEPALIVE
48+
* frame
49+
* @return this KeepAliveOptions
50+
*/
2651
fun keepAliveData(keepAliveData: KeepAliveData): KeepAliveOptions {
2752
this.keepAliveData = keepAliveData
2853
return this
2954
}
3055

56+
/**
57+
* @return utility for handling data sent by client with KEEPALIVE
58+
*/
3159
fun keepAliveData(): KeepAliveData = keepAliveData
3260

3361
fun copy(): KeepAliveOptions = KeepAliveOptions()

0 commit comments

Comments
 (0)