On CentOS, parallel computing with PyTorch usually takes one of the following forms:
Data parallelism is the most common approach: each mini-batch of data is split across multiple GPUs and processed in parallel.
Install PyTorch: make sure you have a CUDA-enabled build of PyTorch installed.
pip install torch torchvision torchaudio
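Before going further, it is worth a quick sanity check that the installed build actually sees your GPUs; the one-liner below is a minimal sketch of such a check.
# Optional sanity check: should print True and the number of visible GPUs
python -c "import torch; print(torch.cuda.is_available(), torch.cuda.device_count())"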
Write the code:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
# Define the model
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = torch.relu(torch.max_pool2d(self.conv1(x), 2))
        x = torch.relu(torch.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = torch.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return torch.log_softmax(x, dim=1)
# Initialize the model and move it to the GPU(s)
model = Net()
if torch.cuda.device_count() > 1:
    print(f"Let's use {torch.cuda.device_count()} GPUs!")
    model = nn.DataParallel(model)
model.cuda()
# Load the data
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
trainset = datasets.MNIST('~/.pytorch/MNIST_data/', download=True, train=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=64, shuffle=True)

# Define the loss function and optimizer
# NLLLoss pairs with the model's log_softmax output; CrossEntropyLoss would expect raw logits
criterion = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
# Train the model
for epoch in range(10):
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data
        inputs, labels = inputs.cuda(), labels.cuda()
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if i % 100 == 99:  # print every 100 mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 100:.3f}')
            running_loss = 0.0
print('Finished Training')
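One practical detail with nn.DataParallel: the wrapper keeps the original network in model.module, so it is usually cleaner to save that inner module's weights. The snippet below is a minimal sketch; the file name mnist_cnn.pt is just a placeholder.
# Save the underlying module's weights so the checkpoint loads with or without DataParallel
state_dict = model.module.state_dict() if isinstance(model, nn.DataParallel) else model.state_dict()
torch.save(state_dict, 'mnist_cnn.pt')  # 'mnist_cnn.pt' is an example path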
Model parallelism is useful when the model is too large to fit into a single GPU's memory: different parts of the model are placed on different GPUs.
import torch
import torch.nn as nn
import torch.nn.functional as F

class ModelParallelNet(nn.Module):
    def __init__(self):
        super(ModelParallelNet, self).__init__()
        # First convolutional block lives on GPU 0
        self.block1 = nn.Sequential(
            nn.Conv2d(1, 10, kernel_size=5),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        ).cuda(0)
        # Second convolutional block lives on GPU 1
        self.block2 = nn.Sequential(
            nn.Conv2d(10, 20, kernel_size=5),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        ).cuda(1)
        self.fc1 = nn.Linear(320, 50).cuda(0)
        self.fc2 = nn.Linear(50, 10).cuda(1)

    def forward(self, x):
        # Move the activations to whichever GPU holds the next layer
        x = x.cuda(0)
        x = self.block1(x)
        x = x.cuda(1)
        x = self.block2(x)
        x = torch.flatten(x, 1).cuda(0)
        x = self.fc1(x)
        x = torch.relu(x)
        x = F.dropout(x, training=self.training)
        x = x.cuda(1)
        x = self.fc2(x)
        return torch.log_softmax(x, dim=1)

model = ModelParallelNet()
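A training loop for this model looks almost identical to the data-parallel one; the main difference is that the labels must live on the same GPU as the final output (cuda:1 here). The sketch below assumes the MNIST trainloader and the optimizer settings from the data-parallel example.
import torch.optim as optim

criterion = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
for inputs, labels in trainloader:   # trainloader from the data-parallel example
    labels = labels.cuda(1)          # the output of forward() lives on GPU 1
    optimizer.zero_grad()
    outputs = model(inputs)          # forward() moves the inputs between GPUs itself
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()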
Distributed parallelism targets multiple GPUs across multiple machines (it also works on a single multi-GPU node). It is implemented with the torch.distributed package.
Set the environment variables (the values below are for a single-machine run):
export MASTER_ADDR='localhost'
export MASTER_PORT='12345'
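For a genuine multi-machine run, every node must point MASTER_ADDR at the rank-0 machine's address instead of localhost; the IP below is only a placeholder.
# On every node: MASTER_ADDR is the reachable address of the rank-0 node (placeholder IP)
export MASTER_ADDR='192.168.1.10'
export MASTER_PORT='12345'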
Write the code:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
from torchvision import datasets, transforms

def main(rank, world_size):
    torch.manual_seed(1234)
    torch.cuda.set_device(rank)
    # Initialize the process group; rank and world_size are passed explicitly
    # because only MASTER_ADDR and MASTER_PORT are set in the environment
    torch.distributed.init_process_group(backend='nccl', init_method='env://',
                                         rank=rank, world_size=world_size)
    # Define the model and move it to this process's GPU
    # Net is the same model class defined in the data-parallel example above
    model = Net().to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    # Load the data
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    dataset = datasets.MNIST('~/.pytorch/MNIST_data/', download=True, train=True, transform=transform)
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    trainloader = DataLoader(dataset, batch_size=64, sampler=sampler)
    # Define the loss function and optimizer (NLLLoss matches the model's log_softmax output)
    criterion = nn.NLLLoss().to(rank)
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)
    # Train the model
    for epoch in range(10):
        sampler.set_epoch(epoch)
        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(rank), labels.to(rank)
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            if i % 100 == 99:
                print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 100:.3f}')
                running_loss = 0.0
    print('Finished Training')
    torch.distributed.destroy_process_group()

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size,), nprocs=world_size, join=True)
Choose the parallel approach that best fits your specific needs and hardware configuration.