快捷方式

並行影片解碼:多程序與多執行緒

在本教程中,我們將探討並行解碼大量影片幀的不同方法。我們將比較三種並行策略:

  1. 基於 FFmpeg 的並行:使用 FFmpeg 的內部執行緒功能

  2. Joblib 多程序:將工作分配給多個程序

  3. Joblib 多執行緒:在單個程序中使用多個執行緒

我們將使用 joblib 進行並行處理,因為它提供了非常方便的 API 來將工作分配到多個程序或執行緒。但這只是 Python 中並行處理工作的一種方式。你絕對可以使用其他執行緒或程序池管理器。

讓我們先定義一些用於基準測試和資料處理的實用函式。我們還將下載一個影片並將其重複多次以建立一個更長的版本。這模擬了處理需要高效處理的長影片。你可以忽略這部分,直接跳轉到 幀取樣策略

from typing import List
import torch
import requests
import tempfile
from pathlib import Path
import subprocess
from time import perf_counter_ns
from datetime import timedelta

from joblib import Parallel, delayed, cpu_count
from torchcodec.decoders import VideoDecoder


def bench(f, *args, num_exp=3, warmup=1, **kwargs):
    """Benchmark a function by running it multiple times and measuring execution time."""
    for _ in range(warmup):
        f(*args, **kwargs)

    times = []
    for _ in range(num_exp):
        start = perf_counter_ns()
        result = f(*args, **kwargs)
        end = perf_counter_ns()
        times.append(end - start)

    return torch.tensor(times).float(), result


def report_stats(times, unit="s"):
    """Report median and standard deviation of benchmark times."""
    mul = {
        "ns": 1,
        "µs": 1e-3,
        "ms": 1e-6,
        "s": 1e-9,
    }[unit]
    times = times * mul
    std = times.std().item()
    med = times.median().item()
    print(f"median = {med:.2f}{unit} ± {std:.2f}")
    return med


def split_indices(indices: List[int], num_chunks: int) -> List[List[int]]:
    """Split a list of indices into approximately equal chunks."""
    chunk_size = len(indices) // num_chunks
    chunks = []

    for i in range(num_chunks - 1):
        chunks.append(indices[i * chunk_size:(i + 1) * chunk_size])

    # Last chunk may be slightly larger
    chunks.append(indices[(num_chunks - 1) * chunk_size:])
    return chunks


