From c6e91dc1857ac9878e0081703588656aeb0f981e Mon Sep 17 00:00:00 2001 From: ShaohongZ Date: Tue, 18 Nov 2025 16:49:06 +0000 Subject: [PATCH] update VideoRTSPStream to enable reuse of same stream for multiple callbacks --- src/om1_vlm/video/video_rtsp_stream.py | 90 ++++++++++++++++++++++++-- 1 file changed, 83 insertions(+), 7 deletions(-) diff --git a/src/om1_vlm/video/video_rtsp_stream.py b/src/om1_vlm/video/video_rtsp_stream.py index 43b11f8..3ab897f 100644 --- a/src/om1_vlm/video/video_rtsp_stream.py +++ b/src/om1_vlm/video/video_rtsp_stream.py @@ -40,6 +40,35 @@ class VideoRTSPStream: JPEG quality for encoding frames, by default 70 """ + # Per-URL singleton registry + _instances: dict[str, "VideoRTSPStream"] = {} + _instances_lock = threading.Lock() + + def __new__( + cls, + rtsp_url: str = "rtsp://localhost:8554/live", + decode_format: str = "H264", + *args, + **kwargs, + ): + with cls._instances_lock: + existing = cls._instances.get(rtsp_url) + if existing is not None: + # already have a stream for this URL → reuse and bump refcount + existing._refcount += 1 + logger.info( + f"Reusing existing VideoRTSPStream for {rtsp_url}, " + f"refcount={existing._refcount}" + ) + return existing + + # first time this rtsp_url is seen → create new instance + instance = super().__new__(cls) + cls._instances[rtsp_url] = instance + instance._refcount = 1 + logger.info(f"Created new VideoRTSPStream for {rtsp_url}, refcount=1") + return instance + def __init__( self, rtsp_url: str = "rtsp://localhost:8554/live", @@ -50,6 +79,36 @@ def __init__( resolution: Optional[Tuple[int, int]] = (480, 640), jpeg_quality: int = 70, ): + # Prevent reinitialising on subsequent "constructions" for same URL + if getattr(self, "_initialized", False): + # register any new callbacks on the existing instance + try: + if frame_callback is not None: + self.register_frame_callback(frame_callback) + logger.info( + f"VideoRTSPStream for {self.rtsp_url}: registered extra frame callback on reused stream" + ) + logger.info( + f"Current number of callbacks: {len(self.frame_callbacks)}" + ) + except Exception as e: + logger.error( + f"VideoRTSPStream for {self.rtsp_url}: failed to register extra callback(s) on reused stream: {e}" + ) + + # mismatch logging logic (decode_format only) + if decode_format != self.decode_format: + logger.info( + f"VideoRTSPStream for {self.rtsp_url} already initialized " + f"with decode_format={self.decode_format}, fps={self.fps}, " + f"resolution={self.resolution}, jpeg_quality={self.jpeg_quality}, " + f"ignoring new request with decode_format={decode_format}, " + f"fps={fps}, resolution={resolution}, jpeg_quality={jpeg_quality}" + ) + return + + self._initialized = True + # RTSP stream parameters self.rtsp_url = rtsp_url self.decode_format = decode_format @@ -235,14 +294,31 @@ def stop(self): """ Stop video capture and clean up resources. - Stops the video processing loop and waits for the - processing thread to finish. + Decrement reference count and only actually stop the stream + when the last user releases it. """ - self.running = False + with self._instances_lock: + self._refcount -= 1 + logger.info( + f"VideoRTSPStream.stop called for {self.rtsp_url}, " + f"new refcount={self._refcount}" + ) + + if self._refcount > 0: + # Still in use by other providers — do NOT tear it down. + return - self._release_capture() + # Last user: really stop and remove from registry + self.running = False + self._release_capture() - if self._video_thread and self._video_thread.is_alive(): - self._video_thread.join(timeout=1.0) + if self._video_thread and self._video_thread.is_alive(): + self._video_thread.join(timeout=1.0) - logger.info("Stopped video processing thread") + logger.info("Stopped video processing thread") + + # Remove from class-level registry + try: + del self._instances[self.rtsp_url] + except KeyError: + pass