Skip to content

Commit 10724cc

Browse files
Jon Paul Maloydavem330
authored andcommitted
tipc: redesign connection-level flow control
There are two flow control mechanisms in TIPC; one at link level that handles network congestion, burst control, and retransmission, and one at connection level which' only remaining task is to prevent overflow in the receiving socket buffer. In TIPC, the latter task has to be solved end-to-end because messages can not be thrown away once they have been accepted and delivered upwards from the link layer, i.e, we can never permit the receive buffer to overflow. Currently, this algorithm is message based. A counter in the receiving socket keeps track of number of consumed messages, and sends a dedicated acknowledge message back to the sender for each 256 consumed message. A counter at the sending end keeps track of the sent, not yet acknowledged messages, and blocks the sender if this number ever reaches 512 unacknowledged messages. When the missing acknowledge arrives, the socket is then woken up for renewed transmission. This works well for keeping the message flow running, as it almost never happens that a sender socket is blocked this way. A problem with the current mechanism is that it potentially is very memory consuming. Since we don't distinguish between small and large messages, we have to dimension the socket receive buffer according to a worst-case of both. I.e., the window size must be chosen large enough to sustain a reasonable throughput even for the smallest messages, while we must still consider a scenario where all messages are of maximum size. Hence, the current fix window size of 512 messages and a maximum message size of 66k results in a receive buffer of 66 MB when truesize(66k) = 131k is taken into account. It is possible to do much better. This commit introduces an algorithm where we instead use 1024-byte blocks as base unit. This unit, always rounded upwards from the actual message size, is used when we advertise windows as well as when we count and acknowledge transmitted data. The advertised window is based on the configured receive buffer size in such a way that even the worst-case truesize/msgsize ratio always is covered. Since the smallest possible message size (from a flow control viewpoint) now is 1024 bytes, we can safely assume this ratio to be less than four, which is the value we are now using. This way, we have been able to reduce the default receive buffer size from 66 MB to 2 MB with maintained performance. In order to keep this solution backwards compatible, we introduce a new capability bit in the discovery protocol, and use this throughout the message sending/reception path to always select the right unit. Acked-by: Ying Xue <[email protected]> Signed-off-by: Jon Maloy <[email protected]> Signed-off-by: David S. Miller <[email protected]>
1 parent 60020e1 commit 10724cc

File tree

5 files changed

+122
-62
lines changed

5 files changed

+122
-62
lines changed

net/tipc/core.c

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,9 @@ static int __init tipc_init(void)
112112

113113
pr_info("Activated (version " TIPC_MOD_VER ")\n");
114114

115-
sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
116-
TIPC_LOW_IMPORTANCE;
117-
sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
118-
TIPC_CRITICAL_IMPORTANCE;
119-
sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT;
115+
sysctl_tipc_rmem[0] = RCVBUF_MIN;
116+
sysctl_tipc_rmem[1] = RCVBUF_DEF;
117+
sysctl_tipc_rmem[2] = RCVBUF_MAX;
120118

121119
err = tipc_netlink_start();
122120
if (err)

net/tipc/msg.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -743,16 +743,26 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)
743743
msg_set_bits(m, 9, 16, 0xffff, n);
744744
}
745745

746-
static inline u32 msg_bcast_tag(struct tipc_msg *m)
746+
static inline u32 msg_conn_ack(struct tipc_msg *m)
747747
{
748748
return msg_bits(m, 9, 16, 0xffff);
749749
}
750750

751-
static inline void msg_set_bcast_tag(struct tipc_msg *m, u32 n)
751+
static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)
752752
{
753753
msg_set_bits(m, 9, 16, 0xffff, n);
754754
}
755755

