@@ -25,6 +25,7 @@ use rt::io::{standard_error, OtherIoError};
2525use  rt:: tube:: Tube ; 
2626use  rt:: local:: Local ; 
2727use  unstable:: sync:: { Exclusive ,  exclusive} ; 
28+ use  rt:: uv:: net:: uv_ip4_to_ip4; 
2829
2930#[ cfg( test) ]   use  container:: Container ; 
3031#[ cfg( test) ]   use  uint; 
@@ -260,6 +261,24 @@ impl IoFactory for UvIoFactory {
260261            } 
261262        } 
262263    } 
264+ 
265+     fn  udp_bind ( & mut  self ,  addr :  IpAddr )  -> Result < ~RtioUdpSocketObject ,  IoError >  { 
266+         let  mut  watcher = UdpWatcher :: new ( self . uv_loop ( ) ) ; 
267+         match  watcher. bind ( addr)  { 
268+             Ok ( _)  => Ok ( ~UvUdpSocket  {  watcher :  watcher } ) , 
269+             Err ( uverr)  => { 
270+                 let  scheduler = Local :: take :: < Scheduler > ( ) ; 
271+                 do scheduler. deschedule_running_task_and_then  |_,  task| { 
272+                     let  task_cell = Cell :: new ( task) ; 
273+                     do watcher. close   { 
274+                         let  scheduler = Local :: take :: < Scheduler > ( ) ; 
275+                         scheduler. resume_task_immediately ( task_cell. take ( ) ) ; 
276+                     } 
277+                 } 
278+                 Err ( uv_error_to_io_error ( uverr) ) 
279+             } 
280+         } 
281+     } 
263282} 
264283
265284// FIXME #6090: Prefer newtype structs but Drop doesn't work 
@@ -358,7 +377,7 @@ impl Drop for UvTcpStream {
358377} 
359378
360379impl  RtioTcpStream  for  UvTcpStream  { 
361-     fn  read ( & mut   self ,  buf :  & mut  [ u8 ] )  -> Result < uint ,  IoError >  { 
380+     fn  read ( & self ,  buf :  & mut  [ u8 ] )  -> Result < uint ,  IoError >  { 
362381        let  result_cell = Cell :: new_empty ( ) ; 
363382        let  result_cell_ptr:  * Cell < Result < uint ,  IoError > >  = & result_cell; 
364383
@@ -403,7 +422,7 @@ impl RtioTcpStream for UvTcpStream {
403422        return  result_cell. take ( ) ; 
404423    } 
405424
406-     fn  write ( & mut   self ,  buf :  & [ u8 ] )  -> Result < ( ) ,  IoError >  { 
425+     fn  write ( & self ,  buf :  & [ u8 ] )  -> Result < ( ) ,  IoError >  { 
407426        let  result_cell = Cell :: new_empty ( ) ; 
408427        let  result_cell_ptr:  * Cell < Result < ( ) ,  IoError > >  = & result_cell; 
409428        let  scheduler = Local :: take :: < Scheduler > ( ) ; 
@@ -433,23 +452,21 @@ impl RtioTcpStream for UvTcpStream {
433452    } 
434453} 
435454
436- pub  struct  UvUdpStream  { 
437-     watcher :  UdpWatcher , 
438-     address :  IpAddr 
455+ pub  struct  UvUdpSocket  { 
456+     watcher :  UdpWatcher 
439457} 
440458
441- impl  UvUdpStream  { 
459+ impl  UvUdpSocket  { 
442460    fn  watcher ( & self )  -> UdpWatcher  {  self . watcher  } 
443-     fn  address ( & self )  -> IpAddr  {  self . address  } 
444461} 
445462
446- impl  Drop  for  UvUdpStream  { 
463+ impl  Drop  for  UvUdpSocket  { 
447464    fn  finalize ( & self )  { 
448-         rtdebug ! ( "closing udp stream " ) ; 
465+         rtdebug ! ( "closing udp socket " ) ; 
449466        let  watcher = self . watcher ( ) ; 
450467        let  scheduler = Local :: take :: < Scheduler > ( ) ; 
451468        do scheduler. deschedule_running_task_and_then  |_,  task| { 
452-             let  task_cell = Cell ( task) ; 
469+             let  task_cell = Cell :: new ( task) ; 
453470            do watcher. close   { 
454471                let  scheduler = Local :: take :: < Scheduler > ( ) ; 
455472                scheduler. resume_task_immediately ( task_cell. take ( ) ) ; 
@@ -458,40 +475,31 @@ impl Drop for UvUdpStream {
458475    } 
459476} 
460477
461- impl  RtioUdpStream  for  UvUdpStream  { 
462-     fn  read ( & mut   self ,  buf :  & mut  [ u8 ] )  -> Result < uint ,  IoError >  { 
463-         let  result_cell = empty_cell ( ) ; 
464-         let  result_cell_ptr:  * Cell < Result < uint ,  IoError > >  = & result_cell; 
478+ impl  RtioUdpSocket  for  UvUdpSocket  { 
479+     fn  recvfrom ( & self ,  buf :  & mut  [ u8 ] )  -> Result < ( uint ,   IpAddr ) ,  IoError >  { 
480+         let  result_cell = Cell :: new_empty ( ) ; 
481+         let  result_cell_ptr:  * Cell < Result < ( uint ,   IpAddr ) ,  IoError > >  = & result_cell; 
465482
466483        let  scheduler = Local :: take :: < Scheduler > ( ) ; 
467484        assert ! ( scheduler. in_task_context( ) ) ; 
468485        let  watcher = self . watcher ( ) ; 
469-         let  connection_address = self . address ( ) ; 
470486        let  buf_ptr:  * & mut  [ u8 ]  = & buf; 
471487        do scheduler. deschedule_running_task_and_then  |sched,  task| { 
472-             rtdebug ! ( "read : entered scheduler context" ) ; 
488+             rtdebug ! ( "recvfrom : entered scheduler context" ) ; 
473489            assert ! ( !sched. in_task_context( ) ) ; 
474490            let  mut  watcher = watcher; 
475-             let  task_cell = Cell ( task) ; 
476-             // XXX: see note in RtioTcpStream implementation for UvTcpStream 
477-             let  alloc:  AllocCallback  = |_| unsafe  { 
478-                 slice_to_uv_buf ( * buf_ptr) 
479-             } ; 
480-             do watcher. recv_start ( alloc)  |watcher,  nread,  _buf,  addr,  flags,  status| { 
481-                 let  _ = flags;  // TODO actually use flags 
491+             let  task_cell = Cell :: new ( task) ; 
492+             let  alloc:  AllocCallback  = |_| unsafe  {  slice_to_uv_buf ( * buf_ptr)  } ; 
493+             do watcher. recv_start ( alloc)  |watcher,  nread,  buf,  addr,  flags,  status| { 
494+                 let  _ = flags;  // TODO  
495+                 let  _ = buf;  // TODO  
482496
483-                 // XXX: see note in RtioTcpStream implementation for UvTcpStream 
484497                let  mut  watcher = watcher; 
485498                watcher. recv_stop ( ) ; 
486499
487-                 let  incoming_address = net:: uv_ip4_to_ip4 ( & addr) ; 
488500                let  result = if  status. is_none ( )  { 
489501                    assert ! ( nread >= 0 ) ; 
490-                     if  incoming_address != connection_address { 
491-                         Ok ( 0 u) 
492-                     }  else  { 
493-                         Ok ( nread as  uint ) 
494-                     } 
502+                     Ok ( ( nread as  uint ,  uv_ip4_to_ip4 ( & addr) ) ) 
495503                }  else  { 
496504                    Err ( uv_error_to_io_error ( status. unwrap ( ) ) ) 
497505                } ; 
@@ -505,11 +513,37 @@ impl RtioUdpStream for UvUdpStream {
505513
506514        assert!( !result_cell. is_empty ( ) ) ; 
507515        return  result_cell. take ( ) ; 
516+ 
508517    } 
518+     fn  sendto ( & self ,  buf :  & [ u8 ] ,  dst :  IpAddr )  -> Result < ( ) ,  IoError >  { 
519+         let  result_cell = Cell :: new_empty ( ) ; 
520+         let  result_cell_ptr:  * Cell < Result < ( ) ,  IoError > >  = & result_cell; 
521+         let  scheduler = Local :: take :: < Scheduler > ( ) ; 
522+         assert ! ( scheduler. in_task_context( ) ) ; 
523+         let  watcher = self . watcher ( ) ; 
524+         let  buf_ptr:  * & [ u8 ]  = & buf; 
525+         do scheduler. deschedule_running_task_and_then  |_,  task| { 
526+             let  mut  watcher = watcher; 
527+             let  task_cell = Cell :: new ( task) ; 
528+             let  buf = unsafe  {  slice_to_uv_buf ( * buf_ptr)  } ; 
529+             do watcher. send ( buf,  dst)  |watcher,  status| { 
530+                 let  _ = watcher;  // TODO  
531+ 
532+                 let  result = if  status. is_none ( )  { 
533+                     Ok ( ( ) ) 
534+                 }  else  { 
535+                     Err ( uv_error_to_io_error ( status. unwrap ( ) ) ) 
536+                 } ; 
537+ 
538+                 unsafe  {  ( * result_cell_ptr) . put_back ( result) ;  } 
539+ 
540+                 let  scheduler = Local :: take :: < Scheduler > ( ) ; 
541+                 scheduler. resume_task_immediately ( task_cell. take ( ) ) ; 
542+             } 
543+         } 
509544
510-     fn  write ( & mut  self ,  buf :  & [ u8 ] )  -> Result < ( ) ,  IoError >  {  
511-         let  _ = buf; 
512-         fail ! ( )  
545+         assert!( !result_cell. is_empty ( ) ) ; 
546+         return  result_cell. take ( ) ; 
513547    } 
514548} 
515549
@@ -535,7 +569,7 @@ fn test_simple_tcp_server_and_client() {
535569            unsafe { 
536570                let  io = Local :: unsafe_borrow :: < IoFactoryObject > ( ) ; 
537571                let  mut  listener = ( * io) . tcp_bind ( addr) . unwrap ( ) ; 
538-                 let  mut   stream = listener. accept ( ) . unwrap ( ) ; 
572+                 let  stream = listener. accept ( ) . unwrap ( ) ; 
539573                let  mut  buf = [ 0 ,  .. 2048 ] ; 
540574                let  nread = stream. read ( buf) . unwrap ( ) ; 
541575                assert_eq ! ( nread,  8 ) ; 
@@ -549,7 +583,7 @@ fn test_simple_tcp_server_and_client() {
549583        do  spawntask_immediately { 
550584            unsafe { 
551585                let  io = Local :: unsafe_borrow :: < IoFactoryObject > ( ) ; 
552-                 let  mut   stream = ( * io) . tcp_connect ( addr) . unwrap ( ) ; 
586+                 let  stream = ( * io) . tcp_connect ( addr) . unwrap ( ) ; 
553587                stream. write ( [ 0 ,  1 ,  2 ,  3 ,  4 ,  5 ,  6 ,  7 ] ) ; 
554588            } 
555589        } 
@@ -564,7 +598,7 @@ fn test_read_and_block() {
564598        do spawntask_immediately { 
565599            let io = unsafe  {  Local :: unsafe_borrow :: < IoFactoryObject > ( )  } ; 
566600            let  mut  listener = unsafe  {  ( * io) . tcp_bind ( addr) . unwrap ( )  } ; 
567-             let  mut   stream = listener. accept ( ) . unwrap ( ) ; 
601+             let  stream = listener. accept ( ) . unwrap ( ) ; 
568602            let  mut  buf = [ 0 ,  .. 2048 ] ; 
569603
570604            let  expected = 32 ; 
@@ -597,7 +631,7 @@ fn test_read_and_block() {
597631        do  spawntask_immediately { 
598632            unsafe { 
599633                let  io = Local :: unsafe_borrow :: < IoFactoryObject > ( ) ; 
600-                 let  mut   stream = ( * io) . tcp_connect ( addr) . unwrap ( ) ; 
634+                 let  stream = ( * io) . tcp_connect ( addr) . unwrap ( ) ; 
601635                stream. write ( [ 0 ,  1 ,  2 ,  3 ,  4 ,  5 ,  6 ,  7 ] ) ; 
602636                stream. write ( [ 0 ,  1 ,  2 ,  3 ,  4 ,  5 ,  6 ,  7 ] ) ; 
603637                stream. write ( [ 0 ,  1 ,  2 ,  3 ,  4 ,  5 ,  6 ,  7 ] ) ; 
@@ -618,7 +652,7 @@ fn test_read_read_read() {
618652            unsafe { 
619653                let  io = Local :: unsafe_borrow :: < IoFactoryObject > ( ) ; 
620654                let  mut  listener = ( * io) . tcp_bind ( addr) . unwrap ( ) ; 
621-                 let  mut   stream = listener. accept ( ) . unwrap ( ) ; 
655+                 let  stream = listener. accept ( ) . unwrap ( ) ; 
622656                let  buf = [ 1 ,  .. 2048 ] ; 
623657                let  mut  total_bytes_written = 0 ; 
624658                while  total_bytes_written < MAX  { 
@@ -631,7 +665,7 @@ fn test_read_read_read() {
631665        do  spawntask_immediately { 
632666            unsafe { 
633667                let  io = Local :: unsafe_borrow :: < IoFactoryObject > ( ) ; 
634-                 let  mut   stream = ( * io) . tcp_connect ( addr) . unwrap ( ) ; 
668+                 let  stream = ( * io) . tcp_connect ( addr) . unwrap ( ) ; 
635669                let  mut  buf = [ 0 ,  .. 2048 ] ; 
636670                let  mut  total_bytes_read = 0 ; 
637671                while  total_bytes_read < MAX  { 
0 commit comments