1+ <?php
2+
3+ namespace EProcess \Stream ;
4+
5+ use React \Stream \Stream ;
6+ use React \EventLoop \LoopInterface ;
7+ use React \Stream \Buffer ;
8+
9+ class FullDrainStream extends Stream
10+ {
11+
12+ public function __construct ($ stream , LoopInterface $ loop )
13+ {
14+ $ this ->stream = $ stream ;
15+ if (!is_resource ($ this ->stream ) || get_resource_type ($ this ->stream ) !== "stream " ) {
16+ throw new \InvalidArgumentException ('First parameter must be a valid stream resource ' );
17+ }
18+
19+ stream_set_blocking ($ this ->stream , 0 );
20+
21+ // Use unbuffered read operations on the underlying stream resource.
22+ // Reading chunks from the stream may otherwise leave unread bytes in
23+ // PHP's stream buffers which some event loop implementations do not
24+ // trigger events on (edge triggered).
25+ // This does not affect the default event loop implementation (level
26+ // triggered), so we can ignore platforms not supporting this (HHVM).
27+ if (function_exists ('stream_set_read_buffer ' )) {
28+ stream_set_read_buffer ($ this ->stream , 0 );
29+ }
30+
31+ $ this ->loop = $ loop ;
32+ $ this ->buffer = new Buffer ($ this ->stream , $ this ->loop );
33+
34+ $ that = $ this ;
35+
36+ $ this ->buffer ->on ('error ' , function ($ error ) use ($ that ) {
37+ $ that ->emit ('error ' , array ($ error , $ that ));
38+ $ that ->close ();
39+ });
40+
41+ $ this ->buffer ->on ('drain ' , function () use ($ that ) {
42+ $ that ->emit ('drain ' , array ($ that ));
43+ });
44+
45+ $ this ->buffer ->on ('full-drain ' , function () use ($ that ) {
46+ $ that ->emit ('full-drain ' , array ($ that ));
47+ });
48+
49+ $ this ->resume ();
50+ }
51+ }
0 commit comments