
Commit a53851e

jrfastab authored and davem330 committed
net: sched: explicit locking in gso_cpu fallback
This work prepares the qdisc layer to support egress lockless qdiscs. If we are running the egress qdisc lockless and we overrun the netdev, for whatever reason, the netdev returns a busy error code and the skb is parked on the gso_skb pointer. With many cores all hitting this case at once it's possible to have multiple sk_buffs here, so we turn gso_skb into a queue. This should be an edge case; if we see it frequently, the netdev/qdisc layer needs to back off.

Signed-off-by: John Fastabend <[email protected]>
Signed-off-by: David S. Miller <[email protected]>
1 parent d59f5ff commit a53851e
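As background for the diff below, here is a minimal userspace sketch of the rationale, assuming invented names (struct fake_skb, park_single(), park_queue()) and plain pthreads rather than kernel primitives: a single parking pointer can hold at most one packet when several threads hit the busy path at once, while a locked queue keeps all of them. This is a simplified model of the commit message, not the kernel code itself.

/*
 * Simplified model: 8 "cpus" each need to park one packet after a busy
 * return. park_single() mimics the old single gso_skb pointer (never
 * drained here, purely to show it can hold only one entry); park_queue()
 * mimics the new sk_buff_head protected by a lock.
 * Build with: gcc -pthread sketch.c
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct fake_skb { struct fake_skb *next; int id; };

static struct fake_skb *single_slot;   /* old scheme: one pointer        */
static struct fake_skb *queue_head;    /* new scheme: linked queue       */
static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static int lost;                       /* packets the single slot missed */

static void park_single(struct fake_skb *skb)
{
	/* only the first CAS wins; everyone else has nowhere to park */
	if (__sync_val_compare_and_swap(&single_slot, NULL, skb) != NULL)
		__sync_fetch_and_add(&lost, 1);
}

static void park_queue(struct fake_skb *skb)
{
	pthread_mutex_lock(&queue_lock);   /* analogue of qdisc_lock(q) */
	skb->next = queue_head;
	queue_head = skb;
	pthread_mutex_unlock(&queue_lock);
}

static void *busy_path(void *arg)
{
	struct fake_skb *skb = calloc(1, sizeof(*skb));

	skb->id = (int)(long)arg;
	park_single(skb);
	park_queue(skb);
	return NULL;
}

int main(void)
{
	pthread_t tid[8];
	struct fake_skb *skb;
	int i, queued = 0;

	for (i = 0; i < 8; i++)
		pthread_create(&tid[i], NULL, busy_path, (void *)(long)i);
	for (i = 0; i < 8; i++)
		pthread_join(tid[i], NULL);

	for (skb = queue_head; skb; skb = skb->next)
		queued++;

	printf("single pointer missed %d of 8, queue holds %d of 8\n",
	       lost, queued);
	return 0;
}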

File tree

2 files changed, +84 −21 lines

include/net/sch_generic.h
net/sched/sch_generic.c

include/net/sch_generic.h

Lines changed: 12 additions & 8 deletions
@@ -88,7 +88,7 @@ struct Qdisc {
 	/*
 	 * For performance sake on SMP, we put highly modified fields at the end
 	 */
-	struct sk_buff		*gso_skb ____cacheline_aligned_in_smp;
+	struct sk_buff_head	gso_skb ____cacheline_aligned_in_smp;
 	struct qdisc_skb_head	q;
 	struct gnet_stats_basic_packed bstats;
 	seqcount_t		running;
@@ -796,26 +796,30 @@ static inline struct sk_buff *qdisc_peek_head(struct Qdisc *sch)
 /* generic pseudo peek method for non-work-conserving qdisc */
 static inline struct sk_buff *qdisc_peek_dequeued(struct Qdisc *sch)
 {
+	struct sk_buff *skb = skb_peek(&sch->gso_skb);
+
 	/* we can reuse ->gso_skb because peek isn't called for root qdiscs */
-	if (!sch->gso_skb) {
-		sch->gso_skb = sch->dequeue(sch);
-		if (sch->gso_skb) {
+	if (!skb) {
+		skb = sch->dequeue(sch);
+
+		if (skb) {
+			__skb_queue_head(&sch->gso_skb, skb);
 			/* it's still part of the queue */
-			qdisc_qstats_backlog_inc(sch, sch->gso_skb);
+			qdisc_qstats_backlog_inc(sch, skb);
 			sch->q.qlen++;
 		}
 	}
 
-	return sch->gso_skb;
+	return skb;
 }
 
 /* use instead of qdisc->dequeue() for all qdiscs queried with ->peek() */
 static inline struct sk_buff *qdisc_dequeue_peeked(struct Qdisc *sch)
 {
-	struct sk_buff *skb = sch->gso_skb;
+	struct sk_buff *skb = skb_peek(&sch->gso_skb);
 
 	if (skb) {
-		sch->gso_skb = NULL;
+		skb = __skb_dequeue(&sch->gso_skb);
 		qdisc_qstats_backlog_dec(sch, skb);
 		sch->q.qlen--;
 	} else {
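The two helpers in the hunk above implement a peek-by-early-dequeue pattern: qdisc_peek_dequeued() really pulls one skb out of ->dequeue() but parks it at the head of gso_skb, and qdisc_dequeue_peeked() later hands back exactly that parked skb. Below is a minimal userspace sketch of the same pattern, assuming invented names (struct pkt, source_dequeue(), peek_dequeued(), dequeue_peeked()); the qlen/backlog accounting done by the real helpers is left out.

/*
 * Peek-by-early-dequeue: peek really consumes from the source but stashes
 * the element so a later dequeue returns the exact item that was peeked.
 */
#include <stdio.h>

struct pkt { int len; };

static struct pkt source[] = { { 100 }, { 200 }, { 300 } };
static int src_idx;
static struct pkt *stash;	/* stands in for the head of gso_skb */

static struct pkt *source_dequeue(void)
{
	return src_idx < 3 ? &source[src_idx++] : NULL;
}

static struct pkt *peek_dequeued(void)
{
	if (!stash)
		stash = source_dequeue();   /* dequeue early, keep it parked */
	return stash;
}

static struct pkt *dequeue_peeked(void)
{
	struct pkt *p = stash ? stash : source_dequeue();

	stash = NULL;                       /* the parked element is handed out */
	return p;
}

int main(void)
{
	struct pkt *p = peek_dequeued();

	printf("peeked len=%d\n", p ? p->len : -1);
	p = dequeue_peeked();
	printf("dequeued len=%d (same packet as the peek)\n", p ? p->len : -1);
	return 0;
}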

net/sched/sch_generic.c

Lines changed: 72 additions & 13 deletions
@@ -45,10 +45,9 @@ EXPORT_SYMBOL(default_qdisc_ops);
  * - ingress filtering is also serialized via qdisc root lock
  * - updates to tree and tree walking are only done under the rtnl mutex.
  */
-
-static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
+static inline int __dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
 {
-	q->gso_skb = skb;
+	__skb_queue_head(&q->gso_skb, skb);
 	q->qstats.requeues++;
 	qdisc_qstats_backlog_inc(q, skb);
 	q->q.qlen++;	/* it's still part of the queue */
@@ -57,6 +56,30 @@ static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
 	return 0;
 }
 
+static inline int dev_requeue_skb_locked(struct sk_buff *skb, struct Qdisc *q)
+{
+	spinlock_t *lock = qdisc_lock(q);
+
+	spin_lock(lock);
+	__skb_queue_tail(&q->gso_skb, skb);
+	spin_unlock(lock);
+
+	qdisc_qstats_cpu_requeues_inc(q);
+	qdisc_qstats_cpu_backlog_inc(q, skb);
+	qdisc_qstats_cpu_qlen_inc(q);
+	__netif_schedule(q);
+
+	return 0;
+}
+
+static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
+{
+	if (q->flags & TCQ_F_NOLOCK)
+		return dev_requeue_skb_locked(skb, q);
+	else
+		return __dev_requeue_skb(skb, q);
+}
+
 static void try_bulk_dequeue_skb(struct Qdisc *q,
 				 struct sk_buff *skb,
 				 const struct netdev_queue *txq,
@@ -112,23 +135,50 @@ static void try_bulk_dequeue_skb_slow(struct Qdisc *q,
 static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
 				   int *packets)
 {
-	struct sk_buff *skb = q->gso_skb;
 	const struct netdev_queue *txq = q->dev_queue;
+	struct sk_buff *skb;
 
 	*packets = 1;
-	if (unlikely(skb)) {
+	if (unlikely(!skb_queue_empty(&q->gso_skb))) {
+		spinlock_t *lock = NULL;
+
+		if (q->flags & TCQ_F_NOLOCK) {
+			lock = qdisc_lock(q);
+			spin_lock(lock);
+		}
+
+		skb = skb_peek(&q->gso_skb);
+
+		/* skb may be null if another cpu pulls gso_skb off in between
+		 * empty check and lock.
+		 */
+		if (!skb) {
+			if (lock)
+				spin_unlock(lock);
+			goto validate;
+		}
+
 		/* skb in gso_skb were already validated */
 		*validate = false;
 		/* check the reason of requeuing without tx lock first */
 		txq = skb_get_tx_queue(txq->dev, skb);
 		if (!netif_xmit_frozen_or_stopped(txq)) {
-			q->gso_skb = NULL;
-			qdisc_qstats_backlog_dec(q, skb);
-			q->q.qlen--;
-		} else
+			skb = __skb_dequeue(&q->gso_skb);
+			if (qdisc_is_percpu_stats(q)) {
+				qdisc_qstats_cpu_backlog_dec(q, skb);
+				qdisc_qstats_cpu_qlen_dec(q);
+			} else {
+				qdisc_qstats_backlog_dec(q, skb);
+				q->q.qlen--;
+			}
+		} else {
 			skb = NULL;
+		}
+		if (lock)
+			spin_unlock(lock);
 		goto trace;
 	}
+validate:
 	*validate = true;
 	skb = q->skb_bad_txq;
 	if (unlikely(skb)) {
@@ -629,6 +679,7 @@ struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
 		sch = (struct Qdisc *) QDISC_ALIGN((unsigned long) p);
 		sch->padded = (char *) sch - (char *) p;
 	}
+	__skb_queue_head_init(&sch->gso_skb);
 	qdisc_skb_head_init(&sch->q);
 	spin_lock_init(&sch->q.lock);
 
@@ -697,17 +748,19 @@ EXPORT_SYMBOL(qdisc_create_dflt);
 void qdisc_reset(struct Qdisc *qdisc)
 {
 	const struct Qdisc_ops *ops = qdisc->ops;
+	struct sk_buff *skb, *tmp;
 
 	if (ops->reset)
 		ops->reset(qdisc);
 
 	kfree_skb(qdisc->skb_bad_txq);
 	qdisc->skb_bad_txq = NULL;
 
-	if (qdisc->gso_skb) {
-		kfree_skb_list(qdisc->gso_skb);
-		qdisc->gso_skb = NULL;
+	skb_queue_walk_safe(&qdisc->gso_skb, skb, tmp) {
+		__skb_unlink(skb, &qdisc->gso_skb);
+		kfree_skb_list(skb);
 	}
+
 	qdisc->q.qlen = 0;
 	qdisc->qstats.backlog = 0;
 }
@@ -726,6 +779,7 @@ static void qdisc_free(struct Qdisc *qdisc)
 void qdisc_destroy(struct Qdisc *qdisc)
 {
 	const struct Qdisc_ops *ops = qdisc->ops;
+	struct sk_buff *skb, *tmp;
 
 	if (qdisc->flags & TCQ_F_BUILTIN ||
 	    !refcount_dec_and_test(&qdisc->refcnt))
@@ -745,7 +799,11 @@ void qdisc_destroy(struct Qdisc *qdisc)
 	module_put(ops->owner);
 	dev_put(qdisc_dev(qdisc));
 
-	kfree_skb_list(qdisc->gso_skb);
+	skb_queue_walk_safe(&qdisc->gso_skb, skb, tmp) {
+		__skb_unlink(skb, &qdisc->gso_skb);
+		kfree_skb_list(skb);
+	}
+
 	kfree_skb(qdisc->skb_bad_txq);
 	qdisc_free(qdisc);
 }
@@ -973,6 +1031,7 @@ static void dev_init_scheduler_queue(struct net_device *dev,
 
 	rcu_assign_pointer(dev_queue->qdisc, qdisc);
 	dev_queue->qdisc_sleeping = qdisc;
+	__skb_queue_head_init(&qdisc->gso_skb);
 }
 
 void dev_init_scheduler(struct net_device *dev)
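Both qdisc_reset() and qdisc_destroy() above drain gso_skb with skb_queue_walk_safe(), which remembers the next element before the current one is unlinked and freed. Here is a small userspace sketch of that safe-walk idiom on a plain singly linked list, with invented names (struct node, drain()); the list stands in for the sk_buff queue.

/*
 * Safe walk while freeing: save the next pointer before the current node
 * is freed, the same job the tmp cursor does in skb_queue_walk_safe().
 */
#include <stdio.h>
#include <stdlib.h>

struct node { struct node *next; int id; };

static void drain(struct node **head)
{
	struct node *n, *tmp;

	for (n = *head; n; n = tmp) {
		tmp = n->next;   /* grab the successor before n disappears */
		free(n);
	}
	*head = NULL;
}

int main(void)
{
	struct node *head = NULL, *n;
	int i;

	for (i = 0; i < 3; i++) {
		n = calloc(1, sizeof(*n));
		n->id = i;
		n->next = head;
		head = n;
	}

	drain(&head);
	printf("list drained, head=%p\n", (void *)head);
	return 0;
}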
