Skip to content

Commit 5d7edbc

Browse files
committed
rxrpc: Get rid of the Rx ring
Get rid of the Rx ring and replace it with a pair of queues instead. One queue gets the packets that are in-sequence and are ready for processing by recvmsg(); the other queue gets the out-of-sequence packets for addition to the first queue as the holes get filled. The annotation ring is removed and replaced with a SACK table. The SACK table has the bits set that correspond exactly to the sequence number of the packet being acked. The SACK ring is copied when an ACK packet is being assembled and rotated so that the first ACK is in byte 0. Flow control handling is altered so that packets that are moved to the in-sequence queue are hard-ACK'd even before they're consumed - and then the Rx window size in the ACK packet (rsize) is shrunk down to compensate (even going to 0 if the window is full). Signed-off-by: David Howells <[email protected]> cc: Marc Dionne <[email protected]> cc: [email protected]
1 parent d4d02d8 commit 5d7edbc

File tree

11 files changed

+290
-207
lines changed

11 files changed

+290
-207
lines changed

include/trace/events/rxrpc.h

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,12 @@
104104
EM(rxrpc_receive_incoming, "INC") \
105105
EM(rxrpc_receive_queue, "QUE") \
106106
EM(rxrpc_receive_queue_last, "QLS") \
107-
E_(rxrpc_receive_rotate, "ROT")
107+
EM(rxrpc_receive_queue_oos, "QUO") \
108+
EM(rxrpc_receive_queue_oos_last, "QOL") \
109+
EM(rxrpc_receive_oos, "OOS") \
110+
EM(rxrpc_receive_oos_last, "OSL") \
111+
EM(rxrpc_receive_rotate, "ROT") \
112+
E_(rxrpc_receive_rotate_last, "RLS")
108113

109114
#define rxrpc_recvmsg_traces \
110115
EM(rxrpc_recvmsg_cont, "CONT") \
@@ -860,26 +865,24 @@ TRACE_EVENT(rxrpc_receive,
860865
__field(enum rxrpc_receive_trace, why )
861866
__field(rxrpc_serial_t, serial )
862867
__field(rxrpc_seq_t, seq )
863-
__field(rxrpc_seq_t, hard_ack )
864-
__field(rxrpc_seq_t, top )
868+
__field(u64, window )
865869
),
866870

867871
TP_fast_assign(
868872
__entry->call = call->debug_id;
869873
__entry->why = why;
870874
__entry->serial = serial;
871875
__entry->seq = seq;
872-
__entry->hard_ack = call->rx_hard_ack;
873-
__entry->top = call->rx_top;
876+
__entry->window = atomic64_read(&call->ackr_window);
874877
),
875878

876879
TP_printk("c=%08x %s r=%08x q=%08x w=%08x-%08x",
877880
__entry->call,
878881
__print_symbolic(__entry->why, rxrpc_receive_traces),
879882
__entry->serial,
880883
__entry->seq,
881-
__entry->hard_ack,
882-
__entry->top)
884+
lower_32_bits(__entry->window),
885+
upper_32_bits(__entry->window))
883886
);
884887

