Skip to content

Commit 0cc2b96

Browse files
authored
Merge pull request #12807 from JuliaLang/jn/open_cmd_process
Change open(cmd) to return Process instead of Tuple
2 parents df066df + 505dec5 commit 0cc2b96

File tree

9 files changed

+106
-69
lines changed

9 files changed

+106
-69
lines changed

base/deprecated.jl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1324,5 +1324,18 @@ end
13241324

13251325
# END 0.6 deprecations
13261326

1327+
# BEGIN 0.7 deprecations
1328+
1329+
# 12807
1330+
start(::Union{Process, ProcessChain}) = 1
1331+
done(::Union{Process, ProcessChain}, i::Int) = (i == 3)
1332+
next(p::Union{Process, ProcessChain}, i::Int) = (getindex(p, i), i + 1)
1333+
@noinline function getindex(p::Union{Process, ProcessChain}, i::Int)
1334+
depwarn("open(cmd) now returns only a Process<:IO object", :getindex)
1335+
return i == 1 ? getfield(p, p.openstream) : p
1336+
end
1337+
1338+
# END 0.7 deprecations
1339+
13271340
# BEGIN 1.0 deprecations
13281341
# END 1.0 deprecations

base/distributed/cluster.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -525,8 +525,8 @@ function launch_additional(np::Integer, cmd::Cmd)
525525
addresses = Vector{Any}(np)
526526

527527
for i in 1:np
528-
io, pobj = open(pipeline(detach(cmd), stderr=STDERR), "r")
529-
io_objs[i] = io
528+
io = open(detach(cmd))
529+
io_objs[i] = io.out
530530
end
531531

532532
for (i,io) in enumerate(io_objs)

