Skip to content

Commit b63fb27

Browse files
mostroverkhovrobertroeser
authored andcommitted
Lease support (#32)
* * Lease support * Do not expose full Setup frame contents to server acceptor * Fix fragmentation of frames with null metadata * fix fragmentation leaks
1 parent bd209a8 commit b63fb27

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1485
-162
lines changed

build.gradle

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,6 @@ if (project.hasProperty('bintrayUser') && project.hasProperty('bintrayKey')) {
109109
gpg {
110110
sign = true
111111
}
112-
113-
mavenCentralSync {
114-
user = project.property('sonatypeUsername')
115-
password = project.property('sonatypePassword')
116-
}
117112
}
118113
}
119114
}

rsocket-core/src/main/kotlin/io/rsocket/kotlin/Frame.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,11 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
304304

305305
fun leaseEnabled(frame: Frame): Boolean {
306306
ensureFrameType(FrameType.SETUP, frame)
307-
return Frame.isFlagSet(
308-
frame.flags(),
309-
SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE)
307+
return SetupFrameFlyweight.supportsLease(frame.flags())
308+
}
309+
310+
fun enableLease(flags: Int): Int {
311+
return flags or SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE
310312
}
311313

312314
fun keepaliveInterval(frame: Frame): Int {
@@ -594,7 +596,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
594596
object Fragmentation {
595597

596598
fun assembleFrame(blueprintFrame: Frame,
597-
metadata: ByteBuf,
599+
metadata: ByteBuf?,
598600
data: ByteBuf): Frame =
599601

600602
create(blueprintFrame,

rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ class KeepAliveOptions : KeepAlive {
2020

2121
override fun keepAliveMaxLifeTime() = maxLifeTime
2222

23+
fun copy(): KeepAliveOptions = KeepAliveOptions()
24+
.keepAliveInterval(interval)
25+
.keepAliveMaxLifeTime(maxLifeTime)
26+
2327
private fun assertDuration(duration: Duration, name: String) {
2428
if (duration.millis <= 0) {
2529
throw IllegalArgumentException("$name must be positive")
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.rsocket.kotlin
2+
3+
import io.reactivex.Completable
4+
import java.nio.ByteBuffer
5+
6+
/** Provides means to grant lease to peer */
7+
interface LeaseRef {
8+
9+
fun grantLease(
10+
numberOfRequests: Int,
11+
ttlMillis: Long,
12+
metadata: ByteBuffer): Completable
13+
14+
fun grantLease(numberOfRequests: Int,
15+
timeToLiveMillis: Long): Completable
16+
17+
fun onClose(): Completable
18+
}

rsocket-core/src/main/kotlin/io/rsocket/kotlin/MediaTypeOptions.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ class MediaTypeOptions : MediaType {
2020

2121
override fun metadataMimeType(): String = metadataMimeType
2222

23+
fun copy(): MediaTypeOptions = MediaTypeOptions()
24+
.dataMimeType(dataMimeType)
25+
.metadataMimeType(metadataMimeType)
26+
2327
private fun assertMediaType(mediaType: String) {
2428
if (mediaType.isEmpty()) {
2529
throw IllegalArgumentException("media type must be non-empty")

rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt

Lines changed: 130 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import io.reactivex.Single
2121
import io.rsocket.kotlin.interceptors.GlobalInterceptors
2222
import io.rsocket.kotlin.internal.*
2323
import io.rsocket.kotlin.internal.fragmentation.FragmentationInterceptor
24+
import io.rsocket.kotlin.internal.lease.ClientLeaseSupport
25+
import io.rsocket.kotlin.internal.lease.ServerLeaseSupport
2426
import io.rsocket.kotlin.transport.ClientTransport
2527
import io.rsocket.kotlin.transport.ServerTransport
2628
import io.rsocket.kotlin.util.AbstractRSocket
@@ -47,6 +49,7 @@ object RSocketFactory {
4749
private var acceptor: ClientAcceptor = { { emptyRSocket } }
4850
private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() }
4951
private var mtu = 0
52+
private var leaseRefConsumer: ((LeaseRef) -> Unit)? = null
5053
private val interceptors = GlobalInterceptors.create()
5154
private var flags = 0
5255
private var setupPayload: Payload = DefaultPayload.EMPTY
@@ -76,6 +79,12 @@ object RSocketFactory {
7679
return this
7780
}
7881

82+
fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ClientRSocketFactory {
83+
this.flags = Frame.Setup.enableLease(flags)
84+
this.leaseRefConsumer = leaseRefConsumer
85+
return this
86+
}
87+
7988
fun errorConsumer(errorConsumer: (Throwable) -> Unit): ClientRSocketFactory {
8089
this.errorConsumer = errorConsumer
8190
return this
@@ -93,49 +102,78 @@ object RSocketFactory {
93102
}
94103

95104
fun transport(transport: () -> ClientTransport): Start<RSocket> =
96-
ClientStart(transport, interceptors())
105+
clientStart(acceptor, transport)
106+
107+
fun transport(transport: ClientTransport): Start<RSocket> =
108+
transport { transport }
97109

98110
fun acceptor(acceptor: ClientAcceptor): ClientTransportAcceptor {
99111
this.acceptor = acceptor
100112
return object : ClientTransportAcceptor {
101113
override fun transport(transport: () -> ClientTransport)
102-
: Start<RSocket> =
103-
ClientStart(transport, interceptors())
114+
: Start<RSocket> = clientStart(acceptor, transport)
104115
}
105116
}
106117

107-
private fun interceptors(): InterceptorRegistry =
108-
interceptors.copyWith {
109-
it.connectionFirst(
110-
FragmentationInterceptor(mtu))
111-
}
118+
private fun clientStart(acceptor: ClientAcceptor,
119+
transport: () -> ClientTransport): ClientStart =
112120

113-
private inner class ClientStart(private val transportClient: () -> ClientTransport,
114-
private val interceptors: InterceptorRegistry)
121+
ClientStart(acceptor,
122+
errorConsumer,
123+
mtu,
124+
leaseRefConsumer,
125+
flags,
126+
setupPayload,
127+
keepAlive.copy(),
128+
mediaType.copy(),
129+
streamRequestLimit,
130+
transport,
131+
interceptors.copy())
132+
133+
private class ClientStart(
134+
private val acceptor: ClientAcceptor,
135+
private val errorConsumer: (Throwable) -> Unit,
136+
private var mtu: Int,
137+
private val leaseRef: ((LeaseRef) -> Unit)?,
138+
private val flags: Int,
139+
private val setupPayload: Payload,
140+
private val keepAlive: KeepAlive,
141+
private val mediaType: MediaType,
142+
private val streamRequestLimit: Int,
143+
private val transportClient: () -> ClientTransport,
144+
private val parentInterceptors: InterceptorRegistry)
115145
: Start<RSocket> {
116146

117147
override fun start(): Single<RSocket> {
118148
return transportClient()
119149
.connect()
120150
.flatMap { connection ->
121-
val setupFrame = createSetupFrame()
151+
152+
val withLease =
153+
enableLease(parentInterceptors)
154+
155+
val interceptors =
156+
enableFragmentation(withLease)
157+
158+
val interceptConnection = interceptors as InterceptConnection
159+
val interceptRSocket = interceptors as InterceptRSocket
122160

123161
val demuxer = ClientConnectionDemuxer(
124162
connection,
125-
interceptors)
163+
interceptConnection)
126164

127165
val rSocketRequester = RSocketRequester(
128166
demuxer.requesterConnection(),
129167
errorConsumer,
130168
ClientStreamIds(),
131169
streamRequestLimit)
132170

133-
val wrappedRequester = interceptors
171+
val wrappedRequester = interceptRSocket
134172
.interceptRequester(rSocketRequester)
135173

136174
val handlerRSocket = acceptor()(wrappedRequester)
137175

138-
val wrappedHandler = interceptors
176+
val wrappedHandler = interceptRSocket
139177
.interceptHandler(handlerRSocket)
140178

141179
RSocketResponder(
@@ -149,12 +187,21 @@ object RSocketFactory {
149187
keepAlive,
150188
errorConsumer)
151189

190+
val setupFrame = createSetupFrame()
191+
152192
connection
153193
.sendOne(setupFrame)
154194
.andThen(Single.just(wrappedRequester))
155195
}
156196
}
157197

198+
private fun enableFragmentation(parentInterceptors: InterceptorRegistry)
199+
: InterceptorRegistry {
200+
parentInterceptors.connectionFirst(
201+
FragmentationInterceptor(mtu))
202+
return parentInterceptors
203+
}
204+
158205
private fun createSetupFrame(): Frame {
159206
return Frame.Setup.from(
160207
flags,
@@ -164,14 +211,23 @@ object RSocketFactory {
164211
mediaType.dataMimeType(),
165212
setupPayload)
166213
}
214+
215+
private fun enableLease(parentInterceptors: InterceptorRegistry)
216+
: InterceptorRegistry =
217+
if (leaseRef != null) {
218+
parentInterceptors.copyWith(
219+
ClientLeaseSupport.enable(leaseRef)())
220+
} else {
221+
parentInterceptors.copy()
222+
}
167223
}
168224
}
169225

170226
class ServerRSocketFactory internal constructor() {
171227

172-
private var acceptor: ServerAcceptor = { { _, _ -> Single.just(emptyRSocket) } }
173228
private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() }
174229
private var mtu = 0
230+
private var leaseRefConsumer: ((LeaseRef) -> Unit)? = null
175231
private val interceptors = GlobalInterceptors.create()
176232
private var streamRequestLimit = defaultStreamRequestLimit
177233

@@ -186,6 +242,11 @@ object RSocketFactory {
186242
return this
187243
}
188244

245+
fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ServerRSocketFactory {
246+
this.leaseRefConsumer = leaseRefConsumer
247+
return this
248+
}
249+
189250
fun errorConsumer(errorConsumer: (Throwable) -> Unit): ServerRSocketFactory {
190251
this.errorConsumer = errorConsumer
191252
return this
@@ -197,26 +258,28 @@ object RSocketFactory {
197258
}
198259

199260
fun acceptor(acceptor: ServerAcceptor): ServerTransportAcceptor {
200-
this.acceptor = acceptor
201261
return object : ServerTransportAcceptor {
262+
202263
override fun <T : Closeable> transport(
203264
transport: () -> ServerTransport<T>): Start<T> =
204-
ServerStart(transport, interceptors())
205-
}
206-
}
207-
208-
private fun interceptors(): InterceptorRegistry {
209-
return interceptors.copyWith {
210-
it.connectionFirst(
211-
ServerContractInterceptor(errorConsumer))
212-
it.connectionFirst(
213-
FragmentationInterceptor(mtu))
265+
ServerStart(transport,
266+
acceptor,
267+
errorConsumer,
268+
mtu,
269+
leaseRefConsumer,
270+
interceptors.copy(),
271+
streamRequestLimit)
214272
}
215273
}
216274

217-
private inner class ServerStart<T : Closeable>(
275+
private class ServerStart<T : Closeable>(
218276
private val transportServer: () -> ServerTransport<T>,
219-
private val interceptors: InterceptorRegistry) : Start<T> {
277+
private val acceptor: ServerAcceptor,
278+
private val errorConsumer: (Throwable) -> Unit,
279+
private val mtu: Int,
280+
private val leaseRef: ((LeaseRef) -> Unit)?,
281+
private val parentInterceptors: InterceptorRegistry,
282+
private val streamRequestLimit: Int) : Start<T> {
220283

221284
override fun start(): Single<T> {
222285
return transportServer().start(object
@@ -225,25 +288,37 @@ object RSocketFactory {
225288
override fun invoke(duplexConnection: DuplexConnection)
226289
: Completable {
227290

291+
val withLease =
292+
enableLease(parentInterceptors)
293+
294+
val withServerContract =
295+
enableServerContract(withLease)
296+
297+
val interceptors =
298+
enableFragmentation(withServerContract)
299+
228300
val demuxer = ServerConnectionDemuxer(
229301
duplexConnection,
230-
interceptors)
302+
interceptors as InterceptConnection)
231303

232304
return demuxer
233305
.setupConnection()
234306
.receive()
235307
.firstOrError()
236308
.flatMapCompletable { setup ->
237-
accept(setup, demuxer)
309+
accept(setup,
310+
interceptors as InterceptRSocket,
311+
demuxer)
238312
}
239313
}
240314
})
241315
}
242316

243317
private fun accept(setupFrame: Frame,
318+
interceptors: InterceptRSocket,
244319
demuxer: ConnectionDemuxer): Completable {
245320

246-
val setup = Setup.create(setupFrame)
321+
val setup = SetupContents.create(setupFrame)
247322

248323
val rSocketRequester = RSocketRequester(
249324
demuxer.requesterConnection(),
@@ -272,6 +347,30 @@ object RSocketFactory {
272347
}
273348
.ignoreElement()
274349
}
350+
351+
private fun enableLease(parentInterceptors: InterceptorRegistry)
352+
: InterceptorRegistry =
353+
if (leaseRef != null) {
354+
parentInterceptors.copyWith(
355+
ServerLeaseSupport.enable(leaseRef)())
356+
} else {
357+
parentInterceptors.copy()
358+
}
359+
360+
private fun enableServerContract(parentInterceptors: InterceptorRegistry)
361+
: InterceptorRegistry {
362+
363+
parentInterceptors.connectionFirst(
364+
ServerContractInterceptor(errorConsumer, leaseRef != null))
365+
return parentInterceptors
366+
}
367+
368+
private fun enableFragmentation(parentInterceptors: InterceptorRegistry)
369+
: InterceptorRegistry {
370+
parentInterceptors.connectionFirst(
371+
FragmentationInterceptor(mtu))
372+
return parentInterceptors
373+
}
275374
}
276375
}
277376

0 commit comments

Comments
 (0)