Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
6c002d7
Mask in collectors
matteobettini Jan 13, 2023
7245b58
Works without traj_split
matteobettini Jan 15, 2023
a772ef5
Temp
matteobettini Jan 15, 2023
67c87d6
step count
matteobettini Jan 15, 2023
bb76d9e
clone the expand
matteobettini Jan 15, 2023
e1c0df0
link and docs
matteobettini Jan 15, 2023
630f4de
added errors for frames overflow
matteobettini Jan 20, 2023
29374d4
Merge branch 'main' into env_batch_size_mask
matteobettini Jan 20, 2023
cc99b2a
now all batch dimensions are squashed in first one
matteobettini Jan 20, 2023
352c9c0
amend
matteobettini Jan 20, 2023
14f8e69
refector
matteobettini Jan 20, 2023
571851b
removed numel with mask
matteobettini Jan 20, 2023
e22d1ed
fixed traj_ids
matteobettini Jan 20, 2023
5fa83ff
amend
matteobettini Jan 20, 2023
9e22fe1
amend
matteobettini Jan 20, 2023
8f60534
fix split traj
matteobettini Jan 22, 2023
eac0e26
doc
matteobettini Jan 22, 2023
43e093d
refactor
matteobettini Jan 22, 2023
df2e660
refactor
matteobettini Jan 22, 2023
a55c0b3
Merge branch 'main' into env_batch_size_mask
matteobettini Jan 22, 2023
57a0ffc
Merge branch 'main' into env_batch_size_mask
matteobettini Jan 22, 2023
b13778e
refactor
matteobettini Jan 22, 2023
dec7895
docs
matteobettini Jan 22, 2023
7c46c02
fix test
matteobettini Jan 22, 2023
06ebee8
new split_traj more efficient
matteobettini Jan 22, 2023
2785b55
lint
matteobettini Jan 22, 2023
20331b9
fix tests
matteobettini Jan 22, 2023
455c606
fix tests
matteobettini Jan 22, 2023
4b03102
Lint
matteobettini Jan 22, 2023
c21654f
refactor 4 efficiency
matteobettini Jan 22, 2023
5e6afa1
no sorting
matteobettini Jan 22, 2023
8fe0b9c
typo
matteobettini Jan 22, 2023
d4c6ba9
tests
matteobettini Jan 22, 2023
4595450
tests
matteobettini Jan 22, 2023
8c582fe
tests
matteobettini Jan 22, 2023
7e15d48
tests
matteobettini Jan 23, 2023
632447e
tests
matteobettini Jan 23, 2023
62701cf
Lint
matteobettini Jan 23, 2023
5af90c1
tests
matteobettini Jan 23, 2023
6b7bdc0
tests final
matteobettini Jan 23, 2023
9cb6492
docs
matteobettini Jan 23, 2023
6702f1b
Lint
matteobettini Jan 23, 2023
e78cd2b
tests
matteobettini Jan 23, 2023
7ec62a9
Lint
matteobettini Jan 23, 2023
e9c4487
Merge branch 'main' into env_batch_size_mask
matteobettini Jan 25, 2023
3b2646c
merge main
matteobettini Jan 25, 2023
d778c17
lint
matteobettini Jan 25, 2023
26de618
refactor
matteobettini Jan 25, 2023
1b2a3b4
adjusted self.frames_per_batch to always give the correct frames
matteobettini Jan 25, 2023
b4c8ba2
typo
matteobettini Jan 25, 2023
cf66664
Merge branch 'main' into env_batch_size_mask
vmoens Feb 20, 2023
1b1d2ea
Update torchrl/collectors/collectors.py
matteobettini Feb 28, 2023
f450b39
Update torchrl/collectors/collectors.py
matteobettini Feb 28, 2023
80f2b55
Update torchrl/collectors/utils.py
matteobettini Feb 28, 2023
1666111
Merge branch 'main' into env_batch_size_mask
matteobettini Feb 28, 2023
3545dd7
refactor doc
matteobettini Feb 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions test/mocking_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ def _reset(self, tensordict: TensorDictBase, **kwargs) -> TensorDictBase:
self.count[:] = self.start_val
return TensorDict(
source={
"observation": self.count.clone(),
"observation": self.count.float().clone(),
"done": self.count > self.max_steps,
},
batch_size=self.batch_size,
Expand All @@ -907,7 +907,7 @@ def _step(
self.count += action.to(torch.int)
return TensorDict(
source={
"observation": self.count,
"observation": self.count.float(),
"done": self.count > self.max_steps,
"reward": torch.zeros_like(self.count, dtype=torch.float),
},
Expand Down
166 changes: 154 additions & 12 deletions test/test_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import numpy as np
import pytest
import torch

from _utils_internal import generate_seeds, PENDULUM_VERSIONED, PONG_VERSIONED
from mocking_classes import (
ContinuousActionVecMockEnv,
Expand All @@ -23,7 +24,7 @@
from tensordict.nn import TensorDictModule
from tensordict.tensordict import assert_allclose_td, TensorDict
from torch import nn
from torchrl._utils import seed_generator
from torchrl._utils import prod, seed_generator
from torchrl.collectors import aSyncDataCollector, SyncDataCollector
from torchrl.collectors.collectors import (
MultiaSyncDataCollector,
Expand Down Expand Up @@ -318,16 +319,18 @@ def make_env():
)
for _data in collector:
continue
steps = _data["collector", "step_count"][..., 1:]
done = _data["done"][..., :-1, :].squeeze(-1)
steps = _data["collector", "step_count"]
done = _data["done"].squeeze(-1)
traj_ids = _data["collector", "traj_ids"]
# we don't want just one done
assert done.sum() > 3
# check that after a done, the next step count is always 1
assert (steps[done] == 1).all()
# check that if the env is not done, the next step count is > 1
assert (steps[~done] > 1).all()
# check that if step is 1, then the env was done before
assert (steps == 1)[done].all()
for i in traj_ids.unique(sorted=False):
# check that after a done, the next step count is always 1
assert (steps[traj_ids == i][0] == 1).all()
# check that step counts are positive for not first elements of traj
assert (steps[traj_ids == i][1:] > 1).all()
# check that non-last elements of trajectories are not done
assert (done[traj_ids == i][:-1] == 0).all()
# check that split traj has a minimum total reward of -21 (for pong only)
_data = split_trajectories(_data)
assert _data["reward"].sum(-2).min() == -21
Expand Down Expand Up @@ -375,9 +378,8 @@ def make_env(seed):
)
for _, d in enumerate(collector): # noqa
break

assert (d["done"].sum(-2) >= 1).all()
assert torch.unique(d["collector", "traj_ids"], dim=-1).shape[-1] == 1
assert (d["done"].sum() >= 1).all()
assert torch.unique(d["collector", "traj_ids"]).shape[0] == num_env

del collector

Expand Down Expand Up @@ -536,6 +538,146 @@ def env_fn():
ccollector.shutdown()


@pytest.mark.parametrize("n_env_workers", [1, 3])
@pytest.mark.parametrize("batch_size", [(), (2, 4)])
@pytest.mark.parametrize("mask_env_batch_size", [None, (True, False, True)])
def test_collector_batch_size_with_env_batch_size(
    n_env_workers,
    batch_size,
    mask_env_batch_size,
    max_steps=5,
    n_collector_workers=4,
    seed=100,
):
    """Check collector output batch sizes when the env has its own batch size.

    Masked env batch dimensions are squashed into the leading frame
    dimension of the collected batch; unmasked dimensions are preserved.
    When the mask length does not match the env batch size, the collector
    is expected to raise a RuntimeError at construction time.
    """
    if n_env_workers == 3 and _os_is_windows:
        pytest.skip("Test timeout (> 10 min) on CI pipeline Windows machine with GPU")

    if n_env_workers == 1:
        # Single env: the env batch size is exactly `batch_size`, so drop
        # the leading mask entry that would have covered the worker dim.
        def env():
            return CountingEnv(max_steps=max_steps, batch_size=batch_size)

        if mask_env_batch_size is not None:
            mask_env_batch_size = mask_env_batch_size[1:]
    else:
        # ParallelEnv prepends a worker dimension to the env batch size.
        def env():
            return ParallelEnv(
                num_workers=n_env_workers,
                create_env_fn=lambda: CountingEnv(
                    max_steps=max_steps, batch_size=batch_size
                ),
            )

    new_batch_size = env().batch_size
    policy = TensorDictModule(
        nn.Linear(1, 1), in_keys=["observation"], out_keys=["action"]
    )
    torch.manual_seed(0)
    np.random.seed(0)

    # Split the env batch dims into those kept in the output (unmasked)
    # and those squashed into the frame dimension (masked).
    if mask_env_batch_size is None:
        env_unmasked_dims = []
        squashed_dims = list(new_batch_size)
    else:
        env_unmasked_dims = [
            dim
            for keep, dim in zip(mask_env_batch_size, new_batch_size)
            if not keep
        ]
        squashed_dims = [
            dim
            for keep, dim in zip(mask_env_batch_size, new_batch_size)
            if keep
        ]
    n_batch_envs = max(1, prod(squashed_dims))
    frames_per_batch = n_collector_workers * n_batch_envs * n_env_workers * 5

    # A mask of the wrong length must be rejected at construction.
    if mask_env_batch_size is not None and len(mask_env_batch_size) != len(
        new_batch_size
    ):
        with pytest.raises(RuntimeError):
            SyncDataCollector(
                create_env_fn=env,
                policy=policy,
                frames_per_batch=frames_per_batch,
                mask_env_batch_size=mask_env_batch_size,
                pin_memory=False,
            )
        return

    # Exercise both multi-collector flavors, with and without trajectory
    # splitting, in the same order as before: async/no-split, async/split,
    # sync/no-split, sync/split.
    for collector_cls in (MultiaSyncDataCollector, MultiSyncDataCollector):
        for split_trajs in (False, True):
            collector_kwargs = dict(
                create_env_fn=[env for _ in range(n_collector_workers)],
                policy=policy,
                frames_per_batch=frames_per_batch,
                mask_env_batch_size=mask_env_batch_size,
                pin_memory=False,
                split_trajs=split_trajs,
            )
            if split_trajs:
                collector_kwargs["max_frames_per_traj"] = max_steps
            ccollector = collector_cls(**collector_kwargs)
            ccollector.set_seed(seed)
            for i, b in enumerate(ccollector):
                if split_trajs:
                    # Split output: one row per trajectory, padded to max_steps.
                    expected = torch.Size(
                        [
                            b["collector", "traj_ids"].unique(sorted=False).shape[0],
                            *env_unmasked_dims,
                            max_steps,
                        ]
                    )
                else:
                    # Flat output: all frames squashed into the first dim.
                    expected = torch.Size([frames_per_batch, *env_unmasked_dims])
                assert b.batch_size == expected
                if i == 1:
                    break
            ccollector.shutdown()


@pytest.mark.parametrize("num_env", [1, 2])
@pytest.mark.parametrize("env_name", ["vec", "conv"])
def test_concurrent_collector_seed(num_env, env_name, seed=100):
Expand Down
7 changes: 4 additions & 3 deletions test/test_postprocs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pytest
import torch

from _utils_internal import get_available_devices
from tensordict.tensordict import assert_allclose_td, TensorDict
from torchrl.collectors.utils import split_trajectories
Expand Down Expand Up @@ -121,16 +122,16 @@ def create_fake_trajs(
traj_ids[done] = traj_ids.max() + torch.arange(1, done.sum() + 1)
step_count[done] = 0

out = torch.stack(out, 1).contiguous()
out = torch.stack(out, 1).view(-1).contiguous()
return out

@pytest.mark.parametrize("num_workers", range(3, 34, 3))
@pytest.mark.parametrize("traj_len", [10, 17, 50, 97])
def test_splits(self, num_workers, traj_len):

trajs = TestSplits.create_fake_trajs(num_workers, traj_len)
assert trajs.shape[0] == num_workers
assert trajs.shape[1] == traj_len
assert trajs.shape[0] == num_workers * traj_len
assert len(trajs.shape) == 1
split_trajs = split_trajectories(trajs)
assert (
split_trajs.shape[0] == split_trajs.get(("collector", "traj_ids")).max() + 1
Expand Down
Loading