Skip to content

[feat]: decouple Dreamverse fMP4 streaming from generation#1425

Open
Davids048 wants to merge 2 commits into
hao-ai-lab:mainfrom
Davids048:feat/pipeline-ffmpeg
Open

[feat]: decouple Dreamverse fMP4 streaming from generation#1425
Davids048 wants to merge 2 commits into
hao-ai-lab:mainfrom
Davids048:feat/pipeline-ffmpeg

Conversation

@Davids048

Copy link
Copy Markdown
Collaborator

Summary

  • move Dreamverse fMP4 packaging out of the synchronous USER_STEP path into one serial worker-side background stream thread
  • add MediaError for post-generation fMP4 failures and route media events through a SessionController media relay task
  • keep ltx2_segment_complete and ltx2_stream_complete tied to media completion while StepComplete remains the generation/continuation-state boundary
  • update session logging expectations for the split generation/media lifecycle

Tests

  • git diff --cached --check
  • /home/hal-jundas/codes/FastVideo/.venv/bin/python -m py_compile apps/dreamverse/dreamverse/session/controller.py apps/dreamverse/dreamverse/tests/test_session_logging.py
  • /home/hal-jundas/codes/FastVideo/.venv/bin/python -m pytest apps/dreamverse/dreamverse/tests/test_session_logging.py::test_websocket_flow_emits_required_session_events -q (blocked during collection: Triton reports RuntimeError: 0 active drivers ([]). There should only be one.)

@mergify mergify Bot added the type: feat New feature or capability label Jun 2, 2026
@mergify

mergify Bot commented Jun 2, 2026

Copy link
Copy Markdown
Contributor

Merge Protections

Your pull request matches the following merge protections and will not be merged until they are valid.

🔴 PR merge requirements

Waiting for

  • #approved-reviews-by>=1
  • check-success=fastcheck-passed
  • check-success=full-suite-passed
This rule is failing.
  • #approved-reviews-by>=1
  • check-success=fastcheck-passed
  • check-success=full-suite-passed
  • check-success~=pre-commit
  • title~=(?i)^\[(feat|feature|bugfix|fix|refactor|perf|ci|doc|docs|misc|chore|kernel|new.?model|skill|skills|infra)\]

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces asynchronous fMP4 segment streaming by running the stream encoding in a background thread on the GPU worker and adding an independent media_relay_loop task in the session controller. This decouples segment generation from media streaming. Additionally, a new MediaError event is introduced to handle streaming failures. The reviewer provided valuable feedback, pointing out a potential concurrent modification race condition on step_result.timings, the risk of indefinite blocking when joining the stream thread without a timeout, and the opportunity to terminate the session on MediaError to save GPU resources.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread apps/dreamverse/dreamverse/gpu_pool.py Outdated
Comment on lines +203 to +238
def _publish(event: StreamEvent) -> None:
response_queue.put(_stream_event_to_worker_event(event, user_id, segment_idx))

