Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ Working version
(Nicolás Ojeda Bär, review by Daniel Bünzli, Gabriel Scherer and David
Allsopp)

- #14043: Lazy.Atomic_repeating.t: concurrency-safe lazy thunks that may
repeat initialization computation on forcing races.
(Gabriel Scherer,
review by KC Sivaramakrishnan and Pierre Chambart and ??)

### Other libraries:

- #14020: Add Unix.unsetenv.
Expand Down
2 changes: 2 additions & 0 deletions stdlib/.depend
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,12 @@ stdlib__Int64.cmi : int64.mli
stdlib__Lazy.cmo : lazy.ml \
stdlib__Obj.cmi \
camlinternalLazy.cmi \
stdlib__Atomic.cmi \
stdlib__Lazy.cmi
stdlib__Lazy.cmx : lazy.ml \
stdlib__Obj.cmx \
camlinternalLazy.cmx \
stdlib__Atomic.cmx \
stdlib__Lazy.cmi
stdlib__Lazy.cmi : lazy.mli \
camlinternalLazy.cmi
Expand Down
101 changes: 101 additions & 0 deletions stdlib/lazy.ml
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,104 @@ let map_val f x =
if is_val x
then from_val (f (force x))
else lazy (f (force x))



module Atomic_repeating = struct
  (* Concurrency-safe deferred computations.

     A suspension is an atomic cell holding one of four states:
     - [Thunk ops]:   not started yet;
     - [Forcing ops]: one caller has claimed the computation and is
                      running [ops.make];
     - [Val v]:       finished with value [v];
     - [Failed _]:    finished by raising an exception.

     Only the caller that wins the [Thunk -> Forcing] compare-and-set
     runs [ops.make] and publishes the final state. What the other
     callers do while the cell is [Forcing] is decided by the
     [race_behaviour] given to [from_fun]. *)

  (* we define these as primitives to avoid a dependency on Printexc *)
  type raw_backtrace
  external get_raw_backtrace:
    unit -> raw_backtrace = "caml_get_exception_raw_backtrace"
  external raise_with_backtrace: exn -> raw_backtrace -> 'a
    = "%raise_with_backtrace"

  (* Operations attached to an unfinished suspension:
     [make] computes the result, [wait] is called by [force] when it
     finds the cell in the [Forcing] state, [broadcast] is called by the
     computing caller right after it has published the final state. *)
  type 'a ops = {
    make : unit -> 'a;
    wait : unit -> unit;
    broadcast : unit -> unit
  }

  (* What [force] does when another caller is already computing:
     - [Busy_wait]: spin, re-reading the cell until it is finished;
     - [Synchronise]: call the user-provided [wait], and have the
       computing caller call [broadcast] once the result is published.
       The cell is not re-checked between reading [Forcing] and calling
       [wait], so [wait] must tolerate a [broadcast] that has already
       happened;
     - [Fail]: raise [Undefined]. *)
  type race_behaviour =
    | Busy_wait
    | Synchronise of { wait : unit -> unit; broadcast : unit -> unit }
    | Fail

  type 'a state =
    | Thunk of 'a ops
    | Forcing of 'a ops
    | Val of 'a
    | Failed of exn * raw_backtrace

  type 'a t = 'a state Atomic.t

  (* [from_val v] is an already-finished suspension holding [v]. *)
  let from_val v = Atomic.make (Val v)

  (* [from_fun ?race_behaviour f] is a suspension that runs [f] when it
     is first forced. [race_behaviour] defaults to [Fail]. *)
  let from_fun ?(race_behaviour = Fail) f =
    (* A local no-op rather than [Fun.id], so that this module does not
       depend on [Fun]. *)
    let nop () = () in
    let wait, broadcast =
      match race_behaviour with
      | Busy_wait -> nop, nop
      | Synchronise { wait; broadcast } -> wait, broadcast
      | Fail -> (fun () -> raise Undefined), nop
    in
    Atomic.make (Thunk { make = f; wait; broadcast })

  (* [compute_and_publish th ops] runs [ops.make ()], stores the outcome
     in [th], calls [ops.broadcast ()] and then returns the value or
     re-raises the exception with its original backtrace.

     Must only be called by the caller that moved [th] from [Thunk] to
     [Forcing]: that caller is the only one allowed to leave the
     [Forcing] state, so a plain [Atomic.set] is enough here. *)
  let compute_and_publish th ops =
    match ops.make () with
    | v ->
        Atomic.set th (Val v);
        ops.broadcast ();
        v
    | exception exn ->
        (* Capture the backtrace first, before any other call can
           overwrite it. *)
        let bt = get_raw_backtrace () in
        Atomic.set th (Failed (exn, bt));
        ops.broadcast ();
        raise_with_backtrace exn bt

  (* [force th] returns the value of [th], computing it if nobody has
     started yet. If the computation raised, the same exception is
     re-raised with the stored backtrace. If another caller is currently
     computing, [ops.wait ()] is called and the cell is examined again. *)
  let rec force th =
    match Atomic.get th with
    | Val v -> v
    | Failed (exn, bt) ->
        raise_with_backtrace exn bt
    | Forcing ops ->
        ops.wait ();
        force th
    | (Thunk ops) as thunk ->
        (* [compare_and_set] compares physically against the state we
           just read; it returns [false] when another caller has set
           the cell to [Forcing] or to a finished state meanwhile. *)
        if Atomic.compare_and_set th thunk (Forcing ops)
        then compute_and_publish th ops
        else force th

  (* [force_non_blocking th] is like [force th], but returns [None]
     instead of waiting when another caller is currently computing the
     result. It returns [Some v] once a value is available and re-raises
     a stored exception. *)
  let rec force_non_blocking th =
    match Atomic.get th with
    | Val v -> Some v
    | Failed (exn, bt) ->
        raise_with_backtrace exn bt
    | Forcing _ ->
        None
    | (Thunk ops) as thunk ->
        if Atomic.compare_and_set th thunk (Forcing ops)
        then Some (compute_and_publish th ops)
        else
          (* We lost the race: look at the new state, still without
             blocking. *)
          force_non_blocking th
