Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions src/gleam/erlang/port.gleam
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import gleam/dynamic
import gleam/erlang/charlist.{type Charlist}

/// Ports are how code running on the Erlang virtual machine interacts with
/// the outside world. Bytes of data can be sent to and read from ports,
/// providing a form of message passing to an external program or resource.
Expand All @@ -7,3 +10,73 @@
/// [1]: https://erlang.org/doc/reference_manual/ports.html
///
pub type Port

pub type PortCommand {
Spawn(String)
SpawnDriver(String)
SpawnExecutable(String)
Fd(in: Int, out: Int)
}

pub fn spawn(command: String) -> PortCommand {
Spawn(command)
}

pub fn spawn_driver(command: String) -> PortCommand {
SpawnDriver(command)
}

pub fn spawn_executable(command: String) -> PortCommand {
SpawnExecutable(command)
}


pub type PortOptions {
Arg0(String)
Args(List(String))
Env(List(#(Charlist, Charlist)))
Cd(String)
Binary

UseStdio
NouseStdio

Eof
ExitStatus

Stream
Line(Int)
Packet(Int) // only 1, 2, & 4
}

pub type PortError {
Badarg
SystemLimit
Enomem
Eagain
Enametoolong
Emfile
Enfile
Eaccess
Enoent
}

@external(erlang, "gleam_erlang_ffi", "my_open_port")
pub fn open(
name: PortCommand,
settings: List(PortOptions),
) -> Result(Port, PortError)

@external(erlang, "erlang", "port_command")
pub fn port_command(
port: Port,
data: BitArray,
options: List(PortOptions),
) -> Nil


@external(erlang, "gleam_erlang_ffi", "identity")
pub fn to_dynamic(a: Port) -> dynamic.Dynamic

@external(erlang, "erlang", "port_close")
pub fn close(port: Port) -> Nil
5 changes: 5 additions & 0 deletions src/gleam/erlang/port/receive.gleam
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub type PortData {
Data(BitArray)
ExitStatus(Int)
Eof
}
69 changes: 59 additions & 10 deletions src/gleam/erlang/process.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode
import gleam/erlang/atom.{type Atom}
import gleam/erlang/port.{type Port}
import gleam/erlang/port/receive.{type PortData}
import gleam/erlang/reference.{type Reference}
import gleam/result
import gleam/string

/// A `Pid` (or Process identifier) is a reference to an Erlang process. Each
Expand Down Expand Up @@ -78,6 +80,8 @@ pub fn spawn_unlinked(a: fn() -> anything) -> Pid
pub opaque type Subject(message) {
Subject(owner: Pid, tag: Dynamic)
NamedSubject(name: Name(message))
// backup_owner needs to exist because ports close and we can't deduce the owner after a port closes, but the messages still exist in a mailbox of a pid
PortSubject(port: Port, backup_owner: Pid)
}

/// Create a subject for the given process with the give tag. This is unsafe!
Expand Down Expand Up @@ -149,7 +153,7 @@ pub fn named_subject(name: Name(message)) -> Subject(message) {
pub fn subject_name(subject: Subject(message)) -> Result(Name(message), Nil) {
case subject {
NamedSubject(name:) -> Ok(name)
Subject(..) -> Error(Nil)
_ -> Error(Nil)
}
}

Expand All @@ -159,6 +163,31 @@ pub fn new_subject() -> Subject(message) {
Subject(owner: self(), tag: reference_to_dynamic(reference.new()))
}


pub fn port_subject(port: Port) -> Subject(PortData) {
PortSubject(
port: port,
backup_owner: port_owner(port) |> result.unwrap(self()),
)
}

pub fn port_open(command: port.PortCommand, options: List(port.PortOptions)) -> Result(Subject(PortData), _) {
port.open(command, options) |> result.map(port_subject)
}

@external(erlang, "erlang", "port_connect")
fn perform_port_connect(port: Port, pid: Pid) -> Nil

pub fn port_connect(port: Port, pid: Pid) -> Subject(PortData) {
perform_port_connect(port, pid)
PortSubject(port, backup_owner: pid)
}

/// Returns Error if the port is already closed
/// if possible, try to use subject_owner()
@external(erlang, "gleam_erlang_ffi", "port_owner")
pub fn port_owner(port: Port) -> Result(Pid, Nil)

/// Get the owner process for a subject, which is the process that will
/// receive any messages sent using the subject.
///
Expand All @@ -169,6 +198,14 @@ pub fn subject_owner(subject: Subject(message)) -> Result(Pid, Nil) {
case subject {
NamedSubject(name) -> named(name)
Subject(pid, _) -> Ok(pid)
PortSubject(port, backup_owner) -> Ok(port_owner(port) |> result.unwrap(backup_owner))
}
}

pub fn subject_port(subject: Subject(message)) -> Result(Port, Nil) {
case subject {
PortSubject(port, ..) -> Ok(port)
_ -> Error(Nil)
}
}

Expand Down Expand Up @@ -215,6 +252,7 @@ pub fn send(subject: Subject(message), message: message) -> Nil {
let assert Ok(pid) = named(name) as "Sending to unregistered name"
raw_send(pid, #(name, message))
}
PortSubject(..) -> panic as "Cannot send on PortSubject"
}
Nil
}
Expand Down Expand Up @@ -243,14 +281,11 @@ pub fn receive(
from subject: Subject(message),
within timeout: Int,
) -> Result(message, Nil) {
case subject {
NamedSubject(..) -> perform_receive(subject, timeout)
Subject(owner:, ..) ->
case owner == self() {
True -> perform_receive(subject, timeout)
False ->
panic as "Cannot receive with a subject owned by another process"
}
let self = self()
case subject_owner(subject) {
Ok(pid) if pid == self -> perform_receive(subject, timeout)
_ ->
panic as "Cannot receive with a subject owned by another process"
}
}

Expand All @@ -263,8 +298,19 @@ fn perform_receive(
/// Receive a message that has been sent to current process using the `Subject`.
///
/// Same as `receive` but waits forever and returns the message as is.
pub fn receive_forever(from subject: Subject(message)) -> message {
let self = self()
case subject_owner(subject) {
Ok(pid) if pid == self -> perform_receive_forever(subject)
_ ->
panic as "Cannot receive with a subject owned by another process"
}
}

@external(erlang, "gleam_erlang_ffi", "receive")
pub fn receive_forever(from subject: Subject(message)) -> message
fn perform_receive_forever(from subject: Subject(message)) -> message



/// A type that enables a process to wait for messages from multiple `Subject`s
/// at the same time, returning whichever message arrives first.
Expand Down Expand Up @@ -412,6 +458,7 @@ pub fn select_map(
case subject {
NamedSubject(name) -> insert_selector_handler(selector, #(name, 2), handler)
Subject(_, tag:) -> insert_selector_handler(selector, #(tag, 2), handler)
PortSubject(port:, ..) -> insert_selector_handler(selector, #(port, 2), handler)
}
}

Expand All @@ -425,6 +472,7 @@ pub fn deselect(
case subject {
NamedSubject(name) -> remove_selector_handler(selector, #(name, 2))
Subject(_, tag:) -> remove_selector_handler(selector, #(tag, 2))
PortSubject(port:, ..) -> remove_selector_handler(selector, #(port, 2))
}
}

Expand Down Expand Up @@ -747,6 +795,7 @@ pub fn send_after(subject: Subject(msg), delay: Int, message: msg) -> Timer {
case subject {
NamedSubject(name) -> name_send_after(delay, name, #(name, message))
Subject(owner, tag) -> pid_send_after(delay, owner, #(tag, message))
PortSubject(..) -> panic as "Cannot send on PortSubject"
}
}

Expand Down
38 changes: 37 additions & 1 deletion src/gleam_erlang_ffi.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
select/2, trap_exits/1, map_selector/2, merge_selector/2, flush_messages/0,
priv_directory/1, connect_node/1, register_process/2, unregister_process/1,
process_named/1, identity/1, 'receive'/1, 'receive'/2, new_name/1,
cast_down_message/1, cast_exit_reason/1
cast_down_message/1, cast_exit_reason/1, my_open_port/2, port_owner/1
]).

-spec atom_from_string(binary()) -> {ok, atom()} | {error, nil}.
Expand Down Expand Up @@ -83,6 +83,10 @@ cast_down_message({'DOWN', Ref, port, Pid, Reason}) ->
receive
{Ref, Message} -> Message
end;
'receive'({port_subject, Port, _Pid}) ->
receive
{Port, Message} -> Message
end;
'receive'({named_subject, Name}) ->
receive
{Name, Message} -> Message
Expand All @@ -94,13 +98,45 @@ cast_down_message({'DOWN', Ref, port, Pid, Reason}) ->
after Timeout ->
{error, nil}
end;
'receive'({port_subject, Port, _Pid}, Timeout) ->
receive
{Port, Message} -> {ok, Message}
after Timeout ->
{error, nil}
end;
'receive'({named_subject, Name}, Timeout) ->
receive
{Name, Message} -> {ok, Message}
after Timeout ->
{error, nil}
end.

% sometimes this errors out if the port is already closed
port_owner(Port) ->
case erlang:port_info(Port, connected) of
{connected, Pid} -> {ok, Pid};
undefined -> {error, nil}
end.

my_open_port(PortName, PortSettings) ->
try
Port = open_port(PortName, PortSettings),
{ok, Port}
catch error:Reason ->
case Reason of
badarg -> {error, badarg};
system_limit -> {error, system_limit};
enomem -> {error, enomem};
eagain -> {error, eagain};
enametoolong -> {error, enametoolong};
emfile -> {error, emfile};
enfile -> {error, enfile};
eaccess -> {error, eaccess};
enoent -> {error, enoent}
end
end.


demonitor(Reference) ->
erlang:demonitor(Reference, [flush]).

Expand Down
Loading