Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/GitHub.App/Caches/CacheIndex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ public CacheIndex()
OldKeys = new List<string>();
}

public CacheIndex Add(string indexKey, CacheItem item)
{
var k = string.Format(CultureInfo.InvariantCulture, "{0}|{1}", IndexKey, item.Key);
if (!Keys.Contains(k))
Keys.Add(k);
UpdatedAt = DateTimeOffset.UtcNow;
return this;
}

public IObservable<CacheIndex> AddAndSave(IBlobCache cache, string indexKey, CacheItem item,
DateTimeOffset? absoluteExpiration = null)
{
Expand All @@ -29,7 +38,7 @@ public IObservable<CacheIndex> AddAndSave(IBlobCache cache, string indexKey, Cac
Keys.Add(k);
UpdatedAt = DateTimeOffset.UtcNow;
return cache.InsertObject(IndexKey, this, absoluteExpiration)
.Select(x => this);
.Select(x => this);
}

public static IObservable<CacheIndex> AddAndSaveToIndex(IBlobCache cache, string indexKey, CacheItem item,
Expand All @@ -47,15 +56,19 @@ public static IObservable<CacheIndex> AddAndSaveToIndex(IBlobCache cache, string
.Select(x => index));
}

public IObservable<CacheIndex> Clear(IBlobCache cache, string indexKey, DateTimeOffset? absoluteExpiration = null)
public CacheIndex Clear()
{
OldKeys = Keys.ToList();
Keys.Clear();
UpdatedAt = DateTimeOffset.UtcNow;
return cache
.InvalidateObject<CacheIndex>(indexKey)
.SelectMany(_ => cache.InsertObject(indexKey, this, absoluteExpiration))
.Select(_ => this);
return this;
}

public IObservable<CacheIndex> Save(IBlobCache cache,
DateTimeOffset? absoluteExpiration = null)
{
return cache.InsertObject(IndexKey, this, absoluteExpiration)
.Select(x => this);
}

[AllowNull]
Expand Down
65 changes: 36 additions & 29 deletions src/GitHub.App/Extensions/AkavacheExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Reactive.Linq;
using Akavache;
using GitHub.Caches;
using System.Threading.Tasks;

namespace GitHub.Extensions
{
Expand Down Expand Up @@ -199,39 +200,45 @@ static IObservable<T> GetAndFetchLatestFromIndex<T>(this IBlobCache This,
bool shouldInvalidateOnError = false)
where T : CacheItem
{
var fetch = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key))
var idx = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key))).Replay().RefCount();


var fetch = idx
.Select(x => Tuple.Create(x, fetchPredicate == null || !x.Keys.Any() || fetchPredicate(x.UpdatedAt)))
.Where(predicateIsTrue => predicateIsTrue.Item2)
.Select(x => x.Item1)
.SelectMany(index => index.Clear(This, key, absoluteExpiration))
.SelectMany(index =>
{
var fetchObs = fetchFunc().Catch<T, Exception>(ex =>
{
var shouldInvalidate = shouldInvalidateOnError ?
This.InvalidateObject<CacheIndex>(key) :
Observable.Return(Unit.Default);
return shouldInvalidate.SelectMany(__ => Observable.Throw<T>(ex));
});

return fetchObs
.SelectMany(x => x.Save<T>(This, key, absoluteExpiration))
.Do(x => index.AddAndSave(This, key, x, absoluteExpiration))
.Finally(() =>
.Select(index => index.Clear())
.SelectMany(index => fetchFunc()
.Catch<T, Exception>(ex =>
{
This.GetObjects<T>(index.OldKeys.Except(index.Keys))
.Do(dict => This.InvalidateObjects<T>(dict.Keys))
.SelectMany(dict => dict.Values)
.Do(removedItemsCallback)
.Subscribe();
});
}));

var cache = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key))
.SelectMany(index => This.GetObjects<T>(index.Keys))
.SelectMany(dict => dict.Values));

return cache.Merge(fetch).Replay().RefCount();
var shouldInvalidate = shouldInvalidateOnError ?
This.InvalidateObject<CacheIndex>(key) :
Observable.Return(Unit.Default);
return shouldInvalidate.SelectMany(__ => Observable.Throw<T>(ex));
})
.SelectMany(x => x.Save<T>(This, key, absoluteExpiration))
.Do(x => index.Add(key, x))
);

var cache = idx
.SelectMany(index => This.GetObjects<T>(index.Keys.ToList()))
.SelectMany(dict => dict.Values);

return cache.Merge(fetch)
.Finally(async () =>
{
var index = await idx;
await index.Save(This);

var list = index.OldKeys.Except(index.Keys);
if (!list.Any())
return;
var removed = await This.GetObjects<T>(list);
foreach (var d in removed.Values)
removedItemsCallback(d);
await This.InvalidateObjects<T>(list);
})
.Replay().RefCount();
}

