Skip to content
This repository was archived by the owner on Sep 22, 2020. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 41 additions & 19 deletions distributor/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package distributor

import (
"errors"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/coreos/torus"
Expand Down Expand Up @@ -211,31 +213,51 @@ func (d *Distributor) WriteBlock(ctx context.Context, i torus.BlockRef, data []b
}
return torus.ErrNoPeer
case torus.WriteAll:
toWrite := peers.Replication
for _, p := range peers.Peers {
var err error
if p == d.UUID() {
err = d.blocks.WriteBlock(ctx, i, data)
} else {
err = d.client.PutBlock(ctx, p, i, data)
}
if err != nil {
clog.Noticef("error WriteAll to peer %s: %s", p, err)
} else {
toWrite--
}
if toWrite == 0 {
return nil
}
var wg sync.WaitGroup
toWrite := int32(peers.Replication)
peerChan := make(chan string, len(peers.Peers))
for index := 0; int32(index) < toWrite && index < len(peers.Peers) && index < runtime.GOMAXPROCS(0); index++ {
wg.Add(1)
go d.writer(ctx, i, data, &toWrite, peerChan, &wg)
}

// Keep the number of writer threads less than toWrite. If you create the threads
// more than target peers, extra peers would store the data.
for index := 0; index < len(peers.Peers) && int32(index) < atomic.LoadInt32(&toWrite); index++ {
peerChan <- peers.Peers[index]
}
if toWrite == peers.Replication {
clog.Noticef("error WriteAll to all peers")
close(peerChan)
wg.Wait()
if toWrite == int32(peers.Replication) {
clog.Errorf("error WriteAll to all peers")
return torus.ErrNoPeer
}
clog.Warningf("only wrote block to %d/%d peers", (peers.Replication - toWrite), peers.Replication)
if toWrite > 0 {
clog.Warningf("only wrote block to %d/%d peers", (int32(peers.Replication) - toWrite), peers.Replication)
}
}
return nil
}
func (d *Distributor) writer(ctx context.Context, i torus.BlockRef, data []byte, toWrite *int32, peerChan chan string, wg *sync.WaitGroup) {
defer wg.Done()
for {
p, ok := <-peerChan
if !ok {
return
}
var err error
if p == d.UUID() {
err = d.blocks.WriteBlock(ctx, i, data)
} else {
err = d.client.PutBlock(ctx, p, i, data)
}
if err != nil {
clog.Noticef("error WriteAll to peer %s: %s", p, err)
} else {
atomic.AddInt32(toWrite, -1)
}
}
}

func (d *Distributor) WriteBuf(ctx context.Context, i torus.BlockRef) ([]byte, error) {
return d.blocks.WriteBuf(ctx, i)
Expand Down