
Conversation

@KDr2
Member

@KDr2 KDr2 commented Nov 24, 2021

I copied the "taped" implementation from https://github.com/KDr2/Taped.jl and updated TArray and TRef. All test cases pass now, but some changes were needed to make them pass:

  1. Infinite loops in tasks were changed to finite loops (while true -> for _ in RANG)
  2. Exceptions are now handled in a different (and maybe better) way
  3. Task copying ONLY occurs at the time of calling produce, i.e., when the task is waiting on an unbuffered channel. This causes a difference in the value we get the first time we call consume after a task is copied.

The details of item 3:

    function f()
        t = [0 1 2]
        for _ in 1:10
            produce(t[1])
            t[1] = 1 + t[1]
        end
    end

    ctask = CTask(f)
    @test consume(ctask) == 0
    @test consume(ctask) == 1
    a = copy(ctask)  # --> 1.
    @test consume(a) == 2 # --> 2.
    @test consume(a) == 3
    # @test consume(ctask) == 4 # we get 4 in the old version
    @test consume(ctask) == 2 # --> 3.
    @test consume(ctask) == 5
    @test consume(ctask) == 6
    @test consume(ctask) == 7
On line --> 1., the task calls produce with t[1], which has already been computed and has the value 2; at this point the task is scheduled off, and then the copy occurs. Then:

  • when we consume the original task, we get 2 (line --> 3)
  • when we consume the new task, the produce instruction is replayed with its cached input, which is 2, so we also get 2 (line --> 2)

This only happens with shared data.

UPDATE: The 3rd issue has been fixed.

@KDr2
Member Author

KDr2 commented Nov 24, 2021

To solve the 3rd issue, maybe we can find the instruction which produces the input (argument) of produce and replay the tape from that instruction. Should we do that?

UPDATE: I tried it and found that it's not that simple.

@yebai yebai mentioned this pull request Nov 24, 2021
@yebai
Member

yebai commented Nov 24, 2021

    # @test consume(ctask) == 4 # we get 4 in the old version
    @test consume(ctask) == 2 # --> 3.

I'm confused. Here we are interpreting the instructions on the tape for both ctask and the copied task a. When line 3 runs, won't task ctask call t[1] (again) to get the current value of t?

@KDr2
Member Author

KDr2 commented Nov 24, 2021

The IR of produce(t[1]) looks like:

a = getindex(t, 1)
produce(a)

Thus, the tape is:

....
idx=5: Instruction(getindex, ...)
idx=6: Instruction(produce, instruction_5.output)
... 

We copy the task while idx=6 is executing: the data has been put into a channel and is waiting to be consumed.

In the copied task, we copy the tape and replay it from idx=6, where the data to be produced is already there (generated by idx=5).

This only affects SHARED DATA, which doesn't concern us much and doesn't seem to have clear semantics in the old Libtask either.
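The replay behaviour above can be modeled with a toy tape (a Python sketch for illustration only; `Instruction` and `replay_from` here are made-up stand-ins, not Libtask types):

```python
# Toy tape model: each instruction caches its output, and replaying from
# the middle of the tape reuses outputs cached by earlier instructions.

class Instruction:
    def __init__(self, fn, get_args):
        self.fn = fn              # the operation this instruction performs
        self.get_args = get_args  # thunk returning the current argument tuple
        self.output = None        # cached result of the last execution

    def run(self):
        self.output = self.fn(*self.get_args())
        return self.output

def replay_from(tape, idx):
    """Re-run the tape from `idx`; earlier instructions keep cached outputs."""
    return [instr.run() for instr in tape[idx:]]

t = [2]                                                  # shared data at copy time
i5 = Instruction(lambda arr, i: arr[i], lambda: (t, 0))  # idx=5: getindex(t, 1)
i6 = Instruction(lambda x: x, lambda: (i5.output,))      # idx=6: produce(i5.output)
tape = [i5, i6]

i5.run()     # the original task already executed getindex, caching 2
t[0] += 1    # the original task later advances the shared data to 3

# Replaying the copy from idx=6 reuses the cached output of idx=5:
replayed = replay_from(tape, 1)
print(replayed)  # [2] -- the stale cached value, not the current t[0] == 3
```

The key point is that replaying from idx=6 never re-executes idx=5, so the copy sees whatever getindex cached before the copy happened.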

@yebai
Member

yebai commented Nov 24, 2021

@KDr2 I think the reason is the difference between the new Channel mechanism and our previous consume/produce. Our previous consume/produce was a bit lazy, i.e., a task only produced when needed, while the Channel mechanism is eager, i.e., it always produces when the buffer is empty. This means the Channel mechanism always pre-produces a result and puts it in the channel buffer. In the example above, the original task pre-computes the produced value based on the current t. That is fine if t is not modified by other tasks; otherwise, the value pre-produced by the original task will be based on an out-of-date copy of t. This can be fixed by introducing a queuing mechanism in the produce function:

function (instr::Instruction{typeof(produce)})()
    args = val(instr.input[1])
    tape = gettape(instr)
    tf = tape.owner
    ttask = tf.owner
    ch = ttask.channel
    put!(ch, args)
    # check whether there is a waiting consumer in the queue, if yes, continue; 
    #  if not, put the task to sleep until the queue becomes non-empty.
    #  This would delay the `produce` action until there is a demand. 
end

This queuing mechanism is similar to the one we had in the previous implementation and makes tasks produce on demand instead of producing eagerly.
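The intended produce-on-demand behaviour can be sketched with a demand queue (Python threads used here as a stand-in for Julia tasks; every name is illustrative, not Libtask API):

```python
import queue
import threading

demand = queue.Queue()   # consumers put a token here to request a value
results = queue.Queue()  # the producer puts computed values here

def producer(t):
    # Block until a consumer signals demand, so the produced value is
    # computed only when someone is actually waiting for it.
    for _ in range(3):
        demand.get()       # sleep until there is a waiting consumer
        results.put(t[0])  # only now is the value read
        t[0] += 1

def consume():
    demand.put(None)       # signal demand first ...
    return results.get()   # ... then wait for the produced value

t = [0]
threading.Thread(target=producer, args=(t,), daemon=True).start()
vals = [consume(), consume(), consume()]
print(vals)  # [0, 1, 2]
```

The demand queue turns the eager channel into a lazy one: no value is computed before a consumer has asked for it.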

@KDr2
Member Author

KDr2 commented Nov 24, 2021

You are right, but I'm afraid the queue mechanism might not help here:

function (instr::Instruction{typeof(produce)})()
    args = val(instr.input[1])
    tape = gettape(instr)
    tf = tape.owner
    ttask = tf.owner
    ch = ttask.channel
    put!(ch, args)
    # check whether there is a waiting consumer in the queue, if yes, continue; 
    #  if not, put the task to sleep until the queue becomes non-empty.
    #  This would delay the `produce` action until there is a demand. 
end

When execution reaches the point where the comment is placed, the data to be produced has already been determined by the previous instruction; whether we wait for a consumer or not, that value will not change.

A correct mechanism would be: check whether there is a consumer, and if not, wait at the instruction which computes the data instead of at the produce instruction. This may be doable if the data is determined solely by the instruction whose output it is.

@KDr2
Member Author

KDr2 commented Nov 24, 2021

Take produce(t[1]) as an example: we should wait on Instruction(getindex, t, 1), not on Instruction(produce, ...).
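In tape terms, that means moving the wait one instruction earlier, so the value is read only after a consumer shows up (again a toy Python model, not Libtask code):

```python
import queue
import threading

demand = queue.Queue()

def run_tape(t, out):
    # A two-instruction tape: the demand check sits *before* the computing
    # instruction, so the produced value reflects the latest shared state.
    for _ in range(2):
        demand.get()    # wait here, at Instruction(getindex, ...), not at produce
        value = t[0]    # the computing instruction
        out.put(value)  # Instruction(produce, ...)

t, out = [0], queue.Queue()
threading.Thread(target=run_tape, args=(t, out), daemon=True).start()

demand.put(None); first = out.get()   # computed after the demand -> 0
t[0] = 10                             # shared data mutated between consumes
demand.put(None); second = out.get()  # computed after the mutation -> 10
print(first, second)  # 0 10
```

Because the task sleeps before reading t, a mutation between two consumes is visible to the next produced value, which is exactly what waiting at produce cannot guarantee.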

@yebai
Member

yebai commented Nov 24, 2021

I think the queue mechanism can work if we ignore the first produce call. In that case, a simple fix is to insert a void produce call and bypass its result by default.

EDIT: We currently only allow copying tasks after at least one produce call. This means the shared-data issue won't matter, since there won't be multiple tasks mutating (shared) data before the first produce call. So the queue system should work fine.

@KDr2
Member Author

KDr2 commented Nov 25, 2021

The purpose of inserting a dummy produce is to delay the computation of the data. But it won't serve this purpose well if we insert it immediately before the real produce instruction:

data = t[1]
produce(void) # we insert it here, and data was already computed at this point
produce(data)

So if we want to delay the computation, we should insert the dummy produce before the computing instruction, maybe like this:

produce(void)
data = t[1]
produce(data)

But, on the other hand, it is NOT easy to determine which instruction(s) affect the computation of the data, for example:

d0 = t[1]
d1 = d0 + 1
produce(d1)

In this tape, we should put the dummy produce before d0, not before d1.
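Determining which instructions feed the produced value amounts to a backward slice over the tape. A sketch, under the assumption that each instruction records its input and output slots (a hypothetical representation, not Libtask's):

```python
# Backward slice: starting from produce's input, walk the tape in reverse
# and collect every instruction whose output (transitively) feeds it.

def backward_slice(tape, target):
    """tape: list of (name, inputs, output); target: the slot fed to produce."""
    needed = {target}
    slice_ = []
    for name, inputs, output in reversed(tape):
        if output in needed:
            slice_.append(name)
            needed |= set(inputs)   # its inputs must also be computed fresh
    return list(reversed(slice_))

tape = [
    ("getindex", ["t", "1"], "d0"),   # d0 = t[1]
    ("add",      ["d0"],     "d1"),   # d1 = d0 + 1
]
print(backward_slice(tape, "d1"))  # ['getindex', 'add'] -> wait before getindex
```

The slice says the dummy produce (or the wait) belongs before the first instruction in the slice, here getindex; with aliasing or control flow the analysis gets much harder, which matches the "not that simple" finding above.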


More precisely, the copy can only occur during a call to produce, that is, after we have entered the function but before we have returned from it. In the new task, we re-enter produce (the instruction) with the same data it had when copy was called.

If the data is of a primitive type or a type that implements deepcopy (like TArray and TRef), everything works without confusion.

If the data is shared between the tasks, we should make the semantics of copy clear: in produce(expression), does the copy occur before expression is evaluated, or after? To me, copying after evaluation is clearer, because if we copy before it, it is hard to determine how long before:

d1 = t[1]
d2 = ...
d3 = f(d1, d2)
produce(d3)
d4 = t[1] += 1 # some changes which affect the value of d1 in the next loop
...

In the old version we didn't clarify this either, I think, but it worked intuitively thanks to the stack copy.

@KDr2
Member Author

KDr2 commented Nov 30, 2021

Issue 3 fixed. Thanks to @yebai.

@yebai
Member

yebai commented Nov 30, 2021

Great, thanks @KDr2 - let's now move forward to fix the integration tests with Turing and AdvancedPS.

@devmotion
Member

Isn't it a bit surprising and potentially misleading if Libtask.jl is no longer based on Libtask_jll but on a tape-based approach? Wouldn't it be cleaner to register a new package?

I assume the next release would be breaking (that seems safer even if the API is unchanged), so downstream packages would require manual updates anyway.

@yebai
Member

yebai commented Dec 1, 2021

It is significantly more challenging to port Libtask_jll to Julia 1.7 (see #92) than the previous upgrades were. That motivated the tape-based approach to task copying. I agree that the code in this PR could live somewhere else (e.g. AdvancedPS, AbstractPPL), but for the time being, keeping it inside Libtask keeps the disruption minimal - most of the APIs remain the same. That said, we can still make a breaking release of Libtask after this PR is merged, since the underlying mechanism has changed.

In the longer run, we might consider deprecating Libtask since it is a bit hacky and poses a high maintenance burden.

@devmotion
Member

I think it is great if we don't have to deal with the Libtask_jll issues and new Julia versions don't break Turing anymore. I'm not against this approach at all. My main question is just whether it is reasonable to merge it into Libtask.jl, given that it is no longer based on Libtask_jll and takes such a fundamentally different approach.

I think it should not matter to downstream packages whether the tape approach is available in a breaking Libtask release or in a new package. Different packages might even motivate a common API and a more modular design in the future, where packages and/or users can choose their preferred backend.

@FredericWantiez
Member

Had a quick look at the integration tests for AdvancedPS. We're using infinite loops, which I think are not supported by this.

Tests don't hang anymore with finite loops, but they fail with:

InvalidStateException("Channel is closed.", :closed)

This seems to happen whenever we're using anonymous functions:

function outer()
    for i in 1:10
        produce(10)
    end
end

ctask = CTask(outer)
consume(ctask) # Fine, gives 10

ctask2 = CTask(() -> outer()) # Similar to the way AdvancedPS.Trace works
consume(ctask2) # Fails on Channel error

@KDr2
Member Author

KDr2 commented Dec 4, 2021

That's because of the limitations of the new approach:

  • produce should only be called directly in the function used to construct the CTask; calls to produce in nested calls can't be traced.
  • we can't use task.code as a function to construct another CTask, because what task.code does now is replay the tape of the taped task. I think we should use CTask.func instead.

@yebai
Member

yebai commented Dec 5, 2021

@KDr2 There are valid cases where we would like to construct a CTask from a function with non-empty arguments. This is currently impossible without creating an anonymous function. Since we assume a TapedTask only tracks top-level function calls, this causes the issue mentioned by @FredericWantiez above. To address it, maybe we can add a new CTask constructor:

CTask(outer::Function, args::Tuple, ...)  # call `outer(args...)` internally

This would avoid the need to create anonymous functions for tasks.
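Such a constructor would just capture the function together with its arguments, so no wrapper closure is needed (an illustrative Python sketch of the idea; `CTaskSketch` and `start` are made-up names, not the actual API):

```python
class CTaskSketch:
    """Toy stand-in for a CTask(f, args...) constructor: it stores the
    function and its arguments and calls f(args...) internally."""

    def __init__(self, func, *args):
        self.func = func  # kept around so a copy can rebuild the task
        self.args = args

    def start(self):
        return self.func(*self.args)

def outer(seed):
    return [seed] * 3  # placeholder for a task body that uses its argument

task = CTaskSketch(outer, 7)
print(task.start())  # [7, 7, 7]
```

Storing func and args separately also means a copied task can restart from the original function rather than from an opaque closure.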

@KDr2
Member Author

KDr2 commented Dec 5, 2021

But I saw some cases like this:

a_task.code() # it must have calls to produce 
produce(something)

in a function which is used to create a CTask.

So maybe we should trace one more level down?

@yebai
Member

yebai commented Dec 5, 2021

So maybe we should trace one more level down?
a_task.code() # it must have calls to produce
produce(something)

The produce statement here provides a mechanism to inform the parent task that the producer task has terminated (see here). If we can provide a mechanism to check whether a CTask has terminated, then we can replace the anonymous function with a standard CTask constructor.
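One way to expose such a termination check is a sentinel on the channel instead of the trailing produce(nothing) convention (a Python model using threads and a queue; all names here are invented for illustration):

```python
import queue
import threading

_DONE = object()  # sentinel marking producer termination

class ProducerTask:
    """Producer/consumer pair with an explicit termination check, instead
    of relying on a trailing produce(nothing) convention."""

    def __init__(self, func):
        self.ch = queue.Queue(maxsize=1)  # stand-in for an unbuffered channel
        self.done = False

        def runner():
            func(self.ch.put)   # `func` calls produce(value) via the channel
            self.ch.put(_DONE)  # termination marker once func returns

        threading.Thread(target=runner, daemon=True).start()

    def consume(self):
        val = self.ch.get()
        if val is _DONE:
            self.done = True
            raise StopIteration("task has terminated")
        return val

def f(produce):
    for i in range(2):
        produce(i)

t = ProducerTask(f)
vals = [t.consume(), t.consume()]
try:
    t.consume()
except StopIteration:
    pass
print(vals, t.done)  # [0, 1] True
```

With an explicit done flag, callers no longer need to agree on a magic produced value to detect termination.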

Member

@phipsgabler phipsgabler left a comment


I like this, it's an elegant tape implementation.

I do, however, share @devmotion's concern that it is a bit weird to completely replace the underlying library without changing the name. Why not make a separate package and archive Libtask.jl?

@KDr2
Member Author

KDr2 commented Dec 7, 2021

provide a mechanism to check whether a CTask has terminated

The Channel Closed Exception is a good indicator, I think?

@KDr2
Member Author

KDr2 commented Dec 7, 2021

I updated container.jl in AdvancedPS.jl:

--- a/src/container.jl
+++ b/src/container.jl
@@ -7,13 +7,7 @@ end
 const Particle = Trace

 function Trace(f, rng::TracedRNG)
-    ctask = let f = f
-        Libtask.CTask() do
-            res = f(rng)
-            Libtask.produce(nothing)
-            return res
-        end
-    end
+    ctask = Libtask.CTask(f, rng)

     # add backward reference
     newtrace = Trace(f, ctask, rng)
@@ -62,13 +56,8 @@ function forkr(trace::Trace)
     newf = reset_model(trace.f)
     Random123.set_counter!(trace.rng, 1)

-    ctask = let f = trace.ctask.task.code
-        Libtask.CTask() do
-            res = f()(trace.rng)
-            Libtask.produce(nothing)
-            return res
-        end
-    end
+    func = trace.ctask.tf.func
+    ctask = Libtask.CTask(func, trace.rng)

There are still some test failures.

@devmotion
Member

Another problem with updating AdvancedPS is that Turing does not support (and I assume is not compatible with) the many unreleased changes in its master branch.

@KDr2
Member Author

KDr2 commented Dec 7, 2021

Most AdvancedPS test cases pass now:

https://github.com/TuringLang/AdvancedPS.jl/pull/35/files

@KDr2
Member Author

KDr2 commented Dec 8, 2021

I just found a way to support produce in nested function calls:
59c7d20

So we don't need to specialize AdvancedPS.observe on the tape now:
TuringLang/AdvancedPS.jl@d16415c

@yebai
Member

yebai commented Dec 9, 2021

I just found a way to support produce in nested function calls: 59c7d20

@KDr2 The new produce implementation is a bit hacky, I think. I liked the previous implementation better because its semantics are clear and easy to modify via method overloading. However, I think we can use the new trick to implement an error-handling mechanism - i.e., detect produce calls in nested function calls and throw an exception. This would warn the user, who could then implement an overloaded function to fix the error.

@KDr2
Member Author

KDr2 commented Dec 9, 2021

detect produce calls in nested function calls and throw an exception

That's brilliant! 😄

@yebai yebai merged commit af47df9 into master Dec 12, 2021
@delete-merged-branch delete-merged-branch bot deleted the taped branch December 12, 2021 19:56