static bool IsExpired(IBlobCache blobCache, DateTimeOffset itemCreatedAt, TimeSpan cacheDuration)
Expand Down
1 change: 1 addition & 0 deletions src/GitHub.App/Models/PullRequestModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using GitHub.VisualStudio.Helpers;
using NullGuard;
using System.Diagnostics;
using GitHub.SampleData;

namespace GitHub.Models
{
Expand Down
2 changes: 1 addition & 1 deletion src/GitHub.App/Models/RepositoryHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ IObservable<AuthenticationResult> LoginWithApiUser(UserAndScopes userAndScopes)
{
if (result.IsSuccess())
{
IsLoggedIn = true;
SupportsGist = userAndScopes.Scopes?.Contains("gist") ?? true;
IsLoggedIn = true;
}

log.Info("Log in from cache for login '{0}' to host '{1}' {2}",
Expand Down
7 changes: 3 additions & 4 deletions src/GitHub.App/Services/ModelService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using NLog;
using NullGuard;
using Octokit;
using ReactiveUI;

namespace GitHub.Services
{
Expand Down Expand Up @@ -136,7 +137,7 @@ public IObservable<AccountCacheItem> GetUserFromCache()
}

public ITrackingCollection<IPullRequestModel> GetPullRequests(ISimpleRepositoryModel repo,
[AllowNull]ITrackingCollection<IPullRequestModel> collection = null)
ITrackingCollection<IPullRequestModel> collection)
{
// Since the api to list pull requests returns all the data for each pr, cache each pr in its own entry
// and also cache an index that contains all the keys for each pr. This way we can fetch prs in bulk
Expand All @@ -147,9 +148,6 @@ public ITrackingCollection<IPullRequestModel> GetPullRequests(ISimpleRepositoryM
var keyobs = GetUserFromCache()
.Select(user => string.Format(CultureInfo.InvariantCulture, "{0}|{1}|pr", user.Login, repo.Name));

if (collection == null)
collection = new TrackingCollection<IPullRequestModel>();

var source = Observable.Defer(() => keyobs
.SelectMany(key =>
hostCache.GetAndFetchLatestFromIndex(key, () =>
Expand Down Expand Up @@ -275,6 +273,7 @@ IPullRequestModel Create(PullRequestCacheItem prCacheItem)
};
}


public IObservable<Unit> InsertUser(AccountCacheItem user)
{
return hostCache.InsertObject("user", user);
Expand Down
1 change: 1 addition & 0 deletions src/GitHub.App/ViewModels/PullRequestListViewModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public PullRequestListViewModel(IRepositoryHost repositoryHost, ISimpleRepositor
PullRequests = new TrackingCollection<IPullRequestModel>();
pullRequests.Comparer = OrderedComparer<IPullRequestModel>.OrderByDescending(x => x.UpdatedAt).Compare;
pullRequests.Filter = (pr, i, l) => pr.IsOpen;
pullRequests.NewerComparer = OrderedComparer<IPullRequestModel>.OrderByDescending(x => x.UpdatedAt).Compare;
}

public override void Initialize([AllowNull] ViewWithData data)
Expand Down
13 changes: 13 additions & 0 deletions src/GitHub.Exports.Reactive/Collections/ITrackingCollection.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Reactive;

namespace GitHub.Collections
{
Expand Down Expand Up @@ -32,17 +33,29 @@ public interface ITrackingCollection<T> : IDisposable, IList<T> where T : ICopya
/// </summary>
/// <param name="theComparer">The comparer method for sorting, or null if not sorting</param>
Func<T, T, int> Comparer { get; set; }

/// <summary>
/// Set a new filter. This will cause the collection to be filtered
/// </summary>
/// <param name="theFilter">The new filter, or null to not have any filtering</param>
Func<T, int, IList<T>, bool> Filter { get; set; }

/// <summary>
/// Set a comparer that determines whether the item being processed is newer than the same
/// item seen before. This is to prevent stale items from overriding newer items when data
/// is coming simultaneously from cache and from live data. Use a timestamp-like comparison
/// for best results
/// </summary>
/// <param name="theComparer">The comparer method for sorting, or null if not sorting</param>
Func<T, T, int> NewerComparer { get; set; }

void AddItem(T item);
void RemoveItem(T item);
/// <summary>
/// How long to delay between processing incoming items
/// </summary>
TimeSpan ProcessingDelay { get; set; }
event NotifyCollectionChangedEventHandler CollectionChanged;
IObservable<Unit> OriginalCompleted { get; }
}
}
Loading