From cfc21c6402cef79ccb9765e2c88ea0a9c4552616 Mon Sep 17 00:00:00 2001 From: m Date: Tue, 23 Sep 2025 12:49:13 -0700 Subject: [PATCH 1/6] Err on receive() on other-owned named process & forever --- src/gleam/erlang/process.gleam | 26 +++++++++++++++++--------- test/gleam/erlang/process_test.gleam | 20 ++++++++++++++++++++ 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/gleam/erlang/process.gleam b/src/gleam/erlang/process.gleam index 7172233..dd85488 100644 --- a/src/gleam/erlang/process.gleam +++ b/src/gleam/erlang/process.gleam @@ -243,14 +243,11 @@ pub fn receive( from subject: Subject(message), within timeout: Int, ) -> Result(message, Nil) { - case subject { - NamedSubject(..) -> perform_receive(subject, timeout) - Subject(owner:, ..) -> - case owner == self() { - True -> perform_receive(subject, timeout) - False -> - panic as "Cannot receive with a subject owned by another process" - } + let self = self() + case subject_owner(subject) { + Ok(pid) if pid == self -> perform_receive(subject, timeout) + _ -> + panic as "Cannot receive with a subject owned by another process" } } @@ -263,8 +260,19 @@ fn perform_receive( /// Receive a message that has been sent to current process using the `Subject`. /// /// Same as `receive` but waits forever and returns the message as is. +pub fn receive_forever(from subject: Subject(message)) -> message { + let self = self() + case subject_owner(subject) { + Ok(pid) if pid == self -> perform_receive_forever(subject) + _ -> + panic as "Cannot receive with a subject owned by another process" + } +} + @external(erlang, "gleam_erlang_ffi", "receive") -pub fn receive_forever(from subject: Subject(message)) -> message +fn perform_receive_forever(from subject: Subject(message)) -> message + + /// A type that enables a process to wait for messages from multiple `Subject`s /// at the same time, returning whichever message arrives first. diff --git a/test/gleam/erlang/process_test.gleam b/test/gleam/erlang/process_test.gleam index 1a86c2b..c418855 100644 --- a/test/gleam/erlang/process_test.gleam +++ b/test/gleam/erlang/process_test.gleam @@ -112,6 +112,15 @@ pub fn receive_forever_test() { let assert 2 = process.receive_forever(subject) } +pub fn receive_forever_other_test() { + let subject = process.new_subject() + process.spawn(fn() { process.send(subject, process.new_subject()) }) + let subject = process.receive_forever(subject) + + assert assert_panic(fn() { process.receive_forever(subject) }) + == "Cannot receive with a subject owned by another process" +} + pub fn is_alive_test() { let pid = process.spawn_unlinked(fn() { Nil }) process.sleep(5) @@ -666,3 +675,14 @@ pub fn name_test() { let assert Ok("Hello") = process.receive(subject, 0) process.unregister(name) } + +pub fn name_other_test() { + let name = process.new_name("name") + let pid = process.spawn_unlinked(fn() { process.sleep(10) }) + let assert Ok(_) = process.register(pid, name) + let subject = process.named_subject(name) + process.send(subject, "Hello") + assert assert_panic(fn() { process.receive(subject, 5) }) + == "Cannot receive with a subject owned by another process" + process.unregister(name) +} From 0d66f8e40701a6593b3f50c177a9701be3cd6c94 Mon Sep 17 00:00:00 2001 From: m Date: Tue, 23 Sep 2025 14:05:05 -0700 Subject: [PATCH 2/6] Add even more tests surrounding named send/receive --- test/gleam/erlang/process_test.gleam | 53 ++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/test/gleam/erlang/process_test.gleam b/test/gleam/erlang/process_test.gleam index c418855..4faab9c 100644 --- a/test/gleam/erlang/process_test.gleam +++ b/test/gleam/erlang/process_test.gleam @@ -686,3 +686,56 @@ pub fn name_other_test() { == "Cannot receive with a subject owned by another process" process.unregister(name) } + + +// tests that sending/receiving on names fail if the name gets unregistered +pub fn unregister_receive_test() { + let name = process.new_name("name") + let assert Ok(_) = process.register(process.self(), name) + let subject = process.named_subject(name) + + process.send(subject, "Hello") + let assert Ok("Hello") = process.receive(subject, 0) + + let assert Ok(_) = process.unregister(name) + + assert assert_panic(fn() { process.send(subject, "Hello") }) + == "Sending to unregistered name" + assert assert_panic(fn() { process.receive(subject, 0) }) + == "Cannot receive with a subject owned by another process" // this error message could be better +} + +// tests that receiving on names fail if the registration gets moved during execution +pub fn name_other_switchover_test() { + let name = process.new_name("name") + let self = process.self() + let assert Ok(_) = process.register(self, name) + let subject = process.named_subject(name) + + assert Ok(self) == process.subject_owner(subject) + + process.send(subject, "Hello") + let assert Ok("Hello") = process.receive(subject, 0) + + let pid = process.spawn(fn() { + process.sleep(10) + let assert Ok(_) = process.unregister(name) // "undesirable behaviour" + let assert Ok(_) = process.register(process.self(), name) + + process.sleep(20) + let assert Ok("Peace") = process.receive(subject, 0) + }) + + assert Ok(self) == process.subject_owner(subject) + + process.send(subject, "World") + let assert Ok("World") = process.receive(subject, 0) + + assert Ok(self) == process.subject_owner(subject) + process.sleep(12) + assert Ok(pid) == process.subject_owner(subject) + + process.send(subject, "Peace") + assert assert_panic(fn() { process.receive(subject, 0) }) + == "Cannot receive with a subject owned by another process" +} From 3d1d8f4ca8cc126cdbdf1274e5c9a36021039677 Mon Sep 17 00:00:00 2001 From: m Date: Mon, 22 Sep 2025 16:53:10 -0700 Subject: [PATCH 3/6] Add MVP port stuff Fixes #89 port and process are still inextricably coupled, most of the complex things I would want to do already require process' recieve/subject tooling. the recursive dependencies are really a bitch too, I would much rather prefer `port` module define a lot of this stuff This integrates a lot of the port stuff into process.gleam since representing a port-based subject as PortSubject lets us do two things: - Disallow send() on PortSubjects with a panic - check subject_owner at runtime that the port subject is still the calling process --- src/gleam/erlang/port.gleam | 72 +++++++++++++++++++++++++++++ src/gleam/erlang/port/receive.gleam | 5 ++ src/gleam/erlang/process.gleam | 43 ++++++++++++++++- src/gleam_erlang_ffi.erl | 38 ++++++++++++++- test/gleam/erlang/port_test.gleam | 63 +++++++++++++++++++++++++ 5 files changed, 219 insertions(+), 2 deletions(-) create mode 100644 src/gleam/erlang/port/receive.gleam create mode 100644 test/gleam/erlang/port_test.gleam diff --git a/src/gleam/erlang/port.gleam b/src/gleam/erlang/port.gleam index 4e1b4d8..cf04b35 100644 --- a/src/gleam/erlang/port.gleam +++ b/src/gleam/erlang/port.gleam @@ -1,3 +1,5 @@ +import gleam/dynamic + /// Ports are how code running on the Erlang virtual machine interacts with /// the outside world. Bytes of data can be sent to and read from ports, /// providing a form of message passing to an external program or resource. @@ -7,3 +9,73 @@ /// [1]: https://erlang.org/doc/reference_manual/ports.html /// pub type Port + +pub type PortCommand { + Spawn(String) + SpawnDriver(String) + SpawnExecutable(String) + Fd(in: Int, out: Int) +} + +pub fn spawn(command: String) -> PortCommand { + Spawn(command) +} + +pub fn spawn_driver(command: String) -> PortCommand { + SpawnDriver(command) +} + +pub fn spawn_executable(command: String) -> PortCommand { + SpawnExecutable(command) +} + + +pub type PortOptions { + Arg0(String) + Args(List(String)) + Env(List(#(String, String))) + Cd(String) + Binary + + UseStdio + NouseStdio + + Eof + ExitStatus + + Stream + Line(Int) + Packet(Int) // only 1, 2, & 4 +} + +pub type PortError { + Badarg + SystemLimit + Enomem + Eagain + Enametoolong + Emfile + Enfile + Eaccess + Enoent +} + +@external(erlang, "gleam_erlang_ffi", "my_open_port") +pub fn open( + name: PortCommand, + settings: List(PortOptions), +) -> Result(Port, PortError) + +@external(erlang, "erlang", "port_command") +pub fn port_command( + port: Port, + data: BitArray, + options: List(PortOptions), +) -> Nil + + +@external(erlang, "gleam_erlang_ffi", "identity") +pub fn to_dynamic(a: Port) -> dynamic.Dynamic + +@external(erlang, "erlang", "port_close") +pub fn close(port: Port) -> Nil diff --git a/src/gleam/erlang/port/receive.gleam b/src/gleam/erlang/port/receive.gleam new file mode 100644 index 0000000..2df8420 --- /dev/null +++ b/src/gleam/erlang/port/receive.gleam @@ -0,0 +1,5 @@ +pub type PortData { + Data(BitArray) + ExitStatus(Int) + Eof +} diff --git a/src/gleam/erlang/process.gleam b/src/gleam/erlang/process.gleam index dd85488..21560d3 100644 --- a/src/gleam/erlang/process.gleam +++ b/src/gleam/erlang/process.gleam @@ -2,7 +2,9 @@ import gleam/dynamic.{type Dynamic} import gleam/dynamic/decode import gleam/erlang/atom.{type Atom} import gleam/erlang/port.{type Port} +import gleam/erlang/port/receive.{type PortData} import gleam/erlang/reference.{type Reference} +import gleam/result import gleam/string /// A `Pid` (or Process identifier) is a reference to an Erlang process. Each @@ -78,6 +80,8 @@ pub fn spawn_unlinked(a: fn() -> anything) -> Pid pub opaque type Subject(message) { Subject(owner: Pid, tag: Dynamic) NamedSubject(name: Name(message)) + // backup_owner needs to exist because ports close and we can't deduce the owner after a port closes, but the messages still exist in a mailbox of a pid + PortSubject(port: Port, backup_owner: Pid) } /// Create a subject for the given process with the give tag. This is unsafe! @@ -149,7 +153,7 @@ pub fn named_subject(name: Name(message)) -> Subject(message) { pub fn subject_name(subject: Subject(message)) -> Result(Name(message), Nil) { case subject { NamedSubject(name:) -> Ok(name) - Subject(..) -> Error(Nil) + _ -> Error(Nil) } } @@ -159,6 +163,31 @@ pub fn new_subject() -> Subject(message) { Subject(owner: self(), tag: reference_to_dynamic(reference.new())) } + +pub fn port_subject(port: Port) -> Subject(PortData) { + PortSubject( + port: port, + backup_owner: port_owner(port) |> result.unwrap(self()), + ) +} + +pub fn port_open(command: port.PortCommand, options: List(port.PortOptions)) -> Result(Subject(PortData), _) { + port.open(command, options) |> result.map(port_subject) +} + +@external(erlang, "erlang", "port_connect") +fn perform_port_connect(port: Port, pid: Pid) -> Nil + +pub fn port_connect(port: Port, pid: Pid) -> Subject(PortData) { + perform_port_connect(port, pid) + PortSubject(port, backup_owner: pid) +} + +/// Returns Error if the port is already closed +/// if possible, try to use subject_owner() +@external(erlang, "gleam_erlang_ffi", "port_owner") +pub fn port_owner(port: Port) -> Result(Pid, Nil) + /// Get the owner process for a subject, which is the process that will /// receive any messages sent using the subject. /// @@ -169,6 +198,14 @@ pub fn subject_owner(subject: Subject(message)) -> Result(Pid, Nil) { case subject { NamedSubject(name) -> named(name) Subject(pid, _) -> Ok(pid) + PortSubject(port, backup_owner) -> Ok(port_owner(port) |> result.unwrap(backup_owner)) + } +} + +pub fn subject_port(subject: Subject(message)) -> Result(Port, Nil) { + case subject { + PortSubject(port, ..) -> Ok(port) + _ -> Error(Nil) } } @@ -215,6 +252,7 @@ pub fn send(subject: Subject(message), message: message) -> Nil { let assert Ok(pid) = named(name) as "Sending to unregistered name" raw_send(pid, #(name, message)) } + PortSubject(..) -> panic as "Cannot send on PortSubject" } Nil } @@ -420,6 +458,7 @@ pub fn select_map( case subject { NamedSubject(name) -> insert_selector_handler(selector, #(name, 2), handler) Subject(_, tag:) -> insert_selector_handler(selector, #(tag, 2), handler) + PortSubject(port:, ..) -> insert_selector_handler(selector, #(port, 2), handler) } } @@ -433,6 +472,7 @@ pub fn deselect( case subject { NamedSubject(name) -> remove_selector_handler(selector, #(name, 2)) Subject(_, tag:) -> remove_selector_handler(selector, #(tag, 2)) + PortSubject(port:, ..) -> remove_selector_handler(selector, #(port, 2)) } } @@ -755,6 +795,7 @@ pub fn send_after(subject: Subject(msg), delay: Int, message: msg) -> Timer { case subject { NamedSubject(name) -> name_send_after(delay, name, #(name, message)) Subject(owner, tag) -> pid_send_after(delay, owner, #(tag, message)) + PortSubject(..) -> panic as "Cannot send on PortSubject" } } diff --git a/src/gleam_erlang_ffi.erl b/src/gleam_erlang_ffi.erl index b1ad710..c331f17 100644 --- a/src/gleam_erlang_ffi.erl +++ b/src/gleam_erlang_ffi.erl @@ -5,7 +5,7 @@ select/2, trap_exits/1, map_selector/2, merge_selector/2, flush_messages/0, priv_directory/1, connect_node/1, register_process/2, unregister_process/1, process_named/1, identity/1, 'receive'/1, 'receive'/2, new_name/1, - cast_down_message/1, cast_exit_reason/1 + cast_down_message/1, cast_exit_reason/1, my_open_port/2, port_owner/1 ]). -spec atom_from_string(binary()) -> {ok, atom()} | {error, nil}. @@ -83,6 +83,10 @@ cast_down_message({'DOWN', Ref, port, Pid, Reason}) -> receive {Ref, Message} -> Message end; +'receive'({port_subject, Port, _Pid}) -> + receive + {Port, Message} -> Message + end; 'receive'({named_subject, Name}) -> receive {Name, Message} -> Message @@ -94,6 +98,12 @@ cast_down_message({'DOWN', Ref, port, Pid, Reason}) -> after Timeout -> {error, nil} end; +'receive'({port_subject, Port, _Pid}, Timeout) -> + receive + {Port, Message} -> {ok, Message} + after Timeout -> + {error, nil} + end; 'receive'({named_subject, Name}, Timeout) -> receive {Name, Message} -> {ok, Message} @@ -101,6 +111,32 @@ cast_down_message({'DOWN', Ref, port, Pid, Reason}) -> {error, nil} end. +% sometimes this errors out if the port is already closed +port_owner(Port) -> + case erlang:port_info(Port, connected) of + {connected, Pid} -> {ok, Pid}; + undefined -> {error, nil} + end. + +my_open_port(PortName, PortSettings) -> + try + Port = open_port(PortName, PortSettings), + {ok, Port} + catch error:Reason -> + case Reason of + badarg -> {error, badarg}; + system_limit -> {error, system_limit}; + enomem -> {error, enomem}; + eagain -> {error, eagain}; + enametoolong -> {error, enametoolong}; + emfile -> {error, emfile}; + enfile -> {error, enfile}; + eaccess -> {error, eaccess}; + enoent -> {error, enoent} + end + end. + + demonitor(Reference) -> erlang:demonitor(Reference, [flush]). diff --git a/test/gleam/erlang/port_test.gleam b/test/gleam/erlang/port_test.gleam new file mode 100644 index 0000000..28f9460 --- /dev/null +++ b/test/gleam/erlang/port_test.gleam @@ -0,0 +1,63 @@ +import gleam/erlang/port +import gleam/erlang/port/receive.{Data, ExitStatus} +import gleam/erlang/process.{receive, port_open, port_subject} + +pub fn port_open_test() { + let assert Ok(port) = port.open( + port.spawn_executable("/bin/bash"), + [port.Args(["-c", "echo hi | wc -c"]), port.UseStdio, port.ExitStatus, port.Binary] + ) + + let subject = port_subject(port) + + assert Ok(Data(<<"3\n">>)) == receive(subject, 100) + assert Ok(ExitStatus(0)) == receive(subject, 100) + assert Error(Nil) == receive(subject, 100) +} + +pub fn fail_port_open_test() { + let assert Error(port.Enoent) = port.open( + port.spawn_executable("/bin/doesntexist"), + [] + ) +} + +pub fn port_subject_test() { + let assert Ok(subject) = port_open( + port.spawn_executable("/bin/bash"), + [port.Args(["-c", "echo hello | wc -c"]), port.UseStdio, port.ExitStatus, port.Binary] + ) + + assert Ok(Data(<<"6\n">>)) == receive(subject, 100) + assert Ok(ExitStatus(0)) == receive(subject, 100) + assert Error(Nil) == receive(subject, 100) +} + +pub fn close_port_test() { + let assert Ok(port) = port.open( + port.spawn_executable("/bin/bash"), + [port.Args(["-c", "echo hi; sleep 0.25; echo yo 2> /dev/null"]), port.UseStdio, port.ExitStatus, port.Binary] + ) + + let subject = port_subject(port) + + assert Ok(Data(<<"hi\n">>)) == receive(subject, 100) + + port.close(port) + + assert Error(Nil) == receive(subject, 300) +} + +pub fn delay_port_test() { + let assert Ok(port) = port.open( + port.spawn_executable("/bin/bash"), + [port.Args(["-c", "echo hi; sleep 0.25; echo yo 2> /dev/null"]), port.UseStdio, port.ExitStatus, port.Binary] + ) + + let subject = port_subject(port) + assert Ok(Data(<<"hi\n">>)) == receive(subject, 50) + assert Error(Nil) == receive(subject, 100) + assert Ok(Data(<<"yo\n">>)) == receive(subject, 300) + assert Ok(ExitStatus(0)) == receive(subject, 100) + assert Error(Nil) == receive(subject, 100) +} From 692529972253bc2d9dcc5c1e4542aa566ae74260 Mon Sep 17 00:00:00 2001 From: m Date: Tue, 23 Sep 2025 14:33:04 -0700 Subject: [PATCH 4/6] Add a test for port ownership switchover --- test/gleam/erlang/port_test.gleam | 43 ++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/test/gleam/erlang/port_test.gleam b/test/gleam/erlang/port_test.gleam index 28f9460..f8cc6d5 100644 --- a/test/gleam/erlang/port_test.gleam +++ b/test/gleam/erlang/port_test.gleam @@ -1,6 +1,9 @@ import gleam/erlang/port import gleam/erlang/port/receive.{Data, ExitStatus} -import gleam/erlang/process.{receive, port_open, port_subject} +import gleam/erlang/process.{receive, port_open, port_connect, port_subject} + +@external(erlang, "gleam_erlang_test_ffi", "assert_gleam_panic") +fn assert_panic(f: fn() -> t) -> String pub fn port_open_test() { let assert Ok(port) = port.open( @@ -61,3 +64,41 @@ pub fn delay_port_test() { assert Ok(ExitStatus(0)) == receive(subject, 100) assert Error(Nil) == receive(subject, 100) } + +// mirrors name_other_switchover_test in process_test a bit +pub fn port_switchover_test() { + // create a new port + let assert Ok(port) = port.open( + port.spawn_executable("/bin/bash"), + [port.Args(["-c", "cat"]), port.UseStdio, port.ExitStatus, port.Binary] + ) + + let subject = process.port_subject(port) + + // verify we can listen on it + port.port_command(port, <<"wasd\n">>, []) + assert Ok(Data(<<"wasd\n">>)) == receive(subject, 5) + + // switch the owner to some other erlang process + let pid = process.spawn(fn() { process.sleep(20) }) + let newsubject = port_connect(port, pid) + + // try to listen on it again + port.port_command(port, <<"test2\n">>, []) + assert assert_panic(fn() { process.receive(newsubject, 0) }) + == "Cannot receive with a subject owned by another process" + // test the original subject, too + assert assert_panic(fn() { process.receive(subject, 0) }) + == "Cannot receive with a subject owned by another process" + + // let's switch back now + let newnewsubject = port_connect(port, process.self()) + port.port_command(port, <<"test3\n">>, []) + assert Ok(Data(<<"test3\n">>)) == receive(subject, 5) + port.port_command(port, <<"test4\n">>, []) + assert Ok(Data(<<"test4\n">>)) == receive(newsubject, 5) + port.port_command(port, <<"test5\n">>, []) + assert Ok(Data(<<"test5\n">>)) == receive(newnewsubject, 5) + + port.close(port) +} From cb0a12077d0023b9edf23f09cebc1d1672eb2fcc Mon Sep 17 00:00:00 2001 From: m Date: Tue, 23 Sep 2025 15:09:13 -0700 Subject: [PATCH 5/6] Add test for verifying send isn't valid on port --- test/gleam/erlang/port_test.gleam | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/gleam/erlang/port_test.gleam b/test/gleam/erlang/port_test.gleam index f8cc6d5..77cf2be 100644 --- a/test/gleam/erlang/port_test.gleam +++ b/test/gleam/erlang/port_test.gleam @@ -102,3 +102,19 @@ pub fn port_switchover_test() { port.close(port) } + + +pub fn port_subject_send_test() { + let assert Ok(port) = port.open( + port.spawn_executable("/bin/bash"), + [port.Args(["-c", "cat"]), port.UseStdio, port.ExitStatus, port.Binary] + ) + + let subject = port_subject(port) + + + assert assert_panic(fn() { process.send(subject, Data(<<"hello">>)) }) + == "Cannot send on PortSubject" + + port.close(port) +} From f38a490cf5295b71dfeeb744632f06fb43354f3d Mon Sep 17 00:00:00 2001 From: m Date: Tue, 23 Sep 2025 18:27:40 -0700 Subject: [PATCH 6/6] env takes charlist --- src/gleam/erlang/port.gleam | 3 ++- test/gleam/erlang/port_test.gleam | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/gleam/erlang/port.gleam b/src/gleam/erlang/port.gleam index cf04b35..ba7f36c 100644 --- a/src/gleam/erlang/port.gleam +++ b/src/gleam/erlang/port.gleam @@ -1,4 +1,5 @@ import gleam/dynamic +import gleam/erlang/charlist.{type Charlist} /// Ports are how code running on the Erlang virtual machine interacts with /// the outside world. Bytes of data can be sent to and read from ports, @@ -33,7 +34,7 @@ pub fn spawn_executable(command: String) -> PortCommand { pub type PortOptions { Arg0(String) Args(List(String)) - Env(List(#(String, String))) + Env(List(#(Charlist, Charlist))) Cd(String) Binary diff --git a/test/gleam/erlang/port_test.gleam b/test/gleam/erlang/port_test.gleam index 77cf2be..a8fa113 100644 --- a/test/gleam/erlang/port_test.gleam +++ b/test/gleam/erlang/port_test.gleam @@ -1,6 +1,7 @@ import gleam/erlang/port import gleam/erlang/port/receive.{Data, ExitStatus} import gleam/erlang/process.{receive, port_open, port_connect, port_subject} +import gleam/erlang/charlist @external(erlang, "gleam_erlang_test_ffi", "assert_gleam_panic") fn assert_panic(f: fn() -> t) -> String @@ -18,6 +19,21 @@ pub fn port_open_test() { assert Error(Nil) == receive(subject, 100) } +pub fn port_env_open_test() { + let assert Ok(port) = port.open( + port.spawn_executable("/bin/bash"), + [ + port.Args(["-c", "echo $VAR"]), + port.Env([#(charlist.from_string("VAR"), charlist.from_string("test123"))]), + port.UseStdio, port.ExitStatus, port.Binary + ] + ) + + let subject = port_subject(port) + + assert Ok(Data(<<"test123\n">>)) == receive(subject, 100) +} + pub fn fail_port_open_test() { let assert Error(port.Enoent) = port.open( port.spawn_executable("/bin/doesntexist"),