From d756cf2b595d2649ed13f2d51fc7360dac27dd33 Mon Sep 17 00:00:00 2001 From: Andreia Gaita Date: Tue, 21 Jun 2016 18:20:42 +0200 Subject: [PATCH 1/8] Internal constructor helper for ModelService --- src/GitHub.App/Models/PullRequestModel.cs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/GitHub.App/Models/PullRequestModel.cs b/src/GitHub.App/Models/PullRequestModel.cs index c9d531aead..f2d1afde59 100644 --- a/src/GitHub.App/Models/PullRequestModel.cs +++ b/src/GitHub.App/Models/PullRequestModel.cs @@ -4,6 +4,7 @@ using GitHub.VisualStudio.Helpers; using NullGuard; using System.Diagnostics; +using GitHub.SampleData; namespace GitHub.Models { @@ -22,6 +23,13 @@ public PullRequestModel(int number, string title, UpdatedAt = updatedAt ?? CreatedAt; } + internal PullRequestModel(int number) + { + Number = number; + Title = ""; + Author = new AccountDesigner(); + } + public void CopyFrom(IPullRequestModel other) { if (!Equals(other)) From 5592060848c1f66d0d15bb6ba2c0e7610c23abe4 Mon Sep 17 00:00:00 2001 From: Andreia Gaita Date: Tue, 21 Jun 2016 18:21:49 +0200 Subject: [PATCH 2/8] Let's trigger this after all the state is set --- src/GitHub.App/Models/RepositoryHost.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/GitHub.App/Models/RepositoryHost.cs b/src/GitHub.App/Models/RepositoryHost.cs index c6a1db331d..852d54b042 100644 --- a/src/GitHub.App/Models/RepositoryHost.cs +++ b/src/GitHub.App/Models/RepositoryHost.cs @@ -252,8 +252,8 @@ IObservable LoginWithApiUser(UserAndScopes userAndScopes) { if (result.IsSuccess()) { - IsLoggedIn = true; SupportsGist = userAndScopes.Scopes?.Contains("gist") ?? true; + IsLoggedIn = true; } log.Info("Log in from cache for login '{0}' to host '{1}' {2}", From 1024d35cd2992f85636a0b56184cd73fbf121f67 Mon Sep 17 00:00:00 2001 From: Andreia Gaita Date: Tue, 21 Jun 2016 18:55:07 +0200 Subject: [PATCH 3/8] Update to akavache 4.1.2 --- submodules/akavache | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submodules/akavache b/submodules/akavache index ba21a0f71f..056ec2af17 160000 --- a/submodules/akavache +++ b/submodules/akavache @@ -1 +1 @@ -Subproject commit ba21a0f71f4b3b6c5261be8b5eb5ce2c0cc3219c +Subproject commit 056ec2af1755be7672fda176ad6c697ee22e579b From cb937dc14657f5c54b24984dd733d75ce30d4b60 Mon Sep 17 00:00:00 2001 From: Andreia Gaita Date: Tue, 21 Jun 2016 20:08:59 +0200 Subject: [PATCH 4/8] Fix ALL THE RACES!!!!11 --- src/GitHub.App/Caches/CacheIndex.cs | 25 +- .../Extensions/AkavacheExtensions.cs | 60 +++-- src/GitHub.App/Services/ModelService.cs | 11 +- .../ViewModels/PullRequestListViewModel.cs | 1 + .../Collections/ITrackingCollection.cs | 13 + .../Collections/TrackingCollection.cs | 248 ++++++++++++++---- .../GitHub.Exports.Reactive.csproj | 1 + .../GlobalSuppressions.cs | Bin 0 -> 1988 bytes .../TrackingCollectionTests.cs | 246 ++++++++--------- .../GitHub.App/Models/ModelServiceTests.cs | 52 ++-- 10 files changed, 428 insertions(+), 229 deletions(-) create mode 100644 src/GitHub.Exports.Reactive/GlobalSuppressions.cs diff --git a/src/GitHub.App/Caches/CacheIndex.cs b/src/GitHub.App/Caches/CacheIndex.cs index a9016470e1..8f3ae7aad7 100644 --- a/src/GitHub.App/Caches/CacheIndex.cs +++ b/src/GitHub.App/Caches/CacheIndex.cs @@ -21,6 +21,15 @@ public CacheIndex() OldKeys = new List(); } + public CacheIndex Add(string indexKey, CacheItem item) + { + var k = string.Format(CultureInfo.InvariantCulture, "{0}|{1}", IndexKey, item.Key); + if (!Keys.Contains(k)) + Keys.Add(k); + UpdatedAt = DateTimeOffset.UtcNow; + return this; + } + public IObservable AddAndSave(IBlobCache cache, string indexKey, CacheItem item, DateTimeOffset? absoluteExpiration = null) { @@ -29,7 +38,7 @@ public IObservable AddAndSave(IBlobCache cache, string indexKey, Cac Keys.Add(k); UpdatedAt = DateTimeOffset.UtcNow; return cache.InsertObject(IndexKey, this, absoluteExpiration) - .Select(x => this); + .Select(x => this); } public static IObservable AddAndSaveToIndex(IBlobCache cache, string indexKey, CacheItem item, @@ -47,15 +56,19 @@ public static IObservable AddAndSaveToIndex(IBlobCache cache, string .Select(x => index)); } - public IObservable Clear(IBlobCache cache, string indexKey, DateTimeOffset? absoluteExpiration = null) + public CacheIndex Clear() { OldKeys = Keys.ToList(); Keys.Clear(); UpdatedAt = DateTimeOffset.UtcNow; - return cache - .InvalidateObject(indexKey) - .SelectMany(_ => cache.InsertObject(indexKey, this, absoluteExpiration)) - .Select(_ => this); + return this; + } + + public IObservable Save(IBlobCache cache, + DateTimeOffset? absoluteExpiration = null) + { + return cache.InsertObject(IndexKey, this, absoluteExpiration) + .Select(x => this); } [AllowNull] diff --git a/src/GitHub.App/Extensions/AkavacheExtensions.cs b/src/GitHub.App/Extensions/AkavacheExtensions.cs index 87fa112fc2..e18e881f10 100644 --- a/src/GitHub.App/Extensions/AkavacheExtensions.cs +++ b/src/GitHub.App/Extensions/AkavacheExtensions.cs @@ -4,6 +4,7 @@ using System.Reactive.Linq; using Akavache; using GitHub.Caches; +using System.Threading.Tasks; namespace GitHub.Extensions { @@ -165,7 +166,7 @@ public static IObservable GetAndFetchLatestFromIndex( this IBlobCache blobCache, string key, Func> fetchFunc, - Action removedItemsCallback, + Action removedItemsCallback, TimeSpan refreshInterval, TimeSpan maxCacheDuration) where T : CacheItem @@ -193,45 +194,54 @@ public static IObservable GetAndFetchLatestFromIndex( static IObservable GetAndFetchLatestFromIndex(this IBlobCache This, string key, Func> fetchFunc, - Action removedItemsCallback, + Action removedItemsCallback, Func fetchPredicate = null, DateTimeOffset? absoluteExpiration = null, bool shouldInvalidateOnError = false) where T : CacheItem { - var fetch = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key)) + var idx = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key))).Replay().RefCount(); + + + var fetch = idx .Select(x => Tuple.Create(x, fetchPredicate == null || !x.Keys.Any() || fetchPredicate(x.UpdatedAt))) .Where(predicateIsTrue => predicateIsTrue.Item2) .Select(x => x.Item1) - .SelectMany(index => index.Clear(This, key, absoluteExpiration)) + .Select(index => index.Clear()) .SelectMany(index => { - var fetchObs = fetchFunc().Catch(ex => - { - var shouldInvalidate = shouldInvalidateOnError ? - This.InvalidateObject(key) : - Observable.Return(Unit.Default); - return shouldInvalidate.SelectMany(__ => Observable.Throw(ex)); - }); + var fetchObs = fetchFunc() + .Catch(ex => + { + var shouldInvalidate = shouldInvalidateOnError ? + This.InvalidateObject(key) : + Observable.Return(Unit.Default); + return shouldInvalidate.SelectMany(__ => Observable.Throw(ex)); + }); return fetchObs .SelectMany(x => x.Save(This, key, absoluteExpiration)) - .Do(x => index.AddAndSave(This, key, x, absoluteExpiration)) - .Finally(() => - { - This.GetObjects(index.OldKeys.Except(index.Keys)) - .Do(dict => This.InvalidateObjects(dict.Keys)) - .SelectMany(dict => dict.Values) - .Do(removedItemsCallback) - .Subscribe(); - }); - })); + .Do(x => index.Add(key, x)); + }); - var cache = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key)) - .SelectMany(index => This.GetObjects(index.Keys)) - .SelectMany(dict => dict.Values)); + var cache = idx + .SelectMany(index => This.GetObjects(index.Keys.ToList())) + .SelectMany(dict => dict.Values); + + return cache.Merge(fetch) + .Finally(async () => + { + var index = await idx; + await index.Save(This); - return cache.Merge(fetch).Replay().RefCount(); + var list = index.OldKeys.Except(index.Keys); + if (!list.Any()) + return; + foreach (var d in list) + removedItemsCallback(d); + await This.InvalidateObjects(list); + }) + .Replay().RefCount(); } static bool IsExpired(IBlobCache blobCache, DateTimeOffset itemCreatedAt, TimeSpan cacheDuration) diff --git a/src/GitHub.App/Services/ModelService.cs b/src/GitHub.App/Services/ModelService.cs index a281854224..dbf4ade2f6 100644 --- a/src/GitHub.App/Services/ModelService.cs +++ b/src/GitHub.App/Services/ModelService.cs @@ -16,6 +16,7 @@ using NLog; using NullGuard; using Octokit; +using ReactiveUI; namespace GitHub.Services { @@ -136,7 +137,7 @@ public IObservable GetUserFromCache() } public ITrackingCollection GetPullRequests(ISimpleRepositoryModel repo, - [AllowNull]ITrackingCollection collection = null) + ITrackingCollection collection) { // Since the api to list pull requests returns all the data for each pr, cache each pr in its own entry // and also cache an index that contains all the keys for each pr. This way we can fetch prs in bulk @@ -147,9 +148,6 @@ public ITrackingCollection GetPullRequests(ISimpleRepositoryM var keyobs = GetUserFromCache() .Select(user => string.Format(CultureInfo.InvariantCulture, "{0}|{1}|pr", user.Login, repo.Name)); - if (collection == null) - collection = new TrackingCollection(); - var source = Observable.Defer(() => keyobs .SelectMany(key => hostCache.GetAndFetchLatestFromIndex(key, () => @@ -275,6 +273,11 @@ IPullRequestModel Create(PullRequestCacheItem prCacheItem) }; } + static IPullRequestModel Create(string key) + { + return new PullRequestModel(Int32.Parse(key.Split('|').Last(), CultureInfo.InvariantCulture)); + } + public IObservable InsertUser(AccountCacheItem user) { return hostCache.InsertObject("user", user); diff --git a/src/GitHub.App/ViewModels/PullRequestListViewModel.cs b/src/GitHub.App/ViewModels/PullRequestListViewModel.cs index 4187ec477d..4f4f2caa75 100644 --- a/src/GitHub.App/ViewModels/PullRequestListViewModel.cs +++ b/src/GitHub.App/ViewModels/PullRequestListViewModel.cs @@ -79,6 +79,7 @@ public PullRequestListViewModel(IRepositoryHost repositoryHost, ISimpleRepositor PullRequests = new TrackingCollection(); pullRequests.Comparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; pullRequests.Filter = (pr, i, l) => pr.IsOpen; + pullRequests.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; } public override void Initialize([AllowNull] ViewWithData data) diff --git a/src/GitHub.Exports.Reactive/Collections/ITrackingCollection.cs b/src/GitHub.Exports.Reactive/Collections/ITrackingCollection.cs index c6d1c13dc7..7faee925fd 100644 --- a/src/GitHub.Exports.Reactive/Collections/ITrackingCollection.cs +++ b/src/GitHub.Exports.Reactive/Collections/ITrackingCollection.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Collections.Specialized; +using System.Reactive; namespace GitHub.Collections { @@ -32,11 +33,22 @@ public interface ITrackingCollection : IDisposable, IList where T : ICopya /// /// The comparer method for sorting, or null if not sorting Func Comparer { get; set; } + /// /// Set a new filter. This will cause the collection to be filtered /// /// The new filter, or null to not have any filtering Func, bool> Filter { get; set; } + + /// + /// Set a comparer that determines whether the item being processed is newer than the same + /// item seen before. This is to prevent stale items from overriding newer items when data + /// is coming simultaneously from cache and from live data. Use a timestamp-like comparison + /// for best results + /// + /// The comparer method for sorting, or null if not sorting + Func NewerComparer { get; set; } + void AddItem(T item); void RemoveItem(T item); /// @@ -44,5 +56,6 @@ public interface ITrackingCollection : IDisposable, IList where T : ICopya /// TimeSpan ProcessingDelay { get; set; } event NotifyCollectionChangedEventHandler CollectionChanged; + IObservable OriginalCompleted { get; } } } \ No newline at end of file diff --git a/src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs b/src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs index a10b0a4752..a41c52a36d 100644 --- a/src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs +++ b/src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs @@ -1,8 +1,8 @@ #if !DISABLE_REACTIVEUI +using GitHub.VisualStudio.Helpers; using ReactiveUI; #else using System.Windows.Threading; -using System.Threading; #endif using System; using System.Collections.Concurrent; @@ -13,6 +13,9 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; namespace GitHub.Collections { @@ -40,14 +43,23 @@ enum TheAction bool isChanging; - readonly CompositeDisposable disposables = new CompositeDisposable(); - IObservable source; - IObservable sourceQueue; Func comparer; Func, bool> filter; - readonly IScheduler scheduler; - ConcurrentQueue queue; + Func newer; // comparer to check whether the item being processed is newer than the existing one + + IObservable source; + IConnectableObservable dataPump; + IConnectableObservable cachePump; + ConcurrentQueue cache; + + Subject signalHaveData; + Subject signalNeedData; + Subject dataListener; + bool resetting = false; + + readonly CompositeDisposable disposables = new CompositeDisposable(); + readonly IScheduler scheduler; readonly List original = new List(); #if DEBUG public IList DebugInternalList => original; @@ -60,22 +72,29 @@ enum TheAction // for speeding up IndexOf in the filtered list readonly Dictionary filteredIndexCache = new Dictionary(); - TimeSpan delay; + bool originalSourceIsCompleted; + bool signalOriginalSourceCompletion; + readonly Subject originalSourceCompleted = new Subject(); + public IObservable OriginalCompleted => originalSourceCompleted; + TimeSpan requestedDelay; readonly TimeSpan fuzziness; + public TimeSpan ProcessingDelay { get { return requestedDelay; } set { requestedDelay = value; - delay = value; } } - public TrackingCollection(Func comparer = null, Func, bool> filter = null, IScheduler scheduler = null) + bool ManualProcessing => cache.IsEmpty && originalSourceIsCompleted; + + public TrackingCollection(Func comparer = null, Func, bool> filter = null, + Func newer = null, IScheduler scheduler = null) { - queue = new ConcurrentQueue(); + cache = new ConcurrentQueue(); ProcessingDelay = TimeSpan.FromMilliseconds(10); fuzziness = TimeSpan.FromMilliseconds(1); @@ -86,14 +105,15 @@ public TrackingCollection(Func comparer = null, Func #endif this.comparer = comparer ?? Comparer.Default.Compare; this.filter = filter; + this.newer = newer ?? Comparer.Default.Compare; } - [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2214:DoNotCallOverridableMethodsInConstructors")] public TrackingCollection(IObservable source, Func comparer = null, Func, bool> filter = null, + Func newer = null, IScheduler scheduler = null) - : this(comparer, filter, scheduler) + : this(comparer, filter, newer, scheduler) { Listen(source); } @@ -110,37 +130,79 @@ public IObservable Listen(IObservable obs) if (disposed) throw new ObjectDisposedException("TrackingCollection"); - disposables.Clear(); - while (!queue.IsEmpty) - GetFromQueue(); + Reset(); - sourceQueue = obs - .Do(data => queue.Enqueue(new ActionData(data))); + var waitHandle = new ManualResetEventSlim(); - source = Observable.Defer(() => { - StartQueue(); - return Observable.Timer(TimeSpan.Zero, delay) - .Select(_ => GetFromQueue()); - }) + dataPump = obs + .Do(data => + { + cache.Enqueue(new ActionData(data)); + signalHaveData.OnNext(Unit.Default); + }) + .Finally(() => + { + originalSourceIsCompleted = true; + signalOriginalSourceCompletion = true; + }) + .Publish(); + + + // when both signalHaveData and signalNeedData produce a value, dataListener gets a value + // this will empty the queue of items that have been cached in regular intervals according + // to the requested delay + cachePump = signalHaveData + .Zip(signalNeedData, (a, b) => Unit.Default) + .ObserveOn(TaskPoolScheduler.Default) + .TimeInterval() + .Select(interval => + { + var delay = CalculateProcessingDelay(interval); + waitHandle.Wait(delay); + dataListener.OnNext(GetFromQueue()); + return Unit.Default; + }) + .Publish(); + + source = dataListener .Where(data => data.Item != null) .ObserveOn(scheduler) - .Select(x => ProcessItem(x, original)) - // if we're removing an item that doesn't exist, ignore it - .Where(data => !(data.TheAction == TheAction.Remove && data.OldPosition < 0)) - .Select(SortedNone) - .Select(SortedAdd) - .Select(SortedInsert) - .Select(SortedMove) - .Select(SortedRemove) - .Select(CheckFilter) - .Select(FilteredAdd) - .Select(CalculateIndexes) - .Select(FilteredNone) - .Select(FilteredInsert) - .Select(FilteredMove) - .Select(FilteredRemove) - .TimeInterval() - .Select(UpdateProcessingDelay) + .Select(data => { + data = ProcessItem(data, original); + + // if we're removing an item that doesn't exist, ignore it + if (data.TheAction == TheAction.Remove && data.OldPosition < 0) + return ActionData.Default; + + data = SortedNone(data); + data = SortedAdd(data); + data = SortedInsert(data); + data = SortedMove(data); + data = SortedRemove(data); + data = CheckFilter(data); + data = FilteredAdd(data); + data = CalculateIndexes(data); + data = FilteredNone(data); + data = FilteredInsert(data); + data = FilteredMove(data); + data = FilteredRemove(data); + return data; + }) + .Do(_ => + { + if (ManualProcessing) + { + if (signalOriginalSourceCompletion) + { + signalOriginalSourceCompletion = false; + originalSourceCompleted.OnNext(Unit.Default); + } + + } + else + signalNeedData.OnNext(Unit.Default); + }) + .Where(data => data.Item != null) .Select(data => data.Item) .Publish() .RefCount(); @@ -186,6 +248,27 @@ public Func, bool> Filter } } + /// + /// Set a comparer that determines whether the item being processed is newer than the same + /// item seen before. This is to prevent stale items from overriding newer items when data + /// is coming simultaneously from cache and from live data. Use a timestamp-like comparison + /// for best results + /// + /// The comparer method for sorting, or null if not sorting + public Func NewerComparer + { + get + { + return newer; + } + set + { + if (disposed) + throw new ObjectDisposedException("TrackingCollection"); + newer = value; + } + } + public IDisposable Subscribe() { if (source == null) @@ -193,6 +276,7 @@ public IDisposable Subscribe() if (disposed) throw new ObjectDisposedException("TrackingCollection"); disposables.Add(source.Subscribe()); + StartQueue(); return this; } @@ -203,6 +287,7 @@ public IDisposable Subscribe(Action onNext, Action onCompleted) if (disposed) throw new ObjectDisposedException("TrackingCollection"); disposables.Add(source.Subscribe(onNext, onCompleted)); + StartQueue(); return this; } @@ -210,14 +295,28 @@ public void AddItem(T item) { if (disposed) throw new ObjectDisposedException("TrackingCollection"); - queue.Enqueue(new ActionData(item)); + + if (ManualProcessing) + dataListener.OnNext(new ActionData(item)); + else + { + cache.Enqueue(new ActionData(item)); + signalHaveData.OnNext(Unit.Default); + } } public void RemoveItem(T item) { if (disposed) throw new ObjectDisposedException("TrackingCollection"); - queue.Enqueue(new ActionData(TheAction.Remove, item)); + + if (ManualProcessing) + dataListener.OnNext(new ActionData(TheAction.Remove, item)); + else + { + cache.Enqueue(new ActionData(TheAction.Remove, item)); + signalHaveData.OnNext(Unit.Default); + } } void SetAndRecalculateSort(Func theComparer) @@ -252,7 +351,9 @@ ActionData CheckFilter(ActionData data) int StartQueue() { - disposables.Add(sourceQueue.Subscribe()); + disposables.Add(cachePump.Connect()); + disposables.Add(dataPump.Connect()); + signalNeedData.OnNext(Unit.Default); return 0; } @@ -261,7 +362,7 @@ ActionData GetFromQueue() try { ActionData d = ActionData.Default; - if (queue?.TryDequeue(out d) ?? false) + if (cache?.TryDequeue(out d) ?? false) return d; } catch { } @@ -281,6 +382,12 @@ ActionData ProcessItem(ActionData data, List list) if (idx >= 0) { var old = list[idx]; + var isNewer = newer(item, old); + + // the object is "older" than the one we have, ignore it + if (isNewer > 0) + ret = new ActionData(TheAction.None, list, item, null, idx, idx); + var comparison = comparer(item, old); // no sorting to be done, just replacing the element in-place @@ -510,17 +617,20 @@ ActionData FilteredRemove(ActionData data) /// so that the average time between an item being processed /// is +- the requested processing delay. /// - ActionData UpdateProcessingDelay(TimeInterval data) + TimeSpan CalculateProcessingDelay(TimeInterval interval) { - if (requestedDelay == TimeSpan.Zero) - return data.Value; - var time = data.Interval; - if (time > requestedDelay + fuzziness) - delay -= time - requestedDelay; - else if (time < requestedDelay + fuzziness) - delay += requestedDelay - time; - delay = delay < TimeSpan.Zero ? TimeSpan.Zero : delay; - return data.Value; + var delay = TimeSpan.Zero; + if (requestedDelay > TimeSpan.Zero) + { + var time = interval.Interval; + if (time > requestedDelay + fuzziness) + delay -= time - requestedDelay; + else if (time < requestedDelay + fuzziness) + delay += requestedDelay - time; + delay = delay < TimeSpan.Zero ? TimeSpan.Zero : delay; + } + + return delay; } #endregion @@ -693,7 +803,7 @@ void InternalInsertItem(T item, int position) protected override void InsertItem(int index, T item) { -#if DEBUG +#if DEBUG && !DISABLE_REACTIVEUI if (Splat.ModeDetector.InDesignMode() && !isChanging) { base.InsertItem(index, item); @@ -729,7 +839,7 @@ void InternalRemoveItem(T item) protected override void RemoveItem(int index) { -#if DEBUG +#if DEBUG && !DISABLE_REACTIVEUI if (Splat.ModeDetector.InDesignMode() && !isChanging) { base.RemoveItem(index); @@ -757,7 +867,7 @@ void InternalMoveItem(int positionFrom, int positionTo) protected override void MoveItem(int oldIndex, int newIndex) { -#if DEBUG +#if DEBUG && !DISABLE_REACTIVEUI if (Splat.ModeDetector.InDesignMode() && !isChanging) { base.MoveItem(oldIndex, newIndex); @@ -808,7 +918,8 @@ int GetIndexUnfiltered(T item) { ret = original.IndexOf(item); if (ret >= 0) - sortedIndexCache.Add(item, ret); + sortedIndexCache.Add(original[ret], ret); + } return ret; } @@ -879,6 +990,27 @@ static IScheduler GetScheduler(IScheduler scheduler) } #endif + void Reset() + { + if (resetting) + return; + + resetting = true; + + disposables.Clear(); + originalSourceIsCompleted = false; + signalOriginalSourceCompletion = false; + cache = new ConcurrentQueue(); + dataListener = new Subject(); + disposables.Add(dataListener); + signalHaveData = new Subject(); + disposables.Add(signalHaveData); + signalNeedData = new Subject(); + disposables.Add(signalNeedData); + + resetting = false; + } + bool disposed = false; void Dispose(bool disposing) { @@ -887,8 +1019,8 @@ void Dispose(bool disposing) if (!disposed) { disposed = true; - queue = null; disposables.Dispose(); + cache = null; } } } diff --git a/src/GitHub.Exports.Reactive/GitHub.Exports.Reactive.csproj b/src/GitHub.Exports.Reactive/GitHub.Exports.Reactive.csproj index 3f7d374e3b..83038d049c 100644 --- a/src/GitHub.Exports.Reactive/GitHub.Exports.Reactive.csproj +++ b/src/GitHub.Exports.Reactive/GitHub.Exports.Reactive.csproj @@ -89,6 +89,7 @@ + diff --git a/src/GitHub.Exports.Reactive/GlobalSuppressions.cs b/src/GitHub.Exports.Reactive/GlobalSuppressions.cs new file mode 100644 index 0000000000000000000000000000000000000000..2538500fd8f1dfc1aa5761b22a8209fdf9602e4c GIT binary patch literal 1988 zcmeH|PjAye5XIjaiSMw4ODm`goKOWw6%~IDpi0RFAtcwF}D}B4~Ffa5NZjQ})oxpX%v!{RS<&_@kncnCL z_&c5_F(ak+$>YAh>7gFz|IOdm&3}&CaUI;(-5=@@m5($-vkkLzMk_mPzc;K}qXFe# z@y?YVP}TKX52(4u!u4wXyY@V~huqZzLeya5;c9KEi#gsl%-l1KJBz9fyxm7Ux~D9# zOWcxY+odSnZ)*Ku*UB$C9!{_A9K=|EVz&Ub>G8~!9oTpM&luO(nT^i`qI1NKbRGMd zJBI6e$8^d`+d2=fwH&9txetxmch1K*aNPBr_lCXHv;37JEtxs{?q0@m*QsK6Y`G=l zx>I(0o4?)B#+_{J_>5v4W$Z)dep@cG|6QiDUX1U1o06SY2l(;rxEH4TbG8!O27Gr{ zE6iC*dF^KBE{&!C$OiP5$nEtK?n!T%-Qb4eLhrwm>OE(;%XOENfj&llef1_)1iizn vD`n8hXwVnfY+yBs8MQvFi4}HTrRGfEuDHIUbDmRut?=RhtpCzU-RAuViqKhy literal 0 HcmV?d00001 diff --git a/src/TrackingCollectionTests/TrackingCollectionTests.cs b/src/TrackingCollectionTests/TrackingCollectionTests.cs index 8497813f83..23d7058eda 100644 --- a/src/TrackingCollectionTests/TrackingCollectionTests.cs +++ b/src/TrackingCollectionTests/TrackingCollectionTests.cs @@ -10,23 +10,27 @@ using System.Reactive.Subjects; using System.Threading; using NUnit.Framework; +using System.Reactive; [TestFixture] public class TrackingTests : TestBase { +#if !DISABLE_REACTIVE_UI [TestFixtureSetUp] public void Setup() { Splat.ModeDetector.Current.SetInUnitTestRunner(true); } +#endif [Test] public void OrderByUpdatedNoFilter() { var count = 6; - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( Observable.Never(), OrderedComparer.OrderBy(x => x.UpdatedAt).Compare); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); @@ -69,10 +73,11 @@ public void OrderByUpdatedNoFilter() public void OrderByUpdatedFilter() { var count = 3; - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( Observable.Never(), OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, - (item, position, list) => true); + (item, position, list) => true, + OrderedComparer.OrderBy(x => x.UpdatedAt).Compare); col.ProcessingDelay = TimeSpan.Zero; var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); @@ -118,10 +123,11 @@ public void OnlyIndexes2To4() var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( Observable.Never(), OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, (item, position, list) => position >= 2 && position <= 4); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var evt = new ManualResetEvent(false); @@ -142,7 +148,7 @@ public void OnlyIndexes2To4() Assert.AreEqual(3, col.Count); #if DEBUG - CollectionAssert.AreEqual(list1.Reverse(), col.DebugInternalList); + CollectionAssert.AreEqual(list1.Reverse(), (col as TrackingCollection).DebugInternalList); #endif CollectionAssert.AreEqual(col, new List() { list1[3], list1[2], list1[1] }); @@ -157,10 +163,11 @@ public void OnlyTimesEqualOrHigherThan3Minutes() var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( Observable.Never(), OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, (item, position, list) => item.UpdatedAt >= Now + TimeSpan.FromMinutes(3) && item.UpdatedAt <= Now + TimeSpan.FromMinutes(5)); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var evt = new ManualResetEvent(false); @@ -181,7 +188,7 @@ public void OnlyTimesEqualOrHigherThan3Minutes() Assert.AreEqual(3, col.Count); #if DEBUG - CollectionAssert.AreEqual(list1.Reverse(), col.DebugInternalList); + CollectionAssert.AreEqual(list1.Reverse(), (col as TrackingCollection).DebugInternalList); #endif CollectionAssert.AreEqual(col, new List() { list1[2], list1[1], list1[0] }); col.Dispose(); @@ -195,9 +202,10 @@ public void OrderByDescendingNoFilter() var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); var list2 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, i, "Run 2")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( Observable.Never(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var evt = new ManualResetEvent(false); @@ -217,7 +225,7 @@ public void OrderByDescendingNoFilter() Assert.AreEqual(6, col.Count); #if DEBUG - CollectionAssert.AreEqual(list1, col.DebugInternalList); + CollectionAssert.AreEqual(list1, (col as TrackingCollection).DebugInternalList); #endif CollectionAssert.AreEqual(col, list1); @@ -243,9 +251,10 @@ public void OrderByDescendingNoFilter1000() var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); var list2 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 2")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( Observable.Never(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var evt = new ManualResetEvent(false); @@ -285,74 +294,24 @@ public void OrderByDescendingNoFilter1000() } - [Test, Category("Timings")] - public void ProcessingDelayPingsRegularly() - { - int count, total; - count = total = 400; - - var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i)).ToList()); - - var col = new TrackingCollection( - list1.ToObservable().Delay(TimeSpan.FromMilliseconds(10)), - OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); - col.ProcessingDelay = TimeSpan.FromMilliseconds(10); - - var sub = new Subject(); - var times = new List(); - sub.Subscribe(t => - { - times.Add(DateTimeOffset.UtcNow); - }); - - count = 0; - - var evt = new ManualResetEvent(false); - col.Subscribe(t => - { - sub.OnNext(t); - if (++count == list1.Count) - { - sub.OnCompleted(); - evt.Set(); - } - }, () => { }); - - - evt.WaitOne(); - evt.Reset(); - - Assert.AreEqual(total, col.Count); - - CollectionAssert.AreEqual(col, list1); - - long totalTime = 0; - - for (var j = 1; j < times.Count; j++) - totalTime += (times[j] - times[j - 1]).Ticks; - var avg = TimeSpan.FromTicks(totalTime / times.Count).TotalMilliseconds; - Assert.GreaterOrEqual(avg, 9); - Assert.LessOrEqual(avg, 12); - col.Dispose(); - } [Test] public void NotInitializedCorrectlyThrows1() { - var col = new TrackingCollection(OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + ITrackingCollection col = new TrackingCollection(OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); Assert.Throws(() => col.Subscribe()); } [Test] public void NotInitializedCorrectlyThrows2() { - var col = new TrackingCollection(OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + ITrackingCollection col = new TrackingCollection(OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); Assert.Throws(() => col.Subscribe(_ => { }, () => { })); } [Test] public void NoChangingAfterDisposed1() { - var col = new TrackingCollection(Observable.Never(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + ITrackingCollection col = new TrackingCollection(Observable.Never(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); col.Dispose(); Assert.Throws(() => col.AddItem(new Thing())); } @@ -360,7 +319,7 @@ public void NoChangingAfterDisposed1() [Test] public void NoChangingAfterDisposed2() { - var col = new TrackingCollection(Observable.Never(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + ITrackingCollection col = new TrackingCollection(Observable.Never(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); col.Dispose(); Assert.Throws(() => col.RemoveItem(new Thing())); } @@ -374,10 +333,11 @@ public void FilterTitleRun2() var list1 = new List(Enumerable.Range(1, total).Select(i => GetThing(i, i, i, "Run 1")).ToList()); var list2 = new List(Enumerable.Range(1, total).Select(i => GetThing(i, i, i, "Run 2")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( list1.ToObservable(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => item.Title.Equals("Run 2")); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var evt = new ManualResetEvent(false); @@ -418,14 +378,16 @@ public void OrderByDoesntMatchOriginalOrderTimings() var list1 = new List(Enumerable.Range(1, total).Select(i => GetThing(i, i, i, "Run 1")).ToList()); var list2 = new List(Enumerable.Range(1, total).Select(i => GetThing(i, i, i, "Run 2")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( list1.ToObservable(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => item.Title.Equals("Run 2")); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var evt = new ManualResetEvent(false); var start = DateTimeOffset.UtcNow; + col.Subscribe(t => { if (++count == list1.Count) @@ -468,10 +430,11 @@ public void OrderByMatchesOriginalOrder() var list1 = new List(Enumerable.Range(1, total).Select(i => GetThing(i, i, total - i, "Run 1")).ToList()); var list2 = new List(Enumerable.Range(1, total).Select(i => GetThing(i, i, total - i, "Run 2")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( list1.ToObservable(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => item.Title.Equals("Run 2")); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; count = 0; @@ -509,9 +472,10 @@ public void SortingTest() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; @@ -703,10 +667,11 @@ public void SortingTestWithFilterTrue() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => true); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; @@ -891,10 +856,11 @@ public void SortingTestWithFilterBetween6And12() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => item.UpdatedAt.Minute >= 6 && item.UpdatedAt.Minute <= 12); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; @@ -1045,10 +1011,11 @@ public void SortingTestWithFilterPosition2to4() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => position >= 2 && position <= 4); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; @@ -1192,10 +1159,11 @@ public void SortingTestWithFilterPosition1And3to4() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => position == 1 || (position >= 3 && position <= 4)); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; @@ -1355,11 +1323,12 @@ public void SortingTestWithFilterMoves() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, - (item, position, list) => (position >= 1 && position <= 2) || (position >= 5 && position <= 7)) - { ProcessingDelay = TimeSpan.Zero }; + (item, position, list) => (position >= 1 && position <= 2) || (position >= 5 && position <= 7)); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var expectedCount = 0; @@ -1424,11 +1393,12 @@ public void ChangingItemContentRemovesItFromFilteredList() var source = new Subject(); var now = new DateTimeOffset(0, TimeSpan.FromTicks(0)); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderBy(x => x.CreatedAt).Compare, - (item, position, list) => item.UpdatedAt < now + TimeSpan.FromMinutes(6)) - { ProcessingDelay = TimeSpan.Zero }; + (item, position, list) => item.UpdatedAt < now + TimeSpan.FromMinutes(6)); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var expectedCount = 0; @@ -1471,11 +1441,12 @@ public void ChangingItemContentRemovesItFromFilteredList2() var source = new Subject(); var now = new DateTimeOffset(0, TimeSpan.FromTicks(0)); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderBy(x => x.CreatedAt).Compare, - (item, position, list) => item.UpdatedAt > now + TimeSpan.FromMinutes(2) && item.UpdatedAt < now + TimeSpan.FromMinutes(8)) - { ProcessingDelay = TimeSpan.Zero }; + (item, position, list) => item.UpdatedAt > now + TimeSpan.FromMinutes(2) && item.UpdatedAt < now + TimeSpan.FromMinutes(8)); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var expectedCount = 0; @@ -1554,12 +1525,12 @@ public void ChangingItemContentRemovesItFromFilteredList2() public void ChangingFilterUpdatesCollection() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, - (item, position, list) => item.UpdatedAt < Now + TimeSpan.FromMinutes(10)) - { ProcessingDelay = TimeSpan.Zero }; - + (item, position, list) => item.UpdatedAt < Now + TimeSpan.FromMinutes(10)); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var expectedCount = 0; @@ -1613,11 +1584,12 @@ public void ChangingFilterUpdatesCollection() public void ChangingSortUpdatesCollection() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, - (item, position, list) => item.UpdatedAt < Now + TimeSpan.FromMinutes(10)) - { ProcessingDelay = TimeSpan.Zero }; + (item, position, list) => item.UpdatedAt < Now + TimeSpan.FromMinutes(10)); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var evt = new ManualResetEvent(false); @@ -1655,7 +1627,7 @@ public void ChangingSortUpdatesCollection() [Test] public void AddingItemsToCollectionManuallyThrows() { - var col = new TrackingCollection(Observable.Empty()); + ITrackingCollection col = new TrackingCollection(Observable.Empty()); Assert.Throws(() => col.Add(GetThing(1))); col.Dispose(); } @@ -1663,7 +1635,7 @@ public void AddingItemsToCollectionManuallyThrows() [Test] public void InsertingItemsIntoCollectionManuallyThrows() { - var col = new TrackingCollection(Observable.Empty()); + ITrackingCollection col = new TrackingCollection(Observable.Empty()); Assert.Throws(() => col.Insert(0, GetThing(1))); col.Dispose(); } @@ -1672,7 +1644,7 @@ public void InsertingItemsIntoCollectionManuallyThrows() public void MovingItemsIntoCollectionManuallyThrows() { var source = new Subject(); - var col = new TrackingCollection(source) { ProcessingDelay = TimeSpan.Zero }; + ITrackingCollection col = new TrackingCollection(source) { ProcessingDelay = TimeSpan.Zero }; var count = 0; var expectedCount = 2; var evt = new ManualResetEvent(false); @@ -1687,7 +1659,7 @@ public void MovingItemsIntoCollectionManuallyThrows() Add(source, GetThing(2, 2)); evt.WaitOne(); evt.Reset(); - Assert.Throws(() => col.Move(0, 1)); + Assert.Throws(() => (col as TrackingCollection).Move(0, 1)); col.Dispose(); } @@ -1695,7 +1667,7 @@ public void MovingItemsIntoCollectionManuallyThrows() public void RemovingItemsFromCollectionManuallyThrows() { var source = new Subject(); - var col = new TrackingCollection(source) { ProcessingDelay = TimeSpan.Zero }; + ITrackingCollection col = new TrackingCollection(source) { ProcessingDelay = TimeSpan.Zero }; var count = 0; var expectedCount = 2; var evt = new ManualResetEvent(false); @@ -1718,7 +1690,7 @@ public void RemovingItemsFromCollectionManuallyThrows() public void RemovingItemsFromCollectionManuallyThrows2() { var source = new Subject(); - var col = new TrackingCollection(source) { ProcessingDelay = TimeSpan.Zero }; + ITrackingCollection col = new TrackingCollection(source) { ProcessingDelay = TimeSpan.Zero }; var count = 0; var expectedCount = 2; var evt = new ManualResetEvent(false); @@ -1743,7 +1715,9 @@ public void ChangingComparers() { var source = new Subject(); - var col = new TrackingCollection(source, OrderedComparer.OrderBy(x => x.CreatedAt).Compare) { ProcessingDelay = TimeSpan.Zero }; + ITrackingCollection col = new TrackingCollection(source, OrderedComparer.OrderBy(x => x.CreatedAt).Compare); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var evt = new ManualResetEvent(false); @@ -1782,16 +1756,16 @@ public void Removing() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, - (item, position, list) => (position > 2 && position < 5) || (position > 6 && position < 8)) - { ProcessingDelay = TimeSpan.Zero }; + (item, position, list) => (position > 2 && position < 5) || (position > 6 && position < 8)); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var expectedCount = 0; var evt = new ManualResetEvent(false); - col.Subscribe(t => { if (++count == expectedCount) @@ -1863,7 +1837,7 @@ public void Removing() [Test] public void DisposingThrows() { - var col = new TrackingCollection(Observable.Empty()); + ITrackingCollection col = new TrackingCollection(Observable.Empty()); col.Dispose(); Assert.Throws(() => col.Filter = null); Assert.Throws(() => col.Comparer = null); @@ -1887,16 +1861,20 @@ public void MultipleSortingAndFiltering() var titles2 = Enumerable.Range(1, expectedTotal).Select(x => ((char)('c' + x)).ToString()).ToList(); var dates2 = Enumerable.Range(1, expectedTotal).Select(x => Now + TimeSpan.FromMinutes(x)).ToList(); - - var idstack2 = new Stack(Enumerable.Range(1, expectedTotal).OrderBy(rnd.Next)); - var datestack2 = new Stack(new List() { + var dates2mixed = new List() { dates2[2], dates2[0], dates2[1], dates2[3], dates2[5], dates2[9], dates2[15], dates2[6], dates2[7], dates2[8], dates2[13], dates2[10], dates2[16], dates2[11], dates2[12], dates2[14], dates2[17], dates2[18], dates2[19], dates2[4], - }); + }; + var idstack2 = new Stack(Enumerable.Range(1, expectedTotal).OrderBy(rnd.Next)); + var datestack2 = new Stack(dates2mixed); var titlestack2 = new Stack(titles2.OrderBy(_ => rnd.Next())); + var datestack3 = new Stack(); + for (int i = 0; i < datestack1.Count; i++) + datestack3.Push(new DateTimeOffset(Math.Max(dates1[i].Ticks, dates2mixed[i].Ticks), TimeSpan.Zero)); + var list1 = Observable.Defer(() => Enumerable.Range(1, expectedTotal) .OrderBy(rnd.Next) .Select(x => new Thing(idstack1.Pop(), titlestack1.Pop(), datestack1.Pop())) @@ -1911,20 +1889,22 @@ public void MultipleSortingAndFiltering() .Replay() .RefCount(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( list1.Concat(list2), OrderedComparer.OrderByDescending(x => x.CreatedAt).Compare, (item, idx, list) => idx < 5 - ) - { ProcessingDelay = TimeSpan.Zero }; + ); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; - var count = 0; var evt = new ManualResetEvent(false); - col.Subscribe(t => - { - if (++count == expectedTotal * 2) - evt.Set(); - }, () => { }); + col.OriginalCompleted.Subscribe(_ => evt.Set()); + col.Subscribe(); + //col.Subscribe(t => + //{ + // if (++count == expectedTotal * 2) + // evt.Set(); + //}, () => { }); evt.WaitOne(); evt.Reset(); @@ -1932,30 +1912,60 @@ public void MultipleSortingAndFiltering() // it's initially sorted by date, so id list should not match CollectionAssert.AreNotEqual(list1.Select(x => x.Number).ToEnumerable(), list2.Select(x => x.Number).ToEnumerable()); - var sortlist = list1.ToEnumerable().ToArray(); + var sortlist = col.ToArray(); Array.Sort(sortlist, new LambdaComparer(OrderedComparer.OrderByDescending(x => x.CreatedAt).Compare)); CollectionAssert.AreEqual(sortlist.Take(5), col); col.Comparer = OrderedComparer.OrderBy(x => x.Number).Compare; - sortlist = list1.ToEnumerable().ToArray(); + sortlist = col.ToArray(); Array.Sort(sortlist, new LambdaComparer(OrderedComparer.OrderBy(x => x.Number).Compare)); CollectionAssert.AreEqual(sortlist.Take(5), col); col.Comparer = OrderedComparer.OrderBy(x => x.CreatedAt).Compare; - sortlist = list1.ToEnumerable().ToArray(); + sortlist = col.ToArray(); Array.Sort(sortlist, new LambdaComparer(OrderedComparer.OrderBy(x => x.CreatedAt).Compare)); CollectionAssert.AreEqual(sortlist.Take(5), col); col.Comparer = OrderedComparer.OrderByDescending(x => x.Title).Compare; - sortlist = list1.ToEnumerable().ToArray(); + sortlist = col.ToArray(); Array.Sort(sortlist, new LambdaComparer(OrderedComparer.OrderByDescending(x => x.Title).Compare)); CollectionAssert.AreEqual(sortlist.Take(5), col); col.Comparer = OrderedComparer.OrderBy(x => x.Title).Compare; - sortlist = list1.ToEnumerable().ToArray(); + sortlist = col.ToArray(); Array.Sort(sortlist, new LambdaComparer(OrderedComparer.OrderBy(x => x.Title).Compare)); CollectionAssert.AreEqual(sortlist.Take(5), col); col.Dispose(); } + + [Test] + public void ListeningTwiceWorks() + { + var count = 10; + ITrackingCollection col = new TrackingCollection(); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; + + var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); + var list2 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, i + count, "Run 2")).ToList()); + + var subj = new ReplaySubject(); + subj.OnNext(Unit.Default); + var disp = col.OriginalCompleted.Subscribe(x => subj.OnCompleted()); + col.Listen(list1.ToObservable()); + col.Subscribe(); + subj.Wait(); + + disp.Dispose(); + col.Listen(list2.ToObservable()); + subj = new ReplaySubject(); + subj.OnNext(Unit.Default); + disp = col.OriginalCompleted.Subscribe(x => subj.OnCompleted()); + col.Subscribe(); + subj.Wait(); + disp.Dispose(); + + CollectionAssert.AreEqual(list2, col); + } } diff --git a/src/UnitTests/GitHub.App/Models/ModelServiceTests.cs b/src/UnitTests/GitHub.App/Models/ModelServiceTests.cs index 77e69d9846..a7a966ca51 100644 --- a/src/UnitTests/GitHub.App/Models/ModelServiceTests.cs +++ b/src/UnitTests/GitHub.App/Models/ModelServiceTests.cs @@ -13,6 +13,7 @@ using Octokit; using Xunit; using System.Globalization; +using System.Reactive.Subjects; using System.Threading; using GitHub.Models; using GitHub.Primitives; @@ -441,20 +442,25 @@ public async Task NonExpiredIndexReturnsCache() apiClient.GetPullRequestsForRepository(user.Login, repo.Name).Returns(prlive); await modelService.InsertUser(new AccountCacheItem(user)); - var col = modelService.GetPullRequests(repo); + + ITrackingCollection col = new TrackingCollection(); + col = modelService.GetPullRequests(repo, col); col.ProcessingDelay = TimeSpan.Zero; var count = 0; - var evt = new ManualResetEvent(false); + var done = new Subject(); + done.Subscribe(); + col.Subscribe(t => { if (++count == expected) - evt.Set(); + { + done.OnNext(Unit.Default); + done.OnCompleted(); + } }, () => { }); - - evt.WaitOne(); - evt.Reset(); + await done; Assert.Collection(col, col.Select(x => new Action(t => Assert.True(x.Title.StartsWith("Cache")))).ToArray()); } @@ -503,20 +509,25 @@ public async Task ExpiredIndexReturnsLive() apiClient.GetPullRequestsForRepository(user.Login, repo.Name).Returns(prlive); await modelService.InsertUser(new AccountCacheItem(user)); - var col = modelService.GetPullRequests(repo); + + ITrackingCollection col = new TrackingCollection(); + col = modelService.GetPullRequests(repo, col); col.ProcessingDelay = TimeSpan.Zero; var count = 0; - var evt = new ManualResetEvent(false); + var done = new Subject(); + done.Subscribe(); + col.Subscribe(t => { if (++count == expected * 2) - evt.Set(); + { + done.OnNext(Unit.Default); + done.OnCompleted(); + } }, () => { }); - - evt.WaitOne(); - evt.Reset(); + await done; Assert.Collection(col, col.Select(x => new Action(t => Assert.True(x.Title.StartsWith("Live")))).ToArray()); } @@ -565,23 +576,28 @@ public async Task ExpiredIndexClearsItems() apiClient.GetPullRequestsForRepository(user.Login, repo.Name).Returns(prlive); await modelService.InsertUser(new AccountCacheItem(user)); - var col = modelService.GetPullRequests(repo); + + ITrackingCollection col = new TrackingCollection(); + col = modelService.GetPullRequests(repo, col); col.ProcessingDelay = TimeSpan.Zero; var count = 0; - var evt = new ManualResetEvent(false); + var done = new Subject(); + done.Subscribe(); + col.Subscribe(t => { // we get all the items from the cache (items 1-5), all the items from the live (items 5-9), // and 4 deletions (items 1-4) because the cache expired the items that were not // a part of the live data if (++count == 14) - evt.Set(); + { + done.OnNext(Unit.Default); + done.OnCompleted(); + } }, () => { }); - - evt.WaitOne(); - evt.Reset(); + await done; Assert.Equal(5, col.Count); Assert.Collection(col, From 371483bcb7891da247c930db236133187bd4a4ab Mon Sep 17 00:00:00 2001 From: Andreia Gaita Date: Thu, 23 Jun 2016 17:04:01 +0200 Subject: [PATCH 5/8] Go back to using full objects when removing --- src/GitHub.App/Extensions/AkavacheExtensions.cs | 7 ++++--- src/GitHub.App/Models/PullRequestModel.cs | 7 ------- src/GitHub.App/Services/ModelService.cs | 4 ---- 3 files changed, 4 insertions(+), 14 deletions(-) diff --git a/src/GitHub.App/Extensions/AkavacheExtensions.cs b/src/GitHub.App/Extensions/AkavacheExtensions.cs index e18e881f10..0b4545258b 100644 --- a/src/GitHub.App/Extensions/AkavacheExtensions.cs +++ b/src/GitHub.App/Extensions/AkavacheExtensions.cs @@ -166,7 +166,7 @@ public static IObservable GetAndFetchLatestFromIndex( this IBlobCache blobCache, string key, Func> fetchFunc, - Action removedItemsCallback, + Action removedItemsCallback, TimeSpan refreshInterval, TimeSpan maxCacheDuration) where T : CacheItem @@ -194,7 +194,7 @@ public static IObservable GetAndFetchLatestFromIndex( static IObservable GetAndFetchLatestFromIndex(this IBlobCache This, string key, Func> fetchFunc, - Action removedItemsCallback, + Action removedItemsCallback, Func fetchPredicate = null, DateTimeOffset? absoluteExpiration = null, bool shouldInvalidateOnError = false) @@ -237,7 +237,8 @@ static IObservable GetAndFetchLatestFromIndex(this IBlobCache This, var list = index.OldKeys.Except(index.Keys); if (!list.Any()) return; - foreach (var d in list) + var removed = await This.GetObjects(list); + foreach (var d in removed.Values) removedItemsCallback(d); await This.InvalidateObjects(list); }) diff --git a/src/GitHub.App/Models/PullRequestModel.cs b/src/GitHub.App/Models/PullRequestModel.cs index f2d1afde59..43f47d2d4f 100644 --- a/src/GitHub.App/Models/PullRequestModel.cs +++ b/src/GitHub.App/Models/PullRequestModel.cs @@ -23,13 +23,6 @@ public PullRequestModel(int number, string title, UpdatedAt = updatedAt ?? CreatedAt; } - internal PullRequestModel(int number) - { - Number = number; - Title = ""; - Author = new AccountDesigner(); - } - public void CopyFrom(IPullRequestModel other) { if (!Equals(other)) diff --git a/src/GitHub.App/Services/ModelService.cs b/src/GitHub.App/Services/ModelService.cs index dbf4ade2f6..972c9737f7 100644 --- a/src/GitHub.App/Services/ModelService.cs +++ b/src/GitHub.App/Services/ModelService.cs @@ -273,10 +273,6 @@ IPullRequestModel Create(PullRequestCacheItem prCacheItem) }; } - static IPullRequestModel Create(string key) - { - return new PullRequestModel(Int32.Parse(key.Split('|').Last(), CultureInfo.InvariantCulture)); - } public IObservable InsertUser(AccountCacheItem user) { From 13cd10e0f6b34696b5dac83cf9ac19e46fb06f86 Mon Sep 17 00:00:00 2001 From: Andreia Gaita Date: Wed, 22 Jun 2016 17:40:09 +0200 Subject: [PATCH 6/8] Clean up a tad --- src/GitHub.App/Extensions/AkavacheExtensions.cs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/GitHub.App/Extensions/AkavacheExtensions.cs b/src/GitHub.App/Extensions/AkavacheExtensions.cs index 0b4545258b..7f4c4cb90e 100644 --- a/src/GitHub.App/Extensions/AkavacheExtensions.cs +++ b/src/GitHub.App/Extensions/AkavacheExtensions.cs @@ -208,21 +208,17 @@ static IObservable GetAndFetchLatestFromIndex(this IBlobCache This, .Where(predicateIsTrue => predicateIsTrue.Item2) .Select(x => x.Item1) .Select(index => index.Clear()) - .SelectMany(index => - { - var fetchObs = fetchFunc() + .SelectMany(index => fetchFunc() .Catch(ex => { var shouldInvalidate = shouldInvalidateOnError ? This.InvalidateObject(key) : Observable.Return(Unit.Default); return shouldInvalidate.SelectMany(__ => Observable.Throw(ex)); - }); - - return fetchObs + }) .SelectMany(x => x.Save(This, key, absoluteExpiration)) - .Do(x => index.Add(key, x)); - }); + .Do(x => index.Add(key, x)) + ); var cache = idx .SelectMany(index => This.GetObjects(index.Keys.ToList())) From abc934d25dc2db715298558b76a94f08b0560444 Mon Sep 17 00:00:00 2001 From: Andreia Gaita Date: Wed, 22 Jun 2016 18:08:11 +0200 Subject: [PATCH 7/8] Fix a bunch of things in the tracking collection See https://github.com/shana/handy-things/commits/79d3baa for a list of fixes --- .../Collections/TrackingCollection.cs | 55 ++++++++++--------- .../TrackingCollectionTests.cs | 10 +--- 2 files changed, 29 insertions(+), 36 deletions(-) diff --git a/src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs b/src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs index a41c52a36d..d02540c6b6 100644 --- a/src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs +++ b/src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs @@ -15,7 +15,6 @@ using System.Reactive.Linq; using System.Reactive.Subjects; using System.Threading; -using System.Threading.Tasks; namespace GitHub.Collections { @@ -30,7 +29,7 @@ namespace GitHub.Collections /// /// public class TrackingCollection : ObservableCollection, ITrackingCollection, IDisposable - where T : class, ICopyable, IComparable + where T : class, ICopyable { enum TheAction { @@ -38,7 +37,8 @@ enum TheAction Move, Add, Insert, - Remove + Remove, + Ignore } bool isChanging; @@ -83,10 +83,7 @@ enum TheAction public TimeSpan ProcessingDelay { get { return requestedDelay; } - set - { - requestedDelay = value; - } + set { requestedDelay = value; } } bool ManualProcessing => cache.IsEmpty && originalSourceIsCompleted; @@ -105,7 +102,7 @@ public TrackingCollection(Func comparer = null, Func #endif this.comparer = comparer ?? Comparer.Default.Compare; this.filter = filter; - this.newer = newer ?? Comparer.Default.Compare; + this.newer = newer; } public TrackingCollection(IObservable source, @@ -132,8 +129,12 @@ public IObservable Listen(IObservable obs) Reset(); + // ManualResetEvent uses the realtime clock for accurate <50ms delays var waitHandle = new ManualResetEventSlim(); + // empty the source observable as fast as possible + // to the cache queue, and signal that data is available + // for processing dataPump = obs .Do(data => { @@ -147,7 +148,6 @@ public IObservable Listen(IObservable obs) }) .Publish(); - // when both signalHaveData and signalNeedData produce a value, dataListener gets a value // this will empty the queue of items that have been cached in regular intervals according // to the requested delay @@ -167,7 +167,8 @@ public IObservable Listen(IObservable obs) source = dataListener .Where(data => data.Item != null) .ObserveOn(scheduler) - .Select(data => { + .Select(data => + { data = ProcessItem(data, original); // if we're removing an item that doesn't exist, ignore it @@ -197,7 +198,6 @@ public IObservable Listen(IObservable obs) signalOriginalSourceCompletion = false; originalSourceCompleted.OnNext(Unit.Default); } - } else signalNeedData.OnNext(Unit.Default); @@ -339,16 +339,6 @@ void SetAndRecalculateFilter(Func, bool> newFilter) #region Source pipeline processing - ActionData CheckFilter(ActionData data) - { - var isIncluded = true; - if (data.TheAction == TheAction.Remove) - isIncluded = false; - else if (filter != null) - isIncluded = filter(data.Item, data.Position, this); - return new ActionData(data, isIncluded); - } - int StartQueue() { disposables.Add(cachePump.Connect()); @@ -382,11 +372,12 @@ ActionData ProcessItem(ActionData data, List list) if (idx >= 0) { var old = list[idx]; - var isNewer = newer(item, old); - - // the object is "older" than the one we have, ignore it - if (isNewer > 0) - ret = new ActionData(TheAction.None, list, item, null, idx, idx); + if (newer != null) + { + // the object is "older" than the one we have, ignore it + if (newer(item, old) > 0) + return new ActionData(TheAction.Ignore, list, item, null, idx, idx); + } var comparison = comparer(item, old); @@ -466,10 +457,20 @@ ActionData SortedRemove(ActionData data) // unfiltered list update sortedIndexCache.Remove(data.Item); UpdateIndexCache(data.List.Count - 1, data.OldPosition, data.List, sortedIndexCache); - original.Remove(data.Item); + data.List.Remove(data.Item); return data; } + ActionData CheckFilter(ActionData data) + { + var isIncluded = true; + if (data.TheAction == TheAction.Remove) + isIncluded = false; + else if (filter != null) + isIncluded = filter(data.Item, data.Position, this); + return new ActionData(data, isIncluded); + } + ActionData FilteredAdd(ActionData data) { if (data.TheAction != TheAction.Add) diff --git a/src/TrackingCollectionTests/TrackingCollectionTests.cs b/src/TrackingCollectionTests/TrackingCollectionTests.cs index 23d7058eda..2fa38a4fc7 100644 --- a/src/TrackingCollectionTests/TrackingCollectionTests.cs +++ b/src/TrackingCollectionTests/TrackingCollectionTests.cs @@ -77,7 +77,7 @@ public void OrderByUpdatedFilter() Observable.Never(), OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, (item, position, list) => true, - OrderedComparer.OrderBy(x => x.UpdatedAt).Compare); + OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); col.ProcessingDelay = TimeSpan.Zero; var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); @@ -475,7 +475,6 @@ public void SortingTest() ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); - col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; @@ -671,7 +670,6 @@ public void SortingTestWithFilterTrue() source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => true); - col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; @@ -860,7 +858,6 @@ public void SortingTestWithFilterBetween6And12() source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => item.UpdatedAt.Minute >= 6 && item.UpdatedAt.Minute <= 12); - col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; @@ -1015,7 +1012,6 @@ public void SortingTestWithFilterPosition2to4() source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => position >= 2 && position <= 4); - col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; @@ -1163,7 +1159,6 @@ public void SortingTestWithFilterPosition1And3to4() source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => position == 1 || (position >= 3 && position <= 4)); - col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; @@ -1327,7 +1322,6 @@ public void SortingTestWithFilterMoves() source, OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, (item, position, list) => (position >= 1 && position <= 2) || (position >= 5 && position <= 7)); - col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; @@ -1445,7 +1439,6 @@ public void ChangingItemContentRemovesItFromFilteredList2() source, OrderedComparer.OrderBy(x => x.CreatedAt).Compare, (item, position, list) => item.UpdatedAt > now + TimeSpan.FromMinutes(2) && item.UpdatedAt < now + TimeSpan.FromMinutes(8)); - col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; @@ -1760,7 +1753,6 @@ public void Removing() source, OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, (item, position, list) => (position > 2 && position < 5) || (position > 6 && position < 8)); - col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var count = 0; From 612f106d4e29e9bc875cb6a737305114fff31e97 Mon Sep 17 00:00:00 2001 From: Andreia Gaita Date: Fri, 24 Jun 2016 18:58:26 +0200 Subject: [PATCH 8/8] Fix tests --- .../TrackingCollectionTests.cs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/TrackingCollectionTests/TrackingCollectionTests.cs b/src/TrackingCollectionTests/TrackingCollectionTests.cs index 2fa38a4fc7..b380e5b830 100644 --- a/src/TrackingCollectionTests/TrackingCollectionTests.cs +++ b/src/TrackingCollectionTests/TrackingCollectionTests.cs @@ -1888,18 +1888,9 @@ public void MultipleSortingAndFiltering() ); col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; - - var evt = new ManualResetEvent(false); - col.OriginalCompleted.Subscribe(_ => evt.Set()); col.Subscribe(); - //col.Subscribe(t => - //{ - // if (++count == expectedTotal * 2) - // evt.Set(); - //}, () => { }); - evt.WaitOne(); - evt.Reset(); + col.OriginalCompleted.Wait(); // it's initially sorted by date, so id list should not match CollectionAssert.AreNotEqual(list1.Select(x => x.Number).ToEnumerable(), list2.Select(x => x.Number).ToEnumerable());