Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 29 additions & 184 deletions lib/std/child_process.zig
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,19 @@ pub const ChildProcess = struct {
stderr: []u8,
};

fn fifoToOwnedArrayList(fifo: *std.io.PollFifo) std.ArrayList(u8) {
if (fifo.head > 0) {
std.mem.copy(u8, fifo.buf[0..fifo.count], fifo.buf[fifo.head .. fifo.head + fifo.count]);
}
const result = std.ArrayList(u8){
.items = fifo.buf[0..fifo.count],
.capacity = fifo.buf.len,
.allocator = fifo.allocator,
};
fifo.* = std.io.PollFifo.init(fifo.allocator);
return result;
}

/// Collect the output from the process's stdout and stderr. Will return once all output
/// has been collected. This does not mean that the process has ended. `wait` should still
/// be called to wait for and clean up the process.
Expand All @@ -210,195 +223,27 @@ pub const ChildProcess = struct {
) !void {
debug.assert(child.stdout_behavior == .Pipe);
debug.assert(child.stderr_behavior == .Pipe);
if (builtin.os.tag == .haiku) {
const stdout_in = child.stdout.?.reader();
const stderr_in = child.stderr.?.reader();

try stdout_in.readAllArrayList(stdout, max_output_bytes);
try stderr_in.readAllArrayList(stderr, max_output_bytes);
} else if (builtin.os.tag == .windows) {
try collectOutputWindows(child, stdout, stderr, max_output_bytes);
} else {
try collectOutputPosix(child, stdout, stderr, max_output_bytes);
}
}

fn collectOutputPosix(
child: ChildProcess,
stdout: *std.ArrayList(u8),
stderr: *std.ArrayList(u8),
max_output_bytes: usize,
) !void {
var poll_fds = [_]os.pollfd{
.{ .fd = child.stdout.?.handle, .events = os.POLL.IN, .revents = undefined },
.{ .fd = child.stderr.?.handle, .events = os.POLL.IN, .revents = undefined },
};

var dead_fds: usize = 0;
// We ask for ensureTotalCapacity with this much extra space. This has more of an
// effect on small reads because once the reads start to get larger the amount
// of space an ArrayList will allocate grows exponentially.
const bump_amt = 512;

const err_mask = os.POLL.ERR | os.POLL.NVAL | os.POLL.HUP;

while (dead_fds < poll_fds.len) {
const events = try os.poll(&poll_fds, std.math.maxInt(i32));
if (events == 0) continue;

var remove_stdout = false;
var remove_stderr = false;
// Try reading whatever is available before checking the error
// conditions.
// It's still possible to read after a POLL.HUP is received, always
// check if there's some data waiting to be read first.
if (poll_fds[0].revents & os.POLL.IN != 0) {
// stdout is ready.
const new_capacity = std.math.min(stdout.items.len + bump_amt, max_output_bytes);
try stdout.ensureTotalCapacity(new_capacity);
const buf = stdout.unusedCapacitySlice();
if (buf.len == 0) return error.StdoutStreamTooLong;
const nread = try os.read(poll_fds[0].fd, buf);
stdout.items.len += nread;

// Remove the fd when the EOF condition is met.
remove_stdout = nread == 0;
} else {
remove_stdout = poll_fds[0].revents & err_mask != 0;
}

if (poll_fds[1].revents & os.POLL.IN != 0) {
// stderr is ready.
const new_capacity = std.math.min(stderr.items.len + bump_amt, max_output_bytes);
try stderr.ensureTotalCapacity(new_capacity);
const buf = stderr.unusedCapacitySlice();
if (buf.len == 0) return error.StderrStreamTooLong;
const nread = try os.read(poll_fds[1].fd, buf);
stderr.items.len += nread;

// Remove the fd when the EOF condition is met.
remove_stderr = nread == 0;
} else {
remove_stderr = poll_fds[1].revents & err_mask != 0;
}

// Exclude the fds that signaled an error.
if (remove_stdout) {
poll_fds[0].fd = -1;
dead_fds += 1;
}
if (remove_stderr) {
poll_fds[1].fd = -1;
dead_fds += 1;
}
}
}
// we could make this work with multiple allocators but YAGNI
if (stdout.allocator.ptr != stderr.allocator.ptr or
stdout.allocator.vtable != stderr.allocator.vtable)
@panic("ChildProcess.collectOutput only supports 1 allocator");

const WindowsAsyncReadResult = enum {
pending,
closed,
full,
};