756+
static inline u32 msg_adv_win(struct tipc_msg *m)
757+
{
758+
return msg_bits(m, 9, 0, 0xffff);
759+
}
760+
761+
static inline void msg_set_adv_win(struct tipc_msg *m, u32 n)
762+
{
763+
msg_set_bits(m, 9, 0, 0xffff, n);
764+
}
765+
756766
static inline u32 msg_max_pkt(struct tipc_msg *m)
757767
{
758768
return msg_bits(m, 9, 16, 0xffff) * 4;

net/tipc/node.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@
4545
/* Optional capabilities supported by this code version
4646
*/
4747
enum {
48-
TIPC_BCAST_SYNCH = (1 << 1)
48+
TIPC_BCAST_SYNCH = (1 << 1),
49+
TIPC_BLOCK_FLOWCTL = (2 << 1)
4950
};
5051

51-
#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
52+
#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | TIPC_BLOCK_FLOWCTL)
5253
#define INVALID_BEARER_ID -1
5354

5455
void tipc_node_stop(struct net *net);

net/tipc/socket.c

Lines changed: 92 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,11 @@ struct tipc_sock {
9696
uint conn_timeout;
9797
atomic_t dupl_rcvcnt;
9898
bool link_cong;
99-
uint sent_unacked;
100-
uint rcv_unacked;
99+
u16 snt_unacked;
100+
u16 snd_win;
101101
u16 peer_caps;
102+
u16 rcv_unacked;
103+
u16 rcv_win;
102104
struct sockaddr_tipc remote;
103105
struct rhash_head node;
104106
struct rcu_head rcu;
@@ -228,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk)
228230
return container_of(sk, struct tipc_sock, sk);
229231
}
230232

231-
static int tsk_conn_cong(struct tipc_sock *tsk)
233+
static bool tsk_conn_cong(struct tipc_sock *tsk)
232234
{
233-
return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
235+
return tsk->snt_unacked >= tsk->snd_win;
236+
}
237+
238+
/* tsk_blocks(): translate a buffer size in bytes to number of
239+
* advertisable blocks, taking into account the ratio truesize(len)/len
240+
* We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
241+
*/
242+
static u16 tsk_adv_blocks(int len)
243+
{
244+
return len / FLOWCTL_BLK_SZ / 4;
245+
}
246+
247+
/* tsk_inc(): increment counter for sent or received data
248+
* - If block based flow control is not supported by peer we
249+
* fall back to message based ditto, incrementing the counter
250+
*/
251+
static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
252+
{
253+
if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
254+
return ((msglen / FLOWCTL_BLK_SZ) + 1);
255+
return 1;
234256
}
235257

236258
/**
@@ -378,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
378400
sk->sk_write_space = tipc_write_space;
379401
sk->sk_destruct = tipc_sock_destruct;
380402
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
381-
tsk->sent_unacked = 0;
382403
atomic_set(&tsk->dupl_rcvcnt, 0);
383404

405+
/* Start out with safe limits until we receive an advertised window */
406+
tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
407+
tsk->rcv_win = tsk->snd_win;
408+
384409
if (sock->state == SS_READY) {
385410
tsk_set_unreturnable(tsk, true);
386411
if (sock->type == SOCK_DGRAM)
@@ -776,7 +801,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
776801
struct sock *sk = &tsk->sk;
777802
struct tipc_msg *hdr = buf_msg(skb);
778803
int mtyp = msg_type(hdr);
779-
int conn_cong;
804+
bool conn_cong;
780805

781806
/* Ignore if connection cannot be validated: */
782807
if (!tsk_peer_msg(tsk, hdr))
@@ -790,7 +815,9 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
790815
return;
791816
} else if (mtyp == CONN_ACK) {
792817
conn_cong = tsk_conn_cong(tsk);
793-
tsk->sent_unacked -= msg_msgcnt(hdr);
818+
tsk->snt_unacked -= msg_conn_ack(hdr);
819+
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
820+
tsk->snd_win = msg_adv_win(hdr);
794821
if (conn_cong)
795822
sk->sk_write_space(sk);
796823
} else if (mtyp != CONN_PROBE_REPLY) {
@@ -1021,12 +1048,14 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
10211048
u32 dnode;
10221049
uint mtu, send, sent = 0;
10231050
struct iov_iter save;
1051+
int hlen = MIN_H_SIZE;
10241052

10251053
/* Handle implied connection establishment */
10261054
if (unlikely(dest)) {
10271055
rc = __tipc_sendmsg(sock, m, dsz);
1056+
hlen = msg_hdr_sz(mhdr);
10281057
if (dsz && (dsz == rc))
1029-
tsk->sent_unacked = 1;
1058+
tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
10301059
return rc;
10311060
}
10321061
if (dsz > (uint)INT_MAX)
@@ -1055,7 +1084,7 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
10551084
if (likely(!tsk_conn_cong(tsk))) {
10561085
rc = tipc_node_xmit(net, &pktchain, dnode, portid);
10571086
if (likely(!rc)) {
1058-
tsk->sent_unacked++;
1087+
tsk->snt_unacked += tsk_inc(tsk, send + hlen);
10591088
sent += send;
10601089
if (sent == dsz)
10611090
return dsz;
@@ -1120,6 +1149,12 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
11201149
tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
11211150
tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
11221151
tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
1152+
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1153+
return;
1154+
1155+
/* Fall back to message based flow control */
1156+
tsk->rcv_win = FLOWCTL_MSG_WIN;
1157+
tsk->snd_win = FLOWCTL_MSG_WIN;
11231158
}
11241159

11251160
/**
@@ -1216,7 +1251,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
12161251
return 0;
12171252
}
12181253

1219-
static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1254+
static void tipc_sk_send_ack(struct tipc_sock *tsk)
12201255
{
12211256
struct net *net = sock_net(&tsk->sk);
12221257
struct sk_buff *skb = NULL;
@@ -1232,7 +1267,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
12321267
if (!skb)
12331268
return;
12341269
msg = buf_msg(skb);
1235-
msg_set_msgcnt(msg, ack);
1270+
msg_set_conn_ack(msg, tsk->rcv_unacked);
1271+
tsk->rcv_unacked = 0;
1272+
1273+
/* Adjust to and advertize the correct window limit */
1274+
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
1275+
tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
1276+
msg_set_adv_win(msg, tsk->rcv_win);
1277+
}
12361278
tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
12371279
}
12381280

