Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 83 additions & 7 deletions src/om1_vlm/video/video_rtsp_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,35 @@ class VideoRTSPStream:
JPEG quality for encoding frames, by default 70
"""

# Per-URL singleton registry
_instances: dict[str, "VideoRTSPStream"] = {}
_instances_lock = threading.Lock()

def __new__(
cls,
rtsp_url: str = "rtsp://localhost:8554/live",
decode_format: str = "H264",
*args,
**kwargs,
):
with cls._instances_lock:
existing = cls._instances.get(rtsp_url)
if existing is not None:
# already have a stream for this URL → reuse and bump refcount
existing._refcount += 1
logger.info(
f"Reusing existing VideoRTSPStream for {rtsp_url}, "
f"refcount={existing._refcount}"
)
return existing

# first time this rtsp_url is seen → create new instance
instance = super().__new__(cls)
cls._instances[rtsp_url] = instance
instance._refcount = 1
logger.info(f"Created new VideoRTSPStream for {rtsp_url}, refcount=1")
return instance

def __init__(
self,
rtsp_url: str = "rtsp://localhost:8554/live",
Expand All @@ -50,6 +79,36 @@ def __init__(
resolution: Optional[Tuple[int, int]] = (480, 640),
jpeg_quality: int = 70,
):
# Prevent reinitialising on subsequent "constructions" for same URL
if getattr(self, "_initialized", False):
# register any new callbacks on the existing instance
try:
if frame_callback is not None:
self.register_frame_callback(frame_callback)
logger.info(
f"VideoRTSPStream for {self.rtsp_url}: registered extra frame callback on reused stream"
)
logger.info(
f"Current number of callbacks: {len(self.frame_callbacks)}"
)
except Exception as e:
logger.error(
f"VideoRTSPStream for {self.rtsp_url}: failed to register extra callback(s) on reused stream: {e}"
)

# mismatch logging logic (decode_format only)
if decode_format != self.decode_format:
logger.info(
f"VideoRTSPStream for {self.rtsp_url} already initialized "
f"with decode_format={self.decode_format}, fps={self.fps}, "
f"resolution={self.resolution}, jpeg_quality={self.jpeg_quality}, "
f"ignoring new request with decode_format={decode_format}, "
f"fps={fps}, resolution={resolution}, jpeg_quality={jpeg_quality}"
)
return

self._initialized = True

# RTSP stream parameters
self.rtsp_url = rtsp_url
self.decode_format = decode_format
Expand Down Expand Up @@ -235,14 +294,31 @@ def stop(self):
"""
Stop video capture and clean up resources.

Stops the video processing loop and waits for the
processing thread to finish.
Decrement reference count and only actually stop the stream
when the last user releases it.
"""
self.running = False
with self._instances_lock:
self._refcount -= 1
logger.info(
f"VideoRTSPStream.stop called for {self.rtsp_url}, "
f"new refcount={self._refcount}"
)

if self._refcount > 0:
# Still in use by other providers — do NOT tear it down.
return

self._release_capture()
# Last user: really stop and remove from registry
self.running = False
self._release_capture()

if self._video_thread and self._video_thread.is_alive():
self._video_thread.join(timeout=1.0)
if self._video_thread and self._video_thread.is_alive():
self._video_thread.join(timeout=1.0)

logger.info("Stopped video processing thread")
logger.info("Stopped video processing thread")

# Remove from class-level registry
try:
del self._instances[self.rtsp_url]
except KeyError:
pass
Loading