diff --git a/cmd/estuary-shuttle/main.go b/cmd/estuary-shuttle/main.go index fba329f6..c2fa380b 100644 --- a/cmd/estuary-shuttle/main.go +++ b/cmd/estuary-shuttle/main.go @@ -65,7 +65,6 @@ import ( "github.com/ipfs/go-merkledag" "github.com/ipfs/go-metrics-interface" uio "github.com/ipfs/go-unixfs/io" - "github.com/ipld/go-car" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" @@ -78,7 +77,7 @@ var appVersion string var log = logging.Logger("shuttle").With("app_version", appVersion) const ( - ColUuid = "coluuid" + ColUuid = "uuid" ColDir = "dir" ) @@ -1139,13 +1138,15 @@ func (s *Shuttle) ServeAPI() error { return nil }) - content := e.Group("/content") - content.Use(s.AuthRequired(util.PermLevelUpload)) - content.POST("/add", withUser(s.handleAdd)) - content.POST("/add-car", util.WithContentLengthCheck(withUser(s.handleAddCar))) - content.GET("/read/:cont", withUser(s.handleReadContent)) - content.POST("/importdeal", withUser(s.handleImportDeal)) - //content.POST("/add-ipfs", withUser(d.handleAddIpfs)) + contents := e.Group("/contents") + contents.Use(s.AuthRequired(util.PermLevelUpload)) + contents.POST("/", withUser(s.handleAdd)) + contents.GET("/:cid/read/", withUser(s.handleReadContent)) + //contents.POST("/add-ipfs", withUser(d.handleAddIpfs)) + + deals := e.Group("/deals") + deals.Use(s.AuthRequired(util.PermLevelUpload)) + deals.POST("/import", withUser(s.handleImportDeal)) admin := e.Group("/admin") admin.Use(s.AuthRequired(util.PermLevelAdmin)) @@ -1237,12 +1237,16 @@ func (s *Shuttle) handleLogLevel(c echo.Context) error { // handleAdd godoc // @Summary Upload a file // @Description This endpoint uploads a file. -// @Tags content +// @Tags shuttle // @Produce json // @Success 200 {object} string // @Failure 400 {object} util.HttpError // @Failure 500 {object} util.HttpError -// @Router /content/add [post] +// @Param type query string false "Type of content to upload ('car' or 'file'). 
Defaults to 'file'" +// @Param car body string false "Car file to upload" +// @Param filename formData string false "Filename to use for upload" +// @Param data formData file false "File to upload" +// @Router /contents [post] func (s *Shuttle) handleAdd(c echo.Context, u *User) error { ctx := c.Request().Context() @@ -1250,68 +1254,54 @@ func (s *Shuttle) handleAdd(c echo.Context, u *User) error { return err } - form, err := c.MultipartForm() + bsid, bs, err := s.StagingMgr.AllocNew() if err != nil { return err } - defer form.RemoveAll() - mpf, err := c.FormFile("data") - if err != nil { - return err + defer func() { + go func() { + if err := s.StagingMgr.CleanUp(bsid); err != nil { + log.Errorf("failed to clean up staging blockstore: %s", err) + } + }() + }() + + bserv := blockservice.New(bs, nil) + dserv := merkledag.NewDAGService(bserv) + + uploadType := c.QueryParam("type") + if uploadType == "" { + uploadType = "file" } + uploadedContent, err := util.LoadContentFromRequest(c, ctx, uploadType, bs, dserv) + if err != nil { + return err + } // if splitting is disabled and uploaded content size is greater than content size limit // reject the upload, as it will only get stuck and deals will never be made for it - if !u.FlagSplitContent() && mpf.Size > s.shuttleConfig.Content.MaxSize { + if !u.FlagSplitContent() && uploadedContent.Length > s.shuttleConfig.Content.MaxSize { return &util.HttpError{ Code: http.StatusBadRequest, Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, - Details: fmt.Sprintf("content size %d bytes, is over upload size limit of %d bytes, and content splitting is not enabled, please reduce the content size", mpf.Size, s.shuttleConfig.Content.MaxSize), + Details: fmt.Sprintf("content size %d bytes, is over upload size limit of %d bytes, and content splitting is not enabled, please reduce the content size", uploadedContent.Length, s.shuttleConfig.Content.MaxSize), } } - filename := mpf.Filename - fi, err := mpf.Open() - if err != nil { - return err - } - defer fi.Close() - cic := 
util.ContentInBucket{ BucketID: c.QueryParam(ColUuid), BucketDir: c.QueryParam(ColDir), } - bsid, bs, err := s.StagingMgr.AllocNew() - if err != nil { - return err - } - - defer func() { - go func() { - if err := s.StagingMgr.CleanUp(bsid); err != nil { - log.Errorf("failed to clean up staging blockstore: %s", err) - } - }() - }() - - bserv := blockservice.New(bs, nil) - dserv := merkledag.NewDAGService(bserv) - - nd, err := s.importFile(ctx, dserv, fi) - if err != nil { - return err - } - - contid, err := s.createContent(ctx, u, nd.Cid(), filename, cic) + contid, err := s.createContent(ctx, u, uploadedContent.CID, uploadedContent.Filename, cic) if err != nil { return err } pin := &Pin{ Content: contid, - Cid: util.DbCID{CID: nd.Cid()}, + Cid: util.DbCID{CID: uploadedContent.CID}, UserID: u.ID, Active: false, Pinning: true, @@ -1321,7 +1308,7 @@ func (s *Shuttle) handleAdd(c echo.Context, u *User) error { return err } - totalSize, objects, err := s.addDatabaseTrackingToContent(ctx, contid, dserv, bs, nd.Cid(), func(int64) {}) + totalSize, objects, err := s.addDatabaseTrackingToContent(ctx, contid, dserv, bs, uploadedContent.CID, func(int64) {}) if err != nil { return xerrors.Errorf("encountered problem computing object references: %w", err) } @@ -1330,16 +1317,16 @@ func (s *Shuttle) handleAdd(c echo.Context, u *User) error { return xerrors.Errorf("failed to move data from staging to main blockstore: %w", err) } - s.sendPinCompleteMessage(ctx, contid, totalSize, objects, nd.Cid()) + s.sendPinCompleteMessage(ctx, contid, totalSize, objects, uploadedContent.CID) - if err := s.Provide(ctx, nd.Cid()); err != nil { + if err := s.Provide(ctx, uploadedContent.CID); err != nil { log.Warnf("failed to provide: %+v", err) } return c.JSON(http.StatusOK, &util.ContentAddResponse{ - Cid: nd.Cid().String(), - RetrievalURL: util.CreateDwebRetrievalURL(nd.Cid().String()), - EstuaryRetrievalURL: util.CreateEstuaryRetrievalURL(nd.Cid().String()), + Cid: uploadedContent.CID.String(), 
+ RetrievalURL: util.CreateDwebRetrievalURL(uploadedContent.CID.String()), + EstuaryRetrievalURL: util.CreateEstuaryRetrievalURL(uploadedContent.CID.String()), EstuaryId: contid, Providers: s.addrsForShuttle(), }) @@ -1371,130 +1358,6 @@ func (s *Shuttle) Provide(ctx context.Context, c cid.Cid) error { return nil } -// handleAddCar godoc -// @Summary Upload content via a car file -// @Description This endpoint uploads content via a car file -// @Tags content -// @Produce json -// @Success 200 {object} string -// @Failure 400 {object} util.HttpError -// @Failure 500 {object} util.HttpError -// @Router /content/add-car [post] -func (s *Shuttle) handleAddCar(c echo.Context, u *User) error { - ctx := c.Request().Context() - - if err := util.ErrorIfContentAddingDisabled(s.isContentAddingDisabled(u)); err != nil { - return err - } - - // if splitting is disabled and uploaded content size is greater than content size limit - // reject the upload, as it will only get stuck and deals will never be made for it - // if !u.FlagSplitContent() { - // bdWriter := &bytes.Buffer{} - // bdReader := io.TeeReader(c.Request().Body, bdWriter) - - // bdSize, err := io.Copy(ioutil.Discard, bdReader) - // if err != nil { - // return err - // } - - // if bdSize > util.MaxDealContentSize { - // return &util.HttpError{ - // Code: http.StatusBadRequest, - // Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, - // Details: fmt.Sprintf("content size %d bytes, is over upload size of limit %d bytes, and content splitting is not enabled, please reduce the content size", bdSize, util.MaxDealContentSize), - // } - // } - - // c.Request().Body = ioutil.NopCloser(bdWriter) - // } - - bsid, bs, err := s.StagingMgr.AllocNew() - if err != nil { - return err - } - - defer func() { - go func() { - if err := s.StagingMgr.CleanUp(bsid); err != nil { - log.Errorf("failed to clean up staging blockstore: %s", err) - } - }() - }() - - defer c.Request().Body.Close() - header, err := s.loadCar(ctx, bs, c.Request().Body) - 
if err != nil { - return err - } - - if len(header.Roots) != 1 { - // if someone wants this feature, let me know - return c.JSON(400, map[string]string{"error": "cannot handle uploading car files with multiple roots"}) - } - - // TODO: how to specify filename? - filename := header.Roots[0].String() - if qpname := c.QueryParam("filename"); qpname != "" { - filename = qpname - } - - bserv := blockservice.New(bs, nil) - dserv := merkledag.NewDAGService(bserv) - - root := header.Roots[0] - - contid, err := s.createContent(ctx, u, root, filename, util.ContentInBucket{ - BucketID: c.QueryParam(ColUuid), - BucketDir: c.QueryParam(ColDir), - }) - if err != nil { - return err - } - - pin := &Pin{ - Content: contid, - Cid: util.DbCID{CID: root}, - UserID: u.ID, - Active: false, - Pinning: true, - } - - if err := s.DB.Create(pin).Error; err != nil { - return err - } - - totalSize, objects, err := s.addDatabaseTrackingToContent(ctx, contid, dserv, bs, root, func(int64) {}) - if err != nil { - return xerrors.Errorf("encountered problem computing object references: %w", err) - } - - if err := util.DumpBlockstoreTo(ctx, s.Tracer, bs, s.Node.Blockstore); err != nil { - return xerrors.Errorf("failed to move data from staging to main blockstore: %w", err) - } - - s.sendPinCompleteMessage(ctx, contid, totalSize, objects, root) - - if err := s.Provide(ctx, root); err != nil { - log.Warn(err) - } - - return c.JSON(http.StatusOK, &util.ContentAddResponse{ - Cid: root.String(), - RetrievalURL: util.CreateDwebRetrievalURL(root.String()), - EstuaryRetrievalURL: util.CreateEstuaryRetrievalURL(root.String()), - EstuaryId: contid, - Providers: s.addrsForShuttle(), - }) -} - -func (s *Shuttle) loadCar(ctx context.Context, bs blockstore.Blockstore, r io.Reader) (*car.CarHeader, error) { - _, span := s.Tracer.Start(ctx, "loadCar") - defer span.End() - - return car.LoadCar(ctx, bs, r) -} - func (s *Shuttle) addrsForShuttle() []string { var out []string for _, a := range s.Node.Host.Addrs() { @@ 
-1508,9 +1371,9 @@ func (s *Shuttle) createContent(ctx context.Context, u *User, root cid.Cid, file data, err := json.Marshal(util.ContentCreateBody{ ContentInBucket: cic, - Root: root.String(), - Name: filename, - Location: s.shuttleHandle, + Root: root.String(), + Name: filename, + Location: s.shuttleHandle, }) if err != nil { return 0, err @@ -1562,7 +1425,7 @@ func (s *Shuttle) shuttleCreateContent(ctx context.Context, uid uint, root cid.C Name: filename, Location: s.shuttleHandle, }, - Buckets: cols, + Buckets: cols, DagSplitRoot: dagsplitroot, User: uid, }) @@ -1619,15 +1482,15 @@ func (d *Shuttle) doPinning(ctx context.Context, op *pinner.PinningOperation, cb dserv := merkledag.NewDAGService(bserv) dsess := dserv.Session(ctx) - totalSize, objects, err := d.addDatabaseTrackingToContent(ctx, op.ContId, dsess, d.Node.Blockstore, op.Obj, cb) + totalSize, objects, err := d.addDatabaseTrackingToContent(ctx, op.ContentID, dsess, d.Node.Blockstore, op.Obj, cb) if err != nil { - return errors.Wrapf(err, "failed to addDatabaseTrackingToContent - contID(%d), cid(%s)", op.ContId, op.Obj.String()) + return errors.Wrapf(err, "failed to addDatabaseTrackingToContent - contID(%d), cid(%s)", op.ContentID, op.Obj.String()) } - d.sendPinCompleteMessage(ctx, op.ContId, totalSize, objects, op.Obj) + d.sendPinCompleteMessage(ctx, op.ContentID, totalSize, objects, op.Obj) if err := d.Provide(ctx, op.Obj); err != nil { - return errors.Wrapf(err, "failed to provide - contID(%d), cid(%s)", op.ContId, op.Obj.String()) + return errors.Wrapf(err, "failed to provide - contID(%d), cid(%s)", op.ContentID, op.Obj.String()) } return nil } @@ -1751,10 +1614,10 @@ func (d *Shuttle) addDatabaseTrackingToContent(ctx context.Context, contid uint, return totalSize, objects, nil } -func (d *Shuttle) onPinStatusUpdate(cont uint, location string, status types.PinningStatus) error { - log.Debugf("updating pin status: %d %s", cont, status) +func (d *Shuttle) onPinStatusUpdate(content uint, location 
string, status types.PinningStatus) error { + log.Debugf("updating pin status: %d %s", content, status) if status == types.PinningStatusFailed { - if err := d.DB.Model(Pin{}).Where("content = ?", cont).UpdateColumns(map[string]interface{}{ + if err := d.DB.Model(Pin{}).Where("content = ?", content).UpdateColumns(map[string]interface{}{ "pinning": false, "active": false, "failed": true, @@ -1768,7 +1631,7 @@ func (d *Shuttle) onPinStatusUpdate(cont uint, location string, status types.Pin Op: drpc.OP_UpdatePinStatus, Params: drpc.MsgParams{ UpdatePinStatus: &drpc.UpdatePinStatus{ - DBID: cont, + DBID: content, Status: status, }, }, @@ -1803,20 +1666,20 @@ func (s *Shuttle) refreshPinQueue() error { func (s *Shuttle) addPinToQueue(p Pin, peers []*peer.AddrInfo, replace uint) { op := &pinner.PinningOperation{ - ContId: p.Content, - UserId: p.UserID, - Obj: p.Cid.CID, - Peers: peers, - Started: p.CreatedAt, - Status: types.PinningStatusQueued, - Replace: replace, + ContentID: p.Content, + UserId: p.UserID, + Obj: p.Cid.CID, + Peers: peers, + Started: p.CreatedAt, + Status: types.PinningStatusQueued, + Replace: replace, } /* s.pinLk.Lock() // TODO: check if we are overwriting anything here - s.pinJobs[cont.ID] = op + s.pinJobs[content.ID] = op s.pinLk.Unlock() */ @@ -1859,7 +1722,7 @@ func (s *Shuttle) handleHealth(c echo.Context) error { // handleGetNetAddress godoc // @Summary Net Addrs // @Description This endpoint is used to get net addrs -// @Tags net +// @Tags shuttle // @Produce json // @Success 200 {object} string // @Failure 400 {object} util.HttpError @@ -2009,26 +1872,26 @@ func (s *Shuttle) GarbageCollect(ctx context.Context) error { // handleReadContent godoc // @Summary Read content // @Description This endpoint reads content from the blockstore -// @Tags content +// @Tags shuttle // @Produce json // @Success 200 {object} string // @Failure 400 {object} util.HttpError // @Failure 500 {object} util.HttpError -// @Param cont path string true "CID" -// @Router 
/content/read/{cont} [get] +// @Param cid path string true "Content ID of the content to read" +// @Router /contents/{cid}/read [get] func (s *Shuttle) handleReadContent(c echo.Context, u *User) error { - cont, err := strconv.Atoi(c.Param("cont")) + content, err := strconv.Atoi(c.Param("cid")) if err != nil { return err } var pin Pin - if err := s.DB.First(&pin, "content = ?", cont).Error; err != nil { + if err := s.DB.First(&pin, "content = ?", content).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { return &util.HttpError{ Code: http.StatusNotFound, Reason: util.ERR_RECORD_NOT_FOUND, - Details: fmt.Sprintf("content: %d record not found in database", cont), + Details: fmt.Sprintf("content: %d record not found in database", content), } } return err @@ -2127,18 +1990,18 @@ func (s *Shuttle) handleContentHealthCheck(c echo.Context) error { func (s *Shuttle) handleResendPinComplete(c echo.Context) error { ctx := c.Request().Context() - cont, err := strconv.Atoi(c.Param("content")) + content, err := strconv.Atoi(c.Param("content")) if err != nil { return err } var p Pin - if err := s.DB.First(&p, "content = ?", cont).Error; err != nil { + if err := s.DB.First(&p, "content = ?", content).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { return &util.HttpError{ Code: http.StatusNotFound, Reason: util.ERR_RECORD_NOT_FOUND, - Details: fmt.Sprintf("content: %d record not found in database", cont), + Details: fmt.Sprintf("content: %d record not found in database", content), } } return err @@ -2274,13 +2137,13 @@ type importDealBody struct { // handleImportDeal godoc // @Summary Import a deal // @Description This endpoint imports a deal into the shuttle. 
-// @Tags content +// @Tags shuttle // @Produce json // @Success 200 {object} string // @Failure 400 {object} util.HttpError // @Failure 500 {object} util.HttpError // @Param body body main.importDealBody true "Import a deal" -// @Router /content/importdeal [post] +// @Router /deals/import [post] func (s *Shuttle) handleImportDeal(c echo.Context, u *User) error { ctx, span := s.Tracer.Start(c.Request().Context(), "importDeal") defer span.End() diff --git a/cmd/estuary-shuttle/rpc.go b/cmd/estuary-shuttle/rpc.go index 3833d5c3..4dfd4861 100644 --- a/cmd/estuary-shuttle/rpc.go +++ b/cmd/estuary-shuttle/rpc.go @@ -163,7 +163,7 @@ func (d *Shuttle) addPin(ctx context.Context, contid uint, data cid.Cid, user ui op := &pinner.PinningOperation{ Obj: data, - ContId: contid, + ContentID: contid, UserId: user, Status: types.PinningStatusQueued, SkipLimiter: skipLimiter, @@ -234,12 +234,12 @@ func (d *Shuttle) handleRpcComputeCommP(ctx context.Context, cmd *drpc.ComputeCo }) } -func (s *Shuttle) sendSplitContentComplete(ctx context.Context, cont uint) { +func (s *Shuttle) sendSplitContentComplete(ctx context.Context, content uint) { if err := s.sendRpcMessage(ctx, &drpc.Message{ Op: drpc.OP_SplitComplete, Params: drpc.MsgParams{ SplitComplete: &drpc.SplitComplete{ - ID: cont, + ID: content, }, }, }); err != nil { @@ -334,14 +334,14 @@ func (s *Shuttle) handleRpcAggregateStagedContent(ctx context.Context, aggregate } for _, c := range aggregate.Contents { - var cont Pin - if err := s.DB.First(&cont, "content = ?", c.ID).Error; err != nil { + var content Pin + if err := s.DB.First(&content, "content = ?", c.ID).Error; err != nil { // TODO: implies we dont have all the content locally we are being // asked to aggregate, this is an important error to handle return err } - if !cont.Active || cont.Failed { + if !content.Active || content.Failed { return fmt.Errorf("content i am being asked to aggregate is not pinned: %d", c.ID) } } @@ -548,58 +548,58 @@ func (s *Shuttle) 
handleRpcUnpinContent(ctx context.Context, req *drpc.UnpinCont return nil } -func (s *Shuttle) markStartUnpin(cont uint) bool { +func (s *Shuttle) markStartUnpin(content uint) bool { s.unpinLk.Lock() defer s.unpinLk.Unlock() - if s.unpinInProgress[cont] { + if s.unpinInProgress[content] { return false } - s.unpinInProgress[cont] = true + s.unpinInProgress[content] = true return true } -func (s *Shuttle) finishUnpin(cont uint) { +func (s *Shuttle) finishUnpin(content uint) { s.unpinLk.Lock() defer s.unpinLk.Unlock() - delete(s.unpinInProgress, cont) + delete(s.unpinInProgress, content) } -func (s *Shuttle) markStartAggr(cont uint) bool { +func (s *Shuttle) markStartAggr(content uint) bool { s.aggrLk.Lock() defer s.aggrLk.Unlock() - if s.aggrInProgress[cont] { + if s.aggrInProgress[content] { return false } - s.aggrInProgress[cont] = true + s.aggrInProgress[content] = true return true } -func (s *Shuttle) finishAggr(cont uint) { +func (s *Shuttle) finishAggr(content uint) { s.aggrLk.Lock() defer s.aggrLk.Unlock() - delete(s.aggrInProgress, cont) + delete(s.aggrInProgress, content) } -func (s *Shuttle) markStartSplit(cont uint) bool { +func (s *Shuttle) markStartSplit(content uint) bool { s.splitLk.Lock() defer s.splitLk.Unlock() - if s.splitsInProgress[cont] { + if s.splitsInProgress[content] { return false } - s.splitsInProgress[cont] = true + s.splitsInProgress[content] = true return true } -func (s *Shuttle) finishSplit(cont uint) { +func (s *Shuttle) finishSplit(content uint) { s.splitLk.Lock() defer s.splitLk.Unlock() - delete(s.splitsInProgress, cont) + delete(s.splitsInProgress, content) } func (s *Shuttle) handleRpcSplitContent(ctx context.Context, req *drpc.SplitContent) error { diff --git a/contentmgr/gc.go b/contentmgr/gc.go index 4c462729..85cc034e 100644 --- a/contentmgr/gc.go +++ b/contentmgr/gc.go @@ -222,9 +222,9 @@ func (cm *ContentManager) clearUnreferencedObjects(ctx context.Context, objs []* return nil } -func (cm *ContentManager) 
objectsForPin(ctx context.Context, cont uint) ([]*util.Object, error) { +func (cm *ContentManager) objectsForPin(ctx context.Context, content uint) ([]*util.Object, error) { var objects []*util.Object - if err := cm.DB.Model(util.ObjRef{}).Where("content = ?", cont). + if err := cm.DB.Model(util.ObjRef{}).Where("content = ?", content). Joins("left join objects on obj_refs.object = objects.id"). Scan(&objects).Error; err != nil { return nil, err diff --git a/contentmgr/offloading.go b/contentmgr/offloading.go index b9732ece..2df7551c 100644 --- a/contentmgr/offloading.go +++ b/contentmgr/offloading.go @@ -113,9 +113,9 @@ func (cm *ContentManager) getLastAccesses(ctx context.Context, candidates []remo // TODO: this is only looking at the root, maybe we could find an efficient way to check more of the objects? // additionally, for aggregates, we should check each aggregated item under the root -func (cm *ContentManager) getLastAccessForContent(cont util.Content) (time.Time, error) { +func (cm *ContentManager) getLastAccessForContent(content util.Content) (time.Time, error) { var obj util.Object - if err := cm.DB.First(&obj, "cid = ?", cont.Cid).Error; err != nil { + if err := cm.DB.First(&obj, "cid = ?", content.Cid).Error; err != nil { return time.Time{}, err } @@ -133,18 +133,18 @@ func (cm *ContentManager) OffloadContents(ctx context.Context, conts []uint) (in cm.contentLk.Lock() defer cm.contentLk.Unlock() for _, c := range conts { - var cont util.Content - if err := cm.DB.First(&cont, "id = ?", c).Error; err != nil { + var content util.Content + if err := cm.DB.First(&content, "id = ?", c).Error; err != nil { return 0, err } - if cont.Location == constants.ContentLocationLocal { - local = append(local, cont.ID) + if content.Location == constants.ContentLocationLocal { + local = append(local, content.ID) } else { - remote[cont.Location] = append(remote[cont.Location], cont.ID) + remote[content.Location] = append(remote[content.Location], content.ID) } - if 
cont.AggregatedIn > 0 { + if content.AggregatedIn > 0 { return 0, fmt.Errorf("cannot offload aggregated content") } @@ -156,7 +156,7 @@ func (cm *ContentManager) OffloadContents(ctx context.Context, conts []uint) (in return 0, err } - if cont.Aggregate { + if content.Aggregate { if err := cm.DB.Model(&util.Content{}).Where("aggregated_in = ?", c).Update("offloaded", true).Error; err != nil { return 0, err } @@ -176,10 +176,10 @@ func (cm *ContentManager) OffloadContents(ctx context.Context, conts []uint) (in } for _, c := range children { - if cont.Location == constants.ContentLocationLocal { + if content.Location == constants.ContentLocationLocal { local = append(local, c.ID) } else { - remote[cont.Location] = append(remote[cont.Location], c.ID) + remote[content.Location] = append(remote[content.Location], c.ID) } } } @@ -233,13 +233,13 @@ func (cm *ContentManager) GetRemovalCandidates(ctx context.Context, all bool, lo q = q.Where("user_id in ?", users) } - var conts []util.Content - if err := q.Scan(&conts).Error; err != nil { + var contents []util.Content + if err := q.Scan(&contents).Error; err != nil { return nil, fmt.Errorf("scanning removal candidates failed: %w", err) } var toOffload []removalCandidateInfo - for _, c := range conts { + for _, c := range contents { good, progress, failed, err := cm.contentIsProperlyReplicated(ctx, c.ID) if err != nil { return nil, xerrors.Errorf("failed to check replication of %d: %w", c.ID, err) diff --git a/contentmgr/pinning.go b/contentmgr/pinning.go index 68ff92af..53c02cc6 100644 --- a/contentmgr/pinning.go +++ b/contentmgr/pinning.go @@ -23,13 +23,13 @@ import ( "gorm.io/gorm/clause" ) -func (cm *ContentManager) PinStatus(cont util.Content, origins []*peer.AddrInfo) (*types.IpfsPinStatusResponse, error) { - delegates := cm.PinDelegatesForContent(cont) +func (cm *ContentManager) PinStatus(content util.Content, origins []*peer.AddrInfo) (*types.IpfsPinStatusResponse, error) { + delegates := 
cm.PinDelegatesForContent(content) meta := make(map[string]interface{}, 0) - if cont.PinMeta != "" { - if err := json.Unmarshal([]byte(cont.PinMeta), &meta); err != nil { - cm.log.Warnf("content %d has invalid pinmeta: %s", cont, err) + if content.PinMeta != "" { + if err := json.Unmarshal([]byte(content.PinMeta), &meta); err != nil { + cm.log.Warnf("content %d has invalid pinmeta: %s", content, err) } } @@ -44,12 +44,12 @@ func (cm *ContentManager) PinStatus(cont util.Content, origins []*peer.AddrInfo) } ps := &types.IpfsPinStatusResponse{ - RequestID: fmt.Sprintf("%d", cont.ID), + RequestID: fmt.Sprintf("%d", content.ID), Status: types.PinningStatusQueued, - Created: cont.CreatedAt, + Created: content.CreatedAt, Pin: types.IpfsPin{ - CID: cont.Cid.CID.String(), - Name: cont.Name, + CID: content.Cid.CID.String(), + Name: content.Name, Meta: meta, Origins: originStrs, }, @@ -57,18 +57,18 @@ func (cm *ContentManager) PinStatus(cont util.Content, origins []*peer.AddrInfo) Info: make(map[string]interface{}, 0), // TODO: all sorts of extra info we could add... 
} - if cont.Active { + if content.Active { ps.Status = types.PinningStatusPinned - } else if cont.Failed { + } else if content.Failed { ps.Status = types.PinningStatusFailed - } else if cont.Pinning { + } else if content.Pinning { ps.Status = types.PinningStatusPinning } return ps, nil } -func (cm *ContentManager) PinDelegatesForContent(cont util.Content) []string { - if cont.Location == constants.ContentLocationLocal { +func (cm *ContentManager) PinDelegatesForContent(content util.Content) []string { + if content.Location == constants.ContentLocationLocal { var out []string for _, a := range cm.Node.Host.Addrs() { out = append(out, fmt.Sprintf("%s/p2p/%s", a, cm.Node.Host.ID())) @@ -76,14 +76,14 @@ func (cm *ContentManager) PinDelegatesForContent(cont util.Content) []string { return out } else { - ai, err := cm.addrInfoForShuttle(cont.Location) + ai, err := cm.addrInfoForShuttle(content.Location) if err != nil { - cm.log.Errorf("failed to get address info for shuttle %q: %s", cont.Location, err) + cm.log.Errorf("failed to get address info for shuttle %q: %s", content.Location, err) return nil } if ai == nil { - cm.log.Warnf("no address info for shuttle: %s", cont.Location) + cm.log.Warnf("no address info for shuttle: %s", content.Location) return nil } @@ -154,7 +154,7 @@ func (cm *ContentManager) PinContent(ctx context.Context, user uint, obj cid.Cid originsStr = string(b) } - cont := util.Content{ + content := util.Content{ Cid: util.DbCID{CID: obj}, Name: filename, UserID: user, @@ -165,13 +165,13 @@ func (cm *ContentManager) PinContent(ctx context.Context, user uint, obj cid.Cid Location: loc, Origins: originsStr, } - if err := cm.DB.Create(&cont).Error; err != nil { + if err := cm.DB.Create(&content).Error; err != nil { return nil, err } if len(cols) > 0 { for _, c := range cols { - c.Content = cont.ID + c.Content = content.ID } if err := cm.DB.Clauses(clause.OnConflict{ @@ -183,40 +183,40 @@ func (cm *ContentManager) PinContent(ctx context.Context, user 
uint, obj cid.Cid } if loc == constants.ContentLocationLocal { - cm.addPinToQueue(cont, origins, replaceID, makeDeal) + cm.addPinToQueue(content, origins, replaceID, makeDeal) } else { - if err := cm.pinContentOnShuttle(ctx, cont, origins, replaceID, loc, makeDeal); err != nil { + if err := cm.pinContentOnShuttle(ctx, content, origins, replaceID, loc, makeDeal); err != nil { return nil, err } } - return cm.PinStatus(cont, origins) + return cm.PinStatus(content, origins) } -func (cm *ContentManager) addPinToQueue(cont util.Content, peers []*peer.AddrInfo, replaceID uint, makeDeal bool) { - if cont.Location != constants.ContentLocationLocal { +func (cm *ContentManager) addPinToQueue(content util.Content, peers []*peer.AddrInfo, replaceID uint, makeDeal bool) { + if content.Location != constants.ContentLocationLocal { cm.log.Errorf("calling addPinToQueue on non-local content") } op := &pinner.PinningOperation{ - ContId: cont.ID, - UserId: cont.UserID, - Obj: cont.Cid.CID, - Name: cont.Name, - Peers: peers, - Started: cont.CreatedAt, - Status: types.PinningStatusQueued, - Replace: replaceID, - Location: cont.Location, - MakeDeal: makeDeal, - Meta: cont.PinMeta, + ContentID: content.ID, + UserId: content.UserID, + Obj: content.Cid.CID, + Name: content.Name, + Peers: peers, + Started: content.CreatedAt, + Status: types.PinningStatusQueued, + Replace: replaceID, + Location: content.Location, + MakeDeal: makeDeal, + Meta: content.PinMeta, } cm.PinMgr.Add(op) } -func (cm *ContentManager) pinContentOnShuttle(ctx context.Context, cont util.Content, peers []*peer.AddrInfo, replaceID uint, handle string, makeDeal bool) error { +func (cm *ContentManager) pinContentOnShuttle(ctx context.Context, content util.Content, peers []*peer.AddrInfo, replaceID uint, handle string, makeDeal bool) error { ctx, span := cm.tracer.Start(ctx, "pinContentOnShuttle", trace.WithAttributes( attribute.String("handle", handle), - attribute.String("CID", cont.Cid.CID.String()), + 
attribute.String("CID", content.Cid.CID.String()), )) defer span.End() @@ -224,9 +224,9 @@ func (cm *ContentManager) pinContentOnShuttle(ctx context.Context, cont util.Con Op: drpc.CMD_AddPin, Params: drpc.CmdParams{ AddPin: &drpc.AddPin{ - DBID: cont.ID, - UserId: cont.UserID, - Cid: cont.Cid.CID, + DBID: content.ID, + UserId: content.UserID, + Cid: content.Cid.CID, Peers: peers, }, }, @@ -300,7 +300,7 @@ func (cm *ContentManager) selectLocationForContent(ctx context.Context, obj cid. return shuttles[0].Handle, nil } -func (cm *ContentManager) selectLocationForRetrieval(ctx context.Context, cont util.Content) (string, error) { +func (cm *ContentManager) selectLocationForRetrieval(ctx context.Context, content util.Content) (string, error) { _, span := cm.tracer.Start(ctx, "selectLocationForRetrieval") defer span.End() @@ -327,7 +327,7 @@ func (cm *ContentManager) selectLocationForRetrieval(ctx context.Context, cont u // prefer the shuttle the content is already on for _, sh := range shuttles { - if sh.Handle == cont.Location { + if sh.Handle == content.Location { return sh.Handle, nil } } @@ -381,15 +381,15 @@ func (cm *ContentManager) handlePinningComplete(ctx context.Context, handle stri ctx, span := cm.tracer.Start(ctx, "handlePinningComplete") defer span.End() - var cont util.Content - if err := cm.DB.First(&cont, "id = ?", pincomp.DBID).Error; err != nil { + var content util.Content + if err := cm.DB.First(&content, "id = ?", pincomp.DBID).Error; err != nil { return xerrors.Errorf("got shuttle pin complete for unknown content %d (shuttle = %s): %w", pincomp.DBID, handle, err) } - if cont.Active { + if content.Active { // content already active, no need to add objects, just update location // this is used by consolidated contents - if err := cm.DB.Model(util.Content{}).Where("id = ?", cont.ID).UpdateColumns(map[string]interface{}{ + if err := cm.DB.Model(util.Content{}).Where("id = ?", content.ID).UpdateColumns(map[string]interface{}{ "pinning": false, 
"location": handle, }).Error; err != nil { @@ -398,7 +398,7 @@ func (cm *ContentManager) handlePinningComplete(ctx context.Context, handle stri return nil } - if cont.Aggregate { + if content.Aggregate { // this is used by staging content aggregate if len(pincomp.Objects) != 1 { return fmt.Errorf("aggregate has more than 1 objects") @@ -413,13 +413,13 @@ func (cm *ContentManager) handlePinningComplete(ctx context.Context, handle stri } if err := cm.DB.Create(&util.ObjRef{ - Content: cont.ID, + Content: content.ID, Object: obj.ID, }).Error; err != nil { return xerrors.Errorf("failed to create Object reference: %w", err) } - if err := cm.DB.Model(util.Content{}).Where("id = ?", cont.ID).UpdateColumns(map[string]interface{}{ + if err := cm.DB.Model(util.Content{}).Where("id = ?", content.ID).UpdateColumns(map[string]interface{}{ "active": true, "pinning": false, "location": handle, @@ -429,7 +429,7 @@ func (cm *ContentManager) handlePinningComplete(ctx context.Context, handle stri return xerrors.Errorf("failed to update content in database: %w", err) } // after aggregate is done, make deal for it - cm.ToCheck(cont.ID) + cm.ToCheck(content.ID) return nil } @@ -445,6 +445,6 @@ func (cm *ContentManager) handlePinningComplete(ctx context.Context, handle stri return xerrors.Errorf("failed to add objects to database: %w", err) } - cm.ToCheck(cont.ID) + cm.ToCheck(content.ID) return nil } diff --git a/contentmgr/replication.go b/contentmgr/replication.go index 58ceff0b..41695925 100644 --- a/contentmgr/replication.go +++ b/contentmgr/replication.go @@ -118,7 +118,7 @@ type ContentStagingZone struct { MaxSize int64 `json:"maxSize"` CurSize int64 `json:"curSize"` User uint `json:"user"` - ContID uint `json:"contentID"` + ContentID uint `json:"contentID"` Location string `json:"location"` IsConsolidating bool `json:"isConsolidating"` Readiness stagingZoneReadiness `json:"readiness"` @@ -136,7 +136,7 @@ func (cb *ContentStagingZone) DeepCopy() *ContentStagingZone { MaxSize: 
cb.MaxSize, CurSize: cb.CurSize, User: cb.User, - ContID: cb.ContID, + ContentID: cb.ContentID, Location: cb.Location, Readiness: cb.Readiness, } @@ -165,7 +165,7 @@ func (cm *ContentManager) newContentStagingZone(user uint, loc string) (*Content MinSize: cm.cfg.Content.MinSize, MaxSize: cm.cfg.Content.MaxSize, User: user, - ContID: content.ID, + ContentID: content.ID, Location: content.Location, Readiness: stagingZoneReadiness{false, "Readiness not yet evaluated"}, }, nil @@ -207,7 +207,7 @@ func (cm *ContentManager) tryAddContent(cb *ContentStagingZone, c util.Content) if err := cm.DB.Model(util.Content{}). Where("id = ?", c.ID). - UpdateColumn("aggregated_in", cb.ContID).Error; err != nil { + UpdateColumn("aggregated_in", cb.ContentID).Error; err != nil { return false, err } @@ -229,10 +229,10 @@ func (cm *ContentManager) tryRemoveContent(cb *ContentStagingZone, c util.Conten newContents := make([]util.Content, 0) newSize := int64(0) - for _, cont := range cb.Contents { - if cont.ID != c.ID { - newContents = append(newContents, cont) - newSize += cont.Size + for _, content := range cb.Contents { + if content.ID != c.ID { + newContents = append(newContents, content) + newSize += content.Size } } cb.Contents = newContents @@ -245,8 +245,8 @@ func (cb *ContentStagingZone) hasContent(c util.Content) bool { cb.lk.Lock() defer cb.lk.Unlock() - for _, cont := range cb.Contents { - if cont.ID == c.ID { + for _, content := range cb.Contents { + if content.ID == c.ID { return true } } @@ -307,7 +307,7 @@ func (cm *ContentManager) runStagingBucketWorker(ctx context.Context) { zones := cm.popReadyStagingZone() for _, z := range zones { if err := cm.processStagingZone(ctx, z); err != nil { - cm.log.Errorf("content aggregation failed (zone %d): %s", z.ContID, err) + cm.log.Errorf("content aggregation failed (zone %d): %s", z.ContentID, err) continue } } @@ -478,11 +478,11 @@ func (qm *queueManager) processQueue() { } func (cm *ContentManager) currentLocationForContent(cntId 
uint) (string, error) { - var cont util.Content - if err := cm.DB.First(&cont, "id = ?", cntId).Error; err != nil { + var content util.Content + if err := cm.DB.First(&content, "id = ?", cntId).Error; err != nil { return "", err } - return cont.Location, nil + return content.Location, nil } func (cm *ContentManager) getGroupedStagedContentLocations(ctx context.Context, b *ContentStagingZone) (map[string]string, error) { @@ -584,7 +584,7 @@ func (cm *ContentManager) AggregateStagingZone(ctx context.Context, z *ContentSt ctx, span := cm.tracer.Start(ctx, "aggregateStagingZone") defer span.End() - cm.log.Debugf("aggregating zone: %d", z.ContID) + cm.log.Debugf("aggregating zone: %d", z.ContentID) // get the aggregate content location var loc string @@ -606,7 +606,7 @@ func (cm *ContentManager) AggregateStagingZone(ctx context.Context, z *ContentSt } if size == 0 { - cm.log.Warnf("content %d aggregate dir apparent size is zero", z.ContID) + cm.log.Warnf("content %d aggregate dir apparent size is zero", z.ContentID) } obj := &util.Object{ @@ -618,7 +618,7 @@ func (cm *ContentManager) AggregateStagingZone(ctx context.Context, z *ContentSt } if err := cm.DB.Create(&util.ObjRef{ - Content: z.ContID, + Content: z.ContentID, Object: obj.ID, }).Error; err != nil { return err @@ -628,7 +628,7 @@ func (cm *ContentManager) AggregateStagingZone(ctx context.Context, z *ContentSt return err } - if err := cm.DB.Model(util.Content{}).Where("id = ?", z.ContID).UpdateColumns(map[string]interface{}{ + if err := cm.DB.Model(util.Content{}).Where("id = ?", z.ContentID).UpdateColumns(map[string]interface{}{ "active": true, "pinning": false, "cid": util.DbCID{CID: ncid}, @@ -639,7 +639,7 @@ func (cm *ContentManager) AggregateStagingZone(ctx context.Context, z *ContentSt } go func() { - cm.ToCheck(z.ContID) + cm.ToCheck(z.ContentID) }() return nil } @@ -651,7 +651,7 @@ func (cm *ContentManager) AggregateStagingZone(ctx context.Context, z *ContentSt } var bContent util.Content - if err := 
cm.DB.First(&bContent, "id = ?", z.ContID).Error; err != nil { + if err := cm.DB.First(&bContent, "id = ?", z.ContentID).Error; err != nil { return err } return cm.SendAggregateCmd(ctx, loc, bContent, aggrConts) @@ -716,7 +716,7 @@ func (cm *ContentManager) rebuildStagingBuckets() error { MaxSize: cm.cfg.Content.MaxSize, CurSize: zSize, User: c.UserID, - ContID: c.ID, + ContentID: c.ID, Location: c.Location, } z.updateReadiness() @@ -752,8 +752,8 @@ func (cm *ContentManager) SetDataTransferStartedOrFinished(ctx context.Context, return err } - var cont util.Content - if err := cm.DB.First(&cont, "id = ?", deal.Content).Error; err != nil { + var content util.Content + if err := cm.DB.First(&content, "id = ?", deal.Content).Error; err != nil { return err } @@ -1102,16 +1102,16 @@ func (cm *ContentManager) ensureStorage(ctx context.Context, content util.Conten // if content has no deals, is not already staged, is below min content size, // and staging zone is enabled -func (cm *ContentManager) canStageContent(cont util.Content, deals []model.ContentDeal) bool { - return len(deals) == 0 && !cont.Aggregate && cont.Size < cm.cfg.Content.MinSize && cm.cfg.StagingBucket.Enabled +func (cm *ContentManager) canStageContent(content util.Content, deals []model.ContentDeal) bool { + return len(deals) == 0 && !content.Aggregate && content.Size < cm.cfg.Content.MinSize && cm.cfg.StagingBucket.Enabled } -func (cm *ContentManager) splitContent(ctx context.Context, cont util.Content, size int64) error { +func (cm *ContentManager) splitContent(ctx context.Context, content util.Content, size int64) error { ctx, span := cm.tracer.Start(ctx, "splitContent") defer span.End() var u util.User - if err := cm.DB.First(&u, "id = ?", cont.UserID).Error; err != nil { + if err := cm.DB.First(&u, "id = ?", content.UserID).Error; err != nil { return fmt.Errorf("failed to load contents user from db: %w", err) } @@ -1119,17 +1119,17 @@ func (cm *ContentManager) splitContent(ctx context.Context, cont 
util.Content, s return fmt.Errorf("user does not have content splitting enabled") } - cm.log.Debugf("splitting content %d (size: %d)", cont.ID, size) + cm.log.Debugf("splitting content %d (size: %d)", content.ID, size) - if cont.Location == constants.ContentLocationLocal { + if content.Location == constants.ContentLocationLocal { go func() { - if err := cm.splitContentLocal(ctx, cont, size); err != nil { - cm.log.Errorw("failed to split local content", "cont", cont.ID, "size", size, "err", err) + if err := cm.splitContentLocal(ctx, content, size); err != nil { + cm.log.Errorw("failed to split local content", "cont", content.ID, "size", size, "err", err) } }() return nil } else { - return cm.sendSplitContentCmd(ctx, cont.Location, cont.ID, size) + return cm.sendSplitContentCmd(ctx, content.Location, content.ID, size) } } @@ -1994,13 +1994,13 @@ func (cm *ContentManager) MakeDealWithMiner(ctx context.Context, content util.Co } func (cm *ContentManager) StartDataTransfer(ctx context.Context, cd *model.ContentDeal) error { - var cont util.Content - if err := cm.DB.First(&cont, "id = ?", cd.Content).Error; err != nil { + var content util.Content + if err := cm.DB.First(&content, "id = ?", cd.Content).Error; err != nil { return err } - if cont.Location != constants.ContentLocationLocal { - return cm.sendStartTransferCommand(ctx, cont.Location, cd, cont.Cid.CID) + if content.Location != constants.ContentLocationLocal { + return cm.sendStartTransferCommand(ctx, content.Location, cd, content.Cid.CID) } miner, err := cd.MinerAddr() @@ -2008,14 +2008,14 @@ func (cm *ContentManager) StartDataTransfer(ctx context.Context, cd *model.Conte return err } - chanid, err := cm.FilClient.StartDataTransfer(ctx, miner, cd.PropCid.CID, cont.Cid.CID) + chanid, err := cm.FilClient.StartDataTransfer(ctx, miner, cd.PropCid.CID, content.Cid.CID) if err != nil { if oerr := cm.recordDealFailure(&DealFailureError{ Miner: miner, Phase: "start-data-transfer", Message: err.Error(), - Content: 
cont.ID, - UserID: cont.UserID, + Content: content.ID, + UserID: content.UserID, DealProtocolVersion: cd.DealProtocolVersion, MinerVersion: cd.MinerVersion, }); oerr != nil { @@ -2146,13 +2146,13 @@ func (cm *ContentManager) calculateCarSize(ctx context.Context, data cid.Cid) (u var ErrWaitForRemoteCompute = fmt.Errorf("waiting for remote commP computation") func (cm *ContentManager) runPieceCommCompute(ctx context.Context, data cid.Cid, bs blockstore.Blockstore) (cid.Cid, uint64, abi.UnpaddedPieceSize, error) { - var cont util.Content - if err := cm.DB.First(&cont, "cid = ?", data.Bytes()).Error; err != nil { + var content util.Content + if err := cm.DB.First(&content, "cid = ?", data.Bytes()).Error; err != nil { return cid.Undef, 0, 0, err } - if cont.Location != constants.ContentLocationLocal { - if err := cm.SendShuttleCommand(ctx, cont.Location, &drpc.Command{ + if content.Location != constants.ContentLocationLocal { + if err := cm.SendShuttleCommand(ctx, content.Location, &drpc.Command{ Op: drpc.CMD_ComputeCommP, Params: drpc.CmdParams{ ComputeCommP: &drpc.ComputeCommP{ @@ -2166,7 +2166,7 @@ func (cm *ContentManager) runPieceCommCompute(ctx context.Context, data cid.Cid, return cid.Undef, 0, 0, ErrWaitForRemoteCompute } - cm.log.Debugw("computing piece commitment", "data", cont.Cid.CID) + cm.log.Debugw("computing piece commitment", "data", content.Cid.CID) return filclient.GeneratePieceCommitmentFFI(ctx, data, bs) } @@ -2278,14 +2278,14 @@ func (cm *ContentManager) RefreshContentForCid(ctx context.Context, c cid.Cid) ( } } -func (cm *ContentManager) RefreshContent(ctx context.Context, cont uint) error { +func (cm *ContentManager) RefreshContent(ctx context.Context, content uint) error { ctx, span := cm.tracer.Start(ctx, "refreshContent") defer span.End() // TODO: this retrieval needs to mark all of its content as 'referenced' // until we can update its offloading status in the database var c util.Content - if err := cm.DB.First(&c, "id = ?", cont).Error; err 
!= nil { + if err := cm.DB.First(&c, "id = ?", content).Error; err != nil { return err } @@ -2293,19 +2293,19 @@ func (cm *ContentManager) RefreshContent(ctx context.Context, cont uint) error { if err != nil { return err } - cm.log.Infof("refreshing content %d onto shuttle %s", cont, loc) + cm.log.Infof("refreshing content %d onto shuttle %s", content, loc) switch loc { case constants.ContentLocationLocal: - if err := cm.retrieveContent(ctx, cont); err != nil { + if err := cm.retrieveContent(ctx, content); err != nil { return err } - if err := cm.DB.Model(&util.Content{}).Where("id = ?", cont).Update("offloaded", false).Error; err != nil { + if err := cm.DB.Model(&util.Content{}).Where("id = ?", content).Update("offloaded", false).Error; err != nil { return err } - if err := cm.DB.Model(&util.ObjRef{}).Where("content = ?", cont).Update("offloaded", 0).Error; err != nil { + if err := cm.DB.Model(&util.ObjRef{}).Where("content = ?", content).Update("offloaded", 0).Error; err != nil { return err } default: @@ -2315,17 +2315,17 @@ func (cm *ContentManager) RefreshContent(ctx context.Context, cont uint) error { return nil } -func (cm *ContentManager) sendRetrieveContentMessage(ctx context.Context, loc string, cont util.Content) error { +func (cm *ContentManager) sendRetrieveContentMessage(ctx context.Context, loc string, content util.Content) error { return fmt.Errorf("not retrieving content yet until implementation is finished") /* var activeDeals []contentDeal - if err := cm.DB.Find(&activeDeals, "content = ? and not failed and deal_id > 0", cont.ID).Error; err != nil { + if err := cm.DB.Find(&activeDeals, "content = ? 
and not failed and deal_id > 0", content.ID).Error; err != nil { return err } if len(activeDeals) == 0 { - cm.log.Errorf("attempted to retrieve content %d but have no active deals", cont.ID) - return fmt.Errorf("no active deals for content %d, cannot retrieve", cont.ID) + cm.log.Errorf("attempted to retrieve content %d but have no active deals", content.ID) + return fmt.Errorf("no active deals for content %d, cannot retrieve", content.ID) } var deals []drpc.StorageDeal @@ -2346,8 +2346,8 @@ func (cm *ContentManager) sendRetrieveContentMessage(ctx context.Context, loc st Op: drpc.CMD_RetrieveContent, Params: drpc.CmdParams{ RetrieveContent: &drpc.RetrieveContent{ - Content: cont.ID, - Cid: cont.Cid.CID, + Content: content.ID, + Cid: content.Cid.CID, Deals: deals, }, }, @@ -2530,21 +2530,21 @@ func (cm *ContentManager) migrateContentsToLocalNode(ctx context.Context, toMove return nil } -func (cm *ContentManager) migrateContentToLocalNode(ctx context.Context, cont util.Content) error { - done, err := cm.safeFetchData(ctx, cont.Cid.CID) +func (cm *ContentManager) migrateContentToLocalNode(ctx context.Context, content util.Content) error { + done, err := cm.safeFetchData(ctx, content.Cid.CID) if err != nil { return fmt.Errorf("failed to fetch data: %w", err) } defer done() - if err := cm.DB.Model(util.ObjRef{}).Where("id = ?", cont.ID).UpdateColumns(map[string]interface{}{ + if err := cm.DB.Model(util.ObjRef{}).Where("id = ?", content.ID).UpdateColumns(map[string]interface{}{ "offloaded": 0, }).Error; err != nil { return err } - if err := cm.DB.Model(util.Content{}).Where("id = ?", cont.ID).UpdateColumns(map[string]interface{}{ + if err := cm.DB.Model(util.Content{}).Where("id = ?", content.ID).UpdateColumns(map[string]interface{}{ "offloaded": false, "location": constants.ContentLocationLocal, }).Error; err != nil { @@ -2651,13 +2651,13 @@ func (cm *ContentManager) sendStartTransferCommand(ctx context.Context, loc stri }) } -func (cm *ContentManager) 
SendAggregateCmd(ctx context.Context, loc string, cont util.Content, aggr []drpc.AggregateContent) error { +func (cm *ContentManager) SendAggregateCmd(ctx context.Context, loc string, content util.Content, aggr []drpc.AggregateContent) error { return cm.SendShuttleCommand(ctx, loc, &drpc.Command{ Op: drpc.CMD_AggregateContent, Params: drpc.CmdParams{ AggregateContent: &drpc.AggregateContents{ - DBID: cont.ID, - UserID: cont.UserID, + DBID: content.ID, + UserID: content.UserID, Contents: aggr, }, }, @@ -2676,12 +2676,12 @@ func (cm *ContentManager) sendRequestTransferStatusCmd(ctx context.Context, loc }) } -func (cm *ContentManager) sendSplitContentCmd(ctx context.Context, loc string, cont uint, size int64) error { +func (cm *ContentManager) sendSplitContentCmd(ctx context.Context, loc string, content uint, size int64) error { return cm.SendShuttleCommand(ctx, loc, &drpc.Command{ Op: drpc.CMD_SplitContent, Params: drpc.CmdParams{ SplitContent: &drpc.SplitContent{ - Content: cont, + Content: content, Size: size, }, }, @@ -2745,10 +2745,10 @@ func (cm *ContentManager) SetDealMakingEnabled(enable bool) { cm.isDealMakingDisabled = !enable } -func (cm *ContentManager) splitContentLocal(ctx context.Context, cont util.Content, size int64) error { +func (cm *ContentManager) splitContentLocal(ctx context.Context, content util.Content, size int64) error { dserv := merkledag.NewDAGService(blockservice.New(cm.Node.Blockstore, nil)) b := dagsplit.NewBuilder(dserv, uint64(size), 0) - if err := b.Pack(ctx, cont.Cid.CID); err != nil { + if err := b.Pack(ctx, content.Cid.CID); err != nil { return err } @@ -2766,14 +2766,14 @@ func (cm *ContentManager) splitContentLocal(ctx context.Context, cont util.Conte for i, c := range boxCids { content := &util.Content{ Cid: util.DbCID{CID: c}, - Name: fmt.Sprintf("%s-%d", cont.Name, i), + Name: fmt.Sprintf("%s-%d", content.Name, i), Active: false, // will be active after it's blocks are saved Pinning: true, - UserID: cont.UserID, - 
Replication: cont.Replication,
+ UserID: content.UserID,
+ Replication: content.Replication,
 Location: constants.ContentLocationLocal,
 DagSplit: true,
- SplitFrom: cont.ID,
+ SplitFrom: content.ID,
 }
 if err := cm.DB.Create(content).Error; err != nil {
@@ -2786,12 +2786,12 @@ func (cm *ContentManager) splitContentLocal(ctx context.Context, cont util.Conte
 // queue splited contents
 go func() {
- cm.log.Debugw("queuing splited content child", "parent_contID", cont.ID, "child_contID", content.ID)
+ cm.log.Debugw("queuing splited content child", "child_contID", content.ID) // parent id unavailable here: the renamed param is shadowed by the per-chunk content variable
 cm.ToCheck(content.ID)
 }()
 }
- if err := cm.DB.Model(util.Content{}).Where("id = ?", cont.ID).UpdateColumns(map[string]interface{}{
+ if err := cm.DB.Model(util.Content{}).Where("id = ?", content.ID).UpdateColumns(map[string]interface{}{
 "dag_split": true,
 "size": 0,
 "active": false,
@@ -2799,12 +2799,12 @@ func (cm *ContentManager) splitContentLocal(ctx context.Context, cont util.Conte
 }).Error; err != nil {
 return err
 }
- return cm.DB.Where("content = ?", cont.ID).Delete(&util.ObjRef{}).Error
+ return cm.DB.Where("content = ?", content.ID).Delete(&util.ObjRef{}).Error
 }
 var noDataTimeout = time.Minute * 10
-func (cm *ContentManager) AddDatabaseTrackingToContent(ctx context.Context, cont uint, dserv ipld.NodeGetter, root cid.Cid, cb func(int64)) error {
+func (cm *ContentManager) AddDatabaseTrackingToContent(ctx context.Context, content uint, dserv ipld.NodeGetter, root cid.Cid, cb func(int64)) error {
 ctx, span := cm.tracer.Start(ctx, "computeObjRefsUpdate")
 defer span.End()
@@ -2884,7 +2884,7 @@ func (cm *ContentManager) AddDatabaseTrackingToContent(ctx context.Context, cont
 if err != nil {
 return err
 }
- return cm.addObjectsToDatabase(ctx, cont, objects, constants.ContentLocationLocal)
+ return cm.addObjectsToDatabase(ctx, content, objects, constants.ContentLocationLocal)
 }
 func (cm *ContentManager) AddDatabaseTracking(ctx context.Context, u *util.User, dserv
ipld.NodeGetter, root cid.Cid, filename string, replication int) (*util.Content, error) { diff --git a/contentmgr/shuttle.go b/contentmgr/shuttle.go index 34aaa75f..eea27a38 100644 --- a/contentmgr/shuttle.go +++ b/contentmgr/shuttle.go @@ -417,8 +417,8 @@ func (cm *ContentManager) handleRpcShuttleUpdate(ctx context.Context, handle str func (cm *ContentManager) handleRpcGarbageCheck(ctx context.Context, handle string, param *drpc.GarbageCheck) error { var tounpin []uint for _, c := range param.Contents { - var cont util.Content - if err := cm.DB.First(&cont, "id = ?", c).Error; err != nil { + var content util.Content + if err := cm.DB.First(&content, "id = ?", c).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { tounpin = append(tounpin, c) } else { @@ -426,7 +426,7 @@ func (cm *ContentManager) handleRpcGarbageCheck(ctx context.Context, handle stri } } - if cont.Location != handle || cont.Offloaded { + if content.Location != handle || content.Offloaded { tounpin = append(tounpin, c) } } diff --git a/handlers.go b/handlers.go index f464ae87..b61e1bd7 100644 --- a/handlers.go +++ b/handlers.go @@ -49,7 +49,6 @@ import ( "github.com/google/uuid" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" - blockstore "github.com/ipfs/go-ipfs-blockstore" exchange "github.com/ipfs/go-ipfs-exchange-interface" offline "github.com/ipfs/go-ipfs-exchange-offline" ipld "github.com/ipfs/go-ipld-format" @@ -57,7 +56,6 @@ import ( "github.com/ipfs/go-merkledag" "github.com/ipfs/go-unixfs" uio "github.com/ipfs/go-unixfs/io" - "github.com/ipld/go-car" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" "github.com/libp2p/go-libp2p/core/peer" @@ -157,32 +155,24 @@ func (s *Server) ServeAPI() error { userMiner.PUT("/unsuspend/:miner", withUser(s.handleUnsuspendMiner)) userMiner.PUT("/set-info/:miner", withUser(s.handleMinersSetInfo)) - contmeta := e.Group("/content") - uploads := contmeta.Group("", s.AuthRequired(util.PermLevelUpload)) - 
uploads.POST("/add", withUser(s.handleAdd)) - uploads.POST("/add-ipfs", withUser(s.handleAddIpfs)) - uploads.POST("/add-car", util.WithContentLengthCheck(withUser(s.handleAddCar))) - uploads.POST("/create", withUser(s.handleCreateContent)) - - content := contmeta.Group("", s.AuthRequired(util.PermLevelUser)) - content.GET("/by-cid/:cid", s.handleGetContentByCid) - content.GET("/:cont_id", withUser(s.handleGetContent)) - content.GET("/stats", withUser(s.handleStats)) - content.GET("/ensure-replication/:datacid", s.handleEnsureReplication) - content.GET("/status/:id", withUser(s.handleContentStatus)) - content.GET("/list", withUser(s.handleListContent)) - content.GET("/deals", withUser(s.handleListContentWithDeals)) - content.GET("/failures/:content", withUser(s.handleGetContentFailures)) - content.GET("/bw-usage/:content", withUser(s.handleGetContentBandwidth)) - content.GET("/staging-zones", withUser(s.handleGetStagingZoneForUser)) - content.GET("/aggregated/:content", withUser(s.handleGetAggregatedForContent)) - content.GET("/all-deals", withUser(s.handleGetAllDealsForUser)) + // to upload contents you only need an upload key + // to see info about contents you need a user-level key (see contents group) + e.POST("/contents", withUser(s.handleAdd), s.AuthRequired(util.PermLevelUpload)) + + contents := e.Group("", s.AuthRequired(util.PermLevelUser)) + contents.GET("/contents", withUser(s.handleListContent)) + contents.GET("/contents/:contentid", withUser(s.handleGetContent)) + contents.GET("/contents/:cid/ensure-replication", s.handleEnsureReplication) + contents.GET("/contents/:contentid/status", withUser(s.handleContentStatus)) + + stagingBuckets := e.Group("/staging-buckets", s.AuthRequired(util.PermLevelUser)) + stagingBuckets.GET("", withUser(s.handleGetStagingZoneForUser)) // TODO: the commented out routes here are still fairly useful, but maybe // need to have some sort of 'super user' permission level in order to use // them? 
Can easily cause harm using them - deals := e.Group("/deals") - deals.Use(s.AuthRequired(util.PermLevelUser)) + deals := e.Group("/deals", s.AuthRequired(util.PermLevelUser)) + deals.GET("", withUser(s.handleGetAllDealsForUser)) deals.GET("/status/:deal", withUser(s.handleGetDealStatus)) deals.GET("/status-by-proposal/:propcid", withUser(s.handleGetDealStatusByPropCid)) deals.GET("/query/:miner", s.handleQueryAsk) @@ -207,7 +197,7 @@ func (s *Server) ServeAPI() error { buckets.POST("/:uuid", withUser(s.handleAddContentsToBucket)) buckets.GET("/:uuid", withUser(s.handleGetBucketContents)) - buckets.DELETE("/:bucketuuid/contents", withUser(s.handleDeleteContentFromBucket)) + buckets.DELETE("/:uuid/contents", withUser(s.handleDeleteContentFromBucket)) buckets.POST("/:uuid/commit", withUser(s.handleCommitBucket)) @@ -224,7 +214,6 @@ func (s *Server) ServeAPI() error { // explicitly public, for now public := e.Group("/public") - public.GET("/stats", s.handlePublicStats) public.GET("/by-cid/:cid", s.handleGetContentByCid) public.GET("/deals/failures", s.handlePublicStorageFailures) public.GET("/info", s.handleGetPublicNodeInfo) @@ -373,7 +362,7 @@ func withUser(f func(echo.Context, *util.User) error) func(echo.Context) error { // handleStats godoc // @Summary Get content statistics // @Description This endpoint is used to get content statistics. Every content stored in the network (estuary) is tracked by a unique ID which can be used to get information about the content. This endpoint will allow the consumer to get the collected stats of a conten -// @Tags content +// @Tags contents // @Param limit query string true "limit" // @Param offset query string true "offset" // @Produce json @@ -619,240 +608,26 @@ func (s *Server) handlePeeringStatus(c echo.Context) error { return c.JSON(http.StatusOK, StateResponse{State: ""}) } -// handleAddIpfs godoc -// @Summary Add IPFS object -// @Description This endpoint is used to add an IPFS object to the network. 
The object can be a file or a directory. -// @Tags content -// @Produce json -// @Success 200 {object} string -// @Failure 400 {object} util.HttpError -// @Failure 500 {object} util.HttpError -// @Param body body util.ContentAddIpfsBody true "IPFS Body" -// @Param ignore-dupes query string false "Ignore Dupes" -// @Router /content/add-ipfs [post] -func (s *Server) handleAddIpfs(c echo.Context, u *util.User) error { - ctx := c.Request().Context() - - if err := util.ErrorIfContentAddingDisabled(s.isContentAddingDisabled(u)); err != nil { - return err - } - - var params util.ContentAddIpfsBody - if err := c.Bind(¶ms); err != nil { - return err - } - - filename := params.Name - if filename == "" { - filename = params.Root - } - - var bucks []*buckets.BucketRef - if params.BucketID != "" { - var srchbucket buckets.Bucket - if err := s.DB.First(&srchbucket, "uuid = ? and user_id = ?", params.BucketID, u.ID).Error; err != nil { - return err - } - - // if dir is "" or nil, put the file on the root dir (/filename) - defaultPath := "/" + filename - bucketp := defaultPath - if params.BucketDir != "" { - p, err := sanitizePath(params.BucketDir) - if err != nil { - return err - } - bucketp = p - } - - // default: bucketp ends in / (does not include filename e.g. /hello/) - path := bucketp + filename - - // if path does not end in /, it includes the filename - if !strings.HasSuffix(bucketp, "/") { - path = bucketp - filename = filepath.Base(bucketp) - } - - bucks = []*buckets.BucketRef{ - { - Bucket: srchbucket.ID, - Path: &path, - }, - } - } - - var origins []*peer.AddrInfo - for _, p := range params.Peers { - ai, err := peer.AddrInfoFromString(p) - if err != nil { - return err - } - origins = append(origins, ai) - } - - rcid, err := cid.Decode(params.Root) - if err != nil { - return err - } - - if c.QueryParam("ignore-dupes") == "true" { - var count int64 - if err := s.DB.Model(util.Content{}).Where("cid = ? 
and user_id = ?", rcid.Bytes(), u.ID).Count(&count).Error; err != nil { - return err - } - if count > 0 { - return c.JSON(302, map[string]string{"message": "content with given cid already preserved"}) - } - } - - makeDeal := true - pinstatus, err := s.CM.PinContent(ctx, u.ID, rcid, filename, bucks, origins, 0, nil, makeDeal) - if err != nil { - return err - } - return c.JSON(http.StatusAccepted, pinstatus) -} - -// handleAddCar godoc -// @Summary Add Car object -// @Description This endpoint is used to add a car object to the network. The object can be a file or a directory. -// @Tags content -// @Produce json -// @Success 200 {object} util.ContentAddResponse -// @Failure 400 {object} util.HttpError -// @Failure 500 {object} util.HttpError -// @Param body body string true "Car" -// @Param ignore-dupes query string false "Ignore Dupes" -// @Param filename query string false "Filename" -// @Router /content/add-car [post] -func (s *Server) handleAddCar(c echo.Context, u *util.User) error { - ctx := c.Request().Context() - - if err := util.ErrorIfContentAddingDisabled(s.isContentAddingDisabled(u)); err != nil { - return err - } - - if s.cfg.Content.DisableLocalAdding { - return s.redirectContentAdding(c, u) - } - - // if splitting is disabled and uploaded content size is greater than content size limit - // reject the upload, as it will only get stuck and deals will never be made for it - // if !u.FlagSplitContent() { - // bdWriter := &bytes.Buffer{} - // bdReader := io.TeeReader(c.Request().Body, bdWriter) - - // bdSize, err := io.Copy(ioutil.Discard, bdReader) - // if err != nil { - // return err - // } - - // if bdSize > util.MaxDealContentSize { - // return &util.HttpError{ - // Code: http.StatusBadRequest, - // Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, - // Details: fmt.Sprintf("content size %d bytes, is over upload size of limit %d bytes, and content splitting is not enabled, please reduce the content size", bdSize, util.MaxDealContentSize), - // } - // } - - // 
c.Request().Body = ioutil.NopCloser(bdWriter) - // } - - bsid, sbs, err := s.StagingMgr.AllocNew() - if err != nil { - return err - } - - defer func() { - go func() { - if err := s.StagingMgr.CleanUp(bsid); err != nil { - log.Errorf("failed to clean up staging blockstore: %s", err) - } - }() - }() - - defer c.Request().Body.Close() - header, err := s.loadCar(ctx, sbs, c.Request().Body) - if err != nil { - return err - } - - if len(header.Roots) != 1 { - // if someone wants this feature, let me know - return c.JSON(400, map[string]string{"error": "cannot handle uploading car files with multiple roots"}) - } - rootCID := header.Roots[0] - - if c.QueryParam("ignore-dupes") == "true" { - isDup, err := s.isDupCIDContent(c, rootCID, u) - if err != nil || isDup { - return err - } - } - - // TODO: how to specify filename? - filename := rootCID.String() - if qpname := c.QueryParam("filename"); qpname != "" { - filename = qpname - } - - bserv := blockservice.New(sbs, nil) - dserv := merkledag.NewDAGService(bserv) - - cont, err := s.CM.AddDatabaseTracking(ctx, u, dserv, rootCID, filename, s.cfg.Replication) - if err != nil { - return err - } - - if err := util.DumpBlockstoreTo(ctx, s.tracer, sbs, s.Node.Blockstore); err != nil { - return xerrors.Errorf("failed to move data from staging to main blockstore: %w", err) - } - - go func() { - // TODO: we should probably have a queue to throw these in instead of putting them out in goroutines... 
- s.CM.ToCheck(cont.ID) - }() - - go func() { - if err := s.Node.Provider.Provide(rootCID); err != nil { - log.Warnf("failed to announce providers: %s", err) - } - }() - - return c.JSON(http.StatusOK, &util.ContentAddResponse{ - Cid: rootCID.String(), - RetrievalURL: util.CreateDwebRetrievalURL(rootCID.String()), - EstuaryRetrievalURL: util.CreateEstuaryRetrievalURL(rootCID.String()), - EstuaryId: cont.ID, - Providers: s.CM.PinDelegatesForContent(*cont), - }) -} - -func (s *Server) loadCar(ctx context.Context, bs blockstore.Blockstore, r io.Reader) (*car.CarHeader, error) { - _, span := s.tracer.Start(ctx, "loadCar") - defer span.End() - - return car.LoadCar(ctx, bs, r) -} - // handleAdd godoc // @Summary Add new content // @Description This endpoint is used to upload new content. -// @Tags content +// @Tags contents // @Produce json // @Accept multipart/form-data -// @Param data formData file true "File to upload" -// @Param filename formData string false "Filename to use for upload" -// @Param uuid query string false "Bucket UUID" +// @Param type query type false "Type of content to upload ('car', 'cid', 'file' or 'url'). 
Defaults to 'file'" +// @Param car body string false "Car file to upload" +// @Param body body util.ContentAddIpfsBody false "IPFS Body" +// @Param data formData file false "File to upload" +// @Param filename formData string false "Filename to use for upload" +// @Param uuid query string false "Bucket UUID" // @Param replication query int false "Replication value" // @Param ignore-dupes query string false "Ignore Dupes true/false" // @Param lazy-provide query string false "Lazy Provide true/false" -// @Param dir query string false "Directory" +// @Param dir query string false "Directory in collection" // @Success 200 {object} util.ContentAddResponse // @Failure 400 {object} util.HttpError // @Failure 500 {object} util.HttpError -// @Router /content/add [post] +// @Router /contents [post] func (s *Server) handleAdd(c echo.Context, u *util.User) error { ctx, span := s.tracer.Start(c.Request().Context(), "handleAdd", trace.WithAttributes(attribute.Int("user", int(u.ID)))) defer span.End() @@ -865,41 +640,9 @@ func (s *Server) handleAdd(c echo.Context, u *util.User) error { return s.redirectContentAdding(c, u) } - form, err := c.MultipartForm() - if err != nil { - return err - } - defer form.RemoveAll() - - mpf, err := c.FormFile("data") - if err != nil { - return err - } - - // if splitting is disabled and uploaded content size is greater than content size limit - // reject the upload, as it will only get stuck and deals will never be made for it - if !u.FlagSplitContent() && mpf.Size > s.cfg.Content.MaxSize { - return &util.HttpError{ - Code: http.StatusBadRequest, - Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, - Details: fmt.Sprintf("content size %d bytes, is over upload size limit of %d bytes, and content splitting is not enabled, please reduce the content size", mpf.Size, s.cfg.Content.MaxSize), - } - } - - filename := mpf.Filename - if fvname := c.FormValue("filename"); fvname != "" { - filename = fvname - } - - fi, err := mpf.Open() - if err != nil { - return err 
- } - - defer fi.Close() - + // replication from query params replication := s.cfg.Replication - replVal := c.FormValue("replication") + replVal := c.QueryParam("replication") if replVal != "" { parsed, err := strconv.Atoi(replVal) if err != nil { @@ -909,11 +652,12 @@ func (s *Server) handleAdd(c echo.Context, u *util.User) error { } } - uuid := c.QueryParam("uuid") + // bucket uuid from query params + bucketuuid := c.QueryParam("uuid") var bucket *buckets.Bucket - if uuid != "" { + if bucketuuid != "" { var srchbucket buckets.Bucket - if err := s.DB.First(&srchbucket, "uuid = ? and user_id = ?", uuid, u.ID).Error; err != nil { + if err := s.DB.First(&srchbucket, "uuid = ? and user_id = ?", bucketuuid, u.ID).Error; err != nil { return err } @@ -925,6 +669,11 @@ func (s *Server) handleAdd(c echo.Context, u *util.User) error { return err } + uploadType := util.UploadType(c.QueryParam("type")) + if uploadType == util.UploadTypeDefault { + uploadType = util.UploadTypeFile + } + bsid, bs, err := s.StagingMgr.AllocNew() if err != nil { return err @@ -941,24 +690,50 @@ func (s *Server) handleAdd(c echo.Context, u *util.User) error { bserv := blockservice.New(bs, nil) dserv := merkledag.NewDAGService(bserv) - nd, err := s.importFile(ctx, dserv, fi) - if err != nil { - return err + uploadedContent, err := util.LoadContentFromRequest(c, ctx, uploadType, bs, dserv) + + // if splitting is disabled and uploaded content size is greater than content size limit + // reject the upload, as it will only get stuck and deals will never be made for it + if !u.FlagSplitContent() && uploadedContent.Length > s.cfg.Content.MaxSize { + return &util.HttpError{ + Code: http.StatusBadRequest, + Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, + Details: fmt.Sprintf("content size %d bytes, is over upload size limit of %d bytes, and content splitting is not enabled, please reduce the content size", uploadedContent.Length, s.cfg.Content.MaxSize), + } } if c.QueryParam("ignore-dupes") == "true" { - 
isDup, err := s.isDupCIDContent(c, nd.Cid(), u) + isDup, err := s.isDupCIDContent(c, uploadedContent.CID, u) if err != nil || isDup { return err } } - content, err := s.CM.AddDatabaseTracking(ctx, u, dserv, nd.Cid(), filename, replication) + // when pinning a CID we need to add a file to handle the special case + // of calling PinContent on the content manager + // TODO(gabe): PinContent adds to database tracking. decouple logic from that + if uploadType == util.UploadTypeCID { + makeDeal := true + bucks := []*buckets.BucketRef{ + { + Bucket: bucket.ID, + Path: &path, + }, + } + pinstatus, err := s.CM.PinContent(ctx, u.ID, uploadedContent.CID, uploadedContent.Filename, bucks, uploadedContent.Origins, 0, nil, makeDeal) + if err != nil { + return err + } + return c.JSON(http.StatusAccepted, pinstatus) + } + + content, err := s.CM.AddDatabaseTracking(ctx, u, dserv, uploadedContent.CID, uploadedContent.Filename, replication) if err != nil { return xerrors.Errorf("encountered problem computing object references: %w", err) } fullPath := filepath.Join(path, content.Name) + // create bucket if need be if bucket != nil { log.Infof("BUCKET CREATION: %d, %d", bucket.ID, content.ID) if err := s.DB.Create(&buckets.BucketRef{ @@ -981,22 +756,22 @@ func (s *Server) handleAdd(c echo.Context, u *util.User) error { if c.QueryParam("lazy-provide") != "true" { subctx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() - if err := s.Node.FullRT.Provide(subctx, nd.Cid(), true); err != nil { + if err := s.Node.FullRT.Provide(subctx, uploadedContent.CID, true); err != nil { span.RecordError(fmt.Errorf("provide error: %w", err)) log.Errorf("fullrt provide call errored: %s", err) } } go func() { - if err := s.Node.Provider.Provide(nd.Cid()); err != nil { + if err := s.Node.Provider.Provide(uploadedContent.CID); err != nil { log.Warnf("failed to announce providers: %s", err) } }() return c.JSON(http.StatusOK, &util.ContentAddResponse{ - Cid: nd.Cid().String(), - RetrievalURL: 
util.CreateDwebRetrievalURL(nd.Cid().String()), - EstuaryRetrievalURL: util.CreateEstuaryRetrievalURL(nd.Cid().String()), + Cid: uploadedContent.CID.String(), + RetrievalURL: util.CreateDwebRetrievalURL(uploadedContent.CID.String()), + EstuaryRetrievalURL: util.CreateEstuaryRetrievalURL(uploadedContent.CID.String()), EstuaryId: content.ID, Providers: s.CM.PinDelegatesForContent(*content), }) @@ -1045,35 +820,28 @@ func (s *Server) redirectContentAdding(c echo.Context, u *util.User) error { return nil } -func (s *Server) importFile(ctx context.Context, dserv ipld.DAGService, fi io.Reader) (ipld.Node, error) { - _, span := s.tracer.Start(ctx, "importFile") - defer span.End() - - return util.ImportFile(dserv, fi) -} - // handleEnsureReplication godoc // @Summary Ensure Replication // @Description This endpoint ensures that the content is replicated to the specified number of providers -// @Tags content +// @Tags contents // @Produce json // @Success 200 {object} string // @Failure 400 {object} util.HttpError // @Failure 500 {object} util.HttpError -// @Param datacid path string true "Data CID" -// @Router /content/ensure-replication/{datacid} [get] +// @Param cid path string true "CID" +// @Router /content/{cid}/ensure-replication [get] func (s *Server) handleEnsureReplication(c echo.Context) error { - data, err := cid.Decode(c.Param("datacid")) + cid, err := cid.Decode(c.Param("cid")) if err != nil { return err } var content util.Content - if err := s.DB.Find(&content, "cid = ?", data.Bytes()).Error; err != nil { + if err := s.DB.Find(&content, "cid = ?", cid.Bytes()).Error; err != nil { return err } - fmt.Println("Content: ", content.Cid.CID, data) + fmt.Println("Content: ", content.Cid.CID, cid) s.CM.ToCheck(content.ID) return nil @@ -1082,13 +850,25 @@ func (s *Server) handleEnsureReplication(c echo.Context) error { // handleListContent godoc // @Summary List all pinned content // @Description This endpoint lists all content -// @Tags content +// @Tags contents // 
@Produce json // @Success 200 {object} string // @Failure 400 {object} util.HttpError // @Failure 500 {object} util.HttpError -// @Router /content/list [get] +// @Param deals query bool false "If 'true', only list content with deals made" +// @Param cid query string false "CID of content to look for" +// @Router /content [get] func (s *Server) handleListContent(c echo.Context, u *util.User) error { + if cidStr := c.QueryParam("cid"); cidStr != "" { + out, err := s.getContentByCid(cidStr) + if err != nil { + return err + } + return c.JSON(http.StatusOK, out) + } + if deals := c.QueryParam("deals"); deals == "true" { + return s.handleListContentWithDeals(c, u) + } var contents []util.Content if err := s.DB.Find(&contents, "active and user_id = ?", u.ID).Error; err != nil { return err @@ -1105,7 +885,7 @@ type expandedContent struct { // handleListContentWithDeals godoc // @Summary Content with deals // @Description This endpoint lists all content with deals -// @Tags content +// @Tags contents // @Produce json // @Success 200 {object} string // @Failure 400 {object} util.HttpError @@ -1148,12 +928,12 @@ func (s *Server) handleListContentWithDeals(c echo.Context, u *util.User) error } out := make([]expandedContent, 0, len(contents)) - for _, cont := range contents { + for _, content := range contents { ec := expandedContent{ - Content: cont, + Content: content, } - if cont.Aggregate { - if err := s.DB.Model(util.Content{}).Where("aggregated_in = ?", cont.ID).Count(&ec.AggregatedFiles).Error; err != nil { + if content.Aggregate { + if err := s.DB.Model(util.Content{}).Where("aggregated_in = ?", content.ID).Count(&ec.AggregatedFiles).Error; err != nil { return err } @@ -1179,15 +959,15 @@ type dealStatus struct { // handleGetContent godoc // @Summary Content // @Description This endpoint returns a content by its ID -// @Tags content +// @Tags contents // @Produce json // @Success 200 {object} string // @Failure 400 {object} util.HttpError // @Failure 500 {object} 
util.HttpError -// @Param id path int true "Content ID" -// @Router /content/{id} [get] +// @Param contentid path int true "Content ID" +// @Router /contents/{contentid} [get] func (s *Server) handleGetContent(c echo.Context, u *util.User) error { - contID, err := strconv.Atoi(c.Param("cont_id")) + contID, err := strconv.Atoi(c.Param("contentid")) if err != nil { return err } @@ -1211,19 +991,29 @@ func (s *Server) handleGetContent(c echo.Context, u *util.User) error { return c.JSON(http.StatusOK, content) } +type ContentStatusResponse struct { + Content util.Content `json:"content"` + Deals []dealStatus `json:"deals"` + FailuresCount int64 `json:"failuresCount"` + Failures []model.DfeRecord `json:"failures"` + TotalOutBw int64 `json:"totalOutBw"` + Aggregated []util.Content `json:"aggregated"` + Offloaded bool `json:"offloaded"` // TODO(gabe): this needs to be set on handleContentStatus, but we're not offloading data as of now +} + // handleContentStatus godoc // @Summary Content Status // @Description This endpoint returns the status of a content -// @Tags content +// @Tags contents // @Produce json -// @Success 200 {object} string +// @Success 200 {object} ContentStatusResponse // @Failure 400 {object} util.HttpError // @Failure 500 {object} util.HttpError -// @Param id path int true "Content ID" -// @Router /content/status/{id} [get] +// @Param contentid path int true "Content ID" +// @Router /contents/{contentid}/status [get] func (s *Server) handleContentStatus(c echo.Context, u *util.User) error { ctx := c.Request().Context() - contID, err := strconv.Atoi(c.Param("id")) + contID, err := strconv.Atoi(c.Param("contentid")) if err != nil { return err } @@ -1302,14 +1092,32 @@ func (s *Server) handleContentStatus(c echo.Context, u *util.User) error { }) var failCount int64 - if err := s.DB.Model(&model.DfeRecord{}).Where("content = ?", content.ID).Count(&failCount).Error; err != nil { + var errs []model.DfeRecord + if err := 
s.DB.Model(&model.DfeRecord{}).Where("content = ?", content.ID).Find(&errs).Count(&failCount).Error; err != nil { return err } - return c.JSON(http.StatusOK, map[string]interface{}{ - "content": content, - "deals": ds, - "failuresCount": failCount, + var bw int64 + if err := s.DB.Model(util.ObjRef{}). + Select("SUM(size * reads)"). + Where("obj_refs.content = ?", content.ID). + Joins("left join objects on obj_refs.object = objects.id"). + Scan(&bw).Error; err != nil { + return err + } + + var sub []util.Content + if err := s.DB.Find(&sub, "aggregated_in = ?", content.ID).Error; err != nil { + return err + } + + return c.JSON(http.StatusOK, ContentStatusResponse{ + Content: content, + Deals: ds, + FailuresCount: failCount, + Failures: errs, + TotalOutBw: bw, + Aggregated: sub, }) } @@ -1459,20 +1267,10 @@ func (s *Server) calcSelector(aggregatedIn uint, contentID uint) (string, error) return fmt.Sprintf("/Links/%d/Hash", ordinal), nil } -// handleGetContentByCid godoc -// @Summary Get Content by Cid -// @Description This endpoint returns the content record associated with a CID -// @Tags public -// @Produce json -// @Success 200 {object} string -// @Failure 400 {object} util.HttpError -// @Failure 500 {object} util.HttpError -// @Param cid path string true "Cid" -// @Router /public/by-cid/{cid} [get] -func (s *Server) handleGetContentByCid(c echo.Context) error { - obj, err := cid.Decode(c.Param("cid")) +func (s *Server) getContentByCid(cidStr string) ([]getContentResponse, error) { + obj, err := cid.Decode(cidStr) if err != nil { - return errors.Wrapf(err, "invalid cid") + return []getContentResponse{}, errors.Wrapf(err, "invalid cid") } v0 := cid.Undef @@ -1486,36 +1284,36 @@ func (s *Server) handleGetContentByCid(c echo.Context) error { var contents []util.Content if err := s.DB.Find(&contents, "(cid=? or cid=?) 
and active", v0.Bytes(), v1.Bytes()).Error; err != nil { - return err + return []getContentResponse{}, err } out := make([]getContentResponse, 0) - for i, cont := range contents { + for i, content := range contents { resp := getContentResponse{ Content: &contents[i], } - id := cont.ID + id := content.ID - if cont.AggregatedIn > 0 { + if content.AggregatedIn > 0 { var aggr util.Content - if err := s.DB.First(&aggr, "id = ?", cont.AggregatedIn).Error; err != nil { - return err + if err := s.DB.First(&aggr, "id = ?", content.AggregatedIn).Error; err != nil { + return []getContentResponse{}, err } resp.AggregatedIn = &aggr // no need to early return here, the selector is mostly cosmetic atm - if selector, err := s.calcSelector(cont.AggregatedIn, cont.ID); err == nil { + if selector, err := s.calcSelector(content.AggregatedIn, content.ID); err == nil { resp.Selector = selector } - id = cont.AggregatedIn + id = content.AggregatedIn } var deals []*model.ContentDeal if err := s.DB.Find(&deals, "content = ? 
and deal_id > 0 and not failed", id).Error; err != nil { - return err + return []getContentResponse{}, err } resp.Deals = deals @@ -1523,6 +1321,25 @@ func (s *Server) handleGetContentByCid(c echo.Context) error { out = append(out, resp) } + return out, nil +} + +// handleGetContentByCid godoc +// @Summary Get Content by Cid +// @Description This endpoint returns the content record associated with a CID +// @Tags public +// @Produce json +// @Success 200 {object} string +// @Failure 400 {object} util.HttpError +// @Failure 500 {object} util.HttpError +// @Param cid path string true "Cid" +// @Router /public/by-cid/{cid} [get] +func (s *Server) handleGetContentByCid(c echo.Context) error { + cidStr := c.Param("cid") + out, err := s.getContentByCid(cidStr) + if err != nil { + return err + } return c.JSON(http.StatusOK, out) } @@ -1613,8 +1430,8 @@ func (s *Server) handleMakeDeal(c echo.Context, u *util.User) error { } } - var cont util.Content - if err := s.DB.First(&cont, "id = ?", req.ContentID).Error; err != nil { + var content util.Content + if err := s.DB.First(&content, "id = ?", req.ContentID).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { return &util.HttpError{ Code: http.StatusNotFound, @@ -1625,7 +1442,7 @@ func (s *Server) handleMakeDeal(c echo.Context, u *util.User) error { return err } - id, err := s.CM.MakeDealWithMiner(ctx, cont, addr) + id, err := s.CM.MakeDealWithMiner(ctx, content, addr) if err != nil { return err } @@ -1671,12 +1488,12 @@ func (s *Server) handleTransferStatus(c echo.Context) error { return err } - var cont util.Content - if err := s.DB.First(&cont, "id = ?", deal.Content).Error; err != nil { + var content util.Content + if err := s.DB.First(&content, "id = ?", deal.Content).Error; err != nil { return err } - status, err := s.CM.GetTransferStatus(c.Request().Context(), &deal, cont.Cid.CID, cont.Location) + status, err := s.CM.GetTransferStatus(c.Request().Context(), &deal, content.Cid.CID, content.Location) if err 
!= nil { return err } @@ -1699,12 +1516,12 @@ func (s *Server) handleTransferStatusByID(c echo.Context) error { return err } - var cont util.Content - if err := s.DB.First(&cont, "id = ?", deal.Content).Error; err != nil { + var content util.Content + if err := s.DB.First(&content, "id = ?", deal.Content).Error; err != nil { return err } - status, err := s.CM.GetTransferStatus(c.Request().Context(), &deal, cont.Cid.CID, cont.Location) + status, err := s.CM.GetTransferStatus(c.Request().Context(), &deal, content.Cid.CID, content.Location) if err != nil { return err } @@ -1765,8 +1582,8 @@ func (s *Server) handleTransferRestart(c echo.Context) error { return err } - var cont util.Content - if err := s.DB.First(&cont, "id = ?", deal.Content).Error; err != nil { + var content util.Content + if err := s.DB.First(&content, "id = ?", deal.Content).Error; err != nil { return err } @@ -1787,7 +1604,7 @@ func (s *Server) handleTransferRestart(c echo.Context) error { return err } - if err := s.CM.RestartTransfer(ctx, cont.Location, chanid, deal); err != nil { + if err := s.CM.RestartTransfer(ctx, content.Location, chanid, deal); err != nil { return err } return nil @@ -1917,7 +1734,7 @@ type getInvitesResp struct { // handleAdminGetInvites godoc // @Summary Get Estuary invites // @Description This endpoint is used to list all estuary invites. -// @Tags content +// @Tags contents // @Produce json // @Success 200 {object} string // @Failure 400 {object} util.HttpError @@ -1940,7 +1757,7 @@ func (s *Server) handleAdminGetInvites(c echo.Context) error { // handleAdminCreateInvite godoc // @Summary Create an Estuary invite // @Description This endpoint is used to create an estuary invite. 
-// @Tags content +// @Tags contents // @Produce json // @Success 200 {object} string // @Failure 400 {object} util.HttpError @@ -2667,120 +2484,6 @@ func (s *Server) handleGetMinerDeals(c echo.Context) error { return c.JSON(http.StatusOK, deals) } -type bandwidthResponse struct { - TotalOut int64 `json:"totalOut"` -} - -// handleGetContentBandwidth godoc -// @Summary Get content bandwidth -// @Description This endpoint returns content bandwidth -// @Tags content -// @Produce json -// @Success 200 {object} string -// @Failure 400 {object} util.HttpError -// @Failure 500 {object} util.HttpError -// @Param content path string true "Content ID" -// @Router /content/bw-usage/{content} [get] -func (s *Server) handleGetContentBandwidth(c echo.Context, u *util.User) error { - contID, err := strconv.Atoi(c.Param("content")) - if err != nil { - return err - } - - var content util.Content - if err := s.DB.First(&content, contID).Error; err != nil { - if xerrors.Is(err, gorm.ErrRecordNotFound) { - return &util.HttpError{ - Code: http.StatusNotFound, - Reason: util.ERR_RECORD_NOT_FOUND, - Details: fmt.Sprintf("content: %d was not found", contID), - } - } - return err - } - - if err := util.IsContentOwner(u.ID, content.UserID); err != nil { - return err - } - - // select SUM(size * reads) from obj_refs left join objects on obj_refs.object = objects.id where obj_refs.content = 42; - var bw int64 - if err := s.DB.Model(util.ObjRef{}). - Select("SUM(size * reads)"). - Where("obj_refs.content = ?", content.ID). - Joins("left join objects on obj_refs.object = objects.id"). 
- Scan(&bw).Error; err != nil { - return err - } - - return c.JSON(http.StatusOK, &bandwidthResponse{ - TotalOut: bw, - }) -} - -// handleGetAggregatedForContent godoc -// @Summary Get aggregated content stats -// @Description This endpoint returns aggregated content stats -// @Tags content -// @Produce json -// @Success 200 {object} string -// @Failure 400 {object} util.HttpError -// @Failure 500 {object} util.HttpError -// @Param content path string true "Content ID" -// @Router /content/aggregated/{content} [get] -func (s *Server) handleGetAggregatedForContent(c echo.Context, u *util.User) error { - contID, err := strconv.Atoi(c.Param("content")) - if err != nil { - return err - } - - var content util.Content - if err := s.DB.First(&content, "id = ?", contID).Error; err != nil { - if xerrors.Is(err, gorm.ErrRecordNotFound) { - return &util.HttpError{ - Code: http.StatusNotFound, - Reason: util.ERR_RECORD_NOT_FOUND, - Details: fmt.Sprintf("miner: %d was not found", contID), - } - } - return err - } - - if err := util.IsContentOwner(u.ID, content.UserID); err != nil { - return err - } - - var sub []util.Content - if err := s.DB.Find(&sub, "aggregated_in = ?", contID).Error; err != nil { - return err - } - return c.JSON(http.StatusOK, sub) -} - -// handleGetContentFailures godoc -// @Summary List all failures for a content -// @Description This endpoint returns all failures for a content -// @Tags content -// @Produce json -// @Success 200 {object} string -// @Failure 400 {object} util.HttpError -// @Failure 500 {object} util.HttpError -// @Param content path string true "Content ID" -// @Router /content/failures/{content} [get] -func (s *Server) handleGetContentFailures(c echo.Context, u *util.User) error { - cont, err := strconv.Atoi(c.Param("content")) - if err != nil { - return err - } - - var errs []model.DfeRecord - if err := s.DB.Find(&errs, "content = ?", cont).Error; err != nil { - return err - } - - return c.JSON(http.StatusOK, errs) -} - func (s *Server) 
handleAdminGetStagingZones(c echo.Context) error { s.CM.BucketLk.Lock() defer s.CM.BucketLk.Unlock() @@ -2817,12 +2520,12 @@ func (s *Server) handleRunOffloadingBucket(c echo.Context) error { } func (s *Server) handleOffloadContent(c echo.Context) error { - cont, err := strconv.Atoi(c.Param("content")) + content, err := strconv.Atoi(c.Param("content")) if err != nil { return err } - removed, err := s.CM.OffloadContents(c.Request().Context(), []uint{uint(cont)}) + removed, err := s.CM.OffloadContents(c.Request().Context(), []uint{uint(content)}) if err != nil { return err } @@ -2866,30 +2569,30 @@ func (s *Server) handleMoveContent(c echo.Context) error { } func (s *Server) handleRefreshContent(c echo.Context) error { - cont, err := strconv.Atoi(c.Param("content")) + content, err := strconv.Atoi(c.Param("content")) if err != nil { return err } - if err := s.CM.RefreshContent(c.Request().Context(), uint(cont)); err != nil { + if err := s.CM.RefreshContent(c.Request().Context(), uint(content)); err != nil { return c.JSON(500, map[string]string{"error": err.Error()}) } return c.JSON(http.StatusOK, map[string]string{}) } func (s *Server) handleReadLocalContent(c echo.Context) error { - cont, err := strconv.Atoi(c.Param("content")) + contentid, err := strconv.Atoi(c.Param("content")) if err != nil { return err } var content util.Content - if err := s.DB.First(&content, "id = ?", cont).Error; err != nil { + if err := s.DB.First(&content, "id = ?", contentid).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { return &util.HttpError{ Code: http.StatusNotFound, Reason: util.ERR_RECORD_NOT_FOUND, - Details: fmt.Sprintf("content: %d was not found", cont), + Details: fmt.Sprintf("content: %d was not found", content), } } return err @@ -3600,11 +3303,11 @@ func (s *Server) handleAddContentsToBucket(c echo.Context, u *util.User) error { path, err := constructDirectoryPath(c.QueryParam(BucketDir)) var bucketrefs []buckets.BucketRef - for _, cont := range contents { - 
fullPath := filepath.Join(path, cont.Name) + for _, content := range contents { + fullPath := filepath.Join(path, content.Name) bucketrefs = append(bucketrefs, buckets.BucketRef{ Bucket: bucket.ID, - Content: cont.ID, + Content: content.ID, Path: &fullPath, }) } @@ -3712,7 +3415,6 @@ func (s *Server) handleCommitBucket(c echo.Context, u *util.User) error { ctx := c.Request().Context() makeDeal := false - pinstatus, err := s.CM.PinContent(ctx, u.ID, bucketNode.Cid(), bucketNode.Cid().String(), nil, origins, 0, nil, makeDeal) if err != nil { return err @@ -3787,14 +3489,14 @@ func (s *Server) handleGetBucketContents(c echo.Context, u *util.User) error { if relp == "." { // Query directory is the complete path containing the content. // trying to list a CID queryDir, not allowed - if r.Type == util.Directory { + if r.Type == util.ContentTypeDirectory { return c.JSON(http.StatusBadRequest, fmt.Errorf("listing CID directories is not allowed")) } out = append(out, bucketListResponse{ Name: r.Name, Size: r.Size, - ContID: r.ID, + ContentID: r.ID, Cid: &util.DbCID{CID: r.Cid.CID}, Dir: queryDir, uuid: uuid, @@ -3810,7 +3512,7 @@ func (s *Server) handleGetBucketContents(c echo.Context, u *util.User) error { // Name: relp, // Type: Dir, // Size: r.Size, - // ContID: r.ID, + // ContentID: r.ID, // Cid: &r.Cid, // Dir: queryDir, // uuid: uuid, @@ -3841,7 +3543,7 @@ func (s *Server) handleGetBucketContents(c echo.Context, u *util.User) error { } //var contentType CidType - //contentType = File + //contentType = ContentTypeFile //if r.Type == util.Directory { // contentType = Dir //} @@ -3849,7 +3551,7 @@ func (s *Server) handleGetBucketContents(c echo.Context, u *util.User) error { // Name: r.Name, // Type: contentType, // Size: r.Size, - // ContID: r.ID, + // ContentID: r.ID, // Cid: &util.DbCID{CID: r.Cid.CID}, // Dir: queryDir, // uuid: uuid, @@ -4151,7 +3853,7 @@ func (s *Server) handleGetBucketDiag(c echo.Context) error { // handleGetStagingZoneForUser godoc // @Summary 
Get staging zone for user // @Description This endpoint is used to get staging zone for user. -// @Tags content +// @Tags contents // @Produce json // @Success 200 {object} string // @Failure 400 {object} util.HttpError @@ -4363,7 +4065,7 @@ func (s *Server) computeDealMetrics() ([]*dealMetricsInfo, error) { type dealQuery struct { DealID int64 - Contentid uint + ContentID uint Cid util.DbCID Aggregate bool } @@ -4376,7 +4078,7 @@ type dealPairs struct { // handleGetAllDealsForUser godoc // @Summary Get all deals for a user // @Description This endpoint is used to get all deals for a user -// @Tags content +// @Tags deals // @Produce json // @Success 200 {object} string // @Failure 400 {object} util.HttpError @@ -4384,7 +4086,7 @@ type dealPairs struct { // @Param begin query string true "Begin" // @Param duration query string true "Duration" // @Param all query string true "All" -// @Router /content/all-deals [get] +// @Router /deals [get] func (s *Server) handleGetAllDealsForUser(c echo.Context, u *util.User) error { begin := time.Now().Add(time.Hour * 24) @@ -4420,19 +4122,19 @@ func (s *Server) handleGetAllDealsForUser(c echo.Context, u *util.User) error { contmap := make(map[uint][]dealQuery) for _, d := range deals { - contmap[d.Contentid] = append(contmap[d.Contentid], d) + contmap[d.ContentID] = append(contmap[d.ContentID], d) } var out []dealPairs - for cont, deals := range contmap { + for content, deals := range contmap { var dp dealPairs if deals[0].Aggregate { - var conts []util.Content - if err := s.DB.Model(util.Content{}).Where("aggregated_in = ?", cont).Select("cid").Scan(&conts).Error; err != nil { + var contents []util.Content + if err := s.DB.Model(util.Content{}).Where("aggregated_in = ?", content).Select("cid").Scan(&contents).Error; err != nil { return err } - for _, c := range conts { + for _, c := range contents { dp.Cids = append(dp.Cids, c.Cid.CID) } } else { @@ -4469,8 +4171,8 @@ func (s *Server) handleContentHealthCheck(c echo.Context) 
error { return err } - var cont util.Content - if err := s.DB.First(&cont, "id = ?", val).Error; err != nil { + var content util.Content + if err := s.DB.First(&content, "id = ?", val).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { return &util.HttpError{ Code: http.StatusNotFound, @@ -4482,17 +4184,17 @@ func (s *Server) handleContentHealthCheck(c echo.Context) error { } var u util.User - if err := s.DB.First(&u, "id = ?", cont.UserID).Error; err != nil { + if err := s.DB.First(&u, "id = ?", content.UserID).Error; err != nil { return err } var deals []model.ContentDeal - if err := s.DB.Find(&deals, "content = ? and not failed", cont.ID).Error; err != nil { + if err := s.DB.Find(&deals, "content = ? and not failed", content.ID).Error; err != nil { return err } var aggr []util.Content - if err := s.DB.Find(&aggr, "aggregated_in = ?", cont.ID).Error; err != nil { + if err := s.DB.Find(&aggr, "aggregated_in = ?", content.ID).Error; err != nil { return err } @@ -4502,13 +4204,13 @@ func (s *Server) handleContentHealthCheck(c echo.Context) error { } var fixedAggregateSize bool - if cont.Aggregate && cont.Size == 0 && cont.Active { + if content.Aggregate && content.Size == 0 && content.Active { // if this is an active aggregate and its size is zero, then that means we // failed at some point while updating the aggregate, we can fix that switch len(aggrLocs) { case 0: - log.Warnf("content %d has nothing aggregated in it", cont.ID) + log.Warnf("content %d has nothing aggregated in it", content.ID) case 1: var zSize int64 for _, zc := range aggr { @@ -4516,14 +4218,14 @@ func (s *Server) handleContentHealthCheck(c echo.Context) error { } z := &contentmgr.ContentStagingZone{ - ZoneOpened: cont.CreatedAt, + ZoneOpened: content.CreatedAt, Contents: aggr, MinSize: s.cfg.Content.MinSize, MaxSize: s.cfg.Content.MaxSize, CurSize: zSize, - User: cont.UserID, - ContID: cont.ID, - Location: cont.Location, + User: content.UserID, + ContentID: content.ID, + Location: 
content.Location, } if err := s.CM.AggregateStagingZone(ctx, z, aggrLocs); err != nil { @@ -4533,33 +4235,33 @@ func (s *Server) handleContentHealthCheck(c echo.Context) error { default: // well that sucks, this will need migration - log.Warnf("content %d has messed up aggregation", cont.ID) + log.Warnf("content %d has messed up aggregation", content.ID) } } - if cont.Location != constants.ContentLocationLocal { + if content.Location != constants.ContentLocationLocal { return c.JSON(http.StatusOK, map[string]interface{}{ "deals": deals, - "content": cont, + "content": content, "error": "requested content was not local to this instance, cannot check health right now", "fixedAggregateSize": fixedAggregateSize, }) } - _, rootFetchErr := s.Node.Blockstore.Get(ctx, cont.Cid.CID) + _, rootFetchErr := s.Node.Blockstore.Get(ctx, content.Cid.CID) if rootFetchErr != nil { log.Errorf("failed to fetch root: %s", rootFetchErr) } - if cont.Aggregate && rootFetchErr != nil { + if content.Aggregate && rootFetchErr != nil { // if this is an aggregate and we dont have the root, thats funky, but we can regenerate the root nd, err := s.CM.CreateAggregate(ctx, aggr) if err != nil { return fmt.Errorf("failed to create aggregate: %w", err) } - if nd.Cid() != cont.Cid.CID { - return fmt.Errorf("recreated aggregate cid does not match one recorded in db: %s != %s", nd.Cid(), cont.Cid.CID) + if nd.Cid() != content.Cid.CID { + return fmt.Errorf("recreated aggregate cid does not match one recorded in db: %s != %s", nd.Cid(), content.Cid.CID) } if err := s.Node.Blockstore.Put(ctx, nd); err != nil { @@ -4568,28 +4270,28 @@ func (s *Server) handleContentHealthCheck(c echo.Context) error { } var fixedAggregateLocation bool - if c.QueryParam("check-locations") != "" && cont.Aggregate { + if c.QueryParam("check-locations") != "" && content.Aggregate { // TODO: check if the contents of the aggregate are somewhere other than where the aggregate root is switch len(aggrLocs) { case 0: - 
log.Warnf("content %d has nothing aggregated in it", cont.ID) + log.Warnf("content %d has nothing aggregated in it", content.ID) case 1: loc := aggr[0].Location - if loc != cont.Location { + if loc != content.Location { // should be safe to send a re-aggregate command to the shuttle in question var aggrConts []drpc.AggregateContent for _, c := range aggr { aggrConts = append(aggrConts, drpc.AggregateContent{ID: c.ID, Name: c.Name, CID: c.Cid.CID}) } - if err := s.CM.SendAggregateCmd(ctx, loc, cont, aggrConts); err != nil { + if err := s.CM.SendAggregateCmd(ctx, loc, content, aggrConts); err != nil { return err } fixedAggregateLocation = true } default: // well that sucks, this will need migration - log.Warnf("content %d has messed up aggregation", cont.ID) + log.Warnf("content %d has messed up aggregation", content.ID) } } @@ -4613,7 +4315,7 @@ func (s *Server) handleContentHealthCheck(c echo.Context) error { } return util.FilterUnwalkableLinks(node.Links()), nil - }, cont.Cid.CID, cset.Visit, merkledag.Concurrent()) + }, content.Cid.CID, cset.Visit, merkledag.Concurrent()) errstr := "" if err != nil { @@ -4622,7 +4324,7 @@ func (s *Server) handleContentHealthCheck(c echo.Context) error { out := map[string]interface{}{ "user": u.Username, - "content": cont, + "content": content, "deals": deals, "traverseError": errstr, "foundBlocks": cset.Len(), @@ -5028,85 +4730,6 @@ func (s *Server) getStorageFailure(c echo.Context, u *util.User) ([]model.DfeRec return recs, nil } -// handleCreateContent godoc -// @Summary Add a new content -// @Description This endpoint adds a new content -// @Tags content -// @Produce json -// @Success 200 {object} string -// @Failure 400 {object} util.HttpError -// @Failure 500 {object} util.HttpError -// @Param req body util.ContentCreateBody true "Content" -// @Param ignore-dupes query string false "Ignore Dupes" -// @Router /content/create [post] -func (s *Server) handleCreateContent(c echo.Context, u *util.User) error { - var req 
util.ContentCreateBody - if err := c.Bind(&req); err != nil { - return err - } - - rootCID, err := cid.Decode(req.Root) - if err != nil { - return err - } - - if c.QueryParam("ignore-dupes") == "true" { - isDup, err := s.isDupCIDContent(c, rootCID, u) - if err != nil || isDup { - return err - } - } - - var bucket buckets.Bucket - if req.BucketID != "" { - if err := s.DB.First(&bucket, "uuid = ?", req.BucketID).Error; err != nil { - return err - } - - if err := util.IsBucketOwner(u.ID, bucket.UserID); err != nil { - return err - } - } - - content := &util.Content{ - Cid: util.DbCID{CID: rootCID}, - Name: req.Name, - Active: false, - Pinning: true, - UserID: u.ID, - Replication: s.cfg.Replication, - Location: req.Location, - } - - if err := s.DB.Create(content).Error; err != nil { - return err - } - - if req.BucketID != "" { - if req.BucketDir == "" { - req.BucketDir = "/" - } - - sp, err := sanitizePath(req.BucketDir) - if err != nil { - return err - } - - path := &sp - if err := s.DB.Create(&buckets.BucketRef{ - Bucket: bucket.ID, - Content: content.ID, - Path: path, - }).Error; err != nil { - return err - } - } - - return c.JSON(http.StatusOK, util.ContentCreateResponse{ - ID: content.ID, - }) -} - type claimMinerBody struct { Miner address.Address `json:"miner"` Claim string `json:"claim"` @@ -5261,17 +4884,17 @@ func (s *Server) handleAdminGetProgress(c echo.Context) error { return err } - var conts []contCheck + var contents []contCheck if err := s.DB.Model(util.Content{}).Where("not aggregated_in > 0 and active"). Select("id, (?) as num_deals", s.DB.Model(model.ContentDeal{}). Where("content = contents.id and deal_id > 0 and not failed"). 
Select("count(1)"), - ).Scan(&conts).Error; err != nil { + ).Scan(&contents).Error; err != nil { return err } - for _, c := range conts { + for _, c := range contents { if c.NumDeals >= s.cfg.Replication { out.GoodContents = append(out.GoodContents, c.ID) } else if c.NumDeals > 0 { @@ -5291,8 +4914,8 @@ func (s *Server) handleAdminBreakAggregate(c echo.Context) error { return err } - var cont util.Content - if err := s.DB.First(&cont, "id = ?", aggr).Error; err != nil { + var content util.Content + if err := s.DB.First(&content, "id = ?", aggr).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { return &util.HttpError{ Code: http.StatusNotFound, @@ -5303,7 +4926,7 @@ func (s *Server) handleAdminBreakAggregate(c echo.Context) error { return err } - if !cont.Aggregate { + if !content.Aggregate { return fmt.Errorf("content %d is not an aggregate", aggr) } @@ -5331,7 +4954,7 @@ func (s *Server) handleAdminBreakAggregate(c echo.Context) error { } return util.FilterUnwalkableLinks(node.Links()), nil - }, cont.Cid.CID, cset.Visit, merkledag.Concurrent()) + }, content.Cid.CID, cset.Visit, merkledag.Concurrent()) res := map[string]interface{}{ "content": c, "foundBlocks": cset.Len(), @@ -5533,24 +5156,24 @@ func (s *Server) handleShuttleRepinAll(c echo.Context) error { defer rows.Close() for rows.Next() { - var cont util.Content - if err := s.DB.ScanRows(rows, &cont); err != nil { + var content util.Content + if err := s.DB.ScanRows(rows, &content); err != nil { return err } var origins []*peer.AddrInfo // when refreshing pinning queue, use content origins if available - if cont.Origins != "" { - _ = json.Unmarshal([]byte(cont.Origins), &origins) // no need to handle or log err, its just a nice to have + if content.Origins != "" { + _ = json.Unmarshal([]byte(content.Origins), &origins) // no need to handle or log err, its just a nice to have } if err := s.CM.SendShuttleCommand(c.Request().Context(), handle, &drpc.Command{ Op: drpc.CMD_AddPin, Params: 
drpc.CmdParams{ AddPin: &drpc.AddPin{ - DBID: cont.ID, - UserId: cont.UserID, - Cid: cont.Cid.CID, + DBID: content.ID, + UserId: content.UserID, + Cid: content.Cid.CID, Peers: origins, }, }, @@ -5611,7 +5234,7 @@ type bucketListResponse struct { Name string `json:"name"` Type CidType `json:"type"` Size int64 `json:"size"` - ContID uint `json:"contId"` + ContentID uint `json:"contentid"` Cid *util.DbCID `json:"cid,omitempty"` Dir string `json:"dir"` uuid string `json:"uuid"` @@ -5676,8 +5299,8 @@ func (s *Server) checkGatewayRedirect(proto string, cc cid.Cid, segs []string) ( return fmt.Sprintf("https://%s/%s/%s/%s", bestGateway, proto, cc, strings.Join(segs, "/")), nil } - var cont util.Content - if err := s.DB.First(&cont, "cid = ? and active and not offloaded", &util.DbCID{CID: cc}).Error; err != nil { + var content util.Content + if err := s.DB.First(&content, "cid = ? and active and not offloaded", &util.DbCID{CID: cc}).Error; err != nil { if xerrors.Is(err, gorm.ErrRecordNotFound) { // if not pinned on any shuttle or local, check dweb return fmt.Sprintf("https://%s/%s/%s/%s", bestGateway, proto, cc, strings.Join(segs, "/")), nil @@ -5685,16 +5308,16 @@ func (s *Server) checkGatewayRedirect(proto string, cc cid.Cid, segs []string) ( return "", err } - if cont.Location == constants.ContentLocationLocal { + if content.Location == constants.ContentLocationLocal { return "", nil } - if !s.CM.ShuttleIsOnline(cont.Location) { + if !s.CM.ShuttleIsOnline(content.Location) { return fmt.Sprintf("https://%s/%s/%s/%s", bestGateway, proto, cc, strings.Join(segs, "/")), nil } var shuttle model.Shuttle - if err := s.DB.First(&shuttle, "handle = ?", cont.Location).Error; err != nil { + if err := s.DB.First(&shuttle, "handle = ?", content.Location).Error; err != nil { return "", err } return fmt.Sprintf("https://%s/gw/%s/%s/%s", shuttle.Host, proto, cc, strings.Join(segs, "/")), nil diff --git a/pinner/pinmgr.go b/pinner/pinmgr.go index 7e643deb..23ca2539 100644 --- 
a/pinner/pinmgr.go +++ b/pinner/pinmgr.go @@ -98,7 +98,7 @@ type PinningOperation struct { Status types.PinningStatus UserId uint - ContId uint + ContentID uint Replace uint LastUpdate time.Time @@ -119,12 +119,12 @@ type PinningOperation struct { } type PinningOperationData struct { - ContId uint + ContentID uint } func getPinningData(po *PinningOperation) PinningOperationData { return PinningOperationData{ - ContId: po.ContId, + ContentID: po.ContentID, } } @@ -195,7 +195,7 @@ func (pm *PinManager) doPinning(op *PinningOperation) error { op.SizeFetched += size }); err != nil { op.fail(err) - if err2 := pm.StatusChangeFunc(op.ContId, op.Location, types.PinningStatusFailed); err2 != nil { + if err2 := pm.StatusChangeFunc(op.ContentID, op.Location, types.PinningStatusFailed); err2 != nil { return err2 } return errors.Wrap(err, "shuttle RunPinFunc failed") @@ -279,7 +279,7 @@ func (pm *PinManager) closeQueueDataStructures() { func createLevelDBKey(value PinningOperationData) []byte { var buffer bytes.Buffer enc := gob.NewEncoder(&buffer) - if err := enc.Encode(value.ContId); err != nil { + if err := enc.Encode(value.ContentID); err != nil { log.Fatal("Unable to encode value") } return buffer.Bytes() diff --git a/pinner/pinmgr_test.go b/pinner/pinmgr_test.go index 7ce2a9d7..b2436c87 100644 --- a/pinner/pinmgr_test.go +++ b/pinner/pinmgr_test.go @@ -14,8 +14,8 @@ import ( var countLock sync.Mutex -func onPinStatusUpdate(cont uint, location string, status types.PinningStatus) error { - //fmt.Println("onPinStatusUpdate", status, cont) +func onPinStatusUpdate(content uint, location string, status types.PinningStatus) error { + //fmt.Println("onPinStatusUpdate", status, content) return nil } func newManager(count *int) *PinManager { @@ -55,7 +55,7 @@ func newPinData(name string, userid int, contid int) PinningOperation { Name: name, UserId: uint(userid), lk: sync.Mutex{}, - ContId: uint(contid), + ContentID: uint(contid), } } diff --git a/pinning.go b/pinning.go index 
cf98f486..ca454e35 100644 --- a/pinning.go +++ b/pinning.go @@ -35,7 +35,7 @@ func (s *Server) doPinning(ctx context.Context, op *pinner.PinningOperation, cb if op.Replace > 0 { go func() { if err := s.CM.RemoveContent(ctx, op.Replace, true); err != nil { - log.Infof("failed to remove content in replacement: %d with: %d", op.Replace, op.ContId) + log.Infof("failed to remove content in replacement: %d with: %d", op.Replace, op.ContentID) } }() } @@ -50,12 +50,12 @@ func (s *Server) doPinning(ctx context.Context, op *pinner.PinningOperation, cb dserv := merkledag.NewDAGService(bserv) dsess := dserv.Session(ctx) - if err := s.CM.AddDatabaseTrackingToContent(ctx, op.ContId, dsess, op.Obj, cb); err != nil { + if err := s.CM.AddDatabaseTrackingToContent(ctx, op.ContentID, dsess, op.Obj, cb); err != nil { return err } if op.MakeDeal { - s.CM.ToCheck(op.ContId) + s.CM.ToCheck(op.ContentID) } // this provide call goes out immediately @@ -326,7 +326,7 @@ func (s *Server) handleAddPin(e echo.Context, u *util.User) error { cols = []*buckets.BucketRef{ { Bucket: srchCol.ID, - Path: colpath, + Path: colpath, }, } } diff --git a/util/content.go b/util/content.go index 6ef7aea2..35846f37 100644 --- a/util/content.go +++ b/util/content.go @@ -4,15 +4,20 @@ import ( "context" "fmt" "net/http" + "path" "path/filepath" "strings" "time" "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" format "github.com/ipfs/go-ipld-format" ipld "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-merkledag" "github.com/ipfs/go-unixfs" + "github.com/ipld/go-car" + "github.com/labstack/echo/v4" + "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" "gorm.io/gorm" ) @@ -20,9 +25,19 @@ import ( type ContentType int64 const ( - Unknown ContentType = iota - File - Directory + ContentTypeUnknown ContentType = iota + ContentTypeFile + ContentTypeDirectory +) + +type UploadType string + +const ( + UploadTypeDefault UploadType = "" + UploadTypeFile UploadType = "file" + 
UploadTypeCID UploadType = "cid" + UploadTypeCar UploadType = "car" + UploadTypeUrl UploadType = "url" ) type ContentInBucket struct { @@ -121,11 +136,160 @@ type ObjRef struct { Offloaded uint } +type UploadedContent struct { + Length int64 + Filename string + CID cid.Cid + Origins []*peer.AddrInfo +} + +// LoadContentFromRequest reads a POST /contents request and loads the content from it +// It treats every different case of content upload: file (formData, CID, CAR or URL) +// Returns (UploadedContent, error) +func LoadContentFromRequest(c echo.Context, ctx context.Context, uploadType UploadType, bs blockstore.Blockstore, dserv ipld.DAGService) (UploadedContent, error) { + // for all three upload types + // get len + // get filename + // import file and get cid + content := UploadedContent{} + switch uploadType { + case UploadTypeFile: + // get file from formData + form, err := c.MultipartForm() + if err != nil { + return UploadedContent{}, xerrors.Errorf("invalid formData for 'file' upload option: %w", err) + } + defer form.RemoveAll() + mpf, err := c.FormFile("data") + if err != nil { + return UploadedContent{}, xerrors.Errorf("invalid formData for 'file' upload option: %w", err) + } + + // Get len + content.Length = mpf.Size + + // Get filename + content.Filename = mpf.Filename + if fvname := c.FormValue("filename"); fvname != "" { + content.Filename = fvname + } + + // import file and get CID + fi, err := mpf.Open() + if err != nil { + return UploadedContent{}, err + } + defer fi.Close() + nd, err := ImportFile(dserv, fi) + if err != nil { + return UploadedContent{}, err + } + content.CID = nd.Cid() + + case UploadTypeCar: + // get CAR file from request body + // import file and get CID + defer c.Request().Body.Close() + header, err := car.LoadCar(ctx, bs, c.Request().Body) + if err != nil { + return UploadedContent{}, err + } + if len(header.Roots) != 1 { + // if someone wants this feature, let me know + return 
UploadedContent{}, xerrors.Errorf("cannot handle uploading car files with multiple roots") + } + content.CID = header.Roots[0] + + // Get filename + // TODO: how to specify filename? + content.Filename = content.CID.String() + if qpname := c.QueryParam("filename"); qpname != "" { + content.Filename = qpname + } + + // Get len + // TODO: uncomment and fix this + // bdWriter := &bytes.Buffer{} + // bdReader := io.TeeReader(c.Request().Body, bdWriter) + + // bdSize, err := io.Copy(ioutil.Discard, bdReader) + // if err != nil { + // return err + // } + + // if bdSize > util.MaxDealContentSize { + // return &util.HttpError{ + // Code: http.StatusBadRequest, + // Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, + // Details: fmt.Sprintf("content size %d bytes, is over upload size of limit %d bytes, and content splitting is not enabled, please reduce the content size", bdSize, util.MaxDealContentSize), + // } + // } + + // c.Request().Body = ioutil.NopCloser(bdWriter) + content.Length = 0 // zero since we're not checking the length of this content so it doesn't break the limit check (bad) + + case UploadTypeCID: + // get CID from POST body + var params ContentAddIpfsBody + if err := c.Bind(&params); err != nil { + return UploadedContent{}, err + } + + // Get filename + content.Filename = params.Name + if content.Filename == "" { + content.Filename = params.Root + } + + // get CID + cid, err := cid.Decode(params.Root) + if err != nil { + return UploadedContent{}, err + } + content.CID = cid + + // Can't get len (will be gotten during pinning) + content.Length = 0 + + // origins are needed for pinning later on + var origins []*peer.AddrInfo + for _, p := range params.Peers { + ai, err := peer.AddrInfoFromString(p) + if err != nil { + return UploadedContent{}, err + } + origins = append(origins, ai) + } + content.Origins = origins + + case UploadTypeUrl: + url := string(UploadTypeUrl) // BUG(review): this sets url to the literal constant "url", not the uploaded URL — it should be read from the request; TODO confirm intended source (query param vs body) + filename := path.Base(url) + content.Filename = filename + + resp, err := 
http.Get(url) + if err != nil { + return UploadedContent{}, err + } + defer resp.Body.Close() + + nd, err := ImportFile(dserv, resp.Body) + if err != nil { + return UploadedContent{}, err + } + content.CID = nd.Cid() + + default: + return UploadedContent{}, xerrors.Errorf("invalid type, need 'file', 'cid', 'car' or 'url'. Got %s", uploadType) + } + return content, nil +} + // FindCIDType checks if a pinned CID (root) is a file, a dir or unknown -// Returns dbmgr.File or dbmgr.Directory on success -// Returns dbmgr.Unknown otherwise +// Returns dbmgr.ContentTypeFile or dbmgr.ContentTypeDirectory on success +// Returns dbmgr.ContentTypeUnknown otherwise func FindCIDType(ctx context.Context, root cid.Cid, dserv ipld.NodeGetter) (contentType ContentType) { - contentType = Unknown + contentType = ContentTypeUnknown nilCID := cid.Cid{} if root == nilCID || dserv == nil { return @@ -136,14 +300,14 @@ func FindCIDType(ctx context.Context, root cid.Cid, dserv ipld.NodeGetter) (cont return } - contentType = File + contentType = ContentTypeFile fsNode, err := TryExtractFSNode(nd) if err != nil { return } if fsNode.IsDir() { - contentType = Directory + contentType = ContentTypeDirectory } return } diff --git a/util/shuttle.go b/util/shuttle.go index a69807bc..b7b00275 100644 --- a/util/shuttle.go +++ b/util/shuttle.go @@ -34,7 +34,7 @@ type ShuttleListResponse struct { type ShuttleCreateContentBody struct { ContentCreateBody - Buckets []string `json:"buckets"` + Buckets []string `json:"buckets"` DagSplitRoot uint `json:"dagSplitRoot"` User uint `json:"user"` }