Text-only LLM serving routes requests based on two factors: KV cache locality and queue depth. Multimodal requests break this model. A video request requires encoding 30 frames through a ViT, producing 17,000+ visual tokens before the LLM can begin. An audio request needs Whisper-style preprocessing that is CPU-bound. These preprocessing stages have different hardware affinities and latencies than the LLM decode phase.
Dynamo extends its routing framework to handle multimodal workloads by introducing modality-aware routing, dedicated encoder pools, and scheduling algorithms that overlap encoding with LLM execution.
The Multimodal Routing Problem
Why Text Routing Fails for Multimodal
Standard Dynamo routing optimizes for two factors: KV cache overlap with each worker and per-worker queue depth.
For multimodal requests, the TTFT equation becomes:

TTFT = T_encode + T_transfer + T_prefill(uncached tokens) + T_queue

The encoding term dominates for video and audio.
Encoding Costs by Modality
| Modality | Input | Encoder | Encode Time | Visual Tokens | Token Equiv. |
|---|---|---|---|---|---|
| Text only | 1K tokens | None | 0ms | 0 | 1K |
| Single image | 336x336 | ViT-L/14 | 5ms | 576 | 1.6K |
| 4 images | 336x336 each | ViT-L/14 | 12ms (batched) | 2,304 | 3.3K |
| 10s video @ 3fps | 30 frames | ViT-L/14 | 48ms (batched) | 17,280 | 18.3K |
| 30s audio | WAV 16kHz | Whisper-small | 85ms | 1,500 | 2.5K |
| 10s video + audio | 30 frames + audio | ViT + Whisper | 120ms | 18,780 | 19.8K |
Encoder Pool Architecture
Dedicated Encoder GPUs
Dynamo allocates a pool of GPUs specifically for media encoding:
class EncoderPoolManager:
    """Manage a pool of GPUs dedicated to media encoding.

    Routes each encoding request to the currently least-loaded encoder
    GPU, tracking load as the estimated encode time (ms) of requests in
    flight on that GPU.
    """

    def __init__(self, encoder_gpu_ids, encoder_configs):
        # One EncoderWorker per dedicated GPU; each worker loads both a
        # vision (ViT) and an audio (Whisper) encoder so any modality
        # can be served by any pool member.
        self.encoders = {}
        for gpu_id in encoder_gpu_ids:
            self.encoders[gpu_id] = EncoderWorker(
                gpu_id=gpu_id,
                models={
                    "vision": load_vit(encoder_configs["vision"], device=f"cuda:{gpu_id}"),
                    "audio": load_whisper(encoder_configs["audio"], device=f"cuda:{gpu_id}"),
                },
                max_batch_encode=16,
            )
        # Estimated in-flight encode cost per GPU. Guarded by a lock:
        # completion callbacks may decrement it from worker threads.
        self.gpu_loads = {gpu_id: 0 for gpu_id in encoder_gpu_ids}
        self._load_lock = threading.Lock()

    def submit_encoding(self, request):
        """Dispatch *request* to the least-loaded encoder GPU.

        Returns an EncodingHandle wrapping the async future. Fix: the
        GPU's tracked load is now released when the future completes —
        previously loads only ever grew, so the "least loaded" choice
        degraded into a round-robin over stale counters.
        """
        encode_cost = self._estimate_cost(request)
        with self._load_lock:
            encoder_gpu = min(self.gpu_loads, key=self.gpu_loads.get)
            self.gpu_loads[encoder_gpu] += encode_cost
        future = self.encoders[encoder_gpu].encode_async(request)
        # Give the load back once encoding finishes (or fails).
        future.add_done_callback(
            lambda _f, gpu=encoder_gpu, cost=encode_cost: self._release_load(gpu, cost)
        )
        return EncodingHandle(future=future, encoder_gpu=encoder_gpu, estimated_time=encode_cost)

    def _release_load(self, gpu_id, cost):
        """Return *cost* worth of estimated load to *gpu_id*."""
        with self._load_lock:
            self.gpu_loads[gpu_id] = max(0, self.gpu_loads[gpu_id] - cost)

    def _estimate_cost(self, request):
        """Estimate total encode time in ms for all media in *request*.

        Per-item constants mirror the measured-cost table: ~5 ms per
        image, ~1.6 ms per video frame (batched ViT), ~2.8 ms per second
        of audio (Whisper-small).
        """
        cost = 0
        for media_item in request.media:
            if media_item.type == "image":
                cost += 5
            elif media_item.type == "video":
                cost += media_item.num_frames * 1.6
            elif media_item.type == "audio":
                cost += media_item.duration_seconds * 2.8
        return cost
