
Commit 1f9ecd7

sowminiv authored and davem330 committed
RDS: Pass rds_conn_path to rds_send_xmit()
Pass a struct rds_conn_path to rds_send_xmit() so that MP-capable transports can transmit packets on something other than c_path[0]. The eventual goal for MP-capable transports is to hash the rds socket to a path based on the bound local address/port, and to use this path as the argument to rds_send_xmit().

Signed-off-by: Sowmini Varadhan <[email protected]>
Signed-off-by: David S. Miller <[email protected]>
1 parent 780a6d9 commit 1f9ecd7
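
The hashing scheme the commit message alludes to is not part of this change, so the sketch below is only a plausible standalone illustration of the idea: hash the bound local address/port to an index into c_path[]. The rds_path_hash name, the RDS_MPATH_WORKERS count of 8, and the mixing constants are assumptions for illustration, not kernel code.

#include <stdint.h>
#include <stdio.h>

/* Illustrative path count; the real value belongs to later RDS patches. */
#define RDS_MPATH_WORKERS 8

/* Hash a bound local address/port pair to a path index so that every
 * message on a given socket consistently uses the same rds_conn_path. */
static unsigned int rds_path_hash(uint32_t laddr, uint16_t lport)
{
        uint32_t h = laddr ^ (((uint32_t)lport << 16) | lport);

        h ^= h >> 16;           /* cheap integer avalanche mix */
        h *= 0x45d9f3bU;
        h ^= h >> 16;
        return h % RDS_MPATH_WORKERS;
}

int main(void)
{
        /* Sockets bound to different ports tend to land on different
         * paths, spreading load across the c_path[] entries. */
        printf("port 4000 -> path %u\n", rds_path_hash(0x0a000001u, 4000));
        printf("port 4001 -> path %u\n", rds_path_hash(0x0a000001u, 4001));
        return 0;
}

Keying the hash on the bound endpoint keeps all of one socket's traffic on one path, preserving per-socket ordering while distinct sockets spread across paths.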

File tree

4 files changed: +87 -70 lines changed

net/rds/ib_cm.c
net/rds/rds.h
net/rds/send.c
net/rds/threads.c

net/rds/ib_cm.c

Lines changed: 1 addition & 1 deletion
@@ -274,7 +274,7 @@ static void rds_ib_tasklet_fn_send(unsigned long data)
         if (rds_conn_up(conn) &&
             (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
             test_bit(0, &conn->c_map_queued)))
-                rds_send_xmit(ic->conn);
+                rds_send_xmit(&ic->conn->c_path[0]);
 }
 
 static void poll_rcq(struct rds_ib_connection *ic, struct ib_cq *cq,

net/rds/rds.h

Lines changed: 3 additions & 1 deletion
@@ -457,7 +457,9 @@ struct rds_transport {
         int (*conn_connect)(struct rds_connection *conn);
         void (*conn_shutdown)(struct rds_connection *conn);
         void (*xmit_prepare)(struct rds_connection *conn);
+        void (*xmit_path_prepare)(struct rds_conn_path *cp);
         void (*xmit_complete)(struct rds_connection *conn);
+        void (*xmit_path_complete)(struct rds_conn_path *cp);
         int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
                     unsigned int hdr_off, unsigned int sg, unsigned int off);
         int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
@@ -780,7 +782,7 @@ void rds_inc_info_copy(struct rds_incoming *inc,
 /* send.c */
 int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len);
 void rds_send_reset(struct rds_connection *conn);
-int rds_send_xmit(struct rds_connection *conn);
+int rds_send_xmit(struct rds_conn_path *cp);
 struct sockaddr_in;
 void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest);
 typedef int (*is_acked_func)(struct rds_message *rm, uint64_t ack);
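
Note that the path-based hooks are added beside the conn-based ones rather than replacing them, which lets transports convert one at a time. Below is a hedged sketch of how an MP-capable transport might wire them up; struct rds_transport_slice is a compilable stand-in for the real struct rds_transport, and the rds_tcp_* names are illustrative assumptions (this commit adds only the hooks, not a user of them).

/* Stand-in declarations so the sketch compiles on its own; in-tree code
 * would include net/rds/rds.h and use the full struct rds_transport. */
struct rds_connection;
struct rds_conn_path;

struct rds_transport_slice {
        int  t_mp_capable;
        void (*xmit_prepare)(struct rds_connection *conn);
        void (*xmit_path_prepare)(struct rds_conn_path *cp);
        void (*xmit_complete)(struct rds_connection *conn);
        void (*xmit_path_complete)(struct rds_conn_path *cp);
};

/* Hypothetical per-path hooks: prepare and flush the socket backing this
 * one path rather than the whole connection. */
static void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp)
{
        (void)cp;       /* e.g. cork this path's socket */
}

static void rds_tcp_xmit_path_complete(struct rds_conn_path *cp)
{
        (void)cp;       /* e.g. uncork and push queued frames */
}

static struct rds_transport_slice rds_tcp_transport_slice = {
        .t_mp_capable           = 1,
        .xmit_path_prepare      = rds_tcp_xmit_path_prepare,
        .xmit_path_complete     = rds_tcp_xmit_path_complete,
        /* Legacy xmit_prepare/xmit_complete stay NULL: rds_send_xmit()
         * consults them only when t_mp_capable is clear. */
};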

net/rds/send.c

Lines changed: 82 additions & 67 deletions
@@ -107,23 +107,23 @@ void rds_send_reset(struct rds_connection *conn)
 }
 EXPORT_SYMBOL_GPL(rds_send_reset);
 
-static int acquire_in_xmit(struct rds_connection *conn)
+static int acquire_in_xmit(struct rds_conn_path *cp)
 {
-        return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+        return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
 }
 
-static void release_in_xmit(struct rds_connection *conn)
+static void release_in_xmit(struct rds_conn_path *cp)
 {
-        clear_bit(RDS_IN_XMIT, &conn->c_flags);
+        clear_bit(RDS_IN_XMIT, &cp->cp_flags);
         smp_mb__after_atomic();
         /*
          * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
          * hot path and finding waiters is very rare.  We don't want to walk
          * the system-wide hashed waitqueue buckets in the fast path only to
          * almost never find waiters.
          */
-        if (waitqueue_active(&conn->c_waitq))
-                wake_up_all(&conn->c_waitq);
+        if (waitqueue_active(&cp->cp_waitq))
+                wake_up_all(&cp->cp_waitq);
 }
 
 /*
@@ -140,8 +140,9 @@ static void release_in_xmit(struct rds_connection *conn)
  * - small message latency is higher behind queued large messages
  * - large message latency isn't starved by intervening small sends
  */
-int rds_send_xmit(struct rds_connection *conn)
+int rds_send_xmit(struct rds_conn_path *cp)
 {
+        struct rds_connection *conn = cp->cp_conn;
         struct rds_message *rm;
         unsigned long flags;
         unsigned int tmp;
@@ -161,7 +162,7 @@ int rds_send_xmit(struct rds_connection *conn)
          * avoids blocking the caller and trading per-connection data between
          * caches per message.
          */
-        if (!acquire_in_xmit(conn)) {
+        if (!acquire_in_xmit(cp)) {
                 rds_stats_inc(s_send_lock_contention);
                 ret = -ENOMEM;
                 goto out;
@@ -175,29 +176,33 @@ int rds_send_xmit(struct rds_connection *conn)
          * The acquire_in_xmit() check above ensures that only one
          * caller can increment c_send_gen at any time.
          */
-        conn->c_send_gen++;
-        send_gen = conn->c_send_gen;
+        cp->cp_send_gen++;
+        send_gen = cp->cp_send_gen;
 
         /*
          * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
         * we do the opposite to avoid races.
          */
-        if (!rds_conn_up(conn)) {
-                release_in_xmit(conn);
+        if (!rds_conn_path_up(cp)) {
+                release_in_xmit(cp);
                 ret = 0;
                 goto out;
         }
 
-        if (conn->c_trans->xmit_prepare)
+        if (conn->c_trans->t_mp_capable) {
+                if (conn->c_trans->xmit_path_prepare)
+                        conn->c_trans->xmit_path_prepare(cp);
+        } else if (conn->c_trans->xmit_prepare) {
                 conn->c_trans->xmit_prepare(conn);
+        }
 
         /*
          * spin trying to push headers and data down the connection until
          * the connection doesn't make forward progress.
          */
         while (1) {
 
-                rm = conn->c_xmit_rm;
+                rm = cp->cp_xmit_rm;
 
                 /*
                  * If between sending messages, we can send a pending congestion
@@ -210,14 +215,16 @@ int rds_send_xmit(struct rds_connection *conn)
                                 break;
                         }
                         rm->data.op_active = 1;
+                        rm->m_inc.i_conn_path = cp;
+                        rm->m_inc.i_conn = cp->cp_conn;
 
-                        conn->c_xmit_rm = rm;
+                        cp->cp_xmit_rm = rm;
                 }
 
                 /*
                  * If not already working on one, grab the next message.
                  *
-                 * c_xmit_rm holds a ref while we're sending this message down
+                 * cp_xmit_rm holds a ref while we're sending this message down
                  * the connction.  We can use this ref while holding the
                  * send_sem.. rds_send_reset() is serialized with it.
                  */
@@ -234,10 +241,10 @@ int rds_send_xmit(struct rds_connection *conn)
                         if (batch_count >= send_batch_count)
                                 goto over_batch;
 
-                        spin_lock_irqsave(&conn->c_lock, flags);
+                        spin_lock_irqsave(&cp->cp_lock, flags);
 
-                        if (!list_empty(&conn->c_send_queue)) {
-                                rm = list_entry(conn->c_send_queue.next,
+                        if (!list_empty(&cp->cp_send_queue)) {
+                                rm = list_entry(cp->cp_send_queue.next,
                                                 struct rds_message,
                                                 m_conn_item);
                                 rds_message_addref(rm);
@@ -246,10 +253,11 @@ int rds_send_xmit(struct rds_connection *conn)
                                  * Move the message from the send queue to the retransmit
                                  * list right away.
                                  */
-                                list_move_tail(&rm->m_conn_item, &conn->c_retrans);
+                                list_move_tail(&rm->m_conn_item,
+                                               &cp->cp_retrans);
                         }
 
-                        spin_unlock_irqrestore(&conn->c_lock, flags);
+                        spin_unlock_irqrestore(&cp->cp_lock, flags);
 
                         if (!rm)
                                 break;
@@ -263,32 +271,34 @@ int rds_send_xmit(struct rds_connection *conn)
                          */
                         if (rm->rdma.op_active &&
                             test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-                                spin_lock_irqsave(&conn->c_lock, flags);
+                                spin_lock_irqsave(&cp->cp_lock, flags);
                                 if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
                                         list_move(&rm->m_conn_item, &to_be_dropped);
-                                spin_unlock_irqrestore(&conn->c_lock, flags);
+                                spin_unlock_irqrestore(&cp->cp_lock, flags);
                                 continue;
                         }
 
                         /* Require an ACK every once in a while */
                         len = ntohl(rm->m_inc.i_hdr.h_len);
-                        if (conn->c_unacked_packets == 0 ||
-                            conn->c_unacked_bytes < len) {
+                        if (cp->cp_unacked_packets == 0 ||
+                            cp->cp_unacked_bytes < len) {
                                 __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 
-                                conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
-                                conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
+                                cp->cp_unacked_packets =
+                                        rds_sysctl_max_unacked_packets;
+                                cp->cp_unacked_bytes =
+                                        rds_sysctl_max_unacked_bytes;
                                 rds_stats_inc(s_send_ack_required);
                         } else {
-                                conn->c_unacked_bytes -= len;
-                                conn->c_unacked_packets--;
+                                cp->cp_unacked_bytes -= len;
+                                cp->cp_unacked_packets--;
                         }
 
-                        conn->c_xmit_rm = rm;
+                        cp->cp_xmit_rm = rm;
                 }
 
                 /* The transport either sends the whole rdma or none of it */
-                if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+                if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
                         rm->m_final_op = &rm->rdma;
                         /* The transport owns the mapped memory for now.
                          * You can't unmap it while it's on the send queue
@@ -300,11 +310,11 @@ int rds_send_xmit(struct rds_connection *conn)
                                 wake_up_interruptible(&rm->m_flush_wait);
                                 break;
                         }
-                        conn->c_xmit_rdma_sent = 1;
+                        cp->cp_xmit_rdma_sent = 1;
 
                 }
 
-                if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+                if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
                         rm->m_final_op = &rm->atomic;
                         /* The transport owns the mapped memory for now.
                          * You can't unmap it while it's on the send queue
@@ -316,7 +326,7 @@ int rds_send_xmit(struct rds_connection *conn)
                                 wake_up_interruptible(&rm->m_flush_wait);
                                 break;
                         }
-                        conn->c_xmit_atomic_sent = 1;
+                        cp->cp_xmit_atomic_sent = 1;
 
                 }
 
@@ -342,65 +352,70 @@ int rds_send_xmit(struct rds_connection *conn)
                         rm->data.op_active = 0;
                 }
 
-                if (rm->data.op_active && !conn->c_xmit_data_sent) {
+                if (rm->data.op_active && !cp->cp_xmit_data_sent) {
                         rm->m_final_op = &rm->data;
+
                         ret = conn->c_trans->xmit(conn, rm,
-                                                  conn->c_xmit_hdr_off,
-                                                  conn->c_xmit_sg,
-                                                  conn->c_xmit_data_off);
+                                                  cp->cp_xmit_hdr_off,
+                                                  cp->cp_xmit_sg,
+                                                  cp->cp_xmit_data_off);
                         if (ret <= 0)
                                 break;
 
-                        if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) {
+                        if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
                                 tmp = min_t(int, ret,
                                             sizeof(struct rds_header) -
-                                            conn->c_xmit_hdr_off);
-                                conn->c_xmit_hdr_off += tmp;
+                                            cp->cp_xmit_hdr_off);
+                                cp->cp_xmit_hdr_off += tmp;
                                 ret -= tmp;
                         }
 
-                        sg = &rm->data.op_sg[conn->c_xmit_sg];
+                        sg = &rm->data.op_sg[cp->cp_xmit_sg];
                         while (ret) {
                                 tmp = min_t(int, ret, sg->length -
-                                                      conn->c_xmit_data_off);
-                                conn->c_xmit_data_off += tmp;
+                                                      cp->cp_xmit_data_off);
+                                cp->cp_xmit_data_off += tmp;
                                 ret -= tmp;
-                                if (conn->c_xmit_data_off == sg->length) {
-                                        conn->c_xmit_data_off = 0;
+                                if (cp->cp_xmit_data_off == sg->length) {
+                                        cp->cp_xmit_data_off = 0;
                                         sg++;
-                                        conn->c_xmit_sg++;
-                                        BUG_ON(ret != 0 &&
-                                               conn->c_xmit_sg == rm->data.op_nents);
+                                        cp->cp_xmit_sg++;
+                                        BUG_ON(ret != 0 && cp->cp_xmit_sg ==
+                                               rm->data.op_nents);
                                 }
                         }
 
-                        if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
-                            (conn->c_xmit_sg == rm->data.op_nents))
-                                conn->c_xmit_data_sent = 1;
+                        if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
+                            (cp->cp_xmit_sg == rm->data.op_nents))
+                                cp->cp_xmit_data_sent = 1;
                 }
 
                 /*
                  * A rm will only take multiple times through this loop
                  * if there is a data op. Thus, if the data is sent (or there was
                  * none), then we're done with the rm.
                  */
-                if (!rm->data.op_active || conn->c_xmit_data_sent) {
-                        conn->c_xmit_rm = NULL;
-                        conn->c_xmit_sg = 0;
-                        conn->c_xmit_hdr_off = 0;
-                        conn->c_xmit_data_off = 0;
-                        conn->c_xmit_rdma_sent = 0;
-                        conn->c_xmit_atomic_sent = 0;
-                        conn->c_xmit_data_sent = 0;
+                if (!rm->data.op_active || cp->cp_xmit_data_sent) {
+                        cp->cp_xmit_rm = NULL;
+                        cp->cp_xmit_sg = 0;
+                        cp->cp_xmit_hdr_off = 0;
+                        cp->cp_xmit_data_off = 0;
+                        cp->cp_xmit_rdma_sent = 0;
+                        cp->cp_xmit_atomic_sent = 0;
+                        cp->cp_xmit_data_sent = 0;
 
                         rds_message_put(rm);
                 }
         }
 
 over_batch:
-        if (conn->c_trans->xmit_complete)
+        if (conn->c_trans->t_mp_capable) {
+                if (conn->c_trans->xmit_path_complete)
+                        conn->c_trans->xmit_path_complete(cp);
+        } else if (conn->c_trans->xmit_complete) {
                 conn->c_trans->xmit_complete(conn);
-        release_in_xmit(conn);
+        }
+        release_in_xmit(cp);
 
         /* Nuke any messages we decided not to retransmit. */
         if (!list_empty(&to_be_dropped)) {
@@ -428,12 +443,12 @@ int rds_send_xmit(struct rds_connection *conn)
         if (ret == 0) {
                 smp_mb();
                 if ((test_bit(0, &conn->c_map_queued) ||
-                     !list_empty(&conn->c_send_queue)) &&
-                    send_gen == conn->c_send_gen) {
+                     !list_empty(&cp->cp_send_queue)) &&
+                    send_gen == cp->cp_send_gen) {
                         rds_stats_inc(s_send_lock_queue_raced);
                         if (batch_count < send_batch_count)
                                 goto restart;
-                        queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+                        queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
                 }
         }
 out:
@@ -1110,9 +1125,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
          */
         rds_stats_inc(s_send_queued);
 
-        ret = rds_send_xmit(conn);
+        ret = rds_send_xmit(cpath);
         if (ret == -ENOMEM || ret == -EAGAIN)
-                queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+                queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
 
         rds_message_put(rm);
         return payload_len;
net/rds/threads.c

Lines changed: 1 addition & 1 deletion
@@ -177,7 +177,7 @@ void rds_send_worker(struct work_struct *work)
 
         if (rds_conn_path_state(cp) == RDS_CONN_UP) {
                 clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
-                ret = rds_send_xmit(cp->cp_conn);
+                ret = rds_send_xmit(cp);
                 cond_resched();
                 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
                 switch (ret) {
