diff --git a/ncore/impl/common/transformations.py b/ncore/impl/common/transformations.py index a6877591..3dbab66a 100644 --- a/ncore/impl/common/transformations.py +++ b/ncore/impl/common/transformations.py @@ -50,14 +50,17 @@ def time_bounds(timestamps_us: List[int], seek_sec: Optional[float], duration_se end_timestamp_us = int(timestamps_us[-1]) if seek_sec is not None: - assert seek_sec >= 0.0, "Require positive seek time" + if seek_sec < 0.0: + raise ValueError("Require positive seek time") start_timestamp_us += int(seek_sec * 1e6) if duration_sec is not None: - assert duration_sec > 0.0, "Require positive duration time" + if duration_sec <= 0.0: + raise ValueError("Require positive duration time") end_timestamp_us = start_timestamp_us + int(duration_sec * 1e6) - assert start_timestamp_us < end_timestamp_us, "Arguments lead to invalid time bounds" + if start_timestamp_us >= end_timestamp_us: + raise ValueError("Arguments lead to invalid time bounds") return start_timestamp_us, end_timestamp_us @@ -81,9 +84,12 @@ def from_start_end(start: int, end: int) -> HalfClosedInterval: def __post_init__(self) -> None: """Makes sure interval is well-defined""" - assert isinstance(self.start, int) - assert isinstance(self.stop, int) - assert self.start <= self.stop + if not isinstance(self.start, int): + raise TypeError(f"start must be int, got {type(self.start).__name__}") + if not isinstance(self.stop, int): + raise TypeError(f"stop must be int, got {type(self.stop).__name__}") + if self.start > self.stop: + raise ValueError(f"start ({self.start}) must be <= stop ({self.stop})") def __contains__(self, item: Union[int, np.integer, HalfClosedInterval]) -> bool: """Determines if an item / other interval is contained in the interval""" @@ -157,12 +163,12 @@ def __init__(self, poses, timestamps): """ poses = np.asarray(poses) - assert poses.ndim == 3 and poses.shape[1:] == (4, 4) and np.issubdtype(poses.dtype, np.floating), ( - "Invalid poses input" - ) + if not (poses.ndim == 3 and poses.shape[1:] == (4, 4) and np.issubdtype(poses.dtype, np.floating)): + raise ValueError("Invalid poses input") timestamps = np.asarray(timestamps) - assert timestamps.ndim == 1 and len(timestamps) > 1, "Invalid timestamps input" + if not (timestamps.ndim == 1 and len(timestamps) > 1): + raise ValueError("Invalid timestamps input") self._poses = poses self._timestamps = timestamps @@ -482,9 +488,12 @@ def is_within_3d_bboxes(pc: np.ndarray, bboxes: np.ndarray) -> np.ndarray: point_in_bboxes; [N,M] boolean array. """ - assert np.shape(pc)[1] == 3, "Wrong PC input size" - assert np.ndim(bboxes) == 2, "bboxes need to be a 2D numpy array" - assert np.shape(bboxes)[1] == 9, "bboxes need to be a 2D numpy array" + if np.shape(pc)[1] != 3: + raise ValueError("Wrong PC input size") + if np.ndim(bboxes) != 2: + raise ValueError("bboxes need to be a 2D numpy array") + if np.shape(bboxes)[1] != 9: + raise ValueError("bboxes need to be a 2D numpy array") centers = bboxes[..., 0:3] dims = bboxes[..., 3:6] @@ -593,16 +602,16 @@ def motion_compensate_points( """ # Sanity check timestamp consistency - assert len(xyz_pointtime) == len(timestamp_us) + if len(xyz_pointtime) != len(timestamp_us): + raise ValueError("Point timestamps not in frame time bounds") if not len(xyz_pointtime): return self.MotionCompensationResult( np.empty_like(xyz_pointtime, shape=(0, 3)), np.empty_like(xyz_pointtime, shape=(0, 3)) ) - assert frame_start_timestamp_us <= timestamp_us.min() and timestamp_us.max() <= frame_end_timestamp_us, ( - "Point timestamps not in frame time bounds" - ) + if not (frame_start_timestamp_us <= timestamp_us.min() and timestamp_us.max() <= frame_end_timestamp_us): + raise ValueError("Point timestamps not in frame time bounds") # Interpolate egomotion at frame end timestamp for sensor reference pose at end-of-frame time T_world_sensorRef = self._pose_graph.evaluate_poses( @@ -649,14 +658,14 @@ def motion_decompensate_points( """ # Sanity check timestamp consistency - assert len(xyz_sensorend) == len(timestamp_us) + if len(xyz_sensorend) != len(timestamp_us): + raise ValueError("Point timestamps not in frame time bounds") if not len(xyz_sensorend): return np.empty_like(xyz_sensorend, shape=(0, 3)) - assert frame_start_timestamp_us <= timestamp_us.min() and timestamp_us.max() <= frame_end_timestamp_us, ( - "Point timestamps not in frame time bounds" - ) + if not (frame_start_timestamp_us <= timestamp_us.min() and timestamp_us.max() <= frame_end_timestamp_us): + raise ValueError("Point timestamps not in frame time bounds") # Construct relative pose from end-of-frame reference coordinate system to start-of-frame coordinate system T_sensor_worlds = self._pose_graph.evaluate_poses( @@ -712,11 +721,21 @@ def __init__( self.interpolator: Optional[PoseInterpolator] = None if timestamps_us is not None: - assert len(self.T_source_target.shape) == 3 and self.T_source_target.shape[0] == len(timestamps_us) - assert self.T_source_target.shape[1:] == (4, 4) + if not (len(self.T_source_target.shape) == 3 and self.T_source_target.shape[0] == len(timestamps_us)): + raise ValueError( + f"T_source_target must have shape (n, 4, 4) with n == len(timestamps_us), " + f"got shape {self.T_source_target.shape} with {len(timestamps_us)} timestamps" + ) + if self.T_source_target.shape[1:] != (4, 4): + raise ValueError( + f"T_source_target must have shape (n, 4, 4), got {self.T_source_target.shape}" + ) self.interpolator = PoseInterpolator(self.T_source_target, timestamps_us) else: - assert self.T_source_target.shape == (4, 4) + if self.T_source_target.shape != (4, 4): + raise ValueError( + f"Static T_source_target must have shape (4, 4), got {self.T_source_target.shape}" + ) def get_T( self, source_node: str, target_node: str, timestamps_us: npt.NDArray[np.uint64] @@ -733,7 +752,8 @@ def get_T( in the data-type of the edge """ - assert np.issubdtype(timestamps_us.dtype, np.integer), "Timestamps must be of integer type" + if not np.issubdtype(timestamps_us.dtype, np.integer): + raise TypeError("Timestamps must be of integer type") batch_shape = timestamps_us.shape diff --git a/ncore/impl/common/transformations_test.py b/ncore/impl/common/transformations_test.py index 6c30222e..02a15f04 100644 --- a/ncore/impl/common/transformations_test.py +++ b/ncore/impl/common/transformations_test.py @@ -334,11 +334,11 @@ def setUp(self): def test_init_graph(self): """Test to verify pose graph initialization / path computation is correct""" - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): # invalid edge PoseGraphInterpolator.Edge("V3", "V4", np.stack([np.eye(4), get_SE3(np.array([0, 0, 1]))]), None) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): # invalid edge PoseGraphInterpolator.Edge("V3", "V4", get_SE3(np.array([0, 1, 0])), np.array([0, 10], dtype=np.uint64)) diff --git a/ncore/impl/common/util.py b/ncore/impl/common/util.py index 8f88d0dd..f26dd711 100644 --- a/ncore/impl/common/util.py +++ b/ncore/impl/common/util.py @@ -44,7 +44,8 @@ def _update_from_file(filename: UPath, hash: "Hash", chunk_size: int) -> "Hash": Returns: The updated hash object """ - assert filename.is_file() + if not filename.is_file(): + raise FileNotFoundError(f"File not found: {filename}") with filename.open("rb") as f: for chunk in iter(lambda: f.read(chunk_size), b""): hash.update(chunk) @@ -80,7 +81,8 @@ def _update_from_dir(directory: UPath, hash: "Hash", chunk_size: int) -> "Hash": Returns: The updated hash object """ - assert directory.is_dir() + if not directory.is_dir(): + raise FileNotFoundError(f"Directory not found: {directory}") for path in sorted(directory.iterdir(), key=lambda p: str(p).lower()): hash.update(path.name.encode()) if path.is_file(): diff --git a/ncore/impl/data/compat.py b/ncore/impl/data/compat.py index 2166685f..f142b605 100644 --- a/ncore/impl/data/compat.py +++ b/ncore/impl/data/compat.py @@ -396,9 +396,8 @@ def get_closest_frame_index(self, timestamp_us: int, relative_frame_time: float elif relative_frame_time == 1.0: target_timestamps_us = self.frames_timestamps_us[:, 1] else: - assert 0.0 <= relative_frame_time <= 1.0, ( - f"relative_frame_time must be in [0, 1], got {relative_frame_time}" - ) + if not (0.0 <= relative_frame_time <= 1.0): + raise ValueError(f"relative_frame_time must be in [0, 1], got {relative_frame_time}") target_timestamps_us = ( self.frames_timestamps_us[:, 0] + relative_frame_time * (self.frames_timestamps_us[:, 1] - self.frames_timestamps_us[:, 0]) diff --git a/ncore/impl/data/types.py b/ncore/impl/data/types.py index fdfc36d2..3aab1c48 100644 --- a/ncore/impl/data/types.py +++ b/ncore/impl/data/types.py @@ -127,19 +127,28 @@ def type() -> str: def __post_init__(self): # Sanity checks - assert isinstance(self.reference_poly, ReferencePolynomial) + if not isinstance(self.reference_poly, ReferencePolynomial): + raise TypeError(f"reference_poly must be ReferencePolynomial, got {type(self.reference_poly).__name__}") - assert self.horizontal_poly.ndim == 1 - assert self.horizontal_poly.dtype == np.dtype("float32") + if self.horizontal_poly.ndim != 1: + raise ValueError(f"horizontal_poly must be 1-dimensional, got ndim={self.horizontal_poly.ndim}") + if self.horizontal_poly.dtype != np.dtype("float32"): + raise ValueError(f"horizontal_poly must have dtype float32, got {self.horizontal_poly.dtype}") - assert self.vertical_poly.ndim == 1 - assert self.horizontal_poly.dtype == np.dtype("float32") + if self.vertical_poly.ndim != 1: + raise ValueError(f"vertical_poly must be 1-dimensional, got ndim={self.vertical_poly.ndim}") + if self.vertical_poly.dtype != np.dtype("float32"): + raise ValueError(f"vertical_poly must have dtype float32, got {self.vertical_poly.dtype}") - assert self.horizontal_poly_inverse.ndim == 1 - assert self.horizontal_poly.dtype == np.dtype("float32") + if self.horizontal_poly_inverse.ndim != 1: + raise ValueError(f"horizontal_poly_inverse must be 1-dimensional, got ndim={self.horizontal_poly_inverse.ndim}") + if self.horizontal_poly_inverse.dtype != np.dtype("float32"): + raise ValueError(f"horizontal_poly_inverse must have dtype float32, got {self.horizontal_poly_inverse.dtype}") - assert self.vertical_poly_inverse.ndim == 1 - assert self.horizontal_poly.dtype == np.dtype("float32") + if self.vertical_poly_inverse.ndim != 1: + raise ValueError(f"vertical_poly_inverse must be 1-dimensional, got ndim={self.vertical_poly_inverse.ndim}") + if self.vertical_poly_inverse.dtype != np.dtype("float32"): + raise ValueError(f"vertical_poly_inverse must have dtype float32, got {self.vertical_poly_inverse.dtype}") # Represents the collection of all concrete external distortion types @@ -185,15 +194,20 @@ def transform( def __post_init__(self): # Sanity checks - assert self.resolution.shape == (2,) - assert self.resolution.dtype == np.dtype("uint64") - assert self.resolution[0] > 0 and self.resolution[1] > 0 + if self.resolution.shape != (2,): + raise ValueError(f"resolution must have shape (2,), got {self.resolution.shape}") + if self.resolution.dtype != np.dtype("uint64"): + raise ValueError(f"resolution must have dtype uint64, got {self.resolution.dtype}") + if not (self.resolution[0] > 0 and self.resolution[1] > 0): + raise ValueError(f"resolution elements must be > 0, got {self.resolution}") if not isinstance(self.shutter_type, ShutterType): self.shutter_type = ShutterType(self.shutter_type) - assert self.shutter_type in ShutterType.__members__.values() + if self.shutter_type not in ShutterType.__members__.values(): + raise ValueError(f"shutter_type must be a valid ShutterType, got {self.shutter_type}") - assert isinstance(self.external_distortion_parameters, (type(None), ConcreteExternalDistortionParametersUnion)) + if not isinstance(self.external_distortion_parameters, (type(None), ConcreteExternalDistortionParametersUnion)): + raise TypeError(f"external_distortion_parameters must be None or ConcreteExternalDistortionParametersUnion, got {type(self.external_distortion_parameters).__name__}") @dataclass @@ -246,20 +260,29 @@ def fw_poly(self) -> np.ndarray: def __post_init__(self): # Sanity checks super().__post_init__() - assert self.principal_point.shape == (2,) - assert self.principal_point.dtype == np.dtype("float32") + if self.principal_point.shape != (2,): + raise ValueError(f"principal_point must have shape (2,), got {self.principal_point.shape}") + if self.principal_point.dtype != np.dtype("float32"): + raise ValueError(f"principal_point must have dtype float32, got {self.principal_point.dtype}") if not isinstance(self.reference_poly, FThetaCameraModelParameters.PolynomialType): self.reference_poly = FThetaCameraModelParameters.PolynomialType(self.reference_poly) - assert self.reference_poly in FThetaCameraModelParameters.PolynomialType.__members__.values() - - assert self.pixeldist_to_angle_poly.ndim == 1 - assert len(self.pixeldist_to_angle_poly) <= self.POLYNOMIAL_DEGREE - assert self.pixeldist_to_angle_poly.dtype == np.dtype("float32") - - assert self.angle_to_pixeldist_poly.ndim == 1 - assert len(self.angle_to_pixeldist_poly) <= self.POLYNOMIAL_DEGREE - assert self.angle_to_pixeldist_poly.dtype == np.dtype("float32") + if self.reference_poly not in FThetaCameraModelParameters.PolynomialType.__members__.values(): + raise ValueError(f"reference_poly must be a valid PolynomialType, got {self.reference_poly}") + + if self.pixeldist_to_angle_poly.ndim != 1: + raise ValueError(f"pixeldist_to_angle_poly must be 1-dimensional, got ndim={self.pixeldist_to_angle_poly.ndim}") + if len(self.pixeldist_to_angle_poly) > self.POLYNOMIAL_DEGREE: + raise ValueError(f"pixeldist_to_angle_poly length must be <= {self.POLYNOMIAL_DEGREE}, got {len(self.pixeldist_to_angle_poly)}") + if self.pixeldist_to_angle_poly.dtype != np.dtype("float32"): + raise ValueError(f"pixeldist_to_angle_poly must have dtype float32, got {self.pixeldist_to_angle_poly.dtype}") + + if self.angle_to_pixeldist_poly.ndim != 1: + raise ValueError(f"angle_to_pixeldist_poly must be 1-dimensional, got ndim={self.angle_to_pixeldist_poly.ndim}") + if len(self.angle_to_pixeldist_poly) > self.POLYNOMIAL_DEGREE: + raise ValueError(f"angle_to_pixeldist_poly length must be <= {self.POLYNOMIAL_DEGREE}, got {len(self.angle_to_pixeldist_poly)}") + if self.angle_to_pixeldist_poly.dtype != np.dtype("float32"): + raise ValueError(f"angle_to_pixeldist_poly must have dtype float32, got {self.angle_to_pixeldist_poly.dtype}") # pad polynomials to full size self.pixeldist_to_angle_poly = np.pad( @@ -275,10 +298,13 @@ def __post_init__(self): constant_values=0.0, ) - assert self.max_angle > 0.0 + if self.max_angle <= 0.0: + raise ValueError(f"max_angle must be > 0.0, got {self.max_angle}") - assert self.linear_cde.shape == (3,) - assert self.linear_cde.dtype == np.dtype("float32") + if self.linear_cde.shape != (3,): + raise ValueError(f"linear_cde must have shape (3,), got {self.linear_cde.shape}") + if self.linear_cde.dtype != np.dtype("float32"): + raise ValueError(f"linear_cde must have dtype float32, got {self.linear_cde.dtype}") # some datasets might store _invalid_ linear terms (all zero) - workaround by setting these to default linear term if np.allclose(self.linear_cde, 0.0): @@ -319,7 +345,8 @@ def transform( # Otherwise make sure the scaled resolution is integer else: resolution = self.resolution * image_domain_scale_factors - assert all([r.is_integer() for r in resolution]), "Resolution must be integer after scaling" + if not all([r.is_integer() for r in resolution]): + raise ValueError(f"Resolution must be integer after scaling, got {resolution}") # Scale / offset principal point location by transforming it in the scaled image (make sure to account for 0.5px offset # of the image domain, as the stored parameters are represented with (0,0) at the center of the first pixel) @@ -395,21 +422,32 @@ def type() -> str: def __post_init__(self): # Sanity checks super().__post_init__() - assert self.principal_point.shape == (2,) - assert self.principal_point.dtype == np.dtype("float32") - - assert self.focal_length.shape == (2,) - assert self.focal_length.dtype == np.dtype("float32") - assert self.focal_length[0] > 0.0 and self.focal_length[1] > 0.0 - - assert self.radial_coeffs.shape == (6,) - assert self.radial_coeffs.dtype == np.dtype("float32") - - assert self.tangential_coeffs.shape == (2,) - assert self.tangential_coeffs.dtype == np.dtype("float32") - - assert self.thin_prism_coeffs.shape == (4,) - assert self.thin_prism_coeffs.dtype == np.dtype("float32") + if self.principal_point.shape != (2,): + raise ValueError(f"principal_point must have shape (2,), got {self.principal_point.shape}") + if self.principal_point.dtype != np.dtype("float32"): + raise ValueError(f"principal_point must have dtype float32, got {self.principal_point.dtype}") + + if self.focal_length.shape != (2,): + raise ValueError(f"focal_length must have shape (2,), got {self.focal_length.shape}") + if self.focal_length.dtype != np.dtype("float32"): + raise ValueError(f"focal_length must have dtype float32, got {self.focal_length.dtype}") + if not (self.focal_length[0] > 0.0 and self.focal_length[1] > 0.0): + raise ValueError(f"focal_length elements must be > 0.0, got {self.focal_length}") + + if self.radial_coeffs.shape != (6,): + raise ValueError(f"radial_coeffs must have shape (6,), got {self.radial_coeffs.shape}") + if self.radial_coeffs.dtype != np.dtype("float32"): + raise ValueError(f"radial_coeffs must have dtype float32, got {self.radial_coeffs.dtype}") + + if self.tangential_coeffs.shape != (2,): + raise ValueError(f"tangential_coeffs must have shape (2,), got {self.tangential_coeffs.shape}") + if self.tangential_coeffs.dtype != np.dtype("float32"): + raise ValueError(f"tangential_coeffs must have dtype float32, got {self.tangential_coeffs.dtype}") + + if self.thin_prism_coeffs.shape != (4,): + raise ValueError(f"thin_prism_coeffs must have shape (4,), got {self.thin_prism_coeffs.shape}") + if self.thin_prism_coeffs.dtype != np.dtype("float32"): + raise ValueError(f"thin_prism_coeffs must have dtype float32, got {self.thin_prism_coeffs.dtype}") def transform( self, @@ -446,7 +484,8 @@ def transform( # Otherwise make sure the scaled resolution is integer else: resolution = self.resolution * image_domain_scale_factors - assert all([r.is_integer() for r in resolution]), "Resolution must be integer after scaling" + if not all([r.is_integer() for r in resolution]): + raise ValueError(f"Resolution must be integer after scaling, got {resolution}") return dataclasses.replace( self, @@ -482,17 +521,25 @@ def type() -> str: def __post_init__(self): # Sanity checks super().__post_init__() - assert self.principal_point.shape == (2,) - assert self.principal_point.dtype == np.dtype("float32") - - assert self.focal_length.shape == (2,) - assert self.focal_length.dtype == np.dtype("float32") - assert self.focal_length[0] > 0.0 and self.focal_length[1] > 0.0 - - assert self.radial_coeffs.shape == (4,) - assert self.radial_coeffs.dtype == np.dtype("float32") - - assert self.max_angle > 0.0 + if self.principal_point.shape != (2,): + raise ValueError(f"principal_point must have shape (2,), got {self.principal_point.shape}") + if self.principal_point.dtype != np.dtype("float32"): + raise ValueError(f"principal_point must have dtype float32, got {self.principal_point.dtype}") + + if self.focal_length.shape != (2,): + raise ValueError(f"focal_length must have shape (2,), got {self.focal_length.shape}") + if self.focal_length.dtype != np.dtype("float32"): + raise ValueError(f"focal_length must have dtype float32, got {self.focal_length.dtype}") + if not (self.focal_length[0] > 0.0 and self.focal_length[1] > 0.0): + raise ValueError(f"focal_length elements must be > 0.0, got {self.focal_length}") + + if self.radial_coeffs.shape != (4,): + raise ValueError(f"radial_coeffs must have shape (4,), got {self.radial_coeffs.shape}") + if self.radial_coeffs.dtype != np.dtype("float32"): + raise ValueError(f"radial_coeffs must have dtype float32, got {self.radial_coeffs.dtype}") + + if self.max_angle <= 0.0: + raise ValueError(f"max_angle must be > 0.0, got {self.max_angle}") def transform( self, @@ -529,7 +576,8 @@ def transform( # Otherwise make sure the scaled resolution is integer else: resolution = self.resolution * image_domain_scale_factors - assert all([r.is_integer() for r in resolution]), "Resolution must be integer after scaling" + if not all([r.is_integer() for r in resolution]): + raise ValueError(f"Resolution must be integer after scaling, got {resolution}") return dataclasses.replace( self, @@ -613,8 +661,10 @@ class BaseSpinningLidarModelParameters(BaseLidarModelParameters): def __post_init__(self): # Sanity checks - assert self.spinning_frequency_hz > 0.0 - assert self.spinning_direction in ["cw", "ccw"] + if self.spinning_frequency_hz <= 0.0: + raise ValueError(f"spinning_frequency_hz must be > 0.0, got {self.spinning_frequency_hz}") + if self.spinning_direction not in ["cw", "ccw"]: + raise ValueError(f"spinning_direction must be 'cw' or 'ccw', got '{self.spinning_direction}'") @dataclass() @@ -629,8 +679,10 @@ class BaseStructuredSpinningLidarModelParameters(BaseSpinningLidarModelParameter def __post_init__(self): # Sanity checks - assert self.n_rows > 0 - assert self.n_columns > 0 + if self.n_rows <= 0: + raise ValueError(f"n_rows must be > 0, got {self.n_rows}") + if self.n_columns <= 0: + raise ValueError(f"n_columns must be > 0, got {self.n_columns}") @dataclass() @@ -655,32 +707,34 @@ class RowOffsetStructuredSpinningLidarModelParameters( def __post_init__(self): # Sanity checks - assert self.row_elevations_rad.dtype == np.float32 - assert self.row_elevations_rad.shape == (self.n_rows,) - assert self.row_azimuth_offsets_rad.dtype == np.float32 - assert self.row_azimuth_offsets_rad.shape == (self.n_rows,) - assert self.column_azimuths_rad.dtype == np.float32 - assert self.column_azimuths_rad.shape == (self.n_columns,) + if self.row_elevations_rad.dtype != np.float32: + raise ValueError(f"row_elevations_rad must have dtype float32, got {self.row_elevations_rad.dtype}") + if self.row_elevations_rad.shape != (self.n_rows,): + raise ValueError(f"row_elevations_rad must have shape ({self.n_rows},), got {self.row_elevations_rad.shape}") + if self.row_azimuth_offsets_rad.dtype != np.float32: + raise ValueError(f"row_azimuth_offsets_rad must have dtype float32, got {self.row_azimuth_offsets_rad.dtype}") + if self.row_azimuth_offsets_rad.shape != (self.n_rows,): + raise ValueError(f"row_azimuth_offsets_rad must have shape ({self.n_rows},), got {self.row_azimuth_offsets_rad.shape}") + if self.column_azimuths_rad.dtype != np.float32: + raise ValueError(f"column_azimuths_rad must have dtype float32, got {self.column_azimuths_rad.dtype}") + if self.column_azimuths_rad.shape != (self.n_columns,): + raise ValueError(f"column_azimuths_rad must have shape ({self.n_columns},), got {self.column_azimuths_rad.shape}") # Check elevation angles are sorted consistently relative_row_elevations_rad = util.relative_angle(self.row_elevations_rad[0], self.row_elevations_rad, "cw") - assert np.all(np.diff(relative_row_elevations_rad.relative_angle_rad) > 0), ( - "Row elevation angles must be sorted in descending order (cw)" - ) - assert np.all(~relative_row_elevations_rad.wrap_around_flag), ( - "Row elevation angles must not wrap around the start element" - ) + if not np.all(np.diff(relative_row_elevations_rad.relative_angle_rad) > 0): + raise ValueError("Row elevation angles must be sorted in descending order (cw)") + if not np.all(~relative_row_elevations_rad.wrap_around_flag): + raise ValueError("Row elevation angles must not wrap around the start element") # Check order of column azimuth angles is consistent with spinning direction relative_column_azimuths_rad = util.relative_angle( self.column_azimuths_rad[0], self.column_azimuths_rad, self.spinning_direction ) - assert np.all(np.diff(relative_column_azimuths_rad.relative_angle_rad) > 0), ( - "Column azimuth angles must be sorted in the spinning direction so the diff between relative angles of consecutive columns should always be positive" - ) - assert np.all(~relative_row_elevations_rad.wrap_around_flag), ( - "Column azimuth angles (without offsets) must not wrap around the start element" - ) + if not np.all(np.diff(relative_column_azimuths_rad.relative_angle_rad) > 0): + raise ValueError("Column azimuth angles must be sorted in the spinning direction so the diff between relative angles of consecutive columns should always be positive") + if not np.all(~relative_row_elevations_rad.wrap_around_flag): + raise ValueError("Column azimuth angles (without offsets) must not wrap around the start element") @staticmethod def type() -> str: @@ -776,12 +830,18 @@ def from_array(cls, array: np.ndarray) -> BBox3: def __post_init__(self): # Sanity checks - assert isinstance(self.centroid, tuple) - assert all(isinstance(i, float) for i in self.centroid) - assert isinstance(self.dim, tuple) - assert all(isinstance(i, float) for i in self.dim) - assert isinstance(self.rot, tuple) - assert all(isinstance(i, float) for i in self.rot) + if not isinstance(self.centroid, tuple): + raise TypeError(f"centroid must be tuple, got {type(self.centroid).__name__}") + if not all(isinstance(i, float) for i in self.centroid): + raise TypeError(f"centroid elements must be float, got {[type(i).__name__ for i in self.centroid]}") + if not isinstance(self.dim, tuple): + raise TypeError(f"dim must be tuple, got {type(self.dim).__name__}") + if not all(isinstance(i, float) for i in self.dim): + raise TypeError(f"dim elements must be float, got {[type(i).__name__ for i in self.dim]}") + if not isinstance(self.rot, tuple): + raise TypeError(f"rot must be tuple, got {type(self.rot).__name__}") + if not all(isinstance(i, float) for i in self.rot): + raise TypeError(f"rot elements must be float, got {[type(i).__name__ for i in self.rot]}") # --------------------------------------------------------------------------- @@ -886,12 +946,14 @@ class LabelType(dataclasses_json.DataClassJsonMixin): def __post_init__(self): # Sanity checks - assert isinstance(self.category, LabelCategory) - assert isinstance(self.qualifier, str) - assert len(self.qualifier) > 0, ( - "Qualifier should be a non-empty string to avoid confusion with default LabelType" - ) - assert self.unit is None or isinstance(self.unit, LabelUnit) + if not isinstance(self.category, LabelCategory): + raise TypeError(f"category must be LabelCategory, got {type(self.category).__name__}") + if not isinstance(self.qualifier, str): + raise TypeError(f"qualifier must be str, got {type(self.qualifier).__name__}") + if len(self.qualifier) == 0: + raise ValueError("Qualifier should be a non-empty string to avoid confusion with default LabelType") + if not (self.unit is None or isinstance(self.unit, LabelUnit)): + raise TypeError(f"unit must be None or LabelUnit, got {type(self.unit).__name__}") # Well-known constants (assigned after class definition) DEPTH_Z_M: ClassVar[LabelType] @@ -943,12 +1005,10 @@ class QuantizationParams(dataclasses_json.DataClassJsonMixin): ) #: Numpy dtype for intermediate arithmetic during (de-)quantization def __post_init__(self): - assert np.issubdtype(self.quantized_dtype, np.integer), ( - f"quantized_dtype must be an integer type, got {self.quantized_dtype}" - ) - assert np.issubdtype(self.intermediate_dtype, np.floating), ( - f"intermediate_dtype must be a floating type, got {self.intermediate_dtype}" - ) + if not np.issubdtype(self.quantized_dtype, np.integer): + raise ValueError(f"quantized_dtype must be an integer type, got {self.quantized_dtype}") + if not np.issubdtype(self.intermediate_dtype, np.floating): + raise ValueError(f"intermediate_dtype must be a floating type, got {self.intermediate_dtype}") @dataclass(**({"slots": True, "frozen": True} if sys.version_info >= (3, 10) else {"frozen": True})) @@ -968,16 +1028,23 @@ class LabelSchema(dataclasses_json.DataClassJsonMixin): def __post_init__(self): # Sanity checks - assert isinstance(self.dtype, np.dtype) - assert isinstance(self.shape_suffix, tuple) and all(isinstance(i, int) for i in self.shape_suffix) - assert isinstance(self.encoding, LabelEncoding) + if not isinstance(self.dtype, np.dtype): + raise TypeError(f"dtype must be np.dtype, got {type(self.dtype).__name__}") + if not (isinstance(self.shape_suffix, tuple) and all(isinstance(i, int) for i in self.shape_suffix)): + raise TypeError(f"shape_suffix must be a tuple of ints, got {self.shape_suffix}") + if not isinstance(self.encoding, LabelEncoding): + raise TypeError(f"encoding must be LabelEncoding, got {type(self.encoding).__name__}") if self.encoding == LabelEncoding.IMAGE_ENCODED: - assert self.encoded_format is not None, "encoded_format must be provided when encoding is IMAGE_ENCODED" + if self.encoded_format is None: + raise ValueError("encoded_format must be provided when encoding is IMAGE_ENCODED") else: - assert self.encoded_format is None, "encoded_format should only be provided when encoding is IMAGE_ENCODED" + if self.encoded_format is not None: + raise ValueError("encoded_format should only be provided when encoding is IMAGE_ENCODED") if self.quantization is not None: - assert isinstance(self.quantization, QuantizationParams) - assert self.encoding == LabelEncoding.RAW, "Quantization is only supported for RAW encoding" + if not isinstance(self.quantization, QuantizationParams): + raise TypeError(f"quantization must be QuantizationParams, got {type(self.quantization).__name__}") + if self.encoding != LabelEncoding.RAW: + raise ValueError("Quantization is only supported for RAW encoding") @dataclass @@ -1050,18 +1117,26 @@ def transform( def __post_init__(self): # Sanity checks - assert isinstance(self.track_id, str) - assert isinstance(self.class_id, str) - assert isinstance(self.reference_frame_id, str) - assert isinstance(self.reference_frame_timestamp_us, int) - assert isinstance(self.bbox3, BBox3) - assert isinstance(self.timestamp_us, int) + if not isinstance(self.track_id, str): + raise TypeError(f"track_id must be str, got {type(self.track_id).__name__}") + if not isinstance(self.class_id, str): + raise TypeError(f"class_id must be str, got {type(self.class_id).__name__}") + if not isinstance(self.reference_frame_id, str): + raise TypeError(f"reference_frame_id must be str, got {type(self.reference_frame_id).__name__}") + if not isinstance(self.reference_frame_timestamp_us, int): + raise TypeError(f"reference_frame_timestamp_us must be int, got {type(self.reference_frame_timestamp_us).__name__}") + if not isinstance(self.bbox3, BBox3): + raise TypeError(f"bbox3 must be BBox3, got {type(self.bbox3).__name__}") + if not isinstance(self.timestamp_us, int): + raise TypeError(f"timestamp_us must be int, got {type(self.timestamp_us).__name__}") if not isinstance(self.source, LabelSource): self.source = LabelSource(self.source) - assert self.source in LabelSource.__members__.values() + if self.source not in LabelSource.__members__.values(): + raise ValueError(f"source must be a valid LabelSource, got {self.source}") - assert isinstance(self.source_version, (type(None), str)) + if not isinstance(self.source_version, (type(None), str)): + raise TypeError(f"source_version must be None or str, got {type(self.source_version).__name__}") @unique @@ -1278,7 +1353,11 @@ def default_instance_name(self) -> str: def __post_init__(self): # Sanity checks - assert isinstance(self.camera_id, str) and len(self.camera_id) > 0, "camera_id should be a non-empty string" - assert isinstance(self.label_type, LabelType) - assert isinstance(self.label_schema, LabelSchema) - assert isinstance(self.label_source, LabelSource) + if not (isinstance(self.camera_id, str) and len(self.camera_id) > 0): + raise ValueError("camera_id should be a non-empty string") + if not isinstance(self.label_type, LabelType): + raise TypeError(f"label_type must be LabelType, got {type(self.label_type).__name__}") + if not isinstance(self.label_schema, LabelSchema): + raise TypeError(f"label_schema must be LabelSchema, got {type(self.label_schema).__name__}") + if not isinstance(self.label_source, LabelSource): + raise TypeError(f"label_source must be LabelSource, got {type(self.label_source).__name__}") diff --git a/ncore/impl/data/v4/components.py b/ncore/impl/data/v4/components.py index 6ae409f2..236a78bf 100644 --- a/ncore/impl/data/v4/components.py +++ b/ncore/impl/data/v4/components.py @@ -267,7 +267,8 @@ def register_component_writer( can infer the correct extra-argument signature from the concrete writer class that is passed in.""" - assert len(component_instance_name) > 0, "Component instance name must not be empty" + if not len(component_instance_name) > 0: + raise ValueError("Component instance name must not be empty") # The signature uses Callable[Concatenate[...], CW] for type-safe kwargs # inference, but we also need access to ComponentWriter static class attributes. @@ -277,7 +278,8 @@ class that is passed in.""" component_base_name = writer_cls.get_component_name() component_id = f"{component_base_name}:{component_instance_name}" - assert component_id not in self._component_writers, f"Component writer for {component_id} already registered" + if component_id in self._component_writers: + raise ValueError(f"Component writer for {component_id} already registered") # Create the component in the requested group, separated by component base name component_group = ( @@ -359,7 +361,8 @@ def __init__( component_group_upaths: List[UPath] = self.expand_component_group_paths(component_group_paths) - assert len(component_group_upaths), "No component inputs provided" + if not len(component_group_upaths): + raise ValueError("No component inputs provided") # Load component stores concurrently (to hide latency) and check for sequence consistency self._component_stores: Dict[str, Tuple[zarr.Group, UPath]] = {} # use str as the generic path / URL type @@ -620,8 +623,9 @@ def generic_meta_data(self) -> Dict[str, types.JsonLike]: def validate_frame_name(name: str) -> str: - """Checks if the given name is a valid frame name (non-empty, no whitespace), returns it if valid, raises AssertionError otherwise""" - assert len(name) and not name.isspace(), f"Frame '{name}' is invalid, must not be empty or contain whitespace" + """Checks if the given name is a valid frame name (non-empty, no whitespace), returns it if valid, raises ValueError otherwise""" + if not (len(name) and not name.isspace()): + raise ValueError(f"Frame '{name}' is invalid, must not be empty or contain whitespace") return name @@ -667,15 +671,20 @@ def store_static_pose( Makes sure the inverse transformation is not already stored.""" # Sanity checks - assert pose.shape == (4, 4) - assert np.issubdtype(pose.dtype, np.floating), "Poses must be of float type" - assert np.all(pose[3, :] == [0.0, 0.0, 0.0, 1.0]), "Invalid SE3 transformation" + if pose.shape != (4, 4): + raise ValueError(f"Pose must have shape (4, 4), got {pose.shape}") + if not np.issubdtype(pose.dtype, np.floating): + raise TypeError("Poses must be of float type") + if not np.all(pose[3, :] == [0.0, 0.0, 0.0, 1.0]): + raise ValueError("Invalid SE3 transformation") key = (validate_frame_name(source_frame_id), validate_frame_name(target_frame_id)) inv_key = key[::-1] - assert key not in self.data["static_poses"], f"Static pose {key} already exists" - assert inv_key not in self.data["static_poses"], f"Inverse static pose {inv_key} already exists" + if key in self.data["static_poses"]: + raise ValueError(f"Static pose {key} already exists") + if inv_key in self.data["static_poses"]: + raise ValueError(f"Inverse static pose {inv_key} already exists") self.data["static_poses"][str(key)] = {"pose": pose.tolist(), "dtype": str(pose.dtype)} @@ -699,39 +708,52 @@ def store_dynamic_pose( Makes sure the inverse transformation is not already stored.""" # Sanity / timestamp consistency checks - assert poses.shape[1:] == (4, 4) - assert np.issubdtype(poses.dtype, np.floating), "Poses must be of float type" - assert np.all(poses[:, 3, :] == [0.0, 0.0, 0.0, 1.0]), "Invalid SE3 transformations" + if poses.shape[1:] != (4, 4): + raise ValueError(f"Poses must have shape (N, 4, 4), got trailing shape {poses.shape[1:]}") + if not np.issubdtype(poses.dtype, np.floating): + raise TypeError("Poses must be of float type") + if not np.all(poses[:, 3, :] == [0.0, 0.0, 0.0, 1.0]): + raise ValueError("Invalid SE3 transformations") - assert timestamps_us.ndim == 1 - assert timestamps_us.dtype == np.dtype("uint64") + if timestamps_us.ndim != 1: + raise ValueError(f"timestamps_us must be 1-D, got ndim={timestamps_us.ndim}") + if timestamps_us.dtype != np.dtype("uint64"): + raise TypeError(f"timestamps_us must have dtype uint64, got {timestamps_us.dtype}") - assert len(poses) == len(timestamps_us) + if len(poses) != len(timestamps_us): + raise ValueError(f"poses length {len(poses)} != timestamps_us length {len(timestamps_us)}") - assert len(poses) > 1, "At least two poses required for a dynamic pose trajectory to support interpolation" + if not len(poses) > 1: + raise ValueError("At least two poses required for a dynamic pose trajectory to support interpolation") - assert np.all(np.diff(timestamps_us) > 0), "Timestamps must be strictly increasing" + if not np.all(np.diff(timestamps_us) > 0): + raise ValueError("Timestamps must be strictly increasing") - assert ( + if not ( timestamps_us[0].item() in self._sequence_timestamp_interval_us and timestamps_us[-1].item() in self._sequence_timestamp_interval_us - ), "Dynamic poses samples must be fully contained in the sequence time range" + ): + raise ValueError("Dynamic poses samples must be fully contained in the sequence time range") if require_sequence_time_coverage: - assert timestamps_us[0] == self._sequence_timestamp_interval_us.start, ( - "Dynamic poses must cover the full sequence time range - " - "the first timestamp is after the sequence start time" - ) - assert timestamps_us[-1] == self._sequence_timestamp_interval_us.stop - 1, ( - "Dynamic poses must cover the full sequence time range - " - "the last timestamp is before the sequence end time" - ) + if timestamps_us[0] != self._sequence_timestamp_interval_us.start: + raise ValueError( + "Dynamic poses must cover the full sequence time range - " + "the first timestamp is after the sequence start time" + ) + if timestamps_us[-1] != self._sequence_timestamp_interval_us.stop - 1: + raise ValueError( + "Dynamic poses must cover the full sequence time range - " + "the last timestamp is before the sequence end time" + ) key = (validate_frame_name(source_frame_id), validate_frame_name(target_frame_id)) inv_key = key[::-1] - assert key not in self.data["dynamic_poses"], f"Dynamic poses {key} already exists" - assert inv_key not in self.data["dynamic_poses"], f"Inverse dynamic poses {inv_key} already exists" + if key in self.data["dynamic_poses"]: + raise ValueError(f"Dynamic poses {key} already exists") + if inv_key in self.data["dynamic_poses"]: + raise ValueError(f"Inverse dynamic poses {inv_key} already exists") self.data["dynamic_poses"][str(key)] = { "poses": poses.tolist(), @@ -1021,24 +1043,24 @@ def _store_base_frame( generic_meta_data: Dict[str, types.JsonLike], ) -> zarr.Group: # Sanity / timestamp consistency checks - assert np.shape(frame_timestamps_us) == (2,) - assert frame_timestamps_us.dtype == np.dtype("uint64") - assert frame_timestamps_us[1] >= frame_timestamps_us[0] - - assert frame_timestamps_us[0].item() in self._sequence_timestamp_interval_us, ( - "Frame start timestamp must be contained in the sequence time range" - ) - assert frame_timestamps_us[1].item() in self._sequence_timestamp_interval_us, ( - "Frame end timestamp must be contained in the sequence time range" - ) + if np.shape(frame_timestamps_us) != (2,): + raise ValueError(f"frame_timestamps_us must have shape (2,), got {np.shape(frame_timestamps_us)}") + if frame_timestamps_us.dtype != np.dtype("uint64"): + raise TypeError(f"frame_timestamps_us must have dtype uint64, got {frame_timestamps_us.dtype}") + if not frame_timestamps_us[1] >= frame_timestamps_us[0]: + raise ValueError("Frame end timestamp must be >= frame start timestamp") + + if frame_timestamps_us[0].item() not in self._sequence_timestamp_interval_us: + raise ValueError("Frame start timestamp must be contained in the sequence time range") + if frame_timestamps_us[1].item() not in self._sequence_timestamp_interval_us: + raise ValueError("Frame end timestamp must be contained in the sequence time range") # Initialize frame group frame_group = self._get_frame_group(frame_timestamps_us) # Store timestamp data - assert frame_timestamps_us[1].item() not in self._frames_timestamps_us, ( - "Frame with the same end-of-frame timestamp already exists" - ) + if frame_timestamps_us[1].item() in self._frames_timestamps_us: + raise ValueError("Frame with the same end-of-frame timestamp already exists") self._frames_timestamps_us[frame_timestamps_us[1].item()] = frame_timestamps_us[0].item() # Store additional generic frame data and meta-data (not dimension / dtype checked) @@ -1141,7 +1163,8 @@ def _store_frame_ray_bundle( # Store per-ray data for name, (ray_data_data, chunks) in ray_data.items(): - assert len(ray_data_data) == n_rays, f"{name} doesn't have required ray count" + if len(ray_data_data) != n_rays: + raise ValueError(f"{name} doesn't have required ray count") ray_bundle_group.create_dataset( name, data=ray_data_data, @@ -1156,9 +1179,10 @@ def _store_frame_ray_bundle( # Store per-return data absent_mask = None for name, (return_data_data, chunks) in return_data.items(): - assert return_data_data.shape[:2] == (n_returns, n_rays), ( - f"{name} doesn't have required ray / return count {(n_returns, n_rays)}" - ) + if return_data_data.shape[:2] != (n_returns, n_rays): + raise ValueError( + f"{name} doesn't have required ray / return count {(n_returns, n_rays)}" + ) # Determine local absent mask from NaN values, # which needs to be consistent over all dimensions of a return @@ -1168,9 +1192,10 @@ def _store_frame_ray_bundle( d_axes = tuple(range(2, return_data_data.ndim)) all_nan = local_absent_mask.all(axis=d_axes) any_nan = local_absent_mask.any(axis=d_axes) - assert np.array_equal(all_nan, any_nan), ( - f"Partial NaN detected at positions: {np.argwhere(all_nan != any_nan)[:5].tolist()} in higher-dimensional return data {name}" - ) + if not np.array_equal(all_nan, any_nan): + raise ValueError( + f"Partial NaN detected at positions: {np.argwhere(all_nan != any_nan)[:5].tolist()} in higher-dimensional return data {name}" + ) local_absent_mask = all_nan assert local_absent_mask.shape == (n_returns, n_rays), ( @@ -1182,7 +1207,8 @@ def _store_frame_ray_bundle( absent_mask = local_absent_mask else: # validate absent mask consistency - assert np.array_equal(absent_mask, local_absent_mask), f"Inconsistent NaN masks in return data {name}" + if not np.array_equal(absent_mask, local_absent_mask): + raise ValueError(f"Inconsistent NaN masks in return data {name}") ray_bundle_returns_group.create_dataset( name, @@ -1404,12 +1430,16 @@ def store_frame( generic_meta_data: Dict[str, types.JsonLike], ) -> "Self": ## Sanity / consistency checks - assert direction.ndim == 2 - assert np.shape(direction)[1] == 3 - assert direction.dtype == np.dtype("float32") + if direction.ndim != 2: + raise ValueError(f"direction must be 2-D, got ndim={direction.ndim}") + if np.shape(direction)[1] != 3: + raise ValueError(f"direction must have 3 columns, got {np.shape(direction)[1]}") + if direction.dtype != np.dtype("float32"): + raise TypeError(f"direction must have dtype float32, got {direction.dtype}") # make sure directions are unit-norm - assert np.all(np.abs(np.sum(direction**2, axis=1) - 1.0) < 1e-4), "Direction vectors are not unit-norm" + if not np.all(np.abs(np.sum(direction**2, axis=1) - 1.0) < 1e-4): + raise ValueError("Direction vectors are not unit-norm") n_rays = len(direction) @@ -1421,17 +1451,22 @@ def store_frame( "direction": (direction, direction.shape), } - assert timestamp_us.dtype == np.dtype("uint64") - assert timestamp_us.shape == (n_rays,) + if timestamp_us.dtype != np.dtype("uint64"): + raise TypeError(f"timestamp_us must have dtype uint64, got {timestamp_us.dtype}") + if timestamp_us.shape != (n_rays,): + raise ValueError(f"timestamp_us must have shape ({n_rays},), got {timestamp_us.shape}") if n_rays: - assert (frame_timestamps_us[0] <= timestamp_us.min()) and ( + if not ((frame_timestamps_us[0] <= timestamp_us.min()) and ( timestamp_us.max() <= frame_timestamps_us[1] - ), "Point timestamps outside frame time bounds" + )): + raise ValueError("Point timestamps outside frame time bounds") ray_data["timestamp_us"] = (timestamp_us, timestamp_us.shape) if model_element is not None: - assert model_element.shape == (n_rays, 2) - assert model_element.dtype == np.dtype("uint16") + if model_element.shape != (n_rays, 2): + raise ValueError(f"model_element must have shape ({n_rays}, 2), got {model_element.shape}") + if model_element.dtype != np.dtype("uint16"): + raise TypeError(f"model_element must have dtype uint16, got {model_element.dtype}") ray_data["model_element"] = (model_element, model_element.shape) ## Per return data @@ -1439,19 +1474,26 @@ def store_frame( return_data: Dict[str, Tuple[npt.NDArray[np.float32], Tuple[int, ...]]] = {} # distance - assert distance_m.ndim == 2 + if distance_m.ndim != 2: + raise ValueError(f"distance_m must be 2-D, got ndim={distance_m.ndim}") n_returns = distance_m.shape[0] - assert distance_m.shape[1] == n_rays - assert distance_m.dtype == np.dtype("float32") + if distance_m.shape[1] != n_rays: + raise ValueError(f"distance_m must have {n_rays} columns, got {distance_m.shape[1]}") + if distance_m.dtype != np.dtype("float32"): + raise TypeError(f"distance_m must have dtype float32, got {distance_m.dtype}") distance_m_finite = distance_m[np.isfinite(distance_m)] - assert np.all(distance_m_finite >= 0.0), "Distance contains negative values" + if not np.all(distance_m_finite >= 0.0): + raise ValueError("Distance contains negative values") return_data["distance_m"] = (distance_m, (1, n_rays)) # chunk along returns # intensity - assert intensity.shape == (n_returns, n_rays) - assert intensity.dtype == np.dtype("float32") + if intensity.shape != (n_returns, n_rays): + raise ValueError(f"intensity must have shape ({n_returns}, {n_rays}), got {intensity.shape}") + if intensity.dtype != np.dtype("float32"): + raise TypeError(f"intensity must have dtype float32, got {intensity.dtype}") intensity_finite = intensity[np.isfinite(intensity)] - assert np.all(0.0 <= intensity_finite) and np.all(intensity_finite <= 1.0), "Intensity not normalized" + if not (np.all(0.0 <= intensity_finite) and np.all(intensity_finite <= 1.0)): + raise ValueError("Intensity not normalized") return_data["intensity"] = (intensity, (1, n_rays)) # chunk along returns # Store point-clouds data @@ -1509,12 +1551,16 @@ def store_frame( generic_meta_data: Dict[str, types.JsonLike], ) -> "Self": ## Sanity / consistency checks - assert direction.ndim == 2 - assert np.shape(direction)[1] == 3 - assert direction.dtype == np.dtype("float32") + if direction.ndim != 2: + raise ValueError(f"direction must be 2-D, got ndim={direction.ndim}") + if np.shape(direction)[1] != 3: + raise ValueError(f"direction must have 3 columns, got {np.shape(direction)[1]}") + if direction.dtype != np.dtype("float32"): + raise TypeError(f"direction must have dtype float32, got {direction.dtype}") # make sure directions are unit-norm - assert np.all(np.abs(np.sum(direction**2, axis=1) - 1.0) < 1e-4), "Direction vectors are not unit-norm" + if not np.all(np.abs(np.sum(direction**2, axis=1) - 1.0) < 1e-4): + raise ValueError("Direction vectors are not unit-norm") n_rays = len(direction) @@ -1526,12 +1572,15 @@ def store_frame( "direction": (direction, direction.shape), } - assert timestamp_us.dtype == np.dtype("uint64") - assert timestamp_us.shape == (n_rays,) + if timestamp_us.dtype != np.dtype("uint64"): + raise TypeError(f"timestamp_us must have dtype uint64, got {timestamp_us.dtype}") + if timestamp_us.shape != (n_rays,): + raise ValueError(f"timestamp_us must have shape ({n_rays},), got {timestamp_us.shape}") if n_rays: - assert (frame_timestamps_us[0] <= timestamp_us.min()) and ( + if not ((frame_timestamps_us[0] <= timestamp_us.min()) and ( timestamp_us.max() <= frame_timestamps_us[1] - ), "Point timestamps outside frame time bounds" + )): + raise ValueError("Point timestamps outside frame time bounds") ray_data["timestamp_us"] = (timestamp_us, timestamp_us.shape) ## Per return data @@ -1539,12 +1588,16 @@ def store_frame( return_data: Dict[str, Tuple[npt.NDArray[np.float32], Tuple[int, ...]]] = {} # distance - assert distance_m.ndim == 2 + if distance_m.ndim != 2: + raise ValueError(f"distance_m must be 2-D, got ndim={distance_m.ndim}") n_returns = distance_m.shape[0] - assert distance_m.shape[1] == n_rays - assert distance_m.dtype == np.dtype("float32") + if distance_m.shape[1] != n_rays: + raise ValueError(f"distance_m must have {n_rays} columns, got {distance_m.shape[1]}") + if distance_m.dtype != np.dtype("float32"): + raise TypeError(f"distance_m must have dtype float32, got {distance_m.dtype}") distance_m_finite = distance_m[np.isfinite(distance_m)] - assert np.all(distance_m_finite >= 0.0), "Distance contains negative values" + if not np.all(distance_m_finite >= 0.0): + raise ValueError("Distance contains negative values") return_data["distance_m"] = (distance_m, (1, n_rays)) # chunk along returns # Store point-clouds data @@ -1591,12 +1644,14 @@ def store_observations( obs_dict_list = [] for obs in cuboid_observations: # Check timestamp validity before serialization - assert obs.timestamp_us in self._sequence_timestamp_interval_us, ( - f"Cuboid track observation timestamp {obs.timestamp_us} not in the sequence time range" - ) - assert obs.reference_frame_timestamp_us in self._sequence_timestamp_interval_us, ( - f"Cuboid track observation reference frame timestamp {obs.reference_frame_timestamp_us} not in the sequence time range" - ) + if obs.timestamp_us not in self._sequence_timestamp_interval_us: + raise ValueError( + f"Cuboid track observation timestamp {obs.timestamp_us} not in the sequence time range" + ) + if obs.reference_frame_timestamp_us not in self._sequence_timestamp_interval_us: + raise ValueError( + f"Cuboid track observation reference frame timestamp {obs.reference_frame_timestamp_us} not in the sequence time range" + ) obs_dict_list.append(obs.to_dict()) self._group.create_group("cuboids").attrs.put({"cuboid_track_observations": obs_dict_list}) @@ -1704,29 +1759,35 @@ def store_pc( compressor = Blosc(cname="lz4", clevel=5, shuffle=Blosc.BITSHUFFLE) # -- Validate xyz -- - assert xyz.dtype == np.dtype("float32") - assert xyz.ndim == 2 and xyz.shape[1] == 3, f"xyz must be (N, 3), got {xyz.shape}" + if xyz.dtype != np.dtype("float32"): + raise ValueError(f"xyz must have dtype float32, got {xyz.dtype}") + if not (xyz.ndim == 2 and xyz.shape[1] == 3): + raise ValueError(f"xyz must be (N, 3), got {xyz.shape}") N = xyz.shape[0] # -- Validate timestamp -- - assert reference_frame_timestamp_us in self._sequence_timestamp_interval_us, ( - f"reference_frame_timestamp_us {reference_frame_timestamp_us} not in sequence time range" - ) + if reference_frame_timestamp_us not in self._sequence_timestamp_interval_us: + raise ValueError( + f"reference_frame_timestamp_us {reference_frame_timestamp_us} not in sequence time range" + ) # -- Validate attributes against schema -- - assert (provided_keys := set(attributes.keys())) == (schema_keys := set(self._attribute_schemas.keys())), ( - f"Attribute keys mismatch: expected {schema_keys}, got {provided_keys}" - ) + provided_keys = set(attributes.keys()) + schema_keys = set(self._attribute_schemas.keys()) + if provided_keys != schema_keys: + raise ValueError(f"Attribute keys mismatch: expected {schema_keys}, got {provided_keys}") for attr_name, attr_array in attributes.items(): schema = self._attribute_schemas[attr_name] expected_shape = (N,) + schema.shape_suffix - assert attr_array.shape == expected_shape, ( - f"Attribute '{attr_name}' shape mismatch: expected {expected_shape}, got {attr_array.shape}" - ) - assert np.dtype(attr_array.dtype) == schema.dtype, ( - f"Attribute '{attr_name}' dtype mismatch: expected {schema.dtype}, got {attr_array.dtype}" - ) + if attr_array.shape != expected_shape: + raise ValueError( + f"Attribute '{attr_name}' shape mismatch: expected {expected_shape}, got {attr_array.shape}" + ) + if np.dtype(attr_array.dtype) != schema.dtype: + raise ValueError( + f"Attribute '{attr_name}' dtype mismatch: expected {schema.dtype}, got {attr_array.dtype}" + ) # -- Create per-pc group -- pc_group = self._pcs_group.require_group(str(len(self._pc_timestamps))) @@ -1826,7 +1887,8 @@ def attribute_names(self) -> List[str]: # -- schema access ----------------------------------------------------- def get_attribute_schema(self, name: str) -> PointCloudsComponent.AttributeSchema: - assert name in self._attribute_schemas, f"Unknown attribute: {name}" + if name not in self._attribute_schemas: + raise ValueError(f"Unknown attribute: {name}") return self._attribute_schemas[name] # -- per-pc data access ------------------------------------------------ @@ -1921,32 +1983,37 @@ def store_label( compressor = Blosc(cname="lz4", clevel=5, shuffle=Blosc.BITSHUFFLE) # Sanity checks - assert timestamp_us in self._sequence_timestamp_interval_us, ( - f"timestamp_us {timestamp_us} not in sequence time range" - ) - assert timestamp_us not in self._timestamps, f"Duplicate timestamp_us: {timestamp_us}" + if timestamp_us not in self._sequence_timestamp_interval_us: + raise ValueError(f"timestamp_us {timestamp_us} not in sequence time range") + if timestamp_us in self._timestamps: + raise ValueError(f"Duplicate timestamp_us: {timestamp_us}") # Store label-associated data in a dedicated subgroup named by the timestamp label_group = self._labels_group.require_group(str(timestamp_us)) if self._descriptor.label_schema.encoding == types.LabelEncoding.RAW: - assert isinstance(data, np.ndarray), "RAW encoding requires a numpy array" + if not isinstance(data, np.ndarray): + raise TypeError("RAW encoding requires a numpy array") # Validate shape: (H, W) for scalar, (H, W, *shape_suffix) for multi-channel if self._descriptor.label_schema.shape_suffix: - assert data.ndim == 2 + len(self._descriptor.label_schema.shape_suffix), ( - f"Expected ndim={2 + len(self._descriptor.label_schema.shape_suffix)}, got {data.ndim}" - ) - assert data.shape[2:] == self._descriptor.label_schema.shape_suffix, ( - f"shape_suffix mismatch: expected {self._descriptor.label_schema.shape_suffix}, got {data.shape[2:]}" - ) + if data.ndim != 2 + len(self._descriptor.label_schema.shape_suffix): + raise ValueError( + f"Expected ndim={2 + len(self._descriptor.label_schema.shape_suffix)}, got {data.ndim}" + ) + if data.shape[2:] != self._descriptor.label_schema.shape_suffix: + raise ValueError( + f"shape_suffix mismatch: expected {self._descriptor.label_schema.shape_suffix}, got {data.shape[2:]}" + ) else: - assert data.ndim == 2, f"Scalar label must be 2-D (H, W), got ndim={data.ndim}" + if data.ndim != 2: + raise ValueError(f"Scalar label must be 2-D (H, W), got ndim={data.ndim}") # Validate dtype — caller must pass data in the expected dtype - assert np.dtype(data.dtype) == self._descriptor.label_schema.dtype, ( - f"dtype mismatch: expected {self._descriptor.label_schema.dtype}, got {data.dtype}" - ) + if np.dtype(data.dtype) != self._descriptor.label_schema.dtype: + raise TypeError( + f"dtype mismatch: expected {self._descriptor.label_schema.dtype}, got {data.dtype}" + ) # Quantize if configured stored = data @@ -1958,7 +2025,8 @@ def store_label( label_group.create_dataset("data", data=stored, chunks=stored.shape, compressor=compressor) elif self._descriptor.label_schema.encoding == types.LabelEncoding.IMAGE_ENCODED: - assert isinstance(data, bytes), "IMAGE_ENCODED encoding requires bytes" + if not isinstance(data, bytes): + raise TypeError("IMAGE_ENCODED encoding requires bytes") label_group.create_dataset( "data", @@ -2022,9 +2090,10 @@ def timestamps_us(self) -> npt.NDArray[np.uint64]: # -- per-label access -------------------------------------------------- def _label_group(self, timestamp_us: int) -> zarr.Group: - assert timestamp_us in self._timestamp_to_index, ( - f"Unknown timestamp: {timestamp_us}. Available: {list(self._timestamp_to_index.keys())[:5]}..." - ) + if timestamp_us not in self._timestamp_to_index: + raise ValueError( + f"Unknown timestamp: {timestamp_us}. Available: {list(self._timestamp_to_index.keys())[:5]}..." + ) return cast(zarr.Group, self._group["labels"][str(timestamp_us)]) class CameraLabelHandle: diff --git a/ncore/impl/data/v4/components_test.py b/ncore/impl/data/v4/components_test.py index 542a0c87..f98c5ba7 100644 --- a/ncore/impl/data/v4/components_test.py +++ b/ncore/impl/data/v4/components_test.py @@ -147,7 +147,7 @@ def test_reload(self, open_consolidated: bool): "throwaway_poses_type", group_name=None, # use default component group ) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): coverate_pose_writer.store_dynamic_pose( source_frame_id="rig", target_frame_id="world", @@ -161,7 +161,7 @@ def test_reload(self, open_consolidated: bool): timestamps_us=T_rig_world_timestamps_us[: len(T_rig_worlds) - 1], require_sequence_time_coverage=False, ) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): coverate_pose_writer.store_dynamic_pose( source_frame_id="rig", target_frame_id="world", @@ -531,14 +531,14 @@ def normalize_points(vectors: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: source_version="v0", ) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): cuboids_writer.store_observations( cuboid_observations=[ dataclasses.replace(ref_observation, timestamp_us=ref_sequence_timestamp_interval_us.stop + 10) ] ) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): cuboids_writer.store_observations( cuboid_observations=[ dataclasses.replace( @@ -1667,7 +1667,7 @@ def test_attribute_schema_json_roundtrip(self): self.assertEqual(rt.shape_suffix, ()) def test_writer_rejects_undeclared_attribute(self): - """store_pc with attr not in schema -> AssertionError.""" + """store_pc with attr not in schema -> ValueError.""" schemas = { "rgb": PointCloudsComponent.AttributeSchema( transform_type=PointCloud.AttributeTransformType.INVARIANT, @@ -1681,7 +1681,7 @@ def test_writer_rejects_undeclared_attribute(self): rgb = np.array([[128, 64, 32]], dtype=np.uint8) extra = np.array([[0.1, 0.2, 0.3]], dtype=np.float32) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): pc_writer.store_pc( xyz=xyz, reference_frame_id="world", @@ -1692,7 +1692,7 @@ def test_writer_rejects_undeclared_attribute(self): tmpdir.cleanup() def test_writer_rejects_missing_schema_attribute(self): - """store_pc missing a schema attr -> AssertionError.""" + """store_pc missing a schema attr -> ValueError.""" schemas = { "rgb": PointCloudsComponent.AttributeSchema( transform_type=PointCloud.AttributeTransformType.INVARIANT, @@ -1710,7 +1710,7 @@ def test_writer_rejects_missing_schema_attribute(self): xyz = np.array([[1.0, 2.0, 3.0]], dtype=np.float32) rgb = np.array([[128, 64, 32]], dtype=np.uint8) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): pc_writer.store_pc( xyz=xyz, reference_frame_id="world", @@ -1721,7 +1721,7 @@ def test_writer_rejects_missing_schema_attribute(self): tmpdir.cleanup() def test_writer_rejects_wrong_shape(self): - """store_pc with wrong-shaped array -> AssertionError.""" + """store_pc with wrong-shaped array -> ValueError.""" schemas = { "rgb": PointCloudsComponent.AttributeSchema( transform_type=PointCloud.AttributeTransformType.INVARIANT, @@ -1736,7 +1736,7 @@ def test_writer_rejects_wrong_shape(self): # Wrong shape: (N, 4) instead of (N, 3) rgb_wrong = np.random.default_rng().integers(0, 256, size=(N, 4), dtype=np.uint8) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): pc_writer.store_pc( xyz=xyz, reference_frame_id="world", @@ -1747,11 +1747,11 @@ def test_writer_rejects_wrong_shape(self): tmpdir.cleanup() def test_writer_rejects_reference_frame_timestamp_out_of_range(self): - """store_pc with reference_frame_timestamp_us outside sequence range -> AssertionError.""" + """store_pc with reference_frame_timestamp_us outside sequence range -> ValueError.""" pc_writer, _, tmpdir, _ = self._make_writer_reader() xyz = np.array([[1.0, 2.0, 3.0]], dtype=np.float32) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): pc_writer.store_pc( xyz=xyz, reference_frame_id="world", @@ -1761,11 +1761,11 @@ def test_writer_rejects_reference_frame_timestamp_out_of_range(self): tmpdir.cleanup() def test_writer_rejects_wrong_xyz_dtype(self): - """store_pc with float64 xyz raises AssertionError (float32 required).""" + """store_pc with float64 xyz raises ValueError (float32 required).""" pc_writer, _, tmpdir, _ = self._make_writer_reader() xyz_f64 = np.array([[1.0, 2.0, 3.0]], dtype=np.float64) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): pc_writer.store_pc( xyz=xyz_f64, # type: ignore reference_frame_id="world", @@ -2163,9 +2163,9 @@ def test_quantized_float32_intermediate(self) -> None: def test_quantization_params_rejects_float_dtype(self) -> None: """QuantizationParams must reject non-integer quantized_dtype.""" - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): QuantizationParams(quantized_dtype=np.dtype("float32"), scale=1.0, offset=0.0) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): QuantizationParams(quantized_dtype=np.dtype("float64"), scale=1.0, offset=0.0) # ------------------------------------------------------------------ @@ -2174,11 +2174,11 @@ def test_quantization_params_rejects_float_dtype(self) -> None: def test_quantization_params_rejects_non_float_intermediate(self) -> None: """QuantizationParams must reject non-floating intermediate_dtype.""" - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): QuantizationParams( quantized_dtype=np.dtype("uint16"), scale=1.0, offset=0.0, intermediate_dtype=np.dtype("int32") ) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): QuantizationParams( quantized_dtype=np.dtype("uint16"), scale=1.0, offset=0.0, intermediate_dtype=np.dtype("uint8") ) @@ -2349,7 +2349,7 @@ def test_forward_compat_unknown_category(self) -> None: # 8. test_reject_empty_camera_id # ------------------------------------------------------------------ def test_reject_empty_camera_id(self) -> None: - """Passing an empty camera_id should raise AssertionError.""" + """Passing an empty camera_id should raise ValueError.""" tmpdir = tempfile.TemporaryDirectory() timestamp_interval = HalfClosedInterval(0, 10_000_001) @@ -2363,7 +2363,7 @@ def test_reject_empty_camera_id(self) -> None: generic_meta_data={}, ) - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): store_writer.register_component_writer( CameraLabelsComponent.Writer, "depth.z@front", diff --git a/ncore/impl/data_converter/base.py b/ncore/impl/data_converter/base.py index 7a692bf6..f63f1eb8 100644 --- a/ncore/impl/data_converter/base.py +++ b/ncore/impl/data_converter/base.py @@ -94,9 +94,10 @@ def _get_active_sensor_ids( return all_sensor_ids # Make sure active sensors are a subset of all sensors - assert set(active_sensor_ids).issubset(all_sensor_ids), ( - f"Selected active {sensor_type} sensors {active_sensor_ids} not a subset of all available sensors {all_sensor_ids}" - ) + if not set(active_sensor_ids).issubset(all_sensor_ids): + raise ValueError( + f"Selected active {sensor_type} sensors {active_sensor_ids} not a subset of all available sensors {all_sensor_ids}" + ) return active_sensor_ids diff --git a/ncore/impl/sensors/camera.py b/ncore/impl/sensors/camera.py index 1cfd43c1..4e8744e3 100644 --- a/ncore/impl/sensors/camera.py +++ b/ncore/impl/sensors/camera.py @@ -261,10 +261,16 @@ def __init__( ), ) - assert self.resolution.shape == (2,) - assert self.resolution.dtype == torch.int32 - assert self.shutter_type in types.ShutterType, f"Unsupported shutter type {self.shutter_type}" - assert self.external_distortion is None or isinstance(self.external_distortion, ExternalDistortionModel) + if self.resolution.shape != (2,): + raise ValueError(f"resolution must have shape (2,), got {self.resolution.shape}") + if self.resolution.dtype != torch.int32: + raise TypeError(f"resolution must have dtype torch.int32, got {self.resolution.dtype}") + if self.shutter_type not in types.ShutterType: + raise ValueError(f"Unsupported shutter type {self.shutter_type}") + if self.external_distortion is not None and not isinstance(self.external_distortion, ExternalDistortionModel): + raise TypeError( + f"external_distortion must be None or an ExternalDistortionModel, got {type(self.external_distortion)}" + ) @abstractmethod def _image_points_to_camera_rays_impl(self, image_points: torch.Tensor) -> torch.Tensor: @@ -291,7 +297,8 @@ def image_points_to_camera_rays(self, image_points: Union[torch.Tensor, np.ndarr image_points = to_torch(image_points, device=self.device) # Make sure users don't accidentally pass pixel coordinates (integer indices) - assert image_points.is_floating_point(), "[CameraModel]: image_points must be floating point values" + if not image_points.is_floating_point(): + raise TypeError("[CameraModel]: image_points must be floating point values") image_points = image_points.to(self.dtype) @@ -444,11 +451,14 @@ def world_points_to_pixels_shutter_pose( """Projects world points to corresponding pixel indices using *rolling-shutter compensation* of sensor motion""" if return_timestamps: - assert start_timestamp_us is not None - assert end_timestamp_us is not None - assert end_timestamp_us >= start_timestamp_us, ( - "[CameraModel]: End timestamp must be larger or equal to the start timestamp" - ) + if start_timestamp_us is None: + raise ValueError("[CameraModel]: start_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us is None: + raise ValueError("[CameraModel]: end_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us < start_timestamp_us: + raise ValueError( + "[CameraModel]: End timestamp must be larger or equal to the start timestamp" + ) tmp = self.world_points_to_image_points_shutter_pose( world_points, @@ -485,7 +495,8 @@ def world_points_to_pixels_static_pose( """Projects world points to corresponding pixel indices using a *fixed* sensor pose (not compensating for potential sensor-motion).""" if return_timestamps: - assert timestamp_us is not None + if timestamp_us is None: + raise ValueError("[CameraModel]: timestamp_us must not be None when return_timestamps=True") tmp = self.world_points_to_image_points_static_pose( world_points, @@ -521,11 +532,14 @@ def world_points_to_pixels_mean_pose( """ if return_timestamps: - assert start_timestamp_us is not None - assert end_timestamp_us is not None - assert end_timestamp_us >= start_timestamp_us, ( - "[CameraModel]: End timestamp must be larger or equal to the start timestamp" - ) + if start_timestamp_us is None: + raise ValueError("[CameraModel]: start_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us is None: + raise ValueError("[CameraModel]: end_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us < start_timestamp_us: + raise ValueError( + "[CameraModel]: End timestamp must be larger or equal to the start timestamp" + ) timestamp_us = (end_timestamp_us + start_timestamp_us) // 2 @@ -568,22 +582,34 @@ def world_points_to_image_points_shutter_pose( T_world_sensor_start = to_torch(T_world_sensor_start, device=self.device, dtype=self.dtype) T_world_sensor_end = to_torch(T_world_sensor_end, device=self.device, dtype=self.dtype) - assert T_world_sensor_start.shape == (4, 4) - assert T_world_sensor_end.shape == (4, 4) - assert len(world_points.shape) == 2 - assert world_points.shape[1] == 3 - assert world_points.dtype == self.dtype - assert T_world_sensor_start.dtype == self.dtype - assert T_world_sensor_end.dtype == self.dtype - assert isinstance(max_iterations, int) - assert max_iterations > 0 + if T_world_sensor_start.shape != (4, 4): + raise ValueError(f"T_world_sensor_start must have shape (4, 4), got {T_world_sensor_start.shape}") + if T_world_sensor_end.shape != (4, 4): + raise ValueError(f"T_world_sensor_end must have shape (4, 4), got {T_world_sensor_end.shape}") + if len(world_points.shape) != 2: + raise ValueError(f"world_points must be 2-dimensional, got {len(world_points.shape)} dimensions") + if world_points.shape[1] != 3: + raise ValueError(f"world_points must have 3 columns, got {world_points.shape[1]}") + if world_points.dtype != self.dtype: + raise TypeError(f"world_points must have dtype {self.dtype}, got {world_points.dtype}") + if T_world_sensor_start.dtype != self.dtype: + raise TypeError(f"T_world_sensor_start must have dtype {self.dtype}, got {T_world_sensor_start.dtype}") + if T_world_sensor_end.dtype != self.dtype: + raise TypeError(f"T_world_sensor_end must have dtype {self.dtype}, got {T_world_sensor_end.dtype}") + if not isinstance(max_iterations, int): + raise TypeError(f"max_iterations must be an int, got {type(max_iterations)}") + if max_iterations <= 0: + raise ValueError(f"max_iterations must be positive, got {max_iterations}") if return_timestamps: - assert start_timestamp_us is not None - assert end_timestamp_us is not None - assert end_timestamp_us >= start_timestamp_us, ( - "[CameraModel]: End timestamp must be larger or equal to the start timestamp" - ) + if start_timestamp_us is None: + raise ValueError("[CameraModel]: start_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us is None: + raise ValueError("[CameraModel]: end_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us < start_timestamp_us: + raise ValueError( + "[CameraModel]: End timestamp must be larger or equal to the start timestamp" + ) # Make sure timestamps have correct type (might be, e.g., np.uint64, which torch doesn't like) start_timestamp_us = int(start_timestamp_us) @@ -756,14 +782,20 @@ def world_points_to_image_points_static_pose( world_points = to_torch(world_points, device=self.device, dtype=self.dtype) T_world_sensor = to_torch(T_world_sensor, device=self.device, dtype=self.dtype) - assert T_world_sensor.shape == (4, 4) - assert len(world_points.shape) == 2 - assert world_points.shape[1] == 3 - assert world_points.dtype == self.dtype - assert T_world_sensor.dtype == self.dtype + if T_world_sensor.shape != (4, 4): + raise ValueError(f"T_world_sensor must have shape (4, 4), got {T_world_sensor.shape}") + if len(world_points.shape) != 2: + raise ValueError(f"world_points must be 2-dimensional, got {len(world_points.shape)} dimensions") + if world_points.shape[1] != 3: + raise ValueError(f"world_points must have 3 columns, got {world_points.shape[1]}") + if world_points.dtype != self.dtype: + raise TypeError(f"world_points must have dtype {self.dtype}, got {world_points.dtype}") + if T_world_sensor.dtype != self.dtype: + raise TypeError(f"T_world_sensor must have dtype {self.dtype}, got {T_world_sensor.dtype}") if return_timestamps: - assert timestamp_us is not None + if timestamp_us is None: + raise ValueError("[CameraModel]: timestamp_us must not be None when return_timestamps=True") R_world_sensor = T_world_sensor[:3, :3] # [3, 3] t_world_sensor = T_world_sensor[:3, 3] # [3, 1] @@ -814,11 +846,14 @@ def world_points_to_image_points_mean_pose( """ if return_timestamps: - assert start_timestamp_us is not None - assert end_timestamp_us is not None - assert end_timestamp_us >= start_timestamp_us, ( - "[CameraModel]: End timestamp must be larger or equal to the start timestamp" - ) + if start_timestamp_us is None: + raise ValueError("[CameraModel]: start_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us is None: + raise ValueError("[CameraModel]: end_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us < start_timestamp_us: + raise ValueError( + "[CameraModel]: End timestamp must be larger or equal to the start timestamp" + ) timestamp_us = (end_timestamp_us + start_timestamp_us) // 2 @@ -891,11 +926,14 @@ def pixels_to_world_rays_mean_pose( return_timestamps: bool = False, ) -> CameraModel.WorldRaysReturn: if return_timestamps: - assert start_timestamp_us is not None - assert end_timestamp_us is not None - assert end_timestamp_us >= start_timestamp_us, ( - "[CameraModel]: End timestamp must be larger or equal to the start timestamp" - ) + if start_timestamp_us is None: + raise ValueError("[CameraModel]: start_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us is None: + raise ValueError("[CameraModel]: end_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us < start_timestamp_us: + raise ValueError( + "[CameraModel]: End timestamp must be larger or equal to the start timestamp" + ) return self.image_points_to_world_rays_mean_pose( self.pixels_to_image_points(pixel_idxs), @@ -927,11 +965,16 @@ def image_points_to_world_rays_static_pose( image_points = to_torch(image_points, device=self.device, dtype=self.dtype) T_sensor_world = to_torch(T_sensor_world, device=self.device, dtype=self.dtype) - assert T_sensor_world.shape == (4, 4) - assert len(image_points.shape) == 2 - assert image_points.shape[1] == 2 - assert image_points.dtype == self.dtype - assert T_sensor_world.dtype == self.dtype + if T_sensor_world.shape != (4, 4): + raise ValueError(f"T_sensor_world must have shape (4, 4), got {T_sensor_world.shape}") + if len(image_points.shape) != 2: + raise ValueError(f"image_points must be 2-dimensional, got {len(image_points.shape)} dimensions") + if image_points.shape[1] != 2: + raise ValueError(f"image_points must have 2 columns, got {image_points.shape[1]}") + if image_points.dtype != self.dtype: + raise TypeError(f"image_points must have dtype {self.dtype}, got {image_points.dtype}") + if T_sensor_world.dtype != self.dtype: + raise TypeError(f"T_sensor_world must have dtype {self.dtype}, got {T_sensor_world.dtype}") # Unproject the image points to camera rays if camera_rays is not None: @@ -960,7 +1003,8 @@ def image_points_to_world_rays_static_pose( return_var.T_sensor_worlds = T_sensor_world.unsqueeze(0).expand(len(world_rays), -1, -1).contiguous() if return_timestamps: - assert timestamp_us is not None + if timestamp_us is None: + raise ValueError("[CameraModel]: timestamp_us must not be None when return_timestamps=True") # Repeat constant timestamp for all rays return_var.timestamps_us = torch.tile( torch.tensor(timestamp_us, device=self.device), dims=(len(world_rays),) @@ -987,11 +1031,14 @@ def image_points_to_world_rays_mean_pose( For each image point returns 3d world rays [point, direction], represented by 3d start of ray points and 3d ray directions in the world frame """ if return_timestamps: - assert start_timestamp_us is not None - assert end_timestamp_us is not None - assert end_timestamp_us >= start_timestamp_us, ( - "[CameraModel]: End timestamp must be larger or equal to the start timestamp" - ) + if start_timestamp_us is None: + raise ValueError("[CameraModel]: start_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us is None: + raise ValueError("[CameraModel]: end_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us < start_timestamp_us: + raise ValueError( + "[CameraModel]: End timestamp must be larger or equal to the start timestamp" + ) timestamp_us = (end_timestamp_us + start_timestamp_us) // 2 @@ -1044,13 +1091,20 @@ def image_points_to_world_rays_shutter_pose( T_sensor_world_start = to_torch(T_sensor_world_start, device=self.device, dtype=self.dtype) T_sensor_world_end = to_torch(T_sensor_world_end, device=self.device, dtype=self.dtype) - assert T_sensor_world_start.shape == (4, 4) - assert T_sensor_world_end.shape == (4, 4) - assert len(image_points.shape) == 2 - assert image_points.shape[1] == 2 - assert image_points.dtype == self.dtype - assert T_sensor_world_start.dtype == self.dtype - assert T_sensor_world_end.dtype == self.dtype + if T_sensor_world_start.shape != (4, 4): + raise ValueError(f"T_sensor_world_start must have shape (4, 4), got {T_sensor_world_start.shape}") + if T_sensor_world_end.shape != (4, 4): + raise ValueError(f"T_sensor_world_end must have shape (4, 4), got {T_sensor_world_end.shape}") + if len(image_points.shape) != 2: + raise ValueError(f"image_points must be 2-dimensional, got {len(image_points.shape)} dimensions") + if image_points.shape[1] != 2: + raise ValueError(f"image_points must have 2 columns, got {image_points.shape[1]}") + if image_points.dtype != self.dtype: + raise TypeError(f"image_points must have dtype {self.dtype}, got {image_points.dtype}") + if T_sensor_world_start.dtype != self.dtype: + raise TypeError(f"T_sensor_world_start must have dtype {self.dtype}, got {T_sensor_world_start.dtype}") + if T_sensor_world_end.dtype != self.dtype: + raise TypeError(f"T_sensor_world_end must have dtype {self.dtype}, got {T_sensor_world_end.dtype}") # Unproject the image points to camera rays if camera_rays is not None: @@ -1100,11 +1154,14 @@ def image_points_to_world_rays_shutter_pose( return_var.T_sensor_worlds[:, 3, 3] = 1 if return_timestamps: - assert start_timestamp_us is not None - assert end_timestamp_us is not None - assert end_timestamp_us >= start_timestamp_us, ( - "[CameraModel]: End timestamp must be larger or equal to the start timestamp" - ) + if start_timestamp_us is None: + raise ValueError("[CameraModel]: start_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us is None: + raise ValueError("[CameraModel]: end_timestamp_us must not be None when return_timestamps=True") + if end_timestamp_us < start_timestamp_us: + raise ValueError( + "[CameraModel]: End timestamp must be larger or equal to the start timestamp" + ) return_var.timestamps_us = ( start_timestamp_us + (t[..., None] * (end_timestamp_us - start_timestamp_us)).to(torch.int64) ).squeeze(-1) # [n_image_points] @@ -1117,7 +1174,8 @@ def pixels_to_image_points(self, pixel_idxs: Union[torch.Tensor, np.ndarray]) -> # Convert to torch pixel_idxs = to_torch(pixel_idxs, device=self.device) - assert not pixel_idxs.is_floating_point(), "[CameraModel]: Pixel indices must be integers" + if pixel_idxs.is_floating_point(): + raise TypeError("[CameraModel]: Pixel indices must be integers") # Compute the image point coordinates representing the center of each pixel (shift from top left corner to the center) return pixel_idxs.to(self.dtype) + 0.5 @@ -1128,7 +1186,8 @@ def image_points_to_pixels(self, image_points: Union[torch.Tensor, np.ndarray]) # Convert to torch image_points = to_torch(image_points, device=self.device) - assert image_points.is_floating_point(), "[CameraModel]: Image points must be floating point values" + if not image_points.is_floating_point(): + raise TypeError("[CameraModel]: Image points must be floating point values") # Compute the pixel indices for given image points (round to top left corner integer coordinate) return torch.floor(image_points).to(torch.int32) @@ -1170,8 +1229,10 @@ def __interpolate_poses(self, pose_s: torch.Tensor, pose_e: torch.Tensor, t: flo pose_s = pose_s.to(self.device) pose_e = pose_e.to(self.device) - assert pose_s.shape == (4, 4) - assert pose_e.shape == (4, 4) + if pose_s.shape != (4, 4): + raise ValueError(f"pose_s must have shape (4, 4), got {pose_s.shape}") + if pose_e.shape != (4, 4): + raise ValueError(f"pose_e must have shape (4, 4), got {pose_e.shape}") # Convert the start and end rotation matrix to quaternions pose_s_quat = rotmat_to_unitquat(pose_s[None, :3, :3]) # [1, 4] @@ -1308,23 +1369,38 @@ def __init__( self.newton_iterations: int = newton_iterations # 2D pixel-distance threshold - assert min_2d_norm > 0, "require positive minimum norm threshold" + if min_2d_norm <= 0: + raise ValueError(f"require positive minimum norm threshold, got {min_2d_norm}") self.register_buffer("min_2d_norm", torch.tensor(min_2d_norm, dtype=self.dtype, device=self.device)) - assert self.principal_point.shape == (2,) - assert self.principal_point.dtype == self.dtype - assert self.fw_poly.shape == (6,) - assert self.fw_poly.dtype == self.dtype - assert self.dfw_poly.shape == (5,) - assert self.dfw_poly.dtype == self.dtype - assert self.bw_poly.shape == (6,) - assert self.bw_poly.dtype == self.dtype - assert self.dbw_poly.shape == (5,) - assert self.dbw_poly.dtype == self.dtype - assert self.A.shape == (2, 2) - assert self.A.dtype == self.dtype - assert self.Ainv.shape == (2, 2) - assert self.Ainv.dtype == self.dtype + if self.principal_point.shape != (2,): + raise ValueError(f"principal_point must have shape (2,), got {self.principal_point.shape}") + if self.principal_point.dtype != self.dtype: + raise TypeError(f"principal_point must have dtype {self.dtype}, got {self.principal_point.dtype}") + if self.fw_poly.shape != (6,): + raise ValueError(f"fw_poly must have shape (6,), got {self.fw_poly.shape}") + if self.fw_poly.dtype != self.dtype: + raise TypeError(f"fw_poly must have dtype {self.dtype}, got {self.fw_poly.dtype}") + if self.dfw_poly.shape != (5,): + raise ValueError(f"dfw_poly must have shape (5,), got {self.dfw_poly.shape}") + if self.dfw_poly.dtype != self.dtype: + raise TypeError(f"dfw_poly must have dtype {self.dtype}, got {self.dfw_poly.dtype}") + if self.bw_poly.shape != (6,): + raise ValueError(f"bw_poly must have shape (6,), got {self.bw_poly.shape}") + if self.bw_poly.dtype != self.dtype: + raise TypeError(f"bw_poly must have dtype {self.dtype}, got {self.bw_poly.dtype}") + if self.dbw_poly.shape != (5,): + raise ValueError(f"dbw_poly must have shape (5,), got {self.dbw_poly.shape}") + if self.dbw_poly.dtype != self.dtype: + raise TypeError(f"dbw_poly must have dtype {self.dtype}, got {self.dbw_poly.dtype}") + if self.A.shape != (2, 2): + raise ValueError(f"A must have shape (2, 2), got {self.A.shape}") + if self.A.dtype != self.dtype: + raise TypeError(f"A must have dtype {self.dtype}, got {self.A.dtype}") + if self.Ainv.shape != (2, 2): + raise ValueError(f"Ainv must have shape (2, 2), got {self.Ainv.shape}") + if self.Ainv.dtype != self.dtype: + raise TypeError(f"Ainv must have dtype {self.dtype}, got {self.Ainv.dtype}") def get_parameters(self) -> types.FThetaCameraModelParameters: """Returns the camera model parameters specific to the current camera model instance""" @@ -1350,7 +1426,8 @@ def _image_points_to_camera_rays_impl(self, image_points: torch.Tensor) -> torch """ image_points = to_torch(image_points, device=self.device) - assert image_points.is_floating_point(), "[CameraModel]: image_points must be floating point values" + if not image_points.is_floating_point(): + raise TypeError("[CameraModel]: image_points must be floating point values") image_points = image_points.to(self.dtype) # Get f(theta)-weighted normalized 2d vectors (undoing linear term) @@ -1491,16 +1568,26 @@ def __init__( to_torch(camera_model_parameters.thin_prism_coeffs, device=self.device, dtype=self.dtype), ) - assert self.principal_point.shape == (2,) - assert self.principal_point.dtype == self.dtype - assert self.focal_length.shape == (2,) - assert self.focal_length.dtype == self.dtype - assert self.radial_coeffs.shape == (6,) - assert self.radial_coeffs.dtype == self.dtype - assert self.tangential_coeffs.shape == (2,) - assert self.tangential_coeffs.dtype == self.dtype - assert self.thin_prism_coeffs.shape == (4,) - assert self.thin_prism_coeffs.dtype == self.dtype + if self.principal_point.shape != (2,): + raise ValueError(f"principal_point must have shape (2,), got {self.principal_point.shape}") + if self.principal_point.dtype != self.dtype: + raise TypeError(f"principal_point must have dtype {self.dtype}, got {self.principal_point.dtype}") + if self.focal_length.shape != (2,): + raise ValueError(f"focal_length must have shape (2,), got {self.focal_length.shape}") + if self.focal_length.dtype != self.dtype: + raise TypeError(f"focal_length must have dtype {self.dtype}, got {self.focal_length.dtype}") + if self.radial_coeffs.shape != (6,): + raise ValueError(f"radial_coeffs must have shape (6,), got {self.radial_coeffs.shape}") + if self.radial_coeffs.dtype != self.dtype: + raise TypeError(f"radial_coeffs must have dtype {self.dtype}, got {self.radial_coeffs.dtype}") + if self.tangential_coeffs.shape != (2,): + raise ValueError(f"tangential_coeffs must have shape (2,), got {self.tangential_coeffs.shape}") + if self.tangential_coeffs.dtype != self.dtype: + raise TypeError(f"tangential_coeffs must have dtype {self.dtype}, got {self.tangential_coeffs.dtype}") + if self.thin_prism_coeffs.shape != (4,): + raise ValueError(f"thin_prism_coeffs must have shape (4,), got {self.thin_prism_coeffs.shape}") + if self.thin_prism_coeffs.dtype != self.dtype: + raise TypeError(f"thin_prism_coeffs must have dtype {self.dtype}, got {self.thin_prism_coeffs.dtype}") def get_parameters(self) -> types.OpenCVPinholeCameraModelParameters: """Returns the camera model parameters specific to the current camera model instance""" @@ -1528,7 +1615,8 @@ def _image_points_to_camera_rays_impl(self, image_points: torch.Tensor) -> torch image_points, device=self.device, ) - assert image_points.is_floating_point(), "[CameraModel]: image_points must be floating point values" + if not image_points.is_floating_point(): + raise TypeError("[CameraModel]: image_points must be floating point values") image_points = image_points.to(self.dtype) camera_rays2 = self.__iterative_undistort(image_points) @@ -1765,13 +1853,18 @@ def __init__( self.newton_iterations: int = newton_iterations # 2D pixel-distance threshold - assert min_2d_norm > 0, "require positive minimum norm threshold" - self.register_buffer("min_2d_norm", torch.tensor(min_2d_norm, device=self.device, dtype=self.dtype)) + if min_2d_norm <= 0: + raise ValueError(f"require positive minimum norm threshold, got {min_2d_norm}") + self.register_buffer("min_2d_norm", torch.tensor(min_2d_norm, dtype=self.dtype, device=self.device)) - assert self.principal_point.shape == (2,) - assert self.principal_point.dtype == self.dtype - assert self.focal_length.shape == (2,) - assert self.focal_length.dtype == self.dtype + if self.principal_point.shape != (2,): + raise ValueError(f"principal_point must have shape (2,), got {self.principal_point.shape}") + if self.principal_point.dtype != self.dtype: + raise TypeError(f"principal_point must have dtype {self.dtype}, got {self.principal_point.dtype}") + if self.focal_length.shape != (2,): + raise ValueError(f"focal_length must have shape (2,), got {self.focal_length.shape}") + if self.focal_length.dtype != self.dtype: + raise TypeError(f"focal_length must have dtype {self.dtype}, got {self.focal_length.dtype}") k1, k2, k3, k4 = camera_model_parameters.radial_coeffs[:] # ninth-degree forward polynomial (mapping angles to normalized distances) theta + k1*theta^3 + k2*theta^5 + k3*theta^7 + k4*theta^9 @@ -1820,7 +1913,8 @@ def _image_points_to_camera_rays_impl(self, image_points: torch.Tensor) -> torch image_points, device=self.device, ) - assert image_points.is_floating_point(), "[CameraModel]: image_points must be floating point values" + if not image_points.is_floating_point(): + raise TypeError("[CameraModel]: image_points must be floating point values") image_points = image_points.to(self.dtype) normalized_image_points = (image_points - self.principal_point) / self.focal_length diff --git a/ncore/impl/sensors/camera_test.py b/ncore/impl/sensors/camera_test.py index e6210c39..7c30a7bf 100644 --- a/ncore/impl/sensors/camera_test.py +++ b/ncore/impl/sensors/camera_test.py @@ -673,11 +673,11 @@ def test_inputs_and_input_types(self): ray = np.array([0, 1, 0]).reshape(1, 3) # Test invalid inputs - self.assertRaises(AssertionError, ftheta_cam.image_points_to_camera_rays, pixel.astype(np.int32)) - self.assertRaises(AssertionError, ftheta_cam.pixels_to_camera_rays, pixel.astype(np.float32)) + self.assertRaises(TypeError, ftheta_cam.image_points_to_camera_rays, pixel.astype(np.int32)) + self.assertRaises(TypeError, ftheta_cam.pixels_to_camera_rays, pixel.astype(np.float32)) self.assertRaises( - AssertionError, + ValueError, ftheta_cam.world_points_to_image_points_shutter_pose, ray, np.eye(4), @@ -685,7 +685,7 @@ def test_inputs_and_input_types(self): **{"return_timestamps": True}, ) self.assertRaises( - AssertionError, + ValueError, ftheta_cam.world_points_to_image_points_shutter_pose, ray, np.eye(4), diff --git a/ncore/impl/sensors/common.py b/ncore/impl/sensors/common.py index f577bff5..b20951a9 100644 --- a/ncore/impl/sensors/common.py +++ b/ncore/impl/sensors/common.py @@ -90,21 +90,18 @@ def to_torch( var = cast(np.ndarray, var) if var.dtype == np.uint16: - assert np.all(var <= np.iinfo(np.int16).max), ( - "[CameraModel]: Trying to cast uint16 to int16 but the value exceeds max range." - ) + if not np.all(var <= np.iinfo(np.int16).max): + raise ValueError("[CameraModel]: Trying to cast uint16 to int16 but the value exceeds max range.") var = var.astype(np.int16) if var.dtype == np.uint32: - assert np.all(var <= np.iinfo(np.int32).max), ( - "[CameraModel]: Trying to cast uint32 to int32 but the value exceeds max range." - ) + if not np.all(var <= np.iinfo(np.int32).max): + raise ValueError("[CameraModel]: Trying to cast uint32 to int32 but the value exceeds max range.") var = var.astype(np.int32) if var.dtype == np.uint64: - assert np.all(var <= np.iinfo(np.int64).max), ( - "[CameraModel]: Trying to cast uint64 to int64 but the value exceeds max range." - ) + if not np.all(var <= np.iinfo(np.int64).max): + raise ValueError("[CameraModel]: Trying to cast uint64 to int64 but the value exceeds max range.") var = var.astype(np.int64) var = torch.from_numpy(var) @@ -138,7 +135,8 @@ def eval_poly_inverse_horner_newton( x = eval_poly_horner( inverse_poly_approximation_coefficients, y ) # approximation / starting points - also returned for zero iterations - assert newton_iterations >= 0, "Newton-iteration number needs to be non-negative" + if newton_iterations < 0: + raise ValueError("Newton-iteration number needs to be non-negative") # Buffers of intermediate results to allow differentiation # (only allocate entries as needed during iteration) @@ -165,7 +163,8 @@ def rotmat_to_unitquat(R: torch.Tensor) -> torch.Tensor: """ num_rotations, D1, D2 = R.shape - assert (D1, D2) == (3, 3), "Input has to be a Bx3x3 tensor." + if (D1, D2) != (3, 3): + raise ValueError("Input has to be a Bx3x3 tensor.") decision_matrix = torch.empty((num_rotations, 4), dtype=R.dtype, device=R.device) quat = torch.empty((num_rotations, 4), dtype=R.dtype, device=R.device) @@ -246,13 +245,15 @@ def unitquat_slerp(quat_s: torch.Tensor, quat_e: torch.Tensor, t: torch.Tensor, batch of interpolated quaternions [bs, 4] """ - assert quat_s.shape == quat_e.shape, "Input quaternions must be of the same shape." + if quat_s.shape != quat_e.shape: + raise ValueError("Input quaternions must be of the same shape.") if len(quat_s.shape) == 1: quat_s = torch.unsqueeze(quat_s, 0) quat_e = torch.unsqueeze(quat_e, 0) - assert t.ndim == 1 and t.shape[0] == quat_e.shape[0], "t is expected to have shape [bs]." + if t.ndim != 1 or t.shape[0] != quat_e.shape[0]: + raise ValueError("t is expected to have shape [bs].") # omega is the 'angle' between both quaternions cos_omega = torch.sum(quat_s * quat_e, dim=-1) diff --git a/ncore/impl/sensors/lidar.py b/ncore/impl/sensors/lidar.py index c736eae7..93e4c438 100644 --- a/ncore/impl/sensors/lidar.py +++ b/ncore/impl/sensors/lidar.py @@ -151,8 +151,10 @@ def elements_to_sensor_points( """Computes 3d sensor points for elements in the structured lidar model. Elements are given as (row, column) indices.""" # elements: N x 2 array of (row, column) indices - assert elements.ndim == 2 - assert element_distances.ndim == 1 + if elements.ndim != 2: + raise ValueError(f"elements must be 2-D, got ndim={elements.ndim}") + if element_distances.ndim != 1: + raise ValueError(f"element_distances must be 1-D, got ndim={element_distances.ndim}") elements = to_torch(elements, device=self.device, dtype=torch.long) element_distances = to_torch(element_distances, device=self.device, dtype=self.dtype) @@ -255,7 +257,8 @@ def elements_to_sensor_angles(self, elements: Union[torch.Tensor, np.ndarray]) - """Retrieves the elevation and azimuth angles for elements in the structured lidar model. Elements are given as (row, column) indices.""" # elements: N x 2 array of (row, column) indices - assert elements.ndim == 2 + if elements.ndim != 2: + raise ValueError(f"elements must be 2-D, got ndim={elements.ndim}") elements = to_torch(elements, device=self.device, dtype=torch.long) @@ -273,7 +276,8 @@ def sensor_rays_to_sensor_angles( """Computes the elevation and azimuth angles for normalized 3d sensor rays.""" # sensor_rays: N x 3 array of sensor rays - assert sensor_rays.ndim == 2 + if sensor_rays.ndim != 2: + raise ValueError(f"sensor_rays must be 2-D, got ndim={sensor_rays.ndim}") sensor_rays = to_torch(sensor_rays, device=self.device, dtype=self.dtype) @@ -296,7 +300,8 @@ def sensor_angles_to_sensor_rays( """Computes the sensor rays for elevation/azimuth angles.""" # sensor_angles: N x 2 array of elevation and azimuth angles - assert len(sensor_angles.shape) == 2 + if len(sensor_angles.shape) != 2: + raise ValueError(f"sensor_angles must be 2-D, got ndim={len(sensor_angles.shape)}") sensor_angles = to_torch(sensor_angles, device=self.device, dtype=self.dtype) @@ -414,7 +419,8 @@ def sensor_angles_relative_frame_times(self, sensor_angles: Union[torch.Tensor, relative_azimuths_rad = relative_sensor_angles[:, 1] # relative azimuth angles in radians # Check that all angles are in the fov (account for accumulated numerical errors via some epsilon) - assert torch.all(self._valid_relative_sensor_angles(relative_sensor_angles)) + if not torch.all(self._valid_relative_sensor_angles(relative_sensor_angles)): + raise ValueError("Not all sensor angles are within the FOV of the sensor") # Determine the location of the angle in the map (nearest neighbor lookup) horizontal_nn_dist = relative_azimuths_rad / self.map_resolution_horiz_rad + 0.5 # = (azimuth + res/2) / res @@ -445,11 +451,12 @@ def world_points_to_sensor_angles_shutter_pose( """Projects world points to corresponding sensor angle coordinates using *rolling-shutter compensation* of sensor motion""" if return_timestamps: - assert start_timestamp_us is not None - assert end_timestamp_us is not None - assert end_timestamp_us >= start_timestamp_us, ( - "[LidarModel]: End timestamp must be larger or equal to the start timestamp" - ) + if start_timestamp_us is None: + raise ValueError("start_timestamp_us must not be None when return_timestamps is True") + if end_timestamp_us is None: + raise ValueError("end_timestamp_us must not be None when return_timestamps is True") + if end_timestamp_us < start_timestamp_us: + raise ValueError("[LidarModel]: End timestamp must be larger or equal to the start timestamp") # Make sure timestamps have correct type (might be, e.g., np.uint64, which torch doesn't like) start_timestamp_us = int(start_timestamp_us) @@ -460,15 +467,24 @@ def world_points_to_sensor_angles_shutter_pose( T_world_sensor_start = to_torch(T_world_sensor_start, device=self.device, dtype=self.dtype) T_world_sensor_end = to_torch(T_world_sensor_end, device=self.device, dtype=self.dtype) - assert T_world_sensor_start.shape == (4, 4) - assert T_world_sensor_end.shape == (4, 4) - assert len(world_points.shape) == 2 - assert world_points.shape[1] == 3 - assert world_points.dtype == self.dtype - assert T_world_sensor_start.dtype == self.dtype - assert T_world_sensor_end.dtype == self.dtype - assert isinstance(max_iterations, int) - assert max_iterations > 0 + if T_world_sensor_start.shape != (4, 4): + raise ValueError(f"T_world_sensor_start must have shape (4, 4), got {T_world_sensor_start.shape}") + if T_world_sensor_end.shape != (4, 4): + raise ValueError(f"T_world_sensor_end must have shape (4, 4), got {T_world_sensor_end.shape}") + if len(world_points.shape) != 2: + raise ValueError(f"world_points must be 2-D, got ndim={len(world_points.shape)}") + if world_points.shape[1] != 3: + raise ValueError(f"world_points must have 3 columns, got {world_points.shape[1]}") + if world_points.dtype != self.dtype: + raise TypeError(f"world_points.dtype must be {self.dtype}, got {world_points.dtype}") + if T_world_sensor_start.dtype != self.dtype: + raise TypeError(f"T_world_sensor_start.dtype must be {self.dtype}, got {T_world_sensor_start.dtype}") + if T_world_sensor_end.dtype != self.dtype: + raise TypeError(f"T_world_sensor_end.dtype must be {self.dtype}, got {T_world_sensor_end.dtype}") + if not isinstance(max_iterations, int): + raise TypeError(f"max_iterations must be an int, got {type(max_iterations)}") + if max_iterations <= 0: + raise ValueError(f"max_iterations must be > 0, got {max_iterations}") # Do initial transformations using both start and end pose to determine all candidate points and take union of valid projections as iteration starting points sensor_angles_start = self.sensor_rays_to_sensor_angles( @@ -587,8 +603,10 @@ def world_points_to_sensor_angles_shutter_pose( if return_timestamps: # MYPY is stupid and can't see that we are doing the same above already - assert start_timestamp_us is not None - assert end_timestamp_us is not None + if start_timestamp_us is None: + raise ValueError("start_timestamp_us must not be None when return_timestamps is True") + if end_timestamp_us is None: + raise ValueError("end_timestamp_us must not be None when return_timestamps is True") return_var.timestamps_us = ( start_timestamp_us + (relative_time[sensor_angles.valid_flag, None] * (end_timestamp_us - start_timestamp_us)).to( @@ -622,20 +640,28 @@ def elements_to_world_rays_shutter_pose( T_sensor_world_start = to_torch(T_sensor_world_start, device=self.device, dtype=self.dtype) T_sensor_world_end = to_torch(T_sensor_world_end, device=self.device, dtype=self.dtype) - assert T_sensor_world_start.shape == (4, 4) - assert T_sensor_world_end.shape == (4, 4) - assert len(elements.shape) == 2 - assert elements.shape[1] == 2 - assert elements.dtype == torch.long - assert T_sensor_world_start.dtype == self.dtype - assert T_sensor_world_end.dtype == self.dtype + if T_sensor_world_start.shape != (4, 4): + raise ValueError(f"T_sensor_world_start must have shape (4, 4), got {T_sensor_world_start.shape}") + if T_sensor_world_end.shape != (4, 4): + raise ValueError(f"T_sensor_world_end must have shape (4, 4), got {T_sensor_world_end.shape}") + if len(elements.shape) != 2: + raise ValueError(f"elements must be 2-D, got ndim={len(elements.shape)}") + if elements.shape[1] != 2: + raise ValueError(f"elements must have 2 columns, got {elements.shape[1]}") + if elements.dtype != torch.long: + raise TypeError(f"elements.dtype must be torch.long, got {elements.dtype}") + if T_sensor_world_start.dtype != self.dtype: + raise TypeError(f"T_sensor_world_start.dtype must be {self.dtype}, got {T_sensor_world_start.dtype}") + if T_sensor_world_end.dtype != self.dtype: + raise TypeError(f"T_sensor_world_end.dtype must be {self.dtype}, got {T_sensor_world_end.dtype}") if return_timestamps: - assert start_timestamp_us is not None - assert end_timestamp_us is not None - assert end_timestamp_us >= start_timestamp_us, ( - "[LidarModel]: End timestamp must be larger or equal to the start timestamp" - ) + if start_timestamp_us is None: + raise ValueError("start_timestamp_us must not be None when return_timestamps is True") + if end_timestamp_us is None: + raise ValueError("end_timestamp_us must not be None when return_timestamps is True") + if end_timestamp_us < start_timestamp_us: + raise ValueError("[LidarModel]: End timestamp must be larger or equal to the start timestamp") # Make sure timestamps have correct type (might be, e.g., np.uint64, which torch doesn't like) start_timestamp_us = int(start_timestamp_us) @@ -649,10 +675,14 @@ def elements_to_world_rays_shutter_pose( if sensor_rays is not None: # Reuse provided sensor rays sensor_rays = to_torch(sensor_rays, device=self.device, dtype=self.dtype) - assert len(sensor_rays.shape) == 2 - assert len(sensor_rays) == len(elements) - assert sensor_rays.shape[1] == 3 - assert sensor_rays.dtype == self.dtype + if len(sensor_rays.shape) != 2: + raise ValueError(f"sensor_rays must be 2-D, got ndim={len(sensor_rays.shape)}") + if len(sensor_rays) != len(elements): + raise ValueError(f"sensor_rays length ({len(sensor_rays)}) must match elements length ({len(elements)})") + if sensor_rays.shape[1] != 3: + raise ValueError(f"sensor_rays must have 3 columns, got {sensor_rays.shape[1]}") + if sensor_rays.dtype != self.dtype: + raise TypeError(f"sensor_rays.dtype must be {self.dtype}, got {sensor_rays.dtype}") else: sensor_rays = self.elements_to_sensor_rays(elements) @@ -689,8 +719,10 @@ def elements_to_world_rays_shutter_pose( return_var.T_sensor_worlds[:, 3, 3] = 1 if return_timestamps: - assert start_timestamp_us is not None - assert end_timestamp_us is not None + if start_timestamp_us is None: + raise ValueError("start_timestamp_us must not be None when return_timestamps is True") + if end_timestamp_us is None: + raise ValueError("end_timestamp_us must not be None when return_timestamps is True") return_var.timestamps_us = ( start_timestamp_us + (t[..., None] * (end_timestamp_us - start_timestamp_us)).to(torch.int64) ).squeeze(-1) # [n_elements] diff --git a/tools/data_converter/colmap/converter.py b/tools/data_converter/colmap/converter.py index 18560983..0a5d41cd 100644 --- a/tools/data_converter/colmap/converter.py +++ b/tools/data_converter/colmap/converter.py @@ -104,7 +104,8 @@ def T_camera_refs(self) -> np.ndarray: def camera_model(self) -> Union[OpenCVPinholeCameraModelParameters, OpenCVFisheyeCameraModelParameters]: camera = self.colmap_camera - assert camera.camera_type in [0, 1, 2, 3, 4, 5], f"Unsupported camera type: {camera.camera_type}" + if camera.camera_type not in [0, 1, 2, 3, 4, 5]: + raise ValueError(f"Unsupported camera type: {camera.camera_type}") width = round(camera.width / self.downsample_factor) height = round(camera.height / self.downsample_factor) diff --git a/tools/data_converter/pai/converter.py b/tools/data_converter/pai/converter.py index d5cfc0f6..f51b75f8 100644 --- a/tools/data_converter/pai/converter.py +++ b/tools/data_converter/pai/converter.py @@ -223,7 +223,8 @@ def _convert_clip(self, clip_id: str) -> None: T_rig_worlds = T_rig_worlds[selected_range, :, :] T_rig_world_timestamps_us = T_rig_world_timestamps_us[selected_range] - assert len(T_rig_worlds) >= 2, "at least two poses required in selected time range" + if len(T_rig_worlds) < 2: + raise ValueError("at least two poses required in selected time range") # Select local world base pose (first pose defines global frame) T_world_world_global = T_rig_worlds[0].copy() @@ -456,10 +457,8 @@ def load_metadata(self) -> None: # Platform details self.platform_class = self.provider.get_platform_class() - assert self.platform_class in [ - "hyperion_8", - "hyperion_8.1", - ] + if self.platform_class not in ["hyperion_8", "hyperion_8.1"]: + raise ValueError(f"Unsupported platform class: {self.platform_class}") def decode_lidars(self, active_lidar_id): logger = self.logger.getChild("decode_lidar") diff --git a/tools/data_converter/waymo/converter.py b/tools/data_converter/waymo/converter.py index 6033ee14..3859d946 100644 --- a/tools/data_converter/waymo/converter.py +++ b/tools/data_converter/waymo/converter.py @@ -529,7 +529,8 @@ class RawFrameLabel3: ) # Collect all lidar per-frame data - assert len(frames) > 1 # require at least two frames to compute frame bound timestamps + if len(frames) <= 1: + raise ValueError("Require at least two frames to compute frame bound timestamps") for i, frame in tqdm.tqdm(enumerate(frames), desc=f"Process {lidar_ncore_id}", total=len(frames)): # Get frame timestamps frame_start_timestamp_us = raw_frame_start_timestamps_us[i] diff --git a/tools/ncore_project_pc_to_img.py b/tools/ncore_project_pc_to_img.py index 717dbea8..0b8691e3 100644 --- a/tools/ncore_project_pc_to_img.py +++ b/tools/ncore_project_pc_to_img.py @@ -128,7 +128,8 @@ def _plot_points_on_image( def se3_matrix(se3_delta: np.ndarray) -> np.ndarray: """Create the corresponding 4x4 matrix for se3_delta parameters""" - assert len(se3_delta) == 6 + if len(se3_delta) != 6: + raise ValueError(f"se3_delta must have exactly 6 elements, got {len(se3_delta)}") T = np.eye(4) T[:3, :3] = R.from_rotvec(se3_delta[3:]).as_matrix() T[:3, 3] = se3_delta[:3] @@ -334,7 +335,8 @@ def run(params: CLIBaseParams, loader: SequenceLoaderProtocol) -> None: motion_compensator = MotionCompensator(ray_sensor.pose_graph) if params.enable_lidar_model and isinstance(ray_sensor, LidarSensorProtocol): lidar_model = StructuredLidarModel.maybe_from_parameters(ray_sensor.model_parameters, device=params.device) - assert lidar_model is not None, f"No structured lidar model available for sensor {source_id}" + if lidar_model is None: + raise ValueError(f"No structured lidar model available for sensor {source_id}") msg += " | with structured lidar model" for frame_index in tqdm.tqdm(indices): @@ -457,7 +459,8 @@ def run(params: CLIBaseParams, loader: SequenceLoaderProtocol) -> None: save_path: Optional[Path] = None if params.encode_images: - assert len(params.output_dir) + if not len(params.output_dir): + raise ValueError("output_dir must not be empty when encode_images is enabled") output_path = Path(params.output_dir) output_path.mkdir(parents=True, exist_ok=True)