11// Package autobatch provides a go-datastore implementation that
22// automatically batches together writes by holding puts in memory until
3- // a certain threshold is met.
3+ // a certain threshold is met. It also acts as a debounce.
44package autobatch
55
66import (
7+ "log"
8+ "sync"
9+ "time"
10+
711 ds "github.com/ipfs/go-datastore"
812 dsq "github.com/ipfs/go-datastore/query"
913)
@@ -12,9 +16,13 @@ import (
1216type Datastore struct {
1317 child ds.Batching
1418
15- // TODO: discuss making ds.Batch implement the full ds.Datastore interface
16- buffer map [ds.Key ]op
17- maxBufferEntries int
19+ mu sync.RWMutex
20+ buffer map [ds.Key ]op
21+
22+ maxWrite int
23+ maxDelay time.Duration
24+ newWrite chan struct {}
25+ exit chan struct {}
1826}
1927
2028type op struct {
@@ -23,28 +31,79 @@ type op struct {
2331}
2432
2533// NewAutoBatching returns a new datastore that automatically
26- // batches writes using the given Batching datastore. The size
27- // of the memory pool is given by size.
28- func NewAutoBatching (d ds.Batching , size int ) * Datastore {
29- return & Datastore {
30- child : d ,
31- buffer : make (map [ds.Key ]op , size ),
32- maxBufferEntries : size ,
34+ // batches writes using the given Batching datastore. The maximum number of
35+ // write before triggering a batch is given by maxWrite. The maximum delay
36+ // before triggering a batch is given by maxDelay.
37+ func NewAutoBatching (child ds.Batching , maxWrite int , maxDelay time.Duration ) * Datastore {
38+ d := & Datastore {
39+ child : child ,
40+ buffer : make (map [ds.Key ]op , maxWrite ),
41+ maxWrite : maxWrite ,
42+ maxDelay : maxDelay ,
43+ newWrite : make (chan struct {}),
44+ exit : make (chan struct {}),
45+ }
46+ go d .runBatcher ()
47+ return d
48+ }
49+
50+ func (d * Datastore ) addOp (key ds.Key , op op ) {
51+ d .mu .Lock ()
52+ d .buffer [key ] = op
53+ d .mu .Unlock ()
54+ d .newWrite <- struct {}{}
55+ }
56+
57+ func (d * Datastore ) runBatcher () {
58+ var timer <- chan time.Time
59+
60+ write := func () {
61+ timer = nil
62+
63+ b , err := d .prepareBatch (nil )
64+ if err != nil {
65+ log .Println (err )
66+ return
67+ }
68+ err = b .Commit ()
69+ if err != nil {
70+ log .Println (err )
71+ return
72+ }
73+ }
74+
75+ for {
76+ select {
77+ case <- d .exit :
78+ return
79+ case <- timer :
80+ write ()
81+ case <- d .newWrite :
82+ d .mu .RLock ()
83+ ready := len (d .buffer )
84+ d .mu .RUnlock ()
85+ if ready > d .maxWrite {
86+ write ()
87+ }
88+ if timer == nil {
89+ timer = time .After (d .maxDelay )
90+ }
91+ }
3392 }
3493}
3594
3695// Delete deletes a key/value
3796func (d * Datastore ) Delete (k ds.Key ) error {
38- d .buffer [k ] = op {delete : true }
39- if len (d .buffer ) > d .maxBufferEntries {
40- return d .Flush ()
41- }
97+ d .addOp (k , op {delete : true })
4298 return nil
4399}
44100
45101// Get retrieves a value given a key.
46102func (d * Datastore ) Get (k ds.Key ) ([]byte , error ) {
103+ d .mu .RLock ()
47104 o , ok := d .buffer [k ]
105+ d .mu .RUnlock ()
106+
48107 if ok {
49108 if o .delete {
50109 return nil , ds .ErrNotFound
@@ -57,69 +116,67 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) {
57116
58117// Put stores a key/value.
59118func (d * Datastore ) Put (k ds.Key , val []byte ) error {
60- d .buffer [k ] = op {value : val }
61- if len (d .buffer ) > d .maxBufferEntries {
62- return d .Flush ()
63- }
119+ d .addOp (k , op {value : val })
64120 return nil
65121}
66122
67123// Sync flushes all operations on keys at or under the prefix
68124// from the current batch to the underlying datastore
69125func (d * Datastore ) Sync (prefix ds.Key ) error {
70- b , err := d .child . Batch ( )
126+ b , err := d .prepareBatch ( & prefix )
71127 if err != nil {
72128 return err
73129 }
74-
75- for k , o := range d .buffer {
76- if ! (k .Equal (prefix ) || k .IsDescendantOf (prefix )) {
77- continue
78- }
79-
80- var err error
81- if o .delete {
82- err = b .Delete (k )
83- } else {
84- err = b .Put (k , o .value )
85- }
86- if err != nil {
87- return err
88- }
89-
90- delete (d .buffer , k )
91- }
92-
93130 return b .Commit ()
94131}
95132
96133// Flush flushes the current batch to the underlying datastore.
97134func (d * Datastore ) Flush () error {
98- b , err := d .child . Batch ( )
135+ b , err := d .prepareBatch ( nil )
99136 if err != nil {
100137 return err
101138 }
139+ return b .Commit ()
140+ }
141+
142+ func (d * Datastore ) prepareBatch (prefix * ds.Key ) (ds.Batch , error ) {
143+ b , err := d .child .Batch ()
144+ if err != nil {
145+ return nil , err
146+ }
147+
148+ d .mu .Lock ()
102149
103150 for k , o := range d .buffer {
151+ if prefix != nil && ! (k .Equal (* prefix ) || k .IsDescendantOf (* prefix )) {
152+ continue
153+ }
154+
104155 var err error
105156 if o .delete {
106157 err = b .Delete (k )
107158 } else {
108159 err = b .Put (k , o .value )
109160 }
110161 if err != nil {
111- return err
162+ d .mu .Unlock ()
163+ return nil , err
112164 }
165+
166+ delete (d .buffer , k )
113167 }
114- // clear out buffer
115- d .buffer = make (map [ds.Key ]op , d .maxBufferEntries )
116168
117- return b .Commit ()
169+ d .mu .Unlock ()
170+
171+ return b , nil
118172}
119173
120174// Has checks if a key is stored.
121175func (d * Datastore ) Has (k ds.Key ) (bool , error ) {
176+ d .mu .RLock ()
122177 o , ok := d .buffer [k ]
178+ d .mu .RUnlock ()
179+
123180 if ok {
124181 return ! o .delete , nil
125182 }
@@ -129,7 +186,10 @@ func (d *Datastore) Has(k ds.Key) (bool, error) {
129186
130187// GetSize implements Datastore.GetSize
131188func (d * Datastore ) GetSize (k ds.Key ) (int , error ) {
189+ d .mu .RLock ()
132190 o , ok := d .buffer [k ]
191+ d .mu .RUnlock ()
192+
133193 if ok {
134194 if o .delete {
135195 return - 1 , ds .ErrNotFound
@@ -155,6 +215,18 @@ func (d *Datastore) DiskUsage() (uint64, error) {
155215 return ds .DiskUsage (d .child )
156216}
157217
218+ func (d * Datastore ) Batch () (ds.Batch , error ) {
219+ b , err := d .child .Batch ()
220+ if err != nil {
221+ return nil , err
222+ }
223+ return & batch {
224+ parent : d ,
225+ child : b ,
226+ toDelete : make (map [ds.Key ]struct {}),
227+ }, nil
228+ }
229+
158230func (d * Datastore ) Close () error {
159231 err1 := d .Flush ()
160232 err2 := d .child .Close ()
@@ -164,5 +236,32 @@ func (d *Datastore) Close() error {
164236 if err2 != nil {
165237 return err2
166238 }
239+ close (d .exit )
240+ close (d .newWrite )
167241 return nil
168242}
243+
244+ type batch struct {
245+ parent * Datastore
246+ child ds.Batch
247+ toDelete map [ds.Key ]struct {}
248+ }
249+
250+ func (b * batch ) Put (key ds.Key , value []byte ) error {
251+ delete (b .toDelete , key )
252+ return b .child .Put (key , value )
253+ }
254+
255+ func (b * batch ) Delete (key ds.Key ) error {
256+ b .toDelete [key ] = struct {}{}
257+ return b .child .Delete (key )
258+ }
259+
260+ func (b * batch ) Commit () error {
261+ b .parent .mu .Lock ()
262+ for key , _ := range b .toDelete {
263+ delete (b .parent .buffer , key )
264+ }
265+ b .parent .mu .Unlock ()
266+ return b .child .Commit ()
267+ }
0 commit comments