@@ -47,16 +47,59 @@ static void smc_rx_wake_up(struct sock *sk)
4747 * @conn connection to update
4848 * @cons consumer cursor
4949 * @len number of Bytes consumed
50+ * Returns:
51+ * 1 if we should end our receive, 0 otherwise
5052 */
51- static void smc_rx_update_consumer (struct smc_connection * conn ,
52- union smc_host_cursor cons , size_t len )
53+ static int smc_rx_update_consumer (struct smc_sock * smc ,
54+ union smc_host_cursor cons , size_t len )
5355{
56+ struct smc_connection * conn = & smc -> conn ;
57+ struct sock * sk = & smc -> sk ;
58+ bool force = false;
59+ int diff , rc = 0 ;
60+
5461 smc_curs_add (conn -> rmb_desc -> len , & cons , len );
62+
63+ /* did we process urgent data? */
64+ if (conn -> urg_state == SMC_URG_VALID || conn -> urg_rx_skip_pend ) {
65+ diff = smc_curs_comp (conn -> rmb_desc -> len , & cons ,
66+ & conn -> urg_curs );
67+ if (sock_flag (sk , SOCK_URGINLINE )) {
68+ if (diff == 0 ) {
69+ force = true;
70+ rc = 1 ;
71+ conn -> urg_state = SMC_URG_READ ;
72+ }
73+ } else {
74+ if (diff == 1 ) {
75+ /* skip urgent byte */
76+ force = true;
77+ smc_curs_add (conn -> rmb_desc -> len , & cons , 1 );
78+ conn -> urg_rx_skip_pend = false;
79+ } else if (diff < -1 )
80+ /* we read past urgent byte */
81+ conn -> urg_state = SMC_URG_READ ;
82+ }
83+ }
84+
5585 smc_curs_write (& conn -> local_tx_ctrl .cons , smc_curs_read (& cons , conn ),
5686 conn );
87+
5788 /* send consumer cursor update if required */
5889 /* similar to advertising new TCP rcv_wnd if required */
59- smc_tx_consumer_update (conn );
90+ smc_tx_consumer_update (conn , force );
91+
92+ return rc ;
93+ }
94+
95+ static void smc_rx_update_cons (struct smc_sock * smc , size_t len )
96+ {
97+ struct smc_connection * conn = & smc -> conn ;
98+ union smc_host_cursor cons ;
99+
100+ smc_curs_write (& cons , smc_curs_read (& conn -> local_tx_ctrl .cons , conn ),
101+ conn );
102+ smc_rx_update_consumer (smc , cons , len );
60103}
61104
62105struct smc_spd_priv {
@@ -70,7 +113,6 @@ static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
70113 struct smc_spd_priv * priv = (struct smc_spd_priv * )buf -> private ;
71114 struct smc_sock * smc = priv -> smc ;
72115 struct smc_connection * conn ;
73- union smc_host_cursor cons ;
74116 struct sock * sk = & smc -> sk ;
75117
76118 if (sk -> sk_state == SMC_CLOSED ||
@@ -79,9 +121,7 @@ static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
79121 goto out ;
80122 conn = & smc -> conn ;
81123 lock_sock (sk );
82- smc_curs_write (& cons , smc_curs_read (& conn -> local_tx_ctrl .cons , conn ),
83- conn );
84- smc_rx_update_consumer (conn , cons , priv -> len );
124+ smc_rx_update_cons (smc , priv -> len );
85125 release_sock (sk );
86126 if (atomic_sub_and_test (priv -> len , & conn -> splice_pending ))
87127 smc_rx_wake_up (sk );
@@ -184,6 +224,52 @@ int smc_rx_wait(struct smc_sock *smc, long *timeo,
184224 return rc ;
185225}
186226
227+ static int smc_rx_recv_urg (struct smc_sock * smc , struct msghdr * msg , int len ,
228+ int flags )
229+ {
230+ struct smc_connection * conn = & smc -> conn ;
231+ union smc_host_cursor cons ;
232+ struct sock * sk = & smc -> sk ;
233+ int rc = 0 ;
234+
235+ if (sock_flag (sk , SOCK_URGINLINE ) ||
236+ !(conn -> urg_state == SMC_URG_VALID ) ||
237+ conn -> urg_state == SMC_URG_READ )
238+ return - EINVAL ;
239+
240+ if (conn -> urg_state == SMC_URG_VALID ) {
241+ if (!(flags & MSG_PEEK ))
242+ smc -> conn .urg_state = SMC_URG_READ ;
243+ msg -> msg_flags |= MSG_OOB ;
244+ if (len > 0 ) {
245+ if (!(flags & MSG_TRUNC ))
246+ rc = memcpy_to_msg (msg , & conn -> urg_rx_byte , 1 );
247+ len = 1 ;
248+ smc_curs_write (& cons ,
249+ smc_curs_read (& conn -> local_tx_ctrl .cons ,
250+ conn ),
251+ conn );
252+ if (smc_curs_diff (conn -> rmb_desc -> len , & cons ,
253+ & conn -> urg_curs ) > 1 )
254+ conn -> urg_rx_skip_pend = true;
255+ /* Urgent Byte was already accounted for, but trigger
256+ * skipping the urgent byte in non-inline case
257+ */
258+ if (!(flags & MSG_PEEK ))
259+ smc_rx_update_consumer (smc , cons , 0 );
260+ } else {
261+ msg -> msg_flags |= MSG_TRUNC ;
262+ }
263+
264+ return rc ? - EFAULT : len ;
265+ }
266+
267+ if (sk -> sk_state == SMC_CLOSED || sk -> sk_shutdown & RCV_SHUTDOWN )
268+ return 0 ;
269+
270+ return - EAGAIN ;
271+ }
272+
187273/* smc_rx_recvmsg - receive data from RMBE
188274 * @msg: copy data to receive buffer
189275 * @pipe: copy data to pipe if set - indicates splice() call
@@ -209,12 +295,12 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
209295
210296 if (unlikely (flags & MSG_ERRQUEUE ))
211297 return - EINVAL ; /* future work for sk.sk_family == AF_SMC */
212- if (flags & MSG_OOB )
213- return - EINVAL ; /* future work */
214298
215299 sk = & smc -> sk ;
216300 if (sk -> sk_state == SMC_LISTEN )
217301 return - ENOTCONN ;
302+ if (flags & MSG_OOB )
303+ return smc_rx_recv_urg (smc , msg , len , flags );
218304 timeo = sock_rcvtimeo (sk , flags & MSG_DONTWAIT );
219305 target = sock_rcvlowat (sk , flags & MSG_WAITALL , len );
220306
@@ -227,6 +313,9 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
227313
228314 if (atomic_read (& conn -> bytes_to_rcv ))
229315 goto copy ;
316+ else if (conn -> urg_state == SMC_URG_VALID )
317+ /* we received a single urgent Byte - skip */
318+ smc_rx_update_cons (smc , 0 );
230319
231320 if (sk -> sk_shutdown & RCV_SHUTDOWN ||
232321 smc_cdc_rxed_any_close_or_senddone (conn ) ||
@@ -281,14 +370,18 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
281370 continue ;
282371 }
283372
284- /* not more than what user space asked for */
285- copylen = min_t (size_t , read_remaining , readable );
286373 smc_curs_write (& cons ,
287374 smc_curs_read (& conn -> local_tx_ctrl .cons , conn ),
288375 conn );
289376 /* subsequent splice() calls pick up where previous left */
290377 if (splbytes )
291378 smc_curs_add (conn -> rmb_desc -> len , & cons , splbytes );
379+ if (conn -> urg_state == SMC_URG_VALID &&
380+ sock_flag (& smc -> sk , SOCK_URGINLINE ) &&
381+ readable > 1 )
382+ readable -- ; /* always stop at urgent Byte */
383+ /* not more than what user space asked for */
384+ copylen = min_t (size_t , read_remaining , readable );
292385 /* determine chunks where to read from rcvbuf */
293386 /* either unwrapped case, or 1st chunk of wrapped case */
294387 chunk_len = min_t (size_t , copylen , conn -> rmb_desc -> len -
@@ -333,8 +426,8 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
333426 atomic_sub (copylen , & conn -> bytes_to_rcv );
334427 /* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
335428 smp_mb__after_atomic ();
336- if (msg )
337- smc_rx_update_consumer ( conn , cons , copylen ) ;
429+ if (msg && smc_rx_update_consumer ( smc , cons , copylen ) )
430+ goto out ;
338431 }
339432 } while (read_remaining );
340433out :
@@ -346,4 +439,5 @@ void smc_rx_init(struct smc_sock *smc)
346439{
347440 smc -> sk .sk_data_ready = smc_rx_wake_up ;
348441 atomic_set (& smc -> conn .splice_pending , 0 );
442+ smc -> conn .urg_state = SMC_URG_READ ;
349443}
0 commit comments