From 3f507c4d790e7d5cbe315de326a058c9971f17b2 Mon Sep 17 00:00:00 2001 From: gammazero Date: Fri, 17 Nov 2023 14:10:05 -0800 Subject: [PATCH 1/4] Use new singularity GetFileDeals API New API returns slice of RangeFileDeads, each having a slice of deals for a file range. Uses singularity v0.5.10 --- go.mod | 2 +- go.sum | 4 +- integration/singularity/store.go | 82 +++++++++++++++++++----------- integration/test/motionlarity/.env | 2 +- 4 files changed, 55 insertions(+), 35 deletions(-) diff --git a/go.mod b/go.mod index a296599..77da408 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/filecoin-project/motion go 1.20 require ( - github.com/data-preservation-programs/singularity v0.5.9 + github.com/data-preservation-programs/singularity v0.5.10 github.com/filecoin-project/go-address v1.1.0 github.com/filecoin-project/go-state-types v0.12.0 github.com/gammazero/fsutil v0.0.1 diff --git a/go.sum b/go.sum index 2b48ca3..ab46550 100644 --- a/go.sum +++ b/go.sum @@ -49,8 +49,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHH github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= -github.com/data-preservation-programs/singularity v0.5.9 h1:BuZBrNx9N2GHmCveJsoXHL0EAqdyixIdvzvFSt+oQmA= -github.com/data-preservation-programs/singularity v0.5.9/go.mod h1:p3Morz6kp3e12dRJs1rkALFCHlFjbkmM+TwnRk7sWG4= +github.com/data-preservation-programs/singularity v0.5.10 h1:uPM6xk6lWP8ddrEo3eumbvX5p0gcyY1x92taEhqrfe8= +github.com/data-preservation-programs/singularity v0.5.10/go.mod h1:p3Morz6kp3e12dRJs1rkALFCHlFjbkmM+TwnRk7sWG4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/integration/singularity/store.go b/integration/singularity/store.go index 8256015..ee7fb5e 100644 --- a/integration/singularity/store.go +++ b/integration/singularity/store.go @@ -511,32 +511,42 @@ func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, err return nil, err } - if len(getFileDealsRes.Payload) == 0 { + dealsForRanges := getFileDealsRes.GetPayload() + if len(dealsForRanges) == 0 { return descriptor, nil } - replicas := make([]blob.Replica, 0, len(getFileDealsRes.Payload)) - for _, deal := range getFileDealsRes.Payload { - updatedAt, err := time.Parse("2006-01-02 15:04:05-07:00", deal.LastVerifiedAt) - if err != nil { - updatedAt = time.Time{} - } - piece := blob.Piece{ - Expiration: epochutil.EpochToTime(int32(deal.EndEpoch)), - LastUpdated: updatedAt, - PieceCID: deal.PieceCid, - Status: string(deal.State), + var replicas []blob.Replica + for i := range dealsForRanges { + dealsForRange := dealsForRanges[i].Deals + for _, deal := range dealsForRange { + updatedAt, err := time.Parse("2006-01-02 15:04:05-07:00", deal.LastVerifiedAt) + if err != nil { + updatedAt = time.Time{} + } + piece := blob.Piece{ + Expiration: epochutil.EpochToTime(int32(deal.EndEpoch)), + LastUpdated: updatedAt, + PieceCID: deal.PieceCid, + Status: string(deal.State), + } + replicas = append(replicas, blob.Replica{ + Provider: deal.Provider, + Pieces: []blob.Piece{piece}, + }) } - replicas = append(replicas, blob.Replica{ - Provider: deal.Provider, - Pieces: []blob.Piece{piece}, - }) } descriptor.Replicas = replicas return descriptor, nil } -// Returns true if the file has at least 1 deal for every SP. +// Returns true if the file has at least 1 deal for every SP, for every range of the file. +// +// - If no file ranges, returns true. +// - If no storage providers, returns true. +// - If no deals in any file range, returns false. 
+// - If any range does not have a deal with every SP, returns false. +// - If all ranges have a deal with every SP, returns true. func (s *Store) hasDealForAllProviders(ctx context.Context, blobID blob.ID) (bool, error) { fileID, err := s.idMap.get(blobID) if err != nil { @@ -551,24 +561,34 @@ func (s *Store) hasDealForAllProviders(ctx context.Context, blobID blob.ID) (boo return false, fmt.Errorf("failed to get file deals: %w", err) } - // Make sure the file has at least 1 deal for every SP - for _, sp := range s.storageProviders { - foundDealForSP := false - for _, deal := range getFileDealsRes.Payload { - // Only check state for current provider - if deal.Provider != sp.String() { - continue - } + dealsForRanges := getFileDealsRes.GetPayload() - if deal.State == models.ModelDealStatePublished || deal.State == models.ModelDealStateActive { - foundDealForSP = true - break + // Make sure the file has at least 1 deal for every SP and every range of this file. + for i := range dealsForRanges { + dealsForRange := dealsForRanges[i].Deals + // Check that each SP has a deal. + for _, sp := range s.storageProviders { + // Return false if this range has no published or active deal with this SP. + if !storageProviderHasAnyDeal(sp, dealsForRange) { + return false, nil } } - if !foundDealForSP { - return false, nil - } } return true, nil } + +func storageProviderHasAnyDeal(sp address.Address, deals []*models.ModelDeal) bool { + // Find a deal for this SP. + for _, deal := range deals { + // Only check state for current provider + if deal.Provider != sp.String() { + continue + } + if deal.State == models.ModelDealStatePublished || deal.State == models.ModelDealStateActive { + return true + } + } + // The storage provider did not have any of the deals. 
+ return false +} diff --git a/integration/test/motionlarity/.env b/integration/test/motionlarity/.env index 3605694..be5d1a4 100644 --- a/integration/test/motionlarity/.env +++ b/integration/test/motionlarity/.env @@ -1,4 +1,4 @@ -SINGULARITY_REF=:v0.5.9 +SINGULARITY_REF=:v0.5.10 LOTUS_TEST='true' LOTUS_API=http://lotus:1234/rpc/v1 MOTION_PRICE_PER_GIB_EPOCH=0 From 8fe769ae9896cd54decbbcf02ccf144e59940d64 Mon Sep 17 00:00:00 2001 From: gammazero Date: Sat, 18 Nov 2023 02:14:03 -0800 Subject: [PATCH 2/4] Use new singularity GetFileDeals API --- integration/singularity/store.go | 65 ++++++++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 8 deletions(-) diff --git a/integration/singularity/store.go b/integration/singularity/store.go index ee7fb5e..b71d938 100644 --- a/integration/singularity/store.go +++ b/integration/singularity/store.go @@ -516,25 +516,74 @@ func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, err return descriptor, nil } - var replicas []blob.Replica + // In order to have a complete replica, each file range must be in a deal. + // The number of replicas is the lowest number of deals for all ranges. + // This means a single replica can have multiple providers. + // + // - If different ranges of one file are in different deals, then all of + // the deals make up one complete replica. + // + // - If every file range has 2 deals, then there are 2 complete replicas. + // + // - If some file ranges have 2 deals and some have 1, then there is one + // complete replica and one one partial replica. + + // Count the number of replicas of each file range, and record the lowest + // count across all ranges. 
+ var n int for i := range dealsForRanges { - dealsForRange := dealsForRanges[i].Deals - for _, deal := range dealsForRange { + // ---- START DEBUG---- + fileRange := dealsForRanges[i].FileRange + deals := dealsForRanges[i].Deals + fmt.Println("---> File range", fileRange.Offset, "-", fileRange.Offset+fileRange.Length, "has", len(deals), "deals") + for _, deal := range deals { + fmt.Println(" -->> ID:", deal.ID, "DealID:", deal.DealID, "PieceCid", deal.PieceCid) + } + // ---- END DEBUG---- + + count := len(dealsForRanges[i].Deals) + if count == 0 { + // No complete replica because this file range is not in any deal. + return descriptor, nil + } + if n == -1 || count < n { + n = count + } + } + fmt.Println() + + // Collect information about about deals in each complete replica. + dealsSeen := map[int64]struct{}{} + replicas := make([]blob.Replica, 0, n) + for i := 0; i < n; i++ { + var pieces []blob.Piece + var providers []string + // Collect info for all deals in replica i. + for j := range dealsForRanges { + deal := dealsForRanges[j].Deals[i] + if _, seen := dealsSeen[deal.ID]; seen { + // Already saw this deal for a different range. 
+ fmt.Println("---> already seen deal", deal.ID) + continue + } + dealsSeen[deal.ID] = struct{}{} updatedAt, err := time.Parse("2006-01-02 15:04:05-07:00", deal.LastVerifiedAt) if err != nil { updatedAt = time.Time{} } - piece := blob.Piece{ + pieces = append(pieces, blob.Piece{ Expiration: epochutil.EpochToTime(int32(deal.EndEpoch)), LastUpdated: updatedAt, PieceCID: deal.PieceCid, Status: string(deal.State), - } - replicas = append(replicas, blob.Replica{ - Provider: deal.Provider, - Pieces: []blob.Piece{piece}, }) + providers = append(providers, deal.Provider) } + replicas = append(replicas, blob.Replica{ + // TODO: need to support multiple providers per replica + Provider: providers[0], + Pieces: pieces, + }) } descriptor.Replicas = replicas return descriptor, nil From d9a29cf77bf9caff7521b7712aa335c830027401 Mon Sep 17 00:00:00 2001 From: gammazero Date: Fri, 24 Nov 2023 16:00:02 -0800 Subject: [PATCH 3/4] File ranges in deals with different providers are part of different replica --- integration/singularity/store.go | 112 ++++++++++++++++--------------- 1 file changed, 58 insertions(+), 54 deletions(-) diff --git a/integration/singularity/store.go b/integration/singularity/store.go index b71d938..1c242a0 100644 --- a/integration/singularity/store.go +++ b/integration/singularity/store.go @@ -473,6 +473,24 @@ func (s *Store) Get(ctx context.Context, id blob.ID) (io.ReadSeekCloser, error) return NewReader(s.singularityClient, uint64(fileID), getFileRes.Payload.Size), nil } +// Describe returns a blob descriptor describing a blob's replica(s). +// +// Rules about replica composition: +// +// - If two different ranges are in deals with two separate providers, then +// these are considered to be in separate replicas. This limitation is imposed +// to limit complexity. +// +// - In order to have a complete replica, each file range must be in a deal +// with the same provider. 
+// +// - If different ranges of one file are in different deals, then all of the +// deals make up one complete replica. +// +// - If every file range has 2 deals, then there are 2 complete replicas. +// +// - If some file ranges have 2 deals and some have 1, then there is one +// complete replica and one partial replica. func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, error) { fileID, err := s.idMap.get(id) if err != nil { @@ -516,75 +534,61 @@ func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, err return descriptor, nil } - // In order to have a complete replica, each file range must be in a deal. - // The number of replicas is the lowest number of deals for all ranges. - // This means a single replica can have multiple providers. - // - // - If different ranges of one file are in different deals, then all of - // the deals make up one complete replica. - // - // - If every file range has 2 deals, then there are 2 complete replicas. - // - // - If some file ranges have 2 deals and some have 1, then there is one - // complete replica and one one partial replica. - - // Count the number of replicas of each file range, and record the lowest - // count across all ranges. - var n int + providerReplicas := make(map[string][]blob.Replica) + var prevDealCount, totalReplicas int + + // All ranges in a replica are handled by the same provider. 
for i := range dealsForRanges { - // ---- START DEBUG---- fileRange := dealsForRanges[i].FileRange deals := dealsForRanges[i].Deals - fmt.Println("---> File range", fileRange.Offset, "-", fileRange.Offset+fileRange.Length, "has", len(deals), "deals") - for _, deal := range deals { - fmt.Println(" -->> ID:", deal.ID, "DealID:", deal.DealID, "PieceCid", deal.PieceCid) + provRangeCounts := make(map[string]int) + if i != 0 && len(deals) != prevDealCount { + logger.Warnw("File range has different number of deals that previous file range", + "range", fileRange, "deals", len(deals), "previousDeals", prevDealCount) } - // ---- END DEBUG---- + prevDealCount = len(deals) - count := len(dealsForRanges[i].Deals) - if count == 0 { - // No complete replica because this file range is not in any deal. - return descriptor, nil - } - if n == -1 || count < n { - n = count - } - } - fmt.Println() - - // Collect information about about deals in each complete replica. - dealsSeen := map[int64]struct{}{} - replicas := make([]blob.Replica, 0, n) - for i := 0; i < n; i++ { - var pieces []blob.Piece - var providers []string - // Collect info for all deals in replica i. - for j := range dealsForRanges { - deal := dealsForRanges[j].Deals[i] - if _, seen := dealsSeen[deal.ID]; seen { - // Already saw this deal for a different range. - fmt.Println("---> already seen deal", deal.ID) - continue - } - dealsSeen[deal.ID] = struct{}{} + for _, deal := range deals { updatedAt, err := time.Parse("2006-01-02 15:04:05-07:00", deal.LastVerifiedAt) if err != nil { updatedAt = time.Time{} } - pieces = append(pieces, blob.Piece{ + piece := blob.Piece{ Expiration: epochutil.EpochToTime(int32(deal.EndEpoch)), LastUpdated: updatedAt, PieceCID: deal.PieceCid, Status: string(deal.State), - }) - providers = append(providers, deal.Provider) + } + + // Deals with different providers are always for different replicas. 
+ replicas := providerReplicas[deal.Provider] + // Get number of deals so far for this range for this provider. + j := provRangeCounts[deal.Provider] + if len(replicas) == j { + // Need a new replica since this file range has more deals than + // there are replicas. + // + // If there are multiple deals for the same file range, then + // these deals are for separate replicas. + providerReplicas[deal.Provider] = append(replicas, blob.Replica{ + Provider: deal.Provider, + Pieces: []blob.Piece{piece}, + }) + provRangeCounts[deal.Provider]++ + totalReplicas++ + } else { + // Deal is part of an existing replica that a previous range is part of. + replicas[j].Pieces = append(replicas[j].Pieces, piece) + } } - replicas = append(replicas, blob.Replica{ - // TODO: need to support multiple providers per replica - Provider: providers[0], - Pieces: pieces, - }) } + + replicas := make([]blob.Replica, 0, totalReplicas) + for prov, provReplicas := range providerReplicas { + replicas = append(replicas, provReplicas...) 
+ logger.Infof("Provider %s has %d replicas", prov, len(provReplicas)) + } + descriptor.Replicas = replicas return descriptor, nil } From a29f6fb5cb4e037f73551dffbd3246373dd931b5 Mon Sep 17 00:00:00 2001 From: gammazero Date: Tue, 26 Dec 2023 19:28:41 -0800 Subject: [PATCH 4/4] update test --- integration/test/integration_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration/test/integration_test.go b/integration/test/integration_test.go index a061a8f..6cda07d 100644 --- a/integration/test/integration_test.go +++ b/integration/test/integration_test.go @@ -100,7 +100,7 @@ func TestRoundTripPutStatusAndFullStorage(t *testing.T) { var decoded api.GetStatusResponse err = jsonResp.Decode(&decoded) assert.NoError(c, err) - assert.Len(t, decoded.Replicas, 2) + assert.Len(t, decoded.Replicas, 1) if len(decoded.Replicas) == 2 { assert.Len(c, decoded.Replicas[0].Pieces, 1) assert.Len(c, decoded.Replicas[1].Pieces, 1) @@ -146,9 +146,9 @@ func TestRoundTripPutStatusAndFullStorage(t *testing.T) { var decoded api.GetStatusResponse err = jsonResp.Decode(&decoded) assert.NoError(c, err) - assert.Len(c, decoded.Replicas, 2) + assert.Len(c, decoded.Replicas, 1) for _, replica := range decoded.Replicas { - assert.Len(c, replica.Pieces, 1) + assert.Len(c, replica.Pieces, 4) assert.Contains(c, []string{"published", "active"}, replica.Pieces[0].Status) } }, 2*time.Minute, 5*time.Second, "published deals")