diff --git a/buffer.go b/buffer.go index bb1d2b77..07f8c378 100644 --- a/buffer.go +++ b/buffer.go @@ -1,11 +1,17 @@ package fuse -import "unsafe" +import ( + "bytes" + "sync" + "unsafe" +) // buffer provides a mechanism for constructing a message from // multiple segments. type buffer []byte +const hdrSize = unsafe.Sizeof(outHeader{}) + // alloc allocates size bytes and returns a pointer to the new // segment. func (w *buffer) alloc(size uintptr) unsafe.Pointer { @@ -29,7 +35,24 @@ func (w *buffer) reset() { } func newBuffer(extra uintptr) buffer { - const hdrSize = unsafe.Sizeof(outHeader{}) buf := make(buffer, hdrSize, hdrSize+extra) return buf } + +// readBufPool is a pool of read request data +var readBufPool = sync.Pool{ + New: func() interface{} { + return make([]byte, int(hdrSize)+maxWrite) + }, +} + +func newStreamingBuffer() *bytes.Buffer { + buf := bytes.NewBuffer(readBufPool.Get().([]byte)) + buf.Truncate(int(hdrSize)) + + return buf +} + +func returnBuffer(buf []byte) { + readBufPool.Put(buf) +} diff --git a/fs/serve.go b/fs/serve.go index e9fc5659..b85ef0c3 100644 --- a/fs/serve.go +++ b/fs/serve.go @@ -303,6 +303,16 @@ type HandleReader interface { Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error } +type HandleStreamingReader interface { + // StreamingRead requests to read data from the handle. + // + // In contrast to `HandleReader` this interface uses a + // StreamingReadResponse which implements the io.Writer + // interface. + + StreamingRead(ctx context.Context, req *fuse.ReadRequest, resp *fuse.StreamingReadResponse) error +} + type HandleWriter interface { // Write requests to write data into the handle at the given offset. // Store the amount of data written in resp.Size. @@ -1241,14 +1251,29 @@ func (c *Server) handleRequest(ctx context.Context, node Node, snode *serveNode, r.Respond(s) return nil } - h, ok := handle.(HandleReader) + if h, ok := handle.(HandleReader); ok { + if err := h.Read(ctx, r, s); err != nil { + return err + } + + done(s) + r.Respond(s) + return nil + } + h, ok := handle.(HandleStreamingReader) if !ok { err := handleNotReaderError{handle: handle} return err } - if err := h.Read(ctx, r, s); err != nil { + + s := fuse.NewStreamingReadResponse() + if err := h.StreamingRead(ctx, r, s); err != nil { return err } + + done(s) + r.StreamingRespond(s) + return nil } done(s) r.Respond(s) diff --git a/fs/serve_test.go b/fs/serve_test.go index e7f9cfb0..2f7c5a1b 100644 --- a/fs/serve_test.go +++ b/fs/serve_test.go @@ -351,6 +351,37 @@ func TestReadAllWithHandleRead(t *testing.T) { testReadAll(t, mnt.Dir+"/child") } +type readWithStreamingHandleRead struct { +} + +func (readWithStreamingHandleRead) Attr(ctx context.Context, a *fuse.Attr) error { + a.Mode = 0666 + a.Size = uint64(len(hi)) + + return nil +} + +func (readWithStreamingHandleRead) StreamingRead(ctx context.Context, req *fuse.ReadRequest, resp *fuse.StreamingReadResponse) error { + _, err := io.Copy(resp, bytes.NewReader([]byte(hi))) + if err != nil { + return fuse.EIO + } + + return nil +} + +func TestStreamingReadAllWithHandleRead(t *testing.T) { + t.Parallel() + mnt, err := fstestutil.MountedT(t, fstestutil.SimpleFS{&fstestutil.ChildMap{"child": readWithStreamingHandleRead{}}}, nil) + + if err != nil { + t.Fatal(err) + } + defer mnt.Close() + + testReadAll(t, mnt.Dir+"/child") +} + type readFlags struct { fstestutil.File fileFlags record.Recorder diff --git a/fuse.go b/fuse.go index 6db0ef29..aa53a170 100644 --- a/fuse.go +++ b/fuse.go @@ -1764,6 +1764,12 @@ func (r *ReadRequest) Respond(resp *ReadResponse) { r.respond(buf) } +func (r *ReadRequest) StreamingRespond(resp *StreamingReadResponse) { + buf := resp.Data() + r.respond(buf) + returnBuffer(buf) +} + // A ReadResponse is the response to a ReadRequest. type ReadResponse struct { Data []byte @@ -1773,6 +1779,28 @@ func (r *ReadResponse) String() string { return fmt.Sprintf("Read %d", len(r.Data)) } +func NewStreamingReadResponse() *StreamingReadResponse { + return &StreamingReadResponse{buf: newStreamingBuffer()} +} + +// A StreamingReadResponse is the response to a ReadRequests wich +// supports streaming the response via a io.Writer +type StreamingReadResponse struct { + buf *bytes.Buffer +} + +func (resp *StreamingReadResponse) Write(p []byte) (int, error) { + return resp.buf.Write(p) +} + +func (resp *StreamingReadResponse) Data() []byte { + return resp.buf.Bytes() +} + +func (r *StreamingReadResponse) String() string { + return fmt.Sprintf("Read %d", len(r.Data())) +} + type jsonReadResponse struct { Len uint64 }