From d9376e97e9577c679d49b182b2d082501874591a Mon Sep 17 00:00:00 2001 From: Joschka Tillmanns Date: Thu, 3 Nov 2016 15:07:49 +0100 Subject: [PATCH] add handle HandleStreamingReader This commit introduces a new type 'StreamingReadResponse' which implements the io.Writer interface. Thus users are writing into the response rather than updating the Data property of ReadResponse. This allows us to preallocate the space for the whole message and therefore reduce allocations. Additionally we may now use a sync.Pool to manage the required space needed for the read request and further minimize allocations. This patch intends to not break existing implementations of the fuse.HandleReader interface by introducing a new interface that provides the advantages described above. I've benchmarked this using a scenario that is similar to the environment I need for the application I am currently developing. The benchmark reads data blobs of different sizes from the solid-state drive. Using this patch we can achieve a 1.60x speed up as well as reduce memory allocations by 85%. https://github.com/bazil/fuse/files/570909/bazil_patch_bench.txt --- buffer.go | 27 +++++++++++++++++++++++++-- fs/serve.go | 29 +++++++++++++++++++++++++++-- fs/serve_test.go | 31 +++++++++++++++++++++++++++++++ fuse.go | 28 ++++++++++++++++++++++++++++ 4 files changed, 111 insertions(+), 4 deletions(-) diff --git a/buffer.go b/buffer.go index bb1d2b77..07f8c378 100644 --- a/buffer.go +++ b/buffer.go @@ -1,11 +1,17 @@ package fuse -import "unsafe" +import ( + "bytes" + "sync" + "unsafe" +) // buffer provides a mechanism for constructing a message from // multiple segments. type buffer []byte +const hdrSize = unsafe.Sizeof(outHeader{}) + // alloc allocates size bytes and returns a pointer to the new // segment. func (w *buffer) alloc(size uintptr) unsafe.Pointer { @@ -29,7 +35,24 @@ func (w *buffer) reset() { } func newBuffer(extra uintptr) buffer { - const hdrSize = unsafe.Sizeof(outHeader{}) buf := make(buffer, hdrSize, hdrSize+extra) return buf } + +// readBufPool is a pool of read request data +var readBufPool = sync.Pool{ + New: func() interface{} { + return make([]byte, int(hdrSize)+maxWrite) + }, +} + +func newStreamingBuffer() *bytes.Buffer { + buf := bytes.NewBuffer(readBufPool.Get().([]byte)) + buf.Truncate(int(hdrSize)) + + return buf +} + +func returnBuffer(buf []byte) { + readBufPool.Put(buf) +} diff --git a/fs/serve.go b/fs/serve.go index e9fc5659..b85ef0c3 100644 --- a/fs/serve.go +++ b/fs/serve.go @@ -303,6 +303,16 @@ type HandleReader interface { Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error } +type HandleStreamingReader interface { + // StreamingRead requests to read data from the handle. + // + // In contrast to `HandleReader` this interface uses a + // StreamingReadResponse which implements the io.Writer + // interface. + + StreamingRead(ctx context.Context, req *fuse.ReadRequest, resp *fuse.StreamingReadResponse) error +} + type HandleWriter interface { // Write requests to write data into the handle at the given offset. // Store the amount of data written in resp.Size. @@ -1241,14 +1251,29 @@ func (c *Server) handleRequest(ctx context.Context, node Node, snode *serveNode, r.Respond(s) return nil } - h, ok := handle.(HandleReader) + if h, ok := handle.(HandleReader); ok { + if err := h.Read(ctx, r, s); err != nil { + return err + } + + done(s) + r.Respond(s) + return nil + } + h, ok := handle.(HandleStreamingReader) if !ok { err := handleNotReaderError{handle: handle} return err } - if err := h.Read(ctx, r, s); err != nil { + + s := fuse.NewStreamingReadResponse() + if err := h.StreamingRead(ctx, r, s); err != nil { return err } + + done(s) + r.StreamingRespond(s) + return nil } done(s) r.Respond(s) diff --git a/fs/serve_test.go b/fs/serve_test.go index e7f9cfb0..2f7c5a1b 100644 --- a/fs/serve_test.go +++ b/fs/serve_test.go @@ -351,6 +351,37 @@ func TestReadAllWithHandleRead(t *testing.T) { testReadAll(t, mnt.Dir+"/child") } +type readWithStreamingHandleRead struct { +} + +func (readWithStreamingHandleRead) Attr(ctx context.Context, a *fuse.Attr) error { + a.Mode = 0666 + a.Size = uint64(len(hi)) + + return nil +} + +func (readWithStreamingHandleRead) StreamingRead(ctx context.Context, req *fuse.ReadRequest, resp *fuse.StreamingReadResponse) error { + _, err := io.Copy(resp, bytes.NewReader([]byte(hi))) + if err != nil { + return fuse.EIO + } + + return nil +} + +func TestStreamingReadAllWithHandleRead(t *testing.T) { + t.Parallel() + mnt, err := fstestutil.MountedT(t, fstestutil.SimpleFS{&fstestutil.ChildMap{"child": readWithStreamingHandleRead{}}}, nil) + + if err != nil { + t.Fatal(err) + } + defer mnt.Close() + + testReadAll(t, mnt.Dir+"/child") +} + type readFlags struct { fstestutil.File fileFlags record.Recorder diff --git a/fuse.go b/fuse.go index 6db0ef29..aa53a170 100644 --- a/fuse.go +++ b/fuse.go @@ -1764,6 +1764,12 @@ func (r *ReadRequest) Respond(resp *ReadResponse) { r.respond(buf) } +func (r *ReadRequest) StreamingRespond(resp *StreamingReadResponse) { + buf := resp.Data() + r.respond(buf) + returnBuffer(buf) +} + // A ReadResponse is the response to a ReadRequest. type ReadResponse struct { Data []byte @@ -1773,6 +1779,28 @@ func (r *ReadResponse) String() string { return fmt.Sprintf("Read %d", len(r.Data)) } +func NewStreamingReadResponse() *StreamingReadResponse { + return &StreamingReadResponse{buf: newStreamingBuffer()} +} + +// A StreamingReadResponse is the response to a ReadRequests wich +// supports streaming the response via a io.Writer +type StreamingReadResponse struct { + buf *bytes.Buffer +} + +func (resp *StreamingReadResponse) Write(p []byte) (int, error) { + return resp.buf.Write(p) +} + +func (resp *StreamingReadResponse) Data() []byte { + return resp.buf.Bytes() +} + +func (r *StreamingReadResponse) String() string { + return fmt.Sprintf("Read %d", len(r.Data())) +} + type jsonReadResponse struct { Len uint64 }