@@ -105,12 +105,13 @@ func (items *priorityItems) push(item Item) {
105105// items that implement the Item interface and adds them
106106// to the queue in priority order.
107107type PriorityQueue struct {
108- waiters waiters
109- items priorityItems
110- itemMap map [Item ]struct {}
111- lock sync.Mutex
112- disposeLock sync.Mutex
113- disposed bool
108+ waiters waiters
109+ items priorityItems
110+ itemMap map [Item ]struct {}
111+ lock sync.Mutex
112+ disposeLock sync.Mutex
113+ disposed bool
114+ allowDuplicates bool
114115}
115116
116117// Put adds items to the queue.
@@ -126,7 +127,9 @@ func (pq *PriorityQueue) Put(items ...Item) error {
126127 }
127128
128129 for _ , item := range items {
129- if _ , ok := pq .itemMap [item ]; ! ok {
130+ if pq .allowDuplicates {
131+ pq .items .push (item )
132+ } else if _ , ok := pq .itemMap [item ]; ! ok {
130133 pq .itemMap [item ] = struct {}{}
131134 pq .items .push (item )
132135 }
@@ -188,7 +191,9 @@ func (pq *PriorityQueue) Get(number int) ([]Item, error) {
188191 pq .disposeLock .Unlock ()
189192
190193 items = pq .items .get (number )
191- deleteItems (items )
194+ if ! pq .allowDuplicates {
195+ deleteItems (items )
196+ }
192197 sema .response .Done ()
193198 return items , nil
194199 }
@@ -253,6 +258,12 @@ func (pq *PriorityQueue) Dispose() {
253258 pq .waiters = nil
254259}
255260
261+ // AllowDuplicates determines whether the queue supports
262+ // duplicated items. This setting is false by default.
263+ func (pq * PriorityQueue ) AllowDuplicates (allow bool ) {
264+ pq .allowDuplicates = allow
265+ }
266+
256267// NewPriorityQueue is the constructor for a priority queue.
257268func NewPriorityQueue (hint int ) * PriorityQueue {
258269 return & PriorityQueue {
0 commit comments