Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 14 additions & 30 deletions src/ctask.jl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ end

function Base.showerror(io::IO, ex::CTaskException)
println(io, "CTaskException:")
showerror(io, getproperty(ex.task, :exception), getproperty(ex.task, :backtrace))
bt = @static if VERSION < v"1.6.0-DEV.1145"
ct.backtrace
else
ct.storage[:_libtask_bt]
end
showerror(io, ex.task.exception, bt)
end

# Utility function for self-copying mechanism
Expand All @@ -37,8 +42,7 @@ function n_copies(t::Task)
end

function enable_stack_copying(t::Task)
state = getproperty(t, :state)
if state !== :runnable && state !== :done
if istaskfailed(t)
error("only runnable or finished tasks' stack can be copied.")
end
return ccall((:jl_enable_stack_copying, libtask_julia), Any, (Any,), t)::Task
Expand Down Expand Up @@ -72,11 +76,12 @@ function task_wrapper(func)
ct = _current_task()
@static if VERSION < v"1.6.0-DEV.1145"
ct.exception = ex
ct.backtrace = catch_backtrace()
else
ct._isexception = true
ct.storage[:_libtask_bt] = catch_backtrace()
end
ct.result = ex
ct.backtrace = catch_backtrace()
ct.storage === nothing && (ct.storage = IdDict())
ct.storage[:_libtask_state] = :failed
wait()
Expand All @@ -88,8 +93,7 @@ end

function Base.copy(ctask::CTask)
task = ctask.task
state = getproperty(task, :state)
if state !== :runnable && state !== :done
if istaskfailed(task)
error("only runnable or finished tasks can be copied.")
end

Expand All @@ -102,13 +106,6 @@ function Base.copy(ctask::CTask)
newtask.code = task.code
setstate!(newtask, getstate(task))
newtask.result = task.result
@static if VERSION < v"1.1"
newtask.parent = task.parent
end

if isdefined(task, :last)
newtask.last = nothing
end

return CTask(newtask)
end
Expand Down Expand Up @@ -139,13 +136,9 @@ function produce(v)
end

# Internal check to make sure that it is possible to switch to the consumer.
@assert getproperty(task, :state) in (:runnable, :queued)
@assert !istaskdone(task) && !istaskfailed(task)

@static if VERSION < v"1.1.9999"
task.state === :queued && yield()
else
task.queue !== nothing && yield()
end
task.queue !== nothing && yield()

if empty
# Switch to the consumer.
Expand Down Expand Up @@ -203,7 +196,7 @@ function consume(ctask::CTask, values...)
push!(producer.storage[:consumers].waitq, ct)
end

if getproperty(producer, :state) === :runnable
if !istaskdone(producer) && !istaskfailed(producer)
# Switch to the producer.
schedule(producer)
yield()
Expand All @@ -214,7 +207,7 @@ function consume(ctask::CTask, values...)
end

# If the task failed, throw an exception.
_istaskfailed(producer) && throw(CTaskException(producer))
istaskfailed(producer) && throw(CTaskException(producer))

# If the task is done return the result.
istaskdone(producer) && return producer.result
Expand All @@ -223,14 +216,6 @@ function consume(ctask::CTask, values...)
wait()
end

function _istaskfailed(task::Task)
@static if VERSION < v"1.3"
return task.state === :failed
else
return Base.istaskfailed(task)
end
end

function getstate(task::Task)
@static if VERSION < v"1.6.0-DEV.618"
return task.state
Expand All @@ -254,4 +239,3 @@ function setstate!(task::Task, state)
end
end
end