@@ -1290,7 +1332,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
12901332
long timeo;
12911333
unsigned int sz;
12921334
u32 err;
1293-
int res;
1335+
int res, hlen;
12941336

12951337
/* Catch invalid receive requests */
12961338
if (unlikely(!buf_len))
@@ -1315,6 +1357,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
13151357
buf = skb_peek(&sk->sk_receive_queue);
13161358
msg = buf_msg(buf);
13171359
sz = msg_data_sz(msg);
1360+
hlen = msg_hdr_sz(msg);
13181361
err = msg_errcode(msg);
13191362

13201363
/* Discard an empty non-errored message & try again */
@@ -1337,7 +1380,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
13371380
sz = buf_len;
13381381
m->msg_flags |= MSG_TRUNC;
13391382
}
1340-
res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
1383+
res = skb_copy_datagram_msg(buf, hlen, m, sz);
13411384
if (res)
13421385
goto exit;
13431386
res = sz;
@@ -1349,15 +1392,15 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
13491392
res = -ECONNRESET;
13501393
}
13511394

1352-
/* Consume received message (optional) */
1353-
if (likely(!(flags & MSG_PEEK))) {
1354-
if ((sock->state != SS_READY) &&
1355-
(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1356-
tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1357-
tsk->rcv_unacked = 0;
1358-
}
1359-
tsk_advance_rx_queue(sk);
1395+
if (unlikely(flags & MSG_PEEK))
1396+
goto exit;
1397+
1398+
if (likely(sock->state != SS_READY)) {
1399+
tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
1400+
if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
1401+
tipc_sk_send_ack(tsk);
13601402
}
1403+
tsk_advance_rx_queue(sk);
13611404
exit:
13621405
release_sock(sk);
13631406
return res;
@@ -1386,7 +1429,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
13861429
int sz_to_copy, target, needed;
13871430
int sz_copied = 0;
13881431
u32 err;
1389-
int res = 0;
1432+
int res = 0, hlen;
13901433

13911434
/* Catch invalid receive attempts */
13921435
if (unlikely(!buf_len))
@@ -1412,6 +1455,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
14121455
buf = skb_peek(&sk->sk_receive_queue);
14131456
msg = buf_msg(buf);
14141457
sz = msg_data_sz(msg);
1458+
hlen = msg_hdr_sz(msg);
14151459
err = msg_errcode(msg);
14161460

14171461
/* Discard an empty non-errored message & try again */
@@ -1436,8 +1480,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
14361480
needed = (buf_len - sz_copied);
14371481
sz_to_copy = (sz <= needed) ? sz : needed;
14381482

1439-
res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
1440-
m, sz_to_copy);
1483+
res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
14411484
if (res)
14421485
goto exit;
14431486

@@ -1459,20 +1502,18 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
14591502
res = -ECONNRESET;
14601503
}
14611504

