Skip to content

Commit 7c045f0

Browse files
authored
Serialize zeroed-out remote refs to non-ClusterSerializer objects (#22836)
1 parent eb133ab commit 7c045f0

File tree

3 files changed

+51
-10
lines changed

3 files changed

+51
-10
lines changed

base/distributed/remotecall.jl

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ mutable struct Future <: AbstractRemoteRef
2020

2121
Future(w::Int, rrid::RRID) = Future(w, rrid, Nullable{Any}())
2222
Future(w::Int, rrid::RRID, v) = (r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r))
23+
24+
Future(t::Tuple) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances
2325
end
2426

2527
mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
@@ -31,6 +33,10 @@ mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
3133
r = new(w, rrid.whence, rrid.id)
3234
return test_existing_ref(r)
3335
end
36+
37+
function RemoteChannel{T}(t::Tuple) where T<:AbstractChannel
38+
return new(t[1],t[2],t[3])
39+
end
3440
end
3541

3642
function test_existing_ref(r::AbstractRemoteRef)
@@ -273,29 +279,29 @@ end
273279

274280
channel_type(rr::RemoteChannel{T}) where {T} = T
275281

276-
serialize(s::AbstractSerializer, f::Future) = serialize(s, f, isnull(f.v))
277-
serialize(s::AbstractSerializer, rr::RemoteChannel) = serialize(s, rr, true)
278-
function serialize(s::AbstractSerializer, rr::AbstractRemoteRef, addclient)
282+
serialize(s::ClusterSerializer, f::Future) = serialize(s, f, isnull(f.v))
283+
serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true)
284+
function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient)
279285
if addclient
280286
p = worker_id_from_socket(s.io)
281287
(p !== rr.where) && send_add_client(rr, p)
282288
end
283-
invoke(serialize, Tuple{AbstractSerializer, Any}, s, rr)
289+
invoke(serialize, Tuple{ClusterSerializer, Any}, s, rr)
284290
end
285291

286-
function deserialize(s::AbstractSerializer, t::Type{<:Future})
292+
function deserialize(s::ClusterSerializer, t::Type{<:Future})
287293
f = deserialize_rr(s,t)
288294
Future(f.where, RRID(f.whence, f.id), f.v) # ctor adds to client_refs table
289295
end
290296

291-
function deserialize(s::AbstractSerializer, t::Type{<:RemoteChannel})
297+
function deserialize(s::ClusterSerializer, t::Type{<:RemoteChannel})
292298
rr = deserialize_rr(s,t)
293299
# call ctor to make sure this rr gets added to the client_refs table
294300
RemoteChannel{channel_type(rr)}(rr.where, RRID(rr.whence, rr.id))
295301
end
296302

297303
function deserialize_rr(s, t)
298-
rr = invoke(deserialize, Tuple{AbstractSerializer, DataType}, s, t)
304+
rr = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t)
299305
if rr.where == myid()
300306
# send_add_client() is not executed when the ref is being
301307
# serialized to where it exists
@@ -304,6 +310,18 @@ function deserialize_rr(s, t)
304310
rr
305311
end
306312

313+
# Future and RemoteChannel are serializable only in a running cluster.
314+
# Serialize zeroed-out values to non ClusterSerializer objects
315+
function serialize(s::AbstractSerializer, ::Future)
316+
zero_fut = Future((0,0,0,Nullable{Any}()))
317+
invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_fut)
318+
end
319+
320+
function serialize(s::AbstractSerializer, ::RemoteChannel)
321+
zero_rc = RemoteChannel{Channel{Any}}((0,0,0))
322+
invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_rc)
323+
end
324+
307325

308326
# make a thunk to call f on args in a way that simulates what would happen if
309327
# the function were sent elsewhere

doc/src/manual/parallel-computing.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -781,14 +781,16 @@ julia> @elapsed while n > 0 # print out results
781781

782782
## Remote References and Distributed Garbage Collection
783783

784-
Objects referred to by remote references can be freed only when *all* held references in the cluster
785-
are deleted.
784+
Objects referred to by remote references can be freed only when *all* held references
785+
in the cluster are deleted.
786786

787787
The node where the value is stored keeps track of which of the workers have a reference to it.
788788
Every time a [`RemoteChannel`](@ref) or a (unfetched) [`Future`](@ref) is serialized to a worker,
789789
the node pointed to by the reference is notified. And every time a [`RemoteChannel`](@ref) or
790790
a (unfetched) [`Future`](@ref) is garbage collected locally, the node owning the value is again
791-
notified.
791+
notified. This is implemented in an internal cluster aware serializer. Remote references are only
792+
valid in the context of a running cluster. Serializing and deserializing references to and from
793+
regular `IO` objects is not supported.
792794

793795
The notifications are done via sending of "tracking" messages--an "add reference" message when
794796
a reference is serialized to a different process and a "delete reference" message when a reference

test/distributed_exec.jl

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,27 @@ test_indexing(Future(id_other))
268268
test_indexing(RemoteChannel())
269269
test_indexing(RemoteChannel(id_other))
270270

271+
# Test ser/deser to non-ClusterSerializer objects.
272+
function test_regular_io_ser(ref::Base.Distributed.AbstractRemoteRef)
273+
io = IOBuffer()
274+
serialize(io, ref)
275+
seekstart(io)
276+
ref2 = deserialize(io)
277+
for fld in fieldnames(typeof(ref))
278+
v = getfield(ref2, fld)
279+
if isa(v, Number)
280+
@test v === zero(typeof(v))
281+
elseif isa(v, Nullable)
282+
@test v === Nullable{Any}()
283+
else
284+
error(string("Add test for field ", fld))
285+
end
286+
end
287+
end
288+
289+
test_regular_io_ser(Future())
290+
test_regular_io_ser(RemoteChannel())
291+
271292
dims = (20,20,20)
272293

273294
if Sys.islinux()

0 commit comments

Comments
 (0)