diff --git a/dune-project b/dune-project index 6446bd94..533e54b3 100644 --- a/dune-project +++ b/dune-project @@ -39,3 +39,13 @@ (iostream-camlzip (>= 0.2.1)) (logs :with-test) (odoc :with-doc))) + +(package + (name tiny_httpd_lwt) + (synopsis "Tiny_httpd backend based on lwt.unix for OCaml 5") + (depends + (tiny_httpd (= :version)) + (lwt (>= 5.0)) + base-unix + (logs :with-test) + (odoc :with-doc))) diff --git a/echo_lwt.sh b/echo_lwt.sh new file mode 100755 index 00000000..dd5317bc --- /dev/null +++ b/echo_lwt.sh @@ -0,0 +1,2 @@ +#!/bin/sh +exec dune exec --display=quiet --profile=release "examples/echo_lwt.exe" -- $@ diff --git a/examples/dune b/examples/dune index 08d06886..a596ab63 100644 --- a/examples/dune +++ b/examples/dune @@ -8,11 +8,23 @@ (modules sse_client) (libraries unix)) +(library + (name example_vfs) + (wrapped false) + (modules vfs) + (libraries tiny_httpd)) + (executable (name echo) (flags :standard -warn-error -a+8) - (modules echo vfs) - (libraries tiny_httpd logs tiny_httpd_camlzip tiny_httpd.multipart-form-data)) + (modules echo) + (libraries tiny_httpd logs tiny_httpd_camlzip tiny_httpd.multipart-form-data example_vfs)) + +(executable + (name echo_lwt) + (flags :standard -warn-error -a+8) + (modules echo_lwt) + (libraries tiny_httpd tiny_httpd_lwt logs tiny_httpd_camlzip tiny_httpd.multipart-form-data example_vfs)) (executable (name writer) diff --git a/examples/echo_lwt.ml b/examples/echo_lwt.ml new file mode 100644 index 00000000..c66184b2 --- /dev/null +++ b/examples/echo_lwt.ml @@ -0,0 +1,380 @@ +open Tiny_httpd_core +module Log = Tiny_httpd.Log +module MFD = Tiny_httpd_multipart_form_data +module Lwt_direct = Tiny_httpd_lwt.Lwt_direct + +let now_ = Unix.gettimeofday + +let alice_text = + "CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of \ + sitting by her sister on the bank, and of having nothing to do: once or \ + twice she had peeped into the book her sister was reading, but it had no \ + pictures or conversations in it, thought \ + Alice So she was considering in her \ + own mind (as well as she could, for the hot day made her feel very sleepy \ + and stupid), whether the pleasure of making a daisy-chain would be worth \ + the trouble of getting up and picking the daisies, when suddenly a White \ + Rabbit with pink eyes ran close by her. There was nothing so very \ + remarkable in that; nor did Alice think it so very much out of the way to \ + hear the Rabbit say to itself, (when \ + she thought it over afterwards, it occurred to her that she ought to have \ + wondered at this, but at the time it all seemed quite natural); but when \ + the Rabbit actually took a watch out of its waistcoat-pocket, and looked at \ + it, and then hurried on, Alice started to her feet, for it flashed across \ + her mind that she had never before seen a rabbit with either a \ + waistcoat-pocket, or a watch to take out of it, and burning with curiosity, \ + she ran across the field after it, and fortunately was just in time to see \ + it pop down a large rabbit-hole under the hedge. In another moment down \ + went Alice after it, never once considering how in the world she was to get \ + out again. The rabbit-hole went straight on like a tunnel for some way, and \ + then dipped suddenly down, so suddenly that Alice had not a moment to think \ + about stopping herself before she found herself falling down a very deep \ + well. Either the well was very deep, or she fell very slowly, for she had \ + plenty of time as she went down to look about her and to wonder what was \ + going to happen next. First, she tried to look down and make out what she \ + was coming to, but it was too dark to see anything; then she looked at the \ + sides of the well, and noticed that they were filled with cupboards......" + +(* util: a little middleware collecting statistics *) +let middleware_stat () : Server.Middleware.t * (unit -> string) = + let n_req = ref 0 in + let total_time_ = ref 0. in + let parse_time_ = ref 0. in + let build_time_ = ref 0. in + let write_time_ = ref 0. in + + let m h req ~resp = + incr n_req; + let t1 = Request.start_time req in + let t2 = now_ () in + h req ~resp:(fun response -> + let t3 = now_ () in + resp response; + let t4 = now_ () in + total_time_ := !total_time_ +. (t4 -. t1); + parse_time_ := !parse_time_ +. (t2 -. t1); + build_time_ := !build_time_ +. (t3 -. t2); + write_time_ := !write_time_ +. (t4 -. t3)) + and get_stat () = + Printf.sprintf + "%d requests (average response time: %.3fms = %.3fms + %.3fms + %.3fms)" + !n_req + (!total_time_ /. float !n_req *. 1e3) + (!parse_time_ /. float !n_req *. 1e3) + (!build_time_ /. float !n_req *. 1e3) + (!write_time_ /. float !n_req *. 1e3) + in + m, get_stat + +(* ugly AF *) +let base64 x = + let ic, oc = Unix.open_process "base64" in + output_string oc x; + flush oc; + close_out oc; + let r = input_line ic in + ignore (Unix.close_process (ic, oc)); + r + +let setup_logging () = + Logs.set_reporter @@ Logs.format_reporter (); + Logs.set_level ~all:true (Some Logs.Debug) + +let setup_upload server : unit = + Server.add_route_handler_stream ~meth:`POST server + Route.(exact "upload" @/ return) + (fun req -> + let (`boundary boundary) = + match MFD.parse_content_type req.headers with + | Some b -> b + | None -> Response.fail_raise ~code:400 "no boundary found" + in + + let st = MFD.create ~boundary req.body in + let tbl = Hashtbl.create 16 in + let cur = ref "" in + let cur_kind = ref "" in + let buf = Buffer.create 16 in + let rec loop () = + match MFD.next st with + | End_of_input -> + if !cur <> "" then + Hashtbl.add tbl !cur (!cur_kind, Buffer.contents buf) + | Part headers -> + if !cur <> "" then + Hashtbl.add tbl !cur (!cur_kind, Buffer.contents buf); + (match MFD.Content_disposition.parse headers with + | Some { kind; name = Some name; filename = _ } -> + cur := name; + cur_kind := kind; + Buffer.clear buf; + loop () + | _ -> Response.fail_raise ~code:400 "content disposition missing") + | Read sl -> + Buffer.add_subbytes buf sl.bytes sl.off sl.len; + loop () + in + loop (); + + let open Tiny_httpd_html in + let data = + Hashtbl.fold + (fun name (kind, data) acc -> + Printf.sprintf "%S (kind: %S): %S" name kind data :: acc) + tbl [] + in + let html = + body [] + [ + pre [] + [ txt (Printf.sprintf "{\n%s\n}" @@ String.concat "\n" data) ]; + ] + in + Response.make_string ~code:201 @@ Ok (to_string_top html)) + +let () = + let port_ = ref 8080 in + let j = ref 32 in + let addr = ref "127.0.0.1" in + Arg.parse + (Arg.align + [ + "--port", Arg.Set_int port_, " set port"; + "-p", Arg.Set_int port_, " set port"; + "--debug", Arg.Unit setup_logging, " enable debug"; + "-j", Arg.Set_int j, " maximum number of connections"; + "--addr", Arg.Set_string addr, " binding address"; + ]) + (fun _ -> raise (Arg.Bad "")) + "echo [option]*"; + + let ev = new Lwt_engine.libev () in + Lwt_engine.set ev; + + Lwt_main.run @@ Lwt_direct.run + @@ fun () -> + let server = + Tiny_httpd_lwt.create ~addr:!addr ~port:!port_ ~max_connections:!j () + |> Lwt_direct.await + in + + Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server; + let m_stats, get_stats = middleware_stat () in + Server.add_middleware server ~stage:(`Stage 1) m_stats; + + (* say hello *) + Server.add_route_handler ~meth:`GET server + Route.(exact "hello" @/ string @/ return) + (fun name _req -> Response.make_string (Ok ("hello " ^ name ^ "!\n"))); + + (* compressed file access *) + Server.add_route_handler ~meth:`GET server + Route.(exact "zcat" @/ string_urlencoded @/ return) + (fun path _req -> + let ic = open_in path in + let str = IO.Input.of_in_channel ic in + let mime_type = + try + let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in + try + let s = [ "Content-Type", String.trim (input_line p) ] in + ignore @@ Unix.close_process_in p; + s + with _ -> + ignore @@ Unix.close_process_in p; + [] + with _ -> [] + in + Response.make_stream ~headers:mime_type (Ok str)); + + (* echo request *) + Server.add_route_handler server + Route.(exact "echo" @/ return) + (fun req -> + let q = + Request.query req + |> List.map (fun (k, v) -> Printf.sprintf "%S = %S" k v) + |> String.concat ";" + in + Response.make_string + (Ok (Format.asprintf "echo:@ %a@ (query: %s)@." Request.pp req q))); + + (* file upload *) + Server.add_route_handler_stream ~meth:`PUT server + Route.(exact "upload" @/ string @/ return) + (fun path req -> + Log.debug (fun k -> + k "start upload %S, headers:\n%s\n\n%!" path + (Format.asprintf "%a" Headers.pp (Request.headers req))); + try + let oc = open_out @@ "/tmp/" ^ path in + IO.Input.to_chan oc req.Request.body; + flush oc; + Response.make_string (Ok "uploaded file") + with e -> + Response.fail ~code:500 "couldn't upload file: %s" + (Printexc.to_string e)); + + (* protected by login *) + Server.add_route_handler server + Route.(exact "protected" @/ return) + (fun req -> + let ok = + match Request.get_header req "authorization" with + | Some v -> + Log.debug (fun k -> k "authenticate with %S" v); + v = "Basic " ^ base64 "user:foobar" + | None -> false + in + if ok then ( + (* FIXME: a logout link *) + let s = + "

