Skip to content

Commit f64b78f

Browse files
longlimsft authored and smfrench committed
CIFS: SMBD: Implement function to receive data via RDMA receive
On the receive path, the transport maintains receive buffers and a reassembly queue for transferring payload via RDMA recv. There is a data copy in the transport on recv when it copies the payload to the upper layer. The transport recognizes the RFC1002 header length used in the SMB upper layer payloads in CIFS. Because this length is mainly used for TCP and is not applicable to RDMA, it is handled as out-of-band information and is never sent over the wire, and the transport behaves like TCP to the upper layer by processing and exposing the length correctly on data payloads. Signed-off-by: Long Li <[email protected]> Signed-off-by: Steve French <[email protected]> Reviewed-by: Pavel Shilovsky <[email protected]> Reviewed-by: Ronnie Sahlberg <[email protected]>
1 parent 09902f8 commit f64b78f

File tree

2 files changed

+235
-0
lines changed

2 files changed

+235
-0
lines changed

fs/cifs/smbdirect.c

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* the GNU General Public License for more details.
1515
*/
1616
#include <linux/module.h>
17+
#include <linux/highmem.h>
1718
#include "smbdirect.h"
1819
#include "cifs_debug.h"
1920

@@ -178,6 +179,8 @@ static void smbd_destroy_rdma_work(struct work_struct *work)
178179

179180
log_rdma_event(INFO, "wait for all recv to finish\n");
180181
wake_up_interruptible(&info->wait_reassembly_queue);
182+
wait_event(info->wait_smbd_recv_pending,
183+
info->smbd_recv_pending == 0);
181184