def _run_stream() -> None:
try:
av_ok, av_error = stream_fmp4(
frames=step_result.frames,
audio=step_result.audio,
audio_sample_rate=step_result.audio_sample_rate,
stream_id=stream_id,
timings=step_result.timings,
head_trim_frames=head_trim_frames,
head_trim_audio_frames=head_trim_audio_frames,
shared_buffer=None,
shared_buffer_bytes=0,
publish=_publish,
log_prefix=f"[GPU {gpu_id}]",
)
if not av_ok:
response_queue.put(
MediaError(
user_id=user_id,
segment_idx=segment_idx,
stream_id=stream_id,
message=av_error or "worker av_fmp4 stream failed",
))
return
print(f"[GPU {gpu_id}] AV streamed segment {segment_idx}: "
f"encode_total={step_result.timings.get('av_encode_stream_ms', 0):.0f}ms "
f"wav_write={step_result.timings.get('av_wav_write_ms', 0):.1f}ms "
f"spawn={step_result.timings.get('av_ffmpeg_spawn_ms', 0):.1f}ms "
f"first_chunk={step_result.timings.get('av_first_chunk_ms', 0):.0f}ms "
f"chunk_interval_med={step_result.timings.get('av_chunk_interval_ms_median', 0):.1f}ms "
f"chunk_interval_p95={step_result.timings.get('av_chunk_interval_ms_p95', 0):.1f}ms "
f"publish_med={step_result.timings.get('av_chunk_publish_ms_median', 0):.2f}ms "
f"read_med={step_result.timings.get('av_chunk_read_ms_median', 0):.1f}ms")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a concurrent modification and serialization race condition on step_result.timings. The main thread modifies step_result.timings["ipc_put_start_ns"] and serializes it via response_queue.put(StepComplete(...)) immediately after starting the stream thread. Concurrently, the background active_stream_thread executes stream_fmp4 which writes several av_* metrics directly to the same step_result.timings dictionary. This can lead to a RuntimeError: dictionary changed size during iteration during pickling, or corrupted/incomplete timing data. Copying the timings dictionary before passing it to the background thread resolves this race.

            stream_timings = dict(step_result.timings)

            def _publish(event: StreamEvent) -> None:
                response_queue.put(_stream_event_to_worker_event(event, user_id, segment_idx))

            def _run_stream() -> None:
                try:
                    av_ok, av_error = stream_fmp4(
                        frames=step_result.frames,
                        audio=step_result.audio,
                        audio_sample_rate=step_result.audio_sample_rate,
                        stream_id=stream_id,
                        timings=stream_timings,
                        head_trim_frames=head_trim_frames,
                        head_trim_audio_frames=head_trim_audio_frames,
                        shared_buffer=None,
                        shared_buffer_bytes=0,
                        publish=_publish,
                        log_prefix=f"[GPU {gpu_id}]",
                    )
                    if not av_ok:
                        response_queue.put(
                            MediaError(
                                user_id=user_id,
                                segment_idx=segment_idx,
                                stream_id=stream_id,
                                message=av_error or "worker av_fmp4 stream failed",
                            ))
                        return
                    print(f"[GPU {gpu_id}] AV streamed segment {segment_idx}: "
                          f"encode_total={stream_timings.get('av_encode_stream_ms', 0):.0f}ms "
                          f"wav_write={stream_timings.get('av_wav_write_ms', 0):.1f}ms "
                          f"spawn={stream_timings.get('av_ffmpeg_spawn_ms', 0):.1f}ms "
                          f"first_chunk={stream_timings.get('av_first_chunk_ms', 0):.0f}ms "
                          f"chunk_interval_med={stream_timings.get('av_chunk_interval_ms_median', 0):.1f}ms "
                          f"chunk_interval_p95={stream_timings.get('av_chunk_interval_ms_p95', 0):.1f}ms "
                          f"publish_med={stream_timings.get('av_chunk_publish_ms_median', 0):.2f}ms "
                          f"read_med={stream_timings.get('av_chunk_read_ms_median', 0):.1f}ms")