hello, this is super secret!

log out" + in + Response.make_string (Ok s) + ) else ( + let headers = + Headers.(empty |> set "www-authenticate" "basic realm=\"echo\"") + in + Response.fail ~code:401 ~headers "invalid" + )); + + (* logout *) + Server.add_route_handler server + Route.(exact "logout" @/ return) + (fun _req -> Response.fail ~code:401 "logged out"); + + (* stats *) + Server.add_route_handler server + Route.(exact "stats" @/ return) + (fun _req -> + let stats = get_stats () in + Response.make_string @@ Ok stats); + + Server.add_route_handler server + Route.(exact "alice" @/ return) + (fun _req -> Response.make_string (Ok alice_text)); + + Server.add_route_handler ~meth:`HEAD server + Route.(exact "head" @/ return) + (fun _req -> + Response.make_void ~code:200 ~headers:[ "x-hello", "world" ] ()); + + (* VFS *) + Tiny_httpd.Dir.add_vfs server + ~config: + (Tiny_httpd.Dir.config ~download:true + ~dir_behavior:Tiny_httpd.Dir.Index_or_lists ()) + ~vfs:Vfs.vfs ~prefix:"vfs"; + + setup_upload server; + + (* main page *) + Server.add_route_handler server + Route.(return) + (fun _req -> + let open Tiny_httpd_html in + let h = + html [] + [ + head [] [ title [] [ txt "index of echo" ] ]; + body [] + [ + h3 [] [ txt "welcome!" ]; + p [] [ b [] [ txt "endpoints are:" ] ]; + ul [] + [ + li [] [ pre [] [ txt "/hello/:name (GET)" ] ]; + li [] + [ + pre [] + [ + a [ A.href "/echo/" ] [ txt "echo" ]; + txt " echo back query"; + ]; + ]; + li [] + [ pre [] [ txt "/upload/:path (PUT) to upload a file" ] ]; + li [] + [ + pre [] + [ + txt + "/zcat/:path (GET) to download a file (deflate \ + transfer-encoding)"; + ]; + ]; + li [] + [ + pre [] + [ + a [ A.href "/stats/" ] [ txt "/stats/" ]; + txt " (GET) to access statistics"; + ]; + ]; + li [] + [ + pre [] + [ + a [ A.href "/vfs/" ] [ txt "/vfs" ]; + txt " (GET) to access a VFS embedded in the binary"; + ]; + ]; + li [] + [ + pre [] + [ + a [ A.href "/protected" ] [ txt "/protected" ]; + txt + " (GET) to see a protected page (login: user, \ + password: foobar)"; + ]; + ]; + li [] + [ + pre [] + [ + a [ A.href "/logout" ] [ txt "/logout" ]; + txt " (POST) to log out"; + ]; + ]; + li [] + [ + form + [ + A.action "/upload"; + A.enctype "multipart/form-data"; + A.target "_self"; + A.method_ "POST"; + ] + [ + label [] [ txt "my beautiful form" ]; + input [ A.type_ "file"; A.name "file1" ]; + input [ A.type_ "file"; A.name "file2" ]; + input + [ + A.type_ "text"; + A.name "a"; + A.placeholder "text A"; + ]; + input + [ + A.type_ "text"; + A.name "b"; + A.placeholder "text B"; + ]; + input [ A.type_ "submit" ]; + ]; + ]; + ]; + ]; + ] + in + let s = to_string_top h in + Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s); + + Printf.printf "listening on http://%s:%d\n%!" (Server.addr server) + (Server.port server); + match Server.run server with + | Ok () -> () + | Error e -> raise e diff --git a/src/lwt/dune b/src/lwt/dune new file mode 100644 index 00000000..76e9b6ae --- /dev/null +++ b/src/lwt/dune @@ -0,0 +1,6 @@ + +(library + (name tiny_httpd_lwt) + (public_name tiny_httpd_lwt) + (enabled_if (>= %{ocaml_version} 5.0)) + (libraries tiny_httpd lwt lwt.unix)) diff --git a/src/lwt/lwt_direct.ml b/src/lwt/lwt_direct.ml new file mode 100644 index 00000000..704a49ab --- /dev/null +++ b/src/lwt/lwt_direct.ml @@ -0,0 +1,104 @@ +(* Direct-style wrapper for Lwt code + + The implementation of the direct-style wrapper relies on ocaml5's effect + system capturing continuations and adding them as a callback to some lwt + promises. *) + +(* part 1: tasks, getting the scheduler to call them *) + +let tasks : (unit -> unit) Queue.t = Queue.create () +let[@inline] push_task f : unit = Queue.push f tasks + +let absolute_max_number_of_steps = + (* TODO 6.0: what's a good number here? should it be customisable? *) + 10_000 + +let run_all_tasks () : unit = + let n_processed = ref 0 in + let max_number_of_steps = + min absolute_max_number_of_steps (2 * Queue.length tasks) + in + while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do + let t = Queue.pop tasks in + incr n_processed; + try t () + with exn -> + (* TODO 6.0: change async_exception handler to accept a backtrace, pass it + here and at the other use site. *) + (* TODO 6.0: this and other try-with: respect exception-filter *) + !Lwt.async_exception_hook exn + done; + (* In the case where there are no promises ready for wakeup, the scheduler's + engine will pause until some IO completes. There might never be completed + IO, depending on the program structure and the state of the world. If this + happens and the queue is not empty, we add a [pause] so that the engine has + something to wakeup for so that the rest of the queue can be processed. *) + if (not (Queue.is_empty tasks)) && Lwt.paused_count () = 0 then + ignore (Lwt.pause () : unit Lwt.t) + +let setup_hooks = + let already_done = ref false in + fun () -> + if not !already_done then ( + already_done := true; + (* TODO 6.0: assess whether we should have both hooks or just one (which + one). Tempted to say we should only have the enter hook. *) + let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in + let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in + () + ) + +(* part 2: effects, performing them *) + +type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t + +let await (fut : 'a Lwt.t) : 'a = + match Lwt.state fut with + | Lwt.Return x -> x + | Lwt.Fail exn -> raise exn + | Lwt.Sleep -> Effect.perform (Await fut) + +let yield () : unit = Effect.perform Yield + +(* part 3: handling effects *) + +let handler : _ Effect.Deep.effect_handler = + let effc : type b. + b Effect.t -> ((b, unit) Effect.Deep.continuation -> 'a) option = function + | Yield -> Some (fun k -> push_task (fun () -> Effect.Deep.continue k ())) + | Await fut -> + Some + (fun k -> + Lwt.on_any fut + (fun res -> push_task (fun () -> Effect.Deep.continue k res)) + (fun exn -> push_task (fun () -> Effect.Deep.discontinue k exn))) + | _ -> None + in + { effc } + +(* part 4: putting it all together: running tasks *) + +let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : + unit = + let run_f_and_set_res () = + match f () with + | res -> Lwt.wakeup promise res + | exception exc -> Lwt.wakeup_exn promise exc + in + Effect.Deep.try_with run_f_and_set_res () handler + +let spawn f : _ Lwt.t = + setup_hooks (); + let lwt, resolve = Lwt.wait () in + push_task (run_inside_effect_handler_and_resolve_ resolve f); + lwt + +(* part 4 (encore): running a task in the background *) + +let run_inside_effect_handler_in_the_background_ f () : unit = + let run_f () : unit = try f () with exn -> !Lwt.async_exception_hook exn in + Effect.Deep.try_with run_f () handler + +let spawn_in_the_background f : unit = + setup_hooks (); + push_task (run_inside_effect_handler_in_the_background_ f) diff --git a/src/lwt/lwt_direct.mli b/src/lwt/lwt_direct.mli new file mode 100644 index 00000000..b964ab78 --- /dev/null +++ b/src/lwt/lwt_direct.mli @@ -0,0 +1,67 @@ +(** Direct style control flow for Lwt. + + This module relies on OCaml 5's + {{:https://ocaml.org/manual/5.3/effects.html} effect handlers}. Instead of + chaining promises using {!Lwt.bind} and {!Lwt.map} and other combinators, it + becomes possible to start lightweight "tasks" using + [Lwt_direct.spawn (fun () -> ...)]. The body of such a task is written in + direct-style code, using OCaml's standard control flow structures such as + loops, higher-order functions, exception handlers, [match], etc. + + Interactions with the rest of lwt can be done using [await], for example: + + {[ + Lwt_direct.spawn (fun () -> + let continue = ref true in + while !continue do + match Lwt_io.read_line in_channel |> Lwt_direct.await with + | exception End_of_file -> continue := false + | line -> + let uppercase_line = String.uppercase_ascii line in + Lwt_io.write_line out_channel uppercase_line |> Lwt_direct.await + done) + ]} + + This code snippet contains a simple "task" that repeatedly reads a line from + a [Lwt_io] channel, uppercases it, and writes the uppercase version to + another channel. + + This task is itself a [unit Lwt.t], which is resolved when the function + returns. It is possible to use {!Lwt_direct.run_in_the_background} to ignore + the result and let the task run in the background instead. *) + +val spawn : (unit -> 'a) -> 'a Lwt.t +(** [spawn f] runs the function [f ()] in a task within the [Lwt_unix] event + loop. [f ()] can create [Lwt] promises and use {!await} to wait for them. + Like any promise in Lwt, [f ()] can starve the event loop if it runs long + computations without yielding to the event loop. + + When [f ()] terminates (successfully or not), the promise [spawn f] is + resolved with [f ()]'s result, or the exception raised by [f ()]. + + The promise returned by [spawn f] is not cancellable. Canceling it will have + no effect. *) + +val spawn_in_the_background : (unit -> unit) -> unit +(** [spawn_in_the_background f] is similar to [ignore (spawn f)]. The + computation [f()] runs in the background in the event loop and returns no + result. If [f()] raises an exception, {!Lwt.async_exception_hook} is called. +*) + +val yield : unit -> unit +(** Yield to the event loop. + + Calling [yield] outside of {!spawn} or {!run_in_the_background} will raise + an exception, crash your program, or otherwise cause errors. It is a + programming error to do so. *) + +val await : 'a Lwt.t -> 'a +(** [await prom] returns the result of [prom], or re-raises the exception with + which [prom] failed if it failed. If [prom] is not resolved yet, + [await prom] will suspend the current task and resume it when [prom] is + resolved. + + Calling [await] outside of {!spawn} or {!run_in_the_background} will raise + an exception, crash your program, or otherwise cause errors. It is a + programming error to do so. *) + diff --git a/src/lwt/tiny_httpd_lwt.ml b/src/lwt/tiny_httpd_lwt.ml new file mode 100644 index 00000000..af7f1dfd --- /dev/null +++ b/src/lwt/tiny_httpd_lwt.ml @@ -0,0 +1,220 @@ +module IO = Tiny_httpd.IO +module H = Tiny_httpd.Server +module Pool = Tiny_httpd.Pool +module Slice = IO.Slice +module Log = Tiny_httpd.Log +module Lwt_direct = Lwt_direct + +let spf = Printf.sprintf +let ( let@ ) = ( @@ ) + +type 'a with_args = + ?addr:string -> + ?port:int -> + ?unix_sock:string -> + ?max_connections:int -> + ?max_buf_pool_size:int -> + ?buf_size:int -> + 'a + +let get_max_connection_ ?(max_connections = 64) () : int = + let max_connections = max 4 max_connections in + max_connections + +let default_buf_size = 16 * 1024 + +let show_sockaddr = function + | Unix.ADDR_UNIX s -> s + | Unix.ADDR_INET (addr, port) -> + spf "%s:%d" (Unix.string_of_inet_addr addr) port + +let ic_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : + IO.Input.t = + object + inherit Iostream.In_buf.t_from_refill ~bytes () + + method private refill (sl : Slice.t) = + assert (sl.len = 0); + sl.off <- 0; + let n = + Lwt_unix.read fd sl.bytes 0 (Bytes.length sl.bytes) |> Lwt_direct.await + in + sl.len <- n + + method close () = + decr num_open; + if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await + end + +let oc_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : + IO.Output.t = + object + inherit IO.Output.t_from_output ~bytes () + (* method flush () : unit = Lwt_io.flush oc |> Lwt_direct.await *) + + method private output_underlying buf i len = + let i = ref i in + let len = ref len in + while !len > 0 do + let n = Lwt_unix.write fd buf !i !len |> Lwt_direct.await in + i := !i + n; + len := !len - n + done + + method private close_underlying () = + decr num_open; + if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await + end + +let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size + ?(buf_size = default_buf_size) () : (module H.IO_BACKEND) = + let _buf_pool = + Pool.create ?max_size:max_buf_pool_size + ~mk_item:(fun () -> Lwt_bytes.create buf_size) + () + in + + let addr, port, (sockaddr : Unix.sockaddr) = + match addr, port, unix_sock with + | _, _, Some s -> Printf.sprintf "unix:%s" s, 0, Unix.ADDR_UNIX s + | addr, port, None -> + let addr = Option.value ~default:"127.0.0.1" addr in + let sockaddr, port = + match Lwt_unix.getaddrinfo addr "" [] |> Lwt_direct.await, port with + | { Unix.ai_addr = ADDR_INET (h, _); _ } :: _, None -> + let p = 8080 in + Unix.ADDR_INET (h, p), p + | { Unix.ai_addr = ADDR_INET (h, _); _ } :: _, Some p -> + Unix.ADDR_INET (h, p), p + | _ -> + failwith @@ Printf.sprintf "Could not parse TCP address from %S" addr + in + addr, port, sockaddr + in + + let module M = struct + let init_addr () = addr + let init_port () = port + let get_time_s () = Unix.gettimeofday () + let max_connections = get_max_connection_ ?max_connections () + + let pool_size = + match max_buf_pool_size with + | Some n -> n + | None -> min 4096 (max_connections * 2) + + let tcp_server () : IO.TCP_server.builder = + { + IO.TCP_server.serve = + (fun ~after_init ~handle () : unit -> + let server_done, set_server_done = Lwt.wait () in + let running = Atomic.make true in + let active_conns = Atomic.make 0 in + + (* a pool of buffers, to reduce allocations *) + let buf_pool = + Pool.create ~max_size:pool_size + ~clear:(fun buf -> Bytes.fill buf 0 (Bytes.length buf) '\x00') + ~mk_item:(fun () -> Bytes.create buf_size) + () + in + + (* Eio.Switch.on_release sw (fun () -> Atomic.set running false); *) + let port = ref port in + + let server_loop : unit Lwt.t = + let@ () = Lwt_direct.spawn in + let backlog = max_connections in + let sock = + Lwt_unix.socket ~cloexec:true + (Unix.domain_of_sockaddr sockaddr) + Unix.SOCK_STREAM 0 + in + Lwt_unix.setsockopt sock Unix.TCP_NODELAY true; + Lwt_unix.setsockopt_optint sock Unix.SO_LINGER None; + Lwt_unix.setsockopt sock Unix.SO_REUSEADDR true; + Lwt_unix.setsockopt sock Unix.SO_REUSEPORT true; + Lwt_unix.bind sock sockaddr |> Lwt_direct.await; + Lwt_unix.listen sock backlog; + + let cleanup () = Lwt_unix.close sock |> Lwt_direct.await in + + (* recover real port, if any *) + (match Unix.getsockname (Lwt_unix.unix_file_descr sock) with + | Unix.ADDR_INET (_, p) -> port := p + | _ -> ()); + + let handle_client client_addr fd : unit = + Atomic.incr active_conns; + Lwt_direct.spawn_in_the_background @@ fun () -> + let@ buf_ic = Pool.with_resource buf_pool in + let@ buf_oc = Pool.with_resource buf_pool in + + (* close FD when both ends are closed *) + let num_open = ref 2 in + let ic = ic_of_fd ~num_open ~bytes:buf_ic fd in + let oc = oc_of_fd ~num_open ~bytes:buf_oc fd in + + let cleanup ~shutdown () = + Log.debug (fun k -> + k "Tiny_httpd_lwt: client handler returned"); + Atomic.decr active_conns; + if shutdown then ( + try Lwt_unix.shutdown fd SHUTDOWN_ALL with _ -> () + ); + ic#close (); + oc#close () + in + + try + handle.handle ~client_addr ic oc; + cleanup ~shutdown:true () + with exn -> + let bt = Printexc.get_raw_backtrace () in + cleanup ~shutdown:false (); + Log.error (fun k -> + k "Client handler for %s failed with %s\n%s" + (show_sockaddr client_addr) + (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt)) + in + + try + while Atomic.get running do + let fd, addr = Lwt_unix.accept sock |> Lwt_direct.await in + handle_client addr fd + done; + cleanup () + with exn -> + let bt = Printexc.get_raw_backtrace () in + cleanup (); + Printexc.raise_with_backtrace exn bt + in + + let tcp_server : IO.TCP_server.t = + { + running = (fun () -> Atomic.get running); + stop = + (fun () -> + Atomic.set running false; + Lwt.wakeup_later set_server_done (); + Lwt_direct.await server_loop); + endpoint = (fun () -> addr, !port); + active_connections = (fun () -> Atomic.get active_conns); + } + in + + after_init tcp_server; + Lwt_direct.await server_done); + } + end in + (module M) + +let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size + ?middlewares () : H.t Lwt.t = + let@ () = Lwt_direct.spawn in + let backend = + io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections + ?buf_size () + in + H.create_from ?buf_size ?middlewares ~backend () diff --git a/src/lwt/tiny_httpd_lwt.mli b/src/lwt/tiny_httpd_lwt.mli new file mode 100644 index 00000000..2ef9fa1c --- /dev/null +++ b/src/lwt/tiny_httpd_lwt.mli @@ -0,0 +1,28 @@ +(** Lwt backend for Tiny_httpd. + + This only works on OCaml 5 because it uses effect handlers to use Lwt in + direct style. + + {b NOTE}: this is very experimental and will absolutely change over time, + @since NEXT_RELEASE *) + +module Lwt_direct = Lwt_direct + +type 'a with_args = + ?addr:string -> + ?port:int -> + ?unix_sock:string -> + ?max_connections:int -> + ?max_buf_pool_size:int -> + ?buf_size:int -> + 'a + +val io_backend : (unit -> (module Tiny_httpd.Server.IO_BACKEND)) with_args +(** Create a server *) + +val create : + (?middlewares:([ `Encoding | `Stage of int ] * Tiny_httpd.Middleware.t) list -> + unit -> + Tiny_httpd.Server.t Lwt.t) + with_args +(** Create a server *) diff --git a/tiny_httpd_lwt.opam b/tiny_httpd_lwt.opam new file mode 100644 index 00000000..2afaf170 --- /dev/null +++ b/tiny_httpd_lwt.opam @@ -0,0 +1,32 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +version: "0.19" +synopsis: "Tiny_httpd backend based on lwt.unix for OCaml 5" +maintainer: ["c-cube"] +authors: ["c-cube"] +license: "MIT" +homepage: "https://github.com/c-cube/tiny_httpd/" +bug-reports: "https://github.com/c-cube/tiny_httpd/issues" +depends: [ + "dune" {>= "3.2"} + "tiny_httpd" {= version} + "lwt" {>= "5.0"} + "base-unix" + "logs" {with-test} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/tiny_httpd.git"