diff --git a/lib/std/child_process.zig b/lib/std/child_process.zig index c3bd53b8802f..7ff48a565245 100644 --- a/lib/std/child_process.zig +++ b/lib/std/child_process.zig @@ -197,6 +197,19 @@ pub const ChildProcess = struct { stderr: []u8, }; + fn fifoToOwnedArrayList(fifo: *std.io.PollFifo) std.ArrayList(u8) { + if (fifo.head > 0) { + std.mem.copy(u8, fifo.buf[0..fifo.count], fifo.buf[fifo.head .. fifo.head + fifo.count]); + } + const result = std.ArrayList(u8){ + .items = fifo.buf[0..fifo.count], + .capacity = fifo.buf.len, + .allocator = fifo.allocator, + }; + fifo.* = std.io.PollFifo.init(fifo.allocator); + return result; + } + /// Collect the output from the process's stdout and stderr. Will return once all output /// has been collected. This does not mean that the process has ended. `wait` should still /// be called to wait for and clean up the process. @@ -210,195 +223,27 @@ pub const ChildProcess = struct { ) !void { debug.assert(child.stdout_behavior == .Pipe); debug.assert(child.stderr_behavior == .Pipe); - if (builtin.os.tag == .haiku) { - const stdout_in = child.stdout.?.reader(); - const stderr_in = child.stderr.?.reader(); - - try stdout_in.readAllArrayList(stdout, max_output_bytes); - try stderr_in.readAllArrayList(stderr, max_output_bytes); - } else if (builtin.os.tag == .windows) { - try collectOutputWindows(child, stdout, stderr, max_output_bytes); - } else { - try collectOutputPosix(child, stdout, stderr, max_output_bytes); - } - } - - fn collectOutputPosix( - child: ChildProcess, - stdout: *std.ArrayList(u8), - stderr: *std.ArrayList(u8), - max_output_bytes: usize, - ) !void { - var poll_fds = [_]os.pollfd{ - .{ .fd = child.stdout.?.handle, .events = os.POLL.IN, .revents = undefined }, - .{ .fd = child.stderr.?.handle, .events = os.POLL.IN, .revents = undefined }, - }; - - var dead_fds: usize = 0; - // We ask for ensureTotalCapacity with this much extra space. This has more of an - // effect on small reads because once the reads start to get larger the amount - // of space an ArrayList will allocate grows exponentially. - const bump_amt = 512; - - const err_mask = os.POLL.ERR | os.POLL.NVAL | os.POLL.HUP; - - while (dead_fds < poll_fds.len) { - const events = try os.poll(&poll_fds, std.math.maxInt(i32)); - if (events == 0) continue; - - var remove_stdout = false; - var remove_stderr = false; - // Try reading whatever is available before checking the error - // conditions. - // It's still possible to read after a POLL.HUP is received, always - // check if there's some data waiting to be read first. - if (poll_fds[0].revents & os.POLL.IN != 0) { - // stdout is ready. - const new_capacity = std.math.min(stdout.items.len + bump_amt, max_output_bytes); - try stdout.ensureTotalCapacity(new_capacity); - const buf = stdout.unusedCapacitySlice(); - if (buf.len == 0) return error.StdoutStreamTooLong; - const nread = try os.read(poll_fds[0].fd, buf); - stdout.items.len += nread; - - // Remove the fd when the EOF condition is met. - remove_stdout = nread == 0; - } else { - remove_stdout = poll_fds[0].revents & err_mask != 0; - } - - if (poll_fds[1].revents & os.POLL.IN != 0) { - // stderr is ready. - const new_capacity = std.math.min(stderr.items.len + bump_amt, max_output_bytes); - try stderr.ensureTotalCapacity(new_capacity); - const buf = stderr.unusedCapacitySlice(); - if (buf.len == 0) return error.StderrStreamTooLong; - const nread = try os.read(poll_fds[1].fd, buf); - stderr.items.len += nread; - - // Remove the fd when the EOF condition is met. - remove_stderr = nread == 0; - } else { - remove_stderr = poll_fds[1].revents & err_mask != 0; - } - // Exclude the fds that signaled an error. - if (remove_stdout) { - poll_fds[0].fd = -1; - dead_fds += 1; - } - if (remove_stderr) { - poll_fds[1].fd = -1; - dead_fds += 1; - } - } - } + // we could make this work with multiple allocators but YAGNI + if (stdout.allocator.ptr != stderr.allocator.ptr or + stdout.allocator.vtable != stderr.allocator.vtable) + @panic("ChildProcess.collectOutput only supports 1 allocator"); - const WindowsAsyncReadResult = enum { - pending, - closed, - full, - }; - - fn windowsAsyncRead( - handle: windows.HANDLE, - overlapped: *windows.OVERLAPPED, - buf: *std.ArrayList(u8), - bump_amt: usize, - max_output_bytes: usize, - ) !WindowsAsyncReadResult { - while (true) { - const new_capacity = std.math.min(buf.items.len + bump_amt, max_output_bytes); - try buf.ensureTotalCapacity(new_capacity); - const next_buf = buf.unusedCapacitySlice(); - if (next_buf.len == 0) return .full; - var read_bytes: u32 = undefined; - const read_result = windows.kernel32.ReadFile(handle, next_buf.ptr, math.cast(u32, next_buf.len) orelse maxInt(u32), &read_bytes, overlapped); - if (read_result == 0) return switch (windows.kernel32.GetLastError()) { - .IO_PENDING => .pending, - .BROKEN_PIPE => .closed, - else => |err| windows.unexpectedError(err), - }; - buf.items.len += read_bytes; - } - } - - fn collectOutputWindows(child: ChildProcess, stdout: *std.ArrayList(u8), stderr: *std.ArrayList(u8), max_output_bytes: usize) !void { - const bump_amt = 512; - const outs = [_]*std.ArrayList(u8){ - stdout, - stderr, - }; - const handles = [_]windows.HANDLE{ - child.stdout.?.handle, - child.stderr.?.handle, - }; - - var overlapped = [_]windows.OVERLAPPED{ - mem.zeroes(windows.OVERLAPPED), - mem.zeroes(windows.OVERLAPPED), - }; - - var wait_objects: [2]windows.HANDLE = undefined; - var wait_object_count: u2 = 0; - - // we need to cancel all pending IO before returning so our OVERLAPPED values don't go out of scope - defer for (wait_objects[0..wait_object_count]) |o| { - _ = windows.kernel32.CancelIo(o); - }; + var poller = std.io.poll(stdout.allocator, enum { stdout, stderr }, .{ + .stdout = child.stdout.?, + .stderr = child.stderr.?, + }); + defer poller.deinit(); - // Windows Async IO requires an initial call to ReadFile before waiting on the handle - for ([_]u1{ 0, 1 }) |i| { - switch (try windowsAsyncRead(handles[i], &overlapped[i], outs[i], bump_amt, max_output_bytes)) { - .pending => { - wait_objects[wait_object_count] = handles[i]; - wait_object_count += 1; - }, - .closed => {}, // don't add to the wait_objects list - .full => return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong, - } + while (try poller.poll()) { + if (poller.fifo(.stdout).count > max_output_bytes) + return error.StdoutStreamTooLong; + if (poller.fifo(.stderr).count > max_output_bytes) + return error.StderrStreamTooLong; } - while (wait_object_count > 0) { - const status = windows.kernel32.WaitForMultipleObjects(wait_object_count, &wait_objects, 0, windows.INFINITE); - if (status == windows.WAIT_FAILED) { - switch (windows.kernel32.GetLastError()) { - else => |err| return windows.unexpectedError(err), - } - } - if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + wait_object_count - 1) - unreachable; - - const wait_idx = status - windows.WAIT_OBJECT_0; - - // this extra `i` index is needed to map the wait handle back to the stdout or stderr - // values since the wait_idx can change which handle it corresponds with - const i: u1 = if (wait_objects[wait_idx] == handles[0]) 0 else 1; - - // remove completed event from the wait list - wait_object_count -= 1; - if (wait_idx == 0) - wait_objects[0] = wait_objects[1]; - - var read_bytes: u32 = undefined; - if (windows.kernel32.GetOverlappedResult(handles[i], &overlapped[i], &read_bytes, 0) == 0) { - switch (windows.kernel32.GetLastError()) { - .BROKEN_PIPE => continue, - else => |err| return windows.unexpectedError(err), - } - } - - outs[i].items.len += read_bytes; - - switch (try windowsAsyncRead(handles[i], &overlapped[i], outs[i], bump_amt, max_output_bytes)) { - .pending => { - wait_objects[wait_object_count] = handles[i]; - wait_object_count += 1; - }, - .closed => {}, // don't add to the wait_objects list - .full => return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong, - } - } + stdout.* = fifoToOwnedArrayList(poller.fifo(.stdout)); + stderr.* = fifoToOwnedArrayList(poller.fifo(.stderr)); } /// Spawns a child process, waits for it, collecting stdout and stderr, and then returns. diff --git a/lib/std/heap/general_purpose_allocator.zig b/lib/std/heap/general_purpose_allocator.zig index 452480dc7a3f..fed6eba47b47 100644 --- a/lib/std/heap/general_purpose_allocator.zig +++ b/lib/std/heap/general_purpose_allocator.zig @@ -423,6 +423,7 @@ pub fn GeneralPurposeAllocator(comptime config: Config) type { } } else struct {}; + /// Returns true if there were leaks; false otherwise. pub fn deinit(self: *Self) bool { const leaks = if (config.safety) self.detectLeaks() else false; if (config.retain_metadata) { diff --git a/lib/std/io.zig b/lib/std/io.zig index a61f2a4e0ed1..0faba2b6523e 100644 --- a/lib/std/io.zig +++ b/lib/std/io.zig @@ -168,6 +168,256 @@ test "null_writer" { null_writer.writeAll("yay" ** 10) catch |err| switch (err) {}; } +pub fn poll( + allocator: std.mem.Allocator, + comptime StreamEnum: type, + files: PollFiles(StreamEnum), +) Poller(StreamEnum) { + const enum_fields = @typeInfo(StreamEnum).Enum.fields; + var result: Poller(StreamEnum) = undefined; + + if (builtin.os.tag == .windows) result.windows = .{ + .first_read_done = false, + .overlapped = [1]os.windows.OVERLAPPED{ + mem.zeroes(os.windows.OVERLAPPED), + } ** enum_fields.len, + .active = .{ + .count = 0, + .handles_buf = undefined, + .stream_map = undefined, + }, + }; + + inline for (0..enum_fields.len) |i| { + result.fifos[i] = .{ + .allocator = allocator, + .buf = &.{}, + .head = 0, + .count = 0, + }; + if (builtin.os.tag == .windows) { + result.windows.active.handles_buf[i] = @field(files, enum_fields[i].name).handle; + } else { + result.poll_fds[i] = .{ + .fd = @field(files, enum_fields[i].name).handle, + .events = os.POLL.IN, + .revents = undefined, + }; + } + } + return result; +} + +pub const PollFifo = std.fifo.LinearFifo(u8, .Dynamic); + +pub fn Poller(comptime StreamEnum: type) type { + return struct { + const enum_fields = @typeInfo(StreamEnum).Enum.fields; + const PollFd = if (builtin.os.tag == .windows) void else std.os.pollfd; + + fifos: [enum_fields.len]PollFifo, + poll_fds: [enum_fields.len]PollFd, + windows: if (builtin.os.tag == .windows) struct { + first_read_done: bool, + overlapped: [enum_fields.len]os.windows.OVERLAPPED, + active: struct { + count: math.IntFittingRange(0, enum_fields.len), + handles_buf: [enum_fields.len]os.windows.HANDLE, + stream_map: [enum_fields.len]StreamEnum, + + pub fn removeAt(self: *@This(), index: u32) void { + std.debug.assert(index < self.count); + for (index + 1..self.count) |i| { + self.handles_buf[i - 1] = self.handles_buf[i]; + self.stream_map[i - 1] = self.stream_map[i]; + } + self.count -= 1; + } + }, + } else void, + + const Self = @This(); + + pub fn deinit(self: *Self) void { + if (builtin.os.tag == .windows) { + // cancel any pending IO to prevent clobbering OVERLAPPED value + for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| { + _ = os.windows.kernel32.CancelIo(h); + } + } + inline for (&self.fifos) |*q| q.deinit(); + self.* = undefined; + } + + pub fn poll(self: *Self) !bool { + if (builtin.os.tag == .windows) { + return pollWindows(self); + } else { + return pollPosix(self); + } + } + + pub inline fn fifo(self: *Self, comptime which: StreamEnum) *PollFifo { + return &self.fifos[@enumToInt(which)]; + } + + fn pollWindows(self: *Self) !bool { + const bump_amt = 512; + + if (!self.windows.first_read_done) { + // Windows Async IO requires an initial call to ReadFile before waiting on the handle + for (0..enum_fields.len) |i| { + const handle = self.windows.active.handles_buf[i]; + switch (try windowsAsyncRead( + handle, + &self.windows.overlapped[i], + &self.fifos[i], + bump_amt, + )) { + .pending => { + self.windows.active.handles_buf[self.windows.active.count] = handle; + self.windows.active.stream_map[self.windows.active.count] = @intToEnum(StreamEnum, i); + self.windows.active.count += 1; + }, + .closed => {}, // don't add to the wait_objects list + } + } + self.windows.first_read_done = true; + } + + while (true) { + if (self.windows.active.count == 0) return false; + + const status = os.windows.kernel32.WaitForMultipleObjects( + self.windows.active.count, + &self.windows.active.handles_buf, + 0, + os.windows.INFINITE, + ); + if (status == os.windows.WAIT_FAILED) + return os.windows.unexpectedError(os.windows.kernel32.GetLastError()); + + if (status < os.windows.WAIT_OBJECT_0 or status > os.windows.WAIT_OBJECT_0 + enum_fields.len - 1) + unreachable; + + const active_idx = status - os.windows.WAIT_OBJECT_0; + + const handle = self.windows.active.handles_buf[active_idx]; + const stream_idx = @enumToInt(self.windows.active.stream_map[active_idx]); + var read_bytes: u32 = undefined; + if (0 == os.windows.kernel32.GetOverlappedResult( + handle, + &self.windows.overlapped[stream_idx], + &read_bytes, + 0, + )) switch (os.windows.kernel32.GetLastError()) { + .BROKEN_PIPE => { + self.windows.active.removeAt(active_idx); + continue; + }, + else => |err| return os.windows.unexpectedError(err), + }; + + self.fifos[stream_idx].update(read_bytes); + + switch (try windowsAsyncRead( + handle, + &self.windows.overlapped[stream_idx], + &self.fifos[stream_idx], + bump_amt, + )) { + .pending => {}, + .closed => self.windows.active.removeAt(active_idx), + } + return true; + } + } + + fn pollPosix(self: *Self) !bool { + // We ask for ensureUnusedCapacity with this much extra space. This + // has more of an effect on small reads because once the reads + // start to get larger the amount of space an ArrayList will + // allocate grows exponentially. + const bump_amt = 512; + + const err_mask = os.POLL.ERR | os.POLL.NVAL | os.POLL.HUP; + + const events_len = try os.poll(&self.poll_fds, std.math.maxInt(i32)); + if (events_len == 0) { + for (self.poll_fds) |poll_fd| { + if (poll_fd.fd != -1) return true; + } else return false; + } + + var keep_polling = false; + inline for (&self.poll_fds, &self.fifos) |*poll_fd, *q| { + // Try reading whatever is available before checking the error + // conditions. + // It's still possible to read after a POLL.HUP is received, + // always check if there's some data waiting to be read first. + if (poll_fd.revents & os.POLL.IN != 0) { + const buf = try q.writableWithSize(bump_amt); + const amt = try os.read(poll_fd.fd, buf); + q.update(amt); + if (amt == 0) { + // Remove the fd when the EOF condition is met. + poll_fd.fd = -1; + } else { + keep_polling = true; + } + } else if (poll_fd.revents & err_mask != 0) { + // Exclude the fds that signaled an error. + poll_fd.fd = -1; + } else if (poll_fd.fd != -1) { + keep_polling = true; + } + } + return keep_polling; + } + }; +} + +fn windowsAsyncRead( + handle: os.windows.HANDLE, + overlapped: *os.windows.OVERLAPPED, + fifo: *PollFifo, + bump_amt: usize, +) !enum { pending, closed } { + while (true) { + const buf = try fifo.writableWithSize(bump_amt); + var read_bytes: u32 = undefined; + const read_result = os.windows.kernel32.ReadFile(handle, buf.ptr, math.cast(u32, buf.len) orelse math.maxInt(u32), &read_bytes, overlapped); + if (read_result == 0) return switch (os.windows.kernel32.GetLastError()) { + .IO_PENDING => .pending, + .BROKEN_PIPE => .closed, + else => |err| os.windows.unexpectedError(err), + }; + fifo.update(read_bytes); + } +} + +/// Given an enum, returns a struct with fields of that enum, each field +/// representing an I/O stream for polling. +pub fn PollFiles(comptime StreamEnum: type) type { + const enum_fields = @typeInfo(StreamEnum).Enum.fields; + var struct_fields: [enum_fields.len]std.builtin.Type.StructField = undefined; + for (&struct_fields, enum_fields) |*struct_field, enum_field| { + struct_field.* = .{ + .name = enum_field.name, + .type = fs.File, + .default_value = null, + .is_comptime = false, + .alignment = @alignOf(fs.File), + }; + } + return @Type(.{ .Struct = .{ + .layout = .Auto, + .fields = &struct_fields, + .decls = &.{}, + .is_tuple = false, + } }); +} + test { _ = @import("io/bit_reader.zig"); _ = @import("io/bit_writer.zig");