Encoder Worker
Each encoder worker batches and processes encoding requests:
class EncoderWorker:
    """Encoder worker running on a dedicated GPU.

    Requests are enqueued via encode_async() and drained in batches by
    run_batch_loop(), which groups media by modality so the ViT and
    Whisper models each see contiguous batches.
    """

    def __init__(self, gpu_id, models, max_batch_encode):
        self.gpu_id = gpu_id
        self.models = models          # {"vision": ViT, "audio": Whisper}
        self.max_batch = max_batch_encode
        self.pending_queue = []       # [(request, Future), ...]
        self.lock = threading.Lock()  # guards pending_queue only

    def encode_async(self, request):
        """Queue *request* for encoding.

        Returns a Future that resolves with the request itself once all
        of its media items carry `encoded_features`.
        """
        future = concurrent.futures.Future()
        with self.lock:
            self.pending_queue.append((request, future))
        return future

    def run_batch_loop(self):
        """Drain the pending queue forever; run on a dedicated thread.

        Fix: the original slept while still holding the queue lock,
        blocking producers during every idle poll.
        """
        while True:
            with self.lock:
                batch = self.pending_queue[:self.max_batch]
                self.pending_queue = self.pending_queue[self.max_batch:]
            if not batch:
                time.sleep(0.001)  # idle poll, lock released
                continue
            self._process_batch(batch)

    def _process_batch(self, batch):
        """Encode one batch and resolve every request's future.

        Fix: the futures returned by encode_async() were never completed
        in the original, so any caller waiting on one hung forever; an
        encode exception now fails the waiters instead of stranding them.
        """
        image_batch, video_batch, audio_batch = [], [], []
        for request, future in batch:
            for media in request.media:
                if media.type == "image":
                    image_batch.append((media, request, future))
                elif media.type == "video":
                    video_batch.append((media, request, future))
                elif media.type == "audio":
                    audio_batch.append((media, request, future))
        try:
            if image_batch:
                self._encode_images(image_batch)
            if video_batch:
                self._encode_videos(video_batch)
            if audio_batch:
                self._encode_audio(audio_batch)
        except Exception as exc:
            for _request, future in batch:
                if not future.done():
                    future.set_exception(exc)
            return
        for request, future in batch:
            if not future.done():
                future.set_result(request)

    def _encode_images(self, batch):
        """Encode all images in one stacked ViT forward pass."""
        pixel_values = torch.stack([
            preprocess_image(item.data) for item, _, _ in batch
        ]).to(f"cuda:{self.gpu_id}", dtype=torch.float16)
        with torch.no_grad():
            features = self.models["vision"](pixel_values)
        for i, (media, request, future) in enumerate(batch):
            media.encoded_features = features[i].detach()

    def _encode_videos(self, batch):
        """Encode each video's frames through the ViT in max_batch chunks."""
        for media, request, future in batch:
            frames = extract_frames(media.data, fps=media.target_fps)
            pixel_values = torch.stack([
                preprocess_image(f) for f in frames
            ]).to(f"cuda:{self.gpu_id}", dtype=torch.float16)
            with torch.no_grad():
                all_features = []
                # Chunk so a long video cannot exceed the encoder's
                # batch limit in a single forward pass.
                for i in range(0, len(pixel_values), self.max_batch):
                    chunk = pixel_values[i:i + self.max_batch]
                    features = self.models["vision"](chunk)
                    all_features.append(features)
                features = torch.cat(all_features, dim=0)
            media.encoded_features = features.detach()

    def _encode_audio(self, batch):
        """Encode audio clips through the Whisper-style audio model.

        NOTE(review): this method was called by the original batch loop
        but never defined; implemented here by analogy with the image
        path. Confirm preprocess_audio's output layout matches what the
        audio encoder expects.
        """
        for media, request, future in batch:
            mel = preprocess_audio(media.data).to(
                f"cuda:{self.gpu_id}", dtype=torch.float16
            )
            with torch.no_grad():
                features = self.models["audio"](mel)
            media.encoded_features = features.detach()
