Skip to content
13 changes: 9 additions & 4 deletions cmd/ww/cat/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var env util.IPFSEnv
func Command() *cli.Command {
return &cli.Command{
Name: "cat",
ArgsUsage: "<peer> <proc>",
ArgsUsage: "<peer> <proc> [method]",
Usage: "Connect to a peer and execute a procedure over a stream",
Description: `Connect to a specified peer and execute a procedure over a custom protocol stream.
The command will:
Expand All @@ -33,7 +33,8 @@ The command will:

Examples:
ww cat QmPeer123 /echo
ww cat 12D3KooW... /myproc`,
ww cat 12D3KooW... /myproc echo
ww cat 12D3KooW... /myproc poll`,
Flags: append([]cli.Flag{
&cli.StringFlag{
Name: "ipfs",
Expand All @@ -58,12 +59,13 @@ func Main(c *cli.Context) error {
ctx, cancel := context.WithTimeout(c.Context, c.Duration("timeout"))
defer cancel()

if c.NArg() != 2 {
return cli.Exit("cat requires exactly two arguments: <peer> <proc>", 1)
if c.NArg() < 3 {
return cli.Exit("cat requires 2-3 arguments: <peer> <proc> [method]", 1)
}

peerIDStr := c.Args().Get(0)
procName := c.Args().Get(1)
method := c.Args().Get(2)

// Parse peer ID
peerID, err := peer.Decode(peerIDStr)
Expand All @@ -73,6 +75,9 @@ func Main(c *cli.Context) error {

// Construct protocol ID
protocolID := protocol.ID("/ww/0.1.0/" + procName)
if method != "" && method != "poll" {
protocolID = protocol.ID("/ww/0.1.0/" + procName + "/" + method)
}

// Create libp2p host in client mode
h, err := util.NewClient()
Expand Down
29 changes: 25 additions & 4 deletions cmd/ww/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"

"github.com/ipfs/boxo/path"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -134,20 +136,39 @@ func Main(c *cli.Context) error {
"peer", env.Host.ID(),
"endpoint", p.Endpoint.Name)

env.Host.SetStreamHandler(p.Endpoint.Protocol(), func(s network.Stream) {
// Set up stream handler that matches both exact protocol and with method suffix
baseProto := p.Endpoint.Protocol()
env.Host.SetStreamHandlerMatch(baseProto, func(protocol protocol.ID) bool {
// Match exact base protocol (/ww/0.1.0/<proc-id>) or with method suffix (/ww/0.1.0/<proc-id>/<method>)
return protocol == baseProto || strings.HasPrefix(string(protocol), string(baseProto)+"/")
}, func(s network.Stream) {
defer s.CloseRead()

// Extract method from protocol string
method := "poll" // default
protocolStr := string(s.Protocol())
if strings.HasPrefix(protocolStr, string(baseProto)+"/") {
// Extract method from /ww/0.1.0/<proc-id>/<method>
parts := strings.Split(protocolStr, "/")
if len(parts) > 0 {
method = parts[len(parts)-1]
}
}

slog.InfoContext(ctx, "stream connected",
"peer", s.Conn().RemotePeer(),
"stream-id", s.ID(),
"endpoint", p.Endpoint.Name)
if err := p.Poll(ctx, s, nil); err != nil {
"endpoint", p.Endpoint.Name,
"method", method)
if err := p.ProcessMessage(ctx, s, method); err != nil {
slog.ErrorContext(ctx, "failed to poll process",
"id", p.ID(),
"stream", s.ID(),
"method", method,
"reason", err)
}
})
defer env.Host.RemoveStreamHandler(p.Endpoint.Protocol())
defer env.Host.RemoveStreamHandler(baseProto)

for {
select {
Expand Down
17 changes: 5 additions & 12 deletions examples/echo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,19 @@ import (
// main is the entry point for synchronous mode.
// It processes one complete message from stdin and exits.
func main() {
// Echo: copy stdin to stdout using io.Copy
// io.Copy uses an internal 32KB buffer by default
if _, err := io.Copy(os.Stdout, os.Stdin); err != nil {
os.Stderr.WriteString("Error copying stdin to stdout: " + err.Error() + "\n")
os.Exit(1)
}
defer os.Stdout.Sync()
// implicitly returns 0 to indicate successful completion
echo()
}

// poll is the async entry point for stream-based processing.
// echo is the async entry point for stream-based processing.
// This function is called by the wetware runtime when a new stream
// is established for this process.
//
//export poll
func poll() {
//export echo
func echo() {
// In async mode, we process each incoming stream
// This is the same logic as main() but for individual streams
if _, err := io.Copy(os.Stdout, os.Stdin); err != nil {
os.Stderr.WriteString("Error in poll: " + err.Error() + "\n")
os.Stderr.WriteString("Error in echo: " + err.Error() + "\n")
os.Exit(1)
}
defer os.Stdout.Sync()
Expand Down
Binary file modified examples/echo/main.wasm
Binary file not shown.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/lthibault/go-libp2p-inproc-transport v0.4.1
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.16.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.11.1
github.com/tetratelabs/wazero v1.9.0
github.com/urfave/cli/v2 v2.27.5
Expand Down
252 changes: 252 additions & 0 deletions system/ipfs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
package system

import (
"context"
"io"
"io/fs"
"log/slog"
"runtime"
"time"

"github.com/ipfs/boxo/files"
"github.com/ipfs/boxo/path"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/pkg/errors"
)

var _ fs.FS = (*IPFS)(nil)

// An IPFS provides access to a hierarchical file system.
//
// The IPFS interface is the minimum implementation required of the file system.
// A file system may implement additional interfaces,
// such as [ReadFileFS], to provide additional or optimized functionality.
//
// [testing/fstest.TestFS] may be used to test implementations of an IPFS for
// correctness.
type IPFS struct {
Ctx context.Context
Root path.Path
Unix iface.UnixfsAPI
}

// Open opens the named file.
//
// When Open returns an error, it should be of type *PathError
// with the Op field set to "open", the Path field set to name,
// and the Err field describing the problem.
//
// Open should reject attempts to open names that do not satisfy
// fs.ValidPath(name), returning a *fs.PathError with Err set to
// fs.ErrInvalid or fs.ErrNotExist.
func (f IPFS) Open(name string) (fs.File, error) {
path, node, err := f.Resolve(f.Ctx, name)
if err != nil {
return nil, &fs.PathError{
Op: "open",
Path: name,
Err: err,
}
}

return &ipfsNode{
Path: path,
Node: node,
}, nil
}

func (f IPFS) Resolve(ctx context.Context, name string) (path.Path, files.Node, error) {
if pathInvalid(name) {
return nil, nil, fs.ErrInvalid
}

p, err := path.Join(f.Root, name)
if err != nil {
return nil, nil, err
}

node, err := f.Unix.Get(ctx, p)
return p, node, err
}

func pathInvalid(name string) bool {
return !fs.ValidPath(name)
}

func (f IPFS) Sub(dir string) (fs.FS, error) {
var root path.Path
var err error
if (f == IPFS{}) {
root, err = path.NewPath(dir)
} else {
root, err = path.Join(f.Root, dir)
}

return &IPFS{
Ctx: f.Ctx,
Root: root,
Unix: f.Unix,
}, err
}

var (
_ fs.FileInfo = (*ipfsNode)(nil)
_ fs.ReadDirFile = (*ipfsNode)(nil)
_ fs.DirEntry = (*ipfsNode)(nil)
)

// ipfsNode provides access to a single file. The fs.File interface is the minimum
// implementation required of the file. Directory files should also implement [ReadDirFile].
// A file may implement io.ReaderAt or io.Seeker as optimizations.
type ipfsNode struct {
Path path.Path
files.Node
}

// base name of the file
func (n ipfsNode) Name() string {
segs := n.Path.Segments()
return segs[len(segs)-1] // last segment is name
}

func (n *ipfsNode) Stat() (fs.FileInfo, error) {
return n, nil
}

// length in bytes for regular files; system-dependent for others
func (n ipfsNode) Size() int64 {
size, err := n.Node.Size()
if err != nil {
slog.Error("failed to obtain file size",
"path", n.Path,
"reason", err)
}

return size
}

// file mode bits
func (n ipfsNode) Mode() fs.FileMode {
switch n.Node.(type) {
case files.Directory:
return fs.ModeDir
default:
return 0 // regular read-only file
}
}

// modification time
func (n ipfsNode) ModTime() time.Time {
return time.Time{} // zero-value time
}

// abbreviation for Mode().IsDir()
func (n ipfsNode) IsDir() bool {
return n.Mode().IsDir()
}

// underlying data source (never returns nil)
func (n ipfsNode) Sys() any {
return n.Node
}

func (n ipfsNode) Read(b []byte) (int, error) {
switch node := n.Node.(type) {
case io.Reader:
return node.Read(b)
default:
return 0, errors.New("unreadable node")
}
}

// ReadDir reads the contents of the directory and returns
// a slice of up to max DirEntry values in directory order.
// Subsequent calls on the same file will yield further DirEntry values.
//
// If max > 0, ReadDir returns at most max DirEntry structures.
// In this case, if ReadDir returns an empty slice, it will return
// a non-nil error explaining why.
// At the end of a directory, the error is io.EOF.
// (ReadDir must return io.EOF itself, not an error wrapping io.EOF.)
//
// If max <= 0, ReadDir returns all the DirEntry values from the directory
// in a single slice. In this case, if ReadDir succeeds (reads all the way
// to the end of the directory), it returns the slice and a nil error.
// If it encounters an error before the end of the directory,
// ReadDir returns the DirEntry list read until that point and a non-nil error.
func (n ipfsNode) ReadDir(max int) (entries []fs.DirEntry, err error) {
root, ok := n.Node.(files.Directory)
if !ok {
return nil, errors.New("not a directory")
}

iter := root.Entries()
for iter.Next() {
name := iter.Name()
node := iter.Node()

// Callers will typically discard entries if they get a non-nill
// error, so we make sure nodes are eventually closed.
runtime.SetFinalizer(node, func(c io.Closer) {
if err := c.Close(); err != nil {
slog.Warn("unable to close node",
"name", name,
"reason", err)
}
})

var subpath path.Path
if subpath, err = path.Join(n.Path, name); err != nil {
return
}

entries = append(entries, &ipfsNode{
Path: subpath,
Node: node})

// got max items?
if max--; max == 0 {
return
}
}

// If we get here, it's because the iterator stopped. It either
// failed or is exhausted. Any other error has already caused us
// to return.
if iter.Err() != nil {
err = iter.Err() // failed
} else if max >= 0 {
err = io.EOF // exhausted
}

return
}

// Info returns the FileInfo for the file or subdirectory described by the entry.
// The returned FileInfo may be from the time of the original directory read
// or from the time of the call to Info. If the file has been removed or renamed
// since the directory read, Info may return an error satisfying errors.Is(err, ErrNotExist).
// If the entry denotes a symbolic link, Info reports the information about the link itself,
// not the link's target.
func (n *ipfsNode) Info() (fs.FileInfo, error) {
return n, nil
}

// Type returns the type bits for the entry.
// The type bits are a subset of the usual FileMode bits, those returned by the FileMode.Type method.
func (n ipfsNode) Type() fs.FileMode {
if n.Mode().IsDir() {
return fs.ModeDir
}

return 0
}

func (n ipfsNode) Write(b []byte) (int, error) {
dst, ok := n.Node.(io.Writer)
if ok {
return dst.Write(b)
}

return 0, errors.New("not writeable")
}
Loading
Loading