Comment on lines +176 to +187
def wait_for_active_stream() -> None:
"""Join the active fMP4 segment stream before leave, shutdown, or the next stream."""
nonlocal active_stream_thread
nonlocal active_stream_segment_idx
if active_stream_thread is None:
return
if active_stream_thread.is_alive():
print(f"[GPU {gpu_id}] Waiting for AV stream "
f"segment {active_stream_segment_idx} before continuing")
active_stream_thread.join()
active_stream_thread = None
active_stream_segment_idx = None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Calling active_stream_thread.join() without a timeout can block the GPU worker event loop indefinitely if the background stream thread hangs (e.g., due to an ffmpeg pipe deadlock or process hang). If this happens during a USER_LEAVE command, the event loop will remain permanently blocked, rendering the GPU slot completely unusable for subsequent sessions. Adding a timeout to the join operation prevents permanent hangs.

        def wait_for_active_stream(timeout: float = 30.0) -> None:
            """Join the active fMP4 segment stream before leave, shutdown, or the next stream."""
            nonlocal active_stream_thread
            nonlocal active_stream_segment_idx
            if active_stream_thread is None:
                return
            if active_stream_thread.is_alive():
                print(f"[GPU {gpu_id}] Waiting for AV stream "
                      f"segment {active_stream_segment_idx} before continuing")
            active_stream_thread.join(timeout=timeout)
            if active_stream_thread.is_alive():
                print(f"[GPU {gpu_id}] WARNING: AV stream thread for segment "
                      f"{active_stream_segment_idx} did not finish within {timeout}s")
            else:
                active_stream_thread = None
                active_stream_segment_idx = None

Comment on lines +1431 to +1437
case MediaError(stream_id=stream_id, message=message):
await ws_send_json({
"type": "error",
"message": f"AV streaming failed for segment {event.segment_idx}: {message}",
"segment_idx": event.segment_idx,
"stream_id": stream_id,
})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When a MediaError is received in media_relay_loop, the error is sent to the client, but the main generation loop continues to run and generate subsequent segments. Since the media pipeline is broken, continuing generation wastes valuable GPU resources. Setting stop_event.set() on MediaError ensures the session is cleanly terminated.

Suggested change
case MediaError(stream_id=stream_id, message=message):
await ws_send_json({
"type": "error",
"message": f"AV streaming failed for segment {event.segment_idx}: {message}",
"segment_idx": event.segment_idx,
"stream_id": stream_id,
})
case MediaError(stream_id=stream_id, message=message):
await ws_send_json({
"type": "error",
"message": f"AV streaming failed for segment {event.segment_idx}: {message}",
"segment_idx": event.segment_idx,
"stream_id": stream_id,
})
stop_event.set()

@Davids048 Davids048 force-pushed the feat/pipeline-ffmpeg branch 2 times, most recently from ebbebdb to 0b75db4 Compare June 2, 2026 06:00
@mergify

mergify Bot commented Jun 2, 2026

Copy link
Copy Markdown
Contributor

Pre-commit checks failed

Hi @Davids048, the pre-commit checks have failed. To fix them locally:

# Install pre-commit if you haven't already
uv pip install pre-commit
pre-commit install

# Run all checks and auto-fix what's possible
pre-commit run --all-files

Common fixes:

  • yapf: yapf -i <file> (formatting)
  • ruff: ruff check --fix <file> (linting)
  • codespell: codespell --write-changes <file> (spelling)

After fixing, commit and push the changes. The checks will re-run automatically.

For future commits, pre-commit will run automatically on changed files before each commit.

1 similar comment
@mergify

mergify Bot commented Jun 2, 2026

Copy link
Copy Markdown
Contributor

Pre-commit checks failed

Hi @Davids048, the pre-commit checks have failed. To fix them locally:

# Install pre-commit if you haven't already
uv pip install pre-commit
pre-commit install

# Run all checks and auto-fix what's possible
pre-commit run --all-files

Common fixes:

  • yapf: yapf -i <file> (formatting)
  • ruff: ruff check --fix <file> (linting)
  • codespell: codespell --write-changes <file> (spelling)

After fixing, commit and push the changes. The checks will re-run automatically.

For future commits, pre-commit will run automatically on changed files before each commit.

Move Dreamverse fMP4 packaging out of the synchronous USER_STEP path. The worker still runs generate_step synchronously so continuation state is saved before the next segment can start, but after generation returns it starts a single background fMP4 stream thread and emits StepComplete immediately.

Add MediaError for post-generation fMP4 failures. These errors cannot use the normal WorkerError command path because StepComplete may already have resolved, so they are routed through the media event queue with MediaInit, MediaChunk, and MediaComplete.

