diff --git a/communicator-api/build.gradle.kts b/communicator-api/build.gradle.kts index f93cde4..9a06f1a 100644 --- a/communicator-api/build.gradle.kts +++ b/communicator-api/build.gradle.kts @@ -2,8 +2,12 @@ internal val junitVersion: String by project internal val ktorVersion: String by project +internal val serializationVersion: String by project -plugins { kotlin(module = "multiplatform") } +plugins { + kotlin("multiplatform") + kotlin("plugin.serialization") +} kotlin { explicitApi() @@ -20,11 +24,17 @@ kotlin { with(languageSettings) { useExperimentalAnnotation("kotlin.ExperimentalUnsignedTypes") useExperimentalAnnotation("kotlin.contracts.ExperimentalContracts") + useExperimentalAnnotation("kotlinx.serialization.ExperimentalSerializationApi") + useExperimentalAnnotation("kotlinx.serialization.InternalSerializationApi") } } commonMain { - dependencies { api("io.ktor:ktor-io:$ktorVersion") } + dependencies { + api("org.jetbrains.kotlinx:kotlinx-serialization-cbor:$serializationVersion") + api("org.jetbrains.kotlinx:kotlinx-serialization-json:$serializationVersion") + api("io.ktor:ktor-io:$ktorVersion") + } } commonTest { diff --git a/communicator-api/src/commonMain/kotlin/BasicCodecs.kt b/communicator-api/src/commonMain/kotlin/BasicCodecs.kt new file mode 100644 index 0000000..8ba4456 --- /dev/null +++ b/communicator-api/src/commonMain/kotlin/BasicCodecs.kt @@ -0,0 +1,293 @@ +package space.kscience.communicator.api + +import io.ktor.utils.io.* +import io.ktor.utils.io.core.ByteReadPacket +import kotlinx.serialization.* +import kotlinx.serialization.cbor.Cbor +import kotlinx.serialization.json.Json +import kotlin.reflect.KClass + +private suspend fun ByteReadChannel.copyAvailable(): ByteArray = + ByteArray(availableForRead).also { readFully(it) } + +private val EMPTY_BYTE_ARRAY = ByteArray(0) +private val UNIT_AND_ZERO = Unit to 0 + +/** + * Binds [Unit] to `common/unit` codec. + */ +public object UnitCodec : Codec { + override val identity: String + get() = "common/unit" + + override suspend fun decode(payload: Payload): Pair = UNIT_AND_ZERO + override suspend fun encode(value: Unit): Payload = EMPTY_BYTE_ARRAY +} + +/** + * Binds [Int] to `common/i32` codec. + */ +public object IntCodec : Codec { + override val identity: String + get() = "common/i32" + + override suspend fun decode(payload: Payload): Pair = ByteReadChannel(payload).readInt() to Int.SIZE_BYTES + override suspend fun encode(value: Int): Payload = ByteChannel(true).also { it.writeInt(value) }.copyAvailable() +} + +/** + * Binds [Long] to `common/i64`. + */ +public object LongCodec : Codec { + override val identity: String + get() = "common/i64" + + override suspend fun decode(payload: Payload): Pair = + ByteReadChannel(payload).readLong() to Long.SIZE_BYTES + + override suspend fun encode(value: Long): Payload = + ByteChannel(true).also { it.writeLong(value) }.copyAvailable() +} + +/** + * Binds [ULong] to `common/u64`. + */ +public object ULongCodec : Codec { + override val identity: String + get() = "common/u64" + + override suspend fun decode(payload: Payload): Pair = + ByteReadChannel(payload).readLong().toULong() to ULong.SIZE_BYTES + + override suspend fun encode(value: ULong): Payload = + ByteChannel(true).also { it.writeLong(value.toLong()) }.copyAvailable() +} + +/** + * Binds [Float] to `common/f32`. + */ +public object FloatCodec : Codec { + override val identity: String + get() = "common/f32" + + override suspend fun decode(payload: Payload): Pair = + ByteReadChannel(payload).readFloat() to Float.SIZE_BYTES + + override suspend fun encode(value: Float): Payload = + ByteChannel(true).also { it.writeFloat(value) }.copyAvailable() +} + +/** + * Binds [Double] to `common/f64`. + */ +public object DoubleCodec : Codec { + override val identity: String + get() = "common/f64" + + override suspend fun decode(payload: Payload): Pair = + ByteReadChannel(payload).readDouble() to Double.SIZE_BYTES + + override suspend fun encode(value: Double): Payload = + ByteChannel(true).also { it.writeDouble(value) }.copyAvailable() +} + +/** + * Binds [String] to `common/utf8`. + */ +public object StringCodec : SizedCodec() { + override val identity: String + get() = "common/utf8" + + override suspend fun customEncode(value: String): Payload = value.encodeToByteArray() + override suspend fun customDecode(payload: Payload): String = payload.decodeToString() +} + +/** + * Allows to create create codecs which write the payload size before serialized value. + * + * @param T the type of decoded and encoded object. + */ +public abstract class SizedCodec : Codec { + public final override suspend fun encode(value: T): Payload { + val out = ByteChannel(true) + val encoded = customEncode(value) + out.writeInt(encoded.size) + out.writePacket(ByteReadPacket(encoded)) + return out.copyAvailable() + } + + /** + * Custom serialization function. + */ + protected abstract suspend fun customEncode(value: T): Payload + + public final override suspend fun decode(payload: Payload): Pair { + val inp = ByteReadChannel(payload) + val length = inp.readInt() + val encoded = inp.copyAvailable() + return customDecode(encoded.sliceArray(0 until length)) to Int.SIZE_BYTES + length + } + + /** + * Custom deserialization function. + */ + protected abstract suspend fun customDecode(payload: Payload): T +} + +/** + * Binds [Pair] to `common/tuple`. + * + * @property codecOfA The codec of [A]. + * @property codecOfB The codec of [B]. + */ +public class PairCodec( + public val codecOfA: Codec, + public val codecOfB: Codec, +) : Codec> { + override val identity: String + get() = "common/pair<${codecOfA.identity},${codecOfB.identity}>" + + override suspend fun encode(value: Pair): Payload { + val out = ByteChannel(true) + val (a, b) = value + out.writeFully(codecOfA.encode(a)) + out.writeFully(codecOfB.encode(b)) + return out.copyAvailable() + } + + override suspend fun decode(payload: Payload): Pair, Int> { + val (a, lengthOfA) = codecOfA.decode(payload) + val (b, lengthOfB) = codecOfB.decode(payload.copyOfRange(lengthOfA, payload.size)) + return (a to b) to lengthOfA + lengthOfB + } +} + +/** + * Binds [Triple] to `common/tuple`. + * + * @property codecOfA The codec of [A]. + * @property codecOfB The codec of [B]. + * @property codecOfC The codec of [C]. + */ +public class TripleCodec( + public val codecOfA: Codec, + public val codecOfB: Codec, + public val codecOfC: Codec, +) : Codec> { + override val identity: String + get() = "common/triple<${codecOfA.identity},${codecOfB.identity},${codecOfC.identity}>" + + override suspend fun encode(value: Triple): Payload { + val out = ByteChannel(true) + val (a, b, c) = value + out.writeFully(codecOfA.encode(a)) + out.writeFully(codecOfB.encode(b)) + out.writeFully(codecOfC.encode(c)) + return out.copyAvailable() + } + + override suspend fun decode(payload: Payload): Pair, Int> { + var fragment = payload + val (a, lengthOfA) = codecOfA.decode(fragment) + fragment = fragment.sliceArray(lengthOfA until fragment.size) + val (b, lengthOfB) = codecOfB.decode(fragment) + fragment = fragment.sliceArray(lengthOfB until fragment.size) + val (c, lengthOfC) = codecOfC.decode(fragment) + return Triple(a, b, c) to lengthOfA + lengthOfB + lengthOfC + } +} + +/** + * Binds [List] of [Any] to `common/list`. + * + * @property codec The codec of elements. + */ +public class ListCodec(public val codec: Codec) : Codec> { + override val identity: String + get() = "common/list<${codec.identity}>" + + override suspend fun decode(payload: Payload): Pair, Int> { + val ch = ByteReadChannel(payload) + val count = ch.readInt() + var fragment = ch.copyAvailable() + val objects = mutableListOf() + var overallLength = Int.SIZE_BYTES + + repeat(count) { + println(fragment.decodeToString()) + val (decoded, length) = codec.decode(fragment) + objects += decoded + fragment = fragment.sliceArray(length until fragment.size) + overallLength += length + } + + return objects to overallLength + } + + override suspend fun encode(value: List): Payload { + val out = ByteChannel(true) + out.writeInt(value.size) + value.forEach { element -> out.writeFully(codec.encode(element)) } + return out.copyAvailable() + } +} + +/** + * Binds objects that can be serialized with [KSerializer] to `common/json`. + * + * @param T the type of decoded and encoded object. + * @property serializerOfT [KSerializer] of [T]. + * @property format The [Json] instance. + */ +public class JsonCodec constructor( + public val serializerOfT: KSerializer, + public val format: Json = Json, +) : Codec { + public constructor(classOfT: KClass, format: Json = Json) : this(classOfT.serializer(), format) + + override val identity: String + get() = "common/json" + + override suspend fun decode(payload: Payload): Pair { + val (str, length) = StringCodec.decode(payload) + return format.decodeFromString(serializerOfT, str) to length + } + + override suspend fun encode(value: T): Payload = StringCodec.encode(format.encodeToString(serializerOfT, value)) +} + +/** + * Constructs [JsonCodec]. + * + * @param T the type of decoded and encoded object. + * @param format the [Json] instance. + */ +public inline fun JsonCodec(format: Json = Json): JsonCodec = JsonCodec(T::class, format) + +/** + * Binds objects that can be serialized with [KSerializer] to `common/cbor`. + * + * @param T the type of decoded and encoded object. + * @property serializerOfT [KSerializer] of [T]. + * @property format The [Json] instance. + */ +public class CborCodec constructor( + public val serializerOfT: KSerializer, + public val format: Cbor = Cbor, +) : SizedCodec() { + public constructor(classOfT: KClass, format: Cbor = Cbor) : this(classOfT.serializer(), format) + + override val identity: String + get() = "common/cbor" + + override suspend fun customDecode(payload: Payload): T = format.decodeFromByteArray(serializerOfT, payload) + override suspend fun customEncode(value: T): Payload = format.encodeToByteArray(serializerOfT, value) +} + +/** + * Constructs [CborCodec]. + * + * @param T the type of decoded and encoded object. + * @property format The [Cbor] instance. + */ +public inline fun CborCodec(format: Cbor = Cbor): CborCodec = CborCodec(T::class, format) diff --git a/communicator-api/src/commonMain/kotlin/BasicCoders.kt b/communicator-api/src/commonMain/kotlin/BasicCoders.kt deleted file mode 100644 index 204e4d4..0000000 --- a/communicator-api/src/commonMain/kotlin/BasicCoders.kt +++ /dev/null @@ -1,167 +0,0 @@ -package space.kscience.communicator.api - -import io.ktor.utils.io.* -import io.ktor.utils.io.core.ByteReadPacket -import io.ktor.utils.io.core.readBytes - -private suspend fun ByteReadChannel.copyAvailable(): ByteArray = - ByteArray(availableForRead).also { readFully(it) } - -/** - * Binds [Int] to 4-byte [Payload]. - */ -public object IntCoder : Coder { - public override val identity: String - get() = "Int" - - public override suspend fun decode(payload: Payload): Int = ByteReadChannel(payload).readInt() - - public override suspend fun encode(value: Int): Payload = - ByteChannel(true).also { it.writeInt(value) }.copyAvailable() -} - -/** - * Binds [Long] to 8-byte [Payload]. - */ -public object LongCoder : Coder { - public override val identity: String - get() = "Long" - - public override suspend fun decode(payload: Payload): Long = ByteReadChannel(payload).readLong() - - public override suspend fun encode(value: Long): Payload = - ByteChannel(true).also { it.writeLong(value) }.copyAvailable() -} - -/** - * Binds [ULong] to 8-byte [Payload]. - */ -public object ULongCoder : Coder { - public override val identity: String - get() = "ULong" - - public override suspend fun decode(payload: Payload): ULong = ByteReadChannel(payload).readLong().toULong() - - public override suspend fun encode(value: ULong): Payload = - ByteChannel(true).also { it.writeLong(value.toLong()) }.copyAvailable() -} - -/** - * Binds [Double] to 8-byte [Payload]. - */ -public object DoubleCoder : Coder { - public override val identity: String - get() = "Double" - - public override suspend fun decode(payload: Payload): Double = ByteReadChannel(payload).readDouble() - - public override suspend fun encode(value: Double): Payload = - ByteChannel(true).also { it.writeDouble(value) }.copyAvailable() -} - -/** - * Binds [String] to payload of array of size N and int N before it. - */ -public object StringCoder : Coder { - public override val identity: String - get() = "String" - - public override suspend fun decode(payload: Payload): String { - val inp = ByteReadChannel(payload) - val len = inp.readInt() - return inp.readPacket(len).readText() - } - - public override suspend fun encode(value: String): Payload { - val out = ByteChannel(true) - out.writeInt(value.encodeToByteArray().size) - out.writePacket(ByteReadPacket(value.encodeToByteArray())) - return out.copyAvailable() - } -} - -/** - * Allows to create create coders which write the payload size before serialized value. - * - * @param T the type of decoded and encoded object. - */ -public abstract class SizedCoder : Coder { - public final override suspend fun encode(value: T): Payload { - val out = ByteChannel(true) - val encoded = customEncode(value) - out.writeInt(encoded.size) - out.writePacket(ByteReadPacket(encoded)) - return out.copyAvailable() - } - - /** - * Custom serialization function. - */ - public abstract suspend fun customEncode(value: T): Payload - - public final override suspend fun decode(payload: Payload): T { - val inp = ByteReadChannel(payload) - val length = inp.readInt() - val encoded = inp.readPacket(length).readBytes() - return customDecode(encoded) - } - - /** - * Custom deserialization function. - */ - public abstract suspend fun customDecode(payload: Payload): T -} - -/** - * Binds [Pair] to payload of two sections coded by provided pair of coders. - * - * @property firstCoder The coder of [A]. - * @property secondCoder The coder of [B]. - */ -public class PairCoder( - public val firstCoder: Coder, - public val secondCoder: Coder, -) : Coder> { - public override val identity: String - get() = "Pair<${firstCoder.identity}, ${secondCoder.identity}>" - - public override suspend fun encode(value: Pair): Payload = - firstCoder.encode(value.first) + secondCoder.encode(value.second) - - public override suspend fun decode(payload: Payload): Pair { - val v1 = firstCoder.decode(payload) - val v2 = secondCoder.decode(payload.copyOfRange(firstCoder.encode(v1).size, payload.size)) - return Pair(v1, v2) - } -} - -/** - * Binds [Triple] to payload of two sections coded by provided triple of coders. - * - * @property firstCoder The coder of [A]. - * @property secondCoder The coder of [B]. - * @property thirdCoder The coder of [C]. - */ -public class TripleCoder( - public val firstCoder: Coder, - public val secondCoder: Coder, - public val thirdCoder: Coder, -) : Coder> { - public override val identity: String - get() = "Triple<${firstCoder.identity}, ${secondCoder.identity}, ${thirdCoder.identity}>" - - public override suspend fun encode(value: Triple): Payload = firstCoder - .encode(value.first) - .plus(secondCoder.encode(value.second)) - .plus(thirdCoder.encode(value.third)) - - public override suspend fun decode(payload: Payload): Triple { - var start = 0 - val v1 = firstCoder.decode(payload) - start += firstCoder.encode(v1).size - val v2 = secondCoder.decode(payload.copyOfRange(start, payload.size)) - start += secondCoder.encode(v2).size - val v3 = thirdCoder.decode(payload.copyOfRange(start, payload.size)) - return Triple(v1, v2, v3) - } -} diff --git a/communicator-api/src/commonMain/kotlin/Codec.kt b/communicator-api/src/commonMain/kotlin/Codec.kt new file mode 100644 index 0000000..3945fe5 --- /dev/null +++ b/communicator-api/src/commonMain/kotlin/Codec.kt @@ -0,0 +1,78 @@ +package space.kscience.communicator.api + +/** + * Codec object exposed to the communicator API to bind objects to [Payload]. Also provides identity. + * + * @param T the type of decoded and encoded object. + */ +public interface Codec { + /** + * Identity of the codec is some data that is equal for the structurally equal codecs (codecs that work with equal + * types of data). + * Codecs on different processes (machines) are compared by their identity. + * Identity may be called "hash code", but this property has completely different purposes than [Any.hashCode]. + */ + public val identity: String + + /** + * Decodes payload to object. + * + * @param payload the payload. + * @return the decoded object plus the quantity of bytes read from [payload]. + */ + public suspend fun decode(payload: Payload): Pair + + /** + * Encodes object to payload. + * + * @param value the object. + * @return the payload. + */ + public suspend fun encode(value: T): Payload +} + +/** + * Returned function will wrap serialization exceptions into [CodecException], + * and will throw receiver function's exceptions as-is. + */ +public fun (suspend (T) -> R).toBinary( + argumentCodec: Codec, + resultCodec: Codec, +): PayloadFunction = { bin -> + val (arg, _) = try { + argumentCodec.decode(bin) + } catch (ex: Exception) { + throw DecodingException(bin, argumentCodec, ex) + } + + val res = invoke(arg) + + try { + resultCodec.encode(res) + } catch (ex: Exception) { + throw EncodingException(res, resultCodec, ex) + } +} + +/** + * Returned function will wrap serialization exceptions into [CodecException], + * and will throw receiver function's exceptions as-is. + */ +public fun PayloadFunction.toFunction( + argumentCodec: Codec, + resultCodec: Codec, +): (suspend (T) -> R) = { arg -> + val bin = try { + argumentCodec.encode(arg) + } catch (ex: Exception) { + throw EncodingException(arg, argumentCodec, ex) + } + + val res = invoke(bin) + + try { + resultCodec.decode(res).first + } catch (ex: Exception) { + throw DecodingException(res, resultCodec, ex) + } +} diff --git a/communicator-api/src/commonMain/kotlin/Coder.kt b/communicator-api/src/commonMain/kotlin/Coder.kt deleted file mode 100644 index 381c8bc..0000000 --- a/communicator-api/src/commonMain/kotlin/Coder.kt +++ /dev/null @@ -1,32 +0,0 @@ -package space.kscience.communicator.api - -/** - * Coder object exposed to the communicator API to bind objects to [Payload]. Also provides identity. - * - * @param T the type of decoded and encoded object. - */ -public interface Coder { - /** - * Identity of the coder is some data that is equal for the structurally equal coders - * (coders that work with equal types of data). - * Coders on different processes (machines) are compared by their identity. - * Identity may be called "hash code", but this property has completely different purposes than [Any.hashCode]. - */ - public val identity: String - - /** - * Decodes payload to object. - * - * @param payload the payload. - * @return the decoded object. - */ - public suspend fun decode(payload: Payload): T - - /** - * Encodes object to payload. - * - * @param value the object. - * @return the payload. - */ - public suspend fun encode(value: T): Payload -} diff --git a/communicator-api/src/commonMain/kotlin/Endpoints.kt b/communicator-api/src/commonMain/kotlin/Endpoints.kt index 703d3f1..7dd4221 100644 --- a/communicator-api/src/commonMain/kotlin/Endpoints.kt +++ b/communicator-api/src/commonMain/kotlin/Endpoints.kt @@ -1,13 +1,30 @@ package space.kscience.communicator.api /** - * Represents communicator client end-point, i.e. pair of protocol and address. + * Represents communicator client end-point i.e., a triple of protocol, host, and port. * - * @property protocol The transport protocol identifier. - * @property host The host component. - * @property port The port component. + * @property host The host. */ -public data class ClientEndpoint(val protocol: String, val host: String, val port: Int) +public class ClientEndpoint(protocol: String, public val host: String, port: Int) : + ServerEndpoint(protocol, port, false) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is ClientEndpoint) return false + if (!super.equals(other)) return false + + if (host != other.host) return false + + return true + } + + override fun hashCode(): Int { + var result = super.hashCode() + result = 31 * result + host.hashCode() + return result + } + + override fun toString(): String = "ClientEndpoint(host='$host') ${super.toString()}" +} /** * Creates [ClientEndpoint] by given protocol identifier and address string. @@ -18,12 +35,41 @@ public fun ClientEndpoint(protocol: String, address: String): ClientEndpoint = ClientEndpoint(protocol, address.split(":")[0], address.split(":")[1].toInt()) /** - * Represents communicator server end-point, i.e. pair of protocol and port. + * Represents communicator server end-point i.e., a pair of protocol and port. * - * @property protocol The transport protocol identifier. - * @property port The port component. + * @property protocol The identifier of the transport protocol. + * @property port The port. + */ +public open class ServerEndpoint internal constructor( + public val protocol: String, + public open val port: Int, + @Suppress("UNUSED_PARAMETER") dummy: Boolean, +) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is ServerEndpoint) return false + + if (protocol != other.protocol) return false + if (port != other.port) return false + + return true + } + + override fun hashCode(): Int { + var result = protocol.hashCode() + result = 31 * result + port + return result + } + + override fun toString(): String = "ServerEndpoint(protocol='$protocol', port=$port)" +} + +/** + * Creates [ServerEndpoint] with given [protocol] and [port]. */ -public data class ServerEndpoint(val protocol: String, val port: Int) +public fun ServerEndpoint(protocol: String, port: Int): ServerEndpoint = ServerEndpoint(protocol, port, false) + + /** * Creates [ServerEndpoint] by given protocol identifier and address string. * @@ -32,8 +78,3 @@ public data class ServerEndpoint(val protocol: String, val port: Int) */ public fun ServerEndpoint(protocol: String, address: String): ServerEndpoint = ServerEndpoint(protocol, address.split(":")[1].toInt()) - -/** - * Drops host from the given [ClientEndpoint] to create [ServerEndpoint]. - */ -public fun ClientEndpoint.toServerEndpoint(): ServerEndpoint = ServerEndpoint(protocol, port) diff --git a/communicator-api/src/commonMain/kotlin/Exceptions.kt b/communicator-api/src/commonMain/kotlin/Exceptions.kt index 5307ec4..1a3dbbc 100644 --- a/communicator-api/src/commonMain/kotlin/Exceptions.kt +++ b/communicator-api/src/commonMain/kotlin/Exceptions.kt @@ -1,35 +1,37 @@ package space.kscience.communicator.api /** - * Base class for RemoteCall library exceptions. - * These exceptions can be thrown by the suspend function returned by [FunctionClient.getFunction]. - * [IncompatibleSpecsException] can also be thrown by [FunctionServer.register] if it connects to a proxy as a worker, - * and that proxy already has a worker with a different coder. + * Base class for library's exceptions. These exceptions can be thrown by the `suspend` function returned by + * [FunctionClient.getFunction]. [IncompatibleSpecsException] can also be thrown by [FunctionServer.register] if it + * connects to a proxy as a worker, and that proxy already has a worker with a different codec. */ -public sealed class RemoteCallException : Exception() +public sealed class RemoteCallException : Exception { + protected constructor() : super() + protected constructor(message: String?) : super(message) + protected constructor (message: String?, cause: Throwable?) : super(message, cause) + protected constructor(cause: Throwable?) : super(cause) +} /** - * This exception is thrown on the client if its coders have a different identities than the coders on the remote server. - * This exception can also be thrown on the worker, if it tries to connect to a proxy, - * and proxy already has a worker for the same function but with different coder identities. - * Spec strings are the strings received from [FunctionSpec.toString] method, which uses [Coder.toString]. + * This exception is thrown on the client if its codecs have a different identities than the codecs on the remote + * server. This exception can also be thrown on the worker, if it tries to connect to a proxy, and proxy already has a + * worker for the same function but with different codec identities. */ public class IncompatibleSpecsException( public val functionName: String, public val localSpec: String, public val remoteSpec: String ) : RemoteCallException() { - public override val message: String + override val message: String get() = """Remote server has different spec for the function. Function name: "$functionName" Local spec: $localSpec Remote spec: $remoteSpec """ - } public class UnsupportedFunctionNameException(public val functionName: String) : RemoteCallException() { - public override val message: String + override val message: String get() = "Server does not support a function with name $functionName. " + "If you are using a proxy server, please make sure that required worker " + "is connected to the proxy before making query." @@ -40,7 +42,7 @@ public class UnsupportedFunctionNameException(public val functionName: String) : * and the client retried the query enough times. */ public class TimeoutException : RemoteCallException() { - public override val message: String + override val message: String get() = "Timeout for the query has ended." } @@ -50,47 +52,52 @@ public class TimeoutException : RemoteCallException() { * Worker logs its exception, but does not throw it from [FunctionServer] methods. */ public class RemoteFunctionException(public val remoteExceptionMessage: String) : RemoteCallException() { - public override val message: String + override val message: String get() = "Remote function has finished with an exception: $remoteExceptionMessage" } /** * Base class for serialization/deserialization exception. - * These exceptions are wrappers for exceptions thrown by [Coder.encode] or [Coder.decode]. - * If the [CoderException] happened on the remote server, it will be delivered to the client inside [RemoteFunctionException]. + * These exceptions are wrappers for exceptions thrown by [Codec.encode] or [Codec.decode]. + * If the [CodecException] happened on the remote server, it will be delivered to the client inside [RemoteFunctionException]. */ -public sealed class CoderException : RemoteCallException() +public sealed class CodecException : RemoteCallException { + protected constructor() : super() + protected constructor(message: String?) : super(message) + protected constructor (message: String?, cause: Throwable?) : super(message, cause) + protected constructor(cause: Throwable?) : super(cause) +} /** - * This exception is thrown if the coder can't serialize the object. + * This exception is thrown if the codec can't serialize the object. */ public class EncodingException( public val obj: Any?, - public val coder: Coder<*>, - public val coderExceptionMessage: String -) : CoderException() { + public val codec: Codec<*>, + cause: Throwable, +) : CodecException(cause) { override val message: String get() = """Object serialization exception. Object: $obj - Coder: $coder - Coder identity: ${coder.identity} - Exception message: $coderExceptionMessage + Codec: $codec + Codec identity: ${codec.identity} + Exception message: ${cause?.message} """ } /** - * This exception is thrown if the coder can't deserialize the payload. + * This exception is thrown if the codec can't deserialize the payload. */ public class DecodingException( public val payload: Payload, - public val coder: Coder<*>, - public val coderExceptionMessage: String -) : CoderException() { - public override val message: String + public val codec: Codec<*>, + cause: Throwable, +) : CodecException(cause) { + override val message: String get() = """Payload deserialization exception. Payload: ${payload.contentToString()} - Coder: $coder - Coder identity: ${coder.identity} - Exception message: $coderExceptionMessage + Codec: $codec + Codec identity: ${codec.identity} + Exception message: ${cause?.message} """ } diff --git a/communicator-api/src/commonMain/kotlin/FunctionClient.kt b/communicator-api/src/commonMain/kotlin/FunctionClient.kt index c8d413a..95d737d 100644 --- a/communicator-api/src/commonMain/kotlin/FunctionClient.kt +++ b/communicator-api/src/commonMain/kotlin/FunctionClient.kt @@ -5,32 +5,51 @@ import kotlin.properties.ReadOnlyProperty /** * Represents function client that is able to provide named functions for given endpoint and specification. + * + * Implementation is based on [TransportClient] objects provided by [TransportFactory]. */ -public interface FunctionClient : Closeable { +public class FunctionClient(internal val factory: TransportFactory) : Closeable { + internal val transportCache: MutableMap = hashMapOf() + /** * Constructs a `suspend` function that calls the function server. * * @param endpoint the endpoint of server. * @param name the name of function. - * @param spec the spec of function. + * @param argumentCodec the codec of [T]. + * @param resultCodec the codec of [R]. + * @return the result function. */ - public fun getFunction(endpoint: ClientEndpoint, name: String, spec: FunctionSpec): suspend (T) -> R + public fun getFunction( + endpoint: ClientEndpoint, + name: String, + argumentCodec: Codec, + resultCodec: Codec, + ): suspend (T) -> R = transportCache + .getOrPut(endpoint.protocol) { + factory.client(endpoint.protocol) ?: error("Protocol ${endpoint.protocol} is not supported by this client.") + } + .channel(endpoint.host, endpoint.port, name) + .toFunction(argumentCodec, resultCodec) /** * Disposes this function client. */ - override fun close() + override fun close(): Unit = transportCache.values.forEach(TransportClient::close) } /** * Returns object that uses [FunctionClient.getFunction] to receive function object. The name of function is equal * to name of property. * + * @receiver the client to get function from. * @param endpoint the endpoint of server. - * @param spec the spec of function. + * @param argumentCodec the codec of [T]. + * @param resultCodec the codec of [R]. */ -public fun function( +public fun FunctionClient.function( endpoint: ClientEndpoint, - spec: FunctionSpec -): ReadOnlyProperty R> = - ReadOnlyProperty { thisRef, property -> thisRef.getFunction(endpoint, property.name, spec) } + argumentCodec: Codec, + resultCodec: Codec, +): ReadOnlyProperty R> = + ReadOnlyProperty { _, property -> getFunction(endpoint, property.name, argumentCodec, resultCodec) } diff --git a/communicator-api/src/commonMain/kotlin/FunctionServer.kt b/communicator-api/src/commonMain/kotlin/FunctionServer.kt index f72e107..94b4cc9 100644 --- a/communicator-api/src/commonMain/kotlin/FunctionServer.kt +++ b/communicator-api/src/commonMain/kotlin/FunctionServer.kt @@ -1,16 +1,24 @@ package space.kscience.communicator.api import io.ktor.utils.io.core.Closeable +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract /** - * Represents communicator function server that is able to register and unregister functions to serve them at several - * endpoints. + * Function server implementation based on [TransportServer] objects provided by [TransportFactory]. + * + * @property endpoints The set of endpoints this class serves. */ -public interface FunctionServer : Closeable { - /** - * The set of endpoints this object serves. - */ - public val endpoints: Set +public class FunctionServer( + private val factory: TransportFactory = TransportFactory, + public val endpoints: Set, +) : Closeable { + public constructor(factory: TransportFactory, vararg endpoints: ServerEndpoint) : + this(factory, endpoints.toSet()) + + internal val transportServers: List = endpoints.map { (protocol, port) -> + factory.server(protocol, port) ?: error("Protocol $protocol is not supported.") + } /** * Registers a function in this server. @@ -18,25 +26,43 @@ public interface FunctionServer : Closeable { * @param T the type the function takes. * @param R the type the function returns. * @param name the name of function. - * @param spec the spec of function. + * @param argumentCodec the codec of [T]. + * @param resultCodec the codec of [R]. * @param function the function implementation. * @return the function implementation. */ - public fun register( + public fun register( name: String, - spec: FunctionSpec, - function: suspend (T) -> R - ) + argumentCodec: Codec, + resultCodec: Codec, + function: suspend (T) -> R, + ) { + val payloadFunction = function.toBinary(argumentCodec, resultCodec) + transportServers.forEach { it.register(name, payloadFunction, argumentCodec, resultCodec) } + } /** * Unregisters a function from this server. * * @param name the name of function. */ - public fun unregister(name: String) + public fun unregister(name: String): Unit = transportServers.forEach { it.unregister(name) } /** * Stops and disposes this function server. */ - public override fun close() + override fun close(): Unit = transportServers.forEach(TransportServer::close) +} + +/** + * Constructs [FunctionServer] in the context of [S]. + */ +public inline fun FunctionServer( + set: S, + factory: TransportFactory, + action: S.(FunctionServer) -> Unit, +): FunctionServer where S : FunctionSet { + contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } + val t = FunctionServer(factory, set.endpoint.toServerEndpoint()) + return t.configure(set, action) } diff --git a/communicator-api/src/commonMain/kotlin/FunctionSet.kt b/communicator-api/src/commonMain/kotlin/FunctionSet.kt index 684b03b..6641713 100644 --- a/communicator-api/src/commonMain/kotlin/FunctionSet.kt +++ b/communicator-api/src/commonMain/kotlin/FunctionSet.kt @@ -11,23 +11,33 @@ import kotlin.reflect.KProperty * * @property endpoint The endpoint of all functions in this set. */ -public abstract class FunctionSet(public val endpoint: ClientEndpoint) { - internal val functions: MutableMap> = hashMapOf() +public open class FunctionSet(public val endpoint: ClientEndpoint) { + internal val functions: MutableMap, Codec<*>>> = hashMapOf() /** * Represents a declaration of function in the set. * + * @param T the type the function takes. + * @param R the type the function returns. * @property owner The set where the function was declared. * @property name Tha name of function. - * @property spec The spec of the function. */ @Suppress("UNCHECKED_CAST") - public data class Declaration internal constructor( + public data class Declaration internal constructor( val owner: FunctionSet, val name: String, ) { - val spec: FunctionSpec - get() = owner.functions[name] as FunctionSpec + /** + * The codec of [T]. + */ + val argumentCodec: Codec + get() = checkNotNull(owner.functions[name]) { "Can't find function $name in $owner." }.first as Codec + + /** + * The codec of [R]. + */ + val resultCodec: Codec + get() = checkNotNull(owner.functions[name]) { "Can't find function $name in $owner." }.second as Codec } /** @@ -36,15 +46,20 @@ public abstract class FunctionSet(public val endpoint: ClientEndpoint) { * @param T the type the function takes. * @param R the type the function returns. * @param name the name of function. - * @param spec the spec of function. + * @param argumentCodec the codec of [T]. + * @param resultCodec the codec of [R]. * @return a new declaration object. */ - public fun declare(name: String, spec: FunctionSpec): Declaration { - functions[name] = spec + public fun declare( + name: String, + argumentCodec: Codec, + resultCodec: Codec, + ): Declaration { + functions[name] = argumentCodec to resultCodec return Declaration(this, name) } - public override fun toString(): String = "FunctionSet(endpoint='$endpoint', functions=$functions)" + override fun toString(): String = "FunctionSet(endpoint='$endpoint', functions=$functions)" } /** @@ -57,16 +72,17 @@ public abstract class FunctionSet(public val endpoint: ClientEndpoint) { * @return the function invoked by client. */ @Suppress("UNCHECKED_CAST") -public operator fun FunctionSet.getValue( +public operator fun FunctionSet.getValue( thisRef: FunctionClient, property: KProperty<*>, ): suspend (T) -> R { val name = property.name - val spec = - checkNotNull(functions[name] as? FunctionSpec) { "Cannot find function $name in function set $this." } + val (argumentCodec, resultCodec) = checkNotNull(functions[name] as? Pair, Codec>) { + "Cannot find function $name in function set $this." + } - return thisRef.getFunction(endpoint, property.name, spec) + return thisRef.getFunction(endpoint, property.name, argumentCodec, resultCodec) } /** @@ -78,52 +94,30 @@ public operator fun FunctionSet.getValue( * @param property the property. * @return the function invoked by client. */ -public operator fun FunctionSet.Declaration.getValue( +public operator fun FunctionSet.Declaration.getValue( thisRef: FunctionClient, property: KProperty<*>, ): suspend (T) -> R = owner.getValue(thisRef, property) -/** - * Creates a function, stores it and return declaration object pointing to this set. - * - * @param T the type the function takes. - * @param R the type the function returns. - * @param nameToSpec pair of the name and the spec of function. - * @return a new declaration object. - */ -public fun FunctionSet.declare(nameToSpec: Pair>): FunctionSet.Declaration = - declare(nameToSpec.first, nameToSpec.second) - /** * Returns [PropertyDelegateProvider] providing [FunctionSet.Declaration] objects by using given spec and name of the * property. * * @param T the type the function takes. * @param R the type the function returns. - * @param spec the spec of function. + * @param argumentCodec the codec of [T]. + * @param resultCodec the codec of [R]. * @return a new declaration object. */ -public fun declare(spec: FunctionSpec): PropertyDelegateProvider>> = +public fun declare( + argumentCodec: Codec, + resultCodec: Codec, +): PropertyDelegateProvider>> = PropertyDelegateProvider { thisRef, property -> - val d = thisRef.declare(property.name to spec) + val d = thisRef.declare(property.name, argumentCodec, resultCodec) ReadOnlyProperty { _, _ -> d } } -/** - * Returns [PropertyDelegateProvider] providing [FunctionSet.Declaration] objects by using given spec and name of the - * property. - * - * @param T the type the function takes. - * @param R the type the function returns. - * @param spec the spec of function. - * @return a new declaration object. - */ -public fun declare( - argumentCoder: Coder, - resultCoder: Coder, -): PropertyDelegateProvider>> = - declare(FunctionSpec(argumentCoder, resultCoder)) - /** * Registers a function in [FunctionServer] by its implementation and declaration. Warning, endpoint should be added to * the function server independently. @@ -133,11 +127,11 @@ public fun declare( * @param function the function's implementation. * @return [function]. */ -public fun FunctionServer.impl( +public fun FunctionServer.impl( declaration: FunctionSet.Declaration, function: suspend (T) -> R, ): suspend (T) -> R { - register(declaration.name, declaration.spec, function) + register(declaration.name, declaration.argumentCodec, declaration.resultCodec, function) return function } @@ -152,21 +146,23 @@ public fun FunctionServer.impl( * @return the result of the function. */ @Suppress("RedundantSuspendModifier") -public suspend operator fun FunctionSet.Declaration.invoke(client: FunctionClient, arg: T): R = - client.getFunction(owner.endpoint, name, spec)(arg) +public suspend operator fun FunctionSet.Declaration.invoke(client: FunctionClient, arg: T): R = + client.getFunction(owner.endpoint, name, argumentCodec, resultCodec)(arg) /** * Configures this function server with provided function set receiver. It is usually needed to provide functions to * the server with [impl]. * * @receiver the function server. - * @param F the type of function server. * @param S the type of function set. * @param set the set object. * @param action the lambda to apply. * @return this function server. */ -public inline fun F.configure(set: S, action: S.(F) -> Unit): F where F : FunctionServer, S : FunctionSet { +public inline fun FunctionServer.configure( + set: S, + action: S.(FunctionServer) -> Unit, +): FunctionServer where S : FunctionSet { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } require(set.endpoint.toServerEndpoint() in endpoints) { diff --git a/communicator-api/src/commonMain/kotlin/FunctionSpec.kt b/communicator-api/src/commonMain/kotlin/FunctionSpec.kt deleted file mode 100644 index d1ff060..0000000 --- a/communicator-api/src/commonMain/kotlin/FunctionSpec.kt +++ /dev/null @@ -1,51 +0,0 @@ -package space.kscience.communicator.api - -/** - * Represents typed specification of function. - * - * @param T the type of argument. - * @param R the type of result. - * @property argumentCoder the coder of [T]. - * @property resultCoder the coder of [R]. - */ -public data class FunctionSpec(val argumentCoder: Coder, val resultCoder: Coder) - -/** - * Returned function will wrap serialization exceptions into [CoderException], - * and will throw receiver function's exceptions as-is. - */ -public fun (suspend (T) -> R).toBinary(spec: FunctionSpec): PayloadFunction = { bin -> - val arg = try { - spec.argumentCoder.decode(bin) - } catch (ex: Exception) { - throw DecodingException(bin, spec.argumentCoder, ex.message.orEmpty()) - } - - val res = invoke(arg) - - try { - spec.resultCoder.encode(res) - } catch (ex: Exception) { - throw EncodingException(res, spec.resultCoder, ex.message.orEmpty()) - } -} - -/** - * Returned function will wrap serialization exceptions into [CoderException], - * and will throw receiver function's exceptions as-is. - */ -public fun PayloadFunction.toFunction(spec: FunctionSpec): (suspend (T) -> R) = { arg -> - val bin = try { - spec.argumentCoder.encode(arg) - } catch (ex: Exception) { - throw EncodingException(arg, spec.argumentCoder, ex.message.orEmpty()) - } - - val res = invoke(bin) - - try { - spec.resultCoder.decode(res) - } catch (ex: Exception) { - throw DecodingException(res, spec.resultCoder, ex.message.orEmpty()) - } -} diff --git a/communicator-api/src/commonMain/kotlin/TransportClient.kt b/communicator-api/src/commonMain/kotlin/TransportClient.kt index a70722b..16aed64 100644 --- a/communicator-api/src/commonMain/kotlin/TransportClient.kt +++ b/communicator-api/src/commonMain/kotlin/TransportClient.kt @@ -9,7 +9,8 @@ public interface TransportClient : Closeable { /** * Communicates with endpoint by transceiving a payload. * - * @param address the address to channel. + * @param host the host to channel. + * @param port the port to channel. * @param name the name of function. * @param payload the payload to send. * @return the received payload. @@ -19,7 +20,8 @@ public interface TransportClient : Closeable { /** * Returns a payload function channeling this transport. * - * @param address the address to channel. + * @param host the host to channel. + * @param port the port to channel. * @param name the name of function. * @return the freshly created function. */ @@ -29,5 +31,5 @@ public interface TransportClient : Closeable { /** * Disposes this transport. */ - public override fun close() + override fun close() } diff --git a/communicator-api/src/commonMain/kotlin/TransportClientFactory.kt b/communicator-api/src/commonMain/kotlin/TransportClientFactory.kt deleted file mode 100644 index 017e094..0000000 --- a/communicator-api/src/commonMain/kotlin/TransportClientFactory.kt +++ /dev/null @@ -1,15 +0,0 @@ -package space.kscience.communicator.api - -/** - * Creates [TransportClient] for given protocol identifier. - */ -public fun interface TransportClientFactory { - /** - * Returns [TransportClient] for given protocol identifier. - */ - public operator fun get(protocol: String): TransportClient? - - public companion object : TransportClientFactory { - public override fun get(protocol: String): TransportClient? = null - } -} diff --git a/communicator-api/src/commonMain/kotlin/TransportFactory.kt b/communicator-api/src/commonMain/kotlin/TransportFactory.kt new file mode 100644 index 0000000..2d3203b --- /dev/null +++ b/communicator-api/src/commonMain/kotlin/TransportFactory.kt @@ -0,0 +1,21 @@ +package space.kscience.communicator.api + +/** + * Creates [TransportClient] and [TransportServer] objects for given protocol. + */ +public interface TransportFactory { + /** + * Returns [TransportClient] for given protocol identifier. + */ + public fun client(protocol: String): TransportClient? + + /** + * Returns [TransportServer] for given port and protocol. + */ + public fun server(protocol: String, port: Int): TransportServer? + + public companion object : TransportFactory { + override fun client(protocol: String): TransportClient? = null + override fun server(protocol: String, port: Int): TransportServer? = null + } +} diff --git a/communicator-api/src/commonMain/kotlin/TransportFunctionClient.kt b/communicator-api/src/commonMain/kotlin/TransportFunctionClient.kt deleted file mode 100644 index 644714e..0000000 --- a/communicator-api/src/commonMain/kotlin/TransportFunctionClient.kt +++ /dev/null @@ -1,22 +0,0 @@ -package space.kscience.communicator.api - -/** - * Function client implementation based on [TransportClient] objects provided by [TransportClientFactory]. - */ -public class TransportFunctionClient(private val factory: TransportClientFactory) : - FunctionClient { - private val transportCache: MutableMap = hashMapOf() - - public override fun getFunction( - endpoint: ClientEndpoint, - name: String, - spec: FunctionSpec, - ): suspend (T) -> R = transportCache - .getOrPut(endpoint.protocol) { - factory[endpoint.protocol] ?: error("Protocol ${endpoint.protocol} is not supported by this client.") - } - .channel(endpoint.host, endpoint.port, name) - .toFunction(spec) - - public override fun close(): Unit = transportCache.values.forEach(TransportClient::close) -} diff --git a/communicator-api/src/commonMain/kotlin/TransportFunctionServer.kt b/communicator-api/src/commonMain/kotlin/TransportFunctionServer.kt deleted file mode 100644 index 44bae4b..0000000 --- a/communicator-api/src/commonMain/kotlin/TransportFunctionServer.kt +++ /dev/null @@ -1,41 +0,0 @@ -package space.kscience.communicator.api - -import kotlin.contracts.InvocationKind -import kotlin.contracts.contract - -/** - * Function server implementation based on [TransportServer] objects provided by [TransportServerFactory]. - */ -public class TransportFunctionServer( - private val factory: TransportServerFactory = TransportServerFactory, - public override val endpoints: Set, -) : FunctionServer { - public constructor(factory: TransportServerFactory, vararg endpoints: ServerEndpoint) : - this(factory, endpoints.toSet()) - - private val transportServers: List = endpoints.map { (protocol, port) -> - factory[protocol, port] ?: error("Protocol $protocol is not supported.") - } - - public override fun register( - name: String, - spec: FunctionSpec, - function: suspend (T) -> R, - ) { - val payloadFunction = function.toBinary(spec) - transportServers.forEach { it.register(name, payloadFunction, spec) } - } - - public override fun unregister(name: String): Unit = transportServers.forEach { it.unregister(name) } - public override fun close(): Unit = transportServers.forEach(TransportServer::close) -} - -public inline fun TransportFunctionServer( - set: S, - factory: TransportServerFactory, - action: S.(TransportFunctionServer) -> Unit, -): TransportFunctionServer where S : FunctionSet { - contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } - val t = TransportFunctionServer(factory, set.endpoint.toServerEndpoint()) - return t.configure(set, action) -} diff --git a/communicator-api/src/commonMain/kotlin/TransportServer.kt b/communicator-api/src/commonMain/kotlin/TransportServer.kt index a0b2f36..c9dcc25 100644 --- a/communicator-api/src/commonMain/kotlin/TransportServer.kt +++ b/communicator-api/src/commonMain/kotlin/TransportServer.kt @@ -17,7 +17,7 @@ public interface TransportServer : Closeable { * @param name the name of function. * @param function the implementation of function. */ - public fun register(name: String, function: PayloadFunction, spec: FunctionSpec<*, *>) + public fun register(name: String, function: PayloadFunction, argumentCodec: Codec<*>, resultCodec: Codec<*>) /** * Unregisters function by its name. diff --git a/communicator-api/src/commonMain/kotlin/TransportServerFactory.kt b/communicator-api/src/commonMain/kotlin/TransportServerFactory.kt deleted file mode 100644 index f2de262..0000000 --- a/communicator-api/src/commonMain/kotlin/TransportServerFactory.kt +++ /dev/null @@ -1,15 +0,0 @@ -package space.kscience.communicator.api - -/** - * Creates [TransportServer] for given port and protocol. - */ -public fun interface TransportServerFactory { - /** - * Returns [TransportServer] for given port and protocol. - */ - public operator fun get(protocol: String, port: Int): TransportServer? - - public companion object : TransportServerFactory { - public override fun get(protocol: String, port: Int): TransportServer? = null - } -} diff --git a/communicator-api/src/jvmTest/kotlin/CodersTest.kt b/communicator-api/src/jvmTest/kotlin/CodecsTest.kt similarity index 50% rename from communicator-api/src/jvmTest/kotlin/CodersTest.kt rename to communicator-api/src/jvmTest/kotlin/CodecsTest.kt index b257087..69e51c0 100644 --- a/communicator-api/src/jvmTest/kotlin/CodersTest.kt +++ b/communicator-api/src/jvmTest/kotlin/CodecsTest.kt @@ -1,28 +1,30 @@ package space.kscience.communicator.api import kotlinx.coroutines.runBlocking +import kotlinx.serialization.Serializable +import kotlinx.serialization.serializer import kotlin.math.PI import kotlin.test.Test import kotlin.test.assertEquals -internal class CodersTest { - private suspend fun testEncodeAndDecode(coder: Coder, vararg values: T) = values.forEach { - assertEquals(it, coder.decode(coder.encode(it))) +internal class CodecsTest { + private suspend fun testEncodeAndDecode(codec: Codec, vararg values: T) = values.forEach { + assertEquals(it, codec.decode(codec.encode(it)).first) } @Test - fun int() = runBlocking { testEncodeAndDecode(IntCoder, 0, 1, Int.MAX_VALUE, Int.MIN_VALUE, 42) } + fun int() = runBlocking { testEncodeAndDecode(IntCodec, 0, 1, Int.MAX_VALUE, Int.MIN_VALUE, 42) } @Test - fun long() = runBlocking { testEncodeAndDecode(LongCoder, 0L, 1L, Long.MAX_VALUE, Long.MIN_VALUE, 42L) } + fun long() = runBlocking { testEncodeAndDecode(LongCodec, 0L, 1L, Long.MAX_VALUE, Long.MIN_VALUE, 42L) } @Test - fun ulong() = runBlocking { testEncodeAndDecode(ULongCoder, 0uL, 1uL, ULong.MAX_VALUE, 42uL) } + fun ulong() = runBlocking { testEncodeAndDecode(ULongCodec, 0uL, 1uL, ULong.MAX_VALUE, 42uL) } @Test fun double() = runBlocking { testEncodeAndDecode( - DoubleCoder, + DoubleCodec, 0.0, 1.0, Double.MAX_VALUE, @@ -37,13 +39,13 @@ internal class CodersTest { @Test fun string() = runBlocking { - testEncodeAndDecode(StringCoder, "", "42", "bla".repeat(1000), "语言处理", "добрый вечер") + testEncodeAndDecode(StringCodec, "", "42", "bla".repeat(1000), "语言处理", "добрый вечер") } @Test fun pair() = runBlocking { testEncodeAndDecode( - PairCoder(IntCoder, StringCoder), + PairCodec(IntCodec, StringCodec), 42 to "ku", 21 to "yyy", Int.MAX_VALUE to "bla".repeat(100), @@ -53,7 +55,7 @@ internal class CodersTest { @Test fun triple() = runBlocking { testEncodeAndDecode( - TripleCoder(IntCoder, StringCoder, LongCoder), + TripleCodec(IntCodec, StringCodec, LongCodec), Triple(42, "ku", 323L), Triple(21, "yyy", 13L), Triple(Int.MAX_VALUE, "bla".repeat(100), 2302131233L), @@ -61,28 +63,52 @@ internal class CodersTest { } @Test - fun custom() = runBlocking { + fun sized() = runBlocking { data class SomeClass(val int: Int, val double: Double) - val coder = object : SizedCoder() { - private val coder = PairCoder(IntCoder, DoubleCoder) + val codec = object : SizedCodec() { + private val codec = PairCodec(IntCodec, DoubleCodec) override val identity: String get() = "SomeClass" - override suspend fun customEncode(value: SomeClass): Payload = coder.encode(value.int to value.double) + override suspend fun customEncode(value: SomeClass): Payload = codec.encode(value.int to value.double) override suspend fun customDecode(payload: Payload): SomeClass { - val (a, b) = coder.decode(payload) + val (decoded, _) = codec.decode(payload) + val (a, b) = decoded return SomeClass(a, b) } } testEncodeAndDecode( - coder, + codec, SomeClass(42, 42.0), SomeClass(-2100000, Double.NaN), SomeClass(Int.MAX_VALUE, 21341234.0), ) } + + @Test + fun tuple() = runBlocking { + testEncodeAndDecode(TupleCodec(listOf(IntCodec, LongCodec, StringCodec)), listOf(1, 2L, "123")) + } + + @Test + fun list() = runBlocking { + testEncodeAndDecode(ListCodec(StringCodec), listOf("12341", "awerjhq", "9324148hrwst")) + } + + @Serializable + data class MyData(val a: Int, val b: String, val c: Set) + + @Test + fun json() = runBlocking { + testEncodeAndDecode(JsonCodec(), MyData(1, "2134oi5", setOf(1, 2, 3))) + } + + @Test + fun cbor() = runBlocking { + testEncodeAndDecode(JsonCodec(), MyData(1, "2134oi5", setOf(1, 2, 3))) + } } diff --git a/communicator-zmq/src/commonMain/kotlin/FactorySupport.kt b/communicator-zmq/src/commonMain/kotlin/FactorySupport.kt index 13e2b0d..88fec00 100644 --- a/communicator-zmq/src/commonMain/kotlin/FactorySupport.kt +++ b/communicator-zmq/src/commonMain/kotlin/FactorySupport.kt @@ -1,20 +1,17 @@ package space.kscience.communicator.zmq -import space.kscience.communicator.api.TransportClientFactory -import space.kscience.communicator.api.TransportServerFactory +import space.kscience.communicator.api.* import space.kscience.communicator.zmq.client.ZmqTransportClient import space.kscience.communicator.zmq.server.ZmqTransportServer /** - * Creates a new [TransportClientFactory] which handles ZMQ protocol name by binding it [ZmqTransportClient]. + * Creates a new [TransportFactory] which handles ZMQ protocol name by binding it to [ZmqTransportClient] and + * [ZmqTransportServer]. */ -public fun TransportClientFactory.withZmq(): TransportClientFactory = TransportClientFactory { - if (it == "ZMQ") ZmqTransportClient() else this[it] -} +public fun TransportFactory.zmq(): TransportFactory = object : TransportFactory { + override fun client(protocol: String): TransportClient? = + if (protocol == "ZMQ") ZmqTransportClient() else this@zmq.client(protocol) -/** - * Creates a new [TransportServerFactory] which handles ZMQ protocol name by binding it [ZmqTransportServer]. - */ -public fun TransportServerFactory.withZmq(): TransportServerFactory = TransportServerFactory { protocol, port -> - if (protocol == "ZMQ") ZmqTransportServer(port) else this[protocol, port] + override fun server(protocol: String, port: Int): TransportServer? = + if (protocol == "ZMQ") ZmqTransportServer(port) else this@zmq.server(protocol, port) } diff --git a/communicator-zmq/src/commonMain/kotlin/client/ClientForwardSocketHandler.kt b/communicator-zmq/src/commonMain/kotlin/client/ClientForwardSocketHandler.kt index b982f5a..9acdc4b 100644 --- a/communicator-zmq/src/commonMain/kotlin/client/ClientForwardSocketHandler.kt +++ b/communicator-zmq/src/commonMain/kotlin/client/ClientForwardSocketHandler.kt @@ -59,9 +59,9 @@ internal fun handleForwardSocket(arg: ForwardSocketHandlerArg) = with(arg.client } Protocol.Coder.IdentityFound -> { - val (queryID, argCoderIdentity, resultCoderIdentity) = msg + val (queryID, argCodecIdentity, resultCodecIdentity) = msg val callback = specQueriesInWork[UniqueID(queryID)] ?: return - callback.onSpecFound(argCoderIdentity.decodeToString(), resultCoderIdentity.decodeToString()) + callback.onSpecFound(argCodecIdentity.decodeToString(), resultCodecIdentity.decodeToString()) } Protocol.Coder.IdentityNotFound -> { diff --git a/communicator-zmq/src/commonMain/kotlin/client/ZmqTransportClient.kt b/communicator-zmq/src/commonMain/kotlin/client/ZmqTransportClient.kt index e009a0a..c493c41 100644 --- a/communicator-zmq/src/commonMain/kotlin/client/ZmqTransportClient.kt +++ b/communicator-zmq/src/commonMain/kotlin/client/ZmqTransportClient.kt @@ -64,7 +64,7 @@ public class ZmqTransportClient private constructor( runAsync(this) { start() } } - public override suspend fun respond(host: String, port: Int, name: String, payload: Payload): Payload = + override suspend fun respond(host: String, port: Int, name: String, payload: Payload): Payload = respondImpl(host, port, name, payload) @@ -89,7 +89,7 @@ public class ZmqTransportClient private constructor( newQueriesQueue.addFirst(query) } - public override fun close() { + override fun close() { logger.info { "Stopping and cleaning up." } active[0] = -1 newQueriesQueue.dispose() @@ -100,7 +100,7 @@ public class ZmqTransportClient private constructor( ctx.close() } - public override fun toString(): String = "ZmqTransport(${identityHash})" + override fun toString(): String = "ZmqTransport(${identityHash})" } internal expect suspend fun ZmqTransportClient.respondImpl( @@ -116,7 +116,7 @@ private fun ZmqTransportClient.handleQueriesQueue() { val id = UniqueID() queriesInWork[id] = query.callback - getForwardSocket(query.host , query.port).sendMsg { + getForwardSocket(query.host, query.port).sendMsg { +Protocol.Query +id +query.arg diff --git a/communicator-zmq/src/commonMain/kotlin/server/TransportServerFrontendHandler.kt b/communicator-zmq/src/commonMain/kotlin/server/TransportServerFrontendHandler.kt index 182f3e1..3ad90c0 100644 --- a/communicator-zmq/src/commonMain/kotlin/server/TransportServerFrontendHandler.kt +++ b/communicator-zmq/src/commonMain/kotlin/server/TransportServerFrontendHandler.kt @@ -44,19 +44,24 @@ internal fun ZmqTransportServer.handleFrontend() { Protocol.Coder.IdentityQuery -> { val (functionName) = msg - val functionSpec = serverFunctions[functionName.decodeToString()]?.second + + val (_, argumentCodec, resultCodec) = serverFunctions[functionName.decodeToString()] ?: Triple( + null, + null, + null, + ) frontend.sendMsg { +clientIdentity - if (functionSpec == null) { + if (argumentCodec == null || resultCodec == null) { +Protocol.Coder.IdentityNotFound +functionName } else { +Protocol.Coder.IdentityFound +functionName - +functionSpec.argumentCoder.identity.encodeToByteArray() - +functionSpec.resultCoder.identity.encodeToByteArray() + +argumentCodec.identity.encodeToByteArray() + +resultCodec.identity.encodeToByteArray() } } } diff --git a/communicator-zmq/src/commonMain/kotlin/server/WorkerFrontendHandler.kt b/communicator-zmq/src/commonMain/kotlin/server/WorkerFrontendHandler.kt index 0c480a6..7c17bae 100644 --- a/communicator-zmq/src/commonMain/kotlin/server/WorkerFrontendHandler.kt +++ b/communicator-zmq/src/commonMain/kotlin/server/WorkerFrontendHandler.kt @@ -46,8 +46,11 @@ internal fun ZmqWorker.handleWorkerFrontend() { } Protocol.IncompatibleSpecsFailure -> { - val (functionName, argCoder, resultCoder) = msg - logger.warn { "INCOMPATIBLE_SPECS_FAILURE functionName=$functionName argCoder=$argCoder resultCoder=$resultCoder" } + val (functionName, argCodec, resultCodec) = msg + + logger.warn { + "INCOMPATIBLE_SPECS_FAILURE functionName=$functionName argCodec=$argCodec resultCodec=$resultCodec" + } } else -> logger.warn { "Unknown message type: $type" } diff --git a/communicator-zmq/src/commonMain/kotlin/server/ZmqTransportServer.kt b/communicator-zmq/src/commonMain/kotlin/server/ZmqTransportServer.kt index b80f1ab..d2b5196 100644 --- a/communicator-zmq/src/commonMain/kotlin/server/ZmqTransportServer.kt +++ b/communicator-zmq/src/commonMain/kotlin/server/ZmqTransportServer.kt @@ -7,7 +7,7 @@ import co.touchlab.stately.isolate.StateRunner import kotlinx.coroutines.* import mu.KLogger import mu.KotlinLogging -import space.kscience.communicator.api.FunctionSpec +import space.kscience.communicator.api.Codec import space.kscience.communicator.api.PayloadFunction import space.kscience.communicator.api.TransportServer import space.kscience.communicator.zmq.Protocol @@ -25,10 +25,10 @@ import space.kscience.communicator.zmq.util.sendMsg * The recommended protocol identifier is `ZMQ`. */ public class ZmqTransportServer private constructor( - public override val port: Int, + override val port: Int, private val stateRunner: StateRunner = DaemonStateRunner(), - internal val serverFunctions: IsoMutableMap>> = + internal val serverFunctions: IsoMutableMap, Codec<*>>> = IsoMutableMap(stateRunner) { HashMap() }, private val workerDispatcher: CoroutineDispatcher = Dispatchers.Default, @@ -66,7 +66,7 @@ public class ZmqTransportServer private constructor( reactor.start() } - public override fun close() { + override fun close() { logger.info { "Stopping and cleaning up." } active[0] = -1 workerScope.cancel("Transport server is being stopped.") @@ -80,18 +80,25 @@ public class ZmqTransportServer private constructor( runAsync(this) { start() } } - public override fun register(name: String, function: PayloadFunction, spec: FunctionSpec<*, *>): Unit = - editFunctionQueriesQueue.addFirst(RegisterFunctionQuery(name, function, spec)) + override fun register( + name: String, + function: PayloadFunction, + argumentCodec: Codec<*>, + resultCodec: Codec<*>, + ): Unit = editFunctionQueriesQueue.addFirst(RegisterFunctionQuery(name, function, argumentCodec, resultCodec)) - public override fun unregister(name: String): Unit = - editFunctionQueriesQueue.addFirst(UnregisterFunctionQuery(name)) - - public override fun toString(): String = "ZmqTransportServer($port)" + override fun unregister(name: String): Unit = editFunctionQueriesQueue.addFirst(UnregisterFunctionQuery(name)) + override fun toString(): String = "ZmqTransportServer($port)" } internal sealed class EditFunctionQuery -private class RegisterFunctionQuery(val name: String, val function: PayloadFunction, val spec: FunctionSpec<*, *>) : +private class RegisterFunctionQuery( + val name: String, + val function: PayloadFunction, + val argumentCodec: Codec<*>, + val resultCodec: Codec<*>, +) : EditFunctionQuery() private class UnregisterFunctionQuery(val name: String) : EditFunctionQuery() @@ -134,7 +141,7 @@ private fun ZmqTransportServer.handleEditFunctionQueue() { when (editFunctionMessage) { is RegisterFunctionQuery -> this@handleEditFunctionQueue.serverFunctions[editFunctionMessage.name] = - editFunctionMessage.function to editFunctionMessage.spec + Triple(editFunctionMessage.function, editFunctionMessage.argumentCodec, editFunctionMessage.resultCodec) is UnregisterFunctionQuery -> this@handleEditFunctionQueue.serverFunctions -= editFunctionMessage.name } diff --git a/communicator-zmq/src/commonMain/kotlin/server/ZmqWorker.kt b/communicator-zmq/src/commonMain/kotlin/server/ZmqWorker.kt index 3b1ac7d..3f35db3 100644 --- a/communicator-zmq/src/commonMain/kotlin/server/ZmqWorker.kt +++ b/communicator-zmq/src/commonMain/kotlin/server/ZmqWorker.kt @@ -9,8 +9,8 @@ import kotlinx.coroutines.* import mu.KLogger import mu.KotlinLogging import space.kscience.communicator.api.ClientEndpoint -import space.kscience.communicator.api.FunctionSpec -import space.kscience.communicator.api.IntCoder +import space.kscience.communicator.api.Codec +import space.kscience.communicator.api.IntCodec import space.kscience.communicator.api.PayloadFunction import space.kscience.communicator.zmq.Protocol import space.kscience.communicator.zmq.platform.ZmqContext @@ -23,7 +23,7 @@ import space.kscience.communicator.zmq.util.sendMsg public class ZmqWorker private constructor( internal val proxy: ClientEndpoint, private val stateRunner: StateRunner = DaemonStateRunner(), - internal val serverFunctions: IsoMutableMap>>, + internal val serverFunctions: IsoMutableMap, Codec<*>>>, private val workerDispatcher: CoroutineDispatcher = Dispatchers.Default, internal val workerScope: CoroutineScope = CoroutineScope(workerDispatcher + SupervisorJob()), private val ctx: ZmqContext = ZmqContext(), @@ -36,7 +36,7 @@ public class ZmqWorker private constructor( ) : Closeable { public constructor( proxy: ClientEndpoint, - serverFunctions: MutableMap>>, + serverFunctions: MutableMap, Codec<*>>>, workerDispatcher: CoroutineDispatcher = Dispatchers.Default, workerScope: CoroutineScope = CoroutineScope(workerDispatcher + SupervisorJob()), stateRunner: StateRunner = DaemonStateRunner(), @@ -55,7 +55,7 @@ public class ZmqWorker private constructor( /** * Stops and disposes this worker. */ - public override fun close() { + override fun close() { workerScope.cancel("Proxy is being stopped.") serverFunctions.dispose() ctx.close() @@ -66,12 +66,12 @@ public class ZmqWorker private constructor( frontend.sendMsg { +Protocol.Worker.Register - +IntCoder.encode(serverFunctions.size) + +IntCodec.encode(serverFunctions.size) - serverFunctions.mapValues { it.value.second }.forEach { - +it.key - +it.value.argumentCoder.identity - +it.value.resultCoder.identity + serverFunctions.mapValues { (_, v) -> v.second to v.third }.forEach { (k, v) -> + +k + +v.first.identity + +v.second.identity } } @@ -104,7 +104,7 @@ public class ZmqWorker private constructor( reactor.start() } - public override fun toString(): String = "ZmqWorker(${proxy.host}:${proxy.port}))" + override fun toString(): String = "ZmqWorker(${proxy.host}:${proxy.port}))" } internal sealed class WorkerEditFunctionQuery @@ -112,7 +112,8 @@ internal sealed class WorkerEditFunctionQuery private class WorkerRegisterFunctionQuery( val name: String, val function: PayloadFunction, - val spec: FunctionSpec<*, *>, + val argumentCodec: Codec<*>, + val resultCodec: Codec<*>, ) : WorkerEditFunctionQuery() private class WorkerUnregisterFunctionQuery(val name: String) : WorkerEditFunctionQuery() @@ -156,7 +157,7 @@ private fun ZmqWorker.handleEditFunctionQueue() { when (editFunctionMessage) { is WorkerRegisterFunctionQuery -> serverFunctions[editFunctionMessage.name] = - editFunctionMessage.function to editFunctionMessage.spec + Triple(editFunctionMessage.function, editFunctionMessage.argumentCodec, editFunctionMessage.resultCodec) is WorkerUnregisterFunctionQuery -> serverFunctions -= editFunctionMessage.name } diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts index 62d3bf1..f03ae00 100644 --- a/demo/build.gradle.kts +++ b/demo/build.gradle.kts @@ -1,7 +1,11 @@ @file:Suppress("UNUSED_VARIABLE") internal val slf4jVersion: String by project -plugins { kotlin(module = "multiplatform") } + +plugins { + kotlin("multiplatform") + kotlin("plugin.serialization") +} kotlin { jvm() diff --git a/demo/src/jvmMain/kotlin/Example.kt b/demo/src/jvmMain/kotlin/Example.kt index ed180e0..b37f67f 100644 --- a/demo/src/jvmMain/kotlin/Example.kt +++ b/demo/src/jvmMain/kotlin/Example.kt @@ -1,30 +1,29 @@ import kotlinx.coroutines.runBlocking +import kotlinx.serialization.Serializable import space.kscience.communicator.api.* -import space.kscience.communicator.zmq.withZmq +import space.kscience.communicator.zmq.zmq private val endpoint = ClientEndpoint("ZMQ", "127.0.0.1:8888") +@Serializable +data class MyArguments(val arg1: Int, val arg2: Double) + private object Functions : FunctionSet(endpoint) { - val f by declare(IntCoder, IntCoder) - val g by declare(IntCoder, StringCoder) + val f by declare(JsonCodec(), IntCodec) } /** - * Launches [TransportFunctionServer] with function f(x) = x^2 + 1 and [TransportFunctionClient] calling that + * Launches [FunctionServer] with function f(x) = x^2 + 1 and [FunctionClient] calling that * function, calls f from 123, and prints the result. */ fun main(): Unit = runBlocking { - val server = TransportFunctionServer(Functions, TransportServerFactory.withZmq()) { - it.impl(f) { x -> x * x + 1 } - it.impl(g) { x -> "a".repeat(x) } + val server = FunctionServer(Functions, TransportFactory.zmq()) { + it.impl(f) { (arg1, arg2) -> arg1 * arg2.toInt() } } - val client = TransportFunctionClient(TransportClientFactory.withZmq()) + val client = FunctionClient(TransportFactory.zmq()) println("Calling ${Functions.f}") - var result: Any = Functions.f(client, 123) - println("Result is $result") - println("Calling ${Functions.g}") - result = Functions.g(client, 55) + val result: Any = Functions.f(client, MyArguments(1, 2.0)) println("Result is $result") client.close() server.close() diff --git a/gradle.properties b/gradle.properties index d210c58..cc355d2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -10,5 +10,6 @@ kotlinVersion=1.5.21 ktorVersion=1.6.1 org.gradle.jvmargs=-XX:MaxMetaspaceSize=1G org.gradle.parallel=true -slf4jVersion=1.7.30 +serializationVersion=1.2.2 +slf4jVersion=1.7.31 statelyIsoVersion=1.1.7-a1