|
4 | 4 | using System.Reactive.Linq; |
5 | 5 | using Akavache; |
6 | 6 | using GitHub.Caches; |
| 7 | +using System.Threading.Tasks; |
7 | 8 |
|
8 | 9 | namespace GitHub.Extensions |
9 | 10 | { |
@@ -199,39 +200,45 @@ static IObservable<T> GetAndFetchLatestFromIndex<T>(this IBlobCache This, |
199 | 200 | bool shouldInvalidateOnError = false) |
200 | 201 | where T : CacheItem |
201 | 202 | { |
202 | | - var fetch = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key)) |
| 203 | + var idx = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key))).Replay().RefCount(); |
| 204 | + |
| 205 | + |
| 206 | + var fetch = idx |
203 | 207 | .Select(x => Tuple.Create(x, fetchPredicate == null || !x.Keys.Any() || fetchPredicate(x.UpdatedAt))) |
204 | 208 | .Where(predicateIsTrue => predicateIsTrue.Item2) |
205 | 209 | .Select(x => x.Item1) |
206 | | - .SelectMany(index => index.Clear(This, key, absoluteExpiration)) |
207 | | - .SelectMany(index => |
208 | | - { |
209 | | - var fetchObs = fetchFunc().Catch<T, Exception>(ex => |
210 | | - { |
211 | | - var shouldInvalidate = shouldInvalidateOnError ? |
212 | | - This.InvalidateObject<CacheIndex>(key) : |
213 | | - Observable.Return(Unit.Default); |
214 | | - return shouldInvalidate.SelectMany(__ => Observable.Throw<T>(ex)); |
215 | | - }); |
216 | | - |
217 | | - return fetchObs |
218 | | - .SelectMany(x => x.Save<T>(This, key, absoluteExpiration)) |
219 | | - .Do(x => index.AddAndSave(This, key, x, absoluteExpiration)) |
220 | | - .Finally(() => |
| 210 | + .Select(index => index.Clear()) |
| 211 | + .SelectMany(index => fetchFunc() |
| 212 | + .Catch<T, Exception>(ex => |
221 | 213 | { |
222 | | - This.GetObjects<T>(index.OldKeys.Except(index.Keys)) |
223 | | - .Do(dict => This.InvalidateObjects<T>(dict.Keys)) |
224 | | - .SelectMany(dict => dict.Values) |
225 | | - .Do(removedItemsCallback) |
226 | | - .Subscribe(); |
227 | | - }); |
228 | | - })); |
229 | | - |
230 | | - var cache = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key)) |
231 | | - .SelectMany(index => This.GetObjects<T>(index.Keys)) |
232 | | - .SelectMany(dict => dict.Values)); |
233 | | - |
234 | | - return cache.Merge(fetch).Replay().RefCount(); |
| 214 | + var shouldInvalidate = shouldInvalidateOnError ? |
| 215 | + This.InvalidateObject<CacheIndex>(key) : |
| 216 | + Observable.Return(Unit.Default); |
| 217 | + return shouldInvalidate.SelectMany(__ => Observable.Throw<T>(ex)); |
| 218 | + }) |
| 219 | + .SelectMany(x => x.Save<T>(This, key, absoluteExpiration)) |
| 220 | + .Do(x => index.Add(key, x)) |
| 221 | + ); |
| 222 | + |
| 223 | + var cache = idx |
| 224 | + .SelectMany(index => This.GetObjects<T>(index.Keys.ToList())) |
| 225 | + .SelectMany(dict => dict.Values); |
| 226 | + |
| 227 | + return cache.Merge(fetch) |
| 228 | + .Finally(async () => |
| 229 | + { |
| 230 | + var index = await idx; |
| 231 | + await index.Save(This); |
| 232 | + |
| 233 | + var list = index.OldKeys.Except(index.Keys); |
| 234 | + if (!list.Any()) |
| 235 | + return; |
| 236 | + var removed = await This.GetObjects<T>(list); |
| 237 | + foreach (var d in removed.Values) |
| 238 | + removedItemsCallback(d); |
| 239 | + await This.InvalidateObjects<T>(list); |
| 240 | + }) |
| 241 | + .Replay().RefCount(); |
235 | 242 | } |
236 | 243 |
|
237 | 244 | static bool IsExpired(IBlobCache blobCache, DateTimeOffset itemCreatedAt, TimeSpan cacheDuration) |
|
0 commit comments