1515using System . Reactive . Linq ;
1616using System . Reactive . Subjects ;
1717using System . Threading ;
18- using System . Threading . Tasks ;
1918
2019namespace GitHub . Collections
2120{
@@ -30,15 +29,16 @@ namespace GitHub.Collections
3029 /// </summary>
3130 /// <typeparam name="T"></typeparam>
3231 public class TrackingCollection < T > : ObservableCollection < T > , ITrackingCollection < T > , IDisposable
33- where T : class , ICopyable < T > , IComparable < T >
32+ where T : class , ICopyable < T >
3433 {
3534 enum TheAction
3635 {
3736 None ,
3837 Move ,
3938 Add ,
4039 Insert ,
41- Remove
40+ Remove ,
41+ Ignore
4242 }
4343
4444 bool isChanging ;
@@ -83,10 +83,7 @@ enum TheAction
8383 public TimeSpan ProcessingDelay
8484 {
8585 get { return requestedDelay ; }
86- set
87- {
88- requestedDelay = value ;
89- }
86+ set { requestedDelay = value ; }
9087 }
9188
9289 bool ManualProcessing => cache . IsEmpty && originalSourceIsCompleted ;
@@ -105,7 +102,7 @@ public TrackingCollection(Func<T, T, int> comparer = null, Func<T, int, IList<T>
105102#endif
106103 this . comparer = comparer ?? Comparer < T > . Default . Compare ;
107104 this . filter = filter ;
108- this . newer = newer ?? Comparer < T > . Default . Compare ;
105+ this . newer = newer ;
109106 }
110107
111108 public TrackingCollection ( IObservable < T > source ,
@@ -132,8 +129,12 @@ public IObservable<T> Listen(IObservable<T> obs)
132129
133130 Reset ( ) ;
134131
132+ // ManualResetEvent uses the realtime clock for accurate <50ms delays
135133 var waitHandle = new ManualResetEventSlim ( ) ;
136134
135+ // empty the source observable as fast as possible
136+ // to the cache queue, and signal that data is available
137+ // for processing
137138 dataPump = obs
138139 . Do ( data =>
139140 {
@@ -147,7 +148,6 @@ public IObservable<T> Listen(IObservable<T> obs)
147148 } )
148149 . Publish ( ) ;
149150
150-
151151 // when both signalHaveData and signalNeedData produce a value, dataListener gets a value
152152 // this will empty the queue of items that have been cached in regular intervals according
153153 // to the requested delay
@@ -167,7 +167,8 @@ public IObservable<T> Listen(IObservable<T> obs)
167167 source = dataListener
168168 . Where ( data => data . Item != null )
169169 . ObserveOn ( scheduler )
170- . Select ( data => {
170+ . Select ( data =>
171+ {
171172 data = ProcessItem ( data , original ) ;
172173
173174 // if we're removing an item that doesn't exist, ignore it
@@ -197,7 +198,6 @@ public IObservable<T> Listen(IObservable<T> obs)
197198 signalOriginalSourceCompletion = false ;
198199 originalSourceCompleted . OnNext ( Unit . Default ) ;
199200 }
200-
201201 }
202202 else
203203 signalNeedData . OnNext ( Unit . Default ) ;
@@ -339,16 +339,6 @@ void SetAndRecalculateFilter(Func<T, int, IList<T>, bool> newFilter)
339339
340340 #region Source pipeline processing
341341
342- ActionData CheckFilter ( ActionData data )
343- {
344- var isIncluded = true ;
345- if ( data . TheAction == TheAction . Remove )
346- isIncluded = false ;
347- else if ( filter != null )
348- isIncluded = filter ( data . Item , data . Position , this ) ;
349- return new ActionData ( data , isIncluded ) ;
350- }
351-
352342 int StartQueue ( )
353343 {
354344 disposables . Add ( cachePump . Connect ( ) ) ;
@@ -382,11 +372,12 @@ ActionData ProcessItem(ActionData data, List<T> list)
382372 if ( idx >= 0 )
383373 {
384374 var old = list [ idx ] ;
385- var isNewer = newer ( item , old ) ;
386-
387- // the object is "older" than the one we have, ignore it
388- if ( isNewer > 0 )
389- ret = new ActionData ( TheAction . None , list , item , null , idx , idx ) ;
375+ if ( newer != null )
376+ {
377+ // the object is "older" than the one we have, ignore it
378+ if ( newer ( item , old ) > 0 )
379+ return new ActionData ( TheAction . Ignore , list , item , null , idx , idx ) ;
380+ }
390381
391382 var comparison = comparer ( item , old ) ;
392383
@@ -466,10 +457,20 @@ ActionData SortedRemove(ActionData data)
466457 // unfiltered list update
467458 sortedIndexCache . Remove ( data . Item ) ;
468459 UpdateIndexCache ( data . List . Count - 1 , data . OldPosition , data . List , sortedIndexCache ) ;
469- original . Remove ( data . Item ) ;
460+ data . List . Remove ( data . Item ) ;
470461 return data ;
471462 }
472463
464+ ActionData CheckFilter ( ActionData data )
465+ {
466+ var isIncluded = true ;
467+ if ( data . TheAction == TheAction . Remove )
468+ isIncluded = false ;
469+ else if ( filter != null )
470+ isIncluded = filter ( data . Item , data . Position , this ) ;
471+ return new ActionData ( data , isIncluded ) ;
472+ }
473+
473474 ActionData FilteredAdd ( ActionData data )
474475 {
475476 if ( data . TheAction != TheAction . Add )
0 commit comments