@@ -54,7 +54,8 @@ static struct rpc_wait_queue delay_queue;
5454/*
5555 * rpciod-related stuff
5656 */
57- struct workqueue_struct * rpciod_workqueue ;
57+ struct workqueue_struct * rpciod_workqueue __read_mostly ;
58+ struct workqueue_struct * xprtiod_workqueue __read_mostly ;
5859
5960/*
6061 * Disable the timer for a given RPC task. Should be called with
@@ -329,7 +330,8 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
329330 * lockless RPC_IS_QUEUED() test) before we've had a chance to test
330331 * the RPC_TASK_RUNNING flag.
331332 */
332- static void rpc_make_runnable (struct rpc_task * task )
333+ static void rpc_make_runnable (struct workqueue_struct * wq ,
334+ struct rpc_task * task )
333335{
334336 bool need_wakeup = !rpc_test_and_set_running (task );
335337
@@ -338,7 +340,7 @@ static void rpc_make_runnable(struct rpc_task *task)
338340 return ;
339341 if (RPC_IS_ASYNC (task )) {
340342 INIT_WORK (& task -> u .tk_work , rpc_async_schedule );
341- queue_work (rpciod_workqueue , & task -> u .tk_work );
343+ queue_work (wq , & task -> u .tk_work );
342344 } else
343345 wake_up_bit (& task -> tk_runstate , RPC_TASK_QUEUED );
344346}
@@ -407,13 +409,16 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
407409EXPORT_SYMBOL_GPL (rpc_sleep_on_priority );
408410
409411/**
410- * __rpc_do_wake_up_task - wake up a single rpc_task
412+ * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
413+ * @wq: workqueue on which to run task
411414 * @queue: wait queue
412415 * @task: task to be woken up
413416 *
414417 * Caller must hold queue->lock, and have cleared the task queued flag.
415418 */
416- static void __rpc_do_wake_up_task (struct rpc_wait_queue * queue , struct rpc_task * task )
419+ static void __rpc_do_wake_up_task_on_wq (struct workqueue_struct * wq ,
420+ struct rpc_wait_queue * queue ,
421+ struct rpc_task * task )
417422{
418423 dprintk ("RPC: %5u __rpc_wake_up_task (now %lu)\n" ,
419424 task -> tk_pid , jiffies );
@@ -428,23 +433,32 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task
428433
429434 __rpc_remove_wait_queue (queue , task );
430435
431- rpc_make_runnable (task );
436+ rpc_make_runnable (wq , task );
432437
433438 dprintk ("RPC: __rpc_wake_up_task done\n" );
434439}
435440
436441/*
437442 * Wake up a queued task while the queue lock is being held
438443 */
439- static void rpc_wake_up_task_queue_locked (struct rpc_wait_queue * queue , struct rpc_task * task )
444+ static void rpc_wake_up_task_on_wq_queue_locked (struct workqueue_struct * wq ,
445+ struct rpc_wait_queue * queue , struct rpc_task * task )
440446{
441447 if (RPC_IS_QUEUED (task )) {
442448 smp_rmb ();
443449 if (task -> tk_waitqueue == queue )
444- __rpc_do_wake_up_task ( queue , task );
450+ __rpc_do_wake_up_task_on_wq ( wq , queue , task );
445451 }
446452}
447453
454+ /*
455+ * Wake up a queued task while the queue lock is being held
456+ */
457+ static void rpc_wake_up_task_queue_locked (struct rpc_wait_queue * queue , struct rpc_task * task )
458+ {
459+ rpc_wake_up_task_on_wq_queue_locked (rpciod_workqueue , queue , task );
460+ }
461+
448462/*
449463 * Wake up a task on a specific queue
450464 */
@@ -518,7 +532,8 @@ static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
518532/*
519533 * Wake up the first task on the wait queue.
520534 */
521- struct rpc_task * rpc_wake_up_first (struct rpc_wait_queue * queue ,
535+ struct rpc_task * rpc_wake_up_first_on_wq (struct workqueue_struct * wq ,
536+ struct rpc_wait_queue * queue ,
522537 bool (* func )(struct rpc_task * , void * ), void * data )
523538{
524539 struct rpc_task * task = NULL ;
@@ -529,14 +544,23 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
529544 task = __rpc_find_next_queued (queue );
530545 if (task != NULL ) {
531546 if (func (task , data ))
532- rpc_wake_up_task_queue_locked ( queue , task );
547+ rpc_wake_up_task_on_wq_queue_locked ( wq , queue , task );
533548 else
534549 task = NULL ;
535550 }
536551 spin_unlock_bh (& queue -> lock );
537552
538553 return task ;
539554}
555+
556+ /*
557+ * Wake up the first task on the wait queue.
558+ */
559+ struct rpc_task * rpc_wake_up_first (struct rpc_wait_queue * queue ,
560+ bool (* func )(struct rpc_task * , void * ), void * data )
561+ {
562+ return rpc_wake_up_first_on_wq (rpciod_workqueue , queue , func , data );
563+ }
540564EXPORT_SYMBOL_GPL (rpc_wake_up_first );
541565
542566static bool rpc_wake_up_next_func (struct rpc_task * task , void * data )
@@ -814,7 +838,7 @@ void rpc_execute(struct rpc_task *task)
814838 bool is_async = RPC_IS_ASYNC (task );
815839
816840 rpc_set_active (task );
817- rpc_make_runnable (task );
841+ rpc_make_runnable (rpciod_workqueue , task );
818842 if (!is_async )
819843 __rpc_execute (task );
820844}
@@ -1071,10 +1095,22 @@ static int rpciod_start(void)
10711095 * Create the rpciod thread and wait for it to start.
10721096 */
10731097 dprintk ("RPC: creating workqueue rpciod\n" );
1074- /* Note: highpri because network receive is latency sensitive */
1075- wq = alloc_workqueue ("rpciod" , WQ_MEM_RECLAIM | WQ_HIGHPRI , 0 );
1098+ wq = alloc_workqueue ("rpciod" , WQ_MEM_RECLAIM , 0 );
1099+ if (!wq )
1100+ goto out_failed ;
10761101 rpciod_workqueue = wq ;
1077- return rpciod_workqueue != NULL ;
1102+ /* Note: highpri because network receive is latency sensitive */
1103+ wq = alloc_workqueue ("xprtiod" , WQ_MEM_RECLAIM | WQ_HIGHPRI , 0 );
1104+ if (!wq )
1105+ goto free_rpciod ;
1106+ xprtiod_workqueue = wq ;
1107+ return 1 ;
1108+ free_rpciod :
1109+ wq = rpciod_workqueue ;
1110+ rpciod_workqueue = NULL ;
1111+ destroy_workqueue (wq );
1112+ out_failed :
1113+ return 0 ;
10781114}
10791115
10801116static void rpciod_stop (void )
@@ -1088,6 +1124,9 @@ static void rpciod_stop(void)
10881124 wq = rpciod_workqueue ;
10891125 rpciod_workqueue = NULL ;
10901126 destroy_workqueue (wq );
1127+ wq = xprtiod_workqueue ;
1128+ xprtiod_workqueue = NULL ;
1129+ destroy_workqueue (wq );
10911130}
10921131
10931132void
0 commit comments