From 3fed7e1bfb714728ead476571d773ebd6a4a0a64 Mon Sep 17 00:00:00 2001 From: KDr2 Date: Mon, 25 Feb 2019 23:13:43 +0800 Subject: [PATCH 1/6] Catch task exception --- src/taskcopy.jl | 15 +++++++++++++-- test/clonetask.jl | 15 +++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/taskcopy.jl b/src/taskcopy.jl index 4568cc10..a5a2766d 100644 --- a/src/taskcopy.jl +++ b/src/taskcopy.jl @@ -72,7 +72,10 @@ produce(v) = begin wait() end - t.state == :runnable || throw(AssertionError("producer.consumer.state == :runnable")) + if !(t.state in [:runnable, :queued]) + throw(AssertionError("producer.consumer.state in [:runnable, :queued]")) + end + if t.state == :queued yield() end if empty Base.schedule_and_wait(t, v) ct = current_task() # When a task is copied, ct should be updated to new task ID. @@ -129,5 +132,13 @@ consume(p::Task, values...) = begin push!(p.storage[:consumers].waitq, ct) end - p.state == :runnable ? Base.schedule_and_wait(p) : wait() # don't attempt to queue it twice + if p.state == :runnable + Base.schedule(p) + yield() + + if p.exception != nothing + throw(p.exception) + end + end + wait() end diff --git a/test/clonetask.jl b/test/clonetask.jl index 8376bb47..deda86ab 100644 --- a/test/clonetask.jl +++ b/test/clonetask.jl @@ -43,3 +43,18 @@ a = copy(t); @test consume(t) == 5 @test consume(a) == 6 @test consume(a) == 7 + + +# Breaking test +function g_break() + t = 0 + while true + t[3] = 1 + produce(t) + t = t + 1 + end +end + +t = Task(g_break) + +@test_throws MethodError consume(t) From d4a901452dcd263b525e73ebb3e031ddf7ba094c Mon Sep 17 00:00:00 2001 From: KDr2 Date: Tue, 26 Feb 2019 12:13:03 +0800 Subject: [PATCH 2/6] set result/exception to the proper task --- README.md | 5 +---- src/Libtask.jl | 2 +- src/taskcopy.jl | 33 ++++++++++++++++++++++++++++++++- test/clonetask.jl | 5 ++--- test/tarray.jl | 2 +- test/tref.jl | 4 ++-- 6 files changed, 39 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 3b0a34d9..2b9fe2f2 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,6 @@ function f_ct() end t = CTask(f_ct) -# or t = Task(f_ct) |> enable_stack_copying consume(t) == 0 consume(t) == 1 @@ -42,8 +41,6 @@ function f_ct2() end t = CTask(f_ct2) -# or t = Task(f_ct2) |> enable_stack_copying - consume(t) == 0 consume(t) == 1 @@ -66,7 +63,7 @@ function f_cta() end end -t = Task(f_cta) |> enable_stack_copying +t = CTask(f_cta) consume(t) == 0 consume(t) == 1 diff --git a/src/Libtask.jl b/src/Libtask.jl index 7bd6caf1..2746eb93 100644 --- a/src/Libtask.jl +++ b/src/Libtask.jl @@ -1,6 +1,6 @@ module Libtask -export enable_stack_copying, CTask, consume, produce, TArray, get, tzeros, tfill, TRef +export CTask, consume, produce, TArray, get, tzeros, tfill, TRef include("../deps/deps.jl"); check_deps(); include("taskcopy.jl") diff --git a/src/taskcopy.jl b/src/taskcopy.jl index a5a2766d..ea4de731 100644 --- a/src/taskcopy.jl +++ b/src/taskcopy.jl @@ -22,7 +22,38 @@ function enable_stack_copying(t::Task) return ccall((:jl_enable_stack_copying, libtask), Any, (Any,), t)::Task end -CTask(func) = Task(func) |> enable_stack_copying +""" + +task_wrapper is a wordaround for set the result/exception to the +correct task which maybe copied/forked from another one(the original +one). Without this, the result/exception is always sent to the +original task. That is done in `JULIA_PROJECT/src/task.c`, the +function `start_task` and `finish_task`. + +This workaround is not the proper way to do the work it does. The +proper way is refreshing the current_task (the variable `t`) in +`start_task` after the call to `jl_apply` returns. + +""" +function task_wrapper(func) + () -> + try + res = func() + ct = current_task() + ct.result = res + ct.state = :done + wait() + catch ex + ct = current_task() + ct.exception = ex + ct.result = ex + ct.state = :failed + ct.backtrace = catch_backtrace() + wait() + end +end + +CTask(func) = Task(task_wrapper(func)) |> enable_stack_copying function Base.copy(t::Task) t.state != :runnable && t.state != :done && diff --git a/test/clonetask.jl b/test/clonetask.jl index deda86ab..e3c967c9 100644 --- a/test/clonetask.jl +++ b/test/clonetask.jl @@ -12,7 +12,7 @@ function f_ct() end end -t = Task(f_ct) |> enable_stack_copying +t = CTask(f_ct) @test consume(t) == 0 @test consume(t) == 1 @@ -55,6 +55,5 @@ function g_break() end end -t = Task(g_break) - +t = CTask(g_break) @test_throws MethodError consume(t) diff --git a/test/tarray.jl b/test/tarray.jl index c8c2f57e..87de9f85 100644 --- a/test/tarray.jl +++ b/test/tarray.jl @@ -13,7 +13,7 @@ function f_cta() end end -t = Task(f_cta) |> enable_stack_copying +t = CTask(f_cta) consume(t); consume(t) a = copy(t); diff --git a/test/tref.jl b/test/tref.jl index 190a4d8c..ced7211b 100644 --- a/test/tref.jl +++ b/test/tref.jl @@ -12,7 +12,7 @@ function f_cta() end end -t = Task(f_cta) |> enable_stack_copying +t = CTask(f_cta) consume(t); consume(t) a = copy(t); @@ -32,7 +32,7 @@ function dict_test() end end -t = Task(dict_test) |> enable_stack_copying +t = CTask(dict_test) consume(t); consume(t) a = copy(t); From 771d2a6af59c617d334a96493b2dd72e7979c2ee Mon Sep 17 00:00:00 2001 From: KDr2 Date: Tue, 26 Feb 2019 12:13:32 +0800 Subject: [PATCH 3/6] more test for broken task --- test/brokentask.jl | 88 ++++++++++++++++++++++++++++++++++++++++++++++ test/runtests.jl | 1 + 2 files changed, 89 insertions(+) create mode 100644 test/brokentask.jl diff --git a/test/brokentask.jl b/test/brokentask.jl new file mode 100644 index 00000000..8546d141 --- /dev/null +++ b/test/brokentask.jl @@ -0,0 +1,88 @@ +using Libtask +using Test + +r = @testset "Broken Functions Tests" begin + + @testset "Error Test" begin + function ftest() + x = 1 + while true + error("error test") + produce(x) + x += 1 + end + end + + t = CTask(ftest) + try + consume(t) + catch ex + @test isa(ex, ErrorException) + end + @test isa(t.exception, ErrorException) + end + + @testset "OutOfBounds Test Before" begin + function ftest() + x = zeros(2) + while true + x[1] = 1 + x[2] = 2 + x[3] = 3 + produce(x[1]) + end + end + + t = CTask(ftest) + try + consume(t) + catch ex + @test isa(ex, BoundsError) + end + @test isa(t.exception, BoundsError) + end + + @testset "OutOfBounds Test After `produce`" begin + function ftest() + x = zeros(2) + while true + x[1] = 1 + x[2] = 2 + produce(x[2]) + x[3] = 3 + end + end + + t = CTask(ftest) + @test consume(t) == 2 + try + consume(t) + catch ex + @test isa(ex, BoundsError) + end + @test isa(t.exception, BoundsError) + end + + @testset "OutOfBounds Test After `copy`" begin + function ftest() + x = zeros(2) + while true + x[1] = 1 + x[2] = 2 + produce(x[2]) + x[3] = 3 + end + end + + t = CTask(ftest) + @test consume(t) == 2 + t_copy = copy(t) + try + consume(t_copy) + catch ex + @test isa(ex, BoundsError) + end + @test isa(t_copy.exception, BoundsError) + end +end +Test.print_test_results(r) diff --git a/test/runtests.jl b/test/runtests.jl index e2eb1ddc..a8c02f5b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,3 +1,4 @@ include("clonetask.jl") +include("brokentask.jl") include("tarray.jl") include("tarray2.jl") From 3e203aa56d056389d412776a2a9a9efa004d8725 Mon Sep 17 00:00:00 2001 From: KDr2 Date: Tue, 26 Feb 2019 14:39:28 +0800 Subject: [PATCH 4/6] set state of task for v1.0 --- src/taskcopy.jl | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/taskcopy.jl b/src/taskcopy.jl index ea4de731..2e3e7e9e 100644 --- a/src/taskcopy.jl +++ b/src/taskcopy.jl @@ -41,13 +41,17 @@ function task_wrapper(func) res = func() ct = current_task() ct.result = res - ct.state = :done + @static if VERSION >= v"1.1.0" + ct.state = :done + end wait() catch ex ct = current_task() ct.exception = ex ct.result = ex - ct.state = :failed + @static if VERSION >= v"1.1.0" + ct.state = :failed + end ct.backtrace = catch_backtrace() wait() end @@ -168,6 +172,9 @@ consume(p::Task, values...) = begin yield() if p.exception != nothing + @static if VERSION < v"1.1.0" + p.state = :failed + end throw(p.exception) end end From dc2bff570b09b244ac82e07088a4ea73610a0920 Mon Sep 17 00:00:00 2001 From: KDr2 Date: Wed, 27 Feb 2019 11:18:04 +0800 Subject: [PATCH 5/6] docs update --- src/taskcopy.jl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/taskcopy.jl b/src/taskcopy.jl index 2e3e7e9e..7897549d 100644 --- a/src/taskcopy.jl +++ b/src/taskcopy.jl @@ -24,14 +24,16 @@ end """ -task_wrapper is a wordaround for set the result/exception to the + task_wrapper() + +`task_wrapper` is a wordaround for set the result/exception to the correct task which maybe copied/forked from another one(the original one). Without this, the result/exception is always sent to the original task. That is done in `JULIA_PROJECT/src/task.c`, the function `start_task` and `finish_task`. This workaround is not the proper way to do the work it does. The -proper way is refreshing the current_task (the variable `t`) in +proper way is refreshing the `current_task` (the variable `t`) in `start_task` after the call to `jl_apply` returns. """ From 57ee7febdb2d16eb07dd3d3319600ebeb8d0c9bb Mon Sep 17 00:00:00 2001 From: KDr2 Date: Thu, 28 Feb 2019 13:11:07 +0800 Subject: [PATCH 6/6] set state of task after it is queued --- src/taskcopy.jl | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/taskcopy.jl b/src/taskcopy.jl index 7897549d..6d296cf3 100644 --- a/src/taskcopy.jl +++ b/src/taskcopy.jl @@ -43,18 +43,16 @@ function task_wrapper(func) res = func() ct = current_task() ct.result = res - @static if VERSION >= v"1.1.0" - ct.state = :done - end + isa(ct.storage, Nothing) && (ct.storage = IdDict()) + ct.storage[:_libtask_state] = :done wait() catch ex ct = current_task() ct.exception = ex ct.result = ex - @static if VERSION >= v"1.1.0" - ct.state = :failed - end ct.backtrace = catch_backtrace() + isa(ct.storage, Nothing) && (ct.storage = IdDict()) + ct.storage[:_libtask_state] = :failed wait() end end @@ -173,10 +171,10 @@ consume(p::Task, values...) = begin Base.schedule(p) yield() + isa(p.storage, IdDict) && haskey(p.storage, :_libtask_state) && + (p.state = p.storage[:_libtask_state]) + if p.exception != nothing - @static if VERSION < v"1.1.0" - p.state = :failed - end throw(p.exception) end end