Add a SessionController media relay task that forwards per-user fMP4 events from the GPU slot queue to the browser WebSocket while the main generation loop can request the next segment. The relay keeps ltx2_segment_complete tied to media completion and gates ltx2_stream_complete until media completion catches up with generated segments.

Keep media chunk ordering by allowing only one active fMP4 stream thread per worker and joining it before leave, shutdown, or starting the next segment's media stream. Disable the shared stream buffer for this async path and send chunk bytes through IPC for this version.

Update the session logging fake and expectations for the split generation/media lifecycle: step_complete carries generation latency, while segment_complete logs media bytes after fMP4 completion.
@Davids048 Davids048 force-pushed the feat/pipeline-ffmpeg branch from 0b75db4 to b4be1de Compare June 2, 2026 06:09
@SolitaryThinker

Copy link
Copy Markdown
Collaborator

Hi @Davids048 — automated review from Gob, one of @SolitaryThinker's AI reviewers. I read the diff at b4be1de7 against origin/main (PR #1424 just landed as 570607945; merges cleanly, no rebase needed). Verdict: ship-with-fixes. Counts: 1 S1 / 2 S2 / 3 S3. All three prior Gemini findings are genuinely addressed at this HEAD — nice work on the timings snapshot, the 15s join timeout, and the stop_event.set() on MediaError.

The one thing that jumped out is the natural extension of Gemini's first finding: step_result.timings is correctly snapshotted with dict(...), but step_result.frames and step_result.audio are still passed by reference into the background stream thread. The whole point of the refactor is to let worker.generate_step start the next segment while the previous one is still encoding — which is great — but that means the producer can overwrite frame/audio storage while the encoder is still reading it, if VideoGenerationWorker.generate_step reuses any pre-allocated output buffer. Worth either (a) cloning the frames/audio at the top of start_stream_thread, (b) asserting in a comment + tests that generate_step always returns freshly-allocated storage, or (c) calling wait_for_active_stream() before the next generate_step instead of before the next start_stream_thread (option c partially defeats the decoupling, so (a) or (b) is probably right).

The second-biggest gap is test coverage. The fake slot in test_session_logging.py still emits MediaInit/Chunk/Complete inline inside user_step(), which means none of the new async-relay timing is actually exercised by a test. Adding one test that (i) emits media events after user_step returns and asserts ltx2_stream_complete waits for the tail, and (ii) emits a MediaError and asserts the session terminates, would pin all the new invariants. The author already flagged the e2e test was blocked by a Triton driver error during local collection — worth getting that running too before merge.

A few smaller items in the review: segment_total_hints grows unbounded over a long session (just pop instead of get on MediaComplete); the media_relay_loop quietly removed the old out-of-order event guard and now depends on wait_for_active_stream() upstream — worth a one-line comment near the relay; the USER_LEAVE path now treats a 15s join timeout as a WorkerError that brings down the worker process, which is a behavior change worth confirming is what you want vs. just logging and acking the leave; and the media_relay_loop has no top-level try/except so an uncaught exception there would silently kill the relay while generation continues.

Full findings with file:line evidence in the attached review.

— Gob
(automated review; human review still required to land)

@SolitaryThinker

Copy link
Copy Markdown
Collaborator

Hi @Davids048 — automated review from Gob, one of @SolitaryThinker's AI reviewers. Findings aren't all human-verified; ping @SolitaryThinker if anything looks off.

TL;DR

Decouples Dreamverse fMP4 streaming from segment generation via a background encoder thread + async media_relay_loop. All three of gemini's prior flags are genuinely addressed at this HEAD (timings dict-snapshot, 15s join timeout, stop_event.set() on MediaError). The natural extension of gemini's first finding is the one S1 here: step_result.frames / step_result.audio are still passed by reference into the background thread while the next segment is being generated.

