Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions base/distributed/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ mutable struct Future <: AbstractRemoteRef

Future(w::Int, rrid::RRID) = Future(w, rrid, Nullable{Any}())
Future(w::Int, rrid::RRID, v) = (r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r))

Future(t::Tuple) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances
end

mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
Expand All @@ -31,6 +33,10 @@ mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
r = new(w, rrid.whence, rrid.id)
return test_existing_ref(r)
end

function RemoteChannel{T}(t::Tuple) where T<:AbstractChannel
return new(t[1],t[2],t[3])
end
end

function test_existing_ref(r::AbstractRemoteRef)
Expand Down Expand Up @@ -273,29 +279,29 @@ end

channel_type(rr::RemoteChannel{T}) where {T} = T

serialize(s::AbstractSerializer, f::Future) = serialize(s, f, isnull(f.v))
serialize(s::AbstractSerializer, rr::RemoteChannel) = serialize(s, rr, true)
function serialize(s::AbstractSerializer, rr::AbstractRemoteRef, addclient)
serialize(s::ClusterSerializer, f::Future) = serialize(s, f, isnull(f.v))
serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true)
function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient)
if addclient
p = worker_id_from_socket(s.io)
(p !== rr.where) && send_add_client(rr, p)
end
invoke(serialize, Tuple{AbstractSerializer, Any}, s, rr)
invoke(serialize, Tuple{ClusterSerializer, Any}, s, rr)
end

function deserialize(s::AbstractSerializer, t::Type{<:Future})
function deserialize(s::ClusterSerializer, t::Type{<:Future})
f = deserialize_rr(s,t)
Future(f.where, RRID(f.whence, f.id), f.v) # ctor adds to client_refs table
end

function deserialize(s::AbstractSerializer, t::Type{<:RemoteChannel})
function deserialize(s::ClusterSerializer, t::Type{<:RemoteChannel})
rr = deserialize_rr(s,t)
# call ctor to make sure this rr gets added to the client_refs table
RemoteChannel{channel_type(rr)}(rr.where, RRID(rr.whence, rr.id))
end

function deserialize_rr(s, t)
rr = invoke(deserialize, Tuple{AbstractSerializer, DataType}, s, t)
rr = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t)
if rr.where == myid()
# send_add_client() is not executed when the ref is being
# serialized to where it exists
Expand All @@ -304,6 +310,18 @@ function deserialize_rr(s, t)
rr
end

# Future and RemoteChannel are serializable only in a running cluster.
# Serialize zeroed-out values to non ClusterSerializer objects
function serialize(s::AbstractSerializer, ::Future)
zero_fut = Future((0,0,0,Nullable{Any}()))
invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_fut)
end

function serialize(s::AbstractSerializer, ::RemoteChannel)
zero_rc = RemoteChannel{Channel{Any}}((0,0,0))
invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_rc)
end


# make a thunk to call f on args in a way that simulates what would happen if
# the function were sent elsewhere
Expand Down
8 changes: 5 additions & 3 deletions doc/src/manual/parallel-computing.md
Original file line number Diff line number Diff line change
Expand Up @@ -781,14 +781,16 @@ julia> @elapsed while n > 0 # print out results

## Remote References and Distributed Garbage Collection

Objects referred to by remote references can be freed only when *all* held references in the cluster
are deleted.
Objects referred to by remote references can be freed only when *all* held references
in the cluster are deleted.

The node where the value is stored keeps track of which of the workers have a reference to it.
Every time a [`RemoteChannel`](@ref) or a (unfetched) [`Future`](@ref) is serialized to a worker,
the node pointed to by the reference is notified. And every time a [`RemoteChannel`](@ref) or
a (unfetched) [`Future`](@ref) is garbage collected locally, the node owning the value is again
notified.
notified. This is implemented in an internal cluster aware serializer. Remote references are only
valid in the context of a running cluster. Serializing and deserializing references to and from
regular `IO` objects is not supported.

The notifications are done via sending of "tracking" messages--an "add reference" message when
a reference is serialized to a different process and a "delete reference" message when a reference
Expand Down
21 changes: 21 additions & 0 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,27 @@ test_indexing(Future(id_other))
test_indexing(RemoteChannel())
test_indexing(RemoteChannel(id_other))

# Test ser/deser to non-ClusterSerializer objects.
function test_regular_io_ser(ref::Base.Distributed.AbstractRemoteRef)
io = IOBuffer()
serialize(io, ref)
seekstart(io)
ref2 = deserialize(io)
for fld in fieldnames(typeof(ref))
v = getfield(ref2, fld)
if isa(v, Number)
@test v === zero(typeof(v))
elseif isa(v, Nullable)
@test v === Nullable{Any}()
else
error(string("Add test for field ", fld))
end
end
end

test_regular_io_ser(Future())
test_regular_io_ser(RemoteChannel())

dims = (20,20,20)

if Sys.islinux()
Expand Down