@@ -138,36 +138,37 @@ static void _BGThread_RinfoFinish(RedisAI_RunInfo *rinfo) {
138
138
}
139
139
140
140
// Reports whether the DAG run described by rinfo has exceeded its timeout.
//
// NOTE: this span is a diff hunk. The comments describe the post-change ('+')
// lines; '-' lines are the pre-change implementation and the bare numbers are
// old/new line-number gutters.
//
// Returns true when RedisAI_DagTimeout(rinfo) already reports a timeout, or
// when rinfo->timeout is positive and the time elapsed since
// rinfo->queuingTime exceeds it (in that case the timeout is also recorded
// through RedisAI_DagSetTimeout). Returns false otherwise, including whenever
// rinfo->timeout is not positive: no elapsed-time check is made then.
static bool _BGThread_IsRInfoTimedOut (RedisAI_RunInfo * rinfo ) {
141
- bool timedOut = false;
142
- if (rinfo -> timeout > 0 ) {
143
- timedOut = __atomic_load_n (rinfo -> timedOut , __ATOMIC_RELAXED );
144
141
145
- if (!timedOut ) {
146
- struct timeval now , sub ;
147
- gettimeofday (& now , NULL );
148
- timersub (& now , & rinfo -> queuingTime , & sub );
149
- size_t time_msec = sub .tv_sec * 1000 + sub .tv_usec / 1000 ;
150
-
151
- if (time_msec > rinfo -> timeout ) {
152
- timedOut = true;
153
- __atomic_store_n (rinfo -> timedOut , timedOut , __ATOMIC_RELAXED );
154
- }
142
// Short-circuit when the DAG is already flagged as timed out (presumably by
// an earlier RedisAI_DagSetTimeout call - TODO confirm).
+ if (RedisAI_DagTimeout (rinfo )) {
143
+ return true;
144
+ }
145
+ if (rinfo -> timeout > 0 ) {
146
+ struct timeval now , sub ;
147
+ gettimeofday (& now , NULL );
148
+ timersub (& now , & rinfo -> queuingTime , & sub );
149
// Elapsed time since queuing, converted to milliseconds. It is compared
// directly with rinfo->timeout, so that field is presumably in milliseconds
// as well - TODO confirm.
+ size_t time_msec = sub .tv_sec * 1000 + sub .tv_usec / 1000 ;
150
+ if (time_msec > rinfo -> timeout ) {
151
// Record the timeout on the run info before reporting it to the caller.
+ RedisAI_DagSetTimeout (rinfo );
152
+ return true;
155
153
}
156
154
}
157
- return timedOut ;
155
+ return false ;
158
156
}
159
157
160
// Post-execution bookkeeping for every run info in batch_rinfo.
//
// NOTE: this span is a diff hunk. The comments describe the post-change ('+')
// lines; '-' lines are the pre-change implementation and the bare numbers are
// old/new line-number gutters.
//
// For each entry, walking from the last index down to 0, the function bumps
// rinfo->dagDeviceCompleteOpCount and atomically increments the counter that
// rinfo->dagCompleteOpCount points to. If the DAG is then device-complete,
// in error, or timed out, it is handed to _BGThread_RinfoFinish; otherwise
// its index within batch_rinfo is collected.
//
// Returns an array (created with array_new) of the collected indices, in
// descending order. The visible caller releases it with array_free.
//
// The pre-change version received run_queue_info and pushed unfinished
// entries to the front of the run queue itself; the post-change version
// leaves that reinsertion to the caller.
- static void _BGThread_ExecutionFinish (RunQueueInfo * run_queue_info , RedisAI_RunInfo * * batch_rinfo ) {
158
+ static int * _BGThread_ExecutionFinish (RedisAI_RunInfo * * batch_rinfo ) {
159
// Capacity hint equals the batch size: at most every entry is unfinished.
+ int * unfinished_rinfos_indices = array_new (int , array_len (batch_rinfo ));
161
160
for (int i = array_len (batch_rinfo ) - 1 ; i >= 0 ; i -- ) {
162
161
RedisAI_RunInfo * rinfo = batch_rinfo [i ];
163
162
rinfo -> dagDeviceCompleteOpCount += 1 ;
164
163
__atomic_add_fetch (rinfo -> dagCompleteOpCount , 1 , __ATOMIC_RELAXED );
165
- if (RedisAI_DagDeviceComplete (rinfo ) || RedisAI_DagError (rinfo )) {
164
+ if (RedisAI_DagDeviceComplete (rinfo ) || RedisAI_DagError (rinfo ) ||
165
+ RedisAI_DagTimeout (rinfo )) {
166
166
_BGThread_RinfoFinish (rinfo );
167
167
} else {
168
- queuePushFront ( run_queue_info -> run_queue , rinfo );
168
// Not finished yet: remember the index so the caller can re-queue it.
+ unfinished_rinfos_indices = array_append ( unfinished_rinfos_indices , i );
169
169
}
170
170
}
171
+ return unfinished_rinfos_indices ;
171
172
}
172
173
173
174
static void _BGThread_Execute (RunQueueInfo * run_queue_info , RedisAI_RunInfo * * batch_rinfo ) {
@@ -298,6 +299,34 @@ static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
298
299
return batch_rinfo ;
299
300
}
300
301
302
// Decides whether rinfo's current op can be executed now and, if so, builds
// the batch to execute in *batch_rinfo.
//
// NOTE: this span is a diff hunk made only of added ('+') lines; the bare
// numbers are new-file line-number gutters.
//
// run_queue_info: owner of the run queue that not-ready items are pushed back to.
// rinfo:          the run info whose current op is being considered.
// batch_rinfo:    in/out pointer to the batch array. It is reassigned because
//                 array_append and _BGThread_BatchOperations return the
//                 (possibly new) array.
//
// Returns true when the op is ready and, for a batchable op, the batch is
// ready as well. Returns false after pushing rinfo (op not ready) or every
// entry of *batch_rinfo (batch not ready) back onto the run queue with
// queuePush. On the batch-not-ready path *batch_rinfo is left populated here.
//
// NOTE(review): the visible caller releases run_queue_mutex only after this
// call, so the queue operations here presumably run with that mutex held -
// confirm.
+ static bool _BGThread_PrepareExecution (RunQueueInfo * run_queue_info , RedisAI_RunInfo * rinfo ,
303
+ RedisAI_RunInfo * * * batch_rinfo ) {
304
+ // Query whether the current operation is ready and batchable
305
+ bool currentOpReady , currentOpBatchable ;
306
+ RedisAI_DagCurrentOpInfo (rinfo , & currentOpReady , & currentOpBatchable );
307
+ if (currentOpReady ) {
308
+ * batch_rinfo = array_append (* batch_rinfo , rinfo );
309
+ } else {
310
+ // Op is not ready - push back to queue and continue the loop.
311
+ queuePush (run_queue_info -> run_queue , rinfo );
312
+ return false;
313
+ }
314
+
315
+ if (currentOpBatchable ) {
316
+ bool batchReady = true;
317
+ * batch_rinfo = _BGThread_BatchOperations (run_queue_info , rinfo , * batch_rinfo , & batchReady );
318
+ if (!batchReady ) {
319
+ // Batch is not ready - batch size didn't match the expectations from
320
+ // minbatchsize
321
+ for (int i = array_len (* batch_rinfo ) - 1 ; i >= 0 ; i -- ) {
322
+ queuePush (run_queue_info -> run_queue , (* batch_rinfo )[i ]);
323
+ }
324
+ return false;
325
+ }
326
+ }
327
+ return true;
328
+ }
329
+
301
330
void * RedisAI_Run_ThreadMain (void * arg ) {
302
331
RunQueueInfo * run_queue_info = (RunQueueInfo * )arg ;
303
332
RAI_PTHREAD_SETNAME ("redisai_bthread" );
@@ -316,52 +345,39 @@ void *RedisAI_Run_ThreadMain(void *arg) {
316
345
queueItem * item = queuePop (run_queue_info -> run_queue );
317
346
RedisAI_RunInfo * rinfo = (RedisAI_RunInfo * )item -> value ;
318
347
RedisModule_Free (item );
319
- // In case of timeout.
320
- if (_BGThread_IsRInfoTimedOut (rinfo )) {
321
- _BGThread_RinfoFinish (rinfo );
322
- continue ;
323
- }
324
-
325
- if (RedisAI_DagError (rinfo )) {
326
- _BGThread_RinfoFinish (rinfo );
327
- continue ;
328
- }
329
-
330
- // Get if the operation is ready and bacthable
331
- bool currentOpReady , currentOpBatchable ;
332
- RedisAI_DagCurrentOpInfo (rinfo , & currentOpReady , & currentOpBatchable );
333
- if (currentOpReady ) {
334
- batch_rinfo = array_append (batch_rinfo , rinfo );
335
- } else {
336
- // Op is not ready - push back to queue and continue the loop.
337
- queuePush (run_queue_info -> run_queue , rinfo );
338
- continue ;
339
- }
340
-
341
- if (currentOpBatchable ) {
342
- bool batchReady = true;
343
- batch_rinfo =
344
- _BGThread_BatchOperations (run_queue_info , rinfo , batch_rinfo , & batchReady );
345
- if (!batchReady ) {
346
- // Batch is not ready - batch size didn't match the expectations from
347
- // minbatchsize
348
- for (int i = array_len (batch_rinfo ) - 1 ; i >= 0 ; i -- ) {
349
- queuePush (run_queue_info -> run_queue , batch_rinfo [i ]);
350
- }
351
- // Exit the loop, give a chance to new tasks to submit.
352
- break ;
353
- }
348
+ // In case of timeout or error - skip execution.
349
+ bool skip_execution = _BGThread_IsRInfoTimedOut (rinfo ) || RedisAI_DagError (rinfo );
350
+ // Prepare for execution. If the op or the batch is not ready, exit
351
+ // the loop to give new tasks a chance to be submitted.
352
+ if (!skip_execution &&
353
+ !_BGThread_PrepareExecution (run_queue_info , rinfo , & batch_rinfo )) {
354
+ break ;
354
355
}
355
356
// Run the computation step (batched or not)
356
357
// We're done with the queue here, items have been evicted so we can
357
358
// safely unlock the queue mutex, to allow other threads to operate
358
359
// on the same queue. The evicted items at this point are only visible
359
360
// to this worker.
360
361
pthread_mutex_unlock (& run_queue_info -> run_queue_mutex );
361
- _BGThread_Execute (run_queue_info , batch_rinfo );
362
- // Lock the queue again: we're done operating on evicted items only.
362
+ if (!skip_execution ) {
363
+ _BGThread_Execute (run_queue_info , batch_rinfo );
364
+ } else {
365
+ // If we are skipping the execution due to a DAG error or timeout,
366
+ // we treat the batch as containing only this single DAG when finishing.
367
+ batch_rinfo = array_append (batch_rinfo , rinfo );
368
+ }
369
+ // For every DAG in the batch: if the entire DAG run is complete,
370
+ // call the on finish callback. Otherwise, save the DAG index in
371
+ // the batch_rinfo array, so we reinsert the DAG to the queue
372
+ // (after acquiring the queue lock).
373
+ int * unfinished_rinfo_indices = _BGThread_ExecutionFinish (batch_rinfo );
363
374
pthread_mutex_lock (& run_queue_info -> run_queue_mutex );
364
- _BGThread_ExecutionFinish (run_queue_info , batch_rinfo );
375
+
376
+ // Reinsert the unfinished DAG's run info to the queue.
377
+ for (size_t i = 0 ; i < array_len (unfinished_rinfo_indices ); i ++ ) {
378
+ queuePushFront (run_queue_info -> run_queue , batch_rinfo [unfinished_rinfo_indices [i ]]);
379
+ }
380
+ array_free (unfinished_rinfo_indices );
365
381
}
366
382
}
367
383
array_free (batch_rinfo );
0 commit comments