Modality-Aware Routing
The Routing Decision
The router selects both an encoder GPU and an LLM GPU:
class MultimodalRouter:
    """Route requests jointly over encoder pool and LLM workers."""

    def __init__(self, encoder_pool, llm_workers, kv_cache_index):
        self.encoder_pool = encoder_pool
        self.llm_workers = llm_workers
        self.kv_cache_index = kv_cache_index

    def route(self, request):
        """Pick the LLM worker minimizing estimated TTFT, then an encoder.

        Text-only requests fall back to standard KV-cache/queue routing;
        media requests add encoding and feature-transfer time to the
        TTFT estimate.
        """
        if not request.media:
            return self._route_text_only(request)

        encode_cost = self.encoder_pool._estimate_cost(request)
        prefix_hash = hash_prompt_prefix(request.text_tokens)
        overlaps = self.kv_cache_index.get_overlaps(prefix_hash)

        chosen = None
        chosen_ttft = float("inf")
        for candidate in self.llm_workers:
            cached = overlaps.get(candidate.id, 0)
            to_prefill = request.total_tokens - cached
            # TTFT model: encode + feature transfer + prefill of the
            # uncached tokens + work already queued on the worker.
            ttft = encode_cost
            ttft += self._estimate_transfer(request, candidate)
            ttft += to_prefill / candidate.prefill_throughput
            ttft += candidate.current_queue_depth * candidate.avg_step_time
            if ttft < chosen_ttft:
                chosen, chosen_ttft = candidate, ttft

        handle = self._select_encoder(request, chosen)
        return RoutingDecision(
            encoder_gpu=handle,
            llm_worker=chosen,
            estimated_ttft=chosen_ttft,
        )
Encoding-Prefill Overlap
Start prefill of the text portion while the encoder processes the media:
class OverlappedPipeline:
    """Overlap media encoding with prefill of the leading text tokens."""

    def execute_request(self, request, encoder_gpu, llm_worker):
        """Run one multimodal request with encode/prefill overlap.

        The text before the first media placeholder token can be
        prefilled while the encoder pool works on the media; the rest of
        the prompt waits for the encoded features.
        """
        boundary = find_first_media_token(request.text_tokens)
        encode_future = self.encoder_pool.submit_encoding(request)
        if boundary > 0:
            # This prefill runs concurrently with the encoding above.
            llm_worker.prefill_partial(request.text_tokens[:boundary])
        visual_tokens = encode_future.result()  # block until encoding done
        tail = request.text_tokens[boundary:]
        llm_worker.prefill_with_visual(visual_tokens, tail)
        return llm_worker.start_decode(request)
TTFT: Sequential vs. Overlapped Pipeline (30-frame video, Llama 70B)
(ms TTFT)

Video-Specific Handling
Frame Extraction Strategies
class VideoFrameExtractor:
    """Extract a bounded set of frames from an encoded video.

    Strategies:
      - "uniform":  evenly spaced indices, capped at max_frames
      - "keyframe": scene-change detection on downsampled grayscale diffs
      - "fps":      fixed sampling rate, then capped at max_frames
    """

    def __init__(self, strategy="uniform", max_frames=32):
        self.strategy = strategy
        self.max_frames = max_frames

    def extract(self, video_bytes, target_fps=None):
        """Decode *video_bytes* and return (frames, frame_indices).

        Frames are RGB PIL images. Fix: cv2.VideoCapture cannot read
        from a BytesIO object (it takes a path or device index), so the
        bytes are spilled to a temp file first.
        """
        import cv2
        import os
        import tempfile

        tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        try:
            tmp.write(video_bytes)
            tmp.close()
            video = cv2.VideoCapture(tmp.name)
            try:
                native_fps = video.get(cv2.CAP_PROP_FPS)
                total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
                if self.strategy == "uniform":
                    frame_indices = self._uniform_sample(total_frames)
                elif self.strategy == "keyframe":
                    frame_indices = self._keyframe_sample(video, total_frames)
                elif self.strategy == "fps":
                    sample_fps = target_fps or 2
                    step = max(1, int(native_fps / sample_fps))
                    frame_indices = list(range(0, total_frames, step))
                else:
                    # Fix: an unknown strategy previously crashed later
                    # with UnboundLocalError on frame_indices.
                    raise ValueError(f"unknown extraction strategy: {self.strategy!r}")
                # Cap any strategy's output at max_frames by striding.
                if len(frame_indices) > self.max_frames:
                    stride = len(frame_indices) / self.max_frames
                    frame_indices = [
                        frame_indices[int(i * stride)] for i in range(self.max_frames)
                    ]
                frames = []
                for idx in frame_indices:
                    video.set(cv2.CAP_PROP_POS_FRAMES, idx)
                    ret, frame = video.read()
                    if ret:
                        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                        frames.append(Image.fromarray(frame))
            finally:
                video.release()
        finally:
            os.unlink(tmp.name)
        return frames, frame_indices

    def _uniform_sample(self, total_frames):
        """Evenly spaced frame indices, at most max_frames of them."""
        if total_frames <= self.max_frames:
            return list(range(total_frames))
        step = total_frames / self.max_frames
        return [int(i * step) for i in range(self.max_frames)]

    def _keyframe_sample(self, video, total_frames):
        """Indices where the downsampled grayscale frame changes sharply.

        Fix: cv2 was only imported inside extract()'s local scope, so
        this method previously raised NameError when called; np is
        imported locally as well in case there is no module-level import.
        """
        import cv2
        import numpy as np

        keyframes = [0]
        prev_small = None
        # Probe at most ~200 evenly spaced frames for scene changes.
        for idx in range(0, total_frames, max(1, total_frames // 200)):
            video.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = video.read()
            if not ret:
                continue
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            small = cv2.resize(gray, (64, 64))
            if prev_small is not None:
                diff = np.mean(np.abs(small.astype(float) - prev_small.astype(float)))
                if diff > 30:  # empirical scene-change threshold
                    keyframes.append(idx)
            prev_small = small
        return keyframes
Video Token Budget
class VideoTokenBudget:
    """Compute how many visual tokens a request may spend on video frames."""

    def __init__(self, max_visual_tokens=8192, model_max_len=32768):
        self.max_visual_tokens = max_visual_tokens
        self.model_max_len = model_max_len

    def compute_budget(self, request):
        """Visual-token budget after reserving text and output tokens.

        Fix: clamped at zero — a long prompt plus a large
        max_output_tokens could previously yield a negative budget.
        """
        text_tokens = len(request.text_tokens)
        output_budget = request.max_output_tokens
        available = self.model_max_len - text_tokens - output_budget
        return max(0, min(available, self.max_visual_tokens))

    def select_frame_count(self, budget, patches_per_frame=576):
        """Number of frames fitting in *budget*; always at least one."""
        return max(1, budget // patches_per_frame)
A 30-second video at 2 fps with 576 patches per frame produces 34,560 visual tokens (60 frames x 576 patches). At a context window of 32K, this exceeds the model's capacity. Frame selection, token compression, or temporal pooling is mandatory for long videos.
Audio Preprocessing
Whisper-Style Feature Extraction
class AudioPreprocessor:
    """Whisper-style log-mel feature extraction for audio inputs."""

    def __init__(self, config):
        self.sample_rate = config.audio_sample_rate
        self.n_mels = config.n_mels
        self.chunk_length = config.chunk_length  # seconds per padded chunk
        self.hop_length = config.hop_length
        # Built lazily on first use so torchaudio stays a call-time
        # dependency (matching the original's lazy import).
        self._mel_transform = None

    def preprocess(self, audio_bytes):
        """Decode *audio_bytes* into a normalized log-mel spectrogram.

        Returns a (1, n_mels, frames) tensor padded or truncated to
        exactly chunk_length seconds of frames.
        """
        import torchaudio

        waveform, sr = torchaudio.load(io.BytesIO(audio_bytes))
        # Resample to the model's expected rate before any feature work.
        if sr != self.sample_rate:
            resampler = torchaudio.transforms.Resample(sr, self.sample_rate)
            waveform = resampler(waveform)
        # Downmix stereo/multi-channel to mono.
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)
        if self._mel_transform is None:
            # Perf fix: the transform is configuration-invariant, so
            # build it once instead of on every call.
            self._mel_transform = torchaudio.transforms.MelSpectrogram(
                sample_rate=self.sample_rate,
                n_fft=400,
                hop_length=self.hop_length,
                n_mels=self.n_mels,
            )
        mel = self._mel_transform(waveform)
        # Whisper-style normalization: floored log10, then shift/scale
        # into roughly [-1, 1].
        mel = torch.clamp(mel, min=1e-10).log10()
        mel = (mel + 4.0) / 4.0
        # Pad or truncate the time axis to exactly chunk_length seconds.
        max_frames = self.chunk_length * self.sample_rate // self.hop_length
        if mel.shape[-1] < max_frames:
            mel = torch.nn.functional.pad(mel, (0, max_frames - mel.shape[-1]))
        else:
            mel = mel[:, :, :max_frames]
        return mel
Scheduling Mixed-Modality Batches
class MultimodalBatchScheduler:
    """Form mixed-modality batches under token and batch-size limits."""

    def __init__(self, max_batch_tokens, max_batch_size):
        self.max_tokens = max_batch_tokens
        self.max_batch = max_batch_size

    def form_batch(self, waiting_queue):
        """Select a batch from *waiting_queue*.

        Requests whose media encoding has finished are preferred (they
        can run immediately); ties break by arrival time. Media requests
        still waiting on encoders are skipped, as is anything that would
        push the batch past the token budget.
        """
        def priority(req):
            return (0 if req.encoding_complete else 1, req.arrival_time)

        batch = []
        used_tokens = 0
        for candidate in sorted(waiting_queue, key=priority):
            if len(batch) >= self.max_batch:
                break
            if candidate.has_media and not candidate.encoding_complete:
                continue  # features not ready; would stall the batch
            needed = self._effective_tokens(candidate)
            if used_tokens + needed > self.max_tokens:
                continue  # over budget; a smaller request may still fit
            batch.append(candidate)
            used_tokens += needed
        return batch

    def _effective_tokens(self, request):
        """Total token footprint: text plus any visual/audio tokens."""
        total = len(request.text_tokens)
        if request.has_media:
            total += request.num_visual_tokens
        if request.has_audio:
            total += request.num_audio_tokens
        return total
End-to-End Performance
Multimodal TTFT by Request Type (Llama 3.2 Vision 70B, 8xH100)
(ms TTFT)

Complete Multimodal Routing System
class DynamoMultimodalRouter:
    """Complete multimodal routing system for Dynamo.

    Owns the encoder pool and the LLM worker fleet, classifies incoming
    requests by modality, and jointly picks an (encoder GPU, LLM worker)
    pair to minimize estimated TTFT, overlapping text prefill with
    encoding when a text prefix precedes the first media token.
    """

    def __init__(self, config):
        self.encoder_pool = EncoderPoolManager(
            encoder_gpu_ids=config.encoder_gpus,
            encoder_configs=config.encoder_configs,
        )
        self.llm_workers = [
            LLMWorker(gpu_ids=gpus, model=config.model_name)
            for gpus in config.llm_gpu_groups
        ]
        self.kv_index = KVCacheIndex()

    def handle_request(self, request):
        """Route and execute one request end-to-end."""
        modality = self._classify_modality(request)
        if modality == "text":
            worker = self._route_text(request)
            return worker.execute(request)
        routing = self._joint_route(request, modality)
        encode_handle = self.encoder_pool.submit_encoding(request)
        if routing.can_overlap:
            # Prefill the text before the first media token while the
            # encoder pool works in parallel.
            routing.llm_worker.prefill_text_prefix(
                request.text_tokens[:routing.overlap_boundary]
            )
        # NOTE(review): assumes EncodingHandle exposes result(); confirm
        # it forwards to the wrapped future.
        visual_tokens = encode_handle.result()
        routing.llm_worker.complete_prefill(request, visual_tokens)
        return routing.llm_worker.decode_to_completion(request)

    def _classify_modality(self, request):
        """Return the dominant modality: video > audio > image > text."""
        types_present = {m.type for m in request.media}
        if "video" in types_present:
            return "video"
        if "audio" in types_present:
            return "audio"
        if "image" in types_present:
            return "image"
        return "text"

    def _joint_route(self, request, modality):
        """Search all (encoder GPU, LLM worker) pairs for minimal TTFT.

        Perf fix: the per-request encode cost and the overlap boundary
        are invariant over the search, so they are computed once instead
        of inside the nested loops (the boundary scan previously re-ran
        the whole prompt for every improving candidate).
        """
        base_encode_cost = self.encoder_pool._estimate_cost(request)
        overlap_boundary = self._find_overlap_boundary(request)
        best_combo = None
        best_ttft = float("inf")
        for encoder_gpu in self.encoder_pool.encoders:
            # Encode time = this request's cost plus estimated work
            # already queued on that encoder GPU.
            encode_time = base_encode_cost + self.encoder_pool.gpu_loads[encoder_gpu]
            for worker in self.llm_workers:
                transfer = self._transfer_time(encoder_gpu, worker)
                prefill = request.total_tokens / worker.prefill_throughput
                queue = worker.queue_time_estimate()
                ttft = encode_time + transfer + prefill + queue
                if ttft < best_ttft:
                    best_ttft = ttft
                    best_combo = RoutingDecision(
                        encoder_gpu=encoder_gpu,
                        llm_worker=worker,
                        estimated_ttft=ttft,
                        can_overlap=overlap_boundary > 0,
                        overlap_boundary=overlap_boundary,
                    )
        return best_combo

    def _find_overlap_boundary(self, request):
        """Index of the first media placeholder token, or the full prompt
        length when there is none (everything can be prefilled early)."""
        for i, token in enumerate(request.text_tokens):
            if token in (IMAGE_TOKEN, VIDEO_TOKEN, AUDIO_TOKEN):
                return i
        return len(request.text_tokens)

    def _transfer_time(self, encoder_gpu, worker):
        """Estimate feature-transfer seconds between encoder and worker.

        NOTE(review): assumes 8 GPUs per node and a fixed worst-case
        payload (~18K tokens x 4096 dims x 2 bytes fp16); 600 GB/s
        intra-node vs 50 GB/s inter-node — confirm against the actual
        fabric.
        """
        same_node = encoder_gpu // 8 == worker.primary_gpu // 8
        bandwidth = 600e9 if same_node else 50e9
        bytes_to_transfer = 18000 * 4096 * 2
        return bytes_to_transfer / bandwidth