Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 45 additions & 25 deletions ncore/impl/common/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,17 @@ def time_bounds(timestamps_us: List[int], seek_sec: Optional[float], duration_se
end_timestamp_us = int(timestamps_us[-1])

if seek_sec is not None:
assert seek_sec >= 0.0, "Require positive seek time"
if seek_sec < 0.0:
raise ValueError("Require positive seek time")
start_timestamp_us += int(seek_sec * 1e6)

if duration_sec is not None:
assert duration_sec > 0.0, "Require positive duration time"
if duration_sec <= 0.0:
raise ValueError("Require positive duration time")
end_timestamp_us = start_timestamp_us + int(duration_sec * 1e6)

assert start_timestamp_us < end_timestamp_us, "Arguments lead to invalid time bounds"
if start_timestamp_us >= end_timestamp_us:
raise ValueError("Arguments lead to invalid time bounds")

return start_timestamp_us, end_timestamp_us

Expand All @@ -81,9 +84,12 @@ def from_start_end(start: int, end: int) -> HalfClosedInterval:

def __post_init__(self) -> None:
"""Makes sure interval is well-defined"""
assert isinstance(self.start, int)
assert isinstance(self.stop, int)
assert self.start <= self.stop
if not isinstance(self.start, int):
raise TypeError(f"start must be int, got {type(self.start).__name__}")
if not isinstance(self.stop, int):
raise TypeError(f"stop must be int, got {type(self.stop).__name__}")
if self.start > self.stop:
raise ValueError(f"start ({self.start}) must be <= stop ({self.stop})")

def __contains__(self, item: Union[int, np.integer, HalfClosedInterval]) -> bool:
"""Determines if an item / other interval is contained in the interval"""
Expand Down Expand Up @@ -157,12 +163,12 @@ def __init__(self, poses, timestamps):
"""

poses = np.asarray(poses)
assert poses.ndim == 3 and poses.shape[1:] == (4, 4) and np.issubdtype(poses.dtype, np.floating), (
"Invalid poses input"
)
if not (poses.ndim == 3 and poses.shape[1:] == (4, 4) and np.issubdtype(poses.dtype, np.floating)):
raise ValueError("Invalid poses input")

timestamps = np.asarray(timestamps)
assert timestamps.ndim == 1 and len(timestamps) > 1, "Invalid timestamps input"
if not (timestamps.ndim == 1 and len(timestamps) > 1):
raise ValueError("Invalid timestamps input")

self._poses = poses
self._timestamps = timestamps
Expand Down Expand Up @@ -482,9 +488,12 @@ def is_within_3d_bboxes(pc: np.ndarray, bboxes: np.ndarray) -> np.ndarray:
point_in_bboxes; [N,M] boolean array.
"""

assert np.shape(pc)[1] == 3, "Wrong PC input size"
assert np.ndim(bboxes) == 2, "bboxes need to be a 2D numpy array"
assert np.shape(bboxes)[1] == 9, "bboxes need to be a 2D numpy array"
if np.shape(pc)[1] != 3:
raise ValueError("Wrong PC input size")
if np.ndim(bboxes) != 2:
raise ValueError("bboxes need to be a 2D numpy array")
if np.shape(bboxes)[1] != 9:
raise ValueError("bboxes need to be a 2D numpy array")

centers = bboxes[..., 0:3]
dims = bboxes[..., 3:6]
Expand Down Expand Up @@ -593,16 +602,16 @@ def motion_compensate_points(
"""

# Sanity check timestamp consistency
assert len(xyz_pointtime) == len(timestamp_us)
if len(xyz_pointtime) != len(timestamp_us):
raise ValueError("Point timestamps not in frame time bounds")

if not len(xyz_pointtime):
return self.MotionCompensationResult(
np.empty_like(xyz_pointtime, shape=(0, 3)), np.empty_like(xyz_pointtime, shape=(0, 3))
)

assert frame_start_timestamp_us <= timestamp_us.min() and timestamp_us.max() <= frame_end_timestamp_us, (
"Point timestamps not in frame time bounds"
)
if not (frame_start_timestamp_us <= timestamp_us.min() and timestamp_us.max() <= frame_end_timestamp_us):
raise ValueError("Point timestamps not in frame time bounds")

# Interpolate egomotion at frame end timestamp for sensor reference pose at end-of-frame time
T_world_sensorRef = self._pose_graph.evaluate_poses(
Expand Down Expand Up @@ -649,14 +658,14 @@ def motion_decompensate_points(
"""

# Sanity check timestamp consistency
assert len(xyz_sensorend) == len(timestamp_us)
if len(xyz_sensorend) != len(timestamp_us):
raise ValueError("Point timestamps not in frame time bounds")

if not len(xyz_sensorend):
return np.empty_like(xyz_sensorend, shape=(0, 3))

assert frame_start_timestamp_us <= timestamp_us.min() and timestamp_us.max() <= frame_end_timestamp_us, (
"Point timestamps not in frame time bounds"
)
if not (frame_start_timestamp_us <= timestamp_us.min() and timestamp_us.max() <= frame_end_timestamp_us):
raise ValueError("Point timestamps not in frame time bounds")

# Construct relative pose from end-of-frame reference coordinate system to start-of-frame coordinate system
T_sensor_worlds = self._pose_graph.evaluate_poses(
Expand Down Expand Up @@ -712,11 +721,21 @@ def __init__(

self.interpolator: Optional[PoseInterpolator] = None
if timestamps_us is not None:
assert len(self.T_source_target.shape) == 3 and self.T_source_target.shape[0] == len(timestamps_us)
assert self.T_source_target.shape[1:] == (4, 4)
if not (len(self.T_source_target.shape) == 3 and self.T_source_target.shape[0] == len(timestamps_us)):
raise ValueError(
f"T_source_target must have shape (n, 4, 4) with n == len(timestamps_us), "
f"got shape {self.T_source_target.shape} with {len(timestamps_us)} timestamps"
)
if self.T_source_target.shape[1:] != (4, 4):
raise ValueError(
f"T_source_target must have shape (n, 4, 4), got {self.T_source_target.shape}"
)
self.interpolator = PoseInterpolator(self.T_source_target, timestamps_us)
else:
assert self.T_source_target.shape == (4, 4)
if self.T_source_target.shape != (4, 4):
raise ValueError(
f"Static T_source_target must have shape (4, 4), got {self.T_source_target.shape}"
)

def get_T(
self, source_node: str, target_node: str, timestamps_us: npt.NDArray[np.uint64]
Expand All @@ -733,7 +752,8 @@ def get_T(
in the data-type of the edge
"""

assert np.issubdtype(timestamps_us.dtype, np.integer), "Timestamps must be of integer type"
if not np.issubdtype(timestamps_us.dtype, np.integer):
raise TypeError("Timestamps must be of integer type")

batch_shape = timestamps_us.shape

Expand Down
4 changes: 2 additions & 2 deletions ncore/impl/common/transformations_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,11 @@ def setUp(self):
def test_init_graph(self):
"""Test to verify pose graph initialization / path computation is correct"""

with self.assertRaises(AssertionError):
with self.assertRaises(ValueError):
# invalid edge
PoseGraphInterpolator.Edge("V3", "V4", np.stack([np.eye(4), get_SE3(np.array([0, 0, 1]))]), None)

with self.assertRaises(AssertionError):
with self.assertRaises(ValueError):
# invalid edge
PoseGraphInterpolator.Edge("V3", "V4", get_SE3(np.array([0, 1, 0])), np.array([0, 10], dtype=np.uint64))

Expand Down
6 changes: 4 additions & 2 deletions ncore/impl/common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def _update_from_file(filename: UPath, hash: "Hash", chunk_size: int) -> "Hash":
Returns:
The updated hash object
"""
assert filename.is_file()
if not filename.is_file():
raise FileNotFoundError(f"File not found: {filename}")
with filename.open("rb") as f:
for chunk in iter(lambda: f.read(chunk_size), b""):
hash.update(chunk)
Expand Down Expand Up @@ -80,7 +81,8 @@ def _update_from_dir(directory: UPath, hash: "Hash", chunk_size: int) -> "Hash":
Returns:
The updated hash object
"""
assert directory.is_dir()
if not directory.is_dir():
raise FileNotFoundError(f"Directory not found: {directory}")
for path in sorted(directory.iterdir(), key=lambda p: str(p).lower()):
hash.update(path.name.encode())
if path.is_file():
Expand Down
5 changes: 2 additions & 3 deletions ncore/impl/data/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,8 @@ def get_closest_frame_index(self, timestamp_us: int, relative_frame_time: float
elif relative_frame_time == 1.0:
target_timestamps_us = self.frames_timestamps_us[:, 1]
else:
assert 0.0 <= relative_frame_time <= 1.0, (
f"relative_frame_time must be in [0, 1], got {relative_frame_time}"
)
if not (0.0 <= relative_frame_time <= 1.0):
raise ValueError(f"relative_frame_time must be in [0, 1], got {relative_frame_time}")
target_timestamps_us = (
self.frames_timestamps_us[:, 0]
+ relative_frame_time * (self.frames_timestamps_us[:, 1] - self.frames_timestamps_us[:, 0])
Expand Down
Loading
Loading