From d88840a3619a5c98dc777ed055a2f91cc57df9d5 Mon Sep 17 00:00:00 2001 From: Gabriel Scherer Date: Mon, 19 May 2025 07:53:25 +0100 Subject: [PATCH 1/3] Lazy.Atomic_repeating: concurrency-safe lazy thunks --- Changes | 4 + stdlib/.depend | 2 + stdlib/lazy.ml | 61 +++++++++++ stdlib/lazy.mli | 147 +++++++++++++++++++++++++ testsuite/tests/lib-lazy/repeating.ml | 152 ++++++++++++++++++++++++++ 5 files changed, 366 insertions(+) create mode 100644 testsuite/tests/lib-lazy/repeating.ml diff --git a/Changes b/Changes index d05ccc7ffbfa..f94e7338cf99 100644 --- a/Changes +++ b/Changes @@ -32,6 +32,10 @@ Working version (Nicolás Ojeda Bär, review by Daniel Bünzli, Gabriel Scherer and David Allsopp) +- #14043: Lazy.Atomic_repeating.t: concurrency-safe lazy thunks that may + repeat initialization computation on forcing races. + (Gabriel Scherer, review by KC Sivaramakrishnan and ??) + ### Other libraries: - #14020: Add Unix.unsetenv. diff --git a/stdlib/.depend b/stdlib/.depend index 1420b0094c66..706cec86cade 100644 --- a/stdlib/.depend +++ b/stdlib/.depend @@ -481,10 +481,12 @@ stdlib__Int64.cmi : int64.mli stdlib__Lazy.cmo : lazy.ml \ stdlib__Obj.cmi \ camlinternalLazy.cmi \ + stdlib__Atomic.cmi \ stdlib__Lazy.cmi stdlib__Lazy.cmx : lazy.ml \ stdlib__Obj.cmx \ camlinternalLazy.cmx \ + stdlib__Atomic.cmx \ stdlib__Lazy.cmi stdlib__Lazy.cmi : lazy.mli \ camlinternalLazy.cmi diff --git a/stdlib/lazy.ml b/stdlib/lazy.ml index 553c0caff288..6fedcfdb4de1 100644 --- a/stdlib/lazy.ml +++ b/stdlib/lazy.ml @@ -78,3 +78,64 @@ let map_val f x = if is_val x then from_val (f (force x)) else lazy (f (force x)) + + + +module Atomic_repeating = struct + (* we define these as primitives to avoid a dependency on Printexc *) + type raw_backtrace + external get_raw_backtrace: + unit -> raw_backtrace = "caml_get_exception_raw_backtrace" + external raise_with_backtrace: exn -> raw_backtrace -> 'a + = "%raise_with_backtrace" + + type 'a ops = { + make : unit -> 'a; + discard : 'a -> unit; + } + + type 'a state = + | Thunk of 'a ops + | Forcing of 'a ops + | Val of 'a + | Failed of exn * raw_backtrace + + type 'a t = 'a state Atomic.t + + let from_val v = Atomic.make (Val v) + let from_fun ?(discard = ignore) f = + Atomic.make (Thunk { make = f; discard }) + + let finish ops = + match ops.make () with + | exception exn -> + let bt = get_raw_backtrace () in + Failed (exn, bt) + | v -> + Val v + + let rec force th = + match Atomic.get th with + | Val v -> v + | Failed (exn, bt) -> + raise_with_backtrace exn bt + | (Thunk ops) as thunk -> + (* [compare_and_set] returns [false] when another domain has + set the thunk to [Forcing] or a finished state. *) + ignore (Atomic.compare_and_set th thunk (Forcing ops)); + force th + | (Forcing ops) as forcing -> + let finished = finish ops in + (* [compare_and_set] returns [false] when another domain has + set the thunk to a finished state. In this case our + [finished] value is discarded. *) + if not (Atomic.compare_and_set th forcing finished) + then begin match finished with + | Val v -> + (* Ignore exceptions raised by discard: we already + have a finished result to return. *) + (try ops.discard v with _ -> ()) + | Thunk _ | Forcing _ | Failed _ -> () + end; + force th +end diff --git a/stdlib/lazy.mli b/stdlib/lazy.mli index 0dbdadbd30e0..f299d027c100 100644 --- a/stdlib/lazy.mli +++ b/stdlib/lazy.mli @@ -139,3 +139,150 @@ val force_val : 'a t -> 'a @raise Undefined (see {!Undefined}). *) + +module Atomic_repeating : sig + (** Atomic, repeating deferred computations. + + This implementation is less optimized than [Lazy.t], but it can + be used in a concurrent setting. + + OCaml domains do not provide a common abstraction to block on + another computation. Forcing an [Atomic_repeating.t] thunk does + not block when races happen, instead it may repeat the + computation of the result several times. We do guarantee that + calling {!force} on the same suspended computation will always + return the same value, even in presence of forcing races. + + A typical use-case for atomic, repeating deferred computations is + optional library initialization code that is moderately + expensive, or acquires resources. The library author does not + want to do this work on startup, because it may not be needed, + but using ['a Lazy.t] is incorrect if the library may be used in + concurrent settings. ['a Lazy.Atomic_repeating.t] can be used, as + long as the fact that duplications are repeated is acceptable. + + {b Warning}: ['a Lazy.t] contains a protection against recursively + forcing a thunk, it will raise {!Undefined}. On the other hand, + ['a Lazy.Atomic_repeating.t] will recursively repeat the computation, + which may loop. + + See {{!examples} the examples} below. + *) + + type 'a t + (* A value of type ['a Lazy.Atomic_repeating.t] is similar to a value + of type ['a Lazy.t], it represents a deferred computation, but it can + safely be used in concurrent settings. + + If a calling domain attempts to {!force} a value that is already + being forced, the calling domain is not suspended. Instead, the + computation of the value will be repeated on the calling + domain. In other words, [Atomic_repeating.t] can duplicate + computations. + + The implementation ensures that all call to {!force} return the + same value or raise the same exception: if a repeated terminates + on a result, its value or exception will be discarded. + *) + + val from_val : 'a -> 'a t + (** [from_val v] is a deferred computation which is already + finished and whose result is the value [v]. *) + + val from_fun : ?discard:('a -> unit) -> (unit -> 'a) -> 'a t + (** [from_fun ?discard f] is a deferred computation that will call + [f] when forced. Note that [f] may be called several times + in the case of concurrent races. + + If [f] is called several times, one result will be stored as the + result of this computation, and the value of any other result + will be passed to the [discard] function -- a no-op by default. + Exceptions raised by [discard] are themselves discarded. *) + + val force : 'a t -> 'a + (** [force x] forces the suspension [x]. If [x] has + already been forced, [Lazy.force x] returns the same value again without + recomputing it. If it raised an exception, the same exception is raised + again. + + If there is a race between several calls to [force], the + computation may be repeated and some of the finished results + will be discarded. All forcing calls will return the same result. *) + + + (** {1:examples Examples} + + A typical use-case is to initialize some library-local + state that is used by library functions. + + {[ + let config = Lazy.Atomic_repeating.from_fun (fun () -> + match Sys.getenv "MYLIB_CONFIG_PATH" with + | exception _ -> Config.default () + | path -> Config.read_from_path path + ) + ]} + + The environment access and file read may be repeated several + times in the case of concurrent forcing, but the "first" + configuration to be computed will be returned by all callers. + + {3:examples_discard Using the [?discard] parameter.} + + The [?discard] argument is useful to release resources if + a repeated result is discarded. + + {[ + let log_file_and_channel = + let acquire () = + match Sys.getenv "MYLIB_LOG_PATH" with + | exception _ -> + let path, chan = Filename.open_temp_file "mylib" ".log" in + (`Temp path), chan + | path -> + let chan = Out_channel.open_bin path in + (`User path), chan + in + let discard (source, chan) = + Out_channel.close chan; + match source with + | `User _ -> () + | `Temp path -> Sys.remove path + in + Lazy.Atomic_repeating.from_fun ~discard acquire + ]} + + {3:examples_sync User synchronization} + + Users of this module can add their own synchronization logic to + avoid repeated computations. For example, in an application + which uses threads and mutex: + + {[ + let entropy = + (* we use a mibibyte of random data from /dev/urandom *) + let init_mutex = Mutex.create () in + let result = ref None in + Lazy.Atomic_repeating.from_fun (fun () -> + Mutex.protect init_mutex (fun () -> + match !result with + | Some v -> v + | None -> + let v = + In_channel.with_open_bin "/dev/urandom" (fun chan -> + In_channel.really_input_string chan (1024 * 1024) + ) + in + result := Some v; + v + ) + ) + ]} + + A program using this definition will open "/dev/urandom" at most + once. Note that the mutex is only taken on [force] calls that + happen while the initialization is not yet finished -- typically + the first call, or possibly several concurrent first calls. Once + initialization is finished, the value will be returned directly. + *) +end diff --git a/testsuite/tests/lib-lazy/repeating.ml b/testsuite/tests/lib-lazy/repeating.ml new file mode 100644 index 000000000000..2796f8d6c927 --- /dev/null +++ b/testsuite/tests/lib-lazy/repeating.ml @@ -0,0 +1,152 @@ +(* TEST + expect; +*) + +module LazyAR = Lazy.Atomic_repeating +[%%expect{| +module LazyAR = Lazy.Atomic_repeating +|}] + +(* direct value return *) +let it = + let v = LazyAR.from_val 42 in + (LazyAR.force v, LazyAR.force v) +[%%expect{| +val it : int * int = (42, 42) +|}] + +(* value return *) +let it = + let v = LazyAR.from_fun (fun () -> 43) in + (LazyAR.force v, LazyAR.force v) +[%%expect{| +val it : int * int = (43, 43) +|}] + + +(* exception case *) +let it = + let fail = LazyAR.from_fun (fun () -> raise Exit) in + let check () = match LazyAR.force fail with + | exception Exit -> true + | exception _ | _ -> false + in + check () && check () +[%%expect{| +val it : bool = true +|}] + +(* sharing check *) +let it = + let r = ref 0 in + let test = LazyAR.from_fun (fun () -> incr r) in + (* side-effects must not be repeated in sequential settings. *) + LazyAR.force test; + LazyAR.force test; + if !r = 1 then Ok () else Error !r +[%%expect{| +val it : (unit, int) result = Ok () +|}] + +(* Fake concurrency tests : we can use reentrancy to emulate concurrency. *) +let it = + let step = ref 0 in + let thunk = + let self = ref (LazyAR.from_fun (fun () -> 500)) in + self := begin + (* The first call to reach !step = 100 will finish with the value 0. + Other calls will finish with higher values, but those will be discarded. *) + let discard n = + if n = 0 then prerr_endline "Discard error!" + in + LazyAR.from_fun ~discard (fun () -> + if !step >= 100 then 0 + else (incr step; LazyAR.force !self + 1) + ) + end; + !self + in + let result1 = LazyAR.force thunk in + let result2 = LazyAR.force thunk in + if result1 = 0 && result2 = 0 && !step = 100 + then Ok () + else Error (~result1, ~result2, ~step:!step) +[%%expect{| +val it : (unit, result1:int * result2:int * step:int) result = Ok () +|}] + + +(* Check that the documentation examples are well-typed. *) +module Example1 (Config : sig + type t + val default : unit -> t + val read_from_path : string -> t +end) = struct + let config = Lazy.Atomic_repeating.from_fun (fun () -> + match Sys.getenv "MYLIB_CONFIG_PATH" with + | exception _ -> Config.default () + | path -> Config.read_from_path path + ) +end +[%%expect{| +module Example1 : + (Config : sig + type t + val default : unit -> t + val read_from_path : string -> t + end) + -> sig val config : Config.t Lazy.Atomic_repeating.t end +|}] + +module Example2 () = struct + let log_file_and_channel = + let acquire () = + match Sys.getenv "MYLIB_LOG_PATH" with + | exception _ -> + let path, chan = Filename.open_temp_file "mylib" ".log" in + (`Temp path), chan + | path -> + let chan = Out_channel.open_bin path in + (`User path), chan + in + let discard (source, chan) = + Out_channel.close chan; + match source with + | `User _ -> () + | `Temp path -> Sys.remove path + in + Lazy.Atomic_repeating.from_fun ~discard acquire +end +[%%expect{| +module Example2 : + () -> + sig + val log_file_and_channel : + ([ `Temp of string | `User of string ] * Out_channel.t) + Lazy.Atomic_repeating.t + end +|}] + +module Example3 = struct + let entropy = + (* we use a mibibyte of random data from /dev/urandom *) + let init_mutex = Mutex.create () in + let result = ref None in + Lazy.Atomic_repeating.from_fun (fun () -> + Mutex.protect init_mutex (fun () -> + match !result with + | Some v -> v + | None -> + let v = + In_channel.with_open_bin "/dev/urandom" (fun chan -> + In_channel.really_input_string chan (1024 * 1024) + ) + in + result := Some v; + v + ) + ) +end +[%%expect {| +module Example3 : sig val entropy : string option Lazy.Atomic_repeating.t end +|}] From 660be08a3ee9b76e92cf9a3fc0f17cc5dfa99e4a Mon Sep 17 00:00:00 2001 From: Gabriel Scherer Date: Thu, 22 May 2025 09:15:18 +0200 Subject: [PATCH 2/3] Lazy.Atomic_repeating: always re-raise exceptions, never discard them Suggested-by: Pierre Chambart --- Changes | 3 ++- stdlib/lazy.ml | 42 +++++++++++++++++++++--------------------- stdlib/lazy.mli | 27 ++++++++++++++++----------- 3 files changed, 39 insertions(+), 33 deletions(-) diff --git a/Changes b/Changes index f94e7338cf99..99505e09023e 100644 --- a/Changes +++ b/Changes @@ -34,7 +34,8 @@ Working version - #14043: Lazy.Atomic_repeating.t: concurrency-safe lazy thunks that may repeat initialization computation on forcing races. - (Gabriel Scherer, review by KC Sivaramakrishnan and ??) + (Gabriel Scherer, + review by KC Sivaramakrishnan and Pierre Chambart and ??) ### Other libraries: diff --git a/stdlib/lazy.ml b/stdlib/lazy.ml index 6fedcfdb4de1..9b87164b79d2 100644 --- a/stdlib/lazy.ml +++ b/stdlib/lazy.ml @@ -106,14 +106,6 @@ module Atomic_repeating = struct let from_fun ?(discard = ignore) f = Atomic.make (Thunk { make = f; discard }) - let finish ops = - match ops.make () with - | exception exn -> - let bt = get_raw_backtrace () in - Failed (exn, bt) - | v -> - Val v - let rec force th = match Atomic.get th with | Val v -> v @@ -125,17 +117,25 @@ module Atomic_repeating = struct ignore (Atomic.compare_and_set th thunk (Forcing ops)); force th | (Forcing ops) as forcing -> - let finished = finish ops in - (* [compare_and_set] returns [false] when another domain has - set the thunk to a finished state. In this case our - [finished] value is discarded. *) - if not (Atomic.compare_and_set th forcing finished) - then begin match finished with - | Val v -> - (* Ignore exceptions raised by discard: we already - have a finished result to return. *) - (try ops.discard v with _ -> ()) - | Thunk _ | Forcing _ | Failed _ -> () - end; - force th + begin match ops.make () with + | exception exn -> + let bt = get_raw_backtrace () in + let failed = Failed (exn, bt) in + (* [compare_and_set] returns [false] when another domain + has set the thunk to a finished state. We re-raise our + exception in any case to avoid losing it. *) + ignore (Atomic.compare_and_set th forcing failed); + raise_with_backtrace exn bt + | v -> + (* [compare_and_set] returns [false] when another domain + has set the thunk to a finished state. In this case we + [discard] our value, and reuse the finished state. *) + if Atomic.compare_and_set th forcing (Val v) + then v + else begin + (* Exceptions from [discard] are propagated to the caller. *) + ops.discard v; + force th + end + end end diff --git a/stdlib/lazy.mli b/stdlib/lazy.mli index f299d027c100..2a9482e36559 100644 --- a/stdlib/lazy.mli +++ b/stdlib/lazy.mli @@ -150,8 +150,9 @@ module Atomic_repeating : sig another computation. Forcing an [Atomic_repeating.t] thunk does not block when races happen, instead it may repeat the computation of the result several times. We do guarantee that - calling {!force} on the same suspended computation will always - return the same value, even in presence of forcing races. + if two calls to {!force} on the same suspended computation return + a value, then they return the same value, even in presence of + forcing races. A typical use-case for atomic, repeating deferred computations is optional library initialization code that is moderately @@ -195,19 +196,23 @@ module Atomic_repeating : sig in the case of concurrent races. If [f] is called several times, one result will be stored as the - result of this computation, and the value of any other result - will be passed to the [discard] function -- a no-op by default. - Exceptions raised by [discard] are themselves discarded. *) + result of this computation. Other values computed concurrently + will be discarded, after being passed to the [discard] function + (a no-op by default). On the other hand, exceptions raised by + concurrent computations will be re-raised, as well as exceptions + raised by [discard]. +*) val force : 'a t -> 'a - (** [force x] forces the suspension [x]. If [x] has - already been forced, [Lazy.force x] returns the same value again without - recomputing it. If it raised an exception, the same exception is raised - again. + (** [force x] forces the suspension [x]. If [x] has already been + forced, [Lazy.force x] returns the same value again without + recomputing it. If it raised an exception, the same exception is + raised again. If there is a race between several calls to [force], the - computation may be repeated and some of the finished results - will be discarded. All forcing calls will return the same result. *) + computation may be repeated several times. If some of them fail + with an exception, they will re-raise it; but all those that + return a value will return the same value. *) (** {1:examples Examples} From 479a4cbce8cc2b044f7f6c224018cf632c97691d Mon Sep 17 00:00:00 2001 From: Vincent Laviron Date: Thu, 22 May 2025 10:28:33 +0200 Subject: [PATCH 3/3] Non-repeating design --- stdlib/lazy.ml | 88 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 64 insertions(+), 24 deletions(-) diff --git a/stdlib/lazy.ml b/stdlib/lazy.ml index 9b87164b79d2..ae996d091058 100644 --- a/stdlib/lazy.ml +++ b/stdlib/lazy.ml @@ -91,9 +91,15 @@ module Atomic_repeating = struct type 'a ops = { make : unit -> 'a; - discard : 'a -> unit; + wait : unit -> unit; + broadcast : unit -> unit } + type race_behaviour = + | Busy_wait + | Synchronise of { wait : unit -> unit; broadcast : unit -> unit } + | Fail + type 'a state = | Thunk of 'a ops | Forcing of 'a ops @@ -103,8 +109,15 @@ module Atomic_repeating = struct type 'a t = 'a state Atomic.t let from_val v = Atomic.make (Val v) - let from_fun ?(discard = ignore) f = - Atomic.make (Thunk { make = f; discard }) + let from_fun ?(race_behaviour = Fail) f = + let wait, broadcast = + match race_behaviour with + | Busy_wait -> + Fun.id, Fun.id + | Synchronise { wait; broadcast } -> wait, broadcast + | Fail -> (fun () -> raise Undefined), Fun.id + in + Atomic.make (Thunk { make = f; wait; broadcast }) let rec force th = match Atomic.get th with @@ -114,28 +127,55 @@ module Atomic_repeating = struct | (Thunk ops) as thunk -> (* [compare_and_set] returns [false] when another domain has set the thunk to [Forcing] or a finished state. *) - ignore (Atomic.compare_and_set th thunk (Forcing ops)); - force th + if Atomic.compare_and_set th thunk (Forcing ops) + then begin + match ops.make () with + | exception exn -> + let bt = get_raw_backtrace () in + let failed = Failed (exn, bt) in + (* [compare_and_set] cannot return false, as only the thread that + managed to set to forcing can try to update it again. *) + ignore (Atomic.compare_and_set th forcing failed); + ops.broadcast (); + raise_with_backtrace exn bt + | v -> + (* [compare_and_set] cannot return false, as only the thread that + managed to set to forcing can try to update it again. *) + ignore (Atomic.compare_and_set th forcing (Val v)); + v + end + else force th | (Forcing ops) as forcing -> - begin match ops.make () with + ops.wait (); + force th + + let rec force_non_blocking th = + match Atomic.get th with + | Val v -> Some v + | Failed (exn, bt) -> + raise_with_backtrace exn bt + | (Thunk ops) as thunk -> + (* [compare_and_set] returns [false] when another domain has + set the thunk to [Forcing] or a finished state. *) + if Atomic.compare_and_set th thunk (Forcing ops) + then begin + match ops.make () with | exception exn -> - let bt = get_raw_backtrace () in - let failed = Failed (exn, bt) in - (* [compare_and_set] returns [false] when another domain - has set the thunk to a finished state. We re-raise our - exception in any case to avoid losing it. *) - ignore (Atomic.compare_and_set th forcing failed); - raise_with_backtrace exn bt + let bt = get_raw_backtrace () in + let failed = Failed (exn, bt) in + (* [compare_and_set] cannot return false, as only the thread that + managed to set to forcing can try to update it again. *) + ignore (Atomic.compare_and_set th forcing failed); + ops.broadcast (); + raise_with_backtrace exn bt | v -> - (* [compare_and_set] returns [false] when another domain - has set the thunk to a finished state. In this case we - [discard] our value, and reuse the finished state. *) - if Atomic.compare_and_set th forcing (Val v) - then v - else begin - (* Exceptions from [discard] are propagated to the caller. *) - ops.discard v; - force th - end - end + (* [compare_and_set] cannot return false, as only the thread that + managed to set to forcing can try to update it again. *) + ignore (Atomic.compare_and_set th forcing (Val v)); + ops.broadcast (); + Some v + end + else force th + | (Forcing ops) as forcing -> + None end