高要建设网站,一流的做pc端网站,多语种企业网站建设,自用电脑做网站[pytorch distributed] 01 nn.DataParallel 数据并行初步
数据并行 vs. 模型并行 数据并行#xff1a;模型拷贝#xff08;per device#xff09;#xff0c;数据 split/chunk#xff08;对batch切分#xff09; 每个device上都拷贝一份完整模型#xff0c;每个device分…[pytorch distributed] 01 nn.DataParallel 数据并行初步
数据并行 vs. 模型并行 数据并行模型拷贝per device数据 split/chunk对batch切分 每个device上都拷贝一份完整模型每个device分别处理1个batch的一部分(如batch_size64, 2个device, 每device处理32个样本)梯度反向传播时每个设备上的梯度求和(求和才是一个完整batch所有样本的loss)汇入中心设备/参数服务器默认gpu0对模型进行梯度优化。 模型并行数据拷贝per device模型 split/chunk显然是单卡放不下模型的情况下 DP DDP DPnn.DataParallel (不推荐) https://pytorch.org/docs/stable/generated/torch.nn.DataParallel.html DDP: DistributedDataParallel (推荐)Use nn.parallel.DistributedDataParallel instead of multiprocessing or nn.DataParallel and Distributed Data Parallel.
1. 数据并行DP(nn.DataParallel)
预先定义一下Dataset和Model
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoaderclass RandomDataset(Dataset):def __init__(self, size, length):self.len length# 100*5self.data torch.randn(length, size)def __getitem__(self, index):# (5, )return self.data[index]def __len__(self):# 100return self.lenclass Model(nn.Module):# Our modeldef __init__(self, input_size, output_size):# 5 2super(Model, self).__init__()self.fc nn.Linear(input_size, output_size)def forward(self, input):output self.fc(input)print(\tIn Model: input size, input.size(),output size, output.size())return outputinput_size 5 # 模型输入数据维度(b,n) (30, 5)
output_size 2 # 模型输出数据维度(b,n) (30, 2)batch_size 30 # batch size
data_size 100 # 数据集样本数量rand_loader DataLoader(datasetRandomDataset(input_size, data_size),batch_sizebatch_size, shuffleTrue)
# 构造优化器和损失函数
optimizer optim.SGD(model.parameters(), lr0.01)
criterion nn.MSELoss()# 模拟目标值
target torch.randn(64, 5) step1 并行化包裹模型
# Parameters and DataLoaders
# (5, 2)
model Model(input_size, output_size)
if torch.cuda.device_count() 1: # 如果不止1张GPU # 构建数据并行模型device_ids [0, 1] # 使用的设备ID列表# 如3张GPUdim 0[30, xxx] - [15, ...], [15, ...] on 2 GPUsmodel nn.DataParallel(model, device_ids) # 并行化默认使用所有device加载数据torch.nn.DataParallel(module, device_idsNone, output_deviceNone, dim0) model 指传入的模型device_idsNone, 参与训练的 GPU 有哪些device_idsgpus默认None是使用全部device output_deviceNone 指定中心设备(参数服务器)用于汇总梯度的 GPU 是哪个output_devicegpus[0] dim0 从那一维度进行数据切分默认batch维度 在执行 forward/backward 之前使用 DataParallel 将 model 复制到 device_ids 指定设备上进行数据并行处理。 model.to(cuda:0)不同的是tensor的to(device)是在device上生成一个拷贝不改变原来cpu上的tensor而model是直接将原model转移到gpu上。
step2加载到device0
设置中心设备(参数服务器)用于反向传播时的梯度汇总一般指定cuda:0
# 将模型从cpu放在gpu 0上
device torch.device(cuda:0 if torch.cuda.is_available() else cpu)
model.to(device)step3forward前向传播
模型forward时将data_loader加载的一个batch的数据进行切分送入不同device的模型进行计算再将结果合并输出。
for data in rand_loader:# input_var can be on any device, including CPUinput data.to(device)
# input dataoutput model(input)print(Outside: input size, input.size(),output_size, output.size())
In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])step4反向传播梯度聚合
loss.backward()分别在每个device上计算loss的梯度average_gradients(model)将梯度聚合到中心设备/参数服务器(cuda:0)上进行梯度优化 # 在每个设备上进行前向传播和梯度计算loss criterion(output, target)loss.backward()# 对各个设备上的梯度进行求和average_gradients(model)# 使用原始设备模型进行梯度优化optimizer.step()2. 分布式数据并行DDP(nn.parallel.DistributedDataParallel) multiple GPUs in a single machine/server/node单机多卡 分布式数据并行时模型model parameters/优化器optimizer states每张卡都会拷贝一份replicas DDP 始终在卡间维持着模型参数和优化器状态的同步一致性在整个训练过程中 Data Parallel一个batch的数据通过 DistributedSampler 切分split 分发到不同的 gpus 上 此时虽然模型/optimizer 相同但因为每个device的数据输入不同导致 loss 不同反向传播时计算到的梯度也会不同此时 ddp 通过 ring all-reduce algorithm 保证每个batch step结束后不同卡间model/optimizer 的同步一致性 如上图所示Ring all-reduce algorithm 首先会将所有的 gpu cards 连成一个 ring环其同步过程不需要等待所有的卡都计算完一轮梯度经过这个同步过程之后所有的卡的 models/optimizers 就都会保持一致的状态 Ring all-reduce algorithm 计算和同步的几个过程 红线GPUs 分别计算损失forward和梯度backward蓝线梯度的聚合到中心device/参数服务器上(gpu0)绿线模型/优化器参数的更新及广播broadcast
其实参数服务器可以是一个GPU0也可以是CPU也可以是所有GPU 但将数据发送到GPU0会成为device通信的瓶颈
所以采用环形的梯度聚合方式更加高效
DDP基本概念 world world 表示包含所有进程的组(所有gpu的集合)。每个进程通常对应一个 GPU world 中的进程可以相互通信这使得使用分布式数据并行Distributed Data Parallel, DDP进行训练成为可能。 world_sizegpu个数/进程个数 world_size 表示分布式训练环境中的总进程数/gpu数。每个进程都会被分配一个唯一的标识符rank从 0 到 world_size-1。 rank进程标识符 rank 是分配给world中每个进程的唯一标识符用于标识每个进程在分布式训练中的角色。local rank是分配个单个node中每个进程的标识符world中可能有多个node。 node节点 node 可以理解为一个服务器代表着物理设备上的一个实体。在多机分布式训练中每台机器被视为一个节点节点之间需要进行通信。例如如果有2 个node/server每个 node/server/machine 各有4张卡4 gpus。total_world_size 2节点数 * 4每个节点的 GPU 数量 8 rank 的取值范围为 [0, 1, 2, 3, 4, 5, 6, 7] local_rank 的取值范围为 [0, 1, 2, 3][0, 1, 2, 3] 分别对应着不同的节点上的进程。 All to one聚合过程reduce所有GPU(model和optiminizer状态)汇聚到参数服务器 one to All广播过程broadcast参数服务器广播到所有GPU
torchrun
torchrun运行分布式train.py脚本nproc-per-node设置每个node服务器上的gpu个数(一般是1个服务器)ddp_gpus_torchrun.py脚本名称--max_epochs 5 --batch_size 32脚本参数。
!torchrun --nproc-per-node2 ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32实现batch_size不变的情况下对step的切分 如单卡情况下data_len1024batch_size32则一个gpu的step1024/3232 多卡情况下2个gpudata_len1024batch_size32则每个gpu的step(1024/32)/232/216
step1导入相关的包
import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoaderimport torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler # 分发数据
from torch.nn.parallel import DistributedDataParallel as DDP # 包装model使之数据并行
from torch.distributed import init_process_group, destroy_process_groupstep2ddp_setup函数
这个函数用于设置分布式训练的环境。它调用了init_process_group函数来初始化进程组使用的通信backend后端是ncclNVIDIA Collective Communication Library然后使用torch.cuda.set_device函数根据环境变量设置当前进程使用的GPU设备。
def ddp_setup():Args:rank: Unique identifier of each processworld_size: Total number of processes# rank 0 process
# os.environ[MASTER_ADDR] localhost
# os.environ[MASTER_PORT] 12355# ncclNVIDIA Collective Communication Library # 分布式情况下的gpus 间通信init_process_group(backendnccl)torch.cuda.set_device(int(os.environ[LOCAL_RANK]))step3Trainer类
这个类定义了一个模型训练的封装器。在初始化方法中它接收一个模型backend、一个训练数据加载器train_dataloader、一个优化器train_dataloader作为参数并将模型移动到GPU上然后使用DistributedDataParallel对模型进行包装以实现数据并行。(model先放cuda再DDP封装)
_run_batch方法实现了一次批量的训练过程包括前向传播、计算损失、反向传播和更新参数。_run_epoch方法用于遍历整个训练集进行训练self.train_dataloader.sampler.set_epoch(epoch)是用于设置数据加载器的epoch以保证每个GPU在每个epoch开始时加载的数据都是不同的。train方法则用于控制训练的总体流程。
class Trainer:def __init__(self, model: torch.nn.Module, train_dataloader: DataLoader, optimizer: torch.optim.Optimizer, ) - None:self.gpu_id int(os.environ[LOCAL_RANK])self.model model.to(self.gpu_id)self.train_dataloader train_dataloaderself.optimizer optimizerself.model DDP(model, device_ids[self.gpu_id])def _run_batch(self, xs, ys):self.optimizer.zero_grad()output self.model(xs)loss F.cross_entropy(output, ys)loss.backward()self.optimizer.step()def _run_epoch(self, epoch):batch_size len(next(iter(self.train_dataloader))[0])print(f[GPU: {self.gpu_id}] Epoch: {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)})self.train_dataloader.sampler.set_epoch(epoch)for xs, ys in self.train_dataloader:xs xs.to(self.gpu_id)ys ys.to(self.gpu_id)self._run_batch(xs, ys)def train(self, max_epoch: int):for epoch in range(max_epoch):self._run_epoch(epoch)step4MyTrainDataset类
这个类定义了一个自定义的训练数据集。在初始化方法中它接收一个大小参数并生成一组随机的数据样本。__len__方法返回数据集的大小__getitem__方法用于获取指定索引处的数据样本。
class MyTrainDataset(Dataset):def __init__(self, size):self.size sizeself.data [(torch.rand(20), torch.rand(1)) for _ in range(size)]def __len__(self):return self.sizedef __getitem__(self, index):return self.data[index]step5main函数
这个函数是程序的主函数。在函数内部首先调用了ddp_setup函数来设置分布式训练的环境。
然后创建了一个自定义的训练数据集和相应的数据加载器以及一个线性模型和一个优化器。DistributedSampler是PyTorch提供的一个分布式采样器用于确保每个进程加载的数据都是不同的且顺序随机。sampler对象被传入训练数据集的构造函数可以通过数据加载器如torch.utils.data.DataLoader的sampler参数指定。在每个进程中DistributedSampler会根据进程ID和进程数量将整个训练数据集划分成多个部分并为每个进程提供其应加载的数据索引。这样在分布式训练过程中每个进程只会加载自己负责的数据部分避免了数据重复加载。
接下来创建了一个Trainer对象并调用其train方法进行模型训练。最后调用destroy_process_group函数销毁进程组。
def main(max_epochs: int, batch_size: int):ddp_setup()train_dataset MyTrainDataset(2048)train_dataloader DataLoader(train_dataset, batch_sizebatch_size, pin_memoryTrue, shuffleFalse, # batch input: split to each gpus (且没有任何 overlaping samples 各个 gpu 之间)samplerDistributedSampler(train_dataset))model torch.nn.Linear(20, 1)optimzer torch.optim.SGD(model.parameters(), lr1e-3)trainer Trainer(modelmodel, optimizeroptimzer, train_dataloadertrain_dataloader)trainer.train(max_epochs)destroy_process_group()step6解析命令行参数并运行主函数
在这个步骤中首先使用argparse模块解析命令行参数包括最大训练周期数max_epochs和批量大小batch_size。然后调用main函数并将解析后的参数传递给它进行模型训练。
if __name__ __main__:import argparseparser argparse.ArgumentParser(descriptionsimple distributed training job)parser.add_argument(--max_epochs, typeint, helpTotal epochs to train the model)parser.add_argument(--batch_size, default32, typeint, helpInput batch size on each device (default: 32))args parser.parse_args()# world_size torch.cuda.device_count()main(args.max_epochs, args.batch_size)3. 模型并行
数据并行是切数据scattering inputs and gathering outputs模型并行是切模型shards 模型并行单卡放不下一份模型将一份大模型不同的层切分到不同的卡上forward时串行执行
Huggingface实现
device_mapHuggingface支持自动实现模型并行 device_map参数的取值[auto, balanced, balanced_low_0, sequential]auto的模型分割优先级:GPU(s) CPU (RAM) Disk
如下如果有两种gpudevice_mapauto使模型的layers的parameter分别加载到两张gpu上(各一半)
from transformers import LlamaTokenizer, LlamaForCausalLM, GenerationConfig
model LlamaForCausalLM.from_pretrained(decapoda-research/llama-7b-hf,load_in_8bitTrue,device_mapauto,
)
for i, para in enumerate(model.named_parameters()):
# print(f{i}, {para[0]}\t {para[1].device} \t{para[1].dtype})print(f{i}, \t {para[1].device} \t{para[1].dtype})to(device)实现
pytorch模拟模型并行原理分别用to(device)将不同的层加载到不同的gpu上forward时将parameter也加载到对应gpu。
import torch
import torch.nn as nn
import torch.optim as optimclass ToyModel(nn.Module):def __init__(self):super(ToyModel, self).__init__()self.net1 torch.nn.Linear(10000, 10).to(cuda:0)self.relu torch.nn.ReLU()self.net2 torch.nn.Linear(10, 5).to(cuda:1)def forward(self, x):# 卡间串行执行x self.net1(x.to(cuda:0)))x self.net2(self.relu(x.to(cuda:1))return x进行一个batch的train每个batch_size20样本5分类
model ToyModel()
loss_fn nn.MSELoss()
optimizer optim.SGD(model.parameters(), lr0.001)optimizer.zero_grad()
outputs model(torch.randn(20, 10000))
labels torch.randn(20, 5).to(cuda:1)
loss_fn(outputs, labels).backward()
optimizer.step()4. Deepspeed
DeepSpeed炼丹小白居家旅行必备【神器】
技术栈
术语其实和前面DDP的概念一样。 Train的数据4部分组成model模型参数、backward的梯度gradient、optimizer优化器参数、forward的数据tensor
Deepspeed、ZeRO技术方案分发Partitioning(按gpu数量N等分数据)、卸载Offload(不用的数据放入CPU)、模型并行Pipeline(模型参数按层切分到不同gpu上)
step1deepspeed初始化
# init distributed
deepspeed.init_distributed()加载参数local_rank
def parse_arguments():import argparseparser argparse.ArgumentParser(descriptiondeepspeed training script.)parser.add_argument(--local_rank, typeint, default-1,helplocal rank passed from distributed launcher)# Include DeepSpeed configuration argumentsparser deepspeed.add_config_arguments(parser)args parser.parse_args()return argsstep2deepspeed封装模型和数据集
deepspeed.initialize()封装model和dataset相当于将模型和数据集交给deepspeed进行托管engine就是deepspeed封装后的model其他返回值同样都是deepspeed封装过的。(其中optimizer和lr_scheduler 后面是用不到的)我们只需要模型engine和数据加载器training_dataloader。
还要传入一个deepspeed的配置文件deepspeed_config。
# init model
model MyClassifier(3, 100, ch_multi128)
# init dataset
ds MyDataset((3, 512, 512), 100, sample_countint(1e6))# init engine
engine, optimizer, training_dataloader, lr_scheduler deepspeed.initialize(argsargs,modelmodel,model_parametersmodel.parameters(),training_datads,configdeepspeed_config,
)
# load checkpoint
engine.load_checkpoint(./data/checkpoints/MyClassifier/)step3训练
在使用DeepSpeed进行分布式训练时通常不需要手动调用optimizer.zero_grad()来清零梯度。DeepSpeed会自动处理梯度累积和梯度清零的操作无需手动调用zero_grad()。
当使用DeepSpeed进行分布式训练时一般会在engine.backward(loss)之后调用engine.step()来执行梯度更新操作。在engine.step()中DeepSpeed会执行优化器的step()方法来更新模型参数并在必要的时候自动清零梯度以便进行下一轮的反向传播。
engine.train()for step, (data, label) in enumerate(training_dataloader):step 1data data.to(deviceengine.device, dtypetorch.float16) # xlabel label.to(deviceengine.device, dtypetorch.long).reshape(-1) # y# 不需要梯度清零optimizer.zero_grad()outputs engine(data) # forwardloss F.cross_entropy(outputs, label )engine.backward(loss)engine.step()单机节点node多卡gpu运行
deepspeed \--launcher_args source ${PWD}/setup_env.sh \--hostfile hostfile \deepspeed_script.py \--deepspeed \--deepspeed_config $PWD/deepspeed_config.jsondeepspeed_config.json
{train_micro_batch_size_per_gpu: 1,gradient_accumulation_steps: 1,optimizer: {type: Adam,params: {lr: 0.001,betas: [0.8,0.999],eps: 1e-08,weight_decay: 3e-07}},scheduler: {type: WarmupLR,params: {warmup_min_lr: 0,warmup_max_lr: 0.001,warmup_num_steps: 1000}},activation_checkpointing: {partition_activations: true,cpu_checkpointing: true,contiguous_memory_optimization: false,number_checkpoints: null,synchronize_checkpoint_boundary: false,profile: true},fp16: {enabled: true,auto_cast: false,loss_scale: 0,initial_scale_power: 16,loss_scale_window: 1000,hysteresis: 2,consecutive_hysteresis: false,min_loss_scale: 1},zero_optimization: {stage: 3,offload_param: {device: cpu,pin_memory: true},offload_optimizer: {device: cpu,pin_memory: true},contiguous_gradients: true,overlap_comm: true}
}