From 5e942d600cb97746838a71d9947eea2d0bc87ac1 Mon Sep 17 00:00:00 2001 From: Isaiah Frantz Date: Sun, 17 Feb 2019 16:01:27 -0800 Subject: [PATCH 1/6] this is waiting pids from diffy causing multi-node runs to fail. only wait your own pids not any child from the parent --- lib/octocatalog-diff/util/parallel.rb | 36 +++++++++++++++------------ 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/lib/octocatalog-diff/util/parallel.rb b/lib/octocatalog-diff/util/parallel.rb index 36e7ba58..4c5925fb 100644 --- a/lib/octocatalog-diff/util/parallel.rb +++ b/lib/octocatalog-diff/util/parallel.rb @@ -129,22 +129,26 @@ def self.run_tasks_parallel(result, task_array, logger) # Waiting for children and handling results while pidmap.any? - this_pid, exit_obj = Process.wait2(0) - next unless this_pid && pidmap.key?(this_pid) - index = pidmap[this_pid][:index] - exitstatus = exit_obj.exitstatus - raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil? - raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero? - - input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat")) - result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad - time_delta = Time.now - pidmap[this_pid][:start_time] - pidmap.delete(this_pid) - - logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes" - - next if result[index].status - return result[index].exception + pidmap.each do |pid, stuff| + status = Process.waitpid2(pid, Process::WNOHANG) + next if status.nil? + this_pid, exit_obj = status + next unless this_pid && pidmap.key?(this_pid) + index = pidmap[this_pid][:index] + exitstatus = exit_obj.exitstatus + raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil? + raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero? + + input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat")) + result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad + time_delta = Time.now - pidmap[this_pid][:start_time] + pidmap.delete(this_pid) + + logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes" + + next if result[index].status + return result[index].exception + end end logger.debug 'All child processes completed with no exceptions raised' From cc8c21dea0b025b29e26817109cdaa9f96b554b4 Mon Sep 17 00:00:00 2001 From: Isaiah Frantz Date: Mon, 18 Feb 2019 01:15:37 -0800 Subject: [PATCH 2/6] test that non-parallel launched processes dont get waited --- .../tests/util/parallel_spec.rb | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/spec/octocatalog-diff/tests/util/parallel_spec.rb b/spec/octocatalog-diff/tests/util/parallel_spec.rb index 82ba232d..4dcbbb4b 100644 --- a/spec/octocatalog-diff/tests/util/parallel_spec.rb +++ b/spec/octocatalog-diff/tests/util/parallel_spec.rb @@ -16,6 +16,65 @@ end context 'with parallel processing' do + it 'should only Process.wait() its own children' do + class Foo + def one(arg, _logger = nil) + 'one ' + arg + end + + def two(arg, _logger = nil) + 'two ' + arg + end + + def dont_wait_me_bro(sleep_for = 1) + # do we need a rescue block here? + pid = fork do + sleep sleep_for + Kernel.exit! 0 # Kernel.exit! avoids at_exit from parents being triggered by children exiting + end + pid + end + + def wait_on_me(pid) + status = nil + # just in case status never equals anything + count = 100 + while status.nil? or count <= 0 + status = Process.waitpid(pid, Process::WNOHANG) + count -= 1 + end + end + end + + c = Foo.new + # start my non-parallel process first + just_a_guy = c.dont_wait_me_bro() + + one = OctocatalogDiff::Util::Parallel::Task.new(method: c.method(:one), args: 'abc', description: 'test1') + two = OctocatalogDiff::Util::Parallel::Task.new(method: c.method(:two), args: 'def', description: 'test2') + result = OctocatalogDiff::Util::Parallel.run_tasks([one, two], nil, true) + expect(result).to be_a_kind_of(Array) + expect(result.size).to eq(2) + + one_result = result[0] + expect(one_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result) + expect(one_result.status).to eq(true) + expect(one_result.exception).to eq(nil) + expect(one_result.output).to match(/^one abc/) + + two_result = result[1] + expect(two_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result) + expect(two_result.status).to eq(true) + expect(two_result.exception).to eq(nil) + expect(two_result.output).to match(/^two def/) + + # just_a_guy should still be need to be waited + result = c.wait_on_me(just_a_guy) + expect(result).to be_a_kind_of(Array) + # test result and check for error conditions + + end + it 'should parallelize and return task results' do class Foo def one(arg, _logger = nil) From 40ca72c56b6d47af62f64bbc8e80c4fb781e9da0 Mon Sep 17 00:00:00 2001 From: Isaiah Frantz Date: Mon, 18 Feb 2019 01:33:47 -0800 Subject: [PATCH 3/6] return the expected object --- spec/octocatalog-diff/tests/util/parallel_spec.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spec/octocatalog-diff/tests/util/parallel_spec.rb b/spec/octocatalog-diff/tests/util/parallel_spec.rb index 4dcbbb4b..d3ae54a8 100644 --- a/spec/octocatalog-diff/tests/util/parallel_spec.rb +++ b/spec/octocatalog-diff/tests/util/parallel_spec.rb @@ -39,10 +39,11 @@ def wait_on_me(pid) status = nil # just in case status never equals anything count = 100 - while status.nil? or count <= 0 - status = Process.waitpid(pid, Process::WNOHANG) + while status.nil? or count > 0 count -= 1 + status = Process.waitpid2(pid, Process::WNOHANG) end + status end end From e9bb0b71e74c3bdc6602877078c7d14c664605d0 Mon Sep 17 00:00:00 2001 From: Isaiah Frantz Date: Mon, 18 Feb 2019 11:13:05 -0800 Subject: [PATCH 4/6] fix issues noted in rubocop test run --- lib/octocatalog-diff/util/parallel.rb | 2 +- spec/octocatalog-diff/tests/util/parallel_spec.rb | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/octocatalog-diff/util/parallel.rb b/lib/octocatalog-diff/util/parallel.rb index 4c5925fb..18de9dd8 100644 --- a/lib/octocatalog-diff/util/parallel.rb +++ b/lib/octocatalog-diff/util/parallel.rb @@ -129,7 +129,7 @@ def self.run_tasks_parallel(result, task_array, logger) # Waiting for children and handling results while pidmap.any? - pidmap.each do |pid, stuff| + pidmap.each do |pid| status = Process.waitpid2(pid, Process::WNOHANG) next if status.nil? this_pid, exit_obj = status diff --git a/spec/octocatalog-diff/tests/util/parallel_spec.rb b/spec/octocatalog-diff/tests/util/parallel_spec.rb index d3ae54a8..1cc349bd 100644 --- a/spec/octocatalog-diff/tests/util/parallel_spec.rb +++ b/spec/octocatalog-diff/tests/util/parallel_spec.rb @@ -39,7 +39,7 @@ def wait_on_me(pid) status = nil # just in case status never equals anything count = 100 - while status.nil? or count > 0 + while status.nil? || count > 0 count -= 1 status = Process.waitpid2(pid, Process::WNOHANG) end @@ -49,7 +49,7 @@ def wait_on_me(pid) c = Foo.new # start my non-parallel process first - just_a_guy = c.dont_wait_me_bro() + just_a_guy = c.dont_wait_me_bro one = OctocatalogDiff::Util::Parallel::Task.new(method: c.method(:one), args: 'abc', description: 'test1') two = OctocatalogDiff::Util::Parallel::Task.new(method: c.method(:two), args: 'def', description: 'test2') @@ -539,4 +539,4 @@ def validate(arg, _logger = nil, _extra_args = {}) end.to raise_error(ArgumentError, /Element .* must be a OctocatalogDiff::Util::Parallel::Task, not a /) end end -end +end \ No newline at end of file From d0f62be462ff26eae1a16fb3067578885406d825 Mon Sep 17 00:00:00 2001 From: Isaiah Frantz Date: Mon, 18 Feb 2019 11:15:57 -0800 Subject: [PATCH 5/6] the actual pid is in the 1st index of the pid array --- lib/octocatalog-diff/util/parallel.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/octocatalog-diff/util/parallel.rb b/lib/octocatalog-diff/util/parallel.rb index 18de9dd8..4f4197ed 100644 --- a/lib/octocatalog-diff/util/parallel.rb +++ b/lib/octocatalog-diff/util/parallel.rb @@ -130,7 +130,7 @@ def self.run_tasks_parallel(result, task_array, logger) # Waiting for children and handling results while pidmap.any? pidmap.each do |pid| - status = Process.waitpid2(pid, Process::WNOHANG) + status = Process.waitpid2(pid[0], Process::WNOHANG) next if status.nil? this_pid, exit_obj = status next unless this_pid && pidmap.key?(this_pid) From 56bbbb86b16fc51db68dfa560d2a609ab317a57c Mon Sep 17 00:00:00 2001 From: Isaiah Frantz Date: Mon, 18 Feb 2019 11:47:29 -0800 Subject: [PATCH 6/6] moar rubobop's --- spec/octocatalog-diff/tests/util/parallel_spec.rb | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spec/octocatalog-diff/tests/util/parallel_spec.rb b/spec/octocatalog-diff/tests/util/parallel_spec.rb index 1cc349bd..3cb2a8b7 100644 --- a/spec/octocatalog-diff/tests/util/parallel_spec.rb +++ b/spec/octocatalog-diff/tests/util/parallel_spec.rb @@ -38,7 +38,7 @@ def dont_wait_me_bro(sleep_for = 1) def wait_on_me(pid) status = nil # just in case status never equals anything - count = 100 + count = 100 while status.nil? || count > 0 count -= 1 status = Process.waitpid2(pid, Process::WNOHANG) @@ -73,7 +73,6 @@ def wait_on_me(pid) result = c.wait_on_me(just_a_guy) expect(result).to be_a_kind_of(Array) # test result and check for error conditions - end it 'should parallelize and return task results' do @@ -539,4 +538,4 @@ def validate(arg, _logger = nil, _extra_args = {}) end.to raise_error(ArgumentError, /Element .* must be a OctocatalogDiff::Util::Parallel::Task, not a /) end end -end \ No newline at end of file +end