1462-
/* Consume received message (optional) */
1463-
if (likely(!(flags & MSG_PEEK))) {
1464-
if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1465-
tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1466-
tsk->rcv_unacked = 0;
1467-
}
1468-
tsk_advance_rx_queue(sk);
1469-
}
1505+
if (unlikely(flags & MSG_PEEK))
1506+
goto exit;
1507+
1508+
tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
1509+
if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
1510+
tipc_sk_send_ack(tsk);
1511+
tsk_advance_rx_queue(sk);
14701512

14711513
/* Loop around if more data is required */
14721514
if ((sz_copied < buf_len) && /* didn't get all requested data */
14731515
(!skb_queue_empty(&sk->sk_receive_queue) ||
14741516
(sz_copied < target)) && /* and more is ready or required */
1475-
(!(flags & MSG_PEEK)) && /* and aren't just peeking at data */
14761517
(!err)) /* and haven't reached a FIN */
14771518
goto restart;
14781519

@@ -1604,30 +1645,33 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
16041645
/**
16051646
* rcvbuf_limit - get proper overload limit of socket receive queue
16061647
* @sk: socket
1607-
* @buf: message
1648+
* @skb: message
16081649
*
1609-
* For all connection oriented messages, irrespective of importance,
1610-
* the default overload value (i.e. 67MB) is set as limit.
1650+
* For connection oriented messages, irrespective of importance,
1651+
* default queue limit is 2 MB.
16111652
*
1612-
* For all connectionless messages, by default new queue limits are
1613-
* as belows:
1653+
* For connectionless messages, queue limits are based on message
1654+
* importance as follows:
16141655
*
1615-
* TIPC_LOW_IMPORTANCE (4 MB)
1616-
* TIPC_MEDIUM_IMPORTANCE (8 MB)
1617-
* TIPC_HIGH_IMPORTANCE (16 MB)
1618-
* TIPC_CRITICAL_IMPORTANCE (32 MB)
1656+
* TIPC_LOW_IMPORTANCE (2 MB)
1657+
* TIPC_MEDIUM_IMPORTANCE (4 MB)
1658+
* TIPC_HIGH_IMPORTANCE (8 MB)
1659+
* TIPC_CRITICAL_IMPORTANCE (16 MB)
16191660
*
16201661
* Returns overload limit according to corresponding message importance
16211662
*/
1622-
static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1663+
static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
16231664
{
1624-
struct tipc_msg *msg = buf_msg(buf);
1665+
struct tipc_sock *tsk = tipc_sk(sk);
1666+
struct tipc_msg *hdr = buf_msg(skb);
1667+
1668+
if (unlikely(!msg_connected(hdr)))
1669+
return sk->sk_rcvbuf << msg_importance(hdr);
16251670

1626-
if (msg_connected(msg))
1627-
return sysctl_tipc_rmem[2];
1671+
if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
1672+
return sk->sk_rcvbuf;
16281673

1629-
return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1630-
msg_importance(msg);
1674+
return FLOWCTL_MSG_LIM;
16311675
}
16321676

16331677
/**

net/tipc/socket.h

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/* net/tipc/socket.h: Include file for TIPC socket code
22
*
3-
* Copyright (c) 2014-2015, Ericsson AB
3+
* Copyright (c) 2014-2016, Ericsson AB
44
* All rights reserved.
55
*
66
* Redistribution and use in source and binary forms, with or without
@@ -38,10 +38,17 @@
3838
#include <net/sock.h>
3939
#include <net/genetlink.h>
4040

41-
#define TIPC_CONNACK_INTV 256
42-
#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
43-
#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
44-
SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
41+
/* Compatibility values for deprecated message based flow control */
42+
#define FLOWCTL_MSG_WIN 512
43+
#define FLOWCTL_MSG_LIM ((FLOWCTL_MSG_WIN * 2 + 1) * SKB_TRUESIZE(MAX_MSG_SIZE))
44+
45+
#define FLOWCTL_BLK_SZ 1024
46+
47+
/* Socket receive buffer sizes */
48+
#define RCVBUF_MIN (FLOWCTL_BLK_SZ * 512)
49+
#define RCVBUF_DEF (FLOWCTL_BLK_SZ * 1024 * 2)
50+
#define RCVBUF_MAX (FLOWCTL_BLK_SZ * 1024 * 16)
51+
4552
int tipc_socket_init(void);
4653
void tipc_socket_stop(void);
4754
void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);

0 commit comments

Comments
 (0)