1414using System . Reflection ;
1515using System . Text . Json ;
1616using System . Text . Json . Serialization ;
17+ using static CommunityToolkit . Datasync . Client . Offline . Operations . PullOperationManager ;
1718
1819namespace CommunityToolkit . Datasync . Client . Offline . Operations ;
1920
@@ -53,61 +54,87 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
5354
5455 QueueHandler < PullResponse > databaseUpdateQueue = new ( 1 , async pullResponse =>
5556 {
56- DateTimeOffset lastSynchronization = await DeltaTokenStore . GetDeltaTokenAsync ( pullResponse . QueryId , cancellationToken ) . ConfigureAwait ( false ) ;
57- foreach ( object item in pullResponse . Items )
57+ if ( pullResponse . Items . Any ( ) )
5858 {
59- EntityMetadata metadata = EntityResolver . GetEntityMetadata ( item , pullResponse . EntityType ) ;
60- object ? originalEntity = await context . FindAsync ( pullResponse . EntityType , [ metadata . Id ] , cancellationToken ) . ConfigureAwait ( false ) ;
61-
62- if ( originalEntity is null && ! metadata . Deleted )
63- {
64- _ = context . Add ( item ) ;
65- result . IncrementAdditions ( ) ;
66- }
67- else if ( originalEntity is not null && metadata . Deleted )
59+ DateTimeOffset lastSynchronization = await DeltaTokenStore . GetDeltaTokenAsync ( pullResponse . QueryId , cancellationToken ) . ConfigureAwait ( false ) ;
60+ foreach ( object item in pullResponse . Items )
6861 {
69- _ = context . Remove ( originalEntity ) ;
70- result . IncrementDeletions ( ) ;
71- }
72- else if ( originalEntity is not null && ! metadata . Deleted )
73- {
74- // Gather properties marked with [JsonIgnore]
75- HashSet < string > ignoredProps = pullResponse . EntityType
76- . GetProperties ( BindingFlags . Public | BindingFlags . Instance )
77- . Where ( p => p . IsDefined ( typeof ( JsonIgnoreAttribute ) , inherit : true ) )
78- . Select ( p => p . Name )
79- . ToHashSet ( ) ;
80-
81- EntityEntry originalEntry = context . Entry ( originalEntity ) ;
82- EntityEntry newEntry = context . Entry ( item ) ;
83-
84- // Only copy properties that are not marked with [JsonIgnore]
85- foreach ( IProperty property in originalEntry . Metadata . GetProperties ( ) )
62+ EntityMetadata metadata = EntityResolver . GetEntityMetadata ( item , pullResponse . EntityType ) ;
63+ object ? originalEntity = await context . FindAsync ( pullResponse . EntityType , [ metadata . Id ] , cancellationToken ) . ConfigureAwait ( false ) ;
64+
65+ if ( originalEntity is null && ! metadata . Deleted )
66+ {
67+ _ = context . Add ( item ) ;
68+ result . IncrementAdditions ( ) ;
69+ }
70+ else if ( originalEntity is not null && metadata . Deleted )
71+ {
72+ _ = context . Remove ( originalEntity ) ;
73+ result . IncrementDeletions ( ) ;
74+ }
75+ else if ( originalEntity is not null && ! metadata . Deleted )
8676 {
87- if ( ! ignoredProps . Contains ( property . Name ) )
77+ // Gather properties marked with [JsonIgnore]
78+ HashSet < string > ignoredProps = pullResponse . EntityType
79+ . GetProperties ( BindingFlags . Public | BindingFlags . Instance )
80+ . Where ( p => p . IsDefined ( typeof ( JsonIgnoreAttribute ) , inherit : true ) )
81+ . Select ( p => p . Name )
82+ . ToHashSet ( ) ;
83+
84+ EntityEntry originalEntry = context . Entry ( originalEntity ) ;
85+ EntityEntry newEntry = context . Entry ( item ) ;
86+
87+ // Only copy properties that are not marked with [JsonIgnore]
88+ foreach ( IProperty property in originalEntry . Metadata . GetProperties ( ) )
8889 {
89- originalEntry . Property ( property . Name ) . CurrentValue = newEntry . Property ( property . Name ) . CurrentValue ;
90+ if ( ! ignoredProps . Contains ( property . Name ) )
91+ {
92+ originalEntry . Property ( property . Name ) . CurrentValue = newEntry . Property ( property . Name ) . CurrentValue ;
93+ }
9094 }
95+
96+ result . IncrementReplacements ( ) ;
9197 }
9298
93- result . IncrementReplacements ( ) ;
99+ if ( metadata . UpdatedAt > lastSynchronization )
100+ {
101+ lastSynchronization = metadata . UpdatedAt . Value ;
102+ bool isAdded = await DeltaTokenStore . SetDeltaTokenAsync ( pullResponse . QueryId , metadata . UpdatedAt . Value , cancellationToken ) . ConfigureAwait ( false ) ;
103+ if ( isAdded )
104+ {
105+ // Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
106+ _ = await context . SaveChangesAsync ( true , false , cancellationToken ) . ConfigureAwait ( false ) ;
107+ }
108+ }
94109 }
95110
96- if ( metadata . UpdatedAt > lastSynchronization )
111+ if ( pullOptions . SaveAfterEveryServiceRequest )
97112 {
98- lastSynchronization = metadata . UpdatedAt . Value ;
99- bool isAdded = await DeltaTokenStore . SetDeltaTokenAsync ( pullResponse . QueryId , metadata . UpdatedAt . Value , cancellationToken ) . ConfigureAwait ( false ) ;
100- if ( isAdded )
101- {
102- // Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
103- _ = await context . SaveChangesAsync ( true , false , cancellationToken ) . ConfigureAwait ( false ) ;
104- }
113+ _ = await context . SaveChangesAsync ( true , false , cancellationToken ) . ConfigureAwait ( false ) ;
105114 }
115+
116+ context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
117+ {
118+ EventType = SynchronizationEventType . ItemsCommitted ,
119+ EntityType = pullResponse . EntityType ,
120+ ItemsProcessed = pullResponse . TotalItemsProcessed ,
121+ ItemsTotal = pullResponse . TotalRequestItems ,
122+ QueryId = pullResponse . QueryId
123+ } ) ;
106124 }
107125
108- if ( pullOptions . SaveAfterEveryServiceRequest )
126+ if ( pullResponse . Completed )
109127 {
110- _ = await context . SaveChangesAsync ( true , false , cancellationToken ) . ConfigureAwait ( false ) ;
128+ context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
129+ {
130+ EventType = SynchronizationEventType . PullEnded ,
131+ EntityType = pullResponse . EntityType ,
132+ ItemsProcessed = pullResponse . TotalItemsProcessed ,
133+ ItemsTotal = pullResponse . TotalRequestItems ,
134+ QueryId = pullResponse . QueryId ,
135+ Exception = pullResponse . Exception ,
136+ ServiceResponse = pullResponse . Exception is DatasyncPullException ex ? ex . ServiceResponse : null
137+ } ) ;
111138 }
112139 } ) ;
113140
@@ -116,14 +143,34 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
116143 Uri endpoint = ExecutableOperation . MakeAbsoluteUri ( pullRequest . HttpClient . BaseAddress , pullRequest . Endpoint ) ;
117144 Uri requestUri = new UriBuilder ( endpoint ) { Query = pullRequest . QueryDescription . ToODataQueryString ( ) } . Uri ;
118145 Type pageType = typeof ( Page < > ) . MakeGenericType ( pullRequest . EntityType ) ;
146+ long itemsProcessed = 0 ;
147+ long totalCount = 0 ;
119148
120149 try
121150 {
122151 bool completed = false ;
152+ // Signal we started the pull operation.
153+ context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
154+ {
155+ EventType = SynchronizationEventType . PullStarted ,
156+ EntityType = pullRequest . EntityType ,
157+ QueryId = pullRequest . QueryId
158+ } ) ;
123159 do
124160 {
125161 Page < object > page = await GetPageAsync ( pullRequest . HttpClient , requestUri , pageType , cancellationToken ) . ConfigureAwait ( false ) ;
126- databaseUpdateQueue . Enqueue ( new PullResponse ( pullRequest . EntityType , pullRequest . QueryId , page . Items ) ) ;
162+ itemsProcessed += page . Items . Count ( ) ;
163+ totalCount = page . Count ?? totalCount ;
164+
165+ context . SendSynchronizationEvent ( new SynchronizationEventArgs ( )
166+ {
167+ EventType = SynchronizationEventType . ItemsFetched ,
168+ EntityType = pullRequest . EntityType ,
169+ ItemsProcessed = itemsProcessed ,
170+ ItemsTotal = page . Count ?? 0 ,
171+ QueryId = pullRequest . QueryId
172+ } ) ;
173+
127174 if ( ! string . IsNullOrEmpty ( page . NextLink ) )
128175 {
129176 requestUri = new UriBuilder ( endpoint ) { Query = page . NextLink } . Uri ;
@@ -132,12 +179,15 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
132179 {
133180 completed = true ;
134181 }
182+
183+ databaseUpdateQueue . Enqueue ( new PullResponse ( pullRequest . EntityType , pullRequest . QueryId , page . Items , totalCount , itemsProcessed , completed ) ) ;
135184 }
136185 while ( ! completed ) ;
137186 }
138187 catch ( DatasyncPullException ex )
139188 {
140189 result . AddFailedRequest ( requestUri , ex . ServiceResponse ) ;
190+ databaseUpdateQueue . Enqueue ( new PullResponse ( pullRequest . EntityType , pullRequest . QueryId , [ ] , totalCount , itemsProcessed , true , ex ) ) ;
141191 }
142192 } ) ;
143193
@@ -173,6 +223,8 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
173223 /// <exception cref="DatasyncPullException">Thrown on error</exception>
174224 internal async Task < Page < object > > GetPageAsync ( HttpClient client , Uri requestUri , Type pageType , CancellationToken cancellationToken = default )
175225 {
226+ PropertyInfo countPropInfo = pageType . GetProperty ( "Count" )
227+ ?? throw new DatasyncException ( $ "Page type '{ pageType . Name } ' does not have a 'Count' property") ;
176228 PropertyInfo itemsPropInfo = pageType . GetProperty ( "Items" )
177229 ?? throw new DatasyncException ( $ "Page type '{ pageType . Name } ' does not have an 'Items' property") ;
178230 PropertyInfo nextLinkPropInfo = pageType . GetProperty ( "NextLink" )
@@ -193,6 +245,7 @@ internal async Task<Page<object>> GetPageAsync(HttpClient client, Uri requestUri
193245
194246 return new Page < object > ( )
195247 {
248+ Count = ( long ? ) countPropInfo . GetValue ( result ) ,
196249 Items = ( IEnumerable < object > ) itemsPropInfo . GetValue ( result ) ! ,
197250 NextLink = ( string ? ) nextLinkPropInfo . GetValue ( result )
198251 } ;
@@ -237,6 +290,10 @@ internal static QueryDescription PrepareQueryDescription(QueryDescription source
237290 /// <param name="EntityType">The type of entity contained within the items.</param>
238291 /// <param name="QueryId">The query ID for the request.</param>
239292 /// <param name="Items">The list of items to process.</param>
293+ /// <param name="TotalRequestItems">The total number of items in the current pull request.</param>
294+ /// <param name="TotalItemsProcessed">The total number of items processed, <paramref name="Items"/> included.</param>
295+ /// <param name="Completed">If <c>true</c>, indicates that the pull request is completed.</param>
296+ /// <param name="Exception">Indicates an exception occured during fetching of data</param>
240297 [ ExcludeFromCodeCoverage ]
241- internal record PullResponse ( Type EntityType , string QueryId , IEnumerable < object > Items ) ;
298+ internal record PullResponse ( Type EntityType , string QueryId , IEnumerable < object > Items , long TotalRequestItems , long TotalItemsProcessed , bool Completed , Exception ? Exception = null ) ;
242299}
0 commit comments