From ef5e62e75f1c2ab21009df3b8ddf973aa060f02f Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 2 Jul 2025 21:07:58 -0400 Subject: [PATCH 01/11] feat: add `tiny_httpd_lwt` library --- src/lwt/dune | 6 ++ src/lwt/task.ml | 74 ++++++++++++++++ src/lwt/task.mli | 9 ++ src/lwt/tiny_httpd_lwt.ml | 175 +++++++++++++++++++++++++++++++++++++ src/lwt/tiny_httpd_lwt.mli | 26 ++++++ 5 files changed, 290 insertions(+) create mode 100644 src/lwt/dune create mode 100644 src/lwt/task.ml create mode 100644 src/lwt/task.mli create mode 100644 src/lwt/tiny_httpd_lwt.ml create mode 100644 src/lwt/tiny_httpd_lwt.mli diff --git a/src/lwt/dune b/src/lwt/dune new file mode 100644 index 00000000..76e9b6ae --- /dev/null +++ b/src/lwt/dune @@ -0,0 +1,6 @@ + +(library + (name tiny_httpd_lwt) + (public_name tiny_httpd_lwt) + (enabled_if (>= %{ocaml_version} 5.0)) + (libraries tiny_httpd lwt lwt.unix)) diff --git a/src/lwt/task.ml b/src/lwt/task.ml new file mode 100644 index 00000000..d1e615f8 --- /dev/null +++ b/src/lwt/task.ml @@ -0,0 +1,74 @@ +module ED = Effect.Deep + +type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t + +(** Queue of microtasks that are ready *) +let tasks : (unit -> unit) Queue.t = Queue.create () + +let[@inline] push_task f : unit = Queue.push f tasks + +let on_uncaught_exn : (exn -> Printexc.raw_backtrace -> unit) ref = + ref (fun exn bt -> + Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!" + (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt)) + +let run_all_tasks () : unit = + (* use local queue to prevent the hook from running forever in case + tasks keep scheduling new tasks. 
*) + let local = Queue.create () in + Queue.transfer tasks local; + while not (Queue.is_empty local) do + let t = Queue.pop local in + try t () + with exn -> + let bt = Printexc.get_raw_backtrace () in + !on_uncaught_exn exn bt + done; + (* make sure we don't sleep forever if there's no lwt promise + ready but [tasks] contains ready tasks *) + if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t) + +let () = + let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in + let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in + () + +let await (fut : 'a Lwt.t) : 'a = + match Lwt.state fut with + | Lwt.Return x -> x + | Lwt.Fail exn -> raise exn + | Lwt.Sleep -> Effect.perform (Await fut) + +(** the main effect handler *) +let handler : _ ED.effect_handler = + let effc : type b. b Effect.t -> ((b, unit) ED.continuation -> 'a) option = + function + | Await fut -> + Some + (fun k -> + Lwt.on_any fut + (fun res -> push_task (fun () -> ED.continue k res)) + (fun exn -> push_task (fun () -> ED.discontinue k exn))) + | _ -> None + in + + { effc } + +let run_inside_effect_handler_ (type a) (promise : a Lwt.u) f () : unit = + let res = ref (Error (Failure "not resolved")) in + let run_f_and_set_res () = + (try + let r = f () in + res := Ok r + with exn -> res := Error exn); + Lwt.wakeup_later_result promise !res + in + ED.try_with run_f_and_set_res () handler + +let run f : _ Lwt.t = + let lwt, resolve = Lwt.wait () in + push_task (run_inside_effect_handler_ resolve f); + lwt + +let run_async f : unit = ignore (run f : unit Lwt.t) diff --git a/src/lwt/task.mli b/src/lwt/task.mli new file mode 100644 index 00000000..7b326dc0 --- /dev/null +++ b/src/lwt/task.mli @@ -0,0 +1,9 @@ +(** Direct style tasks for Lwt *) + +val run : (unit -> 'a) -> 'a Lwt.t +(** Run a microtask *) + +val run_async : (unit -> unit) -> unit + +val await : 'a Lwt.t -> 'a +(** Can only be used inside {!run} *) diff --git a/src/lwt/tiny_httpd_lwt.ml 
b/src/lwt/tiny_httpd_lwt.ml new file mode 100644 index 00000000..909c177c --- /dev/null +++ b/src/lwt/tiny_httpd_lwt.ml @@ -0,0 +1,175 @@ +module IO = Tiny_httpd.IO +module H = Tiny_httpd.Server +module Pool = Tiny_httpd.Pool +module Slice = IO.Slice +module Log = Tiny_httpd.Log + +let spf = Printf.sprintf +let ( let@ ) = ( @@ ) + +type 'a with_args = + ?addr:string -> + ?port:int -> + ?unix_sock:string -> + ?max_connections:int -> + ?max_buf_pool_size:int -> + ?buf_size:int -> + 'a + +let get_max_connection_ ?(max_connections = 64) () : int = + let max_connections = max 4 max_connections in + max_connections + +let buf_size = 16 * 1024 + +let show_sockaddr = function + | Unix.ADDR_UNIX s -> s + | Unix.ADDR_INET (addr, port) -> + spf "%s:%d" (Unix.string_of_inet_addr addr) port + +let ic_of_channel (ic : Lwt_io.input_channel) : IO.Input.t = + object + inherit Iostream.In_buf.t_from_refill () + + method private refill (sl : Slice.t) = + assert (sl.len = 0); + let n = + Lwt_io.read_into ic sl.bytes 0 (Bytes.length sl.bytes) |> Task.await + in + sl.len <- n + + method close () = Lwt_io.close ic |> Task.await + end + +let oc_of_channel (oc : Lwt_io.output_channel) : IO.Output.t = + object + method flush () : unit = Lwt_io.flush oc |> Task.await + + method output buf i len = + Lwt_io.write_from_exactly oc buf i len |> Task.await + + method output_char c = Lwt_io.write_char oc c |> Task.await + method close () = Lwt_io.close oc |> Task.await + end + +let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size + ?(buf_size = buf_size) () : (module H.IO_BACKEND) = + let buf_pool = + Pool.create ?max_size:max_buf_pool_size + ~mk_item:(fun () -> Lwt_bytes.create buf_size) + () + in + + let addr, port, (sockaddr : Unix.sockaddr) = + match addr, port, unix_sock with + | _, _, Some s -> Printf.sprintf "unix:%s" s, 0, Unix.ADDR_UNIX s + | addr, port, None -> + let addr = Option.value ~default:"127.0.0.1" addr in + let sockaddr, port = + match Lwt_unix.getaddrinfo 
addr "" [] |> Task.await, port with + | { Unix.ai_addr = ADDR_INET (h, _); _ } :: _, None -> + let p = 8080 in + Unix.ADDR_INET (h, p), p + | { Unix.ai_addr = ADDR_INET (h, _); _ } :: _, Some p -> + Unix.ADDR_INET (h, p), p + | _ -> + failwith @@ Printf.sprintf "Could not parse TCP address from %S" addr + in + addr, port, sockaddr + in + + let module M = struct + let init_addr () = addr + let init_port () = port + let get_time_s () = Unix.gettimeofday () + let max_connections = get_max_connection_ ?max_connections () + + let pool_size = + match max_buf_pool_size with + | Some n -> n + | None -> min 4096 (max_connections * 2) + + let tcp_server () : IO.TCP_server.builder = + { + IO.TCP_server.serve = + (fun ~after_init ~handle () : unit -> + let running = Atomic.make true in + let active_conns = Atomic.make 0 in + + (* Eio.Switch.on_release sw (fun () -> Atomic.set running false); *) + let port = ref port in + + let server_loop : unit Lwt.t = + let@ () = Task.run in + let backlog = max_connections in + let sock = + Lwt_unix.socket ~cloexec:true + (Unix.domain_of_sockaddr sockaddr) + Unix.SOCK_STREAM 0 + in + Lwt_unix.bind sock sockaddr |> Task.await; + Lwt_unix.listen sock backlog; + + (* recover real port, if any *) + (match Unix.getsockname (Lwt_unix.unix_file_descr sock) with + | Unix.ADDR_INET (_, p) -> port := p + | _ -> ()); + + let handle_client client_addr fd : unit = + Atomic.incr active_conns; + let@ () = Task.run_async in + let@ () = + Fun.protect ~finally:(fun () -> + Log.debug (fun k -> + k "Tiny_httpd_lwt: client handler returned"); + Atomic.decr active_conns) + in + + let@ buf_ic = Pool.with_resource buf_pool in + let@ buf_oc = Pool.with_resource buf_pool in + let ic = + ic_of_channel @@ Lwt_io.of_fd ~mode:Input ~buffer:buf_ic fd + in + let oc = + oc_of_channel @@ Lwt_io.of_fd ~mode:Output ~buffer:buf_ic fd + in + try handle.handle ~client_addr ic oc + with exn -> + let bt = Printexc.get_raw_backtrace () in + Log.error (fun k -> + k "Client handler 
for %s failed with %s\n%s" + (show_sockaddr client_addr) + (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt)) + in + + while Atomic.get running do + let fd, addr = Lwt_unix.accept sock |> Task.await in + handle_client addr fd + done + in + + let tcp_server : IO.TCP_server.t = + { + running = (fun () -> Atomic.get running); + stop = + (fun () -> + Atomic.set running false; + Task.await server_loop); + endpoint = (fun () -> addr, !port); + active_connections = (fun () -> Atomic.get active_conns); + } + in + + after_init tcp_server); + } + end in + (module M) + +let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size + ?middlewares () : H.t = + let backend = + io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections + ?buf_size () + in + H.create_from ?buf_size ?middlewares ~backend () diff --git a/src/lwt/tiny_httpd_lwt.mli b/src/lwt/tiny_httpd_lwt.mli new file mode 100644 index 00000000..56bd6868 --- /dev/null +++ b/src/lwt/tiny_httpd_lwt.mli @@ -0,0 +1,26 @@ +(** Lwt backend for Tiny_httpd. + + This only works on OCaml 5 because it uses effect handlers to use Lwt in + direct style. 
+ + {b NOTE}: this is very experimental and will absolutely change over time, + @since NEXT_RELEASE *) + +type 'a with_args = + ?addr:string -> + ?port:int -> + ?unix_sock:string -> + ?max_connections:int -> + ?max_buf_pool_size:int -> + ?buf_size:int -> + 'a + +val io_backend : (unit -> (module Tiny_httpd.Server.IO_BACKEND)) with_args +(** Create a server *) + +val create : + (?middlewares:([ `Encoding | `Stage of int ] * Tiny_httpd.Middleware.t) list -> + unit -> + Tiny_httpd.Server.t) + with_args +(** Create a server *) From 8931def188acbb114acc106c8e5a27715dc71a1c Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 2 Jul 2025 21:08:29 -0400 Subject: [PATCH 02/11] package tiny_httpd_lwt --- dune-project | 10 ++++++++++ tiny_httpd_lwt.opam | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) create mode 100644 tiny_httpd_lwt.opam diff --git a/dune-project b/dune-project index 6446bd94..533e54b3 100644 --- a/dune-project +++ b/dune-project @@ -39,3 +39,13 @@ (iostream-camlzip (>= 0.2.1)) (logs :with-test) (odoc :with-doc))) + +(package + (name tiny_httpd_lwt) + (synopsis "Tiny_httpd backend based on lwt.unix for OCaml 5") + (depends + (tiny_httpd (= :version)) + (lwt (>= 5.0)) + base-unix + (logs :with-test) + (odoc :with-doc))) diff --git a/tiny_httpd_lwt.opam b/tiny_httpd_lwt.opam new file mode 100644 index 00000000..2afaf170 --- /dev/null +++ b/tiny_httpd_lwt.opam @@ -0,0 +1,32 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +version: "0.19" +synopsis: "Tiny_httpd backend based on lwt.unix for OCaml 5" +maintainer: ["c-cube"] +authors: ["c-cube"] +license: "MIT" +homepage: "https://github.com/c-cube/tiny_httpd/" +bug-reports: "https://github.com/c-cube/tiny_httpd/issues" +depends: [ + "dune" {>= "3.2"} + "tiny_httpd" {= version} + "lwt" {>= "5.0"} + "base-unix" + "logs" {with-test} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" 
+ "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/c-cube/tiny_httpd.git" From cd0407973f70ade9f16cca38b28e72ece1bdf984 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 2 Jul 2025 22:44:28 -0400 Subject: [PATCH 03/11] echo example with lwt --- echo_lwt.sh | 2 + examples/dune | 16 +- examples/echo_lwt.ml | 377 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 393 insertions(+), 2 deletions(-) create mode 100755 echo_lwt.sh create mode 100644 examples/echo_lwt.ml diff --git a/echo_lwt.sh b/echo_lwt.sh new file mode 100755 index 00000000..dd5317bc --- /dev/null +++ b/echo_lwt.sh @@ -0,0 +1,2 @@ +#!/bin/sh +exec dune exec --display=quiet --profile=release "examples/echo_lwt.exe" -- $@ diff --git a/examples/dune b/examples/dune index 08d06886..a596ab63 100644 --- a/examples/dune +++ b/examples/dune @@ -8,11 +8,23 @@ (modules sse_client) (libraries unix)) +(library + (name example_vfs) + (wrapped false) + (modules vfs) + (libraries tiny_httpd)) + (executable (name echo) (flags :standard -warn-error -a+8) - (modules echo vfs) - (libraries tiny_httpd logs tiny_httpd_camlzip tiny_httpd.multipart-form-data)) + (modules echo) + (libraries tiny_httpd logs tiny_httpd_camlzip tiny_httpd.multipart-form-data example_vfs)) + +(executable + (name echo_lwt) + (flags :standard -warn-error -a+8) + (modules echo_lwt) + (libraries tiny_httpd tiny_httpd_lwt logs tiny_httpd_camlzip tiny_httpd.multipart-form-data example_vfs)) (executable (name writer) diff --git a/examples/echo_lwt.ml b/examples/echo_lwt.ml new file mode 100644 index 00000000..6b46d3c0 --- /dev/null +++ b/examples/echo_lwt.ml @@ -0,0 +1,377 @@ +open Tiny_httpd_core +module Log = Tiny_httpd.Log +module MFD = Tiny_httpd_multipart_form_data +module Task = Tiny_httpd_lwt.Task + +let now_ = Unix.gettimeofday + +let alice_text = + "CHAPTER I. 
Down the Rabbit-Hole Alice was beginning to get very tired of \ + sitting by her sister on the bank, and of having nothing to do: once or \ + twice she had peeped into the book her sister was reading, but it had no \ + pictures or conversations in it, thought \ + Alice So she was considering in her \ + own mind (as well as she could, for the hot day made her feel very sleepy \ + and stupid), whether the pleasure of making a daisy-chain would be worth \ + the trouble of getting up and picking the daisies, when suddenly a White \ + Rabbit with pink eyes ran close by her. There was nothing so very \ + remarkable in that; nor did Alice think it so very much out of the way to \ + hear the Rabbit say to itself, (when \ + she thought it over afterwards, it occurred to her that she ought to have \ + wondered at this, but at the time it all seemed quite natural); but when \ + the Rabbit actually took a watch out of its waistcoat-pocket, and looked at \ + it, and then hurried on, Alice started to her feet, for it flashed across \ + her mind that she had never before seen a rabbit with either a \ + waistcoat-pocket, or a watch to take out of it, and burning with curiosity, \ + she ran across the field after it, and fortunately was just in time to see \ + it pop down a large rabbit-hole under the hedge. In another moment down \ + went Alice after it, never once considering how in the world she was to get \ + out again. The rabbit-hole went straight on like a tunnel for some way, and \ + then dipped suddenly down, so suddenly that Alice had not a moment to think \ + about stopping herself before she found herself falling down a very deep \ + well. Either the well was very deep, or she fell very slowly, for she had \ + plenty of time as she went down to look about her and to wonder what was \ + going to happen next. 
First, she tried to look down and make out what she \ + was coming to, but it was too dark to see anything; then she looked at the \ + sides of the well, and noticed that they were filled with cupboards......" + +(* util: a little middleware collecting statistics *) +let middleware_stat () : Server.Middleware.t * (unit -> string) = + let n_req = ref 0 in + let total_time_ = ref 0. in + let parse_time_ = ref 0. in + let build_time_ = ref 0. in + let write_time_ = ref 0. in + + let m h req ~resp = + incr n_req; + let t1 = Request.start_time req in + let t2 = now_ () in + h req ~resp:(fun response -> + let t3 = now_ () in + resp response; + let t4 = now_ () in + total_time_ := !total_time_ +. (t4 -. t1); + parse_time_ := !parse_time_ +. (t2 -. t1); + build_time_ := !build_time_ +. (t3 -. t2); + write_time_ := !write_time_ +. (t4 -. t3)) + and get_stat () = + Printf.sprintf + "%d requests (average response time: %.3fms = %.3fms + %.3fms + %.3fms)" + !n_req + (!total_time_ /. float !n_req *. 1e3) + (!parse_time_ /. float !n_req *. 1e3) + (!build_time_ /. float !n_req *. 1e3) + (!write_time_ /. float !n_req *. 
1e3) + in + m, get_stat + +(* ugly AF *) +let base64 x = + let ic, oc = Unix.open_process "base64" in + output_string oc x; + flush oc; + close_out oc; + let r = input_line ic in + ignore (Unix.close_process (ic, oc)); + r + +let setup_logging () = + Logs.set_reporter @@ Logs.format_reporter (); + Logs.set_level ~all:true (Some Logs.Debug) + +let setup_upload server : unit = + Server.add_route_handler_stream ~meth:`POST server + Route.(exact "upload" @/ return) + (fun req -> + let (`boundary boundary) = + match MFD.parse_content_type req.headers with + | Some b -> b + | None -> Response.fail_raise ~code:400 "no boundary found" + in + + let st = MFD.create ~boundary req.body in + let tbl = Hashtbl.create 16 in + let cur = ref "" in + let cur_kind = ref "" in + let buf = Buffer.create 16 in + let rec loop () = + match MFD.next st with + | End_of_input -> + if !cur <> "" then + Hashtbl.add tbl !cur (!cur_kind, Buffer.contents buf) + | Part headers -> + if !cur <> "" then + Hashtbl.add tbl !cur (!cur_kind, Buffer.contents buf); + (match MFD.Content_disposition.parse headers with + | Some { kind; name = Some name; filename = _ } -> + cur := name; + cur_kind := kind; + Buffer.clear buf; + loop () + | _ -> Response.fail_raise ~code:400 "content disposition missing") + | Read sl -> + Buffer.add_subbytes buf sl.bytes sl.off sl.len; + loop () + in + loop (); + + let open Tiny_httpd_html in + let data = + Hashtbl.fold + (fun name (kind, data) acc -> + Printf.sprintf "%S (kind: %S): %S" name kind data :: acc) + tbl [] + in + let html = + body [] + [ + pre [] + [ txt (Printf.sprintf "{\n%s\n}" @@ String.concat "\n" data) ]; + ] + in + Response.make_string ~code:201 @@ Ok (to_string_top html)) + +let () = + let port_ = ref 8080 in + let j = ref 32 in + let addr = ref "127.0.0.1" in + Arg.parse + (Arg.align + [ + "--port", Arg.Set_int port_, " set port"; + "-p", Arg.Set_int port_, " set port"; + "--debug", Arg.Unit setup_logging, " enable debug"; + "-j", Arg.Set_int j, " maximum 
number of connections"; + "--addr", Arg.Set_string addr, " binding address"; + ]) + (fun _ -> raise (Arg.Bad "")) + "echo [option]*"; + + Lwt_main.run @@ Task.run + @@ fun () -> + let server = + Tiny_httpd_lwt.create ~addr:!addr ~port:!port_ ~max_connections:!j () + |> Task.await + in + + Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server; + let m_stats, get_stats = middleware_stat () in + Server.add_middleware server ~stage:(`Stage 1) m_stats; + + (* say hello *) + Server.add_route_handler ~meth:`GET server + Route.(exact "hello" @/ string @/ return) + (fun name _req -> Response.make_string (Ok ("hello " ^ name ^ "!\n"))); + + (* compressed file access *) + Server.add_route_handler ~meth:`GET server + Route.(exact "zcat" @/ string_urlencoded @/ return) + (fun path _req -> + let ic = open_in path in + let str = IO.Input.of_in_channel ic in + let mime_type = + try + let p = Unix.open_process_in (Printf.sprintf "file -i -b %S" path) in + try + let s = [ "Content-Type", String.trim (input_line p) ] in + ignore @@ Unix.close_process_in p; + s + with _ -> + ignore @@ Unix.close_process_in p; + [] + with _ -> [] + in + Response.make_stream ~headers:mime_type (Ok str)); + + (* echo request *) + Server.add_route_handler server + Route.(exact "echo" @/ return) + (fun req -> + let q = + Request.query req + |> List.map (fun (k, v) -> Printf.sprintf "%S = %S" k v) + |> String.concat ";" + in + Response.make_string + (Ok (Format.asprintf "echo:@ %a@ (query: %s)@." Request.pp req q))); + + (* file upload *) + Server.add_route_handler_stream ~meth:`PUT server + Route.(exact "upload" @/ string @/ return) + (fun path req -> + Log.debug (fun k -> + k "start upload %S, headers:\n%s\n\n%!" 
path + (Format.asprintf "%a" Headers.pp (Request.headers req))); + try + let oc = open_out @@ "/tmp/" ^ path in + IO.Input.to_chan oc req.Request.body; + flush oc; + Response.make_string (Ok "uploaded file") + with e -> + Response.fail ~code:500 "couldn't upload file: %s" + (Printexc.to_string e)); + + (* protected by login *) + Server.add_route_handler server + Route.(exact "protected" @/ return) + (fun req -> + let ok = + match Request.get_header req "authorization" with + | Some v -> + Log.debug (fun k -> k "authenticate with %S" v); + v = "Basic " ^ base64 "user:foobar" + | None -> false + in + if ok then ( + (* FIXME: a logout link *) + let s = + "

hello, this is super secret!

log out" + in + Response.make_string (Ok s) + ) else ( + let headers = + Headers.(empty |> set "www-authenticate" "basic realm=\"echo\"") + in + Response.fail ~code:401 ~headers "invalid" + )); + + (* logout *) + Server.add_route_handler server + Route.(exact "logout" @/ return) + (fun _req -> Response.fail ~code:401 "logged out"); + + (* stats *) + Server.add_route_handler server + Route.(exact "stats" @/ return) + (fun _req -> + let stats = get_stats () in + Response.make_string @@ Ok stats); + + Server.add_route_handler server + Route.(exact "alice" @/ return) + (fun _req -> Response.make_string (Ok alice_text)); + + Server.add_route_handler ~meth:`HEAD server + Route.(exact "head" @/ return) + (fun _req -> + Response.make_void ~code:200 ~headers:[ "x-hello", "world" ] ()); + + (* VFS *) + Tiny_httpd.Dir.add_vfs server + ~config: + (Tiny_httpd.Dir.config ~download:true + ~dir_behavior:Tiny_httpd.Dir.Index_or_lists ()) + ~vfs:Vfs.vfs ~prefix:"vfs"; + + setup_upload server; + + (* main page *) + Server.add_route_handler server + Route.(return) + (fun _req -> + let open Tiny_httpd_html in + let h = + html [] + [ + head [] [ title [] [ txt "index of echo" ] ]; + body [] + [ + h3 [] [ txt "welcome!" 
]; + p [] [ b [] [ txt "endpoints are:" ] ]; + ul [] + [ + li [] [ pre [] [ txt "/hello/:name (GET)" ] ]; + li [] + [ + pre [] + [ + a [ A.href "/echo/" ] [ txt "echo" ]; + txt " echo back query"; + ]; + ]; + li [] + [ pre [] [ txt "/upload/:path (PUT) to upload a file" ] ]; + li [] + [ + pre [] + [ + txt + "/zcat/:path (GET) to download a file (deflate \ + transfer-encoding)"; + ]; + ]; + li [] + [ + pre [] + [ + a [ A.href "/stats/" ] [ txt "/stats/" ]; + txt " (GET) to access statistics"; + ]; + ]; + li [] + [ + pre [] + [ + a [ A.href "/vfs/" ] [ txt "/vfs" ]; + txt " (GET) to access a VFS embedded in the binary"; + ]; + ]; + li [] + [ + pre [] + [ + a [ A.href "/protected" ] [ txt "/protected" ]; + txt + " (GET) to see a protected page (login: user, \ + password: foobar)"; + ]; + ]; + li [] + [ + pre [] + [ + a [ A.href "/logout" ] [ txt "/logout" ]; + txt " (POST) to log out"; + ]; + ]; + li [] + [ + form + [ + A.action "/upload"; + A.enctype "multipart/form-data"; + A.target "_self"; + A.method_ "POST"; + ] + [ + label [] [ txt "my beautiful form" ]; + input [ A.type_ "file"; A.name "file1" ]; + input [ A.type_ "file"; A.name "file2" ]; + input + [ + A.type_ "text"; + A.name "a"; + A.placeholder "text A"; + ]; + input + [ + A.type_ "text"; + A.name "b"; + A.placeholder "text B"; + ]; + input [ A.type_ "submit" ]; + ]; + ]; + ]; + ]; + ] + in + let s = to_string_top h in + Response.make_string ~headers:[ "content-type", "text/html" ] @@ Ok s); + + Printf.printf "listening on http://%s:%d\n%!" 
(Server.addr server) + (Server.port server); + match Server.run server with + | Ok () -> () + | Error e -> raise e From 0193a2c0d190a192666e48606cfc3194f78b86ee Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 2 Jul 2025 22:44:57 -0400 Subject: [PATCH 04/11] wip: tiny_httpd_lwt --- src/lwt/task.ml | 4 +- src/lwt/tiny_httpd_lwt.ml | 89 +++++++++++++++++++++++++------------- src/lwt/tiny_httpd_lwt.mli | 4 +- 3 files changed, 66 insertions(+), 31 deletions(-) diff --git a/src/lwt/task.ml b/src/lwt/task.ml index d1e615f8..2902022f 100644 --- a/src/lwt/task.ml +++ b/src/lwt/task.ml @@ -62,7 +62,7 @@ let run_inside_effect_handler_ (type a) (promise : a Lwt.u) f () : unit = let r = f () in res := Ok r with exn -> res := Error exn); - Lwt.wakeup_later_result promise !res + Lwt.wakeup_result promise !res in ED.try_with run_f_and_set_res () handler @@ -72,3 +72,5 @@ let run f : _ Lwt.t = lwt let run_async f : unit = ignore (run f : unit Lwt.t) + +(* TODO: yield, use that in loops? *) diff --git a/src/lwt/tiny_httpd_lwt.ml b/src/lwt/tiny_httpd_lwt.ml index 909c177c..a5dbe7b7 100644 --- a/src/lwt/tiny_httpd_lwt.ml +++ b/src/lwt/tiny_httpd_lwt.ml @@ -3,6 +3,7 @@ module H = Tiny_httpd.Server module Pool = Tiny_httpd.Pool module Slice = IO.Slice module Log = Tiny_httpd.Log +module Task = Task let spf = Printf.sprintf let ( let@ ) = ( @@ ) @@ -20,41 +21,57 @@ let get_max_connection_ ?(max_connections = 64) () : int = let max_connections = max 4 max_connections in max_connections -let buf_size = 16 * 1024 +let default_buf_size = 4 * 1024 let show_sockaddr = function | Unix.ADDR_UNIX s -> s | Unix.ADDR_INET (addr, port) -> spf "%s:%d" (Unix.string_of_inet_addr addr) port -let ic_of_channel (ic : Lwt_io.input_channel) : IO.Input.t = +let ic_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : + IO.Input.t = + let lwt_bytes = Lwt_bytes.create (Bytes.length bytes) in object - inherit Iostream.In_buf.t_from_refill () + inherit Iostream.In_buf.t_from_refill ~bytes () 
method private refill (sl : Slice.t) = assert (sl.len = 0); let n = - Lwt_io.read_into ic sl.bytes 0 (Bytes.length sl.bytes) |> Task.await + Lwt_bytes.read fd lwt_bytes 0 (Lwt_bytes.length lwt_bytes) |> Task.await in + Lwt_bytes.blit_to_bytes lwt_bytes 0 bytes 0 n; sl.len <- n - method close () = Lwt_io.close ic |> Task.await + method close () = + decr num_open; + if !num_open <= 0 then Lwt_unix.close fd |> Task.await end -let oc_of_channel (oc : Lwt_io.output_channel) : IO.Output.t = +let oc_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : + IO.Output.t = + let lwt_bytes = Lwt_bytes.create (Bytes.length bytes) in object - method flush () : unit = Lwt_io.flush oc |> Task.await - - method output buf i len = - Lwt_io.write_from_exactly oc buf i len |> Task.await - - method output_char c = Lwt_io.write_char oc c |> Task.await - method close () = Lwt_io.close oc |> Task.await + inherit IO.Output.t_from_output ~bytes () + (* method flush () : unit = Lwt_io.flush oc |> Task.await *) + + method private output_underlying buf i len = + Lwt_bytes.blit_from_bytes buf i lwt_bytes 0 len; + let i = ref 0 in + let len = ref len in + while !len > 0 do + let n = Lwt_bytes.write fd lwt_bytes !i !len |> Task.await in + i := !i + n; + len := !len - n + done + + method private close_underlying () = + decr num_open; + if !num_open <= 0 then Lwt_unix.close fd |> Task.await end let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size - ?(buf_size = buf_size) () : (module H.IO_BACKEND) = - let buf_pool = + ?(buf_size = default_buf_size) () : (module H.IO_BACKEND) = + let _buf_pool = Pool.create ?max_size:max_buf_pool_size ~mk_item:(fun () -> Lwt_bytes.create buf_size) () @@ -93,6 +110,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size { IO.TCP_server.serve = (fun ~after_init ~handle () : unit -> + let server_done, set_server_done = Lwt.wait () in let running = Atomic.make true in let active_conns = Atomic.make 0 in @@ -107,6 
+125,10 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size (Unix.domain_of_sockaddr sockaddr) Unix.SOCK_STREAM 0 in + Lwt_unix.setsockopt sock Unix.TCP_NODELAY true; + Lwt_unix.setsockopt_optint sock Unix.SO_LINGER None; + Lwt_unix.setsockopt sock Unix.SO_REUSEADDR true; + Lwt_unix.setsockopt sock Unix.SO_REUSEPORT true; Lwt_unix.bind sock sockaddr |> Task.await; Lwt_unix.listen sock backlog; @@ -118,24 +140,30 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let handle_client client_addr fd : unit = Atomic.incr active_conns; let@ () = Task.run_async in - let@ () = - Fun.protect ~finally:(fun () -> - Log.debug (fun k -> - k "Tiny_httpd_lwt: client handler returned"); - Atomic.decr active_conns) + + let cleanup () = + Log.debug (fun k -> + k "Tiny_httpd_lwt: client handler returned"); + Atomic.decr active_conns in + let buf_ic = Bytes.create buf_size in + let buf_oc = Bytes.create buf_size in + (* let@ buf_ic = Pool.with_resource buf_pool in let@ buf_oc = Pool.with_resource buf_pool in - let ic = - ic_of_channel @@ Lwt_io.of_fd ~mode:Input ~buffer:buf_ic fd - in - let oc = - oc_of_channel @@ Lwt_io.of_fd ~mode:Output ~buffer:buf_ic fd - in - try handle.handle ~client_addr ic oc +*) + + (* close FD when both ends are closed *) + let num_open = ref 2 in + let ic = ic_of_fd ~num_open ~bytes:buf_ic fd in + let oc = oc_of_fd ~num_open ~bytes:buf_oc fd in + try + handle.handle ~client_addr ic oc; + cleanup () with exn -> let bt = Printexc.get_raw_backtrace () in + cleanup (); Log.error (fun k -> k "Client handler for %s failed with %s\n%s" (show_sockaddr client_addr) @@ -155,19 +183,22 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size stop = (fun () -> Atomic.set running false; + Lwt.wakeup_later set_server_done (); Task.await server_loop); endpoint = (fun () -> addr, !port); active_connections = (fun () -> Atomic.get active_conns); } in - after_init tcp_server); + after_init tcp_server; + 
Task.await server_done); } end in (module M) let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size - ?middlewares () : H.t = + ?middlewares () : H.t Lwt.t = + let@ () = Task.run in let backend = io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections ?buf_size () diff --git a/src/lwt/tiny_httpd_lwt.mli b/src/lwt/tiny_httpd_lwt.mli index 56bd6868..4b0fabf5 100644 --- a/src/lwt/tiny_httpd_lwt.mli +++ b/src/lwt/tiny_httpd_lwt.mli @@ -6,6 +6,8 @@ {b NOTE}: this is very experimental and will absolutely change over time, @since NEXT_RELEASE *) +module Task = Task + type 'a with_args = ?addr:string -> ?port:int -> @@ -21,6 +23,6 @@ val io_backend : (unit -> (module Tiny_httpd.Server.IO_BACKEND)) with_args val create : (?middlewares:([ `Encoding | `Stage of int ] * Tiny_httpd.Middleware.t) list -> unit -> - Tiny_httpd.Server.t) + Tiny_httpd.Server.t Lwt.t) with_args (** Create a server *) From 57bc8e434cdf37e8c76099c8f14729aaadf35a16 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 2 Jul 2025 23:09:11 -0400 Subject: [PATCH 05/11] make sure we use epoll --- examples/echo_lwt.ml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/echo_lwt.ml b/examples/echo_lwt.ml index 6b46d3c0..e1b9e196 100644 --- a/examples/echo_lwt.ml +++ b/examples/echo_lwt.ml @@ -148,6 +148,9 @@ let () = (fun _ -> raise (Arg.Bad "")) "echo [option]*"; + let ev = new Lwt_engine.libev () in + Lwt_engine.set ev; + Lwt_main.run @@ Task.run @@ fun () -> let server = From 906cc152f2d96b0bcb10e7ca20e922ab56cadaea Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 2 Jul 2025 23:11:18 -0400 Subject: [PATCH 06/11] feat lwt_task: adaptative limit on number of tasks in one go --- src/lwt/task.ml | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/lwt/task.ml b/src/lwt/task.ml index 2902022f..15c29b25 100644 --- a/src/lwt/task.ml +++ b/src/lwt/task.ml @@ -14,12 +14,11 @@ let on_uncaught_exn : (exn -> Printexc.raw_backtrace 
-> unit) ref = (Printexc.raw_backtrace_to_string bt)) let run_all_tasks () : unit = - (* use local queue to prevent the hook from running forever in case - tasks keep scheduling new tasks. *) - let local = Queue.create () in - Queue.transfer tasks local; - while not (Queue.is_empty local) do - let t = Queue.pop local in + let n_processed = ref 0 in + let max_number_of_steps = min 10_000 (2 * Queue.length tasks) in + while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do + let t = Queue.pop tasks in + incr n_processed; try t () with exn -> let bt = Printexc.get_raw_backtrace () in From 029c558802de530114cd1a15c45e0825b1566d33 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 2 Jul 2025 23:11:37 -0400 Subject: [PATCH 07/11] tiny_httpd_lwt: fix bug in ic refill; revert to `bytes` --- src/lwt/tiny_httpd_lwt.ml | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/lwt/tiny_httpd_lwt.ml b/src/lwt/tiny_httpd_lwt.ml index a5dbe7b7..e78926ed 100644 --- a/src/lwt/tiny_httpd_lwt.ml +++ b/src/lwt/tiny_httpd_lwt.ml @@ -21,7 +21,7 @@ let get_max_connection_ ?(max_connections = 64) () : int = let max_connections = max 4 max_connections in max_connections -let default_buf_size = 4 * 1024 +let default_buf_size = 16 * 1024 let show_sockaddr = function | Unix.ADDR_UNIX s -> s @@ -30,16 +30,15 @@ let show_sockaddr = function let ic_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : IO.Input.t = - let lwt_bytes = Lwt_bytes.create (Bytes.length bytes) in object inherit Iostream.In_buf.t_from_refill ~bytes () method private refill (sl : Slice.t) = assert (sl.len = 0); + sl.off <- 0; let n = - Lwt_bytes.read fd lwt_bytes 0 (Lwt_bytes.length lwt_bytes) |> Task.await + Lwt_unix.read fd sl.bytes 0 (Bytes.length sl.bytes) |> Task.await in - Lwt_bytes.blit_to_bytes lwt_bytes 0 bytes 0 n; sl.len <- n method close () = @@ -49,17 +48,15 @@ let ic_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : let oc_of_fd 
~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : IO.Output.t = - let lwt_bytes = Lwt_bytes.create (Bytes.length bytes) in object inherit IO.Output.t_from_output ~bytes () (* method flush () : unit = Lwt_io.flush oc |> Task.await *) method private output_underlying buf i len = - Lwt_bytes.blit_from_bytes buf i lwt_bytes 0 len; - let i = ref 0 in + let i = ref i in let len = ref len in while !len > 0 do - let n = Lwt_bytes.write fd lwt_bytes !i !len |> Task.await in + let n = Lwt_unix.write fd buf !i !len |> Task.await in i := !i + n; len := !len - n done From 3014046a8a1d3fc8d2ed0015d5017eedc1b86d33 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 8 Jul 2025 10:32:46 -0400 Subject: [PATCH 08/11] use lwt_direct from lwt PR --- examples/echo_lwt.ml | 6 ++-- src/lwt/{task.ml => lwt_direct.ml} | 50 ++++++++++++++++++++---------- src/lwt/lwt_direct.mli | 33 ++++++++++++++++++++ src/lwt/task.mli | 9 ------ src/lwt/tiny_httpd_lwt.ml | 29 +++++++++-------- src/lwt/tiny_httpd_lwt.mli | 2 +- 6 files changed, 85 insertions(+), 44 deletions(-) rename src/lwt/{task.ml => lwt_direct.ml} (55%) create mode 100644 src/lwt/lwt_direct.mli delete mode 100644 src/lwt/task.mli diff --git a/examples/echo_lwt.ml b/examples/echo_lwt.ml index e1b9e196..c66184b2 100644 --- a/examples/echo_lwt.ml +++ b/examples/echo_lwt.ml @@ -1,7 +1,7 @@ open Tiny_httpd_core module Log = Tiny_httpd.Log module MFD = Tiny_httpd_multipart_form_data -module Task = Tiny_httpd_lwt.Task +module Lwt_direct = Tiny_httpd_lwt.Lwt_direct let now_ = Unix.gettimeofday @@ -151,11 +151,11 @@ let () = let ev = new Lwt_engine.libev () in Lwt_engine.set ev; - Lwt_main.run @@ Task.run + Lwt_main.run @@ Lwt_direct.run @@ fun () -> let server = Tiny_httpd_lwt.create ~addr:!addr ~port:!port_ ~max_connections:!j () - |> Task.await + |> Lwt_direct.await in Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server; diff --git a/src/lwt/task.ml b/src/lwt/lwt_direct.ml similarity index 55% rename from 
src/lwt/task.ml rename to src/lwt/lwt_direct.ml index 15c29b25..f989198b 100644 --- a/src/lwt/task.ml +++ b/src/lwt/lwt_direct.ml @@ -1,17 +1,16 @@ module ED = Effect.Deep -type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t +type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t (** Queue of microtasks that are ready *) let tasks : (unit -> unit) Queue.t = Queue.create () let[@inline] push_task f : unit = Queue.push f tasks -let on_uncaught_exn : (exn -> Printexc.raw_backtrace -> unit) ref = - ref (fun exn bt -> - Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!" - (Printexc.to_string exn) - (Printexc.raw_backtrace_to_string bt)) +let default_on_uncaught_exn exn bt = + Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!" + (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt) let run_all_tasks () : unit = let n_processed = ref 0 in @@ -22,16 +21,21 @@ let run_all_tasks () : unit = try t () with exn -> let bt = Printexc.get_raw_backtrace () in - !on_uncaught_exn exn bt + default_on_uncaught_exn exn bt done; (* make sure we don't sleep forever if there's no lwt promise ready but [tasks] contains ready tasks *) if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t) -let () = - let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in - let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in - () +let setup_hooks = + let already_done = ref false in + fun () -> + if not !already_done then ( + already_done := true; + let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in + let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in + () + ) let await (fut : 'a Lwt.t) : 'a = match Lwt.state fut with @@ -39,10 +43,13 @@ let await (fut : 'a Lwt.t) : 'a = | Lwt.Fail exn -> raise exn | Lwt.Sleep -> Effect.perform (Await fut) +let yield () : unit = Effect.perform Yield + (** the main effect handler *) let handler : _ ED.effect_handler = let effc : type b. 
b Effect.t -> ((b, unit) ED.continuation -> 'a) option = function + | Yield -> Some (fun k -> push_task (fun () -> ED.continue k ())) | Await fut -> Some (fun k -> @@ -51,10 +58,10 @@ let handler : _ ED.effect_handler = (fun exn -> push_task (fun () -> ED.discontinue k exn))) | _ -> None in - { effc } -let run_inside_effect_handler_ (type a) (promise : a Lwt.u) f () : unit = +let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : + unit = let res = ref (Error (Failure "not resolved")) in let run_f_and_set_res () = (try @@ -66,10 +73,21 @@ let run_inside_effect_handler_ (type a) (promise : a Lwt.u) f () : unit = ED.try_with run_f_and_set_res () handler let run f : _ Lwt.t = + setup_hooks (); let lwt, resolve = Lwt.wait () in - push_task (run_inside_effect_handler_ resolve f); + push_task (run_inside_effect_handler_and_resolve_ resolve f); lwt -let run_async f : unit = ignore (run f : unit Lwt.t) +let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit = + let run_f () : unit = + try f () + with exn -> + let bt = Printexc.get_raw_backtrace () in + on_uncaught_exn exn bt + in + ED.try_with run_f () handler -(* TODO: yield, use that in loops? *) +let run_in_the_background ?(on_uncaught_exn = default_on_uncaught_exn) f : unit + = + setup_hooks (); + push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f) diff --git a/src/lwt/lwt_direct.mli b/src/lwt/lwt_direct.mli new file mode 100644 index 00000000..e1cbd422 --- /dev/null +++ b/src/lwt/lwt_direct.mli @@ -0,0 +1,33 @@ +(** Direct style control flow for Lwt. *) + +val run : (unit -> 'a) -> 'a Lwt.t +(** [run f] runs the function [f ()] in a task within + the [Lwt_unix] event loop. [f ()] can create [Lwt] + promises and use {!await} to wait for them. Like any promise + in Lwt, [f ()] can starve the event loop if it runs long computations + without yielding to the event loop. 
+ + When [f ()] terminates (successfully or not), the promise + [run f] is resolved with [f ()]'s result, or the exception + raised by [f ()]. *) + +val run_in_the_background : + ?on_uncaught_exn:(exn -> Printexc.raw_backtrace -> unit) -> + (unit -> unit) -> + unit +(** [run_in_the_background f] is similar to [ignore (run f)]. + The computation [f()] runs in the background in the event loop + and returns no result. + @param on_uncaught_exn if provided, this is called when [f()] + raises an exception. *) + +val yield : unit -> unit +(** Yield to the event loop. + Can only be used inside {!run} or {!run_in_the_background}. *) + +val await : 'a Lwt.t -> 'a +(** [await prom] returns the result of [prom], or re-raises the + exception with which [prom] failed if it failed. + If [prom] is not resolved yet, [await prom] will suspend the + current task and resume it when [prom] is resolved. + Can only be used inside {!run} or {!run_in_the_background}. *) diff --git a/src/lwt/task.mli b/src/lwt/task.mli deleted file mode 100644 index 7b326dc0..00000000 --- a/src/lwt/task.mli +++ /dev/null @@ -1,9 +0,0 @@ -(** Direct style tasks for Lwt *) - -val run : (unit -> 'a) -> 'a Lwt.t -(** Run a microtask *) - -val run_async : (unit -> unit) -> unit - -val await : 'a Lwt.t -> 'a -(** Can only be used inside {!run} *) diff --git a/src/lwt/tiny_httpd_lwt.ml b/src/lwt/tiny_httpd_lwt.ml index e78926ed..f21d6606 100644 --- a/src/lwt/tiny_httpd_lwt.ml +++ b/src/lwt/tiny_httpd_lwt.ml @@ -3,7 +3,7 @@ module H = Tiny_httpd.Server module Pool = Tiny_httpd.Pool module Slice = IO.Slice module Log = Tiny_httpd.Log -module Task = Task +module Lwt_direct = Lwt_direct let spf = Printf.sprintf let ( let@ ) = ( @@ ) @@ -37,33 +37,33 @@ let ic_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : assert (sl.len = 0); sl.off <- 0; let n = - Lwt_unix.read fd sl.bytes 0 (Bytes.length sl.bytes) |> Task.await + Lwt_unix.read fd sl.bytes 0 (Bytes.length sl.bytes) |> Lwt_direct.await in sl.len <- n 
method close () = decr num_open; - if !num_open <= 0 then Lwt_unix.close fd |> Task.await + if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await end let oc_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : IO.Output.t = object inherit IO.Output.t_from_output ~bytes () - (* method flush () : unit = Lwt_io.flush oc |> Task.await *) + (* method flush () : unit = Lwt_io.flush oc |> Lwt_direct.await *) method private output_underlying buf i len = let i = ref i in let len = ref len in while !len > 0 do - let n = Lwt_unix.write fd buf !i !len |> Task.await in + let n = Lwt_unix.write fd buf !i !len |> Lwt_direct.await in i := !i + n; len := !len - n done method private close_underlying () = decr num_open; - if !num_open <= 0 then Lwt_unix.close fd |> Task.await + if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await end let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size @@ -80,7 +80,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size | addr, port, None -> let addr = Option.value ~default:"127.0.0.1" addr in let sockaddr, port = - match Lwt_unix.getaddrinfo addr "" [] |> Task.await, port with + match Lwt_unix.getaddrinfo addr "" [] |> Lwt_direct.await, port with | { Unix.ai_addr = ADDR_INET (h, _); _ } :: _, None -> let p = 8080 in Unix.ADDR_INET (h, p), p @@ -115,7 +115,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let port = ref port in let server_loop : unit Lwt.t = - let@ () = Task.run in + let@ () = Lwt_direct.run in let backlog = max_connections in let sock = Lwt_unix.socket ~cloexec:true @@ -126,7 +126,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size Lwt_unix.setsockopt_optint sock Unix.SO_LINGER None; Lwt_unix.setsockopt sock Unix.SO_REUSEADDR true; Lwt_unix.setsockopt sock Unix.SO_REUSEPORT true; - Lwt_unix.bind sock sockaddr |> Task.await; + Lwt_unix.bind sock sockaddr |> Lwt_direct.await; Lwt_unix.listen sock backlog; (* 
recover real port, if any *) @@ -136,8 +136,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let handle_client client_addr fd : unit = Atomic.incr active_conns; - let@ () = Task.run_async in - + Lwt_direct.run_in_the_background @@ fun () -> let cleanup () = Log.debug (fun k -> k "Tiny_httpd_lwt: client handler returned"); @@ -169,7 +168,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size in while Atomic.get running do - let fd, addr = Lwt_unix.accept sock |> Task.await in + let fd, addr = Lwt_unix.accept sock |> Lwt_direct.await in handle_client addr fd done in @@ -181,21 +180,21 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size (fun () -> Atomic.set running false; Lwt.wakeup_later set_server_done (); - Task.await server_loop); + Lwt_direct.await server_loop); endpoint = (fun () -> addr, !port); active_connections = (fun () -> Atomic.get active_conns); } in after_init tcp_server; - Task.await server_done); + Lwt_direct.await server_done); } end in (module M) let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size ?middlewares () : H.t Lwt.t = - let@ () = Task.run in + let@ () = Lwt_direct.run in let backend = io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections ?buf_size () diff --git a/src/lwt/tiny_httpd_lwt.mli b/src/lwt/tiny_httpd_lwt.mli index 4b0fabf5..2ef9fa1c 100644 --- a/src/lwt/tiny_httpd_lwt.mli +++ b/src/lwt/tiny_httpd_lwt.mli @@ -6,7 +6,7 @@ {b NOTE}: this is very experimental and will absolutely change over time, @since NEXT_RELEASE *) -module Task = Task +module Lwt_direct = Lwt_direct type 'a with_args = ?addr:string -> From 76cefc0991620e13ff931f8f82a62cd122abad94 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 16 Jul 2025 22:53:26 -0400 Subject: [PATCH 09/11] cleanup for lwt --- src/lwt/tiny_httpd_lwt.ml | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git 
a/src/lwt/tiny_httpd_lwt.ml b/src/lwt/tiny_httpd_lwt.ml index f21d6606..8fa694e5 100644 --- a/src/lwt/tiny_httpd_lwt.ml +++ b/src/lwt/tiny_httpd_lwt.ml @@ -129,6 +129,8 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size Lwt_unix.bind sock sockaddr |> Lwt_direct.await; Lwt_unix.listen sock backlog; + let cleanup () = Lwt_unix.close sock |> Lwt_direct.await in + (* recover real port, if any *) (match Unix.getsockname (Lwt_unix.unix_file_descr sock) with | Unix.ADDR_INET (_, p) -> port := p @@ -137,12 +139,6 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let handle_client client_addr fd : unit = Atomic.incr active_conns; Lwt_direct.run_in_the_background @@ fun () -> - let cleanup () = - Log.debug (fun k -> - k "Tiny_httpd_lwt: client handler returned"); - Atomic.decr active_conns - in - let buf_ic = Bytes.create buf_size in let buf_oc = Bytes.create buf_size in (* @@ -154,6 +150,16 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let num_open = ref 2 in let ic = ic_of_fd ~num_open ~bytes:buf_ic fd in let oc = oc_of_fd ~num_open ~bytes:buf_oc fd in + + let cleanup () = + Log.debug (fun k -> + k "Tiny_httpd_lwt: client handler returned"); + Atomic.decr active_conns; + (try Lwt_unix.shutdown fd SHUTDOWN_ALL with _ -> ()); + ic#close (); + oc#close () + in + try handle.handle ~client_addr ic oc; cleanup () @@ -167,10 +173,16 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size (Printexc.raw_backtrace_to_string bt)) in - while Atomic.get running do - let fd, addr = Lwt_unix.accept sock |> Lwt_direct.await in - handle_client addr fd - done + try + while Atomic.get running do + let fd, addr = Lwt_unix.accept sock |> Lwt_direct.await in + handle_client addr fd + done; + cleanup () + with exn -> + let bt = Printexc.get_raw_backtrace () in + cleanup (); + Printexc.raise_with_backtrace exn bt in let tcp_server : IO.TCP_server.t = From 
5caef1494519aa4f3e727f73ea14bc98acc37bfb Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Wed, 16 Jul 2025 22:58:21 -0400 Subject: [PATCH 10/11] buffer pool for lwt server --- src/lwt/tiny_httpd_lwt.ml | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/lwt/tiny_httpd_lwt.ml b/src/lwt/tiny_httpd_lwt.ml index 8fa694e5..43141363 100644 --- a/src/lwt/tiny_httpd_lwt.ml +++ b/src/lwt/tiny_httpd_lwt.ml @@ -111,6 +111,14 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let running = Atomic.make true in let active_conns = Atomic.make 0 in + (* a pool of buffers, to reduce allocations *) + let buf_pool = + Pool.create ~max_size:pool_size + ~clear:(fun buf -> Bytes.fill buf 0 (Bytes.length buf) '\x00') + ~mk_item:(fun () -> Bytes.create buf_size) + () + in + (* Eio.Switch.on_release sw (fun () -> Atomic.set running false); *) let port = ref port in @@ -139,33 +147,31 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let handle_client client_addr fd : unit = Atomic.incr active_conns; Lwt_direct.run_in_the_background @@ fun () -> - let buf_ic = Bytes.create buf_size in - let buf_oc = Bytes.create buf_size in - (* let@ buf_ic = Pool.with_resource buf_pool in let@ buf_oc = Pool.with_resource buf_pool in -*) (* close FD when both ends are closed *) let num_open = ref 2 in let ic = ic_of_fd ~num_open ~bytes:buf_ic fd in let oc = oc_of_fd ~num_open ~bytes:buf_oc fd in - let cleanup () = + let cleanup ~shutdown () = Log.debug (fun k -> k "Tiny_httpd_lwt: client handler returned"); Atomic.decr active_conns; - (try Lwt_unix.shutdown fd SHUTDOWN_ALL with _ -> ()); + if shutdown then ( + try Lwt_unix.shutdown fd SHUTDOWN_ALL with _ -> () + ); ic#close (); oc#close () in try handle.handle ~client_addr ic oc; - cleanup () + cleanup ~shutdown:true () with exn -> let bt = Printexc.get_raw_backtrace () in - cleanup (); + cleanup ~shutdown:false (); Log.error (fun k -> k "Client handler for %s 
failed with %s\n%s" (show_sockaddr client_addr) From 7e06203b14b87d2394b4f7b0bd72879cf990c214 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 28 Jul 2025 14:33:43 -0400 Subject: [PATCH 11/11] update lwt_direct --- src/lwt/lwt_direct.ml | 91 ++++++++++++++++++++++----------------- src/lwt/lwt_direct.mli | 90 ++++++++++++++++++++++++++------------ src/lwt/tiny_httpd_lwt.ml | 6 +-- 3 files changed, 116 insertions(+), 71 deletions(-) diff --git a/src/lwt/lwt_direct.ml b/src/lwt/lwt_direct.ml index f989198b..704a49ab 100644 --- a/src/lwt/lwt_direct.ml +++ b/src/lwt/lwt_direct.ml @@ -1,42 +1,57 @@ -module ED = Effect.Deep +(* Direct-style wrapper for Lwt code -type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t + The implementation of the direct-style wrapper relies on ocaml5's effect + system capturing continuations and adding them as a callback to some lwt + promises. *) -(** Queue of microtasks that are ready *) -let tasks : (unit -> unit) Queue.t = Queue.create () +(* part 1: tasks, getting the scheduler to call them *) +let tasks : (unit -> unit) Queue.t = Queue.create () let[@inline] push_task f : unit = Queue.push f tasks -let default_on_uncaught_exn exn bt = - Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!" - (Printexc.to_string exn) - (Printexc.raw_backtrace_to_string bt) +let absolute_max_number_of_steps = + (* TODO 6.0: what's a good number here? should it be customisable? 
*) + 10_000 let run_all_tasks () : unit = let n_processed = ref 0 in - let max_number_of_steps = min 10_000 (2 * Queue.length tasks) in + let max_number_of_steps = + min absolute_max_number_of_steps (2 * Queue.length tasks) + in while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do let t = Queue.pop tasks in incr n_processed; try t () with exn -> - let bt = Printexc.get_raw_backtrace () in - default_on_uncaught_exn exn bt + (* TODO 6.0: change async_exception handler to accept a backtrace, pass it + here and at the other use site. *) + (* TODO 6.0: this and other try-with: respect exception-filter *) + !Lwt.async_exception_hook exn done; - (* make sure we don't sleep forever if there's no lwt promise - ready but [tasks] contains ready tasks *) - if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t) + (* In the case where there are no promises ready for wakeup, the scheduler's + engine will pause until some IO completes. There might never be completed + IO, depending on the program structure and the state of the world. If this + happens and the queue is not empty, we add a [pause] so that the engine has + something to wakeup for so that the rest of the queue can be processed. *) + if (not (Queue.is_empty tasks)) && Lwt.paused_count () = 0 then + ignore (Lwt.pause () : unit Lwt.t) let setup_hooks = let already_done = ref false in fun () -> if not !already_done then ( already_done := true; + (* TODO 6.0: assess whether we should have both hooks or just one (which + one). Tempted to say we should only have the enter hook. 
*) let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in () ) +(* part 2: effects, performing them *) + +type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t + let await (fut : 'a Lwt.t) : 'a = match Lwt.state fut with | Lwt.Return x -> x @@ -45,49 +60,45 @@ let await (fut : 'a Lwt.t) : 'a = let yield () : unit = Effect.perform Yield -(** the main effect handler *) -let handler : _ ED.effect_handler = - let effc : type b. b Effect.t -> ((b, unit) ED.continuation -> 'a) option = - function - | Yield -> Some (fun k -> push_task (fun () -> ED.continue k ())) +(* part 3: handling effects *) + +let handler : _ Effect.Deep.effect_handler = + let effc : type b. + b Effect.t -> ((b, unit) Effect.Deep.continuation -> 'a) option = function + | Yield -> Some (fun k -> push_task (fun () -> Effect.Deep.continue k ())) | Await fut -> Some (fun k -> Lwt.on_any fut - (fun res -> push_task (fun () -> ED.continue k res)) - (fun exn -> push_task (fun () -> ED.discontinue k exn))) + (fun res -> push_task (fun () -> Effect.Deep.continue k res)) + (fun exn -> push_task (fun () -> Effect.Deep.discontinue k exn))) | _ -> None in { effc } +(* part 4: putting it all together: running tasks *) + let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : unit = - let res = ref (Error (Failure "not resolved")) in let run_f_and_set_res () = - (try - let r = f () in - res := Ok r - with exn -> res := Error exn); - Lwt.wakeup_result promise !res + match f () with + | res -> Lwt.wakeup promise res + | exception exc -> Lwt.wakeup_exn promise exc in - ED.try_with run_f_and_set_res () handler + Effect.Deep.try_with run_f_and_set_res () handler -let run f : _ Lwt.t = +let spawn f : _ Lwt.t = setup_hooks (); let lwt, resolve = Lwt.wait () in push_task (run_inside_effect_handler_and_resolve_ resolve f); lwt -let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit 
= - let run_f () : unit = - try f () - with exn -> - let bt = Printexc.get_raw_backtrace () in - on_uncaught_exn exn bt - in - ED.try_with run_f () handler +(* part 4 (encore): running a task in the background *) + +let run_inside_effect_handler_in_the_background_ f () : unit = + let run_f () : unit = try f () with exn -> !Lwt.async_exception_hook exn in + Effect.Deep.try_with run_f () handler -let run_in_the_background ?(on_uncaught_exn = default_on_uncaught_exn) f : unit - = +let spawn_in_the_background f : unit = setup_hooks (); - push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f) + push_task (run_inside_effect_handler_in_the_background_ f) diff --git a/src/lwt/lwt_direct.mli b/src/lwt/lwt_direct.mli index e1cbd422..b964ab78 100644 --- a/src/lwt/lwt_direct.mli +++ b/src/lwt/lwt_direct.mli @@ -1,33 +1,67 @@ -(** Direct style control flow for Lwt. *) - -val run : (unit -> 'a) -> 'a Lwt.t -(** [run f] runs the function [f ()] in a task within - the [Lwt_unix] event loop. [f ()] can create [Lwt] - promises and use {!await} to wait for them. Like any promise - in Lwt, [f ()] can starve the event loop if it runs long computations - without yielding to the event loop. - - When [f ()] terminates (successfully or not), the promise - [run f] is resolved with [f ()]'s result, or the exception - raised by [f ()]. *) - -val run_in_the_background : - ?on_uncaught_exn:(exn -> Printexc.raw_backtrace -> unit) -> - (unit -> unit) -> - unit -(** [run_in_the_background f] is similar to [ignore (run f)]. - The computation [f()] runs in the background in the event loop - and returns no result. - @param on_uncaught_exn if provided, this is called when [f()] - raises an exception. *) +(** Direct style control flow for Lwt. + + This module relies on OCaml 5's + {{:https://ocaml.org/manual/5.3/effects.html} effect handlers}. 
Instead of + chaining promises using {!Lwt.bind} and {!Lwt.map} and other combinators, it + becomes possible to start lightweight "tasks" using + [Lwt_direct.spawn (fun () -> ...)]. The body of such a task is written in + direct-style code, using OCaml's standard control flow structures such as + loops, higher-order functions, exception handlers, [match], etc. + + Interactions with the rest of lwt can be done using [await], for example: + + {[ + Lwt_direct.spawn (fun () -> + let continue = ref true in + while !continue do + match Lwt_io.read_line in_channel |> Lwt_direct.await with + | exception End_of_file -> continue := false + | line -> + let uppercase_line = String.uppercase_ascii line in + Lwt_io.write_line out_channel uppercase_line |> Lwt_direct.await + done) + ]} + + This code snippet contains a simple "task" that repeatedly reads a line from + a [Lwt_io] channel, uppercases it, and writes the uppercase version to + another channel. + + This task is itself a [unit Lwt.t], which is resolved when the function + returns. It is possible to use {!spawn_in_the_background} to ignore + the result and let the task run in the background instead. *) + +val spawn : (unit -> 'a) -> 'a Lwt.t +(** [spawn f] runs the function [f ()] in a task within the [Lwt_unix] event + loop. [f ()] can create [Lwt] promises and use {!await} to wait for them. + Like any promise in Lwt, [f ()] can starve the event loop if it runs long + computations without yielding to the event loop. + + When [f ()] terminates (successfully or not), the promise [spawn f] is + resolved with [f ()]'s result, or the exception raised by [f ()]. + + The promise returned by [spawn f] is not cancellable. Canceling it will have + no effect. *) + +val spawn_in_the_background : (unit -> unit) -> unit +(** [spawn_in_the_background f] is similar to [ignore (spawn f)]. The + computation [f()] runs in the background in the event loop and returns no + result.
If [f()] raises an exception, {!Lwt.async_exception_hook} is called. +*) val yield : unit -> unit (** Yield to the event loop. - Can only be used inside {!run} or {!run_in_the_background}. *) + + Calling [yield] outside of {!spawn} or {!spawn_in_the_background} will raise + an exception, crash your program, or otherwise cause errors. It is a + programming error to do so. *) val await : 'a Lwt.t -> 'a -(** [await prom] returns the result of [prom], or re-raises the - exception with which [prom] failed if it failed. - If [prom] is not resolved yet, [await prom] will suspend the - current task and resume it when [prom] is resolved. - Can only be used inside {!run} or {!run_in_the_background}. *) +(** [await prom] returns the result of [prom], or re-raises the exception with + which [prom] failed if it failed. If [prom] is not resolved yet, + [await prom] will suspend the current task and resume it when [prom] is + resolved. + + Calling [await] outside of {!spawn} or {!spawn_in_the_background} will raise + an exception, crash your program, or otherwise cause errors. It is a + programming error to do so.
*) + diff --git a/src/lwt/tiny_httpd_lwt.ml b/src/lwt/tiny_httpd_lwt.ml index 43141363..af7f1dfd 100644 --- a/src/lwt/tiny_httpd_lwt.ml +++ b/src/lwt/tiny_httpd_lwt.ml @@ -123,7 +123,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let port = ref port in let server_loop : unit Lwt.t = - let@ () = Lwt_direct.run in + let@ () = Lwt_direct.spawn in let backlog = max_connections in let sock = Lwt_unix.socket ~cloexec:true @@ -146,7 +146,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let handle_client client_addr fd : unit = Atomic.incr active_conns; - Lwt_direct.run_in_the_background @@ fun () -> + Lwt_direct.spawn_in_the_background @@ fun () -> let@ buf_ic = Pool.with_resource buf_pool in let@ buf_oc = Pool.with_resource buf_pool in @@ -212,7 +212,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size ?middlewares () : H.t Lwt.t = - let@ () = Lwt_direct.run in + let@ () = Lwt_direct.spawn in let backend = io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections ?buf_size ()