diff --git a/docs/data/formats.rst b/docs/data/formats.rst index 380f0b60..5122d621 100644 --- a/docs/data/formats.rst +++ b/docs/data/formats.rst @@ -89,6 +89,38 @@ Each component group has the following root-level structure: │ └── {component_specific_data}... +Component-Level Generic Data +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Components may optionally include a ``generic_data/`` subgroup containing +named NumPy arrays. This allows converters to attach auxiliary data to any +component without defining a custom component type. For example, a converter +might attach raw GPS/IMU measurements to the poses component: + +.. code-block:: text + + {component_type}/ + └── {component_instance_name}/ + ├── {component_meta_data} + │ └── generic_meta_data: {...} (includes keys added via set_generic_data) + │ + ├── {component_specific_data}... + │ + └── generic_data/ (optional, only if generic data was set) + ├── {dataset_name} [shape] dtype (lz4-compressed zarr dataset) + └── ... + +Writers use :meth:`~ncore.impl.data.v4.components.ComponentWriter.set_generic_data` +to provide generic data arrays and optional additional metadata before finalization. +Metadata keys passed via ``set_generic_data(meta_data={...})`` **replace** keys with +the same name from the initial ``register_component_writer(generic_meta_data={...})`` +call or from earlier ``set_generic_data`` calls; arrays passed again under an existing name likewise overwrite the earlier array. + +Readers access generic data via +:meth:`~ncore.impl.data.v4.components.ComponentReader.has_generic_data`, +:meth:`~ncore.impl.data.v4.components.ComponentReader.get_generic_data_names`, and +:meth:`~ncore.impl.data.v4.components.ComponentReader.get_generic_data`. 
+ Poses Component ~~~~~~~~~~~~~~~ diff --git a/ncore/impl/data/v4/components.py b/ncore/impl/data/v4/components.py index 6ae409f2..97d6c66b 100644 --- a/ncore/impl/data/v4/components.py +++ b/ncore/impl/data/v4/components.py @@ -228,7 +228,7 @@ def get_base_group(self, component_group_name: Optional[str]) -> zarr.Group: # To be called after all data was written def finalize(self) -> List[UPath]: - """Validates all writers and closes all stores after consolidating their meta data. + """Finalize all writers, store their generic data and meta data, and close all stores after consolidating their meta data. Returns a list of the store paths """ @@ -236,6 +236,25 @@ def finalize(self) -> List[UPath]: for component_writer in self._component_writers.values(): component_writer.finalize() + # Write deferred component metadata and generic data of all writers + for component_writer in self._component_writers.values(): + # Merge generic_meta_data with regular component's meta data + (cw_group := component_writer._group).attrs.put( + {**component_writer._component_meta_data, "generic_meta_data": component_writer._generic_meta_data} + ) + + # Write generic data arrays if any + if component_writer._generic_data: + compressor = Blosc(cname="lz4", clevel=5, shuffle=Blosc.BITSHUFFLE) + gd_group = cw_group.require_group("generic_data") + for name, array in component_writer._generic_data.items(): + gd_group.create_dataset( + name, + data=array, + chunks=array.shape, + compressor=compressor, + ) + # Make sure the stores are consolidated and closed ret = [] for root_group, store_path in self._stores_rootgroups.values(): @@ -284,23 +303,23 @@ class that is passed in.""" self.get_base_group(group_name).require_group(component_base_name).require_group(component_instance_name) ) - # Prepare meta-data - meta_data = { - "component_name": component_base_name, - "component_instance_name": component_instance_name, - "component_version": writer_cls.get_component_version(), - "generic_meta_data": 
generic_meta_data, - } - - # Store meta-data - component_group.attrs.put(meta_data) - + # Initialize writer instance self._component_writers[component_id] = ( component_writer_instance := component_writer_type( component_group, self._sequence_timestamp_interval_us, *args, **kwargs ) ) + # Prepare component meta-data and a private copy of the initial generic meta data (set_generic_data() merges into it in place and must not mutate the caller's dict) + component_meta_data: Dict[str, types.JsonLike] = { + "component_name": component_base_name, + "component_instance_name": component_instance_name, + "component_version": writer_cls.get_component_version(), + } + + component_writer_instance._component_meta_data = component_meta_data + component_writer_instance._generic_meta_data = dict(generic_meta_data) + return component_writer_instance @@ -565,11 +584,46 @@ def get_component_version() -> str: def __init__(self, component_group: zarr.Group, sequence_timestamp_interval_us: HalfClosedInterval) -> None: """Initializes a component writer targeting the given component group and sequence time interval""" + self._group = component_group self._sequence_timestamp_interval_us = sequence_timestamp_interval_us + # Initialized by the SequenceComponentGroupsWriter.register_component_writer() + self._component_meta_data: Dict[str, types.JsonLike] = {} + self._generic_data: Dict[str, np.ndarray] = {} + self._generic_meta_data: Dict[str, types.JsonLike] = {} + + def set_generic_data( + self, + data: Dict[str, np.ndarray], + meta_data: Optional[Dict[str, types.JsonLike]] = None, + ) -> None: + """Attach named generic data arrays (and optional metadata) to this component. + + Parameters + ---------- + data : Dict[str, np.ndarray] + Named numpy arrays to store under the component's ``generic_data/`` group (overwrites + existing arrays with the same name, if any). + meta_data : Optional[Dict[str, types.JsonLike]] + Keys provided here **replace** any init-time / previous ``generic_meta_data`` + keys set during component registration or earlier method calls. 
+ """ + + # Merge generic data: new data overwrites existing data with the same name + self._generic_data.update(data) + + if meta_data is not None: + # Merge generic_meta_data: new keys overwrite init-time / previously set keys with the same name + self._generic_meta_data.update(meta_data) + def finalize(self) -> None: - """Overwrite to perform final operations after all user-data was written""" + """Override to perform final operations after all user-data was written. + + Called by :meth:`SequenceComponentGroupsWriter.finalize` before + component metadata is persisted. Override to flush buffered data + (e.g., create zarr datasets/groups). + """ pass @@ -598,6 +652,10 @@ def __init__(self, component_instance_name: str, component_group: zarr.Group) -> self._instance_name = component_instance_name self._group = component_group + # Preload component meta-data and generic data group (if existing) + self._component_meta_data: Dict = dict(self._group.attrs) + self._generic_data_group: Optional[zarr.Group] = self._group.get("generic_data") + @property def instance_name(self) -> str: """The user-defined name that distinguishes this component instance from others of the same type.""" @@ -606,12 +664,31 @@ def instance_name(self) -> str: @property def component_version(self) -> str: """Returns the component version of the loaded component""" - return self._group.attrs["component_version"] + return self._component_meta_data["component_version"] @property def generic_meta_data(self) -> Dict[str, types.JsonLike]: """Returns the generic meta data of the loaded component""" - return self._group.attrs["generic_meta_data"] + return self._component_meta_data["generic_meta_data"] + + def has_generic_data(self, name: str) -> bool: + """Returns True if a named generic data array exists on this component""" + return name in self._generic_data_group if self._generic_data_group is not None else False + + def get_generic_data_names(self) -> List[str]: + """Returns the list of all generic data array names on this 
component""" + return list(self._generic_data_group.keys()) if self._generic_data_group is not None else [] + + def get_generic_data(self, name: str) -> np.ndarray: + """Returns a named generic data array from this component. + + Raises KeyError if the name does not exist. + """ + if self._generic_data_group is None: + raise KeyError("Component has no generic_data") + if (generic_data := self._generic_data_group.get(name)) is not None: + return np.array(generic_data) + raise KeyError(f"Generic data '{name}' not found. Available: {self.get_generic_data_names()}") CW = TypeVar("CW", bound=ComponentWriter) diff --git a/ncore/impl/data/v4/components_test.py b/ncore/impl/data/v4/components_test.py index 0c30776a..0becae4d 100644 --- a/ncore/impl/data/v4/components_test.py +++ b/ncore/impl/data/v4/components_test.py @@ -1079,6 +1079,220 @@ def normalize_points(vectors: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: self.assertEqual(list(cuboid_reader.get_observations()), ref_cuboid_observations) + # ------------------------------------------------------------------ + # Component-level generic_data tests + # ------------------------------------------------------------------ + + def test_component_generic_data_roundtrip(self) -> None: + """Write generic data arrays + additional metadata via set_generic_data(), then read them back and verify.""" + + tempdir = tempfile.TemporaryDirectory() + + timestamp_interval = HalfClosedInterval(0, 10_000_001) + + store_writer = SequenceComponentGroupsWriter( + output_dir_path=UPath(tempdir.name), + store_base_name=(seq_id := "generic-data-test"), + sequence_id=seq_id, + sequence_timestamp_interval_us=timestamp_interval, + store_type=self.store_type, + generic_meta_data={}, + ) + + init_meta: Dict[str, JsonLike] = {"description": "poses with generic data"} + poses_writer = store_writer.register_component_writer( + PosesComponent.Writer, + "test_poses", + generic_meta_data=init_meta, + ) + + # Store a minimal static pose so the component 
is non-empty + poses_writer.store_static_pose( + source_frame_id="sensor", + target_frame_id="rig", + pose=np.eye(4, dtype=np.float32), + ) + + # Prepare generic data arrays + rng = np.random.default_rng(42) + ref_weights = rng.random((10,), dtype=np.float32) + ref_offsets = rng.integers(0, 100, size=(5, 3), dtype=np.int32) + + ref_generic_meta: Dict[str, JsonLike] = {"source": "test", "version": 2} + + poses_writer.set_generic_data( + data={"weights": ref_weights, "offsets": ref_offsets}, + meta_data=ref_generic_meta, + ) + + # Finalize and read back + store_paths = store_writer.finalize() + store_reader = SequenceComponentGroupsReader(component_group_paths=store_paths) + poses_readers = store_reader.open_component_readers(PosesComponent.Reader) + poses_reader = poses_readers["test_poses"] + + # Verify generic data arrays + self.assertTrue(poses_reader.has_generic_data("weights")) + self.assertTrue(poses_reader.has_generic_data("offsets")) + self.assertFalse(poses_reader.has_generic_data("nonexistent")) + + self.assertSetEqual(set(poses_reader.get_generic_data_names()), {"weights", "offsets"}) + + np.testing.assert_array_almost_equal(poses_reader.get_generic_data("weights"), ref_weights) + np.testing.assert_array_equal(poses_reader.get_generic_data("offsets"), ref_offsets) + + # Verify merged metadata (init_meta + ref_generic_meta) + expected_meta = {**init_meta, **ref_generic_meta} + self.assertEqual(poses_reader.generic_meta_data, expected_meta) + + tempdir.cleanup() + + def test_component_generic_data_backwards_compat(self) -> None: + """Write without calling set_generic_data() (old behavior), verify readers handle missing generic_data/ gracefully.""" + + tempdir = tempfile.TemporaryDirectory() + + timestamp_interval = HalfClosedInterval(0, 10_000_001) + + store_writer = SequenceComponentGroupsWriter( + output_dir_path=UPath(tempdir.name), + store_base_name=(seq_id := "generic-data-compat-test"), + sequence_id=seq_id, + 
sequence_timestamp_interval_us=timestamp_interval, + store_type=self.store_type, + generic_meta_data={}, + ) + + init_meta: Dict[str, JsonLike] = {"old_key": "old_value"} + poses_writer = store_writer.register_component_writer( + PosesComponent.Writer, + "test_poses", + generic_meta_data=init_meta, + ) + + # Store a minimal static pose, but do NOT call set_generic_data + poses_writer.store_static_pose( + source_frame_id="sensor", + target_frame_id="rig", + pose=np.eye(4, dtype=np.float32), + ) + + # Finalize and read back + store_paths = store_writer.finalize() + store_reader = SequenceComponentGroupsReader(component_group_paths=store_paths) + poses_readers = store_reader.open_component_readers(PosesComponent.Reader) + poses_reader = poses_readers["test_poses"] + + # Readers should handle missing generic_data gracefully + self.assertFalse(poses_reader.has_generic_data("anything")) + self.assertEqual(poses_reader.get_generic_data_names(), []) + + # generic_meta_data should still contain only the init-time metadata + self.assertEqual(poses_reader.generic_meta_data, init_meta) + + tempdir.cleanup() + + def test_component_generic_data_meta_overwrite(self) -> None: + """Verify that meta_data passed to set_generic_data() overwrites init-time generic_meta_data keys.""" + + tempdir = tempfile.TemporaryDirectory() + + timestamp_interval = HalfClosedInterval(0, 10_000_001) + + store_writer = SequenceComponentGroupsWriter( + output_dir_path=UPath(tempdir.name), + store_base_name=(seq_id := "generic-data-overwrite-test"), + sequence_id=seq_id, + sequence_timestamp_interval_us=timestamp_interval, + store_type=self.store_type, + generic_meta_data={}, + ) + + # Init-time metadata has a key "version" that we will overwrite + init_meta: Dict[str, JsonLike] = {"version": 1, "author": "original"} + poses_writer = store_writer.register_component_writer( + PosesComponent.Writer, + "test_poses", + generic_meta_data=init_meta, + ) + + poses_writer.store_static_pose( + 
source_frame_id="sensor", + target_frame_id="rig", + pose=np.eye(4, dtype=np.float32), + ) + + # Overwrite "version" and add a new key + overwrite_meta: Dict[str, JsonLike] = {"version": 99, "extra": "new_value"} + poses_writer.set_generic_data( + data={"dummy": np.array([1.0, 2.0, 3.0], dtype=np.float32)}, + meta_data=overwrite_meta, + ) + + # Finalize and read back + store_paths = store_writer.finalize() + store_reader = SequenceComponentGroupsReader(component_group_paths=store_paths) + poses_readers = store_reader.open_component_readers(PosesComponent.Reader) + poses_reader = poses_readers["test_poses"] + + # "version" should be overwritten to 99, "author" preserved, "extra" added + expected_meta: Dict[str, JsonLike] = {"version": 99, "author": "original", "extra": "new_value"} + self.assertEqual(poses_reader.generic_meta_data, expected_meta) + + tempdir.cleanup() + + def test_component_generic_data_meta_only(self) -> None: + """Verify set_generic_data() can be used with empty data dict (meta-only addition).""" + + tempdir = tempfile.TemporaryDirectory() + + timestamp_interval = HalfClosedInterval(0, 10_000_001) + + store_writer = SequenceComponentGroupsWriter( + output_dir_path=UPath(tempdir.name), + store_base_name=(seq_id := "generic-data-meta-only-test"), + sequence_id=seq_id, + sequence_timestamp_interval_us=timestamp_interval, + store_type=self.store_type, + generic_meta_data={}, + ) + + init_meta: Dict[str, JsonLike] = {"base": "info"} + poses_writer = store_writer.register_component_writer( + PosesComponent.Writer, + "test_poses", + generic_meta_data=init_meta, + ) + + poses_writer.store_static_pose( + source_frame_id="sensor", + target_frame_id="rig", + pose=np.eye(4, dtype=np.float32), + ) + + # Call set_generic_data with empty data dict but meta_data provided + meta_only: Dict[str, JsonLike] = {"added_key": "added_value"} + poses_writer.set_generic_data( + data={}, + meta_data=meta_only, + ) + + # Finalize and read back + store_paths = 
store_writer.finalize() + store_reader = SequenceComponentGroupsReader(component_group_paths=store_paths) + poses_readers = store_reader.open_component_readers(PosesComponent.Reader) + poses_reader = poses_readers["test_poses"] + + # No generic data arrays should be present + self.assertEqual(poses_reader.get_generic_data_names(), []) + self.assertFalse(poses_reader.has_generic_data("anything")) + + # Metadata should be merged: init_meta + meta_only + expected_meta: Dict[str, JsonLike] = {"base": "info", "added_key": "added_value"} + self.assertEqual(poses_reader.generic_meta_data, expected_meta) + + tempdir.cleanup() + @parameterized_class( ("store_type"),