Verdict: ship-with-fixes

  • S0 (blockers): 0
  • S1 (must-fix): 1
  • S2 (should-fix; surfaced if persistent or important): 2
  • S3 (discussion): not shown here; see review.md

Findings (formatted for upload)

[S1] step_result.frames / step_result.audio passed by reference into the encoder thread

What: step_result.timings is correctly snapshotted via dict(step_result.timings) before the thread starts (good — that's gemini #1 addressed). But step_result.frames and step_result.audio are still passed by reference into the background stream thread, while the main loop is now free to call worker.generate_step() for the next segment without waiting.

Why it matters: The whole point of the refactor is to let generate_step start the next segment while the previous one is still encoding. If VideoGenerationWorker.generate_step ever reuses any pre-allocated output buffer for frames/audio (a common optimization), the producer overwrites the encoder's input mid-encode → corrupt segments. The window is exactly the speedup the refactor unlocks, so the data race scales with the speedup.

Suggested fix: Pick one:
(a) Clone frames/audio at the top of start_stream_thread (cheap, no behavior change required from worker).
(b) Assert in a comment + test that generate_step always returns freshly-allocated storage, and pin that with a regression test that mutates the original after handoff and confirms the encoder output is unaffected.
(c) Call wait_for_active_stream() before the next generate_step instead of before the next start_stream_thread — but this partially defeats the decoupling, so (a) or (b) is probably right.

Evidence: apps/dreamverse/dreamverse/worker_ipc.py start_stream_thread (around line 208 where dict(step_result.timings) snapshots; frames/audio remain by-reference).


[S2] No test exercises the new async-relay timing

What: The fake slot in apps/dreamverse/dreamverse/tests/test_session_logging.py still emits MediaInit / Chunk / Complete inline inside user_step(). None of the new async-relay timing — segments emitting after user_step returns — is actually exercised by a test. Same gap on the MediaError path: no test asserts the session terminates and the GPU slot releases.

Why it matters: The refactor introduces a class of bugs (join-timeout deadlocks, out-of-order events, missed MediaError cleanup, frames/audio races per S1 above) that only manifest when generation and streaming actually overlap in time. The current test only proves the happy synchronous path still works — which is also what the old code did. The author's PR description already flags an e2e test as blocked by a Triton driver error during local collection.

Suggested fix: Add two tests in test_session_logging.py:

  1. Have the fake slot emit MediaInit / Chunk / Complete after user_step returns (e.g., from a separate task/thread) and assert ltx2_stream_complete waits for the tail and the relay forwards every event in order.
  2. Have the fake slot emit MediaError and assert stop_event.set() fires, the session reaches a terminal state, and the GPU slot is released.

Resolving the local-e2e Triton blocker before merge would also retire significant risk.

Evidence: apps/dreamverse/dreamverse/tests/test_session_logging.py (fake slot's inline event emission); apps/dreamverse/dreamverse/session/controller.py media_relay_loop + MediaError handling.


[S2] segment_total_hints grows unbounded over the session lifetime

What: segment_total_hints is populated when a segment starts but never pruned. A long session (many segments) walks it indefinitely; over a long-lived worker it's a slow leak.

Why it matters: Not a crash, but the relay path is now the long-lived hot path of the worker — anything in it that grows with session length will surface as either memory growth or O(N) lookup cost over time. Cheap to fix.

Suggested fix: Replace the get on MediaComplete with pop, so each hint is consumed exactly once when its terminal event fires.

Evidence: apps/dreamverse/dreamverse/session/controller.py (segment_total_hints populated near MediaInit handling; consumed at MediaComplete).


— Gob (@SolitaryThinker's AI reviewer). Full review (including S3 items) is archived locally.

Add Dreamverse session tests for media events that arrive after user_step returns, including stream completion waiting for the async media tail and MediaError terminating the session/releasing the GPU slot.

Also consume segment_total_hints with pop on MediaComplete so per-segment total hints are cleaned up once relayed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

type: feat New feature or capability

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants