A one-minute video at 30 fps contains 1800 frames. Each frame, processed through a ViT encoder with 14x14 patches on a 224x224 input, produces 256 visual tokens. That is 1800 x 256 = 460,800 tokens for one minute of video. At the O(n^2) complexity of self-attention, a 460K-token context is computationally impractical. The core problem of video LLM serving is compressing temporal visual information to a manageable token count while preserving enough detail for the LLM to reason about events, actions, and temporal relationships.
Video Token Budget
The raw token count sets the challenge:
Video Token Counts at Different Sampling Rates
| Sampling | Frames/sec | Patches/Frame | Tokens/sec | 1min Total | KV Cache (70B, FP16) |
|---|---|---|---|---|---|
| Every frame (30fps) | 30 | 256 | 7680 | 460,800 | 150 GB |
| Every frame (1fps) | 1 | 256 | 256 | 15,360 | 5.0 GB |
| Keyframe (0.5fps) | 0.5 | 256 | 128 | 7,680 | 2.5 GB |
| Pooled (30fps, 4x pool) | 30 | 64 | 1920 | 115,200 | 37 GB |
| Temporal compressed | 2 | 64 | 128 | 7,680 | 2.5 GB |
Production video LLMs use a combination of frame subsampling, spatial token pooling, and temporal compression to reduce the token count by 50-100x while retaining enough information for the task.
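The arithmetic behind the table is simple enough to script. A minimal sketch (pure Python; the KV-bytes-per-token figure assumes the 70B-class geometry used in the table: K and V, 80 layers, 8 KV heads under GQA, 128 head dim, FP16):

```python
def video_token_budget(fps, patches_per_frame, duration_sec=60,
                       kv_bytes_per_token=2 * 80 * 8 * 128 * 2):
    """Token count and KV-cache size for one video clip.

    kv_bytes_per_token assumes a 70B-class model: K and V tensors,
    80 layers, 8 KV heads (GQA), 128 head dim, FP16 (2 bytes).
    """
    tokens_per_sec = fps * patches_per_frame
    total_tokens = int(tokens_per_sec * duration_sec)
    kv_gb = total_tokens * kv_bytes_per_token / 1e9
    return total_tokens, kv_gb

# Rows of the table above
print(video_token_budget(30, 256))  # every frame at 30fps
print(video_token_budget(1, 256))   # 1fps subsampling
print(video_token_budget(30, 64))   # 30fps with 4x spatial pooling
```

Reproducing the table's 460,800-token / ~150 GB row this way is a useful sanity check when changing the sampling or pooling configuration.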
Temporal Video Encoding
Frame Subsampling
The simplest approach: process only a subset of frames.
class FrameSubsampler:
    """Select frames from video for visual encoding."""

    def __init__(self, target_fps=2.0, max_frames=64):
        self.target_fps = target_fps
        self.max_frames = max_frames

    def sample_frames(self, video_path, source_fps=30.0):
        """Select evenly-spaced frames from video."""
        import cv2

        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration_sec = total_frames / source_fps
        # Calculate number of frames to extract (at least 1, so very
        # short clips do not divide by zero below)
        num_frames = max(1, min(
            int(duration_sec * self.target_fps),
            self.max_frames,
            total_frames,
        ))
        # Evenly space frame indices
        frame_indices = [
            int(i * total_frames / num_frames) for i in range(num_frames)
        ]
        frames = []
        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if ret:
                frames.append({
                    "frame": frame,
                    "timestamp": idx / source_fps,
                    "index": idx,
                })
        cap.release()
        return frames
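The index-selection rule can be exercised without OpenCV or a video file. A standalone sketch of the same even-spacing math (hypothetical `even_frame_indices` helper mirroring the class above):

```python
def even_frame_indices(total_frames, source_fps=30.0,
                       target_fps=2.0, max_frames=64):
    """Evenly-spaced frame indices, capped by a frame budget."""
    duration_sec = total_frames / source_fps
    num_frames = max(1, min(int(duration_sec * target_fps),
                            max_frames, total_frames))
    return [int(i * total_frames / num_frames) for i in range(num_frames)]

# A 60-second clip at 30fps: 2fps sampling would want 120 frames,
# but the 64-frame budget wins.
idx = even_frame_indices(total_frames=1800)
```

With the default budget, a 1-minute clip yields 64 indices spaced roughly 28 frames (about one second) apart.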
Spatial Token Pooling
After ViT encoding, reduce the number of tokens per frame through spatial pooling:
import torch
import torch.nn as nn
class SpatialTokenPooler(nn.Module):
    """Reduce spatial tokens per frame via learned pooling."""

    def __init__(self, hidden_dim=1024, num_input_tokens=256,
                 num_output_tokens=64, num_heads=16):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_output_tokens = num_output_tokens
        # Learnable query tokens that attend to ViT output
        self.query_tokens = nn.Parameter(
            torch.randn(num_output_tokens, hidden_dim) * 0.02
        )
        # Cross-attention: queries attend to ViT spatial tokens
        self.cross_attn = nn.MultiheadAttention(
            hidden_dim, num_heads, batch_first=True
        )
        self.norm = nn.LayerNorm(hidden_dim)

    def forward(self, vit_tokens):
        """Pool spatial tokens.

        vit_tokens: [batch, num_frames, 256, hidden_dim]
        returns:    [batch, num_frames, 64, hidden_dim]
        """
        B, F, S, D = vit_tokens.shape
        # Reshape to process all frames at once
        flat = vit_tokens.view(B * F, S, D)
        # Expand queries for the batch
        queries = self.query_tokens.unsqueeze(0).expand(B * F, -1, -1)
        # Cross-attention: 64 queries attend to 256 spatial tokens
        pooled, _ = self.cross_attn(queries, flat, flat)
        pooled = self.norm(pooled)
        return pooled.view(B, F, self.num_output_tokens, D)
Temporal Compression
Adjacent frames in a video are highly redundant. Temporal compression merges tokens across nearby frames:
class TemporalCompressor(nn.Module):
    """Compress tokens across the temporal dimension.

    Groups of N consecutive frames are merged into one set of tokens."""

    def __init__(self, hidden_dim=1024, temporal_stride=4, num_heads=16):
        super().__init__()
        self.temporal_stride = temporal_stride
        self.num_output_tokens = 64  # output tokens per group
        # Temporal attention: merge tokens across stride frames
        self.temporal_attn = nn.MultiheadAttention(
            hidden_dim, num_heads, batch_first=True
        )
        # Learned temporal query: one set of tokens per group
        self.temporal_query = nn.Parameter(
            torch.randn(self.num_output_tokens, hidden_dim) * 0.02
        )
        self.norm = nn.LayerNorm(hidden_dim)

    def forward(self, frame_tokens):
        """Compress the temporal dimension.

        frame_tokens: [batch, num_frames, tokens_per_frame, hidden_dim]
        returns:      [batch, num_groups, tokens_per_group, hidden_dim]
        """
        B, F, T, D = frame_tokens.shape
        stride = self.temporal_stride
        # Pad frames to a multiple of stride
        pad_frames = (stride - F % stride) % stride
        if pad_frames > 0:
            frame_tokens = torch.nn.functional.pad(
                frame_tokens, (0, 0, 0, 0, 0, pad_frames)
            )
            F = F + pad_frames
        num_groups = F // stride
        # Group stride frames together: [B, num_groups, stride, T, D]
        grouped = frame_tokens.view(B, num_groups, stride, T, D)
        # Flatten spatial and temporal dims within each group:
        # [B, num_groups, stride*T, D]
        grouped = grouped.view(B, num_groups, stride * T, D)
        # Cross-attention: temporal queries attend to grouped tokens
        B_G = B * num_groups
        flat = grouped.view(B_G, stride * T, D)
        queries = self.temporal_query.unsqueeze(0).expand(B_G, -1, -1)
        compressed, _ = self.temporal_attn(queries, flat, flat)
        compressed = self.norm(compressed)
        return compressed.view(B, num_groups, self.num_output_tokens, D)
Token Count After Each Compression Stage (1-minute, 30fps video)
| Stage | Frames (or groups) | Tokens each | Total tokens |
|---|---|---|---|
| Raw frames (30fps) | 1800 | 256 | 460,800 |
| Subsampled (2fps) | 120 | 256 | 30,720 |
| Spatial pooled (64/frame) | 120 | 64 | 7,680 |
| Temporal compressed (stride=4) | 30 | 64 | 1,920 |
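The stages compound multiplicatively, which makes the end-to-end reduction easy to compute for any configuration. A quick sketch of the pipeline arithmetic for the 1-minute clip:

```python
def compression_stages(duration_sec=60, source_fps=30, sample_fps=2,
                       vit_tokens=256, pooled_tokens=64, stride=4):
    """Token count after each stage of the video compression pipeline."""
    raw = duration_sec * source_fps * vit_tokens          # every ViT token
    sampled = duration_sec * sample_fps * vit_tokens      # frame subsampling
    pooled = duration_sec * sample_fps * pooled_tokens    # spatial pooling
    groups = duration_sec * sample_fps // stride          # temporal groups
    compressed = groups * pooled_tokens                   # temporal merge
    return {"raw": raw, "subsampled": sampled,
            "pooled": pooled, "compressed": compressed}
```

For the defaults this reproduces the 460,800 -> 30,720 -> 7,680 -> 1,920 progression, a 240x overall reduction.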
Chunked Video Processing
For long videos (10+ minutes), even compressed tokens exceed the model's context window. Chunked processing splits the video into temporal chunks, processes each independently, and aggregates results:
class ChunkedVideoProcessor:
    """Process long videos in temporal chunks."""

    def __init__(self, model, encoder, chunk_duration_sec=30,
                 overlap_sec=2, max_context_tokens=32768):
        self.model = model
        self.encoder = encoder
        self.chunk_duration = chunk_duration_sec
        self.overlap = overlap_sec
        self.max_tokens = max_context_tokens

    def process_video(self, video_path, query):
        """Process a long video with temporal chunking."""
        # Extract frames
        frames = self.extract_all_frames(video_path)
        fps = self.get_fps(video_path)
        total_duration = len(frames) / fps
        # Define chunks with overlap
        chunks = []
        chunk_start = 0
        while chunk_start < total_duration:
            chunk_end = min(chunk_start + self.chunk_duration, total_duration)
            chunks.append((chunk_start, chunk_end))
            if chunk_end >= total_duration:
                break  # stop at the end; stepping back by the overlap would loop forever
            chunk_start = chunk_end - self.overlap
        # Process each chunk
        chunk_summaries = []
        for i, (start, end) in enumerate(chunks):
            start_frame = int(start * fps)
            end_frame = int(end * fps)
            chunk_frames = frames[start_frame:end_frame]
            # Encode visual tokens for this chunk
            visual_tokens = self.encoder.encode_frames(chunk_frames)
            # Build prompt with temporal context
            prompt = self._build_chunk_prompt(
                query=query,
                chunk_idx=i,
                total_chunks=len(chunks),
                time_range=(start, end),
                visual_tokens=visual_tokens,
                previous_summaries=chunk_summaries[-2:],  # last 2 chunks for context
            )
            # Run the LLM on this chunk
            response = self.model.generate(prompt)
            chunk_summaries.append({
                "time_range": (start, end),
                "response": response,
            })
        # Final aggregation pass
        return self._aggregate_chunks(query, chunk_summaries)

    def _build_chunk_prompt(self, query, chunk_idx, total_chunks,
                            time_range, visual_tokens, previous_summaries):
        """Build the prompt for one video chunk."""
        context_parts = []
        # Previous chunk summaries for continuity
        if previous_summaries:
            context_parts.append("Previous context:")
            for s in previous_summaries:
                context_parts.append(
                    f"  [{s['time_range'][0]:.1f}s-{s['time_range'][1]:.1f}s]: "
                    f"{s['response'][:200]}"
                )
        # Current chunk visual tokens
        context_parts.append(
            f"\nCurrent segment ({time_range[0]:.1f}s-{time_range[1]:.1f}s), "
            f"chunk {chunk_idx + 1}/{total_chunks}:"
        )
        return {
            "text": "\n".join(context_parts) + f"\n\nQuery: {query}",
            "visual_tokens": visual_tokens,
        }
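The chunk-boundary logic is worth isolating, because a naive `chunk_start = chunk_end - overlap` update never terminates once the final chunk is reached. A standalone sketch with the explicit end check:

```python
def chunk_boundaries(total_duration, chunk_duration=30.0, overlap=2.0):
    """(start, end) pairs covering the video with overlapping chunks."""
    chunks = []
    start = 0.0
    while start < total_duration:
        end = min(start + chunk_duration, total_duration)
        chunks.append((start, end))
        if end >= total_duration:
            break  # without this, start = end - overlap loops forever
        start = end - overlap
    return chunks

# A 95-second video with 30s chunks and 2s overlap
print(chunk_boundaries(95))
```

Each chunk shares its first two seconds with the tail of the previous chunk, which gives the model continuity across scene boundaries at the cost of a few percent duplicated tokens.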
Audio LLM Serving
Audio processing for LLM serving follows a different pipeline than video. The dominant approach uses Whisper-style encoding: convert audio waveform to mel spectrogram features, then encode with a transformer encoder into tokens that the LLM processes.
Audio Encoding Pipeline
import torch
import torch.nn.functional as F
class AudioEncoder:
    """Whisper-style audio encoder for LLM integration."""

    def __init__(self, model_size="large-v3"):
        self.sample_rate = 16000  # 16 kHz
        self.n_mels = 128
        self.n_fft = 400
        self.hop_length = 160     # 10ms per frame
        self.chunk_length = 30    # seconds
        # Audio encoder (Whisper encoder architecture)
        self.encoder = self._load_encoder(model_size)

    def encode_audio(self, waveform):
        """Convert raw audio to LLM-compatible tokens.

        waveform: [num_samples] at 16 kHz
        returns:  [num_audio_tokens, hidden_dim]
        """
        # Step 1: Mel spectrogram
        mel = self._compute_mel(waveform)
        # mel: [n_mels, num_frames] where num_frames = num_samples / hop_length

        # Step 2: Pad or split into 30-second chunks
        # Whisper expects fixed 30-second input = 3000 frames
        target_frames = self.chunk_length * self.sample_rate // self.hop_length
        chunks = self._split_to_chunks(mel, target_frames)

        # Step 3: Encode each chunk
        all_tokens = []
        for chunk in chunks:
            # chunk: [1, n_mels, 3000]
            tokens = self.encoder(chunk)
            # tokens: [1, 1500, hidden_dim] (2x downsampled from 3000 frames)
            all_tokens.append(tokens.squeeze(0))

        # Step 4: Concatenate all chunks
        audio_tokens = torch.cat(all_tokens, dim=0)
        # Each token represents 20ms of audio
        return audio_tokens

    def _compute_mel(self, waveform):
        """Compute the log-mel spectrogram."""
        # STFT
        stft = torch.stft(
            waveform, n_fft=self.n_fft,
            hop_length=self.hop_length,
            window=torch.hann_window(self.n_fft),
            return_complex=True,
        )
        magnitudes = stft.abs() ** 2
        # Mel filterbank
        mel_filters = self._get_mel_filters()  # [n_mels, n_fft//2 + 1]
        mel = mel_filters @ magnitudes
        # Log scale with dynamic-range clamping (as in Whisper)
        log_mel = torch.clamp(mel, min=1e-10).log10()
        log_mel = torch.maximum(log_mel, log_mel.max() - 8.0)
        log_mel = (log_mel + 4.0) / 4.0  # normalize
        return log_mel
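The token-rate arithmetic follows directly from the constants above: 30 seconds of 16 kHz audio with a 160-sample hop gives 3000 mel frames, and the 2x downsampling in the encoder yields 1500 tokens per chunk, one per 20ms. A small sketch of that accounting:

```python
import math

def audio_token_count(duration_sec, sample_rate=16000, hop_length=160,
                      chunk_sec=30, downsample=2):
    """Audio tokens produced by a Whisper-style encoder.

    Each 30s chunk is padded to 3000 mel frames and encoded to
    3000 / downsample = 1500 tokens, i.e. one token per 20ms.
    Short final chunks are padded to the full chunk length.
    """
    frames_per_chunk = chunk_sec * sample_rate // hop_length  # 3000
    tokens_per_chunk = frames_per_chunk // downsample         # 1500
    num_chunks = math.ceil(duration_sec / chunk_sec)
    return num_chunks * tokens_per_chunk
```

Note the padding effect: a 31-second clip costs two full chunks (3000 tokens), the same as 60 seconds. This makes chunk-boundary placement a real serving cost consideration.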
Streaming Audio Recognition
For real-time applications, audio must be processed as it arrives, not after the full recording:
class StreamingAudioProcessor:
    """Process audio in real-time chunks for streaming LLM interaction."""

    def __init__(self, encoder, llm, chunk_ms=500, lookahead_ms=100):
        self.encoder = encoder
        self.llm = llm
        self.chunk_ms = chunk_ms
        self.lookahead_ms = lookahead_ms
        # Streaming state
        self.audio_buffer = []
        self.encoded_tokens = []
        self.kv_cache = None

    def process_chunk(self, audio_chunk):
        """Process one audio chunk in real time.

        audio_chunk: [chunk_samples] at 16 kHz
        Returns: optional partial transcription/response
        """
        # Buffer the audio
        self.audio_buffer.append(audio_chunk)
        # At 16 kHz there are 16 samples per millisecond
        buffer_duration_ms = (
            sum(c.shape[0] for c in self.audio_buffer) / 16.0
        )
        if buffer_duration_ms >= self.chunk_ms:
            # Concatenate the buffer
            full_audio = torch.cat(self.audio_buffer)
            self.audio_buffer = []
            # Encode to tokens
            new_tokens = self.encoder.encode_audio(full_audio)
            self.encoded_tokens.append(new_tokens)
            # Incremental LLM processing:
            # only process the NEW tokens (the KV cache holds previous context)
            with torch.no_grad():
                output = self.llm.forward(
                    audio_tokens=new_tokens,
                    past_key_values=self.kv_cache,
                    use_cache=True,
                )
            self.kv_cache = output.past_key_values
            # Check whether the LLM should generate a response
            # (e.g., user finished speaking, detected by silence/endpointing)
            if self._should_respond(output.logits):
                return self._generate_response()
        return None

    def _should_respond(self, logits):
        """Detect end of user speech (voice activity detection).

        In practice this uses a separate VAD model or the LLM's own
        learned endpointing behavior; stubbed out here.
        """
        return False
Audio Encoding Latency Budget
| Component | Time (ms) | Notes |
|---|---|---|
| Mel spectrogram (500ms chunk) | 2.1 | FFT on CPU or GPU |
| Whisper encoder (500ms chunk) | 18.5 | GPU, large-v3 |
| Token projection to LLM dim | 0.3 | Linear layer |
| LLM incremental forward | 35.0 | Process new audio tokens |
| Total per chunk | 55.9 | Must be less than 500ms for real-time |
For real-time audio streaming, the total processing time per chunk must be less than the chunk duration. With 500ms chunks and 55.9ms processing time, we have 89% headroom. With 200ms chunks (lower latency), encoding takes approximately 10ms and LLM forward takes approximately 35ms, giving 45ms total against a 200ms budget (78% headroom). The bottleneck shifts to the LLM forward pass at small chunk sizes.
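The headroom math above generalizes to any chunk size and stage breakdown. A small helper (hypothetical `streaming_headroom`, using the budget from the table):

```python
def streaming_headroom(chunk_ms, stage_times_ms):
    """Real-time factor and headroom for one streaming audio chunk."""
    total = sum(stage_times_ms)
    rtf = total / chunk_ms  # real-time factor; must stay below 1.0
    headroom_pct = 100.0 * (1 - rtf)
    return total, rtf, headroom_pct

# 500ms chunks with the latency budget above:
# mel, Whisper encoder, projection, LLM incremental forward
total, rtf, headroom = streaming_headroom(500, [2.1, 18.5, 0.3, 35.0])
```

Anything with `rtf >= 1.0` falls behind the audio stream and the buffer grows without bound, so this check belongs in deployment monitoring, not just capacity planning.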
Multi-Turn Visual Context Caching
In a multi-turn video conversation, the user asks multiple questions about the same video. Re-encoding the video for each turn wastes compute. The solution: cache the visual tokens and their KV cache across turns.
import time

class VisualContextCache:
    """Cache visual tokens and KV cache for multi-turn video conversations."""

    def __init__(self, max_cached_videos=100, eviction_policy="lru"):
        self.cache = {}
        self.max_videos = max_cached_videos
        self.access_order = []

    def cache_visual_context(self, video_id, visual_tokens, kv_cache_prefix):
        """Cache encoded visual tokens and the KV cache from encoding them."""
        if len(self.cache) >= self.max_videos:
            self._evict()
        self.cache[video_id] = {
            "visual_tokens": visual_tokens,
            "kv_cache_prefix": kv_cache_prefix,
            "num_visual_tokens": visual_tokens.shape[0],
            "cached_at": time.time(),
        }
        if video_id in self.access_order:
            self.access_order.remove(video_id)
        self.access_order.append(video_id)

    def get_context(self, video_id):
        """Retrieve cached visual context for a follow-up turn."""
        if video_id in self.cache:
            entry = self.cache[video_id]
            # Move to the end of the LRU order
            self.access_order.remove(video_id)
            self.access_order.append(video_id)
            return entry
        return None

    def _evict(self):
        """Drop the least-recently-used entry."""
        lru_id = self.access_order.pop(0)
        self.cache.pop(lru_id, None)

    def multi_turn_forward(self, model, video_id, new_text_tokens):
        """Process a follow-up question using cached visual context.

        Skips video encoding entirely."""
        cached = self.get_context(video_id)
        if cached is None:
            raise ValueError(f"No cached context for video {video_id}")
        # Use the cached KV prefix: skip encoding + prefill for visual
        # tokens and only process the new text tokens
        with torch.no_grad():
            output = model.forward(
                input_ids=new_text_tokens,
                past_key_values=cached["kv_cache_prefix"],
                use_cache=True,
            )
        return output

    def memory_usage(self):
        """Report cache memory usage in bytes."""
        total_bytes = 0
        for vid, entry in self.cache.items():
            # Visual tokens
            total_bytes += entry["visual_tokens"].nbytes
            # KV cache prefix (all layers)
            for layer_kv in entry["kv_cache_prefix"]:
                for tensor in layer_kv:
                    total_bytes += tensor.nbytes
        return total_bytes
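The list-based `access_order` bookkeeping above works but is O(n) per access. A more idiomatic sketch of the same LRU behavior using `collections.OrderedDict` (a hypothetical simplification, storing opaque entries):

```python
from collections import OrderedDict

class LRUVisualCache:
    """LRU cache for visual contexts; OrderedDict tracks recency."""

    def __init__(self, max_videos=100):
        self.max_videos = max_videos
        self._entries = OrderedDict()

    def put(self, video_id, entry):
        if video_id in self._entries:
            self._entries.move_to_end(video_id)
        self._entries[video_id] = entry
        if len(self._entries) > self.max_videos:
            self._entries.popitem(last=False)  # evict least recently used

    def get(self, video_id):
        if video_id not in self._entries:
            return None
        self._entries.move_to_end(video_id)  # mark as recently used
        return self._entries[video_id]
```

`move_to_end` and `popitem(last=False)` make both the recency update and the eviction O(1), which matters once the cache holds hundreds of videos.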
Multi-Turn Savings with Visual Context Cache
| Turn | Without Cache | With Cache | Savings |
|---|---|---|---|
| Turn 1 (initial query) | Video encode: 2.1s + Prefill: 850ms | Same (no cache yet) | None |
| Turn 2 (follow-up) | Video encode: 2.1s + Prefill: 850ms | Prefill text only: 45ms | 2.9s (98%) |
| Turn 3 (follow-up) | Video encode: 2.1s + Prefill: 850ms | Prefill text only: 52ms | 2.9s (98%) |
| 10 turns total | 29.5s encoding + prefill | 2.95s + 9 * 50ms = 3.4s | 88% |
Cache Invalidation and Memory Pressure
Visual context caches consume substantial GPU memory. For a 1-minute video with 1920 compressed visual tokens and the corresponding KV cache across 80 layers, the total cached state per video is approximately:
- Visual tokens: 1920 tokens x 4096 dims x 2 bytes ~= 15.7 MB (FP16, 4096-dim embedding)
- KV cache: 2 (K and V) x 80 layers x 8 KV heads x 128 head dim x 1920 tokens x 2 bytes ~= 629 MB (GQA-8, FP16)
- Total per video: approximately 644 MB
With a 100-video cache, that is 64 GB dedicated to visual context alone. On an 8x H100 cluster, this is 10% of total HBM. Cache eviction must balance reuse probability against memory pressure from incoming requests.
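The per-video footprint is worth scripting so it can be recomputed when the model geometry changes. A sketch under the stated assumptions (4096-dim embeddings, 80 layers, GQA-8, 128 head dim, FP16):

```python
def visual_cache_bytes(num_tokens=1920, hidden_dim=4096, num_layers=80,
                       kv_heads=8, head_dim=128, dtype_bytes=2):
    """Bytes of cached state per video: token embeddings + KV cache."""
    token_bytes = num_tokens * hidden_dim * dtype_bytes
    # K and V, per layer, per KV head, per head dim, per token
    kv_bytes = 2 * num_layers * kv_heads * head_dim * num_tokens * dtype_bytes
    return token_bytes, kv_bytes, token_bytes + kv_bytes

tok, kv, total = visual_cache_bytes()
print(f"{tok/1e6:.1f} MB tokens + {kv/1e6:.1f} MB KV = {total/1e6:.1f} MB")
```

The KV cache dominates by roughly 40x, which is why KV-cache eviction, not token eviction, drives the memory-pressure policy below.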
import time

def cache_eviction_policy(cache, memory_pressure_pct, min_cache_size=10):
    """Evict visual context caches based on memory pressure."""
    if memory_pressure_pct < 80:
        return  # no eviction needed
    # Evict more aggressively as memory gets tighter
    target_evictions = max(
        1,
        int(len(cache) * (memory_pressure_pct - 80) / 20),
    )
    # Score by age * size (prefer evicting large, stale caches);
    # note this uses time since caching, not time since last access
    scored = []
    for video_id, entry in cache.items():
        age = time.time() - entry["cached_at"]
        size_mb = entry["num_visual_tokens"] * 0.335  # approx. MB per visual token
        score = age * size_mb  # higher = evict first
        scored.append((score, video_id))
    scored.sort(reverse=True)
    for _, video_id in scored[:target_evictions]:
        if len(cache) > min_cache_size:
            del cache[video_id]
Combined Video + Audio Serving
Production multimodal LLMs process both video and audio simultaneously:
class MultimodalServer:
    """Serve video + audio LLM requests."""

    def __init__(self, model, video_encoder, audio_encoder):
        self.model = model
        self.video_encoder = video_encoder
        self.audio_encoder = audio_encoder
        self.visual_cache = VisualContextCache()

    async def process_request(self, request):
        """Handle a multimodal request with video and/or audio."""
        all_tokens = []
        token_type_ids = []

        # Process video (if present)
        if request.video_url:
            video_tokens = await self._encode_video(request)
            all_tokens.append(video_tokens)
            token_type_ids.extend(["video"] * video_tokens.shape[0])

        # Process audio (if present)
        if request.audio_url:
            audio_tokens = await self._encode_audio(request)
            all_tokens.append(audio_tokens)
            token_type_ids.extend(["audio"] * audio_tokens.shape[0])

        # Process text
        text_tokens = self.model.tokenizer.encode(request.text_prompt)
        all_tokens.append(
            self.model.embed_tokens(
                torch.tensor(text_tokens, device="cuda")
            )
        )
        token_type_ids.extend(["text"] * len(text_tokens))

        # Concatenate all modalities
        combined = torch.cat(all_tokens, dim=0).unsqueeze(0)
        # Forward pass
        output = self.model.generate(
            inputs_embeds=combined,
            max_new_tokens=request.max_tokens,
        )
        return output

    async def _encode_video(self, request):
        """Encode video with caching support."""
        # _hash_video (not shown): a content-based hash of the video URL/bytes
        video_id = self._hash_video(request.video_url)
        cached = self.visual_cache.get_context(video_id)
        if cached:
            return cached["visual_tokens"]
        # Full encoding pipeline
        frames = self.video_encoder.sample_frames(request.video_url)
        vit_tokens = self.video_encoder.encode_frames(frames)
        pooled = self.video_encoder.spatial_pool(vit_tokens)
        compressed = self.video_encoder.temporal_compress(pooled)
        # Cache for future turns
        self.visual_cache.cache_visual_context(
            video_id, compressed, kv_cache_prefix=None
        )
        return compressed
Latency Budget: Video + Audio + Text
End-to-End Latency: 1-min Video + Audio Query
| Stage | Time (ms) |
|---|---|
| Video path (frame extract + ViT encode + spatial pool + temporal compress) | ~650 |
| Audio encode (60s, two 30-second Whisper chunks) | ~180 |
| LLM prefill (visual + audio + text tokens) | ~320 |
| Total TTFT (sequential encoding) | ~1,150 |

First-token sampling adds only a few milliseconds on top of the prefill.
Latency Budget: Different Video Lengths
| Video Duration | Visual Tokens | Encode Time | Prefill Time | Total TTFT | KV Cache |
|---|---|---|---|---|---|
| 10 seconds | 320 | 180ms | 85ms | 380ms | 0.10 GB |
| 1 minute | 1920 | 650ms | 320ms | 1.2s | 0.63 GB |
| 5 minutes | 9600 | 2.8s | 1.4s | 4.8s | 3.1 GB |
| 30 minutes | 57600 | 16s | 8.2s | 28s | 18.9 GB |
| 2 hours | 230400 | 62s | Chunked | Chunked | 75 GB |
Beyond 5 minutes of video, the combined visual+text token count approaches or exceeds the 32K-128K context windows of current models. Chunked processing becomes mandatory. For a 2-hour video, even heavily compressed tokens (1920 tokens/minute) produce 230K tokens, requiring either extreme compression (losing temporal detail), chunked processing (losing global context), or 256K+ context length models.
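The crossover points follow from a one-line calculation: how many minutes of compressed video fit in a given context window after reserving room for the prompt and response. A sketch (the 2048-token text reservation is an assumption for illustration):

```python
def minutes_until_full(context_tokens, tokens_per_minute=1920,
                       reserved_text_tokens=2048):
    """Minutes of compressed video that fit in a context window,
    after reserving room for the text prompt and response."""
    return (context_tokens - reserved_text_tokens) / tokens_per_minute

print(minutes_until_full(32_768))   # 32K context: ~16 minutes
print(minutes_until_full(131_072))  # 128K context: ~67 minutes
```

At 1,920 tokens per minute, a 32K window fills after about 16 minutes of video and a 128K window after about 67 minutes, which is why the 2-hour case forces chunking or more aggressive compression.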
Optimization: Parallel Encoding
Video and audio encoding can run in parallel since they are independent:
import asyncio
class ParallelMultimodalEncoder:
    """Encode video and audio in parallel to minimize TTFT."""

    def __init__(self, video_encoder, audio_encoder):
        self.video_enc = video_encoder
        self.audio_enc = audio_encoder

    async def encode_parallel(self, video_path, audio_path):
        """Encode video and audio simultaneously on different GPU streams."""
        video_stream = torch.cuda.Stream()
        audio_stream = torch.cuda.Stream()

        def encode_video():
            with torch.cuda.stream(video_stream):
                frames = self.video_enc.sample_frames(video_path)
                tokens = self.video_enc.encode_frames(frames)
                pooled = self.video_enc.spatial_pool(tokens)
                compressed = self.video_enc.temporal_compress(pooled)
            video_stream.synchronize()
            return compressed

        def encode_audio():
            import torchaudio
            with torch.cuda.stream(audio_stream):
                waveform, sr = torchaudio.load(audio_path)
                if sr != 16000:
                    waveform = torchaudio.functional.resample(waveform, sr, 16000)
                tokens = self.audio_enc.encode_audio(waveform.squeeze())
            audio_stream.synchronize()
            return tokens

        # Run both encoders in worker threads; plain async wrappers around
        # blocking GPU/CPU code would execute sequentially on the event loop
        video_tokens, audio_tokens = await asyncio.gather(
            asyncio.to_thread(encode_video),
            asyncio.to_thread(encode_audio),
        )
        return video_tokens, audio_tokens
With parallel encoding, the TTFT for a 1-minute video+audio request drops from roughly 1.15s (sequential) to about 0.97s (parallel), saving 180ms (16%): the audio encode is fully hidden behind the longer video encode.
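The saving is just the min of the two encode times, since the shorter path hides entirely behind the longer one. A sketch of the arithmetic, taking ~650ms video encode and ~320ms prefill from the latency tables above and assuming ~180ms for the audio path:

```python
def ttft(video_ms, audio_ms, prefill_ms, parallel=False):
    """Time to first token with sequential vs parallel encoding."""
    encode = max(video_ms, audio_ms) if parallel else video_ms + audio_ms
    return encode + prefill_ms

seq = ttft(650, 180, 320)                 # sequential encoding
par = ttft(650, 180, 320, parallel=True)  # audio hidden behind video
saving = seq - par
```

Note the corollary: once the audio path is the shorter one, further optimizing it buys nothing; only the video path (or the prefill) reduces TTFT.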
Adaptive Frame Sampling Based on Scene Complexity
Static scenes (a person sitting at a desk talking) need far fewer frames than dynamic scenes (a basketball game, a car chase). Adaptive sampling adjusts the frame rate based on detected scene changes:
import torch
import numpy as np
class AdaptiveFrameSampler:
    """Sample more frames from high-motion segments, fewer from static ones."""

    def __init__(self, min_fps=0.5, max_fps=4.0, total_budget=128,
                 motion_threshold=0.15):
        self.min_fps = min_fps
        self.max_fps = max_fps
        self.total_budget = total_budget
        self.motion_threshold = motion_threshold

    def compute_motion_scores(self, frames):
        """Compute a per-frame motion score using pixel difference."""
        scores = [0.0]  # first frame has no reference
        for i in range(1, len(frames)):
            # Normalized L1 distance between consecutive frames
            diff = np.abs(
                frames[i].astype(float) - frames[i - 1].astype(float)
            ).mean() / 255.0
            scores.append(diff)
        return np.array(scores)

    def sample(self, frames, source_fps=30.0):
        """Adaptively sample frames based on motion."""
        motion = self.compute_motion_scores(frames)
        # Divide the video into 1-second segments
        frames_per_sec = int(source_fps)
        num_segments = len(frames) // frames_per_sec
        # Compute per-segment motion scores
        segment_motion = []
        for s in range(num_segments):
            start = s * frames_per_sec
            end = start + frames_per_sec
            segment_motion.append(motion[start:end].mean())
        segment_motion = np.array(segment_motion)
        # Allocate frames proportionally to motion:
        # high-motion segments get more frames (up to max_fps),
        # low-motion segments get fewer (down to min_fps)
        if segment_motion.sum() == 0:
            allocation = np.ones(num_segments) / num_segments  # uniform weights
        else:
            allocation = segment_motion / segment_motion.sum()
        frames_per_segment = np.clip(
            (allocation * self.total_budget).astype(int),
            max(1, int(self.min_fps)),  # at least 1 frame per 1s segment
            int(self.max_fps),
        )
        # Adjust to hit the budget exactly
        while frames_per_segment.sum() > self.total_budget:
            idx = frames_per_segment.argmax()
            frames_per_segment[idx] -= 1
        while frames_per_segment.sum() < self.total_budget:
            idx = frames_per_segment.argmin()
            frames_per_segment[idx] += 1
        # Select frame indices within each segment
        selected = []
        for s in range(num_segments):
            start = s * frames_per_sec
            end = start + frames_per_sec
            n = frames_per_segment[s]
            indices = np.linspace(start, end - 1, n, dtype=int)
            selected.extend(indices.tolist())
        return selected
Adaptive vs Uniform Sampling: Quality on VideoQA Benchmarks
| Method | Frames Used | ActivityNet Acc | NextQA Acc | Tokens |
|---|---|---|---|---|
| Uniform 1fps | 60 | 52.3% | 58.1% | 3840 |
| Uniform 2fps | 120 | 55.8% | 61.4% | 7680 |
| Adaptive (budget=60) | 60 | 56.2% | 62.0% | 3840 |
| Adaptive (budget=120) | 120 | 58.1% | 63.8% | 7680 |
Adaptive sampling with a 60-frame budget matches or exceeds uniform 2fps sampling with double the frames, because it allocates more frames to high-information segments (action sequences, scene transitions) and fewer to static segments.
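The allocation step can be studied in isolation from the frame decoding. A stdlib-only sketch (hypothetical `allocate_frames` helper) of the same proportional-with-bounds scheme, including the nudging loop that forces the budget to be hit exactly:

```python
def allocate_frames(segment_motion, budget, min_per_seg=1, max_per_seg=4):
    """Split a frame budget across 1-second segments by motion score."""
    n = len(segment_motion)
    total = sum(segment_motion)
    if total == 0:
        weights = [1.0 / n] * n  # static video: uniform allocation
    else:
        weights = [m / total for m in segment_motion]
    alloc = [min(max_per_seg, max(min_per_seg, round(w * budget)))
             for w in weights]
    # Nudge to hit the budget exactly: trim the largest allocations,
    # then top up the smallest (bounds may be relaxed at the margin)
    while sum(alloc) > budget:
        i = max(range(n), key=lambda j: alloc[j])
        alloc[i] -= 1
    while sum(alloc) < budget:
        i = min(range(n), key=lambda j: alloc[j])
        alloc[i] += 1
    return alloc
```

For a 4-segment clip where one segment is static and two carry most of the motion, the static segment keeps only its 1-frame floor while the busy segments absorb the rest of the budget.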
KV Cache Management for Multi-Modal Requests
Multi-modal requests create heterogeneous KV cache entries: visual tokens have different reuse patterns than text tokens. In multi-turn conversations, visual KV cache should persist across turns (the video does not change between questions), while text KV cache grows with each new turn.
class MultiModalKVCacheManager:
    """Manage KV cache with awareness of modality-specific patterns."""

    def __init__(self, block_manager, max_visual_cache_gb=20):
        self.block_manager = block_manager
        self.max_visual_cache = max_visual_cache_gb
        self.visual_cache_registry = {}  # video_hash -> block_ids

    def allocate_request(self, request):
        """Allocate KV cache blocks with modality awareness."""
        if request.has_video:
            video_hash = request.video_hash
            if video_hash in self.visual_cache_registry:
                # Reuse existing visual KV cache blocks;
                # only allocate new blocks for the text portion
                visual_blocks = self.visual_cache_registry[video_hash]
                text_blocks = self._allocate_text_blocks(request)
                return visual_blocks, text_blocks
            # Allocate new visual + text blocks
            visual_blocks = self._allocate_visual_blocks(request)
            self.visual_cache_registry[video_hash] = visual_blocks
            text_blocks = self._allocate_text_blocks(request)
            return visual_blocks, text_blocks
        # Text-only request: no visual blocks to allocate
        return None, self._allocate_text_blocks(request)

    def evict_visual_cache(self):
        """Evict least-recently-used visual KV cache when memory is tight.

        Visual KV cache eviction uses LRU across video hashes and is
        separate from text KV cache eviction. Block lists are assumed
        to carry a last_access timestamp."""
        if self._visual_cache_usage_gb() > self.max_visual_cache:
            oldest_hash = min(
                self.visual_cache_registry,
                key=lambda h: self.visual_cache_registry[h].last_access,
            )
            blocks = self.visual_cache_registry.pop(oldest_hash)
            self.block_manager.free_blocks(blocks)
Serving Cost Analysis
The encoding overhead for multi-modal models significantly impacts the cost per request compared to text-only models:
Cost per Request: Text-Only vs Video+Audio (Llama 70B, H100)
| Request type | Encoder overhead | Prefill tokens (approx.) |
|---|---|---|
| Text only (512 in, 256 out) | None | 512 |
| Image (1 frame + text) | One ViT forward pass | ~770 |
| Short video (10s) | ~180ms | ~830 |
| Long video (1min) | ~650ms | ~2,400 |
| Video + Audio (1min) | ~830ms sequential (~650ms parallel) | ~5,400 |

Prefill token counts assume the compressed visual budgets from the tables above (320 tokens for 10s, 1,920 for 1min) plus 3,000 audio tokens per minute and a 512-token text prompt.
For production systems, the video LLM serving stack is defined by token budgets. Every design decision (frame sampling rate, spatial pooling ratio, temporal compression stride, context window allocation between modalities) is a tradeoff between information fidelity and computational cost. The systems that win in production are those that compress aggressively where information is redundant (static backgrounds, repeated frames) and preserve detail where it matters (scene changes, actions, speech segments). Adaptive sampling, visual KV cache reuse, and parallel multi-modal encoding are the three primary levers for making video+audio LLM serving economically viable.