-
Notifications
You must be signed in to change notification settings - Fork 0
Vberenz/o80 robot ball #19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
import nptyping as npt | ||
|
||
import random | ||
import copy | ||
import math | ||
import pathlib | ||
import h5py | ||
|
@@ -23,7 +24,13 @@ | |
assert int(npt.__version__[0]) >= 2, "Need nptyping >=2." | ||
|
||
# 3: 3d position , Any: nb of points in trajectory | ||
Trajectory = npt.NDArray[npt.Shape["*, 3"], npt.Float32] | ||
BallTrajectory = npt.NDArray[npt.Shape["*, 3"], npt.Float32] | ||
|
||
# 7: 3d position of ball, and 4d position of robot | ||
BallRobotTrajectory = npt.NDArray[npt.Shape["*, 7"], npt.Float32] | ||
|
||
# we support both BallTrajectory and BallRobotTrajectory | ||
Trajectory = typing.Union[BallTrajectory, BallRobotTrajectory] | ||
|
||
# List of time stamps, in microseconds | ||
TimeStamps = npt.NDArray[ | ||
|
@@ -83,6 +90,9 @@ def to_stamped_trajectory(input: DurationTrajectory) -> StampedTrajectory: | |
return stamps, positions | ||
|
||
|
||
def _p(l): | ||
return " ".join([f"{abs(v):.2f}" for v in l]) | ||
|
||
def to_duration_trajectory(input: StampedTrajectory) -> DurationTrajectory: | ||
""" | ||
Converts a StampedTrajectories to a DurationTrajectory. | ||
|
@@ -216,6 +226,17 @@ def get_stamped_trajectories( | |
for index in indexes | ||
} | ||
|
||
def get_attributes( | ||
self, group: str, index: int | ||
)->typing.Dict[str,typing.Union[str,int,float]]: | ||
""" | ||
Returns all the attributes attached to the trajectory | ||
""" | ||
r = {} | ||
for key,value in self._f[group].attrs.items(): | ||
r[key]=value | ||
return r | ||
|
||
def close(self): | ||
""" | ||
Close the hdf5 file | ||
|
@@ -259,6 +280,17 @@ def rm_group(self, group: str) -> None: | |
raise KeyError("No such group: {}".format(group)) | ||
del self._f[group] | ||
|
||
def add_attribute( | ||
self, | ||
group: str, index: int, | ||
key: str, value: typing.Union[str,int,float] | ||
)->None: | ||
""" | ||
Add an attribute to the trajectory | ||
""" | ||
group: h5py._hl.group.Group = self._f[group] | ||
group.attrs[key]=value | ||
|
||
def overwrite( | ||
self, group: str, index: int, stamped_trajectory: StampedTrajectory | ||
) -> None: | ||
|
@@ -274,6 +306,233 @@ def overwrite( | |
traj_group.create_dataset(self._TIME_STAMPS, data=time_stamps) | ||
traj_group.create_dataset(self._TRAJECTORY, data=positions) | ||
|
||
def add_ball_robot_trajectories( | ||
self, group_name: str, o80_robot_ball_path: pathlib.Path | ||
) -> int: | ||
""" | ||
It is assumed that o80_robot_ball_path is a directory hosting | ||
a collection of files named o80_robot_ball_* that have been generated | ||
by the executable o80_robot_ball_logger (package o80_pam). This function | ||
will parse all these files and add them to the hdf5 under the specified | ||
group name. | ||
|
||
Returns | ||
------- | ||
The number of trajectories added to the file. | ||
""" | ||
|
||
def _read_trajectory(o80_robot_ball_file: pathlib.Path) -> StampedTrajectory: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The amount of nested functions here makes it a bit difficult to follow the code. Maybe it would make sense to move them to a separate module? |
||
|
||
Vector3d = typing.Tuple[float, float, float] | ||
Vector4d = typing.Tuple[float, float, float, float] | ||
|
||
class _Ball: | ||
""" | ||
Information regarding the ball at a given step. | ||
A negative ball_id means the ball was not detected | ||
during this step. | ||
""" | ||
|
||
def __init__( | ||
self, ball_id: int, position: Vector3d, velocity: Vector3d | ||
): | ||
self.ball_id = ball_id | ||
self.position = position | ||
self.velocity = velocity | ||
|
||
class _Robot: | ||
""" | ||
Information regarding the robot at a given step. | ||
Position in radian and velocity in radian per second. | ||
""" | ||
|
||
def __init__(self, position: Vector4d, velocity: Vector4d): | ||
self.position = position | ||
self.velocity = velocity | ||
|
||
Step = typing.Tuple[int, _Ball, _Robot] | ||
""" | ||
represent the observation at a given step, | ||
the first value being the timestamp (in nanoseconds) | ||
""" | ||
|
||
def _readline(line: str) -> Step: | ||
""" | ||
Parse a line of the file, and returns the corresponding | ||
observation. | ||
""" | ||
entries = eval(line) | ||
timestamp = entries[1][0] | ||
ball_id = entries[0][0] | ||
ball_position = entries[0][2] | ||
ball_velocity = entries[0][3] | ||
ball = _Ball(ball_id, ball_position, ball_velocity) | ||
robot_position = entries[1][1] | ||
robot_velocity = entries[1][2] | ||
robot = _Robot(robot_position, robot_velocity) | ||
return timestamp, ball, robot | ||
|
||
def _interpolation( | ||
nb_steps: int, start: Vector3d, end: Vector3d | ||
) -> typing.List[Vector3d]: | ||
""" | ||
Returns a list of length nb_steps which | ||
interpolates between start and end | ||
""" | ||
r: typing.List[Vector3d] = [] | ||
one_step = [(e - s) / nb_steps for s, e in zip(start, end)] | ||
for step in range(nb_steps): | ||
position = [s + os * step for s, os in zip(start, one_step)] | ||
r.append(tuple(position)) # type: ignore | ||
return r | ||
|
||
def _trim(steps: typing.List[Step]) -> typing.List[Step]: | ||
""" | ||
Remove the first steps during which the ball has not been | ||
detected (if any). Raise a ValueError if the ball | ||
has not been detected at all. | ||
""" | ||
detection_start = -1 | ||
for index, step in enumerate(steps): | ||
if step[1].ball_id >= 0: | ||
detection_start = index | ||
break | ||
if detection_start < 0: | ||
raise ValueError( | ||
"the ball was not detected at all " | ||
"(ball id negative for all steps)" | ||
) | ||
return steps[detection_start:] | ||
|
||
def _recompute_ball_velocity(steps: typing.List[Step])->None: | ||
for current,following in zip(steps,steps[1:]): | ||
time_diff = (following[0]-current[0])*1e-9 | ||
following[1].velocity = [ | ||
(fp-cp)/time_diff for cp,fp | ||
in zip(current[1].position,following[1].position) | ||
] | ||
|
||
def _ball_id_duplicates(steps: typing.List[Step]) -> None: | ||
for s1,s2 in zip(steps,steps[1:]): | ||
if s1[1].ball_id == s2[1].ball_id: | ||
s2[1].ball_id = -1 | ||
|
||
def _fix_undetected_ball(steps: typing.List[Step]) -> typing.List[Step]: | ||
""" | ||
For some step, the ball_id is -1, indicating that | ||
the ball was not detected. For these steps, the | ||
position of the ball is incorrect. We replace it | ||
by a value obtained by linearly interpolating between the | ||
previous and next valid steps. | ||
(all first steps with no ball detections are trimmed) | ||
""" | ||
# if the ball is not detected at the start, | ||
# removing the corresponding steps | ||
steps = _trim(steps) | ||
|
||
# sometimes there is the same ball id for two steps in a row, | ||
# i.e. new ball information did not come up yet during recording | ||
# setting these ball id to -1 | ||
_ball_id_duplicates(steps) | ||
|
||
# if the ball is not detected at the end, | ||
# also removing the corresponding steps | ||
steps.reverse() | ||
steps = _trim(steps) | ||
steps.reverse() | ||
|
||
init_steps = copy.deepcopy(steps) | ||
|
||
# computing missing position (via | ||
# linear interpolation) | ||
last_detection = 0 | ||
detecting = True | ||
for index, step in enumerate(steps): | ||
ball = step[1] | ||
if ball.ball_id >= 0: | ||
if not detecting: | ||
values = _interpolation( | ||
index - last_detection, | ||
steps[last_detection][1].position, | ||
ball.position, | ||
) | ||
for index_, position in enumerate(values): | ||
steps[index_ + last_detection][1].position = position | ||
last_detection = index | ||
detecting = True | ||
else: | ||
detecting = False | ||
|
||
# after computing the position, fixing the velocity | ||
_recompute_ball_velocity(steps) | ||
|
||
return steps | ||
|
||
# reading the file | ||
with open(o80_robot_ball_file, "r") as f: | ||
lines = f.readlines() | ||
|
||
# parsing the content of the file | ||
steps: typing.List[Step] | ||
steps = [_readline(line) for line in lines] | ||
|
||
# fixing the steps for which the ball | ||
# was not detected (ball_id < 0) | ||
# (fixing: performing linear interpolation to fill the gaps) | ||
steps = _fix_undetected_ball(steps) | ||
|
||
# casting to stamped trajectory | ||
start_time = steps[0][0] | ||
time_stamps = np.array([(step[0] - start_time) * 1e-3 for step in steps]) | ||
positions = np.array( | ||
[list(step[1].position) + list(step[2].position) for step in steps] | ||
) | ||
|
||
to_duration_trajectory((time_stamps, positions)) | ||
|
||
return time_stamps, positions | ||
|
||
def _read_folder(tennicam_path: pathlib.Path) -> StampedTrajectories: | ||
""" | ||
List all the file in tennicam_path that have the tennicam_ prefix, | ||
parse them and returns the corresponding list of stamped trajectories. | ||
""" | ||
files = _list_files(o80_robot_ball_path, prefix="o80_robot_ball_") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There seems to be a mismatch between the prefix mentioned in the docstring and the one actually used. |
||
stamped_trajectories = [ | ||
_read_trajectory(o80_robot_ball_path / f) for f in files | ||
] | ||
return stamped_trajectories | ||
|
||
def _save_trajectory( | ||
group: h5py._hl.group.Group, | ||
index: int, | ||
stamped_trajectory: StampedTrajectory, | ||
): | ||
""" | ||
Create in the group a new subgroup named according to the index | ||
and add to it 2 datasets, "time_stamps" (list of microseconds | ||
time stamps) and "trajectory" (list of corresponding 3d positions) | ||
""" | ||
# creating a new group for this trajectory | ||
traj_group = group.create_group(str(index)) | ||
# adding 2 datasets: time_stamps and positions | ||
time_stamps = stamped_trajectory[0] | ||
positions = stamped_trajectory[1] | ||
traj_group.create_dataset(self._TIME_STAMPS, data=time_stamps) | ||
traj_group.create_dataset(self._TRAJECTORY, data=positions) | ||
|
||
# reading all trajectories present in the directory | ||
stamped_trajectories = _read_folder(o80_robot_ball_path) | ||
|
||
# adding the new group to the hdf5 file | ||
group = self._f.create_group(group_name) | ||
|
||
# adding all trajectories as datasets to this group | ||
for index, stamped_trajectory in enumerate(stamped_trajectories): | ||
_save_trajectory(group, index, stamped_trajectory) | ||
|
||
return len(stamped_trajectories) | ||
|
||
def add_tennicam_trajectories( | ||
self, group_name: str, tennicam_path: pathlib.Path | ||
) -> int: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function could have a more descriptive name.