Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions src/gleam/erlang/process.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub fn spawn(running: fn() -> anything) -> Pid
@external(erlang, "proc_lib", "spawn")
pub fn spawn_unlinked(a: fn() -> anything) -> Pid

type Alias

/// A `Subject` is a value that processes can use to send and receive messages
/// to and from each other in a well typed way.
///
Expand All @@ -78,6 +80,7 @@ pub fn spawn_unlinked(a: fn() -> anything) -> Pid
pub opaque type Subject(message) {
Subject(owner: Pid, tag: Dynamic)
NamedSubject(name: Name(message))
PrioritySubject(owner: Pid, alias: Alias, tag: Dynamic)
}

/// Create a subject for the given process with the give tag. This is unsafe!
Expand All @@ -91,6 +94,27 @@ pub fn unsafely_create_subject(owner: Pid, tag: Dynamic) -> Subject(message) {
Subject(owner, tag)
}

@external(erlang, "gleam_erlang_ffi", "new_priority_alias")
fn new_priority_alias() -> Alias

/// Create a subject that sends and receives priority messages.
/// Priority messages will be inserted before regular messages,
/// after other priority messages.
///
/// Warning: OTP 28 and above is required for this feature.
/// Warning: Priority messages are intended to solve very specific problems.
/// You very seldom need to resort to usage of priority messages. Receiving
/// processes have not been optimized for handling large amounts of priority
/// messages.
///
pub fn priority_subject() -> Subject(message) {
PrioritySubject(
owner: self(),
alias: new_priority_alias(),
tag: reference_to_dynamic(reference.new()),
)
}

/// A name is an identity that a process can adopt, after which they will receive
/// messages sent to that name. This has two main advantages:
///
Expand Down Expand Up @@ -150,6 +174,7 @@ pub fn subject_name(subject: Subject(message)) -> Result(Name(message), Nil) {
case subject {
NamedSubject(name:) -> Ok(name)
Subject(..) -> Error(Nil)
PrioritySubject(..) -> Error(Nil)
}
}

Expand All @@ -169,6 +194,7 @@ pub fn subject_owner(subject: Subject(message)) -> Result(Pid, Nil) {
case subject {
NamedSubject(name) -> named(name)
Subject(pid, _) -> Ok(pid)
PrioritySubject(owner:, ..) -> Ok(owner)
}
}

Expand All @@ -177,6 +203,9 @@ type DoNotLeak
@external(erlang, "erlang", "send")
fn raw_send(a: Pid, b: message) -> DoNotLeak

@external(erlang, "gleam_erlang_ffi", "send_priority")
fn raw_send_alias(a: Alias, b: message) -> DoNotLeak

/// Send a message to a process using a `Subject`. The message must be of the
/// type that the `Subject` accepts.
///
Expand Down Expand Up @@ -220,6 +249,8 @@ pub fn send(subject: Subject(message), message: message) -> Nil {
let assert Ok(pid) = named(name) as "Sending to unregistered name"
raw_send(pid, #(name, message))
}
PrioritySubject(owner: _, alias:, tag:) ->
raw_send_alias(alias, #(tag, message))
}
Nil
}
Expand Down Expand Up @@ -249,7 +280,7 @@ pub fn receive(
within timeout: Int,
) -> Result(message, Nil) {
case subject {
NamedSubject(..) -> perform_receive(subject, timeout)
NamedSubject(..) | PrioritySubject(..) -> perform_receive(subject, timeout)
Subject(owner:, ..) ->
case owner == self() {
True -> perform_receive(subject, timeout)
Expand Down Expand Up @@ -416,7 +447,8 @@ pub fn select_map(
let handler = fn(message: #(Reference, message)) { transform(message.1) }
case subject {
NamedSubject(name) -> insert_selector_handler(selector, #(name, 2), handler)
Subject(_, tag:) -> insert_selector_handler(selector, #(tag, 2), handler)
Subject(_, tag:) | PrioritySubject(tag:, ..) ->
insert_selector_handler(selector, #(tag, 2), handler)
}
}

Expand All @@ -429,7 +461,8 @@ pub fn deselect(
) -> Selector(payload) {
case subject {
NamedSubject(name) -> remove_selector_handler(selector, #(name, 2))
Subject(_, tag:) -> remove_selector_handler(selector, #(tag, 2))
Subject(_, tag:) | PrioritySubject(tag:, ..) ->
remove_selector_handler(selector, #(tag, 2))
}
}

Expand Down Expand Up @@ -752,6 +785,8 @@ pub fn send_after(subject: Subject(msg), delay: Int, message: msg) -> Timer {
case subject {
NamedSubject(name) -> name_send_after(delay, name, #(name, message))
Subject(owner, tag) -> pid_send_after(delay, owner, #(tag, message))
PrioritySubject(..) ->
panic as "send_after does not support priority aliases"
}
}

Expand Down
18 changes: 17 additions & 1 deletion src/gleam_erlang_ffi.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
select/2, trap_exits/1, map_selector/2, merge_selector/2, flush_messages/0,
priv_directory/1, connect_node/1, register_process/2, unregister_process/1,
process_named/1, identity/1, 'receive'/1, 'receive'/2, new_name/1,
cast_down_message/1, cast_exit_reason/1
cast_down_message/1, cast_exit_reason/1, new_priority_alias/0, send_priority/2
]).

-spec atom_from_string(binary()) -> {ok, atom()} | {error, nil}.
Expand Down Expand Up @@ -86,6 +86,10 @@ cast_down_message({'DOWN', Ref, port, Pid, Reason}) ->
'receive'({named_subject, Name}) ->
receive
{Name, Message} -> Message
end;
'receive'({priority_subject, _Pid, _Alias, Ref}) ->
receive
{Ref, Message} -> Message
end.

'receive'({subject, _Pid, Ref}, Timeout) ->
Expand All @@ -99,6 +103,12 @@ cast_down_message({'DOWN', Ref, port, Pid, Reason}) ->
{Name, Message} -> {ok, Message}
after Timeout ->
{error, nil}
end;
'receive'({priority_subject, _Pid, _Alias, Ref}, Timeout) ->
receive
{Ref, Message} -> {ok, Message}
after Timeout ->
{error, nil}
end.

demonitor(Reference) ->
Expand Down Expand Up @@ -162,3 +172,9 @@ process_named(Name) ->

identity(X) ->
X.

new_priority_alias() ->
erlang:alias([priority]).

send_priority(Alias, Message) ->
erlang:send(Alias, Message, [priority]).
15 changes: 15 additions & 0 deletions test/gleam/erlang/process_test.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -666,3 +666,18 @@ pub fn name_test() {
let assert Ok("Hello") = process.receive(subject, 0)
process.unregister(name)
}

pub fn subject_priority_test() {
let subject = process.new_subject()
let priority_subject = process.priority_subject()

process.send(subject, 123)
process.sleep(100)
process.send(priority_subject, 321)
let selector =
process.new_selector()
|> process.select(subject)
|> process.select(priority_subject)

assert process.selector_receive_forever(selector) == 321
}