Implementing PyTorch parallel computation on CentOS usually involves the following aspects:
Multi-GPU parallelism: use PyTorch's DataParallel or DistributedDataParallel modules to run computation in parallel across multiple GPUs.
Multi-node parallelism: use PyTorch's distributed package (torch.distributed) to run computation across multiple nodes.
Below are the basic steps and some example code:
DataParallel
```python
import torch
import torch.nn as nn
from torchvision import models

# Define the model
model = models.resnet50(pretrained=True)

# Check whether more than one GPU is available
if torch.cuda.device_count() > 1:
    print(f"Let's use {torch.cuda.device_count()} GPUs!")
    # Wrap the model so each batch is split across all visible GPUs
    model = nn.DataParallel(model)

# Move the model to the GPU
model.to('cuda')

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Train the model (num_epochs and dataloader are assumed to be defined elsewhere)
for epoch in range(num_epochs):
    for inputs, labels in dataloader:
        inputs, labels = inputs.to('cuda'), labels.to('cuda')

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
```
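Note that nn.DataParallel runs in a single process: each forward call splits the input batch along dimension 0 across the visible GPUs and gathers the outputs back on the first device, so it needs no launcher or process-group setup. A minimal sketch of that behavior (the linear layer and tensor shapes here are illustrative, not part of the example above):

```python
import torch
import torch.nn as nn

# Minimal sketch: DataParallel splits the batch dimension across visible GPUs.
layer = nn.Linear(128, 10)
if torch.cuda.is_available():
    layer = nn.DataParallel(layer).to('cuda')
    batch = torch.randn(64, 128, device='cuda')   # 64 samples, split across GPUs
    out = layer(batch)                            # outputs gathered on cuda:0
    print(out.shape)                              # torch.Size([64, 10])
```

For training at scale, DistributedDataParallel (next section) generally performs better, because each GPU gets its own process.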
DistributedDataParallel
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
from torch.utils.data import DataLoader, DistributedSampler
from torchvision import datasets, transforms, models

# Initialize the distributed environment (one process per GPU).
# This assumes each process already knows which GPU to use -- see the
# launcher sketch after this block for how the device is usually selected.
dist.init_process_group(backend='nccl')
device = torch.device("cuda")

# Define the model
model = models.resnet50(pretrained=True).to(device)

# Wrap the model with DistributedDataParallel
model = DDP(model)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Data loading: DistributedSampler gives each rank its own shard of the data
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
train_sampler = DistributedSampler(train_dataset)
# batch_size and num_epochs are assumed to be defined elsewhere in the script
train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)

# Train the model
for epoch in range(num_epochs):
    train_sampler.set_epoch(epoch)  # reshuffle the shards each epoch
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
```
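The script above does not start its own worker processes: it assumes an external launcher starts one copy per GPU and provides RANK, WORLD_SIZE, MASTER_ADDR, and MASTER_PORT through the environment, which is what lets dist.init_process_group(backend='nccl') work without explicit addresses. A common choice is PyTorch's torchrun launcher; the sketch below (the script name train_ddp.py is a placeholder) shows the assumed launch command and how each process would typically pin itself to one GPU using the LOCAL_RANK variable that torchrun sets:

```python
# Assumed launch command (shell, shown here as a comment):
#   torchrun --nproc_per_node=4 train_ddp.py
#
# torchrun sets RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT
# for every worker process it starts.
import os
import torch
import torch.distributed as dist

def init_distributed():
    local_rank = int(os.environ["LOCAL_RANK"])   # provided by torchrun
    torch.cuda.set_device(local_rank)            # one GPU per process
    dist.init_process_group(backend="nccl")      # reads the env:// variables
    return local_rank
```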
Multi-node parallelism usually involves a more complex setup, including network configuration, IP addresses, and port settings. Here is a simple example:
```python
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
from torchvision import datasets, transforms, models

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'master_ip'  # address of the master node
    os.environ['MASTER_PORT'] = '12345'
    dist.init_process_group(backend='nccl', init_method='env://', world_size=world_size, rank=rank)

def cleanup():
    dist.destroy_process_group()

def train(rank, world_size):
    setup(rank, world_size)
    device = torch.device(f"cuda:{rank}")

    # Define the model
    model = models.resnet50(pretrained=True).to(device)

    # Wrap the model
    model = DDP(model, device_ids=[rank])

    # Define the loss function and optimizer
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

    # Data loading (batch_size and num_epochs are assumed to be defined elsewhere)
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
    train_sampler = DistributedSampler(train_dataset)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)

    # Train the model
    for epoch in range(num_epochs):
        train_sampler.set_epoch(epoch)
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    cleanup()

if __name__ == "__main__":
    world_size = 4  # total number of processes (one per rank), not nodes
    mp.spawn(train, args=(world_size,), nprocs=world_size, join=True)
```
In this example, mp.spawn launches multiple processes, one per rank; for a true multi-node job, each node launches its own set of processes. You need to set MASTER_ADDR and MASTER_PORT according to your actual environment.
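For a genuine multi-node run, each machine launches its own set of processes, and every process derives a global rank from the node's rank plus its local GPU index. Below is a minimal sketch under assumed values (two nodes with four GPUs each; NODE_RANK is a hypothetical environment variable exported per machine, and master_ip stands for the address of node 0):

```python
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

# Assumed cluster shape: 2 nodes x 4 GPUs. NODE_RANK is exported per machine
# (0 on the master node, 1 on the other) -- a hypothetical convention.
NODES = 2
GPUS_PER_NODE = 4
WORLD_SIZE = NODES * GPUS_PER_NODE

def worker(local_rank, node_rank):
    global_rank = node_rank * GPUS_PER_NODE + local_rank
    os.environ['MASTER_ADDR'] = 'master_ip'   # reachable address of node 0
    os.environ['MASTER_PORT'] = '12345'
    torch.cuda.set_device(local_rank)          # GPU index is local to the node
    dist.init_process_group(backend='nccl', init_method='env://',
                            world_size=WORLD_SIZE, rank=global_rank)
    # ... build the model, wrap it with DDP(model, device_ids=[local_rank]),
    # and run the same training loop as in train() above ...
    dist.destroy_process_group()

if __name__ == "__main__":
    node_rank = int(os.environ.get("NODE_RANK", "0"))
    mp.spawn(worker, args=(node_rank,), nprocs=GPUS_PER_NODE, join=True)
```

The same topology can also be launched with torchrun (using --nnodes, --nproc_per_node, and --node_rank), which avoids writing the spawn logic by hand.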
To summarize: configure MASTER_ADDR, MASTER_PORT, and the other related environment variables; wrap your model with DataParallel or DistributedDataParallel; and use the torch.distributed package for multi-node training. With these steps you can implement PyTorch parallel computation on CentOS.