@@ -106,16 +106,8 @@ func (ptree *ptree) init(bufferSize, ary uint64) {
106106}
107107
108108func (ptree * ptree ) operationRunner (xns interfaces , threaded bool ) {
109- var writeOperations , deleteOperations map [* node ][]* keyBundle
110- var toComplete actions
111-
112- if threaded {
113- writeOperations , deleteOperations , toComplete = ptree .fetchKeys (xns )
114- } else {
115- writeOperations , deleteOperations , toComplete = ptree .singleThreadedFetchKeys (xns )
116- }
117-
118- ptree .recursiveAdd (writeOperations , deleteOperations , false )
109+ writeOperations , deleteOperations , toComplete := ptree .fetchKeys (xns , threaded )
110+ ptree .recursiveMutate (writeOperations , deleteOperations , false , threaded )
119111 for _ , a := range toComplete {
120112 a .complete ()
121113 }
@@ -139,27 +131,11 @@ func (ptree *ptree) read(action action) {
139131 }
140132}
141133
142- func (ptree * ptree ) singleThreadedFetchKeys (xns interfaces ) (map [* node ][]* keyBundle , map [* node ][]* keyBundle , actions ) {
143- for _ , ifc := range xns {
144- action := ifc .(action )
145- for i , key := range action .keys () {
146- n := getParent (ptree .root , key )
147- switch action .operation () {
148- case add , remove :
149- action .addNode (int64 (i ), n )
150- case get :
151- if n == nil {
152- action .keys ()[i ] = nil
153- } else {
154- k , _ := n .keys .withPosition (key )
155- if k == nil {
156- action .keys ()[i ] = nil
157- } else {
158- action .keys ()[i ] = k
159- }
160- }
161- }
162- }
134+ func (ptree * ptree ) fetchKeys (xns interfaces , inParallel bool ) (map [* node ][]* keyBundle , map [* node ][]* keyBundle , actions ) {
135+ if inParallel {
136+ ptree .fetchKeysInParallel (xns )
137+ } else {
138+ ptree .fetchKeysInSerial (xns )
163139 }
164140
165141 writeOperations := make (map [* node ][]* keyBundle )
@@ -186,6 +162,30 @@ func (ptree *ptree) singleThreadedFetchKeys(xns interfaces) (map[*node][]*keyBun
186162 return writeOperations , deleteOperations , toComplete
187163}
188164
165+ func (ptree * ptree ) fetchKeysInSerial (xns interfaces ) {
166+ for _ , ifc := range xns {
167+ action := ifc .(action )
168+ for i , key := range action .keys () {
169+ n := getParent (ptree .root , key )
170+ switch action .operation () {
171+ case add , remove :
172+ action .addNode (int64 (i ), n )
173+ case get :
174+ if n == nil {
175+ action .keys ()[i ] = nil
176+ } else {
177+ k , _ := n .keys .withPosition (key )
178+ if k == nil {
179+ action .keys ()[i ] = nil
180+ } else {
181+ action .keys ()[i ] = k
182+ }
183+ }
184+ }
185+ }
186+ }
187+ }
188+
189189func (ptree * ptree ) reset () {
190190 for i := range ptree .cache {
191191 ptree .cache [i ] = nil
@@ -196,7 +196,7 @@ func (ptree *ptree) reset() {
196196 ptree .checkAndRun (nil )
197197}
198198
199- func (ptree * ptree ) fetchKeys (xns []interface {}) ( map [ * node ][] * keyBundle , map [ * node ][] * keyBundle , actions ) {
199+ func (ptree * ptree ) fetchKeysInParallel (xns []interface {}) {
200200 var forCache struct {
201201 i int64
202202 buffer [8 ]uint64 // different cache lines
@@ -251,29 +251,6 @@ func (ptree *ptree) fetchKeys(xns []interface{}) (map[*node][]*keyBundle, map[*n
251251 }()
252252 }
253253 wg .Wait ()
254-
255- writeOperations := make (map [* node ][]* keyBundle )
256- deleteOperations := make (map [* node ][]* keyBundle )
257- toComplete := make (actions , 0 , len (xns )/ 2 )
258- for _ , ifc := range xns {
259- action := ifc .(action )
260- switch action .operation () {
261- case add :
262- for i , n := range action .nodes () {
263- writeOperations [n ] = append (writeOperations [n ], & keyBundle {key : action .keys ()[i ]})
264- }
265- toComplete = append (toComplete , action )
266- case remove :
267- for i , n := range action .nodes () {
268- deleteOperations [n ] = append (deleteOperations [n ], & keyBundle {key : action .keys ()[i ]})
269- }
270- toComplete = append (toComplete , action )
271- case get :
272- action .complete ()
273- }
274- }
275-
276- return writeOperations , deleteOperations , toComplete
277254}
278255
279256func (ptree * ptree ) splitNode (n , parent * node , nodes * []* node , keys * common.Comparators ) {
@@ -295,7 +272,43 @@ func (ptree *ptree) splitNode(n, parent *node, nodes *[]*node, keys *common.Comp
295272 }
296273}
297274
298- func (ptree * ptree ) recursiveAdd (adds , deletes map [* node ][]* keyBundle , setRoot bool ) {
275+ func (ptree * ptree ) applyNode (n * node , adds , deletes []* keyBundle ) {
276+ for _ , kb := range deletes {
277+ if n .keys .len () == 0 {
278+ break
279+ }
280+
281+ deleted := n .keys .delete (kb .key )
282+ if deleted != nil {
283+ atomic .AddUint64 (& ptree .number , ^ uint64 (0 ))
284+ }
285+ }
286+
287+ for _ , kb := range adds {
288+ if n .keys .len () == 0 {
289+ oldKey , _ := n .keys .insert (kb .key )
290+ if n .isLeaf && oldKey == nil {
291+ atomic .AddUint64 (& ptree .number , 1 )
292+ }
293+ if kb .left != nil {
294+ n .nodes .push (kb .left )
295+ n .nodes .push (kb .right )
296+ }
297+ continue
298+ }
299+
300+ oldKey , index := n .keys .insert (kb .key )
301+ if n .isLeaf && oldKey == nil {
302+ atomic .AddUint64 (& ptree .number , 1 )
303+ }
304+ if kb .left != nil {
305+ n .nodes .replaceAt (index , kb .left )
306+ n .nodes .insertAt (index + 1 , kb .right )
307+ }
308+ }
309+ }
310+
311+ func (ptree * ptree ) recursiveMutate (adds , deletes map [* node ][]* keyBundle , setRoot , inParallel bool ) {
299312 if len (adds ) == 0 && len (deletes ) == 0 {
300313 return
301314 }
@@ -331,10 +344,17 @@ func (ptree *ptree) recursiveAdd(adds, deletes map[*node][]*keyBundle, setRoot b
331344 }
332345
333346 var write sync.Mutex
334- nextLayerWrite := make (map [* node ][]* keyBundle , len ( adds ) )
347+ nextLayerWrite := make (map [* node ][]* keyBundle )
335348 nextLayerDelete := make (map [* node ][]* keyBundle )
336349
337- executeInterfacesInParallel (ifs , func (ifc interface {}) {
350+ var mutate func (interfaces , func (interface {}))
351+ if inParallel {
352+ mutate = executeInterfacesInParallel
353+ } else {
354+ mutate = executeInterfacesInSerial
355+ }
356+
357+ mutate (ifs , func (ifc interface {}) {
338358 n := ifc .(* node )
339359 adds := adds [n ]
340360 deletes := deletes [n ]
@@ -353,39 +373,7 @@ func (ptree *ptree) recursiveAdd(adds, deletes map[*node][]*keyBundle, setRoot b
353373 setRoot = true
354374 }
355375
356- for _ , kb := range deletes {
357- if n .keys .len () == 0 {
358- break
359- }
360-
361- deleted := n .keys .delete (kb .key )
362- if deleted != nil {
363- atomic .AddUint64 (& ptree .number , ^ uint64 (0 ))
364- }
365- }
366-
367- for _ , kb := range adds {
368- if n .keys .len () == 0 {
369- oldKey , _ := n .keys .insert (kb .key )
370- if n .isLeaf && oldKey == nil {
371- atomic .AddUint64 (& ptree .number , 1 )
372- }
373- if kb .left != nil {
374- n .nodes .push (kb .left )
375- n .nodes .push (kb .right )
376- }
377- continue
378- }
379-
380- oldKey , index := n .keys .insert (kb .key )
381- if n .isLeaf && oldKey == nil {
382- atomic .AddUint64 (& ptree .number , 1 )
383- }
384- if kb .left != nil {
385- n .nodes .replaceAt (index , kb .left )
386- n .nodes .insertAt (index + 1 , kb .right )
387- }
388- }
376+ ptree .applyNode (n , adds , deletes )
389377
390378 if n .needsSplit (ptree .ary ) {
391379 keys := make (common.Comparators , 0 , n .keys .len ())
@@ -399,7 +387,7 @@ func (ptree *ptree) recursiveAdd(adds, deletes map[*node][]*keyBundle, setRoot b
399387 }
400388 })
401389
402- ptree .recursiveAdd (nextLayerWrite , nextLayerDelete , setRoot )
390+ ptree .recursiveMutate (nextLayerWrite , nextLayerDelete , setRoot , inParallel )
403391}
404392
405393// Insert will add the provided keys to the tree.
0 commit comments