 #include <linux/percpu.h>
 #include <linux/slab.h>
 #include <linux/workqueue.h>
+#include <linux/kthread.h>
 #include <linux/blkdev.h>
 #include <linux/bvec.h>
 #include <linux/net.h>
@@ -108,12 +109,16 @@ struct io_ring_ctx {
 		unsigned		cached_sq_head;
 		unsigned		sq_entries;
 		unsigned		sq_mask;
+		unsigned		sq_thread_idle;
 		struct io_uring_sqe	*sq_sqes;
 	} ____cacheline_aligned_in_smp;
 
 	/* IO offload */
 	struct workqueue_struct	*sqo_wq;
+	struct task_struct	*sqo_thread;	/* if using sq thread polling */
 	struct mm_struct	*sqo_mm;
+	wait_queue_head_t	sqo_wait;
+	unsigned		sqo_stop;
 
 	struct {
 		/* CQ ring */
@@ -168,6 +173,7 @@ struct sqe_submit {
 	unsigned short			index;
 	bool				has_user;
 	bool				needs_lock;
+	bool				needs_fixed_file;
 };
 
 struct io_kiocb {
@@ -327,6 +333,8 @@ static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 ki_user_data,
 
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
+	if (waitqueue_active(&ctx->sqo_wait))
+		wake_up(&ctx->sqo_wait);
 }
 
 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
@@ -680,9 +688,10 @@ static bool io_file_supports_async(struct file *file)
 	return false;
 }
 
-static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
+static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
 		      bool force_nonblock, struct io_submit_state *state)
 {
+	const struct io_uring_sqe *sqe = s->sqe;
 	struct io_ring_ctx *ctx = req->ctx;
 	struct kiocb *kiocb = &req->rw;
 	unsigned ioprio, flags;
@@ -702,6 +711,8 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		kiocb->ki_filp = ctx->user_files[fd];
 		req->flags |= REQ_F_FIXED_FILE;
 	} else {
+		if (s->needs_fixed_file)
+			return -EBADF;
 		kiocb->ki_filp = io_file_get(state, fd);
 		if (unlikely(!kiocb->ki_filp))
 			return -EBADF;
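
The -EBADF on `needs_fixed_file` above is what forces SQPOLL submissions onto registered files: the polling kthread issues SQEs from kernel context and cannot resolve the submitting task's file descriptors, so every SQE it picks up must name a file by fixed index. A minimal userspace sketch of what that looks like, using the raw io_uring_register/uapi interface; the helper names (`register_files`, `prep_fixed_readv`) are made up for illustration and error handling is trimmed:

    #include <string.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/uio.h>
    #include <sys/syscall.h>
    #include <linux/io_uring.h>

    /* Sketch: put a small table of fds under IORING_REGISTER_FILES so the
     * SQ poll thread can reach them by index later. */
    static int register_files(int ring_fd, int *fds, unsigned nr_fds)
    {
        return syscall(__NR_io_uring_register, ring_fd,
                       IORING_REGISTER_FILES, fds, nr_fds);
    }

    /* Sketch: an SQE the poll thread can execute without our fd table.
     * IOSQE_FIXED_FILE makes sqe->fd an index into the registered table. */
    static void prep_fixed_readv(struct io_uring_sqe *sqe, int file_index,
                                 struct iovec *iov, unsigned nr_iov, off_t off)
    {
        memset(sqe, 0, sizeof(*sqe));
        sqe->opcode = IORING_OP_READV;
        sqe->flags = IOSQE_FIXED_FILE;
        sqe->fd = file_index;          /* index, not a real fd */
        sqe->addr = (unsigned long) iov;
        sqe->len = nr_iov;
        sqe->off = off;
    }
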
@@ -865,7 +876,7 @@ static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
 	struct file *file;
 	ssize_t ret;
 
-	ret = io_prep_rw(req, s->sqe, force_nonblock, state);
+	ret = io_prep_rw(req, s, force_nonblock, state);
 	if (ret)
 		return ret;
 	file = kiocb->ki_filp;
@@ -909,7 +920,7 @@ static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
 	struct file *file;
 	ssize_t ret;
 
-	ret = io_prep_rw(req, s->sqe, force_nonblock, state);
+	ret = io_prep_rw(req, s, force_nonblock, state);
 	if (ret)
 		return ret;
 	/* Hold on to the file for -EAGAIN */
@@ -1301,6 +1312,169 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
 	return false;
 }
 
+static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
+			  unsigned int nr, bool has_user, bool mm_fault)
+{
+	struct io_submit_state state, *statep = NULL;
+	int ret, i, submitted = 0;
+
+	if (nr > IO_PLUG_THRESHOLD) {
+		io_submit_state_start(&state, ctx, nr);
+		statep = &state;
+	}
+
+	for (i = 0; i < nr; i++) {
+		if (unlikely(mm_fault)) {
+			ret = -EFAULT;
+		} else {
+			sqes[i].has_user = has_user;
+			sqes[i].needs_lock = true;
+			sqes[i].needs_fixed_file = true;
+			ret = io_submit_sqe(ctx, &sqes[i], statep);
+		}
+		if (!ret) {
+			submitted++;
+			continue;
+		}
+
+		io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
+	}
+
+	if (statep)
+		io_submit_state_end(&state);
+
+	return submitted;
+}
+
+static int io_sq_thread(void *data)
+{
+	struct sqe_submit sqes[IO_IOPOLL_BATCH];
+	struct io_ring_ctx *ctx = data;
+	struct mm_struct *cur_mm = NULL;
+	mm_segment_t old_fs;
+	DEFINE_WAIT(wait);
+	unsigned inflight;
+	unsigned long timeout;
+
+	old_fs = get_fs();
+	set_fs(USER_DS);
+
+	timeout = inflight = 0;
+	while (!kthread_should_stop() && !ctx->sqo_stop) {
+		bool all_fixed, mm_fault = false;
+		int i;
+
+		if (inflight) {
+			unsigned nr_events = 0;
+
+			if (ctx->flags & IORING_SETUP_IOPOLL) {
+				/*
+				 * We disallow the app entering submit/complete
+				 * with polling, but we still need to lock the
+				 * ring to prevent racing with polled issue
+				 * that got punted to a workqueue.
+				 */
+				mutex_lock(&ctx->uring_lock);
+				io_iopoll_check(ctx, &nr_events, 0);
+				mutex_unlock(&ctx->uring_lock);
+			} else {
+				/*
+				 * Normal IO, just pretend everything completed.
+				 * We don't have to poll completions for that.
+				 */
+				nr_events = inflight;
+			}
+
+			inflight -= nr_events;
+			if (!inflight)
+				timeout = jiffies + ctx->sq_thread_idle;
+		}
+
+		if (!io_get_sqring(ctx, &sqes[0])) {
+			/*
+			 * We're polling. If we're within the defined idle
+			 * period, then let us spin without work before going
+			 * to sleep.
+			 */
+			if (inflight || !time_after(jiffies, timeout)) {
+				cpu_relax();
+				continue;
+			}
+
+			/*
+			 * Drop cur_mm before scheduling, we can't hold it for
+			 * long periods (or over schedule()). Do this before
+			 * adding ourselves to the waitqueue, as the unuse/drop
+			 * may sleep.
+			 */
+			if (cur_mm) {
+				unuse_mm(cur_mm);
+				mmput(cur_mm);
+				cur_mm = NULL;
+			}
+
+			prepare_to_wait(&ctx->sqo_wait, &wait,
+						TASK_INTERRUPTIBLE);
+
+			/* Tell userspace we may need a wakeup call */
+			ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
+			smp_wmb();
+
+			if (!io_get_sqring(ctx, &sqes[0])) {
+				if (kthread_should_stop()) {
+					finish_wait(&ctx->sqo_wait, &wait);
+					break;
+				}
+				if (signal_pending(current))
+					flush_signals(current);
+				schedule();
+				finish_wait(&ctx->sqo_wait, &wait);
+
+				ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+				smp_wmb();
+				continue;
+			}
+			finish_wait(&ctx->sqo_wait, &wait);
+
+			ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
+			smp_wmb();
+		}
+
+		i = 0;
+		all_fixed = true;
+		do {
+			if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
+				all_fixed = false;
+
+			i++;
+			if (i == ARRAY_SIZE(sqes))
+				break;
+		} while (io_get_sqring(ctx, &sqes[i]));
+
+		/* Unless all new commands are FIXED regions, grab mm */
+		if (!all_fixed && !cur_mm) {
+			mm_fault = !mmget_not_zero(ctx->sqo_mm);
+			if (!mm_fault) {
+				use_mm(ctx->sqo_mm);
+				cur_mm = ctx->sqo_mm;
+			}
+		}
+
+		inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
+						mm_fault);
+
+		/* Commit SQ ring head once we've consumed all SQEs */
+		io_commit_sqring(ctx);
+	}
+
+	set_fs(old_fs);
+	if (cur_mm) {
+		unuse_mm(cur_mm);
+		mmput(cur_mm);
+	}
+	return 0;
+}
+
 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 {
 	struct io_submit_state state, *statep = NULL;
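
The IORING_SQ_NEED_WAKEUP flag dance in io_sq_thread() above is one half of a handshake; the other half lives in the application. After publishing new SQ tail entries, userspace only needs a syscall when the poller has flagged that it is going to sleep. A rough submitter-side sketch, assuming `sq_flags` points at the mapped SQ ring's flags word and `ring_fd` came from io_uring_setup(); the `notify_sq_thread` helper and the `read_barrier` macro are illustrative:

    #include <unistd.h>
    #include <sys/syscall.h>
    #include <linux/io_uring.h>

    /* Pairs with the kernel's smp_wmb() after it sets NEED_WAKEUP. */
    #define read_barrier() __sync_synchronize()

    /* Sketch: with IORING_SETUP_SQPOLL, bumping the SQ tail is normally
     * enough; io_uring_enter() is only needed to kick a sleeping poller. */
    static void notify_sq_thread(int ring_fd, const unsigned *sq_flags)
    {
        read_barrier();
        if (*sq_flags & IORING_SQ_NEED_WAKEUP)
            syscall(__NR_io_uring_enter, ring_fd, 0, 0,
                    IORING_ENTER_SQ_WAKEUP, NULL, 0);
    }
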
@@ -1319,6 +1493,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
 
 		s.has_user = true;
 		s.needs_lock = false;
+		s.needs_fixed_file = false;
 
 		ret = io_submit_sqe(ctx, &s, statep);
 		if (ret) {
@@ -1418,8 +1593,20 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
 	return 0;
 }
 
+static void io_sq_thread_stop(struct io_ring_ctx *ctx)
+{
+	if (ctx->sqo_thread) {
+		ctx->sqo_stop = 1;
+		mb();
+		kthread_stop(ctx->sqo_thread);
+		ctx->sqo_thread = NULL;
+	}
+}
+
 static void io_finish_async(struct io_ring_ctx *ctx)
 {
+	io_sq_thread_stop(ctx);
+
 	if (ctx->sqo_wq) {
 		destroy_workqueue(ctx->sqo_wq);
 		ctx->sqo_wq = NULL;
@@ -1583,13 +1770,47 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
 	return ret;
 }
 
-static int io_sq_offload_start(struct io_ring_ctx *ctx)
+static int io_sq_offload_start(struct io_ring_ctx *ctx,
+			       struct io_uring_params *p)
 {
 	int ret;
 
+	init_waitqueue_head(&ctx->sqo_wait);
 	mmgrab(current->mm);
 	ctx->sqo_mm = current->mm;
 
+	ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
+	if (!ctx->sq_thread_idle)
+		ctx->sq_thread_idle = HZ;
+
+	ret = -EINVAL;
+	if (!cpu_possible(p->sq_thread_cpu))
+		goto err;
+
+	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		if (p->flags & IORING_SETUP_SQ_AFF) {
+			int cpu;
+
+			cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS);
+			ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
+							ctx, cpu,
+							"io_uring-sq");
+		} else {
+			ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
+							 "io_uring-sq");
+		}
+		if (IS_ERR(ctx->sqo_thread)) {
+			ret = PTR_ERR(ctx->sqo_thread);
+			ctx->sqo_thread = NULL;
+			goto err;
+		}
+		wake_up_process(ctx->sqo_thread);
+	} else if (p->flags & IORING_SETUP_SQ_AFF) {
+		/* Can't have SQ_AFF without SQPOLL */
+		ret = -EINVAL;
+		goto err;
+	}
+
 	/* Do QD, or 2 * CPUS, whatever is smallest */
 	ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
 			min(ctx->sq_entries - 1, 2 * num_online_cpus()));
@@ -1600,6 +1821,7 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx)
 
 	return 0;
 err:
+	io_sq_thread_stop(ctx);
 	mmdrop(ctx->sqo_mm);
 	ctx->sqo_mm = NULL;
 	return ret;
@@ -1959,7 +2181,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 	int submitted = 0;
 	struct fd f;
 
-	if (flags & ~IORING_ENTER_GETEVENTS)
+	if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
 		return -EINVAL;
 
 	f = fdget(fd);
@@ -1975,6 +2197,18 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 	if (!percpu_ref_tryget(&ctx->refs))
 		goto out_fput;
 
+	/*
+	 * For SQ polling, the thread will do all submissions and completions.
+	 * Just return the requested submit count, and wake the thread if
+	 * we were asked to.
+	 */
+	if (ctx->flags & IORING_SETUP_SQPOLL) {
+		if (flags & IORING_ENTER_SQ_WAKEUP)
+			wake_up(&ctx->sqo_wait);
+		submitted = to_submit;
+		goto out_ctx;
+	}
+
 	ret = 0;
 	if (to_submit) {
 		to_submit = min(to_submit, ctx->sq_entries);
@@ -2156,7 +2390,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p)
 	if (ret)
 		goto err;
 
-	ret = io_sq_offload_start(ctx);
+	ret = io_sq_offload_start(ctx, p);
 	if (ret)
 		goto err;
 
@@ -2204,7 +2438,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
 		return -EINVAL;
 	}
 
-	if (p.flags & ~IORING_SETUP_IOPOLL)
+	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
+			IORING_SETUP_SQ_AFF))
 		return -EINVAL;
 
 	ret = io_uring_create(entries, &p);
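
Tying the setup-side flags together, a ring that uses the new polling mode could be created roughly as below. The helper name and values are illustrative; the idle and CPU fields mirror the validation in io_sq_offload_start() above (a non-possible sq_thread_cpu, or SQ_AFF without SQPOLL, fails with -EINVAL).

    #include <string.h>
    #include <unistd.h>
    #include <sys/syscall.h>
    #include <linux/io_uring.h>

    /* Sketch: set up a ring whose SQ is driven by a kernel thread pinned to
     * CPU 0, idling for up to 2000ms before it sleeps and asks for wakeups.
     * Returns the ring fd, or -1 with errno set. */
    static int setup_sqpoll_ring(unsigned entries)
    {
        struct io_uring_params p;

        memset(&p, 0, sizeof(p));
        p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF;
        p.sq_thread_cpu = 0;      /* must pass cpu_possible() */
        p.sq_thread_idle = 2000;  /* in msecs; 0 defaults to HZ (~1s) */

        return syscall(__NR_io_uring_setup, entries, &p);
    }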