Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 73 additions & 210 deletions cmd/estuary-shuttle/main.go

Large diffs are not rendered by default.

42 changes: 21 additions & 21 deletions cmd/estuary-shuttle/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (d *Shuttle) addPin(ctx context.Context, contid uint, data cid.Cid, user ui

op := &pinner.PinningOperation{
Obj: data,
ContId: contid,
ContentID: contid,
UserId: user,
Status: types.PinningStatusQueued,
SkipLimiter: skipLimiter,
Expand Down Expand Up @@ -234,12 +234,12 @@ func (d *Shuttle) handleRpcComputeCommP(ctx context.Context, cmd *drpc.ComputeCo
})
}

func (s *Shuttle) sendSplitContentComplete(ctx context.Context, cont uint) {
func (s *Shuttle) sendSplitContentComplete(ctx context.Context, content uint) {
if err := s.sendRpcMessage(ctx, &drpc.Message{
Op: drpc.OP_SplitComplete,
Params: drpc.MsgParams{
SplitComplete: &drpc.SplitComplete{
ID: cont,
ID: content,
},
},
}); err != nil {
Expand Down Expand Up @@ -334,14 +334,14 @@ func (s *Shuttle) handleRpcAggregateStagedContent(ctx context.Context, aggregate
}

for _, c := range aggregate.Contents {
var cont Pin
if err := s.DB.First(&cont, "content = ?", c.ID).Error; err != nil {
var content Pin
if err := s.DB.First(&content, "content = ?", c.ID).Error; err != nil {
// TODO: implies we dont have all the content locally we are being
// asked to aggregate, this is an important error to handle
return err
}

if !cont.Active || cont.Failed {
if !content.Active || content.Failed {
return fmt.Errorf("content i am being asked to aggregate is not pinned: %d", c.ID)
}
}
Expand Down Expand Up @@ -548,58 +548,58 @@ func (s *Shuttle) handleRpcUnpinContent(ctx context.Context, req *drpc.UnpinCont
return nil
}

func (s *Shuttle) markStartUnpin(cont uint) bool {
func (s *Shuttle) markStartUnpin(content uint) bool {
s.unpinLk.Lock()
defer s.unpinLk.Unlock()

if s.unpinInProgress[cont] {
if s.unpinInProgress[content] {
return false
}

s.unpinInProgress[cont] = true
s.unpinInProgress[content] = true
return true
}

func (s *Shuttle) finishUnpin(cont uint) {
func (s *Shuttle) finishUnpin(content uint) {
s.unpinLk.Lock()
defer s.unpinLk.Unlock()
delete(s.unpinInProgress, cont)
delete(s.unpinInProgress, content)
}

func (s *Shuttle) markStartAggr(cont uint) bool {
func (s *Shuttle) markStartAggr(content uint) bool {
s.aggrLk.Lock()
defer s.aggrLk.Unlock()

if s.aggrInProgress[cont] {
if s.aggrInProgress[content] {
return false
}

s.aggrInProgress[cont] = true
s.aggrInProgress[content] = true
return true
}

func (s *Shuttle) finishAggr(cont uint) {
func (s *Shuttle) finishAggr(content uint) {
s.aggrLk.Lock()
defer s.aggrLk.Unlock()
delete(s.aggrInProgress, cont)
delete(s.aggrInProgress, content)
}

func (s *Shuttle) markStartSplit(cont uint) bool {
func (s *Shuttle) markStartSplit(content uint) bool {
s.splitLk.Lock()
defer s.splitLk.Unlock()

if s.splitsInProgress[cont] {
if s.splitsInProgress[content] {
return false
}

s.splitsInProgress[cont] = true
s.splitsInProgress[content] = true
return true
}

func (s *Shuttle) finishSplit(cont uint) {
func (s *Shuttle) finishSplit(content uint) {
s.splitLk.Lock()
defer s.splitLk.Unlock()
delete(s.splitsInProgress, cont)
delete(s.splitsInProgress, content)
}

func (s *Shuttle) handleRpcSplitContent(ctx context.Context, req *drpc.SplitContent) error {
Expand Down
4 changes: 2 additions & 2 deletions contentmgr/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ func (cm *ContentManager) clearUnreferencedObjects(ctx context.Context, objs []*
return nil
}

func (cm *ContentManager) objectsForPin(ctx context.Context, cont uint) ([]*util.Object, error) {
func (cm *ContentManager) objectsForPin(ctx context.Context, content uint) ([]*util.Object, error) {
var objects []*util.Object
if err := cm.DB.Model(util.ObjRef{}).Where("content = ?", cont).
if err := cm.DB.Model(util.ObjRef{}).Where("content = ?", content).
Joins("left join objects on obj_refs.object = objects.id").
Scan(&objects).Error; err != nil {
return nil, err
Expand Down
28 changes: 14 additions & 14 deletions contentmgr/offloading.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ func (cm *ContentManager) getLastAccesses(ctx context.Context, candidates []remo

// TODO: this is only looking at the root, maybe we could find an efficient way to check more of the objects?
// additionally, for aggregates, we should check each aggregated item under the root
func (cm *ContentManager) getLastAccessForContent(cont util.Content) (time.Time, error) {
func (cm *ContentManager) getLastAccessForContent(content util.Content) (time.Time, error) {
var obj util.Object
if err := cm.DB.First(&obj, "cid = ?", cont.Cid).Error; err != nil {
if err := cm.DB.First(&obj, "cid = ?", content.Cid).Error; err != nil {
return time.Time{}, err
}

Expand All @@ -133,18 +133,18 @@ func (cm *ContentManager) OffloadContents(ctx context.Context, conts []uint) (in
cm.contentLk.Lock()
defer cm.contentLk.Unlock()
for _, c := range conts {
var cont util.Content
if err := cm.DB.First(&cont, "id = ?", c).Error; err != nil {
var content util.Content
if err := cm.DB.First(&content, "id = ?", c).Error; err != nil {
return 0, err
}

if cont.Location == constants.ContentLocationLocal {
local = append(local, cont.ID)
if content.Location == constants.ContentLocationLocal {
local = append(local, content.ID)
} else {
remote[cont.Location] = append(remote[cont.Location], cont.ID)
remote[content.Location] = append(remote[content.Location], content.ID)
}

if cont.AggregatedIn > 0 {
if content.AggregatedIn > 0 {
return 0, fmt.Errorf("cannot offload aggregated content")
}

Expand All @@ -156,7 +156,7 @@ func (cm *ContentManager) OffloadContents(ctx context.Context, conts []uint) (in
return 0, err
}

if cont.Aggregate {
if content.Aggregate {
if err := cm.DB.Model(&util.Content{}).Where("aggregated_in = ?", c).Update("offloaded", true).Error; err != nil {
return 0, err
}
Expand All @@ -176,10 +176,10 @@ func (cm *ContentManager) OffloadContents(ctx context.Context, conts []uint) (in
}

for _, c := range children {
if cont.Location == constants.ContentLocationLocal {
if content.Location == constants.ContentLocationLocal {
local = append(local, c.ID)
} else {
remote[cont.Location] = append(remote[cont.Location], c.ID)
remote[content.Location] = append(remote[content.Location], c.ID)
}
}
}
Expand Down Expand Up @@ -233,13 +233,13 @@ func (cm *ContentManager) GetRemovalCandidates(ctx context.Context, all bool, lo
q = q.Where("user_id in ?", users)
}

var conts []util.Content
if err := q.Scan(&conts).Error; err != nil {
var contents []util.Content
if err := q.Scan(&contents).Error; err != nil {
return nil, fmt.Errorf("scanning removal candidates failed: %w", err)
}

var toOffload []removalCandidateInfo
for _, c := range conts {
for _, c := range contents {
good, progress, failed, err := cm.contentIsProperlyReplicated(ctx, c.ID)
if err != nil {
return nil, xerrors.Errorf("failed to check replication of %d: %w", c.ID, err)
Expand Down
Loading