[feat] Add opt-in dynamic generation batching#1453
Conversation
There was a problem hiding this comment.
Welcome to FastVideo! Thanks for your first pull request.
How our CI works:
PRs run a two-tier CI system:
- Pre-commit — formatting (yapf), linting (ruff), type checking (mypy). Runs immediately on every PR.
- Fastcheck — core GPU tests (encoders, VAEs, transformers, kernels, unit tests). Runs automatically via Buildkite on relevant file changes (~10-15 min).
- Full Suite — integration tests, training pipelines, SSIM regression. Runs only when a reviewer adds the
readylabel.
Before your PR is reviewed:
-
pre-commit run --all-filespasses locally - You've added or updated tests for your changes
- The PR description explains what and why
If pre-commit fails, a bot comment will explain how to fix it. Fastcheck and Full Suite results appear in the Checks section below.
Useful links:
Merge ProtectionsYour pull request matches the following merge protections and will not be merged until they are valid. 🔴 PR merge requirementsWaiting for
This rule is failing.
|
There was a problem hiding this comment.
Code Review
This pull request introduces an SGLang-style dynamic generation batching path to FastVideo for compatible text-only requests, covering configuration, admission control, request merging/splitting, batch-aware pipeline stages, and an OpenAI server queue scheduler. The review feedback highlights three key improvements: implementing a tensor padding helper in the text encoding stage to prevent shape mismatches when concatenating variable-length prompt embeddings, using appendleft in the scheduler to preserve strict FIFO queue ordering for incompatible requests, and expanding the _optional_bool helper to robustly support numeric boolean representations like 1 and 0.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def _encode_prompt_list_individually( | ||
| self, | ||
| texts: list[str], | ||
| fastvideo_args: FastVideoArgs, | ||
| *, | ||
| encoder_index: list[int], | ||
| return_attention_mask: bool, | ||
| ) -> tuple[list[torch.Tensor], list[torch.Tensor]]: | ||
| per_prompt_embeds: list[list[torch.Tensor]] = [] | ||
| per_prompt_masks: list[list[torch.Tensor]] = [] | ||
| per_prompt_audio_embeds: list[list[torch.Tensor] | None] = [] | ||
|
|
||
| for text in texts: | ||
| embeds, masks = self.encode_text( | ||
| text, | ||
| fastvideo_args, | ||
| encoder_index=encoder_index, | ||
| return_attention_mask=return_attention_mask, | ||
| ) | ||
| per_prompt_embeds.append(embeds) | ||
| per_prompt_masks.append(masks) | ||
| per_prompt_audio_embeds.append(self._last_audio_embeds) | ||
|
|
||
| merged_embeds = [ | ||
| torch.cat([prompt_embeds[encoder_pos] for prompt_embeds in per_prompt_embeds], dim=0) | ||
| for encoder_pos in range(len(per_prompt_embeds[0])) | ||
| ] | ||
| merged_masks = [ | ||
| self._cat_attention_masks([prompt_masks[encoder_pos] for prompt_masks in per_prompt_masks]) | ||
| for encoder_pos in range(len(per_prompt_masks[0])) | ||
| ] | ||
| if per_prompt_audio_embeds and all(audio_embeds is not None for audio_embeds in per_prompt_audio_embeds): | ||
| audio_embed_lists = [audio_embeds for audio_embeds in per_prompt_audio_embeds if audio_embeds is not None] | ||
| self._last_audio_embeds = [ | ||
| torch.cat([audio_embeds[encoder_pos] for audio_embeds in audio_embed_lists], dim=0) | ||
| for encoder_pos in range(len(audio_embed_lists[0])) | ||
| ] | ||
| else: | ||
| self._last_audio_embeds = None | ||
| return merged_embeds, merged_masks | ||
|
|
||
| @staticmethod | ||
| def _cat_attention_masks(masks: list[torch.Tensor]) -> torch.Tensor: | ||
| base_shape = masks[0].shape[1:] | ||
| if all(mask.shape[1:] == base_shape for mask in masks): | ||
| return torch.cat(masks, dim=0) | ||
| if all(mask.ndim == 2 for mask in masks): | ||
| max_length = max(mask.shape[1] for mask in masks) | ||
| padded_masks = [] | ||
| for mask in masks: | ||
| pad_width = max_length - mask.shape[1] | ||
| if pad_width > 0: | ||
| mask = torch.nn.functional.pad(mask, (0, pad_width), value=0) | ||
| padded_masks.append(mask) | ||
| return torch.cat(padded_masks, dim=0) | ||
| raise ValueError(f"Cannot concatenate attention masks with shapes: {[list(mask.shape) for mask in masks]}") |
There was a problem hiding this comment.
When encoding a batch of prompts of different lengths individually, the resulting prompt_embeds (and self._last_audio_embeds) can have different sequence lengths (dimension 1). Attempting to concatenate them directly using torch.cat along dim=0 will raise a RuntimeError due to shape mismatch.
To prevent this, we should introduce a helper method _cat_tensors that pads the sequence dimension of 3D tensors (like prompt_embeds and self._last_audio_embeds) to the maximum sequence length in the batch, similar to how _cat_attention_masks handles 2D masks.
def _encode_prompt_list_individually(
self,
texts: list[str],
fastvideo_args: FastVideoArgs,
*,
encoder_index: list[int],
return_attention_mask: bool,
) -> tuple[list[torch.Tensor], list[torch.Tensor]]:
per_prompt_embeds: list[list[torch.Tensor]] = []
per_prompt_masks: list[list[torch.Tensor]] = []
per_prompt_audio_embeds: list[list[torch.Tensor] | None] = []
for text in texts:
embeds, masks = self.encode_text(
text,
fastvideo_args,
encoder_index=encoder_index,
return_attention_mask=return_attention_mask,
)
per_prompt_embeds.append(embeds)
per_prompt_masks.append(masks)
per_prompt_audio_embeds.append(self._last_audio_embeds)
merged_embeds = [
self._cat_tensors([prompt_embeds[encoder_pos] for prompt_embeds in per_prompt_embeds])
for encoder_pos in range(len(per_prompt_embeds[0]))
]
merged_masks = [
self._cat_attention_masks([prompt_masks[encoder_pos] for prompt_masks in per_prompt_masks])
for encoder_pos in range(len(per_prompt_masks[0]))
]
if per_prompt_audio_embeds and all(audio_embeds is not None for audio_embeds in per_prompt_audio_embeds):
audio_embed_lists = [audio_embeds for audio_embeds in per_prompt_audio_embeds if audio_embeds is not None]
self._last_audio_embeds = [
self._cat_tensors([audio_embeds[encoder_pos] for audio_embeds in audio_embed_lists])
for encoder_pos in range(len(audio_embed_lists[0]))
]
else:
self._last_audio_embeds = None
return merged_embeds, merged_masks
@staticmethod
def _cat_tensors(tensors: list[torch.Tensor]) -> torch.Tensor:
base_shape = tensors[0].shape[1:]
if all(t.shape[1:] == base_shape for t in tensors):
return torch.cat(tensors, dim=0)
if all(t.ndim == 3 for t in tensors):
max_len = max(t.shape[1] for t in tensors)
padded_tensors = []
for t in tensors:
pad_width = max_len - t.shape[1]
if pad_width > 0:
t = torch.nn.functional.pad(t, (0, 0, 0, pad_width), value=0.0)
padded_tensors.append(t)
return torch.cat(padded_tensors, dim=0)
raise ValueError(f"Cannot concatenate tensors with shapes: {[list(t.shape) for t in tensors]}")
@staticmethod
def _cat_attention_masks(masks: list[torch.Tensor]) -> torch.Tensor:
base_shape = masks[0].shape[1:]
if all(mask.shape[1:] == base_shape for mask in masks):
return torch.cat(masks, dim=0)
if all(mask.ndim == 2 for mask in masks):
max_length = max(mask.shape[1] for mask in masks)
padded_masks = []
for mask in masks:
pad_width = max_length - mask.shape[1]
if pad_width > 0:
mask = torch.nn.functional.pad(mask, (0, pad_width), value=0)
padded_masks.append(mask)
return torch.cat(padded_masks, dim=0)
raise ValueError(f"Cannot concatenate attention masks with shapes: {[list(mask.shape) for mask in masks]}")| if self._jobs_are_compatible(batch[0], candidate): | ||
| batch.append(candidate) | ||
| continue | ||
| self._pending.append(candidate) | ||
| break |
There was a problem hiding this comment.
When a candidate job is found to be incompatible with the current batch, appending it back to the right side of self._pending (using append) violates the strict FIFO ordering of the queue. This can cause newer jobs to be processed before older ones, potentially leading to unfairness or starvation.
Using self._pending.appendleft(candidate) ensures that the incompatible job remains at the front of the queue to be processed first in the next batch.
| if self._jobs_are_compatible(batch[0], candidate): | |
| batch.append(candidate) | |
| continue | |
| self._pending.append(candidate) | |
| break | |
| if self._jobs_are_compatible(batch[0], candidate): | |
| batch.append(candidate) | |
| continue | |
| self._pending.appendleft(candidate) | |
| break |
| if isinstance(value, bool): | ||
| return value | ||
| if isinstance(value, str): | ||
| lowered = value.strip().lower() | ||
| if lowered in ("1", "true", "yes", "y", "on"): | ||
| return True | ||
| if lowered in ("0", "false", "no", "n", "off"): | ||
| return False | ||
| raise ValueError(f"cannot parse boolean batching config value: {value!r}") |
There was a problem hiding this comment.
The _optional_bool helper raises a ValueError if the input is an integer or float (like 1 or 0), which are common representations of booleans in JSON configurations.
We should explicitly support 1 and 0 (or 1.0 and 0.0) to make config parsing more robust and user-friendly.
| if isinstance(value, bool): | |
| return value | |
| if isinstance(value, str): | |
| lowered = value.strip().lower() | |
| if lowered in ("1", "true", "yes", "y", "on"): | |
| return True | |
| if lowered in ("0", "false", "no", "n", "off"): | |
| return False | |
| raise ValueError(f"cannot parse boolean batching config value: {value!r}") | |
| if isinstance(value, bool): | |
| return value | |
| if isinstance(value, (int, float)): | |
| if value == 1.0: | |
| return True | |
| if value == 0.0: | |
| return False | |
| if isinstance(value, str): | |
| lowered = value.strip().lower() | |
| if lowered in ("1", "true", "yes", "y", "on"): | |
| return True | |
| if lowered in ("0", "false", "no", "n", "off"): | |
| return False |
Purpose
Adds an opt-in dynamic batching path for compatible text-only generation requests. This lets prompt-file generation and the OpenAI-compatible video server coalesce compatible requests
into a single forward pass while preserving the historical single-request path by default.
Fixes # N/A
Changes
batching_mode,batching_max_size,batching_delay_ms, admission config, and batching metrics.VideoGenerator.generate_video_batch()with merge/split behavior, per-request seed/output preservation, prompt-file dynamic batching, and sequential fallback for incompatiblerequests.
VideoBatchSchedulerthat starts in server lifespan and routes eligible requests through dynamic batching.da55c18b [docs]: record SSIM validation follow-up.Test Plan
SSIM:
FASTVIDEO_SSIM_MODEL_ID=Wan2.1-T2V-1.3B-Diffusers \ pytest fastvideo/tests/ssim/test_wan_t2v_similarity.py -vsAlso ran changed-file pre-commit run --files ..., attempted pre-commit run --all-files, and ran GPU validation through fastvideo/tests/modal/launch_l40s_job.py.
Test Results
Focused validation
Final changed-file suite on Modal L40S:
Focused suites:
SSIM validation
H100 SSIM attempt:
L40S SSIM fallback:
FASTVIDEO_SSIM_MODEL_ID=Wan2.1-T2V-1.3B-Diffusers pytest fastvideo/tests/ssim/test_wan_t2v_similarity.py -vs
Pre-commit validation
Changed-file pre-commit:
Full pre-commit attempt:
Docs follow-up validation:
Known Limitations
Dynamic batching remains disabled by default and is intentionally limited to compatible text-only requests. Image, video, audio, refine, continuation, and other conditioning-heavy
requests fall back to sequential execution.
Wan latent parity completed successfully but is not near bit-identical to sequential denoising in the current implementation:
Benchmarks on small latent-output Wan2.1 T2V 1.3B workloads showed:
Checklist
For model/pipeline changes, also check: