From 384267feb948759dedf9a83b6e19af5fc9d29128 Mon Sep 17 00:00:00 2001 From: KDr2 Date: Wed, 5 Jan 2022 00:51:51 +0000 Subject: [PATCH 1/4] back port basic refactor from PR#100 --- src/tapedfunction.jl | 37 +++++++++++++++-------- src/tapedtask.jl | 71 +++++++++++++++++++++++++++----------------- 2 files changed, 67 insertions(+), 41 deletions(-) diff --git a/src/tapedfunction.jl b/src/tapedfunction.jl index e2a4fdac..29ffd880 100644 --- a/src/tapedfunction.jl +++ b/src/tapedfunction.jl @@ -1,22 +1,28 @@ -mutable struct Instruction{F} - fun::F - input::Tuple - output - tape -end - +abstract type AbstractInstruction end mutable struct Tape - tape::Vector{Instruction} + tape::Vector{AbstractInstruction} + counter::Int owner end -Tape() = Tape(Vector{Instruction}(), nothing) -Tape(owner) = Tape(Vector{Instruction}(), owner) +mutable struct Instruction{F} <: AbstractInstruction + fun::F + input::Tuple + output + tape::Tape +end + +Tape() = Tape(Vector{AbstractInstruction}(), 1, nothing) +Tape(owner) = Tape(Vector{AbstractInstruction}(), 1, owner) MacroTools.@forward Tape.tape Base.iterate, Base.length MacroTools.@forward Tape.tape Base.push!, Base.getindex, Base.lastindex const NULL_TAPE = Tape() +function setowner!(tape::Tape, owner) + tape.owner = owner +end + mutable struct Box{T} val::T end @@ -24,7 +30,9 @@ end val(x) = x val(x::Box) = x.val box(x) = Box(x) +box(x::Box) = x any_box(x) = Box{Any}(x) +any_box(x::Box) = Box{Any}(x.val) gettape(x) = nothing gettape(x::Instruction) = x.tape @@ -64,10 +72,13 @@ function (instr::Instruction{F})() where F end function run(tape::Tape, args...) - input = map(box, args) - tape[1].input = input + if length(args) > 0 + input = map(box, args) + tape[1].input = input + end for instruction in tape instruction() + tape.counter += 1 end end @@ -188,7 +199,7 @@ function (tf::TapedFunction)(args...) tape = IRTools.evalir(ir, tf.func, args...) tf.ir = ir tf.tape = tape - tape.owner = tf + setowner!(tape, tf) return result(tape) end # TODO: use cache diff --git a/src/tapedtask.jl b/src/tapedtask.jl index a58e3a81..d6feaec7 100644 --- a/src/tapedtask.jl +++ b/src/tapedtask.jl @@ -1,18 +1,18 @@ struct TapedTaskException exc + backtrace end struct TapedTask task::Task tf::TapedFunction - counter::Ref{Int} produce_ch::Channel{Any} consume_ch::Channel{Int} produced_val::Vector{Any} function TapedTask( - t::Task, tf::TapedFunction, counter, pch::Channel{Any}, cch::Channel{Int}) - new(t, tf, counter, pch, cch, Any[]) + t::Task, tf::TapedFunction, pch::Channel{Any}, cch::Channel{Int}) + new(t, tf, pch, cch, Any[]) end end @@ -20,14 +20,14 @@ function TapedTask(tf::TapedFunction, args...) tf.owner != nothing && error("TapedFunction is owned to another task.") # dry_run(tf) isempty(tf.tape) && tf(args...) - counter = Ref{Int}(1) produce_ch = Channel() consume_ch = Channel{Int}() task = @task try - step_in(tf, counter, args) + step_in(tf.tape, args) catch e - put!(produce_ch, TapedTaskException(e)) - # @error "TapedTask Error: " exception=(e, catch_backtrace()) + bt = catch_backtrace() + put!(produce_ch, TapedTaskException(e, bt)) + # @error "TapedTask Error: " exception=(e, bt) rethrow() finally @static if VERSION >= v"1.4" @@ -40,7 +40,7 @@ function TapedTask(tf::TapedFunction, args...) close(produce_ch) close(consume_ch) end - t = TapedTask(task, tf, counter, produce_ch, consume_ch) + t = TapedTask(task, tf, produce_ch, consume_ch) task.storage === nothing && (task.storage = IdDict()) task.storage[:tapedtask] = t tf.owner = t @@ -53,25 +53,34 @@ TapedTask(f, args...) = TapedTask(TapedFunction(f, arity=length(args)), args...) TapedTask(t::TapedTask, args...) = TapedTask(func(t), args...) func(t::TapedTask) = t.tf.func -function step_in(tf::TapedFunction, counter::Ref{Int}, args) - len = length(tf.tape) - if(counter[] <= 1 && length(args) > 0) + +function step_in(t::Tape, args) + len = length(t) + if(t.counter <= 1 && length(args) > 0) input = map(box, args) - tf.tape[1].input = input + t[1].input = input end - while counter[] <= len - tf.tape[counter[]]() + while t.counter <= len + t[t.counter]() # produce and wait after an instruction is done - ttask = tf.owner + ttask = t.owner.owner if length(ttask.produced_val) > 0 val = pop!(ttask.produced_val) put!(ttask.produce_ch, val) take!(ttask.consume_ch) # wait for next consumer end - counter[] += 1 + t.counter += 1 end end +function increase_counter(t::Tape) + t.counter > length(t) && return + # instr = t[t.counter] + # current instrunction must be a produce instruction? + t.counter += 1 +end +next_step(t::TapedTask) = increase_counter(t.tf.tape) + #= # ** Approach (A) to implement `produce`: # Make`produce` a standalone instturction. This approach does NOT @@ -186,18 +195,21 @@ function copy_box(old_box::Box{T}, roster::Dict{UInt64, Any}) where T end copy_box(o, roster::Dict{UInt64, Any}) = o -function Base.copy(t::Tape) +function Base.copy(x::Instruction, on_tape::Tape, roster::Dict{UInt64, Any}) + input = map(x.input) do ob + copy_box(ob, roster) + end + output = copy_box(x.output, roster) + Instruction(x.fun, input, output, on_tape) +end + +function Base.copy(t::Tape, roster::Dict{UInt64, Any}) old_data = t.tape - new_data = Vector{Instruction}() - new_tape = Tape(new_data, t.owner) + new_data = Vector{AbstractInstruction}() + new_tape = Tape(new_data, t.counter, t.owner) - roster = Dict{UInt64, Any}() for x in old_data - input = map(x.input) do ob - copy_box(ob, roster) - end - output = copy_box(x.output, roster) - new_ins = Instruction(x.fun, input, output, new_tape) + new_ins = copy(x, new_tape, roster) push!(new_data, new_ins) end @@ -207,8 +219,9 @@ end function Base.copy(tf::TapedFunction) new_tf = TapedFunction(tf.func; arity=tf.arity) new_tf.ir = tf.ir - new_tape = copy(tf.tape) - new_tape.owner = new_tf + roster = Dict{UInt64, Any}() + new_tape = copy(tf.tape, roster) + setowner!(new_tape, new_tf) new_tf.tape = new_tape return new_tf end @@ -217,6 +230,8 @@ function Base.copy(t::TapedTask) # t.counter[] <= 1 && error("Can't copy a TapedTask which is not running.") tf = copy(t.tf) new_t = TapedTask(tf) - new_t.counter[] = t.counter[] + 1 + new_t.task.storage = copy(t.task.storage) + new_t.task.storage[:tapedtask] = new_t + next_step(new_t) return new_t end From b758fd5b29305f50b6709fd544a82705a86a7697 Mon Sep 17 00:00:00 2001 From: KDr2 Date: Wed, 5 Jan 2022 11:12:40 +0000 Subject: [PATCH 2/4] remove unused code, and some minor changes --- src/tapedfunction.jl | 27 ++------------------------- src/tapedtask.jl | 3 +-- 2 files changed, 3 insertions(+), 27 deletions(-) diff --git a/src/tapedfunction.jl b/src/tapedfunction.jl index 29ffd880..cd98a294 100644 --- a/src/tapedfunction.jl +++ b/src/tapedfunction.jl @@ -1,7 +1,7 @@ abstract type AbstractInstruction end mutable struct Tape - tape::Vector{AbstractInstruction} + tape::Vector{<:AbstractInstruction} counter::Int owner end @@ -31,8 +31,6 @@ val(x) = x val(x::Box) = x.val box(x) = Box(x) box(x::Box) = x -any_box(x) = Box{Any}(x) -any_box(x::Box) = Box{Any}(x.val) gettape(x) = nothing gettape(x::Instruction) = x.tape @@ -88,21 +86,13 @@ function run_and_record!(tape::Tape, f, args...) box(f(map(val, args)...)) catch e @warn e - any_box(nothing) + Box{Any}(nothing) end ins = Instruction(f, args, output, tape) push!(tape, ins) return output end -function dry_record!(tape::Tape, f, args...) - # We don't know the type of box.val now, so we use Box{Any} - output = any_box(nothing) - ins = Instruction(f, args, output, tape) - push!(tape, ins) - return output -end - function unbox_condition(ir) for blk in IRTools.blocks(ir) vars = keys(blk) @@ -207,19 +197,6 @@ function (tf::TapedFunction)(args...) return result(tf.tape) end -function dry_run(tf::TapedFunction) - isempty(tf.tape) || (return tf) - @assert tf.arity >= 0 "TapedFunction need a fixed arity to dry run." - args = fill(nothing, tf.arity) - ir = IRTools.@code_ir tf.func(args...) - ir = intercept(ir; recorder=:dry_record!) - tape = IRTools.evalir(ir, tf.func, args...) - tf.ir = ir - tf.tape = tape - tape.owner = tf - return tf -end - function Base.show(io::IO, tf::TapedFunction) buf = IOBuffer() println(buf, "TapedFunction:") diff --git a/src/tapedtask.jl b/src/tapedtask.jl index d6feaec7..bab15e7c 100644 --- a/src/tapedtask.jl +++ b/src/tapedtask.jl @@ -18,7 +18,6 @@ end function TapedTask(tf::TapedFunction, args...) tf.owner != nothing && error("TapedFunction is owned to another task.") - # dry_run(tf) isempty(tf.tape) && tf(args...) produce_ch = Channel() consume_ch = Channel{Int}() @@ -69,7 +68,7 @@ function step_in(t::Tape, args) put!(ttask.produce_ch, val) take!(ttask.consume_ch) # wait for next consumer end - t.counter += 1 + increase_counter(t) end end From b96eaf74b60b9d8c70e78a97200f8ddb8bc9c9ba Mon Sep 17 00:00:00 2001 From: KDr2 Date: Wed, 5 Jan 2022 12:25:25 +0000 Subject: [PATCH 3/4] move increase_counter --- src/tapedfunction.jl | 8 +++++++- src/tapedtask.jl | 6 ------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/tapedfunction.jl b/src/tapedfunction.jl index cd98a294..d190d2be 100644 --- a/src/tapedfunction.jl +++ b/src/tapedfunction.jl @@ -69,6 +69,12 @@ function (instr::Instruction{F})() where F instr.output.val = output end +function increase_counter(t::Tape) + t.counter > length(t) && return + # instr = t[t.counter] + t.counter += 1 +end + function run(tape::Tape, args...) if length(args) > 0 input = map(box, args) @@ -76,7 +82,7 @@ function run(tape::Tape, args...) end for instruction in tape instruction() - tape.counter += 1 + increase_counter(tape) end end diff --git a/src/tapedtask.jl b/src/tapedtask.jl index bab15e7c..37c397c7 100644 --- a/src/tapedtask.jl +++ b/src/tapedtask.jl @@ -72,12 +72,6 @@ function step_in(t::Tape, args) end end -function increase_counter(t::Tape) - t.counter > length(t) && return - # instr = t[t.counter] - # current instrunction must be a produce instruction? - t.counter += 1 -end next_step(t::TapedTask) = increase_counter(t.tf.tape) #= From 86c900b12bf015e7be3566b9c3609b23bd64b626 Mon Sep 17 00:00:00 2001 From: KDr2 Date: Wed, 5 Jan 2022 19:57:22 +0000 Subject: [PATCH 4/4] minor update --- src/tapedfunction.jl | 6 ++++-- src/tapedtask.jl | 11 +++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/tapedfunction.jl b/src/tapedfunction.jl index d190d2be..083cfccc 100644 --- a/src/tapedfunction.jl +++ b/src/tapedfunction.jl @@ -21,6 +21,7 @@ const NULL_TAPE = Tape() function setowner!(tape::Tape, owner) tape.owner = owner + return tape end mutable struct Box{T} @@ -69,10 +70,11 @@ function (instr::Instruction{F})() where F instr.output.val = output end -function increase_counter(t::Tape) +function increase_counter!(t::Tape) t.counter > length(t) && return # instr = t[t.counter] t.counter += 1 + return t end function run(tape::Tape, args...) @@ -82,7 +84,7 @@ function run(tape::Tape, args...) end for instruction in tape instruction() - increase_counter(tape) + increase_counter!(tape) end end diff --git a/src/tapedtask.jl b/src/tapedtask.jl index 37c397c7..e6da976d 100644 --- a/src/tapedtask.jl +++ b/src/tapedtask.jl @@ -1,5 +1,5 @@ struct TapedTaskException - exc + exc::Exception backtrace end @@ -68,11 +68,14 @@ function step_in(t::Tape, args) put!(ttask.produce_ch, val) take!(ttask.consume_ch) # wait for next consumer end - increase_counter(t) + increase_counter!(t) end end -next_step(t::TapedTask) = increase_counter(t.tf.tape) +function next_step!(t::TapedTask) + increase_counter!(t.tf.tape) + return t +end #= # ** Approach (A) to implement `produce`: @@ -225,6 +228,6 @@ function Base.copy(t::TapedTask) new_t = TapedTask(tf) new_t.task.storage = copy(t.task.storage) new_t.task.storage[:tapedtask] = new_t - next_step(new_t) + next_step!(new_t) return new_t end