Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions src/gleam/erlang/process.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,11 @@ pub fn receive(
from subject: Subject(message),
within timeout: Int,
) -> Result(message, Nil) {
case subject {
NamedSubject(..) -> perform_receive(subject, timeout)
Subject(owner:, ..) ->
case owner == self() {
True -> perform_receive(subject, timeout)
False ->
panic as "Cannot receive with a subject owned by another process"
}
let self = self()
case subject_owner(subject) {
Ok(pid) if pid == self -> perform_receive(subject, timeout)
_ ->
panic as "Cannot receive with a subject owned by another process"
}
}

Expand All @@ -263,8 +260,19 @@ fn perform_receive(
/// Receive a message that has been sent to current process using the `Subject`.
///
/// Same as `receive` but waits forever and returns the message as is.
pub fn receive_forever(from subject: Subject(message)) -> message {
let self = self()
case subject_owner(subject) {
Ok(pid) if pid == self -> perform_receive_forever(subject)
_ ->
panic as "Cannot receive with a subject owned by another process"
}
}

@external(erlang, "gleam_erlang_ffi", "receive")
pub fn receive_forever(from subject: Subject(message)) -> message
fn perform_receive_forever(from subject: Subject(message)) -> message



/// A type that enables a process to wait for messages from multiple `Subject`s
/// at the same time, returning whichever message arrives first.
Expand Down
73 changes: 73 additions & 0 deletions test/gleam/erlang/process_test.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ pub fn receive_forever_test() {
let assert 2 = process.receive_forever(subject)
}

pub fn receive_forever_other_test() {
let subject = process.new_subject()
process.spawn(fn() { process.send(subject, process.new_subject()) })
let subject = process.receive_forever(subject)

assert assert_panic(fn() { process.receive_forever(subject) })
== "Cannot receive with a subject owned by another process"
}

pub fn is_alive_test() {
let pid = process.spawn_unlinked(fn() { Nil })
process.sleep(5)
Expand Down Expand Up @@ -666,3 +675,67 @@ pub fn name_test() {
let assert Ok("Hello") = process.receive(subject, 0)
process.unregister(name)
}

pub fn name_other_test() {
let name = process.new_name("name")
let pid = process.spawn_unlinked(fn() { process.sleep(10) })
let assert Ok(_) = process.register(pid, name)
let subject = process.named_subject(name)
process.send(subject, "Hello")
assert assert_panic(fn() { process.receive(subject, 5) })
== "Cannot receive with a subject owned by another process"
process.unregister(name)
}


// tests that sending/receiving on names fail if the name gets unregistered
pub fn unregister_receive_test() {
let name = process.new_name("name")
let assert Ok(_) = process.register(process.self(), name)
let subject = process.named_subject(name)

process.send(subject, "Hello")
let assert Ok("Hello") = process.receive(subject, 0)

let assert Ok(_) = process.unregister(name)

assert assert_panic(fn() { process.send(subject, "Hello") })
== "Sending to unregistered name"
assert assert_panic(fn() { process.receive(subject, 0) })
== "Cannot receive with a subject owned by another process" // this error message could be better
}

// tests that receiving on names fail if the registration gets moved during execution
pub fn name_other_switchover_test() {
let name = process.new_name("name")
let self = process.self()
let assert Ok(_) = process.register(self, name)
let subject = process.named_subject(name)

assert Ok(self) == process.subject_owner(subject)

process.send(subject, "Hello")
let assert Ok("Hello") = process.receive(subject, 0)

let pid = process.spawn(fn() {
process.sleep(10)
let assert Ok(_) = process.unregister(name) // "undesirable behaviour"
let assert Ok(_) = process.register(process.self(), name)

process.sleep(20)
let assert Ok("Peace") = process.receive(subject, 0)
})

assert Ok(self) == process.subject_owner(subject)

process.send(subject, "World")
let assert Ok("World") = process.receive(subject, 0)

assert Ok(self) == process.subject_owner(subject)
process.sleep(12)
assert Ok(pid) == process.subject_owner(subject)

process.send(subject, "Peace")
assert assert_panic(fn() { process.receive(subject, 0) })
== "Cannot receive with a subject owned by another process"
}
Loading