Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions fixtures/async/container/supervisor/a_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ def around
let(:monitors) {[registration_monitor]}
let(:server) {Async::Container::Supervisor::Server.new(endpoint: @bound_endpoint, monitors: monitors)}

def restart_supervisor
@server_task&.stop

@server_task = reactor.async do
server.run
end
end

before do
@bound_endpoint = endpoint.bound

Expand Down
8 changes: 6 additions & 2 deletions lib/async/container/supervisor/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ def connect

# Run the client in a loop, reconnecting if necessary.
def run
Async(annotation: "Supervisor Client", transient: true) do
Async(annotation: "Supervisor Client", transient: true) do |task|
loop do
connection = connect!

Async do
connected_task = task.async do
connected!(connection)
end

Expand All @@ -61,6 +61,10 @@ def run
Console.error(self, "Connection failed:", exception: error)
sleep(rand)
ensure
# Ensure any tasks that were created during connection are stopped:
connected_task&.stop

# Close the connection itself:
connection&.close
end
end
Expand Down
25 changes: 25 additions & 0 deletions test/async/container/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,30 @@

client_task.stop
end

it "does not leak fibers when connected! creates tasks and reconnection occurs" do
state = Thread::Queue.new

mock(client) do |mock|
mock.replace(:connected!) do
state << :connected

Async do
sleep
ensure
state << :disconnected
end
end
end

client_task = client.run
expect(state.pop).to be == :connected

# Interrupt the supervisor:
restart_supervisor

expect(state.pop).to be == :disconnected
expect(state.pop).to be == :connected
end
end
end
Loading