diff --git a/base/distributed/remotecall.jl b/base/distributed/remotecall.jl index 262828171283b..e39a59e77e49e 100644 --- a/base/distributed/remotecall.jl +++ b/base/distributed/remotecall.jl @@ -20,6 +20,8 @@ mutable struct Future <: AbstractRemoteRef Future(w::Int, rrid::RRID) = Future(w, rrid, Nullable{Any}()) Future(w::Int, rrid::RRID, v) = (r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r)) + + Future(t::Tuple) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances end mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef @@ -31,6 +33,10 @@ mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef r = new(w, rrid.whence, rrid.id) return test_existing_ref(r) end + + function RemoteChannel{T}(t::Tuple) where T<:AbstractChannel + return new(t[1],t[2],t[3]) + end end function test_existing_ref(r::AbstractRemoteRef) @@ -273,29 +279,29 @@ end channel_type(rr::RemoteChannel{T}) where {T} = T -serialize(s::AbstractSerializer, f::Future) = serialize(s, f, isnull(f.v)) -serialize(s::AbstractSerializer, rr::RemoteChannel) = serialize(s, rr, true) -function serialize(s::AbstractSerializer, rr::AbstractRemoteRef, addclient) +serialize(s::ClusterSerializer, f::Future) = serialize(s, f, isnull(f.v)) +serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true) +function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient) if addclient p = worker_id_from_socket(s.io) (p !== rr.where) && send_add_client(rr, p) end - invoke(serialize, Tuple{AbstractSerializer, Any}, s, rr) + invoke(serialize, Tuple{ClusterSerializer, Any}, s, rr) end -function deserialize(s::AbstractSerializer, t::Type{<:Future}) +function deserialize(s::ClusterSerializer, t::Type{<:Future}) f = deserialize_rr(s,t) Future(f.where, RRID(f.whence, f.id), f.v) # ctor adds to client_refs table end -function deserialize(s::AbstractSerializer, t::Type{<:RemoteChannel}) +function deserialize(s::ClusterSerializer, t::Type{<:RemoteChannel}) rr = deserialize_rr(s,t) # call ctor to make sure this rr gets added to the client_refs table RemoteChannel{channel_type(rr)}(rr.where, RRID(rr.whence, rr.id)) end function deserialize_rr(s, t) - rr = invoke(deserialize, Tuple{AbstractSerializer, DataType}, s, t) + rr = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t) if rr.where == myid() # send_add_client() is not executed when the ref is being # serialized to where it exists @@ -304,6 +310,18 @@ function deserialize_rr(s, t) rr end +# Future and RemoteChannel are serializable only in a running cluster. +# Serialize zeroed-out values to non ClusterSerializer objects +function serialize(s::AbstractSerializer, ::Future) + zero_fut = Future((0,0,0,Nullable{Any}())) + invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_fut) +end + +function serialize(s::AbstractSerializer, ::RemoteChannel) + zero_rc = RemoteChannel{Channel{Any}}((0,0,0)) + invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_rc) +end + # make a thunk to call f on args in a way that simulates what would happen if # the function were sent elsewhere diff --git a/doc/src/manual/parallel-computing.md b/doc/src/manual/parallel-computing.md index e8f93351f0771..5ad25d4e737e4 100644 --- a/doc/src/manual/parallel-computing.md +++ b/doc/src/manual/parallel-computing.md @@ -781,14 +781,16 @@ julia> @elapsed while n > 0 # print out results ## Remote References and Distributed Garbage Collection -Objects referred to by remote references can be freed only when *all* held references in the cluster -are deleted. +Objects referred to by remote references can be freed only when *all* held references +in the cluster are deleted. The node where the value is stored keeps track of which of the workers have a reference to it. Every time a [`RemoteChannel`](@ref) or a (unfetched) [`Future`](@ref) is serialized to a worker, the node pointed to by the reference is notified. And every time a [`RemoteChannel`](@ref) or a (unfetched) [`Future`](@ref) is garbage collected locally, the node owning the value is again -notified. +notified. This is implemented in an internal cluster aware serializer. Remote references are only +valid in the context of a running cluster. Serializing and deserializing references to and from +regular `IO` objects is not supported. The notifications are done via sending of "tracking" messages--an "add reference" message when a reference is serialized to a different process and a "delete reference" message when a reference diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 3b430a58ba275..8a7f12633fde0 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -268,6 +268,27 @@ test_indexing(Future(id_other)) test_indexing(RemoteChannel()) test_indexing(RemoteChannel(id_other)) +# Test ser/deser to non-ClusterSerializer objects. +function test_regular_io_ser(ref::Base.Distributed.AbstractRemoteRef) + io = IOBuffer() + serialize(io, ref) + seekstart(io) + ref2 = deserialize(io) + for fld in fieldnames(typeof(ref)) + v = getfield(ref2, fld) + if isa(v, Number) + @test v === zero(typeof(v)) + elseif isa(v, Nullable) + @test v === Nullable{Any}() + else + error(string("Add test for field ", fld)) + end + end +end + +test_regular_io_ser(Future()) +test_regular_io_ser(RemoteChannel()) + dims = (20,20,20) if Sys.islinux()