@@ -20,17 +20,17 @@ import (
2020// If value is nil but deleted is false, it means the parent doesn't have the
2121// key. (No need to delete upon Write())
2222type cValue struct {
23- value []byte
24- deleted bool
25- dirty bool
23+ value []byte
24+ dirty bool
2625}
2726
2827// Store wraps an in-memory cache around an underlying types.KVStore.
2928type Store struct {
3029 mtx sync.Mutex
3130 cache map [string ]* cValue
31+ deleted map [string ]struct {}
3232 unsortedCache map [string ]struct {}
33- sortedCache * kv. List // always ascending sorted
33+ sortedCache * dbm. MemDB // always ascending sorted
3434 parent types.KVStore
3535}
3636
@@ -40,8 +40,9 @@ var _ types.CacheKVStore = (*Store)(nil)
4040func NewStore (parent types.KVStore ) * Store {
4141 return & Store {
4242 cache : make (map [string ]* cValue ),
43+ deleted : make (map [string ]struct {}),
4344 unsortedCache : make (map [string ]struct {}),
44- sortedCache : kv . NewList (),
45+ sortedCache : dbm . NewMemDB (),
4546 parent : parent ,
4647 }
4748}
@@ -122,7 +123,7 @@ func (store *Store) Write() {
122123 cacheValue := store .cache [key ]
123124
124125 switch {
125- case cacheValue . deleted :
126+ case store . isDeleted ( key ) :
126127 store .parent .Delete ([]byte (key ))
127128 case cacheValue .value == nil :
128129 // Skip, it already doesn't exist in parent.
@@ -133,8 +134,9 @@ func (store *Store) Write() {
133134
134135 // Clear the cache
135136 store .cache = make (map [string ]* cValue )
137+ store .deleted = make (map [string ]struct {})
136138 store .unsortedCache = make (map [string ]struct {})
137- store .sortedCache = kv . NewList ()
139+ store .sortedCache = dbm . NewMemDB ()
138140}
139141
140142// CacheWrap implements CacheWrapper.
@@ -178,23 +180,40 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
178180 }
179181
180182 store .dirtyItems (start , end )
181- cache = newMemIterator (start , end , store .sortedCache , ascending )
183+ cache = newMemIterator (start , end , store .sortedCache , store . deleted , ascending )
182184
183185 return newCacheMergeIterator (parent , cache , ascending )
184186}
185187
186188// Constructs a slice of dirty items, to use w/ memIterator.
187189func (store * Store ) dirtyItems (start , end []byte ) {
188- unsorted := make ([]* kv.Pair , 0 )
189-
190190 n := len (store .unsortedCache )
191- for key := range store .unsortedCache {
192- if dbm .IsKeyInDomain (conv .UnsafeStrToBytes (key ), start , end ) {
191+ unsorted := make ([]* kv.Pair , 0 )
192+ // If the unsortedCache is too big, its costs too much to determine
193+ // whats in the subset we are concerned about.
194+ // If you are interleaving iterator calls with writes, this can easily become an
195+ // O(N^2) overhead.
196+ // Even without that, too many range checks eventually becomes more expensive
197+ // than just not having the cache.
198+ if n >= 1024 {
199+ for key := range store .unsortedCache {
193200 cacheValue := store .cache [key ]
194201 unsorted = append (unsorted , & kv.Pair {Key : []byte (key ), Value : cacheValue .value })
195202 }
203+ } else {
204+ // else do a linear scan to determine if the unsorted pairs are in the pool.
205+ for key := range store .unsortedCache {
206+ if dbm .IsKeyInDomain (conv .UnsafeStrToBytes (key ), start , end ) {
207+ cacheValue := store .cache [key ]
208+ unsorted = append (unsorted , & kv.Pair {Key : []byte (key ), Value : cacheValue .value })
209+ }
210+ }
196211 }
212+ store .clearUnsortedCacheSubset (unsorted )
213+ }
197214
215+ func (store * Store ) clearUnsortedCacheSubset (unsorted []* kv.Pair ) {
216+ n := len (store .unsortedCache )
198217 if len (unsorted ) == n { // This pattern allows the Go compiler to emit the map clearing idiom for the entire map.
199218 for key := range store .unsortedCache {
200219 delete (store .unsortedCache , key )
@@ -204,32 +223,21 @@ func (store *Store) dirtyItems(start, end []byte) {
204223 delete (store .unsortedCache , conv .UnsafeBytesToStr (kv .Key ))
205224 }
206225 }
207-
208226 sort .Slice (unsorted , func (i , j int ) bool {
209227 return bytes .Compare (unsorted [i ].Key , unsorted [j ].Key ) < 0
210228 })
211229
212- for e := store .sortedCache .Front (); e != nil && len (unsorted ) != 0 ; {
213- uitem := unsorted [0 ]
214- sitem := e .Value
215- comp := bytes .Compare (uitem .Key , sitem .Key )
216-
217- switch comp {
218- case - 1 :
219- unsorted = unsorted [1 :]
220-
221- store .sortedCache .InsertBefore (uitem , e )
222- case 1 :
223- e = e .Next ()
224- case 0 :
225- unsorted = unsorted [1 :]
226- e .Value = uitem
227- e = e .Next ()
230+ for _ , item := range unsorted {
231+ if item .Value == nil {
232+ // deleted element, tracked by store.deleted
233+ // setting arbitrary value
234+ store .sortedCache .Set (item .Key , []byte {})
235+ continue
236+ }
237+ err := store .sortedCache .Set (item .Key , item .Value )
238+ if err != nil {
239+ panic (err )
228240 }
229- }
230-
231- for _ , kvp := range unsorted {
232- store .sortedCache .PushBack (kvp )
233241 }
234242}
235243
@@ -238,12 +246,22 @@ func (store *Store) dirtyItems(start, end []byte) {
238246
239247// Only entrypoint to mutate store.cache.
240248func (store * Store ) setCacheValue (key , value []byte , deleted bool , dirty bool ) {
241- store .cache [conv .UnsafeBytesToStr (key )] = & cValue {
242- value : value ,
243- deleted : deleted ,
244- dirty : dirty ,
249+ keyStr := conv .UnsafeBytesToStr (key )
250+ store .cache [keyStr ] = & cValue {
251+ value : value ,
252+ dirty : dirty ,
253+ }
254+ if deleted {
255+ store .deleted [keyStr ] = struct {}{}
256+ } else {
257+ delete (store .deleted , keyStr )
245258 }
246259 if dirty {
247260 store .unsortedCache [conv .UnsafeBytesToStr (key )] = struct {}{}
248261 }
249262}
263+
264+ func (store * Store ) isDeleted (key string ) bool {
265+ _ , ok := store .deleted [key ]
266+ return ok
267+ }
0 commit comments