@@ -106,32 +106,32 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
106106 }
107107 return f .blockLogs (ctx , header )
108108 }
109- // Short-cut if all we care about is pending logs
110- if f .begin == rpc .PendingBlockNumber .Int64 () {
111- if f .end != rpc .PendingBlockNumber .Int64 () {
112- return nil , errors .New ("invalid block range" )
113- }
114- return f .pendingLogs ()
115- }
116- // Figure out the limits of the filter range
117- header , _ := f .sys .backend .HeaderByNumber (ctx , rpc .LatestBlockNumber )
118- if header == nil {
119- return nil , nil
120- }
109+
121110 var (
122- err error
123- head = header .Number .Int64 ()
124- pending = f .end == rpc .PendingBlockNumber .Int64 ()
111+ beginPending = f .begin == rpc .PendingBlockNumber .Int64 ()
112+ endPending = f .end == rpc .PendingBlockNumber .Int64 ()
125113 )
114+
115+ // special case for pending logs
116+ if beginPending && ! endPending {
117+ return nil , errors .New ("invalid block range" )
118+ }
119+
120+ // Short-cut if all we care about is pending logs
121+ if beginPending && endPending {
122+ return f .pendingLogs (), nil
123+ }
124+
126125 resolveSpecial := func (number int64 ) (int64 , error ) {
127126 var hdr * types.Header
128127 switch number {
129- case rpc .LatestBlockNumber .Int64 ():
130- return head , nil
131- case rpc .PendingBlockNumber .Int64 ():
128+ case rpc .LatestBlockNumber .Int64 (), rpc .PendingBlockNumber .Int64 ():
132129 // we should return head here since we've already captured
133130 // that we need to get the pending logs in the pending boolean above
134- return head , nil
131+ hdr , _ = f .sys .backend .HeaderByNumber (ctx , rpc .LatestBlockNumber )
132+ if hdr == nil {
133+ return 0 , errors .New ("latest header not found" )
134+ }
135135 case rpc .FinalizedBlockNumber .Int64 ():
136136 hdr , _ = f .sys .backend .HeaderByNumber (ctx , rpc .FinalizedBlockNumber )
137137 if hdr == nil {
@@ -147,57 +147,92 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
147147 }
148148 return hdr .Number .Int64 (), nil
149149 }
150+
151+ var err error
152+ // range query need to resolve the special begin/end block number
150153 if f .begin , err = resolveSpecial (f .begin ); err != nil {
151154 return nil , err
152155 }
153156 if f .end , err = resolveSpecial (f .end ); err != nil {
154157 return nil , err
155158 }
156- // Gather all indexed logs, and finish with non indexed ones
159+
160+ logChan , errChan := f .rangeLogsAsync (ctx )
161+ var logs []* types.Log
162+ for {
163+ select {
164+ case log := <- logChan :
165+ logs = append (logs , log )
166+ case err := <- errChan :
167+ if err != nil {
168+ // if an error occurs during extraction, we do return the extracted data
169+ return logs , err
170+ }
171+ // Append the pending ones
172+ if endPending {
173+ pendingLogs := f .pendingLogs ()
174+ logs = append (logs , pendingLogs ... )
175+ }
176+ return logs , nil
177+ }
178+ }
179+ }
180+
181+ // rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously,
182+ // it creates and returns two channels: one for delivering log data, and one for reporting errors.
183+ func (f * Filter ) rangeLogsAsync (ctx context.Context ) (chan * types.Log , chan error ) {
157184 var (
158- logs []* types.Log
159- end = uint64 (f .end )
160- size , sections = f .sys .backend .BloomStatus ()
185+ logChan = make (chan * types.Log )
186+ errChan = make (chan error )
161187 )
162- if indexed := sections * size ; indexed > uint64 (f .begin ) {
163- if indexed > end {
164- logs , err = f .indexedLogs (ctx , end )
165- } else {
166- logs , err = f .indexedLogs (ctx , indexed - 1 )
167- }
168- if err != nil {
169- return logs , err
188+
189+ go func () {
190+ defer func () {
191+ close (errChan )
192+ close (logChan )
193+ }()
194+
195+ // Gather all indexed logs, and finish with non indexed ones
196+ var (
197+ end = uint64 (f .end )
198+ size , sections = f .sys .backend .BloomStatus ()
199+ err error
200+ )
201+ if indexed := sections * size ; indexed > uint64 (f .begin ) {
202+ if indexed > end {
203+ indexed = end + 1
204+ }
205+ if err = f .indexedLogs (ctx , indexed - 1 , logChan ); err != nil {
206+ errChan <- err
207+ return
208+ }
170209 }
171- }
172- rest , err := f .unindexedLogs (ctx , end )
173- logs = append (logs , rest ... )
174- if pending {
175- pendingLogs , err := f .pendingLogs ()
176- if err != nil {
177- return nil , err
210+
211+ if err := f .unindexedLogs (ctx , end , logChan ); err != nil {
212+ errChan <- err
213+ return
178214 }
179- logs = append (logs , pendingLogs ... )
180- }
181- return logs , err
215+
216+ errChan <- nil
217+ }()
218+
219+ return logChan , errChan
182220}
183221
184222// indexedLogs returns the logs matching the filter criteria based on the bloom
185223// bits indexed available locally or via the network.
186- func (f * Filter ) indexedLogs (ctx context.Context , end uint64 ) ([] * types.Log , error ) {
224+ func (f * Filter ) indexedLogs (ctx context.Context , end uint64 , logChan chan * types.Log ) error {
187225 // Create a matcher session and request servicing from the backend
188226 matches := make (chan uint64 , 64 )
189227
190228 session , err := f .matcher .Start (ctx , uint64 (f .begin ), end , matches )
191229 if err != nil {
192- return nil , err
230+ return err
193231 }
194232 defer session .Close ()
195233
196234 f .sys .backend .ServiceFilter (ctx , session )
197235
198- // Iterate over the matches until exhausted or context closed
199- var logs []* types.Log
200-
201236 for {
202237 select {
203238 case number , ok := <- matches :
@@ -207,47 +242,50 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
207242 if err == nil {
208243 f .begin = int64 (end ) + 1
209244 }
210- return logs , err
245+ return err
211246 }
212247 f .begin = int64 (number ) + 1
213248
214249 // Retrieve the suggested block and pull any truly matching logs
215250 header , err := f .sys .backend .HeaderByNumber (ctx , rpc .BlockNumber (number ))
216251 if header == nil || err != nil {
217- return logs , err
252+ return err
218253 }
219254 found , err := f .checkMatches (ctx , header )
220255 if err != nil {
221- return logs , err
256+ return err
257+ }
258+ for _ , log := range found {
259+ logChan <- log
222260 }
223- logs = append (logs , found ... )
224261
225262 case <- ctx .Done ():
226- return logs , ctx .Err ()
263+ return ctx .Err ()
227264 }
228265 }
229266}
230267
231268// unindexedLogs returns the logs matching the filter criteria based on raw block
232269// iteration and bloom matching.
233- func (f * Filter ) unindexedLogs (ctx context.Context , end uint64 ) ([]* types.Log , error ) {
234- var logs []* types.Log
235-
270+ func (f * Filter ) unindexedLogs (ctx context.Context , end uint64 , logChan chan * types.Log ) error {
236271 for ; f .begin <= int64 (end ); f .begin ++ {
237- if f .begin % 10 == 0 && ctx .Err () != nil {
238- return logs , ctx .Err ()
239- }
240272 header , err := f .sys .backend .HeaderByNumber (ctx , rpc .BlockNumber (f .begin ))
241273 if header == nil || err != nil {
242- return logs , err
274+ return err
243275 }
244276 found , err := f .blockLogs (ctx , header )
245277 if err != nil {
246- return logs , err
278+ return err
279+ }
280+ for _ , log := range found {
281+ select {
282+ case logChan <- log :
283+ case <- ctx .Done ():
284+ return ctx .Err ()
285+ }
247286 }
248- logs = append (logs , found ... )
249287 }
250- return logs , nil
288+ return nil
251289}
252290
253291// blockLogs returns the logs matching the filter criteria within a single block.
@@ -294,19 +332,19 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ
294332}
295333
296334// pendingLogs returns the logs matching the filter criteria within the pending block.
297- func (f * Filter ) pendingLogs () ( []* types.Log , error ) {
335+ func (f * Filter ) pendingLogs () []* types.Log {
298336 block , receipts := f .sys .backend .PendingBlockAndReceipts ()
299337 if block == nil {
300- return nil , errors . New ( "pending state not available" )
338+ return nil
301339 }
302340 if bloomFilter (block .Bloom (), f .addresses , f .topics ) {
303341 var unfiltered []* types.Log
304342 for _ , r := range receipts {
305343 unfiltered = append (unfiltered , r .Logs ... )
306344 }
307- return filterLogs (unfiltered , nil , nil , f .addresses , f .topics ), nil
345+ return filterLogs (unfiltered , nil , nil , f .addresses , f .topics )
308346 }
309- return nil , nil
347+ return nil
310348}
311349
312350func includes (addresses []common.Address , a common.Address ) bool {
0 commit comments