From 0b450a88f81b238064ace9af2c1890486f19c885 Mon Sep 17 00:00:00 2001
From: Kenjiro Nakayama
Date: Sat, 24 Dec 2016 16:23:02 +0900
Subject: [PATCH] distributor: write data to peers concurrently

Previously WriteAll wrote the block to each peer in turn, so one slow
peer stalled every write queued behind it. Spawn a bounded pool of
writer goroutines (capped by the replication count, the peer count, and
GOMAXPROCS), feed them peers over a channel, and track the remaining
required writes with an atomic counter.

---
 distributor/storage.go | 60 +++++++++++++++++++++++++++++-------------
 1 file changed, 41 insertions(+), 19 deletions(-)

diff --git a/distributor/storage.go b/distributor/storage.go
index 5a60e90..3fe4a36 100644
--- a/distributor/storage.go
+++ b/distributor/storage.go
@@ -2,7 +2,9 @@ package distributor
 
 import (
 	"errors"
+	"runtime"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/coreos/torus"
@@ -211,31 +213,51 @@ func (d *Distributor) WriteBlock(ctx context.Context, i torus.BlockRef, data []b
 		}
 		return torus.ErrNoPeer
 	case torus.WriteAll:
-		toWrite := peers.Replication
-		for _, p := range peers.Peers {
-			var err error
-			if p == d.UUID() {
-				err = d.blocks.WriteBlock(ctx, i, data)
-			} else {
-				err = d.client.PutBlock(ctx, p, i, data)
-			}
-			if err != nil {
-				clog.Noticef("error WriteAll to peer %s: %s", p, err)
-			} else {
-				toWrite--
-			}
-			if toWrite == 0 {
-				return nil
-			}
+		var wg sync.WaitGroup
+		toWrite := int32(peers.Replication)
+		peerChan := make(chan string, len(peers.Peers))
+		for index := 0; int32(index) < toWrite && index < len(peers.Peers) && index < runtime.GOMAXPROCS(0); index++ {
+			wg.Add(1)
+			go d.writer(ctx, i, data, &toWrite, peerChan, &wg)
+		}
+
+		// Stop feeding once enough writes have succeeded; later peers serve as
+		// fallbacks for failures, and writes already in flight may over-replicate.
+		for index := 0; index < len(peers.Peers) && atomic.LoadInt32(&toWrite) > 0; index++ {
+			peerChan <- peers.Peers[index]
 		}
-		if toWrite == peers.Replication {
-			clog.Noticef("error WriteAll to all peers")
+		close(peerChan)
+		wg.Wait()
+		if toWrite == int32(peers.Replication) {
+			clog.Errorf("error WriteAll to all peers")
 			return torus.ErrNoPeer
 		}
-		clog.Warningf("only wrote block to %d/%d peers", (peers.Replication - toWrite), peers.Replication)
+		if toWrite > 0 {
+			clog.Warningf("only wrote block to %d/%d peers", (int32(peers.Replication) - toWrite), peers.Replication)
+		}
 	}
 	return nil
 }
 
+// writer pulls peer UUIDs from peerChan and writes the block to each one,
+// decrementing toWrite once per successful write. It returns when peerChan
+// is closed and drained.
+func (d *Distributor) writer(ctx context.Context, i torus.BlockRef, data []byte, toWrite *int32, peerChan chan string, wg *sync.WaitGroup) {
+	defer wg.Done()
+	for p := range peerChan {
+		var err error
+		if p == d.UUID() {
+			err = d.blocks.WriteBlock(ctx, i, data)
+		} else {
+			err = d.client.PutBlock(ctx, p, i, data)
+		}
+		if err != nil {
+			clog.Noticef("error WriteAll to peer %s: %s", p, err)
+		} else {
+			atomic.AddInt32(toWrite, -1)
+		}
+	}
+}
+
 func (d *Distributor) WriteBuf(ctx context.Context, i torus.BlockRef) ([]byte, error) {
 	return d.blocks.WriteBuf(ctx, i)
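
The WriteAll path above is a bounded worker-pool fan-out: a fixed set of
goroutines drains a buffered channel of peers while an atomic counter tracks
how many writes are still required. Below is a self-contained sketch of the
same pattern that runs outside torus; fakeWrite, writeAll, and the peer list
are illustrative stand-ins, not part of the torus API.

package main

import (
	"fmt"
	"math/rand"
	"runtime"
	"sync"
	"sync/atomic"
)

// fakeWrite stands in for a real per-peer block write; it fails at random
// to exercise the fallback path.
func fakeWrite(peer string) error {
	if rand.Intn(4) == 0 {
		return fmt.Errorf("write to %s failed", peer)
	}
	return nil
}

// writeAll writes one block to replication peers, trying extras on failure.
func writeAll(peers []string, replication int) error {
	toWrite := int32(replication)
	peerChan := make(chan string, len(peers))
	var wg sync.WaitGroup

	// Bounded pool: never more workers than needed writes, peers, or CPUs.
	workers := replication
	if len(peers) < workers {
		workers = len(peers)
	}
	if n := runtime.GOMAXPROCS(0); n < workers {
		workers = n
	}
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range peerChan {
				if err := fakeWrite(p); err == nil {
					atomic.AddInt32(&toWrite, -1)
				}
			}
		}()
	}

	// Feed peers until enough writes have succeeded; remaining peers are
	// fallbacks and are never enqueued once the target is met.
	for _, p := range peers {
		if atomic.LoadInt32(&toWrite) <= 0 {
			break
		}
		peerChan <- p
	}
	close(peerChan)
	wg.Wait() // all decrements happen-before this returns

	if left := atomic.LoadInt32(&toWrite); left > 0 {
		return fmt.Errorf("only wrote to %d/%d peers", int32(replication)-left, replication)
	}
	return nil
}

func main() {
	fmt.Println(writeAll([]string{"a", "b", "c", "d", "e"}, 3))
}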
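
The only shared mutable state in this shape is the remaining-writes counter,
so atomic Add/Load suffices and no mutex is needed. The deliberate trade-off
is that writes already in flight when the counter reaches zero still
complete, so the block may land on a few extra peers; for a replication
system that is the safe direction to err in, since over-replication wastes
space while under-replication loses durability.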