diff --git a/api/v1/api.go b/api/v1/api.go index 852e1e51..0e8f5208 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -126,6 +126,7 @@ func (s *apiV1) RegisterRoutes(e *echo.Echo) { user.PUT("/password", util.WithUser(s.handleUserChangePassword)) user.PUT("/address", util.WithUser(s.handleUserChangeAddress)) user.GET("/stats", util.WithUser(s.handleGetUserStats)) + user.GET("/utilization", util.WithUser(s.handleGetUserUtilization)) userMiner := user.Group("/miner") userMiner.POST("/claim", util.WithUser(s.handleUserClaimMiner)) diff --git a/api/v1/handlers.go b/api/v1/handlers.go index 6b6c8c14..3cf24013 100644 --- a/api/v1/handlers.go +++ b/api/v1/handlers.go @@ -380,6 +380,20 @@ func (s *apiV1) handleAddCar(c echo.Context, u *util.User) error { return s.redirectContentAdding(c, u) } + // Get user storage capacity + usc, err := s.getUserStorageCapacity(u) + if err != nil { + return err + } + + if !usc.ValidateThreshold() { + return &util.HttpError{ + Code: http.StatusBadRequest, + Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, + Details: fmt.Sprintf("Reached Max Storage Threshold: %.2f TB. Please contact the Estuary Team for provisionning dedicated infrastruture if additonal storage is needed. We can be reached on the Filecoin slack under the #ecosystem-dev channel.", util.BytesToTB(usc.HardLimit)), + } + } + // if splitting is disabled and uploaded content size is greater than content size limit // reject the upload, as it will only get stuck and deals will never be made for it // if !u.FlagSplitContent() { @@ -492,6 +506,25 @@ func (s *apiV1) loadCar(ctx context.Context, bs blockstore.Blockstore, r io.Read return car.LoadCar(ctx, bs, r) } +func (s *apiV1) getUserStorageCapacity(user *util.User) (*util.UsersStorageCapacity, error) { + var usc *util.UsersStorageCapacity + err := s.db.First(&usc, "user_id = ?", user.ID).Error + + if err != nil || usc.IsSyncNeeded() { + var usage util.Utilization + if err := s.db.Raw(`SELECT (SELECT SUM(size) FROM contents where user_id = ? AND created_at >= ? AND NOT aggregate AND active AND deleted_at IS NULL) as total_size`, user.ID, util.CutOverUtilizationDate). + Scan(&usage).Error; err != nil { + return usc, err + } + usc.UserId = user.ID + usc.Size = usage.TotalSize + usc.LastSyncAt = time.Now() + s.db.Save(&usc) + } + + return usc, nil +} + // handleAdd godoc // @Summary Add new content // @Description This endpoint is used to upload new content. @@ -539,6 +572,20 @@ func (s *apiV1) handleAdd(c echo.Context, u *util.User) error { return err } + // Get user storage capacity + usc, err := s.getUserStorageCapacity(u) + if err != nil { + return err + } + + if !usc.ValidateThreshold() { + return &util.HttpError{ + Code: http.StatusBadRequest, + Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, + Details: fmt.Sprintf("Reached Max Storage Threshold: %.2f TB. Please contact the Estuary Team for provisionning dedicated infrastruture if additonal storage is needed. We can be reached on the Filecoin slack under the #ecosystem-dev channel.", util.BytesToTB(usc.HardLimit)), + } + } + // if splitting is disabled and uploaded content size is greater than content size limit // reject the upload, as it will only get stuck and deals will never be made for it if !u.FlagSplitContent() && mpf.Size > s.cfg.Content.MaxSize { @@ -2852,6 +2899,23 @@ func (s *apiV1) handleGetUserStats(c echo.Context, u *util.User) error { return c.JSON(http.StatusOK, stats) } +// handleGetUserStats godoc +// @Summary Gets User Utilization Stats +// @Description This endpoint is used to get utilization stats for the current user. +// @Tags User +// @Produce json +// @Success 200 {object} string +// @Failure 400 {object} util.HttpError +// @Failure 500 {object} util.HttpError +// @Router /user/utilization [get] +func (s *apiV1) handleGetUserUtilization(c echo.Context, u *util.User) error { + usc, err := s.getUserStorageCapacity(u) + if err != nil { + return err + } + return c.JSON(http.StatusOK, usc) +} + func (s *apiV1) newAuthTokenForUser(user *util.User, expiry time.Time, perms []string, label string, isSession bool) (*util.AuthToken, error) { if len(perms) > 1 { return nil, fmt.Errorf("invalid perms") @@ -4514,6 +4578,20 @@ func (s *apiV1) handleCreateContent(c echo.Context, u *util.User) error { return err } + // Get user storage capacity + usc, err := s.getUserStorageCapacity(u) + if err != nil { + return err + } + + if !usc.ValidateThreshold() { + return &util.HttpError{ + Code: http.StatusBadRequest, + Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, + Details: fmt.Sprintf("Reached Max Storage Threshold: %.2f TB. Please contact the Estuary Team for provisionning dedicated infrastruture if additonal storage is needed. We can be reached on the Filecoin slack under the #ecosystem-dev channel.", util.BytesToTB(usc.HardLimit)), + } + } + rootCID, err := cid.Decode(req.Root) if err != nil { return err diff --git a/api/v1/pinning.go b/api/v1/pinning.go index 4d58ea7c..052ae424 100644 --- a/api/v1/pinning.go +++ b/api/v1/pinning.go @@ -262,6 +262,20 @@ func (s *apiV1) handleAddPin(c echo.Context, u *util.User) error { overwrite = true } + // Get user storage capacity + usc, err := s.getUserStorageCapacity(u) + if err != nil { + return err + } + + if !usc.ValidateThreshold() { + return &util.HttpError{ + Code: http.StatusBadRequest, + Reason: util.ERR_CONTENT_SIZE_OVER_LIMIT, + Details: fmt.Sprintf("Reached Max Storage Threshold: %.2f TB. Please contact the Estuary Team for provisionning dedicated infrastruture if additonal storage is needed. We can be reached on the Filecoin slack under the #ecosystem-dev channel.", util.BytesToTB(usc.HardLimit)), + } + } + ignoreDuplicates := false if c.QueryParam("ignore-dupes") == "true" { ignoreDuplicates = true diff --git a/docs/docs.go b/docs/docs.go index a3273f6d..7e1d79e7 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -3291,6 +3291,38 @@ const docTemplate = `{ } } }, + "/user/utilization": { + "get": { + "description": "This endpoint is used to get utilization stats for the current user.", + "produces": [ + "application/json" + ], + "tags": [ + "User" + ], + "summary": "Gets User Utilization Stats", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/util.HttpError" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/util.HttpError" + } + } + } + } + }, "/viewer": { "get": { "description": "This endpoint fetches viewer details such as username, permissions, address, owned miners, user settings etc.", diff --git a/docs/swagger.json b/docs/swagger.json index 697ae5c3..18a2a725 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -3284,6 +3284,38 @@ } } }, + "/user/utilization": { + "get": { + "description": "This endpoint is used to get utilization stats for the current user.", + "produces": [ + "application/json" + ], + "tags": [ + "User" + ], + "summary": "Gets User Utilization Stats", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/util.HttpError" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/util.HttpError" + } + } + } + } + }, "/viewer": { "get": { "description": "This endpoint fetches viewer details such as username, permissions, address, owned miners, user settings etc.", diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 2f73f7e6..f995d4c6 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -2549,6 +2549,27 @@ paths: summary: Get stats for the current user tags: - User + /user/utilization: + get: + description: This endpoint is used to get utilization stats for the current user. + produces: + - application/json + responses: + "200": + description: OK + schema: + type: string + "400": + description: Bad Request + schema: + $ref: '#/definitions/util.HttpError' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/util.HttpError' + summary: Gets User Utilization Stats + tags: + - User /viewer: get: description: This endpoint fetches viewer details such as username, permissions, address, owned miners, user settings etc. diff --git a/main.go b/main.go index 73175713..975d0795 100644 --- a/main.go +++ b/main.go @@ -405,6 +405,7 @@ func migrateSchemas(db *gorm.DB) error { &model.DealQueueTracker{}, &model.SplitQueue{}, &model.SplitQueueTracker{}, + &util.UsersStorageCapacity{}, ); err != nil { return err } diff --git a/pinner/block/block.go b/pinner/block/block.go index aa5b7418..a95b79dc 100644 --- a/pinner/block/block.go +++ b/pinner/block/block.go @@ -96,6 +96,16 @@ func (m *manager) addObjectsToDatabase(ctx context.Context, cont *util.Content, return xerrors.Errorf("failed to update content in database: %w", err) } + var usc *util.UsersStorageCapacity + err := tx.First(&usc, "user_id = ?", cont.UserID).Error + if err != nil { + usc.UserId = cont.UserID + usc.Size = 0 + } + + usc.Size += contSize + tx.Save(&usc) + // if content can be staged, stage it if contSize < m.cfg.Content.MinSize { return m.stgZoneQueueMgr.QueueContent(cont, tx, false) diff --git a/util/http.go b/util/http.go index 246832ad..74709e94 100644 --- a/util/http.go +++ b/util/http.go @@ -36,6 +36,7 @@ const ( ERR_CONTENT_ADDING_DISABLED = "ERR_CONTENT_ADDING_DISABLED" ERR_INVALID_INPUT = "ERR_INVALID_INPUT" ERR_CONTENT_SIZE_OVER_LIMIT = "ERR_CONTENT_SIZE_OVER_LIMIT" + ERR_USER_REACHED_STORAGE_TRESHOLD = "ERR_USER_REACHED_STORAGE_TRESHOLD" ERR_PEERING_PEERS_ADD_ERROR = "ERR_PEERING_PEERS_ADD_ERROR" ERR_PEERING_PEERS_REMOVE_ERROR = "ERR_PEERING_PEERS_REMOVE_ERROR" ERR_PEERING_PEERS_START_ERROR = "ERR_PEERING_PEERS_START_ERROR" diff --git a/util/misc.go b/util/misc.go index e1af71b1..c7191696 100644 --- a/util/misc.go +++ b/util/misc.go @@ -209,3 +209,8 @@ func ToMultiAddress(addr string) (multiaddr.Multiaddr, error) { } return a, nil } + +func BytesToTB(bytes int64) float64 { + tb := float64(bytes) / (1024 * 1024 * 1024 * 1024) + return tb +} diff --git a/util/users_storage_capacity.go b/util/users_storage_capacity.go new file mode 100644 index 00000000..e4160c13 --- /dev/null +++ b/util/users_storage_capacity.go @@ -0,0 +1,35 @@ +package util + +import ( + "gorm.io/gorm" + "time" +) + +type UsersStorageCapacity struct { + gorm.Model + + UserId uint `json:"user_id"` + Size int64 `json:"size" gorm:"default:0"` + SoftLimit int64 `json:"soft_limit_bytes" gorm:"default:1319413953331"` // Hardlimit*.8 + HardLimit int64 `json:"hard_limit_bytes" gorm:"default:1649267441664"` // 1.5TB + LastSyncAt time.Time `json:"last_sync_at"` +} + +type Utilization struct { + TotalSize int64 +} + +// CutOverUtilizationDate All content uploaded pass this date will count toward your user storage capacity +const CutOverUtilizationDate = "2023-05-26 00:00:00" +const SyncRefreshInHours = 24 + +func (usc *UsersStorageCapacity) ValidateThreshold() bool { + return usc.Size <= usc.HardLimit +} + +func (usc *UsersStorageCapacity) IsSyncNeeded() bool { + now := time.Now().UTC() + duration := now.Sub(usc.LastSyncAt.UTC()) + refresh := SyncRefreshInHours * time.Hour + return duration >= refresh +}