From 36a68d651a66d074a7ac2b943f937378c8281ffc Mon Sep 17 00:00:00 2001 From: Jun Tian Date: Wed, 12 Aug 2020 00:16:24 +0800 Subject: [PATCH 1/4] update dependency of RLCore --- src/algorithms/dqns/basic_dqn.jl | 29 ++--- src/algorithms/dqns/common.jl | 20 ++-- src/algorithms/dqns/dqn.jl | 14 +-- src/algorithms/policy_gradient/A2C.jl | 10 +- src/algorithms/policy_gradient/A2CGAE.jl | 10 +- src/algorithms/policy_gradient/ddpg.jl | 13 ++- src/algorithms/policy_gradient/ppo.jl | 12 +- .../policy_gradient/ppo_trajectory.jl | 110 ++++-------------- 8 files changed, 79 insertions(+), 139 deletions(-) diff --git a/src/algorithms/dqns/basic_dqn.jl b/src/algorithms/dqns/basic_dqn.jl index be1b40d..04ade6c 100644 --- a/src/algorithms/dqns/basic_dqn.jl +++ b/src/algorithms/dqns/basic_dqn.jl @@ -62,21 +62,24 @@ function BasicDQNLearner(; ) end -function RLBase.update!(learner::BasicDQNLearner, t::AbstractTrajectory) - length(t) < learner.min_replay_history && return +function RLBase.update!(learner::BasicDQNLearner, T::AbstractTrajectory) + length(T[:terminal]) < learner.min_replay_history && return - inds = rand(learner.rng, 1:length(t), learner.batch_size) - batch = map(get_trace(t, :state, :action, :reward, :terminal, :next_state)) do x - consecutive_view(x, inds) - end + Q = learner.approximator + D = device(Q) + γ = learner.γ + loss_func = learner.loss_func + batch_size = learner.batch_size - Q, γ, loss_func, batch_size = - learner.approximator, learner.γ, learner.loss_func, learner.batch_size - s, r, t, s′ = map( - x -> send_to_device(device(Q), x), - (batch.state, batch.reward, batch.terminal, batch.next_state), - ) - a = CartesianIndex.(batch.action, 1:batch_size) + inds = rand(learner.rng, 1:length(T[:terminal]), learner.batch_size) + + s = send_to_device(D, consecutive_view(T[:state], inds)) + a = consecutive_view(T[:action], inds) + r = send_to_device(D, consecutive_view(T[:reward], inds)) + t = send_to_device(D, consecutive_view(T[:terminal], inds)) + s′ = send_to_device(D, consecutive_view(T[:next_state], inds)) + + a = CartesianIndex.(a, 1:batch_size) gs = gradient(params(Q)) do q = Q(s)[a] diff --git a/src/algorithms/dqns/common.jl b/src/algorithms/dqns/common.jl index 4d0e54a..34689e6 100644 --- a/src/algorithms/dqns/common.jl +++ b/src/algorithms/dqns/common.jl @@ -11,14 +11,14 @@ function extract_experience(t::AbstractTrajectory, learner::PERLearners) γ = learner.γ # 1. sample indices based on priority - valid_ind_range = isnothing(s) ? (1:(length(t)-h)) : (s:(length(t)-h)) + valid_ind_range = isnothing(s) ? (1:(length(t[:terminal])-h)) : (s:(length(t[:terminal])-h)) if t isa CircularCompactPSARTSATrajectory inds = Vector{Int}(undef, n) priorities = Vector{Float32}(undef, n) for i in 1:n - ind, p = sample(learner.rng, get_trace(t, :priority)) + ind, p = sample(learner.rng, t[:priority]) while ind ∉ valid_ind_range - ind, p = sample(learner.rng, get_trace(t, :priority)) + ind, p = sample(learner.rng, t[:priority]) end inds[i] = ind priorities[i] = p @@ -29,11 +29,11 @@ function extract_experience(t::AbstractTrajectory, learner::PERLearners) end # 2. 
extract SARTS - states = consecutive_view(get_trace(t, :state), inds; n_stack = s) - actions = consecutive_view(get_trace(t, :action), inds) - next_states = consecutive_view(get_trace(t, :state), inds .+ h; n_stack = s) - consecutive_rewards = consecutive_view(get_trace(t, :reward), inds; n_horizon = h) - consecutive_terminals = consecutive_view(get_trace(t, :terminal), inds; n_horizon = h) + states = consecutive_view(t[:state], inds; n_stack = s) + actions = consecutive_view(t[:action], inds) + next_states = consecutive_view(t[:state], inds .+ h; n_stack = s) + consecutive_rewards = consecutive_view(t[:reward], inds; n_horizon = h) + consecutive_terminals = consecutive_view(t[:terminal], inds; n_horizon = h) rewards, terminals = zeros(Float32, n), fill(false, n) rewards = discount_rewards_reduced( @@ -57,7 +57,7 @@ end function RLBase.update!(p::QBasedPolicy{<:PERLearners}, t::AbstractTrajectory) learner = p.learner - length(t) < learner.min_replay_history && return + length(t[:terminal]) < learner.min_replay_history && return learner.update_step += 1 @@ -71,7 +71,7 @@ function RLBase.update!(p::QBasedPolicy{<:PERLearners}, t::AbstractTrajectory) if t isa CircularCompactPSARTSATrajectory priorities = update!(p.learner, experience) - get_trace(t, :priority)[inds] .= priorities + t[:priority][inds] .= priorities else update!(p.learner, experience) end diff --git a/src/algorithms/dqns/dqn.jl b/src/algorithms/dqns/dqn.jl index 0a5af4d..bd82b8e 100644 --- a/src/algorithms/dqns/dqn.jl +++ b/src/algorithms/dqns/dqn.jl @@ -99,7 +99,7 @@ end Flux.squeezebatch function RLBase.update!(learner::DQNLearner, t::AbstractTrajectory) - length(t) < learner.min_replay_history && return + length(t[:terminal]) < learner.min_replay_history && return learner.update_step += 1 @@ -144,13 +144,13 @@ function extract_experience(t::AbstractTrajectory, learner::DQNLearner) n = learner.batch_size γ = learner.γ - valid_ind_range = isnothing(s) ? (1:(length(t)-h)) : (s:(length(t)-h)) + valid_ind_range = isnothing(s) ? 
(1:(length(t[:terminal])-h)) : (s:(length(t[:terminal])-h)) inds = rand(learner.rng, valid_ind_range, n) - states = consecutive_view(get_trace(t, :state), inds; n_stack = s) - actions = consecutive_view(get_trace(t, :action), inds) - next_states = consecutive_view(get_trace(t, :state), inds .+ h; n_stack = s) - consecutive_rewards = consecutive_view(get_trace(t, :reward), inds; n_horizon = h) - consecutive_terminals = consecutive_view(get_trace(t, :terminal), inds; n_horizon = h) + states = consecutive_view(t[:state], inds; n_stack = s) + actions = consecutive_view(t[:action], inds) + next_states = consecutive_view(t[:state], inds .+ h; n_stack = s) + consecutive_rewards = consecutive_view(t[:reward], inds; n_horizon = h) + consecutive_terminals = consecutive_view(t[:terminal], inds; n_horizon = h) rewards, terminals = zeros(Float32, n), fill(false, n) # make sure that we only consider experiences in current episode diff --git a/src/algorithms/policy_gradient/A2C.jl b/src/algorithms/policy_gradient/A2C.jl index 463df8b..8405a0c 100644 --- a/src/algorithms/policy_gradient/A2C.jl +++ b/src/algorithms/policy_gradient/A2C.jl @@ -43,11 +43,11 @@ end function RLBase.update!(learner::A2CLearner, t::AbstractTrajectory) isfull(t) || return - states = get_trace(t, :state) - actions = get_trace(t, :action) - rewards = get_trace(t, :reward) - terminals = get_trace(t, :terminal) - next_state = select_last_frame(get_trace(t, :next_state)) + states = t[:state] + actions = t[:action] + rewards = t[:reward] + terminals = t[:terminal] + next_state = select_last_frame(t[:next_state]) AC = learner.approximator γ = learner.γ diff --git a/src/algorithms/policy_gradient/A2CGAE.jl b/src/algorithms/policy_gradient/A2CGAE.jl index e0805f6..e15ba35 100644 --- a/src/algorithms/policy_gradient/A2CGAE.jl +++ b/src/algorithms/policy_gradient/A2CGAE.jl @@ -36,11 +36,11 @@ end function RLBase.update!(learner::A2CGAELearner, t::AbstractTrajectory) isfull(t) || return - states = get_trace(t, :state) - actions = get_trace(t, :action) - rewards = get_trace(t, :reward) - terminals = get_trace(t, :terminal) - rollout = t[:state] + states = t[:state] + actions = t[:action] + rewards = t[:reward] + terminals = t[:terminal] + rollout = t[:full_state] AC = learner.approximator γ = learner.γ diff --git a/src/algorithms/policy_gradient/ddpg.jl b/src/algorithms/policy_gradient/ddpg.jl index b5b639e..c762cdb 100644 --- a/src/algorithms/policy_gradient/ddpg.jl +++ b/src/algorithms/policy_gradient/ddpg.jl @@ -103,13 +103,16 @@ function (p::DDPGPolicy)(env) end end -function RLBase.update!(p::DDPGPolicy, t::CircularCompactSARTSATrajectory) - length(t) > p.update_after || return +function RLBase.update!(p::DDPGPolicy, traj::CircularCompactSARTSATrajectory) + length(traj[:terminal]) > p.update_after || return p.step % p.update_every == 0 || return - inds = rand(p.rng, 1:(length(t)-1), p.batch_size) - SARTS = (:state, :action, :reward, :terminal, :next_state) - s, a, r, t, s′ = map(x -> select_last_dim(get_trace(t, x), inds), SARTS) + inds = rand(p.rng, 1:(length(traj[:terminal])-1), p.batch_size) + s = select_last_dim(traj[:state], inds) + a = select_last_dim(traj[:action], inds) + r = select_last_dim(traj[:reward], inds) + t = select_last_dim(traj[:terminal], inds) + s′ = select_last_dim(traj[:next_state], inds) A = p.behavior_actor C = p.behavior_critic diff --git a/src/algorithms/policy_gradient/ppo.jl b/src/algorithms/policy_gradient/ppo.jl index 104ecef..d4fb29a 100644 --- a/src/algorithms/policy_gradient/ppo.jl +++ 
b/src/algorithms/policy_gradient/ppo.jl @@ -90,12 +90,12 @@ end function RLBase.update!(learner::PPOLearner, t::PPOTrajectory) isfull(t) || return - states = get_trace(t, :state) - actions = get_trace(t, :action) - action_log_probs = get_trace(t, :action_log_prob) - rewards = get_trace(t, :reward) - terminals = get_trace(t, :terminal) - states_plus = t[:state] + states = t[:state] + actions = t[:action] + action_log_probs = t[:action_log_prob] + rewards = t[:reward] + terminals = t[:terminal] + states_plus = t[:full_state] rng = learner.rng AC = learner.approximator diff --git a/src/algorithms/policy_gradient/ppo_trajectory.jl b/src/algorithms/policy_gradient/ppo_trajectory.jl index 9ebcf61..f76e5b8 100644 --- a/src/algorithms/policy_gradient/ppo_trajectory.jl +++ b/src/algorithms/policy_gradient/ppo_trajectory.jl @@ -2,102 +2,36 @@ export PPOTrajectory using MacroTools -struct PPOTrajectory{T<:CircularCompactSARTSATrajectory,P,names,types} <: - AbstractTrajectory{names,types} - trajectory::T - action_log_prob::P -end +const PPOTrajectory = CombinedTrajectory{ + <:SharedTrajectory{<:CircularArrayBuffer, <:NamedTuple{(:action_log_prob, :next_action_log_prob, :full_action_log_prob)}}, + <:CircularCompactSARTSATrajectory, +} function PPOTrajectory(; capacity, action_log_prob_size = (), action_log_prob_type = Float32, - kw..., + kw... ) - t = CircularCompactSARTSATrajectory(; capacity = capacity, kw...) - p = CircularArrayBuffer{action_log_prob_type}(action_log_prob_size..., capacity + 1) - names = typeof(t).parameters[1] - types = typeof(t).parameters[2] - PPOTrajectory{ - typeof(t), - typeof(p), - ( - :state, - :action, - :action_log_prob, - :reward, - :terminal, - :next_state, - :next_action, - :next_action_log_prob, - ), - Tuple{ - types.parameters[1:2]..., - frame_type(p), - types.parameters[3:end]..., - frame_type(p), - }, - }( - t, - p, + CombinedTrajectory( + SharedTrajectory(CircularArrayBuffer{action_log_prob_type}(action_log_prob_size..., capacity + 1), :action_log_prob), + CircularCompactSARTSATrajectory(;capacity=capacity,kw...), ) end -MacroTools.@forward PPOTrajectory.trajectory Base.length, Base.isempty, RLCore.isfull - -function RLCore.get_trace(t::PPOTrajectory, s::Symbol) - if s == :action_log_prob - select_last_dim( - t.action_log_prob, - 1:(nframes(t.action_log_prob) > 1 ? nframes(t.action_log_prob) - 1 : - nframes(t.action_log_prob)), - ) - elseif s == :next_action_log_prob - select_last_dim(t.action_log_prob, 2:nframes(t.action_log_prob)) - else - get_trace(t.trajectory, s) - end -end +const PPOActionMaskTrajectory = CombinedTrajectory{ + <:SharedTrajectory{<:CircularArrayBuffer, <:NamedTuple{(:action_log_prob, :next_action_log_prob, :full_action_log_prob)}}, + <:CircularCompactSALRTSALTrajectory, +} -Base.getindex(t::PPOTrajectory, s::Symbol) = - s == :action_log_prob ? t.action_log_prob : t.trajectory[s] - -function Base.getindex(p::PPOTrajectory, i::Int) - s, a, r, t, s′, a′ = p.trajectory[i] - ( - state = s, - action = a, - action_log_prob = select_last_dim(p.action_log_prob, i), - reward = r, - terminal = t, - next_state = s′, - next_action = a′, - next_action_log_prob = select_last_dim(p.action_log_prob, i + 1), +function PPOActionMaskTrajectory(; + capacity, + action_log_prob_size = (), + action_log_prob_type = Float32, + kw... 
+) + CombinedTrajectory( + SharedTrajectory(CircularArrayBuffer{action_log_prob_type}(action_log_prob_size..., capacity + 1), :action_log_prob), + CircularCompactSALRTSALTrajectory(;capacity=capacity,kw...), ) -end - -function Base.empty!(b::PPOTrajectory) - empty!(b.action_log_prob) - empty!(b.trajectory) -end - -function Base.push!(b::PPOTrajectory, kv::Pair{Symbol}) - k, v = kv - if k == :action_log_prob || k == :next_action_log_prob - push!(b.action_log_prob, v) - else - push!(b.trajectory, kv) - end -end - -function Base.pop!(t::PPOTrajectory, s::Symbol) - if s == :action_log_prob || s == :next_action_log_prob - pop!(t.action_log_prob) - else - pop!(t.trajectory, s) - end -end - -function Base.pop!(t::PPOTrajectory) - (pop!(t.trajectory)..., action_log_prob = pop!(t.action_log_prob)) -end +end \ No newline at end of file From d115e9b4eda0f3eb9e42457ebdd5f9bd2e697497 Mon Sep 17 00:00:00 2001 From: Jun Tian Date: Thu, 13 Aug 2020 00:58:49 +0800 Subject: [PATCH 2/4] remove unnecessary copy due to upstream change --- src/algorithms/policy_gradient/ppo.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/algorithms/policy_gradient/ppo.jl b/src/algorithms/policy_gradient/ppo.jl index d4fb29a..40db89f 100644 --- a/src/algorithms/policy_gradient/ppo.jl +++ b/src/algorithms/policy_gradient/ppo.jl @@ -126,7 +126,7 @@ function RLBase.update!(learner::PPOLearner, t::PPOTrajectory) rand_inds = shuffle!(rng, Vector(1:n_envs*n_rollout)) for i in 1:n_microbatches inds = rand_inds[(i-1)*microbatch_size+1:i*microbatch_size] - s = send_to_device(D, select_last_dim(states_flatten, inds) |> copy) # !!! must copy here + s = send_to_device(D, select_last_dim(states_flatten, inds)) a = vec(actions)[inds] r = send_to_device(D, vec(returns)[inds]) log_p = send_to_device(D, vec(action_log_probs)[inds]) From 13b1143250fd723520bdbfe8e9391058580f6cca Mon Sep 17 00:00:00 2001 From: Jun Tian Date: Thu, 13 Aug 2020 00:59:25 +0800 Subject: [PATCH 3/4] correct save dir name --- src/experiments/atari.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/experiments/atari.jl b/src/experiments/atari.jl index 40ba520..12563dc 100644 --- a/src/experiments/atari.jl +++ b/src/experiments/atari.jl @@ -549,7 +549,7 @@ function RLCore.Experiment( rng = MersenneTwister(seed) if isnothing(save_dir) t = Dates.format(now(), "yyyy_mm_dd_HH_MM_SS") - save_dir = joinpath(pwd(), "checkpoints", "JuliaRL_A2C_Atari_$(name)_$(t)") + save_dir = joinpath(pwd(), "checkpoints", "rlpyt_A2C_Atari_$(name)_$(t)") end lg = TBLogger(joinpath(save_dir, "tb_log"), min_level = Logging.Info) @@ -716,7 +716,7 @@ function RLCore.Experiment( rng = MersenneTwister(seed) if isnothing(save_dir) t = Dates.format(now(), "yyyy_mm_dd_HH_MM_SS") - save_dir = joinpath(pwd(), "checkpoints", "JuliaRL_PPO_Atari_$(name)_$(t)") + save_dir = joinpath(pwd(), "checkpoints", "rlpyt_PPO_Atari_$(name)_$(t)") end lg = TBLogger(joinpath(save_dir, "tb_log"), min_level = Logging.Info) From 0b44b1f0f3db750f2254454ac202a7bda461c334 Mon Sep 17 00:00:00 2001 From: Jun Tian Date: Thu, 13 Aug 2020 01:21:13 +0800 Subject: [PATCH 4/4] use latest version of RLCore --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 52b7844..2b665d4 100644 --- a/Project.toml +++ b/Project.toml @@ -30,7 +30,7 @@ CUDA = "1" Flux = "0.11" MacroTools = "0.5" ReinforcementLearningBase = "0.8" -ReinforcementLearningCore = "0.4" +ReinforcementLearningCore = "0.4.1" Requires = "1" Setfield = "0.6, 0.7" 
StatsBase = "0.32, 0.33"