diff --git a/src/gleam/erlang/port.gleam b/src/gleam/erlang/port.gleam index 4e1b4d8..ba7f36c 100644 --- a/src/gleam/erlang/port.gleam +++ b/src/gleam/erlang/port.gleam @@ -1,3 +1,6 @@ +import gleam/dynamic +import gleam/erlang/charlist.{type Charlist} + /// Ports are how code running on the Erlang virtual machine interacts with /// the outside world. Bytes of data can be sent to and read from ports, /// providing a form of message passing to an external program or resource. @@ -7,3 +10,73 @@ /// [1]: https://erlang.org/doc/reference_manual/ports.html /// pub type Port + +pub type PortCommand { + Spawn(String) + SpawnDriver(String) + SpawnExecutable(String) + Fd(in: Int, out: Int) +} + +pub fn spawn(command: String) -> PortCommand { + Spawn(command) +} + +pub fn spawn_driver(command: String) -> PortCommand { + SpawnDriver(command) +} + +pub fn spawn_executable(command: String) -> PortCommand { + SpawnExecutable(command) +} + + +pub type PortOptions { + Arg0(String) + Args(List(String)) + Env(List(#(Charlist, Charlist))) + Cd(String) + Binary + + UseStdio + NouseStdio + + Eof + ExitStatus + + Stream + Line(Int) + Packet(Int) // only 1, 2, & 4 +} + +pub type PortError { + Badarg + SystemLimit + Enomem + Eagain + Enametoolong + Emfile + Enfile + Eaccess + Enoent +} + +@external(erlang, "gleam_erlang_ffi", "my_open_port") +pub fn open( + name: PortCommand, + settings: List(PortOptions), +) -> Result(Port, PortError) + +@external(erlang, "erlang", "port_command") +pub fn port_command( + port: Port, + data: BitArray, + options: List(PortOptions), +) -> Nil + + +@external(erlang, "gleam_erlang_ffi", "identity") +pub fn to_dynamic(a: Port) -> dynamic.Dynamic + +@external(erlang, "erlang", "port_close") +pub fn close(port: Port) -> Nil diff --git a/src/gleam/erlang/port/receive.gleam b/src/gleam/erlang/port/receive.gleam new file mode 100644 index 0000000..2df8420 --- /dev/null +++ b/src/gleam/erlang/port/receive.gleam @@ -0,0 +1,5 @@ +pub type PortData { + Data(BitArray) + ExitStatus(Int) + Eof +} diff --git a/src/gleam/erlang/process.gleam b/src/gleam/erlang/process.gleam index 7172233..21560d3 100644 --- a/src/gleam/erlang/process.gleam +++ b/src/gleam/erlang/process.gleam @@ -2,7 +2,9 @@ import gleam/dynamic.{type Dynamic} import gleam/dynamic/decode import gleam/erlang/atom.{type Atom} import gleam/erlang/port.{type Port} +import gleam/erlang/port/receive.{type PortData} import gleam/erlang/reference.{type Reference} +import gleam/result import gleam/string /// A `Pid` (or Process identifier) is a reference to an Erlang process. Each @@ -78,6 +80,8 @@ pub fn spawn_unlinked(a: fn() -> anything) -> Pid pub opaque type Subject(message) { Subject(owner: Pid, tag: Dynamic) NamedSubject(name: Name(message)) + // backup_owner needs to exist because ports close and we can't deduce the owner after a port closes, but the messages still exist in a mailbox of a pid + PortSubject(port: Port, backup_owner: Pid) } /// Create a subject for the given process with the give tag. This is unsafe! @@ -149,7 +153,7 @@ pub fn named_subject(name: Name(message)) -> Subject(message) { pub fn subject_name(subject: Subject(message)) -> Result(Name(message), Nil) { case subject { NamedSubject(name:) -> Ok(name) - Subject(..) -> Error(Nil) + _ -> Error(Nil) } } @@ -159,6 +163,31 @@ pub fn new_subject() -> Subject(message) { Subject(owner: self(), tag: reference_to_dynamic(reference.new())) } + +pub fn port_subject(port: Port) -> Subject(PortData) { + PortSubject( + port: port, + backup_owner: port_owner(port) |> result.unwrap(self()), + ) +} + +pub fn port_open(command: port.PortCommand, options: List(port.PortOptions)) -> Result(Subject(PortData), _) { + port.open(command, options) |> result.map(port_subject) +} + +@external(erlang, "erlang", "port_connect") +fn perform_port_connect(port: Port, pid: Pid) -> Nil + +pub fn port_connect(port: Port, pid: Pid) -> Subject(PortData) { + perform_port_connect(port, pid) + PortSubject(port, backup_owner: pid) +} + +/// Returns Error if the port is already closed +/// if possible, try to use subject_owner() +@external(erlang, "gleam_erlang_ffi", "port_owner") +pub fn port_owner(port: Port) -> Result(Pid, Nil) + /// Get the owner process for a subject, which is the process that will /// receive any messages sent using the subject. /// @@ -169,6 +198,14 @@ pub fn subject_owner(subject: Subject(message)) -> Result(Pid, Nil) { case subject { NamedSubject(name) -> named(name) Subject(pid, _) -> Ok(pid) + PortSubject(port, backup_owner) -> Ok(port_owner(port) |> result.unwrap(backup_owner)) + } +} + +pub fn subject_port(subject: Subject(message)) -> Result(Port, Nil) { + case subject { + PortSubject(port, ..) -> Ok(port) + _ -> Error(Nil) } } @@ -215,6 +252,7 @@ pub fn send(subject: Subject(message), message: message) -> Nil { let assert Ok(pid) = named(name) as "Sending to unregistered name" raw_send(pid, #(name, message)) } + PortSubject(..) -> panic as "Cannot send on PortSubject" } Nil } @@ -243,14 +281,11 @@ pub fn receive( from subject: Subject(message), within timeout: Int, ) -> Result(message, Nil) { - case subject { - NamedSubject(..) -> perform_receive(subject, timeout) - Subject(owner:, ..) -> - case owner == self() { - True -> perform_receive(subject, timeout) - False -> - panic as "Cannot receive with a subject owned by another process" - } + let self = self() + case subject_owner(subject) { + Ok(pid) if pid == self -> perform_receive(subject, timeout) + _ -> + panic as "Cannot receive with a subject owned by another process" } } @@ -263,8 +298,19 @@ fn perform_receive( /// Receive a message that has been sent to current process using the `Subject`. /// /// Same as `receive` but waits forever and returns the message as is. +pub fn receive_forever(from subject: Subject(message)) -> message { + let self = self() + case subject_owner(subject) { + Ok(pid) if pid == self -> perform_receive_forever(subject) + _ -> + panic as "Cannot receive with a subject owned by another process" + } +} + @external(erlang, "gleam_erlang_ffi", "receive") -pub fn receive_forever(from subject: Subject(message)) -> message +fn perform_receive_forever(from subject: Subject(message)) -> message + + /// A type that enables a process to wait for messages from multiple `Subject`s /// at the same time, returning whichever message arrives first. @@ -412,6 +458,7 @@ pub fn select_map( case subject { NamedSubject(name) -> insert_selector_handler(selector, #(name, 2), handler) Subject(_, tag:) -> insert_selector_handler(selector, #(tag, 2), handler) + PortSubject(port:, ..) -> insert_selector_handler(selector, #(port, 2), handler) } } @@ -425,6 +472,7 @@ pub fn deselect( case subject { NamedSubject(name) -> remove_selector_handler(selector, #(name, 2)) Subject(_, tag:) -> remove_selector_handler(selector, #(tag, 2)) + PortSubject(port:, ..) -> remove_selector_handler(selector, #(port, 2)) } } @@ -747,6 +795,7 @@ pub fn send_after(subject: Subject(msg), delay: Int, message: msg) -> Timer { case subject { NamedSubject(name) -> name_send_after(delay, name, #(name, message)) Subject(owner, tag) -> pid_send_after(delay, owner, #(tag, message)) + PortSubject(..) -> panic as "Cannot send on PortSubject" } } diff --git a/src/gleam_erlang_ffi.erl b/src/gleam_erlang_ffi.erl index b1ad710..c331f17 100644 --- a/src/gleam_erlang_ffi.erl +++ b/src/gleam_erlang_ffi.erl @@ -5,7 +5,7 @@ select/2, trap_exits/1, map_selector/2, merge_selector/2, flush_messages/0, priv_directory/1, connect_node/1, register_process/2, unregister_process/1, process_named/1, identity/1, 'receive'/1, 'receive'/2, new_name/1, - cast_down_message/1, cast_exit_reason/1 + cast_down_message/1, cast_exit_reason/1, my_open_port/2, port_owner/1 ]). -spec atom_from_string(binary()) -> {ok, atom()} | {error, nil}. @@ -83,6 +83,10 @@ cast_down_message({'DOWN', Ref, port, Pid, Reason}) -> receive {Ref, Message} -> Message end; +'receive'({port_subject, Port, _Pid}) -> + receive + {Port, Message} -> Message + end; 'receive'({named_subject, Name}) -> receive {Name, Message} -> Message @@ -94,6 +98,12 @@ cast_down_message({'DOWN', Ref, port, Pid, Reason}) -> after Timeout -> {error, nil} end; +'receive'({port_subject, Port, _Pid}, Timeout) -> + receive + {Port, Message} -> {ok, Message} + after Timeout -> + {error, nil} + end; 'receive'({named_subject, Name}, Timeout) -> receive {Name, Message} -> {ok, Message} @@ -101,6 +111,32 @@ cast_down_message({'DOWN', Ref, port, Pid, Reason}) -> {error, nil} end. +% sometimes this errors out if the port is already closed +port_owner(Port) -> + case erlang:port_info(Port, connected) of + {connected, Pid} -> {ok, Pid}; + undefined -> {error, nil} + end. + +my_open_port(PortName, PortSettings) -> + try + Port = open_port(PortName, PortSettings), + {ok, Port} + catch error:Reason -> + case Reason of + badarg -> {error, badarg}; + system_limit -> {error, system_limit}; + enomem -> {error, enomem}; + eagain -> {error, eagain}; + enametoolong -> {error, enametoolong}; + emfile -> {error, emfile}; + enfile -> {error, enfile}; + eaccess -> {error, eaccess}; + enoent -> {error, enoent} + end + end. + + demonitor(Reference) -> erlang:demonitor(Reference, [flush]). diff --git a/test/gleam/erlang/port_test.gleam b/test/gleam/erlang/port_test.gleam new file mode 100644 index 0000000..a8fa113 --- /dev/null +++ b/test/gleam/erlang/port_test.gleam @@ -0,0 +1,136 @@ +import gleam/erlang/port +import gleam/erlang/port/receive.{Data, ExitStatus} +import gleam/erlang/process.{receive, port_open, port_connect, port_subject} +import gleam/erlang/charlist + +@external(erlang, "gleam_erlang_test_ffi", "assert_gleam_panic") +fn assert_panic(f: fn() -> t) -> String + +pub fn port_open_test() { + let assert Ok(port) = port.open( + port.spawn_executable("/bin/bash"), + [port.Args(["-c", "echo hi | wc -c"]), port.UseStdio, port.ExitStatus, port.Binary] + ) + + let subject = port_subject(port) + + assert Ok(Data(<<"3\n">>)) == receive(subject, 100) + assert Ok(ExitStatus(0)) == receive(subject, 100) + assert Error(Nil) == receive(subject, 100) +} + +pub fn port_env_open_test() { + let assert Ok(port) = port.open( + port.spawn_executable("/bin/bash"), + [ + port.Args(["-c", "echo $VAR"]), + port.Env([#(charlist.from_string("VAR"), charlist.from_string("test123"))]), + port.UseStdio, port.ExitStatus, port.Binary + ] + ) + + let subject = port_subject(port) + + assert Ok(Data(<<"test123\n">>)) == receive(subject, 100) +} + +pub fn fail_port_open_test() { + let assert Error(port.Enoent) = port.open( + port.spawn_executable("/bin/doesntexist"), + [] + ) +} + +pub fn port_subject_test() { + let assert Ok(subject) = port_open( + port.spawn_executable("/bin/bash"), + [port.Args(["-c", "echo hello | wc -c"]), port.UseStdio, port.ExitStatus, port.Binary] + ) + + assert Ok(Data(<<"6\n">>)) == receive(subject, 100) + assert Ok(ExitStatus(0)) == receive(subject, 100) + assert Error(Nil) == receive(subject, 100) +} + +pub fn close_port_test() { + let assert Ok(port) = port.open( + port.spawn_executable("/bin/bash"), + [port.Args(["-c", "echo hi; sleep 0.25; echo yo 2> /dev/null"]), port.UseStdio, port.ExitStatus, port.Binary] + ) + + let subject = port_subject(port) + + assert Ok(Data(<<"hi\n">>)) == receive(subject, 100) + + port.close(port) + + assert Error(Nil) == receive(subject, 300) +} + +pub fn delay_port_test() { + let assert Ok(port) = port.open( + port.spawn_executable("/bin/bash"), + [port.Args(["-c", "echo hi; sleep 0.25; echo yo 2> /dev/null"]), port.UseStdio, port.ExitStatus, port.Binary] + ) + + let subject = port_subject(port) + assert Ok(Data(<<"hi\n">>)) == receive(subject, 50) + assert Error(Nil) == receive(subject, 100) + assert Ok(Data(<<"yo\n">>)) == receive(subject, 300) + assert Ok(ExitStatus(0)) == receive(subject, 100) + assert Error(Nil) == receive(subject, 100) +} + +// mirrors name_other_switchover_test in process_test a bit +pub fn port_switchover_test() { + // create a new port + let assert Ok(port) = port.open( + port.spawn_executable("/bin/bash"), + [port.Args(["-c", "cat"]), port.UseStdio, port.ExitStatus, port.Binary] + ) + + let subject = process.port_subject(port) + + // verify we can listen on it + port.port_command(port, <<"wasd\n">>, []) + assert Ok(Data(<<"wasd\n">>)) == receive(subject, 5) + + // switch the owner to some other erlang process + let pid = process.spawn(fn() { process.sleep(20) }) + let newsubject = port_connect(port, pid) + + // try to listen on it again + port.port_command(port, <<"test2\n">>, []) + assert assert_panic(fn() { process.receive(newsubject, 0) }) + == "Cannot receive with a subject owned by another process" + // test the original subject, too + assert assert_panic(fn() { process.receive(subject, 0) }) + == "Cannot receive with a subject owned by another process" + + // let's switch back now + let newnewsubject = port_connect(port, process.self()) + port.port_command(port, <<"test3\n">>, []) + assert Ok(Data(<<"test3\n">>)) == receive(subject, 5) + port.port_command(port, <<"test4\n">>, []) + assert Ok(Data(<<"test4\n">>)) == receive(newsubject, 5) + port.port_command(port, <<"test5\n">>, []) + assert Ok(Data(<<"test5\n">>)) == receive(newnewsubject, 5) + + port.close(port) +} + + +pub fn port_subject_send_test() { + let assert Ok(port) = port.open( + port.spawn_executable("/bin/bash"), + [port.Args(["-c", "cat"]), port.UseStdio, port.ExitStatus, port.Binary] + ) + + let subject = port_subject(port) + + + assert assert_panic(fn() { process.send(subject, Data(<<"hello">>)) }) + == "Cannot send on PortSubject" + + port.close(port) +} diff --git a/test/gleam/erlang/process_test.gleam b/test/gleam/erlang/process_test.gleam index 1a86c2b..4faab9c 100644 --- a/test/gleam/erlang/process_test.gleam +++ b/test/gleam/erlang/process_test.gleam @@ -112,6 +112,15 @@ pub fn receive_forever_test() { let assert 2 = process.receive_forever(subject) } +pub fn receive_forever_other_test() { + let subject = process.new_subject() + process.spawn(fn() { process.send(subject, process.new_subject()) }) + let subject = process.receive_forever(subject) + + assert assert_panic(fn() { process.receive_forever(subject) }) + == "Cannot receive with a subject owned by another process" +} + pub fn is_alive_test() { let pid = process.spawn_unlinked(fn() { Nil }) process.sleep(5) @@ -666,3 +675,67 @@ pub fn name_test() { let assert Ok("Hello") = process.receive(subject, 0) process.unregister(name) } + +pub fn name_other_test() { + let name = process.new_name("name") + let pid = process.spawn_unlinked(fn() { process.sleep(10) }) + let assert Ok(_) = process.register(pid, name) + let subject = process.named_subject(name) + process.send(subject, "Hello") + assert assert_panic(fn() { process.receive(subject, 5) }) + == "Cannot receive with a subject owned by another process" + process.unregister(name) +} + + +// tests that sending/receiving on names fail if the name gets unregistered +pub fn unregister_receive_test() { + let name = process.new_name("name") + let assert Ok(_) = process.register(process.self(), name) + let subject = process.named_subject(name) + + process.send(subject, "Hello") + let assert Ok("Hello") = process.receive(subject, 0) + + let assert Ok(_) = process.unregister(name) + + assert assert_panic(fn() { process.send(subject, "Hello") }) + == "Sending to unregistered name" + assert assert_panic(fn() { process.receive(subject, 0) }) + == "Cannot receive with a subject owned by another process" // this error message could be better +} + +// tests that receiving on names fail if the registration gets moved during execution +pub fn name_other_switchover_test() { + let name = process.new_name("name") + let self = process.self() + let assert Ok(_) = process.register(self, name) + let subject = process.named_subject(name) + + assert Ok(self) == process.subject_owner(subject) + + process.send(subject, "Hello") + let assert Ok("Hello") = process.receive(subject, 0) + + let pid = process.spawn(fn() { + process.sleep(10) + let assert Ok(_) = process.unregister(name) // "undesirable behaviour" + let assert Ok(_) = process.register(process.self(), name) + + process.sleep(20) + let assert Ok("Peace") = process.receive(subject, 0) + }) + + assert Ok(self) == process.subject_owner(subject) + + process.send(subject, "World") + let assert Ok("World") = process.receive(subject, 0) + + assert Ok(self) == process.subject_owner(subject) + process.sleep(12) + assert Ok(pid) == process.subject_owner(subject) + + process.send(subject, "Peace") + assert assert_panic(fn() { process.receive(subject, 0) }) + == "Cannot receive with a subject owned by another process" +}