diff --git a/pinner/pinmgr.go b/pinner/pinmgr.go index 14fb0a6e..f5403aa9 100644 --- a/pinner/pinmgr.go +++ b/pinner/pinmgr.go @@ -60,14 +60,21 @@ type PinManagerOpts struct { QueueDataDir string } +type PinQueueData struct { + pinQueue *goque.PrefixQueue + pinQueueFront map[uint][]*operation.PinningOperation + pinQueueBack map[uint][]*operation.PinningOperation + N int +} + type PinManager struct { pinQueueIn chan *operation.PinningOperation pinQueueOut chan *operation.PinningOperation pinComplete chan *operation.PinningOperation duplicateGuard map[uint64]bool // track whether a content id already exists in the queue activePins map[uint]int // used to limit the number of pins per user - pinQueueCount map[uint]int // keep track of queue count per user - pinQueue *goque.PrefixQueue + pinQueueData PinQueueData + pinQueueCount map[uint]int // keep track of queue count per user pinQueueLk sync.Mutex RunPinFunc PinFunc StatusChangeFunc PinStatusFunc @@ -142,14 +149,14 @@ func newPinManager(pinfunc PinFunc, scf PinStatusFunc, opts *PinManagerOpts, log log.Fatal("Deque needs queue data dir") } duplicateGuard := buildDuplicateGuardFromPinQueue(opts.QueueDataDir, log) - pinQueue := createDQue(opts.QueueDataDir, log) + pinQueueData := createPinQueue(opts.QueueDataDir, log) //we need to have a variable pinQueueCount which keeps track in memory count in the queue //Since the disk dequeue is durable //we initialize pinQueueCount on boot by iterating through the queue - pinQueueCount := buildPinQueueCount(pinQueue, log) + pinQueueCount := buildPinQueueCount(pinQueueData.pinQueue, log) return &PinManager{ - pinQueue: pinQueue, + pinQueueData: pinQueueData, activePins: make(map[uint]int), pinQueueCount: pinQueueCount, pinQueueIn: make(chan *operation.PinningOperation, 64), @@ -187,10 +194,61 @@ func (pm *PinManager) complete(po *operation.PinningOperation) { po.Complete() } -func (pm *PinManager) PinQueueSize() int { +func (pm *PinManager) PinQueueSizeSafe() int { pm.pinQueueLk.Lock() defer pm.pinQueueLk.Unlock() - return int(pm.pinQueue.Length()) + return int(len(pm.pinQueueData.pinQueueFront)) + int(len(pm.pinQueueData.pinQueueBack)) + int(pm.pinQueueData.pinQueue.Length()) +} + +func (pq PinQueueData) popBack(UserId uint) (po *operation.PinningOperation) { + next := pq.pinQueueBack[UserId][0] + if len(pq.pinQueueBack[UserId]) == 1 { + delete(pq.pinQueueBack, UserId) + } else { + pq.pinQueueBack[UserId] = pq.pinQueueBack[UserId][1:] // TODO check if this is bad algorithm + } + return next +} + +func (pq PinQueueData) popFront(UserId uint) (po *operation.PinningOperation) { + next := pq.pinQueueFront[UserId][0] + if len(pq.pinQueueFront[UserId]) == 1 { + delete(pq.pinQueueFront, UserId) + } else { + pq.pinQueueFront[UserId] = pq.pinQueueFront[UserId][1:] // TODO check if this is bad algorithm + } + return next +} + +func (pq PinQueueData) Enqueue(UserId uint, po *operation.PinningOperation) error { + q := pq.pinQueueFront[UserId] + pq.pinQueueFront[UserId] = append(q, po) + + //move front to disk dequeue + if len(pq.pinQueueFront[UserId]) > pq.N { + opBytes, err := encodeMsgPack(pq.pinQueueFront[UserId]) + if err != nil { + return err + } + _, err = pq.pinQueue.Enqueue(getUserForQueue(UserId), opBytes) + if err != nil { + return err + } + delete(pq.pinQueueFront, UserId) + } + return nil +} + +//get size of leveldb queue for a user +func (pm *PinManager) PinQueueSizeUser(userId uint) int { + return pm.pinQueueCount[userId] - len(pm.pinQueueData.pinQueueFront[userId]) - len(pm.pinQueueData.pinQueueBack[userId]) +} +func (pm *PinManager) PinQueueSize() int { + total := 0 + for u := range pm.pinQueueCount { + total += pm.pinQueueCount[u] + } + return total } func (pm *PinManager) Add(op *operation.PinningOperation) { @@ -225,8 +283,37 @@ func (pm *PinManager) doPinning(po *operation.PinningOperation) error { return nil } +func (pm *PinManager) popUser(user uint) *operation.PinningOperation { + + if len(pm.pinQueueData.pinQueueBack[user]) > 0 { + return pm.pinQueueData.popBack(user) + } else { + pinQueueLength := pm.PinQueueSizeUser(user) + if pinQueueLength > 0 { + item, err := pm.pinQueueData.pinQueue.Dequeue(getUserForQueue(user)) + if err != nil { + pm.log.Fatal(err) + } + + // Assert type of the response to an Item pointer so we can work with it + newBackQueue, err := decodeMsgPack(item.Value) + if err != nil { + pm.log.Fatal("Cannot decode PinningOperation pointer") + } + + //read N objects into pinQueueFront, and pop and return + pm.pinQueueData.pinQueueBack[user] = newBackQueue + return pm.pinQueueData.popBack(user) + } + if len(pm.pinQueueData.pinQueueFront[user]) > 0 { + return pm.pinQueueData.popFront(user) + } + + } + return nil +} func (pm *PinManager) popNextPinOp() *operation.PinningOperation { - if pm.pinQueue.Length() == 0 { + if pm.PinQueueSize() == 0 { // todo better length return nil // no content in queue } @@ -249,6 +336,7 @@ func (pm *PinManager) popNextPinOp() *operation.PinningOperation { } } } + if minCount >= pm.maxActivePerUser && user != 0 { //return nil if the min count is greater than the limit and user is not 0 //TODO investigate whether we should pop the work off anyway and not return nil @@ -259,37 +347,22 @@ func (pm *PinManager) popNextPinOp() *operation.PinningOperation { return nil } - // Dequeue the next item in the queue - item, err := pm.pinQueue.Dequeue(getUserForQueue(user)) + // Dequeue the next item in the queue and check Front then leveldb then Back + next := pm.popUser(user) + pm.pinQueueCount[user]-- if pm.pinQueueCount[user] == 0 { delete(pm.pinQueueCount, user) } pm.activePins[user]++ - // no item in the queue for that query - if err == goque.ErrOutOfBounds { - return nil - } - - if err != nil { - pm.log.Errorf("Error dequeuing item ", err) - return nil - } - - // Assert type of the response to an Item pointer so we can work with it - next, err := decodeMsgPack(item.Value) - if err != nil { - pm.log.Errorf("Cannot decode PinningOperation pointer") - return nil - } return next } //currently only used for the tests since the tests need to open and close multiple dbs //handling errors paritally for gosec security scanner func (pm *PinManager) closeQueueDataStructures() { - err := pm.pinQueue.Close() + err := pm.pinQueueData.pinQueue.Close() // todo add front and back to queue if err != nil { pm.log.Fatal(err) } @@ -301,18 +374,20 @@ func createLevelDBKey(value PinningOperationData, log *zap.SugaredLogger) uint64 func buildDuplicateGuardFromPinQueue(QueueDataDir string, log *zap.SugaredLogger) map[uint64]bool { ret := make(map[uint64]bool) - dname := filepath.Join(QueueDataDir, "pinQueueMsgPack") + dname := filepath.Join(QueueDataDir, "pinQueueMsgPack-v2") db, err := leveldb.OpenFile(dname, nil) if err != nil { return ret } iter := db.NewIterator(nil, nil) for iter.Next() { - entry, err := decodeMsgPack(iter.Value()) + items, err := decodeMsgPack(iter.Value()) if err != nil { continue } - ret[entry.ContId] = true + for _, entry := range items { + ret[entry.ContId] = true + } } err = db.Close() @@ -345,8 +420,18 @@ func buildPinQueueCount(q *goque.PrefixQueue, log *zap.SugaredLogger) map[uint]i return mapUint } +func createPinQueue(QueueDataDir string, log *zap.SugaredLogger) PinQueueData { + pq := PinQueueData{ + pinQueue: createDQue(QueueDataDir, log), + pinQueueFront: make(map[uint][]*operation.PinningOperation), + pinQueueBack: make(map[uint][]*operation.PinningOperation), + N: 50, // let frontQueue have 50 before pushing to disk + } + return pq +} + func createDQue(QueueDataDir string, log *zap.SugaredLogger) *goque.PrefixQueue { - dname := filepath.Join(QueueDataDir, "pinQueueMsgPack") + dname := filepath.Join(QueueDataDir, "pinQueueMsgPack-v2") if err := os.MkdirAll(dname, os.ModePerm); err != nil { log.Fatal("Unable to create directory for LevelDB. Out of disk? Too many open files? try ulimit -n 50000") } @@ -362,12 +447,12 @@ func getUserForQueue(UserId uint) []byte { return []byte(strconv.Itoa(int(UserId))) } -func encodeMsgPack(po *operation.PinningOperation) ([]byte, error) { - return msgpack.Marshal(&po) +func encodeMsgPack(front []*operation.PinningOperation) ([]byte, error) { + return msgpack.Marshal(&front) } -func decodeMsgPack(po_bytes []byte) (*operation.PinningOperation, error) { - var next *operation.PinningOperation +func decodeMsgPack(po_bytes []byte) ([]*operation.PinningOperation, error) { + var next []*operation.PinningOperation return next, msgpack.Unmarshal(po_bytes, &next) } @@ -377,7 +462,6 @@ func (pm *PinManager) enqueuePinOp(po *operation.PinningOperation) { poData := getPinningData(po) _, exists := pm.duplicateGuard[createLevelDBKey(poData, pm.log)] if exists { - //work already exists in the queue not adding duplicate return } @@ -386,13 +470,8 @@ func (pm *PinManager) enqueuePinOp(po *operation.PinningOperation) { u = 0 } - opBytes, err := encodeMsgPack(po) - if err != nil { - pm.log.Fatal("Unable to encode data to add to queue.") - } - // Add it to the queue. - _, err = pm.pinQueue.Enqueue(getUserForQueue(u), opBytes) + err := pm.pinQueueData.Enqueue(u, po) if err != nil { pm.log.Fatal("Unable to add pin to queue.", err) } diff --git a/pinner/pinmgr_test.go b/pinner/pinmgr_test.go index 5d232228..9d5c48b5 100644 --- a/pinner/pinmgr_test.go +++ b/pinner/pinmgr_test.go @@ -2,7 +2,7 @@ package pinner import ( "context" - "encoding/json" + //"encoding/json" "fmt" "os" "sync" @@ -66,6 +66,7 @@ func TestConstructMultiAddr(t *testing.T) { }) } +/* func TestEncodeDecode(t *testing.T) { t.Run("", func(t *testing.T) { p := "/ip4/154.113.32.86/tcp/4001/p2p/12D3KooWCsxFFH242NZ4bjRMJEVc61La6Ha4yGVNXeEEwpf8KWCX" @@ -101,6 +102,7 @@ func TestEncodeDecode(t *testing.T) { assert.Equal(t, newPoPeers[0].ID, originsUnmarshalled[0].ID, "ID doesnt match") }) } +*/ func newPinData(name string, userid int, contid uint64) operation.PinningOperation { p := "/ip4/154.113.32.86/tcp/4001/p2p/12D3KooWCsxFFH242NZ4bjRMJEVc61La6Ha4yGVNXeEEwpf8KWCX" @@ -127,7 +129,7 @@ func TestSend1Pin1worker(t *testing.T) { go mgr.Add(&pin) sleepWhileWork(mgr, 0) - assert.Equal(t, 0, int(mgr.pinQueue.Length()), "first pin doesn't enter queue") + assert.Equal(t, 0, int(mgr.PinQueueSize()), "first pin doesn't enter queue") assert.Equal(t, 1, count, "DoPin called once") mgr.closeQueueDataStructures() })