From ddb2fa771696ae0a8c4fcf574f66bddac7279873 Mon Sep 17 00:00:00 2001 From: Adam Chlupacek Date: Sat, 2 May 2020 20:08:35 +0200 Subject: [PATCH 1/3] Migrate fs2-http to fs2 2.3.0 + support for scala 2.13 --- build.sbt | 44 ++-- .../scala/spinoco/fs2/http/HttpClient.scala | 40 ++-- .../fs2/http/HttpRequestOrResponse.scala | 46 ++-- .../scala/spinoco/fs2/http/HttpServer.scala | 18 +- .../fs2/http/body/StreamBodyEncoder.scala | 4 +- src/main/scala/spinoco/fs2/http/http.scala | 33 ++- .../fs2/http/internal/ChunkedEncoding.scala | 11 +- .../spinoco/fs2/http/internal/internal.scala | 39 ++-- .../spinoco/fs2/http/routing/routing.scala | 3 +- .../spinoco/fs2/http/sse/SSEEncoding.scala | 13 +- .../scala/spinoco/fs2/http/util/util.scala | 216 ++++++++---------- .../fs2/http/websocket/WebSocket.scala | 60 +++-- .../spinoco/fs2/http/HttpRequestSpec.scala | 5 +- .../spinoco/fs2/http/HttpResponseSpec.scala | 3 +- .../spinoco/fs2/http/HttpServerSpec.scala | 97 ++++---- .../scala/spinoco/fs2/http/Resources.scala | 14 +- .../http/internal/ChunkedEncodingSpec.scala | 7 +- .../fs2/http/internal/HttpClientApp.scala | 5 +- .../fs2/http/internal/HttpServerApp.scala | 6 +- .../fs2/http/sse/SSEEncodingSpec.scala | 3 +- .../spinoco/fs2/http/util/UtilSpec.scala | 68 ------ .../http/websocket/WebSocketClientApp.scala | 12 +- .../fs2/http/websocket/WebSocketSpec.scala | 17 +- version.sbt | 2 +- 24 files changed, 346 insertions(+), 420 deletions(-) delete mode 100644 src/test/scala/spinoco/fs2/http/util/UtilSpec.scala diff --git a/build.sbt b/build.sbt index a78e6e9..f146072 100644 --- a/build.sbt +++ b/build.sbt @@ -9,8 +9,8 @@ lazy val contributors = Seq( lazy val commonSettings = Seq( organization := "com.spinoco", - scalaVersion := "2.11.8", - crossScalaVersions := Seq("2.11.8", "2.12.1"), + scalaVersion := "2.12.11", + crossScalaVersions := Seq("2.12.11", "2.13.0"), scalacOptions ++= Seq( "-feature", "-deprecation", @@ -19,23 +19,37 @@ lazy val commonSettings = Seq( "-language:existentials", "-language:postfixOps", "-Xfatal-warnings", - "-Yno-adapted-args", - "-Ywarn-value-discard", - "-Ywarn-unused-import" + "-Ywarn-value-discard" ), + scalacOptions ++= { + CrossVersion.partialVersion(scalaVersion.value) match { + case Some((2, v)) if v >= 13 => + Seq("-Ymacro-annotations", "-Ywarn-unused:imports") + case _ => + Seq("-Yno-adapted-args", "-Ywarn-unused-import") + } + }, scalacOptions in (Compile, console) ~= {_.filterNot("-Ywarn-unused-import" == _)}, libraryDependencies ++= Seq( - compilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full) - , "com.github.mpilquist" %% "simulacrum" % "0.13.0" - , "org.scodec" %% "scodec-bits" % "1.1.4" - , "org.scodec" %% "scodec-core" % "1.10.3" - , "org.scalacheck" %% "scalacheck" % "1.13.4" % "test" - , "com.spinoco" %% "protocol-http" % "0.3.15" - , "com.spinoco" %% "protocol-websocket" % "0.3.15" - , "co.fs2" %% "fs2-core" % "1.0.0-M2" - , "co.fs2" %% "fs2-io" % "1.0.0-M2" - , "com.spinoco" %% "fs2-crypto" % "0.4.0-M2" +// "com.github.mpilquist" %% "simulacrum" % "0.13.0" + "org.scalacheck" %% "scalacheck" % "1.14.3" % "test" + , "com.spinoco" %% "protocol-http" % "0.4.0-M1" + , "com.spinoco" %% "protocol-websocket" % "0.4.0-M1" + , "co.fs2" %% "fs2-core" % "2.3.0" + , "co.fs2" %% "fs2-io" % "2.3.0" ), + libraryDependencies ++= { + CrossVersion.partialVersion(scalaVersion.value) match { + case Some((2, v)) if v <= 12 => + Seq( + compilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full) + ) + case _ => + // if scala 2.13.0-M4 or later, macro annotations merged into scala-reflect + // https://github.com/scala/scala/pull/6606 + Nil + } + }, scmInfo := Some(ScmInfo(url("https://github.com/Spinoco/fs2-http"), "git@github.com:Spinoco/fs2-http.git")), homepage := None, licenses += ("MIT", url("http://opensource.org/licenses/MIT")), diff --git a/src/main/scala/spinoco/fs2/http/HttpClient.scala b/src/main/scala/spinoco/fs2/http/HttpClient.scala index 653bdb3..aceca8a 100644 --- a/src/main/scala/spinoco/fs2/http/HttpClient.scala +++ b/src/main/scala/spinoco/fs2/http/HttpClient.scala @@ -1,21 +1,19 @@ package spinoco.fs2.http -import java.nio.channels.AsynchronousChannelGroup -import cats.Applicative -import javax.net.ssl.SSLContext -import cats.effect.{Concurrent, ConcurrentEffect, Sync, Timer} +import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Sync, Timer} import fs2._ -import fs2.io.tcp.Socket +import fs2.concurrent.SignallingRef +import fs2.io.tcp.{Socket, SocketGroup} +import fs2.io.tls.TLSContext import scodec.{Codec, Decoder, Encoder} - import spinoco.fs2.http.internal.{addressForRequest, clientLiftToSecure, readWithTimeout} import spinoco.fs2.http.sse.{SSEDecoder, SSEEncoding} import spinoco.fs2.http.websocket.{Frame, WebSocket, WebSocketRequest} import spinoco.protocol.http.header._ import spinoco.protocol.mime.MediaType import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader} -import scala.concurrent.ExecutionContext + import scala.concurrent.duration._ @@ -109,15 +107,13 @@ trait HttpClient[F[_]] { * @param sslExecutionContext Strategy used when communication with SSL (https or wss) * @param sslContext SSL Context to use with SSL Client (https, wss) */ - def apply[F[_] : ConcurrentEffect : Timer]( + def apply[F[_] : ConcurrentEffect : Timer: ContextShift]( requestCodec : Codec[HttpRequestHeader] , responseCodec : Codec[HttpResponseHeader] - , sslExecutionContext: => ExecutionContext - , sslContext : => SSLContext - )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = Sync[F].delay { - lazy val sslCtx = sslContext - lazy val sslS = sslExecutionContext - + )( + socketGroup: SocketGroup + , tlsContext: TLSContext + ):F[HttpClient[F]] = Sync[F].delay { new HttpClient[F] { def request( request: HttpRequest[F] @@ -126,10 +122,10 @@ trait HttpClient[F[_]] { , timeout: Duration ): Stream[F, HttpResponse[F]] = { Stream.eval(addressForRequest[F](request.scheme, request.host)).flatMap { address => - Stream.resource(io.tcp.client[F](address)) - .evalMap { socket => - if (!request.isSecure) Applicative[F].pure(socket) - else clientLiftToSecure[F](sslS, sslCtx)(socket, request.host) + Stream.resource(socketGroup.client(address)) + .flatMap { socket => + if (!request.isSecure) Stream.emit(socket) + else Stream.resource(clientLiftToSecure[F](tlsContext)(socket, request.host)) } .flatMap { impl.request[F](request, chunkSize, maxResponseHeaderSize, timeout, requestCodec, responseCodec ) }} } @@ -141,7 +137,7 @@ trait HttpClient[F[_]] { , chunkSize: Int , maxFrameSize: Int ): Stream[F, Option[HttpResponseHeader]] = - WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec, sslS, sslCtx) + WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec)(socketGroup, tlsContext) def sse[A : SSEDecoder](rq: HttpRequest[F], maxResponseHeaderSize: Int, chunkSize: Int): Stream[F, A] = @@ -169,8 +165,8 @@ trait HttpClient[F[_]] { timeout match { case fin: FiniteDuration => eval(Sync[F].delay(System.currentTimeMillis())).flatMap { start => - HttpRequest.toStream(request, requestCodec).to(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ => - eval(async.signalOf[F, Boolean](true)).flatMap { timeoutSignal => + HttpRequest.toStream(request, requestCodec).through(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ => + eval(SignallingRef[F, Boolean](true)).flatMap { timeoutSignal => eval(Sync[F].delay(System.currentTimeMillis())).flatMap { sent => val remains = fin - (sent - start).millis readWithTimeout(socket, remains, timeoutSignal.get, chunkSize) @@ -181,7 +177,7 @@ trait HttpClient[F[_]] { }}}} case _ => - HttpRequest.toStream(request, requestCodec).to(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ => + HttpRequest.toStream(request, requestCodec).through(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ => socket.reads(chunkSize, None) through HttpResponse.fromStream[F](maxResponseHeaderSize, responseCodec) } } diff --git a/src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala b/src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala index f29cbd1..1f673e7 100644 --- a/src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala +++ b/src/main/scala/spinoco/fs2/http/HttpRequestOrResponse.scala @@ -22,7 +22,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => /** yields to true, if body of this request shall be chunked **/ lazy val bodyIsChunked : Boolean = - withHeaders(internal.bodyIsChunked) + withHeaders(spinoco.fs2.http.internal.bodyIsChunked) /** allows to stream arbitrary sized stream of `A` to remote party (i.e. upload) **/ def withStreamBody[A](body: Stream[F, A])(implicit E: StreamBodyEncoder[F, A]): Self = { @@ -37,7 +37,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => /** sets body size to supplied value **/ def withBodySize(sz: Long): Self = - updateHeaders(withHeaders(internal.swapHeader(`Content-Length`(sz)))) + updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Content-Length`(sz)))) /** gets body size, if one specified **/ def bodySize: Option[Long] = @@ -46,7 +46,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => protected def body: Stream[F, Byte] /** encodes body `A` given BodyEncoder exists **/ - def withBody[A](a: A)(implicit W: BodyEncoder[A]): Self = { + def withBody[A](a: A)(implicit W: BodyEncoder[A], raise: RaiseThrowable[F]): Self = { W.encode(a) match { case Failure(err) => updateBody(body = Stream.raiseError(new Throwable(s"failed to encode $a: $err"))) case Successful(bytes) => @@ -62,15 +62,15 @@ sealed trait HttpRequestOrResponse[F[_]] { self => } /** encodes body as utf8 string **/ - def withUtf8Body(s: String): Self = - withBody(s)(BodyEncoder.utf8String) + def withUtf8Body(s: String)(implicit raise: RaiseThrowable[F]): Self = + withBody(s)(BodyEncoder.utf8String, raise) /** Decodes body with supplied decoder of `A` **/ def bodyAs[A](implicit D: BodyDecoder[A], F: Sync[F]): F[Attempt[A]] = { withHeaders { _.collectFirst { case `Content-Type`(ct) => ct } match { case None => F.pure(Attempt.failure(Err("Content type is not known"))) case Some(ct) => - F.map(self.body.chunks.map(util.chunk2ByteVector).compile.toVector) { bs => + F.map(self.body.chunks.map(_.toByteVector).compile.toVector) { bs => if (bs.isEmpty) Attempt.failure(Err("Body is empty")) else D.decode(bs.reduce(_ ++ _), ct) } @@ -79,7 +79,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => /** gets body as stream of byteVectors **/ def bodyAsByteVectorStream:Stream[F,ByteVector] = - self.body.chunks.map(util.chunk2ByteVector) + self.body.chunks.map(_.toByteVector) /** decodes body as string with encoding supplied in ContentType **/ def bodyAsString(implicit F: Sync[F]): F[Attempt[String]] = @@ -87,7 +87,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => /** updates content type to one specified **/ def withContentType(ct: ContentType): Self = - updateHeaders(withHeaders(internal.swapHeader(`Content-Type`(ct)))) + updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Content-Type`(ct)))) /** gets ContentType, if one specififed **/ def contentType: Option[ContentType] = @@ -96,7 +96,7 @@ sealed trait HttpRequestOrResponse[F[_]] { self => /** configures encoding as chunked **/ def chunkedEncoding: Self = - updateHeaders(withHeaders(internal.swapHeader(`Transfer-Encoding`(List("chunked"))))) + updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Transfer-Encoding`(List("chunked"))))) def withHeaders[A](f: List[HttpHeader] => A): A = self match { case HttpRequest(_,_,header,_) => f(header.headers) @@ -161,8 +161,8 @@ final case class HttpRequest[F[_]]( * That means instead of passing query as part of request, they are encoded as utf8 body. * @return */ - def withQueryBodyEncoded(q:Uri.Query): Self = - withBody(q)(BodyEncoder.`x-www-form-urlencoded`) + def withQueryBodyEncoded(q:Uri.Query)(implicit raise: RaiseThrowable[F]): Self = + withBody(q)(BodyEncoder.`x-www-form-urlencoded`, raise) def bodyAsQuery(implicit F: Sync[F]):F[Attempt[Uri.Query]] = bodyAs[Uri.Query](BodyDecoder.`x-www-form-urlencoded`, F) @@ -190,10 +190,10 @@ object HttpRequest { ) , body = Stream.empty) - def post[F[_], A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] = + def post[F[_]: RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] = get(uri).withMethod(HttpMethod.POST).withBody(a) - def put[F[_], A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] = + def put[F[_]: RaiseThrowable, A](uri: Uri, a: A)(implicit E: BodyEncoder[A]): HttpRequest[F] = get(uri).withMethod(HttpMethod.PUT).withBody(a) def delete[F[_]](uri: Uri): HttpRequest[F] = @@ -210,11 +210,11 @@ object HttpRequest { * @tparam F * @return */ - def fromStream[F[_]]( + def fromStream[F[_]: RaiseThrowable]( maxHeaderSize: Int , headerCodec: Codec[HttpRequestHeader] ): Pipe[F, Byte, (HttpRequestHeader, Stream[F, Byte])] = { - import internal._ + import spinoco.fs2.http.internal._ _ through httpHeaderAndBody(maxHeaderSize) flatMap { case (header, bodyRaw) => headerCodec.decodeValue(header.bits) match { case Failure(err) => Stream.raiseError(new Throwable(s"Decoding of the request header failed: $err")) @@ -240,11 +240,11 @@ object HttpRequest { * @param request request to convert to stream * @param headerCodec Codec to convert the header to bytes */ - def toStream[F[_]]( + def toStream[F[_]: RaiseThrowable]( request: HttpRequest[F] , headerCodec: Codec[HttpRequestHeader] ): Stream[F, Byte] = Stream.suspend { - import internal._ + import spinoco.fs2.http.internal._ headerCodec.encode(request.header) match { case Failure(err) => Stream.raiseError(new Throwable(s"Encoding of the header failed: $err")) @@ -280,10 +280,10 @@ final case class HttpResponse[F[_]]( self.copy(header= self.header.copy(headers = headers)) /** encodes supplied stream of `A` as SSE stream in body **/ - def sseBody[A](in: Stream[F, A])(implicit E: SSEEncoder[A]): Self = + def sseBody[A](in: Stream[F, A])(implicit E: SSEEncoder[A], raise: RaiseThrowable[F]): Self = self .updateBody(in through SSEEncoding.encodeA[F, A]) - .updateHeaders(withHeaders(internal.swapHeader(`Content-Type`(ContentType.TextContent(MediaType.`text/event-stream`, None))))) + .updateHeaders(withHeaders(spinoco.fs2.http.internal.swapHeader(`Content-Type`(ContentType.TextContent(MediaType.`text/event-stream`, None))))) } @@ -301,11 +301,11 @@ object HttpResponse { /** * Decodes stream of bytes as HttpResponse. */ - def fromStream[F[_]]( + def fromStream[F[_]: RaiseThrowable]( maxHeaderSize: Int , responseCodec: Codec[HttpResponseHeader] ): Pipe[F,Byte, HttpResponse[F]] = { - import internal._ + import spinoco.fs2.http.internal._ _ through httpHeaderAndBody(maxHeaderSize) flatMap { case (header, bodyRaw) => responseCodec.decodeValue(header.bits) match { @@ -325,11 +325,11 @@ object HttpResponse { /** Encodes response to stream of bytes **/ - def toStream[F[_]]( + def toStream[F[_]: RaiseThrowable]( response: HttpResponse[F] , headerCodec: Codec[HttpResponseHeader] ): Stream[F, Byte] = Stream.suspend { - import internal._ + import spinoco.fs2.http.internal._ headerCodec.encode(response.header) match { case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode http response : $response :$err ")) diff --git a/src/main/scala/spinoco/fs2/http/HttpServer.scala b/src/main/scala/spinoco/fs2/http/HttpServer.scala index 7b167fd..4ad7670 100644 --- a/src/main/scala/spinoco/fs2/http/HttpServer.scala +++ b/src/main/scala/spinoco/fs2/http/HttpServer.scala @@ -1,15 +1,16 @@ package spinoco.fs2.http import java.net.InetSocketAddress -import java.nio.channels.AsynchronousChannelGroup -import cats.effect.{ConcurrentEffect, Sync, Timer} +import cats.effect.{ConcurrentEffect, ContextShift, Sync, Timer} import cats.syntax.all._ import fs2._ +import fs2.concurrent.SignallingRef +import fs2.io.tcp.SocketGroup import scodec.Codec - import spinoco.protocol.http.codec.{HttpRequestHeaderCodec, HttpResponseHeaderCodec} import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader, HttpStatusCode} + import scala.concurrent.duration._ @@ -35,7 +36,7 @@ object HttpServer { * Request is not suplied if failure happened before request was constructed. * */ - def apply[F[_] : ConcurrentEffect : Timer]( + def apply[F[_] : ConcurrentEffect : Timer: ContextShift]( maxConcurrent: Int = Int.MaxValue , receiveBufferSize: Int = 256 * 1024 , maxHeaderSize: Int = 10 *1024 @@ -47,19 +48,18 @@ object HttpServer { , requestFailure : Throwable => Stream[F, HttpResponse[F]] , sendFailure: (Option[HttpRequestHeader], HttpResponse[F], Throwable) => Stream[F, Nothing] )( - implicit - AG: AsynchronousChannelGroup + socketGroup: SocketGroup ): Stream[F, Unit] = { import Stream._ - import internal._ + import spinoco.fs2.http.internal._ val (initial, readDuration) = requestHeaderReceiveTimeout match { case fin: FiniteDuration => (true, fin) case _ => (false, 0.millis) } - io.tcp.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => + socketGroup.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => Stream.resource(resource).flatMap { socket => - eval(async.signalOf(initial)).flatMap { timeoutSignal => + eval(SignallingRef[F, Boolean](initial)).flatMap { timeoutSignal => readWithTimeout[F](socket, readDuration, timeoutSignal.get, receiveBufferSize) .through(HttpRequest.fromStream(maxHeaderSize, requestCodec)) .flatMap { case (request, body) => diff --git a/src/main/scala/spinoco/fs2/http/body/StreamBodyEncoder.scala b/src/main/scala/spinoco/fs2/http/body/StreamBodyEncoder.scala index ea8d353..2686408 100644 --- a/src/main/scala/spinoco/fs2/http/body/StreamBodyEncoder.scala +++ b/src/main/scala/spinoco/fs2/http/body/StreamBodyEncoder.scala @@ -46,7 +46,7 @@ object StreamBodyEncoder { StreamBodyEncoder(ContentType.BinaryContent(MediaType.`application/octet-stream`, None)) { _.flatMap { bv => Stream.chunk(ByteVectorChunk(bv)) } } /** encoder that encodes utf8 string, with `text/plain` utf8 content type **/ - def utf8StringEncoder[F[_]](implicit F: MonadError[F, Throwable]) : StreamBodyEncoder[F, String] = + def utf8StringEncoder[F[_]: RaiseThrowable](implicit F: MonadError[F, Throwable]) : StreamBodyEncoder[F, String] = byteVectorEncoder mapInF[String] { s => ByteVector.encodeUtf8(s) match { case Right(bv) => F.pure(bv) @@ -55,7 +55,7 @@ object StreamBodyEncoder { } withContentType ContentType.TextContent(MediaType.`text/plain`, Some(MIMECharset.`UTF-8`)) /** a convenience wrapper to convert body encoder to StreamBodyEncoder **/ - def fromBodyEncoder[F[_], A](implicit E: BodyEncoder[A]):StreamBodyEncoder[F, A] = + def fromBodyEncoder[F[_]: RaiseThrowable, A](implicit E: BodyEncoder[A]):StreamBodyEncoder[F, A] = StreamBodyEncoder(E.contentType) { _.flatMap { a => E.encode(a) match { case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode: $err ($a)")) diff --git a/src/main/scala/spinoco/fs2/http/http.scala b/src/main/scala/spinoco/fs2/http/http.scala index 903035e..29833d4 100644 --- a/src/main/scala/spinoco/fs2/http/http.scala +++ b/src/main/scala/spinoco/fs2/http/http.scala @@ -1,17 +1,14 @@ package spinoco.fs2 import java.net.InetSocketAddress -import java.nio.channels.AsynchronousChannelGroup -import java.util.concurrent.Executors - -import javax.net.ssl.SSLContext -import cats.effect.{ConcurrentEffect, Timer} +import cats.effect.{ConcurrentEffect, ContextShift, Timer} import fs2._ +import fs2.io.tcp.SocketGroup +import fs2.io.tls.TLSContext import scodec.Codec - import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader} import spinoco.protocol.http.codec.{HttpRequestHeaderCodec, HttpResponseHeaderCodec} -import scala.concurrent.ExecutionContext + import scala.concurrent.duration._ @@ -31,7 +28,7 @@ package object http { * @param requestCodec Codec for Http Request Header * @param service Pipe that defines handling of each incoming request and produces a response */ - def server[F[_] : ConcurrentEffect : Timer]( + def server[F[_] : ConcurrentEffect : Timer: ContextShift]( bindTo: InetSocketAddress , maxConcurrent: Int = Int.MaxValue , receiveBufferSize: Int = 256 * 1024 @@ -43,7 +40,7 @@ package object http { , sendFailure: (Option[HttpRequestHeader], HttpResponse[F], Throwable) => Stream[F, Nothing] = HttpServer.handleSendFailure[F] _ )( service: (HttpRequestHeader, Stream[F,Byte]) => Stream[F,HttpResponse[F]] - )(implicit AG: AsynchronousChannelGroup):Stream[F,Unit] = HttpServer( + )(socketGroup: SocketGroup):Stream[F,Unit] = HttpServer( maxConcurrent = maxConcurrent , receiveBufferSize = receiveBufferSize , maxHeaderSize = maxHeaderSize @@ -54,7 +51,7 @@ package object http { , service = service , requestFailure = requestFailure , sendFailure = sendFailure - ) + )(socketGroup) /** @@ -62,14 +59,14 @@ package object http { * * @param requestCodec Codec used to decode request header * @param responseCodec Codec used to encode response header - * @param sslStrategy Strategy used to perform blocking SSL operations */ - def client[F[_]: ConcurrentEffect : Timer]( - requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec - , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec - , sslStrategy: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-ssl", daemon = true))) - , sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx } - )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = - HttpClient(requestCodec, responseCodec, sslStrategy, sslContext) + def client[F[_]: ConcurrentEffect : Timer: ContextShift]( + requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec + , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec + )( + socketGroup: SocketGroup + , tlsContext: TLSContext + ):F[HttpClient[F]] = + HttpClient(requestCodec, responseCodec)(socketGroup, tlsContext) } diff --git a/src/main/scala/spinoco/fs2/http/internal/ChunkedEncoding.scala b/src/main/scala/spinoco/fs2/http/internal/ChunkedEncoding.scala index 85e44a7..097204c 100644 --- a/src/main/scala/spinoco/fs2/http/internal/ChunkedEncoding.scala +++ b/src/main/scala/spinoco/fs2/http/internal/ChunkedEncoding.scala @@ -4,7 +4,6 @@ import fs2.Chunk.ByteVectorChunk import fs2._ import scodec.bits.ByteVector -import spinoco.fs2.http.util.chunk2ByteVector /** * Created by pach on 20/01/17. @@ -15,14 +14,14 @@ object ChunkedEncoding { /** decodes from the HTTP chunked encoding. After last chunk this terminates. Allows to specify max header size, after which this terminates * Please see https://en.wikipedia.org/wiki/Chunked_transfer_encoding for details */ - def decode[F[_]](maxChunkHeaderSize:Int): Pipe[F, Byte, Byte] = { + def decode[F[_]: RaiseThrowable](maxChunkHeaderSize:Int): Pipe[F, Byte, Byte] = { // on left reading the header of chunk (acting as buffer) // on right reading the chunk itself, and storing remaining bytes of the chunk def go(expect:Either[ByteVector,Long], in: Stream[F, Byte]): Pull[F, Byte, Unit] = { - in.pull.unconsChunk.flatMap { + in.pull.uncons.flatMap { case None => Pull.done case Some((h, tl)) => - val bv = chunk2ByteVector(h) + val bv = h.toByteVector expect match { case Left(header) => val nh = header ++ bv @@ -65,14 +64,14 @@ object ChunkedEncoding { if (bv.isEmpty) Chunk.empty else ByteVectorChunk(ByteVector.view(bv.size.toHexString.toUpperCase.getBytes) ++ `\r\n` ++ bv ++ `\r\n` ) } - _.mapChunks { ch => encodeChunk(chunk2ByteVector(ch)) } ++ Stream.chunk(lastChunk) + _.mapChunks { ch => encodeChunk(ch.toByteVector) } ++ Stream.chunk(lastChunk) } /** yields to size of header in case the chunked header was succesfully parsed, else yields to None **/ private def readChunkedHeader(hdr:ByteVector):Option[Long] = { - hdr.decodeUtf8.right.toOption.flatMap { s => + hdr.decodeUtf8.toOption.flatMap { s => val parts = s.split(';') // lets ignore any extensions if (parts.isEmpty) None else { diff --git a/src/main/scala/spinoco/fs2/http/internal/internal.scala b/src/main/scala/spinoco/fs2/http/internal/internal.scala index adca664..17fa521 100644 --- a/src/main/scala/spinoco/fs2/http/internal/internal.scala +++ b/src/main/scala/spinoco/fs2/http/internal/internal.scala @@ -3,20 +3,17 @@ package spinoco.fs2.http import java.net.InetSocketAddress import java.util.concurrent.TimeoutException -import javax.net.ssl.SSLContext -import cats.effect.{Concurrent, Sync, Timer} -import javax.net.ssl.{SNIHostName, SNIServerName, SSLContext} -import cats.syntax.all._ +import javax.net.ssl.{SNIHostName, SNIServerName} +import cats.effect.{Concurrent, ContextShift, Resource, Sync} import fs2.Chunk.ByteVectorChunk import fs2.Stream._ import fs2.io.tcp.Socket +import fs2.io.tls.{TLSContext, TLSParameters} import fs2.{Stream, _} import scodec.bits.ByteVector - -import spinoco.fs2.crypto.io.tcp.TLSSocket import spinoco.protocol.http.{HostPort, HttpScheme, Scheme} import spinoco.protocol.http.header.{HttpHeader, `Transfer-Encoding`} -import scala.concurrent.ExecutionContext + import scala.concurrent.duration._ import scala.reflect.ClassTag @@ -46,13 +43,13 @@ package object internal { /** * From the stream of bytes this extracts Http Header and body part. */ - def httpHeaderAndBody[F[_]](maxHeaderSize: Int): Pipe[F, Byte, (ByteVector, Stream[F, Byte])] = { + def httpHeaderAndBody[F[_]: RaiseThrowable](maxHeaderSize: Int): Pipe[F, Byte, (ByteVector, Stream[F, Byte])] = { def go(buff: ByteVector, in: Stream[F, Byte]): Pull[F, (ByteVector, Stream[F, Byte]), Unit] = { - in.pull.unconsChunk flatMap { + in.pull.uncons flatMap { case None => Pull.raiseError(new Throwable(s"Incomplete Header received (sz = ${buff.size}): ${buff.decodeUtf8}")) case Some((chunk, tl)) => - val bv = spinoco.fs2.http.util.chunk2ByteVector(chunk) + val bv = chunk.toByteVector val all = buff ++ bv val idx = all.indexOfSlice(`\r\n\r\n`) if (idx < 0) { @@ -107,7 +104,7 @@ package object internal { eval(shallTimeout).flatMap { shallTimeout => if (!shallTimeout) socket.reads(chunkSize, None) else { - if (remains <= 0.millis) Stream.raiseError(new TimeoutException()) + if (remains <= 0.millis) Stream.raiseError[F](new TimeoutException()) else { eval(Sync[F].delay(System.currentTimeMillis())).flatMap { start => eval(socket.read(chunkSize, Some(remains))).flatMap { read => @@ -124,19 +121,13 @@ package object internal { } /** creates a function that lifts supplied socket to secure socket **/ - def clientLiftToSecure[F[_] : Concurrent : Timer](sslES: => ExecutionContext, sslContext: => SSLContext)(socket: Socket[F], server: HostPort): F[Socket[F]] = { - import collection.JavaConverters._ - Sync[F].delay { - val engine = sslContext.createSSLEngine(server.host, server.port.getOrElse(443)) - engine.setUseClientMode(true) - val sslParams = engine.getSSLParameters - sslParams.setServerNames(List[SNIServerName](new SNIHostName(server.host)).asJava) - engine.setSSLParameters(sslParams) - engine - } flatMap { - TLSSocket(socket, _, sslES) - .map(identity) //This is here just to make scala understand types properly - } + def clientLiftToSecure[F[_] : Concurrent : ContextShift](tlsContext: TLSContext)(socket: Socket[F], server: HostPort): Resource[F, Socket[F]] = { + + tlsContext.client( + socket + , TLSParameters(serverNames = Some(List[SNIServerName](new SNIHostName(server.host)))) + ) + } } diff --git a/src/main/scala/spinoco/fs2/http/routing/routing.scala b/src/main/scala/spinoco/fs2/http/routing/routing.scala index 7303717..1520806 100644 --- a/src/main/scala/spinoco/fs2/http/routing/routing.scala +++ b/src/main/scala/spinoco/fs2/http/routing/routing.scala @@ -12,7 +12,6 @@ import spinoco.fs2.http.routing.MatchResult._ import spinoco.fs2.http.routing.Matcher.{Eval, Match} import spinoco.protocol.http.header._ import spinoco.protocol.http.{HttpMethod, HttpRequestHeader, HttpStatusCode, Uri} -import spinoco.fs2.http.util.chunk2ByteVector import spinoco.fs2.http.websocket.{Frame, WebSocket} import scala.concurrent.duration._ @@ -188,7 +187,7 @@ package object routing { F.map(s.chunks.compile.toVector) { chunks => val bytes = if (chunks.isEmpty) ByteVector.empty - else chunks.map(chunk2ByteVector).reduce(_ ++ _) + else chunks.map(_.toByteVector).reduce(_ ++ _) D.decode(bytes, ct.value) } }}.flatMap { diff --git a/src/main/scala/spinoco/fs2/http/sse/SSEEncoding.scala b/src/main/scala/spinoco/fs2/http/sse/SSEEncoding.scala index 0e60567..d086c5c 100644 --- a/src/main/scala/spinoco/fs2/http/sse/SSEEncoding.scala +++ b/src/main/scala/spinoco/fs2/http/sse/SSEEncoding.scala @@ -5,7 +5,6 @@ import fs2._ import scodec.Attempt import scodec.bits.ByteVector -import spinoco.fs2.http.util.chunk2ByteVector import scala.util.Try import scala.concurrent.duration._ @@ -35,7 +34,7 @@ object SSEEncoding { } /** encodes stream of `A` as SSE Stream **/ - def encodeA[F[_], A](implicit E: SSEEncoder[A]): Pipe[F, A, Byte] = { + def encodeA[F[_]: RaiseThrowable, A](implicit E: SSEEncoder[A]): Pipe[F, A, Byte] = { _ flatMap { a => E.encode(a) match { case Attempt.Successful(msg) => Stream.emit(msg) case Attempt.Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode $a : $err")) @@ -47,14 +46,14 @@ object SSEEncoding { /** * Decodes stream of bytes to SSE Messages */ - def decode[F[_]]: Pipe[F, Byte, SSEMessage] = { + def decode[F[_]: RaiseThrowable]: Pipe[F, Byte, SSEMessage] = { // drops initial Byte Order Mark, if present def dropInitial(buff:ByteVector): Pipe[F, Byte, Byte] = { - _.pull.unconsChunk.flatMap { + _.pull.uncons.flatMap { case None => Pull.raiseError(new Throwable("SSE Socket did not contain any data")) case Some((chunk, next)) => - val all = buff ++ chunk2ByteVector(chunk) + val all = buff ++ chunk.toByteVector if (all.size < 2) (next through dropInitial(all)).pull.echo else { if (all.startsWith(StartBom)) Pull.output(ByteVectorChunk(all.drop(2))) >> next.pull.echo @@ -76,7 +75,7 @@ object SSEEncoding { // the last event is emitted only if it is terminated by empty line def mkEvents: Pipe[F, String, Seq[String]] = { def go(buff: Vector[String]): Stream[F, String] => Pull[F, Seq[String], Unit] = { - _.pull.unconsChunk flatMap { + _.pull.uncons flatMap { case None => Pull.done case Some((lines, tl)) => @@ -132,7 +131,7 @@ object SSEEncoding { } /** decodes stream of sse messages to `A`, given supplied decoder **/ - def decodeA[F[_], A](implicit D: SSEDecoder[A]): Pipe[F, Byte, A] = { + def decodeA[F[_]: RaiseThrowable, A](implicit D: SSEDecoder[A]): Pipe[F, Byte, A] = { _ through decode flatMap { msg => D.decode(msg) match { case Attempt.Successful(a) => Stream.emit(a) diff --git a/src/main/scala/spinoco/fs2/http/util/util.scala b/src/main/scala/spinoco/fs2/http/util/util.scala index 85ab2c7..e625ac9 100644 --- a/src/main/scala/spinoco/fs2/http/util/util.scala +++ b/src/main/scala/spinoco/fs2/http/util/util.scala @@ -4,11 +4,6 @@ import java.lang.Thread.UncaughtExceptionHandler import java.util.concurrent.{Executors, ThreadFactory} import java.util.concurrent.atomic.AtomicInteger -import fs2.Chunk.ByteVectorChunk -import fs2._ -import scodec.bits.{BitVector, ByteVector} -import scodec.bits.Bases.{Alphabets, Base64Alphabet} - import spinoco.protocol.mime.{ContentType, MIMECharset} import scala.concurrent.ExecutionContext import scala.util.control.NonFatal @@ -16,119 +11,104 @@ import scala.util.control.NonFatal package object util { - /** - * Encodes bytes to base64 encoded bytes [[http://tools.ietf.org/html/rfc4648#section-5 RF4648 section 5]] - * Encoding is done lazily to support very large Base64 bodies i.e. email, attachments..) - * @param alphabet Alphabet to use - * @return - */ - def encodeBase64Raw[F[_]](alphabet:Base64Alphabet): Pipe[F, Byte, Byte] = { - def go(rem:ByteVector): Stream[F,Byte] => Pull[F, Byte, Unit] = { - _.pull.unconsChunk flatMap { - case None => - if (rem.size == 0) Pull.done - else Pull.output(ByteVectorChunk(ByteVector.view(rem.toBase64(alphabet).getBytes))) - - case Some((chunk, tl)) => - val n = rem ++ chunk2ByteVector(chunk) - if (n.size/3 > 0) { - val pad = n.size % 3 - val enc = n.dropRight(pad) - val out = Array.ofDim[Byte]((enc.size.toInt / 3) * 4) - var pos = 0 - enc.toBitVector.grouped(6) foreach { group => - val idx = group.padTo(8).shiftRight(2, signExtension = false).toByteVector.head - out(pos) = alphabet.toChar(idx).toByte - pos = pos + 1 - } - Pull.output(ByteVectorChunk(ByteVector.view(out))) >> go(n.takeRight(pad))(tl) - } else { - go(n)(tl) - } - - } - - } - src => go(ByteVector.empty)(src).stream - } - - /** encodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-5 RF4648 section 5]]. Whitespaces are ignored **/ - def encodeBase64Url[F[_]]:Pipe[F, Byte, Byte] = - encodeBase64Raw(Alphabets.Base64Url) - - /** encodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-4 RF4648 section 4]] **/ - def encodeBase64[F[_]]:Pipe[F, Byte, Byte] = - encodeBase64Raw[F](Alphabets.Base64) - - - /** - * Decodes base64 encoded stream with supplied alphabet. Whitespaces are ignored. - * Decoding is lazy to support very large Base64 bodies (i.e. email) - */ - def decodeBase64Raw[F[_]](alphabet:Base64Alphabet):Pipe[F, Byte, Byte] = { - val Pad = alphabet.pad - def go(remAcc:BitVector): Stream[F, Byte] => Pull[F, Byte, Unit] = { - _.pull.unconsChunk flatMap { - case None => Pull.done - - case Some((chunk,tl)) => - val bv = chunk2ByteVector(chunk) - var acc = remAcc - var idx = 0 - var term = false - try { - bv.foreach { b => - b.toChar match { - case c if alphabet.ignore(c) => // ignore no-op - case Pad => term = true - case c => - if (!term) acc = acc ++ BitVector(alphabet.toIndex(c)).drop(2) - else { - throw new IllegalArgumentException(s"Unexpected character '$c' at index $idx after padding character; only '=' and whitespace characters allowed after first padding character") - } - } - idx = idx + 1 - } - val aligned = (acc.size / 8) * 8 - if (aligned <= 0 && !term) go(acc)(tl) - else { - val (out, rem) = acc.splitAt(aligned) - if (term) Pull.output(ByteVectorChunk(out.toByteVector)) - else Pull.output(ByteVectorChunk(out.toByteVector)) >> go(rem)(tl) - } - - } catch { - case e: IllegalArgumentException => - Pull.raiseError(new Throwable(s"Invalid base 64 encoding at index $idx", e)) - } - } - } - src => go(BitVector.empty)(src).stream - - } - - /** decodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-5 RF4648 section 5]]. Whitespaces are ignored **/ - def decodeBase64Url[F[_]]:Pipe[F, Byte, Byte] = - decodeBase64Raw(Alphabets.Base64Url) - - /** decodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-4 RF4648 section 4]] **/ - def decodeBase64[F[_]]:Pipe[F, Byte, Byte] = - decodeBase64Raw(Alphabets.Base64) - - /** converts chunk of bytes to ByteVector **/ - def chunk2ByteVector(chunk: Chunk[Byte]):ByteVector = { - chunk match { - case bv: ByteVectorChunk => bv.toByteVector - case other => - val bs = other.toBytes - ByteVector(bs.values, bs.offset, bs.size) - } - } - - /** converts ByteVector to chunk **/ - def byteVector2Chunk(bv: ByteVector): Chunk[Byte] = { - ByteVectorChunk(bv) - } +// /** +// * Encodes bytes to base64 encoded bytes [[http://tools.ietf.org/html/rfc4648#section-5 RF4648 section 5]] +// * Encoding is done lazily to support very large Base64 bodies i.e. email, attachments..) +// * @param alphabet Alphabet to use +// * @return +// */ +// def encodeBase64Raw[F[_]](alphabet:Base64Alphabet): Pipe[F, Byte, Byte] = { +// def go(rem:ByteVector): Stream[F,Byte] => Pull[F, Byte, Unit] = { +// _.pull.uncons flatMap { +// case None => +// if (rem.size == 0) Pull.done +// else Pull.output(ByteVectorChunk(ByteVector.view(rem.toBase64(alphabet).getBytes))) +// +// case Some((chunk, tl)) => +// val n = rem ++ chunk2ByteVector(chunk) +// if (n.size/3 > 0) { +// val pad = n.size % 3 +// val enc = n.dropRight(pad) +// val out = Array.ofDim[Byte]((enc.size.toInt / 3) * 4) +// var pos = 0 +// enc.toBitVector.grouped(6) foreach { group => +// val idx = group.padTo(8).shiftRight(2, signExtension = false).toByteVector.head +// out(pos) = alphabet.toChar(idx).toByte +// pos = pos + 1 +// } +// Pull.output(ByteVectorChunk(ByteVector.view(out))) >> go(n.takeRight(pad))(tl) +// } else { +// go(n)(tl) +// } +// +// } +// +// } +// src => go(ByteVector.empty)(src).stream +// } +// +// /** encodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-5 RF4648 section 5]]. Whitespaces are ignored **/ +// def encodeBase64Url[F[_]]:Pipe[F, Byte, Byte] = +// encodeBase64Raw(Alphabets.Base64Url) +// +// /** encodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-4 RF4648 section 4]] **/ +// def encodeBase64[F[_]]:Pipe[F, Byte, Byte] = +// encodeBase64Raw[F](Alphabets.Base64) +// +// +// /** +// * Decodes base64 encoded stream with supplied alphabet. Whitespaces are ignored. +// * Decoding is lazy to support very large Base64 bodies (i.e. email) +// */ +// def decodeBase64Raw[F[_]](alphabet:Base64Alphabet):Pipe[F, Byte, Byte] = { +// val Pad = alphabet.pad +// def go(remAcc:BitVector): Stream[F, Byte] => Pull[F, Byte, Unit] = { +// _.pull.unconsChunk flatMap { +// case None => Pull.done +// +// case Some((chunk,tl)) => +// val bv = chunk2ByteVector(chunk) +// var acc = remAcc +// var idx = 0 +// var term = false +// try { +// bv.foreach { b => +// b.toChar match { +// case c if alphabet.ignore(c) => // ignore no-op +// case Pad => term = true +// case c => +// if (!term) acc = acc ++ BitVector(alphabet.toIndex(c)).drop(2) +// else { +// throw new IllegalArgumentException(s"Unexpected character '$c' at index $idx after padding character; only '=' and whitespace characters allowed after first padding character") +// } +// } +// idx = idx + 1 +// } +// val aligned = (acc.size / 8) * 8 +// if (aligned <= 0 && !term) go(acc)(tl) +// else { +// val (out, rem) = acc.splitAt(aligned) +// if (term) Pull.output(ByteVectorChunk(out.toByteVector)) +// else Pull.output(ByteVectorChunk(out.toByteVector)) >> go(rem)(tl) +// } +// +// } catch { +// case e: IllegalArgumentException => +// Pull.raiseError(new Throwable(s"Invalid base 64 encoding at index $idx", e)) +// } +// } +// } +// src => go(BitVector.empty)(src).stream +// +// } +// +// /** decodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-5 RF4648 section 5]]. Whitespaces are ignored **/ +// def decodeBase64Url[F[_]]:Pipe[F, Byte, Byte] = +// decodeBase64Raw(Alphabets.Base64Url) +// +// /** decodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-4 RF4648 section 4]] **/ +// def decodeBase64[F[_]]:Pipe[F, Byte, Byte] = +// decodeBase64Raw(Alphabets.Base64) /** helper to create named daemon thread factories **/ def mkThreadFactory(name: String, daemon: Boolean, exitJvmOnFatalError: Boolean = true): ThreadFactory = { diff --git a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala index f60f548..20ce5fe 100644 --- a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala +++ b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala @@ -1,19 +1,15 @@ package spinoco.fs2.http.websocket -import java.nio.channels.AsynchronousChannelGroup -import java.util.concurrent.Executors - -import cats.Applicative -import javax.net.ssl.SSLContext -import cats.effect.{Concurrent, ConcurrentEffect, Timer} +import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Timer} import fs2.Chunk.ByteVectorChunk import fs2._ -import fs2.async.mutable.Queue +import fs2.concurrent.Queue +import fs2.io.tcp.SocketGroup +import fs2.io.tls.TLSContext import scodec.Attempt.{Failure, Successful} import scodec.bits.ByteVector import scodec.{Codec, Decoder, Encoder} - import spinoco.fs2.http.HttpResponse import spinoco.protocol.http.codec.{HttpRequestHeaderCodec, HttpResponseHeaderCodec} import spinoco.protocol.http.header._ @@ -22,8 +18,7 @@ import spinoco.protocol.http.header.value.ProductDescription import spinoco.protocol.mime.{ContentType, MIMECharset, MediaType} import spinoco.protocol.websocket.{OpCode, WebSocketFrame} import spinoco.protocol.websocket.codec.WebSocketFrameCodec -import spinoco.fs2.http.util.chunk2ByteVector -import scala.concurrent.ExecutionContext + import scala.concurrent.duration._ import scala.util.Random @@ -52,7 +47,7 @@ object WebSocket { , maxFrameSize: Int = 1024*1024 )(header: HttpRequestHeader, input:Stream[F,Byte]): Stream[F,HttpResponse[F]] = { Stream.emit( - impl.verifyHeaderRequest[F](header).right.map { key => + impl.verifyHeaderRequest[F](header).map { key => val respHeader = impl.computeHandshakeResponse(header, key) HttpResponse(respHeader, input through impl.webSocketOf(pipe, pingInterval, maxFrameSize, client2Server = false)) }.merge @@ -81,7 +76,7 @@ object WebSocket { * @param responseCodec Codec to decode HttpResponse Header * */ - def client[F[_] : ConcurrentEffect : Timer, I : Decoder, O : Encoder]( + def client[F[_] : ConcurrentEffect: ContextShift : Timer, I : Decoder, O : Encoder]( request: WebSocketRequest , pipe: Pipe[F, Frame[I], Frame[O]] , maxHeaderSize: Int = 4096 @@ -89,23 +84,24 @@ object WebSocket { , maxFrameSize: Int = 1024*1024 , requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec - , sslES: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(spinoco.fs2.http.util.mkThreadFactory("fs2-http-ssl", daemon = true))) - , sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx } - )(implicit AG: AsynchronousChannelGroup): Stream[F, Option[HttpResponseHeader]] = { + )( + socketGroup: SocketGroup + , tlsContext: TLSContext + ): Stream[F, Option[HttpResponseHeader]] = { import spinoco.fs2.http.internal._ import Stream._ eval(addressForRequest[F](if (request.secure) HttpScheme.WSS else HttpScheme.WS, request.hostPort)).flatMap { address => - Stream.resource(io.tcp.client[F](address, receiveBufferSize = receiveBufferSize)) - .evalMap { socket => if (request.secure) clientLiftToSecure(sslES, sslContext)(socket, request.hostPort) else Applicative[F].pure(socket) } + Stream.resource(socketGroup.client[F](address, receiveBufferSize = receiveBufferSize)) + .flatMap { socket => if (request.secure) Stream.resource(clientLiftToSecure(tlsContext)(socket, request.hostPort)) else Stream.emit(socket) } .flatMap { socket => val (header, fingerprint) = impl.createRequestHeaders(request.header) requestCodec.encode(header) match { - case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode websocket request: $err")) + case Failure(err) => Stream.raiseError[F](new Throwable(s"Failed to encode websocket request: $err")) case Successful(headerBits) => eval(socket.write(ByteVectorChunk(headerBits.bytes ++ `\r\n\r\n`))).flatMap { _ => socket.reads(receiveBufferSize) through httpHeaderAndBody(maxHeaderSize) flatMap { case (respHeaderBytes, body) => responseCodec.decodeValue(respHeaderBytes.bits) match { - case Failure(err) => raiseError(new Throwable(s"Failed to decode websocket response: $err")) + case Failure(err) => Stream.raiseError[F](new Throwable(s"Failed to decode websocket response: $err")) case Successful(responseHeader) => impl.validateResponse[F](header, responseHeader, fingerprint).flatMap { case Some(resp) => emit(Some(resp)) @@ -169,11 +165,11 @@ object WebSocket { }.getOrElse(Left(badRequest("Missing Sec-WebSocket-Key header"))) for { - _ <- version.right - _ <- host.right - _ <- upgrade.right - _ <- connection.right - key <- webSocketKey.right + _ <- version + _ <- host + _ <- upgrade + _ <- connection + key <- webSocketKey } yield key } @@ -209,7 +205,7 @@ object WebSocket { , maxFrameSize: Int , client2Server: Boolean ):Pipe[F, Byte, Byte] = { source: Stream[F, Byte] => Stream.suspend { - Stream.eval(async.unboundedQueue[F, PingPong]).flatMap { pingPongQ => + Stream.eval(Queue.unbounded[F, PingPong]).flatMap { pingPongQ => val metronome: Stream[F, Unit] = pingInterval match { case fin: FiniteDuration => Stream.awakeEvery[F](fin).map { _ => () } case inf => Stream.empty @@ -269,7 +265,7 @@ object WebSocket { * * @param maxFrameSize Maximum size of the frame, including its header. */ - def decodeWebSocketFrame[F[_]](maxFrameSize: Int , flag: Boolean): Pipe[F, Byte, WebSocketFrame] = { + def decodeWebSocketFrame[F[_]: RaiseThrowable](maxFrameSize: Int , flag: Boolean): Pipe[F, Byte, WebSocketFrame] = { // Returns list of raw frames and tail of // the buffer. Tail of the buffer cant be empty // (or non-empty if last one frame isn't finalized). @@ -282,11 +278,11 @@ object WebSocket { def go(buff: ByteVector): Stream[F, Byte] => Pull[F, WebSocketFrame, Unit] = { h0 => if (buff.size > maxFrameSize) Pull.raiseError(new Throwable(s"Size of websocket frame exceeded max size: $maxFrameSize, current: ${buff.size}, $buff")) else { - h0.pull.unconsChunk flatMap { + h0.pull.uncons flatMap { case None => Pull.done // todo: is ok to silently ignore buffer remainder ? case Some((chunk, tl)) => - val data = buff ++ chunk2ByteVector(chunk) + val data = buff ++ chunk.toByteVector cutFrames(data) match { case (rawFrames, _) if rawFrames.isEmpty => go(data)(tl) case (rawFrames, dataTail) => @@ -316,7 +312,7 @@ object WebSocket { * * @param pongQ Queue to notify about ping/pong frames. */ - def webSocketFrame2Frame[F[_], A](pongQ: Queue[F, PingPong])(implicit R: Decoder[A]): Pipe[F, WebSocketFrame, Frame[A]] = { + def webSocketFrame2Frame[F[_]: RaiseThrowable, A](pongQ: Queue[F, PingPong])(implicit R: Decoder[A]): Pipe[F, WebSocketFrame, Frame[A]] = { def decode(from: Vector[WebSocketFrame]):Pull[F, Frame[A], A] = { val bs = from.map(_.payload).reduce(_ ++ _) R.decodeValue(bs.bits) match { @@ -347,7 +343,7 @@ object WebSocket { * Encodes received frome to WebSocketFrame. * @param maskKey A funtion that allows to generate random masking key. Masking is applied at client -> server direction only. */ - def frame2WebSocketFrame[F[_], A](maskKey: => Option[Int])(implicit W: Encoder[A]): Pipe[F, Frame[A], WebSocketFrame] = { + def frame2WebSocketFrame[F[_]: RaiseThrowable, A](maskKey: => Option[Int])(implicit W: Encoder[A]): Pipe[F, Frame[A], WebSocketFrame] = { _.flatMap { frame => W.encode(frame.a) match { case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode frame: $err (frame: $frame)")) @@ -368,7 +364,7 @@ object WebSocket { * @tparam F * @return */ - def encodeWebSocketFrame[F[_]](flag: Boolean): Pipe[F, WebSocketFrame, Byte] = { + def encodeWebSocketFrame[F[_]: RaiseThrowable](flag: Boolean): Pipe[F, WebSocketFrame, Byte] = { _.append(Stream.emit(closeFrame)).flatMap { wsf => WebSocketFrameCodec.codec.encode(wsf) match { case Failure(err) => Stream.raiseError(new Throwable(s"Failed to encode websocket frame: $err (frame: $wsf)")) @@ -454,7 +450,7 @@ object WebSocket { * @param expectFingerPrint expected fingerprint in header * @return */ - def validateResponse[F[_]]( + def validateResponse[F[_]: RaiseThrowable]( request: HttpRequestHeader , response: HttpResponseHeader , expectFingerPrint: ByteVector diff --git a/src/test/scala/spinoco/fs2/http/HttpRequestSpec.scala b/src/test/scala/spinoco/fs2/http/HttpRequestSpec.scala index 0732d92..c428d1b 100644 --- a/src/test/scala/spinoco/fs2/http/HttpRequestSpec.scala +++ b/src/test/scala/spinoco/fs2/http/HttpRequestSpec.scala @@ -11,7 +11,6 @@ import spinoco.protocol.mime.{ContentType, MIMECharset, MediaType} object HttpRequestSpec extends Properties("HttpRequest") { - import spinoco.fs2.http.util.chunk2ByteVector property("encode") = secure { @@ -22,7 +21,7 @@ object HttpRequestSpec extends Properties("HttpRequest") { HttpRequest.toStream(request, HttpRequestHeaderCodec.defaultCodec) - .chunks.compile.toVector.map { _.map(chunk2ByteVector).reduce { _ ++ _ }.decodeUtf8 } + .chunks.compile.toVector.map { _.map(_.toByteVector).reduce { _ ++ _ }.decodeUtf8 } .unsafeRunSync() ?= Right(Seq( "GET /hello-world.html HTTP/1.1" @@ -49,7 +48,7 @@ object HttpRequestSpec extends Properties("HttpRequest") { .covary[IO] .through(HttpRequest.fromStream[IO](4096,HttpRequestHeaderCodec.defaultCodec)) .flatMap { case (header, body) => - Stream.eval(body.chunks.compile.toVector.map(_.map(chunk2ByteVector).reduce(_ ++ _).decodeUtf8)).map { bodyString => + Stream.eval(body.chunks.compile.toVector.map(_.map(_.toByteVector).reduce(_ ++ _).decodeUtf8)).map { bodyString => header -> bodyString } }.compile.toVector.unsafeRunSync() ?= Vector( diff --git a/src/test/scala/spinoco/fs2/http/HttpResponseSpec.scala b/src/test/scala/spinoco/fs2/http/HttpResponseSpec.scala index 6a278b1..2bdbfc2 100644 --- a/src/test/scala/spinoco/fs2/http/HttpResponseSpec.scala +++ b/src/test/scala/spinoco/fs2/http/HttpResponseSpec.scala @@ -8,7 +8,6 @@ import scodec.Attempt import spinoco.protocol.http.header._ import spinoco.protocol.http.codec.HttpResponseHeaderCodec import spinoco.protocol.http.{HttpResponseHeader, HttpStatusCode} -import spinoco.fs2.http.util.chunk2ByteVector import spinoco.protocol.mime.{ContentType, MIMECharset, MediaType} @@ -21,7 +20,7 @@ object HttpResponseSpec extends Properties("HttpResponse") { .withUtf8Body("Hello World") HttpResponse.toStream(response, HttpResponseHeaderCodec.defaultCodec) - .chunks.compile.toVector.map { _.map(chunk2ByteVector).reduce { _ ++ _ }.decodeUtf8 } + .chunks.compile.toVector.map { _.map(_.toByteVector).reduce { _ ++ _ }.decodeUtf8 } .unsafeRunSync() ?= Right(Seq( "HTTP/1.1 200 OK" diff --git a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala index 73689b1..0fa9c1b 100644 --- a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala +++ b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala @@ -4,6 +4,8 @@ import java.net.InetSocketAddress import cats.effect.IO import fs2._ +import fs2.io.tcp.SocketGroup +import fs2.io.tls.TLSContext import org.scalacheck.Properties import org.scalacheck.Prop._ import spinoco.fs2.http @@ -23,7 +25,7 @@ object HttpServerSpec extends Properties("HttpServer"){ if (request.path != Uri.Path / "echo") Stream.emit(HttpResponse[IO](HttpStatusCode.Ok).withUtf8Body("Hello World")).covary[IO] else { val ct = request.headers.collectFirst { case `Content-Type`(ct0) => ct0 }.getOrElse(ContentType.BinaryContent(MediaType.`application/octet-stream`, None)) - val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0l) + val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0L) val ok = HttpResponse(HttpStatusCode.Ok).chunkedEncoding.withContentType(ct).withBodySize(size) Stream.emit(ok.copy(body = body.take(size))) @@ -43,23 +45,23 @@ object HttpServerSpec extends Properties("HttpServer"){ // run up to count parallel requests and then make sure all of them pass within timeout val count = 100 - def clients : Stream[IO, Stream[IO, (Int, Boolean)]] = { + def clients(socketGroup: SocketGroup, tls: TLSContext) : Stream[IO, Stream[IO, (Int, Boolean)]] = { val request = HttpRequest.get[IO](Uri.parse("http://127.0.0.1:9090/echo").getOrElse(throw new Throwable("Invalid uri"))) - Stream.eval(client[IO]()).flatMap { httpClient => + Stream.eval(client[IO]()(socketGroup, tls)).flatMap { httpClient => Stream.range(0,count).unchunk.map { idx => httpClient.request(request).map(resp => idx -> (resp.header.status == HttpStatusCode.Ok)) }} } - (Stream( - http.server[IO](new InetSocketAddress("127.0.0.1", 9090))(echoService).drain - ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients) - .parJoin(MaxConcurrency) - .take(count) - .filter { case (idx, success) => success } - .compile.toVector.unsafeRunTimed(30.seconds).map { _.size } ?= Some(count) - + Stream.resource(httpResources).flatMap { case (group, tls) => + (Stream( + http.server[IO](new InetSocketAddress("127.0.0.1", 9090))(echoService)(group).drain + ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients(group, tls)) + .parJoin[IO, (Int, Boolean)](MaxConcurrency) + .take(count) + .filter { case (_, success) => success } + }.compile.toVector.unsafeRunTimed(30.seconds).map { _.size } ?= Some(count) } @@ -68,12 +70,12 @@ object HttpServerSpec extends Properties("HttpServer"){ // run up to count parallel requests with body, and then make sure all of them pass within timeout with body echoed back val count = 100 - def clients : Stream[IO, Stream[IO, (Int, Boolean)]] = { + def clients(socketGroup: SocketGroup, tls: TLSContext): Stream[IO, Stream[IO, (Int, Boolean)]] = { val request = HttpRequest.get[IO](Uri.parse("http://127.0.0.1:9090/echo").getOrElse(throw new Throwable("Invalid uri"))) - .withBody("Hello")(BodyEncoder.utf8String) + .withBody("Hello")(BodyEncoder.utf8String, implicitly[RaiseThrowable[IO]]) - Stream.eval(client[IO]()).flatMap { httpClient => + Stream.eval(client[IO]()(socketGroup, tls)).flatMap { httpClient => Stream.range(0,count).unchunk.map { idx => httpClient.request(request).flatMap { resp => Stream.eval(resp.bodyAsString).map { attempt => @@ -84,13 +86,14 @@ object HttpServerSpec extends Properties("HttpServer"){ }} } - ( Stream.sleep_[IO](3.second) ++ - (Stream( - http.server[IO](new InetSocketAddress("127.0.0.1", 9090))(echoService).drain - ).covary[IO] ++ Stream.sleep_[IO](3.second) ++ clients).parJoin(MaxConcurrency)) - .take(count) - .filter { case (idx, success) => success } - .compile.toVector.unsafeRunTimed(60.seconds).map { _.size } ?= Some(count) + Stream.resource(httpResources).flatMap { case (group, tls) => + ( Stream.sleep_[IO](3.second) ++ + (Stream( + http.server[IO](new InetSocketAddress("127.0.0.1", 9090))(echoService)(group).drain + ).covary[IO] ++ Stream.sleep_[IO](3.second) ++ clients(group, tls)).parJoin[IO, (Int, Boolean)](MaxConcurrency)) + .take(count) + .filter { case (_, success) => success } + }.compile.toVector.unsafeRunTimed(60.seconds).map { _.size } ?= Some(count) } @@ -99,27 +102,30 @@ object HttpServerSpec extends Properties("HttpServer"){ // run up to count parallel requests with body, server shall fail each, nevertheless response shall be delivered. val count = 1 - def clients : Stream[IO, Stream[IO, (Int, Boolean)]] = { + def clients(socketGroup: SocketGroup, tls: TLSContext): Stream[IO, Stream[IO, (Int, Boolean)]] = { val request = HttpRequest.get[IO](Uri.parse("http://127.0.0.1:9090/echo").getOrElse(throw new Throwable("Invalid uri"))) - Stream.eval(client[IO]()).flatMap { httpClient => + Stream.eval(client[IO]()(socketGroup, tls)).flatMap { httpClient => Stream.range(0,count).unchunk.map { idx => httpClient.request(request).map { resp => idx -> (resp.header.status == HttpStatusCode.BadRequest) }}} } - (Stream.sleep_[IO](3.second) ++ - (Stream( - http.server[IO]( - new InetSocketAddress("127.0.0.1", 9090) - , requestFailure = _ => { Stream(HttpResponse[IO](HttpStatusCode.BadRequest)).covary[IO] } - )(failRouteService).drain - ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients).parJoin(MaxConcurrency)) - .take(count) - .filter { case (idx, success) => success } - .compile.toVector.unsafeRunTimed(30.seconds).map { _.size } ?= Some(count) + Stream.resource(httpResources).flatMap { case (group, tls) => + (Stream.sleep_[IO](3.second) ++ + (Stream( + http.server[IO]( + new InetSocketAddress("127.0.0.1", 9090) + , requestFailure = _ => { + Stream(HttpResponse[IO](HttpStatusCode.BadRequest)).covary[IO] + } + )(failRouteService)(group).drain + ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients(group, tls)).parJoin[IO, (Int, Boolean)](MaxConcurrency)) + .take(count) + .filter { case (_, success) => success } + }.compile.toVector.unsafeRunTimed(30.seconds).map { _.size } ?= Some(count) } @@ -128,11 +134,11 @@ object HttpServerSpec extends Properties("HttpServer"){ // run up to count parallel requests with body, server shall fail each (when sending body), nevertheless response shall be delivered. val count = 100 - def clients : Stream[IO, Stream[IO, (Int, Boolean)]] = { + def clients(socketGroup: SocketGroup, tls: TLSContext): Stream[IO, Stream[IO, (Int, Boolean)]] = { val request = HttpRequest.get[IO](Uri.parse("http://127.0.0.1:9090/echo").getOrElse(throw new Throwable("Invalid uri"))) - Stream.eval(client[IO]()).flatMap { httpClient => + Stream.eval(client[IO]()(socketGroup, tls)).flatMap { httpClient => Stream.range(0,count).unchunk.map { idx => httpClient.request(request).map { resp => idx -> (resp.header.status == HttpStatusCode.Ok) // body won't be consumed, and request was succesfully sent @@ -141,16 +147,17 @@ object HttpServerSpec extends Properties("HttpServer"){ } } - (Stream.sleep_[IO](3.second) ++ - (Stream( - http.server[IO]( - new InetSocketAddress("127.0.0.1", 9090) - , sendFailure = (_, _, _) => Stream.empty - )(failingResponse).drain - ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients).parJoin(MaxConcurrency)) - .take(count) - .filter { case (idx, success) => success } - .compile.toVector.unsafeRunTimed(30.seconds).map { _.size } ?= Some(count) + Stream.resource(httpResources).flatMap { case (group, tls) => + (Stream.sleep_[IO](3.second) ++ + (Stream( + http.server[IO]( + new InetSocketAddress("127.0.0.1", 9090) + , sendFailure = (_, _, _) => Stream.empty + )(failingResponse)(group).drain + ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients(group, tls)).parJoin[IO, (Int, Boolean)](MaxConcurrency)) + .take(count) + .filter { case (_, success) => success } + }.compile.toVector.unsafeRunTimed(30.seconds).map { _.size } ?= Some(count) } diff --git a/src/test/scala/spinoco/fs2/http/Resources.scala b/src/test/scala/spinoco/fs2/http/Resources.scala index fc6cb01..83b6e04 100644 --- a/src/test/scala/spinoco/fs2/http/Resources.scala +++ b/src/test/scala/spinoco/fs2/http/Resources.scala @@ -3,7 +3,9 @@ package spinoco.fs2.http import java.nio.channels.AsynchronousChannelGroup import java.util.concurrent.Executors -import cats.effect.{Concurrent, IO, Timer} +import cats.effect.{Blocker, Concurrent, ContextShift, IO, Resource, Timer} +import fs2.io.tcp.SocketGroup +import fs2.io.tls.TLSContext import scala.concurrent.ExecutionContext @@ -11,8 +13,16 @@ import scala.concurrent.ExecutionContext object Resources { implicit val _timer: Timer[IO] =IO.timer(ExecutionContext.Implicits.global) - implicit val _concurrent: Concurrent[IO] = IO.ioConcurrentEffect(_timer) + implicit val _contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) + implicit val _concurrent: Concurrent[IO] = IO.ioConcurrentEffect(_contextShift) implicit val AG: AsynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-spec-AG", daemon = true))) + val httpResources: Resource[IO, (SocketGroup, TLSContext)] = { + Blocker[IO].flatMap { blocker => + SocketGroup(blocker).evalMap { group => + TLSContext.system(blocker).map(group -> _) + } + } + } } diff --git a/src/test/scala/spinoco/fs2/http/internal/ChunkedEncodingSpec.scala b/src/test/scala/spinoco/fs2/http/internal/ChunkedEncodingSpec.scala index 3ca1457..8ae2b03 100644 --- a/src/test/scala/spinoco/fs2/http/internal/ChunkedEncodingSpec.scala +++ b/src/test/scala/spinoco/fs2/http/internal/ChunkedEncodingSpec.scala @@ -5,7 +5,6 @@ import fs2._ import org.scalacheck.Properties import org.scalacheck.Prop._ import scodec.bits.ByteVector -import spinoco.fs2.http.util.chunk2ByteVector object ChunkedEncodingSpec extends Properties("ChunkedEncoding") { @@ -16,7 +15,7 @@ object ChunkedEncodingSpec extends Properties("ChunkedEncoding") { (in through ChunkedEncoding.encode through ChunkedEncoding.decode(1024)) .chunks .compile.toVector - .map(_.foldLeft(ByteVector.empty){ case (bv, n) => bv ++ chunk2ByteVector(n) }) + .map(_.foldLeft(ByteVector.empty){ case (bv, n) => bv ++ n.toByteVector }) .map(_.decodeUtf8) .unsafeRunSync() ?= Right( strings.mkString @@ -42,7 +41,7 @@ object ChunkedEncodingSpec extends Properties("ChunkedEncoding") { .covary[IO] .chunks .compile.toVector - .map(_.foldLeft(ByteVector.empty){ case (bv, n) => bv ++ chunk2ByteVector(n) }) + .map(_.foldLeft(ByteVector.empty){ case (bv, n) => bv ++ n.toByteVector }) .map(_.decodeUtf8) .unsafeRunSync() ?= Right( "Wikipedia in\r\n\r\nchunks." @@ -62,7 +61,7 @@ object ChunkedEncodingSpec extends Properties("ChunkedEncoding") { (chunks through ChunkedEncoding.encode) .chunks .compile.toVector - .map(_.foldLeft(ByteVector.empty){ case (bv, n) => bv ++ chunk2ByteVector(n) }) + .map(_.foldLeft(ByteVector.empty){ case (bv, n) => bv ++ n.toByteVector }) .map(_.decodeUtf8) .unsafeRunSync() ?= Right(wikiExample) } diff --git a/src/test/scala/spinoco/fs2/http/internal/HttpClientApp.scala b/src/test/scala/spinoco/fs2/http/internal/HttpClientApp.scala index 5170f75..dcbdd19 100644 --- a/src/test/scala/spinoco/fs2/http/internal/HttpClientApp.scala +++ b/src/test/scala/spinoco/fs2/http/internal/HttpClientApp.scala @@ -13,7 +13,8 @@ object HttpClientApp extends App { - http.client[IO]().flatMap { httpClient => + httpResources.use { case (group, tls) => + http.client[IO]()(group, tls).flatMap { httpClient => httpClient.request(HttpRequest.get(Uri.https("www.google.cz", "/"))).flatMap { resp => Stream.eval(resp.bodyAsString) @@ -21,5 +22,5 @@ object HttpClientApp extends App { println } - }.unsafeRunSync() + }}.unsafeRunSync() } diff --git a/src/test/scala/spinoco/fs2/http/internal/HttpServerApp.scala b/src/test/scala/spinoco/fs2/http/internal/HttpServerApp.scala index 1c9399a..566a38f 100644 --- a/src/test/scala/spinoco/fs2/http/internal/HttpServerApp.scala +++ b/src/test/scala/spinoco/fs2/http/internal/HttpServerApp.scala @@ -19,13 +19,15 @@ object HttpServerApp extends App { if (request.path != Uri.Path / "echo") Stream.emit(HttpResponse[IO](HttpStatusCode.Ok).withUtf8Body("Hello World")).covary[IO] else { val ct = request.headers.collectFirst { case `Content-Type`(ct) => ct }.getOrElse(ContentType.BinaryContent(MediaType.`application/octet-stream`, None)) - val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0l) + val size = request.headers.collectFirst { case `Content-Length`(sz) => sz }.getOrElse(0L) val ok = HttpResponse(HttpStatusCode.Ok).chunkedEncoding.withContentType(ct).withBodySize(size) Stream.emit(ok.copy(body = body.take(size))) } } - http.server(new InetSocketAddress("127.0.0.1", 9090))(service).compile.drain.unsafeRunSync() + Stream.resource(httpResources).flatMap { case (group, _) => + http.server(new InetSocketAddress("127.0.0.1", 9090))(service)(group) + }.compile.drain.unsafeRunSync() } diff --git a/src/test/scala/spinoco/fs2/http/sse/SSEEncodingSpec.scala b/src/test/scala/spinoco/fs2/http/sse/SSEEncodingSpec.scala index 2a42c6a..fe0de28 100644 --- a/src/test/scala/spinoco/fs2/http/sse/SSEEncodingSpec.scala +++ b/src/test/scala/spinoco/fs2/http/sse/SSEEncodingSpec.scala @@ -8,7 +8,6 @@ import org.scalacheck.Prop._ import scodec.bits.ByteVector import spinoco.fs2.http.sse.SSEMessage.SSEData -import spinoco.fs2.http.util.chunk2ByteVector object SSEEncodingSpec extends Properties("SSEEncoding") { @@ -20,7 +19,7 @@ object SSEEncodingSpec extends Properties("SSEEncoding") { , SSEMessage.SSEData(Seq("data4"), Some("event1"), None) , SSEMessage.SSEData(Seq("data5"), None, Some("id1")) , SSEMessage.SSEData(Seq("data6"), Some("event2"), Some("id2")) - ).covary[IO].through(SSEEncoding.encode[IO]).chunks.compile.toVector.map { _ map chunk2ByteVector reduce (_ ++ _) decodeUtf8 }.unsafeRunSync() ?= + ).covary[IO].through(SSEEncoding.encode[IO]).chunks.compile.toVector.map { _ map(_.toByteVector) reduce (_ ++ _) decodeUtf8 }.unsafeRunSync() ?= Right( "data: data1\n\ndata: data2\ndata: data3\n\nevent: event1\ndata: data4\n\ndata: data5\nid: id1\n\nevent: event2\ndata: data6\nid: id2\n\n" ) diff --git a/src/test/scala/spinoco/fs2/http/util/UtilSpec.scala b/src/test/scala/spinoco/fs2/http/util/UtilSpec.scala deleted file mode 100644 index c4cdb00..0000000 --- a/src/test/scala/spinoco/fs2/http/util/UtilSpec.scala +++ /dev/null @@ -1,68 +0,0 @@ -package spinoco.fs2.http.util - -import cats.effect.IO -import fs2._ -import org.scalacheck.Prop._ -import org.scalacheck.{Arbitrary, Gen, Properties} -import scodec.bits.Bases.{Alphabets, Base64Alphabet} -import scodec.bits.ByteVector -import shapeless.the -import spinoco.fs2.http.util - - - -object UtilSpec extends Properties("util"){ - - case class EncodingSample(chunkSize:Int, text:String, alphabet: Base64Alphabet) - - implicit val encodingTestInstance : Arbitrary[EncodingSample] = Arbitrary { - for { - s <- the[Arbitrary[String]].arbitrary - chunkSize <- Gen.choose(1,s.length max 1) - alphabet <- Gen.oneOf(Seq(Alphabets.Base64Url, Alphabets.Base64)) - } yield EncodingSample(chunkSize, s, alphabet) - } - - property("encodes.base64") = forAll { sample: EncodingSample => - Stream.chunk[IO, Byte](Chunk.bytes(sample.text.getBytes)).chunkLimit(sample.chunkSize).flatMap(Stream.chunk[IO, Byte]) - .through(util.encodeBase64Raw(sample.alphabet)) - .chunks - .fold(ByteVector.empty){ case (acc, n) => acc ++ chunk2ByteVector(n)} - .map(_.decodeUtf8) - .compile.toVector.unsafeRunSync() ?= Vector( - Right(ByteVector.view(sample.text.getBytes).toBase64(sample.alphabet)) - ) - } - - - property("decodes.base64") = forAll { sample: EncodingSample => - val encoded = ByteVector.view(sample.text.getBytes).toBase64(sample.alphabet) - Stream.chunk[IO, Byte](Chunk.bytes(encoded.getBytes)) - .chunkLimit(sample.chunkSize).flatMap(Stream.chunk[IO, Byte]) - .through(util.decodeBase64Raw(sample.alphabet)) - .chunks - .fold(ByteVector.empty){ case (acc, n) => acc ++ chunk2ByteVector(n)} - .map(_.decodeUtf8) - .compile.toVector.unsafeRunSync() ?= Vector( - Right(sample.text) - ) - } - - property("encodes.decodes.base64") = forAll { sample: EncodingSample => - val r = - Stream.chunk[IO, Byte](Chunk.bytes(sample.text.getBytes)).covary[IO].chunkLimit(sample.chunkSize).flatMap(Stream.chunk[IO, Byte]) - .through(util.encodeBase64Raw(sample.alphabet)) - .through(util.decodeBase64Raw(sample.alphabet)) - .chunks - .fold(ByteVector.empty){ case (acc, n) => acc ++ chunk2ByteVector(n)} - .compile.toVector.unsafeRunSync() - - - - r ?= Vector(ByteVector.view(sample.text.getBytes)) - - } - - - -} diff --git a/src/test/scala/spinoco/fs2/http/websocket/WebSocketClientApp.scala b/src/test/scala/spinoco/fs2/http/websocket/WebSocketClientApp.scala index ffbea25..43aba9a 100644 --- a/src/test/scala/spinoco/fs2/http/websocket/WebSocketClientApp.scala +++ b/src/test/scala/spinoco/fs2/http/websocket/WebSocketClientApp.scala @@ -22,11 +22,13 @@ object WebSocketClientApp extends App { implicit val codecString: Codec[String] = utf8 - WebSocket.client( - WebSocketRequest.ws("echo.websocket.org", "/", QueryParameter.single("encoding", "text")) - , wspipe - ).map { x => - println(("RESULT OF WS", x)) + Stream.resource(httpResources).flatMap { case (group, tls) => + WebSocket.client( + WebSocketRequest.ws("echo.websocket.org", "/", QueryParameter.single("encoding", "text")) + , wspipe + )(group, tls).map { x => + println(("RESULT OF WS", x)) + } }.compile.drain.unsafeRunSync() } diff --git a/src/test/scala/spinoco/fs2/http/websocket/WebSocketSpec.scala b/src/test/scala/spinoco/fs2/http/websocket/WebSocketSpec.scala index bd542b1..25572be 100644 --- a/src/test/scala/spinoco/fs2/http/websocket/WebSocketSpec.scala +++ b/src/test/scala/spinoco/fs2/http/websocket/WebSocketSpec.scala @@ -4,6 +4,8 @@ import java.net.InetSocketAddress import cats.effect.IO import fs2._ +import fs2.io.tcp.SocketGroup +import fs2.io.tls.TLSContext import org.scalacheck.{Gen, Prop, Properties} import org.scalacheck.Prop._ import scodec.Codec @@ -43,24 +45,27 @@ object WebSocketSpec extends Properties("WebSocket") { output merge inbound.take(5).evalMap { in => IO { received = received :+ in }}.drain } - val serverStream = + def serverStream(group: SocketGroup) = http.server[IO](new InetSocketAddress("127.0.0.1", 9090))( server ( pipe = serverEcho , pingInterval = 500.millis , handshakeTimeout = 10.seconds ) - ) + )(group) - val clientStream = + def clientStream(group: SocketGroup, tls: TLSContext) = Stream.sleep_[IO](3.seconds) ++ WebSocket.client( WebSocketRequest.ws("127.0.0.1", 9090, "/") , clientData - ) + )(group, tls) - val resultClient = - (serverStream.drain mergeHaltBoth clientStream).compile.toVector.unsafeRunTimed(20.seconds) + val resultClient = { + Stream.resource(httpResources).flatMap { case (group, tls) => + (serverStream(group).drain mergeHaltBoth clientStream(group, tls)) + }.compile.toVector.unsafeRunTimed(20.seconds) + } (resultClient ?= Some(Vector(None))) && (received.size ?= 5) diff --git a/version.sbt b/version.sbt index acfbdb6..404aa03 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.4.0-SNAPSHOT" +version in ThisBuild := "0.5.0-SNAPSHOT" From 645c57adc4b3f8ba76812c339f948a52d0980e84 Mon Sep 17 00:00:00 2001 From: Adam Chlupacek Date: Sat, 2 May 2020 20:40:20 +0200 Subject: [PATCH 2/3] Fix travis dependencies. --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index b92c9b8..ba1066f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,8 +2,8 @@ language : scala scala: - - 2.11.12 - - 2.12.6 + - 2.12.11 + - 2.13.0 dist: trusty From 5c2632c9b5e5c1feb8af134dde4c10840b512772 Mon Sep 17 00:00:00 2001 From: Adam Chlupacek Date: Sun, 3 May 2020 10:18:13 +0200 Subject: [PATCH 3/3] Address review comments, remove superfluous implicit dependencies and allow for apply summoing of HttpClient. --- .../scala/spinoco/fs2/http/HttpClient.scala | 27 +++++++++---------- .../scala/spinoco/fs2/http/HttpServer.scala | 6 ++--- src/main/scala/spinoco/fs2/http/http.scala | 14 ++++++---- .../spinoco/fs2/http/internal/internal.scala | 2 +- .../spinoco/fs2/http/HttpServerSpec.scala | 4 +-- 5 files changed, 28 insertions(+), 25 deletions(-) diff --git a/src/main/scala/spinoco/fs2/http/HttpClient.scala b/src/main/scala/spinoco/fs2/http/HttpClient.scala index 76b8e68..c288288 100644 --- a/src/main/scala/spinoco/fs2/http/HttpClient.scala +++ b/src/main/scala/spinoco/fs2/http/HttpClient.scala @@ -1,9 +1,10 @@ package spinoco.fs2.http -import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Sync, Timer} +import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Timer} import java.util.concurrent.TimeUnit +import cats.Applicative import cats.effect._ import fs2._ import fs2.concurrent.SignallingRef @@ -100,23 +101,24 @@ trait HttpClient[F[_]] { } - object HttpClient { +object HttpClient { + @inline def apply[F[_]](implicit instance: HttpClient[F]): HttpClient[F] = instance - /** - * Creates an Http Client - * @param requestCodec Codec used to decode request header - * @param responseCodec Codec used to encode response header - * @param sslExecutionContext Strategy used when communication with SSL (https or wss) - * @param sslContext SSL Context to use with SSL Client (https, wss) - */ - def apply[F[_] : ConcurrentEffect : ContextShift : Timer]( + /** + * Creates an Http Client + * @param requestCodec Codec used to decode request header + * @param responseCodec Codec used to encode response header + * @param socketGroup Group of sockets from which to create the client for http request. + * @param tlsContext The TLS context used for elevating the http socket to https. + */ + def mk[F[_]: ConcurrentEffect: ContextShift: Timer]( requestCodec : Codec[HttpRequestHeader] , responseCodec : Codec[HttpResponseHeader] )( socketGroup: SocketGroup , tlsContext: TLSContext - ):F[HttpClient[F]] = Sync[F].delay { + ):F[HttpClient[F]] = Applicative[F].pure { new HttpClient[F] { def request( request: HttpRequest[F] @@ -188,9 +190,6 @@ trait HttpClient[F[_]] { } } } - } - - } diff --git a/src/main/scala/spinoco/fs2/http/HttpServer.scala b/src/main/scala/spinoco/fs2/http/HttpServer.scala index 06ddbe4..b6c1e90 100644 --- a/src/main/scala/spinoco/fs2/http/HttpServer.scala +++ b/src/main/scala/spinoco/fs2/http/HttpServer.scala @@ -2,7 +2,7 @@ package spinoco.fs2.http import java.net.InetSocketAddress -import cats.effect.{ConcurrentEffect, ContextShift, Sync, Timer} +import cats.effect.{ConcurrentEffect, ContextShift, Sync} import cats.syntax.all._ import fs2._ import fs2.concurrent.SignallingRef @@ -34,9 +34,9 @@ object HttpServer { * This is also evaluated when the server failed to process the request itself (i.e. `service` did not handle the failure ) * @param sendFailure A function to be evaluated on failure to process the the response. * Request is not suplied if failure happened before request was constructed. - * + * @param socketGroup Group of sockets from which to create the server socket. */ - def apply[F[_] : ConcurrentEffect : Timer: ContextShift]( + def mk[F[_]: ConcurrentEffect: ContextShift]( maxConcurrent: Int = Int.MaxValue , receiveBufferSize: Int = 256 * 1024 , maxHeaderSize: Int = 10 *1024 diff --git a/src/main/scala/spinoco/fs2/http/http.scala b/src/main/scala/spinoco/fs2/http/http.scala index c7c5308..a078a98 100644 --- a/src/main/scala/spinoco/fs2/http/http.scala +++ b/src/main/scala/spinoco/fs2/http/http.scala @@ -26,8 +26,9 @@ package object http { * @param requestHeaderReceiveTimeout A timeout to await request header to be fully received. * Request will fail, if the header won't be read within this timeout. * @param service Pipe that defines handling of each incoming request and produces a response + * @param socketGroup Group of sockets from which to create the server socket. */ - def server[F[_] : ConcurrentEffect : Timer: ContextShift]( + def server[F[_] : ConcurrentEffect: ContextShift]( bindTo: InetSocketAddress , maxConcurrent: Int = Int.MaxValue , receiveBufferSize: Int = 256 * 1024 @@ -37,7 +38,7 @@ package object http { , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec )( service: (HttpRequestHeader, Stream[F,Byte]) => Stream[F,HttpResponse[F]] - )(socketGroup: SocketGroup):Stream[F,Unit] = HttpServer( + )(socketGroup: SocketGroup):Stream[F,Unit] = HttpServer.mk( maxConcurrent = maxConcurrent , receiveBufferSize = receiveBufferSize , maxHeaderSize = maxHeaderSize @@ -56,14 +57,17 @@ package object http { * * @param requestCodec Codec used to decode request header * @param responseCodec Codec used to encode response header + * @param socketGroup Group of sockets from which to create the client for http request. + * @param tlsContext The TLS context used for elevating the http socket to https. */ - def client[F[_]: ConcurrentEffect : Timer: ContextShift]( + def client[F[_]: ConcurrentEffect :Timer: ContextShift]( requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec )( socketGroup: SocketGroup , tlsContext: TLSContext - ):F[HttpClient[F]] = - HttpClient(requestCodec, responseCodec)(socketGroup, tlsContext) + ):F[HttpClient[F]] = { + HttpClient.mk(requestCodec, responseCodec)(socketGroup, tlsContext) + } } diff --git a/src/main/scala/spinoco/fs2/http/internal/internal.scala b/src/main/scala/spinoco/fs2/http/internal/internal.scala index cfe1a38..238ea27 100644 --- a/src/main/scala/spinoco/fs2/http/internal/internal.scala +++ b/src/main/scala/spinoco/fs2/http/internal/internal.scala @@ -94,7 +94,7 @@ package object internal { * @param shallTimeout If true, timeout will be applied, if false timeout won't be applied. * @param chunkSize Size of chunk to read up to */ - def readWithTimeout[F[_] : Sync]( + def readWithTimeout[F[_]: Sync]( socket: Socket[F] , timeout: FiniteDuration , shallTimeout: F[Boolean] diff --git a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala index 00de35f..b6606c2 100644 --- a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala +++ b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala @@ -116,7 +116,7 @@ object HttpServerSpec extends Properties("HttpServer"){ Stream.resource(httpResources).flatMap { case (group, tls) => (Stream.sleep_[IO](3.second) ++ (Stream( - HttpServer[IO]( + HttpServer.mk[IO]( bindTo = new InetSocketAddress("127.0.0.1", 9090) , service = failRouteService , requestFailure = _ => { Stream(HttpResponse[IO](HttpStatusCode.BadRequest)).covary[IO] } @@ -150,7 +150,7 @@ object HttpServerSpec extends Properties("HttpServer"){ Stream.resource(httpResources).flatMap { case (group, tls) => (Stream.sleep_[IO](3.second) ++ (Stream( - HttpServer[IO]( + HttpServer.mk[IO]( bindTo = new InetSocketAddress("127.0.0.1", 9090) , service = failingResponse , requestFailure = HttpServer.handleRequestParseError[IO] _