diff --git a/ext/standard/proc_open.c b/ext/standard/proc_open.c index 501d2c3f351bb..60ab630c4e672 100644 --- a/ext/standard/proc_open.c +++ b/ext/standard/proc_open.c @@ -420,7 +420,7 @@ SECURITY_ATTRIBUTES php_proc_open_security = { .bInheritHandle = TRUE }; -# define pipe(pair) (CreatePipe(&pair[0], &pair[1], &php_proc_open_security, 0) ? 0 : -1) +# define pipe(pair) (CreatePipe(&pair[0], &pair[1], &php_proc_open_security, 8192) ? 0 : -1) # define COMSPEC_NT "cmd.exe" @@ -943,6 +943,268 @@ static int set_proc_descriptor_from_resource(zval *resource, descriptorspec_item return SUCCESS; } +#ifdef PHP_WIN32 + +#define THREADED_PIPE_BUF_SIZE 4096 + +typedef struct _threaded_pipe { + HANDLE pipe; /* Parent end of a process pipe. */ + HANDLE thread; /* Thread being used to handle IO. */ + php_socket_t sock; /* Internal end of the proxy socket pair. */ + php_stream *stream; /* Wraps the exposed end of the socket pair. */ + const char *mode; /* Pipe open mode. */ + int closed; /* Set to 1 on stream close. */ + int wsa_error; /* Last socket error code. */ + DWORD error; /* Last error code. */ +} threaded_pipe; + +static int threaded_error_check(threaded_pipe *data, const char *format) +{ + if (data->error) { + char *err = php_win32_error_to_msg(data->error); + php_error_docref(NULL, E_WARNING, format, (int) data->error, err); + php_win32_error_msg_free(err); + return 1; + } + + if (data->wsa_error) { + zend_string *err = php_socket_error_str(data->wsa_error); + php_error_docref(NULL, E_WARNING, format, data->wsa_error, ZSTR_VAL(err)); + zend_string_release(err); + return 1; + } + + return 0; +} + +static ssize_t threaded_pipe_write(php_stream *stream, const char *buf, size_t count) +{ + threaded_pipe *data = (threaded_pipe *) stream->abstract; + + if (threaded_error_check(data, "Pipe write failed: [%d] %s")) { + return -1; + } + + return php_stream_write(data->stream, buf, count); +} + +static ssize_t threaded_pipe_read(php_stream *stream, char *buf, size_t count) +{ + threaded_pipe *data = (threaded_pipe *) stream->abstract; + + if (threaded_error_check(data, "Pipe read failed: [%d] %s")) { + return -1; + } + + ssize_t len = php_stream_read(data->stream, buf, count); + + if (len == 0 && threaded_error_check(data, "Pipe read failed: [%d] %s")) { + return -1; + } + + return len; +} + +static int threaded_pipe_close(php_stream *stream, int close_handle) +{ + threaded_pipe *data = (threaded_pipe *) stream->abstract; + + data->closed = 1; + + /* Writer thread will terminate when socket is closed. */ + if (data->mode[0] == 'w') { + php_stream_close(data->stream); + data->stream = NULL; + + /* Thread will write remaining buffer contents to pipe and exit. */ + if (WAIT_TIMEOUT != WaitForSingleObject(data->thread, 1000)) { + CloseHandle(data->thread); + return 0; + } + } + + /* Interrupt IO calls in thread and wait with timeout to prevent race conditions. */ + do { + CancelSynchronousIo(data->thread); + } while (WAIT_TIMEOUT == WaitForSingleObject(data->thread, 5)); + + if (data->stream) { + php_stream_close(data->stream); + } + + /* Dispose of terminated thread. */ + CloseHandle(data->thread); + + return 0; +} + +static int threaded_pipe_flush(php_stream *stream) +{ + return php_stream_flush(((threaded_pipe *) stream->abstract)->stream); +} + +static int threaded_pipe_cast(php_stream *stream, int castas, void **ret) +{ + return php_stream_cast(((threaded_pipe *) stream->abstract)->stream, castas, ret, 1); +} + +static int threaded_pipe_stat(php_stream *stream, php_stream_statbuf *ssb) +{ + return php_stream_stat(((threaded_pipe *) stream->abstract)->stream, ssb); +} + +static int threaded_pipe_set_option(php_stream *stream, int option, int value, void *ptrparam) +{ + return php_stream_set_option(((threaded_pipe *) stream->abstract)->stream, option, value, ptrparam); +} + +/* Threaded pipe stream handlers. */ +static php_stream_ops threaded_pipe_ops = { + threaded_pipe_write, + threaded_pipe_read, + threaded_pipe_close, + threaded_pipe_flush, + "STDIO", + NULL, + threaded_pipe_cast, + threaded_pipe_stat, + threaded_pipe_set_option +}; + +/* Reads data from a process pipe and forwards it into a socket. */ +static DWORD WINAPI pipe_reader_thread(LPVOID arg) +{ + threaded_pipe *data = (threaded_pipe *) arg; + + DWORD a, b; + char buf[THREADED_PIPE_BUF_SIZE]; + + WSABUF wsabuf = { + .buf = buf + }; + + /* Loop runs until closed flag is set. */ + while (!data->closed) { + if (!ReadFile(data->pipe, buf, THREADED_PIPE_BUF_SIZE, &a, NULL)) { + /* Save error code but do not treat EOF as an error. */ + if (ERROR_BROKEN_PIPE == (data->error = GetLastError())) { + data->error = 0; + } + break; + } + + wsabuf.len = a; + + if (a < 1 || WSASend(data->sock, &wsabuf, 1, &b, 0, NULL, NULL)) { + data->wsa_error = WSAGetLastError(); + break; + } + } + + CloseHandle(data->pipe); + closesocket(data->sock); + + return 0; +} + +/* Reads data from a socket and forwards it into a process pipe. */ +static DWORD WINAPI pipe_writer_thread(LPVOID arg) +{ + threaded_pipe *data = (threaded_pipe *) arg; + + DWORD flags, a, b; + char buf[THREADED_PIPE_BUF_SIZE]; + + WSABUF wsabuf = { + .len = THREADED_PIPE_BUF_SIZE, + .buf = buf + }; + + /* Loop runs until exposed part of the socket pair is closed. */ + while (1) { + if (WSARecv(data->sock, &wsabuf, 1, &a, &flags, NULL, NULL)) { + data->wsa_error = WSAGetLastError(); + + /* Do not treat EOF as an error. */ + switch (data->wsa_error) { + case WSAECONNRESET: + case WSAECONNABORTED: + data->wsa_error = 0; + } + break; + } + + if (a < 1 || !WriteFile(data->pipe, buf, a, &b, NULL)) { + data->error = GetLastError(); + break; + } + } + + CloseHandle(data->pipe); + closesocket(data->sock); + + return 0; +} + +/* Create a new threaded pipe wrapping the given process pipe. */ +static php_stream *create_threaded_pipe(HANDLE pipe, const char *mode) +{ + php_socket_t socks[2]; + + /* Create socket pair that proxies data to userland PHP. */ + if (socketpair_win32(AF_INET, SOCK_STREAM, 0, socks, 0)) { + zend_string *err = php_socket_error_str(php_socket_errno()); + php_error_docref(NULL, E_WARNING, "Unable to create socket pair: %s", ZSTR_VAL(err)); + zend_string_release(err); + return NULL; + } + + threaded_pipe *data = emalloc(sizeof(threaded_pipe)); + + data->closed = 0; + data->error = 0; + data->wsa_error = 0; + + data->mode = mode; + data->pipe = pipe; + data->sock = socks[1]; + + /* Expose one end of the socket pair to userland. */ + if (NULL == (data->stream = php_stream_sock_open_from_socket(socks[0], NULL))) { + closesocket(socks[0]); + closesocket(socks[1]); + efree(data); + return NULL; + } + + /* Create thread based on pipe mode. */ + LPTHREAD_START_ROUTINE tf = pipe_reader_thread; + + if (mode[0] == 'w') { + tf = pipe_writer_thread; + + /* Restrict receive buffer size to avoid excessive data buffering. */ + int bsize = THREADED_PIPE_BUF_SIZE; + setsockopt(socks[1], SOL_SOCKET, SO_RCVBUF, (char *) &bsize, sizeof(int)); + } + + data->thread = CreateThread(&php_proc_open_security, 0xFFFF, tf, data, 0, NULL); + + if (data->thread == NULL) { + DWORD dw = GetLastError(); + closesocket(socks[0]); + closesocket(socks[1]); + php_stream_close(data->stream); + php_error_docref(NULL, E_WARNING, "Failed to spawn pipe thread: %u", dw); + efree(data); + return NULL; + } + + return php_stream_alloc_rel(&threaded_pipe_ops, data, NULL, mode); +} + +#endif + #ifndef PHP_WIN32 static int close_parentends_of_pipes(descriptorspec_item *descriptors, int ndesc) { @@ -1018,6 +1280,7 @@ PHP_FUNCTION(proc_open) int suppress_errors = 0; int bypass_shell = 0; int blocking_pipes = 0; + int threaded_pipes = 0; int create_process_group = 0; int create_new_console = 0; #else @@ -1070,6 +1333,7 @@ PHP_FUNCTION(proc_open) /* TODO: Deprecate in favor of array command? */ bypass_shell = bypass_shell || get_option(other_options, "bypass_shell"); blocking_pipes = get_option(other_options, "blocking_pipes"); + threaded_pipes = get_option(other_options, "threaded_pipes"); create_process_group = get_option(other_options, "create_process_group"); create_new_console = get_option(other_options, "create_new_console"); } @@ -1270,9 +1534,13 @@ PHP_FUNCTION(proc_open) } #ifdef PHP_WIN32 - stream = php_stream_fopen_from_fd(_open_osfhandle((zend_intptr_t)descriptors[i].parentend, - descriptors[i].mode_flags), mode_string, NULL); - php_stream_set_option(stream, PHP_STREAM_OPTION_PIPE_BLOCKING, blocking_pipes, NULL); + if (threaded_pipes) { + stream = create_threaded_pipe(descriptors[i].parentend, mode_string); + } else { + stream = php_stream_fopen_from_fd(_open_osfhandle((zend_intptr_t)descriptors[i].parentend, + descriptors[i].mode_flags), mode_string, NULL); + php_stream_set_option(stream, PHP_STREAM_OPTION_PIPE_BLOCKING, blocking_pipes, NULL); + } #else stream = php_stream_fopen_from_fd(descriptors[i].parentend, mode_string, NULL); #endif diff --git a/ext/standard/tests/general_functions/proc_open_sockets2.phpt b/ext/standard/tests/general_functions/proc_open_sockets2.phpt index 25f3153ec4874..8d4209a8728f4 100644 --- a/ext/standard/tests/general_functions/proc_open_sockets2.phpt +++ b/ext/standard/tests/general_functions/proc_open_sockets2.phpt @@ -56,7 +56,10 @@ printf("STDOUT << %s\n", read_pipe($pipes[1])); write_pipe($pipes[0], 'done'); fclose($pipes[0]); +var_dump(feof($pipes[1])); printf("STDOUT << %s\n", read_pipe($pipes[1])); +var_dump(read_pipe($pipes[1])); +var_dump(feof($pipes[1])); ?> --EXPECTF-- @@ -64,4 +67,7 @@ bool(true) bool(true) STDOUT << hello STDOUT << world +bool(false) STDOUT << DONE +string(0) "" +bool(true) diff --git a/ext/standard/tests/general_functions/proc_open_sockets3.phpt b/ext/standard/tests/general_functions/proc_open_sockets3.phpt index 5ee9e53b56b47..f2f69f709795b 100644 --- a/ext/standard/tests/general_functions/proc_open_sockets3.phpt +++ b/ext/standard/tests/general_functions/proc_open_sockets3.phpt @@ -45,11 +45,17 @@ printf("STDOUT << %s\n", read_pipe($pipes[1])); fwrite($pipes[0], 'done'); fclose($pipes[0]); +var_dump(feof($pipes[1])); printf("STDOUT << %s\n", read_pipe($pipes[1])); +var_dump(read_pipe($pipes[1])); +var_dump(feof($pipes[1])); ?> --EXPECTF-- bool(true) STDOUT << hello STDOUT << world +bool(false) STDOUT << DONE +string(0) "" +bool(true) diff --git a/ext/standard/tests/general_functions/proc_open_sockets4.phpt b/ext/standard/tests/general_functions/proc_open_sockets4.phpt new file mode 100644 index 0000000000000..53d6ad77b975c --- /dev/null +++ b/ext/standard/tests/general_functions/proc_open_sockets4.phpt @@ -0,0 +1,75 @@ +--TEST-- +proc_open() with non-blocking pipes (threaded on Windows) +--FILE-- + true +]); + +foreach ($pipes as $pipe) { + var_dump(stream_set_blocking($pipe, false)); +} + +printf("STDOUT << %s\n", read_pipe($pipes[1])); +printf("STDOUT << %s\n", read_pipe($pipes[1])); + +write_pipe($pipes[0], 'done'); +fclose($pipes[0]); + +var_dump(feof($pipes[1])); +printf("STDOUT << %s\n", read_pipe($pipes[1])); +var_dump(read_pipe($pipes[1])); +var_dump(feof($pipes[1])); + +?> +--EXPECTF-- +bool(true) +bool(true) +STDOUT << hello +STDOUT << world +bool(false) +STDOUT << DONE +string(0) "" +bool(true) diff --git a/ext/standard/tests/general_functions/proc_open_sockets5.inc b/ext/standard/tests/general_functions/proc_open_sockets5.inc new file mode 100644 index 0000000000000..ea8dd18cca28e --- /dev/null +++ b/ext/standard/tests/general_functions/proc_open_sockets5.inc @@ -0,0 +1,9 @@ + true + ]); + + $hash = hash_init('md5'); + $chunk = str_repeat('A', $size); + + for ($i = 0; $i < $count; $i++) { + hash_update($hash, $chunk); + fwrite($pipes[0], $chunk); + } + + fclose($pipes[0]); + + $a = trim(stream_get_contents($pipes[1])); + $b = hash_final($hash); + + if ($a !== $b) { + throw new Error(sprintf('Test failed with chunk size %d and %d chunks', $size, $count)); + } +} + +echo "DONE\n"; + +?> +--EXPECTF-- +START +DONE diff --git a/ext/standard/tests/general_functions/proc_open_threaded5.phpt b/ext/standard/tests/general_functions/proc_open_threaded5.phpt new file mode 100644 index 0000000000000..5e45fad08b0c1 --- /dev/null +++ b/ext/standard/tests/general_functions/proc_open_threaded5.phpt @@ -0,0 +1,59 @@ +--TEST-- +proc_open() with pipes (threaded on Windows) +--FILE-- + true + ]); + + $hash = hash_init('md5'); + $chunk = str_repeat('A', $size); + + for ($i = 0; $i < $count; $i++) { + hash_update($hash, $chunk); + fwrite($pipes[0], $chunk); + } + + fclose($pipes[0]); + + $a = trim(stream_get_contents($pipes[1])); + $b = hash_final($hash); + + if ($a !== $b) { + throw new Error(sprintf('Test failed with chunk size %d and %d chunks', $size, $count)); + } +} + +echo "DONE\n"; + +?> +--EXPECTF-- +START +DONE