11struct TapedTaskException
22 exc
3+ backtrace
34end
45
56struct TapedTask
67 task:: Task
78 tf:: TapedFunction
8- counter:: Ref{Int}
99 produce_ch:: Channel{Any}
1010 consume_ch:: Channel{Int}
1111 produced_val:: Vector{Any}
1212
1313 function TapedTask (
14- t:: Task , tf:: TapedFunction , counter, pch:: Channel{Any} , cch:: Channel{Int} )
15- new (t, tf, counter, pch, cch, Any[])
14+ t:: Task , tf:: TapedFunction , pch:: Channel{Any} , cch:: Channel{Int} )
15+ new (t, tf, pch, cch, Any[])
1616 end
1717end
1818
1919function TapedTask (tf:: TapedFunction , args... )
2020 tf. owner != nothing && error (" TapedFunction is owned to another task." )
2121 # dry_run(tf)
2222 isempty (tf. tape) && tf (args... )
23- counter = Ref {Int} (1 )
2423 produce_ch = Channel ()
2524 consume_ch = Channel {Int} ()
2625 task = @task try
27- step_in (tf, counter , args)
26+ step_in (tf. tape , args)
2827 catch e
29- put! (produce_ch, TapedTaskException (e))
30- # @error "TapedTask Error: " exception=(e, catch_backtrace())
28+ bt = catch_backtrace ()
29+ put! (produce_ch, TapedTaskException (e, bt))
30+ # @error "TapedTask Error: " exception=(e, bt)
3131 rethrow ()
3232 finally
3333 @static if VERSION >= v " 1.4"
@@ -40,7 +40,7 @@ function TapedTask(tf::TapedFunction, args...)
4040 close (produce_ch)
4141 close (consume_ch)
4242 end
43- t = TapedTask (task, tf, counter, produce_ch, consume_ch)
43+ t = TapedTask (task, tf, produce_ch, consume_ch)
4444 task. storage === nothing && (task. storage = IdDict ())
4545 task. storage[:tapedtask ] = t
4646 tf. owner = t
@@ -53,25 +53,42 @@ TapedTask(f, args...) = TapedTask(TapedFunction(f, arity=length(args)), args...)
5353TapedTask (t:: TapedTask , args... ) = TapedTask (func (t), args... )
5454func (t:: TapedTask ) = t. tf. func
5555
56- function step_in (tf :: TapedFunction , counter :: Ref{Int} , args)
57- len = length (tf . tape )
58- if (counter[] <= 1 && length (args) > 0 )
56+ function step_in (t :: Tape , args)
57+ len = length (t )
58+ if (t . counter <= 1 && length (args) > 0 )
5959 input = map (box, args)
60- tf . tape [1 ]. input = input
60+ t [1 ]. input = input
6161 end
62- while counter[] <= len
63- tf. tape[counter[]]()
62+ while t. counter <= len
63+ ins = t[t. counter]
64+ if isa (ins, TapeInstruction)
65+ step_in (ins. subtape, ())
66+ else
67+ ins ()
68+ end
6469 # produce and wait after an instruction is done
65- ttask = tf . owner
70+ ttask = t . owner . owner
6671 if length (ttask. produced_val) > 0
6772 val = pop! (ttask. produced_val)
6873 put! (ttask. produce_ch, val)
6974 take! (ttask. consume_ch) # wait for next consumer
7075 end
71- counter[] += 1
76+ t . counter += 1
7277 end
7378end
7479
80+ function increase_counter (t:: Tape )
81+ t. counter > length (t) && return
82+ instr = t[t. counter]
83+ if isa (instr, TapeInstruction)
84+ increase_counter (instr. subtape)
85+ else
86+ # must be a produce instruction?
87+ t. counter += 1
88+ end
89+ end
90+ next_step (t:: TapedTask ) = increase_counter (t. tf. tape)
91+
7592#=
7693# ** Approach (A) to implement `produce`:
7794# Make`produce` a standalone instturction. This approach does NOT
@@ -94,14 +111,15 @@ function (instr::Instruction{typeof(produce)})()
94111end
95112=#
96113
97-
98114# ** Approach (B) to implement `produce`:
99115# This way has its caveat:
100116# `produce` may deeply hide in an instruction, but not be an instruction
101117# itself, and when we copy a task, the newly copied task will resume from
102118# the instruction after the one which contains this `produce` call. If the
103119# call to `produce` is not the last expression in the instuction, that
104120# instruction will not be whole executed in the copied task.
121+ # With the abilty to trace into nested function call, we can minimize the
122+ # limitation of this caveat.
105123@inline function is_in_tapedtask ()
106124 ct = current_task ()
107125 ct. storage === nothing && return false
114132function produce (val)
115133 is_in_tapedtask () || return nothing
116134 ttask = current_task (). storage[:tapedtask ]
135+ # put!(ttask.produce_ch, val)
136+ # take!(ttask.consume_ch) # wait for next consumer
117137 length (ttask. produced_val) > 1 &&
118138 error (" There is a produced value which is not consumed." )
119139 push! (ttask. produced_val, val)
@@ -186,18 +206,26 @@ function copy_box(old_box::Box{T}, roster::Dict{UInt64, Any}) where T
186206end
187207copy_box (o, roster:: Dict{UInt64, Any} ) = o
188208
189- function Base. copy (t:: Tape )
209+ function Base. copy (x:: Instruction , on_tape:: Tape , roster:: Dict{UInt64, Any} )
210+ input = map (x. input) do ob
211+ copy_box (ob, roster)
212+ end
213+ output = copy_box (x. output, roster)
214+ Instruction (x. fun, input, output, on_tape)
215+ end
216+
217+ function Base. copy (x:: TapeInstruction , on_tape:: Tape , roster:: Dict{UInt64, Any} )
218+ subtape = copy (x. subtape, roster)
219+ TapeInstruction (subtape, on_tape)
220+ end
221+
222+ function Base. copy (t:: Tape , roster:: Dict{UInt64, Any} )
190223 old_data = t. tape
191- new_data = Vector {Instruction } ()
192- new_tape = Tape (new_data, t. owner)
224+ new_data = Vector {AbstractInstruction } ()
225+ new_tape = Tape (new_data, t. counter, t . owner)
193226
194- roster = Dict {UInt64, Any} ()
195227 for x in old_data
196- input = map (x. input) do ob
197- copy_box (ob, roster)
198- end
199- output = copy_box (x. output, roster)
200- new_ins = Instruction (x. fun, input, output, new_tape)
228+ new_ins = copy (x, new_tape, roster)
201229 push! (new_data, new_ins)
202230 end
203231
@@ -207,16 +235,18 @@ end
207235function Base. copy (tf:: TapedFunction )
208236 new_tf = TapedFunction (tf. func; arity= tf. arity)
209237 new_tf. ir = tf. ir
210- new_tape = copy (tf. tape)
211- new_tape. owner = new_tf
238+ roster = Dict {UInt64, Any} ()
239+ new_tape = copy (tf. tape, roster)
240+ setowner! (new_tape, new_tf)
212241 new_tf. tape = new_tape
213242 return new_tf
214243end
215244
216245function Base. copy (t:: TapedTask )
217- # t.counter[] <= 1 && error("Can't copy a TapedTask which is not running.")
218246 tf = copy (t. tf)
219247 new_t = TapedTask (tf)
220- new_t. counter[] = t. counter[] + 1
248+ new_t. task. storage = copy (t. task. storage)
249+ new_t. task. storage[:tapedtask ] = new_t
250+ next_step (new_t)
221251 return new_t
222252end
0 commit comments