diff --git a/src/GitHub.App/Caches/CacheIndex.cs b/src/GitHub.App/Caches/CacheIndex.cs index a9016470e1..8f3ae7aad7 100644 --- a/src/GitHub.App/Caches/CacheIndex.cs +++ b/src/GitHub.App/Caches/CacheIndex.cs @@ -21,6 +21,15 @@ public CacheIndex() OldKeys = new List(); } + public CacheIndex Add(string indexKey, CacheItem item) + { + var k = string.Format(CultureInfo.InvariantCulture, "{0}|{1}", IndexKey, item.Key); + if (!Keys.Contains(k)) + Keys.Add(k); + UpdatedAt = DateTimeOffset.UtcNow; + return this; + } + public IObservable AddAndSave(IBlobCache cache, string indexKey, CacheItem item, DateTimeOffset? absoluteExpiration = null) { @@ -29,7 +38,7 @@ public IObservable AddAndSave(IBlobCache cache, string indexKey, Cac Keys.Add(k); UpdatedAt = DateTimeOffset.UtcNow; return cache.InsertObject(IndexKey, this, absoluteExpiration) - .Select(x => this); + .Select(x => this); } public static IObservable AddAndSaveToIndex(IBlobCache cache, string indexKey, CacheItem item, @@ -47,15 +56,19 @@ public static IObservable AddAndSaveToIndex(IBlobCache cache, string .Select(x => index)); } - public IObservable Clear(IBlobCache cache, string indexKey, DateTimeOffset? absoluteExpiration = null) + public CacheIndex Clear() { OldKeys = Keys.ToList(); Keys.Clear(); UpdatedAt = DateTimeOffset.UtcNow; - return cache - .InvalidateObject(indexKey) - .SelectMany(_ => cache.InsertObject(indexKey, this, absoluteExpiration)) - .Select(_ => this); + return this; + } + + public IObservable Save(IBlobCache cache, + DateTimeOffset? absoluteExpiration = null) + { + return cache.InsertObject(IndexKey, this, absoluteExpiration) + .Select(x => this); } [AllowNull] diff --git a/src/GitHub.App/Extensions/AkavacheExtensions.cs b/src/GitHub.App/Extensions/AkavacheExtensions.cs index 87fa112fc2..7f4c4cb90e 100644 --- a/src/GitHub.App/Extensions/AkavacheExtensions.cs +++ b/src/GitHub.App/Extensions/AkavacheExtensions.cs @@ -4,6 +4,7 @@ using System.Reactive.Linq; using Akavache; using GitHub.Caches; +using System.Threading.Tasks; namespace GitHub.Extensions { @@ -199,39 +200,45 @@ static IObservable GetAndFetchLatestFromIndex(this IBlobCache This, bool shouldInvalidateOnError = false) where T : CacheItem { - var fetch = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key)) + var idx = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key))).Replay().RefCount(); + + + var fetch = idx .Select(x => Tuple.Create(x, fetchPredicate == null || !x.Keys.Any() || fetchPredicate(x.UpdatedAt))) .Where(predicateIsTrue => predicateIsTrue.Item2) .Select(x => x.Item1) - .SelectMany(index => index.Clear(This, key, absoluteExpiration)) - .SelectMany(index => - { - var fetchObs = fetchFunc().Catch(ex => - { - var shouldInvalidate = shouldInvalidateOnError ? - This.InvalidateObject(key) : - Observable.Return(Unit.Default); - return shouldInvalidate.SelectMany(__ => Observable.Throw(ex)); - }); - - return fetchObs - .SelectMany(x => x.Save(This, key, absoluteExpiration)) - .Do(x => index.AddAndSave(This, key, x, absoluteExpiration)) - .Finally(() => + .Select(index => index.Clear()) + .SelectMany(index => fetchFunc() + .Catch(ex => { - This.GetObjects(index.OldKeys.Except(index.Keys)) - .Do(dict => This.InvalidateObjects(dict.Keys)) - .SelectMany(dict => dict.Values) - .Do(removedItemsCallback) - .Subscribe(); - }); - })); - - var cache = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key)) - .SelectMany(index => This.GetObjects(index.Keys)) - .SelectMany(dict => dict.Values)); - - return cache.Merge(fetch).Replay().RefCount(); + var shouldInvalidate = shouldInvalidateOnError ? + This.InvalidateObject(key) : + Observable.Return(Unit.Default); + return shouldInvalidate.SelectMany(__ => Observable.Throw(ex)); + }) + .SelectMany(x => x.Save(This, key, absoluteExpiration)) + .Do(x => index.Add(key, x)) + ); + + var cache = idx + .SelectMany(index => This.GetObjects(index.Keys.ToList())) + .SelectMany(dict => dict.Values); + + return cache.Merge(fetch) + .Finally(async () => + { + var index = await idx; + await index.Save(This); + + var list = index.OldKeys.Except(index.Keys); + if (!list.Any()) + return; + var removed = await This.GetObjects(list); + foreach (var d in removed.Values) + removedItemsCallback(d); + await This.InvalidateObjects(list); + }) + .Replay().RefCount(); } static bool IsExpired(IBlobCache blobCache, DateTimeOffset itemCreatedAt, TimeSpan cacheDuration) diff --git a/src/GitHub.App/Models/PullRequestModel.cs b/src/GitHub.App/Models/PullRequestModel.cs index c9d531aead..43f47d2d4f 100644 --- a/src/GitHub.App/Models/PullRequestModel.cs +++ b/src/GitHub.App/Models/PullRequestModel.cs @@ -4,6 +4,7 @@ using GitHub.VisualStudio.Helpers; using NullGuard; using System.Diagnostics; +using GitHub.SampleData; namespace GitHub.Models { diff --git a/src/GitHub.App/Models/RepositoryHost.cs b/src/GitHub.App/Models/RepositoryHost.cs index c6a1db331d..852d54b042 100644 --- a/src/GitHub.App/Models/RepositoryHost.cs +++ b/src/GitHub.App/Models/RepositoryHost.cs @@ -252,8 +252,8 @@ IObservable LoginWithApiUser(UserAndScopes userAndScopes) { if (result.IsSuccess()) { - IsLoggedIn = true; SupportsGist = userAndScopes.Scopes?.Contains("gist") ?? true; + IsLoggedIn = true; } log.Info("Log in from cache for login '{0}' to host '{1}' {2}", diff --git a/src/GitHub.App/Services/ModelService.cs b/src/GitHub.App/Services/ModelService.cs index a281854224..972c9737f7 100644 --- a/src/GitHub.App/Services/ModelService.cs +++ b/src/GitHub.App/Services/ModelService.cs @@ -16,6 +16,7 @@ using NLog; using NullGuard; using Octokit; +using ReactiveUI; namespace GitHub.Services { @@ -136,7 +137,7 @@ public IObservable GetUserFromCache() } public ITrackingCollection GetPullRequests(ISimpleRepositoryModel repo, - [AllowNull]ITrackingCollection collection = null) + ITrackingCollection collection) { // Since the api to list pull requests returns all the data for each pr, cache each pr in its own entry // and also cache an index that contains all the keys for each pr. This way we can fetch prs in bulk @@ -147,9 +148,6 @@ public ITrackingCollection GetPullRequests(ISimpleRepositoryM var keyobs = GetUserFromCache() .Select(user => string.Format(CultureInfo.InvariantCulture, "{0}|{1}|pr", user.Login, repo.Name)); - if (collection == null) - collection = new TrackingCollection(); - var source = Observable.Defer(() => keyobs .SelectMany(key => hostCache.GetAndFetchLatestFromIndex(key, () => @@ -275,6 +273,7 @@ IPullRequestModel Create(PullRequestCacheItem prCacheItem) }; } + public IObservable InsertUser(AccountCacheItem user) { return hostCache.InsertObject("user", user); diff --git a/src/GitHub.App/ViewModels/PullRequestListViewModel.cs b/src/GitHub.App/ViewModels/PullRequestListViewModel.cs index 4187ec477d..4f4f2caa75 100644 --- a/src/GitHub.App/ViewModels/PullRequestListViewModel.cs +++ b/src/GitHub.App/ViewModels/PullRequestListViewModel.cs @@ -79,6 +79,7 @@ public PullRequestListViewModel(IRepositoryHost repositoryHost, ISimpleRepositor PullRequests = new TrackingCollection(); pullRequests.Comparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; pullRequests.Filter = (pr, i, l) => pr.IsOpen; + pullRequests.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; } public override void Initialize([AllowNull] ViewWithData data) diff --git a/src/GitHub.Exports.Reactive/Collections/ITrackingCollection.cs b/src/GitHub.Exports.Reactive/Collections/ITrackingCollection.cs index c6d1c13dc7..7faee925fd 100644 --- a/src/GitHub.Exports.Reactive/Collections/ITrackingCollection.cs +++ b/src/GitHub.Exports.Reactive/Collections/ITrackingCollection.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Collections.Specialized; +using System.Reactive; namespace GitHub.Collections { @@ -32,11 +33,22 @@ public interface ITrackingCollection : IDisposable, IList where T : ICopya /// /// The comparer method for sorting, or null if not sorting Func Comparer { get; set; } + /// /// Set a new filter. This will cause the collection to be filtered /// /// The new filter, or null to not have any filtering Func, bool> Filter { get; set; } + + /// + /// Set a comparer that determines whether the item being processed is newer than the same + /// item seen before. This is to prevent stale items from overriding newer items when data + /// is coming simultaneously from cache and from live data. Use a timestamp-like comparison + /// for best results + /// + /// The comparer method for sorting, or null if not sorting + Func NewerComparer { get; set; } + void AddItem(T item); void RemoveItem(T item); /// @@ -44,5 +56,6 @@ public interface ITrackingCollection : IDisposable, IList where T : ICopya /// TimeSpan ProcessingDelay { get; set; } event NotifyCollectionChangedEventHandler CollectionChanged; + IObservable OriginalCompleted { get; } } } \ No newline at end of file diff --git a/src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs b/src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs index a10b0a4752..d02540c6b6 100644 --- a/src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs +++ b/src/GitHub.Exports.Reactive/Collections/TrackingCollection.cs @@ -1,8 +1,8 @@ #if !DISABLE_REACTIVEUI +using GitHub.VisualStudio.Helpers; using ReactiveUI; #else using System.Windows.Threading; -using System.Threading; #endif using System; using System.Collections.Concurrent; @@ -13,6 +13,8 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; namespace GitHub.Collections { @@ -27,7 +29,7 @@ namespace GitHub.Collections /// /// public class TrackingCollection : ObservableCollection, ITrackingCollection, IDisposable - where T : class, ICopyable, IComparable + where T : class, ICopyable { enum TheAction { @@ -35,19 +37,29 @@ enum TheAction Move, Add, Insert, - Remove + Remove, + Ignore } bool isChanging; - readonly CompositeDisposable disposables = new CompositeDisposable(); - IObservable source; - IObservable sourceQueue; Func comparer; Func, bool> filter; - readonly IScheduler scheduler; - ConcurrentQueue queue; + Func newer; // comparer to check whether the item being processed is newer than the existing one + IObservable source; + IConnectableObservable dataPump; + IConnectableObservable cachePump; + ConcurrentQueue cache; + + Subject signalHaveData; + Subject signalNeedData; + Subject dataListener; + + bool resetting = false; + + readonly CompositeDisposable disposables = new CompositeDisposable(); + readonly IScheduler scheduler; readonly List original = new List(); #if DEBUG public IList DebugInternalList => original; @@ -60,22 +72,26 @@ enum TheAction // for speeding up IndexOf in the filtered list readonly Dictionary filteredIndexCache = new Dictionary(); - TimeSpan delay; + bool originalSourceIsCompleted; + bool signalOriginalSourceCompletion; + readonly Subject originalSourceCompleted = new Subject(); + public IObservable OriginalCompleted => originalSourceCompleted; + TimeSpan requestedDelay; readonly TimeSpan fuzziness; + public TimeSpan ProcessingDelay { get { return requestedDelay; } - set - { - requestedDelay = value; - delay = value; - } + set { requestedDelay = value; } } - public TrackingCollection(Func comparer = null, Func, bool> filter = null, IScheduler scheduler = null) + bool ManualProcessing => cache.IsEmpty && originalSourceIsCompleted; + + public TrackingCollection(Func comparer = null, Func, bool> filter = null, + Func newer = null, IScheduler scheduler = null) { - queue = new ConcurrentQueue(); + cache = new ConcurrentQueue(); ProcessingDelay = TimeSpan.FromMilliseconds(10); fuzziness = TimeSpan.FromMilliseconds(1); @@ -86,14 +102,15 @@ public TrackingCollection(Func comparer = null, Func #endif this.comparer = comparer ?? Comparer.Default.Compare; this.filter = filter; + this.newer = newer; } - [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2214:DoNotCallOverridableMethodsInConstructors")] public TrackingCollection(IObservable source, Func comparer = null, Func, bool> filter = null, + Func newer = null, IScheduler scheduler = null) - : this(comparer, filter, scheduler) + : this(comparer, filter, newer, scheduler) { Listen(source); } @@ -110,37 +127,82 @@ public IObservable Listen(IObservable obs) if (disposed) throw new ObjectDisposedException("TrackingCollection"); - disposables.Clear(); - while (!queue.IsEmpty) - GetFromQueue(); + Reset(); - sourceQueue = obs - .Do(data => queue.Enqueue(new ActionData(data))); + // ManualResetEvent uses the realtime clock for accurate <50ms delays + var waitHandle = new ManualResetEventSlim(); - source = Observable.Defer(() => { - StartQueue(); - return Observable.Timer(TimeSpan.Zero, delay) - .Select(_ => GetFromQueue()); - }) + // empty the source observable as fast as possible + // to the cache queue, and signal that data is available + // for processing + dataPump = obs + .Do(data => + { + cache.Enqueue(new ActionData(data)); + signalHaveData.OnNext(Unit.Default); + }) + .Finally(() => + { + originalSourceIsCompleted = true; + signalOriginalSourceCompletion = true; + }) + .Publish(); + + // when both signalHaveData and signalNeedData produce a value, dataListener gets a value + // this will empty the queue of items that have been cached in regular intervals according + // to the requested delay + cachePump = signalHaveData + .Zip(signalNeedData, (a, b) => Unit.Default) + .ObserveOn(TaskPoolScheduler.Default) + .TimeInterval() + .Select(interval => + { + var delay = CalculateProcessingDelay(interval); + waitHandle.Wait(delay); + dataListener.OnNext(GetFromQueue()); + return Unit.Default; + }) + .Publish(); + + source = dataListener .Where(data => data.Item != null) .ObserveOn(scheduler) - .Select(x => ProcessItem(x, original)) - // if we're removing an item that doesn't exist, ignore it - .Where(data => !(data.TheAction == TheAction.Remove && data.OldPosition < 0)) - .Select(SortedNone) - .Select(SortedAdd) - .Select(SortedInsert) - .Select(SortedMove) - .Select(SortedRemove) - .Select(CheckFilter) - .Select(FilteredAdd) - .Select(CalculateIndexes) - .Select(FilteredNone) - .Select(FilteredInsert) - .Select(FilteredMove) - .Select(FilteredRemove) - .TimeInterval() - .Select(UpdateProcessingDelay) + .Select(data => + { + data = ProcessItem(data, original); + + // if we're removing an item that doesn't exist, ignore it + if (data.TheAction == TheAction.Remove && data.OldPosition < 0) + return ActionData.Default; + + data = SortedNone(data); + data = SortedAdd(data); + data = SortedInsert(data); + data = SortedMove(data); + data = SortedRemove(data); + data = CheckFilter(data); + data = FilteredAdd(data); + data = CalculateIndexes(data); + data = FilteredNone(data); + data = FilteredInsert(data); + data = FilteredMove(data); + data = FilteredRemove(data); + return data; + }) + .Do(_ => + { + if (ManualProcessing) + { + if (signalOriginalSourceCompletion) + { + signalOriginalSourceCompletion = false; + originalSourceCompleted.OnNext(Unit.Default); + } + } + else + signalNeedData.OnNext(Unit.Default); + }) + .Where(data => data.Item != null) .Select(data => data.Item) .Publish() .RefCount(); @@ -186,6 +248,27 @@ public Func, bool> Filter } } + /// + /// Set a comparer that determines whether the item being processed is newer than the same + /// item seen before. This is to prevent stale items from overriding newer items when data + /// is coming simultaneously from cache and from live data. Use a timestamp-like comparison + /// for best results + /// + /// The comparer method for sorting, or null if not sorting + public Func NewerComparer + { + get + { + return newer; + } + set + { + if (disposed) + throw new ObjectDisposedException("TrackingCollection"); + newer = value; + } + } + public IDisposable Subscribe() { if (source == null) @@ -193,6 +276,7 @@ public IDisposable Subscribe() if (disposed) throw new ObjectDisposedException("TrackingCollection"); disposables.Add(source.Subscribe()); + StartQueue(); return this; } @@ -203,6 +287,7 @@ public IDisposable Subscribe(Action onNext, Action onCompleted) if (disposed) throw new ObjectDisposedException("TrackingCollection"); disposables.Add(source.Subscribe(onNext, onCompleted)); + StartQueue(); return this; } @@ -210,14 +295,28 @@ public void AddItem(T item) { if (disposed) throw new ObjectDisposedException("TrackingCollection"); - queue.Enqueue(new ActionData(item)); + + if (ManualProcessing) + dataListener.OnNext(new ActionData(item)); + else + { + cache.Enqueue(new ActionData(item)); + signalHaveData.OnNext(Unit.Default); + } } public void RemoveItem(T item) { if (disposed) throw new ObjectDisposedException("TrackingCollection"); - queue.Enqueue(new ActionData(TheAction.Remove, item)); + + if (ManualProcessing) + dataListener.OnNext(new ActionData(TheAction.Remove, item)); + else + { + cache.Enqueue(new ActionData(TheAction.Remove, item)); + signalHaveData.OnNext(Unit.Default); + } } void SetAndRecalculateSort(Func theComparer) @@ -240,19 +339,11 @@ void SetAndRecalculateFilter(Func, bool> newFilter) #region Source pipeline processing - ActionData CheckFilter(ActionData data) - { - var isIncluded = true; - if (data.TheAction == TheAction.Remove) - isIncluded = false; - else if (filter != null) - isIncluded = filter(data.Item, data.Position, this); - return new ActionData(data, isIncluded); - } - int StartQueue() { - disposables.Add(sourceQueue.Subscribe()); + disposables.Add(cachePump.Connect()); + disposables.Add(dataPump.Connect()); + signalNeedData.OnNext(Unit.Default); return 0; } @@ -261,7 +352,7 @@ ActionData GetFromQueue() try { ActionData d = ActionData.Default; - if (queue?.TryDequeue(out d) ?? false) + if (cache?.TryDequeue(out d) ?? false) return d; } catch { } @@ -281,6 +372,13 @@ ActionData ProcessItem(ActionData data, List list) if (idx >= 0) { var old = list[idx]; + if (newer != null) + { + // the object is "older" than the one we have, ignore it + if (newer(item, old) > 0) + return new ActionData(TheAction.Ignore, list, item, null, idx, idx); + } + var comparison = comparer(item, old); // no sorting to be done, just replacing the element in-place @@ -359,10 +457,20 @@ ActionData SortedRemove(ActionData data) // unfiltered list update sortedIndexCache.Remove(data.Item); UpdateIndexCache(data.List.Count - 1, data.OldPosition, data.List, sortedIndexCache); - original.Remove(data.Item); + data.List.Remove(data.Item); return data; } + ActionData CheckFilter(ActionData data) + { + var isIncluded = true; + if (data.TheAction == TheAction.Remove) + isIncluded = false; + else if (filter != null) + isIncluded = filter(data.Item, data.Position, this); + return new ActionData(data, isIncluded); + } + ActionData FilteredAdd(ActionData data) { if (data.TheAction != TheAction.Add) @@ -510,17 +618,20 @@ ActionData FilteredRemove(ActionData data) /// so that the average time between an item being processed /// is +- the requested processing delay. /// - ActionData UpdateProcessingDelay(TimeInterval data) + TimeSpan CalculateProcessingDelay(TimeInterval interval) { - if (requestedDelay == TimeSpan.Zero) - return data.Value; - var time = data.Interval; - if (time > requestedDelay + fuzziness) - delay -= time - requestedDelay; - else if (time < requestedDelay + fuzziness) - delay += requestedDelay - time; - delay = delay < TimeSpan.Zero ? TimeSpan.Zero : delay; - return data.Value; + var delay = TimeSpan.Zero; + if (requestedDelay > TimeSpan.Zero) + { + var time = interval.Interval; + if (time > requestedDelay + fuzziness) + delay -= time - requestedDelay; + else if (time < requestedDelay + fuzziness) + delay += requestedDelay - time; + delay = delay < TimeSpan.Zero ? TimeSpan.Zero : delay; + } + + return delay; } #endregion @@ -693,7 +804,7 @@ void InternalInsertItem(T item, int position) protected override void InsertItem(int index, T item) { -#if DEBUG +#if DEBUG && !DISABLE_REACTIVEUI if (Splat.ModeDetector.InDesignMode() && !isChanging) { base.InsertItem(index, item); @@ -729,7 +840,7 @@ void InternalRemoveItem(T item) protected override void RemoveItem(int index) { -#if DEBUG +#if DEBUG && !DISABLE_REACTIVEUI if (Splat.ModeDetector.InDesignMode() && !isChanging) { base.RemoveItem(index); @@ -757,7 +868,7 @@ void InternalMoveItem(int positionFrom, int positionTo) protected override void MoveItem(int oldIndex, int newIndex) { -#if DEBUG +#if DEBUG && !DISABLE_REACTIVEUI if (Splat.ModeDetector.InDesignMode() && !isChanging) { base.MoveItem(oldIndex, newIndex); @@ -808,7 +919,8 @@ int GetIndexUnfiltered(T item) { ret = original.IndexOf(item); if (ret >= 0) - sortedIndexCache.Add(item, ret); + sortedIndexCache.Add(original[ret], ret); + } return ret; } @@ -879,6 +991,27 @@ static IScheduler GetScheduler(IScheduler scheduler) } #endif + void Reset() + { + if (resetting) + return; + + resetting = true; + + disposables.Clear(); + originalSourceIsCompleted = false; + signalOriginalSourceCompletion = false; + cache = new ConcurrentQueue(); + dataListener = new Subject(); + disposables.Add(dataListener); + signalHaveData = new Subject(); + disposables.Add(signalHaveData); + signalNeedData = new Subject(); + disposables.Add(signalNeedData); + + resetting = false; + } + bool disposed = false; void Dispose(bool disposing) { @@ -887,8 +1020,8 @@ void Dispose(bool disposing) if (!disposed) { disposed = true; - queue = null; disposables.Dispose(); + cache = null; } } } diff --git a/src/GitHub.Exports.Reactive/GitHub.Exports.Reactive.csproj b/src/GitHub.Exports.Reactive/GitHub.Exports.Reactive.csproj index 3f7d374e3b..83038d049c 100644 --- a/src/GitHub.Exports.Reactive/GitHub.Exports.Reactive.csproj +++ b/src/GitHub.Exports.Reactive/GitHub.Exports.Reactive.csproj @@ -89,6 +89,7 @@ + diff --git a/src/GitHub.Exports.Reactive/GlobalSuppressions.cs b/src/GitHub.Exports.Reactive/GlobalSuppressions.cs new file mode 100644 index 0000000000..2538500fd8 Binary files /dev/null and b/src/GitHub.Exports.Reactive/GlobalSuppressions.cs differ diff --git a/src/TrackingCollectionTests/TrackingCollectionTests.cs b/src/TrackingCollectionTests/TrackingCollectionTests.cs index 8497813f83..b380e5b830 100644 --- a/src/TrackingCollectionTests/TrackingCollectionTests.cs +++ b/src/TrackingCollectionTests/TrackingCollectionTests.cs @@ -10,23 +10,27 @@ using System.Reactive.Subjects; using System.Threading; using NUnit.Framework; +using System.Reactive; [TestFixture] public class TrackingTests : TestBase { +#if !DISABLE_REACTIVE_UI [TestFixtureSetUp] public void Setup() { Splat.ModeDetector.Current.SetInUnitTestRunner(true); } +#endif [Test] public void OrderByUpdatedNoFilter() { var count = 6; - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( Observable.Never(), OrderedComparer.OrderBy(x => x.UpdatedAt).Compare); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); @@ -69,10 +73,11 @@ public void OrderByUpdatedNoFilter() public void OrderByUpdatedFilter() { var count = 3; - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( Observable.Never(), OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, - (item, position, list) => true); + (item, position, list) => true, + OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); col.ProcessingDelay = TimeSpan.Zero; var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); @@ -118,10 +123,11 @@ public void OnlyIndexes2To4() var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( Observable.Never(), OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, (item, position, list) => position >= 2 && position <= 4); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var evt = new ManualResetEvent(false); @@ -142,7 +148,7 @@ public void OnlyIndexes2To4() Assert.AreEqual(3, col.Count); #if DEBUG - CollectionAssert.AreEqual(list1.Reverse(), col.DebugInternalList); + CollectionAssert.AreEqual(list1.Reverse(), (col as TrackingCollection).DebugInternalList); #endif CollectionAssert.AreEqual(col, new List() { list1[3], list1[2], list1[1] }); @@ -157,10 +163,11 @@ public void OnlyTimesEqualOrHigherThan3Minutes() var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( Observable.Never(), OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, (item, position, list) => item.UpdatedAt >= Now + TimeSpan.FromMinutes(3) && item.UpdatedAt <= Now + TimeSpan.FromMinutes(5)); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var evt = new ManualResetEvent(false); @@ -181,7 +188,7 @@ public void OnlyTimesEqualOrHigherThan3Minutes() Assert.AreEqual(3, col.Count); #if DEBUG - CollectionAssert.AreEqual(list1.Reverse(), col.DebugInternalList); + CollectionAssert.AreEqual(list1.Reverse(), (col as TrackingCollection).DebugInternalList); #endif CollectionAssert.AreEqual(col, new List() { list1[2], list1[1], list1[0] }); col.Dispose(); @@ -195,9 +202,10 @@ public void OrderByDescendingNoFilter() var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); var list2 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, i, "Run 2")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( Observable.Never(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var evt = new ManualResetEvent(false); @@ -217,7 +225,7 @@ public void OrderByDescendingNoFilter() Assert.AreEqual(6, col.Count); #if DEBUG - CollectionAssert.AreEqual(list1, col.DebugInternalList); + CollectionAssert.AreEqual(list1, (col as TrackingCollection).DebugInternalList); #endif CollectionAssert.AreEqual(col, list1); @@ -243,9 +251,10 @@ public void OrderByDescendingNoFilter1000() var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); var list2 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 2")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( Observable.Never(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var evt = new ManualResetEvent(false); @@ -285,74 +294,24 @@ public void OrderByDescendingNoFilter1000() } - [Test, Category("Timings")] - public void ProcessingDelayPingsRegularly() - { - int count, total; - count = total = 400; - - var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i)).ToList()); - - var col = new TrackingCollection( - list1.ToObservable().Delay(TimeSpan.FromMilliseconds(10)), - OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); - col.ProcessingDelay = TimeSpan.FromMilliseconds(10); - - var sub = new Subject(); - var times = new List(); - sub.Subscribe(t => - { - times.Add(DateTimeOffset.UtcNow); - }); - - count = 0; - - var evt = new ManualResetEvent(false); - col.Subscribe(t => - { - sub.OnNext(t); - if (++count == list1.Count) - { - sub.OnCompleted(); - evt.Set(); - } - }, () => { }); - - - evt.WaitOne(); - evt.Reset(); - - Assert.AreEqual(total, col.Count); - - CollectionAssert.AreEqual(col, list1); - - long totalTime = 0; - - for (var j = 1; j < times.Count; j++) - totalTime += (times[j] - times[j - 1]).Ticks; - var avg = TimeSpan.FromTicks(totalTime / times.Count).TotalMilliseconds; - Assert.GreaterOrEqual(avg, 9); - Assert.LessOrEqual(avg, 12); - col.Dispose(); - } [Test] public void NotInitializedCorrectlyThrows1() { - var col = new TrackingCollection(OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + ITrackingCollection col = new TrackingCollection(OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); Assert.Throws(() => col.Subscribe()); } [Test] public void NotInitializedCorrectlyThrows2() { - var col = new TrackingCollection(OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + ITrackingCollection col = new TrackingCollection(OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); Assert.Throws(() => col.Subscribe(_ => { }, () => { })); } [Test] public void NoChangingAfterDisposed1() { - var col = new TrackingCollection(Observable.Never(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + ITrackingCollection col = new TrackingCollection(Observable.Never(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); col.Dispose(); Assert.Throws(() => col.AddItem(new Thing())); } @@ -360,7 +319,7 @@ public void NoChangingAfterDisposed1() [Test] public void NoChangingAfterDisposed2() { - var col = new TrackingCollection(Observable.Never(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); + ITrackingCollection col = new TrackingCollection(Observable.Never(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); col.Dispose(); Assert.Throws(() => col.RemoveItem(new Thing())); } @@ -374,10 +333,11 @@ public void FilterTitleRun2() var list1 = new List(Enumerable.Range(1, total).Select(i => GetThing(i, i, i, "Run 1")).ToList()); var list2 = new List(Enumerable.Range(1, total).Select(i => GetThing(i, i, i, "Run 2")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( list1.ToObservable(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => item.Title.Equals("Run 2")); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var evt = new ManualResetEvent(false); @@ -418,14 +378,16 @@ public void OrderByDoesntMatchOriginalOrderTimings() var list1 = new List(Enumerable.Range(1, total).Select(i => GetThing(i, i, i, "Run 1")).ToList()); var list2 = new List(Enumerable.Range(1, total).Select(i => GetThing(i, i, i, "Run 2")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( list1.ToObservable(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => item.Title.Equals("Run 2")); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; var evt = new ManualResetEvent(false); var start = DateTimeOffset.UtcNow; + col.Subscribe(t => { if (++count == list1.Count) @@ -468,10 +430,11 @@ public void OrderByMatchesOriginalOrder() var list1 = new List(Enumerable.Range(1, total).Select(i => GetThing(i, i, total - i, "Run 1")).ToList()); var list2 = new List(Enumerable.Range(1, total).Select(i => GetThing(i, i, total - i, "Run 2")).ToList()); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( list1.ToObservable(), OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => item.Title.Equals("Run 2")); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; col.ProcessingDelay = TimeSpan.Zero; count = 0; @@ -509,7 +472,7 @@ public void SortingTest() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare); col.ProcessingDelay = TimeSpan.Zero; @@ -703,7 +666,7 @@ public void SortingTestWithFilterTrue() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => true); @@ -891,7 +854,7 @@ public void SortingTestWithFilterBetween6And12() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => item.UpdatedAt.Minute >= 6 && item.UpdatedAt.Minute <= 12); @@ -1045,7 +1008,7 @@ public void SortingTestWithFilterPosition2to4() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => position >= 2 && position <= 4); @@ -1192,7 +1155,7 @@ public void SortingTestWithFilterPosition1And3to4() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare, (item, position, list) => position == 1 || (position >= 3 && position <= 4)); @@ -1355,11 +1318,11 @@ public void SortingTestWithFilterMoves() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, - (item, position, list) => (position >= 1 && position <= 2) || (position >= 5 && position <= 7)) - { ProcessingDelay = TimeSpan.Zero }; + (item, position, list) => (position >= 1 && position <= 2) || (position >= 5 && position <= 7)); + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var expectedCount = 0; @@ -1424,11 +1387,12 @@ public void ChangingItemContentRemovesItFromFilteredList() var source = new Subject(); var now = new DateTimeOffset(0, TimeSpan.FromTicks(0)); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderBy(x => x.CreatedAt).Compare, - (item, position, list) => item.UpdatedAt < now + TimeSpan.FromMinutes(6)) - { ProcessingDelay = TimeSpan.Zero }; + (item, position, list) => item.UpdatedAt < now + TimeSpan.FromMinutes(6)); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var expectedCount = 0; @@ -1471,11 +1435,11 @@ public void ChangingItemContentRemovesItFromFilteredList2() var source = new Subject(); var now = new DateTimeOffset(0, TimeSpan.FromTicks(0)); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderBy(x => x.CreatedAt).Compare, - (item, position, list) => item.UpdatedAt > now + TimeSpan.FromMinutes(2) && item.UpdatedAt < now + TimeSpan.FromMinutes(8)) - { ProcessingDelay = TimeSpan.Zero }; + (item, position, list) => item.UpdatedAt > now + TimeSpan.FromMinutes(2) && item.UpdatedAt < now + TimeSpan.FromMinutes(8)); + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var expectedCount = 0; @@ -1554,12 +1518,12 @@ public void ChangingItemContentRemovesItFromFilteredList2() public void ChangingFilterUpdatesCollection() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, - (item, position, list) => item.UpdatedAt < Now + TimeSpan.FromMinutes(10)) - { ProcessingDelay = TimeSpan.Zero }; - + (item, position, list) => item.UpdatedAt < Now + TimeSpan.FromMinutes(10)); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var expectedCount = 0; @@ -1613,11 +1577,12 @@ public void ChangingFilterUpdatesCollection() public void ChangingSortUpdatesCollection() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, - (item, position, list) => item.UpdatedAt < Now + TimeSpan.FromMinutes(10)) - { ProcessingDelay = TimeSpan.Zero }; + (item, position, list) => item.UpdatedAt < Now + TimeSpan.FromMinutes(10)); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var evt = new ManualResetEvent(false); @@ -1655,7 +1620,7 @@ public void ChangingSortUpdatesCollection() [Test] public void AddingItemsToCollectionManuallyThrows() { - var col = new TrackingCollection(Observable.Empty()); + ITrackingCollection col = new TrackingCollection(Observable.Empty()); Assert.Throws(() => col.Add(GetThing(1))); col.Dispose(); } @@ -1663,7 +1628,7 @@ public void AddingItemsToCollectionManuallyThrows() [Test] public void InsertingItemsIntoCollectionManuallyThrows() { - var col = new TrackingCollection(Observable.Empty()); + ITrackingCollection col = new TrackingCollection(Observable.Empty()); Assert.Throws(() => col.Insert(0, GetThing(1))); col.Dispose(); } @@ -1672,7 +1637,7 @@ public void InsertingItemsIntoCollectionManuallyThrows() public void MovingItemsIntoCollectionManuallyThrows() { var source = new Subject(); - var col = new TrackingCollection(source) { ProcessingDelay = TimeSpan.Zero }; + ITrackingCollection col = new TrackingCollection(source) { ProcessingDelay = TimeSpan.Zero }; var count = 0; var expectedCount = 2; var evt = new ManualResetEvent(false); @@ -1687,7 +1652,7 @@ public void MovingItemsIntoCollectionManuallyThrows() Add(source, GetThing(2, 2)); evt.WaitOne(); evt.Reset(); - Assert.Throws(() => col.Move(0, 1)); + Assert.Throws(() => (col as TrackingCollection).Move(0, 1)); col.Dispose(); } @@ -1695,7 +1660,7 @@ public void MovingItemsIntoCollectionManuallyThrows() public void RemovingItemsFromCollectionManuallyThrows() { var source = new Subject(); - var col = new TrackingCollection(source) { ProcessingDelay = TimeSpan.Zero }; + ITrackingCollection col = new TrackingCollection(source) { ProcessingDelay = TimeSpan.Zero }; var count = 0; var expectedCount = 2; var evt = new ManualResetEvent(false); @@ -1718,7 +1683,7 @@ public void RemovingItemsFromCollectionManuallyThrows() public void RemovingItemsFromCollectionManuallyThrows2() { var source = new Subject(); - var col = new TrackingCollection(source) { ProcessingDelay = TimeSpan.Zero }; + ITrackingCollection col = new TrackingCollection(source) { ProcessingDelay = TimeSpan.Zero }; var count = 0; var expectedCount = 2; var evt = new ManualResetEvent(false); @@ -1743,7 +1708,9 @@ public void ChangingComparers() { var source = new Subject(); - var col = new TrackingCollection(source, OrderedComparer.OrderBy(x => x.CreatedAt).Compare) { ProcessingDelay = TimeSpan.Zero }; + ITrackingCollection col = new TrackingCollection(source, OrderedComparer.OrderBy(x => x.CreatedAt).Compare); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var evt = new ManualResetEvent(false); @@ -1782,16 +1749,15 @@ public void Removing() { var source = new Subject(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( source, OrderedComparer.OrderBy(x => x.UpdatedAt).Compare, - (item, position, list) => (position > 2 && position < 5) || (position > 6 && position < 8)) - { ProcessingDelay = TimeSpan.Zero }; + (item, position, list) => (position > 2 && position < 5) || (position > 6 && position < 8)); + col.ProcessingDelay = TimeSpan.Zero; var count = 0; var expectedCount = 0; var evt = new ManualResetEvent(false); - col.Subscribe(t => { if (++count == expectedCount) @@ -1863,7 +1829,7 @@ public void Removing() [Test] public void DisposingThrows() { - var col = new TrackingCollection(Observable.Empty()); + ITrackingCollection col = new TrackingCollection(Observable.Empty()); col.Dispose(); Assert.Throws(() => col.Filter = null); Assert.Throws(() => col.Comparer = null); @@ -1887,16 +1853,20 @@ public void MultipleSortingAndFiltering() var titles2 = Enumerable.Range(1, expectedTotal).Select(x => ((char)('c' + x)).ToString()).ToList(); var dates2 = Enumerable.Range(1, expectedTotal).Select(x => Now + TimeSpan.FromMinutes(x)).ToList(); - - var idstack2 = new Stack(Enumerable.Range(1, expectedTotal).OrderBy(rnd.Next)); - var datestack2 = new Stack(new List() { + var dates2mixed = new List() { dates2[2], dates2[0], dates2[1], dates2[3], dates2[5], dates2[9], dates2[15], dates2[6], dates2[7], dates2[8], dates2[13], dates2[10], dates2[16], dates2[11], dates2[12], dates2[14], dates2[17], dates2[18], dates2[19], dates2[4], - }); + }; + var idstack2 = new Stack(Enumerable.Range(1, expectedTotal).OrderBy(rnd.Next)); + var datestack2 = new Stack(dates2mixed); var titlestack2 = new Stack(titles2.OrderBy(_ => rnd.Next())); + var datestack3 = new Stack(); + for (int i = 0; i < datestack1.Count; i++) + datestack3.Push(new DateTimeOffset(Math.Max(dates1[i].Ticks, dates2mixed[i].Ticks), TimeSpan.Zero)); + var list1 = Observable.Defer(() => Enumerable.Range(1, expectedTotal) .OrderBy(rnd.Next) .Select(x => new Thing(idstack1.Pop(), titlestack1.Pop(), datestack1.Pop())) @@ -1911,51 +1881,74 @@ public void MultipleSortingAndFiltering() .Replay() .RefCount(); - var col = new TrackingCollection( + ITrackingCollection col = new TrackingCollection( list1.Concat(list2), OrderedComparer.OrderByDescending(x => x.CreatedAt).Compare, (item, idx, list) => idx < 5 - ) - { ProcessingDelay = TimeSpan.Zero }; - - var count = 0; - var evt = new ManualResetEvent(false); - col.Subscribe(t => - { - if (++count == expectedTotal * 2) - evt.Set(); - }, () => { }); + ); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; + col.Subscribe(); - evt.WaitOne(); - evt.Reset(); + col.OriginalCompleted.Wait(); // it's initially sorted by date, so id list should not match CollectionAssert.AreNotEqual(list1.Select(x => x.Number).ToEnumerable(), list2.Select(x => x.Number).ToEnumerable()); - var sortlist = list1.ToEnumerable().ToArray(); + var sortlist = col.ToArray(); Array.Sort(sortlist, new LambdaComparer(OrderedComparer.OrderByDescending(x => x.CreatedAt).Compare)); CollectionAssert.AreEqual(sortlist.Take(5), col); col.Comparer = OrderedComparer.OrderBy(x => x.Number).Compare; - sortlist = list1.ToEnumerable().ToArray(); + sortlist = col.ToArray(); Array.Sort(sortlist, new LambdaComparer(OrderedComparer.OrderBy(x => x.Number).Compare)); CollectionAssert.AreEqual(sortlist.Take(5), col); col.Comparer = OrderedComparer.OrderBy(x => x.CreatedAt).Compare; - sortlist = list1.ToEnumerable().ToArray(); + sortlist = col.ToArray(); Array.Sort(sortlist, new LambdaComparer(OrderedComparer.OrderBy(x => x.CreatedAt).Compare)); CollectionAssert.AreEqual(sortlist.Take(5), col); col.Comparer = OrderedComparer.OrderByDescending(x => x.Title).Compare; - sortlist = list1.ToEnumerable().ToArray(); + sortlist = col.ToArray(); Array.Sort(sortlist, new LambdaComparer(OrderedComparer.OrderByDescending(x => x.Title).Compare)); CollectionAssert.AreEqual(sortlist.Take(5), col); col.Comparer = OrderedComparer.OrderBy(x => x.Title).Compare; - sortlist = list1.ToEnumerable().ToArray(); + sortlist = col.ToArray(); Array.Sort(sortlist, new LambdaComparer(OrderedComparer.OrderBy(x => x.Title).Compare)); CollectionAssert.AreEqual(sortlist.Take(5), col); col.Dispose(); } + + [Test] + public void ListeningTwiceWorks() + { + var count = 10; + ITrackingCollection col = new TrackingCollection(); + col.NewerComparer = OrderedComparer.OrderByDescending(x => x.UpdatedAt).Compare; + col.ProcessingDelay = TimeSpan.Zero; + + var list1 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, count - i, "Run 1")).ToList()); + var list2 = new List(Enumerable.Range(1, count).Select(i => GetThing(i, i, i + count, "Run 2")).ToList()); + + var subj = new ReplaySubject(); + subj.OnNext(Unit.Default); + var disp = col.OriginalCompleted.Subscribe(x => subj.OnCompleted()); + col.Listen(list1.ToObservable()); + col.Subscribe(); + subj.Wait(); + + disp.Dispose(); + col.Listen(list2.ToObservable()); + subj = new ReplaySubject(); + subj.OnNext(Unit.Default); + disp = col.OriginalCompleted.Subscribe(x => subj.OnCompleted()); + col.Subscribe(); + subj.Wait(); + disp.Dispose(); + + CollectionAssert.AreEqual(list2, col); + } } diff --git a/src/UnitTests/GitHub.App/Models/ModelServiceTests.cs b/src/UnitTests/GitHub.App/Models/ModelServiceTests.cs index 77e69d9846..a7a966ca51 100644 --- a/src/UnitTests/GitHub.App/Models/ModelServiceTests.cs +++ b/src/UnitTests/GitHub.App/Models/ModelServiceTests.cs @@ -13,6 +13,7 @@ using Octokit; using Xunit; using System.Globalization; +using System.Reactive.Subjects; using System.Threading; using GitHub.Models; using GitHub.Primitives; @@ -441,20 +442,25 @@ public async Task NonExpiredIndexReturnsCache() apiClient.GetPullRequestsForRepository(user.Login, repo.Name).Returns(prlive); await modelService.InsertUser(new AccountCacheItem(user)); - var col = modelService.GetPullRequests(repo); + + ITrackingCollection col = new TrackingCollection(); + col = modelService.GetPullRequests(repo, col); col.ProcessingDelay = TimeSpan.Zero; var count = 0; - var evt = new ManualResetEvent(false); + var done = new Subject(); + done.Subscribe(); + col.Subscribe(t => { if (++count == expected) - evt.Set(); + { + done.OnNext(Unit.Default); + done.OnCompleted(); + } }, () => { }); - - evt.WaitOne(); - evt.Reset(); + await done; Assert.Collection(col, col.Select(x => new Action(t => Assert.True(x.Title.StartsWith("Cache")))).ToArray()); } @@ -503,20 +509,25 @@ public async Task ExpiredIndexReturnsLive() apiClient.GetPullRequestsForRepository(user.Login, repo.Name).Returns(prlive); await modelService.InsertUser(new AccountCacheItem(user)); - var col = modelService.GetPullRequests(repo); + + ITrackingCollection col = new TrackingCollection(); + col = modelService.GetPullRequests(repo, col); col.ProcessingDelay = TimeSpan.Zero; var count = 0; - var evt = new ManualResetEvent(false); + var done = new Subject(); + done.Subscribe(); + col.Subscribe(t => { if (++count == expected * 2) - evt.Set(); + { + done.OnNext(Unit.Default); + done.OnCompleted(); + } }, () => { }); - - evt.WaitOne(); - evt.Reset(); + await done; Assert.Collection(col, col.Select(x => new Action(t => Assert.True(x.Title.StartsWith("Live")))).ToArray()); } @@ -565,23 +576,28 @@ public async Task ExpiredIndexClearsItems() apiClient.GetPullRequestsForRepository(user.Login, repo.Name).Returns(prlive); await modelService.InsertUser(new AccountCacheItem(user)); - var col = modelService.GetPullRequests(repo); + + ITrackingCollection col = new TrackingCollection(); + col = modelService.GetPullRequests(repo, col); col.ProcessingDelay = TimeSpan.Zero; var count = 0; - var evt = new ManualResetEvent(false); + var done = new Subject(); + done.Subscribe(); + col.Subscribe(t => { // we get all the items from the cache (items 1-5), all the items from the live (items 5-9), // and 4 deletions (items 1-4) because the cache expired the items that were not // a part of the live data if (++count == 14) - evt.Set(); + { + done.OnNext(Unit.Default); + done.OnCompleted(); + } }, () => { }); - - evt.WaitOne(); - evt.Reset(); + await done; Assert.Equal(5, col.Count); Assert.Collection(col, diff --git a/submodules/akavache b/submodules/akavache index ba21a0f71f..056ec2af17 160000 --- a/submodules/akavache +++ b/submodules/akavache @@ -1 +1 @@ -Subproject commit ba21a0f71f4b3b6c5261be8b5eb5ce2c0cc3219c +Subproject commit 056ec2af1755be7672fda176ad6c697ee22e579b