@@ -130,6 +130,7 @@ struct io_cb_cancel_data {
130130};
131131
132132static void create_io_worker (struct io_wq * wq , struct io_wqe * wqe , int index );
133+ static void io_wqe_dec_running (struct io_worker * worker );
133134
134135static bool io_worker_get (struct io_worker * worker )
135136{
@@ -168,26 +169,21 @@ static void io_worker_exit(struct io_worker *worker)
168169{
169170 struct io_wqe * wqe = worker -> wqe ;
170171 struct io_wqe_acct * acct = io_wqe_get_acct (worker );
171- unsigned flags ;
172172
173173 if (refcount_dec_and_test (& worker -> ref ))
174174 complete (& worker -> ref_done );
175175 wait_for_completion (& worker -> ref_done );
176176
177- preempt_disable ();
178- current -> flags &= ~PF_IO_WORKER ;
179- flags = worker -> flags ;
180- worker -> flags = 0 ;
181- if (flags & IO_WORKER_F_RUNNING )
182- atomic_dec (& acct -> nr_running );
183- worker -> flags = 0 ;
184- preempt_enable ();
185-
186177 raw_spin_lock_irq (& wqe -> lock );
187- if (flags & IO_WORKER_F_FREE )
178+ if (worker -> flags & IO_WORKER_F_FREE )
188179 hlist_nulls_del_rcu (& worker -> nulls_node );
189180 list_del_rcu (& worker -> all_list );
190181 acct -> nr_workers -- ;
182+ preempt_disable ();
183+ io_wqe_dec_running (worker );
184+ worker -> flags = 0 ;
185+ current -> flags &= ~PF_IO_WORKER ;
186+ preempt_enable ();
191187 raw_spin_unlock_irq (& wqe -> lock );
192188
193189 kfree_rcu (worker , rcu );
@@ -214,15 +210,19 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
214210 struct hlist_nulls_node * n ;
215211 struct io_worker * worker ;
216212
217- n = rcu_dereference (hlist_nulls_first_rcu (& wqe -> free_list ));
218- if (is_a_nulls (n ))
219- return false;
220-
221- worker = hlist_nulls_entry (n , struct io_worker , nulls_node );
222- if (io_worker_get (worker )) {
223- wake_up_process (worker -> task );
213+ /*
214+ * Iterate free_list and see if we can find an idle worker to
215+ * activate. If a given worker is on the free_list but in the process
216+ * of exiting, keep trying.
217+ */
218+ hlist_nulls_for_each_entry_rcu (worker , n , & wqe -> free_list , nulls_node ) {
219+ if (!io_worker_get (worker ))
220+ continue ;
221+ if (wake_up_process (worker -> task )) {
222+ io_worker_release (worker );
223+ return true;
224+ }
224225 io_worker_release (worker );
225- return true;
226226 }
227227
228228 return false;
@@ -247,10 +247,19 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
247247 ret = io_wqe_activate_free_worker (wqe );
248248 rcu_read_unlock ();
249249
250- if (!ret && acct -> nr_workers < acct -> max_workers ) {
251- atomic_inc (& acct -> nr_running );
252- atomic_inc (& wqe -> wq -> worker_refs );
253- create_io_worker (wqe -> wq , wqe , acct -> index );
250+ if (!ret ) {
251+ bool do_create = false;
252+
253+ raw_spin_lock_irq (& wqe -> lock );
254+ if (acct -> nr_workers < acct -> max_workers ) {
255+ atomic_inc (& acct -> nr_running );
256+ atomic_inc (& wqe -> wq -> worker_refs );
257+ acct -> nr_workers ++ ;
258+ do_create = true;
259+ }
260+ raw_spin_unlock_irq (& wqe -> lock );
261+ if (do_create )
262+ create_io_worker (wqe -> wq , wqe , acct -> index );
254263 }
255264}
256265
@@ -271,9 +280,17 @@ static void create_worker_cb(struct callback_head *cb)
271280{
272281 struct create_worker_data * cwd ;
273282 struct io_wq * wq ;
283+ struct io_wqe * wqe ;
284+ struct io_wqe_acct * acct ;
274285
275286 cwd = container_of (cb , struct create_worker_data , work );
276- wq = cwd -> wqe -> wq ;
287+ wqe = cwd -> wqe ;
288+ wq = wqe -> wq ;
289+ acct = & wqe -> acct [cwd -> index ];
290+ raw_spin_lock_irq (& wqe -> lock );
291+ if (acct -> nr_workers < acct -> max_workers )
292+ acct -> nr_workers ++ ;
293+ raw_spin_unlock_irq (& wqe -> lock );
277294 create_io_worker (wq , cwd -> wqe , cwd -> index );
278295 kfree (cwd );
279296}
@@ -635,6 +652,9 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
635652 kfree (worker );
636653fail :
637654 atomic_dec (& acct -> nr_running );
655+ raw_spin_lock_irq (& wqe -> lock );
656+ acct -> nr_workers -- ;
657+ raw_spin_unlock_irq (& wqe -> lock );
638658 io_worker_ref_put (wq );
639659 return ;
640660 }
@@ -650,9 +670,8 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
650670 worker -> flags |= IO_WORKER_F_FREE ;
651671 if (index == IO_WQ_ACCT_BOUND )
652672 worker -> flags |= IO_WORKER_F_BOUND ;
653- if (! acct -> nr_workers && (worker -> flags & IO_WORKER_F_BOUND ))
673+ if (( acct -> nr_workers == 1 ) && (worker -> flags & IO_WORKER_F_BOUND ))
654674 worker -> flags |= IO_WORKER_F_FIXED ;
655- acct -> nr_workers ++ ;
656675 raw_spin_unlock_irq (& wqe -> lock );
657676 wake_up_new_task (tsk );
658677}
0 commit comments