
Commit da64d6d

leitao authored and axboe committed
io_uring: One wqe per wq
Right now io_wq allocates one io_wqe per NUMA node. As io_wq is now bound to a task, the task basically uses only the NUMA-local io_wqe and almost never changes NUMA nodes, so the other wqes are mostly unused.

Allocate just one io_wqe embedded into io_wq, and use all possible CPUs (cpu_possible_mask) in the io_wqe->cpumask.

Signed-off-by: Breno Leitao <[email protected]>
Link: https://lore.kernel.org/r/[email protected]
Signed-off-by: Jens Axboe <[email protected]>
1 parent c56e022 · commit da64d6d
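The gist of the diff below is a data-layout change inside struct io_wq: the flexible array of per-NUMA-node io_wqe pointers is replaced by a single io_wqe embedded in the parent struct, so every site that used to index wq->wqes[numa_node_id()] (or loop with for_each_node()) now simply takes &wq->wqe. The standalone sketch below illustrates that shape change with simplified, hypothetical struct fields and helper names; it is a userspace illustration under those assumptions, not the kernel code.

/* Simplified userspace sketch of the layout change -- not the kernel source. */
#include <stdio.h>
#include <stdlib.h>

struct io_wqe {
        int max_workers;                /* stand-in for the real accounting fields */
};

/* Before: one io_wqe per NUMA node, reached through a flexible array of
 * pointers sized by the node count at allocation time. */
struct io_wq_old {
        int nr_nodes;
        struct io_wqe *wqes[];
};

/* After: a single io_wqe embedded directly in io_wq. */
struct io_wq_new {
        struct io_wqe wqe;
};

static struct io_wqe *lookup_old(struct io_wq_old *wq, int node)
{
        return wq->wqes[node];          /* was wq->wqes[numa_node_id()] */
}

static struct io_wqe *lookup_new(struct io_wq_new *wq)
{
        return &wq->wqe;                /* no per-node indexing any more */
}

int main(void)
{
        int nr_nodes = 2;
        struct io_wqe node_wqe = { .max_workers = 4 };
        struct io_wq_old *old = malloc(sizeof(*old) + nr_nodes * sizeof(struct io_wqe *));

        if (!old)
                return 1;
        old->nr_nodes = nr_nodes;
        old->wqes[0] = &node_wqe;
        old->wqes[1] = &node_wqe;
        printf("old lookup: %d\n", lookup_old(old, 0)->max_workers);
        free(old);

        struct io_wq_new wq = { .wqe = { .max_workers = 4 } };
        printf("new lookup: %d\n", lookup_new(&wq)->max_workers);
        return 0;
}

Because an io_wq is bound to a single task that rarely migrates between NUMA nodes, collapsing the array to one embedded wqe removes the per-node allocations, the pointer dereference on every lookup, and all the for_each_node() setup and teardown loops visible in the diff.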

1 file changed: 70 additions, 110 deletions

io_uring/io-wq.c

Lines changed: 70 additions & 110 deletions
@@ -15,6 +15,7 @@
 #include <linux/cpu.h>
 #include <linux/task_work.h>
 #include <linux/audit.h>
+#include <linux/mmu_context.h>
 #include <uapi/linux/io_uring.h>
 
 #include "io-wq.h"
@@ -96,8 +97,6 @@ struct io_wqe {
         raw_spinlock_t lock;
         struct io_wqe_acct acct[IO_WQ_ACCT_NR];
 
-        int node;
-
         struct hlist_nulls_head free_list;
         struct list_head all_list;
 
@@ -127,7 +126,7 @@ struct io_wq {
 
         struct task_struct *task;
 
-        struct io_wqe *wqes[];
+        struct io_wqe wqe;
 };
 
 static enum cpuhp_state io_wq_online;
@@ -754,7 +753,7 @@ static void create_worker_cont(struct callback_head *cb)
         worker = container_of(cb, struct io_worker, create_work);
         clear_bit_unlock(0, &worker->create_state);
         wqe = worker->wqe;
-        tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+        tsk = create_io_thread(io_wqe_worker, worker, NUMA_NO_NODE);
         if (!IS_ERR(tsk)) {
                 io_init_new_worker(wqe, worker, tsk);
                 io_worker_release(worker);
@@ -804,7 +803,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 
         __set_current_state(TASK_RUNNING);
 
-        worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
+        worker = kzalloc(sizeof(*worker), GFP_KERNEL);
         if (!worker) {
 fail:
                 atomic_dec(&acct->nr_running);
@@ -823,7 +822,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
         if (index == IO_WQ_ACCT_BOUND)
                 worker->flags |= IO_WORKER_F_BOUND;
 
-        tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+        tsk = create_io_thread(io_wqe_worker, worker, NUMA_NO_NODE);
         if (!IS_ERR(tsk)) {
                 io_init_new_worker(wqe, worker, tsk);
         } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
@@ -961,7 +960,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 
 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
 {
-        struct io_wqe *wqe = wq->wqes[numa_node_id()];
+        struct io_wqe *wqe = &wq->wqe;
 
         io_wqe_enqueue(wqe, work);
 }
@@ -1083,7 +1082,7 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
                 .data = data,
                 .cancel_all = cancel_all,
         };
-        int node;
+        struct io_wqe *wqe = &wq->wqe;
 
         /*
          * First check pending list, if we're lucky we can just remove it
@@ -1098,19 +1097,15 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
          * Do both of these while holding the wqe->lock, to ensure that
          * we'll find a work item regardless of state.
          */
-        for_each_node(node) {
-                struct io_wqe *wqe = wq->wqes[node];
-
-                io_wqe_cancel_pending_work(wqe, &match);
-                if (match.nr_pending && !match.cancel_all)
-                        return IO_WQ_CANCEL_OK;
+        io_wqe_cancel_pending_work(wqe, &match);
+        if (match.nr_pending && !match.cancel_all)
+                return IO_WQ_CANCEL_OK;
 
-                raw_spin_lock(&wqe->lock);
-                io_wqe_cancel_running_work(wqe, &match);
-                raw_spin_unlock(&wqe->lock);
-                if (match.nr_running && !match.cancel_all)
-                        return IO_WQ_CANCEL_RUNNING;
-        }
+        raw_spin_lock(&wqe->lock);
+        io_wqe_cancel_running_work(wqe, &match);
+        raw_spin_unlock(&wqe->lock);
+        if (match.nr_running && !match.cancel_all)
+                return IO_WQ_CANCEL_RUNNING;
 
         if (match.nr_running)
                 return IO_WQ_CANCEL_RUNNING;
@@ -1140,15 +1135,16 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
 
 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 {
-        int ret, node, i;
+        int ret, i;
         struct io_wq *wq;
+        struct io_wqe *wqe;
 
         if (WARN_ON_ONCE(!data->free_work || !data->do_work))
                 return ERR_PTR(-EINVAL);
         if (WARN_ON_ONCE(!bounded))
                 return ERR_PTR(-EINVAL);
 
-        wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
+        wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
         if (!wq)
                 return ERR_PTR(-ENOMEM);
         ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
@@ -1159,40 +1155,30 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
         wq->hash = data->hash;
         wq->free_work = data->free_work;
         wq->do_work = data->do_work;
+        wqe = &wq->wqe;
 
         ret = -ENOMEM;
-        for_each_node(node) {
-                struct io_wqe *wqe;
-                int alloc_node = node;
-
-                if (!node_online(alloc_node))
-                        alloc_node = NUMA_NO_NODE;
-                wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
-                if (!wqe)
-                        goto err;
-                wq->wqes[node] = wqe;
-                if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
-                        goto err;
-                cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
-                wqe->node = alloc_node;
-                wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
-                wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
-                                        task_rlimit(current, RLIMIT_NPROC);
-                INIT_LIST_HEAD(&wqe->wait.entry);
-                wqe->wait.func = io_wqe_hash_wake;
-                for (i = 0; i < IO_WQ_ACCT_NR; i++) {
-                        struct io_wqe_acct *acct = &wqe->acct[i];
-
-                        acct->index = i;
-                        atomic_set(&acct->nr_running, 0);
-                        INIT_WQ_LIST(&acct->work_list);
-                        raw_spin_lock_init(&acct->lock);
-                }
-                wqe->wq = wq;
-                raw_spin_lock_init(&wqe->lock);
-                INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
-                INIT_LIST_HEAD(&wqe->all_list);
+
+        if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
+                goto err;
+        cpumask_copy(wqe->cpu_mask, cpu_possible_mask);
+        wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
+        wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
+                                task_rlimit(current, RLIMIT_NPROC);
+        INIT_LIST_HEAD(&wqe->wait.entry);
+        wqe->wait.func = io_wqe_hash_wake;
+        for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+                struct io_wqe_acct *acct = &wqe->acct[i];
+
+                acct->index = i;
+                atomic_set(&acct->nr_running, 0);
+                INIT_WQ_LIST(&acct->work_list);
+                raw_spin_lock_init(&acct->lock);
         }
+        wqe->wq = wq;
+        raw_spin_lock_init(&wqe->lock);
+        INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
+        INIT_LIST_HEAD(&wqe->all_list);
 
         wq->task = get_task_struct(data->task);
         atomic_set(&wq->worker_refs, 1);
@@ -1201,12 +1187,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 err:
         io_wq_put_hash(data->hash);
         cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
-        for_each_node(node) {
-                if (!wq->wqes[node])
-                        continue;
-                free_cpumask_var(wq->wqes[node]->cpu_mask);
-                kfree(wq->wqes[node]);
-        }
+
+        free_cpumask_var(wq->wqe.cpu_mask);
 err_wq:
         kfree(wq);
         return ERR_PTR(ret);
@@ -1247,48 +1229,36 @@ static void io_wq_cancel_tw_create(struct io_wq *wq)
 
 static void io_wq_exit_workers(struct io_wq *wq)
 {
-        int node;
-
         if (!wq->task)
                 return;
 
         io_wq_cancel_tw_create(wq);
 
         rcu_read_lock();
-        for_each_node(node) {
-                struct io_wqe *wqe = wq->wqes[node];
-
-                io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
-        }
+        io_wq_for_each_worker(&wq->wqe, io_wq_worker_wake, NULL);
         rcu_read_unlock();
         io_worker_ref_put(wq);
         wait_for_completion(&wq->worker_done);
 
-        for_each_node(node) {
-                spin_lock_irq(&wq->hash->wait.lock);
-                list_del_init(&wq->wqes[node]->wait.entry);
-                spin_unlock_irq(&wq->hash->wait.lock);
-        }
+        spin_lock_irq(&wq->hash->wait.lock);
+        list_del_init(&wq->wqe.wait.entry);
+        spin_unlock_irq(&wq->hash->wait.lock);
+
         put_task_struct(wq->task);
         wq->task = NULL;
 }
 
 static void io_wq_destroy(struct io_wq *wq)
 {
-        int node;
+        struct io_cb_cancel_data match = {
+                .fn = io_wq_work_match_all,
+                .cancel_all = true,
+        };
+        struct io_wqe *wqe = &wq->wqe;
 
         cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
-
-        for_each_node(node) {
-                struct io_wqe *wqe = wq->wqes[node];
-                struct io_cb_cancel_data match = {
-                        .fn = io_wq_work_match_all,
-                        .cancel_all = true,
-                };
-                io_wqe_cancel_pending_work(wqe, &match);
-                free_cpumask_var(wqe->cpu_mask);
-                kfree(wqe);
-        }
+        io_wqe_cancel_pending_work(wqe, &match);
+        free_cpumask_var(wqe->cpu_mask);
         io_wq_put_hash(wq->hash);
         kfree(wq);
 }
@@ -1323,11 +1293,9 @@ static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
                 .cpu = cpu,
                 .online = online
         };
-        int i;
 
         rcu_read_lock();
-        for_each_node(i)
-                io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
+        io_wq_for_each_worker(&wq->wqe, io_wq_worker_affinity, &od);
         rcu_read_unlock();
         return 0;
 }
@@ -1348,18 +1316,15 @@ static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
 
 int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
 {
-        int i;
+        struct io_wqe *wqe = &wq->wqe;
 
         rcu_read_lock();
-        for_each_node(i) {
-                struct io_wqe *wqe = wq->wqes[i];
-
-                if (mask)
-                        cpumask_copy(wqe->cpu_mask, mask);
-                else
-                        cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
-        }
+        if (mask)
+                cpumask_copy(wqe->cpu_mask, mask);
+        else
+                cpumask_copy(wqe->cpu_mask, cpu_possible_mask);
         rcu_read_unlock();
+
         return 0;
 }
 
@@ -1369,9 +1334,10 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
  */
 int io_wq_max_workers(struct io_wq *wq, int *new_count)
 {
+        struct io_wqe *wqe = &wq->wqe;
+        struct io_wqe_acct *acct;
         int prev[IO_WQ_ACCT_NR];
-        bool first_node = true;
-        int i, node;
+        int i;
 
         BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
         BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
@@ -1386,21 +1352,15 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
                 prev[i] = 0;
 
         rcu_read_lock();
-        for_each_node(node) {
-                struct io_wqe *wqe = wq->wqes[node];
-                struct io_wqe_acct *acct;
 
-                raw_spin_lock(&wqe->lock);
-                for (i = 0; i < IO_WQ_ACCT_NR; i++) {
-                        acct = &wqe->acct[i];
-                        if (first_node)
-                                prev[i] = max_t(int, acct->max_workers, prev[i]);
-                        if (new_count[i])
-                                acct->max_workers = new_count[i];
-                }
-                raw_spin_unlock(&wqe->lock);
-                first_node = false;
+        raw_spin_lock(&wqe->lock);
+        for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+                acct = &wqe->acct[i];
+                prev[i] = max_t(int, acct->max_workers, prev[i]);
+                if (new_count[i])
+                        acct->max_workers = new_count[i];
         }
+        raw_spin_unlock(&wqe->lock);
         rcu_read_unlock();
 
         for (i = 0; i < IO_WQ_ACCT_NR; i++)
