Skip to content

Commit ccc293c

Browse files
KDr2yebaidevmotion
authored
Performance and Benchmarks (#104)
* temporarily add some pkgs to do testing * simple benchmarks * use ir and tape cache * use LRUCache instead of Dict * partially copy tape * fix a TArray bug * add Project.toml for perf dir * minor update * Update src/tapedtask.jl Co-authored-by: David Widmann <[email protected]> * remove redundant module * Catch and print error while re-running a (cached) tape. * put `new` onto tape * copy NewInstruction * update docs/comments * give a warning when find an unknown ir code * refactor new instruction, add test cases * new CI job * Update Project.toml Co-authored-by: Hong Ge <[email protected]> Co-authored-by: David Widmann <[email protected]>
1 parent d27401a commit ccc293c

File tree

13 files changed

+361
-58
lines changed

13 files changed

+361
-58
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
name: Benchmarks and MicroIntegration
2+
3+
on:
4+
push:
5+
branches:
6+
- master
7+
pull_request:
8+
9+
jobs:
10+
test:
11+
name: Benchmarks and MicroIntegration
12+
runs-on: ubuntu-latest
13+
strategy:
14+
fail-fast: false
15+
steps:
16+
- uses: actions/checkout@v2
17+
- uses: julia-actions/setup-julia@v1
18+
with:
19+
version: 1
20+
arch: x64
21+
- uses: julia-actions/julia-buildpkg@latest
22+
- name: setup enviroment
23+
shell: julia --color=yes --project=perf {0}
24+
run: |
25+
using Pkg
26+
try
27+
# force it to use this PR's version of the package
28+
pkg"add Turing#hg/new-libtask2" # TODO: remove this when Turing is updated
29+
Pkg.develop(PackageSpec(path=".")) # resolver may fail with main deps
30+
Pkg.update()
31+
catch err
32+
err isa Pkg.Resolve.ResolverError || rethrow()
33+
# If we can't resolve that means this is incompatible by SemVer and this is fine
34+
# It means we marked this as a breaking change, so we don't need to worry about
35+
# Mistakenly introducing a breaking change, as we have intentionally made one
36+
@info "Not compatible with this release. No problem." exception=err
37+
exit(0) # Exit immediately, as a success
38+
end
39+
- name: run
40+
run: julia --color=yes --project=perf perf/runtests.jl

Project.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ uuid = "6f1fad26-d15e-5dc8-ae53-837a1d7b8c9f"
33
license = "MIT"
44
desc = "Tape based task copying in Turing"
55
repo = "https://github.com/TuringLang/Libtask.jl.git"
6-
version = "0.6.2"
6+
version = "0.6.3"
77

88
[deps]
99
IRTools = "7869d1d1-7146-5819-86e3-90919afe41df"
10+
LRUCache = "8ac3fa9e-de4c-5943-b1dc-09c6b5f20637"
1011
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
1112
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
1213
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"

perf/Project.toml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
[deps]
2+
AbstractMCMC = "80f14c24-f653-4e6a-9b94-39d6b0f70001"
3+
AdvancedPS = "576499cb-2369-40b2-a588-c64705576edc"
4+
BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
5+
DynamicPPL = "366bfd00-2699-11ea-058f-f148b4cae6d8"
6+
Libtask = "6f1fad26-d15e-5dc8-ae53-837a1d7b8c9f"
7+
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
8+
Turing = "fce5fe82-541a-59a6-adf8-730c64b5f9a0"
9+
10+
[compat]
11+
julia = "1.3"
12+
13+
[targets]
14+
test = ["Test", "BenchmarkTools"]

perf/p0.jl

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# ]add Turing#hg/new-libtask2
2+
3+
using Libtask
4+
using Turing, DynamicPPL, AdvancedPS
5+
using BenchmarkTools
6+
7+
@model gdemo(x, y) = begin
8+
# Assumptions
9+
σ ~ InverseGamma(2,3)
10+
μ ~ Normal(0,sqrt(σ))
11+
# Observations
12+
x ~ Normal(μ, sqrt(σ))
13+
y ~ Normal(μ, sqrt(σ))
14+
end
15+
16+
17+
# Case 1: Sample from the prior.
18+
19+
m = Turing.Core.TracedModel(gdemo(1.5, 2.), SampleFromPrior(), VarInfo())
20+
21+
f = m.evaluator[1];
22+
23+
args = m.evaluator[2:end];
24+
25+
@show "Directly call..."
26+
@btime f(args...)
27+
# (2.0, VarInfo (2 variables (μ, σ), dimension 2; logp: -6.162))
28+
29+
@show "CTask construction..."
30+
t = @btime Libtask.CTask(f, args...)
31+
# schedule(t.task) # work fine!
32+
# @show Libtask.result(t.tf.tape)
33+
@show "Step in a tape..."
34+
@btime Libtask.step_in(t.tf.tape, args)
35+
36+
# Case 2: SMC sampler
37+
38+
m = Turing.Core.TracedModel(gdemo(1.5, 2.), Sampler(SMC(50)), VarInfo());
39+
@show "Directly call..."
40+
@btime m.evaluator[1](m.evaluator[2:end]...)
41+
42+
@show "CTask construction..."
43+
t = @btime Libtask.CTask(m.evaluator[1], m.evaluator[2:end]...);
44+
# schedule(t.task)
45+
# @show Libtask.result(t.tf.tape)
46+
@show "Step in a tape..."
47+
@btime Libtask.step_in(t.tf.tape, m.evaluator[2:end])

perf/p1.jl

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using Turing, Test, AbstractMCMC, DynamicPPL, Random
2+
3+
import AbstractMCMC.AbstractSampler
4+
5+
function check_numerical(chain,
6+
symbols::Vector,
7+
exact_vals::Vector;
8+
atol=0.2,
9+
rtol=0.0)
10+
for (sym, val) in zip(symbols, exact_vals)
11+
E = val isa Real ?
12+
mean(chain[sym]) :
13+
vec(mean(chain[sym], dims=1))
14+
@info (symbol=sym, exact=val, evaluated=E)
15+
@test E val atol=atol rtol=rtol
16+
end
17+
end
18+
19+
function check_MoGtest_default(chain; atol=0.2, rtol=0.0)
20+
check_numerical(chain,
21+
[:z1, :z2, :z3, :z4, :mu1, :mu2],
22+
[1.0, 1.0, 2.0, 2.0, 1.0, 4.0],
23+
atol=atol, rtol=rtol)
24+
end
25+
26+
@model gdemo_d(x, y) = begin
27+
s ~ InverseGamma(2, 3)
28+
m ~ Normal(0, sqrt(s))
29+
x ~ Normal(m, sqrt(s))
30+
y ~ Normal(m, sqrt(s))
31+
return s, m
32+
end
33+
34+
alg = CSMC(15)
35+
chain = sample(gdemo_d(1.5, 2.0), alg, 5_000)
36+
37+
@show chain
38+
39+
check_numerical(chain, [:s, :m], [49/24, 7/6], atol=0.1)

perf/p2.jl

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using Turing, Test, AbstractMCMC, DynamicPPL, Random, Turing.RandomMeasures, Libtask
2+
3+
@model infiniteGMM(x) = begin
4+
# Hyper-parameters, i.e. concentration parameter and parameters of H.
5+
α = 1.0
6+
μ0 = 0.0
7+
σ0 = 1.0
8+
9+
# Define random measure, e.g. Dirichlet process.
10+
rpm = DirichletProcess(α)
11+
12+
# Define the base distribution, i.e. expected value of the Dirichlet process.
13+
H = Normal(μ0, σ0)
14+
15+
# Latent assignment.
16+
z = tzeros(Int, length(x))
17+
18+
# Locations of the infinitely many clusters.
19+
μ = tzeros(Float64, 0)
20+
21+
for i in 1:length(x)
22+
23+
# Number of clusters.
24+
K = maximum(z)
25+
nk = Vector{Int}(map(k -> sum(z .== k), 1:K))
26+
27+
# Draw the latent assignment.
28+
z[i] ~ ChineseRestaurantProcess(rpm, nk)
29+
30+
# Create a new cluster?
31+
if z[i] > K
32+
push!(μ, 0.0)
33+
34+
# Draw location of new cluster.
35+
μ[z[i]] ~ H
36+
end
37+
38+
# Draw observation.
39+
x[i] ~ Normal(μ[z[i]], 1.0)
40+
end
41+
end
42+
43+
# Generate some test data.
44+
Random.seed!(1)
45+
46+
data = vcat(randn(10), randn(10) .- 5, randn(10) .+ 10)
47+
data .-= mean(data)
48+
data /= std(data)
49+
50+
# MCMC sampling
51+
Random.seed!(2)
52+
iterations = 500
53+
model_fun = infiniteGMM(data)
54+
55+
m = Turing.Core.TracedModel(model_fun, Sampler(SMC(50)), VarInfo())
56+
f = m.evaluator[1]
57+
args = m.evaluator[2:end]
58+
59+
t = Libtask.CTask(f, args...)
60+
61+
Libtask.step_in(t.tf.tape, args)
62+
63+
@show Libtask.result(t.tf.tape)

perf/runtests.jl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
include("p0.jl")
2+
include("p1.jl")
3+
include("p2.jl")

src/Libtask.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ module Libtask
33
using IRTools
44
using MacroTools
55

6+
using LRUCache
7+
68
export CTask, consume, produce
79
export TArray, tzeros, tfill, TRef
810

src/tapedfunction.jl

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ mutable struct Tape
66
owner
77
end
88

9+
"""
10+
Instruction
11+
12+
An `Instruction` stands for a function call
13+
"""
914
mutable struct Instruction{F} <: AbstractInstruction
1015
fun::F
1116
input::Tuple
@@ -46,13 +51,20 @@ function Base.show(io::IO, box::Box)
4651
println(io, "Box($(box.val))")
4752
end
4853

54+
function Base.show(io::IO, instruction::AbstractInstruction)
55+
println(io, "A $(typeof(instruction))")
56+
end
57+
4958
function Base.show(io::IO, instruction::Instruction)
5059
fun = instruction.fun
5160
tape = instruction.tape
5261
println(io, "Instruction($(fun)$(map(val, instruction.input)), tape=$(objectid(tape)))")
5362
end
5463

5564
function Base.show(io::IO, tp::Tape)
65+
# we use an extra IOBuffer to collect all the data and then
66+
# output it once to avoid output interrupt during task context
67+
# switching
5668
buf = IOBuffer()
5769
print(buf, "$(length(tp))-element Tape")
5870
isempty(tp) || println(buf, ":")
@@ -66,10 +78,30 @@ function Base.show(io::IO, tp::Tape)
6678
end
6779

6880
function (instr::Instruction{F})() where F
69-
output = instr.fun(map(val, instr.input)...)
70-
instr.output.val = output
81+
# catch run-time exceptions / errors.
82+
try
83+
output = instr.fun(map(val, instr.input)...)
84+
instr.output.val = output
85+
catch e
86+
println(e, catch_backtrace());
87+
rethrow(e);
88+
end
7189
end
7290

91+
function _new end
92+
function (instr::Instruction{typeof(_new)})()
93+
# catch run-time exceptions / errors.
94+
try
95+
expr = Expr(:new, map(val, instr.input)...)
96+
output = eval(expr)
97+
instr.output.val = output
98+
catch e
99+
println(e, catch_backtrace());
100+
rethrow(e);
101+
end
102+
end
103+
104+
73105
function increase_counter!(t::Tape)
74106
t.counter > length(t) && return
75107
# instr = t[t.counter]
@@ -101,6 +133,19 @@ function run_and_record!(tape::Tape, f, args...)
101133
return output
102134
end
103135

136+
function run_and_record!(tape::Tape, ::typeof(_new), args...)
137+
output = try
138+
expr = Expr(:new, map(val, args)...)
139+
box(eval(expr))
140+
catch e
141+
@warn e
142+
Box{Any}(nothing)
143+
end
144+
ins = Instruction(_new, args, output, tape)
145+
push!(tape, ins)
146+
return output
147+
end
148+
104149
function unbox_condition(ir)
105150
for blk in IRTools.blocks(ir)
106151
vars = keys(blk)
@@ -169,9 +214,15 @@ function intercept(ir; recorder=:run_and_record!)
169214

170215
for (x, st) in ir
171216
x == tape && continue
172-
Meta.isexpr(st.expr, :call) || continue
173-
new_args = (x == args_var) ? st.expr.args : _replace_args(st.expr.args, arg_pairs)
174-
ir[x] = IRTools.xcall(@__MODULE__, recorder, tape, new_args...)
217+
if Meta.isexpr(st.expr, :call)
218+
new_args = (x == args_var) ? st.expr.args : _replace_args(st.expr.args, arg_pairs)
219+
ir[x] = IRTools.xcall(@__MODULE__, recorder, tape, new_args...)
220+
elseif Meta.isexpr(st.expr, :new)
221+
args = st.expr.args
222+
ir[x] = IRTools.xcall(@__MODULE__, recorder, tape, _new, args...)
223+
else
224+
@warn "Unknown IR code: " st
225+
end
175226
end
176227
# the real return value will be in the last instruction on the tape
177228
IRTools.return!(ir, tape)
@@ -190,6 +241,13 @@ mutable struct TapedFunction
190241
end
191242
end
192243

244+
function reset!(tf::TapedFunction, ir::IRTools.IR, tape::Tape)
245+
tf.ir = ir
246+
tf.tape = tape
247+
setowner!(tape, tf)
248+
return tf
249+
end
250+
193251
function (tf::TapedFunction)(args...)
194252
if isempty(tf.tape)
195253
ir = IRTools.@code_ir tf.func(args...)

src/tapedtask.jl

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,22 @@ struct TapedTask
1616
end
1717
end
1818

19+
const TRCache = LRU{Any, Any}(maxsize=10)
20+
1921
function TapedTask(tf::TapedFunction, args...)
20-
tf.owner != nothing && error("TapedFunction is owned to another task.")
21-
isempty(tf.tape) && tf(args...)
22+
tf.owner !== nothing && error("TapedFunction is owned by another task.")
23+
if isempty(tf.tape)
24+
cache_key = (tf.func, typeof.(args)...)
25+
if haskey(TRCache, cache_key)
26+
ir, tape = TRCache[cache_key]
27+
# Here we don't need change the initial arguments of the tape,
28+
# it will be set when we `step_in` to the tape.
29+
reset!(tf, ir, copy(tape, Dict{UInt64, Any}(); partial=false))
30+
else
31+
tf(args...)
32+
TRCache[cache_key] = (tf.ir, tf.tape)
33+
end
34+
end
2235
produce_ch = Channel()
2336
consume_ch = Channel{Int}()
2437
task = @task try
@@ -199,14 +212,16 @@ function Base.copy(x::Instruction, on_tape::Tape, roster::Dict{UInt64, Any})
199212
Instruction(x.fun, input, output, on_tape)
200213
end
201214

202-
function Base.copy(t::Tape, roster::Dict{UInt64, Any})
215+
function Base.copy(t::Tape, roster::Dict{UInt64, Any}; partial=true)
203216
old_data = t.tape
204-
new_data = Vector{AbstractInstruction}()
205-
new_tape = Tape(new_data, t.counter, t.owner)
217+
len = partial ? length(old_data) - t.counter + 1 : length(old_data)
218+
start = partial ? t.counter : 1
219+
new_data = Vector{AbstractInstruction}(undef, len)
220+
new_tape = Tape(new_data, 1, t.owner)
206221

207-
for x in old_data
222+
for (i, x) in enumerate(old_data[start:end])
208223
new_ins = copy(x, new_tape, roster)
209-
push!(new_data, new_ins)
224+
new_data[i] = new_ins
210225
end
211226

212227
return new_tape

0 commit comments

Comments
 (0)