def generate_long_video(temp_dir: str):
    # Video source: https://www.pexels.com/video/dog-eating-854132/
    # License: CC0. Author: Coverr.
    url = "https://videos.pexels.com/video-files/854132/854132-sd_640_360_25fps.mp4"
    response = requests.get(url, headers={"User-Agent": ""})
    if response.status_code != 200:
        raise RuntimeError(f"Failed to download video. {response.status_code = }.")

    short_video_path = Path(temp_dir) / "short_video.mp4"
    with open(short_video_path, 'wb') as f:
        for chunk in response.iter_content():
            f.write(chunk)

    # Create a longer video by repeating the short one 50 times
    long_video_path = Path(temp_dir) / "long_video.mp4"
    ffmpeg_command = [
        "ffmpeg", "-y",
        "-stream_loop", "49",  # repeat video 50 times
        "-i", str(short_video_path),
        "-c", "copy",
        str(long_video_path)
    ]
    subprocess.run(ffmpeg_command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    return short_video_path, long_video_path


temp_dir = tempfile.mkdtemp()
short_video_path, long_video_path = generate_long_video(temp_dir)

decoder = VideoDecoder(long_video_path, seek_mode="approximate")
metadata = decoder.metadata

short_duration = timedelta(seconds=VideoDecoder(short_video_path).metadata.duration_seconds)
long_duration = timedelta(seconds=metadata.duration_seconds)
print(f"Original video duration: {int(short_duration.total_seconds() // 60)}m{int(short_duration.total_seconds() % 60):02d}s")
print(f"Long video duration: {int(long_duration.total_seconds() // 60)}m{int(long_duration.total_seconds() % 60):02d}s")
print(f"Video resolution: {metadata.width}x{metadata.height}")
print(f"Average FPS: {metadata.average_fps:.1f}")
print(f"Total frames: {metadata.num_frames}")
Original video duration: 0m13s
Long video duration: 11m30s
Video resolution: 640x360
Average FPS: 25.0
Total frames: 17250

幀取樣策略

在本教程中,我們將從長影片中每 2 秒取樣一幀。這模擬了一個常見的場景,即你需要處理一部分幀用於 LLM 推理。

TARGET_FPS = 2
step = max(1, round(metadata.average_fps / TARGET_FPS))
all_indices = list(range(0, metadata.num_frames, step))

print(f"Sampling 1 frame every {TARGET_FPS} seconds")
print(f"We'll skip every {step} frames")
print(f"Total frames to decode: {len(all_indices)}")
Sampling 1 frame every 2 seconds
We'll skip every 12 frames
Total frames to decode: 1438

方法 1:順序解碼(基準)

讓我們從順序方法開始作為我們的基準。這會逐幀處理,沒有任何並行化。

def decode_sequentially(indices: List[int], video_path=long_video_path):
    """Decode frames sequentially using a single decoder instance."""
    decoder = VideoDecoder(video_path, seek_mode="approximate")
    return decoder.get_frames_at(indices)


times, result_sequential = bench(decode_sequentially, all_indices)
sequential_time = report_stats(times, unit="s")
median = 14.31s ± 0.02

方法 2:基於 FFmpeg 的並行

FFmpeg 具有內建的多執行緒功能,可以透過 num_ffmpeg_threads 引數進行控制。此方法利用 FFmpeg 內部的多個執行緒來加速解碼操作。

def decode_with_ffmpeg_parallelism(
    indices: List[int],
    num_threads: int,
    video_path=long_video_path
):
    """Decode frames using FFmpeg's internal threading."""
    decoder = VideoDecoder(video_path, num_ffmpeg_threads=num_threads, seek_mode="approximate")
    return decoder.get_frames_at(indices)


NUM_CPUS = cpu_count()

times, result_ffmpeg = bench(decode_with_ffmpeg_parallelism, all_indices, num_threads=NUM_CPUS)
ffmpeg_time = report_stats(times, unit="s")
speedup = sequential_time / ffmpeg_time
print(f"Speedup compared to sequential: {speedup:.2f}x with {NUM_CPUS} FFmpeg threads.")
median = 7.09s ± 0.02
Speedup compared to sequential: 2.02x with 16 FFmpeg threads.

方法 3:多程序

基於程序的並行將工作分配給多個 Python 程序。

def decode_with_multiprocessing(
    indices: List[int],
    num_processes: int,
    video_path=long_video_path
):
    """Decode frames using multiple processes with joblib."""
    chunks = split_indices(indices, num_chunks=num_processes)

    # loky is a multi-processing backend for joblib: https://github.com/joblib/loky
    results = Parallel(n_jobs=num_processes, backend="loky", verbose=0)(
        delayed(decode_sequentially)(chunk, video_path) for chunk in chunks
    )

    return torch.cat([frame_batch.data for frame_batch in results], dim=0)


times, result_multiprocessing = bench(decode_with_multiprocessing, all_indices, num_processes=NUM_CPUS)
multiprocessing_time = report_stats(times, unit="s")
speedup = sequential_time / multiprocessing_time
print(f"Speedup compared to sequential: {speedup:.2f}x with {NUM_CPUS} processes.")
median = 5.39s ± 0.01
Speedup compared to sequential: 2.65x with 16 processes.

方法 4:Joblib 多執行緒

基於執行緒的並行在單個程序中使用多個執行緒。TorchCodec 會釋放 GIL,因此這可能非常有效。

def decode_with_multithreading(
    indices: List[int],
    num_threads: int,
    video_path=long_video_path
):
    """Decode frames using multiple threads with joblib."""
    chunks = split_indices(indices, num_chunks=num_threads)

    results = Parallel(n_jobs=num_threads, prefer="threads", verbose=0)(
        delayed(decode_sequentially)(chunk, video_path) for chunk in chunks
    )

    # Concatenate results from all threads
    return torch.cat([frame_batch.data for frame_batch in results], dim=0)


times, result_multithreading = bench(decode_with_multithreading, all_indices, num_threads=NUM_CPUS)
multithreading_time = report_stats(times, unit="s")
speedup = sequential_time / multithreading_time
print(f"Speedup compared to sequential: {speedup:.2f}x with {NUM_CPUS} threads.")
median = 1.94s ± 0.01
Speedup compared to sequential: 7.38x with 16 threads.

驗證和正確性檢查

讓我們驗證所有方法是否都產生了相同的結果。

All good!
import shutil
shutil.rmtree(temp_dir)

指令碼總執行時間: (2 分鐘 1.981 秒)

由 Sphinx-Gallery 生成的畫廊

文件

訪問全面的 PyTorch 開發者文件

檢視文件

教程

為初學者和高階開發者提供深入的教程

檢視教程

資源

查詢開發資源並讓您的問題得到解答

檢視資源