Skip to content

Commit 185e469

Browse files
mostroverkhovrobertroeser
authored andcommitted
Reworked fragmentation, Client & Server keep-alive improvements, RSocketFactory simplification (#29)
* Client and Server keep-alive implemented according to spec Introduce RSocket server contract interceptor; move Setup frame validation from RSocketFactory Move fragmentation from RSocketFactory to Connection interceptor RSocketFactory api simplification renames: RSocketClient -> RSocketRequester, RSocketServer -> RSocketResponder, PluginRegistry -> InterceptorRegistry rename RSocketInterceptor responder -> handler Release setup frame after decoding * Both peers apply fragmentation unconditionally Reimplemented fragmentation and reassembly logic as previously It did not distinguish between Payload and Request* frames and crashed consistently. Also made sure FrameFragmenter does not leak frame contents if unsubscribed before all fragments of frame are passed to subscriber * disable flaky test
1 parent cdcba75 commit 185e469

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+1626
-1039
lines changed

rsocket-core/src/main/java/io/rsocket/android/Duration.kt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,19 @@ import java.util.concurrent.TimeUnit
88

99
data class Duration(val value: Long, val unit: TimeUnit) {
1010

11-
val toMillis = unit.toMillis(value)
11+
val millis = unit.toMillis(value)
12+
13+
val intMillis = unit.toMillis(value).toInt()
1214

1315
companion object {
14-
val ZERO = Duration(0, TimeUnit.MILLISECONDS)
16+
1517
fun ofSeconds(n: Long) = Duration(n, TimeUnit.SECONDS)
18+
19+
fun ofSeconds(n: Int) = Duration(n.toLong(), TimeUnit.SECONDS)
20+
1621
fun ofMillis(n: Long) = Duration(n, TimeUnit.MILLISECONDS)
22+
23+
fun ofMillis(n: Int) = Duration(n.toLong(), TimeUnit.MILLISECONDS)
1724
}
1825
}
1926

rsocket-core/src/main/java/io/rsocket/android/Frame.kt

Lines changed: 135 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,20 @@
1515
*/
1616
package io.rsocket.android
1717

18-
import io.rsocket.android.frame.FrameHeaderFlyweight.FLAGS_M
19-
20-
import io.netty.buffer.*
18+
import com.sun.org.apache.xpath.internal.operations.Bool
19+
import io.netty.buffer.ByteBuf
20+
import io.netty.buffer.ByteBufAllocator
21+
import io.netty.buffer.ByteBufHolder
22+
import io.netty.buffer.Unpooled
2123
import io.netty.util.IllegalReferenceCountException
2224
import io.netty.util.Recycler
2325
import io.netty.util.Recycler.Handle
2426
import io.netty.util.ResourceLeakDetector
25-
import io.rsocket.android.frame.ErrorFrameFlyweight
26-
import io.rsocket.android.frame.FrameHeaderFlyweight
27-
import io.rsocket.android.frame.KeepaliveFrameFlyweight
28-
import io.rsocket.android.frame.LeaseFrameFlyweight
29-
import io.rsocket.android.frame.RequestFrameFlyweight
30-
import io.rsocket.android.frame.RequestNFrameFlyweight
31-
import io.rsocket.android.frame.SetupFrameFlyweight
32-
import io.rsocket.android.frame.VersionFlyweight
27+
import io.rsocket.android.frame.*
28+
import io.rsocket.android.frame.FrameHeaderFlyweight.FLAGS_M
29+
import org.slf4j.LoggerFactory
3330
import java.nio.ByteBuffer
3431
import java.nio.charset.StandardCharsets
35-
import org.slf4j.LoggerFactory
3632

3733
/**
3834
* Represents a Frame sent over a [DuplexConnection].
@@ -51,10 +47,12 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
5147

5248
/** Return the content which is held by this [Frame]. */
5349
override fun content(): ByteBuf {
54-
if (content!!.refCnt() <= 0) {
55-
throw IllegalReferenceCountException(content!!.refCnt())
56-
}
57-
return content as ByteBuf
50+
val c = content
51+
return if (c == null) {
52+
throw IllegalReferenceCountException(0)
53+
} else if (c.refCnt() <= 0) {
54+
throw IllegalReferenceCountException(c.refCnt())
55+
} else content as ByteBuf
5856
}
5957

6058
/** Creates a deep copy of this [Frame]. */
@@ -79,7 +77,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
7977
* Returns the reference count of this object. If `0`, it means this object has been
8078
* deallocated.
8179
*/
82-
override fun refCnt(): Int = content!!.refCnt()
80+
override fun refCnt(): Int = content?.refCnt() ?: 0
8381

8482
/** Increases the reference count by `1`. */
8583
override fun retain(): Frame {
@@ -210,11 +208,17 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
210208
*/
211209
fun flags(): Int = FrameHeaderFlyweight.flags(content!!)
212210

213-
fun hasMetadata(): Boolean = isFlagSet(this.flags(), FLAGS_M)
211+
fun isFlagSet(flag: Int): Boolean {
212+
return isFlagSet(this.flags(), flag)
213+
}
214+
215+
fun hasMetadata(): Boolean = isFlagSet(FLAGS_M)
214216

215217
val dataUtf8: String
216218
get() = StandardCharsets.UTF_8.decode(data).toString()
217219

220+
val isFragmentable: Boolean
221+
get() = type.isFragmentable
218222
/* TODO:
219223
*
220224
* fromRequest(type, id, payload)
@@ -227,6 +231,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
227231

228232
fun from(
229233
flags: Int,
234+
version: Int,
230235
keepaliveInterval: Int,
231236
maxLifetime: Int,
232237
metadataMimeType: String,
@@ -250,6 +255,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
250255
SetupFrameFlyweight.encode(
251256
frame.content!!,
252257
flags,
258+
version,
253259
keepaliveInterval,
254260
maxLifetime,
255261
metadataMimeType,
@@ -259,6 +265,25 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
259265
return frame
260266
}
261267

268+
fun from(
269+
flags: Int,
270+
keepaliveInterval: Int,
271+
maxLifetime: Int,
272+
metadataMimeType: String,
273+
dataMimeType: String,
274+
payload: Payload): Frame {
275+
276+
return from(
277+
flags,
278+
SetupFrameFlyweight.CURRENT_VERSION,
279+
keepaliveInterval,
280+
maxLifetime,
281+
metadataMimeType,
282+
dataMimeType,
283+
payload)
284+
}
285+
286+
262287
fun getFlags(frame: Frame): Int {
263288
ensureFrameType(FrameType.SETUP, frame)
264289
val flags = FrameHeaderFlyweight.flags(frame.content!!)
@@ -271,6 +296,20 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
271296
return SetupFrameFlyweight.version(frame.content!!)
272297
}
273298

299+
fun resumeEnabled(frame: Frame): Boolean {
300+
ensureFrameType(FrameType.SETUP, frame)
301+
return Frame.isFlagSet(
302+
frame.flags(),
303+
SetupFrameFlyweight.FLAGS_RESUME_ENABLE)
304+
}
305+
306+
fun leaseEnabled(frame: Frame): Boolean {
307+
ensureFrameType(FrameType.SETUP, frame)
308+
return Frame.isFlagSet(
309+
frame.flags(),
310+
SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE)
311+
}
312+
274313
fun keepaliveInterval(frame: Frame): Int {
275314
ensureFrameType(FrameType.SETUP, frame)
276315
return SetupFrameFlyweight.keepaliveInterval(frame.content!!)
@@ -429,14 +468,14 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
429468
fun from(
430469
streamId: Int,
431470
type: FrameType,
432-
metadata: ByteBuf,
471+
metadata: ByteBuf?,
433472
data: ByteBuf,
434473
initialRequestN: Int,
435474
flags: Int): Frame {
436475
val frame = RECYCLER.get()
437476
frame.content = ByteBufAllocator.DEFAULT.buffer(
438477
RequestFrameFlyweight.computeFrameLength(
439-
type, metadata.readableBytes(), data.readableBytes()))
478+
type, metadata?.readableBytes(), data.readableBytes()))
440479
frame.content!!.writerIndex(
441480
RequestFrameFlyweight.encode(
442481
frame.content!!,
@@ -449,6 +488,21 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
449488
return frame
450489
}
451490

491+
fun from(streamId: Int,
492+
type: FrameType,
493+
metadata: ByteBuf?,
494+
data: ByteBuf,
495+
flags: Int): Frame {
496+
497+
return PayloadFrame.from(
498+
streamId,
499+
type,
500+
metadata,
501+
data,
502+
flags)
503+
}
504+
505+
452506
fun initialRequestN(frame: Frame): Int {
453507
val type = frame.type
454508
if (!type.isRequestType) {
@@ -538,6 +592,66 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
538592
}
539593
}
540594

595+
object Fragmentation {
596+
597+
fun assembleFrame(blueprintFrame: Frame,
598+
metadata: ByteBuf,
599+
data: ByteBuf): Frame =
600+
601+
create(blueprintFrame,
602+
metadata,
603+
data,
604+
{ it and FrameHeaderFlyweight.FLAGS_F.inv() })
605+
606+
fun sliceFrame(blueprintFrame: Frame,
607+
metadata: ByteBuf?,
608+
data: ByteBuf,
609+
additionalFlags: Int): Frame =
610+
611+
create(blueprintFrame,
612+
metadata,
613+
data,
614+
{ it or additionalFlags })
615+
616+
private inline fun create(blueprintFrame: Frame,
617+
metadata: ByteBuf?,
618+
data: ByteBuf,
619+
modifyFlags: (Int) -> Int): Frame =
620+
when (blueprintFrame.type) {
621+
FrameType.FIRE_AND_FORGET,
622+
FrameType.REQUEST_RESPONSE -> {
623+
Frame.Request.from(
624+
blueprintFrame.streamId,
625+
blueprintFrame.type,
626+
metadata,
627+
data,
628+
modifyFlags(blueprintFrame.flags()))
629+
}
630+
FrameType.NEXT,
631+
FrameType.NEXT_COMPLETE -> {
632+
Frame.PayloadFrame.from(
633+
blueprintFrame.streamId,
634+
blueprintFrame.type,
635+
metadata,
636+
data,
637+
modifyFlags(blueprintFrame.flags()))
638+
}
639+
640+
FrameType.REQUEST_STREAM,
641+
FrameType.REQUEST_CHANNEL -> {
642+
Frame.Request.from(
643+
blueprintFrame.streamId,
644+
blueprintFrame.type,
645+
metadata,
646+
data,
647+
Frame.Request.initialRequestN(blueprintFrame),
648+
modifyFlags(blueprintFrame.flags()))
649+
}
650+
else -> throw AssertionError("Non-fragmentable frame: " +
651+
"${blueprintFrame.type}")
652+
}
653+
}
654+
541655
override fun toString(): String {
542656
val type = FrameHeaderFlyweight.frameType(content!!)
543657
val payload = StringBuilder()
@@ -587,7 +701,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
587701
}
588702

589703
companion object {
590-
val NULL_BYTEBUFFER:ByteBuffer = ByteBuffer.allocateDirect(0)
704+
val NULL_BYTEBUFFER: ByteBuffer = ByteBuffer.allocateDirect(0)
591705

592706
private val RECYCLER = object : Recycler<Frame>() {
593707
override fun newObject(handle: Handle<Frame>): Frame {

rsocket-core/src/main/java/io/rsocket/android/FrameType.kt

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,26 @@ enum class FrameType(val encodedType: Int, private val flags: Int = 0) {
2424
LEASE(0x02, Flags.CAN_HAVE_METADATA),
2525
KEEPALIVE(0x03, Flags.CAN_HAVE_DATA),
2626
// Requester to start request
27-
REQUEST_RESPONSE(0x04, Flags.CAN_HAVE_METADATA_AND_DATA or Flags.IS_REQUEST_TYPE),
28-
FIRE_AND_FORGET(0x05, Flags.CAN_HAVE_METADATA_AND_DATA or Flags.IS_REQUEST_TYPE),
27+
REQUEST_RESPONSE(0x04,
28+
Flags.CAN_HAVE_METADATA_AND_DATA or
29+
Flags.IS_REQUEST_TYPE or
30+
Flags.IS_FRAGMENTABLE),
31+
FIRE_AND_FORGET(0x05,
32+
Flags.CAN_HAVE_METADATA_AND_DATA or
33+
Flags.IS_REQUEST_TYPE or
34+
Flags.IS_FRAGMENTABLE),
2935
REQUEST_STREAM(
30-
0x06, Flags.CAN_HAVE_METADATA_AND_DATA or Flags.IS_REQUEST_TYPE or Flags.HAS_INITIAL_REQUEST_N),
36+
0x06,
37+
Flags.CAN_HAVE_METADATA_AND_DATA or
38+
Flags.IS_REQUEST_TYPE or
39+
Flags.HAS_INITIAL_REQUEST_N
40+
or Flags.IS_FRAGMENTABLE),
3141
REQUEST_CHANNEL(
32-
0x07, Flags.CAN_HAVE_METADATA_AND_DATA or Flags.IS_REQUEST_TYPE or Flags.HAS_INITIAL_REQUEST_N),
42+
0x07,
43+
Flags.CAN_HAVE_METADATA_AND_DATA or
44+
Flags.IS_REQUEST_TYPE or
45+
Flags.HAS_INITIAL_REQUEST_N or
46+
Flags.IS_FRAGMENTABLE),
3347
// Requester mid-stream
3448
REQUEST_N(0x08),
3549
CANCEL(0x09, Flags.CAN_HAVE_METADATA),
@@ -42,22 +56,28 @@ enum class FrameType(val encodedType: Int, private val flags: Int = 0) {
4256
RESUME(0x0D),
4357
RESUME_OK(0x0E),
4458
// synthetic types from Responder for use by the rest of the machinery
45-
NEXT(0xA0, Flags.CAN_HAVE_METADATA_AND_DATA),
59+
NEXT(0xA0,
60+
Flags.CAN_HAVE_METADATA_AND_DATA or
61+
Flags.IS_FRAGMENTABLE),
4662
COMPLETE(0xB0),
47-
NEXT_COMPLETE(0xC0, Flags.CAN_HAVE_METADATA_AND_DATA),
63+
NEXT_COMPLETE(0xC0,
64+
Flags.CAN_HAVE_METADATA_AND_DATA or
65+
Flags.IS_FRAGMENTABLE),
4866
EXT(0xFFFF, Flags.CAN_HAVE_METADATA_AND_DATA);
4967

5068
private object Flags {
5169

52-
internal val CAN_HAVE_DATA = 1
53-
internal val CAN_HAVE_METADATA = 2
54-
internal val CAN_HAVE_METADATA_AND_DATA = 3
55-
internal val IS_REQUEST_TYPE = 4
56-
internal val HAS_INITIAL_REQUEST_N = 8
70+
internal const val CAN_HAVE_DATA = 1
71+
internal const val CAN_HAVE_METADATA = 2
72+
internal const val CAN_HAVE_METADATA_AND_DATA = 3
73+
internal const val IS_REQUEST_TYPE = 4
74+
internal const val HAS_INITIAL_REQUEST_N = 8
75+
internal const val IS_FRAGMENTABLE = 16
5776
}
5877

59-
val isRequestType: Boolean
60-
get() = Flags.IS_REQUEST_TYPE == flags and Flags.IS_REQUEST_TYPE
78+
val isFragmentable = Flags.IS_FRAGMENTABLE == (flags and Flags.IS_FRAGMENTABLE)
79+
80+
val isRequestType: Boolean = Flags.IS_REQUEST_TYPE == (flags and Flags.IS_REQUEST_TYPE)
6181

6282
fun hasInitialRequestN(): Boolean = Flags.HAS_INITIAL_REQUEST_N == flags and Flags.HAS_INITIAL_REQUEST_N
6383

rsocket-core/src/main/java/io/rsocket/android/RSocket.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ interface RSocket : Availability, Closeable {
5555
/**
5656
* Request-Channel interaction model of `RSocket`.
5757
*
58-
* @param payloads Stream of request payloads.
58+
* @param payloads Stream of send payloads.
5959
* @return Stream of response payloads.
6060
*/
6161
fun requestChannel(payloads: Publisher<Payload>): Flowable<Payload>

0 commit comments

Comments
 (0)