Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions project/MimaSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ object MimaSettings {
ProblemFilters.exclude[MissingClassProblem]("zio.http.netty.NettyHeaderEncoding"),
ProblemFilters.exclude[MissingClassProblem]("zio.http.netty.NettyHeaderEncoding$"),
exclude[Problem]("zio.http.template2.*"),
ProblemFilters.exclude[DirectMissingMethodProblem]("zio.http.Body.fromFile"),
),
mimaFailOnProblem := failOnProblem,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ trait HandlerPlatformSpecific {
*/
def fromResource(path: String, charset: Charset = Charsets.Utf8)(implicit
trace: Trace,
): Handler[Any, Throwable, Any, Response] =
): Handler[Any, Throwable, Request, Response] =
Handler.fromZIO {
ZIO
.attemptBlocking(getClass.getClassLoader.getResource(path))
Expand All @@ -29,7 +29,7 @@ trait HandlerPlatformSpecific {
private[zio] def fromResourceWithURL(
url: java.net.URL,
charset: Charset,
)(implicit trace: Trace): Handler[Any, Throwable, Any, Response] = {
)(implicit trace: Trace): Handler[Any, Throwable, Request, Response] =
url.getProtocol match {
case "file" => Handler.fromFile(new File(url.getPath), charset)
case "jar" =>
Expand Down Expand Up @@ -70,7 +70,6 @@ trait HandlerPlatformSpecific {
case proto =>
Handler.fail(new IllegalArgumentException(s"Unsupported protocol: $proto"))
}
}

/**
* Attempts to retrieve files from the classpath.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ private[netty] object NettyBodyWriter {
}

body match {
case body: Body.RangedFileBody =>
println(s"[NettyBodyWriter] RangedFileBody detected, converting to StreamBody")
val stream = body.asStream
val s = StreamBody(stream, body.knownContentLength, contentType = body.contentType)
NettyBodyWriter.writeAndFlush(s, body.knownContentLength, ctx)
case body: FileBody =>
// We need to stream the file when compression is enabled otherwise the response encoding fails
val stream = ZStream.fromFile(body.file)
Expand Down
2 changes: 1 addition & 1 deletion zio-http/jvm/src/test/scala/zio/http/HandlerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ object HandlerSpec extends ZIOHttpSpec with ExitAssertion {
val tempFile = tempPath.toFile
val http = Handler.fromFileZIO(ZIO.succeed(tempFile))
for {
r <- http.apply {}
r <- http.apply(Request.get(URL.root))
tempFile <- Body.fromFile(tempFile)
} yield {
assert(r.status)(equalTo(Status.Ok)) &&
Expand Down
101 changes: 99 additions & 2 deletions zio-http/shared/src/main/scala/zio/http/Body.scala
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,22 @@ object Body {
/**
* Constructs a [[zio.http.Body]] from the contents of a file.
*/
def fromFile(file: java.io.File, chunkSize: Int = 1024 * 4)(implicit trace: Trace): ZIO[Any, Nothing, Body] = {
def fromFile(
file: java.io.File,
chunkSize: Int = 1024 * 4,
start: Long = 0,
end: Option[Long] = None,
)(implicit trace: Trace): ZIO[Any, Nothing, Body] = {
ZIO.attemptBlocking(file.length()).orDie.map { fileSize =>
FileBody(file, chunkSize, fileSize)
val isRangeRequest = start > 0 || end.exists(_ < fileSize - 1)

if (isRangeRequest) {
val endByte = end.getOrElse(fileSize - 1)
val contentLength = endByte - start + 1
RangedFileBody(file, chunkSize, contentLength, start, endByte)
} else {
FileBody(file, chunkSize, fileSize)
}
}
}

Expand Down Expand Up @@ -682,6 +695,90 @@ object Body {
override def knownContentLength: Option[Long] = Some(fileSize)
}

private[zio] final case class RangedFileBody(
file: java.io.File,
chunkSize: Int,
fileSize: Long,
start: Long,
end: Long,
override val contentType: Option[Body.ContentType] = None,
) extends Body {

override def asArray(implicit trace: Trace): Task[Array[Byte]] =
ZIO.attemptBlocking {
val raf = new java.io.RandomAccessFile(file, "r")
try {
raf.seek(start)
val buffer = new Array[Byte]((end - start + 1).toInt)
raf.readFully(buffer)
buffer
} finally {
raf.close()
}
}

override def isComplete: Boolean = false

override def isEmpty: Boolean = false

override def asChunk(implicit trace: Trace): Task[Chunk[Byte]] = ???

override def asStream(implicit trace: Trace): ZStream[Any, Throwable, Byte] =
ZStream.unwrap {
ZIO.blocking {
ZIO.suspendSucceed {
try {
val totalBytesToRead = end - start + 1
val raf = new java.io.RandomAccessFile(file, "r")
raf.seek(start)

println(s"[RangedFileBody.asStream] Starting read - start=$start, end=$end, totalBytes=$totalBytesToRead")

var bytesRead = 0L

val read: Task[Option[Chunk[Byte]]] =
ZIO.suspendSucceed {
try {
val remaining = totalBytesToRead - bytesRead
if (remaining <= 0) {
Exit.none
} else {
val size = Math.min(chunkSize.toLong, remaining).toInt
val buffer = new Array[Byte](size)
val len = raf.read(buffer)
if (len > 0) {
bytesRead += len
println(
s"[RangedFileBody.asStream] Read chunk: $len bytes, total read: $bytesRead/$totalBytesToRead",
)
Exit.succeed(Some(Chunk.fromArray(buffer.slice(0, len))))
} else {
println(s"[RangedFileBody.asStream] Finished reading - total: $bytesRead bytes")
Exit.none
}
}
} catch {
case e: Throwable => Exit.fail(e)
}
}

Exit.succeed {
ZStream
.unfoldChunkZIO(read)(_.map(_.map(_ -> read)))
.ensuring(ZIO.attempt(raf.close()).ignoreLogged)
}
} catch {
case e: Throwable => Exit.fail(e)
}
}
}
}

override def contentType(newContentType: Body.ContentType): Body = copy(contentType = Some(newContentType))

override def knownContentLength: Option[Long] = Some(fileSize)
}

private[zio] final case class StreamBody(
stream: ZStream[Any, Throwable, Byte],
knownContentLength: Option[Long],
Expand Down
103 changes: 84 additions & 19 deletions zio-http/shared/src/main/scala/zio/http/Handler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -908,15 +908,77 @@ object Handler extends HandlerPlatformSpecific with HandlerVersionSpecific {
}
}

/**
* Set MIME type in the response headers. This is only relevant in case of
* RandomAccessFile transfers as browsers use the MIME type, not the file
* extension, to determine how to process a URL.
* {{{<a href="MSDN Doc">https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type</a>}}}
*/
private def addMediaType(
response: Response,
pathName: String,
charset: Charset,
): ZIO[Any, Nothing, Response] = {
determineMediaType(pathName) match {
case Some(mediaType) =>
val charset0 = if (mediaType.mainType == "text" || !mediaType.binary) Some(charset) else None
ZIO.succeed(response.addHeader(Header.ContentType(mediaType, charset = charset0)))
case None => ZIO.succeed(response)
}
}

private def serveFileWithRange(
file: File,
fileSize: Long,
rangeHeader: Header.Range,
pathName: String,
)(implicit trace: Trace): ZIO[Any, Throwable, Response] = {
rangeHeader match {
case Header.Range.Single(unit, start, endOpt) if unit == "bytes" =>
val end = endOpt.getOrElse(fileSize - 1)

if (start >= fileSize || start < 0 || end >= fileSize || start > end) {
println(s"[RANGE] invalid range: start=$start, end=$end, fileSize=$fileSize")
ZIO.succeed(
Response(
status = Status.RequestedRangeNotSatisfiable,
headers = Headers(Header.ContentRange.RangeTotal("bytes", fileSize.toInt)),
),
)
} else {
println(s"[RANGE] serving bytes $start-$end de $fileSize")
Body.fromFile(file, start = start, end = Some(end)).map { body =>
Response(
status = Status.PartialContent,
body = body,
headers = Headers(
Header.ContentRange.EndTotal("bytes", start.toInt, end.toInt, fileSize.toInt),
Header.AcceptRanges.Bytes,
),
)
}
}

case _ =>
ZIO.succeed(
Response(
status = Status.RequestedRangeNotSatisfiable,
headers = Headers(Header.ContentRange.RangeTotal("bytes", fileSize.toInt)),
),
)
}
}

def fromFile[R](makeFile: => File, charset: Charset = Charsets.Utf8)(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] =
): Handler[R, Throwable, Request, Response] = {
fromFileZIO(ZIO.attempt(makeFile), charset)
}

def fromFileZIO[R](getFile: ZIO[R, Throwable, File], charset: Charset = Charsets.Utf8)(implicit
trace: Trace,
): Handler[R, Throwable, Any, Response] = {
Handler.fromZIO[R, Throwable, Response](
): Handler[R, Throwable, Request, Response] = {
Handler.fromFunctionZIO[Request] { request =>
ZIO.blocking {
getFile.flatMap { file =>
if (!file.exists()) {
Expand All @@ -925,28 +987,31 @@ object Handler extends HandlerPlatformSpecific with HandlerVersionSpecific {
ZIO.fail(new AccessDeniedException(file.getAbsolutePath))
} else {
if (file.isFile) {
Body.fromFile(file).flatMap { body =>
val response = http.Response(body = body)
val pathName = file.toPath.toString

// Set MIME type in the response headers. This is only relevant in
// case of RandomAccessFile transfers as browsers use the MIME type,
// not the file extension, to determine how to process a URL.
// {{{<a href="MSDN Doc">https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type</a>}}}
determineMediaType(pathName) match {
case Some(mediaType) =>
val charset0 = if (mediaType.mainType == "text" || !mediaType.binary) Some(charset) else None
ZIO.succeed(response.addHeader(Header.ContentType(mediaType, charset = charset0)))
case None => ZIO.succeed(response)
}
val pathName = file.toPath.toString

(request.header(Header.Range) match {
case Some(rangeHeader) =>
ZIO.attemptBlocking(file.length()).orDie.flatMap { fileSize =>
serveFileWithRange(file, fileSize, rangeHeader, pathName)
}

case None =>
Body.fromFile(file).map { body =>
Response(
body = body,
headers = Headers(Header.AcceptRanges.Bytes),
)
}
}).flatMap { response =>
addMediaType(response, pathName, charset)
}
} else {
ZIO.fail(new NotDirectoryException(s"Found directory instead of a file."))
}
}
}
},
)
}
}
}

/**
Expand Down
Loading