diff --git a/src/gleam/erlang/process.gleam b/src/gleam/erlang/process.gleam index 35df254..5845b12 100644 --- a/src/gleam/erlang/process.gleam +++ b/src/gleam/erlang/process.gleam @@ -52,6 +52,8 @@ pub fn spawn(running: fn() -> anything) -> Pid @external(erlang, "proc_lib", "spawn") pub fn spawn_unlinked(a: fn() -> anything) -> Pid +type Alias + /// A `Subject` is a value that processes can use to send and receive messages /// to and from each other in a well typed way. /// @@ -78,6 +80,7 @@ pub fn spawn_unlinked(a: fn() -> anything) -> Pid pub opaque type Subject(message) { Subject(owner: Pid, tag: Dynamic) NamedSubject(name: Name(message)) + PrioritySubject(owner: Pid, alias: Alias, tag: Dynamic) } /// Create a subject for the given process with the give tag. This is unsafe! @@ -91,6 +94,27 @@ pub fn unsafely_create_subject(owner: Pid, tag: Dynamic) -> Subject(message) { Subject(owner, tag) } +@external(erlang, "gleam_erlang_ffi", "new_priority_alias") +fn new_priority_alias() -> Alias + +/// Create a subject that sends and receives priority messages. +/// Priority messages will be inserted before regular messages, +/// after other priority messages. +/// +/// Warning: OTP 28 and above is required for this feature. +/// Warning: Priority messages are intended to solve very specific problems. +/// You very seldom need to resort to usage of priority messages. Receiving +/// processes have not been optimized for handling large amounts of priority +/// messages. +/// +pub fn priority_subject() -> Subject(message) { + PrioritySubject( + owner: self(), + alias: new_priority_alias(), + tag: reference_to_dynamic(reference.new()), + ) +} + /// A name is an identity that a process can adopt, after which they will receive /// messages sent to that name. This has two main advantages: /// @@ -150,6 +174,7 @@ pub fn subject_name(subject: Subject(message)) -> Result(Name(message), Nil) { case subject { NamedSubject(name:) -> Ok(name) Subject(..) -> Error(Nil) + PrioritySubject(..) -> Error(Nil) } } @@ -169,6 +194,7 @@ pub fn subject_owner(subject: Subject(message)) -> Result(Pid, Nil) { case subject { NamedSubject(name) -> named(name) Subject(pid, _) -> Ok(pid) + PrioritySubject(owner:, ..) -> Ok(owner) } } @@ -177,6 +203,9 @@ type DoNotLeak @external(erlang, "erlang", "send") fn raw_send(a: Pid, b: message) -> DoNotLeak +@external(erlang, "gleam_erlang_ffi", "send_priority") +fn raw_send_alias(a: Alias, b: message) -> DoNotLeak + /// Send a message to a process using a `Subject`. The message must be of the /// type that the `Subject` accepts. /// @@ -220,6 +249,8 @@ pub fn send(subject: Subject(message), message: message) -> Nil { let assert Ok(pid) = named(name) as "Sending to unregistered name" raw_send(pid, #(name, message)) } + PrioritySubject(owner: _, alias:, tag:) -> + raw_send_alias(alias, #(tag, message)) } Nil } @@ -249,7 +280,7 @@ pub fn receive( within timeout: Int, ) -> Result(message, Nil) { case subject { - NamedSubject(..) -> perform_receive(subject, timeout) + NamedSubject(..) | PrioritySubject(..) -> perform_receive(subject, timeout) Subject(owner:, ..) -> case owner == self() { True -> perform_receive(subject, timeout) @@ -416,7 +447,8 @@ pub fn select_map( let handler = fn(message: #(Reference, message)) { transform(message.1) } case subject { NamedSubject(name) -> insert_selector_handler(selector, #(name, 2), handler) - Subject(_, tag:) -> insert_selector_handler(selector, #(tag, 2), handler) + Subject(_, tag:) | PrioritySubject(tag:, ..) -> + insert_selector_handler(selector, #(tag, 2), handler) } } @@ -429,7 +461,8 @@ pub fn deselect( ) -> Selector(payload) { case subject { NamedSubject(name) -> remove_selector_handler(selector, #(name, 2)) - Subject(_, tag:) -> remove_selector_handler(selector, #(tag, 2)) + Subject(_, tag:) | PrioritySubject(tag:, ..) -> + remove_selector_handler(selector, #(tag, 2)) } } @@ -752,6 +785,8 @@ pub fn send_after(subject: Subject(msg), delay: Int, message: msg) -> Timer { case subject { NamedSubject(name) -> name_send_after(delay, name, #(name, message)) Subject(owner, tag) -> pid_send_after(delay, owner, #(tag, message)) + PrioritySubject(..) -> + panic as "send_after does not support priority aliases" } } diff --git a/src/gleam_erlang_ffi.erl b/src/gleam_erlang_ffi.erl index b1ad710..e73b64e 100644 --- a/src/gleam_erlang_ffi.erl +++ b/src/gleam_erlang_ffi.erl @@ -5,7 +5,7 @@ select/2, trap_exits/1, map_selector/2, merge_selector/2, flush_messages/0, priv_directory/1, connect_node/1, register_process/2, unregister_process/1, process_named/1, identity/1, 'receive'/1, 'receive'/2, new_name/1, - cast_down_message/1, cast_exit_reason/1 + cast_down_message/1, cast_exit_reason/1, new_priority_alias/0, send_priority/2 ]). -spec atom_from_string(binary()) -> {ok, atom()} | {error, nil}. @@ -86,6 +86,10 @@ cast_down_message({'DOWN', Ref, port, Pid, Reason}) -> 'receive'({named_subject, Name}) -> receive {Name, Message} -> Message + end; +'receive'({priority_subject, _Pid, _Alias, Ref}) -> + receive + {Ref, Message} -> Message end. 'receive'({subject, _Pid, Ref}, Timeout) -> @@ -99,6 +103,12 @@ cast_down_message({'DOWN', Ref, port, Pid, Reason}) -> {Name, Message} -> {ok, Message} after Timeout -> {error, nil} + end; +'receive'({priority_subject, _Pid, _Alias, Ref}, Timeout) -> + receive + {Ref, Message} -> {ok, Message} + after Timeout -> + {error, nil} end. demonitor(Reference) -> @@ -162,3 +172,9 @@ process_named(Name) -> identity(X) -> X. + +new_priority_alias() -> + erlang:alias([priority]). + +send_priority(Alias, Message) -> + erlang:send(Alias, Message, [priority]). diff --git a/test/gleam/erlang/process_test.gleam b/test/gleam/erlang/process_test.gleam index 1a86c2b..7e7c01b 100644 --- a/test/gleam/erlang/process_test.gleam +++ b/test/gleam/erlang/process_test.gleam @@ -666,3 +666,18 @@ pub fn name_test() { let assert Ok("Hello") = process.receive(subject, 0) process.unregister(name) } + +pub fn subject_priority_test() { + let subject = process.new_subject() + let priority_subject = process.priority_subject() + + process.send(subject, 123) + process.sleep(100) + process.send(priority_subject, 321) + let selector = + process.new_selector() + |> process.select(subject) + |> process.select(priority_subject) + + assert process.selector_receive_forever(selector) == 321 +}