182185
log_rdma_event(INFO, "wait for all send posted to IB to finish\n");
183186
wait_event(info->wait_send_pending,
@@ -1649,6 +1652,9 @@ struct smbd_connection *_smbd_get_connection(
16491652
queue_delayed_work(info->workqueue, &info->idle_timer_work,
16501653
info->keep_alive_interval*HZ);
16511654

1655+
init_waitqueue_head(&info->wait_smbd_recv_pending);
1656+
info->smbd_recv_pending = 0;
1657+
16521658
init_waitqueue_head(&info->wait_send_pending);
16531659
atomic_set(&info->send_pending, 0);
16541660

@@ -1715,3 +1721,225 @@ struct smbd_connection *smbd_get_connection(
17151721
}
17161722
return ret;
17171723
}
1724+
1725+
/*
1726+
* Receive data from receive reassembly queue
1727+
* All the incoming data packets are placed in reassembly queue
1728+
* buf: the buffer to read data into
1729+
* size: the length of data to read
1730+
* return value: actual data read
1731+
* Note: this implementation copies the data from reassebmly queue to receive
1732+
* buffers used by upper layer. This is not the optimal code path. A better way
1733+
* to do it is to not have upper layer allocate its receive buffers but rather
1734+
* borrow the buffer from reassembly queue, and return it after data is
1735+
* consumed. But this will require more changes to upper layer code, and also
1736+
* need to consider packet boundaries while they still being reassembled.
1737+
*/
1738+
int smbd_recv_buf(struct smbd_connection *info, char *buf, unsigned int size)
1739+
{
1740+
struct smbd_response *response;
1741+
struct smbd_data_transfer *data_transfer;
1742+
int to_copy, to_read, data_read, offset;
1743+
u32 data_length, remaining_data_length, data_offset;
1744+
int rc;
1745+
unsigned long flags;
1746+
1747+
again:
1748+
if (info->transport_status != SMBD_CONNECTED) {
1749+
log_read(ERR, "disconnected\n");
1750+
return -ENODEV;
1751+
}
1752+
1753+
/*
1754+
* No need to hold the reassembly queue lock all the time as we are
1755+
* the only one reading from the front of the queue. The transport
1756+
* may add more entries to the back of the queue at the same time
1757+
*/
1758+
log_read(INFO, "size=%d info->reassembly_data_length=%d\n", size,
1759+
info->reassembly_data_length);
1760+
if (info->reassembly_data_length >= size) {
1761+
int queue_length;
1762+
int queue_removed = 0;
1763+
1764+
/*
1765+
* Need to make sure reassembly_data_length is read before
1766+
* reading reassembly_queue_length and calling
1767+
* _get_first_reassembly. This call is lock free
1768+
* as we never read at the end of the queue which are being
1769+
* updated in SOFTIRQ as more data is received
1770+
*/
1771+
virt_rmb();
1772+
queue_length = info->reassembly_queue_length;
1773+
data_read = 0;
1774+
to_read = size;
1775+
offset = info->first_entry_offset;
1776+
while (data_read < size) {
1777+
response = _get_first_reassembly(info);
1778+
data_transfer = smbd_response_payload(response);
1779+
data_length = le32_to_cpu(data_transfer->data_length);
1780+
remaining_data_length =
1781+
le32_to_cpu(
1782+
data_transfer->remaining_data_length);
1783+
data_offset = le32_to_cpu(data_transfer->data_offset);
1784+
1785+
/*
1786+
* The upper layer expects RFC1002 length at the
1787+
* beginning of the payload. Return it to indicate
1788+
* the total length of the packet. This minimize the
1789+
* change to upper layer packet processing logic. This
1790+
* will be eventually remove when an intermediate
1791+
* transport layer is added
1792+
*/
1793+
if (response->first_segment && size == 4) {
1794+
unsigned int rfc1002_len =
1795+
data_length + remaining_data_length;
1796+
*((__be32 *)buf) = cpu_to_be32(rfc1002_len);
1797+
data_read = 4;
1798+
response->first_segment = false;
1799+
log_read(INFO, "returning rfc1002 length %d\n",
1800+
rfc1002_len);
1801+
goto read_rfc1002_done;
1802+
}
1803+
1804+
to_copy = min_t(int, data_length - offset, to_read);
1805+
memcpy(
1806+
buf + data_read,
1807+
(char *)data_transfer + data_offset + offset,
1808+
to_copy);
1809+
1810+
/* move on to the next buffer? */
1811+
if (to_copy == data_length - offset) {
1812+
queue_length--;
1813+
/*
1814+
* No need to lock if we are not at the
1815+
* end of the queue
1816+
*/
1817+
if (!queue_length)
1818+
spin_lock_irqsave(
1819+
&info->reassembly_queue_lock,
1820+
flags);
1821+
list_del(&response->list);
1822+
queue_removed++;
1823+
if (!queue_length)
1824+
spin_unlock_irqrestore(
1825+
&info->reassembly_queue_lock,
1826+
flags);
1827+
1828+
info->count_reassembly_queue--;
1829+
info->count_dequeue_reassembly_queue++;
1830+
put_receive_buffer(info, response);
1831+
offset = 0;
1832+
log_read(INFO, "put_receive_buffer offset=0\n");
1833+
} else
1834+
offset += to_copy;
1835+
1836+
to_read -= to_copy;
1837+
data_read += to_copy;
1838+
1839+
log_read(INFO, "_get_first_reassembly memcpy %d bytes "
1840+
"data_transfer_length-offset=%d after that "
1841+
"to_read=%d data_read=%d offset=%d\n",
1842+
to_copy, data_length - offset,
1843+
to_read, data_read, offset);
1844+
}
1845+
1846+
spin_lock_irqsave(&info->reassembly_queue_lock, flags);
1847+
info->reassembly_data_length -= data_read;
1848+
info->reassembly_queue_length -= queue_removed;
1849+
spin_unlock_irqrestore(&info->reassembly_queue_lock, flags);
1850+
1851+
info->first_entry_offset = offset;
1852+
log_read(INFO, "returning to thread data_read=%d "
1853+
"reassembly_data_length=%d first_entry_offset=%d\n",
1854+
data_read, info->reassembly_data_length,
1855+
info->first_entry_offset);
1856+
read_rfc1002_done:
1857+
return data_read;
1858+
}
1859+
1860+
log_read(INFO, "wait_event on more data\n");
1861+
rc = wait_event_interruptible(
1862+
info->wait_reassembly_queue,
1863+
info->reassembly_data_length >= size ||
1864+
info->transport_status != SMBD_CONNECTED);
1865+
/* Don't return any data if interrupted */
1866+
if (rc)
1867+
return -ENODEV;
1868+
1869+
goto again;
1870+
}
1871+
1872+
/*
1873+
* Receive a page from receive reassembly queue
1874+
* page: the page to read data into
1875+
* to_read: the length of data to read
1876+
* return value: actual data read
1877+
*/
1878+
int smbd_recv_page(struct smbd_connection *info,
1879+
struct page *page, unsigned int to_read)
1880+
{
1881+
int ret;
1882+
char *to_address;
1883+
1884+
/* make sure we have the page ready for read */
1885+
ret = wait_event_interruptible(
1886+
info->wait_reassembly_queue,
1887+
info->reassembly_data_length >= to_read ||
1888+
info->transport_status != SMBD_CONNECTED);
1889+
if (ret)
1890+
return 0;
1891+
1892+
/* now we can read from reassembly queue and not sleep */
1893+
to_address = kmap_atomic(page);
1894+
1895+
log_read(INFO, "reading from page=%p address=%p to_read=%d\n",
1896+
page, to_address, to_read);
1897+
1898+
ret = smbd_recv_buf(info, to_address, to_read);
1899+
kunmap_atomic(to_address);
1900+
1901+
return ret;
1902+
}
1903+
1904+
/*
1905+
* Receive data from transport
1906+
* msg: a msghdr point to the buffer, can be ITER_KVEC or ITER_BVEC
1907+
* return: total bytes read, or 0. SMB Direct will not do partial read.
1908+
*/
1909+
int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
1910+
{
1911+
char *buf;
1912+
struct page *page;
1913+
unsigned int to_read;
1914+
int rc;
1915+
1916+
info->smbd_recv_pending++;
1917+
1918+
switch (msg->msg_iter.type) {
1919+
case READ | ITER_KVEC:
1920+
buf = msg->msg_iter.kvec->iov_base;
1921+
to_read = msg->msg_iter.kvec->iov_len;
1922+
rc = smbd_recv_buf(info, buf, to_read);
1923+
break;
1924+
1925+
case READ | ITER_BVEC:
1926+
page = msg->msg_iter.bvec->bv_page;
1927+
to_read = msg->msg_iter.bvec->bv_len;
1928+
rc = smbd_recv_page(info, page, to_read);
1929+
break;
1930+
1931+
default:
1932+
/* It's a bug in upper layer to get there */
1933+
cifs_dbg(VFS, "CIFS: invalid msg type %d\n",
1934+
msg->msg_iter.type);
1935+
rc = -EIO;
1936+
}
1937+
1938+
info->smbd_recv_pending--;
1939+
wake_up(&info->wait_smbd_recv_pending);
1940+
1941+
/* SMBDirect will read it all or nothing */
1942+
if (rc > 0)
1943+
msg->msg_iter.count = 0;
1944+
return rc;
1945+
}

fs/cifs/smbdirect.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ struct smbd_connection {
9191
int fragment_reassembly_remaining;
9292

9393
/* Activity accounting */
94+
/* Pending requests issued from upper layer */
95+
int smbd_recv_pending;
96+
wait_queue_head_t wait_smbd_recv_pending;
9497

9598
atomic_t send_pending;
9699
wait_queue_head_t wait_send_pending;
@@ -252,13 +255,17 @@ int smbd_reconnect(struct TCP_Server_Info *server);
252255
/* Destroy SMBDirect session */
253256
void smbd_destroy(struct smbd_connection *info);
254257

258+
/* Interface for carrying upper layer I/O through send/recv */
259+
int smbd_recv(struct smbd_connection *info, struct msghdr *msg);
260+
255261
#else
256262
#define cifs_rdma_enabled(server) 0
257263
struct smbd_connection {};
258264
/*
 * Stub for the #else configuration (where cifs_rdma_enabled() is defined
 * as 0): ignores its arguments and returns NULL.
 */
static inline void *smbd_get_connection(
259265
struct TCP_Server_Info *server, struct sockaddr *dstaddr) {return NULL;}
260266
/* Stub for the #else configuration: ignores its argument and returns -1. */
static inline int smbd_reconnect(struct TCP_Server_Info *server) {return -1; }
261267
/* Stub for the #else configuration: does nothing. */
static inline void smbd_destroy(struct smbd_connection *info) {}
268+
/* Stub for the #else configuration: reads nothing and returns -1. */
static inline int smbd_recv(struct smbd_connection *info, struct msghdr *msg) {return -1; }
262269
#endif
263270

264271
#endif

0 commit comments

Comments
 (0)