linux

PyTorch在Linux的并行计算能力

小樊
37
2025-09-29 14:02:10
栏目: 智能运维

PyTorch在Linux环境下的并行计算能力
PyTorch作为灵活的深度学习框架,在Linux系统上提供了多线程数据加载多GPU数据并行DataParallel)、多GPU/多节点分布式并行DistributedDataParallel,简称DDP)等多层次的并行计算支持,能有效提升模型训练与推理效率。

1. 多线程数据加载:解决I/O瓶颈

数据加载是深度学习流程中的常见瓶颈,PyTorch通过torch.utils.data.DataLoader类实现多线程数据加载。通过设置num_workers参数(指定用于数据加载的子进程数量),可并行处理数据读取、预处理(如图像缩放、归一化)等I/O密集型任务,减少GPU等待时间。例如:

from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# 定义数据预处理
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)

# 使用4个子进程加载数据
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4)

注意:num_workers需根据CPU核心数调整(通常设置为CPU核心数的1-2倍),避免过多进程导致系统资源竞争。

2. 多GPU数据并行:快速上手多卡训练

torch.nn.DataParallel是PyTorch提供的简单多GPU并行方案,它会自动将模型复制到所有指定的GPU上,输入数据分割到各GPU进行前向/反向传播,最后聚合梯度更新模型参数。使用步骤如下:

import torch
import torch.nn as nn

# 定义模型
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(10, 10)
    def forward(self, x):
        return self.fc(x)

# 移动模型到GPU并包装
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleModel().to(device)
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs!")
    model = nn.DataParallel(model, device_ids=[0, 1])  # 使用GPU 0和1

注意:DataParallel适合快速验证多GPU效果,但在大规模训练中,DistributedDataParallel(DDP)的性能更优。

3. 多GPU/多节点分布式并行:高性能扩展方案

DistributedDataParallel(DDP)是PyTorch推荐的分布式训练方案,支持多GPU(单机多卡)和多节点(多机多卡)训练,通过进程级并行(每个GPU对应一个独立进程)和通信重叠计算梯度分桶等技术,实现接近线性的加速比(如256个GPU的加速比可达0.9以上)。

关键步骤

示例代码(单机多卡)

import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
import torchvision.datasets as datasets
import torchvision.transforms as transforms

def main(rank, world_size):
    # 初始化分布式环境(nccl后端适合GPU)
    torch.distributed.init_process_group(
        backend='nccl',
        init_method='tcp://127.0.0.1:12345',  # 主节点IP和端口
        world_size=world_size,
        rank=rank
    )
    
    # 创建模型并移动到对应GPU
    model = nn.Sequential(
        nn.Linear(784, 1024),
        nn.ReLU(),
        nn.Linear(1024, 10)
    ).to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    
    # 定义损失函数和优化器
    criterion = nn.CrossEntropyLoss().to(rank)
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)
    
    # 加载数据(使用DistributedSampler)
    transform = transforms.Compose([transforms.ToTensor()])
    dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    loader = DataLoader(dataset, batch_size=64, sampler=sampler)
    
    # 训练循环
    for epoch in range(5):
        sampler.set_epoch(epoch)  # 确保每个epoch数据打乱顺序不同
        running_loss = 0.0
        for data, target in loader:
            data, target = data.to(rank), target.to(rank)
            optimizer.zero_grad()
            output = ddp_model(data.view(data.size(0), -1))
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f'Rank {rank}, Epoch {epoch}, Loss: {running_loss/len(loader)}')
    
    # 清理分布式环境
    torch.distributed.destroy_process_group()

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--world_size', type=int, default=2, help='Number of GPUs')
    parser.add_argument('--rank', type=int, default=0, help='Current GPU rank')
    args = parser.parse_args()
    main(args.rank, args.world_size)

启动命令(单机2卡):

python -m torch.distributed.launch --nproc_per_node=2 your_script.py

4. 并行计算的性能优化技巧

0
看了该问题的人还看了