fn windowsAsyncRead(
handle: windows.HANDLE,
overlapped: *windows.OVERLAPPED,
buf: *std.ArrayList(u8),
bump_amt: usize,
max_output_bytes: usize,
) !WindowsAsyncReadResult {
while (true) {
const new_capacity = std.math.min(buf.items.len + bump_amt, max_output_bytes);
try buf.ensureTotalCapacity(new_capacity);
const next_buf = buf.unusedCapacitySlice();
if (next_buf.len == 0) return .full;
var read_bytes: u32 = undefined;
const read_result = windows.kernel32.ReadFile(handle, next_buf.ptr, math.cast(u32, next_buf.len) orelse maxInt(u32), &read_bytes, overlapped);
if (read_result == 0) return switch (windows.kernel32.GetLastError()) {
.IO_PENDING => .pending,
.BROKEN_PIPE => .closed,
else => |err| windows.unexpectedError(err),
};
buf.items.len += read_bytes;
}
}

fn collectOutputWindows(child: ChildProcess, stdout: *std.ArrayList(u8), stderr: *std.ArrayList(u8), max_output_bytes: usize) !void {
const bump_amt = 512;
const outs = [_]*std.ArrayList(u8){
stdout,
stderr,
};
const handles = [_]windows.HANDLE{
child.stdout.?.handle,
child.stderr.?.handle,
};

var overlapped = [_]windows.OVERLAPPED{
mem.zeroes(windows.OVERLAPPED),
mem.zeroes(windows.OVERLAPPED),
};

var wait_objects: [2]windows.HANDLE = undefined;
var wait_object_count: u2 = 0;

// we need to cancel all pending IO before returning so our OVERLAPPED values don't go out of scope
defer for (wait_objects[0..wait_object_count]) |o| {
_ = windows.kernel32.CancelIo(o);
};
var poller = std.io.poll(stdout.allocator, enum { stdout, stderr }, .{
.stdout = child.stdout.?,
.stderr = child.stderr.?,
});
defer poller.deinit();

// Windows Async IO requires an initial call to ReadFile before waiting on the handle
for ([_]u1{ 0, 1 }) |i| {
switch (try windowsAsyncRead(handles[i], &overlapped[i], outs[i], bump_amt, max_output_bytes)) {
.pending => {
wait_objects[wait_object_count] = handles[i];
wait_object_count += 1;
},
.closed => {}, // don't add to the wait_objects list
.full => return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong,
}
while (try poller.poll()) {
if (poller.fifo(.stdout).count > max_output_bytes)
return error.StdoutStreamTooLong;
if (poller.fifo(.stderr).count > max_output_bytes)
return error.StderrStreamTooLong;
}

while (wait_object_count > 0) {
const status = windows.kernel32.WaitForMultipleObjects(wait_object_count, &wait_objects, 0, windows.INFINITE);
if (status == windows.WAIT_FAILED) {
switch (windows.kernel32.GetLastError()) {
else => |err| return windows.unexpectedError(err),
}
}
if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + wait_object_count - 1)
unreachable;

const wait_idx = status - windows.WAIT_OBJECT_0;

// this extra `i` index is needed to map the wait handle back to the stdout or stderr
// values since the wait_idx can change which handle it corresponds with
const i: u1 = if (wait_objects[wait_idx] == handles[0]) 0 else 1;

// remove completed event from the wait list
wait_object_count -= 1;
if (wait_idx == 0)
wait_objects[0] = wait_objects[1];

var read_bytes: u32 = undefined;
if (windows.kernel32.GetOverlappedResult(handles[i], &overlapped[i], &read_bytes, 0) == 0) {
switch (windows.kernel32.GetLastError()) {
.BROKEN_PIPE => continue,
else => |err| return windows.unexpectedError(err),
}
}

outs[i].items.len += read_bytes;

switch (try windowsAsyncRead(handles[i], &overlapped[i], outs[i], bump_amt, max_output_bytes)) {
.pending => {
wait_objects[wait_object_count] = handles[i];
wait_object_count += 1;
},
.closed => {}, // don't add to the wait_objects list
.full => return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong,
}
}
stdout.* = fifoToOwnedArrayList(poller.fifo(.stdout));
stderr.* = fifoToOwnedArrayList(poller.fifo(.stderr));
}

/// Spawns a child process, waits for it, collecting stdout and stderr, and then returns.
Expand Down
1 change: 1 addition & 0 deletions lib/std/heap/general_purpose_allocator.zig
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ pub fn GeneralPurposeAllocator(comptime config: Config) type {
}
} else struct {};

/// Returns true if there were leaks; false otherwise.
pub fn deinit(self: *Self) bool {
const leaks = if (config.safety) self.detectLeaks() else false;
if (config.retain_metadata) {
Expand Down
Loading