PyTorch在Linux环境下的并行计算能力
PyTorch作为灵活的深度学习框架,在Linux系统上提供了多线程数据加载、多GPU数据并行(DataParallel
)、多GPU/多节点分布式并行(DistributedDataParallel
,简称DDP)等多层次的并行计算支持,能有效提升模型训练与推理效率。
数据加载是深度学习流程中的常见瓶颈,PyTorch通过torch.utils.data.DataLoader
类实现多线程数据加载。通过设置num_workers
参数(指定用于数据加载的子进程数量),可并行处理数据读取、预处理(如图像缩放、归一化)等I/O密集型任务,减少GPU等待时间。例如:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
# 定义数据预处理
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
# 使用4个子进程加载数据
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4)
注意:num_workers
需根据CPU核心数调整(通常设置为CPU核心数的1-2倍),避免过多进程导致系统资源竞争。
torch.nn.DataParallel
是PyTorch提供的简单多GPU并行方案,它会自动将模型复制到所有指定的GPU上,输入数据分割到各GPU进行前向/反向传播,最后聚合梯度更新模型参数。使用步骤如下:
CUDA_VISIBLE_DEVICES
环境变量指定可用的GPU(如export CUDA_VISIBLE_DEVICES=0,1
表示仅使用GPU 0和1);DataParallel
包装(需指定device_ids
参数);DataParallel
会自动处理数据分发与梯度聚合。import torch
import torch.nn as nn
# 定义模型
class SimpleModel(nn.Module):
def __init__(self):
super().__init__()
self.fc = nn.Linear(10, 10)
def forward(self, x):
return self.fc(x)
# 移动模型到GPU并包装
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleModel().to(device)
if torch.cuda.device_count() > 1:
print(f"Using {torch.cuda.device_count()} GPUs!")
model = nn.DataParallel(model, device_ids=[0, 1]) # 使用GPU 0和1
注意:DataParallel
适合快速验证多GPU效果,但在大规模训练中,DistributedDataParallel
(DDP)的性能更优。
DistributedDataParallel
(DDP)是PyTorch推荐的分布式训练方案,支持多GPU(单机多卡)和多节点(多机多卡)训练,通过进程级并行(每个GPU对应一个独立进程)和通信重叠计算、梯度分桶等技术,实现接近线性的加速比(如256个GPU的加速比可达0.9以上)。
torch.distributed.init_process_group
函数,指定后端(推荐nccl
,适合GPU训练)、初始化方法(如tcp://master_ip:port
)、world_size
(总进程数=节点数×每个节点的GPU数)、rank
(当前进程的全局排名);model.to(rank)
),用DDP
包装(device_ids=[rank]
);torch.utils.data.distributed.DistributedSampler
确保每个进程处理不同的数据子集(需设置num_replicas=world_size
、rank=rank
);torch.distributed.launch
工具或accelerate
库启动,自动管理进程启动与同步。import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
import torchvision.datasets as datasets
import torchvision.transforms as transforms
def main(rank, world_size):
# 初始化分布式环境(nccl后端适合GPU)
torch.distributed.init_process_group(
backend='nccl',
init_method='tcp://127.0.0.1:12345', # 主节点IP和端口
world_size=world_size,
rank=rank
)
# 创建模型并移动到对应GPU
model = nn.Sequential(
nn.Linear(784, 1024),
nn.ReLU(),
nn.Linear(1024, 10)
).to(rank)
ddp_model = DDP(model, device_ids=[rank])
# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss().to(rank)
optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)
# 加载数据(使用DistributedSampler)
transform = transforms.Compose([transforms.ToTensor()])
dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
loader = DataLoader(dataset, batch_size=64, sampler=sampler)
# 训练循环
for epoch in range(5):
sampler.set_epoch(epoch) # 确保每个epoch数据打乱顺序不同
running_loss = 0.0
for data, target in loader:
data, target = data.to(rank), target.to(rank)
optimizer.zero_grad()
output = ddp_model(data.view(data.size(0), -1))
loss = criterion(output, target)
loss.backward()
optimizer.step()
running_loss += loss.item()
print(f'Rank {rank}, Epoch {epoch}, Loss: {running_loss/len(loader)}')
# 清理分布式环境
torch.distributed.destroy_process_group()
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--world_size', type=int, default=2, help='Number of GPUs')
parser.add_argument('--rank', type=int, default=0, help='Current GPU rank')
args = parser.parse_args()
main(args.rank, args.world_size)
启动命令(单机2卡):
python -m torch.distributed.launch --nproc_per_node=2 your_script.py
nccl
后端(NVIDIA集体通信库),其性能优于gloo
;DataLoader
的num_workers
参数,使用pin_memory=True
(将数据固定在内存中,加速GPU传输);bucket_cap_mb
参数调整梯度分桶大小(如bucket_cap_mb=25
),减少通信次数。