[feat]: decouple Dreamverse fMP4 streaming from generation#1425
[feat]: decouple Dreamverse fMP4 streaming from generation#1425Davids048 wants to merge 2 commits into
Conversation
Merge ProtectionsYour pull request matches the following merge protections and will not be merged until they are valid. 🔴 PR merge requirementsWaiting for
This rule is failing.
|
There was a problem hiding this comment.
Code Review
This pull request introduces asynchronous fMP4 segment streaming by running the stream encoding in a background thread on the GPU worker and adding an independent media_relay_loop task in the session controller. This decouples segment generation from media streaming. Additionally, a new MediaError event is introduced to handle streaming failures. The reviewer provided valuable feedback, pointing out a potential concurrent modification race condition on step_result.timings, the risk of indefinite blocking when joining the stream thread without a timeout, and the opportunity to terminate the session on MediaError to save GPU resources.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def _publish(event: StreamEvent) -> None: | ||
| response_queue.put(_stream_event_to_worker_event(event, user_id, segment_idx)) | ||
|
|
||
| def _run_stream() -> None: | ||
| try: | ||
| av_ok, av_error = stream_fmp4( | ||
| frames=step_result.frames, | ||
| audio=step_result.audio, | ||
| audio_sample_rate=step_result.audio_sample_rate, | ||
| stream_id=stream_id, | ||
| timings=step_result.timings, | ||
| head_trim_frames=head_trim_frames, | ||
| head_trim_audio_frames=head_trim_audio_frames, | ||
| shared_buffer=None, | ||
| shared_buffer_bytes=0, | ||
| publish=_publish, | ||
| log_prefix=f"[GPU {gpu_id}]", | ||
| ) | ||
| if not av_ok: | ||
| response_queue.put( | ||
| MediaError( | ||
| user_id=user_id, | ||
| segment_idx=segment_idx, | ||
| stream_id=stream_id, | ||
| message=av_error or "worker av_fmp4 stream failed", | ||
| )) | ||
| return | ||
| print(f"[GPU {gpu_id}] AV streamed segment {segment_idx}: " | ||
| f"encode_total={step_result.timings.get('av_encode_stream_ms', 0):.0f}ms " | ||
| f"wav_write={step_result.timings.get('av_wav_write_ms', 0):.1f}ms " | ||
| f"spawn={step_result.timings.get('av_ffmpeg_spawn_ms', 0):.1f}ms " | ||
| f"first_chunk={step_result.timings.get('av_first_chunk_ms', 0):.0f}ms " | ||
| f"chunk_interval_med={step_result.timings.get('av_chunk_interval_ms_median', 0):.1f}ms " | ||
| f"chunk_interval_p95={step_result.timings.get('av_chunk_interval_ms_p95', 0):.1f}ms " | ||
| f"publish_med={step_result.timings.get('av_chunk_publish_ms_median', 0):.2f}ms " | ||
| f"read_med={step_result.timings.get('av_chunk_read_ms_median', 0):.1f}ms") |
There was a problem hiding this comment.
There is a concurrent modification and serialization race condition on step_result.timings. The main thread modifies step_result.timings["ipc_put_start_ns"] and serializes it via response_queue.put(StepComplete(...)) immediately after starting the stream thread. Concurrently, the background active_stream_thread executes stream_fmp4 which writes several av_* metrics directly to the same step_result.timings dictionary. This can lead to a RuntimeError: dictionary changed size during iteration during pickling, or corrupted/incomplete timing data. Copying the timings dictionary before passing it to the background thread resolves this race.
stream_timings = dict(step_result.timings)
def _publish(event: StreamEvent) -> None:
response_queue.put(_stream_event_to_worker_event(event, user_id, segment_idx))
def _run_stream() -> None:
try:
av_ok, av_error = stream_fmp4(
frames=step_result.frames,
audio=step_result.audio,
audio_sample_rate=step_result.audio_sample_rate,
stream_id=stream_id,
timings=stream_timings,
head_trim_frames=head_trim_frames,
head_trim_audio_frames=head_trim_audio_frames,
shared_buffer=None,
shared_buffer_bytes=0,
publish=_publish,
log_prefix=f"[GPU {gpu_id}]",
)
if not av_ok:
response_queue.put(
MediaError(
user_id=user_id,
segment_idx=segment_idx,
stream_id=stream_id,
message=av_error or "worker av_fmp4 stream failed",
))
return
print(f"[GPU {gpu_id}] AV streamed segment {segment_idx}: "
f"encode_total={stream_timings.get('av_encode_stream_ms', 0):.0f}ms "
f"wav_write={stream_timings.get('av_wav_write_ms', 0):.1f}ms "
f"spawn={stream_timings.get('av_ffmpeg_spawn_ms', 0):.1f}ms "
f"first_chunk={stream_timings.get('av_first_chunk_ms', 0):.0f}ms "
f"chunk_interval_med={stream_timings.get('av_chunk_interval_ms_median', 0):.1f}ms "
f"chunk_interval_p95={stream_timings.get('av_chunk_interval_ms_p95', 0):.1f}ms "
f"publish_med={stream_timings.get('av_chunk_publish_ms_median', 0):.2f}ms "
f"read_med={stream_timings.get('av_chunk_read_ms_median', 0):.1f}ms")| def wait_for_active_stream() -> None: | ||
| """Join the active fMP4 segment stream before leave, shutdown, or the next stream.""" | ||
| nonlocal active_stream_thread | ||
| nonlocal active_stream_segment_idx | ||
| if active_stream_thread is None: | ||
| return | ||
| if active_stream_thread.is_alive(): | ||
| print(f"[GPU {gpu_id}] Waiting for AV stream " | ||
| f"segment {active_stream_segment_idx} before continuing") | ||
| active_stream_thread.join() | ||
| active_stream_thread = None | ||
| active_stream_segment_idx = None |
There was a problem hiding this comment.
Calling active_stream_thread.join() without a timeout can block the GPU worker event loop indefinitely if the background stream thread hangs (e.g., due to an ffmpeg pipe deadlock or process hang). If this happens during a USER_LEAVE command, the event loop will remain permanently blocked, rendering the GPU slot completely unusable for subsequent sessions. Adding a timeout to the join operation prevents permanent hangs.
def wait_for_active_stream(timeout: float = 30.0) -> None:
"""Join the active fMP4 segment stream before leave, shutdown, or the next stream."""
nonlocal active_stream_thread
nonlocal active_stream_segment_idx
if active_stream_thread is None:
return
if active_stream_thread.is_alive():
print(f"[GPU {gpu_id}] Waiting for AV stream "
f"segment {active_stream_segment_idx} before continuing")
active_stream_thread.join(timeout=timeout)
if active_stream_thread.is_alive():
print(f"[GPU {gpu_id}] WARNING: AV stream thread for segment "
f"{active_stream_segment_idx} did not finish within {timeout}s")
else:
active_stream_thread = None
active_stream_segment_idx = None| case MediaError(stream_id=stream_id, message=message): | ||
| await ws_send_json({ | ||
| "type": "error", | ||
| "message": f"AV streaming failed for segment {event.segment_idx}: {message}", | ||
| "segment_idx": event.segment_idx, | ||
| "stream_id": stream_id, | ||
| }) |
There was a problem hiding this comment.
When a MediaError is received in media_relay_loop, the error is sent to the client, but the main generation loop continues to run and generate subsequent segments. Since the media pipeline is broken, continuing generation wastes valuable GPU resources. Setting stop_event.set() on MediaError ensures the session is cleanly terminated.
| case MediaError(stream_id=stream_id, message=message): | |
| await ws_send_json({ | |
| "type": "error", | |
| "message": f"AV streaming failed for segment {event.segment_idx}: {message}", | |
| "segment_idx": event.segment_idx, | |
| "stream_id": stream_id, | |
| }) | |
| case MediaError(stream_id=stream_id, message=message): | |
| await ws_send_json({ | |
| "type": "error", | |
| "message": f"AV streaming failed for segment {event.segment_idx}: {message}", | |
| "segment_idx": event.segment_idx, | |
| "stream_id": stream_id, | |
| }) | |
| stop_event.set() |
ebbebdb to
0b75db4
Compare
Pre-commit checks failedHi @Davids048, the pre-commit checks have failed. To fix them locally: # Install pre-commit if you haven't already
uv pip install pre-commit
pre-commit install
# Run all checks and auto-fix what's possible
pre-commit run --all-filesCommon fixes:
After fixing, commit and push the changes. The checks will re-run automatically. For future commits, |
1 similar comment
Pre-commit checks failedHi @Davids048, the pre-commit checks have failed. To fix them locally: # Install pre-commit if you haven't already
uv pip install pre-commit
pre-commit install
# Run all checks and auto-fix what's possible
pre-commit run --all-filesCommon fixes:
After fixing, commit and push the changes. The checks will re-run automatically. For future commits, |
Move Dreamverse fMP4 packaging out of the synchronous USER_STEP path. The worker still runs generate_step synchronously so continuation state is saved before the next segment can start, but after generation returns it starts a single background fMP4 stream thread and emits StepComplete immediately. Add MediaError for post-generation fMP4 failures. These errors cannot use the normal WorkerError command path because StepComplete may already have resolved, so they are routed through the media event queue with MediaInit, MediaChunk, and MediaComplete. Add a SessionController media relay task that forwards per-user fMP4 events from the GPU slot queue to the browser WebSocket while the main generation loop can request the next segment. The relay keeps ltx2_segment_complete tied to media completion and gates ltx2_stream_complete until media completion catches up with generated segments. Keep media chunk ordering by allowing only one active fMP4 stream thread per worker and joining it before leave, shutdown, or starting the next segment's media stream. Disable the shared stream buffer for this async path and send chunk bytes through IPC for this version. Update the session logging fake and expectations for the split generation/media lifecycle: step_complete carries generation latency, while segment_complete logs media bytes after fMP4 completion.
0b75db4 to
b4be1de
Compare
|
Hi @Davids048 — automated review from Gob, one of @SolitaryThinker's AI reviewers. I read the diff at The one thing that jumped out is the natural extension of Gemini's first finding: The second-biggest gap is test coverage. The fake slot in A few smaller items in the review: Full findings with file:line evidence in the attached review. — Gob |
|
Hi @Davids048 — automated review from Gob, one of @SolitaryThinker's AI reviewers. Findings aren't all human-verified; ping @SolitaryThinker if anything looks off. TL;DRDecouples Dreamverse fMP4 streaming from segment generation via a background encoder thread + async Verdict: ship-with-fixes
Findings (formatted for upload)[S1]
|
Add Dreamverse session tests for media events that arrive after user_step returns, including stream completion waiting for the async media tail and MediaError terminating the session/releasing the GPU slot. Also consume segment_total_hints with pop on MediaComplete so per-segment total hints are cleaned up once relayed.
Summary
Tests
RuntimeError: 0 active drivers ([]). There should only be one.)