885888
TRACE_EVENT(rxrpc_recvmsg,
@@ -1459,7 +1462,7 @@ TRACE_EVENT(rxrpc_call_reset,
14591462
__entry->call_serial = call->rx_serial;
14601463
__entry->conn_serial = call->conn->hi_serial;
14611464
__entry->tx_seq = call->tx_hard_ack;
1462-
__entry->rx_seq = call->rx_hard_ack;
1465+
__entry->rx_seq = call->rx_highest_seq;
14631466
),
14641467

14651468
TP_printk("c=%08x %08x:%08x r=%08x/%08x tx=%08x rx=%08x",

net/rxrpc/ar-internal.h

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ struct rxrpc_skb_priv {
198198
u16 remain;
199199
u16 offset; /* Offset of data */
200200
u16 len; /* Length of data */
201-
u8 rx_flags; /* Received packet flags */
202201
u8 flags;
203202
#define RXRPC_RX_VERIFIED 0x01
204203

@@ -644,8 +643,20 @@ struct rxrpc_call {
644643
rxrpc_seq_t tx_hard_ack; /* Dead slot in buffer; the first transmitted but
645644
* not hard-ACK'd packet follows this.
646645
*/
646+
647+
/* Transmitted data tracking. */
647648
rxrpc_seq_t tx_top; /* Highest Tx slot allocated. */
648649
u16 tx_backoff; /* Delay to insert due to Tx failure */
650+
u8 tx_winsize; /* Maximum size of Tx window */
651+
652+
/* Received data tracking */
653+
struct sk_buff_head recvmsg_queue; /* Queue of packets ready for recvmsg() */
654+
struct sk_buff_head rx_oos_queue; /* Queue of out of sequence packets */
655+
656+
rxrpc_seq_t rx_highest_seq; /* Higest sequence number received */
657+
rxrpc_seq_t rx_consumed; /* Highest packet consumed */
658+
rxrpc_serial_t rx_serial; /* Highest serial received for this call */
659+
u8 rx_winsize; /* Size of Rx window */
649660

650661
/* TCP-style slow-start congestion control [RFC5681]. Since the SMSS
651662
* is fixed, we keep these numbers in terms of segments (ie. DATA
@@ -660,23 +671,19 @@ struct rxrpc_call {
660671
u8 cong_cumul_acks; /* Cumulative ACK count */
661672
ktime_t cong_tstamp; /* Last time cwnd was changed */
662673

663-
rxrpc_seq_t rx_hard_ack; /* Dead slot in buffer; the first received but not
664-
* consumed packet follows this.
665-
*/
666-
rxrpc_seq_t rx_top; /* Highest Rx slot allocated. */
667-
rxrpc_seq_t rx_expect_next; /* Expected next packet sequence number */
668-
rxrpc_serial_t rx_serial; /* Highest serial received for this call */
669-
u8 rx_winsize; /* Size of Rx window */
670-
u8 tx_winsize; /* Maximum size of Tx window */
671-
672674
spinlock_t input_lock; /* Lock for packet input to this call */
673675

674676
/* Receive-phase ACK management (ACKs we send). */
675677
u8 ackr_reason; /* reason to ACK */
676678
rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */
677-
rxrpc_seq_t ackr_highest_seq; /* Higest sequence number received */
679+
atomic64_t ackr_window; /* Base (in LSW) and top (in MSW) of SACK window */
678680
atomic_t ackr_nr_unacked; /* Number of unacked packets */
679681
atomic_t ackr_nr_consumed; /* Number of packets needing hard ACK */
682+
struct {
683+
#define RXRPC_SACK_SIZE 256
684+
/* SACK table for soft-acked packets */
685+
u8 ackr_sack_table[RXRPC_SACK_SIZE];
686+
} __aligned(8);
680687

681688
/* RTT management */
682689
rxrpc_serial_t rtt_serial[4]; /* Serial number of DATA or PING sent */

net/rxrpc/call_object.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
155155
INIT_LIST_HEAD(&call->accept_link);
156156
INIT_LIST_HEAD(&call->recvmsg_link);
157157
INIT_LIST_HEAD(&call->sock_link);
158+
skb_queue_head_init(&call->recvmsg_queue);
159+
skb_queue_head_init(&call->rx_oos_queue);
158160
init_waitqueue_head(&call->waitq);
159161
spin_lock_init(&call->lock);
160162
spin_lock_init(&call->notify_lock);
@@ -165,13 +167,12 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
165167
call->tx_total_len = -1;
166168
call->next_rx_timo = 20 * HZ;
167169
call->next_req_timo = 1 * HZ;
170+
atomic64_set(&call->ackr_window, 0x100000001ULL);
168171

169172
memset(&call->sock_node, 0xed, sizeof(call->sock_node));
170173

171-
/* Leave space in the ring to handle a maxed-out jumbo packet */
172174
call->rx_winsize = rxrpc_rx_window_size;
173175
call->tx_winsize = 16;
174-
call->rx_expect_next = 1;
175176

176177
call->cong_cwnd = 2;
177178
call->cong_ssthresh = RXRPC_RXTX_BUFF_SIZE - 1;
@@ -519,6 +520,8 @@ static void rxrpc_cleanup_ring(struct rxrpc_call *call)
519520
rxrpc_free_skb(call->rxtx_buffer[i], rxrpc_skb_cleaned);
520521
call->rxtx_buffer[i] = NULL;
521522
}
523+
skb_queue_purge(&call->recvmsg_queue);
524+
skb_queue_purge(&call->rx_oos_queue);
522525
}
523526

524527
/*

net/rxrpc/conn_object.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
175175
trace_rxrpc_disconnect_call(call);
176176
switch (call->completion) {
177177
case RXRPC_CALL_SUCCEEDED:
178-
chan->last_seq = call->rx_hard_ack;
178+
chan->last_seq = call->rx_highest_seq;
179179
chan->last_type = RXRPC_PACKET_TYPE_ACK;
180180
break;
181181
case RXRPC_CALL_LOCALLY_ABORTED:

0 commit comments

Comments
 (0)