base/distributed/managers.jl

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,10 @@ function launch_on_machine(manager::SSHManager, machine, cnt, params, launched,
201201
# detach launches the command in a new process group, allowing it to outlive
202202
# the initial julia process (Ctrl-C and teardown methods are handled through messages)
203203
# for the launched processes.
204-
io, pobj = open(pipeline(detach(cmd), stderr=STDERR), "r")
204+
io = open(detach(cmd))
205205

206206
wconfig = WorkerConfig()
207-
wconfig.io = io
207+
wconfig.io = io.out
208208
wconfig.host = host
209209
wconfig.tunnel = params[:tunnel]
210210
wconfig.sshflags = sshflags
@@ -320,12 +320,11 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
320320
bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`
321321

322322
for i in 1:manager.np
323-
io, pobj = open(pipeline(detach(
324-
setenv(`$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker $(cluster_cookie())`, dir=dir)),
325-
stderr=STDERR), "r")
323+
cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker $(cluster_cookie())`
324+
io = open(detach(setenv(cmd, dir=dir)))
326325
wconfig = WorkerConfig()
327-
wconfig.process = pobj
328-
wconfig.io = io
326+
wconfig.process = io
327+
wconfig.io = io.out
329328
wconfig.enable_threaded_blas = params[:enable_threaded_blas]
330329
push!(launched, wconfig)
331330
end

base/loading.jl

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -599,14 +599,15 @@ function create_expr_cache(input::String, output::String, concrete_deps::Vector{
599599
eval(Main, deserialize(STDIN))
600600
end
601601
"""
602-
io, pobj = open(pipeline(detach(`$(julia_cmd()) -O0
603-
--output-ji $output --output-incremental=yes
604-
--startup-file=no --history-file=no
605-
--color=$(have_color ? "yes" : "no")
606-
--eval $code_object`), stderr=STDERR),
607-
"w", STDOUT)
602+
io = open(pipeline(detach(`$(julia_cmd()) -O0
603+
--output-ji $output --output-incremental=yes
604+
--startup-file=no --history-file=no
605+
--color=$(have_color ? "yes" : "no")
606+
--eval $code_object`), stderr=STDERR),
607+
"w", STDOUT)
608+
in = io.in
608609
try
609-
serialize(io, quote
610+
serialize(in, quote
610611
empty!(Base.LOAD_PATH)
611612
append!(Base.LOAD_PATH, $LOAD_PATH)
612613
empty!(Base.LOAD_CACHE_PATH)
@@ -619,22 +620,21 @@ function create_expr_cache(input::String, output::String, concrete_deps::Vector{
619620
end)
620621
source = source_path(nothing)
621622
if source !== nothing
622-
serialize(io, quote
623+
serialize(in, quote
623624
task_local_storage()[:SOURCE_PATH] = $(source)
624625
end)
625626
end
626-
serialize(io, :(Base.include($(abspath(input)))))
627+
serialize(in, :(Base.include($(abspath(input)))))
627628
if source !== nothing
628-
serialize(io, :(delete!(task_local_storage(), :SOURCE_PATH)))
629+
serialize(in, :(delete!(task_local_storage(), :SOURCE_PATH)))
629630
end
630-
close(io)
631-
wait(pobj)
632-
return pobj
633-
catch
634-
kill(pobj)
635-
close(io)
636-
rethrow()
631+
close(in)
632+
catch ex
633+
close(in)
634+
process_running(io) && Timer(t -> kill(io), 5.0) # wait a short time before killing the process to give it a chance to clean up on its own first
635+
rethrow(ex)
637636
end
637+
return io
638638
end
639639

640640
compilecache(mod::Symbol) = compilecache(string(mod))

base/process.jl

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ mutable struct Process <: AbstractPipe
310310
termsignal::Int32
311311
exitnotify::Condition
312312
closenotify::Condition
313+
openstream::Symbol # for open(cmd) deprecation
313314
function Process(cmd::Cmd, handle::Ptr{Void},
314315
in::Union{Redirectable, Ptr{Void}},
315316
out::Union{Redirectable, Ptr{Void}},
@@ -339,7 +340,9 @@ struct ProcessChain <: AbstractPipe
339340
in::Redirectable
340341
out::Redirectable
341342
err::Redirectable
343+
openstream::Symbol # for open(cmd) deprecation
342344
ProcessChain(stdios::StdIOSet) = new(Process[], stdios[1], stdios[2], stdios[3])
345+
ProcessChain(chain::ProcessChain, openstream::Symbol) = new(chain.processes, chain.in, chain.out, chain.err, openstream) # for open(cmd) deprecation
343346
end
344347
pipe_reader(p::ProcessChain) = p.out
345348
pipe_writer(p::ProcessChain) = p.in
@@ -572,38 +575,55 @@ the process's standard input and `stdio` optionally specifies the process's stan
572575
stream.
573576
"""
574577
function open(cmds::AbstractCmd, mode::AbstractString="r", other::Redirectable=DevNull)
575-
if mode == "r"
578+
if mode == "r+" || mode == "w+"
579+
other === DevNull || throw(ArgumentError("no other stream for mode rw+"))
580+
in = Pipe()
581+
out = Pipe()
582+
processes = spawn(cmds, (in,out,STDERR))
583+
close(in.out)
584+
close(out.in)
585+
elseif mode == "r"
576586
in = other
577-
out = io = Pipe()
587+
out = Pipe()
578588
processes = spawn(cmds, (in,out,STDERR))
579589
close(out.in)
590+
if isa(processes, ProcessChain) # for open(cmd) deprecation
591+
processes = ProcessChain(processes, :out)
592+
else
593+
processes.openstream = :out
594+
end
580595
elseif mode == "w"
581-
in = io = Pipe()
596+
in = Pipe()
582597
out = other
583598
processes = spawn(cmds, (in,out,STDERR))
584599
close(in.out)
600+
if isa(processes, ProcessChain) # for open(cmd) deprecation
601+
processes = ProcessChain(processes, :in)
602+
else
603+
processes.openstream = :in
604+
end
585605
else
586606
throw(ArgumentError("mode must be \"r\" or \"w\", not \"$mode\""))
587607
end
588-
return (io, processes)
608+
return processes
589609
end
590610

591611
"""
592612
open(f::Function, command, mode::AbstractString="r", stdio=DevNull)
593613
594-
Similar to `open(command, mode, stdio)`, but calls `f(stream)` on the resulting read or
595-
write stream, then closes the stream and waits for the process to complete. Returns the
596-
value returned by `f`.
614+
Similar to `open(command, mode, stdio)`, but calls `f(stream)` on the resulting process
615+
stream, then closes the input stream and waits for the process to complete.
616+
Returns the value returned by `f`.
597617
"""
598618
function open(f::Function, cmds::AbstractCmd, args...)
599-
io, P = open(cmds, args...)
619+
P = open(cmds, args...)
600620
ret = try
601-
f(io)
602-
catch
621+
f(P)
622+
catch e
603623
kill(P)
604-
rethrow()
624+
rethrow(e)
605625
finally
606-
close(io)
626+
close(P.in)
607627
end
608628
success(P) || pipeline_error(P)
609629
return ret
@@ -618,15 +638,14 @@ Starts running a command asynchronously, and returns a tuple (stdout,stdin,proce
618638
output stream and input stream of the process, and the process object itself.
619639
"""
620640
function readandwrite(cmds::AbstractCmd)
621-
in = Pipe()
622-
out, processes = open(cmds, "r", in)
623-
(out, in, processes)
641+
processes = open(cmds, "r+")
642+
return (processes.out, processes.in, processes)
624643
end
625644

626645
function read(cmd::AbstractCmd, stdin::Redirectable=DevNull)
627-
out, procs = open(cmd, "r", stdin)
628-
bytes = read(out)
629-
!success(procs) && pipeline_error(procs)
646+
procs = open(cmd, "r", stdin)
647+
bytes = read(procs.out)
648+
success(procs) || pipeline_error(procs)
630649
return bytes
631650
end
632651

@@ -651,9 +670,17 @@ function run(cmds::AbstractCmd, args...)
651670
success(ps) ? nothing : pipeline_error(ps)
652671
end
653672

654-
const SIGPIPE = 13
673+
# some common signal numbers that are usually available on all platforms
674+
# and might be useful as arguments to `kill` or testing against `Process.termsignal`
675+
const SIGHUP = 1
676+
const SIGINT = 2
677+
const SIGQUIT = 3 # !windows
678+
const SIGKILL = 9
679+
const SIGPIPE = 13 # !windows
680+
const SIGTERM = 15
681+
655682
function test_success(proc::Process)
656-
assert(process_exited(proc))
683+
@assert process_exited(proc)
657684
if proc.exitcode < 0
658685
#TODO: this codepath is not currently tested
659686
throw(UVError("could not start process $(string(proc.cmd))", proc.exitcode))
@@ -663,8 +690,7 @@ end
663690

664691
function success(x::Process)
665692
wait(x)
666-
kill(x)
667-
test_success(x)
693+
return test_success(x)
668694
end
669695
success(procs::Vector{Process}) = mapreduce(success, &, procs)
670696
success(procs::ProcessChain) = success(procs.processes)
@@ -700,8 +726,6 @@ function pipeline_error(procs::ProcessChain)
700726
error(msg)
701727
end
702728

703-
_jl_kill(p::Process, signum::Integer) = ccall(:uv_process_kill, Int32, (Ptr{Void},Int32), p.handle, signum)
704-
705729
"""
706730
kill(p::Process, signum=SIGTERM)
707731
@@ -710,14 +734,14 @@ Send a signal to a process. The default is to terminate the process.
710734
function kill(p::Process, signum::Integer)
711735
if process_running(p)
712736
@assert p.handle != C_NULL
713-
_jl_kill(p, signum)
737+
ccall(:uv_process_kill, Int32, (Ptr{Void}, Int32), p.handle, signum)
714738
else
715739
Int32(-1)
716740
end
717741
end
718742
kill(ps::Vector{Process}) = map(kill, ps)
719743
kill(ps::ProcessChain) = map(kill, ps.processes)
720-
kill(p::Process) = kill(p, 15) #SIGTERM
744+
kill(p::Process) = kill(p, SIGTERM)
721745

722746
function _contains_newline(bufptr::Ptr{Void}, len::Int32)
723747
return (ccall(:memchr, Ptr{Void}, (Ptr{Void},Int32,Csize_t), bufptr, '\n', len) != C_NULL)

examples/clustermanager/simple/UnixDomainCM.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ function launch(manager::UnixDomainCM, params::Dict, launched::Array, c::Conditi
1313
sockname = tempname()
1414
try
1515
cmd = `$(params[:exename]) --startup-file=no $(@__FILE__) udwrkr $sockname $cookie`
16-
io, pobj = open(cmd, "r")
16+
pobj = open(cmd)
1717

1818
wconfig = WorkerConfig()
19-
wconfig.userdata = Dict(:sockname=>sockname, :io=>io, :process=>pobj)
19+
wconfig.userdata = Dict(:sockname=>sockname, :io=>pobj.out, :process=>pobj)
2020
push!(launched, wconfig)
2121
notify(c)
2222
catch e

test/distributed_exec.jl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,11 +1307,11 @@ function Base.launch(manager::ErrorSimulator, params::Dict, launched::Array, c::
13071307
else
13081308
error("Unknown mode")
13091309
end
1310-
io, pobj = open(pipeline(detach(setenv(cmd, dir=dir)); stderr=STDERR), "r")
1310+
io = open(detach(setenv(cmd, dir=dir)))
13111311

13121312
wconfig = WorkerConfig()
1313-
wconfig.process = pobj
1314-
wconfig.io = io
1313+
wconfig.process = io
1314+
wconfig.io = io.out
13151315
push!(launched, wconfig)
13161316
notify(c)
13171317
end

test/spawn.jl

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -242,26 +242,26 @@ let fname = tempname()
242242
function thrash(handle::Ptr{Void})
243243
# Kill the memory, but write a nice low value in the libuv type field to
244244
# trigger the right code path
245-
ccall(:memset,Ptr{Void},(Ptr{Void},Cint,Csize_t),handle,0xee,3*sizeof(Ptr{Void}))
246-
unsafe_store!(convert(Ptr{Cint},handle+2*sizeof(Ptr{Void})),15)
245+
ccall(:memset, Ptr{Void}, (Ptr{Void}, Cint, Csize_t), handle, 0xee, 3 * sizeof(Ptr{Void}))
246+
unsafe_store!(convert(Ptr{Cint}, handle + 2 * sizeof(Ptr{Void})), 15)
247247
nothing
248248
end
249249
OLD_STDERR = STDERR
250-
redirect_stderr(open("$(escape_string(fname))","w"))
250+
redirect_stderr(open("$(escape_string(fname))", "w"))
251251
# Usually this would be done by GC. Do it manually, to make the failure
252252
# case more reliable.
253253
oldhandle = OLD_STDERR.handle
254254
OLD_STDERR.status = Base.StatusClosing
255255
OLD_STDERR.handle = C_NULL
256-
ccall(:uv_close,Void,(Ptr{Void},Ptr{Void}),oldhandle,cfunction(thrash,Void,(Ptr{Void},)))
256+
ccall(:uv_close, Void, (Ptr{Void}, Ptr{Void}), oldhandle, cfunction(thrash, Void, (Ptr{Void},)))
257257
sleep(1)
258258
import Base.zzzInvalidIdentifier
259259
"""
260260
try
261-
(in,p) = open(pipeline(`$exename --startup-file=no`, stderr=STDERR), "w")
262-
write(in,cmd)
263-
close(in)
264-
wait(p)
261+
io = open(pipeline(`$exename --startup-file=no`, stderr=STDERR), "w")
262+
write(io, cmd)
263+
close(io)
264+
wait(io)
265265
catch
266266
error("IOStream redirect failed. Child stderr was \n$(readstring(fname))\n")
267267
finally

test/topology.jl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,13 @@ function Base.launch(manager::TopoTestManager, params::Dict, launched::Array, c:
4141
exename = params[:exename]
4242
exeflags = params[:exeflags]
4343

44+
cmd = `$exename $exeflags --bind-to $(Base.Distributed.LPROC.bind_addr) --worker $(Base.cluster_cookie())`
45+
cmd = pipeline(detach(setenv(cmd, dir=dir)))
4446
for i in 1:manager.np
45-
io, pobj = open(pipeline(detach(
46-
setenv(`$exename $exeflags --bind-to $(Base.Distributed.LPROC.bind_addr) --worker $(Base.cluster_cookie())`, dir=dir)); stderr=STDERR), "r")
47+
io = open(cmd)
4748
wconfig = WorkerConfig()
48-
wconfig.process = pobj
49-
wconfig.io = io
49+
wconfig.process = io
50+
wconfig.io = io.out
5051
wconfig.ident = i
5152
wconfig.connect_idents = collect(i+2:2:manager.np)
5253
push!(launched, wconfig)

0 commit comments

Comments
 (0)