diff --git a/Changes b/Changes index d05ccc7ffbfa..99505e09023e 100644 --- a/Changes +++ b/Changes @@ -32,6 +32,11 @@ Working version (Nicolás Ojeda Bär, review by Daniel Bünzli, Gabriel Scherer and David Allsopp) +- #14043: Lazy.Atomic_repeating.t: concurrency-safe lazy thunks that may + repeat initialization computation on forcing races. + (Gabriel Scherer, + review by KC Sivaramakrishnan and Pierre Chambart and ??) + ### Other libraries: - #14020: Add Unix.unsetenv. diff --git a/stdlib/.depend b/stdlib/.depend index 1420b0094c66..706cec86cade 100644 --- a/stdlib/.depend +++ b/stdlib/.depend @@ -481,10 +481,12 @@ stdlib__Int64.cmi : int64.mli stdlib__Lazy.cmo : lazy.ml \ stdlib__Obj.cmi \ camlinternalLazy.cmi \ + stdlib__Atomic.cmi \ stdlib__Lazy.cmi stdlib__Lazy.cmx : lazy.ml \ stdlib__Obj.cmx \ camlinternalLazy.cmx \ + stdlib__Atomic.cmx \ stdlib__Lazy.cmi stdlib__Lazy.cmi : lazy.mli \ camlinternalLazy.cmi diff --git a/stdlib/lazy.ml b/stdlib/lazy.ml index 553c0caff288..ae996d091058 100644 --- a/stdlib/lazy.ml +++ b/stdlib/lazy.ml @@ -78,3 +78,104 @@ let map_val f x = if is_val x then from_val (f (force x)) else lazy (f (force x)) + + + +module Atomic_repeating = struct + (* we define these as primitives to avoid a dependency on Printexc *) + type raw_backtrace + external get_raw_backtrace: + unit -> raw_backtrace = "caml_get_exception_raw_backtrace" + external raise_with_backtrace: exn -> raw_backtrace -> 'a + = "%raise_with_backtrace" + + type 'a ops = { + make : unit -> 'a; + wait : unit -> unit; + broadcast : unit -> unit + } + + type race_behaviour = + | Busy_wait + | Synchronise of { wait : unit -> unit; broadcast : unit -> unit } + | Fail + + type 'a state = + | Thunk of 'a ops + | Forcing of 'a ops + | Val of 'a + | Failed of exn * raw_backtrace + + type 'a t = 'a state Atomic.t + + let from_val v = Atomic.make (Val v) + let from_fun ?(race_behaviour = Fail) f = + let wait, broadcast = + match race_behaviour with + | Busy_wait -> + Fun.id, Fun.id + | Synchronise { wait; broadcast } -> wait, broadcast + | Fail -> (fun () -> raise Undefined), Fun.id + in + Atomic.make (Thunk { make = f; wait; broadcast }) + + let rec force th = + match Atomic.get th with + | Val v -> v + | Failed (exn, bt) -> + raise_with_backtrace exn bt + | (Thunk ops) as thunk -> + (* [compare_and_set] returns [false] when another domain has + set the thunk to [Forcing] or a finished state. *) + if Atomic.compare_and_set th thunk (Forcing ops) + then begin + match ops.make () with + | exception exn -> + let bt = get_raw_backtrace () in + let failed = Failed (exn, bt) in + (* [compare_and_set] cannot return false, as only the thread that + managed to set to forcing can try to update it again. *) + ignore (Atomic.compare_and_set th forcing failed); + ops.broadcast (); + raise_with_backtrace exn bt + | v -> + (* [compare_and_set] cannot return false, as only the thread that + managed to set to forcing can try to update it again. *) + ignore (Atomic.compare_and_set th forcing (Val v)); + v + end + else force th + | (Forcing ops) as forcing -> + ops.wait (); + force th + + let rec force_non_blocking th = + match Atomic.get th with + | Val v -> Some v + | Failed (exn, bt) -> + raise_with_backtrace exn bt + | (Thunk ops) as thunk -> + (* [compare_and_set] returns [false] when another domain has + set the thunk to [Forcing] or a finished state. *) + if Atomic.compare_and_set th thunk (Forcing ops) + then begin + match ops.make () with + | exception exn -> + let bt = get_raw_backtrace () in + let failed = Failed (exn, bt) in + (* [compare_and_set] cannot return false, as only the thread that + managed to set to forcing can try to update it again. *) + ignore (Atomic.compare_and_set th forcing failed); + ops.broadcast (); + raise_with_backtrace exn bt + | v -> + (* [compare_and_set] cannot return false, as only the thread that + managed to set to forcing can try to update it again. *) + ignore (Atomic.compare_and_set th forcing (Val v)); + ops.broadcast (); + Some v + end + else force th + | (Forcing ops) as forcing -> + None +end diff --git a/stdlib/lazy.mli b/stdlib/lazy.mli index 0dbdadbd30e0..2a9482e36559 100644 --- a/stdlib/lazy.mli +++ b/stdlib/lazy.mli @@ -139,3 +139,155 @@ val force_val : 'a t -> 'a @raise Undefined (see {!Undefined}). *) + +module Atomic_repeating : sig + (** Atomic, repeating deferred computations. + + This implementation is less optimized than [Lazy.t], but it can + be used in a concurrent setting. + + OCaml domains do not provide a common abstraction to block on + another computation. Forcing an [Atomic_repeating.t] thunk does + not block when races happen, instead it may repeat the + computation of the result several times. We do guarantee that + if two calls to {!force} on the same suspended computation return + a value, then they return the same value, even in presence of + forcing races. + + A typical use-case for atomic, repeating deferred computations is + optional library initialization code that is moderately + expensive, or acquires resources. The library author does not + want to do this work on startup, because it may not be needed, + but using ['a Lazy.t] is incorrect if the library may be used in + concurrent settings. ['a Lazy.Atomic_repeating.t] can be used, as + long as the fact that duplications are repeated is acceptable. + + {b Warning}: ['a Lazy.t] contains a protection against recursively + forcing a thunk, it will raise {!Undefined}. On the other hand, + ['a Lazy.Atomic_repeating.t] will recursively repeat the computation, + which may loop. + + See {{!examples} the examples} below. + *) + + type 'a t + (* A value of type ['a Lazy.Atomic_repeating.t] is similar to a value + of type ['a Lazy.t], it represents a deferred computation, but it can + safely be used in concurrent settings. + + If a calling domain attempts to {!force} a value that is already + being forced, the calling domain is not suspended. Instead, the + computation of the value will be repeated on the calling + domain. In other words, [Atomic_repeating.t] can duplicate + computations. + + The implementation ensures that all call to {!force} return the + same value or raise the same exception: if a repeated terminates + on a result, its value or exception will be discarded. + *) + + val from_val : 'a -> 'a t + (** [from_val v] is a deferred computation which is already + finished and whose result is the value [v]. *) + + val from_fun : ?discard:('a -> unit) -> (unit -> 'a) -> 'a t + (** [from_fun ?discard f] is a deferred computation that will call + [f] when forced. Note that [f] may be called several times + in the case of concurrent races. + + If [f] is called several times, one result will be stored as the + result of this computation. Other values computed concurrently + will be discarded, after being passed to the [discard] function + (a no-op by default). On the other hand, exceptions raised by + concurrent computations will be re-raised, as well as exceptions + raised by [discard]. +*) + + val force : 'a t -> 'a + (** [force x] forces the suspension [x]. If [x] has already been + forced, [Lazy.force x] returns the same value again without + recomputing it. If it raised an exception, the same exception is + raised again. + + If there is a race between several calls to [force], the + computation may be repeated several times. If some of them fail + with an exception, they will re-raise it; but all those that + return a value will return the same value. *) + + + (** {1:examples Examples} + + A typical use-case is to initialize some library-local + state that is used by library functions. + + {[ + let config = Lazy.Atomic_repeating.from_fun (fun () -> + match Sys.getenv "MYLIB_CONFIG_PATH" with + | exception _ -> Config.default () + | path -> Config.read_from_path path + ) + ]} + + The environment access and file read may be repeated several + times in the case of concurrent forcing, but the "first" + configuration to be computed will be returned by all callers. + + {3:examples_discard Using the [?discard] parameter.} + + The [?discard] argument is useful to release resources if + a repeated result is discarded. + + {[ + let log_file_and_channel = + let acquire () = + match Sys.getenv "MYLIB_LOG_PATH" with + | exception _ -> + let path, chan = Filename.open_temp_file "mylib" ".log" in + (`Temp path), chan + | path -> + let chan = Out_channel.open_bin path in + (`User path), chan + in + let discard (source, chan) = + Out_channel.close chan; + match source with + | `User _ -> () + | `Temp path -> Sys.remove path + in + Lazy.Atomic_repeating.from_fun ~discard acquire + ]} + + {3:examples_sync User synchronization} + + Users of this module can add their own synchronization logic to + avoid repeated computations. For example, in an application + which uses threads and mutex: + + {[ + let entropy = + (* we use a mibibyte of random data from /dev/urandom *) + let init_mutex = Mutex.create () in + let result = ref None in + Lazy.Atomic_repeating.from_fun (fun () -> + Mutex.protect init_mutex (fun () -> + match !result with + | Some v -> v + | None -> + let v = + In_channel.with_open_bin "/dev/urandom" (fun chan -> + In_channel.really_input_string chan (1024 * 1024) + ) + in + result := Some v; + v + ) + ) + ]} + + A program using this definition will open "/dev/urandom" at most + once. Note that the mutex is only taken on [force] calls that + happen while the initialization is not yet finished -- typically + the first call, or possibly several concurrent first calls. Once + initialization is finished, the value will be returned directly. + *) +end diff --git a/testsuite/tests/lib-lazy/repeating.ml b/testsuite/tests/lib-lazy/repeating.ml new file mode 100644 index 000000000000..2796f8d6c927 --- /dev/null +++ b/testsuite/tests/lib-lazy/repeating.ml @@ -0,0 +1,152 @@ +(* TEST + expect; +*) + +module LazyAR = Lazy.Atomic_repeating +[%%expect{| +module LazyAR = Lazy.Atomic_repeating +|}] + +(* direct value return *) +let it = + let v = LazyAR.from_val 42 in + (LazyAR.force v, LazyAR.force v) +[%%expect{| +val it : int * int = (42, 42) +|}] + +(* value return *) +let it = + let v = LazyAR.from_fun (fun () -> 43) in + (LazyAR.force v, LazyAR.force v) +[%%expect{| +val it : int * int = (43, 43) +|}] + + +(* exception case *) +let it = + let fail = LazyAR.from_fun (fun () -> raise Exit) in + let check () = match LazyAR.force fail with + | exception Exit -> true + | exception _ | _ -> false + in + check () && check () +[%%expect{| +val it : bool = true +|}] + +(* sharing check *) +let it = + let r = ref 0 in + let test = LazyAR.from_fun (fun () -> incr r) in + (* side-effects must not be repeated in sequential settings. *) + LazyAR.force test; + LazyAR.force test; + if !r = 1 then Ok () else Error !r +[%%expect{| +val it : (unit, int) result = Ok () +|}] + +(* Fake concurrency tests : we can use reentrancy to emulate concurrency. *) +let it = + let step = ref 0 in + let thunk = + let self = ref (LazyAR.from_fun (fun () -> 500)) in + self := begin + (* The first call to reach !step = 100 will finish with the value 0. + Other calls will finish with higher values, but those will be discarded. *) + let discard n = + if n = 0 then prerr_endline "Discard error!" + in + LazyAR.from_fun ~discard (fun () -> + if !step >= 100 then 0 + else (incr step; LazyAR.force !self + 1) + ) + end; + !self + in + let result1 = LazyAR.force thunk in + let result2 = LazyAR.force thunk in + if result1 = 0 && result2 = 0 && !step = 100 + then Ok () + else Error (~result1, ~result2, ~step:!step) +[%%expect{| +val it : (unit, result1:int * result2:int * step:int) result = Ok () +|}] + + +(* Check that the documentation examples are well-typed. *) +module Example1 (Config : sig + type t + val default : unit -> t + val read_from_path : string -> t +end) = struct + let config = Lazy.Atomic_repeating.from_fun (fun () -> + match Sys.getenv "MYLIB_CONFIG_PATH" with + | exception _ -> Config.default () + | path -> Config.read_from_path path + ) +end +[%%expect{| +module Example1 : + (Config : sig + type t + val default : unit -> t + val read_from_path : string -> t + end) + -> sig val config : Config.t Lazy.Atomic_repeating.t end +|}] + +module Example2 () = struct + let log_file_and_channel = + let acquire () = + match Sys.getenv "MYLIB_LOG_PATH" with + | exception _ -> + let path, chan = Filename.open_temp_file "mylib" ".log" in + (`Temp path), chan + | path -> + let chan = Out_channel.open_bin path in + (`User path), chan + in + let discard (source, chan) = + Out_channel.close chan; + match source with + | `User _ -> () + | `Temp path -> Sys.remove path + in + Lazy.Atomic_repeating.from_fun ~discard acquire +end +[%%expect{| +module Example2 : + () -> + sig + val log_file_and_channel : + ([ `Temp of string | `User of string ] * Out_channel.t) + Lazy.Atomic_repeating.t + end +|}] + +module Example3 = struct + let entropy = + (* we use a mibibyte of random data from /dev/urandom *) + let init_mutex = Mutex.create () in + let result = ref None in + Lazy.Atomic_repeating.from_fun (fun () -> + Mutex.protect init_mutex (fun () -> + match !result with + | Some v -> v + | None -> + let v = + In_channel.with_open_bin "/dev/urandom" (fun chan -> + In_channel.really_input_string chan (1024 * 1024) + ) + in + result := Some v; + v + ) + ) +end +[%%expect {| +module Example3 : sig val entropy : string option Lazy.Atomic_repeating.t end +|}]