end
152 changes: 152 additions & 0 deletions stdlib/lazy.mli
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,155 @@ val force_val : 'a t -> 'a

@raise Undefined (see {!Undefined}).
*)

module Atomic_repeating : sig
(** Atomic, repeating deferred computations.

This implementation is less optimized than [Lazy.t], but it can
be used in a concurrent setting.

OCaml domains do not provide a common abstraction to block on
another computation. Forcing an [Atomic_repeating.t] thunk does
not block when races happen; instead it may repeat the
computation of the result several times. We do guarantee that
if two calls to {!force} on the same suspended computation return
a value, then they return the same value, even in the presence of
forcing races.

A typical use-case for atomic, repeating deferred computations is
optional library initialization code that is moderately
expensive, or acquires resources. The library author does not
want to do this work on startup, because it may not be needed,
but using ['a Lazy.t] is incorrect if the library may be used in
concurrent settings. ['a Lazy.Atomic_repeating.t] can be used, as
long as it is acceptable for the computation to be repeated.

{b Warning}: ['a Lazy.t] contains a protection against recursively
forcing a thunk: it will raise {!Undefined}. On the other hand,
['a Lazy.Atomic_repeating.t] will recursively repeat the computation,
which may loop.

See {{!examples} the examples} below.
*)

(* NOTE(review): this signature and its documentation describe a
   [?discard]-based, repeating design. The [Atomic_repeating] structure
   in lazy.ml instead defines [from_fun ?race_behaviour] and an extra
   [force_non_blocking] function, and has no [discard] parameter.
   One of the two sides needs to be updated; confirm which design is
   intended before merging. *)

type 'a t
(** A value of type ['a Lazy.Atomic_repeating.t] is similar to a value
of type ['a Lazy.t]: it represents a deferred computation, but it can
safely be used in concurrent settings.

If a calling domain attempts to {!force} a value that is already
being forced, the calling domain is not suspended. Instead, the
computation of the value will be repeated on the calling
domain. In other words, [Atomic_repeating.t] can duplicate
computations.

The implementation ensures that all calls to {!force} return the
same value or raise the same exception: if a repeated computation
terminates with a result, its value or exception will be discarded.
*)

val from_val : 'a -> 'a t
(** [from_val v] is a deferred computation which is already
finished and whose result is the value [v]. *)

val from_fun : ?discard:('a -> unit) -> (unit -> 'a) -> 'a t
(** [from_fun ?discard f] is a deferred computation that will call
[f] when forced. Note that [f] may be called several times
in the case of concurrent races.

If [f] is called several times, one result will be stored as the
result of this computation. Other values computed concurrently
will be discarded, after being passed to the [discard] function
(a no-op by default). On the other hand, exceptions raised by
concurrent computations will be re-raised, as well as exceptions
raised by [discard].
*)

val force : 'a t -> 'a
(** [force x] forces the suspension [x]. If [x] has already been
forced, [force x] returns the same value again without
recomputing it. If it raised an exception, the same exception is
raised again.

If there is a race between several calls to [force], the
computation may be repeated several times. If some of them fail
with an exception, they will re-raise it; but all those that
return a value will return the same value. *)


(** {1:examples Examples}

A typical use-case is to initialize some library-local
state that is used by library functions.

{[
  let config = Lazy.Atomic_repeating.from_fun (fun () ->
    match Sys.getenv "MYLIB_CONFIG_PATH" with
    | exception _ -> Config.default ()
    | path -> Config.read_from_path path
  )
]}

The environment access and file read may be repeated several
times in the case of concurrent forcing, but the "first"
configuration to be computed will be returned by all callers.

{3:examples_discard Using the [?discard] parameter.}

The [?discard] argument is useful to release resources if
a repeated result is discarded.

{[
  let log_file_and_channel =
    let acquire () =
      match Sys.getenv "MYLIB_LOG_PATH" with
      | exception _ ->
        let path, chan = Filename.open_temp_file "mylib" ".log" in
        (`Temp path), chan
      | path ->
        let chan = Out_channel.open_bin path in
        (`User path), chan
    in
    let discard (source, chan) =
      Out_channel.close chan;
      match source with
      | `User _ -> ()
      | `Temp path -> Sys.remove path
    in
    Lazy.Atomic_repeating.from_fun ~discard acquire
]}

{3:examples_sync User synchronization}

Users of this module can add their own synchronization logic to
avoid repeated computations. For example, in an application
which uses threads and mutexes:

{[
  let entropy =
    (* we use a mebibyte of random data from /dev/urandom *)
    let init_mutex = Mutex.create () in
    let result = ref None in
    Lazy.Atomic_repeating.from_fun (fun () ->
      Mutex.protect init_mutex (fun () ->
        match !result with
        | Some v -> v
        | None ->
          let v =
            In_channel.with_open_bin "/dev/urandom" (fun chan ->
              In_channel.really_input_string chan (1024 * 1024)
            )
          in
          result := Some v;
          v
      )
    )
]}

A program using this definition will open "/dev/urandom" at most
once. Note that the mutex is only taken on [force] calls that
happen while the initialization is not yet finished -- typically
the first call, or possibly several concurrent first calls. Once
initialization is finished, the value will be returned directly.
*)
end
Loading