8484//! the test suite passing (the suite is in libstd), and that's good enough for
8585//! me!
8686
87- use std:: c_str:: CString ;
8887use libc;
88+ use std:: c_str:: CString ;
89+ use std:: intrinsics;
90+ use std:: io;
8991use std:: os:: win32:: as_utf16_p;
92+ use std:: os;
9093use std:: ptr;
9194use std:: rt:: rtio;
9295use std:: sync:: arc:: UnsafeArc ;
93- use std:: intrinsics;
96+ use std:: sync:: atomics;
97+ use std:: unstable:: mutex;
9498
9599use super :: IoResult ;
96100use super :: c;
@@ -124,6 +128,20 @@ impl Drop for Event {
124128
125129struct Inner {
126130 handle : libc:: HANDLE ,
131+ lock : mutex:: NativeMutex ,
132+ read_closed : atomics:: AtomicBool ,
133+ write_closed : atomics:: AtomicBool ,
134+ }
135+
136+ impl Inner {
137+ fn new ( handle : libc:: HANDLE ) -> Inner {
138+ Inner {
139+ handle : handle,
140+ lock : unsafe { mutex:: NativeMutex :: new ( ) } ,
141+ read_closed : atomics:: AtomicBool :: new ( false ) ,
142+ write_closed : atomics:: AtomicBool :: new ( false ) ,
143+ }
144+ }
127145}
128146
129147impl Drop for Inner {
@@ -218,7 +236,7 @@ impl UnixStream {
218236 loop {
219237 match UnixStream :: try_connect ( p) {
220238 Some ( handle) => {
221- let inner = Inner { handle : handle } ;
239+ let inner = Inner :: new ( handle) ;
222240 let mut mode = libc:: PIPE_TYPE_BYTE |
223241 libc:: PIPE_READMODE_BYTE |
224242 libc:: PIPE_WAIT ;
@@ -275,6 +293,24 @@ impl UnixStream {
275293 }
276294
277295 fn handle ( & self ) -> libc:: HANDLE { unsafe { ( * self . inner . get ( ) ) . handle } }
296+
297+ fn read_closed ( & self ) -> bool {
298+ unsafe { ( * self . inner . get ( ) ) . read_closed . load ( atomics:: SeqCst ) }
299+ }
300+
301+ fn write_closed ( & self ) -> bool {
302+ unsafe { ( * self . inner . get ( ) ) . write_closed . load ( atomics:: SeqCst ) }
303+ }
304+
305+ fn cancel_io ( & self ) -> IoResult < ( ) > {
306+ match unsafe { c:: CancelIoEx ( self . handle ( ) , ptr:: mut_null ( ) ) } {
307+ 0 if os:: errno ( ) == libc:: ERROR_NOT_FOUND as uint => {
308+ Ok ( ( ) )
309+ }
310+ 0 => Err ( super :: last_error ( ) ) ,
311+ _ => Ok ( ( ) )
312+ }
313+ }
278314}
279315
280316impl rtio:: RtioPipe for UnixStream {
@@ -287,31 +323,60 @@ impl rtio::RtioPipe for UnixStream {
287323 let mut overlapped: libc:: OVERLAPPED = unsafe { intrinsics:: init ( ) } ;
288324 overlapped. hEvent = self . read . get_ref ( ) . handle ( ) ;
289325
326+ // Pre-flight check to see if the reading half has been closed. This
327+ // must be done before issuing the ReadFile request, but after we
328+ // acquire the lock.
329+ //
330+ // See comments in close_read() about why this lock is necessary.
331+ let guard = unsafe { ( * self . inner . get ( ) ) . lock . lock ( ) } ;
332+ if self . read_closed ( ) {
333+ return Err ( io:: standard_error ( io:: EndOfFile ) )
334+ }
335+
336+ // Issue a nonblocking requests, succeeding quickly if it happened to
337+ // succeed.
290338 let ret = unsafe {
291339 libc:: ReadFile ( self . handle ( ) ,
292340 buf. as_ptr ( ) as libc:: LPVOID ,
293341 buf. len ( ) as libc:: DWORD ,
294342 & mut bytes_read,
295343 & mut overlapped)
296344 } ;
297- if ret == 0 {
298- let err = unsafe { libc:: GetLastError ( ) } ;
299- if err == libc:: ERROR_IO_PENDING as libc:: DWORD {
300- let ret = unsafe {
301- libc:: GetOverlappedResult ( self . handle ( ) ,
302- & mut overlapped,
303- & mut bytes_read,
304- libc:: TRUE )
305- } ;
306- if ret == 0 {
307- return Err ( super :: last_error ( ) )
308- }
309- } else {
345+ if ret != 0 { return Ok ( bytes_read as uint ) }
346+
347+ // If our errno doesn't say that the I/O is pending, then we hit some
348+ // legitimate error and reeturn immediately.
349+ if os:: errno ( ) != libc:: ERROR_IO_PENDING as uint {
350+ return Err ( super :: last_error ( ) )
351+ }
352+
353+ // Now that we've issued a successful nonblocking request, we need to
354+ // wait for it to finish. This can all be done outside the lock because
355+ // we'll see any invocation of CancelIoEx. We also call this in a loop
356+ // because we're woken up if the writing half is closed, we just need to
357+ // realize that the reading half wasn't closed and we go right back to
358+ // sleep.
359+ drop ( guard) ;
360+ loop {
361+ let ret = unsafe {
362+ libc:: GetOverlappedResult ( self . handle ( ) ,
363+ & mut overlapped,
364+ & mut bytes_read,
365+ libc:: TRUE )
366+ } ;
367+ // If we succeeded, or we failed for some reason other than
368+ // CancelIoEx, return immediately
369+ if ret != 0 { return Ok ( bytes_read as uint ) }
370+ if os:: errno ( ) != libc:: ERROR_OPERATION_ABORTED as uint {
310371 return Err ( super :: last_error ( ) )
311372 }
312- }
313373
314- Ok ( bytes_read as uint )
374+ // If the reading half is now closed, then we're done. If we woke up
375+ // because the writing half was closed, keep trying.
376+ if self . read_closed ( ) {
377+ return Err ( io:: standard_error ( io:: EndOfFile ) )
378+ }
379+ }
315380 }
316381
317382 fn write ( & mut self , buf : & [ u8 ] ) -> IoResult < ( ) > {
@@ -325,27 +390,47 @@ impl rtio::RtioPipe for UnixStream {
325390
326391 while offset < buf. len ( ) {
327392 let mut bytes_written = 0 ;
393+
394+ // This sequence below is quite similar to the one found in read().
395+ // Some careful looping is done to ensure that if close_write() is
396+ // invoked we bail out early, and if close_read() is invoked we keep
397+ // going after we woke up.
398+ //
399+ // See comments in close_read() about why this lock is necessary.
400+ let guard = unsafe { ( * self . inner . get ( ) ) . lock . lock ( ) } ;
401+ if self . write_closed ( ) {
402+ return Err ( io:: standard_error ( io:: BrokenPipe ) )
403+ }
328404 let ret = unsafe {
329405 libc:: WriteFile ( self . handle ( ) ,
330406 buf. slice_from ( offset) . as_ptr ( ) as libc:: LPVOID ,
331407 ( buf. len ( ) - offset) as libc:: DWORD ,
332408 & mut bytes_written,
333409 & mut overlapped)
334410 } ;
411+ drop ( guard) ;
412+
335413 if ret == 0 {
336- let err = unsafe { libc:: GetLastError ( ) } ;
337- if err == libc:: ERROR_IO_PENDING as libc:: DWORD {
338- let ret = unsafe {
339- libc:: GetOverlappedResult ( self . handle ( ) ,
340- & mut overlapped,
341- & mut bytes_written,
342- libc:: TRUE )
343- } ;
344- if ret == 0 {
414+ if os:: errno ( ) != libc:: ERROR_IO_PENDING as uint {
415+ return Err ( super :: last_error ( ) )
416+ }
417+ let ret = unsafe {
418+ libc:: GetOverlappedResult ( self . handle ( ) ,
419+ & mut overlapped,
420+ & mut bytes_written,
421+ libc:: TRUE )
422+ } ;
423+ // If we weren't aborted, this was a legit error, if we were
424+ // aborted, then check to see if the write half was actually
425+ // closed or whether we woke up from the read half closing.
426+ if ret == 0 {
427+ if os:: errno ( ) != libc:: ERROR_OPERATION_ABORTED as uint {
345428 return Err ( super :: last_error ( ) )
346429 }
347- } else {
348- return Err ( super :: last_error ( ) )
430+ if self . write_closed ( ) {
431+ return Err ( io:: standard_error ( io:: BrokenPipe ) )
432+ }
433+ continue ; // retry
349434 }
350435 }
351436 offset += bytes_written as uint ;
@@ -360,6 +445,36 @@ impl rtio::RtioPipe for UnixStream {
360445 write : None ,
361446 } as Box < rtio:: RtioPipe : Send >
362447 }
448+
449+ fn close_read ( & mut self ) -> IoResult < ( ) > {
450+ // On windows, there's no actual shutdown() method for pipes, so we're
451+ // forced to emulate the behavior manually at the application level. To
452+ // do this, we need to both cancel any pending requests, as well as
453+ // prevent all future requests from succeeding. These two operations are
454+ // not atomic with respect to one another, so we must use a lock to do
455+ // so.
456+ //
457+ // The read() code looks like:
458+ //
459+ // 1. Make sure the pipe is still open
460+ // 2. Submit a read request
461+ // 3. Wait for the read request to finish
462+ //
463+ // The race this lock is preventing is if another thread invokes
464+ // close_read() between steps 1 and 2. By atomically executing steps 1
465+ // and 2 with a lock with respect to close_read(), we're guaranteed that
466+ // no thread will erroneously sit in a read forever.
467+ let _guard = unsafe { ( * self . inner . get ( ) ) . lock . lock ( ) } ;
468+ unsafe { ( * self . inner . get ( ) ) . read_closed . store ( true , atomics:: SeqCst ) }
469+ self . cancel_io ( )
470+ }
471+
472+ fn close_write ( & mut self ) -> IoResult < ( ) > {
473+ // see comments in close_read() for why this lock is necessary
474+ let _guard = unsafe { ( * self . inner . get ( ) ) . lock . lock ( ) } ;
475+ unsafe { ( * self . inner . get ( ) ) . write_closed . store ( true , atomics:: SeqCst ) }
476+ self . cancel_io ( )
477+ }
363478}
364479
365480////////////////////////////////////////////////////////////////////////////////
@@ -520,7 +635,7 @@ impl UnixAcceptor {
520635
521636 // Transfer ownership of our handle into this stream
522637 Ok ( UnixStream {
523- inner : UnsafeArc :: new ( Inner { handle : handle } ) ,
638+ inner : UnsafeArc :: new ( Inner :: new ( handle) ) ,
524639 read : None ,
525640 write : None ,
526641 } )
0 commit comments