diff --git a/fixtures/async/container/supervisor/a_server.rb b/fixtures/async/container/supervisor/a_server.rb index 37276bb..b3f81e6 100644 --- a/fixtures/async/container/supervisor/a_server.rb +++ b/fixtures/async/container/supervisor/a_server.rb @@ -58,6 +58,14 @@ def around let(:monitors) {[registration_monitor]} let(:server) {Async::Container::Supervisor::Server.new(endpoint: @bound_endpoint, monitors: monitors)} + def restart_supervisor + @server_task&.stop + + @server_task = reactor.async do + server.run + end + end + before do @bound_endpoint = endpoint.bound diff --git a/lib/async/container/supervisor/client.rb b/lib/async/container/supervisor/client.rb index 05d4490..211e165 100644 --- a/lib/async/container/supervisor/client.rb +++ b/lib/async/container/supervisor/client.rb @@ -48,11 +48,11 @@ def connect # Run the client in a loop, reconnecting if necessary. def run - Async(annotation: "Supervisor Client", transient: true) do + Async(annotation: "Supervisor Client", transient: true) do |task| loop do connection = connect! - Async do + connected_task = task.async do connected!(connection) end @@ -61,6 +61,10 @@ def run Console.error(self, "Connection failed:", exception: error) sleep(rand) ensure + # Ensure any tasks that were created during connection are stopped: + connected_task&.stop + + # Close the connection itself: connection&.close end end diff --git a/test/async/container/client.rb b/test/async/container/client.rb index fc250b8..257902e 100644 --- a/test/async/container/client.rb +++ b/test/async/container/client.rb @@ -32,5 +32,30 @@ client_task.stop end + + it "does not leak fibers when connected! creates tasks and reconnection occurs" do + state = Thread::Queue.new + + mock(client) do |mock| + mock.replace(:connected!) do + state << :connected + + Async do + sleep + ensure + state << :disconnected + end + end + end + + client_task = client.run + expect(state.pop).to be == :connected + + # Interrupt the supervisor: + restart_supervisor + + expect(state.pop).to be == :disconnected + expect(state.pop).to be == :connected + end end end